From 5ac4f3468eae69ff7d264f362a179f10ec0848cd Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Fri, 29 Nov 2019 21:02:53 +0800 Subject: [PATCH] Remove old decommission job (#2326) DecommissionJob is also a type of AlterJob. When AlterJobV2 was introduced, DecommissionJob was not modified accordingly. In fact, the Decommission operation does not need to generate a Job; it only needs to mark the corresponding Backend state as Decommission. After that, the tablet repair logic will try to migrate the tablets on that Backend to other Backends. SystemHandler then only needs to check all nodes marked as decommission and drop the emptied nodes. --- .../org/apache/doris/alter/AlterHandler.java | 1 - .../doris/alter/DecommissionBackendJob.java | 1 + .../org/apache/doris/alter/SystemHandler.java | 253 ++++++------------ .../doris/analysis/CancelAlterSystemStmt.java | 3 +- .../doris/system/SystemInfoService.java | 16 +- 5 files changed, 87 insertions(+), 187 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/alter/AlterHandler.java b/fe/src/main/java/org/apache/doris/alter/AlterHandler.java index 394e363434..971b1200b0 100644 --- a/fe/src/main/java/org/apache/doris/alter/AlterHandler.java +++ b/fe/src/main/java/org/apache/doris/alter/AlterHandler.java @@ -319,7 +319,6 @@ public abstract class AlterHandler extends MasterDaemon { @Override public void start() { - // register observer super.start(); } diff --git a/fe/src/main/java/org/apache/doris/alter/DecommissionBackendJob.java b/fe/src/main/java/org/apache/doris/alter/DecommissionBackendJob.java index 68badee985..5357ef88ee 100644 --- a/fe/src/main/java/org/apache/doris/alter/DecommissionBackendJob.java +++ b/fe/src/main/java/org/apache/doris/alter/DecommissionBackendJob.java @@ -51,6 +51,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +@Deprecated public class DecommissionBackendJob extends AlterJob { public enum DecommissionType { diff --git 
a/fe/src/main/java/org/apache/doris/alter/SystemHandler.java b/fe/src/main/java/org/apache/doris/alter/SystemHandler.java index 181cf5db29..d40db137a2 100644 --- a/fe/src/main/java/org/apache/doris/alter/SystemHandler.java +++ b/fe/src/main/java/org/apache/doris/alter/SystemHandler.java @@ -18,7 +18,6 @@ package org.apache.doris.alter; import org.apache.doris.alter.AlterJob.JobState; -import org.apache.doris.alter.DecommissionBackendJob.DecommissionType; import org.apache.doris.analysis.AddBackendClause; import org.apache.doris.analysis.AddFollowerClause; import org.apache.doris.analysis.AddObserverClause; @@ -34,18 +33,12 @@ import org.apache.doris.analysis.ModifyBrokerClause; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.OlapTable; -import org.apache.doris.catalog.Partition; -import org.apache.doris.catalog.PartitionInfo; -import org.apache.doris.catalog.Table; -import org.apache.doris.catalog.Table.TableType; -import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.Config; +import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.Pair; import org.apache.doris.ha.FrontendNodeType; import org.apache.doris.system.Backend; -import org.apache.doris.system.Backend.BackendState; import org.apache.doris.system.SystemInfoService; import org.apache.doris.task.AgentTask; import org.apache.doris.thrift.TTabletInfo; @@ -53,16 +46,19 @@ import org.apache.doris.thrift.TTabletInfo; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import org.apache.commons.lang.NotImplementedException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.List; -import java.util.Map; -import java.util.Map.Entry; +/* + * 
SystemHandler is for + * 1. add/drop/decommisson backends + * 2. add/drop frontends + * 3. add/drop/modify brokers + */ public class SystemHandler extends AlterHandler { private static final Logger LOG = LogManager.getLogger(SystemHandler.class); @@ -73,59 +69,48 @@ public class SystemHandler extends AlterHandler { @Override public void handleFinishedReplica(AgentTask task, TTabletInfo finishTabletInfo, long reportVersion) throws MetaNotFoundException { - AlterJob alterJob = getAlterJob(-1L); - if (alterJob == null) { - throw new MetaNotFoundException("Cannot find " + task.getTaskType().name() + " job"); - } - alterJob.handleFinishedReplica(task, finishTabletInfo, reportVersion); } @Override protected void runAfterCatalogReady() { super.runAfterCatalogReady(); + runOldAlterJob(); + runAlterJobV2(); + } - List cancelledJobs = Lists.newArrayList(); - List finishedJobs = Lists.newArrayList(); + @Deprecated + private void runOldAlterJob() { + // just remove all old decommission jobs. the decommission state is already marked in Backend, + // and we no long need decommission job. + alterJobs.clear(); + finishedOrCancelledAlterJobs.clear(); + } - for (AlterJob alterJob : alterJobs.values()) { - AlterJob decommissionBackendJob = (DecommissionBackendJob) alterJob; - JobState state = decommissionBackendJob.getState(); - switch (state) { - case PENDING: { - // send tasks - decommissionBackendJob.sendTasks(); - break; - } - case RUNNING: { - // no timeout - // try finish job - decommissionBackendJob.tryFinishJob(); - break; - } - case FINISHED: { - // remove from alterJobs - finishedJobs.add(decommissionBackendJob); - break; - } - case CANCELLED: { - Preconditions.checkState(false); - break; - } - default: - Preconditions.checkState(false); - break; + // check all decommissioned backends, if there is no tablet on that backend, drop it. 
+ private void runAlterJobV2() { + SystemInfoService systemInfoService = Catalog.getCurrentSystemInfo(); + TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex(); + // check if decommission is finished + for (Long beId : systemInfoService.getBackendIds(false)) { + Backend backend = systemInfoService.getBackend(beId); + if (backend == null || !backend.isDecommissioned()) { + continue; } - } // end for jobs - // handle cancelled jobs - for (AlterJob dropBackendJob : cancelledJobs) { - dropBackendJob.cancel(null, "cancelled"); - jobDone(dropBackendJob); - } + List backendTabletIds = invertedIndex.getTabletIdsByBackendId(beId); + if (backendTabletIds.isEmpty()) { + try { + systemInfoService.dropBackend(beId); + LOG.info("no tablet on decommission backend {}, drop it", beId); + } catch (DdlException e) { + // does not matter, may be backend not exist + LOG.info("backend {} is dropped failed after decommission {}", beId, e.getMessage()); + } + continue; + } - // handle finished jobs - for (AlterJob dropBackendJob : finishedJobs) { - jobDone(dropBackendJob); + LOG.info("backend {} lefts {} replicas to decommission: {}", beId, backendTabletIds.size(), + backendTabletIds.size() <= 20 ? 
backendTabletIds : "too many"); } } @@ -135,13 +120,14 @@ public class SystemHandler extends AlterHandler { } @Override - // add synchronized to avoid process 2 or more stmt at same time + // add synchronized to avoid process 2 or more stmts at same time public synchronized void process(List alterClauses, String clusterName, Database dummyDb, OlapTable dummyTbl) throws DdlException { Preconditions.checkArgument(alterClauses.size() == 1); AlterClause alterClause = alterClauses.get(0); if (alterClause instanceof AddBackendClause) { + // add backend AddBackendClause addBackendClause = (AddBackendClause) alterClause; final String destClusterName = addBackendClause.getDestCluster(); @@ -152,6 +138,7 @@ public class SystemHandler extends AlterHandler { Catalog.getCurrentSystemInfo().addBackends(addBackendClause.getHostPortPairs(), addBackendClause.isFree(), addBackendClause.getDestCluster()); } else if (alterClause instanceof DropBackendClause) { + // drop backend DropBackendClause dropBackendClause = (DropBackendClause) alterClause; if (!dropBackendClause.isForce()) { throw new DdlException("It is highly NOT RECOMMENDED to use DROP BACKEND stmt." 
@@ -161,33 +148,20 @@ public class SystemHandler extends AlterHandler { } Catalog.getCurrentSystemInfo().dropBackends(dropBackendClause.getHostPortPairs()); } else if (alterClause instanceof DecommissionBackendClause) { + // decommission DecommissionBackendClause decommissionBackendClause = (DecommissionBackendClause) alterClause; - // check request - // clusterName -> (beId -> backend) - Map> clusterBackendsMap = checkDecommission(decommissionBackendClause); + List decommissionBackends = checkDecommission(decommissionBackendClause); // set backend's state as 'decommissioned' - for (Map backends : clusterBackendsMap.values()) { - for (Backend backend : backends.values()) { - if (((DecommissionBackendClause) alterClause).getType() == DecommissionType.ClusterDecommission) { - backend.setBackendState(BackendState.offline); - } - backend.setDecommissioned(true); - backend.setDecommissionType(((DecommissionBackendClause) alterClause).getType()); - Catalog.getInstance().getEditLog().logBackendStateChange(backend); - } + // for decommission operation, here is no decommission job. the system handler will check + // all backend in decommission state + for (Backend backend : decommissionBackends) { + backend.setDecommissioned(true); + Catalog.getCurrentCatalog().getEditLog().logBackendStateChange(backend); + LOG.info("set backend {} to decommission", backend.getId()); } - // add job - long jobId = Catalog.getInstance().getNextId(); - DecommissionBackendJob decommissionBackendJob = new DecommissionBackendJob(jobId, clusterBackendsMap); - decommissionBackendJob.setDecommissionType(decommissionBackendClause.getType()); - addAlterJob(decommissionBackendJob); - - // log - Catalog.getInstance().getEditLog().logStartDecommissionBackend(decommissionBackendJob); - LOG.info("decommission backend job[{}] created. 
{}", jobId, decommissionBackendClause.toSql()); } else if (alterClause instanceof AddObserverClause) { AddObserverClause clause = (AddObserverClause) alterClause; Catalog.getInstance().addFrontend(FrontendNodeType.OBSERVER, clause.getHost(), clause.getPort()); @@ -211,126 +185,52 @@ public class SystemHandler extends AlterHandler { } } - private Map> checkDecommission(DecommissionBackendClause decommissionBackendClause) + private List checkDecommission(DecommissionBackendClause decommissionBackendClause) throws DdlException { return checkDecommission(decommissionBackendClause.getHostPortPairs()); } - public static Map> checkDecommission(List> hostPortPairs) + /* + * check if the specified backends can be decommissioned + * 1. backend should exist. + * 2. after decommission, the remaining backend num should meet the replication num. + * 3. after decommission, The remaining space capacity can store data on decommissioned backends. + */ + public static List checkDecommission(List> hostPortPairs) throws DdlException { - // check - Catalog.getCurrentSystemInfo().checkBackendsExist(hostPortPairs); - - // in Multi-Tenancy , we will check decommission in every cluster - // check if backend is under decommissioned - - // clusterName -> (beId -> backend) - final Map> decommClusterBackendsMap = Maps.newHashMap(); + SystemInfoService infoService = Catalog.getCurrentSystemInfo(); + List decommissionBackends = Lists.newArrayList(); + // check if exist for (Pair pair : hostPortPairs) { - Backend backend = Catalog.getCurrentSystemInfo().getBackendWithHeartbeatPort(pair.first, pair.second); - + Backend backend = infoService.getBackendWithHeartbeatPort(pair.first, pair.second); + if (backend == null) { + throw new DdlException("Backend does not exist[" + pair.first + ":" + pair.second + "]"); + } if (backend.isDecommissioned()) { - // it's ok to resend decommission command. 
just log - LOG.info("backend[{}] is already under decommissioned.", backend.getHost()); + // already under decommission, ignore it + continue; } - - Map backends = decommClusterBackendsMap.get(backend.getOwnerClusterName()); - if (backends == null) { - backends = Maps.newHashMap(); - decommClusterBackendsMap.put(backend.getOwnerClusterName(), backends); - } - backends.put(backend.getId(), backend); + decommissionBackends.add(backend); } - for (String clusterName : decommClusterBackendsMap.keySet()) { - // check available capacity for decommission. - // we need to make sure that there is enough space in this cluster - // to store the data from decommissioned backends. - long totalAvailableCapacityB = 0L; - long totalNeededCapacityB = 0L; // decommission + dead - int availableBackendNum = 0; - final Map decommBackendsInCluster = decommClusterBackendsMap.get(clusterName); + // TODO(cmy): check if replication num can be met + // TODO(cmy): check remaining space - // get all backends in this cluster - final Map idToBackendsInCluster = - Catalog.getCurrentSystemInfo().getBackendsInCluster(clusterName); - for (Entry entry : idToBackendsInCluster.entrySet()) { - long backendId = entry.getKey(); - Backend backend = entry.getValue(); - if (decommBackendsInCluster.containsKey(backendId) - || !backend.isAvailable()) { - totalNeededCapacityB += backend.getDataUsedCapacityB(); - } else { - ++availableBackendNum; - totalAvailableCapacityB += backend.getAvailableCapacityB(); - } - } - - // if the space we needed is larger than the current available capacity * 0.85, - // we refuse this decommission operation. 
- if (totalNeededCapacityB > totalAvailableCapacityB * (Config.storage_high_watermark_usage_percent / 100.0)) { - throw new DdlException("No available capacity for decommission in cluster: " + clusterName - + ", needed: " + totalNeededCapacityB + ", available: " + totalAvailableCapacityB - + ", threshold: " + Config.storage_high_watermark_usage_percent); - } - - // backend num not enough - if (availableBackendNum == 0) { - throw new DdlException("No available backend"); - } - - // check if meet replication number requirement - List dbNames; - try { - dbNames = Catalog.getInstance().getClusterDbNames(clusterName); - } catch (AnalysisException e) { - throw new DdlException(e.getMessage()); - } - - for (String dbName : dbNames) { - Database db = Catalog.getInstance().getDb(dbName); - if (db == null) { - continue; - } - db.readLock(); - try { - for (Table table : db.getTables()) { - if (table.getType() != TableType.OLAP) { - continue; - } - OlapTable olapTable = (OlapTable) table; - PartitionInfo partitionInfo = olapTable.getPartitionInfo(); - for (Partition partition : olapTable.getPartitions()) { - short replicationNum = partitionInfo.getReplicationNum(partition.getId()); - if (availableBackendNum < replicationNum) { - throw new DdlException("Table[" + table.getName() + "] in database[" + dbName - + "] need more than " + replicationNum - + " available backends to meet replication num requirement"); - } - } - - } - } finally { - db.readUnlock(); - } - } - } - - return decommClusterBackendsMap; + return decommissionBackends; } @Override public synchronized void cancel(CancelStmt stmt) throws DdlException { - CancelAlterSystemStmt cancelAlterClusterStmt = (CancelAlterSystemStmt) stmt; - cancelAlterClusterStmt.getHostPortPairs(); + CancelAlterSystemStmt cancelAlterSystemStmt = (CancelAlterSystemStmt) stmt; + cancelAlterSystemStmt.getHostPortPairs(); - SystemInfoService clusterInfo = Catalog.getCurrentSystemInfo(); + SystemInfoService infoService = 
Catalog.getCurrentSystemInfo(); // check if backends is under decommission List backends = Lists.newArrayList(); - List> hostPortPairs = cancelAlterClusterStmt.getHostPortPairs(); + List> hostPortPairs = cancelAlterSystemStmt.getHostPortPairs(); for (Pair pair : hostPortPairs) { // check if exist - Backend backend = clusterInfo.getBackendWithHeartbeatPort(pair.first, pair.second); + Backend backend = infoService.getBackendWithHeartbeatPort(pair.first, pair.second); if (backend == null) { throw new DdlException("Backend does not exists[" + pair.first + "]"); } @@ -338,6 +238,7 @@ public class SystemHandler extends AlterHandler { if (!backend.isDecommissioned()) { // it's ok. just log LOG.info("backend is not decommissioned[{}]", pair.first); + continue; } backends.add(backend); diff --git a/fe/src/main/java/org/apache/doris/analysis/CancelAlterSystemStmt.java b/fe/src/main/java/org/apache/doris/analysis/CancelAlterSystemStmt.java index 526c37da1c..4e548c72f8 100644 --- a/fe/src/main/java/org/apache/doris/analysis/CancelAlterSystemStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/CancelAlterSystemStmt.java @@ -20,6 +20,7 @@ package org.apache.doris.analysis; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Pair; import org.apache.doris.system.SystemInfoService; + import com.google.common.base.Preconditions; import java.util.LinkedList; @@ -52,7 +53,7 @@ public class CancelAlterSystemStmt extends CancelStmt { @Override public String toSql() { StringBuilder sb = new StringBuilder(); - sb.append("CANCEL ALTER CLUSTER DECOMMISSION BACKEND "); + sb.append("CANCEL DECOMMISSION BACKEND "); for (int i = 0; i < hostPorts.size(); i++) { sb.append("\"").append(hostPorts.get(i)).append("\""); if (i != hostPorts.size() - 1) { diff --git a/fe/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/src/main/java/org/apache/doris/system/SystemInfoService.java index ca09bf7c38..e1292be56e 100644 --- 
a/fe/src/main/java/org/apache/doris/system/SystemInfoService.java +++ b/fe/src/main/java/org/apache/doris/system/SystemInfoService.java @@ -175,15 +175,6 @@ public class SystemInfoService { MetricRepo.generateTabletNumMetrics(); } - public void checkBackendsExist(List> hostPortPairs) throws DdlException { - for (Pair pair : hostPortPairs) { - // check if exist - if (getBackendWithHeartbeatPort(pair.first, pair.second) == null) { - throw new DdlException("Backend does not exist[" + pair.first + ":" + pair.second + "]"); - } - } - } - public void dropBackends(List> hostPortPairs) throws DdlException { for (Pair pair : hostPortPairs) { // check is already exist @@ -1046,6 +1037,13 @@ public class SystemInfoService { public void updateBackendState(Backend be) { long id = be.getId(); Backend memoryBe = getBackend(id); + if (memoryBe == null) { + // backend may already be dropped. this may happen when + // 1. SystemHandler drop the decommission backend + // 2. at same time, user try to cancel the decommission of that backend. + // These two operations do not guarantee the order. + return; + } memoryBe.setBePort(be.getBePort()); memoryBe.setAlive(be.isAlive()); memoryBe.setDecommissioned(be.isDecommissioned());