成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

剖析 Redis List 消息隊列的三種消費線程模型

數據庫 Redis
使用 List 做消息隊列,不可避免的會有消息丟失,所以我們需要用定時任務做補償,每隔一段時間去業務表里查詢業務狀態機,若狀態機不符合條件,則觸發補償策略。

Redis 列表(List)是一種簡單的字符串列表,它的底層實現是一個雙向鏈表。

生產環境,很多公司都將 Redis 列表應用于輕量級消息隊列 。這篇文章,我們聊聊如何使用 List 命令實現消息隊列的功能以及剖析消費者線程模型 。

圖片圖片

一、核心流程

生產者使用 LPUSH key element[element...] 將消息插入到隊列的頭部,如果 key 不存在則會創建一個空的隊列再插入消息。

如下,生產者向隊列 queue 先后插入了 「Java」「勇哥」「Go」,返回值表示消息插入隊列后的個數。

> LPUSH queue Java 勇哥 Go
(integer) 3

消費者使用 RPOP key 依次讀取隊列的消息,先進先出,所以 「Java」會先讀取消費:

> RPOP queue
"Java"
> RPOP queue
"勇哥"
> RPOP queue
"Go"

圖片圖片

接下來,我們可以通過 spring-data-redis API 演示生產消費流程:

  • 生產者
redisTemplate.opsForList().leftPush("queue" , "Java");
redisTemplate.opsForList().leftPush("queue" , "勇哥");
redisTemplate.opsForList().leftPush("queue" , "Go");
  • 消費者

我們啟動一個獨立的線程從隊列中讀取消息(RPOP 命令),讀取成功之后,消費消息,若沒有消息,則休眠一會,下一次循環再繼續。

圖片圖片

上圖的偽代碼中, while(true) 循環內不停地調用 RPOP 指令,當有消息時,可以及時處理,但假如沒有讀取到消息,則需要休眠一會。

這里要加休眠,主要是為了減少空讀的頻率,避免 CPU 無意義的消耗。

有什么更優化的方式嗎?有,那就是使用 Redis 阻塞讀取 List 的命令。

Redis 提供了 BLPOP、BRPOP 阻塞讀取的命令,消費者在在讀取隊列沒有數據的時自動阻塞,直到有新的消息寫入隊列,才會繼續讀取新消息執行業務邏輯。

BRPOP queue 0

參數 0 表示阻塞等待時間無限制 。

圖片圖片

如圖,我們啟動一個消費線程永動機,消費線程拉取消息后,執行消費邏輯。

這種消費者線程模型非常容易理解,同時也非常適合順序消費的模式。同時,假如我們在消費消息時,服務器宕機或者斷電,可能丟失一條消息。

接下來,我們想一想,有沒有消費速度更高的消費模型嗎?筆者根據過往的經歷,列舉三種模式:

  • 拉取線程 + 消費線程池(非阻塞模式)
  • 拉取線程 + 消費線程池 (阻塞模式)
  • 拉取線程 + Disruptor(阻塞模式)

二、拉取線程 + 消費線程池(非阻塞模式)

為了提升消費速度,我們可以將拉取和消費拆分成兩種動作,分別通過不同的線程池來處理。拉取線程池負責拉取消息,消費線程池負責消費消息。

圖片圖片

偽代碼類似:

圖片圖片

如圖,在拉取線程內部,我們拉取完消息后,將消息提交到消費線程 consumeExecutor 。

這樣方式可以通過多線程執行大幅度提升消費速度 ,但是這里還是有一個問題:

假如消費速度很慢,生產者速度很高,那么就會在線程池內容易產生消息堆積,這里面會產生兩個隱形風險:

  • 線程池隊列無限堆積,則可能有 OOM 的風險 ;
  • 假如消費者服務器宕機或者斷電,那么會丟失大量的消息。

那么如何優化這種模式呢 ?

答案是:拉取線程提交消息到線程池時,當隊列中消息數量到達一定數量時,提交消息到線程池會阻塞。

三、拉取線程 + 消費線程池(阻塞模式)

我們將消息包裝為 Runnable ,然后通過消費線程池執行 execute ,拉取線程會不會阻塞呢 ?

下圖是執行的源碼:

圖片圖片

可以看到,第 30 行調用的是 workQueue 的非阻塞的 offer 方法。

如果隊列已滿,新提交的任務并不會被 block 住,反而會調用后續的 reject 流程。

如果我們想要達到阻塞生產者的目的的話,可以采取如下的兩種方案:

  • 信號量限制同時進入線程池等待隊列的任務數 。

圖片圖片

  • 使用線程池的拒絕機制,把新加入的任務 put 到等待隊列里,這樣也可以阻塞住生產者。

圖片圖片

四、拉取線程 + Disruptor

下圖展示了 Disruptor 的流程圖 。

圖片圖片

和線程池機制非常類似, Disruptor 也是非常典型的生產者/消費者模式。線程池存儲提交任務的容器是阻塞隊列,而 Disruptor 使用的是環形緩沖區 RingBuffer。

環形緩沖區的設計相比阻塞隊列有如下優點:

  • 環形數組結構

為了避免垃圾回收,采用數組而非鏈表。同時,數組對處理器的緩存機制更加友好。

  • 元素位置定位

數組長度 2^n,通過位運算,加快定位的速度。下標采取遞增的形式,不用擔心 index 溢出的問題。index 是 long 類型,即使100萬QPS的處理速度,也需要30萬年才能用完。

  • 無鎖設計

