[Bug](join) return eof when join build sink awakened by downstream source #47380 (#47709)

pick from #47380
This commit is contained in:
Pxl
2025-02-12 09:41:11 +08:00
committed by GitHub
parent 3ca0fc3784
commit 217e6cef34
3 changed files with 6 additions and 9 deletions

View File

@ -125,6 +125,9 @@ public:
Status init_filters(RuntimeState* state, uint64_t local_hash_table_size) {
// process IN_OR_BLOOM_FILTER's real type
for (auto* filter : _runtime_filters) {
if (filter->get_ignored() || filter->get_disabled()) {
continue;
}
if (filter->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
get_real_size(filter, local_hash_table_size) > state->runtime_filter_max_in_num()) {
RETURN_IF_ERROR(filter->change_to_bloom_filter());

View File

@ -174,20 +174,17 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
SCOPED_TIMER(_publish_runtime_filter_timer);
RETURN_IF_ERROR(_runtime_filter_slots->publish(!_should_build_hash_table));
} catch (Exception& e) {
bool blocked_by_complete_build_stage = p._shared_hashtable_controller &&
!p._shared_hash_table_context->complete_build_stage;
bool blocked_by_shared_hash_table_signal = !_should_build_hash_table &&
p._shared_hashtable_controller &&
!p._shared_hash_table_context->signaled;
return Status::InternalError(
"rf process meet error: {}, wake_up_early: {}, should_build_hash_table: "
"{}, _finish_dependency: {}, blocked_by_complete_build_stage: {}, "
"{}, _finish_dependency: {},"
"blocked_by_shared_hash_table_signal: "
"{}",
e.to_string(), state->get_task()->wake_up_early(), _should_build_hash_table,
_finish_dependency->debug_string(), blocked_by_complete_build_stage,
blocked_by_shared_hash_table_signal);
_finish_dependency->debug_string(), blocked_by_shared_hash_table_signal);
}
return Base::close(state, exec_status);
}
@ -653,7 +650,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
local_state.process_build_block(state, (*local_state._shared_state->build_block)));
if (_shared_hashtable_controller) {
_shared_hash_table_context->status = Status::OK();
_shared_hash_table_context->complete_build_stage = true;
// arena will be shared with other instances.
_shared_hash_table_context->arena = local_state._shared_state->arena;
_shared_hash_table_context->hash_table_variants =
@ -666,8 +662,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
local_state._runtime_filter_slots->copy_to_shared_context(_shared_hash_table_context);
_shared_hashtable_controller->signal(node_id());
}
} else if (!local_state._should_build_hash_table &&
_shared_hash_table_context->complete_build_stage) {
} else if (!local_state._should_build_hash_table) {
DCHECK(_shared_hashtable_controller != nullptr);
DCHECK(_shared_hash_table_context != nullptr);
// An instance that does not build the hash table should wait for the signal that the hash table build has finished.

View File

@ -68,7 +68,6 @@ struct SharedHashTableContext {
std::map<int, RuntimeFilterContextSPtr> runtime_filters;
std::atomic<bool> signaled = false;
bool short_circuit_for_null_in_probe_side = false;
std::atomic<bool> complete_build_stage = false;
};
using SharedHashTableContextPtr = std::shared_ptr<SharedHashTableContext>;