Flink SQL 知其所以然:Group 聚合操作
Group 聚合
- Group 聚合定義(支持 Batch\Streaming 任務):Flink 也支持 Group 聚合。Group 聚合和上面介紹到的窗口聚合的不同之處,就在于 Group 聚合是按照數據的類別進行分組,比如年齡、性別,是橫向的;而窗口聚合是在時間粒度上對數據進行分組,是縱向的。如下圖所示,就展示出了其區別。其中按顏色分 key(橫向)? 就是 Group 聚合,按窗口劃分(縱向) 就是窗口聚合。
tumble window + key
- 應用場景:一般用于對數據進行分組,然后后續使用聚合函數進行 count、sum 等聚合操作。
那么這時候,小伙伴萌就會問到,我其實可以把窗口聚合的寫法也轉換為 Group 聚合,只需要把 Group 聚合的 Group By key 換成時間就行,那這兩個聚合的區別到底在哪?
首先來舉一個例子看看怎么將窗口聚合轉換為 Group 聚合。假如一個窗口聚合是按照 1 分鐘的粒度進行聚合,如下 SQL:
-- 數據源表
CREATE TABLE source_table (
-- 維度數據
dim STRING,
-- 用戶 id
user_id BIGINT,
-- 用戶
price BIGINT,
-- 事件時間戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 設置
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
)
-- 數據匯表
CREATE TABLE sink_table (
dim STRING,
pv BIGINT,
sum_price BIGINT,
max_price BIGINT,
min_price BIGINT,
uv BIGINT,
window_start bigint
) WITH (
'connector' = 'print'
)
-- 數據處理邏輯
insert into sink_table
select dim,
count(*) as pv,
sum(price) as sum_price,
max(price) as max_price,
min(price) as min_price,
-- 計算 uv 數
count(distinct user_id) as uv,
UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS STRING)) * 1000 as window_start
from source_table
group by
dim,
-- 按照 Flink SQL tumble 窗口寫法劃分窗口
tumble(row_time, interval '1' minute)
轉換為 Group 聚合的寫法如下:
Group 聚合
-- 數據源表
CREATE TABLE source_table (
-- 維度數據
dim STRING,
-- 用戶 id
user_id BIGINT,
-- 用戶
price BIGINT,
-- 事件時間戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 設置
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
);
-- 數據匯表
CREATE TABLE sink_table (
dim STRING,
pv BIGINT,
sum_price BIGINT,
max_price BIGINT,
min_price BIGINT,
uv BIGINT,
window_start bigint
) WITH (
'connector' = 'print'
);
-- 數據處理邏輯
insert into sink_table
select dim,
count(*) as pv,
sum(price) as sum_price,
max(price) as max_price,
min(price) as min_price,
-- 計算 uv 數
count(distinct user_id) as uv,
cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint) as window_start
from source_table
group by
dim,
-- 將秒級別時間戳 / 60 轉化為 1min
cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint)
確實沒錯,上面這個轉換是一點問題都沒有的。
但是窗口聚合和 Group by 聚合的差異在于:
本質區別:窗口聚合是具有時間語義的,其本質是想實現窗口結束輸出結果之后,后續有遲到的數據也不會對原有的結果發生更改了,即輸出結果值是定值(不考慮 allowLateness)。而 Group by 聚合是沒有時間語義的,不管數據遲到多長時間,只要數據來了,就把上一次的輸出的結果數據撤回,然后把計算好的新的結果數據發出。
運行層面:窗口聚合是和 時間 綁定的,窗口聚合其中窗口的計算結果觸發都是由時間(Watermark)推動的。Group by 聚合完全由數據推動觸發計算,新來一條數據去根據這條數據進行計算出結果發出;由此可見兩者的實現方式也大為不同。
- SQL 語義
也是拿離線和實時做對比,Orders 為 kafka,target_table 為 Kafka,這個 SQL 生成的實時任務,在執行時,會生成三個算子:
數據源算子?(From Order):數據源算子一直運行,實時的從 Order Kafka 中一條一條的讀取數據,然后一條一條發送給下游的Group 聚合算子,向下游發送數據的 shuffle 策略是根據 group by 中的 key 進行發送,相同的 key 發到同一個 SubTask(并發) 中。
Group 聚合算子?(group by key + sum\count\max\min):接收到上游算子發的一條一條的數據,去狀態 state 中找這個 key 之前的 sum\count\max\min 結果。如果有結果oldResult?,拿出來和當前的數據進行sum\count\max\min? 計算出這個 key 的新結果newResult?,并將新結果[key, newResult]? 更新到 state 中,在向下游發送新計算的結果之前,先發一條撤回上次結果的消息-[key, oldResult]?,然后再將新結果發往下游+[key, newResult]?;如果 state 中沒有當前 key 的結果,則直接使用當前這條數據計算 sum\max\min 結果newResult?,并將新結果[key, newResult]? 更新到 state 中,當前是第一次往下游發,則不需要先發回撤消息,直接發送+[key, newResult]。
數據匯算子(INSERT INTO target_table):接收到上游發的一條一條的數據,寫入到 target_table Kafka 中。
這個實時任務也是 24 小時一直在運行的,所有的算子在同一時刻都是處于 running 狀態的。
特別注意:
- Group by 聚合涉及到了回撤流(也叫 retract 流),會產生回撤流是因為從整個 SQL 的語義來看,上游的 Kafk數據是源源不斷的,無窮無盡的,那么每次這個 SQL 任務產出的結果都是一個中間結果,所以每次結果發生更新時,都需要將上一次發出的中間結果給撤回,然后將最新的結果發下去。
- Group by 聚合涉及到了狀態:狀態大小也取決于不同 key 的數量。為了防止狀態無限變大,我們可以設置狀態的 TTL。以上面的 SQL 為例,上面 SQL 是按照分鐘進行聚合的,理論上到了今天,通常我們就可以不用關心昨天的數據了,那么我們可以設置狀態過期時間為一天。關于狀態過期時間的設置參數可以參考下文運行時參數 小節。
如果這個 SQL 放在 Hive 中執行時,其中 Orders 為 Hive,target_table 也為 Hive,其也會生成三個相同的算子,但是其和實時任務的執行方式完全不同:
- 數據源算子?(From Order):數據源算子從 Order Hive 中讀取到所有的數據,然后所有數據發送給下游的Group 聚合算子,向下游發送數據的 shuffle 策略是根據 group by 中的 key 進行發送,相同的 key 發到同一個算子中,然后這個算子就運行結束了,釋放資源了。
- Group 聚合算子?(group by + sum\count\max\min):接收到上游算子發的所有數據,然后遍歷計算 sum\count\max\min 結果,批量發給下游數據匯算子,這個算子也就運行結束了,釋放資源了。
- 數據匯算子(INSERT INTO target_table):接收到上游發的一條一條的數據,寫入到 target_table Hive 中,整個任務也就運行結束了,整個任務的資源也就都釋放了。
Group 聚合支持 Grouping sets、Rollup、Cube
Group 聚合也支持 Grouping sets、Rollup、Cube。
舉一個 Grouping sets 的案例:
SELECT
supplier_id
, rating
, product_id
, COUNT(*)
FROM (VALUES
('supplier1', 'product1', 4),
('supplier1', 'product2', 3),
('supplier2', 'product3', 3),
('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SET (
( supplier_id, product_id, rating ),
( supplier_id, product_id ),
( supplier_id, rating ),
( supplier_id ),
( product_id, rating ),
( product_id ),
( rating ),
( )
)?