[fix](profile) Fix pipeline load channel profile #19828
This commit is contained in:
@ -824,7 +824,8 @@ public class Coordinator {
|
||||
for (Map.Entry<TNetworkAddress, TPipelineFragmentParams> entry : tParams.entrySet()) {
|
||||
Long backendId = this.addressToBackendID.get(entry.getKey());
|
||||
PipelineExecContext pipelineExecContext = new PipelineExecContext(fragment.getFragmentId(),
|
||||
profileFragmentId, entry.getValue(), backendId, fragmentInstancesMap);
|
||||
profileFragmentId, entry.getValue(), backendId, fragmentInstancesMap,
|
||||
executionProfile.getLoadChannelProfile());
|
||||
// Each tParam will set the total number of Fragments that need to be executed on the same BE,
|
||||
// and the BE will determine whether all Fragments have been executed based on this information.
|
||||
// Notice. load fragment has a small probability that FragmentNumOnHost is 0, for unknown reasons.
|
||||
@ -2686,6 +2687,7 @@ public class Coordinator {
|
||||
volatile boolean done;
|
||||
boolean hasCanceled;
|
||||
Map<TUniqueId, RuntimeProfile> fragmentInstancesMap;
|
||||
RuntimeProfile loadChannelProfile;
|
||||
int cancelProgress = 0;
|
||||
int profileFragmentId;
|
||||
TNetworkAddress brpcAddress;
|
||||
@ -2697,12 +2699,14 @@ public class Coordinator {
|
||||
|
||||
public PipelineExecContext(PlanFragmentId fragmentId, int profileFragmentId,
|
||||
TPipelineFragmentParams rpcParams, Long backendId,
|
||||
Map<TUniqueId, RuntimeProfile> fragmentInstancesMap) {
|
||||
Map<TUniqueId, RuntimeProfile> fragmentInstancesMap,
|
||||
RuntimeProfile loadChannelProfile) {
|
||||
this.profileFragmentId = profileFragmentId;
|
||||
this.fragmentId = fragmentId;
|
||||
this.rpcParams = rpcParams;
|
||||
this.numInstances = rpcParams.local_params.size();
|
||||
this.fragmentInstancesMap = fragmentInstancesMap;
|
||||
this.loadChannelProfile = loadChannelProfile;
|
||||
|
||||
this.initiated = false;
|
||||
this.done = false;
|
||||
@ -2741,6 +2745,9 @@ public class Coordinator {
|
||||
if (params.isSetProfile()) {
|
||||
profile.update(params.profile);
|
||||
}
|
||||
if (params.isSetLoadChannelProfile()) {
|
||||
loadChannelProfile.update(params.loadChannelProfile);
|
||||
}
|
||||
if (params.done) {
|
||||
profile.setIsDone(true);
|
||||
profileReportProgress++;
|
||||
|
||||
Reference in New Issue
Block a user