Files
openGauss-OM/script/gs_upgradectl
coolany 5177810e8c update script/gs_upgradectl.
Signed-off-by: coolany <kyosang@163.com>
2022-12-17 06:43:13 +00:00

482 lines
19 KiB
Python

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
#############################################################################
# Copyright (c) 2020 Huawei Technologies Co.,Ltd.
#
# openGauss is licensed under Mulan PSL v2.
# You can use this software according to the terms
# and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
#
# http://license.coscl.org.cn/MulanPSL2
#
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS,
# WITHOUT WARRANTIES OF ANY KIND,
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.
# ----------------------------------------------------------------------------
# Description : gs_upgradectl is a utility to upgrade a Gauss200 application.
#
# gs_upgradectl is an upgrade framework, which controls the upgrade process.
# It contains binary upgrade, in-place upgrade and on-line binary upgrade.
#
# binary upgrade: which includes stopping old cluster,
# replacing binary and starting
# new cluster,only used for no database objects changed between old cluster
# and new cluster.
#
# on-line binary upgrade: rolling upgrade, upgrade standby instances
# firstly, switch over,
# and then upgrade the master instances. only used for no database objects
# changed
# between old cluster and new cluster now.
#
# in-place upgrade: which includes binary upgrade and update of database
# meta-data (system tables,
# system views, functions, and so on), used when some database objects have
# been changed
# between old cluster and new cluster.
#############################################################################
import os
import sys
import pwd
import grp
import copy
import re
import json
from gspylib.common.Common import DefaultValue
from gspylib.common.GaussLog import GaussLog
from gspylib.common.ParallelBaseOM import ParallelBaseOM
from gspylib.threads.SshTool import SshTool
from gspylib.common.ErrorCode import ErrorCode
from gspylib.common.ParameterParsecheck import Parameter
import impl.upgrade.UpgradeConst as Const
from impl.upgrade.OLAP.UpgradeImplOLAP import UpgradeImplOLAP
from impl.upgrade.upgrade_cm_impl import UpgradeCmImpl
from domain_utils.cluster_file.cluster_log import ClusterLog
from base_utils.os.env_util import EnvUtil
from base_utils.os.net_util import NetUtil
from domain_utils.domain_common.cluster_constants import ClusterConstants
class DualUpgradeShareInfo:
    """
    Used to record the upgrade status information of the primary and standby clusters

    Attributes:
        masterVersion / masterUpgradeStatus: version string and status
            value recorded for the primary cluster.
        standbyVersion / standbyUpgradeStatus: the same pair for the
            standby cluster.
    """

    def __init__(self, jsonInfo=None):
        """
        :param jsonInfo: optional dict of attribute values, e.g. the object
            passed in by json.loads(..., object_hook=DualUpgradeShareInfo).
            When it is empty or None the defaults below are kept.
        """
        # Always start from the defaults, so that a record which lacks some
        # of the keys still exposes every attribute.
        self.masterVersion = ""
        self.masterUpgradeStatus = 0
        self.standbyVersion = ""
        self.standbyUpgradeStatus = 0
        if jsonInfo:
            # Copy the values instead of adopting the caller's dict as
            # __dict__, so later attribute writes do not leak back into it.
            self.__dict__.update(jsonInfo)
class Upgrade(ParallelBaseOM):
    """
    Context object of gs_upgradectl: it stores the parsed command-line
    options and the cluster information that is handed over to the
    upgrade implementation classes.
    """

    def __init__(self):
        """
        Give every upgrade-specific attribute its initial value.
        """
        ParallelBaseOM.__init__(self)

        # ---- old / new cluster description ----
        self.oldClusterInfo = ""
        self.oldVersion = ""
        self.oldClusterAppPath = ""
        self.newClusterAppPath = ""
        self.oldclusterVersion = None
        self.oldClusterNumber = None
        self.newClusterVersion = None
        self.newClusterNumber = None

        # ---- nodes ----
        self.clusterNodes = []
        self.nodeNames = []
        self.nodesNum = -1

        # ---- paths and files ----
        # the directory that when do binary upgrade the information store
        self.upgradeBackupPath = ""
        self.userProfile = ""
        self.tmpDir = ""
        self.upgrade_package = ""
        # static parameter
        self.binTarName = "binary_%s.tar" % NetUtil.GetHostIpOrName()

        # ---- upgrade mode and command-line switches ----
        self.is_inplace_upgrade = True
        self.is_grey_upgrade = False
        self.rollback = False
        self.forceRollback = False
        self.upgrade_remain = False
        self.upgrade_action = ""
        self.guc_paras = {}

        # ---- dual-cluster state ----
        # Record the upgrade status information under dual clusters
        self.dualUpgradeShareInfo = None
        # Record the primary cluster or the standby cluster,
        # dual-primary or dual-standby
        self.clusterType = ""
        # Whether it is a standby cluster in a dual cluster.
        # Convenient to judge
        self.standbyCluster = False
        # The path to record the information of each cluster upgrade
        # stage in the dual cluster
        self.upgradePhaseInfoPath = ""
