[Improvement](multi catalog) Move split size config to session variable (#18355)
Move the split size config to a session variable. Previously it was in the Config class, so users had to restart the FE after changing it.
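In effect, the split size used to carve external files (Hive tables, table-valued functions) into scan splits is now read per query from the session variable file_split_size instead of from a static FE Config item. The following is a stand-in sketch, not actual Doris code: SplitSizeSketch, resolveSplitSize and its parameters are made-up names that only summarize the resolution order implemented by the HiveSplitter and TVFSplitter hunks below.

    // Stand-in illustration of the split-size resolution order this patch introduces:
    //   1. take the session variable file_split_size;
    //   2. if it is <= 0, fall back to the file's HDFS/S3 block size;
    //   3. floor the result at DEFAULT_SPLIT_SIZE, which the patch raises from 32MB to 128MB.
    public class SplitSizeSketch {
        static final long DEFAULT_SPLIT_SIZE = 128 * 1024 * 1024; // 128MB

        static long resolveSplitSize(long sessionFileSplitSize, long storageBlockSize) {
            long splitSize = sessionFileSplitSize;          // SessionVariable.fileSplitSize
            if (splitSize <= 0) {
                splitSize = storageBlockSize;               // 0 means "use the block size of HDFS/S3"
            }
            return Math.max(splitSize, DEFAULT_SPLIT_SIZE); // min split size is 128MB
        }
    }

For example, with the 128MB minimum a 1GB file is carved into at most 8 splits.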
Splitter.java
@@ -23,7 +23,7 @@ import org.apache.doris.common.UserException;
 import java.util.List;
 
 public interface Splitter {
-    static final long DEFAULT_SPLIT_SIZE = 32 * 1024 * 1024; // 32mb
+    static final long DEFAULT_SPLIT_SIZE = 128 * 1024 * 1024; // 128MB
 
     List<Split> getSplits(List<Expr> exprs) throws UserException;
 }

FileSplitStrategy.java
@@ -17,8 +17,9 @@
 
 package org.apache.doris.planner.external;
 
-import org.apache.doris.common.Config;
-
+/**
+ * TODO: This class would be used later for split assignment.
+ */
 public class FileSplitStrategy {
     private long totalSplitSize;
     private int splitNum;
@@ -34,7 +35,7 @@ public class FileSplitStrategy {
     }
 
     public boolean hasNext() {
-        return totalSplitSize >= Config.file_scan_node_split_size || splitNum >= Config.file_scan_node_split_num;
+        return true;
     }
 
     public void next() {

HiveSplitter.java
@@ -23,7 +23,6 @@ import org.apache.doris.catalog.ListPartitionItem;
 import org.apache.doris.catalog.PartitionItem;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.catalog.external.HMSExternalTable;
-import org.apache.doris.common.Config;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.HMSExternalCatalog;
@@ -34,6 +33,7 @@ import org.apache.doris.planner.ColumnRange;
 import org.apache.doris.planner.ListPartitionPrunerV2;
 import org.apache.doris.planner.Split;
 import org.apache.doris.planner.Splitter;
+import org.apache.doris.qe.ConnectContext;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
@@ -174,13 +174,14 @@ public class HiveSplitter implements Splitter {
                 }
                 return splits.toArray(new FileSplit[splits.size()]);
             }
-            long splitSize = Config.file_split_size;
-            boolean useDefaultBlockSize = (splitSize <= 0);
+            long splitSize = ConnectContext.get().getSessionVariable().getFileSplitSize();
             while (locatedFileStatusRemoteIterator.hasNext()) {
                 LocatedFileStatus status = locatedFileStatusRemoteIterator.next();
-                if (useDefaultBlockSize) {
-                    splitSize = status.getBlockSize() > 0 ? status.getBlockSize() : DEFAULT_SPLIT_SIZE;
+                if (splitSize <= 0) {
+                    splitSize = status.getBlockSize();
                 }
+                // Min split size is DEFAULT_SPLIT_SIZE(128MB).
+                splitSize = splitSize > DEFAULT_SPLIT_SIZE ? splitSize : DEFAULT_SPLIT_SIZE;
                 BlockLocation[] blockLocations = status.getBlockLocations();
                 long length = status.getLen();
                 long bytesRemaining;

QueryScanProvider.java
@@ -104,11 +104,9 @@ public abstract class QueryScanProvider implements FileScanProviderIf {
         } else if (locationType == TFileType.FILE_S3) {
             context.params.setProperties(locationProperties);
         }
-        TScanRangeLocations curLocations = newLocations(context.params, backendPolicy);
-
-        FileSplitStrategy fileSplitStrategy = new FileSplitStrategy();
 
         for (Split split : inputSplits) {
+            TScanRangeLocations curLocations = newLocations(context.params, backendPolicy);
             FileSplit fileSplit = (FileSplit) split;
             List<String> pathPartitionKeys = getPathPartitionKeys();
             List<String> partitionValuesFromPath = BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(),
@@ -124,18 +122,8 @@ public abstract class QueryScanProvider implements FileScanProviderIf {
             LOG.debug("assign to backend {} with table split: {} ({}, {}), location: {}",
                     curLocations.getLocations().get(0).getBackendId(), fileSplit.getPath(), fileSplit.getStart(),
                     fileSplit.getLength(), Joiner.on("|").join(fileSplit.getHosts()));
-
-            fileSplitStrategy.update(fileSplit);
-            // Add a new location when it's can be split
-            if (fileSplitStrategy.hasNext()) {
-                scanRangeLocations.add(curLocations);
-                curLocations = newLocations(context.params, backendPolicy);
-                fileSplitStrategy.next();
-            }
-            this.inputFileSize += fileSplit.getLength();
-        }
-        if (curLocations.getScanRange().getExtScanRange().getFileScanRange().getRangesSize() > 0) {
             scanRangeLocations.add(curLocations);
+            this.inputFileSize += fileSplit.getLength();
         }
         LOG.debug("create #{} ScanRangeLocations cost: {} ms",
                 scanRangeLocations.size(), (System.currentTimeMillis() - start));

TVFSplitter.java
@@ -18,10 +18,10 @@
 package org.apache.doris.planner.external;
 
 import org.apache.doris.analysis.Expr;
-import org.apache.doris.common.Config;
 import org.apache.doris.common.UserException;
 import org.apache.doris.planner.Split;
 import org.apache.doris.planner.Splitter;
+import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
 import org.apache.doris.thrift.TBrokerFileStatus;
 
@@ -50,10 +50,12 @@ public class TVFSplitter implements Splitter {
         long fileLength = fileStatus.getSize();
         Path path = new Path(fileStatus.getPath());
         if (fileStatus.isSplitable) {
-            long splitSize = Config.file_split_size;
+            long splitSize = ConnectContext.get().getSessionVariable().getFileSplitSize();
             if (splitSize <= 0) {
-                splitSize = fileStatus.getBlockSize() > 0 ? fileStatus.getBlockSize() : DEFAULT_SPLIT_SIZE;
+                splitSize = fileStatus.getBlockSize();
             }
+            // Min split size is DEFAULT_SPLIT_SIZE(128MB).
+            splitSize = splitSize > DEFAULT_SPLIT_SIZE ? splitSize : DEFAULT_SPLIT_SIZE;
             addFileSplits(path, fileLength, splitSize, splits);
         } else {
             Split split = new FileSplit(path, 0, fileLength, fileLength, new String[0]);

SessionVariable.java
@@ -295,6 +295,9 @@ public class SessionVariable implements Serializable, Writable {
 
     public static final String DRY_RUN_QUERY = "dry_run_query";
 
+    // Split size for ExternalFileScanNode. Default value 0 means use the block size of HDFS/S3.
+    public static final String FILE_SPLIT_SIZE = "file_split_size";
+
     public static final List<String> DEBUG_VARIABLES = ImmutableList.of(
             SKIP_DELETE_PREDICATE,
             SKIP_DELETE_BITMAP,
@@ -790,6 +793,9 @@ public class SessionVariable implements Serializable, Writable {
     @VariableMgr.VarAttr(name = DRY_RUN_QUERY, needForward = true)
     public boolean dryRunQuery = false;
 
+    @VariableMgr.VarAttr(name = FILE_SPLIT_SIZE, needForward = true)
+    public long fileSplitSize = 0;
+
     // If this fe is in fuzzy mode, then will use initFuzzyModeVariables to generate some variables,
     // not the default value set in the code.
     public void initFuzzyModeVariables() {
@@ -1363,6 +1369,14 @@ public class SessionVariable implements Serializable, Writable {
         return enableCboStatistics;
     }
 
+    public long getFileSplitSize() {
+        return fileSplitSize;
+    }
+
+    public void setFileSplitSize(long fileSplitSize) {
+        this.fileSplitSize = fileSplitSize;
+    }
+
     /**
      * getInsertVisibleTimeoutMs.
      **/
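
Because fileSplitSize now lives in SessionVariable (and the VarAttr is marked needForward = true), the value can be adjusted per connection at runtime instead of by editing the FE Config and restarting. Below is a minimal sketch of the new accessors in use; it assumes an active ConnectContext, and the 256MB value is only an example:

    // Hypothetical FE-side usage of the accessors added to SessionVariable above.
    SessionVariable sv = ConnectContext.get().getSessionVariable();
    sv.setFileSplitSize(256 * 1024 * 1024L); // applies to this session only, no FE restart needed
    long splitSize = sv.getFileSplitSize();  // 0 (the default) means "use the HDFS/S3 block size"

From a MySQL client the same knob should be reachable as the session variable file_split_size, the name registered via FILE_SPLIT_SIZE above.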