diff --git a/fe/src/main/java/org/apache/doris/common/Config.java b/fe/src/main/java/org/apache/doris/common/Config.java index 7c6734b22b..10459be41d 100644 --- a/fe/src/main/java/org/apache/doris/common/Config.java +++ b/fe/src/main/java/org/apache/doris/common/Config.java @@ -824,5 +824,13 @@ public class Config extends ConfigBase { */ @ConfField(mutable = true, masterOnly = true) public static boolean disable_colocate_relocate = false; @ConfField(mutable = true, masterOnly = true) public static boolean disable_colocate_balance = false; + + /* + * If set to true, the insert stmt with processing error will still return a label to user. + * And user can use this label to check the load job's status. + * The default value is false, which means if insert operation encounter errors, + * exception will be thrown to user client directly without load label. + */ + @ConfField(mutable = true, masterOnly = true) public static boolean using_old_load_usage_pattern = false; } diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java index a30075383c..ad7f9e411c 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java @@ -23,6 +23,10 @@ import org.apache.doris.catalog.Table; import org.apache.doris.common.Config; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.load.EtlJobType; +import org.apache.doris.load.FailMsg; +import org.apache.doris.load.FailMsg.CancelType; + +import com.google.common.base.Strings; import java.io.DataInput; import java.io.DataOutput; @@ -46,14 +50,20 @@ public class InsertLoadJob extends LoadJob { this.jobType = EtlJobType.INSERT; } - public InsertLoadJob(String label, long dbId, long tableId, long createTimestamp) { + public InsertLoadJob(String label, long dbId, long tableId, long createTimestamp, String failMsg) { super(dbId, label); this.tableId = tableId; this.createTimestamp = createTimestamp; this.loadStartTimestamp = createTimestamp; this.finishTimestamp = System.currentTimeMillis(); - this.state = JobState.FINISHED; - this.progress = 100; + if (Strings.isNullOrEmpty(failMsg)) { + this.state = JobState.FINISHED; + this.progress = 100; + } else { + this.state = JobState.CANCELLED; + this.failMsg = new FailMsg(CancelType.LOAD_RUN_FAIL, failMsg); + this.progress = 0; + } this.jobType = EtlJobType.INSERT; this.timeoutSecond = Config.insert_load_default_timeout_second; } diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index 1f01f0ec51..e950b16fc0 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -251,7 +251,7 @@ public class LoadManager implements Writable{ } public void recordFinishedLoadJob(String label, String dbName, long tableId, EtlJobType jobType, - long createTimestamp) throws MetaNotFoundException { + long createTimestamp, String failMsg) throws MetaNotFoundException { // get db id Database db = Catalog.getCurrentCatalog().getDb(dbName); @@ -262,7 +262,7 @@ public class LoadManager implements Writable{ LoadJob loadJob; switch (jobType) { case INSERT: - loadJob = new InsertLoadJob(label, db.getId(), tableId, createTimestamp); + loadJob = new InsertLoadJob(label, db.getId(), tableId, createTimestamp, failMsg); break; default: return; diff --git a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java index 91a3c205f0..916f7b2f64 100644 --- a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -589,74 +589,111 @@ public class StmtExecutor { return; } - // assign request_id + long createTime = System.currentTimeMillis(); UUID uuid = UUID.randomUUID(); - context.setQueryId(new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits())); + Throwable throwable = null; + try { + // assign request_id + context.setQueryId(new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits())); - coord = new Coordinator(context, analyzer, planner); - coord.setQueryType(TQueryType.LOAD); + coord = new Coordinator(context, analyzer, planner); + coord.setQueryType(TQueryType.LOAD); - QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), coord); + QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), coord); - coord.exec(); + coord.exec(); - coord.join(context.getSessionVariable().getQueryTimeoutS()); - if (!coord.isDone()) { - coord.cancel(); - ErrorReport.reportDdlException(ErrorCode.ERR_EXECUTE_TIMEOUT); - } - - if (!coord.getExecStatus().ok()) { - String errMsg = coord.getExecStatus().getErrorMsg(); - LOG.warn("insert failed: {}", errMsg); - - // hide host info - int hostIndex = errMsg.indexOf("host"); - if (hostIndex != -1) { - errMsg = errMsg.substring(0, hostIndex); + coord.join(context.getSessionVariable().getQueryTimeoutS()); + if (!coord.isDone()) { + coord.cancel(); + ErrorReport.reportDdlException(ErrorCode.ERR_EXECUTE_TIMEOUT); } - ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT); - } - LOG.info("delta files is {}", coord.getDeltaUrls()); + if (!coord.getExecStatus().ok()) { + String errMsg = coord.getExecStatus().getErrorMsg(); + LOG.warn("insert failed: {}", errMsg); - if (context.getSessionVariable().getEnableInsertStrict()) { - Map counters = coord.getLoadCounters(); - String strValue = counters.get(LoadEtlTask.DPP_ABNORMAL_ALL); - if (strValue != null && Long.valueOf(strValue) > 0) { - throw new UserException("Insert has filtered data in strict mode, tracking_url=" - + coord.getTrackingUrl()); + // hide host info + int hostIndex = errMsg.indexOf("host"); + if (hostIndex != -1) { + errMsg = errMsg.substring(0, hostIndex); + } + ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT); } + + LOG.info("delta files is {}", coord.getDeltaUrls()); + + if (context.getSessionVariable().getEnableInsertStrict()) { + Map counters = coord.getLoadCounters(); + String strValue = counters.get(LoadEtlTask.DPP_ABNORMAL_ALL); + if (strValue != null && Long.valueOf(strValue) > 0) { + throw new UserException("Insert has filtered data in strict mode, tracking_url=" + + coord.getTrackingUrl()); + } + } + + if (insertStmt.getTargetTable().getType() != TableType.OLAP) { + // no need to add load job. + // MySQL table is already being inserted. + context.getState().setOk(); + return; + } + + Catalog.getCurrentGlobalTransactionMgr().commitAndPublishTransaction( + insertStmt.getDbObj(), insertStmt.getTransactionId(), + TabletCommitInfo.fromThrift(coord.getCommitInfos()), + 5000); + } catch (Throwable t) { + // if any throwable being thrown during insert operation, first we should abort this txn + LOG.warn("handle insert stmt fail: {}", DebugUtil.printId(uuid), t); + try { + Catalog.getCurrentGlobalTransactionMgr().abortTransaction( + insertStmt.getTransactionId(), + t.getMessage() == null ? "unknown reason" : t.getMessage()); + } catch (Exception abortTxnException) { + // just print a log if abort txn failed. This failure do not need to pass to user. + // user only concern abort how txn failed. + LOG.warn("errors when abort txn", abortTxnException); + } + + if (!Config.using_old_load_usage_pattern) { + // if not using old usage pattern, the exception will be thrown to user directly without + // a label + throw t; + } + + /* + * If config 'using_old_load_usage_pattern' is true. + * Doris will return a label to user, and user can use this label to check load job's status, + * which exactly like the old insert stmt usage pattern. + */ + throwable = t; } - if (insertStmt.getTargetTable().getType() != TableType.OLAP) { - // no need to add load job. - // mysql table is already being inserted. - context.getState().setOk(); - return; - } - - Catalog.getCurrentGlobalTransactionMgr().commitAndPublishTransaction( - insertStmt.getDbObj(), insertStmt.getTransactionId(), - TabletCommitInfo.fromThrift(coord.getCommitInfos()), - 5000); - context.getState().setOk(); - - // record the non-streaming insert info for show load - if (!insertStmt.isStreaming()) { + // record insert info for show load stmt + if (!insertStmt.isStreaming() || Config.using_old_load_usage_pattern) { try { context.getCatalog().getLoadManager().recordFinishedLoadJob( uuid.toString(), insertStmt.getDb(), insertStmt.getTargetTable().getId(), EtlJobType.INSERT, - System.currentTimeMillis() + createTime, + throwable == null ? "" : throwable.getMessage() ); } catch (MetaNotFoundException e) { - LOG.warn("Record info of insert load with error " + e.getMessage(), e); - context.getState().setOk("Insert has been finished while info has not been recorded with " + e.getMessage()); + LOG.warn("Record info of insert load with error {}", e.getMessage(), e); + context.getState().setError("Failed to record info of insert load job, but insert job is " + + (throwable == null ? "success" : "failed")); + return; } + + // set to OK, which means the insert load job is successfully submitted. + // and user can check the job's status by label. context.getState().setOk("{'label':'" + uuid.toString() + "'}"); + } else { + // just return OK without label, which means this job is successfully done + context.getState().setOk(); } } diff --git a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index 50f63c2046..f64a01344f 100644 --- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -242,12 +242,8 @@ public class GlobalTransactionMgr { } LOG.debug("try to commit transaction: {}", transactionId); - if (tabletCommitInfos == null || tabletCommitInfos.isEmpty()) { - throw new TransactionCommitFailedException("all partitions have no load data"); - } - // 1. check status - // the caller method already own db lock, we not obtain db lock here + // the caller method already own db lock, we do not obtain db lock here Database db = catalog.getDb(dbId); if (null == db) { throw new MetaNotFoundException("could not find db [" + dbId + "]"); @@ -257,6 +253,7 @@ public class GlobalTransactionMgr { || transactionState.getTransactionStatus() == TransactionStatus.ABORTED) { throw new TransactionCommitFailedException(transactionState.getReason()); } + if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { return; } @@ -264,6 +261,10 @@ public class GlobalTransactionMgr { return; } + if (tabletCommitInfos == null || tabletCommitInfos.isEmpty()) { + throw new TransactionCommitFailedException(TransactionCommitFailedException.NO_DATA_TO_LOAD_MSG); + } + // update transaction state extra if exists if (txnCommitAttachment != null) { transactionState.setTxnCommitAttachment(txnCommitAttachment); diff --git a/fe/src/main/java/org/apache/doris/transaction/TransactionCommitFailedException.java b/fe/src/main/java/org/apache/doris/transaction/TransactionCommitFailedException.java index 2478cf9a8f..99c1348265 100644 --- a/fe/src/main/java/org/apache/doris/transaction/TransactionCommitFailedException.java +++ b/fe/src/main/java/org/apache/doris/transaction/TransactionCommitFailedException.java @@ -22,6 +22,8 @@ public class TransactionCommitFailedException extends TransactionException { private static final long serialVersionUID = -2528170792631761535L; + public static final String NO_DATA_TO_LOAD_MSG = "all partitions have no load data"; + public TransactionCommitFailedException(String msg) { super(msg); } diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/InsertLoadJobTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/InsertLoadJobTest.java index 188c66312b..bf0f058871 100644 --- a/fe/src/test/java/org/apache/doris/load/loadv2/InsertLoadJobTest.java +++ b/fe/src/test/java/org/apache/doris/load/loadv2/InsertLoadJobTest.java @@ -38,7 +38,7 @@ public class InsertLoadJobTest { public void testGetTableNames(@Mocked Catalog catalog, @Injectable Database database, @Injectable Table table) throws MetaNotFoundException { - InsertLoadJob insertLoadJob = new InsertLoadJob("label", 1L, 1L, 1000); + InsertLoadJob insertLoadJob = new InsertLoadJob("label", 1L, 1L, 1000, ""); String tableName = "table1"; new Expectations() { { diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java index c1ff5eb4b3..4a65fb2104 100644 --- a/fe/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java +++ b/fe/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java @@ -17,11 +17,6 @@ package org.apache.doris.load.loadv2; -import mockit.Deencapsulation; -import mockit.Expectations; -import mockit.Injectable; -import mockit.Mocked; - import org.apache.doris.analysis.LabelName; import org.apache.doris.analysis.LoadStmt; import org.apache.doris.catalog.Catalog; @@ -48,6 +43,11 @@ import java.io.FileOutputStream; import java.util.List; import java.util.Map; +import mockit.Deencapsulation; +import mockit.Expectations; +import mockit.Injectable; +import mockit.Mocked; + public class LoadManagerTest { private LoadManager loadManager; private static final String methodName = "getIdToLoadJobs"; @@ -55,7 +55,7 @@ public class LoadManagerTest { @Before public void setUp() throws Exception { loadManager = new LoadManager(new LoadJobScheduler()); - LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, System.currentTimeMillis()); + LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, System.currentTimeMillis(), ""); Deencapsulation.invoke(loadManager, "addLoadJob", job1); }