增加对主机term合法性的校验

This commit is contained in:
xue_meng_en 2022-11-19 11:20:05 +08:00
parent fd9b981119
commit 6729cd9ab3
3 changed files with 95 additions and 20 deletions

View File

@ -38,7 +38,7 @@ def getLocalhostName():
def executeCmdOnHost(host, cmd, isLocal=False):
    """
    Run a shell command on this machine or, through ssh, on another host.

    host: name of the target host; only used when isLocal is False.
    cmd: shell command line to execute.
    isLocal: when True, cmd is executed directly without ssh.

    Returns the (status, output) tuple of subprocess.getstatusoutput.
    """
    if not isLocal:
        # Wrap the command in ssh exactly once. "-q" asks ssh to suppress
        # most of its own warning/diagnostic messages, so callers that parse
        # the output are less likely to see anything but the remote
        # command's output.
        cmd = 'ssh -q -o ConnectTimeout=5 %s \"%s\"' % (host, cmd)
    return subprocess.getstatusoutput(cmd)

View File

@ -33,7 +33,7 @@ class InstallImpl:
self.envFile = install.envFile
self.xmlFile = install.xmlFile
self.cmDirs = install.cmDirs
self.hostNames = install.hostNames
self.hostnames = install.hostnames
self.gaussHome = install.gaussHome
self.gaussLog = install.gaussLog
self.toolPath = install.toolPath
@ -51,7 +51,7 @@ class InstallImpl:
create path: cmdir, cmdir/cm_server, cmdir/cm_agent
"""
self.logger.log("Preparing CM path.")
for (cmdir, host) in zip(self.cmDirs, self.hostNames):
for (cmdir, host) in zip(self.cmDirs, self.hostnames):
cmd = "mkdir -p {cmdir}/cm_server {cmdir}/cm_agent".format(cmdir=cmdir)
status, output = self.executeCmdOnHost(host, cmd)
if status != 0:
@ -77,7 +77,7 @@ class InstallImpl:
# decompress cmpkg on other hosts
cmpkgName = os.path.basename(self.cmpkg)
for host in self.hostNames:
for host in self.hostnames:
if host == self.localhostName:
continue
# copy cm package to other hosts
@ -125,7 +125,7 @@ class InstallImpl:
touch {gaussHome}/bin/cluster_manual_start
fi
""".format(gaussHome=self.gaussHome)
for host in self.hostNames:
for host in self.hostnames:
status, output = self.executeCmdOnHost(host, cmd)
if status != 0:
self.logger.debug("Command: " + cmd)
@ -134,7 +134,7 @@ class InstallImpl:
def initCMServer(self):
self.logger.log("Initializing cm_server.")
for (cmdir, host) in zip(self.cmDirs, self.hostNames):
for (cmdir, host) in zip(self.cmDirs, self.hostnames):
cmd = """
cp {gaussHome}/share/config/cm_server.conf.sample {cmdir}/cm_server/cm_server.conf
sed 's#log_dir = .*#log_dir = {gaussLog}/cm/cm_server#' {cmdir}/cm_server/cm_server.conf -i
@ -147,7 +147,7 @@ class InstallImpl:
def initCMAgent(self):
self.logger.log("Initializing cm_agent.")
for (cmdir, host) in zip(self.cmDirs, self.hostNames):
for (cmdir, host) in zip(self.cmDirs, self.hostnames):
cmd = """
cp {gaussHome}/share/config/cm_agent.conf.sample {cmdir}/cm_agent/cm_agent.conf &&
sed 's#log_dir = .*#log_dir = {gaussLog}/cm/cm_agent#' {cmdir}/cm_agent/cm_agent.conf -i &&
@ -198,7 +198,7 @@ class InstallImpl:
# set crontab on other hosts
setCronCmd = "crontab %s" % cronContentTmpFile
cleanTmpFileCmd = "rm %s -f" % cronContentTmpFile
for host in self.hostNames:
for host in self.hostnames:
if host == self.localhostName:
continue
# copy cronContentTmpFile to other host
@ -246,6 +246,7 @@ class InstallImpl:
self.logger.debug("Command: " + startCmd)
errorDetail = "\nStatus: %s\nOutput: %s" % (status, output)
self.logger.logExit("Failed to start cluster." + errorDetail)
queryCmd = "source %s; cm_ctl query -Cv" % self.envFile
status, output = subprocess.getstatusoutput(queryCmd)
if status != 0:

View File

@ -49,6 +49,9 @@ class Install:
self.hostnames = []
self.localhostName = ""
self.cmpkg = ""
self.nodesInfo = dict()
self.clusterStopped = False
self.maxTerm = 0
def getLocalhostName(self):
import socket
@ -131,7 +134,7 @@ General options:
if status != 0:
errorDetail = "\nCommand: %s\nStatus: %s\nOutput: %s\n" % (
cmd, status, output)
CMLog.exitWithError("OM tool is required." + errorDetail)
self.logger.logExit("OM tool is required." + errorDetail)
def checkXMLFileSecurity(self):
"""
@ -174,6 +177,24 @@ General options:
"""
self.localhostName = getLocalhostName()
# get hostnames and port from static file
cmd = "source %s; gs_om -t view" % self.envFile
status, output = subprocess.getstatusoutput(cmd)
if status != 0:
self.logger.logExit((ErrorCode.GAUSS_514["GAUSS_51400"] % cmd) + \
"\nStatus:%d\nOutput:" + output)
nodesStaticInfo = re.split("=+", output)[1:]
for nodeInfo in nodesStaticInfo:
if nodeInfo == "":
continue
nodename = re.findall("nodeName:(.*)", nodeInfo)[0]
self.hostnames.append(nodename)
dataPath = re.findall("datanodeLocalDataPath.*:(.*)", nodeInfo)[0]
port = re.findall("datanodePort.*:(.*)", nodeInfo)[0]
self.nodesInfo[nodename] = {"dataPath": dataPath, "port": port}
# get node info from XML
hostnamesInXML = []
rootNode = self.initParserXMLFile()
elementName = 'DEVICELIST'
if not rootNode.findall('DEVICELIST'):
@ -188,27 +209,30 @@ General options:
paraName = param.attrib['name']
paraValue = param.attrib['value']
if paraName == 'name':
self.hostnames.append(paraValue)
hostnamesInXML.append(paraValue)
elif paraName == 'cmDir':
self.cmDirs.append(paraValue)
elif paraName in cmDict.keys():
cmDict[paraName] = paraValue
# check whether XML contains all nodes info
if self.hostnames != hostnamesInXML:
self.logger.logExit("XML info is not consistent with static file.")
# check params in xml
for item in cmDict:
if item == 'cmServerPortStandby':
continue
if cmDict[item] == "":
CMLog.exitWithError(ErrorCode.GAUSS_512["GAUSS_51200"] % item)
self.logger.logExit(ErrorCode.GAUSS_512["GAUSS_51200"] % item)
if cmDict['cmsNum'] != '1':
CMLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50024"] % 'cmsNum')
self.logger.logExit(ErrorCode.GAUSS_500["GAUSS_50024"] % 'cmsNum')
if cmDict['cmServerlevel'] != '1':
CMLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50024"] % 'cmServerlevel')
self.logger.logExit(ErrorCode.GAUSS_500["GAUSS_50024"] % 'cmServerlevel')
if not cmDict['cmServerPortBase'].isdigit():
CMLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50024"] % 'cmServerPortBase')
self.logger.logExit(ErrorCode.GAUSS_500["GAUSS_50024"] % 'cmServerPortBase')
if cmDict['cmServerPortStandby'] != "" and not cmDict['cmServerPortStandby'].isdigit():
CMLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50024"] % 'cmServerPortStandby')
self.logger.logExit(ErrorCode.GAUSS_500["GAUSS_50024"] % 'cmServerPortStandby')
if len(self.hostnames) != len(self.cmDirs):
CMLog.exitWithError("\"cmDir\" of all nodes must be provided.")
self.logger.logExit("\"cmDir\" of all nodes must be provided.")
def checkHostTrust(self):
checkHostsTrust(self.hostnames, self.localhostName)
@ -227,18 +251,68 @@ General options:
"grep 'CMServer State' > /dev/null" % self.envFile
status, output = subprocess.getstatusoutput(checkCMExistCmd)
if status == 0:
CMLog.exitWithError("CM exists in current cluster.")
self.logger.logExit("CM exists in current cluster.")
def checkCluster(self):
    """
    Check that the current cluster is in a state where CM may be deployed.

    The cluster status is read from "gs_om -t status --detail":
      * If the cluster is reported as unavailable and the output contains
        "Manually stopped" once per known host, self.clusterStopped is set
        to True and the method returns without further checks.
      * If the cluster is unavailable for any other reason, or is not
        reported as "Normal", the tool exits through self.logger.logExit.
      * Otherwise every host is queried for its role and term. Exactly one
        host must be primary, and its term must be non-zero and not lower
        than the term of any other host. self.maxTerm is raised to the
        largest term seen.
    """
    cmd = "source %s; gs_om -t status --detail" % self.envFile
    status, output = subprocess.getstatusoutput(cmd)
    if status != 0:
        errorDetail = "Detail:\nCommand:\n" + cmd + "\noutput:" + output
        self.logger.logExit(ErrorCode.GAUSS_516["GAUSS_51600"] + errorDetail)
    # NOTE(review): the cluster_state literals below are matched as plain
    # substrings, so their spacing must match the gs_om output exactly -
    # confirm against real output.
    if "cluster_state : Unavailable" in output:
        # It's permitted to deploy CM tool when cluster is stopped,
        # but not permitted when cluster is unavailable.
        if output.count("Manually stopped") == len(self.hostnames):
            self.clusterStopped = True
            return
        self.logger.logExit("The cluster is unavailable currently.")
    if "cluster_state : Normal" not in output:
        self.logger.logExit("Cluster is running but its status is abnormal.")

    # Check that the term of the primary is valid and the largest one.
    primaryCount = 0
    primaryTerm = 0
    sqlCmd = "select term from pg_last_xlog_replay_location();"
    for host in self.hostnames:
        isLocal = host == self.localhostName
        nodeInfo = self.nodesInfo[host]
        # grep exits with 0 only when this node reports itself as primary.
        findPrimaryCmd = "source %s; gs_ctl query -D %s | grep 'local_role.*Primary' > /dev/null" % \
            (self.envFile, nodeInfo["dataPath"])
        roleStatus, _ = executeCmdOnHost(host, findPrimaryCmd, isLocal)
        isPrimary = roleStatus == 0
        if isPrimary:
            primaryCount += 1
        getTermLsnCmd = "source %s; gsql -d postgres -p %s -tA -c '%s'" % \
            (self.envFile, nodeInfo["port"], sqlCmd)
        status, termOutput = executeCmdOnHost(host, getTermLsnCmd, isLocal)
        if status != 0:
            self.logger.logExit("Failed to get term of host %s." % host)
        # The command output is expected to be a bare integer; anything else
        # is reported as a regular failure instead of an uncaught ValueError.
        # (As with the other checks here, logExit is relied on to stop
        # execution.)
        try:
            term = int(termOutput)
        except ValueError:
            self.logger.logExit("Failed to get term of host %s.\nOutput: %s" %
                                (host, termOutput))
        if isPrimary:
            primaryTerm = term
        self.maxTerm = max(self.maxTerm, term)
    if primaryCount != 1:
        self.logger.logExit("The number of primary is invalid.")
    if primaryTerm == 0 or primaryTerm < self.maxTerm:
        self.logger.logExit("Term of primary is invalid or not maximal.\n"
                            "Hint: it seems that the cluster is newly installed, so it's "
                            "recommended to deploy CM tool while installing the cluster.")
def run(self):
    """
    Run the whole installation flow: validate the invocation and the
    environment, collect node information, check the cluster state, and
    then delegate the installation itself to InstallImpl.
    """
    self.checkExeUser()
    self.parseCommandLine()
    self.checkParam()
    self.getEnvParams()
    self.initLogger()
    # The OM/CM checks come after initLogger: the failure paths visible for
    # them report through self.logger.logExit (presumably the logger is
    # created by initLogger - confirm). Each check runs exactly once.
    self.checkOm()
    self.checkCM()
    self.getInfoListOfAllNodes()
    self.getLocalhostName()
    self.checkHostTrust()
    self.checkCluster()
    installImpl = InstallImpl(self)
    installImpl.run()