[feature-wip](nereids) Support Q-Error to measure the accuracy of derived statistics (#17185)
Collect each estimated output rows and exact output rows for each plan node, and use this to measure the accuracy of derived statistics. The estimated result is managed by ProfileManager. We would get this estimated result in the http request by query id later.
This commit is contained in:
@ -110,6 +110,8 @@ public class ProfileManager {
|
||||
public MultiProfileTreeBuilder builder = null;
|
||||
public String errMsg = "";
|
||||
|
||||
public double qError;
|
||||
|
||||
// lazy load profileContent because sometimes profileContent is very large
|
||||
public String getProfileContent() {
|
||||
if (profileContent != null) {
|
||||
@ -119,6 +121,15 @@ public class ProfileManager {
|
||||
profileContent = profile.toString();
|
||||
return profileContent;
|
||||
}
|
||||
|
||||
public double getqError() {
|
||||
return qError;
|
||||
}
|
||||
|
||||
public void setqError(double qError) {
|
||||
this.qError = qError;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// only protect queryIdDeque; queryIdToProfileMap is concurrent, no need to protect
|
||||
@ -252,6 +263,10 @@ public class ProfileManager {
|
||||
}
|
||||
}
|
||||
|
||||
public ProfileElement findProfileElementObject(String queryId) {
|
||||
return queryIdToProfileMap.get(queryId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the query with specific query id is queried by specific user.
|
||||
*
|
||||
@ -371,4 +386,11 @@ public class ProfileManager {
|
||||
public boolean isQueryProfile(RuntimeProfile profile) {
|
||||
return "Query".equals(profile.getName());
|
||||
}
|
||||
|
||||
public void setQErrorToProfileElementObject(String queryId, double qError) {
|
||||
ProfileElement profileElement = findProfileElementObject(queryId);
|
||||
if (profileElement != null) {
|
||||
profileElement.setqError(qError);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -137,7 +137,6 @@ public class RuntimeProfile {
|
||||
// preorder traversal, idx should be modified in the traversal process
|
||||
private void update(List<TRuntimeProfileNode> nodes, Reference<Integer> idx) {
|
||||
TRuntimeProfileNode node = nodes.get(idx.getRef());
|
||||
|
||||
// update this level's counters
|
||||
if (node.counters != null) {
|
||||
for (TCounter tcounter : node.counters) {
|
||||
|
||||
@ -101,12 +101,13 @@ public class NereidsPlanner extends Planner {
|
||||
return;
|
||||
}
|
||||
PhysicalPlan physicalPlan = (PhysicalPlan) resultPlan;
|
||||
PhysicalPlanTranslator physicalPlanTranslator = new PhysicalPlanTranslator();
|
||||
PlanTranslatorContext planTranslatorContext = new PlanTranslatorContext(cascadesContext);
|
||||
PhysicalPlanTranslator physicalPlanTranslator = new PhysicalPlanTranslator(planTranslatorContext,
|
||||
ConnectContext.get().getStatsErrorEstimator());
|
||||
if (ConnectContext.get().getSessionVariable().isEnableNereidsTrace()) {
|
||||
CounterEvent.clearCounter();
|
||||
}
|
||||
PlanFragment root = physicalPlanTranslator.translatePlan(physicalPlan, planTranslatorContext);
|
||||
PlanFragment root = physicalPlanTranslator.translatePlan(physicalPlan);
|
||||
|
||||
scanNodeList = planTranslatorContext.getScanNodes();
|
||||
descTable = planTranslatorContext.getDescTable();
|
||||
|
||||
@ -54,6 +54,7 @@ import org.apache.doris.nereids.properties.DistributionSpecReplicated;
|
||||
import org.apache.doris.nereids.properties.OrderKey;
|
||||
import org.apache.doris.nereids.properties.PhysicalProperties;
|
||||
import org.apache.doris.nereids.rules.implementation.LogicalWindowToPhysicalWindow.WindowFrameGroup;
|
||||
import org.apache.doris.nereids.stats.StatsErrorEstimator;
|
||||
import org.apache.doris.nereids.trees.expressions.AggregateExpression;
|
||||
import org.apache.doris.nereids.trees.expressions.Alias;
|
||||
import org.apache.doris.nereids.trees.expressions.Cast;
|
||||
@ -70,6 +71,7 @@ import org.apache.doris.nereids.trees.expressions.WindowFrame;
|
||||
import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
|
||||
import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam;
|
||||
import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral;
|
||||
import org.apache.doris.nereids.trees.plans.AbstractPlan;
|
||||
import org.apache.doris.nereids.trees.plans.AggMode;
|
||||
import org.apache.doris.nereids.trees.plans.AggPhase;
|
||||
import org.apache.doris.nereids.trees.plans.JoinType;
|
||||
@ -173,15 +175,30 @@ import java.util.stream.Stream;
|
||||
*/
|
||||
public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, PlanTranslatorContext> {
|
||||
private static final Logger LOG = LogManager.getLogger(PhysicalPlanTranslator.class);
|
||||
PlanTranslatorContext context;
|
||||
|
||||
StatsErrorEstimator statsErrorEstimator;
|
||||
|
||||
public PhysicalPlanTranslator() {
|
||||
}
|
||||
|
||||
public PhysicalPlanTranslator(PlanTranslatorContext context, StatsErrorEstimator statsErrorEstimator) {
|
||||
this.context = context;
|
||||
this.statsErrorEstimator = statsErrorEstimator;
|
||||
}
|
||||
|
||||
public PlanFragment translatePlan(PhysicalPlan physicalPlan, PlanTranslatorContext context) {
|
||||
this.context = context;
|
||||
return translatePlan(physicalPlan);
|
||||
}
|
||||
|
||||
/**
|
||||
* Translate Nereids Physical Plan tree to Stale Planner PlanFragment tree.
|
||||
*
|
||||
* @param physicalPlan Nereids Physical Plan tree
|
||||
* @param context context to help translate
|
||||
* @return Stale Planner PlanFragment tree
|
||||
*/
|
||||
public PlanFragment translatePlan(PhysicalPlan physicalPlan, PlanTranslatorContext context) {
|
||||
public PlanFragment translatePlan(PhysicalPlan physicalPlan) {
|
||||
PlanFragment rootFragment = physicalPlan.accept(this, context);
|
||||
if (physicalPlan instanceof PhysicalDistribute) {
|
||||
PhysicalDistribute distribute = (PhysicalDistribute) physicalPlan;
|
||||
@ -369,7 +386,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
inputPlanFragment.getPlanRoot(), groupingInfo, repeatSlotIdList,
|
||||
allSlotId, repeat.computeVirtualSlotValues(sortedVirtualSlots));
|
||||
repeatNode.setNumInstances(inputPlanFragment.getPlanRoot().getNumInstances());
|
||||
inputPlanFragment.addPlanRoot(repeatNode);
|
||||
addPlanRoot(inputPlanFragment, repeatNode, repeat);
|
||||
inputPlanFragment.updateDataPartition(DataPartition.RANDOM);
|
||||
return inputPlanFragment;
|
||||
}
|
||||
@ -388,8 +405,8 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
tupleIds.add(tupleDescriptor.getId());
|
||||
EmptySetNode emptySetNode = new EmptySetNode(context.nextPlanNodeId(), tupleIds);
|
||||
|
||||
PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), emptySetNode,
|
||||
DataPartition.UNPARTITIONED);
|
||||
PlanFragment planFragment = createPlanFragment(emptySetNode,
|
||||
DataPartition.UNPARTITIONED, emptyRelation);
|
||||
context.addPlanFragment(planFragment);
|
||||
return planFragment;
|
||||
}
|
||||
@ -421,7 +438,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
unionNode.addConstExprList(legacyExprs);
|
||||
unionNode.finalizeForNereids(oneRowTuple.getSlots(), new ArrayList<>());
|
||||
|
||||
PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), unionNode, DataPartition.UNPARTITIONED);
|
||||
PlanFragment planFragment = createPlanFragment(unionNode, DataPartition.UNPARTITIONED, oneRowRelation);
|
||||
context.addPlanFragment(planFragment);
|
||||
return planFragment;
|
||||
}
|
||||
@ -521,7 +538,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
.map(context::findSlotRef).collect(Collectors.toList());
|
||||
dataPartition = new DataPartition(TPartitionType.HASH_PARTITIONED, partitionExprs);
|
||||
}
|
||||
PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), olapScanNode, dataPartition);
|
||||
PlanFragment planFragment = createPlanFragment(olapScanNode, dataPartition, olapScan);
|
||||
context.addPlanFragment(planFragment);
|
||||
return planFragment;
|
||||
}
|
||||
@ -544,8 +561,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
);
|
||||
scanNode.finalizeForNereids();
|
||||
context.getScanNodes().add(scanNode);
|
||||
PlanFragment planFragment =
|
||||
new PlanFragment(context.nextFragmentId(), scanNode, DataPartition.RANDOM);
|
||||
PlanFragment planFragment = createPlanFragment(scanNode, DataPartition.RANDOM, schemaScan);
|
||||
context.addPlanFragment(planFragment);
|
||||
return planFragment;
|
||||
}
|
||||
@ -573,7 +589,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
Utils.execWithUncheckedException(fileScanNode::finalizeForNereids);
|
||||
// Create PlanFragment
|
||||
DataPartition dataPartition = DataPartition.RANDOM;
|
||||
PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), fileScanNode, dataPartition);
|
||||
PlanFragment planFragment = createPlanFragment(fileScanNode, dataPartition, fileScan);
|
||||
context.addPlanFragment(planFragment);
|
||||
return planFragment;
|
||||
}
|
||||
@ -600,7 +616,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
context.findSlotRef(slot.getExprId()).setLabel(tableColumnName);
|
||||
}
|
||||
|
||||
PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), scanNode, DataPartition.RANDOM);
|
||||
PlanFragment planFragment = createPlanFragment(scanNode, DataPartition.RANDOM, tvfRelation);
|
||||
context.addPlanFragment(planFragment);
|
||||
return planFragment;
|
||||
}
|
||||
@ -695,7 +711,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
if (!sort.getSortPhase().isMerge()) {
|
||||
// For localSort or Gather->Sort, we just need to add sortNode
|
||||
SortNode sortNode = translateSortNode(sort, inputFragment.getPlanRoot(), context);
|
||||
currentFragment.addPlanRoot(sortNode);
|
||||
addPlanRoot(currentFragment, sortNode, sort);
|
||||
} else {
|
||||
// For mergeSort, we need to push sortInfo to exchangeNode
|
||||
if (!(currentFragment.getPlanRoot() instanceof ExchangeNode)) {
|
||||
@ -800,7 +816,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
} else {
|
||||
analyticEvalNode.setNumInstances(inputPlanFragment.getPlanRoot().getNumInstances());
|
||||
}
|
||||
inputPlanFragment.addPlanRoot(analyticEvalNode);
|
||||
addPlanRoot(inputPlanFragment, analyticEvalNode, physicalWindow);
|
||||
return inputPlanFragment;
|
||||
}
|
||||
|
||||
@ -831,7 +847,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
SortNode sortNode = translateSortNode(topN, inputFragment.getPlanRoot(), context);
|
||||
sortNode.setOffset(topN.getOffset());
|
||||
sortNode.setLimit(topN.getLimit());
|
||||
currentFragment.addPlanRoot(sortNode);
|
||||
addPlanRoot(currentFragment, sortNode, topN);
|
||||
} else {
|
||||
// For mergeSort, we need to push sortInfo to exchangeNode
|
||||
if (!(currentFragment.getPlanRoot() instanceof ExchangeNode)) {
|
||||
@ -1163,8 +1179,8 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
boolean needNewRootFragment = nestedLoopJoin.child(0) instanceof PhysicalDistribute;
|
||||
PlanFragment joinFragment;
|
||||
if (needNewRootFragment) {
|
||||
joinFragment = new PlanFragment(context.nextFragmentId(), nestedLoopJoinNode,
|
||||
DataPartition.UNPARTITIONED);
|
||||
joinFragment = createPlanFragment(nestedLoopJoinNode,
|
||||
DataPartition.UNPARTITIONED, nestedLoopJoin);
|
||||
context.addPlanFragment(joinFragment);
|
||||
} else {
|
||||
joinFragment = leftFragment;
|
||||
@ -1408,7 +1424,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
// the three nodes don't support conjuncts, need create a SelectNode to filter data
|
||||
SelectNode selectNode = new SelectNode(context.nextPlanNodeId(), planNode);
|
||||
addConjunctsToPlanNode(filter, selectNode, context);
|
||||
inputFragment.addPlanRoot(selectNode);
|
||||
addPlanRoot(inputFragment, selectNode, filter);
|
||||
} else {
|
||||
if (!(filter.child(0) instanceof AbstractPhysicalJoin)) {
|
||||
addConjunctsToPlanNode(filter, planNode, context);
|
||||
@ -1423,6 +1439,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
filter.getConjuncts().stream()
|
||||
.map(e -> ExpressionTranslator.translate(e, context))
|
||||
.forEach(planNode::addConjunct);
|
||||
updateLegacyPlanIdToPhysicalPlan(planNode, filter);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -1475,7 +1492,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
currentFragment.setPlanRoot(currentFragment.getPlanRoot().getChild(0));
|
||||
currentFragment = createParentFragment(currentFragment, DataPartition.UNPARTITIONED, context);
|
||||
}
|
||||
currentFragment.addPlanRoot(assertNumRowsNode);
|
||||
addPlanRoot(currentFragment, assertNumRowsNode, assertNumRows);
|
||||
return currentFragment;
|
||||
}
|
||||
|
||||
@ -1555,8 +1572,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
// If all child fragments are unpartitioned, return a single unpartitioned fragment
|
||||
// with a UnionNode that merges all child fragments.
|
||||
if (allChildFragmentsUnPartitioned(childrenFragments)) {
|
||||
setOperationFragment = new PlanFragment(
|
||||
context.nextFragmentId(), setOperationNode, DataPartition.UNPARTITIONED);
|
||||
setOperationFragment = createPlanFragment(setOperationNode, DataPartition.UNPARTITIONED, setOperation);
|
||||
// Absorb the plan trees of all childFragments into unionNode
|
||||
// and fix up the fragment tree in the process.
|
||||
for (int i = 0; i < childrenFragments.size(); ++i) {
|
||||
@ -1565,9 +1581,9 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
context);
|
||||
}
|
||||
} else {
|
||||
setOperationFragment = new PlanFragment(context.nextFragmentId(), setOperationNode,
|
||||
setOperationFragment = createPlanFragment(setOperationNode,
|
||||
new DataPartition(TPartitionType.HASH_PARTITIONED,
|
||||
setOperationNode.getMaterializedResultExprLists().get(0)));
|
||||
setOperationNode.getMaterializedResultExprLists().get(0)), setOperation);
|
||||
for (int i = 0; i < childrenFragments.size(); ++i) {
|
||||
PlanFragment childFragment = childrenFragments.get(i);
|
||||
// Connect the unpartitioned child fragments to SetOperationNode via a random exchange.
|
||||
@ -1597,7 +1613,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
.collect(Collectors.toList());
|
||||
TableFunctionNode tableFunctionNode = new TableFunctionNode(context.nextPlanNodeId(),
|
||||
currentFragment.getPlanRoot(), tupleDescriptor.getId(), functionCalls, outputSlotIds);
|
||||
currentFragment.addPlanRoot(tableFunctionNode);
|
||||
addPlanRoot(currentFragment, tableFunctionNode, generate);
|
||||
return currentFragment;
|
||||
}
|
||||
|
||||
@ -1856,7 +1872,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
Expr.cloneList(lhsJoinExprs, null));
|
||||
DataPartition rhsJoinPartition =
|
||||
new DataPartition(TPartitionType.HASH_PARTITIONED, rhsJoinExprs);
|
||||
PlanFragment joinFragment = new PlanFragment(context.nextFragmentId(), hashJoinNode, lhsJoinPartition);
|
||||
PlanFragment joinFragment = createPlanFragment(hashJoinNode, lhsJoinPartition, physicalHashJoin);
|
||||
context.addPlanFragment(joinFragment);
|
||||
|
||||
connectChildFragment(hashJoinNode, 0, joinFragment, leftFragment, context);
|
||||
@ -2058,4 +2074,21 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
&& selectIndexColumns.contains(((SlotReference) slot).getColumn().get()))
|
||||
.collect(ImmutableList.toImmutableList());
|
||||
}
|
||||
|
||||
private PlanFragment createPlanFragment(PlanNode planNode, DataPartition dataPartition, AbstractPlan physicalPlan) {
|
||||
PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), planNode, dataPartition);
|
||||
updateLegacyPlanIdToPhysicalPlan(planNode, physicalPlan);
|
||||
return planFragment;
|
||||
}
|
||||
|
||||
private void addPlanRoot(PlanFragment fragment, PlanNode planNode, AbstractPlan physicalPlan) {
|
||||
fragment.addPlanRoot(planNode);
|
||||
updateLegacyPlanIdToPhysicalPlan(planNode, physicalPlan);
|
||||
}
|
||||
|
||||
private void updateLegacyPlanIdToPhysicalPlan(PlanNode planNode, AbstractPlan physicalPlan) {
|
||||
if (statsErrorEstimator != null) {
|
||||
statsErrorEstimator.updateLegacyPlanIdToPhysicalPlan(planNode, physicalPlan);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,114 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.nereids.stats;
|
||||
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.common.util.DebugUtil;
|
||||
import org.apache.doris.common.util.ProfileManager;
|
||||
import org.apache.doris.nereids.trees.plans.AbstractPlan;
|
||||
import org.apache.doris.planner.PlanNode;
|
||||
import org.apache.doris.thrift.TReportExecStatusParams;
|
||||
import org.apache.doris.thrift.TRuntimeProfileNode;
|
||||
import org.apache.doris.thrift.TUniqueId;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
/**
|
||||
* Used to estimate the bias of stats estimation.
|
||||
*/
|
||||
public class StatsErrorEstimator {
|
||||
private Map<Integer, Pair<Double, Double>> legacyPlanIdToPhysicalPlan;
|
||||
|
||||
public StatsErrorEstimator() {
|
||||
legacyPlanIdToPhysicalPlan = new HashMap<>();
|
||||
}
|
||||
|
||||
public void updateLegacyPlanIdToPhysicalPlan(PlanNode planNode, AbstractPlan physicalPlan) {
|
||||
legacyPlanIdToPhysicalPlan.put(planNode.getId().asInt(), Pair.of(physicalPlan.getStats().getRowCount(),
|
||||
(double) 0));
|
||||
}
|
||||
|
||||
/**
|
||||
* Q-error:
|
||||
* q = max_{i=1}^{n}(max(\frac{b^\prime}{b}, \frac{b}{b^\prime})
|
||||
*/
|
||||
public double calculateQError() {
|
||||
double qError = Double.NEGATIVE_INFINITY;
|
||||
for (Entry<Integer, Pair<Double, Double>> entry : legacyPlanIdToPhysicalPlan.entrySet()) {
|
||||
double exactReturnedRows = entry.getValue().second;
|
||||
double estimateReturnedRows = entry.getValue().first;
|
||||
qError = Math.max(qError,
|
||||
Math.max(exactReturnedRows / oneIfZero(estimateReturnedRows),
|
||||
estimateReturnedRows / oneIfZero(exactReturnedRows)));
|
||||
}
|
||||
return qError;
|
||||
}
|
||||
|
||||
/**
|
||||
* Update extract returned rows incrementally, since there may be many execution instances of plan fragment.
|
||||
*/
|
||||
public void updateExactReturnedRows(TReportExecStatusParams tReportExecStatusParams) {
|
||||
TUniqueId tUniqueId = tReportExecStatusParams.query_id;
|
||||
for (TRuntimeProfileNode runtimeProfileNode : tReportExecStatusParams.profile.nodes) {
|
||||
String name = runtimeProfileNode.name;
|
||||
int planId = extractPlanNodeIdFromName(name);
|
||||
if (planId == -1) {
|
||||
continue;
|
||||
}
|
||||
double rowsReturned = runtimeProfileNode.counters.stream()
|
||||
.filter(p -> p.name.equals("RowsReturned")).mapToDouble(p -> (double) p.getValue()).sum();
|
||||
Pair<Double, Double> pair = legacyPlanIdToPhysicalPlan.get(planId);
|
||||
pair.second = pair.second + rowsReturned;
|
||||
}
|
||||
double qError = calculateQError();
|
||||
ProfileManager.getInstance()
|
||||
.setQErrorToProfileElementObject(DebugUtil.printId(tUniqueId), qError);
|
||||
}
|
||||
|
||||
/**
|
||||
* TODO: The execution report from BE doesn't have any schema, so we have to use regex to extract the plan node id.
|
||||
*/
|
||||
private int extractPlanNodeIdFromName(String name) {
|
||||
Pattern p = Pattern.compile("\\b(?!dst_id=)id=(\\d+)\\b");
|
||||
Matcher m = p.matcher(name);
|
||||
if (!m.find()) {
|
||||
return -1;
|
||||
}
|
||||
return Integer.parseInt(m.group(1));
|
||||
}
|
||||
|
||||
private Double extractRowsReturned(String rowsReturnedStr) {
|
||||
if (rowsReturnedStr == null) {
|
||||
return 0.0;
|
||||
}
|
||||
Pattern p = Pattern.compile("\\((\\d+)\\)");
|
||||
Matcher m = p.matcher(rowsReturnedStr);
|
||||
if (!m.find()) {
|
||||
return 0.0;
|
||||
}
|
||||
return Double.parseDouble(m.group(1));
|
||||
}
|
||||
|
||||
private double oneIfZero(double d) {
|
||||
return d == 0.0 ? 1.0 : d;
|
||||
}
|
||||
}
|
||||
@ -34,6 +34,7 @@ import org.apache.doris.mysql.MysqlChannel;
|
||||
import org.apache.doris.mysql.MysqlCommand;
|
||||
import org.apache.doris.mysql.MysqlSslContext;
|
||||
import org.apache.doris.nereids.StatementContext;
|
||||
import org.apache.doris.nereids.stats.StatsErrorEstimator;
|
||||
import org.apache.doris.plugin.AuditEvent.AuditEventBuilder;
|
||||
import org.apache.doris.resource.Tag;
|
||||
import org.apache.doris.thrift.TResourceInfo;
|
||||
@ -166,6 +167,8 @@ public class ConnectContext {
|
||||
*/
|
||||
private int executionTimeoutS;
|
||||
|
||||
private StatsErrorEstimator statsErrorEstimator;
|
||||
|
||||
public void setUserQueryTimeout(long queryTimeout) {
|
||||
this.userQueryTimeout = queryTimeout;
|
||||
}
|
||||
@ -710,5 +713,12 @@ public class ConnectContext {
|
||||
return "stmt[" + stmtId + ", " + DebugUtil.printId(queryId) + "]";
|
||||
}
|
||||
|
||||
public StatsErrorEstimator getStatsErrorEstimator() {
|
||||
return statsErrorEstimator;
|
||||
}
|
||||
|
||||
public void setStatsErrorEstimator(StatsErrorEstimator statsErrorEstimator) {
|
||||
this.statsErrorEstimator = statsErrorEstimator;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -187,6 +187,10 @@ public final class QeProcessorImpl implements QeProcessor {
|
||||
}
|
||||
final TReportExecStatusResult result = new TReportExecStatusResult();
|
||||
final QueryInfo info = coordinatorMap.get(params.query_id);
|
||||
if (info != null && info.connectContext != null && info.connectContext.getStatsErrorEstimator() != null) {
|
||||
info.connectContext.getStatsErrorEstimator().updateExactReturnedRows(params);
|
||||
}
|
||||
|
||||
if (info == null) {
|
||||
result.setStatus(new TStatus(TStatusCode.RUNTIME_ERROR));
|
||||
LOG.info("ReportExecStatus() runtime error, query {} does not exist", DebugUtil.printId(params.query_id));
|
||||
|
||||
@ -101,6 +101,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
import org.apache.doris.nereids.NereidsPlanner;
|
||||
import org.apache.doris.nereids.StatementContext;
|
||||
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
|
||||
import org.apache.doris.nereids.stats.StatsErrorEstimator;
|
||||
import org.apache.doris.nereids.trees.plans.commands.Command;
|
||||
import org.apache.doris.planner.OlapScanNode;
|
||||
import org.apache.doris.planner.OriginalPlanner;
|
||||
@ -441,6 +442,10 @@ public class StmtExecutor implements ProfileWriter {
|
||||
// Exception:
|
||||
// IOException: talk with client failed.
|
||||
public void execute(TUniqueId queryId) throws Exception {
|
||||
SessionVariable sessionVariable = context.getSessionVariable();
|
||||
if (sessionVariable.enableProfile && sessionVariable.isEnableNereidsPlanner()) {
|
||||
ConnectContext.get().setStatsErrorEstimator(new StatsErrorEstimator());
|
||||
}
|
||||
context.setStartTime();
|
||||
|
||||
plannerProfile.setQueryBeginTime();
|
||||
@ -638,7 +643,6 @@ public class StmtExecutor implements ProfileWriter {
|
||||
} finally {
|
||||
// revert Session Value
|
||||
try {
|
||||
SessionVariable sessionVariable = context.getSessionVariable();
|
||||
VariableMgr.revertSessionValue(sessionVariable);
|
||||
// origin value init
|
||||
sessionVariable.setIsSingleSetVar(false);
|
||||
|
||||
@ -75,7 +75,7 @@ public class PhysicalPlanTranslatorTest {
|
||||
PhysicalProject<PhysicalFilter<PhysicalOlapScan>> project = new PhysicalProject<>(projList,
|
||||
placeHolder, filter);
|
||||
PlanTranslatorContext planTranslatorContext = new PlanTranslatorContext();
|
||||
PhysicalPlanTranslator translator = new PhysicalPlanTranslator();
|
||||
PhysicalPlanTranslator translator = new PhysicalPlanTranslator(planTranslatorContext, null);
|
||||
PlanFragment fragment = translator.visitPhysicalProject(project, planTranslatorContext);
|
||||
PlanNode planNode = fragment.getPlanRoot();
|
||||
List<OlapScanNode> scanNodeList = new ArrayList<>();
|
||||
|
||||
Reference in New Issue
Block a user