[feature](pipelineX) add node id and profilev2 in pipelineX (#25084)

This commit is contained in:
Mryange
2023-10-10 09:09:26 +08:00
committed by GitHub
parent b58010c48e
commit 90ad48cdb7
16 changed files with 137 additions and 49 deletions

View File

@ -63,7 +63,7 @@ public class Profile {
}
public synchronized void update(long startTime, Map<String, String> summaryInfo, boolean isFinished,
int profileLevel, Planner planner) {
int profileLevel, Planner planner, boolean isPipelineX) {
if (this.isFinished) {
return;
}
@ -74,6 +74,7 @@ public class Profile {
rootProfile.computeTimeInProfile();
rootProfile.setPlaner(planner);
rootProfile.setProfileLevel(profileLevel);
rootProfile.setIsPipelineX(isPipelineX);
ProfileManager.getInstance().pushProfile(rootProfile);
this.isFinished = isFinished;
}

View File

@ -18,6 +18,7 @@
package org.apache.doris.common.util;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
/**
@ -35,11 +36,14 @@ public class ProfileStatistics {
private boolean isDataSink;
public ProfileStatistics() {
private boolean isPipelineX;
public ProfileStatistics(boolean isPipelineX) {
statisticalInfo = new HashMap<Integer, ArrayList<String>>();
fragmentInfo = new HashMap<Integer, ArrayList<String>>();
fragmentId = 0;
isDataSink = false;
this.isPipelineX = isPipelineX;
}
private void addPlanNodeInfo(int id, String info) {
@ -56,11 +60,20 @@ public class ProfileStatistics {
fragmentInfo.get(fragmentId).add(info);
}
public void addInfoFromProfile(RuntimeProfile profile, String info) {
if (isDataSink) {
addDataSinkInfo(info);
public void addInfoFromProfile(RuntimeProfile profile, String name, String info) {
if (isPipelineX) {
if (profile.sinkOperator()) {
name = name + "(Sink)";
} else {
name = name + "(Operator)";
}
addPlanNodeInfo(profile.nodeId(), name + ": " + info);
} else {
addPlanNodeInfo(profile.nodeId(), info);
if (isDataSink) {
addDataSinkInfo(name + ": " + info);
} else {
addPlanNodeInfo(profile.nodeId(), name + ": " + info);
}
}
}
@ -73,6 +86,7 @@ public class ProfileStatistics {
return;
}
ArrayList<String> infos = statisticalInfo.get(id);
Collections.sort(infos);
for (String info : infos) {
str.append(prefix + info + "\n");
}
@ -83,6 +97,7 @@ public class ProfileStatistics {
return;
}
ArrayList<String> infos = fragmentInfo.get(fragmentIdx);
Collections.sort(infos);
for (String info : infos) {
str.append(prefix + info + "\n");
}

View File

@ -75,6 +75,13 @@ public class RuntimeProfile {
private Boolean isDone = false;
private Boolean isCancel = false;
// In pipelineX, we have explicitly split the Operator into sink and operator,
// and we can distinguish them using tags.
// In the old pipeline, we can only differentiate them based on their position
// in the profile, which is quite tricky and only transitional.
private Boolean isPipelineX = false;
private Boolean isSinkOperator = false;
private int profileLevel = 3;
private Planner planner = null;
private int nodeid = -1;
@ -119,6 +126,14 @@ public class RuntimeProfile {
return this.nodeid;
}
public Boolean sinkOperator() {
return this.isSinkOperator;
}
public void setIsPipelineX(boolean isPipelineX) {
this.isPipelineX = isPipelineX;
}
public Map<String, Counter> getCounterMap() {
return counterMap;
}
@ -181,6 +196,9 @@ public class RuntimeProfile {
if (node.isSetMetadata()) {
this.nodeid = (int) node.getMetadata();
}
if (node.isSetIsSink()) {
this.isSinkOperator = node.is_sink;
}
Preconditions.checkState(timestamp == -1 || node.timestamp != -1);
// update this level's counters
if (node.counters != null) {
@ -483,11 +501,10 @@ public class RuntimeProfile {
long countNumber = rhsCounter.size() + 1;
if (newCounter.getValue() > 0) {
newCounter.divValue(countNumber);
String infoString = counterName + ": "
+ AVG_TIME_PRE + printCounter(newCounter.getValue(), newCounter.getType()) + ", "
String infoString = AVG_TIME_PRE + printCounter(newCounter.getValue(), newCounter.getType()) + ", "
+ MAX_TIME_PRE + printCounter(maxCounter.getValue(), maxCounter.getType()) + ", "
+ MIN_TIME_PRE + printCounter(minCounter.getValue(), minCounter.getType());
statistics.addInfoFromProfile(src, infoString);
statistics.addInfoFromProfile(src, counterName, infoString);
}
} else {
Counter newCounter = new Counter(counter.getType(), counter.getValue());
@ -496,8 +513,8 @@ public class RuntimeProfile {
newCounter.addValue(cnt);
}
}
String infoString = counterName + ": " + printCounter(newCounter.getValue(), newCounter.getType());
statistics.addInfoFromProfile(src, infoString);
String infoString = printCounter(newCounter.getValue(), newCounter.getType());
statistics.addInfoFromProfile(src, counterName, infoString);
}
}
@ -516,7 +533,7 @@ public class RuntimeProfile {
}
StringBuilder builder = new StringBuilder();
prettyPrint(builder, "");
ProfileStatistics statistics = new ProfileStatistics();
ProfileStatistics statistics = new ProfileStatistics(this.isPipelineX);
simpleProfile(0, 0, statistics);
String planerStr = this.planner.getExplainStringToProfile(statistics);
return "Simple profile \n \n " + planerStr + "\n \n \n" + builder.toString();

View File

@ -323,7 +323,7 @@ public class BrokerLoadJob extends BulkLoadJob {
return;
}
jobProfile.update(createTimestamp, getSummaryInfo(true), true,
Integer.valueOf(sessionVariables.getOrDefault(SessionVariable.PROFILE_LEVEL, "3")), null);
Integer.valueOf(sessionVariables.getOrDefault(SessionVariable.PROFILE_LEVEL, "3")), null, false);
}
private Map<String, String> getSummaryInfo(boolean isFinished) {

View File

@ -887,7 +887,8 @@ public class StmtExecutor {
}
profile.update(context.startTime, getSummaryInfo(isFinished), isFinished,
context.getSessionVariable().profileLevel, this.planner);
context.getSessionVariable().profileLevel, this.planner,
context.getSessionVariable().getEnablePipelineXEngine());
}
// Analyze one statement to structure in memory.