[feature](alter backends)backend modify & drop & decommission by ids (#25444)
This commit is contained in:
@ -36,8 +36,16 @@ The node offline operation is used to safely log off the node. The operation is
|
||||
|
||||
grammar:
|
||||
|
||||
- Find backend through host and port
|
||||
|
||||
```sql
|
||||
ALTER SYSTEM DECOMMISSION BACKEND "host:heartbeat_port"[,"host:heartbeat_port"...];
|
||||
````
|
||||
|
||||
- Find backend through backend_id
|
||||
|
||||
```sql
|
||||
ALTER SYSTEM DECOMMISSION BACKEND "id1","id2"...;
|
||||
````
|
||||
|
||||
illustrate:
|
||||
@ -55,6 +63,10 @@ ALTER SYSTEM DECOMMISSION BACKEND "host:heartbeat_port"[,"host:heartbeat_port"..
|
||||
ALTER SYSTEM DECOMMISSION BACKEND "host1:port", "host2:port";
|
||||
````
|
||||
|
||||
```sql
|
||||
ALTER SYSTEM DECOMMISSION BACKEND "id1", "id2";
|
||||
````
|
||||
|
||||
### Keywords
|
||||
|
||||
ALTER, SYSTEM, DECOMMISSION, BACKEND, ALTER SYSTEM
|
||||
|
||||
@ -36,9 +36,16 @@ This statement is used to delete the BACKEND node (administrator only!)
|
||||
|
||||
grammar:
|
||||
|
||||
- Find backend through host and port
|
||||
|
||||
```sql
|
||||
ALTER SYSTEM DROP BACKEND "host:heartbeat_port"[,"host:heartbeat_port"...]
|
||||
````
|
||||
- Find backend through backend_id
|
||||
|
||||
```sql
|
||||
ALTER SYSTEM DROP BACKEND "id1","id2"...;
|
||||
````
|
||||
|
||||
illustrate:
|
||||
|
||||
@ -54,6 +61,10 @@ illustrate:
|
||||
ALTER SYSTEM DROP BACKEND "host1:port", "host2:port";
|
||||
````
|
||||
|
||||
```sql
|
||||
ALTER SYSTEM DROP BACKEND "ids1", "ids2";
|
||||
````
|
||||
|
||||
### Keywords
|
||||
|
||||
ALTER, SYSTEM, DROP, BACKEND, ALTER SYSTEM
|
||||
|
||||
@ -37,8 +37,16 @@ Modify BE node properties (administrator only!)
|
||||
|
||||
grammar:
|
||||
|
||||
- Find backend through host and port
|
||||
|
||||
```sql
|
||||
ALTER SYSTEM MODIFY BACKEND "host:heartbeat_port" SET ("key" = "value"[, ...]);
|
||||
````
|
||||
|
||||
- Find backend through backend_id
|
||||
|
||||
```sql
|
||||
ALTER SYSTEM MODIFY BACKEND "id1" SET ("key" = "value"[, ...]);
|
||||
````
|
||||
|
||||
illustrate:
|
||||
@ -62,18 +70,31 @@ Note:
|
||||
ALTER SYSTEM MODIFY BACKEND "host1:heartbeat_port" SET ("tag.location" = "group_a");
|
||||
ALTER SYSTEM MODIFY BACKEND "host1:heartbeat_port" SET ("tag.location" = "group_a", "tag.compute" = "c1");
|
||||
````
|
||||
|
||||
```sql
|
||||
ALTER SYSTEM MODIFY BACKEND "id1" SET ("tag.location" = "group_a");
|
||||
ALTER SYSTEM MODIFY BACKEND "id1" SET ("tag.location" = "group_a", "tag.compute" = "c1");
|
||||
````
|
||||
|
||||
2. Modify the query disable property of BE
|
||||
|
||||
```sql
|
||||
ALTER SYSTEM MODIFY BACKEND "host1:heartbeat_port" SET ("disable_query" = "true");
|
||||
````
|
||||
|
||||
```sql
|
||||
ALTER SYSTEM MODIFY BACKEND "id1" SET ("disable_query" = "true");
|
||||
````
|
||||
|
||||
3. Modify the import disable property of BE
|
||||
|
||||
```sql
|
||||
ALTER SYSTEM MODIFY BACKEND "host1:heartbeat_port" SET ("disable_load" = "true");
|
||||
````
|
||||
|
||||
```sql
|
||||
ALTER SYSTEM MODIFY BACKEND "id1" SET ("disable_load" = "true");
|
||||
````
|
||||
|
||||
### Keywords
|
||||
|
||||
|
||||
@ -36,8 +36,16 @@ ALTER SYSTEM DECOMMISSION BACKEND
|
||||
|
||||
语法:
|
||||
|
||||
- 通过 host 和 port 查找 backend
|
||||
|
||||
```sql
|
||||
ALTER SYSTEM DECOMMISSION BACKEND "host:heartbeat_port"[,"host:heartbeat_port"...];
|
||||
```
|
||||
|
||||
- 通过 backend_id 查找 backend
|
||||
|
||||
```sql
|
||||
ALTER SYSTEM DECOMMISSION BACKEND "id1","id2"...;
|
||||
```
|
||||
|
||||
说明:
|
||||
@ -54,6 +62,10 @@ ALTER SYSTEM DECOMMISSION BACKEND "host:heartbeat_port"[,"host:heartbeat_port"..
|
||||
```sql
|
||||
ALTER SYSTEM DECOMMISSION BACKEND "host1:port", "host2:port";
|
||||
```
|
||||
|
||||
```sql
|
||||
ALTER SYSTEM DECOMMISSION BACKEND "id1", "id2";
|
||||
```
|
||||
|
||||
### Keywords
|
||||
|
||||
|
||||
@ -36,10 +36,18 @@ ALTER SYSTEM DROP BACKEND
|
||||
|
||||
语法:
|
||||
|
||||
- 通过 host 和 port 查找 backend
|
||||
|
||||
```sql
|
||||
ALTER SYSTEM DROP BACKEND "host:heartbeat_port"[,"host:heartbeat_port"...]
|
||||
```
|
||||
|
||||
- 通过 backend_id 查找 backend
|
||||
|
||||
```sql
|
||||
ALTER SYSTEM DROP BACKEND "id1","id2"...;
|
||||
```
|
||||
|
||||
说明:
|
||||
|
||||
1. host 可以是主机名或者ip地址
|
||||
@ -53,6 +61,10 @@ ALTER SYSTEM DROP BACKEND "host:heartbeat_port"[,"host:heartbeat_port"...]
|
||||
```sql
|
||||
ALTER SYSTEM DROP BACKEND "host1:port", "host2:port";
|
||||
```
|
||||
|
||||
```sql
|
||||
ALTER SYSTEM DROP BACKEND "id1", "id2";
|
||||
```
|
||||
|
||||
### Keywords
|
||||
|
||||
|
||||
@ -37,10 +37,18 @@ ALTER SYSTEM MKDIFY BACKEND
|
||||
|
||||
语法:
|
||||
|
||||
- 通过 host 和 port 查找 backend
|
||||
|
||||
```sql
|
||||
ALTER SYSTEM MODIFY BACKEND "host:heartbeat_port" SET ("key" = "value"[, ...]);
|
||||
```
|
||||
|
||||
- 通过 backend_id 查找 backend
|
||||
|
||||
```sql
|
||||
ALTER SYSTEM MODIFY BACKEND "id1" SET ("key" = "value"[, ...]);
|
||||
````
|
||||
|
||||
说明:
|
||||
|
||||
1. host 可以是主机名或者ip地址
|
||||
@ -63,16 +71,31 @@ ALTER SYSTEM MODIFY BACKEND "host:heartbeat_port" SET ("key" = "value"[, ...]);
|
||||
ALTER SYSTEM MODIFY BACKEND "host1:heartbeat_port" SET ("tag.location" = "group_a", "tag.compute" = "c1");
|
||||
```
|
||||
|
||||
```sql
|
||||
ALTER SYSTEM MODIFY BACKEND "id1" SET ("tag.location" = "group_a");
|
||||
ALTER SYSTEM MODIFY BACKEND "id1" SET ("tag.location" = "group_a", "tag.compute" = "c1");
|
||||
````
|
||||
|
||||
2. 修改 BE 的查询禁用属性
|
||||
|
||||
```sql
|
||||
ALTER SYSTEM MODIFY BACKEND "host1:heartbeat_port" SET ("disable_query" = "true");
|
||||
```
|
||||
|
||||
```sql
|
||||
ALTER SYSTEM MODIFY BACKEND "id1" SET ("disable_query" = "true");
|
||||
````
|
||||
|
||||
3. 修改 BE 的导入禁用属性
|
||||
|
||||
```sql
|
||||
ALTER SYSTEM MODIFY BACKEND "host1:heartbeat_port" SET ("disable_load" = "true");
|
||||
```
|
||||
|
||||
```sql
|
||||
ALTER SYSTEM MODIFY BACKEND "id1" SET ("disable_load" = "true");
|
||||
````
|
||||
|
||||
### Keywords
|
||||
|
||||
ALTER, SYSTEM, ADD, BACKEND, ALTER SYSTEM
|
||||
|
||||
@ -86,7 +86,7 @@ import com.google.common.collect.Maps;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@ -739,7 +739,7 @@ public class Alter {
|
||||
}
|
||||
|
||||
public void processAlterCluster(AlterSystemStmt stmt) throws UserException {
|
||||
clusterHandler.process(Arrays.asList(stmt.getAlterClause()), stmt.getClusterName(), null, null);
|
||||
clusterHandler.process(Collections.singletonList(stmt.getAlterClause()), stmt.getClusterName(), null, null);
|
||||
}
|
||||
|
||||
private void processRename(Database db, OlapTable table, List<AlterClause> alterClauses) throws DdlException {
|
||||
|
||||
@ -125,7 +125,13 @@ public class SystemHandler extends AlterHandler {
|
||||
+ "All data on this backend will be discarded permanently. "
|
||||
+ "If you insist, use DROPP instead of DROP");
|
||||
}
|
||||
Env.getCurrentSystemInfo().dropBackends(dropBackendClause.getHostInfos());
|
||||
if (dropBackendClause.getHostInfos().isEmpty()) {
|
||||
// drop by id
|
||||
Env.getCurrentSystemInfo().dropBackendsByIds(dropBackendClause.getIds());
|
||||
} else {
|
||||
// drop by host
|
||||
Env.getCurrentSystemInfo().dropBackends(dropBackendClause.getHostInfos());
|
||||
}
|
||||
} else if (alterClause instanceof DecommissionBackendClause) {
|
||||
// decommission
|
||||
DecommissionBackendClause decommissionBackendClause = (DecommissionBackendClause) alterClause;
|
||||
@ -192,6 +198,9 @@ public class SystemHandler extends AlterHandler {
|
||||
|
||||
private List<Backend> checkDecommission(DecommissionBackendClause decommissionBackendClause)
|
||||
throws DdlException {
|
||||
if (decommissionBackendClause.getHostInfos().isEmpty()) {
|
||||
return checkDecommissionByIds(decommissionBackendClause.getIds());
|
||||
}
|
||||
return checkDecommission(decommissionBackendClause.getHostInfos());
|
||||
}
|
||||
|
||||
@ -226,6 +235,29 @@ public class SystemHandler extends AlterHandler {
|
||||
return decommissionBackends;
|
||||
}
|
||||
|
||||
public static List<Backend> checkDecommissionByIds(List<String> ids)
|
||||
throws DdlException {
|
||||
SystemInfoService infoService = Env.getCurrentSystemInfo();
|
||||
List<Backend> decommissionBackends = Lists.newArrayList();
|
||||
// check if exist
|
||||
for (String id : ids) {
|
||||
Backend backend = infoService.getBackend(Long.parseLong(id));
|
||||
if (backend == null) {
|
||||
throw new DdlException("Backend does not exist, backend id is " + id);
|
||||
}
|
||||
if (backend.isDecommissioned()) {
|
||||
// already under decommission, ignore it
|
||||
continue;
|
||||
}
|
||||
decommissionBackends.add(backend);
|
||||
}
|
||||
|
||||
// TODO(cmy): check if replication num can be met
|
||||
// TODO(cmy): check remaining space
|
||||
|
||||
return decommissionBackends;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void cancel(CancelStmt stmt) throws DdlException {
|
||||
CancelAlterSystemStmt cancelAlterSystemStmt = (CancelAlterSystemStmt) stmt;
|
||||
|
||||
@ -23,12 +23,14 @@ import org.apache.doris.common.util.PropertyAnalyzer;
|
||||
import org.apache.doris.resource.Tag;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import lombok.Getter;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class AddBackendClause extends BackendClause {
|
||||
protected Map<String, String> properties = Maps.newHashMap();
|
||||
@Getter
|
||||
private Map<String, String> tagMap;
|
||||
|
||||
public AddBackendClause(List<String> hostPorts) {
|
||||
@ -43,10 +45,6 @@ public class AddBackendClause extends BackendClause {
|
||||
}
|
||||
}
|
||||
|
||||
public Map<String, String> getTagMap() {
|
||||
return tagMap;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void analyze(Analyzer analyzer) throws AnalysisException {
|
||||
super.analyze(analyzer);
|
||||
@ -69,9 +67,9 @@ public class AddBackendClause extends BackendClause {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("ADD ");
|
||||
sb.append("BACKEND ");
|
||||
for (int i = 0; i < hostPorts.size(); i++) {
|
||||
sb.append("\"").append(hostPorts.get(i)).append("\"");
|
||||
if (i != hostPorts.size() - 1) {
|
||||
for (int i = 0; i < params.size(); i++) {
|
||||
sb.append("\"").append(params.get(i)).append("\"");
|
||||
if (i != params.size() - 1) {
|
||||
sb.append(", ");
|
||||
}
|
||||
}
|
||||
|
||||
@ -25,19 +25,17 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import lombok.Getter;
|
||||
|
||||
@Getter
|
||||
public class AlterSystemStmt extends DdlStmt {
|
||||
|
||||
private AlterClause alterClause;
|
||||
private final AlterClause alterClause;
|
||||
|
||||
public AlterSystemStmt(AlterClause alterClause) {
|
||||
this.alterClause = alterClause;
|
||||
}
|
||||
|
||||
public AlterClause getAlterClause() {
|
||||
return alterClause;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void analyze(Analyzer analyzer) throws UserException {
|
||||
|
||||
@ -65,9 +63,7 @@ public class AlterSystemStmt extends DdlStmt {
|
||||
|
||||
@Override
|
||||
public String toSql() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("ALTER SYSTEM ").append(alterClause.toSql());
|
||||
return sb.toString();
|
||||
return "ALTER SYSTEM " + alterClause.toSql();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -24,37 +24,46 @@ import org.apache.doris.system.SystemInfoService.HostInfo;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
import lombok.Getter;
|
||||
import org.apache.commons.lang3.NotImplementedException;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class BackendClause extends AlterClause {
|
||||
protected List<String> hostPorts;
|
||||
protected List<String> params;
|
||||
@Getter
|
||||
protected List<HostInfo> hostInfos;
|
||||
|
||||
@Getter
|
||||
protected List<String> ids;
|
||||
|
||||
public static final String MUTLI_TAG_DISABLED_MSG = "Not support multi tags for Backend now. "
|
||||
+ "You can set 'enable_multi_tags=true' in fe.conf to enable this feature.";
|
||||
public static final String NEED_LOCATION_TAG_MSG
|
||||
= "Backend must have location type tag. Eg: 'tag.location' = 'xxx'.";
|
||||
|
||||
protected BackendClause(List<String> hostPorts) {
|
||||
protected BackendClause(List<String> params) {
|
||||
super(AlterOpType.ALTER_OTHER);
|
||||
this.hostPorts = hostPorts;
|
||||
this.params = params;
|
||||
this.ids = Lists.newArrayList();
|
||||
this.hostInfos = Lists.newArrayList();
|
||||
}
|
||||
|
||||
public List<HostInfo> getHostInfos() {
|
||||
return hostInfos;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void analyze(Analyzer analyzer) throws AnalysisException {
|
||||
for (String hostPort : hostPorts) {
|
||||
HostInfo hostInfo = SystemInfoService.getHostAndPort(hostPort);
|
||||
hostInfos.add(hostInfo);
|
||||
|
||||
for (String param : params) {
|
||||
if (!param.contains(":")) {
|
||||
ids.add(param);
|
||||
} else {
|
||||
HostInfo hostInfo = SystemInfoService.getHostAndPort(param);
|
||||
this.hostInfos.add(hostInfo);
|
||||
}
|
||||
|
||||
}
|
||||
Preconditions.checkState(!hostInfos.isEmpty());
|
||||
Preconditions.checkState(!this.hostInfos.isEmpty() || !this.ids.isEmpty(),
|
||||
"hostInfos or ids can not be empty");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -29,9 +29,9 @@ public class DecommissionBackendClause extends BackendClause {
|
||||
public String toSql() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("DECOMMISSION BACKEND ");
|
||||
for (int i = 0; i < hostPorts.size(); i++) {
|
||||
sb.append("\"").append(hostPorts.get(i)).append("\"");
|
||||
if (i != hostPorts.size() - 1) {
|
||||
for (int i = 0; i < params.size(); i++) {
|
||||
sb.append("\"").append(params.get(i)).append("\"");
|
||||
if (i != params.size() - 1) {
|
||||
sb.append(", ");
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,52 +17,31 @@
|
||||
|
||||
package org.apache.doris.analysis;
|
||||
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
import org.apache.doris.system.SystemInfoService.HostInfo;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import lombok.Getter;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Getter
|
||||
public class DropBackendClause extends BackendClause {
|
||||
private boolean force;
|
||||
private final boolean force;
|
||||
|
||||
public DropBackendClause(List<String> hostPorts) {
|
||||
super(hostPorts);
|
||||
public DropBackendClause(List<String> params) {
|
||||
super(params);
|
||||
this.force = true;
|
||||
}
|
||||
|
||||
public DropBackendClause(List<String> hostPorts, boolean force) {
|
||||
super(hostPorts);
|
||||
public DropBackendClause(List<String> params, boolean force) {
|
||||
super(params);
|
||||
this.force = force;
|
||||
}
|
||||
|
||||
public boolean isForce() {
|
||||
return force;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void analyze(Analyzer analyzer) throws AnalysisException {
|
||||
if (Config.enable_fqdn_mode) {
|
||||
for (String hostPort : hostPorts) {
|
||||
HostInfo hostInfo = SystemInfoService.getHostAndPort(hostPort);
|
||||
hostInfos.add(hostInfo);
|
||||
}
|
||||
Preconditions.checkState(!hostInfos.isEmpty());
|
||||
} else {
|
||||
super.analyze(analyzer);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toSql() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("DROP BACKEND ");
|
||||
for (int i = 0; i < hostPorts.size(); i++) {
|
||||
sb.append("\"").append(hostPorts.get(i)).append("\"");
|
||||
if (i != hostPorts.size() - 1) {
|
||||
for (int i = 0; i < params.size(); i++) {
|
||||
sb.append("\"").append(params.get(i)).append("\"");
|
||||
if (i != params.size() - 1) {
|
||||
sb.append(", ");
|
||||
}
|
||||
}
|
||||
|
||||
@ -23,14 +23,16 @@ import org.apache.doris.common.util.PropertyAnalyzer;
|
||||
import org.apache.doris.resource.Tag;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import lombok.Getter;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class ModifyBackendClause extends BackendClause {
|
||||
protected Map<String, String> properties = Maps.newHashMap();
|
||||
protected Map<String, String> properties;
|
||||
protected Map<String, String> analyzedProperties = Maps.newHashMap();
|
||||
@Getter
|
||||
private Map<String, String> tagMap = null;
|
||||
private Boolean isQueryDisabled = null;
|
||||
private Boolean isLoadDisabled = null;
|
||||
@ -74,10 +76,6 @@ public class ModifyBackendClause extends BackendClause {
|
||||
}
|
||||
}
|
||||
|
||||
public Map<String, String> getTagMap() {
|
||||
return tagMap;
|
||||
}
|
||||
|
||||
public Boolean isQueryDisabled() {
|
||||
return isQueryDisabled;
|
||||
}
|
||||
@ -90,9 +88,9 @@ public class ModifyBackendClause extends BackendClause {
|
||||
public String toSql() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("MODIFY BACKEND ");
|
||||
for (int i = 0; i < hostPorts.size(); i++) {
|
||||
sb.append("\"").append(hostPorts.get(i)).append("\"");
|
||||
if (i != hostPorts.size() - 1) {
|
||||
for (int i = 0; i < params.size(); i++) {
|
||||
sb.append("\"").append(params.get(i)).append("\"");
|
||||
if (i != params.size() - 1) {
|
||||
sb.append(", ");
|
||||
}
|
||||
}
|
||||
|
||||
@ -235,12 +235,21 @@ public class SystemInfoService {
|
||||
.getHostPortInAccessibleFormat(hostInfo.getHost(), hostInfo.getPort());
|
||||
throw new DdlException("backend does not exists[" + backendIdentifier + "]");
|
||||
}
|
||||
}
|
||||
for (HostInfo hostInfo : hostInfos) {
|
||||
dropBackend(hostInfo.getHost(), hostInfo.getPort());
|
||||
}
|
||||
}
|
||||
|
||||
public void dropBackendsByIds(List<String> ids) throws DdlException {
|
||||
|
||||
for (String id : ids) {
|
||||
if (getBackend(Long.parseLong(id)) == null) {
|
||||
throw new DdlException("backend does not exists[" + id + "]");
|
||||
}
|
||||
dropBackend(Long.parseLong(id));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// for decommission
|
||||
public void dropBackend(long backendId) throws DdlException {
|
||||
Backend backend = getBackend(backendId);
|
||||
@ -896,14 +905,26 @@ public class SystemInfoService {
|
||||
public void modifyBackends(ModifyBackendClause alterClause) throws UserException {
|
||||
List<HostInfo> hostInfos = alterClause.getHostInfos();
|
||||
List<Backend> backends = Lists.newArrayList();
|
||||
for (HostInfo hostInfo : hostInfos) {
|
||||
Backend be = getBackendWithHeartbeatPort(hostInfo.getHost(), hostInfo.getPort());
|
||||
if (be == null) {
|
||||
throw new DdlException(
|
||||
"backend does not exists[" + NetUtils
|
||||
.getHostPortInAccessibleFormat(hostInfo.getHost(), hostInfo.getPort()) + "]");
|
||||
if (hostInfos.isEmpty()) {
|
||||
List<String> ids = alterClause.getIds();
|
||||
for (String id : ids) {
|
||||
long backendId = Long.parseLong(id);
|
||||
Backend be = getBackend(backendId);
|
||||
if (be == null) {
|
||||
throw new DdlException("backend does not exists[" + backendId + "]");
|
||||
}
|
||||
backends.add(be);
|
||||
}
|
||||
} else {
|
||||
for (HostInfo hostInfo : hostInfos) {
|
||||
Backend be = getBackendWithHeartbeatPort(hostInfo.getHost(), hostInfo.getPort());
|
||||
if (be == null) {
|
||||
throw new DdlException(
|
||||
"backend does not exists[" + NetUtils
|
||||
.getHostPortInAccessibleFormat(hostInfo.getHost(), hostInfo.getPort()) + "]");
|
||||
}
|
||||
backends.add(be);
|
||||
}
|
||||
backends.add(be);
|
||||
}
|
||||
|
||||
for (Backend be : backends) {
|
||||
|
||||
@ -35,7 +35,7 @@ import java.util.List;
|
||||
public class DecommissionBackendTest extends TestWithFeService {
|
||||
@Override
|
||||
protected int backendNum() {
|
||||
return 3;
|
||||
return 4;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -78,18 +78,18 @@ public class DecommissionBackendTest extends TestWithFeService {
|
||||
// 5. execute decommission
|
||||
Backend srcBackend = null;
|
||||
for (Backend backend : idToBackendRef.values()) {
|
||||
if (Env.getCurrentInvertedIndex().getTabletIdsByBackendId(backend.getId()).size() > 0) {
|
||||
if (!Env.getCurrentInvertedIndex().getTabletIdsByBackendId(backend.getId()).isEmpty()) {
|
||||
srcBackend = backend;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Assertions.assertTrue(srcBackend != null);
|
||||
Assertions.assertNotNull(srcBackend);
|
||||
String decommissionStmtStr = "alter system decommission backend \"127.0.0.1:" + srcBackend.getHeartbeatPort() + "\"";
|
||||
AlterSystemStmt decommissionStmt = (AlterSystemStmt) parseAndAnalyzeStmt(decommissionStmtStr);
|
||||
Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt);
|
||||
|
||||
Assertions.assertEquals(true, srcBackend.isDecommissioned());
|
||||
Assertions.assertTrue(srcBackend.isDecommissioned());
|
||||
long startTimestamp = System.currentTimeMillis();
|
||||
while (System.currentTimeMillis() - startTimestamp < 90000
|
||||
&& Env.getCurrentSystemInfo().getIdToBackend().containsKey(srcBackend.getId())) {
|
||||
@ -110,6 +110,34 @@ public class DecommissionBackendTest extends TestWithFeService {
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDecommissionBackendById() throws Exception {
|
||||
|
||||
ImmutableMap<Long, Backend> idToBackendRef = Env.getCurrentSystemInfo().getIdToBackend();
|
||||
Backend srcBackend = idToBackendRef.values().asList().get(0);
|
||||
Assertions.assertNotNull(srcBackend);
|
||||
// decommission backend by id
|
||||
String decommissionByIdStmtStr = "alter system decommission backend \"" + srcBackend.getId() + "\"";
|
||||
AlterSystemStmt decommissionByIdStmt = (AlterSystemStmt) parseAndAnalyzeStmt(decommissionByIdStmtStr);
|
||||
Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionByIdStmt);
|
||||
|
||||
Assertions.assertTrue(srcBackend.isDecommissioned());
|
||||
long startTimestamp = System.currentTimeMillis();
|
||||
while (System.currentTimeMillis() - startTimestamp < 90000
|
||||
&& Env.getCurrentSystemInfo().getIdToBackend().containsKey(srcBackend.getId())) {
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
|
||||
Assertions.assertEquals(backendNum() - 1, Env.getCurrentSystemInfo().getIdToBackend().size());
|
||||
|
||||
// add backend
|
||||
String addBackendStmtStr = "alter system add backend \"127.0.0.1:" + srcBackend.getHeartbeatPort() + "\"";
|
||||
AlterSystemStmt addBackendStmt = (AlterSystemStmt) parseAndAnalyzeStmt(addBackendStmtStr);
|
||||
Env.getCurrentEnv().getAlterInstance().processAlterCluster(addBackendStmt);
|
||||
Assertions.assertEquals(backendNum(), Env.getCurrentSystemInfo().getIdToBackend().size());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDecommissionBackendWithDropTable() throws Exception {
|
||||
// 1. create connect context
|
||||
@ -132,12 +160,12 @@ public class DecommissionBackendTest extends TestWithFeService {
|
||||
|
||||
Backend srcBackend = null;
|
||||
for (Backend backend : idToBackendRef.values()) {
|
||||
if (Env.getCurrentInvertedIndex().getTabletIdsByBackendId(backend.getId()).size() > 0) {
|
||||
if (!Env.getCurrentInvertedIndex().getTabletIdsByBackendId(backend.getId()).isEmpty()) {
|
||||
srcBackend = backend;
|
||||
break;
|
||||
}
|
||||
}
|
||||
Assertions.assertTrue(srcBackend != null);
|
||||
Assertions.assertNotNull(srcBackend);
|
||||
|
||||
// 5. drop table tbl1
|
||||
dropTable("db2.tbl1", false);
|
||||
@ -146,7 +174,7 @@ public class DecommissionBackendTest extends TestWithFeService {
|
||||
String decommissionStmtStr = "alter system decommission backend \"127.0.0.1:" + srcBackend.getHeartbeatPort() + "\"";
|
||||
AlterSystemStmt decommissionStmt = (AlterSystemStmt) parseAndAnalyzeStmt(decommissionStmtStr);
|
||||
Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt);
|
||||
Assertions.assertEquals(true, srcBackend.isDecommissioned());
|
||||
Assertions.assertTrue(srcBackend.isDecommissioned());
|
||||
|
||||
long startTimestamp = System.currentTimeMillis();
|
||||
while (System.currentTimeMillis() - startTimestamp < 90000
|
||||
@ -163,7 +191,7 @@ public class DecommissionBackendTest extends TestWithFeService {
|
||||
|
||||
// TabletInvertedIndex still holds these tablets of srcBackend, but they are all in recycled status
|
||||
List<Long> tabletList = Env.getCurrentInvertedIndex().getTabletIdsByBackendId(srcBackend.getId());
|
||||
Assertions.assertTrue(tabletList.size() > 0);
|
||||
Assertions.assertFalse(tabletList.isEmpty());
|
||||
Assertions.assertTrue(Env.getCurrentRecycleBin().allTabletsInRecycledStatus(tabletList));
|
||||
|
||||
// recover tbl1, because tbl1 has more than one replica, so it still can be recovered
|
||||
@ -176,4 +204,6 @@ public class DecommissionBackendTest extends TestWithFeService {
|
||||
Assertions.assertEquals(backendNum(), Env.getCurrentSystemInfo().getIdToBackend().size());
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
@ -264,6 +264,33 @@ public class SystemInfoServiceTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void removeBackendTestByBackendId() throws UserException {
|
||||
clearAllBackend();
|
||||
AddBackendClause stmt = new AddBackendClause(Lists.newArrayList("192.168.0.1:1234"));
|
||||
stmt.analyze(analyzer);
|
||||
try {
|
||||
Env.getCurrentSystemInfo().addBackends(stmt.getHostInfos(), true);
|
||||
} catch (DdlException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
DropBackendClause dropStmt = new DropBackendClause(Lists.newArrayList(String.valueOf(backendId)));
|
||||
dropStmt.analyze(analyzer);
|
||||
try {
|
||||
Env.getCurrentSystemInfo().dropBackends(dropStmt.getHostInfos());
|
||||
} catch (DdlException e) {
|
||||
e.printStackTrace();
|
||||
Assert.fail();
|
||||
}
|
||||
|
||||
try {
|
||||
Env.getCurrentSystemInfo().dropBackends(dropStmt.getHostInfos());
|
||||
} catch (DdlException e) {
|
||||
Assert.assertTrue(e.getMessage().contains("does not exist"));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSaveLoadBackend() throws Exception {
|
||||
clearAllBackend();
|
||||
|
||||
Reference in New Issue
Block a user