成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

RocketMQ結合源碼告訴你消息量大為啥不需要手動壓縮消息

開發 前端
RocketMQ在存儲的時候自己進行消息壓縮,consumer進行消息拉取的時候,broker進行消息解壓縮,然后推送給consumer,這種方式就是消耗broker?cpu,也不能節省網絡帶寬,只能節省存儲空間,所以很明顯是在client端進行壓縮比較好。

背景

最近同事發現線上發送的RocketMQ消息太大,同事為了節省網絡帶寬和存儲空間,手動壓縮消息然后再進行消息發送,發現磁盤也沒有明顯的縮減。

所以我打算結合源碼告訴他RocketMQ自帶的消息壓縮。

RocketMQ版本

  • 5.1.0

為什么需要壓縮消息

首先說一下為什么需要消息壓縮,原因其實很簡單。就是為了節省網絡帶寬和存儲空間。

在哪里壓縮消息

我們的消息壓縮可以在很多個地方進行。

有兩種方案

在client端進行壓縮

比如我們可以在Producer發送消息的時候進行消息壓縮。

然后將壓縮后的消息發送到Broker,broker只管存儲。

等到consumer需要消息的時候,原封不動的推送給消費者,由consumer自己進行解壓縮。

這種方式的好處是broker不需要關心消息的壓縮和解壓縮,只需要存儲消息即可。

在broker端進行壓縮

這種方式就是Producer發送消息的時候,不進行壓縮。

RocketMQ在存儲的時候自己進行消息壓縮,consumer進行消息拉取的時候,broker進行消息解壓縮,然后推送給consumer。

這種方式就是消耗broker cpu,也不能節省網絡帶寬,只能節省存儲空間。

所以很明顯是在client端進行壓縮比較好。

源碼分析

這里我們來具體結合源碼分析下:

在消息發送org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl方法中會對消息進行壓縮判斷。

圖片圖片

tryToCompressMessage 消息壓縮

什么消息會被壓縮呢?

private boolean tryToCompressMessage(final Message msg) {
        if (msg instanceof MessageBatch) {
            //batch does not support compressing right now
            return false;
        }
        byte[] body = msg.getBody();
        if (body != null) {
            if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
                try {
                    byte[] data = compressor.compress(body, compressLevel);
                    if (data != null) {
                        msg.setBody(data);
                        return true;
                    }
                } catch (IOException e) {
                    log.error("tryToCompressMessage exception", e);
                    log.warn(msg.toString());
                }
            }
        }

        return false;
    }
  • 批量消息不支持壓縮。
  • 消息體長度大于defaultMQProducer.getCompressMsgBodyOverHowmuch()的時候進行壓縮。默認1024 * 4 = 4kb。
  • 壓縮算法是什么呢?

RocketMQ目前提供三種壓縮算法

  • LZ4
  • ZSTD
  • ZLIB

默認壓縮算法為ZLIB。

private CompressionType compressType = CompressionType.of(System.getProperty(MixAll.MESSAGE_COMPRESS_TYPE, "ZLIB"));

壓縮等級為5。

private int compressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
  • 消息壓縮完后會通過sysFlag進行標記,表示消息進行了壓縮,方便后續解壓。
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                    sysFlag |= compressType.getCompressionFlag();

消息解壓

消息解壓主要是在方法org.apache.rocketmq.common.message.MessageDecoder.decode(java.nio.ByteBuffer, boolean, boolean, boolean, boolean, boolean)中進行的。

在client拉取到消息成功后對PullResult對象進行處理執行decodesBatch方法。

圖片圖片

消息解析decodesBatch方法會調用org.apache.rocketmq.common.message.MessageDecoder.decode(java.nio.ByteBuffer, boolean, boolean, boolean, boolean, boolean)方法。

decode方法會對消息進行解壓。

圖片

總結

  • 消息壓縮主要是為了節省網絡帶寬和存儲空間。
  • RocketMQ提供了三種壓縮算法,分別是LZ4、ZSTD、ZLIB,默認為ZLIB。
  • 消息壓縮主要是在Producer發送消息的時候進行壓縮,broker只管存儲。
  • 消息解壓主要是在Consumer拉取消息的時候進行解壓。
  • RocketMQ消息壓縮僅支持單條消息壓縮,不支持批量消息壓縮。
  • 一般消息壓縮都會選擇在client端進行壓縮,這樣可以節省broker的cpu。
責任編輯:武曉燕 來源: 小奏技術
相關推薦

2012-08-23 09:50:07

測試測試人員軟件測試

2022-09-26 10:43:13

RocketMQ保存消息

2010-11-23 10:55:47

跳槽

2018-01-29 13:18:42

前端JavaScript

2011-09-02 09:45:39

交互設計Android

2021-05-26 10:19:01

jreJava應用程序

2021-05-07 15:18:26

比特幣禁令監管

2019-07-15 08:00:00

AI人工智能

2022-12-22 10:03:18

消息集成

2017-03-13 13:54:40

戴爾

2024-10-11 09:15:33

2024-04-09 09:08:09

Kafka消息架構

2024-11-11 13:28:11

RocketMQ消息類型FIFO

2024-10-29 08:34:27

RocketMQ消息類型事務消息

2020-07-28 08:28:07

JavaScriptswitch開發

2022-06-07 17:01:31

UI框架前端

2022-04-21 08:01:34

React框架action

2009-11-23 12:45:22

2023-04-26 10:06:08

RocketMQ屬性Consumer

2019-07-17 06:17:01

UbuntuUbuntu LTSNvidia驅動
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 免费黄色大片 | 成人在线免费 | 欧美黄色精品 | 日本午夜在线视频 | 午夜免费成人 | 女人毛片a毛片久久人人 | 成人久久| 国产精品成人一区二区三区夜夜夜 | 99久久中文字幕三级久久日本 | 日本国产高清 | 一级黄色毛片免费 | 久久午夜视频 | 综合激情网 | 午夜成人免费视频 | 欧美一区二不卡视频 | 高清黄色毛片 | 亚洲国产精品福利 | 精品日韩一区二区 | 欧美在线a | 日本在线免费视频 | 久久久av| 亚洲综合二区 | 日韩亚洲一区二区 | 久久av影院 | 欧美精品一区二区在线观看 | 中文字幕乱码一区二区三区 | 国产一区二区在线看 | 在线看亚洲 | 国产精品1区2区3区 国产在线观看一区 | 亚洲国产成人精品女人久久久 | 亚洲91视频| 亚洲国产精品一区二区久久 | 99久久免费精品国产免费高清 | 99色综合| 福利视频一区 | 密桃av| 国产精品视频在线播放 | 午夜成人在线视频 | 性色网站 | 欧美日韩国产精品 | 久久久资源|