[fix](tablet clone) fix not add colocate replica and print some logs #22378

This commit is contained in:
yujun
2023-08-04 14:09:02 +08:00
committed by GitHub
parent 24c1953e91
commit 3d5b90befe
6 changed files with 51 additions and 17 deletions

View File

@ -29,6 +29,7 @@ import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.Tablet.TabletStatus;
import org.apache.doris.clone.TabletChecker.CheckerCounter;
import org.apache.doris.clone.TabletSchedCtx.Priority;
import org.apache.doris.clone.TabletScheduler.AddResult;
import org.apache.doris.common.Config;
@ -201,6 +202,9 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
* If every replica matches the backends in the group, mark that group as stable.
*/
private void matchGroup() {
long start = System.currentTimeMillis();
CheckerCounter counter = new CheckerCounter();
Env env = Env.getCurrentEnv();
SystemInfoService infoService = Env.getCurrentSystemInfo();
ColocateTableIndex colocateIndex = env.getColocateTableIndex();
@ -244,6 +248,7 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
backendBucketsSeq.size() + " vs. " + index.getTablets().size());
int idx = 0;
for (Long tabletId : index.getTabletIdsInOrder()) {
counter.totalTabletNum++;
Set<Long> bucketsSeq = backendBucketsSeq.get(idx);
Preconditions.checkState(bucketsSeq.size() == replicationNum,
bucketsSeq.size() + " vs. " + replicationNum);
@ -251,11 +256,13 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
TabletStatus st = tablet.getColocateHealthStatus(
visibleVersion, replicaAlloc, bucketsSeq);
if (st != TabletStatus.HEALTHY) {
counter.unhealthyTabletNum++;
unstableReason = String.format("get unhealthy tablet %d in colocate table."
+ " status: %s", tablet.getId(), st);
LOG.debug(unstableReason);
if (!tablet.readyToBeRepaired(infoService, Priority.NORMAL)) {
counter.tabletNotReady++;
continue;
}
@ -275,6 +282,10 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
// skip this group and check next one.
LOG.info("tablet scheduler return: {}. stop colocate table check", res.name());
break OUT;
} else if (res == AddResult.ADDED) {
counter.addToSchedulerTabletNum++;
} else {
counter.tabletInScheduler++;
}
}
idx++;
@ -293,6 +304,11 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
colocateIndex.markGroupUnstable(groupId, unstableReason, true);
}
} // end for groups
long cost = System.currentTimeMillis() - start;
LOG.info("finished to check tablets. unhealth/total/added/in_sched/not_ready: {}/{}/{}/{}/{}, cost: {} ms",
counter.unhealthyTabletNum, counter.totalTabletNum, counter.addToSchedulerTabletNum,
counter.tabletInScheduler, counter.tabletNotReady, cost);
}
/*

View File

@ -214,7 +214,7 @@ public class TabletChecker extends MasterDaemon {
LOG.debug(stat.incrementalBrief());
}
private static class CheckerCounter {
public static class CheckerCounter {
public long totalTabletNum = 0;
public long unhealthyTabletNum = 0;
public long addToSchedulerTabletNum = 0;

View File

@ -127,7 +127,6 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
RUNNING, // tablet is being scheduled
FINISHED, // task is finished
CANCELLED, // task is failed
TIMEOUT, // task is timeout
UNEXPECTED // other unexpected errors
}
@ -656,7 +655,9 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
*/
public void chooseDestReplicaForVersionIncomplete(Map<Long, PathSlot> backendsWorkingSlots)
throws SchedException {
List<Replica> candidates = Lists.newArrayList();
List<Replica> decommissionCand = Lists.newArrayList();
List<Replica> colocateCand = Lists.newArrayList();
List<Replica> notColocateCand = Lists.newArrayList();
for (Replica replica : tablet.getReplicas()) {
if (replica.isBad()) {
LOG.debug("replica {} is bad, skip. tablet: {}",
@ -671,21 +672,35 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
continue;
}
// check version and replica state.
// if the replica's state is DECOMMISSION, it may be chose as dest replica,
// and its state will be set to NORMAL later.
// if there are not enough version-complete replicas, try to add back the decommission replica.
if (replica.getState() == ReplicaState.DECOMMISSION) {
decommissionCand.add(replica);
continue;
}
if (replica.getLastFailedVersion() <= 0
&& replica.getVersion() >= visibleVersion
&& replica.getState() != ReplicaState.DECOMMISSION) {
&& replica.getVersion() >= visibleVersion) {
// skip healthy replica
LOG.debug("replica {} version {} is healthy, visible version {}, replica state {}, skip. tablet: {}",
replica.getId(), replica.getVersion(), visibleVersion, replica.getState(), tabletId);
continue;
}
candidates.add(replica);
if (colocateBackendsSet != null && colocateBackendsSet.contains(replica.getBackendId())) {
colocateCand.add(replica);
} else {
notColocateCand.add(replica);
}
}
List<Replica> candidates = null;
if (!colocateCand.isEmpty()) {
candidates = colocateCand;
} else if (!notColocateCand.isEmpty()) {
candidates = notColocateCand;
} else {
candidates = decommissionCand;
}
if (candidates.isEmpty()) {
throw new SchedException(Status.UNRECOVERABLE, "unable to choose dest replica");
}

View File

@ -1483,6 +1483,7 @@ public class TabletScheduler extends MasterDaemon {
TabletStatus st = tablet.getColocateHealthStatus(
partition.getVisibleVersion(), replicaAlloc, backendsSet);
statusPair = Pair.of(st, Priority.HIGH);
tabletCtx.setColocateGroupBackendIds(backendsSet);
} else {
List<Long> aliveBeIds = infoService.getAllBackendIds(true);
statusPair = tablet.getHealthStatusWithPriority(
@ -1525,7 +1526,7 @@ public class TabletScheduler extends MasterDaemon {
runningTablets.remove(tabletCtx.getTabletId());
allTabletTypes.remove(tabletCtx.getTabletId());
schedHistory.add(tabletCtx);
LOG.info("remove the tablet {}. because: {}", tabletCtx.getTabletId(), reason);
LOG.info("remove the tablet {}. because: {}", tabletCtx, reason);
}
// get next batch of tablets from queue.
@ -1703,10 +1704,6 @@ public class TabletScheduler extends MasterDaemon {
List<TabletSchedCtx> timeoutTablets = Lists.newArrayList();
synchronized (this) {
runningTablets.values().stream().filter(TabletSchedCtx::isTimeout).forEach(timeoutTablets::add);
for (TabletSchedCtx tabletSchedCtx : timeoutTablets) {
removeTabletCtx(tabletSchedCtx, "timeout");
}
}
// 2. release ctx
@ -1714,7 +1711,7 @@ public class TabletScheduler extends MasterDaemon {
// Set "resetReplicaState" to true because
// the timeout task should also be considered as UNRECOVERABLE,
// so need to reset replica state.
releaseTabletCtx(t, TabletSchedCtx.State.CANCELLED, true);
finalizeTabletCtx(t, TabletSchedCtx.State.CANCELLED, Status.UNRECOVERABLE, "timeout");
stat.counterCloneTaskTimeout.incrementAndGet();
});
}

View File

@ -1163,6 +1163,7 @@ public class ReportHandler extends Daemon {
// colocate table will delete Replica in meta when balancing
// but we need to rely on MetaNotFoundException to decide whether to delete the tablet in backend
// if the tablet is healthy, delete it.
boolean isColocateBackend = false;
ColocateTableIndex colocateTableIndex = Env.getCurrentColocateIndex();
if (colocateTableIndex.isColocateTable(olapTable.getId())) {
ColocateTableIndex.GroupId groupId = colocateTableIndex.getGroup(tableId);
@ -1176,6 +1177,10 @@ public class ReportHandler extends Daemon {
if (status == TabletStatus.HEALTHY) {
return false;
}
if (backendsSet.contains(backendId)) {
isColocateBackend = true;
}
}
SystemInfoService infoService = Env.getCurrentSystemInfo();
@ -1183,7 +1188,8 @@ public class ReportHandler extends Daemon {
Pair<TabletStatus, TabletSchedCtx.Priority> status = tablet.getHealthStatusWithPriority(infoService,
visibleVersion, replicaAlloc, aliveBeIds);
if (status.first == TabletStatus.VERSION_INCOMPLETE || status.first == TabletStatus.REPLICA_MISSING
if (isColocateBackend || status.first == TabletStatus.VERSION_INCOMPLETE
|| status.first == TabletStatus.REPLICA_MISSING
|| status.first == TabletStatus.UNRECOVERABLE) {
long lastFailedVersion = -1L;

View File

@ -98,7 +98,7 @@ public class CloneTask extends AgentTask {
sb.append("tablet id: ").append(tabletId).append(", replica id: ").append(replicaId).append(", schema hash: ")
.append(schemaHash);
sb.append(", storageMedium: ").append(storageMedium.name());
sb.append(", visible version(hash): ").append(visibleVersion);
sb.append(", visible version: ").append(visibleVersion);
sb.append(", src backend: ").append(srcBackends.get(0).getHost())
.append(", src path hash: ").append(srcPathHash);
sb.append(", src backend: ").append(srcBackends.get(0).getHost()).append(", src path hash: ")