[Pipeline] Fix PipScannerContext::can_finish returning a wrong status (#15259)

Currently, in ScannerContext::push_back_scanner_and_reschedule, _num_running_scanners-- happens before _num_scheduling_ctx++.
In PipScannerContext::can_finish, we check _num_running_scanners == 0 && _num_scheduling_ctx == 0 without holding _transfer_lock.
In the following interleaving, PipScannerContext::can_finish returns the wrong result:

1. _num_running_scanners--
2. Another thread checks _num_running_scanners == 0 && _num_scheduling_ctx == 0 and gets true.
3. _num_scheduling_ctx++

So we move _num_running_scanners-- to the end of this function, as sketched below.
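
A minimal C++ sketch of the race, using hypothetical stand-in names rather than the real Doris classes:

```cpp
#include <atomic>

// Hypothetical stand-ins for the two ScannerContext counters.
std::atomic<int> num_running_scanners {1};
std::atomic<int> num_scheduling_ctx {0};

// Thread A: buggy ordering inside push_back_scanner_and_reschedule.
void reschedule_buggy() {
    num_running_scanners--; // (1) both counters are now 0 ...
    num_scheduling_ctx++;   // (3) ... until this line runs
}

// Thread B: the unlocked check in PipScannerContext::can_finish.
bool can_finish_unlocked() {
    // (2) If this runs between (1) and (3), it sees 0 && 0 and
    // wrongly reports that no work is running or being scheduled.
    return num_running_scanners == 0 && num_scheduling_ctx == 0;
}

// Fixed ordering: the decrement moves to the end, so the counter pair
// never passes through the all-zero state during the hand-off.
void reschedule_fixed() {
    num_scheduling_ctx++;
    // ... submit this context to the scanner scheduler ...
    num_running_scanners--;
}
```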

Changes:

Make PipScannerContext::get_block_from_queue non-blocking.
Move _num_running_scanners-- to the end of ScannerContext::push_back_scanner_and_reschedule.
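
The non-blocking variant works because, in the pipeline engine, the task only runs after ScanOperator::can_read() returns true, so there is either a block already queued or a terminal state to report; sleeping on the condition variable would stall a pipeline worker thread. A rough sketch of the calling pattern (hypothetical driver code, not the actual scheduler):

```cpp
// Hypothetical polling driver: the real logic lives in the pipeline
// task scheduler, which re-polls can_read() while the task is parked.
vectorized::Block* block = nullptr;
bool eos = false;
if (scan_operator.can_read()) {
    // wait == false: if the queue happens to be empty, return
    // immediately instead of sleeping on _blocks_queue_added_cv.
    Status st = scanner_ctx->get_block_from_queue(&block, &eos, /*wait=*/false);
}
```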
Author: Lijia Liu
Date: 2023-01-09 08:46:58 +08:00
Committed by: GitHub
Parent: 663676ccfe
Commit: c57fa7c930
7 changed files with 73 additions and 22 deletions

View File

@@ -25,10 +25,10 @@ namespace doris::pipeline {
 OPERATOR_CODE_GENERATOR(ScanOperator, SourceOperator)

 bool ScanOperator::can_read() {
-    if (_node->_eos || _node->_scanner_ctx->done() || _node->_scanner_ctx->can_finish()) {
+    if (_node->_eos || _node->_scanner_ctx->done() || _node->_scanner_ctx->no_schedule()) {
         // _eos: need eos
         // _scanner_ctx->done(): need finish
-        // _scanner_ctx->can_finish(): should be scheduled
+        // _scanner_ctx->no_schedule(): should schedule _scanner_ctx
         return true;
     } else {
         return !_node->_scanner_ctx->empty_in_queue(); // there are some blocks to process
@@ -36,7 +36,7 @@ bool ScanOperator::can_read() {
 }

 bool ScanOperator::is_pending_finish() const {
-    return _node->_scanner_ctx && !_node->_scanner_ctx->can_finish();
+    return _node->_scanner_ctx && !_node->_scanner_ctx->no_schedule();
 }

 bool ScanOperator::runtime_filters_are_ready_or_timeout() {

View File

@@ -28,8 +28,17 @@ void PipelineTask::_init_profile() {
     auto* task_profile = new RuntimeProfile(ss.str());
     _parent_profile->add_child(task_profile, true, nullptr);
     _task_profile.reset(task_profile);
-    _sink_timer = ADD_TIMER(_task_profile, "SinkTime");
-    _get_block_timer = ADD_TIMER(_task_profile, "GetBlockTime");
+    _task_cpu_timer = ADD_TIMER(_task_profile, "TaskCpuTime");
+
+    static const char* exec_time = "ExecuteTime";
+    _exec_timer = ADD_TIMER(_task_profile, exec_time);
+    _prepare_timer = ADD_CHILD_TIMER(_task_profile, "PrepareTime", exec_time);
+    _open_timer = ADD_CHILD_TIMER(_task_profile, "OpenTime", exec_time);
+    _get_block_timer = ADD_CHILD_TIMER(_task_profile, "GetBlockTime", exec_time);
+    _sink_timer = ADD_CHILD_TIMER(_task_profile, "SinkTime", exec_time);
+    _finalize_timer = ADD_CHILD_TIMER(_task_profile, "FinalizeTime", exec_time);
+    _close_timer = ADD_CHILD_TIMER(_task_profile, "CloseTime", exec_time);
+
     _wait_source_timer = ADD_TIMER(_task_profile, "WaitSourceTime");
     _wait_sink_timer = ADD_TIMER(_task_profile, "WaitSinkTime");
     _wait_worker_timer = ADD_TIMER(_task_profile, "WaitWorkerTime");
@@ -39,12 +48,16 @@ void PipelineTask::_init_profile() {
     _block_by_sink_counts = ADD_COUNTER(_task_profile, "NumBlockedBySinkTimes", TUnit::UNIT);
     _schedule_counts = ADD_COUNTER(_task_profile, "NumScheduleTimes", TUnit::UNIT);
     _yield_counts = ADD_COUNTER(_task_profile, "NumYieldTimes", TUnit::UNIT);
+    _core_change_times = ADD_COUNTER(_task_profile, "CoreChangeTimes", TUnit::UNIT);
 }

 Status PipelineTask::prepare(RuntimeState* state) {
     DCHECK(_sink);
     DCHECK(_cur_state == NOT_READY);
     _init_profile();
+    SCOPED_TIMER(_task_profile->total_time_counter());
+    SCOPED_CPU_TIMER(_task_cpu_timer);
+    SCOPED_TIMER(_prepare_timer);
     RETURN_IF_ERROR(_sink->prepare(state));
     for (auto& o : _operators) {
         RETURN_IF_ERROR(o->prepare(state));
@@ -94,6 +107,9 @@ bool PipelineTask::has_dependency() {
 }

 Status PipelineTask::open() {
+    SCOPED_TIMER(_task_profile->total_time_counter());
+    SCOPED_CPU_TIMER(_task_cpu_timer);
+    SCOPED_TIMER(_open_timer);
     for (auto& o : _operators) {
         RETURN_IF_ERROR(o->open(_state));
     }
@@ -105,8 +121,10 @@ Status PipelineTask::open() {
 }

 Status PipelineTask::execute(bool* eos) {
-    SCOPED_ATTACH_TASK(runtime_state());
     SCOPED_TIMER(_task_profile->total_time_counter());
+    SCOPED_CPU_TIMER(_task_cpu_timer);
+    SCOPED_TIMER(_exec_timer);
+    SCOPED_ATTACH_TASK(runtime_state());
     int64_t time_spent = 0;
     // The status must be runnable
     *eos = false;
@@ -170,15 +188,23 @@
 }

 Status PipelineTask::finalize() {
+    SCOPED_TIMER(_task_profile->total_time_counter());
+    SCOPED_CPU_TIMER(_task_cpu_timer);
+    SCOPED_TIMER(_finalize_timer);
     return _sink->finalize(_state);
 }

 Status PipelineTask::close() {
-    auto s = _sink->close(_state);
-    for (auto& op : _operators) {
-        auto tem = op->close(_state);
-        if (!tem.ok() && s.ok()) {
-            s = tem;
+    int64_t close_ns = 0;
+    Status s;
+    {
+        SCOPED_RAW_TIMER(&close_ns);
+        s = _sink->close(_state);
+        for (auto& op : _operators) {
+            auto tem = op->close(_state);
+            if (!tem.ok() && s.ok()) {
+                s = tem;
+            }
         }
     }
     if (_opened) {
@@ -187,6 +213,8 @@ Status PipelineTask::close() {
         COUNTER_UPDATE(_wait_sink_timer, _wait_sink_watcher.elapsed_time());
         COUNTER_UPDATE(_wait_worker_timer, _wait_worker_watcher.elapsed_time());
         COUNTER_UPDATE(_wait_schedule_timer, _wait_schedule_watcher.elapsed_time());
+        COUNTER_UPDATE(_close_timer, close_ns);
+        COUNTER_UPDATE(_task_profile->total_time_counter(), close_ns);
     }
     return s;
 }

View File

@@ -156,7 +156,15 @@ public:
                        : _pipeline->_previous_schedule_id;
     }

-    void set_previous_core_id(int id) { _previous_schedule_id = id; }
+    void set_previous_core_id(int id) {
+        if (id == _previous_schedule_id) {
+            return;
+        }
+        if (_previous_schedule_id != -1) {
+            COUNTER_UPDATE(_core_change_times, 1);
+        }
+        _previous_schedule_id = id;
+    }

     bool has_dependency();
@@ -197,8 +205,14 @@ private:
     RuntimeProfile* _parent_profile;
     std::unique_ptr<RuntimeProfile> _task_profile;
-    RuntimeProfile::Counter* _sink_timer;
+    RuntimeProfile::Counter* _task_cpu_timer;
+    RuntimeProfile::Counter* _prepare_timer;
+    RuntimeProfile::Counter* _open_timer;
+    RuntimeProfile::Counter* _exec_timer;
     RuntimeProfile::Counter* _get_block_timer;
+    RuntimeProfile::Counter* _sink_timer;
+    RuntimeProfile::Counter* _finalize_timer;
+    RuntimeProfile::Counter* _close_timer;
     RuntimeProfile::Counter* _block_counts;
     RuntimeProfile::Counter* _block_by_source_counts;
     RuntimeProfile::Counter* _block_by_sink_counts;
@@ -213,5 +227,6 @@ private:
     MonotonicStopWatch _wait_schedule_watcher;
     RuntimeProfile::Counter* _wait_schedule_timer;
     RuntimeProfile::Counter* _yield_counts;
+    RuntimeProfile::Counter* _core_change_times;
 };
 } // namespace doris::pipeline

View File

@@ -35,9 +35,13 @@ public:
     void _update_block_queue_empty() override { _blocks_queue_empty = _blocks_queue.empty(); }

+    Status get_block_from_queue(vectorized::Block** block, bool* eos, bool wait = false) override {
+        return vectorized::ScannerContext::get_block_from_queue(block, eos, false);
+    }
+
+    // We should make these methods lock-free.
     bool done() override { return _is_finished || _should_stop || _status_error; }
-    bool can_finish() override { return _num_running_scanners == 0 && _num_scheduling_ctx == 0; }
+    bool no_schedule() override { return _num_running_scanners == 0 && _num_scheduling_ctx == 0; }
     bool empty_in_queue() override { return _blocks_queue_empty; }

 private:

View File

@@ -133,7 +133,7 @@ Status ScannerContext::get_block_from_queue(vectorized::Block** block, bool* eos
         _state->exec_env()->scanner_scheduler()->submit(this);
     }
     // Wait for block from queue
-    {
+    if (wait) {
         SCOPED_TIMER(_parent->_scanner_wait_batch_timer);
         _blocks_queue_added_cv.wait(l, [this]() {
             return !_blocks_queue.empty() || _is_finished || !_process_status.ok() ||
@@ -240,7 +240,7 @@ void ScannerContext::clear_and_join() {
     return;
 }

-bool ScannerContext::can_finish() {
+bool ScannerContext::no_schedule() {
     std::unique_lock<std::mutex> l(_transfer_lock);
     return _num_running_scanners == 0 && _num_scheduling_ctx == 0;
 }
@@ -263,9 +263,11 @@ void ScannerContext::push_back_scanner_and_reschedule(VScanner* scanner) {
     }

     std::lock_guard<std::mutex> l(_transfer_lock);
-    _num_running_scanners--;
     _num_scheduling_ctx++;
-    _state->exec_env()->scanner_scheduler()->submit(this);
+    auto submit_st = _state->exec_env()->scanner_scheduler()->submit(this);
+    if (!submit_st.ok()) {
+        _num_scheduling_ctx--;
+    }

     // Notice that after calling "_scanners.push_front(scanner)", there may be other ctx in scheduler
     // to schedule that scanner right away, and in that schedule run, the scanner may be marked as closed
@@ -274,14 +276,16 @@ void ScannerContext::push_back_scanner_and_reschedule(VScanner* scanner) {
     // same scanner.
     if (scanner->need_to_close() && scanner->set_counted_down() &&
         (--_num_unfinished_scanners) == 0) {
-        _is_finished = true;
         // ATTN: these 2 counters will be set again at close(), with the final values.
         // But we set them here because the counters set at close() can not be sent to FE's profile.
         // So we set them here, and the counter values may be slightly less than the final values.
         COUNTER_SET(_parent->_scanner_sched_counter, _num_scanner_scheduling);
         COUNTER_SET(_parent->_scanner_ctx_sched_counter, _num_ctx_scheduling);
+        _is_finished = true;
         _blocks_queue_added_cv.notify_one();
     }

+    // In the pipeline engine, Doris closes scanners when `no_schedule` is true.
+    _num_running_scanners--;
     _ctx_finish_cv.notify_one();
 }

View File

@@ -78,7 +78,7 @@ public:
     // Get next block from blocks queue. Called by ScanNode.
     // Set eos to true if there is no more data to read.
     // And if eos is true, the block returned must be nullptr.
-    Status get_block_from_queue(vectorized::Block** block, bool* eos, bool wait = true);
+    virtual Status get_block_from_queue(vectorized::Block** block, bool* eos, bool wait = true);

     // When a scanner completes a scan, this method will be called
     // to return the scanner to the list for the next scheduling.
@@ -118,7 +118,7 @@ public:
     void clear_and_join();

-    virtual bool can_finish();
+    virtual bool no_schedule();

     std::string debug_string();

View File

@@ -89,7 +89,7 @@ private:
     std::condition_variable _queue_reader_cond;
     std::condition_variable _queue_writer_cond;
-    int _num_running_scanners;
+    std::atomic<int> _num_running_scanners;
     std::atomic<bool> _scan_finished;
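
The switch from int to std::atomic<int> in the last hunk matters because the pipeline-side checks read the counter without taking a lock; a plain int written under a mutex and read outside of it is a data race under the C++ memory model. A minimal sketch of the pattern, with hypothetical names:

```cpp
#include <atomic>
#include <mutex>

std::mutex transfer_lock;                  // guards state transitions
std::atomic<int> num_running_scanners {0}; // safe to read without the lock

// Writer: scanners update the counter while holding the lock.
void on_scanner_started() {
    std::lock_guard<std::mutex> l(transfer_lock);
    num_running_scanners++;
}

// Reader: the pipeline polls lock-free; the atomic load is
// well-defined even while a writer holds transfer_lock.
bool nothing_running() {
    return num_running_scanners.load() == 0;
}
```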