[fix](pipeline) premature exit causing core dump during concurrent prepare execution (#51492) (#52850)

Issue Number: close #51491

Problem Summary:
When the queue of the FragmentMgrAsync thread pool is full, newly
submitted tasks are rejected and return early. However, previously
submitted tasks may still be scheduled for execution later. This can
lead to premature destruction of objects such as PipelineFragmentContext
and TPipelineFragmentParams that are referenced by those tasks,
resulting in null pointer exceptions during task execution and
ultimately causing a coredump.

The fix in this PR is to wait until all previously submitted tasks have
completed before returning.

```
*** SIGSEGV address not mapped to object (@0x1c8) received by PID 3941201 (TID 2115617 OR 0xfe1685bb97f0) from PID 456; stack trace: ***
 0# doris::signal::(anonymous namespace)::FailureSignalHandler(int, siginfo_t*, void*) at /home/jenkins_agent/workspace/BigDataComponent_doris-unified-arm-release/be/src/common/signal_handler.h:421
 1# os::Linux::chained_handler(int, siginfo_t*, void*) in /usr/jdk64/current/jre/lib/aarch64/server/libjvm.so
 2# JVM_handle_linux_signal in /usr/jdk64/current/jre/lib/aarch64/server/libjvm.so
 3# signalHandler(int, siginfo_t*, void*) in /usr/jdk64/current/jre/lib/aarch64/server/libjvm.so
 4# 0x0000FFFF6B2A07C0 in linux-vdso.so.1
 5# doris::TUniqueId::TUniqueId(doris::TUniqueId const&) at /home/jenkins_agent/workspace/BigDataComponent_doris-unified-arm-release/gensrc/build/gen_cpp/Types_types.cpp:2354
 6# doris::AttachTask::AttachTask(doris::QueryContext*) at /home/jenkins_agent/workspace/BigDataComponent_doris-unified-arm-release/be/src/runtime/thread_context.cpp:60
 7# std::_Function_handler<void (), doris::pipeline::PipelineXFragmentContext::_build_pipeline_x_tasks(doris::TPipelineFragmentParams const&, doris::ThreadPool*)::$_0>::_M_invoke(std::_Any_data const&) at /usr/lib/gcc/aarch64-linux-gnu/13/../../../../include/c++/13/bits/std_function.h:290
 8# doris::ThreadPool::dispatch_thread() at /home/jenkins_agent/workspace/BigDataComponent_doris-unified-arm-release/be/src/util/threadpool.cpp:552
 9# doris::Thread::supervise_thread(void*) at /home/jenkins_agent/workspace/BigDataComponent_doris-unified-arm-release/be/src/util/thread.cpp:499
10# 0x0000FFFF6AF187AC in /lib64/libpthread.so.0
11# 0x0000FFFF6B16548C in /lib64/libc.so.6
```

Co-authored-by: XLPE <crykix@gmail.com>
Co-authored-by: XLPE <weiwh1@chinatelecom.cn>
Author: Gabriel
Date: 2025-07-09 10:20:59 +08:00
Committed by: GitHub
Parent: abb9c88a3f
Commit: 528eaadaf7
2 changed files with 26 additions and 19 deletions


```diff
@@ -726,28 +726,28 @@ Status PipelineXFragmentContext::_build_pipeline_x_tasks(
         (_runtime_state->query_options().__isset.parallel_prepare_threshold &&
          target_size > _runtime_state->query_options().parallel_prepare_threshold)) {
         Status prepare_status[target_size];
-        std::mutex m;
-        std::condition_variable cv;
-        int prepare_done = 0;
-        for (size_t i = 0; i < target_size; i++) {
-            RETURN_IF_ERROR(thread_pool->submit_func([&, i]() {
+        int submitted_tasks = 0;
+        Status submit_status;
+        CountDownLatch latch((int)target_size);
+        for (int i = 0; i < target_size; i++) {
+            submit_status = thread_pool->submit_func([&, i]() {
                 SCOPED_ATTACH_TASK(_query_ctx.get());
                 prepare_status[i] = pre_and_submit(i, this);
-                std::unique_lock<std::mutex> lock(m);
-                prepare_done++;
-                if (prepare_done == target_size) {
-                    cv.notify_one();
-                }
-            }));
+                latch.count_down();
+            });
+            if (LIKELY(submit_status.ok())) {
+                submitted_tasks++;
+            } else {
+                break;
+            }
         }
-        std::unique_lock<std::mutex> lock(m);
-        if (prepare_done != target_size) {
-            cv.wait(lock);
-        }
-        for (size_t i = 0; i < target_size; i++) {
+        latch.arrive_and_wait(target_size - submitted_tasks);
+        if (UNLIKELY(!submit_status.ok())) {
+            return submit_status;
+        }
+        for (int i = 0; i < submitted_tasks; i++) {
             if (!prepare_status[i].ok()) {
                 return prepare_status[i];
             }
         }
     } else {
```


```diff
@@ -91,6 +91,13 @@ public:
         }
     }

+    // Decrements the internal counter by n and blocks the calling thread
+    // until the counter reaches zero.
+    void arrive_and_wait(uint64_t n) {
+        DCHECK_GE(n, 0);
+        count_down(n);
+        wait();
+    }
+
     uint64_t count() const {
         std::lock_guard<std::mutex> lock(_lock);
         return _count;
```