剖析 Redis List 消息隊列的三種消費線程模型
Redis 列表(List)是一種簡單的字符串列表,它的底層實現是一個雙向鏈表。
生產環境,很多公司都將 Redis 列表應用于輕量級消息隊列 。這篇文章,我們聊聊如何使用 List 命令實現消息隊列的功能以及剖析消費者線程模型 。
圖片
一、核心流程
生產者使用 LPUSH key element[element...] 將消息插入到隊列的頭部,如果 key 不存在則會創建一個空的隊列再插入消息。
如下,生產者向隊列 queue 先后插入了 「Java」「勇哥」「Go」,返回值表示消息插入隊列后的個數。
> LPUSH queue Java 勇哥 Go
(integer) 3
消費者使用 RPOP key 依次讀取隊列的消息,先進先出,所以 「Java」會先讀取消費:
> RPOP queue
"Java"
> RPOP queue
"勇哥"
> RPOP queue
"Go"
圖片
接下來,我們可以通過 spring-data-redis API 演示生產消費流程:
- 生產者
redisTemplate.opsForList().leftPush("queue" , "Java");
redisTemplate.opsForList().leftPush("queue" , "勇哥");
redisTemplate.opsForList().leftPush("queue" , "Go");
- 消費者
我們啟動一個獨立的線程從隊列中讀取消息(RPOP 命令),讀取成功之后,消費消息,若沒有消息,則休眠一會,下一次循環再繼續。
圖片
上圖的偽代碼中, while(true) 循環內不停地調用 RPOP 指令,當有消息時,可以及時處理,但假如沒有讀取到消息,則需要休眠一會。
這里要加休眠,主要是為了減少空讀的頻率,避免 CPU 無意義的消耗。
有什么更優化的方式嗎?有,那就是使用 Redis 阻塞讀取 List 的命令。
Redis 提供了 BLPOP、BRPOP 阻塞讀取的命令,消費者在在讀取隊列沒有數據的時自動阻塞,直到有新的消息寫入隊列,才會繼續讀取新消息執行業務邏輯。
BRPOP queue 0
參數 0 表示阻塞等待時間無限制 。
圖片
如圖,我們啟動一個消費線程永動機,消費線程拉取消息后,執行消費邏輯。
這種消費者線程模型非常容易理解,同時也非常適合順序消費的模式。同時,假如我們在消費消息時,服務器宕機或者斷電,可能丟失一條消息。
接下來,我們想一想,有沒有消費速度更高的消費模型嗎?筆者根據過往的經歷,列舉三種模式:
- 拉取線程 + 消費線程池(非阻塞模式)
- 拉取線程 + 消費線程池 (阻塞模式)
- 拉取線程 + Disruptor(阻塞模式)
二、拉取線程 + 消費線程池(非阻塞模式)
為了提升消費速度,我們可以將拉取和消費拆分成兩種動作,分別通過不同的線程池來處理。拉取線程池負責拉取消息,消費線程池負責消費消息。
圖片
偽代碼類似:
圖片
如圖,在拉取線程內部,我們拉取完消息后,將消息提交到消費線程 consumeExecutor 。
這樣方式可以通過多線程執行大幅度提升消費速度 ,但是這里還是有一個問題:
假如消費速度很慢,生產者速度很高,那么就會在線程池內容易產生消息堆積,這里面會產生兩個隱形風險:
- 線程池隊列無限堆積,則可能有 OOM 的風險 ;
- 假如消費者服務器宕機或者斷電,那么會丟失大量的消息。
那么如何優化這種模式呢 ?
答案是:拉取線程提交消息到線程池時,當隊列中消息數量到達一定數量時,提交消息到線程池會阻塞。
三、拉取線程 + 消費線程池(阻塞模式)
我們將消息包裝為 Runnable ,然后通過消費線程池執行 execute ,拉取線程會不會阻塞呢 ?
下圖是執行的源碼:
圖片
可以看到,第 30 行調用的是 workQueue 的非阻塞的 offer 方法。
如果隊列已滿,新提交的任務并不會被 block 住,反而會調用后續的 reject 流程。
如果我們想要達到阻塞生產者的目的的話,可以采取如下的兩種方案:
- 信號量限制同時進入線程池等待隊列的任務數 。
圖片
- 使用線程池的拒絕機制,把新加入的任務 put 到等待隊列里,這樣也可以阻塞住生產者。
圖片
四、拉取線程 + Disruptor
下圖展示了 Disruptor 的流程圖 。
圖片
和線程池機制非常類似, Disruptor 也是非常典型的生產者/消費者模式。線程池存儲提交任務的容器是阻塞隊列,而 Disruptor 使用的是環形緩沖區 RingBuffer。
環形緩沖區的設計相比阻塞隊列有如下優點:
- 環形數組結構
為了避免垃圾回收,采用數組而非鏈表。同時,數組對處理器的緩存機制更加友好。
- 元素位置定位
數組長度 2^n,通過位運算,加快定位的速度。下標采取遞增的形式,不用擔心 index 溢出的問題。index 是 long 類型,即使100萬QPS的處理速度,也需要30萬年才能用完。
- 無鎖設計
每個生產者或者消費者線程,會先申請可以操作的元素在數組中的位置,申請到之后,直接在該位置寫入或者讀取數據。
此刻大家并不需要理解環形緩沖區的讀寫機制,只需要明白 環形緩沖區 RingBuffer 是 Disruptor 的精髓即可。
將消費線程池替換成 Disruptor 有兩個明顯的優點:
- 無鎖隊列,寫入讀取性能非常好
- 當拉取線程提交消息到 Disruptor 時,若環形緩沖區 RingBuffer 已經滿了,則拉取線程會阻塞,這樣天然的可以避免無限拉取,同時避免 OOM 的問題。
偽代碼類似:
1.定義 Disruptor
圖片
2.拉取線程將消息發送到 Disruptor Ringbuffer
圖片
3.消費消息
圖片
整體的消費者線程模型如下圖:
圖片
五、平滑停服 + 定時任務補償
當我們分析消費者線程模型時,無論我們使用哪種方式,假如服務器突然宕機、或者物理機斷電,則會丟失消息。
筆者推薦兩種方式:
1.平滑停服
平滑停服是指在停止應用程序時,盡量避免中斷正在進行的請求或任務,盡量讓正在進行的任務處理完成,并且不再接收新的任務,等所有任務執行完成后關閉應用。
在 Unix/Linux 系統中,可以使用 kill 命令發送信號給運行中的進程。
常見的信號有:
- SIGTERM (15):請求進程終止,可以被捕捉和處理,用于優雅地停止進程。
- SIGKILL (9):強制終止進程,不能被捕捉或忽略。
- SIGQUIT (3):進程退出并生成核心轉儲(core dump)。
為了實現平滑停服,可以使用 Java 的 Runtime.getRuntime().addShutdownHook 方法注冊一個關閉鉤子(shutdown hook)。當 JVM 接收到SIGTERM信號時,關閉鉤子會被執行,從而可以在應用程序停止前執行一些清理工作。
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("Shutdown hook triggered. Performing cleanup...");
// 在這里執行清理工作,如關閉資源、保存狀態等
}));
我們可以在鉤子里,關閉拉取線程池 ,優雅關閉消費線程池等 ,這樣可以盡量避免丟失消息。
2.定時任務補償
使用 List 做消息隊列,不可避免的會有消息丟失,所以我們需要用定時任務做補償,每隔一段時間去業務表里查詢業務狀態機,若狀態機不符合條件,則觸發補償策略。
參考資料:
https://www.redis.net.cn/tutorial/3510.html
https://redis.io/docs/latest/develop/data-types/lists/