Flink 中的 Savepoint 和 Checkpoint 有什么區(qū)別?
Savepoint(保存點)和Checkpoint(檢查點)都是Flink中用于狀態(tài)持久化和恢復(fù)的機制,但它們在目的、觸發(fā)方式、生命周期管理和使用場景上有著明顯區(qū)別。
一、基本概念
1. Checkpoint(檢查點)
檢查點是Flink自動觸發(fā)的狀態(tài)快照,用于故障恢復(fù),主要特點:
- 自動創(chuàng)建:由Flink定期自動觸發(fā)
- 生命周期:通常在作業(yè)運行期間有限存在,舊的檢查點會被新的覆蓋
- 主要目的:實現(xiàn)容錯機制,在故障發(fā)生時能夠恢復(fù)到最近的一致狀態(tài)
- 存儲格式:針對性能優(yōu)化,可能使用增量存儲機制
2. Savepoint(保存點)
保存點是用戶手動觸發(fā)的狀態(tài)快照,用于有計劃的作業(yè)升級或維護,主要特點:
- 手動創(chuàng)建:由用戶通過命令或API手動觸發(fā)
- 生命周期:長期存在,直到用戶明確刪除
- 主要目的:應(yīng)用版本升級、集群遷移、A/B測試等有計劃的操作
- 存儲格式:更加完整和自包含,確保長期兼容性
二、詳細(xì)對比
三、在FlinkSQL 中配置和使用
1. Checkpoint 配置
1.SQL Client 模式
-- 啟用檢查點,每10秒觸發(fā)一次
SET 'execution.checkpointing.interval' = '10s';
-- 設(shè)置檢查點模式(EXACTLY_ONCE/AT_LEAST_ONCE)
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
-- 設(shè)置檢查點超時時間
SET 'execution.checkpointing.timeout' = '5min';
-- 設(shè)置檢查點最小間隔
SET 'execution.checkpointing.min-pause' = '1s';
-- 設(shè)置同時進行的檢查點數(shù)量上限
SET 'execution.checkpointing.max-concurrent-checkpoints' = '1';
-- 設(shè)置檢查點存儲位置
SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
-- 設(shè)置保留的檢查點數(shù)量
SET 'state.checkpoints.num-retained' = '5';
-- 作業(yè)取消時保留檢查點
SET 'execution.checkpointing.externalized-checkpoint-retention' = 'RETAIN_ON_CANCELLATION';
2. Java API模式
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每10秒觸發(fā)一次檢查點
env.enableCheckpointing(10000);
// 獲取檢查點配置
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
// 設(shè)置模式為EXACTLY_ONCE
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 設(shè)置檢查點超時時間
checkpointConfig.setCheckpointTimeout(300000);
// 設(shè)置最小間隔時間
checkpointConfig.setMinPauseBetweenCheckpoints(1000);
// 設(shè)置最大并發(fā)檢查點數(shù)
checkpointConfig.setMaxConcurrentCheckpoints(1);
// 設(shè)置外部化檢查點的清理行為
checkpointConfig.enableExternalizedCheckpoints(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 設(shè)置檢查點存儲
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));
2. Savepoint 操作
1. 創(chuàng)建Savepoint
使用Flink CLI:
# 為作業(yè)創(chuàng)建保存點
flink savepoint :jobId [:targetDirectory]
# 例如
flink savepoint 1234567890abcdef hdfs:///flink/savepoints
使用REST API:
# 發(fā)送POST請求到作業(yè)管理器
curl -X POST "http://jobmanager:8081/jobs/:jobId/savepoints" \
-d '{"target-directory": "hdfs:///flink/savepoints", "cancel-job": false}'
2. 從Savepoint恢復(fù)
使用Flink CLI:
# 從保存點恢復(fù)作業(yè)
flink run -s :savepointPath [:runArgs]
# 例如
flink run -s hdfs:///flink/savepoints/savepoint-1234567-aabbccdd jarfile.jar
使用SQL Client:
-- 設(shè)置從保存點恢復(fù)
SET 'execution.savepoint.path' = 'hdfs:///flink/savepoints/savepoint-1234567-aabbccdd';
-- 執(zhí)行SQL任務(wù)
INSERT INTO target_table SELECT * FROM source_table;
使用Java API:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 設(shè)置從保存點恢復(fù)
env.setRestartStrategy(RestartStrategies.noRestart());
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));
// 指定保存點路徑
String savepointPath = "hdfs:///flink/savepoints/savepoint-1234567-aabbccdd";
Configuration configuration = new Configuration();
configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, savepointPath);
env.configure(configuration);
// 構(gòu)建作業(yè)并執(zhí)行
// ...
env.execute("Job restored from savepoint");
四、常見應(yīng)用場景
1. Checkpoint 應(yīng)用場景
- 故障自動恢復(fù): – 節(jié)點崩潰時,作業(yè)自動從最近的檢查點恢復(fù) – 網(wǎng)絡(luò)分區(qū)時,保持?jǐn)?shù)據(jù)一致性
- 保證數(shù)據(jù)處理的一致性: – 實現(xiàn)exactly-once 或at-least-once 語義 – 確保在失敗時不丟失狀態(tài)
- 處理反壓和資源限制: – 當(dāng)系統(tǒng)出現(xiàn)反壓時可以從檢查點恢復(fù) – 資源緊張時重新平衡負(fù)載
2. Savepoint 應(yīng)用場景
(1) 應(yīng)用升級和版本遷移:
① 在升級前備份當(dāng)前狀態(tài) -- (通過Flink CLI創(chuàng)建保存點)
flink savepoint 1234567890abcdef hdfs:///flink/savepoints
② 停止當(dāng)前作業(yè)
flink cancel 1234567890abcdef
③ 部署新版本代碼
④ 從保存點恢復(fù)到新版本
flink run -s hdfs:///flink/savepoints/savepoint-1234567-aabbccdd new-version.jar
(2) 集群遷移或擴展:
① 在原集群創(chuàng)建保存點
flink savepoint 1234567890abcdef hdfs:///flink/savepoints
② 在新集群使用保存點恢復(fù)作業(yè)
flink run -s hdfs:///flink/savepoints/savepoint-1234567-aabbccdd jarfile.jar
(3) A/B 測試和算法切換:
# 創(chuàng)建保存點
flink savepoint 1234567890abcdef hdfs:///flink/savepoints
# 使用保存點啟動算法A的實現(xiàn)
flink run -s hdfs:///flink/savepoints/savepoint-1234567-aabbccdd algorithm-a.jar
# 使用同一保存點啟動算法B的實現(xiàn)進行比較
flink run -s hdfs:///flink/savepoints/savepoint-1234567-aabbccdd algorithm-b.jar
(4) 生產(chǎn)環(huán)境回滾:
# 當(dāng)新版本出現(xiàn)問題時,使用之前的保存點回滾
flink run -s hdfs:///flink/savepoints/savepoint-previous-version old-version.jar