diff --git a/be/src/pipeline/exec/data_queue.cpp b/be/src/pipeline/exec/data_queue.cpp index 680589ce2b..5b12c6876d 100644 --- a/be/src/pipeline/exec/data_queue.cpp +++ b/be/src/pipeline/exec/data_queue.cpp @@ -120,9 +120,7 @@ Status DataQueue::get_block_from_queue(std::unique_ptr* outpu _cur_blocks_nums_in_queue[_flag_queue_idx] -= 1; auto old_value = _cur_blocks_total_nums.fetch_sub(1); if (old_value == 1 && _source_dependency) { - if (!is_all_finish()) { - _source_dependency->block(); - } + set_source_block(); _sink_dependencies[_flag_queue_idx]->set_ready(); } } else { @@ -145,7 +143,7 @@ void DataQueue::push_block(std::unique_ptr block, int child_i _cur_blocks_nums_in_queue[child_idx] += 1; _cur_blocks_total_nums++; if (_source_dependency) { - _source_dependency->set_ready(); + set_source_ready(); _sink_dependencies[child_idx]->block(); } //this only use to record the queue[0] for profile @@ -160,12 +158,10 @@ void DataQueue::set_finish(int child_idx) { return; } _is_finished[child_idx] = true; - if (_source_dependency) { - _source_dependency->set_ready(); - } if (_un_finished_counter.fetch_sub(1) == 1) { _is_all_finished = true; } + set_source_ready(); } void DataQueue::set_canceled(int child_idx) { @@ -173,9 +169,10 @@ void DataQueue::set_canceled(int child_idx) { DCHECK(!_is_finished[child_idx]); _is_canceled[child_idx] = true; _is_finished[child_idx] = true; - if (_source_dependency) { - _source_dependency->set_ready(); + if (_un_finished_counter.fetch_sub(1) == 1) { + _is_all_finished = true; } + set_source_ready(); } bool DataQueue::is_finish(int child_idx) { @@ -186,5 +183,22 @@ bool DataQueue::is_all_finish() { return _is_all_finished; } +void DataQueue::set_source_ready() { + if (_source_dependency) { + std::unique_lock lc(_source_lock); + _source_dependency->set_ready(); + } +} + +void DataQueue::set_source_block() { + if (_cur_blocks_total_nums == 0 && !is_all_finish()) { + std::unique_lock lc(_source_lock); + // Performing the judgment twice, attempting to avoid blocking the source as much as possible. + if (_cur_blocks_total_nums == 0 && !is_all_finish()) { + _source_dependency->block(); + } + } +} + } // namespace pipeline } // namespace doris diff --git a/be/src/pipeline/exec/data_queue.h b/be/src/pipeline/exec/data_queue.h index d28fe5d8f0..f0d3511a86 100644 --- a/be/src/pipeline/exec/data_queue.h +++ b/be/src/pipeline/exec/data_queue.h @@ -25,6 +25,7 @@ #include #include "common/status.h" +#include "util/spinlock.h" #include "vec/core/block.h" namespace doris { @@ -67,6 +68,9 @@ public: _sink_dependencies[child_idx] = sink_dependency; } + void set_source_ready(); + void set_source_block(); + private: friend class AggSourceDependency; friend class UnionSourceDependency; @@ -103,6 +107,7 @@ private: // data queue is multi sink one source Dependency* _source_dependency = nullptr; std::vector _sink_dependencies; + SpinLock _source_lock; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp index 619b40f777..591e9b04f9 100644 --- a/be/src/pipeline/exec/union_source_operator.cpp +++ b/be/src/pipeline/exec/union_source_operator.cpp @@ -154,6 +154,15 @@ std::shared_ptr UnionSourceLocalState::create_shared_state() { return data_queue; } +std::string UnionSourceLocalState::debug_string(int indentation_level) const { + fmt::memory_buffer debug_string_buffer; + fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level)); + fmt::format_to(debug_string_buffer, ", data_queue: (is_all_finish = {}, has_data = {})", + _shared_state->data_queue.is_all_finish(), + _shared_state->data_queue.remaining_has_data()); + return fmt::to_string(debug_string_buffer); +} + Status UnionSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) { auto& local_state = get_local_state(state); diff --git a/be/src/pipeline/exec/union_source_operator.h b/be/src/pipeline/exec/union_source_operator.h index 0927ddb9ab..8d1b25bc4c 100644 --- a/be/src/pipeline/exec/union_source_operator.h +++ b/be/src/pipeline/exec/union_source_operator.h @@ -75,8 +75,6 @@ public: UnionSourceDependency(int id, int node_id, QueryContext* query_ctx) : Dependency(id, node_id, "UnionSourceDependency", query_ctx) {} ~UnionSourceDependency() override = default; - - void block() override {} }; class UnionSourceOperatorX; @@ -90,6 +88,8 @@ public: Status init(RuntimeState* state, LocalStateInfo& info) override; std::shared_ptr create_shared_state(); + [[nodiscard]] std::string debug_string(int indentation_level = 0) const override; + private: friend class UnionSourceOperatorX; friend class OperatorX;