成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Python 并發編程實戰:優雅地使用 Concurrent.futures

開發 后端
Concurrent.futures? 模塊為 Python 并發編程提供了一個優雅的高級接口。相比傳統的 threading? / multiprocessing 模塊。

在 Python 多線程編程中,concurrent.futures 模塊提供了一個高層的接口來異步執行可調用對象。今天,我們將通過一個循序漸進的案例,深入了解如何使用這個強大的工具。

從一個模擬場景開始

假設我們需要處理一批網絡請求。為了模擬這個場景,我們使用 sleep 來代表耗時操作:

import time
import random

def slow_operation(task_id):
    """模擬一個耗時的網絡請求"""
    sleep_time = random.uniform(0.5, 2)
    time.sleep(sleep_time)
    return f"Task {task_id} completed in {sleep_time:.2f} seconds"

# 串行處理
def process_serial():
    start = time.perf_counter()
    results = []
    for i in range(10):
        result = slow_operation(i)
        results.append(result)
    end = time.perf_counter()
    print(f"串行處理總耗時:{end - start:.2f} 秒")
    return results

# 運行示例
if __name__ == '__main__':
    results = process_serial()
    for r in results:
        print(r)
串行處理總耗時:11.75 秒
Task 0 completed in 1.27 seconds
Task 1 completed in 1.10 seconds
Task 2 completed in 1.35 seconds
Task 3 completed in 1.36 seconds
Task 4 completed in 1.42 seconds
Task 5 completed in 1.55 seconds
Task 6 completed in 0.74 seconds
Task 7 completed in 0.55 seconds
Task 8 completed in 1.40 seconds
Task 9 completed in 0.97 seconds

運行這段代碼,你會發現處理 10 個任務需要大約 10-15 秒。這顯然不夠高效。

使用傳統的 threading 模塊

讓我們先看看使用傳統的 threading 模塊如何改進:

import threading
from queue import Queue

def slow_operation(task_id):
    """模擬一個耗時的網絡請求"""
    sleep_time = random.uniform(0.5, 2)
    time.sleep(sleep_time)
    return f"Task {task_id} completed in {sleep_time:.2f} seconds"

def process_threading():
    start = time.perf_counter()
    results = []
    work_queue = Queue()
    lock = threading.Lock()
    
    # 填充工作隊列
    for i in range(10):
        work_queue.put(i)
    
    def worker():
        while True:
            try:
                task_id = work_queue.get_nowait()
                result = slow_operation(task_id)
                with lock:
                    results.append(result)
                work_queue.task_done()
            except Queue.Empty:
                break
    
    threads = []
    for _ in range(4):  # 使用4個線程
        t = threading.Thread(target=worker)
        t.start()
        threads.append(t)
    
    for t in threads:
        t.join()
    
    end = time.perf_counter()
    print(f"多線程處理總耗時:{end - start:.2f} 秒")
    return results
多線程處理總耗時:3.24 秒

這個版本使用了多線程,性能確實提升了,但代碼比較復雜,需要手動管理線程、鎖和隊列。

concurrent.futures 的優雅解決方案

現在,讓我們看看如何使用 concurrent.futures 來簡化代碼:

import time
import random
from concurrent.futures import ThreadPoolExecutor, as_completed

def slow_operation(task_id):
    """模擬一個耗時的網絡請求"""
    sleep_time = random.uniform(0.5, 2)
    time.sleep(sleep_time)
    return f"Task {task_id} completed in {sleep_time:.2f} seconds"

def process_concurrent():
    start = time.perf_counter()
    results = []
    
    # 創建線程池,設置最大線程數為4
    with ThreadPoolExecutor(max_workers=4) as executor:
        # 提交任務到線程池
        future_to_id = {executor.submit(slow_operation, i): i for i in range(10)}
        
        # 獲取結果
        for future in as_completed(future_to_id):
            results.append(future.result())
    
    end = time.perf_counter()
    print(f"concurrent.futures 處理總耗時:{end - start:.2f} 秒")
    return results

process_concurrent()
concurrent.futures 處理總耗時:3.54 秒

