diff --git a/be/src/vec/aggregate_functions/aggregate_function.h b/be/src/vec/aggregate_functions/aggregate_function.h index 98faf3a389..4b2118fc51 100644 --- a/be/src/vec/aggregate_functions/aggregate_function.h +++ b/be/src/vec/aggregate_functions/aggregate_function.h @@ -143,11 +143,21 @@ public: virtual void deserialize_vec(AggregateDataPtr places, const ColumnString* column, Arena* arena, size_t num_rows) const = 0; + virtual void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset, + AggregateDataPtr rhs, const ColumnString* column, + Arena* arena, const size_t num_rows) const = 0; + + virtual void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset, + AggregateDataPtr rhs, + const ColumnString* column, Arena* arena, + const size_t num_rows) const = 0; + virtual void deserialize_from_column(AggregateDataPtr places, const IColumn& column, Arena* arena, size_t num_rows) const = 0; /// Deserializes state and merge it with current aggregation function. - virtual void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf, + virtual void deserialize_and_merge(AggregateDataPtr __restrict place, + AggregateDataPtr __restrict rhs, BufferReadable& buf, Arena* arena) const = 0; virtual void deserialize_and_merge_from_column_range(AggregateDataPtr __restrict place, @@ -361,6 +371,51 @@ public: } } + void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset, + AggregateDataPtr rhs, const ColumnString* column, Arena* arena, + const size_t num_rows) const override { + const auto size_of_data = assert_cast(this)->size_of_data(); + for (size_t i = 0; i != num_rows; ++i) { + try { + auto rhs_place = rhs + size_of_data * i; + VectorBufferReader buffer_reader(column->get_data_at(i)); + assert_cast(this)->create(rhs_place); + assert_cast(this)->deserialize_and_merge( + places[i] + offset, rhs_place, buffer_reader, arena); + } catch (...) { + for (int j = 0; j < i; ++j) { + auto place = rhs + size_of_data * j; + assert_cast(this)->destroy(place); + } + throw; + } + } + assert_cast(this)->destroy_vec(rhs, num_rows); + } + + void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset, + AggregateDataPtr rhs, const ColumnString* column, + Arena* arena, const size_t num_rows) const override { + const auto size_of_data = assert_cast(this)->size_of_data(); + for (size_t i = 0; i != num_rows; ++i) { + try { + auto rhs_place = rhs + size_of_data * i; + VectorBufferReader buffer_reader(column->get_data_at(i)); + assert_cast(this)->create(rhs_place); + if (places[i]) + assert_cast(this)->deserialize_and_merge( + places[i] + offset, rhs_place, buffer_reader, arena); + } catch (...) { + for (int j = 0; j < i; ++j) { + auto place = rhs + size_of_data * j; + assert_cast(this)->destroy(place); + } + throw; + } + } + assert_cast(this)->destroy_vec(rhs, num_rows); + } + void deserialize_from_column(AggregateDataPtr places, const IColumn& column, Arena* arena, size_t num_rows) const override { deserialize_vec(places, assert_cast(&column), arena, num_rows); @@ -395,7 +450,12 @@ public: for (size_t i = begin; i <= end; ++i) { VectorBufferReader buffer_reader( (assert_cast(column)).get_data_at(i)); - deserialize_and_merge(place, buffer_reader, arena); + char deserialized_data[size_of_data()]; + AggregateDataPtr deserialized_place = (AggregateDataPtr)deserialized_data; + assert_cast(this)->create(deserialized_place); + DEFER({ assert_cast(this)->destroy(deserialized_place); }); + assert_cast(this)->deserialize_and_merge(place, deserialized_place, + buffer_reader, arena); } } @@ -407,16 +467,10 @@ public: deserialize_and_merge_from_column_range(place, column, 0, column.size() - 1, arena); } - void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf, - Arena* arena) const override { - char deserialized_data[size_of_data()]; - AggregateDataPtr deserialized_place = (AggregateDataPtr)deserialized_data; - - auto derived = static_cast(this); - derived->create(deserialized_place); - derived->deserialize(deserialized_place, buf, arena); - derived->merge(place, deserialized_place, arena); - derived->destroy(deserialized_place); + void deserialize_and_merge(AggregateDataPtr __restrict place, AggregateDataPtr __restrict rhs, + BufferReadable& buf, Arena* arena) const override { + assert_cast(this)->deserialize(rhs, buf, arena); + assert_cast(this)->merge(place, rhs, arena); } }; @@ -451,16 +505,10 @@ public: create(place); } - void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf, - Arena* arena) const override { - char deserialized_data[size_of_data()]; - AggregateDataPtr deserialized_place = (AggregateDataPtr)deserialized_data; - - auto derived = assert_cast(this); - derived->create(deserialized_place); - DEFER({ derived->destroy(deserialized_place); }); - derived->deserialize(deserialized_place, buf, arena); - derived->merge(place, deserialized_place, arena); + void deserialize_and_merge(AggregateDataPtr __restrict place, AggregateDataPtr __restrict rhs, + BufferReadable& buf, Arena* arena) const override { + assert_cast(this)->deserialize(rhs, buf, arena); + assert_cast(this)->merge(place, rhs, arena); } }; diff --git a/be/src/vec/aggregate_functions/aggregate_function_avg.h b/be/src/vec/aggregate_functions/aggregate_function_avg.h index ff1de6b2e2..c30c6db6b5 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_avg.h +++ b/be/src/vec/aggregate_functions/aggregate_function_avg.h @@ -233,6 +233,22 @@ public: } } + void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset, + AggregateDataPtr rhs, const ColumnString* column, Arena* arena, + const size_t num_rows) const override { + this->deserialize_from_column(rhs, *column, arena, num_rows); + DEFER({ this->destroy_vec(rhs, num_rows); }); + this->merge_vec(places, offset, rhs, arena, num_rows); + } + + void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset, + AggregateDataPtr rhs, const ColumnString* column, + Arena* arena, const size_t num_rows) const override { + this->deserialize_from_column(rhs, *column, arena, num_rows); + DEFER({ this->destroy_vec(rhs, num_rows); }); + this->merge_vec_selected(places, offset, rhs, arena, num_rows); + } + void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place, IColumn& to) const override { auto& col = assert_cast(to); diff --git a/be/src/vec/aggregate_functions/aggregate_function_count.h b/be/src/vec/aggregate_functions/aggregate_function_count.h index 35da6c3406..92d0f644d3 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_count.h +++ b/be/src/vec/aggregate_functions/aggregate_function_count.h @@ -145,6 +145,22 @@ public: } } + void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset, + AggregateDataPtr rhs, const ColumnString* column, Arena* arena, + const size_t num_rows) const override { + this->deserialize_from_column(rhs, *column, arena, num_rows); + DEFER({ this->destroy_vec(rhs, num_rows); }); + this->merge_vec(places, offset, rhs, arena, num_rows); + } + + void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset, + AggregateDataPtr rhs, const ColumnString* column, + Arena* arena, const size_t num_rows) const override { + this->deserialize_from_column(rhs, *column, arena, num_rows); + DEFER({ this->destroy_vec(rhs, num_rows); }); + this->merge_vec_selected(places, offset, rhs, arena, num_rows); + } + void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place, IColumn& to) const override { auto& col = assert_cast(to); @@ -267,6 +283,22 @@ public: } } + void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset, + AggregateDataPtr rhs, const ColumnString* column, Arena* arena, + const size_t num_rows) const override { + this->deserialize_from_column(rhs, *column, arena, num_rows); + DEFER({ this->destroy_vec(rhs, num_rows); }); + this->merge_vec(places, offset, rhs, arena, num_rows); + } + + void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset, + AggregateDataPtr rhs, const ColumnString* column, + Arena* arena, const size_t num_rows) const override { + this->deserialize_from_column(rhs, *column, arena, num_rows); + DEFER({ this->destroy_vec(rhs, num_rows); }); + this->merge_vec_selected(places, offset, rhs, arena, num_rows); + } + void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place, IColumn& to) const override { auto& col = assert_cast(to); diff --git a/be/src/vec/aggregate_functions/aggregate_function_count_old.h b/be/src/vec/aggregate_functions/aggregate_function_count_old.h index ff935b6a67..e42299fdc6 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_count_old.h +++ b/be/src/vec/aggregate_functions/aggregate_function_count_old.h @@ -130,6 +130,22 @@ public: } } + void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset, + AggregateDataPtr rhs, const ColumnString* column, Arena* arena, + const size_t num_rows) const override { + this->deserialize_from_column(rhs, *column, arena, num_rows); + DEFER({ this->destroy_vec(rhs, num_rows); }); + this->merge_vec(places, offset, rhs, arena, num_rows); + } + + void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset, + AggregateDataPtr rhs, const ColumnString* column, + Arena* arena, const size_t num_rows) const override { + this->deserialize_from_column(rhs, *column, arena, num_rows); + DEFER({ this->destroy_vec(rhs, num_rows); }); + this->merge_vec_selected(places, offset, rhs, arena, num_rows); + } + void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place, IColumn& to) const override { auto& col = assert_cast(to); diff --git a/be/src/vec/aggregate_functions/aggregate_function_min_max.h b/be/src/vec/aggregate_functions/aggregate_function_min_max.h index 5d3bee95c9..778d311214 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_min_max.h +++ b/be/src/vec/aggregate_functions/aggregate_function_min_max.h @@ -624,6 +624,22 @@ public: } } + void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset, + AggregateDataPtr rhs, const ColumnString* column, Arena* arena, + const size_t num_rows) const override { + this->deserialize_from_column(rhs, *column, arena, num_rows); + DEFER({ this->destroy_vec(rhs, num_rows); }); + this->merge_vec(places, offset, rhs, arena, num_rows); + } + + void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset, + AggregateDataPtr rhs, const ColumnString* column, + Arena* arena, const size_t num_rows) const override { + this->deserialize_from_column(rhs, *column, arena, num_rows); + DEFER({ this->destroy_vec(rhs, num_rows); }); + this->merge_vec_selected(places, offset, rhs, arena, num_rows); + } + void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place, IColumn& to) const override { if constexpr (Data::IsFixedLength) { diff --git a/be/src/vec/aggregate_functions/aggregate_function_nothing.h b/be/src/vec/aggregate_functions/aggregate_function_nothing.h index f8f3cda43c..0c48dcc52b 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_nothing.h +++ b/be/src/vec/aggregate_functions/aggregate_function_nothing.h @@ -65,8 +65,8 @@ public: to.insert_default(); } - void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf, - Arena* arena) const override {} + void deserialize_and_merge(AggregateDataPtr __restrict place, AggregateDataPtr __restrict rhs, + BufferReadable& buf, Arena* arena) const override {} void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, const IColumn& column, Arena* arena) const override {} diff --git a/be/src/vec/aggregate_functions/aggregate_function_null.h b/be/src/vec/aggregate_functions/aggregate_function_null.h index 50f5ab380e..a0bb79dfc0 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_null.h +++ b/be/src/vec/aggregate_functions/aggregate_function_null.h @@ -143,15 +143,17 @@ public: } } - void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf, - Arena* arena) const override { + void deserialize_and_merge(AggregateDataPtr __restrict place, AggregateDataPtr __restrict rhs, + BufferReadable& buf, Arena* arena) const override { bool flag = true; if (result_is_nullable) { read_binary(flag, buf); } if (flag) { + set_flag(rhs); set_flag(place); - nested_function->deserialize_and_merge(nested_place(place), buf, arena); + nested_function->deserialize_and_merge(nested_place(place), nested_place(rhs), buf, + arena); } } diff --git a/be/src/vec/aggregate_functions/aggregate_function_sum.h b/be/src/vec/aggregate_functions/aggregate_function_sum.h index cab803bc7a..e81e7b4af2 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_sum.h +++ b/be/src/vec/aggregate_functions/aggregate_function_sum.h @@ -177,6 +177,22 @@ public: } } + void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset, + AggregateDataPtr rhs, const ColumnString* column, Arena* arena, + const size_t num_rows) const override { + this->deserialize_from_column(rhs, *column, arena, num_rows); + DEFER({ this->destroy_vec(rhs, num_rows); }); + this->merge_vec(places, offset, rhs, arena, num_rows); + } + + void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset, + AggregateDataPtr rhs, const ColumnString* column, + Arena* arena, const size_t num_rows) const override { + this->deserialize_from_column(rhs, *column, arena, num_rows); + DEFER({ this->destroy_vec(rhs, num_rows); }); + this->merge_vec_selected(places, offset, rhs, arena, num_rows); + } + void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place, IColumn& to) const override { auto& col = assert_cast(to); diff --git a/be/src/vec/aggregate_functions/aggregate_function_uniq.h b/be/src/vec/aggregate_functions/aggregate_function_uniq.h index c8165ae3b2..b9445c4ed7 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_uniq.h +++ b/be/src/vec/aggregate_functions/aggregate_function_uniq.h @@ -193,8 +193,8 @@ public: } } - void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf, - Arena* arena) const override { + void deserialize_and_merge(AggregateDataPtr __restrict place, AggregateDataPtr __restrict rhs, + BufferReadable& buf, Arena* arena) const override { auto& set = this->data(place).set; UInt64 size; read_var_uint(size, buf); @@ -210,7 +210,17 @@ public: void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf, Arena* arena) const override { - deserialize_and_merge(place, buf, arena); + auto& set = this->data(place).set; + UInt64 size; + read_var_uint(size, buf); + + set.rehash(size + set.size()); + + for (size_t i = 0; i < size; ++i) { + KeyType ref; + read_pod_binary(ref, buf); + set.insert(ref); + } } void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override { diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index ca36809ca9..ca34663515 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -338,7 +338,7 @@ Status AggregationNode::prepare_profile(RuntimeState* state) { _get_results_timer = ADD_TIMER(runtime_profile(), "GetResultsTime"); _serialize_data_timer = ADD_TIMER(runtime_profile(), "SerializeDataTime"); _serialize_result_timer = ADD_TIMER(runtime_profile(), "SerializeResultTime"); - _deserialize_data_timer = ADD_TIMER(runtime_profile(), "DeserializeDataTime"); + _deserialize_data_timer = ADD_TIMER(runtime_profile(), "DeserializeAndMergeTime"); _hash_table_compute_timer = ADD_TIMER(runtime_profile(), "HashTableComputeTime"); _hash_table_emplace_timer = ADD_TIMER(runtime_profile(), "HashTableEmplaceTime"); _hash_table_iterate_timer = ADD_TIMER(runtime_profile(), "HashTableIterateTime"); diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h index 05893d9873..1e01a138a3 100644 --- a/be/src/vec/exec/vaggregation_node.h +++ b/be/src/vec/exec/vaggregation_node.h @@ -1091,18 +1091,11 @@ private: { SCOPED_TIMER(_deserialize_data_timer); - _aggregate_evaluators[i]->function()->deserialize_from_column( - _deserialize_buffer.data(), *column, _agg_arena_pool.get(), rows); + _aggregate_evaluators[i]->function()->deserialize_and_merge_vec_selected( + _places.data(), _offsets_of_aggregate_states[i], + _deserialize_buffer.data(), (ColumnString*)(column.get()), + _agg_arena_pool.get(), rows); } - - DEFER({ - _aggregate_evaluators[i]->function()->destroy_vec( - _deserialize_buffer.data(), rows); - }); - - _aggregate_evaluators[i]->function()->merge_vec_selected( - _places.data(), _offsets_of_aggregate_states[i], - _deserialize_buffer.data(), _agg_arena_pool.get(), rows); } else { RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add_selected( block, _offsets_of_aggregate_states[i], _places.data(), @@ -1133,18 +1126,11 @@ private: { SCOPED_TIMER(_deserialize_data_timer); - _aggregate_evaluators[i]->function()->deserialize_from_column( - _deserialize_buffer.data(), *column, _agg_arena_pool.get(), rows); + _aggregate_evaluators[i]->function()->deserialize_and_merge_vec( + _places.data(), _offsets_of_aggregate_states[i], + _deserialize_buffer.data(), (ColumnString*)(column.get()), + _agg_arena_pool.get(), rows); } - - DEFER({ - _aggregate_evaluators[i]->function()->destroy_vec( - _deserialize_buffer.data(), rows); - }); - - _aggregate_evaluators[i]->function()->merge_vec( - _places.data(), _offsets_of_aggregate_states[i], - _deserialize_buffer.data(), _agg_arena_pool.get(), rows); } else { RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add( block, _offsets_of_aggregate_states[i], _places.data(),