From 28a6a2e44db7e341fbb58b74476b4b6c509ba7d3 Mon Sep 17 00:00:00 2001 From: Jack Drogon Date: Thu, 20 Jul 2023 09:52:21 +0800 Subject: [PATCH] [Enhancement](binlog) Add partitionRange && indexIds in UpsertRecord && PartitionCommitInfo (#22005) --- .../apache/doris/binlog/BinlogManager.java | 4 ++++ .../org/apache/doris/binlog/UpsertRecord.java | 16 +++++++++++++-- .../java/org/apache/doris/catalog/Env.java | 4 ---- .../transaction/DatabaseTransactionMgr.java | 20 ++++++++++++++++--- .../transaction/PartitionCommitInfo.java | 9 ++++++++- 5 files changed, 43 insertions(+), 10 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java index b07072955f..11075c4fc4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java @@ -361,6 +361,10 @@ public class BinlogManager { // Step 2.1: read a binlog TBinlog binlog = readTBinlogFromStream(dis); + if (!Config.enable_feature_binlog) { + continue; + } + // Step 2.2: check if there is in next db Binlogs region long dbId = binlog.getDbId(); if (dbId != currentDbId) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/UpsertRecord.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/UpsertRecord.java index 32052f798a..f42c7031cf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/UpsertRecord.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/UpsertRecord.java @@ -29,12 +29,17 @@ import com.google.gson.annotations.SerializedName; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Set; public class UpsertRecord { public static class TableRecord { public static class PartitionRecord { @SerializedName(value = "partitionId") public long partitionId; + + @SerializedName(value = "range") + private String range; + @SerializedName(value = "version") public long version; } @@ -42,13 +47,18 @@ public class UpsertRecord { @SerializedName(value = "partitionRecords") private List partitionRecords; - public TableRecord() { + @SerializedName(value = "indexIds") + private Set indexIds; + + public TableRecord(Set indexIds) { partitionRecords = Lists.newArrayList(); + this.indexIds = indexIds; } public void addPartitionRecord(PartitionCommitInfo partitionCommitInfo) { PartitionRecord partitionRecord = new PartitionRecord(); partitionRecord.partitionId = partitionCommitInfo.getPartitionId(); + partitionRecord.range = partitionCommitInfo.getPartitionRange(); partitionRecord.version = partitionCommitInfo.getVersion(); partitionRecords.add(partitionRecord); } @@ -83,8 +93,10 @@ public class UpsertRecord { dbId = state.getDbId(); tableRecords = Maps.newHashMap(); + Map> loadedTableIndexIds = state.getLoadedTblIndexes(); for (TableCommitInfo info : state.getIdToTableCommitInfos().values()) { - TableRecord tableRecord = new TableRecord(); + Set indexIds = loadedTableIndexIds.get(info.getTableId()); + TableRecord tableRecord = new TableRecord(indexIds); tableRecords.put(info.getTableId(), tableRecord); for (PartitionCommitInfo partitionCommitInfo : info.getIdToPartitionCommitInfo().values()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 933b7714a6..52ecb88ea0 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -1918,10 +1918,6 @@ public class Env { // load binlogs public long loadBinlogs(DataInputStream dis, long checksum) throws IOException { - if (!Config.enable_feature_binlog) { - return checksum; - } - binlogManager.read(dis, checksum); LOG.info("finished replay binlogMgr from image"); return checksum; diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index eb69eab8b7..9114186162 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -24,6 +24,7 @@ import org.apache.doris.catalog.MaterializedIndex; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.PartitionInfo; +import org.apache.doris.catalog.PartitionType; import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; @@ -1027,8 +1028,15 @@ public class DatabaseTransactionMgr { transactionState.setErrorReplicas(errorReplicaIds); for (long tableId : tableToPartition.keySet()) { TableCommitInfo tableCommitInfo = new TableCommitInfo(tableId); + OlapTable table = (OlapTable) db.getTableNullable(tableId); + PartitionInfo tblPartitionInfo = table.getPartitionInfo(); for (long partitionId : tableToPartition.get(tableId)) { - PartitionCommitInfo partitionCommitInfo = new PartitionCommitInfo(partitionId, -1, -1); + String partitionRange = ""; + if (tblPartitionInfo.getType() == PartitionType.RANGE + || tblPartitionInfo.getType() == PartitionType.LIST) { + partitionRange = tblPartitionInfo.getItem(partitionId).getItems().toString(); + } + PartitionCommitInfo partitionCommitInfo = new PartitionCommitInfo(partitionId, partitionRange, -1, -1); tableCommitInfo.addPartitionCommitInfo(partitionCommitInfo); } transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo); @@ -1060,10 +1068,16 @@ public class DatabaseTransactionMgr { transactionState.setErrorReplicas(errorReplicaIds); for (long tableId : tableToPartition.keySet()) { TableCommitInfo tableCommitInfo = new TableCommitInfo(tableId); + OlapTable table = (OlapTable) db.getTableNullable(tableId); + PartitionInfo tblPartitionInfo = table.getPartitionInfo(); for (long partitionId : tableToPartition.get(tableId)) { - OlapTable table = (OlapTable) db.getTableNullable(tableId); Partition partition = table.getPartition(partitionId); - PartitionCommitInfo partitionCommitInfo = new PartitionCommitInfo(partitionId, + String partitionRange = ""; + if (tblPartitionInfo.getType() == PartitionType.RANGE + || tblPartitionInfo.getType() == PartitionType.LIST) { + partitionRange = tblPartitionInfo.getItem(partitionId).getItems().toString(); + } + PartitionCommitInfo partitionCommitInfo = new PartitionCommitInfo(partitionId, partitionRange, partition.getNextVersion(), System.currentTimeMillis() /* use as partition visible time */); tableCommitInfo.addPartitionCommitInfo(partitionCommitInfo); diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PartitionCommitInfo.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PartitionCommitInfo.java index 8a206a172d..3f35c1d295 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PartitionCommitInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PartitionCommitInfo.java @@ -31,6 +31,8 @@ public class PartitionCommitInfo implements Writable { @SerializedName(value = "partitionId") private long partitionId; + @SerializedName(value = "range") + private String range; @SerializedName(value = "version") private long version; @SerializedName(value = "versionTime") @@ -40,9 +42,10 @@ public class PartitionCommitInfo implements Writable { } - public PartitionCommitInfo(long partitionId, long version, long visibleTime) { + public PartitionCommitInfo(long partitionId, String partitionRange, long version, long visibleTime) { super(); this.partitionId = partitionId; + this.range = partitionRange; this.version = version; this.versionTime = visibleTime; } @@ -62,6 +65,10 @@ public class PartitionCommitInfo implements Writable { return partitionId; } + public String getPartitionRange() { + return range; + } + public long getVersion() { return version; }