這里我們用到了幾個關鍵概念:

  • ThreadPoolExecutor :線程池執行器,用于管理一組工作線程。創建時可以指定最大線程數。
  • executor.submit() :向線程池提交一個任務。返回 Future 對象,代表將來某個時刻會完成的操作。
  • as_completed() :返回一個迭代器,在 Future 完成時產生對應的 Future 對象。這意味著結果是按照完成順序而不是提交順序返回的。

Future 對象的高級用法

Future 對象提供了多個有用的方法,讓我們通過實例來了解:

import time
import random
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

def slow_operation(task_id):
    """模擬一個耗時的網絡請求"""
    sleep_time = random.uniform(0.5, 2)
    time.sleep(sleep_time)
    return f"Task {task_id} completed in {sleep_time:.2f} seconds"

def demonstrate_future_features():
    with ThreadPoolExecutor(max_workers=4) as executor:
        # 提交任務并獲取 Future 對象
        futures = [executor.submit(slow_operation, i) for i in range(10)]
        
        # 1. done() 檢查任務是否完成
        print("檢查第一個任務是否完成:", futures[0].done())
        
        # 2. 使用 wait() 等待部分任務完成
        done, not_done = wait(futures, return_when=FIRST_COMPLETED)
        print(f"完成的任務數: {len(done)}, 未完成的任務數: {len(not_done)}")
        
        # 3. 獲取結果時設置超時
        try:
            result = futures[0].result(timeout=1.0)
            print("獲取到結果:", result)
        except TimeoutError:
            print("獲取結果超時")
        
        # 4. cancel() 取消未開始的任務
        for f in not_done:
            cancelled = f.cancel()
            print(f"取消任務: {'成功' if cancelled else '失敗'}")

demonstrate_future_features()
檢查第一個任務是否完成: False
完成的任務數: 1, 未完成的任務數: 9
獲取到結果: Task 0 completed in 1.07 seconds
取消任務: 失敗
取消任務: 成功
取消任務: 成功
取消任務: 失敗
取消任務: 失敗
取消任務: 失敗
取消任務: 失敗
取消任務: 成功
取消任務: 失敗

線程/進程池還是異步 IO?

IO 密集型任務:優先選擇 asyncio

為什么選擇 asyncio ?

  • 更低的資源開銷 asyncio 使用協程,不需要創建額外的線程或進程
  • 更高的并發量:單線程可以輕松處理數千個并發任務
  • 沒有 GIL 的限制:協程在單線程內切換,完全規避了 GIL 的影響

讓我們通過一個網絡請求的例子來對比:

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

# 模擬網絡請求
def sync_request(url):
    time.sleep(1)  # 模擬網絡延遲
    return f"Response from {url}"

async def async_request(url):
    await asyncio.sleep(1)  # 模擬網絡延遲
    return f"Response from {url}"

# 使用線程池
def thread_pool_example():
    urls = [f"http://example.com/{i}" for i in range(100)]
    start = time.perf_counter()
    
    with ThreadPoolExecutor(max_workers=20) as executor:
        results = list(executor.map(sync_request, urls))
    
    end = time.perf_counter()
    print(f"ThreadPoolExecutor 耗時: {end - start:.2f} 秒")
    return results

# 使用 asyncio
async def asyncio_example():
    urls = [f"http://example.com/{i}" for i in range(100)]
    start = time.perf_counter()
    
    tasks = [async_request(url) for url in urls]
    results = await asyncio.gather(*tasks)
    
    end = time.perf_counter()
    print(f"asyncio 耗時: {end - start:.2f} 秒")
    return results

if __name__ == '__main__':
    # 運行線程池版本
    thread_results = thread_pool_example()
    
    # 運行 asyncio 版本
    asyncio_results = asyncio.run(asyncio_example())
ThreadPoolExecutor 耗時: 5.03 秒
asyncio 耗時: 1.00 秒

在這個例子中, asyncio 版本通常會表現出更好的性能,尤其是在并發量大的情況下。

CPU 密集型任務:使用 ProcessPoolExecutor

為什么選擇多進程?

  • 繞過 GIL:每個進程都有自己的 Python 解釋器和 GIL
  • 充分利用多核性能:可以真正實現并行計算
  • 適合計算密集型任務:如數據處理、圖像處理等

