彌補延時消息的不足,RocketMQ 基于時間輪算法實現了定時消息!
?大家好,我是君哥。
在 RocketMQ 4.x 版本,使用延時消息來實現消息的定時消費。延時消息可以一定程度上實現定時發送,但是有一些局限。
RocketMQ 新版本基于時間輪算法引入了定時消息,目前,精確到秒級的定時消息實現的 pr 已經提交到社區,今天來介紹一下。
1 延時消息
1.1 簡介
RocketMQ 的延時消息是指 Producer 發送消息后,Consumer 不會立即消費,而是需要等待固定的時間才能消費。在一些場景下,延時消息是很有用的,比如電商場景下關閉 30 分鐘內未支付的訂單。
使用延時消息非常簡單,只需要給消息的 delayTimeLevel 屬性賦值就可以。參考下面代碼:
延時消息有 18 個級別,如下:
1.2 實現原理
延時消息的實現原理如下圖:
Producer 把消息發送到 Broker 后,Broker 判斷到是延時消息,首先會把消息投遞到延時隊列(Topic = SCHEDULE_TOPIC_XXXX,queueId = delayTimeLevel - 1)。定時任務線程池會有 18 個線程來對延時隊列進行調度,每個線程調度一個延時級別,調度任務把延時消息再投遞到原始隊列,這樣 Consumer 就可以拉取到了。
1.3 存在不足
延時消息存在著一些不足:
1.延時級別只有 18 個,并不能滿足所有場景;
2.如果通過修改 messageDelayLevel 配置來自定義延時級別,并不靈活,比如一個在大規模的平臺上,延時級別成百上千,而且隨時可能增加新的延時時間;
3.延時時間不準確,后臺的定時線程可能會因為處理消息量大導致延時誤差大。
2 定時消息
為了彌補延時消息的不足,RocketMQ 5.0 引入了定時消息。
2.1 時間輪算法
為了解決定時任務隊列遍歷任務導致的性能開銷,RocketMQ 定時消息引入了秒級的時間輪算法。如下圖:
圖中是一個 60s 的時間輪,時間輪上會有一個指向當前時間的指針定時地移動到下一個時間(秒級)。
時間輪算法的優勢是不用去遍歷所有的任務,每一個時間節點上的任務用鏈表串起來,當時間輪上的指針移動到當前的時間時,這個時間節點上的全部任務都執行。
雖然上面只是一個 60s 的時間輪,但是對于所有的時間延時,都是支持的。可以在每個時間節點增加一個 round 字段,記錄時間輪轉動的圈數,比如對于延時 130s 的任務,round 就是 2,放在第 10 個時間刻度的鏈表中。這樣當時間輪轉到一個節點,執行節點上的任務時,首先判斷 round 是否等于 0,如果等于 0,則把這個任務從任務鏈表中移出交給異步線程執行,否則將 round 減 1 繼續檢查后面的任務。
2.2 使用方式
基于時間輪算法的思想,RocketMQ 實現了精準的定時消息。使用 RocketMQ 定時消息時,客戶端定義消息的示例代碼如下:
2.3 實現原理
2.3.1 消息投遞
上面的代碼構中,Producer 創建消息時給消息傳了一個系統屬性 deliveryTimestamp,這個屬性指定了消息投遞的時間,并且封裝到消息的 TIMER_DELIVER_MS 屬性,代碼如下:
Broker 收到這個消息后,如果判斷到 TIMER_DELIVER_MS 這個屬性有值,就會把這個消息投遞到 Topic 是 rmq_sys_wheel_timer 的隊列中,queueId 是 0,同時會保存原始消息的 Topic、queueId、投遞時間(TIMER_OUT_MS)。
TimerMessageStore 中有個定時任務 TimerEnqueueGetService 會從 rmq_sys_wheel_timer 這個 Topic 中讀取消息,然后封裝 TimerRequest 請求并放到隊列 enqueuePutQueue。
2.3.2 綁定時間輪
RocketMQ 使用 TimerLog 來保存消息的原始數據綁定到時間輪上。首先看一下 TimerLog 保存的數據結構,如下圖:
參考下面代碼:
TimerEnqueuePutService 這個定時任務從上面的 enqueuePutQueue(2.3.1節) 取出 TimerRequest 然后封裝成 TimerLog。
那時間輪是怎么跟 TimerLog 關聯起來的呢?RocketMQ 使用 TimerWheel 來描述時間輪,TimerWheel 中每一個時間節點是一個 Slot,Slot 保存了這個延時時間的 TimerLog 信息。數據結構如下圖:
參考下面代碼:
這樣時間輪跟 TimerLog 就關聯起來了,見下圖:
如果時間輪的一個時間節點(Slot)上有一條新的消息到來,那只要新建一個 TimerLog,然后把它的指針指向該時間節點的最后一個 TimerLog,然后把 Slot 的 lastPos 屬性指向新建的這個 TimerLog,如下圖:
從源碼上看,RocketMQ 定義了一個 7 天的以秒為單位的時間輪。
2.3.3 時間輪轉動
轉動時間輪時,TimerDequeueGetService 這個定時任務從當前時間節點(Slot)對應的 TimerLog 中取出數據,封裝成 TimerRequest 放入 dequeueGetQueue 隊列。
2.3.4 CommitLog 中讀取消息
定時任務 TimerDequeueGetMessageService 從隊列 dequeueGetQueue 中拉取 TimerRequest 請求,然后根據 TimerRequest 中的參數去 CommitLog(MessageExt) 中查找消息,查出后把消息封裝到 TimerRequest 中,然后把 TimerRequest 寫入 dequeuePutQueue 這個隊列。
2.3.5 寫入原隊列
定時任務 TimerDequeuePutMessageService 從 dequeuePutQueue 隊列中獲取消息,把消息轉換成原始消息,投入到原始隊列中,這樣消費者就可以拉取到了。
3 總結
RocketMQ 4.x 版本只支持延時消息,有一些局限性。而 RocketMQ 新版本引入了定時消息,彌補了延時消息的不足。定時消息的處理流程如下圖:
可以看到,RocketMQ 的定時消息的實現還是有一定復雜度的,這里用到 5 個定時任務和 3 個隊列來實現。
最后,對于定時時間的定義,客戶端、Broker 和時間輪的默認最大延時時間定義是不同的,使用的時候需要注意。