Support determine isPreviousLoadFinished for some alter jobs in table level (#3196)

This PR is to reduce the time cost for waiting transactions to be completed in same db by filter the running transactions in table level.

NOTICE: Update FE meta version to 79
This commit is contained in:
caiconghui
2020-03-27 07:16:23 -05:00
committed by GitHub
parent 0462607d8d
commit 32c4fc691c
19 changed files with 108 additions and 49 deletions

View File

@ -17,6 +17,7 @@
package org.apache.doris.alter;
import com.google.common.collect.Lists;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
@ -291,7 +292,7 @@ public abstract class AlterJob implements Writable {
return true;
} else {
isPreviousLoadFinished = Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(
transactionId, dbId);
transactionId, dbId, Lists.newArrayList(tableId));
return isPreviousLoadFinished;
}
}

View File

@ -498,7 +498,7 @@ public class RollupJobV2 extends AlterJobV2 {
// Check whether transactions of the given database which txnId is less than 'watershedTxnId' are finished.
protected boolean isPreviousLoadFinished() {
return Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watershedTxnId, dbId);
return Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watershedTxnId, dbId, Lists.newArrayList(tableId));
}
public static RollupJobV2 read(DataInput in) throws IOException {

View File

@ -617,7 +617,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
// Check whether transactions of the given database which txnId is less than 'watershedTxnId' are finished.
protected boolean isPreviousLoadFinished() {
return Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watershedTxnId, dbId);
return Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watershedTxnId, dbId, Lists.newArrayList(tableId));
}
public static SchemaChangeJobV2 read(DataInput in) throws IOException {

View File

@ -292,7 +292,8 @@ public class InsertStmt extends DdlStmt {
if (targetTable instanceof OlapTable) {
LoadJobSourceType sourceType = LoadJobSourceType.INSERT_STREAMING;
transactionId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(),
label, "FE: " + FrontendOptions.getLocalHostAddress(), sourceType, timeoutSecond);
Lists.newArrayList(targetTable.getId()), label, "FE: " + FrontendOptions.getLocalHostAddress(),
sourceType, timeoutSecond);
}
isTransactionBegin = true;
}

View File

@ -880,7 +880,7 @@ public class TabletScheduler extends MasterDaemon {
} else if (replica.getState() == ReplicaState.DECOMMISSION && replica.getWatermarkTxnId() != -1) {
long watermarkTxnId = replica.getWatermarkTxnId();
if (!Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watermarkTxnId,
tabletCtx.getDbId())) {
tabletCtx.getDbId(), Lists.newArrayList(tabletCtx.getTblId()))) {
throw new SchedException(Status.SCHEDULE_FAILED, "wait txn before " + watermarkTxnId + " to be finished");
}
}

View File

@ -167,6 +167,9 @@ public final class FeMetaVersion {
public static final int VERSION_77 = 77;
// plugin support
public static final int VERSION_78 = 78;
// for transaction state in table level
public static final int VERSION_79 = 79;
// note: when increment meta version, should assign the latest version to VERSION_CURRENT
public static final int VERSION_CURRENT = VERSION_78;
public static final int VERSION_CURRENT = VERSION_79;
}

View File

@ -3326,7 +3326,8 @@ public class Load {
}
loadDeleteJob.setIdToTabletLoadInfo(idToTabletLoadInfo);
loadDeleteJob.setState(JobState.LOADING);
long transactionId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(), jobLabel,
long transactionId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(),
Lists.newArrayList(table.getId()), jobLabel,
"FE: " + FrontendOptions.getLocalHostAddress(), LoadJobSourceType.FRONTEND,
Config.stream_load_default_timeout_second);
loadDeleteJob.setTransactionId(transactionId);

View File

