聊聊 Redis 的發布訂閱設計與實現
借一個午休的時光整理一下關于redis發布訂閱源碼的設計與實現,通過本文的閱讀,你將會對發布訂閱模型的設計思想以及對哨兵間選舉通信的流程有著更底層的視角。
一、詳解redis中發布訂閱的設計
1. channel的設計
redis服務端啟動時,會初始化一個記錄channel以及channel訂閱者的鍵值對結構,它用channel的名稱作為key,用一個鏈表記錄這個訂閱這個channel的客戶端:
對此我們給出redis初始化函數initServer的代碼片段,可以看到其內部調用dictCreate方法為pubsub_channels 這個記錄channel和channel訂閱者的指針初始化了一個頻道名稱為key,鏈表為value的字典:
void initServer(void) {
//......
// 初始化pubsub_channels存儲頻道信息,keylistDictType用頻道名稱作為key,訂閱者list作為value
server.pubsub_channels = dictCreate(&keylistDictType,NULL);
//......
}
2. pub/sub的實現
當客戶端1通過SUBSCRIBE mychannel訂閱mychannel這個頻道,本質上就是redis服務端解析SUBSCRIBE指令并調用subscribeCommand函數,該方法會檢查這個channel是否存在,如果不存在則則以channel名稱為key,初始化一個鏈表作為value,將訂閱這個channel的客戶端追加到鏈表中。反之,如果channel存在則直接將客戶端信息存入鏈表即可:
圖片==
對此我們給出對應的源碼實現,該函數subscribeCommand位于pubsub.c可以看到其入口邏輯就是遍歷參數得到當前客戶端想訂閱的頻道,然后調用pubsubSubscribeChannel將該客戶端追加到這個頻道的鏈表上:
void subscribeCommand(redisClient *c) {
int j;
//遍歷頻道將該客戶端存入
for (j = 1; j < c->argc; j++)
pubsubSubscribeChannel(c,c->argv[j]);
//將當前客戶端標識為做了發布訂閱
c->flags |= REDIS_PUBSUB;
}
我們步入pubsubSubscribeChannel方法即可看到上圖所說明的邏輯,如果對應的頻道不存在,則初始化然后將客戶端追加到鏈表中,反之直接追加到鏈表中:
int pubsubSubscribeChannel(redisClient *c, robj *channel) {
//頻道添加到pubsub_channels中
if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
retval = 1;
incrRefCount(channel);
//查看這個頻道的訂閱者鏈表是否存在
de = dictFind(server.pubsub_channels,channel);
//如果頻道不存在,則直接初始化鏈表
if (de == NULL) {
clients = listCreate();
dictAdd(server.pubsub_channels,channel,clients);
incrRefCount(channel);
} else {
clients = dictGetVal(de);
}
//將客戶端追加的鏈表尾巴
listAddNodeTail(clients,c);
}
//......
}
同理,當我們通過redis客戶端鍵入publish mychannel "hello"向mychannel 發送一個hello消息時,redis服務端會解析這條publish指令并調用publishCommand完成消息發布,通知到各個訂閱者:
圖片==
我們給出publishCommand的源碼,位于pubsub.c這個源代碼文件中,可以看到這段代碼會將channel和對應的消息傳入pubsubPublishMessage方法中,并返回接收者數:
void publishCommand(redisClient *c) {
//發布消息返回接收者 PUBLISH <channel> <message>,返回接收者的數量
int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
//......
}
步入pubsubPublishMessage即可看到發布消息的核心邏輯,可以看到這個方法用receivers來記錄接收的通知者,它會先進行精準匹配,到pubsub_channels找到和channel名字一致的channel并向該channel的訂閱者發布消息,然后在進行模糊匹配,遍歷所有的channel找到模糊匹配上的channel并向訂閱者發布消息:
int pubsubPublishMessage(robj *channel, robj *message) {
int receivers = 0;
dictEntry *de;
listNode *ln;
listIter li;
//查找名字相同的channel
de = dictFind(server.pubsub_channels,channel);
if (de) {
list *list = dictGetVal(de);
//......
//移動至訂閱者鏈表首部
listRewind(list,&li);
//遍歷并向這些訂閱者發布消息
while ((ln = listNext(&li)) != NULL) {
redisClient *c = ln->value;
//......
//發布消息
addReplyBulk(c,message);
//接收數++
receivers++;
}
}
if (listLength(server.pubsub_patterns)) {
//移動至channel鏈表首部
listRewind(server.pubsub_patterns,&li);
channel = getDecodedObject(channel);
//遍歷channel
while ((ln = listNext(&li)) != NULL) {
pubsubPattern *pat = ln->value;
//找到匹配的channel并發布消息
if (stringmatchlen((char*)pat->pattern->ptr,
sdslen(pat->pattern->ptr),
(char*)channel->ptr,
sdslen(channel->ptr),0)) {
//......
addReplyBulk(pat->client,message);
receivers++;
}
}
}
return receivers;
}
3. 哨兵如何利用發布訂閱完成消息通信的
關于pub/sub模式,redis中的哨兵就很好的利用這種模式進行溝通和選舉等各個工作,當我們的redis以哨兵的方式啟動時,redis會定期執行哨兵的定時任務,該任務會在檢查連接時檢查發布訂閱master的連接是否為空,若為空則調用異步連接綁定的方式訂閱master的"__sentinel__:hello"頻道,而該頻道主要負責下面這些工作:
- Sentinel 實例的發現與信息交換:每個 Sentinel 實例會定期通過 __sentinel__:hello 頻道發布自己的信息,包括 Sentinel 的 IP 地址、端口、運行 ID、當前配置的紀元(epoch)等。 其他 Sentinel 實例會訂閱這個頻道,從而感知到其他 Sentinel 的存在,并獲取它們的信息。 監控主從節點的狀態:
- Sentinel 實例通過 __sentinel__:hello 頻道共享它們對 Redis 主節點和從節點的監控信息:例如,某個 Sentinel 實例檢測到主節點不可用時,會通過這個頻道通知其他 Sentinel 實例,以便它們確認并共同決定是否進行故障轉移。 故障轉移的協調:
在故障轉移過程中,Sentinel 實例會通過 __sentinel__:hello 頻道交換信息,協調誰來執行故障轉移操作,并確保只有一個 Sentinel 實例負責執行。
void sentinelReconnectInstance(sentinelRedisInstance *ri) {
//......
//檢查發布訂閱是否為空
if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && ri->pc == NULL) {
//若為空則pc指針指向異步連接
ri->pc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,REDIS_BIND_ADDR);
if (ri->pc->err) {
//......
} else {//如果沒有報錯,則訂閱__sentinel__:hello頻道
int retval;
//......
//哨兵訂閱 __sentinel__:hello 頻道(也就是下面的常量SENTINEL_HELLO_CHANNEL),通過sentinelReceiveHelloMessages處理回調
retval = redisAsyncCommand(ri->pc,
sentinelReceiveHelloMessages, NULL, "SUBSCRIBE %s",
SENTINEL_HELLO_CHANNEL);
//......
}
}
}
//......
}
通過master的hello頻道,哨兵會定期publish自己的信息到hello頻道,其他哨兵就可以基于這個頻道發現其他的哨兵由此完成通信:
對此我們給出哨兵定期發送hello的函數入口sentinelSendPeriodicCommands,這個方法會被定期執行,其內部邏輯一旦檢查到pub/sub時間間隔過長時就會發送調用sentinelSendHello向hello頻道發送當前哨兵的信息讓其他哨兵感知:
void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
//......
ping_period = ri->down_after_period;
if (ping_period > SENTINEL_PING_PERIOD) ping_period = SENTINEL_PING_PERIOD;
if ((ri->flags & SRI_SENTINEL) == 0 &&
(ri->info_refresh == 0 ||
(now - ri->info_refresh) > info_period))
{
//......
} else if ((now - ri->last_pong_time) > ping_period) {//超過ping間隔發ping
//......
} else if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
//超過pub最大間隔SENTINEL_PUBLISH_PERIOD則發送發送哨兵自身ip端口等信息到hello頻道
sentinelSendHello(ri);
}
}
步入sentinelSendHello即可看到我們上文所說的邏輯,可以看到當前哨兵會組裝個人信息通過異步連接cc指針維護的連接信息PUBLISH 個人信息到hello頻道:
int sentinelSendHello(sentinelRedisInstance *ri) {
//......
//獲取當前哨兵ip
if (sentinel.announce_ip) {
announce_ip = sentinel.announce_ip;
} else {
if (anetSockName(ri->cc->c.fd,ip,sizeof(ip),NULL) == -1)
return REDIS_ERR;
announce_ip = ip;
}
//獲取當前哨兵端口
announce_port = sentinel.announce_port ?
sentinel.announce_port : server.port;
//將數據拼接到payload中
snprintf(payload,sizeof(payload),
"%s,%d,%s,%llu," /* Info about this sentinel. */
"%s,%s,%d,%llu", /* Info about current master. */
announce_ip, announce_port, server.runid,
(unsigned long long) sentinel.current_epoch,
/* --- */
master->name,master_addr->ip,master_addr->port,
(unsigned long long) master->config_epoch);
//將組裝的哨兵信息publish到hello頻道(SENTINEL_HELLO_CHANNEL就是hello頻道的常量變量值)
retval = redisAsyncCommand(ri->cc,
sentinelPublishReplyCallback, NULL, "PUBLISH %s %s",
SENTINEL_HELLO_CHANNEL,payload);
//......
}
二、小結
自此我們將發現redis發布訂閱的設計與實現,本質上就是通過一個個鏈表管理訂閱者,通過pub指令定位到channel后將消息遍歷發送到對應客戶端socket上,這里筆者也簡單的補充一句,從源碼中我們可以看到redis的發布訂閱模型沒有持久化機制,所以對于可靠性要求高的場景筆者還是不太建議使用pub/sub。