成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

阿里面試:Fluss 是什么?如何使用 Fluss 構建流式湖倉一體?

大數據
Fluss是一種支持亞秒級低延遲流式讀寫的流式存儲系統。通過Lakehouse Storage,Fluss在Lakehouse之上提供實時流數據服務,實現了數據流和數據湖倉的統一。

一、湖倉概述

1. 傳統湖倉架構的挑戰

數據湖倉(Lakehouse)是一種結合了數據湖的可擴展性和成本效益與數據倉庫的可靠性和性能的新型開放架構。Apache Iceberg、Apache Paimon、Apache Hudi和Delta Lake等知名數據湖格式在湖倉架構中扮演著關鍵角色,它們在單一統一平臺內促進了數據存儲、可靠性和分析能力之間的和諧平衡。 

然而,傳統湖倉架構面臨著實時性與分析能力平衡的挑戰:

(1) 實時性與分析效率的矛盾

  • 如果要求低延遲,就需要頻繁寫入和提交,這會產生大量小型Parquet文件,導致讀取效率低下
  • 如果要求讀取效率,就需要積累數據直到能寫入大型Parquet文件,但這會引入更高的延遲 

(2) 數據新鮮度限制

即使在最佳使用條件下,這些數據湖格式通常也只能在分鐘級粒度內實現數據新鮮度。

二、流式湖倉一體化

1. 流與湖的統一

Fluss是一種支持亞秒級低延遲流式讀寫的流式存儲系統。通過Lakehouse Storage,Fluss在Lakehouse之上提供實時流數據服務,實現了數據流和數據湖倉的統一。這不僅為數據湖倉帶來了低延遲,還為數據流增加了強大的分析能力。 

為了構建流式湖倉,Fluss維護了一個分層服務,該服務將實時數據從Fluss集群壓縮到存儲在Lakehouse Storage中的數據湖格式。Fluss集群中的數據(流式Arrow格式)針對低延遲讀寫進行了優化,而Lakehouse中的壓縮數據(帶壓縮的Parquet格式)針對強大的分析和長期數據存儲進行了優化。因此,Fluss集群中的數據作為實時數據層,保留了具有亞秒級新鮮度的數據;而Lakehouse中的數據作為歷史數據層,保留了具有分鐘級新鮮度的數據。

2. 共享數據與共享元數據

流式湖倉的核心理念是流和湖倉之間共享數據和共享元數據,避免數據重復和元數據不一致。它提供了以下強大功能:

  • 統一元數據:Fluss為流和湖倉中的數據提供統一的表元數據。用戶只需處理一個表,但可以訪問實時流數據、歷史數據或它們的聯合。
  • 聯合讀取:計算引擎對表執行查詢時將讀取實時流數據和湖倉數據的聯合。目前,只有Flink支持聯合讀取,但更多引擎正在規劃中。
  • 實時湖倉:聯合讀取幫助湖倉從近實時分析發展到真正的實時分析,使企業能夠從實時數據中獲得更有價值的洞察。
  • 分析流:聯合讀取幫助數據流具備強大的分析能力,這減少了開發流應用程序的復雜性,簡化了調試,并允許立即訪問實時數據洞察。
  • 連接到湖倉生態系統:Fluss在將數據壓縮到Lakehouse時保持表元數據與數據湖目錄同步,這允許Spark、StarRocks、Flink、Trino等外部引擎通過連接到數據湖目錄直接讀取數據。 

3. 數據湖集成

(1) Paimon集成

Apache Paimon創新地結合了湖格式和LSM結構,將高效更新引入湖架構。 

要將Fluss與Paimon集成,必須啟用lakehouse存儲并將Paimon配置為lakehouse存儲。具體步驟如下:

① 配置Lakehouse存儲

首先,在server.yaml中配置lakehouse存儲:

datalake.format: paimon  


# Paimon目錄配置,假設使用Filesystem目錄  
datalake.paimon.metastore: filesystem  
datalake.paimon.warehouse: /tmp/paimon_data_warehouse

② 啟動數據湖分層服務

然后,啟動數據湖分層服務,將Fluss的數據壓縮到lakehouse存儲:

# 切換到Fluss目錄  
cd $FLUSS_HOME  


# 啟動分層服務,假設rest端點是localhost:8081  
./bin/lakehouse.sh -D flink.rest.address=localhost -D flink.rest.port=8081

③ 為表啟用Lakehouse存儲

要為表啟用lakehouse存儲,必須在創建表時使用選項'table.datalake.enabled' = 'true':

