成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

RocketMQ 有哪些消息類型,你知道嗎?

開發 前端
Consumer的pullBatchSize屬性與consumeMessageBatchMaxSize屬性是否設置的越大越好?當然不是。 pullBatchSize值設置的越大,Consumer每拉取一次需要的時間就會越長,且在網絡上傳輸出現 問題的可能性就越高。若在拉取過程中若出現了問題,那么本批次所有消息都需要全部重新拉取。

一、普通消息

1 消息發送分類

Producer對于消息的發送方式也有多種選擇,不同的方式會產生不同的系統效果。

同步發送消息

同步發送消息是指,Producer發出?條消息后,會在收到MQ返回的ACK之后才發下?條消息。該方式 的消息可靠性最高,但消息發送效率太低。

異步發送消息

異步發送消息是指,Producer發出消息后無需等待MQ返回ACK,直接發送下?條消息。該方式的消息可靠性可以得到保障,消息發送效率也可以。

單向發送消息

單向發送消息是指,Producer僅負責發送消息,不等待、不處理MQ的ACK。該發送方式時MQ也不返回ACK。該方式的消息發送效率最高,但消息可靠性較差。

二、順序消息

1 什么是順序消息

順序消息指的是,嚴格按照消息的發送順序進行消費的消息(FIFO)。

默認情況下生產者會把消息以Round Robin輪詢方式發送到不同的Queue分區隊列;而消費消息時會從多個Queue上拉取消息,這種情況下的發送和消費是不能保證順序的。如果將消息僅發送到同一個Queue中,消費時也只從這個Queue上拉取消息,就嚴格保證了消息的順序性。

2 為什么需要順序消息

例如,現在有TOPIC ORDER_STATUS (訂單狀態),其下有4個Queue隊列,該Topic中的不同消息用于 描述當前訂單的不同狀態。假設訂單有狀態:未支付已支付發貨中發貨成功發貨失敗

根據以上訂單狀態,生產者從時序上可以生成如下幾個消息:

訂單T0000001:未支付 --> 訂單T0000001:已支付 --> 訂單T0000001:發貨中 --> 訂單

T0000001:發貨失敗

消息發送到MQ中之后,Queue的選擇如果采用輪詢策略,消息在MQ的存儲可能如下:

這種情況下,我們希望Consumer消費消息的順序和我們發送是一致的,然而上述MQ的投遞和消費方式,我們無法保證順序是正確的。對于順序異常的消息,Consumer即使設置有一定的狀態容錯,也不能完全處理好這么多種隨機出現組合情況。

基于上述的情況,可以設計如下方案:對于相同訂單號的消息,通過一定的策略,將其放置在一個Queue中,然后消費者再采用一定的策略(例如,一個線程獨立處理一個queue,保證處理消息的順序性),能夠保證消費的順序性。

3 有序性分類

根據有序范圍的不同,RocketMQ可以嚴格地保證兩種消息的有序性:分區有序全局有序

全局有序

當發送和消費參與的Queue只有一個時所保證的有序是整個Topic中消息的順序, 稱為全局有序

在創建Topic時指定Queue的數量。有三種指定方式:

1)在代碼中創建Producer時,可以指定其自動創建的TopicQueue數量

2)在RocketMQ可視化控制臺中手動創建Topic時指定Queue數量

3)使用mqadmin命令手動創建Topic時指定Queue數量

分區有序

如 果有多個Queue參與,其僅可保證在該Queue分區隊列上的消息順序,則稱為分區有序

如何實現Queue的選擇?在定義Producer時我們可以指定消息隊列選擇器,而這個選擇器是我們自己實現了MessageQueueSelector接口定義的。

在定義選擇器的選擇算法時,一般需要使用選擇key。這個選擇key可以是消息key也可以是其它數據。但無論誰做選擇key,都不能重復,都是唯一的。

一般性的選擇算法是,讓選擇key(或其hash值)與該Topic所包含的Queue的數量取模,其結果即為選擇出的Queue的QueueId。

取模算法存在一個問題:不同選擇keyQueue數量取模結果可能會是相同的,即不同選擇key的消息可能會出現在相同的Queue,即同一個Consuemr可能會消費到不同選擇key的消息。這個問題如何解決?一般性的作法是,從消息中獲取到選擇key,對其進行判斷。若是當前Consumer需要消費的消息,則直接消費,否則,什么也不做。這種做法要求選擇key要能夠隨著消息一起被Consumer獲取到。此時使用消息key作為選擇key是比較好的做法。

以上做法會不會出現如下新的問題呢?不屬于那個Consumer的消息被拉取走了,那么應該消費該消息的Consumer是否還能再消費該消息呢?同一個Queue中的消息不可能被同一個Group中的 不同Consumer同時消費。所以,消費現一個Queue的不同選擇key的消息的Consumer一定屬于不同的Group。而不同的Group中的Consumer間的消費是相互隔離的,互不影響的。

三、延時消息

1 什么是延時消息

當消息寫入到Broker后,在指定的時長后才可被消費處理的消息,稱為延時消息。

采用RocketMQ的延時消息可以實現定時任務的功能,而無需使用定時器。典型的應用場景是,電商交易中超時未支付關閉訂單的場景,12306平臺訂票超時未支付取消訂票的場景。

在電商平臺中,訂單創建時會發送一條延遲消息。這條消息將會在30分鐘后投遞給后臺業務系 統(Consumer),后臺業務系統收到該消息后會判斷對應的訂單是否已經完成支付。如果未完 成,則取消訂單,將商品再次放回到庫存;如果完成支付,則忽略。

12306平臺中,車票預訂成功后就會發送一條延遲消息。這條消息將會在45分鐘后投遞給后臺業務系統(Consumer),后臺業務系統收到該消息后會判斷對應的訂單是否已經完成支付。如 果未完成,則取消預訂,將車票再次放回到票池;如果完成支付,則忽略。

2 延時等級

延時消息的延遲時長不支持隨意時長的延遲,是通過特定的延遲等級來指定的。延時等級定義在RocketMQ服務端的MessageStoreConfig類中的如下變量中:

即,若指定的延時等級為3,則表示延遲時長為10s,即延遲等級是從1開始計數的。

當然,如果需要自定義的延時等級,可以通過在broker加載的配置中新增如下配置(例如下面增加了1天這個等級1d)。配置文件在RocketMQ安裝目錄下的conf目錄中。

messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d

3 延時消息實現原理

具體實現方案是:

修改消息

Producer將消息發送到Broker后,Broker會首先將消息寫入到commitlog文件,然后需要將其分發到相應的consumequeue。不過,在分發之前,系統會先判斷消息中是否帶有延時等級。若沒有,則直接正

常分發;若有則需要經歷一個復雜的過程:

  • 修改消息的Topic為SCHEDULE_TOPIC_XXXX
  • 根據延時等級,在consumequeue目錄中SCHEDULE_TOPIC_XXXX主題下創建出相應的queueId目錄與consumequeue文件(如果沒有這些目錄與文件的話)。

延遲等級delayLevelqueueId的對應關系為queueId = delayLevel -1 需要注意,在創建queueId目錄時,并不是一次性地將所有延遲等級對應的目錄全部創建完畢,而是用到哪個延遲等級創建哪個目錄

  • 修改消息索引單元內容。索引單元中的Message Tag HashCode部分原本存放的是消息的Tag的Hash值。現修改為消息的投遞時間。投遞時間是指該消息被重新修改為原Topic后再次被寫入到commitlog中的時間。投遞時間 = 消息存儲時間 + 延時等級時間。消息存儲時間指的是消息被發送到Broker時的時間戳。
  • 將消息索引寫入到SCHEDULE_TOPIC_XXXX主題下相應的consumequeue中

SCHEDULE_TOPIC_XXXX目錄中各個延時等級Queue中的消息是如何排序的?

是按照消息投遞時間排序的。一個Broker中同一等級的所有延時消息會被寫入到consumequeue目錄中SCHEDULE_TOPIC_XXXX目錄下相同Queue中。即一個Queue中消息投遞時間的延遲等級時間是相同的。那么投遞時間就取決于于消息存儲時間了。即按照消息被發送到Broker的時間進行排序的。

投遞延時消息

Broker內部有?個延遲消息服務類ScheuleMessageService,其會消費SCHEDULE_TOPIC_XXXX中的消息,即按照每條消息的投遞時間,將延時消息投遞到?標Topic中。不過,在投遞之前會從commitlog

中將原來寫入的消息再次讀出,并將其原來的延時等級設置為0,即原消息變為了一條不延遲的普通消息。然后再次將消息投遞到目標Topic中。

ScheuleMessageServiceBroker啟動時,會創建并啟動一個定時器TImer,用于執行相應的定時任務。系統會根據延時等級的個數,定義相應數量的TimerTask,每個TimerTask負責一個延遲 等級消息的消費與投遞。每個TimerTask都會檢測相應Queue隊列的第一條消息是否到期。若第 一條消息未到期,則后面的所有消息更不會到期(消息是按照投遞時間排序的);若第一條消息到期了,則將該消息投遞到目標Topic,即消費該消息。

將消息重新寫入commitlog

延遲消息服務類ScheuleMessageService將延遲消息再次發送給了commitlog,并再次形成新的消息索引條目,分發到相應Queue。

這其實就是一次普通消息發送。只不過這次的消息Producer是延遲消息服務類 ScheuleMessageService

四、事務消息

1 問題引入

這里的一個需求場景是:工行用戶A向建行用戶B轉賬1萬元。

我們可以使用同步消息來處理該需求場景:

  • 工行系統發送一個給B增款1萬元的同步消息M給Broker
  • 消息被Broker成功接收后,向工行系統發送成功ACK
  • 工行系統收到成功ACK后從用戶A中扣款1萬元
  • 建行系統從Broker中獲取到消息M
  • 建行系統消費消息M,即向用戶B中增加1萬元

這其中是有問題的:若第3步中的扣款操作失敗,但消息已經成功發送到了Broker。對于MQ來 說,只要消息寫入成功,那么這個消息就可以被消費。此時建行系統中用戶B增加了1萬元。出 現了數據不一致問題。

2 解決思路

解決思路是,讓第1、2、3步具有原子性,要么全部成功,要么全部失敗。即消息發送成功后,必須要保證扣款成功。如果扣款失敗,則回滾發送成功的消息。而該思路即使用事務消息。這里要使用分布式事務解決方案。

使用事務消息來處理該需求場景:

  • 事務管理器TM向事務協調器TC發起指令,開啟全局事務
  • 工行系統發一個給B增款1萬元的事務消息M給TC
  • TC會向Broker發送半事務消息prepareHalf,將消息M預提交到Broker。此時的建行系統是看不到Broker中的消息M的
  • Broker會將預提交執行結果Report給TC。
  • 如果預提交失敗,則TC會向TM上報預提交失敗的響應,全局事務結束;如果預提交成功,TC會調用工行系統的回調操作,去完成工行用戶A的預扣款1萬元的操作
  • 工行系統會向TC發送預扣款執行結果,即本地事務的執行狀態
  • TC收到預扣款執行結果后,會將結果上報給TM。

預扣款執行結果存在三種可能性:

// 描述本地事務執行狀態 
public enum LocalTransactionState { 
    COMMIT_MESSAGE, // 本地事務執行成功 
    ROLLBACK_MESSAGE, // 本地事務執行失敗 
    UNKNOW, // 不確定,表示需要進行回查以確定本地事務的執行結果 
}
  • TM會根據上報結果向TC發出不同的確認指令

若預扣款成功(本地事務狀態為COMMIT_MESSAGE),則TM向TC發送Global Commit指令

若預扣款失敗(本地事務狀態為ROLLBACK_MESSAGE),則TM向TC發送Global Rollback指令

若現未知狀態(本地事務狀態為UNKNOW),則會觸發工行系統的本地事務狀態回查操作。回查操作會將回查結果,即COMMIT_MESSAGE或ROLLBACK_MESSAGE Report給TC。TC將結果上 報給TM,TM會再向TC發送最終確認指令Global Commit或Global Rollback

  • TC在接收到指令后會向Broker與工行系統發出確認指令

TC接收的若是Global Commit指令,則向Broker與工行系統發送Branch Commit指令。此時 Broker中的消息M才可被建行系統看到;此時的工行用戶A中的扣款操作才真正被確認 TC接收到的若是Global Rollback指令,則向Broker與工行系統發送Branch Rollback指令。此時 Broker中的消息M將被撤銷;工行用戶A中的扣款操作將被回滾

