成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

消息的存儲-RocketMQ知識體系之三

存儲 存儲軟件
CommitLog是消息存儲文件,所有消息主題的消息都存儲在CommitLog文件中;該文件默認最大為1GB,超過1GB后會輪到下一個CommitLog文件。

[[409969]]

RocketMQ存儲概要設計

RocketMQ主要存儲的文件包括commitlog文件、consumeQueue文件、IndexFile文件。

CommitLog是消息存儲文件,所有消息主題的消息都存儲在CommitLog文件中;該文件默認最大為1GB,超過1GB后會輪到下一個CommitLog文件。通過CommitLog,RocketMQ將所有消息存儲在一起,以順序IO的方式寫入磁盤,充分利用了磁盤順序寫減少了IO爭用提高數據存儲的性能。

RocketMQ的Broker機器磁盤上的文件存儲結構

【CommitLog】

消息在CommitLog中的存儲格式如下:

存儲所有消息內容,寫滿一個文件后生成新的 commitlog 文件。所有 topic 的數據存儲在一起,邏輯視圖如下:

CommitLog代碼

  1. private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); 
  2.     /** 
  3.      * MAGIC_CODE - MESSAGE 
  4.      * Message's MAGIC CODE daa320a7 
  5.      * 標記某一段為消息,即:[msgId, MESSAGE_MAGIC_CODE, 消息] 
  6.      */ 
  7.     public final static int MESSAGE_MAGIC_CODE = 0xAABBCCDD ^ 1880681586 + 8; 
  8.     /** 
  9.      * MAGIC_CODE - BLANK 
  10.      * End of file empty MAGIC CODE cbd43194 
  11.      * 標記某一段為空白,即:[msgId, BLANK_MAGIC_CODE, 空白] 
  12.      * 當CommitLog無法容納消息時,使用該類型結尾 
  13.      */ 
  14.     private final static int BLANK_MAGIC_CODE = 0xBBCCDDEE ^ 1880681586 + 8; 
  15.     /** 
  16.      * 映射文件隊列 
  17.      */ 
  18.     private final MappedFileQueue mappedFileQueue; 
  19.     /** 
  20.      * 消息存儲 
  21.      */ 
  22.     private final DefaultMessageStore defaultMessageStore; 
  23.     /** 
  24.      * flush commitLog 線程服務 
  25.      */ 
  26.     private final FlushCommitLogService flushCommitLogService; 
  27.     /** 
  28.      * If TransientStorePool enabled, we must flush message to FileChannel at fixed periods 
  29.      * commit commitLog 線程服務 
  30.      */ 
  31.     private final FlushCommitLogService commitLogService; 
  32.     /** 
  33.      * 寫入消息到Buffer Callback 
  34.      */ 
  35.     private final AppendMessageCallback appendMessageCallback; 
  36.     /** 
  37.      * topic消息隊列 與 offset 的Map 
  38.      */ 
  39.     private HashMap<String/* topic-queue_id */, Long/* offset */> topicQueueTable = new HashMap<>(1024); 
  40.     /** 
  41.      * TODO 
  42.      */ 
  43.     private volatile long confirmOffset = -1L; 
  44.     /** 
  45.      * 當前獲取lock時間。 
  46.      * 如果當前解鎖,則為0 
  47.      */ 
  48.     private volatile long beginTimeInLock = 0; 
  49.     /** 
  50.      * true: Can lock, false : in lock. 
  51.      * 添加消息 螺旋鎖(通過while循環實現) 
  52.      */ 
  53.     private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true); 
  54.     /** 
  55.      * 添加消息重入鎖 
  56.      */ 
  57.     private ReentrantLock putMessageNormalLock = new ReentrantLock(); // Non fair Sync 

【ConsumeQueue】

ConsumeQueue是消息消費隊列文件,消息達到commitlog文件后將被異步轉發到消息消費隊列,供消息消費者消費;一個ConsumeQueue表示一個topic的一個queue,類似于kafka的一個partition,但是rocketmq在消息存儲上與kafka有著非常大的不同,RocketMQ的ConsumeQueue中不存儲具體的消息,具體的消息由CommitLog存儲,ConsumeQueue中只存儲路由到該queue中的消息在CommitLog中的offset,消息的大小以及消息所屬的tag的hash(tagCode),一共只占20個字節,整個數據包如下:

ConsumeQueue代碼

  1. public static final int CQ_STORE_UNIT_SIZE = 20; 
  2.     private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); 
  3.     private static final Logger LOG_ERROR = LoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME); 
  4.  
  5.     private final DefaultMessageStore defaultMessageStore; 
  6.     /** 
  7.      * 映射文件隊列 
  8.      */ 
  9.     private final MappedFileQueue mappedFileQueue; 
  10.     /** 
  11.      * Topic 
  12.      */ 
  13.     private final String topic; 
  14.     /** 
  15.      * 隊列編號 
  16.      */ 
  17.     private final int queueId; 
  18.     /** 
  19.      * 消息位置信息ByteBuffer 
  20.      */ 
  21.     private final ByteBuffer byteBufferIndex; 
  22.     /** 
  23.      * 文件存儲地址 
  24.      */ 
  25.     private final String storePath; 
  26.     /** 
  27.      * 每個映射文件大小 
  28.      */ 
  29.     private final int mappedFileSize; 
  30.     /** 
  31.      * 最大重放消息commitLog存儲位置 
  32.      */ 
  33.     private long maxPhysicOffset = -1; 
  34.     private volatile long minLogicOffset = 0; 

Consume Queue文件組織,如圖所示:

Consume Queue文件組織示意圖

  • 根據topic和queueId來組織文件,圖中TopicA有兩個隊列0,1,那么TopicA和QueueId=0組成一個ConsumeQueue,TopicA和QueueId=1組成另一個ConsumeQueue。
  • 按照消費端的GroupName來分組重試隊列,如果消費端消費失敗,消息將被發往重試隊列中,比如圖中的%RETRY%ConsumerGroupA。
  • 按照消費端的GroupName來分組死信隊列,如果消費端消費失敗,并重試指定次數后,仍然失敗,則發往死信隊列,比如圖中的%DLQ%ConsumerGroupA。

死信隊列(Dead Letter Queue)一般用于存放由于某種原因無法傳遞的消息,比如處理失敗或者已經過期的消息。

【IndexFile】

IndexFile是消息索引文件,主要存儲的是key和offset的對應關系。

IndexFile(索引文件)提供了一種可以通過key或時間區間來查詢消息的方法。

文件名fileName是以創建時的時間戳命名的,固定的單個IndexFile文件大小約為400M,一個IndexFile可以保存 2000W個索引,IndexFile的底層存儲設計為在文件系統中實現HashMap結構,故rocketmq的索引文件其底層實現為hash索引。

  1. private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); 
  2.     private static int hashSlotSize = 4; 
  3.     private static int indexSize = 20; 
  4.     private static int invalidIndex = 0; 
  5.     private final int hashSlotNum; 
  6.     private final int indexNum; 
  7.     private final MappedFile mappedFile; 
  8.     private final FileChannel fileChannel; 
  9.     private final MappedByteBuffer mappedByteBuffer; 
  10.     private final IndexHeader indexHeader; 

IndexFile的存儲結構:

從上面的分析可以看出,RocketMQ采用的是混合型的存儲結構,即為Broker單個實例下所有的隊列共用一個日志數據文件(即為CommitLog)來存儲。RocketMQ的混合型存儲結構(多個Topic的消息實體內容都存儲于一個CommitLog中)針對Producer和Consumer分別采用了數據和索引部分相分離的存儲結構,Producer發送消息至Broker端,然后Broker端使用同步或者異步的方式對消息刷盤持久化,保存至CommitLog中。

只要消息被刷盤持久化至磁盤文件CommitLog中,那么Producer發送的消息就不會丟失。

正因為如此,Consumer也就肯定有機會去消費這條消息。當無法拉取到消息后,可以等下一次消息拉取,同時服務端也支持長輪詢模式,如果一個消息拉取請求未拉取到消息,Broker允許等待30s的時間,只要這段時間內有新消息到達,將直接返回給消費端。這里,RocketMQ的具體做法是,使用Broker端的后臺服務線程—ReputMessageService不停地分發請求并異步構建ConsumeQueue(邏輯消費隊列)和IndexFile(索引文件)數據。

【全局的角度來看消息的存儲】

【消息存儲流程】

Broker端收到消息后,將消息原始信息保存在CommitLog文件對應的MappedFile中,然后異步刷新到磁盤

ReputMessageServie線程異步的將CommitLog中MappedFile中的消息保存到ConsumerQueue和IndexFile中

ConsumerQueue和IndexFile只是原始文件的索引信息

內存映射和數據刷盤

【內存映射流程】

  • 內存映射文件MappedFile通過AllocateMappedFileService創建
  • MappedFile的創建是典型的生產者-消費者模型
  • MappedFileQueue調用getLastMappedFile獲取MappedFile時,將請求放入隊列中
  • AllocateMappedFileService線程持續監聽隊列,隊列有請求時,創建出MappedFile對象
  • 最后將MappedFile對象預熱,底層調用force方法和mlock方法。

