成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Apache Flink 漫談系列(11) - Temporal Table JOIN

開發 開發工具
在《Apache Flink 漫談系列 - JOIN LATERAL》中提到了Temporal Table JOIN,本篇就向大家詳細介紹什么是Temporal Table JOIN。

一、什么是Temporal Table

在《Apache Flink 漫談系列 - JOIN LATERAL》中提到了Temporal Table JOIN,本篇就向大家詳細介紹什么是Temporal Table JOIN。

ANSI-SQL 2011 中提出了Temporal 的概念,Oracle,SQLServer,DB2等大的數據庫廠商也先后實現了這個標準。Temporal Table記錄了歷史上任何時間點所有的數據改動,Temporal Table的工作流程如下:

emporal Table

上圖示意Temporal Table具有普通table的特性,有具體獨特的DDL/DML/QUERY語法,時間是其核心屬性。歷史意味著時間,意味著快照Snapshot。

二、ANSI-SQL 2011 Temporal Table示例

我們以一個DDL和一套DML示例說明Temporal Table的原理,DDL定義PK是可選的,下面的示例我們以不定義PK的為例進行說明:

1. DDL 示例

  1. CREATE TABLE Emp 
  2. ENo INTEGER, 
  3. Sys_Start TIMESTAMP(12) GENERATED 
  4. ALWAYS AS ROW Start, 
  5. Sys_end TIMESTAMP(12) GENERATED 
  6. ALWAYS AS ROW END, 
  7. EName VARCHAR(30), 
  8. PERIOD FOR SYSTEM_TIME (Sys_Start,Sys_end) 
  9. ) WITH SYSTEM VERSIONING 

2. DML 示例

(1) INSERT

  1. INSERT INTO Emp (ENo, EName) VALUES (22217, 'Joe') 

說明: 其中Sys_Start和Sys_End是數據庫系統默認填充的。

(2) UPDATE

  1. UPDATE Emp SET EName = 'Tom' WHERE ENo = 22217 

說明: 假設是在 2012-02-03 10:00:00 執行的UPDATE,執行之后上一個值"Joe"的Sys_End值由9999-12-31 23:59:59 變成了 2012-02-03 10:00:00, 也就是下一個值"Tom"生效的開始時間。可見我們執行的是UPDATE但是數據庫里面會存在兩條數據,數據值和有效期不同,也就是版本不同。

(3) DELETE (假設執行DELETE之前的表內容如下)

  1. DELETE FROM Emp WHERE ENo = 22217 

說明: 假設我們是在2012-06-01 00:00:00執行的DELETE,則Sys_End值由9999-12-31 23:59:59 變成了 2012-06-01 00:00:00, 也就是在執行DELETE時候沒有真正的刪除符合條件的行,而是系統將符合條件的行的Sys_end修改為執行DELETE的操作時間。標識數據的有效期到DELETE執行那一刻為止。

(4) SELECT

  1. SELECT ENo,EName,Sys_Start,Sys_End FROM Emp 
  2. FOR SYSTEM_TIME AS OF TIMESTAMP '2011-01-02 00:00:00' 

說明: 這個查詢會返回所有Sys_Start <= 2011-01-02 00:00:00 并且 Sys_end > 2011-01-02 00:00:00 的記錄。

三、SQLServer Temporal Table 示例

1. DDL

  1. CREATE TABLE Department 
  2. DeptID int NOT NULL PRIMARY KEY CLUSTERED 
  3. , DeptName varchar(50) NOT NULL 
  4. , ManagerID INT NULL 
  5. , ParentDeptID int NULL 
  6. , SysStartTime datetime2 GENERATED ALWAYS AS ROW Start NOT NULL 
  7. , SysEndTime datetime2 GENERATED ALWAYS AS ROW END NOT NULL 
  8. , PERIOD FOR SYSTEM_TIME (SysStartTime,SysEndTime) 
  9. WITH (SYSTEM_VERSIONING = ON); 

