成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

彌補延時消息的不足,RocketMQ 基于時間輪算法實現了定時消息!

開發 架構
RocketMQ 的延時消息是指 Producer 發送消息后,Consumer 不會立即消費,而是需要等待固定的時間才能消費。在一些場景下,延時消息是很有用的,比如電商場景下關閉 30 分鐘內未支付的訂單。

?大家好,我是君哥。

在 RocketMQ 4.x 版本,使用延時消息來實現消息的定時消費。延時消息可以一定程度上實現定時發送,但是有一些局限。

RocketMQ 新版本基于時間輪算法引入了定時消息,目前,精確到秒級的定時消息實現的 pr 已經提交到社區,今天來介紹一下。

1 延時消息

1.1 簡介

RocketMQ 的延時消息是指 Producer 發送消息后,Consumer 不會立即消費,而是需要等待固定的時間才能消費。在一些場景下,延時消息是很有用的,比如電商場景下關閉 30 分鐘內未支付的訂單。

使用延時消息非常簡單,只需要給消息的 delayTimeLevel 屬性賦值就可以。參考下面代碼:

Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
//3 個級別,10s
message.setDelayTimeLevel(3);
producer.send(message);

延時消息有 18 個級別,如下:

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

1.2 實現原理

延時消息的實現原理如下圖:

圖片

Producer 把消息發送到 Broker 后,Broker 判斷到是延時消息,首先會把消息投遞到延時隊列(Topic = SCHEDULE_TOPIC_XXXX,queueId = delayTimeLevel - 1)。定時任務線程池會有 18 個線程來對延時隊列進行調度,每個線程調度一個延時級別,調度任務把延時消息再投遞到原始隊列,這樣 Consumer 就可以拉取到了。

1.3 存在不足

延時消息存在著一些不足:

1.延時級別只有 18 個,并不能滿足所有場景;

2.如果通過修改 messageDelayLevel 配置來自定義延時級別,并不靈活,比如一個在大規模的平臺上,延時級別成百上千,而且隨時可能增加新的延時時間;

3.延時時間不準確,后臺的定時線程可能會因為處理消息量大導致延時誤差大。

2 定時消息

為了彌補延時消息的不足,RocketMQ 5.0 引入了定時消息。

2.1 時間輪算法

為了解決定時任務隊列遍歷任務導致的性能開銷,RocketMQ 定時消息引入了秒級的時間輪算法。如下圖:

圖片

圖中是一個 60s 的時間輪,時間輪上會有一個指向當前時間的指針定時地移動到下一個時間(秒級)。

時間輪算法的優勢是不用去遍歷所有的任務,每一個時間節點上的任務用鏈表串起來,當時間輪上的指針移動到當前的時間時,這個時間節點上的全部任務都執行。

雖然上面只是一個 60s 的時間輪,但是對于所有的時間延時,都是支持的。可以在每個時間節點增加一個 round 字段,記錄時間輪轉動的圈數,比如對于延時 130s 的任務,round 就是 2,放在第 10 個時間刻度的鏈表中。這樣當時間輪轉到一個節點,執行節點上的任務時,首先判斷 round 是否等于 0,如果等于 0,則把這個任務從任務鏈表中移出交給異步線程執行,否則將 round 減 1 繼續檢查后面的任務。

2.2 使用方式

基于時間輪算法的思想,RocketMQ 實現了精準的定時消息。使用 RocketMQ 定時消息時,客戶端定義消息的示例代碼如下:

org.apache.rocketmq.common.message.Message messageExt = this.sendMessageActivity.buildMessage(null,
Lists.newArrayList(
Message.newBuilder()
.setTopic(Resource.newBuilder()
.setName(TOPIC)
.build())
.setSystemProperties(SystemProperties.newBuilder()
.setMessageId(msgId)
.setQueueId(0)
.setMessageType(MessageType.DELAY)
.setDeliveryTimestamp(Timestamps.fromMillis(deliveryTime))
//定義消息投遞時間
.setBornTimestamp(Timestamps.fromMillis(System.currentTimeMillis()))
.setBornHost(StringUtils.defaultString(RemotingUtil.getLocalAddress(), "127.0.0.1:1234"))
.build())
.setBody(ByteString.copyFromUtf8("123"))
.build()
),
Resource.newBuilder().setName(TOPIC).build()).get(0);

