[debug](timeout) debug select timeout (#29627)
--------- Co-authored-by: yiguolei <yiguolei@gmail.com>
This commit is contained in:
@@ -74,7 +74,7 @@ std::string ScanOperator::debug_string() const {
|
||||
fmt::format_to(debug_string_buffer, "{}, scanner_ctx is null: {} ",
|
||||
SourceOperator::debug_string(), _node->_scanner_ctx == nullptr);
|
||||
if (_node->_scanner_ctx) {
|
||||
fmt::format_to(debug_string_buffer, ", num_running_scanners = {}",
|
||||
fmt::format_to(debug_string_buffer, ", scanner ctx detail = {}",
|
||||
_node->_scanner_ctx->debug_string());
|
||||
}
|
||||
return fmt::to_string(debug_string_buffer);
|
||||
@@ -557,14 +557,7 @@ std::string ScanLocalState<Derived>::debug_string(int indentation_level) const {
|
||||
_eos.load());
|
||||
if (_scanner_ctx) {
|
||||
fmt::format_to(debug_string_buffer, "");
|
||||
fmt::format_to(debug_string_buffer,
|
||||
", Scanner Context: (_is_finished = {}, _should_stop = {}, "
|
||||
"_num_running_scanners={}, "
|
||||
" _num_unfinished_scanners = {}, status = {})",
|
||||
_scanner_ctx->is_finished(), _scanner_ctx->should_stop(),
|
||||
_scanner_ctx->get_num_running_scanners(),
|
||||
_scanner_ctx->get_num_unfinished_scanners(),
|
||||
_scanner_ctx->status().to_string());
|
||||
fmt::format_to(debug_string_buffer, ", Scanner Context: {}", _scanner_ctx->debug_string());
|
||||
}
|
||||
|
||||
return fmt::to_string(debug_string_buffer);
|
||||
|
||||
@@ -287,7 +287,7 @@ public:
|
||||
<< print_id(_state->fragment_instance_id())
|
||||
<< " current pipeline exceed run time "
|
||||
<< config::enable_debug_log_timeout_secs << " seconds. Task state "
|
||||
<< get_state_name(get_state()) << debug_string();
|
||||
<< get_state_name(get_state()) << "/n task detail:" << debug_string();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -276,10 +276,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
|
||||
// (if the scheduler continues to schedule, it will cause a lot of busy running).
|
||||
// At this point, consumers are required to trigger new scheduling to ensure that
|
||||
// data can be continuously fetched.
|
||||
int64_t cur_bytes_in_queue = _cur_bytes_in_queue;
|
||||
int32_t serving_blocks_num = _serving_blocks_num;
|
||||
bool to_be_schedule = should_be_scheduled();
|
||||
int num_running_scanners = _num_running_scanners;
|
||||
|
||||
bool is_scheduled = false;
|
||||
if (!done() && to_be_schedule && _num_running_scanners == 0) {
|
||||
@@ -296,10 +293,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
|
||||
SCOPED_TIMER(_scanner_wait_batch_timer);
|
||||
while (!(!_blocks_queue.empty() || done() || !status().ok() || state->is_cancelled())) {
|
||||
if (!is_scheduled && _num_running_scanners == 0 && should_be_scheduled()) {
|
||||
LOG(INFO) << "fatal, cur_bytes_in_queue " << cur_bytes_in_queue
|
||||
<< ", serving_blocks_num " << serving_blocks_num
|
||||
<< ", num_running_scanners " << num_running_scanners
|
||||
<< ", to_be_scheudle " << to_be_schedule << (void*)this;
|
||||
LOG(INFO) << debug_string();
|
||||
}
|
||||
_blocks_queue_added_cv.wait_for(l, 1s);
|
||||
}
|
||||
@@ -393,6 +387,8 @@ void ScannerContext::set_status_on_error(const Status& status, bool need_lock) {
|
||||
_blocks_queue_added_cv.notify_one();
|
||||
_should_stop = true;
|
||||
_set_scanner_done();
|
||||
LOG(INFO) << "ctx is set status on error " << debug_string()
|
||||
<< ", call stack is: " << Status::InternalError<true>("catch error status");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -456,14 +452,15 @@ void ScannerContext::_set_scanner_done() {
|
||||
|
||||
std::string ScannerContext::debug_string() {
|
||||
return fmt::format(
|
||||
"id: {}, sacnners: {}, blocks in queue: {},"
|
||||
"id: {}, total scanners: {}, scanners: {}, blocks in queue: {},"
|
||||
" status: {}, _should_stop: {}, _is_finished: {}, free blocks: {},"
|
||||
" limit: {}, _num_running_scanners: {}, _max_thread_num: {},"
|
||||
" _block_per_scanner: {}, _cur_bytes_in_queue: {}, MAX_BYTE_OF_QUEUE: {}, "
|
||||
"num_ctx_scheduled: {}, query_id: {}",
|
||||
ctx_id, _scanners.size(), _blocks_queue.size(), status().ok(), _should_stop,
|
||||
_is_finished, _free_blocks.size_approx(), limit, _num_running_scanners, _max_thread_num,
|
||||
_block_per_scanner, _cur_bytes_in_queue, _max_bytes_in_queue, num_ctx_scheduled(),
|
||||
"num_ctx_scheduled: {}, serving_blocks_num: {}, allowed_blocks_num: {}, query_id: {}",
|
||||
ctx_id, _all_scanners.size(), _scanners.size(), _blocks_queue.size(),
|
||||
_process_status.to_string(), _should_stop, _is_finished, _free_blocks.size_approx(),
|
||||
limit, _num_running_scanners, _max_thread_num, _block_per_scanner, _cur_bytes_in_queue,
|
||||
_max_bytes_in_queue, num_ctx_scheduled(), _serving_blocks_num, allowed_blocks_num(),
|
||||
print_id(_query_id));
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user