CREATE TABLE datalake_enriched_orders (  
    `order_key` BIGINT,  
    `cust_key` INT NOT NULL,  
    `total_price` DECIMAL(15, 2),  
    `order_date` DATE,  
    `order_priority` STRING,  
    `clerk` STRING,  
    `cust_name` STRING,  
    `cust_phone` STRING,  
    `cust_acctbal` DECIMAL(15, 2),  
    `cust_mktsegment` STRING,  
    `nation_name` STRING,  
    PRIMARY KEY (`order_key`) NOT ENFORCED  
) WITH ('table.datalake.enabled' = 'true');

當在Fluss中創建或修改帶有選項'table.datalake.enabled' = 'true'的表時,Fluss將創建一個具有相同表路徑的相應Paimon表。Paimon表的模式與Fluss表的模式相同,只是在最后附加了兩個額外的列__offset和__timestamp。這兩列用于幫助Fluss客戶端以流式方式消費Paimon中的數據,例如按偏移量/時間戳查找等。 

然后,數據湖分層服務會持續將數據從Fluss壓縮到Paimon。對于主鍵表,它還將生成Paimon格式的變更日志,使您能夠以Paimon方式流式消費它。 

(2) Iceberg集成規劃

目前,Fluss支持Paimon作為Lakehouse存儲,更多種類的數據湖格式正在規劃中。 

Fluss社區正在積極努力增強流和湖倉統一功能,重點關注以下關鍵領域:

① 擴展聯合讀取生態系統

目前,聯合讀取功能已與Apache Flink集成,實現了實時和歷史數據的無縫查詢。未來,社區計劃擴展此功能以支持其他查詢引擎,如Apache Spark和StarRocks,進一步擴大其生態系統兼容性和采用。

② 多樣化湖存儲格式

目前,Fluss支持Apache Paimon作為其主要湖存儲。為了滿足多樣化的用戶需求,社區計劃添加對更多湖格式的支持,包括Apache Iceberg和Apache Hudi,從而提供更大的靈活性和與更廣泛的湖倉生態系統的互操作性。

4. 聯合讀取(Union Read)

(1) 實時數據與歷史數據的無縫結合

對于帶有選項'table.datalake.enabled' = 'true'的表,有兩部分數據:保留在Fluss中的數據和已經在Paimon中的數據。現在,您有兩種表視圖:一種是具有分鐘級延遲的Paimon數據視圖,一種是聯合Fluss和Paimon數據的完整數據視圖,具有秒級延遲。 

Flink使您能夠決定選擇哪種視圖:

  • 僅Paimon意味著更好的分析性能,但數據新鮮度較差
  • 結合Fluss和Paimon意味著更好的數據新鮮度,但分析性能下降

① 僅讀取Paimon中的數據

要指定讀取Paimon中的數據,必須使用$lake后綴指定表,以下SQL顯示了如何執行此操作:

-- 假設我們有一個名為`orders`的表  


-- 從paimon讀取  
SELECT COUNT(*) FROM orders$lake;  


-- 我們還可以查詢系統表  
SELECT * FROM orders$lake$snapshots;

當在查詢中使用$lake后綴指定表時,它就像一個普通的Paimon表,因此它繼承了Paimon表的所有能力。您可以享受Flink在Paimon上支持/優化的所有功能,如查詢系統表、時間旅行等。 

② 聯合讀取Fluss和Paimon中的數據

要指定讀取聯合Fluss和Paimon的完整數據,只需像查詢普通表一樣查詢它,無需任何后綴或其他內容,以下SQL顯示了如何執行此操作:

-- 查詢將聯合Fluss和Paimon的數據  
SELECT SUM(order_count) as total_orders FROM ads_nation_purchase_power;

查詢可能看起來比只查詢Paimon中的數據慢,但它查詢了完整數據,這意味著更好的數據新鮮度。您可以多次運行查詢,由于數據持續寫入表中,每次運行都應該得到不同的結果。 

(2) 查詢引擎支持

① Flink支持

Fluss向Flink用戶公開統一的API,允許他們選擇是使用聯合讀取還是僅在Lakehouse上進行讀取,使用以下SQL:

SELECT * FROM orders

這將讀取orders表的完整數據,Flink將聯合讀取Fluss和Lakehouse中的數據。如果用戶只需要讀取數據湖上的數據,可以在要讀取的表后添加$lake后綴。SQL如下:

-- 分析查詢  
SELECT COUNT(*), MAX(t), SUM(amount)   
FROM orders$lake  


-- 查詢系統表  
SELECT * FROM orders$lake$snapshots

5. 實時湖倉與分析流

(1) 為湖倉帶來真正的實時性

傳統的數據湖倉架構通常只能提供分鐘級的數據新鮮度,這對于許多實時分析場景來說是不夠的。Fluss的流式湖倉一體化架構通過聯合讀取(Union Read)機制,為湖倉帶來了真正的實時性:

  • 秒級數據新鮮度:通過將Fluss的實時流數據與Paimon的歷史數據聯合起來,查詢可以訪問最新寫入的數據,實現秒級數據新鮮度。
  • 實時分析能力增強:企業可以在保持湖倉強大分析能力的同時,獲得實時數據洞察,使決策更加及時和準確。
  • 無需數據復制:不需要將數據復制到單獨的實時系統中,減少了存儲成本和數據一致性問題。

(2) 為數據流增加強大的分析能力

傳統的流處理系統通常缺乏強大的分析能力,而Fluss通過與湖倉的集成,為數據流增加了這些能力:

  • 復雜分析查詢支持:流數據可以與歷史數據結合,支持更復雜的分析查詢,如聚合、窗口分析等。
  • 開發簡化:減少了開發流應用程序的復雜性,簡化了調試過程。 
  • 生態系統集成:可以利用現有的湖倉生態系統工具和查詢引擎,如Spark、StarRocks、Trino等。

6. 流式湖倉一體化實施流程

下面是實施Fluss流式湖倉一體化的詳細步驟和流程圖:

(1) 準備環境

確保已安裝Fluss和Flink,并且兩者都正常運行。Fluss需要一個運行中的Flink集群來執行數據湖分層服務。

(2) 配置Lakehouse存儲

在Fluss的server.yaml配置文件中設置Lakehouse存儲:

# 指定數據湖格式為Paimon  
datalake.format: paimon  


# Paimon目錄配置  
datalake.paimon.metastore: filesystem  
datalake.paimon.warehouse: /path/to/paimon/warehouse

(3) 啟動數據湖分層服務

數據湖分層服務是一個Flink作業,負責將數據從Fluss壓縮到Paimon:

# 切換到Fluss安裝目錄  
cd $FLUSS_HOME  


# 啟動分層服務,指定Flink REST端點  
./bin/lakehouse.sh -D flink.rest.address=localhost -D flink.rest.port=8081

您還可以設置其他Flink配置參數,例如:

# 設置檢查點間隔為10秒  
./bin/lakehouse.sh -D flink.rest.address=localhost -D flink.rest.port=8081 -D flink.execution.checkpointing.interval=10s  


# 僅同步特定數據庫中的表  
./bin/lakehouse.sh -D flink.rest.address=localhost -D flink.rest.port=8081 -D database=fluss_\\w+

(4) 創建啟用Lakehouse的表

使用Flink SQL創建啟用了Lakehouse的表,通過設置'table.datalake.enabled' = 'true'選項:

-- 創建一個啟用了Lakehouse的日志表  
CREATE TABLE log_orders (  
    order_id BIGINT,  
    customer_id BIGINT,  
    order_time TIMESTAMP,  
    amount DECIMAL(10, 2),  
    status STRING  
) WITH (  
    'bucket.num' = '4',  
    'table.datalake.enabled' = 'true'  
);  


-- 創建一個啟用了Lakehouse的主鍵表  
CREATE TABLE pk_orders (  
    order_id BIGINT,  
    customer_id BIGINT,  
    order_time TIMESTAMP,  
    amount DECIMAL(10, 2),  
    status STRING,  
    PRIMARY KEY (order_id) NOT ENFORCED  
) WITH (  
    'bucket.num' = '4',  
    'table.datalake.enabled' = 'true'  
);

當創建啟用了Lakehouse的表時,Fluss會自動創建一個對應的Paimon表,并在表末尾添加兩個額外的列__offset和__timestamp,用于支持流式消費。

(5) 數據寫入

使用標準的Flink SQL向表中寫入數據:

-- 插入數據到日志表  
INSERT INTO log_orders  
VALUES (1001, 101, TIMESTAMP '2025-05-18 10:30:00', 199.99, 'PENDING');  


-- 插入數據到主鍵表  
INSERT INTO pk_orders  
VALUES (1001, 101, TIMESTAMP '2025-05-18 10:30:00', 199.99, 'PENDING');  


