diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index 2fd4aade3f..4eea7d4bfb 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -70,6 +70,7 @@ set(EXEC_FILES spill_sort_node.cc union_node.cpp union_node_ir.cpp + set_operation_node.cpp intersect_node.cpp except_node.cpp repeat_node.cpp diff --git a/be/src/exec/except_node.cpp b/be/src/exec/except_node.cpp index 6965bb20d3..406e9f1acf 100644 --- a/be/src/exec/except_node.cpp +++ b/be/src/exec/except_node.cpp @@ -19,19 +19,15 @@ #include "exec/hash_table.hpp" #include "exprs/expr.h" -#include "runtime/raw_value.h" #include "runtime/row_batch.h" #include "runtime/runtime_state.h" namespace doris { ExceptNode::ExceptNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) - : ExecNode(pool, tnode, descs) {} + : SetOperationNode(pool, tnode, descs, tnode.except_node.tuple_id) {} Status ExceptNode::init(const TPlanNode& tnode, RuntimeState* state) { - RETURN_IF_ERROR(ExecNode::init(tnode, state)); - DCHECK(tnode.__isset.except_node); - DCHECK_EQ(_conjunct_ctxs.size(), 0); - DCHECK_GE(_children.size(), 2); + RETURN_IF_ERROR(SetOperationNode::init(tnode, state)); // Create result_expr_ctx_lists_ from thrift exprs. auto& result_texpr_lists = tnode.except_node.result_expr_lists; for (auto& texprs : result_texpr_lists) { @@ -42,51 +38,6 @@ Status ExceptNode::init(const TPlanNode& tnode, RuntimeState* state) { return Status::OK(); } -Status ExceptNode::prepare(RuntimeState* state) { - RETURN_IF_ERROR(ExecNode::prepare(state)); - _build_timer = ADD_TIMER(runtime_profile(), "BuildTime"); - _probe_timer = ADD_TIMER(runtime_profile(), "ProbeTime"); - _build_pool.reset(new MemPool(mem_tracker())); - SCOPED_TIMER(_runtime_profile->total_time_counter()); - for (size_t i = 0; i < _child_expr_lists.size(); ++i) { - RETURN_IF_ERROR(Expr::prepare(_child_expr_lists[i], state, child(i)->row_desc(), - expr_mem_tracker())); - } - _build_tuple_size = child(0)->row_desc().tuple_descriptors().size(); - _build_tuple_row_size = _build_tuple_size * sizeof(Tuple*); - _build_tuple_idx.reserve(_build_tuple_size); - - for (int i = 0; i < _build_tuple_size; ++i) { - TupleDescriptor* build_tuple_desc = child(0)->row_desc().tuple_descriptors()[i]; - _build_tuple_idx.push_back(_row_descriptor.get_tuple_idx(build_tuple_desc->id())); - } - _find_nulls = std::vector(_build_tuple_size, true); - return Status::OK(); -} -Status ExceptNode::close(RuntimeState* state) { - if (is_closed()) { - return Status::OK(); - } - for (auto& exprs : _child_expr_lists) { - Expr::close(exprs, state); - } - - RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE)); - // Must reset _probe_batch in close() to release resources - _probe_batch.reset(NULL); - - if (_memory_used_counter != NULL && _hash_tbl.get() != NULL) { - COUNTER_UPDATE(_memory_used_counter, _build_pool->peak_allocated_bytes()); - COUNTER_UPDATE(_memory_used_counter, _hash_tbl->byte_size()); - } - if (_hash_tbl.get() != NULL) { - _hash_tbl->close(); - } - if (_build_pool.get() != NULL) { - _build_pool->free_all(); - } - return ExecNode::close(state); -} Status ExceptNode::open(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::open(state)); RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN)); @@ -110,8 +61,10 @@ Status ExceptNode::open(RuntimeState* state) { // take ownership of tuple data of build_batch _build_pool->acquire_data(build_batch.tuple_data_pool(), false); RETURN_IF_LIMIT_EXCEEDED(state, " Except, while constructing the hash table."); - // build hash table and remvoe duplicate items + // build hash table and remove duplicate items for (int i = 0; i < build_batch.num_rows(); ++i) { + VLOG_ROW << "build row: " + << get_row_output_string(build_batch.get_row(i), child(0)->row_desc()); _hash_tbl->insert_unique(build_batch.get_row(i)); } VLOG_ROW << "hash table content: " << _hash_tbl->debug_string(true, &child(0)->row_desc()); @@ -137,6 +90,9 @@ Status ExceptNode::open(RuntimeState* state) { if (previous_hash != _hash_tbl_iterator.get_hash()) { previous_hash = _hash_tbl_iterator.get_hash(); if (!_hash_tbl_iterator.matched()) { + VLOG_ROW << "rebuild row: " + << get_row_output_string(_hash_tbl_iterator.get_row(), + child(0)->row_desc()); temp_tbl->insert(_hash_tbl_iterator.get_row()); } } @@ -155,6 +111,8 @@ Status ExceptNode::open(RuntimeState* state) { RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(), &eos)); RETURN_IF_LIMIT_EXCEEDED(state, " Except , while probing the hash table."); for (int j = 0; j < _probe_batch->num_rows(); ++j) { + VLOG_ROW << "probe row: " + << get_row_output_string(_probe_batch->get_row(j), child(i)->row_desc()); _hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j)); if (_hash_tbl_iterator != _hash_tbl->end()) { _hash_tbl_iterator.set_matched(); @@ -179,24 +137,21 @@ Status ExceptNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eos) if (reached_limit()) { return Status::OK(); } - uint32_t previous_hash = -1; - TupleRow* previous_row = nullptr; + int64_t tuple_buf_size; + uint8_t* tuple_buf; + RETURN_IF_ERROR( + out_batch->resize_and_allocate_tuple_buffer(state, &tuple_buf_size, &tuple_buf)); + memset(tuple_buf, 0, tuple_buf_size); while (_hash_tbl_iterator.has_next()) { + VLOG_ROW << "find row: " + << get_row_output_string(_hash_tbl_iterator.get_row(), child(0)->row_desc()) + << " matched: " << _hash_tbl_iterator.matched(); if (!_hash_tbl_iterator.matched()) { - if (previous_hash != _hash_tbl_iterator.get_hash() || - !equals(previous_row, _hash_tbl_iterator.get_row())) { - int row_idx = out_batch->add_row(); - TupleRow* out_row = out_batch->get_row(row_idx); - uint8_t* out_ptr = reinterpret_cast(out_row); - memcpy(out_ptr, _hash_tbl_iterator.get_row(), _build_tuple_row_size); - out_batch->commit_last_row(); - ++_num_rows_returned; - } + create_output_row(_hash_tbl_iterator.get_row(), out_batch, tuple_buf); + tuple_buf += _tuple_desc->byte_size(); + ++_num_rows_returned; } - previous_hash = _hash_tbl_iterator.get_hash(); - previous_row = _hash_tbl_iterator.get_row(); _hash_tbl_iterator.next(); - *eos = !_hash_tbl_iterator.has_next() || reached_limit(); if (out_batch->is_full() || out_batch->at_resource_limit() || *eos) { return Status::OK(); @@ -205,24 +160,4 @@ Status ExceptNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eos) return Status::OK(); } -bool ExceptNode::equals(TupleRow* row, TupleRow* other) { - DCHECK(!(row == nullptr && other == nullptr)); - if (row == nullptr || other == nullptr) { - return false; - } - for (int i = 0; i < _child_expr_lists[0].size(); ++i) { - void* val_row = _child_expr_lists[0][i]->get_value(row); - void* val_other = _child_expr_lists[0][i]->get_value(other); - if (val_row == nullptr && val_other == nullptr) { - continue; - } else if (val_row == nullptr || val_other == nullptr) { - return false; - } else if (!RawValue::eq(val_row, val_other, _child_expr_lists[0][i]->root()->type())) { - return false; - } - } - - return true; -} - } // namespace doris diff --git a/be/src/exec/except_node.h b/be/src/exec/except_node.h index 17802cf796..30a987b25b 100644 --- a/be/src/exec/except_node.h +++ b/be/src/exec/except_node.h @@ -15,11 +15,9 @@ // specific language governing permissions and limitations // under the License. -#ifndef DORIS_BE_SRC_QUERY_EXEC_EXCEPT_NODE_H -#define DORIS_BE_SRC_QUERY_EXEC_EXCEPT_NODE_H +#pragma once -#include "exec/exec_node.h" -#include "exec/hash_table.h" +#include "exec/set_operation_node.h" namespace doris { @@ -33,39 +31,13 @@ class TupleRow; // and expressions don't need to be evaluated. The except node pulls from its // children sequentially, i.e. // it exhausts one child completely before moving on to the next one. -class ExceptNode : public ExecNode { +class ExceptNode : public SetOperationNode { public: ExceptNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr); - virtual Status prepare(RuntimeState* state); virtual Status open(RuntimeState* state); virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos); - virtual Status close(RuntimeState* state); - -private: - // Returns true if the values of row and other are equal - bool equals(TupleRow* row, TupleRow* other); - - // Exprs materialized by this node. The i-th result expr list refers to the i-th child. - std::vector> _child_expr_lists; - - std::unique_ptr _hash_tbl; - HashTable::Iterator _hash_tbl_iterator; - std::unique_ptr _probe_batch; - - // holds everything referenced in _hash_tbl - std::unique_ptr _build_pool; - - std::vector _build_tuple_idx; - int _build_tuple_size; - int _build_tuple_row_size; - std::vector _find_nulls; - - RuntimeProfile::Counter* _build_timer; // time to build hash table - RuntimeProfile::Counter* _probe_timer; // time to probe }; }; // namespace doris - -#endif // DORIS_BE_SRC_QUERY_EXEC_EXCEPT_NODE_H \ No newline at end of file diff --git a/be/src/exec/intersect_node.cpp b/be/src/exec/intersect_node.cpp index b694a4ee00..4131e6efc9 100755 --- a/be/src/exec/intersect_node.cpp +++ b/be/src/exec/intersect_node.cpp @@ -24,13 +24,10 @@ namespace doris { IntersectNode::IntersectNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) - : ExecNode(pool, tnode, descs) {} + : SetOperationNode(pool, tnode, descs, tnode.intersect_node.tuple_id) {} Status IntersectNode::init(const TPlanNode& tnode, RuntimeState* state) { - RETURN_IF_ERROR(ExecNode::init(tnode, state)); - DCHECK(tnode.__isset.intersect_node); - DCHECK_EQ(_conjunct_ctxs.size(), 0); - DCHECK_GE(_children.size(), 2); + RETURN_IF_ERROR(SetOperationNode::init(tnode, state)); // Create result_expr_ctx_lists_ from thrift exprs. auto& result_texpr_lists = tnode.intersect_node.result_expr_lists; for (auto& texprs : result_texpr_lists) { @@ -41,54 +38,6 @@ Status IntersectNode::init(const TPlanNode& tnode, RuntimeState* state) { return Status::OK(); } -Status IntersectNode::prepare(RuntimeState* state) { - RETURN_IF_ERROR(ExecNode::prepare(state)); - _build_pool.reset(new MemPool(mem_tracker())); - _build_timer = ADD_TIMER(runtime_profile(), "BuildTime"); - _probe_timer = ADD_TIMER(runtime_profile(), "ProbeTime"); - SCOPED_TIMER(_runtime_profile->total_time_counter()); - for (size_t i = 0; i < _child_expr_lists.size(); ++i) { - RETURN_IF_ERROR(Expr::prepare(_child_expr_lists[i], state, child(i)->row_desc(), - expr_mem_tracker())); - } - _build_tuple_size = child(0)->row_desc().tuple_descriptors().size(); - _build_tuple_row_size = _build_tuple_size * sizeof(Tuple*); - _build_tuple_idx.reserve(_build_tuple_size); - - for (int i = 0; i < _build_tuple_size; ++i) { - TupleDescriptor* build_tuple_desc = child(0)->row_desc().tuple_descriptors()[i]; - _build_tuple_idx.push_back(_row_descriptor.get_tuple_idx(build_tuple_desc->id())); - } - _find_nulls = std::vector(_build_tuple_size, true); - return Status::OK(); -} - -Status IntersectNode::close(RuntimeState* state) { - if (is_closed()) { - return Status::OK(); - } - for (auto& exprs : _child_expr_lists) { - Expr::close(exprs, state); - } - - RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE)); - // Must reset _probe_batch in close() to release resources - _probe_batch.reset(NULL); - - if (_memory_used_counter != NULL && _hash_tbl.get() != NULL) { - COUNTER_UPDATE(_memory_used_counter, _build_pool->peak_allocated_bytes()); - COUNTER_UPDATE(_memory_used_counter, _hash_tbl->byte_size()); - } - if (_hash_tbl.get() != NULL) { - _hash_tbl->close(); - } - if (_build_pool.get() != NULL) { - _build_pool->free_all(); - } - - return ExecNode::close(state); -} - // the actual intersect operation is in this function, // 1 build a hash table from child(0) // 2 probe with child(1), then filter the hash table and find the matched item, use them to rebuild a hash table @@ -117,6 +66,8 @@ Status IntersectNode::open(RuntimeState* state) { _build_pool->acquire_data(build_batch.tuple_data_pool(), false); RETURN_IF_LIMIT_EXCEEDED(state, " Intersect, while constructing the hash table."); for (int i = 0; i < build_batch.num_rows(); ++i) { + VLOG_ROW << "build row: " + << get_row_output_string(build_batch.get_row(i), child(0)->row_desc()); _hash_tbl->insert_unique(build_batch.get_row(i)); } VLOG_ROW << "hash table content: " << _hash_tbl->debug_string(true, &child(0)->row_desc()); @@ -137,6 +88,9 @@ Status IntersectNode::open(RuntimeState* state) { _hash_tbl_iterator = _hash_tbl->begin(); while (_hash_tbl_iterator.has_next()) { if (_hash_tbl_iterator.matched()) { + VLOG_ROW << "rebuild row: " + << get_row_output_string(_hash_tbl_iterator.get_row(), + child(0)->row_desc()); temp_tbl->insert(_hash_tbl_iterator.get_row()); } _hash_tbl_iterator.next(); @@ -160,6 +114,8 @@ Status IntersectNode::open(RuntimeState* state) { RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(), &eos)); RETURN_IF_LIMIT_EXCEEDED(state, " Intersect , while probing the hash table."); for (int j = 0; j < _probe_batch->num_rows(); ++j) { + VLOG_ROW << "probe row: " + << get_row_output_string(_probe_batch->get_row(j), child(i)->row_desc()); _hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j)); if (_hash_tbl_iterator != _hash_tbl->end()) { _hash_tbl_iterator.set_matched(); @@ -180,16 +136,20 @@ Status IntersectNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* e if (reached_limit()) { return Status::OK(); } + int64_t tuple_buf_size; + uint8_t* tuple_buf; + RETURN_IF_ERROR( + out_batch->resize_and_allocate_tuple_buffer(state, &tuple_buf_size, &tuple_buf)); + memset(tuple_buf, 0, tuple_buf_size); while (_hash_tbl_iterator.has_next()) { + VLOG_ROW << "find row: " + << get_row_output_string(_hash_tbl_iterator.get_row(), child(0)->row_desc()) + << " matched: " << _hash_tbl_iterator.matched(); if (_hash_tbl_iterator.matched()) { - int row_idx = out_batch->add_row(); - TupleRow* out_row = out_batch->get_row(row_idx); - uint8_t* out_ptr = reinterpret_cast(out_row); - memcpy(out_ptr, _hash_tbl_iterator.get_row(), _build_tuple_row_size); - out_batch->commit_last_row(); + create_output_row(_hash_tbl_iterator.get_row(), out_batch, tuple_buf); + tuple_buf += _tuple_desc->byte_size(); ++_num_rows_returned; } - _hash_tbl_iterator.next(); *eos = !_hash_tbl_iterator.has_next() || reached_limit(); if (out_batch->is_full() || out_batch->at_resource_limit() || *eos) { @@ -198,5 +158,4 @@ Status IntersectNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* e } return Status::OK(); } - } // namespace doris \ No newline at end of file diff --git a/be/src/exec/intersect_node.h b/be/src/exec/intersect_node.h index 5e1f021cc5..c729063036 100644 --- a/be/src/exec/intersect_node.h +++ b/be/src/exec/intersect_node.h @@ -15,11 +15,9 @@ // specific language governing permissions and limitations // under the License. -#ifndef DORIS_BE_SRC_QUERY_EXEC_INTERSECT_NODE_H -#define DORIS_BE_SRC_QUERY_EXEC_INTERSECT_NODE_H +#pragma once -#include "exec/exec_node.h" -#include "exec/hash_table.h" +#include "exec/set_operation_node.h" namespace doris { @@ -34,35 +32,13 @@ class TupleRow; // such that all passthrough children come before the children that need // materialization. The interscet node pulls from its children sequentially, i.e. // it exhausts one child completely before moving on to the next one. -class IntersectNode : public ExecNode { +class IntersectNode : public SetOperationNode { public: IntersectNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr); - virtual Status prepare(RuntimeState* state); virtual Status open(RuntimeState* state); virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos); - virtual Status close(RuntimeState* state); - -private: - // Exprs materialized by this node. The i-th result expr list refers to the i-th child. - std::vector> _child_expr_lists; - - std::unique_ptr _hash_tbl; - HashTable::Iterator _hash_tbl_iterator; - std::unique_ptr _probe_batch; - // holds everything referenced in _hash_tbl - std::unique_ptr _build_pool; - - std::vector _build_tuple_idx; - int _build_tuple_size; - int _build_tuple_row_size; - std::vector _find_nulls; - - RuntimeProfile::Counter* _build_timer; // time to build hash table - RuntimeProfile::Counter* _probe_timer; // time to probe }; }; // namespace doris - -#endif // DORIS_BE_SRC_QUERY_EXEC_INTERSECT_NODE_H \ No newline at end of file diff --git a/be/src/exec/set_operation_node.cpp b/be/src/exec/set_operation_node.cpp new file mode 100644 index 0000000000..d21f7436d0 --- /dev/null +++ b/be/src/exec/set_operation_node.cpp @@ -0,0 +1,136 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/set_operation_node.h" + +#include "exec/hash_table.hpp" +#include "exprs/expr.h" +#include "runtime/raw_value.h" +#include "runtime/row_batch.h" +#include "runtime/runtime_state.h" + +namespace doris { +SetOperationNode::SetOperationNode(ObjectPool* pool, const TPlanNode& tnode, + const DescriptorTbl& descs, int tuple_id) + : ExecNode(pool, tnode, descs), _tuple_id(tuple_id), _tuple_desc(nullptr) {} + +Status SetOperationNode::init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(ExecNode::init(tnode, state)); + DCHECK_EQ(_conjunct_ctxs.size(), 0); + DCHECK_GE(_children.size(), 2); + return Status::OK(); +} + +Status SetOperationNode::prepare(RuntimeState* state) { + RETURN_IF_ERROR(ExecNode::prepare(state)); + _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); + DCHECK(_tuple_desc != nullptr); + _build_pool.reset(new MemPool(mem_tracker())); + _build_timer = ADD_TIMER(runtime_profile(), "BuildTime"); + _probe_timer = ADD_TIMER(runtime_profile(), "ProbeTime"); + SCOPED_TIMER(_runtime_profile->total_time_counter()); + for (size_t i = 0; i < _child_expr_lists.size(); ++i) { + RETURN_IF_ERROR(Expr::prepare(_child_expr_lists[i], state, child(i)->row_desc(), + expr_mem_tracker())); + DCHECK_EQ(_child_expr_lists[i].size(), _tuple_desc->slots().size()); + } + _build_tuple_size = child(0)->row_desc().tuple_descriptors().size(); + _build_tuple_row_size = _build_tuple_size * sizeof(Tuple*); + _build_tuple_idx.reserve(_build_tuple_size); + + for (int i = 0; i < _build_tuple_size; ++i) { + TupleDescriptor* build_tuple_desc = child(0)->row_desc().tuple_descriptors()[i]; + _build_tuple_idx.push_back(_row_descriptor.get_tuple_idx(build_tuple_desc->id())); + } + _find_nulls = std::vector(); + for (auto ctx : _child_expr_lists[0]) { + _find_nulls.push_back(!ctx->root()->is_slotref() || ctx->is_nullable()); + } + return Status::OK(); +} + +Status SetOperationNode::close(RuntimeState* state) { + if (is_closed()) { + return Status::OK(); + } + for (auto& exprs : _child_expr_lists) { + Expr::close(exprs, state); + } + + RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE)); + // Must reset _probe_batch in close() to release resources + _probe_batch.reset(NULL); + + if (_memory_used_counter != NULL && _hash_tbl.get() != NULL) { + COUNTER_UPDATE(_memory_used_counter, _build_pool->peak_allocated_bytes()); + COUNTER_UPDATE(_memory_used_counter, _hash_tbl->byte_size()); + } + if (_hash_tbl.get() != NULL) { + _hash_tbl->close(); + } + if (_build_pool.get() != NULL) { + _build_pool->free_all(); + } + + return ExecNode::close(state); +} + +string SetOperationNode::get_row_output_string(TupleRow* row, const RowDescriptor& row_desc) { + std::stringstream out; + out << "["; + for (int i = 0; i < row_desc.tuple_descriptors().size(); ++i) { + if (i != 0) { + out << " "; + } + out << Tuple::to_string(row->get_tuple(i), *row_desc.tuple_descriptors()[i]); + } + + out << "]"; + return out.str(); +} + +void SetOperationNode::create_output_row(TupleRow* input_row, RowBatch* row_batch, + uint8_t* tuple_buf) { + TupleRow* output_row = row_batch->get_row(row_batch->add_row()); + Tuple* dst_tuple = reinterpret_cast(tuple_buf); + const std::vector& exprs = _child_expr_lists[0]; + dst_tuple->materialize_exprs(input_row, *_tuple_desc, exprs, + row_batch->tuple_data_pool(), nullptr, nullptr); + output_row->set_tuple(0, dst_tuple); + row_batch->commit_last_row(); + VLOG_ROW << "commit row: " << get_row_output_string(output_row, row_desc()); +} + +bool SetOperationNode::equals(TupleRow* row, TupleRow* other) { + DCHECK(!(row == nullptr && other == nullptr)); + if (row == nullptr || other == nullptr) { + return false; + } + for (int i = 0; i < _child_expr_lists[0].size(); ++i) { + void* val_row = _child_expr_lists[0][i]->get_value(row); + void* val_other = _child_expr_lists[0][i]->get_value(other); + if (_find_nulls[i] && val_row == nullptr && val_other == nullptr) { + continue; + } else if (val_row == nullptr || val_other == nullptr) { + return false; + } else if (!RawValue::eq(val_row, val_other, _child_expr_lists[0][i]->root()->type())) { + return false; + } + } + return true; +} +} // namespace doris diff --git a/be/src/exec/set_operation_node.h b/be/src/exec/set_operation_node.h new file mode 100644 index 0000000000..6b12bf2e45 --- /dev/null +++ b/be/src/exec/set_operation_node.h @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "exec/exec_node.h" +#include "exec/hash_table.h" + +namespace doris { + +class MemPool; +class RowBatch; +class TupleRow; + +// Node that calulate the set operation results of its children by either materializing their +// evaluated expressions into row batches or passing through (forwarding) the +// batches if the input tuple layout is identical to the output tuple layout +// and expressions don't need to be evaluated. The children should be ordered +// such that all passthrough children come before the children that need +// materialization. The set operation node pulls from its children sequentially, i.e. +// it exhausts one child completely before moving on to the next one. +class SetOperationNode : public ExecNode { +public: + SetOperationNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, + int tuple_id); + + virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr); + virtual Status prepare(RuntimeState* state); + virtual Status close(RuntimeState* state); + +protected: + std::string get_row_output_string(TupleRow* row, const RowDescriptor& row_desc); + void create_output_row(TupleRow* input_row, RowBatch* row_batch, uint8_t* tuple_buf); + // Returns true if the values of row and other are equal + bool equals(TupleRow* row, TupleRow* other); + + /// Tuple id resolved in Prepare() to set tuple_desc_; + const int _tuple_id; + /// Descriptor for tuples this union node constructs. + const TupleDescriptor* _tuple_desc; + // Exprs materialized by this node. The i-th result expr list refers to the i-th child. + std::vector> _child_expr_lists; + + std::unique_ptr _hash_tbl; + HashTable::Iterator _hash_tbl_iterator; + std::unique_ptr _probe_batch; + // holds everything referenced in _hash_tbl + std::unique_ptr _build_pool; + + std::vector _build_tuple_idx; + int _build_tuple_size; + int _build_tuple_row_size; + std::vector _find_nulls; + + RuntimeProfile::Counter* _build_timer; // time to build hash table + RuntimeProfile::Counter* _probe_timer; // time to probe +}; + +}; // namespace doris \ No newline at end of file