成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Flink SQL 知其所以然:Deduplication去重以及如何獲取最新狀態操作

數據庫 其他數據庫
今天我們來學習 Flink SQL 中的 Deduplication 去重以及如何通過 Deduplication 操作獲取最新的狀態。

大家好,我是老羊,今天我們來學習 Flink SQL 中的 Deduplication 去重以及如何通過 Deduplication 操作獲取最新的狀態。

  1. Deduplication 定義(支持 Batch\Streaming):Deduplication 其實就是去重,也即上文介紹到的 TopN 中 row_number = 1 的場景,但是這里有一點不一樣在于其排序字段一定是時間屬性列,不能是其他非時間屬性的普通列。在 row_number = 1 時,如果排序字段是普通列 planner 會翻譯成 TopN 算子,如果是時間屬性列 planner 會翻譯成 Deduplication,這兩者最終的執行算子是不一樣的,Deduplication 相比 TopN 算子專門做了對應的優化,性能會有很大提升。
  2. 應用場景:比如上游數據發重了,或者計算 DAU 明細數據等場景,都可以使用 Deduplication 語法去做去重。
  3. SQL 語法標準:
SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
ORDER BY time_attr [asc|desc]) AS rownum
FROM table_name)
WHERE rownum = 1

其中:

  • ROW_NUMBER():標識當前數據的排序值。
  • PARTITION BY col1[, col2...]:標識分區字段,代表按照這個 col 字段作為分區粒度對數據進行排序。
  • ORDER BY time_attr [asc|desc]:標識排序規則,必須為時間戳列,當前 Flink SQL 支持處理時間、事件時間,ASC 代表保留第一行,DESC 代表保留最后一行。
  • WHERE rownum = 1:這個子句是一定需要的,而且必須為 rownum = 1。
  1. 實際案例:

博主這里舉兩個案例:

  • 案例 1(事件時間):是騰訊 QQ 用戶等級的場景,每一個 QQ 用戶都有一個 QQ 用戶等級,需要求出當前用戶等級在星星,月亮,太陽 的用戶數分別有多少。
-- 數據源:當每一個用戶的等級初始化及后續變化的時候的數據,即用戶等級變化明細數據。
CREATE TABLE source_table (
user_id BIGINT COMMENT '用戶 id',
level STRING COMMENT '用戶等級',
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)) COMMENT '事件時間戳',
WATERMARK FOR row_time AS row_time
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.level.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '1000000'
);
-- 數據匯:輸出即每一個等級的用戶數
CREATE TABLE sink_table (
level STRING COMMENT '等級',
uv BIGINT COMMENT '當前等級用戶數',
row_time timestamp(3) COMMENT '時間戳'
) WITH (
'connector' = 'print'
);
-- 處理邏輯:
INSERT INTO sink_table
select
level
, count(1) as uv
, max(row_time) as row_time
from (
SELECT
user_id,
level,
row_time,
row_number() over(partition by user_id order by row_time) as rn
FROM source_table
)
where rn = 1
group by
level

輸出結果:

+I[等級 1, 6928, 2021-1-28T22:34]
-I[等級 1, 6928, 2021-1-28T22:34]
+I[等級 1, 8670, 2021-1-28T22:34]
-I[等級 1, 8670, 2021-1-28T22:34]
+I[等級 1, 77287, 2021-1-28T22:34]
...

可以看到其有回撤數據。

其對應的 SQL 語義如下:

  • 數據源:消費到 Kafka 中數據后,將數據按照 partition by 的 key 通過 hash 分發策略發送到下游去重算子。
  • Deduplication 去重算子:接受到上游數據之后,根據 order by 中的條件判斷當前的這條數據和之前數據時間戳大小,以上面案例來說,如果當前數據時間戳大于之前數據時間戳,則撤回之前向下游發的中間結果,然后將最新的結果發向下游(發送策略也為 hash,具體的 hash 策略為按照 group by 中 key 進行發送),如果當前數據時間戳小于之前數據時間戳,則不做操作。次算子產出的結果就是每一個用戶的對應的最新等級信息。
  • Group by 聚合算子:接受到上游數據之后,根據 Group by 聚合粒度對數據進行聚合計算結果(每一個等級的用戶數),發往下游數據匯算子。
  • 數據匯:接收到上游的數據之后,然后輸出到外部存儲引擎中。
  • 案例 2(處理時間):最原始的日志是明細數據,需要我們根據用戶 id 篩選出這個用戶當天的第一條數據,發往下游,下游可以據此計算分各種維度的 DAU。
