小米面試:Paimon Join 用法有哪些?大規模數據場景下如何優化 Join 性能?
Apache Paimon 是一個開源的流式數據湖框架,它支持多種 Join 操作,使用戶能夠在不同場景下高效地關聯數據。本文將詳細介紹 Paimon 中的各類 Join 語法,包括 Lookup Join、Batch Join 以及其他特殊類型的 Join,并提供詳細的案例說明。
一、Lookup Join
Lookup Join 是 Paimon 中最重要的 Join 類型之一,主要用于流式查詢中將流數據與維度表數據進行關聯。它要求一個表具有處理時間屬性,另一個表作為查找源。
1. 基本概念
Lookup Join 的核心思想是:當流數據到達時,系統會查找維度表中的匹配記錄,然后將兩者關聯起來。這種 Join 方式特別適合于數據流需要與相對靜態的維度數據進行豐富的場景。
2. 語法結構
Paimon 中 Lookup Join 的基本語法如下:
SELECT [列名列表]
FROM 流表 AS 別名1
JOIN 維度表 FOR SYSTEM_TIME AS OF 別名1.處理時間列 AS 別名2
ON 別名1.關聯列 = 別名2.關聯列
其中:
- FOR SYSTEM_TIME AS OF:是 Lookup Join 的關鍵語法
- 處理時間列:必須是流表中的 PROCTIME() 類型列
- 維度表通常是 Paimon 表,具有主鍵
3. 基本案例
以下是一個基本的 Lookup Join 案例:
-- 創建維度表
CREATE TABLE customers (
id INT PRIMARY KEY NOT ENFORCED,
name STRING,
country STRING,
zip STRING
);
-- 插入數據到維度表
INSERT INTO customers VALUES (1, 'Alice', 'USA', '10001'), (2, 'Bob', 'UK', '20002');
-- 創建流表
CREATE TABLE orders (
order_id INT,
total INT,
customer_id INT,
proc_time AS PROCTIME()
);
-- Lookup Join 查詢
SELECT o.order_id, o.total, c.name, c.country
FROM orders AS o
JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
在這個案例中,每當一條新的訂單記錄到達時,系統會查找對應的客戶信息并關聯起來。
4. 緩存模式
Paimon 支持多種緩存策略來優化 Lookup Join 的性能:
(1) AUTO 模式
AUTO 模式是默認的緩存策略,系統會自動選擇最高效的緩存方式。
CREATE TABLE dim (
id INT PRIMARY KEY NOT ENFORCED,
value STRING
) WITH (
'continuous.discovery-interval' = '1 s',
'lookup.cache' = 'auto'
);
(2) FULL 模式
FULL 模式會緩存整個維度表,適用于小到中等大小的維度表。
CREATE TABLE dim (
id INT PRIMARY KEY NOT ENFORCED,
value STRING
) WITH (
'continuous.discovery-interval' = '1 s',
'lookup.cache' = 'full'
);
5. 高級特性
(1) 動態分區查找
對于分區表,Paimon 支持動態分區選擇,可以顯著提高性能:
-- 使用最新分區進行查找
SELECT * FROM orders AS o
LEFT JOIN customers /*+ OPTIONS('lookup.dynamic-partition'='max_pt()') */
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
分區表達式函數包括:
- max_pt() - 最新分區
- max_pt(n) - 第 n 個最新分區
- 自定義 SQL 表達式
(2) 異步查找
Paimon 支持異步查找,可以提高 Lookup Join 的性能:
SELECT o.order_id, o.total, c.country, c.zip
FROM orders AS o
JOIN customers /*+ OPTIONS('lookup.async'='true', 'lookup.async-thread-number'='16') */
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
(3) 重試機制
對于可能因數據不就緒而導致的 Join 失敗,Paimon 提供了重試機制:
-- 同步重試
SELECT /*+ LOOKUP('table'='c', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='1s', 'max-attempts'='600') */
o.order_id, o.total, c.country, c.zip
FROM orders AS o
JOIN customers
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
-- 異步重試
SELECT /*+ LOOKUP('table'='c', 'retry-predicate'='lookup_miss', 'output-mode'='allow_unordered', 'retry-strategy'='fixed_delay', 'fixed-delay'='1s', 'max-attempts'='600') */
o.order_id, o.total, c.country, c.zip
FROM orders AS o
JOIN customers /*+ OPTIONS('lookup.async'='true', 'lookup.async-thread-number'='16') */
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
(4) 刷新黑名單時段
可以定義不應刷新查找緩存的時間段,這對于防止在高峰負載期間或維護窗口期間刷新緩存很有用:
CREATE TABLE dim (
id INT PRIMARY KEY NOT ENFORCED,
value STRING
) WITH (
'lookup.refresh-time-periods-blacklist' = '2023-10-31 12:00->2023-10-31 16:00'
);
格式為 start-time->end-time,時間戳采用 yyyy-MM-dd HH:mm 格式。多個時段可以用逗號分隔。
6. 查詢服務優化
Paimon 提供了查詢服務功能,可以顯著提高 Lookup Join 的性能:
-- 啟動查詢服務
CALL sys.query_service('database_name.table_name', parallelism);
-- 使用查詢服務的 Lookup Join
SELECT o.order_id, o.total, c.country, c.zip
FROM orders AS o
JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
7. 大規模 Lookup Join 優化
對于數據量較大的維度表,Paimon 提供了 shuffle lookup 優化:
-- 啟用 shuffle lookup 優化
SELECT /*+ LOOKUP('table'='c', 'shuffle'='true') */
o.order_id, o.total, c.country, c.zip
FROM orders AS o
JOIN customers
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
8. 復雜條件 Lookup Join
Paimon 支持在 Lookup Join 中使用復雜的連接條件:
-- 帶過濾條件的 Lookup Join
SELECT T.i, D.j, D.k1
FROM T
LEFT JOIN DIM for system_time as of T.proctime AS D
ON T.i = D.i AND D.k1 > 111;
二、Batch Join
除了 Lookup Join,Paimon 還支持批處理場景下的各種 Join 操作。
1. 基本 Join 類型
(1) INNER JOIN
內連接返回兩個表中滿足連接條件的記錄:
SELECT a.id, a.name, b.value
FROM table_a a
INNER JOIN table_b b
ON a.id = b.id;
(2) LEFT JOIN
左連接返回左表中的所有記錄,以及右表中滿足連接條件的記錄:
SELECT a.id, a.name, b.value
FROM table_a a
LEFT JOIN table_b b
ON a.id = b.id;
(3) RIGHT JOIN
右連接返回右表中的所有記錄,以及左表中滿足連接條件的記錄:
SELECT a.id, a.name, b.value
FROM table_a a
RIGHT JOIN table_b b
ON a.id = b.id;
(4) FULL JOIN
全連接返回兩個表中的所有記錄,無論它們是否滿足連接條件:
SELECT a.id, a.name, b.value
FROM table_a a
FULL JOIN table_b b
ON a.id = b.id;
2. 動態分區剪枝
Paimon 支持動態分區剪枝,可以顯著提高分區表的 Join 性能:
-- 創建維度表
CREATE TABLE dim (x INT PRIMARY KEY NOT ENFORCED, y STRING, z INT);
INSERT INTO dim VALUES (1, 'a', 1), (2, 'b', 1), (3, 'c', 2);
-- 創建分區事實表
CREATE TABLE fact (a INT, b BIGINT, c STRING, p INT, PRIMARY KEY (a, p) NOT ENFORCED) PARTITIONED BY (p);
INSERT INTO fact PARTITION (p = 1) VALUES (10, 100, 'aaa'), (11, 101, 'bbb'), (12, 102, 'ccc');
INSERT INTO fact PARTITION (p = 2) VALUES (20, 200, 'aaa'), (21, 201, 'bbb'), (22, 202, 'ccc');
INSERT INTO fact PARTITION (p = 3) VALUES (30, 300, 'aaa'), (31, 301, 'bbb'), (32, 302, 'ccc');
-- 使用動態分區剪枝的 Join
SELECT a, b, c, p, x, y FROM fact INNER JOIN dim ON x = p and z = 1 ORDER BY a;
3. Bucket Join
對于使用相同 bucket 策略的表,Paimon 支持高效的 Bucket Join:
-- 創建具有相同 bucket 配置的表
CREATE TABLE t1 (id INT, c STRING) using paimon TBLPROPERTIES ('primary-key' = 'id', 'bucket'='10');
INSERT INTO t1 VALUES (1, 'x1'), (2, 'x3'), (3, 'x3'), (4, 'x4'), (5, 'x5');
CREATE TABLE t2 (id INT, c STRING) using paimon TBLPROPERTIES ('primary-key' = 'id', 'bucket'='10');
INSERT INTO t2 VALUES (1, 'x1'), (2, 'x3'), (3, 'x3'), (4, 'x4'), (5, 'x5');
-- Bucket Join
SELECT * FROM t1 JOIN t2 on t1.id = t2.id order by t1.id;
4. 多表 Join
Paimon 支持多表 Join,可以同時關聯多個表:
-- 多表 Join
SELECT a.id, a.name, b.value, c.description
FROM table_a a
JOIN table_b b ON a.id = b.id
JOIN table_c c ON a.id = c.id;
5. 自連接
Paimon 支持自連接,即表與自身進行 Join:
-- 自連接
SELECT a.id, a.name, b.name as manager_name
FROM employees a
JOIN employees b ON a.manager_id = b.id;
三、特殊 Join 類型
1. 大規模數據的 Bucket Shuffle Join
對于數據量特別大的維度表,傳統的 Lookup Join 可能會導致每個 Flink 子任務都需要存儲整個維度表的副本,這會造成內存壓力。Paimon 提供了 Bucket Shuffle 優化策略,特別適用于固定桶的 Paimon 表。
-- 啟用 bucket shuffle 優化的 Lookup Join
SELECT /*+ LOOKUP('table'='c', 'shuffle'='true') */
o.order_id, o.total, c.country, c.zip
FROM orders AS o
JOIN customers
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
這種優化將相同桶的數據發送到指定的子任務,使每個 Flink 子任務只需存儲部分數據,而不是整個表的數據。這種方式適用于 Flink 2.0+ 版本和使用固定桶策略的 Paimon 表。
2. 動態分區 Lookup Join
在傳統數據倉庫中,每個分區通常維護最新的完整數據。對于這種場景,Paimon 專門開發了 max_pt() 功能,允許 Lookup Join 只關聯最新分區的數據。
首先,創建一個分區表:
CREATE TABLE customers (
id INT,
name STRING,
country STRING,
zip STRING,
dt STRING,
PRIMARY KEY (id, dt) NOT ENFORCED
) PARTITIONED BY (dt);
然后,使用動態分區 Lookup Join:
SELECT o.order_id, o.total, c.country, c.zip
FROM orders AS o
JOIN customers /*+ OPTIONS('scan.partitions'='max_pt()', 'lookup.dynamic-partition.refresh-interval'='1 h') */
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
Lookup 節點會自動刷新最新分區并查詢最新分區的數據。scan.partitions 選項也可以指定固定分區,格式為 key1=value1,key2=value2,多個分區用分號 (;) 分隔。
3. Merge Into 操作
Paimon 支持 Merge Into 操作,這是一種特殊的 Join 類型,用于根據源表數據更新目標表。它使用"upsert"語義,意味著如果行存在則更新,否則插入。
MERGE INTO target_table [AS target_alias]
USING source_table [AS source_alias]
ON merge-condition
WHEN MATCHED [AND matched-condition]
THEN UPDATE SET xxx
WHEN MATCHED [AND matched-condition]
THEN DELETE
WHEN NOT MATCHED [AND not_matched_condition]
THEN INSERT VALUES (xxx)
WHEN NOT MATCHED BY SOURCE [AND not-matched-by-source-condition]
THEN UPDATE SET xxx
WHEN NOT MATCHED BY SOURCE [AND not-matched-by-source-condition]
THEN DELETE
例如,查找源表中提到的所有訂單,如果價格高于 100 則標記為重要,如果價格低于 10 則刪除:
MERGE INTO T
USING S
ON T.id = S.order_id
WHEN MATCHED AND T.price > 100
THEN UPDATE SET mark = 'important'
WHEN MATCHED AND T.price < 10
THEN DELETE
Merge Into 操作在 Spark 中也有支持:
// 只更新匹配的記錄
MERGE INTO target
USING source
ON target.a = source.a
WHEN MATCHED THEN
UPDATE SET a = source.a, b = source.b, c = source.c
四、Paimon 中 Join 的優化技術
1. 查詢服務優化
Paimon 提供了查詢服務功能,可以顯著提高 Lookup Join 的性能。通過運行一個 Flink 流式作業來啟動表的查詢服務,當查詢服務存在時,Flink Lookup Join 會優先從中獲取數據。
-- 使用 Flink SQL 啟動查詢服務
CALL sys.query_service('database_name.table_name', parallelism);
也可以使用 Flink Action 啟動查詢服務。
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-{{< version >}}.jar \
query_service \
--warehouse <warehouse-path> \
--database <database-name> \
--table <table-name> \
[--parallelism <parallelism>] \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]