[Enhancement](compaction) stop tablet compaction when table dropped (#18702)

* [Enhancement](compaction) stop tablet compaction when table dropped

* fix be ut
This commit is contained in:
gitccl
2023-04-24 11:04:27 +08:00
committed by GitHub
parent ab2a6864bc
commit 296b0c92f7
14 changed files with 207 additions and 7 deletions

View File

@ -162,6 +162,9 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable {
}
idToRecycleTime.put(table.getId(), recycleTime);
idToTable.put(table.getId(), tableInfo);
if (!Env.isCheckpointThread()) {
Env.getCurrentEnv().markTableDropped(table);
}
LOG.info("recycle table[{}-{}]", table.getId(), table.getName());
return true;
}
@ -180,6 +183,9 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable {
range, listPartitionItem, dataProperty, replicaAlloc, isInMemory, isMutable);
idToRecycleTime.put(partition.getId(), System.currentTimeMillis());
idToPartition.put(partition.getId(), partitionInfo);
if (!Env.isCheckpointThread()) {
Env.getCurrentEnv().markPartitionDropped(partition);
}
LOG.info("recycle partition[{}-{}]", partition.getId(), partition.getName());
return true;
}
@ -590,6 +596,9 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable {
iterator.remove();
idToRecycleTime.remove(table.getId());
tableNames.remove(table.getName());
if (!Env.isCheckpointThread()) {
Env.getCurrentEnv().unmarkTableDropped(table);
}
}
if (!tableNames.isEmpty()) {
@ -692,6 +701,10 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable {
RecoverInfo recoverInfo = new RecoverInfo(db.getId(), table.getId(), -1L, "", newTableName, "");
Env.getCurrentEnv().getEditLog().logRecoverTable(recoverInfo);
}
if (!Env.isCheckpointThread()) {
Env.getCurrentEnv().unmarkTableDropped(table);
}
} finally {
table.writeUnlock();
}
@ -774,6 +787,11 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable {
// log
RecoverInfo recoverInfo = new RecoverInfo(dbId, table.getId(), partitionId, "", "", newPartitionName);
Env.getCurrentEnv().getEditLog().logRecoverPartition(recoverInfo);
if (!Env.isCheckpointThread()) {
Env.getCurrentEnv().unmarkPartitionDropped(recoverPartition);
}
LOG.info("recover partition[{}]", partitionId);
}
@ -814,6 +832,10 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable {
iterator.remove();
idToRecycleTime.remove(partitionId);
if (!Env.isCheckpointThread()) {
Env.getCurrentEnv().unmarkPartitionDropped(recyclePartitionInfo.getPartition());
}
LOG.info("replay recover partition[{}]", partitionId);
break;
}
@ -842,7 +864,9 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable {
long indexId = index.getId();
int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
for (Tablet tablet : index.getTablets()) {
TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, medium);
// all tablets in RecycleBin are dropped
TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, medium,
true);
long tabletId = tablet.getId();
invertedIndex.addTablet(tabletId, tabletMeta);
for (Replica replica : tablet.getReplicas()) {
@ -894,7 +918,8 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable {
long indexId = index.getId();
int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
for (Tablet tablet : index.getTablets()) {
TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, medium);
TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, medium,
true);
long tabletId = tablet.getId();
invertedIndex.addTablet(tabletId, tabletMeta);
for (Replica replica : tablet.getReplicas()) {

View File

@ -5352,6 +5352,73 @@ public class Env {
return analysisManager.taskScheduler;
}
/**
* mark all tablets of the table as dropped
*/
public void markTableDropped(Table table) {
if (table.getType() != TableType.OLAP) {
return;
}
OlapTable olapTable = (OlapTable) table;
for (Partition partition : olapTable.getAllPartitions()) {
innerMarkPartitionDropped(partition, true);
}
LOG.info("mark all tablets of table: {} as dropped", table.getName());
}
/**
* mark all tablets of the table as undropped
*/
public void unmarkTableDropped(Table table) {
if (table.getType() != TableType.OLAP) {
return;
}
OlapTable olapTable = (OlapTable) table;
for (Partition partition : olapTable.getAllPartitions()) {
innerMarkPartitionDropped(partition, false);
}
LOG.info("mark all tablets of table: {} as undropped", table.getName());
}
/**
* mark all tablets of the partition as dropped
*/
public void markPartitionDropped(Partition partition) {
innerMarkPartitionDropped(partition, true);
LOG.info("mark all tablets of partition: {} as dropped", partition.getName());
}
/**
* mark all tablets of the partition as undropped
*/
public void unmarkPartitionDropped(Partition partition) {
innerMarkPartitionDropped(partition, false);
LOG.info("mark all tablets of partition: {} as undropped", partition.getName());
}
private void innerMarkPartitionDropped(Partition partition, boolean isDropped) {
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
List<MaterializedIndex> allIndices = partition.getMaterializedIndices(IndexExtState.ALL);
for (MaterializedIndex materializedIndex : allIndices) {
for (Tablet tablet : materializedIndex.getTablets()) {
TabletMeta tabletMeta = invertedIndex.getTabletMeta(tablet.getId());
if (tabletMeta == null) {
LOG.warn("cannot find tabletMeta of tabletId={}", tablet.getId());
continue;
}
if (tabletMeta.getIsDropped() == isDropped) {
continue;
}
tabletMeta.setIsDropped(isDropped);
} // end for tablets
} // end for indices
}
// TODO:
// 1. handle partition level analysis statement properly
// 2. support sample job

View File

@ -128,6 +128,7 @@ public class TabletInvertedIndex {
ListMultimap<Long, Long> transactionsToClear,
ListMultimap<Long, Long> tabletRecoveryMap,
List<Triple<Long, Integer, Boolean>> tabletToInMemory,
List<Triple<Long, Integer, Boolean>> tabletToIsDropped,
List<CooldownConf> cooldownConfToPush,
List<CooldownConf> cooldownConfToUpdate) {
long stamp = readLock();
@ -155,6 +156,15 @@ public class TabletInvertedIndex {
backendTabletInfo.getSchemaHash(), !backendTabletInfo.isIsInMemory()));
}
}
if (tabletMeta.getIsDropped() != backendTabletInfo.isIsDropped()) {
synchronized (tabletToIsDropped) {
tabletToIsDropped.add(
new ImmutableTriple<>(tabletId, backendTabletInfo.getSchemaHash(),
tabletMeta.getIsDropped()));
}
}
// 1. (intersection)
if (needSync(replica, backendTabletInfo)) {
// need sync
@ -319,10 +329,10 @@ public class TabletInvertedIndex {
LOG.info("finished to do tablet diff with backend[{}]. sync: {}."
+ " metaDel: {}. foundInMeta: {}. migration: {}. "
+ "found invalid transactions {}. found republish transactions {}. tabletInMemorySync: {}."
+ " need recovery: {}. cost: {} ms", backendId, tabletSyncMap.size(),
+ " need recovery: {}. tabletToIsDropped: {}. cost: {} ms", backendId, tabletSyncMap.size(),
tabletDeleteFromMeta.size(), tabletFoundInMeta.size(), tabletMigrationMap.size(),
transactionsToClear.size(), transactionsToPublish.size(), tabletToInMemory.size(),
tabletRecoveryMap.size(), (end - start));
tabletRecoveryMap.size(), tabletToIsDropped.size(), (end - start));
}
public Long getTabletIdByReplica(long replicaId) {

View File

@ -35,8 +35,15 @@ public class TabletMeta {
private TStorageMedium storageMedium;
private boolean isDropped;
public TabletMeta(long dbId, long tableId, long partitionId, long indexId, int schemaHash,
TStorageMedium storageMedium) {
this(dbId, tableId, partitionId, indexId, schemaHash, storageMedium, false);
}
public TabletMeta(long dbId, long tableId, long partitionId, long indexId, int schemaHash,
TStorageMedium storageMedium, boolean isDropped) {
this.dbId = dbId;
this.tableId = tableId;
this.partitionId = partitionId;
@ -46,6 +53,7 @@ public class TabletMeta {
this.newSchemaHash = -1;
this.storageMedium = storageMedium;
this.isDropped = isDropped;
}
public long getDbId() {
@ -76,6 +84,14 @@ public class TabletMeta {
return this.oldSchemaHash;
}
public boolean getIsDropped() {
return isDropped;
}
public void setIsDropped(boolean isDropped) {
this.isDropped = isDropped;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
@ -88,4 +104,5 @@ public class TabletMeta {
return sb.toString();
}
}

View File

@ -422,6 +422,9 @@ public class ReportHandler extends Daemon {
List<Triple<Long, Integer, Boolean>> tabletToInMemory = Lists.newArrayList();
// <tablet id, tablet schema hash, tablet is dropped>
List<Triple<Long, Integer, Boolean>> tabletToIsDropped = Lists.newArrayList();
List<CooldownConf> cooldownConfToPush = new LinkedList<>();
List<CooldownConf> cooldownConfToUpdate = new LinkedList<>();
@ -435,6 +438,7 @@ public class ReportHandler extends Daemon {
transactionsToClear,
tabletRecoveryMap,
tabletToInMemory,
tabletToIsDropped,
cooldownConfToPush,
cooldownConfToUpdate);
@ -479,6 +483,11 @@ public class ReportHandler extends Daemon {
handleSetTabletInMemory(backendId, tabletToInMemory);
}
// 10. send mark tablet isDropped to be
if (!tabletToIsDropped.isEmpty()) {
handleMarkTabletIsDropped(backendId, tabletToIsDropped);
}
// handle cooldown conf
if (!cooldownConfToPush.isEmpty()) {
handlePushCooldownConf(backendId, cooldownConfToPush);
@ -1039,6 +1048,14 @@ public class ReportHandler extends Daemon {
AgentTaskExecutor.submit(batchTask);
}
private static void handleMarkTabletIsDropped(long backendId,
List<Triple<Long, Integer, Boolean>> tabletToIsDropped) {
AgentBatchTask batchTask = new AgentBatchTask();
UpdateTabletMetaInfoTask task = new UpdateTabletMetaInfoTask(tabletToIsDropped, backendId);
batchTask.addTask(task);
AgentTaskExecutor.submit(batchTask);
}
private static void handleClearTransactions(ListMultimap<Long, Long> transactionsToClear, long backendId) {
AgentBatchTask batchTask = new AgentBatchTask();
for (Long transactionId : transactionsToClear.keySet()) {

View File

@ -51,6 +51,9 @@ public class UpdateTabletMetaInfoTask extends AgentTask {
// <tablet id, tablet schema hash, tablet in memory>
private List<Triple<Long, Integer, Boolean>> tabletToInMemory;
// <tablet id, tablet schema hash, tablet is dropped>
private List<Triple<Long, Integer, Boolean>> tabletToIsDropped;
public UpdateTabletMetaInfoTask(long backendId, Set<Pair<Long, Integer>> tableIdWithSchemaHash,
TTabletMetaType metaType) {
super(null, backendId, TTaskType.UPDATE_TABLET_META_INFO,
@ -77,6 +80,13 @@ public class UpdateTabletMetaInfoTask extends AgentTask {
this.tabletToInMemory = tabletToInMemory;
}
public UpdateTabletMetaInfoTask(List<Triple<Long, Integer, Boolean>> tabletToIsDropped, long backendId) {
super(null, backendId, TTaskType.UPDATE_TABLET_META_INFO,
-1L, -1L, -1L, -1L, -1L, tabletToIsDropped.hashCode());
this.metaType = TTabletMetaType.MARKDROP;
this.tabletToIsDropped = tabletToIsDropped;
}
public void countDownLatch(long backendId, Set<Pair<Long, Integer>> tablets) {
if (this.latch != null) {
if (latch.markedCountDown(backendId, tablets)) {
@ -154,6 +164,17 @@ public class UpdateTabletMetaInfoTask extends AgentTask {
}
break;
}
case MARKDROP: {
for (Triple<Long, Integer, Boolean> triple : tabletToIsDropped) {
TTabletMetaInfo metaInfo = new TTabletMetaInfo();
metaInfo.setTabletId(triple.getLeft());
metaInfo.setSchemaHash(triple.getMiddle());
metaInfo.setIsDropped(triple.getRight());
metaInfo.setMetaType(metaType);
metaInfos.add(metaInfo);
}
break;
}
default:
break;
}