被錘了:Acks=all消息也會丟失?
消息隊列是面試中一定會被問到的技術模塊,雖然它在面試題占比不及并發編程和數據庫,但也屬于面試中的關鍵性問題。所以今天我們就來看一道,MQ 中高頻,但可能會打破你以往認知的一道面試題。
所謂的關鍵問題指的是這道面試題會影響你整體面試結果。
我們在面試消息隊列(Message Queue,MQ)時,尤其是面試 Kafka 時,經常會被問到:如何保證消息不丟失?
那么,我們的回答會分為以下 3 部分:
- 保證生產者消息不丟失。
- 保證 Kafka 服務(器端)消息不丟失。
- 保證消費者消息不丟失。
只有保證這 3 部分消息都不丟失,才能保證 Kafka 整體消息不丟失。
因為 Kafka 消息的傳遞流程如下(總共包含 3 部分):
1.如何保證生產者消息不丟失?
那怎么保證生產者消息不丟失呢?
要搞明白這個事,我們就要先了解一下生產者發送消息的執行流程。
Kafka 生產者發送消息的執行流程如下:
默認情況下,所有的消息會先緩存到 RecordAccumulator 緩存中,再由 Sender 線程拉取消息發送到 Kafka 服務器端,通過 RecordAccumulator 和 Sender 線程的協作,實現了消息的批量發送、性能優化和異常處理等功能,確保了消息的高效可靠傳輸。
(1)RecordAccumulator 緩存作用
- 暫存消息:RecordAccumulator 是 Kafk a生產者中的一個關鍵組件,它充當了一個緩存的角色,用于暫存主線程(Main Thread)發送過來的消息。這些消息在 RecordAccumulato r中等待被 Sender 線程批量發送。
- 批量發送:RecordAccumulator 通過批量收集消息,減少了單個消息發送的網絡請求次數,從而提高了發送效率。Sender 線程可以從 RecordAccumulator 中批量獲取消息,一次性發送到 Kafka 集群,減少了網絡傳輸的資源消耗。
- 性能優化:RecordAccumulator的緩存大小可以通過生產者客戶端參數 buffer.memory 進行配置(默認值為 32MB)。合理的緩存大小設置可以平衡內存使用與發送效率,達到最優的性能表現。
- 內存管理:如果 RecordAccumulator 的緩存空間被占滿,生產者再次調用 send() 方法發送消息時,會出現阻塞(默認阻塞時間為 60 秒,可通過 max.block.ms 參數配置)。如果阻塞超時,則會拋出異常。這種機制有助于防止生產者因為無限制地緩存消息而耗盡系統資源。
- ByteBuffer 復用:為了減少頻繁創建和釋放 ByteBuffer 所造成的資源消耗,RecordAccumulator 內部還維護了一個 BufferPool,用于實現 ByteBuffer 的復用。特定大小的 ByteBuffer 會被緩存起來,以便后續消息發送時重復使用。
(2)Sender 線程作用
- 拉取消息:Sender 線程是 Kafka 生產者中的一個后臺線程,它負責從 RecordAccumulator 中拉取緩存的消息。Sender 線程會定期輪詢 RecordAccumulator,檢查是否有新消息需要發送。
- 批量構建請求:當 Sender 線程發現有新消息需要發送時,它會構建一個或多個 ProducerRequest 請求。每個請求包含多個消息,以便進行有效的批量發送。這種批量發送機制可以顯著提高網絡傳輸效率。
- 發送消息到 Kafka 集群:Sender 線程將構建的 ProducerRequest 請求發送到 Kafka 集群的相應分區。它會根據分區的 Leader 節點信息,將消息發送給對應的 Broker 節點。
- 異常處理:在消息發送過程中,可能會出現網絡故障、分區不可用等異常情況。Sender 線程負責處理這些異常,例如進行重試、重新連接等操作,以確保消息的可靠發送。
- 狀態更新:一旦消息被成功接收并記錄在 Kafka Broker 的日志中,Sender 線程會通知 RecordAccumulator 更新消息的狀態。這樣,生產者就能夠知道哪些消息已經被成功發送,哪些消息還需要重試發送。
2.生產者消息丟失的兩種場景
了解了 Kafka 生產者發送消息的流程之后,我們就能知道在這個環節丟失消息的情況有以下兩種:
- 網絡抖動(消息不可達):生產者與 Kafka 服務端之間的鏈路不可達,發送超時。此時各個節點的狀態是正常,但消費端就是沒有消費消息,就像消息丟失了一樣。
- 無消息確認(ack):生產者消息發送之后,無 ack 消息確認,直接返回消息發送成功,但消息發送之后,Kafka 服務宕機或掉電了,導致消息丟失。
怎么解決這個問題呢?
(1)網絡波動問題處理
網絡波動的話設置消息重試即可,因為網絡抖動消息不可達,所以只要配置了重試次數,那么就會消息重試以此來保證消息不丟失。
在 Spring Boot 項目中,只需要在配置文件 application.yml 中,設置生產者的重試次數即可:
spring:
kafka:
producer:
retries: 3
(2)消息確認設置
Kafka 生產者的 ACK(Acknowledgment)機制是指生產者在發送消息到 Kafka 集群后,等待確認的方式。這個機制決定了生產者何時認為消息已經成功發送,并直接影響到消息的可靠性和性能。
Kafka 生產者的 ACK 機制主要有以下三種類型。
① acks=0
生產者在將消息發送到網絡緩沖區后,立即認為消息已被提交,不會等待任何來自服務器的響應。這時設置的重試次數 retries 無效。
特點:
- 最高性能:由于不需要等待任何確認,因此具有最高的吞吐量。
- 最低可靠性:消息可能會在發送過程中丟失,生產者無法知道消息是否成功到達服務器。
適用場景:對消息可靠性要求不高,但追求極致性能的場景。
② acks=1
生產者在將消息發送到主題的分區 leader 后,等待 leader 的確認,即認為消息已被提交(此時 leader 寫入成功,并沒有刷新到磁盤),不用等待所有副本的確認。
特點:
- 中等可靠性和性能:提供了一定程度的可靠性,因為只有領導者副本確認消息后生產者才會收到確認。但如果領導者副本在確認后發生故障,而消息還未復制到其他副本,則消息可能會丟失。
- 性能與可靠性平衡:在生產者性能和消息可靠性之間提供了一個折衷方案。
適用場景:適用于傳輸普通日志,允許偶爾丟失少量數據的場景。
③ acks=all 或 acks=-1
生產者需要等待所有同步副本(ISR, In-Sync Replicas)都成功寫入消息后,才認為消息已被提交。
特點:
- 最高可靠性:只有當所有同步副本都確認接收到消息后,生產者才會收到確認,確保了消息的可靠性。
- 較低性能:由于需要等待所有同步副本的確認,因此可能會導致消息發送的延遲增加,從而影響性能。
適用場景:適用于對消息可靠性要求極高的場景,如金融交易等關鍵任務應用。
在 Spring Boot 項目中,acks 可以在配置文件 application.yml 中設置:
spring:
kafka:
producer:
acks: all
3.acks=all消息一定不會丟失嗎?
正常情況下當我們設置 acks=all 時,其實是可以保證數據不丟失了。但是有一種特殊情況,如果 Topic 只有一個 Partition(分區時),也就是只有一個 Leader 節點時,此時消息也是會丟失的。
如果只有一個 Leader 節點,acks=all 的設置和 acks=1 的設置效果基本類似,當 Leader 確認消息之后,還沒來得及將消息刷到磁盤之前宕機了,那么就會造成消息丟失。
萬事必有妖,當面試官用疑問語句問你時,答案基本是否定的。如果是確定的話,面試官可能也就不會再問你了,所以當你聽到一個有悖于常識的問題時,先努力思考這個問題還有沒有其他答案。