成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Rust Tokio取消任務的幾種模式,你知道嗎?

開發 前端
Tokio channel看起來有點復雜,但同時就程序的內存安全和彈性而言,它很強大。Tokio channel創建了兩個不同的對象,用于任務之間的通信,不能同時使用一個通道對象來接收和發送。

Rust提供了對異步編程的支持,它可以生成異步任務,然后通過運行時執行器在操作系統線程之間調度執行。

與Rust中的所有東西一樣,異步編程必須是內存安全的,因此需要確保借用檢查器可以編譯通過。

這篇文章是關于任務取消模式的,下面我們來介紹Tokio任務的取消模式。

Select 和 Channels

所有這些模式的核心是兩個tokio特性:

  • channel:用于任務間通信
  • select:用于等待多個異步計算(不一定是任務!)

Tokio channel看起來有點復雜,但同時就程序的內存安全和彈性而言,它很強大。Tokio channel創建了兩個不同的對象,用于任務之間的通信,不能同時使用一個通道對象來接收和發送。

Tokio提供的頻道實際上有四種:

  • mpsc:多個生產者,單一消費者
  • oneshot:用于發送和接收單個值,發送后,通道關閉。
  • broadcast:多個發送者,多個消費者
  • watch:單一生產者,多個消費者

Drop JoinHandle不會取消任務

JoinHandle在刪除關聯的任務時將其分離,這意味著不再有任何任務句柄,也沒有辦法對其進行連接。

每次在tokio中生成任務時,都會返回JoinHandle??梢允褂胘oin句柄來等待任務完成,但是認為可以使用它來簡單地通過刪除任務來強制終止任務是錯誤的。這里有一個愚蠢的例子:

use tokio::time::{self, Duration};

#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async {
        // do some work
        tokio::time::sleep(Duration::from_secs(10)).await;
        println!("Task completed");
    });

    // 100毫秒后取消任務
    time::sleep(Duration::from_millis(100)).await;
    drop(handle);

    println!("Task was cancelled");
}

丟棄句柄并不會取消正在運行的任務!

Abort任務

這是取消任務的最極端的方式,沒有清理的空間:

use tokio::time::{self, Duration};

#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async {
        // do some work
        tokio::time::sleep(Duration::from_secs(1)).await;
        println!("Task completed");
    });

    // 100毫秒后取消任務
    time::sleep(Duration::from_millis(100)).await;
    handle.abort();
    time::sleep(Duration::from_secs(2)).await;

    println!("Task was cancelled");
}

使用oneshot channel

oneshot channel允許通道上的發送單個值,可以由多個接收器偵聽。與drop模式不同,此模式允許通道執行一些清理工作。這里有一個例子:

use tokio::sync::oneshot;
use tokio::time::Duration;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();

    let task = tokio::spawn(async move {
        tokio::select! {
            _ = rx => {
                println!("Task is cancelling...");
            }
            _ = tokio::time::sleep(Duration::from_secs(10)) => {
                println!("Task completed normally");
            }
        }
        println!("Task is cleaning up");
    });

    tokio::time::sleep(Duration::from_millis(100)).await;

    // 發送取消信號
    let _ = tx.send(());

    // 等待任務完成
    let _ = task.await;
}

運行結果如下:

Task is cancelling...
Task is cleaning up

oneshot channel的限制是你不能用它來取消多個任務。

使用broadcast channel取消多個任務

如果要取消多個任務,可以使用broad channel。可以有多個生產者向通道發送信息,也可以有多個消費者從通道接收信息。每個接收方都可以看到在通道上發送的每個值。

這里有一個簡單的例子,來演示如何使用它來取消多個任務:

use tokio::sync::broadcast;
use tokio::time::Duration;

#[tokio::main]
async fn main() {
    let (tx, mut rx1) = broadcast::channel(1);
    let mut rx2 = tx.subscribe();

    let task1 = tokio::spawn(async move {
        tokio::select! {
            _ = rx1.recv() => {
                println!("Task 1 is cancelling...");
            }
            _ = tokio::time::sleep(Duration::from_secs(10)) => {
                println!("Task 1 completed normally");
            }
        }
        println!("Task 1 is cleaning up");
    });

    let task2 = tokio::spawn(async move {
        tokio::select! {
            _ = rx2.recv() => {
                println!("Task 2 is cancelling...");
            }
            _ = tokio::time::sleep(Duration::from_secs(10)) => {
                println!("Task 2 completed normally");
            }
        }
        println!("Task 2 is cleaning up");
    });

    tokio::time::sleep(Duration::from_millis(100)).await;

    // 發送取消信號
    let _ = tx.send(());

    // 等待任務完成
    let _ = tokio::join!(task1, task2);
}

運行結果如下:

Task 2 is cancelling...
Task 2 is cleaning up
Task 1 is cancelling...
Task 1 is cleaning up

取消的順序可能會有所不同,因為任務可能會以不同的順序取消!

如果只想從單個任務向多個任務發送取消信號,那么broad channel可能有點過度,因為它提供了在多個任務之間傳遞消息的所有機制。

如果既需要消息傳遞又需要消息取消,這很方便。但如果只需要消息取消,還有更好的方法,開銷更少:watch channel。

