解鎖 Redis 發(fā)布訂閱模式:通過實踐演示挖掘消息通信潛能
Redis 的發(fā)布訂閱模式,正是這樣一把強大的“通信鑰匙”,它為開發(fā)者們開啟了一扇實現(xiàn)高效消息交互的大門。通過發(fā)布訂閱模式,一個消息生產(chǎn)者可以將消息發(fā)布到指定的頻道中,而多個對該頻道感興趣的消息消費者能夠同時接收到這些消息,實現(xiàn)了消息的一對多廣播。
在本文中,我們將通過詳細的實踐演示,深入探索 Redis 發(fā)布訂閱模式的奧秘。從基礎(chǔ)概念的簡要回顧,到搭建環(huán)境、編寫代碼示例,再到對運行結(jié)果的詳細分析,一步步帶你領(lǐng)略這一強大模式在實際項目中的應(yīng)用魅力,讓你在面對類似的消息通信需求時能夠游刃有余。
一、詳解redis發(fā)布訂閱模式
1. 什么是發(fā)布與訂閱
redis發(fā)布訂閱是一種解耦生產(chǎn)者和消費者一種消息通信模式,訂閱者通過訂閱channel等待最新的消息,發(fā)布者按需將消息發(fā)送到指定channel上供訂閱者消費:
2. redis發(fā)布訂閱模式的使用
3. redis發(fā)布訂閱的兩種模式
redis發(fā)布訂閱有兩種方式:
- 基于精確頻道的訂閱模式,即訂閱者訂閱名為channel-1的頻道,那么只有channel-1有消息時才會通知這些訂閱。
- 基于匹配模型的訂閱模式,即訂閱者可以通過表達式訂閱頻道,例如訂閱者訂閱了channel*的頻道,那么所有前綴為channel的頻道都會向這個訂閱者發(fā)布消息。
4. 基于頻道的訂閱模式
基于頻道訂閱模式的指令格式如下,可以看到訂閱者可以訂閱多個頻道
subscribe channel [channel ...]
所以我們開啟一個redis客戶端,訂閱一個channel:sport的頻道,對應(yīng)的指令和輸出結(jié)果如下,可以看到發(fā)送指令后redis服務(wù)端返回1,通知訂閱者成功訂閱該頻道:
# 客戶端1 訂閱 channel:sport
127.0.0.1:6379> SUBSCRIBE channel:sport
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel:sport"
3) (integer) 1
對應(yīng)的我們也給出發(fā)布消息的指令格式:
publish channel message
此時,我們再開啟一個redis客戶端,發(fā)布一條消息到channel:sport的頻道:
# 另一個客戶端發(fā)送消息
127.0.0.1:6379> PUBLISH channel:sport "this is why we play"
(integer) 1
127.0.0.1:6379>
此時剛剛訂閱消息的redis客戶端就會實時的收到這條消息:
5. 基于匹配模式的發(fā)布與訂閱示例
有時候我們會訂閱多個頻道,我們不可能每次都去手動增加訂閱的頻道,例如我們當前訂閱頻道有:c1、c2、c3、c4、c5、c6、c7、c8。將來還可能出現(xiàn)c9等情況。我們不可能實時去添加訂閱的頻道。 觀察上面的頻道我們發(fā)現(xiàn)頻道都是以c開頭,后續(xù)的數(shù)字不斷變化,所以我們完全可以使用模式匹配來實現(xiàn)頻道訂閱。
模式訂閱和取消的命令為:
psubscribe pattern [pattern...]
punsubscribe [pattern [pattern ...]]
關(guān)于parttern常見的匹配符有:
- :表示任意占位符,例如c,可以匹配c、c1、c111
- ?*:匹配一個及以上個占位符
- ?:表示匹配一個占位符
我們希望訂閱c1-c9的頻道基于模式匹配我們就能夠做到這一點。
我們首先開啟一個客戶端,使用模式匹配發(fā)起訂閱:
# 訂閱匹配cxx相關(guān)的模式
PSUBSCRIBE c?*
然后我們在開啟另一個客戶端,發(fā)送消息到c1頻道:
127.0.0.1:6379> PUBLISH c1 "this is c1 message"
(integer) 1
127.0.0.1:6379>
剛剛訂閱的客戶端就會收到消息:
127.0.0.1:6379> PSUBSCRIBE c?*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "c?*"
3) (integer) 1
1) "pmessage"
2) "c?*"
3) "c1"
4) "this is c1 message"
注意:當你訂閱PSUBSCRIBE c?* c1訂閱時,若另一個客戶端發(fā)送消息到c1你會收到兩條消息(原因會在后文源碼解析時補充)。
如下便是PSUBSCRIBE c?* c1的收到PUBLISH c1 "this is c1 message"的消息內(nèi)容:
# 訂閱c?* 和精確的c1兩個頻道
127.0.0.1:6379> PSUBSCRIBE c?* c1
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "c?*"
3) (integer) 1
1) "psubscribe"
2) "c1"
3) (integer) 2
1) "pmessage"
2) "c1"
3) "c1"
4) "this is c1 message"
1) "pmessage"
2) "c?*"
3) "c1"
4) "this is c1 message"
6. 基于spring boot集成redis落地發(fā)布與訂閱模式
我們將使用一個用戶訂閱channel:sport,一個用戶訂閱channel:stock,而另一個用戶則通過channel*訂閱兩個都訂閱:
了解了大體思路,我們就開始落地這個需求,首先第一步還是引入redis的依賴:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
然后在配置文件中給出redis文件的配置信息:
spring.redis.host=127.0.0.1
spring.redis.port=6379
隨后就是定義3個訂閱者的處理器的通用接口定義,后續(xù)我們將通過函數(shù)時編程來分別落地3個訂閱者的處理器:
public interface RedisMsgHandler {
/**
* 處理發(fā)布者發(fā)送的消息
* @param message
*/
void handleMessage(String message);
}
然后我們就可以針對redis不同的訂閱者進行配置進行聲明:
@Configuration
@EnableCaching
@Slf4j
public class RedisConfig {
/**
* Redis消息監(jiān)聽器容器
*
* @param connectionFactory
* @return
*/
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//訂閱體育消息
container.addMessageListener(listenerAdapter(msg -> log.info("收到體育新聞,消息內(nèi)容:{}", msg)), new PatternTopic("channel:sport"));
//訂閱股票消息
container.addMessageListener(listenerAdapter(msg -> log.info("收到庫存消息,消息內(nèi)容:{}", msg)), new PatternTopic("channel:stock"));
//channel前綴的消息都訂閱
container.addMessageListener(listenerAdapter(msg -> log.info("收到channel前綴的消息:{}", msg)), new PatternTopic("channel:*"));
return container;
}
/**
* 配置消息接收處理類
*
* @return
*/
@Bean
@Scope("prototype")
MessageListenerAdapter listenerAdapter(RedisMsgHandler handler) {
//這個地方 是給messageListenerAdapter 傳入一個消息接受的處理器,利用反射的方法調(diào)用“receiveMessage”
//也有好幾個重載方法,這邊默認調(diào)用處理器的方法 叫 handleMessage 可以自己到源碼里面看
return new MessageListenerAdapter(handler, "handleMessage");//注意2個通道調(diào)用的方法都要為 receiveMessage
}
}
最后我們啟動一個定時器定時向頻道發(fā)送消息:
@EnableScheduling
@Component
public class SenderTask {
@Autowired
private StringRedisTemplate stringRedisTemplate;
//向redis消息隊列index通道發(fā)布消息
@Scheduled(fixedRate = 2000)
public void sendMessage() {
stringRedisTemplate.convertAndSend("channel:stock", "股票漲了"+Math.random()+"個百分點");
stringRedisTemplate.convertAndSend("channel:sport", "籃球新聞 No."+((int)(Math.random()*100)+1)+"選手得分");
}
}
最后運行結(jié)果如下所示:
2025-01-11 22:39:03.129 INFO 16952 --- [ container-3] com.sharkChili.RedisConfig : 收到channel前綴的消息:股票漲了0.5562952653668436個百分點
2025-01-11 22:39:03.129 INFO 16952 --- [ container-4] com.sharkChili.RedisConfig : 收到體育新聞,消息內(nèi)容:籃球新聞 No.20選手得分
2025-01-11 22:39:03.129 INFO 16952 --- [ container-5] com.sharkChili.RedisConfig : 收到channel前綴的消息:籃球新聞 No.20選手得分
2025-01-11 22:39:05.121 INFO 16952 --- [ container-6] com.sharkChili.RedisConfig : 收到庫存消息,消息內(nèi)容:股票漲了0.47411355078897777個百分點
2025-01-11 22:39:05.122 INFO 16952 --- [ container-7] com.sharkChili.RedisConfig : 收到channel前綴的消息:股票漲了0.47411355078897777個百分點
2025-01-11 22:39:05.135 INFO 16952 --- [ container-9] com.sharkChili.RedisConfig : 收到channel前綴的消息:籃球新聞 No.15選手得分
2025-01-11 22:39:05.135 INFO 16952 --- [ container-8] com.sharkChili.RedisConfig : 收到體育新聞,消息內(nèi)容:籃球新聞 No.15選手得分
2025-01-11 22:39:07.135 INFO 16952 --- [ container-11] com.sharkChili.RedisConfig : 收到channel前綴的消息:股票漲了0.886860372468581個百分點
2025-01-11 22:39:07.135 INFO 16952 --- [ container-10] com.sharkChili.RedisConfig : 收到庫存消息,消息內(nèi)容:股票漲了0.886860372468581個百分點
2025-01-11 22:39:07.136 INFO 16952 --- [ container-12] com.sharkChili.RedisConfig : 收到體育新聞,消息內(nèi)容:籃球新聞 No.89選手得分
2025-01-11 22:39:07.136 INFO 16952 --- [ container-13] com.sharkChili.RedisConfig : 收到channel前綴的消息:籃球新聞 No.89選手得分
7. 發(fā)布訂閱的常見的使用場景和優(yōu)缺點
使用場景如:聊天室、公告牌、異步處理電商訂單非核心操作等需要實現(xiàn)消息解耦的場景都可以使用消息訂閱發(fā)布,如下所示,這就是電商下單業(yè)務(wù)對于redis發(fā)布訂閱模式的使用模型圖:
當然redis發(fā)布訂閱模式優(yōu)缺點也很明顯,我們先來說說優(yōu)點,redis發(fā)布訂閱模式 實現(xiàn)簡單,對于簡單的解耦生產(chǎn)者和消費者關(guān)系的應(yīng)用場景綽綽有余,而缺點也是一樣,因為redis發(fā)布訂閱模式實現(xiàn)比較簡單,并沒有持久化機制無法保證可靠消費和故障恢復(fù),同時,相較于Kafka、RocketMQ,Redis的實現(xiàn)略顯粗糙,無法實現(xiàn)消息堆積與回溯。
二、redis如何實現(xiàn)發(fā)布訂閱模型
關(guān)于redis發(fā)布訂閱模型,對于底層實現(xiàn)感興趣的讀者,可以參考筆者下面這篇關(guān)于pub/sub的源碼解析:《聊聊 Redis 的發(fā)布訂閱設(shè)計與實現(xiàn)》
三、小結(jié)
本文從redis發(fā)布訂閱模型基本介紹和幾個實踐案例角度帶讀者快速入門了一下redis發(fā)布訂閱模型的使用場景以及一些注意事項,希望對你有幫助。