執行上面的語句,在數據庫會創建當前表和歷史表,如下圖:

Department 顯示是有版本控制的,歷史表是默認的名字,我也可以指定名字如:SYSTEM_VERSIONING = ON (HISTORY_TABLE = dbo.DepartmentHistory)。

2. DML

(1) INSERT - 插入列不包含SysStartTime和SysEndTime列

  1. INSERT INTO [dbo].[Department] ([DeptID] ,[DeptName] ,[ManagerID] ,[ParentDeptID]) 
  2. VALUES(10, 'Marketing', 101, 1); 

VALUES(10, 'Marketing', 101, 1);

執行之后我們分別查詢當前表和歷史表,如下圖:

我們***條INSERT語句數據值的有效時間是操作那一刻2018-06-06 05:50:20.7913985 到永遠 9999-12-31 23:59:59.9999999,但這時刻歷史表還沒有任何信息。我們接下來進行更新操作。

(2) UPDATE

  1. UPDATE [dbo].[Department] SET [ManagerID] = 501 WHERE [DeptID] = 10 

執行之后當前表信息會更新并在歷史表里面產生一條歷史信息,如下:

 

注意當前表的SysStartTime意見發生了變化,歷史表產生了一條記錄,SyStartTIme是原當前表記錄的SysStartTime,SysEndTime是當前表記錄的SystemStartTime。我們再更新一次:

  1. UPDATE [dbo].[Department] SET [ManagerID] = 201 WHERE [DeptID] = 10 

到這里我們了解到SQLServer里面關于Temporal Table的邏輯是有當前表和歷史表來存儲數據,并且數據庫內部以StartTime和EndTime的方式管理數據的版本。

(3) SELECT

  1. SELECT [DeptID], [DeptName], [SysStartTime],[SysEndTime] 
  2. FROM [dbo].[Department] 
  3. FOR SYSTEM_TIME AS OF '2018-06-06 05:50:21.0000000' ; 

SELECT語句查詢的是Department的表,實際返回的數據是從歷史表里面查詢出來的,查詢的底層邏輯就是 SysStartTime <= '2018-06-06 05:50:21.0000000' and SysEndTime > '2018-06-06 05:50:21.0000000' 。

四、Apache Flink Temporal Table

我們不止一次的提到Apache Flink遵循ANSI-SQL標準,Apache Flink中Temporal Table的概念也源于ANSI-2011的標準語義,但目前的實現在語法層面和ANSI-SQL略有差別,上面看到ANSI-2011中使用FOR SYSTEM_TIME AS OF的語法,目前Apache Flink中使用 LATERAL TABLE(TemporalTableFunction)的語法。這一點后續需要推動社區進行改進。

1. 為啥需要 Temporal Table

我們以具體的查詢示例來說明為啥需要Temporal Table,假設我們有一張實時變化的匯率表(RatesHistory),如下:

RatesHistory代表了Yen匯率(Yen匯率為1),是不斷變化的Append only的匯率表。例如,Euro兌Yen匯率從09:00至10:45的匯率為114。從10點45分到11點15分是116。

假設我們想在10:58輸出所有當前匯率,我們需要以下SQL查詢來計算結果表:

  1. SELECT * 
  2. FROM RatesHistory AS r 
  3. WHERE r.rowtime = ( 
  4. SELECT MAX(rowtime) 
  5. FROM RatesHistory AS r2 
  6. WHERE rr2.currency = r.currency 
  7. AND r2.rowtime <= '10:58'); 

