[Bug] Fix bug that not erase meta such as tablet when force drop db, table, partition (#5329)
This commit is contained in:
@ -1566,12 +1566,6 @@ public class RestoreJob extends AbstractJob {
|
||||
restoreTbl.getName(), entry.second.getName());
|
||||
restoreTbl.writeLock();
|
||||
try {
|
||||
for (MaterializedIndex idx : entry.second.getMaterializedIndices(IndexExtState.VISIBLE)) {
|
||||
for (Tablet tablet : idx.getTablets()) {
|
||||
Catalog.getCurrentInvertedIndex().deleteTablet(tablet.getId());
|
||||
}
|
||||
}
|
||||
|
||||
restoreTbl.dropPartition(dbId, entry.second.getName(), true /* is restore */);
|
||||
} finally {
|
||||
restoreTbl.writeUnlock();
|
||||
|
||||
@ -205,6 +205,7 @@ import org.apache.doris.task.AgentBatchTask;
|
||||
import org.apache.doris.task.AgentTaskExecutor;
|
||||
import org.apache.doris.task.AgentTaskQueue;
|
||||
import org.apache.doris.task.CreateReplicaTask;
|
||||
import org.apache.doris.task.DropReplicaTask;
|
||||
import org.apache.doris.task.MasterTaskExecutor;
|
||||
import org.apache.doris.thrift.TStorageFormat;
|
||||
import org.apache.doris.thrift.TStorageMedium;
|
||||
@ -2661,9 +2662,11 @@ public class Catalog {
|
||||
|
||||
// save table names for recycling
|
||||
Set<String> tableNames = db.getTableNamesWithLock();
|
||||
unprotectDropDb(db, stmt.isForceDrop());
|
||||
unprotectDropDb(db, stmt.isForceDrop(), false);
|
||||
if (!stmt.isForceDrop()) {
|
||||
Catalog.getCurrentRecycleBin().recycleDatabase(db, tableNames);
|
||||
} else {
|
||||
Catalog.getCurrentCatalog().eraseDatabase(db.getId(), false);
|
||||
}
|
||||
} finally {
|
||||
db.writeUnlock();
|
||||
@ -2683,11 +2686,11 @@ public class Catalog {
|
||||
LOG.info("finish drop database[{}], is force : {}", dbName, stmt.isForceDrop());
|
||||
}
|
||||
|
||||
public void unprotectDropDb(Database db, boolean isForeDrop) {
|
||||
public void unprotectDropDb(Database db, boolean isForeDrop, boolean isReplay) {
|
||||
for (Table table : db.getTables()) {
|
||||
table.writeLock();
|
||||
try {
|
||||
unprotectDropTable(db, table, isForeDrop);
|
||||
unprotectDropTable(db, table, isForeDrop, isReplay);
|
||||
} finally {
|
||||
table.writeUnlock();
|
||||
}
|
||||
@ -2717,9 +2720,11 @@ public class Catalog {
|
||||
db.writeLock();
|
||||
try {
|
||||
Set<String> tableNames = db.getTableNamesWithLock();
|
||||
unprotectDropDb(db, isForceDrop);
|
||||
unprotectDropDb(db, isForceDrop, true);
|
||||
if (!isForceDrop) {
|
||||
Catalog.getCurrentRecycleBin().recycleDatabase(db, tableNames);
|
||||
} else {
|
||||
Catalog.getCurrentCatalog().eraseDatabase(db.getId(), false);
|
||||
}
|
||||
} finally {
|
||||
db.writeUnlock();
|
||||
@ -4364,7 +4369,7 @@ public class Catalog {
|
||||
DropInfo info = new DropInfo(db.getId(), table.getId(), -1L, stmt.isForceDrop());
|
||||
table.writeLock();
|
||||
try {
|
||||
unprotectDropTable(db, table, stmt.isForceDrop());
|
||||
unprotectDropTable(db, table, stmt.isForceDrop(), false);
|
||||
} finally {
|
||||
table.writeUnlock();
|
||||
}
|
||||
@ -4375,7 +4380,7 @@ public class Catalog {
|
||||
LOG.info("finished dropping table: {} from db: {}, is force: {}", tableName, dbName, stmt.isForceDrop());
|
||||
}
|
||||
|
||||
public boolean unprotectDropTable(Database db, Table table, boolean isForceDrop) {
|
||||
public boolean unprotectDropTable(Database db, Table table, boolean isForceDrop, boolean isReplay) {
|
||||
if (table.getType() == TableType.ELASTICSEARCH) {
|
||||
esRepository.deRegisterTable(table.getId());
|
||||
} else if (table.getType() == TableType.OLAP) {
|
||||
@ -4387,6 +4392,10 @@ public class Catalog {
|
||||
db.dropTable(table.getName());
|
||||
if (!isForceDrop) {
|
||||
Catalog.getCurrentRecycleBin().recycleTable(db.getId(), table);
|
||||
} else {
|
||||
if (table.getType() == TableType.OLAP) {
|
||||
Catalog.getCurrentCatalog().onEraseOlapTable((OlapTable) table, isReplay);
|
||||
}
|
||||
}
|
||||
|
||||
LOG.info("finished dropping table[{}] in db[{}]", table.getName(), db.getFullName());
|
||||
@ -4399,7 +4408,7 @@ public class Catalog {
|
||||
db.writeLock();
|
||||
table.writeLock();
|
||||
try {
|
||||
unprotectDropTable(db, table, isForceDrop);
|
||||
unprotectDropTable(db, table, isForceDrop, true);
|
||||
} finally {
|
||||
table.writeUnlock();
|
||||
db.writeUnlock();
|
||||
@ -6800,5 +6809,67 @@ public class Catalog {
|
||||
table.writeUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void eraseDatabase(long dbId, boolean needEditLog) {
|
||||
// remove jobs
|
||||
Catalog.getCurrentCatalog().getLoadInstance().removeDbLoadJob(dbId);
|
||||
Catalog.getCurrentCatalog().getSchemaChangeHandler().removeDbAlterJob(dbId);
|
||||
Catalog.getCurrentCatalog().getRollupHandler().removeDbAlterJob(dbId);
|
||||
|
||||
// remove database transaction manager
|
||||
Catalog.getCurrentCatalog().getGlobalTransactionMgr().removeDatabaseTransactionMgr(dbId);
|
||||
|
||||
if (needEditLog) {
|
||||
Catalog.getCurrentCatalog().getEditLog().logEraseDb(dbId);
|
||||
}
|
||||
}
|
||||
|
||||
public void onEraseOlapTable(OlapTable olapTable, boolean isReplay) {
|
||||
// inverted index
|
||||
TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex();
|
||||
Collection<Partition> allPartitions = olapTable.getAllPartitions();
|
||||
for (Partition partition : allPartitions) {
|
||||
for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
|
||||
for (Tablet tablet : index.getTablets()) {
|
||||
invertedIndex.deleteTablet(tablet.getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!isReplay) {
|
||||
// drop all replicas
|
||||
AgentBatchTask batchTask = new AgentBatchTask();
|
||||
for (Partition partition : olapTable.getAllPartitions()) {
|
||||
List<MaterializedIndex> allIndices = partition.getMaterializedIndices(IndexExtState.ALL);
|
||||
for (MaterializedIndex materializedIndex : allIndices) {
|
||||
long indexId = materializedIndex.getId();
|
||||
int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
|
||||
for (Tablet tablet : materializedIndex.getTablets()) {
|
||||
long tabletId = tablet.getId();
|
||||
List<Replica> replicas = tablet.getReplicas();
|
||||
for (Replica replica : replicas) {
|
||||
long backendId = replica.getBackendId();
|
||||
DropReplicaTask dropTask = new DropReplicaTask(backendId, tabletId, schemaHash);
|
||||
batchTask.addTask(dropTask);
|
||||
} // end for replicas
|
||||
} // end for tablets
|
||||
} // end for indices
|
||||
} // end for partitions
|
||||
AgentTaskExecutor.submit(batchTask);
|
||||
}
|
||||
|
||||
// colocation
|
||||
Catalog.getCurrentColocateIndex().removeTable(olapTable.getId());
|
||||
}
|
||||
|
||||
public void onErasePartition(Partition partition) {
|
||||
// remove tablet in inverted index
|
||||
TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex();
|
||||
for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
|
||||
for (Tablet tablet : index.getTablets()) {
|
||||
invertedIndex.deleteTablet(tablet.getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -28,11 +28,7 @@ import org.apache.doris.common.io.Text;
|
||||
import org.apache.doris.common.io.Writable;
|
||||
import org.apache.doris.common.util.MasterDaemon;
|
||||
import org.apache.doris.common.util.RangeUtils;
|
||||
import org.apache.doris.persist.ColocatePersistInfo;
|
||||
import org.apache.doris.persist.RecoverInfo;
|
||||
import org.apache.doris.task.AgentBatchTask;
|
||||
import org.apache.doris.task.AgentTaskExecutor;
|
||||
import org.apache.doris.task.DropReplicaTask;
|
||||
import org.apache.doris.thrift.TStorageMedium;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
@ -47,7 +43,6 @@ import com.google.common.collect.Sets;
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@ -147,18 +142,8 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable {
|
||||
// erase db
|
||||
dbIter.remove();
|
||||
idToRecycleTime.remove(entry.getKey());
|
||||
|
||||
// remove jobs
|
||||
Catalog.getCurrentCatalog().getLoadInstance().removeDbLoadJob(db.getId());
|
||||
Catalog.getCurrentCatalog().getSchemaChangeHandler().removeDbAlterJob(db.getId());
|
||||
Catalog.getCurrentCatalog().getRollupHandler().removeDbAlterJob(db.getId());
|
||||
|
||||
// remove database transaction manager
|
||||
Catalog.getCurrentCatalog().getGlobalTransactionMgr().removeDatabaseTransactionMgr(db.getId());
|
||||
|
||||
// log
|
||||
Catalog.getCurrentCatalog().getEditLog().logEraseDb(entry.getKey());
|
||||
LOG.info("erase db[{}]", entry.getKey());
|
||||
Catalog.getCurrentCatalog().eraseDatabase(db.getId(), true);
|
||||
LOG.info("erase db[{}]", db.getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -184,14 +169,7 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable {
|
||||
public synchronized void replayEraseDatabase(long dbId) {
|
||||
idToDatabase.remove(dbId);
|
||||
idToRecycleTime.remove(dbId);
|
||||
|
||||
// remove jobs
|
||||
Catalog.getCurrentCatalog().getLoadInstance().removeDbLoadJob(dbId);
|
||||
Catalog.getCurrentCatalog().getSchemaChangeHandler().removeDbAlterJob(dbId);
|
||||
Catalog.getCurrentCatalog().getRollupHandler().removeDbAlterJob(dbId);
|
||||
|
||||
// remove database transaction manager
|
||||
Catalog.getCurrentCatalog().getGlobalTransactionMgr().removeDatabaseTransactionMgr(dbId);
|
||||
Catalog.getCurrentCatalog().eraseDatabase(dbId, false);
|
||||
LOG.info("replay erase db[{}]", dbId);
|
||||
}
|
||||
|
||||
@ -205,7 +183,7 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable {
|
||||
|
||||
if (isExpire(tableId, currentTimeMs)) {
|
||||
if (table.getType() == TableType.OLAP) {
|
||||
onEraseOlapTable((OlapTable) table);
|
||||
Catalog.getCurrentCatalog().onEraseOlapTable((OlapTable) table, false);
|
||||
}
|
||||
|
||||
// erase table
|
||||
@ -219,45 +197,6 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable {
|
||||
} // end for tables
|
||||
}
|
||||
|
||||
private void onEraseOlapTable(OlapTable olapTable) {
|
||||
// inverted index
|
||||
TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex();
|
||||
Collection<Partition> allPartitions = olapTable.getAllPartitions();
|
||||
for (Partition partition : allPartitions) {
|
||||
for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
|
||||
for (Tablet tablet : index.getTablets()) {
|
||||
invertedIndex.deleteTablet(tablet.getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// drop all replicas
|
||||
AgentBatchTask batchTask = new AgentBatchTask();
|
||||
for (Partition partition : olapTable.getAllPartitions()) {
|
||||
List<MaterializedIndex> allIndices = partition.getMaterializedIndices(IndexExtState.ALL);
|
||||
for (MaterializedIndex materializedIndex : allIndices) {
|
||||
long indexId = materializedIndex.getId();
|
||||
int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
|
||||
for (Tablet tablet : materializedIndex.getTablets()) {
|
||||
long tabletId = tablet.getId();
|
||||
List<Replica> replicas = tablet.getReplicas();
|
||||
for (Replica replica : replicas) {
|
||||
long backendId = replica.getBackendId();
|
||||
DropReplicaTask dropTask = new DropReplicaTask(backendId, tabletId, schemaHash);
|
||||
batchTask.addTask(dropTask);
|
||||
} // end for replicas
|
||||
} // end for tablets
|
||||
} // end for indices
|
||||
} // end for partitions
|
||||
AgentTaskExecutor.submit(batchTask);
|
||||
|
||||
// colocation
|
||||
if (Catalog.getCurrentColocateIndex().removeTable(olapTable.getId())) {
|
||||
Catalog.getCurrentCatalog().getEditLog().logColocateRemoveTable(
|
||||
ColocatePersistInfo.createForRemoveTable(olapTable.getId()));
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void eraseTableWithSameName(long dbId, String tableName) {
|
||||
Iterator<Map.Entry<Long, RecycleTableInfo>> iterator = idToTable.entrySet().iterator();
|
||||
while (iterator.hasNext()) {
|
||||
@ -270,7 +209,7 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable {
|
||||
Table table = tableInfo.getTable();
|
||||
if (table.getName().equals(tableName)) {
|
||||
if (table.getType() == TableType.OLAP) {
|
||||
onEraseOlapTable((OlapTable) table);
|
||||
Catalog.getCurrentCatalog().onEraseOlapTable((OlapTable) table, false);
|
||||
}
|
||||
|
||||
iterator.remove();
|
||||
@ -286,22 +225,9 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable {
|
||||
|
||||
Table table = tableInfo.getTable();
|
||||
if (table.getType() == TableType.OLAP && !Catalog.isCheckpointThread()) {
|
||||
OlapTable olapTable = (OlapTable) table;
|
||||
|
||||
// remove tablet from inverted index
|
||||
TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex();
|
||||
for (Partition partition : olapTable.getAllPartitions()) {
|
||||
for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
|
||||
for (Tablet tablet : index.getTablets()) {
|
||||
invertedIndex.deleteTablet(tablet.getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
Catalog.getCurrentCatalog().onEraseOlapTable((OlapTable) table, true);
|
||||
}
|
||||
|
||||
// colocation
|
||||
Catalog.getCurrentColocateIndex().removeTable(tableId);
|
||||
|
||||
LOG.info("replay erase table[{}]", tableId);
|
||||
}
|
||||
|
||||
@ -314,14 +240,7 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable {
|
||||
|
||||
long partitionId = entry.getKey();
|
||||
if (isExpire(partitionId, currentTimeMs)) {
|
||||
// remove tablet in inverted index
|
||||
TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex();
|
||||
for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
|
||||
for (Tablet tablet : index.getTablets()) {
|
||||
invertedIndex.deleteTablet(tablet.getId());
|
||||
}
|
||||
}
|
||||
|
||||
Catalog.getCurrentCatalog().onErasePartition(partition);
|
||||
// erase partition
|
||||
iterator.remove();
|
||||
idToRecycleTime.remove(partitionId);
|
||||
@ -344,14 +263,7 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable {
|
||||
|
||||
Partition partition = partitionInfo.getPartition();
|
||||
if (partition.getName().equals(partitionName)) {
|
||||
// remove tablet in inverted index
|
||||
TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex();
|
||||
for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
|
||||
for (Tablet tablet : index.getTablets()) {
|
||||
invertedIndex.deleteTablet(tablet.getId());
|
||||
}
|
||||
}
|
||||
|
||||
Catalog.getCurrentCatalog().onErasePartition(partition);
|
||||
iterator.remove();
|
||||
idToRecycleTime.remove(entry.getKey());
|
||||
|
||||
@ -366,13 +278,7 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable {
|
||||
|
||||
Partition partition = partitionInfo.getPartition();
|
||||
if (!Catalog.isCheckpointThread()) {
|
||||
// remove tablet from inverted index
|
||||
TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex();
|
||||
for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
|
||||
for (Tablet tablet : index.getTablets()) {
|
||||
invertedIndex.deleteTablet(tablet.getId());
|
||||
}
|
||||
}
|
||||
Catalog.getCurrentCatalog().onErasePartition(partition);
|
||||
}
|
||||
|
||||
LOG.info("replay erase partition[{}]", partitionId);
|
||||
|
||||
@ -683,6 +683,8 @@ public class OlapTable extends Table {
|
||||
rangePartitionInfo.getDataProperty(partition.getId()),
|
||||
rangePartitionInfo.getReplicationNum(partition.getId()),
|
||||
rangePartitionInfo.getIsInMemory(partition.getId()));
|
||||
} else {
|
||||
Catalog.getCurrentCatalog().onErasePartition(partition);
|
||||
}
|
||||
|
||||
// drop partition info
|
||||
|
||||
Reference in New Issue
Block a user