來看一個計算密集型任務的對比:

import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def cpu_intensive_task(n):
    """計算密集型任務:計算大量浮點數運算"""
    result = 0
    for i in range(n):
        result += i ** 2 / 3.14
    return result

def compare_performance():
    numbers = [10**6] * 20  # 20個大規模計算任務
    
    # 使用線程池
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=4) as executor:
        thread_results = list(executor.map(cpu_intensive_task, numbers))
    thread_time = time.perf_counter() - start
    print(f"線程池耗時: {thread_time:.2f} 秒")
    
    # 使用進程池
    start = time.perf_counter()
    with ProcessPoolExecutor(max_workers=4) as executor:
        process_results = list(executor.map(cpu_intensive_task, numbers))
    process_time = time.perf_counter() - start
    print(f"進程池耗時: {process_time:.2f} 秒")

if __name__ == '__main__':
    compare_performance()
線程池耗時: 4.61 秒
進程池耗時: 1.34 秒

在這種場景下, ProcessPoolExecutor 的性能明顯優于 ThreadPoolExecutor 。

混合型任務:ThreadPoolExecutor 的優勢

為什么有時候選擇線程池?

  • 更容易與現有代碼集成:大多數 Python 庫都是基于同步設計的
  • 資源開銷比進程池小:線程共享內存空間
  1. 適合 IO 和 CPU 混合的場景:當任務既有 IO 操作又有計算時

示例場景:

import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def mixed_task(task_id):
    """混合型任務:既有 IO 操作又有計算"""
    # IO 操作
    time.sleep(0.5)
    
    # CPU 計算
    result = sum(i * i for i in range(10**5))
    
    # 再次 IO 操作
    time.sleep(0.5)
    
    return f"Task {task_id}: {result}"

def demonstrate_mixed_workload():
    tasks = range(10)
    
    # 使用線程池
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=4) as executor:
        thread_results = list(executor.map(mixed_task, tasks))
    thread_time = time.perf_counter() - start
    print(f"線程池處理混合任務耗時: {thread_time:.2f} 秒")
    
    # 使用進程池
    start = time.perf_counter()
    with ProcessPoolExecutor(max_workers=4) as executor:
        process_results = list(executor.map(mixed_task, tasks))
    process_time = time.perf_counter() - start
    print(f"進程池處理混合任務耗時: {process_time:.2f} 秒")

if __name__ == '__main__':
    demonstrate_mixed_workload()
線程池處理混合任務耗時: 3.05 秒
進程池處理混合任務耗時: 3.11 秒

選擇建議的決策樹

在選擇并發方案時,可以參考以下決策流程:

首先判斷任務類型:

  • 如果是純 IO 密集型(網絡請求、文件操作),優先選擇 asyncio。
  • 如果是純 CPU 密集型(大量計算),優先選擇 ProcessPoolExecutor。
  • 如果是混合型任務,考慮使用 ThreadPoolExecutor。

考慮其他因素

  • 現有代碼是否易于改造為異步?
  • 是否需要與同步代碼交互?
  • 并發量有多大?
  • 是否需要跨進程通信?
def choose_concurrency_model(task_type, 
                           concurrent_count,
                           legacy_code=False,
                           need_shared_memory=False):
    """幫助選擇并發模型的示例函數"""
    if task_type == "IO":
        if legacy_code or need_shared_memory:
            return "ThreadPoolExecutor"
        else:
            return "asyncio"
    elif task_type == "CPU":
        if need_shared_memory:
            return "ThreadPoolExecutor"
        else:
            return "ProcessPoolExecutor"
    else:  # mixed
        if concurrent_count > 1000:
            return "asyncio"
        else:
            return "ThreadPoolExecutor"

性能對比總結

方案

IO密集型

CPU密集型

混合型

資源開銷

代碼復雜度

asyncio

最佳

較差

最低

較高

ThreadPoolExecutor

較差

較好

ProcessPoolExecutor

一般

最佳

一般

總的來說,選擇合適的并發方案需要綜合考慮任務特性、性能需求、代碼復雜度等多個因素。在實際應用中,有時候甚至可以混合使用多種方案,以達到最優的性能表現。

實用技巧總結

