From 389c702a3f957730c14503ac131e2f82563fc303 Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Thu, 22 Feb 2024 13:39:17 +0800 Subject: [PATCH] [improvement](hudi)Obtain partition information through HMS's API (#30962) When a Hudi table is synchronized to HMS, the partition information is also synchronized, so even if the metastore is not enabled in the Hudi table (for example, if the Metastore is false for a Hudi table created with Flink), you can still obtain the partition information through the HMS API. --- docs/en/docs/lakehouse/multi-catalog/hudi.md | 6 +++++ .../docs/lakehouse/multi-catalog/hudi.md | 6 +++++ .../datasource/hive/HMSExternalTable.java | 11 ++++++++ .../source/HudiCachedPartitionProcessor.java | 27 ++++++++++++++++--- .../datasource/hudi/source/HudiScanNode.java | 8 ++++-- 5 files changed, 52 insertions(+), 6 deletions(-) diff --git a/docs/en/docs/lakehouse/multi-catalog/hudi.md b/docs/en/docs/lakehouse/multi-catalog/hudi.md index 52892db2df..a52c2370ce 100644 --- a/docs/en/docs/lakehouse/multi-catalog/hudi.md +++ b/docs/en/docs/lakehouse/multi-catalog/hudi.md @@ -55,6 +55,12 @@ CREATE CATALOG hudi PROPERTIES ( ); ``` +Optional configuration parameters: + +|name|description|default| +|---|---|---| +|use_hive_sync_partition|Whether to read the partition information that Hudi has synchronized to the Hive Metastore (HMS) instead of listing partitions through the Hudi API; if HMS returns no partitions, the Hudi API is used as a fallback|false| + ## Column Type Mapping Same as that in Hive Catalogs. See the relevant section in [Hive](./hive.md). 
diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md b/docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md index b619283cac..38bb26d3bc 100644 --- a/docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md +++ b/docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md @@ -55,6 +55,12 @@ CREATE CATALOG hudi PROPERTIES ( ); ``` +可选配置参数: + +|参数名|说明|默认值| +|---|---|---| +|use_hive_sync_partition|使用hms已同步的分区数据|false| + ## 列类型映射 和 Hive Catalog 一致,可参阅 [Hive Catalog](./hive.md) 中 **列类型映射** 一节。 diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index c841c749fb..62b2c35b8c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -104,6 +104,9 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI private static final String SPARK_STATS_MAX_LEN = ".avgLen"; private static final String SPARK_STATS_HISTOGRAM = ".histogram"; + private static final String USE_HIVE_SYNC_PARTITION = "use_hive_sync_partition"; + + static { SUPPORTED_HIVE_FILE_FORMATS = Sets.newHashSet(); SUPPORTED_HIVE_FILE_FORMATS.add("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"); @@ -227,6 +230,14 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI return "org.apache.hudi.hadoop.HoodieParquetInputFormat".equals(inputFormatName); } + /** + * Some data lakes (such as Hudi) will synchronize their partition information to HMS, + * then we can quickly obtain the partition information of the table from HMS. + */ + public boolean useHiveSyncPartition() { + return Boolean.parseBoolean(catalog.getProperties().getOrDefault(USE_HIVE_SYNC_PARTITION, "false")); + } + /** * Now we only support three file input format hive tables: parquet/orc/text. * Support managed_table and external_table. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java index eb09c5efb5..90c89dbbda 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java @@ -21,6 +21,7 @@ import org.apache.doris.common.Config; import org.apache.doris.datasource.CacheException; import org.apache.doris.datasource.TablePartitionValues; import org.apache.doris.datasource.TablePartitionValues.TablePartitionKey; +import org.apache.doris.datasource.hive.HMSExternalCatalog; import org.apache.doris.datasource.hive.HMSExternalTable; import com.google.common.base.Preconditions; @@ -31,6 +32,8 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Arrays; import java.util.List; @@ -39,6 +42,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; public class HudiCachedPartitionProcessor extends HudiPartitionProcessor { + private static final Logger LOG = LoggerFactory.getLogger(HudiCachedPartitionProcessor.class); private final long catalogId; private final Executor executor; private final LoadingCache partitionCache; @@ -81,7 +85,7 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor { } public TablePartitionValues getSnapshotPartitionValues(HMSExternalTable table, - HoodieTableMetaClient tableMetaClient, String timestamp) { + HoodieTableMetaClient tableMetaClient, String timestamp, boolean useHiveSyncPartition) { Preconditions.checkState(catalogId == table.getCatalog().getId()); Option partitionColumns = 
tableMetaClient.getTableConfig().getPartitionFields(); if (!partitionColumns.isPresent()) { @@ -94,7 +98,7 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor { } long lastTimestamp = Long.parseLong(lastInstant.get().getTimestamp()); if (Long.parseLong(timestamp) == lastTimestamp) { - return getPartitionValues(table, tableMetaClient); + return getPartitionValues(table, tableMetaClient, useHiveSyncPartition); } List partitionNameAndValues = getPartitionNamesBeforeOrEquals(timeline, timestamp); List partitionNames = Arrays.asList(partitionColumns.get()); @@ -105,7 +109,8 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor { return partitionValues; } - public TablePartitionValues getPartitionValues(HMSExternalTable table, HoodieTableMetaClient tableMetaClient) + public TablePartitionValues getPartitionValues(HMSExternalTable table, HoodieTableMetaClient tableMetaClient, + boolean useHiveSyncPartition) throws CacheException { Preconditions.checkState(catalogId == table.getCatalog().getId()); Option partitionColumns = tableMetaClient.getTableConfig().getPartitionFields(); @@ -137,7 +142,21 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor { if (lastTimestamp <= lastUpdateTimestamp) { return partitionValues; } - List partitionNames = getAllPartitionNames(tableMetaClient); + HMSExternalCatalog catalog = (HMSExternalCatalog) table.getCatalog(); + List partitionNames; + if (useHiveSyncPartition) { + // When a Hudi table is synchronized to HMS, the partition information is also synchronized, + // so even if the metastore is not enabled in the Hudi table + // (for example, if the Metastore is false for a Hudi table created with Flink), + // we can still obtain the partition information through the HMS API. 
+ partitionNames = catalog.getClient().listPartitionNames(table.getDbName(), table.getName()); + if (partitionNames.size() == 0) { + LOG.warn("Failed to get partitions from hms api, switch it from hudi api."); + partitionNames = getAllPartitionNames(tableMetaClient); + } + } else { + partitionNames = getAllPartitionNames(tableMetaClient); + } List partitionColumnsList = Arrays.asList(partitionColumns.get()); partitionValues.cleanPartitions(); partitionValues.addPartitions(partitionNames, diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java index 144d308140..4f4b1c3e8f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java @@ -83,6 +83,8 @@ public class HudiScanNode extends HiveScanNode { private final AtomicLong noLogsSplitNum = new AtomicLong(0); + private final boolean useHiveSyncPartition; + /** * External file scan node for Query Hudi table * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv @@ -102,6 +104,7 @@ public class HudiScanNode extends HiveScanNode { LOG.debug("Hudi table {} is a mor table, and will use JNI to read data in BE", hmsTable.getName()); } } + useHiveSyncPartition = hmsTable.useHiveSyncPartition(); } @Override @@ -171,9 +174,10 @@ public class HudiScanNode extends HiveScanNode { .getExtMetaCacheMgr().getHudiPartitionProcess(hmsTable.getCatalog()); TablePartitionValues partitionValues; if (snapshotTimestamp.isPresent()) { - partitionValues = processor.getSnapshotPartitionValues(hmsTable, metaClient, snapshotTimestamp.get()); + partitionValues = processor.getSnapshotPartitionValues( + hmsTable, metaClient, snapshotTimestamp.get(), useHiveSyncPartition); } else { - partitionValues = processor.getPartitionValues(hmsTable, metaClient); + partitionValues = 
processor.getPartitionValues(hmsTable, metaClient, useHiveSyncPartition); } if (partitionValues != null) { // 2. prune partitions by expr