[improvement](tablet scheduler) fix higher priority tablet add failed due to pending queue full #41076 (#41268)

cherry pick from #41076
This commit is contained in:
yujun
2024-09-26 22:31:20 +08:00
committed by GitHub
parent d875b026b9
commit d89e5de815
5 changed files with 77 additions and 19 deletions

View File

@ -561,15 +561,17 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
tabletCtx.setIsUniqKeyMergeOnWrite(isUniqKeyMergeOnWrite);
AddResult res = tabletScheduler.addTablet(tabletCtx, false /* not force */);
if (res == AddResult.LIMIT_EXCEED || res == AddResult.DISABLED) {
if (res == AddResult.DISABLED) {
// tablet in scheduler exceed limit, or scheduler is disabled,
// skip this group and check next one.
LOG.info("tablet scheduler return: {}. stop colocate table check", res.name());
break OUT;
} else if (res == AddResult.ADDED) {
counter.addToSchedulerTabletNum++;
} else {
} else if (res == AddResult.ALREADY_IN) {
counter.tabletInScheduler++;
} else if (res == AddResult.REPLACE_ADDED || res == AddResult.LIMIT_EXCEED) {
counter.tabletExceedLimit++;
}
}
}
@ -589,9 +591,10 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
} // end for groups
long cost = System.currentTimeMillis() - start;
LOG.info("finished to check tablets. unhealth/total/added/in_sched/not_ready: {}/{}/{}/{}/{}, cost: {} ms",
LOG.info("finished to check tablets. unhealth/total/added/in_sched/not_ready/exceed_limit: {}/{}/{}/{}/{}/{}, "
+ "cost: {} ms",
counter.unhealthyTabletNum, counter.totalTabletNum, counter.addToSchedulerTabletNum,
counter.tabletInScheduler, counter.tabletNotReady, cost);
counter.tabletInScheduler, counter.tabletNotReady, counter.tabletExceedLimit, cost);
}
private GlobalColocateStatistic buildGlobalColocateStatistic() {

View File

@ -78,6 +78,7 @@ public class TabletChecker extends MasterDaemon {
put("added", new AtomicLong(0L));
put("in_sched", new AtomicLong(0L));
put("not_ready", new AtomicLong(0L));
put("exceed_limit", new AtomicLong(0L));
}
};
@ -224,6 +225,7 @@ public class TabletChecker extends MasterDaemon {
public long addToSchedulerTabletNum = 0;
public long tabletInScheduler = 0;
public long tabletNotReady = 0;
public long tabletExceedLimit = 0;
}
private enum LoopControlStatus {
@ -344,10 +346,12 @@ public class TabletChecker extends MasterDaemon {
tabletCountByStatus.get("added").set(counter.addToSchedulerTabletNum);
tabletCountByStatus.get("in_sched").set(counter.tabletInScheduler);
tabletCountByStatus.get("not_ready").set(counter.tabletNotReady);
tabletCountByStatus.get("exceed_limit").set(counter.tabletExceedLimit);
LOG.info("finished to check tablets. unhealth/total/added/in_sched/not_ready: {}/{}/{}/{}/{}, cost: {} ms",
LOG.info("finished to check tablets. unhealth/total/added/in_sched/not_ready/exceed_limit: {}/{}/{}/{}/{}/{},"
+ "cost: {} ms",
counter.unhealthyTabletNum, counter.totalTabletNum, counter.addToSchedulerTabletNum,
counter.tabletInScheduler, counter.tabletNotReady, cost);
counter.tabletInScheduler, counter.tabletNotReady, counter.tabletExceedLimit, cost);
}
private LoopControlStatus handlePartitionTablet(Database db, OlapTable tbl, Partition partition, boolean isInPrios,
@ -404,11 +408,13 @@ public class TabletChecker extends MasterDaemon {
tabletCtx.setIsUniqKeyMergeOnWrite(isUniqKeyMergeOnWrite);
AddResult res = tabletScheduler.addTablet(tabletCtx, false /* not force */);
if (res == AddResult.LIMIT_EXCEED || res == AddResult.DISABLED) {
if (res == AddResult.DISABLED) {
LOG.info("tablet scheduler return: {}. stop tablet checker", res.name());
return LoopControlStatus.BREAK_OUT;
} else if (res == AddResult.ADDED) {
counter.addToSchedulerTabletNum++;
} else if (res == AddResult.REPLACE_ADDED || res == AddResult.LIMIT_EXCEED) {
counter.tabletExceedLimit++;
}
}
} // indices

View File

@ -1334,13 +1334,13 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
if (tabletHealth.aliveAndVersionCompleteNum < (replicaNum / 2 + 1)) {
value -= 3 * baseTime;
if (tabletHealth.hasRecentLoadFailed) {
value -= 3 * baseTime;
value -= 4 * baseTime;
}
}
if (tabletHealth.hasAliveAndVersionIncomplete) {
value -= 1 * baseTime;
if (isUniqKeyMergeOnWrite) {
value -= 1 * baseTime;
value -= 2 * baseTime;
}
}
}

View File

@ -71,6 +71,7 @@ import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Sets;
import com.google.common.collect.Table;
import org.apache.logging.log4j.LogManager;
@ -81,7 +82,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
@ -120,7 +120,7 @@ public class TabletScheduler extends MasterDaemon {
*
* pendingTablets, allTabletTypes, runningTablets and schedHistory are protected by 'synchronized'
*/
private PriorityQueue<TabletSchedCtx> pendingTablets = new PriorityQueue<>();
private MinMaxPriorityQueue<TabletSchedCtx> pendingTablets = MinMaxPriorityQueue.create();
private Map<Long, TabletSchedCtx.Type> allTabletTypes = Maps.newHashMap();
// contains all tabletCtxs which state are RUNNING
private Map<Long, TabletSchedCtx> runningTablets = Maps.newHashMap();
@ -149,6 +149,7 @@ public class TabletScheduler extends MasterDaemon {
ADDED, // success to add
ALREADY_IN, // already added, skip
LIMIT_EXCEED, // number of pending tablets exceed the limit
REPLACE_ADDED, // succ to add, and envit a lowest task
DISABLED // scheduler has been disabled.
}
@ -268,12 +269,22 @@ public class TabletScheduler extends MasterDaemon {
return AddResult.ALREADY_IN;
}
AddResult addResult = AddResult.ADDED;
// if this is not a force add,
// and number of scheduling tablets exceed the limit,
// refuse to add.
if (!force && (pendingTablets.size() > Config.max_scheduling_tablets
|| runningTablets.size() > Config.max_scheduling_tablets)) {
return AddResult.LIMIT_EXCEED;
if (!force && (pendingTablets.size() >= Config.max_scheduling_tablets
|| runningTablets.size() >= Config.max_scheduling_tablets)) {
// For a sched tablet, if its compare value is bigger, it will be more close to queue's tail position,
// and its priority is lower.
TabletSchedCtx lowestPriorityTablet = pendingTablets.peekLast();
if (lowestPriorityTablet == null || lowestPriorityTablet.compareTo(tablet) <= 0) {
return AddResult.LIMIT_EXCEED;
}
addResult = AddResult.REPLACE_ADDED;
pendingTablets.pollLast();
finalizeTabletCtx(lowestPriorityTablet, TabletSchedCtx.State.CANCELLED, Status.UNRECOVERABLE,
"envit lower priority sched tablet because pending queue is full");
}
if (!contains || tablet.getType() == TabletSchedCtx.Type.REPAIR) {
@ -285,7 +296,7 @@ public class TabletScheduler extends MasterDaemon {
LOG.info("Add tablet to pending queue, {}", tablet);
}
return AddResult.ADDED;
return addResult;
}
@ -306,11 +317,12 @@ public class TabletScheduler extends MasterDaemon {
* Iterate current tablets, change their priority to VERY_HIGH if necessary.
*/
public synchronized void changeTabletsPriorityToVeryHigh(long dbId, long tblId, List<Long> partitionIds) {
PriorityQueue<TabletSchedCtx> newPendingTablets = new PriorityQueue<>();
MinMaxPriorityQueue<TabletSchedCtx> newPendingTablets = MinMaxPriorityQueue.create();
for (TabletSchedCtx tabletCtx : pendingTablets) {
if (tabletCtx.getDbId() == dbId && tabletCtx.getTblId() == tblId
&& partitionIds.contains(tabletCtx.getPartitionId())) {
tabletCtx.setPriority(Priority.VERY_HIGH);
tabletCtx.setLastVisitedTime(1L);
}
newPendingTablets.add(tabletCtx);
}
@ -1745,7 +1757,7 @@ public class TabletScheduler extends MasterDaemon {
slotNum = 1;
}
while (list.size() < Config.schedule_batch_size && slotNum > 0) {
TabletSchedCtx tablet = pendingTablets.poll();
TabletSchedCtx tablet = pendingTablets.pollFirst();
if (tablet == null) {
// no more tablets
break;
@ -1947,6 +1959,11 @@ public class TabletScheduler extends MasterDaemon {
});
}
// only use for fe ut
public MinMaxPriorityQueue<TabletSchedCtx> getPendingTabletQueue() {
return pendingTablets;
}
public List<List<String>> getPendingTabletsInfo(int limit) {
return collectTabletCtx(getPendingTablets(limit));
}

View File

@ -17,25 +17,57 @@
package org.apache.doris.clone;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.clone.TabletSchedCtx.Priority;
import org.apache.doris.clone.TabletSchedCtx.Type;
import org.apache.doris.common.Config;
import com.google.common.collect.Lists;
import com.google.common.collect.MinMaxPriorityQueue;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;
public class TabletSchedCtxTest {
@Test
public void testAddTablet() {
List<TabletSchedCtx> tablets = Lists.newArrayList();
ReplicaAllocation replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION;
for (long i = 0; i < 20; i++) {
tablets.add(new TabletSchedCtx(Type.REPAIR, 1, 2, 3, 4,
i, replicaAlloc, i));
tablets.add(new TabletSchedCtx(Type.BALANCE, 1, 2, 3, 4,
1000 + i, replicaAlloc, i));
}
Collections.shuffle(tablets);
Config.max_scheduling_tablets = 5;
TabletScheduler scheduler = Env.getCurrentEnv().getTabletScheduler();
for (TabletSchedCtx tablet : tablets) {
scheduler.addTablet(tablet, false);
}
MinMaxPriorityQueue<TabletSchedCtx> queue = scheduler.getPendingTabletQueue();
List<TabletSchedCtx> gotTablets = Lists.newArrayList();
while (!queue.isEmpty()) {
gotTablets.add(queue.pollFirst());
}
Assert.assertEquals(Config.max_scheduling_tablets, gotTablets.size());
for (int i = 0; i < gotTablets.size(); i++) {
TabletSchedCtx tablet = gotTablets.get(i);
Assert.assertEquals(Type.REPAIR, tablet.getType());
Assert.assertEquals((long) i, tablet.getCreateTime());
}
}
@Test
public void testPriorityCompare() {
// equal priority, but info3's last visit time is earlier than info2 and info1, so info1 should ranks ahead
PriorityQueue<TabletSchedCtx> pendingTablets = new PriorityQueue<>();
MinMaxPriorityQueue<TabletSchedCtx> pendingTablets = MinMaxPriorityQueue.create();
ReplicaAllocation replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION;
TabletSchedCtx ctx1 = new TabletSchedCtx(Type.REPAIR,
1, 2, 3, 4, 1000, replicaAlloc, System.currentTimeMillis());