diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index f02e203c78..2711b0d885 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -364,7 +364,7 @@ void HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state) { } return; } - if (!try_get_hash_map_context_fixed( + if (!try_get_hash_map_context_fixed( *_shared_state->hash_table_variants, _build_expr_ctxs)) { _shared_state->hash_table_variants ->emplace>(); diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 59f9fee377..8a58973be3 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -35,6 +35,7 @@ #include "vec/common/hash_table/hash_map_context_creator.h" #include "vec/common/sort/partition_sorter.h" #include "vec/common/sort/sorter.h" +#include "vec/core/types.h" #include "vec/exec/join/process_hash_table_probe.h" #include "vec/exec/join/vhash_join_node.h" #include "vec/exec/vaggregation_node.h" @@ -524,24 +525,22 @@ public: /// called in setup_local_state void hash_table_init() { + using namespace vectorized; if (child_exprs_lists[0].size() == 1 && (!build_not_ignore_null[0])) { // Single column optimization switch (child_exprs_lists[0][0]->root()->result_type()) { case TYPE_BOOLEAN: case TYPE_TINYINT: - hash_table_variants->emplace< - vectorized::I8HashTableContext>(); + hash_table_variants->emplace>(); break; case TYPE_SMALLINT: - hash_table_variants->emplace< - vectorized::I16HashTableContext>(); + hash_table_variants->emplace>(); break; case TYPE_INT: case TYPE_FLOAT: case TYPE_DATEV2: case TYPE_DECIMAL32: - hash_table_variants->emplace< - vectorized::I32HashTableContext>(); + hash_table_variants->emplace>(); break; case TYPE_BIGINT: case TYPE_DOUBLE: @@ -549,27 +548,21 @@ public: case TYPE_DATE: case TYPE_DECIMAL64: case TYPE_DATETIMEV2: - hash_table_variants->emplace< - vectorized::I64HashTableContext>(); + hash_table_variants->emplace>(); break; case TYPE_LARGEINT: case TYPE_DECIMALV2: case TYPE_DECIMAL128I: - hash_table_variants->emplace< - vectorized::I128HashTableContext>(); + hash_table_variants->emplace>(); break; default: - hash_table_variants->emplace< - vectorized::SerializedHashTableContext>(); + hash_table_variants->emplace(); } return; } - - if (!try_get_hash_map_context_fixed( + if (!try_get_hash_map_context_fixed( *hash_table_variants, child_exprs_lists[0])) { - hash_table_variants->emplace< - vectorized::SerializedHashTableContext>(); + hash_table_variants->emplace(); } } }; diff --git a/be/src/vec/common/hash_table/hash_map.h b/be/src/vec/common/hash_table/hash_map.h index 382f46acb7..d10b24ade2 100644 --- a/be/src/vec/common/hash_table/hash_map.h +++ b/be/src/vec/common/hash_table/hash_map.h @@ -27,7 +27,9 @@ #include "vec/common/hash_table/hash.h" #include "vec/common/hash_table/hash_table.h" #include "vec/common/hash_table/hash_table_allocator.h" +#include "vec/common/hash_table/join_hash_table.h" +namespace doris { /** NOTE HashMap could only be used for memmoveable (position independent) types. * Example: std::string is not position independent in libstdc++ with C++11 ABI or in libc++. * Also, key in hash table must be of type, that zero bytes is compared equals to zero key. @@ -192,379 +194,15 @@ public: bool has_null_key_data() const { return false; } }; -template , - typename Grower = HashTableGrower<>, typename Allocator = HashTableAllocator> -class JoinHashMapTable : public HashMapTable { -public: - using Self = JoinHashMapTable; - using Base = HashMapTable; - - using key_type = Key; - using value_type = typename Cell::value_type; - using mapped_type = typename Cell::Mapped; - - using LookupResult = typename Base::LookupResult; - - static uint32_t calc_bucket_size(size_t num_elem) { - size_t expect_bucket_size = num_elem + (num_elem - 1) / 7; - return phmap::priv::NormalizeCapacity(expect_bucket_size) + 1; - } - - size_t get_byte_size() const { - auto cal_vector_mem = [](const auto& vec) { return vec.capacity() * sizeof(vec[0]); }; - return cal_vector_mem(visited) + cal_vector_mem(first) + cal_vector_mem(next); - } - - template - void prepare_build(size_t num_elem, int batch_size, bool has_null_key) { - _has_null_key = has_null_key; - - // the first row in build side is not really from build side table - _empty_build_side = num_elem <= 1; - max_batch_size = batch_size; - bucket_size = calc_bucket_size(num_elem + 1); - first.resize(bucket_size + 1); - next.resize(num_elem); - - if constexpr (JoinOpType == doris::TJoinOp::FULL_OUTER_JOIN || - JoinOpType == doris::TJoinOp::RIGHT_OUTER_JOIN || - JoinOpType == doris::TJoinOp::RIGHT_ANTI_JOIN || - JoinOpType == doris::TJoinOp::RIGHT_SEMI_JOIN) { - visited.resize(num_elem); - } - } - - uint32_t get_bucket_size() const { return bucket_size; } - - size_t size() const { return Base::size() == 0 ? next.size() : Base::size(); } - - std::vector& get_visited() { return visited; } - - void build(const Key* __restrict keys, const uint32_t* __restrict bucket_nums, - size_t num_elem) { - build_keys = keys; - for (size_t i = 1; i < num_elem; i++) { - uint32_t bucket_num = bucket_nums[i]; - next[i] = first[bucket_num]; - first[bucket_num] = i; - } - first[bucket_size] = 0; // index = bucket_num means null - } - - template - auto find_batch(const Key* __restrict keys, const uint32_t* __restrict build_idx_map, - int probe_idx, uint32_t build_idx, int probe_rows, - uint32_t* __restrict probe_idxs, bool& probe_visited, - uint32_t* __restrict build_idxs, - doris::vectorized::ColumnFilterHelper* mark_column) { - if constexpr (JoinOpType == doris::TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { - if (_empty_build_side) { - return _process_null_aware_left_anti_join_for_empty_build_side< - JoinOpType, with_other_conjuncts, is_mark_join>( - probe_idx, probe_rows, probe_idxs, build_idxs, mark_column); - } - } - - if constexpr (with_other_conjuncts) { - return _find_batch_conjunct(keys, build_idx_map, probe_idx, build_idx, - probe_rows, probe_idxs, build_idxs); - } - - if constexpr (is_mark_join) { - return _find_batch_mark(keys, build_idx_map, probe_idx, probe_rows, - probe_idxs, build_idxs, mark_column); - } - - if constexpr (JoinOpType == doris::TJoinOp::INNER_JOIN || - JoinOpType == doris::TJoinOp::FULL_OUTER_JOIN || - JoinOpType == doris::TJoinOp::LEFT_OUTER_JOIN || - JoinOpType == doris::TJoinOp::RIGHT_OUTER_JOIN) { - return _find_batch_inner_outer_join(keys, build_idx_map, probe_idx, - build_idx, probe_rows, probe_idxs, - probe_visited, build_idxs); - } - if constexpr (JoinOpType == doris::TJoinOp::LEFT_ANTI_JOIN || - JoinOpType == doris::TJoinOp::LEFT_SEMI_JOIN || - JoinOpType == doris::TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { - return _find_batch_left_semi_anti( - keys, build_idx_map, probe_idx, probe_rows, probe_idxs); - } - if constexpr (JoinOpType == doris::TJoinOp::RIGHT_ANTI_JOIN || - JoinOpType == doris::TJoinOp::RIGHT_SEMI_JOIN) { - return _find_batch_right_semi_anti(keys, build_idx_map, probe_idx, probe_rows); - } - return std::tuple {0, 0U, 0}; - } - - template - bool iterate_map(std::vector& build_idxs) const { - const auto batch_size = max_batch_size; - const auto elem_num = visited.size(); - int count = 0; - build_idxs.resize(batch_size); - - while (count < batch_size && iter_idx < elem_num) { - const auto matched = visited[iter_idx]; - build_idxs[count] = iter_idx; - if constexpr (JoinOpType != doris::TJoinOp::RIGHT_SEMI_JOIN) { - count += !matched; - } else { - count += matched; - } - iter_idx++; - } - - build_idxs.resize(count); - return iter_idx >= elem_num; - } - - bool has_null_key() { return _has_null_key; } - - void pre_build_idxs(std::vector& bucksets, const uint8_t* null_map) { - if (null_map) { - first[bucket_size] = bucket_size; // distinguish between not matched and null - } - - for (uint32_t i = 0; i < bucksets.size(); i++) { - bucksets[i] = first[bucksets[i]]; - } - } - -private: - // only LEFT_ANTI_JOIN/LEFT_SEMI_JOIN/NULL_AWARE_LEFT_ANTI_JOIN/CROSS_JOIN support mark join - template - auto _find_batch_mark(const Key* __restrict keys, const uint32_t* __restrict build_idx_map, - int probe_idx, int probe_rows, uint32_t* __restrict probe_idxs, - uint32_t* __restrict build_idxs, - doris::vectorized::ColumnFilterHelper* mark_column) { - auto matched_cnt = 0; - const auto batch_size = max_batch_size; - - while (probe_idx < probe_rows && matched_cnt < batch_size) { - auto build_idx = build_idx_map[probe_idx] == bucket_size ? 0 : build_idx_map[probe_idx]; - - while (build_idx && keys[probe_idx] != build_keys[build_idx]) { - build_idx = next[build_idx]; - } - - if (build_idx_map[probe_idx] == bucket_size) { - // mark result as null when probe row is null - mark_column->insert_null(); - } else { - bool matched = JoinOpType == doris::TJoinOp::LEFT_SEMI_JOIN ? build_idx != 0 - : build_idx == 0; - if (!matched && _has_null_key) { - mark_column->insert_null(); - } else { - mark_column->insert_value(matched); - } - } - - probe_idxs[matched_cnt] = probe_idx++; - build_idxs[matched_cnt] = build_idx; - matched_cnt++; - } - return std::tuple {probe_idx, 0U, matched_cnt}; - } - - template - auto _process_null_aware_left_anti_join_for_empty_build_side( - int probe_idx, int probe_rows, uint32_t* __restrict probe_idxs, - uint32_t* __restrict build_idxs, doris::vectorized::ColumnFilterHelper* mark_column) { - static_assert(JoinOpType == doris::TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN); - auto matched_cnt = 0; - const auto batch_size = max_batch_size; - - while (probe_idx < probe_rows && matched_cnt < batch_size) { - probe_idxs[matched_cnt] = probe_idx++; - if constexpr (is_mark_join) { - build_idxs[matched_cnt] = 0; - } - ++matched_cnt; - } - - if constexpr (is_mark_join && !with_other_conjuncts) { - mark_column->resize_fill(matched_cnt, 1); - } - - return std::tuple {probe_idx, 0U, matched_cnt}; - } - - auto _find_batch_right_semi_anti(const Key* __restrict keys, - const uint32_t* __restrict build_idx_map, int probe_idx, - int probe_rows) { - while (probe_idx < probe_rows) { - auto build_idx = build_idx_map[probe_idx]; - - while (build_idx) { - if (!visited[build_idx] && keys[probe_idx] == build_keys[build_idx]) { - visited[build_idx] = 1; - } - build_idx = next[build_idx]; - } - probe_idx++; - } - return std::tuple {probe_idx, 0U, 0}; - } - - template - auto _find_batch_left_semi_anti(const Key* __restrict keys, - const uint32_t* __restrict build_idx_map, int probe_idx, - int probe_rows, uint32_t* __restrict probe_idxs) { - auto matched_cnt = 0; - const auto batch_size = max_batch_size; - - while (probe_idx < probe_rows && matched_cnt < batch_size) { - if constexpr (need_judge_null) { - if (build_idx_map[probe_idx] == bucket_size) { - probe_idx++; - continue; - } - } - - auto build_idx = build_idx_map[probe_idx]; - - while (build_idx && keys[probe_idx] != build_keys[build_idx]) { - build_idx = next[build_idx]; - } - bool matched = - JoinOpType == doris::TJoinOp::LEFT_SEMI_JOIN ? build_idx != 0 : build_idx == 0; - probe_idxs[matched_cnt] = probe_idx++; - matched_cnt += matched; - } - return std::tuple {probe_idx, 0U, matched_cnt}; - } - - template - auto _find_batch_conjunct(const Key* __restrict keys, const uint32_t* __restrict build_idx_map, - int probe_idx, uint32_t build_idx, int probe_rows, - uint32_t* __restrict probe_idxs, uint32_t* __restrict build_idxs) { - auto matched_cnt = 0; - const auto batch_size = max_batch_size; - - auto do_the_probe = [&]() { - while (build_idx && matched_cnt < batch_size) { - if constexpr (JoinOpType == doris::TJoinOp::RIGHT_ANTI_JOIN || - JoinOpType == doris::TJoinOp::RIGHT_SEMI_JOIN) { - if (!visited[build_idx] && keys[probe_idx] == build_keys[build_idx]) { - probe_idxs[matched_cnt] = probe_idx; - build_idxs[matched_cnt] = build_idx; - matched_cnt++; - } - } else if (keys[probe_idx] == build_keys[build_idx]) { - build_idxs[matched_cnt] = build_idx; - probe_idxs[matched_cnt] = probe_idx; - matched_cnt++; - } - build_idx = next[build_idx]; - } - - if constexpr (JoinOpType == doris::TJoinOp::LEFT_OUTER_JOIN || - JoinOpType == doris::TJoinOp::FULL_OUTER_JOIN || - JoinOpType == doris::TJoinOp::LEFT_SEMI_JOIN || - JoinOpType == doris::TJoinOp::LEFT_ANTI_JOIN || - JoinOpType == doris::TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { - // may over batch_size when emplace 0 into build_idxs - if (!build_idx) { - probe_idxs[matched_cnt] = probe_idx; - build_idxs[matched_cnt] = 0; - matched_cnt++; - } - } - - probe_idx++; - }; - - if (build_idx) { - do_the_probe(); - } - - while (probe_idx < probe_rows && matched_cnt < batch_size) { - build_idx = build_idx_map[probe_idx]; - do_the_probe(); - } - - probe_idx -= (build_idx != 0); - return std::tuple {probe_idx, build_idx, matched_cnt}; - } - - template - auto _find_batch_inner_outer_join(const Key* __restrict keys, - const uint32_t* __restrict build_idx_map, int probe_idx, - uint32_t build_idx, int probe_rows, - uint32_t* __restrict probe_idxs, bool& probe_visited, - uint32_t* __restrict build_idxs) { - auto matched_cnt = 0; - const auto batch_size = max_batch_size; - - auto do_the_probe = [&]() { - while (build_idx && matched_cnt < batch_size) { - if (keys[probe_idx] == build_keys[build_idx]) { - probe_idxs[matched_cnt] = probe_idx; - build_idxs[matched_cnt] = build_idx; - matched_cnt++; - if constexpr (JoinOpType == doris::TJoinOp::RIGHT_OUTER_JOIN || - JoinOpType == doris::TJoinOp::FULL_OUTER_JOIN) { - if (!visited[build_idx]) { - visited[build_idx] = 1; - } - } - } - build_idx = next[build_idx]; - } - - if constexpr (JoinOpType == doris::TJoinOp::LEFT_OUTER_JOIN || - JoinOpType == doris::TJoinOp::FULL_OUTER_JOIN) { - // `(!matched_cnt || probe_idxs[matched_cnt - 1] != probe_idx)` means not match one build side - probe_visited |= (matched_cnt && probe_idxs[matched_cnt - 1] == probe_idx); - if (!build_idx) { - if (!probe_visited) { - probe_idxs[matched_cnt] = probe_idx; - build_idxs[matched_cnt] = 0; - matched_cnt++; - } - probe_visited = false; - } - } - probe_idx++; - }; - - if (build_idx) { - do_the_probe(); - } - - while (probe_idx < probe_rows && matched_cnt < batch_size) { - build_idx = build_idx_map[probe_idx]; - do_the_probe(); - } - - probe_idx -= (build_idx != 0); - return std::tuple {probe_idx, build_idx, matched_cnt}; - } - - const Key* __restrict build_keys; - std::vector visited; - - uint32_t bucket_size = 1; - int max_batch_size = 4064; - - std::vector first = {0}; - std::vector next = {0}; - - // use in iter hash map - mutable uint32_t iter_idx = 1; - Cell cell; - doris::vectorized::Arena* pool; - bool _has_null_key = false; - bool _empty_build_side = true; -}; - template , typename Grower = HashTableGrower<>, typename Allocator = HashTableAllocator> using HashMap = HashMapTable, Hash, Grower, Allocator>; template > -using JoinFixedHashMap = JoinHashMapTable, Hash>; +using NormalHashMap = HashMapTable, Hash>; + +template > +using JoinHashMap = JoinHashTable; template , typename Grower = HashTableGrower<>, typename Allocator = HashTableAllocator> @@ -577,3 +215,5 @@ using HashMapWithStackMemory = HashMapTable< HashTableGrower, HashTableAllocatorWithStackMemory<(1ULL << initial_size_degree) * sizeof(HashMapCellWithSavedHash)>>; + +} // namespace doris \ No newline at end of file diff --git a/be/src/vec/common/hash_table/hash_map_context.h b/be/src/vec/common/hash_table/hash_map_context.h index d96aa2d7c6..f8861ccfcd 100644 --- a/be/src/vec/common/hash_table/hash_map_context.h +++ b/be/src/vec/common/hash_table/hash_map_context.h @@ -31,6 +31,7 @@ #include "vec/common/hash_table/string_hash_map.h" #include "vec/common/string_ref.h" #include "vec/core/types.h" +#include "vec/exec/join/join_op.h" #include "vec/utils/util.hpp" namespace doris::vectorized { @@ -41,15 +42,13 @@ template struct DataWithNullKey; template -struct MethodBase { +struct MethodBaseInner { using Key = typename HashMap::key_type; using Mapped = typename HashMap::mapped_type; using Value = typename HashMap::value_type; - using Iterator = typename HashMap::iterator; using HashMapType = HashMap; std::shared_ptr hash_table; - Iterator iterator; bool inited_iterator = false; Key* keys = nullptr; Arena arena; @@ -58,21 +57,14 @@ struct MethodBase { // use in join case std::vector bucket_nums; - MethodBase() { hash_table.reset(new HashMap()); } - virtual ~MethodBase() = default; + MethodBaseInner() { hash_table.reset(new HashMap()); } + virtual ~MethodBaseInner() = default; virtual void reset() { arena.clear(); inited_iterator = false; } - void init_iterator() { - if (!inited_iterator) { - inited_iterator = true; - iterator = hash_table->begin(); - } - } - virtual void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t num_rows, const uint8_t* null_map = nullptr, bool is_join = false, bool is_build = false, uint32_t bucket_size = 0) = 0; @@ -170,6 +162,29 @@ struct MethodBase { size_t num_rows) = 0; }; +template +concept IteratoredMap = requires(T* map) { typename T::iterator; }; + +template +struct MethodBase : public MethodBaseInner { + using Iterator = void*; + Iterator iterator; + void init_iterator() { MethodBaseInner::inited_iterator = true; } +}; + +template +struct MethodBase : public MethodBaseInner { + using Iterator = typename HashMap::iterator; + using Base = MethodBaseInner; + Iterator iterator; + void init_iterator() { + if (!Base::inited_iterator) { + Base::inited_iterator = true; + iterator = Base::hash_table->begin(); + } + } +}; + template struct MethodSerialized : public MethodBase { using Base = MethodBase; @@ -555,14 +570,23 @@ struct MethodSingleNullableColumn : public SingleColumnMethod { }; template -using SerializedHashTableContext = MethodSerialized>; +using SerializedHashTableContext = MethodSerialized>; template using PrimaryTypeHashTableContext = - MethodOneNumber>>; + MethodOneNumber>>; template -using FixedKeyHashTableContext = - MethodKeysFixed>, has_null>; +using FixedKeyHashTableContext = MethodKeysFixed>, has_null>; + +template +using SetFixedKeyHashTableContext = + MethodKeysFixed>, has_null>; + +template +using SetPrimaryTypeHashTableContext = + MethodOneNumber>>; + +using SetSerializedHashTableContext = MethodSerialized>; } // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/common/hash_table/join_hash_table.h b/be/src/vec/common/hash_table/join_hash_table.h new file mode 100644 index 0000000000..b190d3d89c --- /dev/null +++ b/be/src/vec/common/hash_table/join_hash_table.h @@ -0,0 +1,385 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "vec/columns/column_filter_helper.h" +#include "vec/common/hash_table/hash.h" +#include "vec/common/hash_table/hash_table.h" +#include "vec/common/hash_table/hash_table_allocator.h" + +namespace doris { +template > +class JoinHashTable { +public: + using key_type = Key; + using mapped_type = void*; + using value_type = void*; + size_t hash(const Key& x) const { return Hash()(x); } + + static uint32_t calc_bucket_size(size_t num_elem) { + size_t expect_bucket_size = num_elem + (num_elem - 1) / 7; + return phmap::priv::NormalizeCapacity(expect_bucket_size) + 1; + } + + size_t get_byte_size() const { + auto cal_vector_mem = [](const auto& vec) { return vec.capacity() * sizeof(vec[0]); }; + return cal_vector_mem(visited) + cal_vector_mem(first) + cal_vector_mem(next); + } + + template + void prepare_build(size_t num_elem, int batch_size, bool has_null_key) { + _has_null_key = has_null_key; + + // the first row in build side is not really from build side table + _empty_build_side = num_elem <= 1; + max_batch_size = batch_size; + bucket_size = calc_bucket_size(num_elem + 1); + first.resize(bucket_size + 1); + next.resize(num_elem); + + if constexpr (JoinOpType == TJoinOp::FULL_OUTER_JOIN || + JoinOpType == TJoinOp::RIGHT_OUTER_JOIN || + JoinOpType == TJoinOp::RIGHT_ANTI_JOIN || + JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) { + visited.resize(num_elem); + } + } + + uint32_t get_bucket_size() const { return bucket_size; } + + size_t size() const { return next.size(); } + + std::vector& get_visited() { return visited; } + + void build(const Key* __restrict keys, const uint32_t* __restrict bucket_nums, + size_t num_elem) { + build_keys = keys; + for (size_t i = 1; i < num_elem; i++) { + uint32_t bucket_num = bucket_nums[i]; + next[i] = first[bucket_num]; + first[bucket_num] = i; + } + first[bucket_size] = 0; // index = bucket_num means null + } + + template + auto find_batch(const Key* __restrict keys, const uint32_t* __restrict build_idx_map, + int probe_idx, uint32_t build_idx, int probe_rows, + uint32_t* __restrict probe_idxs, bool& probe_visited, + uint32_t* __restrict build_idxs, vectorized::ColumnFilterHelper* mark_column) { + if constexpr (JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { + if (_empty_build_side) { + return _process_null_aware_left_anti_join_for_empty_build_side< + JoinOpType, with_other_conjuncts, is_mark_join>( + probe_idx, probe_rows, probe_idxs, build_idxs, mark_column); + } + } + + if constexpr (with_other_conjuncts) { + return _find_batch_conjunct(keys, build_idx_map, probe_idx, build_idx, + probe_rows, probe_idxs, build_idxs); + } + + if constexpr (is_mark_join) { + return _find_batch_mark(keys, build_idx_map, probe_idx, probe_rows, + probe_idxs, build_idxs, mark_column); + } + + if constexpr (JoinOpType == TJoinOp::INNER_JOIN || JoinOpType == TJoinOp::FULL_OUTER_JOIN || + JoinOpType == TJoinOp::LEFT_OUTER_JOIN || + JoinOpType == TJoinOp::RIGHT_OUTER_JOIN) { + return _find_batch_inner_outer_join(keys, build_idx_map, probe_idx, + build_idx, probe_rows, probe_idxs, + probe_visited, build_idxs); + } + if constexpr (JoinOpType == TJoinOp::LEFT_ANTI_JOIN || + JoinOpType == TJoinOp::LEFT_SEMI_JOIN || + JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { + return _find_batch_left_semi_anti( + keys, build_idx_map, probe_idx, probe_rows, probe_idxs); + } + if constexpr (JoinOpType == TJoinOp::RIGHT_ANTI_JOIN || + JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) { + return _find_batch_right_semi_anti(keys, build_idx_map, probe_idx, probe_rows); + } + return std::tuple {0, 0U, 0}; + } + + template + bool iterate_map(std::vector& build_idxs) const { + const auto batch_size = max_batch_size; + const auto elem_num = visited.size(); + int count = 0; + build_idxs.resize(batch_size); + + while (count < batch_size && iter_idx < elem_num) { + const auto matched = visited[iter_idx]; + build_idxs[count] = iter_idx; + if constexpr (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN) { + count += !matched; + } else { + count += matched; + } + iter_idx++; + } + + build_idxs.resize(count); + return iter_idx >= elem_num; + } + + bool has_null_key() { return _has_null_key; } + + void pre_build_idxs(std::vector& bucksets, const uint8_t* null_map) { + if (null_map) { + first[bucket_size] = bucket_size; // distinguish between not matched and null + } + + for (uint32_t i = 0; i < bucksets.size(); i++) { + bucksets[i] = first[bucksets[i]]; + } + } + +private: + // only LEFT_ANTI_JOIN/LEFT_SEMI_JOIN/NULL_AWARE_LEFT_ANTI_JOIN/CROSS_JOIN support mark join + template + auto _find_batch_mark(const Key* __restrict keys, const uint32_t* __restrict build_idx_map, + int probe_idx, int probe_rows, uint32_t* __restrict probe_idxs, + uint32_t* __restrict build_idxs, + vectorized::ColumnFilterHelper* mark_column) { + auto matched_cnt = 0; + const auto batch_size = max_batch_size; + + while (probe_idx < probe_rows && matched_cnt < batch_size) { + auto build_idx = build_idx_map[probe_idx] == bucket_size ? 0 : build_idx_map[probe_idx]; + + while (build_idx && keys[probe_idx] != build_keys[build_idx]) { + build_idx = next[build_idx]; + } + + if (build_idx_map[probe_idx] == bucket_size) { + // mark result as null when probe row is null + mark_column->insert_null(); + } else { + bool matched = + JoinOpType == TJoinOp::LEFT_SEMI_JOIN ? build_idx != 0 : build_idx == 0; + if (!matched && _has_null_key) { + mark_column->insert_null(); + } else { + mark_column->insert_value(matched); + } + } + + probe_idxs[matched_cnt] = probe_idx++; + build_idxs[matched_cnt] = build_idx; + matched_cnt++; + } + return std::tuple {probe_idx, 0U, matched_cnt}; + } + + template + auto _process_null_aware_left_anti_join_for_empty_build_side( + int probe_idx, int probe_rows, uint32_t* __restrict probe_idxs, + uint32_t* __restrict build_idxs, vectorized::ColumnFilterHelper* mark_column) { + static_assert(JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN); + auto matched_cnt = 0; + const auto batch_size = max_batch_size; + + while (probe_idx < probe_rows && matched_cnt < batch_size) { + probe_idxs[matched_cnt] = probe_idx++; + if constexpr (is_mark_join) { + build_idxs[matched_cnt] = 0; + } + ++matched_cnt; + } + + if constexpr (is_mark_join && !with_other_conjuncts) { + mark_column->resize_fill(matched_cnt, 1); + } + + return std::tuple {probe_idx, 0U, matched_cnt}; + } + + auto _find_batch_right_semi_anti(const Key* __restrict keys, + const uint32_t* __restrict build_idx_map, int probe_idx, + int probe_rows) { + while (probe_idx < probe_rows) { + auto build_idx = build_idx_map[probe_idx]; + + while (build_idx) { + if (!visited[build_idx] && keys[probe_idx] == build_keys[build_idx]) { + visited[build_idx] = 1; + } + build_idx = next[build_idx]; + } + probe_idx++; + } + return std::tuple {probe_idx, 0U, 0}; + } + + template + auto _find_batch_left_semi_anti(const Key* __restrict keys, + const uint32_t* __restrict build_idx_map, int probe_idx, + int probe_rows, uint32_t* __restrict probe_idxs) { + auto matched_cnt = 0; + const auto batch_size = max_batch_size; + + while (probe_idx < probe_rows && matched_cnt < batch_size) { + if constexpr (need_judge_null) { + if (build_idx_map[probe_idx] == bucket_size) { + probe_idx++; + continue; + } + } + + auto build_idx = build_idx_map[probe_idx]; + + while (build_idx && keys[probe_idx] != build_keys[build_idx]) { + build_idx = next[build_idx]; + } + bool matched = JoinOpType == TJoinOp::LEFT_SEMI_JOIN ? build_idx != 0 : build_idx == 0; + probe_idxs[matched_cnt] = probe_idx++; + matched_cnt += matched; + } + return std::tuple {probe_idx, 0U, matched_cnt}; + } + + template + auto _find_batch_conjunct(const Key* __restrict keys, const uint32_t* __restrict build_idx_map, + int probe_idx, uint32_t build_idx, int probe_rows, + uint32_t* __restrict probe_idxs, uint32_t* __restrict build_idxs) { + auto matched_cnt = 0; + const auto batch_size = max_batch_size; + + auto do_the_probe = [&]() { + while (build_idx && matched_cnt < batch_size) { + if constexpr (JoinOpType == TJoinOp::RIGHT_ANTI_JOIN || + JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) { + if (!visited[build_idx] && keys[probe_idx] == build_keys[build_idx]) { + probe_idxs[matched_cnt] = probe_idx; + build_idxs[matched_cnt] = build_idx; + matched_cnt++; + } + } else if (keys[probe_idx] == build_keys[build_idx]) { + build_idxs[matched_cnt] = build_idx; + probe_idxs[matched_cnt] = probe_idx; + matched_cnt++; + } + build_idx = next[build_idx]; + } + + if constexpr (JoinOpType == TJoinOp::LEFT_OUTER_JOIN || + JoinOpType == TJoinOp::FULL_OUTER_JOIN || + JoinOpType == TJoinOp::LEFT_SEMI_JOIN || + JoinOpType == TJoinOp::LEFT_ANTI_JOIN || + JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { + // may over batch_size when emplace 0 into build_idxs + if (!build_idx) { + probe_idxs[matched_cnt] = probe_idx; + build_idxs[matched_cnt] = 0; + matched_cnt++; + } + } + + probe_idx++; + }; + + if (build_idx) { + do_the_probe(); + } + + while (probe_idx < probe_rows && matched_cnt < batch_size) { + build_idx = build_idx_map[probe_idx]; + do_the_probe(); + } + + probe_idx -= (build_idx != 0); + return std::tuple {probe_idx, build_idx, matched_cnt}; + } + + template + auto _find_batch_inner_outer_join(const Key* __restrict keys, + const uint32_t* __restrict build_idx_map, int probe_idx, + uint32_t build_idx, int probe_rows, + uint32_t* __restrict probe_idxs, bool& probe_visited, + uint32_t* __restrict build_idxs) { + auto matched_cnt = 0; + const auto batch_size = max_batch_size; + + auto do_the_probe = [&]() { + while (build_idx && matched_cnt < batch_size) { + if (keys[probe_idx] == build_keys[build_idx]) { + probe_idxs[matched_cnt] = probe_idx; + build_idxs[matched_cnt] = build_idx; + matched_cnt++; + if constexpr (JoinOpType == TJoinOp::RIGHT_OUTER_JOIN || + JoinOpType == TJoinOp::FULL_OUTER_JOIN) { + if (!visited[build_idx]) { + visited[build_idx] = 1; + } + } + } + build_idx = next[build_idx]; + } + + if constexpr (JoinOpType == TJoinOp::LEFT_OUTER_JOIN || + JoinOpType == TJoinOp::FULL_OUTER_JOIN) { + // `(!matched_cnt || probe_idxs[matched_cnt - 1] != probe_idx)` means not match one build side + probe_visited |= (matched_cnt && probe_idxs[matched_cnt - 1] == probe_idx); + if (!build_idx) { + if (!probe_visited) { + probe_idxs[matched_cnt] = probe_idx; + build_idxs[matched_cnt] = 0; + matched_cnt++; + } + probe_visited = false; + } + } + probe_idx++; + }; + + if (build_idx) { + do_the_probe(); + } + + while (probe_idx < probe_rows && matched_cnt < batch_size) { + build_idx = build_idx_map[probe_idx]; + do_the_probe(); + } + + probe_idx -= (build_idx != 0); + return std::tuple {probe_idx, build_idx, matched_cnt}; + } + + const Key* __restrict build_keys; + std::vector visited; + + uint32_t bucket_size = 1; + int max_batch_size = 4064; + + std::vector first = {0}; + std::vector next = {0}; + + // use in iter hash map + mutable uint32_t iter_idx = 1; + vectorized::Arena* pool; + bool _has_null_key = false; + bool _empty_build_side = true; +}; +} // namespace doris \ No newline at end of file diff --git a/be/src/vec/common/hash_table/partitioned_hash_map.h b/be/src/vec/common/hash_table/partitioned_hash_map.h index f23b0a347d..a2db6fece3 100644 --- a/be/src/vec/common/hash_table/partitioned_hash_map.h +++ b/be/src/vec/common/hash_table/partitioned_hash_map.h @@ -22,7 +22,7 @@ #include "vec/common/hash_table/hash_map.h" #include "vec/common/hash_table/partitioned_hash_table.h" #include "vec/common/hash_table/ph_hash_map.h" - +namespace doris { template class PartitionedHashMapTable : public PartitionedHashTable { public: @@ -57,3 +57,4 @@ using PartitionedHashMap = template > using PHNormalHashMap = PHHashMap; +} // namespace doris \ No newline at end of file diff --git a/be/src/vec/common/hash_table/string_hash_map.h b/be/src/vec/common/hash_table/string_hash_map.h index f1efd0fab1..61d304cf7d 100644 --- a/be/src/vec/common/hash_table/string_hash_map.h +++ b/be/src/vec/common/hash_table/string_hash_map.h @@ -23,6 +23,7 @@ #include "vec/common/hash_table/hash_map.h" #include "vec/common/hash_table/string_hash_table.h" +namespace doris { template struct StringHashMapCell : public HashMapCell { using Base = HashMapCell; @@ -152,3 +153,4 @@ public: } bool has_null_key_data() const { return false; } }; +} // namespace doris \ No newline at end of file diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 39e0593639..e6c00d94a2 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -1049,7 +1049,7 @@ void HashJoinNode::_hash_table_init(RuntimeState* state) { return; } - if (!try_get_hash_map_context_fixed( + if (!try_get_hash_map_context_fixed( *_hash_table_variants, _build_expr_ctxs)) { _hash_table_variants->emplace>(); } diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp index 3c47638ef4..75317b4c93 100644 --- a/be/src/vec/exec/vset_operation_node.cpp +++ b/be/src/vec/exec/vset_operation_node.cpp @@ -183,16 +183,16 @@ void VSetOperationNode::hash_table_init() { switch (_child_expr_lists[0][0]->root()->result_type()) { case TYPE_BOOLEAN: case TYPE_TINYINT: - _hash_table_variants->emplace>(); + _hash_table_variants->emplace>(); break; case TYPE_SMALLINT: - _hash_table_variants->emplace>(); + _hash_table_variants->emplace>(); break; case TYPE_INT: case TYPE_FLOAT: case TYPE_DATEV2: case TYPE_DECIMAL32: - _hash_table_variants->emplace>(); + _hash_table_variants->emplace>(); break; case TYPE_BIGINT: case TYPE_DOUBLE: @@ -200,21 +200,21 @@ void VSetOperationNode::hash_table_init() { case TYPE_DATE: case TYPE_DECIMAL64: case TYPE_DATETIMEV2: - _hash_table_variants->emplace>(); + _hash_table_variants->emplace>(); break; case TYPE_LARGEINT: case TYPE_DECIMALV2: case TYPE_DECIMAL128I: - _hash_table_variants->emplace>(); + _hash_table_variants->emplace>(); break; default: - _hash_table_variants->emplace>(); + _hash_table_variants->emplace(); } return; } - if (!try_get_hash_map_context_fixed( + if (!try_get_hash_map_context_fixed( *_hash_table_variants, _child_expr_lists[0])) { - _hash_table_variants->emplace>(); + _hash_table_variants->emplace(); } } diff --git a/be/src/vec/exec/vset_operation_node.h b/be/src/vec/exec/vset_operation_node.h index b1ab9c4765..ce5a8eb1db 100644 --- a/be/src/vec/exec/vset_operation_node.h +++ b/be/src/vec/exec/vset_operation_node.h @@ -31,6 +31,7 @@ #include "vec/columns/column.h" #include "vec/common/arena.h" #include "vec/core/block.h" +#include "vec/core/types.h" #include "vec/exec/join/process_hash_table_probe.h" #include "vec/exec/join/vhash_join_node.h" @@ -45,18 +46,14 @@ class VExprContext; struct RowRefListWithFlags; using SetHashTableVariants = std::variant< - std::monostate, SerializedHashTableContext, - I8HashTableContext, I16HashTableContext, - I32HashTableContext, I64HashTableContext, - I128HashTableContext, I256HashTableContext, - I64FixedKeyHashTableContext, - I64FixedKeyHashTableContext, - I128FixedKeyHashTableContext, - I128FixedKeyHashTableContext, - I256FixedKeyHashTableContext, - I256FixedKeyHashTableContext, - I136FixedKeyHashTableContext, - I136FixedKeyHashTableContext>; + std::monostate, MethodSerialized>, + SetPrimaryTypeHashTableContext, SetPrimaryTypeHashTableContext, + SetPrimaryTypeHashTableContext, SetPrimaryTypeHashTableContext, + SetPrimaryTypeHashTableContext, SetPrimaryTypeHashTableContext, + SetFixedKeyHashTableContext, SetFixedKeyHashTableContext, + SetFixedKeyHashTableContext, SetFixedKeyHashTableContext, + SetFixedKeyHashTableContext, SetFixedKeyHashTableContext, + SetFixedKeyHashTableContext, SetFixedKeyHashTableContext>; template class VSetOperationNode final : public ExecNode {