【刷盤機制】

  1. 異步刷盤:消息被寫入內存的PAGECACHE,返回寫成功狀態,當內存里的消息量積累到一定程度時,統一觸發寫磁盤操作,快速寫入 。吞吐量高,當磁盤損壞時,會丟失消息
  2. 同步刷盤:消息寫入內存的PAGECACHE后,立刻通知刷盤線程刷盤,然后等待刷盤完成,刷盤線程執行完成后喚醒等待的線程,給應用返回消息寫成功的狀態。吞吐量低,但不會造成消息丟失。

【刷盤流程】

producer發送給broker的消息保存在MappedFile中,然后通過刷盤機制同步到磁盤中。

刷盤分為同步刷盤和異步刷盤。

異步刷盤后臺線程按一定時間間隔執行。

同步刷盤也是生產者-消費者模型。broker保存消息到MappedFile后,創建GroupCommitRequest請求放入列表,并阻塞等待。后臺線程從列表中獲取請求并刷新磁盤,成功刷盤后通知等待線程。

RocketMQ 文件存儲模型層次結構

文件存儲模型層次結構圖

RocketMQ文件存儲模型層次結構如上圖所示,根據類別和作用從概念模型上大致可以劃分為5層,下面將從各個層次分別進行分析和闡述:

RocketMQ業務處理器層:Broker端對消息進行讀取和寫入的業務邏輯入口,比如前置的檢查和校驗步驟、構造MessageExtBrokerInner對象、decode反序列化、構造Response返回對象等;

RocketMQ數據存儲組件層;該層主要是RocketMQ的存儲核心類—DefaultMessageStore,其為RocketMQ消息數據文件的訪問入口,通過該類的“putMessage()”和“getMessage()”方法完成對CommitLog消息存儲的日志數據文件進行讀寫操作(具體的讀寫訪問操作還是依賴下一層中CommitLog對象模型提供的方法);另外,在該組件初始化時候,還會啟動很多存儲相關的后臺服務線程,包括AllocateMappedFileService(MappedFile預分配服務線程)、ReputMessageService(回放存儲消息服務線程)、HAService(Broker主從同步高可用服務線程)、StoreStatsService(消息存儲統計服務線程)、IndexService(索引文件服務線程)等;

RocketMQ存儲邏輯對象層:該層主要包含了RocketMQ數據文件存儲直接相關的三個模型類IndexFile、ConsumerQueue和CommitLog。IndexFile為索引數據文件提供訪問服務,ConsumerQueue為邏輯消息隊列提供訪問服務,CommitLog則為消息存儲的日志數據文件提供訪問服務。這三個模型類也是構成了RocketMQ存儲層的整體結構(對于這三個模型類的深入分析將放在后續篇幅中);

封裝的文件內存映射層:RocketMQ主要采用JDK NIO中的MappedByteBuffer和FileChannel兩種方式完成數據文件的讀寫。其中,采用MappedByteBuffer這種內存映射磁盤文件的方式完成對大文件的讀寫,在RocketMQ中將該類封裝成MappedFile類。這里限制的問題在上面已經講過;對于每類大文件(IndexFile/ConsumerQueue/CommitLog),在存儲時分隔成多個固定大小的文件(單個IndexFile文件大小約為400M、單個ConsumerQueue文件大小約5.72M、單個CommitLog文件大小為1G),其中每個分隔文件的文件名為前面所有文件的字節大小數+1,即為文件的起始偏移量,從而實現了整個大文件的串聯。這里,每一種類的單個文件均由MappedFile類提供讀寫操作服務(其中,MappedFile類提供了順序寫/隨機讀、內存數據刷盤、內存清理等和文件相關的服務);

磁盤存儲層:主要指的是部署RocketMQ服務器所用的磁盤。這里,需要考慮不同磁盤類型(如SSD或者普通的HDD)特性以及磁盤的性能參數(如IOPS、吞吐量和訪問時延等指標)對順序寫/隨機讀操作帶來的影響;

文件存儲的高可用

【分布式存儲】

同一個topic 上的數據會分成多個queue 分布在不同的 broker 上,而且每個queue 都有副本機制。

【副本的主從同步(HA)】

RocketMQ 的主從同步機制如下:

1.首先啟動Master并在指定端口監聽;

2.客戶端啟動,主動連接Master,建立TCP連接;

3.客戶端以每隔5s的間隔時間向服務端拉取消息,如果是第一次拉取的話,先獲取本地commitlog文件中最大的偏移量,以該偏移量向服務端拉取消息;

4.服務端解析請求,并返回一批數據給客戶端;

5.客戶端收到一批消息后,將消息寫入本地commitlog文件中,然后向Master匯報拉取進度,并更新下一次待拉取偏移量;

6.然后重復第3步;

文件存儲的優化技術

