From fe52988ef86de29795c64caf8d4d9efed4186bcb Mon Sep 17 00:00:00 2001 From: yujun Date: Fri, 20 Oct 2023 19:40:44 +0800 Subject: [PATCH] [improvement](tablet clone) tablet balance ignore deleted partitions (#25499) --- .../org/apache/doris/catalog/CatalogRecycleBin.java | 11 +++++++++++ .../apache/doris/catalog/TabletInvertedIndex.java | 13 +++++++++++++ .../org/apache/doris/clone/BeLoadRebalancer.java | 13 +++++++++++++ .../java/org/apache/doris/clone/DiskRebalancer.java | 12 ++++++++++++ .../org/apache/doris/clone/DiskRebalanceTest.java | 1 + .../java/org/apache/doris/clone/RebalanceTest.java | 1 + 6 files changed, 51 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java index 890a76ee24..b28a7fd08d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java @@ -194,6 +194,17 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable { idToRecycleTime.put(id, recycleTime); } + public synchronized boolean isRecyclePartition(long dbId, long tableId, long partitionId) { + return idToDatabase.containsKey(dbId) || idToTable.containsKey(tableId) + || idToPartition.containsKey(partitionId); + } + + public synchronized void getRecycleIds(Set<Long> dbIds, Set<Long> tableIds, Set<Long> partitionIds) { + dbIds.addAll(idToDatabase.keySet()); + tableIds.addAll(idToTable.keySet()); + partitionIds.addAll(idToPartition.keySet()); + } + private synchronized boolean isExpire(long id, long currentTimeMs) { long latency = currentTimeMs - idToRecycleTime.get(id); return latency > minEraseLatency && latency > Config.catalog_trash_expire_second * 1000L; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java index a2d5983aac..c1b7ca293b 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java @@ -19,6 +19,7 @@ package org.apache.doris.catalog; import org.apache.doris.catalog.Replica.ReplicaState; import org.apache.doris.common.Config; +import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; import org.apache.doris.cooldown.CooldownConf; import org.apache.doris.task.PublishVersionTask; @@ -41,6 +42,7 @@ import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; +import com.google.common.collect.Sets; import com.google.common.collect.Table; import com.google.common.collect.TreeMultimap; import org.apache.logging.log4j.LogManager; @@ -714,6 +716,13 @@ public class TabletInvertedIndex { // Only build from available bes, exclude colocate tables public Map> buildPartitionInfoBySkew( List availableBeIds) { + Set<Long> dbIds = Sets.newHashSet(); + Set<Long> tableIds = Sets.newHashSet(); + Set<Long> partitionIds = Sets.newHashSet(); + // Clone ut mocked env, but CatalogRecycleBin is not mockable (it extends from Thread) + if (!FeConstants.runningUnitTest) { + Env.getCurrentRecycleBin().getRecycleIds(dbIds, tableIds, partitionIds); + } long stamp = readLock(); // 1. 
gen > @@ -733,6 +742,10 @@ public class TabletInvertedIndex { try { Preconditions.checkState(availableBeIds.contains(beId), "dead be " + beId); TabletMeta tabletMeta = tabletMetaMap.get(tabletId); Preconditions.checkNotNull(tabletMeta, "invalid tablet " + tabletId); + if (dbIds.contains(tabletMeta.getDbId()) || tableIds.contains(tabletMeta.getTableId()) + || partitionIds.contains(tabletMeta.getPartitionId())) { + continue; + } Preconditions.checkState( !Env.getCurrentColocateIndex().isColocateTable(tabletMeta.getTableId()), diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java index 1871d4d43a..4e52024c7b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java @@ -17,6 +17,7 @@ package org.apache.doris.clone; +import org.apache.doris.catalog.CatalogRecycleBin; import org.apache.doris.catalog.ColocateTableIndex; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Replica; @@ -29,6 +30,7 @@ import org.apache.doris.clone.SchedException.SubCode; import org.apache.doris.clone.TabletSchedCtx.Priority; import org.apache.doris.clone.TabletScheduler.PathSlot; import org.apache.doris.common.Config; +import org.apache.doris.common.FeConstants; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TStorageMedium; @@ -115,6 +117,12 @@ public class BeLoadRebalancer extends Rebalancer { } LOG.info("get number of low load paths: {}, with medium: {}", numOfLowPaths, medium); + // Clone ut mocked env, but CatalogRecycleBin is not mockable (it extends from Thread) + // so in clone ut recycleBin need to set to null. 
+ CatalogRecycleBin recycleBin = null; + if (!FeConstants.runningUnitTest) { + recycleBin = Env.getCurrentRecycleBin(); + } int clusterAvailableBEnum = infoService.getAllBackendIds(true).size(); ColocateTableIndex colocateTableIndex = Env.getCurrentColocateIndex(); // choose tablets from high load backends. @@ -178,6 +186,11 @@ public class BeLoadRebalancer extends Rebalancer { continue; } + if (recycleBin != null && recycleBin.isRecyclePartition(tabletMeta.getDbId(), + tabletMeta.getTableId(), tabletMeta.getPartitionId())) { + continue; + } + TabletSchedCtx tabletCtx = new TabletSchedCtx(TabletSchedCtx.Type.BALANCE, tabletMeta.getDbId(), tabletMeta.getTableId(), tabletMeta.getPartitionId(), tabletMeta.getIndexId(), tabletId, null /* replica alloc is not used for balance*/, diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java index 63554e17b1..5edca91444 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java @@ -17,6 +17,7 @@ package org.apache.doris.clone; +import org.apache.doris.catalog.CatalogRecycleBin; import org.apache.doris.catalog.DataProperty; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; @@ -29,6 +30,7 @@ import org.apache.doris.clone.TabletSchedCtx.BalanceType; import org.apache.doris.clone.TabletSchedCtx.Priority; import org.apache.doris.clone.TabletScheduler.PathSlot; import org.apache.doris.common.Config; +import org.apache.doris.common.FeConstants; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TStorageMedium; @@ -160,6 +162,12 @@ public class DiskRebalancer extends Rebalancer { return alternativeTablets; } + // Clone ut mocked env, but CatalogRecycleBin is not mockable (it extends from Thread) + // so in clone ut recycleBin need to set to null. 
+ CatalogRecycleBin recycleBin = null; + if (!FeConstants.runningUnitTest) { + recycleBin = Env.getCurrentRecycleBin(); + } Set alternativeTabletIds = Sets.newHashSet(); Set unbalancedBEs = Sets.newHashSet(); // choose tablets from backends randomly. @@ -222,6 +230,10 @@ public class DiskRebalancer extends Rebalancer { if (tabletMeta == null) { continue; } + if (recycleBin != null && recycleBin.isRecyclePartition(tabletMeta.getDbId(), + tabletMeta.getTableId(), tabletMeta.getPartitionId())) { + continue; + } TabletSchedCtx tabletCtx = new TabletSchedCtx(TabletSchedCtx.Type.BALANCE, tabletMeta.getDbId(), tabletMeta.getTableId(), tabletMeta.getPartitionId(), diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java index 457466d72a..0f62e637d0 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java @@ -80,6 +80,7 @@ public class DiskRebalanceTest { @Before public void setUp() throws Exception { + FeConstants.runningUnitTest = true; Config.used_capacity_percent_max_diff = 1.0; Config.balance_slot_num_per_path = 1; db = new Database(1, "test db"); diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java index fe47338398..bc53ce068e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java @@ -90,6 +90,7 @@ public class RebalanceTest { @Before public void setUp() throws Exception { + FeConstants.runningUnitTest = true; db = new Database(1, "test db"); db.setClusterName(SystemInfoService.DEFAULT_CLUSTER); new Expectations() {