From beec0e9169ece675bbaa17a83fda5bbacc7350d8 Mon Sep 17 00:00:00 2001 From: yujun Date: Tue, 18 Jul 2023 23:25:22 +0800 Subject: [PATCH] [Improvement](tablet clone) impr tablet sched speed and fix tablet sched failed too many times (#21856) --- docker/runtime/doris-compose/database.py | 10 +- .../java/org/apache/doris/common/Config.java | 16 +- .../java/org/apache/doris/catalog/Tablet.java | 15 +- .../apache/doris/clone/BeLoadRebalancer.java | 50 +- .../ColocateTableCheckerAndBalancer.java | 5 +- .../apache/doris/clone/DiskRebalancer.java | 4 +- .../doris/clone/PartitionRebalancer.java | 6 +- .../apache/doris/clone/SchedException.java | 16 + .../org/apache/doris/clone/TabletChecker.java | 6 +- .../apache/doris/clone/TabletSchedCtx.java | 245 ++++------ .../apache/doris/clone/TabletScheduler.java | 461 +++++++++++------- .../proc/TabletSchedulerDetailProcDir.java | 10 +- .../doris/clone/TabletSchedCtxTest.java | 12 +- .../cluster/DecommissionBackendTest.java | 2 +- .../doris/utframe/TestWithFeService.java | 5 +- 15 files changed, 491 insertions(+), 372 deletions(-) diff --git a/docker/runtime/doris-compose/database.py b/docker/runtime/doris-compose/database.py index 21aa400a47..57fc90275a 100644 --- a/docker/runtime/doris-compose/database.py +++ b/docker/runtime/doris-compose/database.py @@ -102,6 +102,7 @@ class DBManager(object): def decommission_be(self, be_endpoint): old_tablet_num = 0 id = CLUSTER.Node.get_id_from_ip(be_endpoint[:be_endpoint.find(":")]) + start_ts = time.time() if id not in self.be_states: self._load_be_states() if id in self.be_states: @@ -132,8 +133,9 @@ class DBManager(object): return LOG.info( "Decommission be {} status: alive {}, decommissioned {}. " \ - "It is migrating its tablets, left {}/{} tablets." - .format(be_endpoint, be.alive, be.decommissioned, be.tablet_num, old_tablet_num)) + "It is migrating its tablets, left {}/{} tablets. Time elapse {} s." + .format(be_endpoint, be.alive, be.decommissioned, be.tablet_num, old_tablet_num, + int(time.time() - start_ts))) time.sleep(5) @@ -189,7 +191,7 @@ class DBManager(object): def _reset_conn(self): self.conn = pymysql.connect(user="root", host="127.0.0.1", - read_timeout = 10, + read_timeout=10, port=self.query_port) @@ -234,6 +236,6 @@ def get_db_mgr(cluster_name, required_load_succ=True): except Exception as e: if required_load_succ: raise e - LOG.exception(e) + #LOG.exception(e) return db_mgr diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index d614d7f9bc..29df2c3740 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -904,7 +904,21 @@ public class Config extends ConfigBase { * the default slot number per path in tablet scheduler * TODO(cmy): remove this config and dynamically adjust it by clone task statistic */ - @ConfField public static int schedule_slot_num_per_path = 2; + @ConfField(mutable = true, masterOnly = true) + public static int schedule_slot_num_per_path = 4; + + /** + * the default slot number per path in tablet scheduler for decommission backend + */ + @ConfField(mutable = true, masterOnly = true) + public static int schedule_decommission_slot_num_per_path = 8; + + /** + * the default batch size in tablet scheduler for a single schedule. + */ + @ConfField(mutable = true, masterOnly = true) + public static int schedule_batch_size = 50; + /** * Deprecated after 0.10 diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java index 74f1c31cbf..0e93a4dd0f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java @@ -703,11 +703,24 @@ public class Tablet extends MetaObject implements Writable { * NORMAL: delay Config.tablet_repair_delay_factor_second * 2; * LOW: delay Config.tablet_repair_delay_factor_second * 3; */ - public boolean readyToBeRepaired(TabletSchedCtx.Priority priority) { + public boolean readyToBeRepaired(SystemInfoService infoService, TabletSchedCtx.Priority priority) { if (priority == Priority.VERY_HIGH) { return true; } + boolean allBeAliveOrDecommissioned = true; + for (Replica replica : replicas) { + Backend backend = infoService.getBackend(replica.getBackendId()); + if (backend == null || (!backend.isAlive() && !backend.isDecommissioned())) { + allBeAliveOrDecommissioned = false; + break; + } + } + + if (allBeAliveOrDecommissioned) { + return true; + } + long currentTime = System.currentTimeMillis(); // first check, wait for next round diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java index ebbebe6806..5317725881 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java @@ -23,6 +23,7 @@ import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.catalog.TabletMeta; import org.apache.doris.clone.SchedException.Status; +import org.apache.doris.clone.SchedException.SubCode; import org.apache.doris.clone.TabletSchedCtx.Priority; import org.apache.doris.clone.TabletScheduler.PathSlot; import org.apache.doris.common.Config; @@ -166,7 +167,7 @@ public class BeLoadRebalancer extends Rebalancer { System.currentTimeMillis()); tabletCtx.setTag(clusterStat.getTag()); // balance task's priority is always LOW - tabletCtx.setOrigPriority(Priority.LOW); + tabletCtx.setPriority(Priority.LOW); alternativeTablets.add(tabletCtx); if (--numOfLowPaths <= 0) { @@ -262,7 +263,7 @@ public class BeLoadRebalancer extends Rebalancer { } // Select a low load backend as destination. - boolean setDest = false; + List candidates = Lists.newArrayList(); for (BackendLoadStatistic beStat : lowBe) { if (beStat.isAvailable() && replicas.stream().noneMatch(r -> r.getBackendId() == beStat.getBeId())) { // check if on same host. @@ -296,27 +297,36 @@ public class BeLoadRebalancer extends Rebalancer { continue; } - // classify the paths. - // And we only select path from 'low' and 'mid' paths - Set pathLow = Sets.newHashSet(); - Set pathMid = Sets.newHashSet(); - Set pathHigh = Sets.newHashSet(); - beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, tabletCtx.getStorageMedium()); - pathLow.addAll(pathMid); - - long pathHash = slot.takeAnAvailBalanceSlotFrom(pathLow); - if (pathHash == -1) { - LOG.debug("paths has no available balance slot: {}", pathLow); - } else { - tabletCtx.setDest(beStat.getBeId(), pathHash); - setDest = true; - break; - } + candidates.add(beStat); } } - if (!setDest) { - throw new SchedException(Status.SCHEDULE_FAILED, "unable to find low backend"); + if (candidates.isEmpty()) { + throw new SchedException(Status.UNRECOVERABLE, "unable to find low backend"); } + + for (BackendLoadStatistic beStat : candidates) { + PathSlot slot = backendsWorkingSlots.get(beStat.getBeId()); + if (slot == null) { + continue; + } + + // classify the paths. + // And we only select path from 'low' and 'mid' paths + Set pathLow = Sets.newHashSet(); + Set pathMid = Sets.newHashSet(); + Set pathHigh = Sets.newHashSet(); + beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, tabletCtx.getStorageMedium()); + pathLow.addAll(pathMid); + + long pathHash = slot.takeAnAvailBalanceSlotFrom(pathLow); + if (pathHash != -1) { + tabletCtx.setDest(beStat.getBeId(), pathHash); + return; + } + } + + throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT, + "unable to find low backend"); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java index b3fdc3ca71..b34bc926dd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java @@ -202,6 +202,7 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon { */ private void matchGroup() { Env env = Env.getCurrentEnv(); + SystemInfoService infoService = Env.getCurrentSystemInfo(); ColocateTableIndex colocateIndex = env.getColocateTableIndex(); TabletScheduler tabletScheduler = env.getTabletScheduler(); @@ -254,7 +255,7 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon { + " status: %s", tablet.getId(), st); LOG.debug(unstableReason); - if (!tablet.readyToBeRepaired(Priority.NORMAL)) { + if (!tablet.readyToBeRepaired(infoService, Priority.NORMAL)) { continue; } @@ -265,7 +266,7 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon { System.currentTimeMillis()); // the tablet status will be set again when being scheduled tabletCtx.setTabletStatus(st); - tabletCtx.setOrigPriority(Priority.NORMAL); + tabletCtx.setPriority(Priority.NORMAL); tabletCtx.setTabletOrderIdx(idx); AddResult res = tabletScheduler.addTablet(tabletCtx, false /* not force */); diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java index 9d676d950c..abac0c2d1a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java @@ -208,10 +208,10 @@ public class DiskRebalancer extends Rebalancer { tabletCtx.setTag(clusterStat.getTag()); if (prioBackends.containsKey(beStat.getBeId())) { // priority of balance task of prio BE is NORMAL - tabletCtx.setOrigPriority(Priority.NORMAL); + tabletCtx.setPriority(Priority.NORMAL); } else { // balance task's default priority is LOW - tabletCtx.setOrigPriority(Priority.LOW); + tabletCtx.setPriority(Priority.LOW); } // we must set balanceType to DISK_BALANCE for create migration task tabletCtx.setBalanceType(BalanceType.DISK_BALANCE); diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java index 9e3e37ef00..d9d3f27cc7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java @@ -151,7 +151,7 @@ public class PartitionRebalancer extends Rebalancer { System.currentTimeMillis()); tabletCtx.setTag(clusterStat.getTag()); // Balance task's priority is always LOW - tabletCtx.setOrigPriority(TabletSchedCtx.Priority.LOW); + tabletCtx.setPriority(TabletSchedCtx.Priority.LOW); alternativeTablets.add(tabletCtx); // Pair, ToDeleteReplicaId should be -1L before scheduled successfully movesInProgress.get().put(pickedTabletId, @@ -251,7 +251,7 @@ public class PartitionRebalancer extends Rebalancer { if (slot.takeBalanceSlot(srcReplica.getPathHash()) != -1) { tabletCtx.setSrc(srcReplica); } else { - throw new SchedException(SchedException.Status.SCHEDULE_FAILED, + throw new SchedException(SchedException.Status.SCHEDULE_FAILED, SchedException.SubCode.WAITING_SLOT, "no slot for src replica " + srcReplica + ", pathHash " + srcReplica.getPathHash()); } @@ -269,7 +269,7 @@ public class PartitionRebalancer extends Rebalancer { .map(RootPathLoadStatistic::getPathHash).collect(Collectors.toSet()); long pathHash = slot.takeAnAvailBalanceSlotFrom(availPath); if (pathHash == -1) { - throw new SchedException(SchedException.Status.SCHEDULE_FAILED, + throw new SchedException(SchedException.Status.SCHEDULE_FAILED, SchedException.SubCode.WAITING_SLOT, "paths has no available balance slot: " + availPath); } else { tabletCtx.setDest(beStat.getBeId(), pathHash); diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/SchedException.java b/fe/fe-core/src/main/java/org/apache/doris/clone/SchedException.java index 0ad83e8909..a343e6543c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/SchedException.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/SchedException.java @@ -27,14 +27,30 @@ public class SchedException extends Exception { FINISHED // schedule is done, remove the tablet from tablet scheduler with status FINISHED } + public enum SubCode { + NONE, + WAITING_DECOMMISSION, + WAITING_SLOT, + } + private Status status; + private SubCode subCode; public SchedException(Status status, String errorMsg) { + this(status, SubCode.NONE, errorMsg); + } + + public SchedException(Status status, SubCode subCode, String errorMsg) { super(errorMsg); this.status = status; + this.subCode = subCode; } public Status getStatus() { return status; } + + public SubCode getSubCode() { + return subCode; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java index 67226eae67..d9330e0f16 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java @@ -372,9 +372,7 @@ public class TabletChecker extends MasterDaemon { } counter.unhealthyTabletNum++; - - if (!tablet.readyToBeRepaired(statusWithPrio.second)) { - counter.tabletNotReady++; + if (!tablet.readyToBeRepaired(infoService, statusWithPrio.second)) { continue; } @@ -386,7 +384,7 @@ public class TabletChecker extends MasterDaemon { System.currentTimeMillis()); // the tablet status will be set again when being scheduled tabletCtx.setTabletStatus(statusWithPrio.first); - tabletCtx.setOrigPriority(statusWithPrio.second); + tabletCtx.setPriority(statusWithPrio.second); AddResult res = tabletScheduler.addTablet(tabletCtx, false /* not force */); if (res == AddResult.LIMIT_EXCEED || res == AddResult.DISABLED) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java index 58c411c40f..286b70f65e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java @@ -29,6 +29,7 @@ import org.apache.doris.catalog.Table; import org.apache.doris.catalog.Tablet; import org.apache.doris.catalog.Tablet.TabletStatus; import org.apache.doris.clone.SchedException.Status; +import org.apache.doris.clone.SchedException.SubCode; import org.apache.doris.clone.TabletScheduler.PathSlot; import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; @@ -102,6 +103,8 @@ public class TabletSchedCtx implements Comparable { */ private static final int RUNNING_FAILED_COUNTER_THRESHOLD = 3; + public static final int FINISHED_COUNTER_THRESHOLD = 3; + private static VersionCountComparator VERSION_COUNTER_COMPARATOR = new VersionCountComparator(); public enum Type { @@ -117,22 +120,6 @@ public class TabletSchedCtx implements Comparable { NORMAL, HIGH, VERY_HIGH; - - // VERY_HIGH can only be downgraded to NORMAL - // LOW can only be upgraded to HIGH - public Priority adjust(Priority origPriority, boolean isUp) { - switch (this) { - case VERY_HIGH: - return isUp ? VERY_HIGH : HIGH; - case HIGH: - return isUp ? (origPriority == LOW ? HIGH : VERY_HIGH) : NORMAL; - case NORMAL: - return isUp ? HIGH : (origPriority == Priority.VERY_HIGH ? NORMAL : LOW); - default: - return isUp ? NORMAL : LOW; - } - } - } public enum State { @@ -147,23 +134,19 @@ public class TabletSchedCtx implements Comparable { private Type type; private BalanceType balanceType; - /* - * origPriority is the origin priority being set when this tablet being added to scheduler. - * dynamicPriority will be set during tablet schedule processing, it will not be prior than origin priority. - * And dynamic priority is also used in priority queue compare in tablet scheduler. - */ - private Priority origPriority; - private Priority dynamicPriority; + private Priority priority; // we change the dynamic priority based on how many times it fails to be scheduled private int failedSchedCounter = 0; // clone task failed counter private int failedRunningCounter = 0; + // When finish a tablet ctx, it will check the tablet's health status. + // If the tablet is unhealthy, it will add a new ctx. + // The new ctx's finishedCounter = old ctx's finishedCounter + 1. + private int finishedCounter = 0; // last time this tablet being scheduled private long lastSchedTime = 0; - // last time the dynamic priority being adjusted - private long lastAdjustPrioTime = 0; // last time this tablet being visited. // being visited means: @@ -180,6 +163,8 @@ public class TabletSchedCtx implements Comparable { private State state; private TabletStatus tabletStatus; + private long decommissionTime = -1; + private long dbId; private long tblId; private long partitionId; @@ -223,6 +208,8 @@ public class TabletSchedCtx implements Comparable { // tag is only set for BALANCE task, used to identify which workload group this Balance job is in private Tag tag; + private SubCode schedFailedCode; + public TabletSchedCtx(Type type, long dbId, long tblId, long partId, long idxId, long tabletId, ReplicaAllocation replicaAlloc, long createTime) { this.type = type; @@ -236,12 +223,17 @@ public class TabletSchedCtx implements Comparable { this.state = State.PENDING; this.replicaAlloc = replicaAlloc; this.balanceType = BalanceType.BE_BALANCE; + this.schedFailedCode = SubCode.NONE; } public ReplicaAllocation getReplicaAlloc() { return replicaAlloc; } + public void setReplicaAlloc(ReplicaAllocation replicaAlloc) { + this.replicaAlloc = replicaAlloc; + } + public void setTag(Tag tag) { this.tag = tag; } @@ -266,21 +258,20 @@ public class TabletSchedCtx implements Comparable { return balanceType; } - public Priority getOrigPriority() { - return origPriority; + public Priority getPriority() { + return priority; } - public void setOrigPriority(Priority origPriority) { - this.origPriority = origPriority; - // reset dynamic priority along with the origin priority being set. - this.dynamicPriority = origPriority; - this.failedSchedCounter = 0; - this.lastSchedTime = 0; - this.lastAdjustPrioTime = 0; + public void setPriority(Priority priority) { + this.priority = priority; } - public Priority getDynamicPriority() { - return dynamicPriority; + public int getFinishedCounter() { + return finishedCounter; + } + + public void setFinishedCounter(int finishedCounter) { + this.finishedCounter = finishedCounter; } public void increaseFailedSchedCounter() { @@ -295,8 +286,27 @@ public class TabletSchedCtx implements Comparable { ++failedRunningCounter; } - public int getFailedRunningCounter() { - return failedRunningCounter; + public boolean isExceedFailedRunningLimit() { + return failedRunningCounter >= RUNNING_FAILED_COUNTER_THRESHOLD; + } + + public boolean onSchedFailedAndCheckExceedLimit(SubCode code) { + schedFailedCode = code; + failedSchedCounter++; + if (code == SubCode.WAITING_DECOMMISSION) { + failedSchedCounter = 0; + if (decommissionTime < 0) { + decommissionTime = System.currentTimeMillis(); + } + return System.currentTimeMillis() > decommissionTime + 10 * 60 * 1000L; + } else { + decommissionTime = -1; + if (code == SubCode.WAITING_SLOT && type != Type.BALANCE) { + return failedSchedCounter > 30 * 1000 / TabletScheduler.SCHEDULE_INTERVAL_MS; + } else { + return failedSchedCounter > 10; + } + } } public void setLastSchedTime(long lastSchedTime) { @@ -311,6 +321,10 @@ public class TabletSchedCtx implements Comparable { this.finishedTime = finishedTime; } + public void setDecommissionTime(long decommissionTime) { + this.decommissionTime = decommissionTime; + } + public State getState() { return state; } @@ -615,7 +629,8 @@ public class TabletSchedCtx implements Comparable { setSrc(srcReplica); return; } - throw new SchedException(Status.SCHEDULE_FAILED, "unable to find source slot"); + throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT, + "unable to find source slot"); } /* @@ -641,7 +656,7 @@ public class TabletSchedCtx implements Comparable { */ public void chooseDestReplicaForVersionIncomplete(Map backendsWorkingSlots) throws SchedException { - Replica chosenReplica = null; + List candidates = Lists.newArrayList(); for (Replica replica : tablet.getReplicas()) { if (replica.isBad()) { LOG.debug("replica {} is bad, skip. tablet: {}", @@ -660,18 +675,37 @@ public class TabletSchedCtx implements Comparable { // if the replica's state is DECOMMISSION, it may be chose as dest replica, // and its state will be set to NORMAL later. if (replica.getLastFailedVersion() <= 0 - && ((replica.getVersion() == visibleVersion) - || replica.getVersion() > visibleVersion) && replica.getState() != ReplicaState.DECOMMISSION) { + && replica.getVersion() >= visibleVersion + && replica.getState() != ReplicaState.DECOMMISSION) { // skip healthy replica LOG.debug("replica {} version {} is healthy, visible version {}, replica state {}, skip. tablet: {}", replica.getId(), replica.getVersion(), visibleVersion, replica.getState(), tabletId); continue; } + candidates.add(replica); + } + + if (candidates.isEmpty()) { + throw new SchedException(Status.UNRECOVERABLE, "unable to choose dest replica"); + } + + Replica chosenReplica = null; + for (Replica replica : candidates) { + PathSlot slot = backendsWorkingSlots.get(replica.getBackendId()); + if (slot == null || !slot.hasAvailableSlot(replica.getPathHash())) { + if (!replica.needFurtherRepair()) { + throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT, + "replica " + replica + " has not slot"); + } + + continue; + } + if (replica.needFurtherRepair()) { + chosenReplica = replica; LOG.debug("replica {} need further repair, choose it. tablet: {}", replica.getId(), tabletId); - chosenReplica = replica; break; } @@ -686,20 +720,19 @@ public class TabletSchedCtx implements Comparable { } } - if (chosenReplica == null) { - throw new SchedException(Status.SCHEDULE_FAILED, "unable to choose dest replica"); - } - // check if the dest replica has available slot + // it should not happen cause it just check hasAvailableSlot yet. PathSlot slot = backendsWorkingSlots.get(chosenReplica.getBackendId()); if (slot == null) { - throw new SchedException(Status.SCHEDULE_FAILED, "backend of dest replica is missing"); + throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT, + "backend of dest replica is missing"); } - long destPathHash = slot.takeSlot(chosenReplica.getPathHash()); if (destPathHash == -1) { - throw new SchedException(Status.SCHEDULE_FAILED, "unable to take slot of dest path"); + throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT, + "unable to take slot of dest path"); } + if (chosenReplica.getState() == ReplicaState.DECOMMISSION) { // Since this replica is selected as the repair object of VERSION_INCOMPLETE, // it means that this replica needs to be able to accept loading data. @@ -717,6 +750,7 @@ public class TabletSchedCtx implements Comparable { // forever, because the replica in the DECOMMISSION state will not receive the load task. chosenReplica.setWatermarkTxnId(-1); chosenReplica.setState(ReplicaState.NORMAL); + setDecommissionTime(-1); LOG.info("choose replica {} on backend {} of tablet {} as dest replica for version incomplete," + " and change state from DECOMMISSION to NORMAL", chosenReplica.getId(), chosenReplica.getBackendId(), tabletId); @@ -941,7 +975,7 @@ public class TabletSchedCtx implements Comparable { cloneTask.getDbId(), cloneTask.getTableId(), cloneTask.getPartitionId(), cloneTask.getIndexId(), cloneTask.getTabletId(), cloneTask.getBackendId(), dbId, tblId, partitionId, indexId, tablet.getId(), destBackendId); - throw new SchedException(Status.RUNNING_FAILED, msg); + throw new SchedException(Status.UNRECOVERABLE, msg); } // 1. check the tablet status first @@ -1041,13 +1075,6 @@ public class TabletSchedCtx implements Comparable { state = State.FINISHED; LOG.info("clone finished: {}", this); - } catch (SchedException e) { - // if failed to too many times, remove this task - ++failedRunningCounter; - if (failedRunningCounter > RUNNING_FAILED_COUNTER_THRESHOLD) { - throw new SchedException(Status.UNRECOVERABLE, e.getMessage()); - } - throw e; } finally { olapTable.writeUnlock(); } @@ -1061,73 +1088,6 @@ public class TabletSchedCtx implements Comparable { } } - /* - * we try to adjust the priority based on schedule history - * 1. If failed counter is larger than FAILED_COUNTER_THRESHOLD, which means this tablet is being scheduled - * at least FAILED_TIME_THRESHOLD times and all are failed. So we downgrade its priority. - * Also reset the failedCounter, or it will be downgraded forever. - * - * 2. Else, if it has been a long time since last time the tablet being scheduled, we upgrade its - * priority to let it more available to be scheduled. - * - * The time gap between adjustment should be larger than MIN_ADJUST_PRIORITY_INTERVAL_MS, to avoid - * being downgraded too fast. - * - * eg: - * A tablet has been scheduled for 5 times and all were failed. its priority will be downgraded. And if it is - * scheduled for 5 times and all are failed again, it will be downgraded again, until to the LOW. - * And than, because of LOW, this tablet can not be scheduled for a long time, and it will be upgraded - * to NORMAL, if still not being scheduled, it will be upgraded up to VERY_HIGH. - * - * return true if dynamic priority changed - */ - public boolean adjustPriority(TabletSchedulerStat stat) { - long currentTime = System.currentTimeMillis(); - if (lastAdjustPrioTime == 0) { - // skip the first time we adjust this priority - lastAdjustPrioTime = currentTime; - return false; - } else { - if (currentTime - lastAdjustPrioTime < MIN_ADJUST_PRIORITY_INTERVAL_MS) { - return false; - } - } - - boolean isDowngrade = false; - boolean isUpgrade = false; - - if (failedSchedCounter > SCHED_FAILED_COUNTER_THRESHOLD) { - isDowngrade = true; - } else { - long lastTime = lastSchedTime == 0 ? createTime : lastSchedTime; - if (currentTime - lastTime > MAX_NOT_BEING_SCHEDULED_INTERVAL_MS) { - isUpgrade = true; - } - } - - Priority originDynamicPriority = dynamicPriority; - if (isDowngrade) { - dynamicPriority = dynamicPriority.adjust(origPriority, false /* downgrade */); - failedSchedCounter = 0; - if (originDynamicPriority != dynamicPriority) { - LOG.debug("downgrade dynamic priority from {} to {}, origin: {}, tablet: {}", - originDynamicPriority.name(), dynamicPriority.name(), origPriority.name(), tabletId); - stat.counterTabletPrioDowngraded.incrementAndGet(); - return true; - } - } else if (isUpgrade) { - dynamicPriority = dynamicPriority.adjust(origPriority, true /* upgrade */); - // no need to set lastSchedTime, lastSchedTime is set each time we schedule this tablet - if (originDynamicPriority != dynamicPriority) { - LOG.debug("upgrade dynamic priority from {} to {}, origin: {}, tablet: {}", - originDynamicPriority.name(), dynamicPriority.name(), origPriority.name(), tabletId); - stat.counterTabletPrioUpgraded.incrementAndGet(); - return true; - } - } - return false; - } - public boolean isTimeout() { if (state != TabletSchedCtx.State.RUNNING) { return false; @@ -1144,8 +1104,8 @@ public class TabletSchedCtx implements Comparable { result.add(storageMedium == null ? FeConstants.null_string : storageMedium.name()); result.add(tabletStatus == null ? FeConstants.null_string : tabletStatus.name()); result.add(state.name()); - result.add(origPriority.name()); - result.add(dynamicPriority.name()); + result.add(schedFailedCode.name()); + result.add(priority.name()); result.add(srcReplica == null ? "-1" : String.valueOf(srcReplica.getBackendId())); result.add(String.valueOf(srcPathHash)); result.add(String.valueOf(destBackendId)); @@ -1158,7 +1118,6 @@ public class TabletSchedCtx implements Comparable { result.add(copyTimeMs > 0 ? String.valueOf(copySize / copyTimeMs / 1000.0) : FeConstants.null_string); result.add(String.valueOf(failedSchedCounter)); result.add(String.valueOf(failedRunningCounter)); - result.add(TimeUtils.longToTimeString(lastAdjustPrioTime)); result.add(String.valueOf(visibleVersion)); result.add(String.valueOf(committedVersion)); result.add(Strings.nullToEmpty(errMsg)); @@ -1171,19 +1130,23 @@ public class TabletSchedCtx implements Comparable { */ @Override public int compareTo(TabletSchedCtx o) { - if (dynamicPriority.ordinal() < o.dynamicPriority.ordinal()) { - return 1; - } else if (dynamicPriority.ordinal() > o.dynamicPriority.ordinal()) { - return -1; - } else { - if (lastVisitedTime < o.lastVisitedTime) { - return -1; - } else if (lastVisitedTime > o.lastVisitedTime) { - return 1; - } else { - return 0; - } + return Long.compare(getCompareValue(), o.getCompareValue()); + } + + private long getCompareValue() { + long value = createTime; + if (lastVisitedTime > 0) { + value = lastVisitedTime; } + + value += (Priority.VERY_HIGH.ordinal() - priority.ordinal() + 1) * 60 * 1000L; + value += 5000L * (failedSchedCounter / 10); + + if (type == Type.BALANCE) { + value += 30 * 60 * 1000L; + } + + return value; } @Override @@ -1227,6 +1190,7 @@ public class TabletSchedCtx implements Comparable { * call this when releaseTabletCtx() */ public void resetReplicaState() { + setDecommissionTime(-1); if (tablet != null) { for (Replica replica : tablet.getReplicas()) { // To address issue: https://github.com/apache/doris/issues/9422 @@ -1243,5 +1207,4 @@ public class TabletSchedCtx implements Comparable { } } } - } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index dbf3d46237..aaaac36a24 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -32,10 +32,12 @@ import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.Partition.PartitionState; import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.Replica.ReplicaState; +import org.apache.doris.catalog.ReplicaAllocation; import org.apache.doris.catalog.Tablet; import org.apache.doris.catalog.Tablet.TabletStatus; import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.clone.SchedException.Status; +import org.apache.doris.clone.SchedException.SubCode; import org.apache.doris.clone.TabletSchedCtx.Priority; import org.apache.doris.clone.TabletSchedCtx.Type; import org.apache.doris.common.AnalysisException; @@ -59,6 +61,7 @@ import org.apache.doris.thrift.TStatusCode; import org.apache.doris.transaction.DatabaseTransactionMgr; import org.apache.doris.transaction.TransactionState; +import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.EvictingQueue; import com.google.common.collect.ImmutableMap; @@ -100,21 +103,21 @@ public class TabletScheduler extends MasterDaemon { // the minimum interval of updating cluster statistics and priority of tablet info private static final long STAT_UPDATE_INTERVAL_MS = 20 * 1000; // 20s - private static final long SCHEDULE_INTERVAL_MS = 1000; // 1s + public static final long SCHEDULE_INTERVAL_MS = 100; /* - * Tablet is added to pendingTablets as well it's id in allTabletIds. - * TabletScheduler will take tablet from pendingTablets but will not remove it's id from allTabletIds when + * Tablet is added to pendingTablets as well it's id in allTabletTypes. + * TabletScheduler will take tablet from pendingTablets but will not remove it's id from allTabletTypes when * handling a tablet. * Tablet' id can only be removed after the clone task or migration task is done(timeout, cancelled or finished). - * So if a tablet's id is still in allTabletIds, TabletChecker can not add tablet to TabletScheduler. + * So if a tablet's id is still in allTabletTypes, TabletChecker can not add tablet to TabletScheduler. * - * pendingTablets + runningTablets = allTabletIds + * pendingTablets + runningTablets = allTabletTypes * - * pendingTablets, allTabletIds, runningTablets and schedHistory are protected by 'synchronized' + * pendingTablets, allTabletTypes, runningTablets and schedHistory are protected by 'synchronized' */ private PriorityQueue pendingTablets = new PriorityQueue<>(); - private Set allTabletIds = Sets.newHashSet(); + private Map allTabletTypes = Maps.newHashMap(); // contains all tabletCtxs which state are RUNNING private Map runningTablets = Maps.newHashMap(); // save the latest 1000 scheduled tablet info @@ -128,6 +131,8 @@ public class TabletScheduler extends MasterDaemon { private long lastSlotAdjustTime = 0; + private long lastCheckTimeoutTime = 0; + private Env env; private SystemInfoService infoService; private TabletInvertedIndex invertedIndex; @@ -175,7 +180,8 @@ public class TabletScheduler extends MasterDaemon { // when upgrading, backend may not get path info yet. so return false and wait for next round. // and we should check if backend is alive. If backend is dead when upgrading, this backend // will never report its path hash, and tablet scheduler is blocked. - LOG.info("not all backends have path info"); + LOG.info("backend {}:{} with id {} doesn't have path info.", backend.getHost(), + backend.getBePort(), backend.getId()); return false; } } @@ -204,7 +210,7 @@ public class TabletScheduler extends MasterDaemon { if (!backendsWorkingSlots.containsKey(be.getId())) { List pathHashes = be.getDisks().values().stream() .map(DiskInfo::getPathHash).collect(Collectors.toList()); - PathSlot slot = new PathSlot(pathHashes, Config.schedule_slot_num_per_path); + PathSlot slot = new PathSlot(pathHashes, be.getId()); backendsWorkingSlots.put(be.getId(), slot); LOG.info("add new backend {} with slots num: {}", be.getId(), be.getDisks().size()); } @@ -225,7 +231,18 @@ public class TabletScheduler extends MasterDaemon { if (!force && Config.disable_tablet_scheduler) { return AddResult.DISABLED; } - if (!force && containsTablet(tablet.getTabletId())) { + + // REPAIR has higher priority than BALANCE. + // Suppose adding a BALANCE tablet successfully, then adding this tablet's REPAIR ctx will fail. + // But we set allTabletTypes[tabletId] to REPAIR. Later at the beginning of scheduling this tablet, + // it will reset its type as allTabletTypes[tabletId], so its type will convert to REPAIR. + + long tabletId = tablet.getTabletId(); + boolean contains = allTabletTypes.containsKey(tabletId); + if (contains && !force) { + if (tablet.getType() == TabletSchedCtx.Type.REPAIR) { + allTabletTypes.put(tabletId, TabletSchedCtx.Type.REPAIR); + } return AddResult.ALREADY_IN; } @@ -238,13 +255,22 @@ public class TabletScheduler extends MasterDaemon { return AddResult.LIMIT_EXCEED; } - allTabletIds.add(tablet.getTabletId()); + if (!contains || tablet.getType() == TabletSchedCtx.Type.REPAIR) { + allTabletTypes.put(tabletId, tablet.getType()); + } + pendingTablets.offer(tablet); + if (!contains) { + LOG.info("Add tablet to pending queue, tablet id {}, type {}, status {}, priority {}", + tablet.getTabletId(), tablet.getType(), tablet.getTabletStatus(), + tablet.getPriority()); + } + return AddResult.ADDED; } public synchronized boolean containsTablet(long tabletId) { - return allTabletIds.contains(tabletId); + return allTabletTypes.containsKey(tabletId); } public synchronized void rebalanceDisk(AdminRebalanceDiskStmt stmt) { @@ -263,7 +289,7 @@ public class TabletScheduler extends MasterDaemon { for (TabletSchedCtx tabletCtx : pendingTablets) { if (tabletCtx.getDbId() == dbId && tabletCtx.getTblId() == tblId && partitionIds.contains(tabletCtx.getPartitionId())) { - tabletCtx.setOrigPriority(Priority.VERY_HIGH); + tabletCtx.setPriority(Priority.VERY_HIGH); } newPendingTablets.add(tabletCtx); } @@ -293,14 +319,15 @@ public class TabletScheduler extends MasterDaemon { return; } - updateLoadStatisticsAndPriorityIfNecessary(); + if (System.currentTimeMillis() - lastCheckTimeoutTime >= 1000L) { + updateLoadStatisticsAndPriorityIfNecessary(); + handleRunningTablets(); + selectTabletsForBalance(); + lastCheckTimeoutTime = System.currentTimeMillis(); + } schedulePendingTablets(); - handleRunningTablets(); - - selectTabletsForBalance(); - stat.counterTabletScheduleRound.incrementAndGet(); } @@ -314,8 +341,6 @@ public class TabletScheduler extends MasterDaemon { rebalancer.updateLoadStatistic(statisticMap); diskRebalancer.updateLoadStatistic(statisticMap); - adjustPriorities(); - lastStatUpdateTime = System.currentTimeMillis(); } @@ -332,7 +357,7 @@ public class TabletScheduler extends MasterDaemon { LoadStatisticForTag loadStatistic = new LoadStatisticForTag(tag, infoService, invertedIndex); loadStatistic.init(); newStatisticMap.put(tag, loadStatistic); - LOG.debug("update load statistic:\n{}", loadStatistic.getBrief()); + LOG.debug("update load statistic for tag {}:\n{}", tag, loadStatistic.getBrief()); } this.statisticMap = newStatisticMap; @@ -342,28 +367,6 @@ public class TabletScheduler extends MasterDaemon { return statisticMap; } - /** - * adjust priorities of all tablet infos - */ - private synchronized void adjustPriorities() { - int size = pendingTablets.size(); - int changedNum = 0; - TabletSchedCtx tabletCtx; - for (int i = 0; i < size; i++) { - tabletCtx = pendingTablets.poll(); - if (tabletCtx == null) { - break; - } - - if (tabletCtx.adjustPriority(stat)) { - changedNum++; - } - pendingTablets.add(tabletCtx); - } - - LOG.debug("adjust priority for all tablets. changed: {}, total: {}", changedNum, size); - } - /** * get at most BATCH_NUM tablets from queue, and try to schedule them. * After handle, the tablet info should be @@ -371,7 +374,7 @@ public class TabletScheduler extends MasterDaemon { * 2. or in schedHistory with state CANCELLING, if some unrecoverable error happens. * 3. or in pendingTablets with state PENDING, if failed to be scheduled. * - * if in schedHistory, it should be removed from allTabletIds. + * if in schedHistory, it should be removed from allTabletTypes. */ private void schedulePendingTablets() { long start = System.currentTimeMillis(); @@ -385,36 +388,25 @@ public class TabletScheduler extends MasterDaemon { // do not schedule more tablet is tablet scheduler is disabled. throw new SchedException(Status.FINISHED, "tablet scheduler is disabled"); } + if (Config.disable_balance && tabletCtx.getType() == Type.BALANCE) { + finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, Status.UNRECOVERABLE, + "config disable balance"); + continue; + } scheduleTablet(tabletCtx, batchTask); } catch (SchedException e) { - tabletCtx.increaseFailedSchedCounter(); tabletCtx.setErrMsg(e.getMessage()); - if (e.getStatus() == Status.SCHEDULE_FAILED) { - if (tabletCtx.getType() == Type.BALANCE) { - // if balance is disabled, remove this tablet - if (Config.disable_balance) { - finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getStatus(), - "disable balance and " + e.getMessage()); - } else { - // remove the balance task if it fails to be scheduled many times - if (tabletCtx.getFailedSchedCounter() > 10) { - finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getStatus(), - "schedule failed too many times and " + e.getMessage()); - } else { - // we must release resource it current hold, and be scheduled again - tabletCtx.releaseResource(this); - // adjust priority to avoid some higher priority always be the first in pendingTablets - stat.counterTabletScheduledFailed.incrementAndGet(); - dynamicAdjustPrioAndAddBackToPendingTablets(tabletCtx, e.getMessage()); - } - } + boolean isExceedLimit = tabletCtx.onSchedFailedAndCheckExceedLimit(e.getSubCode()); + if (isExceedLimit) { + finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getStatus(), + "schedule failed too many times and " + e.getMessage()); } else { // we must release resource it current hold, and be scheduled again tabletCtx.releaseResource(this); // adjust priority to avoid some higher priority always be the first in pendingTablets stat.counterTabletScheduledFailed.incrementAndGet(); - dynamicAdjustPrioAndAddBackToPendingTablets(tabletCtx, e.getMessage()); + addBackToPendingTablets(tabletCtx); } } else if (e.getStatus() == Status.FINISHED) { // schedule redundant tablet or scheduler disabled will throw this exception @@ -485,6 +477,8 @@ public class TabletScheduler extends MasterDaemon { tbl.writeLockOrException(new SchedException(Status.UNRECOVERABLE, "table " + tbl.getName() + " does not exist")); try { + long tabletId = tabletCtx.getTabletId(); + boolean isColocateTable = colocateTableIndex.isColocateTable(tbl.getId()); OlapTableState tableState = tbl.getState(); @@ -499,8 +493,9 @@ public class TabletScheduler extends MasterDaemon { throw new SchedException(Status.UNRECOVERABLE, "index does not exist"); } - Tablet tablet = idx.getTablet(tabletCtx.getTabletId()); + Tablet tablet = idx.getTablet(tabletId); Preconditions.checkNotNull(tablet); + ReplicaAllocation replicaAlloc = tbl.getPartitionInfo().getReplicaAllocation(partition.getId()); if (isColocateTable) { GroupId groupId = colocateTableIndex.getGroup(tbl.getId()); @@ -516,18 +511,26 @@ public class TabletScheduler extends MasterDaemon { Set backendsSet = colocateTableIndex.getTabletBackendsByGroup(groupId, tabletOrderIdx); TabletStatus st = tablet.getColocateHealthStatus( - partition.getVisibleVersion(), - tbl.getPartitionInfo().getReplicaAllocation(partition.getId()), - backendsSet); + partition.getVisibleVersion(), replicaAlloc, backendsSet); statusPair = Pair.of(st, Priority.HIGH); tabletCtx.setColocateGroupBackendIds(backendsSet); } else { List aliveBeIds = infoService.getAllBackendIds(true); statusPair = tablet.getHealthStatusWithPriority( - infoService, - partition.getVisibleVersion(), - tbl.getPartitionInfo().getReplicaAllocation(partition.getId()), - aliveBeIds); + infoService, partition.getVisibleVersion(), replicaAlloc, aliveBeIds); + } + + if (tabletCtx.getType() != allTabletTypes.get(tabletId)) { + TabletSchedCtx.Type curType = tabletCtx.getType(); + TabletSchedCtx.Type newType = allTabletTypes.get(tabletId); + if (curType == TabletSchedCtx.Type.BALANCE && newType == TabletSchedCtx.Type.REPAIR) { + tabletCtx.setType(newType); + tabletCtx.setReplicaAlloc(replicaAlloc); + tabletCtx.setTag(null); + } else { + throw new SchedException(Status.UNRECOVERABLE, "can not convert type of tablet " + + tabletId + " from " + curType.name() + " to " + newType.name()); + } } if (tabletCtx.getType() == TabletSchedCtx.Type.BALANCE && tableState != OlapTableState.NORMAL) { @@ -732,11 +735,11 @@ public class TabletScheduler extends MasterDaemon { private void handleReplicaVersionIncomplete(TabletSchedCtx tabletCtx, AgentBatchTask batchTask) throws SchedException { stat.counterReplicaVersionMissingErr.incrementAndGet(); - try { tabletCtx.chooseDestReplicaForVersionIncomplete(backendsWorkingSlots); } catch (SchedException e) { - if (e.getMessage().equals("unable to choose dest replica")) { + // could not find dest, try add a missing. + if (e.getStatus() == Status.UNRECOVERABLE) { // This situation may occur when the BE nodes // where all replicas of a tablet are located are decommission, // and this task is a VERSION_INCOMPLETE task. @@ -779,25 +782,7 @@ public class TabletScheduler extends MasterDaemon { private void handleReplicaRelocating(TabletSchedCtx tabletCtx, AgentBatchTask batchTask) throws SchedException { stat.counterReplicaUnavailableErr.incrementAndGet(); - try { - handleReplicaVersionIncomplete(tabletCtx, batchTask); - LOG.debug("succeed to find version incomplete replica from tablet relocating. tablet id: {}", - tabletCtx.getTabletId()); - } catch (SchedException e) { - if (e.getStatus() == Status.SCHEDULE_FAILED) { - LOG.debug("failed to find version incomplete replica from tablet relocating. tablet id: {}, " - + "try to find a new backend", tabletCtx.getTabletId()); - // the dest or src slot may be taken after calling handleReplicaVersionIncomplete(), - // so we need to release these slots first. - // and reserve the tablet in TabletSchedCtx so that it can continue to be scheduled. - tabletCtx.releaseResource(this, true); - tabletCtx.setTabletStatus(TabletStatus.REPLICA_MISSING); - handleReplicaMissing(tabletCtx, batchTask); - LOG.debug("succeed to find new backend for tablet relocating. tablet id: {}", tabletCtx.getTabletId()); - } else { - throw e; - } - } + handleReplicaVersionIncomplete(tabletCtx, batchTask); } /** @@ -831,7 +816,7 @@ public class TabletScheduler extends MasterDaemon { // to remove this tablet from the pendingTablets(consider it as finished) throw new SchedException(Status.FINISHED, "redundant replica is deleted"); } - throw new SchedException(Status.SCHEDULE_FAILED, "unable to delete any redundant replicas"); + throw new SchedException(Status.UNRECOVERABLE, "unable to delete any redundant replicas"); } private boolean deleteBackendDropped(TabletSchedCtx tabletCtx, boolean force) throws SchedException { @@ -1035,7 +1020,7 @@ public class TabletScheduler extends MasterDaemon { deleteReplicaInternal(tabletCtx, replica, "colocate redundant", false); throw new SchedException(Status.FINISHED, "colocate redundant replica is deleted"); } - throw new SchedException(Status.SCHEDULE_FAILED, "unable to delete any colocate redundant replicas"); + throw new SchedException(Status.UNRECOVERABLE, "unable to delete any colocate redundant replicas"); } /** @@ -1102,16 +1087,17 @@ public class TabletScheduler extends MasterDaemon { replica.setWatermarkTxnId(nextTxnId); replica.setState(ReplicaState.DECOMMISSION); // set priority to normal because it may wait for a long time. Remain it as VERY_HIGH may block other task. - tabletCtx.setOrigPriority(Priority.NORMAL); + tabletCtx.setPriority(Priority.NORMAL); LOG.info("set replica {} on backend {} of tablet {} state to DECOMMISSION due to reason {}", replica.getId(), replica.getBackendId(), tabletCtx.getTabletId(), reason); - throw new SchedException(Status.SCHEDULE_FAILED, "set watermark txn " + nextTxnId); + throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_DECOMMISSION, + "set watermark txn " + nextTxnId); } else if (replica.getState() == ReplicaState.DECOMMISSION && replica.getWatermarkTxnId() != -1) { long watermarkTxnId = replica.getWatermarkTxnId(); try { if (!Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watermarkTxnId, tabletCtx.getDbId(), Lists.newArrayList(tabletCtx.getTblId()))) { - throw new SchedException(Status.SCHEDULE_FAILED, + throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_DECOMMISSION, "wait txn before " + watermarkTxnId + " to be finished"); } } catch (AnalysisException e) { @@ -1225,7 +1211,7 @@ public class TabletScheduler extends MasterDaemon { } for (TabletSchedCtx tabletCtx : diskBalanceTablets) { // add if task from prio backend or cluster is balanced - if (alternativeTablets.isEmpty() || tabletCtx.getOrigPriority() == TabletSchedCtx.Priority.NORMAL) { + if (alternativeTablets.isEmpty() || tabletCtx.getPriority() == TabletSchedCtx.Priority.NORMAL) { addTablet(tabletCtx, false); } } @@ -1260,7 +1246,8 @@ public class TabletScheduler extends MasterDaemon { LoadStatisticForTag statistic = statisticMap.get(tag); if (statistic == null) { throw new SchedException(Status.UNRECOVERABLE, - String.format("tag %s does not exist.", tag)); + String.format("tag %s does not exist. available tags: %s", tag, + Joiner.on(",").join(statisticMap.keySet().stream().limit(5).toArray()))); } beStatistics = statistic.getSortedBeLoadStats(null /* sorted ignore medium */); } else { @@ -1336,7 +1323,7 @@ public class TabletScheduler extends MasterDaemon { } if (allFitPaths.isEmpty()) { - throw new SchedException(Status.SCHEDULE_FAILED, "unable to find dest path for new replica"); + throw new SchedException(Status.UNRECOVERABLE, "unable to find dest path for new replica"); } // all fit paths has already been sorted by load score in 'allFitPaths' in ascend order. @@ -1390,25 +1377,109 @@ public class TabletScheduler extends MasterDaemon { return rootPathLoadStatistic; } - throw new SchedException(Status.SCHEDULE_FAILED, "unable to find dest path which can be fit in"); + throw new SchedException(Status.UNRECOVERABLE, "unable to find dest path which can be fit in"); } - /** - * For some reason, a tablet info failed to be scheduled this time, - * So we dynamically change its priority and add back to queue, waiting for next round. - */ - private void dynamicAdjustPrioAndAddBackToPendingTablets(TabletSchedCtx tabletCtx, String message) { + + private void addBackToPendingTablets(TabletSchedCtx tabletCtx) { Preconditions.checkState(tabletCtx.getState() == TabletSchedCtx.State.PENDING); - tabletCtx.adjustPriority(stat); addTablet(tabletCtx, true /* force */); } + private void finalizeTabletCtx(TabletSchedCtx tabletCtx, TabletSchedCtx.State state, Status status, String reason) { // use 2 steps to avoid nested database lock and synchronized.(releaseTabletCtx() may hold db lock) // remove the tablet ctx, so that no other process can see it removeTabletCtx(tabletCtx, reason); // release resources taken by tablet ctx releaseTabletCtx(tabletCtx, state, status == Status.UNRECOVERABLE); + + // if check immediately, then no need to wait TabletChecker's 20s + if (state == TabletSchedCtx.State.FINISHED) { + tryAddAfterFinished(tabletCtx); + } + } + + private void tryAddAfterFinished(TabletSchedCtx tabletCtx) { + int finishedCounter = tabletCtx.getFinishedCounter(); + finishedCounter++; + tabletCtx.setFinishedCounter(finishedCounter); + if (finishedCounter >= TabletSchedCtx.FINISHED_COUNTER_THRESHOLD) { + return; + } + + Database db = Env.getCurrentInternalCatalog().getDbNullable(tabletCtx.getDbId()); + if (db == null) { + return; + } + OlapTable tbl = (OlapTable) db.getTableNullable(tabletCtx.getTblId()); + if (tbl == null) { + return; + } + Pair statusPair; + ReplicaAllocation replicaAlloc = null; + tbl.readLock(); + try { + Partition partition = tbl.getPartition(tabletCtx.getPartitionId()); + if (partition == null) { + return; + } + + MaterializedIndex idx = partition.getIndex(tabletCtx.getIndexId()); + if (idx == null) { + return; + } + + Tablet tablet = idx.getTablet(tabletCtx.getTabletId()); + if (tablet == null) { + return; + } + + replicaAlloc = tbl.getPartitionInfo().getReplicaAllocation(partition.getId()); + boolean isColocateTable = colocateTableIndex.isColocateTable(tbl.getId()); + if (isColocateTable) { + GroupId groupId = colocateTableIndex.getGroup(tbl.getId()); + if (groupId == null) { + return; + } + + int tabletOrderIdx = tabletCtx.getTabletOrderIdx(); + if (tabletOrderIdx == -1) { + tabletOrderIdx = idx.getTabletOrderIdx(tablet.getId()); + } + Preconditions.checkState(tabletOrderIdx != -1); + + Set backendsSet = colocateTableIndex.getTabletBackendsByGroup(groupId, tabletOrderIdx); + TabletStatus st = tablet.getColocateHealthStatus( + partition.getVisibleVersion(), replicaAlloc, backendsSet); + statusPair = Pair.of(st, Priority.HIGH); + } else { + List aliveBeIds = infoService.getAllBackendIds(true); + statusPair = tablet.getHealthStatusWithPriority( + infoService, partition.getVisibleVersion(), replicaAlloc, aliveBeIds); + + if (statusPair.second.ordinal() < tabletCtx.getPriority().ordinal()) { + statusPair.second = tabletCtx.getPriority(); + } + } + } finally { + tbl.readUnlock(); + } + + if (statusPair.first == TabletStatus.HEALTHY) { + return; + } + + TabletSchedCtx newTabletCtx = new TabletSchedCtx( + TabletSchedCtx.Type.REPAIR, tabletCtx.getDbId(), tabletCtx.getTblId(), + tabletCtx.getPartitionId(), tabletCtx.getIndexId(), tabletCtx.getTabletId(), + replicaAlloc, System.currentTimeMillis()); + + newTabletCtx.setTabletStatus(statusPair.first); + newTabletCtx.setPriority(statusPair.second); + newTabletCtx.setFinishedCounter(finishedCounter); + + addTablet(newTabletCtx, false); } private void releaseTabletCtx(TabletSchedCtx tabletCtx, TabletSchedCtx.State state, boolean resetReplicaState) { @@ -1422,7 +1493,7 @@ public class TabletScheduler extends MasterDaemon { private synchronized void removeTabletCtx(TabletSchedCtx tabletCtx, String reason) { runningTablets.remove(tabletCtx.getTabletId()); - allTabletIds.remove(tabletCtx.getTabletId()); + allTabletTypes.remove(tabletCtx.getTabletId()); schedHistory.add(tabletCtx); LOG.info("remove the tablet {}. because: {}", tabletCtx.getTabletId(), reason); } @@ -1430,15 +1501,27 @@ public class TabletScheduler extends MasterDaemon { // get next batch of tablets from queue. private synchronized List getNextTabletCtxBatch() { List list = Lists.newArrayList(); - int count = Math.min(MIN_BATCH_NUM, getCurrentAvailableSlotNum()); - while (count > 0) { + int slotNum = getCurrentAvailableSlotNum(); + // Make slotNum >= 1 to ensure that it could return at least 1 ctx + // when the pending list is not empty. + if (slotNum < 1) { + slotNum = 1; + } + while (list.size() < Config.schedule_batch_size && slotNum > 0) { TabletSchedCtx tablet = pendingTablets.poll(); if (tablet == null) { // no more tablets break; } list.add(tablet); - count--; + TabletStatus status = tablet.getTabletStatus(); + // for a clone, it will take 2 slots: src slot and dst slot. + if (!(status == TabletStatus.REDUNDANT + || status == TabletStatus.FORCE_REDUNDANT + || status == TabletStatus.COLOCATE_REDUNDANT + || status == TabletStatus.REPLICA_COMPACTION_TOO_SLOW)) { + slotNum -= 2; + } } return list; } @@ -1469,9 +1552,12 @@ public class TabletScheduler extends MasterDaemon { // if we have a success task, then stat must be refreshed before schedule a new task updateDiskBalanceLastSuccTime(tabletCtx.getSrcBackendId(), tabletCtx.getSrcPathHash()); updateDiskBalanceLastSuccTime(tabletCtx.getDestBackendId(), tabletCtx.getDestPathHash()); + finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED, Status.FINISHED, "finished"); + } else { + finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, Status.UNRECOVERABLE, + request.getTaskStatus().getErrorMsgs().get(0)); } - // we need this function to free slot for this migration task - finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED, Status.FINISHED, "finished"); + return true; } @@ -1496,12 +1582,20 @@ public class TabletScheduler extends MasterDaemon { try { tabletCtx.finishCloneTask(cloneTask, request); } catch (SchedException e) { - tabletCtx.increaseFailedRunningCounter(); tabletCtx.setErrMsg(e.getMessage()); if (e.getStatus() == Status.RUNNING_FAILED) { - stat.counterCloneTaskFailed.incrementAndGet(); - addToRunningTablets(tabletCtx); - return false; + tabletCtx.increaseFailedRunningCounter(); + if (!tabletCtx.isExceedFailedRunningLimit()) { + stat.counterCloneTaskFailed.incrementAndGet(); + addToRunningTablets(tabletCtx); + return false; + } else { + // unrecoverable + stat.counterTabletScheduledDiscard.incrementAndGet(); + finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, Status.UNRECOVERABLE, + e.getMessage()); + return true; + } } else if (e.getStatus() == Status.UNRECOVERABLE) { // unrecoverable stat.counterTabletScheduledDiscard.incrementAndGet(); @@ -1639,7 +1733,7 @@ public class TabletScheduler extends MasterDaemon { } public synchronized int getTotalNum() { - return allTabletIds.size(); + return allTabletTypes.size(); } public synchronized long getBalanceTabletsNumber() { @@ -1655,10 +1749,12 @@ public class TabletScheduler extends MasterDaemon { public static class PathSlot { // path hash -> slot num private Map pathSlots = Maps.newConcurrentMap(); + private long beId; - public PathSlot(List paths, int initSlotNum) { + public PathSlot(List paths, long beId) { + this.beId = beId; for (Long pathHash : paths) { - pathSlots.put(pathHash, new Slot(initSlotNum)); + pathSlots.put(pathHash, new Slot(beId)); } } @@ -1670,25 +1766,11 @@ public class TabletScheduler extends MasterDaemon { // add new path for (Long pathHash : paths) { if (!pathSlots.containsKey(pathHash)) { - pathSlots.put(pathHash, new Slot(Config.schedule_slot_num_per_path)); + pathSlots.put(pathHash, new Slot(beId)); } } } - // Update the total slots num of specified paths, increase or decrease - public synchronized void updateSlot(List pathHashs, int delta) { - for (Long pathHash : pathHashs) { - Slot slot = pathSlots.get(pathHash); - if (slot == null) { - continue; - } - - slot.total += delta; - slot.rectify(); - LOG.debug("decrease path {} slots num to {}", pathHash, pathSlots.get(pathHash).total); - } - } - /** * Update the statistic of specified path */ @@ -1701,6 +1783,21 @@ public class TabletScheduler extends MasterDaemon { slot.totalCopyTimeMs += copyTimeMs; } + public synchronized boolean hasAvailableSlot(long pathHash) { + if (pathHash == -1) { + return false; + } + + Slot slot = pathSlots.get(pathHash); + if (slot == null) { + return false; + } + if (slot.getAvailable() == 0) { + return false; + } + return true; + } + /** * If the specified 'pathHash' has available slot, decrease the slot number and return this path hash */ @@ -1709,7 +1806,8 @@ public class TabletScheduler extends MasterDaemon { if (LOG.isDebugEnabled()) { LOG.debug("path hash is not set.", new Exception()); } - throw new SchedException(Status.SCHEDULE_FAILED, "path hash is not set"); + throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT, + "path hash is not set"); } Slot slot = pathSlots.get(pathHash); @@ -1717,12 +1815,11 @@ public class TabletScheduler extends MasterDaemon { LOG.debug("path {} is not exist", pathHash); return -1; } - slot.rectify(); - if (slot.available <= 0) { + if (slot.used >= slot.getTotal()) { LOG.debug("path {} has no available slot", pathHash); return -1; } - slot.available--; + slot.used++; return pathHash; } @@ -1731,23 +1828,15 @@ public class TabletScheduler extends MasterDaemon { if (slot == null) { return; } - slot.available++; - slot.rectify(); - } - - public synchronized int peekSlot(long pathHash) { - Slot slot = pathSlots.get(pathHash); - if (slot == null) { - return -1; + if (slot.used > 0) { + slot.used--; } - slot.rectify(); - return slot.available; } public synchronized int getTotalAvailSlotNum() { int total = 0; for (Slot slot : pathSlots.values()) { - total += slot.available; + total += slot.getAvailable(); } return total; } @@ -1758,7 +1847,7 @@ public class TabletScheduler extends MasterDaemon { public synchronized Set getAvailPathsForBalance() { Set pathHashs = Sets.newHashSet(); for (Map.Entry entry : pathSlots.entrySet()) { - if (entry.getValue().balanceSlot > 0) { + if (entry.getValue().getBalanceAvailable() > 0) { pathHashs.add(entry.getKey()); } } @@ -1768,7 +1857,7 @@ public class TabletScheduler extends MasterDaemon { public synchronized int getAvailBalanceSlotNum() { int num = 0; for (Map.Entry entry : pathSlots.entrySet()) { - num += entry.getValue().balanceSlot; + num += entry.getValue().getBalanceAvailable(); } return num; } @@ -1776,13 +1865,12 @@ public class TabletScheduler extends MasterDaemon { public synchronized List> getSlotInfo(long beId) { List> results = Lists.newArrayList(); pathSlots.forEach((key, value) -> { - value.rectify(); List result = Lists.newArrayList(); result.add(String.valueOf(beId)); result.add(String.valueOf(key)); - result.add(String.valueOf(value.available)); - result.add(String.valueOf(value.total)); - result.add(String.valueOf(value.balanceSlot)); + result.add(String.valueOf(value.getAvailable())); + result.add(String.valueOf(value.getTotal())); + result.add(String.valueOf(value.getBalanceAvailable())); result.add(String.valueOf(value.getAvgRate())); results.add(result); }); @@ -1794,8 +1882,8 @@ public class TabletScheduler extends MasterDaemon { if (slot == null) { return -1; } - if (slot.balanceSlot > 0) { - slot.balanceSlot--; + if (slot.balanceUsed < slot.getBalanceTotal()) { + slot.balanceUsed++; return pathHash; } return -1; @@ -1807,8 +1895,8 @@ public class TabletScheduler extends MasterDaemon { if (slot == null) { continue; } - if (slot.balanceSlot > 0) { - slot.balanceSlot--; + if (slot.balanceUsed < slot.getBalanceTotal()) { + slot.balanceUsed++; return pathHash; } } @@ -1820,8 +1908,9 @@ public class TabletScheduler extends MasterDaemon { if (slot == null) { return; } - slot.balanceSlot++; - slot.rectify(); + if (slot.balanceUsed > 0) { + slot.balanceUsed--; + } } public synchronized void updateDiskBalanceLastSuccTime(long pathHash) { @@ -1851,10 +1940,8 @@ public class TabletScheduler extends MasterDaemon { } public static class Slot { - public int total; - public int available; - // slot reserved for balance - public int balanceSlot; + public int used; + public int balanceUsed; public long totalCopySize = 0; public long totalCopyTimeMs = 0; @@ -1862,23 +1949,35 @@ public class TabletScheduler extends MasterDaemon { // for disk balance public long diskBalanceLastSuccTime = 0; - public Slot(int total) { - this.total = total; - this.available = total; - this.balanceSlot = Config.balance_slot_num_per_path; + private long beId; + + public Slot(long beId) { + this.beId = beId; + this.used = 0; + this.balanceUsed = 0; } - public void rectify() { - if (total <= 0) { - total = 1; - } - if (available > total) { - available = total; + public int getAvailable() { + return Math.max(0, getTotal() - used); + } + + public int getTotal() { + int total = Math.max(1, Config.schedule_slot_num_per_path); + + Backend be = Env.getCurrentSystemInfo().getBackend(beId); + if (be != null && be.isDecommissioned()) { + total = Math.max(1, Config.schedule_decommission_slot_num_per_path); } - if (balanceSlot > Config.balance_slot_num_per_path) { - balanceSlot = Config.balance_slot_num_per_path; - } + return total; + } + + public int getBalanceAvailable() { + return Math.max(0, getBalanceTotal() - balanceUsed); + } + + public int getBalanceTotal() { + return Math.max(1, Config.balance_slot_num_per_path); } // return avg rate, Bytes/S diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java index e2aada6f1f..4441a99431 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java @@ -29,15 +29,15 @@ import com.google.common.collect.Lists; import java.util.List; /* - * show proc "/tablet_scheduler/pending_tablets"; - * show proc "/tablet_scheduler/running_tablets"; - * show proc "/tablet_scheduler/history_tablets"; + * show proc "/cluster_balance/pending_tablets"; + * show proc "/cluster_balance/running_tablets"; + * show proc "/cluster_balance/history_tablets"; */ public class TabletSchedulerDetailProcDir implements ProcDirInterface { public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder().add("TabletId") - .add("Type").add("Medium").add("Status").add("State").add("OrigPrio").add("DynmPrio").add("SrcBe") + .add("Type").add("Medium").add("Status").add("State").add("SchedCode").add("Priority").add("SrcBe") .add("SrcPath").add("DestBe").add("DestPath").add("Timeout").add("Create").add("LstSched").add("LstVisit") - .add("Finished").add("Rate").add("FailedSched").add("FailedRunning").add("LstAdjPrio").add("VisibleVer") + .add("Finished").add("Rate").add("FailedSched").add("FailedRunning").add("VisibleVer") .add("CmtVer").add("ErrMsg") .build(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java index 41df080a75..d4578e17d7 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java @@ -39,17 +39,17 @@ public class TabletSchedCtxTest { ReplicaAllocation replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION; TabletSchedCtx ctx1 = new TabletSchedCtx(Type.REPAIR, 1, 2, 3, 4, 1000, replicaAlloc, System.currentTimeMillis()); - ctx1.setOrigPriority(Priority.NORMAL); + ctx1.setPriority(Priority.NORMAL); ctx1.setLastVisitedTime(2); TabletSchedCtx ctx2 = new TabletSchedCtx(Type.REPAIR, 1, 2, 3, 4, 1001, replicaAlloc, System.currentTimeMillis()); - ctx2.setOrigPriority(Priority.NORMAL); + ctx2.setPriority(Priority.NORMAL); ctx2.setLastVisitedTime(3); TabletSchedCtx ctx3 = new TabletSchedCtx(Type.REPAIR, - 1, 2, 3, 4, 1001, replicaAlloc, System.currentTimeMillis()); - ctx3.setOrigPriority(Priority.NORMAL); + 1, 2, 3, 4, 1002, replicaAlloc, System.currentTimeMillis()); + ctx3.setPriority(Priority.NORMAL); ctx3.setLastVisitedTime(1); pendingTablets.add(ctx1); @@ -62,8 +62,8 @@ public class TabletSchedCtxTest { // priority is not equal, info2 is HIGH, should ranks ahead pendingTablets.clear(); - ctx1.setOrigPriority(Priority.NORMAL); - ctx2.setOrigPriority(Priority.HIGH); + ctx1.setPriority(Priority.NORMAL); + ctx2.setPriority(Priority.HIGH); ctx1.setLastVisitedTime(2); ctx2.setLastVisitedTime(2); pendingTablets.add(ctx2); diff --git a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java index 95fb22eac6..209f8a1127 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java @@ -33,7 +33,6 @@ import org.junit.jupiter.api.Test; import java.util.List; public class DecommissionBackendTest extends TestWithFeService { - @Override protected int backendNum() { return 3; @@ -42,6 +41,7 @@ public class DecommissionBackendTest extends TestWithFeService { @Override protected void beforeCluster() { FeConstants.runningUnitTest = true; + needCleanDir = false; } @BeforeAll diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java index ac2fe5978d..f2610738f3 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java @@ -119,6 +119,7 @@ public abstract class TestWithFeService { protected String dorisHome; protected String runningDir = "fe/mocked/" + getClass().getSimpleName() + "/" + UUID.randomUUID() + "/"; protected ConnectContext connectContext; + protected boolean needCleanDir = true; protected static final String DEFAULT_CLUSTER_PREFIX = "default_cluster:"; @@ -140,7 +141,9 @@ public abstract class TestWithFeService { runAfterAll(); Env.getCurrentEnv().clear(); StatementScopeIdGenerator.clear(); - cleanDorisFeDir(); + if (needCleanDir) { + cleanDorisFeDir(); + } } @BeforeEach