代碼詳解Python多線程、多進程、協程
一、前言
很多時候我們寫了一個爬蟲,實現了需求后會發現了很多值得改進的地方,其中很重要的一點就是爬取速度。本文就通過代碼講解如何使用多進程、多線程、協程來提升爬取速度。注意:我們不深入介紹理論和原理,一切都在代碼中。
二、同步
首先我們寫一個簡化的爬蟲,對各個功能細分,有意識進行函數式編程。下面代碼的目的是訪問300次百度頁面并返回狀態碼,其中parse_1函數可以設定循環次數,每次循環將當前循環數(從0開始)和url傳入parse_2函數。
import requests def parse_1(): url = 'https://www.baidu.com' for i in range(300): parse_2(url) def parse_2(url): response = requests.get(url) print(response.status_code) if __name__ == '__main__': parse_1()
性能的消耗主要在IO請求中,當單進程單線程模式下請求URL時必然會引起等待
示例代碼就是典型的串行邏輯,parse_1將url和循環數傳遞給parse_2,parse_2請求并返回狀態碼后parse_1繼續迭代一次,重復之前步驟
三、多線程
因為CPU在執行程序時每個時間刻度上只會存在一個線程,因此多線程實際上提高了進程的使用率從而提高了CPU的使用率
實現多線程的庫有很多,這里用concurrent.futures中的ThreadPoolExecutor來演示。介紹ThreadPoolExecutor庫是因為它相比其他庫代碼更簡潔
為了方便說明問題,下面代碼中如果是新增加的部分,代碼行前會加上 > 符號便于觀察說明問題,實際運行需要去掉
import requests > from concurrent.futures import ThreadPoolExecutor def parse_1(): url = 'https://www.baidu.com' # 建立線程池 > pool = ThreadPoolExecutor(6) for i in range(300): > pool.submit(parse_2, url) > pool.shutdown(wait=True) def parse_2(url): response = requests.get(url) print(response.status_code) if __name__ == '__main__': parse_1()
跟同步相對的就是異步。異步就是彼此獨立,在等待某事件的過程中繼續做自己的事,不需要等待這一事件完成后再工作。線程就是實現異步的一個方式,也就是說多線程是異步處理異步就意味著不知道處理結果,有時候我們需要了解處理結果,就可以采用回調
import requests from concurrent.futures import ThreadPoolExecutor # 增加回調函數 > def callback(future): > print(future.result()) def parse_1(): url = 'https://www.baidu.com' pool = ThreadPoolExecutor(6) for i in range(300): > results = pool.submit(parse_2, url) # 回調的關鍵步驟 > results.add_done_callback(callback) pool.shutdown(wait=True) def parse_2(url): response = requests.get(url) print(response.status_code) if __name__ == '__main__': parse_1()
Python實現多線程有一個無數人詬病的GIL(全局解釋器鎖),但多線程對于爬取網頁這種多數屬于IO密集型的任務依舊很合適。
四、多進程
多進程用兩個方法實現:ProcessPoolExecutor和multiprocessing
1. ProcessPoolExecutor
和實現多線程的ThreadPoolExecutor類似
import requests > from concurrent.futures import ProcessPoolExecutor def parse_1(): url = 'https://www.baidu.com' # 建立線程池 > pool = ProcessPoolExecutor(6) for i in range(300): > pool.submit(parse_2, url) > pool.shutdown(wait=True) def parse_2(url): response = requests.get(url) print(response.status_code) if __name__ == '__main__': parse_1()
可以看到改動了兩次類名,代碼依舊很簡潔,同理也可以添加回調函數
import requests from concurrent.futures import ProcessPoolExecutor > def callback(future): > print(future.result()) def parse_1(): url = 'https://www.baidu.com' pool = ProcessPoolExecutor(6) for i in range(300): > results = pool.submit(parse_2, url) > results.add_done_callback(callback) pool.shutdown(wait=True) def parse_2(url): response = requests.get(url) print(response.status_code) if __name__ == '__main__': parse_1()
2. multiprocessing
直接看代碼,一切都在注釋中。
import requests > from multiprocessing import Pool def parse_1(): url = 'https://www.baidu.com' # 建池 > pool = Pool(processes=5) # 存放結果 > res_lst = [] for i in range(300): # 把任務加入池中 > res = pool.apply_async(func=parse_2, args=(url,)) # 獲取完成的結果(需要取出) > res_lst.append(res) # 存放最終結果(也可以直接存儲或者print) > good_res_lst = [] > for res in res_lst: # 利用get獲取處理后的結果 > good_res = res.get() # 判斷結果的好壞 > if good_res: > good_res_lst.append(good_res) # 關閉和等待完成 > pool.close() > pool.join() def parse_2(url): response = requests.get(url) print(response.status_code) if __name__ == '__main__': parse_1()
可以看到multiprocessing庫的代碼稍繁瑣,但支持更多的拓展。多進程和多線程確實能夠達到加速的目的,但如果遇到IO阻塞會出現線程或者進程的浪費,因此有一個更好的方法……
五、異步非阻塞
協程+回調配合動態協作就可以達到異步非阻塞的目的,本質只用了一個線程,所以很大程度利用了資源
實現異步非阻塞經典是利用asyncio庫+yield,為了方便利用逐漸出現了更上層的封裝 aiohttp,要想更好的理解異步非阻塞最好還是深入了解asyncio庫。而gevent是一個非常方便實現協程的庫
import requests > from gevent import monkey # 猴子補丁是協作運行的靈魂 > monkey.patch_all() > import gevent def parse_1(): url = 'https://www.baidu.com' # 建立任務列表 > tasks_list = [] for i in range(300): > task = gevent.spawn(parse_2, url) > tasks_list.append(task) > gevent.joinall(tasks_list) def parse_2(url): response = requests.get(url) print(response.status_code) if __name__ == '__main__': parse_1()
gevent能很大提速,也引入了新的問題:如果我們不想速度太快給服務器造成太大負擔怎么辦?如果是多進程多線程的建池方法,可以控制池內數量。如果用gevent想要控制速度也有一個不錯的方法:建立隊列。gevent中也提供了Quene類,下面代碼改動較大
import requests from gevent import monkey monkey.patch_all() import gevent > from gevent.queue import Queue def parse_1(): url = 'https://www.baidu.com' tasks_list = [] # 實例化隊列 > quene = Queue() for i in range(300): # 全部url壓入隊列 > quene.put_nowait(url) # 兩路隊列 > for _ in range(2): > task = gevent.spawn(parse_2) > tasks_list.append(task) gevent.joinall(tasks_list) # 不需要傳入參數,都在隊列中 > def parse_2(): # 循環判斷隊列是否為空 > while not quene.empty(): # 彈出隊列 > url = quene.get_nowait() response = requests.get(url) # 判斷隊列狀態 > print(quene.qsize(), response.status_code) if __name__ == '__main__': parse_1()
結束語
以上就是幾種常用的加速方法。如果對代碼測試感興趣可以利用time模塊判斷運行時間。爬蟲的加速是重要技能,但適當控制速度也是爬蟲工作者的良好習慣,不要給服務器太大壓力,拜拜~