成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

使用Rust構(gòu)建可以并發(fā)執(zhí)行多個(gè)任務(wù)的線程池

開發(fā) 前端
線程池是工作線程的集合,創(chuàng)建這些線程是為了同時(shí)執(zhí)行多個(gè)任務(wù)并等待新任務(wù)的到來。這意味著一開始創(chuàng)建了多個(gè)線程,并且所有線程都處于空閑狀態(tài)。

在這篇文章中讓我們探討一下如何使用Rust構(gòu)建線程池來并發(fā)地管理多個(gè)任務(wù)。

在開始實(shí)際的編碼之前,讓我們首先了解線程池是什么以及它是如何工作的。

線程池

線程池是工作線程的集合,創(chuàng)建這些線程是為了同時(shí)執(zhí)行多個(gè)任務(wù)并等待新任務(wù)的到來。這意味著一開始創(chuàng)建了多個(gè)線程,并且所有線程都處于空閑狀態(tài)。

每當(dāng)你的系統(tǒng)獲得任務(wù)時(shí),它可以快速地將任務(wù)分配給這些線程,從而節(jié)省大量時(shí)間,而無需多次創(chuàng)建和刪除線程。

圖片圖片

正如圖所看到的,線程池是等待從主線程接收任務(wù)以執(zhí)行的多個(gè)線程的集合。

在該圖中,主線程中總共有15個(gè)任務(wù),所有這些任務(wù)都被轉(zhuǎn)發(fā)給不同的工作線程并發(fā)執(zhí)行。了解了線程池的概念后,讓我們來理解線程池的內(nèi)部工作原理。

線程池是如何工作的?

在線程池體系結(jié)構(gòu)中,主線程只有兩個(gè)任務(wù):

1,接收所有的任務(wù)并將它們存儲(chǔ)在一個(gè)地方。

2,創(chuàng)建多個(gè)線程,并定期為它們分配不同的任務(wù)。

在接收任務(wù)之前創(chuàng)建線程集,并使用ID存儲(chǔ)在某個(gè)地方,以便我們可以通過ID識(shí)別它們。

然后每個(gè)線程都在等待接收任務(wù),如果它們得到任務(wù),就開始處理任務(wù)。完成任務(wù)后,他們再次等待下一個(gè)任務(wù)。

當(dāng)該線程忙于執(zhí)行任務(wù)時(shí),主線程將更多的任務(wù)分配給其他線程,這樣在主線程結(jié)束任務(wù)之前沒有線程空閑。在完成所有任務(wù)后,主線程終止所有線程并關(guān)閉線程池。

現(xiàn)在我們了解了線程池是如何工作的。接下來,讓我們使用Rust實(shí)現(xiàn)一個(gè)線程池。

使用Rust實(shí)現(xiàn)線程池

1. 創(chuàng)建線程

我們需要一個(gè)函數(shù)來生成一個(gè)線程并返回它的JoinHandle。

此外,我們需要知道線程的ID,如果我們搞砸了,就可以用線程ID記錄錯(cuò)誤,這樣我們就可以知道哪個(gè)線程出錯(cuò)了。

可以看出,如果兩個(gè)相互關(guān)聯(lián)的數(shù)據(jù)需要組合,需要一個(gè)結(jié)構(gòu)體。我們來創(chuàng)建一個(gè):

struct Worker {
    id: usize,
    thread: JoinHandle<()>
}

現(xiàn)在我們實(shí)現(xiàn)一個(gè)可以返回新Worker的構(gòu)造函數(shù):

impl Worker {
    fn new(id: usize) -> Self {
        let thread = thread::spawn(|| {});

        Self {id, thread}
    }
}

現(xiàn)在,我們的函數(shù)已經(jīng)準(zhǔn)備好創(chuàng)建線程并將它們返回給調(diào)用者。

2. 存放線程

我們需要一個(gè)結(jié)構(gòu)來保存所有線程的所有JoinHandles,我們還想控制線程池可以擁有多少線程。

這意味著,我們需要一個(gè)帶有構(gòu)造函數(shù)的結(jié)構(gòu)體,該函數(shù)指定一個(gè)數(shù)字來指示線程的數(shù)量,并且必須調(diào)用Worker來創(chuàng)建線程。

struct ThreadPool {
    workers: Vec<Worker>,
}

