diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java index 8c2bdce4f8..e11868e26a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java @@ -48,10 +48,11 @@ public class InsertLoadJob extends LoadJob { super(EtlJobType.INSERT); } - public InsertLoadJob(String label, long dbId, long tableId, long createTimestamp, String failMsg, + public InsertLoadJob(String label, long transactionId, long dbId, long tableId, long createTimestamp, String failMsg, String trackingUrl) throws MetaNotFoundException { super(EtlJobType.INSERT, dbId, label); this.tableId = tableId; + this.transactionId = transactionId; this.createTimestamp = createTimestamp; this.loadStartTimestamp = createTimestamp; this.finishTimestamp = System.currentTimeMillis(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index 2715e2c033..0c71559590 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -260,9 +260,11 @@ public class LoadManager implements Writable{ return; } addLoadJob(loadJob); - // add callback before txn created, because callback will be performed on replay without txn begin + // add callback before txn if load job is uncompleted, because callback will be performed on replay without txn begin // register txn state listener - Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(loadJob); + if (!loadJob.isCompleted()) { + Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(loadJob); + } } private void addLoadJob(LoadJob loadJob) { @@ -278,7 +280,7 @@ public class LoadManager implements Writable{ labelToLoadJobs.get(loadJob.getLabel()).add(loadJob); } - public void recordFinishedLoadJob(String label, String dbName, long tableId, EtlJobType jobType, + public void recordFinishedLoadJob(String label, long transactionId, String dbName, long tableId, EtlJobType jobType, long createTimestamp, String failMsg, String trackingUrl) throws MetaNotFoundException { // get db id @@ -287,7 +289,7 @@ public class LoadManager implements Writable{ LoadJob loadJob; switch (jobType) { case INSERT: - loadJob = new InsertLoadJob(label, db.getId(), tableId, createTimestamp, failMsg, trackingUrl); + loadJob = new InsertLoadJob(label, transactionId, db.getId(), tableId, createTimestamp, failMsg, trackingUrl); break; default: return; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index d581e70412..f1e8e5bcb2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -1288,9 +1288,11 @@ public class StmtExecutor implements ProfileWriter { message = throwable.getMessage(); } + txnId = insertStmt.getTransactionId(); try { context.getCatalog().getLoadManager().recordFinishedLoadJob( label, + txnId, insertStmt.getDb(), insertStmt.getTargetTable().getId(), EtlJobType.INSERT, @@ -1301,7 +1303,6 @@ public class StmtExecutor implements ProfileWriter { LOG.warn("Record info of insert load with error {}", e.getMessage(), e); errMsg = "Record info of insert load with error " + e.getMessage(); } - txnId = insertStmt.getTransactionId(); } // {'label':'my_label1', 'status':'visible', 'txnId':'123'} diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/InsertLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/InsertLoadJobTest.java index cba2f717a5..a7f2bb3766 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/InsertLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/InsertLoadJobTest.java @@ -39,7 +39,7 @@ public class InsertLoadJobTest { public void testGetTableNames(@Mocked Catalog catalog, @Injectable Database database, @Injectable Table table) throws MetaNotFoundException { - InsertLoadJob insertLoadJob = new InsertLoadJob("label", 1L, 1L, 1000, "", ""); + InsertLoadJob insertLoadJob = new InsertLoadJob("label", 1L, 1L, 1L, 1000, "", ""); String tableName = "table1"; new Expectations() { { diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java index d33f31027c..f014411436 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java @@ -133,7 +133,7 @@ public class LoadManagerTest { }; loadManager = new LoadManager(new LoadJobScheduler()); - LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, System.currentTimeMillis(), "", ""); + LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, 1L, System.currentTimeMillis(), "", ""); Deencapsulation.invoke(loadManager, "addLoadJob", job1); File file = serializeToFile(loadManager); @@ -168,7 +168,7 @@ public class LoadManagerTest { }; loadManager = new LoadManager(new LoadJobScheduler()); - LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, System.currentTimeMillis(), "", ""); + LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, 1L, System.currentTimeMillis(), "", ""); Deencapsulation.invoke(loadManager, "addLoadJob", job1); //make job1 don't serialize