diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java index 0d4c6d49b8..7734e76db8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java @@ -110,6 +110,8 @@ public class ProfileManager { public MultiProfileTreeBuilder builder = null; public String errMsg = ""; + public double qError; + // lazy load profileContent because sometimes profileContent is very large public String getProfileContent() { if (profileContent != null) { @@ -119,6 +121,15 @@ public class ProfileManager { profileContent = profile.toString(); return profileContent; } + + public double getqError() { + return qError; + } + + public void setqError(double qError) { + this.qError = qError; + } + } // only protect queryIdDeque; queryIdToProfileMap is concurrent, no need to protect @@ -252,6 +263,10 @@ public class ProfileManager { } } + public ProfileElement findProfileElementObject(String queryId) { + return queryIdToProfileMap.get(queryId); + } + /** * Check if the query with specific query id is queried by specific user. 
* @@ -371,4 +386,11 @@ public class ProfileManager { public boolean isQueryProfile(RuntimeProfile profile) { return "Query".equals(profile.getName()); } + + public void setQErrorToProfileElementObject(String queryId, double qError) { + ProfileElement profileElement = findProfileElementObject(queryId); + if (profileElement != null) { + profileElement.setqError(qError); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java index aceb7dc1e5..981f7be77d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java @@ -137,7 +137,6 @@ public class RuntimeProfile { // preorder traversal, idx should be modified in the traversal process private void update(List nodes, Reference idx) { TRuntimeProfileNode node = nodes.get(idx.getRef()); - // update this level's counters if (node.counters != null) { for (TCounter tcounter : node.counters) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java index 8456f6c2db..33025b2559 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java @@ -101,12 +101,13 @@ public class NereidsPlanner extends Planner { return; } PhysicalPlan physicalPlan = (PhysicalPlan) resultPlan; - PhysicalPlanTranslator physicalPlanTranslator = new PhysicalPlanTranslator(); PlanTranslatorContext planTranslatorContext = new PlanTranslatorContext(cascadesContext); + PhysicalPlanTranslator physicalPlanTranslator = new PhysicalPlanTranslator(planTranslatorContext, + ConnectContext.get().getStatsErrorEstimator()); if (ConnectContext.get().getSessionVariable().isEnableNereidsTrace()) { CounterEvent.clearCounter(); } - PlanFragment root = 
physicalPlanTranslator.translatePlan(physicalPlan, planTranslatorContext); + PlanFragment root = physicalPlanTranslator.translatePlan(physicalPlan); scanNodeList = planTranslatorContext.getScanNodes(); descTable = planTranslatorContext.getDescTable(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 74c9517aff..5db3b71fd1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -54,6 +54,7 @@ import org.apache.doris.nereids.properties.DistributionSpecReplicated; import org.apache.doris.nereids.properties.OrderKey; import org.apache.doris.nereids.properties.PhysicalProperties; import org.apache.doris.nereids.rules.implementation.LogicalWindowToPhysicalWindow.WindowFrameGroup; +import org.apache.doris.nereids.stats.StatsErrorEstimator; import org.apache.doris.nereids.trees.expressions.AggregateExpression; import org.apache.doris.nereids.trees.expressions.Alias; import org.apache.doris.nereids.trees.expressions.Cast; @@ -70,6 +71,7 @@ import org.apache.doris.nereids.trees.expressions.WindowFrame; import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction; import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam; import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral; +import org.apache.doris.nereids.trees.plans.AbstractPlan; import org.apache.doris.nereids.trees.plans.AggMode; import org.apache.doris.nereids.trees.plans.AggPhase; import org.apache.doris.nereids.trees.plans.JoinType; @@ -173,15 +175,30 @@ import java.util.stream.Stream; */ public class PhysicalPlanTranslator extends DefaultPlanVisitor { private static final Logger LOG = LogManager.getLogger(PhysicalPlanTranslator.class); + 
PlanTranslatorContext context; + + StatsErrorEstimator statsErrorEstimator; + + public PhysicalPlanTranslator() { + } + + public PhysicalPlanTranslator(PlanTranslatorContext context, StatsErrorEstimator statsErrorEstimator) { + this.context = context; + this.statsErrorEstimator = statsErrorEstimator; + } + + public PlanFragment translatePlan(PhysicalPlan physicalPlan, PlanTranslatorContext context) { + this.context = context; + return translatePlan(physicalPlan); + } /** * Translate Nereids Physical Plan tree to Stale Planner PlanFragment tree. * * @param physicalPlan Nereids Physical Plan tree - * @param context context to help translate * @return Stale Planner PlanFragment tree */ - public PlanFragment translatePlan(PhysicalPlan physicalPlan, PlanTranslatorContext context) { + public PlanFragment translatePlan(PhysicalPlan physicalPlan) { PlanFragment rootFragment = physicalPlan.accept(this, context); if (physicalPlan instanceof PhysicalDistribute) { PhysicalDistribute distribute = (PhysicalDistribute) physicalPlan; @@ -369,7 +386,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor()); - PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), unionNode, DataPartition.UNPARTITIONED); + PlanFragment planFragment = createPlanFragment(unionNode, DataPartition.UNPARTITIONED, oneRowRelation); context.addPlanFragment(planFragment); return planFragment; } @@ -521,7 +538,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitorSort, we just need to add sortNode SortNode sortNode = translateSortNode(sort, inputFragment.getPlanRoot(), context); - currentFragment.addPlanRoot(sortNode); + addPlanRoot(currentFragment, sortNode, sort); } else { // For mergeSort, we need to push sortInfo to exchangeNode if (!(currentFragment.getPlanRoot() instanceof ExchangeNode)) { @@ -800,7 +816,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor ExpressionTranslator.translate(e, context)) .forEach(planNode::addConjunct); + 
updateLegacyPlanIdToPhysicalPlan(planNode, filter); } @Override @@ -1475,7 +1492,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor> legacyPlanIdToPhysicalPlan; + + public StatsErrorEstimator() { + legacyPlanIdToPhysicalPlan = new HashMap<>(); + } + + public void updateLegacyPlanIdToPhysicalPlan(PlanNode planNode, AbstractPlan physicalPlan) { + legacyPlanIdToPhysicalPlan.put(planNode.getId().asInt(), Pair.of(physicalPlan.getStats().getRowCount(), + (double) 0)); + } + + /** + * Q-error: + * q = max_{i=1}^{n}(max(\frac{b^\prime}{b}, \frac{b}{b^\prime})) + */ + public double calculateQError() { + double qError = Double.NEGATIVE_INFINITY; + for (Entry> entry : legacyPlanIdToPhysicalPlan.entrySet()) { + double exactReturnedRows = entry.getValue().second; + double estimateReturnedRows = entry.getValue().first; + qError = Math.max(qError, + Math.max(exactReturnedRows / oneIfZero(estimateReturnedRows), + estimateReturnedRows / oneIfZero(exactReturnedRows))); + } + return qError; + } + + /** + * Update exact returned rows incrementally, since there may be many execution instances of a plan fragment. 
+ */ + public void updateExactReturnedRows(TReportExecStatusParams tReportExecStatusParams) { + TUniqueId tUniqueId = tReportExecStatusParams.query_id; + for (TRuntimeProfileNode runtimeProfileNode : tReportExecStatusParams.profile.nodes) { + String name = runtimeProfileNode.name; + int planId = extractPlanNodeIdFromName(name); + if (planId == -1) { + continue; + } + double rowsReturned = runtimeProfileNode.counters.stream() + .filter(p -> p.name.equals("RowsReturned")).mapToDouble(p -> (double) p.getValue()).sum(); + Pair pair = legacyPlanIdToPhysicalPlan.get(planId); + pair.second = pair.second + rowsReturned; + } + double qError = calculateQError(); + ProfileManager.getInstance() + .setQErrorToProfileElementObject(DebugUtil.printId(tUniqueId), qError); + } + + /** + * TODO: The execution report from BE doesn't have any schema, so we have to use regex to extract the plan node id. + */ + private int extractPlanNodeIdFromName(String name) { + Pattern p = Pattern.compile("\\b(?!dst_id=)id=(\\d+)\\b"); + Matcher m = p.matcher(name); + if (!m.find()) { + return -1; + } + return Integer.parseInt(m.group(1)); + } + + private Double extractRowsReturned(String rowsReturnedStr) { + if (rowsReturnedStr == null) { + return 0.0; + } + Pattern p = Pattern.compile("\\((\\d+)\\)"); + Matcher m = p.matcher(rowsReturnedStr); + if (!m.find()) { + return 0.0; + } + return Double.parseDouble(m.group(1)); + } + + private double oneIfZero(double d) { + return d == 0.0 ? 
1.0 : d; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index 1598ecbdbc..42cdb8402e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -34,6 +34,7 @@ import org.apache.doris.mysql.MysqlChannel; import org.apache.doris.mysql.MysqlCommand; import org.apache.doris.mysql.MysqlSslContext; import org.apache.doris.nereids.StatementContext; +import org.apache.doris.nereids.stats.StatsErrorEstimator; import org.apache.doris.plugin.AuditEvent.AuditEventBuilder; import org.apache.doris.resource.Tag; import org.apache.doris.thrift.TResourceInfo; @@ -166,6 +167,8 @@ public class ConnectContext { */ private int executionTimeoutS; + private StatsErrorEstimator statsErrorEstimator; + public void setUserQueryTimeout(long queryTimeout) { this.userQueryTimeout = queryTimeout; } @@ -710,5 +713,12 @@ public class ConnectContext { return "stmt[" + stmtId + ", " + DebugUtil.printId(queryId) + "]"; } + public StatsErrorEstimator getStatsErrorEstimator() { + return statsErrorEstimator; + } + + public void setStatsErrorEstimator(StatsErrorEstimator statsErrorEstimator) { + this.statsErrorEstimator = statsErrorEstimator; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java index 1bf11ece97..81a6bdf4d2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java @@ -187,6 +187,10 @@ public final class QeProcessorImpl implements QeProcessor { } final TReportExecStatusResult result = new TReportExecStatusResult(); final QueryInfo info = coordinatorMap.get(params.query_id); + if (info != null && info.connectContext != null && info.connectContext.getStatsErrorEstimator() != null) { + 
info.connectContext.getStatsErrorEstimator().updateExactReturnedRows(params); + } + if (info == null) { result.setStatus(new TStatus(TStatusCode.RUNTIME_ERROR)); LOG.info("ReportExecStatus() runtime error, query {} does not exist", DebugUtil.printId(params.query_id)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 39a7d05b32..d0c865c2bf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -101,6 +101,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.NereidsPlanner; import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.glue.LogicalPlanAdapter; +import org.apache.doris.nereids.stats.StatsErrorEstimator; import org.apache.doris.nereids.trees.plans.commands.Command; import org.apache.doris.planner.OlapScanNode; import org.apache.doris.planner.OriginalPlanner; @@ -441,6 +442,10 @@ public class StmtExecutor implements ProfileWriter { // Exception: // IOException: talk with client failed. 
public void execute(TUniqueId queryId) throws Exception { + SessionVariable sessionVariable = context.getSessionVariable(); + if (sessionVariable.enableProfile && sessionVariable.isEnableNereidsPlanner()) { + ConnectContext.get().setStatsErrorEstimator(new StatsErrorEstimator()); + } context.setStartTime(); plannerProfile.setQueryBeginTime(); @@ -638,7 +643,6 @@ public class StmtExecutor implements ProfileWriter { } finally { // revert Session Value try { - SessionVariable sessionVariable = context.getSessionVariable(); VariableMgr.revertSessionValue(sessionVariable); // origin value init sessionVariable.setIsSingleSetVar(false); diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java index 2da342ff26..eb8f95a47f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java @@ -75,7 +75,7 @@ public class PhysicalPlanTranslatorTest { PhysicalProject> project = new PhysicalProject<>(projList, placeHolder, filter); PlanTranslatorContext planTranslatorContext = new PlanTranslatorContext(); - PhysicalPlanTranslator translator = new PhysicalPlanTranslator(); + PhysicalPlanTranslator translator = new PhysicalPlanTranslator(planTranslatorContext, null); PlanFragment fragment = translator.visitPhysicalProject(project, planTranslatorContext); PlanNode planNode = fragment.getPlanRoot(); List scanNodeList = new ArrayList<>();