cherry pick from #38528
This commit is contained in:
@ -34,7 +34,6 @@ import org.apache.doris.catalog.MaterializedIndex.IndexState;
|
||||
import org.apache.doris.catalog.Partition.PartitionState;
|
||||
import org.apache.doris.catalog.Replica.ReplicaState;
|
||||
import org.apache.doris.catalog.Tablet.TabletStatus;
|
||||
import org.apache.doris.clone.TabletSchedCtx;
|
||||
import org.apache.doris.clone.TabletScheduler;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
@ -1841,11 +1840,11 @@ public class OlapTable extends Table implements MTMVRelatedTableIf {
|
||||
return false;
|
||||
}
|
||||
|
||||
Pair<TabletStatus, TabletSchedCtx.Priority> statusPair = tablet.getHealthStatusWithPriority(
|
||||
infoService, visibleVersion, replicaAlloc, aliveBeIds);
|
||||
if (statusPair.first != TabletStatus.HEALTHY) {
|
||||
TabletStatus status = tablet.getHealth(infoService, visibleVersion,
|
||||
replicaAlloc, aliveBeIds).status;
|
||||
if (status != TabletStatus.HEALTHY) {
|
||||
LOG.info("table {} is not stable because tablet {} status is {}. replicas: {}",
|
||||
id, tablet.getId(), statusPair.first, tablet.getReplicas());
|
||||
id, tablet.getId(), status, tablet.getReplicas());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@ -2482,6 +2481,10 @@ public class OlapTable extends Table implements MTMVRelatedTableIf {
|
||||
return tableProperty.getEnableUniqueKeyMergeOnWrite();
|
||||
}
|
||||
|
||||
public boolean isUniqKeyMergeOnWrite() {
|
||||
return getKeysType() == KeysType.UNIQUE_KEYS && getEnableUniqueKeyMergeOnWrite();
|
||||
}
|
||||
|
||||
public boolean isDuplicateWithoutKey() {
|
||||
return getKeysType() == KeysType.DUP_KEYS && getKeysNum() == 0;
|
||||
}
|
||||
@ -2573,8 +2576,7 @@ public class OlapTable extends Table implements MTMVRelatedTableIf {
|
||||
}
|
||||
|
||||
public boolean isDupKeysOrMergeOnWrite() {
|
||||
return keysType == KeysType.DUP_KEYS
|
||||
|| (keysType == KeysType.UNIQUE_KEYS && getEnableUniqueKeyMergeOnWrite());
|
||||
return keysType == KeysType.DUP_KEYS || isUniqKeyMergeOnWrite();
|
||||
}
|
||||
|
||||
public void initAutoIncrementGenerator(long dbId) {
|
||||
|
||||
@ -81,6 +81,36 @@ public class Tablet extends MetaObject implements Writable {
|
||||
REPLICA_COMPACTION_TOO_SLOW // one replica's version count is much more than other replicas;
|
||||
}
|
||||
|
||||
public static class TabletHealth {
|
||||
public TabletStatus status;
|
||||
public TabletSchedCtx.Priority priority;
|
||||
|
||||
// num of alive replica with version complete
|
||||
public int aliveAndVersionCompleteNum;
|
||||
|
||||
// NEED_FURTHER_REPAIR replica id
|
||||
public long needFurtherRepairReplicaId;
|
||||
|
||||
// has alive replica with version incomplete, prior to repair these replica
|
||||
public boolean hasAliveAndVersionIncomplete;
|
||||
|
||||
// this tablet recent write failed, then increase its sched priority
|
||||
public boolean hasRecentLoadFailed;
|
||||
|
||||
// this tablet want to add new replica, but not found target backend.
|
||||
public boolean noPathForNewReplica;
|
||||
|
||||
public TabletHealth() {
|
||||
status = null; // don't set for balance task
|
||||
priority = TabletSchedCtx.Priority.NORMAL;
|
||||
aliveAndVersionCompleteNum = 0;
|
||||
needFurtherRepairReplicaId = -1L;
|
||||
hasAliveAndVersionIncomplete = false;
|
||||
hasRecentLoadFailed = false;
|
||||
noPathForNewReplica = false;
|
||||
}
|
||||
}
|
||||
|
||||
@SerializedName(value = "id")
|
||||
private long id;
|
||||
@SerializedName(value = "replicas")
|
||||
@ -104,6 +134,16 @@ public class Tablet extends MetaObject implements Writable {
|
||||
// no need to persist
|
||||
private long lastStatusCheckTime = -1;
|
||||
|
||||
// last time for load data fail
|
||||
private long lastLoadFailedTime = -1;
|
||||
|
||||
// if tablet want to add a new replica, but cann't found any backend to locate the new replica.
|
||||
// then mark this tablet. For later repair, even try and try to repair this tablet, sched will always fail.
|
||||
// For example, 1 tablet contains 3 replicas, if 1 backend is dead, then tablet's healthy status
|
||||
// is REPLICA_MISSING. But since no other backend can held the new replica, then sched always fail.
|
||||
// So don't increase this tablet's sched priority if it has no path for new replica.
|
||||
private long lastTimeNoPathForNewReplica = -1;
|
||||
|
||||
public Tablet() {
|
||||
this(0L, new ArrayList<>());
|
||||
}
|
||||
@ -466,10 +506,8 @@ public class Tablet extends MetaObject implements Writable {
|
||||
* 1. healthy replica num is equal to replicationNum
|
||||
* 2. all healthy replicas are in right tag
|
||||
*/
|
||||
public Pair<TabletStatus, TabletSchedCtx.Priority> getHealthStatusWithPriority(SystemInfoService systemInfoService,
|
||||
public TabletHealth getHealth(SystemInfoService systemInfoService,
|
||||
long visibleVersion, ReplicaAllocation replicaAlloc, List<Long> aliveBeIds) {
|
||||
|
||||
|
||||
Map<Tag, Short> allocMap = replicaAlloc.getAllocMap();
|
||||
Map<Tag, Short> stableAllocMap = Maps.newHashMap();
|
||||
Map<Tag, Short> stableVersionCompleteAllocMap = Maps.newHashMap();
|
||||
@ -480,16 +518,12 @@ public class Tablet extends MetaObject implements Writable {
|
||||
int stable = 0;
|
||||
|
||||
Replica needFurtherRepairReplica = null;
|
||||
boolean hasAliveAndVersionIncomplete = false;
|
||||
Set<String> hosts = Sets.newHashSet();
|
||||
ArrayList<Long> versions = new ArrayList<>();
|
||||
for (Replica replica : replicas) {
|
||||
Backend backend = systemInfoService.getBackend(replica.getBackendId());
|
||||
if (backend == null || !backend.isAlive() || !replica.isAlive()
|
||||
|| checkHost(hosts, backend) || replica.tooSlow() || !backend.isMixNode()) {
|
||||
// this replica is not alive,
|
||||
// or if this replica is on same host with another replica, we also treat it as 'dead',
|
||||
// so that Tablet Scheduler will create a new replica on different host.
|
||||
// ATTN: Replicas on same host is a bug of previous Doris version, so we fix it by this way.
|
||||
if (!isReplicaAndBackendAlive(replica, backend, hosts)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
@ -514,13 +548,30 @@ public class Tablet extends MetaObject implements Writable {
|
||||
|
||||
allocNum = stableVersionCompleteAllocMap.getOrDefault(backend.getLocationTag(), (short) 0);
|
||||
stableVersionCompleteAllocMap.put(backend.getLocationTag(), (short) (allocNum + 1));
|
||||
} else {
|
||||
hasAliveAndVersionIncomplete = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TabletHealth tabletHealth = new TabletHealth();
|
||||
initTabletHealth(tabletHealth);
|
||||
tabletHealth.aliveAndVersionCompleteNum = aliveAndVersionComplete;
|
||||
tabletHealth.hasAliveAndVersionIncomplete = hasAliveAndVersionIncomplete;
|
||||
if (needFurtherRepairReplica != null) {
|
||||
tabletHealth.needFurtherRepairReplicaId = needFurtherRepairReplica.getId();
|
||||
}
|
||||
|
||||
// 0. We can not choose a good replica as src to repair this tablet.
|
||||
if (aliveAndVersionComplete == 0) {
|
||||
return Pair.of(TabletStatus.UNRECOVERABLE, Priority.VERY_HIGH);
|
||||
tabletHealth.status = TabletStatus.UNRECOVERABLE;
|
||||
return tabletHealth;
|
||||
} else if (aliveAndVersionComplete < replicationNum && hasAliveAndVersionIncomplete) {
|
||||
// not enough good replica, and there exists schedule available replicas and version incomplete,
|
||||
// no matter whether they tag is proper right, fix them immediately.
|
||||
tabletHealth.status = TabletStatus.VERSION_INCOMPLETE;
|
||||
tabletHealth.priority = TabletSchedCtx.Priority.VERY_HIGH;
|
||||
return tabletHealth;
|
||||
}
|
||||
|
||||
// 1. alive replicas are not enough
|
||||
@ -536,24 +587,32 @@ public class Tablet extends MetaObject implements Writable {
|
||||
// 3. aliveBackendsNum >= replicationNum: make sure after deleting,
|
||||
// there will be at least one backend for new replica.
|
||||
// 4. replicationNum > 1: if replication num is set to 1, do not delete any replica, for safety reason
|
||||
return Pair.of(TabletStatus.FORCE_REDUNDANT, TabletSchedCtx.Priority.VERY_HIGH);
|
||||
} else if (alive < (replicationNum / 2) + 1) {
|
||||
return Pair.of(TabletStatus.REPLICA_MISSING, TabletSchedCtx.Priority.HIGH);
|
||||
tabletHealth.status = TabletStatus.FORCE_REDUNDANT;
|
||||
tabletHealth.priority = TabletSchedCtx.Priority.VERY_HIGH;
|
||||
return tabletHealth;
|
||||
} else if (alive < replicationNum) {
|
||||
return Pair.of(TabletStatus.REPLICA_MISSING, TabletSchedCtx.Priority.NORMAL);
|
||||
tabletHealth.status = TabletStatus.REPLICA_MISSING;
|
||||
tabletHealth.priority = alive < (replicationNum / 2) + 1 ? TabletSchedCtx.Priority.VERY_HIGH
|
||||
: TabletSchedCtx.Priority.NORMAL;
|
||||
return tabletHealth;
|
||||
}
|
||||
|
||||
// 2. version complete replicas are not enough
|
||||
if (aliveAndVersionComplete < (replicationNum / 2) + 1) {
|
||||
return Pair.of(TabletStatus.VERSION_INCOMPLETE, TabletSchedCtx.Priority.HIGH);
|
||||
} else if (aliveAndVersionComplete < replicationNum) {
|
||||
return Pair.of(TabletStatus.VERSION_INCOMPLETE, TabletSchedCtx.Priority.NORMAL);
|
||||
if (aliveAndVersionComplete < replicationNum) {
|
||||
tabletHealth.status = TabletStatus.VERSION_INCOMPLETE;
|
||||
tabletHealth.priority = alive < (replicationNum / 2) + 1 ? TabletSchedCtx.Priority.HIGH
|
||||
: TabletSchedCtx.Priority.NORMAL;
|
||||
return tabletHealth;
|
||||
} else if (aliveAndVersionComplete > replicationNum) {
|
||||
if (needFurtherRepairReplica != null) {
|
||||
return Pair.of(TabletStatus.NEED_FURTHER_REPAIR, TabletSchedCtx.Priority.HIGH);
|
||||
tabletHealth.status = TabletStatus.NEED_FURTHER_REPAIR;
|
||||
tabletHealth.priority = TabletSchedCtx.Priority.HIGH;
|
||||
} else {
|
||||
// we set REDUNDANT as VERY_HIGH, because delete redundant replicas can free the space quickly.
|
||||
tabletHealth.status = TabletStatus.REDUNDANT;
|
||||
tabletHealth.priority = TabletSchedCtx.Priority.VERY_HIGH;
|
||||
}
|
||||
// we set REDUNDANT as VERY_HIGH, because delete redundant replicas can free the space quickly.
|
||||
return Pair.of(TabletStatus.REDUNDANT, TabletSchedCtx.Priority.VERY_HIGH);
|
||||
return tabletHealth;
|
||||
}
|
||||
|
||||
// 3. replica is under relocating
|
||||
@ -564,14 +623,17 @@ public class Tablet extends MetaObject implements Writable {
|
||||
if (replicaBeIds.containsAll(availableBeIds)
|
||||
&& availableBeIds.size() >= replicationNum
|
||||
&& replicationNum > 1) { // No BE can be choose to create a new replica
|
||||
return Pair.of(TabletStatus.FORCE_REDUNDANT,
|
||||
stable < (replicationNum / 2) + 1
|
||||
? TabletSchedCtx.Priority.NORMAL : TabletSchedCtx.Priority.LOW);
|
||||
tabletHealth.status = TabletStatus.FORCE_REDUNDANT;
|
||||
tabletHealth.priority = stable < (replicationNum / 2) + 1
|
||||
? TabletSchedCtx.Priority.NORMAL : TabletSchedCtx.Priority.LOW;
|
||||
return tabletHealth;
|
||||
}
|
||||
if (stable < (replicationNum / 2) + 1) {
|
||||
return Pair.of(TabletStatus.REPLICA_RELOCATING, TabletSchedCtx.Priority.NORMAL);
|
||||
} else if (stable < replicationNum) {
|
||||
return Pair.of(TabletStatus.REPLICA_RELOCATING, TabletSchedCtx.Priority.LOW);
|
||||
|
||||
if (stable < replicationNum) {
|
||||
tabletHealth.status = TabletStatus.REPLICA_RELOCATING;
|
||||
tabletHealth.priority = stable < (replicationNum / 2) + 1 ? TabletSchedCtx.Priority.NORMAL
|
||||
: TabletSchedCtx.Priority.LOW;
|
||||
return tabletHealth;
|
||||
}
|
||||
}
|
||||
|
||||
@ -579,19 +641,25 @@ public class Tablet extends MetaObject implements Writable {
|
||||
for (Map.Entry<Tag, Short> alloc : allocMap.entrySet()) {
|
||||
if (stableVersionCompleteAllocMap.getOrDefault(alloc.getKey(), (short) 0) < alloc.getValue()) {
|
||||
if (stableAllocMap.getOrDefault(alloc.getKey(), (short) 0) >= alloc.getValue()) {
|
||||
return Pair.of(TabletStatus.VERSION_INCOMPLETE, TabletSchedCtx.Priority.NORMAL);
|
||||
tabletHealth.status = TabletStatus.VERSION_INCOMPLETE;
|
||||
} else {
|
||||
return Pair.of(TabletStatus.REPLICA_MISSING_FOR_TAG, TabletSchedCtx.Priority.NORMAL);
|
||||
tabletHealth.status = TabletStatus.REPLICA_MISSING_FOR_TAG;
|
||||
}
|
||||
tabletHealth.priority = TabletSchedCtx.Priority.NORMAL;
|
||||
return tabletHealth;
|
||||
}
|
||||
}
|
||||
|
||||
if (replicas.size() > replicationNum) {
|
||||
if (needFurtherRepairReplica != null) {
|
||||
return Pair.of(TabletStatus.NEED_FURTHER_REPAIR, TabletSchedCtx.Priority.HIGH);
|
||||
tabletHealth.status = TabletStatus.NEED_FURTHER_REPAIR;
|
||||
tabletHealth.priority = TabletSchedCtx.Priority.HIGH;
|
||||
} else {
|
||||
// we set REDUNDANT as VERY_HIGH, because delete redundant replicas can free the space quickly.
|
||||
tabletHealth.status = TabletStatus.REDUNDANT;
|
||||
tabletHealth.priority = TabletSchedCtx.Priority.VERY_HIGH;
|
||||
}
|
||||
// we set REDUNDANT as VERY_HIGH, because delete redundant replicas can free the space quickly.
|
||||
return Pair.of(TabletStatus.REDUNDANT, TabletSchedCtx.Priority.VERY_HIGH);
|
||||
return tabletHealth;
|
||||
}
|
||||
|
||||
// 5. find a replica's version count is much more than others, and drop it
|
||||
@ -603,12 +671,36 @@ public class Tablet extends MetaObject implements Writable {
|
||||
double ratio = (double) delta / versions.get(versions.size() - 1);
|
||||
if (versions.get(versions.size() - 1) >= Config.min_version_count_indicate_replica_compaction_too_slow
|
||||
&& ratio > Config.valid_version_count_delta_ratio_between_replicas) {
|
||||
return Pair.of(TabletStatus.REPLICA_COMPACTION_TOO_SLOW, Priority.HIGH);
|
||||
tabletHealth.status = TabletStatus.REPLICA_COMPACTION_TOO_SLOW;
|
||||
tabletHealth.priority = Priority.HIGH;
|
||||
return tabletHealth;
|
||||
}
|
||||
}
|
||||
|
||||
// 6. healthy
|
||||
return Pair.of(TabletStatus.HEALTHY, TabletSchedCtx.Priority.NORMAL);
|
||||
tabletHealth.status = TabletStatus.HEALTHY;
|
||||
tabletHealth.priority = TabletSchedCtx.Priority.NORMAL;
|
||||
|
||||
return tabletHealth;
|
||||
}
|
||||
|
||||
private void initTabletHealth(TabletHealth tabletHealth) {
|
||||
long endTime = System.currentTimeMillis() - Config.tablet_recent_load_failed_second * 1000L;
|
||||
tabletHealth.hasRecentLoadFailed = lastLoadFailedTime > endTime;
|
||||
tabletHealth.noPathForNewReplica = lastTimeNoPathForNewReplica > endTime;
|
||||
}
|
||||
|
||||
private boolean isReplicaAndBackendAlive(Replica replica, Backend backend, Set<String> hosts) {
|
||||
if (backend == null || !backend.isAlive() || !replica.isAlive()
|
||||
|| checkHost(hosts, backend) || replica.tooSlow() || !backend.isMixNode()) {
|
||||
// this replica is not alive,
|
||||
// or if this replica is on same host with another replica, we also treat it as 'dead',
|
||||
// so that Tablet Scheduler will create a new replica on different host.
|
||||
// ATTN: Replicas on same host is a bug of previous Doris version, so we fix it by this way.
|
||||
return false;
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
private boolean checkHost(Set<String> hosts, Backend backend) {
|
||||
@ -637,8 +729,49 @@ public class Tablet extends MetaObject implements Writable {
|
||||
* No need to check if backend is available. We consider all backends in 'backendsSet' are available,
|
||||
* If not, unavailable backends will be relocated by CalocateTableBalancer first.
|
||||
*/
|
||||
public TabletStatus getColocateHealthStatus(long visibleVersion,
|
||||
public TabletHealth getColocateHealth(long visibleVersion,
|
||||
ReplicaAllocation replicaAlloc, Set<Long> backendsSet) {
|
||||
SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
|
||||
short replicationNum = replicaAlloc.getTotalReplicaNum();
|
||||
boolean hasAliveAndVersionIncomplete = false;
|
||||
int aliveAndVersionComplete = 0;
|
||||
Set<String> hosts = Sets.newHashSet();
|
||||
for (Replica replica : replicas) {
|
||||
Backend backend = systemInfoService.getBackend(replica.getBackendId());
|
||||
if (!isReplicaAndBackendAlive(replica, backend, hosts)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
boolean versionCompleted = replica.getLastFailedVersion() < 0 && replica.getVersion() >= visibleVersion;
|
||||
if (versionCompleted) {
|
||||
aliveAndVersionComplete++;
|
||||
}
|
||||
|
||||
if (replica.isScheduleAvailable()) {
|
||||
if (!versionCompleted) {
|
||||
hasAliveAndVersionIncomplete = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TabletHealth tabletHealth = new TabletHealth();
|
||||
initTabletHealth(tabletHealth);
|
||||
tabletHealth.aliveAndVersionCompleteNum = aliveAndVersionComplete;
|
||||
tabletHealth.hasAliveAndVersionIncomplete = hasAliveAndVersionIncomplete;
|
||||
tabletHealth.priority = TabletSchedCtx.Priority.NORMAL;
|
||||
|
||||
// 0. We can not choose a good replica as src to repair this tablet.
|
||||
if (aliveAndVersionComplete == 0) {
|
||||
tabletHealth.status = TabletStatus.UNRECOVERABLE;
|
||||
return tabletHealth;
|
||||
} else if (aliveAndVersionComplete < replicationNum && hasAliveAndVersionIncomplete) {
|
||||
// not enough good replica, and there exists schedule available replicas and version incomplete,
|
||||
// no matter whether they tag is proper right, fix them immediately.
|
||||
tabletHealth.status = TabletStatus.VERSION_INCOMPLETE;
|
||||
tabletHealth.priority = TabletSchedCtx.Priority.VERY_HIGH;
|
||||
return tabletHealth;
|
||||
}
|
||||
|
||||
// Here we don't need to care about tag. Because the replicas of the colocate table has been confirmed
|
||||
// in ColocateTableCheckerAndBalancer.
|
||||
Short totalReplicaNum = replicaAlloc.getTotalReplicaNum();
|
||||
@ -647,7 +780,8 @@ public class Tablet extends MetaObject implements Writable {
|
||||
// Because if the following check doesn't pass, the COLOCATE_MISMATCH will return.
|
||||
Set<Long> replicaBackendIds = getBackendIds();
|
||||
if (!replicaBackendIds.containsAll(backendsSet)) {
|
||||
return TabletStatus.COLOCATE_MISMATCH;
|
||||
tabletHealth.status = TabletStatus.COLOCATE_MISMATCH;
|
||||
return tabletHealth;
|
||||
}
|
||||
|
||||
// 2. check version completeness
|
||||
@ -663,27 +797,31 @@ public class Tablet extends MetaObject implements Writable {
|
||||
if (replica.isBad()) {
|
||||
// If this replica is bad but located on one of backendsSet,
|
||||
// we have drop it first, or we can find any other BE for new replica.
|
||||
return TabletStatus.COLOCATE_REDUNDANT;
|
||||
tabletHealth.status = TabletStatus.COLOCATE_REDUNDANT;
|
||||
} else {
|
||||
// maybe in replica's DECOMMISSION state
|
||||
// Here we return VERSION_INCOMPLETE,
|
||||
// and the tablet scheduler will finally set it's state to NORMAL.
|
||||
return TabletStatus.VERSION_INCOMPLETE;
|
||||
tabletHealth.status = TabletStatus.VERSION_INCOMPLETE;
|
||||
}
|
||||
return tabletHealth;
|
||||
}
|
||||
|
||||
if (replica.getLastFailedVersion() > 0 || replica.getVersion() < visibleVersion) {
|
||||
// this replica is alive but version incomplete
|
||||
return TabletStatus.VERSION_INCOMPLETE;
|
||||
tabletHealth.status = TabletStatus.VERSION_INCOMPLETE;
|
||||
return tabletHealth;
|
||||
}
|
||||
}
|
||||
|
||||
// 3. check redundant
|
||||
if (replicas.size() > totalReplicaNum) {
|
||||
return TabletStatus.COLOCATE_REDUNDANT;
|
||||
tabletHealth.status = TabletStatus.COLOCATE_REDUNDANT;
|
||||
return tabletHealth;
|
||||
}
|
||||
|
||||
return TabletStatus.HEALTHY;
|
||||
tabletHealth.status = TabletStatus.HEALTHY;
|
||||
return tabletHealth;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -744,4 +882,16 @@ public class Tablet extends MetaObject implements Writable {
|
||||
public void setLastStatusCheckTime(long lastStatusCheckTime) {
|
||||
this.lastStatusCheckTime = lastStatusCheckTime;
|
||||
}
|
||||
|
||||
public long getLastLoadFailedTime() {
|
||||
return lastLoadFailedTime;
|
||||
}
|
||||
|
||||
public void setLastLoadFailedTime(long lastLoadFailedTime) {
|
||||
this.lastLoadFailedTime = lastLoadFailedTime;
|
||||
}
|
||||
|
||||
public void setLastTimeNoPathForNewReplica(long lastTimeNoPathForNewReplica) {
|
||||
this.lastTimeNoPathForNewReplica = lastTimeNoPathForNewReplica;
|
||||
}
|
||||
}
|
||||
|
||||
@ -28,6 +28,7 @@ import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.Partition;
|
||||
import org.apache.doris.catalog.ReplicaAllocation;
|
||||
import org.apache.doris.catalog.Tablet;
|
||||
import org.apache.doris.catalog.Tablet.TabletHealth;
|
||||
import org.apache.doris.catalog.Tablet.TabletStatus;
|
||||
import org.apache.doris.clone.TabletChecker.CheckerCounter;
|
||||
import org.apache.doris.clone.TabletSchedCtx.Priority;
|
||||
@ -334,7 +335,7 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
|
||||
* tablet to TabletScheduler.
|
||||
* Otherwise, mark the group as stable
|
||||
*/
|
||||
protected void runAfterCatalogReady() {
|
||||
public void runAfterCatalogReady() {
|
||||
relocateAndBalanceGroups();
|
||||
matchGroups();
|
||||
}
|
||||
@ -512,6 +513,7 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
|
||||
if (olapTable == null || !colocateIndex.isColocateTable(olapTable.getId())) {
|
||||
continue;
|
||||
}
|
||||
boolean isUniqKeyMergeOnWrite = olapTable.isUniqKeyMergeOnWrite();
|
||||
olapTable.readLock();
|
||||
try {
|
||||
for (Partition partition : olapTable.getPartitions()) {
|
||||
@ -530,16 +532,20 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
|
||||
Preconditions.checkState(bucketsSeq.size() == replicationNum,
|
||||
bucketsSeq.size() + " vs. " + replicationNum);
|
||||
Tablet tablet = index.getTablet(tabletId);
|
||||
TabletStatus st = tablet.getColocateHealthStatus(
|
||||
TabletHealth tabletHealth = tablet.getColocateHealth(
|
||||
visibleVersion, replicaAlloc, bucketsSeq);
|
||||
if (st != TabletStatus.HEALTHY) {
|
||||
if (tabletHealth.status != TabletStatus.HEALTHY) {
|
||||
counter.unhealthyTabletNum++;
|
||||
unstableReason = String.format("get unhealthy tablet %d in colocate table."
|
||||
+ " status: %s", tablet.getId(), st);
|
||||
+ " status: %s", tablet.getId(), tabletHealth.status);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(unstableReason);
|
||||
}
|
||||
|
||||
if (tabletHealth.status == TabletStatus.UNRECOVERABLE) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!tablet.readyToBeRepaired(infoService, Priority.NORMAL)) {
|
||||
counter.tabletNotReady++;
|
||||
continue;
|
||||
@ -550,9 +556,9 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
|
||||
db.getId(), tableId, partition.getId(), index.getId(), tablet.getId(),
|
||||
replicaAlloc, System.currentTimeMillis());
|
||||
// the tablet status will be set again when being scheduled
|
||||
tabletCtx.setTabletStatus(st);
|
||||
tabletCtx.setPriority(Priority.NORMAL);
|
||||
tabletCtx.setTabletHealth(tabletHealth);
|
||||
tabletCtx.setTabletOrderIdx(idx);
|
||||
tabletCtx.setIsUniqKeyMergeOnWrite(isUniqKeyMergeOnWrite);
|
||||
|
||||
AddResult res = tabletScheduler.addTablet(tabletCtx, false /* not force */);
|
||||
if (res == AddResult.LIMIT_EXCEED || res == AddResult.DISABLED) {
|
||||
|
||||
@ -30,6 +30,7 @@ import org.apache.doris.catalog.Partition;
|
||||
import org.apache.doris.catalog.Partition.PartitionState;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.catalog.Tablet;
|
||||
import org.apache.doris.catalog.Tablet.TabletHealth;
|
||||
import org.apache.doris.catalog.Tablet.TabletStatus;
|
||||
import org.apache.doris.clone.TabletScheduler.AddResult;
|
||||
import org.apache.doris.common.Config;
|
||||
@ -197,7 +198,7 @@ public class TabletChecker extends MasterDaemon {
|
||||
* If a tablet is not healthy, a TabletInfo will be created and sent to TabletScheduler for repairing.
|
||||
*/
|
||||
@Override
|
||||
protected void runAfterCatalogReady() {
|
||||
public void runAfterCatalogReady() {
|
||||
int pendingNum = tabletScheduler.getPendingNum();
|
||||
int runningNum = tabletScheduler.getRunningNum();
|
||||
if (pendingNum > Config.max_scheduling_tablets
|
||||
@ -357,6 +358,7 @@ public class TabletChecker extends MasterDaemon {
|
||||
return LoopControlStatus.CONTINUE;
|
||||
}
|
||||
boolean prioPartIsHealthy = true;
|
||||
boolean isUniqKeyMergeOnWrite = tbl.isUniqKeyMergeOnWrite();
|
||||
/*
|
||||
* Tablet in SHADOW index can not be repaired of balanced
|
||||
*/
|
||||
@ -369,26 +371,25 @@ public class TabletChecker extends MasterDaemon {
|
||||
continue;
|
||||
}
|
||||
|
||||
Pair<TabletStatus, TabletSchedCtx.Priority> statusWithPrio = tablet.getHealthStatusWithPriority(
|
||||
infoService, partition.getVisibleVersion(),
|
||||
TabletHealth tabletHealth = tablet.getHealth(infoService, partition.getVisibleVersion(),
|
||||
tbl.getPartitionInfo().getReplicaAllocation(partition.getId()), aliveBeIds);
|
||||
|
||||
if (statusWithPrio.first == TabletStatus.HEALTHY) {
|
||||
if (tabletHealth.status == TabletStatus.HEALTHY) {
|
||||
// Only set last status check time when status is healthy.
|
||||
tablet.setLastStatusCheckTime(startTime);
|
||||
continue;
|
||||
} else if (statusWithPrio.first == TabletStatus.UNRECOVERABLE) {
|
||||
} else if (tabletHealth.status == TabletStatus.UNRECOVERABLE) {
|
||||
// This tablet is not recoverable, do not set it into tablet scheduler
|
||||
// all UNRECOVERABLE tablet can be seen from "show proc '/statistic'"
|
||||
counter.unhealthyTabletNum++;
|
||||
continue;
|
||||
} else if (isInPrios) {
|
||||
statusWithPrio.second = TabletSchedCtx.Priority.VERY_HIGH;
|
||||
tabletHealth.priority = TabletSchedCtx.Priority.VERY_HIGH;
|
||||
prioPartIsHealthy = false;
|
||||
}
|
||||
|
||||
counter.unhealthyTabletNum++;
|
||||
if (!tablet.readyToBeRepaired(infoService, statusWithPrio.second)) {
|
||||
if (!tablet.readyToBeRepaired(infoService, tabletHealth.priority)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
@ -399,8 +400,8 @@ public class TabletChecker extends MasterDaemon {
|
||||
tbl.getPartitionInfo().getReplicaAllocation(partition.getId()),
|
||||
System.currentTimeMillis());
|
||||
// the tablet status will be set again when being scheduled
|
||||
tabletCtx.setTabletStatus(statusWithPrio.first);
|
||||
tabletCtx.setPriority(statusWithPrio.second);
|
||||
tabletCtx.setTabletHealth(tabletHealth);
|
||||
tabletCtx.setIsUniqKeyMergeOnWrite(isUniqKeyMergeOnWrite);
|
||||
|
||||
AddResult res = tabletScheduler.addTablet(tabletCtx, false /* not force */);
|
||||
if (res == AddResult.LIMIT_EXCEED || res == AddResult.DISABLED) {
|
||||
|
||||
@ -27,6 +27,7 @@ import org.apache.doris.catalog.Replica.ReplicaState;
|
||||
import org.apache.doris.catalog.ReplicaAllocation;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.catalog.Tablet;
|
||||
import org.apache.doris.catalog.Tablet.TabletHealth;
|
||||
import org.apache.doris.catalog.Tablet.TabletStatus;
|
||||
import org.apache.doris.clone.SchedException.Status;
|
||||
import org.apache.doris.clone.SchedException.SubCode;
|
||||
@ -134,8 +135,6 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
private Type type;
|
||||
private BalanceType balanceType;
|
||||
|
||||
private Priority priority;
|
||||
|
||||
// we change the dynamic priority based on how many times it fails to be scheduled
|
||||
private int failedSchedCounter = 0;
|
||||
// clone task failed counter
|
||||
@ -161,7 +160,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
private long taskTimeoutMs = 0;
|
||||
|
||||
private State state;
|
||||
private TabletStatus tabletStatus;
|
||||
private TabletHealth tabletHealth;
|
||||
|
||||
private long decommissionTime = -1;
|
||||
|
||||
@ -213,6 +212,8 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
|
||||
private SubCode schedFailedCode;
|
||||
|
||||
private boolean isUniqKeyMergeOnWrite = false;
|
||||
|
||||
public TabletSchedCtx(Type type, long dbId, long tblId, long partId,
|
||||
long idxId, long tabletId, ReplicaAllocation replicaAlloc, long createTime) {
|
||||
this.type = type;
|
||||
@ -227,6 +228,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
this.replicaAlloc = replicaAlloc;
|
||||
this.balanceType = BalanceType.BE_BALANCE;
|
||||
this.schedFailedCode = SubCode.NONE;
|
||||
this.tabletHealth = new TabletHealth();
|
||||
}
|
||||
|
||||
public ReplicaAllocation getReplicaAlloc() {
|
||||
@ -262,11 +264,19 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
}
|
||||
|
||||
public Priority getPriority() {
|
||||
return priority;
|
||||
return tabletHealth.priority;
|
||||
}
|
||||
|
||||
public void setPriority(Priority priority) {
|
||||
this.priority = priority;
|
||||
this.tabletHealth.priority = priority;
|
||||
}
|
||||
|
||||
public void setTabletHealth(TabletHealth tabletHealth) {
|
||||
this.tabletHealth = tabletHealth;
|
||||
}
|
||||
|
||||
public void setIsUniqKeyMergeOnWrite(boolean isUniqKeyMergeOnWrite) {
|
||||
this.isUniqKeyMergeOnWrite = isUniqKeyMergeOnWrite;
|
||||
}
|
||||
|
||||
public int getFinishedCounter() {
|
||||
@ -345,11 +355,11 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
}
|
||||
|
||||
public void setTabletStatus(TabletStatus tabletStatus) {
|
||||
this.tabletStatus = tabletStatus;
|
||||
this.tabletHealth.status = tabletStatus;
|
||||
}
|
||||
|
||||
public TabletStatus getTabletStatus() {
|
||||
return tabletStatus;
|
||||
return tabletHealth.status;
|
||||
}
|
||||
|
||||
public long getDbId() {
|
||||
@ -739,7 +749,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
if (replica.getLastFailedVersion() <= 0
|
||||
&& replica.getVersion() >= visibleVersion) {
|
||||
|
||||
if (tabletStatus == TabletStatus.NEED_FURTHER_REPAIR && replica.needFurtherRepair()) {
|
||||
if (tabletHealth.status == TabletStatus.NEED_FURTHER_REPAIR && replica.needFurtherRepair()) {
|
||||
furtherRepairs.add(replica);
|
||||
}
|
||||
|
||||
@ -1016,10 +1026,10 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
// REPLICA_MISSING/REPLICA_RELOCATING,
|
||||
// we create a new replica with state CLONE
|
||||
Replica replica = null;
|
||||
if (tabletStatus == TabletStatus.REPLICA_MISSING
|
||||
|| tabletStatus == TabletStatus.REPLICA_RELOCATING || type == Type.BALANCE
|
||||
|| tabletStatus == TabletStatus.COLOCATE_MISMATCH
|
||||
|| tabletStatus == TabletStatus.REPLICA_MISSING_FOR_TAG) {
|
||||
if (tabletHealth.status == TabletStatus.REPLICA_MISSING
|
||||
|| tabletHealth.status == TabletStatus.REPLICA_RELOCATING || type == Type.BALANCE
|
||||
|| tabletHealth.status == TabletStatus.COLOCATE_MISMATCH
|
||||
|| tabletHealth.status == TabletStatus.REPLICA_MISSING_FOR_TAG) {
|
||||
replica = new Replica(
|
||||
Env.getCurrentEnv().getNextId(), destBackendId,
|
||||
-1 /* version */, schemaHash,
|
||||
@ -1054,7 +1064,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
destOldVersion = replica.getVersion();
|
||||
cloneTask.setPathHash(srcPathHash, destPathHash);
|
||||
LOG.info("create clone task to repair replica, tabletId={}, replica={}, visible version {}, tablet status {}",
|
||||
tabletId, replica, visibleVersion, tabletStatus);
|
||||
tabletId, replica, visibleVersion, tabletHealth.status);
|
||||
|
||||
this.state = State.RUNNING;
|
||||
return cloneTask;
|
||||
@ -1062,7 +1072,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
|
||||
// for storage migration or cloning a new replica
|
||||
public long getDestEstimatedCopingSize() {
|
||||
if ((cloneTask != null && tabletStatus != TabletStatus.VERSION_INCOMPLETE)
|
||||
if ((cloneTask != null && tabletHealth.status != TabletStatus.VERSION_INCOMPLETE)
|
||||
|| storageMediaMigrationTask != null) {
|
||||
return Math.max(getTabletSize(), 10L);
|
||||
} else {
|
||||
@ -1149,10 +1159,8 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
|
||||
List<Long> aliveBeIds = infoService.getAllBackendIds(true);
|
||||
ReplicaAllocation replicaAlloc = olapTable.getPartitionInfo().getReplicaAllocation(partitionId);
|
||||
Pair<TabletStatus, TabletSchedCtx.Priority> pair = tablet.getHealthStatusWithPriority(
|
||||
infoService, visibleVersion, replicaAlloc,
|
||||
aliveBeIds);
|
||||
if (pair.first == TabletStatus.HEALTHY) {
|
||||
TabletStatus status = tablet.getHealth(infoService, visibleVersion, replicaAlloc, aliveBeIds).status;
|
||||
if (status == TabletStatus.HEALTHY) {
|
||||
throw new SchedException(Status.FINISHED, "tablet is healthy");
|
||||
}
|
||||
|
||||
@ -1266,10 +1274,13 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
result.add(String.valueOf(tabletId));
|
||||
result.add(type.name());
|
||||
result.add(storageMedium == null ? FeConstants.null_string : storageMedium.name());
|
||||
result.add(tabletStatus == null ? FeConstants.null_string : tabletStatus.name());
|
||||
result.add(tabletHealth.status == null ? FeConstants.null_string : tabletHealth.status.name());
|
||||
result.add(state.name());
|
||||
result.add(schedFailedCode.name());
|
||||
result.add(priority.name());
|
||||
result.add(tabletHealth.priority == null ? FeConstants.null_string : tabletHealth.priority.name());
|
||||
// show the real priority value, higher this value, higher sched priority. Add 10 hour to make it
|
||||
// to be a positive value.
|
||||
result.add(String.valueOf((System.currentTimeMillis() - getCompareValue()) / 1000 + 10 * 3600L));
|
||||
result.add(srcReplica == null ? "-1" : String.valueOf(srcReplica.getBackendId()));
|
||||
result.add(String.valueOf(srcPathHash));
|
||||
result.add(String.valueOf(destBackendId));
|
||||
@ -1299,26 +1310,44 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
return Long.compare(getCompareValue(), o.getCompareValue());
|
||||
}
|
||||
|
||||
// smaller compare value, higher priority
|
||||
private long getCompareValue() {
|
||||
long value = createTime;
|
||||
if (lastVisitedTime > 0) {
|
||||
value = lastVisitedTime;
|
||||
}
|
||||
|
||||
value += (Priority.VERY_HIGH.ordinal() - priority.ordinal() + 1) * 60 * 1000L;
|
||||
value += (Priority.VERY_HIGH.ordinal() - tabletHealth.priority.ordinal() + 1) * 60 * 1000L;
|
||||
value += 5000L * (failedSchedCounter / 10);
|
||||
|
||||
if (schedFailedCode == SubCode.WAITING_DECOMMISSION) {
|
||||
value += 5 * 1000L;
|
||||
}
|
||||
|
||||
long baseTime = Config.tablet_schedule_high_priority_second * 1000L;
|
||||
// repair tasks always prior than balance
|
||||
if (type == Type.BALANCE) {
|
||||
value += 5 * 3600 * 1000L; // 5 hour
|
||||
value += 10 * baseTime;
|
||||
} else {
|
||||
int replicaNum = replicaAlloc.getTotalReplicaNum();
|
||||
if (tabletHealth.aliveAndVersionCompleteNum < replicaNum && !tabletHealth.noPathForNewReplica) {
|
||||
if (tabletHealth.aliveAndVersionCompleteNum < (replicaNum / 2 + 1)) {
|
||||
value -= 3 * baseTime;
|
||||
if (tabletHealth.hasRecentLoadFailed) {
|
||||
value -= 3 * baseTime;
|
||||
}
|
||||
}
|
||||
if (tabletHealth.hasAliveAndVersionIncomplete) {
|
||||
value -= 1 * baseTime;
|
||||
if (isUniqKeyMergeOnWrite) {
|
||||
value -= 1 * baseTime;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (tabletStatus == TabletStatus.NEED_FURTHER_REPAIR) {
|
||||
value -= 3600 * 1000L; // 1 hour
|
||||
if (tabletHealth.status == TabletStatus.NEED_FURTHER_REPAIR) {
|
||||
value -= 1 * baseTime;
|
||||
}
|
||||
|
||||
return value;
|
||||
@ -1328,8 +1357,8 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("tablet id: ").append(tabletId);
|
||||
if (tabletStatus != null) {
|
||||
sb.append(", status: ").append(tabletStatus.name());
|
||||
if (tabletHealth.status != null) {
|
||||
sb.append(", status: ").append(tabletHealth.status.name());
|
||||
}
|
||||
if (state != null) {
|
||||
sb.append(", state: ").append(state.name());
|
||||
@ -1340,9 +1369,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
if (type == Type.BALANCE && balanceType != null) {
|
||||
sb.append(", balance: ").append(balanceType.name());
|
||||
}
|
||||
if (priority != null) {
|
||||
sb.append(", priority: ").append(priority.name());
|
||||
}
|
||||
sb.append(", priority: ").append(tabletHealth.priority.name());
|
||||
sb.append(", tablet size: ").append(tabletSize);
|
||||
if (srcReplica != null) {
|
||||
sb.append(", from backend: ").append(srcReplica.getBackendId());
|
||||
|
||||
@ -34,6 +34,7 @@ import org.apache.doris.catalog.Replica;
|
||||
import org.apache.doris.catalog.Replica.ReplicaState;
|
||||
import org.apache.doris.catalog.ReplicaAllocation;
|
||||
import org.apache.doris.catalog.Tablet;
|
||||
import org.apache.doris.catalog.Tablet.TabletHealth;
|
||||
import org.apache.doris.catalog.Tablet.TabletStatus;
|
||||
import org.apache.doris.catalog.TabletInvertedIndex;
|
||||
import org.apache.doris.clone.BackendLoadStatistic.BePathLoadStatPair;
|
||||
@ -45,7 +46,6 @@ import org.apache.doris.clone.TabletSchedCtx.Type;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.common.util.DebugPointUtil;
|
||||
import org.apache.doris.common.util.MasterDaemon;
|
||||
import org.apache.doris.persist.ReplicaPersistInfo;
|
||||
@ -167,6 +167,17 @@ public class TabletScheduler extends MasterDaemon {
|
||||
this.diskRebalancer = new DiskRebalancer(infoService, invertedIndex, backendsWorkingSlots);
|
||||
}
|
||||
|
||||
// for fe ut
|
||||
public synchronized void clear() {
|
||||
pendingTablets.clear();
|
||||
allTabletTypes.clear();
|
||||
runningTablets.clear();
|
||||
schedHistory.clear();
|
||||
|
||||
lastStatUpdateTime = 0;
|
||||
lastSlotAdjustTime = 0;
|
||||
}
|
||||
|
||||
public TabletSchedulerStat getStat() {
|
||||
return stat;
|
||||
}
|
||||
@ -322,7 +333,7 @@ public class TabletScheduler extends MasterDaemon {
|
||||
*
|
||||
*/
|
||||
@Override
|
||||
protected void runAfterCatalogReady() {
|
||||
public void runAfterCatalogReady() {
|
||||
if (!updateWorkingSlots()) {
|
||||
return;
|
||||
}
|
||||
@ -481,7 +492,7 @@ public class TabletScheduler extends MasterDaemon {
|
||||
tabletCtx.setLastVisitedTime(currentTime);
|
||||
stat.counterTabletScheduled.incrementAndGet();
|
||||
|
||||
Pair<TabletStatus, TabletSchedCtx.Priority> statusPair;
|
||||
TabletHealth tabletHealth;
|
||||
Database db = Env.getCurrentInternalCatalog().getDbOrException(tabletCtx.getDbId(),
|
||||
s -> new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE,
|
||||
"db " + tabletCtx.getDbId() + " does not exist"));
|
||||
@ -530,15 +541,13 @@ public class TabletScheduler extends MasterDaemon {
|
||||
Preconditions.checkState(tabletOrderIdx != -1);
|
||||
|
||||
Set<Long> backendsSet = colocateTableIndex.getTabletBackendsByGroup(groupId, tabletOrderIdx);
|
||||
TabletStatus st = tablet.getColocateHealthStatus(
|
||||
partition.getVisibleVersion(), replicaAlloc, backendsSet);
|
||||
statusPair = Pair.of(st, Priority.HIGH);
|
||||
tabletHealth = tablet.getColocateHealth(partition.getVisibleVersion(), replicaAlloc, backendsSet);
|
||||
tabletHealth.priority = Priority.HIGH;
|
||||
tabletCtx.setColocateGroupBackendIds(backendsSet);
|
||||
} else {
|
||||
replicaAlloc = tbl.getPartitionInfo().getReplicaAllocation(partition.getId());
|
||||
List<Long> aliveBeIds = infoService.getAllBackendIds(true);
|
||||
statusPair = tablet.getHealthStatusWithPriority(
|
||||
infoService, partition.getVisibleVersion(), replicaAlloc, aliveBeIds);
|
||||
tabletHealth = tablet.getHealth(infoService, partition.getVisibleVersion(), replicaAlloc, aliveBeIds);
|
||||
}
|
||||
|
||||
if (tabletCtx.getType() != allTabletTypes.get(tabletId)) {
|
||||
@ -576,7 +585,7 @@ public class TabletScheduler extends MasterDaemon {
|
||||
}
|
||||
}
|
||||
|
||||
if (statusPair.first != TabletStatus.VERSION_INCOMPLETE
|
||||
if (tabletHealth.status != TabletStatus.VERSION_INCOMPLETE
|
||||
&& (partition.getState() != PartitionState.NORMAL || tableState != OlapTableState.NORMAL)
|
||||
&& tableState != OlapTableState.WAITING_STABLE) {
|
||||
// If table is under ALTER process(before FINISHING), do not allow to add or delete replica.
|
||||
@ -585,13 +594,14 @@ public class TabletScheduler extends MasterDaemon {
|
||||
// executing an alter job, but the alter job is in a PENDING state and is waiting for
|
||||
// the table to become stable. In this case, we allow the tablet repair to proceed.
|
||||
throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE,
|
||||
"table is in alter process, but tablet status is " + statusPair.first.name());
|
||||
"table is in alter process, but tablet status is " + tabletHealth.status.name());
|
||||
}
|
||||
|
||||
tabletCtx.setTabletStatus(statusPair.first);
|
||||
if (statusPair.first == TabletStatus.HEALTHY && tabletCtx.getType() == TabletSchedCtx.Type.REPAIR) {
|
||||
tabletCtx.setTabletHealth(tabletHealth);
|
||||
tabletCtx.setIsUniqKeyMergeOnWrite(tbl.isUniqKeyMergeOnWrite());
|
||||
if (tabletHealth.status == TabletStatus.HEALTHY && tabletCtx.getType() == TabletSchedCtx.Type.REPAIR) {
|
||||
throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE, "tablet is healthy");
|
||||
} else if (statusPair.first != TabletStatus.HEALTHY
|
||||
} else if (tabletHealth.status != TabletStatus.HEALTHY
|
||||
&& tabletCtx.getType() == TabletSchedCtx.Type.BALANCE) {
|
||||
// we select an unhealthy tablet to do balance, which is not right.
|
||||
// so here we stop this task.
|
||||
@ -612,7 +622,7 @@ public class TabletScheduler extends MasterDaemon {
|
||||
tabletCtx.setSchemaHash(tbl.getSchemaHashByIndexId(idx.getId()));
|
||||
tabletCtx.setStorageMedium(tbl.getPartitionInfo().getDataProperty(partition.getId()).getStorageMedium());
|
||||
|
||||
handleTabletByTypeAndStatus(statusPair.first, tabletCtx, batchTask);
|
||||
handleTabletByTypeAndStatus(tabletHealth.status, tabletCtx, batchTask);
|
||||
} finally {
|
||||
tbl.writeUnlock();
|
||||
}
|
||||
@ -1379,6 +1389,24 @@ public class TabletScheduler extends MasterDaemon {
|
||||
// if forColocate is false, the tag must be set.
|
||||
private RootPathLoadStatistic chooseAvailableDestPath(TabletSchedCtx tabletCtx, Tag tag, boolean forColocate)
|
||||
throws SchedException {
|
||||
boolean noPathForNewReplica = false;
|
||||
try {
|
||||
return doChooseAvailableDestPath(tabletCtx, tag, forColocate);
|
||||
} catch (SchedException e) {
|
||||
if (e.getStatus() == Status.UNRECOVERABLE) {
|
||||
noPathForNewReplica = true;
|
||||
}
|
||||
throw e;
|
||||
} finally {
|
||||
Tablet tablet = tabletCtx.getTablet();
|
||||
if (tablet != null) {
|
||||
tablet.setLastTimeNoPathForNewReplica(noPathForNewReplica ? System.currentTimeMillis() : -1L);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private RootPathLoadStatistic doChooseAvailableDestPath(TabletSchedCtx tabletCtx, Tag tag, boolean forColocate)
|
||||
throws SchedException {
|
||||
List<BackendLoadStatistic> beStatistics;
|
||||
if (tag != null) {
|
||||
Preconditions.checkState(!forColocate);
|
||||
@ -1552,13 +1580,11 @@ public class TabletScheduler extends MasterDaemon {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void addBackToPendingTablets(TabletSchedCtx tabletCtx) {
|
||||
Preconditions.checkState(tabletCtx.getState() == TabletSchedCtx.State.PENDING);
|
||||
addTablet(tabletCtx, true /* force */);
|
||||
}
|
||||
|
||||
|
||||
private void finalizeTabletCtx(TabletSchedCtx tabletCtx, TabletSchedCtx.State state, Status status, String reason) {
|
||||
if (state == TabletSchedCtx.State.CANCELLED || state == TabletSchedCtx.State.UNEXPECTED) {
|
||||
if (tabletCtx.getType() == TabletSchedCtx.Type.BALANCE
|
||||
@ -1595,8 +1621,6 @@ public class TabletScheduler extends MasterDaemon {
|
||||
if (tbl == null) {
|
||||
return;
|
||||
}
|
||||
Pair<TabletStatus, TabletSchedCtx.Priority> statusPair;
|
||||
ReplicaAllocation replicaAlloc = null;
|
||||
tbl.readLock();
|
||||
try {
|
||||
Partition partition = tbl.getPartition(tabletCtx.getPartitionId());
|
||||
@ -1614,67 +1638,79 @@ public class TabletScheduler extends MasterDaemon {
|
||||
return;
|
||||
}
|
||||
|
||||
boolean isColocateTable = colocateTableIndex.isColocateTable(tbl.getId());
|
||||
if (isColocateTable) {
|
||||
GroupId groupId = colocateTableIndex.getGroup(tbl.getId());
|
||||
if (groupId == null) {
|
||||
return;
|
||||
}
|
||||
ColocateGroupSchema groupSchema = colocateTableIndex.getGroupSchema(groupId);
|
||||
if (groupSchema == null) {
|
||||
return;
|
||||
}
|
||||
tryAddRepairTablet(tablet, tabletCtx.getDbId(), tbl, partition, idx, finishedCounter);
|
||||
} finally {
|
||||
tbl.readUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
replicaAlloc = groupSchema.getReplicaAlloc();
|
||||
int tabletOrderIdx = tabletCtx.getTabletOrderIdx();
|
||||
if (tabletOrderIdx == -1) {
|
||||
tabletOrderIdx = idx.getTabletOrderIdx(tablet.getId());
|
||||
}
|
||||
Preconditions.checkState(tabletOrderIdx != -1);
|
||||
public void tryAddRepairTablet(Tablet tablet, long dbId, OlapTable table, Partition partition,
|
||||
MaterializedIndex idx, int finishedCounter) {
|
||||
if (Config.disable_tablet_scheduler) {
|
||||
return;
|
||||
}
|
||||
|
||||
Set<Long> backendsSet = colocateTableIndex.getTabletBackendsByGroup(groupId, tabletOrderIdx);
|
||||
TabletStatus st = tablet.getColocateHealthStatus(
|
||||
partition.getVisibleVersion(), replicaAlloc, backendsSet);
|
||||
statusPair = Pair.of(st, Priority.HIGH);
|
||||
tabletCtx.setColocateGroupBackendIds(backendsSet);
|
||||
} else {
|
||||
replicaAlloc = tbl.getPartitionInfo().getReplicaAllocation(partition.getId());
|
||||
List<Long> aliveBeIds = infoService.getAllBackendIds(true);
|
||||
statusPair = tablet.getHealthStatusWithPriority(
|
||||
infoService, partition.getVisibleVersion(), replicaAlloc, aliveBeIds);
|
||||
|
||||
if (statusPair.second.ordinal() < tabletCtx.getPriority().ordinal()) {
|
||||
statusPair.second = tabletCtx.getPriority();
|
||||
}
|
||||
TabletHealth tabletHealth;
|
||||
ReplicaAllocation replicaAlloc;
|
||||
Set<Long> colocateBackendIds = null;
|
||||
boolean isColocateTable = colocateTableIndex.isColocateTable(table.getId());
|
||||
if (isColocateTable) {
|
||||
GroupId groupId = colocateTableIndex.getGroup(table.getId());
|
||||
if (groupId == null) {
|
||||
return;
|
||||
}
|
||||
ColocateGroupSchema groupSchema = colocateTableIndex.getGroupSchema(groupId);
|
||||
if (groupSchema == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (statusPair.first == TabletStatus.NEED_FURTHER_REPAIR) {
|
||||
replicaAlloc = groupSchema.getReplicaAlloc();
|
||||
int tabletOrderIdx = idx.getTabletOrderIdx(tablet.getId());
|
||||
if (tabletOrderIdx == -1) {
|
||||
LOG.warn("Unknow colocate tablet order idx: group {}, table {}, partition {}, index {}, tablet {}",
|
||||
groupId, table.getId(), partition.getId(), idx.getId(), tablet.getId());
|
||||
return;
|
||||
}
|
||||
|
||||
colocateBackendIds = colocateTableIndex.getTabletBackendsByGroup(groupId, tabletOrderIdx);
|
||||
tabletHealth = tablet.getColocateHealth(partition.getVisibleVersion(), replicaAlloc, colocateBackendIds);
|
||||
tabletHealth.priority = Priority.HIGH;
|
||||
} else {
|
||||
replicaAlloc = table.getPartitionInfo().getReplicaAllocation(partition.getId());
|
||||
List<Long> aliveBeIds = infoService.getAllBackendIds(true);
|
||||
tabletHealth = tablet.getHealth(infoService, partition.getVisibleVersion(), replicaAlloc, aliveBeIds);
|
||||
}
|
||||
|
||||
if (tabletHealth.status == TabletStatus.HEALTHY || tabletHealth.status == TabletStatus.UNRECOVERABLE) {
|
||||
return;
|
||||
}
|
||||
|
||||
// first time found this tablet is unhealthy
|
||||
if (finishedCounter == 0) {
|
||||
if (!tablet.readyToBeRepaired(Env.getCurrentSystemInfo(), tabletHealth.priority)) {
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
if (tabletHealth.status == TabletStatus.NEED_FURTHER_REPAIR) {
|
||||
// replica is just waiting for finishing txns before furtherRepairWatermarkTxnTd,
|
||||
// no need to add it immediately
|
||||
Replica replica = tablet.getReplicaByBackendId(tabletCtx.getDestBackendId());
|
||||
// no need to re add it immediately, can wait a little
|
||||
Replica replica = tablet.getReplicaById(tabletHealth.needFurtherRepairReplicaId);
|
||||
if (replica != null && replica.getVersion() >= partition.getVisibleVersion()
|
||||
&& replica.getLastFailedVersion() < 0) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
tbl.readUnlock();
|
||||
}
|
||||
|
||||
if (statusPair.first == TabletStatus.HEALTHY) {
|
||||
return;
|
||||
}
|
||||
TabletSchedCtx tabletCtx = new TabletSchedCtx(TabletSchedCtx.Type.REPAIR, dbId, table.getId(),
|
||||
partition.getId(), idx.getId(), tablet.getId(), replicaAlloc, System.currentTimeMillis());
|
||||
|
||||
TabletSchedCtx newTabletCtx = new TabletSchedCtx(
|
||||
TabletSchedCtx.Type.REPAIR, tabletCtx.getDbId(), tabletCtx.getTblId(),
|
||||
tabletCtx.getPartitionId(), tabletCtx.getIndexId(), tabletCtx.getTabletId(),
|
||||
replicaAlloc, System.currentTimeMillis());
|
||||
tabletCtx.setTabletHealth(tabletHealth);
|
||||
tabletCtx.setFinishedCounter(finishedCounter);
|
||||
tabletCtx.setColocateGroupBackendIds(colocateBackendIds);
|
||||
tabletCtx.setIsUniqKeyMergeOnWrite(table.isUniqKeyMergeOnWrite());
|
||||
|
||||
newTabletCtx.setTabletStatus(statusPair.first);
|
||||
newTabletCtx.setPriority(statusPair.second);
|
||||
newTabletCtx.setFinishedCounter(finishedCounter);
|
||||
|
||||
addTablet(newTabletCtx, false);
|
||||
addTablet(tabletCtx, false);
|
||||
}
|
||||
|
||||
private void releaseTabletCtx(TabletSchedCtx tabletCtx, TabletSchedCtx.State state, boolean resetReplicaState) {
|
||||
|
||||
@ -29,10 +29,8 @@ import org.apache.doris.catalog.ReplicaAllocation;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.catalog.Tablet;
|
||||
import org.apache.doris.clone.TabletSchedCtx;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
import org.apache.doris.task.AgentTask;
|
||||
import org.apache.doris.task.AgentTaskQueue;
|
||||
@ -181,6 +179,7 @@ public class TabletHealthProcDir implements ProcDirInterface {
|
||||
olapTable.readLock();
|
||||
try {
|
||||
for (Partition partition : olapTable.getAllPartitions()) {
|
||||
long visibleVersion = partition.getVisibleVersion();
|
||||
ReplicaAllocation replicaAlloc = olapTable.getPartitionInfo()
|
||||
.getReplicaAllocation(partition.getId());
|
||||
for (MaterializedIndex materializedIndex : partition.getMaterializedIndices(
|
||||
@ -196,13 +195,10 @@ public class TabletHealthProcDir implements ProcDirInterface {
|
||||
replicaAlloc = groupSchema.getReplicaAlloc();
|
||||
}
|
||||
Set<Long> backendsSet = colocateTableIndex.getTabletBackendsByGroup(groupId, i);
|
||||
res = tablet.getColocateHealthStatus(partition.getVisibleVersion(), replicaAlloc,
|
||||
backendsSet);
|
||||
res = tablet.getColocateHealth(visibleVersion, replicaAlloc, backendsSet).status;
|
||||
} else {
|
||||
Pair<Tablet.TabletStatus, TabletSchedCtx.Priority> pair
|
||||
= tablet.getHealthStatusWithPriority(infoService,
|
||||
partition.getVisibleVersion(), replicaAlloc, aliveBeIds);
|
||||
res = pair.first;
|
||||
res = tablet.getHealth(infoService, visibleVersion, replicaAlloc,
|
||||
aliveBeIds).status;
|
||||
}
|
||||
switch (res) { // CHECKSTYLE IGNORE THIS LINE: missing switch default
|
||||
case HEALTHY:
|
||||
|
||||
@ -35,7 +35,8 @@ import java.util.List;
|
||||
*/
|
||||
public class TabletSchedulerDetailProcDir implements ProcDirInterface {
|
||||
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>().add("TabletId")
|
||||
.add("Type").add("Medium").add("Status").add("State").add("SchedCode").add("Priority").add("SrcBe")
|
||||
.add("Type").add("Medium").add("Status").add("State").add("SchedCode")
|
||||
.add("Priority").add("RealPriorityVal").add("SrcBe")
|
||||
.add("SrcPath").add("DestBe").add("DestPath").add("Timeout").add("Create").add("LstSched").add("LstVisit")
|
||||
.add("Finished").add("ReplicaSize").add("Rate").add("FailedSched").add("FailedRunning").add("VisibleVer")
|
||||
.add("CmtVer").add("ErrMsg")
|
||||
|
||||
@ -38,7 +38,6 @@ import org.apache.doris.catalog.Tablet;
|
||||
import org.apache.doris.catalog.Tablet.TabletStatus;
|
||||
import org.apache.doris.catalog.TabletInvertedIndex;
|
||||
import org.apache.doris.catalog.TabletMeta;
|
||||
import org.apache.doris.clone.TabletSchedCtx;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.common.Pair;
|
||||
@ -1252,8 +1251,7 @@ public class ReportHandler extends Daemon {
|
||||
if (groupSchema != null) {
|
||||
replicaAlloc = groupSchema.getReplicaAlloc();
|
||||
}
|
||||
TabletStatus status =
|
||||
tablet.getColocateHealthStatus(visibleVersion, replicaAlloc, backendsSet);
|
||||
TabletStatus status = tablet.getColocateHealth(visibleVersion, replicaAlloc, backendsSet).status;
|
||||
if (status == TabletStatus.HEALTHY) {
|
||||
return false;
|
||||
}
|
||||
@ -1265,8 +1263,7 @@ public class ReportHandler extends Daemon {
|
||||
|
||||
SystemInfoService infoService = Env.getCurrentSystemInfo();
|
||||
List<Long> aliveBeIds = infoService.getAllBackendIds(true);
|
||||
Pair<TabletStatus, TabletSchedCtx.Priority> status = tablet.getHealthStatusWithPriority(infoService,
|
||||
visibleVersion, replicaAlloc, aliveBeIds);
|
||||
TabletStatus status = tablet.getHealth(infoService, visibleVersion, replicaAlloc, aliveBeIds).status;
|
||||
|
||||
// FORCE_REDUNDANT is a specific missing case.
|
||||
// So it can add replica when it's in FORCE_REDUNDANT.
|
||||
@ -1275,16 +1272,16 @@ public class ReportHandler extends Daemon {
|
||||
// it's safe to add this replica.
|
||||
// Because if the tablet scheduler want to delete a replica, it will choose the sched
|
||||
// unavailable replica and avoid the repeating loop as above.
|
||||
boolean canAddForceRedundant = status.first == TabletStatus.FORCE_REDUNDANT
|
||||
boolean canAddForceRedundant = status == TabletStatus.FORCE_REDUNDANT
|
||||
&& infoService.checkBackendScheduleAvailable(backendId)
|
||||
&& tablet.getReplicas().stream().anyMatch(
|
||||
r -> !infoService.checkBackendScheduleAvailable(r.getBackendId()));
|
||||
|
||||
if (isColocateBackend
|
||||
|| canAddForceRedundant
|
||||
|| status.first == TabletStatus.VERSION_INCOMPLETE
|
||||
|| status.first == TabletStatus.REPLICA_MISSING
|
||||
|| status.first == TabletStatus.UNRECOVERABLE) {
|
||||
|| status == TabletStatus.VERSION_INCOMPLETE
|
||||
|| status == TabletStatus.REPLICA_MISSING
|
||||
|| status == TabletStatus.UNRECOVERABLE) {
|
||||
long lastFailedVersion = -1L;
|
||||
|
||||
// For some partition created by old version's Doris
|
||||
@ -1360,7 +1357,7 @@ public class ReportHandler extends Daemon {
|
||||
|
||||
LOG.info("add replica[{}-{}] to catalog. backend[{}], tablet status {}, tablet size {}, "
|
||||
+ "is colocate backend {}",
|
||||
tabletId, replicaId, backendId, status.first.name(), tablet.getReplicas().size(),
|
||||
tabletId, replicaId, backendId, status.name(), tablet.getReplicas().size(),
|
||||
isColocateBackend);
|
||||
return true;
|
||||
} else {
|
||||
@ -1374,7 +1371,7 @@ public class ReportHandler extends Daemon {
|
||||
}
|
||||
LOG.warn("no add replica [{}-{}] cause it is enough[{}-{}], tablet status {}",
|
||||
tabletId, replicaId, tablet.getReplicas().size(), replicaAlloc.toCreateStmt(),
|
||||
status.first.name());
|
||||
status.name());
|
||||
return false;
|
||||
}
|
||||
} finally {
|
||||
|
||||
@ -177,7 +177,7 @@ public class OlapInsertExecutor extends AbstractInsertExecutor {
|
||||
database.getId(), olapTableSink.getDstTable(), analyzer));
|
||||
dataStreamSink.setTabletSinkTupleDesc(olapTableSink.getTupleDescriptor());
|
||||
List<TOlapTableLocationParam> locationParams = olapTableSink
|
||||
.createLocation(olapTableSink.getDstTable());
|
||||
.createLocation(database.getId(), olapTableSink.getDstTable());
|
||||
dataStreamSink.setTabletSinkLocationParam(locationParams.get(0));
|
||||
dataStreamSink.setTabletSinkTxnId(olapTableSink.getTxnId());
|
||||
}
|
||||
|
||||
@ -213,7 +213,7 @@ public class OlapTableSink extends DataSink {
|
||||
tSink.setNeedGenRollup(dstTable.shouldLoadToNewRollup());
|
||||
tSink.setSchema(createSchema(tSink.getDbId(), dstTable, analyzer));
|
||||
tSink.setPartition(createPartition(tSink.getDbId(), dstTable, analyzer));
|
||||
List<TOlapTableLocationParam> locationParams = createLocation(dstTable);
|
||||
List<TOlapTableLocationParam> locationParams = createLocation(tSink.getDbId(), dstTable);
|
||||
tSink.setLocation(locationParams.get(0));
|
||||
if (singleReplicaLoad) {
|
||||
tSink.setSlaveLocation(locationParams.get(1));
|
||||
@ -604,7 +604,7 @@ public class OlapTableSink extends DataSink {
|
||||
return Arrays.asList(locationParam, slaveLocationParam);
|
||||
}
|
||||
|
||||
public List<TOlapTableLocationParam> createLocation(OlapTable table) throws UserException {
|
||||
public List<TOlapTableLocationParam> createLocation(long dbId, OlapTable table) throws UserException {
|
||||
if (table.getPartitionInfo().enableAutomaticPartition() && partitionIds.isEmpty()) {
|
||||
return createDummyLocation(table);
|
||||
}
|
||||
@ -622,6 +622,13 @@ public class OlapTableSink extends DataSink {
|
||||
for (Tablet tablet : index.getTablets()) {
|
||||
Multimap<Long, Long> bePathsMap = tablet.getNormalReplicaBackendPathMap();
|
||||
if (bePathsMap.keySet().size() < loadRequiredReplicaNum) {
|
||||
long now = System.currentTimeMillis();
|
||||
long lastLoadFailedTime = tablet.getLastLoadFailedTime();
|
||||
tablet.setLastLoadFailedTime(now);
|
||||
if (now - lastLoadFailedTime >= 5000L) {
|
||||
Env.getCurrentEnv().getTabletScheduler().tryAddRepairTablet(
|
||||
tablet, dbId, table, partition, index, 0);
|
||||
}
|
||||
throw new UserException(InternalErrorCode.REPLICA_FEW_ERR,
|
||||
"tablet " + tablet.getId() + " alive replica num " + bePathsMap.keySet().size()
|
||||
+ " < load required replica num " + loadRequiredReplicaNum
|
||||
|
||||
@ -621,6 +621,14 @@ public class DatabaseTransactionMgr {
|
||||
|
||||
int successReplicaNum = tabletSuccReplicas.size();
|
||||
if (successReplicaNum < loadRequiredReplicaNum) {
|
||||
long now = System.currentTimeMillis();
|
||||
long lastLoadFailedTime = tablet.getLastLoadFailedTime();
|
||||
tablet.setLastLoadFailedTime(now);
|
||||
if (now - lastLoadFailedTime >= 5000L) {
|
||||
Env.getCurrentEnv().getTabletScheduler().tryAddRepairTablet(
|
||||
tablet, db.getId(), table, partition, index, 0);
|
||||
}
|
||||
|
||||
String writeDetail = getTabletWriteDetail(tabletSuccReplicas, tabletWriteFailedReplicas,
|
||||
tabletVersionFailedReplicas);
|
||||
|
||||
@ -1271,6 +1279,7 @@ public class DatabaseTransactionMgr {
|
||||
transactionState, tablet.getId(), newVersion, loadRequiredReplicaNum, tableId,
|
||||
partitionId, partition.getCommittedVersion(), writeDetail));
|
||||
}
|
||||
tablet.setLastLoadFailedTime(-1L);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user