-- 或者從其他表插入數據  
INSERT INTO pk_orders  
SELECT order_id, customer_id, order_time, amount, status  
FROM source_table;

數據寫入后,會首先存儲在Fluss的實時層中,然后由數據湖分層服務定期壓縮到Paimon中。對于主鍵表,還會生成Paimon格式的變更日志。

(6) 數據查詢

① 聯合讀取(實時+歷史數據)

要查詢完整的數據(Fluss中的實時數據和Paimon中的歷史數據),直接查詢表名,無需任何后綴:

-- 聯合讀取實時和歷史數據  
SELECT COUNT(*) FROM pk_orders;  


-- 流式查詢,會持續獲取最新數據  
SELECT * FROM pk_orders WHERE amount > 100;

聯合讀取提供秒級數據新鮮度,但可能會比僅查詢Paimon數據慢一些。由于數據持續寫入,多次運行相同的查詢可能會得到不同的結果。

② 僅讀取Lakehouse數據

要僅查詢Paimon中的歷史數據,在表名后添加$lake后綴:

-- 僅查詢Paimon中的歷史數據  
SELECT COUNT(*) FROM pk_orders$lake;  


-- 查詢Paimon系統表  
SELECT * FROM pk_orders$lake$snapshots;  


-- 使用Paimon的時間旅行功能  
SELECT * FROM pk_orders$lake FOR TIMESTAMP AS OF '2025-05-17 00:00:00';

僅讀取Paimon數據提供更好的分析性能,但數據新鮮度較差(分鐘級)。您可以使用Paimon的所有功能,如查詢系統表、時間旅行等。

7. 流式湖倉一體化架構圖

三、實際應用場景示例

場景1:實時儀表盤

一個電子商務平臺需要一個實時儀表盤,顯示最新的銷售數據和趨勢。

實現方式:

  • 創建啟用Lakehouse的訂單表
  • 使用Flink SQL進行聯合讀取,獲取最新的銷售數據
  • 將結果推送到儀表盤應用
-- 創建啟用Lakehouse的訂單表  
CREATE TABLE sales_orders (  
    order_id BIGINT,  
    product_id BIGINT,  
    customer_id BIGINT,  
    order_time TIMESTAMP,  
    amount DECIMAL(10, 2),  
    PRIMARY KEY (order_id) NOT ENFORCED  
) WITH (  
    'bucket.num' = '8',  
    'table.datalake.enabled' = 'true'  
);  


-- 實時計算最近1小時的銷售額  
SELECT   
    TUMBLE_START(order_time, INTERVAL '5' MINUTE) AS window_start,  
    SUM(amount) AS total_sales  
FROM sales_orders  
WHERE order_time > CURRENT_TIMESTAMP - INTERVAL '1' HOUR  
GROUP BY TUMBLE(order_time, INTERVAL '5' MINUTE);

場景2:歷史數據分析與實時數據結合

一個金融機構需要分析客戶的歷史交易模式,并將其與實時交易數據結合,以檢測潛在的欺詐行為。

實現方式:

  • 創建啟用Lakehouse的交易表
  • 使用Paimon查詢歷史交易模式
  • 使用聯合讀取將歷史模式與實時交易結合分析
-- 創建啟用Lakehouse的交易表  
CREATE TABLE transactions (  
    transaction_id BIGINT,  
    account_id BIGINT,  
    transaction_time TIMESTAMP,  
    amount DECIMAL(15, 2),  
    merchant_id BIGINT,  
    location STRING,  
    PRIMARY KEY (transaction_id) NOT ENFORCED  
) WITH (  
    'bucket.num' = '16',  
    'table.datalake.enabled' = 'true'  
);  


-- 查詢歷史交易模式(僅Paimon數據)  
SELECT   
    account_id,  
    AVG(amount) AS avg_amount,  
    STDDEV(amount) AS stddev_amount  
FROM transactions$lake  
WHERE transaction_time BETWEEN TIMESTAMP '2025-04-01 00:00:00' AND TIMESTAMP '2025-05-01 00:00:00'  
GROUP BY account_id;
-- 將歷史模式與實時交易結合分析(聯合讀取)  
WITH historical_patterns AS (  
    SELECT   
        account_id,  
        AVG(amount) AS avg_amount,  
        STDDEV(amount) AS stddev_amount  
    FROM transactions$lake  
    WHERE transaction_time BETWEEN TIMESTAMP '2025-04-01 00:00:00' AND TIMESTAMP '2025-05-01 00:00:00'  
    GROUP BY account_id  
)  
SELECT   
    t.transaction_id,  
    t.account_id,  
    t.transaction_time,  
    t.amount,  
    t.merchant_id,  
    t.location,  
    h.avg_amount,  
    h.stddev_amount,  
    CASE   
        WHEN ABS(t.amount - h.avg_amount) > 3 * h.stddev_amount THEN 'SUSPICIOUS'  
        ELSE 'NORMAL'  
    END AS risk_level  
