From 36b6cea462cffdb139c9062ec9babfe1c0b45821 Mon Sep 17 00:00:00 2001 From: AKIRA <33112463+Kikyou1997@users.noreply.github.com> Date: Wed, 8 Mar 2023 17:26:24 +0900 Subject: [PATCH] [feature-wip](nereids) Support Q-Error to measure the accuracy of derived statistics (#17185) Collect each estimated output rows and exact output rows for each plan node, and use this to measure the accuracy of derived statistics. The estimated result is managed by ProfileManager. We would get this estimated result in the http request by query id later. --- .../doris/common/util/ProfileManager.java | 22 ++++ .../doris/common/util/RuntimeProfile.java | 1 - .../apache/doris/nereids/NereidsPlanner.java | 5 +- .../translator/PhysicalPlanTranslator.java | 81 +++++++++---- .../nereids/stats/StatsErrorEstimator.java | 114 ++++++++++++++++++ .../org/apache/doris/qe/ConnectContext.java | 10 ++ .../org/apache/doris/qe/QeProcessorImpl.java | 4 + .../org/apache/doris/qe/StmtExecutor.java | 6 +- .../PhysicalPlanTranslatorTest.java | 2 +- 9 files changed, 216 insertions(+), 29 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsErrorEstimator.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java index 0d4c6d49b8..7734e76db8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java @@ -110,6 +110,8 @@ public class ProfileManager { public MultiProfileTreeBuilder builder = null; public String errMsg = ""; + public double qError; + // lazy load profileContent because sometimes profileContent is very large public String getProfileContent() { if (profileContent != null) { @@ -119,6 +121,15 @@ public class ProfileManager { profileContent = profile.toString(); return profileContent; } + + public double getqError() { + return qError; + } + + public void setqError(double qError) { + this.qError = qError; + } + } // only protect queryIdDeque; queryIdToProfileMap is concurrent, no need to protect @@ -252,6 +263,10 @@ public class ProfileManager { } } + public ProfileElement findProfileElementObject(String queryId) { + return queryIdToProfileMap.get(queryId); + } + /** * Check if the query with specific query id is queried by specific user. * @@ -371,4 +386,11 @@ public class ProfileManager { public boolean isQueryProfile(RuntimeProfile profile) { return "Query".equals(profile.getName()); } + + public void setQErrorToProfileElementObject(String queryId, double qError) { + ProfileElement profileElement = findProfileElementObject(queryId); + if (profileElement != null) { + profileElement.setqError(qError); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java index aceb7dc1e5..981f7be77d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java @@ -137,7 +137,6 @@ public class RuntimeProfile { // preorder traversal, idx should be modified in the traversal process private void update(List nodes, Reference idx) { TRuntimeProfileNode node = nodes.get(idx.getRef()); - // update this level's counters if (node.counters != null) { for (TCounter tcounter : node.counters) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java index 8456f6c2db..33025b2559 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java @@ -101,12 +101,13 @@ public class NereidsPlanner extends Planner { return; } PhysicalPlan physicalPlan = (PhysicalPlan) resultPlan; - PhysicalPlanTranslator physicalPlanTranslator = new PhysicalPlanTranslator(); PlanTranslatorContext planTranslatorContext = new PlanTranslatorContext(cascadesContext); + PhysicalPlanTranslator physicalPlanTranslator = new PhysicalPlanTranslator(planTranslatorContext, + ConnectContext.get().getStatsErrorEstimator()); if (ConnectContext.get().getSessionVariable().isEnableNereidsTrace()) { CounterEvent.clearCounter(); } - PlanFragment root = physicalPlanTranslator.translatePlan(physicalPlan, planTranslatorContext); + PlanFragment root = physicalPlanTranslator.translatePlan(physicalPlan); scanNodeList = planTranslatorContext.getScanNodes(); descTable = planTranslatorContext.getDescTable(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 74c9517aff..5db3b71fd1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -54,6 +54,7 @@ import org.apache.doris.nereids.properties.DistributionSpecReplicated; import org.apache.doris.nereids.properties.OrderKey; import org.apache.doris.nereids.properties.PhysicalProperties; import org.apache.doris.nereids.rules.implementation.LogicalWindowToPhysicalWindow.WindowFrameGroup; +import org.apache.doris.nereids.stats.StatsErrorEstimator; import org.apache.doris.nereids.trees.expressions.AggregateExpression; import org.apache.doris.nereids.trees.expressions.Alias; import org.apache.doris.nereids.trees.expressions.Cast; @@ -70,6 +71,7 @@ import org.apache.doris.nereids.trees.expressions.WindowFrame; import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction; import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam; import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral; +import org.apache.doris.nereids.trees.plans.AbstractPlan; import org.apache.doris.nereids.trees.plans.AggMode; import org.apache.doris.nereids.trees.plans.AggPhase; import org.apache.doris.nereids.trees.plans.JoinType; @@ -173,15 +175,30 @@ import java.util.stream.Stream; */ public class PhysicalPlanTranslator extends DefaultPlanVisitor { private static final Logger LOG = LogManager.getLogger(PhysicalPlanTranslator.class); + PlanTranslatorContext context; + + StatsErrorEstimator statsErrorEstimator; + + public PhysicalPlanTranslator() { + } + + public PhysicalPlanTranslator(PlanTranslatorContext context, StatsErrorEstimator statsErrorEstimator) { + this.context = context; + this.statsErrorEstimator = statsErrorEstimator; + } + + public PlanFragment translatePlan(PhysicalPlan physicalPlan, PlanTranslatorContext context) { + this.context = context; + return translatePlan(physicalPlan); + } /** * Translate Nereids Physical Plan tree to Stale Planner PlanFragment tree. * * @param physicalPlan Nereids Physical Plan tree - * @param context context to help translate * @return Stale Planner PlanFragment tree */ - public PlanFragment translatePlan(PhysicalPlan physicalPlan, PlanTranslatorContext context) { + public PlanFragment translatePlan(PhysicalPlan physicalPlan) { PlanFragment rootFragment = physicalPlan.accept(this, context); if (physicalPlan instanceof PhysicalDistribute) { PhysicalDistribute distribute = (PhysicalDistribute) physicalPlan; @@ -369,7 +386,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor()); - PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), unionNode, DataPartition.UNPARTITIONED); + PlanFragment planFragment = createPlanFragment(unionNode, DataPartition.UNPARTITIONED, oneRowRelation); context.addPlanFragment(planFragment); return planFragment; } @@ -521,7 +538,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitorSort, we just need to add sortNode SortNode sortNode = translateSortNode(sort, inputFragment.getPlanRoot(), context); - currentFragment.addPlanRoot(sortNode); + addPlanRoot(currentFragment, sortNode, sort); } else { // For mergeSort, we need to push sortInfo to exchangeNode if (!(currentFragment.getPlanRoot() instanceof ExchangeNode)) { @@ -800,7 +816,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor ExpressionTranslator.translate(e, context)) .forEach(planNode::addConjunct); + updateLegacyPlanIdToPhysicalPlan(planNode, filter); } @Override @@ -1475,7 +1492,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor> legacyPlanIdToPhysicalPlan; + + public StatsErrorEstimator() { + legacyPlanIdToPhysicalPlan = new HashMap<>(); + } + + public void updateLegacyPlanIdToPhysicalPlan(PlanNode planNode, AbstractPlan physicalPlan) { + legacyPlanIdToPhysicalPlan.put(planNode.getId().asInt(), Pair.of(physicalPlan.getStats().getRowCount(), + (double) 0)); + } + + /** + * Q-error: + * q = max_{i=1}^{n}(max(\frac{b^\prime}{b}, \frac{b}{b^\prime}) + */ + public double calculateQError() { + double qError = Double.NEGATIVE_INFINITY; + for (Entry> entry : legacyPlanIdToPhysicalPlan.entrySet()) { + double exactReturnedRows = entry.getValue().second; + double estimateReturnedRows = entry.getValue().first; + qError = Math.max(qError, + Math.max(exactReturnedRows / oneIfZero(estimateReturnedRows), + estimateReturnedRows / oneIfZero(exactReturnedRows))); + } + return qError; + } + + /** + * Update extract returned rows incrementally, since there may be many execution instances of plan fragment. + */ + public void updateExactReturnedRows(TReportExecStatusParams tReportExecStatusParams) { + TUniqueId tUniqueId = tReportExecStatusParams.query_id; + for (TRuntimeProfileNode runtimeProfileNode : tReportExecStatusParams.profile.nodes) { + String name = runtimeProfileNode.name; + int planId = extractPlanNodeIdFromName(name); + if (planId == -1) { + continue; + } + double rowsReturned = runtimeProfileNode.counters.stream() + .filter(p -> p.name.equals("RowsReturned")).mapToDouble(p -> (double) p.getValue()).sum(); + Pair pair = legacyPlanIdToPhysicalPlan.get(planId); + pair.second = pair.second + rowsReturned; + } + double qError = calculateQError(); + ProfileManager.getInstance() + .setQErrorToProfileElementObject(DebugUtil.printId(tUniqueId), qError); + } + + /** + * TODO: The execution report from BE doesn't have any schema, so we have to use regex to extract the plan node id. + */ + private int extractPlanNodeIdFromName(String name) { + Pattern p = Pattern.compile("\\b(?!dst_id=)id=(\\d+)\\b"); + Matcher m = p.matcher(name); + if (!m.find()) { + return -1; + } + return Integer.parseInt(m.group(1)); + } + + private Double extractRowsReturned(String rowsReturnedStr) { + if (rowsReturnedStr == null) { + return 0.0; + } + Pattern p = Pattern.compile("\\((\\d+)\\)"); + Matcher m = p.matcher(rowsReturnedStr); + if (!m.find()) { + return 0.0; + } + return Double.parseDouble(m.group(1)); + } + + private double oneIfZero(double d) { + return d == 0.0 ? 1.0 : d; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index 1598ecbdbc..42cdb8402e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -34,6 +34,7 @@ import org.apache.doris.mysql.MysqlChannel; import org.apache.doris.mysql.MysqlCommand; import org.apache.doris.mysql.MysqlSslContext; import org.apache.doris.nereids.StatementContext; +import org.apache.doris.nereids.stats.StatsErrorEstimator; import org.apache.doris.plugin.AuditEvent.AuditEventBuilder; import org.apache.doris.resource.Tag; import org.apache.doris.thrift.TResourceInfo; @@ -166,6 +167,8 @@ public class ConnectContext { */ private int executionTimeoutS; + private StatsErrorEstimator statsErrorEstimator; + public void setUserQueryTimeout(long queryTimeout) { this.userQueryTimeout = queryTimeout; } @@ -710,5 +713,12 @@ public class ConnectContext { return "stmt[" + stmtId + ", " + DebugUtil.printId(queryId) + "]"; } + public StatsErrorEstimator getStatsErrorEstimator() { + return statsErrorEstimator; + } + + public void setStatsErrorEstimator(StatsErrorEstimator statsErrorEstimator) { + this.statsErrorEstimator = statsErrorEstimator; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java index 1bf11ece97..81a6bdf4d2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java @@ -187,6 +187,10 @@ public final class QeProcessorImpl implements QeProcessor { } final TReportExecStatusResult result = new TReportExecStatusResult(); final QueryInfo info = coordinatorMap.get(params.query_id); + if (info != null && info.connectContext != null && info.connectContext.getStatsErrorEstimator() != null) { + info.connectContext.getStatsErrorEstimator().updateExactReturnedRows(params); + } + if (info == null) { result.setStatus(new TStatus(TStatusCode.RUNTIME_ERROR)); LOG.info("ReportExecStatus() runtime error, query {} does not exist", DebugUtil.printId(params.query_id)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 39a7d05b32..d0c865c2bf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -101,6 +101,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.NereidsPlanner; import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.glue.LogicalPlanAdapter; +import org.apache.doris.nereids.stats.StatsErrorEstimator; import org.apache.doris.nereids.trees.plans.commands.Command; import org.apache.doris.planner.OlapScanNode; import org.apache.doris.planner.OriginalPlanner; @@ -441,6 +442,10 @@ public class StmtExecutor implements ProfileWriter { // Exception: // IOException: talk with client failed. public void execute(TUniqueId queryId) throws Exception { + SessionVariable sessionVariable = context.getSessionVariable(); + if (sessionVariable.enableProfile && sessionVariable.isEnableNereidsPlanner()) { + ConnectContext.get().setStatsErrorEstimator(new StatsErrorEstimator()); + } context.setStartTime(); plannerProfile.setQueryBeginTime(); @@ -638,7 +643,6 @@ public class StmtExecutor implements ProfileWriter { } finally { // revert Session Value try { - SessionVariable sessionVariable = context.getSessionVariable(); VariableMgr.revertSessionValue(sessionVariable); // origin value init sessionVariable.setIsSingleSetVar(false); diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java index 2da342ff26..eb8f95a47f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java @@ -75,7 +75,7 @@ public class PhysicalPlanTranslatorTest { PhysicalProject> project = new PhysicalProject<>(projList, placeHolder, filter); PlanTranslatorContext planTranslatorContext = new PlanTranslatorContext(); - PhysicalPlanTranslator translator = new PhysicalPlanTranslator(); + PhysicalPlanTranslator translator = new PhysicalPlanTranslator(planTranslatorContext, null); PlanFragment fragment = translator.visitPhysicalProject(project, planTranslatorContext); PlanNode planNode = fragment.getPlanRoot(); List scanNodeList = new ArrayList<>();