成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

原來Kafka也有事務啊,再也不擔心消息不一致了

開發 架構
本文講解了通過kafka事務可以實現端到端的精確一次的消息語義,通過事務機制,KAFKA 實現了對多個 topic? 的多個 partition 的原子性的寫入,通過一個例子了解了一下如何使用事物。

前言

現在假定這么一個業務場景,從kafka中的topic獲取消息數據,經過一定加工處理后,發送到另外一個topic中,要求整個過程消息不能丟失,也不能重復發送,即實現端到端的Exactly-Once精確一次消息投遞。這該如何實現呢?

圖片

Kafka事務介紹

針對上面的業務場景,kafka已經替我們想到了,在kafka 0.11版本以后,引入了一個重大的特性:冪等性和事務。

冪等性

這里提到冪等性的原因,主要是因為事務的啟用必須要先開啟冪等性,那么什么是冪等性呢?

冪等性是指生產者無論向kafka broker發送多少次重復的數據,broker 端只會持久化一條,保證數據不會重復。

冪等性通過生產者配置項enable.idempotence=true開啟,默認情況下為true。

冪等性實現原理

圖片

  1. 每條消息都有一個主鍵,這個主鍵由 <PID, Partition, SeqNumber>組成。
  • PID:ProducerID,每個生產者啟動時,Kafka 都會給它分配一個 ID,ProducerID 是生產者的唯一標識,需要注意的是,Kafka 重啟也會重新分配 PID。
  • Partition:消息需要發往的分區號。
  • SeqNumber:生產者,他會記錄自己所發送的消息,給他們分配一個自增的 ID,這個 ID 就是 SeqNumber,是該消息的唯一標識,每發送一條消息,序列號加 1。
  1. 對于主鍵相同的數據,kafka 是不會重復持久化的,它只會接收一條。

冪等性缺點

根據冪等性的原理,我們發現它存在下面的缺點:

  • 只能保證單分區、單會話內的數據不重復
  • kafka 掛掉,重新給生產者分配了 PID,還是有可能產生重復的數據

那么如何實現跨分區、kafka broker重啟也能保證不重復呢?這就要使用事務了。

事務

所謂事務,就是要求保證原子性,要么全部成功,要么全部失敗。那么具體該如何開啟呢?

  1. kafka要想開啟事務必須要啟用冪等性,即生產者配置enable.idempotence=true
  2. kafka生產者需要配置唯一的事務idtransactional.id, 最好為其設置一個有意義的名字。
  3. kafka消費端也有一個配置項isolation.level和事務有很大關系。
  • read_uncommitted:默認值,消費端應用可以看到(消費到)未提交的事務,當然對于已提交的事務也是可見的。
  • read_committed:消費端應用只能消費到提交的事務內的消息。

Kafka事務 API

現在我們用java的api來實現一下前面這個“消費-處理-生產“的例子吧。

  1. 引入依賴
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.4.0</version>
</dependency>
  1. 創建事務的生產者
Properties prodcuerProps = new Properties();
// kafka地址
prodcuerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
// key序列化
prodcuerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// value序列化
prodcuerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 啟用冪等性
producerProps.put("enable.idempotence", "true");
// 設置事務id
producerProps.put("transactional.id", "prod-1");
KafkaProducer<String, String> producer = new KafkaProducer(prodcuerProps);
  • enable.idempotence配置項目為true
  • 設置transactional.id
  1. 創建事務的消費者
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); 
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put("group.id", "my-group-id");
// 設置consumer手動提交
consumerProps.put("enable.auto.commit", "false");
// 設置隔離級別,讀取事務已提交的消息
consumerProps.put("isolation.level", "read_committed");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
//訂閱主題
consumer.subscribe(Collections.singletonList("topic1"));
  • enable.auto.commit=false,設置手動提交消費者offset
  • 設置isolation.level=read_committed,消費事務已提交的消息

4.核心邏輯

// 初始化事務 
producer.initTransactions();
while(true) {
 // 拉取消息 
 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
    if(!records.isEmpty()){
        // 準備一個 hashmap 來記錄:"分區-消費位移" 鍵值對
        HashMap<TopicPartition, OffsetAndMetadata> offsetsMap = new HashMap<>();
        // 開啟事務 
        producer.beginTransaction();
        try {
            // 獲取本批消息中所有的分區
            Set<TopicPartition> partitions = records.partitions();
            // 遍歷每個分區
            for (TopicPartition partition : partitions) {
                // 獲取該分區的消息
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                // 遍歷每條消息
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    // 執行數據的業務處理邏輯
                    ProducerRecord<String, String> outRecord = new ProducerRecord<>("topic2", record.key(), record.value().toUpperCase());
                    // 將處理結果寫入 kafka
                    producer.send(outRecord);
                }

                // 將處理完的本分區對應的消費位移記錄到 hashmap 中
                long offset = partitionRecords.get(partitionRecords.size() - 1).offset();
                // 事務提交的是即將到來的偏移量,這意味著我們需要加 1
                offsetsMap.put(partition,new OffsetAndMetadata(offset+1));
            }
            // 向事務管理器提交消費位移 
            producer.sendOffsetsToTransaction(offsetsMap,"groupid");
            // 提交事務 
            producer.commitTransaction();
        } catch(Exeception e) {
            e.printStackTrace();
            // 終止事務 
            producer.abortTransaction();
        }
    }
}
  • initTransactions(): 初始化事務
  • beginTransaction(): 開啟事務
  • sendOffsetsToTransaction(): 在事務內提交已經消費的偏移量(主要用于消費者)
  • commitTransaction(): 提交事務
  • abortTransaction(): 放棄事務

