[Improvement](tablet clone) improve tablet scheduling speed and fix tablets failing to be scheduled too many times (#21856)

This commit is contained in:
yujun
2023-07-18 23:25:22 +08:00
committed by GitHub
parent dcb165cc9f
commit beec0e9169
15 changed files with 491 additions and 372 deletions

View File

@ -102,6 +102,7 @@ class DBManager(object):
def decommission_be(self, be_endpoint):
old_tablet_num = 0
id = CLUSTER.Node.get_id_from_ip(be_endpoint[:be_endpoint.find(":")])
start_ts = time.time()
if id not in self.be_states:
self._load_be_states()
if id in self.be_states:
@ -132,8 +133,9 @@ class DBManager(object):
return
LOG.info(
"Decommission be {} status: alive {}, decommissioned {}. " \
"It is migrating its tablets, left {}/{} tablets."
.format(be_endpoint, be.alive, be.decommissioned, be.tablet_num, old_tablet_num))
"It is migrating its tablets, left {}/{} tablets. Time elapse {} s."
.format(be_endpoint, be.alive, be.decommissioned, be.tablet_num, old_tablet_num,
int(time.time() - start_ts)))
time.sleep(5)
@ -189,7 +191,7 @@ class DBManager(object):
def _reset_conn(self):
self.conn = pymysql.connect(user="root",
host="127.0.0.1",
read_timeout = 10,
read_timeout=10,
port=self.query_port)
@ -234,6 +236,6 @@ def get_db_mgr(cluster_name, required_load_succ=True):
except Exception as e:
if required_load_succ:
raise e
LOG.exception(e)
#LOG.exception(e)
return db_mgr

View File

@ -904,7 +904,21 @@ public class Config extends ConfigBase {
* the default slot number per path in tablet scheduler
* TODO(cmy): remove this config and dynamically adjust it by clone task statistic
*/
@ConfField public static int schedule_slot_num_per_path = 2;
@ConfField(mutable = true, masterOnly = true)
public static int schedule_slot_num_per_path = 4;
/**
* the default slot number per path in tablet scheduler for decommission backend
*/
@ConfField(mutable = true, masterOnly = true)
public static int schedule_decommission_slot_num_per_path = 8;
/**
* the default batch size in tablet scheduler for a single schedule.
*/
@ConfField(mutable = true, masterOnly = true)
public static int schedule_batch_size = 50;
/**
* Deprecated after 0.10

View File

@ -703,11 +703,24 @@ public class Tablet extends MetaObject implements Writable {
* NORMAL: delay Config.tablet_repair_delay_factor_second * 2;
* LOW: delay Config.tablet_repair_delay_factor_second * 3;
*/
public boolean readyToBeRepaired(TabletSchedCtx.Priority priority) {
public boolean readyToBeRepaired(SystemInfoService infoService, TabletSchedCtx.Priority priority) {
if (priority == Priority.VERY_HIGH) {
return true;
}
boolean allBeAliveOrDecommissioned = true;
for (Replica replica : replicas) {
Backend backend = infoService.getBackend(replica.getBackendId());
if (backend == null || (!backend.isAlive() && !backend.isDecommissioned())) {
allBeAliveOrDecommissioned = false;
break;
}
}
if (allBeAliveOrDecommissioned) {
return true;
}
long currentTime = System.currentTimeMillis();
// first check, wait for next round

View File

@ -23,6 +23,7 @@ import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.clone.SchedException.Status;
import org.apache.doris.clone.SchedException.SubCode;
import org.apache.doris.clone.TabletSchedCtx.Priority;
import org.apache.doris.clone.TabletScheduler.PathSlot;
import org.apache.doris.common.Config;
@ -166,7 +167,7 @@ public class BeLoadRebalancer extends Rebalancer {
System.currentTimeMillis());
tabletCtx.setTag(clusterStat.getTag());
// balance task's priority is always LOW
tabletCtx.setOrigPriority(Priority.LOW);
tabletCtx.setPriority(Priority.LOW);
alternativeTablets.add(tabletCtx);
if (--numOfLowPaths <= 0) {
@ -262,7 +263,7 @@ public class BeLoadRebalancer extends Rebalancer {
}
// Select a low load backend as destination.
boolean setDest = false;
List<BackendLoadStatistic> candidates = Lists.newArrayList();
for (BackendLoadStatistic beStat : lowBe) {
if (beStat.isAvailable() && replicas.stream().noneMatch(r -> r.getBackendId() == beStat.getBeId())) {
// check if on same host.
@ -296,27 +297,36 @@ public class BeLoadRebalancer extends Rebalancer {
continue;
}
// classify the paths.
// And we only select path from 'low' and 'mid' paths
Set<Long> pathLow = Sets.newHashSet();
Set<Long> pathMid = Sets.newHashSet();
Set<Long> pathHigh = Sets.newHashSet();
beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, tabletCtx.getStorageMedium());
pathLow.addAll(pathMid);
long pathHash = slot.takeAnAvailBalanceSlotFrom(pathLow);
if (pathHash == -1) {
LOG.debug("paths has no available balance slot: {}", pathLow);
} else {
tabletCtx.setDest(beStat.getBeId(), pathHash);
setDest = true;
break;
}
candidates.add(beStat);
}
}
if (!setDest) {
throw new SchedException(Status.SCHEDULE_FAILED, "unable to find low backend");
if (candidates.isEmpty()) {
throw new SchedException(Status.UNRECOVERABLE, "unable to find low backend");
}
for (BackendLoadStatistic beStat : candidates) {
PathSlot slot = backendsWorkingSlots.get(beStat.getBeId());
if (slot == null) {
continue;
}
// classify the paths.
// And we only select path from 'low' and 'mid' paths
Set<Long> pathLow = Sets.newHashSet();
Set<Long> pathMid = Sets.newHashSet();
Set<Long> pathHigh = Sets.newHashSet();
beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, tabletCtx.getStorageMedium());
pathLow.addAll(pathMid);
long pathHash = slot.takeAnAvailBalanceSlotFrom(pathLow);
if (pathHash != -1) {
tabletCtx.setDest(beStat.getBeId(), pathHash);
return;
}
}
throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT,
"unable to find low backend");
}
}

View File

@ -202,6 +202,7 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
*/
private void matchGroup() {
Env env = Env.getCurrentEnv();
SystemInfoService infoService = Env.getCurrentSystemInfo();
ColocateTableIndex colocateIndex = env.getColocateTableIndex();
TabletScheduler tabletScheduler = env.getTabletScheduler();
@ -254,7 +255,7 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
+ " status: %s", tablet.getId(), st);
LOG.debug(unstableReason);
if (!tablet.readyToBeRepaired(Priority.NORMAL)) {
if (!tablet.readyToBeRepaired(infoService, Priority.NORMAL)) {
continue;
}
@ -265,7 +266,7 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
System.currentTimeMillis());
// the tablet status will be set again when being scheduled
tabletCtx.setTabletStatus(st);
tabletCtx.setOrigPriority(Priority.NORMAL);
tabletCtx.setPriority(Priority.NORMAL);
tabletCtx.setTabletOrderIdx(idx);
AddResult res = tabletScheduler.addTablet(tabletCtx, false /* not force */);

View File

