成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

聊聊 Flink SQL增量查詢Hudi表

數據庫 其他數據庫
read.end-commit 增量查詢結束時間 不指定該參數則默認讀取到最新的記錄,該參數一般只適用于批讀,因為流讀一般的需求是查詢所有的增量數據

?官網文檔

地址:https://hudi.apache.org/cn/docs/querying_data#incremental-query

參數

  • read.start-commit 增量查詢開始時間 對于流讀,如果不指定該值,默認取最新的instantTime,也就是流讀默認從最新的instantTime開始讀(包含最新的)。對于批讀,如果不指定該參數,只指定read.end-commit,則實現時間旅行的功能,可查詢歷史記錄
  • read.end-commit 增量查詢結束時間 不指定該參數則默認讀取到最新的記錄,該參數一般只適用于批讀,因為流讀一般的需求是查詢所有的增量數據
  • read.streaming.enabled 是否流讀 默認false
  • read.streaming.check-interval  流讀的檢查時間間隔,單位秒(s),默認值60,也就是一分鐘查詢范圍 [BEGIN_INSTANTTIME,END_INSTANTTIME],既包含開始時間又包含結束時間,對于默認值可參考上面的參數說明

版本

建表造數:

  • Hudi 0.9.0
  • Spark 2.4.5

我這里建表造數使用Hudi Spark SQL 0.9.0,目的是為了模擬項目上用Java Client和Spark SQL創建的Hudi表,以驗證Hudi Flink SQL增量查詢時是否兼容舊版本的Hudi表(大家沒有這種需求的,可以使用任何方式正常造數)

查詢

  • Hudi 0.13.0-SNAPSHOT
  • Flink 1.14.3 (增量查詢)
  • Spark 3.1.2 (主要是為了使用Call Procedures命令查看commit信息)

建表造數

-- Spark SQL Hudi 0.9.0
create table hudi.test_flink_incremental (
id int,
name string,
price double,
ts long,
dt string
) using hudi
partitioned by (dt)
options (
primaryKey = 'id',
preCombineField = 'ts',
type = 'cow'
);

insert into hudi.test_flink_incremental values (1,'a1', 10, 1000, '2022-11-25');
insert into hudi.test_flink_incremental values (2,'a2', 20, 2000, '2022-11-25');
update hudi.test_flink_incremental set name='hudi2_update' where id = 2;
insert into hudi.test_flink_incremental values (3,'a3', 30, 3000, '2022-11-26');
insert into hudi.test_flink_incremental values (4,'a4', 40, 4000, '2022-12-26');

用show_commits看一下有哪些commits(這里查詢用的是Hudi的master,因為show_commits是在0.11.0版本開始支持的,也可以通過使用hadoop命令查看.hoodie文件夾下的.commit文件)

call show_commits(table => 'hudi.test_flink_incremental');
20221205152736
20221205152723
20221205152712
20221205152702
20221205152650

Flink SQL創建Hudi內存表

CREATE TABLE test_flink_incremental (
id int PRIMARY KEY NOT ENFORCED,
name VARCHAR(10),
price double,
ts bigint,
dt VARCHAR(10)
)
PARTITIONED BY (dt)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://cluster1/warehouse/tablespace/managed/hive/hudi.db/test_flink_incremental'
);

建表時不指定增量查詢相關的參數,我們在查詢時動態指定,這樣比較靈活。動態指定參數方法,在查詢語句后面加上如下形式的語句

/*+ 
options(
'read.start-commit' = '20221205152723',
'read.end-commit'='20221205152736'
)
*/

批讀

Flink SQL讀Hudi有兩種模式:批讀和流讀。默認批讀,先看一下批讀的增量查詢

驗證是否包含起始時間和默認結束時間

select * from test_flink_incremental 
/*+
options(
'read.start-commit' = '20221205152723' --起始時間對應id=3的記錄
)
*/

結果包含起始時間,不指定結束時間默認讀到最新的數據

id   name     price        ts                 dt
4 a4 40.0 4000 dt=2022-12-26
3 a3 30.0 3000 dt=2022-11-26

驗證是否包含結束時間

select * from test_flink_incremental 
/*+
options(
'read.start-commit' = '20221205152712', --起始時間對應id=2的記錄
'read.end-commit'='20221205152723' --結束時間對應id=3的記錄
)
*/

