!410 支持VIP本地安装

Merge pull request !410 from 杨皓/master
This commit is contained in:
opengauss-bot 2023-02-28 09:04:36 +00:00 committed by Gitee
commit ae1e91ed49
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
13 changed files with 702 additions and 43 deletions

View File

@ -204,8 +204,7 @@ General options:
def check_repeat_process(self):
"""
function: Check whether only one node be left in the cluster
return a flag
Check whether the same gs_dropnode command be run at the same time
"""
cmd = "ps -ef | grep 'gs_dropnode -U %s -G %s' | grep -v grep" \
% (self.user, self.group)

View File

@ -410,7 +410,7 @@ def compareObject(Object_A, Object_B, instName, tempbuffer=None, model=None,
dss_ignore = [
"enable_dss", "dss_config", "dss_home", "cm_vote_disk", "cm_share_disk",
"dss_pri_disks", "dss_shared_disks", "dss_vg_info", "dss_vgname", "dss_ssl_enable",
"ss_rdma_work_config", "ss_interconnect_type"]
"ss_rdma_work_config", "ss_interconnect_type", "float_ips"]
for i in Object_A_list:
if i.startswith("_") or ignoreCheck(Object_A, i, model) or i in dss_ignore:
continue
@ -566,6 +566,8 @@ class instanceInfo():
self.listenIps = []
# ha ip
self.haIps = []
# float ip
self.float_ips = []
# port
self.port = 0
# It's pool port for coordinator, and ha port for other instance
@ -771,9 +773,8 @@ class dbNodeInfo():
return count
def appendInstance(self, instId, mirrorId, instRole, instanceType,
listenIps=None,
haIps=None, datadir="", ssddir="", level=1,
xlogdir="", syncNum=-1, syncNumFirst="", dcf_data=""):
listenIps=None, haIps=None, datadir="", ssddir="", level=1,
xlogdir="", syncNum=-1, syncNumFirst="", dcf_data="", float_ips=None):
"""
function : Classify the instance of cmserver/gtm
input : int,int,String,String
@ -800,6 +801,10 @@ class dbNodeInfo():
else:
dbInst.listenIps = listenIps[:]
if float_ips is not None:
if len(float_ips) != 0:
dbInst.float_ips = float_ips
if (haIps is not None):
if (len(haIps) == 0):
dbInst.haIps = self.backIps[:]
@ -946,6 +951,7 @@ class dbClusterInfo():
self.managerPath = ""
self.replicaNum = 0
self.corePath = ""
self.float_ips = {}
# add azName
self.azName = ""
@ -3394,6 +3400,7 @@ class dbClusterInfo():
"""
dnListenIps = None
dnHaIps = None
dn_float_ips = None
mirror_count_data = self.__getDataNodeCount(masterNode)
if masterNode.dataNum > 0:
dnListenIps = self.__readInstanceIps(masterNode.name,
@ -3403,7 +3410,12 @@ class dbClusterInfo():
dnHaIps = self.__readInstanceIps(masterNode.name, "dataHaIp",
masterNode.dataNum *
mirror_count_data)
dn_float_ips = self.__readInstanceIps(masterNode.name,
"floatIpMap",
masterNode.dataNum *
mirror_count_data)
if dn_float_ips is not None:
self.__read_cluster_float_ips(dn_float_ips)
dnInfoLists = [[] for row in range(masterNode.dataNum)]
xlogInfoLists = [[] for row in range(masterNode.dataNum)]
dcf_data_lists = [[] for row in range(masterNode.dataNum)]
@ -3593,7 +3605,9 @@ class dbClusterInfo():
dnInfoList[0],
syncNum=syncNumList[i],
syncNumFirst=syncNumFirstList[i],
dcf_data=dcf_data_list[0])
dcf_data=dcf_data_list[0],
float_ips=dn_float_ips[instIndex] \
if dn_float_ips else [])
else:
masterNode.appendInstance(instId, groupId,
INSTANCE_ROLE_DATANODE,
@ -3602,7 +3616,9 @@ class dbClusterInfo():
dnHaIps[instIndex],
dnInfoList[0],
syncNum=syncNumList[i],
syncNumFirst=syncNumFirstList[i])
syncNumFirst=syncNumFirstList[i],
float_ips=dn_float_ips[instIndex] \
if dn_float_ips else [])
else:
masterNode.appendInstance(instId, groupId,
INSTANCE_ROLE_DATANODE,
@ -3612,7 +3628,9 @@ class dbClusterInfo():
dnInfoList[0],
xlogdir=xlogInfoList[0],
syncNum=syncNumList[i],
syncNumFirst=syncNumFirstList[i])
syncNumFirst=syncNumFirstList[i],
float_ips=dn_float_ips[instIndex] \
if dn_float_ips else [])
instIndex += 1
@ -3694,7 +3712,9 @@ class dbClusterInfo():
dnInfoList[nodeLen * 2 + 2],
syncNum=syncNumList[i],
syncNumFirst=syncNumFirstList[i],
dcf_data=dcf_data_list[0])
dcf_data=dcf_data_list[0],
float_ips=dn_float_ips[instIndex] \
if dn_float_ips else [])
else:
dbNode.appendInstance(instId, groupId,
INSTANCE_ROLE_DATANODE,
@ -3703,7 +3723,9 @@ class dbClusterInfo():
dnHaIps[instIndex],
dnInfoList[nodeLen * 2 + 2],
syncNum=syncNumList[i],
syncNumFirst=syncNumFirstList[i])
syncNumFirst=syncNumFirstList[i],
float_ips=dn_float_ips[instIndex] \
if dn_float_ips else [])
else:
if self.enable_dcf == "on":
dbNode.appendInstance(instId, groupId,
@ -3715,7 +3737,9 @@ class dbClusterInfo():
xlogdir=xlogInfoList[nodeLen + 1],
syncNum=syncNumList[i],
syncNumFirst=syncNumFirstList[i],
dcf_data=dcf_data_list[0])
dcf_data=dcf_data_list[0],
float_ips=dn_float_ips[instIndex] \
if dn_float_ips else [])
else:
dbNode.appendInstance(instId, groupId,
INSTANCE_ROLE_DATANODE,
@ -3725,7 +3749,9 @@ class dbClusterInfo():
dnInfoList[nodeLen * 2 + 2],
xlogdir=xlogInfoList[nodeLen + 1],
syncNum=syncNumList[i],
syncNumFirst=syncNumFirstList[i])
syncNumFirst=syncNumFirstList[i],
float_ips=dn_float_ips[instIndex] \
if dn_float_ips else [])
if dbNode.cascadeRole == "on":
if self.enable_dcf != "on":
for inst in dbNode.datanodes:
@ -4862,3 +4888,18 @@ class dbClusterInfo():
if dbNode.id == inputid:
return dbNode
return None
def __read_cluster_float_ips(self, dn_float_ips):
    """
    Fill self.float_ips with the cluster-level value of every float IP
    resource name referenced by the datanode instances.

    dn_float_ips is iterated as a sequence of per-instance sequences of
    resource names. Each name not yet present in self.float_ips is read
    from the "CLUSTER" section of the XML and stored stripped.

    Raises Exception (GAUSS_50204) when a name cannot be read.
    """
    for inst_res_names in dn_float_ips:
        for res_name in inst_res_names:
            # Each resource name is resolved only once.
            if res_name in self.float_ips:
                continue
            ret_status, ret_value = ClusterConfigFile.readOneClusterConfigItem(
                xmlRootNode, res_name, "CLUSTER")
            if ret_status != 0:
                raise Exception(ErrorCode.GAUSS_502["GAUSS_50204"] % \
                                "float IP." + " Error: \n%s" % ret_value)
            self.float_ips[res_name] = ret_value.strip()

