@ -39,6 +39,7 @@ import org.apache.doris.thrift.TFileScanNode;
|
||||
import org.apache.doris.thrift.TFileScanRangeParams;
|
||||
import org.apache.doris.thrift.TPlanNode;
|
||||
import org.apache.doris.thrift.TPlanNodeType;
|
||||
import org.apache.doris.thrift.TPushAggOp;
|
||||
import org.apache.doris.thrift.TScanRangeLocations;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
@ -70,6 +71,7 @@ public abstract class FileScanNode extends ExternalScanNode {
|
||||
protected long totalPartitionNum = 0;
|
||||
protected long readPartitionNum = 0;
|
||||
protected long fileSplitSize;
|
||||
public long rowCount = 0;
|
||||
|
||||
public FileScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType,
|
||||
boolean needCheckColumnPriv) {
|
||||
@ -95,6 +97,10 @@ public abstract class FileScanNode extends ExternalScanNode {
|
||||
planNode.setFileScanNode(fileScanNode);
|
||||
}
|
||||
|
||||
public long getPushDownCount() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
|
||||
StringBuilder output = new StringBuilder();
|
||||
@ -170,7 +176,13 @@ public abstract class FileScanNode extends ExternalScanNode {
|
||||
output.append(String.format("avgRowSize=%s, ", avgRowSize));
|
||||
}
|
||||
output.append(String.format("numNodes=%s", numNodes)).append("\n");
|
||||
output.append(prefix).append(String.format("pushdown agg=%s", pushDownAggNoGroupingOp)).append("\n");
|
||||
|
||||
// pushdown agg
|
||||
output.append(prefix).append(String.format("pushdown agg=%s", pushDownAggNoGroupingOp));
|
||||
if (pushDownAggNoGroupingOp.equals(TPushAggOp.COUNT)) {
|
||||
output.append(" (").append(getPushDownCount()).append(")");
|
||||
}
|
||||
output.append("\n");
|
||||
|
||||
return output.toString();
|
||||
}
|
||||
|
||||
@ -78,7 +78,6 @@ import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@ -213,6 +212,12 @@ public class IcebergScanNode extends FileQueryScanNode {
|
||||
HashSet<String> partitionPathSet = new HashSet<>();
|
||||
boolean isPartitionedTable = icebergTable.spec().isPartitioned();
|
||||
|
||||
long rowCount = getCountFromSnapshot();
|
||||
if (getPushDownAggNoGroupingOp().equals(TPushAggOp.COUNT) && rowCount >= 0) {
|
||||
this.rowCount = rowCount;
|
||||
return new ArrayList<>();
|
||||
}
|
||||
|
||||
CloseableIterable<FileScanTask> fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(), splitSize);
|
||||
try (CloseableIterable<CombinedScanTask> combinedScanTasks =
|
||||
TableScanUtil.planTasks(fileScanTasks, splitSize, 1, 0)) {
|
||||
@ -266,12 +271,6 @@ public class IcebergScanNode extends FileQueryScanNode {
|
||||
throw new UserException(e.getMessage(), e.getCause());
|
||||
}
|
||||
|
||||
TPushAggOp aggOp = getPushDownAggNoGroupingOp();
|
||||
if (aggOp.equals(TPushAggOp.COUNT) && getCountFromSnapshot() > 0) {
|
||||
// we can create a special empty split and skip the plan process
|
||||
return Collections.singletonList(splits.get(0));
|
||||
}
|
||||
|
||||
readPartitionNum = partitionPathSet.size();
|
||||
|
||||
return splits;
|
||||
@ -425,7 +424,7 @@ public class IcebergScanNode extends FileQueryScanNode {
|
||||
|
||||
// empty table
|
||||
if (snapshot == null) {
|
||||
return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
Map<String, String> summary = snapshot.summary();
|
||||
@ -442,12 +441,17 @@ public class IcebergScanNode extends FileQueryScanNode {
|
||||
super.toThrift(planNode);
|
||||
if (getPushDownAggNoGroupingOp().equals(TPushAggOp.COUNT)) {
|
||||
long countFromSnapshot = getCountFromSnapshot();
|
||||
if (countFromSnapshot > 0) {
|
||||
if (countFromSnapshot >= 0) {
|
||||
planNode.setPushDownCount(countFromSnapshot);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getPushDownCount() {
|
||||
return getCountFromSnapshot();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
|
||||
if (pushdownIcebergPredicates.isEmpty()) {
|
||||
|
||||
@ -20,10 +20,12 @@ package org.apache.doris.nereids;
|
||||
import org.apache.doris.analysis.DescriptorTable;
|
||||
import org.apache.doris.analysis.ExplainOptions;
|
||||
import org.apache.doris.analysis.StatementBase;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.common.FormatOptions;
|
||||
import org.apache.doris.common.NereidsException;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.common.profile.SummaryProfile;
|
||||
import org.apache.doris.datasource.iceberg.source.IcebergScanNode;
|
||||
import org.apache.doris.nereids.CascadesContext.Lock;
|
||||
import org.apache.doris.nereids.exceptions.AnalysisException;
|
||||
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
|
||||
@ -49,14 +51,18 @@ import org.apache.doris.nereids.trees.plans.Plan;
|
||||
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache;
|
||||
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
|
||||
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
|
||||
import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink;
|
||||
import org.apache.doris.nereids.trees.plans.physical.PhysicalSqlCache;
|
||||
import org.apache.doris.planner.PlanFragment;
|
||||
import org.apache.doris.planner.Planner;
|
||||
import org.apache.doris.planner.RuntimeFilter;
|
||||
import org.apache.doris.planner.ScanNode;
|
||||
import org.apache.doris.qe.CommonResultSet;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.ResultSet;
|
||||
import org.apache.doris.qe.ResultSetMetaData;
|
||||
import org.apache.doris.qe.SessionVariable;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
@ -66,6 +72,7 @@ import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Function;
|
||||
@ -540,7 +547,23 @@ public class NereidsPlanner extends Planner {
|
||||
}
|
||||
}
|
||||
|
||||
return Optional.empty();
|
||||
if (physicalPlan instanceof PhysicalResultSink
|
||||
&& physicalPlan.child(0) instanceof PhysicalHashAggregate && !getScanNodes().isEmpty()
|
||||
&& getScanNodes().get(0) instanceof IcebergScanNode) {
|
||||
List<Column> columns = Lists.newArrayList();
|
||||
NamedExpression output = physicalPlan.getOutput().get(0);
|
||||
columns.add(new Column(output.getName(), output.getDataType().toCatalogDataType()));
|
||||
if (((IcebergScanNode) getScanNodes().get(0)).rowCount > 0) {
|
||||
ResultSetMetaData metadata = new CommonResultSet.CommonResultSetMetaData(columns);
|
||||
ResultSet resultSet = new CommonResultSet(metadata, Collections.singletonList(
|
||||
Lists.newArrayList(String.valueOf(((IcebergScanNode) getScanNodes().get(0)).rowCount))));
|
||||
// only support one iceberg scan node and one count, e.g. select count(*) from icetbl;
|
||||
return Optional.of(resultSet);
|
||||
}
|
||||
return Optional.empty();
|
||||
} else {
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
|
||||
private void setFormatOptions() {
|
||||
|
||||
@ -189,6 +189,7 @@ import org.apache.doris.planner.SetOperationNode;
|
||||
import org.apache.doris.planner.SortNode;
|
||||
import org.apache.doris.planner.TableFunctionNode;
|
||||
import org.apache.doris.planner.UnionNode;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.statistics.StatisticConstants;
|
||||
import org.apache.doris.tablefunction.TableValuedFunctionIf;
|
||||
import org.apache.doris.thrift.TFetchOption;
|
||||
@ -1087,6 +1088,12 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
+ storageLayerAggregate.getAggOp());
|
||||
}
|
||||
|
||||
if (storageLayerAggregate.getRelation() instanceof PhysicalFileScan
|
||||
&& pushAggOp.equals(TPushAggOp.COUNT)
|
||||
&& !ConnectContext.get().getSessionVariable().isEnableCountPushDownForExternalTable()) {
|
||||
pushAggOp = TPushAggOp.NONE;
|
||||
}
|
||||
|
||||
context.setRelationPushAggOp(
|
||||
storageLayerAggregate.getRelation().getRelationId(), pushAggOp);
|
||||
|
||||
|
||||
@ -41,6 +41,7 @@ import org.apache.doris.catalog.Type;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.FormatOptions;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.datasource.iceberg.source.IcebergScanNode;
|
||||
import org.apache.doris.nereids.PlannerHook;
|
||||
import org.apache.doris.qe.CommonResultSet;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
@ -641,6 +642,21 @@ public class OriginalPlanner extends Planner {
|
||||
List<Column> columns = new ArrayList<>(selectItems.size());
|
||||
List<String> columnLabels = parsedSelectStmt.getColLabels();
|
||||
List<String> data = new ArrayList<>();
|
||||
if ((singleNodePlanner.getScanNodes().size() > 0 && singleNodePlanner.getScanNodes().get(0)
|
||||
instanceof IcebergScanNode) && (((IcebergScanNode) getScanNodes().get(0)).rowCount > 0)) {
|
||||
SelectListItem item = selectItems.get(0);
|
||||
Expr expr = item.getExpr();
|
||||
String columnName = columnLabels.get(0);
|
||||
columns.add(new Column(columnName, expr.getType()));
|
||||
data.add(String.valueOf(((IcebergScanNode) getScanNodes().get(0)).rowCount));
|
||||
ResultSetMetaData metadata = new CommonResultSet.CommonResultSetMetaData(columns);
|
||||
ResultSet resultSet = new CommonResultSet(metadata, Collections.singletonList(data));
|
||||
// only support one iceberg scan node and one count, e.g. select count(*) from icetbl;
|
||||
return Optional.of(resultSet);
|
||||
}
|
||||
if (!parsedSelectStmt.getTableRefs().isEmpty()) {
|
||||
return Optional.empty();
|
||||
}
|
||||
FormatOptions options = FormatOptions.getDefault();
|
||||
for (int i = 0; i < selectItems.size(); i++) {
|
||||
SelectListItem item = selectItems.get(i);
|
||||
|
||||
@ -564,6 +564,8 @@ public class SessionVariable implements Serializable, Writable {
|
||||
|
||||
public static final String FORCE_JNI_SCANNER = "force_jni_scanner";
|
||||
|
||||
public static final String ENABLE_COUNT_PUSH_DOWN_FOR_EXTERNAL_TABLE = "enable_count_push_down_for_external_table";
|
||||
|
||||
public static final String SHOW_ALL_FE_CONNECTION = "show_all_fe_connection";
|
||||
|
||||
public static final String MAX_MSG_SIZE_OF_RESULT_RECEIVER = "max_msg_size_of_result_receiver";
|
||||
@ -1757,6 +1759,10 @@ public class SessionVariable implements Serializable, Writable {
|
||||
description = {"强制使用jni方式读取外表", "Force the use of jni mode to read external table"})
|
||||
private boolean forceJniScanner = false;
|
||||
|
||||
@VariableMgr.VarAttr(name = ENABLE_COUNT_PUSH_DOWN_FOR_EXTERNAL_TABLE,
|
||||
description = {"对外表启用 count(*) 下推优化", "enable count(*) pushdown optimization for external table"})
|
||||
private boolean enableCountPushDownForExternalTable = true;
|
||||
|
||||
public static final String IGNORE_RUNTIME_FILTER_IDS = "ignore_runtime_filter_ids";
|
||||
|
||||
public Set<Integer> getIgnoredRuntimeFilterIds() {
|
||||
@ -3919,6 +3925,10 @@ public class SessionVariable implements Serializable, Writable {
|
||||
forceJniScanner = force;
|
||||
}
|
||||
|
||||
public boolean isEnableCountPushDownForExternalTable() {
|
||||
return enableCountPushDownForExternalTable;
|
||||
}
|
||||
|
||||
public boolean isForceToLocalShuffle() {
|
||||
return getEnablePipelineXEngine() && enableLocalShuffle && enableNereidsPlanner && forceToLocalShuffle;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user