From 358bd79fb10e571a9fcf54a8726ad9906cdec0e3 Mon Sep 17 00:00:00 2001 From: HappenLee Date: Mon, 31 Jan 2022 22:14:12 +0800 Subject: [PATCH] [improvement](vec)(Join) Mem reuse to speed up join operator (#7905) 1. Reuse the mem of output block in vec join node 2. Add the function `replicate` in column --- be/src/vec/columns/column.h | 4 +++ be/src/vec/columns/column_complex.h | 21 +++++++++++++ be/src/vec/columns/column_const.cpp | 6 ++++ be/src/vec/columns/column_const.h | 1 + be/src/vec/columns/column_decimal.cpp | 14 +++++++++ be/src/vec/columns/column_decimal.h | 3 ++ be/src/vec/columns/column_nullable.cpp | 6 ++++ be/src/vec/columns/column_nullable.h | 1 + be/src/vec/columns/column_string.cpp | 31 +++++++++++++++++++ be/src/vec/columns/column_string.h | 2 ++ be/src/vec/columns/column_vector.cpp | 14 +++++++++ be/src/vec/columns/column_vector.h | 2 ++ be/src/vec/common/hash_table/hash_table.h | 2 +- be/src/vec/common/pod_array.h | 4 +-- be/src/vec/exec/join/vhash_join_node.cpp | 36 +++++++++++------------ be/src/vec/functions/math.cpp | 6 ++-- 16 files changed, 129 insertions(+), 24 deletions(-) diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h index c730168d1c..3edd25af0e 100644 --- a/be/src/vec/columns/column.h +++ b/be/src/vec/columns/column.h @@ -267,6 +267,10 @@ public: using Offsets = PaddedPODArray; virtual Ptr replicate(const Offsets& offsets) const = 0; + virtual void replicate(const uint32_t* counts, size_t target_size, IColumn& column) const { + LOG(FATAL) << "not support"; + }; + /** Split column to smaller columns. Each value goes to column index, selected by corresponding element of 'selector'. * Selector must contain values from 0 to num_columns - 1. * For default implementation, see scatter_impl. diff --git a/be/src/vec/columns/column_complex.h b/be/src/vec/columns/column_complex.h index 76ad270be4..576388f907 100644 --- a/be/src/vec/columns/column_complex.h +++ b/be/src/vec/columns/column_complex.h @@ -191,6 +191,8 @@ public: ColumnPtr replicate(const IColumn::Offsets& replicate_offsets) const override; + void replicate(const uint32_t* counts, size_t target_size, IColumn& column) const override; + [[noreturn]] MutableColumns scatter(IColumn::ColumnIndex num_columns, const IColumn::Selector& selector) const override { LOG(FATAL) << "scatter not implemented"; @@ -298,5 +300,24 @@ ColumnPtr ColumnComplexType::replicate(const IColumn::Offsets& offsets) const return res; } +template +void ColumnComplexType::replicate(const uint32_t* counts, size_t target_size, IColumn& column) const { + size_t size = data.size(); + if (0 == size) return; + + auto& res = reinterpret_cast&>(column); + typename Self::Container& res_data = res.get_data(); + res_data.reserve(target_size); + + for (size_t i = 0; i < size; ++i) { + size_t size_to_replicate = counts[i]; + for (size_t j = 0; j < size_to_replicate; ++j) { + res_data.push_back(data[i]); + } + } + + return res; +} + using ColumnBitmap = ColumnComplexType; } // namespace doris::vectorized diff --git a/be/src/vec/columns/column_const.cpp b/be/src/vec/columns/column_const.cpp index 3bc07a7a6a..efb790249a 100644 --- a/be/src/vec/columns/column_const.cpp +++ b/be/src/vec/columns/column_const.cpp @@ -65,6 +65,12 @@ ColumnPtr ColumnConst::replicate(const Offsets& offsets) const { return ColumnConst::create(data, replicated_size); } +void ColumnConst::replicate(const uint32_t* counts, size_t target_size, IColumn& column) const { + if (s == 0) return; + auto& res = reinterpret_cast(column); + res.s = s; +} + ColumnPtr ColumnConst::permute(const Permutation& perm, size_t limit) const { if (limit == 0) { limit = s; diff --git a/be/src/vec/columns/column_const.h b/be/src/vec/columns/column_const.h index e019c56169..16be16692d 100644 --- a/be/src/vec/columns/column_const.h +++ b/be/src/vec/columns/column_const.h @@ -117,6 +117,7 @@ public: ColumnPtr filter(const Filter& filt, ssize_t result_size_hint) const override; ColumnPtr replicate(const Offsets& offsets) const override; + void replicate(const uint32_t* counts, size_t target_size, IColumn& column) const override; ColumnPtr permute(const Permutation& perm, size_t limit) const override; // ColumnPtr index(const IColumn & indexes, size_t limit) const override; void get_permutation(bool reverse, size_t limit, int nan_direction_hint, diff --git a/be/src/vec/columns/column_decimal.cpp b/be/src/vec/columns/column_decimal.cpp index fbfc1b4432..2cf5cc6a55 100644 --- a/be/src/vec/columns/column_decimal.cpp +++ b/be/src/vec/columns/column_decimal.cpp @@ -221,6 +221,20 @@ ColumnPtr ColumnDecimal::replicate(const IColumn::Offsets& offsets) const { return res; } +template +void ColumnDecimal::replicate(const uint32_t* counts, size_t target_size, IColumn& column) const { + size_t size = data.size(); + if (0 == size) return; + + auto& res = reinterpret_cast&>(column); + typename Self::Container& res_data = res.get_data(); + res_data.reserve(target_size); + + for (size_t i = 0; i < size; ++i) { + res_data.add_num_element_without_reserve(data[i], counts[i]); + } +} + template void ColumnDecimal::get_extremes(Field& min, Field& max) const { if (data.size() == 0) { diff --git a/be/src/vec/columns/column_decimal.h b/be/src/vec/columns/column_decimal.h index 96a7a72a66..017d891e4f 100644 --- a/be/src/vec/columns/column_decimal.h +++ b/be/src/vec/columns/column_decimal.h @@ -153,6 +153,9 @@ public: ColumnPtr index_impl(const PaddedPODArray& indexes, size_t limit) const; ColumnPtr replicate(const IColumn::Offsets& offsets) const override; + + void replicate(const uint32_t* counts, size_t target_size, IColumn& column) const override; + void get_extremes(Field& min, Field& max) const override; MutableColumns scatter(IColumn::ColumnIndex num_columns, diff --git a/be/src/vec/columns/column_nullable.cpp b/be/src/vec/columns/column_nullable.cpp index ae3a2fd1b7..2d946f89d0 100644 --- a/be/src/vec/columns/column_nullable.cpp +++ b/be/src/vec/columns/column_nullable.cpp @@ -407,6 +407,12 @@ ColumnPtr ColumnNullable::replicate(const Offsets& offsets) const { return ColumnNullable::create(replicated_data, replicated_null_map); } +void ColumnNullable::replicate(const uint32_t* counts, size_t target_size, IColumn& column) const { + auto& res = reinterpret_cast(column); + get_nested_column().replicate(counts, target_size, res.get_nested_column()); + get_null_map_column().replicate(counts, target_size, res.get_null_map_column()); +} + template void ColumnNullable::apply_null_map_impl(const ColumnUInt8& map) { NullMap& arr1 = get_null_map_data(); diff --git a/be/src/vec/columns/column_nullable.h b/be/src/vec/columns/column_nullable.h index a369c02abd..2bbf55845d 100644 --- a/be/src/vec/columns/column_nullable.h +++ b/be/src/vec/columns/column_nullable.h @@ -121,6 +121,7 @@ public: size_t allocated_bytes() const override; void protect() override; ColumnPtr replicate(const Offsets& replicate_offsets) const override; + void replicate(const uint32_t* counts, size_t target_size, IColumn& column) const override; void update_hash_with_value(size_t n, SipHash& hash) const override; void get_extremes(Field& min, Field& max) const override; diff --git a/be/src/vec/columns/column_string.cpp b/be/src/vec/columns/column_string.cpp index f4a32dc4f6..afd3f23115 100644 --- a/be/src/vec/columns/column_string.cpp +++ b/be/src/vec/columns/column_string.cpp @@ -290,6 +290,37 @@ ColumnPtr ColumnString::replicate(const Offsets& replicate_offsets) const { return res; } +void ColumnString::replicate(const uint32_t* counts, size_t target_size, IColumn& column) const { + size_t col_size = size(); + if (0 == col_size) return; + + auto& res = reinterpret_cast(column); + + Chars& res_chars = res.chars; + Offsets& res_offsets = res.offsets; + res_chars.reserve(chars.size() / col_size * target_size); + res_offsets.reserve(target_size); + + Offset prev_string_offset = 0; + Offset current_new_offset = 0; + + for (size_t i = 0; i < col_size; ++i) { + size_t size_to_replicate = counts[i]; + size_t string_size = offsets[i] - prev_string_offset; + + for (size_t j = 0; j < size_to_replicate; ++j) { + current_new_offset += string_size; + res_offsets.push_back(current_new_offset); + + res_chars.resize(res_chars.size() + string_size); + memcpy_small_allow_read_write_overflow15(&res_chars[res_chars.size() - string_size], + &chars[prev_string_offset], string_size); + } + + prev_string_offset = offsets[i]; + } +} + void ColumnString::reserve(size_t n) { offsets.reserve(n); } diff --git a/be/src/vec/columns/column_string.h b/be/src/vec/columns/column_string.h index 312e1d8c82..2b318faec7 100644 --- a/be/src/vec/columns/column_string.h +++ b/be/src/vec/columns/column_string.h @@ -238,6 +238,8 @@ public: ColumnPtr replicate(const Offsets& replicate_offsets) const override; + void replicate(const uint32_t* counts, size_t target_size, IColumn& column) const override; + MutableColumns scatter(ColumnIndex num_columns, const Selector& selector) const override { return scatter_impl(num_columns, selector); } diff --git a/be/src/vec/columns/column_vector.cpp b/be/src/vec/columns/column_vector.cpp index f640a601bd..aed2502b1f 100644 --- a/be/src/vec/columns/column_vector.cpp +++ b/be/src/vec/columns/column_vector.cpp @@ -324,6 +324,20 @@ ColumnPtr ColumnVector::replicate(const IColumn::Offsets& offsets) const { return res; } +template +void ColumnVector::replicate(const uint32_t* counts, size_t target_size, IColumn& column) const { + size_t size = data.size(); + if (size == 0) return; + + auto& res = reinterpret_cast&>(column); + typename Self::Container& res_data = res.get_data(); + res_data.reserve(target_size); + + for (size_t i = 0; i < size; ++i) { + res_data.add_num_element_without_reserve(data[i], counts[i]); + } +} + template void ColumnVector::get_extremes(Field& min, Field& max) const { size_t size = data.size(); diff --git a/be/src/vec/columns/column_vector.h b/be/src/vec/columns/column_vector.h index 8732032a1c..8e06169697 100644 --- a/be/src/vec/columns/column_vector.h +++ b/be/src/vec/columns/column_vector.h @@ -271,6 +271,8 @@ public: ColumnPtr replicate(const IColumn::Offsets& offsets) const override; + void replicate(const uint32_t* counts, size_t target_size, IColumn& column) const override; + void get_extremes(Field& min, Field& max) const override; MutableColumns scatter(IColumn::ColumnIndex num_columns, diff --git a/be/src/vec/common/hash_table/hash_table.h b/be/src/vec/common/hash_table/hash_table.h index 53af823ea0..f0a94b77f3 100644 --- a/be/src/vec/common/hash_table/hash_table.h +++ b/be/src/vec/common/hash_table/hash_table.h @@ -240,7 +240,7 @@ void insert_set_mapped(MappedType* dest, const ValueType& src) { /** Determines the size of the hash table, and when and how much it should be resized. */ -template +template struct HashTableGrower { /// The state of this structure is enough to get the buffer size of the hash table. doris::vectorized::UInt8 size_degree = initial_size_degree; diff --git a/be/src/vec/common/pod_array.h b/be/src/vec/common/pod_array.h index 892b3d243e..d6419fdb8f 100644 --- a/be/src/vec/common/pod_array.h +++ b/be/src/vec/common/pod_array.h @@ -378,8 +378,8 @@ public: template void add_num_element_without_reserve(U&& x, uint32_t num, TAllocatorParams&&... allocator_params) { - std::fill(t_end(), t_end() + num, x); - this->c_end += sizeof(T) * num; + std::fill(t_end(), t_end() + num, x); + this->c_end += sizeof(T) * num; } /** diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 9563ebf169..4fa5429182 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -175,9 +175,8 @@ struct ProcessHashTableProbe { KeyGetter key_getter(_probe_raw_ptrs, _join_node->_probe_key_sz, nullptr); - IColumn::Offsets offset_data; + std::vector items_counts(_probe_rows); auto& mcol = mutable_block.mutable_columns(); - offset_data.assign(_probe_rows, (uint32_t)0); int right_col_idx = _join_node->_is_right_semi_anti ? 0 : _left_table_data_types.size(); int right_col_len = _right_table_data_types.size(); @@ -187,10 +186,12 @@ struct ProcessHashTableProbe { // ignore null rows if constexpr (ignore_null) { if ((*null_map)[_probe_index]) { - offset_data[_probe_index++] = current_offset; + items_counts[_probe_index++] = 0; continue; } } + + int repeat_count = 0; auto find_result = (*null_map)[_probe_index] ? decltype(key_getter.find_key(hash_table_ctx.hash_table, _probe_index, @@ -203,7 +204,7 @@ struct ProcessHashTableProbe { if (find_result.is_found()) { // left semi join only need one match, do not need insert the data of right table if (_join_node->_join_op == TJoinOp::LEFT_SEMI_JOIN) { - ++current_offset; + ++repeat_count; } else if (_join_node->_join_op == TJoinOp::LEFT_ANTI_JOIN) { // do nothing } else { @@ -215,7 +216,7 @@ struct ProcessHashTableProbe { // right semi/anti join should dispose the data in hash table // after probe data eof if (!_join_node->_is_right_semi_anti) { - ++current_offset; + ++repeat_count; for (size_t j = 0; j < right_col_len; ++j) { auto& column = *mapped.block->get_by_position(j).column; mcol[j + right_col_idx]->insert_from(column, mapped.row_num); @@ -226,7 +227,7 @@ struct ProcessHashTableProbe { // right semi/anti join should dispose the data in hash table // after probe data eof if (!_join_node->_is_right_semi_anti) { - ++current_offset; + ++repeat_count; for (size_t j = 0; j < right_col_len; ++j) { auto& column = *it->block->get_by_position(j).column; // TODO: interface insert from cause serious performance problems @@ -240,7 +241,7 @@ struct ProcessHashTableProbe { } } else if (_join_node->_match_all_probe || _join_node->_join_op == TJoinOp::LEFT_ANTI_JOIN) { - ++current_offset; + ++repeat_count; // only full outer / left outer need insert the data of right table if (_join_node->_match_all_probe) { for (size_t j = 0; j < right_col_len; ++j) { @@ -250,22 +251,19 @@ struct ProcessHashTableProbe { } } - offset_data[_probe_index++] = current_offset; + items_counts[_probe_index++] = repeat_count; + current_offset += repeat_count; if (current_offset >= _batch_size) { break; } } - - for (int i = _probe_index; i < _probe_rows; ++i) { - offset_data[i] = current_offset; - } - output_block->swap(mutable_block.to_block()); - + for (int i = 0; i < right_col_idx; ++i) { auto& column = _probe_block.get_by_position(i).column; - output_block->get_by_position(i).column = column->replicate(offset_data); + column->replicate(items_counts.data(), current_offset, *mcol[i]); } + output_block->swap(mutable_block.to_block()); return Status::OK(); } @@ -711,11 +709,13 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo } Status st; - output_block->clear(); if (_probe_index < _probe_block.rows()) { - MutableBlock mutable_block(VectorizedUtils::create_empty_columnswithtypename( - _have_other_join_conjunct ? _row_desc_for_other_join_conjunt : row_desc())); + MutableBlock mutable_block = (output_block->mem_reuse() && !_have_other_join_conjunct) ? + MutableBlock(output_block) : + MutableBlock(VectorizedUtils::create_empty_columnswithtypename( + !_have_other_join_conjunct ? row_desc() : _row_desc_for_other_join_conjunt)); + std::visit( [&](auto&& arg) { using HashTableCtxType = std::decay_t; diff --git a/be/src/vec/functions/math.cpp b/be/src/vec/functions/math.cpp index abff782fee..540c2c175c 100644 --- a/be/src/vec/functions/math.cpp +++ b/be/src/vec/functions/math.cpp @@ -181,7 +181,7 @@ struct HexIntImpl { // uint64_t max value 0xFFFFFFFFFFFFFFFF , 16 'F' if (num == 0) { return {hex_table, 1};} - size_t i = 0; + int i = 0; while (num) { ans[i++] = hex_table[num & 15]; num = num >> 4; @@ -189,13 +189,13 @@ struct HexIntImpl { ans[i] = '\0'; // reverse - for (int k = 0, j = i - 1; k <= j; k++, j--) { + for (int k = 0, j = i - 1; k <= j && k <= 16; k++, j--) { char tmp = ans[j]; ans[j] = ans[k]; ans[k] = tmp; } - return {ans, i}; + return {ans, static_cast(i)}; } static Status vector(const ColumnInt64::Container& data, ColumnString::Chars& res_data,