cherry pick from #41246
This commit is contained in:
@ -907,6 +907,10 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
|
||||
}
|
||||
}
|
||||
|
||||
public Map<Long, Long> getIndexIdMap() {
|
||||
return indexIdMap;
|
||||
}
|
||||
|
||||
public List<List<String>> getUnfinishedTasks(int limit) {
|
||||
List<List<String>> taskInfos = Lists.newArrayList();
|
||||
if (jobState == JobState.RUNNING) {
|
||||
|
||||
@ -18,10 +18,15 @@
|
||||
package org.apache.doris.binlog;
|
||||
|
||||
import org.apache.doris.alter.AlterJobV2;
|
||||
import org.apache.doris.alter.SchemaChangeJobV2;
|
||||
import org.apache.doris.persist.gson.GsonUtils;
|
||||
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class AlterJobRecord {
|
||||
@SerializedName(value = "type")
|
||||
private AlterJobV2.JobType type;
|
||||
@ -37,6 +42,8 @@ public class AlterJobRecord {
|
||||
private AlterJobV2.JobState jobState;
|
||||
@SerializedName(value = "rawSql")
|
||||
private String rawSql;
|
||||
@SerializedName(value = "iim")
|
||||
private Map<Long, Long> indexIdMap;
|
||||
|
||||
public AlterJobRecord(AlterJobV2 job) {
|
||||
this.type = job.getType();
|
||||
@ -46,9 +53,31 @@ public class AlterJobRecord {
|
||||
this.jobId = job.getJobId();
|
||||
this.jobState = job.getJobState();
|
||||
this.rawSql = job.getRawSql();
|
||||
if (type == AlterJobV2.JobType.SCHEMA_CHANGE && job instanceof SchemaChangeJobV2) {
|
||||
this.indexIdMap = ((SchemaChangeJobV2) job).getIndexIdMap();
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isJobFinished() {
|
||||
return jobState == AlterJobV2.JobState.FINISHED;
|
||||
}
|
||||
|
||||
public boolean isSchemaChangeJob() {
|
||||
return type == AlterJobV2.JobType.SCHEMA_CHANGE;
|
||||
}
|
||||
|
||||
public List<Long> getOriginIndexIdList() {
|
||||
if (indexIdMap == null) {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
return new ArrayList<>(indexIdMap.values());
|
||||
}
|
||||
|
||||
public String toJson() {
|
||||
return GsonUtils.GSON.toJson(this);
|
||||
}
|
||||
|
||||
public static AlterJobRecord fromJson(String json) {
|
||||
return GsonUtils.GSON.fromJson(json, AlterJobRecord.class);
|
||||
}
|
||||
}
|
||||
|
||||
@ -225,7 +225,7 @@ public class BinlogManager {
|
||||
AlterJobRecord alterJobRecord = new AlterJobRecord(alterJob);
|
||||
String data = alterJobRecord.toJson();
|
||||
|
||||
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, alterJob);
|
||||
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, alterJobRecord);
|
||||
}
|
||||
|
||||
public void addModifyTableAddOrDropColumns(TableAddOrDropColumnsInfo info, long commitSeq) {
|
||||
@ -383,6 +383,20 @@ public class BinlogManager {
|
||||
}
|
||||
}
|
||||
|
||||
// get the dropped indexes of the db.
|
||||
public List<Long> getDroppedIndexes(long dbId) {
|
||||
lock.readLock().lock();
|
||||
try {
|
||||
DBBinlog dbBinlog = dbBinlogMap.get(dbId);
|
||||
if (dbBinlog == null) {
|
||||
return Lists.newArrayList();
|
||||
}
|
||||
return dbBinlog.getDroppedIndexes();
|
||||
} finally {
|
||||
lock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public List<BinlogTombstone> gc() {
|
||||
LOG.info("begin gc binlog");
|
||||
|
||||
|
||||
@ -64,6 +64,8 @@ public class DBBinlog {
|
||||
private List<Pair<Long, Long>> droppedPartitions;
|
||||
// The commit seq of the dropped tables
|
||||
private List<Pair<Long, Long>> droppedTables;
|
||||
// The commit seq of the dropped indexes
|
||||
private List<Pair<Long, Long>> droppedIndexes;
|
||||
|
||||
private List<TBinlog> tableDummyBinlogs;
|
||||
|
||||
@ -82,6 +84,7 @@ public class DBBinlog {
|
||||
timestamps = Lists.newArrayList();
|
||||
droppedPartitions = Lists.newArrayList();
|
||||
droppedTables = Lists.newArrayList();
|
||||
droppedIndexes = Lists.newArrayList();
|
||||
|
||||
TBinlog dummy;
|
||||
if (binlog.getType() == TBinlogType.DUMMY) {
|
||||
@ -129,6 +132,15 @@ public class DBBinlog {
|
||||
if (record != null && record.getTableId() > 0) {
|
||||
droppedTables.add(Pair.of(record.getTableId(), binlog.getCommitSeq()));
|
||||
}
|
||||
} else if (binlog.getType() == TBinlogType.ALTER_JOB) {
|
||||
AlterJobRecord record = AlterJobRecord.fromJson(binlog.data);
|
||||
if (record != null && record.isSchemaChangeJob() && record.isJobFinished()) {
|
||||
for (Long indexId : record.getOriginIndexIdList()) {
|
||||
if (indexId != null && indexId > 0) {
|
||||
droppedIndexes.add(Pair.of(indexId, binlog.getCommitSeq()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (tableIds == null) {
|
||||
@ -193,6 +205,15 @@ public class DBBinlog {
|
||||
if (tableId > 0) {
|
||||
droppedTables.add(Pair.of(tableId, binlog.getCommitSeq()));
|
||||
}
|
||||
} else if (binlog.getType() == TBinlogType.ALTER_JOB && raw instanceof AlterJobRecord) {
|
||||
AlterJobRecord alterJobRecord = (AlterJobRecord) raw;
|
||||
if (alterJobRecord.isJobFinished() && alterJobRecord.isSchemaChangeJob()) {
|
||||
for (Long indexId : alterJobRecord.getOriginIndexIdList()) {
|
||||
if (indexId != null && indexId > 0) {
|
||||
droppedIndexes.add(Pair.of(indexId, binlog.getCommitSeq()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
switch (binlog.getType()) {
|
||||
@ -263,6 +284,18 @@ public class DBBinlog {
|
||||
}
|
||||
}
|
||||
|
||||
// Get the dropped indexes of the db.
|
||||
public List<Long> getDroppedIndexes() {
|
||||
lock.readLock().lock();
|
||||
try {
|
||||
return droppedIndexes.stream()
|
||||
.map(v -> v.first)
|
||||
.collect(Collectors.toList());
|
||||
} finally {
|
||||
lock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public Pair<TStatus, Long> getBinlogLag(long tableId, long prevCommitSeq) {
|
||||
TStatus status = new TStatus(TStatusCode.OK);
|
||||
lock.readLock().lock();
|
||||
@ -380,7 +413,7 @@ public class DBBinlog {
|
||||
}
|
||||
}
|
||||
|
||||
gcDroppedPartitionAndTables(largestExpiredCommitSeq);
|
||||
gcDroppedResources(largestExpiredCommitSeq);
|
||||
if (lastCommitSeq != -1) {
|
||||
dummy.setCommitSeq(lastCommitSeq);
|
||||
}
|
||||
@ -418,7 +451,7 @@ public class DBBinlog {
|
||||
timeIter.remove();
|
||||
}
|
||||
|
||||
gcDroppedPartitionAndTables(lastExpiredBinlog.getCommitSeq());
|
||||
gcDroppedResources(lastExpiredBinlog.getCommitSeq());
|
||||
}
|
||||
|
||||
return lastExpiredBinlog;
|
||||
@ -528,7 +561,7 @@ public class DBBinlog {
|
||||
}
|
||||
}
|
||||
|
||||
private void gcDroppedPartitionAndTables(long commitSeq) {
|
||||
private void gcDroppedResources(long commitSeq) {
|
||||
Iterator<Pair<Long, Long>> iter = droppedPartitions.iterator();
|
||||
while (iter.hasNext() && iter.next().second < commitSeq) {
|
||||
iter.remove();
|
||||
@ -537,6 +570,10 @@ public class DBBinlog {
|
||||
while (iter.hasNext() && iter.next().second < commitSeq) {
|
||||
iter.remove();
|
||||
}
|
||||
iter = droppedIndexes.iterator();
|
||||
while (iter.hasNext() && iter.next().second < commitSeq) {
|
||||
iter.remove();
|
||||
}
|
||||
}
|
||||
|
||||
// not thread safety, do this without lock
|
||||
|
||||
@ -6224,6 +6224,7 @@ public class Env {
|
||||
BinlogManager binlogManager = Env.getCurrentEnv().getBinlogManager();
|
||||
dbMeta.setDroppedPartitions(binlogManager.getDroppedPartitions(db.getId()));
|
||||
dbMeta.setDroppedTables(binlogManager.getDroppedTables(db.getId()));
|
||||
dbMeta.setDroppedIndexes(binlogManager.getDroppedIndexes(db.getId()));
|
||||
}
|
||||
|
||||
result.setDbMeta(dbMeta);
|
||||
|
||||
Reference in New Issue
Block a user