[feature](agg) support two-level hash map in aggregation node (#15967)
@@ -342,6 +342,13 @@ public:
return _query_options.partitioned_hash_join_rows_threshold;
}

int partitioned_hash_agg_rows_threshold() const {
if (!_query_options.__isset.partitioned_hash_agg_rows_threshold) {
return 0;
}
return _query_options.partitioned_hash_agg_rows_threshold;
}

const std::vector<TTabletCommitInfo>& tablet_commit_infos() const {
return _tablet_commit_infos;
}

@@ -64,6 +64,7 @@ struct HashMethodOneNumber : public columns_hashing_impl::HashMethodBase<

/// Find key into HashTable or HashMap. If Data is HashMap and key was found, returns ptr to value, otherwise nullptr.
using Base::find_key; /// (Data & data, size_t row, Arena & pool) -> FindResult
using Base::find_key_with_hash;

/// Get hash value of row.
using Base::get_hash; /// (const Data & data, size_t row, Arena & pool) -> size_t

@@ -163,7 +163,8 @@ public:
}

template <typename Data>
ALWAYS_INLINE FindResult find_key(Data& data, size_t hash_value, size_t row, Arena& pool) {
ALWAYS_INLINE FindResult find_key_with_hash(Data& data, size_t hash_value, size_t row,
Arena& pool) {
auto key_holder = static_cast<Derived&>(*this).get_key_holder(row, pool);
return find_key_impl(key_holder_get_key(key_holder), hash_value, data);
}

@@ -18,9 +18,11 @@
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Common/HashTable/HashTable.h
// and modified by Doris

#pragma once

template <typename T>
struct HashTableTraits {
static constexpr bool is_phmap = false;
static constexpr bool is_parallel_phmap = false;
static constexpr bool is_string_hash_table = false;
static constexpr bool is_partitioned_table = false;
};

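HashTableTraits is the compile-time switchboard for this patch: the primary template above defaults every flag to false and each hash table type specializes it, so callers can branch with if constexpr and only the relevant code path gets instantiated. A minimal self-contained sketch of that dispatch pattern (the names RegularMap, PartitionedMap and configure() are illustrative, not part of the patch):

#include <iostream>

struct RegularMap {};
struct PartitionedMap {};

template <typename T>
struct HashTableTraits {
    static constexpr bool is_partitioned_table = false;
};

template <>
struct HashTableTraits<PartitionedMap> {
    static constexpr bool is_partitioned_table = true;
};

// Branching with `if constexpr` compiles out the untaken path, which is how
// AggregationNode only calls set_partitioned_threshold() on partitioned tables.
template <typename Table>
void configure(Table&) {
    if constexpr (HashTableTraits<Table>::is_partitioned_table) {
        std::cout << "partitioned table: set conversion threshold\n";
    } else {
        std::cout << "plain table: nothing to configure\n";
    }
}

int main() {
    RegularMap r;
    PartitionedMap p;
    configure(r);
    configure(p);
    return 0;
}
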
@@ -21,6 +21,7 @@

#include "vec/common/hash_table/hash_map.h"
#include "vec/common/hash_table/partitioned_hash_table.h"
#include "vec/common/hash_table/ph_hash_map.h"

template <typename ImplTable>
class PartitionedHashMapTable : public PartitionedHashTable<ImplTable> {
@@ -46,8 +47,46 @@ public:

return *lookup_result_get_mapped(it);
}

template <typename Func>
void for_each_mapped(Func&& func) {
for (auto& v : *this) {
func(v.get_second());
}
}
};

template <typename Key, typename Mapped, typename Hash = DefaultHash<Key>>
using PartitionedHashMap =
PartitionedHashMapTable<HashMap<Key, Mapped, Hash, PartitionedHashTableGrower<>>>;

template <typename Key, typename Mapped, typename Hash = DefaultHash<Key>>
using PHPartitionedHashMap = PartitionedHashMapTable<PHHashMap<Key, Mapped, Hash, true>>;

template <typename Key, typename Mapped, typename Hash>
struct HashTableTraits<PartitionedHashMap<Key, Mapped, Hash>> {
static constexpr bool is_phmap = false;
static constexpr bool is_string_hash_table = false;
static constexpr bool is_partitioned_table = true;
};

template <template <typename> class Derived, typename Key, typename Mapped, typename Hash>
struct HashTableTraits<Derived<PartitionedHashMap<Key, Mapped, Hash>>> {
static constexpr bool is_phmap = false;
static constexpr bool is_string_hash_table = false;
static constexpr bool is_partitioned_table = true;
};

template <typename Key, typename Mapped, typename Hash>
struct HashTableTraits<PHPartitionedHashMap<Key, Mapped, Hash>> {
static constexpr bool is_phmap = true;
static constexpr bool is_string_hash_table = false;
static constexpr bool is_partitioned_table = true;
};

template <template <typename> class Derived, typename Key, typename Mapped, typename Hash>
struct HashTableTraits<Derived<PHPartitionedHashMap<Key, Mapped, Hash>>> {
static constexpr bool is_phmap = true;
static constexpr bool is_string_hash_table = false;
static constexpr bool is_partitioned_table = true;
};

@@ -20,6 +20,7 @@
#pragma once

#include "vec/common/hash_table/hash_table.h"
#include "vec/common/hash_table/hash_table_utils.h"

/** Partitioned hash table.
 * Represents 16 (or 1ULL << BITS_FOR_SUB_TABLE) small hash tables (sub table count of the first level).

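As the comment says, the partitioned (two-level) table fronts 16 = 1ULL << BITS_FOR_SUB_TABLE sub-tables and routes each key to one of them by its hash, so every sub-table stays small and resizes independently. A minimal sketch of the routing idea, assuming the sub-table index comes from the top hash bits (the exact bit selection used by get_sub_table_from_hash in Doris may differ):

#include <array>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <unordered_map>

constexpr size_t BITS_FOR_SUB_TABLE = 4;
constexpr size_t NUM_LEVEL1_SUB_TABLES = 1ULL << BITS_FOR_SUB_TABLE; // 16

struct TwoLevelMap {
    std::array<std::unordered_map<uint64_t, uint64_t>, NUM_LEVEL1_SUB_TABLES> sub_tables;

    // Assumed scheme: take the highest BITS_FOR_SUB_TABLE bits of the hash.
    static size_t sub_table_from_hash(size_t hash_value) {
        return hash_value >> (8 * sizeof(size_t) - BITS_FOR_SUB_TABLE);
    }

    uint64_t& operator[](uint64_t key) {
        size_t hash_value = std::hash<uint64_t>{}(key);
        return sub_tables[sub_table_from_hash(hash_value)][key];
    }
};
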
@@ -83,7 +84,7 @@ public:
return *this;
}

size_t hash(const Key& x) const { return Impl::Hash::operator()(x); }
size_t hash(const Key& x) const { return level0_sub_table.hash(x); }

float get_factor() const { return MAX_SUB_TABLE_OCCUPANCY_FRACTION; }

@@ -205,6 +206,9 @@ public:
}
}

bool has_null_key_data() const { return false; }
char* get_null_key_data() { return nullptr; }

protected:
typename Impl::iterator begin_of_next_non_empty_sub_table_idx(size_t& sub_table_idx) {
while (sub_table_idx != NUM_LEVEL1_SUB_TABLES && level1_sub_tables[sub_table_idx].empty())
@@ -263,11 +267,11 @@ public:
return *this;
}

auto& operator*() const { return *current_it; }
auto* operator->() const { return current_it.get_ptr(); }
auto& operator*() { return *current_it; }
auto* operator->() { return current_it.get_ptr(); }

auto* get_ptr() const { return current_it.get_ptr(); }
size_t get_hash() const { return current_it.get_hash(); }
auto* get_ptr() { return current_it.get_ptr(); }
size_t get_hash() { return current_it.get_hash(); }
};

class const_iterator /// NOLINT
@@ -398,6 +402,17 @@ public:
}
}

void ALWAYS_INLINE prefetch_by_hash(size_t hash_value) {
if constexpr (HashTableTraits<Impl>::is_phmap) {
if (_is_partitioned) {
const auto sub_table_idx = get_sub_table_from_hash(hash_value);
level1_sub_tables[sub_table_idx].prefetch_by_hash(hash_value);
} else {
level0_sub_table.prefetch_by_hash(hash_value);
}
}
}

template <bool READ, typename KeyHolder>
void ALWAYS_INLINE prefetch(KeyHolder& key_holder) {
if (_is_partitioned) {
@@ -457,6 +472,32 @@ public:
emplace(key_holder, it, inserted, hash_value);
}

template <typename KeyHolder, typename Func>
void ALWAYS_INLINE lazy_emplace(KeyHolder&& key_holder, LookupResult& it, Func&& f) {
size_t hash_value = hash(key_holder_get_key(key_holder));
lazy_emplace(key_holder, it, hash_value, std::forward<Func>(f));
}

template <typename KeyHolder, typename Func>
void ALWAYS_INLINE lazy_emplace(KeyHolder&& key_holder, LookupResult& it, size_t hash_value,
Func&& f) {
if (_is_partitioned) {
size_t sub_table_idx = get_sub_table_from_hash(hash_value);
level1_sub_tables[sub_table_idx].lazy_emplace(key_holder, it, hash_value,
std::forward<Func>(f));
} else {
level0_sub_table.lazy_emplace(key_holder, it, hash_value, std::forward<Func>(f));
if (UNLIKELY(level0_sub_table.need_partition())) {
convert_to_partitioned();

// The hash table was converted to partitioned, so we have to re-find the key.
size_t sub_table_id = get_sub_table_from_hash(hash_value);
it = level1_sub_tables[sub_table_id].find(key_holder_get_key(key_holder),
hash_value);
}
}
}

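The else branch above is the heart of the feature: inserts go to the single level-0 table until it reports need_partition(); the table is then rebuilt into sub-tables, and the just-inserted key must be looked up again because its slot moved. A simplified, self-contained model of that control flow (this is not the Doris API, just the same conversion logic over std::unordered_map):

#include <cstddef>
#include <cstdint>
#include <functional>
#include <unordered_map>
#include <vector>

// Toy model: a flat map that rebuilds itself into 16 shards once it holds
// `threshold` entries, mirroring PartitionedHashTable::lazy_emplace above.
class ConvertingMap {
public:
    explicit ConvertingMap(size_t threshold) : _threshold(threshold), _shards(16) {}

    uint64_t& emplace(uint64_t key) {
        if (_is_partitioned) {
            return _shards[shard_of(key)][key];
        }
        uint64_t& slot = _level0[key];
        if (_level0.size() >= _threshold) {
            convert_to_partitioned();
            // The layout changed, so re-find the key in its shard.
            return _shards[shard_of(key)][key];
        }
        return slot;
    }

private:
    static size_t shard_of(uint64_t key) { return std::hash<uint64_t>{}(key) & 15; }

    void convert_to_partitioned() {
        for (const auto& [k, v] : _level0) {
            _shards[shard_of(k)][k] = v;
        }
        _level0.clear();
        _is_partitioned = true;
    }

    size_t _threshold;
    bool _is_partitioned = false;
    std::unordered_map<uint64_t, uint64_t> _level0;
    std::vector<std::unordered_map<uint64_t, uint64_t>> _shards;
};
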
LookupResult ALWAYS_INLINE find(Key x, size_t hash_value) {
if (_is_partitioned) {
size_t sub_table_idx = get_sub_table_from_hash(hash_value);
@@ -506,6 +547,10 @@ public:
}
}

bool add_elem_size_overflow(size_t row) const {
return !_is_partitioned && level0_sub_table.add_elem_size_overflow(row);
}

private:
void convert_to_partitioned() {
SCOPED_RAW_TIMER(&_convert_timer_ns);
@@ -517,17 +562,26 @@ private:

auto it = level0_sub_table.begin();

/// It is assumed that the zero key (stored separately) is first in iteration order.
if (it != level0_sub_table.end() && it.get_ptr()->is_zero(level0_sub_table)) {
insert(it->get_value());
++it;
}
if constexpr (HashTableTraits<Impl>::is_phmap) {
for (; it != level0_sub_table.end(); ++it) {
size_t hash_value = level0_sub_table.hash(it.get_first());
size_t sub_table_idx = get_sub_table_from_hash(hash_value);
level1_sub_tables[sub_table_idx].insert(it.get_first(), hash_value,
it.get_second());
}
} else {
/// It is assumed that the zero key (stored separately) is first in iteration order.
if (it != level0_sub_table.end() && it.get_ptr()->is_zero(level0_sub_table)) {
insert(it->get_value());
++it;
}

for (; it != level0_sub_table.end(); ++it) {
const auto* cell = it.get_ptr();
size_t hash_value = cell->get_hash(level0_sub_table);
size_t sub_table_idx = get_sub_table_from_hash(hash_value);
level1_sub_tables[sub_table_idx].insert_unique_non_zero(cell, hash_value);
for (; it != level0_sub_table.end(); ++it) {
const auto* cell = it.get_ptr();
size_t hash_value = cell->get_hash(level0_sub_table);
size_t sub_table_idx = get_sub_table_from_hash(hash_value);
level1_sub_tables[sub_table_idx].insert_unique_non_zero(cell, hash_value);
}
}

_is_partitioned = true;

@@ -19,6 +19,8 @@

#include <parallel_hashmap/phmap.h>

#include <boost/noncopyable.hpp>

#include "vec/common/hash_table/hash.h"
#include "vec/common/hash_table/hash_table_utils.h"

@@ -27,24 +29,40 @@ ALWAYS_INLINE inline auto lookup_result_get_mapped(std::pair<const Key, Mapped>*
return &(it->second);
}

template <typename Key, typename Mapped, typename Hash = DefaultHash<Key>,
bool use_parallel = false>
template <typename Key, typename Mapped, typename HashMethod = DefaultHash<Key>,
bool PartitionedHashTable = false>
class PHHashMap : private boost::noncopyable {
public:
using Self = PHHashMap;
using HashMapImpl =
std::conditional_t<use_parallel, phmap::parallel_flat_hash_map<Key, Mapped, Hash>,
phmap::flat_hash_map<Key, Mapped, Hash>>;
using Hash = HashMethod;
using cell_type = std::pair<const Key, Mapped>;
using HashMapImpl = phmap::flat_hash_map<Key, Mapped, Hash>;

using key_type = Key;
using mapped_type = Mapped;
using value_type = std::pair<const Key, Mapped>;

using LookupResult = std::pair<const Key, Mapped>*;
using ConstLookupResult = const std::pair<const Key, Mapped>*;

using const_iterator_impl = typename HashMapImpl::const_iterator;
using iterator_impl = typename HashMapImpl::iterator;

PHHashMap() = default;

PHHashMap(size_t reserve_for_num_elements) { _hash_map.reserve(reserve_for_num_elements); }

PHHashMap(PHHashMap&& other) { *this = std::move(other); }

PHHashMap& operator=(PHHashMap&& rhs) {
_hash_map.clear();
_hash_map = std::move(rhs._hash_map);
std::swap(_need_partition, rhs._need_partition);
std::swap(_partitioned_threshold, rhs._partitioned_threshold);

return *this;
}

template <typename Derived, bool is_const>
class iterator_base {
using BaseIterator = std::conditional_t<is_const, const_iterator_impl, iterator_impl>;
@@ -80,7 +98,7 @@ public:

auto& get_second() { return base_iterator->second; }

auto get_ptr() const { return *base_iterator; }
auto get_ptr() const { return this; }
size_t get_hash() const { return base_iterator->get_hash(); }

size_t get_collision_chain_length() const { return 0; }
@@ -110,22 +128,28 @@ public:
void ALWAYS_INLINE emplace(KeyHolder&& key_holder, LookupResult& it, bool& inserted) {
const auto& key = key_holder_get_key(key_holder);
inserted = false;
auto it_ = _hash_map.lazy_emplace(key, [&](const auto& ctor) {
it = &*_hash_map.lazy_emplace(key, [&](const auto& ctor) {
inserted = true;
key_holder_persist_key(key_holder);
ctor(key_holder_get_key(key_holder), nullptr);
});
it = &*it_;

if constexpr (PartitionedHashTable) {
_check_if_need_partition();
}
}

template <typename KeyHolder, typename Func>
void ALWAYS_INLINE lazy_emplace(KeyHolder&& key_holder, LookupResult& it, Func&& f) {
const auto& key = key_holder_get_key(key_holder);
auto it_ = _hash_map.lazy_emplace(key, [&](const auto& ctor) {
it = &*_hash_map.lazy_emplace(key, [&](const auto& ctor) {
key_holder_persist_key(key_holder);
f(ctor, key);
});
it = &*it_;

if constexpr (PartitionedHashTable) {
_check_if_need_partition();
}
}

template <typename KeyHolder>
@@ -133,20 +157,14 @@ public:
bool& inserted) {
const auto& key = key_holder_get_key(key_holder);
inserted = false;
if constexpr (use_parallel) {
auto it_ = _hash_map.lazy_emplace_with_hash(hash_value, key, [&](const auto& ctor) {
inserted = true;
key_holder_persist_key(key_holder);
ctor(key, nullptr);
});
it = &*it_;
} else {
auto it_ = _hash_map.lazy_emplace_with_hash(key, hash_value, [&](const auto& ctor) {
inserted = true;
key_holder_persist_key(key_holder);
ctor(key, nullptr);
});
it = &*it_;
it = &*_hash_map.lazy_emplace_with_hash(key, hash_value, [&](const auto& ctor) {
inserted = true;
key_holder_persist_key(key_holder);
ctor(key, nullptr);
});

if constexpr (PartitionedHashTable) {
_check_if_need_partition();
}
}

@@ -154,21 +172,23 @@ public:
void ALWAYS_INLINE lazy_emplace(KeyHolder&& key_holder, LookupResult& it, size_t hash_value,
Func&& f) {
const auto& key = key_holder_get_key(key_holder);
if constexpr (use_parallel) {
auto it_ = _hash_map.lazy_emplace_with_hash(hash_value, key, [&](const auto& ctor) {
key_holder_persist_key(key_holder);
f(ctor, key);
});
it = &*it_;
} else {
auto it_ = _hash_map.lazy_emplace_with_hash(key, hash_value, [&](const auto& ctor) {
key_holder_persist_key(key_holder);
f(ctor, key);
});
it = &*it_;

it = &*_hash_map.lazy_emplace_with_hash(key, hash_value, [&](const auto& ctor) {
key_holder_persist_key(key_holder);
f(ctor, key);
});

if constexpr (PartitionedHashTable) {
_check_if_need_partition();
}
}

void ALWAYS_INLINE insert(const Key& key, size_t hash_value, const Mapped& value) {
auto it = &*_hash_map.lazy_emplace_with_hash(key, hash_value,
[&](const auto& ctor) { ctor(key, value); });
it->second = value;
}

template <typename KeyHolder>
LookupResult ALWAYS_INLINE find(KeyHolder&& key_holder) {
const auto& key = key_holder_get_key(key_holder);
@@ -185,9 +205,7 @@ public:

size_t hash(const Key& x) const { return _hash_map.hash(x); }

void ALWAYS_INLINE prefetch_by_hash(size_t hash_value) {
if constexpr (!use_parallel) _hash_map.prefetch_hash(hash_value);
}
void ALWAYS_INLINE prefetch_by_hash(size_t hash_value) { _hash_map.prefetch_hash(hash_value); }

void ALWAYS_INLINE prefetch_by_key(Key key) { _hash_map.prefetch(key); }

@@ -208,6 +226,8 @@ public:
return capacity * sizeof(typename HashMapImpl::slot_type);
}

size_t get_buffer_size_in_cells() const { return _hash_map.capacity(); }

bool add_elem_size_overflow(size_t row) const {
const auto capacity = _hash_map.capacity();
// phmap use 7/8th as maximum load factor.
@@ -219,12 +239,43 @@ public:
char* get_null_key_data() { return nullptr; }
bool has_null_key_data() const { return false; }

bool need_partition() { return _need_partition; }

void set_partitioned_threshold(int threshold) { _partitioned_threshold = threshold; }

bool check_if_need_partition(size_t bucket_count) {
if constexpr (PartitionedHashTable) {
return _partitioned_threshold > 0 && bucket_count >= _partitioned_threshold;
} else {
return false;
}
}

bool empty() const { return _hash_map.empty(); }

void clear_and_shrink() { _hash_map.clear(); }

private:
void _check_if_need_partition() {
if (UNLIKELY(check_if_need_partition(_hash_map.size() + 1))) {
_need_partition = add_elem_size_overflow(1);
}
}

HashMapImpl _hash_map;
// the bucket count threshold above which this table is converted to a partitioned hash table
// > 0: dynamic conversion is enabled
// 0: conversion is disabled
int _partitioned_threshold = 0;
// If a resize is needed and the bucket count after the resize would be >= _partitioned_threshold,
// this flag is set to true and the resize does not actually happen;
// PartitionedHashTable will then convert this hash table to a partitioned hash table.
bool _need_partition;
};

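Note the two-part trigger in _check_if_need_partition(): _need_partition is raised only when the configured row threshold has been reached and one more insert would push the table past phmap's 7/8 maximum load factor, so the conversion replaces an imminent (and expensive) resize rather than firing at an arbitrary row count. A small standalone illustration of that predicate (the concrete numbers are examples only):

#include <cstddef>
#include <iostream>

// Mirrors PHHashMap's trigger: convert only when the row threshold is reached
// AND the next insert would exceed the 7/8 maximum load factor.
bool should_convert(size_t size_after_insert, size_t capacity, int partitioned_threshold) {
    const bool over_threshold = partitioned_threshold > 0 &&
                                size_after_insert >= static_cast<size_t>(partitioned_threshold);
    const bool about_to_resize = size_after_insert + 1 > capacity * 7 / 8;
    return over_threshold && about_to_resize;
}

int main() {
    // capacity 2097152 -> resize point at 1835008 (7/8 of capacity)
    std::cout << should_convert(1048576, 2097152, 1048576) << '\n'; // 0: past threshold, far from resize
    std::cout << should_convert(1835008, 2097152, 1048576) << '\n'; // 1: next insert would resize
    return 0;
}
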
template <typename Key, typename Mapped, typename Hash, bool use_parallel>
struct HashTableTraits<PHHashMap<Key, Mapped, Hash, use_parallel>> {
template <typename Key, typename Mapped, typename Hash, bool PartitionedHashTable>
struct HashTableTraits<PHHashMap<Key, Mapped, Hash, PartitionedHashTable>> {
static constexpr bool is_phmap = true;
static constexpr bool is_parallel_phmap = use_parallel;
static constexpr bool is_string_hash_table = false;
static constexpr bool is_partitioned_table = false;
};

@@ -213,13 +213,13 @@ public:
template <typename TMapped, typename Allocator>
struct HashTableTraits<StringHashMap<TMapped, Allocator>> {
static constexpr bool is_phmap = false;
static constexpr bool is_parallel_phmap = false;
static constexpr bool is_string_hash_table = true;
static constexpr bool is_partitioned_table = false;
};

template <template <typename> class Derived, typename TMapped, typename Allocator>
struct HashTableTraits<Derived<StringHashMap<TMapped, Allocator>>> {
static constexpr bool is_phmap = false;
static constexpr bool is_parallel_phmap = false;
static constexpr bool is_string_hash_table = true;
static constexpr bool is_partitioned_table = false;
};

@@ -24,6 +24,7 @@
#include <variant>

#include "vec/common/hash_table/hash.h"
#include "vec/common/hash_table/hash_table_utils.h"

using StringKey8 = doris::vectorized::UInt64;
using StringKey16 = doris::vectorized::UInt128;
@@ -662,6 +663,6 @@ public:
template <typename SubMaps>
struct HashTableTraits<StringHashTable<SubMaps>> {
static constexpr bool is_phmap = false;
static constexpr bool is_parallel_phmap = false;
static constexpr bool is_string_hash_table = true;
static constexpr bool is_partitioned_table = false;
};

@@ -140,6 +140,9 @@ Status AggregationNode::init(const TPlanNode& tnode, RuntimeState* state) {
_aggregate_evaluators.push_back(evaluator);
}

_partitioned_hash_table_enabled = state->partitioned_hash_agg_rows_threshold() > 0;
_agg_data->set_enable_partitioned_hash_table(_partitioned_hash_table_enabled);

const auto& agg_functions = tnode.agg_node.aggregate_functions;
_is_merge = std::any_of(agg_functions.cbegin(), agg_functions.cend(),
[](const auto& e) { return e.nodes[0].agg_expr.is_merge_agg; });
@@ -296,6 +299,7 @@ Status AggregationNode::prepare_profile(RuntimeState* state) {
memory_usage->AddHighWaterMarkCounter("SerializeKeyArena", TUnit::BYTES);

_build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
_build_table_convert_timer = ADD_TIMER(runtime_profile(), "BuildConvertToPartitionedTime");
_serialize_key_timer = ADD_TIMER(runtime_profile(), "SerializeKeyTime");
_exec_timer = ADD_TIMER(runtime_profile(), "ExecTime");
_merge_timer = ADD_TIMER(runtime_profile(), "MergeTime");
@@ -317,6 +321,9 @@ Status AggregationNode::prepare_profile(RuntimeState* state) {
DCHECK_EQ(_intermediate_tuple_desc->slots().size(), _output_tuple_desc->slots().size());
RETURN_IF_ERROR(VExpr::prepare(_probe_expr_ctxs, state, child(0)->row_desc()));

runtime_profile()->add_info_string("PartitionedHashTableEnabled",
_partitioned_hash_table_enabled ? "true" : "false");

_mem_pool = std::make_unique<MemPool>();

int j = _probe_expr_ctxs.size();
@@ -407,6 +414,10 @@ Status AggregationNode::prepare_profile(RuntimeState* state) {
((_total_size_of_aggregate_states + _align_aggregate_states - 1) /
_align_aggregate_states) *
_align_aggregate_states));
if constexpr (HashTableTraits<HashTableType>::is_partitioned_table) {
agg_method.data.set_partitioned_threshold(
state->partitioned_hash_agg_rows_threshold());
}
},
_agg_data->_aggregated_method_variant);
if (_is_merge) {
@@ -572,6 +583,11 @@ void AggregationNode::release_resource(RuntimeState* state) {
if (_hash_table_size_counter) {
std::visit(
[&](auto&& agg_method) {
using HashTableType = std::decay_t<decltype(agg_method.data)>;
if constexpr (HashTableTraits<HashTableType>::is_partitioned_table) {
COUNTER_UPDATE(_build_table_convert_timer,
agg_method.data.get_convert_timer_value());
}
COUNTER_SET(_hash_table_size_counter, int64_t(agg_method.data.size()));
},
_agg_data->_aggregated_method_variant);
@@ -893,18 +909,14 @@ void AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, ColumnR
AggregateDataPtr mapped = nullptr;
if constexpr (HashTableTraits<HashTableType>::is_phmap) {
if (LIKELY(i + HASH_MAP_PREFETCH_DIST < num_rows)) {
if constexpr (HashTableTraits<HashTableType>::is_parallel_phmap) {
agg_method.data.prefetch_by_key(state.get_key_holder(
i + HASH_MAP_PREFETCH_DIST, *_agg_arena_pool));
} else
agg_method.data.prefetch_by_hash(
_hash_values[i + HASH_MAP_PREFETCH_DIST]);
agg_method.data.prefetch_by_hash(
_hash_values[i + HASH_MAP_PREFETCH_DIST]);
}

if constexpr (ColumnsHashing::IsSingleNullableColumnMethod<
AggState>::value) {
mapped = state.lazy_emplace_key(agg_method.data, _hash_values[i], i,
*_agg_arena_pool, creator,
mapped = state.lazy_emplace_key(agg_method.data, i, *_agg_arena_pool,
_hash_values[i], creator,
creator_for_null_key);
} else {
mapped = state.lazy_emplace_key(agg_method.data, _hash_values[i], i,
@@ -959,16 +971,12 @@ void AggregationNode::_find_in_hash_table(AggregateDataPtr* places, ColumnRawPtr
auto find_result = [&]() {
if constexpr (HashTableTraits<HashTableType>::is_phmap) {
if (LIKELY(i + HASH_MAP_PREFETCH_DIST < num_rows)) {
if constexpr (HashTableTraits<HashTableType>::is_parallel_phmap) {
agg_method.data.prefetch_by_key(state.get_key_holder(
i + HASH_MAP_PREFETCH_DIST, *_agg_arena_pool));
} else
agg_method.data.prefetch_by_hash(
_hash_values[i + HASH_MAP_PREFETCH_DIST]);
agg_method.data.prefetch_by_hash(
_hash_values[i + HASH_MAP_PREFETCH_DIST]);
}

return state.find_key(agg_method.data, _hash_values[i], i,
*_agg_arena_pool);
return state.find_key_with_hash(agg_method.data, _hash_values[i], i,
*_agg_arena_pool);
} else {
return state.find_key(agg_method.data, i, *_agg_arena_pool);
}

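Both loops above keep the prefetch-ahead pattern: while handling row i they prefetch the bucket for row i + HASH_MAP_PREFETCH_DIST so its cache line is likely resident by the time that row is processed; the patch simply drops the parallel-phmap branch and always prefetches by hash. A generic sketch of the pattern (the HASH_MAP_PREFETCH_DIST value and the Map interface are assumptions modeled on the prefetch_by_hash()/find() calls above):

#include <cstddef>
#include <vector>

constexpr size_t HASH_MAP_PREFETCH_DIST = 16; // assumed distance, for illustration only

template <typename Map>
size_t count_hits(Map& map, const std::vector<size_t>& hash_values,
                  const std::vector<typename Map::key_type>& keys) {
    size_t hits = 0;
    for (size_t i = 0; i < keys.size(); ++i) {
        if (i + HASH_MAP_PREFETCH_DIST < keys.size()) {
            // Warm the bucket we will touch a few iterations from now.
            map.prefetch_by_hash(hash_values[i + HASH_MAP_PREFETCH_DIST]);
        }
        hits += map.find(keys[i], hash_values[i]) != nullptr ? 1 : 0;
    }
    return hits;
}
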
@@ -25,6 +25,7 @@
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/common/columns_hashing.h"
#include "vec/common/hash_table/fixed_hash_map.h"
#include "vec/common/hash_table/partitioned_hash_map.h"
#include "vec/common/hash_table/string_hash_map.h"
#include "vec/exprs/vectorized_agg_fn.h"
#include "vec/exprs/vslot_ref.h"
@@ -145,6 +146,8 @@ private:

using AggregatedDataWithoutKey = AggregateDataPtr;
using AggregatedDataWithStringKey = PHHashMap<StringRef, AggregateDataPtr, DefaultHash<StringRef>>;
using PartitionedAggregatedDataWithStringKey =
PHPartitionedHashMap<StringRef, AggregateDataPtr, DefaultHash<StringRef>>;
using AggregatedDataWithShortStringKey = StringHashMap<AggregateDataPtr>;

template <typename TData>
@@ -404,6 +407,23 @@ using AggregatedDataWithUInt128KeyPhase2 =
using AggregatedDataWithUInt256KeyPhase2 =
PHHashMap<UInt256, AggregateDataPtr, HashMixWrapper<UInt256>>;

using PartitionedAggregatedDataWithUInt32Key =
PHPartitionedHashMap<UInt32, AggregateDataPtr, HashCRC32<UInt32>>;
using PartitionedAggregatedDataWithUInt64Key =
PHPartitionedHashMap<UInt64, AggregateDataPtr, HashCRC32<UInt64>>;
using PartitionedAggregatedDataWithUInt128Key =
PHPartitionedHashMap<UInt128, AggregateDataPtr, HashCRC32<UInt128>>;
using PartitionedAggregatedDataWithUInt256Key =
PHPartitionedHashMap<UInt256, AggregateDataPtr, HashCRC32<UInt256>>;
using PartitionedAggregatedDataWithUInt32KeyPhase2 =
PHPartitionedHashMap<UInt32, AggregateDataPtr, HashMixWrapper<UInt32>>;
using PartitionedAggregatedDataWithUInt64KeyPhase2 =
PHPartitionedHashMap<UInt64, AggregateDataPtr, HashMixWrapper<UInt64>>;
using PartitionedAggregatedDataWithUInt128KeyPhase2 =
PHPartitionedHashMap<UInt128, AggregateDataPtr, HashMixWrapper<UInt128>>;
using PartitionedAggregatedDataWithUInt256KeyPhase2 =
PHPartitionedHashMap<UInt256, AggregateDataPtr, HashMixWrapper<UInt256>>;

using AggregatedDataWithNullableUInt8Key = AggregationDataWithNullKey<AggregatedDataWithUInt8Key>;
using AggregatedDataWithNullableUInt16Key = AggregationDataWithNullKey<AggregatedDataWithUInt16Key>;
using AggregatedDataWithNullableUInt32Key = AggregationDataWithNullKey<AggregatedDataWithUInt32Key>;
@@ -419,6 +439,19 @@ using AggregatedDataWithNullableUInt128Key =
using AggregatedDataWithNullableUInt128KeyPhase2 =
AggregationDataWithNullKey<AggregatedDataWithUInt128KeyPhase2>;

using PartitionedAggregatedDataWithNullableUInt32Key =
AggregationDataWithNullKey<PartitionedAggregatedDataWithUInt32Key>;
using PartitionedAggregatedDataWithNullableUInt64Key =
AggregationDataWithNullKey<PartitionedAggregatedDataWithUInt64Key>;
using PartitionedAggregatedDataWithNullableUInt32KeyPhase2 =
AggregationDataWithNullKey<PartitionedAggregatedDataWithUInt32KeyPhase2>;
using PartitionedAggregatedDataWithNullableUInt64KeyPhase2 =
AggregationDataWithNullKey<PartitionedAggregatedDataWithUInt64KeyPhase2>;
using PartitionedAggregatedDataWithNullableUInt128Key =
AggregationDataWithNullKey<PartitionedAggregatedDataWithUInt128Key>;
using PartitionedAggregatedDataWithNullableUInt128KeyPhase2 =
AggregationDataWithNullKey<PartitionedAggregatedDataWithUInt128KeyPhase2>;

using AggregatedMethodVariants = std::variant<
AggregationMethodSerialized<AggregatedDataWithStringKey>,
AggregationMethodOneNumber<UInt8, AggregatedDataWithUInt8Key>,
@@ -459,7 +492,38 @@ using AggregatedMethodVariants = std::variant<
AggregationMethodKeysFixed<AggregatedDataWithUInt128KeyPhase2, false>,
AggregationMethodKeysFixed<AggregatedDataWithUInt128KeyPhase2, true>,
AggregationMethodKeysFixed<AggregatedDataWithUInt256KeyPhase2, false>,
AggregationMethodKeysFixed<AggregatedDataWithUInt256KeyPhase2, true>>;
AggregationMethodKeysFixed<AggregatedDataWithUInt256KeyPhase2, true>,
AggregationMethodSerialized<PartitionedAggregatedDataWithStringKey>,
AggregationMethodOneNumber<UInt32, PartitionedAggregatedDataWithUInt32Key>,
AggregationMethodOneNumber<UInt64, PartitionedAggregatedDataWithUInt64Key>,
AggregationMethodOneNumber<UInt128, PartitionedAggregatedDataWithUInt128Key>,
AggregationMethodOneNumber<UInt32, PartitionedAggregatedDataWithUInt32KeyPhase2>,
AggregationMethodOneNumber<UInt64, PartitionedAggregatedDataWithUInt64KeyPhase2>,
AggregationMethodOneNumber<UInt128, PartitionedAggregatedDataWithUInt128KeyPhase2>,
AggregationMethodSingleNullableColumn<
AggregationMethodOneNumber<UInt32, PartitionedAggregatedDataWithNullableUInt32Key>>,
AggregationMethodSingleNullableColumn<
AggregationMethodOneNumber<UInt64, PartitionedAggregatedDataWithNullableUInt64Key>>,
AggregationMethodSingleNullableColumn<AggregationMethodOneNumber<
UInt32, PartitionedAggregatedDataWithNullableUInt32KeyPhase2>>,
AggregationMethodSingleNullableColumn<AggregationMethodOneNumber<
UInt64, PartitionedAggregatedDataWithNullableUInt64KeyPhase2>>,
AggregationMethodSingleNullableColumn<AggregationMethodOneNumber<
UInt128, PartitionedAggregatedDataWithNullableUInt128Key>>,
AggregationMethodSingleNullableColumn<AggregationMethodOneNumber<
UInt128, PartitionedAggregatedDataWithNullableUInt128KeyPhase2>>,
AggregationMethodKeysFixed<PartitionedAggregatedDataWithUInt64Key, false>,
AggregationMethodKeysFixed<PartitionedAggregatedDataWithUInt64Key, true>,
AggregationMethodKeysFixed<PartitionedAggregatedDataWithUInt128Key, false>,
AggregationMethodKeysFixed<PartitionedAggregatedDataWithUInt128Key, true>,
AggregationMethodKeysFixed<PartitionedAggregatedDataWithUInt256Key, false>,
AggregationMethodKeysFixed<PartitionedAggregatedDataWithUInt256Key, true>,
AggregationMethodKeysFixed<PartitionedAggregatedDataWithUInt64KeyPhase2, false>,
AggregationMethodKeysFixed<PartitionedAggregatedDataWithUInt64KeyPhase2, true>,
AggregationMethodKeysFixed<PartitionedAggregatedDataWithUInt128KeyPhase2, false>,
AggregationMethodKeysFixed<PartitionedAggregatedDataWithUInt128KeyPhase2, true>,
AggregationMethodKeysFixed<PartitionedAggregatedDataWithUInt256KeyPhase2, false>,
AggregationMethodKeysFixed<PartitionedAggregatedDataWithUInt256KeyPhase2, true>>;

struct AggregatedDataVariants {
AggregatedDataVariants() = default;
@@ -467,6 +531,7 @@ struct AggregatedDataVariants {
AggregatedDataVariants& operator=(const AggregatedDataVariants&) = delete;
AggregatedDataWithoutKey without_key = nullptr;
AggregatedMethodVariants _aggregated_method_variant;
bool _enable_partitioned_hash_table = false;

// TODO: may we should support uint256 in the future
enum class Type {
@@ -492,14 +557,23 @@ struct AggregatedDataVariants {

Type _type = Type::EMPTY;

void set_enable_partitioned_hash_table(bool enabled) {
_enable_partitioned_hash_table = enabled;
}

void init(Type type, bool is_nullable = false) {
_type = type;
switch (_type) {
case Type::without_key:
break;
case Type::serialized:
_aggregated_method_variant
.emplace<AggregationMethodSerialized<AggregatedDataWithStringKey>>();
if (_enable_partitioned_hash_table) {
_aggregated_method_variant.emplace<
AggregationMethodSerialized<PartitionedAggregatedDataWithStringKey>>();
} else {
_aggregated_method_variant
.emplace<AggregationMethodSerialized<AggregatedDataWithStringKey>>();
}
break;
case Type::int8_key:
if (is_nullable) {
@@ -520,115 +594,246 @@ struct AggregatedDataVariants {
}
break;
case Type::int32_key:
if (is_nullable) {
_aggregated_method_variant.emplace<AggregationMethodSingleNullableColumn<
AggregationMethodOneNumber<UInt32, AggregatedDataWithNullableUInt32Key>>>();
if (_enable_partitioned_hash_table) {
if (is_nullable) {
_aggregated_method_variant.emplace<
AggregationMethodSingleNullableColumn<AggregationMethodOneNumber<
UInt32, PartitionedAggregatedDataWithNullableUInt32Key>>>();
} else {
_aggregated_method_variant.emplace<AggregationMethodOneNumber<
UInt32, PartitionedAggregatedDataWithUInt32Key>>();
}
} else {
_aggregated_method_variant
.emplace<AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt32Key>>();
if (is_nullable) {
_aggregated_method_variant.emplace<
AggregationMethodSingleNullableColumn<AggregationMethodOneNumber<
UInt32, AggregatedDataWithNullableUInt32Key>>>();
} else {
_aggregated_method_variant.emplace<
AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt32Key>>();
}
}
break;
case Type::int32_key_phase2:
if (is_nullable) {
_aggregated_method_variant
.emplace<AggregationMethodSingleNullableColumn<AggregationMethodOneNumber<
UInt32, AggregatedDataWithNullableUInt32KeyPhase2>>>();
if (_enable_partitioned_hash_table) {
if (is_nullable) {
_aggregated_method_variant.emplace<
AggregationMethodSingleNullableColumn<AggregationMethodOneNumber<
UInt32,
PartitionedAggregatedDataWithNullableUInt32KeyPhase2>>>();
} else {
_aggregated_method_variant.emplace<AggregationMethodOneNumber<
UInt32, PartitionedAggregatedDataWithUInt32KeyPhase2>>();
}
} else {
_aggregated_method_variant.emplace<
AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt32KeyPhase2>>();
if (is_nullable) {
_aggregated_method_variant.emplace<
AggregationMethodSingleNullableColumn<AggregationMethodOneNumber<
UInt32, AggregatedDataWithNullableUInt32KeyPhase2>>>();
} else {
_aggregated_method_variant.emplace<AggregationMethodOneNumber<
UInt32, AggregatedDataWithUInt32KeyPhase2>>();
}
}
break;
case Type::int64_key:
if (is_nullable) {
_aggregated_method_variant.emplace<AggregationMethodSingleNullableColumn<
AggregationMethodOneNumber<UInt64, AggregatedDataWithNullableUInt64Key>>>();
if (_enable_partitioned_hash_table) {
if (is_nullable) {
_aggregated_method_variant.emplace<
AggregationMethodSingleNullableColumn<AggregationMethodOneNumber<
UInt64, PartitionedAggregatedDataWithNullableUInt64Key>>>();
} else {
_aggregated_method_variant.emplace<AggregationMethodOneNumber<
UInt64, PartitionedAggregatedDataWithUInt64Key>>();
}
} else {
_aggregated_method_variant
.emplace<AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64Key>>();
if (is_nullable) {
_aggregated_method_variant.emplace<
AggregationMethodSingleNullableColumn<AggregationMethodOneNumber<
UInt64, AggregatedDataWithNullableUInt64Key>>>();
} else {
_aggregated_method_variant.emplace<
AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64Key>>();
}
}
break;
case Type::int64_key_phase2:
if (is_nullable) {
_aggregated_method_variant
.emplace<AggregationMethodSingleNullableColumn<AggregationMethodOneNumber<
UInt64, AggregatedDataWithNullableUInt64KeyPhase2>>>();
if (_enable_partitioned_hash_table) {
if (is_nullable) {
_aggregated_method_variant.emplace<
AggregationMethodSingleNullableColumn<AggregationMethodOneNumber<
UInt64,
PartitionedAggregatedDataWithNullableUInt64KeyPhase2>>>();
} else {
_aggregated_method_variant.emplace<AggregationMethodOneNumber<
UInt64, PartitionedAggregatedDataWithUInt64KeyPhase2>>();
}
} else {
_aggregated_method_variant.emplace<
AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64KeyPhase2>>();
if (is_nullable) {
_aggregated_method_variant.emplace<
AggregationMethodSingleNullableColumn<AggregationMethodOneNumber<
UInt64, AggregatedDataWithNullableUInt64KeyPhase2>>>();
} else {
_aggregated_method_variant.emplace<AggregationMethodOneNumber<
UInt64, AggregatedDataWithUInt64KeyPhase2>>();
}
}
break;
case Type::int128_key:
if (is_nullable) {
_aggregated_method_variant
.emplace<AggregationMethodSingleNullableColumn<AggregationMethodOneNumber<
UInt128, AggregatedDataWithNullableUInt128Key>>>();
if (_enable_partitioned_hash_table) {
if (is_nullable) {
_aggregated_method_variant.emplace<
AggregationMethodSingleNullableColumn<AggregationMethodOneNumber<
UInt128, PartitionedAggregatedDataWithNullableUInt128Key>>>();
} else {
_aggregated_method_variant.emplace<AggregationMethodOneNumber<
UInt128, PartitionedAggregatedDataWithUInt128Key>>();
}
} else {
_aggregated_method_variant.emplace<
AggregationMethodOneNumber<UInt128, AggregatedDataWithUInt128Key>>();
if (is_nullable) {
_aggregated_method_variant.emplace<
AggregationMethodSingleNullableColumn<AggregationMethodOneNumber<
UInt128, AggregatedDataWithNullableUInt128Key>>>();
} else {
_aggregated_method_variant.emplace<
AggregationMethodOneNumber<UInt128, AggregatedDataWithUInt128Key>>();
}
}
break;
case Type::int128_key_phase2:
if (is_nullable) {
_aggregated_method_variant
.emplace<AggregationMethodSingleNullableColumn<AggregationMethodOneNumber<
UInt128, AggregatedDataWithNullableUInt128KeyPhase2>>>();
if (_enable_partitioned_hash_table) {
if (is_nullable) {
_aggregated_method_variant.emplace<
AggregationMethodSingleNullableColumn<AggregationMethodOneNumber<
UInt128,
PartitionedAggregatedDataWithNullableUInt128KeyPhase2>>>();
} else {
_aggregated_method_variant.emplace<AggregationMethodOneNumber<
UInt128, PartitionedAggregatedDataWithUInt128KeyPhase2>>();
}
} else {
_aggregated_method_variant.emplace<
AggregationMethodOneNumber<UInt128, AggregatedDataWithUInt128KeyPhase2>>();
if (is_nullable) {
_aggregated_method_variant.emplace<
AggregationMethodSingleNullableColumn<AggregationMethodOneNumber<
UInt128, AggregatedDataWithNullableUInt128KeyPhase2>>>();
} else {
_aggregated_method_variant.emplace<AggregationMethodOneNumber<
UInt128, AggregatedDataWithUInt128KeyPhase2>>();
}
}
break;
case Type::int64_keys:
if (is_nullable) {
_aggregated_method_variant
.emplace<AggregationMethodKeysFixed<AggregatedDataWithUInt64Key, true>>();
if (_enable_partitioned_hash_table) {
if (is_nullable) {
_aggregated_method_variant.emplace<AggregationMethodKeysFixed<
PartitionedAggregatedDataWithUInt64Key, true>>();
} else {
_aggregated_method_variant.emplace<AggregationMethodKeysFixed<
PartitionedAggregatedDataWithUInt64Key, false>>();
}
} else {
_aggregated_method_variant
.emplace<AggregationMethodKeysFixed<AggregatedDataWithUInt64Key, false>>();
if (is_nullable) {
_aggregated_method_variant.emplace<
AggregationMethodKeysFixed<AggregatedDataWithUInt64Key, true>>();
} else {
_aggregated_method_variant.emplace<
AggregationMethodKeysFixed<AggregatedDataWithUInt64Key, false>>();
}
}
break;
case Type::int64_keys_phase2:
if (is_nullable) {
_aggregated_method_variant.emplace<
AggregationMethodKeysFixed<AggregatedDataWithUInt64KeyPhase2, true>>();
if (_enable_partitioned_hash_table) {
if (is_nullable) {
_aggregated_method_variant.emplace<AggregationMethodKeysFixed<
PartitionedAggregatedDataWithUInt64KeyPhase2, true>>();
} else {
_aggregated_method_variant.emplace<AggregationMethodKeysFixed<
PartitionedAggregatedDataWithUInt64KeyPhase2, false>>();
}
} else {
_aggregated_method_variant.emplace<
AggregationMethodKeysFixed<AggregatedDataWithUInt64KeyPhase2, false>>();
if (is_nullable) {
_aggregated_method_variant.emplace<
AggregationMethodKeysFixed<AggregatedDataWithUInt64KeyPhase2, true>>();
} else {
_aggregated_method_variant.emplace<
AggregationMethodKeysFixed<AggregatedDataWithUInt64KeyPhase2, false>>();
}
}
break;
case Type::int128_keys:
if (is_nullable) {
_aggregated_method_variant
.emplace<AggregationMethodKeysFixed<AggregatedDataWithUInt128Key, true>>();
if (_enable_partitioned_hash_table) {
if (is_nullable) {
_aggregated_method_variant.emplace<AggregationMethodKeysFixed<
PartitionedAggregatedDataWithUInt128Key, true>>();
} else {
_aggregated_method_variant.emplace<AggregationMethodKeysFixed<
PartitionedAggregatedDataWithUInt128Key, false>>();
}
} else {
_aggregated_method_variant
.emplace<AggregationMethodKeysFixed<AggregatedDataWithUInt128Key, false>>();
if (is_nullable) {
_aggregated_method_variant.emplace<
AggregationMethodKeysFixed<AggregatedDataWithUInt128Key, true>>();
} else {
_aggregated_method_variant.emplace<
AggregationMethodKeysFixed<AggregatedDataWithUInt128Key, false>>();
}
}
break;
case Type::int128_keys_phase2:
if (is_nullable) {
_aggregated_method_variant.emplace<
AggregationMethodKeysFixed<AggregatedDataWithUInt128KeyPhase2, true>>();
if (_enable_partitioned_hash_table) {
if (is_nullable) {
_aggregated_method_variant.emplace<AggregationMethodKeysFixed<
PartitionedAggregatedDataWithUInt128KeyPhase2, true>>();
} else {
_aggregated_method_variant.emplace<AggregationMethodKeysFixed<
PartitionedAggregatedDataWithUInt128KeyPhase2, false>>();
}
} else {
_aggregated_method_variant.emplace<
AggregationMethodKeysFixed<AggregatedDataWithUInt128KeyPhase2, false>>();
if (is_nullable) {
_aggregated_method_variant.emplace<
AggregationMethodKeysFixed<AggregatedDataWithUInt128KeyPhase2, true>>();
} else {
_aggregated_method_variant.emplace<AggregationMethodKeysFixed<
AggregatedDataWithUInt128KeyPhase2, false>>();
}
}
break;
case Type::int256_keys:
if (is_nullable) {
_aggregated_method_variant
.emplace<AggregationMethodKeysFixed<AggregatedDataWithUInt256Key, true>>();
if (_enable_partitioned_hash_table) {
if (is_nullable) {
_aggregated_method_variant.emplace<AggregationMethodKeysFixed<
PartitionedAggregatedDataWithUInt256Key, true>>();
} else {
_aggregated_method_variant.emplace<AggregationMethodKeysFixed<
PartitionedAggregatedDataWithUInt256Key, false>>();
}
} else {
_aggregated_method_variant
.emplace<AggregationMethodKeysFixed<AggregatedDataWithUInt256Key, false>>();
if (is_nullable) {
_aggregated_method_variant.emplace<
AggregationMethodKeysFixed<AggregatedDataWithUInt256Key, true>>();
} else {
_aggregated_method_variant.emplace<
AggregationMethodKeysFixed<AggregatedDataWithUInt256Key, false>>();
}
}
break;
case Type::int256_keys_phase2:
if (is_nullable) {
_aggregated_method_variant.emplace<
AggregationMethodKeysFixed<AggregatedDataWithUInt256KeyPhase2, true>>();
if (_enable_partitioned_hash_table) {
if (is_nullable) {
_aggregated_method_variant.emplace<AggregationMethodKeysFixed<
PartitionedAggregatedDataWithUInt256KeyPhase2, true>>();
} else {
_aggregated_method_variant.emplace<AggregationMethodKeysFixed<
PartitionedAggregatedDataWithUInt256KeyPhase2, false>>();
}
} else {
_aggregated_method_variant.emplace<
AggregationMethodKeysFixed<AggregatedDataWithUInt256KeyPhase2, false>>();
if (is_nullable) {
_aggregated_method_variant.emplace<
AggregationMethodKeysFixed<AggregatedDataWithUInt256KeyPhase2, true>>();
} else {
_aggregated_method_variant.emplace<AggregationMethodKeysFixed<
AggregatedDataWithUInt256KeyPhase2, false>>();
}
}
break;
case Type::string_key:

@@ -688,7 +893,7 @@ public:
friend class HashTable;

public:
IteratorBase() {}
IteratorBase() = default;
IteratorBase(Container* container_, uint32_t index_)
: container(container_), index(index_) {
sub_container_index = index / SUB_CONTAINER_CAPACITY;
@@ -816,6 +1021,7 @@ private:
bool _is_merge;
bool _is_first_phase;
bool _use_fixed_length_serialization_opt;
bool _partitioned_hash_table_enabled;
std::unique_ptr<MemPool> _mem_pool;

size_t _align_aggregate_states = 1;
@@ -829,6 +1035,7 @@ private:
ArenaUPtr _agg_arena_pool;

RuntimeProfile::Counter* _build_timer;
RuntimeProfile::Counter* _build_table_convert_timer;
RuntimeProfile::Counter* _serialize_key_timer;
RuntimeProfile::Counter* _exec_timer;
RuntimeProfile::Counter* _merge_timer;

@@ -244,6 +244,7 @@ public class SessionVariable implements Serializable, Writable {
public static final String INTERNAL_SESSION = "internal_session";

public static final String PARTITIONED_HASH_JOIN_ROWS_THRESHOLD = "partitioned_hash_join_rows_threshold";
public static final String PARTITIONED_HASH_AGG_ROWS_THRESHOLD = "partitioned_hash_agg_rows_threshold";

public static final String ENABLE_SHARE_HASH_TABLE_FOR_BROADCAST_JOIN
= "enable_share_hash_table_for_broadcast_join";
@@ -647,6 +648,10 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = PARTITIONED_HASH_JOIN_ROWS_THRESHOLD)
public int partitionedHashJoinRowsThreshold = 0;

// Use partitioned hash agg if the aggregation hash table row count >= the threshold. 0 - the threshold is not set.
@VariableMgr.VarAttr(name = PARTITIONED_HASH_AGG_ROWS_THRESHOLD)
public int partitionedHashAggRowsThreshold = 0;

@VariableMgr.VarAttr(name = ENABLE_SHARE_HASH_TABLE_FOR_BROADCAST_JOIN)
public boolean enableShareHashTableForBroadcastJoin = true;

@@ -680,6 +685,7 @@ public class SessionVariable implements Serializable, Writable {
// this.disableJoinReorder = random.nextBoolean();
this.disableStreamPreaggregations = random.nextBoolean();
this.partitionedHashJoinRowsThreshold = random.nextBoolean() ? 8 : 1048576;
this.partitionedHashAggRowsThreshold = random.nextBoolean() ? 8 : 1048576;
this.enableShareHashTableForBroadcastJoin = random.nextBoolean();
this.rewriteOrToInPredicateThreshold = random.nextInt(100) + 2;
int randomInt = random.nextInt(4);
@@ -1415,6 +1421,7 @@ public class SessionVariable implements Serializable, Writable {
tResult.setSkipDeletePredicate(skipDeletePredicate);

tResult.setPartitionedHashJoinRowsThreshold(partitionedHashJoinRowsThreshold);
tResult.setPartitionedHashAggRowsThreshold(partitionedHashAggRowsThreshold);

tResult.setRepeatMaxNum(repeatMaxNum);

@@ -190,6 +190,8 @@ struct TQueryOptions {
57: optional bool check_overflow_for_decimal = false

58: optional i64 external_sort_bytes_threshold = 0

59: optional i32 partitioned_hash_agg_rows_threshold = 0
}