RocketMQ存儲層采用的幾項優化技術方案在一定程度上可以減少PageCache的缺點帶來的影響,主要包括內存預分配,文件預熱和mlock系統調用。

【預先分配MappedFile】

在消息寫入過程中(調用CommitLog的putMessage()方法),CommitLog會先從MappedFileQueue隊列中獲取一個 MappedFile,如果沒有就新建一個。

RocketMQ中預分配MappedFile的設計非常巧妙,下次獲取時候直接返回就可以不用等待MappedFile創建分配所產生的時間延遲。

【文件預熱&&mlock系統調用】

(1)mlock系統調用:其可以將進程使用的部分或者全部的地址空間鎖定在物理內存中,防止其被交換到swap空間。對于RocketMQ這種的高吞吐量的分布式消息隊列來說,追求的是消息讀寫低延遲,那么肯定希望盡可能地多使用物理內存,提高數據讀寫訪問的操作效率。

(2)文件預熱:預熱的目的主要有兩點;第一點,由于僅分配內存并進行mlock系統調用后并不會為程序完全鎖定這些內存,因為其中的分頁可能是寫時復制的。因此,就有必要對每個內存頁面中寫入一個假的值。其中,RocketMQ是在創建并分配MappedFile的過程中,預先寫入一些隨機值至Mmap映射出的內存空間里。第二,調用Mmap進行內存映射后,OS只是建立虛擬內存地址至物理地址的映射表,而實際并沒有加載任何文件至內存中。程序要訪問數據時OS會檢查該部分的分頁是否已經在內存中,如果不在,則發出一次缺頁中斷。這里,可以想象下1G的CommitLog需要發生多少次缺頁中斷,才能使得對應的數據才能完全加載至物理內存中(ps:X86的Linux中一個標準頁面大小是4KB)?RocketMQ的做法是,在做Mmap內存映射的同時進行madvise系統調用,目的是使OS做一次內存映射后對應的文件數據盡可能多的預加載至內存中,從而達到內存預熱的效果。

本文轉載自微信公眾號「小汪哥寫代碼」,可以通過以下二維碼關注。轉載本文請聯系小汪哥寫代碼公眾號。

 

責任編輯:武曉燕 來源: 小汪哥寫代碼
相關推薦

2021-07-08 07:16:24

RocketMQ數據結構Message

2021-07-13 11:52:47

順序消息RocketMQkafka

2021-07-14 17:18:14

RocketMQ消息分布式

2021-07-09 07:15:48

RocketMQ數據結構kafka

2021-07-16 18:44:42

RocketMQ知識

2021-07-12 10:25:03

RocketMQ數據結構kafka

2021-07-07 07:06:31

Brokerkafka架構

2015-07-28 17:52:36

IOS知識體系

2017-02-27 16:42:23

Spark識體系

2017-04-03 15:35:13

知識體系架構

2017-06-22 13:07:21

2012-03-08 11:13:23

企業架構

2015-07-16 10:15:44

web前端知識體系

2020-10-26 08:34:18

知識體系普適性

2020-09-09 09:15:58

Nginx體系進程

2020-03-09 10:31:58

vue前端開發

2021-07-05 06:26:08

生產者kafka架構

2017-07-25 17:34:54

大數據機器學習數據

2017-08-30 17:30:43

大數據數據化運營

2011-08-18 17:20:43

梭子魚知識體系
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 欧美日韩精品久久久免费观看 | 欧美日韩精品一区二区三区四区 | 久久aⅴ乱码一区二区三区 亚洲欧美综合精品另类天天更新 | 一级黄色片网址 | 一区视频 | 电影91久久久 | 国产精品一区二区免费看 | 美女福利网站 | 免费在线视频一区二区 | 欧美性生活网 | 91天堂网 | 综合精品 | 在线视频一区二区三区 | 成年人在线视频 | 中文字幕伊人 | 精品欧美乱码久久久久久 | 国产在线观看一区二区 | 国产亚洲精品一区二区三区 | 日本成人午夜影院 | 日韩欧美国产一区二区三区 | 一区二区三区四区在线 | 国产99久久精品一区二区永久免费 | 久草免费福利 | 久久久久久91 | 艹逼网| 91亚洲精品在线 | 国产精品亚洲成在人线 | 激情六月丁香 | 午夜小视频在线播放 | 亚洲人人| av三级| 成人影视网址 | 欧美日韩亚洲视频 | 日韩精品久久一区二区三区 | 蜜桃精品视频在线 | 亚洲欧美日韩精品久久亚洲区 | 蜜臀久久99精品久久久久野外 | 久久久青草婷婷精品综合日韩 | 国产麻豆一区二区三区 | 在线看片网站 | 精品国产视频 |