成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Redis5新特性Streams作消息隊列

數據庫 其他數據庫 Redis
本文所使用 Redis 版本為 5.0.5 。如果使用更早的 5.x 版本,有些 API 使用效果,與本文中描述略有不同。

前言

Redis 5 新特性中,Streams 數據結構的引入,可以說它是在本次迭代中最大特性。它使本次 5.x 版本迭代中,Redis 作為消息隊列使用時,得到更完善,更強大的原生支持,其中尤為明顯的是持久化消息隊列。同時,stream 借鑒了 kafka 的消費組模型概念和設計,使消費消息處理上更加高效快速。本文就 Streams 數據結構中常用 API 進行分析。

準備

本文所使用 Redis 版本為 5.0.5 。如果使用更早的 5.x 版本,有些 API 使用效果,與本文中描述略有不同。

添加消息

Streams 添加數據使用 XADD 指令進行添加,消息中的數據以 K-V 鍵值對的形式進行操作。一條消息可以存在多個鍵值對,添加命令格式: 

  1. XADD key ID field string [field string ...] 

其中 key 為 Streams 的名稱,ID 為消息的唯一標志,不可重復,field string 就為鍵值對。下面我們就添加以 person 為名稱的流,進行操作。 

  1. XADD person * name ytao des https://ytao.top 

上面添加案例中,ID 使用 * 號復制,這里代表著服務端自動生成 Id,添加后返回數據 "1578238486193-0"

這里自動生成的 Id 格式為 <millisecondstime>-<sequencenumber> Id 是由兩部分組成:

  1.  millisecondsTime 為當前服務器時間毫秒時間戳。
  2.  sequenceNumber 當前序列號,取值來源于當前毫秒內,生成消息的順序,默認從 0 開始加 1 遞增。

比如:1578238486193-3 表示在 1578238486193 毫秒的時間戳時,添加的第 4 條消息。

除了服務端自動生成 Id 方式外,也支持指定 Id 的生成,但是指定 Id 有以下條件限制:

  1.  Id 中的前后部分必須為數字。
  2.  最小 Id 為 0-1,不能為 0-0,但是 2-0,3-0 .... 是被允許的。
  3.  添加的消息,Id 的前半部分不能比存在 Id 最大的值小,Id 后半部分不能比存在前半部分相同的最大后半部分小。

否則,當不滿足上述條件時,添加后會拋出異常: 

  1. (error) ERR The ID specified in XADD is equal or smaller than the target stream top item 

實際上,當添加一條消息時,會進行兩部操作。第一步,先判斷如果不存在 Streams,則創建 Streams 的名稱,再添加消息到 Streams 中。即使添加消息時,由于 Id 異常,也可以在 Redis 中存在以當前 Streams 的名稱。 Streams 中 Id 也可作為指針使用,因為它是一個有序的標記。

生產中,如果這樣使用添加消息,會存在一個問題,那就是消息數量太大時,會使服務宕機。這里 Streams 的設計初期也有考慮到這個問題,那就是可以指定 Streams 的容量。如果容量操作這個設定的值,就會對調舊的消息。在添加消息時,設置 MAXLEN 參數。 

  1. XADD person MAXLEN 5 * name ytao des https://ytao.top 

這樣就指定該了 Streams 中的容量為 5 條消息。也可使用 XTRIM 截取消息,從小到大剔除多余的消息: 

  1. XTRIM person MAXLEN 8 

消息數量

查看消息數量使用 XLEN 指令進行操作。 

  1. XLEN key 

例:查看 person 流中的消息數量: 

  1. > XLEN person  
  2. (integer) 5 

查詢消息

查詢 Streams 中的消息使用 XRANGE 和 XREVRANGE 指令。

XRANGE

查詢數據時,可以按照指定 Id 范圍進行查詢,XRANGE 查詢指令格式: 

  1. XRANGE key start end [COUNT count] 

參數說明:

  •  key 為 Streams 的名稱
  •  start 為范圍查詢開始 Id,包含本 Id。
  •  start 為范圍查詢結束 Id,包含本 Id。
  •  Count 為查詢返回最大的消息數量,非必填。

