From 8cada91e73d3f310bc4211b6669da5ca87e91dda Mon Sep 17 00:00:00 2001 From: openGaussDev Date: Sun, 26 Feb 2023 19:19:08 +0800 Subject: [PATCH] =?UTF-8?q?=E9=9B=86=E4=B8=AD=E5=BC=8F=E6=94=AF=E6=8C=81vi?= =?UTF-8?q?p=E6=9C=AC=E5=9C=B0=E5=8C=96=E5=AE=89=E8=A3=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Match-id-c7906104d5bcad5351beb7863fb3c72c990f7c25 --- script/gs_dropnode | 3 +- script/gspylib/common/DbClusterInfo.py | 65 ++++- script/gspylib/common/OMCommand.py | 2 + script/gspylib/component/CM/CM.py | 80 +++++- .../gspylib/component/CM/CM_OLAP/CM_OLAP.py | 104 ++++++++ .../component/Kernel/DN_OLAP/DN_OLAP.py | 44 ++-- script/impl/dropnode/DropnodeImpl.py | 70 ++++++ .../impl/dropnode/drop_node_with_cm_impl.py | 62 ++++- script/impl/expansion/ExpansionImpl.py | 25 +- .../impl/expansion/expansion_impl_with_cm.py | 34 ++- script/impl/install/OLAP/InstallImplOLAP.py | 21 ++ script/local/ConfigHba.py | 4 +- script/local/config_cm_resource.py | 231 ++++++++++++++++++ 13 files changed, 702 insertions(+), 43 deletions(-) create mode 100644 script/local/config_cm_resource.py diff --git a/script/gs_dropnode b/script/gs_dropnode index befb1aa..fb915fb 100644 --- a/script/gs_dropnode +++ b/script/gs_dropnode @@ -204,8 +204,7 @@ General options: def check_repeat_process(self): """ - function: Check whether only one node be left in the cluster - return a flag + Check whether the same gs_dropnode command be run at the same time """ cmd = "ps -ef | grep 'gs_dropnode -U %s -G %s' | grep -v grep" \ % (self.user, self.group) diff --git a/script/gspylib/common/DbClusterInfo.py b/script/gspylib/common/DbClusterInfo.py index 9cdb0b4..5959f20 100644 --- a/script/gspylib/common/DbClusterInfo.py +++ b/script/gspylib/common/DbClusterInfo.py @@ -410,7 +410,7 @@ def compareObject(Object_A, Object_B, instName, tempbuffer=None, model=None, dss_ignore = [ "enable_dss", "dss_config", "dss_home", "cm_vote_disk", "cm_share_disk", 
"dss_pri_disks", "dss_shared_disks", "dss_vg_info", "dss_vgname", "dss_ssl_enable", - "ss_rdma_work_config", "ss_interconnect_type"] + "ss_rdma_work_config", "ss_interconnect_type", "float_ips"] for i in Object_A_list: if i.startswith("_") or ignoreCheck(Object_A, i, model) or i in dss_ignore: continue @@ -566,6 +566,8 @@ class instanceInfo(): self.listenIps = [] # ha ip self.haIps = [] + # float ip + self.float_ips = [] # port self.port = 0 # It's pool port for coordinator, and ha port for other instance @@ -771,9 +773,8 @@ class dbNodeInfo(): return count def appendInstance(self, instId, mirrorId, instRole, instanceType, - listenIps=None, - haIps=None, datadir="", ssddir="", level=1, - xlogdir="", syncNum=-1, syncNumFirst="", dcf_data=""): + listenIps=None, haIps=None, datadir="", ssddir="", level=1, + xlogdir="", syncNum=-1, syncNumFirst="", dcf_data="", float_ips=None): """ function : Classify the instance of cmserver/gtm input : int,int,String,String @@ -800,6 +801,10 @@ class dbNodeInfo(): else: dbInst.listenIps = listenIps[:] + if float_ips is not None: + if len(float_ips) != 0: + dbInst.float_ips = float_ips + if (haIps is not None): if (len(haIps) == 0): dbInst.haIps = self.backIps[:] @@ -946,6 +951,7 @@ class dbClusterInfo(): self.managerPath = "" self.replicaNum = 0 self.corePath = "" + self.float_ips = {} # add azName self.azName = "" @@ -3394,6 +3400,7 @@ class dbClusterInfo(): """ dnListenIps = None dnHaIps = None + dn_float_ips = None mirror_count_data = self.__getDataNodeCount(masterNode) if masterNode.dataNum > 0: dnListenIps = self.__readInstanceIps(masterNode.name, @@ -3403,7 +3410,12 @@ class dbClusterInfo(): dnHaIps = self.__readInstanceIps(masterNode.name, "dataHaIp", masterNode.dataNum * mirror_count_data) - + dn_float_ips = self.__readInstanceIps(masterNode.name, + "floatIpMap", + masterNode.dataNum * + mirror_count_data) + if dn_float_ips is not None: + self.__read_cluster_float_ips(dn_float_ips) dnInfoLists = [[] for row in 
range(masterNode.dataNum)] xlogInfoLists = [[] for row in range(masterNode.dataNum)] dcf_data_lists = [[] for row in range(masterNode.dataNum)] @@ -3593,7 +3605,9 @@ class dbClusterInfo(): dnInfoList[0], syncNum=syncNumList[i], syncNumFirst=syncNumFirstList[i], - dcf_data=dcf_data_list[0]) + dcf_data=dcf_data_list[0], + float_ips=dn_float_ips[instIndex] \ + if dn_float_ips else []) else: masterNode.appendInstance(instId, groupId, INSTANCE_ROLE_DATANODE, @@ -3602,7 +3616,9 @@ class dbClusterInfo(): dnHaIps[instIndex], dnInfoList[0], syncNum=syncNumList[i], - syncNumFirst=syncNumFirstList[i]) + syncNumFirst=syncNumFirstList[i], + float_ips=dn_float_ips[instIndex] \ + if dn_float_ips else []) else: masterNode.appendInstance(instId, groupId, INSTANCE_ROLE_DATANODE, @@ -3612,7 +3628,9 @@ class dbClusterInfo(): dnInfoList[0], xlogdir=xlogInfoList[0], syncNum=syncNumList[i], - syncNumFirst=syncNumFirstList[i]) + syncNumFirst=syncNumFirstList[i], + float_ips=dn_float_ips[instIndex] \ + if dn_float_ips else []) instIndex += 1 @@ -3694,7 +3712,9 @@ class dbClusterInfo(): dnInfoList[nodeLen * 2 + 2], syncNum=syncNumList[i], syncNumFirst=syncNumFirstList[i], - dcf_data=dcf_data_list[0]) + dcf_data=dcf_data_list[0], + float_ips=dn_float_ips[instIndex] \ + if dn_float_ips else []) else: dbNode.appendInstance(instId, groupId, INSTANCE_ROLE_DATANODE, @@ -3703,7 +3723,9 @@ class dbClusterInfo(): dnHaIps[instIndex], dnInfoList[nodeLen * 2 + 2], syncNum=syncNumList[i], - syncNumFirst=syncNumFirstList[i]) + syncNumFirst=syncNumFirstList[i], + float_ips=dn_float_ips[instIndex] \ + if dn_float_ips else []) else: if self.enable_dcf == "on": dbNode.appendInstance(instId, groupId, @@ -3715,7 +3737,9 @@ class dbClusterInfo(): xlogdir=xlogInfoList[nodeLen + 1], syncNum=syncNumList[i], syncNumFirst=syncNumFirstList[i], - dcf_data=dcf_data_list[0]) + dcf_data=dcf_data_list[0], + float_ips=dn_float_ips[instIndex] \ + if dn_float_ips else []) else: dbNode.appendInstance(instId, groupId, 
INSTANCE_ROLE_DATANODE, @@ -3725,7 +3749,9 @@ class dbClusterInfo(): dnInfoList[nodeLen * 2 + 2], xlogdir=xlogInfoList[nodeLen + 1], syncNum=syncNumList[i], - syncNumFirst=syncNumFirstList[i]) + syncNumFirst=syncNumFirstList[i], + float_ips=dn_float_ips[instIndex] \ + if dn_float_ips else []) if dbNode.cascadeRole == "on": if self.enable_dcf != "on": for inst in dbNode.datanodes: @@ -4862,3 +4888,18 @@ class dbClusterInfo(): if dbNode.id == inputid: return dbNode return None + + def __read_cluster_float_ips(self, dn_float_ips): + """ + Read cluster global info(float IP) to dbClusterInfo + """ + for ips_tmp in dn_float_ips: + for res_name in ips_tmp: + if res_name not in self.float_ips: + ret_status, ret_value = ClusterConfigFile.readOneClusterConfigItem( + xmlRootNode, res_name, "CLUSTER") + if ret_status == 0: + self.float_ips[res_name] = ret_value.strip() + else: + raise Exception(ErrorCode.GAUSS_502["GAUSS_50204"] % \ + "float IP." + " Error: \n%s" % ret_value) diff --git a/script/gspylib/common/OMCommand.py b/script/gspylib/common/OMCommand.py index 82f753a..337d08f 100644 --- a/script/gspylib/common/OMCommand.py +++ b/script/gspylib/common/OMCommand.py @@ -75,6 +75,8 @@ class OMCommand(): Current_Path + "/../../local/CleanOsUser.py"), "Local_Config_Hba": os.path.normpath( Current_Path + "/../../local/ConfigHba.py"), + "Local_Config_CM_Res": os.path.normpath( + Current_Path + "/../../local/config_cm_resource.py"), "Local_Config_Instance": os.path.normpath( Current_Path + "/../../local/ConfigInstance.py"), "Local_Init_Instance": os.path.normpath( diff --git a/script/gspylib/component/CM/CM.py b/script/gspylib/component/CM/CM.py index 25d9435..48d6c49 100644 --- a/script/gspylib/component/CM/CM.py +++ b/script/gspylib/component/CM/CM.py @@ -66,6 +66,56 @@ class DssInstAttr(): ' ', '').replace('{', '"').replace('}', '"').replace(';', ' ') +class VipResAttr(): + """ + VIP resource attribute + """ + def __init__(self, float_ip): + self.resources_type = "VIP" + 
self.float_ip = float_ip + + def __str__(self): + return str(vars(self)).replace(':', '=').replace('\'', '').replace( + ' ', '').replace('{', '"').replace('}', '"').replace(';', ' ') + + +class VipInstAttr(): + """ + VIP instance attribute + """ + def __init__(self, base_ip): + self.base_ip = base_ip + + def __str__(self): + return str(vars(self)).replace(':', '=').replace('\'', '').replace( + ' ', '').replace('{', '"').replace('}', '"').replace(';', ' ') + + +class VipAddInst(): + """ + VIP add instance attribute + """ + def __init__(self, res_instance_id, node_id): + self.node_id = node_id + self.res_instance_id = res_instance_id + + def __str__(self): + return str(vars(self)).replace(':', '=').replace('\'', '').replace( + ' ', '').replace('{', '"').replace('}', '"').replace(';', ' ') + + +class VipDelInst(): + """ + VIP del instance attribute + """ + def __init__(self, res_instance_id): + self.res_instance_id = res_instance_id + + def __str__(self): + return str(vars(self)).replace(':', '=').replace('\'', '').replace( + ' ', '').replace('{', '"').replace('}', '"').replace(';', ' ') + + class CmResCtrlCmd(): def __init__(self, action='add', name='', attr=''): @@ -77,8 +127,34 @@ class CmResCtrlCmd(): cmd = '' if self.action == 'add': cmd = 'cm_ctl res --add --res_name {} --res_attr={}'.format( - self.attr_name, self.attr) + self.attr_name, self.attr) elif self.action == 'edit': cmd = 'cm_ctl res --edit --res_name {} --add_inst={}'.format( - self.attr_name, self.attr) + self.attr_name, self.attr) + return cmd + + +class VipCmResCtrlCmd(): + """ + VipCmResCtrlCmd + """ + def __init__(self, action, name, inst="", attr=""): + self.action = action + self.name = name + self.inst = inst + self.attr = attr + + def __str__(self): + cmd = "" + if self.action == "add_res": + cmd = "cm_ctl res --add --res_name=\"%s\" --res_attr=%s" % \ + (self.name, self.attr) + elif self.action == "del_res": + cmd = "cm_ctl res --del --res_name=\"%s\"" % self.name + elif self.action == 
"add_inst": + cmd = "cm_ctl res --edit --res_name=\"%s\" --add_inst=%s --inst_attr=%s" % \ + (self.name, self.inst, self.attr) + elif self.action == "del_inst": + cmd = "cm_ctl res --edit --res_name=\"%s\" --del_inst=%s" % \ + (self.name, self.inst) return cmd diff --git a/script/gspylib/component/CM/CM_OLAP/CM_OLAP.py b/script/gspylib/component/CM/CM_OLAP/CM_OLAP.py index c457f3a..ecd0707 100644 --- a/script/gspylib/component/CM/CM_OLAP/CM_OLAP.py +++ b/script/gspylib/component/CM/CM_OLAP/CM_OLAP.py @@ -30,6 +30,8 @@ try: from gspylib.common.DbClusterStatus import DbClusterStatus from gspylib.os.gsfile import g_file from gspylib.component.CM.CM import CM, CmResAttr, CmResCtrlCmd, DssInstAttr + from gspylib.component.CM.CM import VipInstAttr, VipAddInst, VipDelInst + from gspylib.component.CM.CM import VipCmResCtrlCmd, VipResAttr from gspylib.common.DbClusterInfo import dbClusterInfo from base_utils.os.crontab_util import CrontabUtil from base_utils.os.env_util import EnvUtil @@ -872,3 +874,105 @@ class CM_OLAP(CM): raise Exception( 'Failed to initialize the CM resource file. 
Error: {}'.format( str(out))) + + def get_add_cm_res_cmd(self, cm_res_info): + """ + Get add CM resource information cmd for VIP + """ + cmd_list = [] + for res_name, _list in cm_res_info.items(): + check_res = "cm_ctl res --list --res_name=\"%s\"" % res_name + stat, out= subprocess.getstatusoutput(check_res) + if stat != 0 or not out: + cmd_list.append(str(VipCmResCtrlCmd("add_res", res_name, + attr=VipResAttr(_list[0][0])))) + for _tup in _list: + check_inst = "cm_ctl res --list --res_name=\"%s\" --list_inst" \ + " | grep \"%s\"" % (res_name, _tup[1]) + stat, out= subprocess.getstatusoutput(check_inst) + if stat != 0 or not out: + cmd_list.append(str(VipCmResCtrlCmd("add_inst", res_name, + inst=VipAddInst(_tup[2], _tup[3]), + attr=VipInstAttr(_tup[1])))) + cmd = "source %s; %s" % (EnvUtil.getMpprcFile(), ' ;'.join(cmd_list)) + self.logger.log("Add cm resource information cmd: \n%s" % cmd) + return cmd + + def _get_cm_res_info(self, base_ip): + """ + Get the CM resource info for reducing + """ + # Get resource name + cmd = "cm_ctl res --list | grep \"VIP\" | awk -F \"|\" '{print $1}'" \ + " | xargs -i cm_ctl res --list --res_name={} --list_inst" \ + " | grep \"base_ip=%s\" | awk -F \"|\" '{print $1}'" % base_ip + stat, out= subprocess.getstatusoutput(cmd) + if stat != 0: + raise Exception("Failed to get res name. Cmd: \n%s" % cmd) + if not out: + return "", -1, -1 + res_name = out.strip() + + # Get intance ID + cmd = "cm_ctl res --list --res_name=\"%s\" --list_inst | grep " \ + "\"base_ip=%s\" | awk -F \"|\" '{print $4}'" % (res_name, base_ip) + stat, out= subprocess.getstatusoutput(cmd) + if stat != 0 or not out: + raise Exception("Failed to get the intance ID. 
Cmd: \n%s" % cmd) + inst_id = int(out.strip()) + + # Get the number of instances contained in a resource + cmd = "cm_ctl res --list --res_name=\"%s\" --list_inst" \ + " | grep \"VIP\" | wc -l" % res_name + stat, out= subprocess.getstatusoutput(cmd) + if stat != 0 or not out: + raise Exception("Failed to get the number of instances. Cmd: %s" % cmd) + inst_num = int(out.strip()) + + self.logger.log("CM resource info: res_name=%s,inst_id=%d,inst_num=%d q" \ + "" % (res_name, inst_id, inst_num)) + return res_name, inst_id, inst_num + + def get_reduce_cm_res_cmd(self, base_ips): + """ + Get reduce cm resource information cmd for VIP + """ + cmd_list = [] + for base_ip in base_ips: + res_name, inst_id, inst_num = self._get_cm_res_info(base_ip) + if not res_name: + self.logger.log("The base IP is not found: %s" % base_ip) + continue + if inst_num > 1: + cmd_list.append(str(VipCmResCtrlCmd("del_inst", res_name, + inst=VipDelInst(inst_id)))) + else: + cmd_list.append(str(VipCmResCtrlCmd("del_res", res_name))) + cmd = "source %s; %s" % (EnvUtil.getMpprcFile(), ' ;'.join(cmd_list)) + self.logger.log("Reduce cm resource information cmd: \n%s" % cmd) + return cmd + + def config_cm_res_json(self, base_ips, cm_res_info): + """ + Config cm resource file for vip + """ + if not base_ips and not cm_res_info: + raise Exception("The parameters cannot be empty at the same time") + + cmd = self.get_reduce_cm_res_cmd(base_ips) + stat, out = subprocess.getstatusoutput(cmd) + if stat != 0: + raise Exception("Failed to reduce the CM resource for VIP." \ + " Cmd: \n%s, Error: \n%s" % (cmd, str(out))) + + cmd = self.get_add_cm_res_cmd(cm_res_info) + stat, out = subprocess.getstatusoutput(cmd) + if stat != 0: + raise Exception("Failed to add the CM resource for VIP." \ + " Cmd: \n%s, Error: \n%s" % (cmd, str(out))) + + cmd = "cm_ctl res --check" + stat, out = subprocess.getstatusoutput(cmd) + if stat != 0: + raise Exception("Failed to config the CM resource file for VIP." 
\ + " Cmd: \n%s, Error: \n%s" % (cmd, str(out))) diff --git a/script/gspylib/component/Kernel/DN_OLAP/DN_OLAP.py b/script/gspylib/component/Kernel/DN_OLAP/DN_OLAP.py index 3601835..5b332e5 100644 --- a/script/gspylib/component/Kernel/DN_OLAP/DN_OLAP.py +++ b/script/gspylib/component/Kernel/DN_OLAP/DN_OLAP.py @@ -453,7 +453,7 @@ class DN_OLAP(Kernel): self.modifyDummpyStandbyConfigItem() - def setPghbaConfig(self, clusterAllIpList, try_reload=False): + def setPghbaConfig(self, clusterAllIpList, try_reload=False, float_ips=None): """ """ principal = None @@ -472,39 +472,47 @@ class DN_OLAP(Kernel): # build ip string list # Every 1000 records merged into one i = 0 - GUCParasStr = "" + guc_paras_str = "" GUCParasStrList = [] pg_user = ClusterUser.get_pg_user() - for ipAddress in clusterAllIpList: + for ip_address in clusterAllIpList: i += 1 # Set the initial user and initial database access permissions if principal is None: - GUCParasStr += "-h \"host all %s %s/32 %s\" " % \ - (pg_user, ipAddress, METHOD_TRUST) - GUCParasStr += "-h \"host all all %s/32 %s\" " % (ipAddress, METHOD_SHA) - + if ip_address.startswith("floatIp"): + guc_paras_str += "-h \"host all all %s/32 %s\" " % \ + (float_ips[ip_address], METHOD_SHA) + else: + guc_paras_str += "-h \"host all %s %s/32 %s\" " % \ + (pg_user, ip_address, METHOD_TRUST) + guc_paras_str += "-h \"host all all %s/32 %s\" " % \ + (ip_address, METHOD_SHA) else: - GUCParasStr += "-h \"host all %s %s/32 gss " \ - "include_realm=1 krb_realm=%s\" "\ - % (pg_user, ipAddress, principal) - GUCParasStr += "-h \"host all all %s/32 %s\" " % (ipAddress, METHOD_SHA) + if ip_address.startswith("floatIp"): + guc_paras_str += "-h \"host all all %s/32 %s\" " % \ + (float_ips[ip_address], METHOD_SHA) + else: + guc_paras_str += "-h \"host all %s %s/32 gss include_realm=1 " \ + " krb_realm=%s\" " % (pg_user, ip_address, principal) + guc_paras_str += "-h \"host all all %s/32 %s\" " % \ + (ip_address, METHOD_SHA) if (i % MAX_PARA_NUMBER == 0): - 
GUCParasStrList.append(GUCParasStr) + GUCParasStrList.append(guc_paras_str) i = 0 - GUCParasStr = "" + guc_paras_str = "" # Used only streaming disaster cluster streaming_dn_ips = self.get_streaming_relate_dn_ips(self.instInfo) if streaming_dn_ips: for dn_ip in streaming_dn_ips: - GUCParasStr += "-h \"host all %s %s/32 %s\" " \ + guc_paras_str += "-h \"host all %s %s/32 %s\" " \ % (pg_user, dn_ip, METHOD_TRUST) - GUCParasStr += "-h \"host all all %s/32 %s\" " \ + guc_paras_str += "-h \"host all all %s/32 %s\" " \ % (dn_ip, METHOD_SHA) ip_segment = '.'.join(dn_ip.split('.')[:2]) + ".0.0/16" - GUCParasStr += "-h \"host replication all %s sha256\" " % ip_segment + guc_paras_str += "-h \"host replication all %s sha256\" " % ip_segment - if (GUCParasStr != ""): - GUCParasStrList.append(GUCParasStr) + if (guc_paras_str != ""): + GUCParasStrList.append(guc_paras_str) for parasStr in GUCParasStrList: self.doGUCConfig("set", parasStr, True, try_reload=try_reload) diff --git a/script/impl/dropnode/DropnodeImpl.py b/script/impl/dropnode/DropnodeImpl.py index f26e0bd..7a7b1d0 100644 --- a/script/impl/dropnode/DropnodeImpl.py +++ b/script/impl/dropnode/DropnodeImpl.py @@ -433,6 +433,74 @@ class OperCommon: "[gs_dropnode]End to backup parameter config file on %s." 
% host) return '%s/parameter_%s.tar' % (tmpPath, host) + def check_is_vip_mode(self): + """ + Check whether the current mode is VIP + """ + cmd = "cm_ctl res --list | awk -F \"|\" '{print $2}' | grep -w \"VIP\"" + self.logger.log("Command for Checking VIP mode: %s" % cmd) + stat, out= subprocess.getstatusoutput(cmd) + if stat != 0 or not out: + return False + return True + + def get_float_ip_from_json(self, base_ip, host_ips_for_del): + """ + Get float IP from json file by cmd + """ + cmd = "cm_ctl res --list | grep \"VIP\" | awk -F \"|\" '{print $1}' | " \ + "xargs -i cm_ctl res --list --res_name={} --list_inst |grep \"base_ip=%s\""\ + " | awk -F \"|\" '{print $1}' | xargs -i cm_ctl res --list --res_name={}" \ + " | grep \"VIP\" | awk -F \"|\" '{print $3}'" % base_ip + stat, out= subprocess.getstatusoutput(cmd) + if stat != 0: + GaussLog.exitWithError(ErrorCode.GAUSS_514["GAUSS_51400"] % cmd) + if not out: + self.logger.log("Failed to get float IP from json. Cmd: %s" % cmd) + return "" + float_ip = re.findall("float_ip=([\.\d]+)", out.strip())[0] + + cmd = "cm_ctl res --list | grep \"VIP\" | awk -F \"|\" '{print $1}' | " \ + "xargs -i cm_ctl res --list --res_name={} | grep \"float_ip=%s\" | " \ + "awk -F \"|\" '{print $1}' | xargs -i cm_ctl res --list --res_name={} " \ + "--list_inst | grep \"VIP\" | awk -F \"|\" '{print $5}'" % float_ip + stat, out= subprocess.getstatusoutput(cmd) + if stat != 0 or not out: + raise Exception("Failed to get base IP list from json. Cmd: %s" % cmd) + for item in out.split('\n'): + _ip = re.findall("base_ip=([\.\d]+)", item.strip())[0] + if _ip not in host_ips_for_del: + return "" + + self.logger.log("Successfully get float IP from json, %s." 
% float_ip) + return float_ip + + def get_float_ip_config(self, host, dn_dir, host_ips_for_del, ssh_tool, env_file): + """ + Get float IP configuration str + """ + if not self.check_is_vip_mode(): + self.logger.log("The current cluster does not support VIP.") + return "" + + float_ips_for_del = [] + for _ip in host_ips_for_del: + float_ip = self.get_float_ip_from_json(_ip, host_ips_for_del) + if float_ip and float_ip not in float_ips_for_del: + float_ips_for_del.append(float_ip) + cmd = "grep '^host.*sha256' %s" % os.path.join(dn_dir, 'pg_hba.conf') + stat_map, output = ssh_tool.getSshStatusOutput(cmd, [host], env_file) + if stat_map[host] != 'Success': + self.logger.debug("[gs_dropnode]Parse pg_hba file failed:" + output) + GaussLog.exitWithError(ErrorCode.GAUSS_358["GAUSS_35809"]) + ret = "" + for float_ip in float_ips_for_del: + if float_ip in output: + s = output.rfind('host', 0, output.find(float_ip)) + e = output.find('\n', output.find(float_ip), len(output)) + ret += output[s:e] + '|' + return ret + def parseConfigFile(self, host, dirDn, dnId, hostIpListForDel, sshTool, envfile): """ @@ -475,6 +543,8 @@ class OperCommon: s = output.rfind('host', 0, output.find(ip)) e = output.find('\n', output.find(ip), len(output)) resultDict['pghbaStr'] += output[s:e] + '|' + resultDict['pghbaStr'] += self.get_float_ip_config(host, dirDn, hostIpListForDel, + sshTool, envfile) self.logger.log( "[gs_dropnode]End to parse parameter config file on %s." 
% host) return resultDict diff --git a/script/impl/dropnode/drop_node_with_cm_impl.py b/script/impl/dropnode/drop_node_with_cm_impl.py index 6d1fc0e..28c3975 100644 --- a/script/impl/dropnode/drop_node_with_cm_impl.py +++ b/script/impl/dropnode/drop_node_with_cm_impl.py @@ -28,13 +28,18 @@ sys.path.append(sys.path[0] + "/../../../../") from base_utils.os.net_util import NetUtil from base_utils.os.env_util import EnvUtil from base_utils.executor.cmd_executor import CmdExecutor - +from gspylib.common.OMCommand import OMCommand from gspylib.common.ErrorCode import ErrorCode from gspylib.common.Common import DefaultValue from gspylib.component.CM.CM_OLAP.CM_OLAP import CM_OLAP from gspylib.threads.SshTool import SshTool from gspylib.os.gsfile import g_file from impl.dropnode.DropnodeImpl import DropnodeImpl +from base_utils.os.file_util import FileUtil + + +# Action type +ACTION_DROP_NODE = "drop_node" class DropNodeWithCmImpl(DropnodeImpl): @@ -83,6 +88,37 @@ class DropNodeWithCmImpl(DropnodeImpl): raise Exception("Too many cm_server nodes are dropped.A maximum of {0} cm_server " "nodes can be dropped.".format(len(all_cm_server_nodes) - 2)) + def backup_cm_res_json(self): + """ + Backup cm resource json on primary node + """ + cm_resource = os.path.realpath( + os.path.join(self.cm_component.instInfo.datadir, "cm_resource.json")) + backup_cm_res = os.path.realpath( + os.path.join(self.pghostPath, "cm_resource_bak.json")) + if not os.path.isfile(backup_cm_res): + FileUtil.cpFile(cm_resource, backup_cm_res) + + def update_cm_res_json(self): + """ + Update cm resource json file. 
+ """ + if not self.commonOper.check_is_vip_mode(): + self.logger.log("The current cluster does not support VIP.") + return + + self.backup_cm_res_json() + self.logger.log("Updating cm resource file on exist nodes.") + del_hosts = ",".join(self.context.hostMapForDel.keys()) + cmd = "source %s; " % self.userProfile + cmd += "%s -t %s -U %s -H %s -l '%s' " % ( + OMCommand.getLocalScript("Local_Config_CM_Res"), + ACTION_DROP_NODE, self.user, del_hosts, self.context.localLog) + self.logger.debug("Command for updating cm resource file: %s" % cmd) + CmdExecutor.execCommandWithMode(cmd, self.ssh_tool, + host_list=self.context.hostMapForExist.keys()) + self.logger.log("Successfully updated cm resource file.") + def _stop_drop_node(self): """ try to stop drop nodes @@ -154,6 +190,27 @@ class DropNodeWithCmImpl(DropnodeImpl): "HINT: Maybe the cluster is continually being started in the background.\n" "You can wait for a while and check whether the cluster starts.") + def restore_cm_res_json(self): + """ + Restore cm resource json on primary node + """ + cm_resource = os.path.realpath( + os.path.join(self.cm_component.instInfo.datadir, "cm_resource.json")) + backup_cm_res = os.path.realpath( + os.path.join(self.pghostPath, "cm_resource_bak.json")) + if os.path.isfile(backup_cm_res): + FileUtil.cpFile(backup_cm_res, cm_resource) + + def remove_cm_res_backup(self): + """ + Remove cm resource backup on primary node + """ + backup_cm_res = os.path.realpath( + os.path.join(self.pghostPath, "cm_resource_bak.json")) + if os.path.isfile(backup_cm_res): + os.remove(backup_cm_res) + self.logger.log("Successfully remove cm resource backup file") + def run(self): """ start dropnode @@ -163,11 +220,14 @@ class DropNodeWithCmImpl(DropnodeImpl): self.check_drop_cm_node() self.change_user() self.logger.log("[gs_dropnode]Start to drop nodes of the cluster.") + self.restore_cm_res_json() self.checkAllStandbyState() self.dropNodeOnAllHosts() self.operationOnlyOnPrimary() + 
self.update_cm_res_json() self._stop_drop_node() self._generate_flag_file_on_drop_nodes() self.modifyStaticConf() self.restart_new_cluster() + self.remove_cm_res_backup() self.logger.log("[gs_dropnode] Success to drop the target nodes.") diff --git a/script/impl/expansion/ExpansionImpl.py b/script/impl/expansion/ExpansionImpl.py index 96e71f3..a411a10 100644 --- a/script/impl/expansion/ExpansionImpl.py +++ b/script/impl/expansion/ExpansionImpl.py @@ -650,6 +650,23 @@ gs_guc set -D {dn} -c "available_zone='{azName}'" if self._isAllFailed(): GaussLog.exitWithError(ErrorCode.GAUSS_357["GAUSS_35706"] % "set guc") + def get_add_float_ip_cmd(self, host_ip): + """ + Get cmd for adding float IP to pg_hba.conf + """ + if not self.context.clusterInfo.float_ips: + self.logger.debug("The current cluster does not support VIP.") + return "" + + cmd = "" + name = self.context.backIpNameMap[host_ip] + node = self.context.clusterInfo.getDbNodeByName(name) + for inst in node.datanodes: + for float_ip in inst.float_ips: + cmd += " -h 'host all all %s/32 sha256'" % \ + self.context.clusterInfo.float_ips[float_ip] + return cmd + def addTrust(self): """ add authentication rules about new host ip in existing hosts and @@ -666,13 +683,13 @@ gs_guc set -D {dn} -c "available_zone='{azName}'" cmd = "source %s;gs_guc set -D %s" % (self.envFile, dataNode) if hostExec in self.existingHosts: for hostParam in self.context.newHostList: - cmd += " -h 'host all all %s/32 trust'" % \ - hostParam + cmd += " -h 'host all all %s/32 trust'" % hostParam + cmd += self.get_add_float_ip_cmd(hostParam) else: for hostParam in allHosts: if hostExec != hostParam: - cmd += " -h 'host all all %s/32 trust'" % \ - hostParam + cmd += " -h 'host all all %s/32 trust'" % hostParam + cmd += self.get_add_float_ip_cmd(hostParam) self.logger.debug("[%s] trustCmd:%s" % (hostExec, cmd)) sshTool = SshTool([hostExec]) sshTool.getSshStatusOutput(cmd, [hostExec], self.envFile) diff --git 
a/script/impl/expansion/expansion_impl_with_cm.py b/script/impl/expansion/expansion_impl_with_cm.py index 8e7880b..e7695ea 100644 --- a/script/impl/expansion/expansion_impl_with_cm.py +++ b/script/impl/expansion/expansion_impl_with_cm.py @@ -47,7 +47,9 @@ from domain_utils.cluster_file.cluster_dir import ClusterDir from gspylib.component.CM.CM_OLAP.CM_OLAP import CM_OLAP +# Action type ACTION_INSTALL_CLUSTER = "install_cluster" +ACTION_EXPAND_NODE ="expansion_node" def change_user_executor(perform_method): @@ -116,8 +118,9 @@ class ExpansionImplWithCm(ExpansionImpl): DefaultValue.MAX_DIRECTORY_MODE) create_dir_cmd += " && chown {0}:{1} {2}".format(self.user, self.group, xml_dir) self.ssh_tool.executeCommand(create_dir_cmd) - self.ssh_tool.scpFiles(self.context.xmlFile, self.context.xmlFile, - hostList=self.get_node_names(self.new_nodes)) + self.ssh_tool.scpFiles(self.context.xmlFile, self.context.xmlFile) + cmd = "chown %s:%s %s" % (self.user, self.group, self.context.xmlFile) + self.ssh_tool.executeCommand(cmd) self.logger.log("Success to send XML to new nodes") def preinstall_run(self): @@ -302,11 +305,16 @@ class ExpansionImplWithCm(ExpansionImpl): datains = node.datanodes[0] log_dir = "%s/pg_log/dn_%d" % (log_path, appname) audit_dir = "%s/pg_audit/dn_%d" % (log_path, appname) + if "127.0.0.1" in datains.listenIps: + listen_ips = "%s" % ",".join(datains.listenIps) + else: + listen_ips = "localhost,%s" % ",".join(datains.listenIps) new_nodes_para_list.extend([ (node.name, datains.datadir, "port", datains.port), (node.name, datains.datadir, "application_name", "dn_%s" % appname), (node.name, datains.datadir, "log_directory", "%s" % log_dir), - (node.name, datains.datadir, "audit_directory", "%s" % audit_dir) + (node.name, datains.datadir, "audit_directory", "%s" % audit_dir), + (node.name, datains.datadir, "listen_addresses", listen_ips) ]) for new_node_para in new_nodes_para_list: @@ -381,6 +389,9 @@ class ExpansionImplWithCm(ExpansionImpl): cmd = "source 
{0};gs_guc set -D {1}".format(self.envFile, new_inst.datadir) cmd += " -h 'host all %s %s/32 trust'" % (self.user, host_ip) cmd += " -h 'host all all %s/32 sha256'" % host_ip + if self.xml_cluster_info.float_ips: + cmd += " -h 'host all all %s/32 sha256'" % \ + self.xml_cluster_info.float_ips[new_inst.float_ips[0]] self.logger.log("Ready to perform command on node [{0}]. " "Command is : {1}".format(new_node.name, cmd)) CmdExecutor.execCommandWithMode(cmd, self.ssh_tool, host_list=[new_node.name]) @@ -394,6 +405,22 @@ class ExpansionImplWithCm(ExpansionImpl): self._config_new_node_hba(node) self.logger.log("Successfully set hba on all nodes.") + def _update_cm_res_json(self): + """ + Update cm resource json file. + """ + if not self.xml_cluster_info.float_ips: + self.logger.log("The current cluster does not support VIP.") + return + self.logger.log("Updating cm resource file on all nodes.") + cmd = "source %s; " % self.envFile + cmd += "%s -t %s -U %s -X '%s' -l '%s' " % ( + OMCommand.getLocalScript("Local_Config_CM_Res"), ACTION_EXPAND_NODE, + self.context.user, self.context.xmlFile, self.context.localLog) + self.logger.debug("Command for updating cm resource file: %s" % cmd) + CmdExecutor.execCommandWithMode(cmd, self.ssh_tool) + self.logger.log("Successfully updated cm resource file.") + def _config_instance(self): """ Config instance @@ -402,6 +429,7 @@ class ExpansionImplWithCm(ExpansionImpl): self.generateClusterStaticFile() self.setGucConfig() self._set_other_guc_para() + self._update_cm_res_json() self._config_pg_hba() self.distributeCipherFile() diff --git a/script/impl/install/OLAP/InstallImplOLAP.py b/script/impl/install/OLAP/InstallImplOLAP.py index e56def5..56dbb76 100644 --- a/script/impl/install/OLAP/InstallImplOLAP.py +++ b/script/impl/install/OLAP/InstallImplOLAP.py @@ -36,6 +36,9 @@ from gspylib.component.DSS.dss_checker import DssConfig ROLLBACK_FAILED = 3 +# Action type +ACTION_INSTALL_CLUSTER = "install_cluster" + class 
InstallImplOLAP(InstallImpl): """ @@ -221,6 +224,8 @@ class InstallImplOLAP(InstallImpl): output: NA """ # config instance applications + if self.context.clusterInfo.float_ips: + self.config_cm_res_json() self.updateInstanceConfig() self.updateHbaConfig() @@ -372,6 +377,22 @@ class InstallImplOLAP(InstallImpl): self.context.isSingle) self.context.logger.debug("Successfully configured node instance.") + def config_cm_res_json(self): + """ + Config cm resource json file. + """ + self.context.logger.log("Configuring cm resource file on all nodes.") + cmd = "source %s; " % self.context.mpprcFile + cmd += "%s -t %s -U %s -X '%s' -l '%s' " % ( + OMCommand.getLocalScript("Local_Config_CM_Res"), ACTION_INSTALL_CLUSTER, + self.context.user, self.context.xmlFile, self.context.localLog) + self.context.logger.debug( + "Command for configuring cm resource file: %s" % cmd) + CmdExecutor.execCommandWithMode(cmd, + self.context.sshTool, + self.context.isSingle) + self.context.logger.log("Successfully configured cm resource file.") + def updateHbaConfig(self): """ function: config Hba instance diff --git a/script/local/ConfigHba.py b/script/local/ConfigHba.py index 2dce53d..e0db8d1 100644 --- a/script/local/ConfigHba.py +++ b/script/local/ConfigHba.py @@ -203,6 +203,7 @@ class ConfigHba(LocalBaseOM): for inst in nodeinfo.datanodes: self.allIps += inst.haIps self.allIps += inst.listenIps + self.allIps += inst.float_ips # get all ips. Remove the duplicates ips self.allIps = DefaultValue.Deduplication(self.allIps) @@ -260,7 +261,8 @@ class ConfigHba(LocalBaseOM): self.logger.debug("The %s does not exist." 
#!/usr/bin/env python3
#-*- coding:utf-8 -*-
##############################################################################
#Copyright (c): 2020-2025, Huawei Tech. Co., Ltd.
#FileName : config_cm_resource.py
#Version : openGauss
#Date : 2023-02-12
#Description : config_cm_resource.py is a utility to config CM resource file.
##############################################################################

