成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Apache Flink 漫談系列(12) - Time Interval(Time-windowed) JOIN

開發 開發工具
本篇將介紹在UnBounded數據流上按時間維度進行數據劃分進行JOIN操作 - Time Interval(Time-windowed)JOIN, 后面我們叫做Interval JOIN。

一、說什么

JOIN 算子是數據處理的核心算子,前面我們在《Apache Flink 漫談系列(09) - JOIN 算子》介紹了UnBounded的雙流JOIN,在《Apache Flink 漫談系列(10) - JOIN LATERAL》介紹了單流與UDTF的JOIN操作,在《Apache Flink 漫談系列(11) - Temporal Table JOIN》又介紹了單流與版本表的JOIN,本篇將介紹在UnBounded數據流上按時間維度進行數據劃分進行JOIN操作 - Time Interval(Time-windowed)JOIN, 后面我們叫做Interval JOIN。

二、實際問題

前面章節我們介紹了Flink中對各種JOIN的支持,那么想想下面的查詢需求之前介紹的JOIN能否滿足?需求描述如下:

比如有一個訂單表Orders(orderId, productName, orderTime)和付款表Payment(orderId, payType, payTime)。 假設我們要統計下單一小時內付款的訂單信息。

1. 傳統數據庫解決方式

在傳統劉數據庫中完成上面的需求非常簡單,查詢sql如下::

  1. SELECT 
  2. o.orderId, 
  3. o.productName, 
  4. p.payType, 
  5. o.orderTime, 
  6. payTime 
  7. FROM 
  8. Orders AS o JOIN Payment AS p ON 
  9. o.orderId = p.orderId AND p.payTime >= orderTime AND p.payTime < orderTime + 3600 // 秒 

上面查詢可以***的完成查詢需求,那么在Apache Flink里面應該如何完成上面的需求呢?

2. Apache Flink解決方式

(1) UnBounded 雙流 JOIN

上面查詢需求我們很容易想到利用《Apache Flink 漫談系列(09) - JOIN 算子》介紹了UnBounded的雙流JOIN,SQL語句如下:

  1. SELECT 
  2. o.orderId, 
  3. o.productName, 
  4. p.payType, 
  5. o.orderTime, 
  6. payTime 
  7. FROM 
  8. Orders AS o JOIN Payment AS p ON 
  9. o.orderId = p.orderId AND p.payTime >= orderTime AND p.payTime as timestamp < TIMESTAMPADD(SECOND, 3600, orderTime) 

UnBounded雙流JOIN可以解決上面問題,這個示例和本篇要介紹的Interval JOIN有什么關系呢?

(2) 性能問題

雖然我們利用UnBounded的JOIN能解決上面的問題,但是仔細分析用戶需求,會發現這個需求場景訂單信息和付款信息并不需要長期存儲,比如2018-12-27 14:22:22的訂單只需要保持1小時,因為超過1個小時的訂單如果沒有被付款就是無效訂單了。同樣付款信息也不需要長期保持,2018-12-27 14:22:22的訂單付款信息如果是2018-12-27 15:22:22以后到達的那么我們也沒有必要保存到State中。 而對于UnBounded的雙流JOIN我們會一直將數據保存到State中,如下示意圖:

這樣的底層實現,對于當前需求有不必要的性能損失。所以我們有必要開發一種新的可以清除State的JOIN方式(Interval JOIN)來高性能的完成上面的查詢需求。

(3) 功能擴展

目前的UnBounded的雙流JOIN是后面是沒有辦法再進行Event-Time的Window Aggregate的。也就是下面的語句在Apache Flink上面是無法支持的:

  1. SELECT COUNT(*) FROM ( 
  2. SELECT 
  3. ..., 
  4. payTime 
  5. FROM Orders AS o JOIN Payment AS p ON 
  6. o.orderId = p.orderId 
  7. ) GROUP BY TUMBLE(payTime, INTERVAL '15' MINUTE) 

因為在UnBounded的雙流JOIN中無法保證payTime的值一定大于WaterMark(WaterMark相關可以查閱<>). Apache Flink的Interval JOIN之后可以進行Event-Time的Window Aggregate。

3. Interval JOIN

為了完成上面需求,并且解決性能和功能擴展的問題,Apache Flink在1.4開始開發了Time-windowed Join,也就是本文所說的Interval JOIN。接下來我們詳細介紹Interval JOIN的語法,語義和實現原理。

三、什么是Interval JOIN

Interval JOIN 相對于UnBounded的雙流JOIN來說是Bounded JOIN。就是每條流的每一條數據會與另一條流上的不同時間區域的數據進行JOIN。對應Apache Flink官方文檔的 Time-windowed JOIN(release-1.7之前都叫Time-Windowed JOIN)。

1. Interval JOIN 語法

  1. SELECT ... FROM t1 JOIN t2 ON t1.key = t2.key AND TIMEBOUND_EXPRESSION 

TIMEBOUND_EXPRESSION 有兩種寫法,如下:

  • L.time between LowerBound(R.time) and UpperBound(R.time)
  • R.time between LowerBound(L.time) and UpperBound(L.time)
  • 帶有時間屬性(L.time/R.time)的比較表達式。

