Redis List 是否適合做消息隊列?Spring Boot 與 Redission 實現 Redis 消息隊列!
分布式系統中必備的一個中間件就是消息隊列,通過消息隊列你能對服務間進行異步解耦、流量消峰、實現最終一致性。
目前市面上已經有 RabbitMQ、RochetMQ、ActiveMQ、Kafka等,有人會問:“Redis 適合做消息隊列么?”
在回答這個問題之前,你先從本質思考。
- 消息隊列提供了什么特性?
- Redis 如何實現消息隊列?是否滿足存取需求?
我將結合消息隊列的特點,分析使用 Redis 的 List 作為消息隊列的實現原理,并分享如何把 SpringBoot 與 Redission 整合來操作 Redis 運用到項目中。
什么是消息隊列
消息隊列是一種異步的服務間通信方式,適用于分布式和微服務架構。消息在被處理和刪除之前一直存儲在隊列上。
每條消息僅可被一位用戶處理一次。消息隊列可被用于分離重量級處理、緩沖或批處理工作以及緩解高峰期工作負載。
- Producer:消息生產者,負責產生和發送消息到 Broker;
- Broker:消息處理中心。負責消息存儲、確認、重試等,一般其中會包含多個 queue;
- Consumer:消息消費者,負責從 Broker 中獲取消息,并進行相應處理;
MySQL:“消息隊列的使用場景有哪些呢?“
消息隊列在實際應用中包括如下四個場景。
- 應用耦合:發送方、接收方系統之間不需要了解雙方,只需要認識消息。多應用間通過消息隊列對同一消息進行處理,避免調用接口失敗導致整個過程失敗。
- 異步處理:多應用對消息隊列中同一消息進行處理,應用間并發處理消息,相比串行處理,減少處理時間。
- 限流削峰:廣泛應用于秒殺或搶購活動中,避免流量過大導致應用系統掛掉的情況。
- 消息驅動的系統:系統分為消息隊列、消息生產者、消息消費者,生產者負責產生消息,消費者(可能有多個)負責對消息進行處理。
消息隊列滿足哪些特性
消息有序性
消息是異步處理的,但是消費者需要按照生產者發送消息的順序來消費,避免出現后發送的消息被先處理的情況。
重復消息處理
生產者可能因為網絡問題出現消息重傳導致消費者可能會收到多條重復消息。
同樣的消息重復多次的話可能會造成一業務邏輯多次執行,需要確保如何避免重復消費問題。
可靠性
一次保證消息的傳遞。如果發送消息時接收者不可用,消息隊列會保留消息,直到成功地傳遞它。
當消費者重啟后,可以繼續讀取消息進行處理,防止消息遺漏。
LPUSH
生產者使用 LPUSH key element[element...] 將消息插入到隊列的頭部,如果 key 不存在則會創建一個空的隊列再插入消息。
如下,生產者向隊列 queue 先后插入了 “Java”、“碼哥字節”、“Go”,返回值表示消息插入隊列后的個數。
> LPUSH queue Java 碼哥字節 Go
(integer) 3
MySQL:“如果生產者消息發送很快,消費者處理不過來,會導致消息積壓,占用過多的 Redis 內存?!?/p>
確實,List 并沒有提供類似于 Kafka 的 ConsumeGroup ,會使用多個消費者策劃給你續組成一個消費組來分擔處理隊列消息。不過在 Redis 5.0 之后,提供了 Streams 數據類型,后面我會介紹到。
RPOP
消費者使用 RPOP key 依次讀取隊列的消息,先進先出,所以 “Java”會先讀取消費:
> RPOP queue
"Java"
> RPOP queue
"碼哥字節"
> RPOP queue
"Go"
圖2-13
實時消費問題
謝霸戈:“這么簡單就實現了?”
別高興的太早,LPUSH、RPOP 存在一個性能風險,生產者向隊列插入數據的時候,List 并不會主動通知消費者及時消費。
謝霸戈:“那我寫一個 while(true) 不停地調用 RPOP 指令,當有新消息就消費“
程序需要不斷輪詢并判斷是否為空再執行消費邏輯,這就會導致即使沒有新消息寫入隊列,消費者也在不停地調用 RPOP 命令占用 CPU 資源。
謝霸戈:“如何避免循環調用導致的 CPU 性能損耗呢?”
請叫我貼心哥 Redis,我提供了 BLPOP、BRPOP 阻塞讀取的命令,消費者在讀取隊列沒有數據的時候自動阻塞,直到有新的消息寫入隊列,才會繼續讀取新消息執行業務邏輯。
BRPOP queue 0
參數 0 表示阻塞等待時間無止期,哪怕是煙花易冷人事易分,雨紛紛舊故里草木深,斑駁的城門盤踞著老樹根,石板上回蕩的是再等,一直等到“心上人”來。
重復消費解決方案
- 消息隊列為自動每一條消息生成一個全局 ID;
- 生產者為每一條消息創建一個全局 ID,消費者把處理過的消息 ID 記錄下來判斷是否重復。
其實這就是冪等,對于同一條消息,消費者收到后處理一次的結果和多次的結果是一致的。
消息可靠性解決方案
謝霸戈:“消費者讀取消息,處理過程中宕機了就會導致消息沒有處理完成,可是數據已經不在隊列中了咋辦?”
本質就是消費者在處理消息的時候崩潰了,無法再讀取消息,缺乏一個消息確認可靠機制。
我提供了 BRPOPLPUSH source destination timeout指令,含義是阻塞的方式從 source 隊列讀取消息的同時把這條消息復制到另一個 destination 隊列中(備份),并且是原子操作。
不過這個指令在 6.2 版本被 BLMOVE 取代。接下來,上才藝!生產者使用 LPUSH 把消息依次從存入 order:pay 隊列隊頭(左端)。
LPUSH order:pay "謝霸戈"
LPUSH order:pay "肖材吉"
消費者消費消息的時候在 while循環使用BLMOVE 以阻塞的方式從隊列 order:pay 隊尾(右端)彈出消息“謝霸戈”,同時把該消息復制到隊列 order:pay:back 隊頭(左端),該操作是原子性的,最后一個參數 timeout = 0 表示持續等待。
BLMOVE order:pay order:pay:back RIGHT LEFT 0
如果消費消息“謝霸戈”成功,那就使用 LREM 把隊列 order:pay:back 的“謝霸戈”消息刪除,從而實現 ACK 確認機制。
LREM order:pay:back 0 "謝霸戈"
倒數第二個參數 count 的含義如下。
- count > 0,從表頭(左端)向表尾(右端),依次刪除 count 個 value。
- count < 0,從表尾(右端)向表頭(左端),依次刪除 count 絕對值個 value。
- count = 0,刪除所有的 value。
消費異常的話,應用程序使用 BRPOP order:pay:back 從備份隊列再次讀取消息處理即可。
Redisson 實戰
在 Java 中,你可以利用 Redission 封裝的 API 來快速實現隊列,接下來我將基于 SpringBoot 2.1.4 版本來教你如何整合 Redisson。
添加依賴
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.23.3</version>
</dependency>
application.yaml引入 Redisson 配置文件。
spring:
application:
name: redission
redis:
redisson:
file: classpath:redisson-config.yaml
創建 redisson-config.yaml 配置。
singleServerConfig:
idleConnectionTimeout: 10000
connectTimeout: 10000
timeout: 3000
retryAttempts: 3
retryInterval: 1500
password: magebyte
subscriptionsPerConnection: 5
clientName: redissonClient
address: "redis://127.0.0.1:6379"
subscriptionConnectionMinimumIdleSize: 1
subscriptionConnectionPoolSize: 50
connectionMinimumIdleSize: 24
connectionPoolSize: 64
database: 0
dnsMonitoringInterval: 5000
threads: 16
nettyThreads: 32
codec: !<org.redisson.codec.Kryo5Codec> {}
transportMode: "NIO"
在代碼中,我使用的是阻塞雙端隊列,消費者開啟死循環,執行 BLMOVE
指令。
@Slf4j
@Service
public class QueueService {
@Autowired
private RedissonClient redissonClient;
private static final String ORDER_PAY_QUEUE = "order:pay";
private static final String ORDER_PAY_BACK_QUEUE = "order:pay:back";
/**
* 生產者發送消息到隊列頭部
*
* @param message
*/
public void sendMessage(String message) {
RBlockingDeque<String> orderPayQueue = redissonClient.getBlockingDeque(ORDER_PAY_QUEUE);
try {
orderPayQueue.putFirst(message);
log.info("將消息: {} 插入到隊列 {}。", message, ORDER_PAY_QUEUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 消費者消費消息
*/
public void onMessage() {
RBlockingDeque<String> orderPayQueue = redissonClient.getBlockingDeque(ORDER_PAY_QUEUE);
while (true) {
// BLMOVE order:pay order:pay:back RIGHT LEFT 0
String message = orderPayQueue.move(Duration.ofSeconds(0), DequeMoveArgs.pollLast()
.addFirstTo(ORDER_PAY_BACK_QUEUE));
log.info("從隊列 {} 中讀取到消息:{},并把消息復制到 {} 隊列.", ORDER_PAY_QUEUE, message, ORDER_PAY_BACK_QUEUE);
// 消費正常,從 ORDER_PAY_BACK_QUEUE 刪除這條消息,LREM order:pay:back 0 message
removeBackQueueMessage(message, ORDER_PAY_BACK_QUEUE);
}
}
/**
* 從隊列中刪除消息
* @param message
* @param queueName
*/
private void removeBackQueueMessage(String message, String queueName) {
RBlockingDeque<String> orderPayBackDeque = redissonClient.getBlockingDeque(queueName);
boolean remove = orderPayBackDeque.remove(message);
log.info("消費正常,刪除隊列 {} 的消息 {}。", queueName, message);
}
}
單元測試
RunWith(SpringRunner.class)
@SpringBootTest(classes = RedissionApplication.class)
public class RedissionApplicationTests {
@Autowired
private QueueService queueService;
@Test
public void testQueue() throws InterruptedException {
new Thread(() -> {
for (int i = 0; i < 1000; i++) {
queueService.sendMessage("消息" + i);
}
}).start();
new Thread(() -> queueService.onMessage()).start();
Thread.currentThread().join();
}
}
總結
可以使用 List 數據結構來實現消息隊列,滿足先進先出。
Redis 是一個非常輕量級的鍵值數據庫,部署一個 Redis 實例就是啟動一個進程,部署 Redis 集群,也就是部署多個 Redis 實例。
而 Kafka、RabbitMQ 部署時,涉及額外的組件,例如 Kafka 的運行就需要再部署 ZooKeeper。相比 Redis 來說,Kafka 和 RabbitMQ 一般被認為是重量級的消息隊列。
需要注意的是,我們要避免生產者過快,消費者過慢導致的消息堆積占用 Redis 的內存。
在消息量不大的情況下使用 Redis 作為消息隊列,他能給我們帶來高性能的消息讀寫,這似乎也是一個很好消息隊列解決方案。