成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

RocketMQ 如何保證發(fā)送消息不丟失?

開(kāi)發(fā)
本文分析了 RocketMQ 同步發(fā)送、異步發(fā)送和單向發(fā)送三種方式的原理、優(yōu)缺點(diǎn)以及使用場(chǎng)景,并且分析了每種方式涉及到的核心源碼。

在 RocketMQ 中,有 3種簡(jiǎn)單的消息發(fā)送方式:同步發(fā)送、異步發(fā)送和單向發(fā)送。這篇文章,我們將詳細(xì)分析這三種發(fā)送方式的原理、優(yōu)缺點(diǎn)、使用場(chǎng)景以及使用該方式是否會(huì)丟失數(shù)據(jù)。

本文源碼基于: Apache RocketMQ release-5.2.0

一、同步發(fā)送

1.原理分析

在同步發(fā)送模式下,RocketMQ 默認(rèn)采用同步刷盤(pán)方式,當(dāng)生產(chǎn)者將消息發(fā)送到 Broker 后,會(huì)等待 Broker 的響應(yīng)(默認(rèn)超時(shí) 5分鐘),Broker 接收消息后,會(huì)將其寫(xiě)入內(nèi)存緩存,并進(jìn)行刷盤(pán)操作。因此,如果 Broker 響應(yīng)成功,代表消息一定成功寫(xiě)入磁盤(pán)。

同步發(fā)送主要涉及以下幾個(gè)步驟:

  • 創(chuàng)建Producer:創(chuàng)建一個(gè)Producer對(duì)象;
  • 創(chuàng)建消息:創(chuàng)建一個(gè)Message對(duì)象,設(shè)置Topic、Tag標(biāo)簽和消息體;
  • 發(fā)送消息:調(diào)用DefaultMQProducer的send方法;
  • 等待響應(yīng):發(fā)送方會(huì)阻塞等待服務(wù)器的響應(yīng),直到收到確認(rèn)消息;

如下示例代碼為一個(gè)完整的同步發(fā)送流程:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;

public class SyncProducerTest {
  public static void main(String[] args) throws Exception {
    // 1、創(chuàng)建 producer,設(shè)置組名為 SyncGroupTest
    DefaultMQProducer producer = new DefaultMQProducer("SyncGroup");
    // 2、指定 NameServer的地址,以獲取 Broker路由地址
    producer.setNamesrvAddr("x.x.x.x:9876");
    // 3、啟動(dòng) producer
    producer.start();
    // 4、創(chuàng)建消息,并指定 Topic,Tag和消息體
    Message msg = new Message("SyncTopic", "sync", "SyncMessage".getBytes("UTF-8"));
    // 5、發(fā)送同步消息
    SendResult sendResult = producer.send(msg);
    // 6、通過(guò) sendResult 判斷消息是否成功送達(dá)
    System.out.printf("message send result:" + sendResult);
    // 7、關(guān)閉 Producer
    producer.shutdown();
  }
}

RocketMQ 的同步發(fā)送主要涉及以下幾個(gè)關(guān)鍵源碼類(lèi)和方法:

  • DefaultMQProducer:生產(chǎn)者類(lèi),負(fù)責(zé)發(fā)送消息。
  • MQClientAPIImpl#sendMessage:底層消息發(fā)送實(shí)現(xiàn)。
  • NettyRemotingClient#invokeSync:通過(guò) Netty 實(shí)現(xiàn)網(wǎng)絡(luò)通信。
  • Broker 端的 SendMessageProcessor:處理發(fā)送請(qǐng)求。

源碼參考:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(Message msg)

2.優(yōu)缺點(diǎn)

優(yōu)點(diǎn):

  • 簡(jiǎn)單易用。
  • 可靠性高,發(fā)送方可以確認(rèn)消息是否成功發(fā)送,一旦發(fā)送成功,消息就已經(jīng)寫(xiě)入磁盤(pán),消息不會(huì)丟失。

缺點(diǎn):

  • 延遲較高,需要等待服務(wù)器的響應(yīng)。
  • 吞吐量可能受限于網(wǎng)絡(luò)延遲和服務(wù)器性能。

3.使用場(chǎng)景

