Files
openGauss-OM/script/impl/om/OLAP/OmImplOLAP.py
2023-02-24 10:19:41 +08:00

398 lines
16 KiB
Python

# -*- coding:utf-8 -*-
#############################################################################
# Copyright (c) 2020 Huawei Technologies Co.,Ltd.
#
# openGauss is licensed under Mulan PSL v2.
# You can use this software according to the terms
# and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
#
# http://license.coscl.org.cn/MulanPSL2
#
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS,
# WITHOUT WARRANTIES OF ANY KIND,
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.
# ----------------------------------------------------------------------------
# Description : OmImplOLAP.py is a utility to manage a Gauss200 cluster.
#############################################################################
import subprocess
import sys
import re
import time
import getpass
sys.path.append(sys.path[0] + "/../../../../")
from gspylib.common.DbClusterInfo import queryCmd
from gspylib.threads.SshTool import SshTool
from gspylib.common.ErrorCode import ErrorCode
from gspylib.common.DbClusterStatus import DbClusterStatus
from gspylib.common.Common import DefaultValue
from gspylib.common.OMCommand import OMCommand
from impl.om.OmImpl import OmImpl
from gspylib.os.gsfile import g_file
from base_utils.os.net_util import NetUtil
from base_utils.os.env_util import EnvUtil
from gspylib.component.DSS.dss_checker import DssConfig
###########################################
class OmImplOLAP(OmImpl):
"""
class: OmImplOLAP
"""
def __init__(self, OperationManager=None):
    """
    function: class init; delegates to OmImpl.__init__
    input: OperationManager - passed through unchanged to OmImpl.__init__
           (defaults to None)
    output: NA
    """
    # The base-class initializer is the only statement executed here.
    OmImpl.__init__(self, OperationManager)
# AP
def stopCluster(self):
    """
    function: Stop the cluster with the local Gs_Stop script. When the first
              attempt returns a non-zero status, retry once with
              "-m immediate".
    input: NA
    output: NA
    raises: Exception (GAUSS_51610) when the immediate-mode retry also
            returns a non-zero status
    """
    self.logger.log("Stopping the cluster.")

    # First attempt: default stop mode, -t set to TIMEOUT_CLUSTER_STOP.
    stopCmd = "source %s; %s -t %d" % (
        self.context.g_opts.mpprcFile,
        OMCommand.getLocalScript("Gs_Stop"),
        DefaultValue.TIMEOUT_CLUSTER_STOP)
    (retCode, cmdOutput) = subprocess.getstatusoutput(stopCmd)
    if retCode == 0:
        self.logger.log("Successfully stopped the cluster.")
        return

    # Second attempt: same timeout, but force immediate mode.
    self.logger.log(
        "Warning: Failed to stop cluster within 300 seconds,"
        "stopping cluster again at immediate mode.")
    stopCmd = "source %s; %s -m immediate -t %d" % (
        self.context.g_opts.mpprcFile,
        OMCommand.getLocalScript("Gs_Stop"),
        DefaultValue.TIMEOUT_CLUSTER_STOP)
    (retCode, cmdOutput) = subprocess.getstatusoutput(stopCmd)
    if retCode != 0:
        self.logger.log("The cmd is %s " % stopCmd)
        errPrefix = ErrorCode.GAUSS_516["GAUSS_51610"] \
            % "the cluster at immediate mode"
        raise Exception(errPrefix + " Error: \n%s" % cmdOutput)

    self.logger.log("Successfully stopped the cluster.")
# AP
def startCluster(self):
    """
    function: Remove <appPath>/bin/cluster_dynamic_config through
              self.sshTool, then start the cluster with the local
              Gs_Start script.
    input: NA
    output: NA
    raises: Exception (GAUSS_51607) when Gs_Start returns a non-zero status
    """
    self.logger.log("Starting the cluster.", "addStep")

    # Step 1: delete the cluster dynamic config; the path is derived from
    # the old cluster's appPath.
    dynamicConfPath = "%s/bin/cluster_dynamic_config" \
        % self.oldClusterInfo.appPath
    removeCmd = g_file.SHELL_CMD_DICT["deleteFile"] % (
        dynamicConfPath, dynamicConfPath)
    self.logger.debug(
        "Command for removing the cluster dynamic configuration: %s."
        % removeCmd)
    self.sshTool.executeCommand(removeCmd)

    # Step 2: start the cluster, -t set to TIMEOUT_CLUSTER_START.
    startCmd = "source %s; %s -t %s" % (
        self.context.g_opts.mpprcFile,
        OMCommand.getLocalScript("Gs_Start"),
        DefaultValue.TIMEOUT_CLUSTER_START)
    (retCode, cmdOutput) = subprocess.getstatusoutput(startCmd)
    if retCode != 0:
        self.logger.debug("The cmd is %s " % startCmd)
        errPrefix = ErrorCode.GAUSS_516["GAUSS_51607"] % "the cluster"
        raise Exception(errPrefix + " Error: \n%s" % cmdOutput)

    self.logger.log("Successfully started the cluster.", "constant")
