174 lines
5.3 KiB
Python
174 lines
5.3 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding:utf-8 -*-
|
|
#############################################################################
|
|
# Copyright (c) 2023 Huawei Technologies Co.,Ltd.
|
|
#
|
|
# openGauss is licensed under Mulan PSL v2.
|
|
# You can use this software according to the terms
|
|
# and conditions of the Mulan PSL v2.
|
|
# You may obtain a copy of Mulan PSL v2 at:
|
|
#
|
|
# http://license.coscl.org.cn/MulanPSL2
|
|
#
|
|
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OF ANY KIND,
|
|
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
|
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
|
# See the Mulan PSL v2 for more details.
|
|
# ----------------------------------------------------------------------------
|
|
# Description : gs_upgradechk is a utility to check meta data in gaussdb after upgrade.
|
|
#############################################################################
|
|
|
|
"""
|
|
openGauss查询模块,用于连接og进行sql查询,并将查询结果进行包装返回。
|
|
"""
|
|
|
|
import subprocess
|
|
from subprocess import Popen, PIPE
|
|
from upgrade_checker.log import logger
|
|
from upgrade_checker.utils.singleton import singleton
|
|
from upgrade_checker.utils.command import Shell
|
|
|
|
|
|
class QueryResult(object):
|
|
"""
|
|
og查询结果。用于将查询结果进行封装和处理。
|
|
需要注意的是,目前暂时不能处理结果行中包含换行的场景,会导致结果拆分错误,因此在设计查询SQL时应注意。
|
|
"""
|
|
def __init__(self, sql, data, err):
|
|
self._sql = sql # sql
|
|
self.errmsg = err if err != '' else None # 错误信息
|
|
self.attr = [] # 列名
|
|
self.data = [] # 结果
|
|
self._idx = 0 # 迭代输出结果的指针
|
|
|
|
if self.errmsg is not None:
|
|
logger.err("openGauss查询出错,\n查询命令:\n%s\n错误信息:\n%s" % (sql, err))
|
|
|
|
if data is None or len(data) == 0:
|
|
return
|
|
|
|
data = data.split('\n')
|
|
self.attr = [name.strip() for name in data[0].split('|')]
|
|
for row in data[2:-3]: # 删掉最后的行数和空行
|
|
self.data.append(tuple(col.strip() for col in row.split('|')))
|
|
|
|
def __getitem__(self, idx):
|
|
if idx >= len(self.data):
|
|
raise StopIteration
|
|
row = self.data[idx]
|
|
return row
|
|
|
|
def row_count(self):
|
|
return len(self.data)
|
|
|
|
def col_count(self):
|
|
return len(self.attr)
|
|
|
|
def size(self):
|
|
"""
|
|
:return: 行数, 列数
|
|
"""
|
|
return self.row_count(), self.col_count()
|
|
|
|
def output(self):
|
|
return self.attr, self.data
|
|
|
|
def value(self):
|
|
assert self.size() == (1, 1)
|
|
return self.data[0][0]
|
|
|
|
def reset_iterate(self):
|
|
self._idx = 0
|
|
|
|
def iterate(self, format_dict=False):
|
|
"""
|
|
迭代输出下一行,直接输出tuple,或者组装成dict进行输出
|
|
:return:
|
|
"""
|
|
if self._idx >= len(self.data):
|
|
self._idx = 0
|
|
return
|
|
|
|
if format_dict:
|
|
row = dict(zip(self.attr, self.data[self._idx]))
|
|
else:
|
|
row = self.data[self._idx]
|
|
|
|
return row
|
|
|
|
|
|
@singleton
|
|
class OpenGauss(object):
|
|
"""
|
|
openGauss连接模块,用于连接openGauss,执行查询、非查询语句。
|
|
"""
|
|
def __init__(self):
|
|
self._dbname = 'postgres'
|
|
self._port = 5432
|
|
self.version_info = ''
|
|
self.version = ''
|
|
self.nodename = ''
|
|
|
|
def __str__(self):
|
|
return '{0} [ RUN - nodename {1} port {2} ]'.format(
|
|
self.version_info,
|
|
self.nodename,
|
|
self._port)
|
|
|
|
def connect(self, dbname=None, port=None):
|
|
"""
|
|
链接数据库
|
|
:param dbname: database name
|
|
:param port: 端口号
|
|
:return:
|
|
"""
|
|
self._dbname = dbname if dbname is not None else self._dbname
|
|
self._port = port if port is not None else self._port
|
|
|
|
self.version_info = self.query('select version();').value()
|
|
parts = self.version_info.split(' ')
|
|
self.version = parts[1] if parts[0] == '(openGauss' else parts[3]
|
|
|
|
res = self.query('show pgxc_node_name;')
|
|
self.nodename = res.data[0][0]
|
|
|
|
logger.log('openGauss {0} {1} (port={2} db={3}) 连接成功.'.format(
|
|
self.version,
|
|
self.nodename,
|
|
self._port,
|
|
self._dbname))
|
|
return self
|
|
|
|
def query(self, sql, dbname=None, port=None):
|
|
"""
|
|
执行select,获取结果
|
|
:param sql: 查询语句
|
|
:param dbname: 指定的数据库名
|
|
:param port: 指定的端口号
|
|
:return:
|
|
"""
|
|
dbname = dbname if dbname is not None else self._dbname
|
|
port = port if port is not None else self._port
|
|
cmd = 'gsql -p {0} -d {1} -r'.format(port, dbname)
|
|
data, err = Shell.communicate(cmd, sql, check=False)
|
|
logger.debug('openGauss查询执行:{0}\n{1}'.format(cmd, sql))
|
|
return QueryResult(sql, data, err)
|
|
|
|
def execute(self, sql):
|
|
"""
|
|
执行非select语句。
|
|
:param sql:查询语句
|
|
:return:
|
|
"""
|
|
cmd = 'gsql -d %s -p %d -c "%s"' % (self._dbname, self._port, sql)
|
|
Shell.communicate(cmd, sql, check=True)
|
|
|
|
|
|
# 单例模式,全局仅一个
|
|
og = OpenGauss()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
pass
|