成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

多線程必考的「生產者 - 消費者」模型,看這篇文章就夠了

開發 前端
生產者 - 消費者模型 Producer-consumer problem 是一個非常經典的多線程并發協作的模型,在分布式系統里非常常見。也是面試中無論中美大廠都非常愛考的一個問題,對應屆生問的要少一些,但是對于有工作經驗的工程師來說,非常愛考。

[[341897]]

 生產者 - 消費者模型 Producer-consumer problem 是一個非常經典的多線程并發協作的模型,在分布式系統里非常常見。也是面試中無論中美大廠都非常愛考的一個問題,對應屆生問的要少一些,但是對于有工作經驗的工程師來說,非常愛考。

這個問題有非常多的版本和解決方式,在本文我重點是和大家壹齊理清思路,由淺入深的思考問題,保證大家看完了都能有所收獲。

問題背景

簡單來說,這個模型是由兩類線程構成:

  • 生產者線程:“生產”產品,并把產品放到一個隊列里;
  • 消費者線程:“消費”產品。
  • 隊列:數據緩存區。

有了這個隊列,生產者就只需要關注生產,而不用管消費者的消費行為,更不用等待消費者線程執行完;消費者也只管消費,不用管生產者是怎么生產的,更不用等著生產者生產。

所以該模型實現了生產者和消費者之間的解藕和異步。

什么是異步呢?

比如說你和你女朋友打電話,就得等她接了電話你們才能說話,這是同步。

但是如果你跟她發微信,并不需要等她回復,她也不需要立刻回復,而是等她有空了再回,這就是異步。

但是呢,生產者和消費者之間也不能完全沒有聯系的。

  • 如果隊列里的產品已經滿了,生產者就不能繼續生產;
  • 如果隊列里的產品從無到有,生產者就得通知一下消費者,告訴它可以來消費了;
  • 如果隊列里已經沒有產品了,消費者也無法繼續消費;
  • 如果隊列里的產品從滿到不滿,消費者也得去通知下生產者,說你可以來生產了。

所以它們之間還需要有協作,最經典的就是使用 Object 類里自帶的 wait() 和 notify() 或者 notifyAll() 的消息通知機制。

上述描述中的等著,其實就是用 wait() 來實現的;

而通知,就是 notify() 或者 notifyAll() 。

那么基于這種消息通知機制,我們還能夠平衡生產者和消費者之間的速度差異。

如果生產者的生產速度很慢,但是消費者消費的很快,就像是我們每月工資就發兩次,但是每天都要花錢,也就是 1:15.

那么我們就需要調整生產者(發工資)為 15 個線程,消費者保持 1 個線程,這樣是不是很爽~

總結下該模型的三大優點:

解藕,異步,平衡速度差異。

wait()/notify()

接下來我們需要重點看下這個通知機制。

wait() 和 notify() 都是 Java 中的 Object 類自帶的方法,可以用來實現線程間的通信。

在上一節講的 11 個 APIs 里我也提到了它,我們這里再展開講一下。

wait() 方法是用來讓當前線程等待,直到有別的線程調用 notify() 將它喚醒,或者我們可以設定一個時間讓它自動蘇醒。

調用該方法之前,線程必須要獲得該對象的對象監視器鎖,也就是只能用在加鎖的方法下。

而調用該方法之后,當前線程會釋放鎖。(提示:這里很重要,也是下文代碼中用 while 而非 if 的原因。)

notify() 方法只能通知一個線程,如果多個線程在等待,那就喚醒任意一個。

notifyAll() 方法是可以喚醒所有等待線程,然后加入同步隊列。

這里我們用到了 2 個隊列:

  • 同步隊列:對應于我們上一節講的線程狀態中的 Runnable,也就是線程準備就緒,就等著搶資源了。
  • 等待隊列:對應于我們上一節講的線程狀態中的 Waiting,也就是等待狀態。

這里需要注意,從等待狀態線程無法直接進入 Q2,而是要先重新加入同步隊列,再次等待拿鎖,拿到了鎖才能進去 Q2;一旦出了 Q2,鎖就丟了。

在 Q2 里,其實只有一個線程,因為這里我們必須要加鎖才能進行操作。

實現

