diff --git a/be/src/exec/except_node.cpp b/be/src/exec/except_node.cpp index 406e9f1acf..e6de96189a 100644 --- a/be/src/exec/except_node.cpp +++ b/be/src/exec/except_node.cpp @@ -39,43 +39,14 @@ Status ExceptNode::init(const TPlanNode& tnode, RuntimeState* state) { } Status ExceptNode::open(RuntimeState* state) { - RETURN_IF_ERROR(ExecNode::open(state)); - RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN)); - SCOPED_TIMER(_runtime_profile->total_time_counter()); - RETURN_IF_CANCELLED(state); - // open result expr lists. - for (const vector& exprs : _child_expr_lists) { - RETURN_IF_ERROR(Expr::open(exprs, state)); - } - // initial build hash table, use _child_expr_lists[0] as probe is used for remove duplicted - _hash_tbl.reset(new HashTable(_child_expr_lists[0], _child_expr_lists[0], _build_tuple_size, - true, _find_nulls, id(), mem_tracker(), 1024)); - RowBatch build_batch(child(0)->row_desc(), state->batch_size(), mem_tracker()); - RETURN_IF_ERROR(child(0)->open(state)); - - bool eos = false; - while (!eos) { - SCOPED_TIMER(_build_timer); - RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR(child(0)->get_next(state, &build_batch, &eos)); - // take ownership of tuple data of build_batch - _build_pool->acquire_data(build_batch.tuple_data_pool(), false); - RETURN_IF_LIMIT_EXCEEDED(state, " Except, while constructing the hash table."); - // build hash table and remove duplicate items - for (int i = 0; i < build_batch.num_rows(); ++i) { - VLOG_ROW << "build row: " - << get_row_output_string(build_batch.get_row(i), child(0)->row_desc()); - _hash_tbl->insert_unique(build_batch.get_row(i)); - } - VLOG_ROW << "hash table content: " << _hash_tbl->debug_string(true, &child(0)->row_desc()); - build_batch.reset(); - } + RETURN_IF_ERROR(SetOperationNode::open(state)); // if a table is empty, the result must be empty if (_hash_tbl->size() == 0) { _hash_tbl_iterator = _hash_tbl->begin(); return Status::OK(); } + bool eos = false; for (int i = 1; i < _children.size(); ++i) { // rebuid hash table, for first time will rebuild with the no duplicated _hash_tbl, @@ -116,6 +87,8 @@ Status ExceptNode::open(RuntimeState* state) { _hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j)); if (_hash_tbl_iterator != _hash_tbl->end()) { _hash_tbl_iterator.set_matched(); + VLOG_ROW << "probe matched: " + << get_row_output_string(_hash_tbl_iterator.get_row(), child(0)->row_desc()); } } _probe_batch->reset(); diff --git a/be/src/exec/intersect_node.cpp b/be/src/exec/intersect_node.cpp index 4131e6efc9..39bd10cfca 100755 --- a/be/src/exec/intersect_node.cpp +++ b/be/src/exec/intersect_node.cpp @@ -43,41 +43,13 @@ Status IntersectNode::init(const TPlanNode& tnode, RuntimeState* state) { // 2 probe with child(1), then filter the hash table and find the matched item, use them to rebuild a hash table // repeat [2] this for all the rest child Status IntersectNode::open(RuntimeState* state) { - RETURN_IF_ERROR(ExecNode::open(state)); - RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN)); - SCOPED_TIMER(_runtime_profile->total_time_counter()); - RETURN_IF_CANCELLED(state); - // open result expr lists. - for (const vector& exprs : _child_expr_lists) { - RETURN_IF_ERROR(Expr::open(exprs, state)); - } - - // initial build hash table - _hash_tbl.reset(new HashTable(_child_expr_lists[0], _child_expr_lists[1], _build_tuple_size, - true, _find_nulls, id(), mem_tracker(), 1024)); - RowBatch build_batch(child(0)->row_desc(), state->batch_size(), mem_tracker()); - RETURN_IF_ERROR(child(0)->open(state)); - bool eos = false; - while (!eos) { - SCOPED_TIMER(_build_timer); - RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR(child(0)->get_next(state, &build_batch, &eos)); - // take ownership of tuple data of build_batch - _build_pool->acquire_data(build_batch.tuple_data_pool(), false); - RETURN_IF_LIMIT_EXCEEDED(state, " Intersect, while constructing the hash table."); - for (int i = 0; i < build_batch.num_rows(); ++i) { - VLOG_ROW << "build row: " - << get_row_output_string(build_batch.get_row(i), child(0)->row_desc()); - _hash_tbl->insert_unique(build_batch.get_row(i)); - } - VLOG_ROW << "hash table content: " << _hash_tbl->debug_string(true, &child(0)->row_desc()); - build_batch.reset(); - } + RETURN_IF_ERROR(SetOperationNode::open(state)); // if a table is empty, the result must be empty if (_hash_tbl->size() == 0) { _hash_tbl_iterator = _hash_tbl->begin(); return Status::OK(); } + bool eos = false; for (int i = 1; i < _children.size(); ++i) { if (i > 1) { @@ -119,6 +91,8 @@ Status IntersectNode::open(RuntimeState* state) { _hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j)); if (_hash_tbl_iterator != _hash_tbl->end()) { _hash_tbl_iterator.set_matched(); + VLOG_ROW << "probe matched: " + << get_row_output_string(_hash_tbl_iterator.get_row(), child(0)->row_desc()); } } _probe_batch->reset(); diff --git a/be/src/exec/set_operation_node.cpp b/be/src/exec/set_operation_node.cpp index d21f7436d0..8321899212 100644 --- a/be/src/exec/set_operation_node.cpp +++ b/be/src/exec/set_operation_node.cpp @@ -133,4 +133,39 @@ bool SetOperationNode::equals(TupleRow* row, TupleRow* other) { } return true; } + +Status SetOperationNode::open(RuntimeState* state) { + RETURN_IF_ERROR(ExecNode::open(state)); + RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN)); + SCOPED_TIMER(_runtime_profile->total_time_counter()); + RETURN_IF_CANCELLED(state); + // open result expr lists. + for (const vector& exprs : _child_expr_lists) { + RETURN_IF_ERROR(Expr::open(exprs, state)); + } + // initial build hash table used for remove duplicted + _hash_tbl.reset(new HashTable(_child_expr_lists[0], _child_expr_lists[1], _build_tuple_size, + true, _find_nulls, id(), mem_tracker(), 1024)); + RowBatch build_batch(child(0)->row_desc(), state->batch_size(), mem_tracker()); + RETURN_IF_ERROR(child(0)->open(state)); + + bool eos = false; + while (!eos) { + SCOPED_TIMER(_build_timer); + RETURN_IF_CANCELLED(state); + RETURN_IF_ERROR(child(0)->get_next(state, &build_batch, &eos)); + // take ownership of tuple data of build_batch + _build_pool->acquire_data(build_batch.tuple_data_pool(), false); + RETURN_IF_LIMIT_EXCEEDED(state, " SetOperation, while constructing the hash table."); + // build hash table and remove duplicate items + for (int i = 0; i < build_batch.num_rows(); ++i) { + VLOG_ROW << "build row: " + << get_row_output_string(build_batch.get_row(i), child(0)->row_desc()); + _hash_tbl->insert_unique(build_batch.get_row(i)); + } + VLOG_ROW << "hash table content: " << _hash_tbl->debug_string(true, &child(0)->row_desc()); + build_batch.reset(); + } + return Status::OK(); +} } // namespace doris diff --git a/be/src/exec/set_operation_node.h b/be/src/exec/set_operation_node.h index 6b12bf2e45..38c5def2d0 100644 --- a/be/src/exec/set_operation_node.h +++ b/be/src/exec/set_operation_node.h @@ -41,6 +41,8 @@ public: virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr); virtual Status prepare(RuntimeState* state); virtual Status close(RuntimeState* state); + virtual Status open(RuntimeState* state); + protected: std::string get_row_output_string(TupleRow* row, const RowDescriptor& row_desc);