def usage(self):
"""
gs_upgradectl is a utility to upgrade a cluster.
Usage:
gs_upgradectl -? | --help
gs_upgradectl -V | --version
gs_upgradectl -t chose-strategy [-l LOGFILE]
gs_upgradectl -t auto-upgrade -X XMLFILE [-l LOGFILE] [--grey]
gs_upgradectl -t auto-rollback -X XMLFILE [-l LOGFILE] [--force]
gs_upgradectl -t commit-upgrade -X XMLFILE [-l LOGFILE]
gs_upgradectl -t upgrade-cm --upgrade-package PACKAGE_PATH
General options:
-?, --help Show help information for this utility,
and exit the command line mode.
-V, --version Show version information.
-t Subcommand for upgrade. It can be
chose-strategy, auto-upgrade, auto-rollback,
commit-upgrade, upgrade-cm.
-X Path of the XML configuration file of the
later version cluster.
-l Path of log file.
--force Force to rollback when cluster status is
not normal
--grey Use grey-binary-upgrade
--upgrade-package Path of the CM component package
Option for grey upgrade
-h Under grey upgrade, specified nodes name.
--continue Under grey upgrade, continue to upgrade
remain nodes.
"""
# The docstring above IS the help text: it is printed verbatim, so any edit
# to it (including its whitespace) changes what the help option displays.
# NOTE(review): __doc__ is None when Python runs with -OO, in which case this
# prints "None" -- confirm whether that mode needs to be supported.
print(self.usage.__doc__)
def parseCommandLine(self):
    """
    Parse the gs_upgradectl command line and store the recognised options
    on this object.

    Prints the usage text and exits with status 0 when help was requested.
    Raises Exception (GAUSS_51800) when the temporary directory taken from
    the environment is empty.
    """
    options = Parameter().ParameterCommandLine("upgradectl")

    # Help wins over every other option.
    if "helpFlag" in options:
        self.usage()
        sys.exit(0)

    # Options that carry a value: (parsed key, attribute that receives it).
    valued_options = (
        ("action", "action"),
        ("confFile", "xmlFile"),
        ("logFile", "logFile"),
        ("nodename", "nodeNames"),
        ("upgrade-package", "upgrade_package"),
    )
    for key, attr_name in valued_options:
        if key in options:
            setattr(self, attr_name, options.get(key))

    # Pure switches: their presence alone flips the flag.
    if "grey" in options:
        # grey upgrade and in-place upgrade are mutually exclusive
        self.is_grey_upgrade = True
        self.is_inplace_upgrade = False
    if "continue" in options:
        self.upgrade_remain = True
    if "force" in options:
        self.forceRollback = True

    # The phase-info file of a dual cluster lives in the temporary directory.
    self.tmpDir = EnvUtil.getTmpDirFromEnv()
    if self.tmpDir == "":
        raise Exception(ErrorCode.GAUSS_518["GAUSS_51800"] % "$PGHOST")
    self.upgradePhaseInfoPath = os.path.join(self.tmpDir, Const.UPGRADE_PHASE_INFO)
def checkUser(self):
    """
    function: check user

    Resolve the user and group of the current process into self.user and
    self.group, then verify the environment the tool depends on.

    Raises Exception when the user or group name is empty (GAUSS_50308),
    when either of them is 'root' (GAUSS_50105), or when one of the
    required environment variables is empty (GAUSS_51800).
    """
    # It will do more check about user after get the cluster config info.
    self.user = pwd.getpwuid(os.getuid()).pw_name
    self.group = grp.getgrgid(pwd.getpwnam(self.user).pw_gid).gr_name

    identity = (self.user, self.group)
    if "" in identity:
        raise Exception(ErrorCode.GAUSS_503["GAUSS_50308"])
    if "root" in identity:
        raise Exception(ErrorCode.GAUSS_501["GAUSS_50105"])

    # We must make sure the env 'GAUSSHOME', 'GS_CLUSTER_NAME' and
    # 'GAUSS_ENV' exist; they are checked in this order.
    for env_name in ("GAUSSHOME", "GS_CLUSTER_NAME", "GAUSS_ENV"):
        if EnvUtil.getEnvironmentParameterValue(env_name, self.user) == "":
            raise Exception(
                ErrorCode.GAUSS_518["GAUSS_51800"] % ("$" + env_name))

    # Depending on the environment variable GPHOME, access to the python.
    tool_home = EnvUtil.getEnv(ClusterConstants.TOOL_PATH_ENV)
    if tool_home in (None, ""):
        raise Exception(ErrorCode.GAUSS_518["GAUSS_51800"] % "$GPHOME")
