branch-2.1: [improve](binlog) Allow commit txn without waiting txn publish #48961 (#49266)

cherry pick from #48961
This commit is contained in:
walter
2025-03-29 08:59:23 +08:00
committed by GitHub
parent 646f49fb93
commit f55055096b
9 changed files with 64 additions and 26 deletions

View File

@ -321,7 +321,7 @@ public class BrokerLoadJob extends BulkLoadJob {
.add("txn_id", transactionId)
.add("msg", "Load job try to commit txn")
.build());
Env.getCurrentGlobalTransactionMgr().commitTransaction(
Env.getCurrentGlobalTransactionMgr().commitTransactionWithoutLock(
dbId, tableList, transactionId, commitInfos,
new LoadJobFinalOperation(id, loadingStatus, progress, loadStartTimestamp,
finishTimestamp, state, failMsg));

View File

@ -653,7 +653,7 @@ public class SparkLoadJob extends BulkLoadJob {
Lists.newArrayList(tableToLoadPartitions.keySet()));
MetaLockUtils.writeLockTablesOrMetaException(tableList);
try {
Env.getCurrentGlobalTransactionMgr().commitTransaction(
Env.getCurrentGlobalTransactionMgr().commitTransactionWithoutLock(
dbId, tableList, transactionId, commitInfos,
new LoadJobFinalOperation(id, loadingStatus, progress, loadStartTimestamp,
finishTimestamp, state, failMsg));

View File

@ -1747,11 +1747,21 @@ public class FrontendServiceImpl implements FrontendService.Iface {
long timeoutMs = request.isSetThriftRpcTimeoutMs() ? request.getThriftRpcTimeoutMs() / 2 : 5000;
// Step 5: commit and publish
return Env.getCurrentGlobalTransactionMgr()
.commitAndPublishTransaction(db, tableList,
request.getTxnId(),
TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs,
TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()));
if (!request.isOnlyCommit()) {
return Env.getCurrentGlobalTransactionMgr()
.commitAndPublishTransaction(db, tableList,
request.getTxnId(),
TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs,
TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()));
} else {
// single table commit, so don't need to wait for publish.
Env.getCurrentGlobalTransactionMgr()
.commitTransaction(db, tableList,
request.getTxnId(),
TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs,
TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()));
return true;
}
}
@Override

View File

