From 6729cd9ab3d25571feaba703c17de0b914c09a20 Mon Sep 17 00:00:00 2001
From: xue_meng_en <1836611252@qq.com>
Date: Sat, 19 Nov 2022 11:20:05 +0800
Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=AF=B9=E4=B8=BB=E6=9C=BAte?=
 =?UTF-8?q?rm=E5=90=88=E6=B3=95=E6=80=A7=E7=9A=84=E6=A0=A1=E9=AA=8C?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 tool/cm_tool/Common.py      |  2 +-
 tool/cm_tool/InstallImpl.py | 15 +++---
 tool/cm_tool/cm_install     | 98 ++++++++++++++++++++++++++++++++-----
 3 files changed, 95 insertions(+), 20 deletions(-)

diff --git a/tool/cm_tool/Common.py b/tool/cm_tool/Common.py
index a4aa83f..b8ee2dd 100644
--- a/tool/cm_tool/Common.py
+++ b/tool/cm_tool/Common.py
@@ -38,7 +38,7 @@ def getLocalhostName():
 
 def executeCmdOnHost(host, cmd, isLocal = False):
     if not isLocal:
-        cmd = 'ssh -o ConnectTimeout=5 %s \"%s\"' % (host, cmd)
+        cmd = 'ssh -q -o ConnectTimeout=5 %s \"%s\"' % (host, cmd)
     status, output = subprocess.getstatusoutput(cmd)
     return status, output
 
diff --git a/tool/cm_tool/InstallImpl.py b/tool/cm_tool/InstallImpl.py
index f6c1f0c..4553d55 100644
--- a/tool/cm_tool/InstallImpl.py
+++ b/tool/cm_tool/InstallImpl.py
@@ -33,7 +33,7 @@ class InstallImpl:
         self.envFile = install.envFile
         self.xmlFile = install.xmlFile
         self.cmDirs = install.cmDirs
-        self.hostNames = install.hostNames
+        self.hostnames = install.hostnames
         self.gaussHome = install.gaussHome
         self.gaussLog = install.gaussLog
         self.toolPath = install.toolPath
@@ -51,7 +51,7 @@ class InstallImpl:
         create path: cmdir、cmdir/cm_server、cmdir/cm_agent
         """
         self.logger.log("Preparing CM path.")
-        for (cmdir, host) in zip(self.cmDirs, self.hostNames):
+        for (cmdir, host) in zip(self.cmDirs, self.hostnames):
             cmd = "mkdir -p {cmdir}/cm_server {cmdir}/cm_agent".format(cmdir=cmdir)
             status, output = self.executeCmdOnHost(host, cmd)
             if status != 0:
@@ -77,7 +77,7 @@ class InstallImpl:
 
         # decompress cmpkg on other hosts
         cmpkgName = os.path.basename(self.cmpkg)
-        for host in self.hostNames:
+        for host in self.hostnames:
             if host == self.localhostName:
                 continue
             # copy cm pacakage to other hosts
@@ -125,7 +125,7 @@ class InstallImpl:
                 touch {gaussHome}/bin/cluster_manual_start
             fi
             """.format(gaussHome=self.gaussHome)
-        for host in self.hostNames:
+        for host in self.hostnames:
             status, output = self.executeCmdOnHost(host, cmd)
             if status != 0:
                 self.logger.debug("Command: " + cmd)
@@ -134,7 +134,7 @@ class InstallImpl:
 
     def initCMServer(self):
         self.logger.log("Initializing cm_server.")
-        for (cmdir, host) in zip(self.cmDirs, self.hostNames):
+        for (cmdir, host) in zip(self.cmDirs, self.hostnames):
             cmd = """
                 cp {gaussHome}/share/config/cm_server.conf.sample {cmdir}/cm_server/cm_server.conf
                 sed 's#log_dir = .*#log_dir = {gaussLog}/cm/cm_server#' {cmdir}/cm_server/cm_server.conf -i
@@ -147,7 +147,7 @@ class InstallImpl:
 
     def initCMAgent(self):
         self.logger.log("Initializing cm_agent.")
-        for (cmdir, host) in zip(self.cmDirs, self.hostNames):
+        for (cmdir, host) in zip(self.cmDirs, self.hostnames):
             cmd = """
                 cp {gaussHome}/share/config/cm_agent.conf.sample {cmdir}/cm_agent/cm_agent.conf &&
                 sed 's#log_dir = .*#log_dir = {gaussLog}/cm/cm_agent#' {cmdir}/cm_agent/cm_agent.conf -i &&
