成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

FlinkSQL 中 Catalog 的使用場景及案例詳解

大數據
Catalog 在 Flink SQL 中是一個元數據管理組件,用于存儲和管理數據庫、表、視 圖、函數等元數據對象的抽象接口。

Catalog 在 Flink SQL 中是一個元數據管理組件,用于存儲和管理數據庫、表、視 圖、函數等元數據對象的抽象接口。它類似于傳統數據庫系統中的元數據倉庫,為 Flink SQL 提供了元數據管理能力。

Catalog 使 Flink 能夠:

  • 以統一的方式訪問不同的外部系統
  • 減少代碼中的硬編 碼配置
  • 實現表元數據的持久化
  • 支持跨會話的元數據共享。

1. Catalog的作用

(1) 管理元數據對象

Catalog 可以管理以下元數據對象: - 數據庫(Database) - 表(Table) - 視圖(View) - 函數(Function) - 分區(Partition)等

(2) 支持多樣化的元數據存儲

Flink 支持多種Catalog 實現,可以連接各種外部元數據系統: - 內存Catalog(默認) - Hive Catalog - JDBC Catalog - 自定義 Catalog

(3) 提供統一的數據訪問接口

無論底層元數據存儲在哪里,都可以通過統一的接口訪問和操作

(4) 簡化元數據管理

開發者可以通過Catalog 注冊永久表,而不是在代碼中重復定義表結構

2. Flink內置的catalog類型

(1) GenericInMemoryCatalog

默認的內存 Catalog,元數據只在單個 Flink 會話中有效,會話結束數據就會丟失。

// 創建內存Catalog 
Catalog inmemory = new GenericInMemoryCatalog("in_memory_catalog"); 
tableEnv.registerCatalog("in_memory_catalog", inmemory); -- SQL 中創建和使用內存Catalog 
CREATE CATALOG my_memory_catalog WITH ( 
'type' = 'generic_in_memory' 
); 
USE CATALOG my_memory_catalog;

(2) HiveCatalog

使用Hive Metastore 存儲元數據,支持持久化,適合生產環境。

// 創建Hive Catalog 
String name = "my_hive_catalog"; 
String defaultDatabase = "default"; 
String hiveConfDir = "/path/to/hive/conf"; 
String version = "2.3.6"; 
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, 
version); 
tableEnv.registerCatalog("my_hive_catalog", hive); -- SQL 中創建和使用Hive Catalog 
CREATE CATALOG my_hive_catalog WITH ( 
'type' = 'hive', 
'default-database' = 'default', 
'hive-conf-dir' = '/path/to/hive/conf', 
'hive-version' = '2.3.6' 
); 
USE CATALOG my_hive_catalog; 
3. JdbcCatalog 
使用關系型數據庫存儲元數據。 
// 創建JDBC Catalog 
String name = "my_jdbc_catalog"; 
String defaultDatabase = "default"; 
String username = "username"; 
String password = "password"; 


String baseUrl = "jdbc:mysql://localhost:3306"; 


JdbcCatalog jdbc = new JdbcCatalog(name, defaultDatabase, username, pas
 sword, baseUrl); 
tableEnv.registerCatalog("my_jdbc_catalog", jdbc); -- SQL中創建和使用JDBC Catalog(Flink 1.15+) 
CREATE CATALOG my_jdbc_catalog WITH ( 
    'type' = 'jdbc', 
    'default-database' = 'default', 
    'username' = 'username', 
    'password' = 'password', 
    'base-url' = 'jdbc:mysql://localhost:3306' 
); 


USE CATALOG my_jdbc_catalog;

(3) 使用Catalog的SQL操作

1. 創建和切換Catalog -- 創建Catalog 
CREATE CATALOG my_catalog WITH ( 
    'type' = 'hive', 
    'hive-conf-dir' = '/path/to/hive/conf' 
); 
 -- 查看所有Catalog 
SHOW CATALOGS; 
 -- 切換當前Catalog 
USE CATALOG my_catalog; 
2. 創建和使用數據庫 -- 創建數據庫 
CREATE DATABASE my_database; 
 -- 查看所有數據庫 
SHOW DATABASES; 
 -- 切換當前數據庫 
USE my_database; 
3. 管理表和視圖 -- 創建表 
CREATE TABLE my_table ( 
    id INT, 
    name STRING, 
    create_time TIMESTAMP(3) 


) WITH ( 
'connector' = 'kafka', 
'topic' = 'my_topic', 
'properties.bootstrap.servers' = 'localhost:9092', 
'format' = 'json' 
); -- 查看所有表 
SHOW TABLES; -- 查看表結構 
DESCRIBE my_table; 
4. 管理函數 -- 創建自定義函數 
CREATE FUNCTION my_function AS 'com.example.MyFunction'; -- 查看所有函數 
SHOW FUNCTIONS;