2.3 實現原理

2.3.1 消息投遞

上面的代碼構中,Producer 創建消息時給消息傳了一個系統屬性 deliveryTimestamp,這個屬性指定了消息投遞的時間,并且封裝到消息的 TIMER_DELIVER_MS 屬性,代碼如下:

protected void fillDelayMessageProperty(apache.rocketmq.v2.Message message, org.apache.rocketmq.common.message.Message messageWithHeader){
if (message.getSystemProperties().hasDeliveryTimestamp()) {
Timestamp deliveryTimestamp = message.getSystemProperties().getDeliveryTimestamp();
//delayTime 這個延時時間默認不能超過 1,可以配置
long deliveryTimestampMs = Timestamps.toMillis(deliveryTimestamp);
validateDelayTime(deliveryTimestampMs);
//...
String timestampString = String.valueOf(deliveryTimestampMs);
//MessageConst.PROPERTY_TIMER_DELIVER_MS="TIMER_DELIVER_MS"
MessageAccessor.putProperty(messageWithHeader, MessageConst.PROPERTY_TIMER_DELIVER_MS, timestampString);
}
}

Broker 收到這個消息后,如果判斷到 TIMER_DELIVER_MS 這個屬性有值,就會把這個消息投遞到 Topic 是 rmq_sys_wheel_timer 的隊列中,queueId 是 0,同時會保存原始消息的 Topic、queueId、投遞時間(TIMER_OUT_MS)。

TimerMessageStore 中有個定時任務 TimerEnqueueGetService 會從 rmq_sys_wheel_timer 這個 Topic 中讀取消息,然后封裝 TimerRequest 請求并放到隊列 enqueuePutQueue。

2.3.2 綁定時間輪

RocketMQ 使用 TimerLog 來保存消息的原始數據綁定到時間輪上。首先看一下 TimerLog 保存的數據結構,如下圖:

圖片

參考下面代碼:

//TimerMessageStore類
ByteBuffer tmpBuffer = timerLogBuffer;
tmpBuffer.clear();
tmpBuffer.putInt(TimerLog.UNIT_SIZE); //size
tmpBuffer.putLong(slot.lastPos); //prev pos
tmpBuffer.putInt(magic); //magic
tmpBuffer.putLong(tmpWriteTimeMs); //currWriteTime
tmpBuffer.putInt((int) (delayedTime - tmpWriteTimeMs)); //delayTime
tmpBuffer.putLong(offsetPy); //offset
tmpBuffer.putInt(sizePy); //size
tmpBuffer.putInt(hashTopicForMetrics(realTopic)); //hashcode of real topic
tmpBuffer.putLong(0); //reserved value, just set to 0 now
long ret = timerLog.append(tmpBuffer.array(), 0, TimerLog.UNIT_SIZE);
if (-1 != ret) {
// If it's a delete message, then slot's total num -1
// TODO: check if the delete msg is in the same slot with "the msg to be deleted".
timerWheel.putSlot(delayedTime, slot.firstPos == -1 ? ret : slot.firstPos, ret,
isDelete ? slot.num - 1 : slot.num + 1, slot.magic);

}

TimerEnqueuePutService 這個定時任務從上面的 enqueuePutQueue(2.3.1節) 取出 TimerRequest 然后封裝成  TimerLog。

那時間輪是怎么跟 TimerLog 關聯起來的呢?RocketMQ 使用 TimerWheel 來描述時間輪,TimerWheel 中每一個時間節點是一個 Slot,Slot 保存了這個延時時間的 TimerLog 信息。數據結構如下圖:

圖片

參考下面代碼:

//類 TimerWheel
public void putSlot(long timeMs, long firstPos, long lastPos, int num, int magic){
localBuffer.get().position(getSlotIndex(timeMs) * Slot.SIZE);
localBuffer.get().putLong(timeMs / precisionMs);
localBuffer.get().putLong(firstPos);
localBuffer.get().putLong(lastPos);
localBuffer.get().putInt(num);
localBuffer.get().putInt(magic);
}

這樣時間輪跟 TimerLog 就關聯起來了,見下圖:

圖片