@@ -198,7 +198,7 @@ class InstallImpl:
         # set crontab on other hosts
         setCronCmd = "crontab %s" % cronContentTmpFile
         cleanTmpFileCmd = "rm %s -f" % cronContentTmpFile
-        for host in self.hostNames:
+        for host in self.hostnames:
             if host == self.localhostName:
                 continue
             # copy cronContentTmpFile to other host
@@ -246,6 +246,7 @@ class InstallImpl:
             self.logger.debug("Command: " + startCmd)
             errorDetail = "\nStatus: %s\nOutput: %s" % (status, output)
             self.logger.logExit("Failed to start cluster." + errorDetail)
+
         queryCmd = "source %s; cm_ctl query -Cv" % self.envFile
         status, output = subprocess.getstatusoutput(queryCmd)
         if status != 0:
diff --git a/tool/cm_tool/cm_install b/tool/cm_tool/cm_install
index a2a117f..cff24ce 100644
--- a/tool/cm_tool/cm_install
+++ b/tool/cm_tool/cm_install
@@ -49,6 +49,9 @@ class Install:
         self.hostnames = []
         self.localhostName = ""
         self.cmpkg = ""
+        self.nodesInfo = dict()
+        self.clusterStopped = False
+        self.maxTerm = 0
 
     def getLocalhostName(self):
         import socket
@@ -131,7 +134,7 @@ General options:
         if status != 0:
             errorDetail = "\nCommand: %s\nStatus: %s\nOutput: %s\n" % (
                 cmd, status, output)
-            CMLog.exitWithError("OM tool is required." + errorDetail)
+            self.logger.logExit("OM tool is required." + errorDetail)
 
     def checkXMLFileSecurity(self):
         """
@@ -174,6 +177,24 @@ General options:
         """
         self.localhostName = getLocalhostName()
 
+        # get hostnames and port from static file
+        cmd = "source %s; gs_om -t view" % self.envFile
+        status, output = subprocess.getstatusoutput(cmd)
+        if status != 0:
+            self.logger.logExit((ErrorCode.GAUSS_514["GAUSS_51400"] % cmd) + \
+                "\nStatus:%d\nOutput:%s" % (status, output))
+        nodesStaticInfo = re.split("=+", output)[1:]
+        for nodeInfo in nodesStaticInfo:
+            if nodeInfo == "":
+                continue
+            nodename = re.findall("nodeName:(.*)", nodeInfo)[0]
+            self.hostnames.append(nodename)
+            dataPath = re.findall("datanodeLocalDataPath.*:(.*)", nodeInfo)[0]
+            port = re.findall("datanodePort.*:(.*)", nodeInfo)[0]
+            self.nodesInfo[nodename] = {"dataPath": dataPath, "port": port}
+
+        # get node info from XML
+        hostnamesInXML = []
         rootNode = self.initParserXMLFile()
         elementName = 'DEVICELIST'
         if not rootNode.findall('DEVICELIST'):
@@ -188,27 +209,30 @@ General options:
                 paraName = param.attrib['name']
                 paraValue = param.attrib['value']
                 if paraName == 'name':
-                    self.hostnames.append(paraValue)
+                    hostnamesInXML.append(paraValue)
                 elif paraName == 'cmDir':
                     self.cmDirs.append(paraValue)
                 elif paraName in cmDict.keys():
                     cmDict[paraName] = paraValue
+        # check whether XML contains all nodes info
+        if self.hostnames != hostnamesInXML:
+            self.logger.logExit("XML info is not consistent with static file.")
         # check params in xml
         for item in cmDict:
             if item == 'cmServerPortStandby':
                 continue
             if cmDict[item] == "":
-                CMLog.exitWithError(ErrorCode.GAUSS_512["GAUSS_51200"] % item)
+                self.logger.logExit(ErrorCode.GAUSS_512["GAUSS_51200"] % item)
         if cmDict['cmsNum'] != '1':
-            CMLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50024"] % 'cmsNum')
+            self.logger.logExit(ErrorCode.GAUSS_500["GAUSS_50024"] % 'cmsNum')
         if cmDict['cmServerlevel'] != '1':
