[enhancement](k8s) Support fqdn mode for be in k8s enviroment (#9172)

In the k8s environment, the ip of the pod can be changed, but the hostname of pod is stable. When the host machine of the pod fails, the k8s can schedule the failed pod to the new host machine for reconstruction. After that, the newly created pod's hostname remains unchanged, and the ip address has been changed. The change of the be node's ip address can be detected by FQDNManager when enable_fqdn_mode is true

Co-authored-by: caiconghui1 <caiconghui1@jd.com>
This commit is contained in:
caiconghui
2022-11-30 20:42:15 +08:00
committed by GitHub
parent 80baca2643
commit 9bbbcf031c
18 changed files with 425 additions and 178 deletions

View File

@ -2346,8 +2346,6 @@ Default: 3
Is it possible to dynamically configure: true
Is it a configuration item unique to the Master FE node: true
### `enable_storage_policy`
Whether to enable the Storage Policy feature. This feature allows users to separate hot and cold data. This feature is still under development. Recommended for test environments only.
@ -2358,3 +2356,13 @@ Is it possible to dynamically configure: true
Is it a configuration item unique to the Master FE node: true
### `enable_fqdn_mode`
This configuration is mainly used in the k8s cluster environment. When enable_fqdn_mode is true, the name of the pod where the be is located will remain unchanged after reconstruction, while the ip can be changed.
Default: false
Is it possible to dynamically configure: false
Is it a configuration item unique to the Master FE node: true

View File

@ -2259,7 +2259,7 @@ load 标签清理器将每隔 `label_clean_interval_second` 运行一次以清
### backend_rpc_timeout_ms
FE向BE的BackendService发送rpc请求时的超时时间,单位:毫秒。
FE向BE的BackendService发送rpc请求时的超时时间,单位:毫秒。
默认值:60000
@ -2278,10 +2278,11 @@ load 标签清理器将每隔 `label_clean_interval_second` 运行一次以清
是否为 Master FE 节点独有的配置项:false
### enable_fqdn_mode
FE向BE的BackendService发送rpc请求时的超时时间,单位:毫秒
此配置用于 k8s 部署环境。当 enable_k8s_detect_container_drift_mode 为 true 时,将允许更改 be 或 broker 的重建 pod的 ip
默认值:60000
默认值: false
是否可以动态配置:false
@ -2340,7 +2341,6 @@ load 标签清理器将每隔 `label_clean_interval_second` 运行一次以清
是否为 Master FE 节点独有的配置项:true
### `max_replica_count_when_schema_change`
OlapTable在做schema change时,允许的最大副本数,副本数过大会导致FE OOM。
@ -2414,3 +2414,12 @@ hive partition 的最大缓存数量。
是否为 Master FE 节点独有的配置项:true
### `enable_fqdn_mode`
此配置用于 k8s 部署环境。当 enable_fqdn_mode 为 true 时,将允许更改 be 的重建 pod的 ip。
默认值: false
是否可以动态配置:false
是否为 Master FE 节点独有的配置项:true

View File

@ -38,11 +38,11 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.system.SystemInfoService.HostInfo;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
@ -125,7 +125,7 @@ public class SystemHandler extends AlterHandler {
if (!Strings.isNullOrEmpty(destClusterName) && Env.getCurrentEnv().getCluster(destClusterName) == null) {
throw new DdlException("Cluster: " + destClusterName + " does not exist.");
}
Env.getCurrentSystemInfo().addBackends(addBackendClause.getHostPortPairs(), addBackendClause.isFree(),
Env.getCurrentSystemInfo().addBackends(addBackendClause.getHostInfos(), addBackendClause.isFree(),
addBackendClause.getDestCluster(), addBackendClause.getTagMap());
} else if (alterClause instanceof DropBackendClause) {
// drop backend
@ -136,7 +136,7 @@ public class SystemHandler extends AlterHandler {
+ "All data on this backend will be discarded permanently. "
+ "If you insist, use DROPP instead of DROP");
}
Env.getCurrentSystemInfo().dropBackends(dropBackendClause.getHostPortPairs());
Env.getCurrentSystemInfo().dropBackends(dropBackendClause.getHostInfos());
} else if (alterClause instanceof DecommissionBackendClause) {
// decommission
DecommissionBackendClause decommissionBackendClause = (DecommissionBackendClause) alterClause;
@ -197,7 +197,7 @@ public class SystemHandler extends AlterHandler {
private List<Backend> checkDecommission(DecommissionBackendClause decommissionBackendClause)
throws DdlException {
return checkDecommission(decommissionBackendClause.getHostPortPairs());
return checkDecommission(decommissionBackendClause.getHostInfos());
}
/*
@ -206,15 +206,18 @@ public class SystemHandler extends AlterHandler {
* 2. after decommission, the remaining backend num should meet the replication num.
* 3. after decommission, The remaining space capacity can store data on decommissioned backends.
*/
public static List<Backend> checkDecommission(List<Pair<String, Integer>> hostPortPairs)
public static List<Backend> checkDecommission(List<HostInfo> hostInfos)
throws DdlException {
SystemInfoService infoService = Env.getCurrentSystemInfo();
List<Backend> decommissionBackends = Lists.newArrayList();
// check if exist
for (Pair<String, Integer> pair : hostPortPairs) {
Backend backend = infoService.getBackendWithHeartbeatPort(pair.first, pair.second);
for (HostInfo hostInfo : hostInfos) {
Backend backend = infoService.getBackendWithHeartbeatPort(hostInfo.getIp(), hostInfo.getHostName(),
hostInfo.getPort());
if (backend == null) {
throw new DdlException("Backend does not exist[" + pair.first + ":" + pair.second + "]");
throw new DdlException("Backend does not exist["
+ (Config.enable_fqdn_mode ? hostInfo.getHostName() : hostInfo.getIp())
+ ":" + hostInfo.getPort() + "]");
}
if (backend.isDecommissioned()) {
// already under decommission, ignore it
@ -232,22 +235,23 @@ public class SystemHandler extends AlterHandler {
@Override
public synchronized void cancel(CancelStmt stmt) throws DdlException {
CancelAlterSystemStmt cancelAlterSystemStmt = (CancelAlterSystemStmt) stmt;
cancelAlterSystemStmt.getHostPortPairs();
SystemInfoService infoService = Env.getCurrentSystemInfo();
// check if backends is under decommission
List<Backend> backends = Lists.newArrayList();
List<Pair<String, Integer>> hostPortPairs = cancelAlterSystemStmt.getHostPortPairs();
for (Pair<String, Integer> pair : hostPortPairs) {
List<HostInfo> hostInfos = cancelAlterSystemStmt.getHostInfos();
for (HostInfo hostInfo : hostInfos) {
// check if exist
Backend backend = infoService.getBackendWithHeartbeatPort(pair.first, pair.second);
Backend backend = infoService.getBackendWithHeartbeatPort(hostInfo.getIp(), hostInfo.getHostName(),
hostInfo.getPort());
if (backend == null) {
throw new DdlException("Backend does not exists[" + pair.first + "]");
throw new DdlException("Backend does not exist["
+ (Config.enable_fqdn_mode && hostInfo.getHostName() != null ? hostInfo.getHostName() :
hostInfo.getIp()) + ":" + hostInfo.getPort() + "]");
}
if (!backend.isDecommissioned()) {
// it's ok. just log
LOG.info("backend is not decommissioned[{}]", pair.first);
LOG.info("backend is not decommissioned[{}]", backend.getId());
continue;
}

View File

@ -19,19 +19,19 @@ package org.apache.doris.analysis;
import org.apache.doris.alter.AlterOpType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Pair;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.system.SystemInfoService.HostInfo;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.commons.lang.NotImplementedException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
public class BackendClause extends AlterClause {
protected List<String> hostPorts;
protected List<Pair<String, Integer>> hostPortPairs;
protected List<HostInfo> hostInfos;
public static final String MUTLI_TAG_DISABLED_MSG = "Not support multi tags for Backend now. "
+ "You can set 'enable_multi_tags=true' in fe.conf to enable this feature.";
@ -41,21 +41,20 @@ public class BackendClause extends AlterClause {
protected BackendClause(List<String> hostPorts) {
super(AlterOpType.ALTER_OTHER);
this.hostPorts = hostPorts;
this.hostPortPairs = new LinkedList<Pair<String, Integer>>();
this.hostInfos = Lists.newArrayList();
}
public List<Pair<String, Integer>> getHostPortPairs() {
return hostPortPairs;
public List<HostInfo> getHostInfos() {
return hostInfos;
}
@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
for (String hostPort : hostPorts) {
Pair<String, Integer> pair = SystemInfoService.validateHostAndPort(hostPort);
hostPortPairs.add(pair);
HostInfo hostInfo = SystemInfoService.getIpHostAndPort(hostPort, true);
hostInfos.add(hostInfo);
}
Preconditions.checkState(!hostPortPairs.isEmpty());
Preconditions.checkState(!hostInfos.isEmpty());
}
@Override

View File

@ -18,36 +18,37 @@
package org.apache.doris.analysis;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.Config;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.system.SystemInfoService.HostInfo;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.util.LinkedList;
import java.util.List;
public class CancelAlterSystemStmt extends CancelStmt {
protected List<String> hostPorts;
private List<Pair<String, Integer>> hostPortPairs;
private List<HostInfo> hostInfos;
public CancelAlterSystemStmt(List<String> hostPorts) {
this.hostPorts = hostPorts;
this.hostPortPairs = new LinkedList<Pair<String, Integer>>();
this.hostInfos = Lists.newArrayList();
}
public List<Pair<String, Integer>> getHostPortPairs() {
return hostPortPairs;
public List<HostInfo> getHostInfos() {
return hostInfos;
}
@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
for (String hostPort : hostPorts) {
Pair<String, Integer> pair = SystemInfoService.validateHostAndPort(hostPort);
this.hostPortPairs.add(pair);
HostInfo hostInfo = SystemInfoService.getIpHostAndPort(hostPort,
!Config.enable_fqdn_mode);
this.hostInfos.add(hostInfo);
}
Preconditions.checkState(!this.hostPortPairs.isEmpty());
Preconditions.checkState(!this.hostInfos.isEmpty());
}
@Override

View File

@ -17,6 +17,13 @@
package org.apache.doris.analysis;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.system.SystemInfoService.HostInfo;
import com.google.common.base.Preconditions;
import java.util.List;
public class DropBackendClause extends BackendClause {
@ -36,6 +43,20 @@ public class DropBackendClause extends BackendClause {
return force;
}
@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
if (Config.enable_fqdn_mode) {
for (String hostPort : hostPorts) {
HostInfo hostInfo = SystemInfoService.getIpHostAndPort(hostPort,
!Config.enable_fqdn_mode);
hostInfos.add(hostInfo);
}
Preconditions.checkState(!hostInfos.isEmpty());
} else {
super.analyze(analyzer);
}
}
@Override
public String toSql() {
StringBuilder sb = new StringBuilder();

View File

@ -216,6 +216,7 @@ import org.apache.doris.statistics.StatisticsManager;
import org.apache.doris.statistics.StatisticsRepository;
import org.apache.doris.statistics.StatisticsTaskScheduler;
import org.apache.doris.system.Backend;
import org.apache.doris.system.FQDNManager;
import org.apache.doris.system.Frontend;
import org.apache.doris.system.HeartbeatMgr;
import org.apache.doris.system.SystemInfoService;
@ -446,6 +447,8 @@ public class Env {
private ExternalMetaCacheMgr extMetaCacheMgr;
private FQDNManager fqdnManager;
public List<Frontend> getFrontends(FrontendNodeType nodeType) {
if (nodeType == null) {
// get all
@ -647,6 +650,7 @@ public class Env {
this.analysisJobScheduler = new AnalysisJobScheduler();
this.statisticsCache = new StatisticsCache();
this.extMetaCacheMgr = new ExternalMetaCacheMgr();
this.fqdnManager = new FQDNManager(systemInfo);
}
public static void destroyCheckpoint() {
@ -1431,6 +1435,9 @@ public class Env {
this.statisticsJobScheduler.start();
this.statisticsTaskScheduler.start();
new InternalSchemaInitializer().start();
if (Config.enable_fqdn_mode) {
fqdnManager.start();
}
}
// start threads that should running on all FE

View File

@ -1925,5 +1925,13 @@ public class Config extends ConfigBase {
*/
@ConfField(mutable = true, masterOnly = true)
public static boolean enable_storage_policy = false;
/**
* This config is mainly used in the k8s cluster environment.
* When enable_fqdn_mode is true, the name of the pod where be is located will remain unchanged
* after reconstruction, while the ip can be changed.
*/
@ConfField(mutable = false, masterOnly = true)
public static boolean enable_fqdn_mode = false;
}

View File

@ -35,6 +35,7 @@ public class FeConstants {
public static int heartbeat_interval_second = 5;
public static int checkpoint_interval_second = 60; // 1 minutes
public static int ip_check_interval_second = 5;
// dpp version
public static String dpp_version = "3_2_0";

View File

@ -119,7 +119,11 @@ public class BackendsProcDir implements ProcDirInterface {
backendInfo.add(backend.getOwnerClusterName());
backendInfo.add(backend.getHost());
if (Strings.isNullOrEmpty(clusterName)) {
backendInfo.add(NetUtils.getHostnameByIp(backend.getHost()));
if (backend.getHostName() != null) {
backendInfo.add(backend.getHostName());
} else {
backendInfo.add(NetUtils.getHostnameByIp(backend.getHost()));
}
backendInfo.add(String.valueOf(backend.getHeartbeatPort()));
backendInfo.add(String.valueOf(backend.getBePort()));
backendInfo.add(String.valueOf(backend.getHttpPort()));

View File

@ -24,10 +24,12 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.system.SystemInfoService.HostInfo;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
@ -552,7 +554,7 @@ public class DeployManager extends MasterDaemon {
env.dropFrontend(FrontendNodeType.OBSERVER, localIp, localPort);
break;
case BACKEND:
Env.getCurrentSystemInfo().dropBackend(localIp, localPort);
Env.getCurrentSystemInfo().dropBackend(localIp, null, localPort);
break;
default:
break;
@ -585,8 +587,12 @@ public class DeployManager extends MasterDaemon {
env.addFrontend(FrontendNodeType.OBSERVER, remoteIp, remotePort);
break;
case BACKEND:
List<Pair<String, Integer>> newBackends = Lists.newArrayList();
newBackends.add(Pair.of(remoteIp, remotePort));
List<HostInfo> newBackends = Lists.newArrayList();
String remoteHostName = NetUtils.getHostnameByIp(remoteIp);
if (remoteHostName.equals(remoteIp)) {
remoteHostName = null;
}
newBackends.add(new HostInfo(remoteIp, remoteHostName, remotePort));
Env.getCurrentSystemInfo().addBackends(newBackends, false);
break;
default:

View File

@ -20,12 +20,12 @@ package org.apache.doris.httpv2.rest;
import org.apache.doris.alter.SystemHandler;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.system.SystemInfoService.HostInfo;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
@ -70,19 +70,18 @@ public class CheckDecommissionAction extends RestBaseController {
return ResponseEntityBuilder.badRequest("No host:port specified");
}
List<Pair<String, Integer>> hostPortPairs = Lists.newArrayList();
List<HostInfo> hostInfos = Lists.newArrayList();
for (String hostPort : hostPortArr) {
Pair<String, Integer> pair;
try {
pair = SystemInfoService.validateHostAndPort(hostPort);
HostInfo hostInfo = SystemInfoService.getIpHostAndPort(hostPort, true);
hostInfos.add(hostInfo);
} catch (AnalysisException e) {
return ResponseEntityBuilder.badRequest(e.getMessage());
}
hostPortPairs.add(pair);
}
try {
List<Backend> backends = SystemHandler.checkDecommission(hostPortPairs);
List<Backend> backends = SystemHandler.checkDecommission(hostInfos);
List<String> backendsList = backends.stream().map(b -> b.getHost() + ":"
+ b.getHeartbeatPort()).collect(Collectors.toList());
return ResponseEntityBuilder.ok(backendsList);

View File

@ -47,7 +47,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@ -70,7 +69,9 @@ public class Backend implements Writable {
@SerializedName("id")
private long id;
@SerializedName("host")
private String host;
private volatile String ip;
@SerializedName("hostName")
private String hostName;
private String version;
@SerializedName("heartbeatPort")
@ -142,7 +143,7 @@ public class Backend implements Writable {
private int heartbeatFailureCounter = 0;
public Backend() {
this.host = "";
this.ip = "";
this.version = "";
this.lastUpdateMs = 0;
this.lastStartTime = 0;
@ -160,9 +161,14 @@ public class Backend implements Writable {
this.tagMap.put(locationTag.type, locationTag.value);
}
public Backend(long id, String host, int heartbeatPort) {
public Backend(long id, String ip, int heartbeatPort) {
this(id, ip, null, heartbeatPort);
}
public Backend(long id, String ip, String hostName, int heartbeatPort) {
this.id = id;
this.host = host;
this.ip = ip;
this.hostName = hostName;
this.version = "";
this.heartbeatPort = heartbeatPort;
this.bePort = -1;
@ -186,7 +192,11 @@ public class Backend implements Writable {
}
public String getHost() {
return host;
return ip;
}
public String getHostName() {
return hostName;
}
public String getVersion() {
@ -278,6 +288,10 @@ public class Backend implements Writable {
this.backendState = state.ordinal();
}
public void setHost(String ip) {
this.ip = ip;
}
public void setAlive(boolean isAlive) {
this.isAlive.set(isAlive);
}
@ -350,15 +364,6 @@ public class Backend implements Writable {
return heartbeatFailureCounter;
}
/**
* backend belong to some cluster
*
* @return
*/
public boolean isUsedByCluster() {
return this.backendState == BackendState.using.ordinal();
}
/**
* backend is free, and it isn't belong to any cluster
*
@ -368,16 +373,6 @@ public class Backend implements Writable {
return this.backendState == BackendState.free.ordinal();
}
/**
* backend execute discommission in cluster , and backendState will be free
* finally
*
* @return
*/
public boolean isOffLineFromCluster() {
return this.backendState == BackendState.offline.ordinal();
}
public ImmutableMap<String, DiskInfo> getDisks() {
return this.disksRef;
}
@ -480,15 +475,6 @@ public class Backend implements Writable {
return exceedLimit;
}
public String getPathByPathHash(long pathHash) {
for (DiskInfo diskInfo : disksRef.values()) {
if (diskInfo.getPathHash() == pathHash) {
return diskInfo.getRootPath();
}
}
return null;
}
public void updateDisks(Map<String, TDisk> backendDisks) {
ImmutableMap<String, DiskInfo> disks = disksRef;
// The very first time to init the path info
@ -611,7 +597,7 @@ public class Backend implements Writable {
@Override
public int hashCode() {
return Objects.hash(id, host, heartbeatPort, bePort, isAlive);
return Objects.hash(id, ip, heartbeatPort, bePort, isAlive);
}
@Override
@ -625,13 +611,13 @@ public class Backend implements Writable {
Backend backend = (Backend) obj;
return (id == backend.id) && (host.equals(backend.host)) && (heartbeatPort == backend.heartbeatPort)
return (id == backend.id) && (ip.equals(backend.ip)) && (heartbeatPort == backend.heartbeatPort)
&& (bePort == backend.bePort) && (isAlive.get() == backend.isAlive.get());
}
@Override
public String toString() {
return "Backend [id=" + id + ", host=" + host + ", heartbeatPort=" + heartbeatPort + ", alive=" + isAlive.get()
return "Backend [id=" + id + ", host=" + ip + ", heartbeatPort=" + heartbeatPort + ", alive=" + isAlive.get()
+ ", tags: " + tagMap + "]";
}
@ -800,17 +786,4 @@ public class Backend implements Writable {
public String getTagMapString() {
return "{" + new PrintableMap<>(tagMap, ":", true, false).toString() + "}";
}
/**
* Get Tag by type, return Optional.empty if no such tag with given type
*
* @param type
* @return
*/
public Optional<Tag> getTagByType(String type) {
if (!tagMap.containsKey(type)) {
return Optional.empty();
}
return Optional.of(Tag.createNotCheck(type, tagMap.get(type)));
}
}

View File

@ -0,0 +1,76 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.system;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.net.InetAddress;
import java.net.UnknownHostException;
public class FQDNManager extends MasterDaemon {
private static final Logger LOG = LogManager.getLogger(FQDNManager.class);
public static final String UNKNOWN_HOST_IP = "unknown";
private SystemInfoService nodeMgr;
public FQDNManager(SystemInfoService nodeMgr) {
super("FQDN mgr", FeConstants.ip_check_interval_second * 1000L);
this.nodeMgr = nodeMgr;
}
/**
* At each round: check if ip of be has already been changed
*/
@Override
protected void runAfterCatalogReady() {
for (Backend be : nodeMgr.getIdToBackend().values()) {
if (be.getHostName() != null) {
try {
InetAddress inetAddress = InetAddress.getByName(be.getHostName());
if (!be.getHost().equalsIgnoreCase(inetAddress.getHostAddress())) {
String ip = be.getHost();
if (!ip.equalsIgnoreCase(UNKNOWN_HOST_IP)) {
ClientPool.backendPool.clearPool(new TNetworkAddress(ip, be.getBePort()));
}
be.setHost(inetAddress.getHostAddress());
Env.getCurrentEnv().getEditLog().logBackendStateChange(be);
LOG.warn("ip for {} of be has been changed from {} to {}", be.getHostName(), ip, be.getHost());
}
} catch (UnknownHostException e) {
LOG.warn("unknown host name for be, {}", be.getHostName(), e);
// add be alive check to make ip work when be is still alive and dns has some problem.
if (!be.isAlive() && !be.getHost().equalsIgnoreCase(UNKNOWN_HOST_IP)) {
String ip = be.getHost();
ClientPool.backendPool.clearPool(new TNetworkAddress(ip, be.getBePort()));
be.setHost(UNKNOWN_HOST_IP);
Env.getCurrentEnv().getEditLog().logBackendStateChange(be);
LOG.warn("ip for {} of be has been changed from {} to {}", be.getHostName(), ip, "unknown");
}
}
}
}
}
}

View File

@ -145,7 +145,6 @@ public class HeartbeatMgr extends MasterDaemon {
}
} catch (InterruptedException | ExecutionException e) {
LOG.warn("got exception when doing heartbeat", e);
continue;
}
} // end for all results

View File

@ -31,6 +31,7 @@ import org.apache.doris.common.Pair;
import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.CountingDataOutputStream;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.Backend.BackendState;
@ -74,14 +75,32 @@ public class SystemInfoService {
private volatile ImmutableMap<Long, Backend> idToBackendRef = ImmutableMap.of();
private volatile ImmutableMap<Long, AtomicLong> idToReportVersionRef = ImmutableMap.of();
// last backend id used by round robin for sequential selecting backends for replica creation
private Map<Tag, Long> lastBackendIdForReplicaCreation = Maps.newConcurrentMap();
private long lastBackendIdForCreation = -1;
private long lastBackendIdForOther = -1;
private volatile ImmutableMap<Long, DiskInfo> pathHashToDishInfoRef = ImmutableMap.of();
public static class HostInfo {
public String ip;
public String hostName;
public int port;
public HostInfo(String ip, String hostName, int port) {
this.ip = ip;
this.hostName = hostName;
this.port = port;
}
public String getIp() {
return ip;
}
public String getHostName() {
return hostName;
}
public int getPort() {
return port;
}
}
// sort host backends list by num of backends, descending
private static final Comparator<List<Backend>> hostBackendsListComparator = new Comparator<List<Backend>>() {
@Override
@ -95,27 +114,33 @@ public class SystemInfoService {
};
// for deploy manager
public void addBackends(List<Pair<String, Integer>> hostPortPairs, boolean isFree) throws UserException {
addBackends(hostPortPairs, isFree, "", Tag.DEFAULT_BACKEND_TAG.toMap());
public void addBackends(List<HostInfo> hostInfos, boolean isFree)
throws UserException {
addBackends(hostInfos, isFree, "", Tag.DEFAULT_BACKEND_TAG.toMap());
}
/**
* @param hostPortPairs : backend's host and port
* @param hostInfos : backend's ip, hostName and port
* @param isFree : if true the backend is not owned by any cluster
* @param destCluster : if not null or empty backend will be added to destCluster
* @throws DdlException
*/
public void addBackends(List<Pair<String, Integer>> hostPortPairs, boolean isFree, String destCluster,
public void addBackends(List<HostInfo> hostInfos, boolean isFree, String destCluster,
Map<String, String> tagMap) throws UserException {
for (Pair<String, Integer> pair : hostPortPairs) {
for (HostInfo hostInfo : hostInfos) {
if (Config.enable_fqdn_mode && hostInfo.getHostName() == null) {
throw new DdlException("backend's hostName should not be null while enable_fqdn_mode is true");
}
// check is already exist
if (getBackendWithHeartbeatPort(pair.first, pair.second) != null) {
throw new DdlException("Same backend already exists[" + pair.first + ":" + pair.second + "]");
if (getBackendWithHeartbeatPort(hostInfo.getIp(), hostInfo.getHostName(), hostInfo.getPort()) != null) {
String backendIdentifier = (Config.enable_fqdn_mode ? hostInfo.getHostName() : hostInfo.getIp()) + ":"
+ hostInfo.getPort();
throw new DdlException("Same backend already exists[" + backendIdentifier + "]");
}
}
for (Pair<String, Integer> pair : hostPortPairs) {
addBackend(pair.first, pair.second, isFree, destCluster, tagMap);
for (HostInfo hostInfo : hostInfos) {
addBackend(hostInfo.getIp(), hostInfo.getHostName(), hostInfo.getPort(), isFree, destCluster, tagMap);
}
}
@ -136,9 +161,9 @@ public class SystemInfoService {
}
// Final entry of adding backend
private void addBackend(String host, int heartbeatPort, boolean isFree, String destCluster,
private void addBackend(String ip, String hostName, int heartbeatPort, boolean isFree, String destCluster,
Map<String, String> tagMap) {
Backend newBackend = new Backend(Env.getCurrentEnv().getNextId(), host, heartbeatPort);
Backend newBackend = new Backend(Env.getCurrentEnv().getNextId(), ip, hostName, heartbeatPort);
// update idToBackend
Map<Long, Backend> copiedBackends = Maps.newHashMap(idToBackendRef);
copiedBackends.put(newBackend.getId(), newBackend);
@ -172,16 +197,17 @@ public class SystemInfoService {
MetricRepo.generateBackendsTabletMetrics();
}
public void dropBackends(List<Pair<String, Integer>> hostPortPairs) throws DdlException {
for (Pair<String, Integer> pair : hostPortPairs) {
public void dropBackends(List<HostInfo> hostInfos) throws DdlException {
for (HostInfo hostInfo : hostInfos) {
// check is already exist
if (getBackendWithHeartbeatPort(pair.first, pair.second) == null) {
throw new DdlException("backend does not exists[" + pair.first + ":" + pair.second + "]");
if (getBackendWithHeartbeatPort(hostInfo.getIp(), hostInfo.getHostName(), hostInfo.getPort()) == null) {
String backendIdentifier = Config.enable_fqdn_mode && hostInfo.getHostName() != null
? hostInfo.getHostName() : hostInfo.getIp() + ":" + hostInfo.getPort();
throw new DdlException("backend does not exists[" + backendIdentifier + "]");
}
}
for (Pair<String, Integer> pair : hostPortPairs) {
dropBackend(pair.first, pair.second);
for (HostInfo hostInfo : hostInfos) {
dropBackend(hostInfo.getIp(), hostInfo.getHostName(), hostInfo.getPort());
}
}
@ -192,17 +218,16 @@ public class SystemInfoService {
throw new DdlException("Backend[" + backendId + "] does not exist");
}
dropBackend(backend.getHost(), backend.getHeartbeatPort());
dropBackend(backend.getHost(), backend.getHostName(), backend.getHeartbeatPort());
}
// final entry of dropping backend
public void dropBackend(String host, int heartbeatPort) throws DdlException {
if (getBackendWithHeartbeatPort(host, heartbeatPort) == null) {
throw new DdlException("backend does not exists[" + host + ":" + heartbeatPort + "]");
public void dropBackend(String ip, String hostName, int heartbeatPort) throws DdlException {
Backend droppedBackend = getBackendWithHeartbeatPort(ip, hostName, heartbeatPort);
if (droppedBackend == null) {
throw new DdlException("backend does not exists[" + (ip == null ? hostName : ip)
+ ":" + heartbeatPort + "]");
}
Backend droppedBackend = getBackendWithHeartbeatPort(host, heartbeatPort);
// update idToBackend
Map<Long, Backend> copiedBackends = Maps.newHashMap(idToBackendRef);
copiedBackends.remove(droppedBackend.getId());
@ -274,10 +299,16 @@ public class SystemInfoService {
return true;
}
public Backend getBackendWithHeartbeatPort(String host, int heartPort) {
public Backend getBackendWithHeartbeatPort(String ip, String hostName, int heartPort) {
ImmutableMap<Long, Backend> idToBackend = idToBackendRef;
for (Backend backend : idToBackend.values()) {
if (backend.getHost().equals(host) && backend.getHeartbeatPort() == heartPort) {
if (hostName != null) {
if (hostName.equals(backend.getHostName()) && backend.getHeartbeatPort() == heartPort) {
return backend;
}
}
if (backend.getHost().equals(ip) && backend.getHeartbeatPort() == heartPort) {
return backend;
}
}
@ -901,7 +932,8 @@ public class SystemInfoService {
this.idToReportVersionRef = null;
}
public static Pair<String, Integer> validateHostAndPort(String hostPort) throws AnalysisException {
public static HostInfo getIpHostAndPort(String hostPort, boolean strictCheck)
throws AnalysisException {
hostPort = hostPort.replaceAll("\\s+", "");
if (hostPort.isEmpty()) {
throw new AnalysisException("Invalid host port: " + hostPort);
@ -912,38 +944,50 @@ public class SystemInfoService {
throw new AnalysisException("Invalid host port: " + hostPort);
}
String host = pair[0];
if (Strings.isNullOrEmpty(host)) {
String hostName = pair[0];
String ip = hostName;
if (Strings.isNullOrEmpty(hostName)) {
throw new AnalysisException("Host is null");
}
int heartbeatPort = -1;
try {
// validate host
if (!InetAddressValidator.getInstance().isValid(host)) {
// maybe this is a hostname
// if no IP address for the host could be found, 'getByName'
// will throw
// UnknownHostException
InetAddress inetAddress = InetAddress.getByName(host);
host = inetAddress.getHostAddress();
}
// validate port
heartbeatPort = Integer.parseInt(pair[1]);
if (heartbeatPort <= 0 || heartbeatPort >= 65536) {
throw new AnalysisException("Port is out of range: " + heartbeatPort);
}
return Pair.of(host, heartbeatPort);
// validate host
if (!InetAddressValidator.getInstance().isValid(ip)) {
// maybe this is a hostname
// if no IP address for the host could be found, 'getByName'
// will throw UnknownHostException
InetAddress inetAddress = InetAddress.getByName(hostName);
ip = inetAddress.getHostAddress();
} else {
hostName = NetUtils.getHostnameByIp(ip);
if (hostName.equals(ip)) {
hostName = null;
}
}
return new HostInfo(ip, hostName, heartbeatPort);
} catch (UnknownHostException e) {
if (!strictCheck) {
return new HostInfo(null, hostName, heartbeatPort);
}
throw new AnalysisException("Unknown host: " + e.getMessage());
} catch (Exception e) {
throw new AnalysisException("Encounter unknown exception: " + e.getMessage());
}
}
public static Pair<String, Integer> validateHostAndPort(String hostPort) throws AnalysisException {
HostInfo hostInfo = getIpHostAndPort(hostPort, true);
return Pair.of(hostInfo.getIp(), hostInfo.getPort());
}
public void replayAddBackend(Backend newBackend) {
// update idToBackend
Map<Long, Backend> copiedBackends = Maps.newHashMap(idToBackendRef);
@ -996,25 +1040,25 @@ public class SystemInfoService {
public void updateBackendState(Backend be) {
long id = be.getId();
Backend memoryBe = getBackend(id);
if (memoryBe == null) {
// backend may already be dropped. this may happen when
// 1. SystemHandler drop the decommission backend
// 2. at same time, user try to cancel the decommission of that backend.
// These two operations do not guarantee the order.
return;
// backend may already be dropped. this may happen when
// drop and modify operations do not guarantee the order.
if (memoryBe != null) {
if (be.getHostName() != null && !be.getHost().equalsIgnoreCase(memoryBe.getHost())) {
memoryBe.setHost(be.getHost());
}
memoryBe.setBePort(be.getBePort());
memoryBe.setAlive(be.isAlive());
memoryBe.setDecommissioned(be.isDecommissioned());
memoryBe.setHttpPort(be.getHttpPort());
memoryBe.setBeRpcPort(be.getBeRpcPort());
memoryBe.setBrpcPort(be.getBrpcPort());
memoryBe.setLastUpdateMs(be.getLastUpdateMs());
memoryBe.setLastStartTime(be.getLastStartTime());
memoryBe.setDisks(be.getDisks());
memoryBe.setBackendState(be.getBackendState());
memoryBe.setOwnerClusterName(be.getOwnerClusterName());
memoryBe.setDecommissionType(be.getDecommissionType());
}
memoryBe.setBePort(be.getBePort());
memoryBe.setAlive(be.isAlive());
memoryBe.setDecommissioned(be.isDecommissioned());
memoryBe.setHttpPort(be.getHttpPort());
memoryBe.setBeRpcPort(be.getBeRpcPort());
memoryBe.setBrpcPort(be.getBrpcPort());
memoryBe.setLastUpdateMs(be.getLastUpdateMs());
memoryBe.setLastStartTime(be.getLastStartTime());
memoryBe.setDisks(be.getDisks());
memoryBe.setBackendState(be.getBackendState());
memoryBe.setOwnerClusterName(be.getOwnerClusterName());
memoryBe.setDecommissionType(be.getDecommissionType());
}
private long getClusterAvailableCapacityB(String clusterName) {
@ -1110,12 +1154,13 @@ public class SystemInfoService {
}
public void modifyBackends(ModifyBackendClause alterClause) throws UserException {
List<Pair<String, Integer>> hostPortPairs = alterClause.getHostPortPairs();
List<HostInfo> hostInfos = alterClause.getHostInfos();
List<Backend> backends = Lists.newArrayList();
for (Pair<String, Integer> pair : hostPortPairs) {
Backend be = getBackendWithHeartbeatPort(pair.first, pair.second);
for (HostInfo hostInfo : hostInfos) {
Backend be = getBackendWithHeartbeatPort(hostInfo.getIp(), hostInfo.getHostName(), hostInfo.getPort());
if (be == null) {
throw new DdlException("backend does not exists[" + pair.first + ":" + pair.second + "]");
throw new DdlException("backend does not exists[" + (Config.enable_fqdn_mode && hostInfo.getHostName()
!= null ? hostInfo.getHostName() : hostInfo.getIp()) + ":" + hostInfo.getPort() + "]");
}
backends.add(be);
}

View File

@ -218,19 +218,19 @@ public class SystemInfoServiceTest {
AddBackendClause stmt = new AddBackendClause(Lists.newArrayList("192.168.0.1:1234"));
stmt.analyze(analyzer);
try {
Env.getCurrentSystemInfo().addBackends(stmt.getHostPortPairs(), true);
Env.getCurrentSystemInfo().addBackends(stmt.getHostInfos(), true);
} catch (DdlException e) {
Assert.fail();
}
try {
Env.getCurrentSystemInfo().addBackends(stmt.getHostPortPairs(), true);
Env.getCurrentSystemInfo().addBackends(stmt.getHostInfos(), true);
} catch (DdlException e) {
Assert.assertTrue(e.getMessage().contains("already exists"));
}
Assert.assertNotNull(Env.getCurrentSystemInfo().getBackend(backendId));
Assert.assertNotNull(Env.getCurrentSystemInfo().getBackendWithHeartbeatPort("192.168.0.1", 1234));
Assert.assertNotNull(Env.getCurrentSystemInfo().getBackendWithHeartbeatPort("192.168.0.1", null, 1234));
Assert.assertTrue(Env.getCurrentSystemInfo().getBackendIds(false).size() == 1);
Assert.assertTrue(Env.getCurrentSystemInfo().getBackendIds(false).get(0) == backendId);
@ -247,7 +247,7 @@ public class SystemInfoServiceTest {
AddBackendClause stmt = new AddBackendClause(Lists.newArrayList("192.168.0.1:1234"));
stmt.analyze(analyzer);
try {
Env.getCurrentSystemInfo().addBackends(stmt.getHostPortPairs(), true);
Env.getCurrentSystemInfo().addBackends(stmt.getHostInfos(), true);
} catch (DdlException e) {
e.printStackTrace();
}
@ -255,14 +255,14 @@ public class SystemInfoServiceTest {
DropBackendClause dropStmt = new DropBackendClause(Lists.newArrayList("192.168.0.1:1234"));
dropStmt.analyze(analyzer);
try {
Env.getCurrentSystemInfo().dropBackends(dropStmt.getHostPortPairs());
Env.getCurrentSystemInfo().dropBackends(dropStmt.getHostInfos());
} catch (DdlException e) {
e.printStackTrace();
Assert.fail();
}
try {
Env.getCurrentSystemInfo().dropBackends(dropStmt.getHostPortPairs());
Env.getCurrentSystemInfo().dropBackends(dropStmt.getHostInfos());
} catch (DdlException e) {
Assert.assertTrue(e.getMessage().contains("does not exist"));
}

View File

@ -0,0 +1,87 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.system;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import mockit.Expectations;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.net.InetAddress;
import java.net.UnknownHostException;
public class FQDNManagerTest {
@Mocked
private InetAddress inetAddress;
@Mocked
private Env env;
private FQDNManager fdqnManager;
private SystemInfoService systemInfoService;
@Before
public void setUp() throws UnknownHostException {
new MockUp<InetAddress>(InetAddress.class) {
@Mock
public InetAddress getByName(String hostName) {
return inetAddress;
}
};
new MockUp<Env>(Env.class) {
@Mock
public Env getServingEnv() {
return env;
}
};
new Expectations() {
{
env.isReady();
minTimes = 0;
result = true;
inetAddress.getHostAddress();
minTimes = 0;
result = "193.88.67.99";
}
};
Config.enable_fqdn_mode = true;
systemInfoService = new SystemInfoService();
systemInfoService.addBackend(new Backend(1, "193.88.67.98", "doris.test.domain", 9090));
fdqnManager = new FQDNManager(systemInfoService);
}
@Test
public void testBackendIpChanged() throws InterruptedException {
Assert.assertEquals("193.88.67.98", systemInfoService.getBackend(1).getHost());
fdqnManager.start();
Thread.sleep(1000);
Assert.assertEquals("193.88.67.99", systemInfoService.getBackend(1).getHost());
fdqnManager.exit();
}
}