Fix cluster status for capacity expansion after a node failure in a resource-pooling (shared-storage) cluster

On branch code-master
 Your branch is ahead of 'origin/code-master' by 1 commit.
 Changes to be committed:
	modified:   script/impl/expansion/expansion_impl_with_cm.py
z00848344 2024-09-23 15:35:05 +08:00
parent 94ed5b756f
commit f12de5542a


@ -21,11 +21,10 @@
import os
import re
import sys
import datetime
import subprocess
import stat
import socket
import collections
import json
from multiprocessing import Process, Value
from time import sleep
@ -47,6 +46,7 @@ from domain_utils.cluster_file.cluster_dir import ClusterDir
from gspylib.component.CM.CM_OLAP.CM_OLAP import CM_OLAP
from gspylib.component.DSS.dss_checker import DssConfig
from gspylib.component.DSS.dss_comp import DssInst
# Action type
@ -78,6 +78,7 @@ class ExpansionImplWithCm(ExpansionImpl):
self.xml_cluster_info = dbClusterInfo()
self.ssh_tool = None
self.new_nodes = list()
self.old_nodes = list()
self.app_names = list()
self.node_name_map = collections.defaultdict(str)
self._init_global()
@ -94,7 +95,9 @@ class ExpansionImplWithCm(ExpansionImpl):
self.new_nodes = [node for node in self.xml_cluster_info.dbNodes
for back_ip in node.backIps if back_ip in self.context.newHostList]
self.old_nodes = list(set(self.xml_cluster_info.dbNodes).difference(set(self.new_nodes)))
back_ips = self.xml_cluster_info.getClusterBackIps()
for i, ip in enumerate(back_ips):
host = self.xml_cluster_info.getNodeNameByBackIp(ip)
@ -436,8 +439,7 @@ class ExpansionImplWithCm(ExpansionImpl):
cmd += " -h 'host all %s %s/%s trust'" % (self.user, host_ip, submask_length)
cmd += " -h 'host all all %s/%s sha256'" % (host_ip, submask_length)
if self.xml_cluster_info.enable_dss == 'on':
old_nodes = list(set(self.xml_cluster_info.dbNodes).difference(set(self.new_nodes)))
node_ips = [node.backIps[0] for node in old_nodes]
node_ips = [node.backIps[0] for node in self.old_nodes]
for ip in node_ips:
cmd += " -h 'host all all %s/%s sha256'" % (ip, submask_length)
if self.xml_cluster_info.float_ips:
@ -504,31 +506,32 @@ class ExpansionImplWithCm(ExpansionImpl):
self.checkClusterStatus()
self.validNodeInStandbyList()
def get_dss_env_root(self, env):
def get_env(self, env):
"""
Get the value of the given environment variable from the env file (works for root and non-root users).
"""
cmd = f"su - {self.user} -c 'cat {self.envFile} | grep {env}'"
cmd = f"cat {self.envFile} | grep {env} | grep /"
if os.getuid() == 0:
cmd = f"su - {self.user} -c '{cmd}'"
sta, out = subprocess.getstatusoutput(cmd)
value = out.split('=')[-1]
return value
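As context for the callers below: get_env simply greps the env file for the variable and keeps whatever follows the last '='. A minimal standalone sketch of that lookup, assuming env-file lines of the (hypothetical) form `export DSS_HOME=/opt/dss`; the path and file name are illustrative only.

def read_env_value(env_file, name):
    # Mirror of: cat <envFile> | grep <name> | grep / ; keep the text after '='.
    value = ""
    with open(env_file) as f:
        for line in f:
            if name in line and "/" in line:
                value = line.strip().split("=")[-1]   # last matching line wins
    return value

# Hypothetical usage: read_env_value("/home/omm/.bashrc", "DSS_HOME") -> "/opt/dss"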
def check_nodes_list(self, old_list, hosts):
def get_nodes_list(self, port, hosts):
"""
Build the dss_nodes_list value (comma-separated id:ip:port entries) for the given hosts and port.
"""
port = old_list.split(':')[-1]
cur_lists = list()
for host in hosts:
cur_lists.append(str(self.node_name_map[host][0]) + ":" + self.node_name_map[host][1] + ":" + port)
cur_list = ','.join(cur_lists)
return cur_list
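get_nodes_list rebuilds DSS_NODES_LIST as comma-separated id:ip:port triples, taking the id and back IP from node_name_map and reusing the port parsed from the old value. A small worked example with hypothetical host names and IPs:

# Hypothetical node_name_map as filled in _init_global: name -> (node id, back IP)
node_name_map = {"node1": (0, "10.10.10.11"), "node2": (1, "10.10.10.12")}
port = "1611"
entries = ["%s:%s:%s" % (node_name_map[h][0], node_name_map[h][1], port)
           for h in ("node1", "node2")]
print(",".join(entries))   # -> 0:10.10.10.11:1611,1:10.10.10.12:1611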
def update_dss_inst(self, hosts):
def update_dss_inst(self, old_hosts, hosts):
"""
Update dss_nodes_list on old nodes.
"""
dss_home = self.get_dss_env_root("DSS_HOME")
dss_home = self.get_env("DSS_HOME")
dss_inst = dss_home + '/cfg/dss_inst.ini'
if os.getuid() == 0:
get_list_cmd = f"su - {self.user} -c 'cat {dss_inst} | grep DSS_NODES_LIST'"
@ -538,20 +541,13 @@ class ExpansionImplWithCm(ExpansionImpl):
if status != 0:
self.logger.debug("Failed to get old DSS_NODES_LIST.")
raise Exception("Failed to get old DSS_NODES_LIST.")
old_list = output.split('=')[1]
new_list = self.check_nodes_list(old_list, hosts)
port = int(old_list.split(':')[-1])
for node in self.context.newHostList:
name = self.xml_cluster_info.getNodeNameByBackIp(node)
id_num = self.node_name_map[name][0]
new_list += ',%d:%s:%d' % (id_num, node, port)
new_list = 'DSS_NODES_LIST=' + new_list
port = output.split(":")[-1]
new_list = 'DSS_NODES_LIST=' + self.get_nodes_list(port, hosts)
update_list_cmd = "sed -i 's/^.*DSS_NODES_LIST.*$/%s/' %s" % (new_list, dss_inst)
if os.getuid() == 0:
update_list_cmd = f"su - {self.user} -c '{update_list_cmd}'"
self.logger.debug("Command for update dss_inst: %s" % update_list_cmd)
for host in hosts:
for host in old_hosts:
sshTool = SshTool([host], timeout=300)
result_map, _ = sshTool.getSshStatusOutput(update_list_cmd, [])
if result_map[host] == DefaultValue.SUCCESS:
@ -567,12 +563,9 @@ class ExpansionImplWithCm(ExpansionImpl):
"""
Update ss_interconnect_url on old nodes.
"""
pg_port = self.get_dss_env_root("PGPORT")
if os.getuid() == 0:
get_url_cmd = 'su - %s -c "source %s; gsql -d postgres -p %s -c \'show ss_interconnect_url;\'"' % (
self.user, self.envFile, pg_port)
else:
get_url_cmd = "source %s; gsql -d postgres -p %s -c 'show ss_interconnect_url;'" % (self.envFile, pg_port)
pgdata_path = self.get_env("PGDATA")
conf_file = pgdata_path + os.sep + 'postgresql.conf'
get_url_cmd = "grep -n 'ss_interconnect_url' %s" % conf_file
sta, out = subprocess.getstatusoutput(get_url_cmd)
url = out.split('\n')[2]
url_port = (url.split(',')[0]).split(':')[-1]
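The rewritten update_guc_url reads ss_interconnect_url from postgresql.conf with grep instead of querying gsql; note that out.split('\n')[2] assumes grep returns at least three matching lines. A sketch of the port extraction against a hypothetical configuration line:

# Hypothetical matching line from postgresql.conf:
url_line = "ss_interconnect_url = '0:10.10.10.11:1621,1:10.10.10.12:1621'"
first_entry = url_line.split(',')[0]      # "ss_interconnect_url = '0:10.10.10.11:1621"
url_port = first_entry.split(':')[-1]     # "1621"
print(url_port)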
@ -596,7 +589,6 @@ class ExpansionImplWithCm(ExpansionImpl):
"""
Update cm_resource.json on old nodes.
"""
old_nodes = list(set(self.xml_cluster_info.dbNodes).difference(set(self.new_nodes)))
get_last_cmd = "cm_ctl res --list --res_name='dss' --list_inst | awk 'END{print $5, $7, $9, $10}'"
(status, output) = subprocess.getstatusoutput(get_last_cmd)
node_id, inst_id, dss_home, dn_home = output.split(' ')
@ -609,7 +601,7 @@ class ExpansionImplWithCm(ExpansionImpl):
if i < len(self.new_nodes) - 1:
update_cmd += " && "
self.logger.debug("Command for update cm_resource.json: %s" % update_cmd)
for host in ExpansionImplWithCm.get_node_names(old_nodes):
for host in ExpansionImplWithCm.get_node_names(self.old_nodes):
sshTool = SshTool([host], timeout=300)
result_map, _ = sshTool.getSshStatusOutput(update_cmd, [])
if result_map[host] == DefaultValue.SUCCESS:
@ -619,27 +611,6 @@ class ExpansionImplWithCm(ExpansionImpl):
raise Exception("Failed to update cm_resource.json on %s" % host)
self.logger.log("Successfully update cm_resource.json on old nodes.")
def get_cluster_nodes(self):
"""
Get nodes in current cluster.
"""
cmd = "source %s; cm_ctl query -Cv | awk 'END{print}'" % self.envFile
if os.getuid() == 0:
cmd = f"su - {self.user} -c '{cmd}'"
status, output = subprocess.getstatusoutput(cmd)
if status != 0:
self.logger.debug("Failed to get nodes in current cluster.")
raise Exception("Failed to get nodes in current cluster.")
cur_nodes = output.split('|')
old_names = list()
for node in cur_nodes:
if node == ' ':
continue
tmp_list = list(filter(None, node.split(' ')))
old_names.append(tmp_list[1])
return old_names
def update_old_dss_info(self):
"""
Update the old nodes with the new nodes' dss info.
@ -647,9 +618,10 @@ class ExpansionImplWithCm(ExpansionImpl):
if self.xml_cluster_info.enable_dss != 'on':
return
old_names = self.get_cluster_nodes()
node_list = self.update_dss_inst(old_names)
self.update_guc_url(node_list, old_names)
old_hosts = [node.name for node in self.old_nodes]
new_hosts = [node.name for node in self.new_nodes]
node_list = self.update_dss_inst(old_hosts, old_hosts + new_hosts)
self.update_guc_url(node_list.split('=')[-1], old_hosts)
def do_preinstall(self):
"""
@ -751,16 +723,172 @@ class ExpansionImplWithCm(ExpansionImpl):
self.logger.debug("ctlReloadCmd: " + ctlReloadCmd)
CmdExecutor.execCommandWithMode(ctlReloadCmd, self.ssh_tool, host_list=[localHost])
def recover_old_dss_info(self, old_hosts):
"""
Recover the old nodes' dss info when expansion fails.
"""
node_list = self.update_dss_inst(old_hosts, old_hosts)
self.update_guc_url(node_list.split('=')[-1], old_hosts)
self.logger.debug("Successfully recover dss_nodes_list and ss_interconnect_url on old nodes.")
def recover_new_nodes(self):
"""
Uninstall the app on new nodes when expansion fails.
"""
new_hosts = [node.name for node in self.new_nodes]
cmd = f"source {self.envFile}; gs_uninstall --delete-data -L"
if os.getuid() == 0:
cmd = f"su - {self.user} -c '{cmd}'"
CmdExecutor.execCommandWithMode(cmd, self.ssh_tool, host_list=new_hosts)
self.logger.debug("Successfully uninstall app on new nodes.")
def recover_old_cm_res(self, old_hosts):
"""
Recover cm_resource.json on old nodes.
"""
cm_component = CM_OLAP()
cm_component.binPath = os.path.realpath(os.path.join(
self.context.clusterInfo.appPath, "bin"))
local_node = [node for node in self.context.clusterInfo.dbNodes
if NetUtil.GetHostIpOrName() == node.name][0]
local_name = NetUtil.getHostName()
cm_component.instInfo = local_node.cmagents[0]
cm_res_file = os.path.realpath(
os.path.join(cm_component.instInfo.datadir, "cm_resource.json"))
with open(cm_res_file, "r") as f:
data = json.load(f)
insts_list = data["resources"][1]["instances"]
for i, _ in enumerate(self.new_nodes):
insts_list.pop()
with open(cm_res_file, "w") as f:
json.dump(data, f, indent=4)
for host in old_hosts:
if host == local_name:
continue
host_ssh = SshTool([host])
host_ssh.scpFiles(cm_res_file, cm_res_file)
self.logger.debug("Successfully recover cm_resource.json on old nodes.")
def recover_static_conf(self):
"""
Recover the cluster static conf and save it.
"""
master_instance = 0
local_name = NetUtil.GetHostIpOrName()
self.logger.log("Begin to recover the cluster static conf.")
tmp_dir = EnvUtil.getEnvironmentParameterValue("PGHOST", self.user, self.envFile)
config_path = "%s/bin/cluster_static_config" % self.static_cluster_info.appPath
for node in self.new_nodes:
dn_loop = self.context.clusterInfo.getDbNodeByName(node.name)
self.context.clusterInfo.dbNodes.remove(dn_loop)
for node in self.old_nodes:
if node.name == local_name:
if len(node.datanodes) > 0:
node.datanodes[0].instanceType = master_instance
break
for node in self.old_nodes:
if node.name == local_name:
self.context.clusterInfo.saveToStaticConfig(config_path, node.id)
continue
config_path_dn = "%s/cluster_static_config_%s" % (tmp_dir, node.name)
self.context.clusterInfo.saveToStaticConfig(config_path_dn, node.id)
cmd = "source %s; gs_om -t refreshconf" % self.envFile
subprocess.getstatusoutput(cmd)
old_hosts = [node.name for node in self.old_nodes]
for host in old_hosts:
host_ssh = SshTool([host])
if host != local_name:
config_name = "%s/cluster_static_config_%s" % (tmp_dir, host)
host_ssh.scpFiles(config_name, config_path, [host], self.envFile)
try:
os.unlink(config_name)
except FileNotFoundError:
pass
self.logger.log("End of recover the cluster static conf.")
def get_normal_host(self):
"""
Get a host whose cluster status is Normal.
"""
cmd = "cm_ctl query -Cv | awk 'END{print}'"
status, output = subprocess.getstatusoutput(cmd)
nodes_list = list(filter(None, output.split('|')))
for node in nodes_list:
status = list(filter(None, node.split(' ')))[-1]
node_name = list(filter(None, node.split(' ')))[1]
if status == "Normal":
return node_name
return None
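get_normal_host expects the last line of `cm_ctl query -Cv` to be a '|'-separated row of per-node segments in which the second whitespace-separated field is the node name and the last one is its state; the column layout below is a hypothetical illustration, not the exact cm_ctl output:

# Hypothetical last line of "cm_ctl query -Cv":
output = "1  node1 10.10.10.11 6001 P Primary Normal | 2  node2 10.10.10.12 6002 S Standby Normal"
for segment in filter(None, output.split('|')):
    fields = list(filter(None, segment.split(' ')))
    node_name, state = fields[1], fields[-1]
    if state == "Normal":
        print(node_name)    # first node whose state is Normal
        break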
def clean_disk(self):
"""
Delete the new nodes' pg_xlog and pg_doublewrite directories from shared storage.
"""
vg_name = EnvUtil.getEnv("VGNAME")
dss_home = self.get_env("DSS_HOME")
dss_inst = dss_home + '/cfg/dss_inst.ini'
get_list_cmd = "cat %s | grep DSS_NODES_LIST" % dss_inst
status, output = subprocess.getstatusoutput(get_list_cmd)
lists = (output.split('=')[-1]).split(',')
res_ids = set()
for item in lists:
res_ids.add(int(item.split(':')[0]))  # node id is the first ':'-separated field of "id:ip:port"
normal_host = self.get_normal_host()
if not normal_host:
self.logger.debug("No normal node in cluster now, please clean dss dir before next expansion.")
return
get_xlog_cmd = "dsscmd ls -p +%s | grep pg_xlog | awk '{print $6}'" % vg_name
host_ssh = SshTool([normal_host])
result_map, output = host_ssh.getSshStatusOutput(get_xlog_cmd, [normal_host], self.envFile)
out_lines = output.split('\n')
xlog_list = list()
for out_line in out_lines:
xlog_list.append(re.findall(r"pg_xlog[0-9]", out_line))
xlog_list = list(filter(None, xlog_list))
del_cmd = ""
for xlog_n in xlog_list:
if int(xlog_n[0][-1]) not in res_ids:
dw_path = 'pg_doublewrite' + xlog_n[0][-1]
pri_vgname = DssInst.get_private_vgname_by_ini(dss_home, int(xlog_n[0][-1]))
del_cmd += "dsscmd unlink -p +%s/%s; dsscmd rmdir -p +%s/%s -r; dsscmd rmdir -p +%s/%s -r;" % (
vg_name, xlog_n[0], pri_vgname, xlog_n[0], vg_name, dw_path)
result_map, _ = host_ssh.getSshStatusOutput(del_cmd, [normal_host])
if "Success" not in result_map.values():
self.logger.debug("Failed to delete xlog of new hosts.")
raise Exception("Failed to delete xlog of new hosts.")
self.logger.debug("Successfully delete xlog of new hosts.")
def recover_cluster(self):
"""
Recover cluster when expansion failed.
"""
self.logger.log("Begin to recover cluster.")
if self.xml_cluster_info.enable_dss == "on":
old_nodes = [node.name for node in self.old_nodes]
self.recover_old_dss_info(old_nodes)
self.recover_old_cm_res(old_nodes)
self.clean_disk()
self.recover_new_nodes()
self.recover_static_conf()
DefaultValue.remove_metadata_and_dynamic_config_file(self.user, self.ssh_tool, self.logger)
self.logger.log("Successfully recover cluster.")
sys.exit(1)
def check_processes(self):
"""
Check whether the required processes exist before restarting the cluster.
"""
check_cmd = ""
processes = {"om_monitor", "cm_agent", "cm_server", "dssserver"}
node_names = self.get_cluster_nodes()
processes = {"om_monitor", "cm_agent", "dssserver"}
old_nodes = [node.name for node in self.old_nodes]
for process in processes:
check_cmd = f"ps ux | grep {process} | grep -v grep | wc -l"
for node in node_names:
for node in old_nodes:
ssh_tool = SshTool([node])
result_map, output_map = ssh_tool.getSshStatusOutput(check_cmd, [])
if result_map[node] != DefaultValue.SUCCESS:
@ -768,8 +896,8 @@ class ExpansionImplWithCm(ExpansionImpl):
raise Exception(f"Failed to check process on node {node}.")
proc_num = int(output_map.split('\n')[1])
if proc_num < 1:
self.logger.debug(f"No {process} on {node}.")
raise Exception(f"No {process} on {node}.")
self.logger.log(f"No {process} on {node}, need to expand again.")
self.recover_cluster()
self.logger.log("Successfully check processes on cluster nodes.")
def ss_restart_cluster(self):
@ -778,8 +906,8 @@ class ExpansionImplWithCm(ExpansionImpl):
"""
if self.xml_cluster_info.enable_dss != "on":
return
DefaultValue.remove_metadata_and_dynamic_config_file(self.user, self.ssh_tool, self.logger)
self.check_processes()
DefaultValue.remove_metadata_and_dynamic_config_file(self.user, self.ssh_tool, self.logger)
restart_cmd = f"source {self.envFile}; cm_ctl stop; cm_ctl start;"
status, _ = subprocess.getstatusoutput(restart_cmd)
if status != 0: