[fix](binlog) Add is temp for UpsertRecord (#35774)

Cherry-pick #35636.

The ccr-syncer does not support syncing temporary partitions, so this PR
adds a field to record whether this upsert record comes from a temporary
partition.
This commit is contained in:
walter
2024-06-03 12:41:29 +08:00
committed by GitHub
parent c033c71aed
commit f25b7fb4eb
5 changed files with 27 additions and 4 deletions

View File

@ -42,6 +42,9 @@ public class UpsertRecord {
@SerializedName(value = "version")
public long version;
@SerializedName(value = "isTempPartition")
public boolean isTemp;
}
@SerializedName(value = "partitionRecords")
@ -60,6 +63,7 @@ public class UpsertRecord {
partitionRecord.partitionId = partitionCommitInfo.getPartitionId();
partitionRecord.range = partitionCommitInfo.getPartitionRange();
partitionRecord.version = partitionCommitInfo.getVersion();
partitionRecord.isTemp = partitionCommitInfo.isTempPartition();
partitionRecords.add(partitionRecord);
}

View File

@ -246,6 +246,10 @@ public class OlapTable extends Table implements MTMVRelatedTableIf {
return getOrCreatTableProperty().isBeingSynced();
}
public boolean isTemporaryPartition(long partitionId) {
return tempPartitions.hasPartition(partitionId);
}
public void setTableProperty(TableProperty tableProperty) {
this.tableProperty = tableProperty;
}

View File

@ -104,6 +104,10 @@ public class TempPartitions implements Writable, GsonPostProcessable {
return nameToPartition.containsKey(partName);
}
public boolean hasPartition(long partitionId) {
return idToPartition.containsKey(partitionId);
}
public boolean isEmpty() {
return idToPartition.isEmpty();
}

View File

@ -1363,7 +1363,8 @@ public class DatabaseTransactionMgr {
|| tblPartitionInfo.getType() == PartitionType.LIST) {
partitionRange = tblPartitionInfo.getItem(partitionId).getItems().toString();
}
PartitionCommitInfo partitionCommitInfo = new PartitionCommitInfo(partitionId, partitionRange, -1, -1);
PartitionCommitInfo partitionCommitInfo = new PartitionCommitInfo(partitionId, partitionRange, -1, -1,
table.isTemporaryPartition(partitionId));
tableCommitInfo.addPartitionCommitInfo(partitionCommitInfo);
}
transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo);
@ -1406,7 +1407,8 @@ public class DatabaseTransactionMgr {
}
PartitionCommitInfo partitionCommitInfo = new PartitionCommitInfo(partitionId, partitionRange,
partition.getNextVersion(),
System.currentTimeMillis() /* use as partition visible time */);
System.currentTimeMillis() /* use as partition visible time */,
table.isTemporaryPartition(partitionId));
tableCommitInfo.addPartitionCommitInfo(partitionCommitInfo);
}
transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo);

View File

@ -37,17 +37,21 @@ public class PartitionCommitInfo implements Writable {
private long version;
@SerializedName(value = "versionTime")
private long versionTime;
@SerializedName(value = "isTempPartition")
private boolean isTempPartition;
public PartitionCommitInfo() {
}
public PartitionCommitInfo(long partitionId, String partitionRange, long version, long visibleTime) {
public PartitionCommitInfo(long partitionId, String partitionRange, long version, long visibleTime,
boolean isTempPartition) {
super();
this.partitionId = partitionId;
this.range = partitionRange;
this.version = version;
this.versionTime = visibleTime;
this.isTempPartition = isTempPartition;
}
@Override
@ -85,12 +89,17 @@ public class PartitionCommitInfo implements Writable {
this.versionTime = versionTime;
}
public boolean isTempPartition() {
return this.isTempPartition;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder("partitionid=");
StringBuilder sb = new StringBuilder("partitionId=");
sb.append(partitionId);
sb.append(", version=").append(version);
sb.append(", versionTime=").append(versionTime);
sb.append(", isTemp=").append(isTempPartition);
return sb.toString();
}
}