bp #45148

### What problem does this PR solve?

Problem Summary: Optimize reading of MaxCompute partition tables:

1. Introduce a batch mode for generating splits of MaxCompute partition tables, which optimizes scenarios with a large number of partitions. It is controlled by the session variable `num_partitions_in_batch_mode`.
2. Introduce the catalog parameter `mc.split_cross_partition`. Setting it to `true` is friendlier for reading partition tables; setting it to `false` is friendlier for debugging.
3. Add `-Darrow.enable_null_check_for_get=false` to the BE JVM options to improve the efficiency of MaxCompute Arrow data conversion.
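The core of change 1 can be seen in `MaxComputeScanNode.startSplit()` in the diff below: the pruned partition specs are cut into batches of `num_partitions_in_batch_mode`, and each batch gets its own table read session whose splits are pushed to the split queue asynchronously. The following is only a minimal, self-contained sketch of that batching pattern; `Split`, `generateSplits`, and `SPLIT_QUEUE` are simplified stand-ins for the Doris/ODPS types (`MaxComputeSplit`, `TableBatchReadSession`, `SplitAssignment`) used in the actual change, not the real APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal sketch of the batch split-generation pattern, with simplified
// stand-in types. The real implementation builds an ODPS read session per
// batch and hands splits to Doris' SplitAssignment instead.
public class BatchSplitSketch {

    // Stand-in for a generated split (the real code produces MaxComputeSplit).
    record Split(String partition) {}

    // Stand-in for SplitAssignment.addToQueue: consumers poll splits as they arrive.
    static final ConcurrentLinkedQueue<Split> SPLIT_QUEUE = new ConcurrentLinkedQueue<>();

    // Stand-in for creating a read session for one batch and asking it for splits.
    static List<Split> generateSplits(List<String> partitionBatch) {
        List<Split> splits = new ArrayList<>();
        for (String p : partitionBatch) {
            splits.add(new Split(p));
        }
        return splits;
    }

    // Cut the selected partitions into batches of `batchSize` and schedule each
    // batch asynchronously, mirroring the loop in MaxComputeScanNode.startSplit().
    static void startSplit(List<String> selectedPartitions, int batchSize) {
        AtomicInteger finished = new AtomicInteger(0);
        CompletableFuture.runAsync(() -> {
            for (int begin = 0; begin < selectedPartitions.size(); begin += batchSize) {
                int end = Math.min(begin + batchSize, selectedPartitions.size());
                List<String> batch = selectedPartitions.subList(begin, end);
                int curBatchSize = end - begin;
                CompletableFuture.runAsync(() -> {
                    // Splits become available batch by batch instead of all at once.
                    SPLIT_QUEUE.addAll(generateSplits(batch));
                    if (finished.addAndGet(curBatchSize) == selectedPartitions.size()) {
                        System.out.println("all batches scheduled");
                    }
                });
            }
        });
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> partitions = List.of("p=20240101", "p=20240102", "p=20240103", "p=20240104");
        startSplit(partitions, 2); // batch size would come from num_partitions_in_batch_mode
        Thread.sleep(200);         // give the async tasks time to finish in this sketch
        System.out.println("splits produced: " + SPLIT_QUEUE.size());
    }
}
```

The point of the pattern is that the scan node can start handing splits to backends as soon as the first batch of partitions has a read session, instead of waiting for a single session covering every partition.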
@@ -21,13 +21,13 @@ CUR_DATE=`date +%Y%m%d-%H%M%S`
LOG_DIR="${DORIS_HOME}/log/"

# For jdk 8
JAVA_OPTS="-Dfile.encoding=UTF-8 -Xmx2048m -DlogPath=$LOG_DIR/jni.log -Xloggc:$DORIS_HOME/log/be.gc.log.$CUR_DATE -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives"
JAVA_OPTS="-Dfile.encoding=UTF-8 -Xmx2048m -DlogPath=$LOG_DIR/jni.log -Xloggc:$DORIS_HOME/log/be.gc.log.$CUR_DATE -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives -Darrow.enable_null_check_for_get=false"

# For jdk 9+, this JAVA_OPTS will be used as default JVM options
JAVA_OPTS_FOR_JDK_9="-Dfile.encoding=UTF-8 -Xmx2048m -DlogPath=$DORIS_HOME/log/jni.log -Xlog:gc:$LOG_DIR/be.gc.log.$CUR_DATE -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives --add-opens=java.base/java.nio=ALL-UNNAMED"
JAVA_OPTS_FOR_JDK_9="-Dfile.encoding=UTF-8 -Xmx2048m -DlogPath=$DORIS_HOME/log/jni.log -Xlog:gc:$LOG_DIR/be.gc.log.$CUR_DATE -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives --add-opens=java.base/java.nio=ALL-UNNAMED -Darrow.enable_null_check_for_get=false"

# For jdk 17+, this JAVA_OPTS will be used as default JVM options
JAVA_OPTS_FOR_JDK_17="-Dfile.encoding=UTF-8 -Xmx2048m -DlogPath=$LOG_DIR/jni.log -Xlog:gc:$LOG_DIR/be.gc.log.$CUR_DATE -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED"
JAVA_OPTS_FOR_JDK_17="-Dfile.encoding=UTF-8 -Xmx2048m -DlogPath=$LOG_DIR/jni.log -Xlog:gc:$LOG_DIR/be.gc.log.$CUR_DATE -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED -Darrow.enable_null_check_for_get=false"

# since 1.2, the JAVA_HOME need to be set to run BE process.
# JAVA_HOME=/path/to/jdk/
@@ -164,22 +164,24 @@ public class MaxComputeExternalCatalog extends ExternalCatalog {
        defaultProject = props.get(MCProperties.PROJECT);
        quota = props.getOrDefault(MCProperties.QUOTA, MCProperties.DEFAULT_QUOTA);

        boolean splitCrossPartition =
                Boolean.parseBoolean(props.getOrDefault(MCProperties.SPLIT_CROSS_PARTITION,
                        MCProperties.DEFAULT_SPLIT_CROSS_PARTITION));

        splitStrategy = props.getOrDefault(MCProperties.SPLIT_STRATEGY, MCProperties.DEFAULT_SPLIT_STRATEGY);
        if (splitStrategy.equals(MCProperties.SPLIT_BY_BYTE_SIZE_STRATEGY)) {
            splitByteSize = Long.parseLong(props.getOrDefault(MCProperties.SPLIT_BYTE_SIZE,
                    MCProperties.DEFAULT_SPLIT_BYTE_SIZE));

            splitOptions = SplitOptions.newBuilder()
                    .SplitByByteSize(splitByteSize)
                    .withCrossPartition(false)
                    .withCrossPartition(splitCrossPartition)
                    .build();
        } else {
            splitRowCount = Long.parseLong(props.getOrDefault(MCProperties.SPLIT_ROW_COUNT,
                    MCProperties.DEFAULT_SPLIT_ROW_COUNT));
            splitOptions = SplitOptions.newBuilder()
                    .SplitByRowOffset()
                    .withCrossPartition(false)
                    .withCrossPartition(splitCrossPartition)
                    .build();
        }

@@ -29,6 +29,7 @@ import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
@@ -43,6 +44,7 @@ import org.apache.doris.datasource.property.constants.MCProperties;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
import org.apache.doris.nereids.util.DateUtils;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TFileFormatType;
@@ -79,15 +81,18 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

public class MaxComputeScanNode extends FileQueryScanNode {

    private final MaxComputeExternalTable table;
    private TableBatchReadSession tableBatchReadSession;
    private Predicate filterPredicate;
    private static final LocationPath ROW_OFFSET_PATH = new LocationPath("/row_offset", Maps.newHashMap());
    private static final LocationPath BYTE_SIZE_PATH = new LocationPath("/byte_size", Maps.newHashMap());
    List<String> requiredPartitionColumns = new ArrayList<>();
    List<String> orderedRequiredDataColumns = new ArrayList<>();

    private int connectTimeout;
    private int readTimeout;
@@ -96,6 +101,10 @@ public class MaxComputeScanNode extends FileQueryScanNode {
    @Setter
    private SelectedPartitions selectedPartitions = null;

    private static final LocationPath ROW_OFFSET_PATH = new LocationPath("/row_offset", Maps.newHashMap());
    private static final LocationPath BYTE_SIZE_PATH = new LocationPath("/byte_size", Maps.newHashMap());

    // For new planner
    public MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc,
            SelectedPartitions selectedPartitions, boolean needCheckColumnPriv) {
@@ -143,33 +152,17 @@ public class MaxComputeScanNode extends FileQueryScanNode {
        rangeDesc.setSize(maxComputeSplit.getLength());
    }

    // Return false if no need to read any partition data.
    // Return true if need to read partition data.
    boolean createTableBatchReadSession() throws UserException {
        List<String> requiredPartitionColumns = new ArrayList<>();
        List<String> orderedRequiredDataColumns = new ArrayList<>();

        List<PartitionSpec> requiredPartitionSpecs = new ArrayList<>();
        //if requiredPartitionSpecs is empty, get all partition data.
        if (!table.getPartitionColumns().isEmpty() && selectedPartitions != SelectedPartitions.NOT_PRUNED) {
            this.totalPartitionNum = selectedPartitions.totalPartitionNum;
            this.selectedPartitionNum = selectedPartitions.selectedPartitions.size();

            if (selectedPartitions.selectedPartitions.isEmpty()) {
                //no need read any partition data.
                return false;
            }
            selectedPartitions.selectedPartitions.forEach(
                    (key, value) -> requiredPartitionSpecs.add(new PartitionSpec(key))
            );
        }

    private void createRequiredColumns() {
        Set<String> requiredSlots =
                desc.getSlots().stream().map(e -> e.getColumn().getName()).collect(Collectors.toSet());

        Set<String> partitionColumns =
                table.getPartitionColumns().stream().map(Column::getName).collect(Collectors.toSet());

        requiredPartitionColumns.clear();
        orderedRequiredDataColumns.clear();

        for (Column column : table.getColumns()) {
            String columnName = column.getName();
            if (!requiredSlots.contains(columnName)) {
@@ -181,32 +174,118 @@ public class MaxComputeScanNode extends FileQueryScanNode {
                orderedRequiredDataColumns.add(columnName);
            }
        }
    }

    /**
     * For no partition table: request requiredPartitionSpecs is empty
     * For partition table: if requiredPartitionSpecs is empty, get all partition data.
     */
    TableBatchReadSession createTableBatchReadSession(List<PartitionSpec> requiredPartitionSpecs) throws IOException {
        MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog) table.getCatalog();

        try {
            TableReadSessionBuilder scanBuilder = new TableReadSessionBuilder();
            tableBatchReadSession =
                    scanBuilder.identifier(TableIdentifier.of(table.getDbName(), table.getName()))
                            .withSettings(mcCatalog.getSettings())
                            .withSplitOptions(mcCatalog.getSplitOption())
                            .requiredPartitionColumns(requiredPartitionColumns)
                            .requiredPartitions(requiredPartitionSpecs)
                            .requiredDataColumns(orderedRequiredDataColumns)
                            .withArrowOptions(
                                    ArrowOptions.newBuilder()
                                            .withDatetimeUnit(TimestampUnit.MILLI)
                                            .withTimestampUnit(TimestampUnit.NANO)
                                            .build()
                            )
                            .withFilterPredicate(filterPredicate)
                            .buildBatchReadSession();
        } catch (java.io.IOException e) {
            throw new RuntimeException(e);
        readTimeout = mcCatalog.getReadTimeout();
        connectTimeout = mcCatalog.getConnectTimeout();
        retryTimes = mcCatalog.getRetryTimes();

        TableReadSessionBuilder scanBuilder = new TableReadSessionBuilder();
        return scanBuilder.identifier(TableIdentifier.of(table.getDbName(), table.getName()))
                .withSettings(mcCatalog.getSettings())
                .withSplitOptions(mcCatalog.getSplitOption())
                .requiredPartitionColumns(requiredPartitionColumns)
                .requiredDataColumns(orderedRequiredDataColumns)
                .withFilterPredicate(filterPredicate)
                .requiredPartitions(requiredPartitionSpecs)
                .withArrowOptions(
                        ArrowOptions.newBuilder()
                                .withDatetimeUnit(TimestampUnit.MILLI)
                                .withTimestampUnit(TimestampUnit.NANO)
                                .build()
                ).buildBatchReadSession();
    }

    @Override
    public boolean isBatchMode() {
        if (table.getPartitionColumns().isEmpty()) {
            return false;
        }
        return true;

        com.aliyun.odps.Table odpsTable = table.getOdpsTable();
        if (desc.getSlots().isEmpty() || odpsTable.getFileNum() <= 0) {
            return false;
        }

        int numPartitions = ConnectContext.get().getSessionVariable().getNumPartitionsInBatchMode();
        return numPartitions > 0
                && selectedPartitions != SelectedPartitions.NOT_PRUNED
                && selectedPartitions.selectedPartitions.size() >= numPartitions;
    }

    @Override
    public int numApproximateSplits() {
        return selectedPartitions.selectedPartitions.size();
    }

    @Override
    public void startSplit() {
        this.totalPartitionNum = selectedPartitions.totalPartitionNum;
        this.selectedPartitionNum = selectedPartitions.selectedPartitions.size();

        if (selectedPartitions.selectedPartitions.isEmpty()) {
            //no need read any partition data.
            return;
        }

        createRequiredColumns();
        List<PartitionSpec> requiredPartitionSpecs = new ArrayList<>();
        selectedPartitions.selectedPartitions.forEach(
                (key, value) -> requiredPartitionSpecs.add(new PartitionSpec(key))
        );

        int batchNumPartitions = ConnectContext.get().getSessionVariable().getNumPartitionsInBatchMode();

        Executor scheduleExecutor = Env.getCurrentEnv().getExtMetaCacheMgr().getScheduleExecutor();
        AtomicReference<UserException> batchException = new AtomicReference<>(null);
        AtomicInteger numFinishedPartitions = new AtomicInteger(0);

        CompletableFuture.runAsync(() -> {
            for (int beginIndex = 0; beginIndex < requiredPartitionSpecs.size(); beginIndex += batchNumPartitions) {
                int endIndex = Math.min(beginIndex + batchNumPartitions, requiredPartitionSpecs.size());
                if (batchException.get() != null || splitAssignment.isStop()) {
                    break;
                }
                List<PartitionSpec> requiredBatchPartitionSpecs = requiredPartitionSpecs.subList(beginIndex, endIndex);
                int curBatchSize = endIndex - beginIndex;

                try {
                    CompletableFuture.runAsync(() -> {
                        try {
                            TableBatchReadSession tableBatchReadSession =
                                    createTableBatchReadSession(requiredBatchPartitionSpecs);
                            List<Split> batchSplit = getSplitByTableSession(tableBatchReadSession);

                            splitAssignment.addToQueue(batchSplit);
                        } catch (IOException e) {
                            batchException.set(new UserException(e.getMessage(), e));
                        } finally {
                            if (batchException.get() != null) {
                                splitAssignment.setException(batchException.get());
                            }

                            if (numFinishedPartitions.addAndGet(curBatchSize) == requiredPartitionSpecs.size()) {
                                splitAssignment.finishSchedule();
                            }
                        }
                    }, scheduleExecutor);
                } catch (Exception e) {
                    batchException.set(new UserException(e.getMessage(), e));
                }

                if (batchException.get() != null) {
                    splitAssignment.setException(batchException.get());
                }
            }
        });
    }

    @Override
@@ -467,6 +546,56 @@ public class MaxComputeScanNode extends FileQueryScanNode {
        return new HashMap<>();
    }

    List<Split> getSplitByTableSession(TableBatchReadSession tableBatchReadSession) throws java.io.IOException {
        List<Split> result = new ArrayList<>();
        String scanSessionSerialize = serializeSession(tableBatchReadSession);
        InputSplitAssigner assigner = tableBatchReadSession.getInputSplitAssigner();
        long modificationTime = table.getOdpsTable().getLastDataModifiedTime().getTime();

        MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog) table.getCatalog();

        if (mcCatalog.getSplitStrategy().equals(MCProperties.SPLIT_BY_BYTE_SIZE_STRATEGY)) {

            for (com.aliyun.odps.table.read.split.InputSplit split : assigner.getAllSplits()) {
                MaxComputeSplit maxComputeSplit =
                        new MaxComputeSplit(BYTE_SIZE_PATH,
                                ((IndexedInputSplit) split).getSplitIndex(), -1,
                                mcCatalog.getSplitByteSize(),
                                modificationTime, null,
                                Collections.emptyList());

                maxComputeSplit.scanSerialize = scanSessionSerialize;
                maxComputeSplit.splitType = SplitType.BYTE_SIZE;
                maxComputeSplit.sessionId = split.getSessionId();

                result.add(maxComputeSplit);
            }
        } else {
            long totalRowCount = assigner.getTotalRowCount();

            long recordsPerSplit = mcCatalog.getSplitRowCount();
            for (long offset = 0; offset < totalRowCount; offset += recordsPerSplit) {
                recordsPerSplit = Math.min(recordsPerSplit, totalRowCount - offset);
                com.aliyun.odps.table.read.split.InputSplit split =
                        assigner.getSplitByRowOffset(offset, recordsPerSplit);

                MaxComputeSplit maxComputeSplit =
                        new MaxComputeSplit(ROW_OFFSET_PATH,
                                offset, recordsPerSplit, totalRowCount, modificationTime, null,
                                Collections.emptyList());

                maxComputeSplit.scanSerialize = scanSessionSerialize;
                maxComputeSplit.splitType = SplitType.ROW_OFFSET;
                maxComputeSplit.sessionId = split.getSessionId();

                result.add(maxComputeSplit);
            }
        }
        return result;
    }

    @Override
    public List<Split> getSplits() throws UserException {
        List<Split> result = new ArrayList<>();
@@ -475,59 +604,26 @@ public class MaxComputeScanNode extends FileQueryScanNode {
            return result;
        }

        if (!createTableBatchReadSession()) {
            return result;
        createRequiredColumns();

        List<PartitionSpec> requiredPartitionSpecs = new ArrayList<>();
        //if requiredPartitionSpecs is empty, get all partition data.
        if (!table.getPartitionColumns().isEmpty() && selectedPartitions != SelectedPartitions.NOT_PRUNED) {
            this.totalPartitionNum = selectedPartitions.totalPartitionNum;
            this.selectedPartitionNum = selectedPartitions.selectedPartitions.size();

            if (selectedPartitions.selectedPartitions.isEmpty()) {
                //no need read any partition data.
                return result;
            }
            selectedPartitions.selectedPartitions.forEach(
                    (key, value) -> requiredPartitionSpecs.add(new PartitionSpec(key))
            );
        }

        try {
            String scanSessionSerialize = serializeSession(tableBatchReadSession);
            InputSplitAssigner assigner = tableBatchReadSession.getInputSplitAssigner();
            long modificationTime = table.getOdpsTable().getLastDataModifiedTime().getTime();

            MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog) table.getCatalog();

            readTimeout = mcCatalog.getReadTimeout();
            connectTimeout = mcCatalog.getConnectTimeout();
            retryTimes = mcCatalog.getRetryTimes();

            if (mcCatalog.getSplitStrategy().equals(MCProperties.SPLIT_BY_BYTE_SIZE_STRATEGY)) {

                for (com.aliyun.odps.table.read.split.InputSplit split : assigner.getAllSplits()) {
                    MaxComputeSplit maxComputeSplit =
                            new MaxComputeSplit(BYTE_SIZE_PATH,
                                    ((IndexedInputSplit) split).getSplitIndex(), -1,
                                    mcCatalog.getSplitByteSize(),
                                    modificationTime, null,
                                    Collections.emptyList());

                    maxComputeSplit.scanSerialize = scanSessionSerialize;
                    maxComputeSplit.splitType = SplitType.BYTE_SIZE;
                    maxComputeSplit.sessionId = split.getSessionId();

                    result.add(maxComputeSplit);
                }
            } else {
                long totalRowCount = assigner.getTotalRowCount();

                long recordsPerSplit = mcCatalog.getSplitRowCount();
                for (long offset = 0; offset < totalRowCount; offset += recordsPerSplit) {
                    recordsPerSplit = Math.min(recordsPerSplit, totalRowCount - offset);
                    com.aliyun.odps.table.read.split.InputSplit split =
                            assigner.getSplitByRowOffset(offset, recordsPerSplit);

                    MaxComputeSplit maxComputeSplit =
                            new MaxComputeSplit(ROW_OFFSET_PATH,
                                    offset, recordsPerSplit, totalRowCount, modificationTime, null,
                                    Collections.emptyList());

                    maxComputeSplit.scanSerialize = scanSessionSerialize;
                    maxComputeSplit.splitType = SplitType.ROW_OFFSET;
                    maxComputeSplit.sessionId = split.getSessionId();

                    result.add(maxComputeSplit);
                }
            }
            TableBatchReadSession tableBatchReadSession = createTableBatchReadSession(requiredPartitionSpecs);
            result = getSplitByTableSession(tableBatchReadSession);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
@@ -64,6 +64,13 @@ public class MCProperties extends BaseProperties {
    public static final String DEFAULT_READ_TIMEOUT = "120"; // 120s
    public static final String DEFAULT_RETRY_COUNT = "4"; // 4 times

    //withCrossPartition(true):
    // Very friendly to scenarios where there are many partitions but each partition is very small.
    //withCrossPartition(false):
    // Very debug friendly.
    public static final String SPLIT_CROSS_PARTITION = "mc.split_cross_partition";
    public static final String DEFAULT_SPLIT_CROSS_PARTITION = "true";

    public static CloudCredential getCredential(Map<String, String> props) {
        return getCloudCredential(props, ACCESS_KEY, SECRET_KEY, SESSION_TOKEN);
    }
File diff suppressed because it is too large
@@ -104,179 +104,191 @@ suite("test_max_compute_partition_prune", "p2,external,maxcompute,external_remot
    String mc_db = "mc_datalake"
    String mc_catalog_name = "test_max_compute_partition_prune"

    sql """drop catalog if exists ${mc_catalog_name};"""
    sql """
        create catalog if not exists ${mc_catalog_name} properties (
            "type" = "max_compute",
            "mc.default.project" = "${mc_db}",
            "mc.access_key" = "${ak}",
            "mc.secret_key" = "${sk}",
            "mc.endpoint" = "http://service.cn-beijing-vpc.maxcompute.aliyun-inc.com/api"
        );
    """
    sql """ switch ${mc_catalog_name} """
    sql """ use ${mc_db}"""

    qt_one_partition_1_1 one_partition_1_1
    explain {
        sql("${one_partition_1_1}")
        contains "partition=1/2"
    for (String enable_profile : ["true","false"] ) {
        sql """set enable_profile = ${enable_profile} """;

        for (String num_partitions : ["1","10","100"] ) {
            sql "set num_partitions_in_batch_mode = ${num_partitions} "

            for (String cross_partition : ["true","false"] ) {

                sql """drop catalog if exists ${mc_catalog_name};"""
                sql """
                    create catalog if not exists ${mc_catalog_name} properties (
                        "type" = "max_compute",
                        "mc.default.project" = "${mc_db}",
                        "mc.access_key" = "${ak}",
                        "mc.secret_key" = "${sk}",
                        "mc.endpoint" = "http://service.cn-beijing-vpc.maxcompute.aliyun-inc.com/api",
                        "mc.split_cross_partition" = "${cross_partition}"
                    );
                """
                sql """ switch ${mc_catalog_name} """
                sql """ use ${mc_db}"""

                qt_one_partition_1_1 one_partition_1_1
                explain {
                    sql("${one_partition_1_1}")
                    contains "partition=1/2"
                }

                qt_one_partition_2_1 one_partition_2_1
                explain {
                    sql("${one_partition_2_1}")
                    contains "partition=1/2"
                }

                qt_one_partition_3_all one_partition_3_all
                explain {
                    sql("${one_partition_3_all}")
                    contains "partition=2/2"
                }

                qt_one_partition_4_all one_partition_4_all
                explain {
                    sql("${one_partition_4_all}")
                    contains "partition=2/2"
                }

                qt_one_partition_5_1 one_partition_5_1
                explain {
                    sql("${one_partition_5_1}")
                    contains "partition=1/2"
                }


                qt_two_partition_1_1 two_partition_1_1
                explain {
                    sql("${two_partition_1_1}")
                    contains "partition=1/4"
                }

                qt_two_partition_2_1 two_partition_2_1
                explain {
                    sql("${two_partition_2_1}")
                    contains "partition=1/4"
                }

                qt_two_partition_3_2 two_partition_3_2
                explain {
                    sql("${two_partition_3_2}")
                    contains "partition=2/4"
                }

                qt_two_partition_4_all two_partition_4_all
                explain {
                    sql("${two_partition_4_all}")
                    contains "partition=4/4"
                }

                qt_two_partition_5_1 two_partition_5_1
                explain {
                    sql("${two_partition_5_1}")
                    contains "partition=1/4"
                }

                qt_two_partition_6_1 two_partition_6_1
                explain {
                    sql("${two_partition_6_1}")
                    contains "partition=1/4"
                }


                qt_three_partition_1_1 three_partition_1_1
                explain {
                    sql("${three_partition_1_1}")
                    contains "partition=1/10"
                }

                qt_three_partition_2_1 three_partition_2_1
                explain {
                    sql("${three_partition_2_1}")
                    contains "partition=1/10"
                }

                qt_three_partition_3_3 three_partition_3_3
                explain {
                    sql("${three_partition_3_3}")
                    contains "partition=3/10"
                }

                qt_three_partition_4_2 three_partition_4_2
                explain {
                    sql("${three_partition_4_2}")
                    contains "partition=2/10"
                }

                qt_three_partition_5_all three_partition_5_all
                explain {
                    sql("${three_partition_5_all}")
                    contains "partition=10/10"
                }

                qt_three_partition_6_1 three_partition_6_1
                explain {
                    sql("${three_partition_6_1}")
                    contains "partition=1/10"
                }

                qt_three_partition_7_7 three_partition_7_7
                explain {
                    sql("${three_partition_7_7}")
                    contains "partition=7/10"
                }

                qt_three_partition_8_2 three_partition_8_2
                explain {
                    sql("${three_partition_8_2}")
                    contains "partition=2/10"
                }


                // 0 partitions
                def one_partition_6_0 = """SELECT * FROM one_partition_tb WHERE part1 = 2023 ORDER BY id;"""
                qt_one_partition_6_0 one_partition_6_0
                explain {
                    sql("${one_partition_6_0}")
                    contains "partition=0/2"
                }

                def two_partition_7_0 = """SELECT * FROM two_partition_tb WHERE part1 = 'CN' AND part2 = 1 ORDER BY id;"""
                qt_two_partition_7_0 two_partition_7_0
                explain {
                    sql("${two_partition_7_0}")
                    contains "partition=0/4"
                }

                def two_partition_8_0 = """SELECT * FROM two_partition_tb WHERE part1 = 'US' AND part2 = 3 ORDER BY id;"""
                qt_two_partition_8_0 two_partition_8_0
                explain {
                    sql("${two_partition_8_0}")
                    contains "partition=0/4"
                }

                def three_partition_9_0 = """SELECT * FROM three_partition_tb WHERE part1 = 'US' AND part2 = 2023 AND part3 = 'Q1' ORDER BY id;"""
                qt_three_partition_9_0 three_partition_9_0
                explain {
                    sql("${three_partition_9_0}")
                    contains "partition=0/10"
                }

                def three_partition_10_0 = """SELECT * FROM three_partition_tb WHERE part1 = 'EU' AND part2 = 2024 AND part3 = 'Q4' ORDER BY id;"""
                qt_three_partition_10_0 three_partition_10_0
                explain {
                    sql("${three_partition_10_0}")
                    contains "partition=0/10"
                }

                def three_partition_11_0 = """SELECT * FROM three_partition_tb WHERE part1 = 'AS' AND part2 = 2025 AND part3 = 'Q4' ORDER BY id;"""
                qt_three_partition_11_0 three_partition_11_0
                explain {
                    sql("${three_partition_11_0}")
                    contains "partition=0/10"
                }
            }
        }
    }
    qt_one_partition_2_1 one_partition_2_1
    explain {
        sql("${one_partition_2_1}")
        contains "partition=1/2"
    }

    qt_one_partition_3_all one_partition_3_all
    explain {
        sql("${one_partition_3_all}")
        contains "partition=2/2"
    }

    qt_one_partition_4_all one_partition_4_all
    explain {
        sql("${one_partition_4_all}")
        contains "partition=2/2"
    }

    qt_one_partition_5_1 one_partition_5_1
    explain {
        sql("${one_partition_5_1}")
        contains "partition=1/2"
    }


    qt_two_partition_1_1 two_partition_1_1
    explain {
        sql("${two_partition_1_1}")
        contains "partition=1/4"
    }

    qt_two_partition_2_1 two_partition_2_1
    explain {
        sql("${two_partition_2_1}")
        contains "partition=1/4"
    }

    qt_two_partition_3_2 two_partition_3_2
    explain {
        sql("${two_partition_3_2}")
        contains "partition=2/4"
    }

    qt_two_partition_4_all two_partition_4_all
    explain {
        sql("${two_partition_4_all}")
        contains "partition=4/4"
    }

    qt_two_partition_5_1 two_partition_5_1
    explain {
        sql("${two_partition_5_1}")
        contains "partition=1/4"
    }

    qt_two_partition_6_1 two_partition_6_1
    explain {
        sql("${two_partition_6_1}")
        contains "partition=1/4"
    }


    qt_three_partition_1_1 three_partition_1_1
    explain {
        sql("${three_partition_1_1}")
        contains "partition=1/10"
    }

    qt_three_partition_2_1 three_partition_2_1
    explain {
        sql("${three_partition_2_1}")
        contains "partition=1/10"
    }

    qt_three_partition_3_3 three_partition_3_3
    explain {
        sql("${three_partition_3_3}")
        contains "partition=3/10"
    }

    qt_three_partition_4_2 three_partition_4_2
    explain {
        sql("${three_partition_4_2}")
        contains "partition=2/10"
    }

    qt_three_partition_5_all three_partition_5_all
    explain {
        sql("${three_partition_5_all}")
        contains "partition=10/10"
    }

    qt_three_partition_6_1 three_partition_6_1
    explain {
        sql("${three_partition_6_1}")
        contains "partition=1/10"
    }

    qt_three_partition_7_7 three_partition_7_7
    explain {
        sql("${three_partition_7_7}")
        contains "partition=7/10"
    }

    qt_three_partition_8_2 three_partition_8_2
    explain {
        sql("${three_partition_8_2}")
        contains "partition=2/10"
    }


    // 0 partitions
    def one_partition_6_0 = """SELECT * FROM one_partition_tb WHERE part1 = 2023 ORDER BY id;"""
    qt_one_partition_6_0 one_partition_6_0
    explain {
        sql("${one_partition_6_0}")
        contains "partition=0/2"
    }

    def two_partition_7_0 = """SELECT * FROM two_partition_tb WHERE part1 = 'CN' AND part2 = 1 ORDER BY id;"""
    qt_two_partition_7_0 two_partition_7_0
    explain {
        sql("${two_partition_7_0}")
        contains "partition=0/4"
    }

    def two_partition_8_0 = """SELECT * FROM two_partition_tb WHERE part1 = 'US' AND part2 = 3 ORDER BY id;"""
    qt_two_partition_8_0 two_partition_8_0
    explain {
        sql("${two_partition_8_0}")
        contains "partition=0/4"
    }

    def three_partition_9_0 = """SELECT * FROM three_partition_tb WHERE part1 = 'US' AND part2 = 2023 AND part3 = 'Q1' ORDER BY id;"""
    qt_three_partition_9_0 three_partition_9_0
    explain {
        sql("${three_partition_9_0}")
        contains "partition=0/10"
    }

    def three_partition_10_0 = """SELECT * FROM three_partition_tb WHERE part1 = 'EU' AND part2 = 2024 AND part3 = 'Q4' ORDER BY id;"""
    qt_three_partition_10_0 three_partition_10_0
    explain {
        sql("${three_partition_10_0}")
        contains "partition=0/10"
    }

    def three_partition_11_0 = """SELECT * FROM three_partition_tb WHERE part1 = 'AS' AND part2 = 2025 AND part3 = 'Q4' ORDER BY id;"""
    qt_three_partition_11_0 three_partition_11_0
    explain {
        sql("${three_partition_11_0}")
        contains "partition=0/10"
    }

    }
}