From f5e3cd2737fde82288f82ab6eaee52cad41592b1 Mon Sep 17 00:00:00 2001 From: Pxl Date: Wed, 2 Aug 2023 11:50:21 +0800 Subject: [PATCH] [Improvement](aggregation) optimization for aggregation hash_table_lazy_emplace (#22327) optimization for aggregation hash_table_lazy_emplace --- .../aggregate_function_uniq.h | 4 +- be/src/vec/common/aggregation_common.h | 91 ++++++++++++++----- be/src/vec/common/columns_hashing.h | 38 ++++++-- be/src/vec/common/columns_hashing_impl.h | 17 ++++ .../vec/common/hash_table/fixed_hash_table.h | 34 ------- be/src/vec/common/hash_table/hash.h | 3 + .../common/hash_table/hash_table_key_holder.h | 9 ++ be/src/vec/common/hash_table/ph_hash_map.h | 25 +++++ be/src/vec/common/memcpy_small.h | 5 + .../vec/exec/distinct_vaggregation_node.cpp | 63 ++++++------- be/src/vec/exec/distinct_vaggregation_node.h | 4 +- be/src/vec/exec/vaggregation_node.cpp | 71 ++++++--------- be/src/vec/exec/vaggregation_node.h | 42 ++++++--- be/src/vec/exec/vpartition_sort_node.cpp | 4 +- 14 files changed, 250 insertions(+), 160 deletions(-) diff --git a/be/src/vec/aggregate_functions/aggregate_function_uniq.h b/be/src/vec/aggregate_functions/aggregate_function_uniq.h index 0028fa7a38..c8165ae3b2 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_uniq.h +++ b/be/src/vec/aggregate_functions/aggregate_function_uniq.h @@ -35,6 +35,7 @@ #include "vec/columns/column_vector.h" #include "vec/columns/columns_number.h" #include "vec/common/assert_cast.h" +#include "vec/common/hash_table/hash.h" #include "vec/common/hash_table/phmap_fwd_decl.h" #include "vec/common/sip_hash.h" #include "vec/common/string_ref.h" @@ -58,9 +59,6 @@ struct HashCRC32; namespace doris::vectorized { -// Here is an empirical value. -static constexpr size_t HASH_MAP_PREFETCH_DIST = 16; - /// uniqExact template diff --git a/be/src/vec/common/aggregation_common.h b/be/src/vec/common/aggregation_common.h index 31f19e7418..1c07080eb0 100644 --- a/be/src/vec/common/aggregation_common.h +++ b/be/src/vec/common/aggregation_common.h @@ -71,10 +71,8 @@ using KeysNullMap = std::array()>; /// Pack into a binary blob of type T a set of fixed-size keys. Granted that all the keys fit into the /// binary blob, they are disposed in it consecutively. 
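As the comment above the function says, pack_fixed lays a row's fixed-size keys out consecutively inside one POD blob, so the hash table can hash and compare a single integer-like value instead of touching every key column; the patch simply drops the unused low-cardinality branch. A minimal, self-contained sketch of that idea, with plain std::vector columns and a hypothetical pack_row helper standing in for Doris's IColumn and pack_fixed:

// Sketch only (not Doris code): pack one row's fixed-size keys into one blob.
#include <cstdint>
#include <cstring>
#include <iostream>
#include <vector>

// Pack one row of an int32 column and an int16 column into a uint64_t blob.
uint64_t pack_row(const std::vector<int32_t>& c0, const std::vector<int16_t>& c1, size_t row) {
    uint64_t key = 0;  // zero-initialized so unused trailing bytes compare equal
    char* bytes = reinterpret_cast<char*>(&key);
    std::memcpy(bytes, &c0[row], sizeof(int32_t));
    std::memcpy(bytes + sizeof(int32_t), &c1[row], sizeof(int16_t));
    return key;
}

int main() {
    std::vector<int32_t> c0 = {7, 7, 8};
    std::vector<int16_t> c1 = {1, 1, 1};
    // Rows 0 and 1 carry identical key values, so their packed blobs match.
    std::cout << (pack_row(c0, c1, 0) == pack_row(c0, c1, 1)) << '\n'; // 1
    std::cout << (pack_row(c0, c1, 0) == pack_row(c0, c1, 2)) << '\n'; // 0
}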
-template -T pack_fixed(size_t i, size_t keys_size, const ColumnRawPtrs& key_columns, const Sizes& key_sizes, - const ColumnRawPtrs* low_cardinality_positions [[maybe_unused]] = nullptr, - const Sizes* low_cardinality_sizes [[maybe_unused]] = nullptr) { +template +T pack_fixed(size_t i, size_t keys_size, const ColumnRawPtrs& key_columns, const Sizes& key_sizes) { union { T key; char bytes[sizeof(key)] = {}; @@ -85,26 +83,6 @@ T pack_fixed(size_t i, size_t keys_size, const ColumnRawPtrs& key_columns, const for (size_t j = 0; j < keys_size; ++j) { size_t index = i; const IColumn* column = key_columns[j]; - if constexpr (has_low_cardinality) { - if (const IColumn* positions = (*low_cardinality_positions)[j]) { - switch ((*low_cardinality_sizes)[j]) { - case sizeof(UInt8): - index = assert_cast(positions)->get_element(i); - break; - case sizeof(UInt16): - index = assert_cast(positions)->get_element(i); - break; - case sizeof(UInt32): - index = assert_cast(positions)->get_element(i); - break; - case sizeof(UInt64): - index = assert_cast(positions)->get_element(i); - break; - default: - LOG(FATAL) << "Unexpected size of index type for low cardinality column."; - } - } - } switch (key_sizes[j]) { case 1: @@ -218,6 +196,71 @@ T pack_fixed(size_t i, size_t keys_size, const ColumnRawPtrs& key_columns, const return key; } +template +std::vector pack_fixeds(size_t row_numbers, const ColumnRawPtrs& key_columns, + const Sizes& key_sizes, const ColumnRawPtrs& nullmap_columns) { + size_t bitmap_size = 0; + if (!nullmap_columns.empty()) { + bitmap_size = std::tuple_size>::value; + } + + std::vector result(row_numbers); + size_t offset = 0; + if (bitmap_size > 0) { + for (size_t j = 0; j < nullmap_columns.size(); j++) { + if (!nullmap_columns[j]) { + continue; + } + size_t bucket = j / 8; + size_t offset = j % 8; + const auto& data = + assert_cast(*nullmap_columns[j]).get_data().data(); + for (size_t i = 0; i < row_numbers; ++i) { + *((char*)(&result[i]) + bucket) |= data[i] << offset; + } + } + offset += bitmap_size; + } + + for (size_t j = 0; j < key_columns.size(); ++j) { + const char* data = key_columns[j]->get_raw_data().data; + + auto foo = [&](Fixed zero) { + CHECK_EQ(sizeof(Fixed), key_sizes[j]); + if (nullmap_columns.size() && nullmap_columns[j]) { + const auto& nullmap = + assert_cast(*nullmap_columns[j]).get_data().data(); + for (size_t i = 0; i < row_numbers; ++i) { + // make sure null cell is filled by 0x0 + memcpy_fixed((char*)(&result[i]) + offset, + nullmap[i] ? (char*)&zero : data + i * sizeof(Fixed)); + } + } else { + for (size_t i = 0; i < row_numbers; ++i) { + memcpy_fixed((char*)(&result[i]) + offset, data + i * sizeof(Fixed)); + } + } + }; + + if (key_sizes[j] == 1) { + foo(int8_t()); + } else if (key_sizes[j] == 2) { + foo(int16_t()); + } else if (key_sizes[j] == 4) { + foo(int32_t()); + } else if (key_sizes[j] == 8) { + foo(int64_t()); + } else if (key_sizes[j] == 16) { + foo(UInt128()); + } else { + throw Exception(ErrorCode::INTERNAL_ERROR, + "pack_fixeds input invalid key size, key_size={}", key_sizes[j]); + } + offset += key_sizes[j]; + } + return result; +} + /// Hash a set of keys into a UInt128 value. 
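The new pack_fixeds above does the same packing for a whole block at once, column by column: nullable keys first OR their null flags into leading bitmap bytes, then each column's values are copied at a fixed offset, with null cells forced to zero so equal keys always yield byte-identical blobs. A simplified sketch with one nullable int32 key and a uint64_t blob; pack_rows and its exact layout are illustrative, not the Doris API:

// Sketch of column-wise key packing with a leading null bitmap byte.
#include <cstdint>
#include <cstring>
#include <vector>

std::vector<uint64_t> pack_rows(const std::vector<int32_t>& col,
                                const std::vector<uint8_t>& null_map) {
    std::vector<uint64_t> result(col.size(), 0);
    // Pass 1: fold the null flag of key column 0 into bit 0 of the first blob byte.
    for (size_t i = 0; i < col.size(); ++i) {
        reinterpret_cast<char*>(&result[i])[0] |= null_map[i] << 0;
    }
    // Pass 2: copy the values; null cells stay zero so equal null keys pack equally.
    const size_t offset = 1;  // one bitmap byte precedes the value bytes
    const int32_t zero = 0;
    for (size_t i = 0; i < col.size(); ++i) {
        const void* src = null_map[i] ? static_cast<const void*>(&zero) : &col[i];
        std::memcpy(reinterpret_cast<char*>(&result[i]) + offset, src, sizeof(int32_t));
    }
    return result;
}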
inline UInt128 hash128(size_t i, size_t keys_size, const ColumnRawPtrs& key_columns) { UInt128 key; diff --git a/be/src/vec/common/columns_hashing.h b/be/src/vec/common/columns_hashing.h index 64fe7d87b5..84d486a443 100644 --- a/be/src/vec/common/columns_hashing.h +++ b/be/src/vec/common/columns_hashing.h @@ -21,7 +21,9 @@ #pragma once #include +#include +#include "vec/aggregate_functions/aggregate_function.h" #include "vec/columns/column_string.h" #include "vec/common/arena.h" #include "vec/common/assert_cast.h" @@ -29,6 +31,7 @@ #include "vec/common/hash_table/hash_table.h" #include "vec/common/hash_table/hash_table_key_holder.h" #include "vec/common/hash_table/ph_hash_map.h" +#include "vec/common/string_ref.h" #include "vec/common/unaligned.h" namespace doris::vectorized { @@ -70,8 +73,11 @@ struct HashMethodOneNumber : public columns_hashing_impl::HashMethodBase< using Base::get_hash; /// (const Data & data, size_t row, Arena & pool) -> size_t /// Is used for default implementation in HashMethodBase. - FieldType get_key_holder(size_t row, Arena&) const { - return unaligned_load(vec + row * sizeof(FieldType)); + FieldType get_key_holder(size_t row, Arena&) const { return ((FieldType*)(vec))[row]; } + FieldType pack_key_holder(FieldType key, Arena&) const { return key; } + + std::span get_keys(size_t rows_number) const { + return std::span((FieldType*)vec, rows_number); } }; @@ -124,13 +130,13 @@ struct HashMethodSerialized ColumnRawPtrs key_columns; size_t keys_size; - const StringRef* keys; + StringRef* keys; HashMethodSerialized(const ColumnRawPtrs& key_columns_, const Sizes& /*key_sizes*/, const HashMethodContextPtr&) : key_columns(key_columns_), keys_size(key_columns_.size()) {} - void set_serialized_keys(const StringRef* keys_) { keys = keys_; } + void set_serialized_keys(StringRef* keys_) { keys = keys_; } ALWAYS_INLINE KeyHolderType get_key_holder(size_t row, Arena& pool) const { if constexpr (keys_pre_serialized) { @@ -141,6 +147,14 @@ struct HashMethodSerialized } } + KeyHolderType pack_key_holder(StringRef key, Arena& pool) const { + return KeyHolderType {key, pool}; + } + + std::span get_keys(size_t rows_number) const { + return std::span(keys, rows_number); + } + protected: friend class columns_hashing_impl::HashMethodBase; }; @@ -201,6 +215,13 @@ struct HashMethodKeysFixed return pack_fixed(row, keys_size, Base::get_actual_columns(), key_sizes); } } + + Key pack_key_holder(Key key, Arena& pool) const { return key; } + + std::vector get_keys(size_t rows_number) const { + return pack_fixeds(rows_number, Base::get_actual_columns(), key_sizes, + Base::get_nullmap_columns()); + } }; template @@ -261,6 +282,11 @@ struct HashMethodSingleLowNullableColumn : public SingleColumnMethod { template ALWAYS_INLINE EmplaceResult emplace_key(Data& data, size_t hash_value, size_t row, Arena& pool) { + return emplace_with_key(data, Base::get_key_holder(row, pool), hash_value, row, pool); + } + + template + EmplaceResult emplace_with_key(Data& data, KeyHolder&& key, size_t hash_value, size_t row) { if (key_column->is_null_at(row)) { bool has_null_key = data.has_null_key_data(); data.has_null_key_data() = true; @@ -273,11 +299,9 @@ struct HashMethodSingleLowNullableColumn : public SingleColumnMethod { } } - auto key_holder = Base::get_key_holder(row, pool); - bool inserted = false; typename Data::LookupResult it; - data.emplace(key_holder, it, hash_value, inserted); + data.emplace(key, it, hash_value, inserted); if constexpr (has_mapped) { auto& mapped = *lookup_result_get_mapped(it); diff 
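One building block of the batched path in columns_hashing.h above is get_keys, which hands the caller every row's key at once instead of materializing them one by one through get_key_holder. A rough sketch of that accessor shape, assuming a contiguous fixed-width key column; OneNumberMethod here is a stand-in, not the real HashMethodOneNumber:

// Sketch: per-row vs. whole-column key access over a raw column buffer.
#include <cstdint>
#include <span>
#include <vector>

struct OneNumberMethod {
    const char* vec = nullptr;  // raw bytes of the key column

    // Per-row access (old style).
    int64_t get_key(size_t row) const {
        return reinterpret_cast<const int64_t*>(vec)[row];
    }
    // Whole-column access (new style): no copy, just a typed view the caller
    // can hash, prefetch over, and emplace in bulk.
    std::span<const int64_t> get_keys(size_t rows) const {
        return {reinterpret_cast<const int64_t*>(vec), rows};
    }
};

int main() {
    std::vector<int64_t> column = {3, 1, 4, 1, 5};
    OneNumberMethod m{reinterpret_cast<const char*>(column.data())};
    int64_t sum = 0;
    for (auto k : m.get_keys(column.size())) sum += k;
    return sum == 14 ? 0 : 1;
}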
--git a/be/src/vec/common/columns_hashing_impl.h b/be/src/vec/common/columns_hashing_impl.h index 5b85d2732c..bfd69c35a9 100644 --- a/be/src/vec/common/columns_hashing_impl.h +++ b/be/src/vec/common/columns_hashing_impl.h @@ -20,6 +20,7 @@ #pragma once +#include "vec/aggregate_functions/aggregate_function.h" #include "vec/columns/column.h" #include "vec/columns/column_nullable.h" #include "vec/common/aggregation_common.h" @@ -140,6 +141,11 @@ public: return emplaceImpl(key_holder, hash_value, data); } + template + EmplaceResult emplace_with_key(Data& data, KeyHolder&& key, size_t hash_value, size_t row) { + return emplaceImpl(key, hash_value, data); + } + template ALWAYS_INLINE typename std::enable_if_t& lazy_emplace_key(Data& data, size_t row, @@ -156,6 +162,12 @@ public: return lazy_emplace_impl(key_holder, hash_value, data, std::forward(f)); } + template + void lazy_emplace_keys(Data& data, const Keys& keys, const std::vector& hash_values, + Func&& f, AggregateDataPtr* places) { + data.lazy_emplace_keys(std::span(keys), hash_values, places, std::forward(f)); + } + template ALWAYS_INLINE FindResult find_key(Data& data, size_t row, Arena& pool) { auto key_holder = static_cast(*this).get_key_holder(row, pool); @@ -421,6 +433,8 @@ protected: /// column. Otherwise we return the key column itself. const ColumnRawPtrs& get_actual_columns() const { return actual_columns; } + const ColumnRawPtrs& get_nullmap_columns() const { return null_maps; } + /// Create a bitmap that indicates whether, for a particular row, /// a key column bears a null value or not. KeysNullMap create_bitmap(size_t row) const { @@ -453,12 +467,15 @@ protected: const ColumnRawPtrs& get_actual_columns() const { return actual_columns; } + const ColumnRawPtrs& get_nullmap_columns() const { return null_maps; } + KeysNullMap create_bitmap(size_t) const { LOG(FATAL) << "Internal error: calling create_bitmap() for non-nullable keys is forbidden"; } private: ColumnRawPtrs actual_columns; + ColumnRawPtrs null_maps; }; } // namespace columns_hashing_impl diff --git a/be/src/vec/common/hash_table/fixed_hash_table.h b/be/src/vec/common/hash_table/fixed_hash_table.h index 310857afa8..c498402ac4 100644 --- a/be/src/vec/common/hash_table/fixed_hash_table.h +++ b/be/src/vec/common/hash_table/fixed_hash_table.h @@ -22,40 +22,6 @@ #include "vec/common/hash_table/hash_table.h" -template -struct FixedHashTableCell { - using State = TState; - - using value_type = Key; - using mapped_type = VoidMapped; - bool full; - - FixedHashTableCell() {} - FixedHashTableCell(const Key&, const State&) : full(true) {} - - const VoidKey get_key() const { return {}; } - VoidMapped get_mapped() const { return {}; } - - bool is_zero(const State&) const { return !full; } - void set_zero() { full = false; } - static constexpr bool need_zero_value_storage = false; - - /// This Cell is only stored inside an iterator. It's used to accommodate the fact - /// that the iterator based API always provide a reference to a continuous memory - /// containing the Key. As a result, we have to instantiate a real Key field. - /// All methods that return a mutable reference to the Key field are named with - /// -Mutable suffix, indicating this is uncommon usage. As this is only for lookup - /// tables, it's totally fine to discard the Key mutations. 
- struct CellExt { - Key key; - - const VoidKey get_key() const { return {}; } - VoidMapped get_mapped() const { return {}; } - const value_type& get_value() const { return key; } - void update(Key&& key_, FixedHashTableCell*) { key = key_; } - }; -}; - /// How to obtain the size of the table. template diff --git a/be/src/vec/common/hash_table/hash.h b/be/src/vec/common/hash_table/hash.h index be1841a47a..85888644a9 100644 --- a/be/src/vec/common/hash_table/hash.h +++ b/be/src/vec/common/hash_table/hash.h @@ -27,6 +27,9 @@ #include "vec/common/uint128.h" #include "vec/core/types.h" +// Here is an empirical value. +static constexpr size_t HASH_MAP_PREFETCH_DIST = 16; + /** Hash functions that are better than the trivial function std::hash. * * Example: when we do aggregation by the visitor ID, the performance increase is more than 5 times. diff --git a/be/src/vec/common/hash_table/hash_table_key_holder.h b/be/src/vec/common/hash_table/hash_table_key_holder.h index b3e317286e..7137403ff2 100644 --- a/be/src/vec/common/hash_table/hash_table_key_holder.h +++ b/be/src/vec/common/hash_table/hash_table_key_holder.h @@ -139,3 +139,12 @@ inline void ALWAYS_INLINE key_holder_discard_key(doris::vectorized::SerializedKe holder.key.data = nullptr; holder.key.size = 0; } + +template +void key_holder_persist_key_with_arena(Key&, doris::vectorized::Arena&) {} + +inline void key_holder_persist_key_with_arena(doris::StringRef& key, + doris::vectorized::Arena& arena) { + // Hash table shouldn't ask us to persist a zero key + key.data = arena.insert(key.data, key.size); +} diff --git a/be/src/vec/common/hash_table/ph_hash_map.h b/be/src/vec/common/hash_table/ph_hash_map.h index da51f31cf9..ad0769d1d9 100644 --- a/be/src/vec/common/hash_table/ph_hash_map.h +++ b/be/src/vec/common/hash_table/ph_hash_map.h @@ -20,7 +20,10 @@ #include #include +#include +#include "vec/aggregate_functions/aggregate_function.h" +#include "vec/common/arena.h" #include "vec/common/hash_table/hash.h" #include "vec/common/hash_table/hash_table_utils.h" #include "vec/common/hash_table/phmap_fwd_decl.h" @@ -153,6 +156,27 @@ public: } } + template + void lazy_emplace_keys(const std::span& keys, const std::vector& hash_values, + doris::vectorized::AggregateDataPtr* places, Func&& f) { + for (size_t i = 0; i < keys.size(); i++) { + if (LIKELY(i + HASH_MAP_PREFETCH_DIST < keys.size())) { + prefetch_by_hash(hash_values[i + HASH_MAP_PREFETCH_DIST]); + } + places[i] = _hash_map + .lazy_emplace_with_hash(keys[i], hash_values[i], + [&](const auto& ctor) { + key_holder_persist_key_with_arena( + keys[i], _arena); + f(ctor, keys[i]); + }) + ->second; + if constexpr (PartitionedHashTable) { + _check_if_need_partition(); + } + } + } + template void ALWAYS_INLINE emplace(KeyHolder&& key_holder, LookupResult& it, size_t hash_value, bool& inserted) { @@ -272,6 +296,7 @@ private: // this flag is set to true, and resize does not actually happen, // PartitionedHashTable will convert this hash table to partitioned hash table bool _need_partition; + doris::vectorized::Arena _arena; }; template diff --git a/be/src/vec/common/memcpy_small.h b/be/src/vec/common/memcpy_small.h index 6e281ebdaa..812e84dc87 100644 --- a/be/src/vec/common/memcpy_small.h +++ b/be/src/vec/common/memcpy_small.h @@ -81,3 +81,8 @@ inline void memcpy_small_allow_read_write_overflow15(void* __restrict dst, } #endif + +template +void memcpy_fixed(char* lhs, const char* rhs) { + *(T*)lhs = *(T*)rhs; +} diff --git a/be/src/vec/exec/distinct_vaggregation_node.cpp 
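The centerpiece is PHHashMap::lazy_emplace_keys above: hashes for the whole batch are computed up front, each iteration prefetches the bucket that will be needed HASH_MAP_PREFETCH_DIST rows later, and the creator callback runs only for keys seen for the first time (for serialized StringRef keys it also copies the key bytes into the map's Arena via key_holder_persist_key_with_arena so they outlive the input block). The toy open-addressing table below only sketches that control flow; it assumes GCC/Clang for __builtin_prefetch and stands in for phmap's lazy_emplace_with_hash:

// Sketch of batched lazy emplace with hash precomputation and prefetching.
#include <cstddef>
#include <cstdint>
#include <functional>
#include <vector>

static constexpr size_t kPrefetchDist = 16;  // same empirical distance as the patch

struct ToyFlatMap {
    struct Slot { bool used = false; int64_t key = 0; int value = 0; };
    std::vector<Slot> slots = std::vector<Slot>(1 << 12);  // power of two; no resize in this toy

    size_t bucket(size_t hash) const { return hash & (slots.size() - 1); }
    void prefetch(size_t hash) const { __builtin_prefetch(&slots[bucket(hash)]); }

    // Returns the mapped value for key; creator(slot) runs only on first insertion.
    template <typename Creator>
    int* lazy_emplace(int64_t key, size_t hash, Creator&& creator) {
        for (size_t i = bucket(hash);; i = (i + 1) & (slots.size() - 1)) {
            if (!slots[i].used) {
                slots[i].used = true;
                slots[i].key = key;
                creator(slots[i]);
                return &slots[i].value;
            }
            if (slots[i].key == key) return &slots[i].value;
        }
    }
};

void lazy_emplace_keys(ToyFlatMap& map, const std::vector<int64_t>& keys,
                       std::vector<int*>& places) {
    std::vector<size_t> hashes(keys.size());
    for (size_t i = 0; i < keys.size(); ++i) hashes[i] = std::hash<int64_t>{}(keys[i]);

    places.resize(keys.size());
    for (size_t i = 0; i < keys.size(); ++i) {
        // Pull the bucket we will touch kPrefetchDist iterations from now into cache.
        if (i + kPrefetchDist < keys.size()) map.prefetch(hashes[i + kPrefetchDist]);
        places[i] = map.lazy_emplace(keys[i], hashes[i],
                                     [](ToyFlatMap::Slot& s) { s.value = 0; });
    }
}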
b/be/src/vec/exec/distinct_vaggregation_node.cpp index bbbd196411..f44b0c6e36 100644 --- a/be/src/vec/exec/distinct_vaggregation_node.cpp +++ b/be/src/vec/exec/distinct_vaggregation_node.cpp @@ -53,23 +53,23 @@ Status DistinctAggregationNode::_distinct_pre_agg_with_serialized_key( } int rows = in_block->rows(); - IColumn::Selector distinct_row; - distinct_row.reserve(rows); + _distinct_row.clear(); + _distinct_row.reserve(rows); RETURN_IF_CATCH_EXCEPTION( - _emplace_into_hash_table_to_distinct(distinct_row, key_columns, rows)); + _emplace_into_hash_table_to_distinct(_distinct_row, key_columns, rows)); bool mem_reuse = _make_nullable_keys.empty() && out_block->mem_reuse(); if (mem_reuse) { for (int i = 0; i < key_size; ++i) { auto dst = out_block->get_by_position(i).column->assume_mutable(); - key_columns[i]->append_data_by_selector(dst, distinct_row); + key_columns[i]->append_data_by_selector(dst, _distinct_row); } } else { ColumnsWithTypeAndName columns_with_schema; for (int i = 0; i < key_size; ++i) { auto distinct_column = key_columns[i]->clone_empty(); - key_columns[i]->append_data_by_selector(distinct_column, distinct_row); + key_columns[i]->append_data_by_selector(distinct_column, _distinct_row); columns_with_schema.emplace_back(std::move(distinct_column), _probe_expr_ctxs[i]->root()->data_type(), _probe_expr_ctxs[i]->root()->expr_name()); @@ -92,43 +92,38 @@ void DistinctAggregationNode::_emplace_into_hash_table_to_distinct(IColumn::Sele _pre_serialize_key_if_need(state, agg_method, key_columns, num_rows); if constexpr (HashTableTraits::is_phmap) { + auto keys = state.get_keys(num_rows); if (_hash_values.size() < num_rows) { _hash_values.resize(num_rows); } - if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits< - AggState>::value) { - for (size_t i = 0; i < num_rows; ++i) { - _hash_values[i] = agg_method.data.hash(agg_method.keys[i]); + + for (size_t i = 0; i < num_rows; ++i) { + _hash_values[i] = agg_method.data.hash(keys[i]); + } + SCOPED_TIMER(_hash_table_emplace_timer); + for (size_t i = 0; i < num_rows; ++i) { + if (LIKELY(i + HASH_MAP_PREFETCH_DIST < num_rows)) { + agg_method.data.prefetch_by_hash( + _hash_values[i + HASH_MAP_PREFETCH_DIST]); } - } else { - for (size_t i = 0; i < num_rows; ++i) { - _hash_values[i] = - agg_method.data.hash(state.get_key_holder(i, *_agg_arena_pool)); + auto result = state.emplace_with_key( + agg_method.data, state.pack_key_holder(keys[i], *_agg_arena_pool), + _hash_values[i], i); + if (result.is_inserted()) { + distinct_row.push_back(i); + } + } + } else { + SCOPED_TIMER(_hash_table_emplace_timer); + for (size_t i = 0; i < num_rows; ++i) { + auto result = state.emplace_key(agg_method.data, i, *_agg_arena_pool); + if (result.is_inserted()) { + result.set_mapped(dummy_mapped_data); + distinct_row.push_back(i); } } } - - /// For all rows. 
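For the distinct pre-aggregation above, the mapped value is irrelevant: the node only records which rows introduced a new key into the reusable _distinct_row selector and later copies exactly those rows into the output columns via append_data_by_selector. A compact sketch of that selector pattern, with std::unordered_set standing in for the hash table variants:

// Sketch: keep only the row indices whose key is seen for the first time.
#include <cstdint>
#include <unordered_set>
#include <vector>

struct DistinctState {
    std::unordered_set<int64_t> seen;
    std::vector<uint32_t> selector;  // reused across blocks, like _distinct_row

    // Returns indices of rows whose key appears for the first time.
    const std::vector<uint32_t>& emplace_block(const std::vector<int64_t>& keys) {
        selector.clear();               // keep the capacity from previous blocks
        selector.reserve(keys.size());
        for (uint32_t i = 0; i < keys.size(); ++i) {
            if (seen.insert(keys[i]).second) {  // .second == "was newly inserted"
                selector.push_back(i);
            }
        }
        return selector;
    }
};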
COUNTER_UPDATE(_hash_table_input_counter, num_rows); - for (size_t i = 0; i < num_rows; ++i) { - auto emplace_result = [&]() { - if constexpr (HashTableTraits::is_phmap) { - if (LIKELY(i + HASH_MAP_PREFETCH_DIST < num_rows)) { - agg_method.data.prefetch_by_hash( - _hash_values[i + HASH_MAP_PREFETCH_DIST]); - } - return state.emplace_key(agg_method.data, _hash_values[i], i, - *_agg_arena_pool); - } else { - return state.emplace_key(agg_method.data, i, *_agg_arena_pool); - } - }(); - - if (emplace_result.is_inserted()) { - emplace_result.set_mapped(dummy_mapped_data); - distinct_row.push_back(i); - } - } }, _agg_data->_aggregated_method_variant); } diff --git a/be/src/vec/exec/distinct_vaggregation_node.h b/be/src/vec/exec/distinct_vaggregation_node.h index f5ca0ceebb..adeb042da8 100644 --- a/be/src/vec/exec/distinct_vaggregation_node.h +++ b/be/src/vec/exec/distinct_vaggregation_node.h @@ -47,9 +47,11 @@ public: vectorized::VExprContextSPtrs get_conjuncts() { return _conjuncts; } private: - char* dummy_mapped_data = nullptr; void _emplace_into_hash_table_to_distinct(IColumn::Selector& distinct_row, ColumnRawPtrs& key_columns, const size_t num_rows); + + char* dummy_mapped_data = nullptr; + IColumn::Selector _distinct_row; }; } // namespace vectorized } // namespace doris diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index f4da6d9aaf..ca36809ca9 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -36,6 +36,7 @@ #include "runtime/runtime_state.h" #include "runtime/thread_context.h" #include "util/telemetry/telemetry.h" +#include "vec/common/hash_table/hash.h" #include "vec/common/hash_table/hash_table_key_holder.h" #include "vec/common/hash_table/hash_table_utils.h" #include "vec/common/hash_table/string_hash_table.h" @@ -54,10 +55,6 @@ class ObjectPool; } // namespace doris namespace doris::vectorized { - -// Here is an empirical value. -static constexpr size_t HASH_MAP_PREFETCH_DIST = 16; - /// The minimum reduction factor (input rows divided by output rows) to grow hash tables /// in a streaming preaggregation, given that the hash tables are currently the given /// size or above. 
The sizes roughly correspond to hash table sizes where the bucket @@ -343,6 +340,7 @@ Status AggregationNode::prepare_profile(RuntimeState* state) { _serialize_result_timer = ADD_TIMER(runtime_profile(), "SerializeResultTime"); _deserialize_data_timer = ADD_TIMER(runtime_profile(), "DeserializeDataTime"); _hash_table_compute_timer = ADD_TIMER(runtime_profile(), "HashTableComputeTime"); + _hash_table_emplace_timer = ADD_TIMER(runtime_profile(), "HashTableEmplaceTime"); _hash_table_iterate_timer = ADD_TIMER(runtime_profile(), "HashTableIterateTime"); _insert_keys_to_column_timer = ADD_TIMER(runtime_profile(), "InsertKeysToColumnTime"); _streaming_agg_timer = ADD_TIMER(runtime_profile(), "StreamingAggTime"); @@ -926,23 +924,6 @@ void AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, ColumnR _pre_serialize_key_if_need(state, agg_method, key_columns, num_rows); - if constexpr (HashTableTraits::is_phmap) { - if (_hash_values.size() < num_rows) { - _hash_values.resize(num_rows); - } - if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits< - AggState>::value) { - for (size_t i = 0; i < num_rows; ++i) { - _hash_values[i] = agg_method.data.hash(agg_method.keys[i]); - } - } else { - for (size_t i = 0; i < num_rows; ++i) { - _hash_values[i] = - agg_method.data.hash(state.get_key_holder(i, *_agg_arena_pool)); - } - } - } - auto creator = [this](const auto& ctor, const auto& key) { using KeyType = std::decay_t; if constexpr (HashTableTraits::is_string_hash_table && @@ -966,26 +947,34 @@ void AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, ColumnR _create_agg_status(mapped); }; - /// For all rows. - COUNTER_UPDATE(_hash_table_input_counter, num_rows); - for (size_t i = 0; i < num_rows; ++i) { - AggregateDataPtr mapped = nullptr; - if constexpr (HashTableTraits::is_phmap) { - if (LIKELY(i + HASH_MAP_PREFETCH_DIST < num_rows)) { - agg_method.data.prefetch_by_hash( - _hash_values[i + HASH_MAP_PREFETCH_DIST]); - } + if constexpr (HashTableTraits::is_phmap) { + auto keys = state.get_keys(num_rows); + if (_hash_values.size() < num_rows) { + _hash_values.resize(num_rows); + } + for (size_t i = 0; i < num_rows; ++i) { + _hash_values[i] = agg_method.data.hash(keys[i]); + } - if constexpr (ColumnsHashing::IsSingleNullableColumnMethod< - AggState>::value) { - mapped = state.lazy_emplace_key(agg_method.data, i, *_agg_arena_pool, - _hash_values[i], creator, - creator_for_null_key); - } else { - mapped = state.lazy_emplace_key(agg_method.data, _hash_values[i], i, - *_agg_arena_pool, creator); + SCOPED_TIMER(_hash_table_emplace_timer); + if constexpr (ColumnsHashing::IsSingleNullableColumnMethod::value) { + for (size_t i = 0; i < num_rows; ++i) { + if (LIKELY(i + HASH_MAP_PREFETCH_DIST < num_rows)) { + agg_method.data.prefetch_by_hash( + _hash_values[i + HASH_MAP_PREFETCH_DIST]); + } + + places[i] = state.lazy_emplace_key(agg_method.data, i, *_agg_arena_pool, + _hash_values[i], creator, + creator_for_null_key); } } else { + state.lazy_emplace_keys(agg_method.data, keys, _hash_values, creator, + places); + } + } else { + for (size_t i = 0; i < num_rows; ++i) { + AggregateDataPtr mapped = nullptr; if constexpr (ColumnsHashing::IsSingleNullableColumnMethod< AggState>::value) { mapped = state.lazy_emplace_key(agg_method.data, i, *_agg_arena_pool, @@ -994,11 +983,11 @@ void AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, ColumnR mapped = state.lazy_emplace_key(agg_method.data, i, *_agg_arena_pool, creator); } + places[i] = mapped; } - - places[i] = 
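The reworked _emplace_into_hash_table above boils down to: map every input row to its group's aggregate state, creating and initializing that state (the creator callback) only on the first occurrence of a key, then let the aggregate functions update places[i] row by row. A sketch of that flow, with standard containers in place of the Doris hash map and Arena:

// Sketch: lazy creation of per-group aggregate state, then per-row updates.
#include <cstdint>
#include <deque>
#include <unordered_map>
#include <vector>

struct AggState { int64_t count = 0; int64_t sum = 0; };

void emplace_into_hash_table(std::unordered_map<int64_t, AggState*>& table,
                             std::deque<AggState>& pool,  // stable addresses, arena stand-in
                             const std::vector<int64_t>& keys,
                             std::vector<AggState*>& places) {
    places.resize(keys.size());
    for (size_t i = 0; i < keys.size(); ++i) {
        auto [it, inserted] = table.try_emplace(keys[i], nullptr);
        if (inserted) {              // "creator": runs once per distinct key
            pool.emplace_back();
            it->second = &pool.back();
        }
        places[i] = it->second;
    }
}

// A SUM/COUNT update pass then only touches the per-group state:
void update_sum_count(const std::vector<int64_t>& values,
                      const std::vector<AggState*>& places) {
    for (size_t i = 0; i < values.size(); ++i) {
        places[i]->count += 1;
        places[i]->sum += values[i];
    }
}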
mapped; - assert(places[i] != nullptr); } + + COUNTER_UPDATE(_hash_table_input_counter, num_rows); }, _agg_data->_aggregated_method_variant); } diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h index 7d560dae89..05893d9873 100644 --- a/be/src/vec/exec/vaggregation_node.h +++ b/be/src/vec/exec/vaggregation_node.h @@ -50,7 +50,6 @@ #include "vec/common/assert_cast.h" #include "vec/common/columns_hashing.h" #include "vec/common/hash_table/fixed_hash_map.h" -#include "vec/common/hash_table/fixed_hash_table.h" #include "vec/common/hash_table/hash.h" #include "vec/common/hash_table/partitioned_hash_map.h" #include "vec/common/hash_table/ph_hash_map.h" @@ -314,13 +313,13 @@ struct AggregationMethodKeysFixed { for (size_t i = 0; i < key_columns.size(); ++i) { size_t size = key_sizes[i]; + char* data = nullptr; key_columns[i]->resize(num_rows); // If we have a nullable column, get its nested column and its null map. if (is_column_nullable(*key_columns[i])) { ColumnNullable& nullable_col = assert_cast(*key_columns[i]); - char* data = - const_cast(nullable_col.get_nested_column().get_raw_data().data); + data = const_cast(nullable_col.get_nested_column().get_raw_data().data); UInt8* nullmap = assert_cast(&nullable_col.get_null_map_column()) ->get_data() .data(); @@ -330,20 +329,34 @@ struct AggregationMethodKeysFixed { size_t bucket = i / 8; size_t offset = i % 8; for (size_t j = 0; j < num_rows; j++) { - const Key& key = keys[j]; - UInt8 val = (reinterpret_cast(&key)[bucket] >> offset) & 1; - nullmap[j] = val; - if (!val) { - memcpy(data + j * size, reinterpret_cast(&key) + pos, size); - } + nullmap[j] = (reinterpret_cast(&keys[j])[bucket] >> offset) & 1; } } else { - char* data = const_cast(key_columns[i]->get_raw_data().data); - for (size_t j = 0; j < num_rows; j++) { - const Key& key = keys[j]; - memcpy(data + j * size, reinterpret_cast(&key) + pos, size); - } + data = const_cast(key_columns[i]->get_raw_data().data); } + + auto foo = [&](Fixed zero) { + CHECK_EQ(sizeof(Fixed), size); + for (size_t j = 0; j < num_rows; j++) { + memcpy_fixed(data + j * sizeof(Fixed), (char*)(&keys[j]) + pos); + } + }; + + if (size == 1) { + foo(int8_t()); + } else if (size == 2) { + foo(int16_t()); + } else if (size == 4) { + foo(int32_t()); + } else if (size == 8) { + foo(int64_t()); + } else if (size == 16) { + foo(UInt128()); + } else { + throw Exception(ErrorCode::INTERNAL_ERROR, + "pack_fixeds input invalid key size, key_size={}", size); + } + pos += size; } } @@ -854,6 +867,7 @@ protected: // nullable diff. so we need make nullable of it. std::vector _make_nullable_keys; RuntimeProfile::Counter* _hash_table_compute_timer; + RuntimeProfile::Counter* _hash_table_emplace_timer; RuntimeProfile::Counter* _hash_table_input_counter; RuntimeProfile::Counter* _build_timer; RuntimeProfile::Counter* _expr_timer; diff --git a/be/src/vec/exec/vpartition_sort_node.cpp b/be/src/vec/exec/vpartition_sort_node.cpp index c31c8af6a9..8ee303105d 100644 --- a/be/src/vec/exec/vpartition_sort_node.cpp +++ b/be/src/vec/exec/vpartition_sort_node.cpp @@ -29,13 +29,13 @@ #include "common/logging.h" #include "common/object_pool.h" #include "runtime/runtime_state.h" +#include "vec/common/hash_table/hash.h" #include "vec/common/hash_table/hash_set.h" #include "vec/exprs/vexpr.h" #include "vec/exprs/vexpr_context.h" namespace doris::vectorized { -// Here is an empirical value. 
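insert_keys_into_columns above is the inverse of pack_fixeds: when results are materialized, each packed blob is split back into per-column values, with the leading bitmap byte(s) restoring the null map and a fixed-size copy restoring the value bytes. A sketch mirroring the earlier pack_rows layout (one nullable int32 key, uint64_t blobs; names are illustrative):

// Sketch: unpack blobs back into a value column and its null map.
#include <cstdint>
#include <cstring>
#include <vector>

void unpack_rows(const std::vector<uint64_t>& keys,
                 std::vector<int32_t>& col, std::vector<uint8_t>& null_map) {
    col.resize(keys.size());
    null_map.resize(keys.size());
    const size_t offset = 1;  // same layout as packing: one bitmap byte, then the value
    for (size_t j = 0; j < keys.size(); ++j) {
        const char* bytes = reinterpret_cast<const char*>(&keys[j]);
        null_map[j] = (bytes[0] >> 0) & 1;                      // bit 0 = key column 0
        std::memcpy(&col[j], bytes + offset, sizeof(int32_t));  // null rows get the packed zero
    }
}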
-static constexpr size_t HASH_MAP_PREFETCH_DIST = 16; + VPartitionSortNode::VPartitionSortNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : ExecNode(pool, tnode, descs), _hash_table_size_counter(nullptr) {