Kafka事務實現原理

kafka事務的實現引入了事務協調器,如下圖所示:

圖片

  1. 生產者使用事務必須配置事務id, kafka根據事務id計算分配事務協調器
  2. 事務協調器返回pid,前面的冪等性中需要
  3. 開始發送消息到topic中,不過這些消息與普通的消息不同,它們帶著一個字段標識自己是事務消息
  4. 當生產者事務內的消息發送完畢,會向事務協調器發送 commit 或 abort 請求,等待 kafka 響應
  5. 事務協調器收到請求后先持久化到內置事務主題__transaction_state中,__transaction_state默認有50個分區,每個分區負責一部分事務。事務劃分是根據transactional.id的hashcode值%50,計算出該事務屬于哪個分區。 該分區Leader副本所在的broker節點即為這個transactional.id對應的Transaction Coordinator節點,這也是上面第一步中的計算邏輯。
  6. 事務協調器后臺會跟topic通信,告訴它們事務是成功還是失敗的。
  • 如果是成功,topic會匯報自己已經收到消息,協調者收到主題的回應便確認了事務完成,并持久化這一結果。
  • 如果是失敗的,主題會把這個事務內的消息丟棄,并匯報給協調者,協調者收到所有結果后再持久化這一信息,事務結束。
  1. 持久化第6步中的事務成功或者失敗的信息, 如果kafka broker配置max.transaction.timeout.ms之前既不提交也不中止事務, kafka broker將中止事務本身。 此屬性的默認值為 15 分鐘。

總結

本文講解了通過kafka事務可以實現端到端的精確一次的消息語義,通過事務機制,KAFKA 實現了對多個 topic 的多個 partition 的原子性的寫入,通過一個例子了解了一下如何使用事物。同時也簡單介紹了事務實現的原理,它底層必須要依賴kafka的冪等性機制,同時通過類似“二段提交”的方式保證事務的原子性。

責任編輯:武曉燕 來源: JAVA旭陽
相關推薦

2021-12-06 15:02:37

RabbitMQ系統消息

2021-12-21 09:05:46

命令Linux敲錯

2020-04-30 09:19:56

Docker容器虛擬機

2020-01-21 21:15:16

WiFi網絡WiFi6

2024-05-11 07:37:43

數據Redis策略

2015-05-29 09:01:48

2020-06-15 08:03:17

大文件OOM內存

2025-04-03 09:51:37

2023-07-14 21:34:40

JVM上下線線程

2017-06-20 09:42:52

網絡安全法數據隱私法網絡安全

2018-07-15 08:18:44

緩存數據庫數據

2017-08-25 17:59:41

浮點運算C語言

2021-08-13 22:38:36

大數據互聯網技術

2019-09-04 10:00:07

手機人臉識別

2021-05-27 18:06:30

MySQL編碼數據

2024-04-07 09:00:00

MySQL

2010-06-02 10:53:28

MySQL版本

2022-03-18 10:53:49

數據系統架構

2020-07-20 14:06:38

數據庫主從同步服務

2018-07-08 07:38:28

數據庫緩存數據
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 影音先锋中文在线 | 91.色| 欧美日韩精品一区二区三区四区 | 99久久婷婷国产综合精品电影 | 黄色片网站国产 | 亚洲精品在线观 | 亚洲精品国产a久久久久久 午夜影院网站 | 日韩av免费在线电影 | 欧美区在线| 成年人网站免费视频 | 国产精品美女久久久久aⅴ国产馆 | av在线黄 | 中文字幕一区二区三区四区五区 | 欧美日韩精品一区 | 精品视频在线观看 | 国产久| 欧美日韩专区 | www.99re| 99色综合| av免费在线观看网站 | 国产日韩精品久久 | 国产一区二区三区四区三区四 | 午夜精品久久久久久久久久久久久 | 91xx在线观看| 91不卡在线| 日韩一级欧美一级 | 人人擦人人干 | 久久久xxx| 高清av在线| 久草新在线| 久草色视频 | 中文字幕 欧美 日韩 | 久久国产视频网站 | 看羞羞视频 | 91看片在线观看 | 亚洲国产成人精品女人久久久 | 欧美1区2区 | 一区二区三区日 | 日韩中文字幕在线视频 | 国产精品久久在线 | 久久久国产精品入口麻豆 |