[fix](runtime-filter) build thread destruct first may cause probe thread coredump (#13911)

This commit is contained in:
yinzhijian
2022-11-04 09:29:37 +08:00
committed by GitHub
parent f2d84d81e6
commit e09033276e

View File

@ -250,20 +250,36 @@ Status HashJoinNode::open(RuntimeState* state) {
if (!_runtime_filter_descs.empty()) {
RuntimeFilterSlots runtime_filter_slots(_probe_expr_ctxs, _build_expr_ctxs,
_runtime_filter_descs);
RETURN_IF_ERROR(construct_hash_table(state));
RETURN_IF_ERROR(runtime_filter_slots.init(state, _hash_tbl->size()));
{
SCOPED_TIMER(_push_compute_timer);
auto func = [&](TupleRow* row) { runtime_filter_slots.insert(row); };
_hash_tbl->for_each_row(func);
}
COUNTER_UPDATE(_build_timer, _push_compute_timer->value());
{
SCOPED_TIMER(_push_down_timer);
runtime_filter_slots.publish();
}
Status st;
do {
st = construct_hash_table(state);
if (UNLIKELY(!st.ok())) {
break;
}
st = runtime_filter_slots.init(state, _hash_tbl->size());
if (UNLIKELY(!st.ok())) {
break;
}
{
SCOPED_TIMER(_push_compute_timer);
auto func = [&](TupleRow* row) { runtime_filter_slots.insert(row); };
_hash_tbl->for_each_row(func);
}
COUNTER_UPDATE(_build_timer, _push_compute_timer->value());
{
SCOPED_TIMER(_push_down_timer);
runtime_filter_slots.publish();
}
} while (false);
VLOG_ROW << "runtime st: " << st;
// Don't exit even if we see an error, we still need to wait for the probe thread
// to finish.
// If this return first, probe thread will use '_await_time_cost'
// which is already destructor and then coredump.
RETURN_IF_ERROR(thread_status.get_future().get());
if (UNLIKELY(!st.ok())) {
return st;
}
} else {
// Blocks until ConstructHashTable has returned, after which
// the hash table is fully constructed and we can start the probe