Files
openGauss-OM/script/gs_expansion
huchangqi 3c52ea3600 add loongarch and loongnix-server support
1. 名称使用  loongnix 而不是 loongnix-server
是由于部分脚本是通过 /etc/loongnix-release 的文件名去判断版本的,这样只
能匹配到 loongnix.

2. loongnix-server 修改LD_LIBRARY_PATH 之后,会导致 sudo ,su等命令失
效,只好在执行su命令的时候 export LD_LIBRARY_PATH=/usr/lib64, 保证su
命令成功, 试过 alias su='export LD_LIBRARY_PATH=/usr/lib64; su'
但是 py 需要 subprocess.run(cmd, shell=True) 才能读取这种环境变量,还是
全部替换了。
2024-08-15 11:04:20 +08:00

592 lines
25 KiB
Python

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
#############################################################################
# Copyright (c) 2020 Huawei Technologies Co.,Ltd.
#
# openGauss is licensed under Mulan PSL v2.
# You can use this software according to the terms
# and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
#
# http://license.coscl.org.cn/MulanPSL2
#
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS,
# WITHOUT WARRANTIES OF ANY KIND,
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.
# ----------------------------------------------------------------------------
# Description : gs_expansion is a utility to expansion standby node databases
#############################################################################
import os
import sys
import subprocess
import socket
# Make sure the C libraries shipped under gspylib/clib come first in
# LD_LIBRARY_PATH. The dynamic loader only reads that variable at process
# start, so whenever it has to be changed the script re-executes itself
# with the updated environment.
package_path = os.path.dirname(os.path.realpath(__file__))
ld_path = package_path + "/gspylib/clib"
_current_ld_path = os.environ.get('LD_LIBRARY_PATH')
if _current_ld_path is None:
    # variable not set at all: use the bundled library directory only
    _wanted_ld_path = ld_path
elif not _current_ld_path.startswith(ld_path):
    # variable set, but the bundled directory is not in front: prepend it
    _wanted_ld_path = ld_path + ":" + _current_ld_path
else:
    # already in the expected shape, nothing to do
    _wanted_ld_path = None
if _wanted_ld_path is not None:
    os.environ['LD_LIBRARY_PATH'] = _wanted_ld_path
    # execve replaces the current process and does not return on success
    os.execve(os.path.realpath(__file__), sys.argv, os.environ)
sys.path.append(sys.path[0])
from gspylib.common.DbClusterInfo import dbClusterInfo, \
checkPathVaild, dbNodeInfo, instanceInfo
from gspylib.common.GaussLog import GaussLog
from gspylib.common.Common import DefaultValue
from gspylib.common.ErrorCode import ErrorCode
from gspylib.common.ParallelBaseOM import ParallelBaseOM
from gspylib.common.ParameterParsecheck import Parameter
from impl.expansion.ExpansionImpl import ExpansionImpl
from impl.expansion.expansion_impl_with_cm import ExpansionImplWithCm
from impl.expansion.expansion_impl_with_cm_local import ExpansionImplWithCmLocal
from domain_utils.cluster_file.cluster_config_file import ClusterConfigFile
from domain_utils.cluster_file.cluster_log import ClusterLog
from base_utils.os.env_util import EnvUtil
# Names of the environment variables handled by the expansion tooling.
# "PATH" and "LD_LIBRARY_PATH" appear twice; the duplicates are kept so
# that the list content and order stay exactly as before.
ENV_LIST = [
    "MPPDB_ENV_SEPARATE_PATH",
    "GPHOME",
    "PATH",
    "LD_LIBRARY_PATH",
    "PYTHONPATH",
    "GAUSS_WARNING_TYPE",
    "GAUSSHOME",
    "PATH",
    "LD_LIBRARY_PATH",
    "S3_CLIENT_CRT_FILE",
    "GAUSS_VERSION",
    "PGHOST",
    "GS_CLUSTER_NAME",
    "GAUSSLOG",
    "GAUSS_ENV",
    "umask",
]
# Attribute names that are skipped when the static configuration is
# compared with the XML configuration, because the information stored in
# the static configuration file of openGauss is incorrect for them.
# NOTE(review): "casecadeRole" looks like a misspelling of "cascadeRole";
# it is kept unchanged here - confirm before correcting it.
ABORT_CHECK_PROPERTY = [
    # cluster / node bookkeeping
    "xmlFile", "id", "gtmNum", "instanceId",
    "masterBasePorts", "standbyBasePorts",
    "instanceType", "_dbClusterInfo__newInstanceId",
    "_dbClusterInfo__newMirrorId", "version",
    "installTime", "localNodeId", "nodeCount",
    "_dbClusterInfo__newGroupId", "cmsNum", "datadir",
    # dcf related items
    "enable_dcf", "dcf_config", "dcf_data_path",
    "cmscount", "casecadeRole",
    # dss / shared storage related items
    "enable_dss", "dss_config", "dss_home",
    "cm_vote_disk", "cm_share_disk",
    "dss_pri_disks", "dss_shared_disks",
    "dss_vg_info", "dss_vgname",
    "dss_ssl_enable", "ss_rdma_work_config",
    "ss_interconnect_type", "syncNumFirst",
]
IGNORE_CHECK_KEY = ["cascadeRole"]
class Expansion(ParallelBaseOM):
    """
    Entry object of the gs_expansion command.

    It reads the command line, loads the cluster description from the XML
    file, runs the pre-checks and finally delegates the real work to one
    of the ExpansionImpl* implementations.
    """

    def __init__(self):
        """
        Create the expansion context with empty defaults.
        """
        ParallelBaseOM.__init__(self)
        # back IP list of the newly added standby nodes (option -h)
        self.newHostList = []
        # cluster paths plus one sub-dict per node name,
        # filled by _getClusterInfoDict()
        self.clusterInfoDict = {}
        # back IP -> node name, filled by _getBackIpNameMap()
        self.backIpNameMap = {}
        # back IP of a new host -> cascadeRole,
        # filled by _getNewHostCasRoleMap()
        self.newHostCasRoleMap = {}
        # first back IP of a node -> azName, filled by _getHostAzNameMap()
        self.hostAzNameMap = {}
        self.packagepath = os.path.realpath(
            os.path.join(os.path.realpath(__file__), "../../"))
        # set from the -L option in parseCommandLine()
        self.standbyLocalMode = False
        # set from --time-out; checkParameters() applies the default
        self.time_out = None
        self.envFile = EnvUtil.getEnv("MPPDB_ENV_SEPARATE_PATH")

    def usage(self):
        """
gs_expansion is a utility to expansion standby node for a cluster, streaming cluster does not yet support.
Usage:
gs_expansion -? | --help
gs_expansion -V | --version
gs_expansion -U USER -G GROUP -X XMLFILE -h nodeList [-L]
General options:
-U Cluster user.
-G Group of the cluster user.
-X Path of the XML configuration file.
-h New standby node backip list.
Separate multiple nodes with commas (,).
such as '-h 192.168.0.1,192.168.0.2'
-L The standby database installed with
local mode.
--time-out=SECS Maximum waiting time when send the
packages to new standby nodes.
-?, --help Show help information for this
utility, and exit the command line mode.
-V, --version Show version information.
"""
        # The docstring above is the help text itself: it is printed
        # verbatim, so its content must not be re-indented or reworded.
        print(self.usage.__doc__)

    def parseCommandLine(self):
        """
        Parse the command line and store the recognised options on self.

        Prints the usage text and exits with status 0 when the help flag
        is present.
        """
        ParaObj = Parameter()
        ParaDict = ParaObj.ParameterCommandLine("expansion")
        # parameter -? or --help
        if "helpFlag" in ParaDict:
            self.usage()
            sys.exit(0)
        # Resolves command line arguments
        # parameter -U
        if "user" in ParaDict:
            self.user = ParaDict.get("user")
            DefaultValue.checkPathVaild(self.user)
        # parameter -G
        if "group" in ParaDict:
            self.group = ParaDict.get("group")
        # parameter -X
        if "confFile" in ParaDict:
            self.xmlFile = ParaDict.get("confFile")
        # parameter -L
        if "localMode" in ParaDict:
            self.localMode = ParaDict.get("localMode")
            self.standbyLocalMode = ParaDict.get("localMode")
        # parameter -l
        if "logFile" in ParaDict:
            self.logFile = ParaDict.get("logFile")
        # parameter -h
        if "nodename" in ParaDict:
            self.newHostList = ParaDict.get("nodename")
        # parameter --time-out
        if "time_out" in ParaDict:
            self.time_out = ParaDict.get("time_out")

    def checkParameters(self):
        """
        function: Check parameter from command line
        input: NA
        output: NA

        Reports an error through GaussLog.exitWithError when a mandatory
        option (-U, -G, -X, -h) is missing, when an upgrade is in
        progress, or when --time-out is not an integer in
        (0, 2147483647).
        """
        # check user | group | xmlfile | node
        mandatory_options = (
            ("-U", self.user),
            ("-G", self.group),
            ("-X", self.xmlFile),
            ("-h", self.newHostList),
        )
        for option, value in mandatory_options:
            if not value:
                GaussLog.exitWithError(
                    ErrorCode.GAUSS_357["GAUSS_35701"] % option)
        # check if upgrade action is exist
        if DefaultValue.isUnderUpgrade(self.user):
            GaussLog.exitWithError(ErrorCode.GAUSS_529["GAUSS_52936"])
        if self.time_out is None:
            self.time_out = DefaultValue.TIMEOUT_CLUSTER_START
            return
        # The timeout parameter must be a pure number
        if not str(self.time_out).isdigit():
            GaussLog.exitWithError(
                ErrorCode.GAUSS_500["GAUSS_50003"] %
                ("-time-out", "a nonnegative integer"))
        self.time_out = int(self.time_out)
        # The timeout parameter must be greater than 0
        # The timeout parameter must be less than the integer maximum
        if self.time_out <= 0 or self.time_out >= 2147483647:
            GaussLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50004"]
                                   % "-time-out")

    def _get_node_dn_port(self, node_name):
        """
        Get data node port

        Returns the port of the first datanode of node_name, or None when
        the node has no datanode. Raises Exception when the cluster type
        is not single-inst.
        """
        if self.clusterInfo.clusterType != \
                DefaultValue.CLUSTER_TYPE_SINGLE_INST:
            self.logger.log("The cluster type is not single-inst.")
            # Expansion itself is not an exception class, raising it
            # would only produce a TypeError.
            raise Exception("Cluster type is not single-inst.")
        node = self.clusterInfo.getDbNodeByName(node_name)
        if node.datanodes:
            return node.datanodes[0].port
        return None

    def _getClusterInfoDict(self):
        """
        Load the XML configuration and cache the cluster level paths and
        the per-node information in self.clusterInfoDict.

        Also sets self.clusterInfo and self.nodeNameList.
        """
        self.check_xml_config()
        clusterInfo = ExpansionClusterInfo()
        # must be set before _get_node_dn_port() is used below
        self.clusterInfo = clusterInfo
        hostNameIpDict = clusterInfo.initFromXml(self.xmlFile)
        clusterDict = clusterInfo.get_cluster_directory_dict()
        self.nodeNameList = clusterInfo.getClusterNodeNames()
        # get corepath and toolpath from xml file
        corePath = clusterInfo.readClustercorePath(self.xmlFile)
        toolPath = clusterInfo.getToolPath(self.xmlFile)
        # parse xml file and cache node info
        clusterInfoDict = {
            "appPath": clusterDict["appPath"][0],
            "logPath": clusterDict["logPath"][0],
            "corePath": corePath,
            "toolPath": toolPath,
        }
        for nodeName in self.nodeNameList:
            ipList = hostNameIpDict[nodeName][0]
            port = self._get_node_dn_port(nodeName)
            if port is None:
                # without a datanode port the derived ports below cannot
                # be computed; fail with a readable message
                raise Exception("Failed to obtain the data node port of "
                                "node {0}.".format(nodeName))
            dataDirs = clusterDict[nodeName]["dn"]["data_dir"]
            dbNode = clusterInfo.getDbNodeByName(nodeName)
            clusterInfoDict[nodeName] = {
                "backIp": ipList[0],
                "sshIp": ipList[1],
                "port": port,
                "localport": int(port) + 1,
                "localservice": int(port) + 4,
                "heartBeatPort": int(port) + 5,
                "dataNode": dataDirs[0] if dataDirs else "",
                # placeholder, replaced in the loop below
                "instanceType": -1,
                "azPriority": dbNode.azPriority
            }
        for nodeId in clusterInfo.getClusterNodeIds():
            insType = clusterInfo.getdataNodeInstanceType(nodeId)
            hostName = clusterInfo.getHostNameByNodeId(nodeId)
            clusterInfoDict[hostName]["instanceType"] = insType
        self.clusterInfoDict = clusterInfoDict

    @staticmethod
    def _get_file_owner(path):
        """
        Return the login name of the owner of path, or an empty string
        when the owner cannot be determined.
        """
        import pwd
        try:
            return pwd.getpwuid(os.stat(path).st_uid).pw_name
        except (KeyError, OSError):
            return ""

    def initLogs(self):
        """
        init log file

        Falls back to the default OM log path when no log file was given,
        requires an absolute path, initializes the logger and hands the
        log file over to the cluster user.
        """
        # if no log file
        if self.logFile == "":
            self.logFile = ClusterLog.getOMLogPath(
                DefaultValue.EXPANSION_LOG_FILE, self.user, "",
                self.xmlFile)
        # if not absolute path
        if not os.path.isabs(self.logFile):
            GaussLog.exitWithError(ErrorCode.GAUSS_502["GAUSS_50213"] % "log")
        self.initLogger("gs_expansion")
        # change the owner of gs_expansion.log to the db user; the owner
        # is read from the log file itself, not from whatever matches
        # "gs_expansion*" in the current working directory
        logPath = self.logger.logFile
        if os.path.isfile(logPath) and \
                self._get_file_owner(logPath) != self.user:
            subprocess.getstatusoutput("chown {}:{} {}".format(
                self.user, self.group, logPath))
        self.logger.ignoreErr = True

    def getExpansionInfo(self):
        """
        Collect the cluster information and the lookup maps derived
        from it.
        """
        self._getClusterInfoDict()
        self._getBackIpNameMap()
        self._getHostAzNameMap()
        self._getNewHostCasRoleMap()

    def checkXmlIncludeNewHost(self):
        """
        check parameter node must in xml config file
        """
        backIpList = self.clusterInfo.getClusterBackIps()
        for nodeIp in self.newHostList:
            if nodeIp not in backIpList:
                GaussLog.exitWithError(ErrorCode.GAUSS_357["GAUSS_35702"] %
                                       nodeIp)

    def _getBackIpNameMap(self):
        """
        Fill self.backIpNameMap with back IP -> node name for every back
        IP of the cluster.
        """
        for backip in self.clusterInfo.getClusterBackIps():
            self.backIpNameMap[backip] = \
                self.clusterInfo.getNodeNameByBackIp(backip)

    def checkExecutingUser(self):
        """
        check whether current user executing this command is root
        """
        if os.getuid() != 0:
            GaussLog.exitWithError(ErrorCode.GAUSS_501["GAUSS_50104"])

    def checkExecutingHost(self):
        """
        check whether current host is primary host
        """
        currentHost = socket.gethostname()
        # the first node whose instanceType equals 0 is taken as the
        # primary host; "" when there is none
        primaryHost = next(
            (nodeName for nodeName in self.nodeNameList
             if self.clusterInfoDict[nodeName]["instanceType"] == 0),
            "")
        if currentHost != primaryHost:
            GaussLog.exitWithError(ErrorCode.GAUSS_501["GAUSS_50110"] %
                                   (currentHost + ", which is not primary"))

    def checkTrust(self, hostList=None):
        """
        check trust between primary/current host and every host in hostList

        When hostList is not given, all node names and all back IPs of
        the cluster are checked. Both root and the cluster user must be
        able to reach every host through pssh.
        """
        if hostList is None:
            hostList = list(self.nodeNameList)
            hostList += self.clusterInfo.getClusterBackIps()
        gpHome = EnvUtil.getEnv("GPHOME")
        psshPath = "python3 %s/script/gspylib/pssh/bin/pssh" % gpHome
        rootSSHExceptionHosts = []
        individualSSHExceptionHosts = []
        for host in hostList:
            # check root's trust
            checkRootTrustCmd = "%s -s -H %s 'pwd'" % (psshPath, host)
            (status, _) = subprocess.getstatusoutput(checkRootTrustCmd)
            if status != 0:
                rootSSHExceptionHosts.append(host)
            # check individual user's trust; LD_LIBRARY_PATH is reset to
            # /usr/lib64 for the su call
            checkUserTrustCmd = "export LD_LIBRARY_PATH=/usr/lib64; su - %s -c '%s -s -H %s pwd'" % (
                self.user, psshPath, host)
            (status, _) = subprocess.getstatusoutput(checkUserTrustCmd)
            if status != 0:
                individualSSHExceptionHosts.append(host)
        # output ssh exception info if ssh connect failed
        if rootSSHExceptionHosts or individualSSHExceptionHosts:
            sshExceptionInfo = ""
            if rootSSHExceptionHosts:
                sshExceptionInfo += "\n"
                sshExceptionInfo += ", ".join(rootSSHExceptionHosts)
                sshExceptionInfo += " by root"
            if individualSSHExceptionHosts:
                sshExceptionInfo += "\n"
                sshExceptionInfo += ", ".join(individualSSHExceptionHosts)
                sshExceptionInfo += " by individual user"
            GaussLog.exitWithError(ErrorCode.GAUSS_511["GAUSS_51100"] %
                                   sshExceptionInfo)

    def checkEnvfile(self):
        """
        check whether env file is sourced
        check whether info in XML is consistent with environment variable
        """
        self.logger.debug("Checking environment variable.")
        for envName in ("GPHOME", "GAUSSHOME", "PGHOST"):
            if not EnvUtil.getEnv(envName):
                GaussLog.exitWithError(ErrorCode.GAUSS_518["GAUSS_51802"] % (
                    "\"%s\", please import environment variable" % envName))
        clusterInfoDict = self.clusterInfoDict
        toolPath = EnvUtil.getEnv("GPHOME")
        appPath = EnvUtil.getEnv("GAUSSHOME")
        if toolPath != clusterInfoDict["toolPath"]:
            GaussLog.exitWithError(
                ErrorCode.GAUSS_357["GAUSS_35711"] % "toolPath")
        if appPath != clusterInfoDict["appPath"]:
            GaussLog.exitWithError(
                ErrorCode.GAUSS_357["GAUSS_35711"] % "appPath")

    def _getHostAzNameMap(self):
        """
        get azName of all hosts
        """
        for dbnode in self.clusterInfo.dbNodes:
            self.hostAzNameMap[dbnode.backIps[0]] = dbnode.azName

    def _getNewHostCasRoleMap(self):
        """
        get cascadeRole of newHosts
        """
        for dbnode in self.clusterInfo.dbNodes:
            if dbnode.backIps[0] in self.newHostList:
                self.newHostCasRoleMap[dbnode.backIps[0]] = \
                    dbnode.cascadeRole

    def check_cm_component(self):
        """
        Return True when the static configuration of the cluster contains
        at least one CM server, otherwise False.
        """
        db_cluster_info = dbClusterInfo()
        db_cluster_info.initFromStaticConfig(self.user)
        return DefaultValue.get_cm_server_num_from_static(
            db_cluster_info) > 0

    def check_xml_config(self):
        """
        Check XML configuration

        Compares the cluster loaded from the static configuration with
        the cluster described by the XML file.
        """
        expand_cluster_info = ExpansionClusterInfo()
        expand_cluster_info.initFromXml(self.xmlFile)
        xml_cluster_info = dbClusterInfo()
        xml_cluster_info.initFromXml(self.xmlFile)
        static_cluster_info = dbClusterInfo()
        static_cluster_info.initFromStaticConfig(self.user)
        expand_cluster_info.compare_cluster_info(static_cluster_info,
                                                 xml_cluster_info)

    def expand_run(self):
        """
        This is expansion frame start

        Picks the implementation matching the cluster (with or without
        cluster manager, local mode or not) and runs it.
        """
        # the static configuration is read only once
        has_cm = self.check_cm_component()
        if has_cm and self.standbyLocalMode:
            expand_impl = ExpansionImplWithCmLocal(self)
            self.logger.log("Start expansion with cluster manager component on standby node.")
        elif has_cm:
            expand_impl = ExpansionImplWithCm(self)
            self.logger.log("Start expansion with cluster manager component.")
        else:
            expand_impl = ExpansionImpl(self)
            self.logger.log("Start expansion without cluster manager component.")
        expand_impl.run()
class ExpansionClusterInfo(dbClusterInfo):
    """
    dbClusterInfo extended with helpers that compare the static
    configuration of the cluster with the XML configuration.
    """

    def __init__(self):
        """
        Initialize the base class and the comparison trace.
        """
        dbClusterInfo.__init__(self)
        # trace of the items currently being compared; it is included in
        # the error message when a comparison fails
        self.collect_compare_result = list()

    def _remove_normal_msg(self, key):
        """
        Remove normal message in alarm_message

        Drops the first occurrence of key from the comparison trace, if
        there is one.
        """
        if key in self.collect_compare_result:
            self.collect_compare_result.remove(key)

    def _add_alarm_msg(self, key):
        """
        Append key to the comparison trace.
        """
        self.collect_compare_result.append(key)

    def getToolPath(self, xmlFile):
        """
        function : Read tool path from default xml file
        input : String
        output : String
        """
        ClusterConfigFile.setDefaultXmlFile(xmlFile)
        # read gaussdb tool path from xml file
        (retStatus, retValue) = ClusterConfigFile.readOneClusterConfigItem(
            ClusterConfigFile.initParserXMLFile(xmlFile),
            "gaussdbToolPath", "cluster")
        if retStatus != 0:
            raise Exception(ErrorCode.GAUSS_512["GAUSS_51200"]
                            % "gaussdbToolPath" + " Error: \n%s" % retValue)
        toolPath = os.path.normpath(retValue)
        checkPathVaild(toolPath)
        return toolPath

    def find_right_node(self, a_node, b_list):
        """
        Find node in b_list

        Returns the node of b_list that has the same name as a_node and
        raises Exception when there is none.
        """
        for node in b_list:
            if node.name == a_node.name:
                return node
        raise Exception("Node {0} not config in XML.".format(a_node.name))

    def find_right_instance(self, a_inst, b_list):
        """
        Find instance in b_list

        Returns the first instance of b_list that either has the same
        instanceId as a_inst or is a CM server / CM agent instance.
        Raises Exception when b_list contains no such instance.
        """
        cm_roles = [DefaultValue.INSTANCE_ROLE_CMSERVER,
                    DefaultValue.INSTANCE_ROLE_CMAGENT]
        for inst in b_list:
            if inst.instanceId == a_inst.instanceId:
                return inst
            # NOTE(review): any CM instance is accepted as a match without
            # comparing ids - confirm this is intended.
            if inst.instanceRole in cm_roles:
                return inst
        # report the instance that was searched for; the loop variable
        # would be wrong here and is unbound when b_list is empty
        raise Exception(
            "Instance {0} not config in XML.".format(a_inst.instanceId))

    def compare_list(self, a_list, b_list):
        """
        Compare list object

        Nodes and instances are matched by name / instance id, all other
        items are compared with the item at the same index of b_list.
        Every item is put on the comparison trace while it is compared.
        """
        for index, a_item in enumerate(a_list):
            if isinstance(a_item, dbNodeInfo):
                trace = "node: {0}".format(a_item.name)
                self._add_alarm_msg(trace)
                b_node = self.find_right_node(a_item, b_list)
                self.compare_dict(a_item.__dict__, b_node.__dict__)
                self._remove_normal_msg(trace)
            elif isinstance(a_item, instanceInfo):
                trace = "instance: {0}".format(str(a_item.instanceId))
                self._add_alarm_msg(trace)
                b_inst = self.find_right_instance(a_item, b_list)
                self.compare_dict(a_item.__dict__, b_inst.__dict__)
                self._remove_normal_msg(trace)
            elif isinstance(a_item, dict):
                self._add_alarm_msg(a_item)
                self.compare_dict(a_item, b_list[index])
                self._remove_normal_msg(a_item)
            elif isinstance(a_item, list):
                self._add_alarm_msg(a_item)
                self.compare_list(a_item, b_list[index])
                self._remove_normal_msg(a_item)
            elif isinstance(a_item, str):
                self._add_alarm_msg(a_item)
                self.compare_string(a_item, b_list[index])
                self._remove_normal_msg(a_item)
            elif isinstance(a_item, int):
                self._add_alarm_msg(a_item)
                self.compare_int(a_item, b_list[index])
                self._remove_normal_msg(a_item)

    def compare_string(self, a_string, b_string):
        """
        Compare two strings and raise Exception when they differ.
        """
        if a_string != b_string:
            raise Exception((ErrorCode.GAUSS_357["GAUSS_35711"] %
                             self.collect_compare_result[-1]) +
                            "XML configure string item failed: {0} . "
                            "Static config {1}. "
                            "XML config {2}".format(
                                self.collect_compare_result,
                                a_string, b_string))

    def compare_int(self, a_integer, b_integer):
        """
        Compare two integers and raise Exception when they differ.
        """
        if a_integer != b_integer:
            raise Exception((ErrorCode.GAUSS_357["GAUSS_35711"] %
                             self.collect_compare_result[-1]) +
                            "XML configure integer item failed. : {0} . "
                            "Static config {1}. "
                            "XML config {2}".format(
                                self.collect_compare_result,
                                a_integer, b_integer))

    def compare_dict(self, a_dict, b_dict):
        """
        Compare dict object

        Keys listed in ABORT_CHECK_PROPERTY are skipped. For all other
        keys the value types must be identical; dict and list values are
        compared recursively. Raises Exception on a type mismatch or an
        unsupported value type.
        """
        for a_key, a_value in a_dict.items():
            if a_key in ABORT_CHECK_PROPERTY:
                self._remove_normal_msg(a_key)
                continue
            b_value = b_dict.get(a_key)
            if type(a_value) is not type(b_value):
                raise Exception("The value type of the XML configuration item [{0}] "
                                "is inconsistent with that "
                                "in the static configuration.".format(a_key))
            self._add_alarm_msg(a_key)
            if isinstance(a_value, dict):
                self.compare_dict(a_value, b_value)
                self._remove_normal_msg(a_key)
            elif isinstance(a_value, list):
                self.compare_list(a_value, b_value)
                self._remove_normal_msg(a_key)
            elif isinstance(a_value, (str, int)):
                # NOTE(review): string and integer values are accepted
                # without being compared, and the key stays on the
                # comparison trace - confirm this is intended.
                pass
            else:
                raise Exception("Not support type. key is {0} , "
                                "static value is {1}, XML value is {2}, "
                                "type is {3}".format(a_key, a_value,
                                                     b_value, type(a_value)))

    def compare_cluster_info(self, static_cluster_info, xml_cluster_info):
        """
        Compare cluster_info

        The XML configuration must describe more nodes than the static
        configuration; afterwards the attributes of both cluster objects
        are compared.
        """
        if len(static_cluster_info.dbNodes) >= len(xml_cluster_info.dbNodes):
            raise Exception("XML configuration failed, "
                            "node count in expantion XML must be more than static file.")
        self.compare_dict(static_cluster_info.__dict__,
                          xml_cluster_info.__dict__)
if __name__ == "__main__":
    expansion = Expansion()
    # The steps run strictly in this order: later steps rely on state
    # prepared by earlier ones (parsed options, logger, cluster info).
    startup_steps = (
        expansion.checkExecutingUser,
        expansion.parseCommandLine,
        expansion.checkParameters,
        expansion.initLogs,
        expansion.getExpansionInfo,
        expansion.checkEnvfile,
        expansion.checkXmlIncludeNewHost,
        expansion.checkExecutingHost,
        expansion.checkTrust,
        expansion.expand_run,
    )
    for step in startup_steps:
        step()