「Kafka技術」Apache Kafka中的事務
在之前的一篇博客文章中,我們介紹了Apache Kafka?的一次語義。這篇文章介紹了各種消息傳遞語義,介紹了冪等生成器、事務和Kafka流的一次處理語義。現在,我們將繼續上一節的內容,深入探討Apache Kafka中的事務。該文檔的目標是讓讀者熟悉有效使用Apache Kafka中的事務API所需的主要概念。
我們將討論設計事務API的主要用例、Kafka的事務語義、用于Java客戶端的事務API的細節、實現的有趣方面,以及在使用API時的重要注意事項。
這篇博客文章并不是關于使用事務細節的教程,我們也不會深入討論設計細節。相反,我們將在適當的地方鏈接到JavaDocs或設計文檔,以供希望深入研究的讀者使用。
我們希望讀者熟悉基本的Kafka概念,比如主題、分區、日志偏移量,以及代理和客戶在基于Kafka的應用程序中的角色。熟悉Java的Kafka客戶機也會有所幫助。
為什么交易?
我們在Kafka中設計的事務主要用于那些顯示“讀-進程-寫”模式的應用程序,其中的讀和寫來自于異步數據流,比如Kafka主題。這種應用程序通常稱為流處理應用程序。
第一代流處理應用程序可以容忍不準確的處理。例如,使用web頁面印象流并生成每個web頁面的視圖聚合計數的應用程序可以容忍計數中的一些錯誤。
然而,隨著這些應用程序的流行,對具有更強語義的流處理應用程序的需求也在增長。例如,一些金融機構使用流處理應用程序來處理用戶帳戶上的借方和貸方。在這些情況下,不能容忍處理過程中的錯誤:我們需要準確地一次處理所有消息,沒有例外。
更正式地說,如果流處理應用程序使用消息a并生成消息B,使得B = F(a),那么僅一次處理就意味著如果且僅當成功生成B時才使用a,反之亦然。
使用配置為至少一次傳遞語義的普通Kafka生產者和消費者,流處理應用程序可能會在以下方面失去一次處理語義:
- 由于內部重試,生產者.send()可能導致消息B的重復寫入。這是由冪等生產者解決的,并不是本文其余部分的重點。
- 我們可能會重新處理輸入消息A,導致將重復的B消息寫入輸出,這違反了一次處理語義。如果流處理應用程序在寫入B之后但在將A標記為已使用之前崩潰,則可能發生重新處理。因此,當它恢復時,它將再次消耗A并再次寫入B,從而導致重復。
- 最后,在分布式環境中,應用程序會崩潰,甚至更糟!-暫時失去與系統其余部分的連接。通常,會自動啟動新實例來替換那些被認為丟失的實例。通過這個過程,我們可能會有多個實例處理相同的輸入主題,并寫入相同的輸出主題,從而導致輸出重復,并違反一次處理語義。我們稱之為“僵尸實例”問題。
我們在Kafka中設計了事務api來解決第二個和第三個問題。事務通過使這些周期成為原子性的,并通過促進僵死的隔離,從而在讀寫周期中實現精確的一次處理。
事務性語義
原子多分區寫道
事務允許對多個Kafka主題和分區進行原子寫入。事務中包含的所有消息都將被成功寫入,或者一個也不寫入。例如,處理過程中的錯誤可能導致事務中止,在這種情況下,來自事務的任何消息都不會被使用者讀取。現在我們來看看它是如何實現原子讀寫周期的。
首先,讓我們考慮原子讀寫周期的含義。簡而言之,這意味著如果一個應用程序使用一個消息的抵消X topic-partition tp0,和寫消息B topic-partition tp1在消息上做一些處理,B = F (a),然后read-process-write周期是a和B原子只有在消息被認為成功地消耗和發表在一起,要么一無所有。
現在,只有當消息A的偏移量X標記為已使用時,才會認為它是從主題分區tp0使用的。將偏移量標記為已使用的偏移量稱為提交偏移量。在Kafka中,我們通過寫入內部Kafka主題offsets主題來記錄偏移量提交。僅當消息的偏移量提交到偏移量主題時,才認為該消息已被消耗。
因此從一個偏移量提交只是另一個寫一個卡夫卡的話題,因為消息被認為是只有當其抵消消費承諾,原子還寫跨多個主題和分區使原子read-process-write周期:提交的抵消X的補償主題寫的消息B tp1將單個事務的一部分,因此原子。
僵尸(Zombie fencing)
我們通過要求為每個事務生產者分配一個稱為transaction .id的惟一標識符來解決zombie實例的問題。這用于跨流程重新啟動標識相同的生產者實例。
API要求事務生產者的第一個操作應該是顯式注冊其事務。使用Kafka集群的id。當它這樣做時,Kafka代理使用給定的事務檢查打開的事務。id并完成它們。它還增加與transaction .id關聯的epoch。epoch是存儲在每個transaction .id中的內部元數據。
一旦epoch被碰撞,任何具有相同事務的生產者。身份證和舊時代被認為是僵尸,被隔離。來自這些生產者的未來事務寫將被拒絕。
讀事務消息
現在,讓我們將注意力轉向在讀取作為事務的一部分寫入的消息時提供的保證。
Kafka使用者只會在事務被提交時才會向應用程序提交事務消息。換句話說,使用者不會交付作為開放事務一部分的事務性消息,也不會交付作為中止事務一部分的消息。
值得注意的是,上面的保證沒有達到原子讀取。特別是,當使用Kafka使用者來消費來自主題的消息時,應用程序將不知道這些消息是否作為事務的一部分寫入,因此它們不知道事務何時開始或結束。進一步說,一個給定的消費者不保證訂閱所有分區事務的一部分,它沒有發現這個方法,這就很難保證所有的信息是一個事務的一部分最終會被一個消費者。
簡而言之:Kafka保證使用者最終只交付非事務性消息或提交的事務性消息。它將從打開的事務中保留消息,并從中止的事務中過濾出消息。
Java中的事務API
事務特性主要是一個服務器端和協議級特性,任何支持它的客戶端庫都可以使用它。用Java編寫的“讀-處理-寫”應用程序,使用Kafka的事務API,看起來應該是這樣的:
第1-5行通過指定事務設置生產者。配置id并將其注冊到initTransactions API。inittransactions()返回后,由具有相同事務的生產者的另一個實例啟動的任何事務。id會被關閉和隔離。
第7-10行指定KafkaConsumer應該只讀取非事務性消息,或者從它的輸入主題中提交事務性消息。流處理應用程序通常在多個讀寫階段處理其數據,每個階段使用前一階段的輸出作為其輸入。通過指定read_committed模式,我們可以在所有階段只執行一次處理。
第14-21行演示了讀寫循環的核心:我們使用一些記錄,啟動一個事務,處理使用的記錄,將處理過的記錄寫入輸出主題,將使用的偏移量發送到偏移量主題,最后提交事務。根據上面提到的保證,我們知道偏移量和輸出記錄將作為一個原子單元提交。
事務是如何工作的
在本節中,我們將簡要概述上述事務api引入的新組件和新數據流。為了更詳盡地討論這個主題,您可以閱讀原始設計文檔,或者觀看介紹事務的Kafka峰會演講。
下面內容的目標是在調試使用事務的應用程序時,或者在嘗試調優事務以獲得更好的性能時,提供一個心智模型。
事務協調器和事務日志
Kafka 0.11.0中的transactions API引入的組件是事務協調器和上圖右側的事務日志。
事務協調器是在每個Kafka代理中運行的模塊。事務日志是一個內部kafka主題。每個協調器在事務日志中擁有一些分區子集。其代理為其領導的分區。
每一個事務。id通過一個簡單的哈希函數映射到事務日志的特定分區。這意味著只有一個協調器擁有給定的transaction .id。
通過這種方式,我們利用Kafka的rock solid復制協議和leader選擇過程來確保事務協調器總是可用的,并且所有事務狀態都被持久地存儲。
值得注意的是,事務日志只存儲事務的最新狀態,而不是事務中的實際消息。消息僅存儲在實際的主題分區中。事務可以處于“進行中”、“準備提交”和“完成”等不同狀態。存儲在事務日志中的就是這種狀態和相關的元數據。
數據流
在較高的層次上,數據流可以分為四種不同的類型。
A:生產者和事務協調者的交互
執行事務時,生產者向事務協調器發出以下請求:
initTransactions API注冊一個事務。id與協調器。此時,協調器將使用該事務關閉任何掛起的事務。id和碰撞的時代,以柵欄出僵尸。每個生產者會話只發生一次。
當生產者在事務中第一次將數據發送到一個分區時,該分區首先向協調器注冊。
當應用程序調用commitTransaction或abortTransaction時,將向協調器發送一個請求,以開始兩階段提交協議。
B:協調器和事務日志的交互
隨著事務的進展,生產者發送上述請求來更新協調器上事務的狀態。事務協調器將其擁有的每個事務的狀態保存在內存中,并將該狀態寫入事務日志(以三種方式復制,因此是持久的)。
事務協調器是從事務日志中讀寫的惟一組件。如果給定的代理失敗,則將選出一個新的協調器作為死代理擁有的事務日志分區的leader,它將從傳入分區讀取消息,以便為這些分區中的事務重建其內存狀態。
C:生產者寫數據到目標主題分區
在向協調器注冊了事務中的新分區之后,生產者將數據正常地發送到實際的分區。這是同一個生產者。發送流,但是要進行一些額外的驗證以確保生產者不受保護。
D:主題分區交互的協調器
在生產者發起提交(或中止)之后,協調器開始兩階段提交協議。
在第一階段,協調器將其內部狀態更新為“prepare_commit”,并在事務日志中更新此狀態。一旦完成了這一步,就可以保證在任何情況下提交事務。
然后協調器開始第2階段,將事務提交標記寫入作為事務一部分的主題分區。
這些事務標記不公開給應用程序,而是由處于read_committed模式的使用者使用,以過濾掉中止的事務中的消息,并且不返回作為打開事務一部分的消息(即,在日志中但沒有與之關聯的事務標記的。
一旦寫入了標記,事務協調器將事務標記為“完成”,并且生產者可以啟動下一個事務。
實踐中處理交易
既然我們已經理解了事務的語義以及它們是如何工作的,那么我們就將注意力轉向編寫利用事務的應用程序的實踐方面。
如何選擇一個transaction .id
事務。id在保護僵尸方面起著重要作用。但是保持一個標識符在不同的生產者會話之間是一致的,并且適當地隔離僵尸是有點棘手的。
正確隔離“僵尸”的關鍵是確保對于給定的transaction .id,讀寫周期中的輸入主題和分區總是相同的。如果這不是真的,那么一些消息可能會通過事務提供的圍欄泄漏。
例如,在一個分布式流處理應用程序中,假設主題分區tp0最初是由transactional處理的。T0 id。如果在以后的某個時候,它可以映射到另一個具有transactional的生產者。id T1,在T0和T1之間沒有柵欄。因此,可以對來自tp0的消息進行重新處理,這違反了一次處理的保證。
實際上,必須存儲輸入分區和事務之間的映射。外部存儲中的id,或者對其進行一些靜態編碼。Kafka Streams選擇后一種方法來解決這個問題。
事務如何執行,以及如何調優它們
事務生產者的性能
讓我們將注意力轉向事務如何執行。
首先,事務只導致適度的寫放大。增加的寫是由于:
- 對于每個事務,我們都有額外的rpc向協調器注冊分區。這些是成批的,因此我們的rpc比事務中的分區要少。
- 在完成事務時,必須將一個事務標記寫入參與事務的每個分區。同樣,事務協調器在單個RPC中批量處理為同一代理綁定的所有標記,因此我們在那里保存RPC開銷。但是我們不能避免對事務中的每個分區進行一次額外的寫操作。
- 最后,我們將狀態更改寫入事務日志。這包括對添加到事務中的每批分區的寫操作、“prepare_commit”狀態和“complete_commit”狀態。
我們可以看到,開銷與作為事務一部分寫入的消息的數量無關。因此,提高吞吐量的關鍵是在每個事務中包含更多的消息。
實際上,對于在最大吞吐量下生成1KB記錄的生產者,每100ms提交一條消息只會導致吞吐量降低3%。較小的消息或較短的事務提交間隔將導致更嚴重的降級。
增加事務持續時間的主要代價是增加了端到端延遲。請記住,讀取事務性消息的使用者不會交付作為開放事務一部分的消息。因此,提交間隔的時間越長,應用程序的等待時間就越長,從而增加了端到端延遲。
事務消費者的性能
事務性消費者比生產者簡單得多,因為它所需要做的就是:
- 篩選屬于中止的事務的消息。
- 不返回作為開放事務一部分的事務消息。
因此,當以read_committed模式讀取事務消息時,事務使用者的吞吐量沒有下降。這樣做的主要原因是,我們在讀取事務性消息時保持零副本讀取。
而且,使用者不需要任何緩沖來等待事務完成。相反,代理不允許它提前進行補償,其中包括打開的事務。
因此,消費者是極其輕量級和高效的。有興趣的讀者可以在本文檔中了解消費者設計的細節。
進一步的閱讀
我們剛剛觸及了Apache Kafka中事務的皮毛。幸運的是,幾乎所有的設計細節都記錄在網上。有關文件如下:
最初的Kafka KIP:它提供了關于數據流的詳細信息和公共接口的概述,特別是隨事務而來的配置選項。
原始設計文檔:不適合膽小的人,這是權威的地方——源代碼外!-了解如何處理每個事務RPC,如何維護事務日志,如何清除事務數據,等等。
KafkaProducer javadocs:這是一個學習如何使用新api的好地方。頁面開頭的示例以及send方法的文檔都是很好的起點。
結論
在這篇文章中,我們了解了Apache Kafka中事務API的關鍵設計目標,理解了事務API的語義,并對API的實際工作方式有了更深入的了解。
如果我們考慮一個讀-進程-寫循環,這篇文章主要討論了讀和寫路徑,處理本身就是一個黑盒。事實上,在處理階段可以做很多事情,這使得僅使用事務api無法保證一次處理。例如,如果處理對其他存儲系統有副作用,這里介紹的api不足以保證只進行一次處理。
Kafka Streams框架使用這里描述的事務api向上移動價值鏈,并為各種流處理應用程序提供一次處理,甚至包括那些在處理期間更新某些額外狀態存儲的應用程序。
將來的一篇博客文章將討論Kafka流如何提供一次處理語義,以及如何編寫利用它的應用程序。
最后,對于那些渴望了解上述api實現細節的人,我們將在另一篇后續博客文章中介紹一些更有趣的解決方案。