成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Kafka又出問題了!

開發(fā) 架構(gòu) Kafka
作者個人研發(fā)的在高并發(fā)場景下,提供的簡單、穩(wěn)定、可擴(kuò)展的延遲消息隊列框架,具有精準(zhǔn)的定時任務(wù)和延遲隊列處理功能。

[[384383]]

 作者個人研發(fā)的在高并發(fā)場景下,提供的簡單、穩(wěn)定、可擴(kuò)展的延遲消息隊列框架,具有精準(zhǔn)的定時任務(wù)和延遲隊列處理功能。自開源半年多以來,已成功為十幾家中小型企業(yè)提供了精準(zhǔn)定時調(diào)度方案,經(jīng)受住了生產(chǎn)環(huán)境的考驗。為使更多童鞋受益,現(xiàn)給出開源框架地址:https://github.com/sunshinelyz/mykit-delay

寫在前面

估計運(yùn)維年前沒有祭拜服務(wù)器,Nginx的問題修復(fù)了,Kafka又不行了。今天,本來想再睡會,結(jié)果,電話又響了。還是運(yùn)營,“喂,冰河,到公司了嗎?趕緊看看服務(wù)器吧,又出問題了“。“在路上了,運(yùn)維那哥們兒還沒上班嗎”?“還在休假。。。”, 我:“。。。”。哎,這哥們兒是跑路了嗎?先不管他,問題還是要解決。

問題重現(xiàn)

到公司后,放下我專用的雙肩包,拿出我的利器——筆記本電腦,打開后迅速登錄監(jiān)控系統(tǒng),發(fā)現(xiàn)主要業(yè)務(wù)系統(tǒng)沒啥問題。一個非核心服務(wù)發(fā)出了告警,并且監(jiān)控系統(tǒng)中顯示這個服務(wù)頻繁的拋出如下異常。

  1. 2021-02-28 22:03:05 131 pool-7-thread-3 ERROR [] -  
  2. commit failed  
  3. org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. 
  4.         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:713) ~[MsgAgent-jar-with-dependencies.jar:na] 
  5.         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:596) ~[MsgAgent-jar-with-dependencies.jar:na] 
  6.         at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1218) ~[MsgAgent-jar-with-dependencies.jar:na] 
  7.         at com.today.eventbus.common.MsgConsumer.run(MsgConsumer.java:121) ~[MsgAgent-jar-with-dependencies.jar:na] 
  8.         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_161] 
  9.         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_161] 
  10.         at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161] 

從上面輸出的異常信息,大概可以判斷出系統(tǒng)出現(xiàn)的問題:Kafka消費(fèi)者在處理完一批poll消息后,在同步提交偏移量給broker時報錯了。大概就是因為當(dāng)前消費(fèi)者線程的分區(qū)被broker給回收了,因為Kafka認(rèn)為這個消費(fèi)者掛掉了,我們可以從下面的輸出信息中可以看出這一點(diǎn)。

  1. Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. 

Kafka內(nèi)部觸發(fā)了Rebalance機(jī)制,明確了問題,接下來,我們就開始分析問題了。

分析問題

既然Kafka觸發(fā)了Rebalance機(jī)制,那我就來說說Kafka觸發(fā)Rebalance的時機(jī)。

什么是Rebalance

舉個具體點(diǎn)的例子,比如某個分組下有10個Consumer實(shí)例,這個分組訂閱了一個50個分區(qū)的主題。正常情況下,Kafka會為每個消費(fèi)者分配5個分區(qū)。這個分配的過程就是Rebalance。

觸發(fā)Rebalance的時機(jī)

當(dāng)Kafka中滿足如下條件時,會觸發(fā)Rebalance:

  • 組內(nèi)成員的個數(shù)發(fā)生了變化,比如有新的消費(fèi)者加入消費(fèi)組,或者離開消費(fèi)組。組成員離開消費(fèi)組包含組成員崩潰或者主動離開消費(fèi)組。
  • 訂閱的主題個數(shù)發(fā)生了變化。
  • 訂閱的主題分區(qū)數(shù)發(fā)生了變化。

