[improvement](tablet clone) tablet balance ignore deleted partitions (#25499)
This commit is contained in:
@ -194,6 +194,17 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable {
|
||||
idToRecycleTime.put(id, recycleTime);
|
||||
}
|
||||
|
||||
public synchronized boolean isRecyclePartition(long dbId, long tableId, long partitionId) {
|
||||
return idToDatabase.containsKey(dbId) || idToTable.containsKey(tableId)
|
||||
|| idToPartition.containsKey(partitionId);
|
||||
}
|
||||
|
||||
public synchronized void getRecycleIds(Set<Long> dbIds, Set<Long> tableIds, Set<Long> partitionIds) {
|
||||
dbIds.addAll(idToDatabase.keySet());
|
||||
tableIds.addAll(idToTable.keySet());
|
||||
partitionIds.addAll(idToPartition.keySet());
|
||||
}
|
||||
|
||||
private synchronized boolean isExpire(long id, long currentTimeMs) {
|
||||
long latency = currentTimeMs - idToRecycleTime.get(id);
|
||||
return latency > minEraseLatency && latency > Config.catalog_trash_expire_second * 1000L;
|
||||
|
||||
@ -19,6 +19,7 @@ package org.apache.doris.catalog;
|
||||
|
||||
import org.apache.doris.catalog.Replica.ReplicaState;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.cooldown.CooldownConf;
|
||||
import org.apache.doris.task.PublishVersionTask;
|
||||
@ -41,6 +42,7 @@ import com.google.common.collect.ListMultimap;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Ordering;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.collect.Table;
|
||||
import com.google.common.collect.TreeMultimap;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
@ -714,6 +716,13 @@ public class TabletInvertedIndex {
|
||||
// Only build from available bes, exclude colocate tables
|
||||
public Map<TStorageMedium, TreeMultimap<Long, PartitionBalanceInfo>> buildPartitionInfoBySkew(
|
||||
List<Long> availableBeIds) {
|
||||
Set<Long> dbIds = Sets.newHashSet();
|
||||
Set<Long> tableIds = Sets.newHashSet();
|
||||
Set<Long> partitionIds = Sets.newHashSet();
|
||||
// Clone ut mocked env, but CatalogRecycleBin is not mockable (it extends from Thread)
|
||||
if (!FeConstants.runningUnitTest) {
|
||||
Env.getCurrentRecycleBin().getRecycleIds(dbIds, tableIds, partitionIds);
|
||||
}
|
||||
long stamp = readLock();
|
||||
|
||||
// 1. gen <partitionId-indexId, <beId, replicaCount>>
|
||||
@ -733,6 +742,10 @@ public class TabletInvertedIndex {
|
||||
try {
|
||||
Preconditions.checkState(availableBeIds.contains(beId), "dead be " + beId);
|
||||
TabletMeta tabletMeta = tabletMetaMap.get(tabletId);
|
||||
if (dbIds.contains(tabletMeta.getDbId()) || tableIds.contains(tabletMeta.getTableId())
|
||||
|| partitionIds.contains(tabletMeta.getPartitionId())) {
|
||||
continue;
|
||||
}
|
||||
Preconditions.checkNotNull(tabletMeta, "invalid tablet " + tabletId);
|
||||
Preconditions.checkState(
|
||||
!Env.getCurrentColocateIndex().isColocateTable(tabletMeta.getTableId()),
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
|
||||
package org.apache.doris.clone;
|
||||
|
||||
import org.apache.doris.catalog.CatalogRecycleBin;
|
||||
import org.apache.doris.catalog.ColocateTableIndex;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.Replica;
|
||||
@ -29,6 +30,7 @@ import org.apache.doris.clone.SchedException.SubCode;
|
||||
import org.apache.doris.clone.TabletSchedCtx.Priority;
|
||||
import org.apache.doris.clone.TabletScheduler.PathSlot;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
import org.apache.doris.thrift.TStorageMedium;
|
||||
@ -115,6 +117,12 @@ public class BeLoadRebalancer extends Rebalancer {
|
||||
}
|
||||
LOG.info("get number of low load paths: {}, with medium: {}", numOfLowPaths, medium);
|
||||
|
||||
// Clone ut mocked env, but CatalogRecycleBin is not mockable (it extends from Thread)
|
||||
// so in clone ut recycleBin need to set to null.
|
||||
CatalogRecycleBin recycleBin = null;
|
||||
if (!FeConstants.runningUnitTest) {
|
||||
recycleBin = Env.getCurrentRecycleBin();
|
||||
}
|
||||
int clusterAvailableBEnum = infoService.getAllBackendIds(true).size();
|
||||
ColocateTableIndex colocateTableIndex = Env.getCurrentColocateIndex();
|
||||
// choose tablets from high load backends.
|
||||
@ -178,6 +186,11 @@ public class BeLoadRebalancer extends Rebalancer {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (recycleBin != null && recycleBin.isRecyclePartition(tabletMeta.getDbId(),
|
||||
tabletMeta.getTableId(), tabletMeta.getPartitionId())) {
|
||||
continue;
|
||||
}
|
||||
|
||||
TabletSchedCtx tabletCtx = new TabletSchedCtx(TabletSchedCtx.Type.BALANCE,
|
||||
tabletMeta.getDbId(), tabletMeta.getTableId(), tabletMeta.getPartitionId(),
|
||||
tabletMeta.getIndexId(), tabletId, null /* replica alloc is not used for balance*/,
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
|
||||
package org.apache.doris.clone;
|
||||
|
||||
import org.apache.doris.catalog.CatalogRecycleBin;
|
||||
import org.apache.doris.catalog.DataProperty;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.Env;
|
||||
@ -29,6 +30,7 @@ import org.apache.doris.clone.TabletSchedCtx.BalanceType;
|
||||
import org.apache.doris.clone.TabletSchedCtx.Priority;
|
||||
import org.apache.doris.clone.TabletScheduler.PathSlot;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
import org.apache.doris.thrift.TStorageMedium;
|
||||
|
||||
@ -160,6 +162,12 @@ public class DiskRebalancer extends Rebalancer {
|
||||
return alternativeTablets;
|
||||
}
|
||||
|
||||
// Clone ut mocked env, but CatalogRecycleBin is not mockable (it extends from Thread)
|
||||
// so in clone ut recycleBin need to set to null.
|
||||
CatalogRecycleBin recycleBin = null;
|
||||
if (!FeConstants.runningUnitTest) {
|
||||
recycleBin = Env.getCurrentRecycleBin();
|
||||
}
|
||||
Set<Long> alternativeTabletIds = Sets.newHashSet();
|
||||
Set<Long> unbalancedBEs = Sets.newHashSet();
|
||||
// choose tablets from backends randomly.
|
||||
@ -222,6 +230,10 @@ public class DiskRebalancer extends Rebalancer {
|
||||
if (tabletMeta == null) {
|
||||
continue;
|
||||
}
|
||||
if (recycleBin != null && recycleBin.isRecyclePartition(tabletMeta.getDbId(),
|
||||
tabletMeta.getTableId(), tabletMeta.getPartitionId())) {
|
||||
continue;
|
||||
}
|
||||
|
||||
TabletSchedCtx tabletCtx = new TabletSchedCtx(TabletSchedCtx.Type.BALANCE,
|
||||
tabletMeta.getDbId(), tabletMeta.getTableId(), tabletMeta.getPartitionId(),
|
||||
|
||||
@ -80,6 +80,7 @@ public class DiskRebalanceTest {
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
FeConstants.runningUnitTest = true;
|
||||
Config.used_capacity_percent_max_diff = 1.0;
|
||||
Config.balance_slot_num_per_path = 1;
|
||||
db = new Database(1, "test db");
|
||||
|
||||
@ -90,6 +90,7 @@ public class RebalanceTest {
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
FeConstants.runningUnitTest = true;
|
||||
db = new Database(1, "test db");
|
||||
db.setClusterName(SystemInfoService.DEFAULT_CLUSTER);
|
||||
new Expectations() {
|
||||
|
||||
Reference in New Issue
Block a user