From a390c9ee9f2e4db1e4f08fc3ba2869ccfd888b9d Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Fri, 4 Sep 2020 17:39:37 +0800 Subject: [PATCH] [Bug] Fix bug that table ids is not set right for hadoop load job (#4535) We store all table ids involved in the Load Job in TransactionState. However, for Hadoop Load job, table ids are set incorrectly. This caused the WAITING_TXN phase to not correctly wait for the completion of the previous load transaction when doing the alter table, which caused some data version loss problems. --- .../src/main/java/org/apache/doris/load/LoadJob.java | 10 +++++++++- .../java/org/apache/doris/task/LoadPendingTask.java | 5 ++--- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java index d56b9cc987..88670e8bb0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java @@ -544,6 +544,14 @@ public class LoadJob implements Writable { this.idToTableLoadInfo = idToTableLoadInfo; } + public List getAllTableIds() { + List tblIds = Lists.newArrayList(); + if (idToTableLoadInfo != null) { + tblIds.addAll(idToTableLoadInfo.keySet()); + } + return tblIds; + } + public Map getIdToTabletLoadInfo() { return idToTabletLoadInfo; } @@ -692,7 +700,7 @@ public class LoadJob implements Writable { pushTasks.clear(); pushTasks = null; } - + resourceInfo = null; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/LoadPendingTask.java index 848027f60a..3dca5c3144 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/LoadPendingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadPendingTask.java @@ -17,7 +17,6 @@ package org.apache.doris.task; -import com.google.common.collect.Lists; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.common.util.DebugUtil; @@ -30,8 +29,8 @@ import org.apache.doris.load.LoadJob.JobState; import org.apache.doris.service.FrontendOptions; import org.apache.doris.thrift.TStatusCode; import org.apache.doris.transaction.TransactionState.LoadJobSourceType; -import org.apache.doris.transaction.TransactionState.TxnSourceType; import org.apache.doris.transaction.TransactionState.TxnCoordinator; +import org.apache.doris.transaction.TransactionState.TxnSourceType; import com.google.common.base.Joiner; @@ -82,7 +81,7 @@ public abstract class LoadPendingTask extends MasterTask { // create etl request and make some guarantee for schema change and rollup if (job.getTransactionId() < 0) { long transactionId = Catalog.getCurrentGlobalTransactionMgr() - .beginTransaction(dbId, Lists.newArrayList(tableId), DebugUtil.printId(UUID.randomUUID()), + .beginTransaction(dbId, job.getAllTableIds(), DebugUtil.printId(UUID.randomUUID()), new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), LoadJobSourceType.FRONTEND, job.getTimeoutSecond());