!780 修复资源池化集群节点down收集xlog失败

Merge pull request !780 from 张悦萌/xlog_root
This commit is contained in:
opengauss_bot 2024-07-30 09:49:25 +00:00 committed by Gitee
commit d103858dd2
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
2 changed files with 74 additions and 20 deletions

View File

@ -506,7 +506,10 @@ class ExpansionImplWithCm(ExpansionImpl):
"""
dss_home = self.get_dss_env_root("DSS_HOME")
dss_inst = dss_home + '/cfg/dss_inst.ini'
get_list_cmd = f"su - {self.user} -c 'cat {dss_inst} | grep DSS_NODES_LIST'"
if os.getuid() == 0:
get_list_cmd = f"su - {self.user} -c 'cat {dss_inst} | grep DSS_NODES_LIST'"
else:
get_list_cmd = f'cat {dss_inst} | grep DSS_NODES_LIST'
status, output = subprocess.getstatusoutput(get_list_cmd)
if status != 0:
self.logger.debug("Failed to get old DSS_NODES_LIST.")
@ -521,7 +524,8 @@ class ExpansionImplWithCm(ExpansionImpl):
id += 1
new_list += ',%d:%s:%d' % (last_id + id, node, port)
update_list_cmd = "sed -i 's/%s/%s/g' %s" % (old_list, new_list, dss_inst)
update_list_cmd = f"su - {self.user} -c '{update_list_cmd}'"
if os.getuid() == 0:
update_list_cmd = f"su - {self.user} -c '{update_list_cmd}'"
self.logger.debug("Command for update dss_inst: %s" % update_list_cmd)
for host in ExpansionImplWithCm.get_node_names(hosts):
sshTool = SshTool([host], timeout=300)
@ -540,8 +544,11 @@ class ExpansionImplWithCm(ExpansionImpl):
Update ss_interconnect_url on old nodes.
"""
pg_port = self.get_dss_env_root("PGPORT")
get_url_cmd = 'su - %s -c "source %s; gsql -d postgres -p %s -c \'show ss_interconnect_url;\'"' % (
self.user, self.envFile, pg_port)
if os.getuid() == 0:
get_url_cmd = 'su - %s -c "source %s; gsql -d postgres -p %s -c \'show ss_interconnect_url;\'"' % (
self.user, self.envFile, pg_port)
else:
get_url_cmd = "source %s; gsql -d postgres -p %s -c 'show ss_interconnect_url;'" % (self.envFile, pg_port)
sta, out = subprocess.getstatusoutput(get_url_cmd)
url = out.split('\n')[2]
url_port = (url.split(',')[0]).split(':')[-1]
@ -550,7 +557,8 @@ class ExpansionImplWithCm(ExpansionImpl):
pg_data = self.get_dss_env_root("PGDATA=") + '/postgresql.conf'
guc_cmd = "sed -i 's/%s/%s/g' %s" % (url.strip(), new_url, pg_data)
guc_cmd = f"su - {self.user} -c '{guc_cmd}'"
if os.getuid() == 0:
guc_cmd = f"su - {self.user} -c '{guc_cmd}'"
self.logger.debug("Command for update ss_interconnect_url: %s" % guc_cmd)
for host in ExpansionImplWithCm.get_node_names(hosts):
sshTool = SshTool([host], timeout=300)

View File

