Fix bug where a repair slot may not be released when a clone finishes (#589)

This commit is contained in:
Mingyu Chen
2019-01-25 16:49:15 +08:00
committed by GitHub
parent 9a272f0592
commit 9d71a930a2
4 changed files with 188 additions and 184 deletions

View File

@ -668,7 +668,7 @@ OLAPStatus OLAPEngine::storage_medium_migrate(
tablet->obtain_header_rdlock();
if (tablet->has_pending_data()) {
tablet->release_header_lock();
res = OLAP_ERR_HEADER_HAS_PENDING_DATA
res = OLAP_ERR_HEADER_HAS_PENDING_DATA;
OLAP_LOG_WARNING("could not migration because has pending data [tablet='%s' ]",
tablet->full_name().c_str());
break;

View File

@ -175,9 +175,9 @@ public class LoadBalancer {
* 2. Select a low load backend as destination. The tablet must not already have a replica on this backend.
* 3. Create a clone task.
*/
public void createBalanceTask(TabletSchedCtx tabletInfo, Map<Long, PathSlot> backendsWorkingSlots,
public void createBalanceTask(TabletSchedCtx tabletCtx, Map<Long, PathSlot> backendsWorkingSlots,
AgentBatchTask batchTask) throws SchedException {
ClusterLoadStatistic clusterStat = statisticMap.get(tabletInfo.getCluster());
ClusterLoadStatistic clusterStat = statisticMap.get(tabletCtx.getCluster());
if (clusterStat == null) {
throw new SchedException(Status.UNRECOVERABLE, "cluster does not exist");
}
@ -197,7 +197,7 @@ public class LoadBalancer {
throw new SchedException(Status.UNRECOVERABLE, "all low load backends is unavailable");
}
List<Replica> replicas = tabletInfo.getReplicas();
List<Replica> replicas = tabletCtx.getReplicas();
// Check if this tablet has replica on high load backend.
boolean hasHighReplica = false;
@ -222,7 +222,7 @@ public class LoadBalancer {
if (pathHash == -1) {
continue;
} else {
tabletInfo.setSrc(replica);
tabletCtx.setSrc(replica);
setSource = true;
break;
}
@ -238,13 +238,13 @@ public class LoadBalancer {
// no replica on this low load backend
// 1. check if this clone task can make the cluster more balanced.
List<RootPathLoadStatistic> availPaths = Lists.newArrayList();
if (beStat.isFit(tabletInfo.getTabletSize(), availPaths,
if (beStat.isFit(tabletCtx.getTabletSize(), availPaths,
false /* not supplement */) != BalanceStatus.OK) {
continue;
}
if (!clusterStat.isMoreBalanced(tabletInfo.getSrcBackendId(), beStat.getBeId(),
tabletInfo.getTabletId(), tabletInfo.getTabletSize())) {
if (!clusterStat.isMoreBalanced(tabletCtx.getSrcBackendId(), beStat.getBeId(),
tabletCtx.getTabletId(), tabletCtx.getTabletSize())) {
continue;
}
@ -265,7 +265,7 @@ public class LoadBalancer {
if (pathHash == -1) {
continue;
} else {
tabletInfo.setDestination(beStat.getBeId(), pathHash);
tabletCtx.setDest(beStat.getBeId(), pathHash);
setDest = true;
break;
}
@ -277,6 +277,6 @@ public class LoadBalancer {
}
// create clone task
batchTask.addTask(tabletInfo.createCloneReplicaAndTask());
batchTask.addTask(tabletCtx.createCloneReplicaAndTask());
}
}

View File

@ -341,8 +341,8 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
this.committedVersionHash = committedVersionHash;
}
public void setDestination(Long destBe, long destPathHash) {
this.destBackendId = destBe;
public void setDest(Long destBeId, long destPathHash) {
this.destBackendId = destBeId;
this.destPathHash = destPathHash;
}
@ -447,8 +447,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
long srcPathHash = slot.takeSlot(srcReplica.getPathHash());
if (srcPathHash != -1) {
this.srcReplica = srcReplica;
this.srcPathHash = srcPathHash;
setSrc(srcReplica);
return;
}
}
@ -515,8 +514,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
throw new SchedException(Status.SCHEDULE_FAILED, "unable to take slot of dest path");
}
this.destBackendId = chosenReplica.getBackendId();
this.destPathHash = chosenReplica.getPathHash();
setDest(chosenReplica.getBackendId(), chosenReplica.getPathHash());
}
/*
@ -639,7 +637,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
*/
public void finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request)
throws SchedException {
Preconditions.checkState(state == State.RUNNING);
Preconditions.checkState(state == State.RUNNING, state);
Preconditions.checkArgument(cloneTask.getTaskVersion() == CloneTask.VERSION_2);
setLastVisitedTime(System.currentTimeMillis());
@ -854,7 +852,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
return false;
}
Preconditions.checkState(lastSchedTime != 0 && taskTimeoutMs != 0);
Preconditions.checkState(lastSchedTime != 0 && taskTimeoutMs != 0, lastSchedTime + "-" + taskTimeoutMs);
return System.currentTimeMillis() - lastSchedTime > taskTimeoutMs;
}

View File

@ -108,7 +108,7 @@ public class TabletScheduler extends Daemon {
*/
private PriorityQueue<TabletSchedCtx> pendingTablets = new PriorityQueue<>();
private Set<Long> allTabletIds = Sets.newHashSet();
// contains all tabletInfos which state are RUNNING
// contains all tabletCtxs which state are RUNNING
private Map<Long, TabletSchedCtx> runningTablets = Maps.newHashMap();
// keeps the info of the latest 1000 scheduled tablets
private Queue<TabletSchedCtx> schedHistory = EvictingQueue.create(1000);
@ -226,12 +226,12 @@ public class TabletScheduler extends Daemon {
*/
public synchronized void changePriorityOfTablets(long dbId, long tblId, List<Long> partitionIds) {
PriorityQueue<TabletSchedCtx> newPendingTablets = new PriorityQueue<>();
for (TabletSchedCtx tabletInfo : pendingTablets) {
if (tabletInfo.getDbId() == dbId && tabletInfo.getTblId() == tblId
&& partitionIds.contains(tabletInfo.getPartitionId())) {
tabletInfo.setOrigPriority(Priority.VERY_HIGH);
for (TabletSchedCtx tabletCtx : pendingTablets) {
if (tabletCtx.getDbId() == dbId && tabletCtx.getTblId() == tblId
&& partitionIds.contains(tabletCtx.getPartitionId())) {
tabletCtx.setOrigPriority(Priority.VERY_HIGH);
}
newPendingTablets.add(tabletInfo);
newPendingTablets.add(tabletCtx);
}
pendingTablets = newPendingTablets;
}
@ -309,17 +309,17 @@ public class TabletScheduler extends Daemon {
private synchronized void adjustPriorities() {
int size = pendingTablets.size();
int changedNum = 0;
TabletSchedCtx tabletInfo = null;
TabletSchedCtx tabletCtx = null;
for (int i = 0; i < size; i++) {
tabletInfo = pendingTablets.poll();
if (tabletInfo == null) {
tabletCtx = pendingTablets.poll();
if (tabletCtx == null) {
break;
}
if (tabletInfo.adjustPriority(stat)) {
if (tabletCtx.adjustPriority(stat)) {
changedNum++;
}
pendingTablets.add(tabletInfo);
pendingTablets.add(tabletCtx);
}
LOG.info("adjust priority for all tablets. changed: {}, total: {}", changedNum, size);
@ -336,45 +336,45 @@ public class TabletScheduler extends Daemon {
*/
private void schedulePendingTablets() {
long start = System.currentTimeMillis();
List<TabletSchedCtx> currentBatch = getNextTabletInfoBatch();
List<TabletSchedCtx> currentBatch = getNextTabletCtxBatch();
LOG.debug("get {} tablets to schedule", currentBatch.size());
AgentBatchTask batchTask = new AgentBatchTask();
for (TabletSchedCtx tabletInfo : currentBatch) {
for (TabletSchedCtx tabletCtx : currentBatch) {
try {
scheduleTablet(tabletInfo, batchTask);
scheduleTablet(tabletCtx, batchTask);
} catch (SchedException e) {
tabletInfo.increaseFailedSchedCounter();
tabletInfo.setErrMsg(e.getMessage());
tabletCtx.increaseFailedSchedCounter();
tabletCtx.setErrMsg(e.getMessage());
if (e.getStatus() == Status.SCHEDULE_FAILED) {
// if balance is disabled, remove this tablet
if (tabletInfo.getType() == Type.BALANCE && Config.disable_balance) {
removeTabletInfo(tabletInfo, TabletSchedCtx.State.CANCELLED,
if (tabletCtx.getType() == Type.BALANCE && Config.disable_balance) {
removeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED,
"disable balance and " + e.getMessage());
} else {
// we must release the resources it currently holds so that it can be scheduled again
tabletInfo.releaseResource(this);
tabletCtx.releaseResource(this);
// adjust priority so that higher priority tablets are not always the first in pendingTablets
stat.counterTabletScheduledFailed.incrementAndGet();
dynamicAdjustPrioAndAddBackToPendingTablets(tabletInfo, e.getMessage());
dynamicAdjustPrioAndAddBackToPendingTablets(tabletCtx, e.getMessage());
}
} else if (e.getStatus() == Status.FINISHED) {
// schedule redundant tablet will throw this exception
stat.counterTabletScheduledSucceeded.incrementAndGet();
removeTabletInfo(tabletInfo, TabletSchedCtx.State.FINISHED, e.getMessage());
removeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED, e.getMessage());
} else {
Preconditions.checkState(e.getStatus() == Status.UNRECOVERABLE, e.getStatus());
// discard
stat.counterTabletScheduledDiscard.incrementAndGet();
removeTabletInfo(tabletInfo, TabletSchedCtx.State.CANCELLED, e.getMessage());
removeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getMessage());
}
continue;
}
Preconditions.checkState(tabletInfo.getState() == TabletSchedCtx.State.RUNNING);
Preconditions.checkState(tabletCtx.getState() == TabletSchedCtx.State.RUNNING);
stat.counterTabletScheduledSucceeded.incrementAndGet();
addToRunningTablets(tabletInfo);
addToRunningTablets(tabletCtx);
}
// must send task after adding tablet info to runningTablets.
@ -392,8 +392,8 @@ public class TabletScheduler extends Daemon {
stat.counterTabletScheduleCostMs.addAndGet(cost);
}
private synchronized void addToRunningTablets(TabletSchedCtx tabletInfo) {
runningTablets.put(tabletInfo.getTabletId(), tabletInfo);
private synchronized void addToRunningTablets(TabletSchedCtx tabletCtx) {
runningTablets.put(tabletCtx.getTabletId(), tabletCtx);
}
/*
@ -408,15 +408,15 @@ public class TabletScheduler extends Daemon {
/*
* Try to schedule a single tablet.
*/
private void scheduleTablet(TabletSchedCtx tabletInfo, AgentBatchTask batchTask) throws SchedException {
LOG.debug("schedule tablet: {}", tabletInfo.getTabletId());
private void scheduleTablet(TabletSchedCtx tabletCtx, AgentBatchTask batchTask) throws SchedException {
LOG.debug("schedule tablet: {}", tabletCtx.getTabletId());
long currentTime = System.currentTimeMillis();
tabletInfo.setLastSchedTime(currentTime);
tabletInfo.setLastVisitedTime(currentTime);
tabletCtx.setLastSchedTime(currentTime);
tabletCtx.setLastVisitedTime(currentTime);
stat.counterTabletScheduled.incrementAndGet();
// check this tablet again
Database db = catalog.getDb(tabletInfo.getDbId());
Database db = catalog.getDb(tabletCtx.getDbId());
if (db == null) {
throw new SchedException(Status.UNRECOVERABLE, "db does not exist");
}
@ -424,33 +424,33 @@ public class TabletScheduler extends Daemon {
Pair<TabletStatus, TabletSchedCtx.Priority> statusPair = null;
db.writeLock();
try {
OlapTable tbl = (OlapTable) db.getTable(tabletInfo.getTblId());
OlapTable tbl = (OlapTable) db.getTable(tabletCtx.getTblId());
if (tbl == null) {
throw new SchedException(Status.UNRECOVERABLE, "tbl does not exist");
}
OlapTableState tableState = tbl.getState();
Partition partition = tbl.getPartition(tabletInfo.getPartitionId());
Partition partition = tbl.getPartition(tabletCtx.getPartitionId());
if (partition == null) {
throw new SchedException(Status.UNRECOVERABLE, "partition does not exist");
}
MaterializedIndex idx = partition.getIndex(tabletInfo.getIndexId());
MaterializedIndex idx = partition.getIndex(tabletCtx.getIndexId());
if (idx == null) {
throw new SchedException(Status.UNRECOVERABLE, "index does not exist");
}
Tablet tablet = idx.getTablet(tabletInfo.getTabletId());
Tablet tablet = idx.getTablet(tabletCtx.getTabletId());
Preconditions.checkNotNull(tablet);
statusPair = tablet.getHealthStatusWithPriority(
infoService, tabletInfo.getCluster(),
infoService, tabletCtx.getCluster(),
partition.getVisibleVersion(),
partition.getVisibleVersionHash(),
tbl.getPartitionInfo().getReplicationNum(partition.getId()));
if (tabletInfo.getType() == TabletSchedCtx.Type.BALANCE && tableState != OlapTableState.NORMAL) {
if (tabletCtx.getType() == TabletSchedCtx.Type.BALANCE && tableState != OlapTableState.NORMAL) {
// If table is under ALTER process, do not allow to do balance.
throw new SchedException(Status.UNRECOVERABLE, "table's state is not NORMAL");
}
@ -462,54 +462,56 @@ public class TabletScheduler extends Daemon {
"table's state is not NORMAL but tablet status is " + statusPair.first.name());
}
tabletInfo.setTabletStatus(statusPair.first);
if (statusPair.first == TabletStatus.HEALTHY && tabletInfo.getType() == TabletSchedCtx.Type.REPAIR) {
tabletCtx.setTabletStatus(statusPair.first);
if (statusPair.first == TabletStatus.HEALTHY && tabletCtx.getType() == TabletSchedCtx.Type.REPAIR) {
throw new SchedException(Status.UNRECOVERABLE, "tablet is healthy");
} else if (statusPair.first != TabletStatus.HEALTHY
&& tabletInfo.getType() == TabletSchedCtx.Type.BALANCE) {
tabletInfo.releaseResource(this);
&& tabletCtx.getType() == TabletSchedCtx.Type.BALANCE) {
tabletCtx.releaseResource(this);
// we select an unhealthy tablet to do balance, which is not right.
// so here we change it to a REPAIR task, and also reset its priority
tabletInfo.setType(TabletSchedCtx.Type.REPAIR);
tabletInfo.setOrigPriority(statusPair.second);
tabletCtx.setType(TabletSchedCtx.Type.REPAIR);
tabletCtx.setOrigPriority(statusPair.second);
tabletCtx.setLastSchedTime(currentTime);
tabletCtx.setLastVisitedTime(currentTime);
}
// we do not concern priority here.
// once we take the tablet out of priority queue, priority is meaningless.
tabletInfo.setTablet(tablet);
tabletInfo.setVersionInfo(partition.getVisibleVersion(), partition.getVisibleVersionHash(),
tabletCtx.setTablet(tablet);
tabletCtx.setVersionInfo(partition.getVisibleVersion(), partition.getVisibleVersionHash(),
partition.getCommittedVersion(), partition.getCommittedVersionHash());
tabletInfo.setSchemaHash(tbl.getSchemaHashByIndexId(idx.getId()));
tabletInfo.setStorageMedium(tbl.getPartitionInfo().getDataProperty(partition.getId()).getStorageMedium());
tabletCtx.setSchemaHash(tbl.getSchemaHashByIndexId(idx.getId()));
tabletCtx.setStorageMedium(tbl.getPartitionInfo().getDataProperty(partition.getId()).getStorageMedium());
handleTabletByTypeAndStatus(statusPair.first, tabletInfo, batchTask);
handleTabletByTypeAndStatus(statusPair.first, tabletCtx, batchTask);
} finally {
db.writeUnlock();
}
}
private void handleTabletByTypeAndStatus(TabletStatus status, TabletSchedCtx tabletInfo, AgentBatchTask batchTask)
private void handleTabletByTypeAndStatus(TabletStatus status, TabletSchedCtx tabletCtx, AgentBatchTask batchTask)
throws SchedException {
if (tabletInfo.getType() == Type.REPAIR) {
if (tabletCtx.getType() == Type.REPAIR) {
switch (status) {
case REPLICA_MISSING:
handleReplicaMissing(tabletInfo, batchTask);
handleReplicaMissing(tabletCtx, batchTask);
break;
case VERSION_INCOMPLETE:
handleReplicaVersionIncomplete(tabletInfo, batchTask);
handleReplicaVersionIncomplete(tabletCtx, batchTask);
break;
case REDUNDANT:
handleRedundantReplica(tabletInfo);
handleRedundantReplica(tabletCtx);
break;
case REPLICA_MISSING_IN_CLUSTER:
handleReplicaClusterMigration(tabletInfo, batchTask);
handleReplicaClusterMigration(tabletCtx, batchTask);
break;
default:
break;
}
} else {
// balance
doBalance(tabletInfo, batchTask);
doBalance(tabletCtx, batchTask);
}
}
@ -528,18 +530,18 @@ public class TabletScheduler extends Daemon {
*
* 3. send clone task to destination backend
*/
private void handleReplicaMissing(TabletSchedCtx tabletInfo, AgentBatchTask batchTask) throws SchedException {
private void handleReplicaMissing(TabletSchedCtx tabletCtx, AgentBatchTask batchTask) throws SchedException {
stat.counterReplicaMissingErr.incrementAndGet();
// find an available dest backend and path
RootPathLoadStatistic destPath = chooseAvailableDestPath(tabletInfo);
RootPathLoadStatistic destPath = chooseAvailableDestPath(tabletCtx);
Preconditions.checkNotNull(destPath);
tabletInfo.setDestination(destPath.getBeId(), destPath.getPathHash());
tabletCtx.setDest(destPath.getBeId(), destPath.getPathHash());
// choose a source replica for cloning from
tabletInfo.chooseSrcReplica(backendsWorkingSlots);
tabletCtx.chooseSrcReplica(backendsWorkingSlots);
// create clone task
batchTask.addTask(tabletInfo.createCloneReplicaAndTask());
batchTask.addTask(tabletCtx.createCloneReplicaAndTask());
}
/*
@ -550,19 +552,19 @@ public class TabletScheduler extends Daemon {
* 2. find a healthy replica as source replica
* 3. send clone task
*/
private void handleReplicaVersionIncomplete(TabletSchedCtx tabletInfo, AgentBatchTask batchTask)
private void handleReplicaVersionIncomplete(TabletSchedCtx tabletCtx, AgentBatchTask batchTask)
throws SchedException {
stat.counterReplicaVersionMissingErr.incrementAndGet();
ClusterLoadStatistic statistic = statisticMap.get(tabletInfo.getCluster());
ClusterLoadStatistic statistic = statisticMap.get(tabletCtx.getCluster());
if (statistic == null) {
throw new SchedException(Status.UNRECOVERABLE, "cluster does not exist");
}
tabletInfo.chooseDestReplicaForVersionIncomplete(backendsWorkingSlots);
tabletInfo.chooseSrcReplicaForVersionIncomplete(backendsWorkingSlots);
tabletCtx.chooseDestReplicaForVersionIncomplete(backendsWorkingSlots);
tabletCtx.chooseSrcReplicaForVersionIncomplete(backendsWorkingSlots);
// create clone task
batchTask.addTask(tabletInfo.createCloneReplicaAndTask());
batchTask.addTask(tabletCtx.createCloneReplicaAndTask());
}
/*
@ -577,15 +579,15 @@ public class TabletScheduler extends Daemon {
* 6. replica not in right cluster
* 7. replica in higher load backend
*/
private void handleRedundantReplica(TabletSchedCtx tabletInfo) throws SchedException {
private void handleRedundantReplica(TabletSchedCtx tabletCtx) throws SchedException {
stat.counterReplicaRedundantErr.incrementAndGet();
if (deleteBackendDropped(tabletInfo)
|| deleteBackendUnavailable(tabletInfo)
|| deleteCloneReplica(tabletInfo)
|| deleteReplicaWithFailedVersion(tabletInfo)
|| deleteReplicaWithLowerVersion(tabletInfo)
|| deleteReplicaNotInCluster(tabletInfo)
|| deleteReplicaOnHighLoadBackend(tabletInfo)) {
if (deleteBackendDropped(tabletCtx)
|| deleteBackendUnavailable(tabletCtx)
|| deleteCloneReplica(tabletCtx)
|| deleteReplicaWithFailedVersion(tabletCtx)
|| deleteReplicaWithLowerVersion(tabletCtx)
|| deleteReplicaNotInCluster(tabletCtx)
|| deleteReplicaOnHighLoadBackend(tabletCtx)) {
// if we delete at least one redundant replica, we still throw a SchedException with status FINISHED
// to remove this tablet from the pendingTablets (consider it as finished)
throw new SchedException(Status.FINISHED, "redundant replica is deleted");
@ -593,86 +595,86 @@ public class TabletScheduler extends Daemon {
throw new SchedException(Status.SCHEDULE_FAILED, "unable to delete any redundant replicas");
}
private boolean deleteBackendDropped(TabletSchedCtx tabletInfo) {
for (Replica replica : tabletInfo.getReplicas()) {
private boolean deleteBackendDropped(TabletSchedCtx tabletCtx) {
for (Replica replica : tabletCtx.getReplicas()) {
long beId = replica.getBackendId();
if (infoService.getBackend(beId) == null) {
deleteReplicaInternal(tabletInfo, replica, "backend dropped");
deleteReplicaInternal(tabletCtx, replica, "backend dropped");
return true;
}
}
return false;
}
private boolean deleteBackendUnavailable(TabletSchedCtx tabletInfo) {
for (Replica replica : tabletInfo.getReplicas()) {
private boolean deleteBackendUnavailable(TabletSchedCtx tabletCtx) {
for (Replica replica : tabletCtx.getReplicas()) {
Backend be = infoService.getBackend(replica.getBackendId());
if (be == null) {
// this case should be handled in deleteBackendDropped()
continue;
}
if (!be.isAvailable()) {
deleteReplicaInternal(tabletInfo, replica, "backend unavailable");
deleteReplicaInternal(tabletCtx, replica, "backend unavailable");
return true;
}
}
return false;
}
private boolean deleteCloneReplica(TabletSchedCtx tabletInfo) {
for (Replica replica : tabletInfo.getReplicas()) {
private boolean deleteCloneReplica(TabletSchedCtx tabletCtx) {
for (Replica replica : tabletCtx.getReplicas()) {
if (replica.getState() == ReplicaState.CLONE) {
deleteReplicaInternal(tabletInfo, replica, "clone state");
deleteReplicaInternal(tabletCtx, replica, "clone state");
return true;
}
}
return false;
}
private boolean deleteReplicaWithFailedVersion(TabletSchedCtx tabletInfo) {
for (Replica replica : tabletInfo.getReplicas()) {
private boolean deleteReplicaWithFailedVersion(TabletSchedCtx tabletCtx) {
for (Replica replica : tabletCtx.getReplicas()) {
if (replica.getLastFailedVersion() > 0) {
deleteReplicaInternal(tabletInfo, replica, "version incomplete");
deleteReplicaInternal(tabletCtx, replica, "version incomplete");
return true;
}
}
return false;
}
private boolean deleteReplicaWithLowerVersion(TabletSchedCtx tabletInfo) {
for (Replica replica : tabletInfo.getReplicas()) {
if (!replica.checkVersionCatchUp(tabletInfo.getCommittedVersion(), tabletInfo.getCommittedVersionHash())) {
deleteReplicaInternal(tabletInfo, replica, "lower version");
private boolean deleteReplicaWithLowerVersion(TabletSchedCtx tabletCtx) {
for (Replica replica : tabletCtx.getReplicas()) {
if (!replica.checkVersionCatchUp(tabletCtx.getCommittedVersion(), tabletCtx.getCommittedVersionHash())) {
deleteReplicaInternal(tabletCtx, replica, "lower version");
return true;
}
}
return false;
}
private boolean deleteReplicaNotInCluster(TabletSchedCtx tabletInfo) {
for (Replica replica : tabletInfo.getReplicas()) {
private boolean deleteReplicaNotInCluster(TabletSchedCtx tabletCtx) {
for (Replica replica : tabletCtx.getReplicas()) {
Backend be = infoService.getBackend(replica.getBackendId());
if (be == null) {
// this case should be handled in deleteBackendDropped()
continue;
}
if (!be.getOwnerClusterName().equals(tabletInfo.getCluster())) {
deleteReplicaInternal(tabletInfo, replica, "not in cluster");
if (!be.getOwnerClusterName().equals(tabletCtx.getCluster())) {
deleteReplicaInternal(tabletCtx, replica, "not in cluster");
return true;
}
}
return false;
}
private boolean deleteReplicaOnHighLoadBackend(TabletSchedCtx tabletInfo) {
ClusterLoadStatistic statistic = statisticMap.get(tabletInfo.getCluster());
private boolean deleteReplicaOnHighLoadBackend(TabletSchedCtx tabletCtx) {
ClusterLoadStatistic statistic = statisticMap.get(tabletCtx.getCluster());
if (statistic == null) {
return false;
}
Replica chosenReplica = null;
double maxScore = 0;
for (Replica replica : tabletInfo.getReplicas()) {
for (Replica replica : tabletCtx.getReplicas()) {
BackendLoadStatistic beStatistic = statistic.getBackendLoadStatistic(replica.getBackendId());
if (beStatistic == null) {
continue;
@ -684,29 +686,29 @@ public class TabletScheduler extends Daemon {
}
if (chosenReplica != null) {
deleteReplicaInternal(tabletInfo, chosenReplica, "high load");
deleteReplicaInternal(tabletCtx, chosenReplica, "high load");
return true;
}
return false;
}
private void deleteReplicaInternal(TabletSchedCtx tabletInfo, Replica replica, String reason) {
private void deleteReplicaInternal(TabletSchedCtx tabletCtx, Replica replica, String reason) {
// delete this replica from catalog.
// it will also delete replica from tablet inverted index.
tabletInfo.deleteReplica(replica);
tabletCtx.deleteReplica(replica);
// write edit log
ReplicaPersistInfo info = ReplicaPersistInfo.createForDelete(tabletInfo.getDbId(),
tabletInfo.getTblId(),
tabletInfo.getPartitionId(),
tabletInfo.getIndexId(),
tabletInfo.getTabletId(),
ReplicaPersistInfo info = ReplicaPersistInfo.createForDelete(tabletCtx.getDbId(),
tabletCtx.getTblId(),
tabletCtx.getPartitionId(),
tabletCtx.getIndexId(),
tabletCtx.getTabletId(),
replica.getBackendId());
Catalog.getInstance().getEditLog().logDeleteReplica(info);
LOG.info("delete replica. tablet id: {}, backend id: {}. reason: {}",
tabletInfo.getTabletId(), replica.getBackendId(), reason);
tabletCtx.getTabletId(), replica.getBackendId(), reason);
}
/*
@ -716,10 +718,10 @@ public class TabletScheduler extends Daemon {
*
* after clone finished, the replica in wrong cluster will be treated as redundant, and will be deleted soon.
*/
private void handleReplicaClusterMigration(TabletSchedCtx tabletInfo, AgentBatchTask batchTask)
private void handleReplicaClusterMigration(TabletSchedCtx tabletCtx, AgentBatchTask batchTask)
throws SchedException {
stat.counterReplicaMissingInClusterErr.incrementAndGet();
handleReplicaMissing(tabletInfo, batchTask);
handleReplicaMissing(tabletCtx, batchTask);
}
/*
@ -734,23 +736,23 @@ public class TabletScheduler extends Daemon {
LoadBalancer loadBalancer = new LoadBalancer(statisticMap);
List<TabletSchedCtx> alternativeTablets = loadBalancer.selectAlternativeTablets();
for (TabletSchedCtx tabletInfo : alternativeTablets) {
addTablet(tabletInfo, false);
for (TabletSchedCtx tabletCtx : alternativeTablets) {
addTablet(tabletCtx, false);
}
}
/*
* Try to create a balance task for a tablet.
*/
private void doBalance(TabletSchedCtx tabletInfo, AgentBatchTask batchTask) throws SchedException {
private void doBalance(TabletSchedCtx tabletCtx, AgentBatchTask batchTask) throws SchedException {
stat.counterBalanceSchedule.incrementAndGet();
LoadBalancer loadBalancer = new LoadBalancer(statisticMap);
loadBalancer.createBalanceTask(tabletInfo, backendsWorkingSlots, batchTask);
loadBalancer.createBalanceTask(tabletCtx, backendsWorkingSlots, batchTask);
}
// choose a path on a backend which is fit for the tablet
private RootPathLoadStatistic chooseAvailableDestPath(TabletSchedCtx tabletInfo) throws SchedException {
ClusterLoadStatistic statistic = statisticMap.get(tabletInfo.getCluster());
private RootPathLoadStatistic chooseAvailableDestPath(TabletSchedCtx tabletCtx) throws SchedException {
ClusterLoadStatistic statistic = statisticMap.get(tabletCtx.getCluster());
if (statistic == null) {
throw new SchedException(Status.UNRECOVERABLE, "cluster does not exist");
}
@ -762,14 +764,14 @@ public class TabletScheduler extends Daemon {
for (int i = 0; i < beStatistics.size(); i++) {
BackendLoadStatistic bes = beStatistics.get(i);
// exclude BE which already has replica of this tablet
if (tabletInfo.containsBE(bes.getBeId())) {
if (tabletCtx.containsBE(bes.getBeId())) {
continue;
}
List<RootPathLoadStatistic> resultPaths = Lists.newArrayList();
BalanceStatus st = bes.isFit(tabletInfo.getTabletSize(), resultPaths, true /* is supplement */);
BalanceStatus st = bes.isFit(tabletCtx.getTabletSize(), resultPaths, true /* is supplement */);
if (!st.ok()) {
LOG.debug("unable to find path for supplementing tablet: {}. {}", tabletInfo, st);
LOG.debug("unable to find path for supplementing tablet: {}. {}", tabletCtx, st);
continue;
}
@ -785,7 +787,7 @@ public class TabletScheduler extends Daemon {
// just get first available path.
// we try to find a path with specified media type, if not find, arbitrarily use one.
for (RootPathLoadStatistic rootPathLoadStatistic : allFitPaths) {
if (rootPathLoadStatistic.getStorageMedium() != tabletInfo.getStorageMedium()) {
if (rootPathLoadStatistic.getStorageMedium() != tabletCtx.getStorageMedium()) {
continue;
}
@ -820,24 +822,24 @@ public class TabletScheduler extends Daemon {
* For some reason, a tablet failed to be scheduled this time,
* so we dynamically change its priority and add it back to the queue, waiting for the next round.
*/
private void dynamicAdjustPrioAndAddBackToPendingTablets(TabletSchedCtx tabletInfo, String message) {
Preconditions.checkState(tabletInfo.getState() == TabletSchedCtx.State.PENDING);
tabletInfo.adjustPriority(stat);
addTablet(tabletInfo, true /* force */);
private void dynamicAdjustPrioAndAddBackToPendingTablets(TabletSchedCtx tabletCtx, String message) {
Preconditions.checkState(tabletCtx.getState() == TabletSchedCtx.State.PENDING);
tabletCtx.adjustPriority(stat);
addTablet(tabletCtx, true /* force */);
}
private synchronized void removeTabletInfo(TabletSchedCtx tabletInfo, TabletSchedCtx.State state, String reason) {
tabletInfo.setState(state);
tabletInfo.releaseResource(this);
tabletInfo.setFinishedTime(System.currentTimeMillis());
runningTablets.remove(tabletInfo.getTabletId());
allTabletIds.remove(tabletInfo.getTabletId());
schedHistory.add(tabletInfo);
LOG.info("remove the tablet {}. because: {}", tabletInfo.getTabletId(), reason);
private synchronized void removeTabletCtx(TabletSchedCtx tabletCtx, TabletSchedCtx.State state, String reason) {
tabletCtx.setState(state);
tabletCtx.releaseResource(this);
tabletCtx.setFinishedTime(System.currentTimeMillis());
runningTablets.remove(tabletCtx.getTabletId());
allTabletIds.remove(tabletCtx.getTabletId());
schedHistory.add(tabletCtx);
LOG.info("remove the tablet {}. because: {}", tabletCtx.getTabletId(), reason);
}
// get next batch of tablets from queue.
private synchronized List<TabletSchedCtx> getNextTabletInfoBatch() {
private synchronized List<TabletSchedCtx> getNextTabletCtxBatch() {
List<TabletSchedCtx> list = Lists.newArrayList();
int count = Math.max(MIN_BATCH_NUM, getCurrentAvailableSlotNum());
while (count > 0) {
@ -861,39 +863,43 @@ public class TabletScheduler extends Daemon {
}
/*
* return true if we want to remove the clone task from AgentTaskQueu
* return true if we want to remove the clone task from AgentTaskQueue
*/
public boolean finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request) {
long tabletId = cloneTask.getTabletId();
TabletSchedCtx tabletInfo = takeRunningTablets(tabletId);
if (tabletInfo == null) {
TabletSchedCtx tabletCtx = takeRunningTablets(tabletId);
if (tabletCtx == null) {
LOG.warn("tablet info does not exist: {}", tabletId);
// tablet does not exist, no need to keep task.
return true;
}
Preconditions.checkState(tabletInfo.getState() == TabletSchedCtx.State.RUNNING);
Preconditions.checkState(tabletCtx.getState() == TabletSchedCtx.State.RUNNING, tabletCtx.getState());
try {
tabletInfo.finishCloneTask(cloneTask, request);
tabletCtx.finishCloneTask(cloneTask, request);
} catch (SchedException e) {
tabletInfo.increaseFailedRunningCounter();
tabletInfo.setErrMsg(e.getMessage());
tabletCtx.increaseFailedRunningCounter();
tabletCtx.setErrMsg(e.getMessage());
if (e.getStatus() == Status.RUNNING_FAILED) {
stat.counterCloneTaskFailed.incrementAndGet();
addToRunningTablets(tabletInfo);
addToRunningTablets(tabletCtx);
return false;
} else {
Preconditions.checkState(e.getStatus() == Status.UNRECOVERABLE, e.getStatus());
} else if (e.getStatus() == Status.UNRECOVERABLE) {
// unrecoverable
stat.counterTabletScheduledDiscard.incrementAndGet();
removeTabletInfo(tabletInfo, TabletSchedCtx.State.CANCELLED, e.getMessage());
removeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getMessage());
return true;
} else if (e.getStatus() == Status.FINISHED) {
// tablet is already healthy, just remove
removeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getMessage());
return true;
}
}
Preconditions.checkState(tabletInfo.getState() == TabletSchedCtx.State.FINISHED);
Preconditions.checkState(tabletCtx.getState() == TabletSchedCtx.State.FINISHED);
stat.counterCloneTaskSucceeded.incrementAndGet();
gatherStatistics(tabletInfo);
removeTabletInfo(tabletInfo, TabletSchedCtx.State.FINISHED, "finished");
gatherStatistics(tabletCtx);
removeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED, "finished");
return true;
}
@ -902,21 +908,21 @@ public class TabletScheduler extends Daemon {
* It will be evaluated for future strategy.
* This should only be called when the tablet is done with state FINISHED.
*/
private void gatherStatistics(TabletSchedCtx tabletInfo) {
if (tabletInfo.getCopySize() > 0 && tabletInfo.getCopyTimeMs() > 0) {
if (tabletInfo.getSrcBackendId() != -1 && tabletInfo.getSrcPathHash() != -1) {
PathSlot pathSlot = backendsWorkingSlots.get(tabletInfo.getSrcBackendId());
private void gatherStatistics(TabletSchedCtx tabletCtx) {
if (tabletCtx.getCopySize() > 0 && tabletCtx.getCopyTimeMs() > 0) {
if (tabletCtx.getSrcBackendId() != -1 && tabletCtx.getSrcPathHash() != -1) {
PathSlot pathSlot = backendsWorkingSlots.get(tabletCtx.getSrcBackendId());
if (pathSlot != null) {
pathSlot.updateStatistic(tabletInfo.getSrcPathHash(), tabletInfo.getCopySize(),
tabletInfo.getCopyTimeMs());
pathSlot.updateStatistic(tabletCtx.getSrcPathHash(), tabletCtx.getCopySize(),
tabletCtx.getCopyTimeMs());
}
}
if (tabletInfo.getDestBackendId() != -1 && tabletInfo.getDestPathHash() != -1) {
PathSlot pathSlot = backendsWorkingSlots.get(tabletInfo.getDestBackendId());
if (tabletCtx.getDestBackendId() != -1 && tabletCtx.getDestPathHash() != -1) {
PathSlot pathSlot = backendsWorkingSlots.get(tabletCtx.getDestBackendId());
if (pathSlot != null) {
pathSlot.updateStatistic(tabletInfo.getDestPathHash(), tabletInfo.getCopySize(),
tabletInfo.getCopyTimeMs());
pathSlot.updateStatistic(tabletCtx.getDestPathHash(), tabletCtx.getCopySize(),
tabletCtx.getCopyTimeMs());
}
}
}
@ -951,40 +957,40 @@ public class TabletScheduler extends Daemon {
});
timeoutTablets.stream().forEach(t -> {
removeTabletInfo(t, TabletSchedCtx.State.TIMEOUT, "timeout");
removeTabletCtx(t, TabletSchedCtx.State.TIMEOUT, "timeout");
stat.counterCloneTaskTimeout.incrementAndGet();
});
}
public List<List<String>> getPendingTabletsInfo(int limit) {
List<TabletSchedCtx> tabletInfos = getCopiedTablets(pendingTablets, limit);
return collectTabletInfo(tabletInfos);
List<TabletSchedCtx> tabletCtxs = getCopiedTablets(pendingTablets, limit);
return collectTabletCtx(tabletCtxs);
}
public List<List<String>> getRunningTabletsInfo(int limit) {
List<TabletSchedCtx> tabletInfos = getCopiedTablets(runningTablets.values(), limit);
return collectTabletInfo(tabletInfos);
List<TabletSchedCtx> tabletCtxs = getCopiedTablets(runningTablets.values(), limit);
return collectTabletCtx(tabletCtxs);
}
public List<List<String>> getHistoryTabletsInfo(int limit) {
List<TabletSchedCtx> tabletInfos = getCopiedTablets(schedHistory, limit);
return collectTabletInfo(tabletInfos);
List<TabletSchedCtx> tabletCtxs = getCopiedTablets(schedHistory, limit);
return collectTabletCtx(tabletCtxs);
}
private List<List<String>> collectTabletInfo(List<TabletSchedCtx> tabletInfos) {
private List<List<String>> collectTabletCtx(List<TabletSchedCtx> tabletCtxs) {
List<List<String>> result = Lists.newArrayList();
tabletInfos.stream().forEach(t -> {
tabletCtxs.stream().forEach(t -> {
result.add(t.getBrief());
});
return result;
}
private synchronized List<TabletSchedCtx> getCopiedTablets(Collection<TabletSchedCtx> source, int limit) {
List<TabletSchedCtx> tabletInfos = Lists.newArrayList();
List<TabletSchedCtx> tabletCtxs = Lists.newArrayList();
source.stream().limit(limit).forEach(t -> {
tabletInfos.add(t);
tabletCtxs.add(t);
});
return tabletInfos;
return tabletCtxs;
}
public synchronized int getPendingNum() {