@ -208,9 +208,8 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
|
||||
|
||||
auto* fragment_context = this;
|
||||
|
||||
LOG_INFO("Preparing instance {}, backend_num {}",
|
||||
PrintInstanceStandardInfo(_query_id, local_params.fragment_instance_id),
|
||||
local_params.backend_num);
|
||||
LOG_INFO("Preparing instance {}|{}, backend_num {}", print_id(_query_id),
|
||||
print_id(local_params.fragment_instance_id), local_params.backend_num);
|
||||
|
||||
// 1. init _runtime_state
|
||||
_runtime_state = RuntimeState::create_unique(
|
||||
@ -754,9 +753,8 @@ void PipelineFragmentContext::close_if_prepare_failed() {
|
||||
}
|
||||
for (auto& task : _tasks) {
|
||||
DCHECK(!task->is_pending_finish());
|
||||
std::stringstream msg;
|
||||
msg << "query " << print_id(_query_id) << " closed since prepare failed";
|
||||
WARN_IF_ERROR(task->close(Status::OK()), msg.str());
|
||||
WARN_IF_ERROR(task->close(Status::OK()),
|
||||
fmt::format("Query {} closed since prepare failed", print_id(_query_id)));
|
||||
close_a_pipeline();
|
||||
}
|
||||
}
|
||||
|
||||
@ -121,9 +121,9 @@ void BlockedTaskScheduler::_schedule() {
|
||||
if (state == PipelineTaskState::PENDING_FINISH) {
|
||||
// should cancel or should finish
|
||||
if (task->is_pending_finish()) {
|
||||
VLOG_DEBUG << "Task pending" << task->debug_string();
|
||||
iter++;
|
||||
} else {
|
||||
VLOG_DEBUG << "Task pending" << task->debug_string();
|
||||
_make_task_run(local_blocked_tasks, iter, PipelineTaskState::PENDING_FINISH);
|
||||
}
|
||||
} else if (task->query_context()->is_cancelled()) {
|
||||
|
||||
@ -426,9 +426,8 @@ void FragmentMgr::_exec_actual(std::shared_ptr<PlanFragmentExecutor> fragment_ex
|
||||
SCOPED_ATTACH_TASK(fragment_executor->runtime_state());
|
||||
#endif
|
||||
|
||||
LOG_INFO("Instance {} executing",
|
||||
PrintInstanceStandardInfo(fragment_executor->query_id(),
|
||||
fragment_executor->fragment_instance_id()));
|
||||
VLOG_DEBUG << fmt::format("Instance {}|{} executing", print_id(fragment_executor->query_id()),
|
||||
print_id(fragment_executor->fragment_instance_id()));
|
||||
|
||||
Status st = fragment_executor->execute();
|
||||
if (!st.ok()) {
|
||||
@ -448,9 +447,8 @@ void FragmentMgr::_exec_actual(std::shared_ptr<PlanFragmentExecutor> fragment_ex
|
||||
std::lock_guard<std::mutex> lock(_lock);
|
||||
_fragment_instance_map.erase(fragment_executor->fragment_instance_id());
|
||||
|
||||
LOG_INFO("Instance {} finished",
|
||||
PrintInstanceStandardInfo(fragment_executor->query_id(),
|
||||
fragment_executor->fragment_instance_id()));
|
||||
LOG_INFO("Instance {} finished", print_id(fragment_executor->fragment_instance_id()));
|
||||
|
||||
if (all_done && query_ctx) {
|
||||
_query_ctx_map.erase(query_ctx->query_id());
|
||||
LOG_INFO("Query {} finished", print_id(query_ctx->query_id()));
|
||||
@ -1081,7 +1079,7 @@ void FragmentMgr::cancel_worker() {
|
||||
}
|
||||
for (auto it = _query_ctx_map.begin(); it != _query_ctx_map.end();) {
|
||||
if (it->second->is_timeout(now)) {
|
||||
LOG_INFO("Query {} is timeout", print_id(it->first));
|
||||
LOG_WARNING("Query {} is timeout", print_id(it->first));
|
||||
it = _query_ctx_map.erase(it);
|
||||
} else {
|
||||
++it;
|
||||
|
||||
@ -20,6 +20,7 @@
|
||||
|
||||
#include "runtime/plan_fragment_executor.h"
|
||||
|
||||
#include <fmt/core.h>
|
||||
#include <gen_cpp/FrontendService_types.h>
|
||||
#include <gen_cpp/Metrics_types.h>
|
||||
#include <gen_cpp/PlanNodes_types.h>
|
||||
@ -256,9 +257,8 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) {
|
||||
|
||||
Status PlanFragmentExecutor::open() {
|
||||
int64_t mem_limit = _runtime_state->query_mem_tracker()->limit();
|
||||
LOG_INFO("PlanFragmentExecutor::open {}, mem_limit {}",
|
||||
PrintInstanceStandardInfo(_query_ctx->query_id(), _fragment_instance_id),
|
||||
PrettyPrinter::print(mem_limit, TUnit::BYTES));
|
||||
LOG_INFO("PlanFragmentExecutor::open {}|{}, mem_limit {}", print_id(_query_ctx->query_id()),
|
||||
print_id(_fragment_instance_id), PrettyPrinter::print(mem_limit, TUnit::BYTES));
|
||||
|
||||
// we need to start the profile-reporting thread before calling Open(), since it
|
||||
// may block
|
||||
|
||||
@ -146,8 +146,6 @@ public:
|
||||
|
||||
TUniqueId query_id() const { return _query_ctx->query_id(); }
|
||||
|
||||
int fragment_id() const { return _fragment_id; }
|
||||
|
||||
bool is_timeout(const VecDateTimeValue& now) const;
|
||||
|
||||
bool is_canceled() { return _runtime_state->is_cancelled(); }
|
||||
|
||||
@ -22,7 +22,6 @@
|
||||
|
||||
#include <atomic>
|
||||
#include <memory>
|
||||
#include <sstream>
|
||||
#include <string>
|
||||
|
||||
#include "common/config.h"
|
||||
|
||||
@ -27,6 +27,7 @@
|
||||
#include <utility>
|
||||
|
||||
#include "common/version_internal.h"
|
||||
#include "fmt/core.h"
|
||||
#include "util/uid_util.h"
|
||||
|
||||
namespace doris {
|
||||
@ -129,9 +130,7 @@ std::string PrintFrontendInfo(const TFrontendInfo& fe_info) {
|
||||
}
|
||||
|
||||
std::string PrintInstanceStandardInfo(const TUniqueId& qid, const TUniqueId& iid) {
|
||||
std::stringstream ss;
|
||||
ss << print_id(iid) << '|' << print_id(qid);
|
||||
return ss.str();
|
||||
return fmt::format("{}|{}", print_id(iid), print_id(qid));
|
||||
}
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@ -34,7 +34,6 @@ import org.apache.logging.log4j.Logger;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@ -67,10 +66,6 @@ public class ExecutionProfile {
|
||||
// instance id -> dummy value
|
||||
private MarkedCountDownLatch<TUniqueId, Long> profileDoneSignal;
|
||||
|
||||
private int waitCount = 0;
|
||||
|
||||
private TUniqueId queryId;
|
||||
|
||||
public ExecutionProfile(TUniqueId queryId, int fragmentNum) {
|
||||
executionProfile = new RuntimeProfile("Execution Profile " + DebugUtil.printId(queryId));
|
||||
RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments");
|
||||
@ -82,7 +77,6 @@ public class ExecutionProfile {
|
||||
}
|
||||
loadChannelProfile = new RuntimeProfile("LoadChannels");
|
||||
executionProfile.addChild(loadChannelProfile);
|
||||
this.queryId = queryId;
|
||||
}
|
||||
|
||||
public RuntimeProfile getAggregatedFragmentsProfile(Map<Integer, String> planNodeMap) {
|
||||
@ -148,16 +142,12 @@ public class ExecutionProfile {
|
||||
if (profileDoneSignal != null) {
|
||||
// count down to zero to notify all objects waiting for this
|
||||
profileDoneSignal.countDownToZero(new Status());
|
||||
LOG.info("Query {} unfinished instance: {}", DebugUtil.printId(queryId), profileDoneSignal.getLeftMarks()
|
||||
.stream().map(e -> DebugUtil.printId(e.getKey())).toArray());
|
||||
}
|
||||
}
|
||||
|
||||
public void markOneInstanceDone(TUniqueId fragmentInstanceId) {
|
||||
if (profileDoneSignal != null) {
|
||||
if (profileDoneSignal.markedCountDown(fragmentInstanceId, -1L)) {
|
||||
LOG.info("Mark instance {} done succeed", DebugUtil.printId(fragmentInstanceId));
|
||||
} else {
|
||||
if (!profileDoneSignal.markedCountDown(fragmentInstanceId, -1L)) {
|
||||
LOG.warn("Mark instance {} done failed", DebugUtil.printId(fragmentInstanceId));
|
||||
}
|
||||
}
|
||||
@ -167,16 +157,6 @@ public class ExecutionProfile {
|
||||
if (profileDoneSignal == null) {
|
||||
return true;
|
||||
}
|
||||
|
||||
waitCount++;
|
||||
|
||||
for (Entry<TUniqueId, Long> entry : profileDoneSignal.getLeftMarks()) {
|
||||
if (waitCount > 2) {
|
||||
LOG.info("Query {} waiting instance {}, waitCount: {}",
|
||||
DebugUtil.printId(queryId), DebugUtil.printId(entry.getKey()), waitCount);
|
||||
}
|
||||
}
|
||||
|
||||
return profileDoneSignal.await(waitTimeS, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
|
||||
@ -106,7 +106,7 @@ public class Transaction {
|
||||
|
||||
coordinator.exec();
|
||||
int execTimeout = ctx.getExecTimeout();
|
||||
LOG.info("Insert {} execution timeout:{}", DebugUtil.printId(ctx.queryId()), execTimeout);
|
||||
LOG.debug("Insert {} execution timeout:{}", DebugUtil.printId(ctx.queryId()), execTimeout);
|
||||
boolean notTimeout = coordinator.join(execTimeout);
|
||||
if (!coordinator.isDone()) {
|
||||
coordinator.cancel();
|
||||
|
||||
@ -2532,12 +2532,7 @@ public class Coordinator implements CoordInterface {
|
||||
if (params.isSetErrorTabletInfos()) {
|
||||
updateErrorTabletInfos(params.getErrorTabletInfos());
|
||||
}
|
||||
LOG.info("Query {} instance {} is marked done",
|
||||
DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId()));
|
||||
executionProfile.markOneInstanceDone(params.getFragmentInstanceId());
|
||||
} else {
|
||||
LOG.info("Query {} instance {} is not marked done",
|
||||
DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId()));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -140,7 +140,9 @@ public final class QeProcessorImpl implements QeProcessor {
|
||||
public void unregisterQuery(TUniqueId queryId) {
|
||||
QueryInfo queryInfo = coordinatorMap.remove(queryId);
|
||||
if (queryInfo != null) {
|
||||
LOG.info("Deregister query id {}", DebugUtil.printId(queryId));
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Deregister query id {}", DebugUtil.printId(queryId));
|
||||
}
|
||||
|
||||
if (queryInfo.getConnectContext() != null
|
||||
&& !Strings.isNullOrEmpty(queryInfo.getConnectContext().getQualifiedUser())
|
||||
@ -159,7 +161,9 @@ public final class QeProcessorImpl implements QeProcessor {
|
||||
}
|
||||
}
|
||||
} else {
|
||||
LOG.warn("not found query {} when unregisterQuery", DebugUtil.printId(queryId));
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("not found query {} when unregisterQuery", DebugUtil.printId(queryId));
|
||||
}
|
||||
}
|
||||
|
||||
// commit hive tranaction if needed
|
||||
@ -190,10 +194,6 @@ public final class QeProcessorImpl implements QeProcessor {
|
||||
|
||||
@Override
|
||||
public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params, TNetworkAddress beAddr) {
|
||||
LOG.info("Processing report exec status, query {} instance {} from {}",
|
||||
DebugUtil.printId(params.query_id), DebugUtil.printId(params.fragment_instance_id),
|
||||
beAddr.toString());
|
||||
|
||||
if (params.isSetProfile()) {
|
||||
LOG.info("ReportExecStatus(): fragment_instance_id={}, query id={}, backend num: {}, ip: {}",
|
||||
DebugUtil.printId(params.fragment_instance_id), DebugUtil.printId(params.query_id),
|
||||
@ -220,7 +220,7 @@ public final class QeProcessorImpl implements QeProcessor {
|
||||
writeProfileExecutor.submit(new WriteProfileTask(params, info));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Report response: {}, query: {}, instance: {}", result.toString(),
|
||||
LOG.warn("Exception during handle report, response: {}, query: {}, instance: {}", result.toString(),
|
||||
DebugUtil.printId(params.query_id), DebugUtil.printId(params.fragment_instance_id));
|
||||
return result;
|
||||
}
|
||||
|
||||
@ -1363,12 +1363,16 @@ public class StmtExecutor {
|
||||
|
||||
// Process a select statement.
|
||||
private void handleQueryStmt() throws Exception {
|
||||
LOG.info("Handling query {} with query id {}",
|
||||
originStmt.originStmt, DebugUtil.printId(context.queryId));
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Handling query {} with query id {}",
|
||||
originStmt.originStmt, DebugUtil.printId(context.queryId));
|
||||
}
|
||||
|
||||
if (context.getConnectType() == ConnectType.MYSQL) {
|
||||
// Every time set no send flag and clean all data in buffer
|
||||
context.getMysqlChannel().reset();
|
||||
}
|
||||
|
||||
Queriable queryStmt = (Queriable) parsedStmt;
|
||||
|
||||
QueryDetail queryDetail = new QueryDetail(context.getStartTime(),
|
||||
@ -1419,7 +1423,10 @@ public class StmtExecutor {
|
||||
if (channel != null && parsedStmt instanceof SelectStmt) {
|
||||
SelectStmt parsedSelectStmt = (SelectStmt) parsedStmt;
|
||||
if (parsedSelectStmt.getLimit() == 0) {
|
||||
LOG.info("ignore handle limit 0 ,sql:{}", parsedSelectStmt.toSql());
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("ignore handle limit 0 ,sql:{}", parsedSelectStmt.toSql());
|
||||
}
|
||||
|
||||
sendFields(queryStmt.getColLabels(), exprToType(queryStmt.getResultExprs()));
|
||||
context.getState().setEof();
|
||||
LOG.info("Query {} finished", DebugUtil.printId(context.queryId));
|
||||
|
||||
Reference in New Issue
Block a user