From 96cbe9f68361085c15de207e1cdac5b19c6df8f1 Mon Sep 17 00:00:00 2001
From: z00848344
Date: Tue, 30 Jul 2024 16:36:52 +0800
Subject: [PATCH] Collect DSS xlog files via a Normal node and wrap commands
 in 'su -' only when running as root

---
 .../impl/expansion/expansion_impl_with_cm.py | 18 +++-
 script/local/LocalCollect.py                 | 76 +++++++++++++++----
 2 files changed, 74 insertions(+), 20 deletions(-)

diff --git a/script/impl/expansion/expansion_impl_with_cm.py b/script/impl/expansion/expansion_impl_with_cm.py
index db491ab..09fb074 100644
--- a/script/impl/expansion/expansion_impl_with_cm.py
+++ b/script/impl/expansion/expansion_impl_with_cm.py
@@ -506,7 +506,10 @@ class ExpansionImplWithCm(ExpansionImpl):
         """
         dss_home = self.get_dss_env_root("DSS_HOME")
         dss_inst = dss_home + '/cfg/dss_inst.ini'
-        get_list_cmd = f"su - {self.user} -c 'cat {dss_inst} | grep DSS_NODES_LIST'"
+        if os.getuid() == 0:
+            get_list_cmd = f"su - {self.user} -c 'cat {dss_inst} | grep DSS_NODES_LIST'"
+        else:
+            get_list_cmd = f'cat {dss_inst} | grep DSS_NODES_LIST'
         status, output = subprocess.getstatusoutput(get_list_cmd)
         if status != 0:
             self.logger.debug("Failed to get old DSS_NODES_LIST.")
@@ -521,7 +524,8 @@ class ExpansionImplWithCm(ExpansionImpl):
             id += 1
             new_list += ',%d:%s:%d' % (last_id + id, node, port)
         update_list_cmd = "sed -i 's/%s/%s/g' %s" % (old_list, new_list, dss_inst)
-        update_list_cmd = f"su - {self.user} -c '{update_list_cmd}'"
+        if os.getuid() == 0:
+            update_list_cmd = f"su - {self.user} -c '{update_list_cmd}'"
         self.logger.debug("Command for update dss_inst: %s" % update_list_cmd)
         for host in ExpansionImplWithCm.get_node_names(hosts):
             sshTool = SshTool([host], timeout=300)
@@ -540,8 +544,11 @@ class ExpansionImplWithCm(ExpansionImpl):
         Update ss_interconnect_url on old nodes.
         """
         pg_port = self.get_dss_env_root("PGPORT")
-        get_url_cmd = 'su - %s -c "source %s; gsql -d postgres -p %s -c \'show ss_interconnect_url;\'"' % (
-            self.user, self.envFile, pg_port)
+        if os.getuid() == 0:
+            get_url_cmd = 'su - %s -c "source %s; gsql -d postgres -p %s -c \'show ss_interconnect_url;\'"' % (
+                self.user, self.envFile, pg_port)
+        else:
+            get_url_cmd = "source %s; gsql -d postgres -p %s -c 'show ss_interconnect_url;'" % (self.envFile, pg_port)
         sta, out = subprocess.getstatusoutput(get_url_cmd)
         url = out.split('\n')[2]
         url_port = (url.split(',')[0]).split(':')[-1]
@@ -550,7 +557,8 @@ class ExpansionImplWithCm(ExpansionImpl):
 
         pg_data = self.get_dss_env_root("PGDATA=") + '/postgresql.conf'
         guc_cmd = "sed -i 's/%s/%s/g' %s" % (url.strip(), new_url, pg_data)
-        guc_cmd = f"su - {self.user} -c '{guc_cmd}'"
+        if os.getuid() == 0:
+            guc_cmd = f"su - {self.user} -c '{guc_cmd}'"
         self.logger.debug("Command for update ss_interconnect_url: %s" % guc_cmd)
         for host in ExpansionImplWithCm.get_node_names(hosts):
             sshTool = SshTool([host], timeout=300)
diff --git a/script/local/LocalCollect.py b/script/local/LocalCollect.py
index 8146931..cdea9ba 100644
--- a/script/local/LocalCollect.py
+++ b/script/local/LocalCollect.py
@@ -49,6 +49,7 @@ from domain_utils.domain_common.cluster_constants import ClusterConstants
 from domain_utils.cluster_os.cluster_user import ClusterUser
 from domain_utils.cluster_file.cluster_dir import ClusterDir
 from gspylib.component.DSS.dss_comp import Dss, DssInst
+from gspylib.threads.SshTool import SshTool
 
 ###########################
 # instance type. only for CN/DN
@@ -1137,12 +1138,19 @@ def getTargetFile(dir_path, fileList):
             getTargetFile(newDir, fileList)
     return fileList
 
-def get_dss_xlog_dir(pri_vgname):
+def get_dss_xlog_dir(pri_vgname, normalNode):
     cmd = "dsscmd ls -p +%s" % pri_vgname
-    (status, output) = subprocess.getstatusoutput(cmd)
-    if status != 0:
-        g_logger.debug("Failed to collect xlog directorys.")
-        raise Exception("")
+    if normalNode == HOSTNAME:
+        (status, output) = subprocess.getstatusoutput(cmd)
+        if status != 0:
+            g_logger.debug("Failed to collect xlog directories.")
+            raise Exception("")
+    else:
+        sshTool = SshTool([normalNode])
+        resultMap, output = sshTool.getSshStatusOutput(cmd, [normalNode])
+        if DefaultValue.FAILURE in resultMap.values():
+            g_logger.debug("Failed to collect xlog directories.")
+            raise Exception("")
     xlog_dirs = []
     out_list = output.split('\n')
     for out in out_list:
@@ -1153,17 +1161,24 @@ def get_dss_xlog_dir(pri_vgname):
 
     return xlog_dirs
 
-def get_dss_xlog_file(xlog_path):
+def get_dss_xlog_file(xlog_path, normalNode):
     """
     function: get xlog file list when dss enabled
    input: xlog path
     output: xlog_lists
     """
     cmd = "dsscmd ls -p %s" % xlog_path
-    (status, output) = subprocess.getstatusoutput(cmd)
-    if status != 0:
-        g_logger.debug("Failed to collect disk xlog files")
-        raise Exception("Failed to collect disk xlog files")
+    if normalNode == HOSTNAME:
+        (status, output) = subprocess.getstatusoutput(cmd)
+        if status != 0:
+            g_logger.debug("Failed to collect disk xlog files")
+            raise Exception("Failed to collect disk xlog files")
+    else:
+        sshTool = SshTool([normalNode])
+        resultMap, output = sshTool.getSshStatusOutput(cmd, [normalNode])
+        if DefaultValue.FAILURE in resultMap.values():
+            g_logger.debug("Failed to collect disk xlog files")
+            raise Exception("Failed to collect disk xlog files")
     out_lines = output.split('\n')
     xlog_lists = []
     for line in out_lines:
@@ -1177,7 +1192,7 @@
                 pop_num -= 1
     return xlog_lists
 
-def getXlogCmd(Inst):
+def getXlogCmd(Inst, normalNode):
     """
     function: get xlog file
     input : Inst
@@ -1190,11 +1205,11 @@
         dss_home = EnvUtil.getEnv('DSS_HOME')
         inst_id = DssInst.get_dss_id_from_key(dss_home)
         pri_vgname = DssInst.get_private_vgname_by_ini(dss_home, inst_id)
-        xlog_dirs = get_dss_xlog_dir(pri_vgname)
+        xlog_dirs = get_dss_xlog_dir(pri_vgname, normalNode)
         xlogs = []
         for xdir in xlog_dirs:
             pg_xlog = '+' + pri_vgname + '/' + xdir
-            tmp_xlogs = get_dss_xlog_file(pg_xlog)
+            tmp_xlogs = get_dss_xlog_file(pg_xlog, normalNode)
             xlogs.extend(tmp_xlogs)
     else:
         pg_xlog = Inst.datadir + "/pg_xlog"
@@ -1236,15 +1251,39 @@
                   "'%s/xlogfiles/xlogfile_%s/%s_%s'" % \
                   (cmd, xlog, g_resultdir, g_current_time, prefix, Inst.instanceId)
 
+    if normalNode != HOSTNAME:
+        cmd = "%s && pscp -H %s -r %s/xlogfiles/xlogfile_%s/ %s/xlogfiles/. >/dev/null && rm -rf %s" % (
+            cmd, HOSTNAME, g_resultdir, g_current_time, g_resultdir, g_resultdir)
     return cmd
 
+def find_dss_normal_node():
+    """
+    function: find normal node when dss enabled.
+    input: None
+    output: node that can run dsscmd.
+ """ + if not check_dss_env(): + return HOSTNAME + cmd = "cm_ctl query -Cv | awk 'END{print}'" + _, output = subprocess.getstatusoutput(cmd) + node_states = output.split("|") + normal_nodes = list() + for node in node_states: + state = (node.rstrip()).split(" ")[-1] + if state == "Normal": + normal_nodes.append(node.split()[1]) + if HOSTNAME in normal_nodes: + return HOSTNAME + return normal_nodes[0] + def parallel_xlog(Inst): """ parallel copy xlog files """ - cmd = getXlogCmd(Inst) - if len(cmd) > 1: + normalNode = find_dss_normal_node() + cmd = getXlogCmd(Inst, normalNode) + if len(cmd) > 1 and normalNode == HOSTNAME: (status, output) = subprocess.getstatusoutput(cmd) if status != 0: g_logger.debug( @@ -1253,6 +1292,13 @@ def parallel_xlog(Inst): g_jobInfo.failedTask["collect xlog files"] = replaceInvalidStr( output) raise Exception("") + if len(cmd) > 1 and normalNode != HOSTNAME: + sshTool = SshTool([normalNode]) + resultMap, outputMap = sshTool.getSshStatusOutput(cmd, [normalNode]) + if resultMap[normalNode] != DefaultValue.SUCCESS: + g_logger.debug("Failed to collect xlog files. Command: %s.\n Error: %s\n" % (cmd, output)) + g_jobInfo.failedTask["collect xlog files"] = replaceInvalidStr(output) + raise Exception("") def check_core_pattern(): """