diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 98ecafdc5a..8f3a725958 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -88,7 +88,7 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo profile()->AddHighWaterMarkCounter("BuildKeyArena", TUnit::BYTES, "MemoryUsage"); // Build phase - auto record_profile = _should_build_hash_table ? profile() : faker_runtime_profile(); + auto* record_profile = _should_build_hash_table ? profile() : faker_runtime_profile(); _build_table_timer = ADD_TIMER(profile(), "BuildTableTime"); _build_side_merge_block_timer = ADD_TIMER(profile(), "BuildSideMergeBlockTime"); _build_table_insert_timer = ADD_TIMER(record_profile, "BuildTableInsertTime"); @@ -197,10 +197,10 @@ Status HashJoinBuildSinkLocalState::_extract_join_column( if (shared_state.is_null_safe_eq_join[i]) { raw_ptrs[i] = block.get_by_position(res_col_ids[i]).column.get(); } else { - auto column = block.get_by_position(res_col_ids[i]).column.get(); - if (auto* nullable = check_and_get_column(*column)) { - auto& col_nested = nullable->get_nested_column(); - auto& col_nullmap = nullable->get_null_map_data(); + const auto* column = block.get_by_position(res_col_ids[i]).column.get(); + if (const auto* nullable = check_and_get_column(*column)) { + const auto& col_nested = nullable->get_nested_column(); + const auto& col_nullmap = nullable->get_null_map_data(); if (shared_state.store_null_in_hash_table[i]) { raw_ptrs[i] = nullable; @@ -290,7 +290,7 @@ void HashJoinBuildSinkLocalState::_set_build_ignore_flag(vectorized::Block& bloc const std::vector& res_col_ids) { for (size_t i = 0; i < _build_expr_ctxs.size(); ++i) { if (!_shared_state->is_null_safe_eq_join[i]) { - auto column = block.get_by_position(res_col_ids[i]).column.get(); + const auto* column = block.get_by_position(res_col_ids[i]).column.get(); if (check_and_get_column(*column)) { _build_side_ignore_null |= (_parent->cast()._join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && @@ -304,14 +304,7 @@ void HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state) { auto& p = _parent->cast(); std::visit( [&](auto&& join_op_variants, auto have_other_join_conjunct) { - using JoinOpType = std::decay_t; - using RowRefListType = std::conditional_t< - have_other_join_conjunct, vectorized::RowRefListWithFlags, - std::conditional_t>; + using RowRefListType = vectorized::RowRefList; if (_build_expr_ctxs.size() == 1 && !p._store_null_in_hash_table[0]) { // Single column optimization switch (_build_expr_ctxs[0]->root()->result_type()) { diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp index 2a788c6af4..ca1172501c 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp +++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp @@ -69,16 +69,6 @@ Status HashJoinProbeLocalState::open(RuntimeState* state) { std::visit( [&](auto&& join_op_variants, auto have_other_join_conjunct) { using JoinOpType = std::decay_t; - using RowRefListType = std::conditional_t< - have_other_join_conjunct, vectorized::RowRefListWithFlags, - std::conditional_t>; - _probe_row_match_iter.emplace>(); - _outer_join_pull_visited_iter - .emplace>(); _process_hashtable_ctx_variants->emplace>(this, state->batch_size()); }, diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index cdb1bc05c9..76103c4d8f 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -135,10 +135,6 @@ private: std::unique_ptr _process_hashtable_ctx_variants = std::make_unique(); - // for full/right outer join - vectorized::HashTableIteratorVariants _outer_join_pull_visited_iter; - vectorized::HashTableIteratorVariants _probe_row_match_iter; - RuntimeProfile::Counter* _probe_expr_call_timer = nullptr; RuntimeProfile::Counter* _probe_next_timer = nullptr; RuntimeProfile::Counter* _probe_side_output_timer = nullptr; diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp b/be/src/pipeline/exec/set_probe_sink_operator.cpp index 440936d595..5ff2c3df2d 100644 --- a/be/src/pipeline/exec/set_probe_sink_operator.cpp +++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp @@ -232,50 +232,45 @@ void SetProbeSinkOperatorX::_refresh_hash_table( [&](auto&& arg) { using HashTableCtxType = std::decay_t; if constexpr (!std::is_same_v) { - if constexpr (std::is_same_v) { - auto tmp_hash_table = - std::make_shared(); - bool is_need_shrink = - arg.hash_table->should_be_shrink(valid_element_in_hash_tbl); - if (is_intersect || is_need_shrink) { - tmp_hash_table->init_buf_size( - valid_element_in_hash_tbl / arg.hash_table->get_factor() + 1); - } + auto tmp_hash_table = + std::make_shared(); + bool is_need_shrink = + arg.hash_table->should_be_shrink(valid_element_in_hash_tbl); + if (is_intersect || is_need_shrink) { + tmp_hash_table->init_buf_size( + valid_element_in_hash_tbl / arg.hash_table->get_factor() + 1); + } - arg.init_iterator(); - auto& iter = arg.iterator; - auto iter_end = arg.hash_table->end(); - std::visit( - [&](auto is_need_shrink_const) { - while (iter != iter_end) { - auto& mapped = iter->get_second(); - auto it = mapped.begin(); + arg.init_iterator(); + auto& iter = arg.iterator; + auto iter_end = arg.hash_table->end(); + std::visit( + [&](auto is_need_shrink_const) { + while (iter != iter_end) { + auto& mapped = iter->get_second(); + auto it = mapped.begin(); - if constexpr (is_intersect) { //intersected - if (it->visited) { - it->visited = false; + if constexpr (is_intersect) { //intersected + if (it->visited) { + it->visited = false; + tmp_hash_table->insert(iter->get_value()); + } + ++iter; + } else { //except + if constexpr (is_need_shrink_const) { + if (!it->visited) { tmp_hash_table->insert(iter->get_value()); } - ++iter; - } else { //except - if constexpr (is_need_shrink_const) { - if (!it->visited) { - tmp_hash_table->insert(iter->get_value()); - } - } - ++iter; } + ++iter; } - }, - vectorized::make_bool_variant(is_need_shrink)); + } + }, + vectorized::make_bool_variant(is_need_shrink)); - arg.reset(); - if (is_intersect || is_need_shrink) { - arg.hash_table = std::move(tmp_hash_table); - } - } else { - LOG(FATAL) << "FATAL: Invalid RowRefList"; + arg.reset(); + if (is_intersect || is_need_shrink) { + arg.hash_table = std::move(tmp_hash_table); } } else { LOG(FATAL) << "FATAL: uninited hash table"; diff --git a/be/src/pipeline/exec/set_sink_operator.cpp b/be/src/pipeline/exec/set_sink_operator.cpp index 367ff41acc..6c18cab03f 100644 --- a/be/src/pipeline/exec/set_sink_operator.cpp +++ b/be/src/pipeline/exec/set_sink_operator.cpp @@ -181,7 +181,7 @@ Status SetSinkLocalState::init(RuntimeState* state, LocalSinkState } child_exprs_lists[parent._cur_child_id] = _child_exprs; - _shared_state->hash_table_variants = std::make_unique(); + _shared_state->hash_table_variants = std::make_unique(); for (int i = 0; i < child_exprs_lists[0].size(); ++i) { const auto& ctx = child_exprs_lists[0][i]; diff --git a/be/src/pipeline/exec/set_source_operator.cpp b/be/src/pipeline/exec/set_source_operator.cpp index e74e8b22ec..03cd67477d 100644 --- a/be/src/pipeline/exec/set_source_operator.cpp +++ b/be/src/pipeline/exec/set_source_operator.cpp @@ -140,23 +140,18 @@ Status SetSourceOperatorX::_get_data_in_hashtable( auto& iter = hash_table_ctx.iterator; auto block_size = 0; - if constexpr (std::is_same_v) { - for (; iter != hash_table_ctx.hash_table->end() && block_size < batch_size; ++iter) { - auto& value = iter->get_second(); - auto it = value.begin(); - if constexpr (is_intersect) { - if (it->visited) { //intersected: have done probe, so visited values it's the result - _add_result_columns(local_state, value, block_size); - } - } else { - if (!it->visited) { //except: haven't visited values it's the needed result - _add_result_columns(local_state, value, block_size); - } + for (; iter != hash_table_ctx.hash_table->end() && block_size < batch_size; ++iter) { + auto& value = iter->get_second(); + auto it = value.begin(); + if constexpr (is_intersect) { + if (it->visited) { //intersected: have done probe, so visited values it's the result + _add_result_columns(local_state, value, block_size); + } + } else { + if (!it->visited) { //except: haven't visited values it's the needed result + _add_result_columns(local_state, value, block_size); } } - } else { - return Status::InternalError("Invalid RowRefListType!"); } if (iter == hash_table_ctx.hash_table->end()) { diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 5161424d8d..61c251e00b 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -40,6 +40,7 @@ #include "vec/exec/vaggregation_node.h" #include "vec/exec/vanalytic_eval_node.h" #include "vec/exec/vpartition_sort_node.h" +#include "vec/exec/vset_operation_node.h" namespace doris::pipeline { @@ -481,7 +482,7 @@ public: //// shared static states (shared, decided in prepare/open...) /// init in setup_local_state - std::unique_ptr hash_table_variants = + std::unique_ptr hash_table_variants = nullptr; // the real data HERE. std::vector build_not_ignore_null; diff --git a/be/src/vec/common/hash_table/hash_map.h b/be/src/vec/common/hash_table/hash_map.h index 457df7e628..bd7c413dd9 100644 --- a/be/src/vec/common/hash_table/hash_map.h +++ b/be/src/vec/common/hash_table/hash_map.h @@ -211,8 +211,6 @@ public: using LookupResult = typename Base::LookupResult; - using HashMapTable::HashMapTable; - static uint32_t calc_bucket_size(size_t num_elem) { size_t expect_bucket_size = num_elem + (num_elem - 1) / 7; return phmap::priv::NormalizeCapacity(expect_bucket_size) + 1; diff --git a/be/src/vec/common/hash_table/hash_map_context.h b/be/src/vec/common/hash_table/hash_map_context.h index 031af96e79..0da222d372 100644 --- a/be/src/vec/common/hash_table/hash_map_context.h +++ b/be/src/vec/common/hash_table/hash_map_context.h @@ -161,11 +161,6 @@ struct MethodBase { size_t num_rows) = 0; }; -// FIXME: parameter 'keys' shadows member inherited from type `MethodBase` -#ifdef __clang__ -#pragma clang diagnostic push -#pragma clang diagnostic ignored "-Wshadow-field" -#endif template struct MethodSerialized : public MethodBase { using Base = MethodBase; @@ -191,9 +186,9 @@ struct MethodSerialized : public MethodBase { } void init_serialized_keys_impl(const ColumnRawPtrs& key_columns, size_t num_rows, - std::vector& keys, Arena& arena) { - arena.clear(); - keys.resize(num_rows); + std::vector& input_keys, Arena& input_arena) { + input_arena.clear(); + input_keys.resize(num_rows); size_t max_one_row_byte_size = 0; for (const auto& column : key_columns) { @@ -204,22 +199,24 @@ struct MethodSerialized : public MethodBase { // reach mem limit, don't serialize in batch size_t keys_size = key_columns.size(); for (size_t i = 0; i < num_rows; ++i) { - keys[i] = serialize_keys_to_pool_contiguous(i, keys_size, key_columns, arena); + input_keys[i] = + serialize_keys_to_pool_contiguous(i, keys_size, key_columns, input_arena); } } else { - auto* serialized_key_buffer = reinterpret_cast(arena.alloc(total_bytes)); + auto* serialized_key_buffer = + reinterpret_cast(input_arena.alloc(total_bytes)); for (size_t i = 0; i < num_rows; ++i) { - keys[i].data = + input_keys[i].data = reinterpret_cast(serialized_key_buffer + i * max_one_row_byte_size); - keys[i].size = 0; + input_keys[i].size = 0; } for (const auto& column : key_columns) { - column->serialize_vec(keys, num_rows, max_one_row_byte_size); + column->serialize_vec(input_keys, num_rows, max_one_row_byte_size); } } - Base::keys = keys.data(); + Base::keys = input_keys.data(); } void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t num_rows, @@ -234,10 +231,10 @@ struct MethodSerialized : public MethodBase { } } - void insert_keys_into_columns(std::vector& keys, MutableColumns& key_columns, + void insert_keys_into_columns(std::vector& input_keys, MutableColumns& key_columns, const size_t num_rows) override { for (auto& column : key_columns) { - column->deserialize_vec(keys, num_rows); + column->deserialize_vec(input_keys, num_rows); } } }; @@ -280,10 +277,10 @@ struct MethodStringNoCache : public MethodBase { } } - void insert_keys_into_columns(std::vector& keys, MutableColumns& key_columns, + void insert_keys_into_columns(std::vector& input_keys, MutableColumns& key_columns, const size_t num_rows) override { key_columns[0]->reserve(num_rows); - key_columns[0]->insert_many_strings(keys.data(), num_rows); + key_columns[0]->insert_many_strings(input_keys.data(), num_rows); } }; @@ -314,12 +311,12 @@ struct MethodOneNumber : public MethodBase { } } - void insert_keys_into_columns(std::vector& keys, + void insert_keys_into_columns(std::vector& input_keys, MutableColumns& key_columns, const size_t num_rows) override { key_columns[0]->reserve(num_rows); auto* column = static_cast(key_columns[0].get()); for (size_t i = 0; i != num_rows; ++i) { - const auto* key_holder = reinterpret_cast(&keys[i]); + const auto* key_holder = reinterpret_cast(&input_keys[i]); column->insert_raw_data(key_holder); } } @@ -444,7 +441,7 @@ struct MethodKeysFixed : public MethodBase { } } - void insert_keys_into_columns(std::vector& keys, + void insert_keys_into_columns(std::vector& input_keys, MutableColumns& key_columns, const size_t num_rows) override { // In any hash key value, column values to be read start just after the bitmap, if it exists. size_t pos = has_nullable_keys ? get_bitmap_size(key_columns.size()) : 0; @@ -467,7 +464,8 @@ struct MethodKeysFixed : public MethodBase { size_t bucket = i / BITSIZE; size_t offset = i % BITSIZE; for (size_t j = 0; j < num_rows; j++) { - nullmap[j] = (reinterpret_cast(&keys[j])[bucket] >> offset) & 1; + nullmap[j] = + (reinterpret_cast(&input_keys[j])[bucket] >> offset) & 1; } } else { data = const_cast(key_columns[i]->get_raw_data().data); @@ -476,7 +474,7 @@ struct MethodKeysFixed : public MethodBase { auto foo = [&](Fixed zero) { CHECK_EQ(sizeof(Fixed), size); for (size_t j = 0; j < num_rows; j++) { - memcpy_fixed(data + j * sizeof(Fixed), (char*)(&keys[j]) + pos); + memcpy_fixed(data + j * sizeof(Fixed), (char*)(&input_keys[j]) + pos); } }; @@ -533,20 +531,17 @@ struct MethodSingleNullableColumn : public SingleColumnMethod { using Base = SingleColumnMethod; using State = ColumnsHashing::HashMethodSingleLowNullableColumn; - void insert_keys_into_columns(std::vector& keys, + void insert_keys_into_columns(std::vector& input_keys, MutableColumns& key_columns, const size_t num_rows) override { auto* col = key_columns[0].get(); col->reserve(num_rows); if constexpr (std::is_same_v) { - col->insert_many_strings(keys.data(), num_rows); + col->insert_many_strings(input_keys.data(), num_rows); } else { - col->insert_many_raw_data(reinterpret_cast(keys.data()), num_rows); + col->insert_many_raw_data(reinterpret_cast(input_keys.data()), num_rows); } } }; -#ifdef __clang__ -#pragma clang diagnostic pop -#endif template using SerializedHashTableContext = MethodSerialized>; diff --git a/be/src/vec/common/hash_table/hash_table_set_probe.h b/be/src/vec/common/hash_table/hash_table_set_probe.h index 331e519445..c47b513770 100644 --- a/be/src/vec/common/hash_table/hash_table_set_probe.h +++ b/be/src/vec/common/hash_table/hash_table_set_probe.h @@ -36,23 +36,19 @@ struct HashTableProbe { KeyGetter key_getter(_probe_raw_ptrs); hash_table_ctx.init_serialized_keys(_probe_raw_ptrs, _probe_rows); - if constexpr (std::is_same_v) { - for (int probe_index = 0; probe_index < _probe_rows; probe_index++) { - auto find_result = hash_table_ctx.find(key_getter, probe_index); - if (find_result.is_found()) { //if found, marked visited - auto it = find_result.get_mapped().begin(); - if (!(it->visited)) { - it->visited = true; - if constexpr (is_intersected) { //intersected - (*_valid_element_in_hash_tbl)++; - } else { - (*_valid_element_in_hash_tbl)--; //except - } + for (int probe_index = 0; probe_index < _probe_rows; probe_index++) { + auto find_result = hash_table_ctx.find(key_getter, probe_index); + if (find_result.is_found()) { //if found, marked visited + auto it = find_result.get_mapped().begin(); + if (!(it->visited)) { + it->visited = true; + if constexpr (is_intersected) { //intersected + (*_valid_element_in_hash_tbl)++; + } else { + (*_valid_element_in_hash_tbl)--; //except } } } - } else { - LOG(FATAL) << "Invalid RowRefListType!"; } return Status::OK(); } diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h b/be/src/vec/exec/join/process_hash_table_probe_impl.h index 91c43763b3..a8ef9c0648 100644 --- a/be/src/vec/exec/join/process_hash_table_probe_impl.h +++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h @@ -409,27 +409,13 @@ Status ProcessHashTableProbe::process( HashTableType& hash_table_ctx, ConstNullMapPtr null_map, MutableBlock& mutable_block, Block* output_block, size_t probe_rows, bool is_mark_join, bool have_other_join_conjunct) { Status res; - if constexpr (!std::is_same_v) { - if (have_other_join_conjunct) { - res = Status::InvalidArgument("Invalid HashTableType::Mapped"); - } else { - std::visit( - [&](auto is_mark_join) { - res = do_process(hash_table_ctx, null_map, mutable_block, - output_block, probe_rows); - }, - make_bool_variant(is_mark_join)); - } - } else { - std::visit( - [&](auto is_mark_join, auto have_other_join_conjunct) { - res = do_process( - hash_table_ctx, null_map, mutable_block, output_block, probe_rows); - }, - make_bool_variant(is_mark_join), make_bool_variant(have_other_join_conjunct)); - } + std::visit( + [&](auto is_mark_join, auto have_other_join_conjunct) { + res = do_process( + hash_table_ctx, null_map, mutable_block, output_block, probe_rows); + }, + make_bool_variant(is_mark_join), make_bool_variant(have_other_join_conjunct)); return res; } @@ -468,54 +454,24 @@ struct ExtractType { MutableBlock & mutable_block, Block * output_block, \ bool* eos) -#define INSTANTIATION_FOR1(JoinOpType, Parent) \ - template struct ProcessHashTableProbe; \ - \ - INSTANTIATION(JoinOpType, Parent, (SerializedHashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I8HashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I16HashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I32HashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I64HashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I128HashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I256HashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I64FixedKeyHashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I64FixedKeyHashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I128FixedKeyHashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I128FixedKeyHashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I256FixedKeyHashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I256FixedKeyHashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I136FixedKeyHashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I136FixedKeyHashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (SerializedHashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I8HashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I16HashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I32HashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I64HashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I128HashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I256HashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I64FixedKeyHashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I64FixedKeyHashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I128FixedKeyHashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I128FixedKeyHashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I256FixedKeyHashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I256FixedKeyHashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I136FixedKeyHashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I136FixedKeyHashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (SerializedHashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I8HashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I16HashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I32HashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I64HashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I128HashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I256HashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I64FixedKeyHashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I64FixedKeyHashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I128FixedKeyHashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I128FixedKeyHashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I136FixedKeyHashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I136FixedKeyHashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I256FixedKeyHashTableContext)); \ - INSTANTIATION(JoinOpType, Parent, (I256FixedKeyHashTableContext)) +#define INSTANTIATION_FOR1(JoinOpType, Parent) \ + template struct ProcessHashTableProbe; \ + \ + INSTANTIATION(JoinOpType, Parent, (SerializedHashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I8HashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I16HashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I32HashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I64HashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I128HashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I256HashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I64FixedKeyHashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I64FixedKeyHashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I128FixedKeyHashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I128FixedKeyHashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I256FixedKeyHashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I256FixedKeyHashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I136FixedKeyHashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I136FixedKeyHashTableContext)); #define INSTANTIATION_FOR(JoinOpType) \ INSTANTIATION_FOR1(JoinOpType, HashJoinNode); \ diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index b135ada723..c65513c807 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -91,9 +91,7 @@ HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const Descr tnode.hash_join_node.is_broadcast_join), _hash_output_slot_ids(tnode.hash_join_node.__isset.hash_output_slot_ids ? tnode.hash_join_node.hash_output_slot_ids - : std::vector {}), - _build_side_mem_used(0), - _build_side_last_mem_used(0) { + : std::vector {}) { _runtime_filter_descs = tnode.runtime_filters; _arena = std::make_shared(); _hash_table_variants = std::make_shared(); @@ -216,7 +214,8 @@ Status HashJoinNode::prepare(RuntimeState* state) { "ProbeKeyArena", TUnit::BYTES, "MemoryUsage"); // Build phase - auto record_profile = _should_build_hash_table ? _build_phase_profile : faker_runtime_profile(); + auto* record_profile = + _should_build_hash_table ? _build_phase_profile : faker_runtime_profile(); _build_get_next_timer = ADD_TIMER(record_profile, "BuildGetNextTime"); _build_timer = ADD_TIMER(record_profile, "BuildTime"); _build_rows_counter = ADD_COUNTER(record_profile, "BuildRows", TUnit::UNIT); @@ -228,7 +227,7 @@ Status HashJoinNode::prepare(RuntimeState* state) { _build_side_compute_hash_timer = ADD_TIMER(record_profile, "BuildSideHashComputingTime"); // Probe phase - auto probe_phase_profile = _probe_phase_profile; + auto* probe_phase_profile = _probe_phase_profile; _probe_next_timer = ADD_TIMER(probe_phase_profile, "ProbeFindNextTime"); _probe_expr_call_timer = ADD_TIMER(probe_phase_profile, "ProbeExprCallTime"); _search_hashtable_timer = @@ -785,7 +784,7 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc } else if (!_should_build_hash_table) { DCHECK(_shared_hashtable_controller != nullptr); DCHECK(_shared_hash_table_context != nullptr); - auto wait_timer = ADD_TIMER(_build_phase_profile, "WaitForSharedHashTableTime"); + auto* wait_timer = ADD_TIMER(_build_phase_profile, "WaitForSharedHashTableTime"); SCOPED_TIMER(wait_timer); RETURN_IF_ERROR( _shared_hashtable_controller->wait_for_signal(state, _shared_hash_table_context)); @@ -867,10 +866,10 @@ Status HashJoinNode::_extract_join_column(Block& block, ColumnUInt8::MutablePtr& if (_is_null_safe_eq_join[i]) { raw_ptrs[i] = block.get_by_position(res_col_ids[i]).column.get(); } else { - auto column = block.get_by_position(res_col_ids[i]).column.get(); - if (auto* nullable = check_and_get_column(*column)) { - auto& col_nested = nullable->get_nested_column(); - auto& col_nullmap = nullable->get_null_map_data(); + const auto* column = block.get_by_position(res_col_ids[i]).column.get(); + if (const auto* nullable = check_and_get_column(*column)) { + const auto& col_nested = nullable->get_nested_column(); + const auto& col_nullmap = nullable->get_null_map_data(); if constexpr (!BuildSide) { DCHECK(null_map != nullptr); @@ -925,7 +924,7 @@ bool HashJoinNode::_need_probe_null_map(Block& block, const std::vector& re DCHECK_EQ(_build_expr_ctxs.size(), _probe_expr_ctxs.size()); for (size_t i = 0; i < _build_expr_ctxs.size(); ++i) { if (!_is_null_safe_eq_join[i]) { - auto column = block.get_by_position(res_col_ids[i]).column.get(); + const auto* column = block.get_by_position(res_col_ids[i]).column.get(); if (check_and_get_column(*column)) { return true; } @@ -938,7 +937,7 @@ void HashJoinNode::_set_build_ignore_flag(Block& block, const std::vector& DCHECK_EQ(_build_expr_ctxs.size(), _probe_expr_ctxs.size()); for (size_t i = 0; i < _build_expr_ctxs.size(); ++i) { if (!_is_null_safe_eq_join[i]) { - auto column = block.get_by_position(res_col_ids[i]).column.get(); + const auto* column = block.get_by_position(res_col_ids[i]).column.get(); if (check_and_get_column(*column)) { _build_side_ignore_null |= (_join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !_store_null_in_hash_table[i]); @@ -1016,16 +1015,7 @@ Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block) { void HashJoinNode::_hash_table_init(RuntimeState* state) { std::visit( [&](auto&& join_op_variants, auto have_other_join_conjunct) { - using JoinOpType = std::decay_t; - using RowRefListType = std::conditional_t< - have_other_join_conjunct, RowRefListWithFlags, - std::conditional_t>; - _probe_row_match_iter.emplace>(); - _outer_join_pull_visited_iter.emplace>(); + using RowRefListType = RowRefList; if (_build_expr_ctxs.size() == 1 && !_store_null_in_hash_table[0]) { // Single column optimization diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index 35d97ccb03..be94dacdca 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -184,42 +184,19 @@ using I256FixedKeyHashTableContext = FixedKeyHashTableContext using I136FixedKeyHashTableContext = FixedKeyHashTableContext; -using HashTableVariants = std::variant< - std::monostate, SerializedHashTableContext, I8HashTableContext, - I16HashTableContext, I32HashTableContext, - I64HashTableContext, I128HashTableContext, - I256HashTableContext, I64FixedKeyHashTableContext, - I64FixedKeyHashTableContext, - I128FixedKeyHashTableContext, - I128FixedKeyHashTableContext, - I256FixedKeyHashTableContext, - I256FixedKeyHashTableContext, - SerializedHashTableContext, I8HashTableContext, - I16HashTableContext, I32HashTableContext, - I64HashTableContext, I128HashTableContext, - I256HashTableContext, - I64FixedKeyHashTableContext, - I64FixedKeyHashTableContext, - I128FixedKeyHashTableContext, - I128FixedKeyHashTableContext, - I256FixedKeyHashTableContext, - I256FixedKeyHashTableContext, - SerializedHashTableContext, I8HashTableContext, - I16HashTableContext, I32HashTableContext, - I64HashTableContext, I128HashTableContext, - I256HashTableContext, - I64FixedKeyHashTableContext, - I64FixedKeyHashTableContext, - I128FixedKeyHashTableContext, - I128FixedKeyHashTableContext, - I256FixedKeyHashTableContext, - I256FixedKeyHashTableContext, - I136FixedKeyHashTableContext, - I136FixedKeyHashTableContext, - I136FixedKeyHashTableContext, - I136FixedKeyHashTableContext, - I136FixedKeyHashTableContext, - I136FixedKeyHashTableContext>; +using HashTableVariants = + std::variant, + I8HashTableContext, I16HashTableContext, + I32HashTableContext, I64HashTableContext, + I128HashTableContext, I256HashTableContext, + I64FixedKeyHashTableContext, + I64FixedKeyHashTableContext, + I128FixedKeyHashTableContext, + I128FixedKeyHashTableContext, + I256FixedKeyHashTableContext, + I256FixedKeyHashTableContext, + I136FixedKeyHashTableContext, + I136FixedKeyHashTableContext>; class VExprContext; @@ -235,10 +212,6 @@ using HashTableCtxVariants = ProcessHashTableProbe, ProcessHashTableProbe>; -using HashTableIteratorVariants = - std::variant, - ForwardIterator, ForwardIterator>; - class HashJoinNode final : public VJoinNodeBase { public: HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); @@ -373,10 +346,6 @@ private: std::unique_ptr _process_hashtable_ctx_variants; - // for full/right outer join - HashTableIteratorVariants _outer_join_pull_visited_iter; - HashTableIteratorVariants _probe_row_match_iter; - std::shared_ptr _build_block; Block _probe_block; ColumnRawPtrs _probe_columns; diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp index 3e41a067b6..4f4634fbd6 100644 --- a/be/src/vec/exec/vset_operation_node.cpp +++ b/be/src/vec/exec/vset_operation_node.cpp @@ -56,7 +56,7 @@ template VSetOperationNode::VSetOperationNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : ExecNode(pool, tnode, descs), _valid_element_in_hash_tbl(0), _build_finished(false) { - _hash_table_variants = std::make_unique(); + _hash_table_variants = std::make_unique(); } template @@ -86,7 +86,7 @@ Status VSetOperationNode::init(const TPlanNode& tnode, RuntimeStat return Status::NotSupported("Not Implemented, Check The Operation Node."); } - for (auto& texprs : *result_texpr_lists) { + for (const auto& texprs : *result_texpr_lists) { VExprContextSPtrs ctxs; RETURN_IF_ERROR(VExpr::create_expr_trees(texprs, ctxs)); _child_expr_lists.push_back(ctxs); @@ -424,10 +424,10 @@ Status VSetOperationNode::extract_build_column(Block& block, block.get_by_position(result_col_id).column = block.get_by_position(result_col_id).column->convert_to_full_column_if_const(); - auto column = block.get_by_position(result_col_id).column.get(); + const auto* column = block.get_by_position(result_col_id).column.get(); - if (auto* nullable = check_and_get_column(*column)) { - auto& col_nested = nullable->get_nested_column(); + if (const auto* nullable = check_and_get_column(*column)) { + const auto& col_nested = nullable->get_nested_column(); if (_build_not_ignore_null[i]) { raw_ptrs[i] = nullable; } else { @@ -452,10 +452,10 @@ Status VSetOperationNode::extract_probe_column(Block& block, Colum block.get_by_position(result_col_id).column = block.get_by_position(result_col_id).column->convert_to_full_column_if_const(); - auto column = block.get_by_position(result_col_id).column.get(); + const auto* column = block.get_by_position(result_col_id).column.get(); - if (auto* nullable = check_and_get_column(*column)) { - auto& col_nested = nullable->get_nested_column(); + if (const auto* nullable = check_and_get_column(*column)) { + const auto& col_nested = nullable->get_nested_column(); if (_build_not_ignore_null[i]) { //same as build column raw_ptrs[i] = nullable; } else { @@ -496,8 +496,8 @@ void VSetOperationNode::debug_string(int indentation_level, std::stringstream* out) const { *out << string(indentation_level * 2, ' '); *out << " _child_expr_lists=["; - for (int i = 0; i < _child_expr_lists.size(); ++i) { - *out << VExpr::debug_string(_child_expr_lists[i]) << ", "; + for (const auto& _child_expr_list : _child_expr_lists) { + *out << VExpr::debug_string(_child_expr_list) << ", "; } *out << "] \n"; ExecNode::debug_string(indentation_level, out); @@ -516,50 +516,45 @@ void VSetOperationNode::refresh_hash_table() { [&](auto&& arg) { using HashTableCtxType = std::decay_t; if constexpr (!std::is_same_v) { - if constexpr (std::is_same_v) { - auto tmp_hash_table = - std::make_shared(); - bool is_need_shrink = - arg.hash_table->should_be_shrink(_valid_element_in_hash_tbl); - if (is_intersect || is_need_shrink) { - tmp_hash_table->init_buf_size( - _valid_element_in_hash_tbl / arg.hash_table->get_factor() + 1); - } + auto tmp_hash_table = + std::make_shared(); + bool is_need_shrink = + arg.hash_table->should_be_shrink(_valid_element_in_hash_tbl); + if (is_intersect || is_need_shrink) { + tmp_hash_table->init_buf_size( + _valid_element_in_hash_tbl / arg.hash_table->get_factor() + 1); + } - arg.init_iterator(); - auto& iter = arg.iterator; - auto iter_end = arg.hash_table->end(); - std::visit( - [&](auto is_need_shrink_const) { - while (iter != iter_end) { - auto& mapped = iter->get_second(); - auto it = mapped.begin(); + arg.init_iterator(); + auto& iter = arg.iterator; + auto iter_end = arg.hash_table->end(); + std::visit( + [&](auto is_need_shrink_const) { + while (iter != iter_end) { + auto& mapped = iter->get_second(); + auto it = mapped.begin(); - if constexpr (is_intersect) { //intersected - if (it->visited) { - it->visited = false; + if constexpr (is_intersect) { //intersected + if (it->visited) { + it->visited = false; + tmp_hash_table->insert(iter->get_value()); + } + ++iter; + } else { //except + if constexpr (is_need_shrink_const) { + if (!it->visited) { tmp_hash_table->insert(iter->get_value()); } - ++iter; - } else { //except - if constexpr (is_need_shrink_const) { - if (!it->visited) { - tmp_hash_table->insert(iter->get_value()); - } - } - ++iter; } + ++iter; } - }, - make_bool_variant(is_need_shrink)); + } + }, + make_bool_variant(is_need_shrink)); - arg.reset(); - if (is_intersect || is_need_shrink) { - arg.hash_table = std::move(tmp_hash_table); - } - } else { - LOG(FATAL) << "FATAL: Invalid RowRefList"; + arg.reset(); + if (is_intersect || is_need_shrink) { + arg.hash_table = std::move(tmp_hash_table); } } else { LOG(FATAL) << "FATAL: uninited hash table"; @@ -578,22 +573,18 @@ Status VSetOperationNode::get_data_in_hashtable(HashTableContext auto& iter = hash_table_ctx.iterator; auto block_size = 0; - if constexpr (std::is_same_v) { - for (; iter != hash_table_ctx.hash_table->end() && block_size < batch_size; ++iter) { - auto& value = iter->get_second(); - auto it = value.begin(); - if constexpr (is_intersected) { - if (it->visited) { //intersected: have done probe, so visited values it's the result - add_result_columns(value, block_size); - } - } else { - if (!it->visited) { //except: haven't visited values it's the needed result - add_result_columns(value, block_size); - } + for (; iter != hash_table_ctx.hash_table->end() && block_size < batch_size; ++iter) { + auto& value = iter->get_second(); + auto it = value.begin(); + if constexpr (is_intersected) { + if (it->visited) { //intersected: have done probe, so visited values it's the result + add_result_columns(value, block_size); + } + } else { + if (!it->visited) { //except: haven't visited values it's the needed result + add_result_columns(value, block_size); } } - } else { - return Status::InternalError("Invalid RowRefListType!"); } *eos = iter == hash_table_ctx.hash_table->end(); diff --git a/be/src/vec/exec/vset_operation_node.h b/be/src/vec/exec/vset_operation_node.h index 3741fc564c..ae600c6490 100644 --- a/be/src/vec/exec/vset_operation_node.h +++ b/be/src/vec/exec/vset_operation_node.h @@ -49,6 +49,20 @@ namespace vectorized { class VExprContext; struct RowRefListWithFlags; +using SetHashTableVariants = std::variant< + std::monostate, SerializedHashTableContext, + I8HashTableContext, I16HashTableContext, + I32HashTableContext, I64HashTableContext, + I128HashTableContext, I256HashTableContext, + I64FixedKeyHashTableContext, + I64FixedKeyHashTableContext, + I128FixedKeyHashTableContext, + I128FixedKeyHashTableContext, + I256FixedKeyHashTableContext, + I256FixedKeyHashTableContext, + I136FixedKeyHashTableContext, + I136FixedKeyHashTableContext>; + template class VSetOperationNode final : public ExecNode { public: @@ -95,7 +109,7 @@ private: void create_mutable_cols(Block* output_block); void release_mem(); - std::unique_ptr _hash_table_variants; + std::unique_ptr _hash_table_variants; std::vector _build_not_ignore_null;