后面兩種情況我們可以人為的避免,在實(shí)際工作過程中,對于Kafka發(fā)生Rebalance最常見的原因是消費(fèi)組成員的變化。

消費(fèi)者成員正常的添加和停掉導(dǎo)致Rebalance,這種情況無法避免,但是時在某些情況下,Consumer 實(shí)例會被 Coordinator 錯誤地認(rèn)為 “已停止” 從而被“踢出”Group,導(dǎo)致Rebalance。

當(dāng) Consumer Group 完成 Rebalance 之后,每個 Consumer 實(shí)例都會定期地向 Coordinator 發(fā)送心跳請求,表明它還存活著。如果某個 Consumer 實(shí)例不能及時地發(fā)送這些心跳請求,Coordinator 就會認(rèn)為該 Consumer 已經(jīng) “死” 了,從而將其從 Group 中移除,然后開啟新一輪 Rebalance。這個時間可以通過Consumer 端的參數(shù) session.timeout.ms 進(jìn)行配置。默認(rèn)值是 10 秒。

除了這個參數(shù),Consumer 還提供了一個控制發(fā)送心跳請求頻率的參數(shù),就是 heartbeat.interval.ms。這個值設(shè)置得越小,Consumer 實(shí)例發(fā)送心跳請求的頻率就越高。頻繁地發(fā)送心跳請求會額外消耗帶寬資源,但好處是能夠更加快速地知曉當(dāng)前是否開啟 Rebalance,因為,目前 Coordinator 通知各個 Consumer 實(shí)例開啟 Rebalance 的方法,就是將 REBALANCE_NEEDED 標(biāo)志封裝進(jìn)心跳請求的響應(yīng)體中。

除了以上兩個參數(shù),Consumer 端還有一個參數(shù),用于控制 Consumer 實(shí)際消費(fèi)能力對 Rebalance 的影響,即 max.poll.interval.ms 參數(shù)。它限定了 Consumer 端應(yīng)用程序兩次調(diào)用 poll 方法的最大時間間隔。它的默認(rèn)值是 5 分鐘,表示 Consumer 程序如果在 5 分鐘之內(nèi)無法消費(fèi)完 poll 方法返回的消息,那么 Consumer 會主動發(fā)起 “離開組” 的請求,Coordinator 也會開啟新一輪 Rebalance。

通過上面的分析,我們可以看一下那些rebalance是可以避免的:

第一類非必要 Rebalance 是因為未能及時發(fā)送心跳,導(dǎo)致 Consumer 被 “踢出”Group 而引發(fā)的。這種情況下我們可以設(shè)置 session.timeout.ms 和 heartbeat.interval.ms 的值,來盡量避免rebalance的出現(xiàn)。(以下的配置是在網(wǎng)上找到的最佳實(shí)踐,暫時還沒測試過)

  • 設(shè)置 session.timeout.ms = 6s。
  • 設(shè)置 heartbeat.interval.ms = 2s。
  • 要保證 Consumer 實(shí)例在被判定為 “dead” 之前,能夠發(fā)送至少 3 輪的心跳請求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。

將 session.timeout.ms 設(shè)置成 6s 主要是為了讓 Coordinator 能夠更快地定位已經(jīng)掛掉的 Consumer,早日把它們踢出 Group。

第二類非必要 Rebalance 是 Consumer 消費(fèi)時間過長導(dǎo)致的。此時,max.poll.interval.ms 參數(shù)值的設(shè)置顯得尤為關(guān)鍵。如果要避免非預(yù)期的 Rebalance,最好將該參數(shù)值設(shè)置得大一點(diǎn),比下游最大處理時間稍長一點(diǎn)。

總之,要為業(yè)務(wù)處理邏輯留下充足的時間。這樣,Consumer 就不會因為處理這些消息的時間太長而引發(fā) Rebalance 。

