Files

294 lines
9.0 KiB
Python

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
#############################################################################
# Copyright (c) 2023 Huawei Technologies Co.,Ltd.
#
# openGauss is licensed under Mulan PSL v2.
# You can use this software according to the terms
# and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
#
# http://license.coscl.org.cn/MulanPSL2
#
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS,
# WITHOUT WARRANTIES OF ANY KIND,
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.
# ----------------------------------------------------------------------------
# Description : gs_upgradechk is a utility to check meta data in gaussdb after upgrade.
#############################################################################
from enum import Enum
from upgrade_checker.opengauss import og
from upgrade_checker.log import logger
from upgrade_checker.rules.category import Category
from upgrade_checker.rules.meta import Accuracy
class Rule(object):
"""
单条检测SQL。
所有的检测sql都通过这个结构进行整理、检测、分析。
通过此数据结构来进行检测SQL的执行、结果的整理,检测内容、结果描述信息的整理生成。
"""
def __init__(self, sql, sql_desc="详见sql", key_desc="内容为%s的行", accuracy=Accuracy.STRICT,
callback=None):
"""
:param sql: 校验sql
:param sql_desc: 校验sql的描述,结尾不要带句号等标点符号
:param key_desc: 键值的描述,需要带一个%s用于格式匹配,结尾不要带句号等标点符号
:param accuracy: 校验精度
:param callback: 回调函数,用于对结果做一些处理
"""
self.sql = sql
self.accuracy = accuracy
self.sql_desc = sql_desc
self.key_desc = key_desc
self.callback = callback
self.result = {} # sql res key, val
def run(self):
"""
执行检查、重新执行检察。
:return:
"""
qres = og.query(self.sql)
for row in qres.data:
if self.result.get(row[0]) is not None:
logger.warning('在规则%s的查询执行中,发现重复键值%s' % (self.sql, row[0]))
if qres.col_count() == 1:
self.result[row[0]] = row[0]
else:
assert qres.col_count() == 2
self.result[row[0]] = row[1]
if self.callback is not None:
self.callback.run(self)
logger.debug('规则查询完成,共得到%d条结果,规则内容:%s' % (len(self.result), self.sql))
return self.result
def simplify(self):
"""
:return:
"""
return {
'sql': self.sql,
'accuracy': self.accuracy.name,
'sql_desc': self.sql_desc,
'key_desc': self.key_desc,
'result': self.result
}
def analyze(self, expect):
"""
:param expect:
:return:
"""
return Conclusion(self, expect)
class ConclusionState(Enum):
SUCCESS = 0
FAILED = 1
IGNORE = 2
class Conclusion(object):
"""
表示一个rule的分析结论。
"""
def __init__(self, rule, expect):
"""
:param rule:
:param expect:
"""
assert expect is None or rule.sql == expect['sql']
self.rule = rule
self.expect = expect
self.details = []
self.warnings = []
self.suc_count = 0
self.err_count = 0
self._analyze(rule, expect)
def all_success(self):
return self.err_count == 0
def _analyze_one_row(self, key, e_key, row_desc, val, e_val, accuracy):
"""
:param key:
:param e_key:
:param row_desc:
:param val:
:param e_val:
:param accuracy:
:return:
"""
detail = {
'state': ConclusionState.SUCCESS,
'summary': '成功'
}
if key is None:
detail['state'] = ConclusionState.FAILED
detail['summary'] = '%s 缺失' % row_desc
elif e_key is None:
if accuracy == Accuracy.ALLOW_MORE:
detail['state'] = ConclusionState.SUCCESS
detail['summary'] = '%s 允许冗余,校验成功' % row_desc
else:
detail['state'] = ConclusionState.FAILED
detail['summary'] = '%s 冗余' % row_desc
elif val != e_val:
detail['state'] = ConclusionState.FAILED
detail['summary'] = '%s 错误(预期%s 实际%s' % (row_desc, e_val, val)
else:
detail['state'] = ConclusionState.SUCCESS
detail['summary'] = '%s 校验成功' % row_desc
if detail['state'] == ConclusionState.SUCCESS:
self.suc_count += 1
else:
self.err_count += 1
self.details.append(detail)
def _analyze(self, rule, expect):
"""
:param rule:
:param expect:
:return:
"""
# 是否能找到这条规则的预期
if expect is None:
self.warnings = ['规则%s在校验地图内不存在。可能是由于vmap与工具不匹配、系统表结构升级不正确等原因导致,请手动排查。' % rule.sql]
return
# 以rule为基础,对照expect进行判断
for row_key, row_val in rule.result.items():
row_desc = rule.key_desc % row_key
erow_val = expect['result'].get(row_key)
erow_key = row_key if erow_val is not None else None
self._analyze_one_row(row_key, erow_key, row_desc, row_val, erow_val, rule.accuracy)
# 找到expect里有但实际没有的
for erow_key, erow_val in expect['result'].items():
row_val = rule.result.get(erow_key)
if row_val is not None:
continue
row_desc = expect['key_desc'] % erow_key
self._analyze_one_row(None, erow_key, row_desc, '', erow_val, rule.accuracy)
class StructRule(Rule):
"""
系统表结构校验规则。
"""
def __init__(self, rel_key, category, sql, sql_desc="详见sql", key_desc="内容为%s的行",
accuracy=Accuracy.STRICT, callback=None):
super(StructRule, self).__init__(sql, sql_desc, key_desc, accuracy, callback)
self.rel_key = rel_key
self.category = category
def simplify(self):
data = super(StructRule, self).simplify()
data['rel_key'] = self.rel_key
data['category'] = self.category.name
return data
def analyze(self, expect):
"""
:param expect:
:return:
"""
conclusion = super(StructRule, self).analyze(expect)
if self.category == Category.UNKNOWN:
conclusion.warnings.append('发现未分类的系统表 {0}, 建议升级工具。'.format(self.rel_key))
return conclusion
class ContentRule(Rule):
"""
系统表内容校验规则。
"""
def __init__(self, rel_key, category, sql, sql_desc="详见sql", key_desc="内容为%s的行",
accuracy=Accuracy.STRICT, callback=None):
super(ContentRule, self).__init__(sql, sql_desc, key_desc, accuracy, callback)
self.rel_key = rel_key
self.category = category
def simplify(self):
data = super(ContentRule, self).simplify()
data['rel_key'] = self.rel_key
data['category'] = self.category.name
return data
def analyze(self, expect):
"""
:param expect:
:return:
"""
conclusion = super(ContentRule, self).analyze(expect)
return conclusion
class CommonRule(Rule):
"""
常规通用校验规则。
"""
def __init__(self, category, sql, sql_desc="详见sql", key_desc="内容为%s的行",
accuracy=Accuracy.STRICT, callback=None):
super(CommonRule, self).__init__(sql, sql_desc, key_desc, accuracy, callback)
self.category = category
def simplify(self):
data = super(CommonRule, self).simplify()
data['category'] = self.category.name
return data
def analyze(self, expect):
"""
:param expect:
:return:
"""
conclusion = super(CommonRule, self).analyze(expect)
if self.category == Category.UNKNOWN:
conclusion.warnings.append('发现未分类的通用规则 {0},请升级工具。'.format(self.sql))
return conclusion
class RuleCallback(object):
def __init__(self):
self.res_keys = []
self.exp_keys = []
self.callback = []
def run(self, rule):
pass
COMMON_RULES = [
# 暂时没有,放个假的临时替代。
CommonRule(Category.TABLE,
"select 'test', 1",
'暂无规则描述',
'测试%s',
Accuracy.STRICT
)
]