適用于對(duì)消息可靠性要求較高的場(chǎng)景,如訂單系統(tǒng)、金融交易、重要的消息通知等。

二、異步發(fā)送

1.原理分析

在異步發(fā)送模式下,RocketMQ 默認(rèn)采用異步刷盤(pán)方式,當(dāng)生產(chǎn)者發(fā)送消息到 Broker 后,消息寫(xiě)入內(nèi)存緩存成功后,Broker 立即返回響應(yīng)(默認(rèn)超時(shí) 5分鐘),后臺(tái)線(xiàn)程再異步將消息批量寫(xiě)入磁盤(pán)。因此,這種方式提高了系統(tǒng)的吞吐量和性能,但在系統(tǒng)崩潰時(shí)可能會(huì)丟失部分未刷盤(pán)的消息。

異步發(fā)送主要涉及以下幾個(gè)步驟:

  • 創(chuàng)建Producer:創(chuàng)建一個(gè)Producer對(duì)象;
  • 創(chuàng)建消息:同樣創(chuàng)建一個(gè)Message對(duì)象。
  • 發(fā)送消息:調(diào)用DefaultMQProducer的send方法,傳遞一個(gè)SendCallback回調(diào)對(duì)象。
  • 處理響應(yīng):回調(diào)函數(shù)會(huì)在消息發(fā)送成功或失敗時(shí)被調(diào)用。

如下示例代碼為一個(gè)完整的異步發(fā)送流程:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;

public class AsyncProducerTest {
  public static void main(String[] args) throws Exception {
    // 1、創(chuàng)建 producer,設(shè)置組名為 AsyncGroupTest
    DefaultMQProducer producer = new DefaultMQProducer("AsyncGroup");
    // 2、指定 NameServer的地址,以獲取 Broker路由地址
    producer.setNamesrvAddr("x.x.x.x:9876");
    // 3、啟動(dòng) producer
    producer.start();
    // 4、創(chuàng)建消息,并指定Topic,Tag和消息體
    Message msg = new Message("AsyncTopic","async", "AsyncMessage".getBytes("UTF-8"));
    // 5、發(fā)送異步消息,SendCallback是處理異步回調(diào)的方法
    producer.send(msg, new SendCallback() {
      @Override
      public void onSuccess(SendResult sendResult) {  // 成功回調(diào)
        System.out.println("message send success: " + sendResult);
      }
      @Override
      public void onException(Throwable throwable) {  // 失敗回調(diào)
        System.out.println("message send fail: " + throwable);
      }
    });
    // 6、關(guān)閉 Producer
    producer.shutdown();
  }
}

RocketMQ 的異步發(fā)送主要涉及以下幾個(gè)關(guān)鍵源碼類(lèi)和方法:

  • DefaultMQProducer:生產(chǎn)者類(lèi),負(fù)責(zé)發(fā)送消息。
  • MQClientAPIImpl#sendMessage:底層消息發(fā)送實(shí)現(xiàn)。
  • NettyRemotingClient#invokeAsync:通過(guò) Netty 實(shí)現(xiàn)網(wǎng)絡(luò)通信。
  • Broker 端的 SendMessageProcessor:處理發(fā)送請(qǐng)求。

源碼參考:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(Message msg, SendCallback sendCallback)

2.優(yōu)缺點(diǎn)

優(yōu)點(diǎn):

  • 非阻塞,發(fā)送方可以繼續(xù)執(zhí)行其他任務(wù),提高吞吐量。
  • 延遲較低,適用于對(duì)響應(yīng)時(shí)間敏感的場(chǎng)景。

缺點(diǎn):

  • 實(shí)現(xiàn)復(fù)雜度較高,需要處理異步回調(diào)。
  • 可靠性相對(duì)降低,需要處理失敗重試等問(wèn)題。
  • 無(wú)法保證發(fā)送出去的數(shù)據(jù)不丟失。

3.使用場(chǎng)景

適用于對(duì)響應(yīng)時(shí)間要求較高的場(chǎng)景,如實(shí)時(shí)數(shù)據(jù)處理、日志采集、消費(fèi)信息的推送等。

三、單向發(fā)送

1.原理分析

單向(OneWay)發(fā)送是一種只負(fù)責(zé)發(fā)送消息而不等待任何響應(yīng)的方式。生產(chǎn)者將消息發(fā)送到 Broker 后(默認(rèn)超時(shí) 5分鐘),不關(guān)心消息是否成功到達(dá)或被持久化,主要依賴(lài) Broker 進(jìn)行刷盤(pán)操作,單向發(fā)送通常與異步刷盤(pán)結(jié)合使用,以提高發(fā)送效率。

單向發(fā)送主要涉及以下幾個(gè)步驟:

  • 創(chuàng)建Producer:創(chuàng)建一個(gè)Producer對(duì)象;
  • 創(chuàng)建消息:創(chuàng)建一個(gè)Message對(duì)象。
  • 發(fā)送消息:調(diào)用DefaultMQProducer的sendOneway方法。

如下示例代碼為一個(gè)完整的單向發(fā)送流程:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;

public class OneWayProducerTest {
  public static void main(String[] args) throws Exception {
    // 1、創(chuàng)建 producer,設(shè)置組名為 OneWayGroupTest
    DefaultMQProducer producer = new DefaultMQProducer("OneWayGroup");
    // 2、指定 NameServer的地址,以獲取 Broker路由地址
    producer.setNamesrvAddr("x.x.x.x:9876");
    // 3、啟動(dòng) producer
    producer.start();
    // 4、創(chuàng)建消息,并指定Topic,Tag和消息體
    Message msg = new Message("OneWayTopic","oneway", "OneWayMessage".getBytes("UTF-8"));
    // 5、發(fā)送單向消息
    producer.sendOneway(msg);
    // 6、關(guān)閉 Producer
    producer.shutdown();
  }
}

RocketMQ 的單向發(fā)送主要涉及以下幾個(gè)關(guān)鍵類(lèi)和方法:

  • DefaultMQProducer:生產(chǎn)者類(lèi),負(fù)責(zé)發(fā)送消息。
  • MQClientAPIImpl#sendMessage:底層消息發(fā)送實(shí)現(xiàn)。
  • NettyRemotingClient#invokeOneway:通過(guò) Netty 實(shí)現(xiàn)網(wǎng)絡(luò)通信。
  • Broker 端的 SendMessageProcessor:處理發(fā)送請(qǐng)求。

源碼參考:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendOneway(Message msg)

2.優(yōu)缺點(diǎn)

優(yōu)點(diǎn):

  • 非常高效,延遲最低。
  • 適用于對(duì)可靠性要求不高的場(chǎng)景。

缺點(diǎn):

  • 無(wú)法確認(rèn)消息是否成功發(fā)送。
  • 可靠性最低,消息可能丟失。

3.使用場(chǎng)景

適用于對(duì)可靠性要求不高的場(chǎng)景,如日志收集、監(jiān)控?cái)?shù)據(jù)上報(bào)等。

三種方式對(duì)比

發(fā)送方式

優(yōu)點(diǎn)

缺點(diǎn)

使用場(chǎng)景

同步發(fā)送

可靠性高,簡(jiǎn)單易用

延遲較高,吞吐量受限

訂單系統(tǒng)、金融交易、重要的消息通知等

異步發(fā)送

非阻塞,延遲較低

實(shí)現(xiàn)復(fù)雜度高,可靠性相對(duì)降低

實(shí)時(shí)數(shù)據(jù)處理、日志采集、消費(fèi)信息的推送等

單向發(fā)送

高效,延遲最低

無(wú)法確認(rèn)消息是否成功發(fā)送,可靠性最低

日志收集、監(jiān)控?cái)?shù)據(jù)上報(bào)等

如何選擇?

  • 同步發(fā)送:消息發(fā)送后會(huì)等待服務(wù)器的響應(yīng),整個(gè)過(guò)程業(yè)務(wù)是阻塞等待的,適用于對(duì)可靠性要求高的場(chǎng)景,比如 訂單系統(tǒng)、金融交易等。
  • 異步發(fā)送:消息發(fā)送后,不等待服務(wù)器響應(yīng),而是通過(guò)回調(diào)函數(shù)處理響應(yīng),適用于對(duì)響應(yīng)時(shí)間要求高的場(chǎng)景,比如實(shí)時(shí)數(shù)據(jù)處理、日志采集、消費(fèi)信息的推送等
  • 單向發(fā)送::?jiǎn)蜗虬l(fā)送只負(fù)責(zé)發(fā)送消息而不等待任何響應(yīng)的方式,也不需要對(duì)發(fā)送的狀態(tài)、結(jié)果負(fù)責(zé),適用于對(duì)可靠性要求不高的場(chǎng)景,比如日志收集、監(jiān)控?cái)?shù)據(jù)上報(bào)等。