import os
import sys
import getopt

sys.path.append(sys.path[0] + "/../")
from gspylib.common.GaussLog import GaussLog
from gspylib.common.ErrorCode import ErrorCode
from gspylib.common.LocalBaseOM import LocalBaseOM
from domain_utils.domain_common.cluster_constants import ClusterConstants
from domain_utils.cluster_file.cluster_log import ClusterLog
from gspylib.threads.parallelTool import parallelTool
from base_utils.os.net_util import NetUtil


# Global variables define
g_opts = None

# Action type
ACTION_INSTALL_CLUSTER = "install_cluster"
ACTION_EXPAND_NODE = "expansion_node"
ACTION_DROP_NODE = "drop_node"


class CmdOptions():
    """
    Container for the command line parameters of this script.
    """

    def __init__(self):
        """
        Initialise every option to its empty value.
        """
        # -t : one of the ACTION_* constants
        self.action_type = ""
        # -U : cluster user name
        self.cluster_user = ""
        # -X : path of the cluster XML file (optional)
        self.cluster_conf = ""
        # -l : log file path (optional)
        self.log_file = ""
        # -H : comma separated node names, required for drop_node
        self.drop_nodes = []


def parse_command_line():
    """
    Parse sys.argv into the module-level g_opts object.

    Recognised options are -t, -U, -X, -l and -H. An unknown option or
    any positional argument is reported through GaussLog.exitWithError.
    """
    try:
        opts, args = getopt.getopt(sys.argv[1:], "t:U:X:l:H:")
    except getopt.GetoptError as e:
        GaussLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50000"] % str(e))

    if len(args) > 0:
        GaussLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50000"] % str(args[0]))

    global g_opts
    g_opts = CmdOptions()

    for (key, value) in opts:
        if key == "-t":
            g_opts.action_type = value
        elif key == "-U":
            g_opts.cluster_user = value
        elif key == "-X":
            g_opts.cluster_conf = value
        elif key == "-l":
            g_opts.log_file = value
        elif key == "-H":
            # Drop blank entries (e.g. -H "" or a trailing comma) so that an
            # empty node name cannot slip past the "missing -H" check.
            g_opts.drop_nodes = [name.strip() for name in value.split(',')
                                 if name.strip()]