如果時間輪的一個時間節點(Slot)上有一條新的消息到來,那只要新建一個 TimerLog,然后把它的指針指向該時間節點的最后一個 TimerLog,然后把 Slot 的 lastPos 屬性指向新建的這個 TimerLog,如下圖:

圖片

從源碼上看,RocketMQ 定義了一個 7 天的以秒為單位的時間輪。

2.3.3 時間輪轉動

轉動時間輪時,TimerDequeueGetService 這個定時任務從當前時間節點(Slot)對應的 TimerLog 中取出數據,封裝成 TimerRequest 放入 dequeueGetQueue 隊列。

2.3.4 CommitLog 中讀取消息

定時任務 TimerDequeueGetMessageService 從隊列 dequeueGetQueue 中拉取 TimerRequest 請求,然后根據 TimerRequest 中的參數去 CommitLog(MessageExt) 中查找消息,查出后把消息封裝到 TimerRequest 中,然后把 TimerRequest 寫入 dequeuePutQueue 這個隊列。

2.3.5 寫入原隊列

定時任務 TimerDequeuePutMessageService 從 dequeuePutQueue 隊列中獲取消息,把消息轉換成原始消息,投入到原始隊列中,這樣消費者就可以拉取到了。

3 總結

RocketMQ 4.x 版本只支持延時消息,有一些局限性。而 RocketMQ 新版本引入了定時消息,彌補了延時消息的不足。定時消息的處理流程如下圖:

圖片

可以看到,RocketMQ 的定時消息的實現還是有一定復雜度的,這里用到 5 個定時任務和 3 個隊列來實現。

最后,對于定時時間的定義,客戶端、Broker 和時間輪的默認最大延時時間定義是不同的,使用的時候需要注意。

責任編輯:武曉燕 來源: 君哥聊技術
相關推薦

2024-10-11 09:15:33

2022-06-13 11:05:35

RocketMQ消費者線程

2022-07-12 17:33:00

消息定時提醒鴻蒙

2022-12-22 10:03:18

消息集成

2022-05-24 10:43:02

延時消息分布式MQ

2024-11-11 13:28:11

RocketMQ消息類型FIFO

2024-10-29 08:34:27

RocketMQ消息類型事務消息

2024-11-13 00:59:13

2021-07-14 17:18:14

RocketMQ消息分布式

2021-07-13 11:52:47

順序消息RocketMQkafka

2023-12-30 13:47:48

Redis消息隊列機制

2023-07-18 09:03:01

RocketMQ場景消息

2022-06-02 08:21:07

RocketMQ消息中間件

2025-04-09 08:20:00

RocketMQ消息隊列開發

2023-07-17 08:34:03

RocketMQ消息初體驗

2023-12-21 08:01:41

RocketMQ消息堆積

2022-03-31 08:26:44

RocketMQ消息排查

2021-10-03 21:41:13

RocketMQKafkaPulsar

2023-09-04 08:00:53

提交事務消息

2020-11-13 16:40:05

RocketMQ延遲消息架構
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 中国三级黄色录像 | 在线日韩中文字幕 | 日本 欧美 三级 高清 视频 | 日韩av最新网址 | 老司机久久 | 久久成人精品视频 | 免费一区二区三区 | 中文精品一区二区 | 欧美激情一区二区 | 天天操天天干天天透 | 久在线视频播放免费视频 | 一区二区免费视频 | 欧美视频偷拍 | 国产清纯白嫩初高生在线播放视频 | 日韩国产一区二区三区 | 国产日批| 亚洲成人精品 | 久久综合一区二区 | 久久久久久毛片免费观看 | 久久中文字幕一区 | 天天插天天舔 | 中文字幕综合 | 久久激情五月丁香伊人 | 日韩电影在线 | 伊人春色在线 | 久久精品欧美一区二区三区不卡 | 一级aaaa毛片 | 91大片 | 狠狠入ady亚洲精品经典电影 | 男女啪啪网址 | 看av在线| 在线日韩中文字幕 | 久久久精 | 999久久久久久久久6666 | 国产精品18hdxxxⅹ在线 | 欧美高清免费 | 人人玩人人添人人澡欧美 | 成人毛片网站 | av网站免费观看 | 国产精品一区二区免费 | 青青草国产在线观看 |