阿里面試:Fluss 是什么?如何使用 Fluss 構建流式湖倉一體?
一、湖倉概述
1. 傳統湖倉架構的挑戰
數據湖倉(Lakehouse)是一種結合了數據湖的可擴展性和成本效益與數據倉庫的可靠性和性能的新型開放架構。Apache Iceberg、Apache Paimon、Apache Hudi和Delta Lake等知名數據湖格式在湖倉架構中扮演著關鍵角色,它們在單一統一平臺內促進了數據存儲、可靠性和分析能力之間的和諧平衡。
然而,傳統湖倉架構面臨著實時性與分析能力平衡的挑戰:
(1) 實時性與分析效率的矛盾
- 如果要求低延遲,就需要頻繁寫入和提交,這會產生大量小型Parquet文件,導致讀取效率低下
- 如果要求讀取效率,就需要積累數據直到能寫入大型Parquet文件,但這會引入更高的延遲
(2) 數據新鮮度限制
即使在最佳使用條件下,這些數據湖格式通常也只能在分鐘級粒度內實現數據新鮮度。
二、流式湖倉一體化
1. 流與湖的統一
Fluss是一種支持亞秒級低延遲流式讀寫的流式存儲系統。通過Lakehouse Storage,Fluss在Lakehouse之上提供實時流數據服務,實現了數據流和數據湖倉的統一。這不僅為數據湖倉帶來了低延遲,還為數據流增加了強大的分析能力。
為了構建流式湖倉,Fluss維護了一個分層服務,該服務將實時數據從Fluss集群壓縮到存儲在Lakehouse Storage中的數據湖格式。Fluss集群中的數據(流式Arrow格式)針對低延遲讀寫進行了優化,而Lakehouse中的壓縮數據(帶壓縮的Parquet格式)針對強大的分析和長期數據存儲進行了優化。因此,Fluss集群中的數據作為實時數據層,保留了具有亞秒級新鮮度的數據;而Lakehouse中的數據作為歷史數據層,保留了具有分鐘級新鮮度的數據。
2. 共享數據與共享元數據
流式湖倉的核心理念是流和湖倉之間共享數據和共享元數據,避免數據重復和元數據不一致。它提供了以下強大功能:
- 統一元數據:Fluss為流和湖倉中的數據提供統一的表元數據。用戶只需處理一個表,但可以訪問實時流數據、歷史數據或它們的聯合。
- 聯合讀取:計算引擎對表執行查詢時將讀取實時流數據和湖倉數據的聯合。目前,只有Flink支持聯合讀取,但更多引擎正在規劃中。
- 實時湖倉:聯合讀取幫助湖倉從近實時分析發展到真正的實時分析,使企業能夠從實時數據中獲得更有價值的洞察。
- 分析流:聯合讀取幫助數據流具備強大的分析能力,這減少了開發流應用程序的復雜性,簡化了調試,并允許立即訪問實時數據洞察。
- 連接到湖倉生態系統:Fluss在將數據壓縮到Lakehouse時保持表元數據與數據湖目錄同步,這允許Spark、StarRocks、Flink、Trino等外部引擎通過連接到數據湖目錄直接讀取數據。
3. 數據湖集成
(1) Paimon集成
Apache Paimon創新地結合了湖格式和LSM結構,將高效更新引入湖架構。
要將Fluss與Paimon集成,必須啟用lakehouse存儲并將Paimon配置為lakehouse存儲。具體步驟如下:
① 配置Lakehouse存儲
首先,在server.yaml中配置lakehouse存儲:
datalake.format: paimon
# Paimon目錄配置,假設使用Filesystem目錄
datalake.paimon.metastore: filesystem
datalake.paimon.warehouse: /tmp/paimon_data_warehouse
② 啟動數據湖分層服務
然后,啟動數據湖分層服務,將Fluss的數據壓縮到lakehouse存儲:
# 切換到Fluss目錄
cd $FLUSS_HOME
# 啟動分層服務,假設rest端點是localhost:8081
./bin/lakehouse.sh -D flink.rest.address=localhost -D flink.rest.port=8081
③ 為表啟用Lakehouse存儲
要為表啟用lakehouse存儲,必須在創建表時使用選項'table.datalake.enabled' = 'true':
CREATE TABLE datalake_enriched_orders (
`order_key` BIGINT,
`cust_key` INT NOT NULL,
`total_price` DECIMAL(15, 2),
`order_date` DATE,
`order_priority` STRING,
`clerk` STRING,
`cust_name` STRING,
`cust_phone` STRING,
`cust_acctbal` DECIMAL(15, 2),
`cust_mktsegment` STRING,
`nation_name` STRING,
PRIMARY KEY (`order_key`) NOT ENFORCED
) WITH ('table.datalake.enabled' = 'true');
當在Fluss中創建或修改帶有選項'table.datalake.enabled' = 'true'的表時,Fluss將創建一個具有相同表路徑的相應Paimon表。Paimon表的模式與Fluss表的模式相同,只是在最后附加了兩個額外的列__offset和__timestamp。這兩列用于幫助Fluss客戶端以流式方式消費Paimon中的數據,例如按偏移量/時間戳查找等。
然后,數據湖分層服務會持續將數據從Fluss壓縮到Paimon。對于主鍵表,它還將生成Paimon格式的變更日志,使您能夠以Paimon方式流式消費它。
(2) Iceberg集成規劃
目前,Fluss支持Paimon作為Lakehouse存儲,更多種類的數據湖格式正在規劃中。
Fluss社區正在積極努力增強流和湖倉統一功能,重點關注以下關鍵領域:
① 擴展聯合讀取生態系統
目前,聯合讀取功能已與Apache Flink集成,實現了實時和歷史數據的無縫查詢。未來,社區計劃擴展此功能以支持其他查詢引擎,如Apache Spark和StarRocks,進一步擴大其生態系統兼容性和采用。
② 多樣化湖存儲格式
目前,Fluss支持Apache Paimon作為其主要湖存儲。為了滿足多樣化的用戶需求,社區計劃添加對更多湖格式的支持,包括Apache Iceberg和Apache Hudi,從而提供更大的靈活性和與更廣泛的湖倉生態系統的互操作性。
4. 聯合讀取(Union Read)
(1) 實時數據與歷史數據的無縫結合
對于帶有選項'table.datalake.enabled' = 'true'的表,有兩部分數據:保留在Fluss中的數據和已經在Paimon中的數據。現在,您有兩種表視圖:一種是具有分鐘級延遲的Paimon數據視圖,一種是聯合Fluss和Paimon數據的完整數據視圖,具有秒級延遲。
Flink使您能夠決定選擇哪種視圖:
- 僅Paimon意味著更好的分析性能,但數據新鮮度較差
- 結合Fluss和Paimon意味著更好的數據新鮮度,但分析性能下降
① 僅讀取Paimon中的數據
要指定讀取Paimon中的數據,必須使用$lake后綴指定表,以下SQL顯示了如何執行此操作:
-- 假設我們有一個名為`orders`的表
-- 從paimon讀取
SELECT COUNT(*) FROM orders$lake;
-- 我們還可以查詢系統表
SELECT * FROM orders$lake$snapshots;
當在查詢中使用$lake后綴指定表時,它就像一個普通的Paimon表,因此它繼承了Paimon表的所有能力。您可以享受Flink在Paimon上支持/優化的所有功能,如查詢系統表、時間旅行等。
② 聯合讀取Fluss和Paimon中的數據
要指定讀取聯合Fluss和Paimon的完整數據,只需像查詢普通表一樣查詢它,無需任何后綴或其他內容,以下SQL顯示了如何執行此操作:
-- 查詢將聯合Fluss和Paimon的數據
SELECT SUM(order_count) as total_orders FROM ads_nation_purchase_power;
查詢可能看起來比只查詢Paimon中的數據慢,但它查詢了完整數據,這意味著更好的數據新鮮度。您可以多次運行查詢,由于數據持續寫入表中,每次運行都應該得到不同的結果。
(2) 查詢引擎支持
① Flink支持
Fluss向Flink用戶公開統一的API,允許他們選擇是使用聯合讀取還是僅在Lakehouse上進行讀取,使用以下SQL:
SELECT * FROM orders
這將讀取orders表的完整數據,Flink將聯合讀取Fluss和Lakehouse中的數據。如果用戶只需要讀取數據湖上的數據,可以在要讀取的表后添加$lake后綴。SQL如下:
-- 分析查詢
SELECT COUNT(*), MAX(t), SUM(amount)
FROM orders$lake
-- 查詢系統表
SELECT * FROM orders$lake$snapshots
5. 實時湖倉與分析流
(1) 為湖倉帶來真正的實時性
傳統的數據湖倉架構通常只能提供分鐘級的數據新鮮度,這對于許多實時分析場景來說是不夠的。Fluss的流式湖倉一體化架構通過聯合讀取(Union Read)機制,為湖倉帶來了真正的實時性:
- 秒級數據新鮮度:通過將Fluss的實時流數據與Paimon的歷史數據聯合起來,查詢可以訪問最新寫入的數據,實現秒級數據新鮮度。
- 實時分析能力增強:企業可以在保持湖倉強大分析能力的同時,獲得實時數據洞察,使決策更加及時和準確。
- 無需數據復制:不需要將數據復制到單獨的實時系統中,減少了存儲成本和數據一致性問題。
(2) 為數據流增加強大的分析能力
傳統的流處理系統通常缺乏強大的分析能力,而Fluss通過與湖倉的集成,為數據流增加了這些能力:
- 復雜分析查詢支持:流數據可以與歷史數據結合,支持更復雜的分析查詢,如聚合、窗口分析等。
- 開發簡化:減少了開發流應用程序的復雜性,簡化了調試過程。
- 生態系統集成:可以利用現有的湖倉生態系統工具和查詢引擎,如Spark、StarRocks、Trino等。
6. 流式湖倉一體化實施流程
下面是實施Fluss流式湖倉一體化的詳細步驟和流程圖:
(1) 準備環境
確保已安裝Fluss和Flink,并且兩者都正常運行。Fluss需要一個運行中的Flink集群來執行數據湖分層服務。
(2) 配置Lakehouse存儲
在Fluss的server.yaml配置文件中設置Lakehouse存儲:
# 指定數據湖格式為Paimon
datalake.format: paimon
# Paimon目錄配置
datalake.paimon.metastore: filesystem
datalake.paimon.warehouse: /path/to/paimon/warehouse
(3) 啟動數據湖分層服務
數據湖分層服務是一個Flink作業,負責將數據從Fluss壓縮到Paimon:
# 切換到Fluss安裝目錄
cd $FLUSS_HOME
# 啟動分層服務,指定Flink REST端點
./bin/lakehouse.sh -D flink.rest.address=localhost -D flink.rest.port=8081
您還可以設置其他Flink配置參數,例如:
# 設置檢查點間隔為10秒
./bin/lakehouse.sh -D flink.rest.address=localhost -D flink.rest.port=8081 -D flink.execution.checkpointing.interval=10s
# 僅同步特定數據庫中的表
./bin/lakehouse.sh -D flink.rest.address=localhost -D flink.rest.port=8081 -D database=fluss_\\w+
(4) 創建啟用Lakehouse的表
使用Flink SQL創建啟用了Lakehouse的表,通過設置'table.datalake.enabled' = 'true'選項:
-- 創建一個啟用了Lakehouse的日志表
CREATE TABLE log_orders (
order_id BIGINT,
customer_id BIGINT,
order_time TIMESTAMP,
amount DECIMAL(10, 2),
status STRING
) WITH (
'bucket.num' = '4',
'table.datalake.enabled' = 'true'
);
-- 創建一個啟用了Lakehouse的主鍵表
CREATE TABLE pk_orders (
order_id BIGINT,
customer_id BIGINT,
order_time TIMESTAMP,
amount DECIMAL(10, 2),
status STRING,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'bucket.num' = '4',
'table.datalake.enabled' = 'true'
);
當創建啟用了Lakehouse的表時,Fluss會自動創建一個對應的Paimon表,并在表末尾添加兩個額外的列__offset和__timestamp,用于支持流式消費。
(5) 數據寫入
使用標準的Flink SQL向表中寫入數據:
-- 插入數據到日志表
INSERT INTO log_orders
VALUES (1001, 101, TIMESTAMP '2025-05-18 10:30:00', 199.99, 'PENDING');
-- 插入數據到主鍵表
INSERT INTO pk_orders
VALUES (1001, 101, TIMESTAMP '2025-05-18 10:30:00', 199.99, 'PENDING');
-- 或者從其他表插入數據
INSERT INTO pk_orders
SELECT order_id, customer_id, order_time, amount, status
FROM source_table;
數據寫入后,會首先存儲在Fluss的實時層中,然后由數據湖分層服務定期壓縮到Paimon中。對于主鍵表,還會生成Paimon格式的變更日志。
(6) 數據查詢
① 聯合讀取(實時+歷史數據)
要查詢完整的數據(Fluss中的實時數據和Paimon中的歷史數據),直接查詢表名,無需任何后綴:
-- 聯合讀取實時和歷史數據
SELECT COUNT(*) FROM pk_orders;
-- 流式查詢,會持續獲取最新數據
SELECT * FROM pk_orders WHERE amount > 100;
聯合讀取提供秒級數據新鮮度,但可能會比僅查詢Paimon數據慢一些。由于數據持續寫入,多次運行相同的查詢可能會得到不同的結果。
② 僅讀取Lakehouse數據
要僅查詢Paimon中的歷史數據,在表名后添加$lake后綴:
-- 僅查詢Paimon中的歷史數據
SELECT COUNT(*) FROM pk_orders$lake;
-- 查詢Paimon系統表
SELECT * FROM pk_orders$lake$snapshots;
-- 使用Paimon的時間旅行功能
SELECT * FROM pk_orders$lake FOR TIMESTAMP AS OF '2025-05-17 00:00:00';
僅讀取Paimon數據提供更好的分析性能,但數據新鮮度較差(分鐘級)。您可以使用Paimon的所有功能,如查詢系統表、時間旅行等。
7. 流式湖倉一體化架構圖
三、實際應用場景示例
場景1:實時儀表盤
一個電子商務平臺需要一個實時儀表盤,顯示最新的銷售數據和趨勢。
實現方式:
- 創建啟用Lakehouse的訂單表
- 使用Flink SQL進行聯合讀取,獲取最新的銷售數據
- 將結果推送到儀表盤應用
-- 創建啟用Lakehouse的訂單表
CREATE TABLE sales_orders (
order_id BIGINT,
product_id BIGINT,
customer_id BIGINT,
order_time TIMESTAMP,
amount DECIMAL(10, 2),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'bucket.num' = '8',
'table.datalake.enabled' = 'true'
);
-- 實時計算最近1小時的銷售額
SELECT
TUMBLE_START(order_time, INTERVAL '5' MINUTE) AS window_start,
SUM(amount) AS total_sales
FROM sales_orders
WHERE order_time > CURRENT_TIMESTAMP - INTERVAL '1' HOUR
GROUP BY TUMBLE(order_time, INTERVAL '5' MINUTE);
場景2:歷史數據分析與實時數據結合
一個金融機構需要分析客戶的歷史交易模式,并將其與實時交易數據結合,以檢測潛在的欺詐行為。
實現方式:
- 創建啟用Lakehouse的交易表
- 使用Paimon查詢歷史交易模式
- 使用聯合讀取將歷史模式與實時交易結合分析
-- 創建啟用Lakehouse的交易表
CREATE TABLE transactions (
transaction_id BIGINT,
account_id BIGINT,
transaction_time TIMESTAMP,
amount DECIMAL(15, 2),
merchant_id BIGINT,
location STRING,
PRIMARY KEY (transaction_id) NOT ENFORCED
) WITH (
'bucket.num' = '16',
'table.datalake.enabled' = 'true'
);
-- 查詢歷史交易模式(僅Paimon數據)
SELECT
account_id,
AVG(amount) AS avg_amount,
STDDEV(amount) AS stddev_amount
FROM transactions$lake
WHERE transaction_time BETWEEN TIMESTAMP '2025-04-01 00:00:00' AND TIMESTAMP '2025-05-01 00:00:00'
GROUP BY account_id;
-- 將歷史模式與實時交易結合分析(聯合讀取)
WITH historical_patterns AS (
SELECT
account_id,
AVG(amount) AS avg_amount,
STDDEV(amount) AS stddev_amount
FROM transactions$lake
WHERE transaction_time BETWEEN TIMESTAMP '2025-04-01 00:00:00' AND TIMESTAMP '2025-05-01 00:00:00'
GROUP BY account_id
)
SELECT
t.transaction_id,
t.account_id,
t.transaction_time,
t.amount,
t.merchant_id,
t.location,
h.avg_amount,
h.stddev_amount,
CASE
WHEN ABS(t.amount - h.avg_amount) > 3 * h.stddev_amount THEN 'SUSPICIOUS'
ELSE 'NORMAL'
END AS risk_level
FROM transactions t
JOIN historical_patterns h ON t.account_id = h.account_id
WHERE t.transaction_time > CURRENT_TIMESTAMP - INTERVAL '1' HOUR;
場景3:實時數據湖分層架構
一個大型零售企業需要構建一個數據湖分層架構(Bronze、Silver、Gold),同時保持各層的數據新鮮度。
實現方式:
- 創建各層的啟用Lakehouse的表
- 使用Flink SQL將數據從一層轉換到下一層
- 利用聯合讀取確保各層都具有秒級數據新鮮度
-- Bronze層:創建原始銷售數據表
CREATE TABLE sales_raw (
event_time TIMESTAMP,
store_id BIGINT,
product_id BIGINT,
quantity INT,
amount DECIMAL(10, 2),
payment_method STRING,
source STRING
) WITH (
'bucket.num' = '16',
'table.datalake.enabled' = 'true'
);
-- Silver層:創建清洗后的銷售數據表
CREATE TABLE sales_cleaned (
event_time TIMESTAMP,
store_id BIGINT,
product_id BIGINT,
quantity INT,
amount DECIMAL(10, 2),
payment_method STRING,
source STRING,
PRIMARY KEY (store_id, product_id, event_time) NOT ENFORCED
) WITH (
'bucket.num' = '16',
'table.datalake.enabled' = 'true'
);
-- Gold層:創建聚合的銷售指標表
CREATE TABLE sales_aggregated (
window_start TIMESTAMP,
window_end TIMESTAMP,
store_id BIGINT,
product_id BIGINT,
total_quantity BIGINT,
total_amount DECIMAL(15, 2),
PRIMARY KEY (window_start, window_end, store_id, product_id) NOT ENFORCED
) WITH (
'bucket.num' = '8',
'table.datalake.enabled' = 'true'
);
-- Silver層ETL:清洗和轉換數據
INSERT INTO sales_cleaned
SELECT
event_time,
store_id,
product_id,
CASE WHEN quantity <= 0 THEN 1 ELSE quantity END AS quantity,
amount,
payment_method,
source
FROM sales_raw
WHERE amount > 0;
-- Gold層ETL:聚合數據
INSERT INTO sales_aggregated
SELECT
TUMBLE_START(event_time, INTERVAL '1' HOUR) AS window_start,
TUMBLE_END(event_time, INTERVAL '1' HOUR) AS window_end,
store_id,
product_id,
SUM(quantity) AS total_quantity,
SUM(amount) AS total_amount
FROM sales_cleaned
GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR), store_id, product_id;
四、流式湖倉一體化的技術優勢
1. 數據分布一致性
Fluss和Paimon之間的數據分布是嚴格對齊的。Fluss支持分區表和桶,其分桶算法與Paimon完全一致。這確保了給定的數據始終分配到兩個系統中的相同桶,創建了Fluss桶和Paimon桶之間的一對一對應關系。
這種數據分布的一致性提供了兩個顯著優勢:
(1) 消除分層過程中的Shuffle開銷
當將數據從Fluss分層到Paimon格式時:
- Fluss桶(例如bucket1)可以直接分層到相應的Paimon桶(bucket1)
- 無需讀取Fluss桶中的數據,計算每條數據屬于哪個Paimon桶,然后將其寫入適當的Paimon桶
通過繞過這個中間重新分配步驟,架構避免了昂貴的shuffle開銷,顯著提高了壓縮效率。
(2) 防止數據不一致
通過在Fluss和Paimon中使用相同的分桶算法,確保了數據一致性。該算法計算每條數據的桶分配如下:
bucket_id = hash(row) % bucket_num
2. 更高效的數據追蹤
在Fluss架構中,歷史數據存儲在Lakehouse存儲中,而實時數據保留在Fluss中。在流式讀取期間,這種架構實現了歷史和實時數據訪問的無縫結合:
- 歷史數據訪問:Fluss直接從Lakehouse存儲檢索歷史數據,利用其固有優勢,如:
- 高效的過濾下推:使查詢引擎能夠在存儲層應用過濾條件,減少讀取的數據量并提高性能
- 列裁剪:允許僅檢索必要的列,優化數據傳輸和查詢效率
- 高壓縮比:在保持快速檢索速度的同時最小化存儲開銷
- 實時數據訪問:Fluss同時從自己的存儲中讀取最新的實時數據,確保毫秒級的新鮮度
3. 一致的數據新鮮度
在構建數據倉庫時,通常按層組織和管理數據,如Bronze、Silver和Gold層。當數據流經這些層時,維護數據新鮮度變得至關重要。
當Paimon作為每一層的唯一存儲解決方案時,數據可見性取決于Flink檢查點間隔,這會引入累積延遲:
- 給定層的變更日志僅在Flink檢查點完成后才可見
- 當這個變更日志傳播到后續層時,數據新鮮度延遲隨著每個檢查點間隔而增加
例如,使用1分鐘的Flink檢查點間隔:
- Bronze層經歷1分鐘延遲
- Silver層增加另一個1分鐘延遲,總計2分鐘
- Gold層再增加1分鐘延遲,累積為3分鐘延遲
而使用Fluss和Paimon,我們獲得:
- 即時數據可見性:Fluss中的數據在攝入后立即可見,無需等待Flink檢查點完成。變更日志立即傳輸到下一層
- 一致的數據新鮮度:所有層的數據新鮮度是一致的,以秒為單位,消除了累積延遲
4. 完整實施示例:電子商務實時分析平臺
下面是一個完整的電子商務實時分析平臺實施示例,展示了如何使用Fluss的流式湖倉一體化架構構建端到端解決方案。
步驟1:環境準備
首先,確保已安裝并配置好Fluss和Flink環境:
# 下載并解壓Fluss
wget https://github.com/alibaba/fluss/releases/download/v0.6.0/fluss-0.6.0.tgz
tar -xzf fluss-0.6.0.tgz
cd fluss-0.6.0
# 啟動本地Fluss集群
./bin/start-local.sh
# 啟動本地Flink集群(用于數據湖分層服務)
cd /path/to/flink
./bin/start-cluster.sh
在Fluss的conf/server.yaml中配置Paimon作為Lakehouse存儲:
datalake.format: paimon
datalake.paimon.metastore: filesystem
datalake.paimon.warehouse: /path/to/paimon/warehouse
步驟3:啟動數據湖分層服務
啟動數據湖分層服務,將數據從Fluss壓縮到Paimon:
cd /path/to/fluss
./bin/lakehouse.sh -D flink.rest.address=localhost -D flink.rest.port=8081 -D flink.execution.checkpointing.interval=10s
步驟4:創建數據模型
使用Flink SQL創建電子商務分析所需的表結構:
-- 創建訂單表(主鍵表)
CREATE TABLE orders (
order_id BIGINT,
customer_id BIGINT,
order_time TIMESTAMP,
total_amount DECIMAL(10, 2),
status STRING,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'bucket.num' = '8',
'table.datalake.enabled' = 'true'
);
-- 創建訂單明細表(日志表)
CREATE TABLE order_items (
item_id BIGINT,
order_id BIGINT,
product_id BIGINT,
quantity INT,
unit_price DECIMAL(10, 2),
discount DECIMAL(5, 2)
) WITH (
'bucket.num' = '8',
'table.datalake.enabled' = 'true'
);
-- 創建產品表(主鍵表)
CREATE TABLE products (
product_id BIGINT,
name STRING,
category STRING,
brand STRING,
price DECIMAL(10, 2),
PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
'bucket.num' = '4',
'table.datalake.enabled' = 'true'
);
-- 創建客戶表(主鍵表)
CREATE TABLE customers (
customer_id BIGINT,
name STRING,
email STRING,
registration_time TIMESTAMP,
vip_level INT,
PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
'bucket.num' = '4',
'table.datalake.enabled' = 'true'
);
-- 創建實時銷售指標表(主鍵表)
CREATE TABLE sales_metrics (
window_start TIMESTAMP,
window_end TIMESTAMP,
category STRING,
total_orders BIGINT,
total_sales DECIMAL(15, 2),
PRIMARY KEY (window_start, window_end, category) NOT ENFORCED
) WITH (
'bucket.num' = '4',
'table.datalake.enabled' = 'true'
);
步驟5:數據處理流程
使用Flink SQL實現數據處理流程:
-- 1. 計算實時銷售指標(每5分鐘更新一次)
INSERT INTO sales_metrics
SELECT
TUMBLE_START(o.order_time, INTERVAL '5' MINUTE) AS window_start,
TUMBLE_END(o.order_time, INTERVAL '5' MINUTE) AS window_end,
p.category,
COUNT(DISTINCT o.order_id) AS total_orders,
SUM(i.quantity * i.unit_price * (1 - i.discount/100)) AS total_sales
FROM orders o
JOIN order_items i ON o.order_id = i.order_id
JOIN products p ON i.product_id = p.product_id
WHERE o.status = 'COMPLETED'
GROUP BY
TUMBLE(o.order_time, INTERVAL '5' MINUTE),
p.category;
-- 2. 創建VIP客戶實時推薦視圖
CREATE VIEW vip_recommendations AS
SELECT
c.customer_id,
c.name,
p.category,
p.brand,
COUNT(*) AS purchase_count
FROM customers c
JOIN orders o ON c.customer_id = o.customer_id
JOIN order_items i ON o.order_id = i.order_id
JOIN products p ON i.product_id = p.product_id
WHERE
c.vip_level >= 3 AND
o.order_time > CURRENT_TIMESTAMP - INTERVAL '30' DAY
GROUP BY
c.customer_id, c.name, p.category, p.brand
HAVING COUNT(*) >= 2;
步驟6:實時儀表盤查詢
使用聯合讀取查詢實時銷售指標:
-- 實時銷售趨勢(聯合讀取)
SELECT
window_start,
category,
total_sales
FROM sales_metrics
WHERE
window_start >= CURRENT_TIMESTAMP - INTERVAL '24' HOUR
ORDER BY
window_start DESC, total_sales DESC;
步驟7:歷史分析查詢
使用Paimon查詢歷史數據分析:
-- 歷史銷售分析(僅Paimon數據)
SELECT
p.category,
MONTH(o.order_time) AS month,
SUM(i.quantity * i.unit_price * (1 - i.discount/100)) AS total_sales
FROM orders$lake o
JOIN order_items$lake i ON o.order_id = i.order_id
JOIN products$lake p ON i.product_id = p.product_id
WHERE
o.order_time >= TIMESTAMP '2025-01-01 00:00:00' AND
o.order_time < TIMESTAMP '2025-05-01 00:00:00'
GROUP BY
p.category, MONTH(o.order_time)
ORDER BY
p.category, month;
這種查詢只訪問Paimon中的歷史數據,提供更好的查詢性能,適合大規模歷史數據分析。
5. 流式湖倉一體化的關鍵技術點
(1) 統一元數據管理
在傳統架構中,流存儲系統(如Kafka)和湖倉存儲解決方案(如Paimon)作為獨立實體運行,各自維護自己的元數據。這給計算引擎(如Flink)帶來了兩個主要挑戰:
雙重目錄:用戶需要創建和管理兩個獨立的目錄——一個用于流存儲,另一個用于湖存儲
手動切換:訪問數據需要手動在目錄之間切換,以確定是查詢流存儲還是湖存儲,導致操作復雜性和效率低下
而在Fluss中,雖然Fluss和Paimon仍然維護獨立的元數據,但它們向計算引擎(如Flink)公開統一的目錄和單一的表抽象。這種統一方法提供了幾個關鍵優勢:
- 簡化數據訪問:用戶可以通過單一目錄無縫訪問湖倉存儲(Paimon)和流存儲(Fluss),無需管理或在獨立目錄之間切換
- 集成查詢:統一表抽象允許直接訪問Fluss中的實時數據和Paimon中的歷史數據
- 操作效率:通過提供一致的接口,該架構降低了操作復雜性,使用戶更容易在單一工作流中處理實時和歷史數據
(2) 數據分布對齊
Fluss和Paimon之間的數據分布是嚴格對齊的,這是通過使用相同的分桶算法實現的:
bucket_id = hash(row) % bucket_num
這種對齊確保了:
- 分層效率:Fluss桶可以直接分層到對應的Paimon桶,無需重新分配數據
- 數據一致性:相同的數據在兩個系統中分配到相同的桶,防止數據不一致
(3) 檢查點間隔與數據新鮮度
在傳統的Paimon架構中,數據新鮮度受Flink檢查點間隔的限制。例如,使用1分鐘的檢查點間隔時:
而在Fluss流式湖倉架構中,數據在寫入后立即可見:
6. 流式湖倉一體化的最佳實踐
(1) 表設計最佳實踐
- 合理設置桶數量:根據數據量和查詢模式設置適當的桶數量,通常為2的冪次方(4、8、16等)
- 選擇合適的主鍵:對于主鍵表,選擇具有良好分布特性的主鍵,避免熱點
- 考慮分區策略:對于大型表,使用分區來提高查詢性能和管理數據生命周期
(2) 數據湖分層服務配置
- 檢查點間隔:根據實時性需求和系統負載調整檢查點間隔,通常在10-60秒之間
- 資源分配:為數據湖分層服務分配足夠的資源,確保能夠及時處理數據
- 監控:定期監控分層服務的性能和延遲,確保數據及時壓縮到Paimon
( 3)查詢優化
①選擇合適的查詢模式
- 對于需要最新數據的查詢,使用聯合讀取
- 對于大規模歷史數據分析,使用僅Paimon查詢
②過濾下推:在查詢中盡可能早地應用過濾條件,利用Paimon的過濾下推能力
③列裁剪:只選擇需要的列,減少數據傳輸和處理量