每種發(fā)送方式都有其適用的場(chǎng)景和優(yōu)缺點(diǎn),具體如何選擇,一定需要根據(jù)業(yè)務(wù)需求進(jìn)行權(quán)衡。

總結(jié)

本文分析了 RocketMQ 同步發(fā)送、異步發(fā)送和單向發(fā)送三種方式的原理、優(yōu)缺點(diǎn)以及使用場(chǎng)景,并且分析了每種方式涉及到的核心源碼。

通過(guò)上文的介紹可以知道同步發(fā)送方式可以保證消息發(fā)送時(shí)不丟,但是性能相對(duì)其他兩種方式差一些。

RocketMQ 是一款優(yōu)秀的開(kāi)源消息中間件,作為 Java程序員,建議多去閱讀它的源碼,吸收其中比較好的代碼思維。

責(zé)任編輯:趙寧寧 來(lái)源: 猿java
相關(guān)推薦

2023-09-13 08:14:57

RocketMQ次數(shù)機(jī)制

2021-10-22 08:37:13

消息不丟失rocketmq消息隊(duì)列

2021-03-08 10:19:59

MQ消息磁盤(pán)

2022-03-31 08:26:44

RocketMQ消息排查

2024-06-18 08:26:22

2024-02-26 08:10:00

Redis數(shù)據(jù)數(shù)據(jù)庫(kù)

2024-11-11 07:05:00

Redis哨兵模式主從復(fù)制

2021-02-02 11:01:31

RocketMQ消息分布式

2021-08-04 07:47:18

Kafka消息框架

2021-09-13 07:23:53

KafkaGo語(yǔ)言

2021-04-27 07:52:18

RocketMQ消息投遞

2023-11-27 13:18:00

Redis數(shù)據(jù)不丟失

2022-08-26 05:24:04

中間件技術(shù)Kafka

2024-01-16 08:24:59

消息隊(duì)列KafkaRocketMQ

2024-02-23 14:53:10

Redis持久化

2019-03-13 09:27:57

宕機(jī)Kafka數(shù)據(jù)

2020-10-26 09:19:11

線(xiàn)程池消息

2021-01-12 08:03:19

Redis數(shù)據(jù)系統(tǒng)

2021-03-04 06:49:53

RocketMQ事務(wù)

2024-08-30 08:23:06

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

主站蜘蛛池模板: 特级黄一级播放 | 国产精品成av人在线视午夜片 | 亚洲一区二区在线免费观看 | 毛片久久久 | 成年人免费网站 | 久久9999久久| 亚洲精彩视频在线观看 | 五月综合激情在线 | 久久久久久国产一区二区三区 | 久久久久久久久国产成人免费 | 久久久久久网站 | 国产精品久久久久久久久久久免费看 | 久久综合影院 | 欧美日韩三级视频 | 精品综合在线 | 美女天天干天天操 | 国产黄色在线观看 | 美国黄色毛片 | 国产最好的av国产大片 | 夜夜骚视频 | 成人久久 | 婷婷综合 | 国产乱码精品一区二三赶尸艳谈 | 亚洲精品一区国产精品 | 欧美日韩在线综合 | 国产精品欧美一区二区 | 一区二区三区欧美 | 中国一级毛片免费 | 久色视频在线 | 91精品国产91久久久久久吃药 | 日韩精品无码一区二区三区 | 亚洲精品国产第一综合99久久 | aⅴ色国产 欧美 | 日韩午夜在线播放 | 电影在线 | 国产精品久久国产精品 | 亚洲码欧美码一区二区三区 | 中文字幕一区二区三区四区五区 | 精品国产一区二区三区久久 | 干干干操操操 | 男女深夜网站 |