[fix](pipelineX) fix unable to merge profiles in multi be (#29204)

This commit is contained in:
Mryange
2023-12-29 23:48:46 +08:00
committed by GitHub
parent 2518ed64ea
commit 7f105facc4
2 changed files with 49 additions and 19 deletions

View File

@ -23,6 +23,7 @@ import org.apache.doris.common.Status;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.thrift.TUnit;
@ -35,9 +36,9 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
* ExecutionProfile is used to collect profile of a complete query plan(including query or load).
* Need to call addToProfileAsChild() to add it to the root profile.
@ -71,39 +72,62 @@ public class ExecutionProfile {
// fragmentId -> dummy value
private MarkedCountDownLatch<Integer, Long> profileFragmentDoneSignal;
// use to merge profile from multi be
private List<Map<TNetworkAddress, List<RuntimeProfile>>> multiBeProfile = null;
public ExecutionProfile(TUniqueId queryId, int fragmentNum) {
executionProfile = new RuntimeProfile("Execution Profile " + DebugUtil.printId(queryId));
RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments");
executionProfile.addChild(fragmentsProfile);
fragmentProfiles = Lists.newArrayList();
multiBeProfile = Lists.newArrayList();
for (int i = 0; i < fragmentNum; i++) {
fragmentProfiles.add(new RuntimeProfile("Fragment " + i));
fragmentsProfile.addChild(fragmentProfiles.get(i));
multiBeProfile.add(new ConcurrentHashMap<TNetworkAddress, List<RuntimeProfile>>());
}
loadChannelProfile = new RuntimeProfile("LoadChannels");
executionProfile.addChild(loadChannelProfile);
}
private RuntimeProfile getPipelineXAggregatedProfile(Map<Integer, String> planNodeMap) {
RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments");
for (int i = 0; i < fragmentProfiles.size(); ++i) {
RuntimeProfile oldFragmentProfile = fragmentProfiles.get(i);
RuntimeProfile newFragmentProfile = new RuntimeProfile("Fragment " + i);
fragmentsProfile.addChild(newFragmentProfile);
List<RuntimeProfile> allPipelines = new ArrayList<RuntimeProfile>();
for (Pair<RuntimeProfile, Boolean> runtimeProfile : oldFragmentProfile.getChildList()) {
allPipelines.add(runtimeProfile.first);
}
int pipelineIdx = 0;
for (RuntimeProfile pipeline : allPipelines) {
List<RuntimeProfile> allPipelineTask = new ArrayList<RuntimeProfile>();
public void addMultiBeProfileByPipelineX(int profileFragmentId, TNetworkAddress address,
List<RuntimeProfile> taskProfile) {
multiBeProfile.get(profileFragmentId).put(address, taskProfile);
}
private List<List<RuntimeProfile>> getMultiBeProfile(int profileFragmentId) {
Map<TNetworkAddress, List<RuntimeProfile>> multiPipeline = multiBeProfile.get(profileFragmentId);
List<List<RuntimeProfile>> allPipelines = Lists.newArrayList();
int pipelineSize = 0;
for (List<RuntimeProfile> profiles : multiPipeline.values()) {
pipelineSize = profiles.size();
break;
}
for (int pipelineIdx = 0; pipelineIdx < pipelineSize; pipelineIdx++) {
List<RuntimeProfile> allPipelineTask = new ArrayList<RuntimeProfile>();
for (List<RuntimeProfile> pipelines : multiPipeline.values()) {
RuntimeProfile pipeline = pipelines.get(pipelineIdx);
for (Pair<RuntimeProfile, Boolean> runtimeProfile : pipeline.getChildList()) {
allPipelineTask.add(runtimeProfile.first);
}
}
allPipelines.add(allPipelineTask);
}
return allPipelines;
}
private RuntimeProfile getPipelineXAggregatedProfile(Map<Integer, String> planNodeMap) {
RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments");
for (int i = 0; i < fragmentProfiles.size(); ++i) {
RuntimeProfile newFragmentProfile = new RuntimeProfile("Fragment " + i);
fragmentsProfile.addChild(newFragmentProfile);
List<List<RuntimeProfile>> allPipelines = getMultiBeProfile(i);
int pipelineIdx = 0;
for (List<RuntimeProfile> allPipelineTask : allPipelines) {
RuntimeProfile mergedpipelineProfile = new RuntimeProfile(
"Pipeline : " + pipelineIdx + "(instance_num="
+ allPipelineTask.size() + ")",
allPipelines.get(0).nodeId());
allPipelineTask.get(0).nodeId());
RuntimeProfile.mergeProfiles(allPipelineTask, mergedpipelineProfile, planNodeMap);
newFragmentProfile.addChild(mergedpipelineProfile);
pipelineIdx++;

View File

@ -3174,9 +3174,11 @@ public class Coordinator implements CoordInterface {
return fragmentInstancesMap.values().stream();
}
private void attachInstanceProfileToFragmentProfile() {
public void attachPipelineProfileToFragmentProfile() {
profileStream()
.forEach(p -> executionProfile.addInstanceProfile(this.profileFragmentId, p));
executionProfile.addMultiBeProfileByPipelineX(profileFragmentId, address,
taskProfile);
}
/**
@ -3218,7 +3220,7 @@ public class Coordinator implements CoordInterface {
loadChannelProfile.update(params.loadChannelProfile);
}
this.done = params.done;
attachInstanceProfileToFragmentProfile();
attachPipelineProfileToFragmentProfile();
return this.done;
} else {
RuntimeProfile profile = fragmentInstancesMap.get(params.fragment_instance_id);
@ -3940,8 +3942,12 @@ public class Coordinator implements CoordInterface {
private void attachInstanceProfileToFragmentProfile() {
if (enablePipelineEngine) {
for (PipelineExecContext ctx : pipelineExecContexts.values()) {
ctx.profileStream()
.forEach(p -> executionProfile.addInstanceProfile(ctx.profileFragmentId, p));
if (enablePipelineXEngine) {
ctx.attachPipelineProfileToFragmentProfile();
} else {
ctx.profileStream()
.forEach(p -> executionProfile.addInstanceProfile(ctx.profileFragmentId, p));
}
}
} else {
for (BackendExecState backendExecState : backendExecStates) {