[fix](streamload) fix stream load failed when enable profile (#18364)

#18015 enables stream load profile log,  however be will encounter rpc fail when loading tpch data(see #18291). This is because when `is_report_success` is true, be will reportExecStatus to fe, but fe cannot find QueryInfo in `coordinatorMap`, thus it will return error to be.
This commit is contained in:
gitccl
2023-04-05 01:01:46 +08:00
committed by GitHub
parent d8b293de07
commit 7f8d92656e
13 changed files with 47 additions and 27 deletions

View File

@ -47,7 +47,6 @@ import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.planner.external.ExternalFileScanNode;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.task.LoadTaskInfo;
import org.apache.doris.thrift.PaloInternalServiceVersion;
@ -276,7 +275,7 @@ public class StreamLoadPlanner {
queryOptions.setEnableVectorizedEngine(Config.enable_vectorized_load);
queryOptions.setEnablePipelineEngine(Config.enable_pipeline_load);
queryOptions.setBeExecVersion(Config.be_exec_version);
queryOptions.setIsReportSuccess(VariableMgr.newSessionVariable().enableProfile());
queryOptions.setIsReportSuccess(taskInfo.getEnableProfile());
params.setQueryOptions(queryOptions);
TQueryGlobals queryGlobals = new TQueryGlobals();

View File

@ -24,6 +24,7 @@ import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.ProfileWriter;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TQueryType;
import org.apache.doris.thrift.TReportExecStatusParams;
import org.apache.doris.thrift.TReportExecStatusResult;
import org.apache.doris.thrift.TStatus;
@ -190,8 +191,14 @@ public final class QeProcessorImpl implements QeProcessor {
final QueryInfo info = coordinatorMap.get(params.query_id);
if (info == null) {
result.setStatus(new TStatus(TStatusCode.RUNTIME_ERROR));
LOG.info("ReportExecStatus() runtime error, query {} does not exist", DebugUtil.printId(params.query_id));
// There is no QueryInfo for StreamLoad, so we return OK
if (params.query_type == TQueryType.LOAD) {
result.setStatus(new TStatus(TStatusCode.OK));
} else {
result.setStatus(new TStatus(TStatusCode.RUNTIME_ERROR));
}
LOG.info("ReportExecStatus() runtime error, query {} with type {} does not exist",
DebugUtil.printId(params.query_id), params.query_type);
return result;
}
try {

View File

@ -105,6 +105,10 @@ public interface LoadTaskInfo {
return 0;
}
default boolean getEnableProfile() {
return false;
}
class ImportColumnDescs {
public List<ImportColumnDesc> descs = Lists.newArrayList();
public boolean isColumnDescsRewrited = false;

View File

@ -83,8 +83,8 @@ public class StreamLoadTask implements LoadTaskInfo {
private String headerType = "";
private List<String> hiddenColumns;
private boolean trimDoubleQuotes = false;
private int skipLines = 0;
private boolean enableProfile = false;
public StreamLoadTask(TUniqueId id, long txnId, TFileType fileType, TFileFormatType formatType,
TFileCompressType compressType) {
@ -263,6 +263,11 @@ public class StreamLoadTask implements LoadTaskInfo {
return skipLines;
}
@Override
public boolean getEnableProfile() {
return enableProfile;
}
public static StreamLoadTask fromTStreamLoadPutRequest(TStreamLoadPutRequest request) throws UserException {
StreamLoadTask streamLoadTask = new StreamLoadTask(request.getLoadId(), request.getTxnId(),
request.getFileType(), request.getFormatType(),
@ -368,6 +373,9 @@ public class StreamLoadTask implements LoadTaskInfo {
if (request.isSetSkipLines()) {
skipLines = request.getSkipLines();
}
if (request.isSetEnableProfile()) {
enableProfile = request.isEnableProfile();
}
}
// used for stream load