@ -208,10 +208,10 @@ public class DiskRebalancer extends Rebalancer {
tabletCtx.setTag(clusterStat.getTag());
if (prioBackends.containsKey(beStat.getBeId())) {
// priority of balance task of prio BE is NORMAL
tabletCtx.setOrigPriority(Priority.NORMAL);
tabletCtx.setPriority(Priority.NORMAL);
} else {
// balance task's default priority is LOW
tabletCtx.setOrigPriority(Priority.LOW);
tabletCtx.setPriority(Priority.LOW);
}
// we must set balanceType to DISK_BALANCE for create migration task
tabletCtx.setBalanceType(BalanceType.DISK_BALANCE);

View File

@ -151,7 +151,7 @@ public class PartitionRebalancer extends Rebalancer {
System.currentTimeMillis());
tabletCtx.setTag(clusterStat.getTag());
// Balance task's priority is always LOW
tabletCtx.setOrigPriority(TabletSchedCtx.Priority.LOW);
tabletCtx.setPriority(TabletSchedCtx.Priority.LOW);
alternativeTablets.add(tabletCtx);
// Pair<Move, ToDeleteReplicaId>, ToDeleteReplicaId should be -1L before scheduled successfully
movesInProgress.get().put(pickedTabletId,
@ -251,7 +251,7 @@ public class PartitionRebalancer extends Rebalancer {
if (slot.takeBalanceSlot(srcReplica.getPathHash()) != -1) {
tabletCtx.setSrc(srcReplica);
} else {
throw new SchedException(SchedException.Status.SCHEDULE_FAILED,
throw new SchedException(SchedException.Status.SCHEDULE_FAILED, SchedException.SubCode.WAITING_SLOT,
"no slot for src replica " + srcReplica + ", pathHash " + srcReplica.getPathHash());
}
@ -269,7 +269,7 @@ public class PartitionRebalancer extends Rebalancer {
.map(RootPathLoadStatistic::getPathHash).collect(Collectors.toSet());
long pathHash = slot.takeAnAvailBalanceSlotFrom(availPath);
if (pathHash == -1) {
throw new SchedException(SchedException.Status.SCHEDULE_FAILED,
throw new SchedException(SchedException.Status.SCHEDULE_FAILED, SchedException.SubCode.WAITING_SLOT,
"paths has no available balance slot: " + availPath);
} else {
tabletCtx.setDest(beStat.getBeId(), pathHash);

View File

@ -27,14 +27,30 @@ public class SchedException extends Exception {
FINISHED // schedule is done, remove the tablet from tablet scheduler with status FINISHED
}
// Finer-grained reason carried by a SchedException in addition to its Status.
public enum SubCode {
// no specific reason; this is what the two-argument SchedException constructor passes
NONE,
// NOTE(review): presumably used while waiting for a decommissioned replica to be dropped - confirm at throw sites
WAITING_DECOMMISSION,
// used when no working slot could be taken on a source or destination path
WAITING_SLOT,
}
private Status status;
private SubCode subCode;
/**
 * Creates an exception with the given status and no specific sub code.
 * Delegates to the three-argument constructor with SubCode.NONE.
 */
public SchedException(Status status, String errorMsg) {
this(status, SubCode.NONE, errorMsg);
}
/**
 * Creates an exception with the given status, sub code and message.
 *
 * @param status   the schedule status, returned by getStatus()
 * @param subCode  the finer-grained failure reason, returned by getSubCode()
 * @param errorMsg the message passed to the Exception superclass
 */
public SchedException(Status status, SubCode subCode, String errorMsg) {
super(errorMsg);
this.status = status;
this.subCode = subCode;
}
public Status getStatus() {
return status;
}
// Returns the sub code given at construction (SubCode.NONE when the two-argument constructor was used).
public SubCode getSubCode() {
return subCode;
}
}

View File

@ -372,9 +372,7 @@ public class TabletChecker extends MasterDaemon {
}
counter.unhealthyTabletNum++;
if (!tablet.readyToBeRepaired(statusWithPrio.second)) {
counter.tabletNotReady++;
if (!tablet.readyToBeRepaired(infoService, statusWithPrio.second)) {
continue;
}
@ -386,7 +384,7 @@ public class TabletChecker extends MasterDaemon {
System.currentTimeMillis());
// the tablet status will be set again when being scheduled
tabletCtx.setTabletStatus(statusWithPrio.first);
tabletCtx.setOrigPriority(statusWithPrio.second);
tabletCtx.setPriority(statusWithPrio.second);
AddResult res = tabletScheduler.addTablet(tabletCtx, false /* not force */);
if (res == AddResult.LIMIT_EXCEED || res == AddResult.DISABLED) {

View File

@ -29,6 +29,7 @@ import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.Tablet.TabletStatus;
import org.apache.doris.clone.SchedException.Status;
import org.apache.doris.clone.SchedException.SubCode;
import org.apache.doris.clone.TabletScheduler.PathSlot;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
@ -102,6 +103,8 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
*/
private static final int RUNNING_FAILED_COUNTER_THRESHOLD = 3;
public static final int FINISHED_COUNTER_THRESHOLD = 3;
private static VersionCountComparator VERSION_COUNTER_COMPARATOR = new VersionCountComparator();
public enum Type {
@ -117,22 +120,6 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
NORMAL,
HIGH,
VERY_HIGH;
// Returns the priority one step above (isUp == true) or below (isUp == false) this one,
// limited by the origin priority:
// - a ctx whose origin priority is VERY_HIGH is never downgraded below NORMAL
// - a ctx whose origin priority is LOW is never upgraded above HIGH
public Priority adjust(Priority origPriority, boolean isUp) {
switch (this) {
case VERY_HIGH:
// already the top level, so an upgrade is a no-op
return isUp ? VERY_HIGH : HIGH;
case HIGH:
// upgrade is capped at HIGH when the origin priority is LOW
return isUp ? (origPriority == LOW ? HIGH : VERY_HIGH) : NORMAL;
case NORMAL:
// downgrade is floored at NORMAL when the origin priority is VERY_HIGH
return isUp ? HIGH : (origPriority == Priority.VERY_HIGH ? NORMAL : LOW);
default:
// remaining constant(s): a downgrade stays at LOW
return isUp ? NORMAL : LOW;
}
}
}
public enum State {
@ -147,23 +134,19 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
private Type type;
private BalanceType balanceType;
/*
* origPriority is the origin priority being set when this tablet being added to scheduler.
* dynamicPriority will be set during tablet schedule processing, it will not be prior than origin priority.
* And dynamic priority is also used in priority queue compare in tablet scheduler.
*/
private Priority origPriority;
private Priority dynamicPriority;
private Priority priority;
// we change the dynamic priority based on how many times it fails to be scheduled
private int failedSchedCounter = 0;
// clone task failed counter
private int failedRunningCounter = 0;
// When finish a tablet ctx, it will check the tablet's health status.
// If the tablet is unhealthy, it will add a new ctx.
// The new ctx's finishedCounter = old ctx's finishedCounter + 1.
private int finishedCounter = 0;
// last time this tablet being scheduled
private long lastSchedTime = 0;
// last time the dynamic priority being adjusted
private long lastAdjustPrioTime = 0;
// last time this tablet being visited.
// being visited means:
@ -180,6 +163,8 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
private State state;
private TabletStatus tabletStatus;
private long decommissionTime = -1;
private long dbId;
private long tblId;
private long partitionId;
@ -223,6 +208,8 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
// tag is only set for BALANCE task, used to identify which workload group this Balance job is in
private Tag tag;
private SubCode schedFailedCode;
public TabletSchedCtx(Type type, long dbId, long tblId, long partId,
long idxId, long tabletId, ReplicaAllocation replicaAlloc, long createTime) {
this.type = type;
@ -236,12 +223,17 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
this.state = State.PENDING;
this.replicaAlloc = replicaAlloc;
this.balanceType = BalanceType.BE_BALANCE;
this.schedFailedCode = SubCode.NONE;
}
public ReplicaAllocation getReplicaAlloc() {
return replicaAlloc;
}
// Replaces the replica allocation held by this ctx.
public void setReplicaAlloc(ReplicaAllocation replicaAlloc) {
this.replicaAlloc = replicaAlloc;
}
public void setTag(Tag tag) {
this.tag = tag;
}
@ -266,21 +258,20 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
return balanceType;
}
public Priority getOrigPriority() {
return origPriority;
public Priority getPriority() {
return priority;
}
public void setOrigPriority(Priority origPriority) {
this.origPriority = origPriority;
// reset dynamic priority along with the origin priority being set.
this.dynamicPriority = origPriority;
this.failedSchedCounter = 0;
this.lastSchedTime = 0;
this.lastAdjustPrioTime = 0;
public void setPriority(Priority priority) {
this.priority = priority;
}
public Priority getDynamicPriority() {
return dynamicPriority;
public int getFinishedCounter() {
return finishedCounter;
}
// Sets the finished counter of this ctx (see the comment on the finishedCounter field).
public void setFinishedCounter(int finishedCounter) {
this.finishedCounter = finishedCounter;
}
public void increaseFailedSchedCounter() {
@ -295,8 +286,27 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
++failedRunningCounter;
}
public int getFailedRunningCounter() {
return failedRunningCounter;
public boolean isExceedFailedRunningLimit() {
return failedRunningCounter >= RUNNING_FAILED_COUNTER_THRESHOLD;
}
/**
 * Records one schedule failure and reports whether this ctx has now failed too often.
 *
 * @param code the sub code of the failure; it is stored in schedFailedCode
 * @return true if the limit that applies to this kind of failure has been exceeded
 */
public boolean onSchedFailedAndCheckExceedLimit(SubCode code) {
schedFailedCode = code;
failedSchedCounter++;
if (code == SubCode.WAITING_DECOMMISSION) {
// waiting for decommission is limited by elapsed time, not by count:
// the counter is reset, and the first such failure starts a 10-minute window
failedSchedCounter = 0;
if (decommissionTime < 0) {
decommissionTime = System.currentTimeMillis();
}
return System.currentTimeMillis() > decommissionTime + 10 * 60 * 1000L;
} else {
// any other failure ends the decommission wait window
decommissionTime = -1;
if (code == SubCode.WAITING_SLOT && type != Type.BALANCE) {
// non-balance ctx waiting for a slot: the limit is the number of
// SCHEDULE_INTERVAL_MS periods that fit into 30 seconds
return failedSchedCounter > 30 * 1000 / TabletScheduler.SCHEDULE_INTERVAL_MS;
} else {
// everything else (including balance ctxs waiting for a slot): more than 10 failures
return failedSchedCounter > 10;
}
}
}
public void setLastSchedTime(long lastSchedTime) {
@ -311,6 +321,10 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
this.finishedTime = finishedTime;
}
// Sets the time (ms) at which this ctx started waiting for decommission.
// A negative value (the field's initial value is -1) means no wait is in progress.
public void setDecommissionTime(long decommissionTime) {
this.decommissionTime = decommissionTime;
}
public State getState() {
return state;
}
@ -615,7 +629,8 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
setSrc(srcReplica);
return;
}
throw new SchedException(Status.SCHEDULE_FAILED, "unable to find source slot");
throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT,
"unable to find source slot");
}
/*
@ -641,7 +656,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
*/
public void chooseDestReplicaForVersionIncomplete(Map<Long, PathSlot> backendsWorkingSlots)
throws SchedException {
Replica chosenReplica = null;
List<Replica> candidates = Lists.newArrayList();
for (Replica replica : tablet.getReplicas()) {
if (replica.isBad()) {
LOG.debug("replica {} is bad, skip. tablet: {}",
@ -660,18 +675,37 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
// if the replica's state is DECOMMISSION, it may be chose as dest replica,
// and its state will be set to NORMAL later.
if (replica.getLastFailedVersion() <= 0
&& ((replica.getVersion() == visibleVersion)
|| replica.getVersion() > visibleVersion) && replica.getState() != ReplicaState.DECOMMISSION) {
&& replica.getVersion() >= visibleVersion
&& replica.getState() != ReplicaState.DECOMMISSION) {
// skip healthy replica
LOG.debug("replica {} version {} is healthy, visible version {}, replica state {}, skip. tablet: {}",
replica.getId(), replica.getVersion(), visibleVersion, replica.getState(), tabletId);
continue;
}
candidates.add(replica);
}
if (candidates.isEmpty()) {
throw new SchedException(Status.UNRECOVERABLE, "unable to choose dest replica");
}
Replica chosenReplica = null;
for (Replica replica : candidates) {
PathSlot slot = backendsWorkingSlots.get(replica.getBackendId());
if (slot == null || !slot.hasAvailableSlot(replica.getPathHash())) {
if (!replica.needFurtherRepair()) {
throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT,
"replica " + replica + " has not slot");
}
continue;
}
if (replica.needFurtherRepair()) {
chosenReplica = replica;
LOG.debug("replica {} need further repair, choose it. tablet: {}",
replica.getId(), tabletId);
chosenReplica = replica;
break;
}
@ -686,20 +720,19 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
}
}
if (chosenReplica == null) {
throw new SchedException(Status.SCHEDULE_FAILED, "unable to choose dest replica");
}
// check if the dest replica has available slot
// it should not happen cause it just check hasAvailableSlot yet.
PathSlot slot = backendsWorkingSlots.get(chosenReplica.getBackendId());
if (slot == null) {
throw new SchedException(Status.SCHEDULE_FAILED, "backend of dest replica is missing");
throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT,
"backend of dest replica is missing");
}
long destPathHash = slot.takeSlot(chosenReplica.getPathHash());
if (destPathHash == -1) {
throw new SchedException(Status.SCHEDULE_FAILED, "unable to take slot of dest path");
throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT,
"unable to take slot of dest path");
}
if (chosenReplica.getState() == ReplicaState.DECOMMISSION) {
// Since this replica is selected as the repair object of VERSION_INCOMPLETE,
// it means that this replica needs to be able to accept loading data.
@ -717,6 +750,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
// forever, because the replica in the DECOMMISSION state will not receive the load task.
chosenReplica.setWatermarkTxnId(-1);
chosenReplica.setState(ReplicaState.NORMAL);
setDecommissionTime(-1);
LOG.info("choose replica {} on backend {} of tablet {} as dest replica for version incomplete,"
+ " and change state from DECOMMISSION to NORMAL",
chosenReplica.getId(), chosenReplica.getBackendId(), tabletId);
@ -941,7 +975,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
cloneTask.getDbId(), cloneTask.getTableId(), cloneTask.getPartitionId(),
cloneTask.getIndexId(), cloneTask.getTabletId(), cloneTask.getBackendId(),
dbId, tblId, partitionId, indexId, tablet.getId(), destBackendId);
throw new SchedException(Status.RUNNING_FAILED, msg);
throw new SchedException(Status.UNRECOVERABLE, msg);
}
// 1. check the tablet status first
@ -1041,13 +1075,6 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
state = State.FINISHED;
LOG.info("clone finished: {}", this);
} catch (SchedException e) {
// if failed to too many times, remove this task
++failedRunningCounter;
if (failedRunningCounter > RUNNING_FAILED_COUNTER_THRESHOLD) {
throw new SchedException(Status.UNRECOVERABLE, e.getMessage());
}
throw e;
} finally {
olapTable.writeUnlock();
}
@ -1061,73 +1088,6 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
}
}
/*
* we try to adjust the priority based on schedule history
* 1. If failed counter is larger than FAILED_COUNTER_THRESHOLD, which means this tablet is being scheduled
* at least FAILED_TIME_THRESHOLD times and all are failed. So we downgrade its priority.
* Also reset the failedCounter, or it will be downgraded forever.
*
* 2. Else, if it has been a long time since last time the tablet being scheduled, we upgrade its
* priority to let it more available to be scheduled.
*
* The time gap between adjustment should be larger than MIN_ADJUST_PRIORITY_INTERVAL_MS, to avoid
* being downgraded too fast.
*
* eg:
* A tablet has been scheduled for 5 times and all were failed. its priority will be downgraded. And if it is
* scheduled for 5 times and all are failed again, it will be downgraded again, until to the LOW.
* And than, because of LOW, this tablet can not be scheduled for a long time, and it will be upgraded
* to NORMAL, if still not being scheduled, it will be upgraded up to VERY_HIGH.
*
* return true if dynamic priority changed
*/
// Applies at most one downgrade or upgrade step to dynamicPriority and updates the
// matching counter in stat. Returns true only if dynamicPriority actually changed.
public boolean adjustPriority(TabletSchedulerStat stat) {
long currentTime = System.currentTimeMillis();
if (lastAdjustPrioTime == 0) {
// skip the first time we adjust this priority
lastAdjustPrioTime = currentTime;
return false;
} else {
// NOTE(review): within this method lastAdjustPrioTime is only assigned on the first call,
// so this check throttles only the interval after that first call - confirm this is intended
if (currentTime - lastAdjustPrioTime < MIN_ADJUST_PRIORITY_INTERVAL_MS) {
return false;
}
}
boolean isDowngrade = false;
boolean isUpgrade = false;
// too many failed schedules takes precedence over "not scheduled for a long time"
if (failedSchedCounter > SCHED_FAILED_COUNTER_THRESHOLD) {
isDowngrade = true;
} else {
// a ctx that was never scheduled is measured from its create time
long lastTime = lastSchedTime == 0 ? createTime : lastSchedTime;
if (currentTime - lastTime > MAX_NOT_BEING_SCHEDULED_INTERVAL_MS) {
isUpgrade = true;
}
}
Priority originDynamicPriority = dynamicPriority;
if (isDowngrade) {
dynamicPriority = dynamicPriority.adjust(origPriority, false /* downgrade */);
// reset the counter even if the priority could not go lower
failedSchedCounter = 0;
if (originDynamicPriority != dynamicPriority) {
LOG.debug("downgrade dynamic priority from {} to {}, origin: {}, tablet: {}",
originDynamicPriority.name(), dynamicPriority.name(), origPriority.name(), tabletId);
stat.counterTabletPrioDowngraded.incrementAndGet();
return true;
}
} else if (isUpgrade) {
dynamicPriority = dynamicPriority.adjust(origPriority, true /* upgrade */);
// no need to set lastSchedTime, lastSchedTime is set each time we schedule this tablet
if (originDynamicPriority != dynamicPriority) {
LOG.debug("upgrade dynamic priority from {} to {}, origin: {}, tablet: {}",
originDynamicPriority.name(), dynamicPriority.name(), origPriority.name(), tabletId);
stat.counterTabletPrioUpgraded.incrementAndGet();
return true;
}
}
return false;
}
public boolean isTimeout() {
if (state != TabletSchedCtx.State.RUNNING) {
return false;
@ -1144,8 +1104,8 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
result.add(storageMedium == null ? FeConstants.null_string : storageMedium.name());
result.add(tabletStatus == null ? FeConstants.null_string : tabletStatus.name());
result.add(state.name());
result.add(origPriority.name());
result.add(dynamicPriority.name());
result.add(schedFailedCode.name());
result.add(priority.name());
result.add(srcReplica == null ? "-1" : String.valueOf(srcReplica.getBackendId()));
result.add(String.valueOf(srcPathHash));
result.add(String.valueOf(destBackendId));
@ -1158,7 +1118,6 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
result.add(copyTimeMs > 0 ? String.valueOf(copySize / copyTimeMs / 1000.0) : FeConstants.null_string);
result.add(String.valueOf(failedSchedCounter));
result.add(String.valueOf(failedRunningCounter));
result.add(TimeUtils.longToTimeString(lastAdjustPrioTime));
result.add(String.valueOf(visibleVersion));
result.add(String.valueOf(committedVersion));
result.add(Strings.nullToEmpty(errMsg));
@ -1171,19 +1130,23 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
*/
@Override
public int compareTo(TabletSchedCtx o) {
if (dynamicPriority.ordinal() < o.dynamicPriority.ordinal()) {
return 1;
} else if (dynamicPriority.ordinal() > o.dynamicPriority.ordinal()) {
return -1;
} else {
if (lastVisitedTime < o.lastVisitedTime) {
return -1;
} else if (lastVisitedTime > o.lastVisitedTime) {
return 1;
} else {
return 0;
}
return Long.compare(getCompareValue(), o.getCompareValue());
}
/**
 * Computes the sort key used by compareTo(); a ctx with a smaller key compares as smaller.
 * The key is a timestamp in milliseconds plus the penalties added below.
 */
private long getCompareValue() {
// base: the last visited time if this ctx has been visited, otherwise its create time
long value = createTime;
if (lastVisitedTime > 0) {
value = lastVisitedTime;
}
// priority penalty: one minute for VERY_HIGH, plus one more minute per ordinal step below it
value += (Priority.VERY_HIGH.ordinal() - priority.ordinal() + 1) * 60 * 1000L;
// 5 seconds for every 10 failed schedule attempts (integer division)
value += 5000L * (failedSchedCounter / 10);
// balance ctxs are pushed back by a further 30 minutes
if (type == Type.BALANCE) {
value += 30 * 60 * 1000L;
}
return value;
}
@Override
@ -1227,6 +1190,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
* call this when releaseTabletCtx()
*/
public void resetReplicaState() {
setDecommissionTime(-1);
if (tablet != null) {
for (Replica replica : tablet.getReplicas()) {
// To address issue: https://github.com/apache/doris/issues/9422
@ -1243,5 +1207,4 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
}
}
}
}

View File

@ -32,10 +32,12 @@ import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Partition.PartitionState;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.Tablet.TabletStatus;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.clone.SchedException.Status;
import org.apache.doris.clone.SchedException.SubCode;
import org.apache.doris.clone.TabletSchedCtx.Priority;
import org.apache.doris.clone.TabletSchedCtx.Type;
import org.apache.doris.common.AnalysisException;
@ -59,6 +61,7 @@ import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.transaction.DatabaseTransactionMgr;
import org.apache.doris.transaction.TransactionState;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.EvictingQueue;
import com.google.common.collect.ImmutableMap;
@ -100,21 +103,21 @@ public class TabletScheduler extends MasterDaemon {
// the minimum interval of updating cluster statistics and priority of tablet info
private static final long STAT_UPDATE_INTERVAL_MS = 20 * 1000; // 20s
private static final long SCHEDULE_INTERVAL_MS = 1000; // 1s
public static final long SCHEDULE_INTERVAL_MS = 100;
/*
* Tablet is added to pendingTablets as well it's id in allTabletIds.
* TabletScheduler will take tablet from pendingTablets but will not remove it's id from allTabletIds when
* Tablet is added to pendingTablets as well it's id in allTabletTypes.
* TabletScheduler will take tablet from pendingTablets but will not remove it's id from allTabletTypes when
* handling a tablet.
* A tablet's id can only be removed after the clone task or migration task is done (timeout, cancelled or finished).
* So if a tablet's id is still in allTabletIds, TabletChecker can not add tablet to TabletScheduler.
* So if a tablet's id is still in allTabletTypes, TabletChecker can not add tablet to TabletScheduler.
*
* pendingTablets + runningTablets = allTabletIds
* pendingTablets + runningTablets = allTabletTypes
*
* pendingTablets, allTabletIds, runningTablets and schedHistory are protected by 'synchronized'
* pendingTablets, allTabletTypes, runningTablets and schedHistory are protected by 'synchronized'
*/
private PriorityQueue<TabletSchedCtx> pendingTablets = new PriorityQueue<>();
private Set<Long> allTabletIds = Sets.newHashSet();
private Map<Long, TabletSchedCtx.Type> allTabletTypes = Maps.newHashMap();
// contains all tabletCtxs which state are RUNNING
private Map<Long, TabletSchedCtx> runningTablets = Maps.newHashMap();
// save the latest 1000 scheduled tablet info
@ -128,6 +131,8 @@ public class TabletScheduler extends MasterDaemon {
private long lastSlotAdjustTime = 0;
private long lastCheckTimeoutTime = 0;
private Env env;
private SystemInfoService infoService;
private TabletInvertedIndex invertedIndex;
@ -175,7 +180,8 @@ public class TabletScheduler extends MasterDaemon {
// when upgrading, backend may not get path info yet. so return false and wait for next round.
// and we should check if backend is alive. If backend is dead when upgrading, this backend
// will never report its path hash, and tablet scheduler is blocked.
LOG.info("not all backends have path info");
LOG.info("backend {}:{} with id {} doesn't have path info.", backend.getHost(),
backend.getBePort(), backend.getId());
return false;
}
}
@ -204,7 +210,7 @@ public class TabletScheduler extends MasterDaemon {
if (!backendsWorkingSlots.containsKey(be.getId())) {
List<Long> pathHashes = be.getDisks().values().stream()
.map(DiskInfo::getPathHash).collect(Collectors.toList());
PathSlot slot = new PathSlot(pathHashes, Config.schedule_slot_num_per_path);
PathSlot slot = new PathSlot(pathHashes, be.getId());
backendsWorkingSlots.put(be.getId(), slot);
LOG.info("add new backend {} with slots num: {}", be.getId(), be.getDisks().size());
}
@ -225,7 +231,18 @@ public class TabletScheduler extends MasterDaemon {
if (!force && Config.disable_tablet_scheduler) {
return AddResult.DISABLED;
}
if (!force && containsTablet(tablet.getTabletId())) {
// REPAIR has higher priority than BALANCE.
// Suppose adding a BALANCE tablet successfully, then adding this tablet's REPAIR ctx will fail.
// But we set allTabletTypes[tabletId] to REPAIR. Later at the beginning of scheduling this tablet,
// it will reset its type as allTabletTypes[tabletId], so its type will convert to REPAIR.
long tabletId = tablet.getTabletId();
boolean contains = allTabletTypes.containsKey(tabletId);
if (contains && !force) {
if (tablet.getType() == TabletSchedCtx.Type.REPAIR) {
allTabletTypes.put(tabletId, TabletSchedCtx.Type.REPAIR);
}
return AddResult.ALREADY_IN;
}
@ -238,13 +255,22 @@ public class TabletScheduler extends MasterDaemon {
return AddResult.LIMIT_EXCEED;
}
allTabletIds.add(tablet.getTabletId());
if (!contains || tablet.getType() == TabletSchedCtx.Type.REPAIR) {
allTabletTypes.put(tabletId, tablet.getType());
}
pendingTablets.offer(tablet);
if (!contains) {
LOG.info("Add tablet to pending queue, tablet id {}, type {}, status {}, priority {}",
tablet.getTabletId(), tablet.getType(), tablet.getTabletStatus(),
tablet.getPriority());
}
return AddResult.ADDED;
}
public synchronized boolean containsTablet(long tabletId) {
return allTabletIds.contains(tabletId);
return allTabletTypes.containsKey(tabletId);
}
public synchronized void rebalanceDisk(AdminRebalanceDiskStmt stmt) {
@ -263,7 +289,7 @@ public class TabletScheduler extends MasterDaemon {
for (TabletSchedCtx tabletCtx : pendingTablets) {
if (tabletCtx.getDbId() == dbId && tabletCtx.getTblId() == tblId
&& partitionIds.contains(tabletCtx.getPartitionId())) {
tabletCtx.setOrigPriority(Priority.VERY_HIGH);
tabletCtx.setPriority(Priority.VERY_HIGH);
}
newPendingTablets.add(tabletCtx);
}
@ -293,14 +319,15 @@ public class TabletScheduler extends MasterDaemon {
return;
}
updateLoadStatisticsAndPriorityIfNecessary();
if (System.currentTimeMillis() - lastCheckTimeoutTime >= 1000L) {
updateLoadStatisticsAndPriorityIfNecessary();
handleRunningTablets();
selectTabletsForBalance();
lastCheckTimeoutTime = System.currentTimeMillis();
}
schedulePendingTablets();
handleRunningTablets();
selectTabletsForBalance();
stat.counterTabletScheduleRound.incrementAndGet();
}
@ -314,8 +341,6 @@ public class TabletScheduler extends MasterDaemon {
rebalancer.updateLoadStatistic(statisticMap);
diskRebalancer.updateLoadStatistic(statisticMap);
adjustPriorities();
lastStatUpdateTime = System.currentTimeMillis();
}
@ -332,7 +357,7 @@ public class TabletScheduler extends MasterDaemon {
LoadStatisticForTag loadStatistic = new LoadStatisticForTag(tag, infoService, invertedIndex);
loadStatistic.init();
newStatisticMap.put(tag, loadStatistic);
LOG.debug("update load statistic:\n{}", loadStatistic.getBrief());
LOG.debug("update load statistic for tag {}:\n{}", tag, loadStatistic.getBrief());
}
this.statisticMap = newStatisticMap;
@ -342,28 +367,6 @@ public class TabletScheduler extends MasterDaemon {
return statisticMap;
}
/**
 * Re-evaluate the priority of the tablet ctxs waiting in {@code pendingTablets}.
 *
 * Runs at most {@code pendingTablets.size()} rounds (size sampled once up front). Each round
 * polls the head ctx, lets it adjust its own priority against {@code stat}, and puts it back
 * into the queue. The number of ctxs that reported a change is logged at debug level.
 */
private synchronized void adjustPriorities() {
    final int total = pendingTablets.size();
    int adjusted = 0;
    for (int remaining = total; remaining > 0; remaining--) {
        TabletSchedCtx head = pendingTablets.poll();
        if (head == null) {
            // queue drained before all rounds were used
            break;
        }
        boolean changed = head.adjustPriority(stat);
        if (changed) {
            adjusted++;
        }
        // re-insert so the queue re-orders the ctx by its (possibly new) priority
        pendingTablets.add(head);
    }
    LOG.debug("adjust priority for all tablets. changed: {}, total: {}", adjusted, total);
}
/**
* get at most BATCH_NUM tablets from queue, and try to schedule them.
* After handle, the tablet info should be
@ -371,7 +374,7 @@ public class TabletScheduler extends MasterDaemon {
* 2. or in schedHistory with state CANCELLING, if some unrecoverable error happens.
* 3. or in pendingTablets with state PENDING, if failed to be scheduled.
*
* if in schedHistory, it should be removed from allTabletIds.
* if in schedHistory, it should be removed from allTabletTypes.
*/
private void schedulePendingTablets() {
long start = System.currentTimeMillis();
@ -385,36 +388,25 @@ public class TabletScheduler extends MasterDaemon {
// do not schedule more tablets if the tablet scheduler is disabled.
throw new SchedException(Status.FINISHED, "tablet scheduler is disabled");
}
if (Config.disable_balance && tabletCtx.getType() == Type.BALANCE) {
finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, Status.UNRECOVERABLE,
"config disable balance");
continue;
}
scheduleTablet(tabletCtx, batchTask);
} catch (SchedException e) {
tabletCtx.increaseFailedSchedCounter();
tabletCtx.setErrMsg(e.getMessage());
if (e.getStatus() == Status.SCHEDULE_FAILED) {
if (tabletCtx.getType() == Type.BALANCE) {
// if balance is disabled, remove this tablet
if (Config.disable_balance) {
finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getStatus(),
"disable balance and " + e.getMessage());
} else {
// remove the balance task if it fails to be scheduled many times
if (tabletCtx.getFailedSchedCounter() > 10) {
finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getStatus(),
"schedule failed too many times and " + e.getMessage());
} else {
// we must release resource it current hold, and be scheduled again
tabletCtx.releaseResource(this);
// adjust priority to avoid some higher priority always be the first in pendingTablets
stat.counterTabletScheduledFailed.incrementAndGet();
dynamicAdjustPrioAndAddBackToPendingTablets(tabletCtx, e.getMessage());
}
}
boolean isExceedLimit = tabletCtx.onSchedFailedAndCheckExceedLimit(e.getSubCode());
if (isExceedLimit) {
finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getStatus(),
"schedule failed too many times and " + e.getMessage());
} else {
// we must release resource it current hold, and be scheduled again
tabletCtx.releaseResource(this);
// adjust priority to avoid some higher priority always be the first in pendingTablets
stat.counterTabletScheduledFailed.incrementAndGet();
dynamicAdjustPrioAndAddBackToPendingTablets(tabletCtx, e.getMessage());
addBackToPendingTablets(tabletCtx);
}
} else if (e.getStatus() == Status.FINISHED) {
// schedule redundant tablet or scheduler disabled will throw this exception
@ -485,6 +477,8 @@ public class TabletScheduler extends MasterDaemon {
tbl.writeLockOrException(new SchedException(Status.UNRECOVERABLE, "table "
+ tbl.getName() + " does not exist"));
try {
long tabletId = tabletCtx.getTabletId();
boolean isColocateTable = colocateTableIndex.isColocateTable(tbl.getId());
OlapTableState tableState = tbl.getState();
@ -499,8 +493,9 @@ public class TabletScheduler extends MasterDaemon {
throw new SchedException(Status.UNRECOVERABLE, "index does not exist");
}
Tablet tablet = idx.getTablet(tabletCtx.getTabletId());
Tablet tablet = idx.getTablet(tabletId);
Preconditions.checkNotNull(tablet);
ReplicaAllocation replicaAlloc = tbl.getPartitionInfo().getReplicaAllocation(partition.getId());
if (isColocateTable) {
GroupId groupId = colocateTableIndex.getGroup(tbl.getId());
@ -516,18 +511,26 @@ public class TabletScheduler extends MasterDaemon {
Set<Long> backendsSet = colocateTableIndex.getTabletBackendsByGroup(groupId, tabletOrderIdx);
TabletStatus st = tablet.getColocateHealthStatus(
partition.getVisibleVersion(),
tbl.getPartitionInfo().getReplicaAllocation(partition.getId()),
backendsSet);
partition.getVisibleVersion(), replicaAlloc, backendsSet);
statusPair = Pair.of(st, Priority.HIGH);
tabletCtx.setColocateGroupBackendIds(backendsSet);
} else {
List<Long> aliveBeIds = infoService.getAllBackendIds(true);
statusPair = tablet.getHealthStatusWithPriority(
infoService,
partition.getVisibleVersion(),
tbl.getPartitionInfo().getReplicaAllocation(partition.getId()),
aliveBeIds);
infoService, partition.getVisibleVersion(), replicaAlloc, aliveBeIds);
}
if (tabletCtx.getType() != allTabletTypes.get(tabletId)) {
TabletSchedCtx.Type curType = tabletCtx.getType();
TabletSchedCtx.Type newType = allTabletTypes.get(tabletId);
if (curType == TabletSchedCtx.Type.BALANCE && newType == TabletSchedCtx.Type.REPAIR) {
tabletCtx.setType(newType);
tabletCtx.setReplicaAlloc(replicaAlloc);
tabletCtx.setTag(null);
} else {
throw new SchedException(Status.UNRECOVERABLE, "can not convert type of tablet "
+ tabletId + " from " + curType.name() + " to " + newType.name());
}
}
if (tabletCtx.getType() == TabletSchedCtx.Type.BALANCE && tableState != OlapTableState.NORMAL) {
@ -732,11 +735,11 @@ public class TabletScheduler extends MasterDaemon {
private void handleReplicaVersionIncomplete(TabletSchedCtx tabletCtx, AgentBatchTask batchTask)
throws SchedException {
stat.counterReplicaVersionMissingErr.incrementAndGet();
try {
tabletCtx.chooseDestReplicaForVersionIncomplete(backendsWorkingSlots);
} catch (SchedException e) {
if (e.getMessage().equals("unable to choose dest replica")) {
// could not find dest, try add a missing.
if (e.getStatus() == Status.UNRECOVERABLE) {
// This situation may occur when the BE nodes
// where all replicas of a tablet are located are decommission,
// and this task is a VERSION_INCOMPLETE task.
@ -779,25 +782,7 @@ public class TabletScheduler extends MasterDaemon {
private void handleReplicaRelocating(TabletSchedCtx tabletCtx, AgentBatchTask batchTask)
throws SchedException {
stat.counterReplicaUnavailableErr.incrementAndGet();
try {
handleReplicaVersionIncomplete(tabletCtx, batchTask);
LOG.debug("succeed to find version incomplete replica from tablet relocating. tablet id: {}",
tabletCtx.getTabletId());
} catch (SchedException e) {
if (e.getStatus() == Status.SCHEDULE_FAILED) {
LOG.debug("failed to find version incomplete replica from tablet relocating. tablet id: {}, "
+ "try to find a new backend", tabletCtx.getTabletId());
// the dest or src slot may be taken after calling handleReplicaVersionIncomplete(),
// so we need to release these slots first.
// and reserve the tablet in TabletSchedCtx so that it can continue to be scheduled.
tabletCtx.releaseResource(this, true);
tabletCtx.setTabletStatus(TabletStatus.REPLICA_MISSING);
handleReplicaMissing(tabletCtx, batchTask);
LOG.debug("succeed to find new backend for tablet relocating. tablet id: {}", tabletCtx.getTabletId());
} else {
throw e;
}
}
handleReplicaVersionIncomplete(tabletCtx, batchTask);
}
/**
@ -831,7 +816,7 @@ public class TabletScheduler extends MasterDaemon {
// to remove this tablet from the pendingTablets(consider it as finished)
throw new SchedException(Status.FINISHED, "redundant replica is deleted");
}
throw new SchedException(Status.SCHEDULE_FAILED, "unable to delete any redundant replicas");
throw new SchedException(Status.UNRECOVERABLE, "unable to delete any redundant replicas");
}
private boolean deleteBackendDropped(TabletSchedCtx tabletCtx, boolean force) throws SchedException {
@ -1035,7 +1020,7 @@ public class TabletScheduler extends MasterDaemon {
deleteReplicaInternal(tabletCtx, replica, "colocate redundant", false);
throw new SchedException(Status.FINISHED, "colocate redundant replica is deleted");
}
throw new SchedException(Status.SCHEDULE_FAILED, "unable to delete any colocate redundant replicas");
throw new SchedException(Status.UNRECOVERABLE, "unable to delete any colocate redundant replicas");
}
/**
@ -1102,16 +1087,17 @@ public class TabletScheduler extends MasterDaemon {
replica.setWatermarkTxnId(nextTxnId);
replica.setState(ReplicaState.DECOMMISSION);
// set priority to normal because it may wait for a long time. Keeping it at VERY_HIGH may block other tasks.
tabletCtx.setOrigPriority(Priority.NORMAL);
tabletCtx.setPriority(Priority.NORMAL);
LOG.info("set replica {} on backend {} of tablet {} state to DECOMMISSION due to reason {}",
replica.getId(), replica.getBackendId(), tabletCtx.getTabletId(), reason);
throw new SchedException(Status.SCHEDULE_FAILED, "set watermark txn " + nextTxnId);
throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_DECOMMISSION,
"set watermark txn " + nextTxnId);
} else if (replica.getState() == ReplicaState.DECOMMISSION && replica.getWatermarkTxnId() != -1) {
long watermarkTxnId = replica.getWatermarkTxnId();
try {
if (!Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watermarkTxnId,
tabletCtx.getDbId(), Lists.newArrayList(tabletCtx.getTblId()))) {
throw new SchedException(Status.SCHEDULE_FAILED,
throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_DECOMMISSION,
"wait txn before " + watermarkTxnId + " to be finished");
}
} catch (AnalysisException e) {
@ -1225,7 +1211,7 @@ public class TabletScheduler extends MasterDaemon {
}
for (TabletSchedCtx tabletCtx : diskBalanceTablets) {
// add if task from prio backend or cluster is balanced
if (alternativeTablets.isEmpty() || tabletCtx.getOrigPriority() == TabletSchedCtx.Priority.NORMAL) {
if (alternativeTablets.isEmpty() || tabletCtx.getPriority() == TabletSchedCtx.Priority.NORMAL) {
addTablet(tabletCtx, false);
}
}
@ -1260,7 +1246,8 @@ public class TabletScheduler extends MasterDaemon {
LoadStatisticForTag statistic = statisticMap.get(tag);
if (statistic == null) {
throw new SchedException(Status.UNRECOVERABLE,
String.format("tag %s does not exist.", tag));
String.format("tag %s does not exist. available tags: %s", tag,
Joiner.on(",").join(statisticMap.keySet().stream().limit(5).toArray())));
}
beStatistics = statistic.getSortedBeLoadStats(null /* sorted ignore medium */);
} else {
@ -1336,7 +1323,7 @@ public class TabletScheduler extends MasterDaemon {
}
if (allFitPaths.isEmpty()) {
throw new SchedException(Status.SCHEDULE_FAILED, "unable to find dest path for new replica");
throw new SchedException(Status.UNRECOVERABLE, "unable to find dest path for new replica");
}
// all fit paths have already been sorted by load score in 'allFitPaths' in ascending order.
@ -1390,25 +1377,109 @@ public class TabletScheduler extends MasterDaemon {
return rootPathLoadStatistic;
}
throw new SchedException(Status.SCHEDULE_FAILED, "unable to find dest path which can be fit in");
throw new SchedException(Status.UNRECOVERABLE, "unable to find dest path which can be fit in");
}
/**
* For some reason, a tablet info failed to be scheduled this time,
* So we dynamically change its priority and add back to queue, waiting for next round.
*/
private void dynamicAdjustPrioAndAddBackToPendingTablets(TabletSchedCtx tabletCtx, String message) {
private void addBackToPendingTablets(TabletSchedCtx tabletCtx) {
Preconditions.checkState(tabletCtx.getState() == TabletSchedCtx.State.PENDING);
tabletCtx.adjustPriority(stat);
addTablet(tabletCtx, true /* force */);
}
/**
 * Take a tablet ctx out of the scheduler and release whatever it holds.
 *
 * @param tabletCtx the ctx being finalized
 * @param state     final state handed to releaseTabletCtx(); FINISHED additionally triggers
 *                  an immediate re-check of the tablet
 * @param status    schedule status; UNRECOVERABLE asks releaseTabletCtx() to reset replica state
 * @param reason    text recorded when the ctx is removed
 */
private void finalizeTabletCtx(TabletSchedCtx tabletCtx, TabletSchedCtx.State state, Status status, String reason) {
    // Deliberately two steps, to avoid nesting the database lock inside the scheduler's
    // synchronized section (releaseTabletCtx() may hold db lock).
    // Step 1: remove the ctx first, so that no other process can see it.
    removeTabletCtx(tabletCtx, reason);

    // Step 2: release the resources taken by the ctx.
    final boolean resetReplicaState = (status == Status.UNRECOVERABLE);
    releaseTabletCtx(tabletCtx, state, resetReplicaState);

    if (state != TabletSchedCtx.State.FINISHED) {
        return;
    }
    // Re-check right away, so there is no need to wait TabletChecker's 20s.
    tryAddAfterFinished(tabletCtx);
}
/**
 * After a ctx has finished, check the tablet's health again and, when it is still not
 * healthy, enqueue a new REPAIR ctx for it.
 *
 * The finished counter of {@code tabletCtx} is incremented on every call and carried over to
 * the new ctx; once it reaches {@code TabletSchedCtx.FINISHED_COUNTER_THRESHOLD} nothing is
 * re-added. The method returns silently when the db, table, partition, index, tablet or
 * colocate group can no longer be found.
 *
 * @param tabletCtx the ctx that has just been finalized with state FINISHED
 */
private void tryAddAfterFinished(TabletSchedCtx tabletCtx) {
    final int finishedTimes = tabletCtx.getFinishedCounter() + 1;
    tabletCtx.setFinishedCounter(finishedTimes);
    if (finishedTimes >= TabletSchedCtx.FINISHED_COUNTER_THRESHOLD) {
        return;
    }

    Database database = Env.getCurrentInternalCatalog().getDbNullable(tabletCtx.getDbId());
    if (database == null) {
        return;
    }
    OlapTable olapTable = (OlapTable) database.getTableNullable(tabletCtx.getTblId());
    if (olapTable == null) {
        return;
    }

    Pair<TabletStatus, TabletSchedCtx.Priority> health;
    ReplicaAllocation allocation = null;
    // the health check reads table meta, so it runs under the table read lock
    olapTable.readLock();
    try {
        Partition part = olapTable.getPartition(tabletCtx.getPartitionId());
        if (part == null) {
            return;
        }
        MaterializedIndex index = part.getIndex(tabletCtx.getIndexId());
        if (index == null) {
            return;
        }
        Tablet target = index.getTablet(tabletCtx.getTabletId());
        if (target == null) {
            return;
        }

        allocation = olapTable.getPartitionInfo().getReplicaAllocation(part.getId());
        if (colocateTableIndex.isColocateTable(olapTable.getId())) {
            GroupId group = colocateTableIndex.getGroup(olapTable.getId());
            if (group == null) {
                return;
            }
            // prefer the order index cached in the ctx, fall back to looking it up in the index
            int orderIdx = tabletCtx.getTabletOrderIdx();
            if (orderIdx == -1) {
                orderIdx = index.getTabletOrderIdx(target.getId());
            }
            Preconditions.checkState(orderIdx != -1);

            Set<Long> groupBackends = colocateTableIndex.getTabletBackendsByGroup(group, orderIdx);
            TabletStatus colocateStatus = target.getColocateHealthStatus(
                    part.getVisibleVersion(), allocation, groupBackends);
            health = Pair.of(colocateStatus, Priority.HIGH);
        } else {
            List<Long> aliveBackends = infoService.getAllBackendIds(true);
            health = target.getHealthStatusWithPriority(
                    infoService, part.getVisibleVersion(), allocation, aliveBackends);
            // keep the finished ctx's priority when its ordinal is larger than the computed one
            if (tabletCtx.getPriority().ordinal() > health.second.ordinal()) {
                health.second = tabletCtx.getPriority();
            }
        }
    } finally {
        olapTable.readUnlock();
    }

    if (health.first == TabletStatus.HEALTHY) {
        return;
    }

    TabletSchedCtx repairCtx = new TabletSchedCtx(
            TabletSchedCtx.Type.REPAIR, tabletCtx.getDbId(), tabletCtx.getTblId(),
            tabletCtx.getPartitionId(), tabletCtx.getIndexId(), tabletCtx.getTabletId(),
            allocation, System.currentTimeMillis());
    repairCtx.setTabletStatus(health.first);
    repairCtx.setPriority(health.second);
    repairCtx.setFinishedCounter(finishedTimes);
    addTablet(repairCtx, false);
}
private void releaseTabletCtx(TabletSchedCtx tabletCtx, TabletSchedCtx.State state, boolean resetReplicaState) {
@ -1422,7 +1493,7 @@ public class TabletScheduler extends MasterDaemon {
private synchronized void removeTabletCtx(TabletSchedCtx tabletCtx, String reason) {
runningTablets.remove(tabletCtx.getTabletId());
allTabletIds.remove(tabletCtx.getTabletId());
allTabletTypes.remove(tabletCtx.getTabletId());
schedHistory.add(tabletCtx);
LOG.info("remove the tablet {}. because: {}", tabletCtx.getTabletId(), reason);
}
@ -1430,15 +1501,27 @@ public class TabletScheduler extends MasterDaemon {
// get next batch of tablets from queue.
private synchronized List<TabletSchedCtx> getNextTabletCtxBatch() {
List<TabletSchedCtx> list = Lists.newArrayList();
int count = Math.min(MIN_BATCH_NUM, getCurrentAvailableSlotNum());
while (count > 0) {
int slotNum = getCurrentAvailableSlotNum();
// Make slotNum >= 1 to ensure that it could return at least 1 ctx
// when the pending list is not empty.
if (slotNum < 1) {
slotNum = 1;
}
while (list.size() < Config.schedule_batch_size && slotNum > 0) {
TabletSchedCtx tablet = pendingTablets.poll();
if (tablet == null) {
// no more tablets
break;
}
list.add(tablet);
count--;
TabletStatus status = tablet.getTabletStatus();
// for a clone, it will take 2 slots: src slot and dst slot.
if (!(status == TabletStatus.REDUNDANT
|| status == TabletStatus.FORCE_REDUNDANT
|| status == TabletStatus.COLOCATE_REDUNDANT
|| status == TabletStatus.REPLICA_COMPACTION_TOO_SLOW)) {
slotNum -= 2;
}
}
return list;
}
@ -1469,9 +1552,12 @@ public class TabletScheduler extends MasterDaemon {
// if we have a success task, then stat must be refreshed before schedule a new task
updateDiskBalanceLastSuccTime(tabletCtx.getSrcBackendId(), tabletCtx.getSrcPathHash());
updateDiskBalanceLastSuccTime(tabletCtx.getDestBackendId(), tabletCtx.getDestPathHash());
finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED, Status.FINISHED, "finished");
} else {
finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, Status.UNRECOVERABLE,
request.getTaskStatus().getErrorMsgs().get(0));
}
// we need this function to free slot for this migration task
finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED, Status.FINISHED, "finished");
return true;
}
@ -1496,12 +1582,20 @@ public class TabletScheduler extends MasterDaemon {
try {
tabletCtx.finishCloneTask(cloneTask, request);
} catch (SchedException e) {
tabletCtx.increaseFailedRunningCounter();
tabletCtx.setErrMsg(e.getMessage());
if (e.getStatus() == Status.RUNNING_FAILED) {
stat.counterCloneTaskFailed.incrementAndGet();
addToRunningTablets(tabletCtx);
return false;
tabletCtx.increaseFailedRunningCounter();
if (!tabletCtx.isExceedFailedRunningLimit()) {
stat.counterCloneTaskFailed.incrementAndGet();
addToRunningTablets(tabletCtx);
return false;
} else {
// unrecoverable
stat.counterTabletScheduledDiscard.incrementAndGet();
finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, Status.UNRECOVERABLE,
e.getMessage());
return true;
}
} else if (e.getStatus() == Status.UNRECOVERABLE) {
// unrecoverable
stat.counterTabletScheduledDiscard.incrementAndGet();
@ -1639,7 +1733,7 @@ public class TabletScheduler extends MasterDaemon {
}
public synchronized int getTotalNum() {
return allTabletIds.size();
return allTabletTypes.size();
}
public synchronized long getBalanceTabletsNumber() {
@ -1655,10 +1749,12 @@ public class TabletScheduler extends MasterDaemon {
public static class PathSlot {
// path hash -> slot num
private Map<Long, Slot> pathSlots = Maps.newConcurrentMap();
private long beId;
public PathSlot(List<Long> paths, int initSlotNum) {
public PathSlot(List<Long> paths, long beId) {
this.beId = beId;
for (Long pathHash : paths) {
pathSlots.put(pathHash, new Slot(initSlotNum));
pathSlots.put(pathHash, new Slot(beId));
}
}
@ -1670,25 +1766,11 @@ public class TabletScheduler extends MasterDaemon {
// add new path
for (Long pathHash : paths) {
if (!pathSlots.containsKey(pathHash)) {
pathSlots.put(pathHash, new Slot(Config.schedule_slot_num_per_path));
pathSlots.put(pathHash, new Slot(beId));
}
}
}
// Update the total slots num of specified paths, increase or decrease
public synchronized void updateSlot(List<Long> pathHashs, int delta) {
for (Long pathHash : pathHashs) {
Slot slot = pathSlots.get(pathHash);
if (slot == null) {
continue;
}
slot.total += delta;
slot.rectify();
LOG.debug("decrease path {} slots num to {}", pathHash, pathSlots.get(pathHash).total);
}
}
/**
* Update the statistic of specified path
*/
@ -1701,6 +1783,21 @@ public class TabletScheduler extends MasterDaemon {
slot.totalCopyTimeMs += copyTimeMs;
}
/**
 * Tell whether the given path currently has a free slot, without taking it.
 *
 * @param pathHash hash of the path to check; -1 means "not set"
 * @return false when the hash is -1, the path is unknown, or it has no available slot;
 *         true otherwise
 */
public synchronized boolean hasAvailableSlot(long pathHash) {
    if (pathHash == -1) {
        return false;
    }
    Slot pathSlot = pathSlots.get(pathHash);
    return pathSlot != null && pathSlot.getAvailable() != 0;
}
/**
* If the specified 'pathHash' has available slot, decrease the slot number and return this path hash
*/
@ -1709,7 +1806,8 @@ public class TabletScheduler extends MasterDaemon {
if (LOG.isDebugEnabled()) {
LOG.debug("path hash is not set.", new Exception());
}
throw new SchedException(Status.SCHEDULE_FAILED, "path hash is not set");
throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT,
"path hash is not set");
}
Slot slot = pathSlots.get(pathHash);
@ -1717,12 +1815,11 @@ public class TabletScheduler extends MasterDaemon {
LOG.debug("path {} is not exist", pathHash);
return -1;
}
slot.rectify();
if (slot.available <= 0) {
if (slot.used >= slot.getTotal()) {
LOG.debug("path {} has no available slot", pathHash);
return -1;
}
slot.available--;
slot.used++;
return pathHash;
}
@ -1731,23 +1828,15 @@ public class TabletScheduler extends MasterDaemon {
if (slot == null) {
return;
}
slot.available++;
slot.rectify();
}
public synchronized int peekSlot(long pathHash) {
Slot slot = pathSlots.get(pathHash);
if (slot == null) {
return -1;
if (slot.used > 0) {
slot.used--;
}
slot.rectify();
return slot.available;
}
public synchronized int getTotalAvailSlotNum() {
int total = 0;
for (Slot slot : pathSlots.values()) {
total += slot.available;
total += slot.getAvailable();
}
return total;
}
@ -1758,7 +1847,7 @@ public class TabletScheduler extends MasterDaemon {
public synchronized Set<Long> getAvailPathsForBalance() {
Set<Long> pathHashs = Sets.newHashSet();
for (Map.Entry<Long, Slot> entry : pathSlots.entrySet()) {
if (entry.getValue().balanceSlot > 0) {
if (entry.getValue().getBalanceAvailable() > 0) {
pathHashs.add(entry.getKey());
}
}
@ -1768,7 +1857,7 @@ public class TabletScheduler extends MasterDaemon {
public synchronized int getAvailBalanceSlotNum() {
int num = 0;
for (Map.Entry<Long, Slot> entry : pathSlots.entrySet()) {
num += entry.getValue().balanceSlot;
num += entry.getValue().getBalanceAvailable();
}
return num;
}
@ -1776,13 +1865,12 @@ public class TabletScheduler extends MasterDaemon {
public synchronized List<List<String>> getSlotInfo(long beId) {
List<List<String>> results = Lists.newArrayList();
pathSlots.forEach((key, value) -> {
value.rectify();
List<String> result = Lists.newArrayList();
result.add(String.valueOf(beId));
result.add(String.valueOf(key));
result.add(String.valueOf(value.available));
result.add(String.valueOf(value.total));
result.add(String.valueOf(value.balanceSlot));
result.add(String.valueOf(value.getAvailable()));
result.add(String.valueOf(value.getTotal()));
result.add(String.valueOf(value.getBalanceAvailable()));
result.add(String.valueOf(value.getAvgRate()));
results.add(result);
});
@ -1794,8 +1882,8 @@ public class TabletScheduler extends MasterDaemon {
if (slot == null) {
return -1;
}
if (slot.balanceSlot > 0) {
slot.balanceSlot--;
if (slot.balanceUsed < slot.getBalanceTotal()) {
slot.balanceUsed++;
return pathHash;
}
return -1;
@ -1807,8 +1895,8 @@ public class TabletScheduler extends MasterDaemon {
if (slot == null) {
continue;
}
if (slot.balanceSlot > 0) {
slot.balanceSlot--;
if (slot.balanceUsed < slot.getBalanceTotal()) {
slot.balanceUsed++;
return pathHash;
}
}
@ -1820,8 +1908,9 @@ public class TabletScheduler extends MasterDaemon {
if (slot == null) {
return;
}
slot.balanceSlot++;
slot.rectify();
if (slot.balanceUsed > 0) {
slot.balanceUsed--;
}
}
public synchronized void updateDiskBalanceLastSuccTime(long pathHash) {
@ -1851,10 +1940,8 @@ public class TabletScheduler extends MasterDaemon {
}
public static class Slot {
public int total;
public int available;
// slot reserved for balance
public int balanceSlot;
public int used;
public int balanceUsed;
public long totalCopySize = 0;
public long totalCopyTimeMs = 0;
@ -1862,23 +1949,35 @@ public class TabletScheduler extends MasterDaemon {
// for disk balance
public long diskBalanceLastSuccTime = 0;
public Slot(int total) {
this.total = total;
this.available = total;
this.balanceSlot = Config.balance_slot_num_per_path;
private long beId;
public Slot(long beId) {
this.beId = beId;
this.used = 0;
this.balanceUsed = 0;
}
public void rectify() {
if (total <= 0) {
total = 1;
}
if (available > total) {
available = total;
// free slots = total minus used, clamped so it is never negative
public int getAvailable() {
    final int free = getTotal() - used;
    return free > 0 ? free : 0;
}
public int getTotal() {
int total = Math.max(1, Config.schedule_slot_num_per_path);
Backend be = Env.getCurrentSystemInfo().getBackend(beId);
if (be != null && be.isDecommissioned()) {
total = Math.max(1, Config.schedule_decommission_slot_num_per_path);
}
if (balanceSlot > Config.balance_slot_num_per_path) {
balanceSlot = Config.balance_slot_num_per_path;
}
return total;
}
// free balance slots = balance total minus balance used, clamped so it is never negative
public int getBalanceAvailable() {
    final int free = getBalanceTotal() - balanceUsed;
    return free > 0 ? free : 0;
}
// balance slot total comes from Config.balance_slot_num_per_path, with a floor of 1
public int getBalanceTotal() {
    final int configured = Config.balance_slot_num_per_path;
    return configured < 1 ? 1 : configured;
}
// return avg rate, Bytes/S

View File

@ -29,15 +29,15 @@ import com.google.common.collect.Lists;
import java.util.List;
/*
* show proc "/tablet_scheduler/pending_tablets";
* show proc "/tablet_scheduler/running_tablets";
* show proc "/tablet_scheduler/history_tablets";
* show proc "/cluster_balance/pending_tablets";
* show proc "/cluster_balance/running_tablets";
* show proc "/cluster_balance/history_tablets";
*/
public class TabletSchedulerDetailProcDir implements ProcDirInterface {
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>().add("TabletId")
.add("Type").add("Medium").add("Status").add("State").add("OrigPrio").add("DynmPrio").add("SrcBe")
.add("Type").add("Medium").add("Status").add("State").add("SchedCode").add("Priority").add("SrcBe")
.add("SrcPath").add("DestBe").add("DestPath").add("Timeout").add("Create").add("LstSched").add("LstVisit")
.add("Finished").add("Rate").add("FailedSched").add("FailedRunning").add("LstAdjPrio").add("VisibleVer")
.add("Finished").add("Rate").add("FailedSched").add("FailedRunning").add("VisibleVer")
.add("CmtVer").add("ErrMsg")
.build();

View File

@ -39,17 +39,17 @@ public class TabletSchedCtxTest {
ReplicaAllocation replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION;
TabletSchedCtx ctx1 = new TabletSchedCtx(Type.REPAIR,
1, 2, 3, 4, 1000, replicaAlloc, System.currentTimeMillis());
ctx1.setOrigPriority(Priority.NORMAL);
ctx1.setPriority(Priority.NORMAL);
ctx1.setLastVisitedTime(2);
TabletSchedCtx ctx2 = new TabletSchedCtx(Type.REPAIR,
1, 2, 3, 4, 1001, replicaAlloc, System.currentTimeMillis());
ctx2.setOrigPriority(Priority.NORMAL);
ctx2.setPriority(Priority.NORMAL);
ctx2.setLastVisitedTime(3);
TabletSchedCtx ctx3 = new TabletSchedCtx(Type.REPAIR,
1, 2, 3, 4, 1001, replicaAlloc, System.currentTimeMillis());
ctx3.setOrigPriority(Priority.NORMAL);
1, 2, 3, 4, 1002, replicaAlloc, System.currentTimeMillis());
ctx3.setPriority(Priority.NORMAL);
ctx3.setLastVisitedTime(1);
pendingTablets.add(ctx1);
@ -62,8 +62,8 @@ public class TabletSchedCtxTest {
// priority is not equal, info2 is HIGH, should ranks ahead
pendingTablets.clear();
ctx1.setOrigPriority(Priority.NORMAL);
ctx2.setOrigPriority(Priority.HIGH);
ctx1.setPriority(Priority.NORMAL);
ctx2.setPriority(Priority.HIGH);
ctx1.setLastVisitedTime(2);
ctx2.setLastVisitedTime(2);
pendingTablets.add(ctx2);

View File

@ -33,7 +33,6 @@ import org.junit.jupiter.api.Test;
import java.util.List;
public class DecommissionBackendTest extends TestWithFeService {
@Override
protected int backendNum() {
return 3;
@ -42,6 +41,7 @@ public class DecommissionBackendTest extends TestWithFeService {
// Test setup hook: sets FeConstants.runningUnitTest and turns off needCleanDir.
// NOTE(review): presumably invoked before the mocked cluster starts — confirm in TestWithFeService.
@Override
protected void beforeCluster() {
FeConstants.runningUnitTest = true;
// with needCleanDir == false the teardown skips cleanDorisFeDir()
needCleanDir = false;
}
@BeforeAll

View File

@ -119,6 +119,7 @@ public abstract class TestWithFeService {
protected String dorisHome;
protected String runningDir = "fe/mocked/" + getClass().getSimpleName() + "/" + UUID.randomUUID() + "/";
protected ConnectContext connectContext;
protected boolean needCleanDir = true;
protected static final String DEFAULT_CLUSTER_PREFIX = "default_cluster:";
@ -140,7 +141,9 @@ public abstract class TestWithFeService {
runAfterAll();
Env.getCurrentEnv().clear();
StatementScopeIdGenerator.clear();
cleanDorisFeDir();
if (needCleanDir) {
cleanDorisFeDir();
}
}
@BeforeEach