這里 start 和 end 有-和+兩個非指定值,他們分別表示無窮小和無窮大,所以當使用這個兩個值時,會查詢出全部的消息。 

  1. > XRANGE person - +  
  2. 1) 1) "0-1"  
  3.    2) 1) "name"  
  4.       2) "ytao"  
  5.       3) "des"  
  6.       4) "https://ytao.top"  
  7. 2) 1) "0-2"  
  8.    2) 1) "name"  
  9.       2) "luffy"  
  10.       3) "des"  
  11.       4) "valiant!"  
  12. 3) 1) "2-0"  
  13.    2) 1) "name"  
  14.       2) "gaga"  
  15.       3) "des"  
  16.       4) "fishion!" 

上面查詢的消息數據,可以看到是按照先進先出的順序查詢出來的。

使用 COUNT 指定查詢返回的數量: 

  1. # 查詢所有的消息,并且返回一條數據  
  2. > XRANGE person - + COUNT 1  
  3. 1) 1) "0-1"  
  4.    2) 1) "name"  
  5.       2) "ytao"  
  6.       3) "des"  
  7.       4) "https://ytao.top" 

在范圍查詢中,Id 的后半部分可省略,后半部分中的數據會全部查詢到。

XREVRANGE

XREVRANGE 的查詢和 XRANGE 指令中的使用類似,但查詢的 start 和 end 參數順序進行了調換: 

  1. XREVRANGE key end start [COUNT count] 

使用案例: 

  1. > XREVRANGE person +  -  
  2. 1) 1) "2-0"  
  3.    2) 1) "name"  
  4.       2) "gaga"  
  5.       3) "des"  
  6.       4) "fishion!"  
  7. 2) 1) "0-2"  
  8.    2) 1) "name"  
  9.       2) "luffy"  
  10.       3) "des"  
  11.       4) "valiant!"  
  12. 3) 1) "0-1"  
  13.    2) 1) "name"  
  14.       2) "ytao"  
  15.       3) "des"  
  16.       4) "https://ytao.top" 

查詢后的結果與 XRANGE 的結果順序剛好相反,其他都一樣,這兩個指令可進行消息的升序和降序的返回。

刪除消息

刪除消息使用 XDEL 指令操作,只需指定將要刪除的 Streams 名稱和 Id 即可,支持一次刪除多個消息 。 

  1. XDEL key ID [ID ...] 

刪除案例: 

  1. # 查詢所有消息  
  2. > XRANGE person - +  
  3. 1) 1) "0-1"  
  4.    2) 1) "name"  
  5.       2) "ytao"  
  6.       3) "des"  
  7.       4) "https://ytao.top"  
  8. 2) 1) "0-2"  
  9.    2) 1) "name"  
  10.       2) "luffy"  
  11.       3) "des"  
  12.       4) "valiant!"  
  13. 3) 1) "2-0"  
  14.    2) 1) "name"  
  15.       2) "gaga"  
  16.       3) "des"  
  17.       4) "fishion!"  
  18. # 刪除消息        
  19. > XDEL person 2-0  
  20. (integer) 1  
  21. # 再次查詢刪除后的所有消息  
  22. > XRANGE person - +  
  23. 1) 1) "0-1"  
  24.    2) 1) "name"  
  25.       2) "ytao"  
  26.       3) "des"  
  27.       4) "https://ytao.top"  
  28. 2) 1) "0-2"  
  29.    2) 1) "name"  
  30.       2) "luffy"  
  31.       3) "des"  
  32.       4) "valiant!"  
  33. # 查詢刪除后的長度        
  34. > XLEN person  
  35. (integer) 2        

從上面可以看到,刪除消息后,長度也會減少相應的數量。

消費消息

在 Redis 的 PUB/SUB 中,我們是通過訂閱來消費消息,在 Streams 數據結構中,同樣也能實現同等功能,當沒有新的消息時,可進行阻塞等待。不僅支持單獨消費,而且還可以支持群組消費。

單獨消費

單獨消費使用 XREAD 指令。可以看到,下面命令中,STREAMS,key, 以及 ID 為必填項。ID 表示將要讀取大于該 ID 的消息。當 ID 值使用 $ 賦予時,表示已存在消息的最大 Id 值。 

  1. XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...] 