FROM transactions t  
JOIN historical_patterns h ON t.account_id = h.account_id  
WHERE t.transaction_time > CURRENT_TIMESTAMP - INTERVAL '1' HOUR;

場景3:實時數據湖分層架構

一個大型零售企業需要構建一個數據湖分層架構(Bronze、Silver、Gold),同時保持各層的數據新鮮度。

實現方式:

  • 創建各層的啟用Lakehouse的表
  • 使用Flink SQL將數據從一層轉換到下一層
  • 利用聯合讀取確保各層都具有秒級數據新鮮度

-- Bronze層:創建原始銷售數據表  
CREATE TABLE sales_raw (  
    event_time TIMESTAMP,  
    store_id BIGINT,  
    product_id BIGINT,  
    quantity INT,  
    amount DECIMAL(10, 2),  
    payment_method STRING,  
    source STRING  
) WITH (  
    'bucket.num' = '16',  
    'table.datalake.enabled' = 'true'  
);  


-- Silver層:創建清洗后的銷售數據表  
CREATE TABLE sales_cleaned (  
    event_time TIMESTAMP,  
    store_id BIGINT,  
    product_id BIGINT,  
    quantity INT,  
    amount DECIMAL(10, 2),  
    payment_method STRING,  
    source STRING,  
    PRIMARY KEY (store_id, product_id, event_time) NOT ENFORCED  
) WITH (  
    'bucket.num' = '16',  
    'table.datalake.enabled' = 'true'  
);  


-- Gold層:創建聚合的銷售指標表  
CREATE TABLE sales_aggregated (  
    window_start TIMESTAMP,  
    window_end TIMESTAMP,  
    store_id BIGINT,  
    product_id BIGINT,  
    total_quantity BIGINT,  
    total_amount DECIMAL(15, 2),  
    PRIMARY KEY (window_start, window_end, store_id, product_id) NOT ENFORCED  
) WITH (  
    'bucket.num' = '8',  
    'table.datalake.enabled' = 'true'  
);  


-- Silver層ETL:清洗和轉換數據  
INSERT INTO sales_cleaned  
SELECT   
    event_time,  
    store_id,  
    product_id,  
    CASE WHEN quantity <= 0 THEN 1 ELSE quantity END AS quantity,  
    amount,  
    payment_method,  
    source  
FROM sales_raw  
WHERE amount > 0;  


-- Gold層ETL:聚合數據  
INSERT INTO sales_aggregated  
SELECT   
    TUMBLE_START(event_time, INTERVAL '1' HOUR) AS window_start,  
    TUMBLE_END(event_time, INTERVAL '1' HOUR) AS window_end,  
    store_id,  
    product_id,  
    SUM(quantity) AS total_quantity,  
    SUM(amount) AS total_amount  
FROM sales_cleaned  
GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR), store_id, product_id;

四、流式湖倉一體化的技術優勢

1. 數據分布一致性

Fluss和Paimon之間的數據分布是嚴格對齊的。Fluss支持分區表和桶,其分桶算法與Paimon完全一致。這確保了給定的數據始終分配到兩個系統中的相同桶,創建了Fluss桶和Paimon桶之間的一對一對應關系。

這種數據分布的一致性提供了兩個顯著優勢:

(1) 消除分層過程中的Shuffle開銷

當將數據從Fluss分層到Paimon格式時:

  • Fluss桶(例如bucket1)可以直接分層到相應的Paimon桶(bucket1)
  • 無需讀取Fluss桶中的數據,計算每條數據屬于哪個Paimon桶,然后將其寫入適當的Paimon桶

通過繞過這個中間重新分配步驟,架構避免了昂貴的shuffle開銷,顯著提高了壓縮效率。

(2) 防止數據不一致

通過在Fluss和Paimon中使用相同的分桶算法,確保了數據一致性。該算法計算每條數據的桶分配如下:

bucket_id = hash(row) % bucket_num

2. 更高效的數據追蹤

在Fluss架構中,歷史數據存儲在Lakehouse存儲中,而實時數據保留在Fluss中。在流式讀取期間,這種架構實現了歷史和實時數據訪問的無縫結合:

  • 歷史數據訪問:Fluss直接從Lakehouse存儲檢索歷史數據,利用其固有優勢,如:
  • 高效的過濾下推:使查詢引擎能夠在存儲層應用過濾條件,減少讀取的數據量并提高性能
  • 列裁剪:允許僅檢索必要的列,優化數據傳輸和查詢效率
  • 高壓縮比:在保持快速檢索速度的同時最小化存儲開銷
  • 實時數據訪問:Fluss同時從自己的存儲中讀取最新的實時數據,確保毫秒級的新鮮度

3. 一致的數據新鮮度

在構建數據倉庫時,通常按層組織和管理數據,如Bronze、Silver和Gold層。當數據流經這些層時,維護數據新鮮度變得至關重要。

當Paimon作為每一層的唯一存儲解決方案時,數據可見性取決于Flink檢查點間隔,這會引入累積延遲:

  • 給定層的變更日志僅在Flink檢查點完成后才可見
  • 當這個變更日志傳播到后續層時,數據新鮮度延遲隨著每個檢查點間隔而增加

例如,使用1分鐘的Flink檢查點間隔:

  • Bronze層經歷1分鐘延遲
  • Silver層增加另一個1分鐘延遲,總計2分鐘
  • Gold層再增加1分鐘延遲,累積為3分鐘延遲

而使用Fluss和Paimon,我們獲得:

  • 即時數據可見性:Fluss中的數據在攝入后立即可見,無需等待Flink檢查點完成。變更日志立即傳輸到下一層
  • 一致的數據新鮮度:所有層的數據新鮮度是一致的,以秒為單位,消除了累積延遲

4. 完整實施示例:電子商務實時分析平臺

下面是一個完整的電子商務實時分析平臺實施示例,展示了如何使用Fluss的流式湖倉一體化架構構建端到端解決方案。

步驟1:環境準備

首先,確保已安裝并配置好Fluss和Flink環境:

# 下載并解壓Fluss  
wget https://github.com/alibaba/fluss/releases/download/v0.6.0/fluss-0.6.0.tgz  
tar -xzf fluss-0.6.0.tgz  
cd fluss-0.6.0  


# 啟動本地Fluss集群  
./bin/start-local.sh  


# 啟動本地Flink集群(用于數據湖分層服務)  
cd /path/to/flink  
./bin/start-cluster.sh
步驟2:配置Lakehouse存儲

在Fluss的conf/server.yaml中配置Paimon作為Lakehouse存儲:

datalake.format: paimon  
datalake.paimon.metastore: filesystem  
datalake.paimon.warehouse: /path/to/paimon/warehouse

步驟3:啟動數據湖分層服務

啟動數據湖分層服務,將數據從Fluss壓縮到Paimon:

cd /path/to/fluss  
./bin/lakehouse.sh -D flink.rest.address=localhost -D flink.rest.port=8081 -D flink.execution.checkpointing.interval=10s

步驟4:創建數據模型

使用Flink SQL創建電子商務分析所需的表結構:

-- 創建訂單表(主鍵表)  
CREATE TABLE orders (  
    order_id BIGINT,  
    customer_id BIGINT,  
    order_time TIMESTAMP,  
    total_amount DECIMAL(10, 2),  
    status STRING,  
    PRIMARY KEY (order_id) NOT ENFORCED  
) WITH (  
    'bucket.num' = '8',  
    'table.datalake.enabled' = 'true'  
);  


-- 創建訂單明細表(日志表)  
CREATE TABLE order_items (  
    item_id BIGINT,  
    order_id BIGINT,  
    product_id BIGINT,  
    quantity INT,  
    unit_price DECIMAL(10, 2),  
    discount DECIMAL(5, 2)  
) WITH (  
    'bucket.num' = '8',  
    'table.datalake.enabled' = 'true'  
);  


-- 創建產品表(主鍵表)  
CREATE TABLE products (  
    product_id BIGINT,  
    name STRING,  
    category STRING,  
    brand STRING,  
    price DECIMAL(10, 2),  
    PRIMARY KEY (product_id) NOT ENFORCED  
) WITH (  
    'bucket.num' = '4',  
    'table.datalake.enabled' = 'true'  
);  


-- 創建客戶表(主鍵表)  
CREATE TABLE customers (  
    customer_id BIGINT,  
    name STRING,  
    email STRING,  
    registration_time TIMESTAMP,  
    vip_level INT,  
    PRIMARY KEY (customer_id) NOT ENFORCED  
) WITH (  
    'bucket.num' = '4',  
    'table.datalake.enabled' = 'true'  
);  


