SpringBoot整合Flink CDC,實時追蹤數據變動,無縫同步至Redis
環境:SpringBoot2.7.16 + Flink 1.19.0 + JDK21
1. 簡介
Flink CDC(Flink Change Data Capture)是基于數據庫的日志CDC技術,實現了全增量一體化讀取的數據集成框架。它搭配Flink計算框架,能夠高效實現海量數據的實時集成。Flink CDC的核心功能在于實時地監視數據庫或數據流中發生的數據變動,并將這些變動抽取出來,以便進一步的處理和分析。通過使用Flink CDC,用戶可以輕松地構建實時數據管道,對數據變動進行實時響應和處理,為實時分析、實時報表和實時決策等場景提供強大的支持。
具體來說,Flink CDC的應用場景包括但不限于實時數據倉庫更新、實時數據同步和遷移、實時數據處理等。它還可以確保數據一致性,并在數據發生變更時能夠準確地捕獲和處理。此外,Flink CDC支持與多種數據源進行集成,如MySQL、PostgreSQL、Oracle等,并提供了相應的連接器,方便數據的捕獲和處理。
接下來將詳細的介紹關于MySQL CDC的使用。MySQL CDC 連接器允許從 MySQL 數據庫讀取快照數據和增量數據。
支持的數據庫
Connector | Database | Driver |
mysql-cdc |
| JDBC Driver 8.0.27 |
2. 實戰案例
2.1 MySQL開啟Binlog
在MySQL的配置文件中(如Linux的/etc/my.cnf或Windows的\my.ini),需要在[mysqld]部分設置相關參數以開啟binlog功能,如下:
[mysqld]
server-id=1
# 格式,行級格式
binlog-format=Row
# binlog 日志文件的前綴
log-bin=mysql-bin
# 指定哪些數據庫需要記錄二進制日志
binlog_do_db=testjpa
除了開啟binlog功能外,Flink CDC還需要其他配置和權限來確保能夠正常連接到MySQL并讀取數據。例如,需要授予Flink CDC連接MySQL的用戶必要的權限,包括SELECT、REPLICATION SLAVE、REPLICATION CLIENT、SHOW VIEW等。這些權限是Flink CDC讀取數據和元數據所必需的。
查看是否開啟了binlog功能
mysql> SHOW VARIABLES LIKE 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+
以上就對mysql相關的配置完成了。
2.2 依賴管理
<properties>
<flink.version>1.19.0</flink.version>
</properties>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-mysql-cdc</artifactId>
<version>3.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
</dependency>
2.3 代碼實現
@Component
public class MonitorMySQLCDC implements InitializingBean {
// 該隊列專門用來臨時保存變化的數據(實際生產環境,你應該使用MQ相關的產品)
public static final LinkedBlockingQueue<Map<String, Object>> queue = new LinkedBlockingQueue<>() ;
private final StringRedisTemplate stringRedisTemplate ;
// 保存到redis中key的前綴
private final String PREFIX = "users:" ;
// 數據發生變化后的sink處理
private final CustomSink customSink ;
public MonitorMySQLCDC(CustomSink customSink, StringRedisTemplate stringRedisTemplate) {
this.customSink = customSink ;
this.stringRedisTemplate = stringRedisTemplate ;
}
@Override
public void afterPropertiesSet() throws Exception {
// 啟動異步線程,實時處理隊列中的數據
new Thread(() -> {
while(true) {
try {
Map<String, Object> result = queue.take();
this.doAction(result) ;
} catch (Exception e) {
e.printStackTrace();
}
}
}).start() ;
Properties jdbcProperties = new Properties() ;
jdbcProperties.setProperty("useSSL", "false") ;
MySqlSource<String> source = MySqlSource.<String>builder()
.hostname("127.0.0.1")
.port(3306)
// 可配置多個數據庫
.databaseList("testjpa")
// 可配置多個表
.tableList("testjpa.users")
.username("root")
.password("123123")
.jdbcProperties(jdbcProperties)
// 包括schema的改變
.includeSchemaChanges(true)
// 反序列化設置
// .deserializer(new StringDebeziumDeserializationSchema())
.deserializer(new JsonDebeziumDeserializationSchema(true))
// 啟動模式;關于啟動模式下面詳細介紹
.startupOptions(StartupOptions.initial())
.build() ;
// 環境配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() ;
// 設置 6s 的 checkpoint 間隔
env.enableCheckpointing(6000) ;
// 設置 source 節點的并行度為 4
env.setParallelism(4) ;
env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL")
// 添加Sink
.addSink(this.customSink) ;
env.execute() ;
}
@SuppressWarnings("unchecked")
private void doAction(Map<String, Object> result) throws Exception {
Map<String, Object> payload = (Map<String, Object>) result.get("payload") ;
String op = (String) payload.get("op") ;
switch (op) {
// 更新和插入操作
case "u", "c" -> {
Map<String, Object> after = (Map<String, Object>) payload.get("after") ;
String id = after.get("id").toString();
System.out.printf("操作:%s, ID: %s%n", op, id) ;
stringRedisTemplate.opsForValue().set(PREFIX + id, new ObjectMapper().writeValueAsString(after)) ;
}
// 刪除操作
case "d" -> {
Map<String, Object> after = (Map<String, Object>) payload.get("before") ;
String id = after.get("id").toString();
stringRedisTemplate.delete(PREFIX + id) ;
}
}
}
}
啟動模式:
- initial (默認):在第一次啟動時對受監視的數據庫表執行初始快照,并繼續讀取最新的 binlog。
- earliest-offset:跳過快照階段,從可讀取的最早 binlog 位點開始讀取
- latest-offset:首次啟動時,從不對受監視的數據庫表執行快照, 連接器僅從 binlog 的結尾處開始讀取,這意味著連接器只能讀取在連接器啟動之后的數據更改。
- specific-offset:跳過快照階段,從指定的 binlog 位點開始讀取。位點可通過 binlog 文件名和位置指定,或者在 GTID 在集群上啟用時通過 GTID 集合指定。
- timestamp:跳過快照階段,從指定的時間戳開始讀取 binlog 事件。
數據處理Sink
@Component
public class CustomSink extends RichSinkFunction<String> {
private ObjectMapper mapper = new ObjectMapper();
@Override
public void invoke(String value, Context context) throws Exception {
System.out.printf("數據發生變化: %s%n", value);
TypeReference<Map<String, Object>> valueType = new TypeReference<Map<String, Object>>() {
};
Map<String, Object> result = mapper.readValue(value, valueType);
Map<String, Object> payload = (Map<String, Object>) result.get("payload");
String op = (String) payload.get("op") ;
// 不對讀操作處理
if (!"r".equals(op)) {
MonitorMySQLCDC.queue.put(result);
}
}
}
以上就是實現通過FlinkCDC實時通過數據到Redis的所有代碼。
2.4 Web監控頁面
引入flink web依賴
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
</dependency>
環境配置
Configuration config = new Configuration() ;
config.set(RestOptions.PORT, 9090) ;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config) ;
web監聽9090端口。
圖片
通過web控制臺你可以管理查看到更多的信息。