結果包含結束時間

id           name        price       ts                 dt
3 a3 30.0 3000 dt=2022-11-26
2 hudi2_update 20.0 2000 dt=2022-11-25

驗證默認開始時間

這種情況是指定結束時間,但不指定開始時間,如果都不指定,則讀表所有的最新版本的記錄。

select * from test_flink_incremental 
/*+
options(
'read.end-commit'='20221205152712' --結束時間對應id=2的更新記錄
)
*/

結果:只查詢end-commit對應的記錄

id           name        price       ts                 dt
2 hudi2_update 20.0 2000 dt=2022-11-25

時間旅行(查詢歷史記錄)

驗證是否可以查詢歷史記錄,我們更新id為2的name,更新前name為a2,更新后為hudi2_update,我們驗證一下,是否可以通過FlinkSQL查詢Hudi歷史記錄,預期結果查出id=2,name=a2

select * from test_flink_incremental 
/*+
options(
'read.end-commit'='20221205152702' --結束時間對應id=2的歷史記錄
)
*/

結果:可以正確查詢歷史記錄

id           name        price       ts                 dt
2 a2 20.0 2000 dt=2022-11-25

流讀

開啟流讀的參數:

read.streaming.enabled = true

流讀不需要設置結束時間,因為一般的需求是讀所有的增量數據,我們只需要驗證開始時間就好了

驗證默認開始時間

select * from test_flink_incremental 
/*+
options(
'read.streaming.enabled'='true',
'read.streaming.check-interval' = '4'
)
*/

結果:從最新的instantTime開始增量讀取,也就是默認的read.start-commit為最新的instantTime

id   name     price        ts                 dt
4 a4 40.0 4000 dt=2022-12-26

驗證指定開始時間

select * from test_flink_incremental 
/*+
options(
'read.streaming.enabled'='true',
'read.streaming.check-interval' = '4',
'read.start-commit' = '20221205152712'
)
*/

結果:

id           name        price       ts                 dt
2 hudi2_update 20.0 2000 dt=2022-11-25
3 a3 30.0 3000 dt=2022-11-26
4 a4 40.0 4000 dt=2022-11-26

如果想第一次查詢全部的歷史數據,可以將start-commit設置的早一點,比如設置到去年:'read.start-commit' = '20211205152712'

select * from test_flink_incremental 
/*+
options(
'read.streaming.enabled'='true',
'read.streaming.check-interval' = '4',
'read.start-commit' = '20211205152712'
)
*/
id           name        price       ts                 dt
1 a1 10.0 1000 dt=2022-11-25
2 hudi2_update 20.0 2000 dt=2022-11-25
3 a3 30.0 3000 dt=2022-11-26
4 a4 40.0 4000 dt=2022-11-26

驗證流讀的連續性

驗證新的增量數據進來,是否可以持續消費Hudi增量數據,驗證數據的準確一致性,為了方便驗證,我可以使用Flink SQL增量流讀Hudi表然后Sink到MySQL表中,最后通過讀取MySQL表中的數據驗證數據的準確性

Flink SQL讀寫MySQL需要配置jar包,將flink-connector-jdbc_2.12-1.14.3.jar?放到lib?下即可,下載地址:https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.12/1.14.3/flink-connector-jdbc_2.12-1.14.3.jar

先在MySQL中創建一張Sink表

