[schedule](pipeline) Remove wait schedule time in pipeline query engine (#23994)
Co-authored-by: yiguolei <676222867@qq.com>
This commit is contained in:
@ -85,7 +85,6 @@ void PipelineTask::_fresh_profile_counter() {
|
||||
COUNTER_SET(_schedule_counts, (int64_t)_schedule_time);
|
||||
COUNTER_SET(_wait_sink_timer, (int64_t)_wait_sink_watcher.elapsed_time());
|
||||
COUNTER_SET(_wait_worker_timer, (int64_t)_wait_worker_watcher.elapsed_time());
|
||||
COUNTER_SET(_wait_schedule_timer, (int64_t)_wait_schedule_watcher.elapsed_time());
|
||||
COUNTER_SET(_begin_execute_timer, _begin_execute_time);
|
||||
COUNTER_SET(_eos_timer, _eos_time);
|
||||
COUNTER_SET(_src_pending_finish_over_timer, _src_pending_finish_over_time);
|
||||
@ -116,7 +115,6 @@ void PipelineTask::_init_profile() {
|
||||
_wait_bf_timer = ADD_TIMER(_task_profile, "WaitBfTime");
|
||||
_wait_sink_timer = ADD_TIMER(_task_profile, "WaitSinkTime");
|
||||
_wait_worker_timer = ADD_TIMER(_task_profile, "WaitWorkerTime");
|
||||
_wait_schedule_timer = ADD_TIMER(_task_profile, "WaitScheduleTime");
|
||||
_block_counts = ADD_COUNTER(_task_profile, "NumBlockedTimes", TUnit::UNIT);
|
||||
_block_by_source_counts = ADD_COUNTER(_task_profile, "NumBlockedBySrcTimes", TUnit::UNIT);
|
||||
_block_by_sink_counts = ADD_COUNTER(_task_profile, "NumBlockedBySinkTimes", TUnit::UNIT);
|
||||
|
||||
@ -133,8 +133,6 @@ public:
|
||||
_wait_worker_watcher.start();
|
||||
}
|
||||
void pop_out_runnable_queue() { _wait_worker_watcher.stop(); }
|
||||
void start_schedule_watcher() { _wait_schedule_watcher.start(); }
|
||||
void stop_schedule_watcher() { _wait_schedule_watcher.stop(); }
|
||||
PipelineTaskState get_state() { return _cur_state; }
|
||||
void set_state(PipelineTaskState state);
|
||||
|
||||
@ -311,8 +309,6 @@ protected:
|
||||
MonotonicStopWatch _wait_worker_watcher;
|
||||
RuntimeProfile::Counter* _wait_worker_timer;
|
||||
// TODO we should calculate the time between when really runnable and runnable
|
||||
MonotonicStopWatch _wait_schedule_watcher;
|
||||
RuntimeProfile::Counter* _wait_schedule_timer;
|
||||
RuntimeProfile::Counter* _yield_counts;
|
||||
RuntimeProfile::Counter* _core_change_times;
|
||||
|
||||
|
||||
@ -85,7 +85,6 @@ void BlockedTaskScheduler::_schedule() {
|
||||
_started.store(true);
|
||||
std::list<PipelineTask*> local_blocked_tasks;
|
||||
int empty_times = 0;
|
||||
std::vector<PipelineTask*> ready_tasks;
|
||||
|
||||
while (!_shutdown) {
|
||||
{
|
||||
@ -105,6 +104,7 @@ void BlockedTaskScheduler::_schedule() {
|
||||
}
|
||||
}
|
||||
|
||||
auto origin_local_block_tasks_size = local_blocked_tasks.size();
|
||||
auto iter = local_blocked_tasks.begin();
|
||||
vectorized::VecDateTimeValue now = vectorized::VecDateTimeValue::local_time();
|
||||
while (iter != local_blocked_tasks.end()) {
|
||||
@ -116,57 +116,52 @@ void BlockedTaskScheduler::_schedule() {
|
||||
VLOG_DEBUG << "Task pending" << task->debug_string();
|
||||
iter++;
|
||||
} else {
|
||||
_make_task_run(local_blocked_tasks, iter, ready_tasks,
|
||||
PipelineTaskState::PENDING_FINISH);
|
||||
_make_task_run(local_blocked_tasks, iter, PipelineTaskState::PENDING_FINISH);
|
||||
}
|
||||
} else if (task->query_context()->is_cancelled()) {
|
||||
_make_task_run(local_blocked_tasks, iter, ready_tasks);
|
||||
_make_task_run(local_blocked_tasks, iter);
|
||||
} else if (task->query_context()->is_timeout(now)) {
|
||||
LOG(WARNING) << "Timeout, query_id=" << print_id(task->query_context()->query_id())
|
||||
<< ", instance_id=" << print_id(task->instance_id())
|
||||
<< ", task info: " << task->debug_string();
|
||||
|
||||
task->query_context()->cancel(true, "", Status::Cancelled(""));
|
||||
_make_task_run(local_blocked_tasks, iter, ready_tasks);
|
||||
_make_task_run(local_blocked_tasks, iter);
|
||||
} else if (state == PipelineTaskState::BLOCKED_FOR_DEPENDENCY) {
|
||||
if (task->has_dependency()) {
|
||||
iter++;
|
||||
} else {
|
||||
_make_task_run(local_blocked_tasks, iter, ready_tasks);
|
||||
_make_task_run(local_blocked_tasks, iter);
|
||||
}
|
||||
} else if (state == PipelineTaskState::BLOCKED_FOR_SOURCE) {
|
||||
if (task->source_can_read()) {
|
||||
_make_task_run(local_blocked_tasks, iter, ready_tasks);
|
||||
_make_task_run(local_blocked_tasks, iter);
|
||||
} else {
|
||||
iter++;
|
||||
}
|
||||
} else if (state == PipelineTaskState::BLOCKED_FOR_RF) {
|
||||
if (task->runtime_filters_are_ready_or_timeout()) {
|
||||
_make_task_run(local_blocked_tasks, iter, ready_tasks);
|
||||
_make_task_run(local_blocked_tasks, iter);
|
||||
} else {
|
||||
iter++;
|
||||
}
|
||||
} else if (state == PipelineTaskState::BLOCKED_FOR_SINK) {
|
||||
if (task->sink_can_write()) {
|
||||
_make_task_run(local_blocked_tasks, iter, ready_tasks);
|
||||
_make_task_run(local_blocked_tasks, iter);
|
||||
} else {
|
||||
iter++;
|
||||
}
|
||||
} else {
|
||||
// TODO: DCHECK the state
|
||||
_make_task_run(local_blocked_tasks, iter, ready_tasks);
|
||||
_make_task_run(local_blocked_tasks, iter);
|
||||
}
|
||||
}
|
||||
|
||||
if (ready_tasks.empty()) {
|
||||
if (origin_local_block_tasks_size == 0 ||
|
||||
local_blocked_tasks.size() == origin_local_block_tasks_size) {
|
||||
empty_times += 1;
|
||||
} else {
|
||||
empty_times = 0;
|
||||
for (auto& task : ready_tasks) {
|
||||
task->stop_schedule_watcher();
|
||||
_task_queue->push_back(task);
|
||||
}
|
||||
ready_tasks.clear();
|
||||
}
|
||||
|
||||
if (empty_times != 0 && (empty_times & (EMPTY_TIMES_TO_YIELD - 1)) == 0) {
|
||||
@ -186,13 +181,11 @@ void BlockedTaskScheduler::_schedule() {
|
||||
|
||||
void BlockedTaskScheduler::_make_task_run(std::list<PipelineTask*>& local_tasks,
|
||||
std::list<PipelineTask*>::iterator& task_itr,
|
||||
std::vector<PipelineTask*>& ready_tasks,
|
||||
PipelineTaskState t_state) {
|
||||
auto task = *task_itr;
|
||||
task->start_schedule_watcher();
|
||||
task->set_state(t_state);
|
||||
local_tasks.erase(task_itr++);
|
||||
ready_tasks.emplace_back(task);
|
||||
_task_queue->push_back(task);
|
||||
}
|
||||
|
||||
TaskScheduler::~TaskScheduler() {
|
||||
|
||||
@ -71,7 +71,6 @@ private:
|
||||
void _schedule();
|
||||
void _make_task_run(std::list<PipelineTask*>& local_tasks,
|
||||
std::list<PipelineTask*>::iterator& task_itr,
|
||||
std::vector<PipelineTask*>& ready_tasks,
|
||||
PipelineTaskState state = PipelineTaskState::RUNNABLE);
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user