From c57fa7c9306df4883727a93831007381e66ccb7e Mon Sep 17 00:00:00 2001 From: Lijia Liu Date: Mon, 9 Jan 2023 08:46:58 +0800 Subject: [PATCH] [Pipeline] Fix PipScannerContext::can_finish return wrong status (#15259) Now in ScannerContext::push_back_scanner_and_reschedule, _num_running_scanners-- is before _num_scheduling_ctx++. InPipScannerContext::can_finish, we check _num_running_scanners == 0 && _num_scheduling_ctx == 0 without obtaining _transfer_lock. In follow case, PipScannerContext::can_finish will return wrong result. _num_running_scanners-- Check _num_running_scanners == 0 && _num_scheduling_ctx == 0` return true. _num_scheduling_ctx++ So, we can set _num_running_scanners-- in the last of this func. Describe your changes. PipScannerContext::get_block_from_queue not block. Set _num_running_scanners-- in the last of ScannerContext::push_back_scanner_and_reschedule. --- be/src/pipeline/exec/scan_operator.cpp | 6 +-- be/src/pipeline/pipeline_task.cpp | 44 ++++++++++++++++++---- be/src/pipeline/pipeline_task.h | 19 +++++++++- be/src/vec/exec/scan/pip_scanner_context.h | 6 ++- be/src/vec/exec/scan/scanner_context.cpp | 14 ++++--- be/src/vec/exec/scan/scanner_context.h | 4 +- be/src/vec/exec/vbroker_scan_node.h | 2 +- 7 files changed, 73 insertions(+), 22 deletions(-) diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index f9f2125878..16b210ddf7 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -25,10 +25,10 @@ namespace doris::pipeline { OPERATOR_CODE_GENERATOR(ScanOperator, SourceOperator) bool ScanOperator::can_read() { - if (_node->_eos || _node->_scanner_ctx->done() || _node->_scanner_ctx->can_finish()) { + if (_node->_eos || _node->_scanner_ctx->done() || _node->_scanner_ctx->no_schedule()) { // _eos: need eos // _scanner_ctx->done(): need finish - // _scanner_ctx->can_finish(): should be scheduled + // _scanner_ctx->no_schedule(): should schedule _scanner_ctx return true; } else { return !_node->_scanner_ctx->empty_in_queue(); // there are some blocks to process @@ -36,7 +36,7 @@ bool ScanOperator::can_read() { } bool ScanOperator::is_pending_finish() const { - return _node->_scanner_ctx && !_node->_scanner_ctx->can_finish(); + return _node->_scanner_ctx && !_node->_scanner_ctx->no_schedule(); } bool ScanOperator::runtime_filters_are_ready_or_timeout() { diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 3cd5f7f1e3..8aa033a759 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -28,8 +28,17 @@ void PipelineTask::_init_profile() { auto* task_profile = new RuntimeProfile(ss.str()); _parent_profile->add_child(task_profile, true, nullptr); _task_profile.reset(task_profile); - _sink_timer = ADD_TIMER(_task_profile, "SinkTime"); - _get_block_timer = ADD_TIMER(_task_profile, "GetBlockTime"); + _task_cpu_timer = ADD_TIMER(_task_profile, "TaskCpuTime"); + + static const char* exec_time = "ExecuteTime"; + _exec_timer = ADD_TIMER(_task_profile, exec_time); + _prepare_timer = ADD_CHILD_TIMER(_task_profile, "PrepareTime", exec_time); + _open_timer = ADD_CHILD_TIMER(_task_profile, "OpenTime", exec_time); + _get_block_timer = ADD_CHILD_TIMER(_task_profile, "GetBlockTime", exec_time); + _sink_timer = ADD_CHILD_TIMER(_task_profile, "SinkTime", exec_time); + _finalize_timer = ADD_CHILD_TIMER(_task_profile, "FinalizeTime", exec_time); + _close_timer = ADD_CHILD_TIMER(_task_profile, "CloseTime", exec_time); + _wait_source_timer = ADD_TIMER(_task_profile, "WaitSourceTime"); _wait_sink_timer = ADD_TIMER(_task_profile, "WaitSinkTime"); _wait_worker_timer = ADD_TIMER(_task_profile, "WaitWorkerTime"); @@ -39,12 +48,16 @@ void PipelineTask::_init_profile() { _block_by_sink_counts = ADD_COUNTER(_task_profile, "NumBlockedBySinkTimes", TUnit::UNIT); _schedule_counts = ADD_COUNTER(_task_profile, "NumScheduleTimes", TUnit::UNIT); _yield_counts = ADD_COUNTER(_task_profile, "NumYieldTimes", TUnit::UNIT); + _core_change_times = ADD_COUNTER(_task_profile, "CoreChangeTimes", TUnit::UNIT); } Status PipelineTask::prepare(RuntimeState* state) { DCHECK(_sink); DCHECK(_cur_state == NOT_READY); _init_profile(); + SCOPED_TIMER(_task_profile->total_time_counter()); + SCOPED_CPU_TIMER(_task_cpu_timer); + SCOPED_TIMER(_prepare_timer); RETURN_IF_ERROR(_sink->prepare(state)); for (auto& o : _operators) { RETURN_IF_ERROR(o->prepare(state)); @@ -94,6 +107,9 @@ bool PipelineTask::has_dependency() { } Status PipelineTask::open() { + SCOPED_TIMER(_task_profile->total_time_counter()); + SCOPED_CPU_TIMER(_task_cpu_timer); + SCOPED_TIMER(_open_timer); for (auto& o : _operators) { RETURN_IF_ERROR(o->open(_state)); } @@ -105,8 +121,10 @@ Status PipelineTask::open() { } Status PipelineTask::execute(bool* eos) { - SCOPED_ATTACH_TASK(runtime_state()); SCOPED_TIMER(_task_profile->total_time_counter()); + SCOPED_CPU_TIMER(_task_cpu_timer); + SCOPED_TIMER(_exec_timer); + SCOPED_ATTACH_TASK(runtime_state()); int64_t time_spent = 0; // The status must be runnable *eos = false; @@ -170,15 +188,23 @@ Status PipelineTask::execute(bool* eos) { } Status PipelineTask::finalize() { + SCOPED_TIMER(_task_profile->total_time_counter()); + SCOPED_CPU_TIMER(_task_cpu_timer); + SCOPED_TIMER(_finalize_timer); return _sink->finalize(_state); } Status PipelineTask::close() { - auto s = _sink->close(_state); - for (auto& op : _operators) { - auto tem = op->close(_state); - if (!tem.ok() && s.ok()) { - s = tem; + int64_t close_ns = 0; + Status s; + { + SCOPED_RAW_TIMER(&close_ns); + s = _sink->close(_state); + for (auto& op : _operators) { + auto tem = op->close(_state); + if (!tem.ok() && s.ok()) { + s = tem; + } } } if (_opened) { @@ -187,6 +213,8 @@ Status PipelineTask::close() { COUNTER_UPDATE(_wait_sink_timer, _wait_sink_watcher.elapsed_time()); COUNTER_UPDATE(_wait_worker_timer, _wait_worker_watcher.elapsed_time()); COUNTER_UPDATE(_wait_schedule_timer, _wait_schedule_watcher.elapsed_time()); + COUNTER_UPDATE(_close_timer, close_ns); + COUNTER_UPDATE(_task_profile->total_time_counter(), close_ns); } return s; } diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index f6a6512201..2e57cf6453 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -156,7 +156,15 @@ public: : _pipeline->_previous_schedule_id; } - void set_previous_core_id(int id) { _previous_schedule_id = id; } + void set_previous_core_id(int id) { + if (id == _previous_schedule_id) { + return; + } + if (_previous_schedule_id != -1) { + COUNTER_UPDATE(_core_change_times, 1); + } + _previous_schedule_id = id; + } bool has_dependency(); @@ -197,8 +205,14 @@ private: RuntimeProfile* _parent_profile; std::unique_ptr _task_profile; - RuntimeProfile::Counter* _sink_timer; + RuntimeProfile::Counter* _task_cpu_timer; + RuntimeProfile::Counter* _prepare_timer; + RuntimeProfile::Counter* _open_timer; + RuntimeProfile::Counter* _exec_timer; RuntimeProfile::Counter* _get_block_timer; + RuntimeProfile::Counter* _sink_timer; + RuntimeProfile::Counter* _finalize_timer; + RuntimeProfile::Counter* _close_timer; RuntimeProfile::Counter* _block_counts; RuntimeProfile::Counter* _block_by_source_counts; RuntimeProfile::Counter* _block_by_sink_counts; @@ -213,5 +227,6 @@ private: MonotonicStopWatch _wait_schedule_watcher; RuntimeProfile::Counter* _wait_schedule_timer; RuntimeProfile::Counter* _yield_counts; + RuntimeProfile::Counter* _core_change_times; }; } // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h index d595396c72..20d45aca95 100644 --- a/be/src/vec/exec/scan/pip_scanner_context.h +++ b/be/src/vec/exec/scan/pip_scanner_context.h @@ -35,9 +35,13 @@ public: void _update_block_queue_empty() override { _blocks_queue_empty = _blocks_queue.empty(); } + Status get_block_from_queue(vectorized::Block** block, bool* eos, bool wait = false) override { + return vectorized::ScannerContext::get_block_from_queue(block, eos, false); + } + // We should make those method lock free. bool done() override { return _is_finished || _should_stop || _status_error; } - bool can_finish() override { return _num_running_scanners == 0 && _num_scheduling_ctx == 0; } + bool no_schedule() override { return _num_running_scanners == 0 && _num_scheduling_ctx == 0; } bool empty_in_queue() override { return _blocks_queue_empty; } private: diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 1940758907..29b379ecfe 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -133,7 +133,7 @@ Status ScannerContext::get_block_from_queue(vectorized::Block** block, bool* eos _state->exec_env()->scanner_scheduler()->submit(this); } // Wait for block from queue - { + if (wait) { SCOPED_TIMER(_parent->_scanner_wait_batch_timer); _blocks_queue_added_cv.wait(l, [this]() { return !_blocks_queue.empty() || _is_finished || !_process_status.ok() || @@ -240,7 +240,7 @@ void ScannerContext::clear_and_join() { return; } -bool ScannerContext::can_finish() { +bool ScannerContext::no_schedule() { std::unique_lock l(_transfer_lock); return _num_running_scanners == 0 && _num_scheduling_ctx == 0; } @@ -263,9 +263,11 @@ void ScannerContext::push_back_scanner_and_reschedule(VScanner* scanner) { } std::lock_guard l(_transfer_lock); - _num_running_scanners--; _num_scheduling_ctx++; - _state->exec_env()->scanner_scheduler()->submit(this); + auto submit_st = _state->exec_env()->scanner_scheduler()->submit(this); + if (!submit_st.ok()) { + _num_scheduling_ctx--; + } // Notice that after calling "_scanners.push_front(scanner)", there may be other ctx in scheduler // to schedule that scanner right away, and in that schedule run, the scanner may be marked as closed @@ -274,14 +276,16 @@ void ScannerContext::push_back_scanner_and_reschedule(VScanner* scanner) { // same scanner. if (scanner->need_to_close() && scanner->set_counted_down() && (--_num_unfinished_scanners) == 0) { - _is_finished = true; // ATTN: this 2 counters will be set at close() again, which is the final values. // But we set them here because the counter set at close() can not send to FE's profile. // So we set them here, and the counter value may be little less than final values. COUNTER_SET(_parent->_scanner_sched_counter, _num_scanner_scheduling); COUNTER_SET(_parent->_scanner_ctx_sched_counter, _num_ctx_scheduling); + _is_finished = true; _blocks_queue_added_cv.notify_one(); } + // In pipeline engine, doris will close scanners when `no_schedule`. + _num_running_scanners--; _ctx_finish_cv.notify_one(); } diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index 2a2d0ffcf1..c6c6afe1e3 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -78,7 +78,7 @@ public: // Get next block from blocks queue. Called by ScanNode // Set eos to true if there is no more data to read. // And if eos is true, the block returned must be nullptr. - Status get_block_from_queue(vectorized::Block** block, bool* eos, bool wait = true); + virtual Status get_block_from_queue(vectorized::Block** block, bool* eos, bool wait = true); // When a scanner complete a scan, this method will be called // to return the scanner to the list for next scheduling. @@ -118,7 +118,7 @@ public: void clear_and_join(); - virtual bool can_finish(); + virtual bool no_schedule(); std::string debug_string(); diff --git a/be/src/vec/exec/vbroker_scan_node.h b/be/src/vec/exec/vbroker_scan_node.h index de46104088..bad1bd93c7 100644 --- a/be/src/vec/exec/vbroker_scan_node.h +++ b/be/src/vec/exec/vbroker_scan_node.h @@ -89,7 +89,7 @@ private: std::condition_variable _queue_reader_cond; std::condition_variable _queue_writer_cond; - int _num_running_scanners; + std::atomic _num_running_scanners; std::atomic _scan_finished;