成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

消費者實現邏輯-kafka知識體系(四)

開發 架構 Kafka
Consumer Group 是 Kafka 提供的可擴展且具有容錯性的消費者機制。Kafka 僅僅使用 Consumer Group 這一種機制,卻同時實現了傳統消息引擎系統的兩大模型:消息隊列模型、發布 / 訂閱模型。

[[410017]]

上篇文章分享kafka broker 的實現原理、數據的存儲結構和消息持久化相關的東西,那消息存儲完了之后,怎么被消費端消費呢,本文來聊一聊Kafka 消費端的那些事兒。

1)拉取機制

Kafka生產端是推的機制即Push,消費端是拉的機制即Pull。

2)Pull的優缺點

優點是消費端可以自己控制消息的讀取速度和數量;

缺點是不知道服務端有沒有數據,所以要一直pull或隔一定時間pull,可能要pull多次并等待。

3)消息投遞語義:

Kafka默認保證at-least-once delivery,容許用戶實現at-most-once語義,exactly-once的實現取決于目的存儲系統。

4)分區分配策略

RangeAssignor:按照分區范圍分配,當前默認策略;

RoundRobinAssignor:輪詢的方式分配;

StickyAssignor:Kafka 0.11版本引入,根據更多指標比如負載,盡可能均勻。

這些前面的文章中也有提到。

消費者組

Consumer Group 是 Kafka 提供的可擴展且具有容錯性的消費者機制。Kafka 僅僅使用 Consumer Group 這一種機制,卻同時實現了傳統消息引擎系統的兩大模型:消息隊列模型、發布 / 訂閱模型。

理想情況下,Consumer 實例的數量應該等于該 Group 訂閱主題的分區總數。

【消費者和消費組】

Kafka消費者是消費組的一部分,當多個消費者形成一個消費組來消費主題時,每個消費者會收到不同分區的消息。假設有一個T1主題,該主題有4個分區;同時我們有一個消費組G1,這個消費組只有一個消費者C1。那么消費者C1將會收到這4個分區的消息,如下所示:

Kafka一個很重要的特性就是,只需寫入一次消息,可以支持任意多的應用讀取這個消息。換句話說,每個應用都可以讀到全量的消息。為了使得每個應用都能讀到全量消息,應用需要有不同的消費組。對于上面的例子,假如我們新增了一個新的消費組G2,而這個消費組有兩個消費者,那么會是這樣的:

這里值得我們注意的是:

  • 一個topic 可以被 多個 消費者組 消費,

但是每個 消費者組 消費的數據是 互不干擾 的,也就是說,每個 消費組 消費的都是 完整的數據 。

  • 一個分區只能被 同一個消費組內 的一個 消費者 消費,

而 不能拆給多個消費者 消費,也就是說如果你某個 消費者組內的消費者數 比 該 Topic 的分區數還多,那么多余的消費者是不起作用的

消費者分區分配的過程

那么我們現在就來看看分配過程是怎么樣的。

1.確定 群組協調器

每當我們創建一個消費組,kafka 會為我們分配一個 broker 作為該消費組的 coordinator(協調器)

2.注冊消費者 并選出 leader consumer

當我們的有了 coordinator 之后,消費者將會開始往該 coordinator上進行注冊,第一個注冊的 消費者將成為該消費組的 leader,后續的 作為 follower

3.當 leader 選出來后,

他會從coordinator那里實時獲取分區 和 consumer 信息,并根據分區策略給每個consumer 分配 分區,并將分配結果告訴 coordinator。

4.follower 消費者將從 coordinator 那里獲取到自己相關的分區信息進行消費,

對于所有的 follower 消費者而言,他們只知道自己消費的分區,并不知道其他消費者的存在。

5.至此,消費者都知道自己的消費的分區,

分區過程結束,當發生 分區再均衡 的時候,leader 將會重復分配過程

具體的流程圖可以翻閱前面的文章。

關于位移

【位移 offset】

  • 每個消費者在消費消息的過程中必然需要有個字段記錄它當前消費到了分區的哪個位置上,這個字段就是消費者位移(Consumer Offset),它是消費者消費進度的指示器。
  • 看上去Offset 就是一個數值而已,其實對于 Consumer Group 而言,它是一組 KV 對,Key 是分區,V 對應 Consumer 消費該分區的最新位移 TopicPartition->long
  • 不過切記的是消費者位移是下一條消息的位移,而不是目前最新消費消息的位移。
  • 提交位移主要是為了表征 Consumer 的消費進度,這樣當 Consumer 發生故障重啟之后,就能夠從 Kafka 中讀取之前提交的位移值,然后從相應的位移處繼續消費,從而避免整個消費過程重來一遍。

