成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Kafka 如何解決消息不丟失?

開發 架構 Kafka
Kafka 消息框架,大家一定不陌生,很多人工作中都有接觸。它的核心思路,通過一個高性能的MQ服務來連接生產和消費兩個系統,達到系統間的解耦,有很強的擴展性。

[[415220]]

本文轉載自微信公眾號「微觀技術」,作者微觀技術。轉載本文請聯系微觀技術公眾號。

大家好,我是Tom哥~

Kafka 消息框架,大家一定不陌生,很多人工作中都有接觸。它的核心思路,通過一個高性能的MQ服務來連接生產和消費兩個系統,達到系統間的解耦,有很強的擴展性。

你可能會有疑問,如果中間某一個環節斷掉了,那怎么辦?

這種情況,我們稱之為消息丟失,會造成系統間的數據不一致。

那如何解決這個問題?需要從生產端、MQ服務端、消費端,三個維度來處理。

1、生產端

生產端的職責就是,確保生產的消息能到達MQ服務端,這里我們需要有一個響應來判斷本次的操作是否成功。

  1. Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) 

比如,上面的代碼就是通過一個Callback函數,來判斷消息是否發送成功,如果失敗,我們需要補償處理。

另外,為了提升發送時的靈活性,kafka提供了多種參數,供不同業務自己選擇

1.1 參數 acks

該參數表示有多少個分區副本收到消息,才認為本次發送是成功的。

acks=0,只要發送消息就認為成功,生產端不等待服務器節點的響應

acks=1,表示生產者收到 leader 分區的響應就認為發送成功

acks=-1,只有當 ISR 中的副本全部收到消息時,生產端才會認為是成功的。這種配置是最安全的,但由于同步的節點較多,吞吐量會降低。

1.2 參數 retries

表示生產端的重試次數,如果重試次數用完后,還是失敗,會將消息臨時存儲在本地磁盤,待服務恢復后再重新發送。建議值 retries=3

1.3 參數 retry.backoff.m

消息發送超時或失敗后,間隔的重試時間。一般推薦的設置時間是 300 毫秒。

這里要特別注意一種特殊情況,如果MQ服務沒有正常響應,不一定代表消息發送失敗,也有可能是響應時正好趕上網絡抖動,響應超時。

當生產端做完這些,一定能保證消息發送成功了,但可能發送多次,這樣就會導致消息重復,這個我們后面再講解決方案。

2、MQ服務端

MQ服務端作為消息的存儲介質,也有可能會丟失消息。比如:一個分區突然掛掉,那么怎么保證這個分區的數據不丟失,我們會引入副本概念,通過備份來解決這個問題。

具體可設置哪些參數?

2.1 參數 replication.factor

表示分區副本的個數,replication.factor >1 當leader 副本掛了,follower副本會被選舉為leader繼續提供服務。

2.2 參數 min.insync.replicas

表示 ISR 最少的副本數量,通常設置 min.insync.replicas >1,這樣才有可用的follower副本執行替換,保證消息不丟失

2.3 參數 unclean.leader.election.enable

是否可以把非 ISR 集合中的副本選舉為 leader 副本。

如果設置為true,而follower副本的同步消息進度落后較多,此時被選舉為leader,會導致消息丟失,慎用。

3、消費端

消費端要做的是把消息完整的消費處理掉。但是這里面有個提交位移的步驟。

有的同學,考慮到業務處理消耗時間較長,會單獨啟動線程拉取消息存儲到本地內存隊列,然后再搞個線程池并行處理業務邏輯。這樣設計有個風險,本地消息如果沒有處理完,服務器宕機了,會造成消息丟失。

正確的做法:拉取消息 --- 業務處理 ---- 提交消費位移

關于提交位移,kafka提供了集中參數配置

參數 enable.auto.commit

表示消費位移是否自動提交。

如果拉取了消息,業務邏輯還沒處理完,提交了消費位移但是消費端卻掛了,消費端恢復或其他消費端接管該分片再也拉取不到這條消息,會造成消息丟失。所以,我們通常設置 enable.auto.commit=false,手動提交消費位移。

  1. List<String> messages = consumer.poll(); 
  2. processMsg(messages); 
  3. consumer.commitOffset(); 