上面的 COUNT 參數用來指定讀取的最大數量,與 XRANGE 的用法一樣。 

  1. > XREAD COUNT 1 STREAMS person 0  
  2. 1) 1) "person"  
  3.    2) 1) 1) "0-1"  
  4.          2) 1) "name"  
  5.             2) "ytao"  
  6.             3) "des"  
  7.             4) "https://ytao.top"  
  8. > XREAD COUNT 2 STREAMS person 0  
  9. 1) 1) "person"  
  10.    2) 1) 1) "0-1"  
  11.          2) 1) "name"  
  12.             2) "ytao"  
  13.             3) "des"  
  14.             4) "https://ytao.top"  
  15.       2) 1) "0-2"  
  16.          2) 1) "name"  
  17.             2) "luffy"  
  18.             3) "des"  
  19.             4) "valiant!" 

在 XREAD 里面還有個 BLOCK 參數,這個是用來阻塞訂閱消息的,BLOCK 攜帶的參數為阻塞時間,單位為毫秒,如果在這個時間內沒有新的消息消費,那么就會釋放該阻塞。當這里的時間指定為 0 時,會一直阻塞,直到有新的消息來消費到。 

  1. # 窗口 1 開啟阻塞,等待新消息的到來  
  2. > XREAD BLOCK 0 STREAMS person $  
  3. # 另開一個連接窗口 2,添加一條新的消息  
  4. > XADD person 2-2 name tao des coder  
  5. "2-2"  
  6. # 窗口 1,獲取到有新的消息來消費,并且帶有阻塞的時間  
  7. > XREAD BLOCK 0 STREAMS person $  
  8. 1) 1) "person"  
  9.    2) 1) 1) "2-2"  
  10.          2) 1) "name"  
  11.             2) "tao"  
  12.             3) "des"  
  13.             4) "coder"  
  14. (60.81s) 

當使用 XREAD 進行順序消費時,需要額外記錄下讀取到位置的 Id,方便下次繼續消費。

群組消費

群組消費的主要目的也就是為了分流消息給不同的客戶端處理,以更高效的速率處理消息。為達到這一肝功能需求,我們需要做三件事:創建群組,群組讀取消息,向服務端確認消息以處理。

群組操作

操作群組使用 XGROUP 指令: 

  1. XGROUP [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername] 

上面命令中,包含操作有:

  •  CREATE 創建消費組。
  •  SETID 修改下一個處理消息的 Id。
  •  DESTROY 銷毀消費組。
  •  DELCONSUMER 刪除消費組中指定的消費者。

我們當前需要使用的是創建消費組: 

  1. # 以當前存在的最大 Id 作為消費起始   
  2. > XGROUP CREATE person group1 $  
  3. OK 

群組讀取消息

群組讀取使用 XREADGROUP 指令,COUNT和BLOCK的使用類似 XREAD 的操作,只是多了個群組和消費者的指定: 

  1. XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...] 

由于群組消費和單獨消費類似,這里只進行個阻塞分析,這里 Id 也有個特殊值>,表示還未進行消費的消息: 

  1. # 窗口 1,消費群組中,taotao 消費者建立阻塞監聽  
  2. XREADGROUP GROUP group1 taotao BLOCK 0 STREAMS person >  
  3. # 窗口 2,消費群組中,yangyang 消費者建立阻塞監聽   
  4. XREADGROUP GROUP group1 yangyang BLOCK 0 STREAMS person >  
  5. # 窗口 3,添加消費消息  
  6. > XADD person 3-1 name tony des 666  
  7. "3-1"  
  8. # 窗口 1,讀取到新消息,此時 窗口 2 沒有任何反應  
  9. > XREADGROUP GROUP group1 taotao BLOCK 0 STREAMS person >  
  10. 1) 1) "person"  
  11.    2) 1) 1) "3-1"  
  12.          2) 1) "name"  
  13.             2) "tony"  
  14.             3) "des"  
  15.             4) "666"  
  16. (77.54s)  
  17. # 窗口 3,再次添加消費消息  
  18. > XADD person 3-2 name james des abc!  
  19. "3-2"  
  20. # 窗口 2,讀取到新消息,此時 窗口 1 沒有任何反應  
  21. > XREADGROUP GROUP group1 yangyang BLOCK 0 STREAMS person >  
  22. 1) 1) "person"  
  23.    2) 1) 1) "3-2"  
  24.          2) 1) "name"  
  25.             2) "james"  
  26.             3) "des"  
  27.             4) "abc!"  
  28. (76.36s) 

