[Improve](ddl) Use RecycleBin to force drop db/table asynchronously (#26563)
This commit is contained in:
@ -21,6 +21,7 @@ import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
|
||||
import org.apache.doris.catalog.TableIf.TableType;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.FeMetaVersion;
|
||||
import org.apache.doris.common.io.Text;
|
||||
import org.apache.doris.common.io.Writable;
|
||||
@ -55,6 +56,7 @@ import java.util.stream.Stream;
|
||||
|
||||
public class CatalogRecycleBin extends MasterDaemon implements Writable {
|
||||
private static final Logger LOG = LogManager.getLogger(CatalogRecycleBin.class);
|
||||
private static final int DEFAULT_INTERVAL_SECONDS = 30; // 30 seconds
|
||||
// erase meta at least after minEraseLatency milliseconds
|
||||
// to avoid erase log ahead of drop log
|
||||
private static final long minEraseLatency = 10 * 60 * 1000; // 10 min
|
||||
@ -66,7 +68,7 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable {
|
||||
private Map<Long, Long> idToRecycleTime;
|
||||
|
||||
public CatalogRecycleBin() {
|
||||
super("recycle bin");
|
||||
super("recycle bin", FeConstants.runningUnitTest ? 10L : DEFAULT_INTERVAL_SECONDS * 1000L);
|
||||
idToDatabase = Maps.newHashMap();
|
||||
idToTable = Maps.newHashMap();
|
||||
idToPartition = Maps.newHashMap();
|
||||
@ -124,7 +126,7 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable {
|
||||
}
|
||||
|
||||
public synchronized boolean recycleDatabase(Database db, Set<String> tableNames, Set<Long> tableIds,
|
||||
boolean isReplay, long replayRecycleTime) {
|
||||
boolean isReplay, boolean isForceDrop, long replayRecycleTime) {
|
||||
long recycleTime = 0;
|
||||
if (idToDatabase.containsKey(db.getId())) {
|
||||
LOG.error("db[{}] already in recycle bin.", db.getId());
|
||||
@ -137,17 +139,21 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable {
|
||||
// recycle db
|
||||
RecycleDatabaseInfo databaseInfo = new RecycleDatabaseInfo(db, tableNames, tableIds);
|
||||
idToDatabase.put(db.getId(), databaseInfo);
|
||||
if (!isReplay || replayRecycleTime == 0) {
|
||||
if (isForceDrop) {
|
||||
// The 'force drop' database should be recycle immediately.
|
||||
recycleTime = 0;
|
||||
} else if (!isReplay || replayRecycleTime == 0) {
|
||||
recycleTime = System.currentTimeMillis();
|
||||
} else {
|
||||
recycleTime = replayRecycleTime;
|
||||
}
|
||||
idToRecycleTime.put(db.getId(), recycleTime);
|
||||
LOG.info("recycle db[{}-{}]", db.getId(), db.getFullName());
|
||||
LOG.info("recycle db[{}-{}], is force drop: {}", db.getId(), db.getFullName(), isForceDrop);
|
||||
return true;
|
||||
}
|
||||
|
||||
public synchronized boolean recycleTable(long dbId, Table table, boolean isReplay, long replayRecycleTime) {
|
||||
public synchronized boolean recycleTable(long dbId, Table table, boolean isReplay,
|
||||
boolean isForceDrop, long replayRecycleTime) {
|
||||
long recycleTime = 0;
|
||||
if (idToTable.containsKey(table.getId())) {
|
||||
LOG.error("table[{}] already in recycle bin.", table.getId());
|
||||
@ -156,14 +162,17 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable {
|
||||
|
||||
// recycle table
|
||||
RecycleTableInfo tableInfo = new RecycleTableInfo(dbId, table);
|
||||
if (!isReplay || replayRecycleTime == 0) {
|
||||
if (isForceDrop) {
|
||||
// The 'force drop' table should be recycle immediately.
|
||||
recycleTime = 0;
|
||||
} else if (!isReplay || replayRecycleTime == 0) {
|
||||
recycleTime = System.currentTimeMillis();
|
||||
} else {
|
||||
recycleTime = replayRecycleTime;
|
||||
}
|
||||
idToRecycleTime.put(table.getId(), recycleTime);
|
||||
idToTable.put(table.getId(), tableInfo);
|
||||
LOG.info("recycle table[{}-{}]", table.getId(), table.getName());
|
||||
LOG.info("recycle table[{}-{}], is force drop: {}", table.getId(), table.getName(), isForceDrop);
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -419,6 +428,11 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable {
|
||||
LOG.info("before replay erase table[{}]", tableId);
|
||||
RecycleTableInfo tableInfo = idToTable.remove(tableId);
|
||||
idToRecycleTime.remove(tableId);
|
||||
if (tableInfo == null) {
|
||||
// FIXME(walter): Sometimes `eraseTable` in 'DROP DB ... FORCE' may be executed earlier than
|
||||
// finish drop db, especially in the case of drop db with many tables.
|
||||
return;
|
||||
}
|
||||
Table table = tableInfo.getTable();
|
||||
if (table.getType() == TableType.OLAP) {
|
||||
Env.getCurrentEnv().onEraseOlapTable((OlapTable) table, true);
|
||||
@ -537,7 +551,9 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable {
|
||||
|
||||
public synchronized Database recoverDatabase(String dbName, long dbId) throws DdlException {
|
||||
RecycleDatabaseInfo dbInfo = null;
|
||||
long recycleTime = -1;
|
||||
// The recycle time of the force dropped tables and databases will be set to zero, use 1 here to
|
||||
// skip these databases and tables.
|
||||
long recycleTime = 1;
|
||||
Iterator<Map.Entry<Long, RecycleDatabaseInfo>> iterator = idToDatabase.entrySet().iterator();
|
||||
while (iterator.hasNext()) {
|
||||
Map.Entry<Long, RecycleDatabaseInfo> entry = iterator.next();
|
||||
@ -616,7 +632,9 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable {
|
||||
String newTableName) throws DdlException {
|
||||
// make sure to get db lock
|
||||
Table table = null;
|
||||
long recycleTime = -1;
|
||||
// The recycle time of the force dropped tables and databases will be set to zero, use 1 here to
|
||||
// skip these databases and tables.
|
||||
long recycleTime = 1;
|
||||
long dbId = db.getId();
|
||||
Iterator<Map.Entry<Long, RecycleTableInfo>> iterator = idToTable.entrySet().iterator();
|
||||
while (iterator.hasNext()) {
|
||||
|
||||
@ -822,22 +822,42 @@ public class TabletInvertedIndex {
|
||||
|
||||
// just for ut
|
||||
public Table<Long, Long, Replica> getReplicaMetaTable() {
|
||||
return replicaMetaTable;
|
||||
long stamp = readLock();
|
||||
try {
|
||||
return HashBasedTable.create(replicaMetaTable);
|
||||
} finally {
|
||||
readUnlock(stamp);
|
||||
}
|
||||
}
|
||||
|
||||
// just for ut
|
||||
public Table<Long, Long, Replica> getBackingReplicaMetaTable() {
|
||||
return backingReplicaMetaTable;
|
||||
long stamp = readLock();
|
||||
try {
|
||||
return HashBasedTable.create(backingReplicaMetaTable);
|
||||
} finally {
|
||||
readUnlock(stamp);
|
||||
}
|
||||
}
|
||||
|
||||
// just for ut
|
||||
public Table<Long, Long, TabletMeta> getTabletMetaTable() {
|
||||
return tabletMetaTable;
|
||||
long stamp = readLock();
|
||||
try {
|
||||
return HashBasedTable.create(tabletMetaTable);
|
||||
} finally {
|
||||
readUnlock(stamp);
|
||||
}
|
||||
}
|
||||
|
||||
// just for ut
|
||||
public Map<Long, TabletMeta> getTabletMetaMap() {
|
||||
return tabletMetaMap;
|
||||
long stamp = readLock();
|
||||
try {
|
||||
return new HashMap(tabletMetaMap);
|
||||
} finally {
|
||||
readUnlock(stamp);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isLocal(TStorageMedium storageMedium) {
|
||||
|
||||
@ -478,6 +478,7 @@ public class InternalCatalog implements CatalogIf<Database> {
|
||||
|
||||
public void dropDb(DropDbStmt stmt) throws DdlException {
|
||||
String dbName = stmt.getDbName();
|
||||
LOG.info("begin drop database[{}], is force : {}", dbName, stmt.isForceDrop());
|
||||
|
||||
// 1. check if database exists
|
||||
if (!tryLock(false)) {
|
||||
@ -536,12 +537,8 @@ public class InternalCatalog implements CatalogIf<Database> {
|
||||
MetaLockUtils.writeUnlockTables(tableList);
|
||||
}
|
||||
|
||||
if (!stmt.isForceDrop()) {
|
||||
Env.getCurrentRecycleBin().recycleDatabase(db, tableNames, tableIds, false, 0);
|
||||
recycleTime = Env.getCurrentRecycleBin().getRecycleTimeById(db.getId());
|
||||
} else {
|
||||
Env.getCurrentEnv().eraseDatabase(db.getId(), false);
|
||||
}
|
||||
Env.getCurrentRecycleBin().recycleDatabase(db, tableNames, tableIds, false, stmt.isForceDrop(), 0);
|
||||
recycleTime = Env.getCurrentRecycleBin().getRecycleTimeById(db.getId());
|
||||
} finally {
|
||||
db.writeUnlock();
|
||||
}
|
||||
@ -588,11 +585,7 @@ public class InternalCatalog implements CatalogIf<Database> {
|
||||
} finally {
|
||||
MetaLockUtils.writeUnlockTables(tableList);
|
||||
}
|
||||
if (!isForceDrop) {
|
||||
Env.getCurrentRecycleBin().recycleDatabase(db, tableNames, tableIds, true, recycleTime);
|
||||
} else {
|
||||
Env.getCurrentEnv().eraseDatabase(db.getId(), false);
|
||||
}
|
||||
Env.getCurrentRecycleBin().recycleDatabase(db, tableNames, tableIds, true, isForceDrop, recycleTime);
|
||||
Env.getCurrentEnv().getQueryStats().clear(Env.getCurrentEnv().getInternalCatalog().getId(), db.getId());
|
||||
} finally {
|
||||
db.writeUnlock();
|
||||
@ -861,6 +854,7 @@ public class InternalCatalog implements CatalogIf<Database> {
|
||||
public void dropTable(DropTableStmt stmt) throws DdlException {
|
||||
String dbName = stmt.getDbName();
|
||||
String tableName = stmt.getTableName();
|
||||
LOG.info("begin to drop table: {} from db: {}, is force: {}", tableName, dbName, stmt.isForceDrop());
|
||||
|
||||
// check database
|
||||
Database db = (Database) getDbOrDdlException(dbName);
|
||||
@ -943,13 +937,7 @@ public class InternalCatalog implements CatalogIf<Database> {
|
||||
}
|
||||
|
||||
db.dropTable(table.getName());
|
||||
if (!isForceDrop) {
|
||||
Env.getCurrentRecycleBin().recycleTable(db.getId(), table, isReplay, recycleTime);
|
||||
} else {
|
||||
if (table.getType() == TableType.OLAP) {
|
||||
Env.getCurrentEnv().onEraseOlapTable((OlapTable) table, isReplay);
|
||||
}
|
||||
}
|
||||
Env.getCurrentRecycleBin().recycleTable(db.getId(), table, isReplay, isForceDrop, recycleTime);
|
||||
LOG.info("finished dropping table[{}] in db[{}]", table.getName(), db.getFullName());
|
||||
return true;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user