diff --git a/be/src/vec/aggregate_functions/aggregate_function.h b/be/src/vec/aggregate_functions/aggregate_function.h index c7c7fc38ca..677c189002 100644 --- a/be/src/vec/aggregate_functions/aggregate_function.h +++ b/be/src/vec/aggregate_functions/aggregate_function.h @@ -107,10 +107,6 @@ public: virtual void deserialize_vec(AggregateDataPtr places, ColumnString* column, Arena* arena, size_t num_rows) const = 0; - /// Deserializes state and merge it with current aggregation function. - virtual void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf, - Arena* arena) const = 0; - /// Returns true if a function requires Arena to handle own states (see add(), merge(), deserialize()). virtual bool allocates_memory_in_arena() const { return false; } @@ -257,17 +253,6 @@ public: size_t align_of_data() const override { return alignof(Data); } void reset(AggregateDataPtr place) const override {} - - void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf, - Arena* arena) const override { - Data deserialized_data; - AggregateDataPtr deserialized_place = (AggregateDataPtr)&deserialized_data; - - auto derived = static_cast(this); - derived->create(deserialized_place); - derived->deserialize(deserialized_place, buf, arena); - derived->merge(place, deserialized_place, arena); - } }; using AggregateFunctionPtr = std::shared_ptr; diff --git a/be/src/vec/aggregate_functions/aggregate_function_nothing.h b/be/src/vec/aggregate_functions/aggregate_function_nothing.h index 64af14a6cf..c0ae740be4 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_nothing.h +++ b/be/src/vec/aggregate_functions/aggregate_function_nothing.h @@ -64,9 +64,6 @@ public: void insert_result_into(ConstAggregateDataPtr, IColumn& to) const override { to.insert_default(); } - - void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf, - Arena* arena) const override {} }; } // namespace doris::vectorized diff --git a/be/src/vec/aggregate_functions/aggregate_function_null.h b/be/src/vec/aggregate_functions/aggregate_function_null.h index 89960bc9f0..5b804b82a7 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_null.h +++ b/be/src/vec/aggregate_functions/aggregate_function_null.h @@ -151,18 +151,6 @@ public: } } - void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf, - Arena* arena) const override { - bool flag = true; - if (result_is_nullable) { - read_binary(flag, buf); - } - if (flag) { - set_flag(place); - nested_function->deserialize_and_merge(nested_place(place), buf, arena); - } - } - void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override { if constexpr (result_is_nullable) { ColumnNullable& to_concrete = assert_cast(to); diff --git a/be/src/vec/aggregate_functions/aggregate_function_uniq.h b/be/src/vec/aggregate_functions/aggregate_function_uniq.h index 988e9bdb01..c717307c72 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_uniq.h +++ b/be/src/vec/aggregate_functions/aggregate_function_uniq.h @@ -20,8 +20,6 @@ #pragma once -#include - #include #include "gutil/hash/city.h" @@ -36,26 +34,29 @@ namespace doris::vectorized { -// Here is an empirical value. -static constexpr size_t HASH_MAP_PREFETCH_DIST = 16; - /// uniqExact template struct AggregateFunctionUniqExactData { - static constexpr bool is_string_key = std::is_same_v; - using Key = std::conditional_t; - using Hash = std::conditional_t>; + using Key = T; - using Set = phmap::flat_hash_set; + /// When creating, the hash table must be small. + using Set = HashSet, HashTableGrower<4>, + HashTableAllocatorWithStackMemory>; - static UInt128 ALWAYS_INLINE get_key(const StringRef& value) { - UInt128 key; - SipHash hash; - hash.update(value.data, value.size); - hash.get128(key.low, key.high); - return key; - } + Set set; + + static String get_name() { return "uniqExact"; } +}; + +/// For rows, we put the SipHash values (128 bits) into the hash table. +template <> +struct AggregateFunctionUniqExactData { + using Key = UInt128; + + /// When creating, the hash table must be small. + using Set = HashSet, + HashTableAllocatorWithStackMemory>; Set set; @@ -72,9 +73,16 @@ struct OneAdder { static void ALWAYS_INLINE add(Data& data, const IColumn& column, size_t row_num) { if constexpr (std::is_same_v) { StringRef value = column.get_data_at(row_num); - data.set.insert(Data::get_key(value)); - } else if constexpr (IsDecimalNumber) { - data.set.insert(assert_cast&>(column).get_data()[row_num]); + + UInt128 key; + SipHash hash; + hash.update(value.data, value.size); + hash.get128(key.low, key.high); + + data.set.insert(key); + } else if constexpr (std::is_same_v) { + data.set.insert( + assert_cast&>(column).get_data()[row_num]); } else { data.set.insert(assert_cast&>(column).get_data()[row_num]); } @@ -88,7 +96,6 @@ template class AggregateFunctionUniq final : public IAggregateFunctionDataHelper> { public: - using KeyType = std::conditional_t, UInt128, T>; AggregateFunctionUniq(const DataTypes& argument_types_) : IAggregateFunctionDataHelper>(argument_types_, {}) {} @@ -102,96 +109,18 @@ public: detail::OneAdder::add(this->data(place), *columns[0], row_num); } - static ALWAYS_INLINE const KeyType* get_keys(std::vector& keys_container, - const IColumn& column, size_t batch_size) { - if constexpr (std::is_same_v) { - keys_container.resize(batch_size); - for (size_t i = 0; i != batch_size; ++i) { - StringRef value = column.get_data_at(i); - keys_container[i] = Data::get_key(value); - } - return keys_container.data(); - } else { - using ColumnType = - std::conditional_t, ColumnDecimal, ColumnVector>; - return assert_cast(column).get_data().data(); - } - } - - void add_batch(size_t batch_size, AggregateDataPtr* places, size_t place_offset, - const IColumn** columns, Arena* arena) const override { - std::vector keys_container; - const KeyType* keys = get_keys(keys_container, *columns[0], batch_size); - - std::vector array_of_data_set(batch_size); - - for (size_t i = 0; i != batch_size; ++i) { - array_of_data_set[i] = &(this->data(places[i] + place_offset).set); - } - - for (size_t i = 0; i != batch_size; ++i) { - if (i + HASH_MAP_PREFETCH_DIST < batch_size) { - array_of_data_set[i + HASH_MAP_PREFETCH_DIST]->prefetch( - keys[i + HASH_MAP_PREFETCH_DIST]); - } - - array_of_data_set[i]->insert(keys[i]); - } - } - void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena*) const override { - auto& rhs_set = this->data(rhs).set; - if (rhs_set.size() == 0) return; - - auto& set = this->data(place).set; - set.rehash(set.size() + rhs_set.size()); - - for (auto elem : rhs_set) { - set.insert(elem); - } - } - - void add_batch_single_place(size_t batch_size, AggregateDataPtr place, const IColumn** columns, - Arena* arena) const override { - std::vector keys_container; - const KeyType* keys = get_keys(keys_container, *columns[0], batch_size); - auto& set = this->data(place).set; - - for (size_t i = 0; i != batch_size; ++i) { - if (i + HASH_MAP_PREFETCH_DIST < batch_size) { - set.prefetch(keys[i + HASH_MAP_PREFETCH_DIST]); - } - set.insert(keys[i]); - } + this->data(place).set.merge(this->data(rhs).set); } void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override { - auto& set = this->data(place).set; - write_var_uint(set.size(), buf); - for (const auto& elem : set) { - write_pod_binary(elem, buf); - } - } - - void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf, - Arena* arena) const override { - auto& set = this->data(place).set; - size_t size; - read_var_uint(size, buf); - - set.rehash(size + set.size()); - - for (size_t i = 0; i < size; ++i) { - KeyType ref; - read_pod_binary(ref, buf); - set.insert(ref); - } + this->data(place).set.write(buf); } void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf, - Arena* arena) const override { - deserialize_and_merge(place, buf, arena); + Arena*) const override { + this->data(place).set.read(buf); } void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override { diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index 54f6b8d15a..d4325bc17e 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -618,10 +618,18 @@ Status AggregationNode::_merge_without_key(Block* block) { for (int j = 0; j < rows; ++j) { VectorBufferReader buffer_reader(((ColumnString*)(column.get()))->get_data_at(j)); + _create_agg_status(deserialize_buffer.get()); - _aggregate_evaluators[i]->function()->deserialize_and_merge( - _agg_data.without_key + _offsets_of_aggregate_states[i], buffer_reader, + _aggregate_evaluators[i]->function()->deserialize( + deserialize_buffer.get() + _offsets_of_aggregate_states[i], buffer_reader, &_agg_arena_pool); + + _aggregate_evaluators[i]->function()->merge( + _agg_data.without_key + _offsets_of_aggregate_states[i], + deserialize_buffer.get() + _offsets_of_aggregate_states[i], + &_agg_arena_pool); + + _destroy_agg_status(deserialize_buffer.get()); } } else { _aggregate_evaluators[i]->execute_single_add(