Flink SQL 知其所以然:基礎 DML SQL 執行語義!
1.DML:With 子句?
- 應用場景(支持 Batch\Streaming):With 語句和離線 Hive SQL With 語句一樣的,xdm,語法糖 +1,使用它可以讓你的代碼邏輯更加清晰。
- 直接上案例:
-- 語法糖+1
WITH orders_with_total AS (
SELECT
order_id
, price + tax AS total
FROM Orders
)
SELECT
order_id
, SUM(total)
FROM orders_with_total
GROUP BY
order_id;
2.DML:SELECT & WHERE 子句?
INSERT INTO target_table
SELECT * FROM Orders
INSERT INTO target_table
SELECT order_id, price + tax FROM Orders
INSERT INTO target_table
-- 自定義 Source 的數據
SELECT order_id, price FROM (VALUES (1, 2.0), (2, 3.1)) AS t (order_id, price)
INSERT INTO target_table
SELECT price + tax FROM Orders WHERE id = 10
-- 使用 UDF 做字段標準化處理
INSERT INTO target_table
SELECT PRETTY_PRINT(order_id) FROM Orders
-- 過濾條件
Where id > 3
- SQL 語義:
其實理解一個 SQL 最后生成的任務是怎樣執行的,最好的方式就是理解其語義。
以下面的 SQL 為例,我們來介紹下其在離線中和在實時中執行的區別,對比學習一下,大家就比較清楚了。
INSERT INTO target_table
SELECT PRETTY_PRINT(order_id) FROM Orders
Where id > 3
這個 SQL 對應的實時任務,假設 Orders 為 kafka,target_table 也為 Kafka,在執行時,會生成三個算子:
- 數據源算子(From Order):連接到 Kafka topic,數據源算子一直運行,實時的從 Order Kafka 中一條一條的讀取數據,然后一條一條發送給下游的 過濾和字段標準化算子。
- 過濾和字段標準化算子(Where id > 3 和 PRETTY_PRINT(order_id)):接收到上游算子發的一條一條的數據,然后判斷 id > 3?將判斷結果為 true 的數據執行 PRETTY_PRINT UDF 后,一條一條將計算結果數據發給下游 數據匯算子。
- 數據匯算子(INSERT INTO target_table):接收到上游發的一條一條的數據,寫入到 target_table Kafka 中。
可以看到這個實時任務的所有算子是以一種 pipeline 模式運行的,所有的算子在同一時刻都是處于 running 狀態的,24 小時一直在運行,實時任務中也沒有離線中常見的分區概念。
select & where
關于看如何看一段 Flink SQL 最終的執行計劃:
最好的方法就如上圖,看 Flink web ui 的算子圖,算子圖上詳細的標記清楚了每一個算子做的事情。以上圖來說,我們可以看到主要有三個算子:
- Source 算子:Source: TableSourceScan(table=[[default_catalog, default_database, Orders]], fields=[order_id, name]) -> Calc(select=[order_id, name, CAST(CURRENT_TIMESTAMP()) AS row_time]) -> WatermarkAssigner(rowtime=[row_time], watermark=[(row_time - 5000:INTERVAL SECOND)]) ,其中 Source 表名稱為 table=[[default_catalog, default_database, Orders],字段為 select=[order_id, name, CAST(CURRENT_TIMESTAMP()) AS row_time],Watermark 策略為 rowtime=[row_time], watermark=[(row_time - 5000:INTERVAL SECOND)]。
- 過濾算子:Calc(select=[order_id, name, row_time], where=[(order_id > 3)]) -> NotNullEnforcer(fields=[order_id]),其中過濾條件為 where=[(order_id > 3)],結果字段為 select=[order_id, name, row_time]
- Sink 算子:Sink: Sink(table=[default_catalog.default_database.target_table], fields=[order_id, name, row_time]),其中最終產出的表名稱為 table=[default_catalog.default_database.target_table],表字段為 fields=[order_id, name, row_time]。
可以看到 Flink SQL 具體執行了哪些操作是非常詳細的標記在算子圖上。所以小伙伴萌一定要學會看算子圖,這是掌握 debug、調優前最基礎的一個技巧。
那么如果這個 SQL 放在 Hive 中執行時,假設其中 Orders 為 Hive 表,target_table 也為 Hive 表,其也會生成三個類似的算子(雖然實際可能會被優化為一個算子,這里為了方便對比,劃分為三個進行介紹),離線和實時任務的執行方式完全不同:
- 數據源算子(From Order):數據源從 Order Hive 表(通常都是讀一天、一小時的分區數據)中一次性讀取所有的數據,然后將讀到的數據全部發給下游 過濾字段標準化算子,然后 數據源算子就運行結束了,釋放資源了。
- 過濾和字段標準化算子(Where id > 3 和 PRETTY_PRINT(order_id)):接收到上游算子的所有數據,然后遍歷所有數據判斷 id > 3?將判斷結果為 true 的數據執行 PRETTY_PRINT UDF 后,將所有數據發給下游 數據匯算子,然后 過濾和字段標準化算子 就運行結束了,釋放資源了。
- 數據匯算子(INSERT INTO target_table):接收到上游的所有數據,將所有數據都寫到 target_table Hive 表中,然后整個任務就運行結束了,整個任務的資源也就都釋放了。
可以看到離線任務的算子是分階段(stage)進行運行的,每一個 stage 運行結束之后,然后下一個 stage 開始運行,全部的 stage 運行完成之后,這個離線任務就跑結束了。
注意:
很多小伙伴都是之前做過離線數倉的,熟悉了離線的分區、計算任務定時調度運行這兩個概念,所以在最初接觸 Flink SQL 時,會以為 Flink SQL 實時任務也會存在這兩個概念,這里博主做一下解釋。
- 分區概念:離線由于能力限制問題,通常都是進行一批一批的數據計算,每一批數據的數據量都是有限的集合,這一批一批的數據自然的劃分方式就是時間,比如按小時、天進行劃分分區。但是 在實時任務中,是沒有分區的概念的,實時任務的上游、下游都是無限的數據流。
- 計算任務定時調度概念:同上,離線就是由于計算能力限制,數據要一批一批算,一批一批輸入、產出,所以要按照小時、天定時的調度和計算。但是在實時任務中,是沒有定時調度的概念的,實時任務一旦運行起來就是 24 小時不間斷,不間斷的處理上游無限的數據,不簡單的產出數據給到下游。
3.DML:SELECT DISTINCT 子句
- 應用場景(支持 Batch\Streaming):語句和離線 Hive SQL SELECT DISTINCT 語句一樣的,xdm,用作根據 key 進行數據去重。
- 直接上案例:
INSERT into target_table
SELECT
DISTINCT id
FROM Orders
- SQL 語義:
也是拿離線和實時做對比。
這個 SQL 對應的實時任務,假設 Orders 為 kafka,target_table 也為 Kafka,在執行時,會生成三個算子:
- 數據源算子(From Order):連接到 Kafka topic,數據源算子一直運行,實時的從 Order Kafka 中一條一條的讀取數據,然后一條一條發送給下游的 去重算子。
- 去重算子(DISTINCT id):接收到上游算子發的一條一條的數據,然后判斷這個 id 之前是否已經來過了,判斷方式就是使用 Flink 中的 state 狀態,如果狀態中已經有這個 id 了,則說明已經來過了,不往下游算子發,如果狀態中沒有這個 id,則說明沒來過,則往下游算子發,也是一條一條發給下游 。 數據匯算子數據匯算子(INSERT INTO target_table):接收到上游發的一條一條的數據,寫入到target_table Kafka 中。
select distinct
注意:
對于實時任務,計算時的狀態可能會無限增長。
狀態大小取決于不同 key(上述案例為 id 字段)的數量。為了防止狀態無限變大,我們可以設置狀態的 TTL。但是這可能會影響查詢結果的正確性,比如某個 key 的數據過期從狀態中刪除了,那么下次再來這么一個 key,由于在狀態中找不到,就又會輸出一遍。
那么如果這個 SQL 放在 Hive 中執行時,假設其中 Orders 為 Hive 表,target_table 也為 Hive 表,其也會生成三個相同的算子(雖然可能會被優化為一個算子,這里為了方便對比,劃分為三個進行介紹),但是其和實時任務的執行方式完全不同:
- 數據源算子(From Order):數據源從 Order Hive 表(通常都有天、小時分區限制)中一次性讀取所有的數據,然后將讀到的數據全部發給下游去重算子,然后 數據源算子 就運行結束了,釋放資源了。
- 去重算子(DISTINCT id):接收到上游算子的所有數據,然后遍歷所有數據進行去重,將去重完的所有結果數據發給下游 數據匯算子,然后 去重算子就運行結束了,釋放資源了。
- 數據匯算子(INSERT INTO target_table):接收到上游的所有數據,將所有數據都寫到 target_table Hive 中,然后整個任務就運行結束了,整個任務的資源也就都釋放了。