From 23f01ddf3ac41044f56cedb594d6ba4a5c769045 Mon Sep 17 00:00:00 2001 From: Mryange <59914473+Mryange@users.noreply.github.com> Date: Fri, 15 Sep 2023 10:25:14 +0800 Subject: [PATCH] [feature](profile) support simply profile (#23377) A Simplified Version of the Profile Divided into three levels: Level 2: The original profile. Level 1: Instances with identical structures are merged, utilizing concatenation for info strings, and recording the extremum for time types. Note that currently, this is purely experimental, simplifying the profile on the frontend (you can view profiles at any level). Subsequently, we will transition the simplification process to the backend. At that point, due to the simplification being done on the backend, viewing profiles at other levels won't be possible. Due to the issue with the pipeline structure, the active time does not accurately reflect the time of the operators. ``` set enable_simply_profile = false; set enable_simply_profile = true; ``` --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 2 +- .../apache/doris/common/profile/Profile.java | 1 + .../org/apache/doris/common/util/Counter.java | 20 ++ .../doris/common/util/ProfileManager.java | 15 +- .../doris/common/util/RuntimeProfile.java | 204 +++++++++++++++++- .../org/apache/doris/qe/SessionVariable.java | 9 +- 6 files changed, 240 insertions(+), 11 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 118b2a31b5..0c7df2437d 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -351,7 +351,7 @@ void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* max_time, int64_t } } *max_time = local_max_time; - *min_time = local_min_time; + *min_time = local_min_time == INT64_MAX ? 0 : local_min_time; } template diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java index e1336ce5af..7f2b7a154c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java @@ -70,6 +70,7 @@ public class Profile { executionProfile.update(startTime, isFinished); } rootProfile.computeTimeInProfile(); + rootProfile.setProfileLevel(); ProfileManager.getInstance().pushProfile(rootProfile); this.isFinished = isFinished; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/Counter.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/Counter.java index cbd5e88c79..946d8057fd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/Counter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/Counter.java @@ -33,6 +33,10 @@ public class Counter { this.value = value; } + public void setValue(long value) { + this.value = value; + } + public TUnit getType() { return TUnit.findByValue(type); } @@ -45,4 +49,20 @@ public class Counter { this.value = value; this.type = type.getValue(); } + + public void addValue(Counter other) { + this.value += other.value; + } + + public void divValue(long div) { + if (div <= 0) { + return; + } + value /= div; + } + + public boolean isTimeType() { + TUnit ttype = TUnit.findByValue(type); + return ttype == TUnit.TIME_MS || ttype == TUnit.TIME_NS || ttype == TUnit.TIME_S; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java index dbfb7e83f9..738aeb34c6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java @@ -70,7 +70,7 @@ public class ProfileManager { private final RuntimeProfile profile; // cache the result of getProfileContent method - private volatile String profileContent; + private volatile String profileContent = null; public Map infoStrings = Maps.newHashMap(); public MultiProfileTreeBuilder builder = null; public String errMsg = ""; @@ -79,11 +79,16 @@ public class ProfileManager { // lazy load profileContent because sometimes profileContent is very large public String getProfileContent() { - if (profileContent != null) { - return profileContent; - } + // no need to lock because the possibility of concurrent read is very low - profileContent = profile.toString(); + if (profileContent == null) { + // Simple profile will change the structure of the profile. + try { + profileContent = profile.getSimpleString(); + } catch (Exception e) { + LOG.warn("profile get error : " + e.toString()); + } + } return profileContent; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java index fea16c2f75..730d8f9686 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java @@ -20,6 +20,7 @@ package org.apache.doris.common.util; import org.apache.doris.common.Pair; import org.apache.doris.common.Reference; import org.apache.doris.common.profile.SummaryProfile; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TCounter; import org.apache.doris.thrift.TRuntimeProfileNode; import org.apache.doris.thrift.TRuntimeProfileTree; @@ -47,6 +48,10 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; public class RuntimeProfile { private static final Logger LOG = LogManager.getLogger(RuntimeProfile.class); public static String ROOT_COUNTER = ""; + public static int FRAGMENT_DEPTH = 3; + public static String MAX_TIME_PRE = "max: "; + public static String MIN_TIME_PRE = "min: "; + public static String AVG_TIME_PRE = "avg: "; private Counter counterTotalTime; private double localTimePercent; @@ -69,6 +74,7 @@ public class RuntimeProfile { private Boolean isDone = false; private Boolean isCancel = false; + private boolean enableSimplyProfile = false; public RuntimeProfile(String name) { this(); @@ -245,10 +251,10 @@ public class RuntimeProfile { } // Print the profile: - // 1. Profile Name - // 2. Info Strings - // 3. Counters - // 4. Children + // 1. Profile Name + // 2. Info Strings + // 3. Counters + // 4. Children public void prettyPrint(StringBuilder builder, String prefix) { Counter counter = this.counterMap.get("TotalTime"); Preconditions.checkState(counter != null); @@ -299,12 +305,198 @@ public class RuntimeProfile { } } + public void simpleProfile(int depth) { + if (depth == FRAGMENT_DEPTH) { + mergeMutiInstance(childList); + return; + } + for (int i = 0; i < childList.size(); i++) { + Pair pair = childList.get(i); + RuntimeProfile profile = pair.first; + profile.simpleProfile(depth + 1); + } + } + + private static void mergeMutiInstance( + LinkedList> childList) { + /* + * Fragment 1: Fragment 1: + * Instance 0 Instance (total) + * Instance 1 + * Instance 2 + */ + int numInstance = childList.size(); + Pair pair = childList.get(0); + RuntimeProfile mergedProfile = pair.first; + LinkedList other = new LinkedList(); + for (int i = 1; i < childList.size(); i++) { + other.add(childList.get(i).first); + } + mergeInstanceProfile(mergedProfile, other); + childList.clear(); + mergedProfile.name = "Instance " + "(" + numInstance + ")"; + childList.add(Pair.of(mergedProfile, pair.second)); + } + + private static LinkedList getChildListFromLists(int idx, LinkedList rhs) { + LinkedList ret = new LinkedList(); + for (RuntimeProfile profile : rhs) { + ret.add(profile.childList.get(idx).first); + } + return ret; + } + + private static LinkedList getCounterListFromLists(String counterName, LinkedList rhs) { + LinkedList ret = new LinkedList(); + for (RuntimeProfile profile : rhs) { + ret.add(profile.counterMap.get(counterName)); + } + return ret; + } + + private static void mergeInstanceProfile(RuntimeProfile src, LinkedList rhs) { + mergeProfileCounter(src, ROOT_COUNTER, rhs); + mergeTotalTime(src, rhs); + mergeProfileInfoStr(src, rhs); + removePipelineContext(src); + for (int i = 0; i < src.childList.size(); i++) { + RuntimeProfile srcChild = src.childList.get(i).first; + LinkedList rhsChild = getChildListFromLists(i, rhs); + mergeInstanceProfile(srcChild, rhsChild); + } + } + + private static void mergeTotalTime(RuntimeProfile src, LinkedList rhs) { + Counter counter = src.counterMap.get("TotalTime"); + for (RuntimeProfile profile : rhs) { + Counter othCounter = profile.counterMap.get("TotalTime"); + if (othCounter != null && counter != null) { + counter.addValue(othCounter); + } + } + counter.setValue(0); // Because the time is not accurate, it has been set to 0. + } + + private static void removePipelineContext(RuntimeProfile src) { + LinkedList> newChildList = new LinkedList>(); + for (Pair pair : src.childList) { + RuntimeProfile profile = pair.first; + if (!profile.name.equals("PipelineContext")) { + newChildList.add(pair); + } + } + src.childList = newChildList; + } + + private static void mergeProfileCounter(RuntimeProfile src, String counterName, LinkedList rhs) { + Set childCounterSet = src.childCounterMap.get(counterName); + if (childCounterSet == null) { + return; + } + List childCounterList = new LinkedList<>(childCounterSet); + for (String childCounterName : childCounterList) { + Counter counter = src.counterMap.get(childCounterName); + LinkedList rhsCounter = getCounterListFromLists(childCounterName, rhs); + + mergeProfileCounter(src, childCounterName, rhs); + mergeCounter(src, childCounterName, counter, rhsCounter); + removeZeroeCounter(childCounterSet, childCounterName, counter); + + } + } + + private static void mergeProfileInfoStr(RuntimeProfile src, LinkedList rhs) { + for (String key : src.infoStringsDisplayOrder) { + Set strList = new TreeSet(); + strList.add(src.infoStrings.get(key)); + for (RuntimeProfile profile : rhs) { + String value = profile.infoStrings.get(key); + if (value != null) { + strList.add(value); + } + } + try { + String joinedString = String.join(" | ", strList); + src.infoStrings.put(key, joinedString); + } catch (Exception e) { + return; + } + } + } + + private static void removeZeroeCounter(Set childCounterSet, String childCounterName, Counter counter) { + if (counter.getValue() == 0) { + childCounterSet.remove(childCounterName); + } + } + + private static void mergeCounter(RuntimeProfile src, String counterName, Counter counter, + LinkedList rhsCounter) { + if (rhsCounter.size() == 0) { + return; + } + if (counter.isTimeType()) { + Counter maxCounter = new Counter(counter.getType(), counter.getValue()); + Counter minCounter = new Counter(counter.getType(), counter.getValue()); + for (Counter cnt : rhsCounter) { + if (cnt.getValue() > maxCounter.getValue()) { + maxCounter.setValue(cnt.getValue()); + } + if (cnt.getValue() < minCounter.getValue()) { + minCounter.setValue(cnt.getValue()); + } + } + for (Counter cnt : rhsCounter) { + counter.addValue(cnt); + } + long countNumber = rhsCounter.size() + 1; + counter.divValue(countNumber); + String maxCounterName = MAX_TIME_PRE + counterName; + String minCounterName = MIN_TIME_PRE + counterName; + src.counterMap.put(minCounterName, minCounter); + src.counterMap.put(maxCounterName, maxCounter); + TreeSet childCounterSet = src.childCounterMap.get(counterName); + if (childCounterSet == null) { + src.childCounterMap.put(counterName, new TreeSet()); + childCounterSet = src.childCounterMap.get(counterName); + } + childCounterSet.add(minCounterName); + childCounterSet.add(maxCounterName); + if (counter.getValue() > 0) { + src.infoStringsDisplayOrder.add(counterName); + String infoString = "[ " + + AVG_TIME_PRE + printCounter(counter.getValue(), counter.getType()) + " , " + + MAX_TIME_PRE + printCounter(maxCounter.getValue(), maxCounter.getType()) + " , " + + MIN_TIME_PRE + printCounter(minCounter.getValue(), minCounter.getType()) + " ]"; + src.infoStrings.put(counterName, infoString); + } + counter.setValue(0); // value 0 will remove in removeZeroeCounter + } else { + if (rhsCounter.size() == 0) { + return; + } + for (Counter cnt : rhsCounter) { + counter.addValue(cnt); + } + } + } + public String toString() { StringBuilder builder = new StringBuilder(); prettyPrint(builder, ""); return builder.toString(); } + public String getSimpleString() { + if (!this.enableSimplyProfile) { + return toString(); + } + StringBuilder builder = new StringBuilder(); + simpleProfile(0); + prettyPrint(builder, ""); + return builder.toString(); + } + private void printChildCounters(String prefix, String counterName, StringBuilder builder) { Set childCounterSet = childCounterMap.get(counterName); if (childCounterSet == null) { @@ -450,6 +642,10 @@ public class RuntimeProfile { computeTimeInProfile(this.counterTotalTime.getValue()); } + public void setProfileLevel() { + this.enableSimplyProfile = ConnectContext.get().getSessionVariable().getEnableSimplyProfile(); + } + private void computeTimeInProfile(long total) { if (total == 0) { return; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 12a74ae893..bb1aabf4cf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -110,6 +110,7 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_BUCKET_SHUFFLE_JOIN = "enable_bucket_shuffle_join"; public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM = "parallel_fragment_exec_instance_num"; public static final String PARALLEL_PIPELINE_TASK_NUM = "parallel_pipeline_task_num"; + public static final String ENABLE_SIMPLY_PROFILE = "enable_simply_profile"; public static final String MAX_INSTANCE_NUM = "max_instance_num"; public static final String ENABLE_INSERT_STRICT = "enable_insert_strict"; public static final String ENABLE_SPILLING = "enable_spilling"; @@ -605,6 +606,9 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = PARALLEL_PIPELINE_TASK_NUM, fuzzy = true, needForward = true) public int parallelPipelineTaskNum = 0; + @VariableMgr.VarAttr(name = ENABLE_SIMPLY_PROFILE, fuzzy = true) + public boolean enableSimplyProfile = true; + @VariableMgr.VarAttr(name = MAX_INSTANCE_NUM) public int maxInstanceNum = 64; @@ -2619,5 +2623,8 @@ public class SessionVariable implements Serializable, Writable { throw new UnsupportedOperationException("Expect format: HH:mm:ss"); } } -} + public boolean getEnableSimplyProfile() { + return this.enableSimplyProfile; + } +}