From 618596a7f26903a31b583f48ba383dec194203f9 Mon Sep 17 00:00:00 2001 From: gyt0221 <846772234@qq.com> Date: Tue, 29 Dec 2020 15:09:02 +0800 Subject: [PATCH] =?UTF-8?q?om=20=E9=80=82=E9=85=8D=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E5=A4=A7=E7=89=88=E6=9C=AC=E5=8D=87=E7=BA=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- script/gs_upgradectl | 34 + script/gspylib/common/Common.py | 37 + script/gspylib/common/DbClusterInfo.py | 103 +- script/gspylib/common/OMCommand.py | 54 + script/gspylib/common/ParallelBaseOM.py | 4 +- script/gspylib/component/Kernel/Kernel.py | 10 +- .../impl/postuninstall/PostUninstallImpl.py | 5 + script/impl/upgrade/UpgradeConst.py | 21 +- script/impl/upgrade/UpgradeImpl.py | 1466 +++++++++++++++- script/local/StartInstance.py | 11 +- script/local/UnPreInstallUtility.py | 5 + script/local/UpgradeUtility.py | 1539 +++++++++++++++-- 12 files changed, 3061 insertions(+), 228 deletions(-) diff --git a/script/gs_upgradectl b/script/gs_upgradectl index db13dc1..da38139 100644 --- a/script/gs_upgradectl +++ b/script/gs_upgradectl @@ -43,6 +43,7 @@ import os import sys import pwd import grp +import copy import socket from gspylib.common.Common import DefaultValue @@ -213,6 +214,39 @@ General options: self.initClusterInfoFromStaticFile(self.user) self.logger.debug("Successfully init global infos") + def distributeFileToSpecialNode(self, file, destDir, hostList): + """ + distribute file to special node + :param file: + :param destDir: + :param hostList: + :return: + """ + if not hostList: + hostList = copy.deepcopy(self.clusterNodes) + else: + hostList = copy.deepcopy(hostList) + if DefaultValue.GetHostIpOrName() in hostList: + hostList.remove(DefaultValue.GetHostIpOrName()) + + self.logger.debug("Start copy file:{0} to hosts:{1}.".format( + file, hostList)) + if not os.path.exists(file): + raise Exception(ErrorCode.GAUSS_502["GAUSS_50201"] % file) + self.logger.debug("Distribute the file %s" % file) + retry = True + count = 0 + while retry: + try: + if count > 4: + retry = False + self.sshTool.scpFiles(file, destDir, hostList) + retry = False + except Exception as e: + count += 1 + self.logger.debug("Retry distributing xml command, " + "the {0} time.".format(count)) + if __name__ == '__main__': """ diff --git a/script/gspylib/common/Common.py b/script/gspylib/common/Common.py index 808df34..9458cd5 100644 --- a/script/gspylib/common/Common.py +++ b/script/gspylib/common/Common.py @@ -109,6 +109,7 @@ from gspylib.common.VersionInfo import VersionInfo from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.ciphers import Cipher, \ algorithms, modes +import impl.upgrade.UpgradeConst as Const noPassIPs = [] g_lock = thread.allocate_lock() @@ -3674,6 +3675,11 @@ class DefaultValue(): tarLists = "--exclude=script/*.log --exclude=*.log script " \ "version.cfg lib" + upgrade_sql_file_path = os.path.join(packageDir, + Const.UPGRADE_SQL_FILE) + if os.path.exists(upgrade_sql_file_path): + tarLists += " %s %s" % (Const.UPGRADE_SQL_SHA, + Const.UPGRADE_SQL_FILE) if "HOST_IP" in os.environ.keys(): tarLists += " cluster_default_agent.xml" try: @@ -4163,6 +4169,37 @@ class DefaultValue(): else: return False + @staticmethod + def getPrimaryNode(userProfile): + """ + :param + :return: PrimaryNode + """ + try: + primaryFlag = "Primary" + count = 0 + while count < 60: + count = 0 + cmd = "source {0} && gs_om -t status --detail".format( + userProfile) + (status, output) = subprocess.getstatusoutput(cmd) + if status == 
0: + break + time.sleep(10) + count += 1 + if status != 0: + raise Exception(ErrorCode.GAUSS_514["GAUSS_51400"] % + "Command:%s. Error:\n%s" % (cmd, output)) + targetString = output.split("Datanode")[1] + dnPrimary = [x for x in re.split(r"[|\n]", targetString) + if primaryFlag in x] + primaryList = [] + for dn in dnPrimary: + primaryList.append(list(filter(None, dn.split(" ")))[1]) + return primaryList + except Exception as e: + raise Exception(str(e)) + class ClusterCommand(): ''' diff --git a/script/gspylib/common/DbClusterInfo.py b/script/gspylib/common/DbClusterInfo.py index 3f60ee5..f59044a 100644 --- a/script/gspylib/common/DbClusterInfo.py +++ b/script/gspylib/common/DbClusterInfo.py @@ -1385,9 +1385,9 @@ class dbClusterInfo(): """ try: with open(staticConfigFile, "rb") as fp: - info = fp.read(32) + info = fp.read(28) (crc, lenth, version, currenttime, nodeNum, - localNodeId) = struct.unpack("=qIIqiI", info) + localNodeId) = struct.unpack("=IIIqiI", info) except Exception as e: raise Exception( ErrorCode.GAUSS_512["GAUSS_51236"] + " Error: \n%s." % str(e)) @@ -2062,12 +2062,22 @@ class dbClusterInfo(): # find the path from right to left self.logPath = logPathWithUser[ 0:(logPathWithUser.rfind(splitMark))] + staticConfigFilePath = os.path.split(staticConfigFile)[0] + versionFile = os.path.join( + staticConfigFilePath, "upgrade_version") + version, number, commitid = VersionInfo.get_version_info( + versionFile) try: # read static_config_file fp = open(staticConfigFile, "rb") - info = fp.read(32) - (crc, lenth, version, currenttime, nodeNum, - localNodeId) = struct.unpack("=qIIqiI", info) + if float(number) <= 92.200: + info = fp.read(32) + (crc, lenth, version, currenttime, nodeNum, + localNodeId) = struct.unpack("=qIIqiI", info) + else: + info = fp.read(28) + (crc, lenth, version, currenttime, nodeNum, + localNodeId) = struct.unpack("=IIIqiI", info) self.version = version self.installTime = currenttime self.localNodeId = localNodeId @@ -2110,7 +2120,7 @@ class dbClusterInfo(): for i in range(nodeNum): offset = (fp.tell() // PAGE_SIZE + 1) * PAGE_SIZE fp.seek(offset) - dbNode = self.__unPackNodeInfo(fp, isLCCluster) + dbNode = self.__unPackNodeInfo(fp, number, isLCCluster) self.dbNodes.append(dbNode) fp.close() except Exception as e: @@ -2122,14 +2132,18 @@ class dbClusterInfo(): fp.close() raise Exception(str(e)) - def __unPackNodeInfo(self, fp, isLCCluster=False): + def __unPackNodeInfo(self, fp, number, isLCCluster=False): """ function : unpack a node config info input : file output : Object """ - info = fp.read(76) - (crc, nodeId, nodeName) = struct.unpack("=qI64s", info) + if float(number) <= 92.200: + info = fp.read(76) + (crc, nodeId, nodeName) = struct.unpack("=qI64s", info) + else: + info = fp.read(72) + (crc, nodeId, nodeName) = struct.unpack("=II64s", info) nodeName = nodeName.decode().strip('\x00') dbNode = dbNodeInfo(nodeId, nodeName) info = fp.read(68) @@ -2414,11 +2428,21 @@ class dbClusterInfo(): """ fp = None try: + staticConfigFilePath = os.path.split(staticConfigFile)[0] + versionFile = os.path.join( + staticConfigFilePath, "upgrade_version") + version, number, commitid = VersionInfo.get_version_info( + versionFile) # read cluster info from static config file fp = open(staticConfigFile, "rb") - info = fp.read(32) - (crc, lenth, version, currenttime, nodeNum, - localNodeId) = struct.unpack("=qIIqiI", info) + if float(number) <= 92.200: + info = fp.read(32) + (crc, lenth, version, currenttime, nodeNum, + localNodeId) = struct.unpack("=qIIqiI", info) + else: + info = 
fp.read(28) + (crc, lenth, version, currenttime, nodeNum, + localNodeId) = struct.unpack("=IIIqiI", info) if (version <= 100): raise Exception(ErrorCode.GAUSS_516["GAUSS_51637"] % ("cluster static config version[%s]" @@ -2452,7 +2476,7 @@ class dbClusterInfo(): for i in range(nodeNum): offset = (fp.tell() // PAGE_SIZE + 1) * PAGE_SIZE fp.seek(offset) - dbNode = self.__unPackNodeInfo(fp) + dbNode = self.__unPackNodeInfo(fp, number) self.dbNodes.append(dbNode) fp.close() except Exception as e: @@ -4215,9 +4239,8 @@ class dbClusterInfo(): raise Exception(ErrorCode.GAUSS_532["GAUSS_53200"]) if peerNum > 8: - raise Exception(ErrorCode.GAUSS_512["GAUSS_51230"] % \ - ("database node standbys", "be less than 5") - + " Please set it.") + raise Exception(ErrorCode.GAUSS_512["GAUSS_51230"] % ( + "database node standbys", "be less than 9") + " Please set it.") @@ -4410,13 +4433,21 @@ class dbClusterInfo(): else: return instances - def saveToStaticConfig(self, filePath, localNodeId, dbNodes=None): + def saveToStaticConfig(self, filePath, localNodeId, dbNodes=None, + upgrade=False): """ function : Save cluster info into to static config input : String,int output : NA """ fp = None + number = None + if upgrade: + staticConfigFilePath = os.path.split(filePath)[0] + versionFile = os.path.join( + staticConfigFilePath, "upgrade_version") + version, number, commitid = VersionInfo.get_version_info( + versionFile) try: if (dbNodes is None): dbNodes = self.dbNodes @@ -4434,14 +4465,20 @@ class dbClusterInfo(): info += struct.pack("I", localNodeId) crc = binascii.crc32(info) - info = struct.pack("q", crc) + info + if upgrade: + if float(number) <= 92.200: + info = struct.pack("q", crc) + info + else: + info = struct.pack("I", crc) + info + else: + info = struct.pack("I", crc) + info fp.write(info) for dbNode in dbNodes: offset = (fp.tell() // PAGE_SIZE + 1) * PAGE_SIZE fp.seek(offset) - info = self.__packNodeInfo(dbNode) + info = self.__packNodeInfo(dbNode, number, upgrade=upgrade) fp.write(info) endBytes = PAGE_SIZE - fp.tell() % PAGE_SIZE if (endBytes != PAGE_SIZE): @@ -4457,7 +4494,7 @@ class dbClusterInfo(): "static configuration file" + " Error: \n%s" % str(e)) - def __packNodeInfo(self, dbNode): + def __packNodeInfo(self, dbNode, number, upgrade=False): """ function : Pack the info of node input : [] @@ -4493,7 +4530,13 @@ class dbClusterInfo(): info += struct.pack("I", 0) crc = binascii.crc32(info) - return struct.pack("q", crc) + info + if upgrade: + if float(number) <= 92.200: + return struct.pack("q", crc) + info + else: + return struct.pack("I", crc) + info + else: + return struct.pack("I", crc) + info def __packNodeInfoForLC(self, dbNode): """ @@ -4516,7 +4559,7 @@ class dbClusterInfo(): info += struct.pack("I", 0) crc = binascii.crc32(info) - return struct.pack("q", crc) + info + return struct.pack("I", crc) + info def __packEtcdInfo(self, dbNode): """ @@ -5936,7 +5979,7 @@ class dbClusterInfo(): # node count info += struct.pack("I", len(self.dbNodes)) crc = binascii.crc32(info) - info = struct.pack("q", crc) + info + info = struct.pack("I", crc) + info fp.write(info) primaryDnNum = 0 for dbNode in self.dbNodes: @@ -6039,7 +6082,7 @@ class dbClusterInfo(): info += struct.pack("I", 0) info += struct.pack("I", 0) crc = binascii.crc32(info) - return (primaryNum, struct.pack("q", crc) + info) + return (primaryNum, struct.pack("I", crc) + info) def __getClusterSwitchTime(self, dynamicConfigFile): """ @@ -6051,9 +6094,9 @@ class dbClusterInfo(): fp = None try: fp = open(dynamicConfigFile, "rb") - info = 
fp.read(28) + info = fp.read(24) (crc, lenth, version, switchTime, nodeNum) = \ - struct.unpack("=qIIqi", info) + struct.unpack("=IIIqi", info) fp.close() except Exception as e: if fp: @@ -6189,9 +6232,9 @@ class dbClusterInfo(): dynamicConfigFile = self.__getDynamicConfig(user) # read dynamic_config_file fp = open(dynamicConfigFile, "rb") - info = fp.read(28) + info = fp.read(24) (crc, lenth, version, currenttime, nodeNum) = \ - struct.unpack("=qIIqi", info) + struct.unpack("=IIIqi", info) totalMaterDnNum = 0 for i in range(nodeNum): offset = (fp.tell() // PAGE_SIZE + 1) * PAGE_SIZE @@ -6210,8 +6253,8 @@ class dbClusterInfo(): dynamicConfigFile + " Error:\n" + str(e)) def __unpackDynamicNodeInfo(self, fp): - info = fp.read(76) - (crc, nodeId, nodeName) = struct.unpack("=qI64s", info) + info = fp.read(72) + (crc, nodeId, nodeName) = struct.unpack("=II64s", info) nodeName = nodeName.decode().strip('\x00') dbNode = dbNodeInfo(nodeId, nodeName) info = fp.read(4) diff --git a/script/gspylib/common/OMCommand.py b/script/gspylib/common/OMCommand.py index a967fe3..c0610b7 100644 --- a/script/gspylib/common/OMCommand.py +++ b/script/gspylib/common/OMCommand.py @@ -229,6 +229,60 @@ class OMCommand(): except Exception as e: raise Exception(str(e)) + @staticmethod + def doCheckStaus(user, nodeId, cluster_normal_status=None, + expected_redistributing=""): + """ + function: Check cluster status + input : user, nodeId, cluster_normal_status, expected_redistributing + output: status, output + """ + try: + statusFile = "/home/%s/gauss_check_status_%d.dat" % ( + user, os.getpid()) + TempfileManagement.removeTempFile(statusFile) + cmd = ClusterCommand.getQueryStatusCmd(user, "", statusFile) + (status, output) = subprocess.getstatusoutput(cmd) + if status != 0: + TempfileManagement.removeTempFile(statusFile) + return (status, output) + + clusterStatus = DbClusterStatus() + clusterStatus.initFromFile(statusFile) + TempfileManagement.removeTempFile(statusFile) + except Exception as e: + DefaultValue.cleanTmpFile(statusFile) + raise Exception( + ErrorCode.GAUSS_516["GAUSS_51600"] + "Error: %s." 
% str(e)) + status = 0 + output = "" + statusRep = None + if nodeId > 0: + nodeStatus = clusterStatus.getDbNodeStatusById(nodeId) + if nodeStatus is None: + raise Exception(ErrorCode.GAUSS_516["GAUSS_51619"] % nodeId) + + status = 0 if nodeStatus.isNodeHealthy() else 1 + statusRep = nodeStatus.getNodeStatusReport() + else: + status = 0 if clusterStatus.isAllHealthy(cluster_normal_status) \ + and (clusterStatus.redistributing == + expected_redistributing or + expected_redistributing == "") else 1 + statusRep = clusterStatus.getClusterStatusReport() + output += "cluster_state : %s\n" % clusterStatus.clusterStatus + output += "redistributing : %s\n" % clusterStatus.redistributing + output += "node_count : %d\n" % statusRep.nodeCount + output += "Datanode State\n" + output += " primary : %d\n" % statusRep.dnPrimary + output += " standby : %d\n" % statusRep.dnStandby + output += " secondary : %d\n" % statusRep.dnDummy + output += " building : %d\n" % statusRep.dnBuild + output += " abnormal : %d\n" % statusRep.dnAbnormal + output += " down : %d\n" % statusRep.dnDown + + return (status, output) + @staticmethod def getClusterStatus(user, isExpandScene=False): """ diff --git a/script/gspylib/common/ParallelBaseOM.py b/script/gspylib/common/ParallelBaseOM.py index f7cba92..0a7e164 100644 --- a/script/gspylib/common/ParallelBaseOM.py +++ b/script/gspylib/common/ParallelBaseOM.py @@ -790,7 +790,7 @@ class ParallelBaseOM(object): return output.strip() - def killKernalSnapshotThread(self, coorInst): + def killKernalSnapshotThread(self, dnInst): """ function: kill snapshot thread in Kernel, avoid dead lock with redistribution) @@ -801,7 +801,7 @@ class ParallelBaseOM(object): killSnapshotSQL = "select * from kill_snapshot();" (status, output) = ClusterCommand.remoteSQLCommand( - killSnapshotSQL, self.user, coorInst.hostname, coorInst.port, + killSnapshotSQL, self.user, dnInst.hostname, dnInst.port, False, DefaultValue.DEFAULT_DB_NAME) if (status != 0): raise Exception(ErrorCode.GAUSS_514["GAUSS_51400"] % diff --git a/script/gspylib/component/Kernel/Kernel.py b/script/gspylib/component/Kernel/Kernel.py index 3fcbd23..5d45b31 100644 --- a/script/gspylib/component/Kernel/Kernel.py +++ b/script/gspylib/component/Kernel/Kernel.py @@ -67,10 +67,16 @@ class Kernel(BaseComponent): """ def start(self, time_out=DefaultValue.TIMEOUT_CLUSTER_START, - security_mode="off"): + security_mode="off", cluster_number=None): """ """ - cmd = "%s/gs_ctl start -D %s " % (self.binPath, self.instInfo.datadir) + if cluster_number: + cmd = "%s/gs_ctl start -o '-u %s' -D %s " % ( + self.binPath, int(float(cluster_number) * 1000), + self.instInfo.datadir) + else: + cmd = "%s/gs_ctl start -D %s " % ( + self.binPath, self.instInfo.datadir) if self.instInfo.instanceType == DefaultValue.MASTER_INSTANCE: if len(self.instInfo.peerInstanceInfos) > 0: cmd += "-M primary" diff --git a/script/impl/postuninstall/PostUninstallImpl.py b/script/impl/postuninstall/PostUninstallImpl.py index 0a53a99..ed613b2 100644 --- a/script/impl/postuninstall/PostUninstallImpl.py +++ b/script/impl/postuninstall/PostUninstallImpl.py @@ -32,6 +32,7 @@ from gspylib.common.ErrorCode import ErrorCode from gspylib.os.gsfile import g_file from gspylib.os.gsfile import g_Platform from gspylib.common.VersionInfo import VersionInfo +import impl.upgrade.UpgradeConst as Const sys.path.append(sys.path[0] + "/../../../lib/") DefaultValue.doConfigForParamiko() @@ -414,6 +415,10 @@ class PostUninstallImpl: g_file.removeDirectory(path) path = "%s/sctp_patch" % 
(self.clusterToolPath) g_file.removeDirectory(path) + path = "%s/%s" % (Const.UPGRADE_SQL_FILE, self.clusterToolPath) + g_file.removeFile(path) + path = "%s/%s" % (Const.UPGRADE_SQL_SHA, self.clusterToolPath) + g_file.removeFile(path) self.logger.debug( "Deleting environmental software of local nodes.") diff --git a/script/impl/upgrade/UpgradeConst.py b/script/impl/upgrade/UpgradeConst.py index c9f3ede..27862f4 100644 --- a/script/impl/upgrade/UpgradeConst.py +++ b/script/impl/upgrade/UpgradeConst.py @@ -53,6 +53,18 @@ ACTION_INPLACE_RESTORE = "inplace_restore" ACTION_CHECK_GUC = "check_guc" ACTION_BACKUP_HOTPATCH = "backup_hotpatch" ACTION_ROLLBACK_HOTPATCH = "rollback_hotpatch" +ACTION_UPGRADE_SQL_FOLDER = "prepare_upgrade_sql_folder" +ACTION_BACKUP_OLD_CLUSTER_DB_AND_REL = "backup_old_cluster_db_and_rel" +ACTION_UPDATE_CATALOG = "update_catalog" +ACTION_BACKUP_OLD_CLUSTER_CATALOG_PHYSICAL_FILES = \ + "backup_old_cluster_catalog_physical_files" +ACTION_RESTORE_OLD_CLUSTER_CATALOG_PHYSICAL_FILES = \ + "restore_old_cluster_catalog_physical_files" +ACTION_CLEAN_OLD_CLUSTER_CATALOG_PHYSICAL_FILES = \ + "clean_old_cluster_catalog_physical_files" +ACTION_REPLACE_PG_PROC_FILES = "replace_pg_proc_files" +ACTION_CREATE_PG_PROC_MAPPING_FILE = "create_pg_proc_mapping_file" +ACTION_CREATE_NEW_CSV_FILE = "create_new_csv_file" OPTION_PRECHECK = "before" OPTION_POSTCHECK = "after" @@ -61,7 +73,7 @@ GREY_UPGRADE_STEP_FILE = "upgrade_step.csv" CLUSTER_CMSCONF_FILE = "cluster_cmsconf.json" CLUSTER_CNSCONF_FILE = "cluster_cnconf.json" READONLY_MODE = "read_only_mode" - +TMP_DYNAMIC_DN_INFO = "upgrade_gauss_dn_status.dat" #step flag BINARY_UPGRADE_NO_NEED_ROLLBACK = -2 INVALID_UPRADE_STEP = -1 @@ -95,6 +107,11 @@ BACKUP_DIR_LIST = ['global', 'pg_clog', 'pg_xlog', 'pg_multixact', 'pg_replslot', 'pg_notify', 'pg_subtrans', 'pg_cbm', 'pg_twophase'] + +BACKUP_DIR_LIST_BASE = ['global', 'pg_clog', 'pg_csnlog'] +BACKUP_DIR_LIST_64BIT_XID = ['pg_multixact', 'pg_replslot', 'pg_notify', + 'pg_subtrans', 'pg_twophase'] + FIRST_GREY_UPGRADE_NUM = 92 UPGRADE_PRECOMMIT_NUM = 0.001 @@ -115,6 +132,7 @@ UPGRADE_SCHEMA = "on_upgrade_69954349032535120" RECORD_NODE_STEP = "record_node_step" READ_STEP_FROM_FILE_FLAG = "read_step_from_file_flag" RECORD_UPGRADE_DIR = "record_app_directory" +XLOG_BACKUP_INFO = "xlog_backup_info.json" OLD = "old" NEW = "new" # upgrade sql sha file and sql file @@ -124,3 +142,4 @@ UPGRADE_SQL_FILE = "upgrade_sql.tar.gz" COMBIN_NUM = 30 ON_INPLACE_UPGRADE = "IsInplaceUpgrade" MAX_APP_SIZE = 2000 +UPGRADE_VERSION_64bit_xid = 91.208 diff --git a/script/impl/upgrade/UpgradeImpl.py b/script/impl/upgrade/UpgradeImpl.py index 2dae455..2778857 100644 --- a/script/impl/upgrade/UpgradeImpl.py +++ b/script/impl/upgrade/UpgradeImpl.py @@ -22,17 +22,22 @@ import json import re import csv import traceback +import copy from datetime import datetime, timedelta -from gspylib.common.Common import DefaultValue, ClusterCommand +from gspylib.common.Common import DefaultValue, ClusterCommand, \ + ClusterInstanceConfig from gspylib.common.DbClusterInfo import instanceInfo, \ dbNodeInfo, dbClusterInfo, compareObject from gspylib.common.OMCommand import OMCommand from gspylib.common.ErrorCode import ErrorCode from gspylib.threads.SshTool import SshTool from gspylib.common.VersionInfo import VersionInfo +from gspylib.common.DbClusterStatus import DbClusterStatus from gspylib.os.gsplatform import g_Platform from gspylib.os.gsfile import g_file +from gspylib.os.gsOSlib import g_OSlib +from gspylib.inspection.common import 
SharedFuncs from impl.upgrade.UpgradeConst import GreyUpgradeStep import impl.upgrade.UpgradeConst as Const @@ -60,9 +65,12 @@ class UpgradeImpl: """ function: constructor """ + self.dnInst = None self.context = upgrade self.newCommitId = "" self.oldCommitId = "" + self.isLargeInplaceUpgrade = False + self.__upgrade_across_64bit_xid = False def exitWithRetCode(self, action, succeed=True, msg=""): """ @@ -398,9 +406,8 @@ class UpgradeImpl: elif ((float(newClusterNumber) - int(float(newClusterNumber))) > (float(oldClusterNumber) - int(float(oldClusterNumber)))): - raise Exception(ErrorCode.GAUSS_529["GAUSS_52904"] - + "This cluster version is " - "not supported upgrade.") + upgradeAction = Const.ACTION_INPLACE_UPGRADE + self.isLargeInplaceUpgrade = True else: raise Exception(ErrorCode.GAUSS_516["GAUSS_51629"] % newClusterNumber) @@ -576,7 +583,7 @@ class UpgradeImpl: input : NA output: NA """ - self.context.logger.log("Stopping the cluster.", "addStep") + self.context.logger.debug("Stopping the cluster.", "addStep") # Stop cluster applications cmd = "%s -U %s -R %s -t %s" % ( OMCommand.getLocalScript("Local_StopInstance"), @@ -587,7 +594,7 @@ class UpgradeImpl: cmd, "Stop cluster", self.context.sshTool, self.context.isSingle or self.context.localMode, self.context.mpprcFile) - self.context.logger.log("Successfully stopped cluster.") + self.context.logger.debug("Successfully stopped cluster.") def startCluster(self): """ @@ -595,10 +602,19 @@ class UpgradeImpl: input : NA output: NA """ - cmd = "%s -U %s -R %s -t %s" % ( - OMCommand.getLocalScript("Local_StartInstance"), - self.context.user, self.context.clusterInfo.appPath, - Const.UPGRADE_TIMEOUT_CLUSTER_START) + versionFile = os.path.join( + self.context.oldClusterAppPath, "bin/upgrade_version") + if os.path.exists(versionFile): + _, number, _ = VersionInfo.get_version_info(versionFile) + cmd = "%s -U %s -R %s -t %s --cluster_number=%s" % ( + OMCommand.getLocalScript("Local_StartInstance"), + self.context.user, self.context.clusterInfo.appPath, + Const.UPGRADE_TIMEOUT_CLUSTER_START, number) + else: + cmd = "%s -U %s -R %s -t %s" % ( + OMCommand.getLocalScript("Local_StartInstance"), + self.context.user, self.context.clusterInfo.appPath, + Const.UPGRADE_TIMEOUT_CLUSTER_START) DefaultValue.execCommandWithMode( cmd, "Start cluster", self.context.sshTool, self.context.isSingle or self.context.localMode, @@ -666,6 +682,10 @@ class UpgradeImpl: if (not self.context.isSingle): self.context.sshTool.scpFiles(inplace_upgrade_flag_file, self.context.upgradeBackupPath) + if float(self.context.oldClusterNumber) <= float( + Const.UPGRADE_VERSION_64bit_xid) < \ + float(self.context.newClusterNumber): + self.__upgrade_across_64bit_xid = True self.context.logger.debug("Successfully created inplace" " upgrade flag file.") @@ -732,8 +752,8 @@ class UpgradeImpl: output : NA """ self.context.logger.debug("Set upgrade_mode guc parameter.") - cmd = "gs_guc %s -Z coordinator -Z datanode -N all " \ - "-I all -c 'upgrade_mode=%d'" % (setType, mode) + cmd = "gs_guc %s -N all -I all -c 'upgrade_mode=%d'" % ( + setType, mode) self.context.logger.debug("Command for setting database" " node parameter: %s." 
% cmd) (status, output) = subprocess.getstatusoutput(cmd) @@ -818,6 +838,18 @@ class UpgradeImpl: return True return False + def reloadVacuumDeferCleanupAge(self): + """ + function: reload the guc paramter vacuum_defer_cleanup_age value on + inplace upgrade or grey large upgrade + input : NA + """ + (status, output) = self.setGUCValue("vacuum_defer_cleanup_age", + "100000", "reload") + if status != 0: + raise Exception(ErrorCode.GAUSS_500["GAUSS_50007"] % "GUC" + + " Error: \n%s" % str(output)) + def doInplaceBinaryUpgrade(self): """ function: do binary upgrade, which essentially replace the binary files @@ -849,6 +881,12 @@ class UpgradeImpl: % "cluster" + output) # 4.record the old and new app dir in file self.recordDirFile() + if self.isLargeInplaceUpgrade: + self.recordLogicalClusterName() + # 6. reload vacuum_defer_cleanup_age to new value + if self.isLargeInplaceUpgrade: + if self.__upgrade_across_64bit_xid: + self.reloadVacuumDeferCleanupAge() if self.setClusterReadOnlyMode() != 0: raise Exception(ErrorCode.GAUSS_529["GAUSS_52908"]) @@ -861,6 +899,12 @@ class UpgradeImpl: # to ensure the transaction atomicity, # it will be used with checkUpgrade(). self.backupNodeVersion() + # For inplace upgrade, we have to perform additional checks + # and then backup catalog files. + if self.isLargeInplaceUpgrade: + self.prepareUpgradeSqlFolder() + self.HASyncReplayCheck() + self.backupOldClusterDBAndRelInfo() # 8. stop old cluster self.recordNodeStepInplace(Const.ACTION_INPLACE_UPGRADE, Const.BINARY_UPGRADE_STEP_STOP_NODE) @@ -903,6 +947,12 @@ class UpgradeImpl: # At the same time, sync newly added guc for instances self.restoreClusterConfig() self.syncNewGUC() + # unset cluster readonly + self.startCluster() + if self.unSetClusterReadOnlyMode() != 0: + raise Exception("NOTICE: " + + ErrorCode.GAUSS_529["GAUSS_52907"]) + self.stopCluster() # 12. modify GUC parameter unix_socket_directory self.modifySocketDir() # 13. start new cluster @@ -913,12 +963,21 @@ class UpgradeImpl: # update catalog # start cluster in normal mode + if self.isLargeInplaceUpgrade: + self.touchRollbackCatalogFlag() + self.updateCatalog() self.CopyCerts() self.context.createGrpcCa() self.context.logger.debug("Successfully createGrpcCa.") - self.switchBin(Const.NEW) + self.switchBin(Const.NEW) self.startCluster() + if self.isLargeInplaceUpgrade: + self.modifyPgProcIndex() + self.context.logger.debug("Start to exec post upgrade script") + self.doUpgradeCatalog(postUpgrade=True) + self.context.logger.debug( + "Successfully exec post upgrade script") self.context.logger.debug("Successfully start all " "instances on the node.", "constant") # 14. check the cluster status @@ -964,16 +1023,28 @@ class UpgradeImpl: # and cleanup list file for re-entry cleanUpSuccess = True + # drop table and index after large upgrade + if self.isLargeInplaceUpgrade: + if self.check_upgrade_mode(): + self.drop_table_or_index() # 1.unset read-only + if self.isLargeInplaceUpgrade: + self.setUpgradeMode(0) if self.unSetClusterReadOnlyMode() != 0: self.context.logger.log("NOTICE: " + ErrorCode.GAUSS_529["GAUSS_52907"]) cleanUpSuccess = False - + if self.isLargeInplaceUpgrade: + self.cleanCsvFile() # 2. drop old PMK schema # we sleep 10 seconds first because DB might be updating # ha status after unsetting read-only time.sleep(10) + # 3. clean backup catalog physical files if doing inplace upgrade + if self.cleanBackupedCatalogPhysicalFiles() != 0: + self.context.logger.debug( + "Failed to clean backup files in directory %s. 
" + % self.context.upgradeBackupPath) if not cleanUpSuccess: self.context.logger.log("NOTICE: Cleanup is incomplete during" @@ -985,10 +1056,1001 @@ class UpgradeImpl: # and uninstall inplace upgrade support functions self.cleanInstallPath(Const.OLD) self.cleanBinaryUpgradeBakFiles() + if self.isLargeInplaceUpgrade: + self.stopCluster() + self.startCluster() self.context.logger.log("Commit binary upgrade succeeded.") self.exitWithRetCode(Const.ACTION_INPLACE_UPGRADE, True) + def cleanCsvFile(self): + """ + clean csv file + :return: + """ + clusterNodes = self.context.clusterInfo.dbNodes + for dbNode in clusterNodes: + if len(dbNode.datanodes) == 0: + continue + dnInst = dbNode.datanodes[0] + dndir = dnInst.datadir + pg_proc_csv_path = \ + '%s/pg_copydir/tbl_pg_proc_oids.csv' % dndir + new_pg_proc_csv_path = \ + '%s/pg_copydir/new_tbl_pg_proc_oids.csv' % dndir + if os.path.exists(pg_proc_csv_path): + g_file.removeFile(pg_proc_csv_path) + if os.path.exists(new_pg_proc_csv_path): + g_file.removeFile(new_pg_proc_csv_path) + + def check_upgrade_mode(self): + """ + check upgrade_mode value + :return: + """ + cmd = "source %s ; gs_guc check -N all -I all -c 'upgrade_mode'" % \ + self.context.userProfile + (status, output) = subprocess.getstatusoutput(cmd) + if status != 0: + raise Exception(ErrorCode.GAUSS_500[ + "GAUSS_50010"] % 'upgrade_mode' + + "Error: \n%s" % str(output)) + if output.find("upgrade_mode=0") >= 0: + return False + else: + return True + + def cleanBackupedCatalogPhysicalFiles(self, isRollBack=False): + """ + function : clean backuped catalog physical files + input : isRollBack, default is False + output: return 0, if the operation is done successfully. + return 1, if the operation failed. + """ + try: + if self.isLargeInplaceUpgrade: + self.context.logger.log("Clean up backup catalog files.") + # send cmd to all node and exec + cmd = "%s -t %s -U %s --upgrade_bak_path=%s -l %s" % \ + (OMCommand.getLocalScript("Local_Upgrade_Utility"), + Const.ACTION_CLEAN_OLD_CLUSTER_CATALOG_PHYSICAL_FILES, + self.context.user, + self.context.upgradeBackupPath, + self.context.localLog) + if isRollBack: + cmd += " --rollback --oldcluster_num='%s'" % \ + self.context.oldClusterNumber + self.context.logger.debug( + "Command for cleaning up physical catalog files: %s." % cmd) + DefaultValue.execCommandWithMode( + cmd, + "clean backuped physical files of catalog objects", + self.context.sshTool, + self.context.isSingle, + self.context.userProfile) + self.context.logger.debug( + "Successfully cleaned up backup catalog files.") + return 0 + except Exception as e: + if isRollBack: + raise Exception( + "Fail to clean up backup catalog files: %s" % str(e)) + else: + self.context.logger.debug( + "Fail to clean up backup catalog files. 
" + + "Please re-commit upgrade once again or clean up manually.") + return 1 + + def recordLogicalClusterName(self): + """ + function: record the logical node group name in bakpath, + so that we can restore specfic name in bakpath, + used in restoreCgroup, and refresh the CgroupConfigure + input : NA + output: NA + """ + lcgroupfile = "%s/oldclusterinfo.json" % self.context.tmpDir + try: + self.context.logger.debug( + "Write and send logical cluster info file.") + # check whether file is exists + if os.path.isfile(lcgroupfile): + return 0 + # check whether it is lc cluster + sql = """SELECT true AS group_kind + FROM pg_class c, pg_namespace n, pg_attribute attr + WHERE c.relname = 'pgxc_group' AND n.nspname = 'pg_catalog' + AND attr.attname = 'group_kind' AND c.relnamespace = + n.oid AND attr.attrelid = c.oid; """ + self.context.logger.debug( + "Check if the cluster type is a logical cluster.") + (status, output) = ClusterCommand.remoteSQLCommand( + sql, + self.context.user, + self.dnInst.hostname, + self.dnInst.port, + False, + DefaultValue.DEFAULT_DB_NAME, + IsInplaceUpgrade=True) + if status != 0: + raise Exception(ErrorCode.GAUSS_513[ + "GAUSS_51300"] % sql + " Error: \n%s" % str( + output)) + if not output or output.strip() != 't': + self.context.logger.debug( + "The old cluster is not logical cluster.") + return 0 + self.context.logger.debug("The old cluster is logical cluster.") + # get lc group name lists + sql = "SELECT group_name FROM pgxc_group WHERE group_kind = 'v';" + self.context.logger.debug( + "Getting the list of logical cluster names.") + (status, output) = ClusterCommand.remoteSQLCommand( + sql, + self.context.user, + self.dnInst.hostname, + self.dnInst.port, + False, + DefaultValue.DEFAULT_DB_NAME, + IsInplaceUpgrade=True) + if status != 0: + raise Exception(ErrorCode.GAUSS_513[ + "GAUSS_51300"] % sql + " Error: \n%s" % str( + output)) + lcgroupnames = output.split("\n") + self.context.logger.debug( + "The list of logical cluster names: %s." 
% lcgroupnames) + # create the file + g_file.createFile(lcgroupfile) + g_file.changeOwner(self.context.user, lcgroupfile) + g_file.changeMode(DefaultValue.KEY_FILE_MODE, lcgroupfile) + # write result to file + with open(lcgroupfile, "w") as fp_json: + json.dump({"lcgroupnamelist": lcgroupnames}, fp_json) + # send file to remote nodes + self.context.sshTool.scpFiles(lcgroupfile, self.context.tmpDir) + self.context.logger.debug( + "Successfully to write and send logical cluster info file.") + return 0 + except Exception as e: + cmd = "(if [ -f '%s' ]; then rm -f '%s'; fi)" % ( + lcgroupfile, lcgroupfile) + DefaultValue.execCommandWithMode(cmd, + "clean lcgroup name list file", + self.context.sshTool, + self.context.isSingle, + self.context.userProfile) + raise Exception(str(e)) + + def prepareUpgradeSqlFolder(self): + """ + function: verify upgrade_sql.tar.gz and extract it to binary backup + path, because all node need set_guc, so + we will decompress on all nodes + input : NA + output: NA + """ + self.context.logger.debug("Preparing upgrade sql folder.") + if self.context.action == Const.ACTION_INPLACE_UPGRADE: + hostName = DefaultValue.GetHostIpOrName() + hosts = [hostName] + else: + hosts = self.context.clusterNodes + cmd = "%s -t %s -U %s --upgrade_bak_path=%s -X %s -l %s" % \ + (OMCommand.getLocalScript("Local_Upgrade_Utility"), + Const.ACTION_UPGRADE_SQL_FOLDER, + self.context.user, + self.context.upgradeBackupPath, + self.context.xmlFile, + self.context.localLog) + DefaultValue.execCommandWithMode(cmd, + "prepare upgrade_sql", + self.context.sshTool, + self.context.isSingle, + self.context.userProfile, + hosts) + + def HASyncReplayCheck(self): + """ + function: Wait and check if all standbys have replayed upto flushed + xlog positions of primaries.We record primary xlog flush + position at start of the check and wait until standby replay + upto that point. + Attention: If autovacuum is turned on, primary xlog flush + position may increase during the check.We do not check such + newly added xlog because they will not change catalog + physical file position. + Input: NA + output : NA + """ + self.context.logger.debug("Start to wait and check if all the standby" + " instances have replayed all xlogs.") + self.doReplay() + self.context.logger.debug("Successfully performed the replay check " + "of the standby instance.") + + def doReplay(self): + refreshTimeout = 180 + waitTimeout = 300 + RefreshTime = datetime.now() + timedelta(seconds=refreshTimeout) + EndTime = datetime.now() + timedelta(seconds=waitTimeout) + # wait and check sync status between primary and standby + + NeedReplay = True + PosList = [] + while NeedReplay: + sql = "SELECT sender_flush_location,receiver_replay_location " \ + "from pg_catalog.pg_stat_get_wal_senders() " \ + "where peer_role != 'Secondary';" + (status, output) = ClusterCommand.remoteSQLCommand( + sql, + self.context.user, + self.dnInst.hostname, + self.dnInst.port, + False, + DefaultValue.DEFAULT_DB_NAME, + IsInplaceUpgrade=True) + if status != 0: + self.context.logger.debug( + "Primary and Standby may be not in sync.") + self.context.logger.debug( + "Sync status: %s. Output: %s" % (str(status), output)) + elif output != "": + self.context.logger.debug( + "Sync status: %s. 
Output: %s" % (str(status), output)) + tmpPosList = self.getXlogPosition(output) + if len(PosList) == 0: + PosList = copy.deepcopy(tmpPosList) + self.context.logger.debug( + "Primary and Standby may be not in sync.") + else: + NeedReplay = False + for eachRec in PosList: + for eachTmpRec in tmpPosList: + if self.needReplay(eachRec, eachTmpRec): + NeedReplay = True + self.context.logger.debug( + "Primary and Standby may be not in sync.") + break + if NeedReplay: + break + else: + NeedReplay = False + + # Standby replay postion may keep falling behind primary + # flush position if it is at the end of one xlog page and the + # free space is less than xlog record header size. + # We do a checkpoint to avoid such situation. + if datetime.now() > RefreshTime and NeedReplay: + self.context.logger.debug( + "Execute CHECKPOINT to refresh xlog position.") + refreshsql = "set statement_timeout=300000;CHECKPOINT;" + (status, output) = ClusterCommand.remoteSQLCommand( + refreshsql, + self.context.user, + self.dnInst.hostname, + self.dnInst.port, + False, + DefaultValue.DEFAULT_DB_NAME, + IsInplaceUpgrade=True) + if status != 0: + raise Exception( + ErrorCode.GAUSS_513["GAUSS_51300"] % refreshsql + + "Error: \n%s" % str(output)) + + if datetime.now() > EndTime and NeedReplay: + self.context.logger.log("WARNING: " + ErrorCode.GAUSS_513[ + "GAUSS_51300"] % sql + " Timeout while waiting for " + "standby replay.") + return + time.sleep(5) + + def getXlogPosition(self, output): + """ + get xlog position from output + """ + tmpPosList = [] + resList = output.split('\n') + for eachLine in resList: + tmpRec = {} + (flushPos, replayPos) = eachLine.split('|') + (flushPosId, flushPosOff) = (flushPos.strip()).split('/') + (replayPosId, replayPosOff) = (replayPos.strip()).split('/') + tmpRec['nodeName'] = self.getHAShardingName() + tmpRec['flushPosId'] = flushPosId.strip() + tmpRec['flushPosOff'] = flushPosOff.strip() + tmpRec['replayPosId'] = replayPosId.strip() + tmpRec['replayPosOff'] = replayPosOff.strip() + tmpPosList.append(tmpRec) + return tmpPosList + + def getHAShardingName(self): + """ + in centralized cluster, used to get the only one sharding name + """ + peerInsts = self.context.clusterInfo.getPeerInstance(self.dnInst) + (instance_name, _, _) = ClusterInstanceConfig.\ + getInstanceInfoForSinglePrimaryMultiStandbyCluster( + self.dnInst, peerInsts) + return instance_name + + def needReplay(self, eachRec, eachTmpRec): + """ + judeg if need replay by xlog position + """ + if eachRec['nodeName'] == eachTmpRec['nodeName'] \ + and (int(eachRec['flushPosId'], 16) > int( + eachTmpRec['replayPosId'], 16) or ( + int(eachRec['flushPosId'], 16) == int( + eachTmpRec['replayPosId'], 16) and int( + eachRec['flushPosOff'], 16) > int(eachTmpRec['replayPosOff'], 16))): + return True + else: + return False + + def backupOldClusterDBAndRelInfo(self): + + """ + function: backup old cluster db and rel info + send cmd to that node + input : NA + output: NA + """ + tmpFile = os.path.join(DefaultValue.getTmpDirFromEnv( + self.context.user), Const.TMP_DYNAMIC_DN_INFO) + try: + self.context.logger.debug("Start to backup old cluster database" + " and relation information.") + # prepare backup path + backup_path = os.path.join( + self.context.upgradeBackupPath, "oldClusterDBAndRel") + cmd = "rm -rf '%s' && mkdir '%s' -m '%s' " % \ + (backup_path, backup_path, DefaultValue.KEY_DIRECTORY_MODE) + hostList = copy.deepcopy(self.context.clusterNodes) + self.context.sshTool.executeCommand(cmd, "", hostList=hostList) + # prepare dynamic 
cluster info file in every node + self.generateDynamicInfoFile(tmpFile) + # get dn primary hosts + dnPrimaryNodes = self.getPrimaryDnListFromDynamicFile() + execHosts = list(set(dnPrimaryNodes)) + + # send cmd to all node and exec + cmd = "%s -t %s -U %s --upgrade_bak_path=%s -l %s" % \ + (OMCommand.getLocalScript("Local_Upgrade_Utility"), + Const.ACTION_BACKUP_OLD_CLUSTER_DB_AND_REL, + self.context.user, + self.context.upgradeBackupPath, + self.context.localLog) + self.context.logger.debug( + "Command for backing up old cluster database and " + "relation information: %s." % cmd) + self.context.sshTool.executeCommand(cmd, "", hostList=execHosts) + self.context.logger.debug("Backing up information of all nodes.") + self.context.logger.debug("Successfully backed up old cluster " + "database and relation information") + except Exception as e: + raise Exception(str(e)) + finally: + if os.path.exists(tmpFile): + deleteCmd = "(if [ -f '%s' ]; then rm -f '%s'; fi) " % \ + (tmpFile, tmpFile) + hostList = copy.deepcopy(self.context.clusterNodes) + self.context.sshTool.executeCommand( + deleteCmd, "", hostList=hostList) + + def generateDynamicInfoFile(self, tmpFile): + """ + generate dynamic info file and send to every node + :return: + """ + self.context.logger.debug( + "Start to generate dynamic info file and send to every node.") + try: + cmd = ClusterCommand.getQueryStatusCmd( + self.context.user, outFile=tmpFile) + SharedFuncs.runShellCmd(cmd, self.context.user, + self.context.userProfile) + if not os.path.exists(tmpFile): + raise Exception("Can not genetate dynamic info file") + self.context.distributeFileToSpecialNode(tmpFile, + os.path.dirname(tmpFile), + self.context.clusterNodes) + self.context.logger.debug( + "Success to generate dynamic info file and send to every node.") + except Exception as er: + raise Exception("Failed to generate dynamic info file in " + "these nodes: {0}, error: {1}".format( + self.context.clusterNodes, str(er))) + + def getPrimaryDnListFromDynamicFile(self): + """ + get primary dn list from dynamic file + :return: primary dn list + """ + try: + self.context.logger.debug( + "Start to get primary dn list from dynamic file.") + tmpFile = os.path.join(DefaultValue.getTmpDirFromEnv( + self.context.user), Const.TMP_DYNAMIC_DN_INFO) + if not os.path.exists(tmpFile): + raise Exception(ErrorCode.GAUSS_529["GAUSS_50201"] % tmpFile) + dynamicClusterStatus = DbClusterStatus() + dynamicClusterStatus.initFromFile(tmpFile) + cnAndPrimaryDnNodes = [] + # Find the master DN instance + for dbNode in dynamicClusterStatus.dbNodes: + for instance in dbNode.datanodes: + if instance.status == 'Primary': + for staticDBNode in self.context.clusterInfo.dbNodes: + if staticDBNode.id == instance.nodeId: + cnAndPrimaryDnNodes.append(staticDBNode.name) + result = list(set(cnAndPrimaryDnNodes)) + self.context.logger.debug("Success to get primary dn list from " + "dynamic file: {0}.".format(result)) + return result + except Exception as er: + raise Exception("Failed to get primary dn list from dynamic file. " + "Error:{0}".format(str(er))) + + + def touchRollbackCatalogFlag(self): + """ + before update system catalog, touch a flag file. + """ + # touch init flag file + # during rollback, if init flag file has not been touched, + # we do not need to do catalog rollback. 
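# A minimal sketch (names mirror the rollback hunk later in this patch) of
# how this flag is consumed: the rollback path only replays the catalog
# rollback scripts when the flag file exists, otherwise it just clears
# upgrade_mode.
#
#   init_flag = os.path.join(self.context.upgradeBackupPath,
#                            "touch_init_flag")
#   if os.path.exists(init_flag):
#       self.rollbackCatalog()   # catalog scripts ran; undo them
#   else:
#       self.setUpgradeMode(0)   # catalog untouched; nothing to roll back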
+ cmd = "touch '%s/touch_init_flag'" % self.context.upgradeBackupPath + DefaultValue.execCommandWithMode(cmd, + "create init flag file", + self.context.sshTool, + self.context.isSingle, + self.context.userProfile) + + def updateCatalog(self): + """ + function: update catalog to new version + steps: + 1.prepare update sql file and check sql file + 2.do update catalog + Input: NA + output : NA + """ + try: + self.prepareSql("upgrade-post") + self.prepareSql("upgrade") + self.prepareSql("rollback-post") + self.prepareSql("rollback") + self.doUpgradeCatalog() + except Exception as e: + raise Exception( + "Failed to execute update sql file. Error: %s" % str(e)) + + def doUpgradeCatalog(self, postUpgrade=False): + """ + function: update catalog to new version + 1.set upgrade_from param + 2.start cluster + 3.touch init files and do pre-upgrade staffs + 4.connect database and update catalog one by one + 5.stop cluster + 6.unset upgrade_from param + 7.start cluster + Input: oldClusterNumber + output : NA + """ + try: + if self.context.action == Const.ACTION_INPLACE_UPGRADE: + if not postUpgrade: + self.startCluster() + self.setUpgradeMode(1) + self.touchInitFile() + elif not postUpgrade: + # the guc parameter upgrade_from need to restart + # cmagent to take effect + self.setUpgradeMode(2) + # kill snapshot thread in kernel + self.context.killKernalSnapshotThread(self.dnInst) + # if we use --force to forceRollback last time, + # it may has remaining last catalog + if postUpgrade: + self.execRollbackUpgradedCatalog(scriptType="rollback-post") + self.execRollbackUpgradedCatalog(scriptType="upgrade-post") + else: + self.execRollbackUpgradedCatalog(scriptType="rollback") + self.execRollbackUpgradedCatalog(scriptType="upgrade") + self.pgxcNodeUpdateLocalhost("upgrade") + + if self.context.action == \ + Const.ACTION_INPLACE_UPGRADE and not postUpgrade: + self.updatePgproc() + except Exception as e: + raise Exception("update catalog failed.ERROR: %s" % str(e)) + + def updatePgproc(self): + """ + function: update pg_proc during large upgrade + :return: + """ + self.context.logger.debug( + "Start to update pg_proc in inplace large upgrade ") + # generate new csv file + execHosts = [self.dnInst.hostname] + # send cmd to all node and exec + cmd = "%s -t %s -U %s -R '%s' -l %s" % ( + OMCommand.getLocalScript("Local_Upgrade_Utility"), + Const.ACTION_CREATE_NEW_CSV_FILE, + self.context.user, + self.context.tmpDir, + self.context.localLog) + self.context.logger.debug( + "Command for create new csv file: %s." 
% cmd) + self.context.sshTool.executeCommand(cmd, "", hostList=execHosts) + self.context.logger.debug( + "Successfully created new csv file.") + # select all databases + database_list = self.getDatabaseList() + # create pg_proc_temp_oids + new_pg_proc_csv_path = '%s/pg_copydir/new_tbl_pg_proc_oids.csv' % \ + self.dnInst.datadir + self.createPgprocTempOids(new_pg_proc_csv_path, database_list) + # create pg_proc_temp_oids index + self.createPgprocTempOidsIndex(database_list) + # make checkpoint + self.replyXlog(database_list) + # create pg_proc_mapping.txt to save the mapping between pg_proc + # file path and pg_proc_temp_oids file path + cmd = "%s -t %s -U %s -R '%s' -l %s" % ( + OMCommand.getLocalScript("Local_Upgrade_Utility"), + Const.ACTION_CREATE_PG_PROC_MAPPING_FILE, + self.context.user, + self.context.tmpDir, + self.context.localLog) + DefaultValue.execCommandWithMode( + cmd, + "create file to save mapping between pg_proc file path and " + "pg_proc_temp_oids file path", + self.context.sshTool, + self.context.isSingle, + self.context.userProfile) + self.context.logger.debug( + "Successfully created file to save mapping between pg_proc file " + "path and pg_proc_temp_oids file path.") + # stop cluster + self.stopCluster() + # replace pg_proc data file by pg_proc_temp data file + # send cmd to all node and exec + cmd = "%s -t %s -U %s -R '%s' -l %s" % ( + OMCommand.getLocalScript("Local_Upgrade_Utility"), + Const.ACTION_REPLACE_PG_PROC_FILES, + self.context.user, + self.context.tmpDir, + self.context.localLog) + DefaultValue.execCommandWithMode( + cmd, + "replace pg_proc data file by pg_proc_temp data files", + self.context.sshTool, + self.context.isSingle, + self.context.userProfile) + self.context.logger.debug( + "Successfully replaced pg_proc data files.") + + def copy_and_modify_tableinfo_to_csv(self, old_csv_path, new_csv_path): + """ + 1. copy pg_proc info to csv file + 2. modify csv file + 3. 
create new table and get info by csv file + :return: + """ + sql =\ + """copy pg_proc( proname, pronamespace, proowner, prolang, + procost, prorows, provariadic, protransform, prosecdef, + proleakproof, proisstrict, proretset, provolatile, pronargs, + pronargdefaults, prorettype, proargtypes, proallargtypes, + proargmodes, proargnames, proargdefaults, prosrc, probin, + proconfig, proacl, prodefaultargpos, fencedmode, proshippable, + propackage,prokind) WITH OIDS to '%s' delimiter ',' + csv header;""" % old_csv_path + (status, output) = ClusterCommand.remoteSQLCommand( + sql, self.context.user, + self.dnInst.hostname, self.dnInst.port, False, + DefaultValue.DEFAULT_DB_NAME, IsInplaceUpgrade=True) + if status != 0: + raise Exception(ErrorCode.GAUSS_513["GAUSS_51300"] % sql + + " Error: \n%s" % str(output)) + pg_proc_csv_reader = csv.reader(open(old_csv_path, 'r')) + pg_proc_csv_data = list(pg_proc_csv_reader) + header = pg_proc_csv_data[0] + header.insert(header.index('protransform') + 1, 'proisagg') + header.insert(header.index('protransform') + 2, 'proiswindow') + new_pg_proc_csv_data = [] + new_pg_proc_csv_data.append(header) + pg_proc_data_info = pg_proc_csv_data[1:] + for i in range(2): + for info in pg_proc_data_info: + info.insert(header.index('protransform') + 2, 'True') + for info in pg_proc_data_info: + new_pg_proc_csv_data.append(info) + f = open(new_csv_path, 'w') + new_pg_proc_csv_writer = csv.writer(f) + for info in new_pg_proc_csv_data: + new_pg_proc_csv_writer.writerow(info) + f.close() + + def createPgprocTempOids(self, new_pg_proc_csv_path, database_list): + """ + create pg_proc_temp_oids + :return: + """ + sql = \ + """START TRANSACTION; SET IsInplaceUpgrade = on; + CREATE TABLE pg_proc_temp_oids (proname name NOT NULL, + pronamespace oid NOT NULL, proowner oid NOT NULL, prolang oid + NOT NULL, procost real NOT NULL, prorows real NOT NULL, + provariadic oid NOT NULL, protransform regproc NOT NULL, + proisagg boolean NOT NULL, proiswindow boolean NOT NULL, + prosecdef boolean NOT NULL, proleakproof boolean NOT NULL, + proisstrict boolean NOT NULL, proretset boolean NOT NULL, + provolatile "char" NOT NULL, pronargs smallint NOT NULL, + pronargdefaults smallint NOT NULL, prorettype oid NOT NULL, + proargtypes oidvector NOT NULL, proallargtypes oid[], + proargmodes "char"[], proargnames text[], proargdefaults + pg_node_tree, prosrc text, probin text, proconfig text[], + proacl aclitem[], prodefaultargpos int2vector,fencedmode boolean, + proshippable boolean, propackage boolean, prokind "char" NOT + NULL) with oids;""" + sql += "copy pg_proc_temp_oids WITH OIDS from '%s' with " \ + "delimiter ',' csv header FORCE NOT NULL proargtypes;" % \ + new_pg_proc_csv_path + sql += "COMMIT;" + # update proisagg and proiswindow message sql + sql += \ + "update pg_proc_temp_oids set proisagg = CASE WHEN prokind = 'a' " \ + "THEN True ELSE False END, proiswindow = CASE WHEN prokind = 'w' " \ + "THEN True ELSE False END;" + self.context.logger.debug("pg_proc_temp_oids sql is %s" % sql) + # creat table + for eachdb in database_list: + (status, output) = ClusterCommand.remoteSQLCommand( + sql, self.context.user, + self.dnInst.hostname, self.dnInst.port, False, + eachdb, IsInplaceUpgrade=True) + if status != 0: + raise Exception(ErrorCode.GAUSS_513["GAUSS_51300"] % sql + + " Error: \n%s" % str(output)) + + def createPgprocTempOidsIndex(self, database_list): + """ + create index pg_proc_oid_index_temp and + pg_proc_proname_args_nsp_index_temp + :return: + """ + sql = "CREATE UNIQUE INDEX 
pg_proc_oid_index_temp ON " \ + "pg_proc_temp_oids USING btree (oid) TABLESPACE pg_default;" + sql += "CREATE UNIQUE INDEX pg_proc_proname_args_nsp_index_temp ON" \ + " pg_proc_temp_oids USING btree (proname, proargtypes," \ + " pronamespace) TABLESPACE pg_default;" + # creat index + for eachdb in database_list: + (status, output) = ClusterCommand.remoteSQLCommand( + sql, self.context.user, + self.dnInst.hostname, self.dnInst.port, False, + eachdb, IsInplaceUpgrade=True) + if status != 0: + raise Exception(ErrorCode.GAUSS_513["GAUSS_51300"] % sql + + " Error: \n%s" % str(output)) + + def getDatabaseList(self): + """ + check database list in cluster + :return: + """ + self.context.logger.debug("Get database list in cluster.") + sql = "select datname from pg_database;" + (status, output) = ClusterCommand.remoteSQLCommand( + sql, self.context.user, + self.dnInst.hostname, self.dnInst.port, False, + DefaultValue.DEFAULT_DB_NAME, IsInplaceUpgrade=True) + if status != 0: + raise Exception(ErrorCode.GAUSS_513["GAUSS_51300"] % sql + + " Error: \n%s" % str(output)) + if "" == output: + raise Exception("No database objects were found in the cluster!") + reslines = (output.strip()).split('\n') + if (len(reslines) < 3 + or "template1" not in reslines + or "template0" not in reslines + or "postgres" not in reslines): + raise Exception("The database list is invalid:%s." % str(reslines)) + self.context.logger.debug("Database list in cluster is %s." % reslines) + return reslines + + def replyXlog(self, database_list): + """ + make checkpoint + :return: + """ + sql = 'CHECKPOINT;' + for eachdb in database_list: + (status, output) = ClusterCommand.remoteSQLCommand( + sql, self.context.user, + self.dnInst.hostname, self.dnInst.port, False, + eachdb, IsInplaceUpgrade=True) + if status != 0: + raise Exception(ErrorCode.GAUSS_513["GAUSS_51300"] % sql + + " Error: \n%s" % str(output)) + + def execRollbackUpgradedCatalog(self, scriptType="rollback"): + """ + function : connect database and rollback/upgrade catalog one by one + 1.find a node that has dn instance + 2.scp sql files to that node + 3.send cmd to that node and exec + input : NA + output: NA + """ + self.context.logger.debug("Start to {0} catalog.".format(scriptType)) + try: + dnNodeName = self.dnInst.hostname + if dnNodeName == "": + raise Exception(ErrorCode.GAUSS_526["GAUSS_52602"]) + self.context.logger.debug("dn nodes is {0}".format(dnNodeName)) + # scp sql files to that node + maindb_sql = "%s/%s_catalog_maindb_tmp.sql" \ + % (self.context.upgradeBackupPath, scriptType) + otherdb_sql = "%s/%s_catalog_otherdb_tmp.sql" \ + % (self.context.upgradeBackupPath, scriptType) + if "upgrade" == scriptType: + check_upgrade_sql = \ + "%s/check_upgrade_tmp.sql" % self.context.upgradeBackupPath + if not os.path.isfile(check_upgrade_sql): + raise Exception( + ErrorCode.GAUSS_502["GAUSS_50210"] % check_upgrade_sql) + self.context.logger.debug("Scp {0} file to nodes {1}".format( + check_upgrade_sql, dnNodeName)) + g_OSlib.scpFile(dnNodeName, check_upgrade_sql, + self.context.upgradeBackupPath) + if not os.path.isfile(maindb_sql): + raise Exception(ErrorCode.GAUSS_502["GAUSS_50210"] % maindb_sql) + if not os.path.isfile(otherdb_sql): + raise Exception( + ErrorCode.GAUSS_502["GAUSS_50210"] % otherdb_sql) + g_OSlib.scpFile(dnNodeName, maindb_sql, + self.context.upgradeBackupPath) + g_OSlib.scpFile(dnNodeName, otherdb_sql, + self.context.upgradeBackupPath) + self.context.logger.debug( + "Scp {0} file and {1} file to nodes {2}".format( + maindb_sql, otherdb_sql, 
dnNodeName)) + # send cmd to that node and exec + cmd = "%s -t %s -U %s --upgrade_bak_path=%s --script_type=%s -l " \ + "%s" % (OMCommand.getLocalScript("Local_Upgrade_Utility"), + Const.ACTION_UPDATE_CATALOG, + self.context.user, + self.context.upgradeBackupPath, + scriptType, + self.context.localLog) + self.context.logger.debug( + "Command for executing {0} catalog.".format(scriptType)) + DefaultValue.execCommandWithMode(cmd, + "{0} catalog".format(scriptType), + self.context.sshTool, + self.context.isSingle, + self.context.userProfile, + [dnNodeName]) + self.context.logger.debug( + "Successfully {0} catalog.".format(scriptType)) + except Exception as e: + self.context.logger.log("Failed to {0} catalog.".format(scriptType)) + if not self.context.forceRollback: + raise Exception(str(e)) + + def pgxcNodeUpdateLocalhost(self, mode): + """ + This function is used to modify the localhost of the system table + which pgxc_node + :param mode: + :return: + """ + try: + if int(float(self.context.newClusterNumber) * 1000) < 92069 or \ + int(float(self.context.oldClusterNumber) * 1000) >= 92069: + return + if mode == "upgrade": + self.context.logger.debug("Update localhost in pgxc_node.") + else: + self.context.logger.debug("Rollback localhost in pgxc_node.") + for dbNode in self.context.clusterInfo.dbNodes: + for dn in dbNode.datanodes: + sql = "START TRANSACTION;" + sql += "SET %s = on;" % Const.ON_INPLACE_UPGRADE + if mode == "upgrade": + sql += "UPDATE PGXC_NODE SET node_host = '%s', " \ + "node_host1 = '%s' WHERE node_host = " \ + "'localhost'; " % (dn.listenIps[0], + dn.listenIps[0]) + else: + sql += "UPDATE PGXC_NODE SET node_host = " \ + "'localhost', node_host1 = 'localhost' WHERE" \ + " node_type = 'C' and node_host = '%s';" %\ + (dn.listenIps[0]) + sql += "COMMIT;" + self.context.logger.debug("Current sql %s." % sql) + (status, output) = ClusterCommand.remoteSQLCommand( + sql, self.context.user, dn.hostname, dn.port, + False, DefaultValue.DEFAULT_DB_NAME, + IsInplaceUpgrade=True) + if status != 0: + if self.context.forceRollback: + self.context.logger.debug("In forceRollback, " + "roll back pgxc_node. 
" + "%s " % str(output)) + else: + raise Exception(ErrorCode.GAUSS_513["GAUSS_51300"] + % sql + " Error: \n%s" % + str(output)) + if mode == "upgrade": + self.context.logger.debug( + "Success update localhost in pgxc_node.") + else: + self.context.logger.debug( + "Success rollback localhost in pgxc_node.") + except Exception as e: + raise Exception(str(e)) + + def touchInitFile(self): + """ + function: touch upgrade init file for every primary/standby and + do pre-upgrade staffs + input : NA + output: NA + """ + try: + if self.isLargeInplaceUpgrade: + self.context.logger.debug("Start to create upgrade init file.") + # send cmd to all node and exec + cmd = "%s -t %s -U %s --upgrade_bak_path=%s -l %s" % \ + (OMCommand.getLocalScript("Local_Upgrade_Utility"), + Const.ACTION_TOUCH_INIT_FILE, + self.context.user, + self.context.upgradeBackupPath, + self.context.localLog) + DefaultValue.execCommandWithMode(cmd, + "create upgrade init file", + self.context.sshTool, + self.context.isSingle, + self.context.userProfile) + self.context.logger.debug( + "Successfully created upgrade init file.") + except Exception as e: + raise Exception(str(e)) + + def prepareSql(self, mode="rollback"): + """ + function : prepare 4 files: rollback_catalog_maindb_tmp.sql, + rollback_catalog_otherdb_tmp.sql and upgrade file + 2.for each result file: filter all files and merge + into the *_tmp.sql file + + :param rollback: can be rollback or upgrade + """ + try: + self.prepareSqlForDb(mode) + self.prepareSqlForDb(mode, "otherdb") + if mode == "upgrade": + self.prepareCheckSql() + except Exception as e: + raise Exception("Failed to prepare %s sql file failed. ERROR: %s" + % (mode, str(e))) + + def prepareSqlForDb(self, mode, dbType="maindb"): + self.context.logger.debug( + "Start to prepare {0} sql files for {1}.".format(mode, dbType)) + header = self.getSqlHeader() + if "upgrade" in mode: + listName = "upgrade" + else: + listName = "rollback" + fileNameList = self.getFileNameList("{0}_catalog_{1}".format( + listName, dbType), mode) + if "rollback" in mode: + fileNameList.sort(reverse=True) + else: + fileNameList.sort() + fileName = "{0}_catalog_{1}_tmp.sql".format(mode, dbType) + self.context.logger.debug("The real file list for %s: %s" % ( + dbType, fileNameList)) + self.togetherFile(header, "{0}_catalog_{1}".format(listName, dbType), + fileNameList, fileName) + self.context.logger.debug("Successfully prepared sql files for %s." + % dbType) + + def prepareCheckSql(self): + header = ["START TRANSACTION;"] + fileNameList = self.getFileNameList("check_upgrade") + fileNameList.sort() + self.context.logger.debug("The real file list for checking upgrade: " + "%s" % fileNameList) + self.togetherFile(header, "check_upgrade", fileNameList, + "check_upgrade_tmp.sql") + + def togetherFile(self, header, filePathName, fileNameList, executeFileName): + writeFile = "" + try: + filePath = "%s/upgrade_sql/%s" % (self.context.upgradeBackupPath, + filePathName) + self.context.logger.debug("Preparing [%s]." 
% filePath)
+            writeFile = "%s/%s" % (self.context.upgradeBackupPath,
+                                   executeFileName)
+            g_file.createFile(writeFile)
+            g_file.writeFile(writeFile, header, 'w')
+
+            with open(writeFile, 'a') as sqlFile:
+                for each_file in fileNameList:
+                    each_file_with_path = "%s/%s" % (filePath, each_file)
+                    self.context.logger.debug("Handling file: %s" %
+                                              each_file_with_path)
+                    with open(each_file_with_path, 'r') as fp:
+                        for line in fp:
+                            sqlFile.write(line)
+                    sqlFile.write(os.linesep)
+            g_file.writeFile(writeFile, ["COMMIT;"], 'a')
+            self.context.logger.debug(
+                "Successfully merged {0} file".format(writeFile))
+            if not os.path.isfile(writeFile):
+                raise Exception(ErrorCode.GAUSS_502["GAUSS_50201"] % writeFile)
+        except Exception as e:
+            raise Exception("Failed to write {0} sql file. ERROR: {1}".format(
+                writeFile, str(e)))
+
+    def modifyPgProcIndex(self):
+        """
+        1. run the SQL that rebuilds the pg_proc indexes
+        2. make checkpoint
+        3. stop cluster
+        4. start cluster
+        :return:
+        """
+        self.context.logger.debug("Begin to modify pg_proc index.")
+        time.sleep(3)
+        database_list = self.getDatabaseList()
+        # run the SQL that rebuilds pg_proc_oid_index
+        sql = """START TRANSACTION;SET IsInplaceUpgrade = on;
+            drop index pg_proc_oid_index;SET LOCAL
+            inplace_upgrade_next_system_object_oids=IUO_CATALOG,false,
+            true,0,0,0,2690;CREATE UNIQUE INDEX pg_proc_oid_index ON pg_proc
+            USING btree (oid);SET LOCAL
+            inplace_upgrade_next_system_object_oids=IUO_CATALOG,false,
+            true,0,0,0,0;commit;CHECKPOINT;"""
+        for eachdb in database_list:
+            (status, output) = ClusterCommand.remoteSQLCommand(
+                sql, self.context.user,
+                self.dnInst.hostname, self.dnInst.port, False,
+                eachdb, IsInplaceUpgrade=True)
+            if status != 0:
+                raise Exception(ErrorCode.GAUSS_513["GAUSS_51300"] % sql +
+                                " Error: \n%s" % str(output))
+        # run the SQL that rebuilds pg_proc_proname_args_nsp_index
+        sql = """START TRANSACTION;SET IsInplaceUpgrade = on;
+            drop index pg_proc_proname_args_nsp_index;SET LOCAL
+            inplace_upgrade_next_system_object_oids=IUO_CATALOG,false,
+            true,0,0,0,2691;create UNIQUE INDEX pg_proc_proname_args_nsp_index
+            ON pg_proc USING btree (proname, proargtypes, pronamespace);SET
+            LOCAL inplace_upgrade_next_system_object_oids=IUO_CATALOG,false,
+            true,0,0,0,0;commit;CHECKPOINT;"""
+        for eachdb in database_list:
+            (status, output) = ClusterCommand.remoteSQLCommand(
+                sql, self.context.user,
+                self.dnInst.hostname, self.dnInst.port, False,
+                eachdb, IsInplaceUpgrade=True)
+            if status != 0:
+                raise Exception(ErrorCode.GAUSS_513["GAUSS_51300"] % sql +
+                                " Error: \n%s" % str(output))
+        # stop cluster
+        self.stopCluster()
+        # start cluster
+        self.startCluster()
+        self.context.logger.debug("Successfully modified pg_proc index.")
+
     def setNewVersionGuc(self):
         """
         function: set new Version guc
@@ -1174,6 +2236,7 @@ class UpgradeImpl:
 
         try:
             self.checkStaticConfig()
+            self.startCluster()
             # Mark that we leave pre commit status,
             # so that if we fail at the first few steps,
             # we won't be allowed to commit upgrade any more.
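
[Editor note] togetherFile() above assembles one transactional SQL script per
database type: the header statements first, then every versioned script in
sorted order, then a closing COMMIT. A minimal standalone sketch of the same
merge logic, with illustrative names that are not part of the OM code:

    import os

    def merge_sql_scripts(header, script_dir, script_names, out_file):
        # Header first, each script in the given order, then COMMIT so the
        # merged file applies as a single transaction.
        with open(out_file, 'w') as merged:
            merged.write(os.linesep.join(header) + os.linesep)
            for name in script_names:
                with open(os.path.join(script_dir, name)) as script:
                    merged.write(script.read())
                merged.write(os.linesep)
            merged.write("COMMIT;" + os.linesep)
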
@@ -1183,9 +2246,22 @@ class UpgradeImpl:
                 Const.BINARY_UPGRADE_STEP_START_NODE)
 
             if step >= Const.BINARY_UPGRADE_STEP_START_NODE:
+                # drop table and index after large upgrade
+                if self.isLargeInplaceUpgrade:
+                    if self.check_upgrade_mode():
+                        self.drop_table_or_index()
                 self.restoreClusterConfig(True)
                 self.switchBin(Const.OLD)
-                self.stopCluster()
+                if self.isLargeInplaceUpgrade:
+                    touchInitFlagFile = os.path.join(
+                        self.context.upgradeBackupPath, "touch_init_flag")
+                    if os.path.exists(touchInitFlagFile):
+                        self.rollbackCatalog()
+                        self.cleanCsvFile()
+                    else:
+                        self.setUpgradeMode(0)
+                else:
+                    self.stopCluster()
                 self.recordNodeStepInplace(
                     Const.ACTION_INPLACE_UPGRADE,
                     Const.BINARY_UPGRADE_STEP_UPGRADE_APP)
@@ -1198,6 +2274,7 @@ class UpgradeImpl:
                     Const.BINARY_UPGRADE_STEP_BACKUP_VERSION)
 
             if step >= Const.BINARY_UPGRADE_STEP_BACKUP_VERSION:
+                self.cleanBackupedCatalogPhysicalFiles(True)
                 self.recordNodeStepInplace(
                     Const.ACTION_INPLACE_UPGRADE,
                     Const.BINARY_UPGRADE_STEP_STOP_NODE)
@@ -1222,6 +2299,191 @@ class UpgradeImpl:
         self.context.logger.log("Rollback succeeded.")
         return True
 
+    def check_table_or_index_exist(self, name, eachdb):
+        """
+        check whether the given table or index exists
+        :return:
+        """
+        sql = "select count(*) from pg_class where relname = '%s';" % name
+        (status, output) = ClusterCommand.remoteSQLCommand(
+            sql, self.context.user,
+            self.dnInst.hostname, self.dnInst.port, False,
+            eachdb, IsInplaceUpgrade=True)
+        if status != 0 or ClusterCommand.findErrorInSql(output):
+            raise Exception(ErrorCode.GAUSS_513["GAUSS_51300"] % sql +
+                            " Error: \n%s" % str(output))
+        if output == '0':
+            self.context.logger.debug("Table does not exist.")
+            return False
+        self.context.logger.debug("Table exists.")
+        return True
+
+    def drop_table_or_index(self):
+        """
+        drop the temporary pg_proc table and indexes left by large upgrade
+        :return:
+        """
+        self.context.logger.debug("Start to drop table or index")
+        database_list = self.getDatabaseList()
+        # drop table and index
+        maindb = "postgres"
+        otherdbs = database_list
+        otherdbs.remove("postgres")
+        # check table exist in postgres
+        table_name = 'pg_proc_temp_oids'
+        if self.check_table_or_index_exist(table_name, maindb):
+            self.drop_one_database_table_or_index([maindb])
+        else:
+            return
+        # drop other database table and index
+        self.drop_one_database_table_or_index(otherdbs)
+        self.context.logger.debug(
+            "Successfully dropped table or index.")
+
+    def drop_one_database_table_or_index(self,
+                                         database_list):
+        """
+        drop the temporary table and indexes in the given databases
+        :return:
+        """
+        table_name = 'pg_proc_temp_oids'
+        delete_table_sql = "drop table %s;" % table_name
+        index_name_list = ['pg_proc_oid_index_temp',
+                           'pg_proc_proname_args_nsp_index_temp']
+        for eachdb in database_list:
+            if self.check_table_or_index_exist(table_name, eachdb):
+                (status, output) = ClusterCommand.remoteSQLCommand(
+                    delete_table_sql, self.context.user,
+                    self.dnInst.hostname, self.dnInst.port, False,
+                    eachdb, IsInplaceUpgrade=True)
+                if status != 0:
+                    raise Exception(
+                        ErrorCode.GAUSS_513["GAUSS_51300"] % delete_table_sql
+                        + " Error: \n%s" % str(output))
+            for index in index_name_list:
+                if self.check_table_or_index_exist(index, eachdb):
+                    sql = "drop index %s;" % index
+                    (status, output) = ClusterCommand.remoteSQLCommand(
+                        sql, self.context.user,
+                        self.dnInst.hostname, self.dnInst.port, False,
+                        eachdb, IsInplaceUpgrade=True)
+                    if status != 0:
+                        raise Exception(
+                            ErrorCode.GAUSS_513["GAUSS_51300"] % sql +
+                            " Error: \n%s" % str(output))
+
+    def rollbackCatalog(self):
+        """
+        function: rollback catalog changes
+        steps:
+        1.prepare update sql file and
+          check sql file
+        2.do rollback catalog
+        input : NA
+        output: NA
+        """
+        try:
+            if self.context.action == Const.ACTION_INPLACE_UPGRADE and int(
+                    float(self.context.oldClusterNumber) * 1000) <= 93000:
+                raise Exception("For this old version %s, we only support "
+                                "physical rollback." % str(
+                                    self.context.oldClusterNumber))
+            self.context.logger.log("Rolling back catalog.")
+            self.prepareUpgradeSqlFolder()
+            self.prepareSql()
+            self.doRollbackCatalog()
+            self.context.logger.log("Successfully rolled back catalog.")
+        except Exception as e:
+            if self.context.action == Const.ACTION_INPLACE_UPGRADE:
+                self.context.logger.debug(
+                    "Failed to perform rollback operation by rolling "
+                    "back SQL files:\n%s" % str(e))
+                try:
+                    self.context.logger.debug("Try to recover again using "
                                              "catalog physical files")
+                    self.doPhysicalRollbackCatalog()
+                except Exception as e:
+                    raise Exception(
+                        "Failed to rollback catalog. ERROR: %s" % str(e))
+            else:
+                raise Exception(
+                    "Failed to rollback catalog. ERROR: %s" % str(e))
+
+    def doRollbackCatalog(self):
+        """
+        function : rollback catalog changes
+        steps:
+                  stop cluster
+                  set upgrade_from param
+                  start cluster
+                  connect database and rollback catalog changes one by one
+                  stop cluster
+                  unset upgrade_from param
+        input : NA
+        output: NA
+        """
+        if self.context.action == Const.ACTION_INPLACE_UPGRADE:
+            self.startCluster()
+            self.setUpgradeMode(1)
+        else:
+            self.setUpgradeMode(2)
+        self.execRollbackUpgradedCatalog(scriptType="rollback")
+        self.pgxcNodeUpdateLocalhost("rollback")
+        if self.context.action == Const.ACTION_INPLACE_UPGRADE:
+            self.stopCluster()
+            self.setUpgradeMode(0)
+
+    def doPhysicalRollbackCatalog(self):
+        """
+        function : rollback catalog by restoring physical files
+        steps:
+                  start cluster
+                  unset upgrade_from param
+                  stop cluster
+                  restore physical files
+        input : NA
+        output: NA
+        """
+        try:
+            self.startCluster()
+            self.setUpgradeMode(0)
+            self.stopCluster()
+            self.execPhysicalRollbackUpgradedCatalog()
+        except Exception as e:
+            raise Exception(str(e))
+
+    def execPhysicalRollbackUpgradedCatalog(self):
+        """
+        function : rollback catalog by restoring physical files,
+                   send the command to all nodes
+        input : NA
+        output: NA
+        """
+        try:
+            if self.isLargeInplaceUpgrade:
+                self.context.logger.debug(
+                    "Start to restore physical catalog files.")
+                # send cmd to all node and exec
+                cmd = "%s -t %s -U %s --upgrade_bak_path=%s " \
+                      "--oldcluster_num='%s' -l %s" % \
+                      (OMCommand.getLocalScript("Local_Upgrade_Utility"),
+                       Const.ACTION_RESTORE_OLD_CLUSTER_CATALOG_PHYSICAL_FILES,
+                       self.context.user,
+                       self.context.upgradeBackupPath,
+                       self.context.oldClusterNumber,
+                       self.context.localLog)
+                self.context.logger.debug(
+                    "Command for restoring physical catalog files: %s."
% cmd) + DefaultValue.execCommandWithMode( + cmd, + "restore physical files of catalog objects", + self.context.sshTool, + self.context.isSingle, + self.context.userProfile) + self.context.logger.debug( + "Successfully restored physical catalog files.") + except Exception as e: + raise Exception(str(e)) + def getSqlHeader(self): """ function: get sql header @@ -1235,7 +2497,7 @@ class UpgradeImpl: header.append("SET local log_min_messages = NOTICE;") return header - def getFileNameList(self, filePathName): + def getFileNameList(self, filePathName, scriptType="_"): """ function: get file name list input : filePathName @@ -1252,10 +2514,9 @@ class UpgradeImpl: continue prefix = each_sql_file.split('.')[0] resList = prefix.split('_') - if len(resList) != 5: + if len(resList) != 5 or scriptType not in resList: continue file_num = "%s.%s" % (resList[3], resList[4]) - if self.floatMoreThan(float(file_num), self.context.oldClusterNumber) and \ self.floatGreaterOrEqualTo(self.context.newClusterNumber, @@ -1442,6 +2703,7 @@ class UpgradeImpl: # newClusterNumber, the oldClusterInfo is same with new try: self.context.oldClusterInfo = self.context.clusterInfo + self.getOneDNInst(True) if os.path.isfile(commonDbClusterInfoModule) and \ os.path.isfile(commonStaticConfigFile): # import old module @@ -1540,6 +2802,9 @@ class UpgradeImpl: # we will get the self.context.newClusterAppPath in # choseStrategy self.context.clusterInfo.initFromXml(self.context.xmlFile) + if self.context.is_inplace_upgrade or \ + self.context.action == Const.ACTION_AUTO_ROLLBACK: + self.getOneDNInst() self.context.logger.debug("Successfully init cluster config.") else: raise Exception(ErrorCode.GAUSS_500["GAUSS_50004"] % 't' + @@ -1548,6 +2813,74 @@ class UpgradeImpl: self.context.logger.debug(traceback.format_exc()) self.exitWithRetCode(self.context.action, False, str(e)) + def getOneDNInst(self, checkNormal=False): + """ + function: find a dn instance by dbNodes, + which we can execute SQL commands + input : NA + output: DN instance + """ + try: + self.context.logger.debug( + "Get one DN. CheckNormal is %s" % checkNormal) + dnInst = None + clusterNodes = self.context.oldClusterInfo.dbNodes + for dbNode in clusterNodes: + if len(dbNode.datanodes) == 0: + continue + dnInst = dbNode.datanodes[0] + primaryDnNode = DefaultValue.getPrimaryNode( + self.context.userProfile) + if dnInst.hostname not in primaryDnNode: + continue + break + + if checkNormal: + (checkStatus, checkResult) = OMCommand.doCheckStaus( + self.context.user, 0) + if checkStatus == 0: + self.context.logger.debug("The cluster status is normal," + " no need to check dn status.") + else: + clusterStatus = \ + OMCommand.getClusterStatus(self.context.user) + if clusterStatus is None: + raise Exception(ErrorCode.GAUSS_516["GAUSS_51600"]) + clusterInfo = dbClusterInfo() + clusterInfo.initFromXml(self.context.xmlFile) + clusterInfo.dbNodes.extend(clusterNodes) + for dbNode in clusterInfo.dbNodes: + if len(dbNode.datanodes) == 0: + continue + dn = dbNode.datanodes[0] + primaryDnNode = DefaultValue.getPrimaryNode( + self.context.userProfile) + if dn.hostname not in primaryDnNode: + continue + dbInst = clusterStatus.getInstanceStatusById( + dn.instanceId) + if dbInst is None: + continue + if dbInst.status == "Normal": + self.context.logger.debug( + "DN from %s is healthy." % dn.hostname) + dnInst = dn + break + self.context.logger.debug( + "DN from %s is unhealthy." 
% dn.hostname)
+
+            # check if contain DN on nodes
+            if not dnInst or dnInst == []:
+                raise Exception(ErrorCode.GAUSS_526["GAUSS_52602"])
+            else:
+                self.context.logger.debug("Successfully got one DN from %s."
+                                          % dnInst.hostname)
+                self.dnInst = dnInst
+
+        except Exception as e:
+            self.context.logger.log("Failed to get one DN. Error: %s" % str(e))
+            raise Exception(ErrorCode.GAUSS_516["GAUSS_51624"])
+
     def verifyClusterConfigInfo(self, clusterInfo, oldClusterInfo,
                                 ignoreFlag="upgradectl"):
         """
@@ -1838,12 +3171,66 @@ class UpgradeImpl:
             self.backupHotpatch()
             # backup version file.
             self.backup_version_file()
+
+            if not self.isLargeInplaceUpgrade:
+                return
+            # backup catalog data files if needed
+            self.backupCatalogFiles()
+
+            # backup DS libs and gds file
+            cmd = "%s -t %s -U %s --upgrade_bak_path=%s -l %s" % \
+                  (OMCommand.getLocalScript("Local_Upgrade_Utility"),
+                   Const.ACTION_INPLACE_BACKUP,
+                   self.context.user,
+                   self.context.upgradeBackupPath,
+                   self.context.localLog)
+            self.context.logger.debug(
+                "Command for backing up gds file: %s" % cmd)
+            DefaultValue.execCommandWithMode(cmd,
+                                             "backup DS libs and gds file",
+                                             self.context.sshTool,
+                                             self.context.isSingle,
+                                             self.context.userProfile)
         except Exception as e:
             raise Exception(str(e))
 
         self.context.logger.log("Successfully backed up cluster "
                                 "configuration.", "constant")
 
+    def backupCatalogFiles(self):
+        """
+        function: backup physical files of catalog objects
+        1.check if is inplace upgrade
+        2.get database list
+        3.get catalog objects list
+        4.backup physical files for each database
+        5.backup global folder
+        input : NA
+        output: NA
+        """
+        try:
+            # send cmd to all node and exec
+            cmd = "%s -t %s -U %s --upgrade_bak_path=%s " \
+                  "--oldcluster_num='%s' -l %s" % \
+                  (OMCommand.getLocalScript("Local_Upgrade_Utility"),
+                   Const.ACTION_BACKUP_OLD_CLUSTER_CATALOG_PHYSICAL_FILES,
+                   self.context.user,
+                   self.context.upgradeBackupPath,
+                   self.context.oldClusterNumber,
+                   self.context.localLog)
+            self.context.logger.debug("Command for backing up physical files "
+                                      "of catalog objects: %s" % cmd)
+            DefaultValue.execCommandWithMode(
+                cmd,
+                "backup physical files of catalog objects",
+                self.context.sshTool,
+                self.context.isSingle,
+                self.context.userProfile)
+            self.context.logger.debug("Successfully backed up catalog "
+                                      "physical files for old cluster.")
+        except Exception as e:
+            raise Exception(str(e))
+
     def syncNewGUC(self):
         """
         function: sync newly added guc during inplace upgrade.
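
[Editor note] Several hunks above gate behavior on the numeric cluster
version, always via int(float(number) * 1000) so that version strings such
as "92.069" compare as integers. A self-contained sketch of that pattern
(the threshold value is only an example, not a statement about release
numbering):

    def crosses_version_threshold(old_num, new_num, threshold=92069):
        # Act only when the upgrade crosses the threshold: the old cluster
        # is below it and the new cluster is at or above it.
        old_val = int(float(old_num) * 1000)
        new_val = int(float(new_num) * 1000)
        return old_val < threshold <= new_val

    assert crosses_version_threshold("92.0", "93.0")
    assert not crosses_version_threshold("93.0", "94.0")
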
@@ -2010,14 +3397,16 @@ class UpgradeImpl:
         else:
             # restore static configuration
             cmd = "%s -t %s -U %s -V %d --upgrade_bak_path=%s " \
-                  "--new_cluster_app_path=%s -l %s" % \
-                  (OMCommand.getLocalScript("Local_Upgrade_Utility"),
-                   Const.ACTION_RESTORE_CONFIG,
-                   self.context.user,
-                   int(float(self.context.oldClusterNumber) * 1000),
-                   self.context.upgradeBackupPath,
-                   self.context.newClusterAppPath,
-                   self.context.localLog)
+                  "--old_cluster_app_path=%s --new_cluster_app_path=%s " \
+                  "-l %s" % (
+                      OMCommand.getLocalScript("Local_Upgrade_Utility"),
+                      Const.ACTION_RESTORE_CONFIG,
+                      self.context.user,
+                      int(float(self.context.oldClusterNumber) * 1000),
+                      self.context.upgradeBackupPath,
+                      self.context.oldClusterAppPath,
+                      self.context.newClusterAppPath,
+                      self.context.localLog)
 
             self.context.logger.debug("Command for restoring "
                                       "config files: %s" % cmd)
@@ -2026,6 +3415,22 @@ class UpgradeImpl:
                                              self.context.sshTool,
                                              self.context.isSingle,
                                              self.context.mpprcFile)
+            if self.isLargeInplaceUpgrade:
+                # restore DS libs and gds file
+                cmd = "%s -t %s -U %s --upgrade_bak_path=%s -l %s" % \
+                      (OMCommand.getLocalScript("Local_Upgrade_Utility"),
+                       Const.ACTION_INPLACE_BACKUP,
+                       self.context.user,
+                       self.context.upgradeBackupPath,
+                       self.context.localLog)
+                self.context.logger.debug(
+                    "Command for restoring DS libs and gds file: %s" % cmd)
+                DefaultValue.execCommandWithMode(
+                    cmd,
+                    "restore DS libs and gds file",
+                    self.context.sshTool,
+                    self.context.isSingle,
+                    self.context.userProfile)
         # change the owner of application
         cmd = "chown -R %s:%s '%s'" % \
               (self.context.user, self.context.group,
@@ -2222,9 +3627,12 @@ class UpgradeImpl:
                (self.context.tmpDir, Const.CLUSTER_CNSCONF_FILE,
                 self.context.tmpDir, Const.CLUSTER_CNSCONF_FILE)
         cmd += "(rm -f '%s'/gauss_crontab_file_*) &&" % self.context.tmpDir
-        cmd += "(if [ -d '%s' ]; then rm -rf '%s'; fi) " % \
-               (self.context.upgradeBackupPath,
-                self.context.upgradeBackupPath)
+        cmd += "(if [ -d '%s' ]; then rm -rf '%s'; fi) &&" % \
+               (self.context.upgradeBackupPath,
+                self.context.upgradeBackupPath)
+        cmd += "(if [ -f '%s/pg_proc_mapping.txt' ]; then rm -f" \
+               " '%s/pg_proc_mapping.txt'; fi)" % \
+               (self.context.tmpDir, self.context.tmpDir)
         self.context.logger.debug("Command for clean "
                                   "backup files: %s" % cmd)
         DefaultValue.execCommandWithMode(cmd,
diff --git a/script/local/StartInstance.py b/script/local/StartInstance.py
index 27b61fa..bd764b9 100644
--- a/script/local/StartInstance.py
+++ b/script/local/StartInstance.py
@@ -46,6 +46,7 @@ class Start(LocalBaseOM):
         self.logger = None
         self.installPath = ""
         self.security_mode = ""
+        self.cluster_number = None
 
     def usage(self):
         """
@@ -72,7 +73,8 @@ General options:
         """
         try:
             opts, args = getopt.getopt(sys.argv[1:], "U:D:R:l:t:h?",
-                                       ["help", "security-mode="])
+                                       ["help", "security-mode=",
+                                        "cluster_number="])
         except getopt.GetoptError as e:
             GaussLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50000"]
                                    % str(e))
@@ -96,6 +98,8 @@ General options:
                 sys.exit(0)
             elif key == "--security-mode":
                 self.security_mode = value
+            elif key == "--cluster_number":
+                self.cluster_number = value
             else:
                 GaussLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50000"]
                                        % key)
@@ -134,7 +138,10 @@ General options:
         for dn in self.dnCons:
             if self.dataDir != "" and dn.instInfo.datadir != self.dataDir:
                 continue
-            dn.start(self.time_out, self.security_mode)
+            if self.cluster_number:
+                dn.start(self.time_out, self.security_mode, self.cluster_number)
+            else:
+                dn.start(self.time_out, self.security_mode)
             isDataDirCorrect = True
         if not isDataDirCorrect:
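
[Editor note] The StartInstance.py hunks above thread an optional
--cluster_number option through to the datanode start call. The dispatch
pattern in isolation (dn.start here is a stand-in for the Kernel start
method; the real signature lives in gspylib/component/Kernel):

    def start_dn(dn, time_out, security_mode, cluster_number=None):
        # The kernel receives cluster_number only during a large version
        # upgrade; otherwise the original two-argument call is preserved.
        if cluster_number:
            dn.start(time_out, security_mode, cluster_number)
        else:
            dn.start(time_out, security_mode)
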
diff --git a/script/local/UnPreInstallUtility.py b/script/local/UnPreInstallUtility.py index 2fdf13f..1c0d68a 100644 --- a/script/local/UnPreInstallUtility.py +++ b/script/local/UnPreInstallUtility.py @@ -36,6 +36,7 @@ from gspylib.os.gsnetwork import g_network from gspylib.os.gsservice import g_service from gspylib.common.LocalBaseOM import LocalBaseOM from gspylib.os.gsfile import g_Platform +import impl.upgrade.UpgradeConst as Const ACTION_CLEAN_SYSLOG_CONFIG = 'clean_syslog_config' ACTION_CLEAN_TOOL_ENV = 'clean_tool_env' @@ -361,6 +362,10 @@ class Postuninstall(LocalBaseOM): g_file.removeDirectory(path) path = "%s/unixodbc" % self.clusterToolPath g_file.removeDirectory(path) + path = "%s/%s" % (self.clusterToolPath, Const.UPGRADE_SQL_FILE) + g_file.removeFile(path) + path = "%s/%s" % (self.clusterToolPath, Const.UPGRADE_SQL_SHA) + g_file.removeFile(path) self.logger.debug( "Successfully cleaned the environmental software and variable.") diff --git a/script/local/UpgradeUtility.py b/script/local/UpgradeUtility.py index 7159458..6614c9e 100644 --- a/script/local/UpgradeUtility.py +++ b/script/local/UpgradeUtility.py @@ -30,6 +30,9 @@ import time import traceback import json import platform +import shutil +import copy +import csv from multiprocessing.dummy import Pool as ThreadPool sys.path.append(sys.path[0] + "/../") @@ -39,6 +42,7 @@ from gspylib.common.Common import DefaultValue, ClusterCommand, \ from gspylib.common.ParameterParsecheck import Parameter from gspylib.common.DbClusterInfo import dbClusterInfo from gspylib.common.ErrorCode import ErrorCode +from gspylib.common.DbClusterStatus import DbClusterStatus from gspylib.os.gsfile import g_file import impl.upgrade.UpgradeConst as const @@ -96,11 +100,13 @@ class CmdOptions(): self.xmlFile = "" # inplace upgrade bak path or grey upgrade path self.upgrade_bak_path = "" + self.scriptType = "" self.rollback = False self.forceRollback = False self.oldClusterAppPath = "" self.newClusterAppPath = "" self.gucStr = "" + self.oldclusternum = "" self.postgisSOFileList = \ {"postgis-*.*.so": "lib/postgresql/", "libgeos_c.so.*": "lib/", @@ -263,10 +269,12 @@ Common options: -X the xml configure file --help show this help, then exit --upgrade_bak_path always be the $PGHOST/binary_upgrade + --scriptType upgrade script type --old_cluster_app_path absolute path with old commit id --new_cluster_app_path absolute path with new commit id --rollback is rollback --guc_string check the guc string has been successfully + --oldcluster_num old cluster number wrote in the configure file, format is guc:value, can only check upgrade_from, upgrade_mode """ @@ -282,9 +290,9 @@ def parseCommandLine(): try: opts, args = getopt.getopt(sys.argv[1:], "t:U:R:l:V:X:", ["help", "upgrade_bak_path=", - "old_cluster_app_path=", + "script_type=", "old_cluster_app_path=", "new_cluster_app_path=", "rollback", - "force", "guc_string="]) + "force", "guc_string=", "oldcluster_num="]) except Exception as e: usage() GaussLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50000"] % str(e)) @@ -311,6 +319,8 @@ def parseCommandLine(): g_opts.xmlFile = os.path.realpath(value) elif key == "--upgrade_bak_path": g_opts.upgrade_bak_path = os.path.normpath(value) + elif key == "--script_type": + g_opts.scriptType = os.path.normpath(value) elif key == "--old_cluster_app_path": g_opts.oldClusterAppPath = os.path.normpath(value) elif key == "--new_cluster_app_path": @@ -321,6 +331,8 @@ def parseCommandLine(): g_opts.forceRollback = True elif key == "--guc_string": g_opts.gucStr = value + elif key 
== "--oldcluster_num": + g_opts.oldclusternum = value else: GaussLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50000"] % key) @@ -355,6 +367,10 @@ def checkParameter(): [const.ACTION_SWITCH_BIN, const.ACTION_CLEAN_INSTALL_PATH] and not g_opts.appPath: GaussLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50001"] % "R") + elif g_opts.action in [const.ACTION_UPGRADE_SQL_FOLDER] and not \ + g_opts.upgrade_bak_path: + GaussLog.exitWithError( + ErrorCode.GAUSS_500["GAUSS_50001"] % "-upgrade_bak_path") # Check the incoming parameter -U if g_opts.user == "": g_opts.user = pwd.getpwuid(os.getuid()).pw_name @@ -489,18 +505,15 @@ def syncPostgresqlconf(dbInstance): if dbInstance.instanceRole == DefaultValue.INSTANCE_ROLE_DATANODE: # rebuild replconninfo connInfo1 = None - connInfo2 = None dummyStandbyInst = None peerInsts = g_clusterInfo.getPeerInstance(dbInstance) if len(peerInsts) > 0: - (connInfo1, connInfo2, dummyStandbyInst) = \ - ClusterInstanceConfig.setReplConninfo( - dbInstance, - peerInsts, - g_clusterInfo)[0:3] - gucParamDict["replconninfo1"] = "'%s'" % connInfo1 - if dummyStandbyInst is not None: - gucParamDict["replconninfo2"] = "'%s'" % connInfo2 + (connInfo1, _) = ClusterInstanceConfig.\ + setReplConninfoForSinglePrimaryMultiStandbyCluster( + dbInstance, peerInsts, g_clusterInfo) + for i in range(len(connInfo1)): + connInfo = "replconninfo" + "%d" % (i + 1) + gucParamDict[connInfo] = "'%s'" % connInfo1[i] if len(gucParamDict) > 0: gucStr = "" @@ -655,15 +668,10 @@ def touchInstanceInitFile(): g_logger.log("Touch init file.") try: InstanceList = [] - # find all CN instances need to touch - if len(g_dbNode.coordinators) != 0: - for eachInstance in g_dbNode.coordinators: - InstanceList.append(eachInstance) # find all DB instances need to touch if len(g_dbNode.datanodes) != 0: for eachInstance in g_dbNode.datanodes: - if ( - eachInstance.instanceType == MASTER_INSTANCE + if (eachInstance.instanceType == MASTER_INSTANCE or eachInstance.instanceType == STANDBY_INSTANCE): InstanceList.append(eachInstance) @@ -797,44 +805,46 @@ def touchOneInstanceInitFile(instance): def getInstanceName(instance): """ - function: get master instance name - input: NA - output: NA + get master instance name """ instance_name = "" if instance.instanceRole == INSTANCE_ROLE_COODINATOR: instance_name = "cn_%s" % instance.instanceId elif instance.instanceRole == INSTANCE_ROLE_DATANODE: - # if dn, it should be master or standby dn - if instance.instanceType == DUMMY_STANDBY_INSTANCE: - raise Exception( - ErrorCode.GAUSS_529["GAUSS_52943"] % instance.instanceType) - peerInsts = g_clusterInfo.getPeerInstance(instance) - if len(peerInsts) != 2 and len(peerInsts) != 1: - raise Exception(ErrorCode.GAUSS_516["GAUSS_51620"] % "peer") - masterInst = None - standbyInst = None - for i in iter(peerInsts): - if i.instanceType == MASTER_INSTANCE: - masterInst = i - standbyInst = instance - instance_name = "dn_%d_%d" % ( - masterInst.instanceId, standbyInst.instanceId) - elif i.instanceType == STANDBY_INSTANCE: - standbyInst = i - masterInst = instance - instance_name = "dn_%d_%d" % ( - masterInst.instanceId, standbyInst.instanceId) - else: - # we are searching master or standby DB instance, - # if dummy dn, just continue - continue + if g_clusterInfo.isSingleInstCluster(): + # the instance type must be master or standby dn + peerInsts = g_clusterInfo.getPeerInstance(instance) + (instance_name, masterInst, _) = \ + ClusterInstanceConfig.\ + getInstanceInfoForSinglePrimaryMultiStandbyCluster( + instance, peerInsts) + else: + # if dn, it 
should be master or standby dn + if instance.instanceType == DUMMY_STANDBY_INSTANCE: + raise Exception( + "Invalid instance type:%s" % instance.instanceType) + peerInsts = g_clusterInfo.getPeerInstance(instance) + if len(peerInsts) != 2 and len(peerInsts) != 1: + raise Exception(ErrorCode.GAUSS_516["GAUSS_51620"] % "peer") + for i in range(len(peerInsts)): + if peerInsts[i].instanceType == MASTER_INSTANCE: + masterInst = peerInsts[i] + standbyInst = instance + instance_name = "dn_%d_%d" % (masterInst.instanceId, + standbyInst.instanceId) + elif peerInsts[i].instanceType == STANDBY_INSTANCE: + standbyInst = peerInsts[i] + masterInst = instance + instance_name = "dn_%d_%d" % (masterInst.instanceId, + standbyInst.instanceId) + else: + # we are searching master or standby dn instance, + # if dummy dn, just continue + continue if instance_name == "": - raise Exception(ErrorCode.GAUSS_529["GAUSS_52939"] - % "instance name!") + raise Exception("Can not get instance name!") else: - raise Exception(ErrorCode.GAUSS_529["GAUSS_52940"] - % instance.instanceRole) + raise Exception("Invalid node type:%s" % instance.instanceRole) return instance_name.strip() @@ -854,9 +864,8 @@ def getStandbyInstance(instance): instance.instanceRole) peerInsts = g_clusterInfo.getPeerInstance(instance) - if len(peerInsts) != 2 and len(peerInsts) != 1: - raise Exception(ErrorCode.GAUSS_516["GAUSS_51620"] % "peer") - + if len(peerInsts) == 0: + return standbyInst = None for i in iter(peerInsts): if i.instanceType == STANDBY_INSTANCE: @@ -880,23 +889,19 @@ def getJsonFile(instance, backup_path): # load db and catalog info from json file if instance.instanceRole == INSTANCE_ROLE_COODINATOR: db_and_catalog_info_file_name = \ - "%s/cn_db_and_catalog_info_%s.json" \ - % (backup_path, instance_name) + "%s/cn_db_and_catalog_info_%s.json" % ( + backup_path, instance_name) elif instance.instanceRole == INSTANCE_ROLE_DATANODE: - if instance.instanceType == MASTER_INSTANCE: + if instance.instanceType == MASTER_INSTANCE or\ + instance.instanceType == STANDBY_INSTANCE: db_and_catalog_info_file_name = \ - "%s/master_dn_db_and_catalog_info_%s.json" \ - % (backup_path, instance_name) - elif instance.instanceType == STANDBY_INSTANCE: - db_and_catalog_info_file_name = \ - "%s/standby_dn_db_and_catalog_info_%s.json" \ - % (backup_path, instance_name) + "%s/dn_db_and_catalog_info_%s.json" % ( + backup_path, instance_name) else: raise Exception( - ErrorCode.GAUSS_529["GAUSS_52943"] % instance.instanceType) + "Invalid instance type:%s" % instance.instanceType) else: - raise Exception(ErrorCode.GAUSS_529["GAUSS_52941"] % - instance.instanceRole) + raise Exception("Invalid instance role:%s" % instance.instanceRole) return db_and_catalog_info_file_name except Exception as e: raise Exception(str(e)) @@ -904,20 +909,16 @@ def getJsonFile(instance, backup_path): def __backup_base_folder(instance): """ - function: back base folder - input : instance - output : NA """ - g_logger.debug( - "Backup instance catalog physical files. Instance data dir: %s" - % instance.datadir) + g_logger.debug("Backup instance catalog physical files. 
" + "Instance data dir: %s" % instance.datadir) backup_path = "%s/oldClusterDBAndRel/" % g_opts.upgrade_bak_path db_and_catalog_info_file_name = getJsonFile(instance, backup_path) - with open(db_and_catalog_info_file_name, 'r') as fp: - dbInfoStr = fp.read() - dbInfoDict = {} + fp = open(db_and_catalog_info_file_name, 'r') + dbInfoStr = fp.read() + fp.close() dbInfoDict = json.loads(dbInfoStr) # get instance name @@ -929,55 +930,63 @@ def __backup_base_folder(instance): if each_db["spclocation"].startswith('/'): tbsBaseDir = each_db["spclocation"] else: - tbsBaseDir = "%s/pg_location/%s" % ( - instance.datadir, each_db["spclocation"]) + tbsBaseDir = "%s/pg_location/%s" % (instance.datadir, + each_db["spclocation"]) pg_catalog_base_dir = "%s/%s_%s/%d" % ( tbsBaseDir, DefaultValue.TABLESPACE_VERSION_DIRECTORY, instance_name, int(each_db["dboid"])) else: - pg_catalog_base_dir = "%s/base/%d" % ( - instance.datadir, int(each_db["dboid"])) + pg_catalog_base_dir = "%s/base/%d" % (instance.datadir, + int(each_db["dboid"])) # for base folder, template0 need handle specially if each_db["dbname"] == 'template0': pg_catalog_base_back_dir = "%s_bak" % pg_catalog_base_dir cpDirectory(pg_catalog_base_dir, pg_catalog_base_back_dir) + g_logger.debug( + "Template0 has been backed up from {0} to {1}".format( + pg_catalog_base_dir, pg_catalog_base_back_dir)) continue # handle other db's base folder if len(each_db["CatalogList"]) <= 0: raise Exception( - ErrorCode.GAUSS_536["GAUSS_53612"] % each_db["dbname"]) + "Can not find any catalog in database %s" % each_db["dbname"]) for each_catalog in each_db["CatalogList"]: # main/vm/fsm -- main.1 .. - cmd = "" main_file = "%s/%d" % ( pg_catalog_base_dir, int(each_catalog['relfilenode'])) if not os.path.isfile(main_file): raise Exception(ErrorCode.GAUSS_502["GAUSS_50210"] % main_file) cmd = "cp -f -p '%s' '%s_bak'" % (main_file, main_file) + g_logger.debug( + "{0} needs to be backed up to {0}_bak".format(main_file)) seg_idx = 1 while 1: - seg_file = "%s/%d.%d" % ( - pg_catalog_base_dir, int(each_catalog['relfilenode']), - seg_idx) + seg_file = "%s/%d.%d" % (pg_catalog_base_dir, + int(each_catalog['relfilenode']), + seg_idx) if os.path.isfile(seg_file): cmd += "&& cp -f -p '%s' '%s_bak'" % (seg_file, seg_file) seg_idx += 1 else: break - vm_file = "%s/%d_vm" % ( - pg_catalog_base_dir, int(each_catalog['relfilenode'])) + g_logger.debug("seg_file needs to be backed up") + vm_file = "%s/%d_vm" % (pg_catalog_base_dir, + int(each_catalog['relfilenode'])) if os.path.isfile(vm_file): cmd += "&& cp -f -p '%s' '%s_bak'" % (vm_file, vm_file) - fsm_file = "%s/%d_fsm" % ( - pg_catalog_base_dir, int(each_catalog['relfilenode'])) + g_logger.debug( + "{0} needs to be backed up to {0}_bak".format(vm_file)) + fsm_file = "%s/%d_fsm" % (pg_catalog_base_dir, + int(each_catalog['relfilenode'])) if os.path.isfile(fsm_file): cmd += "&& cp -f -p '%s' '%s_bak'" % (fsm_file, fsm_file) - (status, output) = subprocess.getstatusoutput(cmd) + g_logger.debug( + "{0} needs to be backed up to {0}_bak".format(fsm_file)) + (status, output) = DefaultValue.retryGetstatusoutput(cmd, 2, 5) if status != 0: - raise Exception( - ErrorCode.GAUSS_514["GAUSS_51400"] % cmd - + "\nOutput:%s" % output) + raise Exception(ErrorCode.GAUSS_514["GAUSS_51400"] % cmd + + "\nOutput:%s" % output) # special files pg_filenode.map pg_internal.init cmd = "" @@ -989,6 +998,8 @@ def __backup_base_folder(instance): else: cmd += "&& cp -f -p '%s' '%s_bak'" % ( pg_filenode_map_file, pg_filenode_map_file) + g_logger.debug("{0} needs to 
be backed up to {0}_bak".format( + pg_filenode_map_file)) pg_internal_init_file = "%s/pg_internal.init" % pg_catalog_base_dir if os.path.isfile(pg_internal_init_file): if cmd == "": @@ -997,51 +1008,44 @@ def __backup_base_folder(instance): else: cmd += "&& cp -f -p '%s' '%s_bak'" % ( pg_internal_init_file, pg_internal_init_file) + g_logger.debug("{0} needs to be backed up to {0}_bak".format( + pg_internal_init_file)) if cmd != 0: - (status, output) = subprocess.getstatusoutput(cmd) + (status, output) = DefaultValue.retryGetstatusoutput(cmd, 2, 5) if status != 0: - raise Exception( - ErrorCode.GAUSS_514["GAUSS_51400"] % cmd - + "\nOutput:%s" % output) + raise Exception(ErrorCode.GAUSS_514["GAUSS_51400"] % cmd + + "\nOutput:%s" % output) - g_logger.debug( - "Successfully backuped instance catalog physical files. " - "Instance data dir: %s" % instance.datadir) + g_logger.debug("Successfully backuped instance catalog physical files." + " Instance data dir: %s" % instance.datadir) def __restore_base_folder(instance): """ - function: restore base folder - input : instance - output : NA """ + g_logger.debug("Restore instance base folders. " + "Instance data dir: {0}".format(instance.datadir)) backup_path = "%s/oldClusterDBAndRel/" % g_opts.upgrade_bak_path - dbInfoDict = {} # get instance name instance_name = getInstanceName(instance) # load db and catalog info from json file if instance.instanceRole == INSTANCE_ROLE_COODINATOR: - db_and_catalog_info_file_name = "%s/cn_db_and_catalog_info_%s.json" % ( - backup_path, instance_name) + db_and_catalog_info_file_name = \ + "%s/cn_db_and_catalog_info_%s.json" % (backup_path, instance_name) elif instance.instanceRole == INSTANCE_ROLE_DATANODE: - if instance.instanceType == MASTER_INSTANCE: + if instance.instanceType == MASTER_INSTANCE or \ + instance.instanceType == STANDBY_INSTANCE: db_and_catalog_info_file_name = \ - "%s/master_dn_db_and_catalog_info_%s.json" \ - % (backup_path, instance_name) - elif instance.instanceType == STANDBY_INSTANCE: - db_and_catalog_info_file_name = \ - "%s/standby_dn_db_and_catalog_info_%s.json" \ - % (backup_path, instance_name) + "%s/dn_db_and_catalog_info_%s.json" % ( + backup_path, instance_name) else: - raise Exception(ErrorCode.GAUSS_529["GAUSS_52940"] - % instance.instanceType) + raise Exception("Invalid instance type:%s" % instance.instanceType) else: - raise Exception(ErrorCode.GAUSS_529["GAUSS_52941"] - % instance.instanceRole) - - with open(db_and_catalog_info_file_name, 'r') as fp: - dbInfoStr = fp.read() + raise Exception("Invalid instance role:%s" % instance.instanceRole) + fp = open(db_and_catalog_info_file_name, 'r') + dbInfoStr = fp.read() + fp.close() dbInfoDict = json.loads(dbInfoStr) # restore base folder @@ -1062,89 +1066,102 @@ def __restore_base_folder(instance): if each_db["dbname"] == 'template0': pg_catalog_base_back_dir = "%s_bak" % pg_catalog_base_dir cpDirectory(pg_catalog_base_back_dir, pg_catalog_base_dir) + g_logger.debug( + "Template0 has been restored from {0} to {1}".format( + pg_catalog_base_back_dir, pg_catalog_base_dir)) continue # handle other db's base folder if len(each_db["CatalogList"]) <= 0: - raise Exception( - ErrorCode.GAUSS_536["GAUSS_53612"] % each_db["dbname"]) + raise Exception("Can not find any catalog in database %s" % + each_db["dbname"]) for each_catalog in each_db["CatalogList"]: # main/vm/fsm -- main.1 .. 
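+            # [Editor note] Each catalog relation maps to several physical
+            # files: the main fork "<relfilenode>", numbered extension
+            # segments "<relfilenode>.1", ".2", ..., and the "_vm"
+            # (visibility map) and "_fsm" (free space map) forks. A minimal
+            # sketch of enumerating them (illustrative helper, not part of
+            # this patch):
+            #
+            #     def relation_files(base_dir, relfilenode):
+            #         files = ["%s/%d" % (base_dir, relfilenode)]
+            #         seg = 1
+            #         while os.path.isfile("%s/%d.%d" % (base_dir,
+            #                                            relfilenode, seg)):
+            #             files.append("%s/%d.%d" % (base_dir,
+            #                                        relfilenode, seg))
+            #             seg += 1
+            #         for fork in ("_vm", "_fsm"):
+            #             fork_file = "%s/%d%s" % (base_dir, relfilenode, fork)
+            #             if os.path.isfile(fork_file):
+            #                 files.append(fork_file)
+            #         return files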
- cmd = "" - main_file = "%s/%d" % ( - pg_catalog_base_dir, int(each_catalog['relfilenode'])) + main_file = "%s/%d" % (pg_catalog_base_dir, + int(each_catalog['relfilenode'])) if not os.path.isfile(main_file): - g_logger.debug( - "Instance data dir: %s, database: %s, relnodefile: " - "%s does not exists." - % (instance.datadir, each_db["dbname"], main_file)) + g_logger.debug("Instance data dir: %s, database: %s, " + "relnodefile: %s does not exists." \ + % (instance.datadir, each_db["dbname"], + main_file)) cmd = "cp -f -p '%s_bak' '%s'" % (main_file, main_file) + g_logger.debug( + "{0} needs to be restored from {0}_bak".format(main_file)) seg_idx = 1 while 1: - seg_file = "%s/%d.%d" % ( - pg_catalog_base_dir, int(each_catalog['relfilenode']), - seg_idx) + seg_file = "%s/%d.%d" % (pg_catalog_base_dir, + int(each_catalog['relfilenode']), + seg_idx) seg_file_bak = "%s_bak" % seg_file if os.path.isfile(seg_file): if os.path.isfile(seg_file_bak): - cmd += "&& cp -f -p '%s' '%s'" % ( - seg_file_bak, seg_file) + cmd += "&& cp -f -p '%s' '%s'" % (seg_file_bak, + seg_file) else: cmd += "&& rm -f '%s'" % seg_file seg_idx += 1 else: break + g_logger.debug("seg_file needs to be restored") - vm_file = "%s/%d_vm" % ( - pg_catalog_base_dir, int(each_catalog['relfilenode'])) + vm_file = "%s/%d_vm" % (pg_catalog_base_dir, + int(each_catalog['relfilenode'])) vm_file_bak = "%s_bak" % vm_file if os.path.isfile(vm_file): if os.path.isfile(vm_file_bak): cmd += "&& cp -f -p '%s' '%s'" % (vm_file_bak, vm_file) else: cmd += "&& rm -f '%s'" % vm_file - fsm_file = "%s/%d_fsm" % ( - pg_catalog_base_dir, int(each_catalog['relfilenode'])) + g_logger.debug( + "{0} needs to be restored from {0}_bak".format(vm_file)) + fsm_file = "%s/%d_fsm" % (pg_catalog_base_dir, + int(each_catalog['relfilenode'])) fsm_file_bak = "%s_bak" % fsm_file if os.path.isfile(fsm_file): if os.path.isfile(fsm_file_bak): cmd += "&& cp -f -p '%s' '%s'" % (fsm_file_bak, fsm_file) else: cmd += "&& rm -f '%s'" % fsm_file - (status, output) = subprocess.getstatusoutput(cmd) + g_logger.debug("{0} needs to be restored from {0}_bak".format( + fsm_file)) + (status, output) = DefaultValue.retryGetstatusoutput(cmd, 2, 5) if status != 0: - raise Exception( - ErrorCode.GAUSS_514["GAUSS_51400"] % cmd - + "\nOutput:%s" % output) + raise Exception(ErrorCode.GAUSS_514["GAUSS_51400"] % cmd + + "\nOutput:%s" % output) # special files pg_filenode.map pg_internal.init cmd = "" pg_filenode_map_file = "%s/pg_filenode.map" % pg_catalog_base_dir if os.path.isfile(pg_filenode_map_file): if cmd == "": - cmd = "cp -f -p '%s_bak' '%s'" % ( - pg_filenode_map_file, pg_filenode_map_file) + cmd = "cp -f -p '%s_bak' '%s'" % (pg_filenode_map_file, + pg_filenode_map_file) else: - cmd += "&& cp -f -p '%s_bak' '%s'" % ( - pg_filenode_map_file, pg_filenode_map_file) + cmd += "&& cp -f -p '%s_bak' '%s'" % (pg_filenode_map_file, + pg_filenode_map_file) + g_logger.debug("{0} needs to be restored from {0}_bak".format( + pg_filenode_map_file)) pg_internal_init_file = "%s/pg_internal.init" % pg_catalog_base_dir if os.path.isfile(pg_internal_init_file): if cmd == "": - cmd = "cp -f -p '%s_bak' '%s'" % ( - pg_internal_init_file, pg_internal_init_file) + cmd = "cp -f -p '%s_bak' '%s'" % (pg_internal_init_file, + pg_internal_init_file) else: - cmd += "&& cp -f -p '%s_bak' '%s'" % ( - pg_internal_init_file, pg_internal_init_file) + cmd += "&& cp -f -p '%s_bak' '%s'" % (pg_internal_init_file, + pg_internal_init_file) + g_logger.debug("{0} needs to be restored from {0}_bak".format( + 
pg_internal_init_file)) if cmd != 0: - (status, output) = subprocess.getstatusoutput(cmd) + (status, output) = DefaultValue.retryGetstatusoutput(cmd, 2, 5) if status != 0: - raise Exception( - ErrorCode.GAUSS_514["GAUSS_51400"] % cmd - + "\nOutput:%s" % output) + raise Exception(ErrorCode.GAUSS_514["GAUSS_51400"] % cmd + + "\nOutput:%s" % output) + g_logger.debug("Successfully restore instance base folders. Instance data " + "dir: {0}".format(instance.datadir)) def cleanBackUpDir(backupDir): @@ -1466,12 +1483,26 @@ def restoreConfig(): try: bakPath = g_opts.upgrade_bak_path clusterAppPath = g_opts.newClusterAppPath - # restore static configuration - cmd = "cp -f -p '%s'/*cluster_static_config* '%s'/bin/" % ( - bakPath, clusterAppPath) + # init old cluster config + oldStaticConfigFile = os.path.join( + g_opts.oldClusterAppPath, "bin/cluster_static_config") + oldStaticClusterInfo = dbClusterInfo() + oldStaticClusterInfo.initFromStaticConfig(g_opts.user, + oldStaticConfigFile) + # flush new static configuration + newStaticConfig = os.path.join( + clusterAppPath, "bin/cluster_static_config") + if not os.path.isfile(newStaticConfig): + raise Exception(ErrorCode.GAUSS_502["GAUSS_50201"] % + os.path.realpath(newStaticConfig)) + g_file.removeFile(newStaticConfig) + newStaticClusterInfo = dbClusterInfo() + newStaticClusterInfo.saveToStaticConfig( + newStaticConfig, oldStaticClusterInfo.localNodeId, + oldStaticClusterInfo.dbNodes, upgrade=True) # restore dynamic configuration dynamic_config = "%s/cluster_dynamic_config" % bakPath - cmd += " && (if [ -f '%s' ];then cp -f -p '%s' '%s/bin/';fi)" % ( + cmd = "(if [ -f '%s' ];then cp -f -p '%s' '%s/bin/';fi)" % ( dynamic_config, dynamic_config, clusterAppPath) # no need to restore alarm.conf at here, # because it has been done on upgradeNodeApp @@ -1696,6 +1727,14 @@ def inplaceBackup(): gdspath, gdspath, gdspath, bakPath) g_logger.debug("Inplace backup command: %s" % cmd) DefaultValue.execCommandLocally(cmd) + + # backup gsql files + bakPath = g_opts.upgrade_bak_path + gsqlpath = "%s/share/sslcert/gsql" % g_clusterInfo.appPath + cmd = "(if [ -d '%s' ];then chmod 600 -R '%s'/*; cp -r '%s' '%s';fi)" %\ + (gsqlpath, gsqlpath, gsqlpath, bakPath) + g_logger.debug("Inplace backup command: %s" % cmd) + DefaultValue.execCommandLocally(cmd) except Exception as e: raise Exception(str(e)) @@ -1729,8 +1768,9 @@ def checkGucValue(): instances = g_dbNode.cmagents fileName = "cm_agent.conf" elif key == "upgrade_mode": - instances = g_dbNode.coordinators - instances.extend(g_dbNode.datanodes) + #instances = g_dbNode.coordinators + #instances.extend(g_dbNode.datanodes) + instances = g_dbNode.datanodes fileName = "postgresql.conf" else: raise Exception(ErrorCode.GAUSS_529["GAUSS_52942"]) @@ -1872,7 +1912,7 @@ def readDeleteGuc(): return gucContent -def cleanInstallPath(): +def cleanInstallPath(): """ function: clean install path input : NA @@ -1937,8 +1977,10 @@ def cleanInstallPath(): (installPath, installPath) cmd += "(if [ -d '%s/kerberos' ]; then rm -rf '%s/kerberos'; fi) &&" % \ (installPath, installPath) - cmd += "(if [ -d '%s/var/krb5kdc' ]; then rm -rf '%s/var/krb5kdc'; fi)" % \ - (installPath, installPath) + cmd += "(if [ -d '%s/var/krb5kdc' ]; then rm -rf '%s/var/krb5kdc'; fi) &&" \ + % (installPath, installPath) + cmd += "(if [ -e '%s/version.cfg' ]; then rm -rf '%s/version.cfg'; fi)"\ + % (installPath, installPath) DefaultValue.execCommandLocally(cmd) if os.listdir(installPath): g_logger.log( @@ -1977,6 +2019,1155 @@ def copyCerts(): newOmSslCerts) +def 
prepareUpgradeSqlFolder(): + """ + function: verify upgrade_sql.tar.gz and extract it to binary backup path, + if execute gs_upgradectl again, we will decompress the sql folder + again to avoid the file in backup path destroyed + input : NA + output: NA + """ + g_logger.debug("Preparing upgrade sql folder.") + # verify upgrade_sql.tar.gz + dirName = os.path.dirname(os.path.realpath(__file__)) + packageDir = os.path.join(dirName, "./../../") + packageDir = os.path.normpath(packageDir) + upgrade_sql_gz_file = "%s/%s" % (packageDir, const.UPGRADE_SQL_FILE) + upgrade_sql_sha256_file = "%s/%s" % (packageDir, const.UPGRADE_SQL_SHA) + if not os.path.isfile(upgrade_sql_gz_file): + raise Exception( + ErrorCode.GAUSS_502["GAUSS_50201"] % upgrade_sql_gz_file) + if not os.path.isfile(upgrade_sql_sha256_file): + raise Exception( + ErrorCode.GAUSS_502["GAUSS_50201"] % upgrade_sql_sha256_file) + g_logger.debug( + "The SQL file is %s, the sha256 file is %s." % ( + upgrade_sql_gz_file, upgrade_sql_sha256_file)) + + g_logger.debug("Checking the SHA256 value of upgrade sql folder.") + sha256Actual = g_file.getFileSHA256(upgrade_sql_gz_file) + sha256Record = g_file.readFile(upgrade_sql_sha256_file) + if sha256Actual.strip() != sha256Record[0].strip(): + raise Exception(ErrorCode.GAUSS_516["GAUSS_51635"] + \ + " The SHA256 value is different: \nTar file: " + "%s \nSHA256 file: %s " % \ + (upgrade_sql_gz_file, upgrade_sql_sha256_file)) + + # extract it to binary backup path + # self.context.upgradeBackupPath just recreated at last step, + # it should not has upgrade_sql folder, so no need do clean + g_logger.debug("Extracting upgrade sql folder.") + g_file.decompressFiles(upgrade_sql_gz_file, g_opts.upgrade_bak_path) + g_logger.debug("Successfully prepared upgrade sql folder.") + + +def backupOldClusterDBAndRel(): + """ + backup old cluster db and rel info + get database list + connect to each cn and master dn + connect to each database, and get rel info + """ + g_logger.log("Backing up old cluster database and catalog.") + try: + InstanceList = [] + # find all instances need to do backup + if len(g_dbNode.coordinators) != 0: + InstanceList.append(g_dbNode.coordinators[0]) + primaryDnIntance = getLocalPrimaryDNInstance() + if primaryDnIntance: + InstanceList.extend(primaryDnIntance) + + # do backup parallelly + if len(InstanceList) != 0: + pool = ThreadPool(len(InstanceList)) + pool.map(backupOneInstanceOldClusterDBAndRel, InstanceList) + pool.close() + pool.join() + else: + g_logger.debug("No master instance found on this node, " + "nothing need to do.") + return + + g_logger.log("Successfully backed up old cluster database and catalog.") + except Exception as e: + g_logger.logExit(str(e)) + + +def getLocalPrimaryDNInstance(): + """ + function: Get local primary DN instance + input: NA + output: NA + """ + g_logger.log("We will find all primary dn instance in the local node.") + tmpFile = os.path.join(DefaultValue.getTmpDirFromEnv( + g_opts.user), const.TMP_DYNAMIC_DN_INFO) + primaryDNList = [] + try: + # Match query results and cluster configuration + clusterStatus = DbClusterStatus() + clusterStatus.initFromFile(tmpFile) + # Find the master DN instance + for dbNode in clusterStatus.dbNodes: + for instance in dbNode.datanodes: + if instance.status == 'Primary' and \ + instance.nodeId == g_dbNode.id: + for eachInstance in g_dbNode.datanodes: + if eachInstance.instanceId == instance.instanceId: + primaryDNList.append(eachInstance) + g_logger.log( + "Success get the primary dn instance:{0}.".format( + 
instance.__dict__)) + return primaryDNList + except Exception as er: + raise Exception(str(er)) + + +def backupOneInstanceOldClusterDBAndRel(instance): + """ + backup db and catalog info for one old cluster instance + do checkpoint + get database info list + remove template0 + connect each database, get catalog info + save to file + """ + tmpDir = DefaultValue.getTmpDirFromEnv(g_opts.user) + if tmpDir == "": + raise Exception(ErrorCode.GAUSS_518["GAUSS_51800"] % "$PGHOST") + g_logger.debug( + "Obtaining instance catalog information. Instance data dir: %s" % + instance.datadir) + dbInfoDict = {} + dbInfoDict["dblist"] = [] + dbInfoDict["dbnum"] = 0 + backup_path = "%s/oldClusterDBAndRel/" % g_opts.upgrade_bak_path + try: + # get database info + get_db_list_sql = """SELECT d.datname, d.oid, + pg_catalog.pg_tablespace_location(t.oid) AS spclocation + FROM pg_catalog.pg_database d LEFT OUTER JOIN + pg_catalog.pg_tablespace t ON d.dattablespace = t.oid ORDER BY 2;""" + g_logger.debug("Get database info command: \n%s" % get_db_list_sql) + (status, output) = ClusterCommand.execSQLCommand(get_db_list_sql, + g_opts.user, "", + instance.port, + "postgres", + False, "-m", + IsInplaceUpgrade=True) + if status != 0: + raise Exception(ErrorCode.GAUSS_513[ + "GAUSS_51300"] % get_db_list_sql + + " Error:\n%s" % output) + if output == "": + raise Exception("can not find any database!!") + g_logger.debug("Get database info result: \n%s." % output) + resList = output.split('\n') + for each_line in resList: + tmpDbInfo = initDbInfo() + (datname, oid, spclocation) = each_line.split('|') + tmpDbInfo['dbname'] = datname.strip() + tmpDbInfo['dboid'] = oid.strip() + tmpDbInfo['spclocation'] = spclocation.strip() + dbInfoDict["dblist"].append(tmpDbInfo) + dbInfoDict["dbnum"] += 1 + + # connect each database, get catalog info + get_catalog_list_sql =\ + """SELECT p.oid, n.nspname, p.relname, + pg_catalog.pg_relation_filenode(p.oid) AS relfilenode, + p.reltablespace, pg_catalog.pg_tablespace_location(t.oid) AS + spclocation FROM pg_catalog.pg_class p INNER JOIN + pg_catalog.pg_namespace n ON (p.relnamespace = n.oid) LEFT OUTER + JOIN pg_catalog.pg_tablespace t ON (p.reltablespace = t.oid) WHERE + p.oid < 16384 AND p.relkind IN ('r', 'i', 't') AND + p.relisshared= false AND p.relpersistence != 'u' ORDER BY 1;""" + g_logger.debug("Get catalog info command: \n%s" % get_catalog_list_sql) + for each_db in dbInfoDict["dblist"]: + # template0 need handle specially, skip it here + if each_db["dbname"] == 'template0': + continue + (status, output) = ClusterCommand.execSQLCommand( + get_catalog_list_sql, g_opts.user, "", instance.port, + each_db["dbname"], False, "-m", IsInplaceUpgrade=True) + if status != 0: + raise Exception(ErrorCode.GAUSS_513[ + "GAUSS_51300"] % get_catalog_list_sql + + " Error:\n%s" % output) + if output == "": + raise Exception("can not find any catalog!!") + g_logger.debug("Get catalog info result of %s: \n%s." 
% ( + each_db["dbname"], output)) + resList = output.split('\n') + for each_line in resList: + tmpCatalogInfo = initCatalogInfo() + (oid, nspname, relname, relfilenode, reltablespace, + spclocation) = each_line.split('|') + tmpCatalogInfo['oid'] = oid.strip() + tmpCatalogInfo['relname'] = relname.strip() + tmpCatalogInfo['relfilenode'] = relfilenode.strip() + each_db["CatalogList"].append(tmpCatalogInfo) + each_db["CatalogNum"] += 1 + + # save db and catlog info into file + instance_name = getInstanceName(instance) + if instance.instanceRole == INSTANCE_ROLE_COODINATOR: + # handle cn instance + cn_db_and_catalog_info_file_name = \ + "%s/cn_db_and_catalog_info_%s.json" % ( + backup_path, instance_name) + DbInfoStr = json.dumps(dbInfoDict, indent=2) + fp = open(cn_db_and_catalog_info_file_name, 'w') + fp.write(DbInfoStr) + fp.flush() + fp.close() + else: + # handle master dn instance + dn_db_and_catalog_info_file_name = \ + "%s/dn_db_and_catalog_info_%s.json" % ( + backup_path, instance_name) + DbInfoStr = json.dumps(dbInfoDict, indent=2) + fp = open(dn_db_and_catalog_info_file_name, 'w') + fp.write(DbInfoStr) + fp.flush() + fp.close() + + standbyInstLst = [] + peerInsts = g_clusterInfo.getPeerInstance(instance) + for i in range(len(peerInsts)): + if peerInsts[i].instanceType == DefaultValue.MASTER_INSTANCE\ + or peerInsts[i].instanceType == \ + DefaultValue.STANDBY_INSTANCE: + standbyInstLst.append(peerInsts[i]) + for standbyInstance in standbyInstLst: + cmd = "pscp -H %s %s %s" % ( + standbyInstance.hostname, dn_db_and_catalog_info_file_name, + dn_db_and_catalog_info_file_name) + g_logger.debug("exec cmd is: %s" % cmd) + (status, output) = DefaultValue.retryGetstatusoutput(cmd, 2, 5) + if status != 0: + raise Exception(ErrorCode.GAUSS_514[ + "GAUSS_51400"] % cmd + + "\nOutput:%s" % output) + + except Exception as e: + raise Exception(str(e)) + + g_logger.debug( + "Successfully obtained instance catalog information. 
" + "Instance data dir: %s" % instance.datadir) + + +def updateCatalog(): + """ + connect database and update catalog one by one + 1.get database list + 2.connect each database, and exec update sql/check sql + """ + g_logger.log("Updating catalog.") + try: + update_catalog_maindb_sql = "{0}/{1}_catalog_maindb_tmp.sql".format( + g_opts.upgrade_bak_path, g_opts.scriptType) + update_catalog_otherdb_sql = "{0}/{1}_catalog_otherdb_tmp.sql".format( + g_opts.upgrade_bak_path, + g_opts.scriptType) + check_upgrade_sql = "" + if "upgrade" == g_opts.scriptType: + check_upgrade_sql = "{0}/check_upgrade_tmp.sql".format( + g_opts.upgrade_bak_path) + if not os.path.isfile(check_upgrade_sql): + raise Exception( + ErrorCode.GAUSS_502["GAUSS_50210"] % check_upgrade_sql) + if not os.path.isfile(update_catalog_maindb_sql): + raise Exception( + ErrorCode.GAUSS_502["GAUSS_50210"] % update_catalog_maindb_sql) + if not os.path.isfile(update_catalog_otherdb_sql): + raise Exception( + ErrorCode.GAUSS_502["GAUSS_50210"] % update_catalog_otherdb_sql) + + # get database list + clusterNodes = g_clusterInfo.dbNodes + for dbNode in clusterNodes: + if len(dbNode.datanodes) == 0: + continue + dnInst = dbNode.datanodes[0] + primaryDnNode = DefaultValue.getPrimaryNode(g_opts.userProfile) + if dnInst.hostname not in primaryDnNode: + continue + break + reslines = get_database_list(dnInst) + + # connect each database, and exec update sql/check sql + maindb = "postgres" + otherdbs = reslines + otherdbs.remove("postgres") + # 1.handle maindb first + upgrade_one_database([maindb, dnInst.port, + update_catalog_maindb_sql, check_upgrade_sql]) + + # 2.handle otherdbs + upgrade_info = [] + for eachdb in otherdbs: + g_logger.debug("Updating catalog for database %s." % eachdb) + upgrade_info.append([eachdb, dnInst.port, + update_catalog_otherdb_sql, check_upgrade_sql]) + if len(upgrade_info) != 0: + pool = ThreadPool(1) + pool.map(upgrade_one_database, upgrade_info) + pool.close() + pool.join() + + g_logger.log("Successfully updated catalog.") + except Exception as e: + g_logger.logExit(str(e)) + + +def get_database_list(dnInst): + """ + get database list + :return: + """ + # get database list + sqlSelect = "select datname from pg_database;" + g_logger.debug("Command for getting database list: %s" % sqlSelect) + (status, output) = ClusterCommand.execSQLCommand( + sqlSelect, g_opts.user, "", dnInst.port, IsInplaceUpgrade=True) + g_logger.debug("The result of database list: %s." % output) + if 0 != status: + raise Exception(ErrorCode.GAUSS_513["GAUSS_51300"] % + sqlSelect + " Error:\n%s" % output) + if "" == output: + raise Exception( + "No database objects were found in the cluster!") + + reslines = (output.strip()).split('\n') + if (len(reslines) < 3 + or "template1" not in reslines + or "template0" not in reslines + or "postgres" not in reslines): + raise Exception( + "The database list is invalid:%s." 
% str(reslines))
+    return reslines
+
+
+def upgrade_one_database(upgrade_info):
+    """
+    upgrade catalog for one database
+    """
+    try:
+        db_name = upgrade_info[0]
+        port = upgrade_info[1]
+        update_catalog_file = upgrade_info[2]
+        check_upgrade_file = upgrade_info[3]
+
+        g_logger.debug("Updating catalog for database %s" % db_name)
+        execSQLFile(db_name, update_catalog_file, port)
+        if "" != check_upgrade_file:
+            execSQLFile(db_name, check_upgrade_file, port)
+    except Exception as e:
+        raise Exception(str(e))
+
+
+def execSQLFile(dbname, sqlFile, cn_port):
+    """
+    execute a sql file
+    """
+    gsql_cmd = ClusterCommand.getSQLCommandForInplaceUpgradeBackup(
+        cn_port, dbname.replace('$', '\$'))
+    cmd = "%s -X --echo-queries --set ON_ERROR_STOP=on -f %s" % (
+        gsql_cmd, sqlFile)
+    (status, output) = subprocess.getstatusoutput(cmd)
+    g_logger.debug("Catalog modification log for database %s:\n%s." % (
+        dbname, output))
+    if status != 0 or ClusterCommand.findErrorInSqlFile(sqlFile, output):
+        g_logger.debug(ErrorCode.GAUSS_514["GAUSS_51400"] % cmd)
+        raise Exception("Failed to update catalog. Error: %s" % str(output))
+
+
+def backupOldClusterCatalogPhysicalFiles():
+    """
+    backup old cluster catalog physical files
+    get database list
+    connect to each cn and dn,
+    connect to each database, and do backup
+    """
+    g_logger.log("Backing up old cluster catalog physical files.")
+    try:
+        InstanceList = []
+        # find all instances that need backup
+        if len(g_dbNode.coordinators) != 0:
+            InstanceList.append(g_dbNode.coordinators[0])
+        if len(g_dbNode.datanodes) != 0:
+            for eachInstance in g_dbNode.datanodes:
+                InstanceList.append(eachInstance)
+
+        # do backup in parallel
+        if len(InstanceList) != 0:
+            pool = ThreadPool(len(InstanceList))
+            pool.map(
+                backupOneInstanceOldClusterCatalogPhysicalFiles, InstanceList)
+            pool.close()
+            pool.join()
+        else:
+            g_logger.debug("No master instance found on this node, "
+                           "nothing needs to be done.")
+            return
+
+        g_logger.log(
+            "Successfully backed up old cluster catalog physical files.")
+    except Exception as e:
+        g_logger.logExit(str(e))
+
+
+def backupOneInstanceOldClusterCatalogPhysicalFiles(instance):
+    """
+    backup catalog physical files for one old cluster instance
+    read database and catalog info from file
+    connect each database, do backup
+    """
+    g_logger.debug("Backup instance catalog physical files and xlog. "
+                   "Instance data dir: %s" % instance.datadir)
+    try:
+        # backup list folder
+        __backup_global_dir(instance)
+
+        if instance.instanceRole == INSTANCE_ROLE_DATANODE and \
+                instance.instanceType == DUMMY_STANDBY_INSTANCE:
+            g_logger.debug("There is no need to back up catalog. "
+                           "Instance data dir: %s" % instance.datadir)
+            return
+        __backup_xlog_file(instance)
+        __backup_cbm_file(instance)
+        __backup_base_folder(instance)
+    except Exception as e:
+        raise Exception(str(e))
+
+    g_logger.debug(
+        "Successfully backed up instance catalog physical files and xlog. 
" + "Instance data dir: %s" % instance.datadir) + + +def __backup_global_dir(instance): + """ + """ + g_logger.debug("Start to back up global_dir") + try: + backup_dir_list = const.BACKUP_DIR_LIST_BASE + if float(g_opts.oldclusternum) < float(const.UPGRADE_VERSION_64bit_xid): + backup_dir_list.extend(const.BACKUP_DIR_LIST_64BIT_XID) + for name in backup_dir_list: + srcDir = "%s/%s" % (instance.datadir, name) + destDir = "%s_bak" % srcDir + if os.path.isdir(srcDir): + cpDirectory(srcDir, destDir) + g_logger.debug("Successfully backed up global_dir") + except Exception as e: + raise Exception(str(e)) + + +def __backup_xlog_file(instance): + """ + """ + try: + g_logger.debug("Backup instance xlog files. " + "Instance data dir: %s" % instance.datadir) + + # get Latest checkpoint location + pg_xlog_info = __get_latest_checkpoint_location(instance) + xlog_back_file = os.path.join( + instance.datadir, "pg_xlog", pg_xlog_info.get( + 'latest_checkpoint_redo_xlog_file')) + if not os.path.exists(xlog_back_file): + raise Exception("There is no xlog to backup for %d." + % instance.instanceId) + + xlog_dir = os.path.join(instance.datadir, "pg_xlog") + xlog_file_list = os.listdir(xlog_dir) + xlog_file_list.sort() + + backup_xlog_list = [] + for one_file in xlog_file_list: + if not os.path.isfile(os.path.join(xlog_dir, one_file)): + continue + if len(one_file) != 24: + continue + if one_file >= pg_xlog_info.get('latest_checkpoint_redo_xlog_file'): + backup_xlog_list.append(one_file) + + if len(backup_xlog_list) == 0: + raise Exception("There is no xlog to backup for %d." % + instance.instanceId) + + for one_file in backup_xlog_list: + src_file = os.path.join(xlog_dir, one_file) + dst_file = os.path.join(xlog_dir, one_file + "_upgrade_backup") + shutil.copy2(src_file, dst_file) + g_logger.debug("file {0} has been backed up to {1}".format( + src_file, dst_file)) + + xlog_backup_info = copy.deepcopy(pg_xlog_info) + xlog_backup_info['backup_xlog_list'] = backup_xlog_list + xlog_backup_info_target_file = os.path.join(xlog_dir, + const.XLOG_BACKUP_INFO) + g_file.createFileInSafeMode(xlog_backup_info_target_file) + with open(xlog_backup_info_target_file, "w") as fp: + json.dump(xlog_backup_info, fp) + + g_logger.debug("XLOG backup info:%s." % xlog_backup_info) + g_logger.debug("Successfully backuped instance xlog files. " + "Instance data dir: %s" % instance.datadir) + except Exception as e: + raise Exception(str(e)) + + +def __get_latest_checkpoint_location(instance): + try: + result = dict() + cmd = "pg_controldata '%s'" % instance.datadir + if g_opts.mpprcFile != "" and g_opts.mpprcFile is not None: + cmd = "source %s; %s" % (g_opts.mpprcFile, cmd) + (status, output) = DefaultValue.retryGetstatusoutput(cmd, 2, 5) + g_logger.debug("Command for get control data:%s.Output:\n%s." % ( + cmd, output)) + if status != 0: + raise Exception(ErrorCode.GAUSS_514["GAUSS_51400"] % cmd + + "\nOutput:%s" % output) + time_line_id = "" + latest_checkpoint_redo_location = "" + for one_line in output.split('\n'): + one_line = one_line.strip() + if len(one_line.split(':')) == 2: + if one_line.split(':')[0].strip() == \ + "Latest checkpoint's TimeLineID": + time_line_id = one_line.split(':')[1].strip() + elif one_line.split(':')[0].strip() == \ + "Latest checkpoint's REDO location": + latest_checkpoint_redo_location = \ + one_line.split(':')[1].strip() + if time_line_id != "" and latest_checkpoint_redo_location != "": + break + if time_line_id == "": + raise Exception( + "Failed to get Latest checkpoint's TimeLineID for %d." 
+        redo_log_id = latest_checkpoint_redo_location.split('/')[0]
+        redo_tmp_log_seg = latest_checkpoint_redo_location.split('/')[1]
+        if len(redo_tmp_log_seg) > 6:
+            redo_log_seg = redo_tmp_log_seg[0:-6]
+        else:
+            redo_log_seg = 0
+        latest_checkpoint_redo_xlog_file = \
+            "%08d%s%s" % (int(time_line_id, 16),
+                          str(redo_log_id).zfill(8), str(redo_log_seg).zfill(8))
+        result['latest_checkpoint_redo_location'] = \
+            latest_checkpoint_redo_location
+        result['time_line_id'] = time_line_id
+        result['latest_checkpoint_redo_xlog_file'] = \
+            latest_checkpoint_redo_xlog_file
+        g_logger.debug("%d(pg_xlog_info):%s." % (instance.instanceId, result))
+        return result
+    except Exception as e:
+        raise Exception(str(e))
+
+
+def __backup_cbm_file(instance):
+    """
+    back up the instance's pg_cbm directory to pg_cbm_back
+    """
+    try:
+        g_logger.debug("Backup instance cbm files. "
+                       "Instance data dir: %s" % instance.datadir)
+        cbm_back_dir = os.path.join(instance.datadir, "pg_cbm_back")
+        cmd = "rm -rf '%s' " % cbm_back_dir
+        (status, output) = DefaultValue.retryGetstatusoutput(cmd, 2, 5)
+        if status != 0:
+            raise Exception(ErrorCode.GAUSS_514["GAUSS_51400"] % cmd +
+                            "\nOutput:%s" % output)
+
+        cbm_dir = os.path.join(instance.datadir, "pg_cbm")
+        if not os.path.exists(cbm_dir):
+            g_logger.debug("There is no cbm dir to back up for %d."
+                           % instance.instanceId)
+            return
+
+        cpDirectory(cbm_dir, cbm_back_dir)
+        g_logger.debug("Successfully backed up instance cbm files. "
+                       "Instance data dir: %s" % instance.datadir)
+    except Exception as e:
+        raise Exception(str(e))
+
+
+def restoreOldClusterCatalogPhysicalFiles():
+    """
+    restore the old cluster catalog physical files:
+    get the database list,
+    connect to each cn and dn,
+    connect to each database, and do the restore
+    """
+    g_logger.log("Restoring old cluster catalog physical files.")
+    try:
+        InstanceList = []
+        # find all instances that need to be restored
+        if len(g_dbNode.datanodes) != 0:
+            for eachInstance in g_dbNode.datanodes:
+                InstanceList.append(eachInstance)
+
+        # do the restore in parallel
+        if len(InstanceList) != 0:
+            pool = ThreadPool(len(InstanceList))
+            pool.map(
+                restoreOneInstanceOldClusterCatalogPhysicalFiles, InstanceList)
+            pool.close()
+            pool.join()
+        else:
+            g_logger.debug("No master instance found on this node, "
+                           "nothing needs to be done.")
+            return
+
+        g_logger.log(
+            "Successfully restored old cluster catalog physical files.")
+    except Exception as e:
+        g_logger.logExit(str(e))
+
+
+def restoreOneInstanceOldClusterCatalogPhysicalFiles(instance):
+    """
+    restore the catalog physical files of one old cluster instance:
+    read the database and catalog info from file,
+    connect to each database, and do the restore
+    """
+    g_logger.debug("Restore instance catalog physical files. "
+                   "Instance data dir: %s" % instance.datadir)
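+    # A dummy-standby DN keeps no catalog data, so it only needs its pg_xlog
+    # emptied and its global dirs restored; every other instance also has
+    # its xlog, cbm and base files put back from the *_bak and
+    # *_upgrade_backup copies made during the backup phase.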
" + "Instance data dir: %s" % instance.datadir) + try: + # handle dummy standby dn instance first + if instance.instanceRole == INSTANCE_ROLE_DATANODE and \ + instance.instanceType == DUMMY_STANDBY_INSTANCE: + # clean pg_xlog folder of dummy standby dn instance and return + pg_xlog_dir = "%s/pg_xlog" % instance.datadir + cmd = "find '%s' -type f | xargs -r -n 100 rm -f" % pg_xlog_dir + DefaultValue.execCommandLocally(cmd) + + # restore list folder + __restore_global_dir(instance) + return + + __restore_global_dir(instance) + __restore_xlog_file(instance) + __restore_cbm_file(instance) + __restore_base_folder(instance) + except Exception as e: + raise Exception(str(e)) + + g_logger.debug("Successfully restored instance catalog physical files. " + "Instance data dir: %s" % instance.datadir) + + +def __restore_global_dir(instance): + """ + """ + try: + g_logger.debug("Start to restore global_dir") + backup_dir_list = const.BACKUP_DIR_LIST_BASE + const.BACKUP_DIR_LIST_64BIT_XID + for name in backup_dir_list: + srcDir = "%s/%s" % (instance.datadir, name) + destDir = "%s/%s_bak" % (instance.datadir, name) + if os.path.isdir(destDir): + cpDirectory(destDir, srcDir) + g_logger.debug("Successfully restored global_dir") + except Exception as e: + raise Exception(str(e)) + + +def __restore_xlog_file(instance): + """ + """ + try: + g_logger.debug("Restore instance xlog files. " + "Instance data dir: %s" % instance.datadir) + + # read xlog_backup_info + xlog_backup_info_file = os.path.join(instance.datadir, + "pg_xlog", const.XLOG_BACKUP_INFO) + if not os.path.exists(xlog_backup_info_file): + raise Exception( + ErrorCode.GAUSS_502["GAUSS_50201"] % xlog_backup_info_file) + + with open(xlog_backup_info_file, "r") as fp: + xlog_backup_info_str = fp.read() + xlog_backup_info = json.loads(xlog_backup_info_str) + + # clean new xlog after latest_checkpoint_xlog_file + xlog_dir = os.path.join(instance.datadir, "pg_xlog") + xlog_list = os.listdir(xlog_dir) + xlog_list.sort() + + for one_file in xlog_list: + xlog_path = os.path.join(xlog_dir, one_file) + if len(one_file) == 24 and one_file >= xlog_backup_info[ + 'latest_checkpoint_redo_xlog_file'] and \ + os.path.isfile(xlog_path): + g_logger.debug("%s:Removing %s." % ( + instance.instanceId, xlog_path)) + os.remove(xlog_path) + + # restore old xlog file + for one_file in xlog_backup_info['backup_xlog_list']: + src_file = os.path.join(xlog_dir, one_file + "_upgrade_backup") + dst_file = os.path.join(xlog_dir, one_file) + if os.path.exists(src_file): + g_logger.debug("%s:Restoring %s." % ( + instance.instanceId, dst_file)) + shutil.copy2(src_file, dst_file) + else: + raise Exception(ErrorCode.GAUSS_502["GAUSS_50201"] % src_file) + + g_logger.debug("Successfully restore instance xlog files. " + "Instance data dir: {0}".format(instance.datadir)) + except Exception as e: + raise Exception(str(e)) + + +def __restore_cbm_file(instance): + """ + """ + try: + g_logger.debug("restore instance cbm files. " + "Instance data dir: %s" % instance.datadir) + cbm_dir = os.path.join(instance.datadir, "pg_cbm") + cmd = "rm -rf '%s' " % cbm_dir + (status, output) = DefaultValue.retryGetstatusoutput(cmd, 2, 5) + if status != 0: + raise Exception(ErrorCode.GAUSS_514["GAUSS_51400"] % cmd + + "\nOutput:%s" % output) + + cbm_back_dir = os.path.join(instance.datadir, "pg_cbm_back") + if not os.path.exists(cbm_back_dir): + g_logger.debug("There is no cbm dir to restore for %d." 
+
+
+def cleanOldClusterCatalogPhysicalFiles():
+    """
+    clean the old cluster catalog physical file backups:
+    get the database list,
+    connect to each cn and dn,
+    connect to each database, and do the clean-up
+    """
+    g_logger.log("Cleaning old cluster catalog physical files.")
+    try:
+        # kill any pending processes that are still
+        # copying backup catalog physical files
+        killCmd = DefaultValue.killInstProcessCmd(
+            "backup_old_cluster_catalog_physical_files")
+        (status, output) = subprocess.getstatusoutput(killCmd)
+        if status != 0:
+            raise Exception(ErrorCode.GAUSS_514["GAUSS_51400"] % killCmd +
+                            "\nOutput:%s" % output)
+
+        InstanceList = []
+        # find all instances that need to be cleaned
+        if len(g_dbNode.datanodes) != 0:
+            for eachInstance in g_dbNode.datanodes:
+                InstanceList.append(eachInstance)
+
+        # do the clean-up in parallel
+        if len(InstanceList) != 0:
+            pool = ThreadPool(len(InstanceList))
+            pool.map(
+                cleanOneInstanceOldClusterCatalogPhysicalFiles, InstanceList)
+            pool.close()
+            pool.join()
+        else:
+            g_logger.debug("No master instance found on this node, "
+                           "nothing needs to be done.")
+            return
+
+        g_logger.log("Successfully cleaned old cluster catalog physical files.")
+    except Exception as e:
+        g_logger.logExit(str(e))
+
+
+def cleanOneInstanceOldClusterCatalogPhysicalFiles(instance):
+    """
+    clean the catalog physical file backups of one old cluster instance:
+    read the database and catalog info from file,
+    connect to each database, and do the clean-up
+    """
+    g_logger.debug("Clean up instance catalog backup. "
+                   "Instance data dir: %s" % instance.datadir)
+    try:
+        __clean_global_dir(instance)
+
+        if g_opts.rollback:
+            pg_csnlog_dir = os.path.join(instance.datadir, "pg_csnlog")
+            # on rollback, if the old cluster number is less than
+            # UPGRADE_VERSION_64bit_xid, remove the pg_csnlog directory
+            if float(g_opts.oldclusternum) < float(
+                    const.UPGRADE_VERSION_64bit_xid) and \
+                    os.path.isdir(pg_csnlog_dir):
+                g_file.removeDirectory(pg_csnlog_dir)
+        else:
+            pg_subtrans_dir = os.path.join(instance.datadir, "pg_subtrans")
+            # on commit, remove the pg_subtrans directory
+            if os.path.isdir(pg_subtrans_dir):
+                g_file.removeDirectory(pg_subtrans_dir)
+
+        if instance.instanceRole == INSTANCE_ROLE_DATANODE and \
+                instance.instanceType == DUMMY_STANDBY_INSTANCE:
+            g_logger.debug("There is no need to clean the catalog. "
+                           "Instance data dir: %s" % instance.datadir)
+            return
+
+        __clean_xlog_file(instance)
+        __clean_cbm_file(instance)
+        __clean_base_folder(instance)
+    except Exception as e:
+        raise Exception(str(e))
+
+    g_logger.debug("Successfully cleaned up instance catalog backup. "
+                   "Instance data dir: %s" % instance.datadir)
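+
+
+# Clean-up policy implemented above: on rollback to a cluster older than
+# const.UPGRADE_VERSION_64bit_xid the new pg_csnlog directory is dropped,
+# while on commit the now-obsolete pg_subtrans directory is dropped; in both
+# cases the helpers below remove the backup artifacts themselves
+# (*_bak dirs, *_upgrade_backup xlog copies, pg_cbm_back).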
" + "Instance data dir: %s" % instance.datadir) + + +def __clean_global_dir(instance): + """ + """ + # clean pg_internal.init* + g_logger.debug("Start to clean global_dir") + cmd = "rm -f %s/global/pg_internal.init*" % instance.datadir + DefaultValue.execCommandLocally(cmd) + + backup_dir_list = const.BACKUP_DIR_LIST_BASE + const.BACKUP_DIR_LIST_64BIT_XID + for name in backup_dir_list: + backup_dir = "%s/%s" % (instance.datadir, name) + cleanBackUpDir(backup_dir) + g_logger.debug("Successfully cleaned global_dir") + + +def __clean_xlog_file(instance): + """ + """ + # clean *.upgrade_backup files + cmd = "rm -f '%s'/pg_xlog/*_upgrade_backup && rm -f '%s'/pg_xlog/%s" % \ + (instance.datadir, instance.datadir, const.XLOG_BACKUP_INFO) + DefaultValue.execCommandLocally(cmd) + g_logger.debug("Successfully clean instance xlog files. " + "Instance data dir: {0}".format(instance.datadir)) + + +def __clean_cbm_file(instance): + """ + """ + # clean pg_cbm_back files + cbm_back_dir = os.path.join(instance.datadir, "pg_cbm_back") + cmd = "rm -rf '%s' " % cbm_back_dir + (status, output) = DefaultValue.retryGetstatusoutput(cmd, 2, 5) + if status != 0: + raise Exception(ErrorCode.GAUSS_514["GAUSS_51400"] % cmd + + "\nOutput:%s" % output) + g_logger.debug("Successfully clean instance cbm files. " + "Instance data dir: {0}".format(instance.datadir)) + + +def __clean_base_folder(instance): + """ + """ + g_logger.debug("Clean instance base folders. " + "Instance data dir: {0}".format(instance.datadir)) + backup_path = os.path.join(g_opts.upgrade_bak_path, "oldClusterDBAndRel") + # get instance name + instance_name = getInstanceName(instance) + # load db and catalog info from json file + if instance.instanceRole == INSTANCE_ROLE_COODINATOR: + db_and_catalog_info_file_name = \ + "%s/cn_db_and_catalog_info_%s.json" % (backup_path, instance_name) + elif instance.instanceRole == INSTANCE_ROLE_DATANODE: + if instance.instanceType == MASTER_INSTANCE or \ + instance.instanceType == STANDBY_INSTANCE: + db_and_catalog_info_file_name = \ + "%s/dn_db_and_catalog_info_%s.json" % ( + backup_path, instance_name) + else: + raise Exception("Invalid instance type:%s" % instance.instanceType) + else: + raise Exception("Invalid instance role:%s" % instance.instanceRole) + with open(db_and_catalog_info_file_name, 'r') as fp: + dbInfoStr = fp.read() + try: + dbInfoDict = json.loads(dbInfoStr) + except Exception as ee: + raise Exception(str(ee)) + + # clean base folder + for each_db in dbInfoDict["dblist"]: + if each_db["spclocation"] != "": + if each_db["spclocation"].startswith('/'): + tbsBaseDir = each_db["spclocation"] + else: + tbsBaseDir = "%s/pg_location/%s" % ( + instance.datadir, each_db["spclocation"]) + pg_catalog_base_dir = "%s/%s_%s/%d" % ( + tbsBaseDir, + DefaultValue.TABLESPACE_VERSION_DIRECTORY, + instance_name, + int(each_db["dboid"])) + else: + pg_catalog_base_dir = "%s/base/%d" % ( + instance.datadir, int(each_db["dboid"])) + + # for base folder, template0 need handle specially + if each_db["dbname"] == 'template0': + cmd = "rm -rf '%s_bak' && rm -f %s/pg_internal.init*" % \ + (pg_catalog_base_dir, pg_catalog_base_dir) + (status, output) = DefaultValue.retryGetstatusoutput(cmd, 2, 5) + if status != 0: + raise Exception(ErrorCode.GAUSS_514["GAUSS_51400"] % cmd + + "\nOutput:%s" % output) + g_logger.debug("{0} has been cleaned".format(pg_catalog_base_dir)) + continue + + # main/vm/fsm -- main.1 .. 
+        # the paths cannot be quoted in this command: the shell must be
+        # able to expand the wildcards
+        cmd = "rm -f %s/*_bak && rm -f %s/pg_internal.init*" % (
+            pg_catalog_base_dir, pg_catalog_base_dir)
+        g_logger.debug("{0} needs to be cleaned".format(pg_catalog_base_dir))
+        (status, output) = DefaultValue.retryGetstatusoutput(cmd, 2, 5)
+        if status != 0:
+            raise Exception(ErrorCode.GAUSS_514["GAUSS_51400"] % cmd +
+                            "\nOutput:%s" % output)
+    g_logger.debug("Successfully cleaned instance base folders. "
+                   "Instance data dir: {0}".format(instance.datadir))
+
+
+def replacePgprocFile():
+    """
+    function: replace the pg_proc data files with the pg_proc_temp data files
+    input: NA
+    output: NA
+    """
+    g_logger.log("Replace pg_proc file.")
+    try:
+        InstanceList = []
+        # find all DB instances whose pg_proc needs to be replaced
+        if len(g_dbNode.datanodes) != 0:
+            for eachInstance in g_dbNode.datanodes:
+                if (eachInstance.instanceType == MASTER_INSTANCE
+                        or eachInstance.instanceType == STANDBY_INSTANCE):
+                    InstanceList.append(eachInstance)
+
+        # replace pg_proc for each instance
+        if len(InstanceList) != 0:
+            pool = ThreadPool(len(InstanceList))
+            pool.map(replaceOneInstancePgprocFile, InstanceList)
+            pool.close()
+            pool.join()
+        else:
+            g_logger.debug(
+                "No instance found on this node, nothing needs to be done.")
+            return
+
+        g_logger.log(
+            "Successfully replaced the pg_proc files of all instances"
+            " on this node.")
+    except Exception as e:
+        g_logger.logExit(str(e))
+
+
+def replaceOneInstancePgprocFile(instance):
+    """
+    function: replace the pg_proc data files of this instance with the
+              pg_proc_temp data files
+    input: NA
+    output: NA
+    """
+    g_logger.debug("Replace instance pg_proc file. "
+                   "Instance data dir: %s" % instance.datadir)
+    pg_proc_mapping_file = os.path.join(g_opts.appPath,
+                                        'pg_proc_mapping.txt')
+    with open(pg_proc_mapping_file, 'r') as fp:
+        pg_proc_dict_str = fp.read()
+    proc_dict = eval(pg_proc_dict_str)
+    try:
+        # replace each pg_proc data file with its pg_proc_temp data file
+        for proc_file_path, pg_proc_temp_file_path in proc_dict.items():
+            pg_proc_data_file = \
+                os.path.join(instance.datadir, proc_file_path)
+            if not os.path.exists(pg_proc_data_file):
+                raise Exception(ErrorCode.GAUSS_502["GAUSS_50201"] %
+                                pg_proc_data_file)
+            pg_proc_temp_data_file = os.path.join(
+                instance.datadir, pg_proc_temp_file_path)
+            if not os.path.exists(pg_proc_temp_data_file):
+                raise Exception(ErrorCode.GAUSS_502["GAUSS_50201"] %
+                                pg_proc_temp_data_file)
+            g_file.removeFile(pg_proc_data_file)
+            g_file.cpFile(pg_proc_temp_data_file, pg_proc_data_file)
+
+    except Exception as e:
+        raise Exception(str(e))
+
+    g_logger.debug(
+        "Successfully replaced instance pg_proc file. Instance data dir: %s"
+        % instance.datadir)
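+
+
+# pg_proc_mapping.txt, consumed by replaceOneInstancePgprocFile above and
+# written by createPgprocPathMappingFile below, holds the str() of a dict
+# mapping each pg_proc-related data file to its *_temp counterpart and is
+# read back with eval(); for example (illustrative oids only):
+#   {'base/13253/1255': 'base/13253/17002', ...}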
+
+
+def createPgprocPathMappingFile():
+    """
+    create the mapping file between the pg_proc data file paths and the
+    pg_proc_temp_oids data file paths
+    :return:
+    """
+    g_logger.log("Create file to save mapping between pg_proc file path and"
+                 " pg_proc_temp_oids file path.")
+    clusterNodes = g_clusterInfo.dbNodes
+    dnInst = None
+    for dbNode in clusterNodes:
+        if len(dbNode.datanodes) == 0:
+            continue
+        dnInst = dbNode.datanodes[0]
+        primaryDnNode = DefaultValue.getPrimaryNode(g_opts.userProfile)
+        if dnInst.hostname not in primaryDnNode:
+            continue
+        break
+    database_list = get_database_list(dnInst)
+    pg_proc_list = ['pg_proc', 'pg_proc_oid_index',
+                    'pg_proc_proname_args_nsp_index']
+    pg_proc_temp_list = ['pg_proc_temp_oids', 'pg_proc_oid_index_temp',
+                         'pg_proc_proname_args_nsp_index_temp']
+    proc_file_path_list = []
+    pg_proc_temp_file_path_list = []
+    for eachdb in database_list:
+        for info in pg_proc_list:
+            pg_proc_file_path = getTableFilePath(info, dnInst, eachdb)
+            proc_file_path_list.append(pg_proc_file_path)
+        for temp_info in pg_proc_temp_list:
+            pg_proc_temp_file_path = getTableFilePath(temp_info, dnInst, eachdb)
+            pg_proc_temp_file_path_list.append(pg_proc_temp_file_path)
+    proc_dict = dict((proc_file_path, pg_proc_temp_file_path) for
+                     proc_file_path, pg_proc_temp_file_path in
+                     zip(proc_file_path_list, pg_proc_temp_file_path_list))
+    pg_proc_mapping_file = os.path.join(g_opts.appPath, 'pg_proc_mapping.txt')
+    with open(pg_proc_mapping_file, 'w') as fp:
+        fp.write(str(proc_dict))
+    g_logger.log(
+        "Successfully created file to save mapping between pg_proc file path"
+        " and pg_proc_temp_oids file path.")
+
+
+def getTableFilePath(tablename, dnInst, db_name):
+    """
+    get the data file path of a table by its oid
+    :return: relative file path of the table
+    """
+    sql = "select oid from pg_class where relname='%s';" % tablename
+    (status, output) = ClusterCommand.remoteSQLCommand(
+        sql, g_opts.user,
+        dnInst.hostname,
+        dnInst.port, False,
+        db_name,
+        IsInplaceUpgrade=True)
+    if status != 0:
+        raise Exception(ErrorCode.GAUSS_513["GAUSS_51300"] % sql +
+                        " Error: \n%s" % str(output))
+    table_oid = output.strip('\n')
+    g_logger.debug("Oid of table %s is %s" % (tablename, table_oid))
+    sql = "select pg_relation_filepath(%s);" % table_oid
+    (status, output) = ClusterCommand.remoteSQLCommand(
+        sql, g_opts.user,
+        dnInst.hostname,
+        dnInst.port, False,
+        db_name,
+        IsInplaceUpgrade=True)
+    if status != 0:
+        raise Exception(ErrorCode.GAUSS_513["GAUSS_51300"] % sql +
+                        " Error: \n%s" % str(output))
+    table_file_path = output.strip('\n')
+    g_logger.debug("File path of table %s is %s" % (
+        tablename, table_file_path))
+    return table_file_path
+
+
+def createNewCsvFile():
+    """
+    1. copy pg_proc info to a csv file
+    2. modify the csv file
+    3. create a new table and fill it from the csv file
+    :return:
+    """
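+    # Sketch of the rewrite performed here: pg_proc is exported WITH OIDS to
+    # tbl_pg_proc_oids.csv, two new boolean column headers ('proisagg' and
+    # 'proiswindow') are inserted after 'protransform', each data row gets
+    # two matching 'True' values, and the result is written to
+    # new_tbl_pg_proc_oids.csv, which is then copied to the peer instances.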
+    g_logger.log("Create new csv file.")
+    clusterNodes = g_clusterInfo.dbNodes
+    dnInst = None
+    for dbNode in clusterNodes:
+        if len(dbNode.datanodes) == 0:
+            continue
+        dnInst = dbNode.datanodes[0]
+        primaryDnNode = DefaultValue.getPrimaryNode(g_opts.userProfile)
+        if dnInst.hostname not in primaryDnNode:
+            continue
+        break
+    dndir = dnInst.datadir
+    pg_proc_csv_path = '%s/pg_copydir/tbl_pg_proc_oids.csv' % dndir
+    new_pg_proc_csv_path = '%s/pg_copydir/new_tbl_pg_proc_oids.csv' % dndir
+    sql = \
+        """copy pg_proc( proname, pronamespace, proowner, prolang,
+        procost, prorows, provariadic, protransform, prosecdef,
+        proleakproof, proisstrict, proretset, provolatile, pronargs,
+        pronargdefaults, prorettype, proargtypes, proallargtypes,
+        proargmodes, proargnames, proargdefaults, prosrc, probin,
+        proconfig, proacl, prodefaultargpos, fencedmode, proshippable,
+        propackage,prokind) WITH OIDS to '%s' delimiter ','
+        csv header;""" % pg_proc_csv_path
+    (status, output) = ClusterCommand.remoteSQLCommand(
+        sql, g_opts.user,
+        dnInst.hostname, dnInst.port, False,
+        DefaultValue.DEFAULT_DB_NAME, IsInplaceUpgrade=True)
+    if status != 0:
+        raise Exception(ErrorCode.GAUSS_513["GAUSS_51300"] % sql +
+                        " Error: \n%s" % str(output))
+    with open(pg_proc_csv_path, 'r') as pg_proc_csv_file:
+        pg_proc_csv_reader = csv.reader(pg_proc_csv_file)
+        pg_proc_csv_data = list(pg_proc_csv_reader)
+    header = pg_proc_csv_data[0]
+    header.insert(header.index('protransform') + 1, 'proisagg')
+    header.insert(header.index('protransform') + 2, 'proiswindow')
+    new_pg_proc_csv_data = []
+    new_pg_proc_csv_data.append(header)
+    pg_proc_data_info = pg_proc_csv_data[1:]
+    for i in range(2):
+        for info in pg_proc_data_info:
+            info.insert(header.index('protransform') + 2, 'True')
+    for info in pg_proc_data_info:
+        new_pg_proc_csv_data.append(info)
+    with open(new_pg_proc_csv_path, 'w') as f:
+        new_pg_proc_csv_writer = csv.writer(f)
+        for info in new_pg_proc_csv_data:
+            new_pg_proc_csv_writer.writerow(info)
+    # scp the csv file to the other nodes
+    standbyInstLst = []
+    peerInsts = g_clusterInfo.getPeerInstance(dnInst)
+    for i in range(len(peerInsts)):
+        if peerInsts[i].instanceType == DefaultValue.MASTER_INSTANCE \
+                or peerInsts[i].instanceType == \
+                DefaultValue.STANDBY_INSTANCE:
+            standbyInstLst.append(peerInsts[i])
+    for standbyInstance in standbyInstLst:
+        standbyCsvFilePath = \
+            '%s/pg_copydir/new_tbl_pg_proc_oids.csv' % standbyInstance.datadir
+        cmd = "pscp -H %s %s %s" % (
+            standbyInstance.hostname, new_pg_proc_csv_path,
+            standbyCsvFilePath)
+        g_logger.debug("Command for copying the csv file: %s" % cmd)
+        (status, output) = DefaultValue.retryGetstatusoutput(cmd, 2, 5)
+        if status != 0:
+            raise Exception(ErrorCode.GAUSS_514[
+                                "GAUSS_51400"] % cmd +
+                            "\nOutput:%s" % output)
+
+
 def checkAction():
     """
     function: check action
@@ -1984,7 +3175,10 @@ def checkAction():
     output : NA
     """
     if g_opts.action not in \
-            [const.ACTION_TOUCH_INIT_FILE, const.ACTION_SYNC_CONFIG,
+            [const.ACTION_TOUCH_INIT_FILE,
+             const.ACTION_UPDATE_CATALOG,
+             const.ACTION_BACKUP_OLD_CLUSTER_DB_AND_REL,
+             const.ACTION_SYNC_CONFIG,
             const.ACTION_BACKUP_CONFIG,
             const.ACTION_RESTORE_CONFIG,
             const.ACTION_INPLACE_BACKUP,
@@ -1995,7 +3189,14 @@
             const.ACTION_SWITCH_PROCESS,
             const.ACTION_SWITCH_BIN,
             const.ACTION_CLEAN_INSTALL_PATH,
-            const.ACTION_COPY_CERTS]:
+            const.ACTION_COPY_CERTS,
+            const.ACTION_UPGRADE_SQL_FOLDER,
+            const.ACTION_BACKUP_OLD_CLUSTER_CATALOG_PHYSICAL_FILES,
+            const.ACTION_RESTORE_OLD_CLUSTER_CATALOG_PHYSICAL_FILES,
+            const.ACTION_CLEAN_OLD_CLUSTER_CATALOG_PHYSICAL_FILES,
+            const.ACTION_REPLACE_PG_PROC_FILES,
+            const.ACTION_CREATE_PG_PROC_MAPPING_FILE,
+            const.ACTION_CREATE_NEW_CSV_FILE]:
         GaussLog.exitWithError(
             ErrorCode.GAUSS_500["GAUSS_50004"] % 't' +
             " Value: %s" % g_opts.action)
@@ -2027,7 +3228,21 @@ def main():
             const.ACTION_CHECK_GUC: checkGucValue,
             const.ACTION_BACKUP_HOTPATCH: backupHotpatch,
             const.ACTION_ROLLBACK_HOTPATCH: rollbackHotpatch,
-            const.ACTION_COPY_CERTS: copyCerts}
+            const.ACTION_COPY_CERTS: copyCerts,
+            const.ACTION_UPGRADE_SQL_FOLDER: prepareUpgradeSqlFolder,
+            const.ACTION_BACKUP_OLD_CLUSTER_DB_AND_REL:
+                backupOldClusterDBAndRel,
+            const.ACTION_UPDATE_CATALOG: updateCatalog,
+            const.ACTION_BACKUP_OLD_CLUSTER_CATALOG_PHYSICAL_FILES:
+                backupOldClusterCatalogPhysicalFiles,
+            const.ACTION_RESTORE_OLD_CLUSTER_CATALOG_PHYSICAL_FILES:
+                restoreOldClusterCatalogPhysicalFiles,
+            const.ACTION_CLEAN_OLD_CLUSTER_CATALOG_PHYSICAL_FILES:
+                cleanOldClusterCatalogPhysicalFiles,
+            const.ACTION_REPLACE_PG_PROC_FILES: replacePgprocFile,
+            const.ACTION_CREATE_PG_PROC_MAPPING_FILE:
+                createPgprocPathMappingFile,
+            const.ACTION_CREATE_NEW_CSV_FILE: createNewCsvFile}
         func = funcs[g_opts.action]
         func()
     except Exception as e: