From dcb165cc9fd5a05b317ee6943f27fc98a02a168e Mon Sep 17 00:00:00 2001 From: Ashin Gau Date: Tue, 18 Jul 2023 23:19:34 +0800 Subject: [PATCH] [opt](hudi) get hudi split concurrently by using parallelStream (#21871) This PR contains two optimizations: 1. Using a parallel stream to get hoodie splits concurrently. It reduces the split time from 1min20s to 12s when splitting 10,000 partitions. 2. Reading the hoodie metadata table to get table partitions. It reduces the time to get partitions from 12min to 3s when reading 10,000 partitions. --- .../external/TablePartitionValues.java | 2 +- .../hudi/HudiCachedPartitionProcessor.java | 50 ++++-- .../external/hudi/HudiPartitionProcessor.java | 35 +++-- .../planner/external/hudi/HudiScanNode.java | 148 ++++++++++-------- 4 files changed, 142 insertions(+), 93 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TablePartitionValues.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TablePartitionValues.java index bcc967501c..87f11e5863 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TablePartitionValues.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TablePartitionValues.java @@ -171,7 +171,7 @@ public class TablePartitionValues { return readWriteLock.writeLock(); } - private void cleanPartitions() { + public void cleanPartitions() { nextPartitionId = 0; idToPartitionItem.clear(); partitionNameToIdMap.clear(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiCachedPartitionProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiCachedPartitionProcessor.java index ab6a8839b5..37225c2339 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiCachedPartitionProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiCachedPartitionProcessor.java @@ -23,6 +23,7 @@ import org.apache.doris.datasource.CacheException; import 
org.apache.doris.planner.external.TablePartitionValues; import org.apache.doris.planner.external.TablePartitionValues.TablePartitionKey; +import com.google.common.base.Preconditions; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -39,10 +40,12 @@ import java.util.stream.Collectors; public class HudiCachedPartitionProcessor extends HudiPartitionProcessor { private final long catalogId; + private final Executor executor; private final LoadingCache partitionCache; public HudiCachedPartitionProcessor(long catalogId, Executor executor) { this.catalogId = catalogId; + this.executor = executor; this.partitionCache = CacheBuilder.newBuilder().maximumSize(Config.max_hive_table_cache_num) .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES) .build(CacheLoader.asyncReloading( @@ -54,6 +57,10 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor { }, executor)); } + public Executor getExecutor() { + return executor; + } + @Override public void cleanUp() { partitionCache.cleanUp(); @@ -63,7 +70,6 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor { public void cleanDatabasePartitions(String dbName) { partitionCache.asMap().keySet().stream().filter(k -> k.getDbName().equals(dbName)).collect(Collectors.toList()) .forEach(partitionCache::invalidate); - } @Override @@ -74,9 +80,34 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor { .forEach(partitionCache::invalidate); } + public TablePartitionValues getSnapshotPartitionValues(HMSExternalTable table, + HoodieTableMetaClient tableMetaClient, String timestamp) { + Preconditions.checkState(catalogId == table.getCatalog().getId()); + Option partitionColumns = tableMetaClient.getTableConfig().getPartitionFields(); + if (!partitionColumns.isPresent()) { + return null; + } + HoodieTimeline timeline = 
tableMetaClient.getCommitsAndCompactionTimeline().filterCompletedInstants(); + Option lastInstant = timeline.lastInstant(); + if (!lastInstant.isPresent()) { + return null; + } + long lastTimestamp = Long.parseLong(lastInstant.get().getTimestamp()); + if (Long.parseLong(timestamp) == lastTimestamp) { + return getPartitionValues(table, tableMetaClient); + } + List partitionNames = getPartitionNamesBeforeOrEquals(timeline, timestamp); + List partitionColumnsList = Arrays.asList(partitionColumns.get()); + TablePartitionValues partitionValues = new TablePartitionValues(); + partitionValues.addPartitions(partitionNames, + partitionNames.stream().map(p -> parsePartitionValues(partitionColumnsList, p)) + .collect(Collectors.toList()), table.getPartitionColumnTypes()); + return partitionValues; + } + public TablePartitionValues getPartitionValues(HMSExternalTable table, HoodieTableMetaClient tableMetaClient) throws CacheException { - assert (catalogId == table.getCatalog().getId()); + Preconditions.checkState(catalogId == table.getCatalog().getId()); Option partitionColumns = tableMetaClient.getTableConfig().getPartitionFields(); if (!partitionColumns.isPresent()) { return null; @@ -93,10 +124,9 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor { partitionValues.readLock().lock(); try { long lastUpdateTimestamp = partitionValues.getLastUpdateTimestamp(); - if (lastTimestamp == lastUpdateTimestamp) { + if (lastTimestamp <= lastUpdateTimestamp) { return partitionValues; } - assert (lastTimestamp > lastUpdateTimestamp); } finally { partitionValues.readLock().unlock(); } @@ -104,18 +134,12 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor { partitionValues.writeLock().lock(); try { long lastUpdateTimestamp = partitionValues.getLastUpdateTimestamp(); - if (lastTimestamp == lastUpdateTimestamp) { + if (lastTimestamp <= lastUpdateTimestamp) { return partitionValues; } - assert (lastTimestamp > lastUpdateTimestamp); - List 
partitionNames; - if (lastUpdateTimestamp == 0) { - partitionNames = getAllPartitionNames(tableMetaClient); - } else { - partitionNames = getPartitionNamesInRange(timeline, String.valueOf(lastUpdateTimestamp), - String.valueOf(lastTimestamp)); - } + List partitionNames = getAllPartitionNames(tableMetaClient); List partitionColumnsList = Arrays.asList(partitionColumns.get()); + partitionValues.cleanPartitions(); partitionValues.addPartitions(partitionNames, partitionNames.stream().map(p -> parsePartitionValues(partitionColumnsList, p)) .collect(Collectors.toList()), table.getPartitionColumnTypes()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiPartitionProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiPartitionProcessor.java index 3be3e1f080..807bb37d46 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiPartitionProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiPartitionProcessor.java @@ -18,7 +18,6 @@ package org.apache.doris.planner.external.hudi; import org.apache.hudi.common.config.HoodieMetadataConfig; -import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -47,12 +46,8 @@ public abstract class HudiPartitionProcessor { } public List getAllPartitionNames(HoodieTableMetaClient tableMetaClient) throws IOException { - TypedProperties configProperties = new TypedProperties(); HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder() - .fromProperties(configProperties) - .enable(configProperties.getBoolean(HoodieMetadataConfig.ENABLE.key(), - HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS) - && HoodieTableMetadataUtil.isFilesPartitionAvailable(tableMetaClient)) + 
.enable(HoodieTableMetadataUtil.isFilesPartitionAvailable(tableMetaClient)) .build(); HoodieTableMetadata newTableMetadata = HoodieTableMetadata.create( @@ -60,19 +55,29 @@ public abstract class HudiPartitionProcessor { tableMetaClient.getBasePathV2().toString(), FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue(), true); - return newTableMetadata.getPartitionPathWithPathPrefixes(Collections.singletonList("")); + return newTableMetadata.getAllPartitionPaths(); + } + + public List getPartitionNamesBeforeOrEquals(HoodieTimeline timeline, String timestamp) { + return new ArrayList<>(HoodieInputFormatUtils.getWritePartitionPaths( + timeline.findInstantsBeforeOrEquals(timestamp).getInstants().stream().map(instant -> { + try { + return TimelineUtils.getCommitMetadata(instant, timeline); + } catch (IOException e) { + throw new RuntimeException(e.getMessage(), e); + } + }).collect(Collectors.toList()))); } public List getPartitionNamesInRange(HoodieTimeline timeline, String startTimestamp, String endTimestamp) { return new ArrayList<>(HoodieInputFormatUtils.getWritePartitionPaths( - timeline.findInstantsInRange(startTimestamp, endTimestamp).getInstants().stream() - .map(instant -> { - try { - return TimelineUtils.getCommitMetadata(instant, timeline); - } catch (IOException e) { - throw new RuntimeException(e.getMessage(), e); - } - }).collect(Collectors.toList()))); + timeline.findInstantsInRange(startTimestamp, endTimestamp).getInstants().stream().map(instant -> { + try { + return TimelineUtils.getCommitMetadata(instant, timeline); + } catch (IOException e) { + throw new RuntimeException(e.getMessage(), e); + } + }).collect(Collectors.toList()))); } public static List parsePartitionValues(List partitionColumns, String partitionPath) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java index 734b3943a9..20a3a1cf6d 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java @@ -61,6 +61,7 @@ import org.apache.hudi.common.util.Option; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collection; @@ -69,6 +70,9 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; public class HudiScanNode extends HiveScanNode { @@ -77,7 +81,7 @@ public class HudiScanNode extends HiveScanNode { private final boolean isCowTable; - private long noLogsSplitNum = 0; + private final AtomicLong noLogsSplitNum = new AtomicLong(0); /** * External file scan node for Query Hudi table @@ -147,12 +151,18 @@ public class HudiScanNode extends HiveScanNode { rangeDesc.setTableFormatParams(tableFormatFileDesc); } - private List getPrunedPartitions(HoodieTableMetaClient metaClient) throws AnalysisException { + private List getPrunedPartitions( + HoodieTableMetaClient metaClient, Option snapshotTimestamp) throws AnalysisException { List partitionColumnTypes = hmsTable.getPartitionColumnTypes(); if (!partitionColumnTypes.isEmpty()) { HudiCachedPartitionProcessor processor = (HudiCachedPartitionProcessor) Env.getCurrentEnv() .getExtMetaCacheMgr().getHudiPartitionProcess(hmsTable.getCatalog()); - TablePartitionValues partitionValues = processor.getPartitionValues(hmsTable, metaClient); + TablePartitionValues partitionValues; + if (snapshotTimestamp.isPresent()) { + partitionValues = processor.getSnapshotPartitionValues(hmsTable, metaClient, snapshotTimestamp.get()); + } else { + partitionValues = processor.getPartitionValues(hmsTable, 
metaClient); + } if (partitionValues != null) { // 2. prune partitions by expr partitionValues.readLock().lock(); @@ -215,7 +225,6 @@ public class HudiScanNode extends HiveScanNode { List allFields = hmsTable.getRemoteTable().getSd().getCols(); allFields.addAll(hmsTable.getRemoteTable().getPartitionKeys()); - List splits = new ArrayList<>(); for (Schema.Field hudiField : hudiSchema.getFields()) { String columnName = hudiField.name().toLowerCase(Locale.ROOT); // keep hive metastore column in hudi avro schema. @@ -231,14 +240,17 @@ public class HudiScanNode extends HiveScanNode { HoodieTimeline timeline = hudiClient.getCommitsAndCompactionTimeline().filterCompletedInstants(); String queryInstant; + Option snapshotTimestamp; if (desc.getRef().getTableSnapshot() != null) { queryInstant = desc.getRef().getTableSnapshot().getTime(); + snapshotTimestamp = Option.of(queryInstant); } else { Option snapshotInstant = timeline.lastInstant(); if (!snapshotInstant.isPresent()) { return Collections.emptyList(); } queryInstant = snapshotInstant.get().getTimestamp(); + snapshotTimestamp = Option.empty(); } // Non partition table will get one dummy partition UserGroupInformation ugi = HiveMetaStoreClientHelper.getUserGroupInformation( @@ -247,71 +259,79 @@ public class HudiScanNode extends HiveScanNode { if (ugi != null) { try { partitions = ugi.doAs( - (PrivilegedExceptionAction>) () -> getPrunedPartitions(hudiClient)); + (PrivilegedExceptionAction>) () -> getPrunedPartitions(hudiClient, + snapshotTimestamp)); } catch (Exception e) { throw new UserException(e); } } else { - partitions = getPrunedPartitions(hudiClient); + partitions = getPrunedPartitions(hudiClient, snapshotTimestamp); } - try { - for (HivePartition partition : partitions) { - String globPath; - String partitionName = ""; - if (partition.isDummyPartition()) { - globPath = hudiClient.getBasePathV2().toString() + "/*"; - } else { - partitionName = FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(), - new 
Path(partition.getPath())); - globPath = String.format("%s/%s/*", hudiClient.getBasePathV2().toString(), partitionName); - } - List statuses = FSUtils.getGlobStatusExcludingMetaFolder(hudiClient.getRawFs(), - new Path(globPath)); - HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(hudiClient, - timeline, statuses.toArray(new FileStatus[0])); - - if (isCowTable) { - fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant).forEach(baseFile -> { - noLogsSplitNum++; - String filePath = baseFile.getPath(); - long fileSize = baseFile.getFileSize(); - FileSplit split = new FileSplit(new Path(filePath), 0, fileSize, fileSize, new String[0], - partition.getPartitionValues()); - splits.add(split); - }); - } else { - fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant) - .forEach(fileSlice -> { - Optional baseFile = fileSlice.getBaseFile().toJavaOptional(); - String filePath = baseFile.map(BaseFile::getPath).orElse(""); - long fileSize = baseFile.map(BaseFile::getFileSize).orElse(0L); - - List logs = fileSlice.getLogFiles().map(HoodieLogFile::getPath) - .map(Path::toString) - .collect(Collectors.toList()); - if (logs.isEmpty()) { - noLogsSplitNum++; - } - - HudiSplit split = new HudiSplit(new Path(filePath), 0, fileSize, fileSize, - new String[0], partition.getPartitionValues()); - split.setTableFormatType(TableFormatType.HUDI); - split.setDataFilePath(filePath); - split.setHudiDeltaLogs(logs); - split.setInputFormat(inputFormat); - split.setSerde(serdeLib); - split.setBasePath(basePath); - split.setHudiColumnNames(columnNames); - split.setHudiColumnTypes(columnTypes); - split.setInstantTime(queryInstant); - splits.add(split); - }); - } + Executor executor = ((HudiCachedPartitionProcessor) Env.getCurrentEnv() + .getExtMetaCacheMgr().getHudiPartitionProcess(hmsTable.getCatalog())).getExecutor(); + List splits = Collections.synchronizedList(new ArrayList<>()); + CountDownLatch countDownLatch = new 
CountDownLatch(partitions.size()); + partitions.forEach(partition -> executor.execute(() -> { + String globPath; + String partitionName = ""; + if (partition.isDummyPartition()) { + globPath = hudiClient.getBasePathV2().toString() + "/*"; + } else { + partitionName = FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(), + new Path(partition.getPath())); + globPath = String.format("%s/%s/*", hudiClient.getBasePathV2().toString(), partitionName); } - } catch (Exception e) { - String errorMsg = String.format("Failed to get hudi info on basePath: %s", basePath); - LOG.error(errorMsg, e); - throw new IllegalArgumentException(errorMsg, e); + List statuses; + try { + statuses = FSUtils.getGlobStatusExcludingMetaFolder(hudiClient.getRawFs(), + new Path(globPath)); + } catch (IOException e) { + throw new RuntimeException("Failed to get hudi file statuses on path: " + globPath, e); + } + HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(hudiClient, + timeline, statuses.toArray(new FileStatus[0])); + + if (isCowTable) { + fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant).forEach(baseFile -> { + noLogsSplitNum.incrementAndGet(); + String filePath = baseFile.getPath(); + long fileSize = baseFile.getFileSize(); + splits.add(new FileSplit(new Path(filePath), 0, fileSize, fileSize, new String[0], + partition.getPartitionValues())); + }); + } else { + fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant).forEach(fileSlice -> { + Optional baseFile = fileSlice.getBaseFile().toJavaOptional(); + String filePath = baseFile.map(BaseFile::getPath).orElse(""); + long fileSize = baseFile.map(BaseFile::getFileSize).orElse(0L); + + List logs = fileSlice.getLogFiles().map(HoodieLogFile::getPath) + .map(Path::toString) + .collect(Collectors.toList()); + if (logs.isEmpty()) { + noLogsSplitNum.incrementAndGet(); + } + + HudiSplit split = new HudiSplit(new Path(filePath), 0, fileSize, fileSize, + new String[0], 
partition.getPartitionValues()); + split.setTableFormatType(TableFormatType.HUDI); + split.setDataFilePath(filePath); + split.setHudiDeltaLogs(logs); + split.setInputFormat(inputFormat); + split.setSerde(serdeLib); + split.setBasePath(basePath); + split.setHudiColumnNames(columnNames); + split.setHudiColumnTypes(columnTypes); + split.setInstantTime(queryInstant); + splits.add(split); + }); + } + countDownLatch.countDown(); + })); + try { + countDownLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e.getMessage(), e); } return splits; } @@ -319,6 +339,6 @@ public class HudiScanNode extends HiveScanNode { @Override public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { return super.getNodeExplainString(prefix, detailLevel) - + String.format("%shudiNativeReadSplits=%d/%d\n", prefix, noLogsSplitNum, inputSplitsNum); + + String.format("%shudiNativeReadSplits=%d/%d\n", prefix, noLogsSplitNum.get(), inputSplitsNum); } }