十分鐘了解Flink SQL使用
Flink 是一個流處理和批處理統一的大數據框架,專門為高吞吐量和低延遲而設計。開發者可以使用SQL進行流批統一處理,大大簡化了數據處理的復雜性。本文將介紹Flink SQL的基本原理、使用方法、流批統一,并通過幾個例子進行實踐。
一、Flink SQL基本原理
Flink SQL建立在Apache Flink之上,利用Flink的強大處理能力,使得用戶可以使用SQL語句進行流數據和批數據處理。Flink SQL既支持實時的流數據處理,也支持有界的批數據處理。
Flink SQL用SQL作為處理數據的接口語言,將SQL語句轉換成數據流圖(Dataflow Graph),再由Flink引擎執行。
二、Flink SQL固定編碼套路
使用Flink SQL時,我們通常會遵循如下編碼套路,這些套路和使用Flink API的套路是一樣的:
- 環境準備:初始化一個TableEnvironment對象,它是執行Flink SQL語句的核心。這個環境可以是流數據環境,也可以是批數據環境。
- 數據源定義:通過CREATE TABLE語句定義輸入數據源(source),可以是Kafka、CSV文件等。
- 數據處理:編寫SQL語句對數據進行處理,如查詢、過濾、聚合等。
- 數據輸出:通過CREATE TABLE定義輸出數據源(sink),并將處理結果輸出。
三、Flink SQL代碼示例
以下是一個從CSV文件讀取數據,通過SQL查詢,再將數據輸出到CSV的完整例子。
(1) 先準備input.csv文件內容,如下:
1,product_A,10.5
2,product_B,20.3
3,product_C,15.8
1,product_D,12.2
2,product_A,18.7
(2) 編寫demo代碼
編寫代碼之前先在pom.xml中添加依賴:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
示例代碼如下:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class FlinkSqlDemo {
public static void main(String[] args) throws Exception {
// 設置環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1); //為了方便測試看效果,這里并行度設置為1
// 使用EnvironmentSettings創建StreamTableEnvironment,明確設置為批處理模式
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inBatchMode() // 設置為批處理模式,這樣后續才能一次性的輸出到csv中
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
// 定義輸入數據源
String createSourceTableDdl = "CREATE TABLE csv_source (" +
" user_id INT," +
" product STRING," +
" order_amount DOUBLE" +
") WITH (" +
" 'connector' = 'filesystem'," +
" 'path' = 'file:///path/input.csv'," +
" 'format' = 'csv'" +
")";
tableEnv.executeSql(createSourceTableDdl);
// // 編寫 SQL 查詢
// String query = "SELECT user_id, SUM(order_amount) AS total_amount FROM csv_source GROUP BY user_id";
// // 執行查詢并打印
// tableEnv.executeSql(query).print();
// env.execute("Flink SQL Demo");
// 定義輸出數據源
String createSinkTableDdl = "CREATE TABLE csv_sink (" +
" user_id INT," +
" total_amount DOUBLE" +
") WITH (" +
" 'connector' = 'filesystem'," +
" 'path' = 'file:///path/output.csv'," +
" 'format' = 'csv'" +
")";
tableEnv.executeSql(createSinkTableDdl);
// 執行查詢并將結果輸出到csv_sink
String query = "INSERT INTO csv_sink " +
"SELECT user_id, SUM(order_amount) as total_amount " +
"FROM csv_source " +
"GROUP BY user_id";
tableEnv.executeSql(query);
// env.execute("Flink SQL Job");
}
}
(3) 執行結果如下:
四、Flink SQL做流批統一
1.什么是流批統一?
流批統一是大數據處理領域的一個概念,它指的是使用一套代碼來同時處理流數據(Streaming)和批數據(Batching)。
流處理和批處理的區別如下:
(1) 批處理(Batch Processing):
- 批處理是指在某一時間點處理大量數據的手段。
- 它通常涉及到對大量靜止的(不再變化的)數據集進行一次性的處理。
- 批處理作業通常在數據集完整可用后開始執行,并且經常是在數據倉庫中進行。
- 例如,一個電商平臺可能在一天結束時運行一個批處理作業來處理當天所有的交易記錄。
(2) 流處理(Stream Processing):
- 流處理是指對數據實時進行處理,通常是數據生成或接收的同時立即進行。
- 流處理適用于連續的數據輸入,這些數據一直在變化,需要立即響應。
- 例如,社交媒體平臺在接收到新的帖子時,可能會實時分析這些帖子的內容和流行趨勢。
在早期,流處理和批處理通常需要不同的系統來執行。對于批處理,可能使用如Hadoop這樣的框架;而對于流處理,可能使用如Apache Storm這樣的框架。這就導致開發者要同時學習多種框架才能處理不同類型的數據作業。
流批統一的概念,就是將這兩種數據處理方式合并到一個平臺中,這樣一個系統既可以處理靜止的大批量數據集,也可以處理實時的數據流。這樣做的優點是顯而易見的:
- 統一的API:開發人員只需要學習和使用一套工具和API,可以共享更多的代碼和邏輯。
- 維護簡便:只需維護一個系統,可以減少學習成本,減輕運維壓力,減少故障點。
- 靈活的數據處理:可以根據不同的業務需求靈活選擇數據處理方式。
2.Flink SQL流批一體的實現原理
Flink很好的實現了流批統一,可以讓開發人員用相同的方式來編寫批處理和流處理程序。不論是對有界(批處理)還是無界(流處理)的數據源,Flink都可以使用相同的API和處理邏輯來處理數據。
Flink 通過內置的表抽象來實現流批一體,這里的"表"可以是動態變化的(例如,來自實時數據流的表)或是靜態的(例如,存儲在文件或數據庫中的批量數據表)。Flink SQL引擎會根據數據的實際來源自動優化執行計劃。
Flink SQL的流批統一核心在于三點:
- 統一的API和SQL語義:Flink SQL提供一致的查詢構建塊(如窗口、時間處理函數),這些在流處理和批處理中語義一致,確保不同模式下行為的統一性。
- 透明的狀態處理:無論是流處理還是批處理,Flink都能夠保持和恢復狀態,為開發者提供一致的高容錯性體驗。
- 多模態存儲和處理能力:Flink SQL能夠訪問不同存儲介質的數據,這意味著相同的SQL語句可以無縫在流數據和存儲的批量數據上執行。
3.Flink SQL流批統一的代碼示例
以下是一個完整的代碼示例,用Flink來實現流批統一處理。Flink同時從Kafka 和 CSV讀取數據,然后合并查詢再輸出結果:
(1) 代碼示例
代碼中,先配置了Flink的流處理環境和表環境,然后用DDL語句在Flink中注冊了Kafka和文件系統數據源。接著執行了一個SQL查詢來合并來自這兩種數據源的數據,并計算總金額。最后,打印出查詢結果并開始執行Flink作業。
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
public class StreamBatchUnifiedDemo {
public static void main(String[] args) throws Exception {
// 設置流處理的環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
// Kafka 流處理表
String createKafkaSourceDDL = "CREATE TABLE kafka_stream_orders (" +
"order_id STRING," +
"amount DOUBLE)" +
"WITH (" +
"'connector' = 'kafka'," +
"'topic' = 'topic_test'," +
"'properties.bootstrap.servers' = '10.20.1.26:9092'," +
"'format' = 'json'," +
"'scan.startup.mode' = 'latest-offset'" +
")";
tableEnv.executeSql(createKafkaSourceDDL);
// 文件系統批處理表
String createFilesystemSourceDDL = "CREATE TABLE file_batch_orders (" +
"order_id STRING," +
"amount DOUBLE)" +
"WITH (" +
"'connector' = 'filesystem'," +
"'path' = 'file:///Users/yclxiao/Project/bigdata/flink-blog/doc/input_order.csv'," +
"'format' = 'csv'" +
")";
tableEnv.executeSql(createFilesystemSourceDDL);
// 執行統一查詢,計算總金額
Table resultTable = tableEnv.sqlQuery("SELECT SUM(amount) FROM (" +
"SELECT amount FROM kafka_stream_orders " +
"UNION ALL " +
"SELECT amount FROM file_batch_orders)");
// 打印結果
tableEnv.toRetractStream(resultTable, Row.class).print();
// 開始執行程序
env.execute("Stream-Batch Unified Job");
}
}
(2) 執行效果
通過以上示例代碼,可以看出Flink SQL的流批一體設計:相同的SQL語句可以用在流處理和批處理中,而不需要做任何修改。Flink背后的執行引擎會自動根據數據的特性(流或者批)來進行相應的優化執行。
這就是Flink SQL非常強大的地方,它減少了開發者需要寫不同代碼邏輯的需求,簡化了復雜的數據處理流程。
五、總結
Flink SQL是一個非常強大的數據處理工具,可以應對多種復雜的數據處理場景。
本文主要介紹了Flink SQL的基本原理、編碼套路、流批統一,再結合正確的代碼示例進行實踐。希望對你有幫助。
完整代碼地址:https://github.com/yclxiao/flink-blog