diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 37be7a0181..851e0ad325 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -74,7 +74,7 @@ std::string ScanOperator::debug_string() const { fmt::format_to(debug_string_buffer, "{}, scanner_ctx is null: {} ", SourceOperator::debug_string(), _node->_scanner_ctx == nullptr); if (_node->_scanner_ctx) { - fmt::format_to(debug_string_buffer, ", num_running_scanners = {}", + fmt::format_to(debug_string_buffer, ", scanner ctx detail = {}", _node->_scanner_ctx->debug_string()); } return fmt::to_string(debug_string_buffer); @@ -557,14 +557,7 @@ std::string ScanLocalState::debug_string(int indentation_level) const { _eos.load()); if (_scanner_ctx) { fmt::format_to(debug_string_buffer, ""); - fmt::format_to(debug_string_buffer, - ", Scanner Context: (_is_finished = {}, _should_stop = {}, " - "_num_running_scanners={}, " - " _num_unfinished_scanners = {}, status = {})", - _scanner_ctx->is_finished(), _scanner_ctx->should_stop(), - _scanner_ctx->get_num_running_scanners(), - _scanner_ctx->get_num_unfinished_scanners(), - _scanner_ctx->status().to_string()); + fmt::format_to(debug_string_buffer, ", Scanner Context: {}", _scanner_ctx->debug_string()); } return fmt::to_string(debug_string_buffer); diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 30022792e8..85951bdb06 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -287,7 +287,7 @@ public: << print_id(_state->fragment_instance_id()) << " current pipeline exceed run time " << config::enable_debug_log_timeout_secs << " seconds. Task state " - << get_state_name(get_state()) << debug_string(); + << get_state_name(get_state()) << "/n task detail:" << debug_string(); } } diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index b74461ac88..bba3190689 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -276,10 +276,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo // (if the scheduler continues to schedule, it will cause a lot of busy running). // At this point, consumers are required to trigger new scheduling to ensure that // data can be continuously fetched. - int64_t cur_bytes_in_queue = _cur_bytes_in_queue; - int32_t serving_blocks_num = _serving_blocks_num; bool to_be_schedule = should_be_scheduled(); - int num_running_scanners = _num_running_scanners; bool is_scheduled = false; if (!done() && to_be_schedule && _num_running_scanners == 0) { @@ -296,10 +293,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo SCOPED_TIMER(_scanner_wait_batch_timer); while (!(!_blocks_queue.empty() || done() || !status().ok() || state->is_cancelled())) { if (!is_scheduled && _num_running_scanners == 0 && should_be_scheduled()) { - LOG(INFO) << "fatal, cur_bytes_in_queue " << cur_bytes_in_queue - << ", serving_blocks_num " << serving_blocks_num - << ", num_running_scanners " << num_running_scanners - << ", to_be_scheudle " << to_be_schedule << (void*)this; + LOG(INFO) << debug_string(); } _blocks_queue_added_cv.wait_for(l, 1s); } @@ -393,6 +387,8 @@ void ScannerContext::set_status_on_error(const Status& status, bool need_lock) { _blocks_queue_added_cv.notify_one(); _should_stop = true; _set_scanner_done(); + LOG(INFO) << "ctx is set status on error " << debug_string() + << ", call stack is: " << Status::InternalError("catch error status"); } } @@ -456,14 +452,15 @@ void ScannerContext::_set_scanner_done() { std::string ScannerContext::debug_string() { return fmt::format( - "id: {}, sacnners: {}, blocks in queue: {}," + "id: {}, total scanners: {}, scanners: {}, blocks in queue: {}," " status: {}, _should_stop: {}, _is_finished: {}, free blocks: {}," " limit: {}, _num_running_scanners: {}, _max_thread_num: {}," " _block_per_scanner: {}, _cur_bytes_in_queue: {}, MAX_BYTE_OF_QUEUE: {}, " - "num_ctx_scheduled: {}, query_id: {}", - ctx_id, _scanners.size(), _blocks_queue.size(), status().ok(), _should_stop, - _is_finished, _free_blocks.size_approx(), limit, _num_running_scanners, _max_thread_num, - _block_per_scanner, _cur_bytes_in_queue, _max_bytes_in_queue, num_ctx_scheduled(), + "num_ctx_scheduled: {}, serving_blocks_num: {}, allowed_blocks_num: {}, query_id: {}", + ctx_id, _all_scanners.size(), _scanners.size(), _blocks_queue.size(), + _process_status.to_string(), _should_stop, _is_finished, _free_blocks.size_approx(), + limit, _num_running_scanners, _max_thread_num, _block_per_scanner, _cur_bytes_in_queue, + _max_bytes_in_queue, num_ctx_scheduled(), _serving_blocks_num, allowed_blocks_num(), print_id(_query_id)); }