diff --git a/src/manager/om/script/gs_expansion b/src/manager/om/script/gs_expansion index ee2d8648b..83696f78a 100644 --- a/src/manager/om/script/gs_expansion +++ b/src/manager/om/script/gs_expansion @@ -20,12 +20,8 @@ ############################################################################# import os -import pwd import sys -import threading -import uuid -import subprocess -import weakref + sys.path.append(sys.path[0]) from gspylib.common.DbClusterInfo import dbClusterInfo, \ @@ -61,6 +57,7 @@ class Expansion(ParallelBaseOM): os.path.join(os.path.realpath(__file__), "../../")) self.standbyLocalMode = False + self.envFile = DefaultValue.getEnv("MPPDB_ENV_SEPARATE_PATH") def usage(self): """ @@ -142,6 +139,13 @@ General options: backIpList = clusterInfo.getClusterBackIps() nodeNameList = clusterInfo.getClusterNodeNames() + # only support single az now. + azNames = clusterInfo.getazNames() + self.azName = "AZ1" + if len(azNames) > 0: + self.azName = azNames[0] + + self.localIp = backIpList[0] self.nodeNameList = nodeNameList self.backIpNameMap = {} for backip in backIpList: diff --git a/src/manager/om/script/gspylib/common/ErrorCode.py b/src/manager/om/script/gspylib/common/ErrorCode.py index 8d0eb24ae..517ee97c9 100644 --- a/src/manager/om/script/gspylib/common/ErrorCode.py +++ b/src/manager/om/script/gspylib/common/ErrorCode.py @@ -1108,7 +1108,11 @@ class ErrorCode(): "database on node is abnormal. \n" "node [%s], user [%s], dataNode [%s]. \n" "You can use command \"gs_ctl query -D %s\" for more " - "detail." + "detail.", + "GAUSS_35704": "[GAUSS-35704] %s [%s] does not exist on node [%s].", + "GAUSS_35705": "[GAUSS-35705] Error, the database version is " + "inconsistent in %s: %s" + } diff --git a/src/manager/om/script/impl/expansion/ExpansionImpl.py b/src/manager/om/script/impl/expansion/ExpansionImpl.py index 95e21dd77..939da41a0 100644 --- a/src/manager/om/script/impl/expansion/ExpansionImpl.py +++ b/src/manager/om/script/impl/expansion/ExpansionImpl.py @@ -25,8 +25,12 @@ import os import getpass import pwd import datetime +import weakref from random import sample import time +import grp +import socket +import stat from multiprocessing import Process, Value sys.path.append(sys.path[0] + "/../../../../") @@ -86,12 +90,14 @@ class ExpansionImpl(): self.tempFileDir = "/tmp/gs_expansion_%s" % (currentTime) self.logger.debug("tmp expansion dir is %s ." % self.tempFileDir) - + self._finalizer = weakref.finalize(self, self.clearTmpFile) + + def sendSoftToHosts(self): """ create software dir and send it on each nodes """ - self.logger.debug("Start to send software to each standby nodes.\n") + self.logger.debug("Start to send soft to each standby nodes.\n") hostNames = self.context.newHostList hostList = hostNames @@ -115,7 +121,8 @@ class ExpansionImpl(): group=self.group,srcFile=path2ChangeMode) sshTool.executeCommand(changeModCmd, "", DefaultValue.SUCCESS, hostList) - self.logger.debug("End to send software to each standby nodes.\n") + self.logger.debug("End to send soft to each standby nodes.\n") + self.cleanSshToolFile(sshTool) def generateAndSendXmlFile(self): """ @@ -134,9 +141,10 @@ class ExpansionImpl(): for host in newHosts: # create single deploy xml file for each standby node xmlContent = self.__generateXml(host) - fo = open("%s" % tempXmlFile, "w") - fo.write( xmlContent ) - fo.close() + with os.fdopen(os.open("%s" % tempXmlFile, os.O_WRONLY | os.O_CREAT, + stat.S_IWUSR | stat.S_IRUSR),'w') as fo: + fo.write( xmlContent ) + fo.close() # send single deploy xml file to each standby node sshTool = SshTool(host) retmap, output = sshTool.getSshStatusOutput("mkdir -p %s" % @@ -145,7 +153,8 @@ class ExpansionImpl(): (self.user, self.group, self.tempFileDir), [host], self.envFile) sshTool.scpFiles("%s" % tempXmlFile, "%s" % tempXmlFile, [host], self.envFile) - + self.cleanSshToolFile(sshTool) + self.logger.debug("End to generateAndSend XML file.\n") def __generateXml(self, backIp): @@ -180,7 +189,7 @@ class ExpansionImpl(): - + @@ -193,7 +202,7 @@ class ExpansionImpl(): """.format(nodeName=nodeName,backIp=backIp,appPath=appPath, logPath=logPath,toolPath=toolPath,corePath=corePath, - sshIp=sshIp,port=port,dataNode=dataNode) + sshIp=sshIp,port=port,dataNode=dataNode,azName=self.context.azName) return xmlConfig def changeUser(self): @@ -326,8 +335,9 @@ class ExpansionImpl(): statusValues = status.values() if STATUS_FAIL in statusValues: GaussLog.exitWithError(output) - + self.logger.debug("End to preinstall database step.\n") + self.cleanSshToolFile(sshTool) def buildStandbyRelation(self): @@ -345,6 +355,7 @@ class ExpansionImpl(): self.queryPrimaryClusterDetail() self.setPrimaryGUCConfig() self.setStandbyGUCConfig() + self.restartSingleDbWithPrimaryMode() self.buildStandbyHosts() self.generateClusterStaticFile() @@ -368,10 +379,27 @@ class ExpansionImpl(): """ self.logger.debug("Start to set primary node GUC config.\n") primaryHost = self.getPrimaryHostName() - dataNode = self.context.clusterInfoDict[primaryHost]["dataNode"] self.setGUCOnClusterHosts([primaryHost]) self.addStandbyIpInPrimaryConf() + + + def setStandbyGUCConfig(self): + """ + set the expansion standby node db guc config + """ + self.logger.debug("Stat to set standby node GUC config.\n") + nodeList = self.context.nodeNameList + primaryHost = self.getPrimaryHostName() + standbyHostNames = list(set(nodeList).difference(set([primaryHost]))) + self.setGUCOnClusterHosts(standbyHostNames) + + def restartSingleDbWithPrimaryMode(self): + """ + """ + primaryHost = self.getPrimaryHostName() + dataNode = self.context.clusterInfoDict[primaryHost]["dataNode"] + insType, dbStat = self.commonGsCtl.queryInstanceStatus(primaryHost, dataNode,self.envFile) if insType != MODE_PRIMARY: @@ -391,17 +419,6 @@ retry for %s times" % start_retry_num) self.commonGsCtl.startInstanceWithMode(primaryHost, dataNode, MODE_PRIMARY, self.envFile) start_retry_num = start_retry_num + 1 - - def setStandbyGUCConfig(self): - """ - """ - self.logger.debug("Start to set standby node GUC config.\n") - standbyHosts = self.context.newHostList - standbyHostNames = [] - for host in standbyHosts: - hostName = self.context.backIpNameMap[host] - standbyHostNames.append(hostName) - self.setGUCOnClusterHosts(standbyHostNames) def addStandbyIpInPrimaryConf(self): """ @@ -421,6 +438,7 @@ retry for %s times" % start_retry_num) resultMap, outputCollect = sshTool.getSshStatusOutput(command, [primaryHost], self.envFile) self.logger.debug(outputCollect) + self.cleanSshToolFile(sshTool) def reloadPrimaryConf(self): """ @@ -433,6 +451,7 @@ retry for %s times" % start_retry_num) resultMap, outputCollect = sshTool.getSshStatusOutput(command, [primaryHost], self.envFile) self.logger.debug(outputCollect) + self.cleanSshToolFile(sshTool) def getPrimaryHostName(self): """ @@ -451,12 +470,15 @@ retry for %s times" % start_retry_num) stop the new standby host`s database and build it as standby mode """ self.logger.debug("start to build standby node...\n") - + standbyHosts = self.context.newHostList for host in standbyHosts: hostName = self.context.backIpNameMap[host] dataNode = self.context.clusterInfoDict[hostName]["dataNode"] + + self.checkTmpDir(hostName) + self.commonGsCtl.stopInstance(hostName, dataNode, self.envFile) self.commonGsCtl.startInstanceWithMode(hostName, dataNode, MODE_STANDBY, self.envFile) @@ -467,11 +489,8 @@ retry for %s times" % start_retry_num) insType, dbStat = self.commonGsCtl.queryInstanceStatus(hostName, dataNode, self.envFile) if insType != MODE_STANDBY: - self.logger.debug("Start databasse as Standby mode failed, \ + self.logger.debug("Start database as Standby mode failed, \ retry for %s times" % start_retry_num) - self.setGUCOnClusterHosts([]) - self.addStandbyIpInPrimaryConf() - self.reloadPrimaryConf() self.commonGsCtl.startInstanceWithMode(hostName, dataNode, MODE_STANDBY, self.envFile) start_retry_num = start_retry_num + 1 @@ -500,6 +519,21 @@ retry for %s times" % start_retry_num) else: break + def checkTmpDir(self, hostName): + """ + if the tmp dir id not exist, create it. + """ + tmpDir = os.path.realpath(DefaultValue.getTmpDirFromEnv()) + checkCmd = 'if [ ! -d "%s" ]; then exit 1;fi;' % (tmpDir) + sshTool = SshTool([hostName]) + resultMap, outputCollect = sshTool.getSshStatusOutput(checkCmd, + [hostName], self.envFile) + ret = resultMap[hostName] + if ret == STATUS_FAIL: + self.logger.debug("Node [%s] does not have tmp dir. need to fix.") + fixCmd = "mkdir -p %s" % (tmpDir) + sshTool.getSshStatusOutput(fixCmd, [hostName], self.envFile) + self.cleanSshToolFile(sshTool) def generateClusterStaticFile(self): """ @@ -508,11 +542,12 @@ retry for %s times" % start_retry_num) self.logger.debug("Start to generate and send cluster static file.\n") primaryHosts = self.getPrimaryHostName() - command = "gs_om -t generateconf -X %s" % self.context.xmlFile + command = "gs_om -t generateconf -X %s --distribute" % self.context.xmlFile sshTool = SshTool([primaryHosts]) resultMap, outputCollect = sshTool.getSshStatusOutput(command, [primaryHosts], self.envFile) self.logger.debug(outputCollect) + self.cleanSshToolFile(sshTool) nodeNameList = self.context.nodeNameList @@ -522,8 +557,12 @@ retry for %s times" % start_retry_num) appPath = self.context.clusterInfoDict["appPath"] srcFile = "%s/script/static_config_files/cluster_static_config_%s" \ % (toolPath, hostName) + if not os.path.exists(srcFile): + GaussLog.exitWithError("Generate static file [%s] not found." \ + % srcFile) targetFile = "%s/bin/cluster_static_config" % appPath hostSsh.scpFiles(srcFile, targetFile, [hostName], self.envFile) + self.cleanSshToolFile(hostSsh) self.logger.debug("End to generate and send cluster static file.\n") time.sleep(10) @@ -549,9 +588,9 @@ retry for %s times" % start_retry_num) insType, dbStat = self.commonGsCtl.queryInstanceStatus(hostName, dataNode, self.envFile) if dbStat != STAT_NORMAL: - self.commonGsCtl.buildInstance(hostName, dataNode, + self.commonGsCtl.startInstanceWithMode(hostName, dataNode, MODE_STANDBY, self.envFile) - + self.commonGsCtl.startOmCluster(primaryHosts, self.envFile) def setGUCOnClusterHosts(self, hostNames=[]): @@ -582,10 +621,11 @@ retry for %s times" % start_retry_num) subprocess.getstatusoutput("mkdir -m a+x -p %s; touch %s; \ cat /dev/null > %s" % \ (self.tempFileDir, tempShFile, tempShFile)) - fo = open("%s" % tempShFile, "w") - fo.write("#bash\n") - fo.write( command ) - fo.close() + with os.fdopen(os.open("%s" % tempShFile, os.O_WRONLY | os.O_CREAT, + stat.S_IWUSR | stat.S_IRUSR),'w') as fo: + fo.write("#bash\n") + fo.write( command ) + fo.close() # send guc command bashfile to each host and execute it. sshTool.scpFiles("%s" % tempShFile, "%s" % tempShFile, [host], @@ -595,6 +635,7 @@ retry for %s times" % start_retry_num) tempShFile, [host], self.envFile) self.logger.debug(outputCollect) + self.cleanSshToolFile(sshTool) def getGUCConfig(self): """ @@ -619,14 +660,14 @@ retry for %s times" % start_retry_num) remoteHostInfo = nodeDict[remoteHost] guc_repl_template = """\ - gs_guc set -D {dn} -c "replconninfo{index}=\ - 'localhost={localhost} localport={localport} \ - localheartbeatport={localeHeartPort} \ - localservice={localservice} \ - remotehost={remoteNode} \ - remoteport={remotePort} \ - remoteheartbeatport={remoteHeartPort} \ - remoteservice={remoteservice}'" +gs_guc set -D {dn} -c "replconninfo{index}=\ +'localhost={localhost} localport={localport} \ +localheartbeatport={localeHeartPort} \ +localservice={localservice} \ +remotehost={remoteNode} \ +remoteport={remotePort} \ +remoteheartbeatport={remoteHeartPort} \ +remoteservice={remoteservice}'" """.format(dn=localeHostInfo["dataNode"], index=index, localhost=localeHostInfo["sshIp"], @@ -653,8 +694,13 @@ retry for %s times" % start_retry_num) def checkLocalModeOnStandbyHosts(self): """ + expansion the installed standby node. check standby database. + 1. if the database is normal + 2. if the databases version are same before existing and new """ standbyHosts = self.context.newHostList + envfile = self.envFile + self.logger.log("Checking the database with locale mode.") for host in standbyHosts: hostName = self.context.backIpNameMap[host] @@ -665,6 +711,39 @@ retry for %s times" % start_retry_num) GaussLog.exitWithError(ErrorCode.GAUSS_357["GAUSS_35703"] % (hostName, self.user, dataNode, dataNode)) + allHostIp = [] + allHostIp.append(self.context.localIp) + versionDic = {} + + for hostip in standbyHosts: + allHostIp.append(hostip) + sshTool= SshTool(allHostIp) + #get version in the nodes + getversioncmd = "gaussdb --version" + resultMap, outputCollect = sshTool.getSshStatusOutput(getversioncmd, + [], envfile) + self.cleanSshToolFile(sshTool) + versionLines = outputCollect.splitlines() + for verline in versionLines: + if verline[0:9] == '[SUCCESS]': + ipKey = verline[10:-1] + continue + else: + versionStr = "".join(verline) + preVersion = versionStr.split(' ') + versionInfo = preVersion[4] + versionDic[ipKey] = versionInfo[:-2] + for hostip in versionDic: + if hostip == self.context.localIp: + versionCompare = "" + versionCompare = versionDic[hostip] + else: + if versionDic[hostip] == versionCompare: + continue + else: + GaussLog.exitWithError(ErrorCode.GAUSS_357["GAUSS_35705"] \ + %(hostip, versionDic[hostip])) + self.logger.log("Successfully checked the database with locale mode.") def preInstall(self): @@ -684,14 +763,117 @@ standby nodes.") """ clear temporary file after expansion success """ - self.logger.debug("start to delete temporary file") + self.logger.debug("start to delete temporary file %s" % self.tempFileDir) + clearCmd = "if [ -d '%s' ];then rm -rf %s;fi" % \ + (self.tempFileDir, self.tempFileDir) hostNames = self.context.nodeNameList - sshTool = SshTool(hostNames) - clearCmd = "source %s ; rm -rf %s" % (self.envFile, self.tempFileDir) - result, output = sshTool.getSshStatusOutput(clearCmd, - hostNames, self.envFile) - self.logger.debug(output) + for host in hostNames: + try: + sshTool = SshTool(hostNames) + result, output = sshTool.getSshStatusOutput(clearCmd, + hostNames, self.envFile) + self.logger.debug(output) + self.cleanSshToolFile(sshTool) + except Exception as e: + self.logger.debug(str(e)) + self.cleanSshToolFile(sshTool) + + def cleanSshToolFile(self, sshTool): + """ + """ + try: + sshTool.clenSshResultFiles() + except Exception as e: + self.logger.debug(str(e)) + + + def checkNodesDetail(self): + """ + """ + self.checkUserAndGroupExists() + self.checkXmlFileAccessToUser() + + def checkXmlFileAccessToUser(self): + """ + Check if the xml config file has readable access to user. + """ + userInfo = pwd.getpwnam(self.user) + uid = userInfo.pw_uid + gid = userInfo.pw_gid + + xmlFile = self.context.xmlFile + fstat = os.stat(xmlFile) + mode = fstat[stat.ST_MODE] + if (fstat[stat.ST_UID] == uid and (mode & stat.S_IRUSR > 0)) or \ + (fstat[stat.ST_GID] == gid and (mode & stat.S_IRGRP > 0)): + pass + else: + self.logger.debug("User %s has no access right for file %s" \ + % (self.user, xmlFile)) + os.chown(xmlFile, uid, gid) + os.chmod(xmlFile, stat.S_IRUSR) + + def checkUserAndGroupExists(self): + """ + check system user and group exists and be same + on primary and standby nodes + """ + inputUser = self.user + inputGroup = self.group + + user_group_id = "" + isUserExits = False + localHost = socket.gethostname() + for user in pwd.getpwall(): + if user.pw_name == self.user: + user_group_id = user.pw_gid + isUserExits = True + break + if not isUserExits: + GaussLog.exitWithError(ErrorCode.GAUSS_357["GAUSS_35704"] \ + % ("User", self.user, localHost)) + + isGroupExits = False + group_id = "" + for group in grp.getgrall(): + if group.gr_name == self.group: + group_id = group.gr_gid + isGroupExits = True + if not isGroupExits: + GaussLog.exitWithError(ErrorCode.GAUSS_357["GAUSS_35704"] \ + % ("Group", self.group, localHost)) + if user_group_id != group_id: + GaussLog.exitWithError("User [%s] is not in the group [%s]."\ + % (self.user, self.group)) + + hostNames = self.context.newHostList + envfile = self.envFile + sshTool = SshTool(hostNames) + + #get username in the other standy nodes + getUserNameCmd = "cat /etc/passwd | grep -w %s" % inputUser + resultMap, outputCollect = sshTool.getSshStatusOutput(getUserNameCmd, + [], envfile) + + for hostKey in resultMap: + if resultMap[hostKey] == STATUS_FAIL: + self.cleanSshToolFile(sshTool) + GaussLog.exitWithError(ErrorCode.GAUSS_357["GAUSS_35704"] \ + % ("User", self.user, hostKey)) + + #get groupname in the other standy nodes + getGroupNameCmd = "cat /etc/group | grep -w %s" % inputGroup + resultMap, outputCollect = sshTool.getSshStatusOutput(getGroupNameCmd, + [], envfile) + for hostKey in resultMap: + if resultMap[hostKey] == STATUS_FAIL: + self.cleanSshToolFile(sshTool) + GaussLog.exitWithError(ErrorCode.GAUSS_357["GAUSS_35704"] \ + % ("Group", self.group, hostKey)) + self.cleanSshToolFile(sshTool) + + def installAndExpansion(self): """ install database and expansion standby node with db om user @@ -727,16 +909,15 @@ Start to establish the primary-standby relationship.") """ start expansion """ + self.checkNodesDetail() # preinstall on standby nodes with root user. if not self.context.standbyLocalMode: self.preInstall() self.installAndExpansion() - self.clearTmpFile() - self.logger.log("\nSuccess to expansion standby nodes.") - + class GsCtlCommon: @@ -768,7 +949,7 @@ class GsCtlCommon: dbStatus = "" else: dbStatus = db_state[0] - + self.cleanSshToolTmpFile(sshTool) return insType.strip().lower(), dbStatus.strip().lower() def stopInstance(self, host, datanode, env): @@ -780,6 +961,7 @@ class GsCtlCommon: [host], env) self.logger.debug(host) self.logger.debug(outputCollect) + self.cleanSshToolTmpFile(sshTool) def startInstanceWithMode(self, host, datanode, mode, env): """ @@ -791,6 +973,7 @@ class GsCtlCommon: [host], env) self.logger.debug(host) self.logger.debug(outputCollect) + self.cleanSshToolTmpFile(sshTool) def buildInstance(self, host, datanode, mode, env): command = "source %s ; gs_ctl build -D %s -M %s" % (env, datanode, mode) @@ -800,6 +983,7 @@ class GsCtlCommon: [host], env) self.logger.debug(host) self.logger.debug(outputCollect) + self.cleanSshToolTmpFile(sshTool) def startOmCluster(self, host, env): """ @@ -812,6 +996,7 @@ class GsCtlCommon: [host], env) self.logger.debug(host) self.logger.debug(outputCollect) + self.cleanSshToolTmpFile(sshTool) def queryOmCluster(self, host, env): """ @@ -824,6 +1009,16 @@ class GsCtlCommon: [host], env) self.logger.debug(host) self.logger.debug(outputCollect) + self.cleanSshToolTmpFile(sshTool) return outputCollect + def cleanSshToolTmpFile(self, sshTool): + """ + """ + try: + sshTool.clenSshResultFiles() + except Exception as e: + self.logger.debug(str(e)) + +