2. Interval JOIN 語義

Interval JOIN 的語義就是每條數據對應一個 Interval 的數據區間,比如有一個訂單表Orders(orderId, productName, orderTime)和付款表Payment(orderId, payType, payTime)。 假設我們要統計在下單一小時內付款的訂單信息。SQL查詢如下:

  1. SELECT 
  2. o.orderId, 
  3. o.productName, 
  4. p.payType, 
  5. o.orderTime, 
  6. cast(payTime as timestamp) as payTime 
  7. FROM 
  8. Orders AS o JOIN Payment AS p ON 
  9. o.orderId = p.orderId AND 
  10. p.payTime BETWEEN orderTime AND 
  11. orderTime + INTERVAL '1' HOUR 
  • Orders訂單數據

  • Payment付款數據

符合語義的預期結果是 訂單id為003的信息不出現在結果表中,因為下單時間2018-12-26 04:53:24.0, 付款時間是 2018-12-26 05:53:30.0超過了1小時付款。

那么預期的結果信息如下:

這樣Id為003的訂單是無效訂單,可以更新庫存繼續售賣。

接下來我們以圖示的方式直觀說明Interval JOIN的語義,我們對上面的示例需求稍微變化一下: 訂單可以預付款(不管是否合理,我們只是為了說明語義)也就是訂單 前后 1小時的付款都是有效的。SQL語句如下:

  1. SELECT 
  2. ... 
  3. FROM 
  4. Orders AS o JOIN Payment AS p ON 
  5. o.orderId = p.orderId AND 
  6. p.payTime BETWEEN orderTime - INTERVAL '1' HOUR AND 
  7. orderTime + INTERVAL '1' HOUR 

這樣的查詢語義示意圖如下:

上圖有幾個關鍵點,如下:

  • 數據JOIN的區間 - 比如Order時間為3的訂單會在付款時間為[2, 4]區間進行JOIN。
  • WaterMark - 比如圖示Order***一條數據時間是3,Payment***一條數據時間是5,那么WaterMark是根據實際最小值減去UpperBound生成,即:Min(3,5)-1 = 2
  • 過期數據 - 出于性能和存儲的考慮,要將過期數據清除,如圖當WaterMark是2的時候時間為2以前的數據過期了,可以被清除。

3. Interval JOIN 實現原理

由于Interval JOIN和雙流JOIN類似都要存儲左右兩邊的數據,所以底層實現中仍然是利用State進行數據的存儲。流計算的特點是數據不停的流入,我們可以不停的進行增量計算,也就是我們每條數據流入都可以進行JOIN計算。我們還是以具體示例和圖示來說明內部計算邏輯,如下圖:

簡單解釋一下每條記錄的處理邏輯如下:

實際的內部邏輯會比描述的復雜的多,大家可以根據如上簡述理解內部原理即可。

四、示例代碼

