Flink SQL 知其所以然:Deduplication去重以及如何獲取最新狀態操作
作者:antigeneral了呀
今天我們來學習 Flink SQL 中的 Deduplication 去重以及如何通過 Deduplication 操作獲取最新的狀態。
大家好,我是老羊,今天我們來學習 Flink SQL 中的 Deduplication 去重以及如何通過 Deduplication 操作獲取最新的狀態。
- Deduplication 定義(支持 Batch\Streaming):Deduplication 其實就是去重,也即上文介紹到的 TopN 中 row_number = 1 的場景,但是這里有一點不一樣在于其排序字段一定是時間屬性列,不能是其他非時間屬性的普通列。在 row_number = 1 時,如果排序字段是普通列 planner 會翻譯成 TopN 算子,如果是時間屬性列 planner 會翻譯成 Deduplication,這兩者最終的執行算子是不一樣的,Deduplication 相比 TopN 算子專門做了對應的優化,性能會有很大提升。
- 應用場景:比如上游數據發重了,或者計算 DAU 明細數據等場景,都可以使用 Deduplication 語法去做去重。
- SQL 語法標準:
SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
ORDER BY time_attr [asc|desc]) AS rownum
FROM table_name)
WHERE rownum = 1
其中:
- ROW_NUMBER():標識當前數據的排序值。
- PARTITION BY col1[, col2...]:標識分區字段,代表按照這個 col 字段作為分區粒度對數據進行排序。
- ORDER BY time_attr [asc|desc]:標識排序規則,必須為時間戳列,當前 Flink SQL 支持處理時間、事件時間,ASC 代表保留第一行,DESC 代表保留最后一行。
- WHERE rownum = 1:這個子句是一定需要的,而且必須為 rownum = 1。
- 實際案例:
博主這里舉兩個案例:
- 案例 1(事件時間):是騰訊 QQ 用戶等級的場景,每一個 QQ 用戶都有一個 QQ 用戶等級,需要求出當前用戶等級在星星,月亮,太陽 的用戶數分別有多少。
-- 數據源:當每一個用戶的等級初始化及后續變化的時候的數據,即用戶等級變化明細數據。
CREATE TABLE source_table (
user_id BIGINT COMMENT '用戶 id',
level STRING COMMENT '用戶等級',
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)) COMMENT '事件時間戳',
WATERMARK FOR row_time AS row_time
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.level.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '1000000'
);
-- 數據匯:輸出即每一個等級的用戶數
CREATE TABLE sink_table (
level STRING COMMENT '等級',
uv BIGINT COMMENT '當前等級用戶數',
row_time timestamp(3) COMMENT '時間戳'
) WITH (
'connector' = 'print'
);
-- 處理邏輯:
INSERT INTO sink_table
select
level
, count(1) as uv
, max(row_time) as row_time
from (
SELECT
user_id,
level,
row_time,
row_number() over(partition by user_id order by row_time) as rn
FROM source_table
)
where rn = 1
group by
level
輸出結果:
+I[等級 1, 6928, 2021-1-28T22:34]
-I[等級 1, 6928, 2021-1-28T22:34]
+I[等級 1, 8670, 2021-1-28T22:34]
-I[等級 1, 8670, 2021-1-28T22:34]
+I[等級 1, 77287, 2021-1-28T22:34]
...
可以看到其有回撤數據。
其對應的 SQL 語義如下:
- 數據源:消費到 Kafka 中數據后,將數據按照 partition by 的 key 通過 hash 分發策略發送到下游去重算子。
- Deduplication 去重算子:接受到上游數據之后,根據 order by 中的條件判斷當前的這條數據和之前數據時間戳大小,以上面案例來說,如果當前數據時間戳大于之前數據時間戳,則撤回之前向下游發的中間結果,然后將最新的結果發向下游(發送策略也為 hash,具體的 hash 策略為按照 group by 中 key 進行發送),如果當前數據時間戳小于之前數據時間戳,則不做操作。次算子產出的結果就是每一個用戶的對應的最新等級信息。
- Group by 聚合算子:接受到上游數據之后,根據 Group by 聚合粒度對數據進行聚合計算結果(每一個等級的用戶數),發往下游數據匯算子。
- 數據匯:接收到上游的數據之后,然后輸出到外部存儲引擎中。
- 案例 2(處理時間):最原始的日志是明細數據,需要我們根據用戶 id 篩選出這個用戶當天的第一條數據,發往下游,下游可以據此計算分各種維度的 DAU。
-- 數據源:原始日志明細數據
CREATE TABLE source_table (
user_id BIGINT COMMENT '用戶 id',
name STRING COMMENT '用戶姓名',
server_timestamp BIGINT COMMENT '用戶訪問時間戳',
proctime AS PROCTIME()
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.name.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '10',
'fields.server_timestamp.min' = '1',
'fields.server_timestamp.max' = '100000'
);
-- 數據匯:根據 user_id 去重的第一條數據
CREATE TABLE sink_table (
user_id BIGINT,
name STRING,
server_timestamp BIGINT
) WITH (
'connector' = 'print'
);
-- 處理邏輯:
INSERT INTO sink_table
select user_id,
name,
server_timestamp
from (
SELECT
user_id,
name,
server_timestamp,
row_number() over(partition by user_id order by proctime) as rn
FROM source_table
)
where rn = 1
輸出結果:
+I[1, 用戶 1, 2021-1-28T22:34]
+I[2, 用戶 2, 2021-1-28T22:34]
+I[3, 用戶 3, 2021-1-28T22:34]
...
可以看到這個處理邏輯是沒有回撤數據的。其對應的 SQL 語義如下:
- 數據源:消費到 Kafka 中數據后,將數據按照 partition by 的 key 通過 hash 分發策略發送到下游去重算子。
- Deduplication 去重算子:處理時間語義下,如果是當前 key 的第一條數據,則直接發往下游,如果判斷(根據 state 中是否存儲過改 key)不是第一條,則直接丟棄。
- 數據匯:接收到上游的數據之后,然后輸出到外部存儲引擎中。
注意:
在 Deduplication 關于是否會出現回撤流,博主總結如下:
- ? Order by 事件時間 DESC:會出現回撤流,因為當前 key 下可能會有 比當前事件時間還大的數據。
- ? Order by 事件時間 ASC:會出現回撤流,因為當前 key 下可能會有 比當前事件時間還小的數據。
- ? Order by 處理時間 DESC:會出現回撤流,因為當前 key 下可能會有 比當前處理時間還大的數據。
- ? Order by 處理時間 ASC:不會出現回撤流,因為當前 key 下不可能會有 比當前處理時間還小的數據。
責任編輯:姜華
來源:
大數據羊說