這里我首先建了一個簡單的 Product 類,用來表示生產和消費的產品,大家可以自行添加更多的 fields。

  1. public class Product  { 
  2.     private String name
  3.  
  4.     public Product(String name) { 
  5.         this.name = name
  6.     } 
  7.  
  8.     public String getName() { 
  9.         return name
  10.     } 
  11.  
  12.     public void setName(String name) { 
  13.         this.name = name
  14.     } 

主函數里我設定了兩類線程,并且這里選擇用普通的 ArrayDeque 來實現 Queue,更簡單的方式是直接用 Java 中的 BlockingQueue 來實現。

BlockingQueue 是阻塞隊列,它有一系列的方法可以讓線程實現自動阻塞,常用的 BlockingQueue 有很多,后面會單獨出一篇文章來講。

這里為了更好的理解并發協同的這個過程,我們先自己處理。

  1. public class Test { 
  2.     public static void main(String[] args) { 
  3.         Queue<Product> queue = new ArrayDeque<>(); 
  4.  
  5.         for (int i = 0; i < 100; i++) { 
  6.             new Thread(new Producer(queue, 100)).start(); 
  7.             new Thread(new Consumer(queue, 100)).start(); 
  8.         } 
  9.     } 

然后就是 Producer 和 Consumer 了。

  1. public class Producer implements Runnable{ 
  2.     private Queue<Product> queue; 
  3.     private int maxCapacity; 
  4.  
  5.     public Producer(Queue queue, int maxCapacity) { 
  6.         this.queue = queue; 
  7.         this.maxCapacity = maxCapacity; 
  8.     } 
  9.  
  10.     @Override 
  11.     public void run() { 
  12.         synchronized (queue) { 
  13.             while (queue.size() == maxCapacity) { //一定要用 while,而不是 if,下文解釋 
  14.                 try { 
  15.                     System.out.println("生產者" + Thread.currentThread().getName() + "等待中... Queue 已達到最大容量,無法生產"); 
  16.                     wait(); 
  17.                     System.out.println("生產者" + Thread.currentThread().getName() + "退出等待"); 
  18.                 } catch (InterruptedException e) { 
  19.                     e.printStackTrace(); 
  20.                 } 
  21.             } 
  22.             if (queue.size() == 0) { //隊列里的產品從無到有,需要通知在等待的消費者 
  23.                 queue.notifyAll(); 
  24.             } 
  25.             Random random = new Random(); 
  26.             Integer i = random.nextInt(); 
  27.             queue.offer(new Product("產品"  + i.toString())); 
  28.             System.out.println("生產者" + Thread.currentThread().getName() + "生產了產品:" + i.toString()); 
  29.         } 
  30.     } 

其實它的主邏輯很簡單,我這里為了方便演示加了很多打印語句才顯得有點復雜。

我們把主要邏輯拎出來看:

  1. public void run() { 
  2.        synchronized (queue) { 
  3.            while (queue.size() == maxCapacity) { //一定要用 while,而不是 if,下文解釋 
  4.                try { 
  5.                    wait(); 
  6.                } catch (InterruptedException e) { 
  7.                    e.printStackTrace(); 
  8.                } 
  9.            } 
  10.            if (queue.size() == 0) { 
  11.                queue.notifyAll(); 
  12.            } 
  13.            queue.offer(new Product("產品"  + i.toString())); 
  14.        } 
  15.    } 

這里有 3 塊內容,再對照這個過程來看:

  1. 生產者線程拿到鎖后,其實就是進入了 Q2 階段。首先檢查隊列是否容量已滿,如果滿了,那就要去 Q3 等待;
  2. 如果不滿,先檢查一下隊列原本是否為空,如果原來是空的,那就需要通知消費者;
  3. 最后生產產品。

這里有個問題,為什么只能用 while 而不是 if?

其實在這一小段,生產者線程經歷了幾個過程:

  1. 如果隊列已滿,它就沒法生產,那也不能占著位置不做事,所以要把鎖讓出來,去 Q3 - 等待隊列 等著;
  2. 在等待隊列里被喚醒之后,不能直接奪過鎖來,而是要先加入 Q1 - 同步隊列 等待資源;
  3. 一旦搶到資源,關門上鎖,才能來到 Q2 繼續執行 wait() 之后的活,但是,此時這個隊列有可能又滿了,所以退出 wait() 之后,還需要再次檢查 queue.size() == maxCapacity 這個條件,所以要用 while。

那么為什么可能又滿了呢?

因為線程沒有一直拿著鎖,在被喚醒之后,到拿到鎖之間的這段時間里,有可能其他的生產者線程先拿到了鎖進行了生產,所以隊列又經歷了一個從不滿到滿的過程。

總結:在使用線程的等待通知機制時,一般都要在 while 循環中調用 wait() 方法。

消費者線程是完全對稱的,我們來看代碼。

  1. public class Consumer implements Runnable{ 
  2.     private Queue<Product> queue; 
  3.     private int maxCapacity; 
  4.  
  5.     public Consumer(Queue queue, int maxCapacity) { 
  6.         this.queue = queue; 
  7.         this.maxCapacity = maxCapacity; 
  8.     } 
  9.  
  10.     @Override 
  11.     public void run() { 
  12.         synchronized (queue) { 
  13.             while (queue.isEmpty()) { 
  14.                 try { 
  15.                     System.out.println("消費者" + Thread.currentThread().getName() + "等待中... Queue 已缺貨,無法消費"); 
  16.                     wait(); 
  17.                     System.out.println("消費者" + Thread.currentThread().getName() + "退出等待"); 
  18.                 } catch (InterruptedException e) { 
  19.                     e.printStackTrace(); 
  20.                 } 
  21.             } 
  22.             if (queue.size() == maxCapacity) { 
  23.                 queue.notifyAll(); 
  24.             } 
  25.  
  26.             Product product = queue.poll(); 
  27.             System.out.println("消費者" + Thread.currentThread().getName() + "消費了:" + product.getName()); 
  28.         } 
  29.     } 

結果如下:

小結

生產者 - 消費者問題是面試中經常會遇到的題目,本文首先講了該模型的三大優點:解藕,異步,平衡速度差異,然后講解了等待/通知的消息機制以及在該模型中的應用,最后進行了代碼實現。

文中所有代碼已經放到了我的 Github 上:https://github.com/xiaoqi6666/NYCSDE。

這個 Github 匯總了我所有的文章和資料,之后也會一直更新和維護,還希望大家給小齊點個 Star,你們的支持和認可,就是我創作的最大動力,我們下篇文章見!

本文轉載自微信公眾號「碼農田小齊」,可以通過以下二維碼關注。轉載本文請聯系碼農田小齊公眾號。

 

責任編輯:武曉燕 來源: 碼農田小齊
相關推薦

2012-02-14 12:31:27

Java

2017-05-16 12:30:21

Python多線程生產者消費者模式

2017-03-30 22:41:55

虛擬化操作系統軟件

2021-12-22 11:00:05

模型Golang語言

2015-08-26 09:39:30

java消費者

2024-07-05 11:01:13

2021-11-10 07:47:48

Traefik邊緣網關

2009-08-13 13:14:31

C#生產者和消費者

2024-10-11 09:27:52

2019-09-25 09:17:43

物聯網技術信息安全

2022-05-27 08:18:00

HashMapHash哈希表

2024-03-26 00:00:06

RedisZSet排行榜

2018-10-31 17:22:25

AI人工智能芯片

2024-03-14 11:58:43

2019-10-31 09:48:53

MySQL數據庫事務

2024-09-27 11:51:33

Redis多線程單線程

2021-08-31 10:26:24

存儲

2018-08-17 09:14:43

餓了么容器演進

2020-10-13 07:44:40

緩存雪崩 穿透

2024-02-28 08:59:47

點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 91色视频在线观看 | 欧洲精品码一区二区三区免费看 | 日韩视频―中文字幕 | 九色国产 | 国产成人免费视频 | 国产在线视频三区 | 欧美久久一区 | 亚洲国产一区二区三区在线观看 | 毛片一级网站 | 成人国产免费视频 | 精品国产乱码久久久久久久久 | 精品国产成人 | 久久久国产精品 | 一级黄色毛片 | av网站免费在线观看 | 国产精品久久久久久久久久免费看 | 中文字幕在线不卡播放 | 久国产视频 | 国产精品1区 | av中文在线| 日本xx视频免费观看 | 精品久久香蕉国产线看观看亚洲 | 四虎影视1304t | 日韩精品一区二区三区中文字幕 | 久久在看 | 久久精品99| 成人综合视频在线 | 国产精品久久久久一区二区三区 | 日韩国产中文字幕 | 男人亚洲天堂 | 国产一级毛片精品完整视频版 | 国产日韩欧美综合 | 成人欧美一区二区三区在线播放 | 国产大片黄色 | 午夜精品久久久久久久 | 一级看片免费视频囗交动图 | 成年网站在线观看 | 亚洲人在线 | 亚洲一区二区三区四区五区中文 | 欧美高清性xxxxhdvideosex | 99久久精品国产毛片 |