-- 數據源:原始日志明細數據
CREATE TABLE source_table (
user_id BIGINT COMMENT '用戶 id',
name STRING COMMENT '用戶姓名',
server_timestamp BIGINT COMMENT '用戶訪問時間戳',
proctime AS PROCTIME()
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.name.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '10',
'fields.server_timestamp.min' = '1',
'fields.server_timestamp.max' = '100000'
);

-- 數據匯:根據 user_id 去重的第一條數據
CREATE TABLE sink_table (
user_id BIGINT,
name STRING,
server_timestamp BIGINT
) WITH (
'connector' = 'print'
);
-- 處理邏輯:
INSERT INTO sink_table
select user_id,
name,
server_timestamp
from (
SELECT
user_id,
name,
server_timestamp,
row_number() over(partition by user_id order by proctime) as rn
FROM source_table
)
where rn = 1

輸出結果:

+I[1, 用戶 1, 2021-1-28T22:34]
+I[2, 用戶 2, 2021-1-28T22:34]
+I[3, 用戶 3, 2021-1-28T22:34]
...

可以看到這個處理邏輯是沒有回撤數據的。其對應的 SQL 語義如下:

  • 數據源:消費到 Kafka 中數據后,將數據按照 partition by 的 key 通過 hash 分發策略發送到下游去重算子。
  • Deduplication 去重算子:處理時間語義下,如果是當前 key 的第一條數據,則直接發往下游,如果判斷(根據 state 中是否存儲過改 key)不是第一條,則直接丟棄。
  • 數據匯:接收到上游的數據之后,然后輸出到外部存儲引擎中。

注意:

在 Deduplication 關于是否會出現回撤流,博主總結如下:

  1. ? Order by 事件時間 DESC:會出現回撤流,因為當前 key 下可能會有 比當前事件時間還大的數據。
  2. ? Order by 事件時間 ASC:會出現回撤流,因為當前 key 下可能會有 比當前事件時間還小的數據。
  3. ? Order by 處理時間 DESC:會出現回撤流,因為當前 key 下可能會有 比當前處理時間還大的數據。
  4. ? Order by 處理時間 ASC:不會出現回撤流,因為當前 key 下不可能會有 比當前處理時間還小的數據。
責任編輯:姜華 來源: 大數據羊說
相關推薦

2022-07-05 09:03:05

Flink SQLTopN

2022-06-10 09:01:04

OverFlinkSQL

2022-06-06 09:27:23

FlinkSQLGroup

2022-05-22 10:02:32

CREATESQL 查詢SQL DDL

2022-06-29 09:01:38

FlinkSQL時間屬性

2022-05-18 09:02:28

Flink SQLSQL字符串

2022-05-15 09:57:59

Flink SQL時間語義

2021-11-25 07:01:57

SQL應用場景

2022-05-27 09:02:58

SQLHive語義

2021-12-09 06:59:24

FlinkSQL 開發

2022-06-18 09:26:00

Flink SQLJoin 操作

2022-05-12 09:02:47

Flink SQL數據類型

2021-11-28 11:36:08

SQL Flink Join

2022-08-10 10:05:29

FlinkSQL

2021-11-27 09:03:26

flink join數倉

2021-09-12 07:01:07

Flink SQL ETL datastream

2021-12-17 07:54:16

Flink SQLTable DataStream

2021-12-06 07:15:47

開發Flink SQL

2022-05-09 09:03:04

SQL數據流數據

2021-11-24 08:17:21

Flink SQLCumulate WiSQL
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 久久天堂网 | 在线免费国产 | 精品欧美一区二区三区精品久久 | 美女一区二区在线观看 | 国产区精品视频 | h视频在线免费观看 | 国产精品免费av | 999久久久 | 国产高清在线观看 | 久久99蜜桃综合影院免费观看 | 国产乱码久久久久久一区二区 | 国产精品欧美一区喷水 | 日韩av.com | 蜜桃特黄a∨片免费观看 | 欧美一区免费 | 91视频正在播放 | 午夜影院| 成人精品一区 | 久久久久黄| 国产香蕉视频在线播放 | 日韩免费视频一区二区 | 欧美三级电影在线播放 | 成人在线视频免费观看 | 天堂在线91 | 久热免费在线 | 国产成人精品一区二三区在线观看 | 成人在线免费观看av | 一区二区三区国产 | 欧美久久久久 | 中日字幕大片在线播放 | 国产免费看| 日韩av一区二区在线观看 | 国产一级大片 | 成人不卡视频 | 国产精品视频在线播放 | 国产91综合一区在线观看 | 天堂av中文在线 | 成人激情视频在线播放 | 久久综合一区二区三区 | 2022国产精品 | 亚洲九色 |