def check_parameters():
    """
    Validate the options stored in g_opts.

    -t and -U are mandatory; -H is mandatory for the drop_node action;
    -X, when given, must point to an existing path. When no log file was
    given, one is derived through ClusterLog.getOMLogPath.
    """
    if not g_opts.action_type:
        GaussLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50001"] % 't' + ".")

    if not g_opts.drop_nodes and g_opts.action_type == ACTION_DROP_NODE:
        GaussLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50001"] % 'H' + ".")

    if not g_opts.cluster_user:
        GaussLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50001"] % 'U' + ".")

    if g_opts.cluster_conf:
        if not os.path.exists(g_opts.cluster_conf):
            GaussLog.exitWithError(ErrorCode.GAUSS_502["GAUSS_50201"] % g_opts.cluster_conf)

    if not g_opts.log_file:
        g_opts.log_file = ClusterLog.getOMLogPath(
            ClusterConstants.LOCAL_LOG_FILE, g_opts.cluster_user, "")


class ConfigCMResource(LocalBaseOM):
    """
    Configure the CM resource file on the local node for the
    install_cluster, expansion_node and drop_node actions.
    """

    def __init__(self, action_type, cluster_user, cluster_conf, log_file, drop_nodes):
        """
        Init configuration on local node.

        action_type  : one of the ACTION_* constants
        cluster_user : cluster user name, checked against self.user
        cluster_conf : cluster XML path; "" selects readConfigInfo()
        log_file     : log file path
        drop_nodes   : list of node names, used by the drop_node action
        """
        LocalBaseOM.__init__(self, log_file, cluster_user, cluster_conf)
        if self.clusterConfig == "":
            self.readConfigInfo()
        else:
            self.readConfigInfoByXML()

        # Check user information
        self.getUserInfo()
        if self.user != cluster_user.strip():
            self.logger.debug("User parameter : %s." % self.user)
            self.logger.logExit(ErrorCode.GAUSS_503["GAUSS_50315"]
                                % (self.user, self.clusterInfo.appPath))
        self.initComponent()

        # resource name -> list of (float ip, listen ip, instance id, node id)
        self.cm_res_info = {}
        # listen IPs of the datanodes on the nodes being dropped
        self.base_ips = []
        self.action_type = action_type
        self.drop_nodes = drop_nodes

    def get_cm_res_info(self):
        """
        Collect the CM resource information to add.

        For every datanode instance in the cluster, each float IP resource
        name of the instance is mapped to a tuple of
        (float ip, listen ip, instance id, node id) in self.cm_res_info.
        """
        for node_name in self.clusterInfo.getClusterNodeNames():
            node_info = self.clusterInfo.getDbNodeByName(node_name)
            for inst in node_info.datanodes:
                # The i-th float IP resource is paired with the i-th listen
                # IP of the same instance.
                # NOTE(review): assumes float_ips is never longer than
                # listenIps - confirm against the XML reader.
                for i, res_name in enumerate(inst.float_ips):
                    _tup = (self.clusterInfo.float_ips[res_name], inst.listenIps[i],
                            inst.instanceId, node_info.id)
                    self.cm_res_info.setdefault(res_name, []).append(_tup)

        self.logger.log("Successfully get cm res info: \n%s" % str(self.cm_res_info))

    def get_base_ips(self):
        """
        Collect into self.base_ips the listen IPs of every datanode on the
        nodes listed in self.drop_nodes.

        Raises Exception when a listed node cannot be found in the cluster
        information.
        """
        for node_name in self.drop_nodes:
            node_info = self.clusterInfo.getDbNodeByName(node_name)
            if node_info is None:
                # Guard in case the lookup yields nothing for an unknown host
                # (its contract is not visible in this file); report the bad
                # -H value instead of failing on an attribute access below.
                raise Exception(ErrorCode.GAUSS_500["GAUSS_50004"] % 'H' +
                                " Value: %s." % node_name)
            for inst in node_info.datanodes:
                self.base_ips.extend(inst.listenIps)

    def _config_an_instance(self, component):
        """
        Config CM resource file for a single component.

        Components whose data directory is not named "cm_agent" are
        skipped. Raises Exception when the cm_agent data directory does
        not exist.
        """
        # The instance type is taken from the last path element of the data
        # directory; trailing slashes are ignored so ".../cm_agent/" is
        # still recognised.
        datadir = component.instInfo.datadir
        inst_type = datadir.strip().rstrip('/').split('/')[-1].strip()
        if inst_type != "cm_agent":
            self.logger.log("Current instance is not cm_agent")
            return

        if not os.path.exists(component.instInfo.datadir):
            raise Exception(ErrorCode.GAUSS_502["GAUSS_50201"] % \
                            "data directory of the cm_agent instance")

        component.config_cm_res_json(self.base_ips, self.cm_res_info)

    def _config_all_instances(self):
        """
        Run _config_an_instance over every component in self.cmCons through
        parallelTool.parallelExecute.

        Any failure is re-raised as a plain Exception carrying the same
        message, chained to the original so the traceback is kept.
        """
        component_list = self.cmCons
        try:
            parallelTool.parallelExecute(self._config_an_instance, component_list)
        except Exception as e:
            raise Exception(str(e)) from e

    def add_cm_res_info(self):
        """
        Config CM resource file for "install/expansion"
        """
        self.get_cm_res_info()
        self._config_all_instances()

    def reduce_cm_res_info(self):
        """
        Config CM resource file for "dropnode"
        """
        self.get_base_ips()
        self._config_all_instances()

    def run(self):
        """
        Dispatch on self.action_type and config the CM resource file.

        Raises Exception when the action type is not one of the ACTION_*
        constants.
        """
        fun_dict = {ACTION_INSTALL_CLUSTER: self.add_cm_res_info,
                    ACTION_EXPAND_NODE: self.add_cm_res_info,
                    ACTION_DROP_NODE: self.reduce_cm_res_info}

        handler = fun_dict.get(self.action_type)
        if handler is None:
            raise Exception(ErrorCode.GAUSS_500["GAUSS_50004"] % 't' + \
                            " Value: %s." % self.action_type)
        handler()

        self.logger.log("Successfully configured CM resource file on node[%s]" % \
                        NetUtil.GetHostIpOrName())


if __name__ == '__main__':
    # Main function:
    #   1. Parse command line
    #   2. Check parameter
    #   3. Read config (done inside the ConfigCMResource constructor)
    #   4. Get the CM resource information to be configured
    #   5. Config CM resource file
    try:
        parse_command_line()
        check_parameters()
        configer = ConfigCMResource(g_opts.action_type, g_opts.cluster_user,
                                    g_opts.cluster_conf, g_opts.log_file, g_opts.drop_nodes)
        configer.run()

    except Exception as e:
        GaussLog.exitWithError(str(e))

    sys.exit(0)