diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index ff22f7923c..6717af3ce4 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -1071,9 +1071,6 @@ void TaskWorkerPool::_handle_report(const TReportRequest& request, ReportType ty
                 .error(result.status);
     } else {
         is_report_success = true;
-        LOG_INFO("successfully report {}", TYPE_STRING(type))
-                .tag("host", _master_info.network_address.hostname)
-                .tag("port", _master_info.network_address.port);
     }
     switch (type) {
     case TASK:
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index ed8b11457a..16964264ed 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -208,12 +208,13 @@ void ExecNode::release_resource(doris::RuntimeState* state) {
 
 Status ExecNode::close(RuntimeState* state) {
     if (_is_closed) {
-        LOG(INFO) << "fragment_instance_id=" << print_id(state->fragment_instance_id())
+        LOG(INFO) << "query= " << print_id(state->query_id())
+                  << " fragment_instance_id=" << print_id(state->fragment_instance_id())
                   << " already closed";
         return Status::OK();
     }
-    LOG(INFO) << "fragment_instance_id=" << print_id(state->fragment_instance_id()) << ", "
-              << " id=" << _id << " type=" << print_plan_node_type(_type) << " closed";
+    LOG(INFO) << "query= " << print_id(state->query_id())
+              << " fragment_instance_id=" << print_id(state->fragment_instance_id()) << " closed";
    _is_closed = true;
 
     Status result;
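
Note for readers tracing these log lines: `print_id` renders a Thrift `TUniqueId` as two hex halves joined by a dash. A minimal standalone sketch of that rendering; the `Fake*` names are stand-ins, not the Doris or Thrift definitions:

```cpp
#include <cstdint>
#include <cstdio>
#include <string>

// Stand-in for Thrift's TUniqueId: two 64-bit halves.
struct FakeUniqueId {
    int64_t hi;
    int64_t lo;
};

// Approximates print_id(): the two halves in hex, joined by '-'.
std::string fake_print_id(const FakeUniqueId& id) {
    char buf[48];
    std::snprintf(buf, sizeof(buf), "%llx-%llx", static_cast<unsigned long long>(id.hi),
                  static_cast<unsigned long long>(id.lo));
    return std::string(buf);
}

int main() {
    FakeUniqueId qid {0x1234abcd, 0x5678ef01};
    std::printf("query=%s\n", fake_print_id(qid).c_str()); // query=1234abcd-5678ef01
    return 0;
}
```
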
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 709e4df765..686ff0fe0a 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -166,8 +166,7 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
     if (_query_ctx->cancel(true, msg, Status::Cancelled(msg))) {
         if (reason != PPlanFragmentCancelReason::LIMIT_REACH) {
             LOG(WARNING) << "PipelineFragmentContext "
-                         << PrintInstanceStandardInfo(_query_id, _fragment_id,
-                                                      _fragment_instance_id)
+                         << PrintInstanceStandardInfo(_query_id, _fragment_instance_id)
                          << " is canceled, cancel message: " << msg;
 
         } else {
@@ -217,12 +216,9 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
         tracer = telemetry::get_tracer(print_id(_query_id));
     }
 
-    LOG_INFO("PipelineFragmentContext::prepare")
-            .tag("query_id", print_id(_query_id))
-            .tag("fragment_id", _fragment_id)
-            .tag("instance_id", print_id(local_params.fragment_instance_id))
-            .tag("backend_num", local_params.backend_num)
-            .tag("pthread_id", (uintptr_t)pthread_self());
+    LOG_INFO("Preparing instance {}, backend_num {}",
+             PrintInstanceStandardInfo(_query_id, local_params.fragment_instance_id),
+             local_params.backend_num);
 
     // 1. init _runtime_state
     _runtime_state = RuntimeState::create_unique(local_params, request.query_id,
@@ -284,8 +280,9 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
     std::vector<ExecNode*> scan_nodes;
     std::vector<TScanRangeParams> no_scan_ranges;
     _root_plan->collect_scan_nodes(&scan_nodes);
-    VLOG_CRITICAL << "scan_nodes.size()=" << scan_nodes.size();
-    VLOG_CRITICAL << "params.per_node_scan_ranges.size()="
+    VLOG_CRITICAL << "query " << print_id(get_query_id())
+                  << " scan_nodes.size()=" << scan_nodes.size();
+    VLOG_CRITICAL << "query " << print_id(get_query_id()) << " params.per_node_scan_ranges.size()="
                   << local_params.per_node_scan_ranges.size();
 
     // set scan range in ScanNode
@@ -310,7 +307,8 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
             auto scan_ranges = find_with_default(local_params.per_node_scan_ranges,
                                                  scan_node->id(), no_scan_ranges);
             static_cast<void>(scan_node->set_scan_ranges(scan_ranges));
-            VLOG_CRITICAL << "scan_node_Id=" << scan_node->id()
+            VLOG_CRITICAL << "query " << print_id(get_query_id())
+                          << "scan_node_Id=" << scan_node->id()
                           << " size=" << scan_ranges.get().size();
         }
     }
@@ -750,7 +748,9 @@ void PipelineFragmentContext::close_if_prepare_failed() {
     }
     for (auto& task : _tasks) {
         DCHECK(!task->is_pending_finish());
-        WARN_IF_ERROR(task->close(Status::OK()), "close_if_prepare_failed failed: ");
+        std::stringstream msg;
+        msg << "query " << print_id(_query_id) << " closed since prepare failed";
+        WARN_IF_ERROR(task->close(Status::OK()), msg.str());
         close_a_pipeline();
     }
 }
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index ffbfaef987..85834e513f 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -71,7 +71,7 @@ public:
 
     PipelinePtr add_pipeline();
 
-    TUniqueId get_fragment_instance_id() { return _fragment_instance_id; }
+    TUniqueId get_fragment_instance_id() const { return _fragment_instance_id; }
 
     virtual RuntimeState* get_runtime_state(UniqueId /*fragment_instance_id*/) {
         return _runtime_state.get();
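
The prepare log above moves from the structured `.tag()` builder to one fmt-style line, so the whole record can be grepped as a unit. A sketch of the resulting message shape, calling the {fmt} library directly rather than Doris' `LOG_INFO` macro; the instance string and backend number are made-up values:

```cpp
#include <fmt/format.h>

#include <iostream>
#include <string>

int main() {
    // Placeholder values standing in for PrintInstanceStandardInfo(...) output
    // and local_params.backend_num.
    std::string instance_info = "5678ef01-9abc0123|1234abcd-5678ef01";
    int backend_num = 3;

    // Same shape as the new call:
    //   LOG_INFO("Preparing instance {}, backend_num {}", ..., backend_num);
    std::cout << fmt::format("Preparing instance {}, backend_num {}", instance_info, backend_num)
              << '\n';
    return 0;
}
```
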
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index ee6f5cdd82..4ef5def6f1 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -31,6 +31,7 @@
 #include
 #include
 
+#include "common/logging.h"
 #include "common/signal_handler.h"
 #include "pipeline/pipeline_task.h"
 #include "pipeline/task_queue.h"
@@ -111,9 +112,9 @@ void BlockedTaskScheduler::_schedule() {
             if (state == PipelineTaskState::PENDING_FINISH) {
                 // should cancel or should finish
                 if (task->is_pending_finish()) {
-                    VLOG_DEBUG << "Task pending" << task->debug_string();
                     iter++;
                 } else {
+                    VLOG_DEBUG << "Task pending" << task->debug_string();
                     _make_task_run(local_blocked_tasks, iter, PipelineTaskState::PENDING_FINISH);
                 }
             } else if (task->query_context()->is_cancelled()) {
@@ -272,7 +273,6 @@ void TaskScheduler::_do_work(size_t index) {
                 LOG(WARNING) << fmt::format(
                         "Pipeline task failed. query_id: {} reason: {}",
                         PrintInstanceStandardInfo(task->query_context()->query_id(),
-                                                  task->fragment_context()->get_fragment_id(),
                                                   task->fragment_context()->get_fragment_instance_id()),
                         status.msg());
                 // Print detail informations below when you debugging here.
@@ -296,11 +296,22 @@ void TaskScheduler::_do_work(size_t index) {
                     fragment_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
                                          "finalize fail:" + status.msg());
                 } else {
+                    VLOG_DEBUG << fmt::format(
+                            "Try close task: {}, fragment_ctx->is_canceled(): {}",
+                            PrintInstanceStandardInfo(
+                                    task->query_context()->query_id(),
+                                    task->fragment_context()->get_fragment_instance_id()),
+                            fragment_ctx->is_canceled());
                     _try_close_task(task,
                                     fragment_ctx->is_canceled() ? PipelineTaskState::CANCELED
                                                                 : PipelineTaskState::FINISHED,
                                     status);
                 }
+                VLOG_DEBUG << fmt::format(
+                        "Task {} is eos, status {}.",
+                        PrintInstanceStandardInfo(task->query_context()->query_id(),
+                                                  task->fragment_context()->get_fragment_instance_id()),
+                        get_state_name(task->get_state()));
                 continue;
             }
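
The new `Task {} is eos, status {}.` line leans on `get_state_name` to keep the state readable. A toy equivalent of that enum-to-string shape; every name below is invented, not the Doris one:

```cpp
#include <iostream>
#include <string>

// Toy version of a get_state_name()-style helper: the switch keeps the log
// line human-readable instead of printing a bare enum value.
enum class FakeTaskState { RUNNABLE, PENDING_FINISH, FINISHED, CANCELED };

std::string fake_state_name(FakeTaskState s) {
    switch (s) {
    case FakeTaskState::RUNNABLE:
        return "RUNNABLE";
    case FakeTaskState::PENDING_FINISH:
        return "PENDING_FINISH";
    case FakeTaskState::FINISHED:
        return "FINISHED";
    case FakeTaskState::CANCELED:
        return "CANCELED";
    }
    return "UNKNOWN";
}

int main() {
    // Mirrors the shape of the new "Task {} is eos, status {}." message.
    std::cout << "Task 5678ef01-9abc0123|1234abcd-5678ef01 is eos, status "
              << fake_state_name(FakeTaskState::FINISHED) << ".\n";
    return 0;
}
```
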
{}", print_id(query_id), + print_id(ins_id), all_done); _pipeline_map.erase(ins_id); } if (all_done) { - LOG(INFO) << "remove query context " << print_id(query_id); + LOG_INFO("Query {} finished", print_id(query_id)); _query_ctx_map.erase(query_id); } } @@ -800,11 +806,11 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, auto cur_span = opentelemetry::trace::Tracer::GetCurrentSpan(); cur_span->SetAttribute("query_id", print_id(params.query_id)); - VLOG_ROW << "exec_plan_fragment params is " + VLOG_ROW << "query: " << print_id(params.query_id) << " exec_plan_fragment params is " << apache::thrift::ThriftDebugString(params).c_str(); // sometimes TExecPlanFragmentParams debug string is too long and glog // will truncate the log line, so print query options seperately for debuggin purpose - VLOG_ROW << "query options is " + VLOG_ROW << "query: " << print_id(params.query_id) << "query options is " << apache::thrift::ThriftDebugString(params.query_options).c_str(); std::shared_ptr query_ctx; @@ -1075,6 +1081,7 @@ void FragmentMgr::cancel_worker() { } for (auto it = _query_ctx_map.begin(); it != _query_ctx_map.end();) { if (it->second->is_timeout(now)) { + LOG_INFO("Query {} is timeout", print_id(it->first)); it = _query_ctx_map.erase(it); } else { ++it; diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index e96b086ad4..78d48327ab 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -56,6 +56,7 @@ #include "runtime/stream_load/stream_load_context.h" #include "runtime/thread_context.h" #include "util/container_util.hpp" +#include "util/debug_util.h" #include "util/defer_op.h" #include "util/pretty_printer.h" #include "util/telemetry/telemetry.h" @@ -264,10 +265,9 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) { Status PlanFragmentExecutor::open() { int64_t mem_limit = _runtime_state->query_mem_tracker()->limit(); - LOG_INFO("PlanFragmentExecutor::open") - .tag("query_id", _query_ctx->query_id()) - .tag("instance_id", _runtime_state->fragment_instance_id()) - .tag("mem_limit", PrettyPrinter::print(mem_limit, TUnit::BYTES)); + LOG_INFO("PlanFragmentExecutor::open {}, mem_limit {}", + PrintInstanceStandardInfo(_query_ctx->query_id(), _fragment_instance_id), + PrettyPrinter::print(mem_limit, TUnit::BYTES)); // we need to start the profile-reporting thread before calling Open(), since it // may block @@ -592,11 +592,9 @@ void PlanFragmentExecutor::stop_report_thread() { void PlanFragmentExecutor::cancel(const PPlanFragmentCancelReason& reason, const std::string& msg) { std::lock_guard l(_status_lock); - LOG_INFO("PlanFragmentExecutor::cancel") - .tag("query_id", print_id(_query_ctx->query_id())) - .tag("instance_id", print_id(_runtime_state->fragment_instance_id())) - .tag("reason", reason) - .tag("error message", msg); + LOG_INFO("PlanFragmentExecutor::cancel {} reason {} error msg {}", + PrintInstanceStandardInfo(query_id(), fragment_instance_id()), reason, msg); + // NOTE: Not need to check if already cancelled. 
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index e96b086ad4..78d48327ab 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -56,6 +56,7 @@
 #include "runtime/stream_load/stream_load_context.h"
 #include "runtime/thread_context.h"
 #include "util/container_util.hpp"
+#include "util/debug_util.h"
 #include "util/defer_op.h"
 #include "util/pretty_printer.h"
 #include "util/telemetry/telemetry.h"
@@ -264,10 +265,9 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) {
 
 Status PlanFragmentExecutor::open() {
     int64_t mem_limit = _runtime_state->query_mem_tracker()->limit();
-    LOG_INFO("PlanFragmentExecutor::open")
-            .tag("query_id", _query_ctx->query_id())
-            .tag("instance_id", _runtime_state->fragment_instance_id())
-            .tag("mem_limit", PrettyPrinter::print(mem_limit, TUnit::BYTES));
+    LOG_INFO("PlanFragmentExecutor::open {}, mem_limit {}",
+             PrintInstanceStandardInfo(_query_ctx->query_id(), _fragment_instance_id),
+             PrettyPrinter::print(mem_limit, TUnit::BYTES));
 
     // we need to start the profile-reporting thread before calling Open(), since it
     // may block
@@ -592,11 +592,9 @@ void PlanFragmentExecutor::stop_report_thread() {
 
 void PlanFragmentExecutor::cancel(const PPlanFragmentCancelReason& reason, const std::string& msg) {
     std::lock_guard<std::mutex> l(_status_lock);
-    LOG_INFO("PlanFragmentExecutor::cancel")
-            .tag("query_id", print_id(_query_ctx->query_id()))
-            .tag("instance_id", print_id(_runtime_state->fragment_instance_id()))
-            .tag("reason", reason)
-            .tag("error message", msg);
+    LOG_INFO("PlanFragmentExecutor::cancel {} reason {} error msg {}",
+             PrintInstanceStandardInfo(query_id(), fragment_instance_id()), reason, msg);
+
     // NOTE: Not need to check if already cancelled.
     // Bug scenario: test_array_map_function.groovy:
     // select /*+SET_VAR(experimental_enable_pipeline_engine=false)*/ array_map((x,y)->x+y, c_array1, c_array2) from test.array_test2 where id > 10 order by id
@@ -680,8 +678,6 @@ void PlanFragmentExecutor::close() {
         }
         LOG(INFO) << ss.str();
     }
-    LOG(INFO) << "Close() fragment_instance_id="
-              << print_id(_runtime_state->fragment_instance_id());
 
     profile()->add_to_span(_span);
diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h
index bb39596ff1..5426626da3 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -146,6 +146,8 @@ public:
 
     TUniqueId query_id() const { return _query_ctx->query_id(); }
 
+    int fragment_id() const { return _fragment_id; }
+
     bool is_timeout(const VecDateTimeValue& now) const;
 
     bool is_canceled() { return _runtime_state->is_cancelled(); }
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 4ec3517740..c51e92d14c 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -22,6 +22,7 @@
 #include
 #include
+#include
 #include
 
 #include "common/config.h"
@@ -81,9 +82,10 @@ public:
         // it is found that query already exists in _query_ctx_map, and query mem tracker is not used.
         // query mem tracker consumption is not equal to 0 after use, because there is memory consumed
         // on query mem tracker, released on other trackers.
+        std::string mem_tracker_msg {""};
         if (query_mem_tracker->peak_consumption() != 0) {
-            LOG(INFO) << fmt::format(
-                    "Deregister query/load memory tracker, queryId={}, Limit={}, CurrUsed={}, "
+            mem_tracker_msg = fmt::format(
+                    ", deregister query/load memory tracker, queryId={}, Limit={}, CurrUsed={}, "
                     "PeakUsed={}",
                     print_id(_query_id), MemTracker::print_bytes(query_mem_tracker->limit()),
                     MemTracker::print_bytes(query_mem_tracker->consumption()),
@@ -92,6 +94,8 @@ public:
         if (_task_group) {
             _task_group->remove_mem_tracker_limiter(query_mem_tracker);
         }
+
+        LOG_INFO("Query {} deconstructed, {}", print_id(_query_id), mem_tracker_msg);
     }
 
     // Notice. For load fragments, the fragment_num sent by FE has a small probability of 0.
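
The destructor change above accumulates the optional mem-tracker details in `mem_tracker_msg` and emits a single `Query ... deconstructed` line. A minimal sketch of that collect-then-log-once pattern, with plain `iostream` standing in for `LOG_INFO` and invented names:

```cpp
#include <iostream>
#include <string>

// Toy stand-in for the ~QueryContext() shape above: build an optional suffix
// first, then emit exactly one line at destruction time.
class FakeQueryContext {
public:
    FakeQueryContext(std::string query_id, long peak_bytes)
            : _query_id(std::move(query_id)), _peak_bytes(peak_bytes) {}

    ~FakeQueryContext() {
        std::string mem_tracker_msg;
        if (_peak_bytes != 0) {
            mem_tracker_msg = ", deregister query/load memory tracker, PeakUsed=" +
                              std::to_string(_peak_bytes);
        }
        // One teardown line per query, with tracker details only when present.
        std::cout << "Query " << _query_id << " deconstructed" << mem_tracker_msg << '\n';
    }

private:
    std::string _query_id;
    long _peak_bytes;
};

int main() {
    FakeQueryContext a("1234abcd-5678ef01", 0);    // plain teardown line
    FakeQueryContext b("9abc0123-4def5678", 4096); // line with the appended suffix
    return 0;
}
```
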
LOG(WARNING) << "Task is cancelled, instance: " - << PrintInstanceStandardInfo(_query_id, _fragment_id, _fragment_instance_id) + << PrintInstanceStandardInfo(_query_id, _fragment_instance_id) << " st = " << Status::Error(msg); } diff --git a/be/src/util/debug_util.cpp b/be/src/util/debug_util.cpp index 2d44c281a5..68ea21f02d 100644 --- a/be/src/util/debug_util.cpp +++ b/be/src/util/debug_util.cpp @@ -128,9 +128,9 @@ std::string PrintFrontendInfo(const TFrontendInfo& fe_info) { return ss.str(); } -std::string PrintInstanceStandardInfo(const TUniqueId& qid, const int fid, const TUniqueId& iid) { +std::string PrintInstanceStandardInfo(const TUniqueId& qid, const TUniqueId& iid) { std::stringstream ss; - ss << print_id(iid) << '|' << fid << '|' << print_id(qid); + ss << print_id(iid) << '|' << print_id(qid); return ss.str(); } diff --git a/be/src/util/debug_util.h b/be/src/util/debug_util.h index e6b6491b8a..31cc1f8f5c 100644 --- a/be/src/util/debug_util.h +++ b/be/src/util/debug_util.h @@ -38,7 +38,7 @@ std::string PrintFrontendInfos(const std::vector& fe_infos); // A desirable scenario would be to call this function WHENEVER whenever we need to print instance information. // By using a fixed format, we would be able to identify all the paths in which this instance is executed. // InstanceId|FragmentIdx|QueryId -std::string PrintInstanceStandardInfo(const TUniqueId& qid, const int fid, const TUniqueId& iid); +std::string PrintInstanceStandardInfo(const TUniqueId& qid, const TUniqueId& iid); // Returns a string " (build )" // If compact == false, this string is appended: "\nBuilt on " diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp b/be/src/vec/runtime/vdata_stream_mgr.cpp index 5c87059c3d..19db8ca15b 100644 --- a/be/src/vec/runtime/vdata_stream_mgr.cpp +++ b/be/src/vec/runtime/vdata_stream_mgr.cpp @@ -60,7 +60,7 @@ std::shared_ptr VDataStreamMgr::create_recvr( PlanNodeId dest_node_id, int num_senders, RuntimeProfile* profile, bool is_merging, std::shared_ptr sub_plan_query_statistics_recvr) { DCHECK(profile != nullptr); - VLOG_FILE << "creating receiver for fragment=" << fragment_instance_id + VLOG_FILE << "creating receiver for fragment=" << print_id(fragment_instance_id) << ", node=" << dest_node_id; std::shared_ptr recvr(new VDataStreamRecvr( this, state, row_desc, fragment_instance_id, dest_node_id, num_senders, is_merging, @@ -75,7 +75,8 @@ std::shared_ptr VDataStreamMgr::create_recvr( std::shared_ptr VDataStreamMgr::find_recvr(const TUniqueId& fragment_instance_id, PlanNodeId node_id, bool acquire_lock) { - VLOG_ROW << "looking up fragment_instance_id=" << fragment_instance_id << ", node=" << node_id; + VLOG_ROW << "looking up fragment_instance_id=" << print_id(fragment_instance_id) + << ", node=" << node_id; size_t hash_value = get_hash_value(fragment_instance_id, node_id); // Create lock guard and not own lock currently and will lock conditionally std::unique_lock recvr_lock(_lock, std::defer_lock); @@ -141,7 +142,7 @@ Status VDataStreamMgr::transmit_block(const PTransmitDataParams* request, Status VDataStreamMgr::deregister_recvr(const TUniqueId& fragment_instance_id, PlanNodeId node_id) { std::shared_ptr targert_recvr; - VLOG_QUERY << "deregister_recvr(): fragment_instance_id=" << fragment_instance_id + VLOG_QUERY << "deregister_recvr(): fragment_instance_id=" << print_id(fragment_instance_id) << ", node=" << node_id; size_t hash_value = get_hash_value(fragment_instance_id, node_id); { @@ -168,7 +169,7 @@ Status VDataStreamMgr::deregister_recvr(const TUniqueId& 
diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp b/be/src/vec/runtime/vdata_stream_mgr.cpp
index 5c87059c3d..19db8ca15b 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.cpp
+++ b/be/src/vec/runtime/vdata_stream_mgr.cpp
@@ -60,7 +60,7 @@ std::shared_ptr<VDataStreamRecvr> VDataStreamMgr::create_recvr(
         PlanNodeId dest_node_id, int num_senders, RuntimeProfile* profile, bool is_merging,
         std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr) {
     DCHECK(profile != nullptr);
-    VLOG_FILE << "creating receiver for fragment=" << fragment_instance_id
+    VLOG_FILE << "creating receiver for fragment=" << print_id(fragment_instance_id)
               << ", node=" << dest_node_id;
     std::shared_ptr<VDataStreamRecvr> recvr(new VDataStreamRecvr(
             this, state, row_desc, fragment_instance_id, dest_node_id, num_senders, is_merging,
@@ -75,7 +75,8 @@ std::shared_ptr<VDataStreamRecvr> VDataStreamMgr::create_recvr(
 
 std::shared_ptr<VDataStreamRecvr> VDataStreamMgr::find_recvr(const TUniqueId& fragment_instance_id,
                                                              PlanNodeId node_id, bool acquire_lock) {
-    VLOG_ROW << "looking up fragment_instance_id=" << fragment_instance_id << ", node=" << node_id;
+    VLOG_ROW << "looking up fragment_instance_id=" << print_id(fragment_instance_id)
+             << ", node=" << node_id;
     size_t hash_value = get_hash_value(fragment_instance_id, node_id);
     // Create lock guard and not own lock currently and will lock conditionally
     std::unique_lock<std::mutex> recvr_lock(_lock, std::defer_lock);
@@ -141,7 +142,7 @@ Status VDataStreamMgr::transmit_block(const PTransmitDataParams* request,
 
 Status VDataStreamMgr::deregister_recvr(const TUniqueId& fragment_instance_id, PlanNodeId node_id) {
     std::shared_ptr<VDataStreamRecvr> targert_recvr;
-    VLOG_QUERY << "deregister_recvr(): fragment_instance_id=" << fragment_instance_id
+    VLOG_QUERY << "deregister_recvr(): fragment_instance_id=" << print_id(fragment_instance_id)
                << ", node=" << node_id;
     size_t hash_value = get_hash_value(fragment_instance_id, node_id);
     {
@@ -168,7 +169,7 @@ Status VDataStreamMgr::deregister_recvr(const TUniqueId& fragment_instance_id, P
         return Status::OK();
     } else {
         std::stringstream err;
-        err << "unknown row receiver id: fragment_instance_id=" << fragment_instance_id
+        err << "unknown row receiver id: fragment_instance_id=" << print_id(fragment_instance_id)
             << " node_id=" << node_id;
         LOG(ERROR) << err.str();
         return Status::InternalError(err.str());
@@ -176,7 +177,7 @@ Status VDataStreamMgr::deregister_recvr(const TUniqueId& fragment_instance_id, P
 }
 
 void VDataStreamMgr::cancel(const TUniqueId& fragment_instance_id, Status exec_status) {
-    VLOG_QUERY << "cancelling all streams for fragment=" << fragment_instance_id;
+    VLOG_QUERY << "cancelling all streams for fragment=" << print_id(fragment_instance_id);
     std::vector<std::shared_ptr<VDataStreamRecvr>> recvrs;
     {
         std::lock_guard<std::mutex> l(_lock);
@@ -187,7 +188,7 @@ void VDataStreamMgr::cancel(const TUniqueId& fragment_instance_id, Status exec_s
         if (recvr == nullptr) {
             // keep going but at least log it
             std::stringstream err;
-            err << "cancel(): missing in stream_map: fragment=" << i->first
+            err << "cancel(): missing in stream_map: fragment=" << print_id(i->first)
                 << " node=" << i->second;
             LOG(ERROR) << err.str();
         } else {
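
The `print_id` wrappers added above matter because streaming a raw `TUniqueId` goes through Thrift's generated `operator<<`, which prints the verbose struct form instead of the compact `hi-lo` hex used elsewhere in these logs. A toy comparison; the types and the Thrift-style printer below are imitations, not the generated code:

```cpp
#include <cstdint>
#include <iostream>
#include <sstream>
#include <string>

struct FakeUniqueId {
    int64_t hi;
    int64_t lo;
};

// Imitates the verbose operator<< that Thrift generates for its structs.
std::ostream& operator<<(std::ostream& os, const FakeUniqueId& id) {
    return os << "TUniqueId(hi=" << id.hi << ", lo=" << id.lo << ")";
}

// The compact form a print_id()-style helper produces.
std::string fake_print_id(const FakeUniqueId& id) {
    std::ostringstream ss;
    ss << std::hex << id.hi << '-' << id.lo;
    return ss.str();
}

int main() {
    FakeUniqueId iid {0x33, 0x44};
    std::cout << "raw:      " << iid << '\n';                // TUniqueId(hi=51, lo=68)
    std::cout << "print_id: " << fake_print_id(iid) << '\n'; // 33-44
    return 0;
}
```
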
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 17f3975645..b842cbeea4 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -69,7 +69,7 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block* block, bool* eos) {
     std::unique_lock<std::mutex> l(_lock);
     // wait until something shows up or we know we're done
     while (!_is_cancelled && _block_queue.empty() && _num_remaining_senders > 0) {
-        VLOG_ROW << "wait arrival fragment_instance_id=" << _recvr->fragment_instance_id()
+        VLOG_ROW << "wait arrival fragment_instance_id=" << print_id(_recvr->fragment_instance_id())
                  << " node=" << _recvr->dest_node_id();
         // Don't count time spent waiting on the sender as active time.
         CANCEL_SAFE_SCOPED_TIMER(_recvr->_data_arrival_timer, &_is_cancelled);
@@ -263,8 +263,9 @@ void VDataStreamRecvr::SenderQueue::decrement_senders(int be_number) {
     _sender_eos_set.insert(be_number);
     DCHECK_GT(_num_remaining_senders, 0);
     _num_remaining_senders--;
-    VLOG_FILE << "decremented senders: fragment_instance_id=" << _recvr->fragment_instance_id()
-              << " node_id=" << _recvr->dest_node_id() << " #senders=" << _num_remaining_senders;
+    VLOG_FILE << "decremented senders: fragment_instance_id="
+              << print_id(_recvr->fragment_instance_id()) << " node_id=" << _recvr->dest_node_id()
+              << " #senders=" << _num_remaining_senders;
     if (_num_remaining_senders == 0) {
         if (_dependency) {
             _dependency->set_always_done();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
index bb9e1c8907..70ff5da18f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
@@ -31,6 +31,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.List;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -63,6 +64,8 @@ public class ExecutionProfile {
     // instance id -> dummy value
     private MarkedCountDownLatch<TUniqueId, Long> profileDoneSignal;
 
+    private int waitCount = 0;
+
     private TUniqueId queryId;
 
     public ExecutionProfile(TUniqueId queryId, int fragmentNum) {
@@ -127,7 +130,11 @@ public class ExecutionProfile {
 
     public void markOneInstanceDone(TUniqueId fragmentInstanceId) {
         if (profileDoneSignal != null) {
-            profileDoneSignal.markedCountDown(fragmentInstanceId, -1L);
+            if (profileDoneSignal.markedCountDown(fragmentInstanceId, -1L)) {
+                LOG.info("Mark instance {} done succeed", DebugUtil.printId(fragmentInstanceId));
+            } else {
+                LOG.warn("Mark instance {} done failed", DebugUtil.printId(fragmentInstanceId));
+            }
         }
     }
 
@@ -135,6 +142,16 @@ public class ExecutionProfile {
         if (profileDoneSignal == null) {
             return true;
         }
+
+        waitCount++;
+
+        for (Entry<TUniqueId, Long> entry : profileDoneSignal.getLeftMarks()) {
+            if (waitCount > 2) {
+                LOG.info("Query {} waiting instance {}, waitCount: {}",
+                        DebugUtil.printId(queryId), DebugUtil.printId(entry.getKey()), waitCount);
+            }
+        }
+
         return profileDoneSignal.await(waitTimeS, TimeUnit.SECONDS);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/txn/Transaction.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/txn/Transaction.java
index 994f5e3605..82031e32ed 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/txn/Transaction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/txn/Transaction.java
@@ -105,7 +105,7 @@ public class Transaction {
         coordinator.exec();
 
         int execTimeout = ctx.getExecTimeout();
-        LOG.debug("Insert execution timeout:{}", execTimeout);
+        LOG.info("Insert {} execution timeout:{}", DebugUtil.printId(ctx.queryId()), execTimeout);
         boolean notTimeout = coordinator.join(execTimeout);
         if (!coordinator.isDone()) {
             coordinator.cancel();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index c84fb90f2a..752dedf801 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -530,13 +530,18 @@ public class Coordinator implements CoordInterface {
         this.idToBackend = Env.getCurrentSystemInfo().getIdToBackend();
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Query {} idToBackend size={}", DebugUtil.printId(queryId), idToBackend.size());
+            int backendNum = idToBackend.size();
+            StringBuilder backendInfos = new StringBuilder("backends info:");
             for (Map.Entry<Long, Backend> entry : idToBackend.entrySet()) {
                 Long backendID = entry.getKey();
                 Backend backend = entry.getValue();
-                LOG.debug("Query {}, backend: {}-{}-{}-{}", DebugUtil.printId(queryId),
-                        backendID, backend.getHost(), backend.getBePort(), backend.getProcessEpoch());
+                backendInfos.append(' ').append(backendID).append("-")
+                        .append(backend.getHost()).append("-")
+                        .append(backend.getBePort()).append("-")
+                        .append(backend.getProcessEpoch());
             }
+            LOG.debug("query {}, backend size: {}, {}",
+                    DebugUtil.printId(queryId), backendNum, backendInfos.toString());
         }
     }
@@ -631,7 +636,7 @@ public class Coordinator implements CoordInterface {
             resultInternalServiceAddr = toBrpcHost(execBeAddr);
             resultOutputExprs = fragments.get(0).getOutputExprs();
             if (LOG.isDebugEnabled()) {
-                LOG.debug("dispatch query job: {} to {}", DebugUtil.printId(queryId),
+                LOG.debug("dispatch result sink of query {} to {}", DebugUtil.printId(queryId),
                         topParams.instanceExecParams.get(0).host);
             }
@@ -857,7 +862,8 @@ public class Coordinator implements CoordInterface {
                 }
             }
 
-            // 3. group BackendExecState by BE. So that we can use one RPC to send all fragment instances of a BE.
+            // 3. group PipelineExecContext by BE.
+            // So that we can use one RPC to send all fragment instances of a BE.
             for (Map.Entry<TNetworkAddress, TPipelineFragmentParams> entry : tParams.entrySet()) {
                 Long backendId = this.addressToBackendID.get(entry.getKey());
                 PipelineExecContext pipelineExecContext = new PipelineExecContext(fragment.getFragmentId(),
@@ -905,6 +911,16 @@ public class Coordinator implements CoordInterface {
                 span = ConnectContext.get().getTracer().spanBuilder("execRemoteFragmentsAsync")
                         .setParent(parentSpanContext).setSpanKind(SpanKind.CLIENT).startSpan();
             }
+
+            if (LOG.isDebugEnabled()) {
+                String infos = "";
+                for (PipelineExecContext pec : ctxs.ctxs) {
+                    infos += pec.fragmentId + " ";
+                }
+                LOG.debug("query {}, sending pipeline fragments: {} to be {} bprc address {}",
+                        DebugUtil.printId(queryId), infos, ctxs.beId, ctxs.brpcAddr.toString());
+            }
+
             ctxs.scopedSpan = new ScopedSpan(span);
             ctxs.unsetFields();
             BackendServiceProxy proxy = BackendServiceProxy.getInstance();
@@ -2415,9 +2431,12 @@ public class Coordinator implements CoordInterface {
             if (params.isSetErrorTabletInfos()) {
                 updateErrorTabletInfos(params.getErrorTabletInfos());
             }
+            Preconditions.checkArgument(params.isSetDetailedReport());
             for (TDetailedReportParams param : params.detailed_report) {
                 if (ctx.fragmentInstancesMap.get(param.fragment_instance_id).getIsDone()) {
+                    LOG.debug("Query {} instance {} is marked done",
+                            DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId()));
                     executionProfile.markOneInstanceDone(param.getFragmentInstanceId());
                 }
             }
@@ -2442,9 +2461,11 @@ public class Coordinator implements CoordInterface {
             // and returned_all_results_ is true.
             // (UpdateStatus() initiates cancellation, if it hasn't already been initiated)
             if (!(returnedAllResults && status.isCancelled()) && !status.ok()) {
-                LOG.warn("one instance report fail, query_id={} instance_id={}, error message: {}",
-                        DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId()),
-                        status.getErrorMsg());
+                LOG.warn("one instance report fail, query_id={} fragment_id={} instance_id={}, be={},"
+                                + " error message: {}",
+                        DebugUtil.printId(queryId), params.getFragmentId(),
+                        DebugUtil.printId(params.getFragmentInstanceId()),
+                        params.getBackendId(), status.getErrorMsg());
                 updateStatus(status, params.getFragmentInstanceId());
             }
             if (ctx.fragmentInstancesMap.get(params.fragment_instance_id).getIsDone()) {
@@ -2466,11 +2487,17 @@ public class Coordinator implements CoordInterface {
                 if (params.isSetErrorTabletInfos()) {
                     updateErrorTabletInfos(params.getErrorTabletInfos());
                 }
+                LOG.debug("Query {} instance {} is marked done",
+                        DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId()));
                 executionProfile.markOneInstanceDone(params.getFragmentInstanceId());
+            } else {
+                LOG.debug("Query {} instance {} is not marked done",
+                        DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId()));
             }
         } else {
             if (params.backend_num >= backendExecStates.size()) {
-                LOG.warn("unknown backend number: {}, expected less than: {}",
+                LOG.warn("Query {} instance {} unknown backend number: {}, expected less than: {}",
+                        DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId()),
                         params.backend_num, backendExecStates.size());
                 return;
             }
@@ -2518,7 +2545,12 @@ public class Coordinator implements CoordInterface {
             if (params.isSetErrorTabletInfos()) {
                 updateErrorTabletInfos(params.getErrorTabletInfos());
             }
+            LOG.info("Query {} instance {} is marked done",
+                    DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId()));
             executionProfile.markOneInstanceDone(params.getFragmentInstanceId());
+        } else {
+            LOG.info("Query {} instance {} is not marked done",
+                    DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId()));
         }
     }
@@ -3121,9 +3153,10 @@ public class Coordinator implements CoordInterface {
             for (TPipelineInstanceParams localParam : rpcParams.local_params) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("cancelRemoteFragments initiated={} done={} hasCanceled={} backend: {},"
-                            + " fragment instance id={}, reason: {}",
+                            + " fragment instance id={} query={}, reason: {}",
                             this.initiated, this.done, this.hasCanceled, backend.getId(),
-                            DebugUtil.printId(localParam.fragment_instance_id), cancelReason.name());
+                            DebugUtil.printId(localParam.fragment_instance_id),
+                            DebugUtil.printId(queryId), cancelReason.name());
                 }
 
                 RuntimeProfile profile = fragmentInstancesMap.get(localParam.fragment_instance_id);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
index f6eaaea254..15c93411f6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -140,9 +140,8 @@ public final class QeProcessorImpl implements QeProcessor {
     public void unregisterQuery(TUniqueId queryId) {
         QueryInfo queryInfo = coordinatorMap.remove(queryId);
         if (queryInfo != null) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("deregister query id {}", DebugUtil.printId(queryId));
-            }
+            LOG.info("Deregister query id {}", DebugUtil.printId(queryId));
+
             if (queryInfo.getConnectContext() != null
                     && !Strings.isNullOrEmpty(queryInfo.getConnectContext().getQualifiedUser())
             ) {
@@ -160,9 +159,7 @@ public final class QeProcessorImpl implements QeProcessor {
                 }
             }
         } else {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("not found query {} when unregisterQuery", DebugUtil.printId(queryId));
-            }
+            LOG.warn("not found query {} when unregisterQuery", DebugUtil.printId(queryId));
         }
 
         // commit hive tranaction if needed
@@ -193,6 +190,10 @@ public final class QeProcessorImpl implements QeProcessor {
 
     @Override
     public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params, TNetworkAddress beAddr) {
+        LOG.info("Processing report exec status, query {} instance {} from {}",
+                DebugUtil.printId(params.query_id), DebugUtil.printId(params.fragment_instance_id),
+                beAddr.toString());
+
         if (params.isSetProfile()) {
             LOG.info("ReportExecStatus(): fragment_instance_id={}, query id={}, backend num: {}, ip: {}",
                     DebugUtil.printId(params.fragment_instance_id), DebugUtil.printId(params.query_id),
@@ -219,7 +220,8 @@ public final class QeProcessorImpl implements QeProcessor {
                 writeProfileExecutor.submit(new WriteProfileTask(params, info));
             }
         } catch (Exception e) {
-            LOG.warn(e.getMessage());
+            LOG.warn("Report response: {}, query: {}, instance: {}", result.toString(),
+                    DebugUtil.printId(params.query_id), DebugUtil.printId(params.fragment_instance_id));
             return result;
         }
         result.setStatus(new TStatus(TStatusCode.OK));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java
index b0c4d70ca7..519fb6f9bc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java
@@ -204,7 +204,6 @@ public class SimpleScheduler {
                 try {
                     Thread.sleep(1000L);
                     SystemInfoService clusterInfoService = Env.getCurrentSystemInfo();
-                    LOG.debug("UpdateBlacklistThread retry begin");
                     Iterator<Map.Entry<Long, Pair<Integer, String>>> iterator = blacklistBackends.entrySet().iterator();
                     while (iterator.hasNext()) {
@@ -227,9 +226,6 @@ public class SimpleScheduler {
                         }
                     }
-
-                    LOG.debug("UpdateBlacklistThread retry end");
-
                 } catch (Throwable ex) {
                     LOG.warn("blacklist thread exception", ex);
                 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index ee97429691..92fd39f89f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1353,6 +1353,8 @@ public class StmtExecutor {
 
     // Process a select statement.
     private void handleQueryStmt() throws Exception {
+        LOG.info("Handling query {} with query id {}",
+                originStmt.originStmt, DebugUtil.printId(context.queryId));
         // Every time set no send flag and clean all data in buffer
         context.getMysqlChannel().reset();
         Queriable queryStmt = (Queriable) parsedStmt;
@@ -1369,6 +1371,7 @@ public class StmtExecutor {
         if (queryStmt.isExplain()) {
             String explainString = planner.getExplainString(queryStmt.getExplainOptions());
             handleExplainStmt(explainString, false);
+            LOG.info("Query {} finished", DebugUtil.printId(context.queryId));
             return;
         }
 
@@ -1376,6 +1379,7 @@ public class StmtExecutor {
         Optional<ResultSet> resultSet = planner.handleQueryInFe(parsedStmt);
         if (resultSet.isPresent()) {
             sendResultSet(resultSet.get());
+            LOG.info("Query {} finished", DebugUtil.printId(context.queryId));
             return;
         }
 
@@ -1389,6 +1393,7 @@ public class StmtExecutor {
                 && context.getSessionVariable().getDefaultOrderByLimit() < 0) {
             if (queryStmt instanceof QueryStmt || queryStmt instanceof LogicalPlanAdapter) {
                 handleCacheStmt(cacheAnalyzer, channel);
+                LOG.info("Query {} finished", DebugUtil.printId(context.queryId));
                 return;
             }
         }
@@ -1400,11 +1405,13 @@ public class StmtExecutor {
                 LOG.info("ignore handle limit 0 ,sql:{}", parsedSelectStmt.toSql());
                 sendFields(queryStmt.getColLabels(), exprToType(queryStmt.getResultExprs()));
                 context.getState().setEof();
+                LOG.info("Query {} finished", DebugUtil.printId(context.queryId));
                 return;
             }
         }
 
         sendResult(isOutfileQuery, false, queryStmt, channel, null, null);
+        LOG.info("Query {} finished", DebugUtil.printId(context.queryId));
     }
 
     private void sendResult(boolean isOutfileQuery, boolean isSendFields, Queriable queryStmt, MysqlChannel channel,
@@ -1893,10 +1900,10 @@ public class StmtExecutor {
             coord.exec();
 
             int execTimeout = context.getExecTimeout();
-            LOG.debug("Insert execution timeout:{}", execTimeout);
+            LOG.debug("Insert {} execution timeout:{}", DebugUtil.printId(context.queryId()), execTimeout);
             boolean notTimeout = coord.join(execTimeout);
             if (!coord.isDone()) {
-                coord.cancel();
+                coord.cancel(Types.PPlanFragmentCancelReason.TIMEOUT);
                 if (notTimeout) {
                     errMsg = coord.getExecStatus().getErrorMsg();
                     ErrorReport.reportDdlException("There exists unhealthy backend. "