控制線程池大小

def demonstrate_pool_sizing():
    # CPU 核心數
    cpu_count = os.cpu_count()
    # IO 密集型任務,線程數可以設置為核心數的 1-4 倍
    io_bound_workers = cpu_count * 2
    # CPU 密集型任務,線程數不應超過核心數
    cpu_bound_workers = cpu_count

    print(f"推薦的線程數:")
    print(f"IO 密集型任務:{io_bound_workers}")
    print(f"CPU 密集型任務:{cpu_bound_workers}")

批量提交任務

def demonstrate_batch_submit():
    with ThreadPoolExecutor(max_workers=4) as executor:
        results_ordered = list(executor.map(slow_operation, range(5)))

        futures = [executor.submit(slow_operation, i) for i in range(5)]
        results_completion = [f.result() for f in as_completed(futures)]

        return results_ordered, results_completion

錯誤處理

def demonstrate_error_handling():
    def faulty_operation(task_id):
        if task_id == 3:
            raise ValueError(f"Task {task_id} failed")
        return slow_operation(task_id)
    
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(faulty_operation, i) for i in range(5)]
        
        for future in as_completed(futures):
            try:
                result = future.result()
                print(f"成功:{result}")
            except Exception as e:
                print(f"錯誤:{str(e)}")

總結

concurrent.futures 模塊為 Python 并發編程提供了一個優雅的高級接口。相比傳統的 threading / multiprocessing 模塊,它具有以下優勢:

  • 使用線程池自動管理線程的生命周期
  • 提供簡潔的接口提交任務和獲取結果
  • 支持超時和錯誤處理
  • 代碼更加 Pythonic 和易于維護
責任編輯:姜華 來源: Piper蛋窩
相關推薦

2024-01-17 12:44:23

Python并發編程

2021-01-13 11:29:43

Python多線程異步

2021-05-12 22:07:43

并發編排任務

2024-04-24 12:34:08

Spring事務編程

2023-10-30 23:25:48

FuturesGo語言

2021-01-28 14:53:19

PHP編碼開發

2023-11-22 13:05:12

Pytest測試

2023-05-12 14:14:00

Java線程中斷

2017-04-12 11:16:08

Python終端編程

2018-08-20 10:40:09

Redis位圖操作

2025-02-12 00:21:44

Java并發編程

2025-01-16 08:08:29

2017-12-14 14:17:08

Windows使用技巧手冊

2021-03-24 10:20:50

Fonts前端代碼

2023-03-28 08:07:12

2021-02-25 22:17:19

開發技術編程

2018-07-23 08:19:26

編程語言Python工具

2024-11-13 16:37:00

Java線程池

2024-06-05 09:17:31

Python數據清洗開發

2020-11-05 18:30:32

接口測試
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 午夜在线| 欧美日韩成人一区二区 | 视频一区中文字幕 | 亚洲久在线 | 特级特黄特色的免费大片 | 91文字幕巨乱亚洲香蕉 | 久久久久久久久淑女av国产精品 | 日日摸日日添日日躁av | 伊人网站在线观看 | 日韩欧美1区2区 | 欧美日韩在线免费 | 欧美国产精品一区二区三区 | 精品国产欧美一区二区三区成人 | 国产色片| 亚洲综合二区 | 午夜一区二区三区在线观看 | 在线观看第一页 | 国产日韩一区二区三免费高清 | av在线免费不卡 | 黄色成人在线观看 | 欧美日韩精品一区二区天天拍 | 一区二区三区不卡视频 | 国产乱肥老妇国产一区二 | 在线免费观看黄a | 中国大陆高清aⅴ毛片 | 色婷婷综合久久久久中文一区二区 | 午夜影院在线观看 | 亚洲免费网站 | 成年人免费网站 | 精品少妇v888av | 免费视频久久久久 | 亚洲成人久久久 | 蜜臀久久99精品久久久久久宅男 | 夜夜爽99久久国产综合精品女不卡 | www.国产一区 | 中文字幕一区二区三区在线观看 | 亚洲国产精品久久 | 一区二区三区视频在线观看 | 久久久91精品国产一区二区三区 | 九九亚洲 | 日韩成人在线看 |