From 9d71a930a2a94daaa83dfdc1713a8ef6adc48c7c Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Fri, 25 Jan 2019 16:49:15 +0800 Subject: [PATCH] Fix bug that repair slot may not be released when clone finished (#589) --- be/src/olap/olap_snapshot.cpp | 2 +- .../org/apache/doris/clone/LoadBalancer.java | 18 +- .../apache/doris/clone/TabletSchedCtx.java | 14 +- .../apache/doris/clone/TabletScheduler.java | 338 +++++++++--------- 4 files changed, 188 insertions(+), 184 deletions(-) diff --git a/be/src/olap/olap_snapshot.cpp b/be/src/olap/olap_snapshot.cpp index 9c0d74e801..be18729bdc 100644 --- a/be/src/olap/olap_snapshot.cpp +++ b/be/src/olap/olap_snapshot.cpp @@ -668,7 +668,7 @@ OLAPStatus OLAPEngine::storage_medium_migrate( tablet->obtain_header_rdlock(); if (tablet->has_pending_data()) { tablet->release_header_lock(); - res = OLAP_ERR_HEADER_HAS_PENDING_DATA + res = OLAP_ERR_HEADER_HAS_PENDING_DATA; OLAP_LOG_WARNING("could not migration because has pending data [tablet='%s' ]", tablet->full_name().c_str()); break; diff --git a/fe/src/main/java/org/apache/doris/clone/LoadBalancer.java b/fe/src/main/java/org/apache/doris/clone/LoadBalancer.java index 126859c99f..d16faab3ee 100644 --- a/fe/src/main/java/org/apache/doris/clone/LoadBalancer.java +++ b/fe/src/main/java/org/apache/doris/clone/LoadBalancer.java @@ -175,9 +175,9 @@ public class LoadBalancer { * 2. Select a low load backend as destination. And tablet should not has replica on this backend. * 3. Create a clone task. */ - public void createBalanceTask(TabletSchedCtx tabletInfo, Map backendsWorkingSlots, + public void createBalanceTask(TabletSchedCtx tabletCtx, Map backendsWorkingSlots, AgentBatchTask batchTask) throws SchedException { - ClusterLoadStatistic clusterStat = statisticMap.get(tabletInfo.getCluster()); + ClusterLoadStatistic clusterStat = statisticMap.get(tabletCtx.getCluster()); if (clusterStat == null) { throw new SchedException(Status.UNRECOVERABLE, "cluster does not exist"); } @@ -197,7 +197,7 @@ public class LoadBalancer { throw new SchedException(Status.UNRECOVERABLE, "all low load backends is unavailable"); } - List replicas = tabletInfo.getReplicas(); + List replicas = tabletCtx.getReplicas(); // Check if this tablet has replica on high load backend. boolean hasHighReplica = false; @@ -222,7 +222,7 @@ public class LoadBalancer { if (pathHash == -1) { continue; } else { - tabletInfo.setSrc(replica); + tabletCtx.setSrc(replica); setSource = true; break; } @@ -238,13 +238,13 @@ public class LoadBalancer { // no replica on this low load backend // 1. check if this clone task can make the cluster more balance. List availPaths = Lists.newArrayList(); - if (beStat.isFit(tabletInfo.getTabletSize(), availPaths, + if (beStat.isFit(tabletCtx.getTabletSize(), availPaths, false /* not supplement */) != BalanceStatus.OK) { continue; } - if (!clusterStat.isMoreBalanced(tabletInfo.getSrcBackendId(), beStat.getBeId(), - tabletInfo.getTabletId(), tabletInfo.getTabletSize())) { + if (!clusterStat.isMoreBalanced(tabletCtx.getSrcBackendId(), beStat.getBeId(), + tabletCtx.getTabletId(), tabletCtx.getTabletSize())) { continue; } @@ -265,7 +265,7 @@ public class LoadBalancer { if (pathHash == -1) { continue; } else { - tabletInfo.setDestination(beStat.getBeId(), pathHash); + tabletCtx.setDest(beStat.getBeId(), pathHash); setDest = true; break; } @@ -277,6 +277,6 @@ public class LoadBalancer { } // create clone task - batchTask.addTask(tabletInfo.createCloneReplicaAndTask()); + batchTask.addTask(tabletCtx.createCloneReplicaAndTask()); } } diff --git a/fe/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/src/main/java/org/apache/doris/clone/TabletSchedCtx.java index 0b259ad8c2..02bea58e55 100644 --- a/fe/src/main/java/org/apache/doris/clone/TabletSchedCtx.java +++ b/fe/src/main/java/org/apache/doris/clone/TabletSchedCtx.java @@ -341,8 +341,8 @@ public class TabletSchedCtx implements Comparable { this.committedVersionHash = committedVersionHash; } - public void setDestination(Long destBe, long destPathHash) { - this.destBackendId = destBe; + public void setDest(Long destBeId, long destPathHash) { + this.destBackendId = destBeId; this.destPathHash = destPathHash; } @@ -447,8 +447,7 @@ public class TabletSchedCtx implements Comparable { long srcPathHash = slot.takeSlot(srcReplica.getPathHash()); if (srcPathHash != -1) { - this.srcReplica = srcReplica; - this.srcPathHash = srcPathHash; + setSrc(srcReplica); return; } } @@ -515,8 +514,7 @@ public class TabletSchedCtx implements Comparable { throw new SchedException(Status.SCHEDULE_FAILED, "unable to take slot of dest path"); } - this.destBackendId = chosenReplica.getBackendId(); - this.destPathHash = chosenReplica.getPathHash(); + setDest(chosenReplica.getBackendId(), chosenReplica.getPathHash()); } /* @@ -639,7 +637,7 @@ public class TabletSchedCtx implements Comparable { */ public void finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request) throws SchedException { - Preconditions.checkState(state == State.RUNNING); + Preconditions.checkState(state == State.RUNNING, state); Preconditions.checkArgument(cloneTask.getTaskVersion() == CloneTask.VERSION_2); setLastVisitedTime(System.currentTimeMillis()); @@ -854,7 +852,7 @@ public class TabletSchedCtx implements Comparable { return false; } - Preconditions.checkState(lastSchedTime != 0 && taskTimeoutMs != 0); + Preconditions.checkState(lastSchedTime != 0 && taskTimeoutMs != 0, lastSchedTime + "-" + taskTimeoutMs); return System.currentTimeMillis() - lastSchedTime > taskTimeoutMs; } diff --git a/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java index 00d53c3782..3c14bf01f2 100644 --- a/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -108,7 +108,7 @@ public class TabletScheduler extends Daemon { */ private PriorityQueue pendingTablets = new PriorityQueue<>(); private Set allTabletIds = Sets.newHashSet(); - // contains all tabletInfos which state are RUNNING + // contains all tabletCtxs which state are RUNNING private Map runningTablets = Maps.newHashMap(); // save the latest 1000 scheduled tablet info private Queue schedHistory = EvictingQueue.create(1000); @@ -226,12 +226,12 @@ public class TabletScheduler extends Daemon { */ public synchronized void changePriorityOfTablets(long dbId, long tblId, List partitionIds) { PriorityQueue newPendingTablets = new PriorityQueue<>(); - for (TabletSchedCtx tabletInfo : pendingTablets) { - if (tabletInfo.getDbId() == dbId && tabletInfo.getTblId() == tblId - && partitionIds.contains(tabletInfo.getPartitionId())) { - tabletInfo.setOrigPriority(Priority.VERY_HIGH); + for (TabletSchedCtx tabletCtx : pendingTablets) { + if (tabletCtx.getDbId() == dbId && tabletCtx.getTblId() == tblId + && partitionIds.contains(tabletCtx.getPartitionId())) { + tabletCtx.setOrigPriority(Priority.VERY_HIGH); } - newPendingTablets.add(tabletInfo); + newPendingTablets.add(tabletCtx); } pendingTablets = newPendingTablets; } @@ -309,17 +309,17 @@ public class TabletScheduler extends Daemon { private synchronized void adjustPriorities() { int size = pendingTablets.size(); int changedNum = 0; - TabletSchedCtx tabletInfo = null; + TabletSchedCtx tabletCtx = null; for (int i = 0; i < size; i++) { - tabletInfo = pendingTablets.poll(); - if (tabletInfo == null) { + tabletCtx = pendingTablets.poll(); + if (tabletCtx == null) { break; } - if (tabletInfo.adjustPriority(stat)) { + if (tabletCtx.adjustPriority(stat)) { changedNum++; } - pendingTablets.add(tabletInfo); + pendingTablets.add(tabletCtx); } LOG.info("adjust priority for all tablets. changed: {}, total: {}", changedNum, size); @@ -336,45 +336,45 @@ public class TabletScheduler extends Daemon { */ private void schedulePendingTablets() { long start = System.currentTimeMillis(); - List currentBatch = getNextTabletInfoBatch(); + List currentBatch = getNextTabletCtxBatch(); LOG.debug("get {} tablets to schedule", currentBatch.size()); AgentBatchTask batchTask = new AgentBatchTask(); - for (TabletSchedCtx tabletInfo : currentBatch) { + for (TabletSchedCtx tabletCtx : currentBatch) { try { - scheduleTablet(tabletInfo, batchTask); + scheduleTablet(tabletCtx, batchTask); } catch (SchedException e) { - tabletInfo.increaseFailedSchedCounter(); - tabletInfo.setErrMsg(e.getMessage()); + tabletCtx.increaseFailedSchedCounter(); + tabletCtx.setErrMsg(e.getMessage()); if (e.getStatus() == Status.SCHEDULE_FAILED) { // if balance is disabled, remove this tablet - if (tabletInfo.getType() == Type.BALANCE && Config.disable_balance) { - removeTabletInfo(tabletInfo, TabletSchedCtx.State.CANCELLED, + if (tabletCtx.getType() == Type.BALANCE && Config.disable_balance) { + removeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, "disable balance and " + e.getMessage()); } else { // we must release resource it current hold, and be scheduled again - tabletInfo.releaseResource(this); + tabletCtx.releaseResource(this); // adjust priority to avoid some higher priority always be the first in pendingTablets stat.counterTabletScheduledFailed.incrementAndGet(); - dynamicAdjustPrioAndAddBackToPendingTablets(tabletInfo, e.getMessage()); + dynamicAdjustPrioAndAddBackToPendingTablets(tabletCtx, e.getMessage()); } } else if (e.getStatus() == Status.FINISHED) { // schedule redundant tablet will throw this exception stat.counterTabletScheduledSucceeded.incrementAndGet(); - removeTabletInfo(tabletInfo, TabletSchedCtx.State.FINISHED, e.getMessage()); + removeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED, e.getMessage()); } else { Preconditions.checkState(e.getStatus() == Status.UNRECOVERABLE, e.getStatus()); // discard stat.counterTabletScheduledDiscard.incrementAndGet(); - removeTabletInfo(tabletInfo, TabletSchedCtx.State.CANCELLED, e.getMessage()); + removeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getMessage()); } continue; } - Preconditions.checkState(tabletInfo.getState() == TabletSchedCtx.State.RUNNING); + Preconditions.checkState(tabletCtx.getState() == TabletSchedCtx.State.RUNNING); stat.counterTabletScheduledSucceeded.incrementAndGet(); - addToRunningTablets(tabletInfo); + addToRunningTablets(tabletCtx); } // must send task after adding tablet info to runningTablets. @@ -392,8 +392,8 @@ public class TabletScheduler extends Daemon { stat.counterTabletScheduleCostMs.addAndGet(cost); } - private synchronized void addToRunningTablets(TabletSchedCtx tabletInfo) { - runningTablets.put(tabletInfo.getTabletId(), tabletInfo); + private synchronized void addToRunningTablets(TabletSchedCtx tabletCtx) { + runningTablets.put(tabletCtx.getTabletId(), tabletCtx); } /* @@ -408,15 +408,15 @@ public class TabletScheduler extends Daemon { /* * Try to schedule a single tablet. */ - private void scheduleTablet(TabletSchedCtx tabletInfo, AgentBatchTask batchTask) throws SchedException { - LOG.debug("schedule tablet: {}", tabletInfo.getTabletId()); + private void scheduleTablet(TabletSchedCtx tabletCtx, AgentBatchTask batchTask) throws SchedException { + LOG.debug("schedule tablet: {}", tabletCtx.getTabletId()); long currentTime = System.currentTimeMillis(); - tabletInfo.setLastSchedTime(currentTime); - tabletInfo.setLastVisitedTime(currentTime); + tabletCtx.setLastSchedTime(currentTime); + tabletCtx.setLastVisitedTime(currentTime); stat.counterTabletScheduled.incrementAndGet(); // check this tablet again - Database db = catalog.getDb(tabletInfo.getDbId()); + Database db = catalog.getDb(tabletCtx.getDbId()); if (db == null) { throw new SchedException(Status.UNRECOVERABLE, "db does not exist"); } @@ -424,33 +424,33 @@ public class TabletScheduler extends Daemon { Pair statusPair = null; db.writeLock(); try { - OlapTable tbl = (OlapTable) db.getTable(tabletInfo.getTblId()); + OlapTable tbl = (OlapTable) db.getTable(tabletCtx.getTblId()); if (tbl == null) { throw new SchedException(Status.UNRECOVERABLE, "tbl does not exist"); } OlapTableState tableState = tbl.getState(); - Partition partition = tbl.getPartition(tabletInfo.getPartitionId()); + Partition partition = tbl.getPartition(tabletCtx.getPartitionId()); if (partition == null) { throw new SchedException(Status.UNRECOVERABLE, "partition does not exist"); } - MaterializedIndex idx = partition.getIndex(tabletInfo.getIndexId()); + MaterializedIndex idx = partition.getIndex(tabletCtx.getIndexId()); if (idx == null) { throw new SchedException(Status.UNRECOVERABLE, "index does not exist"); } - Tablet tablet = idx.getTablet(tabletInfo.getTabletId()); + Tablet tablet = idx.getTablet(tabletCtx.getTabletId()); Preconditions.checkNotNull(tablet); statusPair = tablet.getHealthStatusWithPriority( - infoService, tabletInfo.getCluster(), + infoService, tabletCtx.getCluster(), partition.getVisibleVersion(), partition.getVisibleVersionHash(), tbl.getPartitionInfo().getReplicationNum(partition.getId())); - if (tabletInfo.getType() == TabletSchedCtx.Type.BALANCE && tableState != OlapTableState.NORMAL) { + if (tabletCtx.getType() == TabletSchedCtx.Type.BALANCE && tableState != OlapTableState.NORMAL) { // If table is under ALTER process, do not allow to do balance. throw new SchedException(Status.UNRECOVERABLE, "table's state is not NORMAL"); } @@ -462,54 +462,56 @@ public class TabletScheduler extends Daemon { "table's state is not NORMAL but tablet status is " + statusPair.first.name()); } - tabletInfo.setTabletStatus(statusPair.first); - if (statusPair.first == TabletStatus.HEALTHY && tabletInfo.getType() == TabletSchedCtx.Type.REPAIR) { + tabletCtx.setTabletStatus(statusPair.first); + if (statusPair.first == TabletStatus.HEALTHY && tabletCtx.getType() == TabletSchedCtx.Type.REPAIR) { throw new SchedException(Status.UNRECOVERABLE, "tablet is healthy"); } else if (statusPair.first != TabletStatus.HEALTHY - && tabletInfo.getType() == TabletSchedCtx.Type.BALANCE) { - tabletInfo.releaseResource(this); + && tabletCtx.getType() == TabletSchedCtx.Type.BALANCE) { + tabletCtx.releaseResource(this); // we select an unhealthy tablet to do balance, which is not right. // so here we change it to a REPAIR task, and also reset its priority - tabletInfo.setType(TabletSchedCtx.Type.REPAIR); - tabletInfo.setOrigPriority(statusPair.second); + tabletCtx.setType(TabletSchedCtx.Type.REPAIR); + tabletCtx.setOrigPriority(statusPair.second); + tabletCtx.setLastSchedTime(currentTime); + tabletCtx.setLastVisitedTime(currentTime); } // we do not concern priority here. // once we take the tablet out of priority queue, priority is meaningless. - tabletInfo.setTablet(tablet); - tabletInfo.setVersionInfo(partition.getVisibleVersion(), partition.getVisibleVersionHash(), + tabletCtx.setTablet(tablet); + tabletCtx.setVersionInfo(partition.getVisibleVersion(), partition.getVisibleVersionHash(), partition.getCommittedVersion(), partition.getCommittedVersionHash()); - tabletInfo.setSchemaHash(tbl.getSchemaHashByIndexId(idx.getId())); - tabletInfo.setStorageMedium(tbl.getPartitionInfo().getDataProperty(partition.getId()).getStorageMedium()); + tabletCtx.setSchemaHash(tbl.getSchemaHashByIndexId(idx.getId())); + tabletCtx.setStorageMedium(tbl.getPartitionInfo().getDataProperty(partition.getId()).getStorageMedium()); - handleTabletByTypeAndStatus(statusPair.first, tabletInfo, batchTask); + handleTabletByTypeAndStatus(statusPair.first, tabletCtx, batchTask); } finally { db.writeUnlock(); } } - private void handleTabletByTypeAndStatus(TabletStatus status, TabletSchedCtx tabletInfo, AgentBatchTask batchTask) + private void handleTabletByTypeAndStatus(TabletStatus status, TabletSchedCtx tabletCtx, AgentBatchTask batchTask) throws SchedException { - if (tabletInfo.getType() == Type.REPAIR) { + if (tabletCtx.getType() == Type.REPAIR) { switch (status) { case REPLICA_MISSING: - handleReplicaMissing(tabletInfo, batchTask); + handleReplicaMissing(tabletCtx, batchTask); break; case VERSION_INCOMPLETE: - handleReplicaVersionIncomplete(tabletInfo, batchTask); + handleReplicaVersionIncomplete(tabletCtx, batchTask); break; case REDUNDANT: - handleRedundantReplica(tabletInfo); + handleRedundantReplica(tabletCtx); break; case REPLICA_MISSING_IN_CLUSTER: - handleReplicaClusterMigration(tabletInfo, batchTask); + handleReplicaClusterMigration(tabletCtx, batchTask); break; default: break; } } else { // balance - doBalance(tabletInfo, batchTask); + doBalance(tabletCtx, batchTask); } } @@ -528,18 +530,18 @@ public class TabletScheduler extends Daemon { * * 3. send clone task to destination backend */ - private void handleReplicaMissing(TabletSchedCtx tabletInfo, AgentBatchTask batchTask) throws SchedException { + private void handleReplicaMissing(TabletSchedCtx tabletCtx, AgentBatchTask batchTask) throws SchedException { stat.counterReplicaMissingErr.incrementAndGet(); // find an available dest backend and path - RootPathLoadStatistic destPath = chooseAvailableDestPath(tabletInfo); + RootPathLoadStatistic destPath = chooseAvailableDestPath(tabletCtx); Preconditions.checkNotNull(destPath); - tabletInfo.setDestination(destPath.getBeId(), destPath.getPathHash()); + tabletCtx.setDest(destPath.getBeId(), destPath.getPathHash()); // choose a source replica for cloning from - tabletInfo.chooseSrcReplica(backendsWorkingSlots); + tabletCtx.chooseSrcReplica(backendsWorkingSlots); // create clone task - batchTask.addTask(tabletInfo.createCloneReplicaAndTask()); + batchTask.addTask(tabletCtx.createCloneReplicaAndTask()); } /* @@ -550,19 +552,19 @@ public class TabletScheduler extends Daemon { * 2. find a healthy replica as source replica * 3. send clone task */ - private void handleReplicaVersionIncomplete(TabletSchedCtx tabletInfo, AgentBatchTask batchTask) + private void handleReplicaVersionIncomplete(TabletSchedCtx tabletCtx, AgentBatchTask batchTask) throws SchedException { stat.counterReplicaVersionMissingErr.incrementAndGet(); - ClusterLoadStatistic statistic = statisticMap.get(tabletInfo.getCluster()); + ClusterLoadStatistic statistic = statisticMap.get(tabletCtx.getCluster()); if (statistic == null) { throw new SchedException(Status.UNRECOVERABLE, "cluster does not exist"); } - tabletInfo.chooseDestReplicaForVersionIncomplete(backendsWorkingSlots); - tabletInfo.chooseSrcReplicaForVersionIncomplete(backendsWorkingSlots); + tabletCtx.chooseDestReplicaForVersionIncomplete(backendsWorkingSlots); + tabletCtx.chooseSrcReplicaForVersionIncomplete(backendsWorkingSlots); // create clone task - batchTask.addTask(tabletInfo.createCloneReplicaAndTask()); + batchTask.addTask(tabletCtx.createCloneReplicaAndTask()); } /* @@ -577,15 +579,15 @@ public class TabletScheduler extends Daemon { * 6. replica not in right cluster * 7. replica in higher load backend */ - private void handleRedundantReplica(TabletSchedCtx tabletInfo) throws SchedException { + private void handleRedundantReplica(TabletSchedCtx tabletCtx) throws SchedException { stat.counterReplicaRedundantErr.incrementAndGet(); - if (deleteBackendDropped(tabletInfo) - || deleteBackendUnavailable(tabletInfo) - || deleteCloneReplica(tabletInfo) - || deleteReplicaWithFailedVersion(tabletInfo) - || deleteReplicaWithLowerVersion(tabletInfo) - || deleteReplicaNotInCluster(tabletInfo) - || deleteReplicaOnHighLoadBackend(tabletInfo)) { + if (deleteBackendDropped(tabletCtx) + || deleteBackendUnavailable(tabletCtx) + || deleteCloneReplica(tabletCtx) + || deleteReplicaWithFailedVersion(tabletCtx) + || deleteReplicaWithLowerVersion(tabletCtx) + || deleteReplicaNotInCluster(tabletCtx) + || deleteReplicaOnHighLoadBackend(tabletCtx)) { // if we delete at least one redundant replica, we still throw a SchedException with status FINISHED // to remove this tablet from the pendingTablets(consider it as finished) throw new SchedException(Status.FINISHED, "redundant replica is deleted"); @@ -593,86 +595,86 @@ public class TabletScheduler extends Daemon { throw new SchedException(Status.SCHEDULE_FAILED, "unable to delete any redundant replicas"); } - private boolean deleteBackendDropped(TabletSchedCtx tabletInfo) { - for (Replica replica : tabletInfo.getReplicas()) { + private boolean deleteBackendDropped(TabletSchedCtx tabletCtx) { + for (Replica replica : tabletCtx.getReplicas()) { long beId = replica.getBackendId(); if (infoService.getBackend(beId) == null) { - deleteReplicaInternal(tabletInfo, replica, "backend dropped"); + deleteReplicaInternal(tabletCtx, replica, "backend dropped"); return true; } } return false; } - private boolean deleteBackendUnavailable(TabletSchedCtx tabletInfo) { - for (Replica replica : tabletInfo.getReplicas()) { + private boolean deleteBackendUnavailable(TabletSchedCtx tabletCtx) { + for (Replica replica : tabletCtx.getReplicas()) { Backend be = infoService.getBackend(replica.getBackendId()); if (be == null) { // this case should be handled in deleteBackendDropped() continue; } if (!be.isAvailable()) { - deleteReplicaInternal(tabletInfo, replica, "backend unavailable"); + deleteReplicaInternal(tabletCtx, replica, "backend unavailable"); return true; } } return false; } - private boolean deleteCloneReplica(TabletSchedCtx tabletInfo) { - for (Replica replica : tabletInfo.getReplicas()) { + private boolean deleteCloneReplica(TabletSchedCtx tabletCtx) { + for (Replica replica : tabletCtx.getReplicas()) { if (replica.getState() == ReplicaState.CLONE) { - deleteReplicaInternal(tabletInfo, replica, "clone state"); + deleteReplicaInternal(tabletCtx, replica, "clone state"); return true; } } return false; } - private boolean deleteReplicaWithFailedVersion(TabletSchedCtx tabletInfo) { - for (Replica replica : tabletInfo.getReplicas()) { + private boolean deleteReplicaWithFailedVersion(TabletSchedCtx tabletCtx) { + for (Replica replica : tabletCtx.getReplicas()) { if (replica.getLastFailedVersion() > 0) { - deleteReplicaInternal(tabletInfo, replica, "version incomplete"); + deleteReplicaInternal(tabletCtx, replica, "version incomplete"); return true; } } return false; } - private boolean deleteReplicaWithLowerVersion(TabletSchedCtx tabletInfo) { - for (Replica replica : tabletInfo.getReplicas()) { - if (!replica.checkVersionCatchUp(tabletInfo.getCommittedVersion(), tabletInfo.getCommittedVersionHash())) { - deleteReplicaInternal(tabletInfo, replica, "lower version"); + private boolean deleteReplicaWithLowerVersion(TabletSchedCtx tabletCtx) { + for (Replica replica : tabletCtx.getReplicas()) { + if (!replica.checkVersionCatchUp(tabletCtx.getCommittedVersion(), tabletCtx.getCommittedVersionHash())) { + deleteReplicaInternal(tabletCtx, replica, "lower version"); return true; } } return false; } - private boolean deleteReplicaNotInCluster(TabletSchedCtx tabletInfo) { - for (Replica replica : tabletInfo.getReplicas()) { + private boolean deleteReplicaNotInCluster(TabletSchedCtx tabletCtx) { + for (Replica replica : tabletCtx.getReplicas()) { Backend be = infoService.getBackend(replica.getBackendId()); if (be == null) { // this case should be handled in deleteBackendDropped() continue; } - if (!be.getOwnerClusterName().equals(tabletInfo.getCluster())) { - deleteReplicaInternal(tabletInfo, replica, "not in cluster"); + if (!be.getOwnerClusterName().equals(tabletCtx.getCluster())) { + deleteReplicaInternal(tabletCtx, replica, "not in cluster"); return true; } } return false; } - private boolean deleteReplicaOnHighLoadBackend(TabletSchedCtx tabletInfo) { - ClusterLoadStatistic statistic = statisticMap.get(tabletInfo.getCluster()); + private boolean deleteReplicaOnHighLoadBackend(TabletSchedCtx tabletCtx) { + ClusterLoadStatistic statistic = statisticMap.get(tabletCtx.getCluster()); if (statistic == null) { return false; } Replica chosenReplica = null; double maxScore = 0; - for (Replica replica : tabletInfo.getReplicas()) { + for (Replica replica : tabletCtx.getReplicas()) { BackendLoadStatistic beStatistic = statistic.getBackendLoadStatistic(replica.getBackendId()); if (beStatistic == null) { continue; @@ -684,29 +686,29 @@ public class TabletScheduler extends Daemon { } if (chosenReplica != null) { - deleteReplicaInternal(tabletInfo, chosenReplica, "high load"); + deleteReplicaInternal(tabletCtx, chosenReplica, "high load"); return true; } return false; } - private void deleteReplicaInternal(TabletSchedCtx tabletInfo, Replica replica, String reason) { + private void deleteReplicaInternal(TabletSchedCtx tabletCtx, Replica replica, String reason) { // delete this replica from catalog. // it will also delete replica from tablet inverted index. - tabletInfo.deleteReplica(replica); + tabletCtx.deleteReplica(replica); // write edit log - ReplicaPersistInfo info = ReplicaPersistInfo.createForDelete(tabletInfo.getDbId(), - tabletInfo.getTblId(), - tabletInfo.getPartitionId(), - tabletInfo.getIndexId(), - tabletInfo.getTabletId(), + ReplicaPersistInfo info = ReplicaPersistInfo.createForDelete(tabletCtx.getDbId(), + tabletCtx.getTblId(), + tabletCtx.getPartitionId(), + tabletCtx.getIndexId(), + tabletCtx.getTabletId(), replica.getBackendId()); Catalog.getInstance().getEditLog().logDeleteReplica(info); LOG.info("delete replica. tablet id: {}, backend id: {}. reason: {}", - tabletInfo.getTabletId(), replica.getBackendId(), reason); + tabletCtx.getTabletId(), replica.getBackendId(), reason); } /* @@ -716,10 +718,10 @@ public class TabletScheduler extends Daemon { * * after clone finished, the replica in wrong cluster will be treated as redundant, and will be deleted soon. */ - private void handleReplicaClusterMigration(TabletSchedCtx tabletInfo, AgentBatchTask batchTask) + private void handleReplicaClusterMigration(TabletSchedCtx tabletCtx, AgentBatchTask batchTask) throws SchedException { stat.counterReplicaMissingInClusterErr.incrementAndGet(); - handleReplicaMissing(tabletInfo, batchTask); + handleReplicaMissing(tabletCtx, batchTask); } /* @@ -734,23 +736,23 @@ public class TabletScheduler extends Daemon { LoadBalancer loadBalancer = new LoadBalancer(statisticMap); List alternativeTablets = loadBalancer.selectAlternativeTablets(); - for (TabletSchedCtx tabletInfo : alternativeTablets) { - addTablet(tabletInfo, false); + for (TabletSchedCtx tabletCtx : alternativeTablets) { + addTablet(tabletCtx, false); } } /* * Try to create a balance task for a tablet. */ - private void doBalance(TabletSchedCtx tabletInfo, AgentBatchTask batchTask) throws SchedException { + private void doBalance(TabletSchedCtx tabletCtx, AgentBatchTask batchTask) throws SchedException { stat.counterBalanceSchedule.incrementAndGet(); LoadBalancer loadBalancer = new LoadBalancer(statisticMap); - loadBalancer.createBalanceTask(tabletInfo, backendsWorkingSlots, batchTask); + loadBalancer.createBalanceTask(tabletCtx, backendsWorkingSlots, batchTask); } // choose a path on a backend which is fit for the tablet - private RootPathLoadStatistic chooseAvailableDestPath(TabletSchedCtx tabletInfo) throws SchedException { - ClusterLoadStatistic statistic = statisticMap.get(tabletInfo.getCluster()); + private RootPathLoadStatistic chooseAvailableDestPath(TabletSchedCtx tabletCtx) throws SchedException { + ClusterLoadStatistic statistic = statisticMap.get(tabletCtx.getCluster()); if (statistic == null) { throw new SchedException(Status.UNRECOVERABLE, "cluster does not exist"); } @@ -762,14 +764,14 @@ public class TabletScheduler extends Daemon { for (int i = 0; i < beStatistics.size(); i++) { BackendLoadStatistic bes = beStatistics.get(i); // exclude BE which already has replica of this tablet - if (tabletInfo.containsBE(bes.getBeId())) { + if (tabletCtx.containsBE(bes.getBeId())) { continue; } List resultPaths = Lists.newArrayList(); - BalanceStatus st = bes.isFit(tabletInfo.getTabletSize(), resultPaths, true /* is supplement */); + BalanceStatus st = bes.isFit(tabletCtx.getTabletSize(), resultPaths, true /* is supplement */); if (!st.ok()) { - LOG.debug("unable to find path for supplementing tablet: {}. {}", tabletInfo, st); + LOG.debug("unable to find path for supplementing tablet: {}. {}", tabletCtx, st); continue; } @@ -785,7 +787,7 @@ public class TabletScheduler extends Daemon { // just get first available path. // we try to find a path with specified media type, if not find, arbitrarily use one. for (RootPathLoadStatistic rootPathLoadStatistic : allFitPaths) { - if (rootPathLoadStatistic.getStorageMedium() != tabletInfo.getStorageMedium()) { + if (rootPathLoadStatistic.getStorageMedium() != tabletCtx.getStorageMedium()) { continue; } @@ -820,24 +822,24 @@ public class TabletScheduler extends Daemon { * For some reason, a tablet info failed to be scheduled this time, * So we dynamically change its priority and add back to queue, waiting for next round. */ - private void dynamicAdjustPrioAndAddBackToPendingTablets(TabletSchedCtx tabletInfo, String message) { - Preconditions.checkState(tabletInfo.getState() == TabletSchedCtx.State.PENDING); - tabletInfo.adjustPriority(stat); - addTablet(tabletInfo, true /* force */); + private void dynamicAdjustPrioAndAddBackToPendingTablets(TabletSchedCtx tabletCtx, String message) { + Preconditions.checkState(tabletCtx.getState() == TabletSchedCtx.State.PENDING); + tabletCtx.adjustPriority(stat); + addTablet(tabletCtx, true /* force */); } - private synchronized void removeTabletInfo(TabletSchedCtx tabletInfo, TabletSchedCtx.State state, String reason) { - tabletInfo.setState(state); - tabletInfo.releaseResource(this); - tabletInfo.setFinishedTime(System.currentTimeMillis()); - runningTablets.remove(tabletInfo.getTabletId()); - allTabletIds.remove(tabletInfo.getTabletId()); - schedHistory.add(tabletInfo); - LOG.info("remove the tablet {}. because: {}", tabletInfo.getTabletId(), reason); + private synchronized void removeTabletCtx(TabletSchedCtx tabletCtx, TabletSchedCtx.State state, String reason) { + tabletCtx.setState(state); + tabletCtx.releaseResource(this); + tabletCtx.setFinishedTime(System.currentTimeMillis()); + runningTablets.remove(tabletCtx.getTabletId()); + allTabletIds.remove(tabletCtx.getTabletId()); + schedHistory.add(tabletCtx); + LOG.info("remove the tablet {}. because: {}", tabletCtx.getTabletId(), reason); } // get next batch of tablets from queue. - private synchronized List getNextTabletInfoBatch() { + private synchronized List getNextTabletCtxBatch() { List list = Lists.newArrayList(); int count = Math.max(MIN_BATCH_NUM, getCurrentAvailableSlotNum()); while (count > 0) { @@ -861,39 +863,43 @@ public class TabletScheduler extends Daemon { } /* - * return true if we want to remove the clone task from AgentTaskQueu + * return true if we want to remove the clone task from AgentTaskQueue */ public boolean finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request) { long tabletId = cloneTask.getTabletId(); - TabletSchedCtx tabletInfo = takeRunningTablets(tabletId); - if (tabletInfo == null) { + TabletSchedCtx tabletCtx = takeRunningTablets(tabletId); + if (tabletCtx == null) { LOG.warn("tablet info does not exist: {}", tabletId); // tablet does not exist, no need to keep task. return true; } - Preconditions.checkState(tabletInfo.getState() == TabletSchedCtx.State.RUNNING); + + Preconditions.checkState(tabletCtx.getState() == TabletSchedCtx.State.RUNNING, tabletCtx.getState()); try { - tabletInfo.finishCloneTask(cloneTask, request); + tabletCtx.finishCloneTask(cloneTask, request); } catch (SchedException e) { - tabletInfo.increaseFailedRunningCounter(); - tabletInfo.setErrMsg(e.getMessage()); + tabletCtx.increaseFailedRunningCounter(); + tabletCtx.setErrMsg(e.getMessage()); if (e.getStatus() == Status.RUNNING_FAILED) { stat.counterCloneTaskFailed.incrementAndGet(); - addToRunningTablets(tabletInfo); + addToRunningTablets(tabletCtx); return false; - } else { - Preconditions.checkState(e.getStatus() == Status.UNRECOVERABLE, e.getStatus()); + } else if (e.getStatus() == Status.UNRECOVERABLE) { // unrecoverable stat.counterTabletScheduledDiscard.incrementAndGet(); - removeTabletInfo(tabletInfo, TabletSchedCtx.State.CANCELLED, e.getMessage()); + removeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getMessage()); + return true; + } else if (e.getStatus() == Status.FINISHED) { + // tablet is already healthy, just remove + removeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getMessage()); return true; } } - Preconditions.checkState(tabletInfo.getState() == TabletSchedCtx.State.FINISHED); + Preconditions.checkState(tabletCtx.getState() == TabletSchedCtx.State.FINISHED); stat.counterCloneTaskSucceeded.incrementAndGet(); - gatherStatistics(tabletInfo); - removeTabletInfo(tabletInfo, TabletSchedCtx.State.FINISHED, "finished"); + gatherStatistics(tabletCtx); + removeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED, "finished"); return true; } @@ -902,21 +908,21 @@ public class TabletScheduler extends Daemon { * It will be evaluated for future strategy. * This should only be called when the tablet is down with state FINISHED. */ - private void gatherStatistics(TabletSchedCtx tabletInfo) { - if (tabletInfo.getCopySize() > 0 && tabletInfo.getCopyTimeMs() > 0) { - if (tabletInfo.getSrcBackendId() != -1 && tabletInfo.getSrcPathHash() != -1) { - PathSlot pathSlot = backendsWorkingSlots.get(tabletInfo.getSrcBackendId()); + private void gatherStatistics(TabletSchedCtx tabletCtx) { + if (tabletCtx.getCopySize() > 0 && tabletCtx.getCopyTimeMs() > 0) { + if (tabletCtx.getSrcBackendId() != -1 && tabletCtx.getSrcPathHash() != -1) { + PathSlot pathSlot = backendsWorkingSlots.get(tabletCtx.getSrcBackendId()); if (pathSlot != null) { - pathSlot.updateStatistic(tabletInfo.getSrcPathHash(), tabletInfo.getCopySize(), - tabletInfo.getCopyTimeMs()); + pathSlot.updateStatistic(tabletCtx.getSrcPathHash(), tabletCtx.getCopySize(), + tabletCtx.getCopyTimeMs()); } } - if (tabletInfo.getDestBackendId() != -1 && tabletInfo.getDestPathHash() != -1) { - PathSlot pathSlot = backendsWorkingSlots.get(tabletInfo.getDestBackendId()); + if (tabletCtx.getDestBackendId() != -1 && tabletCtx.getDestPathHash() != -1) { + PathSlot pathSlot = backendsWorkingSlots.get(tabletCtx.getDestBackendId()); if (pathSlot != null) { - pathSlot.updateStatistic(tabletInfo.getDestPathHash(), tabletInfo.getCopySize(), - tabletInfo.getCopyTimeMs()); + pathSlot.updateStatistic(tabletCtx.getDestPathHash(), tabletCtx.getCopySize(), + tabletCtx.getCopyTimeMs()); } } } @@ -951,40 +957,40 @@ public class TabletScheduler extends Daemon { }); timeoutTablets.stream().forEach(t -> { - removeTabletInfo(t, TabletSchedCtx.State.TIMEOUT, "timeout"); + removeTabletCtx(t, TabletSchedCtx.State.TIMEOUT, "timeout"); stat.counterCloneTaskTimeout.incrementAndGet(); }); } public List> getPendingTabletsInfo(int limit) { - List tabletInfos = getCopiedTablets(pendingTablets, limit); - return collectTabletInfo(tabletInfos); + List tabletCtxs = getCopiedTablets(pendingTablets, limit); + return collectTabletCtx(tabletCtxs); } public List> getRunningTabletsInfo(int limit) { - List tabletInfos = getCopiedTablets(runningTablets.values(), limit); - return collectTabletInfo(tabletInfos); + List tabletCtxs = getCopiedTablets(runningTablets.values(), limit); + return collectTabletCtx(tabletCtxs); } public List> getHistoryTabletsInfo(int limit) { - List tabletInfos = getCopiedTablets(schedHistory, limit); - return collectTabletInfo(tabletInfos); + List tabletCtxs = getCopiedTablets(schedHistory, limit); + return collectTabletCtx(tabletCtxs); } - private List> collectTabletInfo(List tabletInfos) { + private List> collectTabletCtx(List tabletCtxs) { List> result = Lists.newArrayList(); - tabletInfos.stream().forEach(t -> { + tabletCtxs.stream().forEach(t -> { result.add(t.getBrief()); }); return result; } private synchronized List getCopiedTablets(Collection source, int limit) { - List tabletInfos = Lists.newArrayList(); + List tabletCtxs = Lists.newArrayList(); source.stream().limit(limit).forEach(t -> { - tabletInfos.add(t); + tabletCtxs.add(t); }); - return tabletInfos; + return tabletCtxs; } public synchronized int getPendingNum() {