From 0959abc1dc347c7401c4875aab8b8cfc777d4738 Mon Sep 17 00:00:00 2001 From: yangzhg <780531911@qq.com> Date: Tue, 17 Mar 2020 10:54:40 +0800 Subject: [PATCH] [ExceptNode] Implement except node (#3056) implement except node, support statement like: ``` select a from t1 except select b from t2 ``` --- be/src/exec/except_node.cpp | 215 ++++++++++++++++-- be/src/exec/except_node.h | 73 ++---- be/src/exec/exec_node.cpp | 6 +- be/src/exec/hash_table.h | 15 +- be/src/exec/hash_table.hpp | 4 +- be/src/exec/intersect_node.cpp | 49 ++-- be/src/exec/intersect_node.h | 3 + .../doris/analysis/SetOperationStmt.java | 25 +- .../doris/planner/SetOperationNode.java | 18 +- .../doris/planner/SingleNodePlanner.java | 12 - 10 files changed, 297 insertions(+), 123 deletions(-) diff --git a/be/src/exec/except_node.cpp b/be/src/exec/except_node.cpp index 889c50a7f2..6965bb20d3 100644 --- a/be/src/exec/except_node.cpp +++ b/be/src/exec/except_node.cpp @@ -17,36 +17,21 @@ #include "exec/except_node.h" +#include "exec/hash_table.hpp" #include "exprs/expr.h" +#include "runtime/raw_value.h" +#include "runtime/row_batch.h" +#include "runtime/runtime_state.h" namespace doris { -// TODO(yangzhengguo) implememt this class -ExceptNode::ExceptNode(ObjectPool* pool, const TPlanNode& tnode, - const DescriptorTbl& descs) - : ExecNode(pool, tnode, descs), - _tuple_id(tnode.except_node.tuple_id), - _tuple_desc(nullptr), - _first_materialized_child_idx(tnode.except_node.first_materialized_child_idx), - _child_idx(0), - _child_batch(nullptr), - _child_row_idx(0), - _child_eos(false), - _const_expr_list_idx(0), - _to_close_child_idx(-1) { -} +ExceptNode::ExceptNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) + : ExecNode(pool, tnode, descs) {} Status ExceptNode::init(const TPlanNode& tnode, RuntimeState* state) { - // RETURN_IF_ERROR(ExecNode::init(tnode, state)); RETURN_IF_ERROR(ExecNode::init(tnode, state)); DCHECK(tnode.__isset.except_node); DCHECK_EQ(_conjunct_ctxs.size(), 0); - // Create const_expr_ctx_lists_ from thrift exprs. - auto& const_texpr_lists = tnode.except_node.const_expr_lists; - for (auto& texprs : const_texpr_lists) { - std::vector ctxs; - RETURN_IF_ERROR(Expr::create_expr_trees(_pool, texprs, &ctxs)); - _const_expr_lists.push_back(ctxs); - } + DCHECK_GE(_children.size(), 2); // Create result_expr_ctx_lists_ from thrift exprs. auto& result_texpr_lists = tnode.except_node.result_expr_lists; for (auto& texprs : result_texpr_lists) { @@ -56,4 +41,188 @@ Status ExceptNode::init(const TPlanNode& tnode, RuntimeState* state) { } return Status::OK(); } -} \ No newline at end of file + +Status ExceptNode::prepare(RuntimeState* state) { + RETURN_IF_ERROR(ExecNode::prepare(state)); + _build_timer = ADD_TIMER(runtime_profile(), "BuildTime"); + _probe_timer = ADD_TIMER(runtime_profile(), "ProbeTime"); + _build_pool.reset(new MemPool(mem_tracker())); + SCOPED_TIMER(_runtime_profile->total_time_counter()); + for (size_t i = 0; i < _child_expr_lists.size(); ++i) { + RETURN_IF_ERROR(Expr::prepare(_child_expr_lists[i], state, child(i)->row_desc(), + expr_mem_tracker())); + } + _build_tuple_size = child(0)->row_desc().tuple_descriptors().size(); + _build_tuple_row_size = _build_tuple_size * sizeof(Tuple*); + _build_tuple_idx.reserve(_build_tuple_size); + + for (int i = 0; i < _build_tuple_size; ++i) { + TupleDescriptor* build_tuple_desc = child(0)->row_desc().tuple_descriptors()[i]; + _build_tuple_idx.push_back(_row_descriptor.get_tuple_idx(build_tuple_desc->id())); + } + _find_nulls = std::vector(_build_tuple_size, true); + return Status::OK(); +} +Status ExceptNode::close(RuntimeState* state) { + if (is_closed()) { + return Status::OK(); + } + for (auto& exprs : _child_expr_lists) { + Expr::close(exprs, state); + } + + RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE)); + // Must reset _probe_batch in close() to release resources + _probe_batch.reset(NULL); + + if (_memory_used_counter != NULL && _hash_tbl.get() != NULL) { + COUNTER_UPDATE(_memory_used_counter, _build_pool->peak_allocated_bytes()); + COUNTER_UPDATE(_memory_used_counter, _hash_tbl->byte_size()); + } + if (_hash_tbl.get() != NULL) { + _hash_tbl->close(); + } + if (_build_pool.get() != NULL) { + _build_pool->free_all(); + } + return ExecNode::close(state); +} +Status ExceptNode::open(RuntimeState* state) { + RETURN_IF_ERROR(ExecNode::open(state)); + RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN)); + SCOPED_TIMER(_runtime_profile->total_time_counter()); + RETURN_IF_CANCELLED(state); + // open result expr lists. + for (const vector& exprs : _child_expr_lists) { + RETURN_IF_ERROR(Expr::open(exprs, state)); + } + // initial build hash table, use _child_expr_lists[0] as probe is used for remove duplicted + _hash_tbl.reset(new HashTable(_child_expr_lists[0], _child_expr_lists[0], _build_tuple_size, + true, _find_nulls, id(), mem_tracker(), 1024)); + RowBatch build_batch(child(0)->row_desc(), state->batch_size(), mem_tracker()); + RETURN_IF_ERROR(child(0)->open(state)); + + bool eos = false; + while (!eos) { + SCOPED_TIMER(_build_timer); + RETURN_IF_CANCELLED(state); + RETURN_IF_ERROR(child(0)->get_next(state, &build_batch, &eos)); + // take ownership of tuple data of build_batch + _build_pool->acquire_data(build_batch.tuple_data_pool(), false); + RETURN_IF_LIMIT_EXCEEDED(state, " Except, while constructing the hash table."); + // build hash table and remvoe duplicate items + for (int i = 0; i < build_batch.num_rows(); ++i) { + _hash_tbl->insert_unique(build_batch.get_row(i)); + } + VLOG_ROW << "hash table content: " << _hash_tbl->debug_string(true, &child(0)->row_desc()); + build_batch.reset(); + } + // if a table is empty, the result must be empty + + if (_hash_tbl->size() == 0) { + _hash_tbl_iterator = _hash_tbl->begin(); + return Status::OK(); + } + + for (int i = 1; i < _children.size(); ++i) { + // rebuid hash table, for first time will rebuild with the no duplicated _hash_tbl, + if (i > 1) { + SCOPED_TIMER(_build_timer); + std::unique_ptr temp_tbl( + new HashTable(_child_expr_lists[0], _child_expr_lists[i], _build_tuple_size, + true, _find_nulls, id(), mem_tracker(), 1024)); + _hash_tbl_iterator = _hash_tbl->begin(); + uint32_t previous_hash = -1; + while (_hash_tbl_iterator.has_next()) { + if (previous_hash != _hash_tbl_iterator.get_hash()) { + previous_hash = _hash_tbl_iterator.get_hash(); + if (!_hash_tbl_iterator.matched()) { + temp_tbl->insert(_hash_tbl_iterator.get_row()); + } + } + _hash_tbl_iterator.next(); + } + _hash_tbl.swap(temp_tbl); + temp_tbl->close(); + } + // probe + _probe_batch.reset(new RowBatch(child(i)->row_desc(), state->batch_size(), mem_tracker())); + ScopedTimer probe_timer(_probe_timer); + RETURN_IF_ERROR(child(i)->open(state)); + eos = false; + while (!eos) { + RETURN_IF_CANCELLED(state); + RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(), &eos)); + RETURN_IF_LIMIT_EXCEEDED(state, " Except , while probing the hash table."); + for (int j = 0; j < _probe_batch->num_rows(); ++j) { + _hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j)); + if (_hash_tbl_iterator != _hash_tbl->end()) { + _hash_tbl_iterator.set_matched(); + } + } + _probe_batch->reset(); + } + // if a table is empty, the result must be empty + if (_hash_tbl->size() == 0) { + break; + } + } + _hash_tbl_iterator = _hash_tbl->begin(); + return Status::OK(); +} + +Status ExceptNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eos) { + RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT)); + RETURN_IF_CANCELLED(state); + SCOPED_TIMER(_runtime_profile->total_time_counter()); + *eos = true; + if (reached_limit()) { + return Status::OK(); + } + uint32_t previous_hash = -1; + TupleRow* previous_row = nullptr; + while (_hash_tbl_iterator.has_next()) { + if (!_hash_tbl_iterator.matched()) { + if (previous_hash != _hash_tbl_iterator.get_hash() || + !equals(previous_row, _hash_tbl_iterator.get_row())) { + int row_idx = out_batch->add_row(); + TupleRow* out_row = out_batch->get_row(row_idx); + uint8_t* out_ptr = reinterpret_cast(out_row); + memcpy(out_ptr, _hash_tbl_iterator.get_row(), _build_tuple_row_size); + out_batch->commit_last_row(); + ++_num_rows_returned; + } + } + previous_hash = _hash_tbl_iterator.get_hash(); + previous_row = _hash_tbl_iterator.get_row(); + _hash_tbl_iterator.next(); + + *eos = !_hash_tbl_iterator.has_next() || reached_limit(); + if (out_batch->is_full() || out_batch->at_resource_limit() || *eos) { + return Status::OK(); + } + } + return Status::OK(); +} + +bool ExceptNode::equals(TupleRow* row, TupleRow* other) { + DCHECK(!(row == nullptr && other == nullptr)); + if (row == nullptr || other == nullptr) { + return false; + } + for (int i = 0; i < _child_expr_lists[0].size(); ++i) { + void* val_row = _child_expr_lists[0][i]->get_value(row); + void* val_other = _child_expr_lists[0][i]->get_value(other); + if (val_row == nullptr && val_other == nullptr) { + continue; + } else if (val_row == nullptr || val_other == nullptr) { + return false; + } else if (!RawValue::eq(val_row, val_other, _child_expr_lists[0][i]->root()->type())) { + return false; + } + } + + return true; +} + +} // namespace doris diff --git a/be/src/exec/except_node.h b/be/src/exec/except_node.h index 58ce0641dc..17802cf796 100644 --- a/be/src/exec/except_node.h +++ b/be/src/exec/except_node.h @@ -15,21 +15,23 @@ // specific language governing permissions and limitations // under the License. -#ifndef DORIS_BE_SRC_QUERY_EXEC_EXCEPT_NODE_H -#define DORIS_BE_SRC_QUERY_EXEC_EXCEPT_NODE_H +#ifndef DORIS_BE_SRC_QUERY_EXEC_EXCEPT_NODE_H +#define DORIS_BE_SRC_QUERY_EXEC_EXCEPT_NODE_H #include "exec/exec_node.h" -#include "runtime/row_batch.h" -#include "runtime/runtime_state.h" +#include "exec/hash_table.h" namespace doris { +class MemPool; +class RowBatch; +class TupleRow; + // Node that calulate the except results of its children by either materializing their // evaluated expressions into row batches or passing through (forwarding) the // batches if the input tuple layout is identical to the output tuple layout -// and expressions don't need to be evaluated. The children should be ordered -// such that all passthrough children come before the children that need -// materialization. The except node pulls from its children sequentially, i.e. +// and expressions don't need to be evaluated. The except node pulls from its +// children sequentially, i.e. // it exhausts one child completely before moving on to the next one. class ExceptNode : public ExecNode { public: @@ -37,60 +39,31 @@ public: virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr); virtual Status prepare(RuntimeState* state); - virtual void codegen(RuntimeState* state); virtual Status open(RuntimeState* state); virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos); - // virtual Status reset(RuntimeState* state); virtual Status close(RuntimeState* state); private: - /// Tuple id resolved in Prepare() to set tuple_desc_; - const int _tuple_id; + // Returns true if the values of row and other are equal + bool equals(TupleRow* row, TupleRow* other); - /// Descriptor for tuples this union node constructs. - const TupleDescriptor* _tuple_desc; - - /// Index of the first non-passthrough child; i.e. a child that needs materialization. - /// 0 when all children are materialized, '_children.size()' when no children are - /// materialized. - const int _first_materialized_child_idx; - - /// Const exprs materialized by this node. These exprs don't refer to any children. - /// Only materialized by the first fragment instance to avoid duplication. - std::vector> _const_expr_lists; - - /// Exprs materialized by this node. The i-th result expr list refers to the i-th child. + // Exprs materialized by this node. The i-th result expr list refers to the i-th child. std::vector> _child_expr_lists; - ///////////////////////////////////////// - /// BEGIN: Members that must be Reset() + std::unique_ptr _hash_tbl; + HashTable::Iterator _hash_tbl_iterator; + std::unique_ptr _probe_batch; - /// Index of current child. - int _child_idx; + // holds everything referenced in _hash_tbl + std::unique_ptr _build_pool; - /// Current row batch of current child. We reset the pointer to a new RowBatch - /// when switching to a different child. - std::unique_ptr _child_batch; + std::vector _build_tuple_idx; + int _build_tuple_size; + int _build_tuple_row_size; + std::vector _find_nulls; - /// Index of current row in child_row_batch_. - int _child_row_idx; - - typedef void (*ExceptMaterializeBatchFn)(ExceptNode*, RowBatch*, uint8_t**); - /// Vector of pointers to codegen'ed materialize_batch functions. The vector contains one - /// function for each child. The size of the vector should be equal to the number of - /// children. If a child is passthrough, there should be a NULL for that child. If - /// Codegen is disabled, there should be a NULL for every child. - std::vector _codegend_except_materialize_batch_fns; - - /// Saved from the last to GetNext() on the current child. - bool _child_eos; - - /// Index of current const result expr list. - int _const_expr_list_idx; - - /// Index of the child that needs to be closed on the next GetNext() call. Should be set - /// to -1 if no child needs to be closed. - int _to_close_child_idx; + RuntimeProfile::Counter* _build_timer; // time to build hash table + RuntimeProfile::Counter* _probe_timer; // time to probe }; }; // namespace doris diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index bc67a9d440..f624aec546 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -446,9 +446,9 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN *node = pool->add(new IntersectNode(pool, tnode, descs)); return Status::OK(); - // case TPlanNodeType::EXCEPT_NODE: - // *node = pool->add(new ExceptNode(pool, tnode, descs)); - // return Status::OK(); + case TPlanNodeType::EXCEPT_NODE: + *node = pool->add(new ExceptNode(pool, tnode, descs)); + return Status::OK(); case TPlanNodeType::BROKER_SCAN_NODE: *node = pool->add(new BrokerScanNode(pool, tnode, descs)); diff --git a/be/src/exec/hash_table.h b/be/src/exec/hash_table.h index 3d4b502975..06544b34d0 100644 --- a/be/src/exec/hash_table.h +++ b/be/src/exec/hash_table.h @@ -113,6 +113,14 @@ public: insert_impl(row); } + // Insert row into the hash table. if the row is alread exist will not insert + void IR_ALWAYS_INLINE insert_unique(TupleRow* row) { + if (find(row, false) == end()) { + insert(row); + } + } + + // Returns the start iterator for all rows that match 'probe_row'. 'probe_row' is // evaluated with _probe_expr_ctxs. The iterator can be iterated until HashTable::end() // to find all the matching rows. @@ -122,7 +130,7 @@ public: // Advancing the returned iterator will go to the next matching row. The matching // rows are evaluated lazily (i.e. computed as the Iterator is moved). // Returns HashTable::end() if there is no match. - Iterator IR_ALWAYS_INLINE find(TupleRow* probe_row); + Iterator IR_ALWAYS_INLINE find(TupleRow* probe_row, bool probe = true); // Returns number of elements in the hash table int64_t size() { @@ -197,6 +205,11 @@ public: return _table->get_node(_node_idx)->data(); } + // Returns Hash + uint32_t get_hash() { + return _table->get_node(_node_idx)->_hash; + } + // Returns if the iterator is at the end bool has_next() { return _node_idx != -1; diff --git a/be/src/exec/hash_table.hpp b/be/src/exec/hash_table.hpp index e613ac3971..70f0e38f7f 100644 --- a/be/src/exec/hash_table.hpp +++ b/be/src/exec/hash_table.hpp @@ -22,8 +22,8 @@ namespace doris { -inline HashTable::Iterator HashTable::find(TupleRow* probe_row) { - bool has_nulls = eval_probe_row(probe_row); +inline HashTable::Iterator HashTable::find(TupleRow* probe_row, bool probe) { + bool has_nulls = probe ? eval_probe_row(probe_row) : eval_build_row(probe_row); if (!_stores_nulls && has_nulls) { return end(); diff --git a/be/src/exec/intersect_node.cpp b/be/src/exec/intersect_node.cpp index ca590c1ef7..b694a4ee00 100755 --- a/be/src/exec/intersect_node.cpp +++ b/be/src/exec/intersect_node.cpp @@ -44,6 +44,8 @@ Status IntersectNode::init(const TPlanNode& tnode, RuntimeState* state) { Status IntersectNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::prepare(state)); _build_pool.reset(new MemPool(mem_tracker())); + _build_timer = ADD_TIMER(runtime_profile(), "BuildTime"); + _probe_timer = ADD_TIMER(runtime_profile(), "ProbeTime"); SCOPED_TIMER(_runtime_profile->total_time_counter()); for (size_t i = 0; i < _child_expr_lists.size(); ++i) { RETURN_IF_ERROR(Expr::prepare(_child_expr_lists[i], state, child(i)->row_desc(), @@ -57,7 +59,7 @@ Status IntersectNode::prepare(RuntimeState* state) { TupleDescriptor* build_tuple_desc = child(0)->row_desc().tuple_descriptors()[i]; _build_tuple_idx.push_back(_row_descriptor.get_tuple_idx(build_tuple_desc->id())); } - _find_nulls = std::vector(_child_expr_lists.size(), true); + _find_nulls = std::vector(_build_tuple_size, true); return Status::OK(); } @@ -108,13 +110,14 @@ Status IntersectNode::open(RuntimeState* state) { RETURN_IF_ERROR(child(0)->open(state)); bool eos = false; while (!eos) { + SCOPED_TIMER(_build_timer); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(child(0)->get_next(state, &build_batch, &eos)); // take ownership of tuple data of build_batch _build_pool->acquire_data(build_batch.tuple_data_pool(), false); RETURN_IF_LIMIT_EXCEEDED(state, " Intersect, while constructing the hash table."); for (int i = 0; i < build_batch.num_rows(); ++i) { - _hash_tbl->insert(build_batch.get_row(i)); + _hash_tbl->insert_unique(build_batch.get_row(i)); } VLOG_ROW << "hash table content: " << _hash_tbl->debug_string(true, &child(0)->row_desc()); build_batch.reset(); @@ -126,24 +129,8 @@ Status IntersectNode::open(RuntimeState* state) { } for (int i = 1; i < _children.size(); ++i) { - // probe - _probe_batch.reset(new RowBatch(child(i)->row_desc(), state->batch_size(), mem_tracker())); - RETURN_IF_ERROR(child(i)->open(state)); - eos = false; - while (!eos) { - RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(), &eos)); - RETURN_IF_LIMIT_EXCEEDED(state, " Intersect , while probing the hash table."); - for (int j = 0; j < _probe_batch->num_rows(); ++j) { - _hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j)); - if (_hash_tbl_iterator != _hash_tbl->end()) { - _hash_tbl_iterator.set_matched(); - } - } - _probe_batch->reset(); - } - // rebuid hash table - if (i != _children.size() - 1) { + if (i > 1) { + SCOPED_TIMER(_build_timer); std::unique_ptr temp_tbl( new HashTable(_child_expr_lists[0], _child_expr_lists[i], _build_tuple_size, true, _find_nulls, id(), mem_tracker(), 1024)); @@ -163,6 +150,23 @@ Status IntersectNode::open(RuntimeState* state) { break; } } + // probe + _probe_batch.reset(new RowBatch(child(i)->row_desc(), state->batch_size(), mem_tracker())); + ScopedTimer probe_timer(_probe_timer); + RETURN_IF_ERROR(child(i)->open(state)); + eos = false; + while (!eos) { + RETURN_IF_CANCELLED(state); + RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(), &eos)); + RETURN_IF_LIMIT_EXCEEDED(state, " Intersect , while probing the hash table."); + for (int j = 0; j < _probe_batch->num_rows(); ++j) { + _hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j)); + if (_hash_tbl_iterator != _hash_tbl->end()) { + _hash_tbl_iterator.set_matched(); + } + } + _probe_batch->reset(); + } } _hash_tbl_iterator = _hash_tbl->begin(); return Status::OK(); @@ -183,11 +187,12 @@ Status IntersectNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* e uint8_t* out_ptr = reinterpret_cast(out_row); memcpy(out_ptr, _hash_tbl_iterator.get_row(), _build_tuple_row_size); out_batch->commit_last_row(); + ++_num_rows_returned; } _hash_tbl_iterator.next(); - *eos = !_hash_tbl_iterator.has_next(); - if (out_batch->is_full() || out_batch->at_resource_limit()) { + *eos = !_hash_tbl_iterator.has_next() || reached_limit(); + if (out_batch->is_full() || out_batch->at_resource_limit() || *eos) { return Status::OK(); } } diff --git a/be/src/exec/intersect_node.h b/be/src/exec/intersect_node.h index 249be5d4b4..5e1f021cc5 100644 --- a/be/src/exec/intersect_node.h +++ b/be/src/exec/intersect_node.h @@ -58,6 +58,9 @@ private: int _build_tuple_size; int _build_tuple_row_size; std::vector _find_nulls; + + RuntimeProfile::Counter* _build_timer; // time to build hash table + RuntimeProfile::Counter* _probe_timer; // time to probe }; }; // namespace doris diff --git a/fe/src/main/java/org/apache/doris/analysis/SetOperationStmt.java b/fe/src/main/java/org/apache/doris/analysis/SetOperationStmt.java index 1a24fcad60..75237d2b8c 100644 --- a/fe/src/main/java/org/apache/doris/analysis/SetOperationStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/SetOperationStmt.java @@ -29,6 +29,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -474,7 +475,6 @@ public class SetOperationStmt extends QueryStmt { baseTblResultExprs = resultExprs; } - /** * Marks the baseTblResultExprs of its operands as materialized, based on * which of the output slots have been marked. @@ -649,6 +649,29 @@ public class SetOperationStmt extends QueryStmt { if (isAnalyzed()) { return; } + if (queryStmt instanceof SelectStmt && ((SelectStmt) queryStmt).fromClause_.isEmpty()) { + // rewrite select 1 to select * from (select 1) __DORIS_DUAL__ , because when using select 1 it will be + // transformed to a union node, select 1 is a literal, it doesn't have a tuple but will produce a slot, + // this will cause be core dump + QueryStmt inlineQuery = queryStmt.clone(); + Map map = new HashMap<>(); + // rename select 2,2 to select 2 as 2_1, 2 as 2_2 to avoid duplicated column in inline view + for (int i = 0; i < ((SelectStmt) inlineQuery).selectList.getItems().size(); ++i) { + SelectListItem item = ((SelectStmt) inlineQuery).selectList.getItems().get(i); + String col = item.toColumnLabel(); + Integer count = map.get(col); + count = (count == null) ? 1 : count + 1; + map.put(col, count); + if (count > 1) { + ((SelectStmt) inlineQuery).selectList.getItems() + .set(i, new SelectListItem(item.getExpr(), col + "_" + count.toString())); + } + } + ((SelectStmt) queryStmt).fromClause_.add(new InlineViewRef("__DORIS_DUAL__", inlineQuery)); + List slist = ((SelectStmt) queryStmt).selectList.getItems(); + slist.clear(); + slist.add(SelectListItem.createStarItem(null)); + } // Oracle and ms-SQLServer do not support INTERSECT ALL and EXCEPT ALL, postgres support it, // but it is very ambiguous if (qualifier_ == Qualifier.ALL && (operation == Operation.EXCEPT || operation == Operation.INTERSECT)) { diff --git a/fe/src/main/java/org/apache/doris/planner/SetOperationNode.java b/fe/src/main/java/org/apache/doris/planner/SetOperationNode.java index fb0f6c7394..9d2b959daf 100644 --- a/fe/src/main/java/org/apache/doris/planner/SetOperationNode.java +++ b/fe/src/main/java/org/apache/doris/planner/SetOperationNode.java @@ -17,17 +17,17 @@ package org.apache.doris.planner; -import java.util.ArrayList; -import java.util.List; -import java.util.stream.Collectors; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import org.apache.commons.collections.CollectionUtils; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.Expr; -import org.apache.doris.analysis.TupleDescriptor; -import org.apache.doris.analysis.TupleId; import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.analysis.TupleId; import org.apache.doris.thrift.TExceptNode; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TExpr; @@ -38,9 +38,9 @@ import org.apache.doris.thrift.TUnionNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; /** * Node that merges the results of its child plans, Normally, this is done by @@ -266,10 +266,10 @@ public abstract class SetOperationNode extends PlanNode { Preconditions.checkState(conjuncts.isEmpty()); computeMemLayout(analyzer); computeStats(analyzer); + // except Node must not reorder the child if (!(this instanceof ExceptNode)) { computePassthrough(analyzer); } - // drop resultExprs/constExprs that aren't getting materialized (= where the // corresponding output slot isn't being materialized) materializedResultExprLists_.clear(); diff --git a/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index 6a6d02cdbd..4f487867ac 100644 --- a/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -1533,7 +1533,6 @@ public class SingleNodePlanner { throws UserException, AnalysisException { SetOperationNode setOpNode; SetOperationStmt.Operation operation = null; - boolean hasConst = false; for (SetOperationStmt.SetOperand setOperand : setOperands) { if (setOperand.getOperation() != null) { if (operation == null) { @@ -1542,17 +1541,6 @@ public class SingleNodePlanner { Preconditions.checkState(operation == setOperand.getOperation(), "can not support mixed set " + "operations at here"); } - if (setOperand.getQueryStmt() instanceof SelectStmt) { - SelectStmt selectStmt = (SelectStmt) setOperand.getQueryStmt(); - if (selectStmt.getTableRefs().isEmpty()) { - hasConst = true; - } - } - } - if (hasConst && !((SelectStmt) setOperands.get(0).getQueryStmt()).getTableRefs().isEmpty() - && operation != SetOperationStmt.Operation.UNION) { - // TODO(yangzhengguo) fix be will core when const exprs not at first - throw new AnalysisException("not supported const expr in INTERSECT or EXCEPT"); } switch (operation) { case UNION: