From f49722163ed7b779afcc0db331715159cefeaade Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 22 Nov 2023 19:25:04 +0800 Subject: [PATCH] [pipelineX](bug) Fix query timeout due to broadcast (#27398) --- be/src/pipeline/exec/exchange_sink_operator.h | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index c27681738a..89b024a6c6 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -83,13 +83,20 @@ public: void return_available_block() { if (_available_block.fetch_add(1) == 0) { + std::lock_guard lock(_lock); + if (_available_block == 0) { + return; + } Dependency::set_ready(); } } void take_available_block() { if (_available_block.fetch_sub(1) == 1) { - Dependency::block(); + std::lock_guard lock(_lock); + if (_available_block == 0) { + Dependency::block(); + } } } @@ -97,6 +104,7 @@ public: private: std::atomic _available_block; + std::mutex _lock; }; /**