前言
Hudi除了支持Spark、Fink寫Hudi外,還支持Java客戶端。本文總結Hudi Java Client如何使用,主要為代碼示例,可以實現讀取Hive表寫Hudi表。當然也支持讀取其他數據源,比如mysql,實現讀取mysql的歷史數據和增量數據寫Hudi。
版本
Hudi 0.12.0
功能支持
支持insert/upsert/delete,暫不支持bulkInsert目前僅支持COW表支持完整的寫Hudi操作,包括rollback、clean、archive等。
代碼
完整代碼已上傳GitHub:https://github.com/dongkelun/hudi-demo/tree/master/java-client。
其中HoodieJavaWriteClientExample是從Hudi源碼里拷貝的,包含了insert/upsert/delte/的代碼示例,JavaClientHive2Hudi是我自己的寫的代碼示例總結,實現了kerberos認證、讀取Hive表Schema作為寫hudi的Schema、讀取Hive表數據寫hudi表,并同步hudi元數據至hive元數據,實現自動創建Hive元數據,當然也支持讀取其他數據源,比如mysql,實現歷史和增量寫。
相比于HoodieJavaWriteClientExample,JavaClientHive2Hudi加了很多配置參數,更貼近實際使用,比如HoodieJavaWriteClientExample的payload為HoodieAvroPayload這只能作為示例使用,JavaClientHive2Hudi使用的為DefaultHoodieRecordPayload它支持預合并和歷史值比較,關于這一點可以參考我之前寫的文章:Hudi preCombinedField 總結(二)-源碼分析,如果只需要預合并功能,可以使用OverwriteWithLatestAvroPayload,這倆分別是Spark SQL 和 Spark DF的默認值,當然都不需要的話,也支持HoodieAvroPayload,代碼里是根據條件判斷需要用哪個payloadClassName。
String payloadClassName = shouldOrdering ? DefaultHoodieRecordPayload.class.getName() :
shouldCombine ? OverwriteWithLatestAvroPayload.class.getName() : HoodieAvroPayload.class.getName();
然后利用反射構造payload,其實這里反射的邏輯就是Hudi Spark源碼里的邏輯。
另一個它更貼近實際使用的原因就是我們項目上就是將Hudi Java Client封裝成了一個NIFI processor,然后用NIFI調度,其性能和穩定性都能夠滿足項目需求,這里的核心邏輯和實際項目中的邏輯是差不多的。關于我們使用Java客戶端的原因是由于歷史原因造成的,因為我們之前還沒有調度Spark、Flink的開發工具(之前用的NIFI),而開發一個新的開發工具的話是需要時間成本的,所以選擇了Java客戶端,我們現在已經將Apache DolphinScheduler作為自己的開發調度工具了,后面會主要使用Spark/Flink,所以現在總結一下Hudi Java Client的使用以及源碼,避免遺忘,也希望對大家有所幫助。
初始化Hudi表
Java Client的代碼更貼近源碼。
initTable主要是根據一些配置信息,生成.hoodie元數據路徑,并生成hoodie.properties元數據文件,該文件里持久化保存了Hudi的一些配置信息。
if (!(fs.exists(path) && fs.exists(hoodiePath))) { //根據Hudi路徑存不存在,判斷Hudi表需不需要初始化
if (Arrays.asList(INSERT_OPERATION, UPSERT_OPERATION).contains(writeOperationType)) {
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(TABLE_TYPE)
.setTableName(targetTable)
.setPayloadClassName(payloadClassName)
.setRecordKeyFields(recordKeyFields)
.setPreCombineField(preCombineField)
.setPartitionFields(partitionFields)
.setBootstrapIndexClass(NoOpBootstrapIndex.class.getName())
.initTable(hadoopConf, tablePath);
} else if (writeOperationType.equals(DELETE_OPERATION)) { //Delete操作,Hudi表必須已經存在
throw new TableNotFoundException(tablePath);
}
}
hoodie.properties
#Properties saved on 2022-10-24T07:40:36.530Z
#Mon Oct 24 15:40:36 CST 2022
hoodie.table.name=test_hudi_target
hoodie.archivelog.folder=archived
hoodie.table.type=COPY_ON_WRITE
hoodie.table.version=5
hoodie.timeline.layout.version=1
hoodie.datasource.write.drop.partition.columns=false
hoodie.table.checksum=1749434190
創建HoodieJavaWriteClient
首先要創建HoodieWriteConfig,主要是hudi的一些配置,比如Schema、表名、payload、索引、clean等一些參數,具體可以自己去了解。
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
.withSchema(writeSchema.toString())
.withParallelism(2, 2).withDeleteParallelism(2)
.forTable(targetTable)
.withWritePayLoad(payloadClassName)
.withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(orderingField).build())
.withIndexConfig(HoodieIndexConfig.newBuilder()
.withIndexType(HoodieIndex.IndexType.BLOOM)
// .bloomIndexPruneByRanges(false) // 1000萬總體時間提升1分鐘
.bloomFilterFPP(0.000001) // 1000萬總體時間提升3分鐘
.fromProperties(indexProperties)
.build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.compactionSmallFileSize(smallFileLimit)
.approxRecordSize(recordSizeEstimate).build())
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(150, 200).build())
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(100).build())
.withStorageConfig(HoodieStorageConfig.newBuilder()
.parquetMaxFileSize(maxFileSize).build())
.build();
writeClient = new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg)
startCommit
返回commitTime,首先會執行rollback,然后創建一個.commit.request,再將commitTime返回。
String newCommitTime = writeClient.startCommit();
generateRecord
這里主要是構造寫hudi需要的數據結構,包含HoodieKey和payLoad,其中delete操作只需要HoodieKey。
public static List<HoodieRecord<HoodieRecordPayload>> generateRecord(ResultSet rs,
Schema writeSchema,
String payloadClassName,
boolean shouldCombine) throws IOException, SQLException {
List<HoodieRecord<HoodieRecordPayload>> list = new ArrayList<>();
while (rs.next()) {
GenericRecord rec = new GenericData.Record(writeSchema);
writeSchema.getFields().forEach(field -> {
try {
rec.put(field.name(), convertValueType(rs, field.name(), field.schema().getType()));
} catch (SQLException e) {
throw new RuntimeException(e);
}
});
String partitionPath = partitionFields == null ? "" : getRecordPartitionPath(rs, writeSchema);
System.out.println(partitionPath);
String rowKey = recordKeyFields == null && writeOperationType.equals(INSERT_OPERATION) ? UUID.randomUUID().toString() : getRecordKey(rs, writeSchema);
HoodieKey key = new HoodieKey(rowKey, partitionPath);
if (shouldCombine) {
Object orderingVal = HoodieAvroUtils.getNestedFieldVal(rec, preCombineField, false, false);
list.add(new HoodieAvroRecord<>(key, createPayload(payloadClassName, rec, (Comparable) orderingVal)));
} else {
list.add(new HoodieAvroRecord<>(key, createPayload(payloadClassName, rec)));
}
}
return list;
}
寫Hudi
最后執行寫Hudi的操作,常用upsert/insert/delete,Java Client也是默認開啟clean等操作的,具體的實現是在HoodieJavaCopyOnWriteTable中。目前還不支持bulkInsert等操作,后面如果我有能力的話,會嘗試提交PR支持。
writeClient.upsert(records, newCommitTime);
writeClient.insert(records, newCommitTime);
writeClient.delete(records, newCommitTime);
同步Hive
最后是同步元數據至Hive,實現在hive中建表,這一步是可選的。這樣可以利用Hive SQL和Spark SQL查詢Hudi表。
/**
* 利用HiveSyncTool同步Hive元數據
* Spark寫Hudi同步hive元數據的源碼就是這樣同步的
*
* @param properties
* @param hiveConf
*/
public static void syncHive(TypedProperties properties, HiveConf hiveConf) {
HiveSyncTool hiveSyncTool = new HiveSyncTool(properties, hiveConf);
hiveSyncTool.syncHoodieTable();
}
public static HiveConf getHiveConf(String hiveSitePath, String coreSitePath, String hdfsSitePath) {
HiveConf configuration = new HiveConf();
configuration.addResource(new Path(hiveSitePath));
configuration.addResource(new Path(coreSitePath));
configuration.addResource(new Path(hdfsSitePath));
return configuration;
}
/**
* 同步Hive元數據的一些屬性配置
* @param basePath
* @return
*/
public static TypedProperties getHiveSyncProperties(String basePath) {
TypedProperties properties = new TypedProperties();
properties.put(HiveSyncConfigHolder.HIVE_SYNC_MODE.key(), HiveSyncMode.HMS.name());
properties.put(HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE.key(), true);
properties.put(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), dbName);
properties.put(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), targetTable);
properties.put(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), basePath);
properties.put(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), MultiPartKeysValueExtractor.class.getName());
properties.put(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), partitionFields);
if (partitionFields != null && !partitionFields.isEmpty()) {
properties.put(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), partitionFields);
}
return properties;
}
與0.9.0版本差異
之前是基于0.9.0版本開發的,本文代碼示例基于0.12.0,核心代碼是一樣的,差異的地方有兩處。
1、0.9.0 clean、archive的參數都是在withCompactionConfig中,現在單獨拎出來2、0.9.0 HiveSyncTool的參數為HiveSyncConfig,現在為TypedProperties。
總結
Hudi Java Client和Spark、Flink一樣都可以實現完整的寫Hudi的邏輯,但是目前功能支持還不完善,比如不支持MOR表,而且性能上也不如Spark、Flink,畢竟Spark、FLink都是集群,但是Hudi Java Client可以集成到其他框架中,比如NIFI,集成起來比較方便,集成到NIFI的好處是,可以通過拖來拽配置參數的形式完成歷史數據和增量數據寫入Hudi。也可以自己實現多線程,提升性能,我們目前測試的性能是Insert可以達到10000條/s,而upsert因為需要讀取索引,還有歷史數據的更新,可能需要重寫整個表,所以當歷史數據比較大且更新占比比較高時,單線程的性能會非常差,但是我們基于源碼改造,將布隆索引和寫數據的部分改為多線程后,性能就會提升很多,當然這也取決于機器的性能,和CPU、內存有關。對于數據量不是很大的ZF數據,一般大表幾十億,性能還是可以滿足要求的。