[improvement](balance) don't balance tablet which has unfinish alter job #39121 (#39202)

cherry pick from #39121
This commit is contained in:
yujun
2024-08-13 09:33:26 +08:00
committed by GitHub
parent ee6f9d0e90
commit f5e896f6eb
7 changed files with 64 additions and 36 deletions

View File

@ -78,6 +78,7 @@ import org.apache.doris.thrift.TTabletType;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -86,6 +87,7 @@ import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
public class Alter {
private static final Logger LOG = LogManager.getLogger(Alter.class);
@ -892,6 +894,27 @@ public class Alter {
}
}
public Set<Long> getUnfinishedAlterTableIds() {
Set<Long> unfinishedTableIds = Sets.newHashSet();
for (AlterJobV2 job : schemaChangeHandler.getAlterJobsV2().values()) {
if (!job.isDone()) {
unfinishedTableIds.add(job.getTableId());
}
}
for (IndexChangeJob job : ((SchemaChangeHandler) schemaChangeHandler).getIndexChangeJobs().values()) {
if (!job.isDone()) {
unfinishedTableIds.add(job.getTableId());
}
}
for (AlterJobV2 job : materializedViewHandler.getAlterJobsV2().values()) {
if (!job.isDone()) {
unfinishedTableIds.add(job.getTableId());
}
}
return unfinishedTableIds;
}
public AlterHandler getSchemaChangeHandler() {
return schemaChangeHandler;
}

View File

