Python 異步協程:從 async/await 到 asyncio 再到 async with
在 Python 3.8 以后的版本中,異步編程變得越來越重要。本文將系統介紹 Python 標準庫中的異步編程工具,帶領大家掌握 async/await 語法和 asyncio 的使用。
從一個簡單的場景開始
假設我們在處理一些耗時的 I/O 操作,比如讀取多個文件或處理多個數據。為了模擬這種場景,我們先用 time.sleep() 來代表耗時操作:
import time
import random
def process_item(item):
# 模擬耗時操作
print(f"處理中:{item}")
process_time = random.uniform(0.5, 2.0)
time.sleep(process_time)
return f"處理完成:{item},耗時 {process_time:.2f} 秒"
def process_all_items():
items = ["任務A", "任務B", "任務C", "任務D"]
results = []
for item in items:
result = process_item(item)
results.append(result)
return results
if __name__ == "__main__":
start = time.time()
results = process_all_items()
end = time.time()
print("\n".join(results))
print(f"總耗時:{end - start:.2f} 秒")
處理中:任務A
處理中:任務B
處理中:任務C
處理中:任務D
處理完成:任務A,耗時 1.97 秒
處理完成:任務B,耗時 1.28 秒
處理完成:任務C,耗時 0.66 秒
處理完成:任務D,耗時 1.80 秒
總耗時:5.72 秒
這段代碼的問題很明顯:每個任務都必須等待前一個任務完成才能開始。如果有4個任務,每個任務平均耗時1秒,那么總耗時就接近4秒。
認識 async/await
Python 引入了 async/await 語法來支持異步編程。當我們在函數定義前加上 async 關鍵字時,這個函數就變成了一個"協程"(coroutine)。而 await 關鍵字則用于等待一個協程完成。讓我們改寫上面的代碼:
import asyncio
import random
import time
async def process_item(item):
print(f"處理中:{item}")
# async 定義的函數變成了協程
process_time = random.uniform(0.5, 2.0)
# time.sleep() 換成 asyncio.sleep()
await asyncio.sleep(process_time) # await 等待異步操作完成
return f"處理完成:{item},耗時 {process_time:.2f} 秒"
async def process_all_items():
items = ["任務A", "任務B", "任務C", "任務D"]
# 創建任務列表
tasks = [
asyncio.create_task(process_item(item))
for item in items
]
print("開始處理")
results = await asyncio.gather(*tasks)
return results
async def main():
start = time.time()
results = await process_all_items()
end = time.time()
print("\n".join(results))
print(f"總耗時:{end - start:.2f} 秒")
if __name__ == "__main__":
asyncio.run(main())
開始處理
處理中:任務A
處理中:任務B
處理中:任務C
處理中:任務D
處理完成:任務A,耗時 1.97 秒
處理完成:任務B,耗時 0.80 秒
處理完成:任務C,耗時 0.83 秒
處理完成:任務D,耗時 1.46 秒
總耗時:1.97 秒
讓我們詳細解釋這段代碼的執行過程:
- 當函數被 async 關鍵字修飾后,調用該函數不會直接執行函數體,而是返回一個協程對象
- await 關鍵字只能在 async 函數內使用,它表示"等待這個操作完成后再繼續"
- asyncio.create_task() 將協程包裝成一個任務,該任務會被事件循環調度執行
- asyncio.gather() 并發運行多個任務,并等待它們全部完成
- asyncio.run() 創建事件循環,運行 main() 協程,直到它完成
使用 asyncio.wait_for 添加超時控制
在實際應用中,我們往往需要為異步操作設置超時時間:
import asyncio
import random
import time
async def process_item(item):
process_time = random.uniform(0.5, 2.0)
try:
# 設置1秒超時
await asyncio.wait_for(
asyncio.sleep(process_time),
timeout=1.0
)
return f"處理完成:{item},耗時 {process_time:.2f} 秒"
except asyncio.TimeoutError:
return f"處理超時:{item}"
async def main():
items = ["任務A", "任務B", "任務C", "任務D"]
tasks = [
asyncio.create_task(process_item(item))
for item in items
]
start = time.time()
results = await asyncio.gather(*tasks, return_exceptions=True)
end = time.time()
print("\n".join(results))
print(f"總耗時:{end - start:.2f} 秒")
if __name__ == "__main__":
asyncio.run(main())
處理超時:任務A
處理完成:任務B,耗時 0.94 秒
處理超時:任務C
處理完成:任務D,耗時 0.78 秒
總耗時:1.00 秒
使用異步上下文管理器
Python 中的 with 語句可以用于資源管理,類似地,異步編程中我們可以使用 async with 。一個類要支持異步上下文管理,需要實現 __aenter__ 和 __aexit__ 方法:
import asyncio
import random
class AsyncResource:
async def __aenter__(self):
# 異步初始化資源
print("正在初始化資源...")
await asyncio.sleep(0.1)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
# 異步清理資源
print("正在清理資源...")
await asyncio.sleep(0.1)
async def process(self, item):
# 異步處理任務
print(f"正在處理任務:{item}")
process_time = random.uniform(0.5, 2.0)
await asyncio.sleep(process_time)
return f"處理完成:{item},耗時 {process_time:.2f} 秒"
async def main():
items = ["任務A", "任務B", "任務C"]
async with AsyncResource() as resource:
tasks = [
asyncio.create_task(resource.process(item))
for item in items
]
results = await asyncio.gather(*tasks)
print("\n".join(results))
if __name__ == "__main__":
asyncio.run(main())
正在初始化資源...
正在處理任務:任務A
正在處理任務:任務B
正在處理任務:任務C
正在清理資源...
處理完成:任務A,耗時 1.31 秒
處理完成:任務B,耗時 0.77 秒
處理完成:任務C,耗時 0.84 秒
使用事件循環執行阻塞操作 run_in_executor
在異步編程中,我們可能會遇到一些無法避免的阻塞操作(比如調用傳統的同步API)。這時,asyncio.get_running_loop() 和 run_in_executor 就顯得特別重要:
import asyncio
import time
import requests # 一個同步的HTTP客戶端庫
async def blocking_operation():
# 獲取當前事件循環
loop = asyncio.get_running_loop()
# 在線程池中執行阻塞操作
result = await loop.run_in_executor(
None, # 使用默認的線程池執行器
requests.get, # 要執行的阻塞函數
'http://httpbin.org/delay/1' # 函數參數
)
return result.status_code
async def non_blocking_operation():
await asyncio.sleep(1)
return "非阻塞操作完成"
async def main():
# 同時執行阻塞和非阻塞操作
tasks = [
asyncio.create_task(blocking_operation()),
asyncio.create_task(non_blocking_operation())
]
start = time.time()
results = await asyncio.gather(*tasks)
end = time.time()
print(f"操作結果:{results}")
print(f"總耗時:{end - start:.2f} 秒")
if __name__ == "__main__":
asyncio.run(main())
輸出:
操作結果:[200, '非阻塞操作完成']
總耗時:1.99 秒
這個例子展示了如何在異步程序中優雅地處理同步操作。如果不使用 run_in_executor,阻塞操作會阻塞整個事件循環,導致其他任務無法執行:
- requests.get() 是同步操作,會阻塞當前線程
- 事件循環運行在主線程上
- 如果直接在協程中調用 requests.get() ,整個事件循環都會被阻塞
- 其他任務無法在這期間執行
- run_in_executor 會將阻塞操作放到另一個線程中執行
- 主線程的事件循環可以繼續處理其他任務
- 當線程池中的操作完成時,結果會被返回給事件循環
最佳實踐是:
- 盡量使用原生支持異步的庫(如 aiohttp)
- 如果必須使用同步庫,就用 run_in_executor
- 對于 CPU 密集型任務也可以用 run_in_executor 放到進程池中執行
任務取消:優雅地終止異步操作
有時我們需要取消正在執行的異步任務,比如用戶中斷操作或超時處理:
import asyncio
import random
async def long_operation(name):
try:
print(f"{name} 開始執行")
while True: # 模擬一個持續運行的操作
await asyncio.sleep(0.5)
print(f"{name} 正在執行...")
except asyncio.CancelledError:
print(f"{name} 被取消了")
raise # 重要:繼續傳播取消信號
async def main():
# 創建三個任務
task1 = asyncio.create_task(long_operation("任務1"))
task2 = asyncio.create_task(long_operation("任務2"))
task3 = asyncio.create_task(long_operation("任務3"))
# 等待1秒后取消task1
await asyncio.sleep(1)
task1.cancel()
# 等待2秒后取消其余任務
await asyncio.sleep(1)
task2.cancel()
task3.cancel()
try:
# 等待所有任務完成或被取消
await asyncio.gather(task1, task2, task3, return_exceptions=True)
except asyncio.CancelledError:
print("某個任務被取消了")
if __name__ == "__main__":
asyncio.run(main())
輸出:
任務1 開始執行
任務2 開始執行
任務3 開始執行
任務1 正在執行...
任務2 正在執行...
任務3 正在執行...
任務1 被取消了
任務2 正在執行...
任務3 正在執行...
任務2 正在執行...
任務3 正在執行...
任務2 被取消了
任務3 被取消了
這個例子展示了如何正確處理任務取消:
- 任務可以在執行過程中被取消
- 被取消的任務會拋出 CancelledError
- 我們應該適當處理取消信號,確保資源被正確清理
深入理解協程:為什么需要 async/await?
協程(Coroutine)是一種特殊的函數,它可以在執行過程中暫停,并在之后從暫停的地方繼續執行。當我們使用 async 定義一個函數時,我們實際上是在定義一個協程:
import asyncio
# 這是一個普通函數
def normal_function():
return "Hello"
# 這是一個協程
async def coroutine_function():
await asyncio.sleep(1)
return "Hello"
# 讓我們看看它們的區別
print(normal_function) # <function normal_function at 0x1052cc040>
print(coroutine_function) # <function coroutine_function at 0x1054b9790>
# 調用它們的結果不同
print(normal_function()) # 直接返回: "Hello"
print(coroutine_function()) # RuntimeWarning: coroutine 'coroutine_function' was never awaited
# <coroutine object coroutine_function at 0x105962e40>
await 如何與事件循環協作
協程(Coroutine)的核心在于它可以在執行過程中主動交出控制權,讓其他代碼有機會執行。讓我們通過一個詳細的例子來理解這個過程:
import asyncio
async def task1():
print("任務1:開始")
print("任務1:準備休眠")
await asyncio.sleep(2) # 關鍵點1:交出控制權
print("任務1:休眠結束")
async def task2():
print("任務2:開始")
print("任務2:準備休眠")
await asyncio.sleep(1) # 關鍵點2:交出控制權
print("任務2:休眠結束")
async def main():
# 同時執行兩個任務
await asyncio.gather(task1(), task2())
asyncio.run(main())
這段代碼的輸出會是:
任務1:開始
任務1:準備休眠
任務2:開始
任務2:準備休眠
任務2:休眠結束 # 1秒后
任務1:休眠結束 # 2秒后
讓我們詳細解釋執行過程:
- 當程序遇到 await asyncio.sleep(2) 時:
這個 sleep 操作被注冊到事件循環中
Python 記錄當前的執行位置
task1 主動交出控制權
重要:task1 并沒有停止運行,而是被暫停了,等待之后恢復
- 事件循環接管控制權后:
尋找其他可以執行的協程(這里是 task2)
開始執行 task2,直到遇到 await asyncio.sleep(1)
task2 也交出控制權,被暫停
- 事件循環繼續工作:
管理一個計時器,追蹤這兩個 sleep 操作
1秒后,發現 task2 的 sleep 時間到了
恢復 task2 的執行,打印"任務2:休眠結束"
2秒到時,恢復 task1 的執行,打印"任務1:休眠結束"
這就像是一個指揮家(事件循環)在指揮一個管弦樂隊(多個協程):
- 當某個樂器(協程)需要休息時,它舉手示意(await)
- 指揮家看到后,立即指揮其他樂器演奏
- 當休息時間到了,指揮家會示意這個樂器繼續演奏
代碼驗證:
import asyncio
import time
async def report_time(name, sleep_time):
print(f"{time.strftime('%H:%M:%S')} - {name}開始")
await asyncio.sleep(sleep_time)
print(f"{time.strftime('%H:%M:%S')} - {name}結束")
async def main():
# 同時執行多個任務
await asyncio.gather(
report_time("任務A", 2),
report_time("任務B", 1),
report_time("任務C", 3)
)
asyncio.run(main())
輸出:
00:19:26 - 任務A開始
00:19:26 - 任務B開始
00:19:26 - 任務C開始
00:19:27 - 任務B結束
00:19:28 - 任務A結束
00:19:29 - 任務C結束
這種機制的優勢在于:
- 單線程執行,沒有線程切換開銷
- 協程主動交出控制權,而不是被操作系統強制切換
- 比起回調地獄,代碼更清晰易讀
- 錯誤處理更直觀,可以使用普通的 try/except
理解了這個機制,我們就能更好地使用異步編程:
- 在 await 的時候,其他協程有機會執行
- 耗時操作應該是真正的異步操作(比如 asyncio.sleep )
- 不要在協程中使用阻塞操作,那樣會卡住整個事件循環
小結
Python 的異步編程主要依賴以下概念:
- async/await 語法:定義和等待協程
- asyncio 模塊:提供事件循環和任務調度
- Task 對象:表示待執行的工作單元
- 異步上下文管理器:管理異步資源
使用異步編程的關鍵點:
- I/O 密集型任務最適合使用異步編程
- 所有耗時操作都應該是真正的異步操作
- 注意處理超時和異常情況
- 合理使用 asyncio.gather() 和 asyncio.wait_for()
異步編程不是萬能的,但在處理 I/O 密集型任務時確實能帶來顯著的性能提升。合理使用這些工具,能讓我們的程序更高效、更優雅。