View File

@ -75,6 +75,8 @@ class OMCommand():
Current_Path + "/../../local/CleanOsUser.py"),
"Local_Config_Hba": os.path.normpath(
Current_Path + "/../../local/ConfigHba.py"),
"Local_Config_CM_Res": os.path.normpath(
Current_Path + "/../../local/config_cm_resource.py"),
"Local_Config_Instance": os.path.normpath(
Current_Path + "/../../local/ConfigInstance.py"),
"Local_Init_Instance": os.path.normpath(

View File

@ -66,6 +66,56 @@ class DssInstAttr():
' ', '').replace('{', '"').replace('}', '"').replace(';', ' ')
class VipResAttr():
    """
    VIP resource attribute.

    str() renders the instance attributes as a double-quoted
    "key=value,key=value" string.
    """

    def __init__(self, float_ip):
        self.resources_type = "VIP"
        self.float_ip = float_ip

    def __str__(self):
        # One pass over the dict repr: ':' -> '=', quotes and blanks dropped,
        # braces -> '"', ';' -> ' '.
        table = str.maketrans({':': '=', '\'': None, ' ': None,
                               '{': '"', '}': '"', ';': ' '})
        return str(vars(self)).translate(table)
class VipInstAttr():
    """
    VIP instance attribute.

    str() renders the instance attributes as a double-quoted
    "key=value" string.
    """

    def __init__(self, base_ip):
        self.base_ip = base_ip

    def __str__(self):
        # One pass over the dict repr: ':' -> '=', quotes and blanks dropped,
        # braces -> '"', ';' -> ' '.
        table = str.maketrans({':': '=', '\'': None, ' ': None,
                               '{': '"', '}': '"', ';': ' '})
        return str(vars(self)).translate(table)
class VipAddInst():
    """
    VIP add instance attribute.

    str() renders the instance attributes as a double-quoted
    "key=value,key=value" string, node_id first.
    """

    def __init__(self, res_instance_id, node_id):
        # Assignment order fixes the order of the rendered keys.
        self.node_id = node_id
        self.res_instance_id = res_instance_id

    def __str__(self):
        # One pass over the dict repr: ':' -> '=', quotes and blanks dropped,
        # braces -> '"', ';' -> ' '.
        table = str.maketrans({':': '=', '\'': None, ' ': None,
                               '{': '"', '}': '"', ';': ' '})
        return str(vars(self)).translate(table)
class VipDelInst():
    """
    VIP del instance attribute.

    str() renders the instance attributes as a double-quoted
    "key=value" string.
    """

    def __init__(self, res_instance_id):
        self.res_instance_id = res_instance_id

    def __str__(self):
        # One pass over the dict repr: ':' -> '=', quotes and blanks dropped,
        # braces -> '"', ';' -> ' '.
        table = str.maketrans({':': '=', '\'': None, ' ': None,
                               '{': '"', '}': '"', ';': ' '})
        return str(vars(self)).translate(table)
class CmResCtrlCmd():
def __init__(self, action='add', name='', attr=''):
@ -77,8 +127,34 @@ class CmResCtrlCmd():
cmd = ''
if self.action == 'add':
cmd = 'cm_ctl res --add --res_name {} --res_attr={}'.format(
self.attr_name, self.attr)
self.attr_name, self.attr)
elif self.action == 'edit':
cmd = 'cm_ctl res --edit --res_name {} --add_inst={}'.format(
self.attr_name, self.attr)
self.attr_name, self.attr)
return cmd
class VipCmResCtrlCmd():
    """
    Builder for the "cm_ctl res" command line of one VIP operation.

    action selects the command: "add_res", "del_res", "add_inst" or
    "del_inst". Any other action renders as an empty string.
    """

    def __init__(self, action, name, inst="", attr=""):
        self.action = action
        self.name = name
        self.inst = inst
        self.attr = attr

    def __str__(self):
        if self.action == "add_res":
            return "cm_ctl res --add --res_name=\"%s\" --res_attr=%s" % \
                   (self.name, self.attr)
        if self.action == "del_res":
            return "cm_ctl res --del --res_name=\"%s\"" % self.name
        if self.action == "add_inst":
            return "cm_ctl res --edit --res_name=\"%s\" --add_inst=%s --inst_attr=%s" % \
                   (self.name, self.inst, self.attr)
        if self.action == "del_inst":
            return "cm_ctl res --edit --res_name=\"%s\" --del_inst=%s" % \
                   (self.name, self.inst)
        # Unknown action: no command.
        return ""

View File

