diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h index c06ac94628..e191ee4d07 100644 --- a/be/src/exprs/runtime_filter_slots.h +++ b/be/src/exprs/runtime_filter_slots.h @@ -125,6 +125,9 @@ public: Status init_filters(RuntimeState* state, uint64_t local_hash_table_size) { // process IN_OR_BLOOM_FILTER's real type for (auto* filter : _runtime_filters) { + if (filter->get_ignored() || filter->get_disabled()) { + continue; + } if (filter->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER && get_real_size(filter, local_hash_table_size) > state->runtime_filter_max_in_num()) { RETURN_IF_ERROR(filter->change_to_bloom_filter()); diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index c764b8d1a7..d47c6f445d 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -174,20 +174,17 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu SCOPED_TIMER(_publish_runtime_filter_timer); RETURN_IF_ERROR(_runtime_filter_slots->publish(!_should_build_hash_table)); } catch (Exception& e) { - bool blocked_by_complete_build_stage = p._shared_hashtable_controller && - !p._shared_hash_table_context->complete_build_stage; bool blocked_by_shared_hash_table_signal = !_should_build_hash_table && p._shared_hashtable_controller && !p._shared_hash_table_context->signaled; return Status::InternalError( "rf process meet error: {}, wake_up_early: {}, should_build_hash_table: " - "{}, _finish_dependency: {}, blocked_by_complete_build_stage: {}, " + "{}, _finish_dependency: {}," "blocked_by_shared_hash_table_signal: " "{}", e.to_string(), state->get_task()->wake_up_early(), _should_build_hash_table, - _finish_dependency->debug_string(), blocked_by_complete_build_stage, - blocked_by_shared_hash_table_signal); + _finish_dependency->debug_string(), blocked_by_shared_hash_table_signal); } return Base::close(state, exec_status); } @@ -653,7 +650,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* local_state.process_build_block(state, (*local_state._shared_state->build_block))); if (_shared_hashtable_controller) { _shared_hash_table_context->status = Status::OK(); - _shared_hash_table_context->complete_build_stage = true; // arena will be shared with other instances. _shared_hash_table_context->arena = local_state._shared_state->arena; _shared_hash_table_context->hash_table_variants = @@ -666,8 +662,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* local_state._runtime_filter_slots->copy_to_shared_context(_shared_hash_table_context); _shared_hashtable_controller->signal(node_id()); } - } else if (!local_state._should_build_hash_table && - _shared_hash_table_context->complete_build_stage) { + } else if (!local_state._should_build_hash_table) { DCHECK(_shared_hashtable_controller != nullptr); DCHECK(_shared_hash_table_context != nullptr); // the instance which is not build hash table, it's should wait the signal of hash table build finished. diff --git a/be/src/vec/runtime/shared_hash_table_controller.h b/be/src/vec/runtime/shared_hash_table_controller.h index ea26333a3a..da60bb2410 100644 --- a/be/src/vec/runtime/shared_hash_table_controller.h +++ b/be/src/vec/runtime/shared_hash_table_controller.h @@ -68,7 +68,6 @@ struct SharedHashTableContext { std::map runtime_filters; std::atomic signaled = false; bool short_circuit_for_null_in_probe_side = false; - std::atomic complete_build_stage = false; }; using SharedHashTableContextPtr = std::shared_ptr;