[opt](catalog) merge scan range to avoid too many splits (#38311) (#38964)

bp #38311
This commit is contained in:
Mingyu Chen
2024-08-06 21:57:02 +08:00
committed by GitHub
parent 2540835b58
commit bc644cb253
15 changed files with 98 additions and 53 deletions

View File

@@ -72,7 +72,7 @@ public class AlterDatabaseQuotaStmt extends DdlStmt {
ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
}
if (quotaType == QuotaType.DATA) {
quota = ParseUtil.analyzeDataVolumn(quotaValue);
quota = ParseUtil.analyzeDataVolume(quotaValue);
} else if (quotaType == QuotaType.REPLICA) {
quota = ParseUtil.analyzeReplicaNumber(quotaValue);
} else if (quotaType == QuotaType.TRANSACTION) {

View File

@@ -116,7 +116,7 @@ public class CreateTableStmt extends DdlStmt {
distributionDesc.setBuckets(FeConstants.default_bucket_num);
} else {
long partitionSize = ParseUtil
.analyzeDataVolumn(newProperties.get(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE));
.analyzeDataVolume(newProperties.get(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE));
distributionDesc.setBuckets(AutoBucketUtils.getBucketsNum(partitionSize, Config.autobucket_min_buckets));
}

View File

@@ -573,7 +573,7 @@ public class OutFileClause {
}
if (properties.containsKey(PROP_MAX_FILE_SIZE)) {
maxFileSizeBytes = ParseUtil.analyzeDataVolumn(properties.get(PROP_MAX_FILE_SIZE));
maxFileSizeBytes = ParseUtil.analyzeDataVolume(properties.get(PROP_MAX_FILE_SIZE));
if (maxFileSizeBytes > MAX_FILE_SIZE_BYTES || maxFileSizeBytes < MIN_FILE_SIZE_BYTES) {
throw new AnalysisException("max file size should between 5MB and 2GB. Given: " + maxFileSizeBytes);
}

View File

@@ -179,12 +179,22 @@ public class SetVar {
this.result = (LiteralExpr) this.value;
}
if (getVariable().equalsIgnoreCase(SessionVariable.EXEC_MEM_LIMIT)) {
this.value = new StringLiteral(Long.toString(ParseUtil.analyzeDataVolumn(getResult().getStringValue())));
if (getVariable().equalsIgnoreCase(SessionVariable.EXEC_MEM_LIMIT)
|| getVariable().equalsIgnoreCase(SessionVariable.SCAN_QUEUE_MEM_LIMIT)) {
this.value = new StringLiteral(Long.toString(ParseUtil.analyzeDataVolume(getResult().getStringValue())));
this.result = (LiteralExpr) this.value;
}
if (getVariable().equalsIgnoreCase(SessionVariable.SCAN_QUEUE_MEM_LIMIT)) {
this.value = new StringLiteral(Long.toString(ParseUtil.analyzeDataVolumn(getResult().getStringValue())));
if (getVariable().equalsIgnoreCase(SessionVariable.FILE_SPLIT_SIZE)) {
try {
this.value = new StringLiteral(
Long.toString(ParseUtil.analyzeDataVolume(getResult().getStringValue())));
} catch (Throwable t) {
// file_split_size should be handled the same way as exec_mem_limit or scan_queue_mem_limit.
// But ParseUtil.analyzeDataVolume() does not accept 0 as a valid value.
// So for compatibility, we keep the original value of file_split_size
// when the value is 0 or otherwise invalid.
this.value = new StringLiteral(getResult().getStringValue());
}
this.result = (LiteralExpr) this.value;
}
if (getVariable().equalsIgnoreCase("is_report_success")) {

View File

@@ -41,7 +41,7 @@ public class ParseUtil {
private static Pattern dataVolumnPattern = Pattern.compile("(\\d+)(\\D*)");
public static long analyzeDataVolumn(String dataVolumnStr) throws AnalysisException {
public static long analyzeDataVolume(String dataVolumnStr) throws AnalysisException {
long dataVolumn = 0;
Matcher m = dataVolumnPattern.matcher(dataVolumnStr);
if (m.matches()) {
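For context, analyzeDataVolume (formerly analyzeDataVolumn) turns a human-readable size string such as "100GB" into a byte count, which is why it is reused above for file_split_size. The standalone sketch below illustrates that kind of parsing under assumed unit rules; the class name ParseSketch, the parseSize method, and the exact set of accepted suffixes are illustrative, not the actual Doris implementation.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ParseSketch {
    // Same shape of pattern as ParseUtil: digits followed by an optional unit suffix.
    private static final Pattern DATA_VOLUME_PATTERN = Pattern.compile("(\\d+)(\\D*)");

    // Illustrative parser; the unit list and case handling are assumptions.
    public static long parseSize(String s) {
        Matcher m = DATA_VOLUME_PATTERN.matcher(s.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("invalid data volume: " + s);
        }
        long value = Long.parseLong(m.group(1));
        switch (m.group(2).trim().toUpperCase()) {
            case "":  case "B":  return value;
            case "K": case "KB": return value << 10;
            case "M": case "MB": return value << 20;
            case "G": case "GB": return value << 30;
            case "T": case "TB": return value << 40;
            default: throw new IllegalArgumentException("unknown unit in: " + s);
        }
    }

    public static void main(String[] args) {
        System.out.println(parseSize("64MB")); // 67108864
    }
}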

View File

@@ -26,7 +26,6 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.FileSplit.FileSplitCreator;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.spi.Split;
@@ -63,7 +62,7 @@ import java.util.Map;
public abstract class FileScanNode extends ExternalScanNode {
private static final Logger LOG = LogManager.getLogger(FileScanNode.class);
public static final long DEFAULT_SPLIT_SIZE = 8 * 1024 * 1024; // 8MB
public static final long DEFAULT_SPLIT_SIZE = 64 * 1024 * 1024; // 64MB
// For explain
protected long inputSplitsNum = 0;
@@ -71,6 +70,7 @@ public abstract class FileScanNode extends ExternalScanNode {
protected long totalPartitionNum = 0;
protected long readPartitionNum = 0;
protected long fileSplitSize;
protected boolean isSplitSizeSetBySession = false;
public FileScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType,
boolean needCheckColumnPriv) {
@@ -80,7 +80,15 @@ public abstract class FileScanNode extends ExternalScanNode {
@Override
public void init() throws UserException {
initFileSplitSize();
}
private void initFileSplitSize() {
this.fileSplitSize = ConnectContext.get().getSessionVariable().getFileSplitSize();
this.isSplitSizeSetBySession = this.fileSplitSize > 0;
if (this.fileSplitSize <= 0) {
this.fileSplitSize = DEFAULT_SPLIT_SIZE;
}
}
@Override
@@ -235,12 +243,6 @@ public abstract class FileScanNode extends ExternalScanNode {
}
}
protected List<Split> splitFile(Path path, long blockSize, BlockLocation[] blockLocations, long length,
long modificationTime, boolean splittable, List<String> partitionValues) throws IOException {
return splitFile(path, blockSize, blockLocations, length, modificationTime, splittable, partitionValues,
FileSplitCreator.DEFAULT);
}
protected List<Split> splitFile(Path path, long blockSize, BlockLocation[] blockLocations, long length,
long modificationTime, boolean splittable, List<String> partitionValues, SplitCreator splitCreator)
throws IOException {
@@ -257,11 +259,11 @@
result.add(splitCreator.create(path, 0, length, length, modificationTime, hosts, partitionValues));
return result;
}
if (fileSplitSize <= 0) {
fileSplitSize = blockSize;
// If the file split size is set by the session variable, use it as-is.
// Otherwise, use max(file split size, block size).
if (!isSplitSizeSetBySession) {
fileSplitSize = Math.max(fileSplitSize, blockSize);
}
// Min split size is DEFAULT_SPLIT_SIZE (64MB).
fileSplitSize = Math.max(fileSplitSize, DEFAULT_SPLIT_SIZE);
long bytesRemaining;
for (bytesRemaining = length; (double) bytesRemaining / (double) fileSplitSize > 1.1D;
bytesRemaining -= fileSplitSize) {
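Taken together, initFileSplitSize() and the new branch in splitFile() choose the effective split size as sketched below. This is a simplified, self-contained illustration of that decision only; SplitSizeSketch and computeSplitSize are illustrative names, not Doris APIs. It follows the diff above: a positive file_split_size session value wins outright, otherwise the 64MB default applies and is widened to the file system block size when the block is larger.

public final class SplitSizeSketch {
    // Matches the new default in FileScanNode after this commit.
    static final long DEFAULT_SPLIT_SIZE = 64L * 1024 * 1024; // 64MB

    // Illustrative helper, not a real Doris method.
    static long computeSplitSize(long sessionFileSplitSize, long blockSize) {
        boolean setBySession = sessionFileSplitSize > 0;
        long splitSize = setBySession ? sessionFileSplitSize : DEFAULT_SPLIT_SIZE;
        if (!setBySession) {
            // Not pinned by the session variable: do not split finer than the FS block size.
            splitSize = Math.max(splitSize, blockSize);
        }
        return splitSize;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024;
        System.out.println(computeSplitSize(0, 256 * mb));       // 268435456: block size wins
        System.out.println(computeSplitSize(0, 32 * mb));        // 67108864: 64MB default wins
        System.out.println(computeSplitSize(16 * mb, 256 * mb)); // 16777216: session setting wins
    }
}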

View File

@@ -35,7 +35,6 @@ import org.apache.doris.common.security.authentication.AuthenticationConfig;
import org.apache.doris.common.util.CacheBulkLoader;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.CacheException;
import org.apache.doris.datasource.FileSplit;
import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.fs.FileSystemCache;
@@ -512,8 +511,7 @@ public class HiveMetaStoreCache {
if (LOG.isDebugEnabled()) {
LOG.debug("get #{} files from #{} partitions in catalog {} cost: {} ms",
fileLists.stream().mapToInt(l -> l.getFiles() == null
? (l.getSplits() == null ? 0 : l.getSplits().size()) : l.getFiles().size()).sum(),
fileLists.stream().mapToInt(l -> l.getFiles() == null ? 0 : l.getFiles().size()).sum(),
partitions.size(), catalog.getName(), (System.currentTimeMillis() - start));
}
return fileLists;
@@ -992,9 +990,6 @@ public class HiveMetaStoreCache {
public static class FileCacheValue {
// File Cache for self splitter.
private final List<HiveFileStatus> files = Lists.newArrayList();
// File split cache for old splitter. This is a temp variable.
@Deprecated
private final List<FileSplit> splits = Lists.newArrayList();
private boolean isSplittable;
// The values of partitions.
// e.g for file : hdfs://path/to/table/part1=a/part2=b/datafile
@@ -1015,13 +1010,6 @@
}
}
@Deprecated
public void addSplit(FileSplit split) {
if (isFileVisible(split.getPath())) {
splits.add(split);
}
}
public int getValuesSize() {
return partitionValues == null ? 0 : partitionValues.size();
}

View File

@@ -320,10 +320,6 @@ public class HiveScanNode extends FileQueryScanNode {
return;
}
for (HiveMetaStoreCache.FileCacheValue fileCacheValue : fileCaches) {
// This if branch is to support old splitter, will remove later.
if (fileCacheValue.getSplits() != null) {
allFiles.addAll(fileCacheValue.getSplits());
}
if (fileCacheValue.getFiles() != null) {
boolean isSplittable = fileCacheValue.isSplittable();
for (HiveMetaStoreCache.HiveFileStatus status : fileCacheValue.getFiles()) {

View File

@@ -37,7 +37,6 @@ import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TExplainLevel;
@@ -205,14 +204,12 @@ public class IcebergScanNode extends FileQueryScanNode {
// get splits
List<Split> splits = new ArrayList<>();
int formatVersion = ((BaseTable) icebergTable).operations().current().formatVersion();
// Min split size is DEFAULT_SPLIT_SIZE(128MB).
long splitSize = Math.max(ConnectContext.get().getSessionVariable().getFileSplitSize(), DEFAULT_SPLIT_SIZE);
HashSet<String> partitionPathSet = new HashSet<>();
boolean isPartitionedTable = icebergTable.spec().isPartitioned();
CloseableIterable<FileScanTask> fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(), splitSize);
CloseableIterable<FileScanTask> fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(), fileSplitSize);
try (CloseableIterable<CombinedScanTask> combinedScanTasks =
TableScanUtil.planTasks(fileScanTasks, splitSize, 1, 0)) {
TableScanUtil.planTasks(fileScanTasks, fileSplitSize, 1, 0)) {
combinedScanTasks.forEach(taskGrp -> taskGrp.files().forEach(splitTask -> {
String dataFilePath = normalizeLocation(splitTask.file().path().toString());

View File

@@ -27,6 +27,7 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.FileSplit;
import org.apache.doris.datasource.FileSplit.FileSplitCreator;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
@@ -139,7 +140,8 @@ public class TVFScanNode extends FileQueryScanNode {
Path path = new Path(fileStatus.getPath());
try {
splits.addAll(splitFile(path, fileStatus.getBlockSize(), null, fileStatus.getSize(),
fileStatus.getModificationTime(), fileStatus.isSplitable, null));
fileStatus.getModificationTime(), fileStatus.isSplitable, null,
FileSplitCreator.DEFAULT));
} catch (IOException e) {
LOG.warn("get file split failed for TVF: {}", path, e);
throw new UserException(e);

View File

@@ -654,7 +654,7 @@ public class CreateTableInfo {
if (!newProperties.containsKey(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE)) {
distributionDesc.updateBucketNum(FeConstants.default_bucket_num);
} else {
long partitionSize = ParseUtil.analyzeDataVolumn(
long partitionSize = ParseUtil.analyzeDataVolume(
newProperties.get(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE));
distributionDesc.updateBucketNum(AutoBucketUtils.getBucketsNum(partitionSize,
Config.autobucket_min_buckets));