Flink CDC 在大健云倉的實踐
摘要:本文整理自大健云倉基礎架構負責人、Flink CDC Maintainer 龔中強在 5 月 21 日 Flink CDC Meetup 的演講。主要內容包括:
- 引入 Flink CDC 的背景
- 現今內部落地的業務場景
- 未來內部推廣及平臺化建設
- 社區合作
一、引入 Flink CDC 的背景
公司引入 CDC 技術,主要基于以下四個角色的需求:
- 物流科學家:需要庫存、銷售訂單、物流賬單等數據用于做分析。
- 開發:需要同步其他業務系統的基本信息。
- 財務:希望財務數據能夠實時傳送到財務系統,而不是月結前才能看到。
- 老板:需要數據大屏,通過大屏查看公司的業務和運營情況。
CDC 是數據捕獲變更的技術。廣義上來說,但凡能夠捕獲數據變更的技術,都能被稱為 CDC。但通常我們說的 CDC 技術主要面向數據庫的變更。
CDC 的實現方式主要有兩種,分別是基于查詢和基于日志:
- 基于查詢:查詢后插入、更新到數據庫即可,無須數據庫的特殊配置以及賬號權限。它的實時性基于查詢頻率決定,只能通過提高查詢頻率來保證實時性,而這必然會對 DB 造成巨大壓力。此外,因為是基于查詢,所以它無法捕獲兩次查詢之間數據的變更記錄,也就無法保證數據的一致性。
- 基于日志:通過實時消費數據的變更日志實現,因此實時性很高。而且不會對 DB 造成很大的影響,也能夠保證數據的一致性,因為數據庫會將所有數據的變動記錄在變更日志中。通過對日志的消費,即可明確知道數據的變化過程。它的缺點是實現相對復雜,因為不同數據庫的變動日志實現不一樣,格式、開啟方式以及特殊權限都不一樣,需要針對每一種數據庫做相應的適配開發。
正如 Flink 的宣言 “實時即未來”,在如今的大背景下,實時性是亟待解決的重要問題。因此,我們將主流 CDC 基于日志的技術做了對比,如上圖所示:
- 數據源:Flink CDC 除了對傳統的關系型數據庫做到了很好的支持外,對文檔型、NewSQL(TiDB、OceanBase) 等當下流行的數據庫都能夠支持;Debezium 對數據庫的支持相對沒有那么廣泛,但是對主流的關系型數據庫都做到了很好的支撐;Canal 和 OGG 只支持單一的數據源。
- 斷點續傳:四種技術都能夠支持。
- 同步模式:除了 Canal 只支持增量,其他技術均支持全量 + 增量的方式。而全量 + 增量的方式意味著第一次上線時全量到增量的切換過程全部可以通過 CDC 技術實現,無須人為地通過全量的任務加上增量的 job 去實現全量 + 增量數據的讀取。
- 活躍度:Flink CDC 擁有非常活躍的社區,資料豐富,官方也提供了詳盡的教程以及快速上手教程;Debezium 社區也相當活躍,但資料大多是英文的;Canal 的用戶基數特別大,資料也相對較多,但社區活躍度一般;OGG 是 Oracle 的大數據套件,需要付費,只有官方資料。
- 開發難度:Flink CDC 依靠 Flink SQL 和 Flink DataStream 兩種開發模式,尤其是 Flink SQL,通過非常簡單的 SQL 即可完成數據同步任務的開發,開發上手尤為簡單;Debezium 需要自己解析采集到的數據變更日志進行單獨處理,Canal 亦是如此。
- 運行環境依賴:Flink CDC 是以 Flink 作為引擎,Debezium通常是將 Kafka connector 作為運行容器;而 Canal 和 OGG 都是單獨運行。
- 下游豐富程度:Flink CDC 依靠 Flink 非常活躍的周邊以及豐富的生態,能夠打通豐富的下游,對普通的關系型數據庫以及大數據存儲引擎 Iceberg、ClickHouse、Hudi 等都做了很好的支持;Debezium 有 Kafka JDBC connector, 支持 MySQL 、Oracle 、SqlServer;Canal 只能直接消費數據或將其輸出到 MQ 中進行下游的消費;OGG 因為是官方套件,下游豐富程度不佳。
二、現今內部落地的業務場景
- 2018 年之前,大健云倉數據同步的方式為:通過多數據應用定時同步系統之間的數據。
- 2020 年之后,隨著跨境業務的飛速發展,多數據源應用經常打滿 DB 影響在線應用,同時定時任務的執行順序管理混亂。
- 因此, 2021 年我們開始調研選型 CDC 技術,搭建了小型試驗場景,進行小規模的試驗。
- 2022 年,上線了基于 Flink CDC 實現的 LDSS 系統庫存場景同步功能。
- 未來,我們希望依托 Flink CDC 打造數據同步平臺,通過界面的開發和配置完成同步任務的開發、測試和上線,能夠全程在線管理同步任務的整個生命周期。
LDSS 庫存管理的業務場景主要有以下四種:
- 倉儲部門:要求倉庫的庫存容量和商品品類分布合理,庫存容量方面,需要留一些 buffer 以防突如其來的入庫單導致爆倉;商品品類方面,季節性的商品庫存分配不合理導致熱點問題,這必將給倉庫的管理帶來巨大挑戰。
- 平臺客戶:希望訂單處理及時,貨物能夠快速、精準地交到客戶手上。
- 物流部門:希望能夠提升物流效率,降低物流成本,高效利用有限的運力。
- 決策部門:希望 LDSS 系統能夠對在何時何地新建倉庫提供科學的建議。
上圖為 LDSS 庫存管理分單場景架構圖。
首先,通過多數據源同步的應用向下拉取倉儲系統、平臺系統以及內部 ERP 系統數據,將所需數據抽取到 LDSS 系統的數據庫中,以支撐 LDSS 系統訂單、庫存、物流三大模塊的業務功能。
其次,需要產品信息、訂單信息以及倉庫信息才能進行有效的分單決策。多數據源定時同步任務基于 JDBC 查詢,通過時間做篩選,同步變更的數據到 LDSS 系統中。LDSS 系統基于這些數據做分單決策,以獲得最優解。
定時任務同步的代碼,首先需要定義定時任務、定義定時任務的類、執行方法以及執行間隔。
上圖左側為定時任務的定義,右側是定時任務的邏輯開發。首先,打開 Oracle 數據庫進行查詢,然后 upsert 到 MySQL 數據庫,即完成了定時任務的開發。此處以接近原生 JDBC 的查詢方式,將數據依次塞到對應的數據庫表中,開發邏輯十分繁瑣,也容易出現 bug。
因此,我們基于 Flink CDC 對其進行了改造。
上圖為基于 Flink CDC 實現的實時同步場景,唯一的變化是將此前的多數據源同步應用程序換成了 Flink CDC 。
首先,通過 SqlServer CDC、MySQL CDC、Oracle CDC 分別連接抽取對應倉儲平臺、 ERP 系統數據庫的表數據,然后通過 Flink 提供的 JDBC connector 寫入到 LDSS 系統的 MySQL 數據庫中。能夠通過 SqlServer CDC、MySQL CDC、Oracle CDC 將異構數據源轉化為統一的 Flink 內部類型,再往下游寫。
此架構相比于之前的架構,對業務系統沒有侵入性,而且實現較為簡單。
我們引入了 MySQL CDC 和 SqlServer CDC 分別連接 B2B 平臺的 MySQL 數據庫以及倉儲系統的 SqlServer 數據庫,然后將抽取到的數據通過 JDBC Connector 寫入到 LDSS 系統的 MySQL 數據庫。
通過以上改造,得益于 Flink CDC 賦予其實時的能力,不需要管理繁雜的定時任務。
基于 Flink CDC 同步代碼的實現分為以下三步:
- 第一步,定義源表 —— 需要同步的表;
- 第二步,定義目標表 —— 需要寫入數據的目標表;
- 第三步,通過 insert select 語句,即可完成 CDC 同步任務的開發。
上述開發模式非常簡單,邏輯清晰。此外,依托 Flink CDC 的同步任務和 Flink 架構,還獲得了失敗重試、分布式、高可用、全量增量一致性切換等特性。
三、未來內部推廣及平臺化建設
上圖為平臺架構圖。
左側 source 是由 Flink CDC + Flink 提供的源端,能夠通過豐富的源端抽取數據,通過數據平臺上的開發寫入到目標端。目標端又依托于 Flink 的強大生態,能夠很好地支撐數據湖、關系型數據庫、MQ 等。
Flink 目前有兩種運行方式,一種是國內比較流行的 Flink on Yarn,另一種是 Flink on Kubernets。中間部分的數據平臺向下管理 Flink 集群,以向上支撐 SQL 在線開發、任務開發、血緣管理、任務提交、在線 Notebook 開發、權限和配置以及對任務性能的監控和告警,同時也能夠對數據源做到很好的管理。
數據同步的需求在公司內部特別旺盛,需要通過平臺來提高開發效率,加快交付速度。而且平臺化之后,可以統一公司內部的數據同步技術,收攏同步技術棧,減少維護成本。
平臺化的目標如下:
- 能夠很好地管理數據源、表等元信息;
- 任務的整個生命周期都可以在平臺上完成;
- 實現任務的性能觀測以及告警;
- 簡化開發,快速上手,業務開發人員經過簡單培訓即可上手開發同步任務。
平臺化能帶來以下三個方面的收益:
- 收攏數據同步任務,統一來管理;
- 平臺管理維護同步任務的全生命周期;
- 專門的團隊負責,團隊能夠專注前沿的數據集成技術。
有了平臺之后,即可快速落地應用更多的業務場景。
- 實時數倉:希望通過 Flink CDC 以支持更多實時數倉的業務場景,借助 Flink 強大的計算能力做一些數據庫的物化視圖。將計算從 DB 里解脫出來,通過 Flink 的外部計算再重新寫回數據庫,以加速平臺應用的報表、統計、分析等實時應用場景。
- 實時應用:Flink CDC 能夠從 DB 層捕獲變更,因此可以通過 Flink CDC 實時更新搜索引擎中的內容,實時向財務系統推送財務和核算數據。因為大部分財務系統的數據都需要業務系統通過跑定時任務以及經過大量關聯、聚合、分組等操作才能計算出來,再推送到財務系統中。而借助 Flink CDC 強大的數據捕獲能力,再加上 Flink 的計算能力,將這些數據實時地推送到核算系統和財務系統,就能夠及時發現業務的問題,減少公司的損失。
- 緩存:通過 Flink CDC,能夠構建一個脫離于傳統的應用之外的實時緩存,對于在線應用的性能有極大的提升。
有了平臺的助力,相信 Flink CDC 能夠在公司內部更好地釋放它的能力。
上圖展示了 SqlServer CDC 的原理。
社區同學使用了當前版本的 SqlServer CDC 后,主要反饋的問題有以下三個:
- 快照過程中鎖表:鎖表操作對于 DBA 和在線應用都是不可忍受的, DBA 無法接受數據庫被夯住,同時也會影響在線應用。
- 快照過程中不能 checkpoint:不能 checkpoint 就意味著快照過程中一旦失敗,只能重新開始跑快照過程,這對于大表非常不友好。
- 快照過程只支持單并發:千萬級、上億級的大表,在單并發的情況下需要同步十幾甚至幾十個小時,極大束縛了 SqlServer CDC 的應用場景。
我們針對上述問題做了實踐和改進,參考社區 2.0 版本 MySQL CDC 并發無鎖算法的思想,對 SqlServer CDC 進行了優化,最終實現了快照過程中無鎖,實現一致性快照;快照過程中支持 checkpoint ;快照過程中支持并發,加速快照過程。在大表同步的情況下,并發優勢尤為明顯。
但是由于 2.2 版本社區將 MySQL 的并發無鎖思想抽象成了統一公共的框架,SqlServer CDC 需要重新適配這套通用框架后才能貢獻給社區。
提問&解答
Q1需要開啟 SqlServer 自己的 CDC 嗎?
是的,SqlServer CDC 的功能就是基于 SqlServer 數據庫自己的 CDC 特性實現的。
Q2物化視圖通過什么方式去刷新定時任務觸發器?
通過 Flink CDC 將需要生成物化視圖的 SQL 放在 Flink 里運行,通過原表的變動觸發計算,然后同步到物化視圖表里。
Q3平臺化是怎么做的?
平臺化參考了社區眾多的開源項目以及優秀的開源平臺,比如 StreamX、DLink 等優秀的開源項目。
Q4SqlServer CDC 在消費 transaction log 時有瓶頸嗎?
SqlServer 并沒有直接消費 log,其原理是 SqlServer capture process 去匹配 log 內哪些表開啟了 CDC ,然后將這些表從日志里撈到開啟 CDC 表的變更數據,再轉插到 change table 里,最后通過開啟 CDC 之后數據庫生成的 CDC query function 獲取到數據的變更。
Q5Flink CDC 高可用如何保障同步任務過多或密集處理方案?
Flink 的高可用依賴于 Flink 特性比如 checkpoint 等來保證。同步任務過多或處理方案密集的情況,建議使用多套 Flink 下游集群,然后根據同步的實時性區分對待,將任務發布到相應的集群中。
Q6中間需要 Kafka 嗎?
取決于同步任務或數倉架構是否需要將中間數據做 Kafka 落地。
Q7一個數據庫中有多張表,可以放到一個任務里運行嗎?
取決于開發方式。如果是 SQL 的開發方式,要實現一次性寫多表只能通過多個任務。但 Flink CDC 提供了另外一種比較高階的開發方式 DataStream ,可以將多表放到一個任務里運行。
Q8Flink CDC 支持讀取 Oracle 從庫的日志嗎?
目前還無法實現。
Q9通過 CDC 同步后兩個端的數據質量如何監控,如何比對?
目前只能通過定時抽樣來做數據質量的檢查,數據質量問題一直是業內比較棘手的問題。
Q10大健云倉用的什么調度系統?系統如何與 Flink CDC 集合?
使用 XXL Job 作為分布式的任務調度,CDC 沒有用到定時任務。
Q11如果采集增刪表,SqlServer CDC 需要重啟嗎?
SqlServer CDC 目前不支持動態加表的功能。
Q12同步任務會影響系統性能嗎?
基于 CDC 做同步任務肯定會影響系統性能,尤其是快照過程對數據庫會有影響,進而影響應用系統。社區將來會做限流、對所有 connector 做并發無鎖的實現,都是為了擴大 CDC 的應用場景以及易用性。
Q13全量和增量的 savepoint 怎么處理?
(未通過并發無鎖框架實現的連接器)全量過程中不可以觸發 savepoint,增量過程中如果需要停機發布,可通過 savepoint 恢復任務。
Q14CDC 同步數據到 Kafka ,而 Kafka 里面存的是 Binlog ,如何保存歷史數據和實時數據?
將 CDC 同步的數據全部 Sync 到 Kafka,保留的數據取決于 Kafka log 的清理策略,可以全部保留。
Q15CDC 會對 Binlog 的日志操作類型進行過濾嗎?會影響效率嗎?
即使有過濾操作,對性能影響也不大。
Q16CDC 讀 MySQL 初始化快照階段,多個程序讀不同的表會有程序報錯無法獲取鎖表的權限,這是什么原因?
建議先查看 MySQL CDC 是不是使用老的方式實現,可以嘗試新版本的并發無鎖實現。
Q17MySQL 上億大表全量和增量如何銜接?
建議閱讀雪盡老師在 2.0 的相關博客,非常簡單清晰地介紹了并發無鎖如何實現一致性快照,完成全量和增量的切換。