Remove old decommission job (#2326)

DecommissionJob is also a type of AlterJob.
When AlterJobV2 was introduced earlier, DecommissionJob was not updated accordingly.

In fact, the decommission operation does not need to generate a job; it only needs to mark the corresponding Backend as decommissioned. After that, the tablet repair logic will try to migrate the tablets off that Backend. SystemHandler then only needs to check all nodes marked as decommissioned and drop the ones that have been emptied.
This commit is contained in:
Mingyu Chen
2019-11-29 21:02:53 +08:00
committed by ZHAO Chun
parent 8787a59cb4
commit 5ac4f3468e
5 changed files with 87 additions and 187 deletions

View File

@ -319,7 +319,6 @@ public abstract class AlterHandler extends MasterDaemon {
/**
 * Starts this handler by delegating to the superclass implementation.
 */
@Override
public void start() {
// NOTE(review): the previous comment here said "register observer", but nothing is
// registered in this method; it only calls super.start().
super.start();
}

View File

@ -51,6 +51,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
@Deprecated
public class DecommissionBackendJob extends AlterJob {
public enum DecommissionType {

View File

@ -18,7 +18,6 @@
package org.apache.doris.alter;
import org.apache.doris.alter.AlterJob.JobState;
import org.apache.doris.alter.DecommissionBackendJob.DecommissionType;
import org.apache.doris.analysis.AddBackendClause;
import org.apache.doris.analysis.AddFollowerClause;
import org.apache.doris.analysis.AddObserverClause;
@ -34,18 +33,12 @@ import org.apache.doris.analysis.ModifyBrokerClause;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Table.TableType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Backend.BackendState;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.task.AgentTask;
import org.apache.doris.thrift.TTabletInfo;
@ -53,16 +46,19 @@ import org.apache.doris.thrift.TTabletInfo;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang.NotImplementedException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
/*
* SystemHandler is for
* 1. add/drop/decommission backends
* 2. add/drop frontends
* 3. add/drop/modify brokers
*/
public class SystemHandler extends AlterHandler {
private static final Logger LOG = LogManager.getLogger(SystemHandler.class);
@ -73,59 +69,48 @@ public class SystemHandler extends AlterHandler {
/**
 * Forwards a finished-replica report to the alter job returned by getAlterJob(-1L).
 *
 * @param task the finished agent task; its task type name is used in the error message
 * @param finishTabletInfo tablet info passed through unchanged to the job
 * @param reportVersion report version passed through unchanged to the job
 * @throws MetaNotFoundException if getAlterJob(-1L) returns null
 */
@Override
public void handleFinishedReplica(AgentTask task, TTabletInfo finishTabletInfo, long reportVersion)
throws MetaNotFoundException {
// the job is looked up with the fixed id -1
// NOTE(review): presumably the id under which the single system job is stored; confirm
AlterJob alterJob = getAlterJob(-1L);
if (alterJob == null) {
throw new MetaNotFoundException("Cannot find " + task.getTaskType().name() + " job");
}
// delegate the actual handling to the job itself
alterJob.handleFinishedReplica(task, finishTabletInfo, reportVersion);
}
/**
 * Runs the superclass hook first, then runOldAlterJob() followed by runAlterJobV2().
 */
@Override
protected void runAfterCatalogReady() {
super.runAfterCatalogReady();
// legacy decommission jobs are discarded before the backend check runs
runOldAlterJob();
runAlterJobV2();
}
List<AlterJob> cancelledJobs = Lists.newArrayList();
List<AlterJob> finishedJobs = Lists.newArrayList();
/**
 * Discards every legacy decommission job held by this handler by clearing both
 * alterJobs and finishedOrCancelledAlterJobs.
 */
@Deprecated
private void runOldAlterJob() {
// just remove all old decommission jobs. the decommission state is already marked in Backend,
// and we no longer need decommission jobs.
alterJobs.clear();
finishedOrCancelledAlterJobs.clear();
}
for (AlterJob alterJob : alterJobs.values()) {
AlterJob decommissionBackendJob = (DecommissionBackendJob) alterJob;
JobState state = decommissionBackendJob.getState();
switch (state) {
case PENDING: {
// send tasks
decommissionBackendJob.sendTasks();
break;
}
case RUNNING: {
// no timeout
// try finish job
decommissionBackendJob.tryFinishJob();
break;
}
case FINISHED: {
// remove from alterJobs
finishedJobs.add(decommissionBackendJob);
break;
}
case CANCELLED: {
Preconditions.checkState(false);
break;
}
default:
Preconditions.checkState(false);
break;
// check all decommissioned backends, if there is no tablet on that backend, drop it.
private void runAlterJobV2() {
SystemInfoService systemInfoService = Catalog.getCurrentSystemInfo();
TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex();
// check if decommission is finished
for (Long beId : systemInfoService.getBackendIds(false)) {
Backend backend = systemInfoService.getBackend(beId);
if (backend == null || !backend.isDecommissioned()) {
continue;
}
} // end for jobs
// handle cancelled jobs
for (AlterJob dropBackendJob : cancelledJobs) {
dropBackendJob.cancel(null, "cancelled");
jobDone(dropBackendJob);
}
List<Long> backendTabletIds = invertedIndex.getTabletIdsByBackendId(beId);
if (backendTabletIds.isEmpty()) {
try {
systemInfoService.dropBackend(beId);
LOG.info("no tablet on decommission backend {}, drop it", beId);
} catch (DdlException e) {
// does not matter, may be backend not exist
LOG.info("backend {} is dropped failed after decommission {}", beId, e.getMessage());
}
continue;
}
// handle finished jobs
for (AlterJob dropBackendJob : finishedJobs) {
jobDone(dropBackendJob);
LOG.info("backend {} lefts {} replicas to decommission: {}", beId, backendTabletIds.size(),
backendTabletIds.size() <= 20 ? backendTabletIds : "too many");
}
}
@ -135,13 +120,14 @@ public class SystemHandler extends AlterHandler {
}
@Override
// add synchronized to avoid process 2 or more stmt at same time
// add synchronized to avoid process 2 or more stmts at same time
public synchronized void process(List<AlterClause> alterClauses, String clusterName, Database dummyDb,
OlapTable dummyTbl) throws DdlException {
Preconditions.checkArgument(alterClauses.size() == 1);
AlterClause alterClause = alterClauses.get(0);
if (alterClause instanceof AddBackendClause) {
// add backend
AddBackendClause addBackendClause = (AddBackendClause) alterClause;
final String destClusterName = addBackendClause.getDestCluster();
@ -152,6 +138,7 @@ public class SystemHandler extends AlterHandler {
Catalog.getCurrentSystemInfo().addBackends(addBackendClause.getHostPortPairs(),
addBackendClause.isFree(), addBackendClause.getDestCluster());
} else if (alterClause instanceof DropBackendClause) {
// drop backend
DropBackendClause dropBackendClause = (DropBackendClause) alterClause;
if (!dropBackendClause.isForce()) {
throw new DdlException("It is highly NOT RECOMMENDED to use DROP BACKEND stmt."
@ -161,33 +148,20 @@ public class SystemHandler extends AlterHandler {
}
Catalog.getCurrentSystemInfo().dropBackends(dropBackendClause.getHostPortPairs());
} else if (alterClause instanceof DecommissionBackendClause) {
// decommission
DecommissionBackendClause decommissionBackendClause = (DecommissionBackendClause) alterClause;
// check request
// clusterName -> (beId -> backend)
Map<String, Map<Long, Backend>> clusterBackendsMap = checkDecommission(decommissionBackendClause);
List<Backend> decommissionBackends = checkDecommission(decommissionBackendClause);
// set backend's state as 'decommissioned'
for (Map<Long, Backend> backends : clusterBackendsMap.values()) {
for (Backend backend : backends.values()) {
if (((DecommissionBackendClause) alterClause).getType() == DecommissionType.ClusterDecommission) {
backend.setBackendState(BackendState.offline);
}
backend.setDecommissioned(true);
backend.setDecommissionType(((DecommissionBackendClause) alterClause).getType());
Catalog.getInstance().getEditLog().logBackendStateChange(backend);
}
// for decommission operation, here is no decommission job. the system handler will check
// all backend in decommission state
for (Backend backend : decommissionBackends) {
backend.setDecommissioned(true);
Catalog.getCurrentCatalog().getEditLog().logBackendStateChange(backend);
LOG.info("set backend {} to decommission", backend.getId());
}
// add job
long jobId = Catalog.getInstance().getNextId();
DecommissionBackendJob decommissionBackendJob = new DecommissionBackendJob(jobId, clusterBackendsMap);
decommissionBackendJob.setDecommissionType(decommissionBackendClause.getType());
addAlterJob(decommissionBackendJob);
// log
Catalog.getInstance().getEditLog().logStartDecommissionBackend(decommissionBackendJob);
LOG.info("decommission backend job[{}] created. {}", jobId, decommissionBackendClause.toSql());
} else if (alterClause instanceof AddObserverClause) {
AddObserverClause clause = (AddObserverClause) alterClause;
Catalog.getInstance().addFrontend(FrontendNodeType.OBSERVER, clause.getHost(), clause.getPort());
@ -211,126 +185,52 @@ public class SystemHandler extends AlterHandler {
}
}
private Map<String, Map<Long, Backend>> checkDecommission(DecommissionBackendClause decommissionBackendClause)
private List<Backend> checkDecommission(DecommissionBackendClause decommissionBackendClause)
throws DdlException {
return checkDecommission(decommissionBackendClause.getHostPortPairs());
}
public static Map<String, Map<Long, Backend>> checkDecommission(List<Pair<String, Integer>> hostPortPairs)
/*
* check if the specified backends can be decommissioned
* 1. backend should exist.
* 2. after decommission, the remaining backend num should meet the replication num.
* 3. after decommission, the remaining space capacity should be able to store the data from the decommissioned backends.
*/
public static List<Backend> checkDecommission(List<Pair<String, Integer>> hostPortPairs)
throws DdlException {
// check
Catalog.getCurrentSystemInfo().checkBackendsExist(hostPortPairs);
// in Multi-Tenancy , we will check decommission in every cluster
// check if backend is under decommissioned
// clusterName -> (beId -> backend)
final Map<String, Map<Long, Backend>> decommClusterBackendsMap = Maps.newHashMap();
SystemInfoService infoService = Catalog.getCurrentSystemInfo();
List<Backend> decommissionBackends = Lists.newArrayList();
// check if exist
for (Pair<String, Integer> pair : hostPortPairs) {
Backend backend = Catalog.getCurrentSystemInfo().getBackendWithHeartbeatPort(pair.first, pair.second);
Backend backend = infoService.getBackendWithHeartbeatPort(pair.first, pair.second);
if (backend == null) {
throw new DdlException("Backend does not exist[" + pair.first + ":" + pair.second + "]");
}
if (backend.isDecommissioned()) {
// it's ok to resend decommission command. just log
LOG.info("backend[{}] is already under decommissioned.", backend.getHost());
// already under decommission, ignore it
continue;
}
Map<Long, Backend> backends = decommClusterBackendsMap.get(backend.getOwnerClusterName());
if (backends == null) {
backends = Maps.newHashMap();
decommClusterBackendsMap.put(backend.getOwnerClusterName(), backends);
}
backends.put(backend.getId(), backend);
decommissionBackends.add(backend);
}
for (String clusterName : decommClusterBackendsMap.keySet()) {
// check available capacity for decommission.
// we need to make sure that there is enough space in this cluster
// to store the data from decommissioned backends.
long totalAvailableCapacityB = 0L;
long totalNeededCapacityB = 0L; // decommission + dead
int availableBackendNum = 0;
final Map<Long, Backend> decommBackendsInCluster = decommClusterBackendsMap.get(clusterName);
// TODO(cmy): check if replication num can be met
// TODO(cmy): check remaining space
// get all backends in this cluster
final Map<Long, Backend> idToBackendsInCluster =
Catalog.getCurrentSystemInfo().getBackendsInCluster(clusterName);
for (Entry<Long, Backend> entry : idToBackendsInCluster.entrySet()) {
long backendId = entry.getKey();
Backend backend = entry.getValue();
if (decommBackendsInCluster.containsKey(backendId)
|| !backend.isAvailable()) {
totalNeededCapacityB += backend.getDataUsedCapacityB();
} else {
++availableBackendNum;
totalAvailableCapacityB += backend.getAvailableCapacityB();
}
}
// if the space we needed is larger than the current available capacity * 0.85,
// we refuse this decommission operation.
if (totalNeededCapacityB > totalAvailableCapacityB * (Config.storage_high_watermark_usage_percent / 100.0)) {
throw new DdlException("No available capacity for decommission in cluster: " + clusterName
+ ", needed: " + totalNeededCapacityB + ", available: " + totalAvailableCapacityB
+ ", threshold: " + Config.storage_high_watermark_usage_percent);
}
// backend num not enough
if (availableBackendNum == 0) {
throw new DdlException("No available backend");
}
// check if meet replication number requirement
List<String> dbNames;
try {
dbNames = Catalog.getInstance().getClusterDbNames(clusterName);
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
for (String dbName : dbNames) {
Database db = Catalog.getInstance().getDb(dbName);
if (db == null) {
continue;
}
db.readLock();
try {
for (Table table : db.getTables()) {
if (table.getType() != TableType.OLAP) {
continue;
}
OlapTable olapTable = (OlapTable) table;
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
for (Partition partition : olapTable.getPartitions()) {
short replicationNum = partitionInfo.getReplicationNum(partition.getId());
if (availableBackendNum < replicationNum) {
throw new DdlException("Table[" + table.getName() + "] in database[" + dbName
+ "] need more than " + replicationNum
+ " available backends to meet replication num requirement");
}
}
}
} finally {
db.readUnlock();
}
}
}
return decommClusterBackendsMap;
return decommissionBackends;
}
@Override
public synchronized void cancel(CancelStmt stmt) throws DdlException {
CancelAlterSystemStmt cancelAlterClusterStmt = (CancelAlterSystemStmt) stmt;
cancelAlterClusterStmt.getHostPortPairs();
CancelAlterSystemStmt cancelAlterSystemStmt = (CancelAlterSystemStmt) stmt;
cancelAlterSystemStmt.getHostPortPairs();
SystemInfoService clusterInfo = Catalog.getCurrentSystemInfo();
SystemInfoService infoService = Catalog.getCurrentSystemInfo();
// check if backends is under decommission
List<Backend> backends = Lists.newArrayList();
List<Pair<String, Integer>> hostPortPairs = cancelAlterClusterStmt.getHostPortPairs();
List<Pair<String, Integer>> hostPortPairs = cancelAlterSystemStmt.getHostPortPairs();
for (Pair<String, Integer> pair : hostPortPairs) {
// check if exist
Backend backend = clusterInfo.getBackendWithHeartbeatPort(pair.first, pair.second);
Backend backend = infoService.getBackendWithHeartbeatPort(pair.first, pair.second);
if (backend == null) {
throw new DdlException("Backend does not exists[" + pair.first + "]");
}
@ -338,6 +238,7 @@ public class SystemHandler extends AlterHandler {
if (!backend.isDecommissioned()) {
// it's ok. just log
LOG.info("backend is not decommissioned[{}]", pair.first);
continue;
}
backends.add(backend);

View File

@ -20,6 +20,7 @@ package org.apache.doris.analysis;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Pair;
import org.apache.doris.system.SystemInfoService;
import com.google.common.base.Preconditions;
import java.util.LinkedList;
@ -52,7 +53,7 @@ public class CancelAlterSystemStmt extends CancelStmt {
@Override
public String toSql() {
StringBuilder sb = new StringBuilder();
sb.append("CANCEL ALTER CLUSTER DECOMMISSION BACKEND ");
sb.append("CANCEL DECOMMISSION BACKEND ");
for (int i = 0; i < hostPorts.size(); i++) {
sb.append("\"").append(hostPorts.get(i)).append("\"");
if (i != hostPorts.size() - 1) {

View File

@ -175,15 +175,6 @@ public class SystemInfoService {
MetricRepo.generateTabletNumMetrics();
}
/**
 * Verifies that every given (host, heartbeat port) pair resolves to a backend.
 *
 * @param hostPortPairs the (host, heartbeat port) pairs to look up
 * @throws DdlException for the first pair for which getBackendWithHeartbeatPort returns null
 */
public void checkBackendsExist(List<Pair<String, Integer>> hostPortPairs) throws DdlException {
    for (Pair<String, Integer> hostPort : hostPortPairs) {
        boolean known = getBackendWithHeartbeatPort(hostPort.first, hostPort.second) != null;
        if (known) {
            continue;
        }
        // fail on the first pair that does not resolve to a backend
        throw new DdlException("Backend does not exist[" + hostPort.first + ":" + hostPort.second + "]");
    }
}
public void dropBackends(List<Pair<String, Integer>> hostPortPairs) throws DdlException {
for (Pair<String, Integer> pair : hostPortPairs) {
// check is already exist
@ -1046,6 +1037,13 @@ public class SystemInfoService {
public void updateBackendState(Backend be) {
long id = be.getId();
Backend memoryBe = getBackend(id);
if (memoryBe == null) {
// backend may already be dropped. this may happen when
// 1. SystemHandler drops the decommissioned backend, and
// 2. at the same time, the user tries to cancel the decommission of that backend.
// The order of these two operations is not guaranteed.
return;
}
memoryBe.setBePort(be.getBePort());
memoryBe.setAlive(be.isAlive());
memoryBe.setDecommissioned(be.isDecommissioned());