@ -30,6 +30,8 @@ try:
from gspylib.common.DbClusterStatus import DbClusterStatus
from gspylib.os.gsfile import g_file
from gspylib.component.CM.CM import CM, CmResAttr, CmResCtrlCmd, DssInstAttr
from gspylib.component.CM.CM import VipInstAttr, VipAddInst, VipDelInst
from gspylib.component.CM.CM import VipCmResCtrlCmd, VipResAttr
from gspylib.common.DbClusterInfo import dbClusterInfo
from base_utils.os.crontab_util import CrontabUtil
from base_utils.os.env_util import EnvUtil
@ -872,3 +874,105 @@ class CM_OLAP(CM):
raise Exception(
'Failed to initialize the CM resource file. Error: {}'.format(
str(out)))
def get_add_cm_res_cmd(self, cm_res_info):
    """
    Build the shell command that adds the missing VIP resources and
    resource instances to CM.

    cm_res_info maps a resource name to a list of tuples; element 0 of a
    tuple is used as the float IP, element 1 as the base IP, elements 2
    and 3 as the arguments of VipAddInst. A resource / instance is only
    added when the matching "cm_ctl res --list" query fails or prints
    nothing.

    Returns the command string, prefixed with "source <mpprc file>; ".
    """
    add_cmds = []
    for res_name, inst_tuples in cm_res_info.items():
        list_res_cmd = "cm_ctl res --list --res_name=\"%s\"" % res_name
        status, output = subprocess.getstatusoutput(list_res_cmd)
        res_missing = status != 0 or not output
        if res_missing:
            # The float IP is taken from the first tuple of the resource.
            res_attr = VipResAttr(inst_tuples[0][0])
            add_cmds.append(str(VipCmResCtrlCmd("add_res", res_name,
                                                attr=res_attr)))

        for inst_tuple in inst_tuples:
            list_inst_cmd = "cm_ctl res --list --res_name=\"%s\" --list_inst" \
                            " | grep \"%s\"" % (res_name, inst_tuple[1])
            status, output = subprocess.getstatusoutput(list_inst_cmd)
            if status == 0 and output:
                # Instance already registered for this base IP.
                continue
            add_cmds.append(str(VipCmResCtrlCmd(
                "add_inst", res_name,
                inst=VipAddInst(inst_tuple[2], inst_tuple[3]),
                attr=VipInstAttr(inst_tuple[1]))))

    cmd = "source %s; %s" % (EnvUtil.getMpprcFile(), ' ;'.join(add_cmds))
    self.logger.log("Add cm resource information cmd: \n%s" % cmd)
    return cmd
def _get_cm_res_info(self, base_ip):
    """
    Look up the CM VIP resource that contains the given base IP.

    Three "cm_ctl res --list" pipelines are run: one for the resource
    name, one for the instance ID of base_ip inside that resource, and
    one for the number of instance lines of the resource.

    Returns a tuple (res_name, inst_id, inst_num); ("", -1, -1) when no
    resource lists the base IP.

    Raises Exception when a query fails or prints nothing where a value
    is required.
    """
    # Get resource name
    # NOTE(review): grep matches "base_ip=<ip>" as a substring, so an IP
    # that is a prefix of another one could match several lines - confirm.
    cmd = "cm_ctl res --list | grep \"VIP\" | awk -F \"|\" '{print $1}'" \
          " | xargs -i cm_ctl res --list --res_name={} --list_inst" \
          " | grep \"base_ip=%s\" | awk -F \"|\" '{print $1}'" % base_ip
    stat, out = subprocess.getstatusoutput(cmd)
    if stat != 0:
        raise Exception("Failed to get res name. Cmd: \n%s" % cmd)
    if not out:
        return "", -1, -1
    res_name = out.strip()

    # Get instance ID
    cmd = "cm_ctl res --list --res_name=\"%s\" --list_inst | grep " \
          "\"base_ip=%s\" | awk -F \"|\" '{print $4}'" % (res_name, base_ip)
    stat, out = subprocess.getstatusoutput(cmd)
    if stat != 0 or not out:
        raise Exception("Failed to get the instance ID. Cmd: \n%s" % cmd)
    inst_id = int(out.strip())

    # Get the number of instances contained in a resource
    cmd = "cm_ctl res --list --res_name=\"%s\" --list_inst" \
          " | grep \"VIP\" | wc -l" % res_name
    stat, out = subprocess.getstatusoutput(cmd)
    if stat != 0 or not out:
        raise Exception("Failed to get the number of instances. Cmd: %s" % cmd)
    inst_num = int(out.strip())

    # Fixed: the message used to end with a stray " q".
    self.logger.log("CM resource info: res_name=%s,inst_id=%d,inst_num=%d"
                    % (res_name, inst_id, inst_num))
    return res_name, inst_id, inst_num
def get_reduce_cm_res_cmd(self, base_ips):
    """
    Build the shell command that removes the VIP configuration of the
    given base IPs from CM.

    For every base IP found in a resource: the instance is deleted when
    the resource holds more than one instance, otherwise the whole
    resource is deleted. Base IPs that are not found are only logged.

    Returns the command string, prefixed with "source <mpprc file>; ".
    """
    reduce_cmds = []
    for base_ip in base_ips:
        res_name, inst_id, inst_num = self._get_cm_res_info(base_ip)
        if not res_name:
            self.logger.log("The base IP is not found: %s" % base_ip)
        elif inst_num > 1:
            ctrl = VipCmResCtrlCmd("del_inst", res_name,
                                   inst=VipDelInst(inst_id))
            reduce_cmds.append(str(ctrl))
        else:
            # Last instance of the resource: drop the resource itself.
            reduce_cmds.append(str(VipCmResCtrlCmd("del_res", res_name)))

    cmd = "source %s; %s" % (EnvUtil.getMpprcFile(), ' ;'.join(reduce_cmds))
    self.logger.log("Reduce cm resource information cmd: \n%s" % cmd)
    return cmd
def config_cm_res_json(self, base_ips, cm_res_info):
    """
    Configure the CM resource file for VIP.

    Runs, in order: the reduce command for base_ips, the add command for
    cm_res_info, and finally "cm_ctl res --check".

    Raises Exception when both arguments are empty or when any of the
    three commands exits with a non-zero status.
    """
    if not (base_ips or cm_res_info):
        raise Exception("The parameters cannot be empty at the same time")

    def _run_or_raise(shell_cmd, failure_msg):
        # Execute one step; the failure text carries the command and output.
        status, output = subprocess.getstatusoutput(shell_cmd)
        if status != 0:
            raise Exception(failure_msg + \
                            " Cmd: \n%s, Error: \n%s" % (shell_cmd, str(output)))

    # Each command is built only after the previous step has been executed.
    _run_or_raise(self.get_reduce_cm_res_cmd(base_ips),
                  "Failed to reduce the CM resource for VIP.")
    _run_or_raise(self.get_add_cm_res_cmd(cm_res_info),
                  "Failed to add the CM resource for VIP.")
    _run_or_raise("cm_ctl res --check",
                  "Failed to config the CM resource file for VIP.")

