From 6c7f758ef7d2db6d097b8c098702d7a2e0f2fbf5 Mon Sep 17 00:00:00 2001
From: TengJianPing <18241664+jacktengg@users.noreply.github.com>
Date: Thu, 24 Nov 2022 14:16:47 +0800
Subject: [PATCH] [improvement](hashjoin) support partitioned hash table in
 hash join (#14480)

---
 be/src/runtime/runtime_state.h                |   7 +
 be/src/vec/common/hash_table/hash_table.h     | 167 +++---
 .../common/hash_table/partitioned_hash_map.h  |  64 ++
 .../hash_table/partitioned_hash_table.h       | 551 ++++++++++++++++++
 be/src/vec/exec/join/vhash_join_node.cpp      |  47 +-
 be/src/vec/exec/join/vhash_join_node.h        |  14 +-
 .../runtime/shared_hash_table_controller.cpp  |   2 +-
 .../org/apache/doris/qe/SessionVariable.java  |  17 +
 gensrc/thrift/PaloInternalService.thrift      |   2 +
 9 files changed, 795 insertions(+), 76 deletions(-)
 create mode 100644 be/src/vec/common/hash_table/partitioned_hash_map.h
 create mode 100644 be/src/vec/common/hash_table/partitioned_hash_table.h

diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 69b40f6c6f..cc9d8a4831 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -357,6 +357,13 @@ public:
         return _query_options.__isset.skip_delete_predicate && _query_options.skip_delete_predicate;
     }
 
+    int partitioned_hash_join_rows_threshold() const {
+        if (!_query_options.__isset.partitioned_hash_join_rows_threshold) {
+            return 0;
+        }
+        return _query_options.partitioned_hash_join_rows_threshold;
+    }
+
     const std::vector<TTabletCommitInfo>& tablet_commit_infos() const {
         return _tablet_commit_infos;
     }
diff --git a/be/src/vec/common/hash_table/hash_table.h b/be/src/vec/common/hash_table/hash_table.h
index 0f2a2de0a4..4be24d3649 100644
--- a/be/src/vec/common/hash_table/hash_table.h
+++ b/be/src/vec/common/hash_table/hash_table.h
@@ -431,7 +431,7 @@ protected:
     friend class Reader;
 
     template <typename, typename, typename, typename, typename, typename, size_t>
-    friend class TwoLevelHashTable;
+    friend class PartitionedHashTable;
 
     template <typename SubMaps>
    friend class StringHashTable;
 
@@ -445,6 +445,15 @@ protected:
     Grower grower;
     int64_t _resize_timer_ns;
 
+    // the bucket count threshold above which this table is converted to a partitioned hash table
+    // > 0: dynamic conversion is enabled
+    // 0: conversion is disabled
+    int _partitioned_threshold = 0;
+    // If a resize is needed and the bucket count after the resize would be >= _partitioned_threshold,
+    // this flag is set to true instead and the resize does not actually happen;
+    // PartitionedHashTable will then convert this hash table to a partitioned one.
+    bool _need_partition = false;
+
     //factor that will trigger growing the hash table on insert.
     static constexpr float MAX_BUCKET_OCCUPANCY_FRACTION = 0.5f;
 
@@ -452,6 +461,14 @@ protected:
     mutable size_t collisions = 0;
 #endif
 
+    void set_partitioned_threshold(int threshold) { _partitioned_threshold = threshold; }
+
+    bool check_if_need_partition(size_t bucket_count) {
+        return _partitioned_threshold > 0 && bucket_count >= _partitioned_threshold;
+    }
+
+    bool need_partition() { return _need_partition; }
+
     /// Find a cell with the same key or an empty cell, starting from the specified position and further along the collision resolution chain.
     size_t ALWAYS_INLINE find_cell(const Key& x, size_t hash_value, size_t place_value) const {
         while (!buf[place_value].is_zero(*this) &&
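// A minimal sketch of how these new hooks are driven (the map alias and the
// threshold value are illustrative; both appear later in this patch):
//
//     PartitionedHashMap<UInt64, UInt64> table;
//     table.set_partitioned_threshold(1048576);
//     // Inserts proceed as usual. Once a resize would reach the threshold, the
//     // resize is skipped, need_partition() flips to true, and the enclosing
//     // PartitionedHashTable redistributes the cells into its 16 sub tables.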
@@ -501,63 +518,6 @@ protected:
         }
     }
 
-    /// Increase the size of the buffer.
-    void resize(size_t for_num_elems = 0, size_t for_buf_size = 0) {
-        SCOPED_RAW_TIMER(&_resize_timer_ns);
-#ifdef DBMS_HASH_MAP_DEBUG_RESIZES
-        Stopwatch watch;
-#endif
-
-        size_t old_size = grower.buf_size();
-
-        /** In case of exception for the object to remain in the correct state,
-          * changing the variable `grower` (which determines the buffer size of the hash table)
-          * is postponed for a moment after a real buffer change.
-          * The temporary variable `new_grower` is used to determine the new size.
-          */
-        Grower new_grower = grower;
-        if (for_num_elems) {
-            new_grower.set(for_num_elems);
-            if (new_grower.buf_size() <= old_size) return;
-        } else if (for_buf_size) {
-            new_grower.set_buf_size(for_buf_size);
-            if (new_grower.buf_size() <= old_size) return;
-        } else
-            new_grower.increase_size();
-
-        /// Expand the space.
-        buf = reinterpret_cast<Cell*>(Allocator::realloc(buf, get_buffer_size_in_bytes(),
-                                                         new_grower.buf_size() * sizeof(Cell)));
-        grower = new_grower;
-
-        /** Now some items may need to be moved to a new location.
-          * The element can stay in place, or move to a new location "on the right",
-          * or move to the left of the collision resolution chain, because the elements to the left of it have been moved to the new "right" location.
-          */
-        size_t i = 0;
-        for (; i < old_size; ++i)
-            if (!buf[i].is_zero(*this) && !buf[i].is_deleted())
-                reinsert(buf[i], buf[i].get_hash(*this));
-
-        /** There is also a special case:
-          *    if the element was to be at the end of the old buffer,                [      x]
-          *    but is at the beginning because of the collision resolution chain,    [o     x]
-          *    then after resizing, it will first be out of place again,             [      xo      ]
-          *    and in order to transfer it where necessary,
-          *    after transferring all the elements from the old halves you need to   [       o  x   ]
-          *    process tail from the collision resolution chain immediately after it [      o   x   ]
-          */
-        for (; !buf[i].is_zero(*this) && !buf[i].is_deleted(); ++i)
-            reinsert(buf[i], buf[i].get_hash(*this));
-
-#ifdef DBMS_HASH_MAP_DEBUG_RESIZES
-        watch.stop();
-        std::cerr << std::fixed << std::setprecision(3) << "Resize from " << old_size << " to "
-                  << grower.buf_size() << " took " << watch.elapsedSeconds() << " sec."
-                  << std::endl;
-#endif
-    }
-
     /** Paste into the new buffer the value that was in the old buffer.
       * Used when increasing the buffer size.
       */
@@ -684,6 +644,8 @@ public:
         std::swap(buf, rhs.buf);
         std::swap(m_size, rhs.m_size);
         std::swap(grower, rhs.grower);
+        std::swap(_need_partition, rhs._need_partition);
+        std::swap(_partitioned_threshold, rhs._partitioned_threshold);
 
         Hash::operator=(std::move(rhs));
         Allocator::operator=(std::move(rhs));
@@ -816,10 +778,12 @@ protected:
                 throw;
             }
 
-            // The hash table was rehashed, so we have to re-find the key.
-            size_t new_place = find_cell(key, hash_value, grower.place(hash_value));
-            assert(!buf[new_place].is_zero(*this));
-            it = &buf[new_place];
+            if (LIKELY(!_need_partition)) {
+                // The hash table was rehashed, so we have to re-find the key.
+                size_t new_place = find_cell(key, hash_value, grower.place(hash_value));
+                assert(!buf[new_place].is_zero(*this));
+                it = &buf[new_place];
+            }
         }
     }
 
@@ -853,10 +817,12 @@ protected:
                 throw;
             }
 
-            // The hash table was rehashed, so we have to re-find the key.
-            size_t new_place = find_cell(key, hash_value, grower.place(hash_value));
-            assert(!buf[new_place].is_zero(*this));
-            it = &buf[new_place];
+            if (LIKELY(!_need_partition)) {
+                // The hash table was rehashed, so we have to re-find the key.
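+                // (If _need_partition was set instead, resize() returned without
+                // touching buf, so `it` still points at the freshly inserted cell;
+                // the enclosing PartitionedHashTable converts the table and re-finds.)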
+                size_t new_place = find_cell(key, hash_value, grower.place(hash_value));
+                assert(!buf[new_place].is_zero(*this));
+                it = &buf[new_place];
+            }
         }
     }
 
@@ -1076,7 +1042,9 @@ public:
 
     float get_factor() const { return MAX_BUCKET_OCCUPANCY_FRACTION; }
 
-    bool should_be_shrink(int64_t valid_row) { return valid_row < get_factor() * (size() / 2.0); }
+    bool should_be_shrink(int64_t valid_row) const {
+        return valid_row < get_factor() * (size() / 2.0);
+    }
 
     void init_buf_size(size_t reserve_for_num_elements) {
         free();
@@ -1118,4 +1086,69 @@ public:
 #ifdef DBMS_HASH_MAP_COUNT_COLLISIONS
     size_t getCollisions() const { return collisions; }
 #endif
+
+private:
+    /// Increase the size of the buffer.
+    void resize(size_t for_num_elems = 0, size_t for_buf_size = 0) {
+        SCOPED_RAW_TIMER(&_resize_timer_ns);
+#ifdef DBMS_HASH_MAP_DEBUG_RESIZES
+        Stopwatch watch;
+#endif
+
+        size_t old_size = grower.buf_size();
+
+        /** In case of exception for the object to remain in the correct state,
+          * changing the variable `grower` (which determines the buffer size of the hash table)
+          * is postponed for a moment after a real buffer change.
+          * The temporary variable `new_grower` is used to determine the new size.
+          */
+        Grower new_grower = grower;
+        if (for_num_elems) {
+            new_grower.set(for_num_elems);
+            if (new_grower.buf_size() <= old_size) return;
+        } else if (for_buf_size) {
+            new_grower.set_buf_size(for_buf_size);
+            if (new_grower.buf_size() <= old_size) return;
+        } else
+            new_grower.increase_size();
+
+        // The new bucket count would exceed the partitioned hash table threshold:
+        // skip the resize and set the need-partition flag instead.
+        if (check_if_need_partition(new_grower.buf_size())) {
+            _need_partition = true;
+            return;
+        }
+
+        /// Expand the space.
+        buf = reinterpret_cast<Cell*>(Allocator::realloc(buf, get_buffer_size_in_bytes(),
+                                                         new_grower.buf_size() * sizeof(Cell)));
+        grower = new_grower;
+
+        /** Now some items may need to be moved to a new location.
+          * The element can stay in place, or move to a new location "on the right",
+          * or move to the left of the collision resolution chain, because the elements to the left of it have been moved to the new "right" location.
+          */
+        size_t i = 0;
+        for (; i < old_size; ++i)
+            if (!buf[i].is_zero(*this) && !buf[i].is_deleted())
+                reinsert(buf[i], buf[i].get_hash(*this));
+
+        /** There is also a special case:
+          *    if the element was to be at the end of the old buffer,                [      x]
+          *    but is at the beginning because of the collision resolution chain,    [o     x]
+          *    then after resizing, it will first be out of place again,             [      xo      ]
+          *    and in order to transfer it where necessary,
+          *    after transferring all the elements from the old halves you need to   [       o  x   ]
+          *    process tail from the collision resolution chain immediately after it [      o   x   ]
+          */
+        for (; !buf[i].is_zero(*this) && !buf[i].is_deleted(); ++i)
+            reinsert(buf[i], buf[i].get_hash(*this));
+
+#ifdef DBMS_HASH_MAP_DEBUG_RESIZES
+        watch.stop();
+        std::cerr << std::fixed << std::setprecision(3) << "Resize from " << old_size << " to "
+                  << grower.buf_size() << " took " << watch.elapsedSeconds() << " sec."
+                  << std::endl;
+#endif
+    }
 };
diff --git a/be/src/vec/common/hash_table/partitioned_hash_map.h b/be/src/vec/common/hash_table/partitioned_hash_map.h
new file mode 100644
index 0000000000..fabf61b27a
--- /dev/null
+++ b/be/src/vec/common/hash_table/partitioned_hash_map.h
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/ClickHouse/ClickHouse/blob/master/src/Common/HashTable/TwoLevelHashMap.h
+// and modified by Doris
+#pragma once
+
+#include "vec/common/hash_table/hash_map.h"
+#include "vec/common/hash_table/partitioned_hash_table.h"
+
+template <typename Key, typename Cell, typename Hash = DefaultHash<Key>,
+          typename Grower = PartitionedHashTableGrower<>, typename Allocator = HashTableAllocator,
+          template <typename...> typename ImplTable = HashMapTable>
+class PartitionedHashMapTable
+        : public PartitionedHashTable<Key, Cell, Hash, Grower, Allocator,
+                                      ImplTable<Key, Cell, Hash, Grower, Allocator>> {
+public:
+    using Impl = ImplTable<Key, Cell, Hash, Grower, Allocator>;
+    using Base = PartitionedHashTable<Key, Cell, Hash, Grower, Allocator,
+                                      ImplTable<Key, Cell, Hash, Grower, Allocator>>;
+    using LookupResult = typename Impl::LookupResult;
+
+    using Base::Base;
+    using Base::prefetch;
+
+    using mapped_type = typename Cell::Mapped;
+
+    typename Cell::Mapped& ALWAYS_INLINE operator[](const Key& x) {
+        LookupResult it;
+        bool inserted;
+        this->emplace(x, it, inserted);
+
+        if (inserted) new (lookup_result_get_mapped(it)) mapped_type();
+
+        return *lookup_result_get_mapped(it);
+    }
+};
+
+template <typename Key, typename Mapped, typename Hash = DefaultHash<Key>,
+          typename Grower = PartitionedHashTableGrower<>, typename Allocator = HashTableAllocator,
+          template <typename...> typename ImplTable = HashMapTable>
+using PartitionedHashMap = PartitionedHashMapTable<Key, HashMapCell<Key, Mapped, Hash>, Hash,
+                                                   Grower, Allocator, ImplTable>;
+
+template <typename Key, typename Mapped, typename Hash = DefaultHash<Key>,
+          typename Grower = PartitionedHashTableGrower<>, typename Allocator = HashTableAllocator,
+          template <typename...> typename ImplTable = HashMapTable>
+using PartitionedHashMapWithSavedHash =
+        PartitionedHashMapTable<Key, HashMapCellWithSavedHash<Key, Mapped, Hash>, Hash, Grower,
+                                Allocator, ImplTable>;
diff --git a/be/src/vec/common/hash_table/partitioned_hash_table.h b/be/src/vec/common/hash_table/partitioned_hash_table.h
new file mode 100644
index 0000000000..d0cdc25f89
--- /dev/null
+++ b/be/src/vec/common/hash_table/partitioned_hash_table.h
@@ -0,0 +1,551 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/ClickHouse/ClickHouse/blob/master/src/Common/HashTable/TwoLevelHashTable.h
+// and modified by Doris
+#pragma once
+
+#include "vec/common/hash_table/hash_table.h"
+
+/** Partitioned hash table.
+  * Represents 16 (or 1ULL << BITS_FOR_SUB_TABLE) small hash tables (sub table count of the first level).
+  * To determine which one to use, one of the bytes of the hash function is taken.
+  *
+  * Usually works a little slower than a simple hash table.
+  * However, it has advantages in some cases:
+  * - if you need to merge two hash tables together, then you can easily parallelize it by sub tables;
+  * - delay during resizes is amortized, since the small hash tables will be resized separately;
+  * - in theory, resizes are cache-local in a larger range of sizes.
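+  *
+  * The table starts out as a single level0 sub table and converts lazily: once an
+  * insert would grow level0 past the threshold configured via set_partitioned_threshold(),
+  * the existing cells are redistributed into the level1 sub tables by their hash
+  * (see convert_to_partitioned() below).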
+  */
+
+template <size_t initial_size_degree = 8>
+struct PartitionedHashTableGrower : public HashTableGrowerWithPrecalculation<initial_size_degree> {
+    /// Increase the size of the hash table.
+    void increase_size() { this->increase_size_degree(this->size_degree() >= 15 ? 1 : 2); }
+};
+
+template <typename Key, typename Cell, typename Hash, typename Grower, typename Allocator,
+          typename ImplTable = HashTable<Key, Cell, Hash, Grower, Allocator>,
+          size_t BITS_FOR_SUB_TABLE = 4>
+class PartitionedHashTable : private boost::noncopyable,
+                             protected Hash /// empty base optimization
+{
+public:
+    using Impl = ImplTable;
+
+    using key_type = typename Impl::key_type;
+    using mapped_type = typename Impl::mapped_type;
+    using value_type = typename Impl::value_type;
+    using cell_type = typename Impl::cell_type;
+
+    using LookupResult = typename Impl::LookupResult;
+    using ConstLookupResult = typename Impl::ConstLookupResult;
+
+protected:
+    friend class const_iterator;
+    friend class iterator;
+
+    using HashValue = size_t;
+    using Self = PartitionedHashTable;
+
+private:
+    static constexpr size_t NUM_LEVEL1_SUB_TABLES = 1ULL << BITS_FOR_SUB_TABLE;
+    static constexpr size_t MAX_SUB_TABLE = NUM_LEVEL1_SUB_TABLES - 1;
+
+    //factor that will trigger growing the hash table on insert.
+    static constexpr float MAX_SUB_TABLE_OCCUPANCY_FRACTION = 0.5f;
+
+    Impl level0_sub_table;
+    Impl level1_sub_tables[NUM_LEVEL1_SUB_TABLES];
+
+    bool _is_partitioned = false;
+
+    int64_t _convert_timer_ns = 0;
+
+public:
+    PartitionedHashTable() = default;
+
+    PartitionedHashTable(PartitionedHashTable&& rhs) { *this = std::move(rhs); }
+
+    PartitionedHashTable& operator=(PartitionedHashTable&& rhs) {
+        std::swap(_is_partitioned, rhs._is_partitioned);
+
+        level0_sub_table = std::move(rhs.level0_sub_table);
+        for (size_t i = 0; i < NUM_LEVEL1_SUB_TABLES; ++i) {
+            level1_sub_tables[i] = std::move(rhs.level1_sub_tables[i]);
+        }
+
+        Hash::operator=(std::move(rhs));
+        return *this;
+    }
+
+    size_t hash(const Key& x) const { return Hash::operator()(x); }
+
+    float get_factor() const { return MAX_SUB_TABLE_OCCUPANCY_FRACTION; }
+
+    int64_t get_convert_timer_value() const { return _convert_timer_ns; }
+
+    bool should_be_shrink(int64_t valid_row) const {
+        if (_is_partitioned) {
+            return false;
+        } else {
+            return level0_sub_table.should_be_shrink(valid_row);
+        }
+    }
+
+    template <typename Func>
+    void ALWAYS_INLINE for_each_value(Func&& func) {
+        if (_is_partitioned) {
+            for (auto i = 0u; i < NUM_LEVEL1_SUB_TABLES; ++i) {
+                level1_sub_tables[i].for_each_value(func);
+            }
+        } else {
+            level0_sub_table.for_each_value(func);
+        }
+    }
+
+    size_t get_size() {
+        size_t count = 0;
+        if (_is_partitioned) {
+            for (auto i = 0u; i < this->NUM_LEVEL1_SUB_TABLES; ++i) {
+                for (auto& v : this->level1_sub_tables[i]) {
+                    count += v.get_second().get_row_count();
+                }
+            }
+        } else {
+            count = level0_sub_table.get_size();
+        }
+        return count;
+    }
+
+    void init_buf_size(size_t reserve_for_num_elements) {
+        if (_is_partitioned) {
+            for (auto& impl : level1_sub_tables) {
+                impl.init_buf_size(reserve_for_num_elements / NUM_LEVEL1_SUB_TABLES);
+            }
+        } else {
+            if (level0_sub_table.check_if_need_partition(reserve_for_num_elements)) {
+                level0_sub_table.clear_and_shrink();
+                _is_partitioned = true;
+
+                for (size_t i = 0; i < NUM_LEVEL1_SUB_TABLES; ++i) {
+                    level1_sub_tables[i].init_buf_size(reserve_for_num_elements /
+                                                       NUM_LEVEL1_SUB_TABLES);
+                }
+            } else {
+                level0_sub_table.init_buf_size(reserve_for_num_elements);
+            }
+        }
+    }
+
+    void delete_zero_key(Key key) {
+        if (_is_partitioned) {
+            const auto key_hash = hash(key);
+            size_t sub_table_idx = get_sub_table_from_hash(key_hash);
+            level1_sub_tables[sub_table_idx].delete_zero_key(key);
+        } else {
+            level0_sub_table.delete_zero_key(key);
+        }
+    }
+
+    size_t get_buffer_size_in_bytes() const {
+        if (_is_partitioned) {
+            size_t buff_size = 0;
+            for (const auto& impl : level1_sub_tables) buff_size += impl.get_buffer_size_in_bytes();
+            return buff_size;
+        } else {
+            return level0_sub_table.get_buffer_size_in_bytes();
+        }
+    }
+
+    size_t get_buffer_size_in_cells() const {
+        if (_is_partitioned) {
+            size_t buff_size = 0;
+            for (const auto& impl : level1_sub_tables) buff_size += impl.get_buffer_size_in_cells();
+            return buff_size;
+        } else {
+            return level0_sub_table.get_buffer_size_in_cells();
+        }
+    }
+
+    std::vector<size_t> get_buffer_sizes_in_cells() const {
+        std::vector<size_t> sizes;
+        if (_is_partitioned) {
+            for (size_t i = 0; i < NUM_LEVEL1_SUB_TABLES; ++i) {
+                sizes.push_back(level1_sub_tables[i].get_buffer_size_in_cells());
+            }
+        } else {
+            sizes.push_back(level0_sub_table.get_buffer_size_in_cells());
+        }
+        return sizes;
+    }
+
+    void reset_resize_timer() {
+        if (_is_partitioned) {
+            for (auto& impl : level1_sub_tables) {
+                impl.reset_resize_timer();
+            }
+        } else {
+            level0_sub_table.reset_resize_timer();
+        }
+    }
+    int64_t get_resize_timer_value() const {
+        if (_is_partitioned) {
+            int64_t resize_timer_ns = 0;
+            for (const auto& impl : level1_sub_tables) {
+                resize_timer_ns += impl.get_resize_timer_value();
+            }
+            return resize_timer_ns;
+        } else {
+            return level0_sub_table.get_resize_timer_value();
+        }
+    }
+
+protected:
+    typename Impl::iterator begin_of_next_non_empty_sub_table_idx(size_t& sub_table_idx) {
+        while (sub_table_idx != NUM_LEVEL1_SUB_TABLES && level1_sub_tables[sub_table_idx].empty())
+            ++sub_table_idx;
+
+        if (sub_table_idx != NUM_LEVEL1_SUB_TABLES) return level1_sub_tables[sub_table_idx].begin();
+
+        --sub_table_idx;
+        return level1_sub_tables[MAX_SUB_TABLE].end();
+    }
+
+    typename Impl::const_iterator begin_of_next_non_empty_sub_table_idx(
+            size_t& sub_table_idx) const {
+        while (sub_table_idx != NUM_LEVEL1_SUB_TABLES && level1_sub_tables[sub_table_idx].empty())
+            ++sub_table_idx;
+
+        if (sub_table_idx != NUM_LEVEL1_SUB_TABLES) return level1_sub_tables[sub_table_idx].begin();
+
+        --sub_table_idx;
+        return level1_sub_tables[MAX_SUB_TABLE].end();
+    }
+
+public:
+    void set_partitioned_threshold(int threshold) {
+        level0_sub_table.set_partitioned_threshold(threshold);
+    }
+
+    class iterator /// NOLINT
+    {
+        Self* container {};
+        size_t sub_table_idx {};
+        typename Impl::iterator current_it {};
+
+        friend class PartitionedHashTable;
+
+        iterator(Self* container_, size_t sub_table_idx_, typename Impl::iterator current_it_)
+                : container(container_), sub_table_idx(sub_table_idx_), current_it(current_it_) {}
+
+    public:
+        iterator() = default;
+
+        bool operator==(const iterator& rhs) const {
+            return sub_table_idx == rhs.sub_table_idx && current_it == rhs.current_it;
+        }
+        bool operator!=(const iterator& rhs) const { return !(*this == rhs); }
+
+        iterator& operator++() {
+            ++current_it;
+            if (container->_is_partitioned) {
+                if (current_it == container->level1_sub_tables[sub_table_idx].end()) {
+                    ++sub_table_idx;
+                    current_it = container->begin_of_next_non_empty_sub_table_idx(sub_table_idx);
+                }
+            }
+
+            return *this;
+        }
+
+        Cell& operator*() const { return *current_it; }
+        Cell* operator->() const { return current_it.get_ptr(); }
+
+        Cell* get_ptr() const { return current_it.get_ptr(); }
+        size_t get_hash() const { return current_it.get_hash(); }
+    };
+
+    class const_iterator /// NOLINT
+    {
+        Self* container {};
+        size_t sub_table_idx {};
+        typename Impl::const_iterator current_it {};
+
+        friend class PartitionedHashTable;
+
+        const_iterator(Self* container_, size_t sub_table_idx_,
+                       typename Impl::const_iterator current_it_)
+                : container(container_), sub_table_idx(sub_table_idx_), current_it(current_it_) {}
+
+    public:
+        const_iterator() = default;
+        const_iterator(const iterator& rhs)
+                : container(rhs.container),
+                  sub_table_idx(rhs.sub_table_idx),
+                  current_it(rhs.current_it) {} /// NOLINT
+
+        bool operator==(const const_iterator& rhs) const {
+            return sub_table_idx == rhs.sub_table_idx && current_it == rhs.current_it;
+        }
+        bool operator!=(const const_iterator& rhs) const { return !(*this == rhs); }
+
+        const_iterator& operator++() {
+            ++current_it;
+            if (container->_is_partitioned) {
+                if (current_it == container->level1_sub_tables[sub_table_idx].end()) {
+                    ++sub_table_idx;
+                    current_it = container->begin_of_next_non_empty_sub_table_idx(sub_table_idx);
+                }
+            }
+
+            return *this;
+        }
+
+        const Cell& operator*() const { return *current_it; }
+        const Cell* operator->() const { return current_it.get_ptr(); }
+
+        const Cell* get_ptr() const { return current_it.get_ptr(); }
+        size_t get_hash() const { return current_it.get_hash(); }
+    };
+
+    const_iterator begin() const {
+        if (_is_partitioned) {
+            size_t sub_table_idx = 0;
+            typename Impl::const_iterator impl_it =
+                    begin_of_next_non_empty_sub_table_idx(sub_table_idx);
+            return {this, sub_table_idx, impl_it};
+        } else {
+            return {this, NUM_LEVEL1_SUB_TABLES, level0_sub_table.begin()};
+        }
+    }
+
+    iterator begin() {
+        if (_is_partitioned) {
+            size_t sub_table_idx = 0;
+            typename Impl::iterator impl_it = begin_of_next_non_empty_sub_table_idx(sub_table_idx);
+            return {this, sub_table_idx, impl_it};
+        } else {
+            return {this, NUM_LEVEL1_SUB_TABLES, level0_sub_table.begin()};
+        }
+    }
+
+    const_iterator end() const {
+        if (_is_partitioned) {
+            return {this, MAX_SUB_TABLE, level1_sub_tables[MAX_SUB_TABLE].end()};
+        } else {
+            return {this, NUM_LEVEL1_SUB_TABLES, level0_sub_table.end()};
+        }
+    }
+    iterator end() {
+        if (_is_partitioned) {
+            return {this, MAX_SUB_TABLE, level1_sub_tables[MAX_SUB_TABLE].end()};
+        } else {
+            return {this, NUM_LEVEL1_SUB_TABLES, level0_sub_table.end()};
+        }
+    }
+
+    /// Insert a value. In the case of any more complex values, it is better to use the `emplace` function.
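+    /// For maps the value_type is a (key, mapped) pair, so a minimal use looks like
+    /// (the key/mapped types here are illustrative):
+    ///
+    ///     PartitionedHashMap<UInt64, UInt64> map;
+    ///     auto [it, inserted] = map.insert({key, value});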
+    std::pair<LookupResult, bool> ALWAYS_INLINE insert(const value_type& x) {
+        size_t hash_value = hash(Cell::get_key(x));
+
+        std::pair<LookupResult, bool> res;
+        emplace(Cell::get_key(x), res.first, res.second, hash_value);
+
+        if (res.second) insert_set_mapped(lookup_result_get_mapped(res.first), x);
+
+        return res;
+    }
+
+    void expanse_for_add_elem(size_t num_elem) {
+        if (_is_partitioned) {
+            size_t num_elem_per_sub_table =
+                    (num_elem + NUM_LEVEL1_SUB_TABLES - 1) / NUM_LEVEL1_SUB_TABLES;
+            for (size_t i = 0; i < NUM_LEVEL1_SUB_TABLES; ++i) {
+                level1_sub_tables[i].expanse_for_add_elem(num_elem_per_sub_table);
+            }
+        } else {
+            level0_sub_table.expanse_for_add_elem(num_elem);
+            if (UNLIKELY(level0_sub_table.need_partition())) {
+                convert_to_partitioned();
+            }
+        }
+    }
+
+    template <typename KeyHolder>
+    void ALWAYS_INLINE prefetch(KeyHolder& key_holder) {
+        if (_is_partitioned) {
+            const auto& key = key_holder_get_key(key_holder);
+            const auto key_hash = hash(key);
+            const auto sub_table_idx = get_sub_table_from_hash(key_hash);
+            level1_sub_tables[sub_table_idx].prefetch(key_holder);
+        } else {
+            level0_sub_table.prefetch(key_holder);
+        }
+    }
+
+    template <bool READ>
+    void ALWAYS_INLINE prefetch_by_hash(size_t hash_value) {
+        if (_is_partitioned) {
+            const auto sub_table_idx = get_sub_table_from_hash(hash_value);
+            level1_sub_tables[sub_table_idx].template prefetch_by_hash<READ>(hash_value);
+        } else {
+            level0_sub_table.template prefetch_by_hash<READ>(hash_value);
+        }
+    }
+
+    template <bool READ, typename KeyHolder>
+    void ALWAYS_INLINE prefetch(KeyHolder& key_holder) {
+        if (_is_partitioned) {
+            const auto& key = key_holder_get_key(key_holder);
+            const auto key_hash = hash(key);
+            const auto sub_table_idx = get_sub_table_from_hash(key_hash);
+            level1_sub_tables[sub_table_idx].template prefetch<READ>(key_holder);
+        } else {
+            level0_sub_table.template prefetch<READ>(key_holder);
+        }
+    }
+
+    /** Insert the key,
+      * return an iterator to a position that can be used for `placement new` of value,
+      * as well as the flag - whether a new key was inserted.
+      *
+      * You have to make `placement new` values if you inserted a new key,
+      * since when destroying a hash table, the destructor will be invoked for it!
+      *
+      * Example usage:
+      *
+      * Map::iterator it;
+      * bool inserted;
+      * map.emplace(key, it, inserted);
+      * if (inserted)
+      *     new (&it->second) Mapped(value);
+      */
+    template <typename KeyHolder>
+    void ALWAYS_INLINE emplace(KeyHolder&& key_holder, LookupResult& it, bool& inserted) {
+        size_t hash_value = hash(key_holder_get_key(key_holder));
+        emplace(key_holder, it, inserted, hash_value);
+    }
+
+    /// Same, but with a precalculated value of the hash function.
+    template <typename KeyHolder>
+    void ALWAYS_INLINE emplace(KeyHolder&& key_holder, LookupResult& it, bool& inserted,
+                               size_t hash_value) {
+        if (_is_partitioned) {
+            size_t sub_table_idx = get_sub_table_from_hash(hash_value);
+            level1_sub_tables[sub_table_idx].emplace(key_holder, it, inserted, hash_value);
+        } else {
+            level0_sub_table.emplace(key_holder, it, inserted, hash_value);
+            if (UNLIKELY(level0_sub_table.need_partition())) {
+                convert_to_partitioned();
+
+                // The hash table was converted to partitioned, so we have to re-find the key.
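+                // (convert_to_partitioned() cleared level0_sub_table, so the LookupResult
+                // produced by the level0 emplace above would otherwise be dangling.)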
+                size_t sub_table_id = get_sub_table_from_hash(hash_value);
+                it = level1_sub_tables[sub_table_id].find(key_holder_get_key(key_holder),
+                                                          hash_value);
+            }
+        }
+    }
+
+    template <typename KeyHolder>
+    void ALWAYS_INLINE emplace(KeyHolder&& key_holder, LookupResult& it, size_t hash_value,
+                               bool& inserted) {
+        emplace(key_holder, it, inserted, hash_value);
+    }
+
+    LookupResult ALWAYS_INLINE find(Key x, size_t hash_value) {
+        if (_is_partitioned) {
+            size_t sub_table_idx = get_sub_table_from_hash(hash_value);
+            return level1_sub_tables[sub_table_idx].find(x, hash_value);
+        } else {
+            return level0_sub_table.find(x, hash_value);
+        }
+    }
+
+    ConstLookupResult ALWAYS_INLINE find(Key x, size_t hash_value) const {
+        return const_cast<std::decay_t<decltype(*this)>*>(this)->find(x, hash_value);
+    }
+
+    LookupResult ALWAYS_INLINE find(Key x) { return find(x, hash(x)); }
+
+    ConstLookupResult ALWAYS_INLINE find(Key x) const { return find(x, hash(x)); }
+
+    size_t size() const {
+        if (_is_partitioned) {
+            size_t res = 0;
+            for (size_t i = 0; i < NUM_LEVEL1_SUB_TABLES; ++i) res += level1_sub_tables[i].size();
+            return res;
+        } else {
+            return level0_sub_table.size();
+        }
+    }
+
+    std::vector<size_t> sizes() const {
+        std::vector<size_t> sizes;
+        if (_is_partitioned) {
+            for (size_t i = 0; i < NUM_LEVEL1_SUB_TABLES; ++i) {
+                sizes.push_back(level1_sub_tables[i].size());
+            }
+        } else {
+            sizes.push_back(level0_sub_table.size());
+        }
+        return sizes;
+    }
+
+    bool empty() const {
+        if (_is_partitioned) {
+            for (size_t i = 0; i < NUM_LEVEL1_SUB_TABLES; ++i)
+                if (!level1_sub_tables[i].empty()) return false;
+            return true;
+        } else {
+            return level0_sub_table.empty();
+        }
+    }
+
+private:
+    void convert_to_partitioned() {
+        SCOPED_RAW_TIMER(&_convert_timer_ns);
+
+        auto bucket_count = level0_sub_table.get_buffer_size_in_cells();
+        for (size_t i = 0; i < NUM_LEVEL1_SUB_TABLES; ++i) {
+            level1_sub_tables[i] = std::move(Impl(bucket_count / NUM_LEVEL1_SUB_TABLES));
+        }
+
+        auto it = level0_sub_table.begin();
+
+        /// It is assumed that the zero key (stored separately) is first in iteration order.
+        if (it != level0_sub_table.end() && it.get_ptr()->is_zero(level0_sub_table)) {
+            insert(it->get_value());
+            ++it;
+        }
+
+        for (; it != level0_sub_table.end(); ++it) {
+            const Cell* cell = it.get_ptr();
+            size_t hash_value = cell->get_hash(level0_sub_table);
+            size_t sub_table_idx = get_sub_table_from_hash(hash_value);
+            level1_sub_tables[sub_table_idx].insert_unique_non_zero(cell, hash_value);
+        }
+
+        _is_partitioned = true;
+        level0_sub_table.clear_and_shrink();
+    }
+
+    /// NOTE Bad for hash tables with more than 2^32 cells.
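+    /// With the default BITS_FOR_SUB_TABLE = 4 this reads hash bits 28..31, i.e. one of
+    /// 16 sub tables; the low bits remain free for bucket selection inside each sub table.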
+    static size_t get_sub_table_from_hash(size_t hash_value) {
+        return (hash_value >> (32 - BITS_FOR_SUB_TABLE)) & MAX_SUB_TABLE;
+    }
+};
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index 79beb01a83..da144378b1 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -73,20 +73,37 @@ struct ProcessHashTableBuild {
 
         Defer defer {[&]() {
             int64_t bucket_size = hash_table_ctx.hash_table.get_buffer_size_in_cells();
+            int64_t filled_bucket_size = hash_table_ctx.hash_table.size();
             int64_t bucket_bytes = hash_table_ctx.hash_table.get_buffer_size_in_bytes();
             _join_node->_mem_used += bucket_bytes - old_bucket_bytes;
             COUNTER_SET(_join_node->_build_buckets_counter, bucket_size);
+            COUNTER_SET(_join_node->_build_buckets_fill_counter, filled_bucket_size);
+
+            auto hash_table_buckets = hash_table_ctx.hash_table.get_buffer_sizes_in_cells();
+            std::string hash_table_buckets_info;
+            for (auto bucket_count : hash_table_buckets) {
+                hash_table_buckets_info += std::to_string(bucket_count) + ", ";
+            }
+            _join_node->add_hash_buckets_info(hash_table_buckets_info);
+
+            auto hash_table_sizes = hash_table_ctx.hash_table.sizes();
+            hash_table_buckets_info.clear();
+            for (auto table_size : hash_table_sizes) {
+                hash_table_buckets_info += std::to_string(table_size) + ", ";
+            }
+            _join_node->add_hash_buckets_filled_info(hash_table_buckets_info);
         }};
 
         KeyGetter key_getter(_build_raw_ptrs, _join_node->_build_key_sz, nullptr);
 
         SCOPED_TIMER(_join_node->_build_table_insert_timer);
+        hash_table_ctx.hash_table.reset_resize_timer();
+
+        // Only when build_unique is false do we pre-expand the hash table before inserting data.
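+        // (expanse_for_add_elem() pre-sizes for _rows elements in one step; if that many
+        // rows would already cross the partition threshold, the table converts immediately
+        // instead of growing level0.)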
         if (!_join_node->_build_unique) {
             // _rows contains null row, which will cause hash table resize to be large.
             RETURN_IF_CATCH_BAD_ALLOC(hash_table_ctx.hash_table.expanse_for_add_elem(_rows));
         }
-        hash_table_ctx.hash_table.reset_resize_timer();
 
         vector<int>& inserted_rows = _join_node->_inserted_rows[&_acquired_block];
         bool has_runtime_filter = !_join_node->_runtime_filter_descs.empty();
@@ -172,6 +189,9 @@ struct ProcessHashTableBuild {
         COUNTER_UPDATE(_join_node->_build_table_expanse_timer,
                        hash_table_ctx.hash_table.get_resize_timer_value());
+        COUNTER_UPDATE(_join_node->_build_table_convert_timer,
+                       hash_table_ctx.hash_table.get_convert_timer_value());
+
         return Status::OK();
     }
 
@@ -337,6 +357,8 @@ Status HashJoinNode::prepare(RuntimeState* state) {
     _build_table_insert_timer = ADD_TIMER(build_phase_profile, "BuildTableInsertTime");
     _build_expr_call_timer = ADD_TIMER(build_phase_profile, "BuildExprCallTime");
     _build_table_expanse_timer = ADD_TIMER(build_phase_profile, "BuildTableExpanseTime");
+    _build_table_convert_timer =
+            ADD_TIMER(build_phase_profile, "BuildTableConvertToPartitionedTime");
     _build_rows_counter = ADD_COUNTER(build_phase_profile, "BuildRows", TUnit::UNIT);
     _build_side_compute_hash_timer = ADD_TIMER(build_phase_profile, "BuildSideHashComputingTime");
 
@@ -355,6 +377,7 @@ Status HashJoinNode::prepare(RuntimeState* state) {
     _push_down_timer = ADD_TIMER(runtime_profile(), "PushDownTime");
     _push_compute_timer = ADD_TIMER(runtime_profile(), "PushDownComputeTime");
     _build_buckets_counter = ADD_COUNTER(runtime_profile(), "BuildBuckets", TUnit::UNIT);
+    _build_buckets_fill_counter = ADD_COUNTER(runtime_profile(), "FilledBuckets", TUnit::UNIT);
 
     if (_is_broadcast_join) {
         runtime_profile()->add_info_string("BroadcastJoin", "true");
@@ -374,13 +397,21 @@ Status HashJoinNode::prepare(RuntimeState* state) {
     _left_table_data_types = VectorizedUtils::get_data_types(child(0)->row_desc());
 
     // Hash Table Init
-    _hash_table_init();
+    _hash_table_init(state);
     _process_hashtable_ctx_variants_init(state);
 
     _construct_mutable_join_block();
 
     return Status::OK();
 }
 
+void HashJoinNode::add_hash_buckets_info(const std::string& info) {
+    runtime_profile()->add_info_string("HashTableBuckets", info);
+}
+
+void HashJoinNode::add_hash_buckets_filled_info(const std::string& info) {
+    runtime_profile()->add_info_string("HashTableFilledBuckets", info);
+}
+
 Status HashJoinNode::close(RuntimeState* state) {
     if (is_closed()) {
         return Status::OK();
@@ -833,7 +864,7 @@ Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block, uin
     return st;
 }
 
-void HashJoinNode::_hash_table_init() {
+void HashJoinNode::_hash_table_init(RuntimeState* state) {
     std::visit(
             [&](auto&& join_op_variants, auto have_other_join_conjunct) {
                 using JoinOpType = std::decay_t<decltype(join_op_variants)>;
@@ -956,6 +987,16 @@ void HashJoinNode::_hash_table_init() {
             _join_op_variants, make_bool_variant(_have_other_join_conjunct));
 
     DCHECK(!std::holds_alternative<std::monostate>(*_hash_table_variants));
+
+    std::visit(Overload {[&](std::monostate& arg) {
+                             LOG(FATAL) << "FATAL: uninitialized hash table";
+                             __builtin_unreachable();
+                         },
+                         [&](auto&& arg) {
+                             arg.hash_table_ptr->set_partitioned_threshold(
+                                     state->partitioned_hash_join_rows_threshold());
+                         }},
+               *_hash_table_variants);
 }
 
 void HashJoinNode::_process_hashtable_ctx_variants_init(RuntimeState* state) {
diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h
index 691857c986..72d8317468 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -24,7 +24,7 @@
 #include "join_op.h"
 #include "process_hash_table_probe.h"
 #include "vec/common/columns_hashing.h"
-#include "vec/common/hash_table/hash_map.h" +#include "vec/common/hash_table/partitioned_hash_map.h" #include "vjoin_node_base.h" namespace doris { @@ -39,7 +39,7 @@ class SharedHashTableController; template struct SerializedHashTableContext { using Mapped = RowRefListType; - using HashTable = HashMap; + using HashTable = PartitionedHashMap; using State = ColumnsHashing::HashMethodSerialized; using Iter = typename HashTable::iterator; @@ -70,7 +70,7 @@ struct IsSerializedHashTableContextTraits struct PrimaryTypeHashTableContext { using Mapped = RowRefListType; - using HashTable = HashMap>; + using HashTable = PartitionedHashMap>; using State = ColumnsHashing::HashMethodOneNumber; using Iter = typename HashTable::iterator; @@ -105,7 +105,7 @@ using I256HashTableContext = PrimaryTypeHashTableContext struct FixedKeyHashTableContext { using Mapped = RowRefListType; - using HashTable = HashMap>; + using HashTable = PartitionedHashMap>; using State = ColumnsHashing::HashMethodKeysFixed; using Iter = typename HashTable::iterator; @@ -192,6 +192,8 @@ public: Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override; Status get_next(RuntimeState* state, Block* block, bool* eos) override; Status close(RuntimeState* state) override; + void add_hash_buckets_info(const std::string& info); + void add_hash_buckets_filled_info(const std::string& info); private: using VExprContexts = std::vector; @@ -218,9 +220,11 @@ private: RuntimeProfile::Counter* _build_expr_call_timer; RuntimeProfile::Counter* _build_table_insert_timer; RuntimeProfile::Counter* _build_table_expanse_timer; + RuntimeProfile::Counter* _build_table_convert_timer; RuntimeProfile::Counter* _probe_expr_call_timer; RuntimeProfile::Counter* _probe_next_timer; RuntimeProfile::Counter* _build_buckets_counter; + RuntimeProfile::Counter* _build_buckets_fill_counter; RuntimeProfile::Counter* _push_down_timer; RuntimeProfile::Counter* _push_compute_timer; RuntimeProfile::Counter* _search_hashtable_timer; @@ -281,7 +285,7 @@ private: void _set_build_ignore_flag(Block& block, const std::vector& res_col_ids); - void _hash_table_init(); + void _hash_table_init(RuntimeState* state); void _process_hashtable_ctx_variants_init(RuntimeState* state); static constexpr auto _MAX_BUILD_BLOCK_COUNT = 128; diff --git a/be/src/vec/runtime/shared_hash_table_controller.cpp b/be/src/vec/runtime/shared_hash_table_controller.cpp index 8ca3656ad4..7d92dabedf 100644 --- a/be/src/vec/runtime/shared_hash_table_controller.cpp +++ b/be/src/vec/runtime/shared_hash_table_controller.cpp @@ -121,10 +121,10 @@ Status SharedHashTableController::release_ref_count_if_need(TUniqueId fragment_i Status SharedHashTableController::wait_for_closable(RuntimeState* state, int my_node_id) { std::unique_lock lock(_mutex); - RETURN_IF_CANCELLED(state); if (!_ref_fragments[my_node_id].empty()) { _cv.wait(lock, [&]() { return _ref_fragments[my_node_id].empty(); }); } + RETURN_IF_CANCELLED(state); return Status::OK(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 59b653a88f..c8622de800 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -229,6 +229,8 @@ public class SessionVariable implements Serializable, Writable { public static final String INTERNAL_SESSION = "internal_session"; + public static final String PARTITIONED_HASH_JOIN_ROWS_THRESHOLD = 
"partitioned_hash_join_rows_threshold"; + // session origin value public Map sessionOriginValue = new HashMap(); // check stmt is or not [select /*+ SET_VAR(...)*/ ...] @@ -601,6 +603,10 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = INTERNAL_SESSION) public boolean internalSession = false; + // Use partitioned hash join if build side row count >= the threshold . 0 - the threshold is not set. + @VariableMgr.VarAttr(name = PARTITIONED_HASH_JOIN_ROWS_THRESHOLD) + public int partitionedHashJoinRowsThreshold = 0; + // If this fe is in fuzzy mode, then will use initFuzzyModeVariables to generate some variables, // not the default value set in the code. public void initFuzzyModeVariables() { @@ -609,6 +615,7 @@ public class SessionVariable implements Serializable, Writable { this.enableLocalExchange = random.nextBoolean(); this.disableJoinReorder = random.nextBoolean(); this.disableStreamPreaggregations = random.nextBoolean(); + // this.partitionedHashJoinRowsThreshold = random.nextBoolean() ? 8 : 1048576; } public String getBlockEncryptionMode() { @@ -842,6 +849,14 @@ public class SessionVariable implements Serializable, Writable { this.enablePartitionCache = enablePartitionCache; } + public int getPartitionedHashJoinRowsThreshold() { + return partitionedHashJoinRowsThreshold; + } + + public void setPartitionedHashJoinRowsThreshold(int threshold) { + this.partitionedHashJoinRowsThreshold = threshold; + } + // Serialize to thrift object public boolean getForwardToMaster() { return forwardToMaster; @@ -1244,6 +1259,8 @@ public class SessionVariable implements Serializable, Writable { tResult.setSkipDeletePredicate(skipDeletePredicate); + tResult.setPartitionedHashJoinRowsThreshold(partitionedHashJoinRowsThreshold); + return tResult; } diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index ded68c7458..b725ea9191 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -179,6 +179,8 @@ struct TQueryOptions { 51: optional bool enable_new_shuffle_hash_method 52: optional i32 be_exec_version = 0 + + 53: optional i32 partitioned_hash_join_rows_threshold = 0 }