[fix](broker) fix no error url when broker data quality error (#35643) (#36089)

## Proposed changes

cherry-pick from #35643
This commit is contained in:
xueweizhang
2024-06-14 09:29:14 +08:00
committed by GitHub
parent af1c4ecf89
commit a23aee2883
4 changed files with 15 additions and 5 deletions

View File

@ -50,6 +50,7 @@ import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.BeginTransactionException;
import org.apache.doris.transaction.TransactionState;
@ -292,7 +293,7 @@ public class BrokerLoadJob extends BulkLoadJob {
}
// check data quality
if (!checkDataQuality()) {
if (!checkDataQuality() || attachment.getStatus().getErrorCode() == TStatusCode.DATA_QUALITY_ERROR) {
cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED,
DataQualityException.QUALITY_FAIL_MSG), true, true);
return;

View File

@ -17,6 +17,7 @@
package org.apache.doris.load.loadv2;
import org.apache.doris.common.Status;
import org.apache.doris.transaction.ErrorTabletInfo;
import org.apache.doris.transaction.TabletCommitInfo;
@ -29,15 +30,17 @@ public class BrokerLoadingTaskAttachment extends TaskAttachment {
private String trackingUrl;
private List<TabletCommitInfo> commitInfoList;
List<ErrorTabletInfo> errorTabletInfos;
private Status status = new Status();
public BrokerLoadingTaskAttachment(long taskId, Map<String, String> counters, String trackingUrl,
List<TabletCommitInfo> commitInfoList,
List<ErrorTabletInfo> errorTabletInfos) {
List<ErrorTabletInfo> errorTabletInfos, Status status) {
super(taskId);
this.trackingUrl = trackingUrl;
this.counters = counters;
this.commitInfoList = commitInfoList;
this.errorTabletInfos = errorTabletInfos;
this.status = status;
}
public String getCounter(String key) {
@ -55,4 +58,8 @@ public class BrokerLoadingTaskAttachment extends TaskAttachment {
public List<ErrorTabletInfo> getErrorTabletInfos() {
return errorTabletInfos;
}
public Status getStatus() {
return status;
}
}

View File

@ -36,6 +36,7 @@ import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TPipelineWorkloadGroup;
import org.apache.doris.thrift.TQueryType;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.ErrorTabletInfo;
import org.apache.doris.transaction.TabletCommitInfo;
@ -195,13 +196,14 @@ public class LoadLoadingTask extends LoadTask {
curCoordinator.exec();
if (curCoordinator.join(waitSecond)) {
Status status = curCoordinator.getExecStatus();
if (status.ok()) {
if (status.ok() || status.getErrorCode() == TStatusCode.DATA_QUALITY_ERROR) {
attachment = new BrokerLoadingTaskAttachment(signature,
curCoordinator.getLoadCounters(),
curCoordinator.getTrackingUrl(),
TabletCommitInfo.fromThrift(curCoordinator.getCommitInfos()),
ErrorTabletInfo.fromThrift(curCoordinator.getErrorTabletInfos()
.stream().limit(Config.max_error_tablet_of_broker_load).collect(Collectors.toList())));
.stream().limit(Config.max_error_tablet_of_broker_load).collect(Collectors.toList())),
status);
curCoordinator.getErrorTabletInfos().clear();
} else {
throw new LoadException(status.getErrorMsg());