##########################################################################
# Start Flow
##########################################################################
def getNodeId(self):
    """
    function: work out the target of the operation from the nodeName and
              azName options (nodeName takes precedence)
    input: NA
    output: (nodeId, clusterType)
            nodeName set -> (id of that node, "node")
            azName set   -> (0, azName)
            neither set  -> (0, "cluster")
    raises: Exception (GAUSS_51619) when nodeName matches no node;
            Exception (GAUSS_50004) when azName is not one of the
            cluster's az names
    """
    opts = self.context.g_opts

    # A node name wins over an az name.
    if opts.nodeName != "":
        dbNode = self.context.clusterInfo.getDbNodeByName(opts.nodeName)
        if not dbNode:
            raise Exception(
                ErrorCode.GAUSS_516["GAUSS_51619"] % opts.nodeName)
        return dbNode.id, "node"

    if opts.azName != "":
        # check whether the given azName is in the cluster
        knownAzNames = self.context.clusterInfo.getazNames()
        if opts.azName not in knownAzNames:
            raise Exception(
                ErrorCode.GAUSS_500["GAUSS_50004"]
                % '-az' + " The az name [%s] is not in the cluster."
                % opts.azName)
        return 0, opts.azName

    return 0, "cluster"
def doStartClusterByCm(self):
    """
    function: start the target returned by getNodeId() through the CM
              component self.context.cmCons[0]
    input: NA
    output: NA
    raises: Exception (GAUSS_51622) when self.context.cmCons[0] is falsy;
            Exception (GAUSS_51655) when self.dataDir is set while
            nodeId is 0
    """
    (nodeId, startType) = self.getNodeId()
    cmComponent = self.context.cmCons[0]
    if not cmComponent:
        raise Exception(ErrorCode.GAUSS_516["GAUSS_51622"] %
                        ("cm", "local"))
    opts = self.context.g_opts

    # Handed to startCluster below; presumably the cluster states that
    # count as a successful start - confirm against the CM component.
    acceptedStates = [DbClusterStatus.CLUSTER_STATUS_NORMAL,
                      DbClusterStatus.CLUSTER_STATUS_DEGRADED]

    if EnvUtil.is_dss_mode(opts.user):
        # NOTE(review): nodeId (a node id, 0 when no node was given) is
        # used here as an index into dbNodes - confirm this is intended.
        cma_paths = DssConfig.get_cm_inst_path(
            self.clusterInfo.dbNodes[nodeId])
        if cma_paths:
            currentDelay = DssConfig.get_cma_res_value(
                cma_paths[0], key='restart_delay')
            # Reload the CM resource only when restart_delay differs from
            # the DMS default.
            if currentDelay != str(DssConfig.DMS_DEFAULT_RESTART_DELAY):
                DssConfig.reload_cm_resource(
                    self.logger,
                    timeout=DssConfig.DMS_DEFAULT_RESTART_DELAY)

    # A data directory is rejected when no specific node was selected.
    if nodeId == 0 and self.dataDir:
        raise Exception(ErrorCode.GAUSS_516["GAUSS_51655"] % ("cm", "-D"))

    # start cluster
    started = cmComponent.startCluster(
        opts.user,
        nodeId,
        opts.time_out,
        False,
        self.context.isSingle,
        acceptedStates,
        False,
        opts.azName,
        self.dataDir)
    if started:
        self.logger.log("Successfully started %s." % startType)
    self.logger.debug("Operation succeeded: Start by cm.")
