!65 修复CM安装解耦问题

Merge pull request !65 from 薛蒙恩/split_fix
This commit is contained in:
opengauss-bot 2022-11-22 01:41:06 +00:00 committed by Gitee
commit b3bd2f1687
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
4 changed files with 124 additions and 32 deletions

View File

@ -38,7 +38,7 @@ def getLocalhostName():
def executeCmdOnHost(host, cmd, isLocal = False):
    """
    Run a shell command on the given host and return its result.

    host:    name of the target host; only used when isLocal is False.
    cmd:     shell command line to execute.
    isLocal: when True the command is run directly on this machine,
             otherwise it is wrapped in a single ssh invocation.

    Returns the (status, output) tuple from subprocess.getstatusoutput.
    """
    if not isLocal:
        # Wrap the command exactly once. "-q" keeps ssh warnings and
        # diagnostics out of the captured output.
        # NOTE(review): cmd is embedded in double quotes, so "$VAR" and
        # embedded double quotes are interpreted by the local shell first -
        # confirm callers rely on that.
        cmd = 'ssh -q -o ConnectTimeout=5 %s \"%s\"' % (host, cmd)
    status, output = subprocess.getstatusoutput(cmd)
    return status, output

View File

@ -33,13 +33,14 @@ class InstallImpl:
self.envFile = install.envFile
self.xmlFile = install.xmlFile
self.cmDirs = install.cmDirs
self.hostNames = install.hostNames
self.hostnames = install.hostnames
self.gaussHome = install.gaussHome
self.gaussLog = install.gaussLog
self.toolPath = install.toolPath
self.tmpPath = install.tmpPath
self.localhostName = install.localhostName
self.logger = install.logger
self.clusterStopped = install.clusterStopped
def executeCmdOnHost(self, host, cmd, isLocal = False):
if host == self.localhostName:
@ -51,7 +52,7 @@ class InstallImpl:
create path: cmdir, cmdir/cm_server, cmdir/cm_agent
"""
self.logger.log("Preparing CM path.")
for (cmdir, host) in zip(self.cmDirs, self.hostNames):
for (cmdir, host) in zip(self.cmDirs, self.hostnames):
cmd = "mkdir -p {cmdir}/cm_server {cmdir}/cm_agent".format(cmdir=cmdir)
status, output = self.executeCmdOnHost(host, cmd)
if status != 0:
@ -77,7 +78,7 @@ class InstallImpl:
# decompress cmpkg on other hosts
cmpkgName = os.path.basename(self.cmpkg)
for host in self.hostNames:
for host in self.hostnames:
if host == self.localhostName:
continue
# copy cm package to other hosts
@ -125,7 +126,7 @@ class InstallImpl:
touch {gaussHome}/bin/cluster_manual_start
fi
""".format(gaussHome=self.gaussHome)
for host in self.hostNames:
for host in self.hostnames:
status, output = self.executeCmdOnHost(host, cmd)
if status != 0:
self.logger.debug("Command: " + cmd)
@ -134,7 +135,7 @@ class InstallImpl:
def initCMServer(self):
self.logger.log("Initializing cm_server.")
for (cmdir, host) in zip(self.cmDirs, self.hostNames):
for (cmdir, host) in zip(self.cmDirs, self.hostnames):
cmd = """
cp {gaussHome}/share/config/cm_server.conf.sample {cmdir}/cm_server/cm_server.conf
sed 's#log_dir = .*#log_dir = {gaussLog}/cm/cm_server#' {cmdir}/cm_server/cm_server.conf -i
@ -147,7 +148,7 @@ class InstallImpl:
def initCMAgent(self):
self.logger.log("Initializing cm_agent.")
for (cmdir, host) in zip(self.cmDirs, self.hostNames):
for (cmdir, host) in zip(self.cmDirs, self.hostnames):
cmd = """
cp {gaussHome}/share/config/cm_agent.conf.sample {cmdir}/cm_agent/cm_agent.conf &&
sed 's#log_dir = .*#log_dir = {gaussLog}/cm/cm_agent#' {cmdir}/cm_agent/cm_agent.conf -i &&
@ -198,7 +199,10 @@ class InstallImpl:
# set crontab on other hosts
setCronCmd = "crontab %s" % cronContentTmpFile
cleanTmpFileCmd = "rm %s -f" % cronContentTmpFile
for host in self.hostNames:
import getpass
username = getpass.getuser()
killMonitorCmd = "pkill om_monitor -u %s; " % username
for host in self.hostnames:
if host == self.localhostName:
continue
# copy cronContentTmpFile to other host
@ -218,7 +222,8 @@ class InstallImpl:
self.logger.logExit(ErrorCode.GAUSS_508["GAUSS_50801"] + errorDetail)
# start om_monitor
status, output = self.executeCmdOnHost(host, startMonitorCmd)
# Firstly, kill residual om_monitor, otherwise cm_agent won't be started if there are residual om_monitor process.
status, output = self.executeCmdOnHost(host, killMonitorCmd + startMonitorCmd)
if status != 0:
self.logger.debug("Command: " + startMonitorCmd)
errorDetail = "\nStatus: %s\nOutput: %s" % (status, output)
@ -232,7 +237,7 @@ class InstallImpl:
self.logger.logExit(ErrorCode.GAUSS_508["GAUSS_50801"] + errorDetail)
os.remove(cronContentTmpFile)
status, output = subprocess.getstatusoutput(startMonitorCmd)
status, output = subprocess.getstatusoutput(killMonitorCmd + startMonitorCmd)
if status != 0:
self.logger.debug("Command: " + startMonitorCmd)
errorDetail = "\nStatus: %s\nOutput: %s" % (status, output)
@ -246,6 +251,11 @@ class InstallImpl:
self.logger.debug("Command: " + startCmd)
errorDetail = "\nStatus: %s\nOutput: %s" % (status, output)
self.logger.logExit("Failed to start cluster." + errorDetail)
status, output = InstallImpl.refreshDynamicFile(self.envFile)
if status != 0:
self.logger.error("Failed to refresh dynamic file." + output)
queryCmd = "source %s; cm_ctl query -Cv" % self.envFile
status, output = subprocess.getstatusoutput(queryCmd)
if status != 0:
@ -274,11 +284,6 @@ class InstallImpl:
@staticmethod
def refreshDynamicFile(envFile):
# refresh dynamic file
getStatusCmd = "source %s; gs_om -t status --detail | grep 'Primary Normal' > /dev/null" % envFile
status, output = subprocess.getstatusoutput(getStatusCmd)
if status != 0:
CMLog.printMessage("Normal primary doesn't exist in the cluster, no need to refresh dynamic file.")
return 0, ""
refreshDynamicFileCmd = "source %s; gs_om -t refreshconf" % envFile
status, output = subprocess.getstatusoutput(refreshDynamicFileCmd)
errorDetail = ""
@ -286,14 +291,11 @@ class InstallImpl:
errorDetail = "\nCommand: %s\nStatus: %s\nOutput: %s" % (refreshDynamicFileCmd, status, output)
return status, errorDetail
def refreshStaticAndDynamicFile(self):
def _refreshStaticFile(self):
self.logger.log("Refreshing static and dynamic file using xml file with cm.")
status, output = InstallImpl.refreshStaticFile(self.envFile, self.xmlFile)
if status != 0:
self.logger.logExit("Failed to refresh static file." + output)
status, output = InstallImpl.refreshDynamicFile(self.envFile)
if status != 0:
self.logger.logExit("Failed to refresh dynamic file." + output)
def run(self):
self.logger.log("Start to install cm tool.")
@ -302,6 +304,6 @@ class InstallImpl:
self.createManualStartFile()
self.initCMServer()
self.initCMAgent()
self.refreshStaticAndDynamicFile()
self._refreshStaticFile()
self.setMonitorCrontab()
self.startCluster()

