cherry pick from #44263 Co-authored-by: Dongyang Li <lidongyang@selectdb.com>
This commit is contained in:
@ -574,7 +574,7 @@ public class Alter {
|
||||
replaceTableInternal(db, origTable, olapNewTbl, swapTable, false);
|
||||
// write edit log
|
||||
ReplaceTableOperationLog log = new ReplaceTableOperationLog(db.getId(),
|
||||
origTable.getId(), olapNewTbl.getId(), swapTable);
|
||||
origTable.getId(), oldTblName, olapNewTbl.getId(), newTblName, swapTable);
|
||||
Env.getCurrentEnv().getEditLog().logReplaceTable(log);
|
||||
LOG.info("finish replacing table {} with table {}, is swap: {}", oldTblName, newTblName, swapTable);
|
||||
} finally {
|
||||
|
||||
@ -33,6 +33,7 @@ import org.apache.doris.persist.DropPartitionInfo;
|
||||
import org.apache.doris.persist.ModifyCommentOperationLog;
|
||||
import org.apache.doris.persist.ModifyTablePropertyOperationLog;
|
||||
import org.apache.doris.persist.ReplacePartitionOperationLog;
|
||||
import org.apache.doris.persist.ReplaceTableOperationLog;
|
||||
import org.apache.doris.persist.TableAddOrDropColumnsInfo;
|
||||
import org.apache.doris.persist.TableInfo;
|
||||
import org.apache.doris.persist.TableRenameColumnInfo;
|
||||
@ -45,6 +46,7 @@ import org.apache.doris.thrift.TStatusCode;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.thrift.TException;
|
||||
@ -361,6 +363,20 @@ public class BinlogManager {
|
||||
addBarrierLog(log, commitSeq);
|
||||
}
|
||||
|
||||
public void addReplaceTable(ReplaceTableOperationLog info, long commitSeq) {
|
||||
if (StringUtils.isEmpty(info.getOrigTblName()) || StringUtils.isEmpty(info.getNewTblName())) {
|
||||
LOG.warn("skip replace table binlog, because origTblName or newTblName is empty. info: {}", info);
|
||||
return;
|
||||
}
|
||||
|
||||
long dbId = info.getDbId();
|
||||
long tableId = info.getOrigTblId();
|
||||
TBinlogType type = TBinlogType.REPLACE_TABLE;
|
||||
String data = info.toJson();
|
||||
BarrierLog log = new BarrierLog(dbId, tableId, type, data);
|
||||
addBarrierLog(log, commitSeq);
|
||||
}
|
||||
|
||||
// get binlog by dbId, return first binlog.version > version
|
||||
public Pair<TStatus, TBinlog> getBinlog(long dbId, long tableId, long prevCommitSeq) {
|
||||
TStatus status = new TStatus(TStatusCode.OK);
|
||||
|
||||
@ -22,7 +22,9 @@ import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.common.proc.BaseProcResult;
|
||||
import org.apache.doris.persist.BarrierLog;
|
||||
import org.apache.doris.persist.DropPartitionInfo;
|
||||
import org.apache.doris.persist.ReplaceTableOperationLog;
|
||||
import org.apache.doris.thrift.TBinlog;
|
||||
import org.apache.doris.thrift.TBinlogType;
|
||||
import org.apache.doris.thrift.TStatus;
|
||||
@ -626,19 +628,29 @@ public class DBBinlog {
|
||||
|
||||
// A method to record the dropped tables, indexes, and partitions.
|
||||
private void recordDroppedResources(TBinlog binlog, Object raw) {
|
||||
recordDroppedResources(binlog.getType(), binlog.getCommitSeq(), binlog.getData(), raw);
|
||||
}
|
||||
|
||||
private void recordDroppedResources(TBinlogType binlogType, long commitSeq, String data, Object raw) {
|
||||
if (raw == null) {
|
||||
switch (binlog.getType()) {
|
||||
switch (binlogType) {
|
||||
case DROP_PARTITION:
|
||||
raw = DropPartitionInfo.fromJson(binlog.data);
|
||||
raw = DropPartitionInfo.fromJson(data);
|
||||
break;
|
||||
case DROP_TABLE:
|
||||
raw = DropTableRecord.fromJson(binlog.data);
|
||||
raw = DropTableRecord.fromJson(data);
|
||||
break;
|
||||
case ALTER_JOB:
|
||||
raw = AlterJobRecord.fromJson(binlog.data);
|
||||
raw = AlterJobRecord.fromJson(data);
|
||||
break;
|
||||
case TRUNCATE_TABLE:
|
||||
raw = TruncateTableRecord.fromJson(binlog.data);
|
||||
raw = TruncateTableRecord.fromJson(data);
|
||||
break;
|
||||
case REPLACE_TABLE:
|
||||
raw = ReplaceTableOperationLog.fromJson(data);
|
||||
break;
|
||||
case BARRIER:
|
||||
raw = BarrierLog.fromJson(data);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
@ -648,29 +660,44 @@ public class DBBinlog {
|
||||
}
|
||||
}
|
||||
|
||||
if (binlog.getType() == TBinlogType.DROP_PARTITION && raw instanceof DropPartitionInfo) {
|
||||
recordDroppedResources(binlogType, commitSeq, raw);
|
||||
}
|
||||
|
||||
private void recordDroppedResources(TBinlogType binlogType, long commitSeq, Object raw) {
|
||||
if (binlogType == TBinlogType.DROP_PARTITION && raw instanceof DropPartitionInfo) {
|
||||
long partitionId = ((DropPartitionInfo) raw).getPartitionId();
|
||||
if (partitionId > 0) {
|
||||
droppedPartitions.add(Pair.of(partitionId, binlog.getCommitSeq()));
|
||||
droppedPartitions.add(Pair.of(partitionId, commitSeq));
|
||||
}
|
||||
} else if (binlog.getType() == TBinlogType.DROP_TABLE && raw instanceof DropTableRecord) {
|
||||
} else if (binlogType == TBinlogType.DROP_TABLE && raw instanceof DropTableRecord) {
|
||||
long tableId = ((DropTableRecord) raw).getTableId();
|
||||
if (tableId > 0) {
|
||||
droppedTables.add(Pair.of(tableId, binlog.getCommitSeq()));
|
||||
droppedTables.add(Pair.of(tableId, commitSeq));
|
||||
}
|
||||
} else if (binlog.getType() == TBinlogType.ALTER_JOB && raw instanceof AlterJobRecord) {
|
||||
} else if (binlogType == TBinlogType.ALTER_JOB && raw instanceof AlterJobRecord) {
|
||||
AlterJobRecord alterJobRecord = (AlterJobRecord) raw;
|
||||
if (alterJobRecord.isJobFinished() && alterJobRecord.isSchemaChangeJob()) {
|
||||
for (Long indexId : alterJobRecord.getOriginIndexIdList()) {
|
||||
if (indexId != null && indexId > 0) {
|
||||
droppedIndexes.add(Pair.of(indexId, binlog.getCommitSeq()));
|
||||
droppedIndexes.add(Pair.of(indexId, commitSeq));
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (binlog.getType() == TBinlogType.TRUNCATE_TABLE && raw instanceof TruncateTableRecord) {
|
||||
} else if (binlogType == TBinlogType.TRUNCATE_TABLE && raw instanceof TruncateTableRecord) {
|
||||
TruncateTableRecord truncateTableRecord = (TruncateTableRecord) raw;
|
||||
for (long partitionId : truncateTableRecord.getOldPartitionIds()) {
|
||||
droppedPartitions.add(Pair.of(partitionId, binlog.getCommitSeq()));
|
||||
droppedPartitions.add(Pair.of(partitionId, commitSeq));
|
||||
}
|
||||
} else if (binlogType == TBinlogType.REPLACE_TABLE && raw instanceof ReplaceTableOperationLog) {
|
||||
ReplaceTableOperationLog record = (ReplaceTableOperationLog) raw;
|
||||
if (!record.isSwapTable()) {
|
||||
droppedTables.add(Pair.of(record.getOrigTblId(), commitSeq));
|
||||
}
|
||||
} else if (binlogType == TBinlogType.BARRIER && raw instanceof BarrierLog) {
|
||||
BarrierLog log = (BarrierLog) raw;
|
||||
// keep compatible with doris 2.0/2.1
|
||||
if (log.hasBinlog()) {
|
||||
recordDroppedResources(log.getBinlogType(), commitSeq, log.getBinlog(), null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -109,6 +109,10 @@ public class BarrierLog implements Writable {
|
||||
return GsonUtils.GSON.toJson(this);
|
||||
}
|
||||
|
||||
public static BarrierLog fromJson(String json) {
|
||||
return GsonUtils.GSON.fromJson(json, BarrierLog.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return toJson();
|
||||
|
||||
@ -314,6 +314,7 @@ public class EditLog {
|
||||
case OperationType.OP_RENAME_PARTITION: {
|
||||
TableInfo info = (TableInfo) journal.getData();
|
||||
env.replayRenamePartition(info);
|
||||
env.getBinlogManager().addTableRename(info, logId);
|
||||
break;
|
||||
}
|
||||
case OperationType.OP_RENAME_COLUMN: {
|
||||
@ -361,6 +362,7 @@ public class EditLog {
|
||||
case OperationType.OP_RENAME_ROLLUP: {
|
||||
TableInfo info = (TableInfo) journal.getData();
|
||||
env.replayRenameRollup(info);
|
||||
env.getBinlogManager().addTableRename(info, logId);
|
||||
break;
|
||||
}
|
||||
case OperationType.OP_LOAD_START:
|
||||
@ -887,6 +889,7 @@ public class EditLog {
|
||||
case OperationType.OP_REPLACE_TABLE: {
|
||||
ReplaceTableOperationLog log = (ReplaceTableOperationLog) journal.getData();
|
||||
env.getAlterInstance().replayReplaceTable(log);
|
||||
env.getBinlogManager().addReplaceTable(log, logId);
|
||||
break;
|
||||
}
|
||||
case OperationType.OP_CREATE_SQL_BLOCK_RULE: {
|
||||
@ -1887,7 +1890,9 @@ public class EditLog {
|
||||
}
|
||||
|
||||
public void logReplaceTable(ReplaceTableOperationLog log) {
|
||||
logEdit(OperationType.OP_REPLACE_TABLE, log);
|
||||
long logId = logEdit(OperationType.OP_REPLACE_TABLE, log);
|
||||
LOG.info("add replace table binlog, logId: {}, infos: {}", logId, log);
|
||||
Env.getCurrentEnv().getBinlogManager().addReplaceTable(log, logId);
|
||||
}
|
||||
|
||||
public void logBatchRemoveTransactions(BatchRemoveTransactionsOperationV2 op) {
|
||||
|
||||
@ -32,15 +32,23 @@ public class ReplaceTableOperationLog implements Writable {
|
||||
private long dbId;
|
||||
@SerializedName(value = "origTblId")
|
||||
private long origTblId;
|
||||
@SerializedName(value = "origTblName")
|
||||
private String origTblName;
|
||||
@SerializedName(value = "newTblName")
|
||||
private long newTblId;
|
||||
@SerializedName(value = "actualNewTblName")
|
||||
private String newTblName;
|
||||
@SerializedName(value = "swapTable")
|
||||
private boolean swapTable;
|
||||
|
||||
public ReplaceTableOperationLog(long dbId, long origTblId, long newTblId, boolean swapTable) {
|
||||
public ReplaceTableOperationLog(long dbId, long origTblId,
|
||||
String origTblName, long newTblId, String newTblName,
|
||||
boolean swapTable) {
|
||||
this.dbId = dbId;
|
||||
this.origTblId = origTblId;
|
||||
this.origTblName = origTblName;
|
||||
this.newTblId = newTblId;
|
||||
this.newTblName = newTblName;
|
||||
this.swapTable = swapTable;
|
||||
}
|
||||
|
||||
@ -52,21 +60,37 @@ public class ReplaceTableOperationLog implements Writable {
|
||||
return origTblId;
|
||||
}
|
||||
|
||||
public String getOrigTblName() {
|
||||
return origTblName;
|
||||
}
|
||||
|
||||
public long getNewTblId() {
|
||||
return newTblId;
|
||||
}
|
||||
|
||||
public String getNewTblName() {
|
||||
return newTblName;
|
||||
}
|
||||
|
||||
public boolean isSwapTable() {
|
||||
return swapTable;
|
||||
}
|
||||
|
||||
public String toJson() {
|
||||
return GsonUtils.GSON.toJson(this);
|
||||
}
|
||||
|
||||
public static ReplaceTableOperationLog fromJson(String json) {
|
||||
return GsonUtils.GSON.fromJson(json, ReplaceTableOperationLog.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
String json = GsonUtils.GSON.toJson(this);
|
||||
Text.writeString(out, json);
|
||||
}
|
||||
|
||||
public static ReplaceTableOperationLog read(DataInput in) throws IOException {
|
||||
public static ReplaceTableOperationLog read(DataInput in) throws IOException {
|
||||
String json = Text.readString(in);
|
||||
return GsonUtils.GSON.fromJson(json, ReplaceTableOperationLog.class);
|
||||
}
|
||||
|
||||
@ -34,7 +34,7 @@ public class ReplaceTableOperationLogTest {
|
||||
file.createNewFile();
|
||||
DataOutputStream dos = new DataOutputStream(new FileOutputStream(file));
|
||||
|
||||
ReplaceTableOperationLog log = new ReplaceTableOperationLog(1, 2, 3, true);
|
||||
ReplaceTableOperationLog log = new ReplaceTableOperationLog(1, 2, "old", 3, "new", true);
|
||||
log.write(dos);
|
||||
|
||||
dos.flush();
|
||||
@ -48,6 +48,8 @@ public class ReplaceTableOperationLogTest {
|
||||
Assert.assertTrue(readLog.getNewTblId() == log.getNewTblId());
|
||||
Assert.assertTrue(readLog.getOrigTblId() == log.getOrigTblId());
|
||||
Assert.assertTrue(readLog.isSwapTable() == log.isSwapTable());
|
||||
Assert.assertTrue(readLog.getOrigTblName().equals(log.getOrigTblName()));
|
||||
Assert.assertTrue(readLog.getNewTblName().equals(log.getNewTblName()));
|
||||
|
||||
// 3. delete files
|
||||
dis.close();
|
||||
|
||||
@ -1141,6 +1141,7 @@ enum TBinlogType {
|
||||
RENAME_COLUMN = 15,
|
||||
MODIFY_COMMENT = 16,
|
||||
MODIFY_VIEW_DEF = 17,
|
||||
REPLACE_TABLE = 18,
|
||||
|
||||
// Keep some IDs for allocation so that when new binlog types are added in the
|
||||
// future, the changes can be picked back to the old versions without breaking
|
||||
@ -1157,8 +1158,7 @@ enum TBinlogType {
|
||||
// MODIFY_XXX = 17,
|
||||
// MIN_UNKNOWN = 18,
|
||||
// UNKNOWN_3 = 19,
|
||||
MIN_UNKNOWN = 18,
|
||||
UNKNOWN_3 = 19,
|
||||
MIN_UNKNOWN = 19,
|
||||
UNKNOWN_4 = 20,
|
||||
UNKNOWN_5 = 21,
|
||||
UNKNOWN_6 = 22,
|
||||
|
||||
Reference in New Issue
Block a user