Python 升級之路( Lv29 ) 并發編程三劍客(上)
大家好,我是了不起。
今天我們將學習并發編程三劍客之一的進程, 了解其創建方式, 通信方式以及管理方式。
今日冒險片段上:
走出暗影迷宮后, 二人便來到了暗精靈圣地熔巖穴, 熔巖穴與火焰圣地卡崖克火山的地脈相連, 據說很久前,那里曾出現過一種怪異的炙熱熔巖,它不僅融化了各種生命體,還使融化后的巖漿變成了有生命的怪物. 現在地底壓力噴出的熔巖,使那里的盜尸者和僵尸出現了奇怪的異變.??而了不起他們也在小心翼翼的躲避著這些怪物, 并同時注意腳下隨時可能噴薄而出的火山熔巖. 但是在即將通過時, 還是被這片區域的領主所阻攔. 而且領主不只有一個, 而是有三個, 分別是:泰坦, 歌利亞, 阿特拉斯. 這三個是熔巖生命體, 不僅身形巨大, 而且身體外表溫度很高. 這讓二人犯了難...
一、進程
書寫上文, 我們可以知道: 進程是資源(CPU、內存等)分配的基本單位,它是程序執行時的一個實例. 程序運行時系統就會創建一個進程,并為它分配資源,然后把該進程放入進程就緒隊列,進程調度器選中它的時候就會為它分配CPU時間,程序開始真正運行. 進程切換需要的資源很最大,效率低。
二、創建方式
進程創建有兩種方式:方法包裝和類包裝。
熟悉Java的人可能會發現, 類包裝更符合我們原來的書寫習慣創建進程后, 我們使用start() 來啟動進程。
1. 類包裝
(1) 主要步驟:
- 定義一個進程類, 并修改初始化構造, 改為有參構造
- 創建進程時, 傳入初始化方法中添加的參數即可
(2) 實操代碼
import time
from multiprocessing import Process
class MyProcess(Process):
"""進程的創建方式: 2.類包裝"""
def __init__(self, name):
Process.__init__(self)
self.name = name
def run(self):
print(f"進程{self.name} 啟動")
time.sleep(3)
print(f"進程{self.name} 結束")
if __name__ == "__main__":
print("創建進程")
p1 = MyProcess("p1")
p2 = MyProcess("p2")
p1.start()
p2.start()
(3) 執行結果
2. 方法包裝
(1) 主要步驟
在創建進程時: 已默認值參數的方式聲明目標函數, 以及傳入目標函數的參數(元組的方式)
(2) 實操代碼
import os
import time
from multiprocessing import Process
def function(name):
"""進程的創建方式: 1.方法包裝"""
print("當前進程ID:", os.getpid())
print("父進程ID", os.getppid())
print(f"Process:{name} start")
time.sleep(3)
print(f"Process:{name} end")
if __name__ == "__main__":
print("當前main進程ID: ", os.getppid())
# 創建進程
p1 = Process(target=function, args=("p1",))
p2 = Process(target=function, args=("p2",))
p1.start()
p2.start()
(3) 執行結果
(4) 多說一句
元組中如果只有一個元素, 是需要加逗號的!!! 這是因為括號( )既可以表示tuple,又可以表示數學公式中的小括號, 所以如果沒有加逗號,那你里面放什么類型的數據那么類型就會是什么。
三、進程間通信方式
進程間通信有兩種方式:Queue隊列和Pipe管道方式。
1. Queue隊列
要實現進程間通信,需要使用 multiprocessing 模塊中的 Queue 類。
進程間通信的方式,就是使用了操作系統給開辟的一個隊列空間,各個進程可以把數據放到該隊列中,當然也可以從隊列中把自己需要的信息取走。
實現核心:
- 這里利用類包裝的方式, 并且添加了一個參數mq
- 主函數聲明一個Queue隊列, 放入需要通信的消息
- 在需要調用時, 利用mq,get 獲取當前進程所傳入的值
實操代碼:
from multiprocessing import Process, Queue
class MyProcess(Process):
def __init__(self, name, mq):
Process.__init__(self)
self.name = name
self.mq = mq
def run(self):
print("Process {} started".format(self.name))
print("===Queue", self.mq.get(), "===")
self.mq.put(self.name)
print("Process {} end".format(self.name))
if __name__ == "__main__":
# 創建進程列表
t_list = []
mq = Queue()
mq.put("1")
mq.put("2")
mq.put("3")
# 利用range序列重復創建進程
for i in range(3):
t = MyProcess("p{}".format(i), mq)
t.start()
t_list.append(t)
# 等待進程結束
for t in t_list:
t.join()
print(mq.get())
print(mq.get())
print(mq.get())
執行結果:
2. Pipe管道
Pipe 直譯過來的意思是“管”或“管道”,和實際生活中的管(管道)是非常類似的. Pipe方法返回(conn1, conn2)代表一個管道的兩個端.
- Pipe方法有duplex參數,如果duplex參數為True(默認值),那么這個參數是全雙工模式,也就是說conn1和conn2均可收發
- 若duplex為False,conn1只負責接收消息,conn2只負責發送消息. send和recv方法分別是發送和接受消息的方法. 例如,在全雙工模式下,可以調用conn1.send發送消息,conn1.recv接收消息.
- 如果沒有消息可接收,recv方法會一直阻塞. 如果管道已經被關閉,那么recv方法會拋出EOFError
實現核心:
- 主函數聲明管道的兩端 conn1, conn2 = multiprocessing.Pipe()
- 以方法包裝方式創建進程后, 在對應方法中調用管道的兩端調用消息收發的方法 conn1.send/conn1.recv
實操代碼:
import multiprocessing
import time
def fun1(conn1):
"""
管道結構
進程<==>conn1(管道頭)==pipe==conn2(管道尾)<==>進程2
"""
sub_info = "進程向conn1發送消息, 管道另一頭conn2 可以接收到消息"
print(f"進程1--{multiprocessing.current_process().pid}發送數據:{sub_info}")
time.sleep(1)
conn1.send(sub_info) # 調用conn1.send發送消息, 發送的消息會被管道的另一頭接收
print(f"conn1接收消息:{conn1.recv()}") # conn1.recv接收消息, 如果沒有消息可接收, recv方法會一直阻塞. 如果管道已經被關閉,那么recv方法會拋出EOFError
time.sleep(1)
def fun2(conn2):
sub_info = "進程向conn2發送消息, 管道另一頭conn1 可以接收到消息"
print(f"進程2--{multiprocessing.current_process().pid}發送數據:{sub_info}")
time.sleep(1)
conn2.send(sub_info)
print(f"conn2接收消息:{conn2.recv()}")
time.sleep(1)
if __name__ == "__main__":
# 創建管道
# Pipe方法返回(conn1, conn2)代表一個管道的兩個端.如果conn1帶表頭, conn2代表尾, conn1發送的消息只會被conn2接收, 同理conn2發送的消息也只會被conn1接收
conn1, conn2 = multiprocessing.Pipe()
# 創建子進程
# Python中,圓括號意味著調用函數. 在沒有圓括號的情況下,Python會把函數當做普通對象
process1 = multiprocessing.Process(target=fun1, args=(conn1,))
process2 = multiprocessing.Process(target=fun2, args=(conn2,))
# 啟動子進程
process1.start()
process2.start()
四、Manager管理器
管理器提供了一種創建共享數據的方法,從而可以在不同進程中共享。
實現核心:
- 創建進程
- 利用Manager創建字典, 列表等對象, 傳入進程
- 在各進程所對應的方法中修改上面創建的對象
實操代碼:
from multiprocessing import Manager, Process
def func1(name,m_list,m_dict):
m_dict['area'] = '羅布泊'
m_list.append('錢三強')
def func2(name, m_list, m_dict):
m_dict['work'] = '造核彈'
m_list.append('鄧稼先')
if __name__ == "__main__":
with Manager() as mgr:
m_list = mgr.list()
m_dict = mgr.dict()
m_list.append("錢學森")
# 兩個進程不能直接互相使用對象,需要互相傳遞
p1 = Process(target=func1, args=('p1', m_list, m_dict))
p1.start()
p1.join() # 等p1進程結束,主進程繼續執行
print(m_list)
print(m_dict)
p2 = Process(target=func2, args=('p1', m_list, m_dict))
p2.start()
p2.join() # 等p2進程結束,主進程繼續執行
print(m_list)
print(m_dict)
執行結果太過機密, 不予展示。
五、進程池(Pool)
進程池可以提供指定數量的進程給用戶使用,即當有新的請求提交到進程池中時,如果池未滿,則會創建一個新的進程用來執行該請求;反之,如果池中的進程數已經達到規定最大值,那么該請求就會等待,只要池中有進程空閑下來,該請求就能得到執行 使用進程池的好處就是可以節約內存空間, 提高資源利用率。
進程池相關方法:
類/方法 | 功能 | 參數 |
Pool(processes) | 創建進程池對象 | processes表示進程池中有多少進程 |
pool.apply_async(func,args,kwds) | 異步執行;將事件放入到進程池隊列 | func 事件函數 args 以元組形式給func傳參kwds 以字典形式給func傳參返回值:返回一個代表進程池事件的對象,通過返回值的get方法可以得到事件函數的返回值 |
pool.apply(func,args,kwds) | 同步執行;將事件放入到進程池隊列 | func 事件函數 args 以元組形式給func傳參kwds 以字典形式給func傳參 |
pool.close() | 關閉進程池 | |
pool.join() | 關閉進程池 | |
pool.map(func,iter) | 類似于python的map函數,將要做的事件放入進程池 | func 要執行的函數 iter 迭代對象 |
實現核心:
- 創建和初始化進程池
- 以方法包裝的方式傳入相關參數, 并調用相關api
實操代碼:
from multiprocessing import Pool
import os
from time import sleep
def func1(name):
print(f"方法1輸出: 當前進程的ID:{os.getpid()},{name}")
sleep(2)
return name
def func2(args):
print("方法2輸出: ", args)
if __name__ == "__main__":
pool = Pool(5)
pool.apply_async(func=func1, args=('進程1',), callback=func2)
pool.apply_async(func=func1, args=('進程2',), callback=func2)
pool.apply_async(func=func1, args=('進程3',), callback=func2)
pool.apply_async(func=func1, args=('進程4',))
pool.apply_async(func=func1, args=('進程5',))
pool.apply_async(func=func1, args=('進程6',))
pool.apply_async(func=func1, args=('進程7',))
pool.close()
pool.join()
執行結果:
1. 使用with管理進程池
使用with 方法, 可以進行優雅的進行資源管理. 在這里是可以幫助我們優雅的關閉線程池。
關于with方法:
with所求值的對象必須有一個enter()方法,一個exit()方法. 緊跟with后面的語句被求值后,返回對象的__enter__()方法被調用, 這個方法的返回值將被賦值給as后面的變量。當with后面的代碼塊全部被執行完之后,將調用前面返回對象的exit()方法。
實操代碼:
from multiprocessing import Pool
import os
from time import sleep
def func1(name):
print(f"方法1輸出: 當前進程的ID:{os.getpid()},{name}")
sleep(2)
return name
if __name__ == "__main__":
with Pool(5) as pool:
args = pool.map(func1, ('進程1,', '進程2,', '進程3,', '進程4,', '進程5,', '進程6,', '進程7,', '進程8,'))
for a in args:
print(a)
今日冒險片段下:
這時, 米斯特建議我們應該先集中精力先擊敗一只, 然后在2v2時利用速度優勢將其余兩只擊敗. 了不起覺得可行, 然后利用魔劍奧義首先將武器附上冰屬性. 通過冰屬性攻擊使其行動減緩之后, 二人便使用技能將將泰坦擊敗. 隨后又花了小半天時間又將剩下的歌利亞, 阿特拉斯擊敗. 就這樣了不起也成功的升級到lv30. 同時領主也掉落了物品熔巖核, 一種含有大量火屬性元素的珠子. 這個珠子現在看起來沒什么作用, 但在將來的某一天可能會是其救命稻草...