impl ThreadPool {
    fn new(size: usize) -> Self {
        assert!(size > 0, "Need at least 1 worker!");

        let mut workers = Vec::with_capacity(size);

        for i in 0..size {
            workers.push(Worker::new(i));
        }

        Self { workers }
    }
}

我們有了創(chuàng)建線程和管理線程的函數(shù),現(xiàn)在是時(shí)候創(chuàng)建一個(gè)可以將任務(wù)分配給不同線程的函數(shù)了。

3. 給線程分配任務(wù)

我們的線程池結(jié)構(gòu)體必須有一個(gè)函數(shù),該函數(shù)可以在線程內(nèi)部分配和執(zhí)行任務(wù)。但是有一個(gè)問題,我們?nèi)绾螌⑷蝿?wù)發(fā)送給線程,以便線程能夠執(zhí)行任務(wù)?

為此,我們需要一個(gè)task類型來表示我們需要完成的任務(wù):

type task = Box<dyn FnOnce() + Send + 'static>;

在這里,意味著我們的任務(wù)必須實(shí)現(xiàn)Box<dyn>里的這些Trait:

1,實(shí)現(xiàn)FnOnce()意味著我們的任務(wù)是一個(gè)只能運(yùn)行一次的函數(shù)。

2,實(shí)現(xiàn)Send,因?yàn)槲覀儗⑷蝿?wù)從主線程發(fā)送到工作線程,所以將任務(wù)設(shè)置為Send類型,以便它可以在線程之間安全地傳輸。

3,實(shí)現(xiàn)'static,意味著我們的任務(wù)必須和程序運(yùn)行的時(shí)間一樣長。

現(xiàn)在是時(shí)候?qū)⑷蝿?wù)發(fā)送給工作線程了,但要做到這一點(diǎn),我們必須在主線程和所有工作線程之間建立一個(gè)通道,因此我們需要使用Arc<Mutex<()>>。

讓我們來更新這兩個(gè)構(gòu)造函數(shù):

struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>
}

impl ThreadPool {
    fn new(size: usize) -> Self {
        assert!(size > 0, "Need at least 1 worker!");

        let (sender, reciever) = mpsc::channel();
        let reciever = Arc::new(Mutex::new(reciever));

        let mut workers = Vec::with_capacity(size);

        for i in 0..size {
            workers.push(Worker::new(i, Arc::clone(&reciever)));
        }

        Self {
            workers,
            sender: Some(sender)
        }
    }
}

impl Worker {
    fn new(id: usize, reciever: Arc<Mutex<Receiver<Task>>>) -> Self {
        let thread = thread::spawn(move || {});

        Self {
            id,
            thread
        }
    }
}

在ThreadPool構(gòu)造函數(shù)中,我們創(chuàng)建了一個(gè)新的通道,并在Arc<Mutex<()>>中封裝了接收器,我們把接收器發(fā)送給工作線程,以便主線程可以發(fā)送任務(wù),工作線程可以接收任務(wù)。

此外,我們必須更新ThreadPool結(jié)構(gòu)體,以包含一個(gè)發(fā)送者,它將被主線程用來向不同的線程發(fā)送任務(wù)。

現(xiàn)在,讓我們實(shí)現(xiàn)在工作線程中執(zhí)行任務(wù)的邏輯:

fn new(id: usize, reciever: Arc<Mutex<Receiver<task>>>) -> Self {
    let thread = thread::spawn(move || {
        loop {
            let receiver = reciever.lock()
                .expect("Failed to grab the lock!")
                .recv();

            match receiver {
                Ok(task) => {
                    println!("Thread {} got the task& executing.", id);
                    task();
                    thread::sleep(Duration::from_millis(10));
                },

                Err(_) => {
                    println!("No got the task");
                    break;
                }
            }
        }
    });

    Self {
        id,
        thread
    }
}

這里,在每個(gè)循環(huán)中,我們都試圖獲得鎖并調(diào)用鎖上的recv(),以便我們可以獲得主線程發(fā)送的任務(wù)。

接下來,我們在ThreadPool中實(shí)現(xiàn)一個(gè)函數(shù),將任務(wù)發(fā)送到不同的線程。

impl ThreadPool {
    fn new(size: usize) -> Self {
        // snip
    }

    fn execute<F>(&self, job: F)
    where
        F: FnOnce() + Send + 'static
    {
        let job = Box::new(job);

        self.sender.send(job)
            .expect("Failed to send the job to workers!");
    }
}

