[refine](pipelineX) refine dataqueue set source ready block (#27733)

This commit is contained in:
Mryange
2023-11-30 13:00:18 +08:00
committed by GitHub
parent 7f13dcc726
commit da03a50824
4 changed files with 39 additions and 11 deletions

View File

@ -120,9 +120,7 @@ Status DataQueue::get_block_from_queue(std::unique_ptr<vectorized::Block>* outpu
_cur_blocks_nums_in_queue[_flag_queue_idx] -= 1;
auto old_value = _cur_blocks_total_nums.fetch_sub(1);
if (old_value == 1 && _source_dependency) {
if (!is_all_finish()) {
_source_dependency->block();
}
set_source_block();
_sink_dependencies[_flag_queue_idx]->set_ready();
}
} else {
@ -145,7 +143,7 @@ void DataQueue::push_block(std::unique_ptr<vectorized::Block> block, int child_i
_cur_blocks_nums_in_queue[child_idx] += 1;
_cur_blocks_total_nums++;
if (_source_dependency) {
_source_dependency->set_ready();
set_source_ready();
_sink_dependencies[child_idx]->block();
}
//this only use to record the queue[0] for profile
@ -160,12 +158,10 @@ void DataQueue::set_finish(int child_idx) {
return;
}
_is_finished[child_idx] = true;
if (_source_dependency) {
_source_dependency->set_ready();
}
if (_un_finished_counter.fetch_sub(1) == 1) {
_is_all_finished = true;
}
set_source_ready();
}
void DataQueue::set_canceled(int child_idx) {
@ -173,9 +169,10 @@ void DataQueue::set_canceled(int child_idx) {
DCHECK(!_is_finished[child_idx]);
_is_canceled[child_idx] = true;
_is_finished[child_idx] = true;
if (_source_dependency) {
_source_dependency->set_ready();
if (_un_finished_counter.fetch_sub(1) == 1) {
_is_all_finished = true;
}
set_source_ready();
}
bool DataQueue::is_finish(int child_idx) {
@ -186,5 +183,22 @@ bool DataQueue::is_all_finish() {
return _is_all_finished;
}
void DataQueue::set_source_ready() {
if (_source_dependency) {
std::unique_lock lc(_source_lock);
_source_dependency->set_ready();
}
}
void DataQueue::set_source_block() {
if (_cur_blocks_total_nums == 0 && !is_all_finish()) {
std::unique_lock lc(_source_lock);
// Performing the judgment twice, attempting to avoid blocking the source as much as possible.
if (_cur_blocks_total_nums == 0 && !is_all_finish()) {
_source_dependency->block();
}
}
}
} // namespace pipeline
} // namespace doris

View File

@ -25,6 +25,7 @@
#include <vector>
#include "common/status.h"
#include "util/spinlock.h"
#include "vec/core/block.h"
namespace doris {
@ -67,6 +68,9 @@ public:
_sink_dependencies[child_idx] = sink_dependency;
}
void set_source_ready();
void set_source_block();
private:
friend class AggSourceDependency;
friend class UnionSourceDependency;
@ -103,6 +107,7 @@ private:
// data queue is multi sink one source
Dependency* _source_dependency = nullptr;
std::vector<Dependency*> _sink_dependencies;
SpinLock _source_lock;
};
} // namespace pipeline

View File

@ -154,6 +154,15 @@ std::shared_ptr<UnionSharedState> UnionSourceLocalState::create_shared_state() {
return data_queue;
}
std::string UnionSourceLocalState::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level));
fmt::format_to(debug_string_buffer, ", data_queue: (is_all_finish = {}, has_data = {})",
_shared_state->data_queue.is_all_finish(),
_shared_state->data_queue.remaining_has_data());
return fmt::to_string(debug_string_buffer);
}
Status UnionSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) {
auto& local_state = get_local_state(state);

View File

@ -75,8 +75,6 @@ public:
UnionSourceDependency(int id, int node_id, QueryContext* query_ctx)
: Dependency(id, node_id, "UnionSourceDependency", query_ctx) {}
~UnionSourceDependency() override = default;
void block() override {}
};
class UnionSourceOperatorX;
@ -90,6 +88,8 @@ public:
Status init(RuntimeState* state, LocalStateInfo& info) override;
std::shared_ptr<UnionSharedState> create_shared_state();
[[nodiscard]] std::string debug_string(int indentation_level = 0) const override;
private:
friend class UnionSourceOperatorX;
friend class OperatorX<UnionSourceLocalState>;