From bde362c3cd040e841d37b33f6830605f364cb72f Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Tue, 9 Jul 2019 10:17:09 +0800 Subject: [PATCH] Modify insert operation's behavior (#1444) Before changing default insert operation to streaming load, if the select result of a insert stmt is empty, a label will still be returned to the user, and user can use this label to check the insert load job's status. After changing the insert operation, if the select result is empty, a exception will be thrown to user client directly without any label. This new usage pattern is not friendly to already existed users, which is forcing them to change their way of using insert operation. So I add a new FE config 'using_old_load_usage_pattern', default is false. If set to true, a label will be returned to user even if the select result is empty. --- .../java/org/apache/doris/common/Config.java | 8 ++ .../doris/load/loadv2/InsertLoadJob.java | 16 ++- .../apache/doris/load/loadv2/LoadManager.java | 4 +- .../org/apache/doris/qe/StmtExecutor.java | 131 +++++++++++------- .../transaction/GlobalTransactionMgr.java | 11 +- .../TransactionCommitFailedException.java | 2 + .../doris/load/loadv2/InsertLoadJobTest.java | 2 +- .../doris/load/loadv2/LoadManagerTest.java | 12 +- 8 files changed, 122 insertions(+), 64 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/common/Config.java b/fe/src/main/java/org/apache/doris/common/Config.java index 7c6734b22b..10459be41d 100644 --- a/fe/src/main/java/org/apache/doris/common/Config.java +++ b/fe/src/main/java/org/apache/doris/common/Config.java @@ -824,5 +824,13 @@ public class Config extends ConfigBase { */ @ConfField(mutable = true, masterOnly = true) public static boolean disable_colocate_relocate = false; @ConfField(mutable = true, masterOnly = true) public static boolean disable_colocate_balance = false; + + /* + * If set to true, the insert stmt with processing error will still return a label to user. + * And user can use this label to check the load job's status. + * The default value is false, which means if insert operation encounter errors, + * exception will be thrown to user client directly without load label. + */ + @ConfField(mutable = true, masterOnly = true) public static boolean using_old_load_usage_pattern = false; } diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java index a30075383c..ad7f9e411c 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java @@ -23,6 +23,10 @@ import org.apache.doris.catalog.Table; import org.apache.doris.common.Config; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.load.EtlJobType; +import org.apache.doris.load.FailMsg; +import org.apache.doris.load.FailMsg.CancelType; + +import com.google.common.base.Strings; import java.io.DataInput; import java.io.DataOutput; @@ -46,14 +50,20 @@ public class InsertLoadJob extends LoadJob { this.jobType = EtlJobType.INSERT; } - public InsertLoadJob(String label, long dbId, long tableId, long createTimestamp) { + public InsertLoadJob(String label, long dbId, long tableId, long createTimestamp, String failMsg) { super(dbId, label); this.tableId = tableId; this.createTimestamp = createTimestamp; this.loadStartTimestamp = createTimestamp; this.finishTimestamp = System.currentTimeMillis(); - this.state = JobState.FINISHED; - this.progress = 100; + if (Strings.isNullOrEmpty(failMsg)) { + this.state = JobState.FINISHED; + this.progress = 100; + } else { + this.state = JobState.CANCELLED; + this.failMsg = new FailMsg(CancelType.LOAD_RUN_FAIL, failMsg); + this.progress = 0; + } this.jobType = EtlJobType.INSERT; this.timeoutSecond = Config.insert_load_default_timeout_second; } diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index 1f01f0ec51..e950b16fc0 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -251,7 +251,7 @@ public class LoadManager implements Writable{ } public void recordFinishedLoadJob(String label, String dbName, long tableId, EtlJobType jobType, - long createTimestamp) throws MetaNotFoundException { + long createTimestamp, String failMsg) throws MetaNotFoundException { // get db id Database db = Catalog.getCurrentCatalog().getDb(dbName); @@ -262,7 +262,7 @@ public class LoadManager implements Writable{ LoadJob loadJob; switch (jobType) { case INSERT: - loadJob = new InsertLoadJob(label, db.getId(), tableId, createTimestamp); + loadJob = new InsertLoadJob(label, db.getId(), tableId, createTimestamp, failMsg); break; default: return; diff --git a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java index 91a3c205f0..916f7b2f64 100644 --- a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -589,74 +589,111 @@ public class StmtExecutor { return; } - // assign request_id + long createTime = System.currentTimeMillis(); UUID uuid = UUID.randomUUID(); - context.setQueryId(new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits())); + Throwable throwable = null; + try { + // assign request_id + context.setQueryId(new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits())); - coord = new Coordinator(context, analyzer, planner); - coord.setQueryType(TQueryType.LOAD); + coord = new Coordinator(context, analyzer, planner); + coord.setQueryType(TQueryType.LOAD); - QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), coord); + QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), coord); - coord.exec(); + coord.exec(); - coord.join(context.getSessionVariable().getQueryTimeoutS()); - if (!coord.isDone()) { - coord.cancel(); - ErrorReport.reportDdlException(ErrorCode.ERR_EXECUTE_TIMEOUT); - } - - if (!coord.getExecStatus().ok()) { - String errMsg = coord.getExecStatus().getErrorMsg(); - LOG.warn("insert failed: {}", errMsg); - - // hide host info - int hostIndex = errMsg.indexOf("host"); - if (hostIndex != -1) { - errMsg = errMsg.substring(0, hostIndex); + coord.join(context.getSessionVariable().getQueryTimeoutS()); + if (!coord.isDone()) { + coord.cancel(); + ErrorReport.reportDdlException(ErrorCode.ERR_EXECUTE_TIMEOUT); } - ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT); - } - LOG.info("delta files is {}", coord.getDeltaUrls()); + if (!coord.getExecStatus().ok()) { + String errMsg = coord.getExecStatus().getErrorMsg(); + LOG.warn("insert failed: {}", errMsg); - if (context.getSessionVariable().getEnableInsertStrict()) { - Map counters = coord.getLoadCounters(); - String strValue = counters.get(LoadEtlTask.DPP_ABNORMAL_ALL); - if (strValue != null && Long.valueOf(strValue) > 0) { - throw new UserException("Insert has filtered data in strict mode, tracking_url=" - + coord.getTrackingUrl()); + // hide host info + int hostIndex = errMsg.indexOf("host"); + if (hostIndex != -1) { + errMsg = errMsg.substring(0, hostIndex); + } + ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT); } + + LOG.info("delta files is {}", coord.getDeltaUrls()); + + if (context.getSessionVariable().getEnableInsertStrict()) { + Map counters = coord.getLoadCounters(); + String strValue = counters.get(LoadEtlTask.DPP_ABNORMAL_ALL); + if (strValue != null && Long.valueOf(strValue) > 0) { + throw new UserException("Insert has filtered data in strict mode, tracking_url=" + + coord.getTrackingUrl()); + } + } + + if (insertStmt.getTargetTable().getType() != TableType.OLAP) { + // no need to add load job. + // MySQL table is already being inserted. + context.getState().setOk(); + return; + } + + Catalog.getCurrentGlobalTransactionMgr().commitAndPublishTransaction( + insertStmt.getDbObj(), insertStmt.getTransactionId(), + TabletCommitInfo.fromThrift(coord.getCommitInfos()), + 5000); + } catch (Throwable t) { + // if any throwable being thrown during insert operation, first we should abort this txn + LOG.warn("handle insert stmt fail: {}", DebugUtil.printId(uuid), t); + try { + Catalog.getCurrentGlobalTransactionMgr().abortTransaction( + insertStmt.getTransactionId(), + t.getMessage() == null ? "unknown reason" : t.getMessage()); + } catch (Exception abortTxnException) { + // just print a log if abort txn failed. This failure do not need to pass to user. + // user only concern abort how txn failed. + LOG.warn("errors when abort txn", abortTxnException); + } + + if (!Config.using_old_load_usage_pattern) { + // if not using old usage pattern, the exception will be thrown to user directly without + // a label + throw t; + } + + /* + * If config 'using_old_load_usage_pattern' is true. + * Doris will return a label to user, and user can use this label to check load job's status, + * which exactly like the old insert stmt usage pattern. + */ + throwable = t; } - if (insertStmt.getTargetTable().getType() != TableType.OLAP) { - // no need to add load job. - // mysql table is already being inserted. - context.getState().setOk(); - return; - } - - Catalog.getCurrentGlobalTransactionMgr().commitAndPublishTransaction( - insertStmt.getDbObj(), insertStmt.getTransactionId(), - TabletCommitInfo.fromThrift(coord.getCommitInfos()), - 5000); - context.getState().setOk(); - - // record the non-streaming insert info for show load - if (!insertStmt.isStreaming()) { + // record insert info for show load stmt + if (!insertStmt.isStreaming() || Config.using_old_load_usage_pattern) { try { context.getCatalog().getLoadManager().recordFinishedLoadJob( uuid.toString(), insertStmt.getDb(), insertStmt.getTargetTable().getId(), EtlJobType.INSERT, - System.currentTimeMillis() + createTime, + throwable == null ? "" : throwable.getMessage() ); } catch (MetaNotFoundException e) { - LOG.warn("Record info of insert load with error " + e.getMessage(), e); - context.getState().setOk("Insert has been finished while info has not been recorded with " + e.getMessage()); + LOG.warn("Record info of insert load with error {}", e.getMessage(), e); + context.getState().setError("Failed to record info of insert load job, but insert job is " + + (throwable == null ? "success" : "failed")); + return; } + + // set to OK, which means the insert load job is successfully submitted. + // and user can check the job's status by label. context.getState().setOk("{'label':'" + uuid.toString() + "'}"); + } else { + // just return OK without label, which means this job is successfully done + context.getState().setOk(); } } diff --git a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index 50f63c2046..f64a01344f 100644 --- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -242,12 +242,8 @@ public class GlobalTransactionMgr { } LOG.debug("try to commit transaction: {}", transactionId); - if (tabletCommitInfos == null || tabletCommitInfos.isEmpty()) { - throw new TransactionCommitFailedException("all partitions have no load data"); - } - // 1. check status - // the caller method already own db lock, we not obtain db lock here + // the caller method already own db lock, we do not obtain db lock here Database db = catalog.getDb(dbId); if (null == db) { throw new MetaNotFoundException("could not find db [" + dbId + "]"); @@ -257,6 +253,7 @@ public class GlobalTransactionMgr { || transactionState.getTransactionStatus() == TransactionStatus.ABORTED) { throw new TransactionCommitFailedException(transactionState.getReason()); } + if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { return; } @@ -264,6 +261,10 @@ public class GlobalTransactionMgr { return; } + if (tabletCommitInfos == null || tabletCommitInfos.isEmpty()) { + throw new TransactionCommitFailedException(TransactionCommitFailedException.NO_DATA_TO_LOAD_MSG); + } + // update transaction state extra if exists if (txnCommitAttachment != null) { transactionState.setTxnCommitAttachment(txnCommitAttachment); diff --git a/fe/src/main/java/org/apache/doris/transaction/TransactionCommitFailedException.java b/fe/src/main/java/org/apache/doris/transaction/TransactionCommitFailedException.java index 2478cf9a8f..99c1348265 100644 --- a/fe/src/main/java/org/apache/doris/transaction/TransactionCommitFailedException.java +++ b/fe/src/main/java/org/apache/doris/transaction/TransactionCommitFailedException.java @@ -22,6 +22,8 @@ public class TransactionCommitFailedException extends TransactionException { private static final long serialVersionUID = -2528170792631761535L; + public static final String NO_DATA_TO_LOAD_MSG = "all partitions have no load data"; + public TransactionCommitFailedException(String msg) { super(msg); } diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/InsertLoadJobTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/InsertLoadJobTest.java index 188c66312b..bf0f058871 100644 --- a/fe/src/test/java/org/apache/doris/load/loadv2/InsertLoadJobTest.java +++ b/fe/src/test/java/org/apache/doris/load/loadv2/InsertLoadJobTest.java @@ -38,7 +38,7 @@ public class InsertLoadJobTest { public void testGetTableNames(@Mocked Catalog catalog, @Injectable Database database, @Injectable Table table) throws MetaNotFoundException { - InsertLoadJob insertLoadJob = new InsertLoadJob("label", 1L, 1L, 1000); + InsertLoadJob insertLoadJob = new InsertLoadJob("label", 1L, 1L, 1000, ""); String tableName = "table1"; new Expectations() { { diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java index c1ff5eb4b3..4a65fb2104 100644 --- a/fe/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java +++ b/fe/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java @@ -17,11 +17,6 @@ package org.apache.doris.load.loadv2; -import mockit.Deencapsulation; -import mockit.Expectations; -import mockit.Injectable; -import mockit.Mocked; - import org.apache.doris.analysis.LabelName; import org.apache.doris.analysis.LoadStmt; import org.apache.doris.catalog.Catalog; @@ -48,6 +43,11 @@ import java.io.FileOutputStream; import java.util.List; import java.util.Map; +import mockit.Deencapsulation; +import mockit.Expectations; +import mockit.Injectable; +import mockit.Mocked; + public class LoadManagerTest { private LoadManager loadManager; private static final String methodName = "getIdToLoadJobs"; @@ -55,7 +55,7 @@ public class LoadManagerTest { @Before public void setUp() throws Exception { loadManager = new LoadManager(new LoadJobScheduler()); - LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, System.currentTimeMillis()); + LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, System.currentTimeMillis(), ""); Deencapsulation.invoke(loadManager, "addLoadJob", job1); }