[Bug] Fix bug that table ids is not set right for hadoop load job (#4535)
We store all table ids involved in the Load Job in TransactionState. However, for Hadoop Load job, table ids are set incorrectly. This caused the WAITING_TXN phase to not correctly wait for the completion of the previous load transaction when doing the alter table, which caused some data version loss problems.
This commit is contained in:
@ -544,6 +544,14 @@ public class LoadJob implements Writable {
|
||||
this.idToTableLoadInfo = idToTableLoadInfo;
|
||||
}
|
||||
|
||||
public List<Long> getAllTableIds() {
|
||||
List<Long> tblIds = Lists.newArrayList();
|
||||
if (idToTableLoadInfo != null) {
|
||||
tblIds.addAll(idToTableLoadInfo.keySet());
|
||||
}
|
||||
return tblIds;
|
||||
}
|
||||
|
||||
public Map<Long, TabletLoadInfo> getIdToTabletLoadInfo() {
|
||||
return idToTabletLoadInfo;
|
||||
}
|
||||
@ -692,7 +700,7 @@ public class LoadJob implements Writable {
|
||||
pushTasks.clear();
|
||||
pushTasks = null;
|
||||
}
|
||||
|
||||
|
||||
resourceInfo = null;
|
||||
}
|
||||
|
||||
|
||||
@ -17,7 +17,6 @@
|
||||
|
||||
package org.apache.doris.task;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.common.util.DebugUtil;
|
||||
@ -30,8 +29,8 @@ import org.apache.doris.load.LoadJob.JobState;
|
||||
import org.apache.doris.service.FrontendOptions;
|
||||
import org.apache.doris.thrift.TStatusCode;
|
||||
import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
|
||||
import org.apache.doris.transaction.TransactionState.TxnSourceType;
|
||||
import org.apache.doris.transaction.TransactionState.TxnCoordinator;
|
||||
import org.apache.doris.transaction.TransactionState.TxnSourceType;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
|
||||
@ -82,7 +81,7 @@ public abstract class LoadPendingTask extends MasterTask {
|
||||
// create etl request and make some guarantee for schema change and rollup
|
||||
if (job.getTransactionId() < 0) {
|
||||
long transactionId = Catalog.getCurrentGlobalTransactionMgr()
|
||||
.beginTransaction(dbId, Lists.newArrayList(tableId), DebugUtil.printId(UUID.randomUUID()),
|
||||
.beginTransaction(dbId, job.getAllTableIds(), DebugUtil.printId(UUID.randomUUID()),
|
||||
new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
|
||||
LoadJobSourceType.FRONTEND,
|
||||
job.getTimeoutSecond());
|
||||
|
||||
Reference in New Issue
Block a user