gs_om -t status 查询,添加超时时间,以及改为并行查询

This commit is contained in:
zhang_xubo
2021-08-07 17:43:28 +08:00
parent c18b389990
commit f2f2e6010e
5 changed files with 113 additions and 49 deletions

View File

@ -36,6 +36,7 @@ sys.path.append(os.path.split(os.path.realpath(__file__))[0] + "/../../")
from gspylib.os.gsfile import g_file
from gspylib.common.ErrorCode import ErrorCode
from gspylib.common.VersionInfo import VersionInfo
from gspylib.threads.parallelTool import parallelTool
###########################
# instance role
@ -265,6 +266,8 @@ azName3 = "AZ3"
AZNMAE_LIST = [azName1, azName2, azName3]
DN_ROLE_MAP = {"Primary": "P", "Standby": "S", "Normal": "P", "Secondary": "R"}
# global param to cache gs_om query instance result.
global_cls_query_rst = {}
def InstanceIgnore_haPort(Object):
"""
@ -1633,7 +1636,55 @@ class dbClusterInfo():
except Exception as e:
raise Exception(ErrorCode.GAUSS_516["GAUSS_51652"] % str(e))
def queryClsInfo(self, hostName, sshtool, mpprcFile, cmd):
def queryClsInfoParallel(self, hostName, sshtools, mpprcFile, cmd):
"""
function : queryClsInfoParallel
Query cluster information in parallel.
input : String
output : Map
"""
dbInfoList = []
index = 0
for dbNode in self.dbNodes:
for dnInst in dbNode.datanodes:
sshtool = None
if (dbNode.name != hostName):
sshtool = sshtools[index]
index += 1
querycmd = "gs_ctl query -D %s" % dnInst.datadir
portcmd = "gs_guc check -D %s -c port" % dnInst.datadir
splitcmd = "echo %s" % "------------------------"
sshcmd = "%s;%s;%s" % (querycmd, splitcmd, portcmd)
dbName = dbNode.name
dbInfoList.append({
"name": dbName,
"command": sshcmd,
"sshtool": sshtool
})
def queryInstance(dbInfo):
dnName = dbInfo["name"]
command = dbInfo["command"]
sshtool = dbInfo["sshtool"]
status = 0
output = ""
if (dnName != hostName):
(statusMap, output) = sshtool.getSshStatusOutput(
command, [dnName], mpprcFile)
if statusMap[dnName] != 'Success':
status = -1
else:
(status, output) = subprocess.getstatusoutput(command)
global_cls_query_rst[dnName] = [status, output]
global global_cls_query_rst;
parallelTool.parallelExecute(queryInstance, dbInfoList)
return global_cls_query_rst
def queryClsInfo(self, hostName, sshtools, mpprcFile, cmd):
try:
clusterState = 'Normal'
roleStatusArray = []
@ -1647,47 +1698,27 @@ class dbClusterInfo():
primaryDbNum = 0
primaryDbState = ""
portMap = {}
queryClsResult = self.queryClsInfoParallel(hostName, sshtools, mpprcFile, cmd)
for dbNode in self.dbNodes:
for dnInst in dbNode.datanodes:
sshcmd = "gs_ctl query -D %s" % dnInst.datadir
output = ""
if (dbNode.name != hostName):
(statusMap, output) = sshtool.getSshStatusOutput(
sshcmd, [dbNode.name], mpprcFile)
if statusMap[dbNode.name] != 'Success' or \
output.find("exc_sql failed") > 0:
if output.find(
"could not connect to the local server") \
> 0 or output.find(
"Is server running") > 0:
roleStatus = "Down"
dbState = "Manually stopped"
else:
roleStatus = "Unknown"
dbState = "Unknown"
(status, output) = queryClsResult.get(dbNode.name)
if status != 0 or output.find("exc_sql failed") > 0:
if output.find(
"could not connect to the local server") \
> 0 or output.find(
"Is server running") > 0:
roleStatus = "Down"
dbState = "Manually stopped"
else:
res = re.findall(r'local_role\s*:\s*(\w+)', output)
roleStatus = res[0]
res = re.findall(r'db_state\s*:\s*(\w+)', output)
dbState = res[0]
roleStatus = "Unknown"
dbState = "Unknown"
else:
(status, output) = subprocess.getstatusoutput(sshcmd)
if status != 0 or output.find("exc_sql failed") > 0:
if output.find(
"could not connect to the local server") \
> 0 or output.find(
"Is server running") > 0:
roleStatus = "Down"
dbState = "Manually stopped"
else:
roleStatus = "Unknown"
dbState = "Unknown"
else:
res = re.findall(r'local_role\s*:\s*(\w+)', output)
roleStatus = res[0]
res = re.findall(r'db_state\s*:\s*(\w+)', output)
dbState = res[0]
res = re.findall(r'local_role\s*:\s*(\w+)', output)
roleStatus = res[0]
res = re.findall(r'db_state\s*:\s*(\w+)', output)
dbState = res[0]
if (dbState == "Need"):
detailInformation = re.findall(
@ -1719,9 +1750,7 @@ class dbClusterInfo():
clusterState = 'Degraded'
# get port info by guc
portMap[dbNode.name] = self.__getPortInfo(dbNode.name, \
hostName, dnInst.datadir, sshtool, mpprcFile)
portMap[dbNode.name] = self.__getPortInfoLocal(dbNode.name, queryClsResult)
if dnNodeCount == 1:
clusterState = "Unavailable" if dbState != "Normal" \
else "Normal"
@ -1800,6 +1829,27 @@ class dbClusterInfo():
except Exception as e:
raise Exception(ErrorCode.GAUSS_516["GAUSS_51652"] % str(e))
def __getPortInfoLocal(self, nodeName, queryClsResult):
"""
function: Get port info form guc result
"""
(status, output) = queryClsResult.get(nodeName)
if status != 0:
return None
DEFAULT_PORT = 5432
if not output:
return None
portpattern = re.compile("port=(\d+|NULL)")
result = portpattern.findall(output)
portvalue = ""
if len(result) > 0:
portvalue = result.pop()
if portvalue != "NULL":
return int(portvalue)
return DEFAULT_PORT
def __getPortInfo(self, nodeName, hostName, dnpath, sshtool, mpprcFile=""):
"""
function: Get port info form guc result

View File

@ -108,9 +108,9 @@ gs_om_restart = ["-t:", "-?", "--help", "-V", "--version", "-h:", "-I:",
"--time-out=", "--az=", "-l:", "--nodeId=", "-D:",
"--security-mode=", "--mode=", "-m:"]
gs_om_view = ["-t:", "-?", "--help", "-V", "--version", "-o:", "-l:"]
gs_om_query = ["-t:", "-?", "--help", "-V", "--version", "-o:", "-l:"]
gs_om_query = ["-t:", "-?", "--help", "-V", "--version", "-o:", "-l:", "--time-out="]
gs_om_status = ["-t:", "-?", "--help", "-V", "--version", "-h:", "-o:",
"--detail", "--all", "-l:", "--az="]
"--detail", "--all", "-l:", "--az=", "--time-out="]
gs_om_generateconf = ["-t:", "-?", "--help", "-V", "--version", "-X:",
"--distribute", "-l:"]
gs_om_cert = ["-t:", "-?", "--help", "-V", "--version", "-L", "-l:",