diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index ab845b4bdb..c365f901b0 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -306,11 +306,6 @@ under the License.
com.squareup.okhttp3
okhttp
-
-
- com.squareup.okhttp3
- okhttp-ws
-
com.squareup.okio
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 035fc6d7ab..632ea3daab 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -1403,6 +1403,7 @@ public class Env {
if (!Config.enable_deploy_manager.equalsIgnoreCase("disable")) {
LOG.info("deploy manager {} start", deployManager.getName());
deployManager.start();
+ deployManager.startListener();
}
// start routine load scheduler
routineLoadScheduler.start();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/deploy/DeployManager.java b/fe/fe-core/src/main/java/org/apache/doris/deploy/DeployManager.java
index e052654f8a..3e3fabe4dd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/deploy/DeployManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/deploy/DeployManager.java
@@ -31,19 +31,19 @@ import org.apache.doris.system.Frontend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.system.SystemInfoService.HostInfo;
-import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.BlockingQueue;
/*
* This deploy manager is to support Kubernetes, Ambari or other system for automating deployment.
@@ -108,6 +108,8 @@ public class DeployManager extends MasterDaemon {
public static final String ENV_FE_EXIST_ENDPOINT = "FE_EXIST_ENDPOINT";
public static final String ENV_FE_INIT_NUMBER = "FE_INIT_NUMBER";
+ // we arbitrarily set all broker name as what ENV_BROKER_NAME specified.
+ public static final String ENV_BROKER_NAME = "BROKER_NAME";
public enum NodeType {
ELECTABLE, OBSERVER, BACKEND, BROKER, BACKEND_CN
@@ -115,18 +117,6 @@ public class DeployManager extends MasterDaemon {
protected Env env;
- protected String electableFeServiceGroup;
- protected String observerFeServiceGroup;
- protected String backendServiceGroup;
- protected String brokerServiceGroup;
- protected String cnServiceGroup;
-
- protected boolean hasElectableService = false;
- protected boolean hasBackendService = false;
- protected boolean hasObserverService = false;
- protected boolean hasBrokerService = false;
- protected boolean hasCnService = false;
-
// Host identifier -> missing counter
// eg:
// In k8s, when a node is down, the endpoint may be disappear immediately in service group.
@@ -136,12 +126,32 @@ public class DeployManager extends MasterDaemon {
// So we use this map to count the continuous detected down times, if the continuous down time is more
// then MAX_MISSING_TIME, we considered this node as down permanently.
protected Map counterMap = Maps.newHashMap();
- // k8s pod delete and will recreate, so we need to wait for a while,otherwise we will drop node by mistake
- protected static final Integer MAX_MISSING_TIME = 60;
+ protected Integer maxMissingTime = 5;
+ //if 'true',Actively pull node information from external systems.
+ //if 'false',The external system actively pushes the node change information,
+ // and only needs to listen to 'nodeChangeQueue'
+ protected boolean listenRequired;
+ protected BlockingQueue nodeChangeQueue;
+ protected Map nodeTypeAttrMap = Maps.newHashMap();
+ private boolean isRunning;
public DeployManager(Env env, long intervalMs) {
+ this(env, intervalMs, false);
+ }
+
+ public DeployManager(Env env, long intervalMs, boolean listenRequired) {
super("deployManager", intervalMs);
this.env = env;
+ this.listenRequired = listenRequired;
+ this.isRunning = false;
+ if (listenRequired) {
+ this.maxMissingTime = 0;
+ this.nodeChangeQueue = Queues.newLinkedBlockingDeque();
+ }
+ // init NodeTypeAttr for each NodeType,so when get NodeTypeAttr by NodeType,we assume not null
+ for (NodeType nodeType : NodeType.values()) {
+ nodeTypeAttrMap.put(nodeType, new NodeTypeAttr(false));
+ }
}
// Init all environment variables.
@@ -150,11 +160,11 @@ public class DeployManager extends MasterDaemon {
protected void initEnvVariables(String envElectableFeServiceGroup, String envObserverFeServiceGroup,
String envBackendServiceGroup, String envBrokerServiceGroup, String envCnServiceGroup) {
- this.electableFeServiceGroup = Strings.nullToEmpty(System.getenv(envElectableFeServiceGroup));
- this.observerFeServiceGroup = Strings.nullToEmpty(System.getenv(envObserverFeServiceGroup));
- this.backendServiceGroup = Strings.nullToEmpty(System.getenv(envBackendServiceGroup));
- this.brokerServiceGroup = Strings.nullToEmpty(System.getenv(envBrokerServiceGroup));
- this.cnServiceGroup = Strings.nullToEmpty(System.getenv(envCnServiceGroup));
+ String electableFeServiceGroup = Strings.nullToEmpty(System.getenv(envElectableFeServiceGroup));
+ String observerFeServiceGroup = Strings.nullToEmpty(System.getenv(envObserverFeServiceGroup));
+ String backendServiceGroup = Strings.nullToEmpty(System.getenv(envBackendServiceGroup));
+ String brokerServiceGroup = Strings.nullToEmpty(System.getenv(envBrokerServiceGroup));
+ String cnServiceGroup = Strings.nullToEmpty(System.getenv(envCnServiceGroup));
LOG.info("get deploy env: {}, {}, {}, {}, {}", envElectableFeServiceGroup, envObserverFeServiceGroup,
envBackendServiceGroup, envBrokerServiceGroup, envCnServiceGroup);
@@ -162,31 +172,36 @@ public class DeployManager extends MasterDaemon {
// check if we have electable service
if (!Strings.isNullOrEmpty(electableFeServiceGroup)) {
LOG.info("Electable service group is found");
- hasElectableService = true;
- }
-
- // check if we have backend service
- if (!Strings.isNullOrEmpty(backendServiceGroup)) {
- LOG.info("Backend service group is found");
- hasBackendService = true;
+ nodeTypeAttrMap.get(NodeType.ELECTABLE).setHasService(true);
+ nodeTypeAttrMap.get(NodeType.ELECTABLE).setServiceName(electableFeServiceGroup);
}
// check if we have observer service
if (!Strings.isNullOrEmpty(observerFeServiceGroup)) {
LOG.info("Observer service group is found");
- hasObserverService = true;
+ nodeTypeAttrMap.get(NodeType.OBSERVER).setHasService(true);
+ nodeTypeAttrMap.get(NodeType.OBSERVER).setServiceName(observerFeServiceGroup);
+ }
+
+ // check if we have backend service
+ if (!Strings.isNullOrEmpty(backendServiceGroup)) {
+ LOG.info("Backend service group is found");
+ nodeTypeAttrMap.get(NodeType.BACKEND).setHasService(true);
+ nodeTypeAttrMap.get(NodeType.BACKEND).setServiceName(backendServiceGroup);
}
// check if we have broker service
if (!Strings.isNullOrEmpty(brokerServiceGroup)) {
LOG.info("Broker service group is found");
- hasBrokerService = true;
+ nodeTypeAttrMap.get(NodeType.BROKER).setHasService(true);
+ nodeTypeAttrMap.get(NodeType.BROKER).setServiceName(brokerServiceGroup);
}
// check if we have cn service
if (!Strings.isNullOrEmpty(cnServiceGroup)) {
LOG.info("Cn service group is found");
- hasCnService = true;
+ nodeTypeAttrMap.get(NodeType.BACKEND_CN).setHasService(true);
+ nodeTypeAttrMap.get(NodeType.BACKEND_CN).setServiceName(cnServiceGroup);
}
LOG.info("get electableFeServiceGroup: {}, observerFeServiceGroup: {}, backendServiceGroup: {}"
@@ -195,6 +210,16 @@ public class DeployManager extends MasterDaemon {
cnServiceGroup);
}
+ public void startListener() {
+ if (listenRequired) {
+ startListenerInternal();
+ }
+ }
+
+ public void startListenerInternal() {
+ throw new NotImplementedException();
+ }
+
// Call init before each runOneCycle
// Default is do nothing. Can be override in derived class
// return false if init failed.
@@ -202,41 +227,20 @@ public class DeployManager extends MasterDaemon {
return true;
}
- // get electable fe
- protected List getElectableGroupHostInfos() {
- Preconditions.checkState(!Strings.isNullOrEmpty(electableFeServiceGroup));
- return getGroupHostInfos(electableFeServiceGroup);
- }
-
- // get observer fe
- protected List getObserverGroupHostInfos() {
- Preconditions.checkState(!Strings.isNullOrEmpty(observerFeServiceGroup));
- return getGroupHostInfos(observerFeServiceGroup);
- }
-
- // get backend
- protected List getBackendGroupHostInfos() {
- Preconditions.checkState(!Strings.isNullOrEmpty(backendServiceGroup));
- return getGroupHostInfos(backendServiceGroup);
- }
-
- // get cn
- protected List getCnGroupHostInfos() {
- Preconditions.checkState(!Strings.isNullOrEmpty(cnServiceGroup));
- return getGroupHostInfos(cnServiceGroup);
- }
-
// Get all host port pairs from specified group.
// Must implement in derived class.
// If encounter errors, return null
- protected List getGroupHostInfos(String groupName) {
+ protected List getGroupHostInfos(NodeType nodeType) {
throw new NotImplementedException();
}
- // get broker
- // return (broker name -> list of broker host port)
- protected Map> getBrokerGroupHostInfos() {
- throw new NotImplementedException();
+ protected String getBrokerName() {
+ String brokerName = System.getenv(ENV_BROKER_NAME);
+ if (Strings.isNullOrEmpty(brokerName)) {
+ LOG.error("failed to get broker name from env: {}", ENV_BROKER_NAME);
+ System.exit(-1);
+ }
+ return brokerName;
}
public List getHelperNodes() {
@@ -284,16 +288,19 @@ public class DeployManager extends MasterDaemon {
// 2. get electable fe host from remote
boolean ok = true;
- List feHostInfos = null;
+ List feHostInfos = null;
while (true) {
try {
- feHostInfos = getElectableGroupHostInfos();
+ feHostInfos = getGroupHostInfos(NodeType.ELECTABLE);
if (feHostInfos == null) {
ok = false;
} else if (feHostInfos.size() != numOfFe) {
LOG.error("num of fe get from remote [{}] does not equal to the expected num: {}",
feHostInfos, numOfFe);
ok = false;
+ } else if (!checkIpIfNotNull(feHostInfos)) {
+ LOG.error("some fe not ready,need wait.");
+ ok = false;
} else {
ok = true;
}
@@ -326,6 +333,15 @@ public class DeployManager extends MasterDaemon {
feHostInfos.get(0).getPort()));
}
+ private boolean checkIpIfNotNull(List hostInfos) {
+ for (HostInfo hostInfo : hostInfos) {
+ if (hostInfo.getIp() == null) {
+ return false;
+ }
+ }
+ return true;
+ }
+
@Override
protected void runAfterCatalogReady() {
if (Config.enable_deploy_manager.equals("disable")) {
@@ -333,328 +349,232 @@ public class DeployManager extends MasterDaemon {
exit();
return;
}
-
// 0. init
if (!init()) {
return;
}
- // 1. Check the electable fe service group
- if (hasElectableService) {
- List remoteElectableFeHosts = getElectableGroupHostInfos();
- if (remoteElectableFeHosts == null) {
- return;
- }
- LOG.debug("get electable fe hosts {} from electable fe service group: {}",
- remoteElectableFeHosts, electableFeServiceGroup);
- if (remoteElectableFeHosts.isEmpty()) {
- LOG.error("electable fe service group {} is empty, which should not happen", electableFeServiceGroup);
- return;
- }
-
- // 1.1 Check if self is in electable fe service group
- SystemInfoService.HostInfo selfHostInfo = getFromHostInfos(remoteElectableFeHosts,
- new SystemInfoService.HostInfo(env.getMasterIp(), env.getMasterHostName(), Config.edit_log_port));
- if (selfHostInfo == null) {
- // The running of this deploy manager means this node is considered self as Master.
- // If it self does not exist in electable fe service group, it should shut it self down.
- LOG.warn("self host {} is not in electable fe service group {}. Exit now.",
- env.getMasterIp(), electableFeServiceGroup);
- System.exit(-1);
- }
-
- // 1.2 Check the change of electable fe service group
- List localElectableFeAddrs = env.getFrontends(FrontendNodeType.FOLLOWER);
- List localElectableFeHostInfos = this
- .convertFesToHostInfos(localElectableFeAddrs);
- LOG.debug("get local electable hosts: {}", localElectableFeHostInfos);
- if (inspectNodeChange(remoteElectableFeHosts, localElectableFeHostInfos, NodeType.ELECTABLE)) {
- return;
- }
+ if (isRunning) {
+ LOG.warn("Last task not finished,ignore current task.");
+ return;
}
+ isRunning = true;
- // 2. Check the backend service group
- if (hasBackendService) {
- BE_BLOCK: {
- List remoteBackendHosts = getBackendGroupHostInfos();
- if (remoteBackendHosts == null) {
- break BE_BLOCK;
- }
- LOG.debug("get remote backend hosts: {}", remoteBackendHosts);
- List localBackends = Env.getCurrentSystemInfo()
- .getClusterMixBackends(SystemInfoService.DEFAULT_CLUSTER);
- List localBackendHostInfos = this.convertBesToHostInfos(localBackends);
- LOG.debug("get local backend addrs: {}", localBackendHostInfos);
- if (inspectNodeChange(remoteBackendHosts, localBackendHostInfos, NodeType.BACKEND)) {
- return;
- }
- }
+ if (listenRequired && processQueue()) {
+ isRunning = false;
+ return;
}
+ processPolling();
+ isRunning = false;
+ }
- // 3. Check the cn service group
- if (hasCnService) {
- CN_BLOCK: {
- List remoteCnHosts = getCnGroupHostInfos();
- if (remoteCnHosts == null) {
- break CN_BLOCK;
- }
- LOG.debug("get remote cn hosts: {}", remoteCnHosts);
- List localCns = Env.getCurrentSystemInfo()
- .getClusterCnBackends(SystemInfoService.DEFAULT_CLUSTER);
- List localCnHostInfos = this.convertBesToHostInfos(localCns);
- LOG.debug("get local cn addrs: {}", localCnHostInfos);
- if (inspectNodeChange(remoteCnHosts, localCnHostInfos, NodeType.BACKEND_CN)) {
- return;
- }
+ private void processPolling() {
+ for (NodeType nodeType : NodeType.values()) {
+ NodeTypeAttr nodeTypeAttr = nodeTypeAttrMap.get(nodeType);
+ if (!nodeTypeAttr.hasService) {
+ continue;
}
- }
-
- if (hasObserverService) {
- OB_BLOCK: {
- // 3. Check the observer fe service group
- List remoteObserverFeHosts = getObserverGroupHostInfos();
- if (remoteObserverFeHosts == null) {
- break OB_BLOCK;
- }
- LOG.debug("get remote observer fe hosts: {}", remoteObserverFeHosts);
- List localObserverFeAddrs = env.getFrontends(FrontendNodeType.OBSERVER);
- List localObserverFeHosts = this
- .convertFesToHostInfos(localObserverFeAddrs);
- LOG.debug("get local observer fe hosts: {}", localObserverFeHosts);
- if (inspectNodeChange(remoteObserverFeHosts, localObserverFeHosts, NodeType.OBSERVER)) {
- return;
- }
- }
- }
-
- if (hasBrokerService) {
- BROKER_BLOCK: {
- // 4. Check the broker service group
- Map> remoteBrokerHosts = getBrokerGroupHostInfos();
- if (remoteBrokerHosts == null) {
- break BROKER_BLOCK;
- }
-
- Map> localBrokers = env.getBrokerMgr().getBrokerListMap();
-
- // 1. find missing brokers
- for (Map.Entry> entry : localBrokers.entrySet()) {
- String brokerName = entry.getKey();
- if (remoteBrokerHosts.containsKey(brokerName)) {
- List localList = entry.getValue();
- List remoteList = remoteBrokerHosts.get(brokerName);
-
- // 1.1 found missing broker host
- for (FsBroker addr : localList) {
- SystemInfoService.HostInfo foundHost = getFromHostInfos(remoteList,
- new SystemInfoService.HostInfo(addr.ip, null, addr.port));
- if (foundHost == null) {
- List> list = Lists.newArrayList();
- list.add(Pair.of(addr.ip, addr.port));
- try {
- env.getBrokerMgr().dropBrokers(brokerName, list);
- LOG.info("drop broker {}:{} with name: {}",
- addr.ip, addr.port, brokerName);
- } catch (DdlException e) {
- LOG.warn("failed to drop broker {}:{} with name: {}",
- addr.ip, addr.port, brokerName, e);
- continue;
- }
- }
- }
-
- // 1.2 add new broker host
- for (SystemInfoService.HostInfo pair : remoteList) {
- FsBroker foundAddr = getHostFromBrokerAddrs(localList, pair.getIp(), pair.getPort());
- if (foundAddr == null) {
- // add new broker
- List> list = Lists.newArrayList();
- list.add(Pair.of(pair.getIp(), pair.getPort()));
- try {
- env.getBrokerMgr().addBrokers(brokerName, list);
- LOG.info("add broker {}:{} with name {}", pair.getIp(), pair.getPort(), brokerName);
- } catch (DdlException e) {
- LOG.warn("failed to add broker {}:{} with name {}",
- pair.getIp(), pair.getPort(), brokerName);
- continue;
- }
- }
- }
-
- } else {
- // broker with this name does not exist in remote. drop all
- try {
- env.getBrokerMgr().dropAllBroker(brokerName);
- LOG.info("drop all brokers with name: {}", brokerName);
- } catch (DdlException e) {
- LOG.warn("failed to drop all brokers with name: {}", brokerName, e);
- continue;
- }
- }
- } // end for
-
- // 2. add new brokers
- for (Map.Entry> entry : remoteBrokerHosts.entrySet()) {
- String remoteBrokerName = entry.getKey();
- if (!localBrokers.containsKey(remoteBrokerName)) {
- // add new brokers
- try {
- env.getBrokerMgr()
- .addBrokers(remoteBrokerName, convertHostInfosToIpPortPair(entry.getValue()));
- LOG.info("add brokers {} with name {}", entry.getValue(), remoteBrokerName);
- } catch (DdlException e) {
- LOG.info("failed to add brokers {} with name {}",
- entry.getValue(), remoteBrokerName, e);
- continue;
- }
- }
- }
- } // end of BROKER BLOCK
+ List remoteHosts = getGroupHostInfos(nodeType);
+ LOG.debug("get serviceName: {},remoteHosts: {}", nodeTypeAttr.getServiceName(), remoteHosts);
+ process(nodeType, remoteHosts);
}
}
- private FsBroker getHostFromBrokerAddrs(List addrList,
- String ip, Integer port) {
- for (FsBroker brokerAddress : addrList) {
- if (brokerAddress.ip.equals(ip) && brokerAddress.port == port) {
- return brokerAddress;
- }
+ private boolean processQueue() {
+ Event event = nodeChangeQueue.poll();
+ if (event == null) {
+ return false;
+ }
+ process(event.getNodeType(), event.getHostInfos());
+ return true;
+ }
+
+ private void process(NodeType nodeType, List remoteHosts) {
+ if (remoteHosts == null) {
+ return;
+ }
+ if (nodeType == NodeType.ELECTABLE && remoteHosts.isEmpty()) {
+ LOG.warn("electable fe service is empty, which should not happen");
+ return;
+ }
+ List localHosts = getLocalHosts(nodeType);
+ inspectNodeChange(remoteHosts, localHosts, nodeType);
+ }
+
+ private List getLocalHosts(NodeType nodeType) {
+ switch (nodeType) {
+ case ELECTABLE:
+ List localElectableFeAddrs = env.getFrontends(FrontendNodeType.FOLLOWER);
+ return this
+ .convertFesToHostInfos(localElectableFeAddrs);
+ case OBSERVER:
+ List localObserverFeAddrs = env.getFrontends(FrontendNodeType.OBSERVER);
+ return this
+ .convertFesToHostInfos(localObserverFeAddrs);
+ case BACKEND:
+ List localBackends = Env.getCurrentSystemInfo()
+ .getClusterMixBackends(SystemInfoService.DEFAULT_CLUSTER);
+ return this.convertBesToHostInfos(localBackends);
+ case BACKEND_CN:
+ List localCns = Env.getCurrentSystemInfo()
+ .getClusterCnBackends(SystemInfoService.DEFAULT_CLUSTER);
+ return this.convertBesToHostInfos(localCns);
+ case BROKER:
+ List localBrokers = env.getBrokerMgr().getBrokerListMap().get(getBrokerName());
+ if (localBrokers == null) {
+ localBrokers = Lists.newArrayList();
+ }
+ return convertBrokersToHostInfos(localBrokers);
+ default:
+ break;
}
return null;
}
+ private boolean needDrop(boolean found, HostInfo localHostInfo) {
+ if (found) {
+ if (counterMap.containsKey(localHostInfo.getIdent())) {
+ counterMap.remove(localHostInfo.getIdent());
+ }
+ return false;
+ } else {
+ if (maxMissingTime <= 0) {
+ return true;
+ }
+ // Check the detected downtime
+ if (!counterMap.containsKey(localHostInfo.getIdent())) {
+ // First detected downtime. Add to the map and ignore
+ LOG.warn("downtime node: {} detected times: 1",
+ localHostInfo);
+ counterMap.put(localHostInfo.getIdent(), 1);
+ return false;
+ } else {
+ int times = counterMap.get(localHostInfo.getIdent());
+ if (times < maxMissingTime) {
+ LOG.warn("downtime node: {} detected times: {}",
+ localHostInfo, times + 1);
+ counterMap.put(localHostInfo.getIdent(), times + 1);
+ return false;
+ } else {
+ // Reset the counter map and do the dropping operation
+ LOG.warn("downtime node: {} detected times: {}. drop it",
+ localHostInfo, times + 1);
+ counterMap.remove(localHostInfo.getIdent());
+ return true;
+ }
+ }
+ }
+ }
+
/*
* Inspect the node change.
* 1. Check if there are some nodes need to be dropped.
* 2. Check if there are some nodes need to be added.
*
- * We only handle one change at a time.
* Return true if something changed
*/
- private boolean inspectNodeChange(List remoteHostInfos,
- List localHostInfos,
+ private void inspectNodeChange(List remoteHostInfos,
+ List localHostInfos,
NodeType nodeType) {
// 2.1 Find local node which need to be dropped.
- for (SystemInfoService.HostInfo localHostInfo : localHostInfos) {
- String localIp = localHostInfo.getIp();
- Integer localPort = localHostInfo.getPort();
- String localHostName = localHostInfo.getHostName();
- SystemInfoService.HostInfo foundHostInfo = getFromHostInfos(remoteHostInfos, localHostInfo);
- if (foundHostInfo != null) {
- if (counterMap.containsKey(localHostInfo.getIdent())) {
- counterMap.remove(localHostInfo.getIdent());
- }
- } else {
- // Double check if is it self
- if (isSelf(localHostInfo)) {
- // This is it self. Shut down now.
- LOG.error("self host {}:{} does not exist in remote hosts. Showdown.");
- System.exit(-1);
- }
-
- // Check the detected downtime
- if (!counterMap.containsKey(localHostInfo.getIdent())) {
- // First detected downtime. Add to the map and ignore
- LOG.warn("downtime of {} node: {} detected times: 1",
- nodeType.name(), localHostInfo);
- counterMap.put(localHostInfo.getIdent(), 1);
- return false;
- } else {
- int times = counterMap.get(localHostInfo.getIdent());
- if (times < MAX_MISSING_TIME) {
- LOG.warn("downtime of {} node: {} detected times: {}",
- nodeType.name(), localHostInfo, times + 1);
- counterMap.put(localHostInfo.getIdent(), times + 1);
- return false;
- } else {
- // Reset the counter map and do the dropping operation
- LOG.warn("downtime of {} node: {} detected times: {}. drop it",
- nodeType.name(), localHostInfo, times + 1);
- counterMap.remove(localHostInfo.getIdent());
- }
- }
-
- // Can not find local host from remote host list,
- // which means this node should be dropped.
- try {
- switch (nodeType) {
- case ELECTABLE:
- env.dropFrontend(FrontendNodeType.FOLLOWER, localIp, localHostName, localPort);
- break;
- case OBSERVER:
- env.dropFrontend(FrontendNodeType.OBSERVER, localIp, localHostName, localPort);
- break;
- case BACKEND:
- case BACKEND_CN:
- Env.getCurrentSystemInfo().dropBackend(localIp, localHostName, localPort);
- break;
- default:
- break;
- }
- } catch (DdlException e) {
- LOG.error("Failed to drop {} node: {}:{}", nodeType, localIp, localPort, e);
- // return true is a conservative behavior. we do not expect any exception here.
- return true;
- }
-
- LOG.info("Finished to drop {} node: {}:{}", nodeType, localIp, localPort);
- return true;
+ for (HostInfo localHostInfo : localHostInfos) {
+ HostInfo foundHostInfo = getFromHostInfos(remoteHostInfos, localHostInfo);
+ boolean needDrop = needDrop(foundHostInfo != null, localHostInfo);
+ if (needDrop) {
+ dealDropLocal(localHostInfo, nodeType);
}
}
// 2.2. Find remote node which need to be added.
- for (SystemInfoService.HostInfo remoteHostInfo : remoteHostInfos) {
- String remoteIp = remoteHostInfo.getIp();
- Integer remotePort = remoteHostInfo.getPort();
- String remoteHostName = remoteHostInfo.getHostName();
- SystemInfoService.HostInfo foundHostInfo = getFromHostInfos(localHostInfos, remoteHostInfo);
+ for (HostInfo remoteHostInfo : remoteHostInfos) {
+ HostInfo foundHostInfo = getFromHostInfos(localHostInfos, remoteHostInfo);
if (foundHostInfo == null) {
- // Can not find remote host in local hosts,
- // which means this remote host need to be added.
- try {
- switch (nodeType) {
- case ELECTABLE:
- env.addFrontend(FrontendNodeType.FOLLOWER, remoteIp, remoteHostName, remotePort);
- break;
- case OBSERVER:
- env.addFrontend(FrontendNodeType.OBSERVER, remoteIp, remoteHostName, remotePort);
- break;
- case BACKEND:
- case BACKEND_CN:
- List newBackends = Lists.newArrayList();
- newBackends.add(new SystemInfoService.HostInfo(remoteIp, remoteHostName, remotePort));
- Env.getCurrentSystemInfo().addBackends(newBackends, false);
- break;
- default:
- break;
- }
- } catch (UserException e) {
- LOG.error("Failed to add {} node: {}:{}", nodeType, remoteIp, remotePort, e);
- return true;
- }
-
- LOG.info("Finished to add {} node: {}:{}", nodeType, remoteIp, remotePort);
- return true;
+ dealAddRemote(remoteHostInfo, nodeType);
}
}
-
- return false;
}
- private List> convertHostInfosToIpPortPair(List hostInfos) {
- ArrayList> pairs = Lists.newArrayList();
- for (SystemInfoService.HostInfo e : hostInfos) {
- pairs.add(Pair.of(e.getIp(), e.getPort()));
+ private void dealDropLocal(HostInfo localHostInfo, NodeType nodeType) {
+ String localIp = localHostInfo.getIp();
+ Integer localPort = localHostInfo.getPort();
+ String localHostName = localHostInfo.getHostName();
+ // Double check if is it self
+ if (isSelf(localHostInfo)) {
+ // This is it self. Shut down now.
+ LOG.error("self host {}:{} does not exist in remote hosts. Showdown.");
+ System.exit(-1);
}
- return pairs;
+
+ // Can not find local host from remote host list,
+ // which means this node should be dropped.
+ try {
+ switch (nodeType) {
+ case ELECTABLE:
+ env.dropFrontend(FrontendNodeType.FOLLOWER, localIp, localHostName, localPort);
+ break;
+ case OBSERVER:
+ env.dropFrontend(FrontendNodeType.OBSERVER, localIp, localHostName, localPort);
+ break;
+ case BACKEND:
+ case BACKEND_CN:
+ Env.getCurrentSystemInfo().dropBackend(localIp, localHostName, localPort);
+ break;
+ case BROKER:
+ env.getBrokerMgr().dropBrokers(getBrokerName(), Lists.newArrayList(Pair.of(localIp, localPort)));
+ break;
+ default:
+ break;
+ }
+ } catch (DdlException e) {
+ LOG.error("Failed to drop {} node: {}:{}", nodeType, localIp, localPort, e);
+ }
+
+ LOG.info("Finished to drop {} node: {}:{}", nodeType, localIp, localPort);
+ }
+
+ private void dealAddRemote(HostInfo remoteHostInfo, NodeType nodeType) {
+ String remoteIp = remoteHostInfo.getIp();
+ Integer remotePort = remoteHostInfo.getPort();
+ String remoteHostName = remoteHostInfo.getHostName();
+ // Can not find remote host in local hosts,
+ // which means this remote host need to be added.
+ if (StringUtils.isEmpty(remoteIp)) {
+ LOG.info("remote node is not ready,need wait.");
+ return;
+ }
+ try {
+ switch (nodeType) {
+ case ELECTABLE:
+ env.addFrontend(FrontendNodeType.FOLLOWER, remoteIp, remoteHostName, remotePort);
+ break;
+ case OBSERVER:
+ env.addFrontend(FrontendNodeType.OBSERVER, remoteIp, remoteHostName, remotePort);
+ break;
+ case BACKEND:
+ case BACKEND_CN:
+ List newBackends = Lists.newArrayList();
+ newBackends.add(new HostInfo(remoteIp, remoteHostName, remotePort));
+ Env.getCurrentSystemInfo().addBackends(newBackends, false);
+ break;
+ case BROKER:
+ env.getBrokerMgr().addBrokers(getBrokerName(), Lists.newArrayList(Pair.of(remoteIp, remotePort)));
+ break;
+ default:
+ break;
+ }
+ } catch (UserException e) {
+ LOG.error("Failed to add {} node: {}:{}", nodeType, remoteIp, remotePort, e);
+ }
+
+ LOG.info("Finished to add {} node: {}:{}", nodeType, remoteIp, remotePort);
}
// Get host port pair from pair list. Return null if not found
// when hostName,compare hostname,otherwise compare ip
- private SystemInfoService.HostInfo getFromHostInfos(List hostInfos,
- SystemInfoService.HostInfo hostInfo) {
- for (SystemInfoService.HostInfo h : hostInfos) {
+ private HostInfo getFromHostInfos(List hostInfos,
+ HostInfo hostInfo) {
+ for (HostInfo h : hostInfos) {
if (StringUtils.isEmpty(hostInfo.hostName) || StringUtils.isEmpty(h.hostName)) {
if (hostInfo.getIp().equals(h.getIp()) && hostInfo.getPort() == (h.getPort())) {
return hostInfo;
@@ -669,40 +589,106 @@ public class DeployManager extends MasterDaemon {
return null;
}
- private List convertFesToHostInfos(List frontends) {
- List hostPortPair = Lists.newArrayList();
+ private List convertFesToHostInfos(List frontends) {
+ List hostPortPair = Lists.newArrayList();
for (Frontend fe : frontends) {
hostPortPair.add(convertToHostInfo(fe));
}
return hostPortPair;
}
- private List convertBesToHostInfos(List backends) {
- List hostPortPair = Lists.newArrayList();
+ private List convertBrokersToHostInfos(List brokers) {
+ List hostPortPair = Lists.newArrayList();
+ for (FsBroker broker : brokers) {
+ hostPortPair.add(convertToHostInfo(broker));
+ }
+ return hostPortPair;
+ }
+
+ private List convertBesToHostInfos(List backends) {
+ List hostPortPair = Lists.newArrayList();
for (Backend fe : backends) {
hostPortPair.add(convertToHostInfo(fe));
}
return hostPortPair;
}
- private SystemInfoService.HostInfo convertToHostInfo(Frontend frontend) {
- return new SystemInfoService.HostInfo(frontend.getIp(), frontend.getHostName(), frontend.getEditLogPort());
+ private HostInfo convertToHostInfo(Frontend frontend) {
+ return new HostInfo(frontend.getIp(), frontend.getHostName(), frontend.getEditLogPort());
}
- private SystemInfoService.HostInfo convertToHostInfo(Backend backend) {
- return new SystemInfoService.HostInfo(backend.getIp(), backend.getHostName(), backend.getHeartbeatPort());
+ private HostInfo convertToHostInfo(FsBroker broker) {
+ return new HostInfo(broker.ip, null, broker.port);
}
- private boolean isSelf(SystemInfoService.HostInfo hostInfo) {
- if (Config.edit_log_port == hostInfo.getPort()) {
- // master host name may not same as local host name, so we should compare ip here
- if (env.getMasterHostName() != null && env.getMasterHostName().equals(hostInfo.getHostName())) {
- return true;
- }
- if (env.getMasterIp().equals(hostInfo.getIp())) {
- return true;
- }
+ private HostInfo convertToHostInfo(Backend backend) {
+ return new HostInfo(backend.getIp(), backend.getHostName(), backend.getHeartbeatPort());
+ }
+
+ private boolean isSelf(HostInfo hostInfo) {
+ if (env.getMasterIp().equals(hostInfo.getIp()) && Config.edit_log_port == hostInfo.getPort()) {
+ return true;
}
return false;
}
+
+ protected class Event {
+ private NodeType nodeType;
+ private List hostInfos;
+
+ public Event(NodeType nodeType, List hostInfos) {
+ this.nodeType = nodeType;
+ this.hostInfos = hostInfos;
+ }
+
+ public NodeType getNodeType() {
+ return nodeType;
+ }
+
+ public List getHostInfos() {
+ return hostInfos;
+ }
+
+ @Override
+ public String toString() {
+ return "Event{"
+ + "nodeType=" + nodeType
+ + ", hostInfos=" + hostInfos
+ + '}';
+ }
+ }
+
+ protected class NodeTypeAttr {
+ private boolean hasService;
+ private String serviceName;
+ private String subAttr1;
+
+ public NodeTypeAttr(boolean hasService) {
+ this.hasService = hasService;
+ }
+
+ public boolean hasService() {
+ return hasService;
+ }
+
+ public void setHasService(boolean hasService) {
+ this.hasService = hasService;
+ }
+
+ public String getServiceName() {
+ return serviceName;
+ }
+
+ public void setServiceName(String serviceName) {
+ this.serviceName = serviceName;
+ }
+
+ public String getSubAttr1() {
+ return subAttr1;
+ }
+
+ public void setSubAttr1(String subAttr1) {
+ this.subAttr1 = subAttr1;
+ }
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/deploy/impl/AmbariDeployManager.java b/fe/fe-core/src/main/java/org/apache/doris/deploy/impl/AmbariDeployManager.java
index 824718a8d5..846decd040 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/deploy/impl/AmbariDeployManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/deploy/impl/AmbariDeployManager.java
@@ -27,7 +27,6 @@ import org.apache.doris.system.SystemInfoService;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import org.apache.commons.codec.binary.Base64;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -36,7 +35,6 @@ import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
import java.util.List;
-import java.util.Map;
/*
* Required env variables:
@@ -129,7 +127,7 @@ public class AmbariDeployManager extends DeployManager {
if (Strings.isNullOrEmpty(brokerConfigNode)) {
LOG.warn("can not get broker config node from env var: {}", ENV_AMBARI_BROKER_COMPONENTS_CONFIG);
- hasBrokerService = false;
+ nodeTypeAttrMap.get(NodeType.BROKER).setHasService(false);
}
LOG.info("get fe, be and broker config node name: {}, {}, {}",
@@ -197,21 +195,21 @@ public class AmbariDeployManager extends DeployManager {
}
@Override
- protected List getGroupHostInfos(String groupName) {
+ protected List getGroupHostInfos(NodeType nodeType) {
int port = -1;
- if (groupName.equalsIgnoreCase(electableFeServiceGroup)) {
+ if (nodeType == NodeType.ELECTABLE) {
port = getFeEditLogPort();
- } else if (groupName.equalsIgnoreCase(backendServiceGroup)) {
+ } else if (nodeType == NodeType.BACKEND) {
port = getBeHeartbeatPort();
} else {
- LOG.warn("unknown group name: {}", groupName);
+ LOG.warn("unknown type: {}", nodeType.name());
return null;
}
if (port == -1) {
- LOG.warn("failed to get port of component: {}", groupName);
+ LOG.warn("failed to get port of component: {}", nodeType.name());
return null;
}
-
+ String groupName = nodeTypeAttrMap.get(nodeType).getServiceName();
List hostnames = getHostnamesFromComponentsJson(groupName);
List hostPorts = Lists.newArrayListWithCapacity(hostnames.size());
for (String hostname : hostnames) {
@@ -230,35 +228,8 @@ public class AmbariDeployManager extends DeployManager {
}
@Override
- protected Map> getBrokerGroupHostInfos() {
- int port = getBrokerIpcPort();
- if (port == -1) {
- LOG.warn("failed to get port of component: {}", brokerServiceGroup);
- return null;
- }
-
- String brokerName = getPropertyFromBlueprint(brokerConfigNode, KEY_BROKER_NAME);
- if (brokerName == null) {
- return null;
- }
-
- List hostnames = getHostnamesFromComponentsJson(brokerServiceGroup);
- List hostPorts = Lists.newArrayListWithCapacity(hostnames.size());
- for (String hostname : hostnames) {
- Pair hostPort = null;
- try {
- hostPort = SystemInfoService.validateHostAndPort(hostname + ":" + port);
- } catch (AnalysisException e) {
- LOG.warn("Invalid host port format: {}:{}", hostname, port, e);
- continue;
- }
- hostPorts.add(new SystemInfoService.HostInfo(hostPort.first, null, hostPort.second));
- }
-
- Map> brokers = Maps.newHashMap();
- brokers.put(brokerName, hostPorts);
- LOG.info("get brokers from ambari: {}", brokers);
- return brokers;
+ protected String getBrokerName() {
+ return getPropertyFromBlueprint(brokerConfigNode, KEY_BROKER_NAME);
}
private Integer getFeEditLogPort() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/deploy/impl/K8sDeployManager.java b/fe/fe-core/src/main/java/org/apache/doris/deploy/impl/K8sDeployManager.java
index 65c6b44be6..7ed5ebb4d4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/deploy/impl/K8sDeployManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/deploy/impl/K8sDeployManager.java
@@ -20,23 +20,32 @@ package org.apache.doris.deploy.impl;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.deploy.DeployManager;
-import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.system.SystemInfoService.HostInfo;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import io.fabric8.kubernetes.api.model.EndpointAddress;
import io.fabric8.kubernetes.api.model.EndpointPort;
import io.fabric8.kubernetes.api.model.EndpointSubset;
import io.fabric8.kubernetes.api.model.Endpoints;
+import io.fabric8.kubernetes.api.model.Service;
+import io.fabric8.kubernetes.api.model.ServicePort;
+import io.fabric8.kubernetes.api.model.apps.StatefulSet;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.Watch;
+import io.fabric8.kubernetes.client.Watcher;
+import io.fabric8.kubernetes.client.WatcherException;
+import jline.internal.Log;
+import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
@@ -55,8 +64,11 @@ public class K8sDeployManager extends DeployManager {
public static final String ENV_BROKER_SERVICE = "BROKER_SERVICE";
public static final String ENV_CN_SERVICE = "CN_SERVICE";
- // we arbitrarily set all broker name as what ENV_BROKER_NAME specified.
- public static final String ENV_BROKER_NAME = "BROKER_NAME";
+ public static final String ENV_FE_STATEFULSET = "FE_STATEFULSET";
+ public static final String ENV_FE_OBSERVER_STATEFULSET = "FE_OBSERVER_STATEFULSET";
+ public static final String ENV_BE_STATEFULSET = "BE_STATEFULSET";
+ public static final String ENV_BROKER_STATEFULSET = "BROKER_STATEFULSET";
+ public static final String ENV_CN_STATEFULSET = "CN_STATEFULSET";
public static final String FE_PORT = "edit-log-port"; // k8s only support -, not _
public static final String BE_PORT = "heartbeat-port";
@@ -67,7 +79,7 @@ public class K8sDeployManager extends DeployManager {
private String appNamespace;
private String domainLTD;
private KubernetesClient client = null;
-
+ private Watch statefulSetWatch = null;
// =======for test only==========
public static final String K8S_CA_CERT_FILE = "cce-ca.pem";
public static final String K8S_CLIENT_CERT_FILE = "cce-admin.pem";
@@ -79,7 +91,8 @@ public class K8sDeployManager extends DeployManager {
// =======for test only==========
public K8sDeployManager(Env env, long intervalMs) {
- super(env, intervalMs);
+ // if enable fqdn,we wait for k8s to actively push the statefulset change
+ super(env, intervalMs, Config.enable_fqdn_mode);
initEnvVariables(ENV_FE_SERVICE, ENV_FE_OBSERVER_SERVICE, ENV_BE_SERVICE, ENV_BROKER_SERVICE, ENV_CN_SERVICE);
}
@@ -105,46 +118,88 @@ public class K8sDeployManager extends DeployManager {
}
LOG.info("use domainLTD: {}", domainLTD);
+
+ //Fill NodeTypeAttr.subAttr1 with statefulName
+ //If serviceName is configured, the corresponding statefulSetName must be configured
+ for (NodeType nodeType : NodeType.values()) {
+ NodeTypeAttr nodeTypeAttr = nodeTypeAttrMap.get(nodeType);
+ if (nodeTypeAttr.hasService()) {
+ String statefulSetEnvName = getStatefulSetEnvName(nodeType);
+ Log.info("Env name of: {} is: {}", nodeType.name(), statefulSetEnvName);
+ String statefulSetName = Strings.nullToEmpty(System.getenv(statefulSetEnvName));
+ if (Strings.isNullOrEmpty(statefulSetName)) {
+ LOG.error("failed to init statefulSetName: {}", statefulSetEnvName);
+ System.exit(-1);
+ }
+ LOG.info("use statefulSetName: {}, {}", nodeType.name(), statefulSetName);
+ nodeTypeAttr.setSubAttr1(statefulSetName);
+ }
+ }
+
}
@Override
- protected List getGroupHostInfos(String groupName) {
- // 1. get namespace and port name
- String portName = null;
- if (groupName.equals(electableFeServiceGroup)) {
- portName = FE_PORT;
- } else if (groupName.equals(observerFeServiceGroup)) {
- portName = FE_PORT;
- } else if (groupName.equals(backendServiceGroup)) {
- portName = BE_PORT;
- } else if (groupName.equals(brokerServiceGroup)) {
- portName = BROKER_PORT;
- } else if (groupName.equals(cnServiceGroup)) {
- portName = BE_PORT;
- } else {
- LOG.warn("unknown service group name: {}", groupName);
- return null;
- }
- Preconditions.checkNotNull(appNamespace);
- Preconditions.checkNotNull(portName);
+ public void startListenerInternal() {
+ statefulSetWatch = getWatch(client());
+ LOG.info("Start listen statefulSets change event.");
+ }
- // 2. get endpoint
- Endpoints endpoints = null;
- try {
- endpoints = endpoints(appNamespace, groupName);
- } catch (Exception e) {
- LOG.warn("encounter exception when get endpoint from namespace {}, service: {}",
- appNamespace, groupName, e);
+ private String getStatefulSetEnvName(NodeType nodeType) {
+ switch (nodeType) {
+ case ELECTABLE:
+ return ENV_FE_STATEFULSET;
+ case OBSERVER:
+ return ENV_FE_OBSERVER_STATEFULSET;
+ case BACKEND:
+ return ENV_BE_STATEFULSET;
+ case BACKEND_CN:
+ return ENV_CN_STATEFULSET;
+ case BROKER:
+ return ENV_BROKER_STATEFULSET;
+ default:
+ return null;
+ }
+ }
+
+ @Override
+ protected List getGroupHostInfos(NodeType nodeType) {
+ if (Config.enable_fqdn_mode) {
+ return getGroupHostInfosByStatefulSet(nodeType);
+ } else {
+ return getGroupHostInfosByEndpoint(nodeType);
+ }
+ }
+
+
+ private List getGroupHostInfosByStatefulSet(NodeType nodeType) {
+ String statefulSetName = nodeTypeAttrMap.get(nodeType).getSubAttr1();
+ Preconditions.checkNotNull(statefulSetName);
+ StatefulSet statefulSet = statefulSet(appNamespace, nodeTypeAttrMap.get(nodeType).getSubAttr1());
+ if (statefulSet == null) {
+ LOG.warn("get null statefulSet in namespace {}, statefulSetName: {}", appNamespace, statefulSetName);
return null;
}
+ return getHostInfosByNum(nodeType, statefulSet.getSpec().getReplicas());
+ }
+
+ private List getGroupHostInfosByEndpoint(NodeType nodeType) {
+ // get portName
+ String portName = getPortName(nodeType);
+ Preconditions.checkNotNull(portName);
+ // get serviceName
+ String serviceName = nodeTypeAttrMap.get(nodeType).getServiceName();
+ Preconditions.checkNotNull(serviceName);
+
+ // get endpoint
+ Endpoints endpoints = endpoints(appNamespace, serviceName);
if (endpoints == null) {
// endpoints may be null if service does not exist;
- LOG.warn("get null endpoints of namespace {} in service: {}", appNamespace, groupName);
+ LOG.warn("get null endpoints of namespace: {} in service: {}", appNamespace, serviceName);
return null;
}
- // 3. get host port
- List result = Lists.newArrayList();
+ // get host port
+ List result = Lists.newArrayList();
List subsets = endpoints.getSubsets();
for (EndpointSubset subset : subsets) {
Integer port = -1;
@@ -162,18 +217,19 @@ public class K8sDeployManager extends DeployManager {
List addrs = subset.getAddresses();
for (EndpointAddress eaddr : addrs) {
- result.add(new SystemInfoService.HostInfo(eaddr.getIp(), getDomainName(eaddr.getHostname(), groupName),
- port));
+ result.add(new HostInfo(eaddr.getIp(), null, port));
}
}
- LOG.info("get host port from group: {}: {}", groupName, result);
+ LOG.info("get host port from group: {}: {}", serviceName, result);
return result;
}
- private String getDomainName(String hostName, String serviceName) {
+ //The rules for the domain name of k8s are $(podName).$(servicename).$(namespace).svc.cluster.local
+ // can see https://www.cnblogs.com/xiaokantianse/p/14267987.html#_label1_4
+ private String getDomainName(String podName, String serviceName) {
StringBuilder builder = new StringBuilder();
- builder.append(hostName);
+ builder.append(podName);
builder.append(".");
builder.append(serviceName);
builder.append(".");
@@ -183,26 +239,128 @@ public class K8sDeployManager extends DeployManager {
return builder.toString();
}
- @Override
- protected Map> getBrokerGroupHostInfos() {
- List hostPorts = getGroupHostInfos(brokerServiceGroup);
- if (hostPorts == null) {
+ private Endpoints endpoints(String namespace, String serviceName) {
+ try {
+ return client().endpoints().inNamespace(namespace).withName(serviceName).get();
+ } catch (Exception e) {
+ LOG.warn("encounter exception when get endpoint from namespace {}, service: {}",
+ appNamespace, serviceName, e);
return null;
}
- final String brokerName = System.getenv(ENV_BROKER_NAME);
- if (Strings.isNullOrEmpty(brokerName)) {
- LOG.error("failed to get broker name from env: {}", ENV_BROKER_NAME);
- System.exit(-1);
- }
- Map> brokers = Maps.newHashMap();
- brokers.put(brokerName, hostPorts);
- LOG.info("get brokers from k8s: {}", brokers);
- return brokers;
}
- private Endpoints endpoints(String namespace, String serviceName) throws Exception {
- return client().endpoints().inNamespace(namespace).withName(serviceName).get();
+ public Service service(String namespace, String serviceName) {
+ try {
+ return client().services().inNamespace(namespace).withName(serviceName).get();
+ } catch (Exception e) {
+ LOG.warn("encounter exception when get service from namespace {}, service: {}",
+ appNamespace, serviceName, e);
+ return null;
+ }
+ }
+
+ public StatefulSet statefulSet(String namespace, String statefulSetName) {
+ try {
+ return client().apps().statefulSets().inNamespace(namespace).withName(statefulSetName).get();
+ } catch (Exception e) {
+ LOG.warn("encounter exception when get statefulSet from namespace {}, statefulSet: {}",
+ appNamespace, statefulSetName, e);
+ return null;
+ }
+ }
+
+
+ private void dealEvent(String statefulsetName, Integer num) {
+ NodeType nodeType = getNodeType(statefulsetName);
+ if (nodeType == null) {
+ return;
+ }
+ List hostInfosByNum = getHostInfosByNum(nodeType, num);
+ if (hostInfosByNum == null) {
+ return;
+ }
+ Event event = new Event(nodeType, hostInfosByNum);
+ nodeChangeQueue.offer(event);
+ }
+
+ public List getHostInfosByNum(NodeType nodeType, Integer num) {
+ int servicePort = getServicePort(nodeType);
+ if (servicePort == -1) {
+ LOG.warn("get servicePort failed,{}", nodeType.name());
+ return null;
+ }
+ String statefulsetName = nodeTypeAttrMap.get(nodeType).getSubAttr1();
+ Preconditions.checkNotNull(statefulsetName);
+ String serviceName = nodeTypeAttrMap.get(nodeType).getServiceName();
+ Preconditions.checkNotNull(serviceName);
+ List hostInfos = Lists.newArrayList();
+ for (int i = 0; i < num; i++) {
+ //The podName rule of k8s is $(statefulset name) - $(sequence number)
+ //can see https://www.cnblogs.com/xiaokantianse/p/14267987.html#_label1_4
+ String domainName = getDomainName(statefulsetName + "-" + i, serviceName);
+ hostInfos.add(new HostInfo(getIpByDomain(domainName), domainName, servicePort));
+ }
+ return hostInfos;
+ }
+
+ private String getIpByDomain(String domainName) {
+ try {
+ InetAddress inetAddress = InetAddress.getByName(domainName);
+ return inetAddress.getHostAddress();
+ } catch (UnknownHostException e) {
+ LOG.info("unknown host name for domainName, {}", domainName, e);
+ return null;
+ }
+ }
+
+ private int getServicePort(NodeType nodeType) {
+ Integer port = -1;
+ String serviceName = nodeTypeAttrMap.get(nodeType).getServiceName();
+ Preconditions.checkNotNull(serviceName);
+ Service service = service(appNamespace, serviceName);
+ if (service == null) {
+ LOG.warn("get null service in namespace: {}, serviceName: {}", appNamespace, serviceName);
+ return port;
+ }
+ String portName = getPortName(nodeType);
+ Preconditions.checkNotNull(portName);
+ List ports = service.getSpec().getPorts();
+ for (ServicePort servicePort : ports) {
+ if (servicePort.getName().equals(portName)) {
+ port = servicePort.getPort();
+ break;
+ }
+ }
+ return port;
+ }
+
+ private String getPortName(NodeType nodeType) {
+ switch (nodeType) {
+ case BROKER:
+ return BROKER_PORT;
+ case ELECTABLE:
+ case OBSERVER:
+ return FE_PORT;
+ case BACKEND:
+ case BACKEND_CN:
+ return BE_PORT;
+ default:
+ return null;
+ }
+ }
+
+
+ private NodeType getNodeType(String ststefulSetName) {
+ if (StringUtils.isEmpty(ststefulSetName)) {
+ return null;
+ }
+ for (Map.Entry entry : nodeTypeAttrMap.entrySet()) {
+ if (ststefulSetName.equals(entry.getValue().getSubAttr1())) {
+ return entry.getKey();
+ }
+ }
+ return null;
}
private synchronized KubernetesClient client() {
@@ -230,7 +388,32 @@ public class K8sDeployManager extends DeployManager {
return client;
}
+ private Watch getWatch(KubernetesClient client) {
+ return client.apps().statefulSets().inNamespace(appNamespace).watch(new Watcher() {
+
+ @Override
+ public void onClose(WatcherException e) {
+ LOG.warn("Watch error received: {}.", e.getMessage());
+ }
+
+ @Override
+ public void eventReceived(Action action, StatefulSet statefulSet) {
+ LOG.info("Watch event received {}: {}: {}", action.name(), statefulSet.getMetadata().getName(),
+ statefulSet.getSpec().getReplicas());
+ dealEvent(statefulSet.getMetadata().getName(), statefulSet.getSpec().getReplicas());
+ }
+
+ @Override
+ public void onClose() {
+ LOG.info("Watch gracefully closed.");
+ }
+ });
+ }
+
public void close() {
+ if (statefulSetWatch != null) {
+ statefulSetWatch.close();
+ }
if (client != null) {
client.close();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/deploy/impl/LocalFileDeployManager.java b/fe/fe-core/src/main/java/org/apache/doris/deploy/impl/LocalFileDeployManager.java
index 1bbd7258ba..f26ec0eaed 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/deploy/impl/LocalFileDeployManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/deploy/impl/LocalFileDeployManager.java
@@ -25,7 +25,6 @@ import org.apache.doris.system.SystemInfoService;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -37,7 +36,6 @@ import java.io.InputStreamReader;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.util.List;
-import java.util.Map;
/*
* This for Boxer2 Baidu BCC agent
@@ -56,8 +54,6 @@ public class LocalFileDeployManager extends DeployManager {
public static final String ENV_BROKER_SERVICE = "BROKER_SERVICE";
public static final String ENV_CN_SERVICE = "CN_SERVICE";
- public static final String ENV_BROKER_NAME = "BROKER_NAME";
-
private String clusterInfoFile;
public LocalFileDeployManager(Env env, long intervalMs) {
@@ -83,7 +79,8 @@ public class LocalFileDeployManager extends DeployManager {
}
@Override
- public List getGroupHostInfos(String groupName) {
+ public List getGroupHostInfos(NodeType nodeType) {
+ String groupName = nodeTypeAttrMap.get(nodeType).getServiceName();
List result = Lists.newArrayList();
LOG.info("begin to get group: {} from file: {}", groupName, clusterInfoFile);
@@ -153,22 +150,4 @@ public class LocalFileDeployManager extends DeployManager {
LOG.info("get hosts from {}: {}", groupName, result);
return result;
}
-
- @Override
- protected Map> getBrokerGroupHostInfos() {
- List hostPorts = getGroupHostInfos(brokerServiceGroup);
- if (hostPorts == null) {
- return null;
- }
- final String brokerName = System.getenv(ENV_BROKER_NAME);
- if (Strings.isNullOrEmpty(brokerName)) {
- LOG.error("failed to get broker name from env: {}", ENV_BROKER_NAME);
- System.exit(-1);
- }
-
- Map> brokers = Maps.newHashMap();
- brokers.put(brokerName, hostPorts);
- LOG.info("get brokers from file: {}", brokers);
- return brokers;
- }
}
diff --git a/fe/pom.xml b/fe/pom.xml
index 2a645bd309..7cecdc58fb 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -207,7 +207,6 @@ under the License.
5.12.2
4.7.2
4.7.2
- 3.4.2
2.6.0
1.15
1.1.0.Final
@@ -629,12 +628,6 @@ under the License.
okhttp
${okhttp.version}
-
-
- com.squareup.okhttp3
- okhttp-ws
- ${okhttp-ws.version}
-
com.squareup.okio