(Refactor)[Planner] Remove merge node (#9251)
This commit is contained in:
@ -213,8 +213,6 @@ public class DistributedPlanner {
|
||||
result = createSelectNodeFragment((SelectNode) root, childFragments);
|
||||
} else if (root instanceof SetOperationNode) {
|
||||
result = createSetOperationNodeFragment((SetOperationNode) root, childFragments, fragments);
|
||||
} else if (root instanceof MergeNode) {
|
||||
result = createMergeNodeFragment((MergeNode) root, childFragments, fragments);
|
||||
} else if (root instanceof AggregationNode) {
|
||||
result = createAggregationFragment((AggregationNode) root, childFragments.get(0), fragments);
|
||||
} else if (root instanceof SortNode) {
|
||||
@ -692,67 +690,6 @@ public class DistributedPlanner {
|
||||
return leftChildFragment;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates an unpartitioned fragment that merges the outputs of all of its children (with a single ExchangeNode),
|
||||
* corresponding to the 'mergeNode' of the non-distributed plan. Each of the child fragments receives a MergeNode as
|
||||
* a new plan root (with the child fragment's plan tree as its only input), so that each child fragment's output is
|
||||
* mapped onto the MergeNode's result tuple id. TODO: if this is implementing a UNION DISTINCT, the parent of the
|
||||
* mergeNode is a duplicate-removing AggregationNode, which might make sense to apply to the children as well, in
|
||||
* order to reduce the amount of data that needs to be sent to the parent; augment the planner to decide whether
|
||||
* that would reduce the runtime. TODO: since the fragment that does the merge is unpartitioned, it can absorb all
|
||||
* child fragments that are also unpartitioned
|
||||
*/
|
||||
private PlanFragment createMergeNodeFragment(MergeNode mergeNode,
|
||||
ArrayList<PlanFragment> childFragments,
|
||||
ArrayList<PlanFragment> fragments)
|
||||
throws UserException {
|
||||
Preconditions.checkState(mergeNode.getChildren().size() == childFragments.size());
|
||||
|
||||
// If the mergeNode only has constant exprs, return it in an unpartitioned fragment.
|
||||
if (mergeNode.getChildren().isEmpty()) {
|
||||
Preconditions.checkState(!mergeNode.getConstExprLists().isEmpty());
|
||||
return new PlanFragment(ctx_.getNextFragmentId(), mergeNode, DataPartition.UNPARTITIONED);
|
||||
}
|
||||
|
||||
// create an ExchangeNode to perform the merge operation of mergeNode;
|
||||
// the ExchangeNode retains the generic PlanNode parameters of mergeNode
|
||||
ExchangeNode exchNode = new ExchangeNode(ctx_.getNextNodeId(), mergeNode, true);
|
||||
exchNode.setNumInstances(1);
|
||||
exchNode.init(ctx_.getRootAnalyzer());
|
||||
PlanFragment parentFragment =
|
||||
new PlanFragment(ctx_.getNextFragmentId(), exchNode, DataPartition.UNPARTITIONED);
|
||||
|
||||
// we don't expect to be paralleling a MergeNode that was inserted solely
|
||||
// to evaluate conjuncts (ie, that doesn't explicitly materialize its output)
|
||||
Preconditions.checkState(mergeNode.getTupleIds().size() == 1);
|
||||
|
||||
for (int i = 0; i < childFragments.size(); ++i) {
|
||||
PlanFragment childFragment = childFragments.get(i);
|
||||
// create a clone of mergeNode; we want to keep the limit and conjuncts
|
||||
MergeNode childMergeNode = new MergeNode(ctx_.getNextNodeId(), mergeNode);
|
||||
List<Expr> resultExprs = Expr.cloneList(mergeNode.getResultExprLists().get(i), null);
|
||||
childMergeNode.addChild(childFragment.getPlanRoot(), resultExprs);
|
||||
childFragment.setPlanRoot(childMergeNode);
|
||||
childFragment.setDestination(exchNode);
|
||||
}
|
||||
|
||||
// Add an unpartitioned child fragment with a MergeNode for the constant exprs.
|
||||
if (!mergeNode.getConstExprLists().isEmpty()) {
|
||||
MergeNode childMergeNode = new MergeNode(ctx_.getNextNodeId(), mergeNode);
|
||||
childMergeNode.init(ctx_.getRootAnalyzer());
|
||||
childMergeNode.getConstExprLists().addAll(mergeNode.getConstExprLists());
|
||||
// Clear original constant exprs to make sure nobody else picks them up.
|
||||
mergeNode.getConstExprLists().clear();
|
||||
PlanFragment childFragment =
|
||||
new PlanFragment(ctx_.getNextFragmentId(), childMergeNode, DataPartition.UNPARTITIONED);
|
||||
childFragment.setPlanRoot(childMergeNode);
|
||||
childFragment.setDestination(exchNode);
|
||||
childFragments.add(childFragment);
|
||||
fragments.add(childFragment);
|
||||
}
|
||||
return parentFragment;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a new fragment with a UnionNode as its root. The data partition of the
|
||||
* returned fragment and how the data of the child fragments is consumed depends on the
|
||||
|
||||
@ -1,224 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.planner;
|
||||
|
||||
import org.apache.doris.analysis.Analyzer;
|
||||
import org.apache.doris.analysis.Expr;
|
||||
import org.apache.doris.analysis.SlotDescriptor;
|
||||
import org.apache.doris.analysis.SlotId;
|
||||
import org.apache.doris.analysis.TupleDescriptor;
|
||||
import org.apache.doris.analysis.TupleId;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.thrift.TExplainLevel;
|
||||
import org.apache.doris.thrift.TExpr;
|
||||
import org.apache.doris.thrift.TMergeNode;
|
||||
import org.apache.doris.thrift.TPlanNode;
|
||||
import org.apache.doris.thrift.TPlanNodeType;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Node that merges the results of its child plans by materializing
|
||||
* the corresponding result exprs.
|
||||
* If no result exprs are specified for a child, it simply passes on the child's
|
||||
* results.
|
||||
*/
|
||||
public class MergeNode extends PlanNode {
|
||||
private final static Logger LOG = LogManager.getLogger(MergeNode.class);
|
||||
|
||||
// Expr lists corresponding to the input query stmts.
|
||||
// The ith resultExprList belongs to the ith child.
|
||||
// All exprs are resolved to base tables.
|
||||
protected List<List<Expr>> resultExprLists = Lists.newArrayList();
|
||||
|
||||
// Expr lists that originate from constant select stmts.
|
||||
// We keep them separate from the regular expr lists to avoid null children.
|
||||
protected List<List<Expr>> constExprLists = Lists.newArrayList();
|
||||
|
||||
// Output tuple materialized by this node.
|
||||
protected final List<TupleDescriptor> tupleDescs = Lists.newArrayList();
|
||||
|
||||
protected final TupleId tupleId;
|
||||
|
||||
protected MergeNode(PlanNodeId id, MergeNode node) {
|
||||
super(id, node, "MERGE");
|
||||
this.tupleId = node.tupleId;
|
||||
}
|
||||
|
||||
public void addConstExprList(List<Expr> exprs) {
|
||||
constExprLists.add(exprs);
|
||||
}
|
||||
|
||||
public void addChild(PlanNode node, List<Expr> resultExprs) {
|
||||
addChild(node);
|
||||
resultExprLists.add(resultExprs);
|
||||
if (resultExprs != null) {
|
||||
// if we're materializing output, we can only do that into a single
|
||||
// output tuple
|
||||
Preconditions.checkState(tupleIds.size() == 1);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This must be called *after* addChild()/addConstExprList() because it recomputes
|
||||
* both of them.
|
||||
* The MergeNode doesn't need an smap: like a ScanNode, it materializes an "original"
|
||||
* tuple id
|
||||
*/
|
||||
@Override
|
||||
public void init(Analyzer analyzer) throws UserException {
|
||||
assignConjuncts(analyzer);
|
||||
//computeMemLayout(analyzer);
|
||||
computeStats(analyzer);
|
||||
Preconditions.checkState(resultExprLists.size() == getChildren().size());
|
||||
|
||||
// drop resultExprs/constExprs that aren't getting materialized (= where the
|
||||
// corresponding output slot isn't being materialized)
|
||||
List<SlotDescriptor> slots = analyzer.getDescTbl().getTupleDesc(tupleId).getSlots();
|
||||
List<List<Expr>> newResultExprLists = Lists.newArrayList();
|
||||
//
|
||||
// for (int i = 0; i < resultExprLists.size(); ++i) {
|
||||
// List<Expr> exprList = resultExprLists.get(i);
|
||||
// List<Expr> newExprList = Lists.newArrayList();
|
||||
// for (int j = 0; j < exprList.size(); ++j) {
|
||||
// if (slots.get(j).isMaterialized()) newExprList.add(exprList.get(j));
|
||||
// }
|
||||
// newResultExprLists.add(newExprList);
|
||||
// newResultExprLists.add(
|
||||
// Expr.substituteList(newExprList, getChild(i).getOutputSmap(), analyzer, true));
|
||||
// }
|
||||
// resultExprLists = newResultExprLists;
|
||||
//
|
||||
Preconditions.checkState(resultExprLists.size() == getChildren().size());
|
||||
|
||||
List<List<Expr>> newConstExprLists = Lists.newArrayList();
|
||||
for (List<Expr> exprList: constExprLists) {
|
||||
List<Expr> newExprList = Lists.newArrayList();
|
||||
for (int i = 0; i < exprList.size(); ++i) {
|
||||
if (slots.get(i).isMaterialized()) newExprList.add(exprList.get(i));
|
||||
}
|
||||
newConstExprLists.add(newExprList);
|
||||
}
|
||||
constExprLists = newConstExprLists;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void computeStats(Analyzer analyzer) {
|
||||
super.computeStats(analyzer);
|
||||
if (!analyzer.safeIsEnableJoinReorderBasedCost()) {
|
||||
return;
|
||||
}
|
||||
cardinality = constExprLists.size();
|
||||
for (PlanNode child : children) {
|
||||
// ignore missing child cardinality info in the hope it won't matter enough
|
||||
// to change the planning outcome
|
||||
if (child.cardinality > 0) {
|
||||
cardinality += child.cardinality;
|
||||
}
|
||||
}
|
||||
applyConjunctsSelectivity();
|
||||
capCardinalityAtLimit();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("stats Merge: cardinality={}", Long.toString(cardinality));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void computeOldCardinality() {
|
||||
cardinality = constExprLists.size();
|
||||
for (PlanNode child : children) {
|
||||
// ignore missing child cardinality info in the hope it won't matter enough
|
||||
// to change the planning outcome
|
||||
if (child.cardinality > 0) {
|
||||
cardinality += child.cardinality;
|
||||
}
|
||||
}
|
||||
LOG.debug("stats Merge: cardinality={}", Long.toString(cardinality));
|
||||
}
|
||||
|
||||
public List<List<Expr>> getResultExprLists() {
|
||||
return resultExprLists;
|
||||
}
|
||||
|
||||
public List<List<Expr>> getConstExprLists() {
|
||||
return constExprLists;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void toThrift(TPlanNode msg) {
|
||||
List<List<TExpr>> texprLists = Lists.newArrayList();
|
||||
List<List<TExpr>> constTexprLists = Lists.newArrayList();
|
||||
for (List<Expr> exprList : resultExprLists) {
|
||||
if (exprList != null) {
|
||||
texprLists.add(Expr.treesToThrift(exprList));
|
||||
}
|
||||
}
|
||||
for (List<Expr> constTexprList : constExprLists) {
|
||||
constTexprLists.add(Expr.treesToThrift(constTexprList));
|
||||
}
|
||||
msg.merge_node = new TMergeNode(tupleId.asInt(), texprLists, constTexprLists);
|
||||
msg.node_type = TPlanNodeType.MERGE_NODE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
|
||||
if (detailLevel == TExplainLevel.BRIEF) {
|
||||
return "";
|
||||
}
|
||||
StringBuilder output = new StringBuilder();
|
||||
// A MergeNode may have predicates if a union is used inside an inline view,
|
||||
// and the enclosing select stmt has predicates referring to the inline view.
|
||||
if (!conjuncts.isEmpty()) {
|
||||
output.append(prefix + "predicates: " + getExplainString(conjuncts) + "\n");
|
||||
}
|
||||
if (constExprLists.size() > 0) {
|
||||
output.append(prefix + "merging " + constExprLists.size() + " SELECT CONSTANT\n");
|
||||
}
|
||||
return output.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void getMaterializedIds(Analyzer analyzer, List<SlotId> ids) {
|
||||
super.getMaterializedIds(analyzer, ids);
|
||||
|
||||
for (List<Expr> resultExprs : resultExprLists) {
|
||||
Expr.getIds(resultExprs, null, ids);
|
||||
}
|
||||
|
||||
// for now, also mark all of our output slots as materialized
|
||||
// TODO: fix this, it's not really necessary, but it breaks the logic
|
||||
// in MergeNode (c++)
|
||||
for (TupleId tupleId : tupleIds) {
|
||||
TupleDescriptor tupleDesc = analyzer.getTupleDesc(tupleId);
|
||||
for (SlotDescriptor slotDesc : tupleDesc.getSlots()) {
|
||||
ids.add(slotDesc.getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getNumInstances() {
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user