[Enhance](ComputeNode)support k8s watch (#17442)

Describe your changes.

1.Add the watch mechanism to listen for changes in k8s statefulSet and update nodes in time.
2.For broker, there is only one name by default when using deployManager
3.Refactoring code makes it easier to understand and maintain
4.Fix jar package conflicts between okhttp-ws and okhttp

Previously, the logic of k8sDeployManager.getGroupHostInfos was to call the endpoints () interface of k8s,
which would cause if the pod was unexpectedly restarted, k8sDeployManager would delete the pod before the
restart from the fe or be list and add the pod after the restart to the fe or be list, which obviously does not
meet our expectations.
Now, after fqdn is enabled, we call the statefulSets() interface of k8s to listen for the number of copies to
determine whether we need to be online or offline.
In addition, the watch mechanism is added to avoid the possible A-B-A problem caused by timed polling.
For the sake of stability, when the watch mechanism does not receive messages for a period of time,
it will be degraded to the polling mode.

Now several environment variables have been added,ENV_FE_STATEFULSET,ENV_FE_OBSERVER_STATEFULSET,ENV_BE_STATEFULSET,ENV_BROKER_STATEFULSET,ENV_CN_STATEFULSET For statefulsetName,One-to-one correspondence with ENV_FE_SERVICE,ENV_FE_OBSERVER_SERVICE,ENV_BE_SERVICE,ENV_BROKER_SERVICE,ENV_CN_SERVICE,If a serviceName is configured, the corresponding statefulsetName must be configured, otherwise the program cannot be started.
This commit is contained in:
zhangdong
2023-03-20 11:36:32 +08:00
committed by GitHub
parent 06095054ab
commit 93cfd5cd2b
7 changed files with 596 additions and 488 deletions

View File

@ -306,11 +306,6 @@ under the License.
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.squareup.okhttp3/okhttp-ws -->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp-ws</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.squareup.okio/okio -->
<dependency>
<groupId>com.squareup.okio</groupId>

View File

@ -1403,6 +1403,7 @@ public class Env {
if (!Config.enable_deploy_manager.equalsIgnoreCase("disable")) {
LOG.info("deploy manager {} start", deployManager.getName());
deployManager.start();
deployManager.startListener();
}
// start routine load scheduler
routineLoadScheduler.start();

View File

@ -31,19 +31,19 @@ import org.apache.doris.system.Frontend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.system.SystemInfoService.HostInfo;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
/*
* This deploy manager is to support Kubernetes, Ambari or other system for automating deployment.
@ -108,6 +108,8 @@ public class DeployManager extends MasterDaemon {
public static final String ENV_FE_EXIST_ENDPOINT = "FE_EXIST_ENDPOINT";
public static final String ENV_FE_INIT_NUMBER = "FE_INIT_NUMBER";
// we arbitrarily set all broker name as what ENV_BROKER_NAME specified.
public static final String ENV_BROKER_NAME = "BROKER_NAME";
public enum NodeType {
ELECTABLE, OBSERVER, BACKEND, BROKER, BACKEND_CN
@ -115,18 +117,6 @@ public class DeployManager extends MasterDaemon {
protected Env env;
protected String electableFeServiceGroup;
protected String observerFeServiceGroup;
protected String backendServiceGroup;
protected String brokerServiceGroup;
protected String cnServiceGroup;
protected boolean hasElectableService = false;
protected boolean hasBackendService = false;
protected boolean hasObserverService = false;
protected boolean hasBrokerService = false;
protected boolean hasCnService = false;
// Host identifier -> missing counter
// eg:
// In k8s, when a node is down, the endpoint may be disappear immediately in service group.
@ -136,12 +126,32 @@ public class DeployManager extends MasterDaemon {
// So we use this map to count the continuous detected down times, if the continuous down time is more
// then MAX_MISSING_TIME, we considered this node as down permanently.
protected Map<String, Integer> counterMap = Maps.newHashMap();
// k8s pod delete and will recreate, so we need to wait for a while,otherwise we will drop node by mistake
protected static final Integer MAX_MISSING_TIME = 60;
protected Integer maxMissingTime = 5;
//if 'true',Actively pull node information from external systems.
//if 'false',The external system actively pushes the node change information,
// and only needs to listen to 'nodeChangeQueue'
protected boolean listenRequired;
protected BlockingQueue<Event> nodeChangeQueue;
protected Map<NodeType, NodeTypeAttr> nodeTypeAttrMap = Maps.newHashMap();
private boolean isRunning;
public DeployManager(Env env, long intervalMs) {
this(env, intervalMs, false);
}
public DeployManager(Env env, long intervalMs, boolean listenRequired) {
super("deployManager", intervalMs);
this.env = env;
this.listenRequired = listenRequired;
this.isRunning = false;
if (listenRequired) {
this.maxMissingTime = 0;
this.nodeChangeQueue = Queues.newLinkedBlockingDeque();
}
// init NodeTypeAttr for each NodeType,so when get NodeTypeAttr by NodeType,we assume not null
for (NodeType nodeType : NodeType.values()) {
nodeTypeAttrMap.put(nodeType, new NodeTypeAttr(false));
}
}
// Init all environment variables.
@ -150,11 +160,11 @@ public class DeployManager extends MasterDaemon {
protected void initEnvVariables(String envElectableFeServiceGroup, String envObserverFeServiceGroup,
String envBackendServiceGroup, String envBrokerServiceGroup, String envCnServiceGroup) {
this.electableFeServiceGroup = Strings.nullToEmpty(System.getenv(envElectableFeServiceGroup));
this.observerFeServiceGroup = Strings.nullToEmpty(System.getenv(envObserverFeServiceGroup));
this.backendServiceGroup = Strings.nullToEmpty(System.getenv(envBackendServiceGroup));
this.brokerServiceGroup = Strings.nullToEmpty(System.getenv(envBrokerServiceGroup));
this.cnServiceGroup = Strings.nullToEmpty(System.getenv(envCnServiceGroup));
String electableFeServiceGroup = Strings.nullToEmpty(System.getenv(envElectableFeServiceGroup));
String observerFeServiceGroup = Strings.nullToEmpty(System.getenv(envObserverFeServiceGroup));
String backendServiceGroup = Strings.nullToEmpty(System.getenv(envBackendServiceGroup));
String brokerServiceGroup = Strings.nullToEmpty(System.getenv(envBrokerServiceGroup));
String cnServiceGroup = Strings.nullToEmpty(System.getenv(envCnServiceGroup));
LOG.info("get deploy env: {}, {}, {}, {}, {}", envElectableFeServiceGroup, envObserverFeServiceGroup,
envBackendServiceGroup, envBrokerServiceGroup, envCnServiceGroup);
@ -162,31 +172,36 @@ public class DeployManager extends MasterDaemon {
// check if we have electable service
if (!Strings.isNullOrEmpty(electableFeServiceGroup)) {
LOG.info("Electable service group is found");
hasElectableService = true;
}
// check if we have backend service
if (!Strings.isNullOrEmpty(backendServiceGroup)) {
LOG.info("Backend service group is found");
hasBackendService = true;
nodeTypeAttrMap.get(NodeType.ELECTABLE).setHasService(true);
nodeTypeAttrMap.get(NodeType.ELECTABLE).setServiceName(electableFeServiceGroup);
}
// check if we have observer service
if (!Strings.isNullOrEmpty(observerFeServiceGroup)) {
LOG.info("Observer service group is found");
hasObserverService = true;
nodeTypeAttrMap.get(NodeType.OBSERVER).setHasService(true);
nodeTypeAttrMap.get(NodeType.OBSERVER).setServiceName(observerFeServiceGroup);
}
// check if we have backend service
if (!Strings.isNullOrEmpty(backendServiceGroup)) {
LOG.info("Backend service group is found");
nodeTypeAttrMap.get(NodeType.BACKEND).setHasService(true);
nodeTypeAttrMap.get(NodeType.BACKEND).setServiceName(backendServiceGroup);
}
// check if we have broker service
if (!Strings.isNullOrEmpty(brokerServiceGroup)) {
LOG.info("Broker service group is found");
hasBrokerService = true;
nodeTypeAttrMap.get(NodeType.BROKER).setHasService(true);
nodeTypeAttrMap.get(NodeType.BROKER).setServiceName(brokerServiceGroup);
}
// check if we have cn service
if (!Strings.isNullOrEmpty(cnServiceGroup)) {
LOG.info("Cn service group is found");
hasCnService = true;
nodeTypeAttrMap.get(NodeType.BACKEND_CN).setHasService(true);
nodeTypeAttrMap.get(NodeType.BACKEND_CN).setServiceName(cnServiceGroup);
}
LOG.info("get electableFeServiceGroup: {}, observerFeServiceGroup: {}, backendServiceGroup: {}"
@ -195,6 +210,16 @@ public class DeployManager extends MasterDaemon {
cnServiceGroup);
}
public void startListener() {
if (listenRequired) {
startListenerInternal();
}
}
public void startListenerInternal() {
throw new NotImplementedException();
}
// Call init before each runOneCycle
// Default is do nothing. Can be override in derived class
// return false if init failed.
@ -202,41 +227,20 @@ public class DeployManager extends MasterDaemon {
return true;
}
// get electable fe
protected List<SystemInfoService.HostInfo> getElectableGroupHostInfos() {
Preconditions.checkState(!Strings.isNullOrEmpty(electableFeServiceGroup));
return getGroupHostInfos(electableFeServiceGroup);
}
// get observer fe
protected List<SystemInfoService.HostInfo> getObserverGroupHostInfos() {
Preconditions.checkState(!Strings.isNullOrEmpty(observerFeServiceGroup));
return getGroupHostInfos(observerFeServiceGroup);
}
// get backend
protected List<SystemInfoService.HostInfo> getBackendGroupHostInfos() {
Preconditions.checkState(!Strings.isNullOrEmpty(backendServiceGroup));
return getGroupHostInfos(backendServiceGroup);
}
// get cn
protected List<SystemInfoService.HostInfo> getCnGroupHostInfos() {
Preconditions.checkState(!Strings.isNullOrEmpty(cnServiceGroup));
return getGroupHostInfos(cnServiceGroup);
}
// Get all host port pairs from specified group.
// Must implement in derived class.
// If encounter errors, return null
protected List<SystemInfoService.HostInfo> getGroupHostInfos(String groupName) {
protected List<HostInfo> getGroupHostInfos(NodeType nodeType) {
throw new NotImplementedException();
}
// get broker
// return (broker name -> list of broker host port)
protected Map<String, List<SystemInfoService.HostInfo>> getBrokerGroupHostInfos() {
throw new NotImplementedException();
protected String getBrokerName() {
String brokerName = System.getenv(ENV_BROKER_NAME);
if (Strings.isNullOrEmpty(brokerName)) {
LOG.error("failed to get broker name from env: {}", ENV_BROKER_NAME);
System.exit(-1);
}
return brokerName;
}
public List<HostInfo> getHelperNodes() {
@ -284,16 +288,19 @@ public class DeployManager extends MasterDaemon {
// 2. get electable fe host from remote
boolean ok = true;
List<SystemInfoService.HostInfo> feHostInfos = null;
List<HostInfo> feHostInfos = null;
while (true) {
try {
feHostInfos = getElectableGroupHostInfos();
feHostInfos = getGroupHostInfos(NodeType.ELECTABLE);
if (feHostInfos == null) {
ok = false;
} else if (feHostInfos.size() != numOfFe) {
LOG.error("num of fe get from remote [{}] does not equal to the expected num: {}",
feHostInfos, numOfFe);
ok = false;
} else if (!checkIpIfNotNull(feHostInfos)) {
LOG.error("some fe not ready,need wait.");
ok = false;
} else {
ok = true;
}
@ -326,6 +333,15 @@ public class DeployManager extends MasterDaemon {
feHostInfos.get(0).getPort()));
}
private boolean checkIpIfNotNull(List<HostInfo> hostInfos) {
for (HostInfo hostInfo : hostInfos) {
if (hostInfo.getIp() == null) {
return false;
}
}
return true;
}
@Override
protected void runAfterCatalogReady() {
if (Config.enable_deploy_manager.equals("disable")) {
@ -333,328 +349,232 @@ public class DeployManager extends MasterDaemon {
exit();
return;
}
// 0. init
if (!init()) {
return;
}
// 1. Check the electable fe service group
if (hasElectableService) {
List<SystemInfoService.HostInfo> remoteElectableFeHosts = getElectableGroupHostInfos();
if (remoteElectableFeHosts == null) {
return;
}
LOG.debug("get electable fe hosts {} from electable fe service group: {}",
remoteElectableFeHosts, electableFeServiceGroup);
if (remoteElectableFeHosts.isEmpty()) {
LOG.error("electable fe service group {} is empty, which should not happen", electableFeServiceGroup);
return;
}
// 1.1 Check if self is in electable fe service group
SystemInfoService.HostInfo selfHostInfo = getFromHostInfos(remoteElectableFeHosts,
new SystemInfoService.HostInfo(env.getMasterIp(), env.getMasterHostName(), Config.edit_log_port));
if (selfHostInfo == null) {
// The running of this deploy manager means this node is considered self as Master.
// If it self does not exist in electable fe service group, it should shut it self down.
LOG.warn("self host {} is not in electable fe service group {}. Exit now.",
env.getMasterIp(), electableFeServiceGroup);
System.exit(-1);
}
// 1.2 Check the change of electable fe service group
List<Frontend> localElectableFeAddrs = env.getFrontends(FrontendNodeType.FOLLOWER);
List<SystemInfoService.HostInfo> localElectableFeHostInfos = this
.convertFesToHostInfos(localElectableFeAddrs);
LOG.debug("get local electable hosts: {}", localElectableFeHostInfos);
if (inspectNodeChange(remoteElectableFeHosts, localElectableFeHostInfos, NodeType.ELECTABLE)) {
return;
}
if (isRunning) {
LOG.warn("Last task not finished,ignore current task.");
return;
}
isRunning = true;
// 2. Check the backend service group
if (hasBackendService) {
BE_BLOCK: {
List<SystemInfoService.HostInfo> remoteBackendHosts = getBackendGroupHostInfos();
if (remoteBackendHosts == null) {
break BE_BLOCK;
}
LOG.debug("get remote backend hosts: {}", remoteBackendHosts);
List<Backend> localBackends = Env.getCurrentSystemInfo()
.getClusterMixBackends(SystemInfoService.DEFAULT_CLUSTER);
List<SystemInfoService.HostInfo> localBackendHostInfos = this.convertBesToHostInfos(localBackends);
LOG.debug("get local backend addrs: {}", localBackendHostInfos);
if (inspectNodeChange(remoteBackendHosts, localBackendHostInfos, NodeType.BACKEND)) {
return;
}
}
if (listenRequired && processQueue()) {
isRunning = false;
return;
}
processPolling();
isRunning = false;
}
// 3. Check the cn service group
if (hasCnService) {
CN_BLOCK: {
List<SystemInfoService.HostInfo> remoteCnHosts = getCnGroupHostInfos();
if (remoteCnHosts == null) {
break CN_BLOCK;
}
LOG.debug("get remote cn hosts: {}", remoteCnHosts);
List<Backend> localCns = Env.getCurrentSystemInfo()
.getClusterCnBackends(SystemInfoService.DEFAULT_CLUSTER);
List<SystemInfoService.HostInfo> localCnHostInfos = this.convertBesToHostInfos(localCns);
LOG.debug("get local cn addrs: {}", localCnHostInfos);
if (inspectNodeChange(remoteCnHosts, localCnHostInfos, NodeType.BACKEND_CN)) {
return;
}
private void processPolling() {
for (NodeType nodeType : NodeType.values()) {
NodeTypeAttr nodeTypeAttr = nodeTypeAttrMap.get(nodeType);
if (!nodeTypeAttr.hasService) {
continue;
}
}
if (hasObserverService) {
OB_BLOCK: {
// 3. Check the observer fe service group
List<SystemInfoService.HostInfo> remoteObserverFeHosts = getObserverGroupHostInfos();
if (remoteObserverFeHosts == null) {
break OB_BLOCK;
}
LOG.debug("get remote observer fe hosts: {}", remoteObserverFeHosts);
List<Frontend> localObserverFeAddrs = env.getFrontends(FrontendNodeType.OBSERVER);
List<SystemInfoService.HostInfo> localObserverFeHosts = this
.convertFesToHostInfos(localObserverFeAddrs);
LOG.debug("get local observer fe hosts: {}", localObserverFeHosts);
if (inspectNodeChange(remoteObserverFeHosts, localObserverFeHosts, NodeType.OBSERVER)) {
return;
}
}
}
if (hasBrokerService) {
BROKER_BLOCK: {
// 4. Check the broker service group
Map<String, List<SystemInfoService.HostInfo>> remoteBrokerHosts = getBrokerGroupHostInfos();
if (remoteBrokerHosts == null) {
break BROKER_BLOCK;
}
Map<String, List<FsBroker>> localBrokers = env.getBrokerMgr().getBrokerListMap();
// 1. find missing brokers
for (Map.Entry<String, List<FsBroker>> entry : localBrokers.entrySet()) {
String brokerName = entry.getKey();
if (remoteBrokerHosts.containsKey(brokerName)) {
List<FsBroker> localList = entry.getValue();
List<SystemInfoService.HostInfo> remoteList = remoteBrokerHosts.get(brokerName);
// 1.1 found missing broker host
for (FsBroker addr : localList) {
SystemInfoService.HostInfo foundHost = getFromHostInfos(remoteList,
new SystemInfoService.HostInfo(addr.ip, null, addr.port));
if (foundHost == null) {
List<Pair<String, Integer>> list = Lists.newArrayList();
list.add(Pair.of(addr.ip, addr.port));
try {
env.getBrokerMgr().dropBrokers(brokerName, list);
LOG.info("drop broker {}:{} with name: {}",
addr.ip, addr.port, brokerName);
} catch (DdlException e) {
LOG.warn("failed to drop broker {}:{} with name: {}",
addr.ip, addr.port, brokerName, e);
continue;
}
}
}
// 1.2 add new broker host
for (SystemInfoService.HostInfo pair : remoteList) {
FsBroker foundAddr = getHostFromBrokerAddrs(localList, pair.getIp(), pair.getPort());
if (foundAddr == null) {
// add new broker
List<Pair<String, Integer>> list = Lists.newArrayList();
list.add(Pair.of(pair.getIp(), pair.getPort()));
try {
env.getBrokerMgr().addBrokers(brokerName, list);
LOG.info("add broker {}:{} with name {}", pair.getIp(), pair.getPort(), brokerName);
} catch (DdlException e) {
LOG.warn("failed to add broker {}:{} with name {}",
pair.getIp(), pair.getPort(), brokerName);
continue;
}
}
}
} else {
// broker with this name does not exist in remote. drop all
try {
env.getBrokerMgr().dropAllBroker(brokerName);
LOG.info("drop all brokers with name: {}", brokerName);
} catch (DdlException e) {
LOG.warn("failed to drop all brokers with name: {}", brokerName, e);
continue;
}
}
} // end for
// 2. add new brokers
for (Map.Entry<String, List<SystemInfoService.HostInfo>> entry : remoteBrokerHosts.entrySet()) {
String remoteBrokerName = entry.getKey();
if (!localBrokers.containsKey(remoteBrokerName)) {
// add new brokers
try {
env.getBrokerMgr()
.addBrokers(remoteBrokerName, convertHostInfosToIpPortPair(entry.getValue()));
LOG.info("add brokers {} with name {}", entry.getValue(), remoteBrokerName);
} catch (DdlException e) {
LOG.info("failed to add brokers {} with name {}",
entry.getValue(), remoteBrokerName, e);
continue;
}
}
}
} // end of BROKER BLOCK
List<HostInfo> remoteHosts = getGroupHostInfos(nodeType);
LOG.debug("get serviceName: {},remoteHosts: {}", nodeTypeAttr.getServiceName(), remoteHosts);
process(nodeType, remoteHosts);
}
}
private FsBroker getHostFromBrokerAddrs(List<FsBroker> addrList,
String ip, Integer port) {
for (FsBroker brokerAddress : addrList) {
if (brokerAddress.ip.equals(ip) && brokerAddress.port == port) {
return brokerAddress;
}
private boolean processQueue() {
Event event = nodeChangeQueue.poll();
if (event == null) {
return false;
}
process(event.getNodeType(), event.getHostInfos());
return true;
}
private void process(NodeType nodeType, List<HostInfo> remoteHosts) {
if (remoteHosts == null) {
return;
}
if (nodeType == NodeType.ELECTABLE && remoteHosts.isEmpty()) {
LOG.warn("electable fe service is empty, which should not happen");
return;
}
List<HostInfo> localHosts = getLocalHosts(nodeType);
inspectNodeChange(remoteHosts, localHosts, nodeType);
}
private List<HostInfo> getLocalHosts(NodeType nodeType) {
switch (nodeType) {
case ELECTABLE:
List<Frontend> localElectableFeAddrs = env.getFrontends(FrontendNodeType.FOLLOWER);
return this
.convertFesToHostInfos(localElectableFeAddrs);
case OBSERVER:
List<Frontend> localObserverFeAddrs = env.getFrontends(FrontendNodeType.OBSERVER);
return this
.convertFesToHostInfos(localObserverFeAddrs);
case BACKEND:
List<Backend> localBackends = Env.getCurrentSystemInfo()
.getClusterMixBackends(SystemInfoService.DEFAULT_CLUSTER);
return this.convertBesToHostInfos(localBackends);
case BACKEND_CN:
List<Backend> localCns = Env.getCurrentSystemInfo()
.getClusterCnBackends(SystemInfoService.DEFAULT_CLUSTER);
return this.convertBesToHostInfos(localCns);
case BROKER:
List<FsBroker> localBrokers = env.getBrokerMgr().getBrokerListMap().get(getBrokerName());
if (localBrokers == null) {
localBrokers = Lists.newArrayList();
}
return convertBrokersToHostInfos(localBrokers);
default:
break;
}
return null;
}
private boolean needDrop(boolean found, HostInfo localHostInfo) {
if (found) {
if (counterMap.containsKey(localHostInfo.getIdent())) {
counterMap.remove(localHostInfo.getIdent());
}
return false;
} else {
if (maxMissingTime <= 0) {
return true;
}
// Check the detected downtime
if (!counterMap.containsKey(localHostInfo.getIdent())) {
// First detected downtime. Add to the map and ignore
LOG.warn("downtime node: {} detected times: 1",
localHostInfo);
counterMap.put(localHostInfo.getIdent(), 1);
return false;
} else {
int times = counterMap.get(localHostInfo.getIdent());
if (times < maxMissingTime) {
LOG.warn("downtime node: {} detected times: {}",
localHostInfo, times + 1);
counterMap.put(localHostInfo.getIdent(), times + 1);
return false;
} else {
// Reset the counter map and do the dropping operation
LOG.warn("downtime node: {} detected times: {}. drop it",
localHostInfo, times + 1);
counterMap.remove(localHostInfo.getIdent());
return true;
}
}
}
}
/*
* Inspect the node change.
* 1. Check if there are some nodes need to be dropped.
* 2. Check if there are some nodes need to be added.
*
* We only handle one change at a time.
* Return true if something changed
*/
private boolean inspectNodeChange(List<SystemInfoService.HostInfo> remoteHostInfos,
List<SystemInfoService.HostInfo> localHostInfos,
private void inspectNodeChange(List<HostInfo> remoteHostInfos,
List<HostInfo> localHostInfos,
NodeType nodeType) {
// 2.1 Find local node which need to be dropped.
for (SystemInfoService.HostInfo localHostInfo : localHostInfos) {
String localIp = localHostInfo.getIp();
Integer localPort = localHostInfo.getPort();
String localHostName = localHostInfo.getHostName();
SystemInfoService.HostInfo foundHostInfo = getFromHostInfos(remoteHostInfos, localHostInfo);
if (foundHostInfo != null) {
if (counterMap.containsKey(localHostInfo.getIdent())) {
counterMap.remove(localHostInfo.getIdent());
}
} else {
// Double check if is it self
if (isSelf(localHostInfo)) {
// This is it self. Shut down now.
LOG.error("self host {}:{} does not exist in remote hosts. Showdown.");
System.exit(-1);
}
// Check the detected downtime
if (!counterMap.containsKey(localHostInfo.getIdent())) {
// First detected downtime. Add to the map and ignore
LOG.warn("downtime of {} node: {} detected times: 1",
nodeType.name(), localHostInfo);
counterMap.put(localHostInfo.getIdent(), 1);
return false;
} else {
int times = counterMap.get(localHostInfo.getIdent());
if (times < MAX_MISSING_TIME) {
LOG.warn("downtime of {} node: {} detected times: {}",
nodeType.name(), localHostInfo, times + 1);
counterMap.put(localHostInfo.getIdent(), times + 1);
return false;
} else {
// Reset the counter map and do the dropping operation
LOG.warn("downtime of {} node: {} detected times: {}. drop it",
nodeType.name(), localHostInfo, times + 1);
counterMap.remove(localHostInfo.getIdent());
}
}
// Can not find local host from remote host list,
// which means this node should be dropped.
try {
switch (nodeType) {
case ELECTABLE:
env.dropFrontend(FrontendNodeType.FOLLOWER, localIp, localHostName, localPort);
break;
case OBSERVER:
env.dropFrontend(FrontendNodeType.OBSERVER, localIp, localHostName, localPort);
break;
case BACKEND:
case BACKEND_CN:
Env.getCurrentSystemInfo().dropBackend(localIp, localHostName, localPort);
break;
default:
break;
}
} catch (DdlException e) {
LOG.error("Failed to drop {} node: {}:{}", nodeType, localIp, localPort, e);
// return true is a conservative behavior. we do not expect any exception here.
return true;
}
LOG.info("Finished to drop {} node: {}:{}", nodeType, localIp, localPort);
return true;
for (HostInfo localHostInfo : localHostInfos) {
HostInfo foundHostInfo = getFromHostInfos(remoteHostInfos, localHostInfo);
boolean needDrop = needDrop(foundHostInfo != null, localHostInfo);
if (needDrop) {
dealDropLocal(localHostInfo, nodeType);
}
}
// 2.2. Find remote node which need to be added.
for (SystemInfoService.HostInfo remoteHostInfo : remoteHostInfos) {
String remoteIp = remoteHostInfo.getIp();
Integer remotePort = remoteHostInfo.getPort();
String remoteHostName = remoteHostInfo.getHostName();
SystemInfoService.HostInfo foundHostInfo = getFromHostInfos(localHostInfos, remoteHostInfo);
for (HostInfo remoteHostInfo : remoteHostInfos) {
HostInfo foundHostInfo = getFromHostInfos(localHostInfos, remoteHostInfo);
if (foundHostInfo == null) {
// Can not find remote host in local hosts,
// which means this remote host need to be added.
try {
switch (nodeType) {
case ELECTABLE:
env.addFrontend(FrontendNodeType.FOLLOWER, remoteIp, remoteHostName, remotePort);
break;
case OBSERVER:
env.addFrontend(FrontendNodeType.OBSERVER, remoteIp, remoteHostName, remotePort);
break;
case BACKEND:
case BACKEND_CN:
List<SystemInfoService.HostInfo> newBackends = Lists.newArrayList();
newBackends.add(new SystemInfoService.HostInfo(remoteIp, remoteHostName, remotePort));
Env.getCurrentSystemInfo().addBackends(newBackends, false);
break;
default:
break;
}
} catch (UserException e) {
LOG.error("Failed to add {} node: {}:{}", nodeType, remoteIp, remotePort, e);
return true;
}
LOG.info("Finished to add {} node: {}:{}", nodeType, remoteIp, remotePort);
return true;
dealAddRemote(remoteHostInfo, nodeType);
}
}
return false;
}
private List<Pair<String, Integer>> convertHostInfosToIpPortPair(List<SystemInfoService.HostInfo> hostInfos) {
ArrayList<Pair<String, Integer>> pairs = Lists.newArrayList();
for (SystemInfoService.HostInfo e : hostInfos) {
pairs.add(Pair.of(e.getIp(), e.getPort()));
private void dealDropLocal(HostInfo localHostInfo, NodeType nodeType) {
String localIp = localHostInfo.getIp();
Integer localPort = localHostInfo.getPort();
String localHostName = localHostInfo.getHostName();
// Double check if is it self
if (isSelf(localHostInfo)) {
// This is it self. Shut down now.
LOG.error("self host {}:{} does not exist in remote hosts. Showdown.");
System.exit(-1);
}
return pairs;
// Can not find local host from remote host list,
// which means this node should be dropped.
try {
switch (nodeType) {
case ELECTABLE:
env.dropFrontend(FrontendNodeType.FOLLOWER, localIp, localHostName, localPort);
break;
case OBSERVER:
env.dropFrontend(FrontendNodeType.OBSERVER, localIp, localHostName, localPort);
break;
case BACKEND:
case BACKEND_CN:
Env.getCurrentSystemInfo().dropBackend(localIp, localHostName, localPort);
break;
case BROKER:
env.getBrokerMgr().dropBrokers(getBrokerName(), Lists.newArrayList(Pair.of(localIp, localPort)));
break;
default:
break;
}
} catch (DdlException e) {
LOG.error("Failed to drop {} node: {}:{}", nodeType, localIp, localPort, e);
}
LOG.info("Finished to drop {} node: {}:{}", nodeType, localIp, localPort);
}
private void dealAddRemote(HostInfo remoteHostInfo, NodeType nodeType) {
String remoteIp = remoteHostInfo.getIp();
Integer remotePort = remoteHostInfo.getPort();
String remoteHostName = remoteHostInfo.getHostName();
// Can not find remote host in local hosts,
// which means this remote host need to be added.
if (StringUtils.isEmpty(remoteIp)) {
LOG.info("remote node is not ready,need wait.");
return;
}
try {
switch (nodeType) {
case ELECTABLE:
env.addFrontend(FrontendNodeType.FOLLOWER, remoteIp, remoteHostName, remotePort);
break;
case OBSERVER:
env.addFrontend(FrontendNodeType.OBSERVER, remoteIp, remoteHostName, remotePort);
break;
case BACKEND:
case BACKEND_CN:
List<HostInfo> newBackends = Lists.newArrayList();
newBackends.add(new HostInfo(remoteIp, remoteHostName, remotePort));
Env.getCurrentSystemInfo().addBackends(newBackends, false);
break;
case BROKER:
env.getBrokerMgr().addBrokers(getBrokerName(), Lists.newArrayList(Pair.of(remoteIp, remotePort)));
break;
default:
break;
}
} catch (UserException e) {
LOG.error("Failed to add {} node: {}:{}", nodeType, remoteIp, remotePort, e);
}
LOG.info("Finished to add {} node: {}:{}", nodeType, remoteIp, remotePort);
}
// Get host port pair from pair list. Return null if not found
// when hostName,compare hostname,otherwise compare ip
private SystemInfoService.HostInfo getFromHostInfos(List<SystemInfoService.HostInfo> hostInfos,
SystemInfoService.HostInfo hostInfo) {
for (SystemInfoService.HostInfo h : hostInfos) {
private HostInfo getFromHostInfos(List<HostInfo> hostInfos,
HostInfo hostInfo) {
for (HostInfo h : hostInfos) {
if (StringUtils.isEmpty(hostInfo.hostName) || StringUtils.isEmpty(h.hostName)) {
if (hostInfo.getIp().equals(h.getIp()) && hostInfo.getPort() == (h.getPort())) {
return hostInfo;
@ -669,40 +589,106 @@ public class DeployManager extends MasterDaemon {
return null;
}
private List<SystemInfoService.HostInfo> convertFesToHostInfos(List<Frontend> frontends) {
List<SystemInfoService.HostInfo> hostPortPair = Lists.newArrayList();
private List<HostInfo> convertFesToHostInfos(List<Frontend> frontends) {
List<HostInfo> hostPortPair = Lists.newArrayList();
for (Frontend fe : frontends) {
hostPortPair.add(convertToHostInfo(fe));
}
return hostPortPair;
}
private List<SystemInfoService.HostInfo> convertBesToHostInfos(List<Backend> backends) {
List<SystemInfoService.HostInfo> hostPortPair = Lists.newArrayList();
private List<HostInfo> convertBrokersToHostInfos(List<FsBroker> brokers) {
List<HostInfo> hostPortPair = Lists.newArrayList();
for (FsBroker broker : brokers) {
hostPortPair.add(convertToHostInfo(broker));
}
return hostPortPair;
}
private List<HostInfo> convertBesToHostInfos(List<Backend> backends) {
List<HostInfo> hostPortPair = Lists.newArrayList();
for (Backend fe : backends) {
hostPortPair.add(convertToHostInfo(fe));
}
return hostPortPair;
}
private SystemInfoService.HostInfo convertToHostInfo(Frontend frontend) {
return new SystemInfoService.HostInfo(frontend.getIp(), frontend.getHostName(), frontend.getEditLogPort());
private HostInfo convertToHostInfo(Frontend frontend) {
return new HostInfo(frontend.getIp(), frontend.getHostName(), frontend.getEditLogPort());
}
private SystemInfoService.HostInfo convertToHostInfo(Backend backend) {
return new SystemInfoService.HostInfo(backend.getIp(), backend.getHostName(), backend.getHeartbeatPort());
private HostInfo convertToHostInfo(FsBroker broker) {
return new HostInfo(broker.ip, null, broker.port);
}
private boolean isSelf(SystemInfoService.HostInfo hostInfo) {
if (Config.edit_log_port == hostInfo.getPort()) {
// master host name may not same as local host name, so we should compare ip here
if (env.getMasterHostName() != null && env.getMasterHostName().equals(hostInfo.getHostName())) {
return true;
}
if (env.getMasterIp().equals(hostInfo.getIp())) {
return true;
}
private HostInfo convertToHostInfo(Backend backend) {
return new HostInfo(backend.getIp(), backend.getHostName(), backend.getHeartbeatPort());
}
private boolean isSelf(HostInfo hostInfo) {
if (env.getMasterIp().equals(hostInfo.getIp()) && Config.edit_log_port == hostInfo.getPort()) {
return true;
}
return false;
}
protected class Event {
private NodeType nodeType;
private List<HostInfo> hostInfos;
public Event(NodeType nodeType, List<HostInfo> hostInfos) {
this.nodeType = nodeType;
this.hostInfos = hostInfos;
}
public NodeType getNodeType() {
return nodeType;
}
public List<HostInfo> getHostInfos() {
return hostInfos;
}
@Override
public String toString() {
return "Event{"
+ "nodeType=" + nodeType
+ ", hostInfos=" + hostInfos
+ '}';
}
}
protected class NodeTypeAttr {
private boolean hasService;
private String serviceName;
private String subAttr1;
public NodeTypeAttr(boolean hasService) {
this.hasService = hasService;
}
public boolean hasService() {
return hasService;
}
public void setHasService(boolean hasService) {
this.hasService = hasService;
}
public String getServiceName() {
return serviceName;
}
public void setServiceName(String serviceName) {
this.serviceName = serviceName;
}
public String getSubAttr1() {
return subAttr1;
}
public void setSubAttr1(String subAttr1) {
this.subAttr1 = subAttr1;
}
}
}

View File

@ -27,7 +27,6 @@ import org.apache.doris.system.SystemInfoService;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.codec.binary.Base64;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -36,7 +35,6 @@ import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
import java.util.List;
import java.util.Map;
/*
* Required env variables:
@ -129,7 +127,7 @@ public class AmbariDeployManager extends DeployManager {
if (Strings.isNullOrEmpty(brokerConfigNode)) {
LOG.warn("can not get broker config node from env var: {}", ENV_AMBARI_BROKER_COMPONENTS_CONFIG);
hasBrokerService = false;
nodeTypeAttrMap.get(NodeType.BROKER).setHasService(false);
}
LOG.info("get fe, be and broker config node name: {}, {}, {}",
@ -197,21 +195,21 @@ public class AmbariDeployManager extends DeployManager {
}
@Override
protected List<SystemInfoService.HostInfo> getGroupHostInfos(String groupName) {
protected List<SystemInfoService.HostInfo> getGroupHostInfos(NodeType nodeType) {
int port = -1;
if (groupName.equalsIgnoreCase(electableFeServiceGroup)) {
if (nodeType == NodeType.ELECTABLE) {
port = getFeEditLogPort();
} else if (groupName.equalsIgnoreCase(backendServiceGroup)) {
} else if (nodeType == NodeType.BACKEND) {
port = getBeHeartbeatPort();
} else {
LOG.warn("unknown group name: {}", groupName);
LOG.warn("unknown type: {}", nodeType.name());
return null;
}
if (port == -1) {
LOG.warn("failed to get port of component: {}", groupName);
LOG.warn("failed to get port of component: {}", nodeType.name());
return null;
}
String groupName = nodeTypeAttrMap.get(nodeType).getServiceName();
List<String> hostnames = getHostnamesFromComponentsJson(groupName);
List<SystemInfoService.HostInfo> hostPorts = Lists.newArrayListWithCapacity(hostnames.size());
for (String hostname : hostnames) {
@ -230,35 +228,8 @@ public class AmbariDeployManager extends DeployManager {
}
@Override
protected Map<String, List<SystemInfoService.HostInfo>> getBrokerGroupHostInfos() {
int port = getBrokerIpcPort();
if (port == -1) {
LOG.warn("failed to get port of component: {}", brokerServiceGroup);
return null;
}
String brokerName = getPropertyFromBlueprint(brokerConfigNode, KEY_BROKER_NAME);
if (brokerName == null) {
return null;
}
List<String> hostnames = getHostnamesFromComponentsJson(brokerServiceGroup);
List<SystemInfoService.HostInfo> hostPorts = Lists.newArrayListWithCapacity(hostnames.size());
for (String hostname : hostnames) {
Pair<String, Integer> hostPort = null;
try {
hostPort = SystemInfoService.validateHostAndPort(hostname + ":" + port);
} catch (AnalysisException e) {
LOG.warn("Invalid host port format: {}:{}", hostname, port, e);
continue;
}
hostPorts.add(new SystemInfoService.HostInfo(hostPort.first, null, hostPort.second));
}
Map<String, List<SystemInfoService.HostInfo>> brokers = Maps.newHashMap();
brokers.put(brokerName, hostPorts);
LOG.info("get brokers from ambari: {}", brokers);
return brokers;
protected String getBrokerName() {
return getPropertyFromBlueprint(brokerConfigNode, KEY_BROKER_NAME);
}
private Integer getFeEditLogPort() {

View File

@ -20,23 +20,32 @@ package org.apache.doris.deploy.impl;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.deploy.DeployManager;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.system.SystemInfoService.HostInfo;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.fabric8.kubernetes.api.model.EndpointAddress;
import io.fabric8.kubernetes.api.model.EndpointPort;
import io.fabric8.kubernetes.api.model.EndpointSubset;
import io.fabric8.kubernetes.api.model.Endpoints;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.ServicePort;
import io.fabric8.kubernetes.api.model.apps.StatefulSet;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import jline.internal.Log;
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
@ -55,8 +64,11 @@ public class K8sDeployManager extends DeployManager {
public static final String ENV_BROKER_SERVICE = "BROKER_SERVICE";
public static final String ENV_CN_SERVICE = "CN_SERVICE";
// we arbitrarily set all broker name as what ENV_BROKER_NAME specified.
public static final String ENV_BROKER_NAME = "BROKER_NAME";
public static final String ENV_FE_STATEFULSET = "FE_STATEFULSET";
public static final String ENV_FE_OBSERVER_STATEFULSET = "FE_OBSERVER_STATEFULSET";
public static final String ENV_BE_STATEFULSET = "BE_STATEFULSET";
public static final String ENV_BROKER_STATEFULSET = "BROKER_STATEFULSET";
public static final String ENV_CN_STATEFULSET = "CN_STATEFULSET";
public static final String FE_PORT = "edit-log-port"; // k8s only support -, not _
public static final String BE_PORT = "heartbeat-port";
@ -67,7 +79,7 @@ public class K8sDeployManager extends DeployManager {
private String appNamespace;
private String domainLTD;
private KubernetesClient client = null;
private Watch statefulSetWatch = null;
// =======for test only==========
public static final String K8S_CA_CERT_FILE = "cce-ca.pem";
public static final String K8S_CLIENT_CERT_FILE = "cce-admin.pem";
@ -79,7 +91,8 @@ public class K8sDeployManager extends DeployManager {
// =======for test only==========
public K8sDeployManager(Env env, long intervalMs) {
super(env, intervalMs);
// if enable fqdn,we wait for k8s to actively push the statefulset change
super(env, intervalMs, Config.enable_fqdn_mode);
initEnvVariables(ENV_FE_SERVICE, ENV_FE_OBSERVER_SERVICE, ENV_BE_SERVICE, ENV_BROKER_SERVICE, ENV_CN_SERVICE);
}
@ -105,46 +118,88 @@ public class K8sDeployManager extends DeployManager {
}
LOG.info("use domainLTD: {}", domainLTD);
//Fill NodeTypeAttr.subAttr1 with statefulName
//If serviceName is configured, the corresponding statefulSetName must be configured
for (NodeType nodeType : NodeType.values()) {
NodeTypeAttr nodeTypeAttr = nodeTypeAttrMap.get(nodeType);
if (nodeTypeAttr.hasService()) {
String statefulSetEnvName = getStatefulSetEnvName(nodeType);
Log.info("Env name of: {} is: {}", nodeType.name(), statefulSetEnvName);
String statefulSetName = Strings.nullToEmpty(System.getenv(statefulSetEnvName));
if (Strings.isNullOrEmpty(statefulSetName)) {
LOG.error("failed to init statefulSetName: {}", statefulSetEnvName);
System.exit(-1);
}
LOG.info("use statefulSetName: {}, {}", nodeType.name(), statefulSetName);
nodeTypeAttr.setSubAttr1(statefulSetName);
}
}
}
@Override
protected List<SystemInfoService.HostInfo> getGroupHostInfos(String groupName) {
// 1. get namespace and port name
String portName = null;
if (groupName.equals(electableFeServiceGroup)) {
portName = FE_PORT;
} else if (groupName.equals(observerFeServiceGroup)) {
portName = FE_PORT;
} else if (groupName.equals(backendServiceGroup)) {
portName = BE_PORT;
} else if (groupName.equals(brokerServiceGroup)) {
portName = BROKER_PORT;
} else if (groupName.equals(cnServiceGroup)) {
portName = BE_PORT;
} else {
LOG.warn("unknown service group name: {}", groupName);
return null;
}
Preconditions.checkNotNull(appNamespace);
Preconditions.checkNotNull(portName);
public void startListenerInternal() {
statefulSetWatch = getWatch(client());
LOG.info("Start listen statefulSets change event.");
}
// 2. get endpoint
Endpoints endpoints = null;
try {
endpoints = endpoints(appNamespace, groupName);
} catch (Exception e) {
LOG.warn("encounter exception when get endpoint from namespace {}, service: {}",
appNamespace, groupName, e);
private String getStatefulSetEnvName(NodeType nodeType) {
switch (nodeType) {
case ELECTABLE:
return ENV_FE_STATEFULSET;
case OBSERVER:
return ENV_FE_OBSERVER_STATEFULSET;
case BACKEND:
return ENV_BE_STATEFULSET;
case BACKEND_CN:
return ENV_CN_STATEFULSET;
case BROKER:
return ENV_BROKER_STATEFULSET;
default:
return null;
}
}
@Override
protected List<HostInfo> getGroupHostInfos(NodeType nodeType) {
if (Config.enable_fqdn_mode) {
return getGroupHostInfosByStatefulSet(nodeType);
} else {
return getGroupHostInfosByEndpoint(nodeType);
}
}
private List<HostInfo> getGroupHostInfosByStatefulSet(NodeType nodeType) {
String statefulSetName = nodeTypeAttrMap.get(nodeType).getSubAttr1();
Preconditions.checkNotNull(statefulSetName);
StatefulSet statefulSet = statefulSet(appNamespace, nodeTypeAttrMap.get(nodeType).getSubAttr1());
if (statefulSet == null) {
LOG.warn("get null statefulSet in namespace {}, statefulSetName: {}", appNamespace, statefulSetName);
return null;
}
return getHostInfosByNum(nodeType, statefulSet.getSpec().getReplicas());
}
private List<HostInfo> getGroupHostInfosByEndpoint(NodeType nodeType) {
// get portName
String portName = getPortName(nodeType);
Preconditions.checkNotNull(portName);
// get serviceName
String serviceName = nodeTypeAttrMap.get(nodeType).getServiceName();
Preconditions.checkNotNull(serviceName);
// get endpoint
Endpoints endpoints = endpoints(appNamespace, serviceName);
if (endpoints == null) {
// endpoints may be null if service does not exist;
LOG.warn("get null endpoints of namespace {} in service: {}", appNamespace, groupName);
LOG.warn("get null endpoints of namespace: {} in service: {}", appNamespace, serviceName);
return null;
}
// 3. get host port
List<SystemInfoService.HostInfo> result = Lists.newArrayList();
// get host port
List<HostInfo> result = Lists.newArrayList();
List<EndpointSubset> subsets = endpoints.getSubsets();
for (EndpointSubset subset : subsets) {
Integer port = -1;
@ -162,18 +217,19 @@ public class K8sDeployManager extends DeployManager {
List<EndpointAddress> addrs = subset.getAddresses();
for (EndpointAddress eaddr : addrs) {
result.add(new SystemInfoService.HostInfo(eaddr.getIp(), getDomainName(eaddr.getHostname(), groupName),
port));
result.add(new HostInfo(eaddr.getIp(), null, port));
}
}
LOG.info("get host port from group: {}: {}", groupName, result);
LOG.info("get host port from group: {}: {}", serviceName, result);
return result;
}
private String getDomainName(String hostName, String serviceName) {
//The rules for the domain name of k8s are $(podName).$(servicename).$(namespace).svc.cluster.local
// can see https://www.cnblogs.com/xiaokantianse/p/14267987.html#_label1_4
private String getDomainName(String podName, String serviceName) {
StringBuilder builder = new StringBuilder();
builder.append(hostName);
builder.append(podName);
builder.append(".");
builder.append(serviceName);
builder.append(".");
@ -183,26 +239,128 @@ public class K8sDeployManager extends DeployManager {
return builder.toString();
}
@Override
protected Map<String, List<SystemInfoService.HostInfo>> getBrokerGroupHostInfos() {
List<SystemInfoService.HostInfo> hostPorts = getGroupHostInfos(brokerServiceGroup);
if (hostPorts == null) {
private Endpoints endpoints(String namespace, String serviceName) {
try {
return client().endpoints().inNamespace(namespace).withName(serviceName).get();
} catch (Exception e) {
LOG.warn("encounter exception when get endpoint from namespace {}, service: {}",
appNamespace, serviceName, e);
return null;
}
final String brokerName = System.getenv(ENV_BROKER_NAME);
if (Strings.isNullOrEmpty(brokerName)) {
LOG.error("failed to get broker name from env: {}", ENV_BROKER_NAME);
System.exit(-1);
}
Map<String, List<SystemInfoService.HostInfo>> brokers = Maps.newHashMap();
brokers.put(brokerName, hostPorts);
LOG.info("get brokers from k8s: {}", brokers);
return brokers;
}
private Endpoints endpoints(String namespace, String serviceName) throws Exception {
return client().endpoints().inNamespace(namespace).withName(serviceName).get();
public Service service(String namespace, String serviceName) {
try {
return client().services().inNamespace(namespace).withName(serviceName).get();
} catch (Exception e) {
LOG.warn("encounter exception when get service from namespace {}, service: {}",
appNamespace, serviceName, e);
return null;
}
}
public StatefulSet statefulSet(String namespace, String statefulSetName) {
try {
return client().apps().statefulSets().inNamespace(namespace).withName(statefulSetName).get();
} catch (Exception e) {
LOG.warn("encounter exception when get statefulSet from namespace {}, statefulSet: {}",
appNamespace, statefulSetName, e);
return null;
}
}
private void dealEvent(String statefulsetName, Integer num) {
NodeType nodeType = getNodeType(statefulsetName);
if (nodeType == null) {
return;
}
List<HostInfo> hostInfosByNum = getHostInfosByNum(nodeType, num);
if (hostInfosByNum == null) {
return;
}
Event event = new Event(nodeType, hostInfosByNum);
nodeChangeQueue.offer(event);
}
public List<HostInfo> getHostInfosByNum(NodeType nodeType, Integer num) {
int servicePort = getServicePort(nodeType);
if (servicePort == -1) {
LOG.warn("get servicePort failed,{}", nodeType.name());
return null;
}
String statefulsetName = nodeTypeAttrMap.get(nodeType).getSubAttr1();
Preconditions.checkNotNull(statefulsetName);
String serviceName = nodeTypeAttrMap.get(nodeType).getServiceName();
Preconditions.checkNotNull(serviceName);
List<HostInfo> hostInfos = Lists.newArrayList();
for (int i = 0; i < num; i++) {
//The podName rule of k8s is $(statefulset name) - $(sequence number)
//can see https://www.cnblogs.com/xiaokantianse/p/14267987.html#_label1_4
String domainName = getDomainName(statefulsetName + "-" + i, serviceName);
hostInfos.add(new HostInfo(getIpByDomain(domainName), domainName, servicePort));
}
return hostInfos;
}
private String getIpByDomain(String domainName) {
try {
InetAddress inetAddress = InetAddress.getByName(domainName);
return inetAddress.getHostAddress();
} catch (UnknownHostException e) {
LOG.info("unknown host name for domainName, {}", domainName, e);
return null;
}
}
private int getServicePort(NodeType nodeType) {
Integer port = -1;
String serviceName = nodeTypeAttrMap.get(nodeType).getServiceName();
Preconditions.checkNotNull(serviceName);
Service service = service(appNamespace, serviceName);
if (service == null) {
LOG.warn("get null service in namespace: {}, serviceName: {}", appNamespace, serviceName);
return port;
}
String portName = getPortName(nodeType);
Preconditions.checkNotNull(portName);
List<ServicePort> ports = service.getSpec().getPorts();
for (ServicePort servicePort : ports) {
if (servicePort.getName().equals(portName)) {
port = servicePort.getPort();
break;
}
}
return port;
}
private String getPortName(NodeType nodeType) {
switch (nodeType) {
case BROKER:
return BROKER_PORT;
case ELECTABLE:
case OBSERVER:
return FE_PORT;
case BACKEND:
case BACKEND_CN:
return BE_PORT;
default:
return null;
}
}
private NodeType getNodeType(String ststefulSetName) {
if (StringUtils.isEmpty(ststefulSetName)) {
return null;
}
for (Map.Entry<NodeType, NodeTypeAttr> entry : nodeTypeAttrMap.entrySet()) {
if (ststefulSetName.equals(entry.getValue().getSubAttr1())) {
return entry.getKey();
}
}
return null;
}
private synchronized KubernetesClient client() {
@ -230,7 +388,32 @@ public class K8sDeployManager extends DeployManager {
return client;
}
private Watch getWatch(KubernetesClient client) {
return client.apps().statefulSets().inNamespace(appNamespace).watch(new Watcher<StatefulSet>() {
@Override
public void onClose(WatcherException e) {
LOG.warn("Watch error received: {}.", e.getMessage());
}
@Override
public void eventReceived(Action action, StatefulSet statefulSet) {
LOG.info("Watch event received {}: {}: {}", action.name(), statefulSet.getMetadata().getName(),
statefulSet.getSpec().getReplicas());
dealEvent(statefulSet.getMetadata().getName(), statefulSet.getSpec().getReplicas());
}
@Override
public void onClose() {
LOG.info("Watch gracefully closed.");
}
});
}
public void close() {
if (statefulSetWatch != null) {
statefulSetWatch.close();
}
if (client != null) {
client.close();
}

View File

@ -25,7 +25,6 @@ import org.apache.doris.system.SystemInfoService;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -37,7 +36,6 @@ import java.io.InputStreamReader;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.util.List;
import java.util.Map;
/*
* This for Boxer2 Baidu BCC agent
@ -56,8 +54,6 @@ public class LocalFileDeployManager extends DeployManager {
public static final String ENV_BROKER_SERVICE = "BROKER_SERVICE";
public static final String ENV_CN_SERVICE = "CN_SERVICE";
public static final String ENV_BROKER_NAME = "BROKER_NAME";
private String clusterInfoFile;
public LocalFileDeployManager(Env env, long intervalMs) {
@ -83,7 +79,8 @@ public class LocalFileDeployManager extends DeployManager {
}
@Override
public List<SystemInfoService.HostInfo> getGroupHostInfos(String groupName) {
public List<SystemInfoService.HostInfo> getGroupHostInfos(NodeType nodeType) {
String groupName = nodeTypeAttrMap.get(nodeType).getServiceName();
List<SystemInfoService.HostInfo> result = Lists.newArrayList();
LOG.info("begin to get group: {} from file: {}", groupName, clusterInfoFile);
@ -153,22 +150,4 @@ public class LocalFileDeployManager extends DeployManager {
LOG.info("get hosts from {}: {}", groupName, result);
return result;
}
@Override
protected Map<String, List<SystemInfoService.HostInfo>> getBrokerGroupHostInfos() {
List<SystemInfoService.HostInfo> hostPorts = getGroupHostInfos(brokerServiceGroup);
if (hostPorts == null) {
return null;
}
final String brokerName = System.getenv(ENV_BROKER_NAME);
if (Strings.isNullOrEmpty(brokerName)) {
LOG.error("failed to get broker name from env: {}", ENV_BROKER_NAME);
System.exit(-1);
}
Map<String, List<SystemInfoService.HostInfo>> brokers = Maps.newHashMap();
brokers.put(brokerName, hostPorts);
LOG.info("get brokers from file: {}", brokers);
return brokers;
}
}

View File

@ -207,7 +207,6 @@ under the License.
<kubernetes-model.version>5.12.2</kubernetes-model.version>
<logging-interceptor.version>4.7.2</logging-interceptor.version>
<okhttp.version>4.7.2</okhttp.version>
<okhttp-ws.version>3.4.2</okhttp-ws.version>
<okio.version>2.6.0</okio.version>
<snakeyaml.version>1.15</snakeyaml.version>
<validation-api.version>1.1.0.Final</validation-api.version>
@ -629,12 +628,6 @@ under the License.
<artifactId>okhttp</artifactId>
<version>${okhttp.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.squareup.okhttp3/okhttp-ws -->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp-ws</artifactId>
<version>${okhttp-ws.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.squareup.okio/okio -->
<dependency>
<groupId>com.squareup.okio</groupId>