成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

理解Python asyncio內部實現機制

開發(fā) 后端
本文需要提前了解 Python 的 yeild from 語法,不了解的話,可以看看 之前關于 Generator 的文章 ;另外,最好對 future/promise 的概念有一定了解。文中不會介紹如何使用 asyncio 及協(xié)程,并且文中給出的代碼不一定能實際運行(不然代碼量太大)。

協(xié)程 (coroutine) 幾乎是 Python 里最為復雜的特性之一了,這篇文章我們來說一說 asyncio 的內部實現機制,借此來理解一門語言要支持協(xié)程需要做的工作。

本文需要提前了解 Python 的 yeild from 語法,不了解的話,可以看看 之前關于 Generator 的文章 ;另外,***對 future/promise 的概念有一定了解。文中不會介紹如何使用 asyncio 及協(xié)程,并且文中給出的代碼不一定能實際運行(不然代碼量太大)。

多線程與協(xié)程

CPU 的執(zhí)行是順序的,線程是操作系統(tǒng)提供的一種機制,允許我們在操作系統(tǒng)的層面上實現“并行”。而協(xié)程則可以認為是應用程序提供的一種機制(用戶或庫來完成),允許我們在應用程序的層面上實現“并行”。

由于本質上程序是順序執(zhí)行的,要實現這種“并行”的假像,我們需要一種機制,來“暫停”當前的執(zhí)行流,并在之后“恢復”之前的執(zhí)行流。這在操作系統(tǒng)及多線程/多進程中稱為“上下文切換” (context switch)。其中“上下文”記錄了某個線程執(zhí)行的狀態(tài),包括線程里用到的各個變量,線程的調用棧等。而“切換”指的就是保存某個線程當前的運行狀態(tài),之后再從之前的狀態(tài)中恢復。只不過線程相關的工作是由操作系統(tǒng)完成,而協(xié)程則是由應用程序自己來完成。

與線程不同的時,協(xié)程完成的功能通常較小,所以會有需求將不同的協(xié)程串起來,我們暫時稱它為協(xié)程鏈 (coroutine chain)。

那么,與線程類似,要實現一個協(xié)程的庫,我們需要這幾樣東西:

  1. 事件循環(huán) (event loop)。一方面,它類似于 CPU ,順序執(zhí)行協(xié)程的代碼;另一方面,它相當于操作系統(tǒng),完成協(xié)程的調度,即一個協(xié)程“暫停”時,決定接下來執(zhí)行哪個協(xié)程。
  2. 上下文的表示。在 Python 中,我們使用 Python 本身支持的生成器 Generator 來代表基本的上下文,但協(xié)程鏈是如何工作的呢?
  3. 上下文的切換。最基礎的切換也是通過 Python 生成器的 yeild 加強版語法來完成的,但我們還要考慮協(xié)程鏈的情況。

Event Loop

首先,因為協(xié)程是一種能暫停的函數,那么它暫停是為了什么?一般是等待某個事件,比如說某個連接建立了;某個 socket 接收到數據了;某個計時器歸零了等。而這些事件應用程序只能通過輪詢的方式得知是否完成,但是操作系統(tǒng)(所有現代的操作系統(tǒng))可以提供一些中斷的方式通知應用程序,如 select , epoll , kqueue 等等。

那么有了操作系統(tǒng)的支持,我們就可以手寫這樣的循環(huán)(偽代碼):

 

  1. while True 
  2.  happend = poll_events(events_to_listen, timeout) 
  3.  process_events(happend) 

***個問題是:如何注冊我們想監(jiān)聽的事件?很簡單,把事件加到 events_to_listen 里就可以了。第二個問題,可以監(jiān)聽什么事件?由于 process_events 需要操作系統(tǒng)的支持,那么我們想監(jiān)聽的事件是需要操作系統(tǒng)支持才行的,一般操作系統(tǒng)支持網絡 I/O 的文件描述符 (file descriptor)。

接下來,當事件發(fā)生時,我們要指定做一些事,一般稱為回調 (callback)。也就是說我們需要告訴 event loop 一個 事件:回調 的對應關系。現在我們把 event loop 用類表示:

 

  1. class EventLoop: 
  2.  def __init__(self): 
  3.  self.events_to_listen = [] 
  4.  self.callbacks = {} 
  5.  self.timeout = None 
  6.  
  7.  def register_event(self, event, callback): 
  8.  self.events_to_listen.append(event) 
  9.  self.callbacks[event] = callback 
  10.  
  11.  def unregister_event(self, event): 
  12.  self.events_to_listen.remove(evenrt) 
  13.  del self.callbacks[event] 
  14.  
  15.  def _process_events(self, events): 
  16.  for event in events: 
  17.  self.callbacks[event](event) 
  18.  
  19.  def start_loop(self): 
  20.  while True
  21.  events_happend = poll_events(self.events_to_listen, timeout) 
  22.  self._process_events(events_happend) 
  23.  
  24. loop = EventLoop() 
  25. loop.register_event(fd, callback) 
  26. loop.start_loop() 

register_event 用到注冊 事件: 回調 的關系, start_loop 用于開啟事件循環(huán)。

現在,你不是想說,之前提到過事件也包括“某個計時器歸零了”,但 poll_events 只支持網絡 I/O 的文件描述符,計時器又要如何實現呢?一般 poll_events 函數是支持 timeout 參數表示等待的時間。因此,可以修改 start_loop :

 

  1. def call_later(self, delay, callback): 
  2.  self.call_at(now() + delay, callback) 
  3.  
  4. def call_at(self, when, callback): 
  5.  self.timeout_callbacks[when] = callback 
  6.  
  7. def start_loop(self): 
  8.  while True
  9.  timeout = min(self.timeout_callbacks.keys()) - now() 
  10.  events_happend = poll_events(self.events_to_listen, timeout) 
  11.  if not empty(events_happend): 
  12.  self._process_events(events_happend) 
  13.  self._process_timeout_events() 
  14.  
  15. def _process_timeout_events(self): 
  16.  time_now = now() 
  17.  for time, callback in self.timeout_callbacks.iteritems(): 
  18.  if time < time_now: 
  19.  callback() 
  20.  del self.timeout_callbacks[time

這里 poll_events 之前,會去計算所有計時器事件最少需要等待的時間,這個時間內即使沒有事件發(fā)生, poll_events 也會退出,以便觸發(fā)計時器事件。 _process_timeout_events 函數的作用是對比當前時間與計時器的目標執(zhí)行時間,如果目標執(zhí)行時間已經到達,則執(zhí)行相應的回調函數。

于是一個簡單的 event loop 就完成了。可以看到,它是異步操作的基礎:允許等待某個事件的發(fā)生并執(zhí)行相應的操作。同時,它還是個簡單的調度器,能順序地執(zhí)行發(fā)生事件的回調函數。

Callback vs Promise vs await

好了,現在我們有了 event loop ,它允許我們?yōu)槭录曰卣{函數。現在假設我們要順序調用幾個 API, 用阻塞式編程如下:

 

  1. result1 = api1() 
  2. result2 = api2(result1) 
  3. result3 = api3(result2) 
  4. ... 

如果這幾個 API 都是異步的,用 event loop + callback 怎么實現?

 

  1. # Implementation for api 
  2. def api1(callback): 
  3.  def callback_for_api1(): 
  4.  result1 = some_calculation_1() 
  5.  event_loop.unregister_event(event1) 
  6.  return callback(result1) 
  7.  event_loop.register_event(event1, callback_for_api1) 
  8.  
  9. def api2(result, callback): 
  10.  def callback_for_api2(): 
  11.  result2 = some_calculation_2(result) 
  12.  event_loop.unregister_event(event2) 
  13.  return callback(result2) 
  14.  event_loop.register_event(event2, callback_for_api2) 
  15. ... 
  16.  
  17. # Our code 
  18. global result 
  19. def api1_callback(result1): 
  20.  def api2_callback(result2): 
  21.  def api3_callback(result3): 
  22.  global result 
  23.  result = some_calculation(result3) 
  24.  return api3(result2, api3_callback) 
  25.  return api2(result1, api2_callback) 
  26. api1(api1_callback) 

這里 api1 api2 的實現由于需要用 event loop 來注冊注銷某些事件,所以顯得特別復雜,這里我們可以先忽略它們的實現,但是看***一段“用戶代碼”是不是極其復雜?隨著操作的復雜性增加,回調函數的嵌套會越變越深。如果你熟悉Javascript,你應該聽過“callback hell”的大名。回調函數的方式為什么不好?最重要的就是它違反了我們寫代碼的直覺,我們都習慣順序執(zhí)行的代碼。

例如上例中,我們期待的是 api1 先執(zhí)行,我們再用它的結果做點什么,但采用回調的方式,我們就需要在寫 api1 的回調時,就去思考我們想用它的結果做些什么操作。在這個例子里,我們需要調用 api2 及 api3 ,這些嵌套的思考又得一遍遍重復下去。最終代碼非常難以理解。

因此 Javascript 提出了 Promise ,所謂的 promise 像是一個占位符,它表示一個運算現在還未完成,但我保證它會做完的;你可以指定它完成的時候做些其它的事。下面我們嘗試用這個思路去做一些改進(Python 沒有原生的 promise 支持):

 

  1. class Promise(): 
  2.  def __init__(self): 
  3.  pass 
  4.  def then(self, callback_that_return_promise): 
  5.  self._then = callback_that_return_promise 
  6.  def set_result(self, result): 
  7.  return self._then(result) 
  8.  
  9. # Implementation for api 
  10. def api1(): 
  11.  promise = Promise() 
  12.  def callback_for_api1(): 
  13.  promise.set_result(some_calculation_1()) 
  14.  event_loop.unregister_event(event1) 
  15.  event_loop.register_event(event1, callback_for_api1) 
  16.  return promise 
  17.  
  18. def api2(result): 
  19.  promise = Promise() 
  20.  def callback_for_api2(): 
  21.  promise.set_result((some_calculation_2(result)) 
  22.  event_loop.unregister_event(event2) 
  23.  return callback(result2) 
  24.  return promise 
  25. ... 
  26.  
  27. # Our code 
  28. global result 
  29. promise = api1().then(lambda result1: return api2(result1)) 
  30.  .then(lambda result2: return api3(result3)) 
  31.  .then(lambda result3: global result; result = result3) 
  32.  
  33. promise.wait_till_complete() 

這里我們簡單實現了一個我們自己的 Promise 類,當它的 set_result 方法被調用時,Promise 會去執(zhí)行之前用 .then 注冊的回調函數,該回調函數將執(zhí)行另一些操作并返回一個新的 Promise。也因此,我們可以不斷地調用 then 將不同的 Promise 組合起來。可以看到,現在我們的代碼就是線性的了!

然而故事還沒有結束,人們依舊不滿于 Promise 的寫法和用法,又提出了 async/await 的寫法。在 Python 中,上面的代碼用 async/await 重寫如下:

 

  1. result1 = await api1() 
  2. result2 = await api2(result1) 
  3. result3 = await api3(result2) 

 

是不是簡單明了?它的效果和我們前幾個例子是等價的,但它的寫法與我們初開始的阻塞版本幾乎一致。這樣能把異步與同步的編碼在結構上盡量統(tǒng)一起來。

這里我不禁想問,為什么大家沒有一開始就想到 async/await 的方式呢?我的一個假設是 async/await 是需要語言本身的支持的,而寫編譯器/解釋器的專家不一定有編寫應用的豐富經驗,是很可能從一開始就拒絕這樣的修改的。因此程序員們只能自己用庫的形式添加支持了。當然這純粹是猜測,只想感嘆下不同領域的隔閡。

總而言之,有了 event loop 我們就能通過回調函數來完成異步編程,但這種方式非常不友好,因此人們又提出了類似 Promise 的思想,讓我們能順序編寫異步代碼,***通過語言對 async/await 的語法支持,異步與同步代碼的結構就幾乎達到統(tǒng)一。這種統(tǒng)一有很重要的意義,它使我們能以同步的思維去理解異步的代碼而不受回調方式的代碼結構的影響。

而這一切都是為了將不同的異步函數“鏈接”起來,只不過是 async/await 的方式最為方便。對比線程,操作系統(tǒng)是沒有提供方式將不同的線程鏈接起來的,因此這種將不同的協(xié)程鏈接起來的工具是協(xié)程比線程好的一個方面。

上下文切換(恢復控制流)

前面提到過,如果某個協(xié)程在等待某些資源,我們需要暫停它的執(zhí)行,在 event loop 中注冊這個事件,以便當事件發(fā)生的時候,能再次喚醒該協(xié)程的執(zhí)行。

這里舉一個 Python 官方文檔 的例子:

 

  1. import asyncio  
  2. async def compute(x, y): 
  3.  print("Compute %s + %s ..." % (x, y)) 
  4.  await asyncio.sleep(1.0) 
  5.  return x + y  
  6. async def print_sum(x, y): 
  7.  result = await compute(x, y) 
  8.  print("%s + %s = %s" % (x, y, result))  
  9. loop = asyncio.get_event_loop() 
  10. loop.run_until_complete(print_sum(1, 2)) 
  11. loop.close() 

上面的代碼的執(zhí)行流程是:

理解Python asyncio內部實現機制

這里有兩個問題:

  1. 誰向 event loop 注冊了事件(及回調)?
  2. 程序從哪里恢復執(zhí)行?

程序從 print_sum 開始執(zhí)行,執(zhí)行到 asyncio.sleep 時需要暫停,那么肯定是在 sleep 中向 event loop 注冊了計時器事件。那們問題來了,當程序恢復執(zhí)行時,它應該從哪里恢復呢?

從上面的流程圖中,可以看見它是從 print_sum 開始恢復,但這樣的話, sleep 注冊事件時就需要知道是誰(即 print_sum )調用了它,這樣才能在 callback 中指定從 print_sum 開始恢復執(zhí)行!

但如果不是從 print_sum 恢復執(zhí)行,那么一樣的,從 sleep 恢復執(zhí)行后, sleep 需要知道接下來返回到什么位置(即 compute 函數中的 await 位置), asyncio 又是如何做到這點的?

那么事實(代碼實現)是怎樣的呢?

當我們把一個協(xié)程用 loop.run_until_complete (或其它相似方法)執(zhí)行時, event loop 會把它包裹成一個 Task 。當協(xié)程開始執(zhí)行或被喚醒時,Task 的 _step 方法會被調用, 這里 它會調用 coro.send(None) 來執(zhí)行/喚醒它包裹著的協(xié)程。

 

  1. if exc is None: 
  2.  # We use the `send` method directly, because coroutines 
  3.  # don't have `__iter__` and `__next__` methods. 
  4.  result = coro.send(None) 
  5. else
  6.  result = coro.throw(exc) 

注意到這里將 coro.send 的結果賦值給了 result ,那么它會返回什么呢?在我們這個例子中,協(xié)程鏈的最末尾是 asyncio.sleep ,我們看看 它的實現 :

 

  1. @coroutine 
  2. def sleep(delay, result=None, *, loop=None): 
  3.  """Coroutine that completes after a given time (in seconds).""" 
  4.  if delay == 0: 
  5.  yield 
  6.  return result 
  7.  
  8.  if loop is None: 
  9.  loop = events.get_event_loop() 
  10.  future = loop.create_future() 
  11.  h = future._loop.call_later(delay, 
  12.  futures._set_result_unless_cancelled, 
  13.  future, result) 
  14.  try: 
  15.  return (yield from future) 
  16.  finally: 
  17.  h.cancel() 

這里它創(chuàng)建了一個 future 并為它注冊了事件( call_later ),最終調用了 yield from future 返回。它代表什么呢?我們已經假設你明白 yield from 的使用方法,這代表 Python 會首先調用 future.__iter__ 函數,我們來看看 它長什么樣 :

 

  1. def __iter__(self): 
  2.  if not self.done(): 
  3.  self._asyncio_future_blocking = True 
  4.  yield self # This tells Task to wait for completion. 
  5.  assert self.done(), "yield from wasn't used with future" 
  6.  return self.result() # May raise too. 
  7.  
  8. if compat.PY35: 
  9.  __await__ = __iter__ # make compatible with 'await' expression 

注意這里的 yield self !也就是說 future 在***次執(zhí)行到這里時,會暫停執(zhí)行并返回它自己,由于 coroutine 中使用的都是 yield from/await (它們在接收的參數上有區(qū)別,但在本文的討論中沒有區(qū)別),因此這個值會一直向上傳遞,到 Task._step 函數的 result = coro.send(None) 這里,那我們來看看 Task 對 result 做了什么,重要的是 這一句 :

  1. result.add_done_callback(self._wakeup) 

也就是說 task( print_sum ) 得到了最內層暫停的 sleep 生成的 future 并為該 future 注冊了一個回調,使得在 future.set_result 被調用時, task._wakeup 會被調用。這部分的邏輯可以看 這里 。

我們再回過頭來看看 future.set_result 會在什么時候被調用,在 asyncio.sleep 函數里,我們?yōu)?event loop 注冊了一個回調函數:

 

  1. h = future._loop.call_later(delay, 
  2.  futures._set_result_unless_cancelled, 
  3.  future, result) 

那么這個 _set_result_unless_cancelled 是這樣的:

 

  1. def _set_result_unless_cancelled(fut, result): 
  2.  """Helper setting the result only if the future was not cancelled.""" 
  3.  if fut.cancelled(): 
  4.  return 
  5.  fut.set_result(result) 

因此,所有的流程應該是這樣的:

理解Python asyncio內部實現機制

小結

那么 asyncio 做為一個庫,做了什么,沒做什么?

  1. 控制流的暫停與恢復,這是通過 Python 內部的 Generator(生成器)相關的功能實現的。
  2. 協(xié)程鏈,即把不同協(xié)程鏈鏈接在一起的機制。依舊是通過 Python 的內置支持,即 async/await,或者說是生成器的 yield from。
  3. Event Loop,這個是 asyncio 實現的。它決定了我們能對什么事件進行異步操作,目前只支持定時器與網絡 IO 的異步。
  4. 協(xié)程鏈的控制流恢復,即內部的協(xié)程暫停了,恢復時卻需要從最外層的協(xié)程開始恢復。這是 asyncio 實現的內容。
  5. 其它的庫支持,這里指的是像 asyncio.sleep() 這種協(xié)程鏈的最內層的協(xié)程,因此我們一般不希望自己去調用 event loop 注冊/注銷事件。

因此,如果沒有 asyncio,我們要實現相應的功能,主要的內容就是 Event Loop 及控制流的恢復,***再加上一些好用的協(xié)程函數。

責任編輯:未麗燕 來源: 三點水
相關推薦

2010-09-26 16:14:22

JVM實現機制JVM

2020-10-14 09:11:44

IO 多路復用實現機

2017-09-05 10:20:30

PyTorchTensorPython

2023-12-14 10:35:22

虛擬機程序

2017-02-14 13:08:45

2014-03-31 10:51:40

pythonasyncio

2017-08-02 15:00:12

PythonAsyncio異步編程

2020-02-21 08:00:00

Pythonasyncio編程語言

2017-05-05 08:44:24

PythonAsyncio異步編程

2014-06-13 11:08:52

Redis主鍵失效

2014-06-17 10:27:39

Redis緩存

2013-08-28 10:11:37

RedisRedis主鍵失效NoSQL

2022-06-27 11:04:24

RocketMQ順序消息

2022-07-18 21:53:46

RocketMQ廣播消息

2025-04-07 11:10:00

Python列表開發(fā)

2017-05-22 15:42:39

Python字典哈希表

2017-05-24 15:50:08

PythonCPython

2023-06-07 15:25:19

Kafka版本日志

2010-06-01 16:43:07

Cassandra內部

2012-06-02 00:55:44

HibernateflushJava
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 一区二区三区四区电影视频在线观看 | 日本欧美国产在线观看 | 久久狠狠 | 日韩欧美国产精品 | 农村妇女毛片精品久久久 | 亚洲一级二级三级 | 日韩福利在线 | 亚洲午夜电影 | 美女亚洲一区 | 一区二区亚洲 | 日本一区二区高清不卡 | 亚洲国产一区二区三区 | 一区日韩| 91av免费看 | 不卡一二区 | 精品久久久久久久久久久久 | www.黄色在线观看 | 久久亚洲国产 | 欧美a∨| 免费看色| 精品在线一区 | 国产精品久久久久一区二区三区 | 成人精品久久日伦片大全免费 | 亚洲毛片| 鸳鸯谱在线观看高清 | av在线播放不卡 | 欧美黄色一级毛片 | 免费黄色片在线观看 | 91久久精品一区二区二区 | 欧美一区二区在线观看 | 亚洲国产精品久久久久婷婷老年 | 国产免费一区二区三区 | 亚洲社区在线 | 国产欧美日韩综合精品一区二区 | 久久看精品 | 欧美综合一区二区 | 色婷婷亚洲 | 欧美久久一区 | 天天操天天怕 | 日韩在线免费 | 国产欧美视频一区二区三区 |