This reverts commit 48afd77e37d63e2989cd85ab12b39a273fcd284e. There is meta problem
This commit is contained in:
@ -54,11 +54,9 @@ public final class FeMetaVersion {
|
||||
public static final int VERSION_116 = 116;
|
||||
// add user and comment to load job
|
||||
public static final int VERSION_117 = 117;
|
||||
// change frontend meta to json, add hostname to MasterInfo
|
||||
public static final int VERSION_118 = 118;
|
||||
|
||||
// note: when increment meta version, should assign the latest version to VERSION_CURRENT
|
||||
public static final int VERSION_CURRENT = VERSION_118;
|
||||
|
||||
public static final int VERSION_CURRENT = VERSION_117;
|
||||
|
||||
// all logs meta version should >= the minimum version, so that we could remove many if clause, for example
|
||||
// if (FE_METAVERSION < VERSION_94) ...
|
||||
|
||||
@ -153,20 +153,16 @@ public class SystemHandler extends AlterHandler {
|
||||
|
||||
} else if (alterClause instanceof AddObserverClause) {
|
||||
AddObserverClause clause = (AddObserverClause) alterClause;
|
||||
Env.getCurrentEnv().addFrontend(FrontendNodeType.OBSERVER, clause.getIp(), clause.getHostName(),
|
||||
clause.getPort());
|
||||
Env.getCurrentEnv().addFrontend(FrontendNodeType.OBSERVER, clause.getHost(), clause.getPort());
|
||||
} else if (alterClause instanceof DropObserverClause) {
|
||||
DropObserverClause clause = (DropObserverClause) alterClause;
|
||||
Env.getCurrentEnv().dropFrontend(FrontendNodeType.OBSERVER, clause.getIp(), clause.getHostName(),
|
||||
clause.getPort());
|
||||
Env.getCurrentEnv().dropFrontend(FrontendNodeType.OBSERVER, clause.getHost(), clause.getPort());
|
||||
} else if (alterClause instanceof AddFollowerClause) {
|
||||
AddFollowerClause clause = (AddFollowerClause) alterClause;
|
||||
Env.getCurrentEnv().addFrontend(FrontendNodeType.FOLLOWER, clause.getIp(), clause.getHostName(),
|
||||
clause.getPort());
|
||||
Env.getCurrentEnv().addFrontend(FrontendNodeType.FOLLOWER, clause.getHost(), clause.getPort());
|
||||
} else if (alterClause instanceof DropFollowerClause) {
|
||||
DropFollowerClause clause = (DropFollowerClause) alterClause;
|
||||
Env.getCurrentEnv().dropFrontend(FrontendNodeType.FOLLOWER, clause.getIp(), clause.getHostName(),
|
||||
clause.getPort());
|
||||
Env.getCurrentEnv().dropFrontend(FrontendNodeType.FOLLOWER, clause.getHost(), clause.getPort());
|
||||
} else if (alterClause instanceof ModifyBrokerClause) {
|
||||
ModifyBrokerClause clause = (ModifyBrokerClause) alterClause;
|
||||
Env.getCurrentEnv().getBrokerMgr().execute(clause);
|
||||
|
||||
@ -22,11 +22,11 @@ import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.ErrorCode;
|
||||
import org.apache.doris.common.ErrorReport;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.ha.FrontendNodeType;
|
||||
import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
import org.apache.doris.system.SystemInfoService.HostInfo;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Strings;
|
||||
@ -36,8 +36,7 @@ import java.util.Map;
|
||||
|
||||
public class FrontendClause extends AlterClause {
|
||||
protected String hostPort;
|
||||
protected String ip;
|
||||
protected String hostName;
|
||||
protected String host;
|
||||
protected int port;
|
||||
protected FrontendNodeType role;
|
||||
|
||||
@ -47,12 +46,8 @@ public class FrontendClause extends AlterClause {
|
||||
this.role = role;
|
||||
}
|
||||
|
||||
public String getIp() {
|
||||
return ip;
|
||||
}
|
||||
|
||||
public String getHostName() {
|
||||
return hostName;
|
||||
public String getHost() {
|
||||
return host;
|
||||
}
|
||||
|
||||
public int getPort() {
|
||||
@ -66,11 +61,10 @@ public class FrontendClause extends AlterClause {
|
||||
analyzer.getQualifiedUser());
|
||||
}
|
||||
|
||||
HostInfo pair = SystemInfoService.getIpHostAndPort(hostPort, true);
|
||||
this.ip = pair.getIp();
|
||||
this.hostName = pair.getHostName();
|
||||
this.port = pair.getPort();
|
||||
Preconditions.checkState(!Strings.isNullOrEmpty(ip));
|
||||
Pair<String, Integer> pair = SystemInfoService.validateHostAndPort(hostPort);
|
||||
this.host = pair.first;
|
||||
this.port = pair.second;
|
||||
Preconditions.checkState(!Strings.isNullOrEmpty(host));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -220,7 +220,6 @@ import org.apache.doris.system.FQDNManager;
|
||||
import org.apache.doris.system.Frontend;
|
||||
import org.apache.doris.system.HeartbeatMgr;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
import org.apache.doris.system.SystemInfoService.HostInfo;
|
||||
import org.apache.doris.task.AgentBatchTask;
|
||||
import org.apache.doris.task.AgentTaskExecutor;
|
||||
import org.apache.doris.task.CompactionTask;
|
||||
@ -355,7 +354,6 @@ public class Env {
|
||||
private int masterRpcPort;
|
||||
private int masterHttpPort;
|
||||
private String masterIp;
|
||||
private String masterHostName;
|
||||
|
||||
private MetaIdGenerator idGenerator = new MetaIdGenerator(NEXT_ID_INIT_VALUE);
|
||||
|
||||
@ -368,8 +366,8 @@ public class Env {
|
||||
private static Env CHECKPOINT = null;
|
||||
private static long checkpointThreadId = -1;
|
||||
private Checkpoint checkpointer;
|
||||
private List<HostInfo> helperNodes = Lists.newArrayList();
|
||||
private HostInfo selfNode = null;
|
||||
private List<Pair<String, Integer>> helperNodes = Lists.newArrayList();
|
||||
private Pair<String, Integer> selfNode = null;
|
||||
|
||||
// node name -> Frontend
|
||||
private ConcurrentHashMap<String, Frontend> frontends;
|
||||
@ -928,8 +926,7 @@ public class Env {
|
||||
// For compatibility. Because this is the very first time to start, so we arbitrarily choose
|
||||
// a new name for this node
|
||||
role = FrontendNodeType.FOLLOWER;
|
||||
nodeName = genFeNodeName(Config.enable_fqdn_mode ? selfNode.getIdent() : selfNode.getIp(),
|
||||
selfNode.getPort(), false /* new style */);
|
||||
nodeName = genFeNodeName(selfNode.first, selfNode.second, false /* new style */);
|
||||
storage.writeFrontendRoleAndNodeName(role, nodeName);
|
||||
LOG.info("very first time to start this node. role: {}, node name: {}", role.name(), nodeName);
|
||||
} else {
|
||||
@ -945,13 +942,24 @@ public class Env {
|
||||
// But we will get a empty nodeName after upgrading.
|
||||
// So for forward compatibility, we use the "old-style" way of naming: "ip_port",
|
||||
// and update the ROLE file.
|
||||
nodeName = genFeNodeName(selfNode.getIp(), selfNode.getPort(), true/* old style */);
|
||||
nodeName = genFeNodeName(selfNode.first, selfNode.second, true/* old style */);
|
||||
storage.writeFrontendRoleAndNodeName(role, nodeName);
|
||||
LOG.info("forward compatibility. role: {}, node name: {}", role.name(), nodeName);
|
||||
} else {
|
||||
// nodeName should be like "192.168.1.1_9217_1620296111213"
|
||||
// and the selfNode should be the prefix of nodeName.
|
||||
// If not, it means that the ip used last time is different from this time, which is not allowed.
|
||||
// But is metadata_failure_recovery is true,
|
||||
// we will not check it because this may be a FE migration.
|
||||
String[] split = nodeName.split("_");
|
||||
if (Config.metadata_failure_recovery.equals("false") && !selfNode.first.equalsIgnoreCase(
|
||||
split[0])) {
|
||||
throw new IOException(
|
||||
"the self host " + selfNode.first + " does not equal to the host in ROLE" + " file "
|
||||
+ split[0] + ". You need to set 'priority_networks' config"
|
||||
+ " in fe.conf to match the host " + split[0]);
|
||||
}
|
||||
}
|
||||
// Notice:
|
||||
// With the introduction of FQDN, the nodeName is no longer bound to an IP address,
|
||||
// so consistency is no longer checked here. Otherwise, the startup will fail.
|
||||
}
|
||||
|
||||
Preconditions.checkNotNull(role);
|
||||
@ -964,8 +972,7 @@ public class Env {
|
||||
storage.writeClusterIdAndToken();
|
||||
|
||||
isFirstTimeStartUp = true;
|
||||
Frontend self = new Frontend(role, nodeName, selfNode.getIp(), selfNode.getHostName(),
|
||||
selfNode.getPort());
|
||||
Frontend self = new Frontend(role, nodeName, selfNode.first, selfNode.second);
|
||||
// We don't need to check if frontends already contains self.
|
||||
// frontends must be empty cause no image is loaded and no journal is replayed yet.
|
||||
// And this frontend will be persisted later after opening bdbje environment.
|
||||
@ -1009,7 +1016,7 @@ public class Env {
|
||||
Preconditions.checkNotNull(role);
|
||||
Preconditions.checkNotNull(nodeName);
|
||||
|
||||
HostInfo rightHelperNode = helperNodes.get(0);
|
||||
Pair<String, Integer> rightHelperNode = helperNodes.get(0);
|
||||
|
||||
Storage storage = new Storage(this.imageDir);
|
||||
if (roleFile.exists() && (role != storage.getRole() || !nodeName.equals(storage.getNodeName()))
|
||||
@ -1019,8 +1026,8 @@ public class Env {
|
||||
if (!versionFile.exists()) {
|
||||
// If the version file doesn't exist, download it from helper node
|
||||
if (!getVersionFileFromHelper(rightHelperNode)) {
|
||||
throw new IOException("fail to download version file from "
|
||||
+ rightHelperNode.getIp() + " will exit.");
|
||||
throw new IOException(
|
||||
"fail to download version file from " + rightHelperNode.first + " will exit.");
|
||||
}
|
||||
|
||||
// NOTE: cluster_id will be init when Storage object is constructed,
|
||||
@ -1037,7 +1044,7 @@ public class Env {
|
||||
clusterId = storage.getClusterID();
|
||||
token = storage.getToken();
|
||||
try {
|
||||
URL idURL = new URL("http://" + NetUtils.getHostPortInAccessibleFormat(rightHelperNode.getIp(), Config.http_port) + "/check");
|
||||
URL idURL = new URL("http://" + NetUtils.getHostPortInAccessibleFormat(rightHelperNode.first, Config.http_port) + "/check");
|
||||
HttpURLConnection conn = null;
|
||||
conn = (HttpURLConnection) idURL.openConnection();
|
||||
conn.setConnectTimeout(2 * 1000);
|
||||
@ -1045,8 +1052,7 @@ public class Env {
|
||||
String clusterIdString = conn.getHeaderField(MetaBaseAction.CLUSTER_ID);
|
||||
int remoteClusterId = Integer.parseInt(clusterIdString);
|
||||
if (remoteClusterId != clusterId) {
|
||||
LOG.error("cluster id is not equal with helper node {}. will exit.",
|
||||
rightHelperNode.getIp());
|
||||
LOG.error("cluster id is not equal with helper node {}. will exit.", rightHelperNode.first);
|
||||
System.exit(-1);
|
||||
}
|
||||
String remoteToken = conn.getHeaderField(MetaBaseAction.TOKEN);
|
||||
@ -1061,8 +1067,7 @@ public class Env {
|
||||
Preconditions.checkNotNull(remoteToken);
|
||||
if (!token.equals(remoteToken)) {
|
||||
throw new IOException(
|
||||
"token is not equal with helper node "
|
||||
+ rightHelperNode.getIp() + ". will exit.");
|
||||
"token is not equal with helper node " + rightHelperNode.first + ". will exit.");
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
@ -1084,8 +1089,7 @@ public class Env {
|
||||
}
|
||||
|
||||
Preconditions.checkState(helperNodes.size() == 1);
|
||||
LOG.info("finished to get cluster id: {}, isElectable: {}, role: {} and node name: {}",
|
||||
clusterId, isElectable, role.name(), nodeName);
|
||||
LOG.info("finished to get cluster id: {}, role: {} and node name: {}", clusterId, role.name(), nodeName);
|
||||
}
|
||||
|
||||
public static String genFeNodeName(String host, int port, boolean isOldStyle) {
|
||||
@ -1102,14 +1106,12 @@ public class Env {
|
||||
private boolean getFeNodeTypeAndNameFromHelpers() {
|
||||
// we try to get info from helper nodes, once we get the right helper node,
|
||||
// other helper nodes will be ignored and removed.
|
||||
HostInfo rightHelperNode = null;
|
||||
for (HostInfo helperNode : helperNodes) {
|
||||
Pair<String, Integer> rightHelperNode = null;
|
||||
for (Pair<String, Integer> helperNode : helperNodes) {
|
||||
try {
|
||||
// For upgrade compatibility, the host parameter name remains the same
|
||||
// and the new hostname parameter is added
|
||||
URL url = new URL("http://" + NetUtils.getHostPortInAccessibleFormat(helperNode.getIp(), Config.http_port)
|
||||
+ "/role?host=" + selfNode.getIp() + "&hostname=" + selfNode.getHostName()
|
||||
+ "&port=" + selfNode.getPort());
|
||||
URL url = new URL("http://" + NetUtils.getHostPortInAccessibleFormat(helperNode.first, Config.http_port)
|
||||
+ "/role?host=" + selfNode.first
|
||||
+ "&port=" + selfNode.second);
|
||||
HttpURLConnection conn = null;
|
||||
conn = (HttpURLConnection) url.openConnection();
|
||||
if (conn.getResponseCode() != 200) {
|
||||
@ -1136,15 +1138,14 @@ public class Env {
|
||||
|
||||
if (Strings.isNullOrEmpty(nodeName)) {
|
||||
// For forward compatibility, we use old-style name: "ip_port"
|
||||
nodeName = genFeNodeName(selfNode.getIp(), selfNode.getPort(), true /* old style */);
|
||||
nodeName = genFeNodeName(selfNode.first, selfNode.second, true /* old style */);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.warn("failed to get fe node type from helper node: {}.", helperNode, e);
|
||||
continue;
|
||||
}
|
||||
|
||||
LOG.info("get fe node type {}, name {} from {}:{}:{}", role, nodeName,
|
||||
helperNode.getHostName(), helperNode.getIp(), Config.http_port);
|
||||
LOG.info("get fe node type {}, name {} from {}:{}", role, nodeName, helperNode.first, Config.http_port);
|
||||
rightHelperNode = helperNode;
|
||||
break;
|
||||
}
|
||||
@ -1158,22 +1159,8 @@ public class Env {
|
||||
return true;
|
||||
}
|
||||
|
||||
private void getSelfHostPort() throws Exception {
|
||||
String hostName = FrontendOptions.getHostname();
|
||||
if (hostName.equals(FrontendOptions.getLocalHostAddress())) {
|
||||
if (Config.enable_fqdn_mode) {
|
||||
LOG.fatal("Can't get hostname in FQDN mode. Please check your network configuration."
|
||||
+ " got hostname: {}, ip: {}",
|
||||
hostName, FrontendOptions.getLocalHostAddress());
|
||||
throw new Exception("Can't get hostname in FQDN mode. Please check your network configuration."
|
||||
+ " got hostname: " + hostName + ", ip: " + FrontendOptions.getLocalHostAddress());
|
||||
} else {
|
||||
// hostName should be real hostname, not ip
|
||||
hostName = null;
|
||||
}
|
||||
}
|
||||
selfNode = new HostInfo(FrontendOptions.getLocalHostAddress(), hostName,
|
||||
Config.edit_log_port);
|
||||
private void getSelfHostPort() {
|
||||
selfNode = Pair.of(FrontendOptions.getLocalHostAddress(), Config.edit_log_port);
|
||||
LOG.debug("get self node: {}", selfNode);
|
||||
}
|
||||
|
||||
@ -1206,8 +1193,8 @@ public class Env {
|
||||
if (helpers != null) {
|
||||
String[] splittedHelpers = helpers.split(",");
|
||||
for (String helper : splittedHelpers) {
|
||||
HostInfo helperHostPort = SystemInfoService.getIpHostAndPort(helper, true);
|
||||
if (helperHostPort.isSame(selfNode)) {
|
||||
Pair<String, Integer> helperHostPort = SystemInfoService.validateHostAndPort(helper);
|
||||
if (helperHostPort.equals(selfNode)) {
|
||||
/**
|
||||
* If user specified the helper node to this FE itself,
|
||||
* we will stop the starting FE process and report an error.
|
||||
@ -1224,7 +1211,7 @@ public class Env {
|
||||
}
|
||||
} else {
|
||||
// If helper node is not designated, use local node as helper node.
|
||||
helperNodes.add(new HostInfo(selfNode.getIp(), selfNode.getHostName(), Config.edit_log_port));
|
||||
helperNodes.add(Pair.of(selfNode.first, Config.edit_log_port));
|
||||
}
|
||||
}
|
||||
|
||||
@ -1247,8 +1234,7 @@ public class Env {
|
||||
// This is not the first time this node start up.
|
||||
// It should already added to FE group, just set helper node as it self.
|
||||
LOG.info("role file exist. this is not the first time to start up");
|
||||
helperNodes = Lists.newArrayList(new HostInfo(selfNode.getIp(), selfNode.getHostName(),
|
||||
Config.edit_log_port));
|
||||
helperNodes = Lists.newArrayList(Pair.of(selfNode.first, Config.edit_log_port));
|
||||
return;
|
||||
}
|
||||
|
||||
@ -1323,11 +1309,10 @@ public class Env {
|
||||
|
||||
// MUST set master ip before starting checkpoint thread.
|
||||
// because checkpoint thread need this info to select non-master FE to push image
|
||||
this.masterIp = Env.getCurrentEnv().getSelfNode().getIp();
|
||||
this.masterHostName = Env.getCurrentEnv().getSelfNode().getHostName();
|
||||
this.masterIp = FrontendOptions.getLocalHostAddress();
|
||||
this.masterRpcPort = Config.rpc_port;
|
||||
this.masterHttpPort = Config.http_port;
|
||||
MasterInfo info = new MasterInfo(this.masterIp, this.masterHostName, this.masterHttpPort, this.masterRpcPort);
|
||||
MasterInfo info = new MasterInfo(this.masterIp, this.masterHttpPort, this.masterRpcPort);
|
||||
editLog.logMasterInfo(info);
|
||||
|
||||
// for master, the 'isReady' is set behind.
|
||||
@ -1476,7 +1461,7 @@ public class Env {
|
||||
if (Config.edit_log_type.equalsIgnoreCase("bdb")) {
|
||||
for (Frontend fe : frontends.values()) {
|
||||
if (fe.getRole() == FrontendNodeType.FOLLOWER || fe.getRole() == FrontendNodeType.REPLICA) {
|
||||
((BDBHA) getHaProtocol()).addHelperSocket(fe.getIp(), fe.getEditLogPort());
|
||||
((BDBHA) getHaProtocol()).addHelperSocket(fe.getHost(), fe.getEditLogPort());
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1546,11 +1531,11 @@ public class Env {
|
||||
return;
|
||||
}
|
||||
|
||||
Frontend fe = checkFeExist(selfNode.getIp(), selfNode.getHostName(), selfNode.getPort());
|
||||
Frontend fe = checkFeExist(selfNode.first, selfNode.second);
|
||||
if (fe == null) {
|
||||
LOG.error("current node {}:{}:{} is not added to the cluster, will exit."
|
||||
LOG.error("current node {}:{} is not added to the cluster, will exit."
|
||||
+ " Your FE IP maybe changed, please set 'priority_networks' config in fe.conf properly.",
|
||||
selfNode.getHostName(), selfNode.getIp(), selfNode.getPort());
|
||||
selfNode.first, selfNode.second);
|
||||
System.exit(-1);
|
||||
} else if (fe.getRole() != role) {
|
||||
LOG.error("current node role is {} not match with frontend recorded role {}. will exit", role,
|
||||
@ -1568,9 +1553,9 @@ public class Env {
|
||||
}
|
||||
}
|
||||
|
||||
private boolean getVersionFileFromHelper(HostInfo helperNode) throws IOException {
|
||||
private boolean getVersionFileFromHelper(Pair<String, Integer> helperNode) throws IOException {
|
||||
try {
|
||||
String url = "http://" + NetUtils.getHostPortInAccessibleFormat(helperNode.getIp(), Config.http_port) + "/version";
|
||||
String url = "http://" + NetUtils.getHostPortInAccessibleFormat(helperNode.first, Config.http_port) + "/version";
|
||||
File dir = new File(this.imageDir);
|
||||
MetaHelper.getRemoteFile(url, HTTP_TIMEOUT_SECOND * 1000,
|
||||
MetaHelper.getOutputStream(Storage.VERSION_FILE, dir));
|
||||
@ -1583,13 +1568,13 @@ public class Env {
|
||||
return false;
|
||||
}
|
||||
|
||||
private void getNewImage(HostInfo helperNode) throws IOException {
|
||||
private void getNewImage(Pair<String, Integer> helperNode) throws IOException {
|
||||
long localImageVersion = 0;
|
||||
Storage storage = new Storage(this.imageDir);
|
||||
localImageVersion = storage.getLatestImageSeq();
|
||||
|
||||
try {
|
||||
String hostPort = NetUtils.getHostPortInAccessibleFormat(helperNode.getIp(), Config.http_port);
|
||||
String hostPort = NetUtils.getHostPortInAccessibleFormat(helperNode.first, Config.http_port);
|
||||
URL infoUrl = new URL("http://" + hostPort + "/info");
|
||||
StorageInfo info = getStorageInfo(infoUrl);
|
||||
long version = info.getImageSeq();
|
||||
@ -1612,11 +1597,10 @@ public class Env {
|
||||
Preconditions.checkNotNull(selfNode);
|
||||
Preconditions.checkNotNull(helperNodes);
|
||||
LOG.debug("self: {}. helpers: {}", selfNode, helperNodes);
|
||||
// if helper nodes contain itself, remove other helpers
|
||||
// if helper nodes contain it self, remove other helpers
|
||||
boolean containSelf = false;
|
||||
for (HostInfo helperNode : helperNodes) {
|
||||
// WARN: cannot use equals() here, because the hostname may not equal to helperNode.getHostName()
|
||||
if (selfNode.isSame(helperNode)) {
|
||||
for (Pair<String, Integer> helperNode : helperNodes) {
|
||||
if (selfNode.equals(helperNode)) {
|
||||
containSelf = true;
|
||||
break;
|
||||
}
|
||||
@ -2468,103 +2452,58 @@ public class Env {
|
||||
};
|
||||
}
|
||||
|
||||
public void addFrontend(FrontendNodeType role, String ip, String hostname, int editLogPort) throws DdlException {
|
||||
public void addFrontend(FrontendNodeType role, String host, int editLogPort) throws DdlException {
|
||||
if (!tryLock(false)) {
|
||||
throw new DdlException("Failed to acquire catalog lock. Try again");
|
||||
}
|
||||
try {
|
||||
Frontend fe = checkFeExist(ip, hostname, editLogPort);
|
||||
Frontend fe = checkFeExist(host, editLogPort);
|
||||
if (fe != null) {
|
||||
throw new DdlException("frontend already exists " + fe);
|
||||
}
|
||||
if (Config.enable_fqdn_mode && StringUtils.isEmpty(hostname)) {
|
||||
throw new DdlException("frontend's hostName should not be empty while enable_fqdn_mode is true");
|
||||
}
|
||||
String host = hostname != null && Config.enable_fqdn_mode ? hostname : ip;
|
||||
|
||||
String nodeName = genFeNodeName(host, editLogPort, false /* new name style */);
|
||||
|
||||
if (removedFrontends.contains(nodeName)) {
|
||||
throw new DdlException("frontend name already exists " + nodeName + ". Try again");
|
||||
}
|
||||
|
||||
fe = new Frontend(role, nodeName, ip, hostname, editLogPort);
|
||||
fe = new Frontend(role, nodeName, host, editLogPort);
|
||||
frontends.put(nodeName, fe);
|
||||
BDBHA bdbha = (BDBHA) haProtocol;
|
||||
if (role == FrontendNodeType.FOLLOWER || role == FrontendNodeType.REPLICA) {
|
||||
bdbha.addHelperSocket(ip, editLogPort);
|
||||
helperNodes.add(new HostInfo(ip, hostname, editLogPort));
|
||||
bdbha.addHelperSocket(host, editLogPort);
|
||||
helperNodes.add(Pair.of(host, editLogPort));
|
||||
bdbha.addUnReadyElectableNode(nodeName, getFollowerCount());
|
||||
}
|
||||
bdbha.removeConflictNodeIfExist(ip, editLogPort);
|
||||
bdbha.removeConflictNodeIfExist(host, editLogPort);
|
||||
editLog.logAddFrontend(fe);
|
||||
} finally {
|
||||
unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void modifyFrontendIp(String nodeName, String destIp) throws DdlException {
|
||||
modifyFrontendHost(nodeName, destIp, null);
|
||||
}
|
||||
|
||||
public void modifyFrontendHostName(String nodeName, String destHostName) throws DdlException {
|
||||
modifyFrontendHost(nodeName, null, destHostName);
|
||||
}
|
||||
|
||||
public void modifyFrontendHost(String nodeName, String destIp, String destHostName) throws DdlException {
|
||||
if (!tryLock(false)) {
|
||||
throw new DdlException("Failed to acquire catalog lock. Try again");
|
||||
}
|
||||
try {
|
||||
Frontend fe = getFeByName(nodeName);
|
||||
if (fe == null) {
|
||||
throw new DdlException("frontend does not exist, nodeName:" + nodeName);
|
||||
}
|
||||
boolean needLog = false;
|
||||
// we use hostname as address of bdbha, so we not need to update node address when ip changed
|
||||
if (destIp != null && !destIp.equals(fe.getIp())) {
|
||||
fe.setIp(destIp);
|
||||
needLog = true;
|
||||
}
|
||||
if (destHostName != null && !destHostName.equals(fe.getHostName())) {
|
||||
fe.setHostName(destHostName);
|
||||
BDBHA bdbha = (BDBHA) haProtocol;
|
||||
bdbha.updateNodeAddress(fe.getNodeName(), destHostName, fe.getEditLogPort());
|
||||
needLog = true;
|
||||
}
|
||||
if (needLog) {
|
||||
Env.getCurrentEnv().getEditLog().logModifyFrontend(fe);
|
||||
}
|
||||
} finally {
|
||||
unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void dropFrontend(FrontendNodeType role, String ip, String hostname, int port) throws DdlException {
|
||||
if (port == selfNode.getPort() && feType == FrontendNodeType.MASTER
|
||||
&& ((selfNode.getHostName() != null && selfNode.getHostName().equals(hostname))
|
||||
|| ip.equals(selfNode.getIp()))) {
|
||||
public void dropFrontend(FrontendNodeType role, String host, int port) throws DdlException {
|
||||
if (host.equals(selfNode.first) && port == selfNode.second && feType == FrontendNodeType.MASTER) {
|
||||
throw new DdlException("can not drop current master node.");
|
||||
}
|
||||
if (!tryLock(false)) {
|
||||
throw new DdlException("Failed to acquire catalog lock. Try again");
|
||||
}
|
||||
try {
|
||||
Frontend fe = checkFeExist(ip, hostname, port);
|
||||
Frontend fe = checkFeExist(host, port);
|
||||
if (fe == null) {
|
||||
throw new DdlException("frontend does not exist[" + ip + ":" + port + "]");
|
||||
throw new DdlException("frontend does not exist[" + host + ":" + port + "]");
|
||||
}
|
||||
if (fe.getRole() != role) {
|
||||
throw new DdlException(role.toString() + " does not exist[" + ip + ":" + port + "]");
|
||||
throw new DdlException(role.toString() + " does not exist[" + host + ":" + port + "]");
|
||||
}
|
||||
frontends.remove(fe.getNodeName());
|
||||
removedFrontends.add(fe.getNodeName());
|
||||
|
||||
if (fe.getRole() == FrontendNodeType.FOLLOWER || fe.getRole() == FrontendNodeType.REPLICA) {
|
||||
haProtocol.removeElectableNode(fe.getNodeName());
|
||||
// ip may be changed, so we need use both ip and hostname to check.
|
||||
// use node.getIdent() for simplicity here.
|
||||
helperNodes.removeIf(node -> (node.getIp().equals(ip)
|
||||
|| node.getIdent().equals(hostname)) && node.getPort() == port);
|
||||
helperNodes.remove(Pair.of(host, port));
|
||||
BDBHA ha = (BDBHA) haProtocol;
|
||||
ha.removeUnReadyElectableNode(nodeName, getFollowerCount());
|
||||
}
|
||||
@ -2574,25 +2513,22 @@ public class Env {
|
||||
}
|
||||
}
|
||||
|
||||
public Frontend checkFeExist(String ip, String hostName, int port) {
|
||||
public Frontend checkFeExist(String host, int port) {
|
||||
for (Frontend fe : frontends.values()) {
|
||||
if (fe.getEditLogPort() != port) {
|
||||
continue;
|
||||
}
|
||||
if (fe.getIp().equals(ip) || (fe.getHostName() != null && fe.getHostName().equals(hostName))) {
|
||||
if (fe.getHost().equals(host) && fe.getEditLogPort() == port) {
|
||||
return fe;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public Frontend getFeByIp(String ip) {
|
||||
public Frontend getFeByHost(String host) {
|
||||
for (Frontend fe : frontends.values()) {
|
||||
InetAddress hostAddr = null;
|
||||
InetAddress feAddr = null;
|
||||
try {
|
||||
hostAddr = InetAddress.getByName(ip);
|
||||
feAddr = InetAddress.getByName(fe.getIp());
|
||||
hostAddr = InetAddress.getByName(host);
|
||||
feAddr = InetAddress.getByName(fe.getHost());
|
||||
} catch (UnknownHostException e) {
|
||||
LOG.warn("get address failed: {}", e.getMessage());
|
||||
return null;
|
||||
@ -3273,7 +3209,7 @@ public class Env {
|
||||
public void replayAddFrontend(Frontend fe) {
|
||||
tryLock(true);
|
||||
try {
|
||||
Frontend existFe = checkFeExist(fe.getIp(), fe.getHostName(), fe.getEditLogPort());
|
||||
Frontend existFe = checkFeExist(fe.getHost(), fe.getEditLogPort());
|
||||
if (existFe != null) {
|
||||
LOG.warn("fe {} already exist.", existFe);
|
||||
if (existFe.getRole() != fe.getRole()) {
|
||||
@ -3296,33 +3232,13 @@ public class Env {
|
||||
// DO NOT add helper sockets here, cause BDBHA is not instantiated yet.
|
||||
// helper sockets will be added after start BDBHA
|
||||
// But add to helperNodes, just for show
|
||||
helperNodes.add(new HostInfo(fe.getIp(), fe.getHostName(), fe.getEditLogPort()));
|
||||
helperNodes.add(Pair.of(fe.getHost(), fe.getEditLogPort()));
|
||||
}
|
||||
} finally {
|
||||
unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void replayModifyFrontend(Frontend fe) {
|
||||
tryLock(true);
|
||||
try {
|
||||
Frontend existFe = getFeByName(fe.getNodeName());
|
||||
if (existFe == null) {
|
||||
// frontend may already be dropped. this may happen when
|
||||
// drop and modify operations do not guarantee the order.
|
||||
return;
|
||||
}
|
||||
// modify fe in frontends
|
||||
existFe.setIp(fe.getIp());
|
||||
existFe.setHostName(fe.getHostName());
|
||||
// modify fe in helperNodes
|
||||
helperNodes.stream().filter(n -> n.getHostName() != null && n.getHostName().equals(fe.getHostName()))
|
||||
.forEach(n -> n.ip = fe.getIp());
|
||||
} finally {
|
||||
unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void replayDropFrontend(Frontend frontend) {
|
||||
tryLock(true);
|
||||
try {
|
||||
@ -3332,11 +3248,7 @@ public class Env {
|
||||
return;
|
||||
}
|
||||
if (removedFe.getRole() == FrontendNodeType.FOLLOWER || removedFe.getRole() == FrontendNodeType.REPLICA) {
|
||||
// ip may be changed, so we need use both ip and hostname to check.
|
||||
// use node.getIdent() for simplicity here.
|
||||
helperNodes.removeIf(node -> (node.getIp().equals(removedFe.getIp())
|
||||
|| node.getIdent().equals(removedFe.getHostName()))
|
||||
&& node.getPort() == removedFe.getEditLogPort());
|
||||
helperNodes.remove(Pair.of(removedFe.getHost(), removedFe.getEditLogPort()));
|
||||
}
|
||||
|
||||
removedFrontends.add(removedFe.getNodeName());
|
||||
@ -3593,16 +3505,16 @@ public class Env {
|
||||
return this.role;
|
||||
}
|
||||
|
||||
public HostInfo getHelperNode() {
|
||||
public Pair<String, Integer> getHelperNode() {
|
||||
Preconditions.checkState(helperNodes.size() >= 1);
|
||||
return this.helperNodes.get(0);
|
||||
}
|
||||
|
||||
public List<HostInfo> getHelperNodes() {
|
||||
public List<Pair<String, Integer>> getHelperNodes() {
|
||||
return Lists.newArrayList(helperNodes);
|
||||
}
|
||||
|
||||
public HostInfo getSelfNode() {
|
||||
public Pair<String, Integer> getSelfNode() {
|
||||
return this.selfNode;
|
||||
}
|
||||
|
||||
@ -3635,13 +3547,6 @@ public class Env {
|
||||
return this.masterIp;
|
||||
}
|
||||
|
||||
public String getMasterHostName() {
|
||||
if (!isReady()) {
|
||||
return "";
|
||||
}
|
||||
return this.masterHostName;
|
||||
}
|
||||
|
||||
public EsRepository getEsRepository() {
|
||||
return getInternalCatalog().getEsRepository();
|
||||
}
|
||||
|
||||
@ -24,7 +24,6 @@ import org.apache.doris.common.util.NetUtils;
|
||||
import org.apache.doris.common.util.TimeUtils;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.system.Frontend;
|
||||
import org.apache.doris.system.SystemInfoService.HostInfo;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
@ -87,11 +86,11 @@ public class FrontendsProcNode implements ProcNodeInterface {
|
||||
List<InetSocketAddress> allFe = env.getHaProtocol().getElectableNodes(true /* include leader */);
|
||||
allFe.addAll(env.getHaProtocol().getObserverNodes());
|
||||
List<Pair<String, Integer>> allFeHosts = convertToHostPortPair(allFe);
|
||||
List<HostInfo> helperNodes = env.getHelperNodes();
|
||||
List<Pair<String, Integer>> helperNodes = env.getHelperNodes();
|
||||
|
||||
// Because the `show frontend` stmt maybe forwarded from other FE.
|
||||
// if we only get self node from currrent catalog, the "CurrentConnected" field will always points to Msater FE.
|
||||
String selfNode = Env.getCurrentEnv().getSelfNode().getIp();
|
||||
String selfNode = Env.getCurrentEnv().getSelfNode().first;
|
||||
if (ConnectContext.get() != null && !Strings.isNullOrEmpty(ConnectContext.get().getCurrentConnectedFEIp())) {
|
||||
selfNode = ConnectContext.get().getCurrentConnectedFEIp();
|
||||
}
|
||||
@ -100,13 +99,13 @@ public class FrontendsProcNode implements ProcNodeInterface {
|
||||
|
||||
List<String> info = new ArrayList<String>();
|
||||
info.add(fe.getNodeName());
|
||||
info.add(fe.getIp());
|
||||
info.add(fe.getHost());
|
||||
|
||||
info.add(NetUtils.getHostnameByIp(fe.getIp()));
|
||||
info.add(NetUtils.getHostnameByIp(fe.getHost()));
|
||||
info.add(Integer.toString(fe.getEditLogPort()));
|
||||
info.add(Integer.toString(Config.http_port));
|
||||
|
||||
if (fe.getIp().equals(env.getSelfNode().getIp())) {
|
||||
if (fe.getHost().equals(env.getSelfNode().first)) {
|
||||
info.add(Integer.toString(Config.query_port));
|
||||
info.add(Integer.toString(Config.rpc_port));
|
||||
} else {
|
||||
@ -115,7 +114,7 @@ public class FrontendsProcNode implements ProcNodeInterface {
|
||||
}
|
||||
|
||||
info.add(fe.getRole().name());
|
||||
InetSocketAddress socketAddress = new InetSocketAddress(fe.getIp(), fe.getEditLogPort());
|
||||
InetSocketAddress socketAddress = new InetSocketAddress(fe.getHost(), fe.getEditLogPort());
|
||||
//An ipv6 address may have different format, so we compare InetSocketAddress objects instead of IP Strings.
|
||||
//e.g. fdbd:ff1:ce00:1c26::d8 and fdbd:ff1:ce00:1c26:0:0:d8
|
||||
info.add(String.valueOf(socketAddress.equals(master)));
|
||||
@ -123,7 +122,7 @@ public class FrontendsProcNode implements ProcNodeInterface {
|
||||
info.add(Integer.toString(env.getClusterId()));
|
||||
info.add(String.valueOf(isJoin(allFeHosts, fe)));
|
||||
|
||||
if (fe.getIp().equals(env.getSelfNode().getIp())) {
|
||||
if (fe.getHost().equals(env.getSelfNode().first)) {
|
||||
info.add("true");
|
||||
info.add(Long.toString(env.getEditLog().getMaxJournalId()));
|
||||
} else {
|
||||
@ -135,19 +134,19 @@ public class FrontendsProcNode implements ProcNodeInterface {
|
||||
info.add(fe.getHeartbeatErrMsg());
|
||||
info.add(fe.getVersion());
|
||||
// To indicate which FE we currently connected
|
||||
info.add(fe.getIp().equals(selfNode) ? "Yes" : "No");
|
||||
info.add(fe.getHost().equals(selfNode) ? "Yes" : "No");
|
||||
|
||||
infos.add(info);
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean isHelperNode(List<HostInfo> helperNodes, Frontend fe) {
|
||||
return helperNodes.stream().anyMatch(p -> p.getIp().equals(fe.getIp()) && p.getPort() == fe.getEditLogPort());
|
||||
private static boolean isHelperNode(List<Pair<String, Integer>> helperNodes, Frontend fe) {
|
||||
return helperNodes.stream().anyMatch(p -> p.first.equals(fe.getHost()) && p.second == fe.getEditLogPort());
|
||||
}
|
||||
|
||||
private static boolean isJoin(List<Pair<String, Integer>> allFeHosts, Frontend fe) {
|
||||
for (Pair<String, Integer> pair : allFeHosts) {
|
||||
if (fe.getIp().equals(pair.first) && fe.getEditLogPort() == pair.second) {
|
||||
if (fe.getHost().equals(pair.first) && fe.getEditLogPort() == pair.second) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
@ -70,7 +70,7 @@ public class Telemetry {
|
||||
throw new Exception("unknown value " + Config.trace_exporter + " of trace_exporter in fe.conf");
|
||||
}
|
||||
|
||||
String serviceName = "FRONTEND:" + Env.getCurrentEnv().getSelfNode().getIp();
|
||||
String serviceName = "FRONTEND:" + Env.getCurrentEnv().getSelfNode().first;
|
||||
Resource serviceNameResource = Resource.create(
|
||||
Attributes.of(AttributeKey.stringKey("service.name"), serviceName));
|
||||
// Send a batch of spans if ScheduleDelay time or MaxExportBatchSize is reached
|
||||
|
||||
@ -19,7 +19,6 @@ package org.apache.doris.deploy;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.FsBroker;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.Pair;
|
||||
@ -29,7 +28,6 @@ import org.apache.doris.ha.FrontendNodeType;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.system.Frontend;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
import org.apache.doris.system.SystemInfoService.HostInfo;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Strings;
|
||||
@ -136,8 +134,7 @@ public class DeployManager extends MasterDaemon {
|
||||
// So we use this map to count the continuous detected down times, if the continuous down time is more
|
||||
// then MAX_MISSING_TIME, we considered this node as down permanently.
|
||||
protected Map<String, Integer> counterMap = Maps.newHashMap();
|
||||
// k8s pod delete and will recreate, so we need to wait for a while,otherwise we will drop node by mistake
|
||||
protected static final Integer MAX_MISSING_TIME = 60;
|
||||
protected static final Integer MAX_MISSING_TIME = 5;
|
||||
|
||||
public DeployManager(Env env, long intervalMs) {
|
||||
super("deployManager", intervalMs);
|
||||
@ -239,20 +236,28 @@ public class DeployManager extends MasterDaemon {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
public List<HostInfo> getHelperNodes() {
|
||||
public List<Pair<String, Integer>> getHelperNodes() {
|
||||
String existFeHosts = System.getenv(ENV_FE_EXIST_ENDPOINT);
|
||||
if (!Strings.isNullOrEmpty(existFeHosts)) {
|
||||
// Some Frontends already exist in service group.
|
||||
// We consider them as helper node
|
||||
List<HostInfo> helperNodes = Lists.newArrayList();
|
||||
List<Pair<String, Integer>> helperNodes = Lists.newArrayList();
|
||||
String[] splittedHosts = existFeHosts.split(",");
|
||||
for (String host : splittedHosts) {
|
||||
try {
|
||||
helperNodes.add(SystemInfoService.getIpHostAndPort(host, true));
|
||||
} catch (AnalysisException e) {
|
||||
String[] splittedHostPort = host.split(":");
|
||||
if (splittedHostPort.length != 2) {
|
||||
LOG.error("Invalid exist fe hosts: {}. will exit", existFeHosts);
|
||||
System.exit(-1);
|
||||
}
|
||||
Integer port = -1;
|
||||
try {
|
||||
port = Integer.valueOf(splittedHostPort[1]);
|
||||
} catch (NumberFormatException e) {
|
||||
LOG.error("Invalid exist fe hosts: {}. will exit", existFeHosts);
|
||||
System.exit(-1);
|
||||
}
|
||||
|
||||
helperNodes.add(Pair.of(splittedHostPort[0], port));
|
||||
}
|
||||
|
||||
return helperNodes;
|
||||
@ -322,8 +327,8 @@ public class DeployManager extends MasterDaemon {
|
||||
LOG.info("sorted fe host list: {}", feHostInfos);
|
||||
|
||||
// 4. return the first one as helper
|
||||
return Lists.newArrayList(new HostInfo(feHostInfos.get(0).getIp(), feHostInfos.get(0).getHostName(),
|
||||
feHostInfos.get(0).getPort()));
|
||||
return Lists.newArrayList(Pair.of(feHostInfos.get(0).getIp(), feHostInfos.get(0).getPort()));
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -353,8 +358,9 @@ public class DeployManager extends MasterDaemon {
|
||||
}
|
||||
|
||||
// 1.1 Check if self is in electable fe service group
|
||||
// TODO(zd): 2023/2/17 Need to modify here when fe support FQDN (hostname will set to real hostname)
|
||||
SystemInfoService.HostInfo selfHostInfo = getFromHostInfos(remoteElectableFeHosts,
|
||||
new SystemInfoService.HostInfo(env.getMasterIp(), env.getMasterHostName(), Config.edit_log_port));
|
||||
new SystemInfoService.HostInfo(env.getMasterIp(), null, Config.edit_log_port));
|
||||
if (selfHostInfo == null) {
|
||||
// The running of this deploy manager means this node is considered self as Master.
|
||||
// If it self does not exist in electable fe service group, it should shut it self down.
|
||||
@ -580,10 +586,10 @@ public class DeployManager extends MasterDaemon {
|
||||
try {
|
||||
switch (nodeType) {
|
||||
case ELECTABLE:
|
||||
env.dropFrontend(FrontendNodeType.FOLLOWER, localIp, localHostName, localPort);
|
||||
env.dropFrontend(FrontendNodeType.FOLLOWER, localIp, localPort);
|
||||
break;
|
||||
case OBSERVER:
|
||||
env.dropFrontend(FrontendNodeType.OBSERVER, localIp, localHostName, localPort);
|
||||
env.dropFrontend(FrontendNodeType.OBSERVER, localIp, localPort);
|
||||
break;
|
||||
case BACKEND:
|
||||
case BACKEND_CN:
|
||||
@ -615,10 +621,10 @@ public class DeployManager extends MasterDaemon {
|
||||
try {
|
||||
switch (nodeType) {
|
||||
case ELECTABLE:
|
||||
env.addFrontend(FrontendNodeType.FOLLOWER, remoteIp, remoteHostName, remotePort);
|
||||
env.addFrontend(FrontendNodeType.FOLLOWER, remoteIp, remotePort);
|
||||
break;
|
||||
case OBSERVER:
|
||||
env.addFrontend(FrontendNodeType.OBSERVER, remoteIp, remoteHostName, remotePort);
|
||||
env.addFrontend(FrontendNodeType.OBSERVER, remoteIp, remotePort);
|
||||
break;
|
||||
case BACKEND:
|
||||
case BACKEND_CN:
|
||||
@ -685,23 +691,19 @@ public class DeployManager extends MasterDaemon {
|
||||
return hostPortPair;
|
||||
}
|
||||
|
||||
// TODO: Need to modify here when fe support FQDN (hostname will set to real hostname)
|
||||
private SystemInfoService.HostInfo convertToHostInfo(Frontend frontend) {
|
||||
return new SystemInfoService.HostInfo(frontend.getIp(), frontend.getHostName(), frontend.getEditLogPort());
|
||||
return new SystemInfoService.HostInfo(frontend.getHost(), null, frontend.getEditLogPort());
|
||||
}
|
||||
|
||||
private SystemInfoService.HostInfo convertToHostInfo(Backend backend) {
|
||||
return new SystemInfoService.HostInfo(backend.getIp(), backend.getHostName(), backend.getHeartbeatPort());
|
||||
}
|
||||
|
||||
// TODO: Need to modify here when fe support FQDN(will check hostname?)
|
||||
private boolean isSelf(SystemInfoService.HostInfo hostInfo) {
|
||||
if (Config.edit_log_port == hostInfo.getPort()) {
|
||||
// master host name may not same as local host name, so we should compare ip here
|
||||
if (env.getMasterHostName() != null && env.getMasterHostName().equals(hostInfo.getHostName())) {
|
||||
return true;
|
||||
}
|
||||
if (env.getMasterIp().equals(hostInfo.getIp())) {
|
||||
return true;
|
||||
}
|
||||
if (env.getMasterIp().equals(hostInfo.getIp()) && Config.edit_log_port == hostInfo.getPort()) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -221,23 +221,6 @@ public class BDBHA implements HAProtocol {
|
||||
return true;
|
||||
}
|
||||
|
||||
public boolean updateNodeAddress(String nodeName, String newHostName, int port) {
|
||||
ReplicationGroupAdmin replicationGroupAdmin = environment.getReplicationGroupAdmin();
|
||||
if (replicationGroupAdmin == null) {
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
replicationGroupAdmin.updateAddress(nodeName, newHostName, port);
|
||||
} catch (MemberNotFoundException e) {
|
||||
LOG.error("the updating electable node is not found {}", nodeName, e);
|
||||
return false;
|
||||
} catch (MasterStateException e) {
|
||||
LOG.error("the updating electable node is master {}", nodeName, e);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
// When new Follower FE is added to the cluster, it should also be added to the
|
||||
// helper sockets in
|
||||
// ReplicationGroupAdmin, in order to fix the following case:
|
||||
|
||||
@ -17,8 +17,6 @@
|
||||
|
||||
package org.apache.doris.ha;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.FeMetaVersion;
|
||||
import org.apache.doris.common.io.Text;
|
||||
import org.apache.doris.common.io.Writable;
|
||||
|
||||
@ -29,20 +27,17 @@ import java.io.IOException;
|
||||
public class MasterInfo implements Writable {
|
||||
|
||||
private String ip;
|
||||
private String hostName;
|
||||
private int httpPort;
|
||||
private int rpcPort;
|
||||
|
||||
public MasterInfo() {
|
||||
this.ip = "";
|
||||
this.hostName = "";
|
||||
this.httpPort = 0;
|
||||
this.rpcPort = 0;
|
||||
}
|
||||
|
||||
public MasterInfo(String ip, String hostName, int httpPort, int rpcPort) {
|
||||
public MasterInfo(String ip, int httpPort, int rpcPort) {
|
||||
this.ip = ip;
|
||||
this.hostName = hostName;
|
||||
this.httpPort = httpPort;
|
||||
this.rpcPort = rpcPort;
|
||||
}
|
||||
@ -55,14 +50,6 @@ public class MasterInfo implements Writable {
|
||||
this.ip = ip;
|
||||
}
|
||||
|
||||
public String getHostName() {
|
||||
return hostName;
|
||||
}
|
||||
|
||||
public void setHostName(String hostName) {
|
||||
this.hostName = hostName;
|
||||
}
|
||||
|
||||
public int getHttpPort() {
|
||||
return this.httpPort;
|
||||
}
|
||||
@ -84,16 +71,12 @@ public class MasterInfo implements Writable {
|
||||
Text.writeString(out, ip);
|
||||
out.writeInt(httpPort);
|
||||
out.writeInt(rpcPort);
|
||||
Text.writeString(out, hostName);
|
||||
}
|
||||
|
||||
public void readFields(DataInput in) throws IOException {
|
||||
ip = Text.readString(in);
|
||||
httpPort = in.readInt();
|
||||
rpcPort = in.readInt();
|
||||
if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_118) {
|
||||
hostName = Text.readString(in);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -54,14 +54,13 @@ public class MetaService extends RestBaseController {
|
||||
|
||||
private static final String VERSION = "version";
|
||||
private static final String HOST = "host";
|
||||
private static final String HOSTNAME = "hostname";
|
||||
private static final String PORT = "port";
|
||||
|
||||
private File imageDir = MetaHelper.getMasterImageDir();
|
||||
|
||||
private boolean isFromValidFe(HttpServletRequest request) {
|
||||
String clientHost = request.getRemoteHost();
|
||||
Frontend fe = Env.getCurrentEnv().getFeByIp(clientHost);
|
||||
Frontend fe = Env.getCurrentEnv().getFeByHost(clientHost);
|
||||
if (fe == null) {
|
||||
LOG.warn("request is not from valid FE. client: {}", clientHost);
|
||||
return false;
|
||||
@ -185,15 +184,12 @@ public class MetaService extends RestBaseController {
|
||||
@RequestMapping(path = "/role", method = RequestMethod.GET)
|
||||
public Object role(HttpServletRequest request, HttpServletResponse response) throws DdlException {
|
||||
checkFromValidFe(request);
|
||||
// For upgrade compatibility, the host parameter name remains the same
|
||||
// and the new hostname parameter is added.
|
||||
// host = ip
|
||||
|
||||
String host = request.getParameter(HOST);
|
||||
String hostname = request.getParameter(HOSTNAME);
|
||||
String portString = request.getParameter(PORT);
|
||||
if (!Strings.isNullOrEmpty(host) && !Strings.isNullOrEmpty(portString)) {
|
||||
int port = Integer.parseInt(portString);
|
||||
Frontend fe = Env.getCurrentEnv().checkFeExist(host, hostname, port);
|
||||
Frontend fe = Env.getCurrentEnv().checkFeExist(host, port);
|
||||
if (fe == null) {
|
||||
response.setHeader("role", FrontendNodeType.UNKNOWN.name());
|
||||
} else {
|
||||
|
||||
@ -60,7 +60,7 @@ public class ClusterAction extends RestBaseController {
|
||||
Map<String, List<String>> result = Maps.newHashMap();
|
||||
List<String> frontends = Env.getCurrentEnv().getFrontends(null)
|
||||
.stream().filter(Frontend::isAlive)
|
||||
.map(Frontend::getIp)
|
||||
.map(Frontend::getHost)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
result.put("mysql", frontends.stream().map(ip -> ip + ":" + Config.query_port).collect(Collectors.toList()));
|
||||
|
||||
@ -49,7 +49,7 @@ public class HttpUtils {
|
||||
|
||||
static List<Pair<String, Integer>> getFeList() {
|
||||
return Env.getCurrentEnv().getFrontends(null)
|
||||
.stream().filter(Frontend::isAlive).map(fe -> Pair.of(fe.getIp(), Config.http_port))
|
||||
.stream().filter(Frontend::isAlive).map(fe -> Pair.of(fe.getHost(), Config.http_port))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
|
||||
@ -21,6 +21,7 @@ import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.ConfigBase;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.MarkedCountDownLatch;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.common.ThreadPoolManager;
|
||||
@ -229,7 +230,7 @@ public class NodeAction extends RestBaseController {
|
||||
}
|
||||
|
||||
private static List<String> getFeList() {
|
||||
return Env.getCurrentEnv().getFrontends(null).stream().map(fe -> fe.getIp() + ":" + Config.http_port)
|
||||
return Env.getCurrentEnv().getFrontends(null).stream().map(fe -> fe.getHost() + ":" + Config.http_port)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@ -482,7 +483,7 @@ public class NodeAction extends RestBaseController {
|
||||
List<Map<String, String>> failedTotal = Lists.newArrayList();
|
||||
List<NodeConfigs> nodeConfigList = parseSetConfigNodes(requestBody, failedTotal);
|
||||
List<Pair<String, Integer>> aliveFe = Env.getCurrentEnv().getFrontends(null).stream().filter(Frontend::isAlive)
|
||||
.map(fe -> Pair.of(fe.getIp(), Config.http_port)).collect(Collectors.toList());
|
||||
.map(fe -> Pair.of(fe.getHost(), Config.http_port)).collect(Collectors.toList());
|
||||
checkNodeIsAlive(nodeConfigList, aliveFe, failedTotal);
|
||||
|
||||
Map<String, String> header = Maps.newHashMap();
|
||||
@ -645,6 +646,9 @@ public class NodeAction extends RestBaseController {
|
||||
}
|
||||
try {
|
||||
String role = reqInfo.getRole();
|
||||
String[] split = reqInfo.getHostPort().split(":");
|
||||
String host = split[0];
|
||||
int port = Integer.parseInt(split[1]);
|
||||
Env currentEnv = Env.getCurrentEnv();
|
||||
FrontendNodeType frontendNodeType;
|
||||
if (FrontendNodeType.FOLLOWER.name().equals(role)) {
|
||||
@ -652,14 +656,13 @@ public class NodeAction extends RestBaseController {
|
||||
} else {
|
||||
frontendNodeType = FrontendNodeType.OBSERVER;
|
||||
}
|
||||
HostInfo info = SystemInfoService.getIpHostAndPort(reqInfo.getHostPort(), true);
|
||||
if ("ADD".equals(action)) {
|
||||
currentEnv.addFrontend(frontendNodeType, info.getIp(), info.getHostName(), info.getPort());
|
||||
currentEnv.addFrontend(frontendNodeType, host, port);
|
||||
} else if ("DROP".equals(action)) {
|
||||
currentEnv.dropFrontend(frontendNodeType, info.getIp(), info.getHostName(), info.getPort());
|
||||
currentEnv.dropFrontend(frontendNodeType, host, port);
|
||||
}
|
||||
} catch (UserException userException) {
|
||||
return ResponseEntityBuilder.okWithCommonError(userException.getMessage());
|
||||
} catch (DdlException ddlException) {
|
||||
return ResponseEntityBuilder.okWithCommonError(ddlException.getMessage());
|
||||
}
|
||||
return ResponseEntityBuilder.ok();
|
||||
}
|
||||
|
||||
@ -175,7 +175,7 @@ public class QueryProfileAction extends RestBaseController {
|
||||
|
||||
// add node information
|
||||
for (List<String> query : queries) {
|
||||
query.add(1, Env.getCurrentEnv().getSelfNode().getIp() + ":" + Config.http_port);
|
||||
query.add(1, Env.getCurrentEnv().getSelfNode().first + ":" + Config.http_port);
|
||||
}
|
||||
|
||||
if (!Strings.isNullOrEmpty(search)) {
|
||||
|
||||
@ -337,9 +337,9 @@ public class JournalEntity implements Writable {
|
||||
}
|
||||
case OperationType.OP_ADD_FRONTEND:
|
||||
case OperationType.OP_ADD_FIRST_FRONTEND:
|
||||
case OperationType.OP_MODIFY_FRONTEND:
|
||||
case OperationType.OP_REMOVE_FRONTEND: {
|
||||
data = Frontend.read(in);
|
||||
data = new Frontend();
|
||||
((Frontend) data).readFields(in);
|
||||
isRead = true;
|
||||
break;
|
||||
}
|
||||
|
||||
@ -183,6 +183,7 @@ public class BDBEnvironment {
|
||||
// start state change listener
|
||||
StateChangeListener listener = new BDBStateChangeListener();
|
||||
replicatedEnvironment.setStateChangeListener(listener);
|
||||
|
||||
// open epochDB. the first parameter null means auto-commit
|
||||
epochDB = replicatedEnvironment.openDatabase(null, "epochDB", dbConfig);
|
||||
break;
|
||||
|
||||
@ -18,7 +18,7 @@
|
||||
package org.apache.doris.journal.bdbje;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.common.io.DataOutputBuffer;
|
||||
import org.apache.doris.common.io.Writable;
|
||||
import org.apache.doris.common.util.Util;
|
||||
@ -27,7 +27,6 @@ import org.apache.doris.journal.JournalCursor;
|
||||
import org.apache.doris.journal.JournalEntity;
|
||||
import org.apache.doris.metric.MetricRepo;
|
||||
import org.apache.doris.persist.OperationType;
|
||||
import org.apache.doris.system.SystemInfoService.HostInfo;
|
||||
|
||||
import com.sleepycat.bind.tuple.TupleBinding;
|
||||
import com.sleepycat.je.Database;
|
||||
@ -81,17 +80,9 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
|
||||
*/
|
||||
private void initBDBEnv(String nodeName) {
|
||||
environmentPath = Env.getServingEnv().getBdbDir();
|
||||
HostInfo selfNode = Env.getServingEnv().getSelfNode();
|
||||
Pair<String, Integer> selfNode = Env.getServingEnv().getSelfNode();
|
||||
selfNodeName = nodeName;
|
||||
if (Config.enable_fqdn_mode) {
|
||||
// We use the hostname as the address of the bdbje node,
|
||||
// so that we do not need to update bdbje when the IP changes.
|
||||
// WARNING:However, it is necessary to ensure that the hostname of the node
|
||||
// can be resolved and accessed by other nodes.
|
||||
selfNodeHostPort = selfNode.getHostName() + ":" + selfNode.getPort();
|
||||
} else {
|
||||
selfNodeHostPort = selfNode.getIp() + ":" + selfNode.getPort();
|
||||
}
|
||||
selfNodeHostPort = selfNode.first + ":" + selfNode.second;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -308,11 +299,8 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
|
||||
if (bdbEnvironment == null) {
|
||||
File dbEnv = new File(environmentPath);
|
||||
bdbEnvironment = new BDBEnvironment();
|
||||
HostInfo helperNode = Env.getServingEnv().getHelperNode();
|
||||
String helperHostPort = helperNode.getIp() + ":" + helperNode.getPort();
|
||||
if (Config.enable_fqdn_mode) {
|
||||
helperHostPort = helperNode.getHostName() + ":" + helperNode.getPort();
|
||||
}
|
||||
Pair<String, Integer> helperNode = Env.getServingEnv().getHelperNode();
|
||||
String helperHostPort = helperNode.first + ":" + helperNode.second;
|
||||
try {
|
||||
bdbEnvironment.setup(dbEnv, selfNodeName, selfNodeHostPort, helperHostPort,
|
||||
Env.getServingEnv().isElectable());
|
||||
@ -370,14 +358,14 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
|
||||
// the files
|
||||
// ATTN: here we use `getServingEnv()`, because only serving catalog has
|
||||
// helper nodes.
|
||||
HostInfo helperNode = Env.getServingEnv().getHelperNode();
|
||||
Pair<String, Integer> helperNode = Env.getServingEnv().getHelperNode();
|
||||
NetworkRestore restore = new NetworkRestore();
|
||||
NetworkRestoreConfig config = new NetworkRestoreConfig();
|
||||
config.setRetainLogFiles(false);
|
||||
restore.execute(insufficientLogEx, config);
|
||||
bdbEnvironment.close();
|
||||
bdbEnvironment.setup(new File(environmentPath), selfNodeName, selfNodeHostPort,
|
||||
helperNode.getIp() + ":" + helperNode.getPort(), Env.getServingEnv().isElectable());
|
||||
helperNode.first + ":" + helperNode.second, Env.getServingEnv().isElectable());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -185,7 +185,7 @@ public class Checkpoint extends MasterDaemon {
|
||||
if (!allFrontends.isEmpty()) {
|
||||
otherNodesCount = allFrontends.size() - 1; // skip master itself
|
||||
for (Frontend fe : allFrontends) {
|
||||
String host = fe.getIp();
|
||||
String host = fe.getHost();
|
||||
if (host.equals(Env.getServingEnv().getMasterIp())) {
|
||||
// skip master itself
|
||||
continue;
|
||||
@ -227,7 +227,7 @@ public class Checkpoint extends MasterDaemon {
|
||||
long deleteVersion = storage.getLatestValidatedImageSeq();
|
||||
if (successPushed > 0) {
|
||||
for (Frontend fe : allFrontends) {
|
||||
String host = fe.getIp();
|
||||
String host = fe.getHost();
|
||||
if (host.equals(Env.getServingEnv().getMasterIp())) {
|
||||
// skip master itself
|
||||
continue;
|
||||
|
||||
@ -402,11 +402,6 @@ public class EditLog {
|
||||
env.replayAddFrontend(fe);
|
||||
break;
|
||||
}
|
||||
case OperationType.OP_MODIFY_FRONTEND: {
|
||||
Frontend fe = (Frontend) journal.getData();
|
||||
env.replayModifyFrontend(fe);
|
||||
break;
|
||||
}
|
||||
case OperationType.OP_REMOVE_FRONTEND: {
|
||||
Frontend fe = (Frontend) journal.getData();
|
||||
env.replayDropFrontend(fe);
|
||||
@ -1241,10 +1236,6 @@ public class EditLog {
|
||||
logEdit(OperationType.OP_ADD_FIRST_FRONTEND, fe);
|
||||
}
|
||||
|
||||
public void logModifyFrontend(Frontend fe) {
|
||||
logEdit(OperationType.OP_MODIFY_FRONTEND, fe);
|
||||
}
|
||||
|
||||
public void logRemoveFrontend(Frontend fe) {
|
||||
logEdit(OperationType.OP_REMOVE_FRONTEND, fe);
|
||||
}
|
||||
|
||||
@ -150,8 +150,6 @@ public class OperationType {
|
||||
public static final short OP_DROP_REPOSITORY = 90;
|
||||
public static final short OP_MODIFY_BACKEND = 91;
|
||||
|
||||
public static final short OP_MODIFY_FRONTEND = 92;
|
||||
|
||||
//colocate table
|
||||
public static final short OP_COLOCATE_ADD_TABLE = 94;
|
||||
public static final short OP_COLOCATE_REMOVE_TABLE = 95;
|
||||
|
||||
@ -491,7 +491,7 @@ public class StmtExecutor implements ProfileWriter {
|
||||
// If goes here, which means we can't find a valid Master FE(some error happens).
|
||||
// To avoid endless forward, throw exception here.
|
||||
throw new UserException("The statement has been forwarded to master FE("
|
||||
+ Env.getCurrentEnv().getSelfNode().getIp() + ") and failed to execute"
|
||||
+ Env.getCurrentEnv().getSelfNode().first + ") and failed to execute"
|
||||
+ " because Master FE is not ready. You may need to check FE's status");
|
||||
}
|
||||
forwardToMaster();
|
||||
|
||||
@ -780,7 +780,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
|
||||
public TMasterOpResult forward(TMasterOpRequest params) throws TException {
|
||||
TNetworkAddress clientAddr = getClientAddr();
|
||||
if (clientAddr != null) {
|
||||
Frontend fe = Env.getCurrentEnv().getFeByIp(clientAddr.getHostname());
|
||||
Frontend fe = Env.getCurrentEnv().getFeByHost(clientAddr.getHostname());
|
||||
if (fe == null) {
|
||||
LOG.warn("reject request from invalid host. client: {}", clientAddr);
|
||||
throw new TException("request from invalid host was rejected.");
|
||||
|
||||
@ -19,7 +19,6 @@ package org.apache.doris.system;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.ClientPool;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.util.MasterDaemon;
|
||||
import org.apache.doris.thrift.TNetworkAddress;
|
||||
@ -41,36 +40,10 @@ public class FQDNManager extends MasterDaemon {
|
||||
}
|
||||
|
||||
/**
|
||||
* At each round: check if ip of be or fe has already been changed
|
||||
* At each round: check if ip of be has already been changed
|
||||
*/
|
||||
@Override
|
||||
protected void runAfterCatalogReady() {
|
||||
updateBeIp();
|
||||
updateFeIp();
|
||||
}
|
||||
|
||||
private void updateFeIp() {
|
||||
for (Frontend fe : Env.getCurrentEnv().getFrontends(null /* all */)) {
|
||||
if (fe.getHostName() != null) {
|
||||
try {
|
||||
InetAddress inetAddress = InetAddress.getByName(fe.getHostName());
|
||||
if (!fe.getIp().equalsIgnoreCase(inetAddress.getHostAddress())) {
|
||||
String oldIp = fe.getIp();
|
||||
String newIp = inetAddress.getHostAddress();
|
||||
Env.getCurrentEnv().modifyFrontendIp(fe.getNodeName(), newIp);
|
||||
LOG.info("ip for {} of fe has been changed from {} to {}",
|
||||
fe.getHostName(), oldIp, fe.getIp());
|
||||
}
|
||||
} catch (UnknownHostException e) {
|
||||
LOG.warn("unknown host name for fe, {}", fe.getHostName(), e);
|
||||
} catch (DdlException e) {
|
||||
LOG.warn("fail to update ip for fe, {}", fe.getHostName(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void updateBeIp() {
|
||||
for (Backend be : nodeMgr.getIdToBackend().values()) {
|
||||
if (be.getHostName() != null) {
|
||||
try {
|
||||
|
||||
@ -19,32 +19,20 @@ package org.apache.doris.system;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.FeMetaVersion;
|
||||
import org.apache.doris.common.io.Text;
|
||||
import org.apache.doris.common.io.Writable;
|
||||
import org.apache.doris.ha.BDBHA;
|
||||
import org.apache.doris.ha.FrontendNodeType;
|
||||
import org.apache.doris.persist.gson.GsonUtils;
|
||||
import org.apache.doris.system.HeartbeatResponse.HbStatus;
|
||||
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
|
||||
public class Frontend implements Writable {
|
||||
@SerializedName("role")
|
||||
private FrontendNodeType role;
|
||||
// nodeName = ip:port_timestamp
|
||||
@SerializedName("nodeName")
|
||||
private String nodeName;
|
||||
@SerializedName("ip")
|
||||
private volatile String ip;
|
||||
// used for getIpByHostname
|
||||
@SerializedName("hostName")
|
||||
private String hostName;
|
||||
@SerializedName("editLogPort")
|
||||
private String host;
|
||||
private int editLogPort;
|
||||
private String version;
|
||||
|
||||
@ -59,15 +47,10 @@ public class Frontend implements Writable {
|
||||
|
||||
public Frontend() {}
|
||||
|
||||
public Frontend(FrontendNodeType role, String nodeName, String ip, int editLogPort) {
|
||||
this(role, nodeName, ip, null, editLogPort);
|
||||
}
|
||||
|
||||
public Frontend(FrontendNodeType role, String nodeName, String ip, String hostName, int editLogPort) {
|
||||
public Frontend(FrontendNodeType role, String nodeName, String host, int editLogPort) {
|
||||
this.role = role;
|
||||
this.nodeName = nodeName;
|
||||
this.ip = ip;
|
||||
this.hostName = hostName;
|
||||
this.host = host;
|
||||
this.editLogPort = editLogPort;
|
||||
}
|
||||
|
||||
@ -75,22 +58,14 @@ public class Frontend implements Writable {
|
||||
return this.role;
|
||||
}
|
||||
|
||||
public String getIp() {
|
||||
return this.ip;
|
||||
public String getHost() {
|
||||
return this.host;
|
||||
}
|
||||
|
||||
public String getVersion() {
|
||||
return version;
|
||||
}
|
||||
|
||||
public String getHostName() {
|
||||
return hostName;
|
||||
}
|
||||
|
||||
public void setHostName(String hostName) {
|
||||
this.hostName = hostName;
|
||||
}
|
||||
|
||||
public String getNodeName() {
|
||||
return nodeName;
|
||||
}
|
||||
@ -156,8 +131,10 @@ public class Frontend implements Writable {
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
String json = GsonUtils.GSON.toJson(this);
|
||||
Text.writeString(out, json);
|
||||
Text.writeString(out, role.name());
|
||||
Text.writeString(out, host);
|
||||
out.writeInt(editLogPort);
|
||||
Text.writeString(out, nodeName);
|
||||
}
|
||||
|
||||
public void readFields(DataInput in) throws IOException {
|
||||
@ -167,31 +144,22 @@ public class Frontend implements Writable {
|
||||
// we changed REPLICA to FOLLOWER
|
||||
role = FrontendNodeType.FOLLOWER;
|
||||
}
|
||||
ip = Text.readString(in);
|
||||
host = Text.readString(in);
|
||||
editLogPort = in.readInt();
|
||||
nodeName = Text.readString(in);
|
||||
}
|
||||
|
||||
public static Frontend read(DataInput in) throws IOException {
|
||||
if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_118) {
|
||||
Frontend frontend = new Frontend();
|
||||
frontend.readFields(in);
|
||||
return frontend;
|
||||
}
|
||||
String json = Text.readString(in);
|
||||
return GsonUtils.GSON.fromJson(json, Frontend.class);
|
||||
Frontend frontend = new Frontend();
|
||||
frontend.readFields(in);
|
||||
return frontend;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("name: ").append(nodeName).append(", role: ").append(role.name());
|
||||
sb.append(", hostname: ").append(hostName);
|
||||
sb.append(", ").append(ip).append(":").append(editLogPort);
|
||||
sb.append(", ").append(host).append(":").append(editLogPort);
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
public void setIp(String ip) {
|
||||
this.ip = ip;
|
||||
}
|
||||
}
|
||||
|
||||
@ -29,7 +29,6 @@ import org.apache.doris.persist.HbPackage;
|
||||
import org.apache.doris.resource.Tag;
|
||||
import org.apache.doris.service.FrontendOptions;
|
||||
import org.apache.doris.system.HeartbeatResponse.HbStatus;
|
||||
import org.apache.doris.system.SystemInfoService.HostInfo;
|
||||
import org.apache.doris.thrift.FrontendService;
|
||||
import org.apache.doris.thrift.HeartbeatService;
|
||||
import org.apache.doris.thrift.TBackendInfo;
|
||||
@ -294,9 +293,7 @@ public class HeartbeatMgr extends MasterDaemon {
|
||||
|
||||
@Override
|
||||
public HeartbeatResponse call() {
|
||||
HostInfo selfNode = Env.getCurrentEnv().getSelfNode();
|
||||
if (fe.getIp().equals(selfNode.getIp())
|
||||
|| (fe.getHostName() != null && fe.getHostName().equals(selfNode.getHostName()))) {
|
||||
if (fe.getHost().equals(Env.getCurrentEnv().getSelfNode().first)) {
|
||||
// heartbeat to self
|
||||
if (Env.getCurrentEnv().isReady()) {
|
||||
return new FrontendHbResponse(fe.getNodeName(), Config.query_port, Config.rpc_port,
|
||||
@ -312,7 +309,7 @@ public class HeartbeatMgr extends MasterDaemon {
|
||||
|
||||
private HeartbeatResponse getHeartbeatResponse() {
|
||||
FrontendService.Client client = null;
|
||||
TNetworkAddress addr = new TNetworkAddress(fe.getIp(), Config.rpc_port);
|
||||
TNetworkAddress addr = new TNetworkAddress(fe.getHost(), Config.rpc_port);
|
||||
boolean ok = false;
|
||||
try {
|
||||
client = ClientPool.frontendHeartbeatPool.borrowObject(addr);
|
||||
|
||||
@ -61,7 +61,6 @@ import java.util.Comparator;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.stream.Collectors;
|
||||
@ -128,33 +127,6 @@ public class SystemInfoService {
|
||||
return res;
|
||||
}
|
||||
|
||||
public boolean isSame(HostInfo other) {
|
||||
if (other.getPort() != port) {
|
||||
return false;
|
||||
}
|
||||
if (hostName != null && hostName.equals(other.getHostName())) {
|
||||
return true;
|
||||
}
|
||||
if (ip != null && ip.equals(other.getIp())) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
HostInfo that = (HostInfo) o;
|
||||
return Objects.equals(ip, that.getIp())
|
||||
&& Objects.equals(hostName, that.getHostName())
|
||||
&& Objects.equals(port, that.getPort());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "HostInfo{"
|
||||
@ -196,8 +168,8 @@ public class SystemInfoService {
|
||||
if (!Config.enable_fqdn_mode) {
|
||||
hostInfo.setHostName(null);
|
||||
}
|
||||
if (Config.enable_fqdn_mode && StringUtils.isEmpty(hostInfo.getHostName())) {
|
||||
throw new DdlException("backend's hostName should not be empty while enable_fqdn_mode is true");
|
||||
if (Config.enable_fqdn_mode && hostInfo.getHostName() == null) {
|
||||
throw new DdlException("backend's hostName should not be null while enable_fqdn_mode is true");
|
||||
}
|
||||
// check is already exist
|
||||
if (getBackendWithHeartbeatPort(hostInfo.getIp(), hostInfo.getHostName(), hostInfo.getPort()) != null) {
|
||||
@ -383,20 +355,20 @@ public class SystemInfoService {
|
||||
return null;
|
||||
}
|
||||
|
||||
public Backend getBackendWithBePort(String ip, int bePort) {
|
||||
public Backend getBackendWithBePort(String host, int bePort) {
|
||||
ImmutableMap<Long, Backend> idToBackend = idToBackendRef;
|
||||
for (Backend backend : idToBackend.values()) {
|
||||
if (backend.getIp().equals(ip) && backend.getBePort() == bePort) {
|
||||
if (backend.getIp().equals(host) && backend.getBePort() == bePort) {
|
||||
return backend;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public Backend getBackendWithHttpPort(String ip, int httpPort) {
|
||||
public Backend getBackendWithHttpPort(String host, int httpPort) {
|
||||
ImmutableMap<Long, Backend> idToBackend = idToBackendRef;
|
||||
for (Backend backend : idToBackend.values()) {
|
||||
if (backend.getIp().equals(ip) && backend.getHttpPort() == httpPort) {
|
||||
if (backend.getIp().equals(host) && backend.getHttpPort() == httpPort) {
|
||||
return backend;
|
||||
}
|
||||
}
|
||||
|
||||
@ -19,8 +19,6 @@ package org.apache.doris.system;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.ha.BDBHA;
|
||||
import org.apache.doris.ha.FrontendNodeType;
|
||||
|
||||
import mockit.Expectations;
|
||||
import mockit.Mock;
|
||||
@ -32,8 +30,6 @@ import org.junit.Test;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
public class FQDNManagerTest {
|
||||
@Mocked
|
||||
@ -46,11 +42,6 @@ public class FQDNManagerTest {
|
||||
|
||||
private SystemInfoService systemInfoService;
|
||||
|
||||
List<Frontend> frontends = new ArrayList<>();
|
||||
|
||||
@Mocked
|
||||
private BDBHA bdbha;
|
||||
|
||||
@Before
|
||||
public void setUp() throws UnknownHostException {
|
||||
new MockUp<InetAddress>(InetAddress.class) {
|
||||
@ -65,41 +56,8 @@ public class FQDNManagerTest {
|
||||
public Env getServingEnv() {
|
||||
return env;
|
||||
}
|
||||
|
||||
@Mock
|
||||
public Env getCurrentEnv() {
|
||||
return env;
|
||||
}
|
||||
|
||||
@Mock
|
||||
public boolean tryLock() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Mock
|
||||
public void modifyFrontendIp(String nodeName, String ip) {
|
||||
for (Frontend fe : frontends) {
|
||||
if (fe.getNodeName().equals(nodeName)) {
|
||||
fe.setIp(ip);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
new MockUp<BDBHA>(BDBHA.class) {
|
||||
@Mock
|
||||
public boolean updateNodeAddress(String nodeName, String ip, int port) {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
Config.enable_fqdn_mode = true;
|
||||
systemInfoService = new SystemInfoService();
|
||||
systemInfoService.addBackend(new Backend(1, "193.88.67.98", "doris.test.domain", 9090));
|
||||
frontends.add(new Frontend(FrontendNodeType.FOLLOWER, "doris.test.domain_9010_1675168383846",
|
||||
"193.88.67.90", "doris.test.domain", 9010));
|
||||
fdqnManager = new FQDNManager(systemInfoService);
|
||||
|
||||
new Expectations() {
|
||||
{
|
||||
env.isReady();
|
||||
@ -109,20 +67,13 @@ public class FQDNManagerTest {
|
||||
inetAddress.getHostAddress();
|
||||
minTimes = 0;
|
||||
result = "193.88.67.99";
|
||||
|
||||
env.getFrontends(null);
|
||||
minTimes = 0;
|
||||
result = frontends;
|
||||
|
||||
env.getFeByName("doris.test.domain_9010_1675168383846");
|
||||
minTimes = 0;
|
||||
result = frontends.get(0);
|
||||
|
||||
env.getHaProtocol();
|
||||
minTimes = 0;
|
||||
result = bdbha;
|
||||
}
|
||||
};
|
||||
|
||||
Config.enable_fqdn_mode = true;
|
||||
systemInfoService = new SystemInfoService();
|
||||
systemInfoService.addBackend(new Backend(1, "193.88.67.98", "doris.test.domain", 9090));
|
||||
fdqnManager = new FQDNManager(systemInfoService);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -133,14 +84,4 @@ public class FQDNManagerTest {
|
||||
Assert.assertEquals("193.88.67.99", systemInfoService.getBackend(1).getIp());
|
||||
fdqnManager.exit();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFrontendChanged() throws InterruptedException {
|
||||
// frontend ip changed
|
||||
Assert.assertEquals("193.88.67.90", env.getFrontends(null).get(0).getIp());
|
||||
fdqnManager.start();
|
||||
Thread.sleep(1000);
|
||||
Assert.assertEquals("193.88.67.99", env.getFrontends(null).get(0).getIp());
|
||||
fdqnManager.exit();
|
||||
}
|
||||
}
|
||||
|
||||
@ -20,11 +20,11 @@ package org.apache.doris.system;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.FsBroker;
|
||||
import org.apache.doris.common.GenericPool;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.ha.FrontendNodeType;
|
||||
import org.apache.doris.system.HeartbeatMgr.BrokerHeartbeatHandler;
|
||||
import org.apache.doris.system.HeartbeatMgr.FrontendHeartbeatHandler;
|
||||
import org.apache.doris.system.HeartbeatResponse.HbStatus;
|
||||
import org.apache.doris.system.SystemInfoService.HostInfo;
|
||||
import org.apache.doris.thrift.FrontendService;
|
||||
import org.apache.doris.thrift.TBrokerOperationStatus;
|
||||
import org.apache.doris.thrift.TBrokerOperationStatusCode;
|
||||
@ -55,7 +55,7 @@ public class HeartbeatMgrTest {
|
||||
{
|
||||
env.getSelfNode();
|
||||
minTimes = 0;
|
||||
result = new HostInfo("192.168.1.3", null, 9010); // not self
|
||||
result = Pair.of("192.168.1.3", 9010); // not self
|
||||
|
||||
env.isReady();
|
||||
minTimes = 0;
|
||||
|
||||
Reference in New Issue
Block a user