@ -49,6 +49,7 @@ from domain_utils.domain_common.cluster_constants import ClusterConstants
from domain_utils.cluster_os.cluster_user import ClusterUser
from domain_utils.cluster_file.cluster_dir import ClusterDir
from gspylib.component.DSS.dss_comp import Dss, DssInst
from gspylib.threads.SshTool import SshTool
###########################
# instance type. only for CN/DN
@ -1137,12 +1138,19 @@ def getTargetFile(dir_path, fileList):
getTargetFile(newDir, fileList)
return fileList
def get_dss_xlog_dir(pri_vgname):
def get_dss_xlog_dir(pri_vgname, normalNode):
cmd = "dsscmd ls -p +%s" % pri_vgname
(status, output) = subprocess.getstatusoutput(cmd)
if status != 0:
g_logger.debug("Failed to collect xlog directorys.")
raise Exception("")
if normalNode == HOSTNAME:
(status, output) = subprocess.getstatusoutput(cmd)
if status != 0:
g_logger.debug("Failed to collect xlog directorys.")
raise Exception("")
else:
sshTool = SshTool([normalNode])
resultMap, output = sshTool.getSshStatusOutput(cmd, [normalNode])
if DefaultValue.FAILURE in resultMap.values():
g_logger.debug("Failed to collect xlog directorys.")
raise Exception("")
xlog_dirs = []
out_list = output.split('\n')
for out in out_list:
@ -1153,17 +1161,24 @@ def get_dss_xlog_dir(pri_vgname):
return xlog_dirs
def get_dss_xlog_file(xlog_path):
def get_dss_xlog_file(xlog_path, normalNode):
"""
function: get xlog file list when dss enabled
input: xlog path
output: xlog_lists
"""
cmd = "dsscmd ls -p %s" % xlog_path
(status, output) = subprocess.getstatusoutput(cmd)
if status != 0:
g_logger.debug("Failed to collect disk xlog files")
raise Exception("Failed to collect disk xlog files")
if normalNode == HOSTNAME:
(status, output) = subprocess.getstatusoutput(cmd)
if status != 0:
g_logger.debug("Failed to collect disk xlog files")
raise Exception("Failed to collect disk xlog files")
else:
sshTool = SshTool([normalNode])
resultMap, output = sshTool.getSshStatusOutput(cmd, [normalNode])
if DefaultValue.FAILURE in resultMap.values():
g_logger.debug("Failed to collect xlog files.")
raise Exception("")
out_lines = output.split('\n')
xlog_lists = []
for line in out_lines:
@ -1177,7 +1192,7 @@ def get_dss_xlog_file(xlog_path):
pop_num -= 1
return xlog_lists
def getXlogCmd(Inst):
def getXlogCmd(Inst, normalNode):
"""
function: get xlog file
input : Inst
@ -1190,11 +1205,11 @@ def getXlogCmd(Inst):
dss_home = EnvUtil.getEnv('DSS_HOME')
inst_id = DssInst.get_dss_id_from_key(dss_home)
pri_vgname = DssInst.get_private_vgname_by_ini(dss_home, inst_id)
xlog_dirs = get_dss_xlog_dir(pri_vgname)
xlog_dirs = get_dss_xlog_dir(pri_vgname, normalNode)
xlogs = []
for xdir in xlog_dirs:
pg_xlog = '+' + pri_vgname + '/' + xdir
tmp_xlogs = get_dss_xlog_file(pg_xlog)
tmp_xlogs = get_dss_xlog_file(pg_xlog, normalNode)
xlogs.extend(tmp_xlogs)
else:
pg_xlog = Inst.datadir + "/pg_xlog"
@ -1236,15 +1251,39 @@ def getXlogCmd(Inst):
"'%s/xlogfiles/xlogfile_%s/%s_%s'" % \
(cmd, xlog, g_resultdir, g_current_time,
prefix, Inst.instanceId)
if normalNode != HOSTNAME:
cmd = "%s && pscp -H %s -r %s/xlogfiles/xlogfile_%s/ %s/xlogfiles/. >/dev/null && rm -rf %s" % (
cmd, HOSTNAME, g_resultdir, g_current_time, g_resultdir, g_resultdir)
return cmd
def find_dss_normal_node():
    """
    function: find normal node when dss enabled.
    input: None
    output: node that can process dsscmd. HOSTNAME is returned when dss is
            not enabled, when the local node is reported as Normal, or when
            no Normal node can be parsed from the cm_ctl output.
    """
    if not check_dss_env():
        return HOSTNAME
    # Only the last line of the query output is inspected; the node entries
    # on that line are separated by '|'.
    cmd = "cm_ctl query -Cv | awk 'END{print}'"
    status, output = subprocess.getstatusoutput(cmd)
    normal_nodes = []
    for node in output.split("|"):
        fields = node.split()
        # An entry needs at least a name (second field) and a trailing state;
        # shorter segments (empty output, error text) are ignored instead of
        # raising IndexError.
        if len(fields) >= 2 and fields[-1] == "Normal":
            normal_nodes.append(fields[1])
    if HOSTNAME in normal_nodes:
        return HOSTNAME
    if not normal_nodes:
        # No usable node was found: fall back to the local node so that the
        # caller fails through its regular error path rather than on an
        # IndexError raised here.
        g_logger.debug("Failed to find a Normal node for dsscmd. "
                       "Command: %s. Status: %s. Output: %s" %
                       (cmd, status, output))
        return HOSTNAME
    return normal_nodes[0]
def parallel_xlog(Inst):
"""
parallel copy xlog files
"""
cmd = getXlogCmd(Inst)
if len(cmd) > 1:
normalNode = find_dss_normal_node()
cmd = getXlogCmd(Inst, normalNode)
if len(cmd) > 1 and normalNode == HOSTNAME:
(status, output) = subprocess.getstatusoutput(cmd)
if status != 0:
g_logger.debug(
@ -1253,6 +1292,13 @@ def parallel_xlog(Inst):
g_jobInfo.failedTask["collect xlog files"] = replaceInvalidStr(
output)
raise Exception("")
if len(cmd) > 1 and normalNode != HOSTNAME:
    # The local node cannot serve the request, so run the collection
    # command on the selected remote node over SSH.
    sshTool = SshTool([normalNode])
    resultMap, outputMap = sshTool.getSshStatusOutput(cmd, [normalNode])
    if resultMap[normalNode] != DefaultValue.SUCCESS:
        # Report the SSH output: the local `output` variable is never
        # assigned on this path, so referencing it would raise a NameError
        # and hide the real failure.
        g_logger.debug("Failed to collect xlog files. Command: %s.\n Error: %s\n" % (cmd, outputMap))
        g_jobInfo.failedTask["collect xlog files"] = replaceInvalidStr(outputMap)
        raise Exception("")
def check_core_pattern():
"""