From 296b0c92f702675b92eee3c8af219f3862802fb2 Mon Sep 17 00:00:00 2001 From: gitccl <60637740+gitccl@users.noreply.github.com> Date: Mon, 24 Apr 2023 11:04:27 +0800 Subject: [PATCH] [Enhancement](compaction) stop tablet compaction when table dropped (#18702) * [Enhancement](compaction) stop tablet compaction when table dropped * fix be ut --- be/src/agent/task_worker_pool.cpp | 8 +++ be/src/olap/tablet.cpp | 5 ++ be/src/olap/tablet_meta.cpp | 13 +++- be/src/olap/tablet_meta.h | 14 ++++ .../olap/test_data/header_without_inc_rs.txt | 3 +- .../doris/catalog/CatalogRecycleBin.java | 29 +++++++- .../java/org/apache/doris/catalog/Env.java | 67 +++++++++++++++++++ .../doris/catalog/TabletInvertedIndex.java | 14 +++- .../org/apache/doris/catalog/TabletMeta.java | 17 +++++ .../apache/doris/master/ReportHandler.java | 17 +++++ .../doris/task/UpdateTabletMetaInfoTask.java | 21 ++++++ gensrc/proto/olap_file.proto | 1 + gensrc/thrift/AgentService.thrift | 4 +- gensrc/thrift/MasterService.thrift | 1 + 14 files changed, 207 insertions(+), 7 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index f93cee6a6b..b41e2b83e0 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -1020,6 +1020,14 @@ void TaskWorkerPool::_update_tablet_meta_worker_thread_callback() { tablet_meta_info.is_in_memory); } break; + case TTabletMetaType::MARKDROP: + if (tablet_meta_info.__isset.is_dropped) { + tablet->tablet_meta()->set_is_dropped(tablet_meta_info.is_dropped); + LOG_INFO("successfully set tablet is_dropped") + .tag("tablet_id", tablet_meta_info.tablet_id) + .tag("is_dropped", tablet_meta_info.is_dropped); + } + break; } } tablet->save_meta(); diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 96311c4c77..90a42401ae 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -977,6 +977,10 @@ bool Tablet::can_do_compaction(size_t path_hash, CompactionType compaction_type) return false; } + if (_tablet_meta->is_dropped()) { + return false; + } + if (tablet_state() == TABLET_NOTREADY) { // In TABLET_NOTREADY, we keep last 10 versions in new tablet so base tablet max_version // not merged in new tablet and then we can do compaction @@ -1576,6 +1580,7 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info, // tablet may not have cooldowned data, but the storage policy is set tablet_info->__set_cooldown_term(_cooldown_term); } + tablet_info->__set_is_dropped(_tablet_meta->is_dropped()); } // should use this method to get a copy of current tablet meta diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index f5548e8061..67de2b83ea 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -253,6 +253,8 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id schema->set_store_row_column(tablet_schema.store_row_column); } + tablet_meta_pb.set_is_dropped(false); + init_from_pb(tablet_meta_pb); } @@ -276,7 +278,8 @@ TabletMeta::TabletMeta(const TabletMeta& b) _storage_policy_id(b._storage_policy_id), _cooldown_meta_id(b._cooldown_meta_id), _enable_unique_key_merge_on_write(b._enable_unique_key_merge_on_write), - _delete_bitmap(b._delete_bitmap) {}; + _delete_bitmap(b._delete_bitmap), + _is_dropped(b._is_dropped) {}; void TabletMeta::init_column_from_tcolumn(uint32_t unique_id, const TColumn& tcolumn, ColumnPB* column) { @@ -552,6 +555,12 @@ void TabletMeta::init_from_pb(const TabletMetaPB& tablet_meta_pb) { delete_bitmap().delete_bitmap[{rst_id, seg_id, ver}] = roaring::Roaring::read(bitmap); } } + + if (tablet_meta_pb.has_is_dropped()) { + _is_dropped = tablet_meta_pb.is_dropped(); + } else { + _is_dropped = false; + } } void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) { @@ -625,6 +634,8 @@ void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) { *(delete_bitmap_pb->add_segment_delete_bitmaps()) = std::move(bitmap_data); } } + + tablet_meta_pb->set_is_dropped(_is_dropped); } uint32_t TabletMeta::mem_size() const { diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index d2870a8dfe..0acf5ddaee 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -217,6 +217,9 @@ public: bool enable_unique_key_merge_on_write() const { return _enable_unique_key_merge_on_write; } + bool is_dropped() const; + void set_is_dropped(bool is_dropped); + private: Status _save_meta(DataDir* data_dir); @@ -260,6 +263,9 @@ private: bool _enable_unique_key_merge_on_write = false; std::shared_ptr _delete_bitmap; + // _is_dropped is true means that the table has been dropped, and the tablet will not be compacted + bool _is_dropped = false; + mutable std::shared_mutex _meta_lock; }; @@ -578,6 +584,14 @@ inline bool TabletMeta::all_beta() const { return true; } +inline bool TabletMeta::is_dropped() const { + return _is_dropped; +} + +inline void TabletMeta::set_is_dropped(bool is_dropped) { + _is_dropped = is_dropped; +} + // Only for unit test now. bool operator==(const TabletMeta& a, const TabletMeta& b); bool operator!=(const TabletMeta& a, const TabletMeta& b); diff --git a/be/test/olap/test_data/header_without_inc_rs.txt b/be/test/olap/test_data/header_without_inc_rs.txt index e96b93e8cf..1824da3ab8 100644 --- a/be/test/olap/test_data/header_without_inc_rs.txt +++ b/be/test/olap/test_data/header_without_inc_rs.txt @@ -108,5 +108,6 @@ "preferred_rowset_type": "BETA_ROWSET", "tablet_type": "TABLET_TYPE_DISK", "replica_id": 0, - "enable_unique_key_merge_on_write": false + "enable_unique_key_merge_on_write": false, + "is_dropped": false } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java index 8fcab41cff..04964f55bc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java @@ -162,6 +162,9 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable { } idToRecycleTime.put(table.getId(), recycleTime); idToTable.put(table.getId(), tableInfo); + if (!Env.isCheckpointThread()) { + Env.getCurrentEnv().markTableDropped(table); + } LOG.info("recycle table[{}-{}]", table.getId(), table.getName()); return true; } @@ -180,6 +183,9 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable { range, listPartitionItem, dataProperty, replicaAlloc, isInMemory, isMutable); idToRecycleTime.put(partition.getId(), System.currentTimeMillis()); idToPartition.put(partition.getId(), partitionInfo); + if (!Env.isCheckpointThread()) { + Env.getCurrentEnv().markPartitionDropped(partition); + } LOG.info("recycle partition[{}-{}]", partition.getId(), partition.getName()); return true; } @@ -590,6 +596,9 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable { iterator.remove(); idToRecycleTime.remove(table.getId()); tableNames.remove(table.getName()); + if (!Env.isCheckpointThread()) { + Env.getCurrentEnv().unmarkTableDropped(table); + } } if (!tableNames.isEmpty()) { @@ -692,6 +701,10 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable { RecoverInfo recoverInfo = new RecoverInfo(db.getId(), table.getId(), -1L, "", newTableName, ""); Env.getCurrentEnv().getEditLog().logRecoverTable(recoverInfo); } + + if (!Env.isCheckpointThread()) { + Env.getCurrentEnv().unmarkTableDropped(table); + } } finally { table.writeUnlock(); } @@ -774,6 +787,11 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable { // log RecoverInfo recoverInfo = new RecoverInfo(dbId, table.getId(), partitionId, "", "", newPartitionName); Env.getCurrentEnv().getEditLog().logRecoverPartition(recoverInfo); + + if (!Env.isCheckpointThread()) { + Env.getCurrentEnv().unmarkPartitionDropped(recoverPartition); + } + LOG.info("recover partition[{}]", partitionId); } @@ -814,6 +832,10 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable { iterator.remove(); idToRecycleTime.remove(partitionId); + if (!Env.isCheckpointThread()) { + Env.getCurrentEnv().unmarkPartitionDropped(recyclePartitionInfo.getPartition()); + } + LOG.info("replay recover partition[{}]", partitionId); break; } @@ -842,7 +864,9 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable { long indexId = index.getId(); int schemaHash = olapTable.getSchemaHashByIndexId(indexId); for (Tablet tablet : index.getTablets()) { - TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, medium); + // all tablets in RecycleBin are dropped + TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, medium, + true); long tabletId = tablet.getId(); invertedIndex.addTablet(tabletId, tabletMeta); for (Replica replica : tablet.getReplicas()) { @@ -894,7 +918,8 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable { long indexId = index.getId(); int schemaHash = olapTable.getSchemaHashByIndexId(indexId); for (Tablet tablet : index.getTablets()) { - TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, medium); + TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, medium, + true); long tabletId = tablet.getId(); invertedIndex.addTablet(tabletId, tabletMeta); for (Replica replica : tablet.getReplicas()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 8332ecb2f7..1c02e30e19 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -5352,6 +5352,73 @@ public class Env { return analysisManager.taskScheduler; } + /** + * mark all tablets of the table as dropped + */ + public void markTableDropped(Table table) { + if (table.getType() != TableType.OLAP) { + return; + } + + OlapTable olapTable = (OlapTable) table; + for (Partition partition : olapTable.getAllPartitions()) { + innerMarkPartitionDropped(partition, true); + } + + LOG.info("mark all tablets of table: {} as dropped", table.getName()); + } + + /** + * mark all tablets of the table as undropped + */ + public void unmarkTableDropped(Table table) { + if (table.getType() != TableType.OLAP) { + return; + } + + OlapTable olapTable = (OlapTable) table; + for (Partition partition : olapTable.getAllPartitions()) { + innerMarkPartitionDropped(partition, false); + } + + LOG.info("mark all tablets of table: {} as undropped", table.getName()); + } + + /** + * mark all tablets of the partition as dropped + */ + public void markPartitionDropped(Partition partition) { + innerMarkPartitionDropped(partition, true); + LOG.info("mark all tablets of partition: {} as dropped", partition.getName()); + } + + /** + * mark all tablets of the partition as undropped + */ + public void unmarkPartitionDropped(Partition partition) { + innerMarkPartitionDropped(partition, false); + LOG.info("mark all tablets of partition: {} as undropped", partition.getName()); + } + + private void innerMarkPartitionDropped(Partition partition, boolean isDropped) { + TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); + List allIndices = partition.getMaterializedIndices(IndexExtState.ALL); + for (MaterializedIndex materializedIndex : allIndices) { + for (Tablet tablet : materializedIndex.getTablets()) { + TabletMeta tabletMeta = invertedIndex.getTabletMeta(tablet.getId()); + if (tabletMeta == null) { + LOG.warn("cannot find tabletMeta of tabletId={}", tablet.getId()); + continue; + } + if (tabletMeta.getIsDropped() == isDropped) { + continue; + } + + tabletMeta.setIsDropped(isDropped); + } // end for tablets + } // end for indices + } + // TODO: // 1. handle partition level analysis statement properly // 2. support sample job diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java index 52c7ca8786..23eed25a84 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java @@ -128,6 +128,7 @@ public class TabletInvertedIndex { ListMultimap transactionsToClear, ListMultimap tabletRecoveryMap, List> tabletToInMemory, + List> tabletToIsDropped, List cooldownConfToPush, List cooldownConfToUpdate) { long stamp = readLock(); @@ -155,6 +156,15 @@ public class TabletInvertedIndex { backendTabletInfo.getSchemaHash(), !backendTabletInfo.isIsInMemory())); } } + + if (tabletMeta.getIsDropped() != backendTabletInfo.isIsDropped()) { + synchronized (tabletToIsDropped) { + tabletToIsDropped.add( + new ImmutableTriple<>(tabletId, backendTabletInfo.getSchemaHash(), + tabletMeta.getIsDropped())); + } + } + // 1. (intersection) if (needSync(replica, backendTabletInfo)) { // need sync @@ -319,10 +329,10 @@ public class TabletInvertedIndex { LOG.info("finished to do tablet diff with backend[{}]. sync: {}." + " metaDel: {}. foundInMeta: {}. migration: {}. " + "found invalid transactions {}. found republish transactions {}. tabletInMemorySync: {}." - + " need recovery: {}. cost: {} ms", backendId, tabletSyncMap.size(), + + " need recovery: {}. tabletToIsDropped: {}. cost: {} ms", backendId, tabletSyncMap.size(), tabletDeleteFromMeta.size(), tabletFoundInMeta.size(), tabletMigrationMap.size(), transactionsToClear.size(), transactionsToPublish.size(), tabletToInMemory.size(), - tabletRecoveryMap.size(), (end - start)); + tabletRecoveryMap.size(), tabletToIsDropped.size(), (end - start)); } public Long getTabletIdByReplica(long replicaId) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletMeta.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletMeta.java index be291d93ec..616889725b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletMeta.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletMeta.java @@ -35,8 +35,15 @@ public class TabletMeta { private TStorageMedium storageMedium; + private boolean isDropped; + public TabletMeta(long dbId, long tableId, long partitionId, long indexId, int schemaHash, TStorageMedium storageMedium) { + this(dbId, tableId, partitionId, indexId, schemaHash, storageMedium, false); + } + + public TabletMeta(long dbId, long tableId, long partitionId, long indexId, int schemaHash, + TStorageMedium storageMedium, boolean isDropped) { this.dbId = dbId; this.tableId = tableId; this.partitionId = partitionId; @@ -46,6 +53,7 @@ public class TabletMeta { this.newSchemaHash = -1; this.storageMedium = storageMedium; + this.isDropped = isDropped; } public long getDbId() { @@ -76,6 +84,14 @@ public class TabletMeta { return this.oldSchemaHash; } + public boolean getIsDropped() { + return isDropped; + } + + public void setIsDropped(boolean isDropped) { + this.isDropped = isDropped; + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); @@ -88,4 +104,5 @@ public class TabletMeta { return sb.toString(); } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index d6dd0070ff..d986773db0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -422,6 +422,9 @@ public class ReportHandler extends Daemon { List> tabletToInMemory = Lists.newArrayList(); + // + List> tabletToIsDropped = Lists.newArrayList(); + List cooldownConfToPush = new LinkedList<>(); List cooldownConfToUpdate = new LinkedList<>(); @@ -435,6 +438,7 @@ public class ReportHandler extends Daemon { transactionsToClear, tabletRecoveryMap, tabletToInMemory, + tabletToIsDropped, cooldownConfToPush, cooldownConfToUpdate); @@ -479,6 +483,11 @@ public class ReportHandler extends Daemon { handleSetTabletInMemory(backendId, tabletToInMemory); } + // 10. send mark tablet isDropped to be + if (!tabletToIsDropped.isEmpty()) { + handleMarkTabletIsDropped(backendId, tabletToIsDropped); + } + // handle cooldown conf if (!cooldownConfToPush.isEmpty()) { handlePushCooldownConf(backendId, cooldownConfToPush); @@ -1039,6 +1048,14 @@ public class ReportHandler extends Daemon { AgentTaskExecutor.submit(batchTask); } + private static void handleMarkTabletIsDropped(long backendId, + List> tabletToIsDropped) { + AgentBatchTask batchTask = new AgentBatchTask(); + UpdateTabletMetaInfoTask task = new UpdateTabletMetaInfoTask(tabletToIsDropped, backendId); + batchTask.addTask(task); + AgentTaskExecutor.submit(batchTask); + } + private static void handleClearTransactions(ListMultimap transactionsToClear, long backendId) { AgentBatchTask batchTask = new AgentBatchTask(); for (Long transactionId : transactionsToClear.keySet()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java index a39197f5ad..ac40776878 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java @@ -51,6 +51,9 @@ public class UpdateTabletMetaInfoTask extends AgentTask { // private List> tabletToInMemory; + // + private List> tabletToIsDropped; + public UpdateTabletMetaInfoTask(long backendId, Set> tableIdWithSchemaHash, TTabletMetaType metaType) { super(null, backendId, TTaskType.UPDATE_TABLET_META_INFO, @@ -77,6 +80,13 @@ public class UpdateTabletMetaInfoTask extends AgentTask { this.tabletToInMemory = tabletToInMemory; } + public UpdateTabletMetaInfoTask(List> tabletToIsDropped, long backendId) { + super(null, backendId, TTaskType.UPDATE_TABLET_META_INFO, + -1L, -1L, -1L, -1L, -1L, tabletToIsDropped.hashCode()); + this.metaType = TTabletMetaType.MARKDROP; + this.tabletToIsDropped = tabletToIsDropped; + } + public void countDownLatch(long backendId, Set> tablets) { if (this.latch != null) { if (latch.markedCountDown(backendId, tablets)) { @@ -154,6 +164,17 @@ public class UpdateTabletMetaInfoTask extends AgentTask { } break; } + case MARKDROP: { + for (Triple triple : tabletToIsDropped) { + TTabletMetaInfo metaInfo = new TTabletMetaInfo(); + metaInfo.setTabletId(triple.getLeft()); + metaInfo.setSchemaHash(triple.getMiddle()); + metaInfo.setIsDropped(triple.getRight()); + metaInfo.setMetaType(metaType); + metaInfos.add(metaInfo); + } + break; + } default: break; } diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto index 4d9c05b369..a3c238924d 100644 --- a/gensrc/proto/olap_file.proto +++ b/gensrc/proto/olap_file.proto @@ -298,6 +298,7 @@ message TabletMetaPB { optional bool enable_unique_key_merge_on_write = 24 [default = false]; optional int64 storage_policy_id = 25; optional PUniqueId cooldown_meta_id = 26; + optional bool is_dropped = 27; } message OLAPRawDeltaHeaderMessage { diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift index 91b2927705..11d5a5f024 100644 --- a/gensrc/thrift/AgentService.thrift +++ b/gensrc/thrift/AgentService.thrift @@ -355,7 +355,8 @@ struct TRecoverTabletReq { enum TTabletMetaType { PARTITIONID, - INMEMORY + INMEMORY, + MARKDROP } struct TTabletMetaInfo { @@ -366,6 +367,7 @@ struct TTabletMetaInfo { 5: optional bool is_in_memory // 6: optional string storage_policy; 7: optional i64 storage_policy_id + 8: optional bool is_dropped } struct TUpdateTabletMetaInfoReq { diff --git a/gensrc/thrift/MasterService.thrift b/gensrc/thrift/MasterService.thrift index 893bb0171a..446ed1cec8 100644 --- a/gensrc/thrift/MasterService.thrift +++ b/gensrc/thrift/MasterService.thrift @@ -46,6 +46,7 @@ struct TTabletInfo { // 18: optional bool is_cooldown 19: optional i64 cooldown_term 20: optional Types.TUniqueId cooldown_meta_id + 21: optional bool is_dropped } struct TFinishTaskRequest {