我們還是以訂單和付款示例,將完整代碼分享給大家,具體如下(代碼基于flink-1.7.0):

  1. import java.sql.Timestamp 
  2.  
  3. import org.apache.flink.api.scala._ 
  4. import org.apache.flink.streaming.api.TimeCharacteristic 
  5. import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor 
  6. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment 
  7. import org.apache.flink.streaming.api.windowing.time.Time 
  8. import org.apache.flink.table.api.TableEnvironment 
  9. import org.apache.flink.table.api.scala._ 
  10. import org.apache.flink.types.Row 
  11.  
  12. import scala.collection.mutable 
  13.  
  14. object SimpleTimeIntervalJoin { 
  15. def main(args: Array[String]): Unit = { 
  16. val env = StreamExecutionEnvironment.getExecutionEnvironment 
  17. val tEnv = TableEnvironment.getTableEnvironment(env) 
  18. env.setParallelism(1) 
  19. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 
  20. // 構造訂單數據 
  21. val ordersData = new mutable.MutableList[(String, String, Timestamp)] 
  22. ordersData.+=(("001", "iphone", new Timestamp(1545800002000L))) 
  23. ordersData.+=(("002", "mac", new Timestamp(1545800003000L))) 
  24. ordersData.+=(("003", "book", new Timestamp(1545800004000L))) 
  25. ordersData.+=(("004", "cup", new Timestamp(1545800018000L))) 
  26.  
  27. // 構造付款表 
  28. val paymentData = new mutable.MutableList[(String, String, Timestamp)] 
  29. paymentData.+=(("001", "alipay", new Timestamp(1545803501000L))) 
  30. paymentData.+=(("002", "card", new Timestamp(1545803602000L))) 
  31. paymentData.+=(("003", "card", new Timestamp(1545803610000L))) 
  32. paymentData.+=(("004", "alipay", new Timestamp(1545803611000L))) 
  33. val orders = env 
  34. .fromCollection(ordersData) 
  35. .assignTimestampsAndWatermarks(new TimestampExtractor[String, String]()) 
  36. .toTable(tEnv, 'orderId, 'productName, 'orderTime.rowtime) 
  37. val ratesHistory = env 
  38. .fromCollection(paymentData) 
  39. .assignTimestampsAndWatermarks(new TimestampExtractor[String, String]()) 
  40. .toTable(tEnv, 'orderId, 'payType, 'payTime.rowtime) 
  41.  
  42. tEnv.registerTable("Orders", orders) 
  43. tEnv.registerTable("Payment", ratesHistory) 
  44.  
  45. var sqlQuery = 
  46. ""
  47. |SELECT 
  48. | o.orderId, 
  49. | o.productName, 
  50. | p.payType, 
  51. | o.orderTime, 
  52. | cast(payTime as timestamp) as payTime 
  53. |FROM 
  54. | Orders AS o JOIN Payment AS p ON o.orderId = p.orderId AND 
  55. | p.payTime BETWEEN orderTime AND orderTime + INTERVAL '1' HOUR 
  56. |""".stripMargin 
  57. tEnv.registerTable("TemporalJoinResult", tEnv.sqlQuery(sqlQuery)) 
  58.  
  59. val result = tEnv.scan("TemporalJoinResult").toAppendStream[Row] 
  60. result.print() 
  61. env.execute() 
  62.  
  63.  
  64. class TimestampExtractor[T1, T2] 
  65. extends BoundedOutOfOrdernessTimestampExtractor[(T1, T2, Timestamp)](Time.seconds(10)) { 
  66. override def extractTimestamp(element: (T1, T2, Timestamp)): Long = { 
  67. element._3.getTime 

運行結果如下:

五、小節

本篇由實際業務需求場景切入,介紹了相同業務需求既可以利用Unbounded 雙流JOIN實現,也可以利用Time Interval JOIN來實現,Time Interval JOIN 性能優于UnBounded的雙流JOIN,并且Interval JOIN之后可以進行Window Aggregate算子計算。然后介紹了Interval JOIN的語法,語義和實現原理,***將訂單和付款的完整示例代碼分享給大家。期望本篇能夠讓大家對Apache Flink Time Interval JOIN有一個具體的了解!

關于點贊和評論

本系列文章難免有很多缺陷和不足,真誠希望讀者對有收獲的篇章給予點贊鼓勵,對有不足的篇章給予反饋和建議,先行感謝大家!

作者:孫金城,花名 金竹,目前就職于阿里巴巴,自2015年以來一直投入于基于Apache Flink的阿里巴巴計算平臺Blink的設計研發工作。

【本文為51CTO專欄作者“金竹”原創稿件,轉載請聯系原作者】

戳這里,看該作者更多好文

責任編輯:趙寧寧 來源: 51CTO專欄
相關推薦

2022-07-13 12:53:59

數據存儲

2018-11-29 09:01:26

Apache FlinJOIN代碼

2018-11-20 07:59:43

Apache Flin JOIN算子代碼

2018-12-11 17:28:22

Apache FlinJOIN代碼

2022-06-10 17:26:07

數據集計算

2018-10-16 08:54:35

Apache Flin流計算State

2018-09-26 07:50:52

Apache Flin流計算計算模式

2018-09-26 08:44:22

Apache Flin流計算計算模式

2018-10-09 10:55:52

Apache FlinWatermark流計算

2018-11-14 09:01:23

Apache FlinSQL代碼

2018-10-22 21:43:39

Apache Flin流計算Fault Toler

2022-07-13 13:03:29

流計算亂序

2019-01-03 10:17:53

Apache FlinTable API代碼

2019-01-15 08:50:12

Apache FlinKafka分布式

2018-11-07 08:48:31

Apache Flin持續查詢流計算

2022-07-12 10:38:25

分布式框架

2018-10-30 14:08:45

Apache Flin流表對偶duality

2022-07-12 11:01:03

數據庫

2022-08-31 14:49:05

IoTDBIoTDatabase

2025-04-25 10:28:40

點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 在线国产99 | 久久久精品视频免费看 | 欧美久久久久 | 羞羞视频网 | 午夜精品视频在线观看 | 成人久久视频 | 精品亚洲一区二区 | 欧美xxxx性| caoporn免费 | 国产成人精品网站 | 91精品国产91 | 一级黄色片免费 | 久久尤物免费一区二区三区 | 亚洲男人网 | 伊人操| 亚洲在线 | 日批的视频| 中文av网站 | 欧美综合一区 | 午夜视频在线观看视频 | 国产午夜精品一区二区三区四区 | 欧美一区免费 | 成人亚洲在线 | 日韩二三区 | 国产成人精品一区二区 | 欧美精品中文字幕久久二区 | 精品国产精品国产偷麻豆 | 亚洲国产成人久久综合一区,久久久国产99 | 午夜精品久久久久久久久久久久 | 久久久久久久一区二区 | 97超碰在线免费 | 国精产品一区二区三区 | 亚洲 欧美 日韩在线 | 亚洲精品aⅴ | 成人18亚洲xxoo | 日韩有码一区 | www.国产精品 | 在线观看视频你懂得 | 中文字幕第一页在线 | 日批日韩在线观看 | 国产精品久久久久久久久婷婷 |