[Enhancement](binlog) Add partitionRange && indexIds in UpsertRecord && PartitionCommitInfo (#22005)

This commit is contained in:
Jack Drogon
2023-07-20 09:52:21 +08:00
committed by GitHub
parent 2daad2151d
commit 28a6a2e44d
5 changed files with 43 additions and 10 deletions

View File

@ -361,6 +361,10 @@ public class BinlogManager {
// Step 2.1: read a binlog
TBinlog binlog = readTBinlogFromStream(dis);
if (!Config.enable_feature_binlog) {
continue;
}
// Step 2.2: check if there is in next db Binlogs region
long dbId = binlog.getDbId();
if (dbId != currentDbId) {

View File

@ -29,12 +29,17 @@ import com.google.gson.annotations.SerializedName;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class UpsertRecord {
public static class TableRecord {
public static class PartitionRecord {
@SerializedName(value = "partitionId")
public long partitionId;
@SerializedName(value = "range")
private String range;
@SerializedName(value = "version")
public long version;
}
@ -42,13 +47,18 @@ public class UpsertRecord {
@SerializedName(value = "partitionRecords")
private List<PartitionRecord> partitionRecords;
public TableRecord() {
@SerializedName(value = "indexIds")
private Set<Long> indexIds;
public TableRecord(Set<Long> indexIds) {
partitionRecords = Lists.newArrayList();
this.indexIds = indexIds;
}
public void addPartitionRecord(PartitionCommitInfo partitionCommitInfo) {
PartitionRecord partitionRecord = new PartitionRecord();
partitionRecord.partitionId = partitionCommitInfo.getPartitionId();
partitionRecord.range = partitionCommitInfo.getPartitionRange();
partitionRecord.version = partitionCommitInfo.getVersion();
partitionRecords.add(partitionRecord);
}
@ -83,8 +93,10 @@ public class UpsertRecord {
dbId = state.getDbId();
tableRecords = Maps.newHashMap();
Map<Long, Set<Long>> loadedTableIndexIds = state.getLoadedTblIndexes();
for (TableCommitInfo info : state.getIdToTableCommitInfos().values()) {
TableRecord tableRecord = new TableRecord();
Set<Long> indexIds = loadedTableIndexIds.get(info.getTableId());
TableRecord tableRecord = new TableRecord(indexIds);
tableRecords.put(info.getTableId(), tableRecord);
for (PartitionCommitInfo partitionCommitInfo : info.getIdToPartitionCommitInfo().values()) {

View File

@ -1918,10 +1918,6 @@ public class Env {
// load binlogs
public long loadBinlogs(DataInputStream dis, long checksum) throws IOException {
if (!Config.enable_feature_binlog) {
return checksum;
}
binlogManager.read(dis, checksum);
LOG.info("finished replay binlogMgr from image");
return checksum;

View File

@ -24,6 +24,7 @@ import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
@ -1027,8 +1028,15 @@ public class DatabaseTransactionMgr {
transactionState.setErrorReplicas(errorReplicaIds);
for (long tableId : tableToPartition.keySet()) {
TableCommitInfo tableCommitInfo = new TableCommitInfo(tableId);
OlapTable table = (OlapTable) db.getTableNullable(tableId);
PartitionInfo tblPartitionInfo = table.getPartitionInfo();
for (long partitionId : tableToPartition.get(tableId)) {
PartitionCommitInfo partitionCommitInfo = new PartitionCommitInfo(partitionId, -1, -1);
String partitionRange = "";
if (tblPartitionInfo.getType() == PartitionType.RANGE
|| tblPartitionInfo.getType() == PartitionType.LIST) {
partitionRange = tblPartitionInfo.getItem(partitionId).getItems().toString();
}
PartitionCommitInfo partitionCommitInfo = new PartitionCommitInfo(partitionId, partitionRange, -1, -1);
tableCommitInfo.addPartitionCommitInfo(partitionCommitInfo);
}
transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo);
@ -1060,10 +1068,16 @@ public class DatabaseTransactionMgr {
transactionState.setErrorReplicas(errorReplicaIds);
for (long tableId : tableToPartition.keySet()) {
TableCommitInfo tableCommitInfo = new TableCommitInfo(tableId);
OlapTable table = (OlapTable) db.getTableNullable(tableId);
PartitionInfo tblPartitionInfo = table.getPartitionInfo();
for (long partitionId : tableToPartition.get(tableId)) {
OlapTable table = (OlapTable) db.getTableNullable(tableId);
Partition partition = table.getPartition(partitionId);
PartitionCommitInfo partitionCommitInfo = new PartitionCommitInfo(partitionId,
String partitionRange = "";
if (tblPartitionInfo.getType() == PartitionType.RANGE
|| tblPartitionInfo.getType() == PartitionType.LIST) {
partitionRange = tblPartitionInfo.getItem(partitionId).getItems().toString();
}
PartitionCommitInfo partitionCommitInfo = new PartitionCommitInfo(partitionId, partitionRange,
partition.getNextVersion(),
System.currentTimeMillis() /* use as partition visible time */);
tableCommitInfo.addPartitionCommitInfo(partitionCommitInfo);

View File

@ -31,6 +31,8 @@ public class PartitionCommitInfo implements Writable {
@SerializedName(value = "partitionId")
private long partitionId;
@SerializedName(value = "range")
private String range;
@SerializedName(value = "version")
private long version;
@SerializedName(value = "versionTime")
@ -40,9 +42,10 @@ public class PartitionCommitInfo implements Writable {
}
public PartitionCommitInfo(long partitionId, long version, long visibleTime) {
public PartitionCommitInfo(long partitionId, String partitionRange, long version, long visibleTime) {
super();
this.partitionId = partitionId;
this.range = partitionRange;
this.version = version;
this.versionTime = visibleTime;
}
@ -62,6 +65,10 @@ public class PartitionCommitInfo implements Writable {
return partitionId;
}
public String getPartitionRange() {
return range;
}
public long getVersion() {
return version;
}