[Enhancement](binlog) Add ModifyPartition, BatchModifyPartitions && ReplacePartitionOperationLog support (#23773)
This commit is contained in:
@ -24,9 +24,11 @@ import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.persist.AlterDatabasePropertyInfo;
|
||||
import org.apache.doris.persist.BarrierLog;
|
||||
import org.apache.doris.persist.BatchModifyPartitionsInfo;
|
||||
import org.apache.doris.persist.BinlogGcInfo;
|
||||
import org.apache.doris.persist.DropPartitionInfo;
|
||||
import org.apache.doris.persist.ModifyTablePropertyOperationLog;
|
||||
import org.apache.doris.persist.ReplacePartitionOperationLog;
|
||||
import org.apache.doris.persist.TableAddOrDropColumnsInfo;
|
||||
import org.apache.doris.thrift.TBinlog;
|
||||
import org.apache.doris.thrift.TBinlogType;
|
||||
@ -269,6 +271,30 @@ public class BinlogManager {
|
||||
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
|
||||
}
|
||||
|
||||
// add Modify partitions
|
||||
public void addModifyPartitions(BatchModifyPartitionsInfo info, long commitSeq) {
|
||||
long dbId = info.getDbId();
|
||||
List<Long> tableIds = Lists.newArrayList();
|
||||
tableIds.add(info.getTableId());
|
||||
long timestamp = -1;
|
||||
TBinlogType type = TBinlogType.MODIFY_PARTITIONS;
|
||||
String data = info.toJson();
|
||||
|
||||
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
|
||||
}
|
||||
|
||||
// add Replace partition
|
||||
public void addReplacePartitions(ReplacePartitionOperationLog info, long commitSeq) {
|
||||
long dbId = info.getDbId();
|
||||
List<Long> tableIds = Lists.newArrayList();
|
||||
tableIds.add(info.getTblId());
|
||||
long timestamp = -1;
|
||||
TBinlogType type = TBinlogType.REPLACE_PARTITIONS;
|
||||
String data = info.toJson();
|
||||
|
||||
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
|
||||
}
|
||||
|
||||
// get binlog by dbId, return first binlog.version > version
|
||||
public Pair<TStatus, TBinlog> getBinlog(long dbId, long tableId, long prevCommitSeq) {
|
||||
TStatus status = new TStatus(TStatusCode.OK);
|
||||
|
||||
@ -5194,8 +5194,9 @@ public class Env {
|
||||
olapTable.replaceTempPartitions(partitionNames, tempPartitionNames, isStrictRange, useTempPartitionName);
|
||||
|
||||
// write log
|
||||
ReplacePartitionOperationLog info = new ReplacePartitionOperationLog(db.getId(), olapTable.getId(),
|
||||
partitionNames, tempPartitionNames, isStrictRange, useTempPartitionName);
|
||||
ReplacePartitionOperationLog info =
|
||||
new ReplacePartitionOperationLog(db.getId(), db.getFullName(), olapTable.getId(), olapTable.getName(),
|
||||
partitionNames, tempPartitionNames, isStrictRange, useTempPartitionName);
|
||||
editLog.logReplaceTempPartition(info);
|
||||
LOG.info("finished to replace partitions {} with temp partitions {} from table: {}", clause.getPartitionNames(),
|
||||
clause.getTempPartitionNames(), olapTable.getName());
|
||||
|
||||
@ -21,6 +21,7 @@ import org.apache.doris.common.io.Text;
|
||||
import org.apache.doris.common.io.Writable;
|
||||
import org.apache.doris.persist.gson.GsonUtils;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
|
||||
import java.io.DataInput;
|
||||
@ -37,9 +38,38 @@ public class BatchModifyPartitionsInfo implements Writable {
|
||||
private List<ModifyPartitionInfo> infos;
|
||||
|
||||
public BatchModifyPartitionsInfo(List<ModifyPartitionInfo> infos) {
|
||||
if (infos == null || infos.isEmpty()) {
|
||||
throw new IllegalArgumentException("infos is null or empty");
|
||||
}
|
||||
|
||||
long dbId = infos.get(0).getDbId();
|
||||
long tableId = infos.get(0).getTableId();
|
||||
for (ModifyPartitionInfo info : infos) {
|
||||
if (info.getDbId() != dbId || info.getTableId() != tableId) {
|
||||
throw new IllegalArgumentException("dbId or tableId is not equal");
|
||||
}
|
||||
}
|
||||
|
||||
this.infos = infos;
|
||||
}
|
||||
|
||||
public BatchModifyPartitionsInfo(ModifyPartitionInfo info) {
|
||||
if (info == null) {
|
||||
throw new IllegalArgumentException("info is null");
|
||||
}
|
||||
|
||||
this.infos = Lists.newArrayList();
|
||||
this.infos.add(info);
|
||||
}
|
||||
|
||||
public long getDbId() {
|
||||
return infos.get(0).getDbId();
|
||||
}
|
||||
|
||||
public long getTableId() {
|
||||
return infos.get(0).getTableId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
Text.writeString(out, GsonUtils.GSON.toJson(this));
|
||||
@ -72,4 +102,8 @@ public class BatchModifyPartitionsInfo implements Writable {
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public String toJson() {
|
||||
return GsonUtils.GSON.toJson(this);
|
||||
}
|
||||
}
|
||||
|
||||
@ -261,7 +261,9 @@ public class EditLog {
|
||||
ModifyPartitionInfo info = (ModifyPartitionInfo) journal.getData();
|
||||
LOG.info("Begin to unprotect modify partition. db = " + info.getDbId() + " table = "
|
||||
+ info.getTableId() + " partitionId = " + info.getPartitionId());
|
||||
BatchModifyPartitionsInfo infos = new BatchModifyPartitionsInfo(info);
|
||||
env.getAlterInstance().replayModifyPartition(info);
|
||||
env.getBinlogManager().addModifyPartitions(infos, logId);
|
||||
break;
|
||||
}
|
||||
case OperationType.OP_BATCH_MODIFY_PARTITION: {
|
||||
@ -269,6 +271,7 @@ public class EditLog {
|
||||
for (ModifyPartitionInfo modifyPartitionInfo : info.getModifyPartitionInfos()) {
|
||||
env.getAlterInstance().replayModifyPartition(modifyPartitionInfo);
|
||||
}
|
||||
env.getBinlogManager().addModifyPartitions(info, logId);
|
||||
break;
|
||||
}
|
||||
case OperationType.OP_ERASE_TABLE: {
|
||||
@ -799,6 +802,7 @@ public class EditLog {
|
||||
ReplacePartitionOperationLog replaceTempPartitionLog =
|
||||
(ReplacePartitionOperationLog) journal.getData();
|
||||
env.replayReplaceTempPartition(replaceTempPartitionLog);
|
||||
env.getBinlogManager().addReplacePartitions(replaceTempPartitionLog, logId);
|
||||
break;
|
||||
}
|
||||
case OperationType.OP_INSTALL_PLUGIN: {
|
||||
@ -1289,11 +1293,16 @@ public class EditLog {
|
||||
}
|
||||
|
||||
public void logModifyPartition(ModifyPartitionInfo info) {
|
||||
logEdit(OperationType.OP_MODIFY_PARTITION, info);
|
||||
long logId = logEdit(OperationType.OP_MODIFY_PARTITION, info);
|
||||
BatchModifyPartitionsInfo infos = new BatchModifyPartitionsInfo(info);
|
||||
LOG.info("log modify partition, logId:{}, infos: {}", logId, infos.toJson());
|
||||
Env.getCurrentEnv().getBinlogManager().addModifyPartitions(infos, logId);
|
||||
}
|
||||
|
||||
public void logBatchModifyPartition(BatchModifyPartitionsInfo info) {
|
||||
logEdit(OperationType.OP_BATCH_MODIFY_PARTITION, info);
|
||||
long logId = logEdit(OperationType.OP_BATCH_MODIFY_PARTITION, info);
|
||||
LOG.info("log modify partition, logId:{}, infos: {}", logId, info.toJson());
|
||||
Env.getCurrentEnv().getBinlogManager().addModifyPartitions(info, logId);
|
||||
}
|
||||
|
||||
public void logDropTable(DropInfo info) {
|
||||
@ -1721,7 +1730,9 @@ public class EditLog {
|
||||
}
|
||||
|
||||
public void logReplaceTempPartition(ReplacePartitionOperationLog info) {
|
||||
logEdit(OperationType.OP_REPLACE_TEMP_PARTITION, info);
|
||||
long logId = logEdit(OperationType.OP_REPLACE_TEMP_PARTITION, info);
|
||||
LOG.info("log replace temp partition, logId: {}, info: {}", logId, info.toJson());
|
||||
Env.getCurrentEnv().getBinlogManager().addReplacePartitions(info, logId);
|
||||
}
|
||||
|
||||
public void logInstallPlugin(PluginInfo plugin) {
|
||||
|
||||
@ -35,8 +35,12 @@ public class ReplacePartitionOperationLog implements Writable {
|
||||
|
||||
@SerializedName(value = "dbId")
|
||||
private long dbId;
|
||||
@SerializedName(value = "dbName")
|
||||
private String dbName;
|
||||
@SerializedName(value = "tblId")
|
||||
private long tblId;
|
||||
@SerializedName(value = "tblName")
|
||||
private String tblName;
|
||||
@SerializedName(value = "partitions")
|
||||
private List<String> partitions;
|
||||
@SerializedName(value = "tempPartitions")
|
||||
@ -46,10 +50,14 @@ public class ReplacePartitionOperationLog implements Writable {
|
||||
@SerializedName(value = "useTempPartitionName")
|
||||
private boolean useTempPartitionName;
|
||||
|
||||
public ReplacePartitionOperationLog(long dbId, long tblId, List<String> partitionNames,
|
||||
List<String> tempPartitonNames, boolean strictRange, boolean useTempPartitionName) {
|
||||
public ReplacePartitionOperationLog(long dbId, String dbName, long tblId, String tblName,
|
||||
List<String> partitionNames,
|
||||
List<String> tempPartitonNames, boolean strictRange,
|
||||
boolean useTempPartitionName) {
|
||||
this.dbId = dbId;
|
||||
this.dbName = dbName;
|
||||
this.tblId = tblId;
|
||||
this.tblName = tblName;
|
||||
this.partitions = partitionNames;
|
||||
this.tempPartitions = tempPartitonNames;
|
||||
this.strictRange = strictRange;
|
||||
@ -90,4 +98,13 @@ public class ReplacePartitionOperationLog implements Writable {
|
||||
String json = GsonUtils.GSON.toJson(this);
|
||||
Text.writeString(out, json);
|
||||
}
|
||||
|
||||
public String toJson() {
|
||||
return GsonUtils.GSON.toJson(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return toJson();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user