成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

消息隊(duì)列批量收發(fā)消息,請(qǐng)避開(kāi)這五個(gè)坑!

開(kāi)發(fā) 架構(gòu)
使用批量消息,在一定程度上可以提高性能和吞吐量,但是確實(shí)也會(huì)存在一些問(wèn)題,使用的時(shí)候要結(jié)合業(yè)務(wù)場(chǎng)景避開(kāi)這些坑。

大家好,我是君哥。

使用消息隊(duì)列時(shí),為了提高生產(chǎn)和消費(fèi)的性能,有時(shí)會(huì)開(kāi)啟批量處理。

在生產(chǎn)端,生產(chǎn)者發(fā)送的消息先發(fā)送到一個(gè)消息列表,積累到一定的消息量之后再批量發(fā)送給 Broker,如下圖:

在消費(fèi)端,消費(fèi)者拉取消息后先不立即處理,而是把消息轉(zhuǎn)存到一個(gè)內(nèi)存隊(duì)列或數(shù)據(jù)庫(kù),由業(yè)務(wù)線程去處理,如下圖:

無(wú)論是生產(chǎn)者做批量發(fā)送,還是消費(fèi)者做批量處理,都需要考慮使用批量消息的業(yè)務(wù)場(chǎng)景,避免踩坑。下面看一下批量操作可能會(huì)遇到哪些坑。

批量大小

當(dāng)生產(chǎn)者采用批量發(fā)送的方式來(lái)提高發(fā)送性能時(shí),一定要考慮發(fā)送消息的批量大小。下面是 RocketMQ 批量發(fā)送的官方示例:

String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
    producer.send(messages);
} catch (Exception e) {
    e.printStackTrace();
    //handle the error
}

RocketMQ 默認(rèn)消息大小是 4M,由 maxMessageSize 參數(shù)控制,如果批量消息大小超過(guò) maxMessageSize,則會(huì)拋出異常。

如果遇到消息大小超過(guò) maxMessageSize 的情況時(shí),可以用下面方法進(jìn)行處理:

  • 把這個(gè)參數(shù)改大,但需要考慮 Broker 的性能和網(wǎng)絡(luò)帶寬;
  • 將消息進(jìn)行拆分后分批發(fā)送;
  • 對(duì)消息進(jìn)行壓縮處理。

RabbitMQ 相關(guān)的 API 則提供了更加靈活的批量控制,對(duì)消息數(shù)量和消息大小都做了控制,下面看一下源碼:

冪等

消費(fèi)端可以批量拉取消息進(jìn)行消費(fèi),這樣可以減少拉取消息時(shí)的 RPC 次數(shù),提升消費(fèi)性能。比如在 RocketMQ 中,可以通過(guò) Consumer 中的 pullBatchSize 來(lái)設(shè)置一次拉取的消息數(shù)量,通過(guò) consumeMessageBatchMaxSize 參數(shù)來(lái)設(shè)置一次消費(fèi)的消息數(shù)量。

但需要注意的是,如果批量消息中一條消息消費(fèi)失敗了,這一批消息都需要進(jìn)行重試,已經(jīng)消費(fèi)成功的消息會(huì)被重復(fù)消費(fèi),帶來(lái)業(yè)務(wù)問(wèn)題。

為了不對(duì)業(yè)務(wù)造成影響,必須考慮冪等。一個(gè)簡(jiǎn)單的方法是在消息中增加全局唯一 id 屬性,對(duì)消息消費(fèi)結(jié)果進(jìn)行記錄,消費(fèi)成功后保存 id。這樣在消費(fèi)消息之前先查詢(xún)是否存在消費(fèi)成功的記錄,如果存在則直接返回處理成功。

時(shí)延

在使用消息隊(duì)列進(jìn)行批量操作時(shí),必須要考慮到時(shí)延問(wèn)題。比如我們?cè)O(shè)置一個(gè)批次 100 條消息,積累夠 100 條消息后再發(fā)送,在消息量小的情況下,可能積累夠 100 條消息會(huì)很長(zhǎng)時(shí)間,導(dǎo)致消費(fèi)端拉取到一條消息時(shí)延很大。

雖然消息隊(duì)列的一個(gè)重要作用是削峰填谷,但在一些場(chǎng)景下,對(duì)消息的實(shí)時(shí)性也有要求。比如在車(chē)聯(lián)網(wǎng)的充電場(chǎng)景,車(chē)聯(lián)網(wǎng)平臺(tái)需要實(shí)時(shí)感知充電樁的狀態(tài),如果充電樁積累夠一批消息再上報(bào)平臺(tái),平臺(tái)獲取到的狀態(tài)會(huì)不準(zhǔn)確,如果心跳消息延時(shí)太久,平臺(tái)會(huì)認(rèn)為充電樁離線。

對(duì)于有時(shí)延要求又需要批量操作的場(chǎng)景,可以設(shè)置一個(gè)超時(shí)時(shí)間,超時(shí)后即使消息數(shù)量不夠,也會(huì)發(fā)送出去??聪?RabbitMQ 的處理:

public synchronized void send(String exchange, String routingKey, Message message, CorrelationData correlationData)
  throws AmqpException {
 if (correlationData != null) {
  //...
  super.send(exchange, routingKey, message, correlationData);
 }
 else {
  if (this.scheduledTask != null) {
   this.scheduledTask.cancel(false);
  }
  MessageBatch batch = this.batchingStrategy.addToBatch(exchange, routingKey, message);
  if (batch != null) {
   super.send(batch.getExchange(), batch.getRoutingKey(), batch.getMessage(), null);
  }
  //這里獲取到超時(shí)時(shí)間,到達(dá)超時(shí)時(shí)間后使用定時(shí)器將消息發(fā)送出去
  Date next = this.batchingStrategy.nextRelease();
  if (next != null) {
   this.scheduledTask = this.scheduler.schedule((Runnable) () -> releaseBatches(), next);
  }
 }
}

