diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index e158b5fede..bc67a9d440 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -48,6 +48,8 @@ #include "exec/analytic_eval_node.h" #include "exec/select_node.h" #include "exec/union_node.h" +#include "exec/intersect_node.h" +#include "exec/except_node.h" #include "exec/repeat_node.h" #include "exec/assert_num_rows_node.h" #include "runtime/exec_env.h" @@ -440,6 +442,14 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN *node = pool->add(new UnionNode(pool, tnode, descs)); return Status::OK(); + case TPlanNodeType::INTERSECT_NODE: + *node = pool->add(new IntersectNode(pool, tnode, descs)); + return Status::OK(); + + // case TPlanNodeType::EXCEPT_NODE: + // *node = pool->add(new ExceptNode(pool, tnode, descs)); + // return Status::OK(); + case TPlanNodeType::BROKER_SCAN_NODE: *node = pool->add(new BrokerScanNode(pool, tnode, descs)); return Status::OK(); diff --git a/be/src/exec/intersect_node.cpp b/be/src/exec/intersect_node.cpp old mode 100644 new mode 100755 index 57321f35da..ca590c1ef7 --- a/be/src/exec/intersect_node.cpp +++ b/be/src/exec/intersect_node.cpp @@ -17,36 +17,20 @@ #include "exec/intersect_node.h" +#include "exec/hash_table.hpp" #include "exprs/expr.h" +#include "runtime/row_batch.h" +#include "runtime/runtime_state.h" namespace doris { -// TODO(yangzhengguo) implememt this class -IntersectNode::IntersectNode(ObjectPool* pool, const TPlanNode& tnode, - const DescriptorTbl& descs) - : ExecNode(pool, tnode, descs), - _tuple_id(tnode.intersect_node.tuple_id), - _tuple_desc(nullptr), - _first_materialized_child_idx(tnode.intersect_node.first_materialized_child_idx), - _child_idx(0), - _child_batch(nullptr), - _child_row_idx(0), - _child_eos(false), - _const_expr_list_idx(0), - _to_close_child_idx(-1) { -} +IntersectNode::IntersectNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) + : ExecNode(pool, tnode, 
descs) {} Status IntersectNode::init(const TPlanNode& tnode, RuntimeState* state) { - // RETURN_IF_ERROR(ExecNode::init(tnode, state)); RETURN_IF_ERROR(ExecNode::init(tnode, state)); DCHECK(tnode.__isset.intersect_node); DCHECK_EQ(_conjunct_ctxs.size(), 0); - // Create const_expr_ctx_lists_ from thrift exprs. - auto& const_texpr_lists = tnode.intersect_node.const_expr_lists; - for (auto& texprs : const_texpr_lists) { - std::vector ctxs; - RETURN_IF_ERROR(Expr::create_expr_trees(_pool, texprs, &ctxs)); - _const_expr_lists.push_back(ctxs); - } + DCHECK_GE(_children.size(), 2); // Create result_expr_ctx_lists_ from thrift exprs. auto& result_texpr_lists = tnode.intersect_node.result_expr_lists; for (auto& texprs : result_texpr_lists) { @@ -56,4 +40,158 @@ Status IntersectNode::init(const TPlanNode& tnode, RuntimeState* state) { } return Status::OK(); } -} \ No newline at end of file + +Status IntersectNode::prepare(RuntimeState* state) { + RETURN_IF_ERROR(ExecNode::prepare(state)); + _build_pool.reset(new MemPool(mem_tracker())); + SCOPED_TIMER(_runtime_profile->total_time_counter()); + for (size_t i = 0; i < _child_expr_lists.size(); ++i) { + RETURN_IF_ERROR(Expr::prepare(_child_expr_lists[i], state, child(i)->row_desc(), + expr_mem_tracker())); + } + _build_tuple_size = child(0)->row_desc().tuple_descriptors().size(); + _build_tuple_row_size = _build_tuple_size * sizeof(Tuple*); + _build_tuple_idx.reserve(_build_tuple_size); + + for (int i = 0; i < _build_tuple_size; ++i) { + TupleDescriptor* build_tuple_desc = child(0)->row_desc().tuple_descriptors()[i]; + _build_tuple_idx.push_back(_row_descriptor.get_tuple_idx(build_tuple_desc->id())); + } + _find_nulls = std::vector(_child_expr_lists.size(), true); + return Status::OK(); +} + +Status IntersectNode::close(RuntimeState* state) { + if (is_closed()) { + return Status::OK(); + } + for (auto& exprs : _child_expr_lists) { + Expr::close(exprs, state); + } + + RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE)); + 
// Must reset _probe_batch in close() to release resources + _probe_batch.reset(NULL); + + if (_memory_used_counter != NULL && _hash_tbl.get() != NULL) { + COUNTER_UPDATE(_memory_used_counter, _build_pool->peak_allocated_bytes()); + COUNTER_UPDATE(_memory_used_counter, _hash_tbl->byte_size()); + } + if (_hash_tbl.get() != NULL) { + _hash_tbl->close(); + } + if (_build_pool.get() != NULL) { + _build_pool->free_all(); + } + + return ExecNode::close(state); +} + +// the actual intersect operation is in this function, +// 1 build a hash table from child(0) +// 2 probe with child(1), then filter the hash table and find the matched item, use them to rebuild a hash table +// repeat [2] this for all the rest child +Status IntersectNode::open(RuntimeState* state) { + RETURN_IF_ERROR(ExecNode::open(state)); + RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN)); + SCOPED_TIMER(_runtime_profile->total_time_counter()); + RETURN_IF_CANCELLED(state); + // open result expr lists. + for (const vector& exprs : _child_expr_lists) { + RETURN_IF_ERROR(Expr::open(exprs, state)); + } + + // initial build hash table + _hash_tbl.reset(new HashTable(_child_expr_lists[0], _child_expr_lists[1], _build_tuple_size, + true, _find_nulls, id(), mem_tracker(), 1024)); + RowBatch build_batch(child(0)->row_desc(), state->batch_size(), mem_tracker()); + RETURN_IF_ERROR(child(0)->open(state)); + bool eos = false; + while (!eos) { + RETURN_IF_CANCELLED(state); + RETURN_IF_ERROR(child(0)->get_next(state, &build_batch, &eos)); + // take ownership of tuple data of build_batch + _build_pool->acquire_data(build_batch.tuple_data_pool(), false); + RETURN_IF_LIMIT_EXCEEDED(state, " Intersect, while constructing the hash table."); + for (int i = 0; i < build_batch.num_rows(); ++i) { + _hash_tbl->insert(build_batch.get_row(i)); + } + VLOG_ROW << "hash table content: " << _hash_tbl->debug_string(true, &child(0)->row_desc()); + build_batch.reset(); + } + // if a table is empty, the result must be empty + if 
(_hash_tbl->size() == 0) { + _hash_tbl_iterator = _hash_tbl->begin(); + return Status::OK(); + } + + for (int i = 1; i < _children.size(); ++i) { + // probe: flag the hash table entry found for each row of child(i) as matched + _probe_batch.reset(new RowBatch(child(i)->row_desc(), state->batch_size(), mem_tracker())); + RETURN_IF_ERROR(child(i)->open(state)); + eos = false; + while (!eos) { + RETURN_IF_CANCELLED(state); + RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(), &eos)); + RETURN_IF_LIMIT_EXCEEDED(state, " Intersect , while probing the hash table."); + for (int j = 0; j < _probe_batch->num_rows(); ++j) { + _hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j)); + if (_hash_tbl_iterator != _hash_tbl->end()) { + _hash_tbl_iterator.set_matched(); + } + } + _probe_batch->reset(); + } + // rebuild the hash table from the matched entries only; it is probed next by child(i + 1), so it must use that child's exprs + if (i != _children.size() - 1) { + std::unique_ptr<HashTable> temp_tbl( + new HashTable(_child_expr_lists[0], _child_expr_lists[i + 1], _build_tuple_size, + true, _find_nulls, id(), mem_tracker(), 1024)); + _hash_tbl_iterator = _hash_tbl->begin(); + while (_hash_tbl_iterator.has_next()) { + if (_hash_tbl_iterator.matched()) { + temp_tbl->insert(_hash_tbl_iterator.get_row()); + } + _hash_tbl_iterator.next(); + } + _hash_tbl.swap(temp_tbl); + temp_tbl->close(); + VLOG_ROW << "hash table content: " + << _hash_tbl->debug_string(true, &child(0)->row_desc()); + // if a table is empty, the result must be empty + if (_hash_tbl->size() == 0) { + break; + } + } + } + _hash_tbl_iterator = _hash_tbl->begin(); + return Status::OK(); +} + +Status IntersectNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eos) { + RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT)); + RETURN_IF_CANCELLED(state); + SCOPED_TIMER(_runtime_profile->total_time_counter()); + *eos = true; + if (reached_limit()) { + return Status::OK(); + } + while (_hash_tbl_iterator.has_next()) { + if (_hash_tbl_iterator.matched()) { + int row_idx = out_batch->add_row(); + TupleRow* out_row = out_batch->get_row(row_idx); + uint8_t* out_ptr = 
reinterpret_cast(out_row); + memcpy(out_ptr, _hash_tbl_iterator.get_row(), _build_tuple_row_size); + out_batch->commit_last_row(); + } + + _hash_tbl_iterator.next(); + *eos = !_hash_tbl_iterator.has_next(); + if (out_batch->is_full() || out_batch->at_resource_limit()) { + return Status::OK(); + } + } + return Status::OK(); +} + +} // namespace doris \ No newline at end of file diff --git a/be/src/exec/intersect_node.h b/be/src/exec/intersect_node.h index ceb35667bb..249be5d4b4 100644 --- a/be/src/exec/intersect_node.h +++ b/be/src/exec/intersect_node.h @@ -15,16 +15,18 @@ // specific language governing permissions and limitations // under the License. - -#ifndef DORIS_BE_SRC_QUERY_EXEC_INTERSECT_NODE_H -#define DORIS_BE_SRC_QUERY_EXEC_INTERSECT_NODE_H +#ifndef DORIS_BE_SRC_QUERY_EXEC_INTERSECT_NODE_H +#define DORIS_BE_SRC_QUERY_EXEC_INTERSECT_NODE_H #include "exec/exec_node.h" -#include "runtime/row_batch.h" -#include "runtime/runtime_state.h" +#include "exec/hash_table.h" namespace doris { +class MemPool; +class RowBatch; +class TupleRow; + // Node that calulate the intersect results of its children by either materializing their // evaluated expressions into row batches or passing through (forwarding) the // batches if the input tuple layout is identical to the output tuple layout @@ -38,61 +40,24 @@ public: virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr); virtual Status prepare(RuntimeState* state); - virtual void codegen(RuntimeState* state); virtual Status open(RuntimeState* state); virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos); - // virtual Status reset(RuntimeState* state); virtual Status close(RuntimeState* state); private: - /// Tuple id resolved in Prepare() to set tuple_desc_; - const int _tuple_id; - - /// Descriptor for tuples this union node constructs. - const TupleDescriptor* _tuple_desc; - - /// Index of the first non-passthrough child; i.e. a child that needs materialization. 
- /// 0 when all children are materialized, '_children.size()' when no children are - /// materialized. - const int _first_materialized_child_idx; - - /// Const exprs materialized by this node. These exprs don't refer to any children. - /// Only materialized by the first fragment instance to avoid duplication. - std::vector> _const_expr_lists; - - /// Exprs materialized by this node. The i-th result expr list refers to the i-th child. + // Exprs materialized by this node. The i-th result expr list refers to the i-th child. std::vector> _child_expr_lists; - ///////////////////////////////////////// - /// BEGIN: Members that must be Reset() - - /// Index of current child. - int _child_idx; - - /// Current row batch of current child. We reset the pointer to a new RowBatch - /// when switching to a different child. - std::unique_ptr _child_batch; - - /// Index of current row in child_row_batch_. - int _child_row_idx; - - typedef void (*IntersectMaterializeBatchFn)(IntersectNode*, RowBatch*, uint8_t**); - /// Vector of pointers to codegen'ed materialize_batch functions. The vector contains one - /// function for each child. The size of the vector should be equal to the number of - /// children. If a child is passthrough, there should be a NULL for that child. If - /// Codegen is disabled, there should be a NULL for every child. - std::vector _codegend_except_materialize_batch_fns; - - /// Saved from the last to GetNext() on the current child. - bool _child_eos; - - /// Index of current const result expr list. - int _const_expr_list_idx; - - /// Index of the child that needs to be closed on the next GetNext() call. Should be set - /// to -1 if no child needs to be closed. 
- int _to_close_child_idx; + std::unique_ptr _hash_tbl; + HashTable::Iterator _hash_tbl_iterator; + std::unique_ptr _probe_batch; + // holds everything referenced in _hash_tbl + std::unique_ptr _build_pool; + std::vector _build_tuple_idx; + int _build_tuple_size; + int _build_tuple_row_size; + std::vector _find_nulls; }; }; // namespace doris diff --git a/fe/src/main/java/org/apache/doris/planner/SetOperationNode.java b/fe/src/main/java/org/apache/doris/planner/SetOperationNode.java index cb92707f18..fb0f6c7394 100644 --- a/fe/src/main/java/org/apache/doris/planner/SetOperationNode.java +++ b/fe/src/main/java/org/apache/doris/planner/SetOperationNode.java @@ -266,7 +266,9 @@ public abstract class SetOperationNode extends PlanNode { Preconditions.checkState(conjuncts.isEmpty()); computeMemLayout(analyzer); computeStats(analyzer); - computePassthrough(analyzer); + if (!(this instanceof ExceptNode)) { + computePassthrough(analyzer); + } // drop resultExprs/constExprs that aren't getting materialized (= where the // corresponding output slot isn't being materialized) diff --git a/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index 1ebe7bbf39..6a6d02cdbd 100644 --- a/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -17,6 +17,12 @@ package org.apache.doris.planner; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + import org.apache.doris.analysis.AggregateInfo; import org.apache.doris.analysis.AnalyticInfo; import org.apache.doris.analysis.Analyzer; @@ -57,13 +63,6 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.FeConstants; import org.apache.doris.common.Reference; import 
org.apache.doris.common.UserException; - -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -449,7 +448,7 @@ public class SingleNodePlanner { && child.getChild(0).getType().isNumericType()) { returnColumns.add(((SlotRef) child.getChild(0)).getDesc().getColumn()); } else { - turnOffReason = "aggExpr.getChild(0)[" + aggExpr.getChild(0).toSql()+ "] is not Numeric CastExpr"; + turnOffReason = "aggExpr.getChild(0)[" + aggExpr.getChild(0).toSql() + "] is not Numeric CastExpr"; aggExprValidate = false; break; } @@ -569,7 +568,7 @@ public class SingleNodePlanner { break; } } else if (aggExpr.getFnName().getFunction().equalsIgnoreCase(FunctionSet.BITMAP_UNION) - || aggExpr.getFnName().getFunction().equalsIgnoreCase(FunctionSet.BITMAP_UNION_COUNT)) { + || aggExpr.getFnName().getFunction().equalsIgnoreCase(FunctionSet.BITMAP_UNION_COUNT)) { if (col.getAggregationType() != AggregateType.BITMAP_UNION) { turnOffReason = "Aggregate Operator not match: BITMAP_UNION <--> " + col.getAggregationType(); @@ -1310,9 +1309,9 @@ public class SingleNodePlanner { // TODO chenhao, remove evaluateOrderBy when SubQuery's default limit is removed. return inlineViewRef.getViewStmt().evaluateOrderBy() ? false : (!inlineViewRef.getViewStmt().hasLimit() - && !inlineViewRef.getViewStmt().hasOffset() - && (!(inlineViewRef.getViewStmt() instanceof SelectStmt) - || !((SelectStmt) inlineViewRef.getViewStmt()).hasAnalyticInfo())); + && !inlineViewRef.getViewStmt().hasOffset() + && (!(inlineViewRef.getViewStmt() instanceof SelectStmt) + || !((SelectStmt) inlineViewRef.getViewStmt()).hasAnalyticInfo())); } /** @@ -1448,7 +1447,7 @@ public class SingleNodePlanner { * Throws if the JoinNode.init() fails. 
*/ private PlanNode createJoinNode(Analyzer analyzer, PlanNode outer, TableRef outerRef, TableRef innerRef, - SelectStmt selectStmt) + SelectStmt selectStmt) throws UserException, AnalysisException { materializeTableResultForCrossJoinOrCountStar(innerRef, analyzer); // the rows coming from the build node only need to have space for the tuple @@ -1471,7 +1470,7 @@ public class SingleNodePlanner { // construct cross join node LOG.debug("Join between {} and {} requires at least one conjunctive equality predicate between the two tables", - outerRef.getAliasAsName() ,innerRef.getAliasAsName()); + outerRef.getAliasAsName(), innerRef.getAliasAsName()); // TODO If there are eq join predicates then we should construct a hash join CrossJoinNode result = new CrossJoinNode(ctx_.getNextNodeId(), outer, inner, innerRef); @@ -1524,16 +1523,17 @@ public class SingleNodePlanner { /** * Create a plan tree corresponding to 'setOperands' for the given SetOperationStmt. * The individual operands' plan trees are attached to a single SetOperationNode. - * If setOperationDistinctPlan is not null, it is expected to contain the plan for the - * distinct portion of the given SetOperationStmt. The setOperationDistinctPlan is then added + * If result is not null, it is expected to contain the plan for the + * distinct portion of the given SetOperationStmt. The result is then added * as a child of the returned SetOperationNode. 
*/ private SetOperationNode createSetOperationPlan( Analyzer analyzer, SetOperationStmt setOperationStmt, List setOperands, - PlanNode setOperationDistinctPlan, long defaultOrderByLimit) + PlanNode result, long defaultOrderByLimit) throws UserException, AnalysisException { SetOperationNode setOpNode; SetOperationStmt.Operation operation = null; + boolean hasConst = false; for (SetOperationStmt.SetOperand setOperand : setOperands) { if (setOperand.getOperation() != null) { if (operation == null) { @@ -1542,6 +1542,17 @@ public class SingleNodePlanner { Preconditions.checkState(operation == setOperand.getOperation(), "can not support mixed set " + "operations at here"); } + if (setOperand.getQueryStmt() instanceof SelectStmt) { + SelectStmt selectStmt = (SelectStmt) setOperand.getQueryStmt(); + if (selectStmt.getTableRefs().isEmpty()) { + hasConst = true; + } + } + } + if (hasConst && operation != SetOperationStmt.Operation.UNION && !(setOperands.get(0).getQueryStmt() instanceof SelectStmt + && ((SelectStmt) setOperands.get(0).getQueryStmt()).getTableRefs().isEmpty())) { + // TODO(yangzhengguo): the BE cores when a constant operand is not the first operand, so reject such plans until the BE is fixed; a first operand that is not a SelectStmt counts as non-constant + throw new AnalysisException("not supported const expr in INTERSECT or EXCEPT"); } switch (operation) { case UNION: @@ -1567,7 +1578,7 @@ public class SingleNodePlanner { QueryStmt queryStmt = op.getQueryStmt(); if (queryStmt instanceof SelectStmt) { SelectStmt selectStmt = (SelectStmt) queryStmt; - if (selectStmt.getTableRefs().isEmpty()) { + if (selectStmt.getTableRefs().isEmpty() && setOpNode instanceof UnionNode) { setOpNode.addConstExprList(selectStmt.getResultExprs()); continue; } @@ -1585,17 +1596,17 @@ public class SingleNodePlanner { // one is the root node, and the other is a distinct node in front, so the setOperationDistinctPlan will // be aggregate node, if this is a mixed operation // e.g. 
: - // a union b -> setOperationDistinctPlan == null - // a union b union all c -> setOperationDistinctPlan == null -> setOperationDistinctPlan == AggregationNode - // a union all b except c -> setOperationDistinctPlan == null -> setOperationDistinctPlan == UnionNode - // a union b except c -> setOperationDistinctPlan == null -> setOperationDistinctPlan == AggregationNode - if (setOperationDistinctPlan != null && setOperationDistinctPlan instanceof SetOperationNode) { - Preconditions.checkState(!setOperationDistinctPlan.getClass().equals(setOpNode.getClass())); - setOpNode.addChild(setOperationDistinctPlan, setOperationStmt.getResultExprs()); - } else if (setOperationDistinctPlan != null) { + // a union b -> result == null + // a union b union all c -> result == null -> result == AggregationNode + // a union all b except c -> result == null -> result == UnionNode + // a union b except c -> result == null -> result == AggregationNode + if (result != null && result instanceof SetOperationNode) { + Preconditions.checkState(!result.getClass().equals(setOpNode.getClass())); + setOpNode.addChild(result, setOperationStmt.getResultExprs()); + } else if (result != null) { Preconditions.checkState(setOperationStmt.hasDistinctOps()); - Preconditions.checkState(setOperationDistinctPlan instanceof AggregationNode); - setOpNode.addChild(setOperationDistinctPlan, + Preconditions.checkState(result instanceof AggregationNode); + setOpNode.addChild(result, setOperationStmt.getDistinctAggInfo().getGroupingExprs()); } setOpNode.init(analyzer); @@ -1652,8 +1663,8 @@ public class SingleNodePlanner { final SelectStmt select = (SelectStmt) queryStmt; op.getAnalyzer().registerConjuncts(opConjuncts, select.getTableRefIds()); } else if (queryStmt instanceof SetOperationStmt) { - final SetOperationStmt union = (SetOperationStmt) queryStmt; - op.getAnalyzer().registerConjuncts(opConjuncts, union.getTupleId().asList()); + final SetOperationStmt subSetOp = (SetOperationStmt) queryStmt; + 
op.getAnalyzer().registerConjuncts(opConjuncts, subSetOp.getTupleId().asList()); } else { if (selectHasTableRef) { Preconditions.checkArgument(false); @@ -1685,8 +1696,14 @@ public class SingleNodePlanner { partialOperands.add(op); } else if (operation != null && op.getOperation() != operation) { if (partialOperands.size() > 0) { - result = createPartialSetOperationPlan(analyzer, setOperationStmt, partialOperands, result, - defaultOrderByLimit); + if (operation == SetOperationStmt.Operation.INTERSECT + || operation == SetOperationStmt.Operation.EXCEPT) { + result = createSetOperationPlan(analyzer, setOperationStmt, partialOperands, result, + defaultOrderByLimit); + } else { + result = createUnionPartialSetOperationPlan(analyzer, setOperationStmt, partialOperands, result, + defaultOrderByLimit); + } partialOperands.clear(); } operation = op.getOperation(); @@ -1696,8 +1713,14 @@ public class SingleNodePlanner { } } if (partialOperands.size() > 0) { - result = createPartialSetOperationPlan(analyzer, setOperationStmt, partialOperands, result, - defaultOrderByLimit); + if (operation == SetOperationStmt.Operation.INTERSECT + || operation == SetOperationStmt.Operation.EXCEPT) { + result = createSetOperationPlan(analyzer, setOperationStmt, partialOperands, result, + defaultOrderByLimit); + } else { + result = createUnionPartialSetOperationPlan(analyzer, setOperationStmt, partialOperands, result, + defaultOrderByLimit); + } } if (setOperationStmt.hasAnalyticExprs() || hasConstantOp) { @@ -1712,16 +1735,15 @@ public class SingleNodePlanner { // the second partial plan d intersect c // notice that when query is a union b the union operation is in right-hand child(b), // while the left-hand child(a)'s operation is null - private PlanNode createPartialSetOperationPlan(Analyzer analyzer, SetOperationStmt setOperationStmt, - List setOperands, - PlanNode setOperationDistinctPlan, long defaultOrderByLimit) + private PlanNode createUnionPartialSetOperationPlan(Analyzer analyzer, 
SetOperationStmt setOperationStmt, + List setOperands, + PlanNode result, long defaultOrderByLimit) throws UserException { boolean hasDistinctOps = false; boolean hasAllOps = false; List allOps = new ArrayList<>(); List distinctOps = new ArrayList<>(); - SetOperationStmt.Operation operation = null; - for (SetOperationStmt.SetOperand op: setOperands) { + for (SetOperationStmt.SetOperand op : setOperands) { if (op.getQualifier() == SetOperationStmt.Qualifier.DISTINCT) { hasDistinctOps = true; distinctOps.add(op); @@ -1730,32 +1752,21 @@ public class SingleNodePlanner { hasAllOps = true; allOps.add(op); } - if (operation == null || operation == op.getOperation()) { - operation = op.getOperation(); - } else { - operation = null; - } } - Preconditions.checkNotNull(operation, "invalid operation."); - if (operation == SetOperationStmt.Operation.INTERSECT || operation == SetOperationStmt.Operation.EXCEPT) { - setOperationDistinctPlan = createSetOperationPlan( - analyzer, setOperationStmt, setOperands, setOperationDistinctPlan, defaultOrderByLimit); - } else { - // create DISTINCT tree - if (hasDistinctOps) { - setOperationDistinctPlan = createSetOperationPlan( - analyzer, setOperationStmt, distinctOps, setOperationDistinctPlan, defaultOrderByLimit); - setOperationDistinctPlan = new AggregationNode(ctx_.getNextNodeId(), setOperationDistinctPlan, - setOperationStmt.getDistinctAggInfo()); - setOperationDistinctPlan.init(analyzer); - } - // create ALL tree - if (hasAllOps) { - setOperationDistinctPlan = createSetOperationPlan(analyzer, setOperationStmt, allOps, - setOperationDistinctPlan, defaultOrderByLimit); - } + // create DISTINCT tree + if (hasDistinctOps) { + result = createSetOperationPlan( + analyzer, setOperationStmt, distinctOps, result, defaultOrderByLimit); + result = new AggregationNode(ctx_.getNextNodeId(), result, + setOperationStmt.getDistinctAggInfo()); + result.init(analyzer); } - return setOperationDistinctPlan; + // create ALL tree + if (hasAllOps) { + 
result = createSetOperationPlan(analyzer, setOperationStmt, allOps, + result, defaultOrderByLimit); + } + return result; } private PlanNode createAssertRowCountNode(PlanNode input, AssertNumRowsElement assertNumRowsElement,