[refactor](coordinator) split profile logic and instance report logic (#32010)
Co-authored-by: yiguolei <yiguolei@gmail.com>
@@ -128,4 +128,9 @@ public class Status {
}
}
}

@Override
public String toString() {
return "Status [errorCode=" + errorCode + ", errorMsg=" + errorMsg + "]";
}
}
@@ -17,33 +17,31 @@

package org.apache.doris.common.profile;

import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.Pair;
import org.apache.doris.common.Status;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanFragmentId;
import org.apache.doris.thrift.TDetailedReportParams;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TReportExecStatusParams;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.thrift.TUnit;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* ExecutionProfile is used to collect the profile of a complete query plan (including query or load).
* root is used to collect the profile of a complete query plan (including query or load).
* Need to call addToProfileAsChild() to add it to the root profile.
* It has the following structure:
* Execution Profile:

@@ -59,53 +57,62 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ExecutionProfile {
private static final Logger LOG = LogManager.getLogger(ExecutionProfile.class);

private final TUniqueId queryId;
private boolean isFinished = false;
private long startTime = 0L;
private long queryFinishTime = 0L;
// The root profile of this execution task
private RuntimeProfile executionProfile;
private RuntimeProfile root;
// Profiles for each fragment. And the InstanceProfile is the child of fragment profile.
// Which will be added to fragment profile when calling Coordinator::sendFragment()
private List<RuntimeProfile> fragmentProfiles;
// Could not use an array list because fragment ids are not continuous; the planner may cut fragments
// during planning.
private Map<Integer, RuntimeProfile> fragmentProfiles;
// Profile for load channels. Only for load job.
private RuntimeProfile loadChannelProfile;
// A countdown latch to mark the completion of each instance.
// used for the old pipeline
// instance id -> dummy value
private MarkedCountDownLatch<TUniqueId, Long> profileDoneSignal;

// A countdown latch to mark the completion of each fragment. Used for pipelineX.
// fragmentId -> dummy value
private MarkedCountDownLatch<Integer, Long> profileFragmentDoneSignal;

// fragmentId -> the number of BEs that have not reported 'done'
private Map<Integer, Integer> befragmentDone;

// lock for befragmentDone
private ReadWriteLock lock;
// FragmentId -> InstanceId -> RuntimeProfile
private Map<PlanFragmentId, Map<TUniqueId, RuntimeProfile>> fragmentInstancesProfiles;
private boolean isPipelineXProfile = false;

// used to merge profiles from multiple BEs
private List<Map<TNetworkAddress, List<RuntimeProfile>>> multiBeProfile = null;
private Map<Integer, Map<TNetworkAddress, List<RuntimeProfile>>> multiBeProfile = null;

public ExecutionProfile(TUniqueId queryId, int fragmentNum) {
executionProfile = new RuntimeProfile("Execution Profile " + DebugUtil.printId(queryId));
// Not serialized; this property is only used to get the profile id.
private SummaryProfile summaryProfile;

// BE only has the instance id, not the fragment id, so this map is needed to find the fragment id.
private Map<TUniqueId, PlanFragmentId> instanceIdToFragmentId;
private Map<Integer, Integer> fragmentIdBeNum;
private Map<Integer, Integer> seqNoToFragmentId;

public ExecutionProfile(TUniqueId queryId, List<PlanFragment> fragments) {
this.queryId = queryId;
root = new RuntimeProfile("Execution Profile " + DebugUtil.printId(queryId));
RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments");
executionProfile.addChild(fragmentsProfile);
fragmentProfiles = Lists.newArrayList();
multiBeProfile = Lists.newArrayList();
for (int i = 0; i < fragmentNum; i++) {
fragmentProfiles.add(new RuntimeProfile("Fragment " + i));
fragmentsProfile.addChild(fragmentProfiles.get(i));
multiBeProfile.add(new ConcurrentHashMap<TNetworkAddress, List<RuntimeProfile>>());
root.addChild(fragmentsProfile);
fragmentProfiles = Maps.newHashMap();
multiBeProfile = Maps.newHashMap();
fragmentIdBeNum = Maps.newHashMap();
seqNoToFragmentId = Maps.newHashMap();
int i = 0;
for (PlanFragment planFragment : fragments) {
RuntimeProfile runtimeProfile = new RuntimeProfile("Fragment " + i);
fragmentProfiles.put(planFragment.getFragmentId().asInt(), runtimeProfile);
fragmentsProfile.addChild(runtimeProfile);
multiBeProfile.put(planFragment.getFragmentId().asInt(),
new ConcurrentHashMap<TNetworkAddress, List<RuntimeProfile>>());
fragmentIdBeNum.put(planFragment.getFragmentId().asInt(), 0);
seqNoToFragmentId.put(i, planFragment.getFragmentId().asInt());
++i;
}
loadChannelProfile = new RuntimeProfile("LoadChannels");
executionProfile.addChild(loadChannelProfile);
root.addChild(loadChannelProfile);
fragmentInstancesProfiles = Maps.newHashMap();
instanceIdToFragmentId = Maps.newHashMap();
}

public void addMultiBeProfileByPipelineX(int profileFragmentId, TNetworkAddress address,
List<RuntimeProfile> taskProfile) {
multiBeProfile.get(profileFragmentId).put(address, taskProfile);
}

private List<List<RuntimeProfile>> getMultiBeProfile(int profileFragmentId) {
Map<TNetworkAddress, List<RuntimeProfile>> multiPipeline = multiBeProfile.get(profileFragmentId);
private List<List<RuntimeProfile>> getMultiBeProfile(int fragmentId) {
Map<TNetworkAddress, List<RuntimeProfile>> multiPipeline = multiBeProfile.get(fragmentId);
List<List<RuntimeProfile>> allPipelines = Lists.newArrayList();
int pipelineSize = 0;
for (List<RuntimeProfile> profiles : multiPipeline.values()) {
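The constructor above keys every per-fragment structure by the real fragment id rather than a list index, because the planner may cut fragments and leave the ids non-contiguous. A minimal, self-contained sketch of that pattern (class and field names here are illustrative stand-ins, not the actual Doris types):

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative sketch: fragment ids may be non-contiguous after planning,
    // so profiles are registered under their real id, while seqNoToFragmentId
    // preserves the original display order for iteration.
    class FragmentProfileRegistry {
        private final Map<Integer, String> fragmentProfiles = new HashMap<>();
        private final Map<Integer, Integer> seqNoToFragmentId = new HashMap<>();

        void register(int seqNo, int fragmentId) {
            fragmentProfiles.put(fragmentId, "Fragment " + seqNo);
            seqNoToFragmentId.put(seqNo, fragmentId);
        }

        // Lookup by display order still works even if the ids are 0, 2, 5, ...
        String profileBySeqNo(int seqNo) {
            return fragmentProfiles.get(seqNoToFragmentId.get(seqNo));
        }
    }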
@@ -130,7 +137,7 @@ public class ExecutionProfile {
for (int i = 0; i < fragmentProfiles.size(); ++i) {
RuntimeProfile newFragmentProfile = new RuntimeProfile("Fragment " + i);
fragmentsProfile.addChild(newFragmentProfile);
List<List<RuntimeProfile>> allPipelines = getMultiBeProfile(i);
List<List<RuntimeProfile>> allPipelines = getMultiBeProfile(seqNoToFragmentId.get(i));
int pipelineIdx = 0;
for (List<RuntimeProfile> allPipelineTask : allPipelines) {
RuntimeProfile mergedpipelineProfile = new RuntimeProfile(

@@ -148,7 +155,7 @@ public class ExecutionProfile {
private RuntimeProfile getNonPipelineXAggregatedProfile(Map<Integer, String> planNodeMap) {
RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments");
for (int i = 0; i < fragmentProfiles.size(); ++i) {
RuntimeProfile oldFragmentProfile = fragmentProfiles.get(i);
RuntimeProfile oldFragmentProfile = fragmentProfiles.get(seqNoToFragmentId.get(i));
RuntimeProfile newFragmentProfile = new RuntimeProfile("Fragment " + i);
fragmentsProfile.addChild(newFragmentProfile);
List<RuntimeProfile> allInstanceProfiles = new ArrayList<RuntimeProfile>();

@@ -164,7 +171,7 @@ public class ExecutionProfile {
}

public RuntimeProfile getAggregatedFragmentsProfile(Map<Integer, String> planNodeMap) {
if (enablePipelineX()) {
if (isPipelineXProfile) {
/*
* Fragment 0
* ---Pipeline 0

@@ -211,143 +218,143 @@ public class ExecutionProfile {
}
}

public RuntimeProfile getExecutionProfile() {
return executionProfile;
public RuntimeProfile getRoot() {
return root;
}

public RuntimeProfile getLoadChannelProfile() {
return loadChannelProfile;
}

public List<RuntimeProfile> getFragmentProfiles() {
return fragmentProfiles;
}

public void addToProfileAsChild(RuntimeProfile rootProfile) {
rootProfile.addChild(executionProfile);
}

public void markInstances(Set<TUniqueId> instanceIds) {
profileDoneSignal = new MarkedCountDownLatch<>(instanceIds.size());
for (TUniqueId instanceId : instanceIds) {
profileDoneSignal.addMark(instanceId, -1L /* value is meaningless */);
}
}

private boolean enablePipelineX() {
return profileFragmentDoneSignal != null;
}

public void markFragments(int fragments) {
profileFragmentDoneSignal = new MarkedCountDownLatch<>(fragments);
lock = new ReentrantReadWriteLock();
befragmentDone = new HashMap<>();
for (int fragmentId = 0; fragmentId < fragments; fragmentId++) {
profileFragmentDoneSignal.addMark(fragmentId, -1L /* value is meaningless */);
befragmentDone.put(fragmentId, 0);
}
}

public void addFragments(int fragmentId) {
lock.writeLock().lock();
try {
befragmentDone.put(fragmentId, befragmentDone.get(fragmentId) + 1);
} finally {
lock.writeLock().unlock();
}
public void setPipelineX() {
this.isPipelineXProfile = true;
}

// The execution profile is maintained in ProfileManager; once it is finished, it should
// be removed from there as soon as possible.
public void update(long startTime, boolean isFinished) {
if (this.isFinished) {
return;
}
this.isFinished = isFinished;
this.startTime = startTime;
if (startTime > 0) {
executionProfile.getCounterTotalTime().setValue(TUnit.TIME_MS, TimeUtils.getElapsedTimeMs(startTime));
}
// Wait for all backends to finish reporting when writing the profile the last time.
if (isFinished && profileDoneSignal != null) {
try {
profileDoneSignal.await(2, TimeUnit.SECONDS);
} catch (InterruptedException e1) {
LOG.warn("signal await error", e1);
}
root.getCounterTotalTime().setValue(TUnit.TIME_MS, TimeUtils.getElapsedTimeMs(startTime));
}

if (isFinished && profileFragmentDoneSignal != null) {
try {
profileFragmentDoneSignal.await(2, TimeUnit.SECONDS);
} catch (InterruptedException e1) {
LOG.warn("signal await error", e1);
}
}

for (RuntimeProfile fragmentProfile : fragmentProfiles) {
for (RuntimeProfile fragmentProfile : fragmentProfiles.values()) {
fragmentProfile.sortChildren();
}
}

public void onCancel() {
if (profileDoneSignal != null) {
// count down to zero to notify all objects waiting for this
profileDoneSignal.countDownToZero(new Status());
}
if (profileFragmentDoneSignal != null) {
profileFragmentDoneSignal.countDownToZero(new Status());
}
}

public void markOneInstanceDone(TUniqueId fragmentInstanceId) {
if (profileDoneSignal != null) {
if (!profileDoneSignal.markedCountDown(fragmentInstanceId, -1L)) {
LOG.warn("Mark instance {} done failed", DebugUtil.printId(fragmentInstanceId));
}
}
}

public void markOneFragmentDone(int fragmentId) {
if (profileFragmentDoneSignal != null) {
lock.writeLock().lock();
try {
befragmentDone.put(fragmentId, befragmentDone.get(fragmentId) - 1);
if (befragmentDone.get(fragmentId) == 0) {
if (!profileFragmentDoneSignal.markedCountDown(fragmentId, -1L)) {
LOG.warn("Mark fragment {} done failed", fragmentId);
}
}
public void updateProfile(TReportExecStatusParams params, TNetworkAddress address) {
if (isPipelineXProfile) {
int pipelineIdx = 0;
List<RuntimeProfile> taskProfile = Lists.newArrayList();
for (TDetailedReportParams param : params.detailed_report) {
String name = "Pipeline :" + pipelineIdx + " "
+ " (host=" + address + ")";
RuntimeProfile profile = new RuntimeProfile(name);
taskProfile.add(profile);
if (param.isSetProfile()) {
profile.update(param.profile);
}
} finally {
lock.writeLock().unlock();
if (params.done) {
profile.setIsDone(true);
}
pipelineIdx++;
fragmentProfiles.get(params.fragment_id).addChild(profile);
}
// TODO ygl: is this right? There may be multiple backends; what does
// updating the load profile do?
if (params.isSetLoadChannelProfile()) {
loadChannelProfile.update(params.loadChannelProfile);
}
multiBeProfile.get(params.fragment_id).put(address, taskProfile);
} else {
PlanFragmentId fragmentId = instanceIdToFragmentId.get(params.fragment_instance_id);
if (fragmentId == null) {
LOG.warn("Could not find related fragment for instance {}",
DebugUtil.printId(params.fragment_instance_id));
return;
}
// Do not use the fragment id in params, because the non-pipeline engine sets it to -1
Map<TUniqueId, RuntimeProfile> instanceProfiles = fragmentInstancesProfiles.get(fragmentId);
if (instanceProfiles == null) {
LOG.warn("Could not find related instances for fragment {}", fragmentId);
return;
}
RuntimeProfile instanceProfile = instanceProfiles.get(params.fragment_instance_id);
if (instanceProfile == null) {
LOG.warn("Could not find related instance {}", params.fragment_instance_id);
return;
}
if (params.isSetProfile()) {
instanceProfile.update(params.profile);
}
if (params.isSetDone() && params.isDone()) {
instanceProfile.setIsDone(true);
}
if (params.isSetLoadChannelProfile()) {
loadChannelProfile.update(params.loadChannelProfile);
}
}
}

public boolean awaitAllInstancesDone(long waitTimeS) throws InterruptedException {
if (profileDoneSignal == null) {
return true;
// Multiple instances may update the profile concurrently
public synchronized void addInstanceProfile(PlanFragmentId fragmentId, TUniqueId instanceId,
RuntimeProfile instanceProfile) {
Map<TUniqueId, RuntimeProfile> instanceProfiles = fragmentInstancesProfiles.get(fragmentId);
if (instanceProfiles == null) {
instanceProfiles = Maps.newHashMap();
fragmentInstancesProfiles.put(fragmentId, instanceProfiles);
}
RuntimeProfile existingInstanceProfile = instanceProfiles.get(instanceId);
if (existingInstanceProfile == null) {
instanceProfiles.put(instanceId, instanceProfile);
instanceIdToFragmentId.put(instanceId, fragmentId);
fragmentProfiles.get(fragmentId.asInt()).addChild(instanceProfile);
return;
}
return profileDoneSignal.await(waitTimeS, TimeUnit.SECONDS);
}

public boolean awaitAllFragmentsDone(long waitTimeS) throws InterruptedException {
if (profileFragmentDoneSignal == null) {
return true;
}
return profileFragmentDoneSignal.await(waitTimeS, TimeUnit.SECONDS);
public synchronized void addFragmentBackend(PlanFragmentId fragmentId, Long backendId) {
fragmentIdBeNum.put(fragmentId.asInt(), fragmentIdBeNum.get(fragmentId.asInt()) + 1);
}

public boolean isAllInstancesDone() {
if (profileDoneSignal == null) {
return true;
}
return profileDoneSignal.getCount() == 0;
public TUniqueId getQueryId() {
return queryId;
}

public boolean isAllFragmentsDone() {
if (profileFragmentDoneSignal == null) {
return true;
// Check each fragment's children; if all of them are finished, this execution profile is finished
public boolean isCompleted() {
for (Entry<Integer, RuntimeProfile> element : fragmentProfiles.entrySet()) {
RuntimeProfile fragmentProfile = element.getValue();
// If any fragment is empty, the BE has not reported the profile, so the total
// execution profile is not completed.
if (fragmentProfile.isEmpty()
|| fragmentProfile.getChildList().size() < fragmentIdBeNum.get(element.getKey())) {
return false;
}
for (Pair<RuntimeProfile, Boolean> runtimeProfile : fragmentProfile.getChildList()) {
// If any child instance profile is not ready, then return false.
if (!(runtimeProfile.first.getIsDone() || runtimeProfile.first.getIsCancel())) {
return false;
}
}
}
return profileFragmentDoneSignal.getCount() == 0;
return true;
}

public void addInstanceProfile(int fragmentId, RuntimeProfile instanceProfile) {
Preconditions.checkArgument(fragmentId < fragmentProfiles.size(),
fragmentId + " vs. " + fragmentProfiles.size());
fragmentProfiles.get(fragmentId).addChild(instanceProfile);
public long getQueryFinishTime() {
return queryFinishTime;
}

public void setQueryFinishTime(long queryFinishTime) {
this.queryFinishTime = queryFinishTime;
}

public SummaryProfile getSummaryProfile() {
return summaryProfile;
}

public void setSummaryProfile(SummaryProfile summaryProfile) {
this.summaryProfile = summaryProfile;
}
}
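The new isCompleted() above replaces the countdown-latch checks with a structural rule: a fragment counts as complete only when every backend registered for it has reported a child profile, and every reported child is done or cancelled. A minimal sketch of that rule with simplified stand-in types (not the actual Doris classes):

    import java.util.List;
    import java.util.Map;

    // Illustrative sketch of the completeness rule in isCompleted().
    // Assumes every fragment id present in reportsByFragment also has an
    // entry in backendNumByFragment.
    class CompletenessCheck {
        static boolean isCompleted(Map<Integer, List<Report>> reportsByFragment,
                Map<Integer, Integer> backendNumByFragment) {
            for (Map.Entry<Integer, List<Report>> e : reportsByFragment.entrySet()) {
                List<Report> reports = e.getValue();
                if (reports.isEmpty() || reports.size() < backendNumByFragment.get(e.getKey())) {
                    return false; // some backend has not reported yet
                }
                for (Report r : reports) {
                    if (!(r.done || r.cancelled)) {
                        return false; // reported, but still running
                    }
                }
            }
            return true;
        }

        static final class Report {
            final boolean done;
            final boolean cancelled;

            Report(boolean done, boolean cancelled) {
                this.done = done;
                this.cancelled = cancelled;
            }
        }
    }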
@@ -45,11 +45,16 @@ import java.util.Map;
* ExecutionProfile1: Fragment 0: Fragment 1: ...
* ExecutionProfile2: Fragment 0: Fragment 1: ...
*
* ExecutionProfile: Fragment 0: Fragment 1: ...
* The summary profile also contains plan information, while the execution profile covers
* BE execution time.
* StmtExecutor(Profile) ---> Coordinator(ExecutionProfile)
*/
public class Profile {
private static final Logger LOG = LogManager.getLogger(Profile.class);
private static final int MergedProfileLevel = 1;
private RuntimeProfile rootProfile;
private final String name;
private final boolean isPipelineX;
private SummaryProfile summaryProfile;
private List<ExecutionProfile> executionProfiles = Lists.newArrayList();
private boolean isFinished;

@@ -57,51 +62,57 @@ public class Profile {

private int profileLevel = 3;

public Profile(String name, boolean isEnable) {
this.rootProfile = new RuntimeProfile(name);
this.summaryProfile = new SummaryProfile(rootProfile);
public Profile(String name, boolean isEnable, int profileLevel, boolean isPipelineX) {
this.name = name;
this.isPipelineX = isPipelineX;
this.summaryProfile = new SummaryProfile();
// if disabled, just set isFinished to true, so that update() will do nothing
this.isFinished = !isEnable;
this.profileLevel = profileLevel;
}

// For a load task, the profile contains many execution profiles
public void addExecutionProfile(ExecutionProfile executionProfile) {
if (executionProfile == null) {
LOG.warn("try to set a null execution profile, it is abnormal", new Exception());
return;
}
if (this.isPipelineX) {
executionProfile.setPipelineX();
}
executionProfile.setSummaryProfile(summaryProfile);
this.executionProfiles.add(executionProfile);
executionProfile.addToProfileAsChild(rootProfile);
}

public synchronized void update(long startTime, Map<String, String> summaryInfo, boolean isFinished,
int profileLevel, Planner planner, boolean isPipelineX) {
public List<ExecutionProfile> getExecutionProfiles() {
return this.executionProfiles;
}

// This API will also add the profile to ProfileManager, so that we can get the profile from ProfileManager.
// isFinished ONLY means the coordinator or StmtExecutor is finished.
public synchronized void updateSummary(long startTime, Map<String, String> summaryInfo, boolean isFinished,
Planner planner) {
try {
if (this.isFinished) {
return;
}
summaryProfile.update(summaryInfo);
for (ExecutionProfile executionProfile : executionProfiles) {
// Tell the execution profile the start time
executionProfile.update(startTime, isFinished);
}
rootProfile.computeTimeInProfile();
// Nereids native insert does not set the planner, so it is null
if (planner != null) {
this.planNodeMap = planner.getExplainStringMap();
}
rootProfile.setIsPipelineX(isPipelineX);
ProfileManager.getInstance().pushProfile(this);
this.isFinished = isFinished;
this.profileLevel = profileLevel;
} catch (Throwable t) {
LOG.warn("update profile failed", t);
throw t;
}
}

public RuntimeProfile getRootProfile() {
return this.rootProfile;
}

public SummaryProfile getSummaryProfile() {
return summaryProfile;
}
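updateSummary() above gates on the isFinished flag so that late calls after finalization become no-ops. A minimal sketch of that "latch on finish" pattern (names are illustrative, not the actual Doris API):

    import java.util.Map;

    // Illustrative sketch: once the profile is marked finished, later updates
    // are ignored, so the last write before finishing is the one that sticks.
    class FinishOnce {
        private boolean isFinished;

        synchronized void update(Map<String, String> info, boolean finished) {
            if (this.isFinished) {
                return; // already finalized; ignore late updates
            }
            // ... apply info to the summary here ...
            this.isFinished = finished; // flips to true at most once
        }
    }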
@@ -110,7 +121,7 @@ public class Profile {
StringBuilder builder = new StringBuilder();
// add summary to builder
summaryProfile.prettyPrint(builder);
LOG.info(builder.toString());
waitProfileCompleteIfNeeded();
// Only generate the merged profile for select and insert-into-select.
// Broker load is not supported yet.
if (this.profileLevel == MergedProfileLevel && this.executionProfiles.size() == 1) {

@@ -125,7 +136,7 @@ public class Profile {
try {
for (ExecutionProfile executionProfile : executionProfiles) {
builder.append("\n");
executionProfile.getExecutionProfile().prettyPrint(builder, "");
executionProfile.getRoot().prettyPrint(builder, "");
}
} catch (Throwable aggProfileException) {
LOG.warn("build profile failed", aggProfileException);

@@ -134,7 +145,44 @@ public class Profile {
return builder.toString();
}

// If the query is already finished and the user wants to get the profile, we should check
// whether BE has reported all profiles; if not, sleep 2s.
private void waitProfileCompleteIfNeeded() {
if (!this.isFinished) {
return;
}
boolean allCompleted = true;
for (ExecutionProfile executionProfile : executionProfiles) {
if (!executionProfile.isCompleted()) {
allCompleted = false;
break;
}
}
if (!allCompleted) {
try {
Thread.currentThread().sleep(2000);
} catch (InterruptedException e) {
// Do nothing
}
}
}

private RuntimeProfile composeRootProfile() {
RuntimeProfile rootProfile = new RuntimeProfile(name);
rootProfile.setIsPipelineX(isPipelineX);
rootProfile.addChild(summaryProfile.getSummary());
rootProfile.addChild(summaryProfile.getExecutionSummary());
for (ExecutionProfile executionProfile : executionProfiles) {
rootProfile.addChild(executionProfile.getRoot());
}
rootProfile.computeTimeInProfile();
return rootProfile;
}

public String getProfileBrief() {
waitProfileCompleteIfNeeded();
RuntimeProfile rootProfile = composeRootProfile();
Gson gson = new GsonBuilder().setPrettyPrinting().create();
return gson.toJson(rootProfile.toBrief());
}
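waitProfileCompleteIfNeeded() applies a bounded grace period rather than blocking indefinitely: if the query is finished but some backend has not reported yet, it sleeps once and then proceeds with whatever has arrived. A minimal sketch of that behavior (stand-in types, not the Doris classes):

    import java.util.List;

    // Illustrative sketch of the bounded wait: finished query + missing
    // reports => one short grace period, never an unbounded block.
    class BoundedProfileWait {
        static void waitIfNeeded(boolean queryFinished, List<Boolean> perProfileCompleted) {
            if (!queryFinished) {
                return;
            }
            boolean allCompleted = perProfileCompleted.stream().allMatch(Boolean::booleanValue);
            if (!allCompleted) {
                try {
                    Thread.sleep(2000); // grace period for stragglers; 2s mirrors the diff
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }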
@@ -144,12 +144,22 @@ public class SummaryProfile {
private long queryFetchResultConsumeTime = 0;
private long queryWriteResultConsumeTime = 0;

public SummaryProfile(RuntimeProfile rootProfile) {
public SummaryProfile() {
summaryProfile = new RuntimeProfile(SUMMARY_PROFILE_NAME);
executionSummaryProfile = new RuntimeProfile(EXECUTION_SUMMARY_PROFILE_NAME);
init();
rootProfile.addChild(summaryProfile);
rootProfile.addChild(executionSummaryProfile);
}

public String getProfileId() {
return this.summaryProfile.getInfoString(PROFILE_ID);
}

public RuntimeProfile getSummary() {
return summaryProfile;
}

public RuntimeProfile getExecutionSummary() {
return executionSummaryProfile;
}

private void init() {

@@ -21,12 +21,14 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.AuthenticationException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.profile.ExecutionProfile;
import org.apache.doris.common.profile.MultiProfileTreeBuilder;
import org.apache.doris.common.profile.Profile;
import org.apache.doris.common.profile.ProfileTreeBuilder;
import org.apache.doris.common.profile.ProfileTreeNode;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.nereids.stats.StatsErrorEstimator;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.base.Strings;
import com.google.common.collect.Lists;

@@ -104,6 +106,9 @@ public class ProfileManager {
// record the order of profiles by queryId
private Deque<String> queryIdDeque;
private Map<String, ProfileElement> queryIdToProfileMap; // from QueryId to RuntimeProfile
// Sometimes one Profile is related to multiple execution profiles (broker load), so the
// execution profile's query id is not related to the Profile's query id.
private Map<TUniqueId, ExecutionProfile> queryIdToExecutionProfiles;

public static ProfileManager getInstance() {
if (INSTANCE == null) {

@@ -122,25 +127,58 @@ public class ProfileManager {
writeLock = lock.writeLock();
queryIdDeque = new LinkedList<>();
queryIdToProfileMap = new ConcurrentHashMap<>();
queryIdToExecutionProfiles = Maps.newHashMap();
}

public ProfileElement createElement(Profile profile) {
private ProfileElement createElement(Profile profile) {
ProfileElement element = new ProfileElement(profile);
element.infoStrings.putAll(profile.getSummaryProfile().getAsInfoStings());
MultiProfileTreeBuilder builder = new MultiProfileTreeBuilder(profile.getRootProfile());
try {
builder.build();
} catch (Exception e) {
element.errMsg = e.getMessage();
if (LOG.isDebugEnabled()) {
LOG.debug("failed to build profile tree", e);
}
return element;
}
element.builder = builder;
// Do not init the builder any more; it is no longer maintained since 2.1.0, because it
// assumes the execution profile structure is already known before execution, but in
// the PipelineX engine the structure changes during execution.
return element;
}

public void addExecutionProfile(ExecutionProfile executionProfile) {
if (executionProfile == null) {
return;
}
writeLock.lock();
try {
if (queryIdToExecutionProfiles.containsKey(executionProfile.getQueryId())) {
return;
}
queryIdToExecutionProfiles.put(executionProfile.getQueryId(), executionProfile);
if (LOG.isDebugEnabled()) {
LOG.debug("Add execution profile {} to profile manager",
DebugUtil.printId(executionProfile.getQueryId()));
}
// Check if there are query profiles that have not finished collecting; they should
// be removed to release memory.
if (queryIdToExecutionProfiles.size() > 2 * Config.max_query_profile_num) {
List<ExecutionProfile> finishOrExpireExecutionProfiles = Lists.newArrayList();
for (ExecutionProfile tmpProfile : queryIdToExecutionProfiles.values()) {
if (System.currentTimeMillis() - tmpProfile.getQueryFinishTime()
> Config.profile_async_collect_expire_time_secs * 1000) {
finishOrExpireExecutionProfiles.add(tmpProfile);
}
}
for (ExecutionProfile tmp : finishOrExpireExecutionProfiles) {
queryIdToExecutionProfiles.remove(tmp.getQueryId());
if (LOG.isDebugEnabled()) {
LOG.debug("Remove expired execution profile {}", DebugUtil.printId(tmp.getQueryId()));
}
}
}
} finally {
writeLock.unlock();
}
}

public ExecutionProfile getExecutionProfile(TUniqueId queryId) {
return this.queryIdToExecutionProfiles.get(queryId);
}

public void pushProfile(Profile profile) {
if (profile == null) {
return;
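addExecutionProfile() above bounds memory with a size-triggered sweep: when the map grows past a soft cap, entries whose finish time is older than an expiry window are dropped. A minimal sketch of that eviction rule (the cap and window stand in for the Config values; types are illustrative):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    // Illustrative sketch of the eviction in addExecutionProfile().
    class ExecutionProfileCache {
        static <K> void sweepIfOversized(Map<K, Long> finishTimeByQuery, int maxProfiles, long expireMs) {
            if (finishTimeByQuery.size() <= 2 * maxProfiles) {
                return; // under the soft cap; nothing to do
            }
            List<K> expired = new ArrayList<>();
            long now = System.currentTimeMillis();
            for (Map.Entry<K, Long> e : finishTimeByQuery.entrySet()) {
                if (now - e.getValue() > expireMs) {
                    expired.add(e.getKey());
                }
            }
            for (K key : expired) {
                finishTimeByQuery.remove(key); // release memory held by stale profiles
            }
        }
    }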
@@ -148,14 +186,13 @@ public class ProfileManager {

ProfileElement element = createElement(profile);
// 'insert into' does have job_id, put all profiles key with query_id
String key = element.infoStrings.get(SummaryProfile.PROFILE_ID);
String key = element.profile.getSummaryProfile().getProfileId();
// check when push in, which can ensure every element in the list has QUERY_ID column,
// so there is no need to check when remove element from list.
if (Strings.isNullOrEmpty(key)) {
LOG.warn("the key or value of Map is null, "
+ "may be forget to insert 'QUERY_ID' or 'JOB_ID' column into infoStrings");
}

writeLock.lock();
// a profile may be updated multiple times in queryIdToProfileMap,
// and only needs to be inserted into the queryIdDeque for the first time.

@@ -163,7 +200,13 @@ public class ProfileManager {
try {
if (!queryIdDeque.contains(key)) {
if (queryIdDeque.size() >= Config.max_query_profile_num) {
queryIdToProfileMap.remove(queryIdDeque.getFirst());
ProfileElement profileElementRemoved = queryIdToProfileMap.remove(queryIdDeque.getFirst());
// If the Profile object is removed from manager, then related execution profile is also useless.
if (profileElementRemoved != null) {
for (ExecutionProfile executionProfile : profileElementRemoved.profile.getExecutionProfiles()) {
this.queryIdToExecutionProfiles.remove(executionProfile.getQueryId());
}
}
queryIdDeque.removeFirst();
}
queryIdDeque.addLast(key);

@@ -173,6 +216,21 @@ public class ProfileManager {
}
}

public void removeProfile(String profileId) {
writeLock.lock();
try {
ProfileElement profileElementRemoved = queryIdToProfileMap.remove(profileId);
// If the Profile object is removed from manager, then related execution profile is also useless.
if (profileElementRemoved != null) {
for (ExecutionProfile executionProfile : profileElementRemoved.profile.getExecutionProfiles()) {
this.queryIdToExecutionProfiles.remove(executionProfile.getQueryId());
}
}
} finally {
writeLock.unlock();
}
}

public List<List<String>> getAllQueries() {
return getQueryWithType(null);
}
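Both pushProfile() and removeProfile() maintain the same invariant: evicting a profile must remove it from the ordering deque, the profile map, and the per-query execution-profile map together, under one write lock. A minimal sketch of that paired-removal pattern (structures simplified; not the actual Doris fields):

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.HashMap;
    import java.util.Map;

    // Illustrative sketch: the deque tracks insertion order, the map holds the
    // payload, and eviction updates both atomically under the same lock.
    class ProfileStore {
        private final Deque<String> order = new ArrayDeque<>();
        private final Map<String, String> profiles = new HashMap<>();

        synchronized void push(String id, String profile, int maxProfiles) {
            if (!order.contains(id)) {
                if (order.size() >= maxProfiles) {
                    profiles.remove(order.removeFirst()); // evict oldest from both structures
                }
                order.addLast(id);
            }
            profiles.put(id, profile); // repeated pushes just refresh the payload
        }
    }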
@@ -26,6 +26,7 @@ import org.apache.doris.thrift.TRuntimeProfileTree;
import org.apache.doris.thrift.TUnit;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;

@@ -69,7 +70,8 @@ public class RuntimeProfile {
private ReentrantReadWriteLock childLock = new ReentrantReadWriteLock();

private List<String> planNodeInfos = Lists.newArrayList();
private String name;
// name should not be changed.
private final String name;

private Long timestamp = -1L;

@@ -85,22 +87,24 @@ public class RuntimeProfile {
private int nodeid = -1;

public RuntimeProfile(String name) {
this();
this.localTimePercent = 0;
if (Strings.isNullOrEmpty(name)) {
throw new RuntimeException("Profile name must not be null");
}
this.name = name;
this.counterTotalTime = new Counter(TUnit.TIME_NS, 0, 1);
this.counterMap.put("TotalTime", counterTotalTime);
}

public RuntimeProfile(String name, int nodeId) {
this();
this.localTimePercent = 0;
if (Strings.isNullOrEmpty(name)) {
throw new RuntimeException("Profile name must not be null");
}
this.name = name;
this.counterTotalTime = new Counter(TUnit.TIME_NS, 0, 3);
this.nodeid = nodeId;
}

public RuntimeProfile() {
this.counterTotalTime = new Counter(TUnit.TIME_NS, 0, 1);
this.localTimePercent = 0;
this.counterMap.put("TotalTime", counterTotalTime);
this.nodeid = nodeId;
}

public void setIsCancel(Boolean isCancel) {

@@ -143,10 +147,6 @@ public class RuntimeProfile {
this.isPipelineX = isPipelineX;
}

public boolean getIsPipelineX() {
return this.isPipelineX;
}

public Map<String, Counter> getCounterMap() {
return counterMap;
}

@@ -155,6 +155,10 @@ public class RuntimeProfile {
return childList;
}

public boolean isEmpty() {
return childList.isEmpty();
}

public Map<String, RuntimeProfile> getChildMap() {
return childMap;
}

@@ -750,10 +754,6 @@ public class RuntimeProfile {
}
}

public void setName(String name) {
this.name = name;
}

// Returns the value to which the specified key is mapped,
// or null if this map contains no mapping for the key.
public String getInfoString(String key) {
@@ -199,7 +199,9 @@ public class BrokerLoadJob extends BulkLoadJob {
// divide the job into broker loading tasks by table
List<LoadLoadingTask> newLoadingTasks = Lists.newArrayList();
if (enableProfile) {
this.jobProfile = new Profile("BrokerLoadJob " + id + ". " + label, true);
this.jobProfile = new Profile("BrokerLoadJob " + id + ". " + label, true,
Integer.valueOf(sessionVariables.getOrDefault(SessionVariable.PROFILE_LEVEL, "3")),
false);
}
ProgressManager progressManager = Env.getCurrentProgressManager();
progressManager.registerProgressSimple(String.valueOf(id));

@@ -329,16 +331,6 @@ public class BrokerLoadJob extends BulkLoadJob {
}
}

private void writeProfile() {
if (!enableProfile) {
return;
}
jobProfile.update(createTimestamp, getSummaryInfo(true), true,
Integer.valueOf(sessionVariables.getOrDefault(SessionVariable.PROFILE_LEVEL, "3")), null, false);
// jobProfile has been pushed into ProfileManager, remove the reference in BrokerLoadJob
jobProfile = null;
}

private Map<String, String> getSummaryInfo(boolean isFinished) {
long currentTimestamp = System.currentTimeMillis();
SummaryBuilder builder = new SummaryBuilder();

@@ -410,7 +402,12 @@ public class BrokerLoadJob extends BulkLoadJob {
@Override
public void afterVisible(TransactionState txnState, boolean txnOperated) {
super.afterVisible(txnState, txnOperated);
writeProfile();
if (!enableProfile) {
return;
}
jobProfile.updateSummary(createTimestamp, getSummaryInfo(true), true, null);
// jobProfile has been pushed into ProfileManager, remove the reference in BrokerLoadJob
jobProfile = null;
}

@Override

@@ -172,8 +172,11 @@ public class LoadLoadingTask extends LoadTask {
}

try {
QeProcessorImpl.INSTANCE.registerQuery(loadId, curCoordinator);
QeProcessorImpl.INSTANCE.registerQuery(loadId, new QeProcessorImpl.QueryInfo(curCoordinator));
actualExecute(curCoordinator, timeoutS);
if (this.jobProfile != null) {
curCoordinator.getExecutionProfile().update(beginTime, true);
}
} finally {
QeProcessorImpl.INSTANCE.unregisterQuery(loadId);
}

@@ -198,8 +201,6 @@ public class LoadLoadingTask extends LoadTask {
ErrorTabletInfo.fromThrift(curCoordinator.getErrorTabletInfos()
.stream().limit(Config.max_error_tablet_of_broker_load).collect(Collectors.toList())));
curCoordinator.getErrorTabletInfos().clear();
// Create the profile of this task and add it to the job profile.
createProfile(curCoordinator);
} else {
throw new LoadException(status.getErrorMsg());
}

@@ -212,15 +213,6 @@ public class LoadLoadingTask extends LoadTask {
return jobDeadlineMs - System.currentTimeMillis();
}

private void createProfile(Coordinator coord) {
if (jobProfile == null) {
// No need to gather the profile
return;
}
// Summary profile
coord.getExecutionProfile().update(beginTime, true);
}

@Override
public void updateRetryInfo() {
super.updateRetryInfo();

@@ -127,7 +127,9 @@ public class LoadCommand extends Command implements ForwardWithSync {
if (!Config.enable_nereids_load) {
throw new AnalysisException("Fallback to legacy planner temporary.");
}
this.profile = new Profile("Query", ctx.getSessionVariable().enableProfile);
this.profile = new Profile("Query", ctx.getSessionVariable().enableProfile,
ctx.getSessionVariable().profileLevel,
ctx.getSessionVariable().getEnablePipelineXEngine());
profile.getSummaryProfile().setQueryBeginTime();
if (sourceInfos.size() == 1) {
plans = ImmutableList.of(new InsertIntoTableCommand(completeQueryPlan(ctx, sourceInfos.get(0)),

@@ -25,8 +25,6 @@ public interface CoordInterface {

public RowBatch getNext() throws Exception;

public int getInstanceTotalNum();

public void cancel(Types.PPlanFragmentCancelReason cancelReason);

// When exec or getting the next data has finished, this method should be called to release
(File diff suppressed because it is too large.)
@@ -168,12 +168,6 @@ public class PointQueryExec implements CoordInterface {
requestBuilder.addKeyTuples(kBuilder);
}

@Override
public int getInstanceTotalNum() {
// TODO
return 1;
}

@Override
public void cancel(Types.PPlanFragmentCancelReason cancelReason) {
// Do nothing

@@ -31,8 +31,6 @@ public interface QeProcessor {

TReportExecStatusResult reportExecStatus(TReportExecStatusParams params, TNetworkAddress beAddr);

void registerQuery(TUniqueId queryId, Coordinator coord) throws UserException;

void registerQuery(TUniqueId queryId, QeProcessorImpl.QueryInfo info) throws UserException;

void registerInstances(TUniqueId queryId, Integer instancesNum) throws UserException;

@@ -23,6 +23,7 @@ import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
import org.apache.doris.common.profile.ExecutionProfile;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.ProfileManager;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.resource.workloadgroup.QueueToken.TokenState;
import org.apache.doris.thrift.TNetworkAddress;

@@ -53,6 +54,7 @@ public final class QeProcessorImpl implements QeProcessor {

private Map<TUniqueId, Integer> queryToInstancesNum;
private Map<String, AtomicInteger> userToInstancesCount;
private ExecutorService writeProfileExecutor;

public static final QeProcessor INSTANCE;

@@ -60,15 +62,13 @@ public final class QeProcessorImpl implements QeProcessor {
INSTANCE = new QeProcessorImpl();
}

private ExecutorService writeProfileExecutor;

private QeProcessorImpl() {
coordinatorMap = new ConcurrentHashMap<>();
// write profile to ProfileManager when query is running.
writeProfileExecutor = ThreadPoolManager.newDaemonProfileThreadPool(1, 100,
"profile-write-pool", true);
queryToInstancesNum = new ConcurrentHashMap<>();
userToInstancesCount = new ConcurrentHashMap<>();
// write profile to ProfileManager when query is running.
writeProfileExecutor = ThreadPoolManager.newDaemonProfileThreadPool(3, 100,
"profile-write-pool", true);
}

@Override

@@ -90,11 +90,6 @@ public final class QeProcessorImpl implements QeProcessor {
return res;
}

@Override
public void registerQuery(TUniqueId queryId, Coordinator coord) throws UserException {
registerQuery(queryId, new QueryInfo(coord));
}

@Override
public void registerQuery(TUniqueId queryId, QueryInfo info) throws UserException {
if (LOG.isDebugEnabled()) {

@@ -104,6 +99,10 @@ public final class QeProcessorImpl implements QeProcessor {
if (result != null) {
throw new UserException("queryId " + queryId + " already exists");
}

// Should add the execution profile to profile manager, BE will report the profile to FE and FE
// will update it in ProfileManager
ProfileManager.getInstance().addExecutionProfile(info.getCoord().getExecutionProfile());
}

@Override

@@ -145,7 +144,18 @@ public final class QeProcessorImpl implements QeProcessor {
if (LOG.isDebugEnabled()) {
LOG.debug("Deregister query id {}", DebugUtil.printId(queryId));
}

ExecutionProfile executionProfile = ProfileManager.getInstance().getExecutionProfile(queryId);
if (executionProfile != null) {
executionProfile.setQueryFinishTime(System.currentTimeMillis());
if (queryInfo.connectContext != null) {
long autoProfileThresholdMs = queryInfo.connectContext
.getSessionVariable().getAutoProfileThresholdMs();
if (autoProfileThresholdMs > 0 && System.currentTimeMillis() - queryInfo.getStartExecTime()
< autoProfileThresholdMs) {
ProfileManager.getInstance().removeProfile(executionProfile.getSummaryProfile().getProfileId());
}
}
}
if (queryInfo.getConnectContext() != null
&& !Strings.isNullOrEmpty(queryInfo.getConnectContext().getQualifiedUser())
) {
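The deregistration path above applies the new auto_profile_threshold_ms session variable: if the threshold is set and the query finished faster than it, the collected profile is dropped as uninteresting. A minimal sketch of just that decision (method and class names are illustrative):

    // Illustrative sketch of the auto_profile_threshold_ms rule applied at
    // query deregistration time.
    class AutoProfileFilter {
        static boolean shouldDropProfile(long autoProfileThresholdMs, long startExecTimeMs) {
            // threshold <= 0 means the feature is off; otherwise drop profiles
            // of queries that finished faster than the threshold
            return autoProfileThresholdMs > 0
                    && System.currentTimeMillis() - startExecTimeMs < autoProfileThresholdMs;
        }
    }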
@@ -187,7 +197,7 @@ public final class QeProcessorImpl implements QeProcessor {
.connId(String.valueOf(context.getConnectionId())).db(context.getDatabase())
.catalog(context.getDefaultCatalog())
.fragmentInstanceInfos(info.getCoord().getFragmentInstanceInfos())
.profile(info.getCoord().getExecutionProfile().getExecutionProfile())
.profile(info.getCoord().getExecutionProfile().getRoot())
.isReportSucc(context.getSessionVariable().enableProfile()).build();
querySet.put(queryIdStr, item);
}

@@ -196,13 +206,25 @@ public final class QeProcessorImpl implements QeProcessor {

@Override
public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params, TNetworkAddress beAddr) {
if (params.isSetProfile()) {
if (params.isSetProfile() || params.isSetLoadChannelProfile()) {
LOG.info("ReportExecStatus(): fragment_instance_id={}, query id={}, backend num: {}, ip: {}",
DebugUtil.printId(params.fragment_instance_id), DebugUtil.printId(params.query_id),
params.backend_num, beAddr);
if (LOG.isDebugEnabled()) {
LOG.debug("params: {}", params);
}
ExecutionProfile executionProfile = ProfileManager.getInstance().getExecutionProfile(params.query_id);
if (executionProfile != null) {
// Updating the profile may cost a lot of time; use a separate pool to deal with it.
writeProfileExecutor.submit(new Runnable() {
@Override
public void run() {
executionProfile.updateProfile(params, beAddr);
}
});
} else {
LOG.info("Could not find execution profile with query id {}", DebugUtil.printId(params.query_id));
}
}
final TReportExecStatusResult result = new TReportExecStatusResult();
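The hand-off in reportExecStatus() keeps the RPC path fast: the report thread only enqueues the profile merge on a dedicated executor and returns immediately. A minimal sketch of that pattern (pool size and types are stand-ins for the Doris thread-pool manager):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Illustrative sketch: profile merging can be slow, so the RPC thread
    // submits the update to a profile-write pool instead of doing it inline.
    class ReportHandler {
        private final ExecutorService writeProfileExecutor = Executors.newFixedThreadPool(3);

        void onReport(Runnable applyProfileUpdate) {
            writeProfileExecutor.submit(applyProfileUpdate); // RPC thread returns immediately
        }
    }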
@@ -229,12 +251,9 @@ public final class QeProcessorImpl implements QeProcessor {
}
try {
info.getCoord().updateFragmentExecStatus(params);
if (params.isSetProfile()) {
writeProfileExecutor.submit(new WriteProfileTask(params, info));
}
} catch (Exception e) {
LOG.warn("Exception during handle report, response: {}, query: {}, instance: {}", result.toString(),
DebugUtil.printId(params.query_id), DebugUtil.printId(params.fragment_instance_id));
DebugUtil.printId(params.query_id), DebugUtil.printId(params.fragment_instance_id), e);
return result;
}
result.setStatus(new TStatus(TStatusCode.OK));

@@ -266,6 +285,7 @@ public final class QeProcessorImpl implements QeProcessor {
private final ConnectContext connectContext;
private final Coordinator coord;
private final String sql;
private long registerTimeMs = 0L;

// from Export, Pull load, Insert
public QueryInfo(Coordinator coord) {

@@ -277,6 +297,7 @@ public final class QeProcessorImpl implements QeProcessor {
this.connectContext = connectContext;
this.coord = coord;
this.sql = sql;
this.registerTimeMs = System.currentTimeMillis();
}

public ConnectContext getConnectContext() {

@@ -295,7 +316,7 @@ public final class QeProcessorImpl implements QeProcessor {
if (coord.getQueueToken() != null) {
return coord.getQueueToken().getQueueEndTime();
}
return -1;
return registerTimeMs;
}

public long getQueueStartTime() {

@@ -319,26 +340,4 @@ public final class QeProcessorImpl implements QeProcessor {
return null;
}
}

private class WriteProfileTask implements Runnable {
private TReportExecStatusParams params;

private QueryInfo queryInfo;

WriteProfileTask(TReportExecStatusParams params, QueryInfo queryInfo) {
this.params = params;
this.queryInfo = queryInfo;
}

@Override
public void run() {
QueryInfo info = coordinatorMap.get(params.query_id);
if (info == null) {
return;
}

ExecutionProfile executionProfile = info.getCoord().getExecutionProfile();
executionProfile.update(-1, false);
}
}
}
@@ -84,6 +84,7 @@ public class SessionVariable implements Serializable, Writable {
public static final String MAX_EXECUTION_TIME = "max_execution_time";
public static final String INSERT_TIMEOUT = "insert_timeout";
public static final String ENABLE_PROFILE = "enable_profile";
public static final String AUTO_PROFILE_THRESHOLD_MS = "auto_profile_threshold_ms";
public static final String SQL_MODE = "sql_mode";
public static final String WORKLOAD_VARIABLE = "workload_group";
public static final String RESOURCE_VARIABLE = "resource_group";

@@ -629,6 +630,10 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = ENABLE_PROFILE, needForward = true)
public boolean enableProfile = false;

// if true, need to report to the coordinator when the plan fragment executes successfully.
@VariableMgr.VarAttr(name = AUTO_PROFILE_THRESHOLD_MS, needForward = true)
public int autoProfileThresholdMs = -1;

@VariableMgr.VarAttr(name = "runtime_filter_prune_for_external")
public boolean runtimeFilterPruneForExternal = true;

@@ -1969,6 +1974,10 @@ public class SessionVariable implements Serializable, Writable {
return enableProfile;
}

public int getAutoProfileThresholdMs() {
return this.autoProfileThresholdMs;
}

public boolean enableSingleDistinctColumnOpt() {
return enableSingleDistinctColumnOpt;
}

@@ -220,7 +220,6 @@ public class StmtExecutor {
private static final AtomicLong STMT_ID_GENERATOR = new AtomicLong(0);
public static final int MAX_DATA_TO_SEND_FOR_TXN = 100;
public static final String NULL_VALUE_FOR_LOAD = "\\N";
private final Object writeProfileLock = new Object();
private ConnectContext context;
private final StatementContext statementContext;
private MysqlSerializer serializer;

@@ -260,7 +259,9 @@ public class StmtExecutor {
this.isProxy = isProxy;
this.statementContext = new StatementContext(context, originStmt);
this.context.setStatementContext(statementContext);
this.profile = new Profile("Query", this.context.getSessionVariable().enableProfile);
this.profile = new Profile("Query", this.context.getSessionVariable().enableProfile,
this.context.getSessionVariable().profileLevel,
this.context.getSessionVariable().getEnablePipelineXEngine());
}

// for test

@@ -290,7 +291,8 @@ public class StmtExecutor {
this.statementContext.setParsedStatement(parsedStmt);
}
this.context.setStatementContext(statementContext);
this.profile = new Profile("Query", context.getSessionVariable().enableProfile());
this.profile = new Profile("Query", context.getSessionVariable().enableProfile(),
context.getSessionVariable().profileLevel, context.getSessionVariable().getEnablePipelineXEngine());
}

public static InternalService.PDataRow getRowStringValue(List<Expr> cols) throws UserException {

@@ -993,9 +995,7 @@ public class StmtExecutor {
// and ensure the sql finishes normally. For example, if updating the profile
// failed, the insert stmt should still succeed
try {
profile.update(context.startTime, getSummaryInfo(isFinished), isFinished,
context.getSessionVariable().profileLevel, this.planner,
context.getSessionVariable().getEnablePipelineXEngine());
profile.updateSummary(context.startTime, getSummaryInfo(isFinished), isFinished, this.planner);
} catch (Throwable t) {
LOG.warn("failed to update profile, ignore this error", t);
}

@@ -1600,9 +1600,9 @@ public class StmtExecutor {
context.getSessionVariable().getMaxMsgSizeOfResultReceiver());
} else {
coord = new Coordinator(context, analyzer, planner, context.getStatsErrorEstimator());
profile.addExecutionProfile(coord.getExecutionProfile());
QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord));
profile.addExecutionProfile(coord.getExecutionProfile());
coordBase = coord;
}

@@ -1610,35 +1610,10 @@ public class StmtExecutor {
coordBase.exec();
profile.getSummaryProfile().setQueryScheduleFinishTime();
updateProfile(false);
if (coordBase.getInstanceTotalNum() > 1 && LOG.isDebugEnabled()) {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Start to execute fragment. user: {}, db: {}, sql: {}, fragment instance num: {}",
context.getQualifiedUser(), context.getDatabase(),
parsedStmt.getOrigStmt().originStmt.replace("\n", " "),
coordBase.getInstanceTotalNum());
}
} catch (Exception e) {
LOG.warn("Fail to print fragment concurrency for Query.", e);
}
}

if (context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) {
Preconditions.checkState(!context.isReturnResultFromLocal());
profile.getSummaryProfile().setTempStartTime();
if (coordBase.getInstanceTotalNum() > 1 && LOG.isDebugEnabled()) {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Finish to execute fragment. user: {}, db: {}, sql: {}, "
+ "fragment instance num: {}",
context.getQualifiedUser(), context.getDatabase(),
parsedStmt.getOrigStmt().originStmt.replace("\n", " "),
coordBase.getInstanceTotalNum());
}
} catch (Exception e) {
LOG.warn("Fail to print fragment concurrency for Query.", e);
}
}
return;
}

@@ -1723,18 +1698,6 @@ public class StmtExecutor {
throw e;
} finally {
coordBase.close();
if (coordBase.getInstanceTotalNum() > 1 && LOG.isDebugEnabled()) {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Finish to execute fragment. user: {}, db: {}, sql: {}, fragment instance num: {}",
context.getQualifiedUser(), context.getDatabase(),
parsedStmt.getOrigStmt().originStmt.replace("\n", " "),
coordBase.getInstanceTotalNum());
}
} catch (Exception e) {
LOG.warn("Fail to print fragment concurrency for Query.", e);
}
}
}
}
@@ -22,6 +22,7 @@ import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.PBackendServiceGrpc;
import org.apache.doris.thrift.TNetworkAddress;

import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import io.grpc.netty.NettyChannelBuilder;

@@ -82,7 +83,7 @@ public class BackendServiceClient {
.execPlanFragmentStart(request);
}

public Future<InternalService.PCancelPlanFragmentResult> cancelPlanFragmentAsync(
public ListenableFuture<InternalService.PCancelPlanFragmentResult> cancelPlanFragmentAsync(
InternalService.PCancelPlanFragmentRequest request) {
return stub.cancelPlanFragment(request);
}

@@ -36,6 +36,7 @@ import org.apache.doris.thrift.TPipelineFragmentParamsList;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -225,7 +226,7 @@ public class BackendServiceProxy {
}
}

public Future<InternalService.PCancelPlanFragmentResult> cancelPlanFragmentAsync(TNetworkAddress address,
public ListenableFuture<InternalService.PCancelPlanFragmentResult> cancelPlanFragmentAsync(TNetworkAddress address,
TUniqueId finstId, Types.PPlanFragmentCancelReason cancelReason) throws RpcException {
final InternalService.PCancelPlanFragmentRequest pRequest =
InternalService.PCancelPlanFragmentRequest.newBuilder()

@@ -241,8 +242,8 @@ public class BackendServiceProxy {
}
}

public Future<InternalService.PCancelPlanFragmentResult> cancelPipelineXPlanFragmentAsync(TNetworkAddress address,
PlanFragmentId fragmentId, TUniqueId queryId,
public ListenableFuture<InternalService.PCancelPlanFragmentResult> cancelPipelineXPlanFragmentAsync(
TNetworkAddress address, PlanFragmentId fragmentId, TUniqueId queryId,
Types.PPlanFragmentCancelReason cancelReason) throws RpcException {
final InternalService.PCancelPlanFragmentRequest pRequest = InternalService.PCancelPlanFragmentRequest
.newBuilder()

@@ -98,7 +98,7 @@ public class RuntimeProfileTest {

@Test
public void testCounter() {
RuntimeProfile profile = new RuntimeProfile();
RuntimeProfile profile = new RuntimeProfile("test counter");
profile.addCounter("key", TUnit.UNIT, "");
Assert.assertNotNull(profile.getCounterMap().get("key"));
Assert.assertNull(profile.getCounterMap().get("key2"));

@@ -33,6 +33,7 @@ import org.apache.doris.analysis.UseStmt;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.common.profile.Profile;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mysql.MysqlChannel;

@@ -172,7 +173,8 @@ public class StmtExecutorTest {
public void testSelect(@Mocked QueryStmt queryStmt,
@Mocked SqlParser parser,
@Mocked OriginalPlanner planner,
@Mocked Coordinator coordinator) throws Exception {
@Mocked Coordinator coordinator,
@Mocked Profile profile) throws Exception {
Env env = Env.getCurrentEnv();
Deencapsulation.setField(env, "canRead", new AtomicBoolean(true));