最常用的五種流式ETL模式!
?1970 年代的許多計算概念已經過時,但ETL (Extract-Transform-Load)及其最近的 anagram shuffle ELT并非如此,它在目的地與飛行中操縱數據。ETL 和 ELT 傳統上是計劃的批處理操作,但隨著對始終在線、始終最新的數據服務的需求成為常態,在數據流上操作的實時 ELT 是許多組織的目標——如果不是現實的話。
在實際使用中,ETL 中的“T”代表由原始操作組裝而成的各種模式。在本文中,我們將探索這些操作并查看如何將它們實現為 SQL 語句的示例。
使用 SQL 語句進行轉換?
是的!SQL 將聲明性語言的強大和簡潔性與任何使用代碼或數據的人的普遍技能相結合。與您可能用作替代的幾乎任何編程語言不同,SQL 的普及要歸功于將近 50 年的壽命——計算行業中的幾乎每個人都曾在某個時候使用過它。SQL 的強大功能和普遍性意味著它無處不在,甚至在構建最新開發人員技術和服務的公司中也是如此。當通過函數增強時,SQL 變得更加強大。
管道模式
大多數 ETL 管道都適合一種或多種模式。Decodable 的連接 - 流 - 管道抽象意味著您可以選擇將所有內容構建到單個管道中,或者根據需要將復雜的轉換分解為由流、跨團隊、區域和用例連接的可重用管道網絡。
1:過濾器
過濾器從流中刪除不需要的記錄,刪除與 SQL where子句中的“規則”不匹配的記錄。過濾器通常用于抑制敏感記錄以確保合規性,或減少目標系統上的處理負載或存儲需求。
1-- Filter only records pertaining to the application
2
3insert into application_events
4
5select * from http_eventswhere hostname = 'app.decodable.co'
6
7
8
9-- Filter only records that modify the inventory
10
11insert into inventory_updates
12
13select * from http_eventswhere hostname = 'api.mycompany.com' and
14
15path like '/v1/inventory%' and
16 method in ( 'POST', 'PUT', 'DELETE', 'PATCH' )
2:路線
Route 模式從一個或多個輸入流創建多個輸出流,根據一組規則將記錄定向到正確的目的地。此模式實際上由多個過濾器組成,它們都可以查看每個輸入記錄,但每個過濾器僅傳輸與該特定目的地的規則匹配的那些記錄。
1-- Route security-related HTTP events
2
3insert into security_events
4
5select * from http_eventswhere path like '/login%' or
6
7path like '/billing/cc%'
8-- Route app-related HTTP events
9
10insert into application_events
11
12select * from http_eventswhere hostname = 'app.decodable.co'
13
14-- Route requests to Customer Success if it looks like the user needs help
15
16insert into cs_alerts
17
18select * from http_events
19
20where response_code between 500 and 599 or -- any server failure
21
22( path = '/signup' and response_code != 200 ) or -- failed to sign up for any reason
3:變換
轉換管道通過修改輸入記錄來創建輸出記錄。通常這將導致 1:1 傳輸,但在某些情況下,輸出來自多個輸入記錄,因此可能存在 1:many 關系。在這里,我們將調用三個專門的轉換:
變換:提取
解析輸入記錄,從輸入記錄中提取數據并將其用作豐富派生輸出記錄的基礎。
1-- Parse timestamp and action
2
3insert into user_events
4
5select
6
7to_date(fields['ts'], 'YYYY-MM-DD''T''HH:MI:SS') as ts,
8 fields['user_id'] as user_id,
9 fields['path'] as path, case fields['method'] when 'GET' then 'read'
10 when 'POST', 'PUT' then 'modify'
11 when 'DELETE' then 'delete'
12 end as actionfrom ( select
13 grok(
14 body, '\[${ISO8661_DATETIME:ts} ${DATA:method} "${PATH:path}" uid:${DATA:user_id}'
15 ) as fields from http_event
16)
變換:歸一化
傳入的數據記錄通常需要針對模式進行規范化,以便目標系統處理它們。缺少的字段可能需要填充默認值,可能需要刪除可選字段,并強制執行數據類型。
1-- Cleanse incoming data for downstream processes
2
3insert into sensor_readings
4
5select
6
7cast(ifnull(sensor_id, '0') as bigint) as sensor_id, lower(trim(name)) as name, cast(`value` as bigint) as reading
8
9from raw_sensor_readings
轉換:匿名化
在目標系統不需要信息來完成處理的情況下,匿名管道只是出于合規、監管或隱私原因而消除了敏感字段。
1-- Anonymize SSNs and zip codes
2insert into user_events_masked
3select
4user_id,
5 username, overlay(ssn placing '*' from 1 for 12) as ssn, substring(zip_code from 1 for 2) as zip_code_1,
6action
7from user_events
4:聚合
聚合管道通常使用 SQL 窗口函數將傳入記錄分組到存儲桶中(通常基于時間),在這些存儲桶上執行聚合操作。Count、Min、Max、Avg、Sum 是典型的運算符,但還有很多。
1-- Count the number of events by path and status every 10 seconds.
2
3insert into site_activity
4
5select
6
7window_start,
8 window_end,
9 path,
10status, count(1) as `count`
11
12from table(
13
14tumble( table http_events, descriptor(_time),
15 interval '10' seconds
16 )
17)group by window_start, window_end, path, status
5:觸發
我們的最終模式是觸發器。與幾乎所有其他模式不同,觸發器輸出記錄可能與輸入記錄的模式幾乎沒有重疊,因為它表明已在一個或多個輸入記錄上檢測到一組條件,并作為結果輸出警報。輸出模式可以表示檢測到的條件、要采取的行動或兩者兼而有之。
1-- Build hourly usage data for a Stripe integration on the output stream
2
3insert into stripe_product_usage
4
5select
6
7window_start as _time,
8 customer_id, 'abcd1234' as price_id sum(bytes_sent) / 1024 / 1024 as mb_sentfrom table(
9 tumble( table document_downloads, descriptor(_time),
10 interval '1' hour
11 )
12)group by window_start, customer_idhaving mb_sent > 1024