def checkParameter(self):
    """
    function: Check parameter from command line

    Validates the action and the XML file, checks for conflicting options,
    resolves the environment file, checks the invoking user, settles the
    log file and finally initialises the logger.
    """
    # The action (-t) is mandatory.
    if self.action == "":
        raise Exception(ErrorCode.GAUSS_500["GAUSS_50001"] % "t" + ".")

    # When we do auto-upgrade, auto-rollback or commit-upgrade, '-X' must
    # be given and the XML file must exist.
    needs_xml = self.action not in [Const.ACTION_CHOSE_STRATEGY,
                                    Const.ACTION_UPGRADE_CM]
    if needs_xml and self.xmlFile == "":
        raise Exception(ErrorCode.GAUSS_500["GAUSS_50001"] % 'X' + ".")
    if needs_xml and not os.path.exists(self.xmlFile):
        raise Exception(ErrorCode.GAUSS_502["GAUSS_50201"] % self.xmlFile)

    # Only auto-upgrade accepts the grey-upgrade options, so only there
    # the option combination has to be checked.
    if self.action == Const.ACTION_AUTO_UPGRADE:
        self.checkCommandConflicts(inplace=not self.is_grey_upgrade)

    # Get mpprcFile by MPPDB_ENV_SEPARATE_PATH. Even if the return value
    # is "" or None, no need to pay attention.
    self.mpprcFile = EnvUtil.getEnv(DefaultValue.MPPRC_FILE_ENV)
    # Make sure which env file we use: self.mpprcFile when it is set,
    # otherwise '~/.bashrc'.
    self.userProfile = EnvUtil.getMpprcFile()
    self.checkUser()

    # Fall back to the default OM log path, then insist on an absolute path.
    if self.logFile == "":
        self.logFile = ClusterLog.getOMLogPath(
            ClusterConstants.UPGRADE_LOG_FILE, self.user, "", "")
    if not os.path.isabs(self.logFile):
        raise Exception(ErrorCode.GAUSS_502["GAUSS_50213"] % "log")

    self.initLogger(self.action)
def execCommandInSpecialNode(self, cmd, hosts, retry_times=2, time_out=0):
    """
    function: run a command on the given hosts through self.sshTool and
              re-run it on the hosts that have not reported success yet
    input:
        cmd: the command line to execute
        hosts: host names to run on; an empty value means every node in
               self.clusterNodes
        retry_times: maximum number of rounds that may end with hosts
               still outstanding (at least one round is always executed)
        time_out: not used by this method; kept so that existing callers
               keep working
    output: NA
    raises: Exception (GAUSS_53507) when hosts of the last reported status
            are marked as failed
    """
    host_list = copy.deepcopy(hosts if hosts else self.clusterNodes)
    self.logger.debug("Commanded: exec cmd in the hosts {0}".format(host_list))

    # Masks the value that follows a -W option, so that it reaches neither
    # the log nor the error message.
    replace_reg = re.compile(r'-W[ ]*[^ ]*[ ]*')

    status, output = dict(), ""
    count = 0
    while True:
        try:
            (status, output) = self.sshTool.getSshStatusOutput(cmd, host_list)
            # Hosts that succeeded are dropped, so a further round only
            # targets the remaining ones.
            for key, val in list(status.items()):
                if DefaultValue.SUCCESS in val and key in host_list:
                    host_list.remove(key)
        except Exception as err:
            # A failing round is retried, but leave a trace of the reason
            # instead of dropping it silently.
            self.logger.debug("Failed to exec cmd in the hosts {0}: {1}".format(
                host_list, replace_reg.sub('-W *** ', str(err))))
        if not host_list:
            break
        count += 1
        if count >= retry_times:
            break

    # Only the status of the last round that returned is evaluated.
    failed_host = [key for key, val in status.items()
                   if DefaultValue.FAILURE in val]
    if not failed_host:
        return

    local_file_msg = ""
    if self.localLog in cmd:
        local_log = GaussLog(self.localLog)
        local_file_msg = "\n For details about this error, see the file: {0} " \
                         "on the host {1}".format(local_log.logFile, failed_host)
    cmd_hide_passwd = replace_reg.sub('-W *** ', cmd)
    # str() keeps the message construction from failing if the tool hands
    # back something other than a string.
    raise Exception(ErrorCode.GAUSS_535["GAUSS_53507"] % cmd_hide_passwd
                    + local_file_msg + str(output))
