[pipelineX](profile) Phase 1: refactor pipelineX detailed profile (#24322)
This commit is contained in:
@ -80,6 +80,7 @@ import org.apache.doris.task.LoadEtlTask;
|
||||
import org.apache.doris.thrift.PaloInternalServiceVersion;
|
||||
import org.apache.doris.thrift.TBrokerScanRange;
|
||||
import org.apache.doris.thrift.TDescriptorTable;
|
||||
import org.apache.doris.thrift.TDetailedReportParams;
|
||||
import org.apache.doris.thrift.TErrorTabletInfo;
|
||||
import org.apache.doris.thrift.TEsScanRange;
|
||||
import org.apache.doris.thrift.TExecPlanFragmentParams;
|
||||
@ -2305,9 +2306,42 @@ public class Coordinator {
|
||||
|
||||
// update job progress from BE
|
||||
public void updateFragmentExecStatus(TReportExecStatusParams params) {
|
||||
if (enablePipelineEngine) {
|
||||
if (enablePipelineXEngine) {
|
||||
PipelineExecContext ctx = pipelineExecContexts.get(Pair.of(params.getFragmentId(), params.getBackendId()));
|
||||
if (!ctx.updateProfile(params)) {
|
||||
if (!ctx.updateProfile(params, true)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// print fragment instance profile
|
||||
if (LOG.isDebugEnabled()) {
|
||||
StringBuilder builder = new StringBuilder();
|
||||
ctx.printProfile(builder);
|
||||
LOG.debug("profile for query_id={} fragment_id={}\n{}",
|
||||
DebugUtil.printId(queryId),
|
||||
params.getFragmentId(),
|
||||
builder.toString());
|
||||
}
|
||||
|
||||
Status status = new Status(params.status);
|
||||
// for now, abort the query if we see any error except if the error is cancelled
|
||||
// and returned_all_results_ is true.
|
||||
// (UpdateStatus() initiates cancellation, if it hasn't already been initiated)
|
||||
if (!(returnedAllResults && status.isCancelled()) && !status.ok()) {
|
||||
LOG.warn("one instance report fail, query_id={} instance_id={}, error message: {}",
|
||||
DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId()),
|
||||
status.getErrorMsg());
|
||||
updateStatus(status, params.getFragmentInstanceId());
|
||||
}
|
||||
Preconditions.checkArgument(params.isSetDetailedReport());
|
||||
for (TDetailedReportParams param : params.detailed_report) {
|
||||
if (ctx.fragmentInstancesMap.get(param.fragment_instance_id).getIsDone()) {
|
||||
// TODO
|
||||
executionProfile.markOneInstanceDone(param.getFragmentInstanceId());
|
||||
}
|
||||
}
|
||||
} else if (enablePipelineEngine) {
|
||||
PipelineExecContext ctx = pipelineExecContexts.get(Pair.of(params.getFragmentId(), params.getBackendId()));
|
||||
if (!ctx.updateProfile(params, false)) {
|
||||
return;
|
||||
}
|
||||
|
||||
@ -2932,27 +2966,51 @@ public class Coordinator {
|
||||
|
||||
// update profile.
|
||||
// return true if profile is updated. Otherwise, return false.
|
||||
public synchronized boolean updateProfile(TReportExecStatusParams params) {
|
||||
RuntimeProfile profile = fragmentInstancesMap.get(params.fragment_instance_id);
|
||||
if (params.done && profile.getIsDone()) {
|
||||
// duplicate packet
|
||||
return false;
|
||||
}
|
||||
public synchronized boolean updateProfile(TReportExecStatusParams params, boolean isPipelineX) {
|
||||
if (isPipelineX) {
|
||||
for (TDetailedReportParams param : params.detailed_report) {
|
||||
RuntimeProfile profile = fragmentInstancesMap.get(param.fragment_instance_id);
|
||||
if (params.done && profile.getIsDone()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (params.isSetProfile()) {
|
||||
profile.update(params.profile);
|
||||
if (param.isSetProfile()) {
|
||||
profile.update(param.profile);
|
||||
}
|
||||
if (params.isSetLoadChannelProfile()) {
|
||||
loadChannelProfile.update(params.loadChannelProfile);
|
||||
}
|
||||
if (params.done) {
|
||||
profile.setIsDone(true);
|
||||
profileReportProgress++;
|
||||
}
|
||||
if (profileReportProgress == numInstances) {
|
||||
this.done = true;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
} else {
|
||||
RuntimeProfile profile = fragmentInstancesMap.get(params.fragment_instance_id);
|
||||
if (params.done && profile.getIsDone()) {
|
||||
// duplicate packet
|
||||
return false;
|
||||
}
|
||||
|
||||
if (params.isSetProfile()) {
|
||||
profile.update(params.profile);
|
||||
}
|
||||
if (params.isSetLoadChannelProfile()) {
|
||||
loadChannelProfile.update(params.loadChannelProfile);
|
||||
}
|
||||
if (params.done) {
|
||||
profile.setIsDone(true);
|
||||
profileReportProgress++;
|
||||
}
|
||||
if (profileReportProgress == numInstances) {
|
||||
this.done = true;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
if (params.isSetLoadChannelProfile()) {
|
||||
loadChannelProfile.update(params.loadChannelProfile);
|
||||
}
|
||||
if (params.done) {
|
||||
profile.setIsDone(true);
|
||||
profileReportProgress++;
|
||||
}
|
||||
if (profileReportProgress == numInstances) {
|
||||
this.done = true;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public synchronized void printProfile(StringBuilder builder) {
|
||||
|
||||
Reference in New Issue
Block a user