以上方案就是為了確保消息投遞扣款操作能夠在一個事務中,要成功都成功,有一個失敗,則全部回滾。

以上方案并不是一個典型的XA模式。因為XA模式中的分支事務是異步的,而事務消息方案中的消息預提交與預扣款操作間是同步的。

3 基礎

分布式事務

對于分布式事務,通俗地說就是,一次操作由若干分支操作組成,這些分支操作分屬不同應用,分布在 不同服務器上。分布式事務需要保證這些分支操作要么全部成功,要么全部失敗。分布式事務與普通事務一樣,就是為了保證操作結果的一致性。

事務消息

RocketMQ提供了類似X/Open XA的分布式事務功能,通過事務消息能達到分布式事務的最終一致。XA 是一種分布式事務解決方案,一種分布式事務處理模式。

半事務消息

暫不能投遞的消息,發送方已經成功地將消息發送到了Broker,但是Broker未收到最終確認指令,此時 該消息被標記成“暫不能投遞”狀態,即不能被消費者看到。處于該種狀態下的消息即半事務消息。

本地事務狀態

Producer回調操作執行的結果為本地事務狀態,其會發送給TC,而TC會再發送給TM。TM會根據TC發送來的本地事務狀態來決定全局事務確認指令。

消息回查

消息回查,即重新查詢本地事務的執行狀態。本例就是重新到DB中查看預扣款操作是否執行成功。

注意,消息回查不是重新執行回調操作。回調操作是進行預扣款操作,而消息回查則是查看預 扣款操作執行的結果。

引發消息回查的原因最常見的有兩個:

1)回調操作返回UNKNWON

2)TC沒有接收到TM的最終全局事務確認指令

RocketMQ中的消息回查設置

關于消息回查,有三個常見的屬性設置。它們都在broker加載的配置文件中設置,例如:

  • transactinotallow=20,指定TM在20秒內應將最終確認狀態發送給TC,否則引發消息回查。默認為60秒
  • transactinotallow=5,指定最多回查5次,超過后將丟棄消息并記錄錯誤日志。默認15次。
  • transactinotallow=10,指定設置的多次消息回查的時間間隔為10秒。默認為60秒。

4 XA模式三劍客

XA協議

XA(Unix Transaction)是一種分布式事務解決方案,一種分布式事務處理模式,是基于XA協議的。

XA協議由Tuxedo(Transaction for Unix has been Extended for Distributed Operation,分布式操作擴展之后的Unix事務系統)首先提出的,并交給X/Open組織,作為資源管理器與事務管理器的接口標

準。

XA模式中有三個重要組件:TC、TM、RM。

TC

Transaction Coordinator,事務協調者。維護全局和分支事務的狀態,驅動全局事務提交或回滾。

RocketMQBroker充當著TC

TM

Transaction Manager,事務管理器。定義全局事務的范圍:開始全局事務、提交或回滾全局事務。它 實際是全局事務的發起者。

RocketMQ中事務消息的Producer充當著TM

RM

Resource Manager,資源管理器。管理分支事務處理的資源,與TC交談以注冊分支事務和報告分支事務的狀態,并驅動分支事務提交或回滾。

RocketMQ中事務消息的ProducerBroker均是RM

5 注意

事務消息不支持延時消息

對于事務消息要做好冪等性檢查,因為事務消息可能不止一次被消費(因為存在回滾后再提交的情況)

五、批量消息

1 批量發送消息

發送限制

生產者進行消息發送時可以一次發送多條消息,這可以大大提升Producer的發送效率。不過需要注意以 下幾點:

  • 批量發送的消息必須具有相同的Topic
  • 批量發送的消息必須具有相同的刷盤策略
  • 批量發送的消息不能是延時消息與事務消息

批量發送大小

默認情況下,一批發送的消息總大小不能超過4MB字節。如果想超出該值,有兩種解決方案:

方案一:將批量消息進行拆分,拆分為若干不大于4M的消息集合分多次批量發送

方案二:在Producer端與Broker端修改屬性

