From a23aee28830bad7a38834c5fd38c491906619af8 Mon Sep 17 00:00:00 2001 From: xueweizhang Date: Fri, 14 Jun 2024 09:29:14 +0800 Subject: [PATCH] [fix](broker) fix no error url when broker data quality error (#35643) (#36089) ## Proposed changes cherry-pick from #35643 --- .../java/org/apache/doris/load/loadv2/BrokerLoadJob.java | 3 ++- .../doris/load/loadv2/BrokerLoadingTaskAttachment.java | 9 ++++++++- .../org/apache/doris/load/loadv2/LoadLoadingTask.java | 6 ++++-- .../suites/load_p0/broker_load/test_etl_failed.groovy | 2 +- 4 files changed, 15 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index 02f9bb0a3e..de32abbbb3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -50,6 +50,7 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.OriginStatement; import org.apache.doris.qe.SessionVariable; import org.apache.doris.service.FrontendOptions; +import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.BeginTransactionException; import org.apache.doris.transaction.TransactionState; @@ -292,7 +293,7 @@ public class BrokerLoadJob extends BulkLoadJob { } // check data quality - if (!checkDataQuality()) { + if (!checkDataQuality() || attachment.getStatus().getErrorCode() == TStatusCode.DATA_QUALITY_ERROR) { cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED, DataQualityException.QUALITY_FAIL_MSG), true, true); return; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadingTaskAttachment.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadingTaskAttachment.java index 7aefd332a2..e5f8973d33 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadingTaskAttachment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadingTaskAttachment.java @@ -17,6 +17,7 @@ package org.apache.doris.load.loadv2; +import org.apache.doris.common.Status; import org.apache.doris.transaction.ErrorTabletInfo; import org.apache.doris.transaction.TabletCommitInfo; @@ -29,15 +30,17 @@ public class BrokerLoadingTaskAttachment extends TaskAttachment { private String trackingUrl; private List commitInfoList; List errorTabletInfos; + private Status status = new Status(); public BrokerLoadingTaskAttachment(long taskId, Map counters, String trackingUrl, List commitInfoList, - List errorTabletInfos) { + List errorTabletInfos, Status status) { super(taskId); this.trackingUrl = trackingUrl; this.counters = counters; this.commitInfoList = commitInfoList; this.errorTabletInfos = errorTabletInfos; + this.status = status; } public String getCounter(String key) { @@ -55,4 +58,8 @@ public class BrokerLoadingTaskAttachment extends TaskAttachment { public List getErrorTabletInfos() { return errorTabletInfos; } + + public Status getStatus() { + return status; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java index 2d1b312fb8..015190bb55 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java @@ -36,6 +36,7 @@ import org.apache.doris.qe.QeProcessorImpl; import org.apache.doris.thrift.TBrokerFileStatus; import org.apache.doris.thrift.TPipelineWorkloadGroup; import org.apache.doris.thrift.TQueryType; +import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.ErrorTabletInfo; import org.apache.doris.transaction.TabletCommitInfo; @@ -195,13 +196,14 @@ public class LoadLoadingTask extends LoadTask { curCoordinator.exec(); if (curCoordinator.join(waitSecond)) { Status status = curCoordinator.getExecStatus(); - if (status.ok()) { + if (status.ok() || status.getErrorCode() == TStatusCode.DATA_QUALITY_ERROR) { attachment = new BrokerLoadingTaskAttachment(signature, curCoordinator.getLoadCounters(), curCoordinator.getTrackingUrl(), TabletCommitInfo.fromThrift(curCoordinator.getCommitInfos()), ErrorTabletInfo.fromThrift(curCoordinator.getErrorTabletInfos() - .stream().limit(Config.max_error_tablet_of_broker_load).collect(Collectors.toList()))); + .stream().limit(Config.max_error_tablet_of_broker_load).collect(Collectors.toList())), + status); curCoordinator.getErrorTabletInfos().clear(); } else { throw new LoadException(status.getErrorMsg()); diff --git a/regression-test/suites/load_p0/broker_load/test_etl_failed.groovy b/regression-test/suites/load_p0/broker_load/test_etl_failed.groovy index 3b4eac874d..928b4e3854 100644 --- a/regression-test/suites/load_p0/broker_load/test_etl_failed.groovy +++ b/regression-test/suites/load_p0/broker_load/test_etl_failed.groovy @@ -63,7 +63,7 @@ suite("test_etl_failed", "load_p0") { assertTrue(1 == 2, "etl should be failed") break; } - if (result[0][2].equals("CANCELLED")) { + if (result[0][2].equals("CANCELLED") && result[0][13].contains("_load_error_log")) { break; } Thread.sleep(1000)