可靠性

使用批處理一定要考慮可靠性的問(wèn)題。

在消費(fèi)端,消費(fèi)者批量拉取一批消息后把消息暫存到一個(gè)內(nèi)存臨時(shí)隊(duì)列,然后多線程去臨時(shí)隊(duì)列消費(fèi)消息,如果服務(wù)宕機(jī),臨時(shí)隊(duì)列中的消息會(huì)丟失。

為了避免宕機(jī)引發(fā)的損失,可以拉取一批消息后保存到數(shù)據(jù)庫(kù),然后給 Broker 返回 ACK,之后業(yè)務(wù)代碼去數(shù)據(jù)庫(kù)查詢(xún)消息并消費(fèi),不過(guò)要考慮數(shù)據(jù)庫(kù)大事務(wù)、鎖競(jìng)爭(zhēng)等問(wèn)題。

當(dāng)然,對(duì)于一些消息丟失不敏感的場(chǎng)景,比如日志收集之類(lèi)的,可靠性這個(gè)指標(biāo)是不用太關(guān)注的。

特殊場(chǎng)景

因?yàn)榕肯⒂幸恍?fù)雜性,消息隊(duì)列的部分特性不支持。

事務(wù)消息

批量消息會(huì)增加消息重試的難度,所以對(duì)于事務(wù)消息,建議使用單條消息,一條消息對(duì)應(yīng)一個(gè)事務(wù)。

順序消息

順序消息的實(shí)現(xiàn)思路一般是生產(chǎn)者將消息發(fā)送到同一個(gè)分區(qū),消費(fèi)者綁定這個(gè)分區(qū)并使用單線程消費(fèi)這個(gè)分區(qū)的消息。如果對(duì)同一個(gè) Topic 下的同一個(gè)分區(qū)來(lái)實(shí)現(xiàn)批量發(fā)送,難度會(huì)增大。所以建議順序消息使用單條消息進(jìn)行發(fā)送。

延時(shí)消息

如果延時(shí)消息使用批量進(jìn)行發(fā)送,這一批消息的延時(shí)時(shí)間必須相同,同時(shí)要考慮批量消息的超時(shí)時(shí)間,超時(shí)時(shí)間太大會(huì)影響延時(shí)時(shí)間的準(zhǔn)確性,生產(chǎn)端實(shí)現(xiàn)復(fù)雜度大大增加。

總結(jié)

使用批量消息,在一定程度上可以提高性能和吞吐量,但是確實(shí)也會(huì)存在一些問(wèn)題,使用的時(shí)候要結(jié)合業(yè)務(wù)場(chǎng)景避開(kāi)這些坑。

責(zé)任編輯:姜華 來(lái)源: 君哥聊技術(shù)
相關(guān)推薦

2022-07-26 20:00:35

場(chǎng)景RabbitMQMQ

2020-09-14 11:50:21

SpringBootRabbitMQJava

2017-07-28 09:30:55

2017-10-11 15:08:28

消息隊(duì)列常見(jiàn)

2025-03-28 10:06:01

架構(gòu)輪詢(xún)延時(shí)

2023-09-26 08:20:12

消息隊(duì)列RabbitMQ

2022-08-22 08:45:57

Kafka網(wǎng)絡(luò)層源碼實(shí)現(xiàn)

2020-10-09 15:00:56

實(shí)時(shí)消息編程語(yǔ)言

2025-03-28 12:20:00

代碼C#異步編程

2020-10-10 12:46:17

編程指南誤區(qū)

2015-08-12 10:10:21

2016-08-24 15:43:01

2019-11-19 08:35:09

數(shù)據(jù)數(shù)據(jù)準(zhǔn)備自動(dòng)化

2019-07-19 07:56:13

消息隊(duì)列消息代理消息中間件

2017-02-27 14:25:50

Java隊(duì)列Web

2010-04-21 12:39:48

Unix 消息隊(duì)列

2009-12-07 09:23:05

2022-04-12 11:15:31

Redis消息隊(duì)列數(shù)據(jù)庫(kù)

2021-02-19 09:19:11

消息隊(duì)列場(chǎng)景

2009-11-09 11:15:06

WCF消息隊(duì)列
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

主站蜘蛛池模板: 欧美va大片 | 亚洲国产一区二区视频 | 精品免费 | 天堂精品视频 | 91婷婷韩国欧美一区二区 | 99这里只有精品视频 | 亚洲国产精品一区二区久久 | 成人在线网 | 日韩视频三区 | 日本xx视频免费观看 | 欧美福利久久 | 天天操 天天操 | 蜜桃黄网 | 日韩精品一区二区三区在线观看 | 羞羞网站免费观看 | 久久综合欧美 | 国产精品久久久久久久久久不蜜臀 | 久久久99精品免费观看 | 一区中文字幕 | 国产成人一区二区 | 欧美综合国产精品久久丁香 | 成人免费视频 | 欧美中文在线 | 毛片免费观看 | 亚洲精品播放 | 久久久久久久久久久福利观看 | 国产精品91视频 | 亚洲精品久久久久久久久久久久久 | 九一精品| 亚洲高清一区二区三区 | 手机看黄av免费网址 | 国产一区二区黑人欧美xxxx | 欧美精品一区二区在线观看 | 国产精品亚洲二区 | 成人免费视频观看视频 | 精品国产乱码久久久久久果冻传媒 | 一区二区在线不卡 | 97人人草| 青青草av网站 | 伊人操| 亚洲一区久久 |