diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h index c720c22d04..b71b72ab3c 100644 --- a/be/src/vec/exec/scan/pip_scanner_context.h +++ b/be/src/vec/exec/scan/pip_scanner_context.h @@ -48,7 +48,7 @@ public: set_status_on_error(Status::Cancelled("cancelled"), false); } - if (!_process_status.ok()) { + if (!status().ok()) { return _process_status; } } diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 049283647c..2f535e4947 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -199,7 +199,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo // Wait for block from queue if (wait) { SCOPED_TIMER(_scanner_wait_batch_timer); - while (!(!_blocks_queue.empty() || _is_finished || !_process_status.ok() || + while (!(!_blocks_queue.empty() || _is_finished || !status().ok() || state->is_cancelled())) { _blocks_queue_added_cv.wait(l); } @@ -231,7 +231,7 @@ bool ScannerContext::set_status_on_error(const Status& status, bool need_lock) { if (need_lock) { l.lock(); } - if (_process_status.ok()) { + if (this->status().ok()) { _process_status = status; _status_error = true; _blocks_queue_added_cv.notify_one(); @@ -327,7 +327,7 @@ std::string ScannerContext::debug_string() { " status: {}, _should_stop: {}, _is_finished: {}, free blocks: {}," " limit: {}, _num_running_scanners: {}, _num_scheduling_ctx: {}, _max_thread_num: {}," " _block_per_scanner: {}, _cur_bytes_in_queue: {}, MAX_BYTE_OF_QUEUE: {}", - ctx_id, _scanners.size(), _blocks_queue.size(), _process_status.ok(), _should_stop, + ctx_id, _scanners.size(), _blocks_queue.size(), status().ok(), _should_stop, _is_finished, _free_blocks.size_approx(), limit, _num_running_scanners, _num_scheduling_ctx, _max_thread_num, _block_per_scanner, _cur_bytes_in_queue, _max_bytes_in_queue);