[improvement](join) Serialize build keys in a vectorized (columnar) way (#21361)
There is a significant performance improvement in serializing keys in the aggregate node through vectorization. Now, applying it to the join node also brings performance improvement.
This commit is contained in:
@ -73,6 +73,9 @@ struct ProcessHashTableProbe {
|
||||
int right_col_len, UInt8* __restrict null_map_data,
|
||||
UInt8* __restrict filter_map, Block* output_block);
|
||||
|
||||
void _pre_serialize_key(const ColumnRawPtrs& key_columns, const size_t key_rows,
|
||||
std::vector<StringRef>& serialized_keys);
|
||||
|
||||
// Process full outer join/ right join / right semi/anti join to output the join result
|
||||
// in hash table
|
||||
template <typename HashTableType>
|
||||
@ -93,6 +96,10 @@ struct ProcessHashTableProbe {
|
||||
// only need set the tuple is null in LEFT_OUTER_JOIN and FULL_OUTER_JOIN
|
||||
ColumnUInt8::Container* _tuple_is_null_right_flags;
|
||||
|
||||
size_t _serialized_key_buffer_size {0};
|
||||
uint8_t* _serialized_key_buffer;
|
||||
std::unique_ptr<Arena> _serialize_key_arena;
|
||||
|
||||
RuntimeProfile::Counter* _rows_returned_counter;
|
||||
RuntimeProfile::Counter* _search_hashtable_timer;
|
||||
RuntimeProfile::Counter* _build_side_output_timer;
|
||||
|
||||
@ -150,6 +150,62 @@ void ProcessHashTableProbe<JoinOpType>::probe_side_output_column(
|
||||
}
|
||||
}
|
||||
|
||||
template <int JoinOpType>
|
||||
void ProcessHashTableProbe<JoinOpType>::_pre_serialize_key(
|
||||
const ColumnRawPtrs& key_columns, const size_t key_rows,
|
||||
std::vector<StringRef>& serialized_keys) {
|
||||
if (serialized_keys.size() < key_rows) {
|
||||
serialized_keys.resize(key_rows);
|
||||
}
|
||||
size_t max_one_row_byte_size = 0;
|
||||
for (const auto column : key_columns) {
|
||||
max_one_row_byte_size += column->get_max_row_byte_size();
|
||||
}
|
||||
size_t total_bytes = max_one_row_byte_size * key_rows;
|
||||
|
||||
/// reach mem limit, don't serialize in batch
|
||||
/// If there is a very long row of data in a string column,
|
||||
/// it will result in a very larger estimated total_bytes.
|
||||
if (total_bytes > config::pre_serialize_keys_limit_bytes) {
|
||||
size_t old_probe_keys_memory_usage = 0;
|
||||
if (!_arena) {
|
||||
_arena.reset(new Arena());
|
||||
} else {
|
||||
old_probe_keys_memory_usage = _arena->size();
|
||||
}
|
||||
|
||||
_arena->clear();
|
||||
size_t keys_size = key_columns.size();
|
||||
for (size_t i = 0; i < key_rows; ++i) {
|
||||
serialized_keys[i] =
|
||||
serialize_keys_to_pool_contiguous(i, keys_size, key_columns, *_arena);
|
||||
}
|
||||
_join_node->_probe_arena_memory_usage->add(_arena->size() - old_probe_keys_memory_usage);
|
||||
} else {
|
||||
if (!_serialize_key_arena) {
|
||||
_serialize_key_arena.reset(new Arena);
|
||||
}
|
||||
if (total_bytes > _serialized_key_buffer_size) {
|
||||
_join_node->_probe_arena_memory_usage->add(-_serialized_key_buffer_size);
|
||||
_serialized_key_buffer_size = total_bytes;
|
||||
_serialize_key_arena->clear();
|
||||
_serialized_key_buffer = reinterpret_cast<uint8_t*>(
|
||||
_serialize_key_arena->alloc(_serialized_key_buffer_size));
|
||||
_join_node->_probe_arena_memory_usage->add(_serialized_key_buffer_size);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < key_rows; ++i) {
|
||||
serialized_keys[i].data =
|
||||
reinterpret_cast<char*>(_serialized_key_buffer + i * max_one_row_byte_size);
|
||||
serialized_keys[i].size = 0;
|
||||
}
|
||||
|
||||
for (const auto column : key_columns) {
|
||||
column->serialize_vec(serialized_keys, key_rows, max_one_row_byte_size);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
template <int JoinOpType>
|
||||
template <bool need_null_map_for_probe, bool ignore_null, typename HashTableType>
|
||||
Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_ctx,
|
||||
@ -176,27 +232,10 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
|
||||
|
||||
KeyGetter key_getter(probe_raw_ptrs, _join_node->_probe_key_sz, nullptr);
|
||||
|
||||
if (probe_index == 0) {
|
||||
size_t old_probe_keys_memory_usage = 0;
|
||||
if (_arena) {
|
||||
old_probe_keys_memory_usage = _arena->size();
|
||||
}
|
||||
_arena.reset(new Arena()); // TODO arena reuse by clear()?
|
||||
if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits<KeyGetter>::value) {
|
||||
if (_probe_keys.size() < probe_rows) {
|
||||
_probe_keys.resize(probe_rows);
|
||||
}
|
||||
size_t keys_size = probe_raw_ptrs.size();
|
||||
for (size_t i = 0; i < probe_rows; ++i) {
|
||||
_probe_keys[i] =
|
||||
serialize_keys_to_pool_contiguous(i, keys_size, probe_raw_ptrs, *_arena);
|
||||
}
|
||||
_join_node->_probe_arena_memory_usage->add(_arena->size() -
|
||||
old_probe_keys_memory_usage);
|
||||
}
|
||||
}
|
||||
|
||||
if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits<KeyGetter>::value) {
|
||||
if (probe_index == 0) {
|
||||
_pre_serialize_key(probe_raw_ptrs, probe_rows, _probe_keys);
|
||||
}
|
||||
key_getter.set_serialized_keys(_probe_keys.data());
|
||||
}
|
||||
|
||||
@ -426,27 +465,10 @@ Status ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
|
||||
JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType == TJoinOp::FULL_OUTER_JOIN;
|
||||
KeyGetter key_getter(probe_raw_ptrs, _join_node->_probe_key_sz, nullptr);
|
||||
|
||||
if (probe_index == 0) {
|
||||
size_t old_probe_keys_memory_usage = 0;
|
||||
if (_arena) {
|
||||
old_probe_keys_memory_usage = _arena->size();
|
||||
}
|
||||
_arena.reset(new Arena());
|
||||
if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits<KeyGetter>::value) {
|
||||
if (_probe_keys.size() < probe_rows) {
|
||||
_probe_keys.resize(probe_rows);
|
||||
}
|
||||
size_t keys_size = probe_raw_ptrs.size();
|
||||
for (size_t i = 0; i < probe_rows; ++i) {
|
||||
_probe_keys[i] = serialize_keys_to_pool_contiguous(i, keys_size, probe_raw_ptrs,
|
||||
*_arena);
|
||||
}
|
||||
}
|
||||
_join_node->_probe_arena_memory_usage->add(_arena->size() -
|
||||
old_probe_keys_memory_usage);
|
||||
}
|
||||
|
||||
if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits<KeyGetter>::value) {
|
||||
if (probe_index == 0) {
|
||||
_pre_serialize_key(probe_raw_ptrs, probe_rows, _probe_keys);
|
||||
}
|
||||
key_getter.set_serialized_keys(_probe_keys.data());
|
||||
}
|
||||
|
||||
|
||||
@ -508,6 +508,18 @@ Status HashJoinNode::close(RuntimeState* state) {
|
||||
if (is_closed()) {
|
||||
return Status::OK();
|
||||
}
|
||||
std::visit(Overload {[&](std::monostate&) {},
|
||||
[&](auto&& process_hashtable_ctx) {
|
||||
if (process_hashtable_ctx._arena) {
|
||||
process_hashtable_ctx._arena.reset();
|
||||
}
|
||||
|
||||
if (process_hashtable_ctx._serialize_key_arena) {
|
||||
process_hashtable_ctx._serialize_key_arena.reset();
|
||||
process_hashtable_ctx._serialized_key_buffer_size = 0;
|
||||
}
|
||||
}},
|
||||
*_process_hashtable_ctx_variants);
|
||||
return VJoinNodeBase::close(state);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user