(4) Catalog 的實際應用示

// 跨會話持久化元數據 
// 會話1:注冊Hive Catalog和表 
StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecuti
 onEnvironment(); 
StreamTableEnvironment tEnv1 = StreamTableEnvironment.create(env1); 
tEnv1.executeSql("CREATE CATALOG hive_catalog WITH ('type' = 'hive')"); 
tEnv1.executeSql("USE CATALOG hive_catalog"); 
tEnv1.executeSql("CREATE DATABASE IF NOT EXISTS db1"); 
tEnv1.executeSql("USE db1"); 
tEnv1.executeSql( 
"CREATE TABLE IF NOT EXISTS orders (" + 
"  order_id STRING, " + 
"  price DECIMAL(10, 2)" + 
") WITH (" + 
"  'connector' = 'kafka', " + 
"  'topic' = 'orders'" + 
")"); 
// 會話2:直接使用之前注冊的表 
StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecuti
 onEnvironment(); 
StreamTableEnvironment tEnv2 = StreamTableEnvironment.create(env2); 
tEnv2.executeSql("USE CATALOG hive_catalog"); 
tEnv2.executeSql("USE db1"); 

// 不需要重新定義表結構,可以直接查詢 
tEnv2.executeSql("SELECT * FROM orders"); 
使用不同類型的Catalog實現多源數據集成 
// 注冊多個Catalog訪問不同系統 
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutio
 nEnvironment(); 
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); 
// 注冊Hive Catalog 
tEnv.executeSql("CREATE CATALOG hive_catalog WITH ('type' = 'hive')"); 
// 注冊JDBC Catalog 
tEnv.executeSql( 
"CREATE CATALOG jdbc_catalog WITH (" + 
"  'type' = 'jdbc', " + 
"  'default-database' = 'default', " + 
"  'username' = 'user', " + 
"  'password' = 'password', " + 
"  'base-url' = 'jdbc:mysql://localhost:3306'" + 
")"); 
// 從不同Catalog中的表關聯查詢 
tEnv.executeSql( 
"SELECT o.order_id, o.price, c.name " + 
"FROM hive_catalog.db1.orders o " + 
"JOIN jdbc_catalog.default.customers c " + 
"ON o.customer_id = c.id" 
); 
責任編輯:趙寧寧 來源: 大數據技能圈
相關推薦

2013-12-25 16:03:39

GitGit 命令

2022-07-29 07:48:15

HTTP常用狀態碼

2025-02-07 14:33:04

2020-02-14 13:50:32

JavaScript前端技術

2023-05-15 08:50:58

ContextGolang

2024-10-10 08:46:28

2024-10-06 12:35:50

2020-09-04 13:30:43

Java自定義代碼

2023-05-16 07:47:18

RabbitMQ消息隊列系統

2013-07-10 15:52:17

fragmentAndroid

2024-04-16 12:13:07

usingC#開發

2024-01-30 09:43:43

Java緩存技術

2024-11-12 06:27:16

Python列表元組

2022-12-08 11:54:55

元宇宙

2018-08-15 09:48:27

數據庫Redis應用場景

2009-08-03 11:38:57

linux at命令詳linux at命令

2025-02-11 09:49:12

2011-05-16 15:49:58

JAVA

2009-05-18 13:07:44

類隱藏Java關鍵字

2024-09-06 11:52:47

點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 久久99精品久久久久久国产越南 | 欧美亚洲国产日韩 | 真人毛片| 亚洲日本国产 | 国产成人免费视频 | 国产成人精品免费视频大全最热 | 免费在线观看av | 免费视频一区 | 日韩视频中文字幕 | 精品国产青草久久久久福利 | 国产一区 | 91.xxx.高清在线 | 成人免费一区二区三区视频网站 | 欧美黑人一区二区三区 | 99视频在线免费观看 | 麻豆hd| 精品美女久久久 | 亚洲成人av一区二区 | a级在线免费观看 | 国产在线永久免费 | 91精品国产一区二区三区 | 日韩视频在线免费观看 | 国产精品视频播放 | 国产中文字幕亚洲 | 美女视频一区 | 国产1区2区在线观看 | 天天夜夜操| 自拍偷拍中文字幕 | 免费一区二区三区在线视频 | 韩国理论电影在线 | 精品av| 欧美日韩在线综合 | 国产精品视频网站 | 亚洲在线看 | 在线观看视频你懂得 | 欧美日韩久 | 久久久久国产视频 | 狠狠躁18三区二区一区 | 在线国产一区 | 日韩欧美在线视频 | 国产乱码精品一区二区三区中文 |