diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TablePartitionValues.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TablePartitionValues.java index bcc967501c..87f11e5863 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TablePartitionValues.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TablePartitionValues.java @@ -171,7 +171,7 @@ public class TablePartitionValues { return readWriteLock.writeLock(); } - private void cleanPartitions() { + public void cleanPartitions() { nextPartitionId = 0; idToPartitionItem.clear(); partitionNameToIdMap.clear(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiCachedPartitionProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiCachedPartitionProcessor.java index ab6a8839b5..37225c2339 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiCachedPartitionProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiCachedPartitionProcessor.java @@ -23,6 +23,7 @@ import org.apache.doris.datasource.CacheException; import org.apache.doris.planner.external.TablePartitionValues; import org.apache.doris.planner.external.TablePartitionValues.TablePartitionKey; +import com.google.common.base.Preconditions; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -39,10 +40,12 @@ import java.util.stream.Collectors; public class HudiCachedPartitionProcessor extends HudiPartitionProcessor { private final long catalogId; + private final Executor executor; private final LoadingCache partitionCache; public HudiCachedPartitionProcessor(long catalogId, Executor executor) { this.catalogId = catalogId; + this.executor = executor; this.partitionCache = CacheBuilder.newBuilder().maximumSize(Config.max_hive_table_cache_num) 
.expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES) .build(CacheLoader.asyncReloading( @@ -54,6 +57,10 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor { }, executor)); } + public Executor getExecutor() { + return executor; + } + @Override public void cleanUp() { partitionCache.cleanUp(); @@ -63,7 +70,6 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor { public void cleanDatabasePartitions(String dbName) { partitionCache.asMap().keySet().stream().filter(k -> k.getDbName().equals(dbName)).collect(Collectors.toList()) .forEach(partitionCache::invalidate); - } @Override @@ -74,9 +80,34 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor { .forEach(partitionCache::invalidate); } + public TablePartitionValues getSnapshotPartitionValues(HMSExternalTable table, + HoodieTableMetaClient tableMetaClient, String timestamp) { + Preconditions.checkState(catalogId == table.getCatalog().getId()); + Option partitionColumns = tableMetaClient.getTableConfig().getPartitionFields(); + if (!partitionColumns.isPresent()) { + return null; + } + HoodieTimeline timeline = tableMetaClient.getCommitsAndCompactionTimeline().filterCompletedInstants(); + Option lastInstant = timeline.lastInstant(); + if (!lastInstant.isPresent()) { + return null; + } + long lastTimestamp = Long.parseLong(lastInstant.get().getTimestamp()); + if (Long.parseLong(timestamp) == lastTimestamp) { + return getPartitionValues(table, tableMetaClient); + } + List partitionNames = getPartitionNamesBeforeOrEquals(timeline, timestamp); + List partitionColumnsList = Arrays.asList(partitionColumns.get()); + TablePartitionValues partitionValues = new TablePartitionValues(); + partitionValues.addPartitions(partitionNames, + partitionNames.stream().map(p -> parsePartitionValues(partitionColumnsList, p)) + .collect(Collectors.toList()), table.getPartitionColumnTypes()); + return partitionValues; + } + public 
TablePartitionValues getPartitionValues(HMSExternalTable table, HoodieTableMetaClient tableMetaClient) throws CacheException { - assert (catalogId == table.getCatalog().getId()); + Preconditions.checkState(catalogId == table.getCatalog().getId()); Option partitionColumns = tableMetaClient.getTableConfig().getPartitionFields(); if (!partitionColumns.isPresent()) { return null; @@ -93,10 +124,9 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor { partitionValues.readLock().lock(); try { long lastUpdateTimestamp = partitionValues.getLastUpdateTimestamp(); - if (lastTimestamp == lastUpdateTimestamp) { + if (lastTimestamp <= lastUpdateTimestamp) { return partitionValues; } - assert (lastTimestamp > lastUpdateTimestamp); } finally { partitionValues.readLock().unlock(); } @@ -104,18 +134,12 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor { partitionValues.writeLock().lock(); try { long lastUpdateTimestamp = partitionValues.getLastUpdateTimestamp(); - if (lastTimestamp == lastUpdateTimestamp) { + if (lastTimestamp <= lastUpdateTimestamp) { return partitionValues; } - assert (lastTimestamp > lastUpdateTimestamp); - List partitionNames; - if (lastUpdateTimestamp == 0) { - partitionNames = getAllPartitionNames(tableMetaClient); - } else { - partitionNames = getPartitionNamesInRange(timeline, String.valueOf(lastUpdateTimestamp), - String.valueOf(lastTimestamp)); - } + List partitionNames = getAllPartitionNames(tableMetaClient); List partitionColumnsList = Arrays.asList(partitionColumns.get()); + partitionValues.cleanPartitions(); partitionValues.addPartitions(partitionNames, partitionNames.stream().map(p -> parsePartitionValues(partitionColumnsList, p)) .collect(Collectors.toList()), table.getPartitionColumnTypes()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiPartitionProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiPartitionProcessor.java index 
3be3e1f080..807bb37d46 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiPartitionProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiPartitionProcessor.java @@ -18,7 +18,6 @@ package org.apache.doris.planner.external.hudi; import org.apache.hudi.common.config.HoodieMetadataConfig; -import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -47,12 +46,8 @@ public abstract class HudiPartitionProcessor { } public List getAllPartitionNames(HoodieTableMetaClient tableMetaClient) throws IOException { - TypedProperties configProperties = new TypedProperties(); HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder() - .fromProperties(configProperties) - .enable(configProperties.getBoolean(HoodieMetadataConfig.ENABLE.key(), - HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS) - && HoodieTableMetadataUtil.isFilesPartitionAvailable(tableMetaClient)) + .enable(HoodieTableMetadataUtil.isFilesPartitionAvailable(tableMetaClient)) .build(); HoodieTableMetadata newTableMetadata = HoodieTableMetadata.create( @@ -60,19 +55,29 @@ public abstract class HudiPartitionProcessor { tableMetaClient.getBasePathV2().toString(), FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue(), true); - return newTableMetadata.getPartitionPathWithPathPrefixes(Collections.singletonList("")); + return newTableMetadata.getAllPartitionPaths(); + } + + public List getPartitionNamesBeforeOrEquals(HoodieTimeline timeline, String timestamp) { + return new ArrayList<>(HoodieInputFormatUtils.getWritePartitionPaths( + timeline.findInstantsBeforeOrEquals(timestamp).getInstants().stream().map(instant -> { + try { + return TimelineUtils.getCommitMetadata(instant, timeline); + } catch (IOException e) { + throw new RuntimeException(e.getMessage(), e); 
+ } + }).collect(Collectors.toList()))); } public List getPartitionNamesInRange(HoodieTimeline timeline, String startTimestamp, String endTimestamp) { return new ArrayList<>(HoodieInputFormatUtils.getWritePartitionPaths( - timeline.findInstantsInRange(startTimestamp, endTimestamp).getInstants().stream() - .map(instant -> { - try { - return TimelineUtils.getCommitMetadata(instant, timeline); - } catch (IOException e) { - throw new RuntimeException(e.getMessage(), e); - } - }).collect(Collectors.toList()))); + timeline.findInstantsInRange(startTimestamp, endTimestamp).getInstants().stream().map(instant -> { + try { + return TimelineUtils.getCommitMetadata(instant, timeline); + } catch (IOException e) { + throw new RuntimeException(e.getMessage(), e); + } + }).collect(Collectors.toList()))); } public static List parsePartitionValues(List partitionColumns, String partitionPath) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java index 734b3943a9..20a3a1cf6d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java @@ -61,6 +61,7 @@ import org.apache.hudi.common.util.Option; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collection; @@ -69,6 +70,9 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; public class HudiScanNode extends HiveScanNode { @@ -77,7 +81,7 @@ public class HudiScanNode extends HiveScanNode { private final boolean isCowTable; - 
private long noLogsSplitNum = 0; + private final AtomicLong noLogsSplitNum = new AtomicLong(0); /** * External file scan node for Query Hudi table @@ -147,12 +151,18 @@ public class HudiScanNode extends HiveScanNode { rangeDesc.setTableFormatParams(tableFormatFileDesc); } - private List getPrunedPartitions(HoodieTableMetaClient metaClient) throws AnalysisException { + private List getPrunedPartitions( + HoodieTableMetaClient metaClient, Option snapshotTimestamp) throws AnalysisException { List partitionColumnTypes = hmsTable.getPartitionColumnTypes(); if (!partitionColumnTypes.isEmpty()) { HudiCachedPartitionProcessor processor = (HudiCachedPartitionProcessor) Env.getCurrentEnv() .getExtMetaCacheMgr().getHudiPartitionProcess(hmsTable.getCatalog()); - TablePartitionValues partitionValues = processor.getPartitionValues(hmsTable, metaClient); + TablePartitionValues partitionValues; + if (snapshotTimestamp.isPresent()) { + partitionValues = processor.getSnapshotPartitionValues(hmsTable, metaClient, snapshotTimestamp.get()); + } else { + partitionValues = processor.getPartitionValues(hmsTable, metaClient); + } if (partitionValues != null) { // 2. prune partitions by expr partitionValues.readLock().lock(); @@ -215,7 +225,6 @@ public class HudiScanNode extends HiveScanNode { List allFields = hmsTable.getRemoteTable().getSd().getCols(); allFields.addAll(hmsTable.getRemoteTable().getPartitionKeys()); - List splits = new ArrayList<>(); for (Schema.Field hudiField : hudiSchema.getFields()) { String columnName = hudiField.name().toLowerCase(Locale.ROOT); // keep hive metastore column in hudi avro schema. 
@@ -231,14 +240,17 @@ public class HudiScanNode extends HiveScanNode {
 
         HoodieTimeline timeline = hudiClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
         String queryInstant;
+        Option snapshotTimestamp;
         if (desc.getRef().getTableSnapshot() != null) {
             queryInstant = desc.getRef().getTableSnapshot().getTime();
+            snapshotTimestamp = Option.of(queryInstant);
         } else {
             Option snapshotInstant = timeline.lastInstant();
             if (!snapshotInstant.isPresent()) {
                 return Collections.emptyList();
             }
             queryInstant = snapshotInstant.get().getTimestamp();
+            snapshotTimestamp = Option.empty();
         }
         // Non partition table will get one dummy partition
         UserGroupInformation ugi = HiveMetaStoreClientHelper.getUserGroupInformation(
@@ -247,71 +259,97 @@ public class HudiScanNode extends HiveScanNode {
         if (ugi != null) {
             try {
                 partitions = ugi.doAs(
-                        (PrivilegedExceptionAction>) () -> getPrunedPartitions(hudiClient));
+                        (PrivilegedExceptionAction>) () -> getPrunedPartitions(hudiClient,
+                                snapshotTimestamp));
             } catch (Exception e) {
                 throw new UserException(e);
             }
         } else {
-            partitions = getPrunedPartitions(hudiClient);
+            partitions = getPrunedPartitions(hudiClient, snapshotTimestamp);
         }
-        try {
-            for (HivePartition partition : partitions) {
-                String globPath;
-                String partitionName = "";
-                if (partition.isDummyPartition()) {
-                    globPath = hudiClient.getBasePathV2().toString() + "/*";
-                } else {
-                    partitionName = FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(),
-                            new Path(partition.getPath()));
-                    globPath = String.format("%s/%s/*", hudiClient.getBasePathV2().toString(), partitionName);
-                }
-                List statuses = FSUtils.getGlobStatusExcludingMetaFolder(hudiClient.getRawFs(),
-                        new Path(globPath));
-                HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(hudiClient,
-                        timeline, statuses.toArray(new FileStatus[0]));
-
-                if (isCowTable) {
-                    fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant).forEach(baseFile -> {
-                        noLogsSplitNum++;
-                        String filePath = baseFile.getPath();
-                        long fileSize = baseFile.getFileSize();
-                        FileSplit split = new FileSplit(new Path(filePath), 0, fileSize, fileSize, new String[0],
-                                partition.getPartitionValues());
-                        splits.add(split);
-                    });
-                } else {
-                    fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant)
-                            .forEach(fileSlice -> {
-                                Optional baseFile = fileSlice.getBaseFile().toJavaOptional();
-                                String filePath = baseFile.map(BaseFile::getPath).orElse("");
-                                long fileSize = baseFile.map(BaseFile::getFileSize).orElse(0L);
-
-                                List logs = fileSlice.getLogFiles().map(HoodieLogFile::getPath)
-                                        .map(Path::toString)
-                                        .collect(Collectors.toList());
-                                if (logs.isEmpty()) {
-                                    noLogsSplitNum++;
-                                }
-
-                                HudiSplit split = new HudiSplit(new Path(filePath), 0, fileSize, fileSize,
-                                        new String[0], partition.getPartitionValues());
-                                split.setTableFormatType(TableFormatType.HUDI);
-                                split.setDataFilePath(filePath);
-                                split.setHudiDeltaLogs(logs);
-                                split.setInputFormat(inputFormat);
-                                split.setSerde(serdeLib);
-                                split.setBasePath(basePath);
-                                split.setHudiColumnNames(columnNames);
-                                split.setHudiColumnTypes(columnTypes);
-                                split.setInstantTime(queryInstant);
-                                splits.add(split);
-                            });
-                }
-            }
-        } catch (Exception e) {
-            String errorMsg = String.format("Failed to get hudi info on basePath: %s", basePath);
-            LOG.error(errorMsg, e);
-            throw new IllegalArgumentException(errorMsg, e);
+        Executor executor = ((HudiCachedPartitionProcessor) Env.getCurrentEnv()
+                .getExtMetaCacheMgr().getHudiPartitionProcess(hmsTable.getCatalog())).getExecutor();
+        List splits = Collections.synchronizedList(new ArrayList<>());
+        // Failures raised on executor threads are recorded here and rethrown by the calling thread once
+        // every task has finished; otherwise they would be lost and the caller would get partial splits.
+        List<Throwable> failures = Collections.synchronizedList(new ArrayList<>());
+        CountDownLatch countDownLatch = new CountDownLatch(partitions.size());
+        partitions.forEach(partition -> executor.execute(() -> {
+            try {
+                String globPath;
+                String partitionName = "";
+                if (partition.isDummyPartition()) {
+                    globPath = hudiClient.getBasePathV2().toString() + "/*";
+                } else {
+                    partitionName = FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(),
+                            new Path(partition.getPath()));
+                    globPath = String.format("%s/%s/*", hudiClient.getBasePathV2().toString(), partitionName);
+                }
+                List statuses;
+                try {
+                    statuses = FSUtils.getGlobStatusExcludingMetaFolder(hudiClient.getRawFs(),
+                            new Path(globPath));
+                } catch (IOException e) {
+                    throw new RuntimeException("Failed to get hudi file statuses on path: " + globPath, e);
+                }
+                HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(hudiClient,
+                        timeline, statuses.toArray(new FileStatus[0]));
+
+                if (isCowTable) {
+                    fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant).forEach(baseFile -> {
+                        noLogsSplitNum.incrementAndGet();
+                        String filePath = baseFile.getPath();
+                        long fileSize = baseFile.getFileSize();
+                        splits.add(new FileSplit(new Path(filePath), 0, fileSize, fileSize, new String[0],
+                                partition.getPartitionValues()));
+                    });
+                } else {
+                    fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant)
+                            .forEach(fileSlice -> {
+                                Optional baseFile = fileSlice.getBaseFile().toJavaOptional();
+                                String filePath = baseFile.map(BaseFile::getPath).orElse("");
+                                long fileSize = baseFile.map(BaseFile::getFileSize).orElse(0L);
+
+                                List logs = fileSlice.getLogFiles().map(HoodieLogFile::getPath)
+                                        .map(Path::toString)
+                                        .collect(Collectors.toList());
+                                if (logs.isEmpty()) {
+                                    noLogsSplitNum.incrementAndGet();
+                                }
+
+                                HudiSplit split = new HudiSplit(new Path(filePath), 0, fileSize, fileSize,
+                                        new String[0], partition.getPartitionValues());
+                                split.setTableFormatType(TableFormatType.HUDI);
+                                split.setDataFilePath(filePath);
+                                split.setHudiDeltaLogs(logs);
+                                split.setInputFormat(inputFormat);
+                                split.setSerde(serdeLib);
+                                split.setBasePath(basePath);
+                                split.setHudiColumnNames(columnNames);
+                                split.setHudiColumnTypes(columnTypes);
+                                split.setInstantTime(queryInstant);
+                                splits.add(split);
+                            });
+                }
+            } catch (Throwable t) {
+                // Catch everything: a partition that fails for any reason must fail the whole scan.
+                failures.add(t);
+            } finally {
+                // Always release the latch, otherwise one failed task blocks await() below forever.
+                countDownLatch.countDown();
+            }
+        }));
+        try {
+            countDownLatch.await();
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so code further up the stack can still observe the interruption.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e.getMessage(), e);
+        }
+        if (!failures.isEmpty()) {
+            String errorMsg = String.format("Failed to get hudi info on basePath: %s", basePath);
+            LOG.error(errorMsg, failures.get(0));
+            throw new IllegalArgumentException(errorMsg, failures.get(0));
         }
         return splits;
     }
@@ -319,6 +357,6 @@ public class HudiScanNode extends HiveScanNode {
     @Override
     public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
         return super.getNodeExplainString(prefix, detailLevel)
-                + String.format("%shudiNativeReadSplits=%d/%d\n", prefix, noLogsSplitNum, inputSplitsNum);
+                + String.format("%shudiNativeReadSplits=%d/%d\n", prefix, noLogsSplitNum.get(), inputSplitsNum);
     }
 }