這個方案,會產生另外一個問題,我們來看下這個圖:

拉取了消息4~消息8,業務處理后,在提交消費位移時,不湊巧系統宕機了,最后的提交位移并沒有保存到MQ 服務端,下次拉取消息時,依然是從消息4開始拉取,但是這部分消息已經處理過了,這樣便會導致重復消費。

如何解決重復消費,避免引發數據不一致

首先,要解決MQ 服務端的重復消息。kafka 在 0.11.0 版本后,每條消息都有唯一的message id, MQ服務采用空間換時間方式,自動對重復消息過濾處理,保證接口的冪等性。

但這個不能根本上解決消息重復問題,即使MQ服務中存儲的消息沒有重復,但消費端是采用拉取方式,如果重復拉取,也會導致重復消費,如何解決這種場景問題?

方案一:只拉取一次(消費者拉取消息后,先提交 offset 后再處理消息),但是如果系統宕機,業務處理沒有正常結束,后面再也拉取不到這些消息,會導致數據不一致,該方案很少采用。

方案二:允許拉取重復消息,但是消費端自己做冪等性控制。保證只成功消費一次。 

關于冪等技術方案很多,我們可以采用數據表或Redis緩存存儲處理標識,每次拉取到消息,處理前先校驗處理狀態,再決定是處理還是丟棄消息。

 

責任編輯:武曉燕 來源: 微觀技術
相關推薦

2024-06-18 08:26:22

2022-08-29 18:14:55

MQ數據不丟失

2024-08-06 09:55:25

2021-03-08 10:19:59

MQ消息磁盤

2021-09-13 07:23:53

KafkaGo語言

2022-07-14 14:27:34

Javascript數字精度二進制

2021-10-22 08:37:13

消息不丟失rocketmq消息隊列

2022-08-26 05:24:04

中間件技術Kafka

2019-03-13 09:27:57

宕機Kafka數據

2022-08-26 17:08:51

KafkaRedi數據

2024-04-23 08:46:45

消息積壓KafkaMQ

2018-03-29 09:46:02

2022-03-31 08:26:44

RocketMQ消息排查

2009-06-05 15:35:31

網絡不通數據發送

2024-11-11 07:05:00

Redis哨兵模式主從復制

2011-03-07 14:09:10

FileZilla

2024-02-26 08:10:00

Redis數據數據庫

2011-08-22 14:50:39

ssh

2022-07-11 08:01:55

Kafka服務器宕機

2012-09-05 11:09:15

SELinux操作系統
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: japan25hdxxxx日本| 国产色网 | 日本成人中文字幕 | 亚洲一区av在线 | 午夜看电影在线观看 | 一区二区三区不卡视频 | 亚洲国产成人精品久久 | 99久久亚洲 | 天天躁日日躁狠狠很躁 | 亚洲国产成人久久综合一区,久久久国产99 | 日韩在线精品 | 97国产成人 | 亚洲欧美中文日韩在线v日本 | 亚洲另类春色偷拍在线观看 | 美女天天干天天操 | 97精品超碰一区二区三区 | 欧洲成人午夜免费大片 | 美国黄色一级片 | 91香蕉| 天天综合国产 | 99久久婷婷 | 国产精品亚洲精品日韩已方 | 久久av网| 国产中文字幕av | 欧美精品一区二区在线观看 | 成人在线中文字幕 | 日韩欧美三级 | 国精产品一品二品国精在线观看 | 日韩有码一区二区三区 | 亚洲欧美一区二区三区在线 | 综合精品久久久 | 盗摄精品av一区二区三区 | 激情久久av一区av二区av三区 | 99久久中文字幕三级久久日本 | 精品乱子伦一区二区三区 | 亚洲成年人免费网站 | 亚洲成人久久久 | 女人夜夜春 | 日韩精品一区二区三区中文在线 | 亚洲一区网站 | 亚洲精品字幕 |