面試被問到Redis實現發布與訂閱,手摸手教
簡介
Redis發布與發布功能(Pub/Sub)是基于事件座位基本的通信機制,是目前應用比較普遍的通信模型,它的目的主要是解除消息的發布者與訂閱者之間的耦合關系。
Redis作為消息發布和訂閱之間的服務器,起到橋梁的作用,在Redis里面有一個channel的概念,也就是頻道,發布者通過指定發布到某個頻道,然后只要有訂閱者訂閱了該頻道,該消息就會發送給訂閱者,原理圖如下所示:
Redis同時也可以使用list類型實現消息隊列(消息隊列的實現以及應用場景會在下一篇文章繼續講解)。
Redis的發布與訂閱的功能應用還是比較廣泛的,它的應用場景有很多。比如:最常見的就是實現實時聊天的功能,還是有就是博客的粉絲文章的推送,當博主推送原創文章的時候,就會將文章實時推送給博主的粉絲。
簡介完Redis的發布于訂閱功能,下面就要來實操一下,包括linux命令的實操和java代碼的實現。
命令實操
這里就假設各位讀者都已經安裝好自己的虛擬機環境和Redis了,若是沒有安裝好的,可以參考這一篇博文:https://www.cnblogs.com/zuidongfeng/p/8032505.html
我這里是已經安裝好了Redis了,直接啟動我們的Redis,我已經設置好了開機啟動,上面的那篇博文有講解怎么設置開機啟動。
發布消息
Redis中發布消息的命令是publish,具體使用如下所示:
PUBLISH test "haha":test表示頻道的名稱,haha表示發布的內容,這樣就完成了一個一個消息的發布,后面的返回(integer)0表示0人訂閱。
訂閱頻道
于此同時再啟動一個窗口,這個窗口作為訂閱者,訂閱者的命令subscribe,使用SUBSCRIBE test就表示訂閱了test這個頻道
訂閱后返回的結果中由三條信息,第一個表示類型、第二個表示訂閱的頻道,第三個表示訂閱的數量。接著在第一個窗口進行發布消息:
可以看到發布者發布的消息,訂閱者都會實時的接收到,并發訂閱者收到的信息中也會出現三條信息,分別表示:返回值的類型、頻道名稱、消息內容。
取消訂閱
若是想取消之前的訂閱可以使用unsubscribe命令,格式為:
- unsubscribe 頻道名稱
- // 取消之前訂閱的test頻道
- unsubscribe test
輸入命令后,返回以下結果:
- [root@pinyoyougou-docker src]# ./redis-cli
- 127.0.0.1:6379> UNSUBSCRIBE test
- 1) "unsubscribe"
- 2) "test"
- 3) (integer) 0
它分別表示:返回值的類型、頻道的名稱、該頻道訂閱的數量。
按模式訂閱
除了直接以特定的名城進行訂閱,還可以按照模式進行訂閱,模式的方式進行訂閱可以一次訂閱多個頻道,按照模式進行訂閱的命令為psubscribe,具體格式如下:
- psubscribe 模式
- // 表示訂閱名稱以ldc開頭的頻道
- psubscribe ldc*
輸入上面的命令后,返回如下結果:
- 127.0.0.1:6379> PSUBSCRIBE ldc*
- Reading messages... (press Ctrl-C to quit)
- 1) "psubscribe"
- 2) "ldc*"
- 3) (integer) 1
這個也是非常簡單,分別表示:返回的類型(表示按模式訂閱類型)、訂閱的模式、訂閱數。
取消按模式訂閱
假如你想取消之前的按模式訂閱,可以使用punsubscribe來取消,具體格式:
- punsubscribe 模式
- // 取消頻道名稱按照ldc開頭的頻道
- punsubscribe ldc*
他的返回值,如下所示:
- 127.0.0.1:6379> PUNSUBSCRIBE ldc*
- 1) "punsubscribe"
- 2) "ldc*"
- 3) (integer) 0
這個就不多說了,表示的意思和上面的一樣,可以看到上面的命令都是有規律的訂閱SUBSCRIBE,取消就是UNSUBSCRIBE,前面加前綴UN,按模式訂閱也是。
查看訂閱消息
(1)你想查看某一個模式下訂閱數是大于零的頻道,可以使用如下格式的命令進行操作:
- pubsub channels 模式
- // 查看頻道名稱以ldc模式開頭的訂閱數大于零的頻道
- pubsub channels ldc*
(2)假如你想查看某一個頻道的訂閱數,可以使用如下命令:
- pubsub numsub 頻道名稱
(3)查看按照模式的訂閱數,可以使用如下命令進行操作:
- pubsub numpat
到這里以上的命令操作就基本結束了,下面就來代碼實戰。
代碼實練
(1)首先第一步想要操作Redis,再SpringBoot項目中引入jedis的依賴,畢竟jedis是官方推薦使用操作Redis的工具。
- <dependency>
- <groupId>redis.clients</groupId>
- <artifactId>jedis</artifactId>
- <version>2.9.0</version>
- </dependency>
(2)然后創建發布者Publisher,用于消息的發布,具體代碼如下:
- package com.ldc.org.myproject.demo.redis;
- import java.io.BufferedReader;
- import java.io.IOException;
- import java.io.InputStreamReader;
- import redis.clients.jedis.Jedis;
- import redis.clients.jedis.JedisPool;
- /**
- * 發布者
- * @author liduchang
- *
- */
- public class Publisher extends Thread{
- // 連接池
- private final JedisPool jedisPool;
- // 發布頻道名稱
- private String name;
- public Publisher(JedisPool jedisPool, String name) {
- super();
- this.jedisPool = jedisPool;
- this.name = name;
- }
- @Override
- public void run() {
- // 獲取要發布的消息
- BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
- // 獲取連接
- Jedis resource = jedisPool.getResource();
- while (true) {
- String message = null;
- try {
- message = reader.readLine();
- if (!"exit".equals(message)) {
- // 發布消息
- resource.publish(name, "發布者:"+Thread.currentThread().getName()+"發布消息:"+message);
- } else {
- break;
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- }
(3)接著創建訂閱類Subscriber,并且繼承JedisPubSub 類,重寫onMessage、onSubscribe、onUnsubscribe三個方法,這三個方法的調用時機在注釋上都有說明,具體的實現代碼如下:
- package com.ldc.org.myproject.demo.redis;
- import com.fasterxml.jackson.core.sym.Name;
- import redis.clients.jedis.JedisPubSub;
- /**
- * 訂閱者
- * @author liduchang
- */
- public class Subscriber extends JedisPubSub {
- //訂閱頻道名稱
- private String name;
- public Subscriber(String name) {
- this.name = name;
- }
- /**
- * 訂閱者收到消息時會調用
- */
- @Override
- public void onMessage(String channel, String message) {
- // TODO Auto-generated method stub
- super.onMessage(channel, message);
- System.out.println("頻道:"+channel+" 接受的消息為:"+message);
- }
- /**
- * 訂閱了頻道會被調用
- */
- @Override
- public void onSubscribe(String channel, int subscribedChannels) {
- System.out.println("訂閱了頻道:"+channel+" 訂閱數為:"+subscribedChannels);
- }
- /**
- * 取消訂閱頻道會被調用
- */
- @Override
- public void onUnsubscribe(String channel, int subscribedChannels) {
- System.out.println("取消訂閱的頻道:"+channel+" 訂閱的頻道數量為:"+subscribedChannels);
- }
- }
(4)這次創建的才是真正的訂閱者SubThread,上面的Subscriber是指為了測試實訂閱的時候或者發布消息,能夠有信息輸出:
- package com.ldc.org.myproject.demo.redis;
- import redis.clients.jedis.Jedis;
- import redis.clients.jedis.JedisPool;
- /**
- * 訂閱者線程
- * @author liduchang
- *
- */
- public class SubThread extends Thread {
- private final JedisPool jedisPool;
- private final Subscriber subscriber;
- private String name;
- public SubThread(JedisPool jedisPool,Subscriber subscriber,String name) {
- super();
- this.jedisPool = jedisPool;
- this.subscriber = subscriber;
- this.name = name;
- }
- @Override
- public void run() {
- Jedis jedis = null;
- try {
- jedis = jedisPool.getResource();
- // 訂閱頻道為name
- jedis.subscribe(subscriber, name);
- } catch (Exception e) {
- System.err.println("訂閱失敗");
- e.printStackTrace();
- } finally {
- if (jedis!=null) {
- // jedis.close();
- //歸還連接到redis池中
- jedisPool.returnResource(jedis);
- }
- }
- }
- }
(5)后面就是測試了,分別測試發布與訂閱的測試,發布者為TestPublisher,訂閱者為TestSubscriber:
- package com.ldc.org.myproject.demo.redis;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.TimeUnit;
- import redis.clients.jedis.JedisPool;
- public class TestPublisher {
- public static void main(String[] args) throws InterruptedException {
- JedisPool jedisPool = new JedisPool("192.168.163.155");
- // 向ldc頻道發布消息
- Publisher publisher = new Publisher(jedisPool, "ldc");
- publisher.start();
- }
- }
訂閱者
- package com.ldc.org.myproject.demo.redis;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.TimeUnit;
- import redis.clients.jedis.JedisPool;
- public class TestSubscriber1 {
- public static void main(String[] args) throws InterruptedException {
- JedisPool jedisPool = new JedisPool("192.168.163.155",6379);
- Subscriber subscriber = new Subscriber("黎杜");
- // 訂閱ldc頻道
- SubThread thread= new SubThread(jedisPool, subscriber, "ldc");
- thread.start();
- Thread.sleep(600000);
- // 取消訂閱
- subscriber.unsubscribe("ldc");
- }
- }
這里為了測試方便就直接創建線程的方式,更好的話可以使用線程池的方式通過線程池的submit方法來執行線程,若是不用了可以使用shutdown方式關閉。