利用 Python 實現多任務進程
本文轉載自微信公眾號「杰哥的IT之旅」,作者阿拉斯加 。轉載本文請聯系杰哥的IT之旅公眾號。
一、進程介紹
進程:正在執行的程序,由程序、數據和進程控制塊組成,是正在執行的程序,程序的一次執行過程,是資源調度的基本單位。
程序:沒有執行的代碼,是一個靜態的。
二、線程和進程之間的對比
由圖可知:此時電腦有 9 個應用進程,但是一個進程又會對應于多個線程,可以得出結論:
進程:能夠完成多任務,一臺電腦上可以同時運行多個 QQ
線程:能夠完成多任務,一個 QQ 中的多個聊天窗口
根本區別:進程是操作系統資源分配的基本單位,而線程是任務調度和執行的基本單位.
使用多進程的優勢:
1、擁有獨立GIL:
首先由于進程中 GIL 的存在,Python 中的多線程并不能很好地發揮多核優勢,一個進程中的多個線程,在同 一時刻只能有一個線程運行。而對于多進程來說,每個進程都有屬于自己的 GIL,所以,在多核處理器下,多進程的運行是不會受 GIL的影響的。因此,多進 程能更好地發揮多核的優勢。
2、效率高
當然,對于爬蟲這種 IO 密集型任務來說,多線程和多進程影響差別并不大。對于計算密集型任務來說,Python 的多進程相比多線 程,其多核運行效率會有成倍的提升。
三、Python 實現多進程
我們先用一個實例來感受一下:
1、使用 process 類
- import multiprocessing
- def process(index):
- print(f'Process: {index}')
- if __name__ == '__main__':
- for i in range(5):
- p = multiprocessing.Process(target=process, args=(i,))
- p.start()
這是一個實現多進程最基礎的方式:通過創建 Process 來新建一個子進程,其中 target 參數傳入方法名,args 是方法的參數,是以 元組的形式傳入,其和被調用的方法 process 的參數是一一對應的。
注意:這里 args 必須要是一個元組,如果只有一個參數,那也要在元組第一個元素后面加一個逗號,如果沒有逗號則 和單個元素本身沒有區別,無法構成元組,導致參數傳遞出現問題。創建完進程之后,我們通過調用 start 方法即可啟動進程了。
運行結果如下:
- Process: 0
- Process: 1
- Process: 2
- Process: 3
- Process: 4
可以看到,我們運行了 5 個子進程,每個進程都調用了 process 方法。process 方法的 index 參數通過 Process 的 args 傳入,分別是 0~4 這 5 個序號,最后打印出來,5 個子進程運行結束。
2、繼承 process 類
- from multiprocessing import Process
- import time
- class MyProcess(Process):
- def __init__(self,loop):
- Process.__init__(self)
- self.loop = loop
- def run(self):
- for count in range(self.loop):
- time.sleep(1)
- print(f'Pid:{self.pid} LoopCount: {count}')
- if __name__ == '__main__':
- for i in range(2,5):
- p = MyProcess(i)
- p.start()
我們首先聲明了一個構造方法,這個方法接收一個 loop 參數,代表循環次數,并將其設置為全局變量。在 run方法中,又使用這 個 loop 變量循環了 loop 次并打印了當前的進程號和循環次數。
在調用時,我們用 range 方法得到了 2、3、4 三個數字,并把它們分別初始化了 MyProcess 進程,然后調用 start 方法將進程啟動起 來。
注意:這里進程的執行邏輯需要在 run 方法中實現,啟動進程需要調用 start 方法,調用之后 run 方法便會執行。
運行結果如下:
- Pid:12976 LoopCount: 0
- Pid:15012 LoopCount: 0
- Pid:11976 LoopCount: 0
- Pid:12976 LoopCount: 1
- Pid:15012 LoopCount: 1
- Pid:11976 LoopCount: 1
- Pid:15012 LoopCount: 2
- Pid:11976 LoopCount: 2
- Pid:11976 LoopCount: 3
注意,這里的進程 pid 代表進程號,不同機器、不同時刻運行結果可能不同。
四、進程之間的通信
1、Queue-隊列 先進先出
- from multiprocessing import Queue
- import multiprocessing
- def download(p): # 下載數據
- lst = [11,22,33,44]
- for item in lst:
- p.put(item)
- print('數據已經下載成功....')
- def savedata(p):
- lst = []
- while True:
- data = p.get()
- lst.append(data)
- if p.empty():
- break
- print(lst)
- def main():
- p1 = Queue()
- t1 = multiprocessing.Process(target=download,args=(p1,))
- t2 = multiprocessing.Process(target=savedata,args=(p1,))
- t1.start()
- t2.start()
- if __name__ == '__main__':
- main()
- 數據已經下載成功....
- [11, 22, 33, 44]
2、共享全局變量不適用于多進程編程
- import multiprocessing
- a = 1
- def demo1():
- global a
- a += 1
- def demo2():
- print(a)
- def main():
- t1 = multiprocessing.Process(target=demo1)
- t2 = multiprocessing.Process(target=demo2)
- t1.start()
- t2.start()
- if __name__ == '__main__':
- main()
運行結果:
- 1
有結果可知:全局變量不共享;
五、進程池之間的通信
1、進程池引入
當需要創建的子進程數量不多時,可以直接利用 multiprocessing 中的 Process 動態生成多個進程,但是如果是上百甚至上千個目標,手動的去創建的進程的工作量巨大,此時就可以用到 multiprocessing 模塊提供的 Pool 方法。
- from multiprocessing import Pool
- import os,time,random
- def worker(a):
- t_start = time.time()
- print('%s開始執行,進程號為%d'%(a,os.getpid()))
- time.sleep(random.random()*2)
- t_stop = time.time()
- print(a,"執行完成,耗時%0.2f"%(t_stop-t_start))
- if __name__ == '__main__':
- po = Pool(3) # 定義一個進程池
- for i in range(0,10):
- po.apply_async(worker,(i,)) # 向進程池中添加worker的任務
- print("--start--")
- po.close()
- po.join()
- print("--end--")
運行結果:
- --start--
- 0開始執行,進程號為6664
- 1開始執行,進程號為4772
- 2開始執行,進程號為13256
- 0 執行完成,耗時0.18
- 3開始執行,進程號為6664
- 2 執行完成,耗時0.16
- 4開始執行,進程號為13256
- 1 執行完成,耗時0.67
- 5開始執行,進程號為4772
- 4 執行完成,耗時0.87
- 6開始執行,進程號為13256
- 3 執行完成,耗時1.59
- 7開始執行,進程號為6664
- 5 執行完成,耗時1.15
- 8開始執行,進程號為4772
- 7 執行完成,耗時0.40
- 9開始執行,進程號為6664
- 6 執行完成,耗時1.80
- 8 執行完成,耗時1.49
- 9 執行完成,耗時1.36
- --end--
一個進程池只能容納 3 個進程,執行完成才能添加新的任務,在不斷的打開與釋放的過程中循環往復。
六、案例:文件批量復制
操作思路:
- 獲取要復制文件夾的名字
- 創建一個新的文件夾
- 獲取文件夾里面所有待復制的文件名
- 創建進程池
- 向進程池添加任務
代碼如下:
導包
- import multiprocessing
- import os
- import time
定制文件復制函數
- def copy_file(Q,oldfolderName,newfolderName,file_name):
- # 文件復制,不需要返回
- time.sleep(0.5)
- # print('\r從%s文件夾復制到%s文件夾的%s文件'%(oldfolderName,newfolderName,file_name),end='')
- old_file = open(oldfolderName + '/' + file_name,'rb') # 待復制文件
- content = old_file.read()
- old_file.close()
- new_file = open(newfolderName + '/' + file_name,'wb') # 復制出的新文件
- new_file.write(content)
- new_file.close()
- Q.put(file_name) # 向Q隊列中添加文件
定義主函數
- def main():
- oldfolderName = input('請輸入要復制的文件夾名字:') # 步驟1獲取要復制文件夾的名字(可以手動創建,也可以通過代碼創建,這里我們手動創建)
- newfolderName = oldfolderName + '復件'
- # 步驟二 創建一個新的文件夾
- if not os.path.exists(newfolderName):
- os.mkdir(newfolderName)
- filenames = os.listdir(oldfolderName) # 3.獲取文件夾里面所有待復制的文件名
- # print(filenames)
- pool = multiprocessing.Pool(5) # 4.創建進程池
- Q = multiprocessing.Manager().Queue() # 創建隊列,進行通信
- for file_name in filenames:
- pool.apply_async(copy_file,args=(Q,oldfolderName,newfolderName,file_name)) # 5.向進程池添加任務
- po.close()
- copy_file_num = 0
- file_count = len(filenames)
- # 不知道什么時候完成,所以定義一個死循環
- while True:
- file_name = Q.get()
- copy_file_num += 1
- time.sleep(0.2)
- print('\r拷貝進度%.2f %%'%(copy_file_num * 100/file_count),end='') # 做一個拷貝進度條
- if copy_file_num >= file_count:
- break
程序運行
- if __name__ == '__main__':
- main()
運行結果如下圖所示:
運行前后文件目錄結構對比
運行前
運行后
以上內容就是整體大致結果了,由于 test 里面是隨便粘貼的測試文件,這里就不展開演示了。