From 9d3f1dcf441482a00a49db5e8c7f146bc98c1613 Mon Sep 17 00:00:00 2001 From: ZenoYang Date: Wed, 2 Aug 2023 21:19:56 +0800 Subject: [PATCH] [improvement](vectorized) Deserialized elements of count distinct aggregation directly inserted into target hashset (#21888) The original logic first deserializes the ColumnString into a temporary HashSet (inserting the deserialized elements into it), and then, during the merge phase, traverses that HashSet and inserts all of its elements into the target HashSet. After this optimization, elements are inserted directly into the target HashSet while deserializing, which avoids the unnecessary HashSet insert overhead. In one of our internal query tests, 30 hashsets were merged in the second-phase aggregation (the average cardinality was 1,400,000), and the cardinality after merging was 42,000,000. After the optimization, the MergeTime dropped from 5s965ms to 3s375ms. --- .../aggregate_functions/aggregate_function.h | 92 ++++++++++++++----- .../aggregate_function_avg.h | 16 ++++ .../aggregate_function_count.h | 32 +++++++ .../aggregate_function_count_old.h | 16 ++++ .../aggregate_function_min_max.h | 16 ++++ .../aggregate_function_nothing.h | 4 +- .../aggregate_function_null.h | 8 +- .../aggregate_function_sum.h | 16 ++++ .../aggregate_function_uniq.h | 16 +++- be/src/vec/exec/vaggregation_node.cpp | 2 +- be/src/vec/exec/vaggregation_node.h | 30 ++---- 11 files changed, 195 insertions(+), 53 deletions(-) diff --git a/be/src/vec/aggregate_functions/aggregate_function.h b/be/src/vec/aggregate_functions/aggregate_function.h index 98faf3a389..4b2118fc51 100644 --- a/be/src/vec/aggregate_functions/aggregate_function.h +++ b/be/src/vec/aggregate_functions/aggregate_function.h @@ -143,11 +143,21 @@ public: virtual void deserialize_vec(AggregateDataPtr places, const ColumnString* column, Arena* arena, size_t num_rows) const = 0; + virtual void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset, + AggregateDataPtr rhs, const ColumnString* column, + Arena* 
arena, const size_t num_rows) const = 0; + + virtual void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset, + AggregateDataPtr rhs, + const ColumnString* column, Arena* arena, + const size_t num_rows) const = 0; + virtual void deserialize_from_column(AggregateDataPtr places, const IColumn& column, Arena* arena, size_t num_rows) const = 0; /// Deserializes state and merge it with current aggregation function. - virtual void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf, + virtual void deserialize_and_merge(AggregateDataPtr __restrict place, + AggregateDataPtr __restrict rhs, BufferReadable& buf, Arena* arena) const = 0; virtual void deserialize_and_merge_from_column_range(AggregateDataPtr __restrict place, @@ -361,6 +371,51 @@ public: } } + void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset, + AggregateDataPtr rhs, const ColumnString* column, Arena* arena, + const size_t num_rows) const override { + const auto size_of_data = assert_cast(this)->size_of_data(); + for (size_t i = 0; i != num_rows; ++i) { + try { + auto rhs_place = rhs + size_of_data * i; + VectorBufferReader buffer_reader(column->get_data_at(i)); + assert_cast(this)->create(rhs_place); + assert_cast(this)->deserialize_and_merge( + places[i] + offset, rhs_place, buffer_reader, arena); + } catch (...) 
{ + for (int j = 0; j < i; ++j) { + auto place = rhs + size_of_data * j; + assert_cast(this)->destroy(place); + } + throw; + } + } + assert_cast(this)->destroy_vec(rhs, num_rows); + } + + void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset, + AggregateDataPtr rhs, const ColumnString* column, + Arena* arena, const size_t num_rows) const override { + const auto size_of_data = assert_cast(this)->size_of_data(); + for (size_t i = 0; i != num_rows; ++i) { + try { + auto rhs_place = rhs + size_of_data * i; + VectorBufferReader buffer_reader(column->get_data_at(i)); + assert_cast(this)->create(rhs_place); + if (places[i]) + assert_cast(this)->deserialize_and_merge( + places[i] + offset, rhs_place, buffer_reader, arena); + } catch (...) { + for (int j = 0; j < i; ++j) { + auto place = rhs + size_of_data * j; + assert_cast(this)->destroy(place); + } + throw; + } + } + assert_cast(this)->destroy_vec(rhs, num_rows); + } + void deserialize_from_column(AggregateDataPtr places, const IColumn& column, Arena* arena, size_t num_rows) const override { deserialize_vec(places, assert_cast(&column), arena, num_rows); @@ -395,7 +450,12 @@ public: for (size_t i = begin; i <= end; ++i) { VectorBufferReader buffer_reader( (assert_cast(column)).get_data_at(i)); - deserialize_and_merge(place, buffer_reader, arena); + char deserialized_data[size_of_data()]; + AggregateDataPtr deserialized_place = (AggregateDataPtr)deserialized_data; + assert_cast(this)->create(deserialized_place); + DEFER({ assert_cast(this)->destroy(deserialized_place); }); + assert_cast(this)->deserialize_and_merge(place, deserialized_place, + buffer_reader, arena); } } @@ -407,16 +467,10 @@ public: deserialize_and_merge_from_column_range(place, column, 0, column.size() - 1, arena); } - void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf, - Arena* arena) const override { - char deserialized_data[size_of_data()]; - AggregateDataPtr deserialized_place = 
(AggregateDataPtr)deserialized_data; - - auto derived = static_cast(this); - derived->create(deserialized_place); - derived->deserialize(deserialized_place, buf, arena); - derived->merge(place, deserialized_place, arena); - derived->destroy(deserialized_place); + void deserialize_and_merge(AggregateDataPtr __restrict place, AggregateDataPtr __restrict rhs, + BufferReadable& buf, Arena* arena) const override { + assert_cast(this)->deserialize(rhs, buf, arena); + assert_cast(this)->merge(place, rhs, arena); } }; @@ -451,16 +505,10 @@ public: create(place); } - void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf, - Arena* arena) const override { - char deserialized_data[size_of_data()]; - AggregateDataPtr deserialized_place = (AggregateDataPtr)deserialized_data; - - auto derived = assert_cast(this); - derived->create(deserialized_place); - DEFER({ derived->destroy(deserialized_place); }); - derived->deserialize(deserialized_place, buf, arena); - derived->merge(place, deserialized_place, arena); + void deserialize_and_merge(AggregateDataPtr __restrict place, AggregateDataPtr __restrict rhs, + BufferReadable& buf, Arena* arena) const override { + assert_cast(this)->deserialize(rhs, buf, arena); + assert_cast(this)->merge(place, rhs, arena); } }; diff --git a/be/src/vec/aggregate_functions/aggregate_function_avg.h b/be/src/vec/aggregate_functions/aggregate_function_avg.h index ff1de6b2e2..c30c6db6b5 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_avg.h +++ b/be/src/vec/aggregate_functions/aggregate_function_avg.h @@ -233,6 +233,22 @@ public: } } + void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset, + AggregateDataPtr rhs, const ColumnString* column, Arena* arena, + const size_t num_rows) const override { + this->deserialize_from_column(rhs, *column, arena, num_rows); + DEFER({ this->destroy_vec(rhs, num_rows); }); + this->merge_vec(places, offset, rhs, arena, num_rows); + } + + void 
deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset, + AggregateDataPtr rhs, const ColumnString* column, + Arena* arena, const size_t num_rows) const override { + this->deserialize_from_column(rhs, *column, arena, num_rows); + DEFER({ this->destroy_vec(rhs, num_rows); }); + this->merge_vec_selected(places, offset, rhs, arena, num_rows); + } + void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place, IColumn& to) const override { auto& col = assert_cast(to); diff --git a/be/src/vec/aggregate_functions/aggregate_function_count.h b/be/src/vec/aggregate_functions/aggregate_function_count.h index 35da6c3406..92d0f644d3 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_count.h +++ b/be/src/vec/aggregate_functions/aggregate_function_count.h @@ -145,6 +145,22 @@ public: } } + void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset, + AggregateDataPtr rhs, const ColumnString* column, Arena* arena, + const size_t num_rows) const override { + this->deserialize_from_column(rhs, *column, arena, num_rows); + DEFER({ this->destroy_vec(rhs, num_rows); }); + this->merge_vec(places, offset, rhs, arena, num_rows); + } + + void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset, + AggregateDataPtr rhs, const ColumnString* column, + Arena* arena, const size_t num_rows) const override { + this->deserialize_from_column(rhs, *column, arena, num_rows); + DEFER({ this->destroy_vec(rhs, num_rows); }); + this->merge_vec_selected(places, offset, rhs, arena, num_rows); + } + void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place, IColumn& to) const override { auto& col = assert_cast(to); @@ -267,6 +283,22 @@ public: } } + void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset, + AggregateDataPtr rhs, const ColumnString* column, Arena* arena, + const size_t num_rows) const override { + this->deserialize_from_column(rhs, *column, arena, 
num_rows); + DEFER({ this->destroy_vec(rhs, num_rows); }); + this->merge_vec(places, offset, rhs, arena, num_rows); + } + + void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset, + AggregateDataPtr rhs, const ColumnString* column, + Arena* arena, const size_t num_rows) const override { + this->deserialize_from_column(rhs, *column, arena, num_rows); + DEFER({ this->destroy_vec(rhs, num_rows); }); + this->merge_vec_selected(places, offset, rhs, arena, num_rows); + } + void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place, IColumn& to) const override { auto& col = assert_cast(to); diff --git a/be/src/vec/aggregate_functions/aggregate_function_count_old.h b/be/src/vec/aggregate_functions/aggregate_function_count_old.h index ff935b6a67..e42299fdc6 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_count_old.h +++ b/be/src/vec/aggregate_functions/aggregate_function_count_old.h @@ -130,6 +130,22 @@ public: } } + void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset, + AggregateDataPtr rhs, const ColumnString* column, Arena* arena, + const size_t num_rows) const override { + this->deserialize_from_column(rhs, *column, arena, num_rows); + DEFER({ this->destroy_vec(rhs, num_rows); }); + this->merge_vec(places, offset, rhs, arena, num_rows); + } + + void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset, + AggregateDataPtr rhs, const ColumnString* column, + Arena* arena, const size_t num_rows) const override { + this->deserialize_from_column(rhs, *column, arena, num_rows); + DEFER({ this->destroy_vec(rhs, num_rows); }); + this->merge_vec_selected(places, offset, rhs, arena, num_rows); + } + void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place, IColumn& to) const override { auto& col = assert_cast(to); diff --git a/be/src/vec/aggregate_functions/aggregate_function_min_max.h b/be/src/vec/aggregate_functions/aggregate_function_min_max.h 
index 5d3bee95c9..778d311214 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_min_max.h +++ b/be/src/vec/aggregate_functions/aggregate_function_min_max.h @@ -624,6 +624,22 @@ public: } } + void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset, + AggregateDataPtr rhs, const ColumnString* column, Arena* arena, + const size_t num_rows) const override { + this->deserialize_from_column(rhs, *column, arena, num_rows); + DEFER({ this->destroy_vec(rhs, num_rows); }); + this->merge_vec(places, offset, rhs, arena, num_rows); + } + + void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset, + AggregateDataPtr rhs, const ColumnString* column, + Arena* arena, const size_t num_rows) const override { + this->deserialize_from_column(rhs, *column, arena, num_rows); + DEFER({ this->destroy_vec(rhs, num_rows); }); + this->merge_vec_selected(places, offset, rhs, arena, num_rows); + } + void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place, IColumn& to) const override { if constexpr (Data::IsFixedLength) { diff --git a/be/src/vec/aggregate_functions/aggregate_function_nothing.h b/be/src/vec/aggregate_functions/aggregate_function_nothing.h index f8f3cda43c..0c48dcc52b 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_nothing.h +++ b/be/src/vec/aggregate_functions/aggregate_function_nothing.h @@ -65,8 +65,8 @@ public: to.insert_default(); } - void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf, - Arena* arena) const override {} + void deserialize_and_merge(AggregateDataPtr __restrict place, AggregateDataPtr __restrict rhs, + BufferReadable& buf, Arena* arena) const override {} void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, const IColumn& column, Arena* arena) const override {} diff --git a/be/src/vec/aggregate_functions/aggregate_function_null.h b/be/src/vec/aggregate_functions/aggregate_function_null.h index 50f5ab380e..a0bb79dfc0 
100644 --- a/be/src/vec/aggregate_functions/aggregate_function_null.h +++ b/be/src/vec/aggregate_functions/aggregate_function_null.h @@ -143,15 +143,17 @@ public: } } - void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf, - Arena* arena) const override { + void deserialize_and_merge(AggregateDataPtr __restrict place, AggregateDataPtr __restrict rhs, + BufferReadable& buf, Arena* arena) const override { bool flag = true; if (result_is_nullable) { read_binary(flag, buf); } if (flag) { + set_flag(rhs); set_flag(place); - nested_function->deserialize_and_merge(nested_place(place), buf, arena); + nested_function->deserialize_and_merge(nested_place(place), nested_place(rhs), buf, + arena); } } diff --git a/be/src/vec/aggregate_functions/aggregate_function_sum.h b/be/src/vec/aggregate_functions/aggregate_function_sum.h index cab803bc7a..e81e7b4af2 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_sum.h +++ b/be/src/vec/aggregate_functions/aggregate_function_sum.h @@ -177,6 +177,22 @@ public: } } + void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset, + AggregateDataPtr rhs, const ColumnString* column, Arena* arena, + const size_t num_rows) const override { + this->deserialize_from_column(rhs, *column, arena, num_rows); + DEFER({ this->destroy_vec(rhs, num_rows); }); + this->merge_vec(places, offset, rhs, arena, num_rows); + } + + void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset, + AggregateDataPtr rhs, const ColumnString* column, + Arena* arena, const size_t num_rows) const override { + this->deserialize_from_column(rhs, *column, arena, num_rows); + DEFER({ this->destroy_vec(rhs, num_rows); }); + this->merge_vec_selected(places, offset, rhs, arena, num_rows); + } + void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place, IColumn& to) const override { auto& col = assert_cast(to); diff --git a/be/src/vec/aggregate_functions/aggregate_function_uniq.h 
b/be/src/vec/aggregate_functions/aggregate_function_uniq.h index c8165ae3b2..b9445c4ed7 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_uniq.h +++ b/be/src/vec/aggregate_functions/aggregate_function_uniq.h @@ -193,8 +193,8 @@ public: } } - void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf, - Arena* arena) const override { + void deserialize_and_merge(AggregateDataPtr __restrict place, AggregateDataPtr __restrict rhs, + BufferReadable& buf, Arena* arena) const override { auto& set = this->data(place).set; UInt64 size; read_var_uint(size, buf); @@ -210,7 +210,17 @@ public: void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf, Arena* arena) const override { - deserialize_and_merge(place, buf, arena); + auto& set = this->data(place).set; + UInt64 size; + read_var_uint(size, buf); + + set.rehash(size + set.size()); + + for (size_t i = 0; i < size; ++i) { + KeyType ref; + read_pod_binary(ref, buf); + set.insert(ref); + } } void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override { diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index ca36809ca9..ca34663515 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -338,7 +338,7 @@ Status AggregationNode::prepare_profile(RuntimeState* state) { _get_results_timer = ADD_TIMER(runtime_profile(), "GetResultsTime"); _serialize_data_timer = ADD_TIMER(runtime_profile(), "SerializeDataTime"); _serialize_result_timer = ADD_TIMER(runtime_profile(), "SerializeResultTime"); - _deserialize_data_timer = ADD_TIMER(runtime_profile(), "DeserializeDataTime"); + _deserialize_data_timer = ADD_TIMER(runtime_profile(), "DeserializeAndMergeTime"); _hash_table_compute_timer = ADD_TIMER(runtime_profile(), "HashTableComputeTime"); _hash_table_emplace_timer = ADD_TIMER(runtime_profile(), "HashTableEmplaceTime"); _hash_table_iterate_timer = ADD_TIMER(runtime_profile(), 
"HashTableIterateTime"); diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h index 05893d9873..1e01a138a3 100644 --- a/be/src/vec/exec/vaggregation_node.h +++ b/be/src/vec/exec/vaggregation_node.h @@ -1091,18 +1091,11 @@ private: { SCOPED_TIMER(_deserialize_data_timer); - _aggregate_evaluators[i]->function()->deserialize_from_column( - _deserialize_buffer.data(), *column, _agg_arena_pool.get(), rows); + _aggregate_evaluators[i]->function()->deserialize_and_merge_vec_selected( + _places.data(), _offsets_of_aggregate_states[i], + _deserialize_buffer.data(), (ColumnString*)(column.get()), + _agg_arena_pool.get(), rows); } - - DEFER({ - _aggregate_evaluators[i]->function()->destroy_vec( - _deserialize_buffer.data(), rows); - }); - - _aggregate_evaluators[i]->function()->merge_vec_selected( - _places.data(), _offsets_of_aggregate_states[i], - _deserialize_buffer.data(), _agg_arena_pool.get(), rows); } else { RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add_selected( block, _offsets_of_aggregate_states[i], _places.data(), @@ -1133,18 +1126,11 @@ private: { SCOPED_TIMER(_deserialize_data_timer); - _aggregate_evaluators[i]->function()->deserialize_from_column( - _deserialize_buffer.data(), *column, _agg_arena_pool.get(), rows); + _aggregate_evaluators[i]->function()->deserialize_and_merge_vec( + _places.data(), _offsets_of_aggregate_states[i], + _deserialize_buffer.data(), (ColumnString*)(column.get()), + _agg_arena_pool.get(), rows); } - - DEFER({ - _aggregate_evaluators[i]->function()->destroy_vec( - _deserialize_buffer.data(), rows); - }); - - _aggregate_evaluators[i]->function()->merge_vec( - _places.data(), _offsets_of_aggregate_states[i], - _deserialize_buffer.data(), _agg_arena_pool.get(), rows); } else { RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add( block, _offsets_of_aggregate_states[i], _places.data(),