View File

@ -453,7 +453,7 @@ class DN_OLAP(Kernel):
self.modifyDummpyStandbyConfigItem()
def setPghbaConfig(self, clusterAllIpList, try_reload=False):
def setPghbaConfig(self, clusterAllIpList, try_reload=False, float_ips=None):
"""
"""
principal = None
@ -472,39 +472,47 @@ class DN_OLAP(Kernel):
# build ip string list
# Every 1000 records merged into one
i = 0
GUCParasStr = ""
guc_paras_str = ""
GUCParasStrList = []
pg_user = ClusterUser.get_pg_user()
for ipAddress in clusterAllIpList:
for ip_address in clusterAllIpList:
i += 1
# Set the initial user and initial database access permissions
if principal is None:
GUCParasStr += "-h \"host all %s %s/32 %s\" " % \
(pg_user, ipAddress, METHOD_TRUST)
GUCParasStr += "-h \"host all all %s/32 %s\" " % (ipAddress, METHOD_SHA)
if ip_address.startswith("floatIp"):
guc_paras_str += "-h \"host all all %s/32 %s\" " % \
(float_ips[ip_address], METHOD_SHA)
else:
guc_paras_str += "-h \"host all %s %s/32 %s\" " % \
(pg_user, ip_address, METHOD_TRUST)
guc_paras_str += "-h \"host all all %s/32 %s\" " % \
(ip_address, METHOD_SHA)
else:
GUCParasStr += "-h \"host all %s %s/32 gss " \
"include_realm=1 krb_realm=%s\" "\
% (pg_user, ipAddress, principal)
GUCParasStr += "-h \"host all all %s/32 %s\" " % (ipAddress, METHOD_SHA)
if ip_address.startswith("floatIp"):
guc_paras_str += "-h \"host all all %s/32 %s\" " % \
(float_ips[ip_address], METHOD_SHA)
else:
guc_paras_str += "-h \"host all %s %s/32 gss include_realm=1 " \
" krb_realm=%s\" " % (pg_user, ip_address, principal)
guc_paras_str += "-h \"host all all %s/32 %s\" " % \
(ip_address, METHOD_SHA)
if (i % MAX_PARA_NUMBER == 0):
GUCParasStrList.append(GUCParasStr)
GUCParasStrList.append(guc_paras_str)
i = 0
GUCParasStr = ""
guc_paras_str = ""
# Used only streaming disaster cluster
streaming_dn_ips = self.get_streaming_relate_dn_ips(self.instInfo)
if streaming_dn_ips:
for dn_ip in streaming_dn_ips:
GUCParasStr += "-h \"host all %s %s/32 %s\" " \
guc_paras_str += "-h \"host all %s %s/32 %s\" " \
% (pg_user, dn_ip, METHOD_TRUST)
GUCParasStr += "-h \"host all all %s/32 %s\" " \
guc_paras_str += "-h \"host all all %s/32 %s\" " \
% (dn_ip, METHOD_SHA)
ip_segment = '.'.join(dn_ip.split('.')[:2]) + ".0.0/16"
GUCParasStr += "-h \"host replication all %s sha256\" " % ip_segment
guc_paras_str += "-h \"host replication all %s sha256\" " % ip_segment
if (GUCParasStr != ""):
GUCParasStrList.append(GUCParasStr)
if (guc_paras_str != ""):
GUCParasStrList.append(guc_paras_str)
for parasStr in GUCParasStrList:
self.doGUCConfig("set", parasStr, True, try_reload=try_reload)

View File

@ -433,6 +433,74 @@ class OperCommon:
"[gs_dropnode]End to backup parameter config file on %s." % host)
return '%s/parameter_%s.tar' % (tmpPath, host)
def check_is_vip_mode(self):
    """
    Tell whether "cm_ctl res --list" reports a resource whose second
    "|"-separated column contains the word VIP.

    Returns True only when the pipeline exits with 0 and prints output.
    """
    list_vip_cmd = "cm_ctl res --list | awk -F \"|\" '{print $2}' | grep -w \"VIP\""
    self.logger.log("Command for Checking VIP mode: %s" % list_vip_cmd)
    status, output = subprocess.getstatusoutput(list_vip_cmd)
    return status == 0 and bool(output)
def get_float_ip_from_json(self, base_ip, host_ips_for_del):
    """
    Get the float IP bound to base_ip, provided it can be dropped.

    The float IP is read through "cm_ctl res --list" for the resource
    whose instance list contains base_ip. It is returned only when every
    base IP listed for that float IP is in host_ips_for_del.

    Returns the float IP, or "" when base_ip has no VIP resource or when
    some base IP of the resource is not being deleted.

    Raises Exception when the second query fails or when the cm_ctl
    output cannot be parsed.
    """
    cmd = "cm_ctl res --list | grep \"VIP\" | awk -F \"|\" '{print $1}' | " \
          "xargs -i cm_ctl res --list --res_name={} --list_inst |grep \"base_ip=%s\""\
          " | awk -F \"|\" '{print $1}' | xargs -i cm_ctl res --list --res_name={}" \
          " | grep \"VIP\" | awk -F \"|\" '{print $3}'" % base_ip
    stat, out = subprocess.getstatusoutput(cmd)
    if stat != 0:
        GaussLog.exitWithError(ErrorCode.GAUSS_514["GAUSS_51400"] % cmd)
    if not out:
        self.logger.log("Failed to get float IP from json. Cmd: %s" % cmd)
        return ""
    # Raw string: "\." and "\d" are not valid str escapes.
    float_ip_found = re.findall(r"float_ip=([\.\d]+)", out.strip())
    if not float_ip_found:
        # Explicit error instead of an IndexError on unexpected output.
        raise Exception("Failed to parse float IP from json. Cmd: %s, "
                        "Output: %s" % (cmd, out))
    float_ip = float_ip_found[0]

    cmd = "cm_ctl res --list | grep \"VIP\" | awk -F \"|\" '{print $1}' | " \
          "xargs -i cm_ctl res --list --res_name={} | grep \"float_ip=%s\" | " \
          "awk -F \"|\" '{print $1}' | xargs -i cm_ctl res --list --res_name={} " \
          "--list_inst | grep \"VIP\" | awk -F \"|\" '{print $5}'" % float_ip
    stat, out = subprocess.getstatusoutput(cmd)
    if stat != 0 or not out:
        raise Exception("Failed to get base IP list from json. Cmd: %s" % cmd)
    for item in out.split('\n'):
        base_ip_found = re.findall(r"base_ip=([\.\d]+)", item.strip())
        if not base_ip_found:
            raise Exception("Failed to parse base IP from json. Cmd: %s, "
                            "Line: %s" % (cmd, item))
        # The float IP is still used by a node that stays in the cluster.
        if base_ip_found[0] not in host_ips_for_del:
            return ""
    self.logger.log("Successfully get float IP from json, %s." % float_ip)
    return float_ip
def get_float_ip_config(self, host, dn_dir, host_ips_for_del, ssh_tool, env_file):
    """
    Collect the pg_hba.conf lines of the float IPs that go away together
    with the hosts being deleted.

    The sha256 host lines of <dn_dir>/pg_hba.conf are fetched from host
    through ssh_tool. For each removable float IP, the line around its
    first occurrence is appended, followed by '|'.

    Returns the concatenated lines, or "" when the cluster is not in VIP
    mode or nothing matches.
    """
    if not self.check_is_vip_mode():
        self.logger.log("The current cluster does not support VIP.")
        return ""

    # Removable float IPs, de-duplicated, in discovery order.
    removable = []
    for base_ip in host_ips_for_del:
        candidate = self.get_float_ip_from_json(base_ip, host_ips_for_del)
        if not candidate or candidate in removable:
            continue
        removable.append(candidate)

    grep_cmd = "grep '^host.*sha256' %s" % os.path.join(dn_dir, 'pg_hba.conf')
    status_map, hba_text = ssh_tool.getSshStatusOutput(grep_cmd, [host], env_file)
    if status_map[host] != 'Success':
        self.logger.debug("[gs_dropnode]Parse pg_hba file failed:" + hba_text)
        GaussLog.exitWithError(ErrorCode.GAUSS_358["GAUSS_35809"])

    pieces = []
    for float_ip in removable:
        ip_pos = hba_text.find(float_ip)
        if ip_pos == -1:
            continue
        # From the last 'host' before the IP up to the end of that line.
        line_start = hba_text.rfind('host', 0, ip_pos)
        line_end = hba_text.find('\n', ip_pos, len(hba_text))
        pieces.append(hba_text[line_start:line_end] + '|')
    return "".join(pieces)
def parseConfigFile(self, host, dirDn, dnId, hostIpListForDel, sshTool,
envfile):
"""
@ -475,6 +543,8 @@ class OperCommon:
s = output.rfind('host', 0, output.find(ip))
e = output.find('\n', output.find(ip), len(output))
resultDict['pghbaStr'] += output[s:e] + '|'
resultDict['pghbaStr'] += self.get_float_ip_config(host, dirDn, hostIpListForDel,
sshTool, envfile)
self.logger.log(
"[gs_dropnode]End to parse parameter config file on %s." % host)
return resultDict

View File

@ -28,13 +28,18 @@ sys.path.append(sys.path[0] + "/../../../../")
from base_utils.os.net_util import NetUtil
from base_utils.os.env_util import EnvUtil
from base_utils.executor.cmd_executor import CmdExecutor
from gspylib.common.OMCommand import OMCommand
from gspylib.common.ErrorCode import ErrorCode
from gspylib.common.Common import DefaultValue
from gspylib.component.CM.CM_OLAP.CM_OLAP import CM_OLAP
from gspylib.threads.SshTool import SshTool
from gspylib.os.gsfile import g_file
from impl.dropnode.DropnodeImpl import DropnodeImpl
from base_utils.os.file_util import FileUtil
# Action type
ACTION_DROP_NODE = "drop_node"
class DropNodeWithCmImpl(DropnodeImpl):
@ -83,6 +88,37 @@ class DropNodeWithCmImpl(DropnodeImpl):
raise Exception("Too many cm_server nodes are dropped.A maximum of {0} cm_server "
"nodes can be dropped.".format(len(all_cm_server_nodes) - 2))
def backup_cm_res_json(self):
    """
    Copy cm_resource.json from the CM instance data directory to
    cm_resource_bak.json under self.pghostPath.

    An existing backup file is left untouched.
    """
    source_json = os.path.realpath(
        os.path.join(self.cm_component.instInfo.datadir, "cm_resource.json"))
    backup_json = os.path.realpath(
        os.path.join(self.pghostPath, "cm_resource_bak.json"))
    if os.path.isfile(backup_json):
        return
    FileUtil.cpFile(source_json, backup_json)
def update_cm_res_json(self):
    """
    Update the cm resource json file on the remaining nodes.

    Does nothing but log when the cluster is not in VIP mode. Otherwise
    the resource file is backed up and the local "Local_Config_CM_Res"
    script is run with the drop-node action on every host of
    self.context.hostMapForExist.
    """
    vip_mode = self.commonOper.check_is_vip_mode()
    if not vip_mode:
        self.logger.log("The current cluster does not support VIP.")
        return

    self.backup_cm_res_json()
    self.logger.log("Updating cm resource file on exist nodes.")
    # Comma separated names of the nodes being dropped, passed via -H.
    del_hosts = ",".join(self.context.hostMapForDel.keys())
    source_part = "source %s; " % self.userProfile
    script_part = "%s -t %s -U %s -H %s -l '%s' " % (
        OMCommand.getLocalScript("Local_Config_CM_Res"),
        ACTION_DROP_NODE, self.user, del_hosts, self.context.localLog)
    cmd = source_part + script_part
    self.logger.debug("Command for updating cm resource file: %s" % cmd)
    CmdExecutor.execCommandWithMode(cmd, self.ssh_tool,
                                    host_list=self.context.hostMapForExist.keys())
    self.logger.log("Successfully updated cm resource file.")
def _stop_drop_node(self):
"""
try to stop drop nodes
@ -154,6 +190,27 @@ class DropNodeWithCmImpl(DropnodeImpl):
"HINT: Maybe the cluster is continually being started in the background.\n"
"You can wait for a while and check whether the cluster starts.")
def restore_cm_res_json(self):
    """
    Copy cm_resource_bak.json from self.pghostPath back over
    cm_resource.json in the CM instance data directory.

    Nothing happens when no backup file exists.
    """
    target_json = os.path.realpath(
        os.path.join(self.cm_component.instInfo.datadir, "cm_resource.json"))
    backup_json = os.path.realpath(
        os.path.join(self.pghostPath, "cm_resource_bak.json"))
    if not os.path.isfile(backup_json):
        return
    FileUtil.cpFile(backup_json, target_json)
def remove_cm_res_backup(self):
    """
    Delete cm_resource_bak.json under self.pghostPath.

    The success message is logged only when a file was actually removed.
    """
    backup_json = os.path.realpath(
        os.path.join(self.pghostPath, "cm_resource_bak.json"))
    if not os.path.isfile(backup_json):
        return
    os.remove(backup_json)
    self.logger.log("Successfully remove cm resource backup file")
def run(self):
"""
start dropnode
@ -163,11 +220,14 @@ class DropNodeWithCmImpl(DropnodeImpl):
self.check_drop_cm_node()
self.change_user()
self.logger.log("[gs_dropnode]Start to drop nodes of the cluster.")
self.restore_cm_res_json()
self.checkAllStandbyState()
self.dropNodeOnAllHosts()
self.operationOnlyOnPrimary()
self.update_cm_res_json()
self._stop_drop_node()
self._generate_flag_file_on_drop_nodes()
self.modifyStaticConf()
self.restart_new_cluster()
self.remove_cm_res_backup()
self.logger.log("[gs_dropnode] Success to drop the target nodes.")

View File

@ -650,6 +650,23 @@ gs_guc set -D {dn} -c "available_zone='{azName}'"
if self._isAllFailed():
GaussLog.exitWithError(ErrorCode.GAUSS_357["GAUSS_35706"] % "set guc")
def get_add_float_ip_cmd(self, host_ip):
    """
    Build the gs_guc "-h" arguments that add the float IPs of a host to
    pg_hba.conf.

    host_ip is mapped to a node name through self.context.backIpNameMap;
    one sha256 rule is produced per float IP resource of every datanode
    on that node.

    Returns the argument string, or "" when the cluster defines no float
    IPs.
    """
    cluster_info = self.context.clusterInfo
    if not cluster_info.float_ips:
        self.logger.debug("The current cluster does not support VIP.")
        return ""

    node_name = self.context.backIpNameMap[host_ip]
    db_node = self.context.clusterInfo.getDbNodeByName(node_name)
    rules = []
    for dn_inst in db_node.datanodes:
        for res_name in dn_inst.float_ips:
            # Instances hold resource names; the address is cluster-level.
            rules.append(" -h 'host all all %s/32 sha256'" % \
                         self.context.clusterInfo.float_ips[res_name])
    return "".join(rules)
def addTrust(self):
"""
add authentication rules about new host ip in existing hosts and
@ -666,13 +683,13 @@ gs_guc set -D {dn} -c "available_zone='{azName}'"
cmd = "source %s;gs_guc set -D %s" % (self.envFile, dataNode)
if hostExec in self.existingHosts:
for hostParam in self.context.newHostList:
cmd += " -h 'host all all %s/32 trust'" % \
hostParam
cmd += " -h 'host all all %s/32 trust'" % hostParam
cmd += self.get_add_float_ip_cmd(hostParam)
else:
for hostParam in allHosts:
if hostExec != hostParam:
cmd += " -h 'host all all %s/32 trust'" % \
hostParam
cmd += " -h 'host all all %s/32 trust'" % hostParam
cmd += self.get_add_float_ip_cmd(hostParam)
self.logger.debug("[%s] trustCmd:%s" % (hostExec, cmd))
sshTool = SshTool([hostExec])
sshTool.getSshStatusOutput(cmd, [hostExec], self.envFile)

View File

@ -47,7 +47,9 @@ from domain_utils.cluster_file.cluster_dir import ClusterDir
from gspylib.component.CM.CM_OLAP.CM_OLAP import CM_OLAP
# Action type
ACTION_INSTALL_CLUSTER = "install_cluster"
ACTION_EXPAND_NODE ="expansion_node"
def change_user_executor(perform_method):
@ -116,8 +118,9 @@ class ExpansionImplWithCm(ExpansionImpl):
DefaultValue.MAX_DIRECTORY_MODE)
create_dir_cmd += " && chown {0}:{1} {2}".format(self.user, self.group, xml_dir)
self.ssh_tool.executeCommand(create_dir_cmd)
self.ssh_tool.scpFiles(self.context.xmlFile, self.context.xmlFile,
hostList=self.get_node_names(self.new_nodes))
self.ssh_tool.scpFiles(self.context.xmlFile, self.context.xmlFile)
cmd = "chown %s:%s %s" % (self.user, self.group, self.context.xmlFile)
self.ssh_tool.executeCommand(cmd)
self.logger.log("Success to send XML to new nodes")
def preinstall_run(self):
@ -302,11 +305,16 @@ class ExpansionImplWithCm(ExpansionImpl):
datains = node.datanodes[0]
log_dir = "%s/pg_log/dn_%d" % (log_path, appname)
audit_dir = "%s/pg_audit/dn_%d" % (log_path, appname)
if "127.0.0.1" in datains.listenIps:
listen_ips = "%s" % ",".join(datains.listenIps)
else:
listen_ips = "localhost,%s" % ",".join(datains.listenIps)
new_nodes_para_list.extend([
(node.name, datains.datadir, "port", datains.port),
(node.name, datains.datadir, "application_name", "dn_%s" % appname),
(node.name, datains.datadir, "log_directory", "%s" % log_dir),
(node.name, datains.datadir, "audit_directory", "%s" % audit_dir)
(node.name, datains.datadir, "audit_directory", "%s" % audit_dir),
(node.name, datains.datadir, "listen_addresses", listen_ips)
])
for new_node_para in new_nodes_para_list:
@ -381,6 +389,9 @@ class ExpansionImplWithCm(ExpansionImpl):
cmd = "source {0};gs_guc set -D {1}".format(self.envFile, new_inst.datadir)
cmd += " -h 'host all %s %s/32 trust'" % (self.user, host_ip)
cmd += " -h 'host all all %s/32 sha256'" % host_ip
if self.xml_cluster_info.float_ips:
cmd += " -h 'host all all %s/32 sha256'" % \
self.xml_cluster_info.float_ips[new_inst.float_ips[0]]
self.logger.log("Ready to perform command on node [{0}]. "
"Command is : {1}".format(new_node.name, cmd))
CmdExecutor.execCommandWithMode(cmd, self.ssh_tool, host_list=[new_node.name])
@ -394,6 +405,22 @@ class ExpansionImplWithCm(ExpansionImpl):
self._config_new_node_hba(node)
self.logger.log("Successfully set hba on all nodes.")
def _update_cm_res_json(self):
    """
    Update the cm resource json file for expansion.

    Does nothing but log when the XML cluster info defines no float IPs.
    Otherwise the local "Local_Config_CM_Res" script is run with the
    expansion action through self.ssh_tool.
    """
    if not self.xml_cluster_info.float_ips:
        self.logger.log("The current cluster does not support VIP.")
        return

    self.logger.log("Updating cm resource file on all nodes.")
    source_part = "source %s; " % self.envFile
    script_part = "%s -t %s -U %s -X '%s' -l '%s' " % (
        OMCommand.getLocalScript("Local_Config_CM_Res"), ACTION_EXPAND_NODE,
        self.context.user, self.context.xmlFile, self.context.localLog)
    cmd = source_part + script_part
    self.logger.debug("Command for updating cm resource file: %s" % cmd)
    CmdExecutor.execCommandWithMode(cmd, self.ssh_tool)
    self.logger.log("Successfully updated cm resource file.")
def _config_instance(self):
"""
Config instance
@ -402,6 +429,7 @@ class ExpansionImplWithCm(ExpansionImpl):
self.generateClusterStaticFile()
self.setGucConfig()
self._set_other_guc_para()
self._update_cm_res_json()
self._config_pg_hba()
self.distributeCipherFile()

View File

@ -36,6 +36,9 @@ from gspylib.component.DSS.dss_checker import DssConfig
ROLLBACK_FAILED = 3
# Action type
ACTION_INSTALL_CLUSTER = "install_cluster"
class InstallImplOLAP(InstallImpl):
"""
@ -221,6 +224,8 @@ class InstallImplOLAP(InstallImpl):
output: NA
"""
# config instance applications
if self.context.clusterInfo.float_ips:
self.config_cm_res_json()
self.updateInstanceConfig()
self.updateHbaConfig()
@ -372,6 +377,22 @@ class InstallImplOLAP(InstallImpl):
self.context.isSingle)
self.context.logger.debug("Successfully configured node instance.")
def config_cm_res_json(self):
    """
    Configure the cm resource json file for installation.

    Runs the local "Local_Config_CM_Res" script with the install action
    through the context's ssh tool.
    """
    ctx = self.context
    ctx.logger.log("Configuring cm resource file on all nodes.")
    source_part = "source %s; " % ctx.mpprcFile
    script_part = "%s -t %s -U %s -X '%s' -l '%s' " % (
        OMCommand.getLocalScript("Local_Config_CM_Res"), ACTION_INSTALL_CLUSTER,
        ctx.user, ctx.xmlFile, ctx.localLog)
    cmd = source_part + script_part
    ctx.logger.debug(
        "Command for configuring cm resource file: %s" % cmd)
    CmdExecutor.execCommandWithMode(cmd, ctx.sshTool, ctx.isSingle)
    ctx.logger.log("Successfully configured cm resource file.")
def updateHbaConfig(self):
"""
function: config Hba instance

View File

@ -203,6 +203,7 @@ class ConfigHba(LocalBaseOM):
for inst in nodeinfo.datanodes:
self.allIps += inst.haIps
self.allIps += inst.listenIps
self.allIps += inst.float_ips
# get all ips. Remove the duplicates ips
self.allIps = DefaultValue.Deduplication(self.allIps)
@ -260,7 +261,8 @@ class ConfigHba(LocalBaseOM):
self.logger.debug("The %s does not exist." % hbaFile)
return
component.setPghbaConfig(self.allIps, try_reload=self.try_reload)
component.setPghbaConfig(self.allIps, try_reload=self.try_reload,
float_ips=self.clusterInfo.float_ips)
if len(self.removeIps) != 0:
component.removeIpInfoOnPghbaConfig(self.removeIps)
self.remove_streaming_config(component)

View File

@ -0,0 +1,231 @@
#!/usr/bin/env python3
#-*- coding:utf-8 -*-
##############################################################################
#Copyright (c): 2020-2025, Huawei Tech. Co., Ltd.
#FileName : config_cm_resource.py
#Version : openGauss
#Date : 2023-02-12
#Description : config_cm_resource.py is a utility to config CM resource file.
##############################################################################
import os
import sys
import getopt
sys.path.append(sys.path[0] + "/../")
from gspylib.common.GaussLog import GaussLog
from gspylib.common.ErrorCode import ErrorCode
from gspylib.common.LocalBaseOM import LocalBaseOM
from domain_utils.domain_common.cluster_constants import ClusterConstants
from domain_utils.cluster_file.cluster_log import ClusterLog
from gspylib.threads.parallelTool import parallelTool
from base_utils.os.net_util import NetUtil
# Parsed command line options; set by parse_command_line()
g_opts = None

# Supported values of the -t option
ACTION_INSTALL_CLUSTER = "install_cluster"
ACTION_EXPAND_NODE = "expansion_node"
ACTION_DROP_NODE = "drop_node"
class CmdOptions():
    """
    Holder for the command line parameters of this script.
    """

    def __init__(self):
        """
        Start with every option empty.
        """
        # -t: action type
        self.action_type = ""
        # -U: cluster user
        self.cluster_user = ""
        # -X: cluster XML configuration file
        self.cluster_conf = ""
        # -l: log file
        self.log_file = ""
        # -H: comma separated node list, stored as a list
        self.drop_nodes = []
def parse_command_line():
    """
    Parse sys.argv into the global g_opts.

    Accepted options: -t, -U, -X, -l and -H (comma separated list).
    A getopt failure or any positional argument ends the script through
    GaussLog.exitWithError.
    """
    global g_opts

    try:
        opts, args = getopt.getopt(sys.argv[1:], "t:U:X:l:H:")
    except Exception as e:
        GaussLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50000"] % str(e))

    if len(args) > 0:
        GaussLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50000"] % str(args[0]))

    g_opts = CmdOptions()
    # Options whose value is stored unchanged.
    plain_options = {"-t": "action_type",
                     "-U": "cluster_user",
                     "-X": "cluster_conf",
                     "-l": "log_file"}
    for (flag, arg) in opts:
        if flag in plain_options:
            setattr(g_opts, plain_options[flag], arg)
        elif flag == "-H":
            g_opts.drop_nodes = arg.split(',')
def check_parameters():
    """
    Validate the parsed options held in g_opts.

    -t and -U are mandatory, -H is mandatory for the drop-node action,
    and a given -X file must exist. Any violation ends the script
    through GaussLog.exitWithError. A missing -l is replaced by the
    default local OM log path.
    """
    if not g_opts.action_type:
        GaussLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50001"] % 't' + ".")

    if g_opts.action_type == ACTION_DROP_NODE and not g_opts.drop_nodes:
        GaussLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50001"] % 'H' + ".")

    if not g_opts.cluster_user:
        GaussLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50001"] % 'U' + ".")

    if g_opts.cluster_conf and not os.path.exists(g_opts.cluster_conf):
        GaussLog.exitWithError(ErrorCode.GAUSS_502["GAUSS_50201"] % g_opts.cluster_conf)

    if not g_opts.log_file:
        g_opts.log_file = ClusterLog.getOMLogPath(
            ClusterConstants.LOCAL_LOG_FILE, g_opts.cluster_user, "")
class ConfigCMResource(LocalBaseOM):
    """
    Configure the VIP part of the CM resource file on the local node.

    The action type selects whether resource information is added
    (install / expansion) or reduced (drop node).
    """

    def __init__(self, action_type, cluster_user, cluster_conf, log_file, drop_nodes):
        """
        Init configuration on local node.

        The cluster information is read through readConfigInfo() when
        self.clusterConfig is empty, otherwise through
        readConfigInfoByXML(). The script exits through logger.logExit
        when the resolved user differs from cluster_user.
        """
        LocalBaseOM.__init__(self, log_file, cluster_user, cluster_conf)
        if self.clusterConfig == "":
            self.readConfigInfo()
        else:
            self.readConfigInfoByXML()
        # Check user information
        self.getUserInfo()
        if self.user != cluster_user.strip():
            self.logger.debug("User parameter : %s." % self.user)
            self.logger.logExit(ErrorCode.GAUSS_503["GAUSS_50315"]
                                % (self.user, self.clusterInfo.appPath))
        self.initComponent()

        # resource name -> list of (float ip, base ip, instance id, node id)
        self.cm_res_info = {}
        # listen IPs of the datanodes on the nodes to drop
        self.base_ips = []
        self.action_type = action_type
        self.drop_nodes = drop_nodes

    def get_cm_res_info(self):
        """
        Get CM resource information for adding.

        For every datanode instance, the i-th float IP resource name is
        paired with the i-th listen IP of the instance.
        """
        for node_name in self.clusterInfo.getClusterNodeNames():
            node_info = self.clusterInfo.getDbNodeByName(node_name)
            for inst in node_info.datanodes:
                for i, res_name in enumerate(inst.float_ips):
                    _tup = (self.clusterInfo.float_ips[res_name], inst.listenIps[i],
                            inst.instanceId, node_info.id)
                    self.cm_res_info.setdefault(res_name, []).append(_tup)
        self.logger.log("Successfully get cm res info: \n%s" % str(self.cm_res_info))

    def get_base_ips(self):
        """
        Get base IP for reducing: the listen IPs of every datanode on the
        nodes named in self.drop_nodes.

        Raises Exception when a node name is not found in the cluster
        information.
        """
        for node_name in self.drop_nodes:
            node_info = self.clusterInfo.getDbNodeByName(node_name)
            if node_info is None:
                # Explicit error instead of an AttributeError on None.
                raise Exception("The node [%s] to be dropped is not found in "
                                "the cluster configuration." % node_name)
            for inst in node_info.datanodes:
                self.base_ips.extend(inst.listenIps)

    def _config_an_instance(self, component):
        """
        Config CM resource file for single component.

        Only a component whose data directory is named "cm_agent" is
        configured; anything else is logged and skipped.
        """
        # check instance data directory
        inst_type = component.instInfo.datadir.split('/')[-1].strip()
        if inst_type != "cm_agent":
            self.logger.log("Current instance is not cm_agent")
            return

        if not os.path.exists(component.instInfo.datadir):
            raise Exception(ErrorCode.GAUSS_502["GAUSS_50201"] % \
                            "data directory of the cm_agent instance")

        component.config_cm_res_json(self.base_ips, self.cm_res_info)

    def _config_all_instances(self):
        """
        Run _config_an_instance over every CM component of this node.
        """
        component_list = self.cmCons
        try:
            parallelTool.parallelExecute(self._config_an_instance, component_list)
        except Exception as e:
            # Same exception type and message as before; cause kept.
            raise Exception(str(e)) from e

    def add_cm_res_info(self):
        """
        Config CM resource file for "install/expansion"
        """
        self.get_cm_res_info()
        self._config_all_instances()

    def reduce_cm_res_info(self):
        """
        Config CM resource file for "dropnode"
        """
        self.get_base_ips()
        self._config_all_instances()

    def run(self):
        """
        Config CM resource file according to self.action_type.

        Raises Exception (GAUSS_50004) for an unknown action type.
        """
        fun_dict = {ACTION_INSTALL_CLUSTER: self.add_cm_res_info,
                    ACTION_EXPAND_NODE: self.add_cm_res_info,
                    ACTION_DROP_NODE: self.reduce_cm_res_info}

        handler = fun_dict.get(self.action_type)
        if handler is None:
            raise Exception(ErrorCode.GAUSS_500["GAUSS_50004"] % 't' + \
                            " Value: %s." % self.action_type)
        handler()

        self.logger.log("Successfully configured CM resource file on node[%s]" % \
                        NetUtil.GetHostIpOrName())
if __name__ == '__main__':
    # Main function:
    #   1.Parse command line
    #   2.Check parameter
    #   3.Read config from xml config file
    #   4.Get the CM resource information to be configured
    #   5.Config CM resource file
    # input : NA
    # output: NA
    try:
        parse_command_line()
        check_parameters()
        ConfigCMResource(g_opts.action_type, g_opts.cluster_user,
                         g_opts.cluster_conf, g_opts.log_file,
                         g_opts.drop_nodes).run()
    except Exception as e:
        # Any failure is reported through the common error exit.
        GaussLog.exitWithError(str(e))

    sys.exit(0)