From 37dbda6209c82b3e99efd08eeb73d4e072d8c437 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 13 Oct 2023 16:51:55 +0800 Subject: [PATCH] [pipelineX](refactor) Use class template to simplify join (#25369) --- be/src/pipeline/exec/hashjoin_build_sink.cpp | 67 ++-- be/src/pipeline/exec/hashjoin_build_sink.h | 29 +- .../pipeline/exec/hashjoin_probe_operator.cpp | 67 +++- .../pipeline/exec/hashjoin_probe_operator.h | 43 ++- .../exec/partition_sort_sink_operator.cpp | 2 +- .../exec/partition_sort_source_operator.cpp | 23 +- .../exec/partition_sort_source_operator.h | 1 + be/src/pipeline/pipeline_x/dependency.h | 21 +- .../vec/exec/join/process_hash_table_probe.h | 20 +- .../exec/join/process_hash_table_probe_impl.h | 349 +++++++++--------- be/src/vec/exec/join/vhash_join_node.cpp | 134 +------ be/src/vec/exec/join/vhash_join_node.h | 216 ++++------- be/src/vec/exec/join/vjoin_node_base.h | 2 +- .../runtime/shared_hash_table_controller.h | 1 + 14 files changed, 442 insertions(+), 533 deletions(-) diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index c9fe153af0..939363776b 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -52,11 +52,15 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo auto& p = _parent->cast(); _shared_state->join_op_variants = p._join_op_variants; _shared_state->probe_key_sz = p._build_key_sz; - _shared_state->build_blocks.reset(new std::vector()); - // avoid vector expand change block address. - // one block can store 4g data, _build_blocks can store 128*4g data. - // if probe data bigger than 512g, runtime filter maybe will core dump when insert data. - _shared_state->build_blocks->reserve(vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT); + if (p._is_broadcast_join && state->enable_share_hash_table_for_broadcast_join()) { + _shared_state->build_blocks = p._shared_hash_table_context->blocks; + } else { + _shared_state->build_blocks.reset(new std::vector()); + // avoid vector expand change block address. + // one block can store 4g data, _build_blocks can store 128*4g data. + // if probe data bigger than 512g, runtime filter maybe will core dump when insert data. + _shared_state->build_blocks->reserve(vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT); + } _shared_state->is_null_safe_eq_join = p._is_null_safe_eq_join; _shared_state->store_null_in_hash_table = p._store_null_in_hash_table; _build_expr_ctxs.resize(p._build_expr_ctxs.size()); @@ -79,6 +83,11 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo if (!_should_build_hash_table) { _shared_hash_table_dependency->block_writing(); p._shared_hashtable_controller->append_dependency(p.id(), _shared_hash_table_dependency); + } else if (p._is_broadcast_join) { + // avoid vector expand change block address. + // one block can store 4g data, _build_blocks can store 128*4g data. + // if probe data bigger than 512g, runtime filter maybe will core dump when insert data. + _shared_state->build_blocks->reserve(vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT); } _memory_usage_counter = ADD_LABEL_COUNTER(profile(), "MemoryUsage"); @@ -135,6 +144,18 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* state) { return Status::OK(); } +vectorized::Sizes& HashJoinBuildSinkLocalState::build_key_sz() { + return _parent->cast()._build_key_sz; +} + +bool HashJoinBuildSinkLocalState::build_unique() const { + return _parent->cast()._build_unique; +} + +std::vector& HashJoinBuildSinkLocalState::runtime_filter_descs() const { + return _parent->cast()._runtime_filter_descs; +} + void HashJoinBuildSinkLocalState::init_short_circuit_for_probe() { auto& p = _parent->cast(); _shared_state->short_circuit_for_probe = @@ -201,9 +222,9 @@ Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state, [&](auto&& arg, auto has_null_value, auto short_circuit_for_null_in_build_side) -> Status { using HashTableCtxType = std::decay_t; - vectorized::HashJoinBuildContext context(this); - vectorized::ProcessHashTableBuild - hash_table_build_process(rows, block, raw_ptrs, &context, + vectorized::ProcessHashTableBuild + hash_table_build_process(rows, block, raw_ptrs, this, state->batch_size(), offset, state); return hash_table_build_process .template run( @@ -246,11 +267,6 @@ void HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state) { JoinOpType::value == TJoinOp::RIGHT_OUTER_JOIN || JoinOpType::value == TJoinOp::FULL_OUTER_JOIN, vectorized::RowRefListWithFlag, vectorized::RowRefList>>; - _shared_state->probe_row_match_iter - .emplace>(); - _shared_state->outer_join_pull_visited_iter - .emplace>(); - if (_build_expr_ctxs.size() == 1 && !p._store_null_in_hash_table[0]) { // Single column optimization switch (_build_expr_ctxs[0]->root()->result_type()) { @@ -519,18 +535,16 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* state, (*local_state._shared_state->build_blocks)[local_state._build_block_idx], local_state._build_block_idx)); } - auto ret = std::visit(Overload {[&](std::monostate&) -> Status { - LOG(FATAL) << "FATAL: uninited hash table"; - __builtin_unreachable(); - }, - [&](auto&& arg) -> Status { - using HashTableCtxType = std::decay_t; - vectorized::RuntimeFilterContext context(&local_state); - vectorized::ProcessRuntimeFilterBuild - runtime_filter_build_process(&context); - return runtime_filter_build_process(state, arg); - }}, - *local_state._shared_state->hash_table_variants); + auto ret = std::visit( + Overload {[&](std::monostate&) -> Status { + LOG(FATAL) << "FATAL: uninited hash table"; + __builtin_unreachable(); + }, + [&](auto&& arg) -> Status { + vectorized::ProcessRuntimeFilterBuild runtime_filter_build_process; + return runtime_filter_build_process(state, arg, &local_state); + }}, + *local_state._shared_state->hash_table_variants); if (!ret.ok()) { if (_shared_hashtable_controller) { _shared_hash_table_context->status = ret; @@ -542,7 +556,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* _shared_hash_table_context->status = Status::OK(); // arena will be shared with other instances. _shared_hash_table_context->arena = local_state._shared_state->arena; - _shared_hash_table_context->blocks = local_state._shared_state->build_blocks; _shared_hash_table_context->hash_table_variants = local_state._shared_state->hash_table_variants; _shared_hash_table_context->short_circuit_for_null_in_probe_side = @@ -578,8 +591,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* *std::static_pointer_cast( _shared_hash_table_context->hash_table_variants)); - local_state._shared_state->build_blocks = _shared_hash_table_context->blocks; - if (!_shared_hash_table_context->runtime_filters.empty()) { auto ret = std::visit( Overload { diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index 54848f7c86..459b66718c 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -70,12 +70,25 @@ public: void init_short_circuit_for_probe(); HashJoinBuildSinkOperatorX* join_build() { return (HashJoinBuildSinkOperatorX*)_parent; } + vectorized::Sizes& build_key_sz(); + bool build_unique() const; + std::vector& runtime_filter_descs() const; + std::shared_ptr arena() { return _shared_state->arena; } + + void add_hash_buckets_info(const std::string& info) const { + _profile->add_info_string("HashTableBuckets", info); + } + void add_hash_buckets_filled_info(const std::string& info) const { + _profile->add_info_string("HashTableFilledBuckets", info); + } + protected: void _hash_table_init(RuntimeState* state); void _set_build_ignore_flag(vectorized::Block& block, const std::vector& res_col_ids); friend class HashJoinBuildSinkOperatorX; - friend struct vectorized::HashJoinBuildContext; - friend struct vectorized::RuntimeFilterContext; + template + friend struct vectorized::ProcessHashTableBuild; + friend struct vectorized::ProcessRuntimeFilterBuild; // build expr vectorized::VExprContextSPtrs _build_expr_ctxs; @@ -139,16 +152,13 @@ public: } bool should_dry_run(RuntimeState* state) override { - auto tmp = _is_broadcast_join && !state->get_sink_local_state(id()) - ->cast() - ._should_build_hash_table; - return tmp; + return _is_broadcast_join && !state->get_sink_local_state(id()) + ->cast() + ._should_build_hash_table; } private: friend class HashJoinBuildSinkLocalState; - friend struct vectorized::HashJoinBuildContext; - friend struct vectorized::RuntimeFilterContext; // build expr vectorized::VExprContextSPtrs _build_expr_ctxs; @@ -165,9 +175,6 @@ private: vectorized::SharedHashTableContextPtr _shared_hash_table_context = nullptr; std::vector _runtime_filter_descs; - - std::atomic_bool _probe_open_finish = false; - bool _probe_ignore_null = false; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp index 1688b17778..c4b009f738 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp +++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp @@ -59,12 +59,68 @@ Status HashJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& info) return Status::OK(); } +Status HashJoinProbeLocalState::open(RuntimeState* state) { + SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(_open_timer); + RETURN_IF_ERROR(JoinProbeLocalState::open(state)); + + _process_hashtable_ctx_variants = std::make_unique(); + auto& p = _parent->cast(); + std::visit( + [&](auto&& join_op_variants, auto have_other_join_conjunct) { + using JoinOpType = std::decay_t; + using RowRefListType = std::conditional_t< + have_other_join_conjunct, vectorized::RowRefListWithFlags, + std::conditional_t>; + _probe_row_match_iter.emplace>(); + _outer_join_pull_visited_iter + .emplace>(); + _process_hashtable_ctx_variants->emplace>(this, state->batch_size()); + }, + _shared_state->join_op_variants, + vectorized::make_bool_variant(p._have_other_join_conjunct)); + return Status::OK(); +} + void HashJoinProbeLocalState::prepare_for_next() { _probe_index = 0; _ready_probe = false; _prepare_probe_block(); } +bool HashJoinProbeLocalState::have_other_join_conjunct() const { + return _parent->cast()._have_other_join_conjunct; +} + +bool HashJoinProbeLocalState::is_right_semi_anti() const { + return _parent->cast()._is_right_semi_anti; +} + +bool HashJoinProbeLocalState::is_outer_join() const { + return _parent->cast()._is_outer_join; +} + +std::vector* HashJoinProbeLocalState::left_output_slot_flags() { + return &_parent->cast()._left_output_slot_flags; +} + +std::vector* HashJoinProbeLocalState::right_output_slot_flags() { + return &_parent->cast()._right_output_slot_flags; +} + +vectorized::DataTypes HashJoinProbeLocalState::right_table_data_types() { + return _parent->cast()._right_table_data_types; +} + +vectorized::DataTypes HashJoinProbeLocalState::left_table_data_types() { + return _parent->cast()._left_table_data_types; +} + Status HashJoinProbeLocalState::close(RuntimeState* state) { SCOPED_TIMER(profile()->total_time_counter()); SCOPED_TIMER(_close_timer); @@ -111,16 +167,7 @@ void HashJoinProbeLocalState::init_for_probe(RuntimeState* state) { if (_probe_inited) { return; } - _process_hashtable_ctx_variants = std::make_unique(); - std::visit( - [&](auto&& join_op_variants) { - using JoinOpType = std::decay_t; - _probe_context.reset(new vectorized::HashJoinProbeContext(this)); - _process_hashtable_ctx_variants - ->emplace>( - _probe_context.get(), state->batch_size()); - }, - _shared_state->join_op_variants); + _probe_inited = true; } diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index 4d96b68b7d..ed241141e5 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -45,6 +45,22 @@ public: Status open(RuntimeState*) override { return Status::OK(); } }; +class HashJoinProbeLocalState; + +using HashTableCtxVariants = std::variant< + std::monostate, + vectorized::ProcessHashTableProbe, + vectorized::ProcessHashTableProbe, + vectorized::ProcessHashTableProbe, + vectorized::ProcessHashTableProbe, + vectorized::ProcessHashTableProbe, + vectorized::ProcessHashTableProbe, + vectorized::ProcessHashTableProbe, + vectorized::ProcessHashTableProbe, + vectorized::ProcessHashTableProbe, + vectorized::ProcessHashTableProbe>; + class HashJoinProbeOperatorX; class HashJoinProbeLocalState final : public JoinProbeLocalState { @@ -55,6 +71,7 @@ public: ~HashJoinProbeLocalState() override = default; Status init(RuntimeState* state, LocalStateInfo& info) override; + Status open(RuntimeState* state) override; Status close(RuntimeState* state) override; void prepare_for_next(); @@ -64,13 +81,25 @@ public: SourceState& source_state, vectorized::Block* temp_block, bool check_rows_count = true); - HashJoinProbeOperatorX* join_probe() { return (HashJoinProbeOperatorX*)_parent; } + bool have_other_join_conjunct() const; + bool is_right_semi_anti() const; + bool is_outer_join() const; + std::vector* left_output_slot_flags(); + std::vector* right_output_slot_flags(); + vectorized::DataTypes right_table_data_types(); + vectorized::DataTypes left_table_data_types(); + bool* has_null_in_build_side() { return &_shared_state->_has_null_in_build_side; } + std::shared_ptr> build_blocks() const { + return _shared_state->build_blocks; + } + vectorized::Sizes probe_key_sz() const { return _shared_state->probe_key_sz; } private: void _prepare_probe_block(); bool _need_probe_null_map(vectorized::Block& block, const std::vector& res_col_ids); friend class HashJoinProbeOperatorX; - friend struct vectorized::HashJoinProbeContext; + template + friend struct vectorized::ProcessHashTableProbe; int _probe_index = -1; bool _ready_probe = false; @@ -88,12 +117,15 @@ private: bool _need_null_map_for_probe = false; bool _has_set_need_null_map_for_probe = false; - std::unique_ptr _probe_context; vectorized::ColumnUInt8::MutablePtr _null_map_column; // for cases when a probe row matches more than batch size build rows. bool _is_any_probe_match_row_output = false; - std::unique_ptr _process_hashtable_ctx_variants = - std::make_unique(); + std::unique_ptr _process_hashtable_ctx_variants = + std::make_unique(); + + // for full/right outer join + vectorized::HashTableIteratorVariants _outer_join_pull_visited_iter; + vectorized::HashTableIteratorVariants _probe_row_match_iter; RuntimeProfile::Counter* _probe_expr_call_timer; RuntimeProfile::Counter* _probe_next_timer; @@ -125,7 +157,6 @@ private: RuntimeProfile::Counter& expr_call_timer, std::vector& res_col_ids) const; friend class HashJoinProbeLocalState; - friend struct vectorized::HashJoinProbeContext; // other expr vectorized::VExprContextSPtrs _other_join_conjuncts; diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp b/be/src/pipeline/exec/partition_sort_sink_operator.cpp index 907114829e..4bd66be9db 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp @@ -152,7 +152,7 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block* COUNTER_SET(local_state._hash_table_size_counter, int64_t(local_state._num_partition)); //so all data from child have sink completed - local_state._dependency->set_ready_for_read(); + local_state._dependency->set_eos(); } return Status::OK(); diff --git a/be/src/pipeline/exec/partition_sort_source_operator.cpp b/be/src/pipeline/exec/partition_sort_source_operator.cpp index a80d327700..f9003c6526 100644 --- a/be/src/pipeline/exec/partition_sort_source_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_source_operator.cpp @@ -52,8 +52,8 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized:: local_state._shared_state->blocks_buffer.front().swap(*output_block); local_state._shared_state->blocks_buffer.pop(); //if buffer have no data, block reading and wait for signal again - RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, output_block, - output_block->columns())); + RETURN_IF_ERROR(vectorized::VExprContext::filter_block( + local_state._conjuncts, output_block, output_block->columns())); if (local_state._shared_state->blocks_buffer.empty()) { local_state._dependency->block_reading(); } @@ -67,13 +67,12 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized:: // if we move the _blocks_buffer output at last(behind 286 line), // it's maybe eos but not output all data: when _blocks_buffer.empty() and _can_read = false (this: _sort_idx && _partition_sorts.size() are 0) RETURN_IF_ERROR(get_sorted_block(state, output_block, local_state)); - RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, output_block, + RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, output_block, output_block->columns())); { std::lock_guard lock(local_state._shared_state->buffer_mutex); if (local_state._shared_state->blocks_buffer.empty() && - local_state._shared_state->sort_idx >= - local_state._shared_state->partition_sorts.size()) { + local_state._sort_idx >= local_state._shared_state->partition_sorts.size()) { source_state = SourceState::FINISHED; } } @@ -91,20 +90,18 @@ Status PartitionSortSourceOperatorX::get_sorted_block(RuntimeState* state, SCOPED_TIMER(local_state._get_sorted_timer); //sorter output data one by one bool current_eos = false; - if (local_state._shared_state->sort_idx < local_state._shared_state->partition_sorts.size()) { - RETURN_IF_ERROR( - local_state._shared_state->partition_sorts[local_state._shared_state->sort_idx] - ->get_next(state, output_block, ¤t_eos)); + if (local_state._sort_idx < local_state._shared_state->partition_sorts.size()) { + RETURN_IF_ERROR(local_state._shared_state->partition_sorts[local_state._sort_idx]->get_next( + state, output_block, ¤t_eos)); } if (current_eos) { //current sort have eos, so get next idx local_state._shared_state->previous_row->reset(); - auto rows = local_state._shared_state->partition_sorts[local_state._shared_state->sort_idx] + auto rows = local_state._shared_state->partition_sorts[local_state._sort_idx] ->get_output_rows(); local_state._num_rows_returned += rows; - local_state._shared_state->partition_sorts[local_state._shared_state->sort_idx].reset( - nullptr); - local_state._shared_state->sort_idx++; + local_state._shared_state->partition_sorts[local_state._sort_idx].reset(nullptr); + local_state._sort_idx++; } return Status::OK(); diff --git a/be/src/pipeline/exec/partition_sort_source_operator.h b/be/src/pipeline/exec/partition_sort_source_operator.h index f7d950838c..23393a870a 100644 --- a/be/src/pipeline/exec/partition_sort_source_operator.h +++ b/be/src/pipeline/exec/partition_sort_source_operator.h @@ -68,6 +68,7 @@ private: RuntimeProfile::Counter* _get_sorted_timer; RuntimeProfile::Counter* _get_next_timer; int64_t _num_rows_returned; + int _sort_idx = 0; }; class PartitionSortSourceOperatorX final : public OperatorX { diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index c5abc5142c..c1ed407477 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -565,14 +565,10 @@ struct HashJoinSharedState : public JoinSharedState { // maybe share hash table with other fragment instances std::shared_ptr hash_table_variants = std::make_shared(); - // for full/right outer join - vectorized::HashTableIteratorVariants outer_join_pull_visited_iter; - vectorized::HashTableIteratorVariants probe_row_match_iter; vectorized::Sizes probe_key_sz; const std::vector build_side_child_desc; size_t build_exprs_size = 0; - std::shared_ptr> build_blocks = - std::make_shared>(); + std::shared_ptr> build_blocks = nullptr; bool probe_ignore_null = false; }; @@ -626,20 +622,31 @@ public: std::mutex buffer_mutex; std::vector> partition_sorts; std::unique_ptr previous_row = nullptr; - int sort_idx = 0; }; class PartitionSortDependency final : public WriteDependency { public: using SharedState = PartitionSortNodeSharedState; - PartitionSortDependency(int id) : WriteDependency(id, "PartitionSortDependency") {} + PartitionSortDependency(int id) : WriteDependency(id, "PartitionSortDependency"), _eos(false) {} ~PartitionSortDependency() override = default; void* shared_state() override { return (void*)&_partition_sort_state; }; void set_ready_for_write() override {} void block_writing() override {} + [[nodiscard]] Dependency* read_blocked_by() override { + if (config::enable_fuzzy_mode && !(_ready_for_read || _eos) && + _read_dependency_watcher.elapsed_time() > SLOW_DEPENDENCY_THRESHOLD) { + LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " " + << id(); + } + return _ready_for_read || _eos ? nullptr : this; + } + + void set_eos() { _eos = true; } + private: PartitionSortNodeSharedState _partition_sort_state; + std::atomic _eos; }; class AsyncWriterDependency final : public WriteDependency { diff --git a/be/src/vec/exec/join/process_hash_table_probe.h b/be/src/vec/exec/join/process_hash_table_probe.h index bf6a68f690..e20e7c8318 100644 --- a/be/src/vec/exec/join/process_hash_table_probe.h +++ b/be/src/vec/exec/join/process_hash_table_probe.h @@ -37,9 +37,9 @@ using MutableColumns = std::vector; using NullMap = ColumnUInt8::Container; using ConstNullMapPtr = const NullMap*; -template +template struct ProcessHashTableProbe { - ProcessHashTableProbe(HashJoinProbeContext* join_context, int batch_size); + ProcessHashTableProbe(Parent* parent, int batch_size); ~ProcessHashTableProbe() = default; // output build side result column @@ -78,8 +78,9 @@ struct ProcessHashTableProbe { void _emplace_element(int8_t block_offset, int32_t block_row, int& current_offset); template - HashTableType::State _init_probe_side(HashTableType& hash_table_ctx, size_t probe_rows, - bool with_other_join_conjuncts, const uint8_t* null_map); + typename HashTableType::State _init_probe_side(HashTableType& hash_table_ctx, size_t probe_rows, + bool with_other_join_conjuncts, + const uint8_t* null_map); template ForwardIterator& _probe_row_match(int& current_offset, int& probe_index, @@ -91,9 +92,9 @@ struct ProcessHashTableProbe { Status process_data_in_hashtable(HashTableType& hash_table_ctx, MutableBlock& mutable_block, Block* output_block, bool* eos); - vectorized::HashJoinProbeContext* _join_context; + Parent* _parent; const int _batch_size; - const std::vector& _build_blocks; + std::shared_ptr> _build_blocks; std::unique_ptr _arena; std::vector _probe_keys; @@ -118,6 +119,13 @@ struct ProcessHashTableProbe { int _right_col_len; int _row_count_from_last_probe; + bool _have_other_join_conjunct; + bool _is_right_semi_anti; + Sizes _probe_key_sz; + std::vector* _left_output_slot_flags; + std::vector* _right_output_slot_flags; + bool* _has_null_in_build_side; + RuntimeProfile::Counter* _rows_returned_counter; RuntimeProfile::Counter* _search_hashtable_timer; RuntimeProfile::Counter* _build_side_output_timer; diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h b/be/src/vec/exec/join/process_hash_table_probe_impl.h index 54ca59b6ea..16ef76d02c 100644 --- a/be/src/vec/exec/join/process_hash_table_probe_impl.h +++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h @@ -18,6 +18,7 @@ #pragma once #include "common/status.h" +#include "pipeline/exec/hashjoin_probe_operator.h" #include "process_hash_table_probe.h" #include "runtime/thread_context.h" // IWYU pragma: keep #include "util/simd/bits.h" @@ -27,32 +28,35 @@ namespace doris::vectorized { -template -ProcessHashTableProbe::ProcessHashTableProbe(HashJoinProbeContext* join_context, - int batch_size) - : _join_context(join_context), +template +ProcessHashTableProbe::ProcessHashTableProbe(Parent* parent, int batch_size) + : _parent(parent), _batch_size(batch_size), - _build_blocks(*join_context->_build_blocks), - _tuple_is_null_left_flags( - join_context->_is_outer_join - ? &(reinterpret_cast( - **join_context->_tuple_is_null_left_flag_column) - .get_data()) - : nullptr), - _tuple_is_null_right_flags( - join_context->_is_outer_join - ? &(reinterpret_cast( - **join_context->_tuple_is_null_right_flag_column) - .get_data()) - : nullptr), - _rows_returned_counter(join_context->_rows_returned_counter), - _search_hashtable_timer(join_context->_search_hashtable_timer), - _build_side_output_timer(join_context->_build_side_output_timer), - _probe_side_output_timer(join_context->_probe_side_output_timer), - _probe_process_hashtable_timer(join_context->_probe_process_hashtable_timer) {} + _build_blocks(parent->build_blocks()), + _tuple_is_null_left_flags(parent->is_outer_join() + ? &(reinterpret_cast( + *parent->_tuple_is_null_left_flag_column) + .get_data()) + : nullptr), + _tuple_is_null_right_flags(parent->is_outer_join() + ? &(reinterpret_cast( + *parent->_tuple_is_null_right_flag_column) + .get_data()) + : nullptr), + _have_other_join_conjunct(parent->have_other_join_conjunct()), + _is_right_semi_anti(parent->is_right_semi_anti()), + _probe_key_sz(parent->probe_key_sz()), + _left_output_slot_flags(parent->left_output_slot_flags()), + _right_output_slot_flags(parent->right_output_slot_flags()), + _has_null_in_build_side(parent->has_null_in_build_side()), + _rows_returned_counter(parent->_rows_returned_counter), + _search_hashtable_timer(parent->_search_hashtable_timer), + _build_side_output_timer(parent->_build_side_output_timer), + _probe_side_output_timer(parent->_probe_side_output_timer), + _probe_process_hashtable_timer(parent->_probe_process_hashtable_timer) {} -template -void ProcessHashTableProbe::build_side_output_column( +template +void ProcessHashTableProbe::build_side_output_column( MutableColumns& mcol, const std::vector& output_slot_flags, int size, bool have_other_join_conjunct) { SCOPED_TIMER(_build_side_output_timer); @@ -66,9 +70,9 @@ void ProcessHashTableProbe::build_side_output_column( JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType == TJoinOp::FULL_OUTER_JOIN; if (!is_semi_anti_join || have_other_join_conjunct) { - if (_build_blocks.size() == 1) { + if (_build_blocks->size() == 1) { for (int i = 0; i < _right_col_len; i++) { - auto& column = *_build_blocks[0].get_by_position(i).column; + auto& column = *(*_build_blocks)[0].get_by_position(i).column; if (output_slot_flags[i]) { mcol[i + _right_col_idx]->insert_indices_from(column, _build_block_rows.data(), _build_block_rows.data() + size); @@ -86,7 +90,7 @@ void ProcessHashTableProbe::build_side_output_column( assert_cast(mcol[i + _right_col_idx].get()) ->insert_default(); } else { - auto& column = *_build_blocks[_build_block_offsets[j]] + auto& column = *(*_build_blocks)[_build_block_offsets[j]] .get_by_position(i) .column; mcol[i + _right_col_idx]->insert_from(column, _build_block_rows[j]); @@ -101,7 +105,7 @@ void ProcessHashTableProbe::build_side_output_column( // just insert default value mcol[i + _right_col_idx]->insert_default(); } else { - auto& column = *_build_blocks[_build_block_offsets[j]] + auto& column = *(*_build_blocks)[_build_block_offsets[j]] .get_by_position(i) .column; mcol[i + _right_col_idx]->insert_from(column, _build_block_rows[j]); @@ -125,13 +129,13 @@ void ProcessHashTableProbe::build_side_output_column( } } -template -void ProcessHashTableProbe::probe_side_output_column( +template +void ProcessHashTableProbe::probe_side_output_column( MutableColumns& mcol, const std::vector& output_slot_flags, int size, int last_probe_index, size_t probe_size, bool all_match_one, bool have_other_join_conjunct) { SCOPED_TIMER(_probe_side_output_timer); - auto& probe_block = *_join_context->_probe_block; + auto& probe_block = _parent->_probe_block; for (int i = 0; i < output_slot_flags.size(); ++i) { if (output_slot_flags[i]) { auto& column = probe_block.get_by_position(i).column; @@ -152,15 +156,15 @@ void ProcessHashTableProbe::probe_side_output_column( } } -template +template template -HashTableType::State ProcessHashTableProbe::_init_probe_side( +typename HashTableType::State ProcessHashTableProbe::_init_probe_side( HashTableType& hash_table_ctx, size_t probe_rows, bool with_other_join_conjuncts, const uint8_t* null_map) { - _right_col_idx = _join_context->_is_right_semi_anti && !with_other_join_conjuncts + _right_col_idx = _is_right_semi_anti && !with_other_join_conjuncts ? 0 - : _join_context->_left_table_data_types->size(); - _right_col_len = _join_context->_right_table_data_types->size(); + : _parent->left_table_data_types().size(); + _right_col_len = _parent->right_table_data_types().size(); _row_count_from_last_probe = 0; _build_block_rows.clear(); @@ -177,24 +181,20 @@ HashTableType::State ProcessHashTableProbe::_init_probe_side( _build_block_rows.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE); _build_block_offsets.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE); - if (!*_join_context->_ready_probe) { - *_join_context->_ready_probe = true; + if (!_parent->_ready_probe) { + _parent->_ready_probe = true; hash_table_ctx.reset(); - hash_table_ctx.init_serialized_keys(*_join_context->_probe_columns, - _join_context->_probe_key_sz, probe_rows, null_map); + hash_table_ctx.init_serialized_keys(_parent->_probe_columns, _probe_key_sz, probe_rows, + null_map); } - return typename HashTableType::State(*_join_context->_probe_columns, - _join_context->_probe_key_sz); + return typename HashTableType::State(_parent->_probe_columns, _probe_key_sz); } -template +template template -ForwardIterator& ProcessHashTableProbe::_probe_row_match(int& current_offset, - int& probe_index, - size_t& probe_size, - bool& all_match_one) { - auto& probe_row_match_iter = - std::get>(*_join_context->_probe_row_match_iter); +ForwardIterator& ProcessHashTableProbe::_probe_row_match( + int& current_offset, int& probe_index, size_t& probe_size, bool& all_match_one) { + auto& probe_row_match_iter = std::get>(_parent->_probe_row_match_iter); if (!probe_row_match_iter.ok()) { return probe_row_match_iter; } @@ -219,22 +219,24 @@ ForwardIterator& ProcessHashTableProbe::_probe_row_match(int return probe_row_match_iter; } -template -void ProcessHashTableProbe::_emplace_element(int8_t block_offset, int32_t block_row, - int& current_offset) { +template +void ProcessHashTableProbe::_emplace_element(int8_t block_offset, + int32_t block_row, + int& current_offset) { _build_block_offsets.emplace_back(block_offset); _build_block_rows.emplace_back(block_row); current_offset++; } -template +template template -Status ProcessHashTableProbe::do_process(HashTableType& hash_table_ctx, - ConstNullMapPtr null_map, - MutableBlock& mutable_block, - Block* output_block, size_t probe_rows) { - auto& probe_index = *_join_context->_probe_index; +Status ProcessHashTableProbe::do_process(HashTableType& hash_table_ctx, + ConstNullMapPtr null_map, + MutableBlock& mutable_block, + Block* output_block, + size_t probe_rows) { + auto& probe_index = _parent->_probe_index; using KeyGetter = typename HashTableType::State; using Mapped = typename HashTableType::Mapped; @@ -318,9 +320,8 @@ Status ProcessHashTableProbe::do_process(HashTableType& hash_table_c (JoinOpType != TJoinOp::LEFT_SEMI_JOIN) ^ find_result.is_found(); if constexpr (is_mark_join) { ++current_offset; - bool null_result = - (need_null_map_for_probe && (*null_map)[probe_index]) || - (!need_go_ahead && _join_context->_has_null_value_in_build_side); + bool null_result = (need_null_map_for_probe && (*null_map)[probe_index]) || + (!need_go_ahead && *_has_null_in_build_side); if (null_result) { mark_column->insert_null(); } else { @@ -401,14 +402,13 @@ Status ProcessHashTableProbe::do_process(HashTableType& hash_table_c probe_size = probe_index - last_probe_index + probe_row_match_iter.ok(); } - build_side_output_column(mcol, *_join_context->_right_output_slot_flags, current_offset, - with_other_conjuncts); + build_side_output_column(mcol, *_right_output_slot_flags, current_offset, with_other_conjuncts); if constexpr (with_other_conjuncts || (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN && JoinOpType != TJoinOp::RIGHT_ANTI_JOIN)) { RETURN_IF_CATCH_EXCEPTION(probe_side_output_column( - mcol, *_join_context->_left_output_slot_flags, current_offset, last_probe_index, - probe_size, all_match_one, with_other_conjuncts)); + mcol, *_left_output_slot_flags, current_offset, last_probe_index, probe_size, + all_match_one, with_other_conjuncts)); } output_block->swap(mutable_block.to_block()); @@ -421,8 +421,8 @@ Status ProcessHashTableProbe::do_process(HashTableType& hash_table_c return Status::OK(); } -template -Status ProcessHashTableProbe::do_other_join_conjuncts( +template +Status ProcessHashTableProbe::do_other_join_conjuncts( Block* output_block, bool is_mark_join, int multi_matched_output_row_count, bool is_the_last_sub_block) { // dispose the other join conjunct exec @@ -431,14 +431,14 @@ Status ProcessHashTableProbe::do_other_join_conjuncts( return Status::OK(); } - SCOPED_TIMER(_join_context->_process_other_join_conjunct_timer); + SCOPED_TIMER(_parent->_process_other_join_conjunct_timer); int orig_columns = output_block->columns(); IColumn::Filter other_conjunct_filter(row_count, 1); { bool can_be_filter_all = false; - RETURN_IF_ERROR(VExprContext::execute_conjuncts( - *_join_context->_other_join_conjuncts, nullptr, output_block, - &other_conjunct_filter, &can_be_filter_all)); + RETURN_IF_ERROR(VExprContext::execute_conjuncts(_parent->_other_join_conjuncts, nullptr, + output_block, &other_conjunct_filter, + &can_be_filter_all)); } auto filter_column = ColumnUInt8::create(); @@ -465,7 +465,7 @@ Status ProcessHashTableProbe::do_other_join_conjuncts( null_map_data, filter_map, output_block); // This is the last sub block of splitted block, and no equal-conjuncts-matched tuple // is output in all sub blocks, need to output a tuple for this probe row - if (is_the_last_sub_block && !*_join_context->_is_any_probe_match_row_output) { + if (is_the_last_sub_block && !_parent->_is_any_probe_match_row_output) { filter_map[0] = true; null_map_data[0] = true; } @@ -514,7 +514,7 @@ Status ProcessHashTableProbe::do_other_join_conjuncts( // It contains the first sub block of splited equal-conjuncts-matched tuples of the current probe row if (multi_matched_output_row_count > 0) { - *_join_context->_is_any_probe_match_row_output = false; + _parent->_is_any_probe_match_row_output = false; _process_splited_equal_matched_tuples(row_count - multi_matched_output_row_count, multi_matched_output_row_count, filter_column_ptr, null_map_data, filter_map, output_block); @@ -534,7 +534,7 @@ Status ProcessHashTableProbe::do_other_join_conjuncts( size_t start_row_idx = 1; // We are handling euqual-conjuncts matched tuples that are splitted into multiple blocks if (_row_count_from_last_probe > 0) { - if (*_join_context->_is_any_probe_match_row_output) { + if (_parent->_is_any_probe_match_row_output) { // if any matched tuple for this probe row is output, // ignore all the following tuples for this probe row. for (int row_idx = 0; row_idx < _row_count_from_last_probe; ++row_idx) { @@ -563,14 +563,13 @@ Status ProcessHashTableProbe::do_other_join_conjuncts( if (multi_matched_output_row_count > 0) { // If a matched row is output, all the equal-matched tuples in // the following sub blocks should be ignored - *_join_context->_is_any_probe_match_row_output = filter_map[row_count - 1]; - } else if (_row_count_from_last_probe > 0 && - !*_join_context->_is_any_probe_match_row_output) { + _parent->_is_any_probe_match_row_output = filter_map[row_count - 1]; + } else if (_row_count_from_last_probe > 0 && !_parent->_is_any_probe_match_row_output) { // We are handling euqual-conjuncts matched tuples that are splitted into multiple blocks, // and no matched tuple has been output in all previous run. // If a tuple is output in this run, all the following mathced tuples should be ignored if (filter_map[_row_count_from_last_probe - 1]) { - *_join_context->_is_any_probe_match_row_output = true; + _parent->_is_any_probe_match_row_output = true; } } @@ -609,7 +608,7 @@ Status ProcessHashTableProbe::do_other_join_conjuncts( size_t start_row_idx = 1; // We are handling euqual-conjuncts matched tuples that are splitted into multiple blocks - if (_row_count_from_last_probe > 0 && *_join_context->_is_any_probe_match_row_output) { + if (_row_count_from_last_probe > 0 && _parent->_is_any_probe_match_row_output) { // if any matched tuple for this probe row is output, // ignore all the following tuples for this probe row. for (int row_idx = 0; row_idx < _row_count_from_last_probe; ++row_idx) { @@ -658,15 +657,15 @@ Status ProcessHashTableProbe::do_other_join_conjuncts( int end_row_idx = 0; if (_row_count_from_last_probe > 0) { end_row_idx = row_count - multi_matched_output_row_count; - if (!*_join_context->_is_any_probe_match_row_output) { + if (!_parent->_is_any_probe_match_row_output) { // We are handling euqual-conjuncts matched tuples that are splitted into multiple blocks, // and no matched tuple has been output in all previous run. // If a tuple is output in this run, all the following mathced tuples should be ignored if (filter_map[_row_count_from_last_probe - 1]) { - *_join_context->_is_any_probe_match_row_output = true; + _parent->_is_any_probe_match_row_output = true; filter_map[_row_count_from_last_probe - 1] = false; } - if (is_the_last_sub_block && !*_join_context->_is_any_probe_match_row_output) { + if (is_the_last_sub_block && !_parent->_is_any_probe_match_row_output) { // This is the last sub block of splitted block, and no equal-conjuncts-matched tuple // is output in all sub blocks, output a tuple for this probe row filter_map[0] = true; @@ -676,7 +675,7 @@ Status ProcessHashTableProbe::do_other_join_conjuncts( // It contains the first sub block of splited equal-conjuncts-matched tuples of the current probe row // If a matched row is output, all the equal-matched tuples in // the following sub blocks should be ignored - *_join_context->_is_any_probe_match_row_output = filter_map[row_count - 1]; + _parent->_is_any_probe_match_row_output = filter_map[row_count - 1]; filter_map[row_count - 1] = false; } } else if (multi_matched_output_row_count > 0) { @@ -684,7 +683,7 @@ Status ProcessHashTableProbe::do_other_join_conjuncts( // It contains the first sub block of splited equal-conjuncts-matched tuples of the current probe row // If a matched row is output, all the equal-matched tuples in // the following sub blocks should be ignored - *_join_context->_is_any_probe_match_row_output = filter_map[row_count - 1]; + _parent->_is_any_probe_match_row_output = filter_map[row_count - 1]; filter_map[row_count - 1] = false; } else { end_row_idx = row_count; @@ -744,8 +743,8 @@ Status ProcessHashTableProbe::do_other_join_conjuncts( // and when processing the last sub block, check whether there are any // equal-conjuncts-matched tuple is output in all sub blocks, // if not, just pick a tuple and output. -template -void ProcessHashTableProbe::_process_splited_equal_matched_tuples( +template +void ProcessHashTableProbe::_process_splited_equal_matched_tuples( int start_row_idx, int row_count, const UInt8* __restrict other_hit_column, UInt8* __restrict null_map_data, UInt8* __restrict filter_map, Block* output_block) { int end_row_idx = start_row_idx + row_count; @@ -770,16 +769,15 @@ void ProcessHashTableProbe::_process_splited_equal_matched_tuples( *_visited_map[i] |= other_hit; } } - *_join_context->_is_any_probe_match_row_output |= + _parent->_is_any_probe_match_row_output |= simd::contain_byte(filter_map + start_row_idx, row_count, 1); } -template +template template -Status ProcessHashTableProbe::process_data_in_hashtable(HashTableType& hash_table_ctx, - MutableBlock& mutable_block, - Block* output_block, - bool* eos) { +Status ProcessHashTableProbe::process_data_in_hashtable( + HashTableType& hash_table_ctx, MutableBlock& mutable_block, Block* output_block, + bool* eos) { using Mapped = typename HashTableType::Mapped; SCOPED_TIMER(_probe_process_hashtable_timer); if constexpr (std::is_same_v || @@ -787,16 +785,15 @@ Status ProcessHashTableProbe::process_data_in_hashtable(HashTableTyp hash_table_ctx.init_iterator(); auto& mcol = mutable_block.mutable_columns(); - bool right_semi_anti_without_other = - _join_context->_is_right_semi_anti && !_join_context->_have_other_join_conjunct; + bool right_semi_anti_without_other = _is_right_semi_anti && !_have_other_join_conjunct; int right_col_idx = - right_semi_anti_without_other ? 0 : _join_context->_left_table_data_types->size(); - int right_col_len = _join_context->_right_table_data_types->size(); + right_semi_anti_without_other ? 0 : _parent->left_table_data_types().size(); + int right_col_len = _parent->right_table_data_types().size(); auto& iter = hash_table_ctx.iterator; auto block_size = 0; auto& visited_iter = - std::get>(*_join_context->_outer_join_pull_visited_iter); + std::get>(_parent->_outer_join_pull_visited_iter); _build_blocks_locs.resize(_batch_size); auto register_build_loc = [&](int8_t offset, int32_t row_nums) { _build_blocks_locs[block_size++] = std::pair(offset, row_nums); @@ -874,13 +871,13 @@ Status ProcessHashTableProbe::process_data_in_hashtable(HashTableTyp auto insert_build_rows = [&](int8_t offset) { for (size_t j = 0; j < right_col_len; ++j) { - auto& column = *_build_blocks[offset].get_by_position(j).column; + auto& column = *(*_build_blocks)[offset].get_by_position(j).column; mcol[j + right_col_idx]->insert_indices_from( column, _build_block_rows.data(), _build_block_rows.data() + _build_block_rows.size()); } }; - if (_build_blocks.size() > 1) { + if (_build_blocks->size() > 1) { std::sort(_build_blocks_locs.begin(), _build_blocks_locs.end(), [](const auto a, const auto b) { return a.first > b.first; }); auto start = 0, end = 0; @@ -897,7 +894,7 @@ Status ProcessHashTableProbe::process_data_in_hashtable(HashTableTyp start = end; insert_build_rows(offset); } - } else if (_build_blocks.size() == 1) { + } else if (_build_blocks->size() == 1) { const auto size = _build_blocks_locs.size(); _build_block_rows.resize(_build_blocks_locs.size()); for (int i = 0; i < size; i++) { @@ -907,7 +904,7 @@ Status ProcessHashTableProbe::process_data_in_hashtable(HashTableTyp } // just resize the left table column in case with other conjunct to make block size is not zero - if (_join_context->_is_right_semi_anti && _join_context->_have_other_join_conjunct) { + if (_is_right_semi_anti && _have_other_join_conjunct) { auto target_size = mcol[right_col_idx]->size(); for (int i = 0; i < right_col_idx; ++i) { mcol[i]->resize(target_size); @@ -933,13 +930,11 @@ Status ProcessHashTableProbe::process_data_in_hashtable(HashTableTyp } } -template +template template -Status ProcessHashTableProbe::process(HashTableType& hash_table_ctx, - ConstNullMapPtr null_map, - MutableBlock& mutable_block, Block* output_block, - size_t probe_rows, bool is_mark_join, - bool have_other_join_conjunct) { +Status ProcessHashTableProbe::process( + HashTableType& hash_table_ctx, ConstNullMapPtr null_map, MutableBlock& mutable_block, + Block* output_block, size_t probe_rows, bool is_mark_join, bool have_other_join_conjunct) { Status res; if constexpr (!std::is_same_v) { if (have_other_join_conjunct) { @@ -973,74 +968,78 @@ struct ExtractType { using Type = U; }; -#define INSTANTIATION(JoinOpType, T) \ - template Status \ - ProcessHashTableProbe::process::Type>( \ - ExtractType::Type & hash_table_ctx, ConstNullMapPtr null_map, \ - MutableBlock & mutable_block, Block * output_block, size_t probe_rows, \ - bool is_mark_join, bool have_other_join_conjunct); \ - template Status \ - ProcessHashTableProbe::process::Type>( \ - ExtractType::Type & hash_table_ctx, ConstNullMapPtr null_map, \ - MutableBlock & mutable_block, Block * output_block, size_t probe_rows, \ - bool is_mark_join, bool have_other_join_conjunct); \ - template Status \ - ProcessHashTableProbe::process::Type>( \ - ExtractType::Type & hash_table_ctx, ConstNullMapPtr null_map, \ - MutableBlock & mutable_block, Block * output_block, size_t probe_rows, \ - bool is_mark_join, bool have_other_join_conjunct); \ - template Status \ - ProcessHashTableProbe::process::Type>( \ - ExtractType::Type & hash_table_ctx, ConstNullMapPtr null_map, \ - MutableBlock & mutable_block, Block * output_block, size_t probe_rows, \ - bool is_mark_join, bool have_other_join_conjunct); \ - \ - template Status \ - ProcessHashTableProbe::process_data_in_hashtable::Type>( \ - ExtractType::Type & hash_table_ctx, MutableBlock & mutable_block, \ - Block * output_block, bool* eos) +#define INSTANTIATION(JoinOpType, Parent, T) \ + template Status \ + ProcessHashTableProbe::process::Type>( \ + ExtractType::Type & hash_table_ctx, ConstNullMapPtr null_map, \ + MutableBlock & mutable_block, Block * output_block, size_t probe_rows, \ + bool is_mark_join, bool have_other_join_conjunct); \ + template Status \ + ProcessHashTableProbe::process::Type>( \ + ExtractType::Type & hash_table_ctx, ConstNullMapPtr null_map, \ + MutableBlock & mutable_block, Block * output_block, size_t probe_rows, \ + bool is_mark_join, bool have_other_join_conjunct); \ + template Status \ + ProcessHashTableProbe::process::Type>( \ + ExtractType::Type & hash_table_ctx, ConstNullMapPtr null_map, \ + MutableBlock & mutable_block, Block * output_block, size_t probe_rows, \ + bool is_mark_join, bool have_other_join_conjunct); \ + template Status \ + ProcessHashTableProbe::process::Type>( \ + ExtractType::Type & hash_table_ctx, ConstNullMapPtr null_map, \ + MutableBlock & mutable_block, Block * output_block, size_t probe_rows, \ + bool is_mark_join, bool have_other_join_conjunct); \ + \ + template Status ProcessHashTableProbe::process_data_in_hashtable< \ + ExtractType::Type>(ExtractType::Type & hash_table_ctx, \ + MutableBlock & mutable_block, Block * output_block, \ + bool* eos) -#define INSTANTIATION_FOR(JoinOpType) \ - template struct ProcessHashTableProbe; \ - \ - INSTANTIATION(JoinOpType, (SerializedHashTableContext)); \ - INSTANTIATION(JoinOpType, (I8HashTableContext)); \ - INSTANTIATION(JoinOpType, (I16HashTableContext)); \ - INSTANTIATION(JoinOpType, (I32HashTableContext)); \ - INSTANTIATION(JoinOpType, (I64HashTableContext)); \ - INSTANTIATION(JoinOpType, (I128HashTableContext)); \ - INSTANTIATION(JoinOpType, (I256HashTableContext)); \ - INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext)); \ - INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext)); \ - INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext)); \ - INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext)); \ - INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext)); \ - INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext)); \ - INSTANTIATION(JoinOpType, (SerializedHashTableContext)); \ - INSTANTIATION(JoinOpType, (I8HashTableContext)); \ - INSTANTIATION(JoinOpType, (I16HashTableContext)); \ - INSTANTIATION(JoinOpType, (I32HashTableContext)); \ - INSTANTIATION(JoinOpType, (I64HashTableContext)); \ - INSTANTIATION(JoinOpType, (I128HashTableContext)); \ - INSTANTIATION(JoinOpType, (I256HashTableContext)); \ - INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext)); \ - INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext)); \ - INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext)); \ - INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext)); \ - INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext)); \ - INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext)); \ - INSTANTIATION(JoinOpType, (SerializedHashTableContext)); \ - INSTANTIATION(JoinOpType, (I8HashTableContext)); \ - INSTANTIATION(JoinOpType, (I16HashTableContext)); \ - INSTANTIATION(JoinOpType, (I32HashTableContext)); \ - INSTANTIATION(JoinOpType, (I64HashTableContext)); \ - INSTANTIATION(JoinOpType, (I128HashTableContext)); \ - INSTANTIATION(JoinOpType, (I256HashTableContext)); \ - INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext)); \ - INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext)); \ - INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext)); \ - INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext)); \ - INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext)); \ - INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext)) +#define INSTANTIATION_FOR1(JoinOpType, Parent) \ + template struct ProcessHashTableProbe; \ + \ + INSTANTIATION(JoinOpType, Parent, (SerializedHashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I8HashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I16HashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I32HashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I64HashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I128HashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I256HashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I64FixedKeyHashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I64FixedKeyHashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I128FixedKeyHashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I128FixedKeyHashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I256FixedKeyHashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I256FixedKeyHashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (SerializedHashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I8HashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I16HashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I32HashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I64HashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I128HashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I256HashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I64FixedKeyHashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I64FixedKeyHashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I128FixedKeyHashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I128FixedKeyHashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I256FixedKeyHashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I256FixedKeyHashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (SerializedHashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I8HashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I16HashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I32HashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I64HashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I128HashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I256HashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I64FixedKeyHashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I64FixedKeyHashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I128FixedKeyHashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I128FixedKeyHashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I256FixedKeyHashTableContext)); \ + INSTANTIATION(JoinOpType, Parent, (I256FixedKeyHashTableContext)) + +#define INSTANTIATION_FOR(JoinOpType) \ + INSTANTIATION_FOR1(JoinOpType, HashJoinNode); \ + INSTANTIATION_FOR1(JoinOpType, pipeline::HashJoinProbeLocalState) } // namespace doris::vectorized diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index e3647762d4..f48a7e278f 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -84,120 +84,6 @@ template Status HashJoinNode::_extract_join_column( std::vector>&, std::vector> const&); -RuntimeFilterContext::RuntimeFilterContext(HashJoinNode* join_node) - : _runtime_filter_descs(join_node->_runtime_filter_descs), - _runtime_filter_slots(join_node->_runtime_filter_slots), - _build_expr_ctxs(join_node->_build_expr_ctxs), - _build_rf_cardinality(join_node->_build_rf_cardinality), - _inserted_rows(join_node->_inserted_rows), - _push_down_timer(join_node->_push_down_timer), - _push_compute_timer(join_node->_push_compute_timer) {} - -RuntimeFilterContext::RuntimeFilterContext(pipeline::HashJoinBuildSinkLocalState* local_state) - : _runtime_filter_descs(local_state->join_build()->_runtime_filter_descs), - _runtime_filter_slots(local_state->_runtime_filter_slots), - _build_expr_ctxs(local_state->_build_expr_ctxs), - _build_rf_cardinality(local_state->_build_rf_cardinality), - _inserted_rows(local_state->_inserted_rows), - _push_down_timer(local_state->_push_down_timer), - _push_compute_timer(local_state->_push_compute_timer) {} - -HashJoinProbeContext::HashJoinProbeContext(HashJoinNode* join_node) - : _have_other_join_conjunct(join_node->_have_other_join_conjunct), - _is_right_semi_anti(join_node->_is_right_semi_anti), - _is_outer_join(join_node->_is_outer_join), - _tuple_is_null_left_flag_column(&join_node->_tuple_is_null_left_flag_column), - _tuple_is_null_right_flag_column(&join_node->_tuple_is_null_right_flag_column), - _other_join_conjuncts(&join_node->_other_join_conjuncts), - _right_table_data_types(&join_node->_right_table_data_types), - _left_table_data_types(&join_node->_left_table_data_types), - _search_hashtable_timer(join_node->_search_hashtable_timer), - _build_side_output_timer(join_node->_build_side_output_timer), - _probe_side_output_timer(join_node->_probe_side_output_timer), - _probe_process_hashtable_timer(join_node->_probe_process_hashtable_timer), - _process_other_join_conjunct_timer(join_node->_process_other_join_conjunct_timer), - _rows_returned_counter(join_node->_rows_returned_counter), - _probe_arena_memory_usage(join_node->_probe_arena_memory_usage), - _arena(join_node->_arena), - _outer_join_pull_visited_iter(&join_node->_outer_join_pull_visited_iter), - _probe_row_match_iter(&join_node->_probe_row_match_iter), - _build_blocks(join_node->_build_blocks), - _probe_block(&join_node->_probe_block), - _probe_columns(&join_node->_probe_columns), - _probe_index(&join_node->_probe_index), - _ready_probe(&join_node->_ready_probe), - _probe_key_sz(join_node->_probe_key_sz), - _left_output_slot_flags(&join_node->_left_output_slot_flags), - _right_output_slot_flags(&join_node->_right_output_slot_flags), - _is_any_probe_match_row_output(&join_node->_is_any_probe_match_row_output), - _has_null_value_in_build_side(join_node->_has_null_in_build_side) {} - -HashJoinProbeContext::HashJoinProbeContext(pipeline::HashJoinProbeLocalState* local_state) - : _have_other_join_conjunct(local_state->join_probe()->_have_other_join_conjunct), - _is_right_semi_anti(local_state->join_probe()->_is_right_semi_anti), - _is_outer_join(local_state->join_probe()->_is_outer_join), - _tuple_is_null_left_flag_column(&local_state->_tuple_is_null_left_flag_column), - _tuple_is_null_right_flag_column(&local_state->_tuple_is_null_right_flag_column), - _other_join_conjuncts(&local_state->_other_join_conjuncts), - _right_table_data_types(&local_state->join_probe()->_right_table_data_types), - _left_table_data_types(&local_state->join_probe()->_left_table_data_types), - _search_hashtable_timer(local_state->_search_hashtable_timer), - _build_side_output_timer(local_state->_build_side_output_timer), - _probe_side_output_timer(local_state->_probe_side_output_timer), - _probe_process_hashtable_timer(local_state->_probe_process_hashtable_timer), - _process_other_join_conjunct_timer(local_state->_process_other_join_conjunct_timer), - _rows_returned_counter(local_state->_rows_returned_counter), - _probe_arena_memory_usage(local_state->_probe_arena_memory_usage), - _arena(local_state->_shared_state->arena), - _outer_join_pull_visited_iter(&local_state->_shared_state->outer_join_pull_visited_iter), - _probe_row_match_iter(&local_state->_shared_state->probe_row_match_iter), - _build_blocks(local_state->_shared_state->build_blocks), - _probe_block(&local_state->_probe_block), - _probe_columns(&local_state->_probe_columns), - _probe_index(&local_state->_probe_index), - _ready_probe(&local_state->_ready_probe), - _probe_key_sz(local_state->_shared_state->probe_key_sz), - _left_output_slot_flags(&local_state->join_probe()->_left_output_slot_flags), - _right_output_slot_flags(&local_state->join_probe()->_right_output_slot_flags), - _is_any_probe_match_row_output(&local_state->_is_any_probe_match_row_output), - _has_null_value_in_build_side(local_state->_shared_state->_has_null_in_build_side) {} - -HashJoinBuildContext::HashJoinBuildContext(HashJoinNode* join_node) - : _hash_table_memory_usage(join_node->_hash_table_memory_usage), - _build_buckets_counter(join_node->_build_buckets_counter), - _build_collisions_counter(join_node->_build_collisions_counter), - _build_buckets_fill_counter(join_node->_build_buckets_fill_counter), - _build_table_insert_timer(join_node->_build_table_insert_timer), - _build_table_expanse_timer(join_node->_build_table_expanse_timer), - _build_table_convert_timer(join_node->_build_table_convert_timer), - _build_side_compute_hash_timer(join_node->_build_side_compute_hash_timer), - _build_arena_memory_usage(join_node->_build_arena_memory_usage), - _profile(join_node->runtime_profile()), - _build_key_sz(join_node->_build_key_sz), - _build_unique(join_node->_build_unique), - _runtime_filter_descs(join_node->_runtime_filter_descs), - _inserted_rows(join_node->_inserted_rows), - _arena(join_node->_arena), - _build_rf_cardinality(join_node->_build_rf_cardinality) {} - -HashJoinBuildContext::HashJoinBuildContext(pipeline::HashJoinBuildSinkLocalState* local_state) - : _hash_table_memory_usage(local_state->_hash_table_memory_usage), - _build_buckets_counter(local_state->_build_buckets_counter), - _build_collisions_counter(local_state->_build_collisions_counter), - _build_buckets_fill_counter(local_state->_build_buckets_fill_counter), - _build_table_insert_timer(local_state->_build_table_insert_timer), - _build_table_expanse_timer(local_state->_build_table_expanse_timer), - _build_table_convert_timer(local_state->_build_table_convert_timer), - _build_side_compute_hash_timer(local_state->_build_side_compute_hash_timer), - _build_arena_memory_usage(local_state->_build_arena_memory_usage), - _profile(local_state->profile()), - _build_key_sz(local_state->join_build()->_build_key_sz), - _build_unique(local_state->join_build()->_build_unique), - _runtime_filter_descs(local_state->join_build()->_runtime_filter_descs), - _inserted_rows(local_state->_inserted_rows), - _arena(local_state->_shared_state->arena), - _build_rf_cardinality(local_state->_build_rf_cardinality) {} - HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : VJoinNodeBase(pool, tnode, descs), _is_broadcast_join(tnode.hash_join_node.__isset.is_broadcast_join && @@ -896,11 +782,8 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc __builtin_unreachable(); }, [&](auto&& arg) -> Status { - using HashTableCtxType = std::decay_t; - RuntimeFilterContext context(this); - ProcessRuntimeFilterBuild - runtime_filter_build_process(&context); - return runtime_filter_build_process(state, arg); + ProcessRuntimeFilterBuild runtime_filter_build_process; + return runtime_filter_build_process(state, arg, this); }}, *_hash_table_variants); if (!ret.ok()) { @@ -1119,10 +1002,9 @@ Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block, uin [&](auto&& arg, auto has_null_value, auto short_circuit_for_null_in_build_side) -> Status { using HashTableCtxType = std::decay_t; - HashJoinBuildContext context(this); - ProcessHashTableBuild hash_table_build_process( - rows, block, raw_ptrs, &context, state->batch_size(), offset, - state); + ProcessHashTableBuild + hash_table_build_process(rows, block, raw_ptrs, this, + state->batch_size(), offset, state); return hash_table_build_process .template run( arg, @@ -1277,9 +1159,9 @@ void HashJoinNode::_process_hashtable_ctx_variants_init(RuntimeState* state) { std::visit( [&](auto&& join_op_variants) { using JoinOpType = std::decay_t; - _probe_context.reset(new HashJoinProbeContext(this)); - _process_hashtable_ctx_variants->emplace>( - _probe_context.get(), state->batch_size()); + _process_hashtable_ctx_variants + ->emplace>( + this, state->batch_size()); }, _join_op_variants); } diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index 19ddfc4626..b9264bc145 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -69,99 +69,52 @@ constexpr size_t CHECK_FRECUENCY = 65536; struct UInt128; struct UInt256; -template +template struct ProcessHashTableProbe; class HashJoinNode; -struct RuntimeFilterContext { - RuntimeFilterContext(HashJoinNode* join_node); - RuntimeFilterContext(pipeline::HashJoinBuildSinkLocalState* local_state); - std::vector& _runtime_filter_descs; - std::shared_ptr& _runtime_filter_slots; - VExprContextSPtrs& _build_expr_ctxs; - size_t& _build_rf_cardinality; - std::unordered_map>& _inserted_rows; - RuntimeProfile::Counter* _push_down_timer; - RuntimeProfile::Counter* _push_compute_timer; -}; - -template struct ProcessRuntimeFilterBuild { - ProcessRuntimeFilterBuild(RuntimeFilterContext* context) : _context(context) {} - - Status operator()(RuntimeState* state, HashTableContext& hash_table_ctx) { - if (_context->_runtime_filter_descs.empty()) { + template + Status operator()(RuntimeState* state, HashTableContext& hash_table_ctx, Parent* parent) { + if (parent->runtime_filter_descs().empty()) { return Status::OK(); } - _context->_runtime_filter_slots = std::make_shared( - _context->_build_expr_ctxs, _context->_runtime_filter_descs); + parent->_runtime_filter_slots = std::make_shared( + parent->_build_expr_ctxs, parent->runtime_filter_descs()); - RETURN_IF_ERROR(_context->_runtime_filter_slots->init( - state, hash_table_ctx.hash_table->size(), _context->_build_rf_cardinality)); + RETURN_IF_ERROR(parent->_runtime_filter_slots->init( + state, hash_table_ctx.hash_table->size(), parent->_build_rf_cardinality)); - if (!_context->_runtime_filter_slots->empty() && !_context->_inserted_rows.empty()) { + if (!parent->_runtime_filter_slots->empty() && !parent->_inserted_rows.empty()) { { - SCOPED_TIMER(_context->_push_compute_timer); - _context->_runtime_filter_slots->insert(_context->_inserted_rows); + SCOPED_TIMER(parent->_push_compute_timer); + parent->_runtime_filter_slots->insert(parent->_inserted_rows); } } { - SCOPED_TIMER(_context->_push_down_timer); - RETURN_IF_ERROR(_context->_runtime_filter_slots->publish()); + SCOPED_TIMER(parent->_push_down_timer); + RETURN_IF_ERROR(parent->_runtime_filter_slots->publish()); } return Status::OK(); } - -private: - RuntimeFilterContext* _context; -}; - -struct HashJoinBuildContext { - HashJoinBuildContext(HashJoinNode* join_node); - HashJoinBuildContext(pipeline::HashJoinBuildSinkLocalState* local_state); - - void add_hash_buckets_info(const std::string& info) const { - _profile->add_info_string("HashTableBuckets", info); - } - void add_hash_buckets_filled_info(const std::string& info) const { - _profile->add_info_string("HashTableFilledBuckets", info); - } - RuntimeProfile::Counter* _hash_table_memory_usage; - RuntimeProfile::Counter* _build_buckets_counter; - RuntimeProfile::Counter* _build_collisions_counter; - RuntimeProfile::Counter* _build_buckets_fill_counter; - RuntimeProfile::Counter* _build_table_insert_timer; - RuntimeProfile::Counter* _build_table_expanse_timer; - RuntimeProfile::Counter* _build_table_convert_timer; - RuntimeProfile::Counter* _build_side_compute_hash_timer; - RuntimeProfile::HighWaterMarkCounter* _build_arena_memory_usage; - RuntimeProfile* _profile; - - Sizes& _build_key_sz; - bool& _build_unique; - std::vector& _runtime_filter_descs; - std::unordered_map>& _inserted_rows; - std::shared_ptr& _arena; - size_t& _build_rf_cardinality; }; using ProfileCounter = RuntimeProfile::Counter; -template +template struct ProcessHashTableBuild { ProcessHashTableBuild(int rows, Block& acquired_block, ColumnRawPtrs& build_raw_ptrs, - HashJoinBuildContext* join_context, int batch_size, uint8_t offset, - RuntimeState* state) + Parent* parent, int batch_size, uint8_t offset, RuntimeState* state) : _rows(rows), _skip_rows(0), _acquired_block(acquired_block), _build_raw_ptrs(build_raw_ptrs), - _join_context(join_context), + _parent(parent), _batch_size(batch_size), _offset(offset), _state(state), - _build_side_compute_hash_timer(join_context->_build_side_compute_hash_timer) {} + _build_side_compute_hash_timer(parent->_build_side_compute_hash_timer) {} template Status run(HashTableContext& hash_table_ctx, ConstNullMapPtr null_map, bool* has_null_key) { @@ -172,30 +125,30 @@ struct ProcessHashTableBuild { int64_t bucket_size = hash_table_ctx.hash_table->get_buffer_size_in_cells(); int64_t filled_bucket_size = hash_table_ctx.hash_table->size(); int64_t bucket_bytes = hash_table_ctx.hash_table->get_buffer_size_in_bytes(); - COUNTER_SET(_join_context->_hash_table_memory_usage, bucket_bytes); - COUNTER_SET(_join_context->_build_buckets_counter, bucket_size); - COUNTER_SET(_join_context->_build_collisions_counter, + COUNTER_SET(_parent->_hash_table_memory_usage, bucket_bytes); + COUNTER_SET(_parent->_build_buckets_counter, bucket_size); + COUNTER_SET(_parent->_build_collisions_counter, hash_table_ctx.hash_table->get_collisions()); - COUNTER_SET(_join_context->_build_buckets_fill_counter, filled_bucket_size); + COUNTER_SET(_parent->_build_buckets_fill_counter, filled_bucket_size); auto hash_table_buckets = hash_table_ctx.hash_table->get_buffer_sizes_in_cells(); std::string hash_table_buckets_info; for (auto bucket_count : hash_table_buckets) { hash_table_buckets_info += std::to_string(bucket_count) + ", "; } - _join_context->add_hash_buckets_info(hash_table_buckets_info); + _parent->add_hash_buckets_info(hash_table_buckets_info); auto hash_table_sizes = hash_table_ctx.hash_table->sizes(); hash_table_buckets_info.clear(); for (auto table_size : hash_table_sizes) { hash_table_buckets_info += std::to_string(table_size) + ", "; } - _join_context->add_hash_buckets_filled_info(hash_table_buckets_info); + _parent->add_hash_buckets_filled_info(hash_table_buckets_info); }}; - KeyGetter key_getter(_build_raw_ptrs, _join_context->_build_key_sz); + KeyGetter key_getter(_build_raw_ptrs, _parent->build_key_sz()); - SCOPED_TIMER(_join_context->_build_table_insert_timer); + SCOPED_TIMER(_parent->_build_table_insert_timer); hash_table_ctx.hash_table->reset_resize_timer(); // only not build_unique, we need expanse hash table before insert data @@ -204,21 +157,21 @@ struct ProcessHashTableBuild { // 2. There are many duplicate keys, and the hash table filled bucket is far less than // the hash table build bucket, which may waste a lot of memory. // TODO, use the NDV expansion of the key column in the optimizer statistics - if (!_join_context->_build_unique) { + if (!_parent->build_unique()) { RETURN_IF_CATCH_EXCEPTION(hash_table_ctx.hash_table->expanse_for_add_elem( std::min(_rows, config::hash_table_pre_expanse_max_rows))); } - vector& inserted_rows = _join_context->_inserted_rows[&_acquired_block]; - bool has_runtime_filter = !_join_context->_runtime_filter_descs.empty(); + vector& inserted_rows = _parent->_inserted_rows[&_acquired_block]; + bool has_runtime_filter = !_parent->runtime_filter_descs().empty(); if (has_runtime_filter) { inserted_rows.reserve(_batch_size); } - hash_table_ctx.init_serialized_keys(_build_raw_ptrs, _join_context->_build_key_sz, _rows, + hash_table_ctx.init_serialized_keys(_build_raw_ptrs, _parent->build_key_sz(), _rows, null_map ? null_map->data() : nullptr); - auto& arena = *(_join_context->_arena); + auto& arena = *_parent->arena(); auto old_build_arena_memory = arena.size(); size_t k = 0; @@ -229,7 +182,7 @@ struct ProcessHashTableBuild { ctor(key, Mapped {k, _offset}); }; - bool build_unique = _join_context->_build_unique; + bool build_unique = _parent->build_unique(); #define EMPLACE_IMPL(stmt) \ for (; k < _rows; ++k) { \ if (k % CHECK_FRECUENCY == 0) { \ @@ -258,21 +211,21 @@ struct ProcessHashTableBuild { } else if (has_runtime_filter && !build_unique) { EMPLACE_IMPL( if (inserted) { inserted_rows.push_back(k); } else { - mapped.insert({k, _offset}, *(_join_context->_arena)); + mapped.insert({k, _offset}, *_parent->arena()); inserted_rows.push_back(k); }); } else if (!has_runtime_filter && build_unique) { EMPLACE_IMPL(if (!inserted) { _skip_rows++; }); } else { - EMPLACE_IMPL(if (!inserted) { mapped.insert({k, _offset}, *(_join_context->_arena)); }); + EMPLACE_IMPL(if (!inserted) { mapped.insert({k, _offset}, *_parent->arena()); }); } - _join_context->_build_rf_cardinality += inserted_rows.size(); + _parent->_build_rf_cardinality += inserted_rows.size(); - _join_context->_build_arena_memory_usage->add(arena.size() - old_build_arena_memory); + _parent->_build_arena_memory_usage->add(arena.size() - old_build_arena_memory); - COUNTER_UPDATE(_join_context->_build_table_expanse_timer, + COUNTER_UPDATE(_parent->_build_table_expanse_timer, hash_table_ctx.hash_table->get_resize_timer_value()); - COUNTER_UPDATE(_join_context->_build_table_convert_timer, + COUNTER_UPDATE(_parent->_build_table_convert_timer, hash_table_ctx.hash_table->get_convert_timer_value()); return Status::OK(); @@ -283,7 +236,7 @@ private: int _skip_rows; Block& _acquired_block; ColumnRawPtrs& _build_raw_ptrs; - HashJoinBuildContext* _join_context; + Parent* _parent; int _batch_size; uint8_t _offset; RuntimeState* _state; @@ -348,16 +301,16 @@ using HashTableVariants = std::variant< class VExprContext; using HashTableCtxVariants = - std::variant, - ProcessHashTableProbe, - ProcessHashTableProbe, - ProcessHashTableProbe, - ProcessHashTableProbe, - ProcessHashTableProbe, - ProcessHashTableProbe, - ProcessHashTableProbe, - ProcessHashTableProbe, - ProcessHashTableProbe>; + std::variant, + ProcessHashTableProbe, + ProcessHashTableProbe, + ProcessHashTableProbe, + ProcessHashTableProbe, + ProcessHashTableProbe, + ProcessHashTableProbe, + ProcessHashTableProbe, + ProcessHashTableProbe, + ProcessHashTableProbe>; using HashTableIteratorVariants = std::variant, @@ -365,52 +318,6 @@ using HashTableIteratorVariants = static constexpr auto HASH_JOIN_MAX_BUILD_BLOCK_COUNT = 128; -struct HashJoinProbeContext { - HashJoinProbeContext(HashJoinNode* join_node); - HashJoinProbeContext(pipeline::HashJoinProbeLocalState* local_state); - bool _have_other_join_conjunct; - const bool _is_right_semi_anti; - const bool _is_outer_join; - - MutableColumnPtr* _tuple_is_null_left_flag_column; - MutableColumnPtr* _tuple_is_null_right_flag_column; - - // other expr - VExprContextSPtrs* _other_join_conjuncts; - - DataTypes* _right_table_data_types; - DataTypes* _left_table_data_types; - - RuntimeProfile::Counter* _search_hashtable_timer; - RuntimeProfile::Counter* _build_side_output_timer; - RuntimeProfile::Counter* _probe_side_output_timer; - RuntimeProfile::Counter* _probe_process_hashtable_timer; - RuntimeProfile::Counter* _process_other_join_conjunct_timer; - RuntimeProfile::Counter* _rows_returned_counter; - RuntimeProfile::HighWaterMarkCounter* _probe_arena_memory_usage; - - std::shared_ptr _arena; - - // for full/right outer join - HashTableIteratorVariants* _outer_join_pull_visited_iter; - HashTableIteratorVariants* _probe_row_match_iter; - - std::shared_ptr> _build_blocks; - Block* _probe_block; - ColumnRawPtrs* _probe_columns; - int* _probe_index; - bool* _ready_probe; - - Sizes _probe_key_sz; - - std::vector* _left_output_slot_flags; - std::vector* _right_output_slot_flags; - - // for cases when a probe row matches more than batch size build rows. - bool* _is_any_probe_match_row_output; - bool _has_null_value_in_build_side {}; -}; - class HashJoinNode final : public VJoinNodeBase { public: HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); @@ -450,13 +357,27 @@ public: return _runtime_filter_slots->ready_finish_publish(); } + bool have_other_join_conjunct() const { return _have_other_join_conjunct; } + bool is_right_semi_anti() const { return _is_right_semi_anti; } + bool is_outer_join() const { return _is_outer_join; } + std::shared_ptr> build_blocks() const { return _build_blocks; } + Sizes probe_key_sz() const { return _probe_key_sz; } + std::vector* left_output_slot_flags() { return &_left_output_slot_flags; } + std::vector* right_output_slot_flags() { return &_right_output_slot_flags; } + bool* has_null_in_build_side() { return &_has_null_in_build_side; } + DataTypes right_table_data_types() { return _right_table_data_types; } + DataTypes left_table_data_types() { return _left_table_data_types; } + vectorized::Sizes& build_key_sz() { return _build_key_sz; } + bool build_unique() const { return _build_unique; } + std::vector& runtime_filter_descs() { return _runtime_filter_descs; } + std::shared_ptr arena() { return _arena; } + protected: void _probe_side_open_thread(RuntimeState* state, std::promise* status) override; private: - friend struct HashJoinProbeContext; - friend struct HashJoinBuildContext; - friend struct RuntimeFilterContext; + template + friend struct ProcessHashTableProbe; void _init_short_circuit_for_probe() override { _short_circuit_for_probe = @@ -605,13 +526,12 @@ private: bool* eos, Block* temp_block, bool check_rows_count = true); - template + template friend struct ProcessHashTableBuild; - template + template friend struct ProcessHashTableProbe; - template friend struct ProcessRuntimeFilterBuild; std::vector _runtime_filter_descs; @@ -620,8 +540,6 @@ private: std::vector _runtime_filters; size_t _build_rf_cardinality = 0; std::atomic_bool _probe_open_finish = false; - - std::unique_ptr _probe_context; }; } // namespace vectorized } // namespace doris diff --git a/be/src/vec/exec/join/vjoin_node_base.h b/be/src/vec/exec/join/vjoin_node_base.h index 9bb946bc0e..b3758407f8 100644 --- a/be/src/vec/exec/join/vjoin_node_base.h +++ b/be/src/vec/exec/join/vjoin_node_base.h @@ -105,7 +105,7 @@ protected: TJoinOp::type _join_op; JoinOpVariants _join_op_variants; - bool _have_other_join_conjunct; + const bool _have_other_join_conjunct; const bool _match_all_probe; // output all rows coming from the probe input. Full/Left Join const bool _match_all_build; // output all rows coming from the build input. Full/Right Join bool _build_unique; // build a hash table without duplicated rows. Left semi/anti Join diff --git a/be/src/vec/runtime/shared_hash_table_controller.h b/be/src/vec/runtime/shared_hash_table_controller.h index fae87c3731..05c0bc609e 100644 --- a/be/src/vec/runtime/shared_hash_table_controller.h +++ b/be/src/vec/runtime/shared_hash_table_controller.h @@ -54,6 +54,7 @@ struct SharedRuntimeFilterContext { struct SharedHashTableContext { SharedHashTableContext() : hash_table_variants(nullptr), + blocks(new std::vector()), signaled(false), short_circuit_for_null_in_probe_side(false) {}