以上執行流程中,group1 群組中有兩個消費者,當添加兩條消息后,這兩個消費者輪流消費。

消息ACK

消息消費后,為避免再次重復消費,這是需要向服務端發送 ACK,確保消息被消費后的標記。 例如下列情況,我們上面我們將最新兩條消息已進行了消費,但是當我們再次讀取消息時,還是被讀到: 

  1. >  XREADGROUP GROUP group1 yangyang STREAMS person 0  
  2. 1) 1) "person"  
  3.    2) 1) 1) "3-2"  
  4.          2) 1) "name"  
  5.             2) "james"  
  6.             3) "des"  
  7.             4) "abc!" 

這時,我們使用 XACK 指令告訴服務器,我們已處理的消息: 

  1. XACK key group ID [ID ...]0 

讓服務器標記 3-2 已處理: 

  1. > XACK person group1 3-2  
  2. (integer) 1 

再次獲取群組讀取消息: 

  1. >  XREADGROUP GROUP group1 yangyang STREAMS person 0  
  2. 1) 1) "person"  
  3.    2) (empty list or set) 

隊列中沒有了可讀消息。 除了上面以講解到的 API 外,查看消費群組信息可使用 XINFO 指令查看,本文不做分析。

總結

上面對 Streams 常用 API 進行了分析,我們可以感受到 Redis 在消息隊列支持的道路上,也越來越強大。如果使用過它的 PUB/SUB 功能的話,就會感受到 5.x 迭代正是將你的一些痛點進行了優化。 

 

責任編輯:龐桂玉 來源: 中國開源
相關推薦

2021-01-12 08:43:29

Redis ListStreams

2022-04-12 11:15:31

Redis消息隊列數據庫

2009-06-29 17:42:03

Tapestry5新特

2011-08-30 09:07:30

HTML 5

2018-07-30 08:37:02

數據庫Redis數據結構

2024-03-22 12:10:39

Redis消息隊列數據庫

2018-12-05 09:00:00

RedisRedis Strea數據庫

2024-10-25 08:41:18

消息隊列RedisList

2011-07-12 13:21:34

2011-11-09 10:05:26

HTML 5

2021-03-06 08:10:16

Redis6 Java架構分布式框架

2023-12-30 13:47:48

Redis消息隊列機制

2024-04-19 08:32:07

Redis緩存數據庫

2017-10-11 15:08:28

消息隊列常見

2009-09-25 10:23:51

HTML 5新特性

2014-04-15 15:45:22

Java8Java8教程

2021-07-19 07:55:24

多線程模型Redis

2022-01-15 07:20:18

Redis List 消息隊列

2022-01-21 19:22:45

RedisList命令

2011-04-25 14:20:49

DojoHTML 5
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 人操人人干人 | 中文字幕 欧美 日韩 | 国产在线麻豆精品入口 | 看a级黄色毛片 | 色婷婷精品国产一区二区三区 | 国产在线小视频 | 国产精品无码久久久久 | 国产精品资源在线 | 91视视频在线观看入口直接观看 | 日韩一区二区三区在线视频 | 久久久久久免费毛片精品 | 国产精品美女在线观看 | 国产日本精品视频 | 一区二区三区四区不卡视频 | 99re6在线 | 天天拍天天操 | 香蕉婷婷| 欧美精品日韩精品国产精品 | 欧美一级毛片免费观看 | 麻豆久久久久久久久久 | 国产精品久久欧美久久一区 | 成人在线视频免费观看 | 午夜一区二区三区在线观看 | 久久51| 蜜臀91视频| 99国产精品久久久 | 99精品在线观看 | 成人免费毛片在线观看 | 最新中文字幕在线 | 黄色av网站免费看 | 国产精品久久久久久久久大全 | 天天干 夜夜操 | 精品国产一区二区三区性色av | 久久福利电影 | 国产精品一区二区免费 | 免费在线色 | 免费黄色片视频 | 国产一区二区在线免费观看 | 中文字幕视频在线看5 | 国产精品99久久久久久人 | 99re免费|