詳解Kafka消息發送重試機制的案例
在 Kafka 生產者中實現消息發送的重試機制,可以通過配置 KafkaProducer 的相關屬性來實現。以下是一些關鍵的配置項:
retries:設置生產者發送失敗后重試的次數。
retry.backoff.ms:設置生產者在重試前等待的時間。
buffer.memory:設置生產者在內存中緩存數據的最大值,如果達到這個值,生產者會拒絕接受新的消息,直到當前緩存的消息被發送出去。
batch.size:設置生產者在發送批次中可以包含的最大消息數。
linger.ms:設置生產者在發送批次之前等待更多消息的最大時間。
max.in.flight.requests.per.connection:設置每個連接最多數未完成的請求。
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class KafkaProducerDemo {
public static void main(String[] args) {
// 配置生產者屬性
Properties props = new Properties();
props.put("bootstrap.servers", "4.5.8.4:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
props.put("retries", 5); // 設置重試次數
props.put("retry.backoff.ms", 100); // 設置重試間隔
props.put("buffer.memory", 33554432); // 設置緩沖區大小
props.put("batch.size", 16384); // 設置批次大小
props.put("linger.ms", 1); // 設置等待時間
props.put("max.in.flight.requests.per.connection", 5); // 設置最大在途請求數
// 創建生產者實例
Producer<String, String> producer = new KafkaProducer<>(props);
// 發送消息
for (int i = 0; i < 1000000; i++) {
String key = "案例1=====" + i;
System.out.println("key:"+key);
String value = "Spring AI Alibaba 實現了與阿里云通義模型的完整適配,接下來,我們將學習如何使用 spring ai alibaba 開發一個基于通義模型服務的智能聊天應用" + i;
ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", key, value);
producer.send(record, (metadata, exception) -> {
if (exception != null) {
// 處理消息發送失敗的情況
System.err.println("發送消息失敗:" + exception.getMessage());
} else {
// 處理消息發送成功的情況
System.out.println("消息發送成功,偏移量:" + metadata.offset());
}
});
}
// 關閉生產者
producer.close();
}
}
在這個示例中,我們設置了重試次數、重試間隔、緩沖區大小、批次大小、等待時間和最大在途請求數。此外,我們還為 send 方法提供了一個回調函數,用于處理消息發送成功或失敗的情況。這樣,當消息發送失敗時,生產者會自動重試,直到達到配置的重試次數。如果所有重試都失敗,回調函數會收到異常通知,你可以在回調中實現進一步的錯誤處理邏輯。
?? 如何配置Kafka生產者的重試策略?
其實上面也有說,再次總結下
要配置 Kafka 生產者的重試策略,你可以按照以下步驟進行:
- 設置重試次數:
- 通過設置 retries 屬性來指定生產者在遇到錯誤時重試發送消息的次數。例如,設置 retries 為 3 表示生產者會嘗試最多 3 次發送消息。
- 設置重試間隔:
- 使用 retry.backoff.ms 屬性來配置重試之間的時間間隔。這個設置可以防止生產者在連續的短時間內發送大量重試請求,給 Kafka 集群或網絡造成壓力。
- 確保消息冪等性:
- 設置 enable.idempotence 為 true 以確保生產者發送消息的邏輯是冪等的,即使消息被重復發送也不會影響系統狀態。
- 配置確認策略:
- 通過 acks 屬性來確保消息被所有副本確認。例如,設置 acks 為 “all” 可以確保消息被所有副本確認后才認為是成功發送。
- 異步發送與回調:
- 使用異步發送消息,并在回調中處理發送失敗的情況。在回調中對異常進行分類處理,對于可恢復的錯誤進行重試,對于不可恢復的錯誤進行日志記錄或報警。
- 錯誤處理與日志記錄:
- 在回調函數中捕獲并處理異常,同時記錄詳細的錯誤日志,便于問題排查和監控。
- 監控與告警:
- 對生產者的關鍵性能指標進行監控,如發送延遲、吞吐量等。當指標出現異常時,及時觸發告警通知相關人員處理。
- 合理配置重試機制:
- 根據業務需求合理配置重試次數和重試間隔,以減少因網絡波動或 Kafka 集群短暫不可用導致的消息丟失風險。
- 設置最大在途請求:
- 通過 max.in.flight.requests.per.connection 屬性限制每個連接最多數未完成的請求,這有助于控制內存使用和重試的并發量。
- 配置超時時間:
- Kafka 2.4 版本引入了 delivery.timeout.ms 參數,它設置了發送記錄和接收確認之間的超時時間。這個參數與 retries 結合使用,可以提供更靈活的重試控制。
通過上述配置,你可以為 Kafka 生產者設置一個健壯的重試策略,以確保在面對網絡問題或 Kafka 集群短暫不可用時,消息能夠被可靠地發送。