修复在线扩容bug并优化代码

This commit is contained in:
xue_meng_en
2021-03-11 20:43:37 +08:00
parent 80df4ce336
commit e594191677
3 changed files with 125 additions and 66 deletions

View File

@ -21,6 +21,7 @@
import os import os
import sys import sys
import pwd
import subprocess import subprocess
import socket import socket
@ -222,7 +223,7 @@ General options:
backIpList = self.clusterInfo.getClusterBackIps() backIpList = self.clusterInfo.getClusterBackIps()
for nodeIp in self.newHostList: for nodeIp in self.newHostList:
if nodeIp not in backIpList: if nodeIp not in backIpList:
GaussLog.exitWithError(ErrorCode.GAUSS_357["GAUSS_35702"] % \ GaussLog.exitWithError(ErrorCode.GAUSS_357["GAUSS_35702"] %
nodeIp) nodeIp)
def _getBackIpNameMap(self): def _getBackIpNameMap(self):
@ -231,11 +232,17 @@ General options:
self.backIpNameMap[backip] = \ self.backIpNameMap[backip] = \
self.clusterInfo.getNodeNameByBackIp(backip) self.clusterInfo.getNodeNameByBackIp(backip)
def checkExecutingUserAndHost(self): def checkExecutingUser(self):
# check whether current user executing this command is root """
check whether current user executing this command is root
"""
if os.getuid() != 0: if os.getuid() != 0:
GaussLog.exitWithError(ErrorCode.GAUSS_501["GAUSS_50104"]) GaussLog.exitWithError(ErrorCode.GAUSS_501["GAUSS_50104"])
# check whether current host is primary host
def checkExecutingHost(self):
"""
check whether current host is primary host
"""
currentHost = socket.gethostname() currentHost = socket.gethostname()
primaryHost = "" primaryHost = ""
for nodeName in self.nodeNameList: for nodeName in self.nodeNameList:
@ -244,8 +251,8 @@ General options:
primaryHost = nodeName primaryHost = nodeName
break break
if currentHost != primaryHost: if currentHost != primaryHost:
GaussLog.exitWithError(ErrorCode.GAUSS_501["GAUSS_50110"] % \ GaussLog.exitWithError(ErrorCode.GAUSS_501["GAUSS_50110"] %
(currentHost + ", which is not primary.")) (currentHost + ", which is not primary"))
def checkTrust(self, hostList = None): def checkTrust(self, hostList = None):
""" """
@ -255,18 +262,15 @@ General options:
hostList = self.nodeNameList hostList = self.nodeNameList
rootSSHExceptionHosts = [] rootSSHExceptionHosts = []
individualSSHExceptionHosts = [] individualSSHExceptionHosts = []
sshTool = SshTool(hostList, timeout = 0)
retmap, output = sshTool.getSshStatusOutput("pwd")
for host in hostList: for host in hostList:
# check root's trust # check root's trust
if retmap[host] != DefaultValue.SUCCESS: checkRootTrustCmd = "ssh %s -o ConnectTimeout=10 \"pwd\"" % host
(status, output) = subprocess.getstatusoutput(checkRootTrustCmd)
if status != 0:
rootSSHExceptionHosts.append(host) rootSSHExceptionHosts.append(host)
try:
sshTool.clenSshResultFiles()
except Exception as e:
self.logger.debug(str(e))
# check individual user's trust # check individual user's trust
checkUserTrustCmd = "su - %s -c 'ssh %s \"pwd\"'" % (self.user, host) checkUserTrustCmd = "su - %s -c 'ssh %s -o " \
"ConnectTimeout=10 \"pwd\"'" % (self.user, host)
(status, output) = subprocess.getstatusoutput(checkUserTrustCmd) (status, output) = subprocess.getstatusoutput(checkUserTrustCmd)
if status != 0: if status != 0:
individualSSHExceptionHosts.append(host) individualSSHExceptionHosts.append(host)
@ -284,6 +288,20 @@ General options:
GaussLog.exitWithError(ErrorCode.GAUSS_511["GAUSS_51100"] % GaussLog.exitWithError(ErrorCode.GAUSS_511["GAUSS_51100"] %
sshExceptionInfo) sshExceptionInfo)
def checkEnvfile(self):
    """
    Make sure the cluster environment variables are visible to this process.

    If GPHOME is already set, nothing is done. Otherwise the .bashrc in the
    home directory of self.user is sourced in a child bash, and the
    environment that results is merged into os.environ. If GPHOME is still
    missing after that, the tool exits with GAUSS_51802.

    Raises KeyError (from pwd.getpwnam) if self.user is not a known account.

    NOTE(review): assumes DefaultValue.getEnv reads the environment of the
    current process (os.environ) -- confirm against DefaultValue.
    """
    if DefaultValue.getEnv("GPHOME"):
        return

    userpath = pwd.getpwnam(self.user).pw_dir
    envFile = os.path.join(userpath, ".bashrc")

    # os.system("source <file>") cannot work here: it runs in a throw-away
    # child shell (and "source" is a bashism that plain sh may not have), so
    # nothing it exports ever reaches this Python process. Instead, source
    # the file in bash, dump the resulting environment NUL-separated, and
    # copy it into os.environ ourselves.
    # The file name is passed as "$1" so it is never parsed as shell code.
    dumpEnvCmd = 'source "$1" >/dev/null 2>&1; env -0'
    try:
        rawEnv = subprocess.check_output(
            ["bash", "-c", dumpEnvCmd, "bash", envFile],
            stdin=subprocess.DEVNULL)
    except (OSError, subprocess.CalledProcessError):
        # bash missing or failed: fall through to the GPHOME check below,
        # which reports the problem to the user.
        rawEnv = b""

    # Bookkeeping variables of the child shell; importing them would only
    # describe the helper process, not the user's environment.
    shellOnlyVars = ("_", "SHLVL", "PWD", "OLDPWD")
    for entry in rawEnv.split(b"\0"):
        name, sep, value = entry.partition(b"=")
        if not sep or not name:
            continue
        name = os.fsdecode(name)
        if name in shellOnlyVars:
            continue
        os.environ[name] = os.fsdecode(value)

    if not DefaultValue.getEnv("GPHOME"):
        GaussLog.exitWithError(ErrorCode.GAUSS_518["GAUSS_51802"] % "file")
def _getHostAzNameMap(self): def _getHostAzNameMap(self):
""" """
get azName of all hosts get azName of all hosts
@ -326,13 +344,14 @@ if __name__ == "__main__":
""" """
""" """
expansion = Expansion() expansion = Expansion()
expansion.checkExecutingUser()
expansion.parseCommandLine() expansion.parseCommandLine()
expansion.checkParameters() expansion.checkParameters()
expansion.initLogs() expansion.initLogs()
expansion.checkEnvfile()
expansion.getExpansionInfo() expansion.getExpansionInfo()
expansion.checkXmlIncludeNewHost() expansion.checkXmlIncludeNewHost()
expansion.checkExecutingUserAndHost() expansion.checkExecutingHost()
expansion.checkTrust() expansion.checkTrust()
expImpl = ExpansionImpl(expansion) expImpl = ExpansionImpl(expansion)
expImpl.run() expImpl.run()

View File

@ -1113,7 +1113,11 @@ class ErrorCode():
"GAUSS_35704": "[GAUSS-35704] %s [%s] does not exist on node [%s].", "GAUSS_35704": "[GAUSS-35704] %s [%s] does not exist on node [%s].",
"GAUSS_35705": "[GAUSS-35705] Error, the database version is " "GAUSS_35705": "[GAUSS-35705] Error, the database version is "
"inconsistent in %s: %s", "inconsistent in %s: %s",
"GAUSS_35706": "[GAUSS-35706] All new hosts %s failed." "GAUSS_35706": "[GAUSS-35706] Fail to %s on all new hosts.",
"GAUSS_35707": "[GAUSS-35707] Fail to check %s version on:\n%s",
"GAUSS_35708": "[GAUSS-35708] Inconsistent %s version with primary on \n%s",
"GAUSS_35709": "[GAUSS-35709] The %s of %s is not %s.",
"GAUSS_35710": "[GAUSS-35710] Generate static file [%s] not found."
} }
########################################################################## ##########################################################################

View File

@ -433,7 +433,7 @@ class ExpansionImpl():
[primaryHost], self.envFile) [primaryHost], self.envFile)
self.logger.debug(outputCollect) self.logger.debug(outputCollect)
if resultMap[primaryHost] != DefaultValue.SUCCESS: if resultMap[primaryHost] != DefaultValue.SUCCESS:
GaussLog.exitWithError("Unable to query current cluster state.") GaussLog.exitWithError(ErrorCode.GAUSS_516["GAUSS_51600"])
instances = re.split('(?:\|)|(?:\n)', outputCollect) instances = re.split('(?:\|)|(?:\n)', outputCollect)
self.existingHosts = [] self.existingHosts = []
pattern = re.compile('(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}).*') pattern = re.compile('(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}).*')
@ -495,7 +495,10 @@ gs_guc set -D {dn} -c "available_zone='{azName}'"
add authentication rules about other all hosts ip in new hosts add authentication rules about other all hosts ip in new hosts
""" """
self.logger.debug("Start to set host trust on all node.") self.logger.debug("Start to set host trust on all node.")
allHosts = self.existingHosts + self.context.newHostList allHosts = list(self.existingHosts)
for host in self.context.newHostList:
if self.expansionSuccess[host]:
allHosts.append(host)
for hostExec in allHosts: for hostExec in allHosts:
hostExecName = self.context.backIpNameMap[hostExec] hostExecName = self.context.backIpNameMap[hostExec]
dataNode = self.context.clusterInfoDict[hostExecName]["dataNode"] dataNode = self.context.clusterInfoDict[hostExecName]["dataNode"]
@ -511,8 +514,7 @@ gs_guc set -D {dn} -c "available_zone='{azName}'"
hostParam hostParam
self.logger.debug("[%s] trustCmd:%s" % (hostExec, cmd)) self.logger.debug("[%s] trustCmd:%s" % (hostExec, cmd))
sshTool = SshTool([hostExec]) sshTool = SshTool([hostExec])
resultMap, outputCollect = sshTool.getSshStatusOutput(cmd, sshTool.getSshStatusOutput(cmd, [hostExec], self.envFile)
[hostExec], self.envFile)
self.cleanSshToolFile(sshTool) self.cleanSshToolFile(sshTool)
self.logger.debug("End to set host trust on all node.") self.logger.debug("End to set host trust on all node.")
@ -580,11 +582,14 @@ gs_guc set -D {dn} -c "available_zone='{azName}'"
primaryHost, primaryDataNode, self.envFile) primaryHost, primaryDataNode, self.envFile)
primaryExceptionInfo = "" primaryExceptionInfo = ""
if insType != ROLE_PRIMARY: if insType != ROLE_PRIMARY:
primaryExceptionInfo = "The server mode of primary host" \ primaryExceptionInfo = ErrorCode.GAUSS_357["GAUSS_35709"] % \
"is not primary." ("local_role", "primary", "primary")
if dbStat != STAT_NORMAL: if dbStat != STAT_NORMAL:
primaryExceptionInfo = "The primary is not in Normal state." primaryExceptionInfo = ErrorCode.GAUSS_357["GAUSS_35709"] % \
("db_state", "primary", "Normal")
if primaryExceptionInfo != "": if primaryExceptionInfo != "":
for host in standbyHosts:
self.expansionSuccess[host] = False
self.rollback() self.rollback()
GaussLog.exitWithError(primaryExceptionInfo) GaussLog.exitWithError(primaryExceptionInfo)
@ -737,7 +742,7 @@ gs_guc set -D {dn} -c "available_zone='{azName}'"
self.context.clusterInfo.saveToStaticConfig(staticConfigPath, dbNode.id) self.context.clusterInfo.saveToStaticConfig(staticConfigPath, dbNode.id)
srcFile = staticConfigPath srcFile = staticConfigPath
if not os.path.exists(srcFile): if not os.path.exists(srcFile):
GaussLog.exitWithError("Generate static file [%s] not found." % srcFile) GaussLog.exitWithError(ErrorCode.GAUSS_357["GAUSS_35710"] % srcFile)
hostSsh = SshTool([hostName]) hostSsh = SshTool([hostName])
targetFile = "%s/bin/cluster_static_config" % appPath targetFile = "%s/bin/cluster_static_config" % appPath
hostSsh.scpFiles(srcFile, targetFile, [hostName], self.envFile) hostSsh.scpFiles(srcFile, targetFile, [hostName], self.envFile)
@ -802,50 +807,85 @@ remoteservice={remoteservice}'"
gucDict[hostName] = guc_tempate_str gucDict[hostName] = guc_tempate_str
return gucDict return gucDict
def checkLocalModeOnStandbyHosts(self): def checkGaussdbAndGsomVersionOfStandby(self):
""" """
expansion the installed standby node. check standby database. check whether gaussdb and gs_om version of standby are same with priamry
1. if the database is installed correctly
2. if the databases version are same before existing and new
""" """
standbyHosts = self.context.newHostList standbyHosts = self.context.newHostList
envFile = self.envFile envFile = self.envFile
if self.context.standbyLocalMode:
for host in standbyHosts: for host in standbyHosts:
self.expansionSuccess[host] = True self.expansionSuccess[host] = True
self.logger.log("Checking if the database is installed correctly with local mode.") self.logger.log("Checking gaussdb and gs_om version.")
getversioncmd = "source %s;gaussdb --version" % envFile getGaussdbVersionCmd = "source %s;gaussdb --version" % envFile
getGsomVersionCmd = "source %s;gs_om --version" % envFile
gaussdbVersionPattern = re.compile("gaussdb \((.*)\) .*")
gsomVersionPattern = re.compile("gs_om \(.*\) .*")
primaryHostName = self.getPrimaryHostName() primaryHostName = self.getPrimaryHostName()
sshPrimary = SshTool([primaryHostName]) sshPrimary = SshTool([primaryHostName])
resultMap, outputCollect = sshPrimary.getSshStatusOutput( resultMap, outputCollect = sshPrimary.getSshStatusOutput(
getversioncmd, [], envFile) getGaussdbVersionCmd, [], envFile)
if resultMap[primaryHostName] != DefaultValue.SUCCESS: if resultMap[primaryHostName] != DefaultValue.SUCCESS:
GaussLog.exitWithError("Fail to check the version of primary.") GaussLog.exitWithError(ErrorCode.GAUSS_357["GAUSS_35707"] %
ipPattern = re.compile("\[.*\] (\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}):") ("gaussdb", "primary"))
versionPattern = re.compile("gaussdb \((.*)\) .*") primaryGaussdbVersion = gaussdbVersionPattern.findall(outputCollect)[0]
primaryVersion = versionPattern.findall(outputCollect)[0] resultMap, outputCollect = sshPrimary.getSshStatusOutput(
notInstalledHosts = [] getGsomVersionCmd, [], envFile)
wrongVersionHosts = [] if resultMap[primaryHostName] != DefaultValue.SUCCESS:
GaussLog.exitWithError(ErrorCode.GAUSS_357["GAUSS_35707"] %
("gs_om", "primary"))
primaryGsomVersion = gsomVersionPattern.findall(outputCollect)[0]
self.cleanSshToolFile(sshPrimary)
failCheckGaussdbVersionHosts = []
failCheckGsomVersionHosts = []
wrongGaussdbVersionHosts = []
wrongGsomVersionHosts = []
for host in standbyHosts: for host in standbyHosts:
hostName = self.context.backIpNameMap[host] if not self.expansionSuccess[host]:
dataNode = self.context.clusterInfoDict[hostName]["dataNode"] continue
sshTool = SshTool([host]) sshTool = SshTool([host])
# get gaussdb version
resultMap, outputCollect = sshTool.getSshStatusOutput( resultMap, outputCollect = sshTool.getSshStatusOutput(
getversioncmd, [], envFile) getGaussdbVersionCmd, [], envFile)
if resultMap[host] != DefaultValue.SUCCESS: if resultMap[host] != DefaultValue.SUCCESS:
self.expansionSuccess[host] = False self.expansionSuccess[host] = False
notInstalledHosts.append(host) failCheckGaussdbVersionHosts.append(host)
else: else:
version = versionPattern.findall(outputCollect)[0] gaussdbVersion = gaussdbVersionPattern.findall(outputCollect)[0]
if version != primaryVersion: if gaussdbVersion != primaryGaussdbVersion:
self.expansionSuccess[host] = False self.expansionSuccess[host] = False
wrongVersionHosts.append(host) wrongGaussdbVersionHosts.append(host)
if notInstalledHosts: self.cleanSshToolFile(sshTool)
self.logger.log("In local mode, database is not installed " continue
"correctly on these nodes:\n%s" % ", ".join(notInstalledHosts)) # get gs_om version
if wrongVersionHosts: resultMap, outputCollect = sshTool.getSshStatusOutput(
self.logger.log("In local mode, the database version is not same " getGsomVersionCmd, [], envFile)
"with primary on these nodes:\n%s" % ", ".join(wrongVersionHosts)) if resultMap[host] != DefaultValue.SUCCESS:
self.logger.log("End to check the database with locale mode.") self.expansionSuccess[host] = False
failCheckGsomVersionHosts.append(host)
else:
gsomVersion = gsomVersionPattern.findall(outputCollect)[0]
if gsomVersion != primaryGsomVersion:
self.expansionSuccess[host] = False
wrongGsomVersionHosts.append(host)
self.cleanSshToolFile(sshTool)
if failCheckGaussdbVersionHosts:
self.logger.log(ErrorCode.GAUSS_357["GAUSS_35707"] %
("gaussdb", ", ".join(failCheckGaussdbVersionHosts)))
if failCheckGsomVersionHosts:
self.logger.log(ErrorCode.GAUSS_357["GAUSS_35707"] %
("gs_om", ", ".join(failCheckGsomVersionHosts)))
if wrongGaussdbVersionHosts:
self.logger.log(ErrorCode.GAUSS_357["GAUSS_35708"] %
("gaussdb", ", ".join(wrongGaussdbVersionHosts)))
if wrongGsomVersionHosts:
self.logger.log(ErrorCode.GAUSS_357["GAUSS_35708"] %
("gs_om", ", ".join(wrongGsomVersionHosts)))
self.logger.log("End to check gaussdb and gs_om version.\n")
if self._isAllFailed():
GaussLog.exitWithError(ErrorCode.GAUSS_357["GAUSS_35706"] %
"check gaussdb and gs_om version")
def preInstall(self): def preInstall(self):
""" """
@ -899,7 +939,7 @@ remoteservice={remoteservice}'"
""" """
Check whether the cluster status is normal before expand. Check whether the cluster status is normal before expand.
""" """
self.logger.debug("Start to check cluster status.\n") self.logger.debug("Start to check cluster status.")
curHostName = socket.gethostname() curHostName = socket.gethostname()
command = "" command = ""
@ -913,9 +953,7 @@ remoteservice={remoteservice}'"
resultMap, outputCollect = sshTool.getSshStatusOutput(command, resultMap, outputCollect = sshTool.getSshStatusOutput(command,
[curHostName], self.envFile) [curHostName], self.envFile)
if outputCollect.find("Primary Normal") == -1: if outputCollect.find("Primary Normal") == -1:
GaussLog.exitWithError("Unable to query current cluster status. " + \ GaussLog.exitWithError(ErrorCode.GAUSS_516["GAUSS_51600"])
"Please import environment variables or " +\
"check whether the cluster status is normal.")
self.logger.debug("The primary database is normal.\n") self.logger.debug("The primary database is normal.\n")
@ -975,8 +1013,8 @@ remoteservice={remoteservice}'"
(fstat[stat.ST_GID] == gid and (mode & stat.S_IRGRP > 0)): (fstat[stat.ST_GID] == gid and (mode & stat.S_IRGRP > 0)):
pass pass
else: else:
self.logger.debug("User %s has no access right for file %s" \ self.logger.debug(ErrorCode.GAUSS_501["GAUSS_50100"]
% (self.user, xmlFile)) % (xmlFile, self.user))
os.chown(xmlFile, uid, gid) os.chown(xmlFile, uid, gid)
os.chmod(xmlFile, stat.S_IRUSR) os.chmod(xmlFile, stat.S_IRUSR)
@ -1060,10 +1098,8 @@ remoteservice={remoteservice}'"
if not self.context.standbyLocalMode: if not self.context.standbyLocalMode:
self.logger.log("Start to install database on new nodes.") self.logger.log("Start to install database on new nodes.")
self.installDatabaseOnHosts() self.installDatabaseOnHosts()
else:
self.checkLocalModeOnStandbyHosts()
self.logger.log("Database on standby nodes installed finished.\n") self.logger.log("Database on standby nodes installed finished.\n")
self.checkGaussdbAndGsomVersionOfStandby()
self.logger.log("Start to establish the relationship.") self.logger.log("Start to establish the relationship.")
self.buildStandbyRelation() self.buildStandbyRelation()
# process success # process success
@ -1221,9 +1257,9 @@ class GsCtlCommon:
self.logger.debug(host) self.logger.debug(host)
self.logger.debug(outputCollect) self.logger.debug(outputCollect)
if resultMap[host] == STATUS_FAIL: if resultMap[host] == STATUS_FAIL:
GaussLog.exitWithError("Query cluster failed. Please check " \ GaussLog.exitWithError(ErrorCode.GAUSS_516["GAUSS_51600"] +
"the cluster status or " \ "Please check the cluster status or source the environmental"
"source the environmental variables of user [%s]." % self.user) " variables of user [%s]." % self.user)
self.cleanSshToolTmpFile(sshTool) self.cleanSshToolTmpFile(sshTool)
return outputCollect return outputCollect