@ -1734,6 +1734,10 @@ public class SchemaChangeHandler extends AlterHandler {
}
}
public Map<Long, IndexChangeJob> getIndexChangeJobs() {
return indexChangeJobs;
}
public List<List<Comparable>> getAllIndexChangeJobInfos() {
List<List<Comparable>> indexChangeJobInfos = new LinkedList<>();
for (IndexChangeJob indexChangeJob : ImmutableList.copyOf(indexChangeJobs.values())) {

View File

@ -17,9 +17,6 @@
package org.apache.doris.clone;
import org.apache.doris.catalog.CatalogRecycleBin;
import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
@ -31,7 +28,6 @@ import org.apache.doris.clone.SchedException.SubCode;
import org.apache.doris.clone.TabletSchedCtx.Priority;
import org.apache.doris.clone.TabletScheduler.PathSlot;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
@ -120,15 +116,7 @@ public class BeLoadRebalancer extends Rebalancer {
LOG.info("get number of low load paths: {}, with medium: {}", numOfLowPaths, medium);
List<String> alternativeTabletInfos = Lists.newArrayList();
// Clone ut mocked env, but CatalogRecycleBin is not mockable (it extends from Thread)
// so in clone ut recycleBin need to set to null.
CatalogRecycleBin recycleBin = null;
if (!FeConstants.runningUnitTest) {
recycleBin = Env.getCurrentRecycleBin();
}
int clusterAvailableBEnum = infoService.getAllBackendIds(true).size();
ColocateTableIndex colocateTableIndex = Env.getCurrentColocateIndex();
List<Set<Long>> lowBETablets = lowBEs.stream()
.map(beStat -> Sets.newHashSet(invertedIndex.getTabletIdsByBackendId(beStat.getBeId())))
.collect(Collectors.toList());
@ -230,11 +218,7 @@ public class BeLoadRebalancer extends Rebalancer {
long replicaDataSize = replica.getDataSize();
if (remainingPaths.containsKey(replicaPathHash)) {
TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId);
if (tabletMeta == null) {
continue;
}
if (colocateTableIndex.isColocateTable(tabletMeta.getTableId())) {
if (!canBalanceTablet(tabletMeta)) {
continue;
}
@ -245,11 +229,6 @@ public class BeLoadRebalancer extends Rebalancer {
continue;
}
if (recycleBin != null && recycleBin.isRecyclePartition(tabletMeta.getDbId(),
tabletMeta.getTableId(), tabletMeta.getPartitionId())) {
continue;
}
boolean isFit = lowBEs.stream().anyMatch(be -> be.isFit(replicaDataSize,
medium, null, false) == BalanceStatus.OK);
if (!isFit) {

View File

@ -17,7 +17,6 @@
package org.apache.doris.clone;
import org.apache.doris.catalog.CatalogRecycleBin;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.TabletInvertedIndex;
@ -59,6 +58,7 @@ public class DiskRebalancer extends Rebalancer {
public DiskRebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex,
Map<Long, PathSlot> backendsWorkingSlots) {
super(infoService, invertedIndex, backendsWorkingSlots);
canBalanceColocateTable = true;
}
public List<BackendLoadStatistic> filterByPrioBackends(List<BackendLoadStatistic> bes) {
@ -163,12 +163,6 @@ public class DiskRebalancer extends Rebalancer {
return alternativeTablets;
}
// Clone ut mocked env, but CatalogRecycleBin is not mockable (it extends from Thread)
// so in clone ut recycleBin need to set to null.
CatalogRecycleBin recycleBin = null;
if (!FeConstants.runningUnitTest) {
recycleBin = Env.getCurrentRecycleBin();
}
Set<Long> alternativeTabletIds = Sets.newHashSet();
Set<Long> unbalancedBEs = Sets.newHashSet();
// choose tablets from backends randomly.
@ -243,11 +237,7 @@ public class DiskRebalancer extends Rebalancer {
long replicaPathHash = replica.getPathHash();
if (remainingPaths.containsKey(replicaPathHash)) {
TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId);
if (tabletMeta == null) {
continue;
}
if (recycleBin != null && recycleBin.isRecyclePartition(tabletMeta.getDbId(),
tabletMeta.getTableId(), tabletMeta.getPartitionId())) {
if (!canBalanceTablet(tabletMeta)) {
continue;
}

View File

@ -138,7 +138,7 @@ public class PartitionRebalancer extends Rebalancer {
invertedIndex.getTabletIdsByBackendIdAndStorageMedium(move.toBe, medium));
BiPredicate<Long, TabletMeta> canMoveTablet = (Long tabletId, TabletMeta tabletMeta) -> {
return tabletMeta != null
return canBalanceTablet(tabletMeta)
&& tabletMeta.getPartitionId() == move.partitionId
&& tabletMeta.getIndexId() == move.indexId
&& !invalidIds.contains(tabletId)

View File

@ -17,9 +17,14 @@
package org.apache.doris.clone;
import org.apache.doris.catalog.CatalogRecycleBin;
import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.clone.TabletScheduler.PathSlot;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
@ -29,13 +34,14 @@ import org.apache.doris.thrift.TStorageMedium;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.collect.Table;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Map;
import java.util.Set;
/*
* Rebalancer is responsible for
@ -61,6 +67,9 @@ public abstract class Rebalancer {
// be id -> end time of prio
protected Map<Long, Long> prioBackends = Maps.newConcurrentMap();
protected boolean canBalanceColocateTable = false;
private Set<Long> alterTableIds = Sets.newHashSet();
// tag -> (medium, timestamp)
private Table<Tag, TStorageMedium, Long> lastPickTimeTable = HashBasedTable.create();
@ -106,6 +115,21 @@ public abstract class Rebalancer {
return lastPickTime == null || now - lastPickTime >= Config.be_rebalancer_idle_seconds * 1000L;
}
protected boolean canBalanceTablet(TabletMeta tabletMeta) {
// Clone ut mocked env, but CatalogRecycleBin is not mockable (it extends from Thread)
// so in clone ut recycleBin need to set to null.
ColocateTableIndex colocateTableIndex = Env.getCurrentColocateIndex();
CatalogRecycleBin recycleBin = null;
if (!FeConstants.runningUnitTest) {
recycleBin = Env.getCurrentRecycleBin();
}
return tabletMeta != null
&& !alterTableIds.contains(tabletMeta.getTableId())
&& (canBalanceColocateTable || !colocateTableIndex.isColocateTable(tabletMeta.getTableId()))
&& (recycleBin == null || !recycleBin.isRecyclePartition(tabletMeta.getDbId(),
tabletMeta.getTableId(), tabletMeta.getPartitionId()));
}
public AgentTask createBalanceTask(TabletSchedCtx tabletCtx)
throws SchedException {
completeSchedCtx(tabletCtx);
@ -139,6 +163,10 @@ public abstract class Rebalancer {
this.statisticMap = statisticMap;
}
public void updateAlterTableIds(Set<Long> alterTableIds) {
this.alterTableIds = alterTableIds;
}
public void addPrioBackends(List<Backend> backends, long timeoutS) {
long currentTimeMillis = System.currentTimeMillis();
for (Backend backend : backends) {

View File

@ -352,6 +352,10 @@ public class TabletScheduler extends MasterDaemon {
rebalancer.updateLoadStatistic(statisticMap);
diskRebalancer.updateLoadStatistic(statisticMap);
Set<Long> alterTableIds = Env.getCurrentEnv().getAlterInstance().getUnfinishedAlterTableIds();
rebalancer.updateAlterTableIds(alterTableIds);
diskRebalancer.updateAlterTableIds(alterTableIds);
lastStatUpdateTime = System.currentTimeMillis();
}