From 155e4e547bec83b6ced838d5252eb1cb35b82d4a Mon Sep 17 00:00:00 2001 From: HappenLee Date: Wed, 12 Apr 2023 11:25:46 +0800 Subject: [PATCH] [pipeline](profile) Show each instance profile in FE (#18544) --- .../java/org/apache/doris/qe/Coordinator.java | 27 ++++++++++--------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index ee4c0135b1..bef25659b0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -835,7 +835,6 @@ public class Coordinator { } // 3. group BackendExecState by BE. So that we can use one RPC to send all fragment instances of a BE. - for (Map.Entry entry : tParams.entrySet()) { PipelineExecContext pipelineExecContext = new PipelineExecContext(fragment.getFragmentId(), @@ -2694,9 +2693,10 @@ public class Coordinator { volatile Map doneFlags = new HashMap(); boolean hasCanceled; volatile Map cancelFlags = new HashMap(); + + volatile Map profiles = new HashMap(); int cancelProgress = 0; int profileFragmentId; - RuntimeProfile profile; TNetworkAddress brpcAddress; TNetworkAddress address; Backend backend; @@ -2713,6 +2713,11 @@ public class Coordinator { this.numInstances = rpcParams.local_params.size(); for (int i = 0; i < this.numInstances; i++) { this.doneFlags.put(rpcParams.local_params.get(i).fragment_instance_id, false); + this.cancelFlags.put(rpcParams.local_params.get(i).fragment_instance_id, false); + + String name = "Instance " + DebugUtil.printId(rpcParams.local_params.get(i).fragment_instance_id) + + " (host=" + addr + ")"; + this.profiles.put(rpcParams.local_params.get(i).fragment_instance_id, new RuntimeProfile(name)); } this.initiated = false; this.done = false; @@ -2721,12 +2726,7 @@ public class Coordinator { this.backend = idToBackend.get(addressToBackendID.get(address)); this.brpcAddress = new TNetworkAddress(backend.getIp(), backend.getBrpcPort()); - String name = "Fragment " + profileFragmentId + " (host=" + address + ")"; - this.profile = new RuntimeProfile(name); this.hasCanceled = false; - for (int i = 0; i < this.numInstances; i++) { - this.cancelFlags.put(rpcParams.local_params.get(i).fragment_instance_id, false); - } this.lastMissingHeartbeatTime = backend.getLastMissingHeartbeatTime(); } @@ -2752,7 +2752,7 @@ public class Coordinator { return false; } if (params.isSetProfile()) { - profile.update(params.profile); + this.profiles.get(params.fragment_instance_id).update(params.profile); } if (params.done) { this.doneFlags.replace(params.fragment_instance_id, true); @@ -2765,8 +2765,10 @@ public class Coordinator { } public synchronized void printProfile(StringBuilder builder) { - this.profile.computeTimeInProfile(); - this.profile.prettyPrint(builder, ""); + this.profiles.values().stream().forEach(p -> { + p.computeTimeInProfile(); + p.prettyPrint(builder, ""); + }); } // cancel all fragment instances. @@ -2826,7 +2828,7 @@ public class Coordinator { LOG.warn("profileFragmentId {} should be in [0, {})", profileFragmentId, maxFragmentId); return false; } - profile.computeTimeInProfile(); + // profile.computeTimeInProfile(); return true; } @@ -3329,7 +3331,8 @@ public class Coordinator { if (!ctx.computeTimeInProfile(fragmentProfile.size())) { return; } - fragmentProfile.get(ctx.profileFragmentId).addChild(ctx.profile); + ctx.profiles.values().stream().forEach(p -> + fragmentProfile.get(ctx.profileFragmentId).addChild(p)); } } else { for (BackendExecState backendExecState : backendExecStates) {