def doStartCluster(self):
    """
    function: start the whole cluster, or the single node named by the
              nodeName option
    input: NA
    output: NA

    Flow:
      1. When the cluster has CM and DefaultValue.isgreyUpgradeNodeSpecify
         returns a true value, delegate to doStartClusterByCm() and return.
      2. Otherwise run the Local_StartInstance script over ssh, one target
         host at a time, collecting the output of hosts that failed.
      3. For a whole-cluster start, poll "gs_om -t status" for up to
         30 seconds until cluster_state is "Normal".

    raises: Exception (GAUSS_53600) when at least one host failed to start;
            Exception (GAUSS_51607) when the state query fails or the
            state is not "Normal" once polling ends
    """
    self.logger.debug("Operating: Starting.")
    # if has cm, will start cluster by cm_ctl command
    if ((not self.context.clusterInfo.hasNoCm())
            and DefaultValue.isgreyUpgradeNodeSpecify(
                self.context.user,
                DefaultValue.GREY_UPGRADE_STEP_UPGRADE_PROCESS,
                None, self.context.logger)):
        self.context.logger.debug("Have CM configuration, upgrade all"
                                  " nodes together.")
        self.doStartClusterByCm()
        return
    # NOTE(review): this message is also emitted when hasNoCm() is true,
    # i.e. when there is no CM configuration at all.
    self.context.logger.debug("Have CM configuration, rolling upgrade "
                              "partial node but not all nodes, so "
                              "start cluster with openGauss om.")

    opts = self.context.g_opts
    startType = "node" if opts.nodeName != "" else "cluster"
    # Perform a start operation
    self.logger.log("Starting %s." % startType)
    self.logger.log("=========================================")
    hostName = NetUtil.GetHostIpOrName()
    # get the newest dynamic config and send to other node
    self.clusterInfo.checkClusterDynamicConfig(self.context.user, hostName)

    if opts.nodeName == "":
        hostList = self.clusterInfo.getClusterNodeNames()
    else:
        hostList = [opts.nodeName]
    # The ssh tool always covers every cluster node, even for a
    # single-node start.
    self.sshTool = SshTool(self.clusterInfo.getClusterNodeNames(), None,
                           DefaultValue.TIMEOUT_CLUSTER_START)

    if self.time_out is None:
        time_out = DefaultValue.TIMEOUT_CLUSTER_START
    else:
        time_out = self.time_out

    # Build the start command once; optional switches are appended.
    cmd = "source %s; %s -U %s -R %s -t %s --security-mode=%s" % (
        opts.mpprcFile,
        OMCommand.getLocalScript("Local_StartInstance"),
        self.context.user, self.context.clusterInfo.appPath, time_out,
        opts.security_mode)
    if opts.cluster_number:
        cmd += " --cluster_number=%s" % opts.cluster_number
    if self.dataDir != "":
        cmd += " -D %s" % self.dataDir

    # Start the hosts one by one so every failure is collected before
    # raising.
    failedOutput = ''
    for nodeName in hostList:
        (statusMap, output) = self.sshTool.getSshStatusOutput(
            cmd, [nodeName])
        if statusMap[nodeName] != 'Success':
            failedOutput += output
        elif re.search("another server might be running", output):
            self.logger.log(output)
        elif re.search("] WARNING:", output):
            # Log the text before the first ':' followed by the WARNING
            # lines only.
            tmp = '\n'.join(re.findall(".*] WARNING:.*", output))
            self.logger.log(output[0:output.find(":")] + '\n' + tmp)
    if failedOutput:
        self.logger.log("=========================================")
        raise Exception(
            ErrorCode.GAUSS_536["GAUSS_53600"] % (cmd, failedOutput))

    if startType == "cluster":
        # A monotonic clock keeps the 30 second window correct even if
        # the system wall clock is adjusted while waiting.
        deadline = time.monotonic() + 30
        cluster_state = ""
        statusCmd = "source %s; gs_om -t status|grep cluster_state" \
            % opts.mpprcFile
        while time.monotonic() <= deadline:
            status, output = subprocess.getstatusoutput(statusCmd)
            if status != 0:
                raise Exception(
                    ErrorCode.GAUSS_516["GAUSS_51607"] % "cluster" +
                    " After startup, check cluster_state failed")
            # The state is the last whitespace-separated token of the
            # grep output.
            cluster_state = output.split()[-1]
            if cluster_state == "Normal":
                break
            self.logger.log("Waiting for check cluster state...")
            time.sleep(5)
        if cluster_state != "Normal":
            raise Exception(ErrorCode.GAUSS_516["GAUSS_51607"] % "cluster"
                            + " After startup, the last check results were"
                              " %s. Please check manually."
                            % cluster_state)

    self.logger.log("=========================================")
    self.logger.log("Successfully started.")
    self.logger.debug("Operation succeeded: Start.")
def doStopClusterByCm(self):
    """
    function: stop the target returned by getNodeId() through the CM
              component self.context.cmCons[0]
    input: NA
    output: None
    raises: Exception (GAUSS_51622) when self.context.cmCons[0] is falsy;
            Exception (GAUSS_51655) when self.dataDir is set while
            nodeId is 0
    """
    nodeId = self.getNodeId()[0]
    cmComponent = self.context.cmCons[0]
    if not cmComponent:
        raise Exception(ErrorCode.GAUSS_516["GAUSS_51622"] %
                        ("cm", "local"))

    # Fall back to the default stop timeout when none was given.
    time_out = (DefaultValue.TIMEOUT_CLUSTER_STOP
                if self.time_out is None else int(self.time_out))

    # A data directory is rejected when no specific node was selected.
    if nodeId == 0 and self.dataDir:
        raise Exception(ErrorCode.GAUSS_516["GAUSS_51655"] % ("cm", "-D"))

    # stop_cluster receives its parameters packed into ONE tuple argument.
    stopArgs = (nodeId,
                self.mode,
                time_out,
                self.dataDir,
                self.context.g_opts.azName)
    cmComponent.stop_cluster(stopArgs)
    self.logger.debug("Operation succeeded: Stop by cm.")