** Producer端需要在發送之前設置Producer的maxMessageSize屬性

** Broker端需要修改其加載的配置文件中的maxMessageSize屬性

生產者發送的消息大小

生產者通過send()方法發送的Message,并不是直接將Message序列化后發送到網絡上的,而是通過這 個Message生成了一個字符串發送出去的。這個字符串由四部分構成:Topic、消息Body、消息日志(占20字節),及用于描述消息的一堆屬性key-value。這些屬性中包含例如生產者地址、生產時間、要發送的QueueId等。最終寫入到Broker中消息單元中的數據都是來自于這些屬性。

2 批量消費消息

修改批量屬性

Consumer的MessageListenerConcurrently監聽接口的consumeMessage()方法的第一個參數為消息列 表,但默認情況下每次只能消費一條消息。若要使其一次可以消費多條消息,則可以通過改Consumer的consumeMessageBatchMaxSize屬性來指定。不過,該值不能超過32。因為默認情況下消 費者每次可以拉取的消息最多是32條。若要修改一次拉取的最大值,則可通過修改Consumer的pullBatchSize屬性來指定。

存在的問題

Consumer的pullBatchSize屬性與consumeMessageBatchMaxSize屬性是否設置的越大越好?當然不是。 pullBatchSize值設置的越大,Consumer每拉取一次需要的時間就會越長,且在網絡上傳輸出現 問題的可能性就越高。若在拉取過程中若出現了問題,那么本批次所有消息都需要全部重新拉 取。

,你consumeMessageBatchMaxSize值設置的越大,Consumer的消息并發消費能力越低,且這批被消費的消息具有相同的消費結果。因為consumeMessageBatchMaxSize指定的一批消息只會使用一 個線程進行處理,且在處理過程中只要有一個消息處理異常,則這批消息需要全部重新再次消費 處理。

責任編輯:武曉燕 來源: 今日頭條
相關推薦

2022-06-30 13:41:44

SQL 語句group by

2023-04-26 10:21:04

2024-09-18 07:00:00

消息隊列中間件消息隊列

2022-08-02 06:55:35

移動設備Android

2020-11-04 17:35:39

網絡安全漏洞技術

2022-12-09 19:00:02

Vite兼容性BigInt

2024-10-22 09:59:36

虛擬化容器化系統

2022-09-14 08:11:06

分頁模糊查詢

2024-02-19 08:01:59

服務微服務授權

2024-11-26 14:29:48

2023-09-01 07:38:45

ArrayListArrayst實線類

2022-01-09 23:20:50

手機國產蘋果

2023-12-07 07:08:09

Angular函數

2020-03-23 08:15:43

JavaScriptError對象函數

2023-07-04 08:48:24

靜態代碼分析工具

2022-03-13 18:53:31

interfacetypeTypeScript

2023-12-20 08:23:53

NIO組件非阻塞

2023-12-12 08:41:01

2024-04-30 09:02:48

2024-05-28 09:12:10

點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 成人黄色av | 一区二区三区久久久 | 久草免费在线视频 | 欧美在线天堂 | 日韩在线大片 | 久久国产精品99久久久大便 | 国产精品久久久久久婷婷天堂 | 成人三级av | 成人国产免费视频 | 亚洲第1页 | 精品欧美一区二区在线观看欧美熟 | 日韩欧美国产电影 | av手机在线| 91精品国产乱码久久久久久久久 | 国产一区二区三区免费观看在线 | 男人的天堂中文字幕 | 国产精品一区二区三 | 怡红院怡春院一级毛片 | 91久久久久久 | 亚洲免费成人av | 精品国产欧美 | 亚洲国产成人精品女人 | 久久精品伊人 | 亚洲精品一二区 | 精品久久久久久久久亚洲 | 91色视频在线观看 | 黄频视频 | 国产精品一区在线观看你懂的 | 91视频在线看| 夜色www国产精品资源站 | 中文成人在线 | 久久久久久久97 | 又黄又色| 日韩毛片免费看 | av一二三区 | 免费欧美| 久久精品91久久久久久再现 | 成人在线视频一区二区三区 | 就操在线 | 亚洲成人精品久久久 | 久久久久久久久久久久久久av |