[bugfix](profile) support multi execution profile for brokerload (#32280)

The bug was introduced by #27184.
The profile format is:
Summary
MergedProfile
ExecutionProfile1
ExecutionProfile2
...

There may be multiple execution profiles for a broker load.
yiguolei
2024-03-15 17:41:47 +08:00
committed by yiguolei
parent 9ad196f189
commit a90a1a76f1
6 changed files with 222 additions and 63 deletions
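
The core API change, repeated at every call site in the diff below, is that Profile.setExecutionProfile(ExecutionProfile) becomes Profile.addExecutionProfile(ExecutionProfile): the profile now appends to a list instead of overwriting a single field, so each Coordinator (one per load task) can register its own execution profile:

// one call per coordinator; repeated calls accumulate instead of overwrite
jobProfile.addExecutionProfile(curCoordinator.getExecutionProfile());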

AggregatedProfile.java (deleted)

@ -1,41 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.profile;
import org.apache.doris.common.util.RuntimeProfile;
import java.util.Map;
/**
* AggregatedProfile is part of a query profile.
* It contains the aggregated information of a query.
*/
public class AggregatedProfile {
public static final String PROFILE_NAME = "MergedProfile";
private ExecutionProfile executionProfile;
public AggregatedProfile(RuntimeProfile rootProfile, ExecutionProfile executionProfile) {
this.executionProfile = executionProfile;
}
public RuntimeProfile getAggregatedFragmentsProfile(Map<Integer, String> planNodeMap) {
return executionProfile.getAggregatedFragmentsProfile(planNodeMap);
}
}
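
With AggregatedProfile deleted, the merged view is produced by calling the aggregation directly on the single execution profile; the equivalent call now lives in Profile.prettyPrint, as shown in the Profile.java hunks below:

// replacement for AggregatedProfile.getAggregatedFragmentsProfile(...)
this.executionProfiles.get(0).getAggregatedFragmentsProfile(planNodeMap).prettyPrint(builder, " ");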

Profile.java

@ -21,11 +21,13 @@ import org.apache.doris.common.util.ProfileManager;
import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.planner.Planner;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Map;
/**
@ -33,20 +35,23 @@ import java.util.Map;
* following structure: root profile: // summary of this profile, such as start
* time, end time, query id, etc. [SummaryProfile] // each execution profile is
* a complete execution of a query, a job may contain multiple queries.
* [List<ExecutionProfile>]
* [List<ExecutionProfile>].
* There may be multiple execution profiles for one job, for example a broker load job.
* It will create one execution profile for every single load task.
*
* SummaryProfile: Summary: Execution Summary:
*
*
* ExecutionProfile: Fragment 0: Fragment 1: ...
* ExecutionProfile1: Fragment 0: Fragment 1: ...
* ExecutionProfile2: Fragment 0: Fragment 1: ...
*
*/
public class Profile {
private static final Logger LOG = LogManager.getLogger(Profile.class);
private static final int MergedProfileLevel = 1;
private RuntimeProfile rootProfile;
private SummaryProfile summaryProfile;
private AggregatedProfile aggregatedProfile;
private ExecutionProfile executionProfile;
private List<ExecutionProfile> executionProfiles = Lists.newArrayList();
private boolean isFinished;
private Map<Integer, String> planNodeMap;
@ -59,14 +64,13 @@ public class Profile {
this.isFinished = !isEnable;
}
public void setExecutionProfile(ExecutionProfile executionProfile) {
public void addExecutionProfile(ExecutionProfile executionProfile) {
if (executionProfile == null) {
LOG.warn("try to set a null excecution profile, it is abnormal", new Exception());
return;
}
this.executionProfile = executionProfile;
this.executionProfile.addToProfileAsChild(rootProfile);
this.aggregatedProfile = new AggregatedProfile(rootProfile, executionProfile);
this.executionProfiles.add(executionProfile);
executionProfile.addToProfileAsChild(rootProfile);
}
public synchronized void update(long startTime, Map<String, String> summaryInfo, boolean isFinished,
@ -75,12 +79,10 @@ public class Profile {
if (this.isFinished) {
return;
}
if (executionProfile == null) {
// Sometimes execution profile is not set
return;
}
summaryProfile.update(summaryInfo);
executionProfile.update(startTime, isFinished);
for (ExecutionProfile executionProfile : executionProfiles) {
executionProfile.update(startTime, isFinished);
}
rootProfile.computeTimeInProfile();
// Nereids native insert does not set planner, so it is null
if (planner != null) {
@ -109,18 +111,22 @@ public class Profile {
// add summary to builder
summaryProfile.prettyPrint(builder);
LOG.info(builder.toString());
if (this.profileLevel == MergedProfileLevel) {
// Only generate the merged profile for select and insert into select.
// Broker load is not supported yet.
if (this.profileLevel == MergedProfileLevel && this.executionProfiles.size() == 1) {
try {
builder.append("\n MergedProfile \n");
aggregatedProfile.getAggregatedFragmentsProfile(planNodeMap).prettyPrint(builder, " ");
this.executionProfiles.get(0).getAggregatedFragmentsProfile(planNodeMap).prettyPrint(builder, " ");
} catch (Throwable aggProfileException) {
LOG.warn("build merged simple profile failed", aggProfileException);
builder.append("build merged simple profile failed");
}
}
try {
builder.append("\n");
executionProfile.getExecutionProfile().prettyPrint(builder, "");
for (ExecutionProfile executionProfile : executionProfiles) {
builder.append("\n");
executionProfile.getExecutionProfile().prettyPrint(builder, "");
}
} catch (Throwable aggProfileException) {
LOG.warn("build profile failed", aggProfileException);
builder.append("build profile failed");

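For readers outside the Doris codebase, a self-contained toy sketch of the pattern this commit introduces (class names here are illustrative, not Doris classes): a job-level profile owns a list of per-task execution profiles and prints each one in turn.

import java.util.ArrayList;
import java.util.List;

// Toy model of the new structure: one job profile, many execution profiles.
class ToyExecutionProfile {
    final String name;
    ToyExecutionProfile(String name) { this.name = name; }
    void prettyPrint(StringBuilder sb) { sb.append(name).append('\n'); }
}

class ToyJobProfile {
    private final List<ToyExecutionProfile> executionProfiles = new ArrayList<>();

    void addExecutionProfile(ToyExecutionProfile profile) {
        if (profile == null) {
            return; // mirror the null guard in Profile.addExecutionProfile
        }
        executionProfiles.add(profile);
    }

    String prettyPrint() {
        StringBuilder sb = new StringBuilder("Summary\n");
        // like the patched Profile.prettyPrint: print every execution profile
        for (ToyExecutionProfile profile : executionProfiles) {
            sb.append('\n');
            profile.prettyPrint(sb);
        }
        return sb.toString();
    }
}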
LoadLoadingTask.java

@ -143,7 +143,7 @@ public class LoadLoadingTask extends LoadTask {
Coordinator curCoordinator = new Coordinator(callback.getCallbackId(), loadId, planner.getDescTable(),
planner.getFragments(), planner.getScanNodes(), planner.getTimezone(), loadZeroTolerance);
if (this.jobProfile != null) {
this.jobProfile.setExecutionProfile(curCoordinator.getExecutionProfile());
this.jobProfile.addExecutionProfile(curCoordinator.getExecutionProfile());
}
curCoordinator.setQueryType(TQueryType.LOAD);
curCoordinator.setExecMemoryLimit(execMemLimit);

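This call site is where the bug showed: the old setExecutionProfile overwrote this.executionProfile on every task, so a broker load job that ran several load tasks kept only the last task's profile. With addExecutionProfile, one entry per task is retained.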
InsertExecutor.java

@ -222,7 +222,7 @@ public class InsertExecutor {
try {
coordinator.setLoadZeroTolerance(ctx.getSessionVariable().getEnableInsertStrict());
coordinator.setQueryType(TQueryType.LOAD);
executor.getProfile().setExecutionProfile(coordinator.getExecutionProfile());
executor.getProfile().addExecutionProfile(coordinator.getExecutionProfile());
QueryInfo queryInfo = new QueryInfo(ConnectContext.get(), executor.getOriginStmtInString(), coordinator);
QeProcessorImpl.INSTANCE.registerQuery(ctx.queryId(), queryInfo);
coordinator.exec();

AbstractInsertExecutor.java (new file)

@ -0,0 +1,194 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands.insert;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.EnvFactory;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.task.LoadEtlTask;
import org.apache.doris.thrift.TQueryType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Optional;
/**
* Abstract insert executor.
* Derived classes should implement the abstract methods for a certain type of target table.
*/
public abstract class AbstractInsertExecutor {
private static final Logger LOG = LogManager.getLogger(AbstractInsertExecutor.class);
protected long jobId;
protected final ConnectContext ctx;
protected final Coordinator coordinator;
protected String labelName;
protected final DatabaseIf database;
protected final TableIf table;
protected final long createTime = System.currentTimeMillis();
protected long loadedRows = 0;
protected int filteredRows = 0;
protected String errMsg = "";
protected Optional<InsertCommandContext> insertCtx;
/**
* Constructor
*/
public AbstractInsertExecutor(ConnectContext ctx, TableIf table, String labelName, NereidsPlanner planner,
Optional<InsertCommandContext> insertCtx) {
this.ctx = ctx;
this.coordinator = EnvFactory.getInstance().createCoordinator(ctx, null, planner, ctx.getStatsErrorEstimator());
this.labelName = labelName;
this.table = table;
this.database = table.getDatabase();
this.insertCtx = insertCtx;
}
public Coordinator getCoordinator() {
return coordinator;
}
public DatabaseIf getDatabase() {
return database;
}
public TableIf getTable() {
return table;
}
public String getLabelName() {
return labelName;
}
/**
* begin transaction if necessary
*/
public abstract void beginTransaction();
/**
* finalize sink to complete enough info for sink execution
*/
protected abstract void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink);
/**
* Do something before exec
*/
protected abstract void beforeExec();
/**
* Do something after exec finished
*/
protected abstract void onComplete() throws UserException;
/**
* Do something when exec throw exception
*/
protected abstract void onFail(Throwable t);
/**
* Do something after exec
*/
protected abstract void afterExec(StmtExecutor executor);
protected final void execImpl(StmtExecutor executor, long jobId) throws Exception {
String queryId = DebugUtil.printId(ctx.queryId());
coordinator.setLoadZeroTolerance(ctx.getSessionVariable().getEnableInsertStrict());
coordinator.setQueryType(TQueryType.LOAD);
executor.getProfile().addExecutionProfile(coordinator.getExecutionProfile());
QueryInfo queryInfo = new QueryInfo(ConnectContext.get(), executor.getOriginStmtInString(), coordinator);
QeProcessorImpl.INSTANCE.registerQuery(ctx.queryId(), queryInfo);
coordinator.exec();
int execTimeout = ctx.getExecTimeout();
if (LOG.isDebugEnabled()) {
LOG.debug("insert [{}] with query id {} execution timeout is {}", labelName, queryId, execTimeout);
}
boolean notTimeout = coordinator.join(execTimeout);
if (!coordinator.isDone()) {
coordinator.cancel();
if (notTimeout) {
errMsg = coordinator.getExecStatus().getErrorMsg();
ErrorReport.reportDdlException("there exists unhealthy backend. "
+ errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT);
} else {
ErrorReport.reportDdlException(ErrorCode.ERR_EXECUTE_TIMEOUT);
}
}
if (!coordinator.getExecStatus().ok()) {
errMsg = coordinator.getExecStatus().getErrorMsg();
LOG.warn("insert [{}] with query id {} failed, {}", labelName, queryId, errMsg);
ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT);
}
if (LOG.isDebugEnabled()) {
LOG.debug("insert [{}] with query id {} delta files is {}",
labelName, queryId, coordinator.getDeltaUrls());
}
if (coordinator.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL) != null) {
loadedRows = Long.parseLong(coordinator.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL));
}
if (coordinator.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL) != null) {
filteredRows = Integer.parseInt(coordinator.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL));
}
}
private boolean checkStrictMode() {
// if in strict mode, insert will fail if there are filtered rows
if (ctx.getSessionVariable().getEnableInsertStrict()) {
if (filteredRows > 0) {
ctx.getState().setError(ErrorCode.ERR_FAILED_WHEN_INSERT,
"Insert has filtered data in strict mode, tracking_url=" + coordinator.getTrackingUrl());
return false;
}
}
return true;
}
/**
* execute insert txn for insert into select command.
*/
public void executeSingleInsert(StmtExecutor executor, long jobId) {
beforeExec();
try {
execImpl(executor, jobId);
if (!checkStrictMode()) {
return;
}
onComplete();
} catch (Throwable t) {
onFail(t);
return;
} finally {
executor.updateProfile(true);
QeProcessorImpl.INSTANCE.unregisterQuery(ctx.queryId());
}
afterExec(executor);
}
}
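
A hypothetical minimal subclass, only to illustrate the template-method contract above (the class name and method bodies are illustrative, not part of this commit):

package org.apache.doris.nereids.trees.plans.commands.insert;

import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.UserException;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
import java.util.Optional;

public class NoopInsertExecutor extends AbstractInsertExecutor {
    public NoopInsertExecutor(ConnectContext ctx, TableIf table, String labelName,
            NereidsPlanner planner, Optional<InsertCommandContext> insertCtx) {
        super(ctx, table, labelName, planner, insertCtx);
    }

    @Override
    public void beginTransaction() {
        // open a transaction against the target table if one is needed
    }

    @Override
    protected void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink) {
        // complete sink-specific info (e.g. transaction id) before execution
    }

    @Override
    protected void beforeExec() {
        // e.g. log the label before the coordinator starts
    }

    @Override
    protected void onComplete() throws UserException {
        // commit the transaction once execImpl and the strict-mode check pass
    }

    @Override
    protected void onFail(Throwable t) {
        errMsg = t.getMessage() == null ? "unknown error" : t.getMessage();
    }

    @Override
    protected void afterExec(StmtExecutor executor) {
        // report the final state to the client
    }
}

executeSingleInsert drives these hooks in order: beforeExec, execImpl, checkStrictMode, onComplete (or onFail on any throwable), and always updateProfile plus unregisterQuery in the finally block.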

StmtExecutor.java

@ -1576,7 +1576,7 @@ public class StmtExecutor {
coord = new Coordinator(context, analyzer, planner, context.getStatsErrorEstimator());
QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord));
profile.setExecutionProfile(coord.getExecutionProfile());
profile.addExecutionProfile(coord.getExecutionProfile());
coordBase = coord;
}
@ -2053,7 +2053,7 @@ public class StmtExecutor {
coord = new Coordinator(context, analyzer, planner, context.getStatsErrorEstimator());
coord.setLoadZeroTolerance(context.getSessionVariable().getEnableInsertStrict());
coord.setQueryType(TQueryType.LOAD);
profile.setExecutionProfile(coord.getExecutionProfile());
profile.addExecutionProfile(coord.getExecutionProfile());
QueryInfo queryInfo = new QueryInfo(ConnectContext.get(), this.getOriginStmtInString(), coord);
QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), queryInfo);
@ -2875,7 +2875,7 @@ public class StmtExecutor {
}
RowBatch batch;
coord = new Coordinator(context, analyzer, planner, context.getStatsErrorEstimator());
profile.setExecutionProfile(coord.getExecutionProfile());
profile.addExecutionProfile(coord.getExecutionProfile());
try {
QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord));