!461 解决删除备节点需求测试发现的一些问题

Merge pull request !461 from cchen676/test2
This commit is contained in:
opengauss-bot
2020-12-09 11:46:14 +08:00
committed by Gitee
2 changed files with 102 additions and 109 deletions

View File

@ -24,6 +24,8 @@ import os
import re
import subprocess
import sys
import pwd
import grp
sys.path.append(sys.path[0])
from gspylib.common.DbClusterInfo import dbClusterInfo
@ -35,6 +37,7 @@ from gspylib.common.ParallelBaseOM import ParallelBaseOM
from gspylib.common.ParameterParsecheck import Parameter
from gspylib.threads.SshTool import SshTool
from impl.dropnode.DropnodeImpl import DropnodeImpl
from gspylib.inspection.common.Exception import CheckException
ENV_LIST = ["MPPDB_ENV_SEPARATE_PATH", "GPHOME", "PATH",
"LD_LIBRARY_PATH", "PYTHONPATH", "GAUSS_WARNING_TYPE",
@ -62,9 +65,12 @@ class Dropnode(ParallelBaseOM):
envFile = DefaultValue.getEnv("MPPDB_ENV_SEPARATE_PATH")
if envFile:
self.envFile = envFile
self.userProfile = envFile
else:
self.envFile = "/etc/profile"
self.userProfile = ''
cmd = "echo ~%s" % self.user
(status, output) = subprocess.getstatusoutput(cmd)
self.userProfile = os.path.join(output, ".bashrc")
def usage(self):
"""
@ -126,6 +132,18 @@ General options:
# get dbcluster info from static config file
self.clusterInfo.initFromStaticConfig(self.user)
appPath = self.clusterInfo.appPath
db_uid = os.stat(appPath).st_uid
db_gid = os.stat(appPath).st_gid
try:
pw_user = pwd.getpwnam(self.user)
gr_group = grp.getgrnam(self.group)
except CheckException:
GaussLog.exitWithError(
ErrorCode.GAUSS_503["GAUSS_50300"] % self.user)
if db_uid != pw_user.pw_uid or db_gid != gr_group.gr_gid:
GaussLog.exitWithError(
ErrorCode.GAUSS_503["GAUSS_50323"] % self.user)
self.backIpNameMap = {}
for node in self.clusterInfo.dbNodes:
self.backIpNameMap[node.name] = node.backIps[0]
@ -162,15 +180,16 @@ General options:
# check the node ip is the IP of the current server
localIp = self.backIpNameMap[DefaultValue.GetHostIpOrName()]
for dndir_loop in \
self.hostMapForExist[DefaultValue.GetHostIpOrName()]['datadir']:
cmd = "gs_ctl query -D %s|grep '\<local_role\>'| " \
"awk -F ':' '{print $2}'" % dndir_loop
(status, output) = subprocess.getstatusoutput(cmd)
if 'Primary' not in output:
GaussLog.exitWithError(ErrorCode.GAUSS_358["GAUSS_35804"])
if localIp in self.hostIpListForDel:
GaussLog.exitWithError(ErrorCode.GAUSS_358["GAUSS_35803"] % \
localIp)
localNode = self.clusterInfo.getDbNodeByName(
DefaultValue.GetHostIpOrName())
localInstanceType = self.clusterInfo.getdataNodeInstanceType(
localNode.id)
if localInstanceType:
GaussLog.exitWithError(ErrorCode.GAUSS_358["GAUSS_35804"])
def check_repeat_process(self):
"""
@ -202,15 +221,12 @@ General options:
flag = input("Please type 'yes' or 'no': ")
continue
break
if count_f == 0:
GaussLog.exitWithError(
ErrorCode.GAUSS_358["GAUSS_35805"] % flag.upper())
if flag.upper() == "NO" or flag.upper() == "N":
if flag.upper() != "YES" and flag.upper() != "Y":
GaussLog.exitWithError(
ErrorCode.GAUSS_358["GAUSS_35805"] % flag.upper())
self.flagOnlyPrimary = True
def checkClusterStatus(self):
def check_cluster_status(self):
"""
function: Check whether the status of cluster is normal
input: NA
@ -220,7 +236,7 @@ General options:
tmpFile = os.path.join(tmpDir, "gauss_cluster_status.dat_" + \
str(datetime.datetime.now().strftime(
'%Y%m%d%H%M%S')) + "_" + str(os.getpid()))
cmd = ClusterCommand.getQueryStatusCmd(self.user, "", tmpFile, True)
cmd = ClusterCommand.getQueryStatusCmd(self.user, "", tmpFile, False)
(status, output) = subprocess.getstatusoutput(cmd)
if status != 0:
self.logger.debug("The cmd is %s " % cmd)
@ -231,25 +247,15 @@ General options:
clusterStatus = DbClusterStatus()
clusterStatus.initFromFile(tmpFile)
clsStatus = clusterStatus.clusterStatus
statusDelHost = " The target node to be dropped is %s \n" % str(
clsStatus = clusterStatus.clusterStatusDetail
statusDelHost = "The target node to be dropped is %s \n" % str(
self.hostMapForDel.keys())[9:]
if clsStatus in ["Unknown", "Unavailable"]:
GaussLog.exitWithError(
ErrorCode.GAUSS_358["GAUSS_35806"] % clsStatus)
for dbNode in clusterStatus.dbNodes:
if dbNode.name in self.hostMapForDel.keys():
if dbNode.isNodeHealthy():
statusDelHost += "The status of %s is %s \n" \
% (dbNode.name,
DbClusterStatus.OM_NODE_STATUS_NORMAL)
else:
statusDelHost += "The status of %s is %s \n" \
% (dbNode.name,
DbClusterStatus.OM_NODE_STATUS_ABNORMAL)
flag = input(
statusDelHost + "\n \
Do you want to continue to drop the target node (yes/no)? ")
statusDelHost + "Do you want to continue "
"to drop the target node (yes/no)? ")
count_f = 2
while count_f:
if (
@ -260,10 +266,7 @@ General options:
flag = input("Please type 'yes' or 'no': ")
continue
break
if count_f == 0:
GaussLog.exitWithError(
ErrorCode.GAUSS_358["GAUSS_35805"] % flag.upper())
if flag.upper() == "NO" or flag.upper() == "N":
if flag.upper() != "YES" and flag.upper() != "Y":
GaussLog.exitWithError(
ErrorCode.GAUSS_358["GAUSS_35805"] % flag.upper())
@ -278,9 +281,10 @@ General options:
resultMap, outputCollect = sshTool.getSshStatusOutput(command,
hostnames, env)
self.logger.debug(outputCollect)
self.failureHosts = re.findall(r"\[FAILURE\] .*:.*\n", outputCollect)
for host in self.failureHosts:
if host in self.hostMapForExist.keys():
self.failureHosts = '.'.join(re.findall(r"\[FAILURE\] .*:.*\n",
outputCollect))
for host in list(self.hostMapForExist.keys()):
if host in self.failureHosts:
GaussLog.exitWithError(
ErrorCode.GAUSS_358["GAUSS_35807"] % host)
@ -288,9 +292,6 @@ General options:
"""
init log file
"""
cmd = "echo ~%s" % self.user
(status, output) = subprocess.getstatusoutput(cmd)
self.userProfile = os.path.join(output, ".bashrc")
if not os.path.isfile(self.userProfile):
raise Exception(
ErrorCode.GAUSS_502["GAUSS_50210"] % self.userProfile)
@ -307,11 +308,15 @@ General options:
if __name__ == "__main__":
# check if user is root
if (os.getuid() == 0):
GaussLog.exitWithError(ErrorCode.GAUSS_501["GAUSS_50105"])
dropNode = Dropnode()
dropNode.parseCommandLine()
dropNode.initLogs()
dropNode.check_repeat_process()
dropNode.checkParameters()
dropNode.check_cluster_status()
dropNode.flagForOnlyPrimaryLeft()
dropNode.checkConnection(list(dropNode.backIpNameMap.keys()),
dropNode.envFile)

View File

@ -34,7 +34,8 @@ from gspylib.common.ErrorCode import ErrorCode
from gspylib.common.Common import DefaultValue, ClusterCommand
from gspylib.common.GaussLog import GaussLog
from gspylib.inspection.common.SharedFuncs import cleanFile
from gspylib.inspection.common.Exception import SQLCommandException
from gspylib.inspection.common.Exception import CheckException, \
SQLCommandException
sys.path.append(sys.path[0] + "/../../../lib/")
DefaultValue.doConfigForParamiko()
@ -95,9 +96,9 @@ class DropnodeImpl():
(status, output) = subprocess.getstatusoutput("which gs_om")
if "no gs_om in" in output:
raise Exception(ErrorCode.GAUSS_518["GAUSS_51800"] % "$GPHOME")
self.gphomepath = os.path.normpath(output.replace("/script/gs_om",""))
self.gphomepath = os.path.normpath(output.replace("/gs_om", ""))
self.appPath = self.context.clusterInfo.appPath
self.gsqlPath = "source %s;%s/bin/gsql" % (self.userProfile, self.appPath)
self.gsql_path = "source %s;%s/bin/gsql" % (self.userProfile, self.appPath)
currentTime = str(datetime.datetime.now()).replace(" ", "_").replace(
".", "_")
@ -106,17 +107,15 @@ class DropnodeImpl():
self.dnIdForDel += self.context.hostMapForDel[hostDelName]['dn_id']
self.commonOper = OperCommon(dropnode)
def changeUser(self):
def change_user(self):
if os.getuid() == 0:
user = self.user
try:
pw_record = pwd.getpwnam(user)
except Exception:
except CheckException:
GaussLog.exitWithError(ErrorCode.GAUSS_503["GAUSS_50300"] % user)
user_name = pw_record.pw_name
user_uid = pw_record.pw_uid
user_gid = pw_record.pw_gid
env = os.environ.copy()
os.setgid(user_gid)
os.setuid(user_uid)
@ -124,32 +123,27 @@ class DropnodeImpl():
"""
check all standby state whether switchover is happening
"""
for hostNameLoop in self.context.hostMapForExist.keys():
sshtool_host = SshTool([hostNameLoop])
for i in self.context.hostMapForExist[hostNameLoop]['datadir']:
# check whether switchover/failover is happening
self.commonOper.checkStandbyState(hostNameLoop, i,
sshtool_host,
self.userProfile)
self.cleanSshToolFile(sshtool_host)
for hostNameLoop in self.context.hostMapForDel.keys():
if hostNameLoop not in self.context.failureHosts:
sshtool_host = SshTool([hostNameLoop])
for i in self.context.hostMapForDel[hostNameLoop]['datadir']:
# check whether switchover/failover is happening
if not self.commonOper.checkStandbyState(hostNameLoop, i,
sshtool_host,
self.userProfile, True):
GaussLog.exitWithError(
ErrorCode.GAUSS_358["GAUSS_35808"] % hostNameLoop)
self.commonOper.checkStandbyState(hostNameLoop, i,
sshtool_host,
self.userProfile, True)
self.commonOper.stopInstance(hostNameLoop, sshtool_host, i,
self.userProfile)
self.cleanSshToolFile(sshtool_host)
for hostNameLoop in self.context.hostMapForExist.keys():
sshtool_host = SshTool([hostNameLoop])
for i in self.context.hostMapForExist[hostNameLoop]['datadir']:
# check whether switchover/failover is happening
if not self.commonOper.checkStandbyState(hostNameLoop, i,
sshtool_host,
self.userProfile):
GaussLog.exitWithError(
ErrorCode.GAUSS_358["GAUSS_35808"] % hostNameLoop)
self.cleanSshToolFile(sshtool_host)
def dropNodeOnAllHosts(self):
"""
drop the target node on the other host
@ -205,22 +199,22 @@ class DropnodeImpl():
resultDictForRollback[
'rollbackReplStr'])
try:
replSlot = self.commonOper.GetReplSlot(hostNameLoop,
sshtool_host, self.userProfile, self.gsqlPath,
repl_slot = self.commonOper.get_repl_slot(hostNameLoop,
sshtool_host, self.userProfile, self.gsql_path,
self.context.hostMapForExist[hostNameLoop]['port'][
indexForuse])
self.commonOper.SetReplSlot(hostNameLoop, sshtool_host,
self.userProfile, self.gsqlPath,
self.userProfile, self.gsql_path,
self.context.hostMapForExist[
hostNameLoop]['port'][indexForuse
], self.dnIdForDel, replSlot)
], self.dnIdForDel, repl_slot)
except ValueError:
self.logger.log("[gs_dropnode]Rollback replslot")
self.commonOper.SetReplSlot(hostNameLoop, sshtool_host,
self.userProfile, self.gsqlPath,
self.userProfile, self.gsql_path,
self.context.hostMapForExist[
hostNameLoop]['port'][indexForuse
], self.dnIdForDel, replSlot, True)
], self.dnIdForDel, repl_slot, True)
indexForuse += 1
self.cleanSshToolFile(sshtool_host)
@ -228,20 +222,18 @@ class DropnodeImpl():
"""
operation only need to be executed on primary node
"""
LocalhostName = self.localhostname
sshtool_host = SshTool([LocalhostName])
try:
self.commonOper.SetPghbaConf(self.userProfile, LocalhostName,
self.resultDictOfPrimary[0][
'pghbaStr'], False,
self.context.flagOnlyPrimary)
except ValueError:
self.logger.log("[gs_dropnode]Rollback pghba conf.")
self.commonOper.SetPghbaConf(self.userProfile, LocalhostName,
self.resultDictOfPrimary[0][
'pghbaStr'], True,
self.context.flagOnlyPrimary)
self.cleanSshToolFile(sshtool_host)
for hostNameLoop in self.context.hostMapForExist.keys():
sshtool_host = SshTool([hostNameLoop])
try:
self.commonOper.SetPghbaConf(self.userProfile, hostNameLoop,
self.resultDictOfPrimary[0][
'pghbaStr'], False)
except ValueError:
self.logger.log("[gs_dropnode]Rollback pghba conf.")
self.commonOper.SetPghbaConf(self.userProfile, hostNameLoop,
self.resultDictOfPrimary[0][
'pghbaStr'], True)
self.cleanSshToolFile(sshtool_host)
def modifyStaticConf(self):
"""
@ -281,7 +273,6 @@ class DropnodeImpl():
sshtool = SshTool(self.context.clusterInfo.getClusterNodeNames())
cmd = "%s/script/gs_om -t refreshconf" % self.gphomepath
(status, output) = subprocess.getstatusoutput(cmd)
self.logger.debug("[gs_dropnode]Output of refresh dynamic conf :%s." % output)
for hostName in self.context.hostMapForExist.keys():
hostSsh = SshTool([hostName])
if hostName != self.localhostname:
@ -385,10 +376,7 @@ class DropnodeImpl():
flag = input("Please type 'yes' or 'no': ")
continue
break
if count_f == 0:
GaussLog.exitWithError(
ErrorCode.GAUSS_358["GAUSS_35805"] % flag.upper())
if flag.upper() == "NO" or flag.upper() == "N":
if flag.upper() != "YES" and flag.upper() != "Y":
GaussLog.exitWithError(
ErrorCode.GAUSS_358["GAUSS_35805"] % flag.upper())
sshTool = SshTool([self.localhostname])
@ -404,7 +392,7 @@ class DropnodeImpl():
"""
start dropnode
"""
self.changeUser()
self.change_user()
self.checkUserAndGroupExists()
self.logger.log("[gs_dropnode]Start to drop nodes of the cluster.")
self.checkAllStandbyState()
@ -432,14 +420,16 @@ class OperCommon:
(statusMap, output) = sshTool.getSshStatusOutput(sshcmd, [host],
envfile)
if 'Is server running?' in output and not isForDel:
return False
if 'Is server running?' in output and isForDel:
return True
res = re.findall(r'db_state\s*:\s*(\w+)', output)
dbState = res[0]
if dbState in ['Promoting', 'Wait', 'Demoting']:
return False
return True
GaussLog.exitWithError(ErrorCode.GAUSS_516["GAUSS_51651"] % host)
elif 'Is server running?' in output and isForDel:
return
else:
res = re.findall(r'db_state\s*:\s*(\w+)', output)
if not len(res):
GaussLog.exitWithError(ErrorCode.GAUSS_516["GAUSS_51651"] % host)
dbState = res[0]
if dbState in ['Promoting', 'Wait', 'Demoting']:
GaussLog.exitWithError(ErrorCode.GAUSS_358["GAUSS_35808"] % host)
def backupConf(self, appPath, user, host, envfile, sshTool):
"""
@ -450,7 +440,7 @@ class OperCommon:
"[gs_dropnode]Start to backup parameter config file on %s." % host)
tmpPath = '/tmp/gs_dropnode_backup' + \
str(datetime.datetime.now().strftime('%Y%m%d%H%M%S'))
backupPyPath = os.path.join(appPath, '../om/script/local/Backup.py')
backupPyPath = os.path.join(appPath, './script/local/Backup.py')
cmd = "(find /tmp -type d | grep gs_dropnode_backup | xargs rm -rf;" \
"if [ ! -d '%s' ]; then mkdir -p '%s' -m %s;fi)" \
% (tmpPath, tmpPath, DefaultValue.KEY_DIRECTORY_MODE)
@ -533,6 +523,7 @@ class OperCommon:
list_output1 = output_dn_nospace.split(',')
list_output1.remove(dninst)
list_output1 = ','.join(list_output1)
output_dn_nospace = list_output1
init_no -= 1
count_dn += 1
if count_dn == 0 or list_output1 == '':
@ -613,7 +604,7 @@ class OperCommon:
sqlvalue += "ALTER SYSTEM SET synchronous_standby_names = '%s';" \
% syncStandbyValue
if singleLeft:
sqlvalue += "ALTER SYSTEM SET synchronous_standby_names = '';"
sqlvalue += "ALTER SYSTEM SET synchronous_standby_names = '*';"
if sqlvalue != '':
cmd = "touch %s && chmod %s %s" % \
(sqlExecFile, DefaultValue.MAX_DIRECTORY_MODE, sqlExecFile)
@ -655,26 +646,22 @@ class OperCommon:
"[gs_dropnode]End of set postgresql config file on %s." % host)
def SetPghbaConf(self, envProfile, host, pgHbaValue,
flagRollback=False, flagPrimayOnly=False):
flagRollback=False):
"""
Set the value of pg_hba.conf
"""
self.logger.log(
"[gs_dropnode]Start of set pg_hba config file on %s." % host)
cmd = 'source %s;' % envProfile
if not flagPrimayOnly:
hostPara = 'all'
if flagPrimayOnly:
hostPara = host
if len(pgHbaValue):
if not flagRollback:
for i in pgHbaValue[:-1].split('|'):
v = i[0:i.find('/32') + 3]
cmd += "gs_guc set -N %s -I all -h '%s';" % (hostPara, v)
cmd += "gs_guc set -N %s -I all -h '%s';" % (host, v)
if flagRollback:
for i in pgHbaValue[:-1].split('|'):
cmd += "gs_guc set -N %s -I all -h '%s';" \
% (hostPara, i.strip())
% (host, i.strip())
(status, output) = subprocess.getstatusoutput(cmd)
result_v = re.findall(r'Failed instances: (\d)\.', output)
if status:
@ -696,33 +683,34 @@ class OperCommon:
self.logger.log(
"[gs_dropnode]End of set pg_hba config file on %s." % host)
def GetReplSlot(self, host, sshTool, envfile, gsqlPath, port):
def get_repl_slot(self, host, ssh_tool, envfile, gsql_path, port):
"""
Get the replication slot on primary node only
"""
self.logger.log("[gs_dropnode]Start to get repl slot on primary node.")
selectSQL = "SELECT slot_name,plugin,slot_type FROM pg_replication_slots;"
querycmd = "%s -p %s postgres -A -t -c '%s'" % (gsqlPath, port, selectSQL)
(status, output) = sshTool.getSshStatusOutput(querycmd, [host], envfile)
querycmd = "%s -p %s postgres -A -t -c '%s'" % (gsql_path, port, selectSQL)
(status, output) = ssh_tool.getSshStatusOutput(querycmd, [host], envfile)
if status[host] != 'Success' or "ERROR" in output:
self.logger.debug(
"[gs_dropnode]Get repl slot failed:" + output)
GaussLog.exitWithError(ErrorCode.GAUSS_358["GAUSS_35809"])
return ','.join(output.split('\n')[1:])
def SetReplSlot(self, host, sshTool, envfile, gsqlPath, port, dnid, replslot_output, flagRollback=False):
def SetReplSlot(self, host, sshTool, envfile, gsqlPath, port, dnid,
replslot_output, flag_rollback=False):
"""
Drop the replication slot on primary node only
"""
self.logger.log("[gs_dropnode]Start to set repl slot on primary node.")
setcmd = ''
if not flagRollback:
if not flag_rollback:
for i in dnid:
if i in replslot_output:
setcmd += "%s -p %s postgres -A -t -c \\\"SELECT pg_drop_" \
"replication_slot('%s');\\\";" % \
(gsqlPath, port, i)
if flagRollback:
if flag_rollback:
list_o = [i.split('|') for i in replslot_output.split(',')]
for r in list_o:
if r[0] in dnid and r[2] == 'physical':