-- MySQL
CREATE TABLE `test_sink` (
`id` int(11),
`name` text DEFAULT NULL,
`price` int(11),
`ts` int(11),
`dt` text DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Flink中創建對應的sink表

create table test_sink (
id int,
name string,
price double,
ts bigint,
dt string
) with (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://192.468.44.128:3306/hudi?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8',
'username' = 'root',
'password' = 'root-123',
'table-name' = 'test_sink',
'sink.buffer-flush.max-rows' = '1'
);

然后流式增量讀取Hudi表Sink Mysql

insert into test_sink
select * from test_flink_incremental
/*+
options(
'read.streaming.enabled'='true',
'read.streaming.check-interval' = '4',
'read.start-commit' = '20221205152712'
)
*/

這樣會起一個長任務,一直處于running狀態,我們可以在yarn-session界面上驗證這一點

圖片

然后先在MySQL中驗證一下歷史數據的準確性

圖片

再利用Spark SQL往source表插入兩條數據

-- Spark SQL
insert into hudi.test_flink_incremental values (5,'a5', 50, 5000, '2022-12-07');
insert into hudi.test_flink_incremental values (6,'a6', 60, 6000, '2022-12-07');

我們增量讀取的間隔設置的4s,成功插入數據等待4s后,再在MySQL表中驗證一下數據

圖片

發現新增的數據已經成功Sink到MySQL中了,并且數據沒有重復

最后驗證一下更新的增量數據,Spark SQL更新Hudi source表

-- Spark SQL
update hudi.test_flink_incremental set name='hudi5_update' where id = 5;

繼續驗證結果

圖片

結果是更新的增量數據也會insert到MySQL中的sink表,但是不會更新原來的數據

那如果想實現更新的效果呢?我們需要在MySQL和Flink的sink表中加上主鍵字段,兩者缺一不可,如下:

-- MySQL
CREATE TABLE `test_sink` (
`id` int(11),
`name` text DEFAULT NULL,
`price` int(11),
`ts` int(11),
`dt` text DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- Flink SQL
create table test_sink (
id int PRIMARY KEY NOT ENFORCED,
name string,
price double,
ts bigint,
dt string
) with (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://192.468.44.128:3306/hudi?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8',
'username' = 'root',
'password' = 'root-123',
'table-name' = 'test_sink',
'sink.buffer-flush.max-rows' = '1'
);

將剛才起的長任務關掉,重新執行剛才的insert語句,先跑一下歷史數據,最后再驗證一下增量效果

-- Spark SQL
update hudi.test_flink_incremental set name='hudi6_update' where id = 6;
insert into hudi.test_flink_incremental values (7,'a7', 70, 7000, '2022-12-07');

可以看到,達到了預期效果,對于id=6的執行更新操作,對于id=7的執行插入操作。

圖片

責任編輯:武曉燕 來源: 倫少的博客
相關推薦

2022-11-03 07:22:42

2022-11-01 07:43:30

2023-08-29 09:46:12

SQLCTE遞歸

2024-02-27 08:05:32

Flink分區機制數據傳輸

2021-08-31 10:07:16

Flink Hud數據湖阿里云

2024-01-29 08:07:42

FlinkYARN架構

2024-08-05 00:10:00

2023-02-26 00:12:10

Hadoop數據湖存儲

2024-04-09 07:50:59

Flink語義Watermark

2022-05-09 09:03:04

SQL數據流數據

2010-09-26 15:23:24

SQL語句

2010-09-28 15:34:05

SQL表結構

2024-02-04 09:00:00

向量查詢數據檢索MyScale

2010-09-28 10:53:53

SQL表結構

2010-10-21 14:43:46

SQL Server查

2010-11-11 14:36:11

SQL Server系

2021-08-31 07:54:24

SQLDblink查詢

2010-09-25 16:42:45

sql語句

2022-06-09 14:19:46

順豐數據集成Flink

2021-11-09 06:55:03

SQLServer排序
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 国产伦精品一区二区三区在线 | 日韩在线国产精品 | 亚洲精品2 | 日本不卡一区 | 欧美一二三 | 日韩在线xx | av永久| 国产亚洲黄色片 | 亚洲色图在线观看 | 国产一二三区精品视频 | 国产精品自产拍在线观看蜜 | 亚洲在线成人 | 手机在线不卡av | 日韩精品免费视频 | 国产在线观看免费 | 欧美午夜影院 | 亚洲网址 | 亚洲精品国产电影 | 亚洲国产精品久久久 | 一级片av | 免费成人毛片 | 中文字幕国产精品 | 男人的天堂久久 | 少妇性l交大片免费一 | 99re在线播放 | 国产精品久久久久久久久久 | 国产黄色av电影 | 亚洲欧美在线观看 | 四虎影院在线观看免费视频 | 午夜三区| 日韩成人av在线 | 久久久久久免费观看 | 亚洲精品久久嫩草网站秘色 | 欧美国产日韩在线 | 精品国产一二三区 | 伊人伊人伊人 | 日韩在线观看中文字幕 | 黑人精品欧美一区二区蜜桃 | 欧美成人免费电影 | av国产精品毛片一区二区小说 | 91成人在线视频 |