diff --git a/fe/src/main/java/org/apache/doris/alter/AlterHandler.java b/fe/src/main/java/org/apache/doris/alter/AlterHandler.java index 394e363434..971b1200b0 100644 --- a/fe/src/main/java/org/apache/doris/alter/AlterHandler.java +++ b/fe/src/main/java/org/apache/doris/alter/AlterHandler.java @@ -319,7 +319,6 @@ public abstract class AlterHandler extends MasterDaemon { @Override public void start() { - // register observer super.start(); } diff --git a/fe/src/main/java/org/apache/doris/alter/DecommissionBackendJob.java b/fe/src/main/java/org/apache/doris/alter/DecommissionBackendJob.java index 68badee985..5357ef88ee 100644 --- a/fe/src/main/java/org/apache/doris/alter/DecommissionBackendJob.java +++ b/fe/src/main/java/org/apache/doris/alter/DecommissionBackendJob.java @@ -51,6 +51,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +@Deprecated public class DecommissionBackendJob extends AlterJob { public enum DecommissionType { diff --git a/fe/src/main/java/org/apache/doris/alter/SystemHandler.java b/fe/src/main/java/org/apache/doris/alter/SystemHandler.java index 181cf5db29..d40db137a2 100644 --- a/fe/src/main/java/org/apache/doris/alter/SystemHandler.java +++ b/fe/src/main/java/org/apache/doris/alter/SystemHandler.java @@ -18,7 +18,6 @@ package org.apache.doris.alter; import org.apache.doris.alter.AlterJob.JobState; -import org.apache.doris.alter.DecommissionBackendJob.DecommissionType; import org.apache.doris.analysis.AddBackendClause; import org.apache.doris.analysis.AddFollowerClause; import org.apache.doris.analysis.AddObserverClause; @@ -34,18 +33,12 @@ import org.apache.doris.analysis.ModifyBrokerClause; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.OlapTable; -import org.apache.doris.catalog.Partition; -import org.apache.doris.catalog.PartitionInfo; -import org.apache.doris.catalog.Table; -import org.apache.doris.catalog.Table.TableType; -import 
org.apache.doris.common.AnalysisException; -import org.apache.doris.common.Config; +import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.Pair; import org.apache.doris.ha.FrontendNodeType; import org.apache.doris.system.Backend; -import org.apache.doris.system.Backend.BackendState; import org.apache.doris.system.SystemInfoService; import org.apache.doris.task.AgentTask; import org.apache.doris.thrift.TTabletInfo; @@ -53,16 +46,19 @@ import org.apache.doris.thrift.TTabletInfo; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import org.apache.commons.lang.NotImplementedException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.List; -import java.util.Map; -import java.util.Map.Entry; +/* + * SystemHandler is for + * 1. add/drop/decommission backends + * 2. add/drop frontends + * 3. add/drop/modify brokers + */ public class SystemHandler extends AlterHandler { private static final Logger LOG = LogManager.getLogger(SystemHandler.class); @@ -73,59 +69,48 @@ public class SystemHandler extends AlterHandler { @Override public void handleFinishedReplica(AgentTask task, TTabletInfo finishTabletInfo, long reportVersion) throws MetaNotFoundException { - AlterJob alterJob = getAlterJob(-1L); - if (alterJob == null) { - throw new MetaNotFoundException("Cannot find " + task.getTaskType().name() + " job"); - } - alterJob.handleFinishedReplica(task, finishTabletInfo, reportVersion); } @Override protected void runAfterCatalogReady() { super.runAfterCatalogReady(); + runOldAlterJob(); + runAlterJobV2(); + } - List cancelledJobs = Lists.newArrayList(); - List finishedJobs = Lists.newArrayList(); + @Deprecated + private void runOldAlterJob() { + // just remove all old decommission jobs. 
the decommission state is already marked in Backend, + // and we no longer need the decommission job. + alterJobs.clear(); + finishedOrCancelledAlterJobs.clear(); + } - for (AlterJob alterJob : alterJobs.values()) { - AlterJob decommissionBackendJob = (DecommissionBackendJob) alterJob; - JobState state = decommissionBackendJob.getState(); - switch (state) { - case PENDING: { - // send tasks - decommissionBackendJob.sendTasks(); - break; - } - case RUNNING: { - // no timeout - // try finish job - decommissionBackendJob.tryFinishJob(); - break; - } - case FINISHED: { - // remove from alterJobs - finishedJobs.add(decommissionBackendJob); - break; - } - case CANCELLED: { - Preconditions.checkState(false); - break; - } - default: - Preconditions.checkState(false); - break; + // check all decommissioned backends, if there is no tablet on that backend, drop it. + private void runAlterJobV2() { + SystemInfoService systemInfoService = Catalog.getCurrentSystemInfo(); + TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex(); + // check if decommission is finished + for (Long beId : systemInfoService.getBackendIds(false)) { + Backend backend = systemInfoService.getBackend(beId); + if (backend == null || !backend.isDecommissioned()) { + continue; } - } // end for jobs - // handle cancelled jobs - for (AlterJob dropBackendJob : cancelledJobs) { - dropBackendJob.cancel(null, "cancelled"); - jobDone(dropBackendJob); - } + List backendTabletIds = invertedIndex.getTabletIdsByBackendId(beId); + if (backendTabletIds.isEmpty()) { + try { + systemInfoService.dropBackend(beId); + LOG.info("no tablet on decommission backend {}, drop it", beId); + } catch (DdlException e) { + // does not matter, maybe the backend does not exist + LOG.info("backend {} is dropped failed after decommission {}", beId, e.getMessage()); + } + continue; + } - // handle finished jobs - for (AlterJob dropBackendJob : finishedJobs) { - jobDone(dropBackendJob); + LOG.info("backend {} lefts {} replicas to 
decommission: {}", beId, backendTabletIds.size(), + backendTabletIds.size() <= 20 ? backendTabletIds : "too many"); } } @@ -135,13 +120,14 @@ public class SystemHandler extends AlterHandler { } @Override - // add synchronized to avoid process 2 or more stmt at same time + // add synchronized to avoid process 2 or more stmts at same time public synchronized void process(List alterClauses, String clusterName, Database dummyDb, OlapTable dummyTbl) throws DdlException { Preconditions.checkArgument(alterClauses.size() == 1); AlterClause alterClause = alterClauses.get(0); if (alterClause instanceof AddBackendClause) { + // add backend AddBackendClause addBackendClause = (AddBackendClause) alterClause; final String destClusterName = addBackendClause.getDestCluster(); @@ -152,6 +138,7 @@ public class SystemHandler extends AlterHandler { Catalog.getCurrentSystemInfo().addBackends(addBackendClause.getHostPortPairs(), addBackendClause.isFree(), addBackendClause.getDestCluster()); } else if (alterClause instanceof DropBackendClause) { + // drop backend DropBackendClause dropBackendClause = (DropBackendClause) alterClause; if (!dropBackendClause.isForce()) { throw new DdlException("It is highly NOT RECOMMENDED to use DROP BACKEND stmt." 
@@ -161,33 +148,20 @@ public class SystemHandler extends AlterHandler { } Catalog.getCurrentSystemInfo().dropBackends(dropBackendClause.getHostPortPairs()); } else if (alterClause instanceof DecommissionBackendClause) { + // decommission DecommissionBackendClause decommissionBackendClause = (DecommissionBackendClause) alterClause; - // check request - // clusterName -> (beId -> backend) - Map> clusterBackendsMap = checkDecommission(decommissionBackendClause); + List decommissionBackends = checkDecommission(decommissionBackendClause); // set backend's state as 'decommissioned' - for (Map backends : clusterBackendsMap.values()) { - for (Backend backend : backends.values()) { - if (((DecommissionBackendClause) alterClause).getType() == DecommissionType.ClusterDecommission) { - backend.setBackendState(BackendState.offline); - } - backend.setDecommissioned(true); - backend.setDecommissionType(((DecommissionBackendClause) alterClause).getType()); - Catalog.getInstance().getEditLog().logBackendStateChange(backend); - } + // for decommission operation, there is no decommission job. the system handler will check + // all backends in decommission state + for (Backend backend : decommissionBackends) { + backend.setDecommissioned(true); + Catalog.getCurrentCatalog().getEditLog().logBackendStateChange(backend); + LOG.info("set backend {} to decommission", backend.getId()); } - // add job - long jobId = Catalog.getInstance().getNextId(); - DecommissionBackendJob decommissionBackendJob = new DecommissionBackendJob(jobId, clusterBackendsMap); - decommissionBackendJob.setDecommissionType(decommissionBackendClause.getType()); - addAlterJob(decommissionBackendJob); - - // log - Catalog.getInstance().getEditLog().logStartDecommissionBackend(decommissionBackendJob); - LOG.info("decommission backend job[{}] created. 
{}", jobId, decommissionBackendClause.toSql()); } else if (alterClause instanceof AddObserverClause) { AddObserverClause clause = (AddObserverClause) alterClause; Catalog.getInstance().addFrontend(FrontendNodeType.OBSERVER, clause.getHost(), clause.getPort()); @@ -211,126 +185,52 @@ public class SystemHandler extends AlterHandler { } } - private Map> checkDecommission(DecommissionBackendClause decommissionBackendClause) + private List checkDecommission(DecommissionBackendClause decommissionBackendClause) throws DdlException { return checkDecommission(decommissionBackendClause.getHostPortPairs()); } - public static Map> checkDecommission(List> hostPortPairs) + /* + * check if the specified backends can be decommissioned + * 1. backend should exist. + * 2. after decommission, the remaining backend num should meet the replication num. + * 3. after decommission, The remaining space capacity can store data on decommissioned backends. + */ + public static List checkDecommission(List> hostPortPairs) throws DdlException { - // check - Catalog.getCurrentSystemInfo().checkBackendsExist(hostPortPairs); - - // in Multi-Tenancy , we will check decommission in every cluster - // check if backend is under decommissioned - - // clusterName -> (beId -> backend) - final Map> decommClusterBackendsMap = Maps.newHashMap(); + SystemInfoService infoService = Catalog.getCurrentSystemInfo(); + List decommissionBackends = Lists.newArrayList(); + // check if exist for (Pair pair : hostPortPairs) { - Backend backend = Catalog.getCurrentSystemInfo().getBackendWithHeartbeatPort(pair.first, pair.second); - + Backend backend = infoService.getBackendWithHeartbeatPort(pair.first, pair.second); + if (backend == null) { + throw new DdlException("Backend does not exist[" + pair.first + ":" + pair.second + "]"); + } if (backend.isDecommissioned()) { - // it's ok to resend decommission command. 
just log - LOG.info("backend[{}] is already under decommissioned.", backend.getHost()); + // already under decommission, ignore it + continue; } - - Map backends = decommClusterBackendsMap.get(backend.getOwnerClusterName()); - if (backends == null) { - backends = Maps.newHashMap(); - decommClusterBackendsMap.put(backend.getOwnerClusterName(), backends); - } - backends.put(backend.getId(), backend); + decommissionBackends.add(backend); } - for (String clusterName : decommClusterBackendsMap.keySet()) { - // check available capacity for decommission. - // we need to make sure that there is enough space in this cluster - // to store the data from decommissioned backends. - long totalAvailableCapacityB = 0L; - long totalNeededCapacityB = 0L; // decommission + dead - int availableBackendNum = 0; - final Map decommBackendsInCluster = decommClusterBackendsMap.get(clusterName); + // TODO(cmy): check if replication num can be met + // TODO(cmy): check remaining space - // get all backends in this cluster - final Map idToBackendsInCluster = - Catalog.getCurrentSystemInfo().getBackendsInCluster(clusterName); - for (Entry entry : idToBackendsInCluster.entrySet()) { - long backendId = entry.getKey(); - Backend backend = entry.getValue(); - if (decommBackendsInCluster.containsKey(backendId) - || !backend.isAvailable()) { - totalNeededCapacityB += backend.getDataUsedCapacityB(); - } else { - ++availableBackendNum; - totalAvailableCapacityB += backend.getAvailableCapacityB(); - } - } - - // if the space we needed is larger than the current available capacity * 0.85, - // we refuse this decommission operation. 
- if (totalNeededCapacityB > totalAvailableCapacityB * (Config.storage_high_watermark_usage_percent / 100.0)) { - throw new DdlException("No available capacity for decommission in cluster: " + clusterName - + ", needed: " + totalNeededCapacityB + ", available: " + totalAvailableCapacityB - + ", threshold: " + Config.storage_high_watermark_usage_percent); - } - - // backend num not enough - if (availableBackendNum == 0) { - throw new DdlException("No available backend"); - } - - // check if meet replication number requirement - List dbNames; - try { - dbNames = Catalog.getInstance().getClusterDbNames(clusterName); - } catch (AnalysisException e) { - throw new DdlException(e.getMessage()); - } - - for (String dbName : dbNames) { - Database db = Catalog.getInstance().getDb(dbName); - if (db == null) { - continue; - } - db.readLock(); - try { - for (Table table : db.getTables()) { - if (table.getType() != TableType.OLAP) { - continue; - } - OlapTable olapTable = (OlapTable) table; - PartitionInfo partitionInfo = olapTable.getPartitionInfo(); - for (Partition partition : olapTable.getPartitions()) { - short replicationNum = partitionInfo.getReplicationNum(partition.getId()); - if (availableBackendNum < replicationNum) { - throw new DdlException("Table[" + table.getName() + "] in database[" + dbName - + "] need more than " + replicationNum - + " available backends to meet replication num requirement"); - } - } - - } - } finally { - db.readUnlock(); - } - } - } - - return decommClusterBackendsMap; + return decommissionBackends; } @Override public synchronized void cancel(CancelStmt stmt) throws DdlException { - CancelAlterSystemStmt cancelAlterClusterStmt = (CancelAlterSystemStmt) stmt; - cancelAlterClusterStmt.getHostPortPairs(); + CancelAlterSystemStmt cancelAlterSystemStmt = (CancelAlterSystemStmt) stmt; + cancelAlterSystemStmt.getHostPortPairs(); - SystemInfoService clusterInfo = Catalog.getCurrentSystemInfo(); + SystemInfoService infoService = 
Catalog.getCurrentSystemInfo(); // check if backends is under decommission List backends = Lists.newArrayList(); - List> hostPortPairs = cancelAlterClusterStmt.getHostPortPairs(); + List> hostPortPairs = cancelAlterSystemStmt.getHostPortPairs(); for (Pair pair : hostPortPairs) { // check if exist - Backend backend = clusterInfo.getBackendWithHeartbeatPort(pair.first, pair.second); + Backend backend = infoService.getBackendWithHeartbeatPort(pair.first, pair.second); if (backend == null) { throw new DdlException("Backend does not exists[" + pair.first + "]"); } @@ -338,6 +238,7 @@ public class SystemHandler extends AlterHandler { if (!backend.isDecommissioned()) { // it's ok. just log LOG.info("backend is not decommissioned[{}]", pair.first); + continue; } backends.add(backend); diff --git a/fe/src/main/java/org/apache/doris/analysis/CancelAlterSystemStmt.java b/fe/src/main/java/org/apache/doris/analysis/CancelAlterSystemStmt.java index 526c37da1c..4e548c72f8 100644 --- a/fe/src/main/java/org/apache/doris/analysis/CancelAlterSystemStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/CancelAlterSystemStmt.java @@ -20,6 +20,7 @@ package org.apache.doris.analysis; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Pair; import org.apache.doris.system.SystemInfoService; + import com.google.common.base.Preconditions; import java.util.LinkedList; @@ -52,7 +53,7 @@ public class CancelAlterSystemStmt extends CancelStmt { @Override public String toSql() { StringBuilder sb = new StringBuilder(); - sb.append("CANCEL ALTER CLUSTER DECOMMISSION BACKEND "); + sb.append("CANCEL DECOMMISSION BACKEND "); for (int i = 0; i < hostPorts.size(); i++) { sb.append("\"").append(hostPorts.get(i)).append("\""); if (i != hostPorts.size() - 1) { diff --git a/fe/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/src/main/java/org/apache/doris/system/SystemInfoService.java index ca09bf7c38..e1292be56e 100644 --- 
a/fe/src/main/java/org/apache/doris/system/SystemInfoService.java +++ b/fe/src/main/java/org/apache/doris/system/SystemInfoService.java @@ -175,15 +175,6 @@ public class SystemInfoService { MetricRepo.generateTabletNumMetrics(); } - public void checkBackendsExist(List> hostPortPairs) throws DdlException { - for (Pair pair : hostPortPairs) { - // check if exist - if (getBackendWithHeartbeatPort(pair.first, pair.second) == null) { - throw new DdlException("Backend does not exist[" + pair.first + ":" + pair.second + "]"); - } - } - } - public void dropBackends(List> hostPortPairs) throws DdlException { for (Pair pair : hostPortPairs) { // check is already exist @@ -1046,6 +1037,13 @@ public class SystemInfoService { public void updateBackendState(Backend be) { long id = be.getId(); Backend memoryBe = getBackend(id); + if (memoryBe == null) { + // backend may already be dropped. this may happen when + // 1. SystemHandler drops the decommissioned backend + // 2. at the same time, the user tries to cancel the decommission of that backend. + // The order of these two operations is not guaranteed. + return; + } memoryBe.setBePort(be.getBePort()); memoryBe.setAlive(be.isAlive()); memoryBe.setDecommissioned(be.isDecommissioned());