【位移的保存】

其實Consumer 端應用程序在提交位移時,其實是向 Coordinator 所在的 Broker 提交位移。同樣地,當 Consumer 應用啟動時,也是向 Coordinator 所在的 Broker 發送各種請求,然后由 Coordinator 負責執行消費者組的注冊、成員管理記錄等元數據管理操作。

老版本的 Consumer Group 把位移保存在 ZooKeeper 中,新版本的 Consumer Group 中,Kafka 社區重新設計了 Consumer Group 的位移管理方式,采用了將位移保存在 Kafka內部主題的方法,也就是__consumer_offsets,俗稱位移主題。至于為什么放棄kafka 保存位移請看我前面的文章《基礎概念、架構和新版的升級Kafka知識體系1》。

【位移主題的數據格式】

key

  • 位移主題的 Key 中應該保存 3 部分內容:Group ID,主題名,分區號

value

  • 主要保存的是offset 的信息,當然還有時間戳等信息,你還記得你可以根據時間重置一個消費者開始消費的地方嗎

【位移的提交】

1. 自動提交

最簡單的提交方式是讓消費者自動提交偏移量,如果 enable.auto.commit 被設為 true,那么每過 5s,消費者會自動把從 poll() 方法接收到的最大偏移量提交上去。

可能造成的問題:數據重復讀

假設我們仍然使用默認的 5s 提交時間間隔,在最近一次提交之后的 3s 發生了再均衡,再均衡之后,消費者從最后一次提交的偏移量位置開始讀取消息。這個時候偏移量已經落后了 3s,所以在這 3s內到達的消息會被重復處理??梢酝ㄟ^修改提交時間間隔來更頻繁地提交偏移量,減小可能出現重復消息的時間窗,不過這種情況是無法完全避免的。

2. 手動提交

2.1 同步提交

同步存在的問題

  • 從名字上來看,它是一個同步操作,即該方法會一直等待,直到位移被成功提交才會返回。如果提交過程中出現異常,該方法會將異常信息拋出。
  • commitSync()的問題在于,Consumer程序會處于阻塞狀態,直到遠端的Broker返回提交結果,這個狀態才會結束,需要注意的是同步提交會在提交失敗之后進行重試
  • 在任何系統中,因為程序而非資源限制而導致的阻塞都可能是系統的瓶頸,會影響整個應用程序的 TPS,影響吞吐量。

2.2 異步提交

手動提交有一個不足之處,在 broker 對提交請求作出回應之前,應用程序會一直阻塞,這樣會限制應用程序的吞吐量。我們可以通過降低提交頻率來提升吞吐量,但如果發生了再均衡,會增加重復消息的數量。

這時可以使用異步提交,只管發送提交請求,無需等待 broker 的響應。它之所以不進行重試,是因為在它收到服務器響應的時候,可能有一個更大的偏移量已經提交成功。

假設我們發出一個請求用于提交偏移量2000,這個時候發生了短暫的通信問題,服務器收不到請求,自然也不會作出任何響應。與此同時,我們處理了另外一批消息,并成功提交了偏移量3000。如果commitAsync()重新嘗試提交偏移量2000,它有可能在偏移量3000之后提交成功。這個時候如果發生再均衡,就會出現重復消息。

異步存在的問題

  • commitAsync 的問題在于,出現問題時它不會自動重試。因為它是異步操作,倘若提交失敗后自動重試,那么它重試時提交的位移值可能早已經“過期”或不是最新值了。因此,異步提交的重試其實沒有意義,所以 commitAsync 是不會重試的,所以只要在程序停止前最后一次提交成功即可。
  • 這里提供一個解決方案,那就是不論成功還是失敗我們都將offsets信息記錄下來,如果最后一次提交成功那就忽略,如果最后一次沒有提交成功,我們可以在下次重啟的時候手動指定offset

綜合異步和同步來提交

同時使用了 commitSync() 和 commitAsync()。對于常規性、階段性的手動提交,我們調用 commitAsync() 避免程序阻塞,而在 Consumer 要關閉前,我們調用 commitSync() 方法執行同步阻塞式的位移提交,以確保 Consumer 關閉前能夠保存正確的位移數據。

