diff --git a/be/src/vec/exec/join/process_hash_table_probe.h b/be/src/vec/exec/join/process_hash_table_probe.h index 6b95b6e3e0..35b8aefb96 100644 --- a/be/src/vec/exec/join/process_hash_table_probe.h +++ b/be/src/vec/exec/join/process_hash_table_probe.h @@ -73,6 +73,9 @@ struct ProcessHashTableProbe { int right_col_len, UInt8* __restrict null_map_data, UInt8* __restrict filter_map, Block* output_block); + void _pre_serialize_key(const ColumnRawPtrs& key_columns, const size_t key_rows, + std::vector& serialized_keys); + // Process full outer join/ right join / right semi/anti join to output the join result // in hash table template @@ -93,6 +96,10 @@ struct ProcessHashTableProbe { // only need set the tuple is null in LEFT_OUTER_JOIN and FULL_OUTER_JOIN ColumnUInt8::Container* _tuple_is_null_right_flags; + size_t _serialized_key_buffer_size {0}; + uint8_t* _serialized_key_buffer; + std::unique_ptr _serialize_key_arena; + RuntimeProfile::Counter* _rows_returned_counter; RuntimeProfile::Counter* _search_hashtable_timer; RuntimeProfile::Counter* _build_side_output_timer; diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h b/be/src/vec/exec/join/process_hash_table_probe_impl.h index 5923dbf1c5..f7ad2a1fd5 100644 --- a/be/src/vec/exec/join/process_hash_table_probe_impl.h +++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h @@ -150,6 +150,62 @@ void ProcessHashTableProbe::probe_side_output_column( } } +template +void ProcessHashTableProbe::_pre_serialize_key( + const ColumnRawPtrs& key_columns, const size_t key_rows, + std::vector& serialized_keys) { + if (serialized_keys.size() < key_rows) { + serialized_keys.resize(key_rows); + } + size_t max_one_row_byte_size = 0; + for (const auto column : key_columns) { + max_one_row_byte_size += column->get_max_row_byte_size(); + } + size_t total_bytes = max_one_row_byte_size * key_rows; + + /// reach mem limit, don't serialize in batch + /// If there is a very long row of data in a string column, + /// it will 
result in a very large estimated total_bytes. + if (total_bytes > config::pre_serialize_keys_limit_bytes) { + size_t old_probe_keys_memory_usage = 0; + if (!_arena) { + _arena.reset(new Arena()); + } else { + old_probe_keys_memory_usage = _arena->size(); + } + + _arena->clear(); + size_t keys_size = key_columns.size(); + for (size_t i = 0; i < key_rows; ++i) { + serialized_keys[i] = + serialize_keys_to_pool_contiguous(i, keys_size, key_columns, *_arena); + } + _join_node->_probe_arena_memory_usage->add(_arena->size() - old_probe_keys_memory_usage); + } else { + if (!_serialize_key_arena) { + _serialize_key_arena.reset(new Arena); + } + if (total_bytes > _serialized_key_buffer_size) { + _join_node->_probe_arena_memory_usage->add(-_serialized_key_buffer_size); + _serialized_key_buffer_size = total_bytes; + _serialize_key_arena->clear(); + _serialized_key_buffer = reinterpret_cast( + _serialize_key_arena->alloc(_serialized_key_buffer_size)); + _join_node->_probe_arena_memory_usage->add(_serialized_key_buffer_size); + } + + for (size_t i = 0; i < key_rows; ++i) { + serialized_keys[i].data = + reinterpret_cast(_serialized_key_buffer + i * max_one_row_byte_size); + serialized_keys[i].size = 0; + } + + for (const auto column : key_columns) { + column->serialize_vec(serialized_keys, key_rows, max_one_row_byte_size); + } + } +} + template template Status ProcessHashTableProbe::do_process(HashTableType& hash_table_ctx, @@ -176,27 +232,10 @@ Status ProcessHashTableProbe::do_process(HashTableType& hash_table_c KeyGetter key_getter(probe_raw_ptrs, _join_node->_probe_key_sz, nullptr); - if (probe_index == 0) { - size_t old_probe_keys_memory_usage = 0; - if (_arena) { - old_probe_keys_memory_usage = _arena->size(); - } - _arena.reset(new Arena()); // TODO arena reuse by clear()? 
- if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits::value) { - if (_probe_keys.size() < probe_rows) { - _probe_keys.resize(probe_rows); - } - size_t keys_size = probe_raw_ptrs.size(); - for (size_t i = 0; i < probe_rows; ++i) { - _probe_keys[i] = - serialize_keys_to_pool_contiguous(i, keys_size, probe_raw_ptrs, *_arena); - } - _join_node->_probe_arena_memory_usage->add(_arena->size() - - old_probe_keys_memory_usage); - } - } - if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits::value) { + if (probe_index == 0) { + _pre_serialize_key(probe_raw_ptrs, probe_rows, _probe_keys); + } key_getter.set_serialized_keys(_probe_keys.data()); } @@ -426,27 +465,10 @@ Status ProcessHashTableProbe::do_process_with_other_join_conjuncts( JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType == TJoinOp::FULL_OUTER_JOIN; KeyGetter key_getter(probe_raw_ptrs, _join_node->_probe_key_sz, nullptr); - if (probe_index == 0) { - size_t old_probe_keys_memory_usage = 0; - if (_arena) { - old_probe_keys_memory_usage = _arena->size(); - } - _arena.reset(new Arena()); - if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits::value) { - if (_probe_keys.size() < probe_rows) { - _probe_keys.resize(probe_rows); - } - size_t keys_size = probe_raw_ptrs.size(); - for (size_t i = 0; i < probe_rows; ++i) { - _probe_keys[i] = serialize_keys_to_pool_contiguous(i, keys_size, probe_raw_ptrs, - *_arena); - } - } - _join_node->_probe_arena_memory_usage->add(_arena->size() - - old_probe_keys_memory_usage); - } - if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits::value) { + if (probe_index == 0) { + _pre_serialize_key(probe_raw_ptrs, probe_rows, _probe_keys); + } key_getter.set_serialized_keys(_probe_keys.data()); } diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 2a239c48b0..678d2f6483 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -508,6 +508,18 
@@ Status HashJoinNode::close(RuntimeState* state) { if (is_closed()) { return Status::OK(); } + std::visit(Overload {[&](std::monostate&) {}, + [&](auto&& process_hashtable_ctx) { + if (process_hashtable_ctx._arena) { + process_hashtable_ctx._arena.reset(); + } + + if (process_hashtable_ctx._serialize_key_arena) { + process_hashtable_ctx._serialize_key_arena.reset(); + process_hashtable_ctx._serialized_key_buffer_size = 0; + } + }}, + *_process_hashtable_ctx_variants); return VJoinNodeBase::close(state); }