成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Flink SQL 知其所以然:Group 聚合操作

數據庫 其他數據庫
架構

Group 聚合

  • Group 聚合定義(支持 Batch\Streaming 任務):Flink 也支持 Group 聚合。Group 聚合和上面介紹到的窗口聚合的不同之處,就在于 Group 聚合是按照數據的類別進行分組,比如年齡、性別,是橫向的;而窗口聚合是在時間粒度上對數據進行分組,是縱向的。如下圖所示,就展示出了其區別。其中按顏色分 key(橫向)? 就是 Group 聚合,按窗口劃分(縱向) 就是窗口聚合。

圖片

tumble window + key

  • 應用場景:一般用于對數據進行分組,然后后續使用聚合函數進行 count、sum 等聚合操作。

那么這時候,小伙伴萌就會問到,我其實可以把窗口聚合的寫法也轉換為 Group 聚合,只需要把 Group 聚合的 Group By key 換成時間就行,那這兩個聚合的區別到底在哪?

首先來舉一個例子看看怎么將窗口聚合轉換為 Group 聚合。假如一個窗口聚合是按照 1 分鐘的粒度進行聚合,如下 SQL:

-- 數據源表
CREATE TABLE source_table (
-- 維度數據
dim STRING,
-- 用戶 id
user_id BIGINT,
-- 用戶
price BIGINT,
-- 事件時間戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 設置
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
)

-- 數據匯表
CREATE TABLE sink_table (
dim STRING,
pv BIGINT,
sum_price BIGINT,
max_price BIGINT,
min_price BIGINT,
uv BIGINT,
window_start bigint
) WITH (
'connector' = 'print'
)

-- 數據處理邏輯
insert into sink_table
select dim,
count(*) as pv,
sum(price) as sum_price,
max(price) as max_price,
min(price) as min_price,
-- 計算 uv 數
count(distinct user_id) as uv,
UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS STRING)) * 1000 as window_start
from source_table
group by
dim,
-- 按照 Flink SQL tumble 窗口寫法劃分窗口
tumble(row_time, interval '1' minute)

轉換為 Group 聚合的寫法如下:

Group 聚合

-- 數據源表
CREATE TABLE source_table (
-- 維度數據
dim STRING,
-- 用戶 id
user_id BIGINT,
-- 用戶
price BIGINT,
-- 事件時間戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 設置
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
);

-- 數據匯表
CREATE TABLE sink_table (
dim STRING,
pv BIGINT,
sum_price BIGINT,
max_price BIGINT,
min_price BIGINT,
uv BIGINT,
window_start bigint
) WITH (
'connector' = 'print'
);

-- 數據處理邏輯
insert into sink_table
select dim,
count(*) as pv,
sum(price) as sum_price,
max(price) as max_price,
min(price) as min_price,
-- 計算 uv 數
count(distinct user_id) as uv,
cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint) as window_start
from source_table
group by
dim,
-- 將秒級別時間戳 / 60 轉化為 1min
cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint)

確實沒錯,上面這個轉換是一點問題都沒有的。

但是窗口聚合和 Group by 聚合的差異在于:

本質區別:窗口聚合是具有時間語義的,其本質是想實現窗口結束輸出結果之后,后續有遲到的數據也不會對原有的結果發生更改了,即輸出結果值是定值(不考慮 allowLateness)。而 Group by 聚合是沒有時間語義的,不管數據遲到多長時間,只要數據來了,就把上一次的輸出的結果數據撤回,然后把計算好的新的結果數據發出。

運行層面:窗口聚合是和 時間 綁定的,窗口聚合其中窗口的計算結果觸發都是由時間(Watermark)推動的。Group by 聚合完全由數據推動觸發計算,新來一條數據去根據這條數據進行計算出結果發出;由此可見兩者的實現方式也大為不同。

  • SQL 語義

也是拿離線和實時做對比,Orders 為 kafka,target_table 為 Kafka,這個 SQL 生成的實時任務,在執行時,會生成三個算子:

數據源算子?(From Order):數據源算子一直運行,實時的從 Order Kafka 中一條一條的讀取數據,然后一條一條發送給下游的Group 聚合算子,向下游發送數據的 shuffle 策略是根據 group by 中的 key 進行發送,相同的 key 發到同一個 SubTask(并發) 中。

Group 聚合算子?(group by key + sum\count\max\min):接收到上游算子發的一條一條的數據,去狀態 state 中找這個 key 之前的 sum\count\max\min 結果。如果有結果oldResult?,拿出來和當前的數據進行sum\count\max\min? 計算出這個 key 的新結果newResult?,并將新結果[key, newResult]? 更新到 state 中,在向下游發送新計算的結果之前,先發一條撤回上次結果的消息-[key, oldResult]?,然后再將新結果發往下游+[key, newResult]?;如果 state 中沒有當前 key 的結果,則直接使用當前這條數據計算 sum\max\min 結果newResult?,并將新結果[key, newResult]? 更新到 state 中,當前是第一次往下游發,則不需要先發回撤消息,直接發送+[key, newResult]。

數據匯算子(INSERT INTO target_table):接收到上游發的一條一條的數據,寫入到 target_table Kafka 中。