@ -226,10 +226,10 @@ public class GlobalTransactionMgr implements Writable {
}
@Deprecated
public void commitTransaction(long dbId, List<Table> tableList,
public void commitTransactionWithoutLock(long dbId, List<Table> tableList,
long transactionId, List<TabletCommitInfo> tabletCommitInfos)
throws UserException {
commitTransaction(dbId, tableList, transactionId, tabletCommitInfos, null);
commitTransactionWithoutLock(dbId, tableList, transactionId, tabletCommitInfos, null);
}
/**
@ -241,7 +241,7 @@ public class GlobalTransactionMgr implements Writable {
* @note it is necessary to optimize the `lock` mechanism and `lock` scope resulting from wait lock long time
* @note callers should get all tables' write locks before call this api
*/
public void commitTransaction(long dbId, List<Table> tableList, long transactionId,
public void commitTransactionWithoutLock(long dbId, List<Table> tableList, long transactionId,
List<TabletCommitInfo> tabletCommitInfos, TxnCommitAttachment txnCommitAttachment)
throws UserException {
if (Config.disable_load_job) {
@ -255,6 +255,20 @@ public class GlobalTransactionMgr implements Writable {
dbTransactionMgr.commitTransaction(tableList, transactionId, tabletCommitInfos, txnCommitAttachment, false);
}
public void commitTransaction(DatabaseIf db, List<Table> tableList, long transactionId,
List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis, TxnCommitAttachment txnCommitAttachment)
throws UserException {
if (!MetaLockUtils.tryWriteLockTablesOrMetaException(tableList, timeoutMillis, TimeUnit.MILLISECONDS)) {
throw new UserException("get tableList write lock timeout, tableList=("
+ StringUtils.join(tableList, ",") + ")");
}
try {
commitTransactionWithoutLock(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment);
} finally {
MetaLockUtils.writeUnlockTables(tableList);
}
}
private void commitTransaction2PC(long dbId, long transactionId)
throws UserException {
if (Config.disable_load_job) {
@ -282,7 +296,7 @@ public class GlobalTransactionMgr implements Writable {
+ StringUtils.join(tableList, ",") + ")");
}
try {
commitTransaction(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment);
commitTransactionWithoutLock(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment);
} finally {
MetaLockUtils.writeUnlockTables(tableList);
}
@ -313,7 +327,7 @@ public class GlobalTransactionMgr implements Writable {
}
stopWatch.stop();
LOG.info("stream load tasks are committed successfully. txns: {}. time cost: {} ms."
+ " data will be visable later.", transactionId, stopWatch.getTime());
+ " data will be visible later.", transactionId, stopWatch.getTime());
}
public void abortTransaction(long dbId, long transactionId, String reason) throws UserException {

View File

@ -358,7 +358,8 @@ public class DeleteHandlerTest {
new Expectations(globalTransactionMgr) {
{
try {
globalTransactionMgr.commitTransaction(anyLong, (List<Table>) any, anyLong, (List<TabletCommitInfo>) any, (TxnCommitAttachment) any);
globalTransactionMgr.commitTransactionWithoutLock(
anyLong, (List<Table>) any, anyLong, (List<TabletCommitInfo>) any, (TxnCommitAttachment) any);
} catch (UserException e) {
// CHECKSTYLE IGNORE THIS LINE
}

View File

@ -377,7 +377,8 @@ public class SparkLoadJobTest {
AgentTaskExecutor.submit((AgentBatchTask) any);
Env.getCurrentGlobalTransactionMgr();
result = transactionMgr;
transactionMgr.commitTransaction(dbId, (List<Table>) any, transactionId, (List<TabletCommitInfo>) any,
transactionMgr.commitTransactionWithoutLock(
dbId, (List<Table>) any, transactionId, (List<TabletCommitInfo>) any,
(LoadJobFinalOperation) any);
}
};

View File

@ -111,7 +111,8 @@ public class DatabaseTransactionMgrTest {
transTablets.add(tabletCommitInfo3);
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId1,
masterTransMgr.commitTransactionWithoutLock(
CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId1,
transTablets, null);
TransactionState transactionState1 = fakeEditLog.getTransaction(transactionId1);
setTransactionFinishPublish(transactionState1,

View File

@ -172,8 +172,8 @@ public class GlobalTransactionMgrTest {
transTablets.add(tabletCommitInfo3);
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
transTablets, null);
masterTransMgr.commitTransactionWithoutLock(
CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, transTablets, null);
TransactionState transactionState = fakeEditLog.getTransaction(transactionId);
// check status is committed
Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus());
@ -213,7 +213,8 @@ public class GlobalTransactionMgrTest {
transTablets.add(tabletCommitInfo2);
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
masterTransMgr.commitTransactionWithoutLock(
CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
transTablets, null);
// follower catalog replay the transaction
@ -235,7 +236,8 @@ public class GlobalTransactionMgrTest {
transTablets.add(tabletCommitInfo1);
transTablets.add(tabletCommitInfo3);
try {
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
masterTransMgr.commitTransactionWithoutLock(
CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
transTablets, null);
Assert.fail();
} catch (TabletQuorumFailedException e) {
@ -265,7 +267,8 @@ public class GlobalTransactionMgrTest {
transTablets.add(tabletCommitInfo1);
transTablets.add(tabletCommitInfo2);
transTablets.add(tabletCommitInfo3);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
masterTransMgr.commitTransactionWithoutLock(
CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
transTablets, null);
transactionState = fakeEditLog.getTransaction(transactionId2);
// check status is commit
@ -366,7 +369,8 @@ public class GlobalTransactionMgrTest {
Deencapsulation.setField(masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1), "idToRunningTransactionState", idToTransactionState);
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
masterTransMgr.commitTransaction(1L, Lists.newArrayList(testTable1), 1L, transTablets, txnCommitAttachment);
masterTransMgr.commitTransactionWithoutLock(
1L, Lists.newArrayList(testTable1), 1L, transTablets, txnCommitAttachment);
RoutineLoadStatistic jobStatistic = Deencapsulation.getField(routineLoadJob, "jobStatistic");
Assert.assertEquals(Long.valueOf(101), Deencapsulation.getField(jobStatistic, "currentTotalRows"));
@ -439,7 +443,8 @@ public class GlobalTransactionMgrTest {
Deencapsulation.setField(masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1), "idToRunningTransactionState", idToTransactionState);
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
masterTransMgr.commitTransaction(1L, Lists.newArrayList(testTable1), 1L, transTablets, txnCommitAttachment);
masterTransMgr.commitTransactionWithoutLock(
1L, Lists.newArrayList(testTable1), 1L, transTablets, txnCommitAttachment);
// current total rows and error rows will be reset after job pause, so here they should be 0.
RoutineLoadStatistic jobStatistic = Deencapsulation.getField(routineLoadJob, "jobStatistic");
@ -471,7 +476,8 @@ public class GlobalTransactionMgrTest {
transTablets.add(tabletCommitInfo3);
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
masterTransMgr.commitTransactionWithoutLock(
CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
transTablets, null);
TransactionState transactionState = fakeEditLog.getTransaction(transactionId);
Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus());
@ -537,7 +543,8 @@ public class GlobalTransactionMgrTest {
transTablets.add(tabletCommitInfo2);
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
masterTransMgr.commitTransactionWithoutLock(
CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
transTablets, null);
// follower catalog replay the transaction
@ -607,7 +614,8 @@ public class GlobalTransactionMgrTest {
transTablets.add(tabletCommitInfo1);
transTablets.add(tabletCommitInfo3);
try {
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
masterTransMgr.commitTransactionWithoutLock(
CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
transTablets, null);
Assert.fail();
} catch (TabletQuorumFailedException e) {
@ -624,7 +632,8 @@ public class GlobalTransactionMgrTest {
transTablets.add(tabletCommitInfo1);
transTablets.add(tabletCommitInfo2);
transTablets.add(tabletCommitInfo3);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
masterTransMgr.commitTransactionWithoutLock(
CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
transTablets, null);
transactionState = fakeEditLog.getTransaction(transactionId2);
// check status is commit

View File

@ -824,6 +824,8 @@ struct TCommitTxnRequest {
10: optional i64 thrift_rpc_timeout_ms
11: optional string token
12: optional i64 db_id
// used for ccr
15: optional bool only_commit // only commit txn, without waiting txn publish
}
struct TCommitTxnResult {