[opt](hudi) get hudi split concurrently by using parallelStream (#21871)
This PR contains two optimizations: 1. Using a parallel stream to get Hudi splits concurrently, which reduces split generation time from 1 min 20 s to 12 s when splitting 10,000 partitions. 2. Reading the Hudi metadata table to get table partitions, which reduces partition listing time from 12 min to 3 s when reading 10,000 partitions.
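The title and description mention a parallel stream; the diff below actually fans the per-partition work out to a shared Executor with a CountDownLatch (see the sketch at the end of this page). As a minimal illustration of the parallel-stream idea itself, here is a hedged sketch; buildSplitsForPartition and the string-based types are hypothetical placeholders, not code from this PR.

```java
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class ParallelSplitSketch {
    // Hypothetical stand-in for the per-partition work: list the partition's files
    // and wrap each one as a split descriptor.
    static List<String> buildSplitsForPartition(String partitionPath) {
        return Collections.singletonList(partitionPath + "/file-0.parquet");
    }

    static List<String> getSplits(List<String> partitionPaths) {
        // parallelStream() spreads the per-partition listing over the common ForkJoinPool,
        // so 10,000 partitions are no longer processed strictly one after another.
        return partitionPaths.parallelStream()
                .map(ParallelSplitSketch::buildSplitsForPartition)
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }
}
```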
@@ -171,7 +171,7 @@ public class TablePartitionValues {
        return readWriteLock.writeLock();
    }

    private void cleanPartitions() {
    public void cleanPartitions() {
        nextPartitionId = 0;
        idToPartitionItem.clear();
        partitionNameToIdMap.clear();

@@ -23,6 +23,7 @@ import org.apache.doris.datasource.CacheException;
import org.apache.doris.planner.external.TablePartitionValues;
import org.apache.doris.planner.external.TablePartitionValues.TablePartitionKey;

import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
@@ -39,10 +40,12 @@ import java.util.stream.Collectors;

public class HudiCachedPartitionProcessor extends HudiPartitionProcessor {
    private final long catalogId;
    private final Executor executor;
    private final LoadingCache<TablePartitionKey, TablePartitionValues> partitionCache;

    public HudiCachedPartitionProcessor(long catalogId, Executor executor) {
        this.catalogId = catalogId;
        this.executor = executor;
        this.partitionCache = CacheBuilder.newBuilder().maximumSize(Config.max_hive_table_cache_num)
                .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES)
                .build(CacheLoader.asyncReloading(
@@ -54,6 +57,10 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor {
                }, executor));
    }

    public Executor getExecutor() {
        return executor;
    }

    @Override
    public void cleanUp() {
        partitionCache.cleanUp();
@@ -63,7 +70,6 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor {
    public void cleanDatabasePartitions(String dbName) {
        partitionCache.asMap().keySet().stream().filter(k -> k.getDbName().equals(dbName)).collect(Collectors.toList())
                .forEach(partitionCache::invalidate);

    }

    @Override
@@ -74,9 +80,34 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor {
                .forEach(partitionCache::invalidate);
    }

    public TablePartitionValues getSnapshotPartitionValues(HMSExternalTable table,
            HoodieTableMetaClient tableMetaClient, String timestamp) {
        Preconditions.checkState(catalogId == table.getCatalog().getId());
        Option<String[]> partitionColumns = tableMetaClient.getTableConfig().getPartitionFields();
        if (!partitionColumns.isPresent()) {
            return null;
        }
        HoodieTimeline timeline = tableMetaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
        Option<HoodieInstant> lastInstant = timeline.lastInstant();
        if (!lastInstant.isPresent()) {
            return null;
        }
        long lastTimestamp = Long.parseLong(lastInstant.get().getTimestamp());
        if (Long.parseLong(timestamp) == lastTimestamp) {
            return getPartitionValues(table, tableMetaClient);
        }
        List<String> partitionNames = getPartitionNamesBeforeOrEquals(timeline, timestamp);
        List<String> partitionColumnsList = Arrays.asList(partitionColumns.get());
        TablePartitionValues partitionValues = new TablePartitionValues();
        partitionValues.addPartitions(partitionNames,
                partitionNames.stream().map(p -> parsePartitionValues(partitionColumnsList, p))
                        .collect(Collectors.toList()), table.getPartitionColumnTypes());
        return partitionValues;
    }

    public TablePartitionValues getPartitionValues(HMSExternalTable table, HoodieTableMetaClient tableMetaClient)
            throws CacheException {
        assert (catalogId == table.getCatalog().getId());
        Preconditions.checkState(catalogId == table.getCatalog().getId());
        Option<String[]> partitionColumns = tableMetaClient.getTableConfig().getPartitionFields();
        if (!partitionColumns.isPresent()) {
            return null;
@@ -93,10 +124,9 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor {
        partitionValues.readLock().lock();
        try {
            long lastUpdateTimestamp = partitionValues.getLastUpdateTimestamp();
            if (lastTimestamp == lastUpdateTimestamp) {
            if (lastTimestamp <= lastUpdateTimestamp) {
                return partitionValues;
            }
            assert (lastTimestamp > lastUpdateTimestamp);
        } finally {
            partitionValues.readLock().unlock();
        }
@@ -104,18 +134,12 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor {
        partitionValues.writeLock().lock();
        try {
            long lastUpdateTimestamp = partitionValues.getLastUpdateTimestamp();
            if (lastTimestamp == lastUpdateTimestamp) {
            if (lastTimestamp <= lastUpdateTimestamp) {
                return partitionValues;
            }
            assert (lastTimestamp > lastUpdateTimestamp);
            List<String> partitionNames;
            if (lastUpdateTimestamp == 0) {
                partitionNames = getAllPartitionNames(tableMetaClient);
            } else {
                partitionNames = getPartitionNamesInRange(timeline, String.valueOf(lastUpdateTimestamp),
                        String.valueOf(lastTimestamp));
            }
            List<String> partitionNames = getAllPartitionNames(tableMetaClient);
            List<String> partitionColumnsList = Arrays.asList(partitionColumns.get());
            partitionValues.cleanPartitions();
            partitionValues.addPartitions(partitionNames,
                    partitionNames.stream().map(p -> parsePartitionValues(partitionColumnsList, p))
                            .collect(Collectors.toList()), table.getPartitionColumnTypes());

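The refresh path in getPartitionValues above follows a double-checked locking pattern over TablePartitionValues: check the cached timestamp under the read lock, and only if it is stale re-check and rebuild under the write lock. A minimal, self-contained sketch of that pattern; the CachedValue class and loadFromSource method are hypothetical stand-ins, not code from this PR.

```java
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class CachedValue {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private long lastUpdateTimestamp = 0;
    private String data; // stands in for the cached partition values

    String getUpToDate(long latestTimestamp) {
        // Fast path: many readers can verify freshness concurrently.
        lock.readLock().lock();
        try {
            if (latestTimestamp <= lastUpdateTimestamp) {
                return data;
            }
        } finally {
            lock.readLock().unlock();
        }
        // Slow path: re-check under the write lock, because another thread
        // may have refreshed the value between the unlock above and this lock.
        lock.writeLock().lock();
        try {
            if (latestTimestamp <= lastUpdateTimestamp) {
                return data;
            }
            data = loadFromSource(latestTimestamp); // hypothetical reload of the partition data
            lastUpdateTimestamp = latestTimestamp;
            return data;
        } finally {
            lock.writeLock().unlock();
        }
    }

    private String loadFromSource(long ts) {
        return "partitions@" + ts;
    }
}
```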
@@ -18,7 +18,6 @@
package org.apache.doris.planner.external.hudi;

import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -47,12 +46,8 @@ public abstract class HudiPartitionProcessor {
    }

    public List<String> getAllPartitionNames(HoodieTableMetaClient tableMetaClient) throws IOException {
        TypedProperties configProperties = new TypedProperties();
        HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
                .fromProperties(configProperties)
                .enable(configProperties.getBoolean(HoodieMetadataConfig.ENABLE.key(),
                        HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS)
                        && HoodieTableMetadataUtil.isFilesPartitionAvailable(tableMetaClient))
                .enable(HoodieTableMetadataUtil.isFilesPartitionAvailable(tableMetaClient))
                .build();

        HoodieTableMetadata newTableMetadata = HoodieTableMetadata.create(
@@ -60,19 +55,29 @@ public abstract class HudiPartitionProcessor {
                tableMetaClient.getBasePathV2().toString(),
                FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue(), true);

        return newTableMetadata.getPartitionPathWithPathPrefixes(Collections.singletonList(""));
        return newTableMetadata.getAllPartitionPaths();
    }

    public List<String> getPartitionNamesBeforeOrEquals(HoodieTimeline timeline, String timestamp) {
        return new ArrayList<>(HoodieInputFormatUtils.getWritePartitionPaths(
                timeline.findInstantsBeforeOrEquals(timestamp).getInstants().stream().map(instant -> {
                    try {
                        return TimelineUtils.getCommitMetadata(instant, timeline);
                    } catch (IOException e) {
                        throw new RuntimeException(e.getMessage(), e);
                    }
                }).collect(Collectors.toList())));
    }

    public List<String> getPartitionNamesInRange(HoodieTimeline timeline, String startTimestamp, String endTimestamp) {
        return new ArrayList<>(HoodieInputFormatUtils.getWritePartitionPaths(
                timeline.findInstantsInRange(startTimestamp, endTimestamp).getInstants().stream()
                        .map(instant -> {
                            try {
                                return TimelineUtils.getCommitMetadata(instant, timeline);
                            } catch (IOException e) {
                                throw new RuntimeException(e.getMessage(), e);
                            }
                        }).collect(Collectors.toList())));
                timeline.findInstantsInRange(startTimestamp, endTimestamp).getInstants().stream().map(instant -> {
                    try {
                        return TimelineUtils.getCommitMetadata(instant, timeline);
                    } catch (IOException e) {
                        throw new RuntimeException(e.getMessage(), e);
                    }
                }).collect(Collectors.toList())));
    }

    public static List<String> parsePartitionValues(List<String> partitionColumns, String partitionPath) {

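The second optimization replaces recursive directory listing with a lookup against the Hudi metadata table, as getAllPartitionNames above shows. Below is a minimal standalone sketch of that lookup, assuming a Hudi 0.13-era client API on the classpath; the base path argument, the default Configuration, and the /tmp spillable directory are illustrative values, not taken from this PR.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.metadata.HoodieTableMetadata;

import java.util.List;

public class MetadataTablePartitionsSketch {
    public static List<String> listPartitions(String basePath) throws Exception {
        Configuration conf = new Configuration();
        // Read the files index from the metadata table instead of listing storage.
        HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
                .enable(true)
                .build();
        HoodieTableMetadata metadata = HoodieTableMetadata.create(
                new HoodieLocalEngineContext(conf),
                metadataConfig,
                basePath,
                "/tmp/hudi-spillable", // spillable map directory; illustrative value
                true);
        // One metadata-table lookup instead of a recursive file listing per partition.
        return metadata.getAllPartitionPaths();
    }
}
```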
@@ -61,6 +61,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
@@ -69,6 +70,9 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

public class HudiScanNode extends HiveScanNode {
@@ -77,7 +81,7 @@ public class HudiScanNode extends HiveScanNode {

    private final boolean isCowTable;

    private long noLogsSplitNum = 0;
    private final AtomicLong noLogsSplitNum = new AtomicLong(0);

    /**
     * External file scan node for Query Hudi table
@@ -147,12 +151,18 @@ public class HudiScanNode extends HiveScanNode {
            rangeDesc.setTableFormatParams(tableFormatFileDesc);
    }

    private List<HivePartition> getPrunedPartitions(HoodieTableMetaClient metaClient) throws AnalysisException {
    private List<HivePartition> getPrunedPartitions(
            HoodieTableMetaClient metaClient, Option<String> snapshotTimestamp) throws AnalysisException {
        List<Type> partitionColumnTypes = hmsTable.getPartitionColumnTypes();
        if (!partitionColumnTypes.isEmpty()) {
            HudiCachedPartitionProcessor processor = (HudiCachedPartitionProcessor) Env.getCurrentEnv()
                    .getExtMetaCacheMgr().getHudiPartitionProcess(hmsTable.getCatalog());
            TablePartitionValues partitionValues = processor.getPartitionValues(hmsTable, metaClient);
            TablePartitionValues partitionValues;
            if (snapshotTimestamp.isPresent()) {
                partitionValues = processor.getSnapshotPartitionValues(hmsTable, metaClient, snapshotTimestamp.get());
            } else {
                partitionValues = processor.getPartitionValues(hmsTable, metaClient);
            }
            if (partitionValues != null) {
                // 2. prune partitions by expr
                partitionValues.readLock().lock();
@@ -215,7 +225,6 @@ public class HudiScanNode extends HiveScanNode {
        List<FieldSchema> allFields = hmsTable.getRemoteTable().getSd().getCols();
        allFields.addAll(hmsTable.getRemoteTable().getPartitionKeys());

        List<Split> splits = new ArrayList<>();
        for (Schema.Field hudiField : hudiSchema.getFields()) {
            String columnName = hudiField.name().toLowerCase(Locale.ROOT);
            // keep hive metastore column in hudi avro schema.
@@ -231,14 +240,17 @@ public class HudiScanNode extends HiveScanNode {

        HoodieTimeline timeline = hudiClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
        String queryInstant;
        Option<String> snapshotTimestamp;
        if (desc.getRef().getTableSnapshot() != null) {
            queryInstant = desc.getRef().getTableSnapshot().getTime();
            snapshotTimestamp = Option.of(queryInstant);
        } else {
            Option<HoodieInstant> snapshotInstant = timeline.lastInstant();
            if (!snapshotInstant.isPresent()) {
                return Collections.emptyList();
            }
            queryInstant = snapshotInstant.get().getTimestamp();
            snapshotTimestamp = Option.empty();
        }
        // Non partition table will get one dummy partition
        UserGroupInformation ugi = HiveMetaStoreClientHelper.getUserGroupInformation(
@@ -247,71 +259,79 @@ public class HudiScanNode extends HiveScanNode {
        if (ugi != null) {
            try {
                partitions = ugi.doAs(
                        (PrivilegedExceptionAction<List<HivePartition>>) () -> getPrunedPartitions(hudiClient));
                        (PrivilegedExceptionAction<List<HivePartition>>) () -> getPrunedPartitions(hudiClient,
                                snapshotTimestamp));
            } catch (Exception e) {
                throw new UserException(e);
            }
        } else {
            partitions = getPrunedPartitions(hudiClient);
            partitions = getPrunedPartitions(hudiClient, snapshotTimestamp);
        }
        try {
            for (HivePartition partition : partitions) {
                String globPath;
                String partitionName = "";
                if (partition.isDummyPartition()) {
                    globPath = hudiClient.getBasePathV2().toString() + "/*";
                } else {
                    partitionName = FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(),
                            new Path(partition.getPath()));
                    globPath = String.format("%s/%s/*", hudiClient.getBasePathV2().toString(), partitionName);
                }
                List<FileStatus> statuses = FSUtils.getGlobStatusExcludingMetaFolder(hudiClient.getRawFs(),
                        new Path(globPath));
                HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(hudiClient,
                        timeline, statuses.toArray(new FileStatus[0]));

                if (isCowTable) {
                    fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant).forEach(baseFile -> {
                        noLogsSplitNum++;
                        String filePath = baseFile.getPath();
                        long fileSize = baseFile.getFileSize();
                        FileSplit split = new FileSplit(new Path(filePath), 0, fileSize, fileSize, new String[0],
                                partition.getPartitionValues());
                        splits.add(split);
                    });
                } else {
                    fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant)
                            .forEach(fileSlice -> {
                                Optional<HoodieBaseFile> baseFile = fileSlice.getBaseFile().toJavaOptional();
                                String filePath = baseFile.map(BaseFile::getPath).orElse("");
                                long fileSize = baseFile.map(BaseFile::getFileSize).orElse(0L);

                                List<String> logs = fileSlice.getLogFiles().map(HoodieLogFile::getPath)
                                        .map(Path::toString)
                                        .collect(Collectors.toList());
                                if (logs.isEmpty()) {
                                    noLogsSplitNum++;
                                }

                                HudiSplit split = new HudiSplit(new Path(filePath), 0, fileSize, fileSize,
                                        new String[0], partition.getPartitionValues());
                                split.setTableFormatType(TableFormatType.HUDI);
                                split.setDataFilePath(filePath);
                                split.setHudiDeltaLogs(logs);
                                split.setInputFormat(inputFormat);
                                split.setSerde(serdeLib);
                                split.setBasePath(basePath);
                                split.setHudiColumnNames(columnNames);
                                split.setHudiColumnTypes(columnTypes);
                                split.setInstantTime(queryInstant);
                                splits.add(split);
                            });
                }
        Executor executor = ((HudiCachedPartitionProcessor) Env.getCurrentEnv()
                .getExtMetaCacheMgr().getHudiPartitionProcess(hmsTable.getCatalog())).getExecutor();
        List<Split> splits = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch countDownLatch = new CountDownLatch(partitions.size());
        partitions.forEach(partition -> executor.execute(() -> {
            String globPath;
            String partitionName = "";
            if (partition.isDummyPartition()) {
                globPath = hudiClient.getBasePathV2().toString() + "/*";
            } else {
                partitionName = FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(),
                        new Path(partition.getPath()));
                globPath = String.format("%s/%s/*", hudiClient.getBasePathV2().toString(), partitionName);
            }
        } catch (Exception e) {
            String errorMsg = String.format("Failed to get hudi info on basePath: %s", basePath);
            LOG.error(errorMsg, e);
            throw new IllegalArgumentException(errorMsg, e);
            List<FileStatus> statuses;
            try {
                statuses = FSUtils.getGlobStatusExcludingMetaFolder(hudiClient.getRawFs(),
                        new Path(globPath));
            } catch (IOException e) {
                throw new RuntimeException("Failed to get hudi file statuses on path: " + globPath, e);
            }
            HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(hudiClient,
                    timeline, statuses.toArray(new FileStatus[0]));

            if (isCowTable) {
                fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant).forEach(baseFile -> {
                    noLogsSplitNum.incrementAndGet();
                    String filePath = baseFile.getPath();
                    long fileSize = baseFile.getFileSize();
                    splits.add(new FileSplit(new Path(filePath), 0, fileSize, fileSize, new String[0],
                            partition.getPartitionValues()));
                });
            } else {
                fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant).forEach(fileSlice -> {
                    Optional<HoodieBaseFile> baseFile = fileSlice.getBaseFile().toJavaOptional();
                    String filePath = baseFile.map(BaseFile::getPath).orElse("");
                    long fileSize = baseFile.map(BaseFile::getFileSize).orElse(0L);

                    List<String> logs = fileSlice.getLogFiles().map(HoodieLogFile::getPath)
                            .map(Path::toString)
                            .collect(Collectors.toList());
                    if (logs.isEmpty()) {
                        noLogsSplitNum.incrementAndGet();
                    }

                    HudiSplit split = new HudiSplit(new Path(filePath), 0, fileSize, fileSize,
                            new String[0], partition.getPartitionValues());
                    split.setTableFormatType(TableFormatType.HUDI);
                    split.setDataFilePath(filePath);
                    split.setHudiDeltaLogs(logs);
                    split.setInputFormat(inputFormat);
                    split.setSerde(serdeLib);
                    split.setBasePath(basePath);
                    split.setHudiColumnNames(columnNames);
                    split.setHudiColumnTypes(columnTypes);
                    split.setInstantTime(queryInstant);
                    splits.add(split);
                });
            }
            countDownLatch.countDown();
        }));
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            throw new RuntimeException(e.getMessage(), e);
        }
        return splits;
    }
@@ -319,6 +339,6 @@ public class HudiScanNode extends HiveScanNode {
    @Override
    public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
        return super.getNodeExplainString(prefix, detailLevel)
                + String.format("%shudiNativeReadSplits=%d/%d\n", prefix, noLogsSplitNum, inputSplitsNum);
                + String.format("%shudiNativeReadSplits=%d/%d\n", prefix, noLogsSplitNum.get(), inputSplitsNum);
    }
}

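To summarize the split-generation change in HudiScanNode above: one task per partition is submitted to the shared partition-cache Executor, results go into a synchronized list, and a CountDownLatch makes the caller wait for every partition before returning. Here is a minimal, self-contained sketch of that pattern; listPartitionFiles and the string-based types are hypothetical stand-ins for the Hudi-specific calls, and the fixed thread pool replaces the shared executor from the diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConcurrentSplitSketch {
    // Hypothetical stand-in for listing one partition's files on storage.
    static List<String> listPartitionFiles(String partition) {
        return Collections.singletonList(partition + "/file-0.parquet");
    }

    public static List<String> getSplits(List<String> partitions) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(8); // the real code reuses a shared executor
        List<String> splits = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch latch = new CountDownLatch(partitions.size());
        for (String partition : partitions) {
            executor.execute(() -> {
                try {
                    splits.addAll(listPartitionFiles(partition)); // per-partition work runs concurrently
                } finally {
                    latch.countDown(); // count down even on failure so the caller never hangs
                }
            });
        }
        latch.await(); // block until every partition task has finished
        executor.shutdown();
        return splits;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(getSplits(Arrays.asList("dt=2023-07-01", "dt=2023-07-02")));
    }
}
```

Compared with parallelStream, an explicit executor lets the concurrency level be configured and shared across scans rather than being tied to the common ForkJoinPool.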