[pipeline](profile) Show each instance profile in FE (#18544)

This commit is contained in:
HappenLee
2023-04-12 11:25:46 +08:00
committed by GitHub
parent 43392918cd
commit 155e4e547b

View File

@ -835,7 +835,6 @@ public class Coordinator {
}
// 3. group BackendExecState by BE. So that we can use one RPC to send all fragment instances of a BE.
for (Map.Entry<TNetworkAddress, TPipelineFragmentParams> entry : tParams.entrySet()) {
PipelineExecContext pipelineExecContext =
new PipelineExecContext(fragment.getFragmentId(),
@ -2694,9 +2693,10 @@ public class Coordinator {
volatile Map<TUniqueId, Boolean> doneFlags = new HashMap<TUniqueId, Boolean>();
boolean hasCanceled;
volatile Map<TUniqueId, Boolean> cancelFlags = new HashMap<TUniqueId, Boolean>();
volatile Map<TUniqueId, RuntimeProfile> profiles = new HashMap<TUniqueId, RuntimeProfile>();
int cancelProgress = 0;
int profileFragmentId;
RuntimeProfile profile;
TNetworkAddress brpcAddress;
TNetworkAddress address;
Backend backend;
@ -2713,6 +2713,11 @@ public class Coordinator {
this.numInstances = rpcParams.local_params.size();
for (int i = 0; i < this.numInstances; i++) {
this.doneFlags.put(rpcParams.local_params.get(i).fragment_instance_id, false);
this.cancelFlags.put(rpcParams.local_params.get(i).fragment_instance_id, false);
String name = "Instance " + DebugUtil.printId(rpcParams.local_params.get(i).fragment_instance_id)
+ " (host=" + addr + ")";
this.profiles.put(rpcParams.local_params.get(i).fragment_instance_id, new RuntimeProfile(name));
}
this.initiated = false;
this.done = false;
@ -2721,12 +2726,7 @@ public class Coordinator {
this.backend = idToBackend.get(addressToBackendID.get(address));
this.brpcAddress = new TNetworkAddress(backend.getIp(), backend.getBrpcPort());
String name = "Fragment " + profileFragmentId + " (host=" + address + ")";
this.profile = new RuntimeProfile(name);
this.hasCanceled = false;
for (int i = 0; i < this.numInstances; i++) {
this.cancelFlags.put(rpcParams.local_params.get(i).fragment_instance_id, false);
}
this.lastMissingHeartbeatTime = backend.getLastMissingHeartbeatTime();
}
@ -2752,7 +2752,7 @@ public class Coordinator {
return false;
}
if (params.isSetProfile()) {
profile.update(params.profile);
this.profiles.get(params.fragment_instance_id).update(params.profile);
}
if (params.done) {
this.doneFlags.replace(params.fragment_instance_id, true);
@ -2765,8 +2765,10 @@ public class Coordinator {
}
public synchronized void printProfile(StringBuilder builder) {
this.profile.computeTimeInProfile();
this.profile.prettyPrint(builder, "");
this.profiles.values().stream().forEach(p -> {
p.computeTimeInProfile();
p.prettyPrint(builder, "");
});
}
// cancel all fragment instances.
@ -2826,7 +2828,7 @@ public class Coordinator {
LOG.warn("profileFragmentId {} should be in [0, {})", profileFragmentId, maxFragmentId);
return false;
}
profile.computeTimeInProfile();
// profile.computeTimeInProfile();
return true;
}
@ -3329,7 +3331,8 @@ public class Coordinator {
if (!ctx.computeTimeInProfile(fragmentProfile.size())) {
return;
}
fragmentProfile.get(ctx.profileFragmentId).addChild(ctx.profile);
ctx.profiles.values().stream().forEach(p ->
fragmentProfile.get(ctx.profileFragmentId).addChild(p));
}
} else {
for (BackendExecState backendExecState : backendExecStates) {