Cherry-picked from #45298 Co-authored-by: deardeng <dengxin@selectdb.com>
This commit is contained in:
committed by
GitHub
parent
1314a2b942
commit
6d6473efae
@ -62,6 +62,7 @@ import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/*
|
||||
* TabletSchedCtx contains all information which is created during tablet scheduler processing.
|
||||
@ -69,28 +70,6 @@ import java.util.Set;
|
||||
public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
private static final Logger LOG = LogManager.getLogger(TabletSchedCtx.class);
|
||||
|
||||
/*
|
||||
* SCHED_FAILED_COUNTER_THRESHOLD:
|
||||
* threshold of times a tablet failed to be scheduled
|
||||
*
|
||||
* MIN_ADJUST_PRIORITY_INTERVAL_MS:
|
||||
* min interval time of adjusting a tablet's priority
|
||||
*
|
||||
* MAX_NOT_BEING_SCHEDULED_INTERVAL_MS:
|
||||
* max gap time of a tablet NOT being scheduled.
|
||||
*
|
||||
* These 3 params are for adjusting priority.
|
||||
* If a tablet being scheduled failed for more than SCHED_FAILED_COUNTER_THRESHOLD times, its priority
|
||||
* will be downgraded. And the interval between adjustment is larger than MIN_ADJUST_PRIORITY_INTERVAL_MS,
|
||||
* to avoid being downgraded too soon.
|
||||
* And if a tablet is not being scheduled longer than MAX_NOT_BEING_SCHEDULED_INTERVAL_MS, its priority
|
||||
* will be upgraded, to avoid starvation.
|
||||
*
|
||||
*/
|
||||
private static final int SCHED_FAILED_COUNTER_THRESHOLD = 5;
|
||||
private static final long MIN_ADJUST_PRIORITY_INTERVAL_MS = 5 * 60 * 1000L; // 5 min
|
||||
private static final long MAX_NOT_BEING_SCHEDULED_INTERVAL_MS = 30 * 60 * 1000L; // 30 min
|
||||
|
||||
/*
|
||||
* A clone task timeout is between Config.min_clone_task_timeout_sec and Config.max_clone_task_timeout_sec,
|
||||
* estimated by tablet size / MIN_CLONE_SPEED_MB_PER_SECOND.
|
||||
@ -450,10 +429,6 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
schedFailedCode = code;
|
||||
}
|
||||
|
||||
public CloneTask getCloneTask() {
|
||||
return cloneTask;
|
||||
}
|
||||
|
||||
public long getCopySize() {
|
||||
return copySize;
|
||||
}
|
||||
@ -932,12 +907,14 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
}
|
||||
if (cloneTask != null) {
|
||||
AgentTaskQueue.removeTask(cloneTask.getBackendId(), TTaskType.CLONE, cloneTask.getSignature());
|
||||
cloneTask = null;
|
||||
|
||||
// clear all CLONE replicas
|
||||
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
|
||||
if (db != null) {
|
||||
Table table = db.getTableNullable(tblId);
|
||||
if (table != null && table.writeLockIfExist()) {
|
||||
// try to get the table write lock; if that fails, TabletScheduler will retry next time
|
||||
if (table != null && table.tryWriteLockIfExist(Table.TRY_LOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
|
||||
try {
|
||||
List<Replica> cloneReplicas = Lists.newArrayList();
|
||||
tablet.getReplicas().stream().filter(r -> r.getState() == ReplicaState.CLONE).forEach(r -> {
|
||||
|
||||
@ -104,9 +104,6 @@ import java.util.stream.Collectors;
|
||||
public class TabletScheduler extends MasterDaemon {
|
||||
private static final Logger LOG = LogManager.getLogger(TabletScheduler.class);
|
||||
|
||||
// handle at most BATCH_NUM tablets in one loop
|
||||
private static final int MIN_BATCH_NUM = 50;
|
||||
|
||||
// the minimum interval of updating cluster statistics and priority of tablet info
|
||||
private static final long STAT_UPDATE_INTERVAL_MS = 20 * 1000; // 20s
|
||||
|
||||
@ -150,7 +147,7 @@ public class TabletScheduler extends MasterDaemon {
|
||||
ADDED, // success to add
|
||||
ALREADY_IN, // already added, skip
|
||||
LIMIT_EXCEED, // number of pending tablets exceed the limit
|
||||
REPLACE_ADDED, // succ to add, and envit a lowest task
|
||||
REPLACE_ADDED, // succ to add, and evict a lowest task
|
||||
DISABLED // scheduler has been disabled.
|
||||
}
|
||||
|
||||
@ -285,7 +282,7 @@ public class TabletScheduler extends MasterDaemon {
|
||||
addResult = AddResult.REPLACE_ADDED;
|
||||
pendingTablets.pollLast();
|
||||
finalizeTabletCtx(lowestPriorityTablet, TabletSchedCtx.State.CANCELLED, Status.UNRECOVERABLE,
|
||||
"envit lower priority sched tablet because pending queue is full");
|
||||
"evict lower priority sched tablet because pending queue is full");
|
||||
}
|
||||
|
||||
if (!contains || tablet.getType() == TabletSchedCtx.Type.REPAIR) {
|
||||
@ -1845,9 +1842,9 @@ public class TabletScheduler extends MasterDaemon {
|
||||
tabletCtx.increaseFailedRunningCounter();
|
||||
if (!tabletCtx.isExceedFailedRunningLimit()) {
|
||||
stat.counterCloneTaskFailed.incrementAndGet();
|
||||
tabletCtx.setState(TabletSchedCtx.State.PENDING);
|
||||
tabletCtx.releaseResource(this);
|
||||
tabletCtx.resetFailedSchedCounter();
|
||||
tabletCtx.setState(TabletSchedCtx.State.PENDING);
|
||||
addBackToPendingTablets(tabletCtx);
|
||||
return false;
|
||||
} else {
|
||||
|
||||
@ -40,12 +40,14 @@ import org.apache.doris.utframe.TestWithFeService;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.MinMaxPriorityQueue;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class TabletHealthTest extends TestWithFeService {
|
||||
@ -78,6 +80,8 @@ public class TabletHealthTest extends TestWithFeService {
|
||||
|
||||
@Override
|
||||
protected void runBeforeEach() throws Exception {
|
||||
// set back to default value
|
||||
Config.max_scheduling_tablets = 2000;
|
||||
for (Table table : db.getTables()) {
|
||||
dropTable(table.getName(), true);
|
||||
}
|
||||
@ -358,4 +362,52 @@ public class TabletHealthTest extends TestWithFeService {
|
||||
|
||||
dropTable(table.getName(), true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddTabletNoDeadLock() throws Exception {
|
||||
Config.max_scheduling_tablets = 1;
|
||||
createTable("CREATE TABLE tbl3 (k INT) DISTRIBUTED BY HASH(k) BUCKETS 2"
|
||||
+ " PROPERTIES ('replication_num' = '3')");
|
||||
DebugPointUtil.addDebugPoint("MockedBackendFactory.handleCloneTablet.failed");
|
||||
OlapTable table = (OlapTable) db.getTableOrMetaException("tbl3");
|
||||
Partition partition = table.getPartitions().iterator().next();
|
||||
List<Tablet> tablets = partition.getMaterializedIndices(IndexExtState.ALL).iterator().next().getTablets();
|
||||
Assertions.assertEquals(2, tablets.size());
|
||||
|
||||
partition.updateVisibleVersion(10L);
|
||||
tablets.forEach(tablet -> tablet.getReplicas().forEach(replica -> replica.updateVersion(10)));
|
||||
|
||||
Tablet tabletA = tablets.get(0);
|
||||
Tablet tabletB = tablets.get(1);
|
||||
TabletScheduler scheduler = Env.getCurrentEnv().getTabletScheduler();
|
||||
tabletA.getReplicas().get(0).adminUpdateVersionInfo(8L, null, null, 0L);
|
||||
checkTabletStatus(tabletA, TabletStatus.VERSION_INCOMPLETE, table, partition);
|
||||
Env.getCurrentEnv().getTabletChecker().runAfterCatalogReady();
|
||||
Env.getCurrentEnv().getTabletScheduler().runAfterCatalogReady();
|
||||
Thread.sleep(1000);
|
||||
MinMaxPriorityQueue<TabletSchedCtx> queue = scheduler.getPendingTabletQueue();
|
||||
TabletSchedCtx tabletACtx = queue.peekFirst();
|
||||
Assertions.assertNotNull(tabletACtx);
|
||||
tabletACtx.setLastVisitedTime(System.currentTimeMillis() + 3600 * 1000L);
|
||||
tabletB.getReplicas().get(0).adminUpdateVersionInfo(8L, null, null, 0L);
|
||||
checkTabletStatus(tabletB, TabletStatus.VERSION_INCOMPLETE, table, partition);
|
||||
Thread thread = new Thread(() -> {
|
||||
try {
|
||||
Env.getCurrentEnv().getTabletChecker().runAfterCatalogReady();
|
||||
Env.getCurrentEnv().getTabletScheduler().runAfterCatalogReady();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
});
|
||||
thread.start();
|
||||
Thread.sleep(1000);
|
||||
Assertions.assertTrue(table.tryWriteLock(2, TimeUnit.SECONDS));
|
||||
table.writeUnlock();
|
||||
DebugPointUtil.clearDebugPoints();
|
||||
doRepair();
|
||||
Thread.sleep(1000);
|
||||
doRepair();
|
||||
checkTabletIsHealth(tabletA, table, partition);
|
||||
checkTabletIsHealth(tabletB, table, partition);
|
||||
}
|
||||
}
|
||||
|
||||
@ -83,6 +83,7 @@ import io.grpc.stub.StreamObserver;
|
||||
import org.apache.thrift.TException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
@ -293,6 +294,10 @@ public class MockedBackendFactory {
|
||||
tabletInfo.setPathHash(pathHash);
|
||||
tabletInfo.setUsed(true);
|
||||
tabletInfos.add(tabletInfo);
|
||||
if (DebugPointUtil.isEnable("MockedBackendFactory.handleCloneTablet.failed")) {
|
||||
finishTaskRequest.setTaskStatus(new TStatus(TStatusCode.CANCELLED));
|
||||
finishTaskRequest.getTaskStatus().setErrorMsgs(Collections.singletonList("debug point set"));
|
||||
}
|
||||
finishTaskRequest.setFinishTabletInfos(tabletInfos);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user