這個實時任務也是 24 小時一直在運行的,所有的算子在同一時刻都是處于 running 狀態的。

特別注意:

  • Group by 聚合涉及到了回撤流(也叫 retract 流),會產生回撤流是因為從整個 SQL 的語義來看,上游的 Kafk數據是源源不斷的,無窮無盡的,那么每次這個 SQL 任務產出的結果都是一個中間結果,所以每次結果發生更新時,都需要將上一次發出的中間結果給撤回,然后將最新的結果發下去。
  • Group by 聚合涉及到了狀態:狀態大小也取決于不同 key 的數量。為了防止狀態無限變大,我們可以設置狀態的 TTL。以上面的 SQL 為例,上面 SQL 是按照分鐘進行聚合的,理論上到了今天,通常我們就可以不用關心昨天的數據了,那么我們可以設置狀態過期時間為一天。關于狀態過期時間的設置參數可以參考下文運行時參數 小節。

如果這個 SQL 放在 Hive 中執行時,其中 Orders 為 Hive,target_table 也為 Hive,其也會生成三個相同的算子,但是其和實時任務的執行方式完全不同:

  • 數據源算子?(From Order):數據源算子從 Order Hive 中讀取到所有的數據,然后所有數據發送給下游的Group 聚合算子,向下游發送數據的 shuffle 策略是根據 group by 中的 key 進行發送,相同的 key 發到同一個算子中,然后這個算子就運行結束了,釋放資源了。
  • Group 聚合算子?(group by + sum\count\max\min):接收到上游算子發的所有數據,然后遍歷計算 sum\count\max\min 結果,批量發給下游數據匯算子,這個算子也就運行結束了,釋放資源了。
  • 數據匯算子(INSERT INTO target_table):接收到上游發的一條一條的數據,寫入到 target_table Hive 中,整個任務也就運行結束了,整個任務的資源也就都釋放了。

Group 聚合支持 Grouping sets、Rollup、Cube

Group 聚合也支持 Grouping sets、Rollup、Cube。

舉一個 Grouping sets 的案例:

SELECT 
supplier_id
, rating
, product_id
, COUNT(*)
FROM (VALUES
('supplier1', 'product1', 4),
('supplier1', 'product2', 3),
('supplier2', 'product3', 3),
('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SET (
( supplier_id, product_id, rating ),
( supplier_id, product_id ),
( supplier_id, rating ),
( supplier_id ),
( product_id, rating ),
( product_id ),
( rating ),
( )
)?

責任編輯:武曉燕 來源: 大數據羊說
相關推薦

2022-06-10 09:01:04

OverFlinkSQL

2022-07-05 09:03:05

Flink SQLTopN

2022-05-22 10:02:32

CREATESQL 查詢SQL DDL

2022-06-29 09:01:38

FlinkSQL時間屬性

2022-05-18 09:02:28

Flink SQLSQL字符串

2022-05-15 09:57:59

Flink SQL時間語義

2022-05-27 09:02:58

SQLHive語義

2021-12-09 06:59:24

FlinkSQL 開發

2022-06-18 09:26:00

Flink SQLJoin 操作

2022-05-12 09:02:47

Flink SQL數據類型

2021-11-28 11:36:08

SQL Flink Join

2022-08-10 10:05:29

FlinkSQL

2021-11-27 09:03:26

flink join數倉

2021-09-12 07:01:07

Flink SQL ETL datastream

2021-12-17 07:54:16

Flink SQLTable DataStream

2022-07-12 09:02:18

Flink SQL去重

2021-12-06 07:15:47

開發Flink SQL

2022-05-09 09:03:04

SQL數據流數據

2021-11-24 08:17:21

Flink SQLCumulate WiSQL

2021-12-13 07:57:47

Flink SQL Flink Hive Udf
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 中文在线一区二区 | 欧美精品福利视频 | 精品久久久久久 | 91一区二区三区 | 色综合久久久 | 日韩欧美在线观看视频 | 粉嫩粉嫩芽的虎白女18在线视频 | 国产精品视频999 | 99re6在线视频精品免费 | 亚洲精品在| 在线观看亚洲一区二区 | 国产一二三区免费视频 | 国产综合av | 成人福利视频 | 国产亚洲一区在线 | 99福利视频 | 亚洲精品视频网站在线观看 | aa级毛片毛片免费观看久 | 久久一区二区三区免费 | 欧美黄色大片在线观看 | 国产高清在线 | 亚洲国产成人精品女人久久久 | 国产成人高清在线观看 | 成人福利网站 | 在线视频一区二区三区 | 一区二区三区小视频 | 日韩伦理电影免费在线观看 | 91传媒在线观看 | 一区二区中文字幕 | 日韩一区二区三区视频在线观看 | 天堂av在线影院 | 亚洲精品免费视频 | 国产激情精品 | 精品乱码久久久久 | 蜜桃官网 | 精品国产欧美 | 国产又色又爽又黄又免费 | 欧美日韩精品一区二区三区蜜桃 | 日韩无| 亚洲视频在线免费观看 | 日韩国产一区 |