om: adapt to support major version upgrade

gyt0221 2020-12-29 15:09:02 +08:00
parent 0fbbc08162
commit 618596a7f2
12 changed files with 3061 additions and 228 deletions

View File

@ -43,6 +43,7 @@ import os
import sys
import pwd
import grp
import copy
import socket
from gspylib.common.Common import DefaultValue
@ -213,6 +214,39 @@ General options:
self.initClusterInfoFromStaticFile(self.user)
self.logger.debug("Successfully init global infos")
def distributeFileToSpecialNode(self, file, destDir, hostList):
"""
distribute file to special node
:param file:
:param destDir:
:param hostList:
:return:
"""
if not hostList:
hostList = copy.deepcopy(self.clusterNodes)
else:
hostList = copy.deepcopy(hostList)
if DefaultValue.GetHostIpOrName() in hostList:
hostList.remove(DefaultValue.GetHostIpOrName())
self.logger.debug("Start copy file:{0} to hosts:{1}.".format(
file, hostList))
if not os.path.exists(file):
raise Exception(ErrorCode.GAUSS_502["GAUSS_50201"] % file)
self.logger.debug("Distribute the file %s" % file)
retry = True
count = 0
while retry:
try:
if count > 4:
retry = False
self.sshTool.scpFiles(file, destDir, hostList)
retry = False
except Exception as e:
count += 1
if not retry:
# the final attempt failed as well; re-raise instead of
# silently swallowing the error
raise Exception(str(e))
self.logger.debug("Retry distributing file, "
"the {0} time.".format(count))
if __name__ == '__main__':
"""

View File

@ -109,6 +109,7 @@ from gspylib.common.VersionInfo import VersionInfo
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, \
algorithms, modes
import impl.upgrade.UpgradeConst as Const
noPassIPs = []
g_lock = thread.allocate_lock()
@ -3674,6 +3675,11 @@ class DefaultValue():
tarLists = "--exclude=script/*.log --exclude=*.log script " \
"version.cfg lib"
upgrade_sql_file_path = os.path.join(packageDir,
Const.UPGRADE_SQL_FILE)
if os.path.exists(upgrade_sql_file_path):
tarLists += " %s %s" % (Const.UPGRADE_SQL_SHA,
Const.UPGRADE_SQL_FILE)
if "HOST_IP" in os.environ.keys():
tarLists += " cluster_default_agent.xml"
try:
@ -4163,6 +4169,37 @@ class DefaultValue():
else:
return False
@staticmethod
def getPrimaryNode(userProfile):
"""
:param
:return: PrimaryNode
"""
try:
primaryFlag = "Primary"
count = 0
while count < 60:
cmd = "source {0} && gs_om -t status --detail".format(
userProfile)
(status, output) = subprocess.getstatusoutput(cmd)
if status == 0:
break
time.sleep(10)
count += 1
if status != 0:
raise Exception(ErrorCode.GAUSS_514["GAUSS_51400"] %
"Command:%s. Error:\n%s" % (cmd, output))
targetString = output.split("Datanode")[1]
dnPrimary = [x for x in re.split(r"[|\n]", targetString)
if primaryFlag in x]
primaryList = []
for dn in dnPrimary:
primaryList.append(list(filter(None, dn.split(" ")))[1])
return primaryList
except Exception as e:
raise Exception(str(e))
class ClusterCommand():
'''
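getPrimaryNode depends on the layout of the 'gs_om -t status --detail'
output. A minimal sketch with a hypothetical excerpt (the real columns
may differ) shows how the primary names fall out of the parsing:

    import re

    output = ("...\n[  Datanode State   ]\n"
              "1  node1 6001 /data/dn1 P Primary Normal |\n"
              "2  node2 6002 /data/dn2 S Standby Normal\n")
    target = output.split("Datanode")[1]
    rows = [x for x in re.split(r"[|\n]", target) if "Primary" in x]
    # after dropping empty fields, field [1] is the node name
    primaries = [list(filter(None, row.split(" ")))[1] for row in rows]
    print(primaries)  # ['node1'] under this assumed layout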

View File

@ -1385,9 +1385,9 @@ class dbClusterInfo():
"""
try:
with open(staticConfigFile, "rb") as fp:
info = fp.read(32)
info = fp.read(28)
(crc, lenth, version, currenttime, nodeNum,
localNodeId) = struct.unpack("=qIIqiI", info)
localNodeId) = struct.unpack("=IIIqiI", info)
except Exception as e:
raise Exception(
ErrorCode.GAUSS_512["GAUSS_51236"] + " Error: \n%s." % str(e))
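The header read shrinks from 32 to 28 bytes because the CRC field in the
new static-config layout is a 4-byte unsigned int ("I") instead of an
8-byte signed int ("q"); the same width change drives every format-string
switch in this file. The sizes can be checked directly:

    import struct

    assert struct.calcsize("=qIIqiI") == 32  # old file header, 8-byte crc
    assert struct.calcsize("=IIIqiI") == 28  # new file header, 4-byte crc
    assert struct.calcsize("=qI64s") == 76   # old node record header
    assert struct.calcsize("=II64s") == 72   # new node record header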
@ -2062,12 +2062,22 @@ class dbClusterInfo():
# find the path from right to left
self.logPath = logPathWithUser[
0:(logPathWithUser.rfind(splitMark))]
staticConfigFilePath = os.path.split(staticConfigFile)[0]
versionFile = os.path.join(
staticConfigFilePath, "upgrade_version")
version, number, commitid = VersionInfo.get_version_info(
versionFile)
try:
# read static_config_file
fp = open(staticConfigFile, "rb")
info = fp.read(32)
(crc, lenth, version, currenttime, nodeNum,
localNodeId) = struct.unpack("=qIIqiI", info)
if float(number) <= 92.200:
info = fp.read(32)
(crc, lenth, version, currenttime, nodeNum,
localNodeId) = struct.unpack("=qIIqiI", info)
else:
info = fp.read(28)
(crc, lenth, version, currenttime, nodeNum,
localNodeId) = struct.unpack("=IIIqiI", info)
self.version = version
self.installTime = currenttime
self.localNodeId = localNodeId
@ -2110,7 +2120,7 @@ class dbClusterInfo():
for i in range(nodeNum):
offset = (fp.tell() // PAGE_SIZE + 1) * PAGE_SIZE
fp.seek(offset)
dbNode = self.__unPackNodeInfo(fp, isLCCluster)
dbNode = self.__unPackNodeInfo(fp, number, isLCCluster)
self.dbNodes.append(dbNode)
fp.close()
except Exception as e:
@ -2122,14 +2132,18 @@ class dbClusterInfo():
fp.close()
raise Exception(str(e))
def __unPackNodeInfo(self, fp, isLCCluster=False):
def __unPackNodeInfo(self, fp, number, isLCCluster=False):
"""
function : unpack a node config info
input : file object, cluster version number string
output : Object
"""
info = fp.read(76)
(crc, nodeId, nodeName) = struct.unpack("=qI64s", info)
if float(number) <= 92.200:
info = fp.read(76)
(crc, nodeId, nodeName) = struct.unpack("=qI64s", info)
else:
info = fp.read(72)
(crc, nodeId, nodeName) = struct.unpack("=II64s", info)
nodeName = nodeName.decode().strip('\x00')
dbNode = dbNodeInfo(nodeId, nodeName)
info = fp.read(68)
@ -2414,11 +2428,21 @@ class dbClusterInfo():
"""
fp = None
try:
staticConfigFilePath = os.path.split(staticConfigFile)[0]
versionFile = os.path.join(
staticConfigFilePath, "upgrade_version")
version, number, commitid = VersionInfo.get_version_info(
versionFile)
# read cluster info from static config file
fp = open(staticConfigFile, "rb")
info = fp.read(32)
(crc, lenth, version, currenttime, nodeNum,
localNodeId) = struct.unpack("=qIIqiI", info)
if float(number) <= 92.200:
info = fp.read(32)
(crc, lenth, version, currenttime, nodeNum,
localNodeId) = struct.unpack("=qIIqiI", info)
else:
info = fp.read(28)
(crc, lenth, version, currenttime, nodeNum,
localNodeId) = struct.unpack("=IIIqiI", info)
if (version <= 100):
raise Exception(ErrorCode.GAUSS_516["GAUSS_51637"]
% ("cluster static config version[%s]"
@ -2452,7 +2476,7 @@ class dbClusterInfo():
for i in range(nodeNum):
offset = (fp.tell() // PAGE_SIZE + 1) * PAGE_SIZE
fp.seek(offset)
dbNode = self.__unPackNodeInfo(fp)
dbNode = self.__unPackNodeInfo(fp, number)
self.dbNodes.append(dbNode)
fp.close()
except Exception as e:
@ -4215,9 +4239,8 @@ class dbClusterInfo():
raise Exception(ErrorCode.GAUSS_532["GAUSS_53200"])
if peerNum > 8:
raise Exception(ErrorCode.GAUSS_512["GAUSS_51230"] % \
("database node standbys", "be less than 5")
+ " Please set it.")
raise Exception(ErrorCode.GAUSS_512["GAUSS_51230"] % (
"database node standbys", "be less than 9") + " Please set it.")
@ -4410,13 +4433,21 @@ class dbClusterInfo():
else:
return instances
def saveToStaticConfig(self, filePath, localNodeId, dbNodes=None):
def saveToStaticConfig(self, filePath, localNodeId, dbNodes=None,
upgrade=False):
"""
function : Save cluster info into the static config file
input : String,int
output : NA
"""
fp = None
number = None
if upgrade:
staticConfigFilePath = os.path.split(filePath)[0]
versionFile = os.path.join(
staticConfigFilePath, "upgrade_version")
version, number, commitid = VersionInfo.get_version_info(
versionFile)
try:
if (dbNodes is None):
dbNodes = self.dbNodes
@ -4434,14 +4465,20 @@ class dbClusterInfo():
info += struct.pack("I", localNodeId)
crc = binascii.crc32(info)
info = struct.pack("q", crc) + info
if upgrade:
if float(number) <= 92.200:
info = struct.pack("q", crc) + info
else:
info = struct.pack("I", crc) + info
else:
info = struct.pack("I", crc) + info
fp.write(info)
for dbNode in dbNodes:
offset = (fp.tell() // PAGE_SIZE + 1) * PAGE_SIZE
fp.seek(offset)
info = self.__packNodeInfo(dbNode)
info = self.__packNodeInfo(dbNode, number, upgrade=upgrade)
fp.write(info)
endBytes = PAGE_SIZE - fp.tell() % PAGE_SIZE
if (endBytes != PAGE_SIZE):
@ -4457,7 +4494,7 @@ class dbClusterInfo():
"static configuration file"
+ " Error: \n%s" % str(e))
def __packNodeInfo(self, dbNode):
def __packNodeInfo(self, dbNode, number, upgrade=False):
"""
function : Pack the info of node
input : dbNodeInfo, cluster version number, upgrade flag
@ -4493,7 +4530,13 @@ class dbClusterInfo():
info += struct.pack("I", 0)
crc = binascii.crc32(info)
return struct.pack("q", crc) + info
if upgrade:
if float(number) <= 92.200:
return struct.pack("q", crc) + info
else:
return struct.pack("I", crc) + info
else:
return struct.pack("I", crc) + info
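binascii.crc32 returns an unsigned 32-bit value in Python 3, so it fits
either field width; only the on-disk encoding differs. A minimal sketch
of the two encodings (the payload is illustrative):

    import binascii
    import struct

    payload = b"packed-node-record"
    crc = binascii.crc32(payload)
    old_rec = struct.pack("q", crc) + payload  # versions <= 92.200
    new_rec = struct.pack("I", crc) + payload  # newer versions
    assert len(old_rec) - len(new_rec) == 4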
def __packNodeInfoForLC(self, dbNode):
"""
@ -4516,7 +4559,7 @@ class dbClusterInfo():
info += struct.pack("I", 0)
crc = binascii.crc32(info)
return struct.pack("q", crc) + info
return struct.pack("I", crc) + info
def __packEtcdInfo(self, dbNode):
"""
@ -5936,7 +5979,7 @@ class dbClusterInfo():
# node count
info += struct.pack("I", len(self.dbNodes))
crc = binascii.crc32(info)
info = struct.pack("q", crc) + info
info = struct.pack("I", crc) + info
fp.write(info)
primaryDnNum = 0
for dbNode in self.dbNodes:
@ -6039,7 +6082,7 @@ class dbClusterInfo():
info += struct.pack("I", 0)
info += struct.pack("I", 0)
crc = binascii.crc32(info)
return (primaryNum, struct.pack("q", crc) + info)
return (primaryNum, struct.pack("I", crc) + info)
def __getClusterSwitchTime(self, dynamicConfigFile):
"""
@ -6051,9 +6094,9 @@ class dbClusterInfo():
fp = None
try:
fp = open(dynamicConfigFile, "rb")
info = fp.read(28)
info = fp.read(24)
(crc, lenth, version, switchTime, nodeNum) = \
struct.unpack("=qIIqi", info)
struct.unpack("=IIIqi", info)
fp.close()
except Exception as e:
if fp:
@ -6189,9 +6232,9 @@ class dbClusterInfo():
dynamicConfigFile = self.__getDynamicConfig(user)
# read dynamic_config_file
fp = open(dynamicConfigFile, "rb")
info = fp.read(28)
info = fp.read(24)
(crc, lenth, version, currenttime, nodeNum) = \
struct.unpack("=qIIqi", info)
struct.unpack("=IIIqi", info)
totalMaterDnNum = 0
for i in range(nodeNum):
offset = (fp.tell() // PAGE_SIZE + 1) * PAGE_SIZE
@ -6210,8 +6253,8 @@ class dbClusterInfo():
dynamicConfigFile + " Error:\n" + str(e))
def __unpackDynamicNodeInfo(self, fp):
info = fp.read(76)
(crc, nodeId, nodeName) = struct.unpack("=qI64s", info)
info = fp.read(72)
(crc, nodeId, nodeName) = struct.unpack("=II64s", info)
nodeName = nodeName.decode().strip('\x00')
dbNode = dbNodeInfo(nodeId, nodeName)
info = fp.read(4)

View File

@ -229,6 +229,60 @@ class OMCommand():
except Exception as e:
raise Exception(str(e))
@staticmethod
def doCheckStaus(user, nodeId, cluster_normal_status=None,
expected_redistributing=""):
"""
function: Check cluster status
input : user, nodeId, cluster_normal_status, expected_redistributing
output: status, output
"""
try:
statusFile = "/home/%s/gauss_check_status_%d.dat" % (
user, os.getpid())
TempfileManagement.removeTempFile(statusFile)
cmd = ClusterCommand.getQueryStatusCmd(user, "", statusFile)
(status, output) = subprocess.getstatusoutput(cmd)
if status != 0:
TempfileManagement.removeTempFile(statusFile)
return (status, output)
clusterStatus = DbClusterStatus()
clusterStatus.initFromFile(statusFile)
TempfileManagement.removeTempFile(statusFile)
except Exception as e:
DefaultValue.cleanTmpFile(statusFile)
raise Exception(
ErrorCode.GAUSS_516["GAUSS_51600"] + "Error: %s." % str(e))
status = 0
output = ""
statusRep = None
if nodeId > 0:
nodeStatus = clusterStatus.getDbNodeStatusById(nodeId)
if nodeStatus is None:
raise Exception(ErrorCode.GAUSS_516["GAUSS_51619"] % nodeId)
status = 0 if nodeStatus.isNodeHealthy() else 1
statusRep = nodeStatus.getNodeStatusReport()
else:
status = 0 if clusterStatus.isAllHealthy(cluster_normal_status) \
and (clusterStatus.redistributing ==
expected_redistributing or
expected_redistributing == "") else 1
statusRep = clusterStatus.getClusterStatusReport()
output += "cluster_state : %s\n" % clusterStatus.clusterStatus
output += "redistributing : %s\n" % clusterStatus.redistributing
output += "node_count : %d\n" % statusRep.nodeCount
output += "Datanode State\n"
output += " primary : %d\n" % statusRep.dnPrimary
output += " standby : %d\n" % statusRep.dnStandby
output += " secondary : %d\n" % statusRep.dnDummy
output += " building : %d\n" % statusRep.dnBuild
output += " abnormal : %d\n" % statusRep.dnAbnormal
output += " down : %d\n" % statusRep.dnDown
return (status, output)
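A hedged usage sketch ("omm" is an assumed cluster user; a nodeId of 0
checks the whole cluster instead of a single node):

    status, report = OMCommand.doCheckStaus("omm", 0)
    if status != 0:
        print("cluster is not healthy:\n" + report)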
@staticmethod
def getClusterStatus(user, isExpandScene=False):
"""

View File

@ -790,7 +790,7 @@ class ParallelBaseOM(object):
return output.strip()
def killKernalSnapshotThread(self, coorInst):
def killKernalSnapshotThread(self, dnInst):
"""
function: kill the snapshot thread in the kernel to
avoid a deadlock with redistribution
@ -801,7 +801,7 @@ class ParallelBaseOM(object):
killSnapshotSQL = "select * from kill_snapshot();"
(status, output) = ClusterCommand.remoteSQLCommand(
killSnapshotSQL, self.user, coorInst.hostname, coorInst.port,
killSnapshotSQL, self.user, dnInst.hostname, dnInst.port,
False, DefaultValue.DEFAULT_DB_NAME)
if (status != 0):
raise Exception(ErrorCode.GAUSS_514["GAUSS_51400"] %

View File

@ -67,10 +67,16 @@ class Kernel(BaseComponent):
"""
def start(self, time_out=DefaultValue.TIMEOUT_CLUSTER_START,
security_mode="off"):
security_mode="off", cluster_number=None):
"""
"""
cmd = "%s/gs_ctl start -D %s " % (self.binPath, self.instInfo.datadir)
if cluster_number:
cmd = "%s/gs_ctl start -o '-u %s' -D %s " % (
self.binPath, int(float(cluster_number) * 1000),
self.instInfo.datadir)
else:
cmd = "%s/gs_ctl start -D %s " % (
self.binPath, self.instInfo.datadir)
if self.instInfo.instanceType == DefaultValue.MASTER_INSTANCE:
if len(self.instInfo.peerInstanceInfos) > 0:
cmd += "-M primary"
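With the new cluster_number argument, the old version number is handed
to the kernel as an integer. For example (the data directory is
illustrative):

    cluster_number = "92.200"
    print(int(float(cluster_number) * 1000))  # 92200
    # resulting command:
    # gs_ctl start -o '-u 92200' -D /data/dn1 -M primary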

View File

@ -32,6 +32,7 @@ from gspylib.common.ErrorCode import ErrorCode
from gspylib.os.gsfile import g_file
from gspylib.os.gsfile import g_Platform
from gspylib.common.VersionInfo import VersionInfo
import impl.upgrade.UpgradeConst as Const
sys.path.append(sys.path[0] + "/../../../lib/")
DefaultValue.doConfigForParamiko()
@ -414,6 +415,10 @@ class PostUninstallImpl:
g_file.removeDirectory(path)
path = "%s/sctp_patch" % (self.clusterToolPath)
g_file.removeDirectory(path)
path = "%s/%s" % (Const.UPGRADE_SQL_FILE, self.clusterToolPath)
g_file.removeFile(path)
path = "%s/%s" % (Const.UPGRADE_SQL_SHA, self.clusterToolPath)
g_file.removeFile(path)
self.logger.debug(
"Deleting environmental software of local nodes.")

View File

@ -53,6 +53,18 @@ ACTION_INPLACE_RESTORE = "inplace_restore"
ACTION_CHECK_GUC = "check_guc"
ACTION_BACKUP_HOTPATCH = "backup_hotpatch"
ACTION_ROLLBACK_HOTPATCH = "rollback_hotpatch"
ACTION_UPGRADE_SQL_FOLDER = "prepare_upgrade_sql_folder"
ACTION_BACKUP_OLD_CLUSTER_DB_AND_REL = "backup_old_cluster_db_and_rel"
ACTION_UPDATE_CATALOG = "update_catalog"
ACTION_BACKUP_OLD_CLUSTER_CATALOG_PHYSICAL_FILES = \
"backup_old_cluster_catalog_physical_files"
ACTION_RESTORE_OLD_CLUSTER_CATALOG_PHYSICAL_FILES = \
"restore_old_cluster_catalog_physical_files"
ACTION_CLEAN_OLD_CLUSTER_CATALOG_PHYSICAL_FILES = \
"clean_old_cluster_catalog_physical_files"
ACTION_REPLACE_PG_PROC_FILES = "replace_pg_proc_files"
ACTION_CREATE_PG_PROC_MAPPING_FILE = "create_pg_proc_mapping_file"
ACTION_CREATE_NEW_CSV_FILE = "create_new_csv_file"
OPTION_PRECHECK = "before"
OPTION_POSTCHECK = "after"
@ -61,7 +73,7 @@ GREY_UPGRADE_STEP_FILE = "upgrade_step.csv"
CLUSTER_CMSCONF_FILE = "cluster_cmsconf.json"
CLUSTER_CNSCONF_FILE = "cluster_cnconf.json"
READONLY_MODE = "read_only_mode"
TMP_DYNAMIC_DN_INFO = "upgrade_gauss_dn_status.dat"
#step flag
BINARY_UPGRADE_NO_NEED_ROLLBACK = -2
INVALID_UPRADE_STEP = -1
@ -95,6 +107,11 @@ BACKUP_DIR_LIST = ['global', 'pg_clog', 'pg_xlog', 'pg_multixact',
'pg_replslot', 'pg_notify', 'pg_subtrans', 'pg_cbm',
'pg_twophase']
BACKUP_DIR_LIST_BASE = ['global', 'pg_clog', 'pg_csnlog']
BACKUP_DIR_LIST_64BIT_XID = ['pg_multixact', 'pg_replslot', 'pg_notify',
'pg_subtrans', 'pg_twophase']
FIRST_GREY_UPGRADE_NUM = 92
UPGRADE_PRECOMMIT_NUM = 0.001
@ -115,6 +132,7 @@ UPGRADE_SCHEMA = "on_upgrade_69954349032535120"
RECORD_NODE_STEP = "record_node_step"
READ_STEP_FROM_FILE_FLAG = "read_step_from_file_flag"
RECORD_UPGRADE_DIR = "record_app_directory"
XLOG_BACKUP_INFO = "xlog_backup_info.json"
OLD = "old"
NEW = "new"
# upgrade sql sha file and sql file
@ -124,3 +142,4 @@ UPGRADE_SQL_FILE = "upgrade_sql.tar.gz"
COMBIN_NUM = 30
ON_INPLACE_UPGRADE = "IsInplaceUpgrade"
MAX_APP_SIZE = 2000
UPGRADE_VERSION_64bit_xid = 91.208
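These constants gate upgrade behavior on the old cluster's version
number. A heavily hedged sketch of the intended pattern (the comparison
direction is an assumption, not confirmed by this diff):

    old_number = 91.100
    backup_dirs = list(BACKUP_DIR_LIST_BASE)
    if old_number <= UPGRADE_VERSION_64bit_xid:
        # assumed: pre-64-bit-xid clusters also back up these dirs
        backup_dirs += BACKUP_DIR_LIST_64BIT_XID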

File diff suppressed because it is too large.

View File

@ -46,6 +46,7 @@ class Start(LocalBaseOM):
self.logger = None
self.installPath = ""
self.security_mode = ""
self.cluster_number = None
def usage(self):
"""
@ -72,7 +73,8 @@ General options:
"""
try:
opts, args = getopt.getopt(sys.argv[1:], "U:D:R:l:t:h?",
["help", "security-mode="])
["help", "security-mode=",
"cluster_number="])
except getopt.GetoptError as e:
GaussLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50000"] % str(e))
@ -96,6 +98,8 @@ General options:
sys.exit(0)
elif key == "--security-mode":
self.security_mode = value
elif key == "--cluster_number":
self.cluster_number = value
else:
GaussLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50000"]
% key)
@ -134,7 +138,10 @@ General options:
for dn in self.dnCons:
if self.dataDir != "" and dn.instInfo.datadir != self.dataDir:
continue
dn.start(self.time_out, self.security_mode)
if self.cluster_number:
dn.start(self.time_out, self.security_mode, self.cluster_number)
else:
dn.start(self.time_out, self.security_mode)
isDataDirCorrect = True
if not isDataDirCorrect:

View File

@ -36,6 +36,7 @@ from gspylib.os.gsnetwork import g_network
from gspylib.os.gsservice import g_service
from gspylib.common.LocalBaseOM import LocalBaseOM
from gspylib.os.gsfile import g_Platform
import impl.upgrade.UpgradeConst as Const
ACTION_CLEAN_SYSLOG_CONFIG = 'clean_syslog_config'
ACTION_CLEAN_TOOL_ENV = 'clean_tool_env'
@ -361,6 +362,10 @@ class Postuninstall(LocalBaseOM):
g_file.removeDirectory(path)
path = "%s/unixodbc" % self.clusterToolPath
g_file.removeDirectory(path)
path = "%s/%s" % (self.clusterToolPath, Const.UPGRADE_SQL_FILE)
g_file.removeFile(path)
path = "%s/%s" % (self.clusterToolPath, Const.UPGRADE_SQL_SHA)
g_file.removeFile(path)
self.logger.debug(
"Successfully cleaned the environmental software and variable.")

File diff suppressed because it is too large.