From 047b83b987f2a997ebcf3f6346ba7c8d042c0d23 Mon Sep 17 00:00:00 2001 From: HappenLee Date: Fri, 12 Nov 2021 10:39:59 +0800 Subject: [PATCH] [Optimize][Set Operation Node] Reduce the memory expansion operation of the hash table in ExceptNode and IntersectNode (#6915) Reduce the memory expansion operation of the hash table in ExceptNode and IntersectNode --- be/src/exec/except_node.cpp | 27 ++++++--------------- be/src/exec/hash_table.cpp | 2 -- be/src/exec/hash_table.h | 16 +++++++++--- be/src/exec/intersect_node.cpp | 32 ++++++------------------ be/src/exec/set_operation_node.cpp | 4 ++- be/src/exec/set_operation_node.h | 39 ++++++++++++++++++++++++++++++ 6 files changed, 68 insertions(+), 52 deletions(-) diff --git a/be/src/exec/except_node.cpp b/be/src/exec/except_node.cpp index 7ac5cd4b2b..8229b73e53 100644 --- a/be/src/exec/except_node.cpp +++ b/be/src/exec/except_node.cpp @@ -41,33 +41,17 @@ Status ExceptNode::init(const TPlanNode& tnode, RuntimeState* state) { Status ExceptNode::open(RuntimeState* state) { RETURN_IF_ERROR(SetOperationNode::open(state)); // if a table is empty, the result must be empty - if (_hash_tbl->size() == 0) { _hash_tbl_iterator = _hash_tbl->begin(); return Status::OK(); } bool eos = false; + _valid_element_in_hash_tbl = _hash_tbl->num_filled_buckets(); for (int i = 1; i < _children.size(); ++i) { // rebuild hash table, for first time will rebuild with the no duplicated _hash_tbl, - if (i > 1) { - SCOPED_TIMER(_build_timer); - std::unique_ptr<HashTable> temp_tbl( - new HashTable(_child_expr_lists[0], _child_expr_lists[i], _build_tuple_size, - true, _find_nulls, id(), mem_tracker(), 1024)); - _hash_tbl_iterator = _hash_tbl->begin(); - while (_hash_tbl_iterator.has_next()) { - if (!_hash_tbl_iterator.matched()) { - VLOG_ROW << "rebuild row: " - << get_row_output_string(_hash_tbl_iterator.get_row(), - child(0)->row_desc()); - temp_tbl->insert(_hash_tbl_iterator.get_row()); - } - _hash_tbl_iterator.next(); - } - _hash_tbl.swap(temp_tbl); -
temp_tbl->close(); - } + if (i > 1) { refresh_hash_table<false>(i); } + // probe _probe_batch.reset( new RowBatch(child(i)->row_desc(), state->batch_size(), mem_tracker().get())); @@ -83,7 +67,10 @@ Status ExceptNode::open(RuntimeState* state) { << get_row_output_string(_probe_batch->get_row(j), child(i)->row_desc()); _hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j)); if (_hash_tbl_iterator != _hash_tbl->end()) { - _hash_tbl_iterator.set_matched(); + if (!_hash_tbl_iterator.matched()) { + _hash_tbl_iterator.set_matched(); + _valid_element_in_hash_tbl--; + } VLOG_ROW << "probe matched: " << get_row_output_string(_hash_tbl_iterator.get_row(), child(0)->row_desc()); diff --git a/be/src/exec/hash_table.cpp b/be/src/exec/hash_table.cpp index a78607ae36..867f513316 100644 --- a/be/src/exec/hash_table.cpp +++ b/be/src/exec/hash_table.cpp @@ -27,8 +27,6 @@ namespace doris { -const float HashTable::MAX_BUCKET_OCCUPANCY_FRACTION = 0.75f; - HashTable::HashTable(const std::vector& build_expr_ctxs, const std::vector& probe_expr_ctxs, int num_build_tuples, bool stores_nulls, const std::vector& finds_nulls, int32_t initial_seed, diff --git a/be/src/exec/hash_table.h b/be/src/exec/hash_table.h index 0560fc8a56..ee74d8f66f 100644 --- a/be/src/exec/hash_table.h +++ b/be/src/exec/hash_table.h @@ -137,6 +137,14 @@ public: // Returns the number of buckets int64_t num_buckets() { return _buckets.size(); } + // Returns the number of filled buckets + int64_t num_filled_buckets() { return _num_filled_buckets; } + + // Check whether the hash table should be shrunk + bool should_be_shrink(int64_t valid_row) { + return valid_row < MAX_BUCKET_OCCUPANCY_FRACTION * (_buckets.size() / 2.0); + } + // true if any of the MemTrackers was exceeded bool exceeded_limit() const { return _exceeded_limit; } @@ -174,6 +182,10 @@ public: inline std::pair minmax_node(); + // Load factor that will trigger growing the hash table on insert.
This is + // defined as the number of non-empty buckets / total_buckets + static constexpr float MAX_BUCKET_OCCUPANCY_FRACTION = 0.75f; + // stl-like iterator interface. class Iterator { public: @@ -348,10 +360,6 @@ private: // brought us over the mem limit. void mem_limit_exceeded(int64_t allocation_size); - // Load factor that will trigger growing the hash table on insert. This is - // defined as the number of non-empty buckets / total_buckets - static const float MAX_BUCKET_OCCUPANCY_FRACTION; - const std::vector& _build_expr_ctxs; const std::vector& _probe_expr_ctxs; diff --git a/be/src/exec/intersect_node.cpp b/be/src/exec/intersect_node.cpp index df78798036..60481cce86 100644 --- a/be/src/exec/intersect_node.cpp +++ b/be/src/exec/intersect_node.cpp @@ -52,30 +52,9 @@ Status IntersectNode::open(RuntimeState* state) { bool eos = false; for (int i = 1; i < _children.size(); ++i) { - if (i > 1) { - SCOPED_TIMER(_build_timer); - std::unique_ptr<HashTable> temp_tbl( - new HashTable(_child_expr_lists[0], _child_expr_lists[i], _build_tuple_size, - true, _find_nulls, id(), mem_tracker(), 1024)); - _hash_tbl_iterator = _hash_tbl->begin(); - while (_hash_tbl_iterator.has_next()) { - if (_hash_tbl_iterator.matched()) { - VLOG_ROW << "rebuild row: " - << get_row_output_string(_hash_tbl_iterator.get_row(), - child(0)->row_desc()); - temp_tbl->insert(_hash_tbl_iterator.get_row()); - } - _hash_tbl_iterator.next(); - } - _hash_tbl.swap(temp_tbl); - temp_tbl->close(); - VLOG_ROW << "hash table content: " - << _hash_tbl->debug_string(true, &child(0)->row_desc()); - // if a table is empty, the result must be empty - if (_hash_tbl->size() == 0) { - break; - } - } + if (i > 1) { refresh_hash_table<true>(i); } + + _valid_element_in_hash_tbl = 0; // probe _probe_batch.reset( new RowBatch(child(i)->row_desc(), state->batch_size(), mem_tracker().get())); @@ -91,7 +70,10 @@ Status IntersectNode::open(RuntimeState* state) { << get_row_output_string(_probe_batch->get_row(j), child(i)->row_desc());
_hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j)); if (_hash_tbl_iterator != _hash_tbl->end()) { - _hash_tbl_iterator.set_matched(); + if (!_hash_tbl_iterator.matched()) { + _valid_element_in_hash_tbl++; + _hash_tbl_iterator.set_matched(); + } VLOG_ROW << "probe matched: " << get_row_output_string(_hash_tbl_iterator.get_row(), child(0)->row_desc()); diff --git a/be/src/exec/set_operation_node.cpp b/be/src/exec/set_operation_node.cpp index 7c63f1d17b..523efe0c86 100644 --- a/be/src/exec/set_operation_node.cpp +++ b/be/src/exec/set_operation_node.cpp @@ -27,7 +27,7 @@ namespace doris { SetOperationNode::SetOperationNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, int tuple_id) - : ExecNode(pool, tnode, descs), _tuple_id(tuple_id), _tuple_desc(nullptr) {} + : ExecNode(pool, tnode, descs), _tuple_id(tuple_id), _tuple_desc(nullptr), _valid_element_in_hash_tbl(0) {} Status SetOperationNode::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(ExecNode::init(tnode, state)); @@ -163,6 +163,8 @@ Status SetOperationNode::open(RuntimeState* state) { VLOG_ROW << "hash table content: " << _hash_tbl->debug_string(true, &child(0)->row_desc()); build_batch.reset(); } + return Status::OK(); } + } // namespace doris diff --git a/be/src/exec/set_operation_node.h b/be/src/exec/set_operation_node.h index 8ec585b324..40645b1110 100644 --- a/be/src/exec/set_operation_node.h +++ b/be/src/exec/set_operation_node.h @@ -49,6 +49,13 @@ protected: // Returns true if the values of row and other are equal bool equals(TupleRow* row, TupleRow* other); + template <bool keep_matched> + // Refresh the hash table and probe expr before we process the data of the next child + // TODO: Check whether the hash table should be shrunk to reduce how often a refresh is needed, + // but different children may have different probe exprs, which may cause wrong results, + // so we need to keep the probe exprs the same in FE to optimize this issue.
+ void refresh_hash_table(int child); + /// Tuple id resolved in Prepare() to set tuple_desc_; const int _tuple_id; /// Descriptor for tuples this union node constructs. @@ -58,6 +65,8 @@ protected: std::unique_ptr<HashTable> _hash_tbl; HashTable::Iterator _hash_tbl_iterator; + int64_t _valid_element_in_hash_tbl; + std::unique_ptr<RowBatch> _probe_batch; // holds everything referenced in _hash_tbl std::unique_ptr _build_pool; @@ -71,4 +80,34 @@ protected: RuntimeProfile::Counter* _probe_timer; // time to probe }; +template <bool keep_matched> +void SetOperationNode::refresh_hash_table(int child_id) { + SCOPED_TIMER(_build_timer); + std::unique_ptr<HashTable> temp_tbl( + new HashTable(_child_expr_lists[0], _child_expr_lists[child_id], _build_tuple_size, + true, _find_nulls, id(), mem_tracker(), + _valid_element_in_hash_tbl / HashTable::MAX_BUCKET_OCCUPANCY_FRACTION + 1)); + _hash_tbl_iterator = _hash_tbl->begin(); + while (_hash_tbl_iterator.has_next()) { + if constexpr (keep_matched) { + if (_hash_tbl_iterator.matched()) { + VLOG_ROW << "rebuild row: " + << get_row_output_string(_hash_tbl_iterator.get_row(), + child(0)->row_desc()); + temp_tbl->insert(_hash_tbl_iterator.get_row()); + } + } else { + if (!_hash_tbl_iterator.matched()) { + VLOG_ROW << "rebuild row: " + << get_row_output_string(_hash_tbl_iterator.get_row(), + child(0)->row_desc()); + temp_tbl->insert(_hash_tbl_iterator.get_row()); + } + } + _hash_tbl_iterator.next(); + } + _hash_tbl.swap(temp_tbl); + temp_tbl->close(); +} + }; // namespace doris \ No newline at end of file