[Enhance] Add profile for load job (#5052)
Add viewable profile for broker load. Similar to the query profile, the user can submit the import job by setting the session variable is_report_success to true, and then view the running profile of the job on the FE web page for easy analysis and debugging.
This commit is contained in:
@ -24,16 +24,17 @@ import org.apache.doris.thrift.TRuntimeProfileNode;
|
||||
import org.apache.doris.thrift.TRuntimeProfileTree;
|
||||
import org.apache.doris.thrift.TUnit;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.Formatter;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
@ -58,7 +59,7 @@ public class RuntimeProfile {
|
||||
private Map<String, RuntimeProfile> childMap = Maps.newConcurrentMap();
|
||||
|
||||
private Map<String, TreeSet<String>> childCounterMap = Maps.newHashMap();
|
||||
private List<Pair<RuntimeProfile, Boolean>> childList = Lists.newArrayList();
|
||||
private LinkedList<Pair<RuntimeProfile, Boolean>> childList = Lists.newLinkedList();
|
||||
|
||||
private String name;
|
||||
|
||||
@ -318,6 +319,16 @@ public class RuntimeProfile {
|
||||
this.childList.add(pair);
|
||||
}
|
||||
|
||||
public void addFirstChild(RuntimeProfile child) {
|
||||
if (child == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.childMap.put(child.name, child);
|
||||
Pair<RuntimeProfile, Boolean> pair = Pair.create(child, true);
|
||||
this.childList.addFirst(pair);
|
||||
}
|
||||
|
||||
// Because the profile of summary and child fragment is not a real parent-child relationship
|
||||
// Each child profile needs to calculate the time proportion consumed by itself
|
||||
public void computeTimeInChildProfile() {
|
||||
|
||||
@ -29,13 +29,18 @@ import org.apache.doris.common.DuplicatedRequestException;
|
||||
import org.apache.doris.common.LabelAlreadyUsedException;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.util.DebugUtil;
|
||||
import org.apache.doris.common.util.LogBuilder;
|
||||
import org.apache.doris.common.util.LogKey;
|
||||
import org.apache.doris.common.util.ProfileManager;
|
||||
import org.apache.doris.common.util.RuntimeProfile;
|
||||
import org.apache.doris.common.util.TimeUtils;
|
||||
import org.apache.doris.load.BrokerFileGroup;
|
||||
import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey;
|
||||
import org.apache.doris.load.EtlJobType;
|
||||
import org.apache.doris.load.FailMsg;
|
||||
import org.apache.doris.metric.MetricRepo;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.OriginStatement;
|
||||
import org.apache.doris.service.FrontendOptions;
|
||||
import org.apache.doris.thrift.TUniqueId;
|
||||
@ -44,12 +49,12 @@ import org.apache.doris.transaction.TransactionState;
|
||||
import org.apache.doris.transaction.TransactionState.TxnCoordinator;
|
||||
import org.apache.doris.transaction.TransactionState.TxnSourceType;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
@ -65,7 +70,12 @@ public class BrokerLoadJob extends BulkLoadJob {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(BrokerLoadJob.class);
|
||||
|
||||
// only for log replay
|
||||
// Profile of this load job, including all tasks' profiles
|
||||
private RuntimeProfile jobProfile;
|
||||
// If set to true, the profile of load job with be pushed to ProfileManager
|
||||
private boolean isReportSuccess = false;
|
||||
|
||||
// for log replay and unit test
|
||||
public BrokerLoadJob() {
|
||||
super();
|
||||
this.jobType = EtlJobType.BROKER;
|
||||
@ -78,6 +88,9 @@ public class BrokerLoadJob extends BulkLoadJob {
|
||||
this.timeoutSecond = Config.broker_load_default_timeout_second;
|
||||
this.brokerDesc = brokerDesc;
|
||||
this.jobType = EtlJobType.BROKER;
|
||||
if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().isReportSucc()) {
|
||||
isReportSuccess = true;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -173,6 +186,7 @@ public class BrokerLoadJob extends BulkLoadJob {
|
||||
private void createLoadingTask(Database db, BrokerPendingTaskAttachment attachment) throws UserException {
|
||||
// divide job into broker loading task by table
|
||||
List<LoadLoadingTask> newLoadingTasks = Lists.newArrayList();
|
||||
this.jobProfile = new RuntimeProfile("BrokerLoadJob " + id + ". " + label);
|
||||
db.readLock();
|
||||
try {
|
||||
for (Map.Entry<FileGroupAggKey, List<BrokerFileGroup>> entry : fileGroupAggInfo.getAggKeyToFileGroups().entrySet()) {
|
||||
@ -193,7 +207,8 @@ public class BrokerLoadJob extends BulkLoadJob {
|
||||
// Generate loading task and init the plan of task
|
||||
LoadLoadingTask task = new LoadLoadingTask(db, table, brokerDesc,
|
||||
brokerFileGroups, getDeadlineMs(), execMemLimit,
|
||||
strictMode, transactionId, this, timezone, timeoutSecond);
|
||||
strictMode, transactionId, this, timezone, timeoutSecond,
|
||||
isReportSuccess ? jobProfile : null);
|
||||
UUID uuid = UUID.randomUUID();
|
||||
TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
|
||||
task.init(loadId, attachment.getFileStatusByTable(aggKey),
|
||||
@ -300,6 +315,32 @@ public class BrokerLoadJob extends BulkLoadJob {
|
||||
}
|
||||
}
|
||||
|
||||
private void writeProfile() {
|
||||
if (!isReportSuccess) {
|
||||
return;
|
||||
}
|
||||
|
||||
RuntimeProfile summaryProfile = new RuntimeProfile("Summary");
|
||||
summaryProfile.addInfoString(ProfileManager.QUERY_ID, String.valueOf(id));
|
||||
summaryProfile.addInfoString(ProfileManager.START_TIME, TimeUtils.longToTimeString(createTimestamp));
|
||||
summaryProfile.addInfoString(ProfileManager.END_TIME, TimeUtils.longToTimeString(finishTimestamp));
|
||||
summaryProfile.addInfoString(ProfileManager.TOTAL_TIME, DebugUtil.getPrettyStringMs(finishTimestamp - createTimestamp));
|
||||
|
||||
summaryProfile.addInfoString(ProfileManager.QUERY_TYPE, "Load");
|
||||
summaryProfile.addInfoString(ProfileManager.QUERY_STATE, "N/A");
|
||||
summaryProfile.addInfoString(ProfileManager.USER, "N/A");
|
||||
summaryProfile.addInfoString(ProfileManager.DEFAULT_DB, "N/A");
|
||||
summaryProfile.addInfoString(ProfileManager.SQL_STATEMENT, "N/A");
|
||||
summaryProfile.addInfoString(ProfileManager.IS_CACHED, "N/A");
|
||||
|
||||
// Add the summary profile to the first
|
||||
jobProfile.addFirstChild(summaryProfile);
|
||||
jobProfile.computeTimeInChildProfile();
|
||||
StringBuilder builder = new StringBuilder();
|
||||
jobProfile.prettyPrint(builder, "");
|
||||
ProfileManager.getInstance().pushProfile(jobProfile);
|
||||
}
|
||||
|
||||
private void updateLoadingStatus(BrokerLoadingTaskAttachment attachment) {
|
||||
loadingStatus.replaceCounter(DPP_ABNORMAL_ALL,
|
||||
increaseCounter(DPP_ABNORMAL_ALL, attachment.getCounter(DPP_ABNORMAL_ALL)));
|
||||
@ -327,4 +368,11 @@ public class BrokerLoadJob extends BulkLoadJob {
|
||||
}
|
||||
return String.valueOf(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterVisible(TransactionState txnState, boolean txnOperated) {
|
||||
super.afterVisible(txnState, txnOperated);
|
||||
writeProfile();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -27,6 +27,8 @@ import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.util.DebugUtil;
|
||||
import org.apache.doris.common.util.LogBuilder;
|
||||
import org.apache.doris.common.util.LogKey;
|
||||
import org.apache.doris.common.util.RuntimeProfile;
|
||||
import org.apache.doris.common.util.TimeUtils;
|
||||
import org.apache.doris.load.BrokerFileGroup;
|
||||
import org.apache.doris.load.FailMsg;
|
||||
import org.apache.doris.qe.Coordinator;
|
||||
@ -64,11 +66,15 @@ public class LoadLoadingTask extends LoadTask {
|
||||
|
||||
private LoadingTaskPlanner planner;
|
||||
|
||||
private RuntimeProfile jobProfile;
|
||||
private RuntimeProfile profile;
|
||||
private long beginTime;
|
||||
|
||||
public LoadLoadingTask(Database db, OlapTable table,
|
||||
BrokerDesc brokerDesc, List<BrokerFileGroup> fileGroups,
|
||||
long jobDeadlineMs, long execMemLimit, boolean strictMode,
|
||||
long txnId, LoadTaskCallback callback, String timezone,
|
||||
long timeoutS) {
|
||||
long timeoutS, RuntimeProfile profile) {
|
||||
super(callback, TaskType.LOADING);
|
||||
this.db = db;
|
||||
this.table = table;
|
||||
@ -82,6 +88,7 @@ public class LoadLoadingTask extends LoadTask {
|
||||
this.retryTime = 2; // 2 times is enough
|
||||
this.timezone = timezone;
|
||||
this.timeoutS = timeoutS;
|
||||
this.jobProfile = profile;
|
||||
}
|
||||
|
||||
public void init(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusList, int fileNum, UserIdentity userInfo) throws UserException {
|
||||
@ -100,6 +107,7 @@ public class LoadLoadingTask extends LoadTask {
|
||||
LOG.info("begin to execute loading task. load id: {} job: {}. db: {}, tbl: {}. left retry: {}",
|
||||
DebugUtil.printId(loadId), callback.getCallbackId(), db.getFullName(), table.getName(), retryTime);
|
||||
retryTime--;
|
||||
beginTime = System.nanoTime();
|
||||
executeOnce();
|
||||
}
|
||||
|
||||
@ -148,6 +156,8 @@ public class LoadLoadingTask extends LoadTask {
|
||||
curCoordinator.getLoadCounters(),
|
||||
curCoordinator.getTrackingUrl(),
|
||||
TabletCommitInfo.fromThrift(curCoordinator.getCommitInfos()));
|
||||
// Create profile of this task and add to the job profile.
|
||||
createProfile(curCoordinator);
|
||||
} else {
|
||||
throw new LoadException(status.getErrorMsg());
|
||||
}
|
||||
@ -160,6 +170,19 @@ public class LoadLoadingTask extends LoadTask {
|
||||
return jobDeadlineMs - System.currentTimeMillis();
|
||||
}
|
||||
|
||||
public void createProfile(Coordinator coord) {
|
||||
if (jobProfile == null) {
|
||||
// No need to gather profile
|
||||
return;
|
||||
}
|
||||
// Summary profile
|
||||
profile = new RuntimeProfile("LoadTask: " + DebugUtil.printId(loadId));
|
||||
coord.getQueryProfile().getCounterTotalTime().setValue(TimeUtils.getEstimatedTime(beginTime));
|
||||
coord.endProfile();
|
||||
profile.addChild(coord.getQueryProfile());
|
||||
jobProfile.addChild(profile);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateRetryInfo() {
|
||||
super.updateRetryInfo();
|
||||
|
||||
@ -17,22 +17,23 @@
|
||||
|
||||
package org.apache.doris.load.loadv2;
|
||||
|
||||
import org.apache.doris.analysis.Analyzer;
|
||||
import org.apache.doris.analysis.BrokerDesc;
|
||||
import org.apache.doris.analysis.DataDescription;
|
||||
import org.apache.doris.analysis.LabelName;
|
||||
import org.apache.doris.analysis.LoadStmt;
|
||||
import org.apache.doris.analysis.Analyzer;
|
||||
import org.apache.doris.analysis.UserIdentity;
|
||||
import org.apache.doris.catalog.BrokerTable;
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.PrimitiveType;
|
||||
import org.apache.doris.catalog.BrokerTable;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.common.jmockit.Deencapsulation;
|
||||
import org.apache.doris.common.util.RuntimeProfile;
|
||||
import org.apache.doris.load.BrokerFileGroup;
|
||||
import org.apache.doris.load.BrokerFileGroupAggInfo;
|
||||
import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey;
|
||||
@ -45,29 +46,29 @@ import org.apache.doris.planner.BrokerScanNode;
|
||||
import org.apache.doris.planner.OlapTableSink;
|
||||
import org.apache.doris.planner.PlanFragment;
|
||||
import org.apache.doris.task.MasterTaskExecutor;
|
||||
import org.apache.doris.thrift.TUniqueId;
|
||||
import org.apache.doris.transaction.TransactionState;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.UUID;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import mockit.Expectations;
|
||||
import mockit.Injectable;
|
||||
import mockit.Mock;
|
||||
import mockit.MockUp;
|
||||
import mockit.Mocked;
|
||||
import org.apache.doris.thrift.TUniqueId;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
|
||||
public class BrokerLoadJobTest {
|
||||
|
||||
@ -358,8 +359,10 @@ public class BrokerLoadJobTest {
|
||||
fileGroups.add(brokerFileGroup);
|
||||
UUID uuid = UUID.randomUUID();
|
||||
TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
|
||||
RuntimeProfile jobProfile = new RuntimeProfile("test");
|
||||
LoadLoadingTask task = new LoadLoadingTask(database, olapTable,brokerDesc, fileGroups,
|
||||
100, 100,false, 100, callback, "", 100);
|
||||
100, 100, false, 100, callback, "", 100,
|
||||
jobProfile);
|
||||
try {
|
||||
UserIdentity userInfo = new UserIdentity("root", "localhost");
|
||||
userInfo.setIsAnalyzed();
|
||||
|
||||
Reference in New Issue
Block a user