相應Flink代碼如下:

  • 定義數據源-genRatesHistorySource
    1. def genRatesHistorySource: CsvTableSource = { 
    2.  
    3. val csvRecords = Seq
    4. "rowtime ,currency ,rate", 
    5. "09:00:00 ,US Dollar , 102", 
    6. "09:00:00 ,Euro , 114", 
    7. "09:00:00 ,Yen , 1", 
    8. "10:45:00 ,Euro , 116", 
    9. "11:15:00 ,Euro , 119", 
    10. "11:49:00 ,Pounds , 108" 
    11. // 測試數據寫入臨時文件 
    12. val tempFilePath = 
    13. writeToTempFile(csvRecords.mkString("$"), "csv_source_", "tmp") 
    14.  
    15. // 創建Source connector 
    16. new CsvTableSource( 
    17. tempFilePath, 
    18. Array("rowtime","currency","rate"), 
    19. Array( 
    20. Types.STRING,Types.STRING,Types.STRING 
    21. ), 
    22. fieldDelim = ","
    23. rowDelim = "$"
    24. ignoreFirstLine = true
    25. ignoreComments = "%" 
    26. def writeToTempFile( 
    27. contents: String, 
    28. filePrefix: String, 
    29. fileSuffix: String, 
    30. charset: String = "UTF-8"): String = { 
    31. val tempFile = File.createTempFile(filePrefix, fileSuffix) 
    32. val tmpWriter = new OutputStreamWriter(new FileOutputStream(tempFile), charset) 
    33. tmpWriter.write(contents) 
    34. tmpWriter.close() 
    35. tempFile.getAbsolutePath} 
  • 主程序代碼
    1. def main(args: Array[String]): Unit = { 
    2. // Streaming 環境 
    3. val env = StreamExecutionEnvironment.getExecutionEnvironment 
    4. val tEnv = TableEnvironment.getTableEnvironment(env) 
    5.  
    6. //方便我們查出輸出數據 
    7. env.setParallelism(1) 
    8.  
    9. val sourceTableName = "RatesHistory" 
    10. // 創建CSV source數據結構 
    11. val tableSource = CsvTableSourceUtils.genRatesHistorySource 
    12. // 注冊source 
    13. tEnv.registerTableSource(sourceTableName, tableSource) 
    14.  
    15. // 注冊retract sink 
    16. val sinkTableName = "retractSink" 
    17. val fieldNames = Array("rowtime", "currency", "rate") 
    18. val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.STRING, Types.STRING) 
    19.  
    20. tEnv.registerTableSink( 
    21. sinkTableName, 
    22. fieldNames, 
    23. fieldTypes, 
    24. new MemoryRetractSink) 
    25.  
    26. val SQL = 
    27. ""
    28. |SELECT * 
    29. |FROM RatesHistory AS r 
    30. |WHERE r.rowtime = ( 
    31. | SELECT MAX(rowtime) 
    32. | FROM RatesHistory AS r2 
    33. | WHERE rr2.currency = r.currency 
    34. | AND r2.rowtime <= '10:58:00' ) 
    35. """.stripMargin 
    36.  
    37. // 執行查詢 
    38. val result = tEnv.SQLQuery(SQL) 
    39.  
    40. // 將結果插入sink 
    41. result.insertInto(sinkTableName) 
    42. env.execute() 
  • 執行結果如下圖:

結果表格化一下:

Temporal Table的概念旨在簡化此類查詢,加速它們的執行。Temporal Table是Append Only表上的參數化視圖,它把Append Only的表變化解釋為表的Changelog,并在特定時間點提供該表的版本(時間版本)。將Applend Only表解釋為changelog需要指定主鍵屬性和時間戳屬性。主鍵確定覆蓋哪些行,時間戳確定行有效的時間,也就是數據版本,與上面SQL Server示例的有效期的概念一致。

在上面的示例中,currency是RatesHistory表的主鍵,而rowtime是timestamp屬性。

2. 如何定義Temporal Table

在Apache Flink中擴展了TableFunction的接口,在TableFunction接口的基礎上添加了時間屬性和pk屬性。

(1) 內部TemporalTableFunction定義如下:

  1. class TemporalTableFunction private( 
  2. @transient private val underlyingHistoryTable: Table, 
  3. // 時間屬性,相當于版本信息 
  4. private val timeAttribute: Expression, 
  5. // 主鍵定義 
  6. private val primaryKey: String, 
  7. private val resultType: RowTypeInfo) 
  8. extends TableFunction[Row] { 
  9. ...} 

(2) 用戶創建TemporalTableFunction方式

在Table中添加了createTemporalTableFunction方法,該方法需要傳入時間屬性和主鍵,接口定義如下:

  1. // Creates TemporalTableFunction backed up by this table as a history table. 
  2.  
  3. def createTemporalTableFunction( 
  4. timeAttribute: Expression, 
  5. primaryKey: Expression): TemporalTableFunction = { 
  6. ...} 

用戶通過如下方式調用就可以得到一個TemporalTableFunction的實例,代碼如下:

  1. val tab = ... 
  2. val temporalTableFunction = tab.createTemporalTableFunction('time, 'pk) 
  3. ... 

3. 案例代碼

(1) 需求描述

假設我們有一張訂單表Orders和一張匯率表Rates,那么訂單來自于不同的地區,所以支付的幣種各不一樣,那么假設需要統計每個訂單在下單時候Yen幣種對應的金額。

(2) Orders 數據

(3) Rates 數據

(4) 統計需求對應的SQL

  1. SELECT o.currency, o.amount, r.rate 
  2. o.amount * r.rate AS yen_amount 
  3. FROM 
  4. Orders AS o, 
  5. LATERAL TABLE (Rates(o.rowtime)) AS r 
  6. WHERE r.currency = o.currency 

(5) 預期結果

4. Without connnector 實現代碼

  1. object TemporalTableJoinTest { 
  2. def main(args: Array[String]): Unit = { 
  3. val env = StreamExecutionEnvironment.getExecutionEnvironment 
  4. val tEnv = TableEnvironment.getTableEnvironment(env) 
  5. env.setParallelism(1) 
  6. // 設置時間類型是 event-time env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 
  7. // 構造訂單數據 
  8. val ordersData = new mutable.MutableList[(Long, String, Timestamp)] 
  9. ordersData.+=((2L, "Euro", new Timestamp(2L))) 
  10. ordersData.+=((1L, "US Dollar", new Timestamp(3L))) 
  11. ordersData.+=((50L, "Yen", new Timestamp(4L))) 
  12. ordersData.+=((3L, "Euro", new Timestamp(5L))) 
  13.  
  14. //構造匯率數據 
  15. val ratesHistoryData = new mutable.MutableList[(String, Long, Timestamp)] 
  16. ratesHistoryData.+=(("US Dollar", 102L, new Timestamp(1L))) 
  17. ratesHistoryData.+=(("Euro", 114L, new Timestamp(1L))) 
  18. ratesHistoryData.+=(("Yen", 1L, new Timestamp(1L))) 
  19. ratesHistoryData.+=(("Euro", 116L, new Timestamp(5L))) 
  20. ratesHistoryData.+=(("Euro", 119L, new Timestamp(7L))) 
  21.  
  22. // 進行訂單表 event-time 的提取 
  23. val orders = env 
  24. .fromCollection(ordersData) 
  25. .assignTimestampsAndWatermarks(new OrderTimestampExtractor[Long, String]()) 
  26. .toTable(tEnv, 'amount, 'currency, 'rowtime.rowtime) 
  27.  
  28. // 進行匯率表 event-time 的提取 
  29. val ratesHistory = env 
  30. .fromCollection(ratesHistoryData) 
  31. .assignTimestampsAndWatermarks(new OrderTimestampExtractor[String, Long]()) 
  32. .toTable(tEnv, 'currency, 'rate, 'rowtime.rowtime) 
  33.  
  34. // 注冊訂單表和匯率表 
  35. tEnv.registerTable("Orders", orders) 
  36. tEnv.registerTable("RatesHistory", ratesHistory) 
  37. val tab = tEnv.scan("RatesHistory"); 
  38. // 創建TemporalTableFunction 
  39. val temporalTableFunction = tab.createTemporalTableFunction('rowtime, 'currency) 
  40. //注冊TemporalTableFunction 
  41. tEnv.registerFunction("Rates",temporalTableFunction) 
  42.  
  43. val SQLQuery = 
  44. ""
  45. |SELECT o.currency, o.amount, r.rate, 
  46. | o.amount * r.rate AS yen_amount 
  47. |FROM 
  48. | Orders AS o, 
  49. | LATERAL TABLE (Rates(o.rowtime)) AS r 
  50. |WHERE r.currency = o.currency 
  51. |""".stripMargin 
  52.  
  53. tEnv.registerTable("TemporalJoinResult", tEnv.SQLQuery(SQLQuery)) 
  54.  
  55. val result = tEnv.scan("TemporalJoinResult").toAppendStream[Row] 
  56. // 打印查詢結果 
  57. result.print() 
  58. env.execute() 

在運行上面代碼之前需要注意上面代碼中對EventTime時間提取的過程,也就是說Apache Flink的TimeCharacteristic.EventTime 模式,需要調用assignTimestampsAndWatermarks方法設置EventTime的生成方式,這種方式也非常靈活,用戶可以控制業務數據的EventTime的值和WaterMark的產生,WaterMark相關內容可以查閱《Apache Flink 漫談系列(03) - Watermark》。 在本示例中提取EventTime的完整代碼如下:

  1. import java.SQL.Timestamp 
  2.  
  3. import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor 
  4. import org.apache.flink.streaming.api.windowing.time.Time 
  5.  
  6. class OrderTimestampExtractor[T1, T2] 
  7. extends BoundedOutOfOrdernessTimestampExtractor[(T1, T2, Timestamp)](Time.seconds(10)) { 
  8. override def extractTimestamp(element: (T1, T2, Timestamp)): Long = { 
  9. element._3.getTime 

查看運行結果:

5. With CSVConnector 實現代碼

在實際的生產開發中,都需要實際的Connector的定義,下面我們以CSV格式的Connector定義來開發Temporal Table JOIN Demo。

(1) genEventRatesHistorySource

  1. def genEventRatesHistorySource: CsvTableSource = { 
  2.  
  3. val csvRecords = Seq
  4. "ts#currency#rate", 
  5. "1#US Dollar#102", 
  6. "1#Euro#114", 
  7. "1#Yen#1", 
  8. "3#Euro#116", 
  9. "5#Euro#119", 
  10. "7#Pounds#108" 
  11. // 測試數據寫入臨時文件 
  12. val tempFilePath = 
  13. FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), "csv_source_rate", "tmp") 
  14.  
  15. // 創建Source connector 
  16. new CsvTableSource( 
  17. tempFilePath, 
  18. Array("ts","currency","rate"), 
  19. Array( 
  20. Types.LONG,Types.STRING,Types.LONG 
  21. ), 
  22. fieldDelim = "#"
  23. rowDelim = CommonUtils.line, 
  24. ignoreFirstLine = true
  25. ignoreComments = "%" 
  26. )} 

(2) genRatesOrderSource

  1. def genRatesOrderSource: CsvTableSource = { 
  2.  
  3. val csvRecords = Seq
  4. "ts#currency#amount", 
  5. "2#Euro#10", 
  6. "4#Euro#10" 
  7. // 測試數據寫入臨時文件 
  8. val tempFilePath = 
  9. FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), "csv_source_order", "tmp") 
  10.  
  11. // 創建Source connector 
  12. new CsvTableSource( 
  13. tempFilePath, 
  14. Array("ts","currency", "amount"), 
  15. Array( 
  16. Types.LONG,Types.STRING,Types.LONG 
  17. ), 
  18. fieldDelim = "#"
  19. rowDelim = CommonUtils.line, 
  20. ignoreFirstLine = true
  21. ignoreComments = "%" 

(3) 主程序代碼

  1. /* 
  2.  * Licensed to the Apache Software Foundation (ASF) under one 
  3.  * or more contributor license agreements.  See the NOTICE file 
  4.  * distributed with this work for additional information 
  5.  * regarding copyright ownership.  The ASF licenses this file 
  6.  * to you under the Apache License, Version 2.0 (the 
  7.  * "License"); you may not use this file except in compliance 
  8.  * with the License.  You may obtain a copy of the License at 
  9.  * 
  10.  *     http://www.apache.org/licenses/LICENSE-2.0 
  11.  * 
  12.  * Unless required by applicable law or agreed to in writing, software 
  13.  * distributed under the License is distributed on an "AS IS" BASIS, 
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
  15.  * See the License for the specific language governing permissions and 
  16.  * limitations under the License. 
  17.  */ 
  18.  
  19. package org.apache.flink.book.connectors 
  20.  
  21. import java.io.File 
  22.  
  23. import org.apache.flink.api.common.typeinfo.{TypeInformation, Types} 
  24. import org.apache.flink.book.utils.{CommonUtils, FileUtils} 
  25. import org.apache.flink.table.sinks.{CsvTableSink, TableSink} 
  26. import org.apache.flink.table.sources.CsvTableSource 
  27. import org.apache.flink.types.Row 
  28.  
  29. object CsvTableSourceUtils { 
  30.  
  31.   def genWordCountSource: CsvTableSource = { 
  32.     val csvRecords = Seq
  33.       "words", 
  34.       "Hello Flink", 
  35.       "Hi, Apache Flink", 
  36.       "Apache FlinkBook" 
  37.     ) 
  38.     // 測試數據寫入臨時文件 
  39.     val tempFilePath = 
  40.       FileUtils.writeToTempFile(csvRecords.mkString("$"), "csv_source_", "tmp") 
  41.  
  42.     // 創建Source connector 
  43.     new CsvTableSource( 
  44.       tempFilePath, 
  45.       Array("words"), 
  46.       Array( 
  47.         Types.STRING 
  48.       ), 
  49.       fieldDelim = "#"
  50.       rowDelim = "$"
  51.       ignoreFirstLine = true
  52.       ignoreComments = "%" 
  53.     ) 
  54.   } 
  55.  
  56.  
  57.   def genRatesHistorySource: CsvTableSource = { 
  58.  
  59.     val csvRecords = Seq
  60.       "rowtime ,currency   ,rate", 
  61.     "09:00:00   ,US Dollar  , 102", 
  62.     "09:00:00   ,Euro       , 114", 
  63.     "09:00:00  ,Yen        ,   1", 
  64.     "10:45:00   ,Euro       , 116", 
  65.     "11:15:00   ,Euro       , 119", 
  66.     "11:49:00   ,Pounds     , 108" 
  67.     ) 
  68.     // 測試數據寫入臨時文件 
  69.     val tempFilePath = 
  70.       FileUtils.writeToTempFile(csvRecords.mkString("$"), "csv_source_", "tmp") 
  71.  
  72.     // 創建Source connector 
  73.     new CsvTableSource( 
  74.       tempFilePath, 
  75.       Array("rowtime","currency","rate"), 
  76.       Array( 
  77.         Types.STRING,Types.STRING,Types.STRING 
  78.       ), 
  79.       fieldDelim = ","
  80.       rowDelim = "$"
  81.       ignoreFirstLine = true
  82.       ignoreComments = "%" 
  83.     ) 
  84.   } 
  85.  
  86.   def genEventRatesHistorySource: CsvTableSource = { 
  87.  
  88.     val csvRecords = Seq
  89.       "ts#currency#rate", 
  90.       "1#US Dollar#102", 
  91.       "1#Euro#114", 
  92.       "1#Yen#1", 
  93.       "3#Euro#116", 
  94.       "5#Euro#119", 
  95.       "7#Pounds#108" 
  96.     ) 
  97.     // 測試數據寫入臨時文件 
  98.     val tempFilePath = 
  99.       FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), "csv_source_rate", "tmp") 
  100.  
  101.     // 創建Source connector 
  102.     new CsvTableSource( 
  103.       tempFilePath, 
  104.       Array("ts","currency","rate"), 
  105.       Array( 
  106.         Types.LONG,Types.STRING,Types.LONG 
  107.       ), 
  108.       fieldDelim = "#"
  109.       rowDelim = CommonUtils.line, 
  110.       ignoreFirstLine = true
  111.       ignoreComments = "%" 
  112.     ) 
  113.   } 
  114.  
  115.   def genRatesOrderSource: CsvTableSource = { 
  116.  
  117.     val csvRecords = Seq
  118.       "ts#currency#amount", 
  119.       "2#Euro#10", 
  120.       "4#Euro#10" 
  121.     ) 
  122.     // 測試數據寫入臨時文件 
  123.     val tempFilePath = 
  124.       FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), "csv_source_order", "tmp") 
  125.  
  126.     // 創建Source connector 
  127.     new CsvTableSource( 
  128.       tempFilePath, 
  129.       Array("ts","currency", "amount"), 
  130.       Array( 
  131.         Types.LONG,Types.STRING,Types.LONG 
  132.       ), 
  133.       fieldDelim = "#"
  134.       rowDelim = CommonUtils.line, 
  135.       ignoreFirstLine = true
  136.       ignoreComments = "%" 
  137.     ) 
  138.   } 
  139.  
  140.  
  141.   /** 
  142.     * Example: 
  143.     * genCsvSink( 
  144.     *   Array[String]("word", "count"), 
  145.     *   Array[TypeInformation[_] ](Types.STRING, Types.LONG)) 
  146.     */ 
  147.   def genCsvSink(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = { 
  148.     val tempFile = File.createTempFile("csv_sink_", "tem") 
  149.     if (tempFile.exists()) { 
  150.       tempFile.delete() 
  151.     } 
  152.     new CsvTableSink(tempFile.getAbsolutePath).configure(fieldNames, fieldTypes) 
  153.   } 
  154.  

運行結果如下 :

6. 內部實現原理

我們還是以訂單和匯率關系示例來說明Apache Flink內部實現Temporal Table JOIN的原理,如下圖所示:

五、Temporal Table JOIN vs 雙流JOIN vs Lateral JOIN

在《Apache Flink 漫談系列(09) - JOIN算子》中我們介紹了雙流JOIN,在《Apache Flink 漫談系列(10) - JOIN LATERAL 》中我們介紹了 JOIN LATERAL(TableFunction),那么本篇介紹的Temporal Table JOIN和雙流JOIN/JOIN LATERAL(TableFunction)有什么本質區別呢?

  • 雙流JOIN - 雙流JOIN本質很明確是 Stream JOIN Stream,雙流驅動。
  • LATERAL JOIN - Lateral JOIN的本質是Steam JOIN Table Function, 是單流驅動。
  • Temporal Table JOIN - Temporal Table JOIN 的本質就是 Stream JOIN Temporal Table 或者 Stream JOIN Table with snapshot。Temporal Table JOIN 特點單流驅動,Temporal Table 是被動查詢。

1. Temporal Table JOIN vs LATERAL JOIN

從功能上說Temporal Table JOIN和 LATERAL JOIN都是由左流一條數據獲取多行數據,也就是單流驅動,并且都是被動查詢,那么Temporal JOIN和LATERAL JOIN最本質的區別是什么呢?這里我們說最關鍵的一點是 State 的管理,LATERAL JOIN是一個TableFunction,不具備state的管理能力,數據不具備版本特性。而Temporal Table JOIN是一個具備版本信息的數據表。

2. Temporal Table JOIN vs 雙流 JOIN

Temporal Table JOIN 和 雙流 JOIN都可以管理State,那么他們的本質區別是什么? 那就是計算驅動的差別,Temporal Table JOIN是單邊驅動,Temporal Table是被動的查詢,而雙流JOIN是雙邊驅動,兩邊都是主動的進行JOIN計算。

3. Temporal Table JOIN改進

個人認為Apache Flink的Temporal Table JOIN功能不論在語法和語義上面都要遵循ANSI-SQL標準,后期會推動社區在Temporal Table上面支持ANSI-SQL的FOR SYSTEM_TIME AS OF標準語法。改進后的處理邏輯示意圖:

其中cache是一種性能考慮的優化,詳細內容待社區完善后再細述。

六、小結

本篇結合ANSI-SQL標準和SQL Server對Temporal Table的支持來開篇,然后介紹目前Apache Flink對Temporal Table的支持現狀,以代碼示例和內部處理邏輯示意圖的方式讓大家直觀體驗Temporal Table JOIN的語法和語義。

關于點贊和評論

本系列文章難免有很多缺陷和不足,真誠希望讀者對有收獲的篇章給予點贊鼓勵,對有不足的篇章給予反饋和建議,先行感謝大家!

作者:孫金城,花名 金竹,目前就職于阿里巴巴,自2015年以來一直投入于基于Apache Flink的阿里巴巴計算平臺Blink的設計研發工作。

【本文為51CTO專欄作者“金竹”原創稿件,轉載請聯系原作者】

戳這里,看該作者更多好文

責任編輯:趙寧寧 來源: 51CTO專欄
相關推薦

2022-07-13 12:53:59

數據存儲

2018-11-29 09:01:26

Apache FlinJOIN代碼

2018-11-20 07:59:43

Apache Flin JOIN算子代碼

2019-01-03 10:17:53

Apache FlinTable API代碼

2022-06-10 17:26:07

數據集計算

2018-12-29 08:16:32

Apache FlinJOIN代碼

2018-10-09 10:55:52

Apache FlinWatermark流計算

2018-09-26 08:44:22

Apache Flin流計算計算模式

2018-09-26 07:50:52

Apache Flin流計算計算模式

2018-10-16 08:54:35

Apache Flin流計算State

2018-10-22 21:43:39

Apache Flin流計算Fault Toler

2018-11-14 09:01:23

Apache FlinSQL代碼

2022-07-13 13:03:29

流計算亂序

2018-11-07 08:48:31

Apache Flin持續查詢流計算

2022-07-12 10:38:25

分布式框架

2019-01-15 08:50:12

Apache FlinKafka分布式

2018-10-30 14:08:45

Apache Flin流表對偶duality

2020-04-09 11:08:30

PyFlinkJAR依賴

2022-06-20 05:52:27

FlinkTTL流查詢

2018-10-30 11:10:05

Flink數據集計算
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 亚洲av毛片 | 欧美日韩亚洲在线 | 久久一级免费视频 | 一区二区精品在线 | 亚洲精品白浆高清久久久久久 | 综合自拍 | 日韩成人一区 | 激情小视频 | 91麻豆精品国产91久久久久久久久 | 九九热精品视频在线观看 | 久久精品国产免费一区二区三区 | 精品国偷自产在线 | 福利网址 | 国产精品日本一区二区不卡视频 | 日日夜夜精品视频 | 欧美一区二区三区视频在线 | 韩国理论电影在线 | 国产农村妇女精品一二区 | 天天碰日日操 | 久久精品小视频 | 久久久不卡网国产精品一区 | 黄色大片视频 | 中文字幕av网站 | 婷婷丁香综合网 | 日韩免费一区二区 | 青青久草 | 亚洲天堂av网 | cao在线 | 亚洲精品成人 | 国产一区二区精品在线观看 | 特级做a爰片毛片免费看108 | 老子午夜影院 | 中文字幕亚洲一区二区三区 | 五月天激情综合网 | 久久一区二区视频 | 国产一级片91 | 国产一级视频在线观看 | 国产中文字幕在线观看 | 老司机精品福利视频 | 国产四虎 | 久久综合一区二区 |