View File

@ -49,6 +49,9 @@ class Install:
self.hostnames = []
self.localhostName = ""
self.cmpkg = ""
self.nodesInfo = dict()
self.clusterStopped = False
self.maxTerm = 0
def getLocalhostName(self):
import socket
@ -131,7 +134,7 @@ General options:
if status != 0:
errorDetail = "\nCommand: %s\nStatus: %s\nOutput: %s\n" % (
cmd, status, output)
CMLog.exitWithError("OM tool is required." + errorDetail)
self.logger.logExit("OM tool is required." + errorDetail)
def checkXMLFileSecurity(self):
"""
@ -174,6 +177,24 @@ General options:
"""
self.localhostName = getLocalhostName()
# get hostnames and port from static file
cmd = "source %s; gs_om -t view" % self.envFile
status, output = subprocess.getstatusoutput(cmd)
if status != 0:
self.logger.logExit((ErrorCode.GAUSS_514["GAUSS_51400"] % cmd) + \
"\nStatus:%d\nOutput:" + output)
nodesStaticInfo = re.split("=+", output)[1:]
for nodeInfo in nodesStaticInfo:
if nodeInfo == "":
continue
nodename = re.findall("nodeName:(.*)", nodeInfo)[0]
self.hostnames.append(nodename)
dataPath = re.findall("datanodeLocalDataPath.*:(.*)", nodeInfo)[0]
port = re.findall("datanodePort.*:(.*)", nodeInfo)[0]
self.nodesInfo[nodename] = {"dataPath": dataPath, "port": port}
# get node info from XML
hostnamesInXML = []
rootNode = self.initParserXMLFile()
elementName = 'DEVICELIST'
if not rootNode.findall('DEVICELIST'):
@ -188,27 +209,30 @@ General options:
paraName = param.attrib['name']
paraValue = param.attrib['value']
if paraName == 'name':
self.hostnames.append(paraValue)
hostnamesInXML.append(paraValue)
elif paraName == 'cmDir':
self.cmDirs.append(paraValue)
elif paraName in cmDict.keys():
cmDict[paraName] = paraValue
# check whether XML contains all nodes info
if self.hostnames != hostnamesInXML:
self.logger.logExit("XML info is not consistent with static file.")
# check params in xml
for item in cmDict:
if item == 'cmServerPortStandby':
continue
if cmDict[item] == "":
CMLog.exitWithError(ErrorCode.GAUSS_512["GAUSS_51200"] % item)
self.logger.logExit(ErrorCode.GAUSS_512["GAUSS_51200"] % item)
if cmDict['cmsNum'] != '1':
CMLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50024"] % 'cmsNum')
self.logger.logExit(ErrorCode.GAUSS_500["GAUSS_50024"] % 'cmsNum')
if cmDict['cmServerlevel'] != '1':
CMLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50024"] % 'cmServerlevel')
self.logger.logExit(ErrorCode.GAUSS_500["GAUSS_50024"] % 'cmServerlevel')
if not cmDict['cmServerPortBase'].isdigit():
CMLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50024"] % 'cmServerPortBase')
self.logger.logExit(ErrorCode.GAUSS_500["GAUSS_50024"] % 'cmServerPortBase')
if cmDict['cmServerPortStandby'] != "" and not cmDict['cmServerPortStandby'].isdigit():
CMLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50024"] % 'cmServerPortStandby')
self.logger.logExit(ErrorCode.GAUSS_500["GAUSS_50024"] % 'cmServerPortStandby')
if len(self.hostnames) != len(self.cmDirs):
CMLog.exitWithError("\"cmDir\" of all nodes must be provided.")
self.logger.logExit("\"cmDir\" of all nodes must be provided.")
def checkHostTrust(self):
checkHostsTrust(self.hostnames, self.localhostName)
@ -227,18 +251,68 @@ General options:
"grep 'CMServer State' > /dev/null" % self.envFile
status, output = subprocess.getstatusoutput(checkCMExistCmd)
if status == 0:
CMLog.exitWithError("CM exists in current cluster.")
self.logger.logExit("CM exists in current cluster.")
def checkCluster(self):
    """
    Check that the current cluster is in a state where CM can be deployed.

    Runs "gs_om -t status --detail" and exits through self.logger.logExit
    unless the cluster is either manually stopped on every node (then
    self.clusterStopped is set to True and the term checks are skipped)
    or reports "cluster_state : Normal".

    For a running cluster every host in self.hostnames is probed to count
    the primaries and to read its term. self.maxTerm is raised to the
    largest term seen. Exactly one primary must exist, and its term must
    be non-zero and not smaller than self.maxTerm.
    """
    cmd = "source %s; gs_om -t status --detail" % self.envFile
    status, output = subprocess.getstatusoutput(cmd)
    if status != 0:
        errorDetail = "Detail:\nCommand:\n" + cmd + "\noutput:" + output
        self.logger.logExit(ErrorCode.GAUSS_516["GAUSS_51600"] + errorDetail)
    if "cluster_state : Unavailable" in output:
        # It's permitted to deploy CM tool when cluster is stopped,
        # but not permitted when cluster is unavailable.
        if output.count("Manually stopped") == len(self.hostnames):
            self.clusterStopped = True
            return
        self.logger.logExit("The cluster is unavailable currently.")
    if "cluster_state : Normal" not in output:
        self.logger.logExit("Cluster is running but its status is abnormal.")

    # Check that the term of the primary is valid and the biggest one.
    primaryCount = 0
    primaryTerm = 0
    sqlCmd = "select term from pg_last_xlog_replay_location();"
    for host in self.hostnames:
        isLocal = host == self.localhostName
        nodeInfo = self.nodesInfo[host]
        findPrimaryCmd = "source %s; gs_ctl query -D %s | grep 'local_role.*Primary' > /dev/null" % \
            (self.envFile, nodeInfo["dataPath"])
        # The pipeline exits with 0 only when grep finds a Primary role line.
        primaryStatus, _ = executeCmdOnHost(host, findPrimaryCmd, isLocal)
        isPrimary = primaryStatus == 0
        if isPrimary:
            primaryCount += 1

        getTermLsnCmd = "source %s; gsql -d postgres -p %s -tA -c '%s'" % \
            (self.envFile, nodeInfo["port"], sqlCmd)
        status, termOutput = executeCmdOnHost(host, getTermLsnCmd, isLocal)
        if status != 0:
            self.logger.logExit("Failed to get term of host %s." % host)
        # Convert once, and report non-numeric output through the logger
        # instead of letting a ValueError traceback escape.
        try:
            term = int(termOutput.strip())
        except ValueError:
            self.logger.logExit("Failed to get term of host %s.\nOutput: %s" % (host, termOutput))

        if isPrimary:
            primaryTerm = term
        self.maxTerm = max(self.maxTerm, term)

    if primaryCount != 1:
        self.logger.logExit("The number of primary is invalid.")
    if primaryTerm == 0 or primaryTerm < self.maxTerm:
        self.logger.logExit("Term of primary is invalid or not maximal.\n"
                            "Hint: it seems that the cluster is newly installed, so it's "
                            "recommended to deploy CM tool while installing the cluster.")
