成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Flink無法將聚合結果直接寫入Kafka怎么辦?

原創
開發
既然知道問題,知道有實際業務需求,為啥Flink不改進,不把這種情況支持掉呢?

拋出疑無路?

【Flink 1.10】- 有一種情況是所有的系統或應用之間的橋梁都是Kafka,而這個時候恰恰是上游需要做Unbound的聚合統計。From @PyFlink 企業用戶。

示例代碼:

INSERT INTO kafkaSink 
SELECT
id,
SUM(cnt)
FROM csvSource
GROUP BY id

執行這個SQL,在【Flink 1.10】版本會拋出如下異常:

圖片

再現又一村!

【Flink-1.10】這個問題是因Flink內部Retract機制導致,在沒有考慮對Chanage log全鏈路支持之前,無法在Kafka這樣的Append only的消息隊列增加對Retract/Upsert的支持。這個做法是出于語義完整性考慮做出的決定。但現實業務場景總是有著這樣或那樣的實際業務需求,業務不關心你語義是否okay,業務關心我不改變我原有的技術選型。

在這個基礎之上只要你告訴我Sink到Kafka的行為就行,我會根據你的產出行為,在業務上面做適配,所以這個時候就是實用為主,不管什么語義不語義了......,所以這個時候應該怎么辦呢?

我們的做法是將 Kafka的sink由原有的AppendStreamTableSink變成UpsertStreamTableSink或者RetractStreamTableSink。但出于性能考慮,我們改變成UpsertStreamTableSink,這個改動不多,但是對于初學者來講還是不太愿意動手改代碼,所以為大家提供一份:

  • KafkaTableSinkBase.java

https://github.com/sunjincheng121/know_how_know_why/blob/master/QA/upsertKafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java

  • KafkaTableSourceSinkFactoryBase.java

https://github.com/sunjincheng121/know_how_know_why/blob/master/QA/upsertKafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java

在你的項目創建 org.apache.flink.streaming.connectors.kafka包 并把上面的兩個類放入該包,用于覆蓋官方KafkaConnector里面的實現。

特別強調:這樣的變化會導致寫入Kafka的結果不會是每個Group Key只有一條結果,而是每個Key可能有很多條結果。這個大家可以自行測試一下:

package cdc

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala._

/**
* Test for sink data to Kafka with upsert mode.
*/
object UpsertKafka {
def main(args: Array[String]): Unit = {
val sourceData = "file:///Users/jincheng.sunjc/work/know_how_know_why/QA/upsertKafka/src/main/scala/cdc/id_cnt_data.csv"
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

val sourceDDL = "CREATE TABLE csvSource (" +
" id VARCHAR," +
" cnt INT" +
") WITH (" +
"'connector.type' = 'filesystem'," +
"'connector.path' = '" + sourceData + "'," +
"'format.type' = 'csv'" +
")"

val sinkDDL = "CREATE TABLE kafkaSink (" +
" id VARCHAR," +
" cnt INT " +
") WITH (" +
"'connector.type' = 'kafka'," +
"'connector.version' = '0.10'," +
"'connector.topic' = 'test'," +
"'connector.properties.zookeeper.connect' = 'localhost:2181'," +
"'connector.properties.bootstrap.servers' = 'localhost:9092'," +
"'connector.properties.group.id' = 'data_Group'," +
"'format.type' = 'json')"

tEnv.sqlUpdate(sourceDDL)
tEnv.sqlUpdate(sinkDDL)

val sql = "INSERT INTO kafkaSink" +
" SELECT id, SUM(cnt) FROM csvSource GROUP BY id"
tEnv.sqlUpdate(sql)
env.execute("RetractKafka")
}
}

當然,也可以clone我的git代碼【https://github.com/sunjincheng121/know_how_know_why/tree/master/QA/upsertKafka】直觀體驗一下。由于本系列文章只關注解決問題,不論述細節原理,有關原理性知識,我會在我的視頻課程《Apache 知其然,知其所以然》中進行介紹。

Flink 的鍋?...

看到上面的問題有些朋友可能會問,既然知道問題,知道有實際業務需求,為啥Flink不改進,不把這種情況支持掉呢?問的好,就這個問題而言,Flink是委屈的,Flink已經在努力支持這個場景了,預期Flink-1.12的版本大家會體驗到完整的CDC(change data capture)支持。

眾人拾柴

期待你典型問題的拋出... 我將知無不言...言無不盡... 我在又一村等你...

作者介紹

孫金城,51CTO社區編輯,Apache Flink PMC 成員,Apache Beam Committer,Apache IoTDB PMC 成員,ALC Beijing 成員,Apache ShenYu 導師,Apache 軟件基金會成員。關注技術領域流計算和時序數據存儲。

責任編輯:張燕妮 來源: 孫金城
相關推薦

2011-07-28 13:45:06

2021-02-24 08:38:48

Kafka消息Consumer

2022-10-31 09:30:32

kafkaconsumer服務端

2013-01-29 13:22:24

系統服務

2018-08-08 16:15:00

WindowsWindows 10USB

2017-03-02 21:00:53

Windows 10Windows搜索框

2017-03-01 15:08:44

Linuxboot目錄啟動

2019-06-12 10:55:30

IPv6Windows網絡連接

2011-02-23 17:33:48

FileZilla

2022-07-14 10:23:39

數據

2012-05-16 12:39:23

Windows7視頻

2012-06-06 17:05:36

谷歌視頻

2009-11-03 08:56:02

linux死機操作系統

2024-04-22 08:17:23

MySQL誤刪數據

2022-12-19 11:31:57

緩存失效數據庫

2017-02-21 13:11:43

SDN網絡體系SDN架構

2022-05-19 08:01:49

PostgreSQL數據庫

2009-11-27 11:16:30

2017-07-17 10:15:07

Windows 10Windowsinternet ex

2019-10-12 09:50:46

Redis內存數據庫
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 欧美成人aaa级毛片在线视频 | 免费观看日韩精品 | 亚洲三级在线观看 | 国产成人高清在线观看 | 在线视频日韩 | 天天射影院 | 精品久久久久久久久久久久久久久久久 | 亚洲精品久久久久中文字幕二区 | 日本黄色片免费在线观看 | 一级片aaa | 亚洲一区二区在线视频 | 亚洲欧美日韩一区二区 | 国产欧美日韩一区二区三区在线观看 | 成人无遮挡毛片免费看 | 亚洲欧美日韩精品久久亚洲区 | 成人免费黄色片 | 国产一区二区三区久久久久久久久 | 国产一级淫片免费视频 | 国产一级在线观看 | 正在播放国产精品 | 国产综合久久 | 亚洲永久入口 | 天堂一区二区三区 | 久久国内精品 | 成人九区 | 丝袜美腿一区二区三区动态图 | 日韩精品免费在线观看 | 人人种亚洲 | 日本一区二区三区四区 | 在线视频一区二区三区 | 国产精品久久久久久久久久了 | 中文字幕成人av | 成人福利网站 | 老外几下就让我高潮了 | 毛片免费观看 | 午夜精品在线 | 99久久日韩精品免费热麻豆美女 | 日本一区二区三区在线观看 | 欧美成人第一页 | 亚洲高清视频一区 | 一区二区三区四区免费在线观看 |