diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h index 495ac28e76..4859734a6a 100644 --- a/be/src/exprs/runtime_filter_slots.h +++ b/be/src/exprs/runtime_filter_slots.h @@ -57,7 +57,7 @@ public: throw Exception(ErrorCode::INTERNAL_ERROR, "filters empty, filter_id={}", filter_id); } - for (auto filter : filters) { + for (auto* filter : filters) { filter->set_ignored(""); filter->signal(); } @@ -166,7 +166,7 @@ public: return Status::OK(); } - void insert(const std::unordered_set& datas) { + void insert(const vectorized::Block* block) { for (int i = 0; i < _build_expr_context.size(); ++i) { auto iter = _runtime_filters.find(i); if (iter == _runtime_filters.end()) { @@ -174,18 +174,16 @@ public: } int result_column_id = _build_expr_context[i]->get_last_result_column_id(); - for (const auto* it : datas) { - auto column = it->get_by_position(result_column_id).column; - for (auto* filter : iter->second) { - filter->insert_batch(column, 1); - } + const auto& column = block->get_by_position(result_column_id).column; + for (auto* filter : iter->second) { + filter->insert_batch(column, 1); } } } bool ready_finish_publish() { for (auto& pair : _runtime_filters) { - for (auto filter : pair.second) { + for (auto* filter : pair.second) { if (!filter->is_finish_rpc()) { return false; } @@ -196,7 +194,7 @@ public: void finish_publish() { for (auto& pair : _runtime_filters) { - for (auto filter : pair.second) { + for (auto* filter : pair.second) { static_cast(filter->join_rpc()); } } diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 757673c70a..f02e203c78 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -484,29 +484,13 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* local_state._build_side_mutable_block.to_block()); COUNTER_UPDATE(local_state._build_blocks_memory_usage, (*local_state._shared_state->build_block).bytes()); - RETURN_IF_ERROR( - local_state.process_build_block(state, (*local_state._shared_state->build_block))); const bool use_global_rf = local_state._parent->cast()._use_global_rf; - auto ret = std::visit( - Overload {[&](std::monostate&) -> Status { - LOG(FATAL) << "FATAL: uninited hash table"; - __builtin_unreachable(); - }, - [&](auto&& arg) -> Status { - vectorized::ProcessRuntimeFilterBuild runtime_filter_build_process; - return runtime_filter_build_process(state, arg, &local_state, - use_global_rf); - }}, - *local_state._shared_state->hash_table_variants); - if (!ret.ok()) { - if (_shared_hashtable_controller) { - _shared_hash_table_context->status = ret; - _shared_hashtable_controller->signal(node_id()); - } - return ret; - } + RETURN_IF_ERROR(vectorized::process_runtime_filter_build( + state, local_state._shared_state->build_block.get(), &local_state, use_global_rf)); + RETURN_IF_ERROR( + local_state.process_build_block(state, (*local_state._shared_state->build_block))); if (_shared_hashtable_controller) { _shared_hash_table_context->status = Status::OK(); // arena will be shared with other instances. diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index 5ea504d488..3c1b772b30 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -94,7 +94,10 @@ protected: friend class HashJoinBuildSinkOperatorX; template friend struct vectorized::ProcessHashTableBuild; - friend struct vectorized::ProcessRuntimeFilterBuild; + template + friend Status vectorized::process_runtime_filter_build(RuntimeState* state, + vectorized::Block* block, Parent* parent, + bool is_global); // build expr vectorized::VExprContextSPtrs _build_expr_ctxs; @@ -107,7 +110,6 @@ protected: std::shared_ptr _runtime_filter_slots; bool _has_set_need_null_map_for_build = false; bool _build_side_ignore_null = false; - std::unordered_set _inserted_blocks; std::shared_ptr _shared_hash_table_dependency; std::vector _build_col_ids; diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index c65513c807..94cb5be876 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -662,7 +662,7 @@ Status HashJoinNode::alloc_resource(doris::RuntimeState* state) { SCOPED_TIMER(_allocate_resource_timer); RETURN_IF_ERROR(VJoinNodeBase::alloc_resource(state)); for (size_t i = 0; i < _runtime_filter_descs.size(); i++) { - if (auto bf = _runtime_filters[i]->get_bloomfilter()) { + if (auto* bf = _runtime_filters[i]->get_bloomfilter()) { RETURN_IF_ERROR(bf->init_with_fixed_length()); } } @@ -751,23 +751,8 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc DCHECK(!_build_side_mutable_block.empty()); _build_block = std::make_shared(_build_side_mutable_block.to_block()); COUNTER_UPDATE(_build_blocks_memory_usage, _build_block->bytes()); + RETURN_IF_ERROR(process_runtime_filter_build(state, _build_block.get(), this)); RETURN_IF_ERROR(_process_build_block(state, *_build_block)); - auto ret = std::visit(Overload {[&](std::monostate&) -> Status { - LOG(FATAL) << "FATAL: uninited hash table"; - __builtin_unreachable(); - }, - [&](auto&& arg) -> Status { - ProcessRuntimeFilterBuild runtime_filter_build_process; - return runtime_filter_build_process(state, arg, this); - }}, - *_hash_table_variants); - if (!ret.ok()) { - if (_shared_hashtable_controller) { - _shared_hash_table_context->status = ret; - _shared_hashtable_controller->signal(id()); - } - return ret; - } if (_shared_hashtable_controller) { _shared_hash_table_context->status = Status::OK(); // arena will be shared with other instances. @@ -949,9 +934,6 @@ void HashJoinNode::_set_build_ignore_flag(Block& block, const std::vector& Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block) { SCOPED_TIMER(_build_table_timer); size_t rows = block.rows(); - if (UNLIKELY(rows == 0)) { - return Status::OK(); - } COUNTER_UPDATE(_build_rows_counter, rows); ColumnRawPtrs raw_ptrs(_build_expr_ctxs.size()); diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index be94dacdca..b9b3d18dff 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -74,33 +74,28 @@ template struct ProcessHashTableProbe; class HashJoinNode; -struct ProcessRuntimeFilterBuild { - template - Status operator()(RuntimeState* state, HashTableContext& hash_table_ctx, Parent* parent, - bool is_global = false) { - if (parent->runtime_filter_descs().empty()) { - return Status::OK(); - } - parent->_runtime_filter_slots = std::make_shared( - parent->_build_expr_ctxs, parent->runtime_filter_descs(), is_global); - - RETURN_IF_ERROR( - parent->_runtime_filter_slots->init(state, hash_table_ctx.hash_table->size())); - - if (!parent->_runtime_filter_slots->empty() && !parent->_inserted_blocks.empty()) { - { - SCOPED_TIMER(parent->_runtime_filter_compute_timer); - parent->_runtime_filter_slots->insert(parent->_inserted_blocks); - } - } - { - SCOPED_TIMER(parent->_publish_runtime_filter_timer); - RETURN_IF_ERROR(parent->_runtime_filter_slots->publish()); - } - +template +Status process_runtime_filter_build(RuntimeState* state, Block* block, Parent* parent, + bool is_global = false) { + if (parent->runtime_filter_descs().empty()) { return Status::OK(); } -}; + parent->_runtime_filter_slots = std::make_shared( + parent->_build_expr_ctxs, parent->runtime_filter_descs(), is_global); + + RETURN_IF_ERROR(parent->_runtime_filter_slots->init(state, block->rows())); + + if (!parent->_runtime_filter_slots->empty() && block->rows() > 1) { + SCOPED_TIMER(parent->_runtime_filter_compute_timer); + parent->_runtime_filter_slots->insert(block); + } + { + SCOPED_TIMER(parent->_publish_runtime_filter_timer); + RETURN_IF_ERROR(parent->_runtime_filter_slots->publish()); + } + + return Status::OK(); +} using ProfileCounter = RuntimeProfile::Counter; @@ -129,10 +124,6 @@ struct ProcessHashTableBuild { } } - if (!_parent->runtime_filter_descs().empty()) { - _parent->_inserted_blocks.insert(&_acquired_block); - } - SCOPED_TIMER(_parent->_build_table_insert_timer); hash_table_ctx.hash_table->template prepare_build(_rows, _batch_size, *has_null_key); @@ -414,10 +405,11 @@ private: template friend struct ProcessHashTableProbe; - friend struct ProcessRuntimeFilterBuild; + template + friend Status process_runtime_filter_build(RuntimeState* state, vectorized::Block* block, + Parent* parent, bool is_global); std::vector _runtime_filter_descs; - std::unordered_set _inserted_blocks; std::vector _runtime_filters; std::atomic_bool _probe_open_finish = false;