使用watch channel取消多個任務

watch channel是多個消費者頻道的單一生產者。watch channel給了任務清理自己的機會。缺點是,消費者只能看到通道上發送的最近的值——這意味著如果任務在通道上發送了一個值之后啟動,它可能會錯過它,因此不會被取消,所以要小心這一點。這里有一個簡單的例子:

use tokio::sync::watch;
use tokio::time::Duration;

#[tokio::main]
async fn main() {
    let (tx, mut rx1) = watch::channel(false);
    let mut rx2 = tx.subscribe();

    let task1 = tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = rx1.changed() => {
                    if *rx1.borrow() {
                        println!("Task 1 is cancelling...");
                        break;
                    }
                }
                _ = tokio::time::sleep(Duration::from_secs(10)) => {
                    println!("Task 1 completed normally");
                    break;
                }
            }
        }
        println!("Task 1 is cleaning up");
    });

    let task2 = tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = rx2.changed() => {
                    if *rx2.borrow() {
                        println!("Task 2 is cancelling...");
                        break;
                    }
                }
                _ = tokio::time::sleep(Duration::from_secs(10)) => {
                    println!("Task 2 completed normally");
                    break;
                }
            }
        }
        println!("Task 2 is cleaning up");
    });

    tokio::time::sleep(Duration::from_millis(100)).await;

    // 發送取消信號
    let _ = tx.send(true);

    // 等待任務完成
    let _ = tokio::join!(task1, task2);
}

取消令牌

官方的Tokio文檔中列出了一種名為CancellationToken的東西,用于優雅關機。這在tokio crate本身中不可用,但在相關的toko_util crate中可用。

use tokio::time::{sleep, Duration};
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    // Create a CancellationToken
    let token = CancellationToken::new();

    let token1 = token.clone();
    let token2 = token.clone();

    let task1 = tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = token1.cancelled() => {
                        println!("Task 1 is cancelling...");
                        break;
                }
                _ = tokio::time::sleep(Duration::from_secs(10)) => {
                    println!("Task 1 completed normally");
                    break;
                }
            }
        }
        println!("Task 1 is cleaning up");
    });

    let task2 = tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = token2.cancelled() => {
                        println!("Task 2 is cancelling...");
                        break;
                }
                _ = tokio::time::sleep(Duration::from_secs(10)) => {
                    println!("Task 2 completed normally");
                    break;
                }
            }
        }
        println!("Task 2 is cleaning up");
    });

    sleep(Duration::from_millis(100)).await;

    // 發送取消信號
    token.cancel();

    // 等待任務完成
    let _ = tokio::join!(task1, task2);
}

請注意我們是如何克隆令牌的,以便將其移動到各個異步任務中。

責任編輯:武曉燕 來源: coding到燈火闌珊
相關推薦

2024-07-01 08:40:18

tokio派生線程

2024-02-05 12:08:07

線程方式管理

2024-09-27 09:53:22

Rust標準庫優化

2018-09-12 11:18:56

finalJava用法

2022-07-05 08:05:00

策略模式接口實現類

2022-09-20 14:11:37

JVM調優命令

2024-11-01 10:48:01

C#WPF程序

2023-12-12 08:41:01

2019-02-12 11:15:15

Spring設計模式Java

2024-06-12 08:05:06

2024-11-26 14:29:48

2019-12-12 09:23:29

Hello World操作系統函數庫

2022-03-10 08:25:27

JavaScrip變量作用域

2023-12-20 08:23:53

NIO組件非阻塞

2023-04-26 10:21:04

2024-04-30 09:02:48

2021-02-06 21:57:40

Debug模式Release

2024-09-18 07:00:00

消息隊列中間件消息隊列

2021-10-14 06:52:47

算法校驗碼結構

2022-09-29 15:32:58

云計算計算模式
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 欧美日韩视频在线 | 中文字幕一区二区三区在线观看 | www.操.com | 国产中文在线 | 久久久免费观看视频 | 中文字幕一区二区三区乱码图片 | 一区二区三区免费 | 一区在线播放 | 一级特黄在线 | 天天插天天操 | 国产精品区二区三区日本 | 精品九九久久 | 欧美一页| 国产精品资源在线观看 | a黄视频| 成年免费大片黄在线观看岛国 | 欧美在线免费 | 巨大荫蒂视频欧美另类大 | 国产av毛片| 中文字幕精品一区二区三区精品 | 国产亚洲精品精品国产亚洲综合 | 国产精品一区二 | 黄色在线免费观看 | 黄色成人在线观看 | 国产精品一区二区不卡 | 看羞羞视频免费 | 伊人久久综合 | 日韩精品激情 | 香蕉视频久久久 | 欧美天堂 | 午夜精品一区 | 精品国产乱码久久久久久影片 | 国产二区在线播放 | 久久99精品久久久久久 | av大片 | 日日噜噜噜夜夜爽爽狠狠视频, | 亚洲精品国产电影 | 国产一级片一区二区 | 国产亚洲一区二区三区在线 | 国产一区二区在线看 | 中文字幕在线一区二区三区 |