成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Hudi Java Client總結之讀取Hive寫Hudi代碼

開發 前端
initTable主要是根據一些配置信息,生成.hoodie元數據路徑,并生成hoodie.properties元數據文件,該文件里持久化保存了Hudi的一些配置信息。

前言

Hudi除了支持Spark、Fink寫Hudi外,還支持Java客戶端。本文總結Hudi Java Client如何使用,主要為代碼示例,可以實現讀取Hive表寫Hudi表。當然也支持讀取其他數據源,比如mysql,實現讀取mysql的歷史數據和增量數據寫Hudi。

版本

Hudi 0.12.0

功能支持

支持insert/upsert/delete,暫不支持bulkInsert目前僅支持COW表支持完整的寫Hudi操作,包括rollback、clean、archive等。

代碼

完整代碼已上傳GitHub:https://github.com/dongkelun/hudi-demo/tree/master/java-client。

其中HoodieJavaWriteClientExample是從Hudi源碼里拷貝的,包含了insert/upsert/delte/的代碼示例,JavaClientHive2Hudi是我自己的寫的代碼示例總結,實現了kerberos認證、讀取Hive表Schema作為寫hudi的Schema、讀取Hive表數據寫hudi表,并同步hudi元數據至hive元數據,實現自動創建Hive元數據,當然也支持讀取其他數據源,比如mysql,實現歷史和增量寫。

相比于HoodieJavaWriteClientExample,JavaClientHive2Hudi加了很多配置參數,更貼近實際使用,比如HoodieJavaWriteClientExample的payload為HoodieAvroPayload這只能作為示例使用,JavaClientHive2Hudi使用的為DefaultHoodieRecordPayload它支持預合并和歷史值比較,關于這一點可以參考我之前寫的文章:Hudi preCombinedField 總結(二)-源碼分析,如果只需要預合并功能,可以使用OverwriteWithLatestAvroPayload,這倆分別是Spark SQL 和 Spark DF的默認值,當然都不需要的話,也支持HoodieAvroPayload,代碼里是根據條件判斷需要用哪個payloadClassName。

String payloadClassName = shouldOrdering ? DefaultHoodieRecordPayload.class.getName() :
shouldCombine ? OverwriteWithLatestAvroPayload.class.getName() : HoodieAvroPayload.class.getName();

然后利用反射構造payload,其實這里反射的邏輯就是Hudi Spark源碼里的邏輯。

另一個它更貼近實際使用的原因就是我們項目上就是將Hudi Java Client封裝成了一個NIFI processor,然后用NIFI調度,其性能和穩定性都能夠滿足項目需求,這里的核心邏輯和實際項目中的邏輯是差不多的。關于我們使用Java客戶端的原因是由于歷史原因造成的,因為我們之前還沒有調度Spark、Flink的開發工具(之前用的NIFI),而開發一個新的開發工具的話是需要時間成本的,所以選擇了Java客戶端,我們現在已經將Apache DolphinScheduler作為自己的開發調度工具了,后面會主要使用Spark/Flink,所以現在總結一下Hudi Java Client的使用以及源碼,避免遺忘,也希望對大家有所幫助。

初始化Hudi表

Java Client的代碼更貼近源碼。

initTable主要是根據一些配置信息,生成.hoodie元數據路徑,并生成hoodie.properties元數據文件,該文件里持久化保存了Hudi的一些配置信息。

if (!(fs.exists(path) && fs.exists(hoodiePath))) { //根據Hudi路徑存不存在,判斷Hudi表需不需要初始化
if (Arrays.asList(INSERT_OPERATION, UPSERT_OPERATION).contains(writeOperationType)) {
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(TABLE_TYPE)
.setTableName(targetTable)
.setPayloadClassName(payloadClassName)
.setRecordKeyFields(recordKeyFields)
.setPreCombineField(preCombineField)
.setPartitionFields(partitionFields)
.setBootstrapIndexClass(NoOpBootstrapIndex.class.getName())
.initTable(hadoopConf, tablePath);
} else if (writeOperationType.equals(DELETE_OPERATION)) { //Delete操作,Hudi表必須已經存在
throw new TableNotFoundException(tablePath);
}
}

hoodie.properties

