Support expansion of standby nodes

This commit is contained in:
zhang_xubo
2020-08-26 11:05:00 +08:00
parent dc967cf640
commit b39e6a4b5c
7 changed files with 1098 additions and 1 deletions

View File

@ -398,6 +398,7 @@ function target_file_copy()
sed -i '/gs_lcctl/d' binfile
sed -i '/gs_wsr/d' binfile
sed -i '/gs_gucZenith/d' binfile
sed -i '/gs_expansion/d' binfile
bin_script=$(cat binfile)
rm binfile script_file
cd $BUILD_DIR

View File

@ -0,0 +1,245 @@
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
#############################################################################
# Copyright (c) 2020 Huawei Technologies Co.,Ltd.
#
# openGauss is licensed under Mulan PSL v2.
# You can use this software according to the terms
# and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
#
# http://license.coscl.org.cn/MulanPSL2
#
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS,
# WITHOUT WARRANTIES OF ANY KIND,
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.
# ----------------------------------------------------------------------------
# Description : gs_expansion is a utility to expand a cluster with new standby nodes
#############################################################################
import os
import pwd
import sys
import threading
import uuid
import subprocess
import weakref
sys.path.append(sys.path[0])
from gspylib.common.DbClusterInfo import dbClusterInfo, \
readOneClusterConfigItem, initParserXMLFile, dbNodeInfo, checkPathVaild
from gspylib.common.GaussLog import GaussLog
from gspylib.common.Common import DefaultValue
from gspylib.common.ErrorCode import ErrorCode
from gspylib.common.ParallelBaseOM import ParallelBaseOM
from gspylib.common.ParameterParsecheck import Parameter
from impl.preinstall.OLAP.PreinstallImplOLAP import PreinstallImplOLAP
from gspylib.threads.SshTool import SshTool
from impl.expansion.ExpansionImpl import ExpansionImpl
ENV_LIST = ["MPPDB_ENV_SEPARATE_PATH", "GPHOME", "PATH",
"LD_LIBRARY_PATH", "PYTHONPATH", "GAUSS_WARNING_TYPE",
"GAUSSHOME", "PATH", "LD_LIBRARY_PATH",
"S3_CLIENT_CRT_FILE", "GAUSS_VERSION", "PGHOST",
"GS_CLUSTER_NAME", "GAUSSLOG", "GAUSS_ENV", "umask"]
class Expansion(ParallelBaseOM):
"""
"""
def __init__(self):
"""
"""
ParallelBaseOM.__init__(self)
# new added standby node backip list
self.newHostList = []
self.clusterInfoDict = {}
self.backIpNameMap = {}
self.packagepath = os.path.realpath(
os.path.join(os.path.realpath(__file__), "../../"))
self.standbyLocalMode = False
def usage(self):
"""
gs_expansion is a utility to expand a cluster with new standby nodes.
Usage:
gs_expansion -? | --help
gs_expansion -V | --version
gs_expansion -U USER -G GROUP -X XMLFILE -h nodeList [-L]
General options:
-U Cluster user.
-G Group of the cluster user.
-X Path of the XML configuration file.
-h Back IP list of the new standby nodes.
Separate multiple nodes with commas (,).
such as '-h 192.168.0.1,192.168.0.2'
-L The standby database is installed in local mode.
-?, --help Show help information for this
utility, and exit the command line mode.
-V, --version Show version information.
"""
print(self.usage.__doc__)
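# A typical invocation looks like the following; the user, group and
# paths are illustrative values, not defaults shipped with the tool:
#   gs_expansion -U omm -G dbgrp -X /opt/software/clusterconfig.xml \
#       -h 192.168.0.2,192.168.0.3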
def parseCommandLine(self):
"""
parse parameter from command line
"""
ParaObj = Parameter()
ParaDict = ParaObj.ParameterCommandLine("expansion")
# parameter -h or -?
if (ParaDict.__contains__("helpFlag")):
self.usage()
sys.exit(0)
# Resolves command line arguments
# parameter -U
if (ParaDict.__contains__("user")):
self.user = ParaDict.get("user")
DefaultValue.checkPathVaild(self.user)
# parameter -G
if (ParaDict.__contains__("group")):
self.group = ParaDict.get("group")
# parameter -X
if (ParaDict.__contains__("confFile")):
self.xmlFile = ParaDict.get("confFile")
# parameter -L
if (ParaDict.__contains__("localMode")):
self.localMode = ParaDict.get("localMode")
self.standbyLocalMode = ParaDict.get("localMode")
# parameter -l
if (ParaDict.__contains__("logFile")):
self.logFile = ParaDict.get("logFile")
#parameter -h
if (ParaDict.__contains__("nodename")):
self.newHostList = ParaDict.get("nodename")
def checkParameters(self):
"""
function: Check parameter from command line
input: NA
output: NA
"""
# check user | group | xmlfile | node
if len(self.user) == 0:
GaussLog.exitWithError(ErrorCode.GAUSS_357["GAUSS_35701"] % "-U")
if len(self.group) == 0:
GaussLog.exitWithError(ErrorCode.GAUSS_357["GAUSS_35701"] % "-G")
if len(self.xmlFile) == 0:
GaussLog.exitWithError(ErrorCode.GAUSS_357["GAUSS_35701"] % "-X")
if len(self.newHostList) == 0:
GaussLog.exitWithError(ErrorCode.GAUSS_357["GAUSS_35701"] % "-h")
clusterInfo = ExpansionClusterInfo()
hostNameIpDict = clusterInfo.initFromXml(self.xmlFile)
clusterDict = clusterInfo.getClusterDirectorys()
backIpList = clusterInfo.getClusterBackIps()
nodeNameList = clusterInfo.getClusterNodeNames()
self.nodeNameList = nodeNameList
self.backIpNameMap = {}
for backip in backIpList:
self.backIpNameMap[backip] = clusterInfo.getNodeNameByBackIp(backip)
# check parameter node must in xml config file
for nodeid in self.newHostList:
if nodeid not in backIpList:
GaussLog.exitWithError(ErrorCode.GAUSS_357["GAUSS_35702"] % \
nodeid)
# get corepath and toolpath from xml file
corePath = clusterInfo.readClustercorePath(self.xmlFile)
toolPath = clusterInfo.getToolPath(self.xmlFile)
# parse xml file and cache node info
clusterInfoDict = {}
clusterInfoDict["appPath"] = clusterDict["appPath"][0]
clusterInfoDict["logPath"] = clusterDict["logPath"][0]
clusterInfoDict["corePath"] = corePath
clusterInfoDict["toolPath"] = toolPath
for nodeName in nodeNameList:
hostInfo = hostNameIpDict[nodeName]
ipList = hostInfo[0]
portList = hostInfo[1]
backIp = ""
sshIp = ""
if len(ipList) == 1:
backIp = sshIp = ipList[0]
elif len(ipList) == 2:
backIp = ipList[0]
sshIp = ipList[1]
port = portList[0]
cluster = clusterDict[nodeName]
dataNode = cluster[2]
clusterInfoDict[nodeName] = {
"backIp": backIp,
"sshIp": sshIp,
"port": port,
"localport": int(port) + 1,
"localservice": int(port) + 4,
"heartBeatPort": int(port) + 3,
"dataNode": dataNode,
"instanceType": -1
}
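# The replication ports are derived from the node's dataPortBase: for
# example, port 5432 yields localport 5433, heartBeatPort 5435 and
# localservice 5436, matching the offsets above.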
nodeIdList = clusterInfo.getClusterNodeIds()
for id in nodeIdList:
insType = clusterInfo.getdataNodeInstanceType(id)
hostName = clusterInfo.getHostNameByNodeId(id)
clusterInfoDict[hostName]["instanceType"] = insType
self.clusterInfoDict = clusterInfoDict
def initLogs(self):
"""
init log file
"""
# if no log file
if (self.logFile == ""):
self.logFile = DefaultValue.getOMLogPath(
DefaultValue.EXPANSION_LOG_FILE, self.user, "",
self.xmlFile)
# if not absolute path
if (not os.path.isabs(self.logFile)):
GaussLog.exitWithError(ErrorCode.GAUSS_502["GAUSS_50213"] % "log")
self.initLogger("gs_expansion")
self.logger.ignoreErr = True
class ExpansionClusterInfo(dbClusterInfo):
def __init__(self):
dbClusterInfo.__init__(self)
def getToolPath(self, xmlFile):
"""
function : Read tool path from default xml file
input : String
output : String
"""
self.setDefaultXmlFile(xmlFile)
# read gaussdb tool path from xml file
(retStatus, retValue) = readOneClusterConfigItem(
initParserXMLFile(xmlFile), "gaussdbToolPath", "cluster")
if retStatus != 0:
raise Exception(ErrorCode.GAUSS_512["GAUSS_51200"]
% "gaussdbToolPath" + " Error: \n%s" % retValue)
toolPath = os.path.normpath(retValue)
checkPathVaild(toolPath)
return toolPath
if __name__ == "__main__":
"""
"""
expansion = Expansion()
expansion.parseCommandLine()
expansion.checkParameters()
expansion.initLogs()
expImpl = ExpansionImpl(expansion)
expImpl.run()

View File

@ -309,6 +309,7 @@ class DefaultValue():
LCCTL_LOG_FILE = "gs_lcctl.log"
RESIZE_LOG_FILE = "gs_resize.log"
HOTPATCH_LOG_FILE = "gs_hotpatch.log"
EXPANSION_LOG_FILE = "gs_expansion.log"
# hotpatch action
HOTPATCH_ACTION_LIST = ["load", "unload", "active", "deactive",
"info", "list"]

View File

@ -1092,6 +1092,24 @@ class ErrorCode():
'GAUSS_53612': "[GAUSS-53612]: Can not find any catalog in database %s"
}
##########################################################################
# gs_expansion
# [GAUSS-357] : gs_expansion failed
##########################################################################
GAUSS_357 = {
"GAUSS_35700": "[GAUSS-35700] Expansion standby node failed.",
"GAUSS_35701": "[GAUSS-35701] Empty parameter. The %s parameter is"
"missing in the command.",
"GAUSS_35702": "[GAUSS-35702] Unrecognized parameter, standby host "
"backip %s is not in the "
"XML configuration file",
"GAUSS_35703": "[GAUSS-35703] Check standby database Failed. The "
"database on node is abnormal. \n"
"node [%s], user [%s], dataNode [%s]. \n"
"You can use command \"gs_ctl query -D %s\" for more "
"detail."
}
class OmError(BaseException):
"""

View File

@ -93,6 +93,8 @@ gs_ssh = ["-?", "--help", "-V", "--version", "-c:"]
gs_checkos = ["-?", "--help", "-V", "--version", "-h:", "-f:", "-o:",
"-i:", "--detail",
"-l:", "-X:"]
gs_expansion = ["-?", "--help", "-V", "--version", "-U:", "-G:", "-L",
"-X:", "-h:", "--sep-env-file="]
# gs_om child branch
gs_om_start = ["-t:", "-?", "--help", "-V", "--version", "-h:", "-I:",
@ -153,7 +155,8 @@ ParameterDict = {"preinstall": gs_preinstall,
"postuninstall": gs_postuninstall,
"view": gs_om_view,
"query": gs_om_query,
"refreshconf": gs_om_refreshconf
"refreshconf": gs_om_refreshconf,
"expansion": gs_expansion
}
# List of scripts with the -t parameter

View File

@ -0,0 +1,829 @@
# -*- coding:utf-8 -*-
#############################################################################
# Copyright (c) 2020 Huawei Technologies Co.,Ltd.
#
# openGauss is licensed under Mulan PSL v2.
# You can use this software according to the terms
# and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
#
# http://license.coscl.org.cn/MulanPSL2
#
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS,
# WITHOUT WARRANTIES OF ANY KIND,
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.
# ----------------------------------------------------------------------------
# Description : ExpansionImpl.py
#############################################################################
import subprocess
import sys
import re
import os
import getpass
import pwd
import datetime
from random import sample
import time
from multiprocessing import Process, Value
sys.path.append(sys.path[0] + "/../../../../")
from gspylib.common.DbClusterInfo import dbClusterInfo, queryCmd
from gspylib.threads.SshTool import SshTool
from gspylib.common.DbClusterStatus import DbClusterStatus
from gspylib.common.ErrorCode import ErrorCode
from gspylib.common.Common import DefaultValue
from gspylib.common.GaussLog import GaussLog
sys.path.append(sys.path[0] + "/../../../lib/")
DefaultValue.doConfigForParamiko()
import paramiko
#mode
MODE_PRIMARY = "primary"
MODE_STANDBY = "standby"
MODE_NORMAL = "normal"
#db state
STAT_NORMAL = "normal"
# master
MASTER_INSTANCE = 0
# standby
STANDBY_INSTANCE = 1
# status failed
STATUS_FAIL = "Failure"
class ExpansionImpl():
"""
class for expanding a cluster with standby nodes.
steps:
1. preinstall the database software on each new standby node
2. install each new node as a single-node database
3. establish the primary-standby relationship among all nodes
"""
def __init__(self, expansion):
"""
"""
self.context = expansion
self.user = self.context.user
self.group = self.context.group
self.logger = self.context.logger
self.envFile = DefaultValue.getEnv("MPPDB_ENV_SEPARATE_PATH")
currentTime = str(datetime.datetime.now()).replace(" ", "_").replace(
".", "_")
self.commonGsCtl = GsCtlCommon(expansion)
self.tempFileDir = "/tmp/gs_expansion_%s" % (currentTime)
self.logger.debug("tmp expansion dir is %s ." % self.tempFileDir)
def sendSoftToHosts(self):
"""
create the software directory and send the package to each node
"""
self.logger.debug("Start to send software to each standby nodes.\n")
hostNames = self.context.newHostList
hostList = hostNames
sshTool = SshTool(hostNames)
srcFile = self.context.packagepath
targetDir = os.path.realpath(
os.path.join(srcFile, "../"))
## mkdir package dir and send package to remote nodes.
sshTool.executeCommand("mkdir -p %s" % srcFile , "", DefaultValue.SUCCESS,
hostList)
sshTool.scpFiles(srcFile, targetDir, hostList)
## change mode of package dir to set privileges for users
tPathList = os.path.split(targetDir)
path2ChangeMode = targetDir
if len(tPathList) > 2:
path2ChangeMode = os.path.join(tPathList[0],tPathList[1])
changeModCmd = "chmod -R a+x {srcFile}".format(user=self.user,
group=self.group,srcFile=path2ChangeMode)
sshTool.executeCommand(changeModCmd, "", DefaultValue.SUCCESS,
hostList)
self.logger.debug("End to send software to each standby nodes.\n")
def generateAndSendXmlFile(self):
"""
"""
self.logger.debug("Start to generateAndSend XML file.\n")
tempXmlFile = "%s/clusterconfig.xml" % self.tempFileDir
cmd = "mkdir -p %s; touch %s; cat /dev/null > %s" % \
(self.tempFileDir, tempXmlFile, tempXmlFile)
(status, output) = subprocess.getstatusoutput(cmd)
cmd = "chown -R %s:%s %s" % (self.user, self.group, self.tempFileDir)
(status, output) = subprocess.getstatusoutput(cmd)
newHosts = self.context.newHostList
for host in newHosts:
# create single deploy xml file for each standby node
xmlContent = self.__generateXml(host)
fo = open("%s" % tempXmlFile, "w")
fo.write( xmlContent )
fo.close()
# send single deploy xml file to each standby node
sshTool = SshTool(host)
retmap, output = sshTool.getSshStatusOutput("mkdir -p %s" %
self.tempFileDir , [host], self.envFile)
retmap, output = sshTool.getSshStatusOutput("chown %s:%s %s" %
(self.user, self.group, self.tempFileDir), [host], self.envFile)
sshTool.scpFiles("%s" % tempXmlFile, "%s" %
tempXmlFile, [host], self.envFile)
self.logger.debug("End to generateAndSend XML file.\n")
def __generateXml(self, backIp):
"""
"""
nodeName = self.context.backIpNameMap[backIp]
nodeInfo = self.context.clusterInfoDict[nodeName]
backIp = nodeInfo["backIp"]
sshIp = nodeInfo["sshIp"]
port = nodeInfo["port"]
dataNode = nodeInfo["dataNode"]
appPath = self.context.clusterInfoDict["appPath"]
logPath = self.context.clusterInfoDict["logPath"]
corePath = self.context.clusterInfoDict["corePath"]
toolPath = self.context.clusterInfoDict["toolPath"]
xmlConfig = """\
<?xml version="1.0" encoding="UTF-8"?>
<ROOT>
<CLUSTER>
<PARAM name="clusterName" value="dbCluster" />
<PARAM name="nodeNames" value="{nodeName}" />
<PARAM name="backIp1s" value="{backIp}"/>
<PARAM name="gaussdbAppPath" value="{appPath}" />
<PARAM name="gaussdbLogPath" value="{logPath}" />
<PARAM name="gaussdbToolPath" value="{toolPath}" />
<PARAM name="corePath" value="{corePath}"/>
<PARAM name="clusterType" value="single-inst"/>
</CLUSTER>
<DEVICELIST>
<DEVICE sn="1000001">
<PARAM name="name" value="{nodeName}"/>
<PARAM name="azName" value="AZ1"/>
<PARAM name="azPriority" value="1"/>
<PARAM name="backIp1" value="{backIp}"/>
<PARAM name="sshIp1" value="{sshIp}"/>
<!--dbnode-->
<PARAM name="dataNum" value="1"/>
<PARAM name="dataPortBase" value="{port}"/>
<PARAM name="dataNode1" value="{dataNode}"/>
</DEVICE>
</DEVICELIST>
</ROOT>
""".format(nodeName=nodeName,backIp=backIp,appPath=appPath,
logPath=logPath,toolPath=toolPath,corePath=corePath,
sshIp=sshIp,port=port,dataNode=dataNode)
return xmlConfig
def changeUser(self):
user = self.user
try:
pw_record = pwd.getpwnam(user)
except Exception:
GaussLog.exitWithError(ErrorCode.GAUSS_503["GAUSS_50300"] % user)
user_name = pw_record.pw_name
user_uid = pw_record.pw_uid
user_gid = pw_record.pw_gid
env = os.environ.copy()
os.setgid(user_gid)
os.setuid(user_uid)
def initSshConnect(self, host, user='root'):
try:
getPwdStr = "Please enter the password of user [%s] on node [%s]: " \
% (user, host)
passwd = getpass.getpass(getPwdStr)
self.sshClient = paramiko.SSHClient()
self.sshClient.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.sshClient.connect(host, 22, user, passwd)
except paramiko.ssh_exception.AuthenticationException as e :
self.logger.log("Authentication failed.")
self.initSshConnect(host, user)
def installDatabaseOnHosts(self):
"""
install database on each standby node
"""
hostList = self.context.newHostList
envfile = DefaultValue.getEnv(DefaultValue.MPPRC_FILE_ENV)
tempXmlFile = "%s/clusterconfig.xml" % self.tempFileDir
installCmd = "source {envfile} ; gs_install -X {xmlfile} \
2>&1".format(envfile=envfile,xmlfile=tempXmlFile)
statusArr = []
for newHost in hostList:
self.logger.log("\ninstalling database on node %s:" % newHost)
self.logger.debug(installCmd)
hostName = self.context.backIpNameMap[newHost]
sshIp = self.context.clusterInfoDict[hostName]["sshIp"]
self.initSshConnect(sshIp, self.user)
stdin, stdout, stderr = self.sshClient.exec_command(installCmd,
get_pty=True)
channel = stdout.channel
echannel = stderr.channel
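# Stream the remote gs_install output: echo complete lines to the log,
# answer interactive yes/no prompts from stdin, and pass any other
# prompt (e.g. a password request) through getpass.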
while not channel.exit_status_ready():
try:
recvOut = channel.recv(1024)
outDecode = recvOut.decode("utf-8");
outStr = outDecode.strip()
if(len(outStr) == 0):
continue
if(outDecode.endswith("\r\n")):
self.logger.log(outStr)
else:
value = ""
if re.match(r".*yes.*no.*", outStr):
value = input(outStr)
while True:
# check the input
if (
value.upper() != "YES"
and value.upper() != "NO"
and value.upper() != "Y"
and value.upper() != "N"):
value = input("Please type 'yes' or 'no': ")
continue
break
else:
value = getpass.getpass(outStr)
stdin.channel.send("%s\r\n" %value)
stdin.flush()
stdout.flush()
except Exception as e:
sys.exit(1)
pass
if channel.exit_status_ready() and \
not channel.recv_stderr_ready() and \
not channel.recv_ready():
channel.close()
break
stdout.close()
stderr.close()
status = channel.recv_exit_status()
statusArr.append(status)
isBothSuccess = True
for status in statusArr:
if status != 0:
isBothSuccess = False
break
if isBothSuccess:
self.logger.log("\nSuccessfully install database on node %s" %
hostList)
else:
sys.exit(1)
def preInstallOnHosts(self):
"""
execute preinstall step
"""
self.logger.debug("Start to preinstall database step.\n")
newBackIps = self.context.newHostList
newHostNames = []
for host in newBackIps:
newHostNames.append(self.context.backIpNameMap[host])
envfile = self.envFile
tempXmlFile = "%s/clusterconfig.xml" % self.tempFileDir
preinstallCmd = "{softpath}/script/gs_preinstall -U {user} -G {group} \
-X {xmlfile} --sep-env-file={envfile} \
--non-interactive 2>&1\
".format(softpath=self.context.packagepath,user=self.user,
group=self.group,xmlfile=tempXmlFile,envfile=envfile)
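# Rendered, the command looks like (paths, user and group illustrative):
#   /opt/software/openGauss/script/gs_preinstall -U omm -G dbgrp \
#       -X /tmp/gs_expansion_<timestamp>/clusterconfig.xml \
#       --sep-env-file=/home/omm/env_file --non-interactive 2>&1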
sshTool = SshTool(newHostNames)
status, output = sshTool.getSshStatusOutput(preinstallCmd , [], envfile)
statusValues = status.values()
if STATUS_FAIL in statusValues:
GaussLog.exitWithError(output)
self.logger.debug("End to preinstall database step.\n")
def buildStandbyRelation(self):
"""
func: after installing a single-node database on each standby node,
build the primary-standby relationship among all nodes.
steps:
1. restart the primary node in Primary mode
(only needed for a Single-Node instance)
2. set guc config on the primary node
3. restart each standby node in Standby mode
4. set guc config on each standby node
5. generate the cluster static file and send it to each node.
"""
self.queryPrimaryClusterDetail()
self.setPrimaryGUCConfig()
self.setStandbyGUCConfig()
self.buildStandbyHosts()
self.generateClusterStaticFile()
def queryPrimaryClusterDetail(self):
"""
get current cluster type.
single-node or primary-standby
"""
self.logger.debug("Query primary database instance mode.\n")
self.isSingleNodeInstance = True
primaryHost = self.getPrimaryHostName()
result = self.commonGsCtl.queryOmCluster(primaryHost, self.envFile)
instance = re.findall(r"node\s+node_ip\s+instance\s+state", result)
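# Heuristic: the "node node_ip instance state" header is counted in the
# gs_om status output; more than one occurrence is treated as an
# existing primary/standby deployment rather than a single node.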
if len(instance) > 1:
self.isSingleNodeInstance = False
self.logger.debug("Original instance mode is %s" %
self.isSingleNodeInstance)
def setPrimaryGUCConfig(self):
"""
"""
self.logger.debug("Start to set primary node GUC config.\n")
primaryHost = self.getPrimaryHostName()
dataNode = self.context.clusterInfoDict[primaryHost]["dataNode"]
self.setGUCOnClusterHosts([primaryHost])
self.addStandbyIpInPrimaryConf()
insType, dbStat = self.commonGsCtl.queryInstanceStatus(primaryHost,
dataNode,self.envFile)
if insType != MODE_PRIMARY:
self.commonGsCtl.stopInstance(primaryHost, dataNode, self.envFile)
self.commonGsCtl.startInstanceWithMode(primaryHost, dataNode,
MODE_PRIMARY,self.envFile)
# start db to primary state for three times max
start_retry_num = 1
while start_retry_num <= 3:
insType, dbStat = self.commonGsCtl.queryInstanceStatus(primaryHost,
dataNode, self.envFile)
if insType == MODE_PRIMARY:
break
self.logger.debug("Start database as Primary mode failed, \
retry for %s times" % start_retry_num)
self.commonGsCtl.startInstanceWithMode(primaryHost, dataNode,
MODE_PRIMARY, self.envFile)
start_retry_num = start_retry_num + 1
def setStandbyGUCConfig(self):
"""
"""
self.logger.debug("Start to set standby node GUC config.\n")
standbyHosts = self.context.newHostList
standbyHostNames = []
for host in standbyHosts:
hostName = self.context.backIpNameMap[host]
standbyHostNames.append(hostName)
self.setGUCOnClusterHosts(standbyHostNames)
def addStandbyIpInPrimaryConf(self):
"""
add the standby host IPs to the primary node's pg_hba.conf
"""
standbyHosts = self.context.newHostList
primaryHost = self.getPrimaryHostName()
command = ''
for host in standbyHosts:
hostName = self.context.backIpNameMap[host]
dataNode = self.context.clusterInfoDict[hostName]["dataNode"]
command += "gs_guc set -D %s -h 'host all all %s/32 \
trust';" % (dataNode, host)
self.logger.debug(command)
sshTool = SshTool([primaryHost])
resultMap, outputCollect = sshTool.getSshStatusOutput(command,
[primaryHost], self.envFile)
self.logger.debug(outputCollect)
def reloadPrimaryConf(self):
"""
"""
primaryHost = self.getPrimaryHostName()
dataNode = self.context.clusterInfoDict[primaryHost]["dataNode"]
command = "gs_ctl reload -D %s " % dataNode
sshTool = SshTool([primaryHost])
self.logger.debug(command)
resultMap, outputCollect = sshTool.getSshStatusOutput(command,
[primaryHost], self.envFile)
self.logger.debug(outputCollect)
def getPrimaryHostName(self):
"""
"""
primaryHost = ""
for nodeName in self.context.nodeNameList:
if self.context.clusterInfoDict[nodeName]["instanceType"] \
== MASTER_INSTANCE:
primaryHost = nodeName
break
return primaryHost
def buildStandbyHosts(self):
"""
stop each new standby host's database and build it in standby mode
"""
self.logger.debug("start to build standby node...\n")
standbyHosts = self.context.newHostList
for host in standbyHosts:
hostName = self.context.backIpNameMap[host]
dataNode = self.context.clusterInfoDict[hostName]["dataNode"]
self.commonGsCtl.stopInstance(hostName, dataNode, self.envFile)
self.commonGsCtl.startInstanceWithMode(hostName, dataNode,
MODE_STANDBY, self.envFile)
# start standby as standby mode for three times max.
start_retry_num = 1
while start_retry_num <= 3:
insType, dbStat = self.commonGsCtl.queryInstanceStatus(hostName,
dataNode, self.envFile)
if insType != MODE_STANDBY:
self.logger.debug("Start databasse as Standby mode failed, \
retry for %s times" % start_retry_num)
self.setGUCOnClusterHosts([])
self.addStandbyIpInPrimaryConf()
self.reloadPrimaryConf()
self.commonGsCtl.startInstanceWithMode(hostName, dataNode,
MODE_STANDBY, self.envFile)
start_retry_num = start_retry_num + 1
else:
break
# build standby node
self.addStandbyIpInPrimaryConf()
self.reloadPrimaryConf()
self.commonGsCtl.buildInstance(hostName, dataNode, MODE_STANDBY,
self.envFile)
# if the build failed the first time, retry up to three times.
start_retry_num = 1
while start_retry_num <= 3:
insType, dbStat = self.commonGsCtl.queryInstanceStatus(hostName,
dataNode, self.envFile)
if dbStat != STAT_NORMAL:
self.logger.debug("Build standby instance failed, \
retry for %s times" % start_retry_num)
self.addStandbyIpInPrimaryConf()
self.reloadPrimaryConf()
self.commonGsCtl.buildInstance(hostName, dataNode,
MODE_STANDBY, self.envFile)
start_retry_num = start_retry_num + 1
else:
break
def generateClusterStaticFile(self):
"""
generate static_config_files and send to all hosts
"""
self.logger.debug("Start to generate and send cluster static file.\n")
primaryHosts = self.getPrimaryHostName()
command = "gs_om -t generateconf -X %s" % self.context.xmlFile
sshTool = SshTool([primaryHosts])
resultMap, outputCollect = sshTool.getSshStatusOutput(command,
[primaryHosts], self.envFile)
self.logger.debug(outputCollect)
nodeNameList = self.context.nodeNameList
for hostName in nodeNameList:
hostSsh = SshTool([hostName])
toolPath = self.context.clusterInfoDict["toolPath"]
appPath = self.context.clusterInfoDict["appPath"]
srcFile = "%s/script/static_config_files/cluster_static_config_%s" \
% (toolPath, hostName)
targetFile = "%s/bin/cluster_static_config" % appPath
hostSsh.scpFiles(srcFile, targetFile, [hostName], self.envFile)
self.logger.debug("End to generate and send cluster static file.\n")
time.sleep(10)
# A single-node deployment needs to restart the cluster after expansion
if self.isSingleNodeInstance:
self.logger.debug("Single-Node instance needs restart.\n")
self.commonGsCtl.queryOmCluster(primaryHosts, self.envFile)
# if the primary database is not normal, restart it
primaryHost = self.getPrimaryHostName()
dataNode = self.context.clusterInfoDict[primaryHost]["dataNode"]
insType, dbStat = self.commonGsCtl.queryInstanceStatus(primaryHost,
dataNode, self.envFile)
if insType != MODE_PRIMARY:
self.commonGsCtl.startInstanceWithMode(primaryHost, dataNode,
MODE_PRIMARY, self.envFile)
# if the db state is not normal, rebuild the standby database
standbyHosts = self.context.newHostList
for host in standbyHosts:
hostName = self.context.backIpNameMap[host]
dataNode = self.context.clusterInfoDict[hostName]["dataNode"]
insType, dbStat = self.commonGsCtl.queryInstanceStatus(hostName,
dataNode, self.envFile)
if dbStat != STAT_NORMAL:
self.commonGsCtl.buildInstance(hostName, dataNode,
MODE_STANDBY, self.envFile)
self.commonGsCtl.startOmCluster(primaryHosts, self.envFile)
def setGUCOnClusterHosts(self, hostNames=[]):
"""
guc config on all hosts
"""
gucDict = self.getGUCConfig()
tempShFile = "%s/guc.sh" % self.tempFileDir
if len(hostNames) == 0:
hostNames = self.context.nodeNameList
for host in hostNames:
command = "source %s ; " % self.envFile + gucDict[host]
self.logger.debug(command)
sshTool = SshTool([host])
# create temporary dir to save guc command bashfile.
mkdirCmd = "mkdir -m a+x -p %s; chown %s:%s %s" % \
(self.tempFileDir,self.user,self.group,self.tempFileDir)
retmap, output = sshTool.getSshStatusOutput(mkdirCmd, [host], self.envFile)
subprocess.getstatusoutput("mkdir -m a+x -p %s; touch %s; \
cat /dev/null > %s" % \
(self.tempFileDir, tempShFile, tempShFile))
fo = open("%s" % tempShFile, "w")
fo.write("#bash\n")
fo.write( command )
fo.close()
# send guc command bashfile to each host and execute it.
sshTool.scpFiles("%s" % tempShFile, "%s" % tempShFile, [host],
self.envFile)
resultMap, outputCollect = sshTool.getSshStatusOutput("sh %s" % \
tempShFile, [host], self.envFile)
self.logger.debug(outputCollect)
def getGUCConfig(self):
"""
get guc config of each node:
replconninfo[index]
remote_read_mode
replication_type
"""
nodeDict = self.context.clusterInfoDict
hostNames = self.context.nodeNameList
gucDict = {}
for hostName in hostNames:
localeHostInfo = nodeDict[hostName]
index = 1
guc_tempate_str = ""
for remoteHost in hostNames:
if(remoteHost == hostName):
continue
remoteHostInfo = nodeDict[remoteHost]
guc_repl_template = """\
gs_guc set -D {dn} -c "replconninfo{index}=\
'localhost={localhost} localport={localport} \
localheartbeatport={localeHeartPort} \
localservice={localservice} \
remotehost={remoteNode} \
remoteport={remotePort} \
remoteheartbeatport={remoteHeartPort} \
remoteservice={remoteservice}'"
""".format(dn=localeHostInfo["dataNode"],
index=index,
localhost=localeHostInfo["sshIp"],
localport=localeHostInfo["localport"],
localeHeartPort=localeHostInfo["heartBeatPort"],
localservice=localeHostInfo["localservice"],
remoteNode=remoteHostInfo["sshIp"],
remotePort=remoteHostInfo["localport"],
remoteHeartPort=remoteHostInfo["heartBeatPort"],
remoteservice=remoteHostInfo["localservice"])
guc_tempate_str += guc_repl_template
index += 1
guc_mode_type = """
gs_guc set -D {dn} -c 'remote_read_mode=off';
gs_guc set -D {dn} -c 'replication_type=1';
""".format(dn=localeHostInfo["dataNode"])
guc_tempate_str += guc_mode_type
gucDict[hostName] = guc_tempate_str
return gucDict
def checkLocalModeOnStandbyHosts(self):
"""
"""
standbyHosts = self.context.newHostList
self.logger.log("Checking the database with locale mode.")
for host in standbyHosts:
hostName = self.context.backIpNameMap[host]
dataNode = self.context.clusterInfoDict[hostName]["dataNode"]
insType, dbStat = self.commonGsCtl.queryInstanceStatus(hostName,
dataNode, self.envFile)
if insType not in (MODE_PRIMARY, MODE_STANDBY, MODE_NORMAL):
GaussLog.exitWithError(ErrorCode.GAUSS_357["GAUSS_35703"] %
(hostName, self.user, dataNode, dataNode))
self.logger.log("Successfully checked the database with locale mode.")
def preInstall(self):
"""
preinstall on new hosts.
"""
self.logger.log("Start to preinstall database on the new \
standby nodes.")
self.sendSoftToHosts()
self.generateAndSendXmlFile()
self.preInstallOnHosts()
self.logger.log("Successfully preinstall database on the new \
standby nodes.")
def clearTmpFile(self):
"""
clear temporary file after expansion success
"""
self.logger.debug("start to delete temporary file")
hostNames = self.context.nodeNameList
sshTool = SshTool(hostNames)
clearCmd = "source %s ; rm -rf %s" % (self.envFile, self.tempFileDir)
result, output = sshTool.getSshStatusOutput(clearCmd,
hostNames, self.envFile)
self.logger.debug(output)
def installAndExpansion(self):
"""
install the database and expand the standby nodes as the db om user
"""
pvalue = Value('i', 0)
proc = Process(target=self.installProcess, args=(pvalue,))
proc.start()
proc.join()
if not pvalue.value:
sys.exit(1)
else:
proc.terminate()
def installProcess(self, pvalue):
# change to db manager user. the below steps run with db manager user.
self.changeUser()
if not self.context.standbyLocalMode:
self.logger.log("\nStart to install database on the new \
standby nodes.")
self.installDatabaseOnHosts()
else:
self.logger.log("\nStandby nodes is installed with locale mode.")
self.checkLocalModeOnStandbyHosts()
self.logger.log("\nDatabase on standby nodes installed finished. \
Start to establish the primary-standby relationship.")
self.buildStandbyRelation()
# process success
pvalue.value = 1
def run(self):
"""
start expansion
"""
# preinstall on standby nodes with root user.
if not self.context.standbyLocalMode:
self.preInstall()
self.installAndExpansion()
self.clearTmpFile()
self.logger.log("\nSuccess to expansion standby nodes.")
class GsCtlCommon:
def __init__(self, expansion):
"""
"""
self.logger = expansion.logger
def queryInstanceStatus(self, host, datanode, env):
"""
"""
command = "source %s ; gs_ctl query -D %s" % (env, datanode)
sshTool = SshTool([datanode])
resultMap, outputCollect = sshTool.getSshStatusOutput(command,
[host], env)
self.logger.debug(outputCollect)
localRole = re.findall(r"local_role.*: (.*?)\n", outputCollect)
db_state = re.findall(r"db_state.*: (.*?)\n", outputCollect)
insType = ""
if(len(localRole)) == 0:
insType = ""
else:
insType = localRole[0]
dbStatus = ""
if(len(db_state)) == 0:
dbStatus = ""
else:
dbStatus = db_state[0]
return insType.strip().lower(), dbStatus.strip().lower()
def stopInstance(self, host, datanode, env):
"""
"""
command = "source %s ; gs_ctl stop -D %s" % (env, datanode)
sshTool = SshTool([host])
resultMap, outputCollect = sshTool.getSshStatusOutput(command,
[host], env)
self.logger.debug(host)
self.logger.debug(outputCollect)
def startInstanceWithMode(self, host, datanode, mode, env):
"""
"""
command = "source %s ; gs_ctl start -D %s -M %s" % (env, datanode, mode)
self.logger.debug(command)
sshTool = SshTool([host])
resultMap, outputCollect = sshTool.getSshStatusOutput(command,
[host], env)
self.logger.debug(host)
self.logger.debug(outputCollect)
def buildInstance(self, host, datanode, mode, env):
command = "source %s ; gs_ctl build -D %s -M %s" % (env, datanode, mode)
self.logger.debug(command)
sshTool = SshTool([host])
resultMap, outputCollect = sshTool.getSshStatusOutput(command,
[host], env)
self.logger.debug(host)
self.logger.debug(outputCollect)
def startOmCluster(self, host, env):
"""
om tool start cluster
"""
command = "source %s ; gs_om -t start" % env
self.logger.debug(command)
sshTool = SshTool([host])
resultMap, outputCollect = sshTool.getSshStatusOutput(command,
[host], env)
self.logger.debug(host)
self.logger.debug(outputCollect)
def queryOmCluster(self, host, env):
"""
query om cluster detail with command:
gs_om -t status --detail
"""
command = "source %s ; gs_om -t status --detail" % env
sshTool = SshTool([host])
resultMap, outputCollect = sshTool.getSshStatusOutput(command,
[host], env)
self.logger.debug(host)
self.logger.debug(outputCollect)
return outputCollect