diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 118b2a31b5..0c7df2437d 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -351,7 +351,7 @@ void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* max_time, int64_t } } *max_time = local_max_time; - *min_time = local_min_time; + *min_time = local_min_time == INT64_MAX ? 0 : local_min_time; } template diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java index e1336ce5af..7f2b7a154c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java @@ -70,6 +70,7 @@ public class Profile { executionProfile.update(startTime, isFinished); } rootProfile.computeTimeInProfile(); + rootProfile.setProfileLevel(); ProfileManager.getInstance().pushProfile(rootProfile); this.isFinished = isFinished; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/Counter.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/Counter.java index cbd5e88c79..946d8057fd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/Counter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/Counter.java @@ -33,6 +33,10 @@ public class Counter { this.value = value; } + public void setValue(long value) { + this.value = value; + } + public TUnit getType() { return TUnit.findByValue(type); } @@ -45,4 +49,20 @@ public class Counter { this.value = value; this.type = type.getValue(); } + + public void addValue(Counter other) { + this.value += other.value; + } + + public void divValue(long div) { + if (div <= 0) { + return; + } + value /= div; + } + + public boolean isTimeType() { + TUnit ttype = TUnit.findByValue(type); + return ttype == TUnit.TIME_MS || ttype == TUnit.TIME_NS || ttype == TUnit.TIME_S; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java index dbfb7e83f9..738aeb34c6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java @@ -70,7 +70,7 @@ public class ProfileManager { private final RuntimeProfile profile; // cache the result of getProfileContent method - private volatile String profileContent; + private volatile String profileContent = null; public Map infoStrings = Maps.newHashMap(); public MultiProfileTreeBuilder builder = null; public String errMsg = ""; @@ -79,11 +79,16 @@ public class ProfileManager { // lazy load profileContent because sometimes profileContent is very large public String getProfileContent() { - if (profileContent != null) { - return profileContent; - } + // no need to lock because the possibility of concurrent read is very low - profileContent = profile.toString(); + if (profileContent == null) { + // Simple profile will change the structure of the profile. + try { + profileContent = profile.getSimpleString(); + } catch (Exception e) { + LOG.warn("profile get error : " + e.toString()); + } + } return profileContent; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java index fea16c2f75..730d8f9686 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java @@ -20,6 +20,7 @@ package org.apache.doris.common.util; import org.apache.doris.common.Pair; import org.apache.doris.common.Reference; import org.apache.doris.common.profile.SummaryProfile; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TCounter; import org.apache.doris.thrift.TRuntimeProfileNode; import org.apache.doris.thrift.TRuntimeProfileTree; @@ -47,6 +48,10 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; public class RuntimeProfile { private static final Logger LOG = LogManager.getLogger(RuntimeProfile.class); public static String ROOT_COUNTER = ""; + public static int FRAGMENT_DEPTH = 3; + public static String MAX_TIME_PRE = "max: "; + public static String MIN_TIME_PRE = "min: "; + public static String AVG_TIME_PRE = "avg: "; private Counter counterTotalTime; private double localTimePercent; @@ -69,6 +74,7 @@ public class RuntimeProfile { private Boolean isDone = false; private Boolean isCancel = false; + private boolean enableSimplyProfile = false; public RuntimeProfile(String name) { this(); @@ -245,10 +251,10 @@ public class RuntimeProfile { } // Print the profile: - // 1. Profile Name - // 2. Info Strings - // 3. Counters - // 4. Children + // 1. Profile Name + // 2. Info Strings + // 3. Counters + // 4. Children public void prettyPrint(StringBuilder builder, String prefix) { Counter counter = this.counterMap.get("TotalTime"); Preconditions.checkState(counter != null); @@ -299,12 +305,198 @@ public class RuntimeProfile { } } + public void simpleProfile(int depth) { + if (depth == FRAGMENT_DEPTH) { + mergeMutiInstance(childList); + return; + } + for (int i = 0; i < childList.size(); i++) { + Pair pair = childList.get(i); + RuntimeProfile profile = pair.first; + profile.simpleProfile(depth + 1); + } + } + + private static void mergeMutiInstance( + LinkedList> childList) { + /* + * Fragment 1: Fragment 1: + * Instance 0 Instance (total) + * Instance 1 + * Instance 2 + */ + int numInstance = childList.size(); + Pair pair = childList.get(0); + RuntimeProfile mergedProfile = pair.first; + LinkedList other = new LinkedList(); + for (int i = 1; i < childList.size(); i++) { + other.add(childList.get(i).first); + } + mergeInstanceProfile(mergedProfile, other); + childList.clear(); + mergedProfile.name = "Instance " + "(" + numInstance + ")"; + childList.add(Pair.of(mergedProfile, pair.second)); + } + + private static LinkedList getChildListFromLists(int idx, LinkedList rhs) { + LinkedList ret = new LinkedList(); + for (RuntimeProfile profile : rhs) { + ret.add(profile.childList.get(idx).first); + } + return ret; + } + + private static LinkedList getCounterListFromLists(String counterName, LinkedList rhs) { + LinkedList ret = new LinkedList(); + for (RuntimeProfile profile : rhs) { + ret.add(profile.counterMap.get(counterName)); + } + return ret; + } + + private static void mergeInstanceProfile(RuntimeProfile src, LinkedList rhs) { + mergeProfileCounter(src, ROOT_COUNTER, rhs); + mergeTotalTime(src, rhs); + mergeProfileInfoStr(src, rhs); + removePipelineContext(src); + for (int i = 0; i < src.childList.size(); i++) { + RuntimeProfile srcChild = src.childList.get(i).first; + LinkedList rhsChild = getChildListFromLists(i, rhs); + mergeInstanceProfile(srcChild, rhsChild); + } + } + + private static void mergeTotalTime(RuntimeProfile src, LinkedList rhs) { + Counter counter = src.counterMap.get("TotalTime"); + for (RuntimeProfile profile : rhs) { + Counter othCounter = profile.counterMap.get("TotalTime"); + if (othCounter != null && counter != null) { + counter.addValue(othCounter); + } + } + counter.setValue(0); // Because the time is not accurate, it has been set to 0. + } + + private static void removePipelineContext(RuntimeProfile src) { + LinkedList> newChildList = new LinkedList>(); + for (Pair pair : src.childList) { + RuntimeProfile profile = pair.first; + if (!profile.name.equals("PipelineContext")) { + newChildList.add(pair); + } + } + src.childList = newChildList; + } + + private static void mergeProfileCounter(RuntimeProfile src, String counterName, LinkedList rhs) { + Set childCounterSet = src.childCounterMap.get(counterName); + if (childCounterSet == null) { + return; + } + List childCounterList = new LinkedList<>(childCounterSet); + for (String childCounterName : childCounterList) { + Counter counter = src.counterMap.get(childCounterName); + LinkedList rhsCounter = getCounterListFromLists(childCounterName, rhs); + + mergeProfileCounter(src, childCounterName, rhs); + mergeCounter(src, childCounterName, counter, rhsCounter); + removeZeroeCounter(childCounterSet, childCounterName, counter); + + } + } + + private static void mergeProfileInfoStr(RuntimeProfile src, LinkedList rhs) { + for (String key : src.infoStringsDisplayOrder) { + Set strList = new TreeSet(); + strList.add(src.infoStrings.get(key)); + for (RuntimeProfile profile : rhs) { + String value = profile.infoStrings.get(key); + if (value != null) { + strList.add(value); + } + } + try { + String joinedString = String.join(" | ", strList); + src.infoStrings.put(key, joinedString); + } catch (Exception e) { + return; + } + } + } + + private static void removeZeroeCounter(Set childCounterSet, String childCounterName, Counter counter) { + if (counter.getValue() == 0) { + childCounterSet.remove(childCounterName); + } + } + + private static void mergeCounter(RuntimeProfile src, String counterName, Counter counter, + LinkedList rhsCounter) { + if (rhsCounter.size() == 0) { + return; + } + if (counter.isTimeType()) { + Counter maxCounter = new Counter(counter.getType(), counter.getValue()); + Counter minCounter = new Counter(counter.getType(), counter.getValue()); + for (Counter cnt : rhsCounter) { + if (cnt.getValue() > maxCounter.getValue()) { + maxCounter.setValue(cnt.getValue()); + } + if (cnt.getValue() < minCounter.getValue()) { + minCounter.setValue(cnt.getValue()); + } + } + for (Counter cnt : rhsCounter) { + counter.addValue(cnt); + } + long countNumber = rhsCounter.size() + 1; + counter.divValue(countNumber); + String maxCounterName = MAX_TIME_PRE + counterName; + String minCounterName = MIN_TIME_PRE + counterName; + src.counterMap.put(minCounterName, minCounter); + src.counterMap.put(maxCounterName, maxCounter); + TreeSet childCounterSet = src.childCounterMap.get(counterName); + if (childCounterSet == null) { + src.childCounterMap.put(counterName, new TreeSet()); + childCounterSet = src.childCounterMap.get(counterName); + } + childCounterSet.add(minCounterName); + childCounterSet.add(maxCounterName); + if (counter.getValue() > 0) { + src.infoStringsDisplayOrder.add(counterName); + String infoString = "[ " + + AVG_TIME_PRE + printCounter(counter.getValue(), counter.getType()) + " , " + + MAX_TIME_PRE + printCounter(maxCounter.getValue(), maxCounter.getType()) + " , " + + MIN_TIME_PRE + printCounter(minCounter.getValue(), minCounter.getType()) + " ]"; + src.infoStrings.put(counterName, infoString); + } + counter.setValue(0); // value 0 will remove in removeZeroeCounter + } else { + if (rhsCounter.size() == 0) { + return; + } + for (Counter cnt : rhsCounter) { + counter.addValue(cnt); + } + } + } + public String toString() { StringBuilder builder = new StringBuilder(); prettyPrint(builder, ""); return builder.toString(); } + public String getSimpleString() { + if (!this.enableSimplyProfile) { + return toString(); + } + StringBuilder builder = new StringBuilder(); + simpleProfile(0); + prettyPrint(builder, ""); + return builder.toString(); + } + private void printChildCounters(String prefix, String counterName, StringBuilder builder) { Set childCounterSet = childCounterMap.get(counterName); if (childCounterSet == null) { @@ -450,6 +642,10 @@ public class RuntimeProfile { computeTimeInProfile(this.counterTotalTime.getValue()); } + public void setProfileLevel() { + this.enableSimplyProfile = ConnectContext.get().getSessionVariable().getEnableSimplyProfile(); + } + private void computeTimeInProfile(long total) { if (total == 0) { return; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 12a74ae893..bb1aabf4cf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -110,6 +110,7 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_BUCKET_SHUFFLE_JOIN = "enable_bucket_shuffle_join"; public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM = "parallel_fragment_exec_instance_num"; public static final String PARALLEL_PIPELINE_TASK_NUM = "parallel_pipeline_task_num"; + public static final String ENABLE_SIMPLY_PROFILE = "enable_simply_profile"; public static final String MAX_INSTANCE_NUM = "max_instance_num"; public static final String ENABLE_INSERT_STRICT = "enable_insert_strict"; public static final String ENABLE_SPILLING = "enable_spilling"; @@ -605,6 +606,9 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = PARALLEL_PIPELINE_TASK_NUM, fuzzy = true, needForward = true) public int parallelPipelineTaskNum = 0; + @VariableMgr.VarAttr(name = ENABLE_SIMPLY_PROFILE, fuzzy = true) + public boolean enableSimplyProfile = true; + @VariableMgr.VarAttr(name = MAX_INSTANCE_NUM) public int maxInstanceNum = 64; @@ -2619,5 +2623,8 @@ public class SessionVariable implements Serializable, Writable { throw new UnsupportedOperationException("Expect format: HH:mm:ss"); } } -} + public boolean getEnableSimplyProfile() { + return this.enableSimplyProfile; + } +}