我們還需要?jiǎng)?chuàng)建一個(gè)函數(shù),在ThreadPool結(jié)束時(shí)動(dòng)態(tài)終止所有線程。簡單地說,我們必須手動(dòng)實(shí)現(xiàn)ThreadPool的Drop特性,在那里我們將終止所有線程。

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        for worker in &mut self.workers {
            println!("Thread {} is shutting down.", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join()..unwrap_or_else(|_| panic!("Failed to join the thread {}", worker.id));}
        }
    }
}

這里我們還必須刪除發(fā)送方,因?yàn)槿绻覀儾贿@樣做,那么接收方將永遠(yuǎn)循環(huán)。如果刪除發(fā)送者,那么接收者也會(huì)自動(dòng)刪除,我們就可以成功地退出這個(gè)程序。

測試

main函數(shù)代碼如下:

fn main() {
    let pool = ThreadPool::new(5);

    for _ in 0..10 {
        pool.execute(|| println!("Doing something"));
    }
}

運(yùn)行結(jié)果:

Thread 0 is shutting down.
Thread 0 got the job & executing.
Doing something
Thread 3 got the job & executing.
Doing something
Thread 1 got the job & executing.
Thread 2 got the job & executing.
Doing something
Thread 4 got the job & executing.
Doing something
Doing something
Thread 0 got the job & executing.
Doing something
Thread 4 got the job & executing.
Doing something
Thread 3 got the job & executing.
Doing something
Thread 2 got the job & executing.
Doing something
Thread 1 got the job & executing.
Doing something
No got the job
Thread 1 is shutting down.
No got the job
No got the job
No got the job
No got the job
Thread 2 is shutting down.
Thread 3 is shutting down.
Thread 4 is shutting down.


責(zé)任編輯:武曉燕 來源: coding到燈火闌珊
相關(guān)推薦

2022-03-28 08:31:29

線程池定時(shí)任務(wù)

2021-02-06 14:02:55

線程池Builder模式

2023-08-04 11:04:03

線程池項(xiàng)目開發(fā)

2023-07-05 07:48:04

線程池join關(guān)閉狀態(tài)

2022-04-26 08:41:38

Swift并發(fā)系統(tǒng)iOS

2023-12-29 09:38:00

Java線程池

2020-09-04 10:29:47

Java線程池并發(fā)

2017-01-10 13:39:57

Python線程池進(jìn)程池

2020-12-10 07:00:38

編程線程池定時(shí)任務(wù)

2019-08-16 11:16:25

Java程序員流程圖

2022-09-29 09:19:04

線程池并發(fā)線程

2022-11-09 09:01:08

并發(fā)編程線程池

2024-12-27 09:08:25

2022-03-30 08:54:21

線程 Thread判斷線程池任務(wù)Java

2023-11-29 16:38:12

線程池阻塞隊(duì)列開發(fā)

2021-09-11 15:26:23

Java多線程線程池

2020-02-17 16:28:49

開發(fā)技能代碼

2024-09-13 09:55:38

RustP2P網(wǎng)

2024-10-14 13:12:59

2023-04-19 07:39:55

RustHTTP服務(wù)器
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號

主站蜘蛛池模板: 国产福利精品一区 | 香蕉婷婷| 美女视频黄的 | 精品人伦一区二区三区蜜桃网站 | 天堂资源最新在线 | 在线视频99| 欧美三级视频 | 高清一区二区三区 | 三级视频网站 | 国产成人精品福利 | 一区二区在线视频 | 中国xxxx性xxxx产国 | av一级| 成人国产精品久久久 | 正在播放国产精品 | 欧美日韩三区 | 欧美一级片在线 | 亚洲精品视频在线看 | 超碰最新在线 | 国内91在线 | 国产一在线观看 | www.v888av.com | av中文在线观看 | 日本精品久久久久久久 | 国产综合久久久久久鬼色 | 国内精品久久久久 | www.一级毛片 | 日本视频免费观看 | 国产激情 | 91大神在线资源观看无广告 | 欧美一区免费 | 中文字幕av在线 | 亚洲性视频 | 一级黄色片免费 | 婷婷精品| 国产97在线视频 | 综合国产| 四虎影院在线播放 | 成人精品久久 | 国产目拍亚洲精品99久久精品 | 一级片网址 |