從未如此簡單:10分鐘帶你逆襲Kafka!
原創(chuàng)【51CTO.com原創(chuàng)稿件】Apache Kafka 是一個(gè)快速、可擴(kuò)展的、高吞吐的、可容錯(cuò)的分布式“發(fā)布-訂閱”消息系統(tǒng), 使用 Scala 與 Java 語言編寫,能夠?qū)⑾囊粋€(gè)端點(diǎn)傳遞到另一個(gè)端點(diǎn)。
圖片來自 Pexels
較之傳統(tǒng)的消息中間件(例如 ActiveMQ、RabbitMQ),Kafka 具有高吞吐量、內(nèi)置分區(qū)、支持消息副本和高容錯(cuò)的特性,非常適合大規(guī)模消息處理應(yīng)用程序。
Kafka 官網(wǎng):http://kafka.apache.org/
Kafka 主要設(shè)計(jì)目標(biāo)如下:
- 以時(shí)間復(fù)雜度為 O(1) 的方式提供消息持久化能力,即使對 TB 級以上數(shù)據(jù)也能保證常數(shù)時(shí)間的訪問性能。
- 高吞吐率。即使在非常廉價(jià)的商用機(jī)器上也能做到單機(jī)支持每秒 100K 條消息的傳輸。
- 支持 Kafka Server 間的消息分區(qū),及分布式消費(fèi),同時(shí)保證每個(gè) Partition 內(nèi)的消息順序傳輸。
- 同時(shí)支持離線數(shù)據(jù)處理和實(shí)時(shí)數(shù)據(jù)處理。
- 支持在線水平擴(kuò)展。
Kafka 通常用于兩大類應(yīng)用程序:
- 建立實(shí)時(shí)流數(shù)據(jù)管道,以可靠地在系統(tǒng)或應(yīng)用程序之間獲取數(shù)據(jù)。
- 構(gòu)建實(shí)時(shí)流應(yīng)用程序,以轉(zhuǎn)換或響應(yīng)數(shù)據(jù)流。
要了解 Kafka 如何執(zhí)行這些操作,讓我們從頭開始深入研究 Kafka 的功能。
首先幾個(gè)概念:
- Kafka 在一個(gè)或多個(gè)可以跨越多個(gè)數(shù)據(jù)中心的服務(wù)器上作為集群運(yùn)行。
- Kafka 集群將記錄流存儲在稱為主題的類別中。
- 每個(gè)記錄由一個(gè)鍵,一個(gè)值和一個(gè)時(shí)間戳組成。
Kafka 架構(gòu)體系如下圖:
Kafka 的應(yīng)用場景非常多, 下面我們就來舉幾個(gè)我們最常見的場景:
①用戶的活動跟蹤:用戶在網(wǎng)站的不同活動消息發(fā)布到不同的主題中心,然后可以對這些消息進(jìn)行實(shí)時(shí)監(jiān)測、實(shí)時(shí)處理。
當(dāng)然,也可以加載到 Hadoop 或離線處理數(shù)據(jù)倉庫,對用戶進(jìn)行畫像。像淘寶、天貓、京東這些大型電商平臺,用戶的所有活動都要進(jìn)行追蹤的。
②日志收集如下圖:
③限流削峰如下圖:
④高吞吐率實(shí)現(xiàn):Kafka 與其他 MQ 相比,最大的特點(diǎn)就是高吞吐率。為了增加存儲能力,Kafka 將所有的消息都寫入到了低速大容量的硬盤。
按理說,這將導(dǎo)致性能損失,但實(shí)際上,Kafka 仍然可以保持超高的吞吐率,并且其性能并未受到影響。
其主要采用如下方式實(shí)現(xiàn)了高吞吐率:
- 順序讀寫:Kafka 將消息寫入到了分區(qū) Partition 中,而分區(qū)中的消息又是順序讀寫的。順序讀寫要快于隨機(jī)讀寫。
- 零拷貝:生產(chǎn)者、消費(fèi)者對于 Kafka 中的消息是采用零拷貝實(shí)現(xiàn)的。
- 批量發(fā)送:Kafka 允許批量發(fā)送模式。
- 消息壓縮:Kafka 允許對消息集合進(jìn)行壓縮。
Kafka的優(yōu)點(diǎn)如下:
①解耦:在項(xiàng)目啟動之初來預(yù)測將來項(xiàng)目會碰到什么需求,是極其困難的。
消息系統(tǒng)在處理過程中間插入了一個(gè)隱含的、基于數(shù)據(jù)的接口層,兩邊的處理過程都要實(shí)現(xiàn)這一接口。
這允許你獨(dú)立的擴(kuò)展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。
②冗余(副本):有些情況下,處理數(shù)據(jù)的過程會失敗。除非數(shù)據(jù)被持久化,否則將造成丟失。
消息隊(duì)列把數(shù)據(jù)進(jìn)行持久化直到它們已經(jīng)被完全處理,通過這一方式規(guī)避了數(shù)據(jù)丟失風(fēng)險(xiǎn)。
許多消息隊(duì)列所采用的"插入-獲取-刪除"范式中,在把一個(gè)消息從隊(duì)列中刪除之前,需要你的處理系統(tǒng)明確的指出該消息已經(jīng)被處理完畢,從而確保你的數(shù)據(jù)被安全的保存直到你使用完畢。
③擴(kuò)展性:因?yàn)橄㈥?duì)列解耦了你的處理過程,所以增大消息入隊(duì)和處理的頻率是很容易的,只要另外增加處理過程即可。不需要改變代碼、不需要調(diào)節(jié)參數(shù)。擴(kuò)展就像調(diào)大電力按鈕一樣簡單。
④靈活性&峰值處理能力:在訪問量劇增的情況下,應(yīng)用仍然需要繼續(xù)發(fā)揮作用,但是這樣的突發(fā)流量并不常見;如果為以能處理這類峰值訪問為標(biāo)準(zhǔn)來投入資源隨時(shí)待命無疑是巨大的浪費(fèi)。
使用消息隊(duì)列能夠使關(guān)鍵組件頂住突發(fā)的訪問壓力,而不會因?yàn)橥话l(fā)的超負(fù)荷的請求而完全崩潰。
⑤可恢復(fù)性:系統(tǒng)的一部分組件失效時(shí),不會影響到整個(gè)系統(tǒng)。消息隊(duì)列降低了進(jìn)程間的耦合度,所以即使一個(gè)處理消息的進(jìn)程掛掉,加入隊(duì)列中的消息仍然可以在系統(tǒng)恢復(fù)后被處理。
⑥順序保證:在大多使用場景下,數(shù)據(jù)處理的順序都很重要。大部分消息隊(duì)列本來就是排序的,并且能保證數(shù)據(jù)會按照特定的順序來處理。Kafka 保證一個(gè) Partition 內(nèi)的消息的有序性。
⑦緩沖:在任何重要的系統(tǒng)中,都會有需要不同的處理時(shí)間的元素。例如,加載一張圖片比應(yīng)用過濾器花費(fèi)更少的時(shí)間。
消息隊(duì)列通過一個(gè)緩沖層來幫助任務(wù)最高效率的執(zhí)行,寫入隊(duì)列的處理會盡可能的快速。該緩沖有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過系統(tǒng)的速度。
⑧異步通信:很多時(shí)候,用戶不想也不需要立即處理消息。消息隊(duì)列提供了異步處理機(jī)制,允許用戶把一個(gè)消息放入隊(duì)列,但并不立即處理它。想向隊(duì)列中放入多少消息就放多少,然后在需要的時(shí)候再去處理它們。
Kafka 于其他 MQ 對比如下:
①RabbitMQ:RabbitMQ 是使用 Erlang 編寫的一個(gè)開源的消息隊(duì)列,本身支持很多的協(xié)議:AMQP,XMPP,SMTP,STOMP,也正因如此,它非常重量級,更適合于企業(yè)級的開發(fā)。
同時(shí)實(shí)現(xiàn)了 Broker 構(gòu)架,這意味著消息在發(fā)送給客戶端時(shí)先在中心隊(duì)列排隊(duì)。對路由,負(fù)載均衡或者數(shù)據(jù)持久化都有很好的支持。
②Redis:Redis 是一個(gè)基于 Key-Value 對的 NoSQL 數(shù)據(jù)庫,開發(fā)維護(hù)很活躍。
雖然它是一個(gè) Key-Value 數(shù)據(jù)庫存儲系統(tǒng),但它本身支持 MQ 功能,所以完全可以當(dāng)做一個(gè)輕量級的隊(duì)列服務(wù)來使用。
對于 RabbitMQ 和 Redis 的入隊(duì)和出隊(duì)操作,各執(zhí)行 100 萬次,每 10 萬次記錄一次執(zhí)行時(shí)間。測試數(shù)據(jù)分為 128Bytes、512Bytes、1K 和 10K 四個(gè)不同大小的數(shù)據(jù)。
實(shí)驗(yàn)表明:入隊(duì)時(shí),當(dāng)數(shù)據(jù)比較小時(shí) Redis 的性能要高于 RabbitMQ,而如果數(shù)據(jù)大小超過了 10K,Redis 則慢的無法忍受;出隊(duì)時(shí),無論數(shù)據(jù)大小,Redis 都表現(xiàn)出非常好的性能,而 RabbitMQ 的出隊(duì)性能則遠(yuǎn)低于 Redis。
③ZeroMQ:ZeroMQ 號稱最快的消息隊(duì)列系統(tǒng),尤其針對大吞吐量的需求場景。
ZeroMQ 能夠?qū)崿F(xiàn) RabbitMQ 不擅長的高級/復(fù)雜的隊(duì)列,但是開發(fā)人員需要自己組合多種技術(shù)框架,技術(shù)上的復(fù)雜度是對這 MQ 能夠應(yīng)用成功的挑戰(zhàn)。
ZeroMQ 具有一個(gè)獨(dú)特的非中間件的模式,你不需要安裝和運(yùn)行一個(gè)消息服務(wù)器或中間件,因?yàn)槟愕膽?yīng)用程序?qū)缪葸@個(gè)服務(wù)器角色。
你只需要簡單的引用 ZeroMQ 程序庫,可以使用 NuGet 安裝,然后你就可以愉快的在應(yīng)用程序之間發(fā)送消息了。
但是 ZeroMQ 僅提供非持久性的隊(duì)列,也就是說如果宕機(jī),數(shù)據(jù)將會丟失。其中,Twitter 的 Storm 0.9.0 以前的版本中默認(rèn)使用 ZeroMQ 作為數(shù)據(jù)流的傳輸(Storm 從 0.9 版本開始同時(shí)支持 ZeroMQ 和 Netty 作為傳輸模塊)。
④ActiveMQ:ActiveMQ 是 Apache 下的一個(gè)子項(xiàng)目。類似于 ZeroMQ,它能夠以代理人和點(diǎn)對點(diǎn)的技術(shù)實(shí)現(xiàn)隊(duì)列。同時(shí)類似于 RabbitMQ,它少量代碼就可以高效地實(shí)現(xiàn)高級應(yīng)用場景。
⑤Kafka/Jafka:Kafka 是 Apache 下的一個(gè)子項(xiàng)目,是一個(gè)高性能跨語言分布式發(fā)布/訂閱消息隊(duì)列系統(tǒng),而 Jafka 是在 Kafka 之上孵化而來的,即 Kafka 的一個(gè)升級版。
具有以下特性:
- 快速持久化,可以在 O(1) 的系統(tǒng)開銷下進(jìn)行消息持久化。
- 高吞吐,在一臺普通的服務(wù)器上既可以達(dá)到 10W/s 的吞吐速率。
- 完全的分布式系統(tǒng),Broker、Producer、Consumer 都原生自動支持分布式,自動實(shí)現(xiàn)負(fù)載均衡。
- 支持 Hadoop 數(shù)據(jù)并行加載,對于像 Hadoop 的一樣的日志數(shù)據(jù)和離線分析系統(tǒng),但又要求實(shí)時(shí)處理的限制,這是一個(gè)可行的解決方案。
Kafka 通過 Hadoop 的并行加載機(jī)制統(tǒng)一了在線和離線的消息處理。Apache Kafka 相對于 ActiveMQ 是一個(gè)非常輕量級的消息系統(tǒng),除了性能非常好之外,還是一個(gè)工作良好的分布式系統(tǒng)。
Kafka的幾種重要角色如下:
①Kafka 作為存儲系統(tǒng):任何允許發(fā)布與使用無關(guān)的消息發(fā)布的消息隊(duì)列都有效地充當(dāng)了運(yùn)行中消息的存儲系統(tǒng)。Kafka 的不同之處在于它是一個(gè)非常好的存儲系統(tǒng)。
寫入 Kafka 的數(shù)據(jù)將寫入磁盤并進(jìn)行復(fù)制以實(shí)現(xiàn)容錯(cuò)功能。Kafka 允許生產(chǎn)者等待確認(rèn),以便直到完全復(fù)制并確保即使寫入服務(wù)器失敗的情況下寫入也不會完成。
Kafka 的磁盤結(jié)構(gòu)可以很好地?cái)U(kuò)展使用-無論服務(wù)器上有 50KB 還是 50TB 的持久數(shù)據(jù),Kafka 都將執(zhí)行相同的操作。
由于認(rèn)真對待存儲并允許客戶端控制其讀取位置,因此您可以將 Kafka 視為一種專用于高性能,低延遲提交日志存儲,復(fù)制和傳播的專用分布式文件系統(tǒng)。
②Kafka 作為消息傳遞系統(tǒng):Kafka 的流概念與傳統(tǒng)的企業(yè)消息傳遞系統(tǒng)相比如何?
傳統(tǒng)上,消息傳遞具有兩種模型:排隊(duì)和發(fā)布訂閱。在隊(duì)列中,一組使用者可以從服務(wù)器中讀取內(nèi)容,并且每條記錄都將轉(zhuǎn)到其中一個(gè)。
在發(fā)布-訂閱記錄中廣播給所有消費(fèi)者。這兩個(gè)模型中的每一個(gè)都有優(yōu)點(diǎn)和缺點(diǎn)。
排隊(duì)的優(yōu)勢在于,它允許您將數(shù)據(jù)處理劃分到多個(gè)使用者實(shí)例上,從而擴(kuò)展處理量。
不幸的是,隊(duì)列不是多用戶的—一次進(jìn)程讀取了丟失的數(shù)據(jù)。發(fā)布-訂閱允許您將數(shù)據(jù)廣播到多個(gè)進(jìn)程,但是由于每條消息都傳遞給每個(gè)訂閱者,因此無法擴(kuò)展處理。
Kafka 的消費(fèi)者群體概念概括了這兩個(gè)概念。與隊(duì)列一樣,使用者組允許您將處理劃分為一組進(jìn)程(使用者組的成員)。與發(fā)布訂閱一樣,Kafka 允許您將消息廣播到多個(gè)消費(fèi)者組。
Kafka 模型的優(yōu)點(diǎn)在于,每個(gè)主題都具有這些屬性-可以擴(kuò)展處理范圍,并且是多訂閱者,無需選擇其中一個(gè)。
與傳統(tǒng)的消息傳遞系統(tǒng)相比,Kafka 還具有更強(qiáng)的訂購保證。傳統(tǒng)隊(duì)列將記錄按順序保留在服務(wù)器上,如果多個(gè)使用者從隊(duì)列中消費(fèi),則服務(wù)器將按記錄的存儲順序分發(fā)記錄。
但是,盡管服務(wù)器按順序分發(fā)記錄,但是這些記錄是異步傳遞給使用者的,因此它們可能在不同的使用者上亂序到達(dá)。
這實(shí)際上意味著在并行使用的情況下會丟失記錄的順序。消息傳遞系統(tǒng)通常通過“專有使用者”的概念來解決此問題,該概念僅允許一個(gè)進(jìn)程從隊(duì)列中使用,但是,這當(dāng)然意味著在處理中沒有并行性。
Kafka 做得更好,通過在主題內(nèi)具有并行性(即分區(qū))的概念,Kafka 能夠在用戶進(jìn)程池中提供排序保證和負(fù)載均衡。
這是通過將主題中的分區(qū)分配給消費(fèi)者組中的消費(fèi)者來實(shí)現(xiàn)的,以便每個(gè)分區(qū)都由組中的一個(gè)消費(fèi)者完全消費(fèi)。
通過這樣做,我們確保使用者是該分區(qū)的唯一讀取器,并按順序使用數(shù)據(jù)。由于存在許多分區(qū),因此仍然可以平衡許多使用者實(shí)例上的負(fù)載。但是請注意,使用者組中的使用者實(shí)例不能超過分區(qū)。
③Kafka 用作流處理:僅讀取,寫入和存儲數(shù)據(jù)流是不夠的,目的是實(shí)現(xiàn)對流的實(shí)時(shí)處理。
在 Kafka 中,流處理器是指從輸入主題中獲取連續(xù)數(shù)據(jù)流,對該輸入進(jìn)行一些處理并生成連續(xù)數(shù)據(jù)流以輸出主題的任何東西。
例如,零售應(yīng)用程序可以接受銷售和裝運(yùn)的輸入流,并輸出根據(jù)此數(shù)據(jù)計(jì)算出的重新訂購和價(jià)格調(diào)整流。
可以直接使用生產(chǎn)者和消費(fèi)者 API 進(jìn)行簡單處理。但是,對于更復(fù)雜的轉(zhuǎn)換,Kafka 提供了完全集成的 Streams API。
這允許構(gòu)建執(zhí)行非重要處理的應(yīng)用程序,這些應(yīng)用程序計(jì)算流的聚合或?qū)⒘鬟B接在一起。
該功能有助于解決此類應(yīng)用程序所面臨的難題:處理無序數(shù)據(jù),在代碼更改時(shí)重新處理輸入,執(zhí)行狀態(tài)計(jì)算等。
流 API 建立在 Kafka 提供的核心原語之上:它使用生產(chǎn)者和使用者 API 進(jìn)行輸入,使用 Kafka 進(jìn)行狀態(tài)存儲,并使用相同的組機(jī)制來實(shí)現(xiàn)流處理器實(shí)例之間的容錯(cuò)。
Kafka 中的關(guān)鍵術(shù)語解釋
Topic:主題。在 Kafka 中,使用一個(gè)類別屬性來劃分消息的所屬類,劃分消息的這個(gè)類稱為 Topic。Topic 相當(dāng)于消息的分類標(biāo)簽,是一個(gè)邏輯概念。
物理上不同 Topic 的消息分開存儲,邏輯上一個(gè) Topic 的消息雖然保存于一個(gè)或多個(gè) Broker 上但用戶只需指定消息的 Topic 即可生產(chǎn)或消費(fèi)數(shù)據(jù)而不必關(guān)心數(shù)據(jù)存于何處。
Partition:分區(qū)。Topic 中的消息被分割為一個(gè)或多個(gè) Partition,其是一個(gè)物理概念,對應(yīng)到系統(tǒng)上 就是一個(gè)或若干個(gè)目錄。Partition 內(nèi)部的消息是有序的,但 Partition 間的消息是無序的。
Segment 段。將 Partition 進(jìn)一步細(xì)分為了若干的 Segment,每個(gè) Segment 文件的大小相等。
Broker:Kafka 集群包含一個(gè)或多個(gè)服務(wù)器,每個(gè)服務(wù)器節(jié)點(diǎn)稱為一個(gè) Broker。
Broker 存儲 Topic 的數(shù)據(jù)。如果某 Topic 有 N 個(gè) Partition,集群有 N 個(gè) Broker,那么每個(gè) Broker 存儲該 Topic 的一個(gè) Partition。
如果某 Topic 有 N 個(gè) Partition,集群有(N+M)個(gè) Broker,那么其中有 N 個(gè) Broker 存儲該 Topic 的一個(gè) Partition,剩下的 M 個(gè) Broker 不存儲該 Topic 的 Partition 數(shù)據(jù)。
如果某 Topic 有 N 個(gè) Partition,集群中 Broker 數(shù)目少于 N 個(gè),那么一個(gè) Broker 存儲該 Topic 的一個(gè)或多個(gè) Partition。
在實(shí)際生產(chǎn)環(huán)境中,盡量避免這種情況的發(fā)生,這種情況容易導(dǎo)致 Kafka 集群數(shù)據(jù)不均衡。
Producer:生產(chǎn)者。即消息的發(fā)布者,生產(chǎn)者將數(shù)據(jù)發(fā)布到他們選擇的主題。
生產(chǎn)者負(fù)責(zé)選擇將哪個(gè)記錄分配給主題中的哪個(gè)分區(qū)。即:生產(chǎn)者生產(chǎn)的一條消息,會被寫入到某一個(gè) Partition。
Consumer:消費(fèi)者。可以從 Broker 中讀取消息。一個(gè)消費(fèi)者可以消費(fèi)多個(gè) Topic 的消息;一個(gè)消費(fèi)者可以消費(fèi)同一個(gè) Topic 中的多個(gè) Partition 中的消息;一個(gè) Partiton 允許多個(gè) Consumer 同時(shí)消費(fèi)。
Consumer Group:Consumer Group 是 Kafka 提供的可擴(kuò)展且具有容錯(cuò)性的消費(fèi)者機(jī)制。
組內(nèi)可以有多個(gè)消費(fèi)者,它們共享一個(gè)公共的 ID,即 Group ID。組內(nèi)的所有消費(fèi)者協(xié)調(diào)在一起來消費(fèi)訂閱主題 的所有分區(qū)。
Kafka 保證同一個(gè) Consumer Group 中只有一個(gè) Consumer 會消費(fèi)某條消息。
實(shí)際上,Kafka 保證的是穩(wěn)定狀態(tài)下每一個(gè) Consumer 實(shí)例只會消費(fèi)某一個(gè)或多個(gè)特定的 Partition,而某個(gè) Partition 的數(shù)據(jù)只會被某一個(gè)特定的 Consumer 實(shí)例所消費(fèi)。
下面我們用官網(wǎng)的一張圖, 來標(biāo)識 Consumer 數(shù)量和 Partition 數(shù)量的對應(yīng)關(guān)系。
由兩臺服務(wù)器組成的 Kafka 群集,其中包含四個(gè)帶有兩個(gè)使用者組的分區(qū)(P0-P3)。消費(fèi)者組 A 有兩個(gè)消費(fèi)者實(shí)例,組 B 有四個(gè)。
對于這個(gè)消費(fèi)組, 以前一直搞不明白,我自己的總結(jié)是:Topic 中的 Partitoin 到 Group 是發(fā)布訂閱的通信方式。
即一條 Topic 的 Partition 的消息會被所有的 Group 消費(fèi),屬于一對多模式;Group 到 Consumer 是點(diǎn)對點(diǎn)通信方式,屬于一對一模式。
舉個(gè)例子:不使用 Group 的話,啟動 10 個(gè) Consumer 消費(fèi)一個(gè) Topic,這 10 個(gè) Consumer 都能得到 Topic 的所有數(shù)據(jù),相當(dāng)于這個(gè) Topic 中的任一條消息被消費(fèi) 10 次。
使用 Group 的話,連接時(shí)帶上 groupid,Topic 的消息會分發(fā)到 10 個(gè) Consumer 上,每條消息只被消費(fèi) 1 次。
Replizcas of partition:分區(qū)副本。副本是一個(gè)分區(qū)的備份,是為了防止消息丟失而創(chuàng)建的分區(qū)的備份。
Partition Leader:每個(gè) Partition 有多個(gè)副本,其中有且僅有一個(gè)作為 Leader,Leader 是當(dāng)前負(fù)責(zé)消息讀寫 的 Partition。即所有讀寫操作只能發(fā)生于 Leader 分區(qū)上。
Partition Follower:所有 Follower 都需要從 Leader 同步消息,F(xiàn)ollower 與 Leader 始終保持消息同步。Leader 與 Follower 的關(guān)系是主備關(guān)系,而非主從關(guān)系。
ISR:
- ISR,In-Sync Replicas,是指副本同步列表。ISR 列表是由 Leader 負(fù)責(zé)維護(hù)。
- AR,Assigned Replicas,指某個(gè) Partition 的所有副本, 即已分配的副本列表。
- OSR,Outof-Sync Replicas,即非同步的副本列表。
- AR=ISR+OSR
Offset:偏移量。每條消息都有一個(gè)當(dāng)前 Partition 下唯一的 64 字節(jié)的 Offset,它是相當(dāng)于當(dāng)前分區(qū)第一條消息的偏移量。
Broker Controller:Kafka集群的多個(gè) Broker 中,有一個(gè)會被選舉 Controller,負(fù)責(zé)管理整個(gè)集群中 Partition 和 Replicas 的狀態(tài)。
只有 Broker Controller 會向 Zookeeper 中注冊 Watcher,其他 Broker 及分區(qū)無需注冊。即 Zookeeper 僅需監(jiān)聽 Broker Controller 的狀態(tài)變化即可。
HW 與 LEO:
- HW,HighWatermark,高水位,表示 Consumer 可以消費(fèi)到的最高 Partition 偏移量。HW 保證了 Kafka 集群中消息的一致性。確切地說,是保證了 Partition 的 Follower 與 Leader 間數(shù) 據(jù)的一致性。
- LEO,Log End Offset,日志最后消息的偏移量。消息是被寫入到 Kafka 的日志文件中的, 這是當(dāng)前最后一個(gè)寫入的消息在 Partition 中的偏移量。
- 對于 Leader 新寫入的消息,Consumer 是不能立刻消費(fèi)的。Leader 會等待該消息被所有 ISR 中的 Partition Follower 同步后才會更新 HW,此時(shí)消息才能被 Consumer 消費(fèi)。
我相信你看完上面的概念還是懵逼的,好吧!下面我們就用圖來形象話的表示兩者的關(guān)系吧:
Zookeeper:Zookeeper 負(fù)責(zé)維護(hù)和協(xié)調(diào) Broker,負(fù)責(zé) Broker Controller 的選舉。在 Kafka 0.9 之前版本,Offset 是由 ZK 負(fù)責(zé)管理的。
總結(jié):ZK 負(fù)責(zé) Controller 的選舉,Controller 負(fù)責(zé) Leader 的選舉。
Coordinator:一般指的是運(yùn)行在每個(gè) Broker 上的 Group Coordinator 進(jìn)程,用于管理 Consumer Group 中的各個(gè)成員,主要用于 Offset 位移管理和 Rebalance。一個(gè) Coordinator 可以同時(shí)管理多個(gè)消費(fèi)者組。
Rebalance:當(dāng)消費(fèi)者組中的數(shù)量發(fā)生變化,或者 Topic 中的 Partition 數(shù)量發(fā)生了變化時(shí),Partition 的所有權(quán)會在消費(fèi)者間轉(zhuǎn)移,即 Partition 會重新分配,這個(gè)過程稱為再均衡 Rebalance。
再均衡能夠給消費(fèi)者組及 Broker 帶來高性能、高可用性和伸縮,但在再均衡期間消費(fèi)者是無法讀取消息的,即整個(gè) Broker 集群有小一段時(shí)間是不可用的。因此要避免不必要的再均衡。
Offset Commit:Consumer 從 Broker 中取一批消息寫入 Buffer 進(jìn)行消費(fèi),在規(guī)定的時(shí)間內(nèi)消費(fèi)完消息后,會自動將其消費(fèi)消息的 Offset 提交給 Broker,以記錄下哪些消息是消費(fèi)過的。當(dāng)然,若在時(shí)限內(nèi)沒有消費(fèi)完畢,其是不會提交 Offset 的。
Kafka的工作原理和過程
①消息寫入算法
消息發(fā)送者將消息發(fā)送給 Broker, 并形成最終的可供消費(fèi)者消費(fèi)的 log,是已給比較復(fù)雜的過程:
- Producer 先從 Zookeeper 中找到該 Partition 的 Leader。
- Producer將消息發(fā)送給該 Leader。
- Leader 將消息接入本地的 log,并通知 ISR 的 Followers。
- ISR 中的 Followers 從 Leader 中 Pull 消息, 寫入本地 log 后向 Leader 發(fā)送 Ack。
- Leader 收到所有 ISR 中的 Followers 的 Ack 后,增加 HW 并向 Producer 發(fā)送 Ack,表示消息寫入成功。
②消息路由策略
在通過 API 方式發(fā)布消息時(shí),生產(chǎn)者是以 Record 為消息進(jìn)行發(fā)布的。
Record 中包含 Key 與 Value,Value 才是我們真正的消息本身,而 Key 用于路由消息所要存放的 Partition。
消息要寫入到哪個(gè) Partition 并不是隨機(jī)的,而是有路由策略的:
- 若指定了 Partition,則直接寫入到指定的 Partition。
- 若未指定 Partition 但指定了 Key,則通過對 Key 的 Hash 值與 Partition 數(shù)量取模,該取模。
- 結(jié)果就是要選出的 Partition 索引。
- 若 Partition 和 Key 都未指定,則使用輪詢算法選出一個(gè) Partition。
③HW 截?cái)鄼C(jī)制
如果 Partition Leader 接收到了新的消息, ISR 中其它 Follower 正在同步過程中,還未同步完畢時(shí) leader 宕機(jī)。
此時(shí)就需要選舉出新的 Leader。若沒有 HW 截?cái)鄼C(jī)制,將會導(dǎo)致 Partition 中 Leader 與 Follower 數(shù)據(jù)的不一致。
當(dāng)原 Leader 宕機(jī)后又恢復(fù)時(shí),將其 LEO 回退到其宕機(jī)時(shí)的 HW,然后再與新的 Leader 進(jìn)行數(shù)據(jù)同步,這樣就可以保證老 Leader 與新 Leader 中數(shù)據(jù)一致了,這種機(jī)制稱為 HW 截?cái)鄼C(jī)制。
④消息發(fā)送的可靠性
生產(chǎn)者向 Kafka 發(fā)送消息時(shí),可以選擇需要的可靠性級別。通過 request.required.acks 參數(shù)的值進(jìn)行設(shè)置。
0 值:異步發(fā)送。生產(chǎn)者向 Kafka 發(fā)送消息而不需要 Kafka 反饋成功 Ack。該方式效率最高,但可靠性最低。
其可能會存在消息丟失的情況:
- 在傳輸過程中會出現(xiàn)消息丟失。
- 在 Broker 內(nèi)部會出現(xiàn)消息丟失。
- 會出現(xiàn)寫入到 Kafka 中的消息的順序與生產(chǎn)順序不一致的情況。
1 值:同步發(fā)送。生產(chǎn)者發(fā)送消息給 Kafka,Broker 的 Partition Leader 在收到消息后馬上發(fā)送成功 Ack(無需等等 ISR 中的 Follower 同步)。
生產(chǎn)者收到后知道消息發(fā)送成功,然后會再發(fā)送消息。如果一直未收到 Kafka 的 Ack,則生產(chǎn)者會認(rèn)為消息發(fā)送失敗,會重發(fā)消息。
該方式對于 Producer 來說,若沒有收到 Ack,一定可以確認(rèn)消息發(fā)送失敗了,然后可以重發(fā)。
但是,即使收到了 ACK,也不能保證消息一定就發(fā)送成功了。故,這種情況,也可能會發(fā)生消息丟失的情況。
-1 值:同步發(fā)送。生產(chǎn)者發(fā)送消息給 Kafka,Kafka 收到消息后要等到 ISR 列表中的所有副本都 同步消息完成后,才向生產(chǎn)者發(fā)送成功 Ack。
如果一直未收到 Kafka 的 Ack,則認(rèn)為消息發(fā)送 失敗,會自動重發(fā)消息。該方式會出現(xiàn)消息重復(fù)接收的情況。
⑤消費(fèi)者消費(fèi)過程解析
生產(chǎn)者將消息發(fā)送到 Topitc 中,消費(fèi)者即可對其進(jìn)行消費(fèi),其消費(fèi)過程如下:
- Consumer 向 Broker 提交連接請求,其所連接上的 Broker 都會向其發(fā)送Broker Controller 的通信 URL,即配置文件中的 Listeners 地址。
- 當(dāng) Consumer 指定了要消費(fèi)的 Topic 后,會向 Broker Controller 發(fā)送消費(fèi)請求。
- Broker Controller 會為 Consumer 分配一個(gè)或幾個(gè) Partition Leader,并將該 Partition 的當(dāng)前 Offset 發(fā)送給 Consumer。
- Consumer 會按照 Broker Controller 分配的 Partition 對其中的消息進(jìn)行消費(fèi)。
- 當(dāng) Consumer 消費(fèi)完該條消息后,Consumer 會向 Broker 發(fā)送一個(gè)消息已經(jīng)被消費(fèi)反饋,即該消息的 Offset。
- 在 Broker 接收到 Consumer 的 Offset 后,會更新相應(yīng)的 __consumer_offset 中。
- 以上過程會一直重復(fù),知道消費(fèi)者停止請求消費(fèi)。
- Consumer 可以重置 Offset,從而可以靈活消費(fèi)存儲在 Broker 上的消息。
⑥Partition Leader 選舉范圍
當(dāng) Leader 宕機(jī)后,Broker Controller 會從 ISR 中挑選一個(gè) Follower 成為新的 Leader。
如果 ISR 中沒有其他副本怎么辦?可以通過 unclean.leader.election.enable 的值來設(shè)置 Leader 選舉范圍。
False:必須等到 ISR 列表中所有的副本都活過來才進(jìn)行新的選舉。該策略可靠性有保證,但可用性低。
True:在 ISR 列表中沒有副本的情況下,可以選擇任意一個(gè)沒有宕機(jī)的主機(jī)作為新的 Leader,該策略可用性高,但可靠性沒有保證。
⑦重復(fù)消費(fèi)問題的解決方案
同一個(gè) Consumer 重復(fù)消費(fèi):當(dāng) Consumer 由于消費(fèi)能力低而引發(fā)了消費(fèi)超時(shí),則可能會形成重復(fù)消費(fèi)。
在某數(shù)據(jù)剛好消費(fèi)完畢,但是正準(zhǔn)備提交 Offset 時(shí)候,消費(fèi)時(shí)間超時(shí),則 Broker 認(rèn)為這條消息未消費(fèi)成功。這時(shí)就會產(chǎn)生重復(fù)消費(fèi)問題。其解決方案:延長 Offset 提交時(shí)間。
不同的 Consumer 重復(fù)消費(fèi):當(dāng) Consumer 消費(fèi)了消息,但還沒有提交 Offset 時(shí)宕機(jī),則這些已經(jīng)被消費(fèi)過的消息會被重復(fù)消費(fèi)。其解決方案:將自動提交改為手動提交。
⑧從架構(gòu)設(shè)計(jì)上解決 Kafka 重復(fù)消費(fèi)的問題
我們在設(shè)計(jì)程序的時(shí)候,比如考慮到網(wǎng)絡(luò)故障等一些異常的情況,我們都會設(shè)置消息的重試次數(shù),可能還有其他可能出現(xiàn)消息重復(fù),那我們應(yīng)該如何解決呢?下面提供三個(gè)方案:
方案一:保存并查詢
給每個(gè)消息都設(shè)置一個(gè)獨(dú)一無二的 uuid,所有的消息,我們都要存一個(gè) uuid。
我們在消費(fèi)消息的時(shí)候,首先去持久化系統(tǒng)中查詢一下看這個(gè)看是否以前消費(fèi)過,如沒有消費(fèi)過,在進(jìn)行消費(fèi),如果已經(jīng)消費(fèi)過,丟棄就好了。
下圖表明了這種方案:
方案二:利用冪等
冪等(Idempotence)在數(shù)學(xué)上是這樣定義的,如果一個(gè)函數(shù) f(x) 滿足:f(f(x)) = f(x),則函數(shù) f(x) 滿足冪等性。
這個(gè)概念被拓展到計(jì)算機(jī)領(lǐng)域,被用來描述一個(gè)操作、方法或者服務(wù)。一個(gè)冪等操作的特點(diǎn)是,其任意多次執(zhí)行所產(chǎn)生的影響均與一次執(zhí)行的影響相同。
一個(gè)冪等的方法,使用同樣的參數(shù),對它進(jìn)行多次調(diào)用和一次調(diào)用,對系統(tǒng)產(chǎn)生的影響是一樣的。所以,對于冪等的方法,不用擔(dān)心重復(fù)執(zhí)行會對系統(tǒng)造成任何改變。
我們舉個(gè)例子來說明一下。在不考慮并發(fā)的情況下,“將 X 老師的賬戶余額設(shè)置為 100 萬元”,執(zhí)行一次后對系統(tǒng)的影響是,X 老師的賬戶余額變成了 100 萬元。
只要提供的參數(shù) 100 萬元不變,那即使再執(zhí)行多少次,X 老師的賬戶余額始終都是 100 萬元,不會變化,這個(gè)操作就是一個(gè)冪等的操作。
再舉一個(gè)例子,“將 X 老師的余額加 100 萬元”,這個(gè)操作它就不是冪等的,每執(zhí)行一次,賬戶余額就會增加 100 萬元,執(zhí)行多次和執(zhí)行一次對系統(tǒng)的影響(也就是賬戶的余額)是不一樣的。
所以,通過這兩個(gè)例子,我們可以想到如果系統(tǒng)消費(fèi)消息的業(yè)務(wù)邏輯具備冪等性,那就不用擔(dān)心消息重復(fù)的問題了,因?yàn)橥粭l消息,消費(fèi)一次和消費(fèi)多次對系統(tǒng)的影響是完全一樣的。也就可以認(rèn)為,消費(fèi)多次等于消費(fèi)一次。
那么,如何實(shí)現(xiàn)冪等操作呢?最好的方式就是,從業(yè)務(wù)邏輯設(shè)計(jì)上入手,將消費(fèi)的業(yè)務(wù)邏輯設(shè)計(jì)成具備冪等性的操作。
但是,不是所有的業(yè)務(wù)都能設(shè)計(jì)成天然冪等的,這里就需要一些方法和技巧來實(shí)現(xiàn)冪等。
下面我們介紹一種常用的方法:利用數(shù)據(jù)庫的唯一約束實(shí)現(xiàn)冪等。
例如,我們剛剛提到的那個(gè)不具備冪等特性的轉(zhuǎn)賬的例子:將 X 老師的賬戶余額加 100 萬元。在這個(gè)例子中,我們可以通過改造業(yè)務(wù)邏輯,讓它具備冪等性。
首先,我們可以限定,對于每個(gè)轉(zhuǎn)賬單每個(gè)賬戶只可以執(zhí)行一次變更操作,在分布式系統(tǒng)中,這個(gè)限制實(shí)現(xiàn)的方法非常多,最簡單的是我們在數(shù)據(jù)庫中建一張轉(zhuǎn)賬流水表。
這個(gè)表有三個(gè)字段:轉(zhuǎn)賬單 ID、賬戶 ID 和變更金額,然后給轉(zhuǎn)賬單 ID 和賬戶 ID 這兩個(gè)字段聯(lián)合起來創(chuàng)建一個(gè)唯一約束,這樣對于相同的轉(zhuǎn)賬單 ID 和賬戶 ID,表里至多只能存在一條記錄。
這樣,我們消費(fèi)消息的邏輯可以變?yōu)椋?ldquo;在轉(zhuǎn)賬流水表中增加一條轉(zhuǎn)賬記錄,然后再根據(jù)轉(zhuǎn)賬記錄,異步操作更新用戶余額即可。”
在轉(zhuǎn)賬流水表增加一條轉(zhuǎn)賬記錄這個(gè)操作中,由于我們在這個(gè)表中預(yù)先定義了“賬戶 ID 轉(zhuǎn)賬單 ID”的唯一約束,對于同一個(gè)轉(zhuǎn)賬單同一個(gè)賬戶只能插入一條記錄,后續(xù)重復(fù)的插入操作都會失敗,這樣就實(shí)現(xiàn)了一個(gè)冪等的操作。
方案三:設(shè)置前提條件
為更新的數(shù)據(jù)設(shè)置前置條件另外一種實(shí)現(xiàn)冪等的思路是,給數(shù)據(jù)變更設(shè)置一個(gè)前置條件,如果滿足條件就更新數(shù)據(jù),否則拒絕更新數(shù)據(jù),在更新數(shù)據(jù)的時(shí)候,同時(shí)變更前置條件中需要判斷的數(shù)據(jù)。
這樣,重復(fù)執(zhí)行這個(gè)操作時(shí),由于第一次更新數(shù)據(jù)的時(shí)候已經(jīng)變更了前置條件中需要判斷的數(shù)據(jù),不滿足前置條件,則不會重復(fù)執(zhí)行更新數(shù)據(jù)操作。
比如,剛剛我們說過,“將 X 老師的賬戶的余額增加 100 萬元”這個(gè)操作并不滿足冪等性,我們可以把這個(gè)操作加上一個(gè)前置條件,變?yōu)椋?ldquo;如果 X 老師的賬戶當(dāng)前的余額為 500 萬元,將余額加 100 萬元”,這個(gè)操作就具備了冪等性。
對應(yīng)到消息隊(duì)列中的使用時(shí),可以在發(fā)消息時(shí)在消息體中帶上當(dāng)前的余額,在消費(fèi)的時(shí)候進(jìn)行判斷數(shù)據(jù)庫中,當(dāng)前余額是否與消息中的余額相等,只有相等才執(zhí)行變更操作。
但是,如果我們要更新的數(shù)據(jù)不是數(shù)值,或者我們要做一個(gè)比較復(fù)雜的更新操作怎么辦?用什么作為前置判斷條件呢?
更加通用的方法是,給你的數(shù)據(jù)增加一個(gè)版本號屬性,每次更數(shù)據(jù)前,比較當(dāng)前數(shù)據(jù)的版本號是否和消息中的版本號一致,如果不一致就拒絕更新數(shù)據(jù),更新數(shù)據(jù)的同時(shí)將版本號 +1,一樣可以實(shí)現(xiàn)冪等。
Kafka 集群搭建
我們在工作中,為了保證環(huán)境的高可用,防止單點(diǎn),Kafka 都是以集群的方式出現(xiàn)的,下面就帶領(lǐng)大家一起搭建一套 Kafka 集群環(huán)境。
我們在官網(wǎng)下載 Kafka,下載地址為:http://kafka.apache.org/downloads,下載我們需要的版本,推薦使用穩(wěn)定的版本。
搭建集群
①下載并解壓
- cd /usr/local/src
- wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.0/kafka_2.11-2.4.0.tgz
- mkdir /data/servers
- tar xzvf kafka_2.11-2.4.0.tgz -C /data/servers/
- cd /data/servers/kafka_2.11-2.4.0
②修改配置文件
Kafka 的配置文件 $KAFKA_HOME/config/server.properties,主要修改一下下面幾項(xiàng):
- 確保每個(gè)機(jī)器上的id不一樣
- broker.id=0
- 配置服務(wù)端的監(jiān)控地址
- listeners=PLAINTEXT://192.168.51.128:9092
- kafka 日志目錄
- log.dirs=/data/servers/kafka_2.11-2.4.0/logs
- #kafka設(shè)置的partitons的個(gè)數(shù)
- num.partitions=1
- zookeeper的連接地址, 如果有自己的zookeeper集群, 請直接使用自己搭建的zookeeper集群
- zookeeper.connect=192.168.51.128:2181
因?yàn)槲易约菏潜緳C(jī)做實(shí)驗(yàn),所有使用的是一個(gè)主機(jī)的不同端口,在線上,就是不同的機(jī)器,大家參考即可。
我們這里使用 Kafka 的 Zookeeper,只啟動一個(gè)節(jié)點(diǎn),但是正真的生產(chǎn)過程中,是需要 Zookeeper 集群,自己搭建就好,后期我們也會出 Zookeeper 的教程,大家請關(guān)注就好了。
③拷貝 3 份配置文件
- #創(chuàng)建對應(yīng)的日志目錄
- mkdir -p /data/servers/kafka_2.11-2.4.0/logs/9092
- mkdir -p /data/servers/kafka_2.11-2.4.0/logs/9093
- mkdir -p /data/servers/kafka_2.11-2.4.0/logs/9094
- #拷貝三份配置文件
- cp server.properties server_9092.properties
- cp server.properties server_9093.properties
- cp server.properties server_9094.properties
④修改不同端口對應(yīng)的文件
- #9092的id為0, 9093的id為1, 9094的id為2
- broker.id=0
- # 配置服務(wù)端的監(jiān)控地址, 分別在不通的配置文件中寫入不同的端口
- listeners=PLAINTEXT://192.168.51.128:9092
- # kafka 日志目錄, 目錄也是對應(yīng)不同的端口
- log.dirs=/data/servers/kafka_2.11-2.4.0/logs/9092
- # kafka設(shè)置的partitons的個(gè)數(shù)
- num.partitions=1
- # zookeeper的連接地址, 如果有自己的zookeeper集群, 請直接使用自己搭建的zookeeper集群
- zookeeper.connect=192.168.51.128:2181
修改 Zookeeper 的配置文件:
- dataDir=/data/servers/zookeeper
- server.1=192.168.51.128:2888:3888
然后創(chuàng)建 Zookeeper 的 myid 文件:
- echo "1"> /data/servers/zookeeper/myid
⑤啟動 Zookeeper
使用 Kafka 內(nèi)置的 Zookeeper:
- cd /data/servers/kafka_2.11-2.4.0/bin
- zookeeper-server-start.sh -daemon ../config/zookeeper.properties
- netstat -anp |grep 2181
啟動 Kafka:
- ./kafka-server-start.sh -daemon ../config/server_9092.properties
- ./kafka-server-start.sh -daemon ../config/server_9093.properties
- ./kafka-server-start.sh -daemon ../config/server_9094.properties
Kafka 的操作
①Topic
我們先來看一下創(chuàng)建 Topic 常用的參數(shù)吧:
- --create:創(chuàng)建 topic
- --delete:刪除 topic
- --alter:修改 topic 的名字或者 partition 個(gè)數(shù)
- --list:查看 topic
- --describe:查看 topic 的詳細(xì)信息
- --topic
- --zookeeper
示例:
- cd /data/servers/kafka_2.11-2.4.0/bin
- # 創(chuàng)建topic test1
- kafka-topics.sh --create --bootstrap-server=192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094 --replication-factor 1 --partitions 1 --topic test1
- # 創(chuàng)建topic test2
- kafka-topics.sh --create --bootstrap-server=192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094 --replication-factor 1 --partitions 1 --topic test2
- # 查看topic
- kafka-topics.sh --list --bootstrap-server=192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094
②自動創(chuàng)建 Topic
我們在工作中,如果我們不想去管理 Topic,可以通過 Kafka 的配置文件來管理。
我們可以讓 Kafka 自動創(chuàng)建 Topic,需要在我們的 Kafka 配置文件中加入如下配置文件:
- auto.create.topics.enable=true
如果刪除 Topic 想達(dá)到物理刪除的目的,也是需要配置的:
- delete.topic.enable=true
③發(fā)送消息
他們可以通過客戶端的命令生產(chǎn)消息,先來看看 kafka-console-producer.sh 常用的幾個(gè)參數(shù)吧:
- --topic <String: topic>:指定 topic
- --timeout <Integer: timeout_ms>:超時(shí)時(shí)間
- --sync:異步發(fā)送消息
- --broker-list <String: broker-list>:官網(wǎng)提示: REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2.
這個(gè)參數(shù)是必須的:
- kafka-console-producer.sh --broker-list 192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094 --topic test1
④消費(fèi)消息
我們也還是先來看看 kafka-console-consumer.sh 的參數(shù)吧:
- --topic <String: topic>:指定 topic
- --group <String: consumer group id>:指定消費(fèi)者組
- --from-beginning:指定從開始進(jìn)行消費(fèi), 如果不指定, 就從當(dāng)前進(jìn)行消費(fèi)
- --bootstrap-server:Kafka 的連接地址
- kafka-console-consumer.sh --bootstrap-server 192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094 --topic test1 ---beginning
Kafka 的日志
Kafka 的日志分兩種:
- 第一種日志是我們的 Kafka 的啟動日志,就是我們排查問題,查看報(bào)錯(cuò)信息的日志。
- 第二種日志就是我們的數(shù)據(jù)日志,Kafka 是我們的數(shù)據(jù)是以日志的形式存在存盤中的,我們第二種所說的日志就是我們的 Partiton 與 Segment。
那我們就來說說備份和分區(qū)吧:我們創(chuàng)建一個(gè)分區(qū),一個(gè)備份,那么 test 就應(yīng)該在三臺機(jī)器上或者三個(gè)數(shù)據(jù)目錄只有一個(gè) test-0。(分區(qū)的下標(biāo)是從 0 開始的)
如果我們創(chuàng)建 N 個(gè)分區(qū),我們就會在三個(gè)服務(wù)器上發(fā)現(xiàn),test_0-n,如果我們創(chuàng)建 M 個(gè)備份,我們就會在發(fā)現(xiàn),test_0 到 test_n 每一個(gè)都是 M 個(gè)。
Kafka API
使用 Kafka 原生的 API
①消費(fèi)者自動提交
定義自己的生產(chǎn)者:
- import org.apache.kafka.clients.producer.Callback;
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.clients.producer.RecordMetadata;
- import java.util.Properties;
- /**
- * @ClassName MyKafkaProducer
- * @Description TODO
- * @Author lingxiangxiang
- * @Date 3:37 PM
- * @Version 1.0
- **/
- public class MyKafkaProducer {
- private org.apache.kafka.clients.producer.KafkaProducer<Integer, String> producer;
- public MyKafkaProducer() {
- Properties properties = new Properties();
- properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094");
- properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
- properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- // 設(shè)置批量發(fā)送
- properties.put("batch.size", 16384);
- // 批量發(fā)送的等待時(shí)間50ms, 超過50ms, 不足批量大小也發(fā)送
- properties.put("linger.ms", 50);
- this.producer = new org.apache.kafka.clients.producer.KafkaProducer<Integer, String>(properties);
- }
- public boolean sendMsg() {
- boolean result = true;
- try {
- // 正常發(fā)送, test2是topic, 0代表的是分區(qū), 1代表的是key, hello world是發(fā)送的消息內(nèi)容
- final ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("test2", 0, 1, "hello world");
- producer.send(record);
- // 有回調(diào)函數(shù)的調(diào)用
- producer.send(record, new Callback() {
- @Override
- public void onCompletion(RecordMetadata recordMetadata, Exception e) {
- System.out.println(recordMetadata.topic());
- System.out.println(recordMetadata.partition());
- System.out.println(recordMetadata.offset());
- }
- });
- // 自己定義一個(gè)類
- producer.send(record, new MyCallback(record));
- } catch (Exception e) {
- result = false;
- }
- return result;
- }
- }
定義生產(chǎn)者發(fā)送成功的回調(diào)函數(shù):
- import org.apache.kafka.clients.producer.Callback;
- import org.apache.kafka.clients.producer.RecordMetadata;
- /**
- * @ClassName MyCallback
- * @Description TODO
- * @Author lingxiangxiang
- * @Date 3:51 PM
- * @Version 1.0
- **/
- public class MyCallback implements Callback {
- private Object msg;
- public MyCallback(Object msg) {
- this.msg = msg;
- }
- @Override
- public void onCompletion(RecordMetadata metadata, Exception e) {
- System.out.println("topic = " + metadata.topic());
- System.out.println("partiton = " + metadata.partition());
- System.out.println("offset = " + metadata.offset());
- System.out.println(msg);
- }
- }
生產(chǎn)者測試類:在生產(chǎn)者測試類中,自己遇到一個(gè)坑,就是最后自己沒有加 sleep,就是怎么檢查自己的代碼都沒有問題,但是最后就是沒法發(fā)送成功消息,最后加了一個(gè) sleep 就可以了。
因?yàn)橹骱瘮?shù) main 已經(jīng)執(zhí)行完退出,但是消息并沒有發(fā)送完成,需要進(jìn)行等待一下。當(dāng)然,你在生產(chǎn)環(huán)境中可能不會遇到這樣問題,呵呵!
代碼如下:
- import static java.lang.Thread.sleep;
- /**
- * @ClassName MyKafkaProducerTest
- * @Description TODO
- * @Author lingxiangxiang
- * @Date 3:46 PM
- * @Version 1.0
- **/
- public class MyKafkaProducerTest {
- public static void main(String[] args) throws InterruptedException {
- MyKafkaProducer producer = new MyKafkaProducer();
- boolean result = producer.sendMsg();
- System.out.println("send msg " + result);
- sleep(1000);
- }
- }
消費(fèi)者類:
- import kafka.utils.ShutdownableThread;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import java.util.Arrays;
- import java.util.Collections;
- import java.util.Properties;
- /**
- * @ClassName MyKafkaConsumer
- * @Description TODO
- * @Author lingxiangxiang
- * @Date 4:12 PM
- * @Version 1.0
- **/
- public class MyKafkaConsumer extends ShutdownableThread {
- private KafkaConsumer<Integer, String> consumer;
- public MyKafkaConsumer() {
- super("KafkaConsumerTest", false);
- Properties properties = new Properties();
- properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094");
- properties.put("group.id", "mygroup");
- properties.put("enable.auto.commit", "true");
- properties.put("auto.commit.interval.ms", "1000");
- properties.put("session.timeout.ms", "30000");
- properties.put("heartbeat.interval.ms", "10000");
- properties.put("auto.offset.reset", "earliest");
- properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
- properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- this.consumer = new KafkaConsumer<Integer, String>(properties);
- }
- @Override
- public void doWork() {
- consumer.subscribe(Arrays.asList("test2"));
- ConsumerRecords<Integer, String>records = consumer.poll(1000);
- for (ConsumerRecord record : records) {
- System.out.println("topic = " + record.topic());
- System.out.println("partition = " + record.partition());
- System.out.println("key = " + record.key());
- System.out.println("value = " + record.value());
- }
- }
- }
消費(fèi)者的測試類:
- /**
- * @ClassName MyConsumerTest
- * @Description TODO
- * @Author lingxiangxiang
- * @Date 4:23 PM
- * @Version 1.0
- **/
- public class MyConsumerTest {
- public static void main(String[] args) {
- MyKafkaConsumer consumer = new MyKafkaConsumer();
- consumer.start();
- System.out.println("==================");
- }
- }
②消費(fèi)者同步手動提交
前面的消費(fèi)者都是以自動提交 Offset 的方式對 Broker 中的消息進(jìn)行消費(fèi)的,但自動提交 可能會出現(xiàn)消息重復(fù)消費(fèi)的情況。
所以在生產(chǎn)環(huán)境下,很多時(shí)候需要對 Offset 進(jìn)行手動提交, 以解決重復(fù)消費(fèi)的問題。
手動提交又可以劃分為同步提交、異步提交,同異步聯(lián)合提交。這些提交方式僅僅是 doWork() 方法不相同,其構(gòu)造器是相同的。
所以下面首先在前面消費(fèi)者類的基礎(chǔ)上進(jìn)行構(gòu)造器的修改,然后再分別實(shí)現(xiàn)三種不同的提交方式。
同步提交方式是,消費(fèi)者向 Broker 提交 Offset 后等待 Broker 成功響應(yīng)。若沒有收到響應(yīng),則會重新提交,直到獲取到響應(yīng)。
而在這個(gè)等待過程中,消費(fèi)者是阻塞的。其嚴(yán)重影響了消費(fèi)者的吞吐量。
修改前面的 MyKafkaConsumer.java, 主要修改下面的配置:
- import kafka.utils.ShutdownableThread;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import java.util.Arrays;
- import java.util.Collections;
- import java.util.Properties;
- /**
- * @ClassName MyKafkaConsumer
- * @Description TODO
- * @Author lingxiangxiang
- * @Date 4:12 PM
- * @Version 1.0
- **/
- public class MyKafkaConsumer extends ShutdownableThread {
- private KafkaConsumer<Integer, String> consumer;
- public MyKafkaConsumer() {
- super("KafkaConsumerTest", false);
- Properties properties = new Properties();
- properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094");
- properties.put("group.id", "mygroup");
- // 這里要修改成手動提交
- properties.put("enable.auto.commit", "false");
- // properties.put("auto.commit.interval.ms", "1000");
- properties.put("session.timeout.ms", "30000");
- properties.put("heartbeat.interval.ms", "10000");
- properties.put("auto.offset.reset", "earliest");
- properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
- properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- this.consumer = new KafkaConsumer<Integer, String>(properties);
- }
- @Override
- public void doWork() {
- consumer.subscribe(Arrays.asList("test2"));
- ConsumerRecords<Integer, String>records = consumer.poll(1000);
- for (ConsumerRecord record : records) {
- System.out.println("topic = " + record.topic());
- System.out.println("partition = " + record.partition());
- System.out.println("key = " + record.key());
- System.out.println("value = " + record.value());
- //手動同步提交
- consumer.commitSync();
- }
- }
- }
③消費(fèi)者異步手工提交
手動同步提交方式需要等待 Broker 的成功響應(yīng),效率太低,影響消費(fèi)者的吞吐量。
異步提交方式是,消費(fèi)者向 Broker 提交 Offset 后不用等待成功響應(yīng),所以其增加了消費(fèi)者的吞吐量。
- import kafka.utils.ShutdownableThread;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import java.util.Arrays;
- import java.util.Collections;
- import java.util.Properties;
- /**
- * @ClassName MyKafkaConsumer
- * @Description TODO
- * @Author lingxiangxiang
- * @Date 4:12 PM
- * @Version 1.0
- **/
- public class MyKafkaConsumer extends ShutdownableThread {
- private KafkaConsumer<Integer, String> consumer;
- public MyKafkaConsumer() {
- super("KafkaConsumerTest", false);
- Properties properties = new Properties();
- properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094");
- properties.put("group.id", "mygroup");
- // 這里要修改成手動提交
- properties.put("enable.auto.commit", "false");
- // properties.put("auto.commit.interval.ms", "1000");
- properties.put("session.timeout.ms", "30000");
- properties.put("heartbeat.interval.ms", "10000");
- properties.put("auto.offset.reset", "earliest");
- properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
- properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- this.consumer = new KafkaConsumer<Integer, String>(properties);
- }
- @Override
- public void doWork() {
- consumer.subscribe(Arrays.asList("test2"));
- ConsumerRecords<Integer, String>records = consumer.poll(1000);
- for (ConsumerRecord record : records) {
- System.out.println("topic = " + record.topic());
- System.out.println("partition = " + record.partition());
- System.out.println("key = " + record.key());
- System.out.println("value = " + record.value());
- //手動同步提交
- // consumer.commitSync();
- //手動異步提交
- // consumer.commitAsync();
- // 帶回調(diào)公共的手動異步提交
- consumer.commitAsync((offsets, e) -> {
- if(e != null) {
- System.out.println("提交次數(shù), offsets = " + offsets);
- System.out.println("exception = " + e);
- }
- });
- }
- }
- }
Spring Boot 使用 Kafka
現(xiàn)在大家的開發(fā)過程中,很多都用的是 Spring Boot 的項(xiàng)目,直接啟動了,如果還是用原生的 API,就是有點(diǎn) Low 了啊,那 Kafka 是如何和 Spring Boot 進(jìn)行聯(lián)合的呢?
maven 配置:
- <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>2.1.1</version>
- </dependency>
添加配置文件,在 application.properties 中加入如下配置信息:
Kafka 連接地址:
- spring.kafka.bootstrap-servers = 192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094
生產(chǎn)者:
- spring.kafka.producer.acks = 0
- spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer
- spring.kafka.producer.value-serializer = org.apache.kafka.common.serialization.StringSerializer
- spring.kafka.producer.retries = 3
- spring.kafka.producer.batch-size = 4096
- spring.kafka.producer.buffer-memory = 33554432
- spring.kafka.producer.compression-type = gzip
消費(fèi)者:
- spring.kafka.consumer.group-id = mygroup
- spring.kafka.consumer.auto-commit-interval = 5000
- spring.kafka.consumer.heartbeat-interval = 3000
- spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer
- spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.StringDeserializer
- spring.kafka.consumer.auto-offset-reset = earliest
- spring.kafka.consumer.enable-auto-commit = true
- # listenner, 標(biāo)識消費(fèi)者監(jiān)聽的個(gè)數(shù)
- spring.kafka.listener.concurrency = 8
- # topic的名字
- kafka.topic1 = topic1
生產(chǎn)者:
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.kafka.core.KafkaTemplate;
- @Service
- @Slf4j
- public class MyKafkaProducerServiceImpl implements MyKafkaProducerService {
- @Resource
- private KafkaTemplate<String, String> kafkaTemplate;
- // 讀取配置文件
- @Value("${kafka.topic1}")
- private String topic;
- @Override
- public void sendKafka() {
- kafkaTemplate.send(topic, "hell world");
- }
- }
消費(fèi)者:
- @Component
- @Slf4j
- public class MyKafkaConsumer {
- @KafkaListener(topics = "${kafka.topic1}")
- public void listen(ConsumerRecord<?, ?> record) {
- Optional<?> kafkaMessage = Optional.ofNullable(record.value());
- if (kafkaMessage.isPresent()) {
- log.info("----------------- record =" + record);
- log.info("------------------ message =" + kafkaMessage.get());
- }
【51CTO原創(chuàng)稿件,合作站點(diǎn)轉(zhuǎn)載請注明原文作者和出處為51CTO.com】