def doStopCluster(self):
    """
    function: stop the whole cluster, or the single node named by the
              nodeName option
    input: NA
    output: NA
    raises: Exception (GAUSS_53606) when any target host does not report
            'Success' for the Local_StopInstance command
    """
    self.logger.debug("Operating: Stopping.")
    # With CM configured, the stop is delegated to the CM component.
    if not self.context.clusterInfo.hasNoCm():
        self.doStopClusterByCm()
        return

    opts = self.context.g_opts
    wholeCluster = opts.nodeName == ""
    stop_type = "cluster" if wholeCluster else "node"

    # Perform a stop operation
    self.logger.log("Stopping %s." % stop_type)
    self.logger.log("=========================================")
    if wholeCluster:
        host_list = self.clusterInfo.getClusterNodeNames()
    else:
        host_list = [opts.nodeName]

    # NOTE(review): the START timeout constant is passed here on the stop
    # path - confirm this is intended.
    self.sshTool = SshTool(self.clusterInfo.getClusterNodeNames(), None,
                           DefaultValue.TIMEOUT_CLUSTER_START)

    time_out = (DefaultValue.TIMEOUT_CLUSTER_STOP
                if self.time_out is None else self.time_out)

    # Base command first, then the optional -D / -m switches.
    cmdParts = ["source %s; %s -U %s -R %s -t %s" % (
        opts.mpprcFile,
        OMCommand.getLocalScript("Local_StopInstance"),
        self.context.user, self.context.clusterInfo.appPath, time_out)]
    if self.dataDir != "":
        cmdParts.append(" -D %s" % self.dataDir)
    if self.mode != "":
        cmdParts.append(" -m %s" % self.mode)
    cmd = "".join(cmdParts)

    # One ssh round for all targets; the first host that is not
    # 'Success' aborts the operation.
    (statusMap, output) = self.sshTool.getSshStatusOutput(cmd, host_list)
    if any(statusMap[nodeName] != 'Success' for nodeName in host_list):
        raise Exception(
            ErrorCode.GAUSS_536["GAUSS_53606"] % (cmd, output))

    self.logger.log("Successfully stopped %s." % stop_type)
    self.logger.log("=========================================")
    self.logger.log("End stop %s." % stop_type)
    self.logger.debug("Operation succeeded: Stop.")
def doView(self):
    """
    function: print the static configuration via
              clusterInfo.printStaticConfig, passing the outFile option
    input: NA
    output: NA
    """
    # view static_config_file
    targetFile = self.context.g_opts.outFile
    self.context.clusterInfo.printStaticConfig(targetFile)
def doQuery(self):
    """
    function: query cluster information through clusterInfo.queryClsInfo
    input : NA
    output : NA
    """
    localHost = NetUtil.GetHostIpOrName()

    # One SshTool per node except one (len(dbNodes) - 1 in total), each
    # created with an empty host list and the configured timeout.
    nodeCount = len(self.context.clusterInfo.dbNodes)
    sshtools = [SshTool([], timeout=self.time_out)
                for _ in range(nodeCount - 1)]

    query = queryCmd()
    outFile = self.context.g_opts.outFile
    if outFile != "":
        query.outputFile = outFile

    self.context.clusterInfo.queryClsInfo(localHost, sshtools,
                                          self.context.mpprcFile, query)
def doRefreshConf(self):
    """
    function: generate the dynamic configuration file for all nodes;
              skipped for a single-node cluster and when
              DefaultValue.cm_exist_and_is_disaster_cluster is true
    input : NA
    output : NA
    """
    clusterInfo = self.context.clusterInfo

    if clusterInfo.isSingleNode():
        self.logger.log(
            "No need to generate dynamic configuration file for one node.")
        return

    isDisasterCluster = DefaultValue.cm_exist_and_is_disaster_cluster(
        clusterInfo, self.logger)
    if isDisasterCluster:
        self.logger.log(
            "Streaming disaster cluster do not need to generate dynamic configuration.")
        return

    self.logger.log("Generating dynamic configuration file for all nodes.")
    localHost = NetUtil.GetHostIpOrName()
    nodeSsh = SshTool(self.context.clusterInfo.getClusterNodeNames())
    self.context.clusterInfo.doRefreshConf(self.context.user, localHost,
                                           nodeSsh)
    self.logger.log("Successfully generated dynamic configuration file.")