[pipelineX](dependency) Wake by task by read dependency (#27260)
This commit is contained in:
@ -23,7 +23,6 @@ namespace doris {
|
||||
|
||||
class HttpRequest;
|
||||
|
||||
// Get BE health state from http API.
|
||||
class PipelineTaskAction : public HttpHandler {
|
||||
public:
|
||||
PipelineTaskAction() = default;
|
||||
|
||||
@ -70,8 +70,6 @@ public:
|
||||
ExchangeSinkQueueDependency(int id, int node_id)
|
||||
: WriteDependency(id, node_id, "ResultQueueDependency") {}
|
||||
~ExchangeSinkQueueDependency() override = default;
|
||||
|
||||
void* shared_state() override { return nullptr; }
|
||||
};
|
||||
|
||||
class BroadcastDependency final : public WriteDependency {
|
||||
@ -95,19 +93,6 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
void* shared_state() override {
|
||||
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "Should not reach here!");
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
void set_ready_for_write() override {
|
||||
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "Should not reach here!");
|
||||
}
|
||||
|
||||
void block_writing() override {
|
||||
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "Should not reach here!");
|
||||
}
|
||||
|
||||
int available_blocks() const { return _available_block; }
|
||||
|
||||
private:
|
||||
@ -138,7 +123,6 @@ public:
|
||||
LocalExchangeChannelDependency(int id, int node_id)
|
||||
: WriteDependency(id, node_id, "LocalExchangeChannelDependency") {}
|
||||
~LocalExchangeChannelDependency() override = default;
|
||||
void* shared_state() override { return nullptr; }
|
||||
// TODO(gabriel): blocked by memory
|
||||
};
|
||||
|
||||
|
||||
@ -56,15 +56,13 @@ public:
|
||||
ExchangeDataDependency(int id, int node_id,
|
||||
vectorized::VDataStreamRecvr::SenderQueue* sender_queue)
|
||||
: Dependency(id, node_id, "DataDependency"), _always_done(false) {}
|
||||
void* shared_state() override { return nullptr; }
|
||||
|
||||
void set_always_done() {
|
||||
_always_done = true;
|
||||
if (_ready_for_read) {
|
||||
if (_always_done) {
|
||||
return;
|
||||
}
|
||||
_read_dependency_watcher.stop();
|
||||
_ready_for_read = true;
|
||||
_always_done = true;
|
||||
Dependency::set_ready_for_read();
|
||||
}
|
||||
|
||||
void block_reading() override {
|
||||
|
||||
@ -52,8 +52,6 @@ public:
|
||||
SharedHashTableDependency(int id, int node_id)
|
||||
: WriteDependency(id, node_id, "SharedHashTableDependency") {}
|
||||
~SharedHashTableDependency() override = default;
|
||||
|
||||
void* shared_state() override { return nullptr; }
|
||||
};
|
||||
|
||||
class HashJoinBuildSinkLocalState final
|
||||
|
||||
@ -49,8 +49,6 @@ public:
|
||||
ResultSinkDependency(int id, int node_id)
|
||||
: WriteDependency(id, node_id, "ResultSinkDependency") {}
|
||||
~ResultSinkDependency() override = default;
|
||||
|
||||
void* shared_state() override { return nullptr; }
|
||||
};
|
||||
|
||||
class ResultSinkLocalState final : public PipelineXSinkLocalState<> {
|
||||
|
||||
@ -62,8 +62,6 @@ public:
|
||||
ScanDependency(int id, int node_id)
|
||||
: Dependency(id, node_id, "ScanDependency"), _scanner_ctx(nullptr) {}
|
||||
|
||||
void* shared_state() override { return nullptr; }
|
||||
|
||||
// TODO(gabriel):
|
||||
[[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override {
|
||||
if (_scanner_ctx && _scanner_ctx->get_num_running_scanners() == 0 &&
|
||||
@ -73,6 +71,8 @@ public:
|
||||
return Dependency::read_blocked_by(task);
|
||||
}
|
||||
|
||||
bool push_to_blocking_queue() override { return true; }
|
||||
|
||||
void block_reading() override {
|
||||
if (_eos) {
|
||||
return;
|
||||
|
||||
@ -165,6 +165,7 @@ Status SetSinkLocalState<is_intersect>::init(RuntimeState* state, LocalSinkState
|
||||
_build_timer = ADD_TIMER(_profile, "BuildTime");
|
||||
|
||||
Parent& parent = _parent->cast<Parent>();
|
||||
_dependency->set_cur_child_id(parent._cur_child_id);
|
||||
_child_exprs.resize(parent._child_exprs.size());
|
||||
for (size_t i = 0; i < _child_exprs.size(); i++) {
|
||||
RETURN_IF_ERROR(parent._child_exprs[i]->clone(state, _child_exprs[i]));
|
||||
|
||||
@ -133,7 +133,7 @@ public:
|
||||
_wait_worker_watcher.start();
|
||||
}
|
||||
void pop_out_runnable_queue() { _wait_worker_watcher.stop(); }
|
||||
PipelineTaskState get_state() { return _cur_state; }
|
||||
PipelineTaskState get_state() const { return _cur_state; }
|
||||
void set_state(PipelineTaskState state);
|
||||
|
||||
virtual bool is_pending_finish() {
|
||||
@ -154,6 +154,7 @@ public:
|
||||
}
|
||||
|
||||
virtual bool source_can_read() { return _source->can_read() || _pipeline->_always_can_read; }
|
||||
virtual bool push_blocked_task_to_queue() const { return true; }
|
||||
|
||||
virtual bool runtime_filters_are_ready_or_timeout() {
|
||||
return _source->runtime_filters_are_ready_or_timeout();
|
||||
|
||||
@ -29,10 +29,8 @@
|
||||
namespace doris::pipeline {
|
||||
|
||||
void Dependency::add_block_task(PipelineXTask* task) {
|
||||
// TODO(gabriel): support read dependency
|
||||
if (!_blocked_task.empty() && _blocked_task[_blocked_task.size() - 1] == task) {
|
||||
return;
|
||||
}
|
||||
DCHECK(_blocked_task.empty() || _blocked_task[_blocked_task.size() - 1] != task)
|
||||
<< "Duplicate task: " << task->debug_string();
|
||||
_blocked_task.push_back(task);
|
||||
}
|
||||
|
||||
@ -74,6 +72,17 @@ void Dependency::set_ready_for_read() {
|
||||
_ready_for_read = true;
|
||||
local_block_task.swap(_blocked_task);
|
||||
}
|
||||
for (auto* task : local_block_task) {
|
||||
task->try_wake_up(this);
|
||||
}
|
||||
}
|
||||
|
||||
void SetDependency::set_ready_for_read() {
|
||||
if (_child_idx == 0) {
|
||||
WriteDependency::set_ready_for_read();
|
||||
} else {
|
||||
_set_state->probe_finished_children_dependency[0]->set_ready_for_read();
|
||||
}
|
||||
}
|
||||
|
||||
void WriteDependency::set_ready_for_write() {
|
||||
@ -84,7 +93,7 @@ void WriteDependency::set_ready_for_write() {
|
||||
|
||||
std::vector<PipelineXTask*> local_block_task {};
|
||||
{
|
||||
std::unique_lock<std::mutex> lc(_task_lock);
|
||||
std::unique_lock<std::mutex> lc(_write_task_lock);
|
||||
if (_ready_for_write) {
|
||||
return;
|
||||
}
|
||||
@ -133,7 +142,7 @@ Dependency* Dependency::read_blocked_by(PipelineXTask* task) {
|
||||
|
||||
std::unique_lock<std::mutex> lc(_task_lock);
|
||||
auto ready_for_read = _ready_for_read.load();
|
||||
if (!ready_for_read && task) {
|
||||
if (!ready_for_read && !push_to_blocking_queue() && task) {
|
||||
add_block_task(task);
|
||||
}
|
||||
return ready_for_read ? nullptr : this;
|
||||
@ -162,7 +171,7 @@ FinishDependency* FinishDependency::finish_blocked_by(PipelineXTask* task) {
|
||||
}
|
||||
|
||||
WriteDependency* WriteDependency::write_blocked_by(PipelineXTask* task) {
|
||||
std::unique_lock<std::mutex> lc(_task_lock);
|
||||
std::unique_lock<std::mutex> lc(_write_task_lock);
|
||||
const auto ready_for_write = _ready_for_write.load();
|
||||
if (!ready_for_write && task) {
|
||||
add_write_block_task(task);
|
||||
|
||||
@ -57,9 +57,10 @@ public:
|
||||
|
||||
[[nodiscard]] int id() const { return _id; }
|
||||
[[nodiscard]] virtual std::string name() const { return _name; }
|
||||
virtual void* shared_state() = 0;
|
||||
virtual void* shared_state() { return nullptr; }
|
||||
virtual std::string debug_string(int indentation_level = 0);
|
||||
virtual bool is_write_dependency() { return false; }
|
||||
virtual bool push_to_blocking_queue() { return false; }
|
||||
|
||||
// Start the watcher. We use it to count how long this dependency block the current pipeline task.
|
||||
void start_read_watcher() {
|
||||
@ -145,6 +146,7 @@ public:
|
||||
protected:
|
||||
friend class Dependency;
|
||||
std::atomic<bool> _ready_for_write {true};
|
||||
std::mutex _write_task_lock;
|
||||
MonotonicStopWatch _write_dependency_watcher;
|
||||
|
||||
private:
|
||||
@ -172,7 +174,6 @@ public:
|
||||
|
||||
void set_ready_to_finish();
|
||||
|
||||
void* shared_state() override { return nullptr; }
|
||||
std::string debug_string(int indentation_level = 0) override;
|
||||
|
||||
void add_block_task(PipelineXTask* task) override;
|
||||
@ -222,7 +223,6 @@ public:
|
||||
RuntimeFilterDependency(int id, int node_id, std::string name)
|
||||
: Dependency(id, node_id, name) {}
|
||||
RuntimeFilterDependency* filter_blocked_by(PipelineXTask* task);
|
||||
void* shared_state() override { return nullptr; }
|
||||
void add_filters(IRuntimeFilter* runtime_filter);
|
||||
void sub_filters();
|
||||
void set_blocked_by_rf(std::shared_ptr<std::atomic_bool> blocked_by_rf) {
|
||||
@ -255,8 +255,6 @@ public:
|
||||
return fmt::to_string(debug_string_buffer);
|
||||
}
|
||||
|
||||
void* shared_state() override { return nullptr; }
|
||||
|
||||
std::string debug_string(int indentation_level = 0) override;
|
||||
|
||||
[[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override {
|
||||
@ -284,7 +282,6 @@ struct FakeDependency final : public WriteDependency {
|
||||
public:
|
||||
FakeDependency(int id, int node_id) : WriteDependency(id, node_id, "FakeDependency") {}
|
||||
using SharedState = FakeSharedState;
|
||||
void* shared_state() override { return nullptr; }
|
||||
[[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override { return nullptr; }
|
||||
[[nodiscard]] WriteDependency* write_blocked_by(PipelineXTask* task) override {
|
||||
return nullptr;
|
||||
@ -465,6 +462,7 @@ public:
|
||||
}
|
||||
return this;
|
||||
}
|
||||
bool push_to_blocking_queue() override { return true; }
|
||||
void block_reading() override {}
|
||||
void block_writing() override {}
|
||||
|
||||
@ -667,7 +665,6 @@ public:
|
||||
AsyncWriterDependency(int id, int node_id)
|
||||
: WriteDependency(id, node_id, "AsyncWriterDependency") {}
|
||||
~AsyncWriterDependency() override = default;
|
||||
void* shared_state() override { return nullptr; }
|
||||
};
|
||||
|
||||
class SetDependency;
|
||||
@ -769,30 +766,10 @@ public:
|
||||
|
||||
void set_shared_state(std::shared_ptr<SetSharedState> set_state) { _set_state = set_state; }
|
||||
|
||||
// Which dependency current pipeline task is blocked by. `nullptr` if this dependency is ready.
|
||||
[[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override {
|
||||
if (config::enable_fuzzy_mode && !_set_state->ready_for_read &&
|
||||
_should_log(_read_dependency_watcher.elapsed_time())) {
|
||||
LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " "
|
||||
<< id() << " " << _node_id << " block tasks: " << _blocked_task.size();
|
||||
}
|
||||
std::unique_lock<std::mutex> lc(_task_lock);
|
||||
if (!_set_state->ready_for_read && task) {
|
||||
add_block_task(task);
|
||||
}
|
||||
return _set_state->ready_for_read ? nullptr : this;
|
||||
}
|
||||
|
||||
// Notify downstream pipeline tasks this dependency is ready.
|
||||
void set_ready_for_read() override {
|
||||
if (_set_state->ready_for_read) {
|
||||
return;
|
||||
}
|
||||
_read_dependency_watcher.stop();
|
||||
_set_state->ready_for_read = true;
|
||||
}
|
||||
void set_ready_for_read() override;
|
||||
|
||||
void set_cur_child_id(int id) {
|
||||
_child_idx = id;
|
||||
_set_state->probe_finished_children_dependency[id] = this;
|
||||
if (id != 0) {
|
||||
block_writing();
|
||||
@ -801,6 +778,7 @@ public:
|
||||
|
||||
private:
|
||||
std::shared_ptr<SetSharedState> _set_state;
|
||||
int _child_idx {0};
|
||||
};
|
||||
|
||||
using PartitionedBlock = std::pair<std::shared_ptr<vectorized::Block>,
|
||||
|
||||
@ -186,7 +186,7 @@ Status PipelineXTask::_open() {
|
||||
_blocked_dep = _filter_dependency->filter_blocked_by(this);
|
||||
if (_blocked_dep) {
|
||||
set_state(PipelineTaskState::BLOCKED_FOR_RF);
|
||||
set_use_blocking_queue(false);
|
||||
set_use_blocking_queue();
|
||||
RETURN_IF_ERROR(st);
|
||||
} else if (i == 1) {
|
||||
CHECK(false) << debug_string();
|
||||
|
||||
@ -127,7 +127,7 @@ public:
|
||||
|
||||
OperatorXs operatorXs() { return _operators; }
|
||||
|
||||
bool push_blocked_task_to_queue() {
|
||||
bool push_blocked_task_to_queue() const override {
|
||||
/**
|
||||
* Push task into blocking queue if:
|
||||
* 1. `_use_blocking_queue` is true.
|
||||
@ -135,15 +135,19 @@ public:
|
||||
*/
|
||||
return _use_blocking_queue || get_state() == PipelineTaskState::BLOCKED_FOR_DEPENDENCY;
|
||||
}
|
||||
void set_use_blocking_queue(bool use_blocking_queue) {
|
||||
_use_blocking_queue = use_blocking_queue;
|
||||
void set_use_blocking_queue() {
|
||||
if (_blocked_dep->push_to_blocking_queue()) {
|
||||
_use_blocking_queue = true;
|
||||
return;
|
||||
}
|
||||
_use_blocking_queue = false;
|
||||
}
|
||||
|
||||
private:
|
||||
Dependency* _write_blocked_dependency() {
|
||||
_blocked_dep = _write_dependencies->write_blocked_by(this);
|
||||
if (_blocked_dep != nullptr) {
|
||||
set_use_blocking_queue(false);
|
||||
set_use_blocking_queue();
|
||||
static_cast<WriteDependency*>(_blocked_dep)->start_write_watcher();
|
||||
return _blocked_dep;
|
||||
}
|
||||
@ -154,7 +158,7 @@ private:
|
||||
for (auto* fin_dep : _finish_dependencies) {
|
||||
_blocked_dep = fin_dep->finish_blocked_by(this);
|
||||
if (_blocked_dep != nullptr) {
|
||||
set_use_blocking_queue(false);
|
||||
set_use_blocking_queue();
|
||||
static_cast<FinishDependency*>(_blocked_dep)->start_finish_watcher();
|
||||
return _blocked_dep;
|
||||
}
|
||||
@ -166,8 +170,7 @@ private:
|
||||
for (auto* op_dep : _read_dependencies) {
|
||||
_blocked_dep = op_dep->read_blocked_by(this);
|
||||
if (_blocked_dep != nullptr) {
|
||||
// TODO(gabriel):
|
||||
set_use_blocking_queue(true);
|
||||
set_use_blocking_queue();
|
||||
_blocked_dep->start_read_watcher();
|
||||
return _blocked_dep;
|
||||
}
|
||||
|
||||
@ -77,7 +77,7 @@ Status BlockedTaskScheduler::add_blocked_task(PipelineTask* task) {
|
||||
return Status::InternalError("BlockedTaskScheduler shutdown");
|
||||
}
|
||||
std::unique_lock<std::mutex> lock(_task_mutex);
|
||||
if (task->is_pipelineX() && !static_cast<PipelineXTask*>(task)->push_blocked_task_to_queue()) {
|
||||
if (!static_cast<PipelineXTask*>(task)->push_blocked_task_to_queue()) {
|
||||
// put this task into current dependency's blocking queue and wait for event notification
|
||||
// instead of using a separate BlockedTaskScheduler.
|
||||
return Status::OK();
|
||||
@ -142,6 +142,11 @@ void BlockedTaskScheduler::_schedule() {
|
||||
} else if (state == PipelineTaskState::BLOCKED_FOR_SOURCE) {
|
||||
if (task->source_can_read()) {
|
||||
_make_task_run(local_blocked_tasks, iter);
|
||||
} else if (!task->push_blocked_task_to_queue()) {
|
||||
// TODO(gabriel): This condition means this task is in blocking queue now and we should
|
||||
// remove it because this new dependency should not be put into blocking queue. We
|
||||
// will delete this strange behavior after ScanDependency and UnionDependency done.
|
||||
local_blocked_tasks.erase(iter++);
|
||||
} else {
|
||||
iter++;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user