#Properties saved on 2022-10-24T07:40:36.530Z
#Mon Oct 24 15:40:36 CST 2022
hoodie.table.name=test_hudi_target
hoodie.archivelog.folder=archived
hoodie.table.type=COPY_ON_WRITE
hoodie.table.version=5
hoodie.timeline.layout.version=1
hoodie.datasource.write.drop.partition.columns=false
hoodie.table.checksum=1749434190

創建HoodieJavaWriteClient

首先要創建HoodieWriteConfig,主要是hudi的一些配置,比如Schema、表名、payload、索引、clean等一些參數,具體可以自己去了解。

HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
.withSchema(writeSchema.toString())
.withParallelism(2, 2).withDeleteParallelism(2)
.forTable(targetTable)
.withWritePayLoad(payloadClassName)
.withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(orderingField).build())
.withIndexConfig(HoodieIndexConfig.newBuilder()
.withIndexType(HoodieIndex.IndexType.BLOOM)
// .bloomIndexPruneByRanges(false) // 1000萬總體時間提升1分鐘
.bloomFilterFPP(0.000001) // 1000萬總體時間提升3分鐘
.fromProperties(indexProperties)
.build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.compactionSmallFileSize(smallFileLimit)
.approxRecordSize(recordSizeEstimate).build())
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(150, 200).build())
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(100).build())
.withStorageConfig(HoodieStorageConfig.newBuilder()
.parquetMaxFileSize(maxFileSize).build())
.build();

writeClient = new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg)

startCommit

返回commitTime,首先會執行rollback,然后創建一個.commit.request,再將commitTime返回。

String newCommitTime = writeClient.startCommit();

generateRecord

這里主要是構造寫hudi需要的數據結構,包含HoodieKey和payLoad,其中delete操作只需要HoodieKey。

public static List<HoodieRecord<HoodieRecordPayload>> generateRecord(ResultSet rs,
Schema writeSchema,
String payloadClassName,
boolean shouldCombine) throws IOException, SQLException {
List<HoodieRecord<HoodieRecordPayload>> list = new ArrayList<>();

while (rs.next()) {
GenericRecord rec = new GenericData.Record(writeSchema);

writeSchema.getFields().forEach(field -> {
try {
rec.put(field.name(), convertValueType(rs, field.name(), field.schema().getType()));
} catch (SQLException e) {
throw new RuntimeException(e);
}
});

String partitionPath = partitionFields == null ? "" : getRecordPartitionPath(rs, writeSchema);
System.out.println(partitionPath);
String rowKey = recordKeyFields == null && writeOperationType.equals(INSERT_OPERATION) ? UUID.randomUUID().toString() : getRecordKey(rs, writeSchema);
HoodieKey key = new HoodieKey(rowKey, partitionPath);
if (shouldCombine) {
Object orderingVal = HoodieAvroUtils.getNestedFieldVal(rec, preCombineField, false, false);
list.add(new HoodieAvroRecord<>(key, createPayload(payloadClassName, rec, (Comparable) orderingVal)));
} else {
list.add(new HoodieAvroRecord<>(key, createPayload(payloadClassName, rec)));
}

}
return list;
}

寫Hudi

最后執行寫Hudi的操作,常用upsert/insert/delete,Java Client也是默認開啟clean等操作的,具體的實現是在HoodieJavaCopyOnWriteTable中。目前還不支持bulkInsert等操作,后面如果我有能力的話,會嘗試提交PR支持。

writeClient.upsert(records, newCommitTime);
writeClient.insert(records, newCommitTime);
writeClient.delete(records, newCommitTime);

同步Hive

最后是同步元數據至Hive,實現在hive中建表,這一步是可選的。這樣可以利用Hive SQL和Spark SQL查詢Hudi表。

/**
* 利用HiveSyncTool同步Hive元數據
* Spark寫Hudi同步hive元數據的源碼就是這樣同步的
*
* @param properties
* @param hiveConf
*/
public static void syncHive(TypedProperties properties, HiveConf hiveConf) {
HiveSyncTool hiveSyncTool = new HiveSyncTool(properties, hiveConf);
hiveSyncTool.syncHoodieTable();
}

public static HiveConf getHiveConf(String hiveSitePath, String coreSitePath, String hdfsSitePath) {
HiveConf configuration = new HiveConf();
configuration.addResource(new Path(hiveSitePath));
configuration.addResource(new Path(coreSitePath));
configuration.addResource(new Path(hdfsSitePath));

return configuration;
}