-- 創建實時銷售指標表(主鍵表)  
CREATE TABLE sales_metrics (  
    window_start TIMESTAMP,  
    window_end TIMESTAMP,  
    category STRING,  
    total_orders BIGINT,  
    total_sales DECIMAL(15, 2),  
    PRIMARY KEY (window_start, window_end, category) NOT ENFORCED  
) WITH (  
    'bucket.num' = '4',  
    'table.datalake.enabled' = 'true'  
);

步驟5:數據處理流程

使用Flink SQL實現數據處理流程:

-- 1. 計算實時銷售指標(每5分鐘更新一次)  
INSERT INTO sales_metrics  
SELECT   
    TUMBLE_START(o.order_time, INTERVAL '5' MINUTE) AS window_start,  
    TUMBLE_END(o.order_time, INTERVAL '5' MINUTE) AS window_end,  
    p.category,  
    COUNT(DISTINCT o.order_id) AS total_orders,  
    SUM(i.quantity * i.unit_price * (1 - i.discount/100)) AS total_sales  
FROM orders o  
JOIN order_items i ON o.order_id = i.order_id  
JOIN products p ON i.product_id = p.product_id  
WHERE o.status = 'COMPLETED'  
GROUP BY   
    TUMBLE(o.order_time, INTERVAL '5' MINUTE),  
    p.category;  


-- 2. 創建VIP客戶實時推薦視圖  
CREATE VIEW vip_recommendations AS  
SELECT   
    c.customer_id,  
    c.name,  
    p.category,  
    p.brand,  
    COUNT(*) AS purchase_count  
FROM customers c  
JOIN orders o ON c.customer_id = o.customer_id  
JOIN order_items i ON o.order_id = i.order_id  
JOIN products p ON i.product_id = p.product_id  
WHERE   
    c.vip_level >= 3 AND  
    o.order_time > CURRENT_TIMESTAMP - INTERVAL '30' DAY  
GROUP BY   
    c.customer_id, c.name, p.category, p.brand  
HAVING COUNT(*) >= 2;

步驟6:實時儀表盤查詢

使用聯合讀取查詢實時銷售指標:

-- 實時銷售趨勢(聯合讀取)  
SELECT   
    window_start,  
    category,  
    total_sales  
FROM sales_metrics  
WHERE   
    window_start >= CURRENT_TIMESTAMP - INTERVAL '24' HOUR  
ORDER BY   
    window_start DESC, total_sales DESC;

步驟7:歷史分析查詢

使用Paimon查詢歷史數據分析:

-- 歷史銷售分析(僅Paimon數據)  
SELECT   
    p.category,  
    MONTH(o.order_time) AS month,  
    SUM(i.quantity * i.unit_price * (1 - i.discount/100)) AS total_sales  
FROM orders$lake o  
JOIN order_items$lake i ON o.order_id = i.order_id  
JOIN products$lake p ON i.product_id = p.product_id  
WHERE   
    o.order_time >= TIMESTAMP '2025-01-01 00:00:00' AND  
    o.order_time < TIMESTAMP '2025-05-01 00:00:00'  
GROUP BY   
    p.category, MONTH(o.order_time)  
ORDER BY   
    p.category, month;

這種查詢只訪問Paimon中的歷史數據,提供更好的查詢性能,適合大規模歷史數據分析。

5. 流式湖倉一體化的關鍵技術點

(1) 統一元數據管理

在傳統架構中,流存儲系統(如Kafka)和湖倉存儲解決方案(如Paimon)作為獨立實體運行,各自維護自己的元數據。這給計算引擎(如Flink)帶來了兩個主要挑戰:

雙重目錄:用戶需要創建和管理兩個獨立的目錄——一個用于流存儲,另一個用于湖存儲

手動切換:訪問數據需要手動在目錄之間切換,以確定是查詢流存儲還是湖存儲,導致操作復雜性和效率低下

而在Fluss中,雖然Fluss和Paimon仍然維護獨立的元數據,但它們向計算引擎(如Flink)公開統一的目錄和單一的表抽象。這種統一方法提供了幾個關鍵優勢:

  • 簡化數據訪問:用戶可以通過單一目錄無縫訪問湖倉存儲(Paimon)和流存儲(Fluss),無需管理或在獨立目錄之間切換
  • 集成查詢:統一表抽象允許直接訪問Fluss中的實時數據和Paimon中的歷史數據
  • 操作效率:通過提供一致的接口,該架構降低了操作復雜性,使用戶更容易在單一工作流中處理實時和歷史數據