-            CMLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50024"] % 'cmServerlevel')
+            self.logger.logExit(ErrorCode.GAUSS_500["GAUSS_50024"] % 'cmServerlevel')
         if not cmDict['cmServerPortBase'].isdigit():
-            CMLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50024"] % 'cmServerPortBase')
+            self.logger.logExit(ErrorCode.GAUSS_500["GAUSS_50024"] % 'cmServerPortBase')
         if cmDict['cmServerPortStandby'] != "" and not cmDict['cmServerPortStandby'].isdigit():
-            CMLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50024"] % 'cmServerPortStandby')
+            self.logger.logExit(ErrorCode.GAUSS_500["GAUSS_50024"] % 'cmServerPortStandby')
         if len(self.hostnames) != len(self.cmDirs):
-            CMLog.exitWithError("\"cmDir\" of all nodes must be provided.")
+            self.logger.logExit("\"cmDir\" of all nodes must be provided.")
 
     def checkHostTrust(self):
         checkHostsTrust(self.hostnames, self.localhostName)
@@ -227,18 +251,68 @@ General options:
             "grep 'CMServer State' > /dev/null" % self.envFile
         status, output = subprocess.getstatusoutput(checkCMExistCmd)
         if status == 0:
-            CMLog.exitWithError("CM exists in current cluster.")
+            self.logger.logExit("CM exists in current cluster.")
+
+    def checkCluster(self):
+        """
+        check the status of the current cluster
+        """
+        cmd = "source %s; gs_om -t status --detail" % self.envFile
+        status, output = subprocess.getstatusoutput(cmd)
+        if status != 0:
+            errorDetail = "Detail:\nCommand:\n" + cmd + "\noutput:" + output
+            self.logger.logExit(ErrorCode.GAUSS_516["GAUSS_51600"] + errorDetail)
+        if "cluster_state : Unavailable" in output:
+            # It is permitted to deploy CM tool when cluster is stopped,
+            # but not permitted when cluster is unavailable.
+            if output.count("Manually stopped") == len(self.hostnames):
+                self.clusterStopped = True
+                return
+            self.logger.logExit("The cluster is unavailable currently.")
+        if "cluster_state : Normal" not in output:
+            self.logger.logExit("Cluster is running but its status is abnormal.")
+        # check whether term of primary is valid and maximal.
+        primaryCount = 0
+        primaryTerm = 0
+        sqlCmd = "select term from pg_last_xlog_replay_location();"
+        for host in self.hostnames:
+            isLocal = False
+            if host == self.localhostName:
+                isLocal = True
+            findPrimaryCmd = "source %s; gs_ctl query -D %s | grep 'local_role.*Primary' > /dev/null" % \
+                (self.envFile, self.nodesInfo[host]["dataPath"])
+            notPrimary, output = executeCmdOnHost(host, findPrimaryCmd, isLocal)
+            if notPrimary == 0:
+                primaryCount += 1
+            getTermLsnCmd = "source %s; gsql -d postgres -p %s -tA -c '%s'" % \
+                (self.envFile, self.nodesInfo[host]["port"], sqlCmd)
+            status, term = executeCmdOnHost(host, getTermLsnCmd, isLocal)
+            if status != 0 or not term.strip().isdigit():
+                self.logger.logExit("Failed to get term of host %s." % host)
+            if notPrimary == 0:
+                primaryTerm = int(term)
+            if self.maxTerm < int(term):
+                self.maxTerm = int(term)
+
+        if primaryCount != 1:
+            self.logger.logExit("The number of primary is invalid.")
+        if primaryTerm == 0 or primaryTerm < self.maxTerm:
+            self.logger.logExit("Term of primary is invalid or not maximal.\n"
+                "Hint: it seems that the cluster is newly installed, so it's "
+                "recommended to deploy CM tool while installing the cluster.")
+
 
     def run(self):
         self.checkExeUser()
         self.parseCommandLine()
         self.checkParam()
-        self.checkOm()
-        self.checkCM()
         self.getEnvParams()
         self.initLogger()
-        self.getLocalhostName()
+        self.checkOm()
+        self.checkCM()
         self.getInfoListOfAllNodes()
+        self.getLocalhostName()
         self.checkHostTrust()
+        self.checkCluster()
         installImpl = InstallImpl(self)
         installImpl.run()