From 3b5a0b60604ac0948889d063887e5c7a9b255f86 Mon Sep 17 00:00:00 2001 From: yangzhg <780531911@qq.com> Date: Thu, 27 Feb 2020 16:03:31 +0800 Subject: [PATCH] [TPCDS] Implement the planner for set operation (#2957) Implement intersect and except planner. This CL does not implement intersect and except node in execution level. --- be/src/exec/CMakeLists.txt | 2 + be/src/exec/except_node.cpp | 59 + be/src/exec/except_node.h | 98 + be/src/exec/intersect_node.cpp | 59 + be/src/exec/intersect_node.h | 100 + .../doris/analysis/SetOperationStmt.java | 34 +- .../doris/planner/DistributedPlanner.java | 45 +- .../org/apache/doris/planner/ExceptNode.java | 41 + .../apache/doris/planner/IntersectNode.java | 41 + .../doris/planner/SetOperationNode.java | 377 +++ .../doris/planner/SingleNodePlanner.java | 139 +- .../org/apache/doris/planner/UnionNode.java | 303 +-- .../java/org/apache/doris/qe/Coordinator.java | 60 + .../apache/doris/analysis/SelectStmtTest.java | 14 +- .../doris/planner/ConstantExpressTest.java | 3 +- .../org/apache/doris/planner/PlannerTest.java | 2078 +++++++++++++++++ gensrc/thrift/PlanNodes.thrift | 33 +- 17 files changed, 3117 insertions(+), 369 deletions(-) create mode 100644 be/src/exec/except_node.cpp create mode 100644 be/src/exec/except_node.h create mode 100644 be/src/exec/intersect_node.cpp create mode 100644 be/src/exec/intersect_node.h create mode 100644 fe/src/main/java/org/apache/doris/planner/ExceptNode.java create mode 100644 fe/src/main/java/org/apache/doris/planner/IntersectNode.java create mode 100644 fe/src/main/java/org/apache/doris/planner/SetOperationNode.java create mode 100644 fe/src/test/java/org/apache/doris/planner/PlannerTest.java diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index 4f6fb8f2b1..62391ffda9 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -71,6 +71,8 @@ set(EXEC_FILES spill_sort_node.cc union_node.cpp union_node_ir.cpp + intersect_node.cpp + except_node.cpp 
repeat_node.cpp schema_scanner.cpp schema_scan_node.cpp diff --git a/be/src/exec/except_node.cpp b/be/src/exec/except_node.cpp new file mode 100644 index 0000000000..889c50a7f2 --- /dev/null +++ b/be/src/exec/except_node.cpp @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/except_node.h" + +#include "exprs/expr.h" + +namespace doris { +// TODO(yangzhengguo) implement this class +ExceptNode::ExceptNode(ObjectPool* pool, const TPlanNode& tnode, + const DescriptorTbl& descs) + : ExecNode(pool, tnode, descs), + _tuple_id(tnode.except_node.tuple_id), + _tuple_desc(nullptr), + _first_materialized_child_idx(tnode.except_node.first_materialized_child_idx), + _child_idx(0), + _child_batch(nullptr), + _child_row_idx(0), + _child_eos(false), + _const_expr_list_idx(0), + _to_close_child_idx(-1) { +} + +Status ExceptNode::init(const TPlanNode& tnode, RuntimeState* state) { + // RETURN_IF_ERROR(ExecNode::init(tnode, state)); + RETURN_IF_ERROR(ExecNode::init(tnode, state)); + DCHECK(tnode.__isset.except_node); + DCHECK_EQ(_conjunct_ctxs.size(), 0); + // Create _const_expr_lists from thrift exprs. 
+ auto& const_texpr_lists = tnode.except_node.const_expr_lists; + for (auto& texprs : const_texpr_lists) { + std::vector<ExprContext*> ctxs; + RETURN_IF_ERROR(Expr::create_expr_trees(_pool, texprs, &ctxs)); + _const_expr_lists.push_back(ctxs); + } + // Create _child_expr_lists from thrift exprs. + auto& result_texpr_lists = tnode.except_node.result_expr_lists; + for (auto& texprs : result_texpr_lists) { + std::vector<ExprContext*> ctxs; + RETURN_IF_ERROR(Expr::create_expr_trees(_pool, texprs, &ctxs)); + _child_expr_lists.push_back(ctxs); + } + return Status::OK(); +} +} \ No newline at end of file diff --git a/be/src/exec/except_node.h b/be/src/exec/except_node.h new file mode 100644 index 0000000000..58ce0641dc --- /dev/null +++ b/be/src/exec/except_node.h @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#ifndef DORIS_BE_SRC_QUERY_EXEC_EXCEPT_NODE_H +#define DORIS_BE_SRC_QUERY_EXEC_EXCEPT_NODE_H + +#include "exec/exec_node.h" +#include "runtime/row_batch.h" +#include "runtime/runtime_state.h" + +namespace doris { + +// Node that calculates the except results of its children by either materializing their +// evaluated expressions into row batches or passing through (forwarding) the +// batches if the input tuple layout is identical to the output tuple layout +// and expressions don't need to be evaluated. The children should be ordered +// such that all passthrough children come before the children that need +// materialization. The except node pulls from its children sequentially, i.e. +// it exhausts one child completely before moving on to the next one. +class ExceptNode : public ExecNode { +public: + ExceptNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + + virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr); + virtual Status prepare(RuntimeState* state); + virtual void codegen(RuntimeState* state); + virtual Status open(RuntimeState* state); + virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos); + // virtual Status reset(RuntimeState* state); + virtual Status close(RuntimeState* state); + +private: + /// Tuple id resolved in Prepare() to set tuple_desc_; + const int _tuple_id; + + /// Descriptor for tuples this except node constructs. + const TupleDescriptor* _tuple_desc; + + /// Index of the first non-passthrough child; i.e. a child that needs materialization. + /// 0 when all children are materialized, '_children.size()' when no children are + /// materialized. + const int _first_materialized_child_idx; + + /// Const exprs materialized by this node. These exprs don't refer to any children. + /// Only materialized by the first fragment instance to avoid duplication. + std::vector<std::vector<ExprContext*>> _const_expr_lists; + + /// Exprs materialized by this node. 
The i-th result expr list refers to the i-th child. + std::vector<std::vector<ExprContext*>> _child_expr_lists; + + ///////////////////////////////////////// + /// BEGIN: Members that must be Reset() + + /// Index of current child. + int _child_idx; + + /// Current row batch of current child. We reset the pointer to a new RowBatch + /// when switching to a different child. + std::unique_ptr<RowBatch> _child_batch; + + /// Index of current row in _child_batch. + int _child_row_idx; + + typedef void (*ExceptMaterializeBatchFn)(ExceptNode*, RowBatch*, uint8_t**); + /// Vector of pointers to codegen'ed materialize_batch functions. The vector contains one + /// function for each child. The size of the vector should be equal to the number of + /// children. If a child is passthrough, there should be a NULL for that child. If + /// Codegen is disabled, there should be a NULL for every child. + std::vector<ExceptMaterializeBatchFn> _codegend_except_materialize_batch_fns; + + /// Saved from the last call to GetNext() on the current child. + bool _child_eos; + + /// Index of current const result expr list. + int _const_expr_list_idx; + + /// Index of the child that needs to be closed on the next GetNext() call. Should be set + /// to -1 if no child needs to be closed. + int _to_close_child_idx; +}; + +}; // namespace doris + +#endif // DORIS_BE_SRC_QUERY_EXEC_EXCEPT_NODE_H \ No newline at end of file diff --git a/be/src/exec/intersect_node.cpp b/be/src/exec/intersect_node.cpp new file mode 100644 index 0000000000..57321f35da --- /dev/null +++ b/be/src/exec/intersect_node.cpp @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/intersect_node.h" + +#include "exprs/expr.h" + +namespace doris { +// TODO(yangzhengguo) implement this class +IntersectNode::IntersectNode(ObjectPool* pool, const TPlanNode& tnode, + const DescriptorTbl& descs) + : ExecNode(pool, tnode, descs), + _tuple_id(tnode.intersect_node.tuple_id), + _tuple_desc(nullptr), + _first_materialized_child_idx(tnode.intersect_node.first_materialized_child_idx), + _child_idx(0), + _child_batch(nullptr), + _child_row_idx(0), + _child_eos(false), + _const_expr_list_idx(0), + _to_close_child_idx(-1) { +} + +Status IntersectNode::init(const TPlanNode& tnode, RuntimeState* state) { + // RETURN_IF_ERROR(ExecNode::init(tnode, state)); + RETURN_IF_ERROR(ExecNode::init(tnode, state)); + DCHECK(tnode.__isset.intersect_node); + DCHECK_EQ(_conjunct_ctxs.size(), 0); + // Create _const_expr_lists from thrift exprs. + auto& const_texpr_lists = tnode.intersect_node.const_expr_lists; + for (auto& texprs : const_texpr_lists) { + std::vector<ExprContext*> ctxs; + RETURN_IF_ERROR(Expr::create_expr_trees(_pool, texprs, &ctxs)); + _const_expr_lists.push_back(ctxs); + } + // Create _child_expr_lists from thrift exprs. 
+ auto& result_texpr_lists = tnode.intersect_node.result_expr_lists; + for (auto& texprs : result_texpr_lists) { + std::vector<ExprContext*> ctxs; + RETURN_IF_ERROR(Expr::create_expr_trees(_pool, texprs, &ctxs)); + _child_expr_lists.push_back(ctxs); + } + return Status::OK(); +} +} \ No newline at end of file diff --git a/be/src/exec/intersect_node.h b/be/src/exec/intersect_node.h new file mode 100644 index 0000000000..ceb35667bb --- /dev/null +++ b/be/src/exec/intersect_node.h @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +#ifndef DORIS_BE_SRC_QUERY_EXEC_INTERSECT_NODE_H +#define DORIS_BE_SRC_QUERY_EXEC_INTERSECT_NODE_H + +#include "exec/exec_node.h" +#include "runtime/row_batch.h" +#include "runtime/runtime_state.h" + +namespace doris { + +// Node that calculates the intersect results of its children by either materializing their +// evaluated expressions into row batches or passing through (forwarding) the +// batches if the input tuple layout is identical to the output tuple layout +// and expressions don't need to be evaluated. The children should be ordered +// such that all passthrough children come before the children that need +// materialization. 
The intersect node pulls from its children sequentially, i.e. +// it exhausts one child completely before moving on to the next one. +class IntersectNode : public ExecNode { +public: + IntersectNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + + virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr); + virtual Status prepare(RuntimeState* state); + virtual void codegen(RuntimeState* state); + virtual Status open(RuntimeState* state); + virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos); + // virtual Status reset(RuntimeState* state); + virtual Status close(RuntimeState* state); + +private: + /// Tuple id resolved in Prepare() to set tuple_desc_; + const int _tuple_id; + + /// Descriptor for tuples this intersect node constructs. + const TupleDescriptor* _tuple_desc; + + /// Index of the first non-passthrough child; i.e. a child that needs materialization. + /// 0 when all children are materialized, '_children.size()' when no children are + /// materialized. + const int _first_materialized_child_idx; + + /// Const exprs materialized by this node. These exprs don't refer to any children. + /// Only materialized by the first fragment instance to avoid duplication. + std::vector<std::vector<ExprContext*>> _const_expr_lists; + + /// Exprs materialized by this node. The i-th result expr list refers to the i-th child. + std::vector<std::vector<ExprContext*>> _child_expr_lists; + + ///////////////////////////////////////// + /// BEGIN: Members that must be Reset() + + /// Index of current child. + int _child_idx; + + /// Current row batch of current child. We reset the pointer to a new RowBatch + /// when switching to a different child. + std::unique_ptr<RowBatch> _child_batch; + + /// Index of current row in _child_batch. 
+ int _child_row_idx; + + typedef void (*IntersectMaterializeBatchFn)(IntersectNode*, RowBatch*, uint8_t**); + /// Vector of pointers to codegen'ed materialize_batch functions. The vector contains one + /// function for each child. 
The size of the vector should be equal to the number of + /// children. If a child is passthrough, there should be a NULL for that child. If + /// Codegen is disabled, there should be a NULL for every child. + std::vector<IntersectMaterializeBatchFn> _codegend_except_materialize_batch_fns; + + /// Saved from the last call to GetNext() on the current child. + bool _child_eos; + + /// Index of current const result expr list. + int _const_expr_list_idx; + + /// Index of the child that needs to be closed on the next GetNext() call. Should be set + /// to -1 if no child needs to be closed. + int _to_close_child_idx; + +}; + +}; // namespace doris + +#endif // DORIS_BE_SRC_QUERY_EXEC_INTERSECT_NODE_H \ No newline at end of file diff --git a/fe/src/main/java/org/apache/doris/analysis/SetOperationStmt.java b/fe/src/main/java/org/apache/doris/analysis/SetOperationStmt.java index 597d78302c..29def10612 100644 --- a/fe/src/main/java/org/apache/doris/analysis/SetOperationStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/SetOperationStmt.java @@ -187,11 +187,6 @@ public class SetOperationStmt extends QueryStmt { super.analyze(analyzer); Preconditions.checkState(operands.size() > 0); - for (SetOperand op : operands) { - if (op.getOperation() != null && op.getOperation() != Operation.UNION) { - throw new AnalysisException("INTERSECT/EXCEPT is not implemented yet."); - } - } // Propagates DISTINCT from left to right, propagateDistinct(); @@ -281,29 +276,28 @@ public class SetOperationStmt extends QueryStmt { allOperands_.add(operands.get(0)); return; } - // find index of first ALL operand - int firstUnionAllIdx = operands.size(); + int firstAllIdx = operands.size(); for (int i = 1; i < operands.size(); ++i) { SetOperand operand = operands.get(i); if (operand.getQualifier() == Qualifier.ALL) { - firstUnionAllIdx = (i == 1 ? 0 : i); + firstAllIdx = (i == 1 ? 
0 : i); break; } } // operands[0] is always implicitly ALL, so operands[1] can't be the // first one - Preconditions.checkState(firstUnionAllIdx != 1); + Preconditions.checkState(firstAllIdx != 1); // unnest DISTINCT operands Preconditions.checkState(distinctOperands_.isEmpty()); - for (int i = 0; i < firstUnionAllIdx; ++i) { + for (int i = 0; i < firstAllIdx; ++i) { unnestOperand(distinctOperands_, Qualifier.DISTINCT, operands.get(i)); } // unnest ALL operands Preconditions.checkState(allOperands_.isEmpty()); - for (int i = firstUnionAllIdx; i < operands.size(); ++i) { + for (int i = firstAllIdx; i < operands.size(); ++i) { unnestOperand(allOperands_, Qualifier.ALL, operands.get(i)); } @@ -330,7 +324,16 @@ public class SetOperationStmt extends QueryStmt { Preconditions.checkState(queryStmt instanceof SetOperationStmt); SetOperationStmt setOperationStmt = (SetOperationStmt) queryStmt; - if (setOperationStmt.hasLimit() || setOperationStmt.hasOffset()) { + boolean mixed = false; + if (operand.getOperation() != null) { + for (int i = 1; i < setOperationStmt.operands.size(); ++i) { + if (operand.getOperation() != setOperationStmt.operands.get(i).getOperation()) { + mixed = true; + break; + } + } + } + if (setOperationStmt.hasLimit() || setOperationStmt.hasOffset() || mixed) { // we must preserve the nested SetOps target.add(operand); } else if (targetQualifier == Qualifier.DISTINCT || !setOperationStmt.hasDistinctOps()) { @@ -643,7 +646,12 @@ public class SetOperationStmt extends QueryStmt { } public void analyze(Analyzer parent) throws AnalysisException, UserException { - if (isAnalyzed()) return; + if (isAnalyzed()) { + return; + } + if (qualifier_ == Qualifier.ALL && operation != Operation.UNION) { + throw new AnalysisException("INTERSECT and EXCEPT does not support ALL qualifier."); + } analyzer = new Analyzer(parent); queryStmt.analyze(analyzer); } diff --git a/fe/src/main/java/org/apache/doris/planner/DistributedPlanner.java 
b/fe/src/main/java/org/apache/doris/planner/DistributedPlanner.java index 2daeffaeb8..c3adbe1f46 100644 --- a/fe/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -206,8 +206,8 @@ public class DistributedPlanner { result = createSelectNodeFragment((SelectNode) root, childFragments); } else if (root instanceof OlapRewriteNode) { result = createOlapRewriteNodeFragment((OlapRewriteNode) root, childFragments); - } else if (root instanceof UnionNode) { - result = createUnionNodeFragment((UnionNode) root, childFragments, fragments); + } else if (root instanceof SetOperationNode) { + result = createSetOperationNodeFragment((SetOperationNode) root, childFragments, fragments); } else if (root instanceof MergeNode) { result = createMergeNodeFragment((MergeNode) root, childFragments, fragments); } else if (root instanceof AggregationNode) { @@ -604,16 +604,16 @@ public class DistributedPlanner { * into the UnionNode. All unpartitioned child fragments are connected to the * UnionNode via a RANDOM exchange, and remain unchanged otherwise. */ - private PlanFragment createUnionNodeFragment( - UnionNode unionNode, ArrayList childFragments, ArrayList fragments) - throws UserException { - Preconditions.checkState(unionNode.getChildren().size() == childFragments.size()); + private PlanFragment createSetOperationNodeFragment( + SetOperationNode setOperationNode, ArrayList childFragments, + ArrayList fragments) throws UserException { + Preconditions.checkState(setOperationNode.getChildren().size() == childFragments.size()); // A UnionNode could have no children or constant selects if all of its operands // were dropped because of constant predicates that evaluated to false. 
- if (unionNode.getChildren().isEmpty()) { + if (setOperationNode.getChildren().isEmpty()) { return new PlanFragment( - ctx_.getNextFragmentId(), unionNode, DataPartition.UNPARTITIONED); + ctx_.getNextFragmentId(), setOperationNode, DataPartition.UNPARTITIONED); } Preconditions.checkState(!childFragments.isEmpty()); @@ -624,28 +624,29 @@ public class DistributedPlanner { // remove all children to avoid them being tagged with the wrong // fragment (in the PlanFragment c'tor; we haven't created ExchangeNodes yet) - unionNode.clearChildren(); + setOperationNode.clearChildren(); // If all child fragments are unpartitioned, return a single unpartitioned fragment // with a UnionNode that merges all child fragments. if (numUnpartitionedChildFragments == childFragments.size()) { - PlanFragment unionFragment = new PlanFragment( - ctx_.getNextFragmentId(), unionNode, DataPartition.UNPARTITIONED); + PlanFragment setOperationFragment = new PlanFragment( + ctx_.getNextFragmentId(), setOperationNode, DataPartition.UNPARTITIONED); // Absorb the plan trees of all childFragments into unionNode // and fix up the fragment tree in the process. for (int i = 0; i < childFragments.size(); ++i) { - unionNode.addChild(childFragments.get(i).getPlanRoot()); - unionFragment.setFragmentInPlanTree(unionNode.getChild(i)); - unionFragment.addChildren(childFragments.get(i).getChildren()); + setOperationNode.addChild(childFragments.get(i).getPlanRoot()); + setOperationFragment.setFragmentInPlanTree(setOperationNode.getChild(i)); + setOperationFragment.addChildren(childFragments.get(i).getChildren()); } - unionNode.init(ctx_.getRootAnalyzer()); + setOperationNode.init(ctx_.getRootAnalyzer()); // All child fragments have been absorbed into unionFragment. fragments.removeAll(childFragments); - return unionFragment; + return setOperationFragment; } // There is at least one partitioned child fragment. 
- PlanFragment unionFragment = new PlanFragment(ctx_.getNextFragmentId(), unionNode, DataPartition.RANDOM); + PlanFragment setOperationFragment = new PlanFragment(ctx_.getNextFragmentId(), setOperationNode, + DataPartition.RANDOM); for (int i = 0; i < childFragments.size(); ++i) { PlanFragment childFragment = childFragments.get(i); /* if (childFragment.isPartitioned() && childFragment.getPlanRoot().getNumInstances() > 1) { @@ -667,13 +668,13 @@ public class DistributedPlanner { // the degree of concurrency. // chenhao16 add // dummy entry for subsequent addition of the ExchangeNode - unionNode.addChild(null); - // Connect the unpartitioned child fragments to unionNode via a random exchange. - connectChildFragment(unionNode, i, unionFragment, childFragment); + setOperationNode.addChild(null); + // Connect the unpartitioned child fragments to SetOperationNode via a random exchange. + connectChildFragment(setOperationNode, i, setOperationFragment, childFragment); childFragment.setOutputPartition(DataPartition.RANDOM); } - unionNode.init(ctx_.getRootAnalyzer()); - return unionFragment; + setOperationNode.init(ctx_.getRootAnalyzer()); + return setOperationFragment; } /** diff --git a/fe/src/main/java/org/apache/doris/planner/ExceptNode.java b/fe/src/main/java/org/apache/doris/planner/ExceptNode.java new file mode 100644 index 0000000000..4ce760e74a --- /dev/null +++ b/fe/src/main/java/org/apache/doris/planner/ExceptNode.java @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner; + +import java.util.List; + +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.TupleId; +import org.apache.doris.thrift.TPlanNode; +import org.apache.doris.thrift.TPlanNodeType; + +public class ExceptNode extends SetOperationNode { + protected ExceptNode(PlanNodeId id, TupleId tupleId) { + super(id, tupleId, "EXCEPT"); + } + + protected ExceptNode(PlanNodeId id, TupleId tupleId, + List<Expr> setOpResultExprs, boolean isInSubplan) { + super(id, tupleId, "EXCEPT", setOpResultExprs, isInSubplan); + } + + @Override + protected void toThrift(TPlanNode msg) { + toThrift(msg, TPlanNodeType.EXCEPT_NODE); + } +} diff --git a/fe/src/main/java/org/apache/doris/planner/IntersectNode.java b/fe/src/main/java/org/apache/doris/planner/IntersectNode.java new file mode 100644 index 0000000000..ffe16e1537 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/planner/IntersectNode.java @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner; + +import java.util.List; + +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.TupleId; +import org.apache.doris.thrift.TPlanNode; +import org.apache.doris.thrift.TPlanNodeType; + +public class IntersectNode extends SetOperationNode { + protected IntersectNode(PlanNodeId id, TupleId tupleId) { + super(id, tupleId, "INTERSECT"); + } + + protected IntersectNode(PlanNodeId id, TupleId tupleId, + List<Expr> setOpResultExprs, boolean isInSubplan) { + super(id, tupleId, "INTERSECT", setOpResultExprs, isInSubplan); + } + + @Override + protected void toThrift(TPlanNode msg) { + toThrift(msg, TPlanNodeType.INTERSECT_NODE); + } +} diff --git a/fe/src/main/java/org/apache/doris/planner/SetOperationNode.java b/fe/src/main/java/org/apache/doris/planner/SetOperationNode.java new file mode 100644 index 0000000000..a6623cd923 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/planner/SetOperationNode.java @@ -0,0 +1,377 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.analysis.TupleId; +import org.apache.doris.analysis.SlotDescriptor; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.thrift.TExceptNode; +import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.thrift.TExpr; +import org.apache.doris.thrift.TIntersectNode; +import org.apache.doris.thrift.TPlanNode; +import org.apache.doris.thrift.TPlanNodeType; +import org.apache.doris.thrift.TUnionNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +/** + * Node that merges the results of its child plans, Normally, this is done by + * materializing the corresponding result exprs into a new tuple. However, if + * a child has an identical tuple layout as the output of the set operation node, and + * the child only has naked SlotRefs as result exprs, then the child is marked + * as 'passthrough'. The rows of passthrough children are directly returned by + * the set operation node, instead of materializing the child's result exprs into new + * tuples. 
+ */ +public abstract class SetOperationNode extends PlanNode { + private final static Logger LOG = LoggerFactory.getLogger(SetOperationNode.class); + + // List of set operation result exprs of the originating SetOperationStmt. Used for + // determining passthrough-compatibility of children. + protected List<Expr> setOpResultExprs_; + + // Expr lists corresponding to the input query stmts. + // The ith resultExprList belongs to the ith child. + // All exprs are resolved to base tables. + protected List<List<Expr>> resultExprLists_ = Lists.newArrayList(); + + // Expr lists that originate from constant select stmts. + // We keep them separate from the regular expr lists to avoid null children. + protected List<List<Expr>> constExprLists_ = Lists.newArrayList(); + + // Materialized result/const exprs corresponding to materialized slots. + // Set in init() and substituted against the corresponding child's output smap. + protected List<List<Expr>> materializedResultExprLists_ = Lists.newArrayList(); + protected List<List<Expr>> materializedConstExprLists_ = Lists.newArrayList(); + + // Indicates if this set operation node is inside a subplan. + protected boolean isInSubplan_; + + // Index of the first non-passthrough child. 
+ protected int firstMaterializedChildIdx_; + + protected final TupleId tupleId_; + + protected SetOperationNode(PlanNodeId id, TupleId tupleId, String planNodeName) { + super(id, tupleId.asList(), planNodeName); + setOpResultExprs_ = Lists.newArrayList(); + tupleId_ = tupleId; + isInSubplan_ = false; + } + + protected SetOperationNode(PlanNodeId id, TupleId tupleId, String planNodeName, + List<Expr> setOpResultExprs, boolean isInSubplan) { + super(id, tupleId.asList(), planNodeName); + setOpResultExprs_ = setOpResultExprs; + tupleId_ = tupleId; + isInSubplan_ = isInSubplan; + } + + public void addConstExprList(List<Expr> exprs) { + constExprLists_.add(exprs); + } + + /** + * Returns true if this set operation node has only constant exprs. 
+ */ + public boolean isConstantUnion() { + return resultExprLists_.isEmpty(); + } + + /** + * Add a child tree plus its corresponding unresolved resultExprs. + */ + public void addChild(PlanNode node, List<Expr> resultExprs) { + super.addChild(node); + resultExprLists_.add(resultExprs); + } + + @Override + public void computeStats(Analyzer analyzer) { + super.computeStats(analyzer); + cardinality = constExprLists_.size(); + for (PlanNode child : children) { + // ignore missing child cardinality info in the hope it won't matter enough + // to change the planning outcome + if (child.cardinality > 0) { + cardinality = addCardinalities(cardinality, child.cardinality); + } + } + // The number of nodes of a set operation node is -1 (invalid) if all the referenced tables + // are inline views (e.g. select 1 FROM (VALUES(1 x, 1 y)) a FULL OUTER JOIN + // (VALUES(1 x, 1 y)) b ON (a.x = b.y)). We need to set the correct value. + if (numNodes == -1) { + numNodes = 1; + } + cardinality = capAtLimit(cardinality); + if (LOG.isTraceEnabled()) { + LOG.trace("stats Union: cardinality=" + Long.toString(cardinality)); + } + } + + protected long capAtLimit(long cardinality) { + if (hasLimit()) { + if (cardinality == -1) { + return limit; + } else { + return Math.min(cardinality, limit); + } + } + return cardinality; + } + + /* + @Override + public void computeResourceProfile(TQueryOptions queryOptions) { + // TODO: add an estimate + resourceProfile_ = new ResourceProfile(0, 0); + } + */ + + /** + * Returns true if rows from the child with 'childTupleIds' and 'childResultExprs' can + * be returned directly by the set operation node (without materialization into a new tuple). + */ + private boolean isChildPassthrough( + Analyzer analyzer, PlanNode childNode, List<Expr> childExprList) { + List<TupleId> childTupleIds = childNode.getTupleIds(); + // Check that if the child outputs a single tuple, then it's not nullable. 
Tuple + // nullability can be considered to be part of the physical row layout. 
+ Preconditions.checkState(childTupleIds.size() != 1 || + !childNode.getNullableTupleIds().contains(childTupleIds.get(0))); + // If the Union node is inside a subplan, passthrough should be disabled to avoid + // performance issues by forcing tiny batches. + // TODO: Remove this as part of IMPALA-4179. + if (isInSubplan_) { + return false; + } + // Pass through is only done for the simple case where the row has a single tuple. One + // of the motivations for this is that the output of a UnionNode is a row with a + // single tuple. + if (childTupleIds.size() != 1) { + return false; + } + Preconditions.checkState(!setOpResultExprs_.isEmpty()); + + TupleDescriptor setOpTupleDescriptor = analyzer.getDescTbl().getTupleDesc(tupleId_); + TupleDescriptor childTupleDescriptor = + analyzer.getDescTbl().getTupleDesc(childTupleIds.get(0)); + + // Verify that the set operation tuple descriptor has one slot for every expression. + Preconditions.checkState(setOpTupleDescriptor.getSlots().size() == setOpResultExprs_.size()); + // Verify that the set operation node has one slot for every child expression. 
+ Preconditions.checkState( + setOpTupleDescriptor.getSlots().size() == childExprList.size()); + + if (setOpResultExprs_.size() != childTupleDescriptor.getSlots().size()) { + return false; + } + if (setOpTupleDescriptor.getByteSize() != childTupleDescriptor.getByteSize()) { + return false; + } + + for (int i = 0; i < setOpResultExprs_.size(); ++i) { + if (!setOpTupleDescriptor.getSlots().get(i).isMaterialized()) + continue; + SlotRef setOpSlotRef = setOpResultExprs_.get(i).unwrapSlotRef(false); + SlotRef childSlotRef = childExprList.get(i).unwrapSlotRef(false); + Preconditions.checkNotNull(setOpSlotRef); + if (childSlotRef == null) { + return false; + } + if (!childSlotRef.getDesc().LayoutEquals(setOpSlotRef.getDesc())) { + return false; + } + } + return true; + } + + /** + * Compute which children are passthrough and reorder them such that the passthrough + * children come before the children that need to be materialized. Also reorder + * 'resultExprLists_'. The children are reordered to simplify the implementation in the + * BE. + */ + void computePassthrough(Analyzer analyzer) { + List> newResultExprLists = Lists.newArrayList(); + ArrayList newChildren = Lists.newArrayList(); + for (int i = 0; i < children.size(); i++) { + if (isChildPassthrough(analyzer, children.get(i), resultExprLists_.get(i))) { + newResultExprLists.add(resultExprLists_.get(i)); + newChildren.add(children.get(i)); + } + } + firstMaterializedChildIdx_ = newChildren.size(); + + for (int i = 0; i < children.size(); i++) { + if (!isChildPassthrough(analyzer, children.get(i), resultExprLists_.get(i))) { + newResultExprLists.add(resultExprLists_.get(i)); + newChildren.add(children.get(i)); + } + } + + Preconditions.checkState(resultExprLists_.size() == newResultExprLists.size()); + resultExprLists_ = newResultExprLists; + Preconditions.checkState(children.size() == newChildren.size()); + children = newChildren; + } + + /** + * Must be called after addChild()/addConstExprList(). 
Computes the materialized + * result/const expr lists based on the materialized slots of this UnionNode's + * produced tuple. The UnionNode doesn't need an smap: like a ScanNode, it + * materializes an original tuple. + * There is no need to call assignConjuncts() because all non-constant conjuncts + * have already been assigned to the set operation operands, and all constant conjuncts have + * been evaluated during registration to set analyzer.hasEmptyResultSet_. + */ + @Override + public void init(Analyzer analyzer) { + Preconditions.checkState(conjuncts.isEmpty()); + computeMemLayout(analyzer); + computeStats(analyzer); + computePassthrough(analyzer); + + // drop resultExprs/constExprs that aren't getting materialized (= where the + // corresponding output slot isn't being materialized) + materializedResultExprLists_.clear(); + Preconditions.checkState(resultExprLists_.size() == children.size()); + List slots = analyzer.getDescTbl().getTupleDesc(tupleId_).getSlots(); + for (int i = 0; i < resultExprLists_.size(); ++i) { + List exprList = resultExprLists_.get(i); + List newExprList = Lists.newArrayList(); + Preconditions.checkState(exprList.size() == slots.size()); + for (int j = 0; j < exprList.size(); ++j) { + if (slots.get(j).isMaterialized()) { + newExprList.add(exprList.get(j)); + } + } + materializedResultExprLists_.add( + Expr.substituteList(newExprList, getChild(i).getOutputSmap(), analyzer, true)); + } + Preconditions.checkState( + materializedResultExprLists_.size() == getChildren().size()); + + materializedConstExprLists_.clear(); + for (List exprList : constExprLists_) { + Preconditions.checkState(exprList.size() == slots.size()); + List newExprList = Lists.newArrayList(); + for (int i = 0; i < exprList.size(); ++i) { + if (slots.get(i).isMaterialized()) { + newExprList.add(exprList.get(i)); + } + } + materializedConstExprLists_.add(newExprList); + } + } + + protected void toThrift(TPlanNode msg, TPlanNodeType nodeType) { + Preconditions.checkState( 
materializedResultExprLists_.size() == children.size()); + List> texprLists = Lists.newArrayList(); + for (List exprList : materializedResultExprLists_) { + texprLists.add(Expr.treesToThrift(exprList)); + } + List> constTexprLists = Lists.newArrayList(); + for (List constTexprList : materializedConstExprLists_) { + constTexprLists.add(Expr.treesToThrift(constTexprList)); + } + Preconditions.checkState(firstMaterializedChildIdx_ <= children.size()); + switch (nodeType) { + case UNION_NODE: + msg.union_node = new TUnionNode( + tupleId_.asInt(), texprLists, constTexprLists, firstMaterializedChildIdx_); + msg.node_type = TPlanNodeType.UNION_NODE; + break; + case INTERSECT_NODE: + msg.intersect_node = new TIntersectNode( + tupleId_.asInt(), texprLists, constTexprLists, firstMaterializedChildIdx_); + msg.node_type = TPlanNodeType.INTERSECT_NODE; + break; + case EXCEPT_NODE: + msg.except_node = new TExceptNode( + tupleId_.asInt(), texprLists, constTexprLists, firstMaterializedChildIdx_); + msg.node_type = TPlanNodeType.EXCEPT_NODE; + break; + default: + LOG.error("Node type: " + nodeType.toString() + " is invalid."); + break; + } + } + + @Override + protected String getNodeExplainString(String prefix, TExplainLevel detailLevel) { + StringBuilder output = new StringBuilder(); + // A SetOperationNode may have predicates if a set operation is used inside an inline view, + // and the enclosing select stmt has predicates referring to the inline view. 
+ if (CollectionUtils.isNotEmpty(conjuncts)) { + output.append(prefix).append("predicates: ").append(getExplainString(conjuncts)).append("\n"); + } + if (CollectionUtils.isNotEmpty(constExprLists_)) { + output.append(prefix).append("constant exprs: ").append("\n"); + for(List exprs : constExprLists_) { + output.append(prefix).append(" ").append(exprs.stream().map(Expr::toSql) + .collect(Collectors.joining(" | "))).append("\n"); + } + } + if (detailLevel == TExplainLevel.VERBOSE) { + if (CollectionUtils.isNotEmpty(materializedResultExprLists_)) { + output.append(prefix).append("child exprs: ").append("\n"); + for(List exprs : materializedResultExprLists_) { + output.append(prefix).append(" ").append(exprs.stream().map(Expr::toSql) + .collect(Collectors.joining(" | "))).append("\n"); + } + } + List passThroughNodeIds = Lists.newArrayList(); + for (int i = 0; i < firstMaterializedChildIdx_; ++i) { + passThroughNodeIds.add(children.get(i).getId().toString()); + } + if (!passThroughNodeIds.isEmpty()) { + String result = prefix + "pass-through-operands: "; + if (passThroughNodeIds.size() == children.size()) { + output.append(result + "all\n"); + } else { + output.append(result).append(Joiner.on(",").join(passThroughNodeIds)).append("\n"); + } + } + } + return output.toString(); + } + + @Override + public int getNumInstances() { + int numInstances = 0; + for (PlanNode child : children) { + numInstances += child.getNumInstances(); + } + numInstances = Math.max(1, numInstances); + return numInstances; + } +} diff --git a/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index f0728ea9b9..eb6a70a95a 100644 --- a/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -1518,18 +1518,43 @@ public class SingleNodePlanner { } /** - * Create a plan tree corresponding to 'unionOperands' for the given unionStmt. 
- * The individual operands' plan trees are attached to a single UnionNode. - * If unionDistinctPlan is not null, it is expected to contain the plan for the - * distinct portion of the given unionStmt. The unionDistinctPlan is then added - * as a child of the returned UnionNode. + * Create a plan tree corresponding to 'setOperands' for the given SetOperationStmt. + * The individual operands' plan trees are attached to a single SetOperationNode. + * If setOperationDistinctPlan is not null, it is expected to contain the plan for the + * distinct portion of the given SetOperationStmt. The setOperationDistinctPlan is then added + * as a child of the returned SetOperationNode. */ - private UnionNode createUnionPlan( + private SetOperationNode createSetOperationPlan( Analyzer analyzer, SetOperationStmt setOperationStmt, List setOperands, - PlanNode unionDistinctPlan, long defaultOrderByLimit) + PlanNode setOperationDistinctPlan, long defaultOrderByLimit) throws UserException, AnalysisException { - UnionNode unionNode = new UnionNode(ctx_.getNextNodeId(), setOperationStmt.getTupleId(), - setOperationStmt.getSetOpsResultExprs(), false); + SetOperationNode setOpNode; + SetOperationStmt.Operation operation = null; + for (SetOperationStmt.SetOperand setOperand : setOperands) { + if (setOperand.getOperation() != null) { + if (operation == null) { + operation = setOperand.getOperation(); + } + Preconditions.checkState(operation == setOperand.getOperation(), "can not support mixed set " + + "operations at here"); + } + } + switch (operation) { + case UNION: + setOpNode = new UnionNode(ctx_.getNextNodeId(), setOperationStmt.getTupleId(), + setOperationStmt.getSetOpsResultExprs(), false); + break; + case INTERSECT: + setOpNode = new IntersectNode(ctx_.getNextNodeId(), setOperationStmt.getTupleId(), + setOperationStmt.getSetOpsResultExprs(), false); + break; + case EXCEPT: + setOpNode = new ExceptNode(ctx_.getNextNodeId(), setOperationStmt.getTupleId(), + 
setOperationStmt.getSetOpsResultExprs(), false); + break; + default: + throw new AnalysisException("not supported set operations: " + operation); + } for (SetOperationStmt.SetOperand op : setOperands) { if (op.getAnalyzer().hasEmptyResultSet()) { unmarkCollectionSlots(op.getQueryStmt()); @@ -1539,7 +1564,7 @@ public class SingleNodePlanner { if (queryStmt instanceof SelectStmt) { SelectStmt selectStmt = (SelectStmt) queryStmt; if (selectStmt.getTableRefs().isEmpty()) { - unionNode.addConstExprList(selectStmt.getResultExprs()); + setOpNode.addConstExprList(selectStmt.getResultExprs()); continue; } } @@ -1550,17 +1575,21 @@ public class SingleNodePlanner { if (opPlan instanceof EmptySetNode) { continue; } - unionNode.addChild(opPlan, op.getQueryStmt().getResultExprs()); + setOpNode.addChild(opPlan, op.getQueryStmt().getResultExprs()); } - - if (unionDistinctPlan != null) { + // setOperationDistinctPlan is the plan built so far, if any. For a single kind of operation it is + // either absent (this node is the first one) or the AggregationNode over the distinct operands; for + // mixed operations it may instead be the SetOperationNode built for a preceding, different operation. + if (setOperationDistinctPlan != null && setOperationDistinctPlan instanceof SetOperationNode) { + Preconditions.checkState(!setOperationDistinctPlan.getClass().equals(setOpNode.getClass())); + setOpNode.addChild(setOperationDistinctPlan, setOperationStmt.getResultExprs()); + } else if (setOperationDistinctPlan != null) { Preconditions.checkState(setOperationStmt.hasDistinctOps()); - Preconditions.checkState(unionDistinctPlan instanceof AggregationNode); - unionNode.addChild(unionDistinctPlan, - setOperationStmt.getDistinctAggInfo().getGroupingExprs()); - } - unionNode.init(analyzer); - return unionNode; + Preconditions.checkState(setOperationDistinctPlan instanceof AggregationNode); + setOpNode.addChild(setOperationDistinctPlan, + setOperationStmt.getDistinctAggInfo().getGroupingExprs()); } + setOpNode.init(analyzer); + return setOpNode; } /** @@ -1632,16 
+1661,33 @@ public class SingleNodePlanner { setOperationStmt.materializeRequiredSlots(analyzer); PlanNode result = null; - // create DISTINCT tree - if (setOperationStmt.hasDistinctOps()) { - result = createUnionPlan( - analyzer, setOperationStmt, setOperationStmt.getDistinctOperands(), null, defaultOrderByLimit); - result = new AggregationNode(ctx_.getNextNodeId(), result, setOperationStmt.getDistinctAggInfo()); - result.init(analyzer); + SetOperationStmt.Operation operation = null; + List partialOperands = new ArrayList<>(); + // create plan for a union b intersect c except b to three fragments + // 3:[2:[1:[a union b] intersect c] except c] + for (SetOperationStmt.SetOperand op : setOperationStmt.getOperands()) { + if (op.getOperation() == null) { + partialOperands.add(op); + } else if (operation == null && op.getOperation() != null) { + operation = op.getOperation(); + partialOperands.add(op); + } else if (operation != null && op.getOperation() == operation) { + partialOperands.add(op); + } else if (operation != null && op.getOperation() != operation) { + if (partialOperands.size() > 0) { + result = createPartialSetOperationPlan(analyzer, setOperationStmt, partialOperands, result, + defaultOrderByLimit); + partialOperands.clear(); + } + operation = op.getOperation(); + partialOperands.add(op); + } else { + throw new AnalysisException("invalid set operation statement."); + } } - // create ALL tree - if (setOperationStmt.hasAllOps()) { - result = createUnionPlan(analyzer, setOperationStmt, setOperationStmt.getAllOperands(), result, defaultOrderByLimit); + if (partialOperands.size() > 0) { + result = createPartialSetOperationPlan(analyzer, setOperationStmt, partialOperands, result, + defaultOrderByLimit); } if (setOperationStmt.hasAnalyticExprs() || hasConstantOp) { @@ -1651,6 +1697,43 @@ public class SingleNodePlanner { return result; } + // create partial plan for example: a union b intersect c + // first partial plan is a union b as a result d, + // the second 
partial plan d intersect c + private PlanNode createPartialSetOperationPlan(Analyzer analyzer, SetOperationStmt setOperationStmt, + List setOperands, + PlanNode setOperationDistinctPlan, long defaultOrderByLimit) + throws UserException { + boolean hasDistinctOps = false; + boolean hasAllOps = false; + List allOps = new ArrayList<>(); + List distinctOps = new ArrayList<>(); + for (SetOperationStmt.SetOperand op: setOperands) { + if (op.getQualifier() == SetOperationStmt.Qualifier.DISTINCT) { + hasDistinctOps = true; + distinctOps.add(op); + } + if (op.getQualifier() == SetOperationStmt.Qualifier.ALL) { + hasAllOps = true; + allOps.add(op); + } + } + // create DISTINCT tree + if (hasDistinctOps) { + setOperationDistinctPlan = createSetOperationPlan( + analyzer, setOperationStmt, distinctOps, setOperationDistinctPlan, defaultOrderByLimit); + setOperationDistinctPlan = new AggregationNode(ctx_.getNextNodeId(), setOperationDistinctPlan, + setOperationStmt.getDistinctAggInfo()); + setOperationDistinctPlan.init(analyzer); + } + // create ALL tree + if (hasAllOps) { + setOperationDistinctPlan = createSetOperationPlan(analyzer, setOperationStmt, allOps, + setOperationDistinctPlan, defaultOrderByLimit); + } + return setOperationDistinctPlan; + } + private PlanNode createAssertRowCountNode(PlanNode input, AssertNumRowsElement assertNumRowsElement, Analyzer analyzer) throws UserException { AssertNumRowsNode root = new AssertNumRowsNode(ctx_.getNextNodeId(), input, assertNumRowsElement); diff --git a/fe/src/main/java/org/apache/doris/planner/UnionNode.java b/fe/src/main/java/org/apache/doris/planner/UnionNode.java index 340a0890b1..b42fc68e31 100644 --- a/fe/src/main/java/org/apache/doris/planner/UnionNode.java +++ b/fe/src/main/java/org/apache/doris/planner/UnionNode.java @@ -17,318 +17,25 @@ package org.apache.doris.planner; -import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; -import com.google.common.base.Joiner; -import 
org.apache.commons.collections.CollectionUtils; -import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.Expr; -import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.analysis.TupleId; -import org.apache.doris.analysis.SlotDescriptor; -import org.apache.doris.analysis.SlotRef; -import org.apache.doris.thrift.TExplainLevel; -import org.apache.doris.thrift.TExpr; import org.apache.doris.thrift.TPlanNode; import org.apache.doris.thrift.TPlanNodeType; -import org.apache.doris.thrift.TUnionNode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; - -/** - * Node that merges the results of its child plans, Normally, this is done by - * materializing the corresponding result exprs into a new tuple. However, if - * a child has an identical tuple layout as the output of the union node, and - * the child only has naked SlotRefs as result exprs, then the child is marked - * as 'passthrough'. The rows of passthrough children are directly returned by - * the union node, instead of materializing the child's result exprs into new - * tuples. - */ -public class UnionNode extends PlanNode { - private final static Logger LOG = LoggerFactory.getLogger(UnionNode.class); - - // List of union result exprs of the originating UnionStmt. Used for - // determining passthrough-compatibility of children. - protected List unionResultExprs_; - - // Expr lists corresponding to the input query stmts. - // The ith resultExprList belongs to the ith child. - // All exprs are resolved to base tables. - protected List> resultExprLists_ = Lists.newArrayList(); - - // Expr lists that originate from constant select stmts. - // We keep them separate from the regular expr lists to avoid null children. - protected List> constExprLists_ = Lists.newArrayList(); - - // Materialized result/const exprs corresponding to materialized slots. 
- // Set in init() and substituted against the corresponding child's output smap. - protected List> materializedResultExprLists_ = Lists.newArrayList(); - protected List> materializedConstExprLists_ = Lists.newArrayList(); - - // Indicates if this UnionNode is inside a subplan. - protected boolean isInSubplan_; - - // Index of the first non-passthrough child. - protected int firstMaterializedChildIdx_; - - protected final TupleId tupleId_; +public class UnionNode extends SetOperationNode { protected UnionNode(PlanNodeId id, TupleId tupleId) { - super(id, tupleId.asList(), "UNION"); - unionResultExprs_ = Lists.newArrayList(); - tupleId_ = tupleId; - isInSubplan_ = false; + super(id, tupleId, "UNION"); } protected UnionNode(PlanNodeId id, TupleId tupleId, - List unionResultExprs, boolean isInSubplan) { - super(id, tupleId.asList(), "UNION"); - unionResultExprs_ = unionResultExprs; - tupleId_ = tupleId; - isInSubplan_ = isInSubplan; - } - - public void addConstExprList(List exprs) { constExprLists_.add(exprs); } - - /** - * Returns true if this UnionNode has only constant exprs. - */ - public boolean isConstantUnion() { return resultExprLists_.isEmpty(); } - - /** - * Add a child tree plus its corresponding unresolved resultExprs. - */ - public void addChild(PlanNode node, List resultExprs) { - super.addChild(node); - resultExprLists_.add(resultExprs); - } - - @Override - public void computeStats(Analyzer analyzer) { - super.computeStats(analyzer); - cardinality = constExprLists_.size(); - for (PlanNode child: children) { - // ignore missing child cardinality info in the hope it won't matter enough - // to change the planning outcome - if (child.cardinality > 0) { - cardinality = addCardinalities(cardinality, child.cardinality); - } - } - // The number of nodes of a union node is -1 (invalid) if all the referenced tables - // are inline views (e.g. select 1 FROM (VALUES(1 x, 1 y)) a FULL OUTER JOIN - // (VALUES(1 x, 1 y)) b ON (a.x = b.y)). 
We need to set the correct value. - if (numNodes == -1) numNodes = 1; - cardinality = capAtLimit(cardinality); - if (LOG.isTraceEnabled()) { - LOG.trace("stats Union: cardinality=" + Long.toString(cardinality)); - } - } - - protected long capAtLimit(long cardinality) { - if (hasLimit()) { - if (cardinality == -1) { - return limit; - } else { - return Math.min(cardinality, limit); - } - } - return cardinality; - } - - /* - @Override - public void computeResourceProfile(TQueryOptions queryOptions) { - // TODO: add an estimate - resourceProfile_ = new ResourceProfile(0, 0); - } - */ - - /** - * Returns true if rows from the child with 'childTupleIds' and 'childResultExprs' can - * be returned directly by the union node (without materialization into a new tuple). - */ - private boolean isChildPassthrough( - Analyzer analyzer, PlanNode childNode, List childExprList) { - List childTupleIds = childNode.getTupleIds(); - // Check that if the child outputs a single tuple, then it's not nullable. Tuple - // nullability can be considered to be part of the physical row layout. - Preconditions.checkState(childTupleIds.size() != 1 || - !childNode.getNullableTupleIds().contains(childTupleIds.get(0))); - // If the Union node is inside a subplan, passthrough should be disabled to avoid - // performance issues by forcing tiny batches. - // TODO: Remove this as part of IMPALA-4179. - if (isInSubplan_) return false; - // Pass through is only done for the simple case where the row has a single tuple. One - // of the motivations for this is that the output of a UnionNode is a row with a - // single tuple. - if (childTupleIds.size() != 1) return false; - Preconditions.checkState(!unionResultExprs_.isEmpty()); - - TupleDescriptor unionTupleDescriptor = analyzer.getDescTbl().getTupleDesc(tupleId_); - TupleDescriptor childTupleDescriptor = - analyzer.getDescTbl().getTupleDesc(childTupleIds.get(0)); - - // Verify that the union tuple descriptor has one slot for every expression. 
- Preconditions.checkState( - unionTupleDescriptor.getSlots().size() == unionResultExprs_.size()); - // Verify that the union node has one slot for every child expression. - Preconditions.checkState( - unionTupleDescriptor.getSlots().size() == childExprList.size()); - - if (unionResultExprs_.size() != childTupleDescriptor.getSlots().size()) return false; - if (unionTupleDescriptor.getByteSize() != childTupleDescriptor.getByteSize()) { - return false; - } - - for (int i = 0; i < unionResultExprs_.size(); ++i) { - if (!unionTupleDescriptor.getSlots().get(i).isMaterialized()) continue; - SlotRef unionSlotRef = unionResultExprs_.get(i).unwrapSlotRef(false); - SlotRef childSlotRef = childExprList.get(i).unwrapSlotRef(false); - Preconditions.checkNotNull(unionSlotRef); - if (childSlotRef == null) return false; - if (!childSlotRef.getDesc().LayoutEquals(unionSlotRef.getDesc())) return false; - } - return true; - } - - /** - * Compute which children are passthrough and reorder them such that the passthrough - * children come before the children that need to be materialized. Also reorder - * 'resultExprLists_'. The children are reordered to simplify the implementation in the - * BE. 
- */ - void computePassthrough(Analyzer analyzer) { - List> newResultExprLists = Lists.newArrayList(); - ArrayList newChildren = Lists.newArrayList(); - for (int i = 0; i < children.size(); i++) { - if (isChildPassthrough(analyzer, children.get(i), resultExprLists_.get(i))) { - newResultExprLists.add(resultExprLists_.get(i)); - newChildren.add(children.get(i)); - } - } - firstMaterializedChildIdx_ = newChildren.size(); - - for (int i = 0; i < children.size(); i++) { - if (!isChildPassthrough(analyzer, children.get(i), resultExprLists_.get(i))) { - newResultExprLists.add(resultExprLists_.get(i)); - newChildren.add(children.get(i)); - } - } - - Preconditions.checkState(resultExprLists_.size() == newResultExprLists.size()); - resultExprLists_ = newResultExprLists; - Preconditions.checkState(children.size() == newChildren.size()); - children = newChildren; - } - - /** - * Must be called after addChild()/addConstExprList(). Computes the materialized - * result/const expr lists based on the materialized slots of this UnionNode's - * produced tuple. The UnionNode doesn't need an smap: like a ScanNode, it - * materializes an original tuple. - * There is no need to call assignConjuncts() because all non-constant conjuncts - * have already been assigned to the union operands, and all constant conjuncts have - * been evaluated during registration to set analyzer.hasEmptyResultSet_. 
- */ - @Override - public void init(Analyzer analyzer) { - Preconditions.checkState(conjuncts.isEmpty()); - computeMemLayout(analyzer); - computeStats(analyzer); - computePassthrough(analyzer); - - // drop resultExprs/constExprs that aren't getting materialized (= where the - // corresponding output slot isn't being materialized) - materializedResultExprLists_.clear(); - Preconditions.checkState(resultExprLists_.size() == children.size()); - List slots = analyzer.getDescTbl().getTupleDesc(tupleId_).getSlots(); - for (int i = 0; i < resultExprLists_.size(); ++i) { - List exprList = resultExprLists_.get(i); - List newExprList = Lists.newArrayList(); - Preconditions.checkState(exprList.size() == slots.size()); - for (int j = 0; j < exprList.size(); ++j) { - if (slots.get(j).isMaterialized()) newExprList.add(exprList.get(j)); - } - materializedResultExprLists_.add( - Expr.substituteList(newExprList, getChild(i).getOutputSmap(), analyzer, true)); - } - Preconditions.checkState( - materializedResultExprLists_.size() == getChildren().size()); - - materializedConstExprLists_.clear(); - for (List exprList: constExprLists_) { - Preconditions.checkState(exprList.size() == slots.size()); - List newExprList = Lists.newArrayList(); - for (int i = 0; i < exprList.size(); ++i) { - if (slots.get(i).isMaterialized()) newExprList.add(exprList.get(i)); - } - materializedConstExprLists_.add(newExprList); - } + List setOpResultExprs, boolean isInSubplan) { + super(id, tupleId, "UNION", setOpResultExprs, isInSubplan); } @Override protected void toThrift(TPlanNode msg) { - Preconditions.checkState(materializedResultExprLists_.size() == children.size()); - List> texprLists = Lists.newArrayList(); - for (List exprList: materializedResultExprLists_) { - texprLists.add(Expr.treesToThrift(exprList)); - } - List> constTexprLists = Lists.newArrayList(); - for (List constTexprList: materializedConstExprLists_) { - constTexprLists.add(Expr.treesToThrift(constTexprList)); - } - 
Preconditions.checkState(firstMaterializedChildIdx_ <= children.size()); - msg.union_node = new TUnionNode( - tupleId_.asInt(), texprLists, constTexprLists, firstMaterializedChildIdx_); - msg.node_type = TPlanNodeType.UNION_NODE; - } - - - @Override - protected String getNodeExplainString(String prefix, TExplainLevel detailLevel) { - StringBuilder output = new StringBuilder(); - // A UnionNode may have predicates if a union is used inside an inline view, - // and the enclosing select stmt has predicates referring to the inline view. - if (CollectionUtils.isNotEmpty(conjuncts)) { - output.append(prefix).append("predicates: ").append(getExplainString(conjuncts)).append("\n"); - } - if (CollectionUtils.isNotEmpty(constExprLists_)) { - output.append(prefix).append("constant exprs: "); - for(List exprs : constExprLists_) { - output.append(exprs.stream().map(Expr::toSql) - .collect(Collectors.joining(" | "))); - output.append("\n"); - } - } - if (detailLevel == TExplainLevel.VERBOSE) { - List passThroughNodeIds = Lists.newArrayList(); - for (int i = 0; i < firstMaterializedChildIdx_; ++i) { - passThroughNodeIds.add(children.get(i).getId().toString()); - } - if (!passThroughNodeIds.isEmpty()) { - String result = prefix + "pass-through-operands: "; - if (passThroughNodeIds.size() == children.size()) { - output.append(result).append("all\n"); - } else { - output.append(result).append(Joiner.on(",").join(passThroughNodeIds)).append("\n"); - } - } - } - return output.toString(); - } - - @Override - public int getNumInstances() { - int numInstances = 0; - for (PlanNode child : children) { - numInstances += child.getNumInstances(); - } - numInstances = Math.max(1, numInstances); - return numInstances; + toThrift(msg, TPlanNodeType.UNION_NODE); } } diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java index d798278f0d..20c4477138 100644 --- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java +++ 
b/fe/src/main/java/org/apache/doris/qe/Coordinator.java @@ -35,8 +35,10 @@ import org.apache.doris.load.loadv2.LoadJob; import org.apache.doris.planner.DataPartition; import org.apache.doris.planner.DataSink; import org.apache.doris.planner.DataStreamSink; +import org.apache.doris.planner.ExceptNode; import org.apache.doris.planner.ExchangeNode; import org.apache.doris.planner.HashJoinNode; +import org.apache.doris.planner.IntersectNode; import org.apache.doris.planner.OlapScanNode; import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.PlanFragmentId; @@ -45,6 +47,7 @@ import org.apache.doris.planner.PlanNodeId; import org.apache.doris.planner.Planner; import org.apache.doris.planner.ResultSink; import org.apache.doris.planner.ScanNode; +import org.apache.doris.planner.SetOperationNode; import org.apache.doris.planner.UnionNode; import org.apache.doris.proto.PExecPlanFragmentResult; import org.apache.doris.proto.PPlanFragmentCancelReason; @@ -809,6 +812,63 @@ public class Coordinator { return false; } + // estimate if this fragment contains IntersectNode + private boolean containsIntersectNode(PlanNode node) { + if (node instanceof IntersectNode) { + return true; + } + + for (PlanNode child : node.getChildren()) { + if (child instanceof ExchangeNode) { + // Ignore other fragment's node + continue; + } else if (child instanceof IntersectNode) { + return true; + } else { + return containsIntersectNode(child); + } + } + return false; + } + + // estimate if this fragment contains ExceptNode + private boolean containsExceptNode(PlanNode node) { + if (node instanceof ExceptNode) { + return true; + } + + for (PlanNode child : node.getChildren()) { + if (child instanceof ExchangeNode) { + // Ignore other fragment's node + continue; + } else if (child instanceof ExceptNode) { + return true; + } else { + return containsExceptNode(child); + } + } + return false; + } + + // estimate if this fragment contains SetOperationNode + private boolean 
containsSetOperationNode(PlanNode node) { + if (node instanceof SetOperationNode) { + return true; + } + + for (PlanNode child : node.getChildren()) { + if (child instanceof ExchangeNode) { + // Ignore other fragment's node + continue; + } else if (child instanceof SetOperationNode) { + return true; + } else { + return containsSetOperationNode(child); + } + } + return false; + } + // For each fragment in fragments, computes hosts on which to run the instances // and stores result in fragmentExecParams.hosts. private void computeFragmentHosts() throws Exception { diff --git a/fe/src/test/java/org/apache/doris/analysis/SelectStmtTest.java b/fe/src/test/java/org/apache/doris/analysis/SelectStmtTest.java index 0e792c1c51..9491d4f81c 100644 --- a/fe/src/test/java/org/apache/doris/analysis/SelectStmtTest.java +++ b/fe/src/test/java/org/apache/doris/analysis/SelectStmtTest.java @@ -1,11 +1,15 @@ package org.apache.doris.analysis; +import java.io.File; import java.util.UUID; import org.apache.doris.catalog.Catalog; import org.apache.doris.common.AnalysisException; import org.apache.doris.qe.ConnectContext; import org.apache.doris.utframe.UtFrameUtils; + +import org.apache.commons.io.FileUtils; +import org.junit.AfterClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -16,6 +20,11 @@ public class SelectStmtTest { @Rule public ExpectedException expectedEx = ExpectedException.none(); + @AfterClass + public static void afterClass() throws Exception { + FileUtils.deleteDirectory(new File(runningDir)); + } + @Test public void testGroupingSets() throws Exception { ConnectContext ctx = UtFrameUtils.createDefaultCtx(); @@ -41,11 +50,6 @@ public class SelectStmtTest { String selectStmtStr4 = "select k1,k4+k4,MAX(k4+k4) from db1.tbl1 GROUP BY GROUPING sets ((k1,k4),(k1),(k4),()" + ");"; UtFrameUtils.parseAndAnalyzeStmt(selectStmtStr4, ctx); - - - - - } diff --git a/fe/src/test/java/org/apache/doris/planner/ConstantExpressTest.java 
b/fe/src/test/java/org/apache/doris/planner/ConstantExpressTest.java index 38b20c78ad..c5365b822d 100644 --- a/fe/src/test/java/org/apache/doris/planner/ConstantExpressTest.java +++ b/fe/src/test/java/org/apache/doris/planner/ConstantExpressTest.java @@ -31,6 +31,7 @@ public class ConstantExpressTest { private static String runningDir = "fe/mocked/ConstantExpressTest/" + UUID.randomUUID().toString() + "/"; private static ConnectContext connectContext; + @BeforeClass public static void beforeClass() throws Exception { UtFrameUtils.startFEServer(runningDir); @@ -40,7 +41,7 @@ public class ConstantExpressTest { private static void testConstantExpressResult(String sql, String result) throws Exception { String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "explain " + sql); System.out.println(explainString); - Assert.assertTrue(explainString.contains("constant exprs: "+result)); + Assert.assertTrue(explainString.contains("constant exprs: \n " + result)); } @Test diff --git a/fe/src/test/java/org/apache/doris/planner/PlannerTest.java b/fe/src/test/java/org/apache/doris/planner/PlannerTest.java new file mode 100644 index 0000000000..663133c61e --- /dev/null +++ b/fe/src/test/java/org/apache/doris/planner/PlannerTest.java @@ -0,0 +1,2078 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner; + +import org.apache.doris.analysis.CreateDbStmt; +import org.apache.doris.analysis.CreateTableStmt; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.utframe.UtFrameUtils; + +import org.apache.commons.io.FileUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.util.List; +import java.util.UUID; + +public class PlannerTest { + private static String runningDir = "fe/mocked/DemoTest/" + UUID.randomUUID().toString() + "/"; + + @After + public void tearDown() throws Exception { + FileUtils.deleteDirectory(new File(runningDir)); + } + + @Test + public void testSetOperation() throws Exception { + // union + + ConnectContext ctx = UtFrameUtils.createDefaultCtx(); + UtFrameUtils.createMinDorisCluster(runningDir); + String createDbStmtStr = "create database db1;"; + CreateDbStmt createDbStmt = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, ctx); + Catalog.getCurrentCatalog().createDb(createDbStmt); + // 3. 
create table tbl1 + String createTblStmtStr = "create table db1.tbl1(k1 varchar(32), k2 varchar(32), k3 varchar(32), k4 int) " + + "AGGREGATE KEY(k1, k2,k3,k4) distributed by hash(k1) buckets 3 properties('replication_num' = '1');"; + CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createTblStmtStr, ctx); + Catalog.getCurrentCatalog().createTable(createTableStmt); + String sql1 = "explain select * from\n" + + " (select k1, k2 from db1.tbl1\n" + + " union all\n" + + " select k1, k2 from db1.tbl1) a\n" + + " inner join\n" + + " db1.tbl1 b\n" + + " on (a.k1 = b.k1)\n" + + "where b.k1 = 'a'"; + String plan1 = "PLAN FRAGMENT 0\n" + + " OUTPUT EXPRS: | | `b`.`k1` | `b`.`k2` | `b`.`k3` | `b`.`k4`\n" + + " PARTITION: RANDOM\n" + + "\n" + + " RESULT SINK\n" + + "\n" + + " 4:HASH JOIN\n" + + " | join op: INNER JOIN (BROADCAST)\n" + + " | hash predicates:\n" + + " | colocate: false, reason: Node type not match\n" + + "( = `b`.`k1`)\n" + + " | tuple ids: 2 4 \n" + + " | \n" + + " |----7:EXCHANGE\n" + + " | tuple ids: 4 \n" + + " | \n" + + " 0:UNION\n" + + " | child exprs: \n" + + " | `k1` | `k2`\n" + + " | `k1` | `k2`\n" + + " | pass-through-operands: all\n" + + " | tuple ids: 2 \n" + + " | \n" + + " |----6:EXCHANGE\n" + + " | tuple ids: 1 \n" + + " | \n" + + " 5:EXCHANGE\n" + + " tuple ids: 0 \n" + + "\n" + + "PLAN FRAGMENT 1\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 07\n" + + " UNPARTITIONED\n" + + "\n" + + " 3:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. 
Reason: null\n" + + " PREDICATES: `b`.`k1` = 'a'\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 4 \n" + + "\n" + + "PLAN FRAGMENT 2\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 06\n" + + " RANDOM\n" + + "\n" + + " 2:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. Reason: No AggregateInfo\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 1 \n" + + "\n" + + "PLAN FRAGMENT 3\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 05\n" + + " RANDOM\n" + + "\n" + + " 1:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. Reason: No AggregateInfo\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 0 \n"; + StmtExecutor stmtExecutor1 = new StmtExecutor(ctx, sql1); + stmtExecutor1.execute(); + Planner planner1 = stmtExecutor1.planner(); + List fragments1 = planner1.getFragments(); + Assert.assertEquals(plan1, planner1.getExplainString(fragments1, TExplainLevel.VERBOSE)); + String sql2 = "explain select * from db1.tbl1 where k1='a' and k4=1\n" + + "union distinct\n" + + " (select * from db1.tbl1 where k1='b' and k4=2\n" + + " union all\n" + + " select * from db1.tbl1 where k1='b' and k4=2)\n" + + "union distinct\n" + + " (select * from db1.tbl1 where k1='b' and k4=2\n" + + " union all\n" + + " (select * from db1.tbl1 where k1='b' and k4=3)\n" + + " order by 3 limit 3)\n" + + "union all\n" + + " (select * from db1.tbl1 where k1='b' and k4=3\n" + + " union all\n" + + " select * from db1.tbl1 where k1='b' and k4=4)\n" + + "union all\n" + + " (select * 
from db1.tbl1 where k1='b' and k4=3\n" + + " union all\n" + + " (select * from db1.tbl1 where k1='b' and k4=5)\n" + + " order by 3 limit 3)"; + + String plan2 = "PLAN FRAGMENT 0\n" + + " OUTPUT EXPRS: | | | \n" + + " PARTITION: UNPARTITIONED\n" + + "\n" + + " RESULT SINK\n" + + "\n" + + " 30:EXCHANGE\n" + + " tuple ids: 15 \n" + + "\n" + + "PLAN FRAGMENT 1\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 30\n" + + " UNPARTITIONED\n" + + "\n" + + " 9:UNION\n" + + " | child exprs: \n" + + " | | | | \n" + + " | `default_cluster:db1.tbl1`.`k1` | `default_cluster:db1.tbl1`.`k2` | `default_cluster:db1.tbl1`.`k3` | `default_cluster:db1.tbl1`.`k4`\n" + + " | `default_cluster:db1.tbl1`.`k1` | `default_cluster:db1.tbl1`.`k2` | `default_cluster:db1.tbl1`.`k3` | `default_cluster:db1.tbl1`.`k4`\n" + + " | | | | \n" + + " | pass-through-operands: 26,27,28\n" + + " | tuple ids: 15 \n" + + " | \n" + + " |----27:EXCHANGE\n" + + " | tuple ids: 8 \n" + + " | \n" + + " |----28:EXCHANGE\n" + + " | tuple ids: 9 \n" + + " | \n" + + " |----29:EXCHANGE\n" + + " | limit: 3\n" + + " | tuple ids: 14 \n" + + " | \n" + + " 26:EXCHANGE\n" + + " tuple ids: 15 \n" + + "\n" + + "PLAN FRAGMENT 2\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: UNPARTITIONED\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 29\n" + + " RANDOM\n" + + "\n" + + " 25:MERGING-EXCHANGE\n" + + " limit: 3\n" + + " tuple ids: 14 \n" + + "\n" + + "PLAN FRAGMENT 3\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 25\n" + + " UNPARTITIONED\n" + + "\n" + + " 15:TOP-N\n" + + " | order by: ASC\n" + + " | offset: 0\n" + + " | limit: 3\n" + + " | tuple ids: 14 \n" + + " | \n" + + " 12:UNION\n" + + " | child exprs: \n" + + " | `default_cluster:db1.tbl1`.`k1` | `default_cluster:db1.tbl1`.`k2` | `default_cluster:db1.tbl1`.`k3` | `default_cluster:db1.tbl1`.`k4`\n" + + " | `default_cluster:db1.tbl1`.`k1` | 
`default_cluster:db1.tbl1`.`k2` | `default_cluster:db1.tbl1`.`k3` | `default_cluster:db1.tbl1`.`k4`\n" + + " | pass-through-operands: all\n" + + " | tuple ids: 13 \n" + + " | \n" + + " |----24:EXCHANGE\n" + + " | tuple ids: 12 \n" + + " | \n" + + " 23:EXCHANGE\n" + + " tuple ids: 11 \n" + + "\n" + + "PLAN FRAGMENT 4\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 24\n" + + " RANDOM\n" + + "\n" + + " 14:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. Reason: No AggregateInfo\n" + + " PREDICATES: `k1` = 'b', `k4` = 5\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 12 \n" + + "\n" + + "PLAN FRAGMENT 5\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 23\n" + + " RANDOM\n" + + "\n" + + " 13:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. Reason: No AggregateInfo\n" + + " PREDICATES: `k1` = 'b', `k4` = 3\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 11 \n" + + "\n" + + "PLAN FRAGMENT 6\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 28\n" + + " RANDOM\n" + + "\n" + + " 11:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. 
Reason: No AggregateInfo\n" + + " PREDICATES: `k1` = 'b', `k4` = 4\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 9 \n" + + "\n" + + "PLAN FRAGMENT 7\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 27\n" + + " RANDOM\n" + + "\n" + + " 10:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. Reason: No AggregateInfo\n" + + " PREDICATES: `k1` = 'b', `k4` = 3\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 8 \n" + + "\n" + + "PLAN FRAGMENT 8\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 26\n" + + " RANDOM\n" + + "\n" + + " 8:AGGREGATE (update finalize)\n" + + " | group by: , , , \n" + + " | tuple ids: 15 \n" + + " | \n" + + " 0:UNION\n" + + " | child exprs: \n" + + " | `default_cluster:db1.tbl1`.`k1` | `default_cluster:db1.tbl1`.`k2` | `default_cluster:db1.tbl1`.`k3` | `default_cluster:db1.tbl1`.`k4`\n" + + " | `default_cluster:db1.tbl1`.`k1` | `default_cluster:db1.tbl1`.`k2` | `default_cluster:db1.tbl1`.`k3` | `default_cluster:db1.tbl1`.`k4`\n" + + " | `default_cluster:db1.tbl1`.`k1` | `default_cluster:db1.tbl1`.`k2` | `default_cluster:db1.tbl1`.`k3` | `default_cluster:db1.tbl1`.`k4`\n" + + " | | | | \n" + + " | pass-through-operands: 19,20,21\n" + + " | tuple ids: 15 \n" + + " | \n" + + " |----20:EXCHANGE\n" + + " | tuple ids: 1 \n" + + " | \n" + + " |----21:EXCHANGE\n" + + " | tuple ids: 2 \n" + + " | \n" + + " |----22:EXCHANGE\n" + + " | limit: 3\n" + + " | tuple ids: 7 \n" + + " | \n" + + " 19:EXCHANGE\n" + + " tuple ids: 0 \n" + + "\n" + + "PLAN FRAGMENT 9\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: UNPARTITIONED\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 22\n" 
+ + " RANDOM\n" + + "\n" + + " 18:MERGING-EXCHANGE\n" + + " limit: 3\n" + + " tuple ids: 7 \n" + + "\n" + + "PLAN FRAGMENT 10\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 18\n" + + " UNPARTITIONED\n" + + "\n" + + " 7:TOP-N\n" + + " | order by: ASC\n" + + " | offset: 0\n" + + " | limit: 3\n" + + " | tuple ids: 7 \n" + + " | \n" + + " 4:UNION\n" + + " | child exprs: \n" + + " | `default_cluster:db1.tbl1`.`k1` | `default_cluster:db1.tbl1`.`k2` | `default_cluster:db1.tbl1`.`k3` | `default_cluster:db1.tbl1`.`k4`\n" + + " | `default_cluster:db1.tbl1`.`k1` | `default_cluster:db1.tbl1`.`k2` | `default_cluster:db1.tbl1`.`k3` | `default_cluster:db1.tbl1`.`k4`\n" + + " | pass-through-operands: all\n" + + " | tuple ids: 6 \n" + + " | \n" + + " |----17:EXCHANGE\n" + + " | tuple ids: 5 \n" + + " | \n" + + " 16:EXCHANGE\n" + + " tuple ids: 4 \n" + + "\n" + + "PLAN FRAGMENT 11\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 17\n" + + " RANDOM\n" + + "\n" + + " 6:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. Reason: No AggregateInfo\n" + + " PREDICATES: `k1` = 'b', `k4` = 3\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 5 \n" + + "\n" + + "PLAN FRAGMENT 12\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 16\n" + + " RANDOM\n" + + "\n" + + " 5:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. 
Reason: No AggregateInfo\n" + + " PREDICATES: `k1` = 'b', `k4` = 2\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 4 \n" + + "\n" + + "PLAN FRAGMENT 13\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 21\n" + + " RANDOM\n" + + "\n" + + " 3:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. Reason: No AggregateInfo\n" + + " PREDICATES: `k1` = 'b', `k4` = 2\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 2 \n" + + "\n" + + "PLAN FRAGMENT 14\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 20\n" + + " RANDOM\n" + + "\n" + + " 2:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. Reason: No AggregateInfo\n" + + " PREDICATES: `k1` = 'b', `k4` = 2\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 1 \n" + + "\n" + + "PLAN FRAGMENT 15\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 19\n" + + " RANDOM\n" + + "\n" + + " 1:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. 
Reason: No AggregateInfo\n" + + " PREDICATES: `k1` = 'a', `k4` = 1\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 0 \n"; + StmtExecutor stmtExecutor2 = new StmtExecutor(ctx, sql2); + stmtExecutor2.execute(); + Planner planner2 = stmtExecutor2.planner(); + List fragments2 = planner2.getFragments(); + Assert.assertEquals(plan2, planner2.getExplainString(fragments2, TExplainLevel.VERBOSE)); + // intersect + String sql3 = "explain select * from\n" + + " (select k1, k2 from db1.tbl1\n" + + " intersect\n" + + " select k1, k2 from db1.tbl1) a\n" + + " inner join\n" + + " db1.tbl1 b\n" + + " on (a.k1 = b.k1)\n" + + "where b.k1 = 'a'"; + String plan3 = "PLAN FRAGMENT 0\n" + + " OUTPUT EXPRS: | | `b`.`k1` | `b`.`k2` | `b`.`k3` | `b`.`k4`\n" + + " PARTITION: RANDOM\n" + + "\n" + + " RESULT SINK\n" + + "\n" + + " 5:HASH JOIN\n" + + " | join op: INNER JOIN (BROADCAST)\n" + + " | hash predicates:\n" + + " | colocate: false, reason: Node type not match\n" + + "( = `b`.`k1`)\n" + + " | tuple ids: 2 4 \n" + + " | \n" + + " |----8:EXCHANGE\n" + + " | tuple ids: 4 \n" + + " | \n" + + " 3:AGGREGATE (update finalize)\n" + + " | group by: , \n" + + " | tuple ids: 2 \n" + + " | \n" + + " 0:INTERSECT\n" + + " | child exprs: \n" + + " | `k1` | `k2`\n" + + " | `k1` | `k2`\n" + + " | pass-through-operands: all\n" + + " | tuple ids: 2 \n" + + " | \n" + + " |----7:EXCHANGE\n" + + " | tuple ids: 1 \n" + + " | \n" + + " 6:EXCHANGE\n" + + " tuple ids: 0 \n" + + "\n" + + "PLAN FRAGMENT 1\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 08\n" + + " UNPARTITIONED\n" + + "\n" + + " 4:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. 
Reason: null\n" + + " PREDICATES: `b`.`k1` = 'a'\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 4 \n" + + "\n" + + "PLAN FRAGMENT 2\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 07\n" + + " RANDOM\n" + + "\n" + + " 2:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. Reason: No AggregateInfo\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 1 \n" + + "\n" + + "PLAN FRAGMENT 3\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 06\n" + + " RANDOM\n" + + "\n" + + " 1:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. Reason: No AggregateInfo\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 0 \n"; + StmtExecutor stmtExecutor3 = new StmtExecutor(ctx, sql3); + stmtExecutor3.execute(); + Planner planner3 = stmtExecutor3.planner(); + List fragments3 = planner3.getFragments(); + Assert.assertEquals(plan3, planner3.getExplainString(fragments3, TExplainLevel.VERBOSE)); + String sql4 = "explain select * from db1.tbl1 where k1='a' and k4=1\n" + + "intersect distinct\n" + + " (select * from db1.tbl1 where k1='b' and k4=2\n" + + " intersect\n" + + " select * from db1.tbl1 where k1='b' and k4=2)\n" + + "intersect distinct\n" + + " (select * from db1.tbl1 where k1='b' and k4=2\n" + + " intersect\n" + + " (select * from db1.tbl1 where k1='b' and k4=3)\n" + + " order by 3 limit 3)\n" + + "intersect\n" + + " (select * from db1.tbl1 where k1='b' and k4=3\n" + + " intersect\n" + + " select * from db1.tbl1 where k1='b' and k4=4)\n" + + "intersect\n" + + " 
(select * from db1.tbl1 where k1='b' and k4=3\n" + + " intersect\n" + + " (select * from db1.tbl1 where k1='b' and k4=5)\n" + + " order by 3 limit 3)"; + + String plan4 = "PLAN FRAGMENT 0\n" + + " OUTPUT EXPRS: | | | \n" + + " PARTITION: UNPARTITIONED\n" + + "\n" + + " RESULT SINK\n" + + "\n" + + " 32:EXCHANGE\n" + + " tuple ids: 15 \n" + + "\n" + + "PLAN FRAGMENT 1\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: HASH_PARTITIONED: , , , \n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 32\n" + + " UNPARTITIONED\n" + + "\n" + + " 31:AGGREGATE (merge finalize)\n" + + " | group by: , , , \n" + + " | tuple ids: 15 \n" + + " | \n" + + " 30:EXCHANGE\n" + + " tuple ids: 15 \n" + + "\n" + + "PLAN FRAGMENT 2\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 30\n" + + " HASH_PARTITIONED: , , , \n" + + "\n" + + " 16:AGGREGATE (update serialize)\n" + + " | STREAMING\n" + + " | group by: , , , \n" + + " | tuple ids: 15 \n" + + " | \n" + + " 0:INTERSECT\n" + + " | child exprs: \n" + + " | `default_cluster:db1.tbl1`.`k1` | `default_cluster:db1.tbl1`.`k2` | `default_cluster:db1.tbl1`.`k3` | `default_cluster:db1.tbl1`.`k4`\n" + + " | `default_cluster:db1.tbl1`.`k1` | `default_cluster:db1.tbl1`.`k2` | `default_cluster:db1.tbl1`.`k3` | `default_cluster:db1.tbl1`.`k4`\n" + + " | `default_cluster:db1.tbl1`.`k1` | `default_cluster:db1.tbl1`.`k2` | `default_cluster:db1.tbl1`.`k3` | `default_cluster:db1.tbl1`.`k4`\n" + + " | `default_cluster:db1.tbl1`.`k1` | `default_cluster:db1.tbl1`.`k2` | `default_cluster:db1.tbl1`.`k3` | `default_cluster:db1.tbl1`.`k4`\n" + + " | `default_cluster:db1.tbl1`.`k1` | `default_cluster:db1.tbl1`.`k2` | `default_cluster:db1.tbl1`.`k3` | `default_cluster:db1.tbl1`.`k4`\n" + + " | | | | \n" + + " | | | | \n" + + " | pass-through-operands: 23,24,25,27,28\n" + + " | tuple ids: 15 \n" + + " | \n" + + " |----24:EXCHANGE\n" + + " | tuple ids: 1 \n" + + " | \n" + + " |----25:EXCHANGE\n" + + " | tuple ids: 2 
\n" + + " | \n" + + " |----27:EXCHANGE\n" + + " | tuple ids: 8 \n" + + " | \n" + + " |----28:EXCHANGE\n" + + " | tuple ids: 9 \n" + + " | \n" + + " |----26:EXCHANGE\n" + + " | limit: 3\n" + + " | tuple ids: 7 \n" + + " | \n" + + " |----29:EXCHANGE\n" + + " | limit: 3\n" + + " | tuple ids: 14 \n" + + " | \n" + + " 23:EXCHANGE\n" + + " tuple ids: 0 \n" + + "\n" + + "PLAN FRAGMENT 3\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: UNPARTITIONED\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 29\n" + + " RANDOM\n" + + "\n" + + " 22:MERGING-EXCHANGE\n" + + " limit: 3\n" + + " tuple ids: 14 \n" + + "\n" + + "PLAN FRAGMENT 4\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 22\n" + + " UNPARTITIONED\n" + + "\n" + + " 15:TOP-N\n" + + " | order by: ASC\n" + + " | offset: 0\n" + + " | limit: 3\n" + + " | tuple ids: 14 \n" + + " | \n" + + " 14:AGGREGATE (update finalize)\n" + + " | group by: , , , \n" + + " | tuple ids: 13 \n" + + " | \n" + + " 11:INTERSECT\n" + + " | child exprs: \n" + + " | `default_cluster:db1.tbl1`.`k1` | `default_cluster:db1.tbl1`.`k2` | `default_cluster:db1.tbl1`.`k3` | `default_cluster:db1.tbl1`.`k4`\n" + + " | `default_cluster:db1.tbl1`.`k1` | `default_cluster:db1.tbl1`.`k2` | `default_cluster:db1.tbl1`.`k3` | `default_cluster:db1.tbl1`.`k4`\n" + + " | pass-through-operands: all\n" + + " | tuple ids: 13 \n" + + " | \n" + + " |----21:EXCHANGE\n" + + " | tuple ids: 12 \n" + + " | \n" + + " 20:EXCHANGE\n" + + " tuple ids: 11 \n" + + "\n" + + "PLAN FRAGMENT 5\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 21\n" + + " RANDOM\n" + + "\n" + + " 13:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. 
Reason: No AggregateInfo\n" + + " PREDICATES: `k1` = 'b', `k4` = 5\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 12 \n" + + "\n" + + "PLAN FRAGMENT 6\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 20\n" + + " RANDOM\n" + + "\n" + + " 12:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. Reason: No AggregateInfo\n" + + " PREDICATES: `k1` = 'b', `k4` = 3\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 11 \n" + + "\n" + + "PLAN FRAGMENT 7\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 28\n" + + " RANDOM\n" + + "\n" + + " 10:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. Reason: No AggregateInfo\n" + + " PREDICATES: `k1` = 'b', `k4` = 4\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 9 \n" + + "\n" + + "PLAN FRAGMENT 8\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 27\n" + + " RANDOM\n" + + "\n" + + " 9:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. 
Reason: No AggregateInfo\n" + + " PREDICATES: `k1` = 'b', `k4` = 3\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 8 \n" + + "\n" + + "PLAN FRAGMENT 9\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: UNPARTITIONED\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 26\n" + + " RANDOM\n" + + "\n" + + " 19:MERGING-EXCHANGE\n" + + " limit: 3\n" + + " tuple ids: 7 \n" + + "\n" + + "PLAN FRAGMENT 10\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 19\n" + + " UNPARTITIONED\n" + + "\n" + + " 8:TOP-N\n" + + " | order by: ASC\n" + + " | offset: 0\n" + + " | limit: 3\n" + + " | tuple ids: 7 \n" + + " | \n" + + " 7:AGGREGATE (update finalize)\n" + + " | group by: , , , \n" + + " | tuple ids: 6 \n" + + " | \n" + + " 4:INTERSECT\n" + + " | child exprs: \n" + + " | `default_cluster:db1.tbl1`.`k1` | `default_cluster:db1.tbl1`.`k2` | `default_cluster:db1.tbl1`.`k3` | `default_cluster:db1.tbl1`.`k4`\n" + + " | `default_cluster:db1.tbl1`.`k1` | `default_cluster:db1.tbl1`.`k2` | `default_cluster:db1.tbl1`.`k3` | `default_cluster:db1.tbl1`.`k4`\n" + + " | pass-through-operands: all\n" + + " | tuple ids: 6 \n" + + " | \n" + + " |----18:EXCHANGE\n" + + " | tuple ids: 5 \n" + + " | \n" + + " 17:EXCHANGE\n" + + " tuple ids: 4 \n" + + "\n" + + "PLAN FRAGMENT 11\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 18\n" + + " RANDOM\n" + + "\n" + + " 6:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. 
Reason: No AggregateInfo\n" + + " PREDICATES: `k1` = 'b', `k4` = 3\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 5 \n" + + "\n" + + "PLAN FRAGMENT 12\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 17\n" + + " RANDOM\n" + + "\n" + + " 5:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. Reason: No AggregateInfo\n" + + " PREDICATES: `k1` = 'b', `k4` = 2\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 4 \n" + + "\n" + + "PLAN FRAGMENT 13\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 25\n" + + " RANDOM\n" + + "\n" + + " 3:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. Reason: No AggregateInfo\n" + + " PREDICATES: `k1` = 'b', `k4` = 2\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 2 \n" + + "\n" + + "PLAN FRAGMENT 14\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 24\n" + + " RANDOM\n" + + "\n" + + " 2:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. Reason: No AggregateInfo\n" + + " PREDICATES: `k1` = 'b', `k4` = 2\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 1 \n" + + "\n" + + "PLAN FRAGMENT 15\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 23\n" + + " RANDOM\n" + + "\n" + + " 1:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. 
Reason: No AggregateInfo\n" + + " PREDICATES: `k1` = 'a', `k4` = 1\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 0 \n"; + StmtExecutor stmtExecutor4 = new StmtExecutor(ctx, sql4); + stmtExecutor4.execute(); + Planner planner4 = stmtExecutor4.planner(); + List fragments4 = planner4.getFragments(); + Assert.assertEquals(plan4, planner4.getExplainString(fragments4, TExplainLevel.VERBOSE)); + + // except + String sql5 = "explain select * from\n" + + " (select k1, k2 from db1.tbl1\n" + + " except\n" + + " select k1, k2 from db1.tbl1) a\n" + + " inner join\n" + + " db1.tbl1 b\n" + + " on (a.k1 = b.k1)\n" + + "where b.k1 = 'a'"; + String plan5 = "PLAN FRAGMENT 0\n" + + " OUTPUT EXPRS: | | `b`.`k1` | `b`.`k2` | `b`.`k3` | `b`.`k4`\n" + + " PARTITION: RANDOM\n" + + "\n" + + " RESULT SINK\n" + + "\n" + + " 5:HASH JOIN\n" + + " | join op: INNER JOIN (BROADCAST)\n" + + " | hash predicates:\n" + + " | colocate: false, reason: Node type not match\n" + + "( = `b`.`k1`)\n" + + " | tuple ids: 2 4 \n" + + " | \n" + + " |----8:EXCHANGE\n" + + " | tuple ids: 4 \n" + + " | \n" + + " 3:AGGREGATE (update finalize)\n" + + " | group by: , \n" + + " | tuple ids: 2 \n" + + " | \n" + + " 0:EXCEPT\n" + + " | child exprs: \n" + + " | `k1` | `k2`\n" + + " | `k1` | `k2`\n" + + " | pass-through-operands: all\n" + + " | tuple ids: 2 \n" + + " | \n" + + " |----7:EXCHANGE\n" + + " | tuple ids: 1 \n" + + " | \n" + + " 6:EXCHANGE\n" + + " tuple ids: 0 \n" + + "\n" + + "PLAN FRAGMENT 1\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 08\n" + + " UNPARTITIONED\n" + + "\n" + + " 4:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. 
Reason: null\n" + + " PREDICATES: `b`.`k1` = 'a'\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 4 \n" + + "\n" + + "PLAN FRAGMENT 2\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 07\n" + + " RANDOM\n" + + "\n" + + " 2:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. Reason: No AggregateInfo\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 1 \n" + + "\n" + + "PLAN FRAGMENT 3\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 06\n" + + " RANDOM\n" + + "\n" + + " 1:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. Reason: No AggregateInfo\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 0 \n"; + StmtExecutor stmtExecutor5 = new StmtExecutor(ctx, sql5); + stmtExecutor5.execute(); + Planner planner5 = stmtExecutor5.planner(); + List fragments5 = planner5.getFragments(); + Assert.assertEquals(plan5, planner5.getExplainString(fragments5, TExplainLevel.VERBOSE)); + + String sql6 = "select * from db1.tbl1 where k1='a' and k4=1\n" + + "except\n" + + "select * from db1.tbl1 where k1='a' and k4=1\n" + + "except\n" + + "select * from db1.tbl1 where k1='a' and k4=2\n" + + "except distinct\n" + + "(select * from db1.tbl1 where k1='a' and k4=2)\n" + + "order by 3 limit 3"; + String plan6 = "PLAN FRAGMENT 0\n" + + " OUTPUT EXPRS: | | | \n" + + " PARTITION: UNPARTITIONED\n" + + "\n" + + " RESULT SINK\n" + + "\n" + + " 11:MERGING-EXCHANGE\n" + + " limit: 3\n" + + " tuple ids: 5 \n" + + "\n" + + "PLAN FRAGMENT 1\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: 
RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 11\n" + + " UNPARTITIONED\n" + + "\n" + + " 6:TOP-N\n" + + " | order by: ASC\n" + + " | offset: 0\n" + + " | limit: 3\n" + + " | tuple ids: 5 \n" + + " | \n" + + " 5:AGGREGATE (update finalize)\n" + + " | group by: , , , \n" + + " | tuple ids: 4 \n" + + " | \n" + + " 0:EXCEPT\n" + + " | child exprs: \n" + + " | `default_cluster:db1.tbl1`.`k1` | `default_cluster:db1.tbl1`.`k2` | `default_cluster:db1.tbl1`.`k3` | `default_cluster:db1.tbl1`.`k4`\n" + + " | `default_cluster:db1.tbl1`.`k1` | `default_cluster:db1.tbl1`.`k2` | `default_cluster:db1.tbl1`.`k3` | `default_cluster:db1.tbl1`.`k4`\n" + + " | `default_cluster:db1.tbl1`.`k1` | `default_cluster:db1.tbl1`.`k2` | `default_cluster:db1.tbl1`.`k3` | `default_cluster:db1.tbl1`.`k4`\n" + + " | `default_cluster:db1.tbl1`.`k1` | `default_cluster:db1.tbl1`.`k2` | `default_cluster:db1.tbl1`.`k3` | `default_cluster:db1.tbl1`.`k4`\n" + + " | pass-through-operands: all\n" + + " | tuple ids: 4 \n" + + " | \n" + + " |----8:EXCHANGE\n" + + " | tuple ids: 1 \n" + + " | \n" + + " |----9:EXCHANGE\n" + + " | tuple ids: 2 \n" + + " | \n" + + " |----10:EXCHANGE\n" + + " | tuple ids: 3 \n" + + " | \n" + + " 7:EXCHANGE\n" + + " tuple ids: 0 \n" + + "\n" + + "PLAN FRAGMENT 2\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 10\n" + + " RANDOM\n" + + "\n" + + " 4:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. Reason: No AggregateInfo\n" + + " PREDICATES: `k1` = 'a', `k4` = 2\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 3 \n" + + "\n" + + "PLAN FRAGMENT 3\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 09\n" + + " RANDOM\n" + + "\n" + + " 3:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. 
Reason: No AggregateInfo\n" + + " PREDICATES: `k1` = 'a', `k4` = 2\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 2 \n" + + "\n" + + "PLAN FRAGMENT 4\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 08\n" + + " RANDOM\n" + + "\n" + + " 2:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. Reason: No AggregateInfo\n" + + " PREDICATES: `k1` = 'a', `k4` = 1\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 1 \n" + + "\n" + + "PLAN FRAGMENT 5\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 07\n" + + " RANDOM\n" + + "\n" + + " 1:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. Reason: No AggregateInfo\n" + + " PREDICATES: `k1` = 'a', `k4` = 1\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 0 \n"; + StmtExecutor stmtExecutor6 = new StmtExecutor(ctx, sql6); + stmtExecutor6.execute(); + Planner planner6 = stmtExecutor6.planner(); + List fragments6 = planner6.getFragments(); + Assert.assertEquals(plan6, planner6.getExplainString(fragments6, TExplainLevel.VERBOSE)); + + String sql7 = "select * from db1.tbl1 where k1='a' and k4=1\n" + + "except distinct\n" + + "select * from db1.tbl1 where k1='a' and k4=1\n" + + "except\n" + + "select * from db1.tbl1 where k1='a' and k4=2\n" + + "except\n" + + "(select * from db1.tbl1 where k1='a' and k4=2)\n" + + "order by 3 limit 3"; + String plan7 = "PLAN FRAGMENT 0\n" + + " OUTPUT EXPRS: | | | \n" + + " PARTITION: UNPARTITIONED\n" + + "\n" + + " RESULT SINK\n" + + "\n" + + " 11:MERGING-EXCHANGE\n" + + " limit: 
3\n" + + " tuple ids: 5 \n" + + "\n" + + "PLAN FRAGMENT 1\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 11\n" + + " UNPARTITIONED\n" + + "\n" + + " 6:TOP-N\n" + + " | order by: ASC\n" + + " | offset: 0\n" + + " | limit: 3\n" + + " | tuple ids: 5 \n" + + " | \n" + + " 5:AGGREGATE (update finalize)\n" + + " | group by: , , , \n" + + " | tuple ids: 4 \n" + + " | \n" + + " 0:EXCEPT\n" + + " | child exprs: \n" + + " | `default_cluster:db1.tbl1`.`k1` | `default_cluster:db1.tbl1`.`k2` | `default_cluster:db1.tbl1`.`k3` | `default_cluster:db1.tbl1`.`k4`\n" + + " | `default_cluster:db1.tbl1`.`k1` | `default_cluster:db1.tbl1`.`k2` | `default_cluster:db1.tbl1`.`k3` | `default_cluster:db1.tbl1`.`k4`\n" + + " | `default_cluster:db1.tbl1`.`k1` | `default_cluster:db1.tbl1`.`k2` | `default_cluster:db1.tbl1`.`k3` | `default_cluster:db1.tbl1`.`k4`\n" + + " | `default_cluster:db1.tbl1`.`k1` | `default_cluster:db1.tbl1`.`k2` | `default_cluster:db1.tbl1`.`k3` | `default_cluster:db1.tbl1`.`k4`\n" + + " | pass-through-operands: all\n" + + " | tuple ids: 4 \n" + + " | \n" + + " |----8:EXCHANGE\n" + + " | tuple ids: 1 \n" + + " | \n" + + " |----9:EXCHANGE\n" + + " | tuple ids: 2 \n" + + " | \n" + + " |----10:EXCHANGE\n" + + " | tuple ids: 3 \n" + + " | \n" + + " 7:EXCHANGE\n" + + " tuple ids: 0 \n" + + "\n" + + "PLAN FRAGMENT 2\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 10\n" + + " RANDOM\n" + + "\n" + + " 4:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. 
Reason: No AggregateInfo\n" + + " PREDICATES: `k1` = 'a', `k4` = 2\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 3 \n" + + "\n" + + "PLAN FRAGMENT 3\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 09\n" + + " RANDOM\n" + + "\n" + + " 3:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. Reason: No AggregateInfo\n" + + " PREDICATES: `k1` = 'a', `k4` = 2\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 2 \n" + + "\n" + + "PLAN FRAGMENT 4\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 08\n" + + " RANDOM\n" + + "\n" + + " 2:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. Reason: No AggregateInfo\n" + + " PREDICATES: `k1` = 'a', `k4` = 1\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 1 \n" + + "\n" + + "PLAN FRAGMENT 5\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 07\n" + + " RANDOM\n" + + "\n" + + " 1:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. 
Reason: No AggregateInfo\n" + + " PREDICATES: `k1` = 'a', `k4` = 1\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 0 \n"; + StmtExecutor stmtExecutor7 = new StmtExecutor(ctx, sql7); + stmtExecutor7.execute(); + Planner planner7 = stmtExecutor7.planner(); + List fragments7 = planner7.getFragments(); + Assert.assertEquals(plan7, planner7.getExplainString(fragments7, TExplainLevel.VERBOSE)); + + // mixed + String sql8 = "select * from db1.tbl1 where k1='a' and k4=1\n" + + "union\n" + + "select * from db1.tbl1 where k1='a' and k4=1\n" + + "except\n" + + "select * from db1.tbl1 where k1='a' and k4=2\n" + + "intersect\n" + + "(select * from db1.tbl1 where k1='a' and k4=2)\n" + + "order by 3 limit 3"; + String plan8 = "PLAN FRAGMENT 0\n" + + " OUTPUT EXPRS: | | | \n" + + " PARTITION: UNPARTITIONED\n" + + "\n" + + " RESULT SINK\n" + + "\n" + + " 17:MERGING-EXCHANGE\n" + + " limit: 3\n" + + " tuple ids: 5 \n" + + "\n" + + "PLAN FRAGMENT 1\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 17\n" + + " UNPARTITIONED\n" + + "\n" + + " 10:TOP-N\n" + + " | order by: ASC\n" + + " | offset: 0\n" + + " | limit: 3\n" + + " | tuple ids: 5 \n" + + " | \n" + + " 9:AGGREGATE (update finalize)\n" + + " | group by: , , , \n" + + " | tuple ids: 4 \n" + + " | \n" + + " 7:INTERSECT\n" + + " | child exprs: \n" + + " | | | | \n" + + " | `default_cluster:db1.tbl1`.`k1` | `default_cluster:db1.tbl1`.`k2` | `default_cluster:db1.tbl1`.`k3` | `default_cluster:db1.tbl1`.`k4`\n" + + " | pass-through-operands: all\n" + + " | tuple ids: 4 \n" + + " | \n" + + " |----16:EXCHANGE\n" + + " | tuple ids: 3 \n" + + " | \n" + + " 15:EXCHANGE\n" + + " tuple ids: 4 \n" + + "\n" + + "PLAN FRAGMENT 2\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 16\n" + + " RANDOM\n" + + 
"\n" + + " 8:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. Reason: No AggregateInfo\n" + + " PREDICATES: `k1` = 'a', `k4` = 2\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 3 \n" + + "\n" + + "PLAN FRAGMENT 3\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 15\n" + + " RANDOM\n" + + "\n" + + " 6:AGGREGATE (update finalize)\n" + + " | group by: , , , \n" + + " | tuple ids: 4 \n" + + " | \n" + + " 4:EXCEPT\n" + + " | child exprs: \n" + + " | | | | \n" + + " | `default_cluster:db1.tbl1`.`k1` | `default_cluster:db1.tbl1`.`k2` | `default_cluster:db1.tbl1`.`k3` | `default_cluster:db1.tbl1`.`k4`\n" + + " | pass-through-operands: all\n" + + " | tuple ids: 4 \n" + + " | \n" + + " |----14:EXCHANGE\n" + + " | tuple ids: 2 \n" + + " | \n" + + " 13:EXCHANGE\n" + + " tuple ids: 4 \n" + + "\n" + + "PLAN FRAGMENT 4\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 14\n" + + " RANDOM\n" + + "\n" + + " 5:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. 
Reason: No AggregateInfo\n" + + " PREDICATES: `k1` = 'a', `k4` = 2\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 2 \n" + + "\n" + + "PLAN FRAGMENT 5\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 13\n" + + " RANDOM\n" + + "\n" + + " 3:AGGREGATE (update finalize)\n" + + " | group by: , , , \n" + + " | tuple ids: 4 \n" + + " | \n" + + " 0:UNION\n" + + " | child exprs: \n" + + " | `default_cluster:db1.tbl1`.`k1` | `default_cluster:db1.tbl1`.`k2` | `default_cluster:db1.tbl1`.`k3` | `default_cluster:db1.tbl1`.`k4`\n" + + " | `default_cluster:db1.tbl1`.`k1` | `default_cluster:db1.tbl1`.`k2` | `default_cluster:db1.tbl1`.`k3` | `default_cluster:db1.tbl1`.`k4`\n" + + " | pass-through-operands: all\n" + + " | tuple ids: 4 \n" + + " | \n" + + " |----12:EXCHANGE\n" + + " | tuple ids: 1 \n" + + " | \n" + + " 11:EXCHANGE\n" + + " tuple ids: 0 \n" + + "\n" + + "PLAN FRAGMENT 6\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 12\n" + + " RANDOM\n" + + "\n" + + " 2:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. Reason: No AggregateInfo\n" + + " PREDICATES: `k1` = 'a', `k4` = 1\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 1 \n" + + "\n" + + "PLAN FRAGMENT 7\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 11\n" + + " RANDOM\n" + + "\n" + + " 1:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. 
Reason: No AggregateInfo\n" + + " PREDICATES: `k1` = 'a', `k4` = 1\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 0 \n"; + StmtExecutor stmtExecutor8 = new StmtExecutor(ctx, sql8); + stmtExecutor8.execute(); + Planner planner8 = stmtExecutor8.planner(); + List fragments8 = planner8.getFragments(); + Assert.assertEquals(plan8, planner8.getExplainString(fragments8, TExplainLevel.VERBOSE)); + + String sql9 = "explain select * from db1.tbl1 where k1='a' and k4=1\n" + + "intersect distinct\n" + + " (select * from db1.tbl1 where k1='b' and k4=2\n" + + " union all\n" + + " select * from db1.tbl1 where k1='b' and k4=2)\n" + + "intersect distinct\n" + + " (select * from db1.tbl1 where k1='b' and k4=2\n" + + " except\n" + + " (select * from db1.tbl1 where k1='b' and k4=3)\n" + + " order by 3 limit 3)\n" + + "union all\n" + + " (select * from db1.tbl1 where k1='b' and k4=3\n" + + " intersect\n" + + " select * from db1.tbl1 where k1='b' and k4=4)\n" + + "except\n" + + " (select * from db1.tbl1 where k1='b' and k4=3\n" + + " intersect\n" + + " (select * from db1.tbl1 where k1='b' and k4=5)\n" + + " order by 3 limit 3)"; + + String plan9 = "PLAN FRAGMENT 0\n" + + " OUTPUT EXPRS: | | | \n" + + " PARTITION: UNPARTITIONED\n" + + "\n" + + " RESULT SINK\n" + + "\n" + + " 47:EXCHANGE\n" + + " tuple ids: 15 \n" + + "\n" + + "PLAN FRAGMENT 1\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: HASH_PARTITIONED: , , , \n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 47\n" + + " UNPARTITIONED\n" + + "\n" + + " 46:AGGREGATE (merge finalize)\n" + + " | group by: , , , \n" + + " | tuple ids: 15 \n" + + " | \n" + + " 45:EXCHANGE\n" + + " tuple ids: 15 \n" + + "\n" + + "PLAN FRAGMENT 2\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 45\n" + + " HASH_PARTITIONED: , , , \n" + + "\n" + + " 23:AGGREGATE 
(update serialize)\n" + + " | STREAMING\n" + + " | group by: , , , \n" + + " | tuple ids: 15 \n" + + " | \n" + + " 17:EXCEPT\n" + + " | child exprs: \n" + + " | | | | \n" + + " | | | | \n" + + " | pass-through-operands: 43\n" + + " | tuple ids: 15 \n" + + " | \n" + + " |----44:EXCHANGE\n" + + " | limit: 3\n" + + " | tuple ids: 14 \n" + + " | \n" + + " 43:EXCHANGE\n" + + " tuple ids: 15 \n" + + "\n" + + "PLAN FRAGMENT 3\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: UNPARTITIONED\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 44\n" + + " RANDOM\n" + + "\n" + + " 42:MERGING-EXCHANGE\n" + + " limit: 3\n" + + " tuple ids: 14 \n" + + "\n" + + "PLAN FRAGMENT 4\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 42\n" + + " UNPARTITIONED\n" + + "\n" + + " 22:TOP-N\n" + + " | order by: ASC\n" + + " | offset: 0\n" + + " | limit: 3\n" + + " | tuple ids: 14 \n" + + " | \n" + + " 21:AGGREGATE (update finalize)\n" + + " | group by: , , , \n" + + " | tuple ids: 13 \n" + + " | \n" + + " 18:INTERSECT\n" + + " | child exprs: \n" + + " | `default_cluster:db1.tbl1`.`k1` | `default_cluster:db1.tbl1`.`k2` | `default_cluster:db1.tbl1`.`k3` | `default_cluster:db1.tbl1`.`k4`\n" + + " | `default_cluster:db1.tbl1`.`k1` | `default_cluster:db1.tbl1`.`k2` | `default_cluster:db1.tbl1`.`k3` | `default_cluster:db1.tbl1`.`k4`\n" + + " | pass-through-operands: all\n" + + " | tuple ids: 13 \n" + + " | \n" + + " |----41:EXCHANGE\n" + + " | tuple ids: 12 \n" + + " | \n" + + " 40:EXCHANGE\n" + + " tuple ids: 11 \n" + + "\n" + + "PLAN FRAGMENT 5\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 41\n" + + " RANDOM\n" + + "\n" + + " 20:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. 
Reason: No AggregateInfo\n" + + " PREDICATES: `k1` = 'b', `k4` = 5\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 12 \n" + + "\n" + + "PLAN FRAGMENT 6\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 40\n" + + " RANDOM\n" + + "\n" + + " 19:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. Reason: No AggregateInfo\n" + + " PREDICATES: `k1` = 'b', `k4` = 3\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 11 \n" + + "\n" + + "PLAN FRAGMENT 7\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: HASH_PARTITIONED: , , , \n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 43\n" + + " RANDOM\n" + + "\n" + + " 39:AGGREGATE (merge finalize)\n" + + " | group by: , , , \n" + + " | tuple ids: 15 \n" + + " | \n" + + " 38:EXCHANGE\n" + + " tuple ids: 15 \n" + + "\n" + + "PLAN FRAGMENT 8\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 38\n" + + " HASH_PARTITIONED: , , , \n" + + "\n" + + " 16:AGGREGATE (update serialize)\n" + + " | STREAMING\n" + + " | group by: , , , \n" + + " | tuple ids: 15 \n" + + " | \n" + + " 11:UNION\n" + + " | child exprs: \n" + + " | | | | \n" + + " | | | | \n" + + " | pass-through-operands: all\n" + + " | tuple ids: 15 \n" + + " | \n" + + " |----37:EXCHANGE\n" + + " | tuple ids: 15 \n" + + " | \n" + + " 36:EXCHANGE\n" + + " tuple ids: 10 \n" + + "\n" + + "PLAN FRAGMENT 9\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: HASH_PARTITIONED: , , , \n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 37\n" + + " RANDOM\n" + + "\n" + + " 35:AGGREGATE (merge finalize)\n" + + " | group by: , , , \n" + + " | tuple ids: 15 \n" + + " | \n" + + " 34:EXCHANGE\n" + + " tuple ids: 15 \n" 
+ + "\n" + + "PLAN FRAGMENT 10\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 34\n" + + " HASH_PARTITIONED: , , , \n" + + "\n" + + " 10:AGGREGATE (update serialize)\n" + + " | STREAMING\n" + + " | group by: , , , \n" + + " | tuple ids: 15 \n" + + " | \n" + + " 0:INTERSECT\n" + + " | child exprs: \n" + + " | | | | \n" + + " | `default_cluster:db1.tbl1`.`k1` | `default_cluster:db1.tbl1`.`k2` | `default_cluster:db1.tbl1`.`k3` | `default_cluster:db1.tbl1`.`k4`\n" + + " | | | | \n" + + " | pass-through-operands: 31,32\n" + + " | tuple ids: 15 \n" + + " | \n" + + " |----32:EXCHANGE\n" + + " | tuple ids: 0 \n" + + " | \n" + + " |----33:EXCHANGE\n" + + " | limit: 3\n" + + " | tuple ids: 7 \n" + + " | \n" + + " 31:EXCHANGE\n" + + " tuple ids: 3 \n" + + "\n" + + "PLAN FRAGMENT 11\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: UNPARTITIONED\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 33\n" + + " RANDOM\n" + + "\n" + + " 30:MERGING-EXCHANGE\n" + + " limit: 3\n" + + " tuple ids: 7 \n" + + "\n" + + "PLAN FRAGMENT 12\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 30\n" + + " UNPARTITIONED\n" + + "\n" + + " 9:TOP-N\n" + + " | order by: ASC\n" + + " | offset: 0\n" + + " | limit: 3\n" + + " | tuple ids: 7 \n" + + " | \n" + + " 8:AGGREGATE (update finalize)\n" + + " | group by: , , , \n" + + " | tuple ids: 6 \n" + + " | \n" + + " 5:EXCEPT\n" + + " | child exprs: \n" + + " | `default_cluster:db1.tbl1`.`k1` | `default_cluster:db1.tbl1`.`k2` | `default_cluster:db1.tbl1`.`k3` | `default_cluster:db1.tbl1`.`k4`\n" + + " | `default_cluster:db1.tbl1`.`k1` | `default_cluster:db1.tbl1`.`k2` | `default_cluster:db1.tbl1`.`k3` | `default_cluster:db1.tbl1`.`k4`\n" + + " | pass-through-operands: all\n" + + " | tuple ids: 6 \n" + + " | \n" + + " |----29:EXCHANGE\n" + + " | tuple ids: 5 \n" + + " | \n" + + " 28:EXCHANGE\n" + + " tuple ids: 4 \n" + + "\n" + + "PLAN FRAGMENT 
13\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 29\n" + + " RANDOM\n" + + "\n" + + " 7:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. Reason: No AggregateInfo\n" + + " PREDICATES: `k1` = 'b', `k4` = 3\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 5 \n" + + "\n" + + "PLAN FRAGMENT 14\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 28\n" + + " RANDOM\n" + + "\n" + + " 6:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. Reason: No AggregateInfo\n" + + " PREDICATES: `k1` = 'b', `k4` = 2\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 4 \n" + + "\n" + + "PLAN FRAGMENT 15\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 32\n" + + " RANDOM\n" + + "\n" + + " 1:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. 
Reason: No AggregateInfo\n" + + " PREDICATES: `k1` = 'a', `k4` = 1\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 0 \n" + + "\n" + + "PLAN FRAGMENT 16\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 31\n" + + " RANDOM\n" + + "\n" + + " 2:UNION\n" + + " | child exprs: \n" + + " | `default_cluster:db1.tbl1`.`k1` | `default_cluster:db1.tbl1`.`k2` | `default_cluster:db1.tbl1`.`k3` | `default_cluster:db1.tbl1`.`k4`\n" + + " | `default_cluster:db1.tbl1`.`k1` | `default_cluster:db1.tbl1`.`k2` | `default_cluster:db1.tbl1`.`k3` | `default_cluster:db1.tbl1`.`k4`\n" + + " | pass-through-operands: all\n" + + " | tuple ids: 3 \n" + + " | \n" + + " |----27:EXCHANGE\n" + + " | tuple ids: 2 \n" + + " | \n" + + " 26:EXCHANGE\n" + + " tuple ids: 1 \n" + + "\n" + + "PLAN FRAGMENT 17\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 27\n" + + " RANDOM\n" + + "\n" + + " 4:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. Reason: No AggregateInfo\n" + + " PREDICATES: `k1` = 'b', `k4` = 2\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 2 \n" + + "\n" + + "PLAN FRAGMENT 18\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 26\n" + + " RANDOM\n" + + "\n" + + " 3:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. 
Reason: No AggregateInfo\n" + + " PREDICATES: `k1` = 'b', `k4` = 2\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 1 \n" + + "\n" + + "PLAN FRAGMENT 19\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 36\n" + + " RANDOM\n" + + "\n" + + " 15:AGGREGATE (update finalize)\n" + + " | group by: , , , \n" + + " | tuple ids: 10 \n" + + " | \n" + + " 12:INTERSECT\n" + + " | child exprs: \n" + + " | `default_cluster:db1.tbl1`.`k1` | `default_cluster:db1.tbl1`.`k2` | `default_cluster:db1.tbl1`.`k3` | `default_cluster:db1.tbl1`.`k4`\n" + + " | `default_cluster:db1.tbl1`.`k1` | `default_cluster:db1.tbl1`.`k2` | `default_cluster:db1.tbl1`.`k3` | `default_cluster:db1.tbl1`.`k4`\n" + + " | pass-through-operands: all\n" + + " | tuple ids: 10 \n" + + " | \n" + + " |----25:EXCHANGE\n" + + " | tuple ids: 9 \n" + + " | \n" + + " 24:EXCHANGE\n" + + " tuple ids: 8 \n" + + "\n" + + "PLAN FRAGMENT 20\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 25\n" + + " RANDOM\n" + + "\n" + + " 14:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. Reason: No AggregateInfo\n" + + " PREDICATES: `k1` = 'b', `k4` = 4\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 9 \n" + + "\n" + + "PLAN FRAGMENT 21\n" + + " OUTPUT EXPRS:\n" + + " PARTITION: RANDOM\n" + + "\n" + + " STREAM DATA SINK\n" + + " EXCHANGE ID: 24\n" + + " RANDOM\n" + + "\n" + + " 13:OlapScanNode\n" + + " TABLE: tbl1\n" + + " PREAGGREGATION: OFF. 
Reason: No AggregateInfo\n" + + " PREDICATES: `k1` = 'b', `k4` = 3\n" + + " partitions=0/1\n" + + " rollup: null\n" + + " tabletRatio=0/0\n" + + " tabletList=\n" + + " cardinality=-1\n" + + " avgRowSize=0.0\n" + + " numNodes=0\n" + + " tuple ids: 8 \n"; + StmtExecutor stmtExecutor9 = new StmtExecutor(ctx, sql9); + stmtExecutor9.execute(); + Planner planner9 = stmtExecutor9.planner(); + List fragments9 = planner9.getFragments(); + Assert.assertEquals(plan9, planner9.getExplainString(fragments9, TExplainLevel.VERBOSE)); + } + +} diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 0fcc44bb65..85d56fc1ec 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -46,8 +46,10 @@ enum TPlanNodeType { UNION_NODE, ES_SCAN_NODE, ES_HTTP_SCAN_NODE, - REPEAT_NODE - ASSERT_NUM_ROWS_NODE + REPEAT_NODE, + ASSERT_NUM_ROWS_NODE, + INTERSECT_NODE, + EXCEPT_NODE } // phases of an execution node @@ -546,6 +548,31 @@ struct TUnionNode { 4: required i64 first_materialized_child_idx } +struct TIntersectNode { + // An IntersectNode materializes all const/result exprs into this tuple. + 1: required Types.TTupleId tuple_id + // List of expr lists materialized by this node. + // There is one list of exprs per query stmt feeding into this intersect node. + 2: required list> result_expr_lists + // Separate list of expr lists coming from constant select stmts. + 3: required list> const_expr_lists + // Index of the first child that needs to be materialized. + 4: required i64 first_materialized_child_idx +} + +struct TExceptNode { + // An ExceptNode materializes all const/result exprs into this tuple. + 1: required Types.TTupleId tuple_id + // List of expr lists materialized by this node. + // There is one list of exprs per query stmt feeding into this except node. + 2: required list> result_expr_lists + // Separate list of expr lists coming from constant select stmts.
+ 3: required list> const_expr_lists + // Index of the first child that needs to be materialized. + 4: required i64 first_materialized_child_idx +} + + struct TExchangeNode { // The ExchangeNode's input rows form a prefix of the output rows it produces; // this describes the composition of that prefix @@ -629,6 +656,8 @@ struct TPlanNode { 30: optional TEsScanNode es_scan_node 31: optional TRepeatNode repeat_node 32: optional TAssertNumRowsNode assert_num_rows_node + 33: optional TIntersectNode intersect_node + 34: optional TExceptNode except_node } // A flattened representation of a tree of PlanNodes, obtained by depth-first