[Log](query) Add log of fragment instance num for query (#20597)
Co-authored-by: weizuo <weizuo@xiaomi.com>
This commit is contained in:
@ -187,6 +187,7 @@ public class Coordinator {
|
||||
private final Map<PlanFragmentId, FragmentExecParams> fragmentExecParamsMap = Maps.newHashMap();
|
||||
|
||||
private final List<PlanFragment> fragments;
|
||||
private int instanceTotalNum;
|
||||
|
||||
private Map<Long, BackendExecStates> beToExecStates = Maps.newHashMap();
|
||||
private Map<Long, PipelineExecContexts> beToPipelineExecCtxs = Maps.newHashMap();
|
||||
@ -482,6 +483,10 @@ public class Coordinator {
|
||||
return result;
|
||||
}
|
||||
|
||||
public int getInstanceTotalNum() {
|
||||
return instanceTotalNum;
|
||||
}
|
||||
|
||||
// Initialize
|
||||
private void prepare() {
|
||||
for (PlanFragment fragment : fragments) {
|
||||
@ -664,6 +669,7 @@ public class Coordinator {
|
||||
// 1. set up exec states
|
||||
int instanceNum = params.instanceExecParams.size();
|
||||
Preconditions.checkState(instanceNum > 0);
|
||||
instanceTotalNum += instanceNum;
|
||||
List<TExecPlanFragmentParams> tParams = params.toThrift(backendIdx);
|
||||
|
||||
// 2. update memory limit for colocate join
|
||||
|
||||
@ -1371,6 +1371,18 @@ public class StmtExecutor {
|
||||
}
|
||||
profile.getSummaryProfile().setQueryScheduleFinishTime();
|
||||
updateProfile(false);
|
||||
if (coord.getInstanceTotalNum() > 1 && LOG.isDebugEnabled()) {
|
||||
try {
|
||||
LOG.debug("Start to execute fragment. user: {}, db: {}, sql: {}, fragment instance num: {}",
|
||||
context.getQualifiedUser(), context.getDatabase(),
|
||||
parsedStmt.getOrigStmt().originStmt.replace("\n", " "),
|
||||
coord.getInstanceTotalNum());
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Fail to print fragment concurrency for Query.", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Span fetchResultSpan = context.getTracer().spanBuilder("fetch result").setParent(Context.current()).startSpan();
|
||||
try (Scope scope = fetchResultSpan.makeCurrent()) {
|
||||
while (true) {
|
||||
@ -1449,6 +1461,16 @@ public class StmtExecutor {
|
||||
throw e;
|
||||
} finally {
|
||||
fetchResultSpan.end();
|
||||
if (coord.getInstanceTotalNum() > 1 && LOG.isDebugEnabled()) {
|
||||
try {
|
||||
LOG.debug("Finish to execute fragment. user: {}, db: {}, sql: {}, fragment instance num: {}",
|
||||
context.getQualifiedUser(), context.getDatabase(),
|
||||
parsedStmt.getOrigStmt().originStmt.replace("\n", " "),
|
||||
coord.getInstanceTotalNum());
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Fail to print fragment concurrency for Query.", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user