From 5fc04b6aebbc33c053e2e4063505c3a753f087ae Mon Sep 17 00:00:00 2001 From: Pxl Date: Wed, 27 Sep 2023 16:14:49 +0800 Subject: [PATCH] [Improvement](hash) some refactor of process hash table probe impl (#24461) some refactor of process hash table probe impl --- .../exec/aggregation_sink_operator.cpp | 23 +- ...ct_streaming_aggregation_sink_operator.cpp | 7 +- .../pipeline/exec/hashjoin_probe_operator.cpp | 43 +- .../pipeline/exec/hashjoin_probe_operator.h | 2 +- .../exec/partition_sort_sink_operator.cpp | 25 +- .../streaming_aggregation_sink_operator.cpp | 4 +- be/src/util/sse_util.hpp | 11 - be/src/vec/common/aggregation_common.h | 63 - be/src/vec/common/columns_hashing.h | 77 +- be/src/vec/common/columns_hashing_impl.h | 40 +- be/src/vec/common/hash_table/hash_map.h | 3 - be/src/vec/common/hash_table/hash_table.h | 31 +- .../hash_table/partitioned_hash_table.h | 24 - be/src/vec/common/hash_table/ph_hash_map.h | 5 - .../vec/exec/distinct_vaggregation_node.cpp | 2 +- be/src/vec/exec/join/join_op.h | 27 - .../vec/exec/join/process_hash_table_probe.h | 55 +- .../exec/join/process_hash_table_probe_impl.h | 1432 +++++++---------- be/src/vec/exec/join/vhash_join_node.cpp | 40 +- be/src/vec/exec/join/vhash_join_node.h | 66 +- be/src/vec/exec/vaggregation_node.cpp | 24 +- be/src/vec/exec/vpartition_sort_node.cpp | 24 +- be/src/vec/exec/vset_operation_node.cpp | 18 +- 23 files changed, 791 insertions(+), 1255 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index 7a695699e9..426c8e99b8 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -544,7 +544,7 @@ void AggSinkLocalState::_emplace_into_hash_table( }; if constexpr (HashTableTraits::is_phmap) { - auto keys = state.get_keys(num_rows); + const auto& keys = state.get_keys(); if (_hash_values.size() < num_rows) { _hash_values.resize(num_rows); } @@ -602,19 +602,12 @@ void AggSinkLocalState::_find_in_hash_table( AggState state(key_columns, Base::_shared_state->probe_key_sz, nullptr); _pre_serialize_key_if_need(state, agg_method, key_columns, num_rows); - + const auto& keys = state.get_keys(); if constexpr (HashTableTraits::is_phmap) { - if (_hash_values.size() < num_rows) _hash_values.resize(num_rows); - if constexpr (vectorized::ColumnsHashing::IsPreSerializedKeysHashMethodTraits< - AggState>::value) { - for (size_t i = 0; i < num_rows; ++i) { - _hash_values[i] = agg_method.data.hash(agg_method.keys[i]); - } - } else { - for (size_t i = 0; i < num_rows; ++i) { - _hash_values[i] = - agg_method.data.hash(state.get_key_holder(i, *_agg_arena_pool)); - } + _hash_values.resize(num_rows); + + for (size_t i = 0; i < num_rows; ++i) { + _hash_values[i] = agg_method.data.hash(keys[i]); } } @@ -627,8 +620,8 @@ void AggSinkLocalState::_find_in_hash_table( _hash_values[i + HASH_MAP_PREFETCH_DIST]); } - return state.find_key_with_hash(agg_method.data, _hash_values[i], i, - *_agg_arena_pool); + return state.find_key_with_hash(agg_method.data, _hash_values[i], + keys[i]); } else { return state.find_key(agg_method.data, i, *_agg_arena_pool); } diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp index 1aa0fbaffe..4fa798194c 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp @@ -161,7 +161,7 @@ void 
DistinctStreamingAggSinkLocalState::_emplace_into_hash_table_to_distinct( _pre_serialize_key_if_need(state, agg_method, key_columns, num_rows); if constexpr (HashTableTraits::is_phmap) { - auto keys = state.get_keys(num_rows); + const auto& keys = state.get_keys(); if (_hash_values.size() < num_rows) { _hash_values.resize(num_rows); } @@ -175,9 +175,8 @@ void DistinctStreamingAggSinkLocalState::_emplace_into_hash_table_to_distinct( agg_method.data.prefetch_by_hash( _hash_values[i + HASH_MAP_PREFETCH_DIST]); } - auto result = state.emplace_with_key( - agg_method.data, state.pack_key_holder(keys[i], *_agg_arena_pool), - _hash_values[i], i); + auto result = state.emplace_with_key(agg_method.data, keys[i], + _hash_values[i], i); if (result.is_inserted()) { distinct_row.push_back(i); } diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp index 0a0438720a..0a4b528be3 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp +++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp @@ -60,7 +60,7 @@ Status HashJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& info) void HashJoinProbeLocalState::prepare_for_next() { _probe_index = 0; - _ready_probe_index = 0; + _ready_probe = false; _prepare_probe_block(); } @@ -240,33 +240,20 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc if constexpr (!std::is_same_v) { using HashTableCtxType = std::decay_t; if constexpr (!std::is_same_v) { - if (_have_other_join_conjunct) { - st = process_hashtable_ctx - .template do_process_with_other_join_conjuncts< - need_null_map_for_probe, ignore_null>( - arg, - need_null_map_for_probe - ? &local_state._null_map_column - ->get_data() - : nullptr, - mutable_join_block, &temp_block, - local_state._probe_block.rows(), - _is_mark_join); - } else { - st = process_hashtable_ctx.template do_process< - need_null_map_for_probe, ignore_null>( - arg, - need_null_map_for_probe - ? &local_state._null_map_column->get_data() - : nullptr, - mutable_join_block, &temp_block, - local_state._probe_block.rows(), _is_mark_join); - } + st = process_hashtable_ctx.template process( + arg, + need_null_map_for_probe + ? &local_state._null_map_column->get_data() + : nullptr, + mutable_join_block, &temp_block, + local_state._probe_block.rows(), _is_mark_join, + _have_other_join_conjunct); } else { - LOG(FATAL) << "FATAL: uninited hash table"; + st = Status::InternalError("uninited hash table"); } } else { - LOG(FATAL) << "FATAL: uninited hash table probe"; + st = Status::InternalError("uninited hash table probe"); } }, *local_state._shared_state->hash_table_variants, @@ -282,15 +269,15 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc if constexpr (!std::is_same_v) { using HashTableCtxType = std::decay_t; if constexpr (!std::is_same_v) { - bool eos; + bool eos = false; st = process_hashtable_ctx.process_data_in_hashtable( arg, mutable_join_block, &temp_block, &eos); source_state = eos ? 
SourceState::FINISHED : source_state; } else { - LOG(FATAL) << "FATAL: uninited hash table"; + st = Status::InternalError("uninited hash table"); } } else { - LOG(FATAL) << "FATAL: uninited hash table probe"; + st = Status::InternalError("uninited hash table probe"); } }, *local_state._shared_state->hash_table_variants, diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index 5c451b9fde..f4be3d9051 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -70,7 +70,7 @@ private: friend struct vectorized::HashJoinProbeContext; int _probe_index = -1; - int _ready_probe_index = -1; + bool _ready_probe = false; bool _probe_eos = false; std::atomic _probe_inited = false; diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp b/be/src/pipeline/exec/partition_sort_sink_operator.cpp index da792983b1..2bc19bf391 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp @@ -183,20 +183,12 @@ void PartitionSortSinkOperatorX::_emplace_into_hash_table( _pre_serialize_key_if_need(state, agg_method, key_columns, num_rows); //PHHashMap + const auto& keys = state.get_keys(); if constexpr (HashTableTraits::is_phmap) { - if (local_state._hash_values.size() < num_rows) { - local_state._hash_values.resize(num_rows); - } - if constexpr (vectorized::ColumnsHashing::IsPreSerializedKeysHashMethodTraits< - AggState>::value) { - for (size_t i = 0; i < num_rows; ++i) { - local_state._hash_values[i] = agg_method.data.hash(agg_method.keys[i]); - } - } else { - for (size_t i = 0; i < num_rows; ++i) { - local_state._hash_values[i] = agg_method.data.hash( - state.get_key_holder(i, *local_state._agg_arena_pool)); - } + local_state._hash_values.resize(num_rows); + + for (size_t i = 0; i < num_rows; ++i) { + local_state._hash_values[i] = agg_method.data.hash(keys[i]); } } @@ -209,11 +201,10 @@ void PartitionSortSinkOperatorX::_emplace_into_hash_table( agg_method.data.prefetch_by_hash( local_state._hash_values[row + HASH_MAP_PREFETCH_DIST]); } - return state.emplace_key(agg_method.data, local_state._hash_values[row], - row, *local_state._agg_arena_pool); + return state.emplace_with_key(agg_method.data, keys[row], + local_state._hash_values[row], row); } else { - return state.emplace_key(agg_method.data, row, - *local_state._agg_arena_pool); + return state.emplace_with_key(agg_method.data, keys[row], row); } }(); diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp index b739103c94..154d0144f3 100644 --- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp @@ -243,9 +243,7 @@ Status StreamingAggSinkLocalState::_pre_agg_with_serialized_key( } int rows = in_block->rows(); - if (_places.size() < rows) { - _places.resize(rows); - } + _places.resize(rows); // Stop expanding hash tables if we're not reducing the input sufficiently. As our // hash tables expand out of each level of cache hierarchy, every hash table lookup diff --git a/be/src/util/sse_util.hpp b/be/src/util/sse_util.hpp index a14c281c20..95d1064a36 100644 --- a/be/src/util/sse_util.hpp +++ b/be/src/util/sse_util.hpp @@ -37,7 +37,6 @@ namespace sse_util { // Number of characters that fit in 64/128 bit register. // SSE provides instructions for loading 64 or 128 bits into a register // at a time. 
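The sink and probe hunks above all converge on one batched probe shape: hash every row of the block once into a _hash_values-style vector, then, while emplacing or finding row i, prefetch the bucket for row i + HASH_MAP_PREFETCH_DIST. Below is a minimal standalone sketch of that shape against a toy open-addressing table; the names are illustrative, not Doris's.

#include <cstdint>
#include <cstdio>
#include <functional>
#include <vector>

static constexpr size_t HASH_MAP_PREFETCH_DIST = 16;

struct Bucket {
    uint64_t key = 0;
    bool used = false;
};

int main() {
    std::vector<Bucket> table(1 << 12); // power-of-two toy open-addressing table
    const size_t mask = table.size() - 1;
    std::vector<uint64_t> keys = {1, 2, 3, 2, 1, 7};

    // Pass 1: hash the whole batch up front, as the _hash_values vectors do.
    std::vector<size_t> hashes(keys.size());
    for (size_t i = 0; i < keys.size(); ++i) {
        hashes[i] = std::hash<uint64_t> {}(keys[i]);
    }

    // Pass 2: probe, prefetching the bucket a fixed distance ahead.
    size_t inserted = 0;
    for (size_t i = 0; i < keys.size(); ++i) {
        if (i + HASH_MAP_PREFETCH_DIST < keys.size()) {
            __builtin_prefetch(&table[hashes[i + HASH_MAP_PREFETCH_DIST] & mask]);
        }
        size_t pos = hashes[i] & mask;
        while (table[pos].used && table[pos].key != keys[i]) { // linear probing
            pos = (pos + 1) & mask;
        }
        if (!table[pos].used) {
            table[pos] = {keys[i], true};
            ++inserted;
        }
    }
    std::printf("inserted %zu distinct keys\n", inserted);
    return 0;
}

The _ready_probe flag introduced in hashjoin_probe_operator plays the role of memoizing pass 1, so a block revisited by a second pull() call is not rehashed.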
-static const int CHARS_PER_64_BIT_REGISTER = 8; static const int CHARS_PER_128_BIT_REGISTER = 16; // SSE4.2 adds instructions for textprocessing. The instructions accept @@ -48,19 +47,9 @@ static const int CHARS_PER_128_BIT_REGISTER = 16; // - SIDD_NEGATIVE_POLARITY - toggles whether to set result to 1 or 0 when a // match is found. -// In this mode, sse text processing functions will return a mask of all the characters that -// matched -static const int STRCHR_MODE = _SIDD_CMP_EQUAL_ANY | _SIDD_UBYTE_OPS; - // In this mode, sse text processing functions will return the number of bytes that match // consecutively from the beginning. static const int STRCMP_MODE = _SIDD_CMP_EQUAL_EACH | _SIDD_UBYTE_OPS | _SIDD_NEGATIVE_POLARITY; -// Precomputed mask values up to 16 bits. -static const int SSE_BITMASK[CHARS_PER_128_BIT_REGISTER] = { - 1 << 0, 1 << 1, 1 << 2, 1 << 3, 1 << 4, 1 << 5, 1 << 6, 1 << 7, - 1 << 8, 1 << 9, 1 << 10, 1 << 11, 1 << 12, 1 << 13, 1 << 14, 1 << 15, -}; - } // namespace sse_util } // namespace doris diff --git a/be/src/vec/common/aggregation_common.h b/be/src/vec/common/aggregation_common.h index 68ee8e4cca..022823e6aa 100644 --- a/be/src/vec/common/aggregation_common.h +++ b/be/src/vec/common/aggregation_common.h @@ -40,69 +40,6 @@ inline size_t get_bitmap_size(size_t key_number) { using Sizes = std::vector; -template -T pack_fixed(size_t i, size_t keys_size, const ColumnRawPtrs& key_columns, const Sizes& key_sizes, - const ColumnRawPtrs& nullmap_columns) { - union { - T key; - char bytes[sizeof(key)] = {}; - }; - - size_t bitmap_size = get_bitmap_size(nullmap_columns.size()); - size_t offset = bitmap_size; - - for (size_t j = 0; j < keys_size; ++j) { - bool is_null = false; - - if (bitmap_size && nullmap_columns[j] != nullptr) { - is_null = nullmap_columns[j]->get_bool(i); - } - - if (is_null) { - size_t bucket = j / 8; - bytes[bucket] |= (1 << (j - bucket * 8)); - offset += key_sizes[j]; - continue; - } - - switch (key_sizes[j]) { - case 1: - memcpy(bytes + offset, - static_cast(key_columns[j])->get_raw_data_begin<1>() + - i, - 1); - break; - case 2: - memcpy(bytes + offset, - static_cast(key_columns[j])->get_raw_data_begin<2>() + - i * 2, - 2); - break; - case 4: - memcpy(bytes + offset, - static_cast(key_columns[j])->get_raw_data_begin<4>() + - i * 4, - 4); - break; - case 8: - memcpy(bytes + offset, - static_cast(key_columns[j])->get_raw_data_begin<8>() + - i * 8, - 8); - break; - default: - memcpy(bytes + offset, - static_cast(key_columns[j])->get_raw_data_begin<1>() + - i * key_sizes[j], - key_sizes[j]); - } - - offset += key_sizes[j]; - } - - return key; -} - template std::vector pack_fixeds(size_t row_numbers, const ColumnRawPtrs& key_columns, const Sizes& key_sizes, const ColumnRawPtrs& nullmap_columns) { diff --git a/be/src/vec/common/columns_hashing.h b/be/src/vec/common/columns_hashing.h index 5c2bf2e50d..21611e7d82 100644 --- a/be/src/vec/common/columns_hashing.h +++ b/be/src/vec/common/columns_hashing.h @@ -48,11 +48,13 @@ struct HashMethodOneNumber : public columns_hashing_impl::HashMethodBase< using Base = columns_hashing_impl::HashMethodBase; const char* vec; + size_t size; /// If the keys of a fixed length then key_sizes contains their lengths, empty otherwise. 
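For reference, the batched pack_fixeds kept by the hunk above packs every fixed-size key column of a block into one integer per row in a single pass; the deleted pack_fixed did the same work row by row, and also folded a null bitmap into the leading bytes. A rough self-contained sketch of the packing idea, with the null bitmap omitted and all names hypothetical:

#include <cstdint>
#include <cstdio>
#include <cstring>
#include <vector>

// Pack one int32 column and one int16 column into a single uint64 key per
// row, for the whole block at once (bytes [0,4) and [4,6); the rest stays 0).
std::vector<uint64_t> pack_fixed_keys(const std::vector<int32_t>& c0,
                                      const std::vector<int16_t>& c1) {
    std::vector<uint64_t> keys(c0.size(), 0);
    for (size_t i = 0; i < c0.size(); ++i) {
        char* bytes = reinterpret_cast<char*>(&keys[i]);
        std::memcpy(bytes, &c0[i], sizeof(int32_t));
        std::memcpy(bytes + sizeof(int32_t), &c1[i], sizeof(int16_t));
    }
    return keys;
}

int main() {
    auto keys = pack_fixed_keys({7, 7, 9}, {1, 1, 2});
    // rows with equal column values collapse to equal packed keys
    std::printf("row0==row1: %d, row0==row2: %d\n", (int)(keys[0] == keys[1]),
                (int)(keys[0] == keys[2]));
    return 0;
}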
HashMethodOneNumber(const ColumnRawPtrs& key_columns, const Sizes& /*key_sizes*/, const HashMethodContextPtr&) { vec = key_columns[0]->get_raw_data().data; + size = key_columns[0]->size(); } HashMethodOneNumber(const IColumn* column) { vec = column->get_raw_data().data; } @@ -69,16 +71,11 @@ struct HashMethodOneNumber : public columns_hashing_impl::HashMethodBase< using Base::find_key; /// (Data & data, size_t row, Arena & pool) -> FindResult using Base::find_key_with_hash; - /// Get hash value of row. - using Base::get_hash; /// (const Data & data, size_t row, Arena & pool) -> size_t - /// Is used for default implementation in HashMethodBase. FieldType get_key_holder(size_t row, Arena&) const { return ((FieldType*)(vec))[row]; } FieldType pack_key_holder(FieldType key, Arena&) const { return key; } - std::span get_keys(size_t rows_number) const { - return std::span((FieldType*)vec, rows_number); - } + std::span get_keys() const { return std::span((FieldType*)vec, size); } }; /// For the case when there is one string key. @@ -91,6 +88,7 @@ struct HashMethodString : public columns_hashing_impl::HashMethodBase< const IColumn::Offset* offsets; const UInt8* chars; + std::vector keys; HashMethodString(const ColumnRawPtrs& key_columns, const Sizes& /*key_sizes*/, const HashMethodContextPtr&) { @@ -98,18 +96,23 @@ struct HashMethodString : public columns_hashing_impl::HashMethodBase< const ColumnString& column_string = assert_cast(column); offsets = column_string.get_offsets().data(); chars = column_string.get_chars().data(); + + keys.resize(column_string.size()); + for (size_t row = 0; row < column_string.size(); row++) { + keys[row] = StringRef(chars + offsets[row - 1], offsets[row] - offsets[row - 1]); + } } auto get_key_holder(ssize_t row, [[maybe_unused]] Arena& pool) const { - StringRef key(chars + offsets[row - 1], offsets[row] - offsets[row - 1]); - if constexpr (place_string_to_arena) { - return ArenaKeyHolder {key, pool}; + return ArenaKeyHolder {keys[row], pool}; } else { - return key; + return keys[row]; } } + const std::vector& get_keys() const { return keys; } + protected: friend class columns_hashing_impl::HashMethodBase; }; @@ -151,8 +154,8 @@ struct HashMethodSerialized return KeyHolderType {key, pool}; } - std::span get_keys(size_t rows_number) const { - return std::span(keys, rows_number); + std::span get_keys() const { + return std::span(keys, key_columns[0]->size()); } protected: @@ -202,22 +205,20 @@ struct HashMethodKeysFixed const Sizes& key_sizes; size_t keys_size; + std::vector keys; HashMethodKeysFixed(const ColumnRawPtrs& key_columns, const Sizes& key_sizes_, const HashMethodContextPtr&) - : Base(key_columns), key_sizes(key_sizes_), keys_size(key_columns.size()) {} - - ALWAYS_INLINE Key get_key_holder(size_t row, Arena&) const { - return pack_fixed(row, keys_size, Base::get_actual_columns(), key_sizes, - Base::get_nullmap_columns()); + : Base(key_columns), key_sizes(key_sizes_), keys_size(key_columns.size()) { + keys = pack_fixeds(key_columns[0]->size(), Base::get_actual_columns(), key_sizes, + Base::get_nullmap_columns()); } + ALWAYS_INLINE Key get_key_holder(size_t row, Arena&) const { return keys[row]; } + Key pack_key_holder(Key key, Arena& pool) const { return key; } - std::vector get_keys(size_t rows_number) const { - return pack_fixeds(rows_number, Base::get_actual_columns(), key_sizes, - Base::get_nullmap_columns()); - } + const std::vector& get_keys() const { return keys; } }; template @@ -271,14 +272,38 @@ struct HashMethodSingleLowNullableColumn : public 
SingleColumnMethod { new (&mapped) Mapped(); } return EmplaceResult(mapped, mapped, inserted); - } else + } else { return EmplaceResult(inserted); + } } - template - ALWAYS_INLINE EmplaceResult emplace_key(Data& data, size_t hash_value, size_t row, - Arena& pool) { - return emplace_with_key(data, Base::get_key_holder(row, pool), hash_value, row, pool); + template + ALWAYS_INLINE EmplaceResult emplace_with_key(Data& data, KeyHolder&& key, size_t row) { + if (key_column->is_null_at(row)) { + bool has_null_key = data.has_null_key_data(); + data.has_null_key_data() = true; + + if constexpr (has_mapped) { + return EmplaceResult(data.get_null_key_data(), data.get_null_key_data(), + !has_null_key); + } else { + return EmplaceResult(!has_null_key); + } + } + + bool inserted = false; + typename Data::LookupResult it; + data.emplace(key, it, inserted); + + if constexpr (has_mapped) { + auto& mapped = *lookup_result_get_mapped(it); + if (inserted) { + new (&mapped) Mapped(); + } + return EmplaceResult(mapped, mapped, inserted); + } else { + return EmplaceResult(inserted); + } } template diff --git a/be/src/vec/common/columns_hashing_impl.h b/be/src/vec/common/columns_hashing_impl.h index eade6b817b..2776e76d0d 100644 --- a/be/src/vec/common/columns_hashing_impl.h +++ b/be/src/vec/common/columns_hashing_impl.h @@ -133,12 +133,9 @@ public: auto key_holder = static_cast(*this).get_key_holder(row, pool); return emplaceImpl(key_holder, data); } - - template - ALWAYS_INLINE EmplaceResult emplace_key(Data& data, size_t hash_value, size_t row, - Arena& pool) { - auto key_holder = static_cast(*this).get_key_holder(row, pool); - return emplaceImpl(key_holder, hash_value, data); + template + EmplaceResult emplace_with_key(Data& data, KeyHolder&& key, size_t row) { + return emplaceImpl(key, data); } template @@ -173,29 +170,9 @@ public: return find_key_impl(key_holder_get_key(key_holder), data); } - template - ALWAYS_INLINE FindResult find_key_with_hash(Data& data, size_t hash_value, size_t row, - Arena& pool) { - auto key_holder = static_cast(*this).get_key_holder(row, pool); - return find_key_impl(key_holder_get_key(key_holder), hash_value, data); - } - - template - ALWAYS_INLINE size_t get_hash(const Data& data, size_t row, Arena& pool) { - auto key_holder = static_cast(*this).get_key_holder(row, pool); - return data.hash(key_holder_get_key(key_holder)); - } - - template - ALWAYS_INLINE void prefetch_by_key(Data& data, size_t row, Arena& pool) { - auto key_holder = static_cast(*this).get_key_holder(row, pool); - data.prefetch_by_key(key_holder); - } - - template - ALWAYS_INLINE void prefetch_by_key(Data& data, size_t row, Arena& pool) { - auto key_holder = static_cast(*this).get_key_holder(row, pool); - data.template prefetch_by_key(key_holder); + template + ALWAYS_INLINE FindResult find_key_with_hash(Data& data, size_t hash_value, Key key) { + return find_key_impl(key, hash_value, data); } template @@ -207,11 +184,6 @@ public: return static_cast(*this).get_key_holder(row, pool); } - template - ALWAYS_INLINE EmplaceResult emplace_key(Data& data, size_t hash_value, KeyHolder key_holder) { - return emplaceImpl(key_holder, hash_value, data); - } - protected: Cache cache; diff --git a/be/src/vec/common/hash_table/hash_map.h b/be/src/vec/common/hash_table/hash_map.h index eb3f47b04b..e3aec8129a 100644 --- a/be/src/vec/common/hash_table/hash_map.h +++ b/be/src/vec/common/hash_table/hash_map.h @@ -90,9 +90,6 @@ struct HashMapCell { /// Do I need to store the zero key separately (that is, can a zero key be inserted 
into the hash table). static constexpr bool need_zero_value_storage = true; - /// Whether the cell was deleted. - bool is_deleted() const { return false; } - void set_mapped(const value_type& value_) { value.second = value_.second; } }; diff --git a/be/src/vec/common/hash_table/hash_table.h b/be/src/vec/common/hash_table/hash_table.h index ed14e8d4f4..19ae8e2ed3 100644 --- a/be/src/vec/common/hash_table/hash_table.h +++ b/be/src/vec/common/hash_table/hash_table.h @@ -194,9 +194,6 @@ struct HashTableCell { /// Do the hash table need to store the zero key separately (that is, can a zero key be inserted into the hash table). static constexpr bool need_zero_value_storage = true; - /// Whether the cell is deleted. - bool is_deleted() const { return false; } - /// Set the mapped value, if any (for HashMap), to the corresponding `value`. void set_mapped(const value_type& /*value*/) {} @@ -861,14 +858,6 @@ public: return res; } - template - void ALWAYS_INLINE prefetch_by_key(KeyHolder& key_holder) { - const auto& key = key_holder_get_key(key_holder); - auto hash_value = hash(key); - auto place_value = grower.place(hash_value); - __builtin_prefetch(&buf[place_value]); - } - template void ALWAYS_INLINE prefetch_by_hash(size_t hash_value) { // Two optional arguments: @@ -878,17 +867,6 @@ public: __builtin_prefetch(&buf[place_value], READ ? 0 : 1, 1); } - template - void ALWAYS_INLINE prefetch_by_key(KeyHolder& key_holder) { - // Two optional arguments: - // 'rw': 1 means the memory access is write - // 'locality': 0-3. 0 means no temporal locality. 3 means high temporal locality. - const auto& key = key_holder_get_key(key_holder); - auto hash_value = hash(key); - auto place_value = grower.place(hash_value); - __builtin_prefetch(&buf[place_value], READ ? 0 : 1, 1); - } - /// Reinsert node pointed to by iterator void ALWAYS_INLINE reinsert(iterator& it, size_t hash_value) { reinsert(*it.get_ptr(), hash_value); @@ -1121,9 +1099,11 @@ private: * or move to the left of the collision resolution chain, because the elements to the left of it have been moved to the new "right" location. 
*/ size_t i = 0; - for (; i < old_size; ++i) - if (!buf[i].is_zero(*this) && !buf[i].is_deleted()) + for (; i < old_size; ++i) { + if (!buf[i].is_zero(*this)) { reinsert(buf[i], buf[i].get_hash(*this)); + } + } /** There is also a special case: * if the element was to be at the end of the old buffer, [ x] @@ -1133,7 +1113,8 @@ private: * after transferring all the elements from the old halves you need to [ o x ] * process tail from the collision resolution chain immediately after it [ o x ] */ - for (; !buf[i].is_zero(*this) && !buf[i].is_deleted(); ++i) + for (; !buf[i].is_zero(*this); ++i) { reinsert(buf[i], buf[i].get_hash(*this)); + } } }; diff --git a/be/src/vec/common/hash_table/partitioned_hash_table.h b/be/src/vec/common/hash_table/partitioned_hash_table.h index b79fab8ead..9bd0a1597f 100644 --- a/be/src/vec/common/hash_table/partitioned_hash_table.h +++ b/be/src/vec/common/hash_table/partitioned_hash_table.h @@ -386,18 +386,6 @@ public: } } - template - void ALWAYS_INLINE prefetch_by_key(KeyHolder& key_holder) { - if (_is_partitioned) { - const auto& key = key_holder_get_key(key_holder); - const auto key_hash = hash(key); - const auto sub_table_idx = get_sub_table_from_hash(key_hash); - level1_sub_tables[sub_table_idx].prefetch_by_key(key_holder); - } else { - level0_sub_table.prefetch_by_key(key_holder); - } - } - template void ALWAYS_INLINE prefetch_by_hash(size_t hash_value) { if (_is_partitioned) { @@ -419,18 +407,6 @@ public: } } - template - void ALWAYS_INLINE prefetch_by_key(KeyHolder& key_holder) { - if (_is_partitioned) { - const auto& key = key_holder_get_key(key_holder); - const auto key_hash = hash(key); - const auto sub_table_idx = get_sub_table_from_hash(key_hash); - level1_sub_tables[sub_table_idx].template prefetch_by_key(key_holder); - } else { - level0_sub_table.template prefetch_by_key(key_holder); - } - } - /** Insert the key, * return an iterator to a position that can be used for `placement new` of value, * as well as the flag - whether a new key was inserted. diff --git a/be/src/vec/common/hash_table/ph_hash_map.h b/be/src/vec/common/hash_table/ph_hash_map.h index 07aa44647f..dcc3f15cbc 100644 --- a/be/src/vec/common/hash_table/ph_hash_map.h +++ b/be/src/vec/common/hash_table/ph_hash_map.h @@ -239,11 +239,6 @@ public: _hash_map.prefetch_hash(hash_value); } - template - void ALWAYS_INLINE prefetch_by_key(Key key) { - _hash_map.prefetch(key); - } - /// Call func(const Key &, Mapped &) for each hash map element. 
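With the key-based prefetch overloads removed, prefetch_by_hash(size_t) is the single prefetch entry point at every level, and ph_hash_map forwards it to the underlying map's prefetch_hash. A sketch of how a caller is now expected to use it, assuming the vendored parallel-hashmap library (the prefetch_hash call is taken on trust from the forwarding seen in the hunk above):

#include <parallel_hashmap/phmap.h>

#include <cstdio>
#include <vector>

int main() {
    phmap::flat_hash_map<int64_t, int> map;
    for (int64_t k = 0; k < 1000; ++k) {
        map[k] = static_cast<int>(k);
    }

    std::vector<int64_t> probes = {3, 14, 159, 265, 907};

    // hash the whole probe batch first...
    std::vector<size_t> hashes(probes.size());
    for (size_t i = 0; i < probes.size(); ++i) {
        hashes[i] = map.hash_function()(probes[i]);
    }

    // ...then prefetch a fixed distance ahead while probing
    constexpr size_t PREFETCH_DIST = 2;
    int found = 0;
    for (size_t i = 0; i < probes.size(); ++i) {
        if (i + PREFETCH_DIST < probes.size()) {
            map.prefetch_hash(hashes[i + PREFETCH_DIST]); // assumed phmap API
        }
        found += static_cast<int>(map.count(probes[i]));
    }
    std::printf("found %d of %zu probes\n", found, probes.size());
    return 0;
}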
template void for_each_value(Func&& func) { diff --git a/be/src/vec/exec/distinct_vaggregation_node.cpp b/be/src/vec/exec/distinct_vaggregation_node.cpp index e0dca18d39..f7dc3d35a4 100644 --- a/be/src/vec/exec/distinct_vaggregation_node.cpp +++ b/be/src/vec/exec/distinct_vaggregation_node.cpp @@ -92,7 +92,7 @@ void DistinctAggregationNode::_emplace_into_hash_table_to_distinct(IColumn::Sele _pre_serialize_key_if_need(state, agg_method, key_columns, num_rows); if constexpr (HashTableTraits::is_phmap) { - auto keys = state.get_keys(num_rows); + const auto& keys = state.get_keys(); if (_hash_values.size() < num_rows) { _hash_values.resize(num_rows); } diff --git a/be/src/vec/exec/join/join_op.h b/be/src/vec/exec/join/join_op.h index 029454c5cc..1b8b8f2c69 100644 --- a/be/src/vec/exec/join/join_op.h +++ b/be/src/vec/exec/join/join_op.h @@ -103,17 +103,6 @@ public: } RowRefType* operator->() { return &(**this); } - bool operator==(const ForwardIterator& rhs) const { - if (ok() != rhs.ok()) { - return false; - } - if (first && rhs.first) { - return true; - } - return batch == rhs.batch && position == rhs.position; - } - bool operator!=(const ForwardIterator& rhs) const { return !(*this == rhs); } - void operator++() { if (first) { first = false; @@ -131,8 +120,6 @@ public: bool ok() const { return first || batch; } - static ForwardIterator end() { return ForwardIterator(); } - private: RowRefListType* root; bool first; @@ -147,7 +134,6 @@ struct RowRefList : RowRef { RowRefList(size_t row_num_, uint8_t block_offset_) : RowRef(row_num_, block_offset_) {} ForwardIterator begin() { return ForwardIterator(this); } - static ForwardIterator end() { return ForwardIterator::end(); } /// insert element after current one void insert(RowRefType&& row_ref, Arena& pool) { @@ -158,8 +144,6 @@ struct RowRefList : RowRef { next = next->insert(std::move(row_ref), pool); } - bool is_single() const { return next == nullptr; } - private: friend class ForwardIterator; @@ -176,10 +160,6 @@ struct RowRefListWithFlag : RowRef { return ForwardIterator(this); } - static ForwardIterator end() { - return ForwardIterator::end(); - } - /// insert element after current one void insert(RowRef&& row_ref, Arena& pool) { if (!next) { @@ -189,8 +169,6 @@ struct RowRefListWithFlag : RowRef { next = next->insert(std::move(row_ref), pool); } - bool is_single() const { return next == nullptr; } - bool visited = false; private: @@ -209,9 +187,6 @@ struct RowRefListWithFlags : RowRefWithFlag { ForwardIterator const begin() { return ForwardIterator(this); } - static ForwardIterator end() { - return ForwardIterator::end(); - } /// insert element after current one void insert(RowRefWithFlag&& row_ref, Arena& pool) { @@ -222,8 +197,6 @@ struct RowRefListWithFlags : RowRefWithFlag { next = next->insert(std::move(row_ref), pool); } - bool is_single() const { return next == nullptr; } - private: friend class ForwardIterator; diff --git a/be/src/vec/exec/join/process_hash_table_probe.h b/be/src/vec/exec/join/process_hash_table_probe.h index 1b5d5c66c3..9b8ab4be33 100644 --- a/be/src/vec/exec/join/process_hash_table_probe.h +++ b/be/src/vec/exec/join/process_hash_table_probe.h @@ -43,40 +43,53 @@ struct ProcessHashTableProbe { ~ProcessHashTableProbe() = default; // output build side result column - template - void build_side_output_column(MutableColumns& mcol, int column_offset, int column_length, - const std::vector& output_slot_flags, int size); + void build_side_output_column(MutableColumns& mcol, const std::vector& output_slot_flags, + int 
size, bool have_other_join_conjunct); void probe_side_output_column(MutableColumns& mcol, const std::vector& output_slot_flags, int size, int last_probe_index, size_t probe_size, bool all_match_one, bool have_other_join_conjunct); + + template + Status process(HashTableType& hash_table_ctx, ConstNullMapPtr null_map, + MutableBlock& mutable_block, Block* output_block, size_t probe_rows, + bool is_mark_join, bool have_other_join_conjunct); + // Only process the join with no other join conjunct, because of no other join conjunt // the output block struct is same with mutable block. we can do more opt on it and simplify // the logic of probe // TODO: opt the visited here to reduce the size of hash table - template + template Status do_process(HashTableType& hash_table_ctx, ConstNullMapPtr null_map, - MutableBlock& mutable_block, Block* output_block, size_t probe_rows, - bool is_mark_join); + MutableBlock& mutable_block, Block* output_block, size_t probe_rows); // In the presence of other join conjunct, the process of join become more complicated. // each matching join column need to be processed by other join conjunct. so the struct of mutable block // and output block may be different // The output result is determined by the other join conjunct result and same_to_prev struct - template - Status do_process_with_other_join_conjuncts(HashTableType& hash_table_ctx, - ConstNullMapPtr null_map, - MutableBlock& mutable_block, Block* output_block, - size_t probe_rows, bool is_mark_join); + Status do_other_join_conjuncts(Block* output_block, bool is_mark_join, + int multi_matched_output_row_count, bool is_the_last_sub_block); void _process_splited_equal_matched_tuples(int start_row_idx, int row_count, - const ColumnPtr& other_hit_column, - std::vector& visited_map, int right_col_idx, - int right_col_len, UInt8* __restrict null_map_data, + const UInt8* __restrict other_hit_column, + UInt8* __restrict null_map_data, UInt8* __restrict filter_map, Block* output_block); void _pre_serialize_key(const ColumnRawPtrs& key_columns, const size_t key_rows, std::vector& serialized_keys); + void _emplace_element(int8_t block_offset, int32_t block_row, int& current_offset); + + template + KeyGetter _init_probe_side(size_t probe_rows, bool with_other_join_conjuncts); + + template + ForwardIterator& _probe_row_match(int& current_offset, int& probe_index, + size_t& probe_size, bool& all_match_one); + + template + void _probe_hash(const Keys& keys, HashTableType& hash_table_ctx, ConstNullMapPtr null_map); + // Process full outer join/ right join / right semi/anti join to output the join result // in hash table template @@ -90,8 +103,8 @@ struct ProcessHashTableProbe { std::vector _probe_keys; std::vector _probe_indexs; - std::vector _build_block_offsets; - std::vector _build_block_rows; + PaddedPODArray _build_block_offsets; + PaddedPODArray _build_block_rows; std::vector> _build_blocks_locs; // only need set the tuple is null in RIGHT_OUTER_JOIN and FULL_OUTER_JOIN ColumnUInt8::Container* _tuple_is_null_left_flags; @@ -102,13 +115,21 @@ struct ProcessHashTableProbe { uint8_t* _serialized_key_buffer; std::unique_ptr _serialize_key_arena; std::vector _probe_side_hash_values; + std::vector _probe_side_find_result; + + std::vector _visited_map; + std::vector _same_to_prev; + + int _right_col_idx; + int _right_col_len; + int _row_count_from_last_probe; RuntimeProfile::Counter* _rows_returned_counter; RuntimeProfile::Counter* _search_hashtable_timer; RuntimeProfile::Counter* _build_side_output_timer; 
RuntimeProfile::Counter* _probe_side_output_timer; RuntimeProfile::Counter* _probe_process_hashtable_timer; - static constexpr int PROBE_SIDE_EXPLODE_RATE = 3; + static constexpr int PROBE_SIDE_EXPLODE_RATE = 1; }; } // namespace vectorized diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h b/be/src/vec/exec/join/process_hash_table_probe_impl.h index 8c4327f2e4..03c903379c 100644 --- a/be/src/vec/exec/join/process_hash_table_probe_impl.h +++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h @@ -51,10 +51,10 @@ ProcessHashTableProbe::ProcessHashTableProbe(HashJoinProbeContext* j _probe_process_hashtable_timer(join_context->_probe_process_hashtable_timer) {} template -template void ProcessHashTableProbe::build_side_output_column( - MutableColumns& mcol, int column_offset, int column_length, - const std::vector& output_slot_flags, int size) { + MutableColumns& mcol, const std::vector& output_slot_flags, int size, + bool have_other_join_conjunct) { + SCOPED_TIMER(_build_side_output_timer); constexpr auto is_semi_anti_join = JoinOpType == TJoinOp::RIGHT_ANTI_JOIN || JoinOpType == TJoinOp::RIGHT_SEMI_JOIN || JoinOpType == TJoinOp::LEFT_ANTI_JOIN || @@ -64,31 +64,31 @@ void ProcessHashTableProbe::build_side_output_column( constexpr auto probe_all = JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType == TJoinOp::FULL_OUTER_JOIN; - if constexpr (!is_semi_anti_join || have_other_join_conjunct) { + if (!is_semi_anti_join || have_other_join_conjunct) { if (_build_blocks.size() == 1) { - for (int i = 0; i < column_length; i++) { + for (int i = 0; i < _right_col_len; i++) { auto& column = *_build_blocks[0].get_by_position(i).column; if (output_slot_flags[i]) { - mcol[i + column_offset]->insert_indices_from(column, _build_block_rows.data(), - _build_block_rows.data() + size); + mcol[i + _right_col_idx]->insert_indices_from(column, _build_block_rows.data(), + _build_block_rows.data() + size); } else { - mcol[i + column_offset]->insert_many_defaults(size); + mcol[i + _right_col_idx]->insert_many_defaults(size); } } } else { - for (int i = 0; i < column_length; i++) { + for (int i = 0; i < _right_col_len; i++) { if (output_slot_flags[i]) { for (int j = 0; j < size; j++) { if constexpr (probe_all) { if (_build_block_offsets[j] == -1) { - DCHECK(mcol[i + column_offset]->is_nullable()); - assert_cast(mcol[i + column_offset].get()) + DCHECK(mcol[i + _right_col_idx]->is_nullable()); + assert_cast(mcol[i + _right_col_idx].get()) ->insert_default(); } else { auto& column = *_build_blocks[_build_block_offsets[j]] .get_by_position(i) .column; - mcol[i + column_offset]->insert_from(column, _build_block_rows[j]); + mcol[i + _right_col_idx]->insert_from(column, _build_block_rows[j]); } } else { if (_build_block_offsets[j] == -1) { @@ -98,24 +98,24 @@ void ProcessHashTableProbe::build_side_output_column( // since nullptr is emplaced back to visited_map, // the output value of the build side does not matter, // just insert default value - mcol[i + column_offset]->insert_default(); + mcol[i + _right_col_idx]->insert_default(); } else { auto& column = *_build_blocks[_build_block_offsets[j]] .get_by_position(i) .column; - mcol[i + column_offset]->insert_from(column, _build_block_rows[j]); + mcol[i + _right_col_idx]->insert_from(column, _build_block_rows[j]); } } } } else { - mcol[i + column_offset]->insert_many_defaults(size); + mcol[i + _right_col_idx]->insert_many_defaults(size); } } } } // Dispose right tuple is null flags columns - if constexpr (probe_all && !have_other_join_conjunct) { + if 
(probe_all && !have_other_join_conjunct) { _tuple_is_null_right_flags->resize(size); auto* __restrict null_data = _tuple_is_null_right_flags->data(); for (int i = 0; i < size; ++i) { @@ -129,6 +129,7 @@ void ProcessHashTableProbe::probe_side_output_column( MutableColumns& mcol, const std::vector& output_slot_flags, int size, int last_probe_index, size_t probe_size, bool all_match_one, bool have_other_join_conjunct) { + SCOPED_TIMER(_probe_side_output_timer); auto& probe_block = *_join_context->_probe_block; for (int i = 0; i < output_slot_flags.size(); ++i) { if (output_slot_flags[i]) { @@ -136,7 +137,7 @@ void ProcessHashTableProbe::probe_side_output_column( if (all_match_one) { mcol[i]->insert_range_from(*column, last_probe_index, probe_size); } else { - column->replicate(&_probe_indexs[0], size, *mcol[i]); + column->replicate(_probe_indexs.data(), size, *mcol[i]); } } else { mcol[i]->insert_many_defaults(size); @@ -207,38 +208,116 @@ void ProcessHashTableProbe::_pre_serialize_key( } template -template -Status ProcessHashTableProbe::do_process(HashTableType& hash_table_ctx, - ConstNullMapPtr null_map, - MutableBlock& mutable_block, - Block* output_block, size_t probe_rows, - bool is_mark_join) { - auto& probe_index = *_join_context->_probe_index; - auto& probe_raw_ptrs = *_join_context->_probe_columns; +template +KeyGetter ProcessHashTableProbe::_init_probe_side(size_t probe_rows, + bool with_other_join_conjuncts) { + _right_col_idx = _join_context->_is_right_semi_anti && !with_other_join_conjuncts + ? 0 + : _join_context->_left_table_data_types->size(); + _right_col_len = _join_context->_right_table_data_types->size(); + _row_count_from_last_probe = 0; - _probe_indexs.resize(_batch_size); - if (_build_block_rows.size() < probe_rows * PROBE_SIDE_EXPLODE_RATE) { - _build_block_rows.resize(probe_rows * PROBE_SIDE_EXPLODE_RATE); - _build_block_offsets.resize(probe_rows * PROBE_SIDE_EXPLODE_RATE); + _build_block_rows.clear(); + _build_block_offsets.clear(); + _probe_indexs.clear(); + if (with_other_join_conjuncts) { + // use in right join to change visited state after exec the vother join conjunct + _visited_map.clear(); + _same_to_prev.clear(); + _visited_map.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE); + _same_to_prev.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE); } - using KeyGetter = typename HashTableType::State; - using Mapped = typename HashTableType::Mapped; + _probe_indexs.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE); + _build_block_rows.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE); + _build_block_offsets.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE); - int right_col_idx = - _join_context->_is_right_semi_anti ? 
0 : _join_context->_left_table_data_types->size(); - int right_col_len = _join_context->_right_table_data_types->size(); - - KeyGetter key_getter(probe_raw_ptrs, _join_context->_probe_key_sz, nullptr); + KeyGetter key_getter(*_join_context->_probe_columns, _join_context->_probe_key_sz, nullptr); if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits::value) { - if (probe_index == 0) { - _pre_serialize_key(probe_raw_ptrs, probe_rows, _probe_keys); + if (*_join_context->_ready_probe == false) { + _pre_serialize_key(*_join_context->_probe_columns, probe_rows, _probe_keys); } key_getter.set_serialized_keys(_probe_keys.data()); } + return key_getter; +} + +template +template +void ProcessHashTableProbe::_probe_hash(const Keys& keys, HashTableType& hash_table_ctx, + ConstNullMapPtr null_map) { + if (*_join_context->_ready_probe) { + return; + } + SCOPED_TIMER(_search_hashtable_timer); + _probe_side_hash_values.resize(keys.size()); + for (size_t k = 0; k < keys.size(); ++k) { + if constexpr (need_null_map_for_probe) { + if ((*null_map)[k]) { + continue; + } + } + _probe_side_hash_values[k] = hash_table_ctx.hash_table.hash(keys[k]); + } + *_join_context->_ready_probe = true; +} + +template +template +ForwardIterator& ProcessHashTableProbe::_probe_row_match(int& current_offset, + int& probe_index, + size_t& probe_size, + bool& all_match_one) { + auto& probe_row_match_iter = + std::get>(*_join_context->_probe_row_match_iter); + if (!probe_row_match_iter.ok()) { + return probe_row_match_iter; + } + + SCOPED_TIMER(_search_hashtable_timer); + for (; probe_row_match_iter.ok() && current_offset < _batch_size; ++probe_row_match_iter) { + _emplace_element(probe_row_match_iter->block_offset, probe_row_match_iter->row_num, + current_offset); + _probe_indexs.emplace_back(probe_index); + if constexpr (with_other_join_conjuncts) { + _visited_map.emplace_back(&probe_row_match_iter->visited); + } + } + + _row_count_from_last_probe = current_offset; + all_match_one &= (current_offset == 1); + if (!probe_row_match_iter.ok()) { + ++probe_index; + } + probe_size = 1; + + return probe_row_match_iter; +} + +template +void ProcessHashTableProbe::_emplace_element(int8_t block_offset, int32_t block_row, + int& current_offset) { + _build_block_offsets.emplace_back(block_offset); + _build_block_rows.emplace_back(block_row); + current_offset++; +} + +template +template +Status ProcessHashTableProbe::do_process(HashTableType& hash_table_ctx, + ConstNullMapPtr null_map, + MutableBlock& mutable_block, + Block* output_block, size_t probe_rows) { + auto& probe_index = *_join_context->_probe_index; + + using KeyGetter = typename HashTableType::State; + using Mapped = typename HashTableType::Mapped; + + KeyGetter key_getter = _init_probe_side(probe_rows, with_other_conjuncts); + auto& mcol = mutable_block.mutable_columns(); - int current_offset = 0; constexpr auto is_right_semi_anti_join = JoinOpType == TJoinOp::RIGHT_ANTI_JOIN || JoinOpType == TJoinOp::RIGHT_SEMI_JOIN; @@ -246,844 +325,492 @@ Status ProcessHashTableProbe::do_process(HashTableType& hash_table_c constexpr auto probe_all = JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType == TJoinOp::FULL_OUTER_JOIN; - bool all_match_one = true; int last_probe_index = probe_index; + + int current_offset = 0; + bool all_match_one = true; size_t probe_size = 0; - auto& probe_row_match_iter = - std::get>(*_join_context->_probe_row_match_iter); + auto& probe_row_match_iter = _probe_row_match( + current_offset, probe_index, probe_size, all_match_one); + + // If not(which 
means it exceeds the batch size), probe_index is not increased and + // remaining matched rows for the current probe row will be + // handled in the next call of this function + int multi_matched_output_row_count = 0; + + // Is this the last sub block of a split block + bool is_the_last_sub_block = false; + + if (with_other_conjuncts && probe_size != 0) { + is_the_last_sub_block = !probe_row_match_iter.ok(); + _same_to_prev.emplace_back(false); + for (int i = 0; i < current_offset - 1; ++i) { + _same_to_prev.emplace_back(true); + } + } + + const auto& keys = key_getter.get_keys(); + + _probe_hash(keys, hash_table_ctx, null_map); + { SCOPED_TIMER(_search_hashtable_timer); - if constexpr (!is_right_semi_anti_join) { - // handle ramaining matched rows from last probe row - if (probe_row_match_iter.ok()) { - for (; probe_row_match_iter.ok() && current_offset < _batch_size; - ++probe_row_match_iter) { - if (LIKELY(current_offset < _build_block_rows.size())) { - _build_block_offsets[current_offset] = probe_row_match_iter->block_offset; - _build_block_rows[current_offset] = probe_row_match_iter->row_num; - _probe_indexs[current_offset] = probe_index; - } else { - _build_block_offsets.emplace_back(probe_row_match_iter->block_offset); - _build_block_rows.emplace_back(probe_row_match_iter->row_num); - _probe_indexs.template emplace_back(probe_index); - } - ++current_offset; - } - all_match_one &= (current_offset == 1); - if (!probe_row_match_iter.ok()) { - ++probe_index; - } - probe_size = 1; - } - } + using FindResult = decltype(key_getter.find_key(hash_table_ctx.hash_table, 0, *_arena)); + FindResult empty = {nullptr, false}; + while (current_offset < _batch_size && probe_index < probe_rows) { + if constexpr (ignore_null && need_null_map_for_probe) { + if ((*null_map)[probe_index]) { + if constexpr (probe_all) { + // only full outer / left outer need to insert the data of the right table + _emplace_element(-1, -1, current_offset); + _probe_indexs.emplace_back(probe_index); - auto empty = decltype(key_getter.find_key(hash_table_ctx.hash_table, probe_index, - *_arena)) {nullptr, false}; - - if (current_offset < _batch_size) { - if (*(_join_context->_ready_probe_index) < probe_rows) { - _probe_side_hash_values.resize(probe_rows); - for (size_t k = *(_join_context->_ready_probe_index); k < probe_rows; ++k) { - if constexpr (ignore_null && need_null_map_for_probe) { - if ((*null_map)[k]) { - continue; + if constexpr (with_other_conjuncts) { + _same_to_prev.emplace_back(false); + _visited_map.emplace_back(nullptr); } - } - if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits< - KeyGetter>::value) { - _probe_side_hash_values[k] = hash_table_ctx.hash_table.hash( - key_getter.get_key_holder(k, *_arena).key); } else { - _probe_side_hash_values[k] = hash_table_ctx.hash_table.hash( - key_getter.get_key_holder(k, *_arena)); - } - } - *(_join_context->_ready_probe_index) = probe_rows; - } - while (probe_index < probe_rows) { - if constexpr (ignore_null && need_null_map_for_probe) { - if ((*null_map)[probe_index]) { - if constexpr (probe_all) { - // only full outer / left outer need insert the data of right table - if (LIKELY(current_offset < _build_block_rows.size())) { - _build_block_offsets[current_offset] = -1; - _build_block_rows[current_offset] = -1; - _probe_indexs[current_offset] = probe_index; - } else { - _build_block_offsets.emplace_back(-1); - _build_block_rows.emplace_back(-1); - _probe_indexs.template emplace_back(probe_index); - } - ++current_offset; - } - probe_index++; all_match_one = false; - if
constexpr (probe_all) { - if (current_offset >= _batch_size) { - break; - } - } - continue; } + probe_index++; + continue; } - int last_offset = current_offset; - auto find_result = (need_null_map_for_probe && (*null_map)[probe_index]) - ? empty - : key_getter.find_key_with_hash( - hash_table_ctx.hash_table, - _probe_side_hash_values[probe_index], - probe_index, *_arena); - if (probe_index + HASH_MAP_PREFETCH_DIST < probe_rows) { - key_getter.template prefetch_by_hash( - hash_table_ctx.hash_table, - _probe_side_hash_values[probe_index + HASH_MAP_PREFETCH_DIST]); - } + } - auto current_probe_index = probe_index; - if constexpr (JoinOpType == TJoinOp::LEFT_ANTI_JOIN || - JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { - if (is_mark_join) { - ++current_offset; - assert_cast&>(*mcol[mcol.size() - 1]) - .get_data() - .template push_back(!find_result.is_found()); - } else { - if (!find_result.is_found()) { - ++current_offset; - } - } - ++probe_index; - } else if constexpr (JoinOpType == TJoinOp::LEFT_SEMI_JOIN) { - if (is_mark_join) { - ++current_offset; - assert_cast&>(*mcol[mcol.size() - 1]) - .get_data() - .template push_back(find_result.is_found()); - } else { - if (find_result.is_found()) { - ++current_offset; - } - } - ++probe_index; + const auto& find_result = + need_null_map_for_probe && (*null_map)[probe_index] + ? empty + : key_getter.find_key_with_hash(hash_table_ctx.hash_table, + _probe_side_hash_values[probe_index], + keys[probe_index]); + if (LIKELY(probe_index + HASH_MAP_PREFETCH_DIST < probe_rows) && + !(need_null_map_for_probe && (*null_map)[probe_index + HASH_MAP_PREFETCH_DIST])) { + key_getter.template prefetch_by_hash( + hash_table_ctx.hash_table, + _probe_side_hash_values[probe_index + HASH_MAP_PREFETCH_DIST]); + } + + auto current_probe_index = probe_index; + if constexpr (!with_other_conjuncts && + (JoinOpType == TJoinOp::LEFT_ANTI_JOIN || + JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || + JoinOpType == TJoinOp::LEFT_SEMI_JOIN)) { + bool need_go_ahead = + (JoinOpType != TJoinOp::LEFT_SEMI_JOIN) ^ find_result.is_found(); + if constexpr (is_mark_join) { + ++current_offset; + assert_cast&>(*mcol[mcol.size() - 1]) + .get_data() + .template push_back(need_go_ahead); } else { - DCHECK(!is_mark_join); - if (find_result.is_found()) { - auto& mapped = find_result.get_mapped(); - // TODO: Iterators are currently considered to be a heavy operation and have a certain impact on performance. - // We should rethink whether to use this iterator mode in the future. 
Now just opt the one row case - if (mapped.is_single() == 1) { - if constexpr (std::is_same_v) { - mapped.visited = true; - } + current_offset += need_go_ahead; + } + ++probe_index; + } else { + if (find_result.is_found()) { + auto& mapped = find_result.get_mapped(); + auto origin_offset = current_offset; - if constexpr (!is_right_semi_anti_join) { - if (LIKELY(current_offset < _build_block_rows.size())) { - _build_block_offsets[current_offset] = mapped.block_offset; - _build_block_rows[current_offset] = mapped.row_num; - } else { - _build_block_offsets.emplace_back(mapped.block_offset); - _build_block_rows.emplace_back(mapped.row_num); - } - ++current_offset; - } - ++probe_index; - } else { - if constexpr (!is_right_semi_anti_join) { - auto it = mapped.begin(); - for (; it.ok() && current_offset < _batch_size; ++it) { - if (LIKELY(current_offset < _build_block_rows.size())) { - _build_block_offsets[current_offset] = it->block_offset; - _build_block_rows[current_offset] = it->row_num; - } else { - _build_block_offsets.emplace_back(it->block_offset); - _build_block_rows.emplace_back(it->row_num); - } - ++current_offset; - } - probe_row_match_iter = it; - if (!it.ok()) { - // If all matched rows for the current probe row are handled, - // advance to next probe row. - // If not(which means it excceed batch size), probe_index is not increased and - // remaining matched rows for the current probe row will be - // handled in the next call of this function - ++probe_index; - } - } else { - ++probe_index; - } - if constexpr (std::is_same_v) { - mapped.visited = true; - } - } - } else { - if constexpr (probe_all) { - // only full outer / left outer need insert the data of right table - if (LIKELY(current_offset < _build_block_rows.size())) { - _build_block_offsets[current_offset] = -1; - _build_block_rows[current_offset] = -1; - } else { - _build_block_offsets.emplace_back(-1); - _build_block_rows.emplace_back(-1); - } - ++current_offset; + // For mark join, if euqual-matched tuple count for one probe row + // excceeds batch size, it's difficult to implement the logic to + // split them into multiple sub blocks and handle them, keep the original + // logic for now. + if constexpr (is_mark_join && with_other_conjuncts) { + for (auto it = mapped.begin(); it.ok(); ++it) { + _emplace_element(it->block_offset, it->row_num, current_offset); + _visited_map.emplace_back(&it->visited); } ++probe_index; - } - } + } else if constexpr (with_other_conjuncts || !is_right_semi_anti_join) { + auto multi_match_last_offset = current_offset; + auto it = mapped.begin(); + for (; it.ok() && current_offset < _batch_size; ++it) { + _emplace_element(it->block_offset, it->row_num, current_offset); - uint32_t count = (uint32_t)(current_offset - last_offset); - if (LIKELY(current_offset < _probe_indexs.size())) { - for (int i = last_offset; i < current_offset; ++i) { - _probe_indexs[i] = current_probe_index; + if constexpr (with_other_conjuncts) { + _visited_map.emplace_back(&it->visited); + } + } + probe_row_match_iter = it; + if (!it.ok()) { + // If all matched rows for the current probe row are handled, + // advance to next probe row. 
+ // If not (which means it exceeds the batch size), probe_index is not increased and + // remaining matched rows for the current probe row will be + // handled in the next call of this function + ++probe_index; + } else if constexpr (with_other_conjuncts) { + multi_matched_output_row_count = + current_offset - multi_match_last_offset; + } + } else { + ++probe_index; + } + if constexpr (std::is_same_v) { + mapped.visited = true; + } + + if constexpr (with_other_conjuncts) { + _same_to_prev.emplace_back(false); + for (int i = 0; i < current_offset - origin_offset - 1; ++i) { + _same_to_prev.emplace_back(true); + } + } + } else if constexpr (probe_all || JoinOpType == TJoinOp::LEFT_ANTI_JOIN || + JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || + (JoinOpType == TJoinOp::LEFT_SEMI_JOIN && is_mark_join)) { + // only full outer / left outer need to insert the data of the right table + _emplace_element(-1, -1, current_offset); + + if constexpr (with_other_conjuncts) { + _same_to_prev.emplace_back(false); + _visited_map.emplace_back(nullptr); + } + ++probe_index; } else { - for (int i = last_offset; i < _probe_indexs.size(); ++i) { - _probe_indexs[i] = current_probe_index; - } - _probe_indexs.resize(current_offset, current_probe_index); - } - all_match_one &= (count == 1); - if (current_offset >= _batch_size) { - break; + ++probe_index; } } - probe_size = probe_index - last_probe_index + probe_row_match_iter.ok(); + all_match_one &= (current_offset == _probe_indexs.size() + 1); + _probe_indexs.resize(current_offset, current_probe_index); } + probe_size = probe_index - last_probe_index + probe_row_match_iter.ok(); } - { - SCOPED_TIMER(_build_side_output_timer); - build_side_output_column(mcol, right_col_idx, right_col_len, - *_join_context->_right_output_slot_flags, current_offset); - } + build_side_output_column(mcol, *_join_context->_right_output_slot_flags, current_offset, + with_other_conjuncts); - if constexpr (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN && - JoinOpType != TJoinOp::RIGHT_ANTI_JOIN) { - SCOPED_TIMER(_probe_side_output_timer); + if constexpr (with_other_conjuncts || (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN && + JoinOpType != TJoinOp::RIGHT_ANTI_JOIN)) { RETURN_IF_CATCH_EXCEPTION(probe_side_output_column( mcol, *_join_context->_left_output_slot_flags, current_offset, last_probe_index, - probe_size, all_match_one, false)); + probe_size, all_match_one, with_other_conjuncts)); } output_block->swap(mutable_block.to_block()); + if constexpr (with_other_conjuncts) { + return do_other_join_conjuncts(output_block, is_mark_join, multi_matched_output_row_count, + is_the_last_sub_block); + } + return Status::OK(); } template -template -Status ProcessHashTableProbe::do_process_with_other_join_conjuncts( - HashTableType& hash_table_ctx, ConstNullMapPtr null_map, MutableBlock& mutable_block, - Block* output_block, size_t probe_rows, bool is_mark_join) { - auto& probe_index = *_join_context->_probe_index; - auto& probe_raw_ptrs = *_join_context->_probe_columns; - if (_build_block_rows.size() < probe_rows * PROBE_SIDE_EXPLODE_RATE) { - _probe_indexs.resize(probe_rows * PROBE_SIDE_EXPLODE_RATE); - _build_block_rows.resize(probe_rows * PROBE_SIDE_EXPLODE_RATE); - _build_block_offsets.resize(probe_rows * PROBE_SIDE_EXPLODE_RATE); +Status ProcessHashTableProbe::do_other_join_conjuncts( Block* output_block, bool is_mark_join, int multi_matched_output_row_count, bool is_the_last_sub_block) { // execute the other join conjuncts auto row_count = output_block->rows(); if (!row_count) { return
Status::OK(); } - using KeyGetter = typename HashTableType::State; - using Mapped = typename HashTableType::Mapped; - if constexpr (std::is_same_v) { - constexpr auto probe_all = - JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType == TJoinOp::FULL_OUTER_JOIN; - KeyGetter key_getter(probe_raw_ptrs, _join_context->_probe_key_sz, nullptr); + SCOPED_TIMER(_join_context->_process_other_join_conjunct_timer); + int orig_columns = output_block->columns(); + IColumn::Filter other_conjunct_filter(row_count, 1); + { + bool can_be_filter_all = false; + RETURN_IF_ERROR(VExprContext::execute_conjuncts( + *_join_context->_other_join_conjuncts, nullptr, output_block, + &other_conjunct_filter, &can_be_filter_all)); + } - if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits::value) { - if (probe_index == 0) { - _pre_serialize_key(probe_raw_ptrs, probe_rows, _probe_keys); + auto filter_column = ColumnUInt8::create(); + filter_column->get_data() = std::move(other_conjunct_filter); + auto result_column_id = output_block->columns(); + output_block->insert({std::move(filter_column), std::make_shared(), ""}); + const uint8_t* __restrict filter_column_ptr = + assert_cast( + output_block->get_by_position(result_column_id).column.get()) + ->get_data() + .data(); + + if constexpr (JoinOpType == TJoinOp::LEFT_OUTER_JOIN || + JoinOpType == TJoinOp::FULL_OUTER_JOIN) { + auto new_filter_column = ColumnVector::create(row_count); + auto* __restrict filter_map = new_filter_column->get_data().data(); + + auto null_map_column = ColumnVector::create(row_count, 0); + auto* __restrict null_map_data = null_map_column->get_data().data(); + + // It contains non-first sub block of splited equal-conjuncts-matched tuples from last probe row + if (_row_count_from_last_probe > 0) { + _process_splited_equal_matched_tuples(0, _row_count_from_last_probe, filter_column_ptr, + null_map_data, filter_map, output_block); + // This is the last sub block of splitted block, and no equal-conjuncts-matched tuple + // is output in all sub blocks, need to output a tuple for this probe row + if (is_the_last_sub_block && !*_join_context->_is_any_probe_match_row_output) { + filter_map[0] = true; + null_map_data[0] = true; + } + } + int end_idx = row_count - multi_matched_output_row_count; + // process equal-conjuncts-matched tuples that are newly generated + // in this run if there are any. + for (int i = _row_count_from_last_probe; i < end_idx; ++i) { + auto join_hit = _visited_map[i] != nullptr; + auto other_hit = filter_column_ptr[i]; + + if (!other_hit) { + for (size_t j = 0; j < _right_col_len; ++j) { + typeid_cast( + std::move(*output_block->get_by_position(j + _right_col_idx).column) + .assume_mutable() + .get()) + ->get_null_map_data()[i] = true; + } + } + null_map_data[i] = !join_hit || !other_hit; + + // For cases where one probe row matches multiple build rows for equal conjuncts, + // all the other-conjuncts-matched tuples should be output. + // + // Other-conjuncts-NOT-matched tuples fall into two categories: + // 1. The beginning consecutive one(s). + // For these tuples, only the last one is marked to output; + // If there are any following other-conjuncts-matched tuples, + // the last tuple is also marked NOT to output. + // 2. All the remaining other-conjuncts-NOT-matched tuples. + // All these tuples are marked not to output. 
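The if (join_hit) branch that follows implements the two-category rule just described. Here is the same rule in miniature, simplified under the assumption that every tuple is join-hit and with null handling dropped; same_to_prev marks tuples that share a probe row with their predecessor, other_hit is the other-conjunct filter result:

#include <cstdio>
#include <vector>

int main() {
    // one probe row that equal-matched 4 build rows; only the last two
    // also pass the other join conjunct
    std::vector<bool> same_to_prev = {false, true, true, true};
    std::vector<bool> other_hit    = {false, false, true, true};
    std::vector<bool> filter_map(4);

    for (int i = 0; i < 4; ++i) {
        // keep the tuple if: the conjunct hit, or it is the first tuple of
        // its probe row, or the previous NOT-hit tuple was provisionally
        // kept (carrying the "emit one row with nulls" duty forward)
        filter_map[i] = other_hit[i] || !same_to_prev[i] ||
                        (!other_hit[i] && filter_map[i - 1]);
        // a real hit cancels the provisional NOT-hit row kept before it
        if (same_to_prev[i] && filter_map[i] && !other_hit[i - 1]) {
            filter_map[i - 1] = false;
        }
    }
    for (int i = 0; i < 4; ++i) {
        std::printf("tuple %d -> output: %d\n", i, (int)filter_map[i]);
    }
    // prints 0, 0, 1, 1: only the two conjunct-matched tuples survive,
    // and no extra null-padded row is emitted for this probe row
    return 0;
}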
+ if (join_hit) { + *_visited_map[i] |= other_hit; + filter_map[i] = other_hit || !_same_to_prev[i] || + (!filter_column_ptr[i] && filter_map[i - 1]); + // Only rows that hit the join conjunct and whose other join conjuncts are true need to be output. + // If not, a key that must keep one row outputs that row with null right-table columns + if (_same_to_prev[i] && filter_map[i] && !filter_column_ptr[i - 1]) { + filter_map[i - 1] = false; + } + } else { + filter_map[i] = true; } - key_getter.set_serialized_keys(_probe_keys.data()); } - int right_col_idx = _join_context->_left_table_data_types->size(); - int right_col_len = _join_context->_right_table_data_types->size(); - - auto& mcol = mutable_block.mutable_columns(); - // use in right join to change visited state after - // exec the vother join conjunct - std::vector visited_map; - visited_map.reserve(1.2 * _batch_size); - - std::vector same_to_prev; - same_to_prev.reserve(1.2 * _batch_size); - - int current_offset = 0; - - bool all_match_one = true; - int last_probe_index = probe_index; - - int row_count_from_last_probe = 0; - bool is_the_last_sub_block = false; - size_t probe_size = 0; - auto& probe_row_match_iter = - std::get>(*_join_context->_probe_row_match_iter); - if (probe_row_match_iter.ok()) { - SCOPED_TIMER(_search_hashtable_timer); - auto origin_offset = current_offset; - for (; probe_row_match_iter.ok() && current_offset < _batch_size; - ++probe_row_match_iter) { - if (LIKELY(current_offset < _build_block_rows.size())) { - _probe_indexs[current_offset] = probe_index; - _build_block_offsets[current_offset] = probe_row_match_iter->block_offset; - _build_block_rows[current_offset] = probe_row_match_iter->row_num; - } else { - _probe_indexs.template emplace_back(probe_index); - _build_block_offsets.emplace_back(probe_row_match_iter->block_offset); - _build_block_rows.emplace_back(probe_row_match_iter->row_num); - } - ++current_offset; - visited_map.emplace_back(&probe_row_match_iter->visited); - } - same_to_prev.emplace_back(false); - for (int i = 0; i < current_offset - origin_offset - 1; ++i) { - same_to_prev.emplace_back(true); - } - - row_count_from_last_probe = current_offset; - all_match_one &= (current_offset == 1); - if (!probe_row_match_iter.ok()) { - ++probe_index; - is_the_last_sub_block = true; - } - probe_size = 1; + // It contains the first sub block of split equal-conjuncts-matched tuples of the current probe row + if (multi_matched_output_row_count > 0) { + *_join_context->_is_any_probe_match_row_output = false; + _process_splited_equal_matched_tuples(row_count - multi_matched_output_row_count, + multi_matched_output_row_count, filter_column_ptr, + null_map_data, filter_map, output_block); } - int multi_matched_output_row_count = 0; - if (current_offset < _batch_size) { - auto empty = decltype(key_getter.find_key(hash_table_ctx.hash_table, probe_index, - *_arena)) {nullptr, false}; - - if (*(_join_context->_ready_probe_index) < probe_rows) { - _probe_side_hash_values.resize(probe_rows); - for (size_t k = *(_join_context->_ready_probe_index); k < probe_rows; ++k) { - if constexpr (ignore_null && need_null_map_for_probe) { - if ((*null_map)[k]) { - continue; - } - } - if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits< - KeyGetter>::value) { - _probe_side_hash_values[k] = hash_table_ctx.hash_table.hash( - key_getter.get_key_holder(k, *_arena).key); - } else { - _probe_side_hash_values[k] = hash_table_ctx.hash_table.hash( - key_getter.get_key_holder(k, *_arena)); - } - } - *(_join_context->_ready_probe_index) =
probe_rows; + for (size_t i = 0; i < row_count; ++i) { + if (filter_map[i]) { + _tuple_is_null_right_flags->emplace_back(null_map_data[i]); } + } + output_block->get_by_position(result_column_id).column = std::move(new_filter_column); + } else if constexpr (JoinOpType == TJoinOp::LEFT_SEMI_JOIN) { + // TODO: resize in advance + auto new_filter_column = ColumnVector::create(); + auto& filter_map = new_filter_column->get_data(); - SCOPED_TIMER(_search_hashtable_timer); - while (probe_index < probe_rows) { - // ignore null rows - if constexpr (ignore_null && need_null_map_for_probe) { - if ((*null_map)[probe_index]) { - if constexpr (probe_all) { - same_to_prev.emplace_back(false); - visited_map.emplace_back(nullptr); - // only full outer / left outer need insert the data of right table - if (LIKELY(current_offset < _build_block_rows.size())) { - _probe_indexs[current_offset] = probe_index; - _build_block_offsets[current_offset] = -1; - _build_block_rows[current_offset] = -1; - } else { - _probe_indexs.template emplace_back(probe_index); - _build_block_offsets.emplace_back(-1); - _build_block_rows.emplace_back(-1); - } - ++current_offset; - } - probe_index++; - all_match_one = false; - if constexpr (probe_all) { - if (current_offset >= _batch_size) { - break; - } - } - continue; - } + size_t start_row_idx = 1; + // We are handling equal-conjuncts-matched tuples that are split into multiple blocks + if (_row_count_from_last_probe > 0) { + if (*_join_context->_is_any_probe_match_row_output) { + // if any matched tuple for this probe row is output, + // ignore all the following tuples for this probe row. + for (int row_idx = 0; row_idx < _row_count_from_last_probe; ++row_idx) { + filter_map.emplace_back(false); } - - auto last_offset = current_offset; - auto find_result = (need_null_map_for_probe && (*null_map)[probe_index]) - ? empty - : key_getter.find_key_with_hash( - hash_table_ctx.hash_table, - _probe_side_hash_values[probe_index], - probe_index, *_arena); - if (probe_index + HASH_MAP_PREFETCH_DIST < probe_rows) { - key_getter.template prefetch_by_hash( - hash_table_ctx.hash_table, - _probe_side_hash_values[probe_index + HASH_MAP_PREFETCH_DIST]); - } - - auto current_probe_index = probe_index; - if (find_result.is_found()) { - auto& mapped = find_result.get_mapped(); - auto origin_offset = current_offset; - // TODO: Iterators are currently considered to be a heavy operation and have a certain impact on performance. - // We should rethink whether to use this iterator mode in the future. Now just opt the one row case - if (mapped.is_single() == 1) { - if (LIKELY(current_offset < _build_block_rows.size())) { - _build_block_offsets[current_offset] = mapped.block_offset; - _build_block_rows[current_offset] = mapped.row_num; - } else { - _build_block_offsets.emplace_back(mapped.block_offset); - _build_block_rows.emplace_back(mapped.row_num); - } - ++current_offset; - visited_map.emplace_back(&mapped.visited); - ++probe_index; - } else { - // For mark join, if euqual-matched tuple count for one probe row - // excceeds batch size, it's difficult to implement the logic to - // split them into multiple sub blocks and handle them, keep the original - // logic for now.
- if (is_mark_join) { - for (auto it = mapped.begin(); it.ok(); ++it) { - if (LIKELY(current_offset < _build_block_rows.size())) { - _build_block_offsets[current_offset] = it->block_offset; - _build_block_rows[current_offset] = it->row_num; - } else { - _build_block_offsets.emplace_back(it->block_offset); - _build_block_rows.emplace_back(it->row_num); - } - ++current_offset; - visited_map.emplace_back(&it->visited); - } - ++probe_index; - } else { - auto multi_match_last_offset = current_offset; - auto it = mapped.begin(); - // breaks if row count exceeds batch_size - for (; it.ok() && current_offset < _batch_size; ++it) { - if (LIKELY(current_offset < _build_block_rows.size())) { - _build_block_offsets[current_offset] = it->block_offset; - _build_block_rows[current_offset] = it->row_num; - } else { - _build_block_offsets.emplace_back(it->block_offset); - _build_block_rows.emplace_back(it->row_num); - } - ++current_offset; - visited_map.emplace_back(&it->visited); - } - probe_row_match_iter = it; - // If all matched rows for the current probe row are handled, - // advance to next probe row. - if (!it.ok()) { - ++probe_index; - } else { - // If not(which means it excceed batch size), probe_index is not increased and - // remaining matched rows for the current probe row will be - // handled in the next call of this function - multi_matched_output_row_count = - current_offset - multi_match_last_offset; - } - } - } - same_to_prev.emplace_back(false); - for (int i = 0; i < current_offset - origin_offset - 1; ++i) { - same_to_prev.emplace_back(true); - } - } else if constexpr (JoinOpType == TJoinOp::LEFT_OUTER_JOIN || - JoinOpType == TJoinOp::FULL_OUTER_JOIN || - JoinOpType == TJoinOp::LEFT_ANTI_JOIN || - JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { - same_to_prev.emplace_back(false); - visited_map.emplace_back(nullptr); - // only full outer / left outer need insert the data of right table - // left anti use -1 use a default value - if (LIKELY(current_offset < _build_block_rows.size())) { - _build_block_offsets[current_offset] = -1; - _build_block_rows[current_offset] = -1; - } else { - _build_block_offsets.emplace_back(-1); - _build_block_rows.emplace_back(-1); - } - ++current_offset; - ++probe_index; - } else if constexpr (JoinOpType == TJoinOp::LEFT_SEMI_JOIN) { - if (is_mark_join) { - same_to_prev.emplace_back(false); - visited_map.emplace_back(nullptr); - if (LIKELY(current_offset < _build_block_rows.size())) { - _build_block_offsets[current_offset] = -1; - _build_block_rows[current_offset] = -1; - } else { - _build_block_offsets.emplace_back(-1); - _build_block_rows.emplace_back(-1); - } - ++current_offset; - } - ++probe_index; - } else { - // other join, no nothing - ++probe_index; - } - uint32_t count = (uint32_t)(current_offset - last_offset); - if (LIKELY(current_offset < _probe_indexs.size())) { - for (int i = last_offset; i < current_offset; ++i) { - _probe_indexs[i] = current_probe_index; - } - } else { - for (int i = last_offset; i < _probe_indexs.size(); ++i) { - _probe_indexs[i] = current_probe_index; - } - _probe_indexs.resize(current_offset, current_probe_index); - } - all_match_one &= (count == 1); - if (current_offset >= _batch_size) { - break; + start_row_idx += _row_count_from_last_probe; + if (_row_count_from_last_probe < row_count) { + filter_map.emplace_back(filter_column_ptr[_row_count_from_last_probe]); } + } else { + filter_map.emplace_back(filter_column_ptr[0]); + } + } else { + filter_map.emplace_back(filter_column_ptr[0]); + } + for (size_t i = 
start_row_idx; i < row_count; ++i) { + if (filter_column_ptr[i] || (_same_to_prev[i] && filter_map[i - 1])) { + // Only the last element of a same-key run is true; output that last one + filter_map.push_back(true); + filter_map[i - 1] = !_same_to_prev[i] && filter_map[i - 1]; + } else { + filter_map.push_back(false); + } + } + // It contains the first sub block of split equal-conjuncts-matched tuples of the current probe row + if (multi_matched_output_row_count > 0) { + // If a matched row is output, all the equal-matched tuples in + // the following sub blocks should be ignored + *_join_context->_is_any_probe_match_row_output = filter_map[row_count - 1]; + } else if (_row_count_from_last_probe > 0 && + !*_join_context->_is_any_probe_match_row_output) { + // We are handling equal-conjuncts-matched tuples that are split into multiple blocks, + // and no matched tuple has been output in any previous run. + // If a tuple is output in this run, all the following matched tuples should be ignored + if (filter_map[_row_count_from_last_probe - 1]) { + *_join_context->_is_any_probe_match_row_output = true; } - probe_size = probe_index - last_probe_index + (probe_row_match_iter.ok() ? 1 : 0); } - { - SCOPED_TIMER(_build_side_output_timer); - build_side_output_column(mcol, right_col_idx, right_col_len, - *_join_context->_right_output_slot_flags, - current_offset); - } - { - SCOPED_TIMER(_probe_side_output_timer); - RETURN_IF_CATCH_EXCEPTION(probe_side_output_column( - mcol, *_join_context->_left_output_slot_flags, current_offset, last_probe_index, - probe_size, all_match_one, true)); - } - auto num_cols = mutable_block.columns(); - output_block->swap(mutable_block.to_block()); + if (is_mark_join) { + auto& matched_map = assert_cast&>( + *(output_block->get_by_position(orig_columns - 1) + .column->assume_mutable())) + .get_data(); - // dispose the other join conjunct exec - auto row_count = output_block->rows(); - if (row_count) { - SCOPED_TIMER(_join_context->_process_other_join_conjunct_timer); - int orig_columns = output_block->columns(); - IColumn::Filter other_conjunct_filter(row_count, 1); - bool can_be_filter_all = false; - RETURN_IF_ERROR(VExprContext::execute_conjuncts( - *_join_context->_other_join_conjuncts, nullptr, output_block, - &other_conjunct_filter, &can_be_filter_all)); - - auto result_column_id = output_block->columns(); - auto filter_column = ColumnVector::create(); - if (can_be_filter_all) { - memset(other_conjunct_filter.data(), 0, row_count); + // For mark join, we only filter rows which have duplicate join keys. + // And then, we set matched_map to the join result to do the mark join's filtering.
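+ // Reading of the loop below: for each group of rows produced by one probe row, the + // last row's accumulated filter flag becomes the mark value pushed into matched_map, + // and that row's filter entry is reset to true so exactly one row per probe row is + // kept; the duplicate rows of the group stay false and are filtered out, and the + // mark column, not the filter, then carries the actual join result.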
+ for (size_t i = 1; i < row_count; ++i) { + if (!_same_to_prev[i]) { + matched_map.push_back(filter_map[i - 1]); + filter_map[i - 1] = true; + } } - filter_column->get_data() = std::move(other_conjunct_filter); - output_block->insert({std::move(filter_column), std::make_shared(), ""}); - auto column = output_block->get_by_position(result_column_id).column; - if constexpr (JoinOpType == TJoinOp::LEFT_OUTER_JOIN || - JoinOpType == TJoinOp::FULL_OUTER_JOIN) { - auto new_filter_column = ColumnVector::create(row_count); - auto* __restrict filter_map = new_filter_column->get_data().data(); + matched_map.push_back(filter_map[filter_map.size() - 1]); + filter_map[filter_map.size() - 1] = true; + } - auto null_map_column = ColumnVector::create(row_count, 0); - auto* __restrict null_map_data = null_map_column->get_data().data(); + output_block->get_by_position(result_column_id).column = std::move(new_filter_column); + } else if constexpr (JoinOpType == TJoinOp::LEFT_ANTI_JOIN || + JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { + auto new_filter_column = ColumnVector::create(row_count); + auto* __restrict filter_map = new_filter_column->get_data().data(); - // It contains non-first sub block of splited equal-conjuncts-matched tuples from last probe row - if (row_count_from_last_probe > 0) { - _process_splited_equal_matched_tuples(0, row_count_from_last_probe, column, - visited_map, right_col_idx, right_col_len, - null_map_data, filter_map, output_block); - // This is the last sub block of splitted block, and no equal-conjuncts-matched tuple - // is output in all sub blocks, need to output a tuple for this probe row - if (is_the_last_sub_block && !*_join_context->_is_any_probe_match_row_output) { - filter_map[0] = true; - null_map_data[0] = true; - } + // for left anti join, the probe side is output only when + // there are no matched tuples for the probe row. + + // If multiple equal-conjuncts-matched tuples are split into several + // sub blocks, first filter out all the other-conjuncts-NOT-matched tuples, + // and when processing the last sub block, check whether any + // equal-conjuncts-matched tuple was output in any sub block; + // if none was, just pick a tuple and output it. + + size_t start_row_idx = 1; + // We are handling equal-conjuncts-matched tuples that are split into multiple blocks + if (_row_count_from_last_probe > 0 && *_join_context->_is_any_probe_match_row_output) { + // if any matched tuple for this probe row is output, + // ignore all the following tuples for this probe row. + for (int row_idx = 0; row_idx < _row_count_from_last_probe; ++row_idx) { + filter_map[row_idx] = false; + } + start_row_idx += _row_count_from_last_probe; + if (_row_count_from_last_probe < row_count) { + filter_map[_row_count_from_last_probe] = + filter_column_ptr[_row_count_from_last_probe] && + _visited_map[_row_count_from_last_probe]; + } + } else { + // Both equal conjuncts and other conjuncts are true + filter_map[0] = filter_column_ptr[0] && _visited_map[0]; + } + + for (size_t i = start_row_idx; i < row_count; ++i) { + if ((_visited_map[i] && filter_column_ptr[i]) || + (_same_to_prev[i] && filter_map[i - 1])) { + // When either of two conditions is met: + // 1. Both the equal conjuncts and the other conjuncts are true + // 2. This row comes from the same probe row as the previous row and the previous row is kept + // Set filter_map[i] to true, and set filter_map[i - 1] to false if same_to_prev[i] + // is true.
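+ // Small trace (hypothetical values): with _same_to_prev = [0, 1], both rows join-hit, + // and filter_column_ptr = [1, 0], row 0 starts true; at i = 1 the branch is taken via + // (_same_to_prev[1] && filter_map[0]), so filter_map becomes [0, 1] and exactly one + // row per probe row remains marked for the anti-join decision below.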
+ filter_map[i] = true; + filter_map[i - 1] = !_same_to_prev[i] && filter_map[i - 1]; + } else { + filter_map[i] = false; + } + } + + if (is_mark_join) { + auto& matched_map = assert_cast&>( + *(output_block->get_by_position(orig_columns - 1) + .column->assume_mutable())) + .get_data(); + for (int i = 1; i < row_count; ++i) { + if (!_same_to_prev[i]) { + matched_map.push_back(!filter_map[i - 1]); + filter_map[i - 1] = true; } - - int end_idx = row_count - multi_matched_output_row_count; - // process equal-conjuncts-matched tuples that are newly generated - // in this run if there are any. - for (size_t i = row_count_from_last_probe; i < end_idx; ++i) { - auto join_hit = visited_map[i] != nullptr; - auto other_hit = column->get_bool(i); - - if (!other_hit) { - for (size_t j = 0; j < right_col_len; ++j) { - typeid_cast( - std::move(*output_block->get_by_position(j + right_col_idx) - .column) - .assume_mutable() - .get()) - ->get_null_map_data()[i] = true; - } - } - null_map_data[i] = !join_hit || !other_hit; - - // For cases where one probe row matches multiple build rows for equal conjuncts, - // all the other-conjuncts-matched tuples should be output. - // - // Other-conjuncts-NOT-matched tuples fall into two categories: - // 1. The beginning consecutive one(s). - // For these tuples, only the last one is marked to output; - // If there are any following other-conjuncts-matched tuples, - // the last tuple is also marked NOT to output. - // 2. All the remaining other-conjuncts-NOT-matched tuples. - // All these tuples are marked not to output. - if (join_hit) { - *visited_map[i] |= other_hit; - filter_map[i] = other_hit || !same_to_prev[i] || - (!column->get_bool(i - 1) && filter_map[i - 1]); - // Here to keep only hit join conjunct and other join conjunt is true need to be output. - // if not, only some key must keep one row will output will null right table column - if (same_to_prev[i] && filter_map[i] && !column->get_bool(i - 1)) { - filter_map[i - 1] = false; - } - } else { - filter_map[i] = true; - } - } - - // It contains the first sub block of splited equal-conjuncts-matched tuples of the current probe row - if (multi_matched_output_row_count > 0) { - *_join_context->_is_any_probe_match_row_output = false; - _process_splited_equal_matched_tuples( - row_count - multi_matched_output_row_count, - multi_matched_output_row_count, column, visited_map, right_col_idx, - right_col_len, null_map_data, filter_map, output_block); - } - - for (size_t i = 0; i < row_count; ++i) { - if (filter_map[i]) { - _tuple_is_null_right_flags->emplace_back(null_map_data[i]); - } - } - output_block->get_by_position(result_column_id).column = - std::move(new_filter_column); - } else if constexpr (JoinOpType == TJoinOp::LEFT_SEMI_JOIN) { - // TODO: resize in advance - auto new_filter_column = ColumnVector::create(); - auto& filter_map = new_filter_column->get_data(); - - size_t start_row_idx = 1; - // We are handling euqual-conjuncts matched tuples that are splitted into multiple blocks - if (row_count_from_last_probe > 0) { - if (*_join_context->_is_any_probe_match_row_output) { - // if any matched tuple for this probe row is output, - // ignore all the following tuples for this probe row. 
- for (int row_idx = 0; row_idx < row_count_from_last_probe; ++row_idx) { - filter_map.emplace_back(false); - } - start_row_idx += row_count_from_last_probe; - if (row_count_from_last_probe < row_count) { - filter_map.emplace_back(column->get_bool(row_count_from_last_probe)); - } - } else { - filter_map.emplace_back(column->get_bool(0)); - } - } else { - filter_map.emplace_back(column->get_bool(0)); - } - for (size_t i = start_row_idx; i < row_count; ++i) { - if (column->get_bool(i) || (same_to_prev[i] && filter_map[i - 1])) { - // Only last same element is true, output last one - filter_map.push_back(true); - filter_map[i - 1] = !same_to_prev[i] && filter_map[i - 1]; - } else { - filter_map.push_back(false); - } - } - // It contains the first sub block of splited equal-conjuncts-matched tuples of the current probe row - if (multi_matched_output_row_count > 0) { - // If a matched row is output, all the equal-matched tuples in - // the following sub blocks should be ignored - *_join_context->_is_any_probe_match_row_output = filter_map[row_count - 1]; - } else if (row_count_from_last_probe > 0 && - !*_join_context->_is_any_probe_match_row_output) { + } + matched_map.push_back(!filter_map[row_count - 1]); + filter_map[row_count - 1] = true; + } else { + int end_row_idx = 0; + if (_row_count_from_last_probe > 0) { + end_row_idx = row_count - multi_matched_output_row_count; + if (!*_join_context->_is_any_probe_match_row_output) { // We are handling euqual-conjuncts matched tuples that are splitted into multiple blocks, // and no matched tuple has been output in all previous run. // If a tuple is output in this run, all the following mathced tuples should be ignored - if (filter_map[row_count_from_last_probe - 1]) { *_join_context->_is_any_probe_match_row_output = true; + filter_map[_row_count_from_last_probe - 1] = false; + } + if (is_the_last_sub_block && !*_join_context->_is_any_probe_match_row_output) { + // This is the last sub block of the split block, and no equal-conjuncts-matched tuple + // was output in any sub block; output a tuple for this probe row + filter_map[0] = true; + } + } - - if (is_mark_join) { - auto& matched_map = assert_cast&>( - *(output_block->get_by_position(num_cols - 1) - .column->assume_mutable())) - .get_data(); - - // For mark join, we only filter rows which have duplicate join keys. - // And then, we set matched_map to the join result to do the mark join's filtering.
- for (size_t i = 1; i < row_count; ++i) { - if (!same_to_prev[i]) { - matched_map.push_back(filter_map[i - 1]); - filter_map[i - 1] = true; - } - } - matched_map.push_back(filter_map[filter_map.size() - 1]); - filter_map[filter_map.size() - 1] = true; + if (multi_matched_output_row_count > 0) { + // It contains the first sub block of split equal-conjuncts-matched tuples of the current probe row + // If a matched row is output, all the equal-matched tuples in + // the following sub blocks should be ignored + *_join_context->_is_any_probe_match_row_output = filter_map[row_count - 1]; + filter_map[row_count - 1] = false; } - - output_block->get_by_position(result_column_id).column = - std::move(new_filter_column); - } else if constexpr (JoinOpType == TJoinOp::LEFT_ANTI_JOIN || - JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { - auto new_filter_column = ColumnVector::create(row_count); - auto* __restrict filter_map = new_filter_column->get_data().data(); - - // for left anti join, the probe side is output only when - // there are no matched tuples for the probe row. - - // If multiple equal-conjuncts-matched tuples is splitted into several - // sub blocks, just filter out all the other-conjuncts-NOT-matched tuples at first, - // and when processing the last sub block, check whether there are any - // equal-conjuncts-matched tuple is output in all sub blocks, - // if there are none, just pick a tuple and output. - - size_t start_row_idx = 1; - // We are handling euqual-conjuncts matched tuples that are splitted into multiple blocks - if (row_count_from_last_probe > 0) { - if (*_join_context->_is_any_probe_match_row_output) { - // if any matched tuple for this probe row is output, - // ignore all the following tuples for this probe row. - for (int row_idx = 0; row_idx < row_count_from_last_probe; ++row_idx) { - filter_map[row_idx] = false; - } - start_row_idx += row_count_from_last_probe; - if (row_count_from_last_probe < row_count) { - filter_map[row_count_from_last_probe] = - column->get_bool(row_count_from_last_probe) && - visited_map[row_count_from_last_probe]; - } - } else { - // Both equal conjuncts and other conjuncts are true - filter_map[0] = column->get_bool(0) && visited_map[0]; - } - } else { - // Both equal conjuncts and other conjuncts are true - filter_map[0] = column->get_bool(0) && visited_map[0]; - } - - for (size_t i = start_row_idx; i < row_count; ++i) { - if ((visited_map[i] && column->get_bool(i)) || - (same_to_prev[i] && filter_map[i - 1])) { - // When either of two conditions is meet: - // 1. Both equal conjuncts and other conjuncts are true or same_to_prev - // 2. This row is joined from the same build side row as the previous row - // Set filter_map[i] to true and filter_map[i - 1] to false if same_to_prev[i] - // is true.
- filter_map[i] = true; - filter_map[i - 1] = !same_to_prev[i] && filter_map[i - 1]; - } else { - filter_map[i] = false; - } - } - - if (is_mark_join) { - auto& matched_map = assert_cast&>( - *(output_block->get_by_position(num_cols - 1) - .column->assume_mutable())) - .get_data(); - for (int i = 1; i < row_count; ++i) { - if (!same_to_prev[i]) { - matched_map.push_back(!filter_map[i - 1]); - filter_map[i - 1] = true; - } - } - matched_map.push_back(!filter_map[row_count - 1]); - filter_map[row_count - 1] = true; - } else { - int end_row_idx = 0; - if (row_count_from_last_probe > 0) { - end_row_idx = row_count - multi_matched_output_row_count; - if (!*_join_context->_is_any_probe_match_row_output) { - // We are handling euqual-conjuncts matched tuples that are splitted into multiple blocks, - // and no matched tuple has been output in all previous run. - // If a tuple is output in this run, all the following mathced tuples should be ignored - if (filter_map[row_count_from_last_probe - 1]) { - *_join_context->_is_any_probe_match_row_output = true; - filter_map[row_count_from_last_probe - 1] = false; - } - if (is_the_last_sub_block && - !*_join_context->_is_any_probe_match_row_output) { - // This is the last sub block of splitted block, and no equal-conjuncts-matched tuple - // is output in all sub blocks, output a tuple for this probe row - filter_map[0] = true; - } - } - if (multi_matched_output_row_count > 0) { - // It contains the first sub block of splited equal-conjuncts-matched tuples of the current probe row - // If a matched row is output, all the equal-matched tuples in - // the following sub blocks should be ignored - *_join_context->_is_any_probe_match_row_output = - filter_map[row_count - 1]; - filter_map[row_count - 1] = false; - } - } else if (multi_matched_output_row_count > 0) { - end_row_idx = row_count - multi_matched_output_row_count; - // It contains the first sub block of splited equal-conjuncts-matched tuples of the current probe row - // If a matched row is output, all the equal-matched tuples in - // the following sub blocks should be ignored - *_join_context->_is_any_probe_match_row_output = filter_map[row_count - 1]; - filter_map[row_count - 1] = false; - } else { - end_row_idx = row_count; - } - - // Same to the semi join, but change the last value to opposite value - for (int i = 1 + row_count_from_last_probe; i < end_row_idx; ++i) { - if (!same_to_prev[i]) { - filter_map[i - 1] = !filter_map[i - 1]; - } - } - auto non_sub_blocks_matched_row_count = - row_count - row_count_from_last_probe - multi_matched_output_row_count; - if (non_sub_blocks_matched_row_count > 0) { - filter_map[end_row_idx - 1] = !filter_map[end_row_idx - 1]; - } - } - - output_block->get_by_position(result_column_id).column = - std::move(new_filter_column); - } else if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN || - JoinOpType == TJoinOp::RIGHT_ANTI_JOIN) { - for (int i = 0; i < column->size(); ++i) { - DCHECK(visited_map[i]); - *visited_map[i] |= column->get_bool(i); - } - } else if constexpr (JoinOpType == TJoinOp::RIGHT_OUTER_JOIN) { - auto filter_size = 0; - for (int i = 0; i < row_count; ++i) { - DCHECK(visited_map[i]); - auto result = column->get_bool(i); - *visited_map[i] |= result; - filter_size += result; - } - _tuple_is_null_left_flags->resize_fill(filter_size, 0); + } else if (multi_matched_output_row_count > 0) { + end_row_idx = row_count - multi_matched_output_row_count; + // It contains the first sub block of split equal-conjuncts-matched tuples of the current probe row
+ // If a matched row is output, all the equal-matched tuples in + // the following sub blocks should be ignored + *_join_context->_is_any_probe_match_row_output = filter_map[row_count - 1]; + filter_map[row_count - 1] = false; } else { - // inner join do nothing + end_row_idx = row_count; } - if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN || - JoinOpType == TJoinOp::RIGHT_ANTI_JOIN) { - output_block->clear(); - } else { - if constexpr (JoinOpType == TJoinOp::LEFT_SEMI_JOIN || - JoinOpType == TJoinOp::LEFT_ANTI_JOIN || - JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { - orig_columns = right_col_idx; - } - if (is_mark_join) { - Block::filter_block(output_block, result_column_id, output_block->columns()); - } else { - Block::filter_block(output_block, result_column_id, orig_columns); + // Same as the semi join, but flip the last value to its opposite + for (int i = 1 + _row_count_from_last_probe; i < end_row_idx; ++i) { + if (!_same_to_prev[i]) { + filter_map[i - 1] = !filter_map[i - 1]; } } + auto non_sub_blocks_matched_row_count = + row_count - _row_count_from_last_probe - multi_matched_output_row_count; + if (non_sub_blocks_matched_row_count > 0) { + filter_map[end_row_idx - 1] = !filter_map[end_row_idx - 1]; + } } - return Status::OK(); - } else { - LOG(FATAL) << "Invalid RowRefList"; - return Status::InvalidArgument("Invalid RowRefList"); + output_block->get_by_position(result_column_id).column = std::move(new_filter_column); + } else if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN || + JoinOpType == TJoinOp::RIGHT_ANTI_JOIN) { + for (int i = 0; i < row_count; ++i) { + DCHECK(_visited_map[i]); + *_visited_map[i] |= filter_column_ptr[i]; + } + } else if constexpr (JoinOpType == TJoinOp::RIGHT_OUTER_JOIN) { + auto filter_size = 0; + for (int i = 0; i < row_count; ++i) { + DCHECK(_visited_map[i]); + auto result = filter_column_ptr[i]; + *_visited_map[i] |= result; + filter_size += result; + } + _tuple_is_null_left_flags->resize_fill(filter_size, 0); } + + if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN || + JoinOpType == TJoinOp::RIGHT_ANTI_JOIN) { + output_block->clear(); + } else { + if constexpr (JoinOpType == TJoinOp::LEFT_SEMI_JOIN || + JoinOpType == TJoinOp::LEFT_ANTI_JOIN || + JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { + orig_columns = _right_col_idx; + } + Block::filter_block(output_block, result_column_id, + is_mark_join ? output_block->columns() : orig_columns); + } + + return Status::OK(); } // For left or full outer join with other conjuncts. @@ -1094,18 +821,17 @@ Status ProcessHashTableProbe::do_process_with_other_join_conjuncts( // if not, just pick a tuple and output.
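+ // In short (summary of the helper below): it scans the given row range of the current + // sub block, nulls the right-side columns of other-conjuncts-NOT-matched rows, sets + // filter_map from the other-conjuncts result, records hits in _visited_map, and + // accumulates into _is_any_probe_match_row_output whether any tuple of the split + // group was kept.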
template void ProcessHashTableProbe::_process_splited_equal_matched_tuples( - int start_row_idx, int row_count, const ColumnPtr& other_hit_column, - std::vector& visited_map, int right_col_idx, int right_col_len, + int start_row_idx, int row_count, const UInt8* __restrict other_hit_column, UInt8* __restrict null_map_data, UInt8* __restrict filter_map, Block* output_block) { int end_row_idx = start_row_idx + row_count; for (int i = start_row_idx; i < end_row_idx; ++i) { - auto join_hit = visited_map[i] != nullptr; - auto other_hit = other_hit_column->get_bool(i); + auto join_hit = _visited_map[i] != nullptr; + auto other_hit = other_hit_column[i]; if (!other_hit) { - for (size_t j = 0; j < right_col_len; ++j) { + for (size_t j = 0; j < _right_col_len; ++j) { typeid_cast( - std::move(*output_block->get_by_position(j + right_col_idx).column) + std::move(*output_block->get_by_position(j + _right_col_idx).column) .assume_mutable() .get()) ->get_null_map_data()[i] = true; @@ -1116,7 +842,7 @@ void ProcessHashTableProbe::_process_splited_equal_matched_tuples( filter_map[i] = other_hit; if (join_hit) { - *visited_map[i] |= other_hit; + *_visited_map[i] |= other_hit; } } *_join_context->_is_any_probe_match_row_output |= @@ -1282,6 +1008,38 @@ Status ProcessHashTableProbe::process_data_in_hashtable(HashTableTyp } } +template +template +Status ProcessHashTableProbe::process(HashTableType& hash_table_ctx, + ConstNullMapPtr null_map, + MutableBlock& mutable_block, Block* output_block, + size_t probe_rows, bool is_mark_join, + bool have_other_join_conjunct) { + Status res; + if constexpr (!std::is_same_v) { + if (have_other_join_conjunct) { + res = Status::InvalidArgument("Invalid HashTableType::Mapped"); + } else { + std::visit( + [&](auto is_mark_join) { + res = do_process(hash_table_ctx, null_map, mutable_block, + output_block, probe_rows); + }, + make_bool_variant(is_mark_join)); + } + } else { + std::visit( + [&](auto is_mark_join, auto have_other_join_conjunct) { + res = do_process( + hash_table_ctx, null_map, mutable_block, output_block, probe_rows); + }, + make_bool_variant(is_mark_join), make_bool_variant(have_other_join_conjunct)); + } + return res; +} + template struct ExtractType; @@ -1292,46 +1050,25 @@ struct ExtractType { #define INSTANTIATION(JoinOpType, T) \ template Status \ - ProcessHashTableProbe::do_process::Type>( \ + ProcessHashTableProbe::process::Type>( \ ExtractType::Type & hash_table_ctx, ConstNullMapPtr null_map, \ MutableBlock & mutable_block, Block * output_block, size_t probe_rows, \ - bool is_mark_join); \ + bool is_mark_join, bool have_other_join_conjunct); \ template Status \ - ProcessHashTableProbe::do_process::Type>( \ + ProcessHashTableProbe::process::Type>( \ ExtractType::Type & hash_table_ctx, ConstNullMapPtr null_map, \ MutableBlock & mutable_block, Block * output_block, size_t probe_rows, \ - bool is_mark_join); \ + bool is_mark_join, bool have_other_join_conjunct); \ template Status \ - ProcessHashTableProbe::do_process::Type>( \ + ProcessHashTableProbe::process::Type>( \ ExtractType::Type & hash_table_ctx, ConstNullMapPtr null_map, \ MutableBlock & mutable_block, Block * output_block, size_t probe_rows, \ - bool is_mark_join); \ + bool is_mark_join, bool have_other_join_conjunct); \ template Status \ - ProcessHashTableProbe::do_process::Type>( \ + ProcessHashTableProbe::process::Type>( \ ExtractType::Type & hash_table_ctx, ConstNullMapPtr null_map, \ MutableBlock & mutable_block, Block * output_block, size_t probe_rows, \ - bool is_mark_join); \ - \ - template 
Status ProcessHashTableProbe::do_process_with_other_join_conjuncts< \ - false, false, ExtractType::Type>( \ - ExtractType::Type & hash_table_ctx, ConstNullMapPtr null_map, \ - MutableBlock & mutable_block, Block * output_block, size_t probe_rows, \ - bool is_mark_join); \ - template Status ProcessHashTableProbe::do_process_with_other_join_conjuncts< \ - false, true, ExtractType::Type>( \ - ExtractType::Type & hash_table_ctx, ConstNullMapPtr null_map, \ - MutableBlock & mutable_block, Block * output_block, size_t probe_rows, \ - bool is_mark_join); \ - template Status ProcessHashTableProbe::do_process_with_other_join_conjuncts< \ - true, false, ExtractType::Type>( \ - ExtractType::Type & hash_table_ctx, ConstNullMapPtr null_map, \ - MutableBlock & mutable_block, Block * output_block, size_t probe_rows, \ - bool is_mark_join); \ - template Status ProcessHashTableProbe::do_process_with_other_join_conjuncts< \ - true, true, ExtractType::Type>( \ - ExtractType::Type & hash_table_ctx, ConstNullMapPtr null_map, \ - MutableBlock & mutable_block, Block * output_block, size_t probe_rows, \ - bool is_mark_join); \ + bool is_mark_join, bool have_other_join_conjunct); \ \ template Status \ ProcessHashTableProbe::process_data_in_hashtable::Type>( \ @@ -1341,13 +1078,6 @@ struct ExtractType { #define INSTANTIATION_FOR(JoinOpType) \ template struct ProcessHashTableProbe; \ \ - template void ProcessHashTableProbe::build_side_output_column( \ - MutableColumns & mcol, int column_offset, int column_length, \ - const std::vector& output_slot_flags, int size); \ - template void ProcessHashTableProbe::build_side_output_column( \ - MutableColumns & mcol, int column_offset, int column_length, \ - const std::vector& output_slot_flags, int size); \ - \ INSTANTIATION(JoinOpType, (SerializedHashTableContext)); \ INSTANTIATION(JoinOpType, (I8HashTableContext)); \ INSTANTIATION(JoinOpType, (I16HashTableContext)); \ diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 1f351f3294..75ea6b06ba 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -125,7 +125,7 @@ HashJoinProbeContext::HashJoinProbeContext(HashJoinNode* join_node) _probe_block(&join_node->_probe_block), _probe_columns(&join_node->_probe_columns), _probe_index(&join_node->_probe_index), - _ready_probe_index(&join_node->_ready_probe_index), + _ready_probe(&join_node->_ready_probe), _probe_key_sz(join_node->_probe_key_sz), _left_output_slot_flags(&join_node->_left_output_slot_flags), _right_output_slot_flags(&join_node->_right_output_slot_flags), @@ -154,7 +154,7 @@ HashJoinProbeContext::HashJoinProbeContext(pipeline::HashJoinProbeLocalState* lo _probe_block(&local_state->_probe_block), _probe_columns(&local_state->_probe_columns), _probe_index(&local_state->_probe_index), - _ready_probe_index(&local_state->_ready_probe_index), + _ready_probe(&local_state->_ready_probe), _probe_key_sz(local_state->_shared_state->probe_key_sz), _left_output_slot_flags(&local_state->join_probe()->_left_output_slot_flags), _right_output_slot_flags(&local_state->join_probe()->_right_output_slot_flags), @@ -423,7 +423,7 @@ bool HashJoinNode::need_more_input_data() const { void HashJoinNode::prepare_for_next() { _probe_index = 0; - _ready_probe_index = 0; + _ready_probe = false; _prepare_probe_block(); } @@ -529,30 +529,18 @@ Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_ if constexpr (!std::is_same_v) { using HashTableCtxType = std::decay_t; if 
constexpr (!std::is_same_v) { - if (_have_other_join_conjunct) { - st = process_hashtable_ctx - .template do_process_with_other_join_conjuncts< - need_null_map_for_probe, ignore_null>( - arg, - need_null_map_for_probe - ? &_null_map_column->get_data() - : nullptr, - mutable_join_block, &temp_block, - _probe_block.rows(), _is_mark_join); - } else { - st = process_hashtable_ctx.template do_process< - need_null_map_for_probe, ignore_null>( - arg, - need_null_map_for_probe ? &_null_map_column->get_data() - : nullptr, - mutable_join_block, &temp_block, _probe_block.rows(), - _is_mark_join); - } + st = process_hashtable_ctx.template process( + arg, + need_null_map_for_probe ? &_null_map_column->get_data() + : nullptr, + mutable_join_block, &temp_block, _probe_block.rows(), + _is_mark_join, _have_other_join_conjunct); } else { - LOG(FATAL) << "FATAL: uninited hash table"; + st = Status::InternalError("uninited hash table"); } } else { - LOG(FATAL) << "FATAL: uninited hash table probe"; + st = Status::InternalError("uninited hash table probe"); } }, *_hash_table_variants, *_process_hashtable_ctx_variants, @@ -570,10 +558,10 @@ Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_ st = process_hashtable_ctx.process_data_in_hashtable( arg, mutable_join_block, &temp_block, eos); } else { - LOG(FATAL) << "FATAL: uninited hash table"; + st = Status::InternalError("uninited hash table"); } } else { - LOG(FATAL) << "FATAL: uninited hash table probe"; + st = Status::InternalError("uninited hash table probe"); } }, *_hash_table_variants, *_process_hashtable_ctx_variants); diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index 7f6272043d..c60f1a0c7a 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -215,19 +215,20 @@ struct ProcessHashTableBuild { inserted_rows.reserve(_batch_size); } + if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits::value) { + auto old_keys_memory = hash_table_ctx.keys_memory_usage; + hash_table_ctx.serialize_keys(_build_raw_ptrs, _rows); + key_getter.set_serialized_keys(hash_table_ctx.keys.data()); + _join_context->_build_arena_memory_usage->add(hash_table_ctx.keys_memory_usage - + old_keys_memory); + } + _build_side_hash_values.resize(_rows); auto& arena = *(_join_context->_arena); auto old_build_arena_memory = arena.size(); + const auto& keys = key_getter.get_keys(); { SCOPED_TIMER(_build_side_compute_hash_timer); - if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits::value) { - auto old_keys_memory = hash_table_ctx.keys_memory_usage; - hash_table_ctx.serialize_keys(_build_raw_ptrs, _rows); - key_getter.set_serialized_keys(hash_table_ctx.keys.data()); - _join_context->_build_arena_memory_usage->add(hash_table_ctx.keys_memory_usage - - old_keys_memory); - } - for (size_t k = 0; k < _rows; ++k) { if (k % CHECK_FRECUENCY == 0) { RETURN_IF_CANCELLED(_state); @@ -246,36 +247,29 @@ struct ProcessHashTableBuild { return Status::OK(); } } - if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits< - KeyGetter>::value) { - _build_side_hash_values[k] = - hash_table_ctx.hash_table.hash(key_getter.get_key_holder(k, arena).key); - } else { - _build_side_hash_values[k] = - hash_table_ctx.hash_table.hash(key_getter.get_key_holder(k, arena)); - } + _build_side_hash_values[k] = hash_table_ctx.hash_table.hash(keys[k]); } } bool build_unique = _join_context->_build_unique; -#define EMPLACE_IMPL(stmt) \ - for (size_t k = 0; k < _rows; ++k) { \ - 
if (k % CHECK_FRECUENCY == 0) { \ - RETURN_IF_CANCELLED(_state); \ - } \ - if constexpr (ignore_null) { \ - if ((*null_map)[k]) { \ - continue; \ - } \ - } \ - auto emplace_result = key_getter.emplace_key(hash_table_ctx.hash_table, \ - _build_side_hash_values[k], k, arena); \ - if (k + HASH_MAP_PREFETCH_DIST < _rows) { \ - key_getter.template prefetch_by_hash( \ - hash_table_ctx.hash_table, \ - _build_side_hash_values[k + HASH_MAP_PREFETCH_DIST]); \ - } \ - stmt; \ +#define EMPLACE_IMPL(stmt) \ + for (size_t k = 0; k < _rows; ++k) { \ + if (k % CHECK_FRECUENCY == 0) { \ + RETURN_IF_CANCELLED(_state); \ + } \ + if constexpr (ignore_null) { \ + if ((*null_map)[k]) { \ + continue; \ + } \ + } \ + auto emplace_result = key_getter.emplace_with_key(hash_table_ctx.hash_table, keys[k], \ + _build_side_hash_values[k]); \ + if (LIKELY(k + HASH_MAP_PREFETCH_DIST < _rows)) { \ + key_getter.template prefetch_by_hash( \ + hash_table_ctx.hash_table, \ + _build_side_hash_values[k + HASH_MAP_PREFETCH_DIST]); \ + } \ + stmt; \ } if (has_runtime_filter && build_unique) { @@ -522,7 +516,7 @@ struct HashJoinProbeContext { Block* _probe_block; ColumnRawPtrs* _probe_columns; int* _probe_index; - int* _ready_probe_index; + bool* _ready_probe; Sizes _probe_key_sz; @@ -670,7 +664,7 @@ private: bool _has_set_need_null_map_for_build = false; bool _probe_ignore_null = false; int _probe_index = -1; - int _ready_probe_index = -1; + bool _ready_probe = false; bool _probe_eos = false; bool _build_side_ignore_null = false; diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index cb83d6fcab..3ac9e7c2d0 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -927,7 +927,7 @@ void AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, ColumnR }; if constexpr (HashTableTraits::is_phmap) { - auto keys = state.get_keys(num_rows); + const auto& keys = state.get_keys(); if (_hash_values.size() < num_rows) { _hash_values.resize(num_rows); } @@ -982,21 +982,11 @@ void AggregationNode::_find_in_hash_table(AggregateDataPtr* places, ColumnRawPtr AggState state(key_columns, _probe_key_sz, nullptr); _pre_serialize_key_if_need(state, agg_method, key_columns, num_rows); - + const auto& keys = state.get_keys(); if constexpr (HashTableTraits::is_phmap) { - if (_hash_values.size() < num_rows) { - _hash_values.resize(num_rows); - } - if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits< - AggState>::value) { - for (size_t i = 0; i < num_rows; ++i) { - _hash_values[i] = agg_method.data.hash(agg_method.keys[i]); - } - } else { - for (size_t i = 0; i < num_rows; ++i) { - _hash_values[i] = - agg_method.data.hash(state.get_key_holder(i, *_agg_arena_pool)); - } + _hash_values.resize(num_rows); + for (size_t i = 0; i < num_rows; ++i) { + _hash_values[i] = agg_method.data.hash(keys[i]); } } @@ -1009,8 +999,8 @@ void AggregationNode::_find_in_hash_table(AggregateDataPtr* places, ColumnRawPtr _hash_values[i + HASH_MAP_PREFETCH_DIST]); } - return state.find_key_with_hash(agg_method.data, _hash_values[i], i, - *_agg_arena_pool); + return state.find_key_with_hash(agg_method.data, _hash_values[i], + keys[i]); } else { return state.find_key(agg_method.data, i, *_agg_arena_pool); } diff --git a/be/src/vec/exec/vpartition_sort_node.cpp b/be/src/vec/exec/vpartition_sort_node.cpp index b9907accbb..bcd10a7681 100644 --- a/be/src/vec/exec/vpartition_sort_node.cpp +++ b/be/src/vec/exec/vpartition_sort_node.cpp @@ -112,20 +112,12 @@ void 
VPartitionSortNode::_emplace_into_hash_table(const ColumnRawPtrs& key_colum _pre_serialize_key_if_need(state, agg_method, key_columns, num_rows); //PHHashMap + const auto& keys = state.get_keys(); if constexpr (HashTableTraits::is_phmap) { - if (_hash_values.size() < num_rows) { - _hash_values.resize(num_rows); - } - if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits< - AggState>::value) { - for (size_t i = 0; i < num_rows; ++i) { - _hash_values[i] = agg_method.data.hash(agg_method.keys[i]); - } - } else { - for (size_t i = 0; i < num_rows; ++i) { - _hash_values[i] = - agg_method.data.hash(state.get_key_holder(i, *_agg_arena_pool)); - } + _hash_values.resize(num_rows); + + for (size_t i = 0; i < num_rows; ++i) { + _hash_values[i] = agg_method.data.hash(keys[i]); } } @@ -138,10 +130,10 @@ void VPartitionSortNode::_emplace_into_hash_table(const ColumnRawPtrs& key_colum agg_method.data.prefetch_by_hash( _hash_values[row + HASH_MAP_PREFETCH_DIST]); } - return state.emplace_key(agg_method.data, _hash_values[row], row, - *_agg_arena_pool); + return state.emplace_with_key(agg_method.data, keys[row], + _hash_values[row], row); } else { - return state.emplace_key(agg_method.data, row, *_agg_arena_pool); + return state.emplace_with_key(agg_method.data, keys[row], row); } }(); diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp index 3b6cc578ac..6a0b515780 100644 --- a/be/src/vec/exec/vset_operation_node.cpp +++ b/be/src/vec/exec/vset_operation_node.cpp @@ -82,16 +82,23 @@ struct HashTableBuild { key_getter.set_serialized_keys(hash_table_ctx.keys.data()); } + _build_side_hash_values.resize(_rows); + const auto& keys = key_getter.get_keys(); + for (size_t k = 0; k < _rows; ++k) { + _build_side_hash_values[k] = hash_table_ctx.hash_table.hash(keys[k]); + } + for (size_t k = 0; k < _rows; ++k) { if (k % CHECK_FRECUENCY == 0) { RETURN_IF_CANCELLED(_state); } - auto emplace_result = key_getter.emplace_key(hash_table_ctx.hash_table, k, - *(_operation_node->_arena)); + auto emplace_result = key_getter.emplace_with_key(hash_table_ctx.hash_table, keys[k], + _build_side_hash_values[k], k); - if (k + 1 < _rows) { - key_getter.prefetch_by_key(hash_table_ctx.hash_table, k + 1, - *(_operation_node->_arena)); + if (LIKELY(k + HASH_MAP_PREFETCH_DIST < _rows)) { + key_getter.template prefetch_by_hash( + hash_table_ctx.hash_table, + _build_side_hash_values[k + HASH_MAP_PREFETCH_DIST]); } if (emplace_result.is_inserted()) { //only inserted once as the same key, others skip @@ -107,6 +114,7 @@ private: ColumnRawPtrs& _build_raw_ptrs; VSetOperationNode* _operation_node; RuntimeState* _state; + std::vector _build_side_hash_values; }; template