diff --git a/be/src/pipeline/exec/partition_sort_source_operator.h b/be/src/pipeline/exec/partition_sort_source_operator.h index 719f6c1388..4d9c3f4695 100644 --- a/be/src/pipeline/exec/partition_sort_source_operator.h +++ b/be/src/pipeline/exec/partition_sort_source_operator.h @@ -57,6 +57,10 @@ public: ~PartitionSortSourceDependency() override = default; void block() override { + if (_always_ready) { + return; + } + std::unique_lock lc(_always_done_lock); if (_always_ready) { return; } @@ -64,6 +68,10 @@ public: } void set_always_ready() { + if (_always_ready) { + return; + } + std::unique_lock lc(_always_done_lock); if (_always_ready) { return; } @@ -71,8 +79,16 @@ public: set_ready(); } + std::string debug_string(int indentation_level = 0) override { + fmt::memory_buffer debug_string_buffer; + fmt::format_to(debug_string_buffer, "{}, _always_ready = {}", + Dependency::debug_string(indentation_level), _always_ready); + return fmt::to_string(debug_string_buffer); + } + private: - std::atomic _always_ready {false}; + bool _always_ready {false}; + std::mutex _always_done_lock; }; class PartitionSortSourceOperatorX; diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index 8ad88b3375..42185fc80c 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -74,6 +74,10 @@ public: bool push_to_blocking_queue() const override { return true; } void block() override { + if (_scanner_done) { + return; + } + std::unique_lock lc(_always_done_lock); if (_scanner_done) { return; } @@ -81,6 +85,10 @@ public: } void set_scanner_done() { + if (_scanner_done) { + return; + } + std::unique_lock lc(_always_done_lock); if (_scanner_done) { return; } @@ -88,11 +96,19 @@ public: Dependency::set_ready(); } + std::string debug_string(int indentation_level = 0) override { + fmt::memory_buffer debug_string_buffer; + fmt::format_to(debug_string_buffer, "{}, _scanner_done = {}", + Dependency::debug_string(indentation_level), _scanner_done); + return fmt::to_string(debug_string_buffer); + } + void set_scanner_ctx(vectorized::ScannerContext* scanner_ctx) { _scanner_ctx = scanner_ctx; } private: vectorized::ScannerContext* _scanner_ctx = nullptr; - std::atomic _scanner_done {false}; + bool _scanner_done {false}; + std::mutex _always_done_lock; }; class ScanLocalStateBase : public PipelineXLocalState<>, public vectorized::RuntimeFilterConsumer {