@ -78,6 +78,7 @@ public class LoadJob implements Writable {
private long id;
private long dbId;
private long tableId;
private String label;
// when this job is a real time load job, the job is attach with a transaction
private long transactionId = -1;
@ -144,6 +145,7 @@ public class LoadJob implements Writable {
DeleteInfo deleteInfo) {
this.id = id;
this.dbId = dbId;
this.tableId = tableId;
this.label = label;
this.transactionId = -1;
this.timestamp = -1;
@ -243,6 +245,10 @@ public class LoadJob implements Writable {
return dbId;
}
public long getTableId() {
return tableId;
}
public void setDbId(long dbId) {
this.dbId = dbId;
}

View File

@ -208,7 +208,8 @@ public class BrokerLoadJob extends LoadJob {
public void beginTxn()
throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException, DuplicatedRequestException {
transactionId = Catalog.getCurrentGlobalTransactionMgr()
.beginTransaction(dbId, label, null, "FE: " + FrontendOptions.getLocalHostAddress(),
.beginTransaction(dbId, Lists.newArrayList(fileGroupAggInfo.getAllTableIds()), label, null,
"FE: " + FrontendOptions.getLocalHostAddress(),
TransactionState.LoadJobSourceType.BATCH_LOAD_JOB, id,
timeoutSecond);
}

View File

@ -222,7 +222,6 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
public long getDbId() {
return dbId;
}
public String getLabel() {
return label;
}

View File

@ -23,6 +23,7 @@ import org.apache.doris.analysis.CancelLoadStmt;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Table;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
@ -137,11 +138,12 @@ public class LoadManager implements Writable{
cluster = request.getCluster();
}
Database database = checkDb(ClusterNamespace.getFullName(cluster, request.getDb()));
Table table = database.getTable(request.tbl);
checkTable(database, request.getTbl());
LoadJob loadJob = null;
writeLock();
try {
loadJob = new MiniLoadJob(database.getId(), request);
loadJob = new MiniLoadJob(database.getId(), table.getId(), request);
// call unprotectedExecute before adding load job. so that if job is not started ok, no need to add.
// NOTICE(cmy): this order is only for Mini Load, because mini load's unprotectedExecute() only do beginTxn().
// for other kind of load job, execute the job after adding job.

View File

@ -17,6 +17,7 @@
package org.apache.doris.load.loadv2;
import com.google.common.collect.Lists;
import org.apache.doris.catalog.AuthorizationInfo;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
@ -47,14 +48,17 @@ public class MiniLoadJob extends LoadJob {
private String tableName;
private long tableId;
// only for log replay
public MiniLoadJob() {
super();
this.jobType = EtlJobType.MINI;
}
public MiniLoadJob(long dbId, TMiniLoadBeginRequest request) throws MetaNotFoundException {
public MiniLoadJob(long dbId, long tableId, TMiniLoadBeginRequest request) throws MetaNotFoundException {
super(dbId, request.getLabel());
this.tableId = tableId;
this.jobType = EtlJobType.MINI;
this.tableName = request.getTbl();
if (request.isSetTimeout_second()) {
@ -93,7 +97,7 @@ public class MiniLoadJob extends LoadJob {
public void beginTxn()
throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException, DuplicatedRequestException {
transactionId = Catalog.getCurrentGlobalTransactionMgr()
.beginTransaction(dbId, label, requestId, "FE: " + FrontendOptions.getLocalHostAddress(),
.beginTransaction(dbId, Lists.newArrayList(tableId), label, requestId, "FE: " + FrontendOptions.getLocalHostAddress(),
TransactionState.LoadJobSourceType.BACKEND_STREAMING, id,
timeoutSecond);
}

View File

@ -164,7 +164,8 @@ public abstract class RoutineLoadTaskInfo {
RoutineLoadJob routineLoadJob = routineLoadManager.getJob(jobId);
try {
txnId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(
routineLoadJob.getDbId(), DebugUtil.printId(id), null, "FE: " + FrontendOptions.getLocalHostAddress(),
routineLoadJob.getDbId(), Lists.newArrayList(routineLoadJob.getTableId()), DebugUtil.printId(id), null,
"FE: " + FrontendOptions.getLocalHostAddress(),
TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK, routineLoadJob.getId(),
timeoutMs / 1000);
} catch (DuplicatedRequestException e) {

View File

@ -670,6 +670,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
Catalog catalog = Catalog.getInstance();
String fullDbName = ClusterNamespace.getFullName(cluster, request.getDb());
Database db = catalog.getDb(fullDbName);
Table table = db.getTable(request.tbl);
if (db == null) {
String dbName = fullDbName;
if (Strings.isNullOrEmpty(request.getCluster())) {
@ -681,7 +682,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
// begin
long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : Config.stream_load_default_timeout_second;
return Catalog.getCurrentGlobalTransactionMgr().beginTransaction(
db.getId(), request.getLabel(), request.getRequest_id(), "BE: " + clientIp,
db.getId(), Lists.newArrayList(table.getId()), request.getLabel(), request.getRequest_id(), "BE: " + clientIp,
TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1, timeoutSecond);
}

View File

@ -17,6 +17,7 @@
package org.apache.doris.task;
import com.google.common.collect.Lists;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.common.util.DebugUtil;
@ -67,6 +68,7 @@ public abstract class LoadPendingTask extends MasterTask {
// get db
long dbId = job.getDbId();
long tableId = job.getTableId();
db = Catalog.getInstance().getDb(dbId);
if (db == null) {
load.cancelLoadJob(job, CancelType.ETL_SUBMIT_FAIL, "db does not exist. id: " + dbId);
@ -78,7 +80,7 @@ public abstract class LoadPendingTask extends MasterTask {
// create etl request and make some guarantee for schema change and rollup
if (job.getTransactionId() < 0) {
long transactionId = Catalog.getCurrentGlobalTransactionMgr()
.beginTransaction(dbId, DebugUtil.printId(UUID.randomUUID()),
.beginTransaction(dbId, Lists.newArrayList(tableId), DebugUtil.printId(UUID.randomUUID()),
"FE: " + FrontendOptions.getLocalHostAddress(), LoadJobSourceType.FRONTEND,
job.getTimeoutSecond());
job.setTransactionId(transactionId);

View File

@ -17,6 +17,7 @@
package org.apache.doris.transaction;
import org.apache.commons.collections.CollectionUtils;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.MaterializedIndex;
@ -121,10 +122,10 @@ public class GlobalTransactionMgr implements Writable {
return callbackFactory;
}
public long beginTransaction(long dbId, String label, String coordinator, LoadJobSourceType sourceType,
public long beginTransaction(long dbId, List<Long> tableIdList, String label, String coordinator, LoadJobSourceType sourceType,
long timeoutSecond)
throws AnalysisException, LabelAlreadyUsedException, BeginTransactionException, DuplicatedRequestException {
return beginTransaction(dbId, label, null, coordinator, sourceType, -1, timeoutSecond);
return beginTransaction(dbId, tableIdList, label, null, coordinator, sourceType, -1, timeoutSecond);
}
/**
@ -140,7 +141,7 @@ public class GlobalTransactionMgr implements Writable {
* @throws DuplicatedRequestException
* @throws IllegalTransactionParameterException
*/
public long beginTransaction(long dbId, String label, TUniqueId requestId,
public long beginTransaction(long dbId, List<Long> tableIdList, String label, TUniqueId requestId,
String coordinator, LoadJobSourceType sourceType, long listenerId, long timeoutSecond)
throws AnalysisException, LabelAlreadyUsedException, BeginTransactionException, DuplicatedRequestException {
@ -196,7 +197,7 @@ public class GlobalTransactionMgr implements Writable {
long tid = idGenerator.getNextTransactionId();
LOG.info("begin transaction: txn id {} with label {} from coordinator {}", tid, label, coordinator);
TransactionState transactionState = new TransactionState(dbId, tid, label, requestId, sourceType,
TransactionState transactionState = new TransactionState(dbId, tableIdList, tid, label, requestId, sourceType,
coordinator, listenerId, timeoutSecond * 1000);
transactionState.setPrepareTime(System.currentTimeMillis());
unprotectUpsertTransactionState(transactionState);
@ -802,25 +803,37 @@ public class GlobalTransactionMgr implements Writable {
}
// check if there exists a load job before the endTransactionId have all finished
// load job maybe started but could not know the affected table id, so that we not check by table
public boolean isPreviousTransactionsFinished(long endTransactionId, long dbId) {
readLock();
try {
for (Map.Entry<Long, TransactionState> entry : idToTransactionState.entrySet()) {
if (entry.getValue().getDbId() != dbId || !entry.getValue().isRunning()) {
continue;
}
if (entry.getKey() <= endTransactionId) {
LOG.debug("find a running txn with txn_id={} on db: {}, less than watermark txn_id {}",
entry.getKey(), dbId, endTransactionId);
return false;
}
public boolean isPreviousTransactionsFinished(long endTransactionId, long dbId, List<Long> tableIdList) {
for (Map.Entry<Long, TransactionState> entry : idToTransactionState.entrySet()) {
if (entry.getValue().getDbId() != dbId || !isIntersectionNotEmpty(entry.getValue().getTableIdList(),
tableIdList) || !entry.getValue().isRunning()) {
continue;
}
if (entry.getKey() <= endTransactionId) {
LOG.debug("find a running txn with txn_id={} on db: {}, less than watermark txn_id {}",
entry.getKey(), dbId, endTransactionId);
return false;
}
} finally {
readUnlock();
}
return true;
}
// check if there exists a intersection between the source tableId list and target tableId list
// if one of them is null or empty, that means that we don't know related tables in tableList,
// we think the two lists may have intersection for right ordered txns
public boolean isIntersectionNotEmpty(List<Long> sourceTableIdList, List<Long> targetTableIdList) {
if (CollectionUtils.isEmpty(sourceTableIdList) || CollectionUtils.isEmpty(targetTableIdList)) {
return true;
}
for (int i = 0; i < sourceTableIdList.size(); i++) {
for (int j = 0; j < targetTableIdList.size(); j++) {
if (sourceTableIdList.get(i).equals(targetTableIdList.get(j))) {
return true;
}
}
}
return false;
}
/*
* The txn cleaner will run at a fixed interval and try to delete expired and timeout txns:

View File

@ -17,6 +17,8 @@
package org.apache.doris.transaction;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.Config;
@ -40,6 +42,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@ -126,6 +129,7 @@ public class TransactionState implements Writable {
}
private long dbId;
private List<Long> tableIdList;
private long transactionId;
private String label;
// requsetId is used to judge whether a begin request is a internal retry request.
@ -168,6 +172,7 @@ public class TransactionState implements Writable {
public TransactionState() {
this.dbId = -1;
this.tableIdList = Lists.newArrayList();
this.transactionId = -1;
this.label = "";
this.idToTableCommitInfos = Maps.newHashMap();
@ -184,9 +189,10 @@ public class TransactionState implements Writable {
this.latch = new CountDownLatch(1);
}
public TransactionState(long dbId, long transactionId, String label, TUniqueId requsetId,
public TransactionState(long dbId, List<Long> tableIdList, long transactionId, String label, TUniqueId requsetId,
LoadJobSourceType sourceType, String coordinator, long callbackId, long timeoutMs) {
this.dbId = dbId;
this.tableIdList = (tableIdList == null ? Lists.newArrayList() : tableIdList);
this.transactionId = transactionId;
this.label = label;
this.requsetId = requsetId;
@ -408,7 +414,11 @@ public class TransactionState implements Writable {
public long getDbId() {
return dbId;
}
public List<Long> getTableIdList() {
return tableIdList;
}
public Map<Long, TableCommitInfo> getIdToTableCommitInfos() {
return idToTableCommitInfos;
}
@ -467,6 +477,7 @@ public class TransactionState implements Writable {
sb.append("transaction id: ").append(transactionId);
sb.append(", label: ").append(label);
sb.append(", db id: ").append(dbId);
sb.append(", table id list: ").append(StringUtils.join(tableIdList, ","));
sb.append(", callback id: ").append(callbackId);
sb.append(", coordinator: ").append(coordinator);
sb.append(", transaction status: ").append(transactionStatus);
@ -533,6 +544,10 @@ public class TransactionState implements Writable {
}
out.writeLong(callbackId);
out.writeLong(timeoutMs);
out.writeInt(tableIdList.size());
for (int i = 0; i < tableIdList.size(); i++) {
out.writeLong(tableIdList.get(i));
}
}
public void readFields(DataInput in) throws IOException {
@ -564,5 +579,13 @@ public class TransactionState implements Writable {
callbackId = in.readLong();
timeoutMs = in.readLong();
}
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_79) {
tableIdList = Lists.newArrayList();
int tableListSize = in.readInt();
for (int i = 0; i < tableListSize; i++) {
tableIdList.add(in.readLong());
}
}
}
}

View File

@ -18,6 +18,7 @@
package org.apache.doris.load.loadv2;
import com.google.common.collect.Lists;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.AnalysisException;
@ -106,7 +107,7 @@ public class LoadJobTest {
LoadJob loadJob = new BrokerLoadJob();
new Expectations() {
{
globalTransactionMgr.beginTransaction(anyLong, anyString, (TUniqueId) any, anyString,
globalTransactionMgr.beginTransaction(anyLong, Lists.newArrayList(), anyString, (TUniqueId) any, anyString,
(TransactionState.LoadJobSourceType) any, anyLong, anyLong);
minTimes = 0;
result = 1;

View File

@ -106,7 +106,7 @@ public class GlobalTransactionMgrTest {
public void testBeginTransaction() throws LabelAlreadyUsedException, AnalysisException,
BeginTransactionException, DuplicatedRequestException {
FakeCatalog.setCatalog(masterCatalog);
long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1),
CatalogTestUtil.testTxnLable1,
transactionSource,
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
@ -124,7 +124,7 @@ public class GlobalTransactionMgrTest {
FakeCatalog.setCatalog(masterCatalog);
long transactionId = 0;
try {
transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1),
CatalogTestUtil.testTxnLable1,
transactionSource,
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
@ -141,7 +141,7 @@ public class GlobalTransactionMgrTest {
assertEquals(transactionSource, transactionState.getCoordinator());
try {
transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1),
CatalogTestUtil.testTxnLable1,
transactionSource,
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
@ -154,7 +154,7 @@ public class GlobalTransactionMgrTest {
@Test
public void testCommitTransaction1() throws UserException {
FakeCatalog.setCatalog(masterCatalog);
long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1),
CatalogTestUtil.testTxnLable1,
transactionSource,
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
@ -195,7 +195,7 @@ public class GlobalTransactionMgrTest {
public void testCommitTransactionWithOneFailed() throws UserException {
TransactionState transactionState = null;
FakeCatalog.setCatalog(masterCatalog);
long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1),
CatalogTestUtil.testTxnLable1,
transactionSource,
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
@ -217,7 +217,7 @@ public class GlobalTransactionMgrTest {
FakeCatalog.setCatalog(masterCatalog);
// commit another transaction with 1,3 success
long transactionId2 = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
long transactionId2 = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1),
CatalogTestUtil.testTxnLable2,
transactionSource,
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
@ -320,7 +320,7 @@ public class GlobalTransactionMgrTest {
partitionIdToOffset);
Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
routineLoadTaskInfoList.add(routineLoadTaskInfo);
TransactionState transactionState = new TransactionState(1L, 1L, "label", null,
TransactionState transactionState = new TransactionState(1L, Lists.newArrayList(1L), 1L, "label", null,
LoadJobSourceType.ROUTINE_LOAD_TASK, "be1", routineLoadJob.getId(),
Config.stream_load_default_timeout_second);
transactionState.setTransactionStatus(TransactionStatus.PREPARE);
@ -387,7 +387,7 @@ public class GlobalTransactionMgrTest {
partitionIdToOffset);
Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
routineLoadTaskInfoList.add(routineLoadTaskInfo);
TransactionState transactionState = new TransactionState(1L, 1L, "label", null,
TransactionState transactionState = new TransactionState(1L, Lists.newArrayList(1L), 1L, "label", null,
LoadJobSourceType.ROUTINE_LOAD_TASK, "be1", routineLoadJob.getId(),
Config.stream_load_default_timeout_second);
transactionState.setTransactionStatus(TransactionStatus.PREPARE);
@ -431,7 +431,7 @@ public class GlobalTransactionMgrTest {
}
public void testFinishTransaction() throws UserException {
long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1),
CatalogTestUtil.testTxnLable1,
transactionSource,
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
@ -477,7 +477,7 @@ public class GlobalTransactionMgrTest {
.getPartition(CatalogTestUtil.testPartition1);
Tablet tablet = testPartition.getIndex(CatalogTestUtil.testIndexId1).getTablet(CatalogTestUtil.testTabletId1);
FakeCatalog.setCatalog(masterCatalog);
long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1),
CatalogTestUtil.testTxnLable1,
transactionSource,
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
@ -531,7 +531,7 @@ public class GlobalTransactionMgrTest {
FakeCatalog.setCatalog(masterCatalog);
// commit another transaction with 1,3 success
long transactionId2 = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
long transactionId2 = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1),
CatalogTestUtil.testTxnLable2,
transactionSource,
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
@ -603,7 +603,7 @@ public class GlobalTransactionMgrTest {
public void testDeleteTransaction() throws LabelAlreadyUsedException,
AnalysisException, BeginTransactionException, DuplicatedRequestException {
long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1),
CatalogTestUtil.testTxnLable1,
transactionSource,
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);