Files

213 lines
7.6 KiB
Python

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
#############################################################################
# Copyright (c) 2023 Huawei Technologies Co.,Ltd.
#
# openGauss is licensed under Mulan PSL v2.
# You can use this software according to the terms
# and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
#
# http://license.coscl.org.cn/MulanPSL2
#
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS,
# WITHOUT WARRANTIES OF ANY KIND,
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.
# ----------------------------------------------------------------------------
# Description : gs_upgradechk is a utility to check meta data in gaussdb after upgrade.
#############################################################################
"""
校验组件模块,由三个模块进行配合,进行整体的流程。
collector:负责数据收集。连接数据库,构造校验规则Rule,并执行Rule进行数据的收集。
analyzer:负责数据分析。依据vmap,对输入的携带有数据的Rule进行分析,生成结论Conclusion
exporter:负责数据简化导出。将Rule的结果进行简化,记录到一个新的vmap内,最终导出生成vmap。
"""
from upgrade_checker.utils.param import Action
from upgrade_checker.rules.rule_maker import RuleMaker
from upgrade_checker.rules.vmap import VMapHeader, VerifyMap
from upgrade_checker.opengauss import og
from upgrade_checker.log import logger
class Collector(object):
"""
负责数据收集。连接数据库,构造校验规则Rule,并执行Rule进行数据的收集。
"""
@staticmethod
def prepare_db_list(param):
# 如果参数指明了则使用指明的数据库
if param.database.value is not None:
logger.info(f'仅操作参数指明的数据库:{param.database.value}')
search_db_sql = f"select datname from pg_database " \
f"where datname = '{param.database.value}'"
qres = og.query(search_db_sql)
if qres.row_count() == 0:
logger.err(f"未检测到数据库 {param.database.value}")
return [param.database.value]
# 否则自动检测所有库,且仅支持A库
if param.action == Action.VERIFY:
search_db_sql = "select datname from pg_database " \
"where datname not in ('template0', 'template1') and " \
" upper(datcompatibility) != 'A'"
qres = og.query(search_db_sql)
if qres.row_count() != 0:
ignore_list = ','.join([line[0] for line in qres])
logger.warning(f'暂不支持非A库的校验,{ignore_list} 将被跳过。')
search_db_sql = "select datname from pg_database " \
"where datname not in ('template0', 'template1') and " \
" upper(datcompatibility) = 'A'"
qres = og.query(search_db_sql)
if qres.row_count() == 0:
logger.err('没有需要校验的库。')
return [row[0] for row in qres]
elif param.action == Action.EXPORT:
search_db_sql = "select datname from pg_database " \
"where datname = 'postgres' and upper(datcompatibility) = 'A'"
qres = og.query(search_db_sql)
if qres.row_count() == 0:
logger.err('暂不支持非A兼容性的postgres库。')
logger.info('使用postgres库生成校验地图。')
return ['postgres']
else:
assert False
def __init__(self, db_list):
"""
:param db_list: 需要收集的数据库名称列表
"""
self._database_list = db_list
self._idx = 0
self._rule_buffer = []
self._estimate = None
logger.log('verifier.collector 数据库采集列表:[%s]。' % (', '.join(self._database_list)))
logger.log('verifier.collector 初始化完成。')
def __str__(self):
return 'Collector: dblist [{0}], current is {1}'.format(
', '.join(self._database_list),
self._database_list[self._idx - 1]
)
def __iter__(self):
return self
def __next__(self):
if len(self._rule_buffer) == 0 and self._collect_next_db() is None:
raise StopIteration
rule = self._rule_buffer.pop()
rule.run()
return rule
def _estimate_workload(self):
self._estimate = len(self._rule_buffer)
def _collect_next_db(self):
if self._idx >= len(self._database_list):
return
self._rule_buffer = RuleMaker.make_rules(self._database_list[self._idx])
self._estimate_workload()
logger.log('数据库%s的所有校验规则准备完毕,共计%d条,开始执行数据采集....' % (self._database_list[self._idx], len(self._rule_buffer)))
self._idx += 1
return self._rule_buffer
def current_db(self):
return self._database_list[self._idx - 1]
def check_next_rule(self):
if len(self._rule_buffer) == 0 and self._collect_next_db() is None:
return
rule = self._rule_buffer.pop()
rule.run()
return rule
def get_progress(self):
"""
返回当前采集的预估进度。
"""
finished_db_percentage = (self._idx - 1) / len(self._database_list) * 100.0
curr_db_percentage = (1.0 - len(self._rule_buffer) / self._estimate) / len(self._database_list) * 100.0
percentage = int(finished_db_percentage + curr_db_percentage)
if percentage < 0:
percentage = 0
elif percentage > 100:
percentage = 100
return percentage
class Analyzer(object):
"""
负责数据分析。依据vmap,对输入的携带有数据的Rule进行分析,生成结论Conclusion
"""
def __init__(self, vmap):
self.vmap = VerifyMap(vmap)
self.vmap.load()
VMapHeader.check_availability(self.vmap.head)
logger.log('verifier.analyzer 初始化完成。')
def __del__(self):
self.close()
def __str__(self):
return 'Analyzer: using vmap ({0})'.format(self.vmap.__str__())
def analyze(self, rule):
"""
在vmap内找到对应的规则预期输出,并进行分析处理。
:param rule:
:return:
"""
expect = self.vmap.get(rule.sql)
conclusion = rule.analyze(expect)
logger.debug('分析完毕一条规则,共计{0}条小项,{1}告警,规则内容为:{2}'.format(
len(conclusion.details),
len(conclusion.warnings),
rule.sql)
)
return conclusion
def close(self):
pass
class Exporter(object):
"""
负责数据简化导出。将Rule的结果进行简化,记录到一个新的vmap内,最终导出生成vmap。
"""
def __init__(self, export_vmap_file):
self.vmap = VerifyMap(export_vmap_file)
logger.log('verifier.exporter 初始化完成。')
def __str__(self):
return 'Exporter: export to vmap ({0})'.format(self.vmap.show_info())
def record(self, rule):
self.vmap.push(rule)
logger.debug('Exporter完成记录一条规则:%s' % rule.sql)
def export(self):
self.vmap.dump()
logger.debug('Exporter导出校验地图完成。')
def close(self):
self.vmap.dump()