diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index d54633de29..e45033b71e 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -726,28 +726,28 @@ Status PipelineXFragmentContext::_build_pipeline_x_tasks( (_runtime_state->query_options().__isset.parallel_prepare_threshold && target_size > _runtime_state->query_options().parallel_prepare_threshold)) { Status prepare_status[target_size]; - std::mutex m; - std::condition_variable cv; - int prepare_done = 0; - for (size_t i = 0; i < target_size; i++) { - RETURN_IF_ERROR(thread_pool->submit_func([&, i]() { + int submitted_tasks = 0; + Status submit_status; + CountDownLatch latch((int)target_size); + for (int i = 0; i < target_size; i++) { + submit_status = thread_pool->submit_func([&, i]() { SCOPED_ATTACH_TASK(_query_ctx.get()); prepare_status[i] = pre_and_submit(i, this); - std::unique_lock lock(m); - prepare_done++; - if (prepare_done == target_size) { - cv.notify_one(); - } - })); + latch.count_down(); + }); + if (LIKELY(submit_status.ok())) { + submitted_tasks++; + } else { + break; + } } - std::unique_lock lock(m); - if (prepare_done != target_size) { - cv.wait(lock); - - for (size_t i = 0; i < target_size; i++) { - if (!prepare_status[i].ok()) { - return prepare_status[i]; - } + latch.arrive_and_wait(target_size - submitted_tasks); + if (UNLIKELY(!submit_status.ok())) { + return submit_status; + } + for (int i = 0; i < submitted_tasks; i++) { + if (!prepare_status[i].ok()) { + return prepare_status[i]; } } } else { diff --git a/be/src/util/countdown_latch.h b/be/src/util/countdown_latch.h index a41a417d20..b27737e8bb 100644 --- a/be/src/util/countdown_latch.h +++ b/be/src/util/countdown_latch.h @@ -91,6 +91,13 @@ public: } } + // decrements the internal counter by n and blocks the calling thread until the counter reaches zero. + void arrive_and_wait(uint64_t n) { + DCHECK_GE(n, 0); + count_down(n); + wait(); + } + uint64_t count() const { std::lock_guard lock(_lock); return _count;