[improvement](hudi) Obtain partition information through HMS's API (#30962)
When a Hudi table is synchronized to HMS, its partition information is synchronized as well. So even if the metastore is not enabled on the Hudi table (for example, when the Metastore option is false for a Hudi table created with Flink), the partition information can still be obtained through the HMS API. This behavior is gated by the new catalog property use_hive_sync_partition (default false); if the HMS API returns no partitions, the code falls back to listing them through the Hudi API.
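
Before the diff, a minimal standalone sketch (not part of this commit) of the mechanism it relies on: partitions that hive-sync wrote to HMS can be listed through the plain HMS Thrift client API. The metastore URI, database, and table names below are placeholders.

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;

    import java.util.List;

    // Lists a Hudi table's partitions straight from HMS; this works as long as
    // the table was synchronized to HMS (e.g. by Flink's hive-sync), regardless
    // of whether Hudi's own metadata is enabled on the table.
    public class ListHudiPartitionsFromHms {
        public static void main(String[] args) throws Exception {
            HiveConf conf = new HiveConf();
            conf.set("hive.metastore.uris", "thrift://127.0.0.1:9083"); // placeholder
            HiveMetaStoreClient client = new HiveMetaStoreClient(conf);
            try {
                // Partition names come back as "dt=2024-01-01"-style strings; -1 means no limit.
                List<String> names = client.listPartitionNames("hudi_db", "hudi_tbl", (short) -1);
                names.forEach(System.out::println);
            } finally {
                client.close();
            }
        }
    }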
@@ -104,6 +104,9 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableIf {
     private static final String SPARK_STATS_MAX_LEN = ".avgLen";
     private static final String SPARK_STATS_HISTOGRAM = ".histogram";
+
+    private static final String USE_HIVE_SYNC_PARTITION = "use_hive_sync_partition";
+
     static {
         SUPPORTED_HIVE_FILE_FORMATS = Sets.newHashSet();
         SUPPORTED_HIVE_FILE_FORMATS.add("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat");
@@ -227,6 +230,14 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableIf {
         return "org.apache.hudi.hadoop.HoodieParquetInputFormat".equals(inputFormatName);
     }
 
+    /**
+     * Some data lakes (such as Hudi) synchronize their partition information to HMS,
+     * so we can quickly obtain the table's partition information from HMS.
+     */
+    public boolean useHiveSyncPartition() {
+        return Boolean.parseBoolean(catalog.getProperties().getOrDefault(USE_HIVE_SYNC_PARTITION, "false"));
+    }
+
     /**
      * Currently we only support three file input formats for hive tables: parquet/orc/text.
      * Both managed_table and external_table are supported.
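
A note on the flag's semantics: it is read from the catalog-level properties and defaults to false, and Boolean.parseBoolean yields true only for a case-insensitive "true". A tiny self-contained sketch (the property map here is hypothetical, standing in for catalog.getProperties()):

    import java.util.HashMap;
    import java.util.Map;

    // Mirrors the parse in useHiveSyncPartition(); illustrative only.
    public class UseHiveSyncPartitionFlag {
        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>();
            props.put("use_hive_sync_partition", "TRUE"); // parseBoolean is case-insensitive
            boolean enabled = Boolean.parseBoolean(
                    props.getOrDefault("use_hive_sync_partition", "false"));
            System.out.println(enabled); // true; any other value ("1", "yes", ...) gives false
        }
    }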
@@ -21,6 +21,7 @@ import org.apache.doris.common.Config;
 import org.apache.doris.datasource.CacheException;
 import org.apache.doris.datasource.TablePartitionValues;
 import org.apache.doris.datasource.TablePartitionValues.TablePartitionKey;
+import org.apache.doris.datasource.hive.HMSExternalCatalog;
 import org.apache.doris.datasource.hive.HMSExternalTable;
 
 import com.google.common.base.Preconditions;
@@ -31,6 +32,8 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
 import java.util.List;
@@ -39,6 +42,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 public class HudiCachedPartitionProcessor extends HudiPartitionProcessor {
+    private static final Logger LOG = LoggerFactory.getLogger(HudiCachedPartitionProcessor.class);
     private final long catalogId;
     private final Executor executor;
     private final LoadingCache<TablePartitionKey, TablePartitionValues> partitionCache;
@@ -81,7 +85,7 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor {
     }
 
     public TablePartitionValues getSnapshotPartitionValues(HMSExternalTable table,
-            HoodieTableMetaClient tableMetaClient, String timestamp) {
+            HoodieTableMetaClient tableMetaClient, String timestamp, boolean useHiveSyncPartition) {
         Preconditions.checkState(catalogId == table.getCatalog().getId());
         Option<String[]> partitionColumns = tableMetaClient.getTableConfig().getPartitionFields();
         if (!partitionColumns.isPresent()) {
@@ -94,7 +98,7 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor {
         }
         long lastTimestamp = Long.parseLong(lastInstant.get().getTimestamp());
         if (Long.parseLong(timestamp) == lastTimestamp) {
-            return getPartitionValues(table, tableMetaClient);
+            return getPartitionValues(table, tableMetaClient, useHiveSyncPartition);
         }
         List<String> partitionNameAndValues = getPartitionNamesBeforeOrEquals(timeline, timestamp);
         List<String> partitionNames = Arrays.asList(partitionColumns.get());
@@ -105,7 +109,8 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor {
         return partitionValues;
     }
 
-    public TablePartitionValues getPartitionValues(HMSExternalTable table, HoodieTableMetaClient tableMetaClient)
+    public TablePartitionValues getPartitionValues(HMSExternalTable table, HoodieTableMetaClient tableMetaClient,
+            boolean useHiveSyncPartition)
             throws CacheException {
         Preconditions.checkState(catalogId == table.getCatalog().getId());
         Option<String[]> partitionColumns = tableMetaClient.getTableConfig().getPartitionFields();
@@ -137,7 +142,21 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor {
         if (lastTimestamp <= lastUpdateTimestamp) {
             return partitionValues;
         }
-        List<String> partitionNames = getAllPartitionNames(tableMetaClient);
+        HMSExternalCatalog catalog = (HMSExternalCatalog) table.getCatalog();
+        List<String> partitionNames;
+        if (useHiveSyncPartition) {
+            // When a Hudi table is synchronized to HMS, its partition information is synchronized as well,
+            // so even if the metastore is not enabled on the Hudi table
+            // (for example, when the Metastore option is false for a Hudi table created with Flink),
+            // we can still obtain the partition information through the HMS API.
+            partitionNames = catalog.getClient().listPartitionNames(table.getDbName(), table.getName());
+            if (partitionNames.isEmpty()) {
+                LOG.warn("Failed to get partitions from the HMS API, falling back to the Hudi API.");
+                partitionNames = getAllPartitionNames(tableMetaClient);
+            }
+        } else {
+            partitionNames = getAllPartitionNames(tableMetaClient);
+        }
         List<String> partitionColumnsList = Arrays.asList(partitionColumns.get());
         partitionValues.cleanPartitions();
         partitionValues.addPartitions(partitionNames,
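
The empty-result guard above is the key design choice: an empty answer from HMS is treated as "partitions were never synced", not as "the table has no partitions", so the code falls back to the Hudi API instead of caching an empty partition set. The same pattern, distilled into a generic sketch (the class and helper names are mine, not from the commit):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.function.Supplier;

    public final class PartitionListing {
        // Prefer the cheap primary source, but never trust an empty answer.
        static <T> List<T> listWithFallback(Supplier<List<T>> primary, Supplier<List<T>> fallback) {
            List<T> fromPrimary = primary.get();
            return fromPrimary.isEmpty() ? fallback.get() : fromPrimary;
        }

        public static void main(String[] args) {
            // Empty "HMS" answer -> the "Hudi" fallback wins.
            List<String> names = listWithFallback(Collections::emptyList,
                    () -> Arrays.asList("dt=2024-01-01", "dt=2024-01-02"));
            System.out.println(names);
        }
    }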
@@ -83,6 +83,8 @@ public class HudiScanNode extends HiveScanNode {
 
     private final AtomicLong noLogsSplitNum = new AtomicLong(0);
 
+    private final boolean useHiveSyncPartition;
+
     /**
      * External file scan node for querying a Hudi table.
      * needCheckColumnPriv: some ExternalFileScanNodes do not need to check column privileges.
@@ -102,6 +104,7 @@ public class HudiScanNode extends HiveScanNode {
                 LOG.debug("Hudi table {} is a mor table, and will use JNI to read data in BE", hmsTable.getName());
             }
         }
+        useHiveSyncPartition = hmsTable.useHiveSyncPartition();
     }
 
     @Override
@@ -171,9 +174,10 @@ public class HudiScanNode extends HiveScanNode {
                 .getExtMetaCacheMgr().getHudiPartitionProcess(hmsTable.getCatalog());
         TablePartitionValues partitionValues;
         if (snapshotTimestamp.isPresent()) {
-            partitionValues = processor.getSnapshotPartitionValues(hmsTable, metaClient, snapshotTimestamp.get());
+            partitionValues = processor.getSnapshotPartitionValues(
+                    hmsTable, metaClient, snapshotTimestamp.get(), useHiveSyncPartition);
         } else {
-            partitionValues = processor.getPartitionValues(hmsTable, metaClient);
+            partitionValues = processor.getPartitionValues(hmsTable, metaClient, useHiveSyncPartition);
         }
         if (partitionValues != null) {
             // 2. prune partitions by expr