bp #37041
@@ -312,7 +312,7 @@ public abstract class FileQueryScanNode extends FileScanNode {
         if (splitAssignment.getSampleSplit() == null && !(getLocationType() == TFileType.FILE_STREAM)) {
             return;
         }
-        inputSplitsNum = numApproximateSplits();
+        selectedSplitNum = numApproximateSplits();
 
         TFileType locationType;
         FileSplit fileSplit = (FileSplit) splitAssignment.getSampleSplit();
@@ -322,7 +322,7 @@ public abstract class FileQueryScanNode extends FileScanNode {
         } else {
             locationType = getLocationType(fileSplit.getPath().toString());
         }
-        totalFileSize = fileSplit.getLength() * inputSplitsNum;
+        totalFileSize = fileSplit.getLength() * selectedSplitNum;
         long maxWaitTime = ConnectContext.get().getSessionVariable().getFetchSplitsMaxWaitTime();
         // Not accurate, only used to estimate concurrency.
         int numSplitsPerBE = numApproximateSplits() / backendPolicy.numBackends();
@@ -350,7 +350,7 @@ public abstract class FileQueryScanNode extends FileScanNode {
         if (ConnectContext.get().getExecutor() != null) {
             ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsFinishTime();
         }
-        inputSplitsNum = inputSplits.size();
+        selectedSplitNum = inputSplits.size();
         if (inputSplits.isEmpty() && !(getLocationType() == TFileType.FILE_STREAM)) {
             return;
         }
@@ -65,10 +65,8 @@ public abstract class FileScanNode extends ExternalScanNode {
     public static final long DEFAULT_SPLIT_SIZE = 64 * 1024 * 1024; // 64MB
 
     // For explain
-    protected long inputSplitsNum = 0;
     protected long totalFileSize = 0;
     protected long totalPartitionNum = 0;
-    protected long readPartitionNum = 0;
     protected long fileSplitSize;
     protected boolean isSplitSizeSetBySession = false;
 
@@ -127,9 +125,9 @@ public abstract class FileScanNode extends ExternalScanNode {
         if (isBatchMode()) {
            output.append("(approximate)");
        }
-        output.append("inputSplitNum=").append(inputSplitsNum).append(", totalFileSize=")
+        output.append("inputSplitNum=").append(selectedSplitNum).append(", totalFileSize=")
                 .append(totalFileSize).append(", scanRanges=").append(scanRangeLocations.size()).append("\n");
-        output.append(prefix).append("partition=").append(readPartitionNum).append("/").append(totalPartitionNum)
+        output.append(prefix).append("partition=").append(selectedPartitionNum).append("/").append(totalPartitionNum)
                 .append("\n");
 
         if (detailLevel == TExplainLevel.VERBOSE) {
@@ -299,8 +297,4 @@ public abstract class FileScanNode extends ExternalScanNode {
             long fileLength = last.getOffset() + last.getLength() - 1L;
             throw new IllegalArgumentException(String.format("Offset %d is outside of file (0..%d)", offset, fileLength));
         }
-
-    public long getReadPartitionNum() {
-        return this.readPartitionNum;
-    }
 }
@@ -181,7 +181,7 @@ public class HiveScanNode extends FileQueryScanNode {
                 partitionItems = selectedPartitions.selectedPartitions.values();
             }
             Preconditions.checkNotNull(partitionItems);
-            this.readPartitionNum = partitionItems.size();
+            this.selectedPartitionNum = partitionItems.size();
 
             // get partitions from cache
             List<List<String>> partitionValuesList = Lists.newArrayListWithCapacity(partitionItems.size());
@@ -198,7 +198,7 @@ public class HiveScanNode extends FileQueryScanNode {
                     hmsTable.getRemoteTable().getSd().getInputFormat(),
                     hmsTable.getRemoteTable().getSd().getLocation(), null, Maps.newHashMap());
             this.totalPartitionNum = 1;
-            this.readPartitionNum = 1;
+            this.selectedPartitionNum = 1;
             resPartitions.add(dummyPartition);
         }
         if (ConnectContext.get().getExecutor() != null) {
@@ -285,7 +285,7 @@ public class HudiScanNode extends HiveScanNode {
                     partitionValues.getSingleColumnRangeMap(),
                     true);
             Collection<Long> filteredPartitionIds = pruner.prune();
-            this.readPartitionNum = filteredPartitionIds.size();
+            this.selectedPartitionNum = filteredPartitionIds.size();
             // 3. get partitions from cache
             String dbName = hmsTable.getDbName();
             String tblName = hmsTable.getName();
@@ -310,7 +310,7 @@ public class HudiScanNode extends HiveScanNode {
                     hmsTable.getRemoteTable().getSd().getInputFormat(),
                     hmsTable.getRemoteTable().getSd().getLocation(), null, Maps.newHashMap());
             this.totalPartitionNum = 1;
-            this.readPartitionNum = 1;
+            this.selectedPartitionNum = 1;
             return Lists.newArrayList(dummyPartition);
         }
 
@@ -502,7 +502,7 @@ public class HudiScanNode extends HiveScanNode {
             return super.getNodeExplainString(prefix, detailLevel);
         } else {
             return super.getNodeExplainString(prefix, detailLevel)
-                    + String.format("%shudiNativeReadSplits=%d/%d\n", prefix, noLogsSplitNum.get(), inputSplitsNum);
+                    + String.format("%shudiNativeReadSplits=%d/%d\n", prefix, noLogsSplitNum.get(), selectedSplitNum);
         }
     }
 }
@@ -266,7 +266,7 @@ public class IcebergScanNode extends FileQueryScanNode {
             return splits.isEmpty() ? splits : Collections.singletonList(splits.get(0));
         }
 
-        readPartitionNum = partitionPathSet.size();
+        selectedPartitionNum = partitionPathSet.size();
 
         return splits;
     }
@@ -197,7 +197,7 @@ public class MaxComputeScanNode extends FileQueryScanNode {
                     partitionValues.getSingleColumnRangeMap(),
                     false);
             Collection<Long> filteredPartitionIds = pruner.prune();
-            this.readPartitionNum = filteredPartitionIds.size();
+            this.selectedPartitionNum = filteredPartitionIds.size();
             // get partitions from cache
             Map<Long, String> partitionIdToNameMap = partitionValues.getPartitionIdToNameMap();
             filteredPartitionIds.forEach(id -> result.add(partitionIdToNameMap.get(id)));
@@ -270,7 +270,7 @@ public class PaimonScanNode extends FileQueryScanNode {
             }
             splitStats.add(splitStat);
         }
-        this.readPartitionNum = selectedPartitionValues.size();
+        this.selectedPartitionNum = selectedPartitionValues.size();
         // TODO: get total partition number
         return splits;
     }
@@ -205,7 +205,7 @@ public class FileLoadScanNode extends FileScanNode {
             LoadScanProvider scanProvider = scanProviders.get(i);
             finalizeParamsForLoad(context, analyzer);
             createScanRangeLocations(context, scanProvider, localBackendPolicy);
-            this.inputSplitsNum += scanProvider.getInputSplitNum();
+            this.selectedSplitNum += scanProvider.getInputSplitNum();
             this.totalFileSize += scanProvider.getInputFileSize();
         }
     }
@@ -164,10 +164,8 @@ public class OlapScanNode extends ScanNode {
     private boolean canTurnOnPreAggr = true;
     private boolean forceOpenPreAgg = false;
     private OlapTable olapTable = null;
-    private long selectedTabletsNum = 0;
     private long totalTabletsNum = 0;
     private long selectedIndexId = -1;
-    private int selectedPartitionNum = 0;
     private Collection<Long> selectedPartitionIds = Lists.newArrayList();
     private long totalBytes = 0;
 
@@ -299,14 +297,6 @@ public class OlapScanNode extends ScanNode {
         this.forceOpenPreAgg = forceOpenPreAgg;
     }
 
-    public Integer getSelectedPartitionNum() {
-        return selectedPartitionNum;
-    }
-
-    public Long getSelectedTabletsNum() {
-        return selectedTabletsNum;
-    }
-
     public SortInfo getSortInfo() {
         return sortInfo;
     }
@@ -1175,7 +1165,7 @@ public class OlapScanNode extends ScanNode {
             }
 
             totalTabletsNum += selectedTable.getTablets().size();
-            selectedTabletsNum += tablets.size();
+            selectedSplitNum += tablets.size();
             addScanRangeLocations(partition, tablets);
         }
     }
@@ -1337,7 +1327,7 @@ public class OlapScanNode extends ScanNode {
                 .collect(Collectors.joining(","));
         output.append(prefix).append(String.format("partitions=%s/%s (%s)", selectedPartitionNum,
                 olapTable.getPartitions().size(), selectedPartitions)).append("\n");
-        output.append(prefix).append(String.format("tablets=%s/%s", selectedTabletsNum, totalTabletsNum));
+        output.append(prefix).append(String.format("tablets=%s/%s", selectedSplitNum, totalTabletsNum));
         // We print up to 3 tablet, and we print "..." if the number is more than 3
         if (scanTabletIds.size() > 3) {
             List<Long> firstTenTabletIds = scanTabletIds.subList(0, 3);
@@ -94,6 +94,9 @@ public abstract class ScanNode extends PlanNode implements SplitGenerator {
     protected PartitionInfo partitionsInfo = null;
     protected SplitAssignment splitAssignment = null;
 
+    protected long selectedPartitionNum = 0;
+    protected long selectedSplitNum = 0;
+
     // create a mapping between output slot's id and project expr
     Map<SlotId, Expr> outputSlotToProjectExpr = new HashMap<>();
 
@@ -741,4 +744,12 @@ public abstract class ScanNode extends PlanNode implements SplitGenerator {
         long limitRowsForSingleInstance = ctx == null ? 10000 : ctx.getSessionVariable().limitRowsForSingleInstance;
         return hasLimit() && getLimit() < limitRowsForSingleInstance && conjuncts.isEmpty();
     }
+
+    public long getSelectedPartitionNum() {
+        return selectedPartitionNum;
+    }
+
+    public long getSelectedSplitNum() {
+        return selectedSplitNum;
+    }
 }
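The two ScanNode hunks above are the heart of the refactor: the generic selectedPartitionNum / selectedSplitNum counters and their getters now live in the ScanNode base class, and the concrete scan nodes shown earlier (Hive, Hudi, Iceberg, MaxCompute, Paimon, OLAP) simply assign them while pruning partitions and generating splits. A minimal, self-contained sketch of that producer side, using illustrative class names rather than the real Doris planner types:

// Illustrative stand-ins only; not the actual Doris ScanNode hierarchy.
import java.util.Arrays;
import java.util.List;

public class ScanNodeCounterSketch {

    abstract static class ScanNodeBase {
        protected long selectedPartitionNum = 0;
        protected long selectedSplitNum = 0;

        public long getSelectedPartitionNum() {
            return selectedPartitionNum;
        }

        public long getSelectedSplitNum() {
            return selectedSplitNum;
        }
    }

    static class HiveLikeScanNode extends ScanNodeBase {
        // Mirrors the HiveScanNode/PaimonScanNode hunks: record how many partitions
        // survived pruning and how many file splits were produced for them.
        void finishPlanning(List<String> prunedPartitions, List<String> splits) {
            this.selectedPartitionNum = prunedPartitions.size();
            this.selectedSplitNum = splits.size();
        }
    }

    public static void main(String[] args) {
        HiveLikeScanNode node = new HiveLikeScanNode();
        node.finishPlanning(Arrays.asList("p1", "p2"), Arrays.asList("s1", "s2", "s3"));
        // Explain output and SQL block rules read the counters through the base-class getters.
        System.out.println("partitions=" + node.getSelectedPartitionNum()
                + ", splits=" + node.getSelectedSplitNum());
    }
}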
@@ -118,6 +118,7 @@ import org.apache.doris.common.util.ProfileManager.ProfileType;
 import org.apache.doris.common.util.SqlParserUtils;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.FileScanNode;
 import org.apache.doris.datasource.jdbc.client.JdbcClientException;
 import org.apache.doris.datasource.tvf.source.TVFScanNode;
 import org.apache.doris.load.EtlJobType;
@@ -630,13 +631,13 @@ public class StmtExecutor {
             }
             List<ScanNode> scanNodeList = planner.getScanNodes();
             for (ScanNode scanNode : scanNodeList) {
-                if (scanNode instanceof OlapScanNode) {
-                    OlapScanNode olapScanNode = (OlapScanNode) scanNode;
+                if (scanNode instanceof OlapScanNode || scanNode instanceof FileScanNode) {
                     Env.getCurrentEnv().getSqlBlockRuleMgr().checkLimitations(
-                            olapScanNode.getSelectedPartitionNum().longValue(),
-                            olapScanNode.getSelectedTabletsNum(),
-                            olapScanNode.getCardinality(),
+                            scanNode.getSelectedPartitionNum(),
+                            scanNode.getSelectedSplitNum(),
+                            scanNode.getCardinality(),
                             context.getQualifiedUser());
+
                 }
             }
         }
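With those getters in place, the StmtExecutor hunk above routes OlapScanNode and FileScanNode through the same SqlBlockRuleMgr.checkLimitations(partitionNum, splitNum, cardinality, user) call instead of downcasting to OlapScanNode. A hedged, self-contained sketch of what such a limit check amounts to; the comparison semantics and message text are assumptions modeled on the regression test at the end of this diff, not the actual SqlBlockRuleMgr code:

// Assumption-labelled sketch of a "reach limit, block the query" check; the real logic
// lives in Doris' SqlBlockRuleMgr and may differ in details such as the exact comparison.
public class BlockRuleCheckSketch {

    static void checkLimitations(long partitionNum, long splitNum, long cardinality,
            long partitionLimit, long splitLimit, long cardinalityLimit) {
        // A limit of 0 is treated here as "not configured"; the real error message also
        // names the matching rule, e.g. "sql hits sql block rule: ..., reach partition_num : 3".
        if (partitionLimit > 0 && partitionNum >= partitionLimit) {
            throw new IllegalStateException("reach partition_num : " + partitionLimit);
        }
        if (splitLimit > 0 && splitNum >= splitLimit) {
            throw new IllegalStateException("reach tablet_num : " + splitLimit);
        }
        if (cardinalityLimit > 0 && cardinality >= cardinalityLimit) {
            throw new IllegalStateException("reach cardinality : " + cardinalityLimit);
        }
    }

    public static void main(String[] args) {
        // An external Hive scan that selected 6 partitions and 40 file splits, checked
        // against a rule with partition_num = 3 (as in the regression test below).
        try {
            checkLimitations(6, 40, 50_000, 3, 0, 0);
        } catch (IllegalStateException e) {
            System.out.println("external scan blocked: " + e.getMessage());
        }
        // An OLAP scan with 2 partitions / 8 tablets passes the same rule.
        checkLimitations(2, 8, 1_000, 3, 0, 0);
        System.out.println("olap scan passed");
    }
}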
@@ -714,7 +714,7 @@ public class CacheAnalyzer {
     private CacheTable buildCacheTableForHiveScanNode(HiveScanNode node) {
         CacheTable cacheTable = new CacheTable();
         cacheTable.table = node.getTargetTable();
-        cacheTable.partitionNum = node.getReadPartitionNum();
+        cacheTable.partitionNum = node.getSelectedPartitionNum();
         cacheTable.latestPartitionTime = cacheTable.table.getUpdateTime();
         TableIf tableIf = cacheTable.table;
         DatabaseIf database = tableIf.getDatabase();
@@ -160,7 +160,7 @@ public class ColocatePlanTest extends TestWithFeService {
         Assert.assertTrue(scanNodeList.get(0) instanceof OlapScanNode);
         OlapScanNode olapScanNode = (OlapScanNode) scanNodeList.get(0);
         Assert.assertEquals(olapScanNode.getSelectedPartitionIds().size(), 2);
-        long selectedTablet = Deencapsulation.getField(olapScanNode, "selectedTabletsNum");
+        long selectedTablet = Deencapsulation.getField(olapScanNode, "selectedSplitNum");
         Assert.assertEquals(selectedTablet, 2);
 
         List<QueryStatisticsItem.FragmentInstanceInfo> instanceInfo = coordinator.getFragmentInstanceInfos();
@@ -0,0 +1,13 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql01 --
+6179 21 4 20 5.00 4600.10 0.05 0.00 A F 1994-06-05 1994-07-27 1994-06-26 COLLECT COD MAIL silent deposits. furiously us chicago
+6273 2552 1 51 33.00 31384.65 0.04 0.08 R F 1995-04-23 1995-05-02 1995-05-13 DELIVER IN PERSON TRUCK ges. unusual, pending packages accordi us chicago
+8645 7554 4 53 34.00 32403.70 0.03 0.03 N O 1996-12-29 1997-01-25 1997-01-16 TAKE BACK RETURN FOB ackages are carefully above the jp tokyo
+5121 7580 6 79 2.00 1958.14 0.04 0.07 R F 1992-08-10 1992-06-28 1992-08-11 NONE FOB final, regular account us washington
+2883 2592 1 91 33.00 32705.97 0.08 0.07 R F 1995-02-26 1995-03-04 1995-03-01 NONE RAIL s. final i cn shanghai
+807 5150 7 149 19.00 19933.66 0.08 0.05 A F 1994-02-10 1994-02-20 1994-03-06 NONE SHIP ns haggle quickly across the furi cn beijing
+4452 7650 2 149 47.00 49309.58 0.01 0.06 A F 1994-10-08 1994-08-09 1994-10-09 TAKE BACK RETURN TRUCK ts. slyly regular cour us washington
+4102 5176 5 175 32.00 34405.44 0.08 0.01 N O 1996-05-14 1996-04-29 1996-05-29 NONE RAIL the even requests; regular pinto us washington
+2117 2680 6 179 27.00 29137.59 0.09 0.08 N O 1997-06-30 1997-06-27 1997-07-11 TAKE BACK RETURN REG AIR the carefully ironic ideas cn shanghai
+548 7683 3 182 21.00 22725.78 0.03 0.08 A F 1995-01-13 1994-12-18 1995-01-25 NONE AIR ideas. special accounts above the furiou cn beijing
+
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.concurrent.TimeUnit
+import org.awaitility.Awaitility
+
+suite("test_external_sql_block_rule", "external_docker,hive,external_docker_hive,p0,external") {
+    String enabled = context.config.otherConfigs.get("enableHiveTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("diable Hive test.")
+        return;
+    }
+
+    String hivePrefix = "hive2";
+    String catalog_name = "test_${hivePrefix}_external_sql_block_rule";
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort")
+
+    sql """drop catalog if exists ${catalog_name} """
+
+    sql """CREATE CATALOG ${catalog_name} PROPERTIES (
+        'type'='hms',
+        'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}',
+        'hadoop.username' = 'hive'
+    );"""
+
+    sql "use ${catalog_name}.`default`";
+    qt_sql01 """select * from parquet_partition_table order by l_linenumber,l_orderkey limit 10;"""
+
+    sql """drop sql_block_rule if exists external_hive_partition"""
+    sql """create sql_block_rule external_hive_partition properties("partition_num" = "3", "global" = "false");"""
+    sql """drop sql_block_rule if exists external_hive_partition2"""
+    sql """create sql_block_rule external_hive_partition2 properties("tablet_num" = "3", "global" = "false");"""
+    sql """drop sql_block_rule if exists external_hive_partition3"""
+    sql """create sql_block_rule external_hive_partition3 properties("cardinality" = "3", "global" = "false");"""
+    // create 3 users
+    sql """drop user if exists external_block_user1"""
+    sql """create user external_block_user1;"""
+    sql """SET PROPERTY FOR 'external_block_user1' 'sql_block_rules' = 'external_hive_partition';"""
+    sql """grant all on *.*.* to external_block_user1;"""
+
+    sql """drop user if exists external_block_user2"""
+    sql """create user external_block_user2;"""
+    sql """SET PROPERTY FOR 'external_block_user2' 'sql_block_rules' = 'external_hive_partition2';"""
+    sql """grant all on *.*.* to external_block_user2;"""
+
+    sql """drop user if exists external_block_user3"""
+    sql """create user external_block_user3;"""
+    sql """SET PROPERTY FOR 'external_block_user3' 'sql_block_rules' = 'external_hive_partition3';"""
+    sql """grant all on *.*.* to external_block_user3;"""
+
+    Awaitility.await().atMost(10, TimeUnit.SECONDS).with().pollDelay(1000, TimeUnit.MILLISECONDS).await().until(() -> {
+        def res = sql """show table stats ${catalog_name}.`default`.parquet_partition_table"""
+        print "${res}"
+        if (res[0][2] != 0) {
+            return true;
+        }
+        return false;
+    });
+
+    // login as external_block_user1
+    def result1 = connect(user = 'external_block_user1', password = '', url = context.config.jdbcUrl) {
+        sql """set enable_fallback_to_original_planner=false;"""
+        test {
+            sql """select * from ${catalog_name}.`default`.parquet_partition_table order by l_linenumber limit 10;"""
+            exception """sql hits sql block rule: external_hive_partition, reach partition_num : 3"""
+        }
+    }
+    // login as external_block_user2
+    def result2 = connect(user = 'external_block_user2', password = '', url = context.config.jdbcUrl) {
+        sql """set enable_fallback_to_original_planner=false;"""
+        test {
+            sql """select * from ${catalog_name}.`default`.parquet_partition_table order by l_linenumber limit 10;"""
+            exception """sql hits sql block rule: external_hive_partition2, reach tablet_num : 3"""
+        }
+    }
+    // login as external_block_user3
+    def result3 = connect(user = 'external_block_user3', password = '', url = context.config.jdbcUrl) {
+        def res = sql """show property;"""
+        print "${res}"
+        sql """set enable_fallback_to_original_planner=false;"""
+        test {
+            sql """select * from ${catalog_name}.`default`.parquet_partition_table order by l_linenumber limit 10;"""
+            exception """sql hits sql block rule: external_hive_partition3, reach cardinality : 3"""
+        }
+    }
+}
+