On branch xlog_root
Your branch is up to date with 'origin/xlog_root'. Changes to be committed: modified: script/impl/expansion/expansion_impl_with_cm.py modified: script/local/LocalCollect.py
This commit is contained in:
parent
177b86b24e
commit
96cbe9f683
@ -506,7 +506,10 @@ class ExpansionImplWithCm(ExpansionImpl):
|
||||
"""
|
||||
dss_home = self.get_dss_env_root("DSS_HOME")
|
||||
dss_inst = dss_home + '/cfg/dss_inst.ini'
|
||||
get_list_cmd = f"su - {self.user} -c 'cat {dss_inst} | grep DSS_NODES_LIST'"
|
||||
if os.getuid() == 0:
|
||||
get_list_cmd = f"su - {self.user} -c 'cat {dss_inst} | grep DSS_NODES_LIST'"
|
||||
else:
|
||||
get_list_cmd = f'cat {dss_inst} | grep DSS_NODES_LIST'
|
||||
status, output = subprocess.getstatusoutput(get_list_cmd)
|
||||
if status != 0:
|
||||
self.logger.debug("Failed to get old DSS_NODES_LIST.")
|
||||
@ -521,7 +524,8 @@ class ExpansionImplWithCm(ExpansionImpl):
|
||||
id += 1
|
||||
new_list += ',%d:%s:%d' % (last_id + id, node, port)
|
||||
update_list_cmd = "sed -i 's/%s/%s/g' %s" % (old_list, new_list, dss_inst)
|
||||
update_list_cmd = f"su - {self.user} -c '{update_list_cmd}'"
|
||||
if os.getuid() == 0:
|
||||
update_list_cmd = f"su - {self.user} -c '{update_list_cmd}'"
|
||||
self.logger.debug("Command for update dss_inst: %s" % update_list_cmd)
|
||||
for host in ExpansionImplWithCm.get_node_names(hosts):
|
||||
sshTool = SshTool([host], timeout=300)
|
||||
@ -540,8 +544,11 @@ class ExpansionImplWithCm(ExpansionImpl):
|
||||
Update ss_interconnect_url on old nodes.
|
||||
"""
|
||||
pg_port = self.get_dss_env_root("PGPORT")
|
||||
get_url_cmd = 'su - %s -c "source %s; gsql -d postgres -p %s -c \'show ss_interconnect_url;\'"' % (
|
||||
self.user, self.envFile, pg_port)
|
||||
if os.getuid() == 0:
|
||||
get_url_cmd = 'su - %s -c "source %s; gsql -d postgres -p %s -c \'show ss_interconnect_url;\'"' % (
|
||||
self.user, self.envFile, pg_port)
|
||||
else:
|
||||
get_url_cmd = "source %s; gsql -d postgres -p %s -c 'show ss_interconnect_url;'" % (self.envFile, pg_port)
|
||||
sta, out = subprocess.getstatusoutput(get_url_cmd)
|
||||
url = out.split('\n')[2]
|
||||
url_port = (url.split(',')[0]).split(':')[-1]
|
||||
@ -550,7 +557,8 @@ class ExpansionImplWithCm(ExpansionImpl):
|
||||
pg_data = self.get_dss_env_root("PGDATA=") + '/postgresql.conf'
|
||||
|
||||
guc_cmd = "sed -i 's/%s/%s/g' %s" % (url.strip(), new_url, pg_data)
|
||||
guc_cmd = f"su - {self.user} -c '{guc_cmd}'"
|
||||
if os.getuid() == 0:
|
||||
guc_cmd = f"su - {self.user} -c '{guc_cmd}'"
|
||||
self.logger.debug("Command for update ss_interconnect_url: %s" % guc_cmd)
|
||||
for host in ExpansionImplWithCm.get_node_names(hosts):
|
||||
sshTool = SshTool([host], timeout=300)
|
||||
|
@ -49,6 +49,7 @@ from domain_utils.domain_common.cluster_constants import ClusterConstants
|
||||
from domain_utils.cluster_os.cluster_user import ClusterUser
|
||||
from domain_utils.cluster_file.cluster_dir import ClusterDir
|
||||
from gspylib.component.DSS.dss_comp import Dss, DssInst
|
||||
from gspylib.threads.SshTool import SshTool
|
||||
|
||||
###########################
|
||||
# instance type. only for CN/DN
|
||||
@ -1137,12 +1138,19 @@ def getTargetFile(dir_path, fileList):
|
||||
getTargetFile(newDir, fileList)
|
||||
return fileList
|
||||
|
||||
def get_dss_xlog_dir(pri_vgname):
|
||||
def get_dss_xlog_dir(pri_vgname, normalNode):
|
||||
cmd = "dsscmd ls -p +%s" % pri_vgname
|
||||
(status, output) = subprocess.getstatusoutput(cmd)
|
||||
if status != 0:
|
||||
g_logger.debug("Failed to collect xlog directorys.")
|
||||
raise Exception("")
|
||||
if normalNode == HOSTNAME:
|
||||
(status, output) = subprocess.getstatusoutput(cmd)
|
||||
if status != 0:
|
||||
g_logger.debug("Failed to collect xlog directorys.")
|
||||
raise Exception("")
|
||||
else:
|
||||
sshTool = SshTool([normalNode])
|
||||
resultMap, output = sshTool.getSshStatusOutput(cmd, [normalNode])
|
||||
if DefaultValue.FAILURE in resultMap.values():
|
||||
g_logger.debug("Failed to collect xlog directorys.")
|
||||
raise Exception("")
|
||||
xlog_dirs = []
|
||||
out_list = output.split('\n')
|
||||
for out in out_list:
|
||||
@ -1153,17 +1161,24 @@ def get_dss_xlog_dir(pri_vgname):
|
||||
|
||||
return xlog_dirs
|
||||
|
||||
def get_dss_xlog_file(xlog_path):
|
||||
def get_dss_xlog_file(xlog_path, normalNode):
|
||||
"""
|
||||
function: get xlog file list when dss enabled
|
||||
input: xlog path
|
||||
output: xlog_lists
|
||||
"""
|
||||
cmd = "dsscmd ls -p %s" % xlog_path
|
||||
(status, output) = subprocess.getstatusoutput(cmd)
|
||||
if status != 0:
|
||||
g_logger.debug("Failed to collect disk xlog files")
|
||||
raise Exception("Failed to collect disk xlog files")
|
||||
if normalNode == HOSTNAME:
|
||||
(status, output) = subprocess.getstatusoutput(cmd)
|
||||
if status != 0:
|
||||
g_logger.debug("Failed to collect disk xlog files")
|
||||
raise Exception("Failed to collect disk xlog files")
|
||||
else:
|
||||
sshTool = SshTool([normalNode])
|
||||
resultMap, output = sshTool.getSshStatusOutput(cmd, [normalNode])
|
||||
if DefaultValue.FAILURE in resultMap.values():
|
||||
g_logger.debug("Failed to collect xlog files.")
|
||||
raise Exception("")
|
||||
out_lines = output.split('\n')
|
||||
xlog_lists = []
|
||||
for line in out_lines:
|
||||
@ -1177,7 +1192,7 @@ def get_dss_xlog_file(xlog_path):
|
||||
pop_num -= 1
|
||||
return xlog_lists
|
||||
|
||||
def getXlogCmd(Inst):
|
||||
def getXlogCmd(Inst, normalNode):
|
||||
"""
|
||||
function: get xlog file
|
||||
input : Inst
|
||||
@ -1190,11 +1205,11 @@ def getXlogCmd(Inst):
|
||||
dss_home = EnvUtil.getEnv('DSS_HOME')
|
||||
inst_id = DssInst.get_dss_id_from_key(dss_home)
|
||||
pri_vgname = DssInst.get_private_vgname_by_ini(dss_home, inst_id)
|
||||
xlog_dirs = get_dss_xlog_dir(pri_vgname)
|
||||
xlog_dirs = get_dss_xlog_dir(pri_vgname, normalNode)
|
||||
xlogs = []
|
||||
for xdir in xlog_dirs:
|
||||
pg_xlog = '+' + pri_vgname + '/' + xdir
|
||||
tmp_xlogs = get_dss_xlog_file(pg_xlog)
|
||||
tmp_xlogs = get_dss_xlog_file(pg_xlog, normalNode)
|
||||
xlogs.extend(tmp_xlogs)
|
||||
else:
|
||||
pg_xlog = Inst.datadir + "/pg_xlog"
|
||||
@ -1236,15 +1251,39 @@ def getXlogCmd(Inst):
|
||||
"'%s/xlogfiles/xlogfile_%s/%s_%s'" % \
|
||||
(cmd, xlog, g_resultdir, g_current_time,
|
||||
prefix, Inst.instanceId)
|
||||
if normalNode != HOSTNAME:
|
||||
cmd = "%s && pscp -H %s -r %s/xlogfiles/xlogfile_%s/ %s/xlogfiles/. >/dev/null && rm -rf %s" % (
|
||||
cmd, HOSTNAME, g_resultdir, g_current_time, g_resultdir, g_resultdir)
|
||||
|
||||
return cmd
|
||||
|
||||
def find_dss_normal_node():
|
||||
"""
|
||||
function: find normal node when dss enabled.
|
||||
input: None
|
||||
output: node that can process dsscmd.
|
||||
"""
|
||||
if not check_dss_env():
|
||||
return HOSTNAME
|
||||
cmd = "cm_ctl query -Cv | awk 'END{print}'"
|
||||
_, output = subprocess.getstatusoutput(cmd)
|
||||
node_states = output.split("|")
|
||||
normal_nodes = list()
|
||||
for node in node_states:
|
||||
state = (node.rstrip()).split(" ")[-1]
|
||||
if state == "Normal":
|
||||
normal_nodes.append(node.split()[1])
|
||||
if HOSTNAME in normal_nodes:
|
||||
return HOSTNAME
|
||||
return normal_nodes[0]
|
||||
|
||||
def parallel_xlog(Inst):
|
||||
"""
|
||||
parallel copy xlog files
|
||||
"""
|
||||
cmd = getXlogCmd(Inst)
|
||||
if len(cmd) > 1:
|
||||
normalNode = find_dss_normal_node()
|
||||
cmd = getXlogCmd(Inst, normalNode)
|
||||
if len(cmd) > 1 and normalNode == HOSTNAME:
|
||||
(status, output) = subprocess.getstatusoutput(cmd)
|
||||
if status != 0:
|
||||
g_logger.debug(
|
||||
@ -1253,6 +1292,13 @@ def parallel_xlog(Inst):
|
||||
g_jobInfo.failedTask["collect xlog files"] = replaceInvalidStr(
|
||||
output)
|
||||
raise Exception("")
|
||||
if len(cmd) > 1 and normalNode != HOSTNAME:
|
||||
sshTool = SshTool([normalNode])
|
||||
resultMap, outputMap = sshTool.getSshStatusOutput(cmd, [normalNode])
|
||||
if resultMap[normalNode] != DefaultValue.SUCCESS:
|
||||
g_logger.debug("Failed to collect xlog files. Command: %s.\n Error: %s\n" % (cmd, output))
|
||||
g_jobInfo.failedTask["collect xlog files"] = replaceInvalidStr(output)
|
||||
raise Exception("")
|
||||
|
||||
def check_core_pattern():
|
||||
"""
|
||||
|
Loading…
x
Reference in New Issue
Block a user