(2) 數據分布對齊

Fluss和Paimon之間的數據分布是嚴格對齊的,這是通過使用相同的分桶算法實現的:

bucket_id = hash(row) % bucket_num

這種對齊確保了:

  • 分層效率:Fluss桶可以直接分層到對應的Paimon桶,無需重新分配數據
  • 數據一致性:相同的數據在兩個系統中分配到相同的桶,防止數據不一致

(3) 檢查點間隔與數據新鮮度

在傳統的Paimon架構中,數據新鮮度受Flink檢查點間隔的限制。例如,使用1分鐘的檢查點間隔時:

而在Fluss流式湖倉架構中,數據在寫入后立即可見:

6. 流式湖倉一體化的最佳實踐

(1) 表設計最佳實踐

  • 合理設置桶數量:根據數據量和查詢模式設置適當的桶數量,通常為2的冪次方(4、8、16等)
  • 選擇合適的主鍵:對于主鍵表,選擇具有良好分布特性的主鍵,避免熱點
  • 考慮分區策略:對于大型表,使用分區來提高查詢性能和管理數據生命周期

(2) 數據湖分層服務配置

  • 檢查點間隔:根據實時性需求和系統負載調整檢查點間隔,通常在10-60秒之間
  • 資源分配:為數據湖分層服務分配足夠的資源,確保能夠及時處理數據
  • 監控:定期監控分層服務的性能和延遲,確保數據及時壓縮到Paimon

( 3)查詢優化

①選擇合適的查詢模式

  • 對于需要最新數據的查詢,使用聯合讀取
  • 對于大規模歷史數據分析,使用僅Paimon查詢

②過濾下推:在查詢中盡可能早地應用過濾條件,利用Paimon的過濾下推能力

③列裁剪:只選擇需要的列,減少數據傳輸和處理量

責任編輯:趙寧寧 來源: 大數據技能圈
相關推薦

2022-09-29 09:22:33

數據倉

2022-12-13 17:42:47

Arctic存儲湖倉

2023-08-30 07:14:27

MaxCompute湖倉一體

2024-09-03 14:59:00

2021-06-07 11:22:38

大數據數據倉庫湖倉一體

2023-06-28 07:28:36

湖倉騰訊架構

2023-12-14 13:01:00

Hudivivo

2020-12-02 17:20:58

數據倉庫阿里云數據湖

2023-05-16 07:24:25

數據湖快手

2023-05-26 06:45:08

2023-06-19 07:13:51

云原生湖倉一體

2021-06-07 10:45:16

大數據數據倉庫數據湖

2021-08-31 10:07:16

Flink Hud數據湖阿里云

2021-06-11 14:01:51

數據倉庫湖倉一體 Flink

2021-07-07 10:13:56

大數據Delta Lake 湖倉一體

2024-03-05 08:21:23

湖倉一體數據湖數據倉庫

2025-01-21 17:02:14

谷歌多模態AI
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 中文字幕 在线观看 | 日日操夜夜操视频 | 国产中文视频 | 日本不卡在线观看 | 久久久蜜桃 | 国产黑丝av| 精品久久久久久久久久久下田 | 91九色婷婷 | 国产高清一区二区三区 | 国产一区久久久 | 日本高清视频在线播放 | 日本一二三区高清 | 精品久久久久久一区二区 | 99pao成人国产永久免费视频 | 中文字幕在线看人 | 精品麻豆剧传媒av国产九九九 | 亚洲欧美视频在线观看 | 毛片av免费在线观看 | 国产免费人成xvideos视频 | 欧美精品欧美精品系列 | 日韩成人在线观看 | 天天操夜夜操 | 在线观看日韩精品视频 | 6996成人影院网在线播放 | 成人午夜在线视频 | 成年人黄色小视频 | 国产日韩精品一区二区三区 | 中文字幕av亚洲精品一部二部 | 国产小视频在线 | 日韩欧美在线观看视频网站 | 毛片网在线观看 | 日韩精品成人网 | 亚洲精品一区二区另类图片 | 欧美一区二 | 天堂在线免费视频 | 久久久久久久国产精品视频 | 99亚洲精品 | 色眯眯视频在线观看 | 日韩久久久一区二区 | 国产精品视频二区三区 | 国产精品久久久久久久久久久免费看 |