Python線程安全之三大同步原語
使用同步原語進行通信和協調
在這些方法中,使用事件、條件和屏障對象等synchronization原語可以促進多個線程之間的通信和協調。
1:事件信號
可以使用事件對象進行信號通信,讓一個線程向一個或多個線程通知某個操作,具體操作如下,先創建一個Event事件,事件對象有一個內部標記,默認為False,可以使用.set()設置標記為True,也可以使用.clear() 將其重置為False,當其它線程調用.wait() 方法時,它會阻塞,直到事件對象的內部標志被設置為True。
舉個例子:
import threading
import time
from concurrent.futures import ThreadPoolExecutor
bank_open = threading.Event()
transactions_open = threading.Event()
def serve_customer(customer_data):
print(f"{customer_data['name']} 正在等待銀行開門。")
bank_open.wait()
print(f"{customer_data['name']} 進入了銀行")
if customer_data["type"] == "WITHDRAW_MONEY":
print(f"{customer_data['name']} 正在等待交易開始。")
transactions_open.wait()
print(f"{customer_data['name']} 開始交易。")
# 模擬執行交易的時間
time.sleep(2)
print(f"{customer_data['name']} 完成交易并離開了銀行")
else:
# 模擬其他銀行業務的時間
time.sleep(2)
print(f"{customer_data['name']} 已離開銀行")
customers = [
{"name": "客戶 1", "type": "WITHDRAW_MONEY"},
{"name": "客戶 2", "type": "CHECK_BALANCE"},
{"name": "客戶 3", "type": "WITHDRAW_MONEY"},
{"name": "客戶 4", "type": "WITHDRAW_MONEY"},
{"name": "客戶 5", "type": "WITHDRAW_MONEY"},
{"name": "客戶 6", "type": "WITHDRAW_MONEY"},
]
with ThreadPoolExecutor(max_workers=4) as executor:
for customer_data in customers:
executor.submit(serve_customer, customer_data)
print("銀行經理正在準備開門。")
time.sleep(2)
print("銀行現在開門了!")
bank_open.set() # 發出銀行開門的信號
time.sleep(3)
print("交易現在開放!")
transactions_open.set()
print("所有客戶已完成交易。")
猜猜結果是什么:
? 事件控制:bank_open和transactions_open兩個事件標記,控制銀行何時開門以及交易何時開始,所有客戶在銀行開門前會被阻塞,等待bank_open.set(),而需取款的客戶會繼續等待transactions_open.set() 才能執行取款操作。
? 線程池的使用:ThreadPoolExecutor限制了同時執行的線程數,最多服務4個客戶,當一個客戶完成服務后,線程池會釋放一個線程,這樣新客戶可以繼續進入銀行。
? CHECK_BALANC類型的客戶不需要等待transactions_open事件,因此會在銀行開門后直接完成操作并離開。
客戶 1 正在等待銀行開門。
客戶 2 正在等待銀行開門。
客戶 3 正在等待銀行開門。
客戶 4 正在等待銀行開門。
客戶 5 正在等待銀行開門。
客戶 6 正在等待銀行開門。
銀行經理正在準備開門。
銀行現在開門了!
客戶 1 進入了銀行
客戶 2 進入了銀行
客戶 3 進入了銀行
客戶 4 進入了銀行
客戶 1 正在等待交易開始。
客戶 3 正在等待交易開始。
客戶 4 正在等待交易開始。
客戶 2 已離開銀行
客戶 5 進入了銀行
客戶 5 正在等待交易開始。
客戶 6 進入了銀行
客戶 6 正在等待交易開始。
交易現在開放!
客戶 1 開始交易。
客戶 3 開始交易。
客戶 4 開始交易。
客戶 5 開始交易。
客戶 1 完成交易并離開了銀行
客戶 3 完成交易并離開了銀行
客戶 4 完成交易并離開了銀行
客戶 6 開始交易。
客戶 5 完成交易并離開了銀行
客戶 6 完成交易并離開了銀行
所有客戶已完成交易。
在需要同時向多個等待線程發出狀態變化信號的情況下,事件對象尤其有用。
Conditions條件等待
import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor
customer_available_condition = threading.Condition()
# Customers waiting to be served by the Teller
customer_queue = []
def now():
return time.strftime("%H:%M:%S")
def serve_customers():
while True:
with customer_available_condition:
# Wait for a customer to arrive
while not customer_queue:
print(f"{now()}: Teller is waiting for a customer.")
customer_available_condition.wait()
# Serve the customer
customer = customer_queue.pop(0)
print(f"{now()}: Teller is serving {customer}.")
# Simulate the time taken to serve the customer
time.sleep(random.randint(1, 5))
print(f"{now()}: Teller has finished serving {customer}.")
def add_customer_to_queue(name):
with customer_available_condition:
print(f"{now()}: {name} has arrived at the bank.")
customer_queue.append(name)
customer_available_condition.notify()
customer_names = [
"Customer 1",
"Customer 2",
"Customer 3",
"Customer 4",
"Customer 5",
]
with ThreadPoolExecutor(max_workers=6) as executor:
teller_thread = executor.submit(serve_customers)
for name in customer_names:
# Simulate customers arriving at random intervals
time.sleep(random.randint(1, 3))
executor.submit(add_customer_to_queue, name)
利用條件對象condition來協調生產者-消費者模型中的線程通信,使線程在特定條件滿足時再繼續執行,從而有效管理多線程中的執行流程。
Condition對象(customer_available_condition)既用作鎖來保護共享資源(customer_queue),也用作線程間的通信工具。通過wait()和notify()方法,柜員可以等待客戶到來,客戶到達后再通知柜員開始服務,從而避免了“忙等”。
在with上下文管理器中,condition對象確保在臨界區內自動加鎖和釋放鎖,保護共享資源customer_queue,serve_customers()中的無限循環讓柜員可以持續服務來訪的客戶,而在隊列為空時,通過wait()等待,避免無效的資源占用,使用condition實現同步,使得只有在客戶隊列非空時柜員才會服務,避免了資源的浪費和繁瑣的輪詢。
可能的輸出如下:
10:15:08: Teller is waiting for a customer.
10:15:09: Customer 1 has arrived at the bank.
10:15:09: Teller is serving Customer 1.
10:15:11: Customer 2 has arrived at the bank.
10:15:12: Teller has finished serving Customer 1.
10:15:12: Teller is serving Customer 2.
10:15:13: Teller has finished serving Customer 2.
10:15:13: Teller is waiting for a customer.
10:15:14: Customer 3 has arrived at the bank.
10:15:14: Teller is serving Customer 3.
10:15:15: Customer 4 has arrived at the bank.
10:15:17: Customer 5 has arrived at the bank.
10:15:18: Teller has finished serving Customer 3.
10:15:18: Teller is serving Customer 4.
10:15:22: Teller has finished serving Customer 4.
10:15:22: Teller is serving Customer 5.
10:15:25: Teller has finished serving Customer 5.
10:15:25: Teller is waiting for a customer.
Barriers
import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor
teller_barrier = threading.Barrier(3)
def now():
return time.strftime("%H:%M:%S")
def prepare_for_work(name):
print(f"{now()}: {name} is preparing their counter.")
# Simulate the delay to prepare the counter
time.sleep(random.randint(1, 3))
print(f"{now()}: {name} has finished preparing.")
# Wait for all tellers to finish preparing
teller_barrier.wait()
print(f"{now()}: {name} is now ready to serve customers.")
tellers = ["Teller 1", "Teller 2", "Teller 3"]
with ThreadPoolExecutor(max_workers=4) as executor:
for teller_name in tellers:
executor.submit(prepare_for_work, teller_name)
print(f"{now()}: All tellers are ready to serve customers.")
Barrier用于多線程場景中,當多個線程都到達指定的同步點(即wait()方法)后,所有線程才能繼續執行,在銀行場景中,Barrier確保所有柜員準備就緒后才能開始為客戶服務。
Barrier(3)指定了屏障點需要3個線程才能通過,確保所有3個柜員必須完成準備才會繼續,一旦最后一個柜員完成準備,所有線程(柜員)同時通過屏障,開始為客戶服務。
總結
在多線程編碼中,因為由上下文切換,所以某個代碼塊需要作為一個原子單元執行(即不可分割),就需要使用鎖的機制來保護;同時在修改共享可變數據的時候,一定也要通過鎖機制保護;另外使用的第三方庫可能不是線程安全的;不確定線程安全性時使用互斥鎖是一種最佳實踐。
同步工具包括:
- ? Lock 和 RLock:用于實現互斥鎖,確保某段代碼在一個線程執行時不被其他線程打斷。
- ? Semaphore:用于限制資源的并發訪問次數,可以控制同時運行的線程數量。
- ? Event:用于在線程間發送信號,通知某些條件已滿足。
- ? Condition:允許線程等待特定條件并在條件滿足時繼續執行,常用于生產者-消費者模型。
- ? Barrier:用于協調多個線程的執行步調,確保所有線程在同步點上會合后一起繼續。