From 7754791146e36efd53b5fe271189615c1206f9e2 Mon Sep 17 00:00:00 2001 From: deardeng <565620795@qq.com> Date: Fri, 10 Nov 2023 10:14:42 +0800 Subject: [PATCH] =?UTF-8?q?[improvement](disk=20balance)=20Prevent=20dupli?= =?UTF-8?q?cate=20disk=20balance=20tasks=20afte=E2=80=A6=20(#25990)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- be/src/agent/task_worker_pool.cpp | 10 ++++--- .../apache/doris/clone/DiskRebalancer.java | 26 +++++++++++++++++++ .../apache/doris/clone/TabletScheduler.java | 13 ++++++++++ 3 files changed, 46 insertions(+), 3 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 6717af3ce4..698a71aec1 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -1949,6 +1949,10 @@ void StorageMediumMigrateTaskPool::_storage_medium_migrate_worker_thread_callbac EngineStorageMigrationTask engine_task(tablet, dest_store); status = StorageEngine::instance()->execute_task(&engine_task); } + // fe should ignore this err + if (status.is()) { + status = Status::OK(); + } if (!status.ok()) { LOG_WARNING("failed to migrate storage medium") .tag("signature", agent_task_req.signature) @@ -2011,8 +2015,9 @@ Status StorageMediumMigrateTaskPool::_check_migrate_request(const TStorageMedium *dest_store = stores[0]; } if (tablet->data_dir()->path() == (*dest_store)->path()) { - return Status::InternalError("tablet is already on specified path {}", - tablet->data_dir()->path()); + LOG_WARNING("tablet is already on specified path").tag("path", tablet->data_dir()->path()); + return Status::Error("tablet is already on specified path: {}", + tablet->data_dir()->path()); } // check local disk capacity @@ -2021,7 +2026,6 @@ Status StorageMediumMigrateTaskPool::_check_migrate_request(const TStorageMedium return Status::InternalError("reach the capacity limit of path {}, tablet_size={}", (*dest_store)->path(), tablet_size); } - return Status::OK(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java index 5edca91444..0a7ce1b8f5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java @@ -120,6 +120,7 @@ public class DiskRebalancer extends Rebalancer { @Override protected List selectAlternativeTabletsForCluster( LoadStatisticForTag clusterStat, TStorageMedium medium) { + LOG.info("dx test enter selectAlternativeTabletsForCluster"); List alternativeTablets = Lists.newArrayList(); // get classification of backends @@ -185,7 +186,17 @@ public class DiskRebalancer extends Rebalancer { Set pathHigh = Sets.newHashSet(); // we only select tablets from available high load path beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, medium); + LOG.info("dx test select before low={} mid={} high={} medium={}", pathLow, pathMid, pathHigh, medium); // check if BE has low and high paths for balance after reclassification + pathHigh.add(-2606726262674133323L); + pathHigh.add(384536254535458899L); + pathHigh.add(528047762753362128L); + pathLow.add(1252949013258184268L); + pathMid.remove(384536254535458899L); + pathMid.remove(528047762753362128L); + pathMid.remove(-2606726262674133323L); + pathMid.remove(1252949013258184268L); + LOG.info("dx test select after low={} mid={} high={} medium={}", pathLow, pathMid, pathHigh, medium); if (!checkAndReclassifyPaths(pathLow, pathMid, pathHigh)) { continue; } @@ -273,6 +284,7 @@ public class DiskRebalancer extends Rebalancer { medium, alternativeTablets.size(), alternativeTablets.stream().mapToLong(TabletSchedCtx::getTabletId).toArray()); } + LOG.info("dx test out selectAlternativeTabletsForCluster, alternativeTablets={}", alternativeTablets); return alternativeTablets; } @@ -284,6 +296,7 @@ public class DiskRebalancer extends Rebalancer { */ @Override public void completeSchedCtx(TabletSchedCtx tabletCtx) throws SchedException { + LOG.info("dx test enter completeSchedCtx"); LoadStatisticForTag clusterStat = statisticMap.get(tabletCtx.getTag()); if (clusterStat == null) { throw new SchedException(Status.UNRECOVERABLE, @@ -340,6 +353,18 @@ public class DiskRebalancer extends Rebalancer { Set pathMid = Sets.newHashSet(); Set pathHigh = Sets.newHashSet(); beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, tabletCtx.getStorageMedium()); + LOG.info("dx test complete before low={} mid={} high={} medium={}", + pathLow, pathMid, pathHigh, tabletCtx.getStorageMedium()); + pathHigh.add(-2606726262674133323L); + pathHigh.add(384536254535458899L); + pathHigh.add(528047762753362128L); + pathLow.add(1252949013258184268L); + pathMid.remove(384536254535458899L); + pathMid.remove(528047762753362128L); + pathMid.remove(-2606726262674133323L); + pathMid.remove(1252949013258184268L); + LOG.info("dx test complete after low={} mid={} high={} medium={}", + pathLow, pathMid, pathHigh, tabletCtx.getStorageMedium()); if (pathHigh.contains(replica.getPathHash())) { pathLow.addAll(pathMid); } else if (!pathMid.contains(replica.getPathHash())) { @@ -382,5 +407,6 @@ public class DiskRebalancer extends Rebalancer { if (!setDest) { throw new SchedException(Status.UNRECOVERABLE, "unable to find low load path"); } + LOG.info("dx test out completeSchedCtx"); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index 1d4592501f..beee677d2c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -79,6 +79,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.PriorityQueue; import java.util.Queue; import java.util.Set; @@ -615,6 +616,17 @@ public class TabletScheduler extends MasterDaemon { } } + public void updateDestPathHash(TabletSchedCtx tabletCtx) { + // find dest replica + Optional destReplica = tabletCtx.getReplicas() + .stream().filter(replica -> replica.getBackendId() == tabletCtx.getDestBackendId()).findAny(); + if (destReplica.isPresent() && tabletCtx.getDestPathHash() != -1) { + LOG.info("dx test success report old {} : new {}", + destReplica.get().getPathHash(), tabletCtx.getDestPathHash()); + destReplica.get().setPathHash(tabletCtx.getDestPathHash()); + } + } + public void updateDiskBalanceLastSuccTime(long beId, long pathHash) { PathSlot pathSlot = backendsWorkingSlots.get(beId); if (pathSlot == null) { @@ -1642,6 +1654,7 @@ public class TabletScheduler extends MasterDaemon { // if we have a success task, then stat must be refreshed before schedule a new task updateDiskBalanceLastSuccTime(tabletCtx.getSrcBackendId(), tabletCtx.getSrcPathHash()); updateDiskBalanceLastSuccTime(tabletCtx.getDestBackendId(), tabletCtx.getDestPathHash()); + updateDestPathHash(tabletCtx); finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED, Status.FINISHED, "finished"); } else { finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, Status.UNRECOVERABLE,