/**
* 同步Hive元數據的一些屬性配置
* @param basePath
* @return
*/
public static TypedProperties getHiveSyncProperties(String basePath) {
TypedProperties properties = new TypedProperties();
properties.put(HiveSyncConfigHolder.HIVE_SYNC_MODE.key(), HiveSyncMode.HMS.name());
properties.put(HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE.key(), true);
properties.put(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), dbName);
properties.put(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), targetTable);
properties.put(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), basePath);
properties.put(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), MultiPartKeysValueExtractor.class.getName());
properties.put(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), partitionFields);
if (partitionFields != null && !partitionFields.isEmpty()) {
properties.put(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), partitionFields);
}

return properties;
}

與0.9.0版本差異

之前是基于0.9.0版本開發的,本文代碼示例基于0.12.0,核心代碼是一樣的,差異的地方有兩處。

1、0.9.0 clean、archive的參數都是在withCompactionConfig中,現在單獨拎出來2、0.9.0 HiveSyncTool的參數為HiveSyncConfig,現在為TypedProperties。

總結

Hudi Java Client和Spark、Flink一樣都可以實現完整的寫Hudi的邏輯,但是目前功能支持還不完善,比如不支持MOR表,而且性能上也不如Spark、Flink,畢竟Spark、FLink都是集群,但是Hudi Java Client可以集成到其他框架中,比如NIFI,集成起來比較方便,集成到NIFI的好處是,可以通過拖來拽配置參數的形式完成歷史數據和增量數據寫入Hudi。也可以自己實現多線程,提升性能,我們目前測試的性能是Insert可以達到10000條/s,而upsert因為需要讀取索引,還有歷史數據的更新,可能需要重寫整個表,所以當歷史數據比較大且更新占比比較高時,單線程的性能會非常差,但是我們基于源碼改造,將布隆索引和寫數據的部分改為多線程后,性能就會提升很多,當然這也取決于機器的性能,和CPU、內存有關。對于數據量不是很大的ZF數據,一般大表幾十億,性能還是可以滿足要求的。

責任編輯:武曉燕 來源: 倫少的博客
相關推薦

2022-11-01 07:43:30

2022-11-03 07:22:42

2023-02-26 00:12:10

Hadoop數據湖存儲

2022-10-17 07:51:31

Hudi異常HDFS

2024-04-26 07:36:42

Hudi 1.0數據湖倉數據查詢

2022-10-24 00:26:51

大數據Hadoop存儲層

2021-08-31 10:07:16

Flink Hud數據湖阿里云

2022-12-08 07:17:49

2022-12-23 16:52:22

Lakehouse數據湖

2022-07-20 11:47:18

數據

2022-10-17 10:48:50

Hudi大數據Hadoop

2020-03-26 10:05:18

大數據IT互聯網

2023-07-19 16:22:00

Hudi機器學習

2021-09-13 14:19:03

HudiLakehouse阿里云

2023-12-14 13:01:00

Hudivivo

2023-09-05 07:22:17

Hudi數據存儲

2025-06-09 09:57:16

2021-09-13 13:46:29

Apache HudiB 站數據湖

2022-06-09 14:19:46

順豐數據集成Flink

2022-06-08 13:25:51

數據
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 国产精品久久久久久久毛片 | 在线视频日韩 | 美女毛片免费看 | 337p日韩| 国产精品爱久久久久久久 | 欧美一级www片免费观看 | 久久成人免费 | 欧美一级www片免费观看 | 国产区精品视频 | 欧美在线视频网 | 精品免费国产 | www日日日 | 亚洲精品乱码久久久久久9色 | 国产午夜精品视频 | 盗摄精品av一区二区三区 | 亚洲激情一区二区三区 | 亚洲夜夜爽 | 羞羞视频在线观看 | 我想看国产一级毛片 | 天天干天天草 | 91欧美| 国产综合视频 | 日韩精品一区在线 | 国产免费麻豆视频 | caoporn地址 | 一级片在线观看 | 日韩精品二区 | 久久婷婷色 | 国产高清精品一区二区三区 | 亚洲视频免费 | 欧美美女爱爱视频 | 日韩成人免费中文字幕 | 99久久精品国产一区二区三区 | 鲁一鲁资源影视 | 日韩久草 | 精品欧美一区二区精品久久久 | cao视频 | 看片国产 | 国产中文字幕亚洲 | 青草久久免费视频 | av黄色免费在线观看 |