def initGlobalInfos(self):
    """
    function: init global infos

    Loads the cluster information (from the XML file when one was given,
    otherwise from the static file), fills self.clusterNodes, validates
    the node names given on the command line and finally initialises the
    dual-cluster information.
    """
    self.logger.debug("Init global infos")

    # init cluster info
    if not self.xmlFile:
        self.initClusterInfoFromStaticFile(self.user)
    else:
        self.initClusterInfo()

    # init clusterNodes
    self.clusterNodes.extend(node.name for node in self.clusterInfo.dbNodes)
    node_count = len(self.clusterNodes)
    if node_count == 0:
        raise Exception(ErrorCode.GAUSS_512["GAUSS_51201"])
    # verify whether it is single cluster, isSingle means single node
    if node_count == 1:
        self.isSingle = True

    # every node named on the command line has to belong to the cluster
    for requested in self.nodeNames:
        if requested not in self.clusterNodes:
            raise Exception(ErrorCode.GAUSS_516["GAUSS_51619"] % requested)
    self.logger.debug("Successfully init global infos")

    # If it is a dual-cluster, initialize the related information of the
    # dual-cluster
    self.initDualUpgradeInfo()
def initDualUpgradeInfo(self):
    """
    initialize dual cluster upgrade status information
    If it is not a dual cluster, do not initialize

    The existence of the phase-info file (self.upgradePhaseInfoPath) is
    what this method treats as "dual cluster".
    :return: NA
    :raises Exception: for an in-place upgrade whose action is not
        commit-upgrade, auto-rollback or chose-strategy
    """
    phase_file = self.upgradePhaseInfoPath
    if not os.path.exists(phase_file):
        return

    inplace_allowed_actions = ["commit-upgrade", "auto-rollback", "chose-strategy"]
    if self.is_inplace_upgrade and self.action not in inplace_allowed_actions:
        raise Exception("Dual cluster does not support in-place upgrade")

    # Use the recorded status when there is one, a fresh record otherwise.
    recorded = self.getDualUpgradeInfo(phase_file, startPost=0)
    self.dualUpgradeShareInfo = recorded if recorded else DualUpgradeShareInfo()
@staticmethod
def getDualUpgradeInfo(filePath, startPost):
"""
Obtain the dual-cluster upgrade status information from the file,
and return None if there is no record
:return:
"""
if os.path.exists(filePath):
lenInfo = 0
with open(filePath, 'r') as shareInfo:
shareInfo.seek(startPost)
length = shareInfo.read(4)
if length > '':
try:
lenInfo = int(length)
except Exception as _:
lenInfo = 0
if lenInfo > 0:
shareInfo.seek(startPost + 4)
return json.loads(shareInfo.read(lenInfo), object_hook=DualUpgradeShareInfo)
return None
def updateDualUpgradeInfo(self, dualUpgradeShareInfo, filePath, startPost):
    """
    Update the upgrade information of the cluster to the dual-cluster
    shared file /dev/my_disk_sync_disk file

    The record is written in place: a zero-padded decimal length at
    startPost, followed by the JSON text at
    startPost + Const.LENGTH_STORAGE_INFO_LEN. The file is not truncated,
    so data outside the record is left alone. Afterwards the file is
    copied to the data directory of every DN instance.
    :param dualUpgradeShareInfo: object to store; serialised through its
        __dict__
    :param filePath: path of the status file, which must already exist
    :param startPost: offset of the record inside the file
    :return: NA
    :raises Exception: when the file does not exist, or when the JSON text
        is too long for its length to fit into the length field
    """
    if not os.path.exists(filePath):
        raise Exception("{0} file does not exist and cannot be updated".format(filePath))

    payload = json.dumps(dualUpgradeShareInfo, default=lambda obj: obj.__dict__)
    # Take the length from the text itself: positions returned by tell()
    # on a text-mode file are opaque and not meant for arithmetic.
    header = "{0:04d}".format(len(payload))
    if len(header) > Const.LENGTH_STORAGE_INFO_LEN:
        # A longer header would overwrite the beginning of the JSON text
        # and leave an unreadable record behind.
        raise Exception("The upgrade information is too long ({0} characters) "
                        "to be recorded in {1}".format(len(payload), filePath))

    data_offset = startPost + Const.LENGTH_STORAGE_INFO_LEN
    with os.fdopen(os.open(filePath, os.O_WRONLY, 0o600), "w") as shareInfo:
        shareInfo.seek(data_offset)
        shareInfo.write(payload)
        shareInfo.seek(startPost, 0)
        shareInfo.write(header)

    # After the status file is updated, the standby cluster
    # distributes the updated status file to the data directory of the DN.
    for dbNode in self.clusterInfo.dbNodes:
        for dnInst in dbNode.datanodes:
            self.sshTool.scpFiles(filePath, dnInst.datadir,
                                  hostList=[dnInst.hostname])
