得物自建DTS平臺的技術演進
前言
DTS是數據傳輸平臺(Data Transfer Platform的縮寫)
隨著得物App的用戶流量增長,業務選擇的數據庫越來越多樣化,異構數據源之間的數據同步需求也逐漸增多。為了控制成本并更好地支持業務發展,我們決定自建DTS平臺。本文主要從技術選型、能力支持與演化的角度出發,分享了在DTS平臺升級過程中獲得的經驗,并提供一些參考。
1技術選型
DTS的主要目標是支持不同類型的數據源之間的數據交互,包括關系型數據庫(RDBMS)、NoSQL數據庫、OLAP等,同時整合了數據庫配置管理、數據訂閱、數據同步、數據遷移、DRC雙活數據同步支持、數據巡檢、監控報警、統一權限等多個模塊,以構建安全、可擴展、高可用的數據架構平臺。
1.1 能力對比
圖片
1.2 DTS 1.0 - 以 canal/otter/datax 作為執行引擎
圖片
1.3 為什么要切換到Flink?
為了支持多種讀端數據源和寫端數據源,需要一個統一數據處理框架,以減少重復組件和提高開發效率。同時數據源類型和組件的維護難度與復雜度呈線性增長,現有的組件需要統一維護到一個項目中。
Canal和Otter等組件的社區活躍度低,很長時間沒有得到維護更新。因此,需要選擇一個新的、活躍的框架。此外,現有組件也無法有效支持全量+增量一體化的操作。
因此,使用一個統一的數據處理框架,能夠同時支持多種讀端數據源和寫端數據源,以及全量+增量一體化的功能,是必要的。這樣能夠降低組件的維護難度和復雜度,提高開發效率。
通過DTS 2.0,我們希望將canal/otter/datax演化為一個任務執行框架+管理平臺,能夠為后續大量數據源迭代提速。
1.4 DTS 2.0 以Flink作為執行引擎
現有的開發流程:
- 統一的任務執行框架,集成flink并引入connectors根據配置組裝出具體的DTS任務
- 維護并研發新的 connector
當我們需要支持新的數據源, 首先將數據源相關插件維護在connector中,接著在執行框架中引入需要的組件,其中存在大量的可復用的功能,這樣就做到了connector及功能組件復用的效果。
2DTS 現有能力
圖片
3我們做了什么?
3.1 DTS Connectors框架 - 數據源支持提速
在Flink CDC基礎上實現的全量/增量任務同步框架,基本的架構如下
圖片
其中Connector中分別實現了Flink提供的SourceFunction和SinkFunction函數,分別負責從讀端讀取數據,往寫端寫入數據,因此一個Connector可同時存在于上游或者下游。
任務的啟動流程:
- 指定任務Json配置, 根據類型加載SourceFunction和SinkFunction構建通用能力函數并啟動
a. 任務的Main函數如下所示, 根據如下的Json文件加載到對應的Connector中的SourceFactory或者SinkFactory來構造對應的DataStream。
DataStream是Flink中提供的數據流操作類
public class Main {
public static void main(String[] args) throws Exception {
// 解析參數
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String[] parsedArgs = parseArgs(parameterTool);
Options options = new OptionParser(parsedArgs).getOptions();
options.setJobName(options.getJobName());
// 執行任務
StreamExecutionEnvironment environment =
EnvFactory.createStreamExecutionEnvironment(options);
exeJob(environment, options);
}
任務Json配置:
{
"job":{
"content":{
"reader":{
"name":"binlogreader",
"parameter":{
"accessKey":"",
"binlogOssApiUrl":"",
"delayBetweenRestartAttempts":2000,
"fetchSize":1,
"instanceId":"",
"rdsPlatform":"",
"restartAttempts":5,
"secretKey":"",
"serverTimezone":"",
"splitSize":1024,
"startupMode":"LATEST_OFFSET"
}
},
"writer":{
"name":"jdbcwriter",
"parameter":{
"batchSize":10000,
"concurrentWrite":true,
],
"dryRun":false,
"dumpCommitData":false,
"errorRecord":0,
"flushIntervalMills":30000,
"poolSize":10,
"retries":3,
"smallBatchSize":200
}
}
},
}
}
b. 我們提供了兩個抽象工廠類,SourceFactory, SinkFactory, 其中的createSource, createSink便是子工廠需要實現的方法,不同的數據源實現不同。
public abstract class SourceFactory<T> {
public abstract DataStream<T> createSource();
}
public abstract class SinkFactory<T> {
public abstract void createSink(DataStream<T> rowData) throws Exception;
}
c. 接下來,我們只需要實現對應的子工廠方法就可以了
public class BinlogSourceFactory extends AbstractJdbcSourceFactory {
@Override
public DataStream<TableRowData> createSource() {
List<String> tables = this.binlogSourceConf.getConnection().getTable();
Set<String> databaseList = new HashSet<>(2);
// 使用對應的Connector構建DataStream
}
}
d. 通用能力函數:RateLimitFunction, BinlogPositionFunction 其中分別實現了對應的任務能力,例如限流,任務位點保存等。
public class RateLimiterMapFunction<T> extends RichMapFunction<T, T> {
private transient FlinkConnectorRateLimiter rateLimiter;
@Override
public T map(T value) throws Exception {
if (rateLimiterEnabled) {
rateLimiter.acquire(1);
}
return value;
}
當任務所需的函數都創建完成后,任務就真正開始運行了。
收益:
3.2 RDS日志獲取
DTS通過提供增量和全量同步能力為業務提供數據同步功能,但在增量訂閱/同步任務執行過程中,可能會遇到一些異常情況。其中,以下三種情況需要特別處理:
- Binlog可用性
云廠商的數據庫實例本地binlog有效期8小時,過期部分進行OSS備份。MySQL業務高峰期或者DDL變產生大量的binlog, DTS任務嘗試獲取過期數據失敗,任務因此中斷。因此,DTS支持了本地binlog+OSS備份binlog的獲取及切換,保障日志可用性。
- 數據庫實例主從切換
RDS經常會發生主備節點切換,在切換的過程中要保證數據不丟。由于切換前后兩個數據庫實例 Binlog 文件一般都是不一致的,此時任務位點記錄方式是 BinlogPosition 模式,則在切換之后任務需要自動進行 Binlog 對齊操作,進而保證數據的完整性。將新數據實例上的位點查詢時間戳提前1-2分鐘即可。
- 讀實例訂閱支持
DTS任務binlog dump連接數過多造成主庫壓力及影響DDL變更,因此需要支持讀庫訂閱。云廠商的讀庫不提供備份,在讀庫日志過期時需要切換到主庫進行讀取。
3.3 全量增量一體化功能
圖片
全量增量一體化是指先同步存量數據,待存量結束之后再開始同步增量數據。其中也加入了增量階段的OSS備份日志獲取。但存量階段依然存在一些問題,需要進一步改造優化。
全量模式下新增表先進行存量數據同步再進行增量數據同步,該任務中已存在的表會因此導致數據延遲。待新增表數據同步完成,任務延遲則會恢復正常。 |
3.4 數據源接入- starrocks, postgres等
支持從mysql同步到starrocks和postgres, 在任務執行框架的基礎上,只需要開發starrocks-connector, postgres connector支持對應的數據源即可。其中的其他能力,像多表同步、分庫分表等場景都可以達到復用的效果。
3.5 JBDC寫入改造
腳本擴展和動態表名路由:
圖片
數據合并和多線程寫入:
圖片
3.6 監控告警
DTS任務需要采集flink任務指標,主要包括任務延遲、各個算子階段的寫入速率,算子被壓及使用率等。其中 任務延遲需要接入告警服務,于是我們選擇了引入redis來緩存任務的延遲時間,再上報到告警服務來完成飛書的消息和電話告警。
4最佳實踐
4.1 0000-00-00 00:00:00時間戳的問題
MySQL的時間戳允許為0000-00-00 00:00:00, 在Flink任務中通常會被轉換為null, 導致寫入下游數據源失敗, 因此需要做特殊標記對于不同的數據源做不同的轉化保證寫入的正切行。
4.2 Flink CDC任務serverId唯一性
Flink CDC source 會偽裝成 MySQL slave節點,為了保證數據的準確性,每個slave必須擁有唯一的serverId來標記該slave的唯一性。因此在flink cdc的任務中我們為每一個任務分配了一個唯一的serverId區間(范圍區間是為了支持多并行度)。
4.3 Flink任務數據序列化瓶頸
在flink任務中使用DataStreamAPI并使用比較復雜的數據結構進行傳輸時,算子之間的序列化成本較高,兩個方向,一是建立更為高效的數據結構進行傳輸,二是開啟flink對象復用,并盡可能減少不同并行度之間的數據傳輸。
5未來演進
DTS作為一個數據同步平臺主要功能是盡可能提供高效的數據源同步功能,助力于多變的業務場景。
5.1 基于Flink SQL的ETL任務管理
流式數據處理除了現有的DataStream API還存在SQL的形式,SQL作為一種通用的語言,對于數據相關的業務同學極大的降低了學習成本。而通過Flink SQL可以做到的ETL流式數據加工也能解決一些復雜業務場景的處理邏輯,將業務邏輯轉化為DAG的流式處理圖,通過拖拽的方式也能方便使用,FLINK SQL的演進方向能夠和現有的Flink DataStream API互補。
應用場景:ETL強大的流式數據轉換處理能力大幅提升數據集成效率,也能建實時報表體系,提高分析效率,同時也可以應用于一些實時大屏的場景。
5.2 統一技術棧
將現有的DTS能力都遷移到Flink平臺上,保持統一的技術棧,能夠極大的降低維護成本。現有遺留的雙向同步、數據比對等能力需要做進一步的改造和遷移,符合整體技術收斂的趨勢。
6總結
本文主要分享了以下幾個方面:Flink相比現有的技術棧帶來的收益,切換到Flink以后的迭代方向及架構功能上的變更、帶來新的問題如何解決,以及未來的一些迭代方向,希望能讓大家有所收獲。