diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index c27681738a..89b024a6c6 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -83,13 +83,20 @@ public: void return_available_block() { if (_available_block.fetch_add(1) == 0) { + std::lock_guard lock(_lock); + if (_available_block == 0) { + return; + } Dependency::set_ready(); } } void take_available_block() { if (_available_block.fetch_sub(1) == 1) { - Dependency::block(); + std::lock_guard lock(_lock); + if (_available_block == 0) { + Dependency::block(); + } } } @@ -97,6 +104,7 @@ public: private: std::atomic _available_block; + std::mutex _lock; }; /**