使用Python和Asyncio編寫在線多人游戲(二)
在 Python 中用過異步編程嗎?本文中我會告訴你怎樣做,而且用一個能工作的例子來展示它:這是一個流行的貪吃蛇游戲,而且是為多人游戲而設計的。
介紹和理論部分參見“第一部分 異步化”。

3、編寫游戲循環主體
游戲循環是每一個游戲的核心。它持續地運行以讀取玩家的輸入、更新游戲的狀態,并且在屏幕上渲染游戲結果。在在線游戲中,游戲循環分為客戶端和服務端兩部分,所以一般有兩個循環通過網絡通信。通常客戶端的角色是獲取玩家輸入,比如按鍵或者鼠標移動,將數據傳輸給服務端,然后接收需要渲染的數據。服務端處理來自玩家的所有數據,更新游戲的狀態,執行渲染下一幀的必要計算,然后將結果傳回客戶端,例如游戲中對象的新位置。如果沒有可靠的理由,不混淆客戶端和服務端的角色是一件很重要的事。如果你在客戶端執行游戲邏輯的計算,很容易就會和其它客戶端失去同步,其實你的游戲也可以通過簡單地傳遞客戶端的數據來創建。
游戲循環的一次迭代稱為一個嘀嗒(tick)。嘀嗒是一個事件,表示當前游戲循環的迭代已經結束,下一幀(或者多幀)的數據已經就緒。
在后面的例子中,我們使用相同的客戶端,它使用 WebSocket 從一個網頁上連接到服務端。它執行一個簡單的循環,將按鍵碼發送給服務端,并顯示來自服務端的所有信息。客戶端代碼戳這里。
例子 3.1:基本游戲循環
我們使用 aiohttp 庫來創建游戲服務器。它可以通過 asyncio 創建網頁服務器和客戶端。這個庫的一個優勢是它同時支持普通 http 請求和 websocket。所以我們不用其他網頁服務器來渲染游戲的 html 頁面。
下面是啟動服務器的方法:
- app = web.Application()
- app["sockets"] = []
- asyncio.ensure_future(game_loop(app))
- app.router.add_route('GET', '/connect', wshandler)
- app.router.add_route('GET', '/', handle)
- web.run_app(app)
web.run_app 是創建服務主任務的快捷方法,通過它的 run_forever() 方法來執行 asyncio 事件循環。建議你查看這個方法的源碼,弄清楚服務器到底是如何創建和結束的。
app 變量就是一個類似于字典的對象,它用于在所連接的客戶端之間共享數據。我們使用它來存儲連接的套接字的列表。隨后會用這個列表來給所有連接的客戶端發送消息。asyncio.ensure_future() 調用會啟動主游戲循環的任務,每隔2 秒向客戶端發送嘀嗒消息。這個任務會在同樣的 asyncio 事件循環中和網頁服務器并行執行。
有兩個網頁請求處理器:handle 是提供 html 頁面的處理器;wshandler 是主要的 websocket 服務器任務,處理和客戶端之間的交互。在事件循環中,每一個連接的客戶端都會創建一個新的 wshandler 任務。這個任務會添加客戶端的套接字到列表中,以便 game_loop 任務可以給所有的客戶端發送消息。然后它將隨同消息回顯客戶端的每個擊鍵。
在啟動的任務中,我們在 asyncio 的主事件循環中啟動 worker 循環。任務之間的切換發生在它們之間任何一個使用 await語句來等待某個協程結束時。例如 asyncio.sleep 僅僅是將程序執行權交給調度器一段指定的時間;ws.receive 等待 websocket 的消息,此時調度器可能切換到其它任務。
在瀏覽器中打開主頁,連接上服務器后,試試隨便按下鍵。它們的鍵值會從服務端返回,每隔 2 秒這個數字會被游戲循環中發給所有客戶端的嘀嗒消息所覆蓋。
我們剛剛創建了一個處理客戶端按鍵的服務器,主游戲循環在后臺做一些處理,周期性地同時更新所有的客戶端。
例子 3.2: 根據請求啟動游戲
在前一個例子中,在服務器的生命周期內,游戲循環一直運行著。但是現實中,如果沒有一個人連接服務器,空運行游戲循環通常是不合理的。而且,同一個服務器上可能有不同的“游戲房間”。在這種假設下,每一個玩家“創建”一個游戲會話(比如說,多人游戲中的一個比賽或者大型多人游戲中的副本),這樣其他用戶可以加入其中。當游戲會話開始時,游戲循環才開始執行。
在這個例子中,我們使用一個全局標記來檢測游戲循環是否在執行。當第一個用戶發起連接時,啟動它。最開始,游戲循環沒有執行,標記設置為 False。游戲循環是通過客戶端的處理方法啟動的。
- if app["game_is_running"] == False:
- asyncio.ensure_future(game_loop(app))
當 game_loop() 運行時,這個標記設置為 True;當所有客戶端都斷開連接時,其又被設置為 False。
例子 3.3:管理任務
這個例子用來解釋如何和任務對象協同工作。我們把游戲循環的任務直接存儲在游戲循環的全局字典中,代替標記的使用。在像這樣的一個簡單例子中并不一定是最優的,但是有時候你可能需要控制所有已經啟動的任務。
- if app["game_loop"] is None or \
- app["game_loop"].cancelled():
- app["game_loop"] = asyncio.ensure_future(game_loop(app))
這里 ensure_future() 返回我們存放在全局字典中的任務對象,當所有用戶都斷開連接時,我們使用下面方式取消任務:
- app["game_loop"].cancel()
這個 cancel() 調用將通知調度器不要向這個協程傳遞執行權,而且將它的狀態設置為已取消:cancelled,之后可以通過 cancelled() 方法來檢查是否已取消。這里有一個值得一提的小注意點:當你持有一個任務對象的外部引用時,而這個任務執行中發生了異常,這個異常不會拋出。取而代之的是為這個任務設置一個異常狀態,可以通過 exception() 方法來檢查是否出現了異常。這種悄無聲息地失敗在調試時不是很有用。所以,你可能想用拋出所有異常來取代這種做法。你可以對所有未完成的任務顯式地調用 result() 來實現。可以通過如下的回調來實現:
- app["game_loop"].add_done_callback(lambda t: t.result())
如果我們打算在我們代碼中取消這個任務,但是又不想產生 CancelError 異常,有一個檢查 cancelled 狀態的點:
- app["game_loop"].add_done_callback(lambda t: t.result() if not t.cancelled() else None)
注意僅當你持有任務對象的引用時才需要這么做。在前一個例子,所有的異常都是沒有額外的回調,直接拋出所有異常。
例子 3.4:等待多個事件
在許多場景下,在客戶端的處理方法中你需要等待多個事件的發生。除了來自客戶端的消息,你可能需要等待不同類型事件的發生。比如,如果你的游戲時間有限制,那么你可能需要等一個來自定時器的信號。或者你需要使用管道來等待來自其它進程的消息。亦或者是使用分布式消息系統的網絡中其它服務器的信息。
為了簡單起見,這個例子是基于例子 3.1。但是這個例子中我們使用 Condition 對象來與已連接的客戶端保持游戲循環的同步。我們不保存套接字的全局列表,因為只在該處理方法中使用套接字。當游戲循環停止迭代時,我們使用 Condition.notify_all() 方法來通知所有的客戶端。這個方法允許在 asyncio 的事件循環中使用發布/訂閱的模式。
為了等待這兩個事件,首先我們使用 ensure_future() 來封裝任務中這個可等待對象。
- if not recv_task:
- recv_task = asyncio.ensure_future(ws.receive())
- if not tick_task:
- await tick.acquire()
- tick_task = asyncio.ensure_future(tick.wait())
在我們調用 Condition.wait() 之前,我們需要在它后面獲取一把鎖。這就是我們為什么先調用 tick.acquire() 的原因。在調用 tick.wait() 之后,鎖會被釋放,這樣其他的協程也可以使用它。但是當我們收到通知時,會重新獲取鎖,所以在收到通知后需要調用 tick.release() 來釋放它。
我們使用 asyncio.wait() 協程來等待兩個任務。
- done, pending = await asyncio.wait(
- [recv_task,
- tick_task],
- return_when=asyncio.FIRST_COMPLETED)
程序會阻塞,直到列表中的任意一個任務完成。然后它返回兩個列表:執行完成的任務列表和仍然在執行的任務列表。如果任務執行完成了,其對應變量賦值為 None,所以在下一個迭代時,它可能會被再次創建。
例子 3.5: 結合多個線程
在這個例子中,我們結合 asyncio 循環和線程,在一個單獨的線程中執行主游戲循環。我之前提到過,由于 GIL 的存在,Python 代碼的真正并行執行是不可能的。所以使用其它線程來執行復雜計算并不是一個好主意。然而,在使用 asyncio 時結合線程有原因的:當我們使用的其它庫不支持 asyncio 時就需要。在主線程中調用這些庫會阻塞循環的執行,所以異步使用他們的唯一方法是在不同的線程中使用他們。
我們使用 asyncio 循環的run_in_executor() 方法和 ThreadPoolExecutor 來執行游戲循環。注意 game_loop() 已經不再是一個協程了。它是一個由其它線程執行的函數。然而我們需要和主線程交互,在游戲事件到來時通知客戶端。asyncio 本身不是線程安全的,它提供了可以在其它線程中執行你的代碼的方法。普通函數有 call_soon_threadsafe(),協程有 run_coroutine_threadsafe()。我們在 notify() 協程中增加了通知客戶端游戲的嘀嗒的代碼,然后通過另外一個線程執行主事件循環。
- def game_loop(asyncio_loop):
- print("Game loop thread id {}".format(threading.get_ident()))
- async def notify():
- print("Notify thread id {}".format(threading.get_ident()))
- await tick.acquire()
- tick.notify_all()
- tick.release()
- while 1:
- task = asyncio.run_coroutine_threadsafe(notify(), asyncio_loop)
- # blocking the thread
- sleep(1)
- # make sure the task has finished
- task.result()
當你執行這個例子時,你會看到 “Notify thread id” 和 “Main thread id” 相等,因為 notify() 協程在主線程中執行。與此同時 sleep(1) 在另外一個線程中執行,因此它不會阻塞主事件循環。
例子 3.6:多進程和擴展
單線程的服務器可能運行得很好,但是它只能使用一個 CPU 核。為了將服務擴展到多核,我們需要執行多個進程,每個進程執行各自的事件循環。這樣我們需要在進程間交互信息或者共享游戲的數據。而且在一個游戲中經常需要進行復雜的計算,例如路徑查找之類。這些任務有時候在一個游戲嘀嗒中沒法快速完成。在協程中不推薦進行費時的計算,因為它會阻塞事件的處理。在這種情況下,將這個復雜任務交給其它并行執行的進程可能更合理。
最簡單的使用多個核的方法是啟動多個使用單核的服務器,就像之前的例子中一樣,每個服務器占用不同的端口。你可以使用 supervisord 或者其它進程控制的系統。這個時候你需要一個像 HAProxy 這樣的負載均衡器,使得連接的客戶端分布在多個進程間。已經有一些可以連接 asyncio 和一些流行的消息及存儲系統的適配系統。例如:
- aiomcache 用于 memcached 客戶端
- aiozmq 用于 zeroMQ
- aioredis 用于 Redis 存儲,支持發布/訂閱
你可以在 github 或者 pypi 上找到其它的軟件包,大部分以 aio 開頭。
使用網絡服務在存儲持久狀態和交換某些信息時可能比較有效。但是如果你需要進行進程間通信的實時處理,它的性能可能不足。此時,使用標準的 unix 管道可能更合適。asyncio 支持管道,在aiohttp倉庫有個 使用管道的服務器的非常底層的例子。
在當前的例子中,我們使用 Python 的高層類庫 multiprocessing 來在不同的核上啟動復雜的計算,使用 multiprocessing.Queue 來進行進程間的消息交互。不幸的是,當前的 multiprocessing 實現與 asyncio 不兼容。所以每一個阻塞方法的調用都會阻塞事件循環。但是此時線程正好可以起到幫助作用,因為如果在不同線程里面執行 multiprocessing 的代碼,它就不會阻塞主線程。所有我們需要做的就是把所有進程間的通信放到另外一個線程中去。這個例子會解釋如何使用這個方法。和上面的多線程例子非常類似,但是我們從線程中創建的是一個新的進程。
- def game_loop(asyncio_loop):
- # coroutine to run in main thread
- async def notify():
- await tick.acquire()
- tick.notify_all()
- tick.release()
- queue = Queue()
- # function to run in a different process
- def worker():
- while 1:
- print("doing heavy calculation in process {}".format(os.getpid()))
- sleep(1)
- queue.put("calculation result")
- Process(target=worker).start()
- while 1:
- # blocks this thread but not main thread with event loop
- result = queue.get()
- print("getting {} in process {}".format(result, os.getpid()))
- task = asyncio.run_coroutine_threadsafe(notify(), asyncio_loop)
- task.result()
這里我們在另外一個進程中運行 worker() 函數。它包括一個執行復雜計算并把計算結果放到 queue 中的循環,這個 queue 是 multiprocessing.Queue 的實例。然后我們就可以在另外一個線程的主事件循環中獲取結果并通知客戶端,就和例子 3.5 一樣。這個例子已經非常簡化了,它沒有合理的結束進程。而且在真實的游戲中,我們可能需要另外一個隊列來將數據傳遞給 worker。
有一個項目叫 aioprocessing,它封裝了 multiprocessing,使得它可以和 asyncio 兼容。但是實際上它只是和上面例子使用了完全一樣的方法:從線程中創建進程。它并沒有給你帶來任何方便,除了它使用了簡單的接口隱藏了后面的這些技巧。希望在 Python 的下一個版本中,我們能有一個基于協程且支持 asyncio 的 multiprocessing 庫。
注意!如果你從主線程或者主進程中創建了一個不同的線程或者子進程來運行另外一個 asyncio 事件循環,你需要顯式地使用 asyncio.new_event_loop() 來創建循環,不然的話可能程序不會正常工作。