398 lines
16 KiB
Python
398 lines
16 KiB
Python
# -*- coding:utf-8 -*-
|
|
#############################################################################
|
|
# Copyright (c) 2020 Huawei Technologies Co.,Ltd.
|
|
#
|
|
# openGauss is licensed under Mulan PSL v2.
|
|
# You can use this software according to the terms
|
|
# and conditions of the Mulan PSL v2.
|
|
# You may obtain a copy of Mulan PSL v2 at:
|
|
#
|
|
# http://license.coscl.org.cn/MulanPSL2
|
|
#
|
|
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OF ANY KIND,
|
|
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
|
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
|
# See the Mulan PSL v2 for more details.
|
|
# ----------------------------------------------------------------------------
|
|
# Description : omManagerImplOLAP.py is a utility to manage a Gauss200 cluster.
|
|
#############################################################################
|
|
import subprocess
|
|
import sys
|
|
import re
|
|
import time
|
|
import getpass
|
|
|
|
sys.path.append(sys.path[0] + "/../../../../")
|
|
from gspylib.common.DbClusterInfo import queryCmd
|
|
from gspylib.threads.SshTool import SshTool
|
|
from gspylib.common.ErrorCode import ErrorCode
|
|
from gspylib.common.DbClusterStatus import DbClusterStatus
|
|
from gspylib.common.Common import DefaultValue
|
|
from gspylib.common.OMCommand import OMCommand
|
|
from impl.om.OmImpl import OmImpl
|
|
from gspylib.os.gsfile import g_file
|
|
from base_utils.os.net_util import NetUtil
|
|
from base_utils.os.env_util import EnvUtil
|
|
from gspylib.component.DSS.dss_checker import DssConfig
|
|
|
|
|
|
|
|
|
|
###########################################
|
|
class OmImplOLAP(OmImpl):
|
|
"""
|
|
class: OmImplOLAP
|
|
"""
|
|
|
|
def __init__(self, OperationManager=None):
|
|
"""
|
|
function:class init
|
|
input:OperationManager
|
|
output:NA
|
|
"""
|
|
OmImpl.__init__(self, OperationManager)
|
|
|
|
# AP
|
|
def stopCluster(self):
|
|
"""
|
|
function:Stop cluster
|
|
input:NA
|
|
output:NA
|
|
"""
|
|
self.logger.log("Stopping the cluster.")
|
|
# Stop cluster in 300 seconds
|
|
cmd = "source %s; %s -t %d" % (
|
|
self.context.g_opts.mpprcFile, OMCommand.getLocalScript("Gs_Stop"),
|
|
DefaultValue.TIMEOUT_CLUSTER_STOP)
|
|
(status, output) = subprocess.getstatusoutput(cmd)
|
|
if (status != 0):
|
|
self.logger.log(
|
|
"Warning: Failed to stop cluster within 300 seconds,"
|
|
"stopping cluster again at immediate mode.")
|
|
cmd = "source %s; %s -m immediate -t %d" % (
|
|
self.context.g_opts.mpprcFile,
|
|
OMCommand.getLocalScript("Gs_Stop"),
|
|
DefaultValue.TIMEOUT_CLUSTER_STOP)
|
|
(status, output) = subprocess.getstatusoutput(cmd)
|
|
if (status != 0):
|
|
self.logger.log("The cmd is %s " % cmd)
|
|
raise Exception(
|
|
ErrorCode.GAUSS_516["GAUSS_51610"]
|
|
% "the cluster at immediate mode"
|
|
+ " Error: \n%s" % output)
|
|
|
|
self.logger.log("Successfully stopped the cluster.")
|
|
|
|
# AP
|
|
def startCluster(self):
|
|
"""
|
|
function:Start cluster
|
|
input:NA
|
|
output:NA
|
|
"""
|
|
self.logger.log("Starting the cluster.", "addStep")
|
|
# Delete cluster dynamic config if it is exist on all nodes
|
|
clusterDynamicConf = "%s/bin/cluster_dynamic_config" \
|
|
% self.oldClusterInfo.appPath
|
|
cmd = g_file.SHELL_CMD_DICT["deleteFile"] % (
|
|
clusterDynamicConf, clusterDynamicConf)
|
|
self.logger.debug(
|
|
"Command for removing the cluster dynamic configuration: %s."
|
|
% cmd)
|
|
self.sshTool.executeCommand(cmd)
|
|
# Start cluster in 300 seconds
|
|
cmd = "source %s; %s -t %s" % (
|
|
self.context.g_opts.mpprcFile,
|
|
OMCommand.getLocalScript("Gs_Start"),
|
|
DefaultValue.TIMEOUT_CLUSTER_START)
|
|
(status, output) = subprocess.getstatusoutput(cmd)
|
|
if (status != 0):
|
|
self.logger.debug("The cmd is %s " % cmd)
|
|
raise Exception(
|
|
ErrorCode.GAUSS_516["GAUSS_51607"]
|
|
% "the cluster" + " Error: \n%s" % output)
|
|
|
|
self.logger.log("Successfully started the cluster.", "constant")
|
|
|
|
##########################################################################
|
|
# Start Flow
|
|
##########################################################################
|
|
def getNodeId(self):
|
|
"""
|
|
function: get node Id
|
|
input: NA
|
|
output: NA
|
|
"""
|
|
clusterType = "cluster"
|
|
nodeId = 0
|
|
if (self.context.g_opts.nodeName != ""):
|
|
clusterType = "node"
|
|
dbNode = self.context.clusterInfo.getDbNodeByName(
|
|
self.context.g_opts.nodeName)
|
|
if not dbNode:
|
|
raise Exception(
|
|
ErrorCode.GAUSS_516["GAUSS_51619"]
|
|
% self.context.g_opts.nodeName)
|
|
nodeId = dbNode.id
|
|
elif (self.context.g_opts.azName != ""):
|
|
clusterType = self.context.g_opts.azName
|
|
# check whether the given azName is in the cluster
|
|
if (
|
|
self.context.g_opts.azName
|
|
not in self.context.clusterInfo.getazNames()):
|
|
raise Exception(
|
|
ErrorCode.GAUSS_500["GAUSS_50004"]
|
|
% '-az' + " The az name [%s] is not in the cluster."
|
|
% self.context.g_opts.azName)
|
|
return nodeId, clusterType
|
|
|
|
def doStartClusterByCm(self):
|
|
"""
|
|
function: start cluster by cm
|
|
:return: NA
|
|
"""
|
|
(nodeId, startType) = self.getNodeId()
|
|
if not self.context.cmCons[0]:
|
|
raise Exception(ErrorCode.GAUSS_516["GAUSS_51622"] %
|
|
("cm", "local"))
|
|
|
|
cluster_normal_status = [DbClusterStatus.CLUSTER_STATUS_NORMAL,
|
|
DbClusterStatus.CLUSTER_STATUS_DEGRADED]
|
|
|
|
if EnvUtil.is_dss_mode(self.context.g_opts.user):
|
|
cma_paths = DssConfig.get_cm_inst_path(
|
|
self.clusterInfo.dbNodes[nodeId])
|
|
if cma_paths and DssConfig.get_cma_res_value(
|
|
cma_paths[0], key='restart_delay') != str(
|
|
DssConfig.DMS_DEFAULT_RESTART_DELAY):
|
|
DssConfig.reload_cm_resource(
|
|
self.logger, timeout=DssConfig.DMS_DEFAULT_RESTART_DELAY)
|
|
if nodeId == 0 and self.dataDir:
|
|
raise Exception(ErrorCode.GAUSS_516["GAUSS_51655"] % ("cm", "-D"))
|
|
# start cluster
|
|
is_success = self.context.cmCons[0].startCluster(
|
|
self.context.g_opts.user,
|
|
nodeId,
|
|
self.context.g_opts.time_out,
|
|
False,
|
|
self.context.isSingle,
|
|
cluster_normal_status,
|
|
False,
|
|
self.context.g_opts.azName,
|
|
self.dataDir)
|
|
if is_success:
|
|
self.logger.log("Successfully started %s." % startType)
|
|
self.logger.debug("Operation succeeded: Start by cm.")
|
|
|
|
def doStartCluster(self):
|
|
"""
|
|
function: do start cluster
|
|
input: NA
|
|
output: NA
|
|
"""
|
|
self.logger.debug("Operating: Starting.")
|
|
# if has cm, will start cluster by cm_ctl command
|
|
if ((not self.context.clusterInfo.hasNoCm())
|
|
and DefaultValue.isgreyUpgradeNodeSpecify(self.context.user,
|
|
DefaultValue.GREY_UPGRADE_STEP_UPGRADE_PROCESS, None, self.context.logger)):
|
|
self.context.logger.debug("Have CM configuration, upgrade all"
|
|
" nodes together.")
|
|
self.doStartClusterByCm()
|
|
return
|
|
else:
|
|
self.context.logger.debug("Have CM configuration, rolling upgrade "
|
|
"partial node but not all nodes, so "
|
|
"start cluster with openGauss om.")
|
|
# Specifies the stop node
|
|
# Gets the specified node id
|
|
startType = "node" if self.context.g_opts.nodeName != "" else "cluster"
|
|
# Perform a start operation
|
|
self.logger.log("Starting %s." % startType)
|
|
self.logger.log("=========================================")
|
|
hostName = NetUtil.GetHostIpOrName()
|
|
# get the newest dynaminc config and send to other node
|
|
self.clusterInfo.checkClusterDynamicConfig(self.context.user, hostName)
|
|
if self.context.g_opts.nodeName == "":
|
|
hostList = self.clusterInfo.getClusterNodeNames()
|
|
else:
|
|
hostList = []
|
|
hostList.append(self.context.g_opts.nodeName)
|
|
self.sshTool = SshTool(self.clusterInfo.getClusterNodeNames(), None,
|
|
DefaultValue.TIMEOUT_CLUSTER_START)
|
|
if self.time_out is None:
|
|
time_out = DefaultValue.TIMEOUT_CLUSTER_START
|
|
else:
|
|
time_out = self.time_out
|
|
if self.context.g_opts.cluster_number:
|
|
cmd = "source %s; %s -U %s -R %s -t %s --security-mode=%s --cluster_number=%s" % (
|
|
self.context.g_opts.mpprcFile,
|
|
OMCommand.getLocalScript("Local_StartInstance"),
|
|
self.context.user, self.context.clusterInfo.appPath, time_out,
|
|
self.context.g_opts.security_mode, self.context.g_opts.cluster_number)
|
|
else:
|
|
cmd = "source %s; %s -U %s -R %s -t %s --security-mode=%s" % (
|
|
self.context.g_opts.mpprcFile,
|
|
OMCommand.getLocalScript("Local_StartInstance"),
|
|
self.context.user, self.context.clusterInfo.appPath, time_out,
|
|
self.context.g_opts.security_mode)
|
|
if self.dataDir != "":
|
|
cmd += " -D %s" % self.dataDir
|
|
failedOutput = ''
|
|
for nodeName in hostList:
|
|
(statusMap, output) = self.sshTool.getSshStatusOutput(cmd, [nodeName])
|
|
if statusMap[nodeName] != 'Success':
|
|
failedOutput += output
|
|
elif re.search("another server might be running", output):
|
|
self.logger.log(output)
|
|
elif re.search("] WARNING:", output):
|
|
tmp = '\n'.join(re.findall(".*] WARNING:.*", output))
|
|
self.logger.log(output[0:output.find(":")] + '\n' + tmp)
|
|
if len(failedOutput):
|
|
self.logger.log("=========================================")
|
|
raise Exception(
|
|
ErrorCode.GAUSS_536["GAUSS_53600"] % (cmd, failedOutput))
|
|
if startType == "cluster":
|
|
starttime = time.time()
|
|
cluster_state = ""
|
|
cmd = "source %s; gs_om -t status|grep cluster_state" \
|
|
% self.context.g_opts.mpprcFile
|
|
while time.time() <= 30 + starttime:
|
|
status, output = subprocess.getstatusoutput(cmd)
|
|
if status != 0:
|
|
raise Exception(
|
|
ErrorCode.GAUSS_516["GAUSS_51607"] % "cluster" +
|
|
" After startup, check cluster_state failed")
|
|
else:
|
|
cluster_state = output.split()[-1]
|
|
if cluster_state != "Normal":
|
|
self.logger.log("Waiting for check cluster state...")
|
|
time.sleep(5)
|
|
else:
|
|
break
|
|
if cluster_state != "Normal":
|
|
raise Exception(ErrorCode.GAUSS_516["GAUSS_51607"] % "cluster"
|
|
+ " After startup, the last check results were"
|
|
" %s. Please check manually."
|
|
% cluster_state)
|
|
self.logger.log("=========================================")
|
|
self.logger.log("Successfully started.")
|
|
self.logger.debug("Operation succeeded: Start.")
|
|
|
|
def doStopClusterByCm(self):
|
|
"""
|
|
function: stop cluster by cm
|
|
:return: None
|
|
"""
|
|
(nodeId, _) = self.getNodeId()
|
|
if not self.context.cmCons[0]:
|
|
raise Exception(ErrorCode.GAUSS_516["GAUSS_51622"] %
|
|
("cm", "local"))
|
|
if self.time_out is None:
|
|
time_out = DefaultValue.TIMEOUT_CLUSTER_STOP
|
|
else:
|
|
time_out = int(self.time_out)
|
|
if nodeId == 0 and self.dataDir:
|
|
raise Exception(ErrorCode.GAUSS_516["GAUSS_51655"] % ("cm", "-D"))
|
|
self.context.cmCons[0].stop_cluster((nodeId,
|
|
self.mode,
|
|
time_out,
|
|
self.dataDir,
|
|
self.context.g_opts.azName))
|
|
self.logger.debug("Operation succeeded: Stop by cm.")
|
|
|
|
def doStopCluster(self):
|
|
"""
|
|
function: do stop cluster
|
|
input: NA
|
|
output: NA
|
|
"""
|
|
self.logger.debug("Operating: Stopping.")
|
|
# if has cm, will start cluster by cm_ctl command
|
|
if not self.context.clusterInfo.hasNoCm():
|
|
self.doStopClusterByCm()
|
|
return
|
|
# Specifies the stop node
|
|
# Gets the specified node id
|
|
stop_type = "node" if self.context.g_opts.nodeName != "" else "cluster"
|
|
# Perform a stop operation
|
|
self.logger.log("Stopping %s." % stop_type)
|
|
self.logger.log("=========================================")
|
|
if self.context.g_opts.nodeName == "":
|
|
host_list = self.clusterInfo.getClusterNodeNames()
|
|
else:
|
|
host_list = []
|
|
host_list.append(self.context.g_opts.nodeName)
|
|
self.sshTool = SshTool(self.clusterInfo.getClusterNodeNames(), None,
|
|
DefaultValue.TIMEOUT_CLUSTER_START)
|
|
if self.time_out is None:
|
|
time_out = DefaultValue.TIMEOUT_CLUSTER_STOP
|
|
else:
|
|
time_out = self.time_out
|
|
cmd = "source %s; %s -U %s -R %s -t %s" % (
|
|
self.context.g_opts.mpprcFile,
|
|
OMCommand.getLocalScript("Local_StopInstance"),
|
|
self.context.user, self.context.clusterInfo.appPath, time_out)
|
|
if self.dataDir != "":
|
|
cmd += " -D %s" % self.dataDir
|
|
if self.mode != "":
|
|
cmd += " -m %s" % self.mode
|
|
(statusMap, output) = self.sshTool.getSshStatusOutput(cmd, host_list)
|
|
for nodeName in host_list:
|
|
if statusMap[nodeName] != 'Success':
|
|
raise Exception(
|
|
ErrorCode.GAUSS_536["GAUSS_53606"] % (cmd, output))
|
|
self.logger.log("Successfully stopped %s." % stop_type)
|
|
|
|
self.logger.log("=========================================")
|
|
self.logger.log("End stop %s." % stop_type)
|
|
self.logger.debug("Operation succeeded: Stop.")
|
|
|
|
def doView(self):
|
|
"""
|
|
function:get cluster node info
|
|
input:NA
|
|
output:NA
|
|
"""
|
|
# view static_config_file
|
|
self.context.clusterInfo.printStaticConfig(self.context.g_opts.outFile)
|
|
|
|
def doQuery(self):
|
|
"""
|
|
function: do query
|
|
input : NA
|
|
output : NA
|
|
"""
|
|
hostName = NetUtil.GetHostIpOrName()
|
|
dbNums = len(self.context.clusterInfo.dbNodes)
|
|
sshtools = []
|
|
for _ in range(dbNums - 1):
|
|
sshtools.append(SshTool([], timeout=self.time_out))
|
|
cmd = queryCmd()
|
|
if (self.context.g_opts.outFile != ""):
|
|
cmd.outputFile = self.context.g_opts.outFile
|
|
self.context.clusterInfo.queryClsInfo(hostName, sshtools,
|
|
self.context.mpprcFile, cmd)
|
|
|
|
def doRefreshConf(self):
|
|
"""
|
|
function: do refresh conf
|
|
input : NA
|
|
output : NA
|
|
"""
|
|
if self.context.clusterInfo.isSingleNode():
|
|
self.logger.log(
|
|
"No need to generate dynamic configuration file for one node.")
|
|
return
|
|
if DefaultValue.cm_exist_and_is_disaster_cluster(self.context.clusterInfo, self.logger):
|
|
self.logger.log(
|
|
"Streaming disaster cluster do not need to generate dynamic configuration.")
|
|
return
|
|
self.logger.log("Generating dynamic configuration file for all nodes.")
|
|
hostname = NetUtil.GetHostIpOrName()
|
|
sshtool = SshTool(self.context.clusterInfo.getClusterNodeNames())
|
|
self.context.clusterInfo.doRefreshConf(self.context.user, hostname,
|
|
sshtool)
|
|
|
|
self.logger.log("Successfully generated dynamic configuration file.")
|