每個生產者或者消費者線程,會先申請可以操作的元素在數組中的位置,申請到之后,直接在該位置寫入或者讀取數據。

此刻大家并不需要理解環形緩沖區的讀寫機制,只需要明白 環形緩沖區 RingBuffer 是 Disruptor 的精髓即可。

將消費線程池替換成 Disruptor 有兩個明顯的優點:

  • 無鎖隊列,寫入讀取性能非常好
  • 當拉取線程提交消息到 Disruptor 時,若環形緩沖區 RingBuffer 已經滿了,則拉取線程會阻塞,這樣天然的可以避免無限拉取,同時避免 OOM 的問題。

偽代碼類似:

1.定義 Disruptor

圖片圖片

2.拉取線程將消息發送到 Disruptor Ringbuffer

圖片圖片

3.消費消息

圖片圖片

整體的消費者線程模型如下圖:

圖片圖片

五、平滑停服 + 定時任務補償

當我們分析消費者線程模型時,無論我們使用哪種方式,假如服務器突然宕機、或者物理機斷電,則會丟失消息。

筆者推薦兩種方式:

1.平滑停服

平滑停服是指在停止應用程序時,盡量避免中斷正在進行的請求或任務,盡量讓正在進行的任務處理完成,并且不再接收新的任務,等所有任務執行完成后關閉應用。

在 Unix/Linux 系統中,可以使用 kill 命令發送信號給運行中的進程。

常見的信號有:

  • SIGTERM (15):請求進程終止,可以被捕捉和處理,用于優雅地停止進程。
  • SIGKILL (9):強制終止進程,不能被捕捉或忽略。
  • SIGQUIT (3):進程退出并生成核心轉儲(core dump)。

為了實現平滑停服,可以使用 Java 的 Runtime.getRuntime().addShutdownHook 方法注冊一個關閉鉤子(shutdown hook)。當 JVM 接收到SIGTERM信號時,關閉鉤子會被執行,從而可以在應用程序停止前執行一些清理工作。

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        System.out.println("Shutdown hook triggered. Performing cleanup...");
        // 在這里執行清理工作,如關閉資源、保存狀態等
}));

我們可以在鉤子里,關閉拉取線程池 ,優雅關閉消費線程池等 ,這樣可以盡量避免丟失消息。

2.定時任務補償

使用 List 做消息隊列,不可避免的會有消息丟失,所以我們需要用定時任務做補償,每隔一段時間去業務表里查詢業務狀態機,若狀態機不符合條件,則觸發補償策略。

參考資料:

https://www.redis.net.cn/tutorial/3510.html

https://redis.io/docs/latest/develop/data-types/lists/

https://juejin.cn/post/7094272373930590245

https://blog.scottlogic.com/2021/12/01/disruptor.html

責任編輯:武曉燕 來源: 勇哥Java實戰
相關推薦

2021-01-12 08:43:29

Redis ListStreams

2023-03-06 08:40:43

RedisListJava

2009-09-22 14:12:16

Hibernate S

2009-07-16 16:23:59

Swing線程

2022-01-21 19:22:45

RedisList命令

2022-01-15 07:20:18

Redis List 消息隊列

2023-10-13 00:00:00

Redis模塊空間對象

2009-11-09 11:15:06

WCF消息隊列

2021-11-05 21:33:28

Redis數據高并發

2018-04-02 14:29:18

Java多線程方式

2021-12-20 07:11:26

Java List排序 Java 基礎

2024-10-25 08:41:18

消息隊列RedisList

2016-06-12 10:37:32

云計算私有云公有云

2009-12-21 13:37:43

WCF消息交換

2013-05-07 09:39:14

軟件定義網絡SDNOpenFlow

2020-11-03 19:52:54

Java數組編程語言

2011-01-18 15:35:59

jQueryJavaScriptweb

2022-02-28 08:42:49

RedisStream消息隊列

2024-11-18 08:08:21

2020-10-26 09:19:11

線程池消息
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: japan21xxxxhd美女 日本欧美国产在线 | 不卡一区二区三区四区 | 人人看人人爽 | 高清一区二区三区 | 国产黄色小视频在线观看 | 99免费在线观看视频 | 国产欧美精品一区二区三区 | aaa大片免费观看 | 91在线视频免费观看 | 在线观看视频一区 | 国产精品我不卡 | 亚洲欧美综合精品久久成人 | 国产精品欧美精品日韩精品 | 成av在线| 国产一级片av | 国产精品久久久久久久久久久免费看 | 麻豆久久久久久 | 超碰成人免费 | 91精品国产91久久久久久最新 | 97视频免费 | 成人av网站在线观看 | 精品一区二区三区在线观看 | 久久男人| 日本一区二区三区视频在线 | 日韩视频在线播放 | 激情三区 | 国产欧美一区二区三区另类精品 | 亚洲美女网站 | 久久99精品国产麻豆婷婷 | 久久精品久久久久久 | 亚洲成人av在线播放 | 一区二区在线免费观看 | 波多野吉衣在线播放 | 在线欧美一区二区 | 日本视频一区二区三区 | 国产综合在线视频 | 国产精品亚洲成在人线 | 久久精品国产免费看久久精品 | 亚洲色欲色欲www | 亚洲美女一区二区三区 | 国产精品一区二区不卡 |