diff --git a/src/gausskernel/dbmind/tools/Detection/README.md b/src/gausskernel/dbmind/tools/Detection/README.md new file mode 100644 index 000000000..772cd7d1b --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/README.md @@ -0,0 +1,245 @@ +![structure](structure.png) + +## Introduction to Detection + +**Detection** is a monitor and abnormality-detect tool based on timeseries-forecast algorithm aim at openGauss +metrics, such as IO-Read、IO-Write、CPU-Usage、Memory-Usage、Disk-Usage. Detection can monitor multi-metric at same +time, and forecast trend of metric in future, if the forecast value in future is beyond the specified scope, it can +notify user timely. + +Detection is composed of two element, **agent** and **detector**, **agent** is deployed on same machine with openGauss, +and **detector** can be deployed on any machine which can correspond with agent by _http_ or _https_, for security +reason, we suggest use _https_. + +## Detection Installition + +we suggest to use _anaconda_ to manage your python environment + +**agent** + + python3.6+ + python-dateutil + configparse + +**detector** + + python3.6+ + pystan + python-dateutil + fbprophet + pandas + flask + flask_sqlalchemy + flask_restful + configparse + +notes: + +using ```python -m pip install --upgrade pip``` to upgrade your pip + +using ```conda install fbprophet``` to install fbprophet + +## Introduction to a-detection.conf + +the config is divided into section: `database`, `server`, `agent`, `forecast`, `log`, `security` + +in all sections in config, the `log` and `security` section is public, in addition to this, `agent` is used for +**agent** module, `database`, `server`, `forecast` is used in **detector** module. so you should note the +path in every section, make it clear that it is on **agent** or **detector**. + + [agent] + # timer of source to collect metric info + source_timer_interval = 1 + # timer of sink to send data to server + sink_timer_interval = 1 + # maxsize of channel: default value is 300 + channel_capacity = 300 + + [security] + # config for https, if `tls` is False, use http instead + tls = True + ca = ./certificate/ca/ca.crt + server_cert = ./certificate/server/server.crt + server_key = ./certificate/server/server.key + agent_cert = ./certificate/agent/agent.crt + agent_key = ./certificate/agent/agent.key + + [database] + # location of sqlite + database_path = ./data/sqlite.db + # max rows of table in sqlite, in order to prevent table is too large. + max_rows = 10000 + # frequency to remove surplus rows in table + max_flush_cache = 1000 + + [server] + host = 127.0.0.1 + # listen host of app + listen_host = 0.0.0.0 + # listen port of app + listen_port = 8080 + + [forecast] + # forecast algorithm, fbprophet represent facebook's prophet algorithm + predict_alg = fbprophet + + [log] + # relative dirname of log + log_dir = ./log + + +## Quick Guide + +###1. deploy certificate + +if you want to correspond with 'https', you should set `tls = True` firstly, then you should own certificate, +then place the certificate in the appropriate location(`./certificate/ca`, `./certificate/server`, `./certificate/agent`). +if `tls=False`, then will use 'http' method + +wo provide demo [script](shell) to generate certificate + +use [script](shell/gen_ca_certificate.sh) to generate ca certificate and secret key. the same goes for other +certificates + + sh gen_ca_certificate.sh + +this script will create dirname `certificate` in project, it include three sub-dirname named `ca`, `server`, `agent`, +ca certificate and secret key will be placed in `./certificate/ca`. + +you can also use your own ca certificate, just place it in `ca`. + +use [script](shell/gen_certificate.sh) to generate server certificate and secret key. + + sh gen_certificate.sh + +after generating certificate and secret key, you should place it in corresponding sub-dirname(`./certificate/server` or + `./certificate/agent`) + +###2. Install openGauss + +in our [task/metric_task.py](task/metric_task.py), the program will acquire openGauss metrics or openGauss environment automatically, + +if not install openGauss, then Exception will occur. + +###3. Deploy program + +just as said above, the Detection is composed of two modules, respectively are **agent** and **detector**, the agent is +deployed with openGauss, and the detector can be deployed at any machine which can correspond with agent machine. + +###4 Start program + +*step 1: deploy code* + +you can copy the code to other machine or use [script](bin/start.sh) + + sh start.sh --deploy_code + +*step 2: start detector in machine which has installed openGauss* + + nohup python main.py -r detector > /dev/null 2>&1 & + +or + + sh start.sh --start_detector + +*step 3: start agent in agent machine* + + nohup python main.py -r agent > /dev/null 2>&1 & + +or + + shb start.sh --start_agent + +you can use [script](bin/stop.sh) to stop agent and detector process + + +###5 Obeserve result + +the program has four logs file, respectively are **agent.log**, **server.log**, **monitor.log**, **abnormal.log**. + +agent.log: this log record running status of agent module. + +server.log: this log record running status of app + +monitor.log: this log record monitor status such as forecasting, detecting, etc. + +abnormal.log: this log record abnormal status of monitor metric + + +## Introduction to task + +the monitor metric is defined in [task/metric_task.py](task/metric_task.py), the function should return metric value. + +the monitor metric is configured in [task/metric_task.conf](task/metric_task.conf) + +### How to add monitor metric + +it is very easy to add metric that you want: + +*step 1: write code in [task/metric_task.py](task/metric_task.py) which get the value of metric.* + +*step 2: add metric config in [task/metric_task.conf](task/metric_task.conf) + + instruction of metric config: + + [cpu_usage_usage] + minimum = 20 + maximum = 100 + data_period = 2000 + forecast_interval = 20S + forecast_period = 150S + + int config of cpu_usage: + + 'maximum': maximum allowable value of cpu_usage, it is considered as Exception if value is highed than it. + 'minimum': minimum allowable value of cpu_usage, it is considered as Exception if value is lower than it. + note: you should at least provide one of it, if not, the metric will not be monitored. + + 'data_period': the value of 'data_period' reprensent time interval or length from now. for example, if we + want to get last 100 second data from now, then the value of 'data_period' is '100S'; if we want to get + last 20 days from now, then 'data_period' is '20D'; if we want to get last 1000 datasets, then the + 'data_period' is 1000 + 'forecast_interval': the interval of predict operation. for example: if we want to predict 'cpu_usage' every + 10 seconds, then the value of 'interval' is '10S'. + + 'forecast_period': the forecast length, for example, if we want to forecast value of cpu_usage in the future + 100 seconds at frequency of '1S', then the value of 'forecast_period' should be 100S. + + notes: 'S' -> second + 'M' -> minute + 'H' -> hour + 'D' -> day + 'W' -> week + +for example: + +if we want to monitor io_read of openGauss: + +*step 1:* + + task/metric_task.py + + def io_read(): + child1 = subprocess.Popen(['pidstat', '-d'], stdout=subprocess.PIPE, shell=False) + child2 = subprocess.Popen(['grep', 'gaussd[b]'], stdin=child1.stdout, stdout=subprocess.PIPE, shell=False) + result = child2.communicate() + if not result[0]: + return 0.0 + else: + return result[0].split()[3].decode('utf-8') + +*step2:* + + config.conf + + [io_read] + minimum = 30 + maximum = 100 + data_period = 1000 + forecast_interval = 25S + forecast_period = 200S + +*step3:* + +restart your project + diff --git a/src/gausskernel/dbmind/tools/Detection/a-detection.conf b/src/gausskernel/dbmind/tools/Detection/a-detection.conf new file mode 100644 index 000000000..65582f297 --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/a-detection.conf @@ -0,0 +1,28 @@ +[database] +max_rows = 10000 +max_flush_cache = 1000 +database_path = ./data/metric.db + +[security] +tls = False +ca = ./certificate/ca/ca.crt +server_cert = ./certificate/server/server.crt +server_key = ./certificate/server/server.key +agent_cert = ./certificate/agent/agent.crt +agent_key = ./certificate/agent/agent.key + +[server] +host = 127.0.0.1 +listen_host = 0.0.0.0 +listen_port = 8080 + +[agent] +source_timer_interval = 1S +sink_timer_interval = 1S +channel_capacity = 100 + +[forecast] +forecast_alg = fbprophet + +[log] +log_dir = ./log diff --git a/src/gausskernel/dbmind/tools/Detection/agent/__init__.py b/src/gausskernel/dbmind/tools/Detection/agent/__init__.py new file mode 100644 index 000000000..ec194c2a1 --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/agent/__init__.py @@ -0,0 +1 @@ +from .metric_agent import start_agent diff --git a/src/gausskernel/dbmind/tools/Detection/agent/agent_logger.py b/src/gausskernel/dbmind/tools/Detection/agent/agent_logger.py new file mode 100644 index 000000000..18434790e --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/agent/agent_logger.py @@ -0,0 +1,15 @@ +import os +from configparser import ConfigParser + +from main import config_path +from utils import detection_logger + +config = ConfigParser() +config.read(config_path) +log_dir_realpath = os.path.realpath(config.get('log', 'log_dir')) +if not os.path.exists(log_dir_realpath): + os.makedirs(log_dir_realpath) + +logger = detection_logger(level='INFO', + log_name='agent', + log_path=os.path.join(log_dir_realpath, 'agent.log')) diff --git a/src/gausskernel/dbmind/tools/Detection/agent/channel.py b/src/gausskernel/dbmind/tools/Detection/agent/channel.py new file mode 100644 index 000000000..0238a0b73 --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/agent/channel.py @@ -0,0 +1,73 @@ +from queue import Queue, Empty, Full + +from .agent_logger import logger + + +class Channel: + def __init__(self): + pass + + def put(self, event): + pass + + def take(self): + pass + + def size(self): + pass + + +class MemoryChannel(Channel): + def __init__(self, name, maxsize=None): + Channel.__init__(self) + self.name = name + self.maxsize = maxsize + self.memory = Queue(maxsize) + + def put(self, event): + if self.maxsize and self.size() > self.maxsize: + logger.warn("Channel {name} has reach queue maxsize".format(name=self.name)) + try: + self.memory.put(event, block=True, timeout=0.2) + except Full: + logger.warn("throw away {name} data when reach maxsize".format(name=self.name)) + + def take(self): + try: + return self.memory.get_nowait() + except Empty: + logger.warn('Channel {name} is empty.'.format(name=self.name)) + return None + + def size(self): + rv = self.memory.qsize() + return 0 if rv is None else rv + + +class ChannelManager: + + def __init__(self): + self._channels = {} + + def add_channel(self, name, maxsize): + self._channels[name] = MemoryChannel(name=name, maxsize=maxsize) + logger.info('Channel {name} is created.'.format(name=name)) + + def get_channel(self, name): + return self._channels[name] + + def check(self, name): + if name not in self._channels: + return False + return True + + def get_channel_content(self): + contents = {} + for name, queue in self._channels.items(): + event = queue.take() + if event is not None: + contents[name] = event + return contents + + def size(self): + return len(self._channels) diff --git a/src/gausskernel/dbmind/tools/Detection/agent/db_source.py b/src/gausskernel/dbmind/tools/Detection/agent/db_source.py new file mode 100644 index 000000000..662afc207 --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/agent/db_source.py @@ -0,0 +1,49 @@ +import threading +import time + +from .agent_logger import logger +from .source import Source + + +class TaskHandler(threading.Thread): + def __init__(self, interval, function, *args, **kwargs): + threading.Thread.__init__(self) + self._func = function + self._interval = interval + self._args = args + self._kwargs = kwargs + self._finished = threading.Event() + self._res = None + self._channel = None + + def set_channel(self, channel): + self._channel = channel + + def run(self): + while not self._finished.is_set(): + try: + self._res = self._func(*self._args, **self._kwargs) + self._channel.put({'timestamp': int(time.time()), 'value': self._res}) + except Exception as e: + logger.exception(e) + self._finished.wait(self._interval) + + def cancel(self): + self._finished.set() + + +class DBSource(Source): + def __init__(self): + Source.__init__(self) + self.running = False + self._tasks = {} + + def add_task(self, name, interval, task, maxsize, *args, **kwargs): + if name not in self._tasks: + self._tasks[name] = TaskHandler(interval, task, *args, **kwargs) + self._channel_manager.add_channel(name, maxsize) + self._tasks[name].set_channel(self._channel_manager.get_channel(name)) + + def start(self): + for _, task in self._tasks.items(): + task.start() diff --git a/src/gausskernel/dbmind/tools/Detection/agent/metric_agent.py b/src/gausskernel/dbmind/tools/Detection/agent/metric_agent.py new file mode 100644 index 000000000..44f0c68be --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/agent/metric_agent.py @@ -0,0 +1,110 @@ +import os +import ssl +from configparser import ConfigParser, NoOptionError + +from task import metric_task +from utils import get_funcs, transform_time_string, check_certificate +from .agent_logger import logger +from .channel import ChannelManager +from .db_source import DBSource +from .sink import HttpSink + + +def start_agent(config_path): + if not os.path.exists(config_path): + logger.error('{config_path} is not exist..'.format(config_path=config_path)) + return + + config = ConfigParser() + config.read(config_path) + + if not config.has_section('agent') or not config.has_section('server'): + logger.error("do not has 'agent' or 'server' section in config file...") + return + + if not config.has_option('server', 'host') or not config.has_option('server', 'listen_port'): + logger.error("do not has 'host' or 'listen_port' in 'server' section...") + return + else: + context = None + if config.has_option('security', 'tls') and config.getboolean('security', 'tls'): + url = 'https://' + config.get('server', 'host') + ':' + config.get('server', 'listen_port') + '/sink' + try: + agent_cert = os.path.realpath(config.get('security', 'agent_cert')) + agent_key = os.path.realpath(config.get('security', 'agent_key')) + ca = os.path.realpath(config.get('security', 'ca')) + except NoOptionError as e: + logger.error(e) + return + else: + logger.info(agent_cert) + logger.info(agent_key) + logger.info(ca) + ssl_certificate_status = check_certificate(agent_cert) + ca_certificate_status = check_certificate(ca) + if ssl_certificate_status['status'] == 'fail': + logger.error("error occur when check '{certificate}'.".format(certificate=agent_cert)) + else: + if ssl_certificate_status['level'] == 'info': + logger.info(ssl_certificate_status['info']) + elif ssl_certificate_status['level'] == 'warn': + logger.warn(ssl_certificate_status['info']) + else: + logger.error(ssl_certificate_status['info']) + return + + if ca_certificate_status['status'] == 'fail': + logger.error("error occur when check '{certificate}'.".format(certificate=ca)) + else: + if ca_certificate_status['level'] == 'info': + logger.info(ca_certificate_status['info']) + elif ca_certificate_status['level'] == 'warn': + logger.warn(ca_certificate_status['info']) + else: + logger.error(ca_certificate_status['info']) + return + pw_file = os.path.join(os.path.dirname(config_path), 'certificate/pwf') + with open(pw_file, mode='r') as f: + pw = f.read().strip() + + context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=ca) + context.check_hostname = False + context.load_cert_chain(certfile=agent_cert, keyfile=agent_key, password=pw) + else: + logger.warn("detect not config 'ssl certificate', use 'http' instead.[advise use 'https']") + url = 'http://' + config.get('server', 'host') + ':' + config.get('server', 'listen_port') + '/sink' + + default_agent_parameter_dicts = {'sink_timer_interval': '10S', + 'source_timer_interval': '10S', + 'channel_capacity': 1000} + + for parameter, default_value in default_agent_parameter_dicts.items(): + if not config.has_option('agent', parameter): + logger.warn("do not provide '{parameter}' in 'agent' section, use default '{default_value}'." + .format(parameter=parameter, + default_value=default_agent_parameter_dicts['sink_timer_interval'])) + value = default_value + else: + value = config.get('agent', parameter) + try: + if parameter in ('sink_timer_interval', 'source_timer_interval'): + globals()[parameter] = transform_time_string(value, mode='to_second') + if parameter == 'channel_capacity': + globals()[parameter] = int(value) + except Exception as e: + logger.error(e) + return + + chan = ChannelManager() + source = DBSource() + http_sink = HttpSink(interval=globals()['sink_timer_interval'], url=url, context=context) + source.channel_manager = chan + http_sink.channel_manager = chan + + for task_name, task_func in get_funcs(metric_task): + source.add_task(name=task_name, + interval=globals()['source_timer_interval'], + task=task_func, + maxsize=globals()['channel_capacity']) + source.start() + http_sink.start() diff --git a/src/gausskernel/dbmind/tools/Detection/agent/sink.py b/src/gausskernel/dbmind/tools/Detection/agent/sink.py new file mode 100644 index 000000000..3911f2c2d --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/agent/sink.py @@ -0,0 +1,59 @@ +import json +import time +from urllib import request + +from .agent_logger import logger + +header = {'Content-Type': 'application/json'} + + +class Sink: + def __init__(self): + self._channel_manager = None + self.running = False + + @property + def channel_manager(self): + return self._channel_manager + + @channel_manager.setter + def channel_manager(self, channel_manager): + self._channel_manager = channel_manager + + def process(self): + pass + + def start(self): + self.running = True + self.process() + + def stop(self): + self.running = False + + +class HttpSink(Sink): + def __init__(self, interval, url, context): + Sink.__init__(self) + self._interval = interval + self.running = False + self._url = url + self.context = context + + def process(self): + + logger.info('begin send data to {url}'.format(url=self._url)) + while self.running: + time.sleep(self._interval) + contents = self._channel_manager.get_channel_content() + if contents: + while True: + try: + req = request.Request(self._url, headers=header, data=json.dumps(contents).encode('utf-8'), + method='POST') + request.urlopen(req, context=self.context) + break + except Exception as e: + logger.warn(e, exc_info=True) + time.sleep(0.5) + else: + logger.warn('Not found data in each channel.') diff --git a/src/gausskernel/dbmind/tools/Detection/agent/source.py b/src/gausskernel/dbmind/tools/Detection/agent/source.py new file mode 100644 index 000000000..b44c9a5ac --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/agent/source.py @@ -0,0 +1,17 @@ +class Source: + def __init__(self): + self._channel_manager = None + + def start(self): + pass + + def stop(self): + pass + + @property + def channel_manager(self): + return self._channel_manager + + @channel_manager.setter + def channel_manager(self, channel_manager): + self._channel_manager = channel_manager diff --git a/src/gausskernel/dbmind/tools/Detection/detector/algorithms/__init__.py b/src/gausskernel/dbmind/tools/Detection/detector/algorithms/__init__.py new file mode 100644 index 000000000..7f46c7863 --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/detector/algorithms/__init__.py @@ -0,0 +1,8 @@ +from .fb_prophet import FacebookProphet + + +def forecast_algorithm(method): + if method == 'fbprophet': + return FacebookProphet + else: + raise Exception('No {method} forecast method.'.format(method=method)) diff --git a/src/gausskernel/dbmind/tools/Detection/detector/algorithms/fb_prophet.py b/src/gausskernel/dbmind/tools/Detection/detector/algorithms/fb_prophet.py new file mode 100644 index 000000000..d8beaabd1 --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/detector/algorithms/fb_prophet.py @@ -0,0 +1,61 @@ +import os +import pickle +import re +import time + +import pandas as pd +from fbprophet import Prophet + +from .model import AlgModel + +date_format = "%Y-%m-%d %H:%M:%S" + + +class FacebookProphet(AlgModel): + def __init__(self): + AlgModel.__init__(self) + self.model = None + self.date_unit_mapper = {'S': 'S', + 'M': 'T', + 'H': 'H', + 'D': 'D', + 'W': 'W'} + + def fit(self, timeseries): + try: + timeseries = pd.DataFrame(timeseries, columns=['ds', 'y']) + timeseries['ds'] = timeseries['ds'].map(lambda x: time.strftime(date_format, time.localtime(x))) + self.model = Prophet(yearly_seasonality=True, + weekly_seasonality=True, + daily_seasonality=True) + self.model.fit(timeseries) + except Exception: + raise + + def forecast(self, forecast_periods): + forecast_period, forecast_freq = re.match(r'(\d+)?([WDHMS])', forecast_periods).groups() + try: + # synchronize date-unit to fb-prophet + forecast_freq = self.date_unit_mapper[forecast_freq] + if forecast_period is None: + forecast_period = 1 + else: + forecast_period = int(forecast_period) + future = self.model.make_future_dataframe(freq=forecast_freq, + periods=forecast_period, + include_history=False) + predict_result = self.model.predict(future)[['ds', 'yhat']] + predict_result['ds'] = predict_result['ds'].map(lambda x: x.strftime(date_format)) + return predict_result.values[:, 0], predict_result.values[:, 1] + except Exception: + raise + + def save(self, model_path): + with open(model_path, mode='wb') as f: + pickle.dump(self.model, f) + + def load(self, model_path): + if not os.path.exists(model_path): + raise FileNotFoundError('%s not found.' % model_path) + with open(model_path, mode='rb') as f: + self.model = pickle.load(f) diff --git a/src/gausskernel/dbmind/tools/Detection/detector/algorithms/model.py b/src/gausskernel/dbmind/tools/Detection/detector/algorithms/model.py new file mode 100644 index 000000000..0739da5a7 --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/detector/algorithms/model.py @@ -0,0 +1,20 @@ +from abc import abstractmethod + + +class AlgModel(object): + def __init__(self): + pass + + @abstractmethod + def fit(self, timeseries): + pass + + @abstractmethod + def forecast(self, forecast_periods): + pass + + def save(self, model_path): + pass + + def load(self, model_path): + pass diff --git a/src/gausskernel/dbmind/tools/Detection/detector/monitor/__init__.py b/src/gausskernel/dbmind/tools/Detection/detector/monitor/__init__.py new file mode 100644 index 000000000..611728bc0 --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/detector/monitor/__init__.py @@ -0,0 +1 @@ +from .metric_monitor import start_monitor diff --git a/src/gausskernel/dbmind/tools/Detection/detector/monitor/abnormal_logger.py b/src/gausskernel/dbmind/tools/Detection/detector/monitor/abnormal_logger.py new file mode 100644 index 000000000..f0536e5b1 --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/detector/monitor/abnormal_logger.py @@ -0,0 +1,15 @@ +import os +from configparser import ConfigParser + +from main import config_path +from utils import detection_logger + +config = ConfigParser() +config.read(config_path) +log_dir_realpath = os.path.realpath(config.get('log', 'log_dir')) +if not os.path.exists(log_dir_realpath): + os.makedirs(log_dir_realpath) + +logger = detection_logger(level='INFO', + log_name='abnormal', + log_path=os.path.join(log_dir_realpath, 'abnormal.log')) diff --git a/src/gausskernel/dbmind/tools/Detection/detector/monitor/data_handler.py b/src/gausskernel/dbmind/tools/Detection/detector/monitor/data_handler.py new file mode 100644 index 000000000..3b10a41bf --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/detector/monitor/data_handler.py @@ -0,0 +1,100 @@ +import os +import sqlite3 +import time + +from utils import transform_time_string +from .monitor_logger import logger + + +class DataHandler: + ''' + process sqlite3 data, provide data to forecastor + ''' + + def __init__(self, table, dbpath): + self._table = table + self._dbpath = dbpath + self._conn = None + self._cur = None + + def __enter__(self): + self.connect_db() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + def connect_db(self): + if not os.path.exists(self._dbpath): + logger.error('{dbpath} not found'.format(dbpath=self._dbpath), exc_info=True) + return + self._conn = sqlite3.connect(self._dbpath) + self._cur = self._conn.cursor() + + def select_timeseries_by_timestamp(self, period): + timeseries = [] + times = 0 + times_limit = 5 + while times < times_limit: + try: + get_last_timestamp_sql = "select timestamp from {table} order by timestamp desc limit 1" + self._cur.execute(get_last_timestamp_sql.format(table=self._table)) + last_timestamp = self._cur.fetchall()[0][0] + time_interval = transform_time_string(period, mode='to_second') + select_timestamp = last_timestamp - time_interval + get_timeseries_sql = "select timestamp, value from {table} where timestamp >= '{select_timestamp}'" + self._cur.execute(get_timeseries_sql.format(table=self._table, select_timestamp=select_timestamp)) + timeseries = self._cur.fetchall() + if not timeseries: + logger.warn("get no timeseries from {table}', retry...".format(table=self._table)) + else: + return timeseries + except Exception as e: + logger.exception('exception occur when get timeseries: {error}, retry...'.format(error=str(e)), exc_info=True) + times += 1 + time.sleep(0.11) + return timeseries + + def select_timeseries_by_length(self, length): + timeseries = [] + times = 0 + times_limit = 5 + while times < times_limit: + try: + sql = "select * from (select * from {table} order by timestamp desc limit {length}) order by timestamp" + self._cur.execute(sql.format(table=self._table, length=length)) + timeseries = self._cur.fetchall() + if not timeseries: + logger.warn("get no timeseries from {table}', retry...".format(table=self._table)) + else: + return timeseries + + except Exception as e: + logger.exception('exception occur when get timeseries: {error}'.format(error=str(e)), exc_info=True) + times += 1 + time.sleep(0.11) + return timeseries + + def get_timeseries(self, period): + if isinstance(period, int): + timeseries = self.select_timeseries_by_length(length=period) + else: + timeseries = self.select_timeseries_by_timestamp(period=period) + return timeseries + + def check_table(self, table): + ''' + check whether table exist in data + ''' + sql = "select name from sqlite_master where type = 'table'" + self._cur.execute(sql) + tables = self._cur.fetchall() + tables = [item[0] for item in tables] + if table not in tables: + return False + return True + + def close(self): + self._cur.close() + self._conn.close() + diff --git a/src/gausskernel/dbmind/tools/Detection/detector/monitor/detect.py b/src/gausskernel/dbmind/tools/Detection/detector/monitor/detect.py new file mode 100644 index 000000000..3313fa288 --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/detector/monitor/detect.py @@ -0,0 +1,98 @@ +import types +from collections import OrderedDict +from functools import wraps +from itertools import groupby + +from .abnormal_logger import logger as a_logger +from .monitor_logger import logger as m_logger + + +class Detector: + def __init__(self, func): + wraps(func)(self) + + def __get__(self, instance, cls): + if instance is None: + return self + return types.MethodType(self, instance) + + def __call__(self, *args, **kwargs): + + def mapper_function(value): + if value > maximum: + result = (value, 'higher') + elif value < minimum: + result = (value, 'lower') + else: + result = (value, 'normal') + return result + + forecast_result = self.__wrapped__(*args, *kwargs) + if forecast_result['status'] == 'fail': + return + + metric_name = forecast_result['metric_name'] + future_value = forecast_result['future_value'] + future_date = forecast_result['future_date'] + minimum = forecast_result['detect_basis']['minimum'] + maximum = forecast_result['detect_basis']['maximum'] + + if minimum is None and maximum is not None: + minimum = '-inf' + value_map_result = list(map(lambda x: (x, 'higher') if x > maximum else (x, 'normal'), future_value)) + elif maximum is None and minimum is not None: + maximum = 'inf' + value_map_result = list(map(lambda x: (x, 'lower') if x < minimum else (x, 'normal'), future_value)) + else: + value_map_result = list(map(mapper_function, future_value)) + forecast_condition = OrderedDict(zip(future_date, value_map_result)) + for key, value in groupby(list(forecast_condition.items()), key=lambda item: item[1][1]): + metric_status = key + value_ = value + metric_date_value_scope = [(_item[0], _item[1][0]) for _item in value_] + maximum_forecast_value = round(max([_item[1] for _item in metric_date_value_scope]), 3) + minimum_forecast_value = round(min([_item[1] for _item in metric_date_value_scope]), 3) + if metric_status == 'normal': + if len(metric_date_value_scope) == 1: + m_logger.info('the forecast value of [{metric}]({minimum}~{maximum})' + ' at {date} is ({forecast_value}) [{metric_status}].' + .format(metric=metric_name, + minimum=minimum, + maximum=maximum, + forecast_value=metric_date_value_scope[0][1], + metric_status=metric_status, + date=metric_date_value_scope[0][0])) + else: + m_logger.info('the forecast value of [{metric}]({minimum}~{maximum}) in ' + '[{start_date}~{end_date}] is between ({minimum_forecast_value}' + '~{maximum_forecast_value}) [{metric_status}].' + .format(metric=metric_name, + minimum=minimum, + maximum=maximum, + minimum_forecast_value=minimum_forecast_value, + maximum_forecast_value=maximum_forecast_value, + metric_status=metric_status, + start_date=metric_date_value_scope[0][0], + end_date=metric_date_value_scope[-1][0])) + else: + if len(metric_date_value_scope) == 1: + a_logger.warn('the forecast value of [{metric}]({minimum}~{maximum})' + ' at {date} is ({forecast_value}) [{metric_status}].' + .format(metric=metric_name, + minimum=minimum, + maximum=maximum, + forecast_value=metric_date_value_scope[1], + metric_status=metric_status, + date=metric_date_value_scope[0][0])) + else: + a_logger.warn('the forecast value of [{metric}]({minimum}~{maximum}) in ' + '[{start_date}~{end_date}] is between ({minimum_forecast_value}' + '~{maximum_forecast_value}) [{metric_status}].' + .format(metric=metric_name, + minimum=minimum, + maximum=maximum, + minimum_forecast_value=minimum_forecast_value, + maximum_forecast_value=maximum_forecast_value, + metric_status=metric_status, + start_date=metric_date_value_scope[0][0], + end_date=metric_date_value_scope[-1][0])) diff --git a/src/gausskernel/dbmind/tools/Detection/detector/monitor/forecast.py b/src/gausskernel/dbmind/tools/Detection/detector/monitor/forecast.py new file mode 100644 index 000000000..a19be9cb5 --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/detector/monitor/forecast.py @@ -0,0 +1,61 @@ +from detector.monitor import detect +from .monitor_logger import logger + + +class Forecastor: + """ + repeat execute by Timer + """ + + def __init__(self, **kwargs): + self.minimum_timeseries_length = 20 + self.metric_name = kwargs['metric_name'] + self.database_path = kwargs['database_path'] + self.data_handler = kwargs['data_handler'] + self.forecast_alg = kwargs['forecast_alg'] + self.forecast_period = kwargs['forecast_period'] + self.forecast_interval = kwargs['forecast_interval'] + self.data_period = kwargs['data_period'] + + self.detect_basis = {'minimum': kwargs.get('minimum', None), + 'maximum': kwargs.get('maximum', None)} + + @detect.Detector + def run(self): + forecast_result = {} + with self.data_handler(self.metric_name, self.database_path) as db: + timeseries = db.get_timeseries(period=self.data_period) + logger.info('acquire data[{metric_name}] -> data_period: {data_period} data_length: {length}' + .format(metric_name=self.metric_name, + data_period=self.data_period, + length=len(timeseries))) + if not timeseries: + logger.error("can not get timeseries from table [{metric_name}] by period '{period}', " + "skip forecast step for [{metric_name}]".format(metric_name=self.metric_name, + period=self.data_period)) + forecast_result['status'] = 'fail' + else: + try: + if len(timeseries) < self.minimum_timeseries_length: + logger.warn( + "the length of timeseries[{metric_name}] is too short: [{ts_length}]." + .format(metric_name=self.metric_name, + ts_length=len(timeseries))) + self.forecast_alg.fit(timeseries) + self.forecast_period = self.forecast_period.upper() + date, value = self.forecast_alg.forecast(self.forecast_period) + logger.info("forecast[{metric_name}] -> forecast length: {length}" + .format(metric_name=self.metric_name, + length=len(value))) + forecast_result['status'] = 'success' + forecast_result['metric_name'] = self.metric_name + forecast_result['detect_basis'] = self.detect_basis + forecast_result['future_date'] = date + forecast_result['future_value'] = value + except Exception as e: + logger.error(e, exc_info=True) + forecast_result['status'] = 'fail' + return forecast_result + + def __repr__(self): + return 'forecastor of the metric {metric}'.format(metric=self.metric_name) diff --git a/src/gausskernel/dbmind/tools/Detection/detector/monitor/metric_monitor.py b/src/gausskernel/dbmind/tools/Detection/detector/monitor/metric_monitor.py new file mode 100644 index 000000000..3e642e573 --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/detector/monitor/metric_monitor.py @@ -0,0 +1,118 @@ +import os +import re +from configparser import ConfigParser + +from detector.algorithms import forecast_algorithm +from task import metric_task +from utils import get_funcs +from .data_handler import DataHandler +from .forecast import Forecastor +from .monitor import Monitor +from .monitor_logger import logger + + +def start_monitor(config_path, metric_config_path): + if not os.path.exists(config_path): + logger.error('{config_path} is not exist.'.format(config_path=config_path)) + return + if not os.path.exists(metric_config_path): + logger.error('{metric_config_path} is not exist.'.format(metric_config_path=metric_config_path)) + return + + config = ConfigParser() + config.read(config_path) + + if not config.has_section('forecast') or not config.has_section('database'): + logger.error("do not has 'forecast' or 'database' section in config file.") + return + + if not config.has_option('forecast', 'forecast_alg'): + logger.warn("do not find 'forecast_alg' in forecast section, use default 'fbprophet'.") + forecast_alg = forecast_algorithm('fbprophet') + else: + try: + forecast_alg = forecast_algorithm(config.get('forecast', 'forecast_alg')) + except Exception as e: + logger.warn("{error}, use default method: 'fbprophet'.".format(error=str(e))) + forecast_alg = forecast_algorithm('fbprophet') + + if not config.has_option('database', 'database_path'): + logger.error("do not find 'database_path' in database section...") + return + else: + database_path = config.get('database', 'database_path') + database_path = os.path.realpath(database_path) + + monitor_service = Monitor() + config.clear() + config.read(metric_config_path) + + metric_task_from_py = get_funcs(metric_task) + metric_name_from_py = [item[0] for item in metric_task_from_py] + metric_name_from_config = config.sections() + + default_metric_parameter_values = {'forecast_interval': '120S', + 'forecast_period': '60S', + 'data_period': '60S'} + + for metric_name in set(metric_name_from_config).union(set(metric_name_from_py)): + if metric_name in set(metric_name_from_config).difference(set(metric_name_from_py)): + logger.error("{metric_name} is not defined in 'task/metric_task.py', abandon monitoring." + .format(metric_name=metric_name)) + continue + + if metric_name in set(metric_name_from_py).difference(set(metric_name_from_config)): + logger.error("{metric_name} has no config information in 'task/metric_config.conf', abandon monitoring." + .format(metric_name=metric_name)) + continue + + if metric_name in set(metric_name_from_py).intersection(set(metric_name_from_config)): + kwargs = {} + if not config.has_option(metric_name, 'maximum') and not config.has_option(metric_name, 'minimum'): + logger.error("{metric_name} do not provide any range parameter ('minimum' or 'maximum'), skip monitor." + .format(metric_name=metric_name)) + continue + else: + if config.has_option(metric_name, 'maximum'): + kwargs['maximum'] = config.getfloat(metric_name, 'maximum') + if config.has_option(metric_name, 'minimum'): + kwargs['minimum'] = config.getfloat(metric_name, 'minimum') + + for parameter, default_value in default_metric_parameter_values.items(): + if not config.has_option(metric_name, parameter): + logger.warn("{metric_name} do not provide {parameter}, use default value: {default_value}." + .format(parameter=parameter, + metric_name=metric_name, + default_value=default_value)) + value = default_value + else: + temp_value = config.get(metric_name, parameter) + if parameter == 'data_period' and temp_value.isdigit(): + value = int(temp_value) + else: + try: + value_number, value_unit = re.match(r'(\d+)?([WDHMS])', temp_value).groups() + if value_number is None or value_unit is None or value_unit not in ('S', 'M', 'H', 'D', 'W'): + logger.error("wrong value: {metric_name} - {parameter}, only support 'S(second)' 'M(minute)'" + "'H(hour)' 'D(day)' 'W(week)', not support '{unit}', use default value: {default_value}" + .format(metric_name=metric_name, + unit=value_unit, + parameter=parameter, + default_value=default_value)) + value = default_value + else: + value = temp_value + except Exception as e: + logger.error("{metric_name} - {parameter} error: {error}, use default value: {default_value}.") + value = default_value + + kwargs[parameter] = value + + kwargs['forecast_alg'] = forecast_alg() + kwargs['database_path'] = database_path + kwargs['data_handler'] = DataHandler + kwargs['metric_name'] = metric_name + + monitor_service.apply(Forecastor(**kwargs)) + + monitor_service.start() diff --git a/src/gausskernel/dbmind/tools/Detection/detector/monitor/monitor.py b/src/gausskernel/dbmind/tools/Detection/detector/monitor/monitor.py new file mode 100644 index 000000000..02d559531 --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/detector/monitor/monitor.py @@ -0,0 +1,26 @@ +from utils import RepeatTimer, transform_time_string +from .monitor_logger import logger + + +class Monitor: + def __init__(self): + self._tasks = dict() + + def apply(self, instance, args=None, kwargs=None): + if instance in self._tasks: + return False + logger.info('add [{task}] in Monitor task......'.format(task=getattr(instance, 'metric_name'))) + interval = getattr(instance, 'forecast_interval') + try: + interval = transform_time_string(interval, mode='to_second') + except ValueError as e: + logger.error(e, exc_info=True) + return + timer = RepeatTimer(interval=interval, function=instance.run, args=args, kwargs=kwargs) + self._tasks[instance] = timer + return True + + def start(self): + for instance, timer in self._tasks.items(): + timer.start() + logger.info('begin to monitor [{task}]'.format(task=getattr(instance, 'metric_name'))) diff --git a/src/gausskernel/dbmind/tools/Detection/detector/monitor/monitor_logger.py b/src/gausskernel/dbmind/tools/Detection/detector/monitor/monitor_logger.py new file mode 100644 index 000000000..1049c8586 --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/detector/monitor/monitor_logger.py @@ -0,0 +1,14 @@ +import os +from configparser import ConfigParser + +from main import config_path +from utils import detection_logger + +config = ConfigParser() +config.read(config_path) +log_dir_realpath = os.path.realpath(config.get('log', 'log_dir')) +if not os.path.exists(log_dir_realpath): + os.makedirs(log_dir_realpath) +logger = detection_logger(level='INFO', + log_name='monitor', + log_path=os.path.join(log_dir_realpath, 'monitor.log')) diff --git a/src/gausskernel/dbmind/tools/Detection/detector/server/__init__.py b/src/gausskernel/dbmind/tools/Detection/detector/server/__init__.py new file mode 100644 index 000000000..f9928662b --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/detector/server/__init__.py @@ -0,0 +1,11 @@ +from .database import db +from .detection_app import MyApp + + +def start_service(config_path): + app = MyApp() + app.initialize_config(config_path) + app.initialize_app() + app.initialize_database(db) + app.add_resources() + app.start_service() diff --git a/src/gausskernel/dbmind/tools/Detection/detector/server/app.py b/src/gausskernel/dbmind/tools/Detection/detector/server/app.py new file mode 100644 index 000000000..5c02adadd --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/detector/server/app.py @@ -0,0 +1,12 @@ +class App(object): + def __init__(self): + pass + + def add_resources(self): + pass + + def init_database(self): + pass + + def start_service(self, *args, **kwargs): + pass diff --git a/src/gausskernel/dbmind/tools/Detection/detector/server/database.py b/src/gausskernel/dbmind/tools/Detection/detector/server/database.py new file mode 100644 index 000000000..cd94a5c9f --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/detector/server/database.py @@ -0,0 +1,33 @@ +''' +define table structure in database +''' +from flask_sqlalchemy import SQLAlchemy + +from .server_logger import logger + +db = SQLAlchemy() + + +class Base(db.Model): + __abstract__ = True + row = 0 + timestamp = db.Column(db.BIGINT, nullable=False, primary_key=True) + value = db.Column(db.Float, nullable=False) + max_rows = 100000 + max_flush_cache = 1000 + + @classmethod + def limit_max_rows(cls): + db.session.execute(db.text( + "delete from {table} where timestamp in (select timestamp from {table} order by timestamp desc limit -1 " + "offset {max_rows})".format(table=cls.__tablename__, max_rows=cls.max_rows) + )) + logger.info('remove surplus rows in table [{table}]'.format(table=cls.__tablename__)) + + @classmethod + def on_insert(cls, mapper, connection, target): + if cls.rows % cls.max_flush_cache == 0: + cls.limit_max_rows() + cls.rows += 1 + else: + cls.rows += 1 diff --git a/src/gausskernel/dbmind/tools/Detection/detector/server/detection_app.py b/src/gausskernel/dbmind/tools/Detection/detector/server/detection_app.py new file mode 100644 index 000000000..46d63b70a --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/detector/server/detection_app.py @@ -0,0 +1,137 @@ +import os +import ssl +from configparser import ConfigParser, NoOptionError + +from flask import Flask +from flask_restful import Api +from flask_sqlalchemy import event + +from task import metric_task +from utils import get_funcs, check_certificate +from .app import App +from .database import Base +from .resource import receiver +from .server_logger import logger + + +class MyApp(App): + def __init__(self): + App.__init__(self) + self.app = None + self.api = None + self.config = None + self.db = None + self.dirname_path = None + self.table_class_relation = {} + + def initialize_config(self, config_path): + logger.info('initialize config......') + if not os.path.exists(config_path): + logger.error('{config_path} is not exist..'.format(config_path=config_path)) + self.dirname_path = os.path.dirname(config_path) + self.config = ConfigParser() + self.config.read(config_path) + if not self.config.has_section('database'): + logger.error("do not find 'database' section in config file.") + else: + if not self.config.has_option('database', 'database_path'): + logger.error("do not find 'database_path' in database section.") + + if not self.config.has_section('server'): + logger.error("do not find 'database' section in config file.") + else: + if not self.config.has_option('server', 'listen_host') or not self.config.has_option('server', + 'listen_port'): + logger.error("do not find 'listen_host' or 'listen_port' in server section.") + + def initialize_app(self): + logger.info('initialize app......') + self.app = Flask(__name__) + self.app.config['debug'] = False + + database_path = os.path.realpath(self.config.get('database', 'database_path')) + database_path_dir = os.path.dirname(database_path) + if not os.path.exists(database_path_dir): + os.makedirs(database_path_dir) + if os.name == 'nt': + self.app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + database_path + elif os.name == 'posix': + self.app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:////' + database_path + else: + logger.error("do not support this {system}".format(system=os.name)) + self.app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True + self.api = Api(self.app) + + def initialize_database(self, db): + logger.info('initialize database......') + self.db = db + self.db.init_app(self.app) + default_database_parameter_values = {'max_rows': 100000, 'max_flush_cache': 1000} + for parameter, default_value in default_database_parameter_values.items(): + if not self.config.has_option('database', parameter): + logger.warn("do not find '{parameter}' in database section, use default value: '{default_value}'" + .format(parameter=parameter, default_value=default_value)) + value = default_value + else: + value = self.config.getint('database', parameter) + globals()[parameter] = value + + Base.max_rows = globals()['max_rows'] + Base.max_flush_cache = globals()['max_flush_cache'] + metric_names = [func_name for func_name, _, in get_funcs(metric_task)] + for metric_name in metric_names: + table = type(metric_name.upper(), (Base, self.db.Model), {'__tablename__': metric_name, 'rows': 0}) + event.listen(table, 'after_insert', table.on_insert) + self.table_class_relation[metric_name] = table + with self.app.app_context(): + self.db.create_all() + + def add_resources(self): + self.api.add_resource(receiver.Source, '/sink', + resource_class_kwargs={'db': self.db, 'table_class_relation': self.table_class_relation}) + + def start_service(self): + context = None + listen_host = self.config.get('server', 'listen_host') + listen_port = self.config.getint('server', 'listen_port') + if self.config.has_option('security', 'tls') and self.config.getboolean('security', 'tls'): + try: + server_cert = self.config.get('security', 'server_cert') + server_key = self.config.get('security', 'server_key') + ca = self.config.get('security', 'ca') + except NoOptionError as e: + logger.error(e) + return + else: + ssl_certificate_status = check_certificate(server_cert) + ca_certificate_status = check_certificate(ca) + if ssl_certificate_status['status'] == 'fail': + logger.error("error occur when check '{certificate}'.".format(certificate=server_cert)) + else: + if ssl_certificate_status['level'] == 'info': + logger.info(ssl_certificate_status['info']) + elif ssl_certificate_status['level'] == 'warn': + logger.warn(ssl_certificate_status['info']) + else: + logger.error(ssl_certificate_status['info']) + return + + if ca_certificate_status['status'] == 'fail': + logger.error("error occur when check '{certificate}'.".format(certificate=ca)) + else: + if ca_certificate_status['level'] == 'info': + logger.info(ca_certificate_status['info']) + elif ca_certificate_status['level'] == 'warn': + logger.warn(ca_certificate_status['info']) + else: + logger.error(ca_certificate_status['info']) + return + + pw_file = os.path.join(self.dirname_path, 'certificate/pwf') + with open(pw_file, mode='r') as f: + pw = f.read().strip() + context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH, cafile=ca) + context.verify_mode = ssl.CERT_REQUIRED + context.load_cert_chain(certfile=server_cert, keyfile=server_key, password=pw) + logger.info('Start service......') + self.app.run(host=listen_host, port=listen_port, ssl_context=context) diff --git a/src/gausskernel/dbmind/tools/Detection/detector/server/resource/receiver.py b/src/gausskernel/dbmind/tools/Detection/detector/server/resource/receiver.py new file mode 100644 index 000000000..ae0ef421c --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/detector/server/resource/receiver.py @@ -0,0 +1,53 @@ +from flask import request +from flask_restful import Resource + +from ..server_logger import logger + + +class ResponseTuple: + """generate a response tuple.""" + + @staticmethod + def success(result=None): + if result is None: + return {"status": "success"}, 200 + + return {"status": "success", "result": result} + + @staticmethod + def error(msg="", status_code=400): + return {"status": "error", "msg": msg}, status_code + + +class Source(Resource): + + def __init__(self, db, table_class_relation): + self.db = db + self.table_class_relation = table_class_relation + + def post(self): + content = request.json + try: + for name, event in content.items(): + tup_obj = self.table_class_relation[name](timestamp=event['timestamp'], value=event['value']) + self.db.session.add(tup_obj) + self.db.session.commit() + return ResponseTuple.success() + except Exception as e: + logger.error('error when receive data from agent: ' + str(e)) + self.db.session.rollback() + return ResponseTuple.error(msg=str(e)) + + def get(self): + return ResponseTuple.error(status_code=400) + + def delete(self): + return ResponseTuple.error(status_code=400) + + +class Index(Resource): + def get(self): + return ResponseTuple.success() + + def delete(self): + return ResponseTuple.error(status_code=400) diff --git a/src/gausskernel/dbmind/tools/Detection/detector/server/server_logger.py b/src/gausskernel/dbmind/tools/Detection/detector/server/server_logger.py new file mode 100644 index 000000000..de60684bf --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/detector/server/server_logger.py @@ -0,0 +1,15 @@ +import os +from configparser import ConfigParser + +from main import config_path +from utils import detection_logger + +config = ConfigParser() +config.read(config_path) +log_dir_realpath = os.path.realpath(config.get('log', 'log_dir')) +if not os.path.exists(log_dir_realpath): + os.makedirs(log_dir_realpath) + +logger = detection_logger(level='INFO', + log_name='server', + log_path=os.path.join(log_dir_realpath, 'server.log')) diff --git a/src/gausskernel/dbmind/tools/Detection/main.py b/src/gausskernel/dbmind/tools/Detection/main.py new file mode 100644 index 000000000..d1a43771d --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/main.py @@ -0,0 +1,45 @@ +import argparse +import os +import sys +from multiprocessing import Process + +sys.path.append(os.path.dirname(os.path.abspath(__file__))) + +config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'a-detection.conf') +metric_config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'task/metric_task.conf') + +__version__ = '1.0.0' + + +def parse_args(): + parser = argparse.ArgumentParser(description='abnormal detection: detect abnormality of database metric.') + parser.add_argument('-r', '--role', required=True, choices=['agent', 'detector'], help='run on client or detector.') + parser.version = __version__ + return parser.parse_args() + + +def main(args): + role = args.role + if role == 'agent': + agent_pid = os.getpid() + with open('./agent.pid', mode='w') as f: + f.write(str(agent_pid)) + from agent import start_agent + start_agent(config_path) + else: + from detector.server import start_service + from detector.monitor import start_monitor + server_process = Process(target=start_service, args=(config_path,)) + monitor_process = Process(target=start_monitor, args=(config_path, metric_config_path)) + server_process.start() + monitor_process.start() + with open('./server.pid', mode='w') as f: + f.write(str(server_process.pid)) + with open('./monitor.pid', mode='w') as f: + f.write(str(monitor_process.pid)) + server_process.join() + monitor_process.join() + + +if __name__ == '__main__': + main(parse_args()) diff --git a/src/gausskernel/dbmind/tools/Detection/shell/ca.conf b/src/gausskernel/dbmind/tools/Detection/shell/ca.conf new file mode 100644 index 000000000..e32b2e091 --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/shell/ca.conf @@ -0,0 +1,6 @@ +[req] +distinguished_name = req_distinguished_name +prompt = no + +[req_distinguished_name] +O = A-Detection Certificate Authority diff --git a/src/gausskernel/dbmind/tools/Detection/shell/common.sh b/src/gausskernel/dbmind/tools/Detection/shell/common.sh new file mode 100644 index 000000000..aa965bcb9 --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/shell/common.sh @@ -0,0 +1,4 @@ +CURRENT_DIR=$(cd ../$(dirname $0); pwd) +BASENAME=$(basename $CURRENT_DIR) + +PROJECT_NAME="A-Detection" diff --git a/src/gausskernel/dbmind/tools/Detection/shell/gen_ca_certificate.sh b/src/gausskernel/dbmind/tools/Detection/shell/gen_ca_certificate.sh new file mode 100644 index 000000000..1a9274015 --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/shell/gen_ca_certificate.sh @@ -0,0 +1,46 @@ +#!/bin/bash +source ./initialize_certificate.sh + +ca_crt="ca.crt" +ca_key="ca.key" +ca_password="" + +read -s -p "please input the password of ca: " ca_password + +cat > ca.conf <<-EOF +[req] +distinguished_name = req_distinguished_name +prompt = no + +[req_distinguished_name] +O = $PROJECT_NAME Certificate Authority +EOF + +expect <<-EOF + spawn /bin/openssl genrsa -aes256 -out ${ca_key} 2048 + expect "Enter pass phrase for" + send "${ca_password}\r" + expect "Verifying - Enter pass phrase for" + send "${ca_password}\r" + expect eof +EOF + +expect <<-EOF + spawn /bin/openssl req -new -out ca.req -key ${ca_key} -config ca.conf + expect "Enter pass phrase for" + send "${ca_password}\r" + expect eof +EOF + +expect <<-EOF + spawn /bin/openssl x509 -req -in ca.req -signkey ${ca_key} -days 7300 -out ${ca_crt} + expect "Enter pass phrase for" + send "${ca_password}\r" + expect eof +EOF + +mv ${ca_crt} ${ca_key} ${CURRENT_DIR}/${CA} +rm ca.req +chmod 600 `find ${CURRENT_DIR}/${CA} -type f` + + diff --git a/src/gausskernel/dbmind/tools/Detection/shell/gen_certificate.sh b/src/gausskernel/dbmind/tools/Detection/shell/gen_certificate.sh new file mode 100644 index 000000000..dfaa3506f --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/shell/gen_certificate.sh @@ -0,0 +1,63 @@ +source ./initialize_certificate.sh + +CA_CRT="${CURRENT_DIR}/${CA}/ca.crt" +CA_KEY="${CURRENT_DIR}/${CA}/ca.key" +pwf="${CURRENT_DIR}/${PW_FILE}" +local_host="" +ca_password="" +ssl_password="" +base_dir="" +file_name="" + +if [ ! -f ${CA_CRT} ]; then + echo "not found ${CA_CRT}." + exit 0 +fi + +if [ ! -f ${CA_KEY} ]; then + echo "not found ${CA_KEY}." + exit 0 +fi + +read -p "please input the basename of ssl certificate: " base_dir +read -p "please input the filename of ssl certificate: " file_name +read -p "please input the loca host: " local_host +read -s -p "please input the password of ca and ssl separated by space: " ca_password ssl_password + +if [ ! -d ${base_dir}/ ]; then + mkdir -p ${base_dir} +fi + +key="${base_dir}/${file_name}.key" +crt="${base_dir}/${file_name}.crt" +req="${base_dir}/${file_name}.req" + +expect <<-EOF + spawn /bin/openssl genrsa -aes256 -out ${key} 2048 + expect "Enter pass phrase for" + send "${ssl_password}\r" + expect "Verifying - Enter pass phrase for" + send "${ssl_password}\r" + expect eof +EOF + +expect <<-EOF + spawn /bin/openssl req -new -out ${req} -key ${key} -subj "/C=CN/ST=Some-State/O=${file_name}/CN=${local_host}" + expect "Enter pass phrase for" + send "${ssl_password}\r" + expect eof +EOF + +expect <<-EOF + spawn /bin/openssl x509 -req -in ${req} -out ${crt} -sha256 -CAcreateserial -days 7000 -CA ${CA_CRT} -CAkey ${CA_KEY} + expect "Enter pass phrase for" + send "${ca_password}\r" + expect eof +EOF + +rm ${req} + +echo "${ssl_password}">${pwf} +chmod 600 ${key} +chmod 600 ${crt} +chmod 600 ${pwf} diff --git a/src/gausskernel/dbmind/tools/Detection/shell/initialize_certificate.sh b/src/gausskernel/dbmind/tools/Detection/shell/initialize_certificate.sh new file mode 100644 index 000000000..719ddb350 --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/shell/initialize_certificate.sh @@ -0,0 +1,19 @@ +source ./common.sh + +SERVER="certificate/server" +AGENT="certificate/agent" +CA="certificate/ca" +PW_FILE="certificate/pwf" + +if [ ! -d ${CURRENT_DIR}/${SERVER} ]; then + mkdir -p ${CURRENT_DIR}/${SERVER} +fi + +if [ ! -d ${CURRENT_DIR}/${AGENT} ]; then + mkdir -p ${CURRENT_DIR}/${AGENT} +fi + +if [ ! -d ${CURRENT_DIR}/${CA} ]; then + mkdir -p ${CURRENT_DIR}/${CA} +fi + diff --git a/src/gausskernel/dbmind/tools/Detection/structure.png b/src/gausskernel/dbmind/tools/Detection/structure.png new file mode 100644 index 000000000..8f692355e Binary files /dev/null and b/src/gausskernel/dbmind/tools/Detection/structure.png differ diff --git a/src/gausskernel/dbmind/tools/Detection/task/__pycache__/metric_task.cpython-37.pyc b/src/gausskernel/dbmind/tools/Detection/task/__pycache__/metric_task.cpython-37.pyc new file mode 100644 index 000000000..00a16494a Binary files /dev/null and b/src/gausskernel/dbmind/tools/Detection/task/__pycache__/metric_task.cpython-37.pyc differ diff --git a/src/gausskernel/dbmind/tools/Detection/task/metric_task.conf b/src/gausskernel/dbmind/tools/Detection/task/metric_task.conf new file mode 100644 index 000000000..46656ac45 --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/task/metric_task.conf @@ -0,0 +1,32 @@ +[cpu_usage] +minimum = 30 +maximum = 100 +data_period = 100S +forecast_period = 100S + +[memory_usage] +minimum = 20 +maximum = 100 +data_period = 20 +forecast_interval = 20S +forecast_period = 20S + +[io_read] +minimum = 30 +maximum = 100 +data_period = 30 +forecast_interval = 30S +forecast_period = 30S + +[io_write] +minimum = 50 +maximum = 100 +data_period = 40S +forecast_interval = 40S + +[disk_space] +minimum = 10 +maximum = 100 +data_period = 50S +forecast_interval = 50S +forecast_period = 50S diff --git a/src/gausskernel/dbmind/tools/Detection/task/metric_task.py b/src/gausskernel/dbmind/tools/Detection/task/metric_task.py new file mode 100644 index 000000000..927dd8d06 --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/task/metric_task.py @@ -0,0 +1,67 @@ +import os +import subprocess + +from utils import unify_byte_unit + + +def cpu_usage(): + child1 = subprocess.Popen(['ps', '-ux'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False) + child2 = subprocess.Popen(['grep', 'gaussd[b]'], stdin=child1.stdout, stdout=subprocess.PIPE, + stderr=subprocess.PIPE, shell=False) + sub_chan = child2.communicate() + if not sub_chan[0]: + result = 0.0 + else: + result = sub_chan[0].split()[2].decode('utf-8') + return result + + +def io_read(): + child1 = subprocess.Popen(['pidstat', '-d'], stdout=subprocess.PIPE, shell=False) + child2 = subprocess.Popen(['grep', 'gaussd[b]'], stdin=child1.stdout, stdout=subprocess.PIPE, shell=False) + sub_chan = child2.communicate() + if not sub_chan[0]: + result = 0.0 + else: + result = sub_chan[0].split()[3].decode('utf-8') + return result + + +def io_write(): + child1 = subprocess.Popen(['pidstat', '-d'], stdout=subprocess.PIPE, shell=False) + child2 = subprocess.Popen(['grep', 'gaussd[b]'], stdin=child1.stdout, stdout=subprocess.PIPE, shell=False) + sub_chan = child2.communicate() + if not sub_chan[0]: + result = 0.0 + else: + result = sub_chan[0].split()[4].decode('utf-8') + return result + + +def memory_usage(): + child1 = subprocess.Popen(['ps', '-ux'], stdout=subprocess.PIPE, shell=False) + child2 = subprocess.Popen(['grep', 'gaussd[b]'], stdin=child1.stdout, stdout=subprocess.PIPE, shell=False) + sub_chan = child2.communicate() + if not sub_chan[0]: + result = 0.0 + else: + result = sub_chan[0].split()[3].decode('utf-8') + return result + + +def disk_space(): + pg_data = os.getenv('PGDATA') + if pg_data is None: + raise ValueError('not found PGDATA in environment.') + else: + pg_data = os.path.realpath(pg_data) + child = subprocess.Popen(['du', '-sh', pg_data], stdout=subprocess.PIPE, shell=False) + sub_chan = child.communicate() + if sub_chan[1] is not None: + raise ValueError('error when get disk usage of openGauss: {error}'. + format(error=sub_chan[1].decode('utf-8'))) + if not sub_chan[0]: + result = 0.0 + else: + result = unify_byte_unit(sub_chan[0].decode('utf-8')) + return result diff --git a/src/gausskernel/dbmind/tools/Detection/test/agent_module.py b/src/gausskernel/dbmind/tools/Detection/test/agent_module.py new file mode 100644 index 000000000..521a24cec --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/test/agent_module.py @@ -0,0 +1,11 @@ +import os +import sys + +sys.path.append((os.path.dirname(os.getcwd()))) + +from agent import start_agent + +config_path = '../a-detection.conf' + +if __name__ == '__main__': + start_agent(config_path) diff --git a/src/gausskernel/dbmind/tools/Detection/test/data/metric.db b/src/gausskernel/dbmind/tools/Detection/test/data/metric.db new file mode 100644 index 000000000..d7bb4124b Binary files /dev/null and b/src/gausskernel/dbmind/tools/Detection/test/data/metric.db differ diff --git a/src/gausskernel/dbmind/tools/Detection/test/datahandler_module.py b/src/gausskernel/dbmind/tools/Detection/test/datahandler_module.py new file mode 100644 index 000000000..2b9716836 --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/test/datahandler_module.py @@ -0,0 +1,20 @@ +import sys +sys.path.append('../') +from detector.monitor import data_handler + + +dh = data_handler.DataHandler('io_read', './data/metric.db') + +#with dh('io_read', './data/metric.db') as db: +# ts = db.get_timeseries(period=10) +# print(ts) +# print(len(ts)) +# ts = db.get_timeseries(period='10S') +# print(ts) +# print(len(ts)) + + +dh.connect_db() +ts = dh.get_timeseries(period='10S') +print(ts) +dh.close() diff --git a/src/gausskernel/dbmind/tools/Detection/test/log/abnormal.log b/src/gausskernel/dbmind/tools/Detection/test/log/abnormal.log new file mode 100644 index 000000000..e69de29bb diff --git a/src/gausskernel/dbmind/tools/Detection/test/log/monitor.log b/src/gausskernel/dbmind/tools/Detection/test/log/monitor.log new file mode 100644 index 000000000..e69de29bb diff --git a/src/gausskernel/dbmind/tools/Detection/test/monitor_module.py b/src/gausskernel/dbmind/tools/Detection/test/monitor_module.py new file mode 100644 index 000000000..5f03a515b --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/test/monitor_module.py @@ -0,0 +1,12 @@ +import os +import sys + +sys.path.append((os.path.dirname(os.getcwd()))) + +from detector.monitor import start_monitor + +config_path = '../a-detection.conf' +metric_config_path = '../task/metric_task.conf' + +if __name__ == '__main__': + start_monitor(config_path, metric_config_path) diff --git a/src/gausskernel/dbmind/tools/Detection/test/server_module.py b/src/gausskernel/dbmind/tools/Detection/test/server_module.py new file mode 100644 index 000000000..ea891ff43 --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/test/server_module.py @@ -0,0 +1,17 @@ +import os +import sys + +sys.path.append((os.path.dirname(os.getcwd()))) + +from detector.server import start_service + +config_path = '../a-detection.conf' + +if __name__ == '__main__': + start_service(config_path) + +from urllib import parse + +import urllib + +urllib.parse.qu diff --git a/src/gausskernel/dbmind/tools/Detection/tools/common.sh b/src/gausskernel/dbmind/tools/Detection/tools/common.sh new file mode 100644 index 000000000..29d704880 --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/tools/common.sh @@ -0,0 +1,6 @@ +CURRENT_DIR=$(cd ../$(dirname $0); pwd) +BASENAME=$(basename $CURRENT_DIR) + +MONITOR_PID="monitor.pid" +SERVER_PID="server.pid" +AGENT_PID="agent.pid" diff --git a/src/gausskernel/dbmind/tools/Detection/tools/start.sh b/src/gausskernel/dbmind/tools/Detection/tools/start.sh new file mode 100644 index 000000000..f0e838140 --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/tools/start.sh @@ -0,0 +1,137 @@ +source ./common.sh + + +function usage() +{ + echo "usage: $0 [option] + --help + --deploy_code + --start_agent + --start_detector + --start_all + " +} + + +function start_agent() +{ + cd ${CURRENT_DIR} + nohup python main.py -r agent > /dev/null 2>&1 & +} + + +function start_detector() +{ + local user="" + local host="" + local detector_path="" + local password="" + local port=22 + + read -p "please input the user of detector: " user + read -p "please input the host of detector: " host + read -p "please input the path of detector: " detector_path + read -s -p "please input the password of ${user}@${host}: " password + +expect <<-EOF + spawn ssh ${host} -p ${port} -l ${user} + expect { + "(yes/no)?" { + send "yes\r" + expect "*assword:" + send "${password}\r" + } + "*assword:" { + send "${password}\r" + } + "Last login:" { + send "\r" + } + + } + send "\r" + expect "*]*" + send "cd ${detector_path}/${BASENAME}\r" + expect "*]*" + send "nohup python main.py -r detector > /dev/null 2>&1 &\r" + expect "*]*" + send "exit\r" + expect eof +EOF + +} + + +function deploy_code() +{ + local user="" + local host="" + local detector_path="" + + read -p "please input the user of detector: " user + read -p "please input the host of detector: " host + read -p "please input the path of detector: " detector_path + read -s -p "please input the password of ${user}@${host}: " password + +expect <<-EOF + spawn scp -r ${CURRENT_DIR} ${user}@${host}:${detector_path} + expect { + "(yes/no)?" { + send "yes\r" + expect "*assword:" + send "${password}\r" + } + "*assword" { + send "${password}\r" + } +} + expect eof +EOF + + +} + + +function start_all() +{ + start_agent + start_detector +} + + +function main() +{ + if [ $# -ne 1 ]; then + usage + exit 1 + fi + + case "$1" in + --help) + usage + break + ;; + --start_agent) + start_agent + break + ;; + --start_detector) + start_detector + break + ;; + --deploy_code) + deploy_code + break + ;; + --start_all) + start_all + break + ;; + *) + echo "unknown arguments" + ;; + esac +} + + +main $@ diff --git a/src/gausskernel/dbmind/tools/Detection/tools/stop.sh b/src/gausskernel/dbmind/tools/Detection/tools/stop.sh new file mode 100644 index 000000000..16a0d98f8 --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/tools/stop.sh @@ -0,0 +1,100 @@ +source ./common.sh + + +function usage() +{ + echo "usage: $0 [option] + --help + --stop_agent + --stop_detector + --stop_all + " +} + + +function stop_agent() +{ + cat ${CURRENT_DIR}/${AGENT_PID} | xargs kill -9 +} + + +function stop_detector() +{ + local user="" + local host="" + local detector_path="" + local password="" + local port=22 + + read -p "please input the user of detector: " user + read -p "please input the host of detector: " host + read -p "please input the path of detector: " detector_path + read -s -p "please input the password of ${user}@${host}: " password + +expect <<-EOF + spawn ssh ${host} -p ${port} -l ${user} + expect { + "(yes/no)?" { + send "yes\r" + expect "*assword:" + send "${password}\r" + } + "*assword:" { + send "${password}\r" + } + "Last login:" { + send "\r" + } + + } + send "\r" + expect "*]*" + send "cat ${detector_path}/${BASENAME}/${MONITOR_PID} | xargs kill -9\r" + expect "*]*" + send "cat ${detector_path}/${BASENAME}/${SERVER_PID} | xargs kill -9\r" + expect "*]*" + send "exit\r" + expect eof +EOF +} + + +function stop_all() +{ + stop_agent + stop_detector +} + + +function main() +{ + if [ $# -ne 1 ]; then + usage + exit 1 + fi + + case "$1" in + --help) + usage + break + ;; + --stop_agent) + stop_agent + break + ;; + --stop_detector) + stop_detector + break + ;; + --stop_all) + stop_all + break + ;; + *) + echo "unknown arguments" + ;; + esac +} + + +main $@ diff --git a/src/gausskernel/dbmind/tools/Detection/utils.py b/src/gausskernel/dbmind/tools/Detection/utils.py new file mode 100644 index 000000000..c87d87523 --- /dev/null +++ b/src/gausskernel/dbmind/tools/Detection/utils.py @@ -0,0 +1,132 @@ +import inspect +import logging +import pydoc +import re +import subprocess +from datetime import datetime, timedelta +from logging import handlers +from threading import Thread, Event + +import dateutil.parser + + +class RepeatTimer(Thread): + def __init__(self, interval, function, args=None, kwargs=None): + Thread.__init__(self) + self.interval = interval + self.function = function + self.args = args if args is not None else [] + self.kwargs = kwargs if kwargs is not None else {} + self.finished = Event() + + def cancel(self): + self.finished.set() + + def run(self): + while not self.finished.is_set(): + self.finished.wait(self.interval) + self.function(*self.args, **self.kwargs) + self.finished.set() + + +def transform_time_string(time_str, mode='timedelta'): + """ + only support 'weeks, days, hours, minutes, seconds + W: week, D: days, H: hours, M: minutes, S: seconds + """ + if mode not in ('timedelta', 'to_second'): + raise ValueError('wrong mode {mode} in time_transfer.'.format(mode=mode)) + + time_num, time_flag = re.match(r'(\d+)?([WDHMS])', time_str).groups() + + if time_flag is None: + raise ValueError('wrong format {time_str} for time_str in time_transfer.'.format(time_str=time_str)) + + if time_num is None: + time_num = 1 + else: + time_num = int(time_num) + + timedelta_mapper = {'W': timedelta(weeks=1), + 'D': timedelta(days=1), + 'H': timedelta(hours=1), + 'M': timedelta(minutes=1), + 'S': timedelta(seconds=1)} + + second_mapper = {'W': 7 * 24 * 3600, 'D': 24 * 3600, 'H': 3600, 'M': 60, 'S': 1} + + if mode == 'timedelta': + return timedelta_mapper.get(time_flag) * time_num + if mode == 'to_second': + return second_mapper.get(time_flag) * time_num + + +def detection_logger(log_name, log_path, level): + logger = logging.getLogger(log_name) + agent_handler = handlers.RotatingFileHandler(filename=log_path, + maxBytes=1024 * 1024 * 100, + backupCount=5) + agent_handler.setFormatter(logging.Formatter("[%(asctime)s %(levelname)s]-[%(name)s]: %(message)s")) + logger.addHandler(agent_handler) + logger.setLevel(getattr(logging, level.upper()) if hasattr(logging, level.upper()) else logging.INFO) + return logger + + +def unify_byte_unit(byte_info): + """ + func: transfer unit of K、M、G、T、P to M + """ + byte_info = byte_info.upper() + bytes_num, bytes_unit = re.match(r'^(\d+|\d+\.\d+)([KMGTP])', byte_info).groups() + if bytes_num is None or bytes_unit is None or bytes_unit not in 'KMGTP': + raise ValueError('can not parse format of {bytes}'.format(bytes=byte_info)) + byte_unit_mapper = {'K': 1 / 1024, 'M': 1, 'G': 1024, 'T': 1024 * 1024, 'P': 1024 * 1024 * 1024} + return byte_unit_mapper[bytes_unit] * int(float(bytes_num)) + + +def get_funcs(thing): + """ + return functions in python file + """ + funcs = [] + _object, _ = pydoc.resolve(thing) + _all = getattr(_object, '__all__', None) + for key, value in inspect.getmembers(_object, inspect.isroutine): + if _all is not None or inspect.isbuiltin(value) or inspect.getmodule(value) is _object: + if pydoc.visiblename(key, _all, _object): + funcs.append((key, value)) + return funcs + + +def check_certificate(certificate_path): + """ + check certificate validity + """ + check_result = {} + certificate_waring_threshold = 365 + child = subprocess.Popen(['openssl', 'x509', '-in', certificate_path, '-noout', '-dates'], + shell=False, stdout=subprocess.PIPE, stdin=subprocess.PIPE) + sub_chan = child.communicate() + if sub_chan[1] or not sub_chan[0]: + check_result['status'] = 'fail' + else: + check_result['status'] = 'success' + not_after = sub_chan[0].decode('utf-8').split('\n')[1].split('=')[1].strip() + end_time = dateutil.parser.parse(not_after).replace(tzinfo=None) + certificate_remaining_days = (end_time - datetime.now()).days + if 0 < certificate_remaining_days < certificate_waring_threshold: + check_result['level'] = 'warn' + check_result['info'] = "the '{certificate}' has {certificate_remaining_days} days before out of date." \ + .format(certificate=certificate_path, + certificate_remaining_days=certificate_remaining_days) + elif certificate_remaining_days >= certificate_waring_threshold: + check_result['level'] = 'info' + check_result['info'] = "the '{certificate}' has {certificate_remaining_days} days before out of date." \ + .format(certificate=certificate_path, + certificate_remaining_days=certificate_remaining_days) + else: + check_result['level'] = 'error' + check_result['info'] = "the '{certificate}' is out of date." \ + .format(certificate=certificate_path) + return check_result +