拉取偏移量與提交偏移量

kafka的偏移量(offset)是由消費(fèi)者進(jìn)行管理的,偏移量有兩種,拉取偏移量(position)與提交偏移量(committed)。拉取偏移量代表當(dāng)前消費(fèi)者分區(qū)消費(fèi)進(jìn)度。每次消息消費(fèi)后,需要提交偏移量。在提交偏移量時,kafka會使用拉取偏移量的值作為分區(qū)的提交偏移量發(fā)送給協(xié)調(diào)者。

如果沒有提交偏移量,下一次消費(fèi)者重新與broker連接后,會從當(dāng)前消費(fèi)者group已提交到broker的偏移量處開始消費(fèi)。

所以,問題就在這里,當(dāng)我們處理消息時間太長時,已經(jīng)被broker剔除,提交偏移量又會報錯。所以拉取偏移量沒有提交到broker,分區(qū)又rebalance。下一次重新分配分區(qū)時,消費(fèi)者會從最新的已提交偏移量處開始消費(fèi)。這里就出現(xiàn)了重復(fù)消費(fèi)的問題。

異常日志提示的方案

其實(shí),說了這么多,Kafka消費(fèi)者輸出的異常日志中也給出了相應(yīng)的解決方案。

接下來,我們說說Kafka中的拉取偏移量和提交偏移量。

其實(shí),從輸出的日志信息中,也大概給出了解決問題的方式,簡單點(diǎn)來說,就是可以通過增加 max.poll.interval.ms 時長和 session.timeout.ms時長,減少 max.poll.records的配置值,并且消費(fèi)端在處理完消息時要及時提交偏移量。

問題解決

