[fix](mtmv)resolve the issue of table version updates in concurrent situations (#32487)
Move the logic for version+1 from `unprotectedCommitTransaction `to`FinishTransaction`, as the write lock for the table was obtained in `FinishTransaction`
This commit is contained in:
@ -1033,6 +1033,7 @@ public class DatabaseTransactionMgr {
|
||||
transactionState.setFinishTime(System.currentTimeMillis());
|
||||
transactionState.clearErrorMsg();
|
||||
transactionState.setTransactionStatus(TransactionStatus.VISIBLE);
|
||||
setTableVersion(transactionState, db);
|
||||
unprotectUpsertTransactionState(transactionState, false);
|
||||
txnOperated = true;
|
||||
// TODO(cmy): We found a very strange problem. When delete-related transactions are processed here,
|
||||
@ -1063,6 +1064,20 @@ public class DatabaseTransactionMgr {
|
||||
transactionState, transactionState.getPublishCount(), publishResult.name());
|
||||
}
|
||||
|
||||
private void setTableVersion(TransactionState transactionState, Database db) {
|
||||
Map<Long, TableCommitInfo> idToTableCommitInfos = transactionState.getIdToTableCommitInfos();
|
||||
for (Entry<Long, TableCommitInfo> entry : idToTableCommitInfos.entrySet()) {
|
||||
OlapTable table = (OlapTable) db.getTableNullable(entry.getKey());
|
||||
if (table == null) {
|
||||
LOG.warn("table {} does not exist when setTableVersion. transaction: {}, db: {}",
|
||||
entry.getKey(), transactionState.getTransactionId(), db.getId());
|
||||
continue;
|
||||
}
|
||||
entry.getValue().setVersion(table.getNextVersion());
|
||||
entry.getValue().setVersionTime(System.currentTimeMillis());
|
||||
}
|
||||
}
|
||||
|
||||
private boolean finishCheckPartitionVersion(TransactionState transactionState, Database db,
|
||||
List<Pair<OlapTable, Partition>> relatedTblPartitions) {
|
||||
Iterator<TableCommitInfo> tableCommitInfoIterator
|
||||
@ -1316,8 +1331,7 @@ public class DatabaseTransactionMgr {
|
||||
transactionState.setErrorReplicas(errorReplicaIds);
|
||||
for (long tableId : tableToPartition.keySet()) {
|
||||
OlapTable table = (OlapTable) db.getTableNullable(tableId);
|
||||
TableCommitInfo tableCommitInfo = new TableCommitInfo(tableId, table.getNextVersion(),
|
||||
System.currentTimeMillis());
|
||||
TableCommitInfo tableCommitInfo = new TableCommitInfo(tableId);
|
||||
PartitionInfo tblPartitionInfo = table.getPartitionInfo();
|
||||
for (long partitionId : tableToPartition.get(tableId)) {
|
||||
String partitionRange = "";
|
||||
@ -1357,8 +1371,7 @@ public class DatabaseTransactionMgr {
|
||||
transactionState.setErrorReplicas(errorReplicaIds);
|
||||
for (long tableId : tableToPartition.keySet()) {
|
||||
OlapTable table = (OlapTable) db.getTableNullable(tableId);
|
||||
TableCommitInfo tableCommitInfo = new TableCommitInfo(tableId, table.getNextVersion(),
|
||||
System.currentTimeMillis());
|
||||
TableCommitInfo tableCommitInfo = new TableCommitInfo(tableId);
|
||||
PartitionInfo tblPartitionInfo = table.getPartitionInfo();
|
||||
for (long partitionId : tableToPartition.get(tableId)) {
|
||||
Partition partition = table.getPartition(partitionId);
|
||||
|
||||
@ -46,11 +46,9 @@ public class TableCommitInfo implements Writable {
|
||||
|
||||
}
|
||||
|
||||
public TableCommitInfo(long tableId, long version, long visibleTime) {
|
||||
public TableCommitInfo(long tableId) {
|
||||
this.tableId = tableId;
|
||||
idToPartitionCommitInfo = Maps.newHashMap();
|
||||
this.version = version;
|
||||
this.versionTime = visibleTime;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
Reference in New Issue
Block a user