def distributeFileToSpecialNode(self, file, destDir, hostList):
    """
    distribute file to special node
    :param file: path of the local file to copy; it must exist
    :param destDir: destination directory on the remote hosts
    :param hostList: target host names; an empty value means every node in
        self.clusterNodes. The local host is always left out.
    :return: NA
    :raises Exception: GAUSS_50201 when the file does not exist; the error
        of the last copy attempt when every attempt failed
    """
    hostList = copy.deepcopy(hostList if hostList else self.clusterNodes)
    local_host = NetUtil.GetHostIpOrName()
    if local_host in hostList:
        hostList.remove(local_host)
    # NOTE(review): on a single-node cluster hostList is empty at this
    # point -- confirm how scpFiles treats an empty host list.

    self.logger.debug("Start copy file:{0} to hosts:{1}.".format(
        file, hostList))
    if not os.path.exists(file):
        raise Exception(ErrorCode.GAUSS_502["GAUSS_50201"] % file)
    self.logger.debug("Distribute the file %s" % file)

    # Same number of attempts as before (six); the difference is that the
    # last failure is now reported instead of being swallowed, so a caller
    # can no longer mistake a file that was never copied for a success.
    max_attempts = 6
    for attempt in range(1, max_attempts + 1):
        try:
            self.sshTool.scpFiles(file, destDir, hostList)
            return
        except Exception:
            self.logger.debug("Retry distributing xml command, "
                              "the {0} time.".format(attempt))
            if attempt == max_attempts:
                raise
def checkCommandConflicts(self, inplace=True):
    """
    function: check the command line for conflict input
    input:
        inplace: True to apply the rules of an in-place upgrade (neither
                 --continue nor -h may be given), False to apply the rules
                 of a grey upgrade (--continue and -h must not be given
                 together)
    return: NA
    raises: Exception when a grey-upgrade option is used for an in-place
            upgrade; the grey-upgrade conflict is reported through
            GaussLog.exitWithError
    """
    # Count how many of the grey-upgrade-only options were supplied.
    conflictPara = 0
    if self.upgrade_remain:
        conflictPara += 1
    if len(self.nodeNames) != 0:
        conflictPara += 1

    if inplace:
        if conflictPara > 0:
            raise Exception("The parameter %s should be used in grey "
                            "upgrade" % "'--continue, -h'")
    elif conflictPara > 1:
        # The table is ErrorCode.GAUSS_500, spelled as everywhere else in
        # this file; the former 'Gauss_500' raised AttributeError instead
        # of reporting the conflict.
        GaussLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50005"] % (
            "--continue", "-h"))
if __name__ == '__main__':
    # main function: parse and check the command line, then hand over to
    # the implementation that matches the requested action.

    # The tool must not be run by root.
    if os.getuid() == 0:
        GaussLog.exitWithError(ErrorCode.GAUSS_501["GAUSS_50105"])

    try:
        REPEAT = False
        upgrade = Upgrade()
        upgrade.parseCommandLine()
        upgrade.checkParameter()

        # set action flag file
        DefaultValue.setActionFlagFile("gs_upgradectl")

        if upgrade.action != Const.ACTION_UPGRADE_CM:
            # every other action needs the cluster information first
            upgrade.initGlobalInfos()
            impl = UpgradeImplOLAP(upgrade)
        else:
            impl = UpgradeCmImpl(upgrade)
        impl.run()
    except Exception as err:
        # REPEAT is never set to True in this block, so the ssh tool is
        # not rebuilt here as the code stands.
        if REPEAT:
            upgrade.sshTool = SshTool(upgrade.clusterNodes,
                                      DefaultValue.TIMEOUT_PSSH_COMMON)
        GaussLog.exitWithError(str(err))
    finally:
        # clear the action flag whatever the outcome was
        DefaultValue.setActionFlagFile("gs_upgradectl", False)