通過之前的分析,我們應(yīng)該知道如何解決這個問題了。這里需要說一下的是,我在集成Kafka的時候,使用的是SpringBoot和Kafka消費(fèi)監(jiān)聽器,消費(fèi)端的主要代碼結(jié)構(gòu)如下所示。

  1. @KafkaListener(topicPartitions = {@TopicPartition(topic = KafkaConstants.TOPIC_LOGS, partitions = { "0" }) }, groupId = "kafka-consumer", containerFactory = "kafkaListenerContainerFactory"
  2. public void consumerReceive (ConsumerRecord<?, ?> record, Acknowledgment ack){ 
  3.     logger.info("topic is {}, offset is {}, value is {} n", record.topic(), record.offset(), record.value()); 
  4.     try { 
  5.         Object value = record.value(); 
  6.         logger.info(value.toString()); 
  7.         ack.acknowledge(); 
  8.     } catch (Exception e) { 
  9.         logger.error("日志消費(fèi)端異常: {}", e); 
  10.     } 

上述代碼邏輯比較簡單,就是獲取到Kafka中的消息后直接打印輸出到日志文件中。

嘗試解決

這里,我先根據(jù)異常日志的提示信息進(jìn)行配置,所以,我在SpringBoot的application.yml文件中新增了如下配置信息。

  1. spring: 
  2.   kafka: 
  3.     consumer: 
  4.     properties: 
  5.      max.poll.interval.ms: 3600000 
  6.      max.poll.records: 50 
  7.      session.timeout.ms: 60000 
  8.      heartbeat.interval.ms: 3000 

配置完成后,再次測試消費(fèi)者邏輯,發(fā)現(xiàn)還是拋出Rebalance異常。

最終解決

我們從另一個角度來看下Kafka消費(fèi)者所產(chǎn)生的問題:一個Consumer在生產(chǎn)消息,另一個Consumer在消費(fèi)它的消息,它們不能在同一個groupId 下面,更改其中一個的groupId 即可。

這里,我們的業(yè)務(wù)項目是分模塊和子系統(tǒng)進(jìn)行開發(fā)的,例如模塊A在生產(chǎn)消息,模塊B消費(fèi)模塊A生產(chǎn)的消息。此時,修改配置參數(shù),例如 session.timeout.ms: 60000,根本不起作用,還是拋出Rebalance異常。

此時,我嘗試修改下消費(fèi)者分組的groupId,將下面的代碼

  1. @KafkaListener(topicPartitions = {@TopicPartition(topic = KafkaConstants.TOPIC_LOGS, partitions = { "0" }) }, groupId = "kafka-consumer", containerFactory = "kafkaListenerContainerFactory"
  2. public void consumerReceive (ConsumerRecord<?, ?> record, Acknowledgment ack){ 

修改為如下所示的代碼。

  1. @KafkaListener(topicPartitions = {@TopicPartition(topic = KafkaConstants.TOPIC_LOGS, partitions = { "0" }) }, groupId = "kafka-consumer-logs", containerFactory = "kafkaListenerContainerFactory"
  2. public void consumerReceive (ConsumerRecord<?, ?> record, Acknowledgment ack){ 

再次測試,問題解決~~

 本文轉(zhuǎn)載自微信公眾號「冰河技術(shù)」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請聯(lián)系冰河技術(shù)公眾號。

 

責(zé)任編輯:武曉燕 來源: 冰河技術(shù)
相關(guān)推薦

2022-09-19 08:35:28

Kafka節(jié)點(diǎn)故障

2012-05-16 13:43:20

操作系統(tǒng)故障檢修系統(tǒng)管理

2013-10-18 17:09:18

Windows 8.1微軟

2022-06-07 00:33:21

驅(qū)動安卓開發(fā)

2021-04-23 09:33:55

Windows10操作系統(tǒng)微軟

2020-05-27 15:14:55

iOSiPhone更新

2019-02-14 10:13:42

網(wǎng)絡(luò)故障RIPIGRP

2021-05-31 09:47:03

Windows10操作系統(tǒng)微軟

2020-03-04 15:20:17

Windows 10Windows更新

2023-07-27 15:17:56

微軟Windows 11

2019-05-25 17:19:33

Apple 支持蘋果設(shè)備

2021-03-12 15:50:54

Windows 10Windows操作系統(tǒng)

2021-06-15 05:36:45

Gulpawaitasync

2021-06-28 07:27:43

AwaitAsync語法

2018-08-22 10:12:07

2021-03-06 10:25:19

內(nèi)存Java代碼

2010-03-22 16:27:57

Windows安全殺毒軟件

2021-02-03 15:12:08

java內(nèi)存溢出

2010-02-01 16:39:32

Dell主板質(zhì)量

2019-02-27 16:00:28

IT資產(chǎn)審計
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號

主站蜘蛛池模板: 国产精品中文字幕在线观看 | 久久成人国产 | 狠狠色综合网站久久久久久久 | 日本黄色片免费在线观看 | 亚洲第一区久久 | 亚洲福利在线观看 | 日本免费黄色一级片 | 精品一区二区三区在线观看国产 | 精品一区二区在线观看 | 日韩www| 色婷婷亚洲国产女人的天堂 | 狠狠夜夜 | 欧洲一区视频 | 九九久视频| 毛片链接| 国产69久久精品成人看动漫 | 欧美综合国产精品久久丁香 | 国产午夜在线 | 欧美黑人一区二区三区 | 亚洲福利av| 久草青青草 | 久久精品国产v日韩v亚洲 | 国产一级免费视频 | 精品日韩一区 | 欧美日韩视频在线 | 一区二区三区视频 | 日韩成人av在线播放 | 国产色黄| 中文字幕精品一区二区三区精品 | 在线免费观看黄色网址 | 婷婷丁香激情 | 国产精品福利网站 | 欧美综合一区二区三区 | 高清国产午夜精品久久久久久 | 婷婷毛片 | 成人二区| 天天插日日操 | 亚洲综合中文字幕在线观看 | 欧美久久久久久 | 男女激情网站免费 | 亚洲高清电影 |