關于再均衡Rebalance

分區的所有權從一個消費者轉移到另一個消費者,這樣的行為被稱為再均衡(Rebalance)。再均衡非常重要,為消費者組帶來了高可用性和伸縮性,可以放心的增加或移除消費者。以下是觸發再均衡的三種行為:

  1. 當一個 消費者 加入組時,讀取了原本由其他消費者讀取的分區,會觸發再均衡。
  2. 當一個 消費者 離開組時(被關閉或發生崩潰),原本由它讀取的分區將被組里的其他 消費者 來讀取,會觸發再均衡。
  3. 當 Topic 發生變化時,比如添加了新的分區,會發生分區重分配,會觸發再均衡。

分區再均衡 期間該 Topic 是不可用的,所以Rebalance 實在是太慢了!!!

這里再補充一下生產環境中因為不正確的配置引起的不需要的分區再均衡。

正常集群變動不再考慮范圍內:

1.防止 因為未能及時發送心跳,導致Consumer 超時被踢出消費者組。

這里可以設置 session.timeout.ms超時時間 和 heartbeat.interval.ms 心跳間隔一般可以把 超時時間設置為 心跳間隔的 3倍。

2.Consumer消費時間過長導致的。

Consumer端如果無法在規定時間內消費完 poll 來的消息,那么就認為該消費者有問題,從而該消費者會自主離組,所以我們可以設置 max.poll.interval.ms比處理時間略長。

3.從第二點我們還可能引申一點就是,如果集群經常發生 分區在均衡,

那么你可能需要去觀察下消費者執行任務的耗時,特別注意觀察下 GC 的占用時間。

往往線上出問題也是因為配置不合理導致的。

 

責任編輯:姜華 來源: 今日頭條
相關推薦

2021-07-12 10:25:03

RocketMQ數據結構kafka

2021-07-05 06:26:08

生產者kafka架構

2021-07-07 07:06:31

Brokerkafka架構

2023-06-01 08:08:38

kafka消費者分區策略

2015-07-28 17:52:36

IOS知識體系

2021-07-14 17:18:14

RocketMQ消息分布式

2017-06-22 13:07:21

2012-03-08 11:13:23

企業架構

2021-10-26 10:50:25

Kafkabroker

2021-07-13 11:52:47

順序消息RocketMQkafka

2021-12-22 11:00:05

模型Golang語言

2017-02-27 16:42:23

Spark識體系

2017-04-03 15:35:13

知識體系架構

2021-07-02 06:27:00

Kafka架構主從架構

2015-08-26 09:39:30

java消費者

2022-08-02 10:01:42

架構

2017-05-16 12:30:21

Python多線程生產者消費者模式

2022-07-07 09:00:49

RocketMQ消費者消息消費

2021-07-08 07:16:24

RocketMQ數據結構Message

2011-08-05 16:21:24

點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 免费一区二区三区 | 午夜精品一区二区三区在线视频 | 成人av鲁丝片一区二区小说 | 国产成人精品一区二区三区 | 国产成人jvid在线播放 | 中文字幕不卡在线观看 | 日韩在线观看一区 | 免费在线观看成人 | 亚洲欧美综合精品久久成人 | 国产精品久久久久久久岛一牛影视 | 国产乱码精品一区二区三区五月婷 | 久久久视 | 亚洲日韩中文字幕一区 | 精品国产91乱码一区二区三区 | 精品欧美久久 | 日韩资源 | 久久久久国产一区二区三区 | 狠狠婷婷综合久久久久久妖精 | 91精品国产91久久久久青草 | 亚洲视频在线免费观看 | 午夜影院网站 | 久久com| 美女露尿口视频 | 日韩欧美在线视频一区 | 国产一级在线 | 亚洲午夜视频 | 欧美日韩一区二区在线 | 亚洲传媒在线 | 毛片免费观看视频 | 久久久国产精品 | 你懂的在线视频播放 | 国产美女精品视频 | 91精品国产高清一区二区三区 | 偷拍自拍在线观看 | 久久精品—区二区三区 | 国产精品一区二区免费 | 国产精品三级久久久久久电影 | 偷拍自拍在线观看 | 亚洲天堂中文字幕 | 中文字幕 视频一区 | 欧美在线不卡 |