def run(self):
self.checkExeUser()
self.parseCommandLine()
self.checkParam()
self.checkOm()
self.checkCM()
self.getEnvParams()
self.initLogger()
self.getLocalhostName()
self.checkOm()
self.checkCM()
self.getInfoListOfAllNodes()
self.getLocalhostName()
self.checkHostTrust()
self.checkCluster()
installImpl = InstallImpl(self)
installImpl.run()

View File

@ -196,6 +196,22 @@ General options:
status, output = InstallImpl.refreshStaticFile(self.envFile, self.xmlFile)
if status != 0:
self.logger.logExit("Failed to refresh static file." + output)
# Remove dynamic file, if the cluster is stopped currently.
removeDynamicCmd = "source %s; rm -f $GAUSSHOME/bin/cluster_dynamic_config" % self.envFile
for host in self.hostnames:
isLocal = False
if host == self.localhostName:
isLocal = True
executeCmdOnHost(host, removeDynamicCmd, isLocal)
clusterStopped = False
checkClusterStoppedCmd = "source %s; ls $GAUSSHOME/bin/cluster_manual_start" % self.envFile
status, output = subprocess.getstatusoutput(checkClusterStoppedCmd)
if status == 0:
clusterStopped = True
self.logger.debug("Command: " + checkClusterStoppedCmd)
self.logger.debug("Status: %s\nOtput: %s" % (status, output))
if clusterStopped:
return
status, output = InstallImpl.refreshDynamicFile(self.envFile)
if status != 0:
self.logger.logExit("Failed to refresh dynamic file." + output)