diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index e61bb11e62..2c3004c25b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -907,6 +907,10 @@ public class SchemaChangeJobV2 extends AlterJobV2 { } } + public Map getIndexIdMap() { + return indexIdMap; + } + public List> getUnfinishedTasks(int limit) { List> taskInfos = Lists.newArrayList(); if (jobState == JobState.RUNNING) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/AlterJobRecord.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/AlterJobRecord.java index 36c772d524..51d1103530 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/AlterJobRecord.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/AlterJobRecord.java @@ -18,10 +18,15 @@ package org.apache.doris.binlog; import org.apache.doris.alter.AlterJobV2; +import org.apache.doris.alter.SchemaChangeJobV2; import org.apache.doris.persist.gson.GsonUtils; import com.google.gson.annotations.SerializedName; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + public class AlterJobRecord { @SerializedName(value = "type") private AlterJobV2.JobType type; @@ -37,6 +42,8 @@ public class AlterJobRecord { private AlterJobV2.JobState jobState; @SerializedName(value = "rawSql") private String rawSql; + @SerializedName(value = "iim") + private Map indexIdMap; public AlterJobRecord(AlterJobV2 job) { this.type = job.getType(); @@ -46,9 +53,31 @@ public class AlterJobRecord { this.jobId = job.getJobId(); this.jobState = job.getJobState(); this.rawSql = job.getRawSql(); + if (type == AlterJobV2.JobType.SCHEMA_CHANGE && job instanceof SchemaChangeJobV2) { + this.indexIdMap = ((SchemaChangeJobV2) job).getIndexIdMap(); + } + } + + public boolean isJobFinished() { + return jobState == AlterJobV2.JobState.FINISHED; + } + + public boolean isSchemaChangeJob() { + return type == AlterJobV2.JobType.SCHEMA_CHANGE; + } + + public List getOriginIndexIdList() { + if (indexIdMap == null) { + return new ArrayList<>(); + } + return new ArrayList<>(indexIdMap.values()); } public String toJson() { return GsonUtils.GSON.toJson(this); } + + public static AlterJobRecord fromJson(String json) { + return GsonUtils.GSON.fromJson(json, AlterJobRecord.class); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java index fc3115e2b9..db49b5c2b2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java @@ -225,7 +225,7 @@ public class BinlogManager { AlterJobRecord alterJobRecord = new AlterJobRecord(alterJob); String data = alterJobRecord.toJson(); - addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, alterJob); + addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, alterJobRecord); } public void addModifyTableAddOrDropColumns(TableAddOrDropColumnsInfo info, long commitSeq) { @@ -383,6 +383,20 @@ public class BinlogManager { } } + // get the dropped indexes of the db. + public List getDroppedIndexes(long dbId) { + lock.readLock().lock(); + try { + DBBinlog dbBinlog = dbBinlogMap.get(dbId); + if (dbBinlog == null) { + return Lists.newArrayList(); + } + return dbBinlog.getDroppedIndexes(); + } finally { + lock.readLock().unlock(); + } + } + public List gc() { LOG.info("begin gc binlog"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java index 502491004e..8469bdcc7d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java @@ -64,6 +64,8 @@ public class DBBinlog { private List> droppedPartitions; // The commit seq of the dropped tables private List> droppedTables; + // The commit seq of the dropped indexes + private List> droppedIndexes; private List tableDummyBinlogs; @@ -82,6 +84,7 @@ public class DBBinlog { timestamps = Lists.newArrayList(); droppedPartitions = Lists.newArrayList(); droppedTables = Lists.newArrayList(); + droppedIndexes = Lists.newArrayList(); TBinlog dummy; if (binlog.getType() == TBinlogType.DUMMY) { @@ -129,6 +132,15 @@ public class DBBinlog { if (record != null && record.getTableId() > 0) { droppedTables.add(Pair.of(record.getTableId(), binlog.getCommitSeq())); } + } else if (binlog.getType() == TBinlogType.ALTER_JOB) { + AlterJobRecord record = AlterJobRecord.fromJson(binlog.data); + if (record != null && record.isSchemaChangeJob() && record.isJobFinished()) { + for (Long indexId : record.getOriginIndexIdList()) { + if (indexId != null && indexId > 0) { + droppedIndexes.add(Pair.of(indexId, binlog.getCommitSeq())); + } + } + } } if (tableIds == null) { @@ -193,6 +205,15 @@ public class DBBinlog { if (tableId > 0) { droppedTables.add(Pair.of(tableId, binlog.getCommitSeq())); } + } else if (binlog.getType() == TBinlogType.ALTER_JOB && raw instanceof AlterJobRecord) { + AlterJobRecord alterJobRecord = (AlterJobRecord) raw; + if (alterJobRecord.isJobFinished() && alterJobRecord.isSchemaChangeJob()) { + for (Long indexId : alterJobRecord.getOriginIndexIdList()) { + if (indexId != null && indexId > 0) { + droppedIndexes.add(Pair.of(indexId, binlog.getCommitSeq())); + } + } + } } switch (binlog.getType()) { @@ -263,6 +284,18 @@ public class DBBinlog { } } + // Get the dropped indexes of the db. + public List getDroppedIndexes() { + lock.readLock().lock(); + try { + return droppedIndexes.stream() + .map(v -> v.first) + .collect(Collectors.toList()); + } finally { + lock.readLock().unlock(); + } + } + public Pair getBinlogLag(long tableId, long prevCommitSeq) { TStatus status = new TStatus(TStatusCode.OK); lock.readLock().lock(); @@ -380,7 +413,7 @@ public class DBBinlog { } } - gcDroppedPartitionAndTables(largestExpiredCommitSeq); + gcDroppedResources(largestExpiredCommitSeq); if (lastCommitSeq != -1) { dummy.setCommitSeq(lastCommitSeq); } @@ -418,7 +451,7 @@ public class DBBinlog { timeIter.remove(); } - gcDroppedPartitionAndTables(lastExpiredBinlog.getCommitSeq()); + gcDroppedResources(lastExpiredBinlog.getCommitSeq()); } return lastExpiredBinlog; @@ -528,7 +561,7 @@ public class DBBinlog { } } - private void gcDroppedPartitionAndTables(long commitSeq) { + private void gcDroppedResources(long commitSeq) { Iterator> iter = droppedPartitions.iterator(); while (iter.hasNext() && iter.next().second < commitSeq) { iter.remove(); @@ -537,6 +570,10 @@ public class DBBinlog { while (iter.hasNext() && iter.next().second < commitSeq) { iter.remove(); } + iter = droppedIndexes.iterator(); + while (iter.hasNext() && iter.next().second < commitSeq) { + iter.remove(); + } } // not thread safety, do this without lock diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index f425d94fa6..422d46b4fa 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -6224,6 +6224,7 @@ public class Env { BinlogManager binlogManager = Env.getCurrentEnv().getBinlogManager(); dbMeta.setDroppedPartitions(binlogManager.getDroppedPartitions(db.getId())); dbMeta.setDroppedTables(binlogManager.getDroppedTables(db.getId())); + dbMeta.setDroppedIndexes(binlogManager.getDroppedIndexes(db.getId())); } result.setDbMeta(dbMeta); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index ad3828e392..c53f25f766 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1435,6 +1435,7 @@ struct TGetMetaDBMeta { 3: optional list tables 4: optional list dropped_partitions 5: optional list dropped_tables + 6: optional list dropped_indexes } struct TGetMetaResult {