rename A-Detection to Detection

This commit is contained in:
l00520705
2020-12-03 16:27:44 +08:00
committed by wangtq
parent a47d910712
commit 10af77f691
47 changed files with 2097 additions and 0 deletions

View File

@ -0,0 +1,245 @@
![structure](structure.png)
## Introduction to Detection
**Detection** is a monitoring and anomaly-detection tool for openGauss, based on time-series forecasting algorithms.
It watches metrics such as IO-Read, IO-Write, CPU-Usage, Memory-Usage and Disk-Usage, can monitor multiple metrics at the
same time, and forecasts the future trend of each metric; if a forecast value falls outside the specified scope, it
notifies the user in a timely manner.
Detection is composed of two components, **agent** and **detector**. The **agent** is deployed on the same machine as openGauss,
and the **detector** can be deployed on any machine that can communicate with the agent over _http_ or _https_; for security
reasons, we suggest using _https_.
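The two components speak a small JSON protocol: the agent periodically POSTs the latest value of every metric to the detector's `/sink` endpoint. A minimal sketch of that exchange (the host, port and sample values here are placeholders):

```python
import json
from urllib import request

# one entry per metric channel, as assembled by the agent's HTTP sink
payload = {'cpu_usage': {'timestamp': 1606982400, 'value': 35.5}}

req = request.Request('http://127.0.0.1:8080/sink',
                      headers={'Content-Type': 'application/json'},
                      data=json.dumps(payload).encode('utf-8'),
                      method='POST')
request.urlopen(req)  # the detector stores each event in its sqlite table
```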
## Detection Installation
We suggest using _anaconda_ to manage your Python environment.
**agent**
- python3.6+
- python-dateutil
- configparser
**detector**
- python3.6+
- pystan
- python-dateutil
- fbprophet
- pandas
- flask
- flask_sqlalchemy
- flask_restful
- configparser
notes:
use ```python -m pip install --upgrade pip``` to upgrade pip
use ```conda install fbprophet``` to install fbprophet
## Introduction to a-detection.conf
The config file is divided into the sections `database`, `server`, `agent`, `forecast`, `log` and `security`.
The `log` and `security` sections are shared by both modules; apart from these, `agent` is used by the
**agent** module, while `database`, `server` and `forecast` are used by the **detector** module. So pay attention to the
paths in every section and make clear whether they refer to the **agent** or the **detector** machine.
[agent]
# interval at which the source collects metric info (requires a time unit, e.g. '1S')
source_timer_interval = 1S
# interval at which the sink sends data to the server
sink_timer_interval = 1S
# max size of each channel
channel_capacity = 300
[security]
# config for https, if `tls` is False, use http instead
tls = True
ca = ./certificate/ca/ca.crt
server_cert = ./certificate/server/server.crt
server_key = ./certificate/server/server.key
agent_cert = ./certificate/agent/agent.crt
agent_key = ./certificate/agent/agent.key
[database]
# location of sqlite
database_path = ./data/sqlite.db
# max rows per table in sqlite, to prevent tables from growing too large
max_rows = 10000
# frequency to remove surplus rows in table
max_flush_cache = 1000
[server]
host = 127.0.0.1
# listen host of app
listen_host = 0.0.0.0
# listen port of app
listen_port = 8080
[forecast]
# forecast algorithm; fbprophet represents facebook's prophet algorithm
forecast_alg = fbprophet
[log]
# relative dirname of log
log_dir = ./log
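Both modules read this file with Python's standard `configparser`; a minimal sketch of how the options above are consumed (option names taken from the sections shown):

```python
from configparser import ConfigParser

config = ConfigParser()
config.read('a-detection.conf')

sink_interval = config.get('agent', 'sink_timer_interval')   # agent side, e.g. '1S'
database_path = config.get('database', 'database_path')      # detector side
use_tls = config.getboolean('security', 'tls')               # shared
print(sink_interval, database_path, use_tls)
```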
## Quick Guide
### 1. Deploy certificate
If you want to communicate over 'https', you should set `tls = True` first, and you need certificates,
placed in the appropriate locations (`./certificate/ca`, `./certificate/server`, `./certificate/agent`).
If `tls = False`, the 'http' method will be used instead.
We provide demo [scripts](shell) to generate certificates.
Use [gen_ca_certificate.sh](shell/gen_ca_certificate.sh) to generate the CA certificate and secret key; the same goes for the other
certificates.
sh gen_ca_certificate.sh
This script creates a `certificate` directory in the project containing three subdirectories named `ca`, `server` and `agent`;
the CA certificate and secret key will be placed in `./certificate/ca`.
You can also use your own CA certificate; just place it in `ca`.
Use [gen_certificate.sh](shell/gen_certificate.sh) to generate a server certificate and secret key.
sh gen_certificate.sh
After generating a certificate and secret key, place them in the corresponding subdirectory (`./certificate/server` or
`./certificate/agent`).
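If you want to sanity-check a certificate before starting the services, the project's `utils.check_certificate` helper reports its validity; a minimal usage sketch (run from the project root):

```python
from utils import check_certificate

result = check_certificate('./certificate/server/server.crt')
if result['status'] == 'success':
    # 'level' is one of info/warn/error; 'info' reports the remaining validity
    print(result['level'], result['info'])
else:
    print('could not check the certificate')
```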
### 2. Install openGauss
In [task/metric_task.py](task/metric_task.py), the program acquires openGauss metrics from the openGauss environment automatically;
if openGauss is not installed, an exception will occur.
### 3. Deploy program
Just as said above, Detection is composed of two modules, **agent** and **detector**; the agent is
deployed with openGauss, and the detector can be deployed on any machine that can communicate with the agent machine.
### 4. Start program
*step 1: deploy code*
you can copy the code to the other machine manually or use [start.sh](bin/start.sh)
sh start.sh --deploy_code
*step 2: start detector on the detector machine*
nohup python main.py -r detector > /dev/null 2>&1 &
or
sh start.sh --start_detector
*step 3: start agent on the machine where openGauss is installed*
nohup python main.py -r agent > /dev/null 2>&1 &
or
sh start.sh --start_agent
you can use [stop.sh](bin/stop.sh) to stop the agent and detector processes, e.g. `sh stop.sh --stop_all`.
### 5. Observe result
The program writes four log files: **agent.log**, **server.log**, **monitor.log** and **abnormal.log**.
- agent.log: records the running status of the agent module.
- server.log: records the running status of the app.
- monitor.log: records the monitoring status, such as forecasting, detecting, etc.
- abnormal.log: records the abnormal status of monitored metrics.
## Introduction to task
The monitored metrics are defined in [task/metric_task.py](task/metric_task.py); each function should return its metric's value.
The monitored metrics are configured in [task/metric_task.conf](task/metric_task.conf).
### How to add a monitored metric
It is very easy to add a metric:
*step 1: write a function in [task/metric_task.py](task/metric_task.py) which gets the value of the metric.*
*step 2: add the metric's config in [task/metric_task.conf](task/metric_task.conf).*
instruction of a metric config:
[cpu_usage]
minimum = 20
maximum = 100
data_period = 2000
forecast_interval = 20S
forecast_period = 150S
in the config of cpu_usage:
'maximum': the maximum allowable value of cpu_usage; the metric is considered abnormal if its value is higher than this.
'minimum': the minimum allowable value of cpu_usage; the metric is considered abnormal if its value is lower than this.
note: you should provide at least one of the two; if not, the metric will not be monitored.
'data_period': the value of 'data_period' represents a time interval or a length counted back from now. For example, if we
want to get the last 100 seconds of data, the value of 'data_period' is '100S'; if we want the
last 20 days, 'data_period' is '20D'; if we want the last 1000 data points, then
'data_period' is 1000.
'forecast_interval': the interval between forecast operations. For example, if we want to forecast 'cpu_usage' every
10 seconds, the value of 'forecast_interval' is '10S'.
'forecast_period': the forecast length. For example, if we want to forecast the value of cpu_usage for the next
100 seconds at a frequency of '1S', the value of 'forecast_period' should be '100S'.
notes: 'S' -> second
'M' -> minute
'H' -> hour
'D' -> day
'W' -> week
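These duration strings are parsed by the project's `utils.transform_time_string`; the sketch below mirrors that parsing (same regex and unit table), so you can see what '100S' or '20D' expands to in seconds:

```python
import re

# seconds per unit, matching utils.transform_time_string(mode='to_second')
UNIT_SECONDS = {'W': 7 * 24 * 3600, 'D': 24 * 3600, 'H': 3600, 'M': 60, 'S': 1}

def to_seconds(time_str):
    match = re.match(r'(\d+)?([WDHMS])', time_str)
    if match is None:
        raise ValueError('wrong format: ' + time_str)
    num, unit = match.groups()
    return (int(num) if num else 1) * UNIT_SECONDS[unit]

print(to_seconds('100S'), to_seconds('20D'))  # 100 1728000
```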
for example:
if we want to monitor io_read of openGauss:
*step 1:*
task/metric_task.py
import subprocess

def io_read():
    child1 = subprocess.Popen(['pidstat', '-d'], stdout=subprocess.PIPE, shell=False)
    child2 = subprocess.Popen(['grep', 'gaussd[b]'], stdin=child1.stdout, stdout=subprocess.PIPE, shell=False)
    result = child2.communicate()
    if not result[0]:
        return 0.0
    else:
        return result[0].split()[3].decode('utf-8')
*step 2:*
task/metric_task.conf
[io_read]
minimum = 30
maximum = 100
data_period = 1000
forecast_interval = 25S
forecast_period = 200S
*step 3:*
restart your project

View File

@ -0,0 +1,28 @@
[database]
max_rows = 10000
max_flush_cache = 1000
database_path = ./data/metric.db
[security]
tls = False
ca = ./certificate/ca/ca.crt
server_cert = ./certificate/server/server.crt
server_key = ./certificate/server/server.key
agent_cert = ./certificate/agent/agent.crt
agent_key = ./certificate/agent/agent.key
[server]
host = 127.0.0.1
listen_host = 0.0.0.0
listen_port = 8080
[agent]
source_timer_interval = 1S
sink_timer_interval = 1S
channel_capacity = 100
[forecast]
forecast_alg = fbprophet
[log]
log_dir = ./log

View File

@ -0,0 +1 @@
from .metric_agent import start_agent

View File

@ -0,0 +1,15 @@
import os
from configparser import ConfigParser
from main import config_path
from utils import detection_logger
config = ConfigParser()
config.read(config_path)
log_dir_realpath = os.path.realpath(config.get('log', 'log_dir'))
if not os.path.exists(log_dir_realpath):
os.makedirs(log_dir_realpath)
logger = detection_logger(level='INFO',
log_name='agent',
log_path=os.path.join(log_dir_realpath, 'agent.log'))

View File

@ -0,0 +1,73 @@
from queue import Queue, Empty, Full
from .agent_logger import logger
class Channel:
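"""Abstract channel: buffers events between a metric source and a sink."""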
def __init__(self):
pass
def put(self, event):
pass
def take(self):
pass
def size(self):
pass
class MemoryChannel(Channel):
def __init__(self, name, maxsize=None):
Channel.__init__(self)
self.name = name
self.maxsize = maxsize
self.memory = Queue(maxsize)
def put(self, event):
if self.maxsize and self.size() > self.maxsize:
logger.warn("Channel {name} has reached its maxsize.".format(name=self.name))
try:
self.memory.put(event, block=True, timeout=0.2)
except Full:
logger.warn("Channel {name} is full, discarding data.".format(name=self.name))
def take(self):
try:
return self.memory.get_nowait()
except Empty:
logger.warn('Channel {name} is empty.'.format(name=self.name))
return None
def size(self):
rv = self.memory.qsize()
return 0 if rv is None else rv
class ChannelManager:
def __init__(self):
self._channels = {}
def add_channel(self, name, maxsize):
self._channels[name] = MemoryChannel(name=name, maxsize=maxsize)
logger.info('Channel {name} is created.'.format(name=name))
def get_channel(self, name):
return self._channels[name]
def check(self, name):
if name not in self._channels:
return False
return True
def get_channel_content(self):
contents = {}
for name, queue in self._channels.items():
event = queue.take()
if event is not None:
contents[name] = event
return contents
def size(self):
return len(self._channels)

View File

@ -0,0 +1,49 @@
import threading
import time
from .agent_logger import logger
from .source import Source
class TaskHandler(threading.Thread):
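"""Thread that periodically runs one metric-collection function and puts
{'timestamp': ..., 'value': ...} events into its channel."""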
def __init__(self, interval, function, *args, **kwargs):
threading.Thread.__init__(self)
self._func = function
self._interval = interval
self._args = args
self._kwargs = kwargs
self._finished = threading.Event()
self._res = None
self._channel = None
def set_channel(self, channel):
self._channel = channel
def run(self):
while not self._finished.is_set():
try:
self._res = self._func(*self._args, **self._kwargs)
self._channel.put({'timestamp': int(time.time()), 'value': self._res})
except Exception as e:
logger.exception(e)
self._finished.wait(self._interval)
def cancel(self):
self._finished.set()
class DBSource(Source):
def __init__(self):
Source.__init__(self)
self.running = False
self._tasks = {}
def add_task(self, name, interval, task, maxsize, *args, **kwargs):
if name not in self._tasks:
self._tasks[name] = TaskHandler(interval, task, *args, **kwargs)
self._channel_manager.add_channel(name, maxsize)
self._tasks[name].set_channel(self._channel_manager.get_channel(name))
def start(self):
for _, task in self._tasks.items():
task.start()

View File

@ -0,0 +1,110 @@
import os
import ssl
from configparser import ConfigParser, NoOptionError
from task import metric_task
from utils import get_funcs, transform_time_string, check_certificate
from .agent_logger import logger
from .channel import ChannelManager
from .db_source import DBSource
from .sink import HttpSink
def start_agent(config_path):
if not os.path.exists(config_path):
logger.error('{config_path} does not exist.'.format(config_path=config_path))
return
config = ConfigParser()
config.read(config_path)
if not config.has_section('agent') or not config.has_section('server'):
logger.error("config file has no 'agent' or 'server' section.")
return
if not config.has_option('server', 'host') or not config.has_option('server', 'listen_port'):
logger.error("'server' section has no 'host' or 'listen_port' option.")
return
else:
context = None
if config.has_option('security', 'tls') and config.getboolean('security', 'tls'):
url = 'https://' + config.get('server', 'host') + ':' + config.get('server', 'listen_port') + '/sink'
try:
agent_cert = os.path.realpath(config.get('security', 'agent_cert'))
agent_key = os.path.realpath(config.get('security', 'agent_key'))
ca = os.path.realpath(config.get('security', 'ca'))
except NoOptionError as e:
logger.error(e)
return
else:
logger.info(agent_cert)
logger.info(agent_key)
logger.info(ca)
ssl_certificate_status = check_certificate(agent_cert)
ca_certificate_status = check_certificate(ca)
if ssl_certificate_status['status'] == 'fail':
logger.error("error occur when check '{certificate}'.".format(certificate=agent_cert))
else:
if ssl_certificate_status['level'] == 'info':
logger.info(ssl_certificate_status['info'])
elif ssl_certificate_status['level'] == 'warn':
logger.warn(ssl_certificate_status['info'])
else:
logger.error(ssl_certificate_status['info'])
return
if ca_certificate_status['status'] == 'fail':
logger.error("error occur when check '{certificate}'.".format(certificate=ca))
else:
if ca_certificate_status['level'] == 'info':
logger.info(ca_certificate_status['info'])
elif ca_certificate_status['level'] == 'warn':
logger.warn(ca_certificate_status['info'])
else:
logger.error(ca_certificate_status['info'])
return
pw_file = os.path.join(os.path.dirname(config_path), 'certificate/pwf')
with open(pw_file, mode='r') as f:
pw = f.read().strip()
context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=ca)
context.check_hostname = False
context.load_cert_chain(certfile=agent_cert, keyfile=agent_key, password=pw)
else:
logger.warn("detect not config 'ssl certificate', use 'http' instead.[advise use 'https']")
url = 'http://' + config.get('server', 'host') + ':' + config.get('server', 'listen_port') + '/sink'
default_agent_parameter_dicts = {'sink_timer_interval': '10S',
'source_timer_interval': '10S',
'channel_capacity': 1000}
for parameter, default_value in default_agent_parameter_dicts.items():
if not config.has_option('agent', parameter):
logger.warn("do not provide '{parameter}' in 'agent' section, use default '{default_value}'."
.format(parameter=parameter,
default_value=default_agent_parameter_dicts['sink_timer_interval']))
value = default_value
else:
value = config.get('agent', parameter)
try:
if parameter in ('sink_timer_interval', 'source_timer_interval'):
globals()[parameter] = transform_time_string(value, mode='to_second')
if parameter == 'channel_capacity':
globals()[parameter] = int(value)
except Exception as e:
logger.error(e)
return
chan = ChannelManager()
source = DBSource()
http_sink = HttpSink(interval=globals()['sink_timer_interval'], url=url, context=context)
source.channel_manager = chan
http_sink.channel_manager = chan
for task_name, task_func in get_funcs(metric_task):
source.add_task(name=task_name,
interval=globals()['source_timer_interval'],
task=task_func,
maxsize=globals()['channel_capacity'])
source.start()
http_sink.start()

View File

@ -0,0 +1,59 @@
import json
import time
from urllib import request
from .agent_logger import logger
header = {'Content-Type': 'application/json'}
class Sink:
def __init__(self):
self._channel_manager = None
self.running = False
@property
def channel_manager(self):
return self._channel_manager
@channel_manager.setter
def channel_manager(self, channel_manager):
self._channel_manager = channel_manager
def process(self):
pass
def start(self):
self.running = True
self.process()
def stop(self):
self.running = False
class HttpSink(Sink):
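"""Drains every channel each `interval` seconds and POSTs the batch as JSON
to the detector's /sink endpoint."""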
def __init__(self, interval, url, context):
Sink.__init__(self)
self._interval = interval
self.running = False
self._url = url
self.context = context
def process(self):
logger.info('begin send data to {url}'.format(url=self._url))
while self.running:
time.sleep(self._interval)
contents = self._channel_manager.get_channel_content()
if contents:
while True:
try:
req = request.Request(self._url, headers=header, data=json.dumps(contents).encode('utf-8'),
method='POST')
request.urlopen(req, context=self.context)
break
except Exception as e:
logger.warn(e, exc_info=True)
time.sleep(0.5)
else:
logger.warn('No data found in any channel.')

View File

@ -0,0 +1,17 @@
class Source:
def __init__(self):
self._channel_manager = None
def start(self):
pass
def stop(self):
pass
@property
def channel_manager(self):
return self._channel_manager
@channel_manager.setter
def channel_manager(self, channel_manager):
self._channel_manager = channel_manager

View File

@ -0,0 +1,8 @@
from .fb_prophet import FacebookProphet
def forecast_algorithm(method):
if method == 'fbprophet':
return FacebookProphet
else:
raise Exception('No {method} forecast method.'.format(method=method))

View File

@ -0,0 +1,61 @@
import os
import pickle
import re
import time
import pandas as pd
from fbprophet import Prophet
from .model import AlgModel
date_format = "%Y-%m-%d %H:%M:%S"
class FacebookProphet(AlgModel):
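"""Wrapper around fbprophet's Prophet: fit() takes (timestamp, value) pairs,
forecast() returns the predicted dates and values for the requested period."""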
def __init__(self):
AlgModel.__init__(self)
self.model = None
self.date_unit_mapper = {'S': 'S',
'M': 'T',
'H': 'H',
'D': 'D',
'W': 'W'}
def fit(self, timeseries):
try:
timeseries = pd.DataFrame(timeseries, columns=['ds', 'y'])
timeseries['ds'] = timeseries['ds'].map(lambda x: time.strftime(date_format, time.localtime(x)))
self.model = Prophet(yearly_seasonality=True,
weekly_seasonality=True,
daily_seasonality=True)
self.model.fit(timeseries)
except Exception:
raise
def forecast(self, forecast_periods):
forecast_period, forecast_freq = re.match(r'(\d+)?([WDHMS])', forecast_periods).groups()
try:
# synchronize date-unit to fb-prophet
forecast_freq = self.date_unit_mapper[forecast_freq]
if forecast_period is None:
forecast_period = 1
else:
forecast_period = int(forecast_period)
future = self.model.make_future_dataframe(freq=forecast_freq,
periods=forecast_period,
include_history=False)
predict_result = self.model.predict(future)[['ds', 'yhat']]
predict_result['ds'] = predict_result['ds'].map(lambda x: x.strftime(date_format))
return predict_result.values[:, 0], predict_result.values[:, 1]
except Exception:
raise
def save(self, model_path):
with open(model_path, mode='wb') as f:
pickle.dump(self.model, f)
def load(self, model_path):
if not os.path.exists(model_path):
raise FileNotFoundError('%s not found.' % model_path)
with open(model_path, mode='rb') as f:
self.model = pickle.load(f)

View File

@ -0,0 +1,20 @@
from abc import abstractmethod
class AlgModel(object):
def __init__(self):
pass
@abstractmethod
def fit(self, timeseries):
pass
@abstractmethod
def forecast(self, forecast_periods):
pass
def save(self, model_path):
pass
def load(self, model_path):
pass

View File

@ -0,0 +1 @@
from .metric_monitor import start_monitor

View File

@ -0,0 +1,15 @@
import os
from configparser import ConfigParser
from main import config_path
from utils import detection_logger
config = ConfigParser()
config.read(config_path)
log_dir_realpath = os.path.realpath(config.get('log', 'log_dir'))
if not os.path.exists(log_dir_realpath):
os.makedirs(log_dir_realpath)
logger = detection_logger(level='INFO',
log_name='abnormal',
log_path=os.path.join(log_dir_realpath, 'abnormal.log'))

View File

@ -0,0 +1,100 @@
import os
import sqlite3
import time
from utils import transform_time_string
from .monitor_logger import logger
class DataHandler:
'''
process sqlite3 data, provide data to forecastor
'''
def __init__(self, table, dbpath):
self._table = table
self._dbpath = dbpath
self._conn = None
self._cur = None
def __enter__(self):
self.connect_db()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
def connect_db(self):
if not os.path.exists(self._dbpath):
logger.error('{dbpath} not found'.format(dbpath=self._dbpath), exc_info=True)
return
self._conn = sqlite3.connect(self._dbpath)
self._cur = self._conn.cursor()
def select_timeseries_by_timestamp(self, period):
timeseries = []
times = 0
times_limit = 5
while times < times_limit:
try:
get_last_timestamp_sql = "select timestamp from {table} order by timestamp desc limit 1"
self._cur.execute(get_last_timestamp_sql.format(table=self._table))
last_timestamp = self._cur.fetchall()[0][0]
time_interval = transform_time_string(period, mode='to_second')
select_timestamp = last_timestamp - time_interval
get_timeseries_sql = "select timestamp, value from {table} where timestamp >= '{select_timestamp}'"
self._cur.execute(get_timeseries_sql.format(table=self._table, select_timestamp=select_timestamp))
timeseries = self._cur.fetchall()
if not timeseries:
logger.warn("get no timeseries from {table}', retry...".format(table=self._table))
else:
return timeseries
except Exception as e:
logger.exception('exception occur when get timeseries: {error}, retry...'.format(error=str(e)), exc_info=True)
times += 1
time.sleep(0.11)
return timeseries
def select_timeseries_by_length(self, length):
timeseries = []
times = 0
times_limit = 5
while times < times_limit:
try:
sql = "select * from (select * from {table} order by timestamp desc limit {length}) order by timestamp"
self._cur.execute(sql.format(table=self._table, length=length))
timeseries = self._cur.fetchall()
if not timeseries:
logger.warn("get no timeseries from {table}', retry...".format(table=self._table))
else:
return timeseries
except Exception as e:
logger.exception('exception occur when get timeseries: {error}'.format(error=str(e)), exc_info=True)
times += 1
time.sleep(0.11)
return timeseries
def get_timeseries(self, period):
if isinstance(period, int):
timeseries = self.select_timeseries_by_length(length=period)
else:
timeseries = self.select_timeseries_by_timestamp(period=period)
return timeseries
def check_table(self, table):
'''
check whether table exist in data
'''
sql = "select name from sqlite_master where type = 'table'"
self._cur.execute(sql)
tables = self._cur.fetchall()
tables = [item[0] for item in tables]
if table not in tables:
return False
return True
def close(self):
self._cur.close()
self._conn.close()

View File

@ -0,0 +1,98 @@
import types
from collections import OrderedDict
from functools import wraps
from itertools import groupby
from .abnormal_logger import logger as a_logger
from .monitor_logger import logger as m_logger
class Detector:
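"""Decorator applied to Forecastor.run: classifies every forecast value as
'higher', 'lower' or 'normal' against the configured minimum/maximum, logging
normal ranges to monitor.log and abnormal ones to abnormal.log."""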
def __init__(self, func):
wraps(func)(self)
def __get__(self, instance, cls):
if instance is None:
return self
return types.MethodType(self, instance)
def __call__(self, *args, **kwargs):
def mapper_function(value):
if value > maximum:
result = (value, 'higher')
elif value < minimum:
result = (value, 'lower')
else:
result = (value, 'normal')
return result
forecast_result = self.__wrapped__(*args, **kwargs)
if forecast_result['status'] == 'fail':
return
metric_name = forecast_result['metric_name']
future_value = forecast_result['future_value']
future_date = forecast_result['future_date']
minimum = forecast_result['detect_basis']['minimum']
maximum = forecast_result['detect_basis']['maximum']
if minimum is None and maximum is not None:
minimum = '-inf'
value_map_result = list(map(lambda x: (x, 'higher') if x > maximum else (x, 'normal'), future_value))
elif maximum is None and minimum is not None:
maximum = 'inf'
value_map_result = list(map(lambda x: (x, 'lower') if x < minimum else (x, 'normal'), future_value))
else:
value_map_result = list(map(mapper_function, future_value))
forecast_condition = OrderedDict(zip(future_date, value_map_result))
for key, value in groupby(list(forecast_condition.items()), key=lambda item: item[1][1]):
metric_status = key
value_ = value
metric_date_value_scope = [(_item[0], _item[1][0]) for _item in value_]
maximum_forecast_value = round(max([_item[1] for _item in metric_date_value_scope]), 3)
minimum_forecast_value = round(min([_item[1] for _item in metric_date_value_scope]), 3)
if metric_status == 'normal':
if len(metric_date_value_scope) == 1:
m_logger.info('the forecast value of [{metric}]({minimum}~{maximum})'
' at {date} is ({forecast_value}) [{metric_status}].'
.format(metric=metric_name,
minimum=minimum,
maximum=maximum,
forecast_value=metric_date_value_scope[0][1],
metric_status=metric_status,
date=metric_date_value_scope[0][0]))
else:
m_logger.info('the forecast value of [{metric}]({minimum}~{maximum}) in '
'[{start_date}~{end_date}] is between ({minimum_forecast_value}'
'~{maximum_forecast_value}) [{metric_status}].'
.format(metric=metric_name,
minimum=minimum,
maximum=maximum,
minimum_forecast_value=minimum_forecast_value,
maximum_forecast_value=maximum_forecast_value,
metric_status=metric_status,
start_date=metric_date_value_scope[0][0],
end_date=metric_date_value_scope[-1][0]))
else:
if len(metric_date_value_scope) == 1:
a_logger.warn('the forecast value of [{metric}]({minimum}~{maximum})'
' at {date} is ({forecast_value}) [{metric_status}].'
.format(metric=metric_name,
minimum=minimum,
maximum=maximum,
forecast_value=metric_date_value_scope[0][1],
metric_status=metric_status,
date=metric_date_value_scope[0][0]))
else:
a_logger.warn('the forecast value of [{metric}]({minimum}~{maximum}) in '
'[{start_date}~{end_date}] is between ({minimum_forecast_value}'
'~{maximum_forecast_value}) [{metric_status}].'
.format(metric=metric_name,
minimum=minimum,
maximum=maximum,
minimum_forecast_value=minimum_forecast_value,
maximum_forecast_value=maximum_forecast_value,
metric_status=metric_status,
start_date=metric_date_value_scope[0][0],
end_date=metric_date_value_scope[-1][0]))

View File

@ -0,0 +1,61 @@
from detector.monitor import detect
from .monitor_logger import logger
class Forecastor:
"""
repeat execute by Timer
"""
def __init__(self, **kwargs):
self.minimum_timeseries_length = 20
self.metric_name = kwargs['metric_name']
self.database_path = kwargs['database_path']
self.data_handler = kwargs['data_handler']
self.forecast_alg = kwargs['forecast_alg']
self.forecast_period = kwargs['forecast_period']
self.forecast_interval = kwargs['forecast_interval']
self.data_period = kwargs['data_period']
self.detect_basis = {'minimum': kwargs.get('minimum', None),
'maximum': kwargs.get('maximum', None)}
@detect.Detector
def run(self):
forecast_result = {}
with self.data_handler(self.metric_name, self.database_path) as db:
timeseries = db.get_timeseries(period=self.data_period)
logger.info('acquire data[{metric_name}] -> data_period: {data_period} data_length: {length}'
.format(metric_name=self.metric_name,
data_period=self.data_period,
length=len(timeseries)))
if not timeseries:
logger.error("can not get timeseries from table [{metric_name}] by period '{period}', "
"skip forecast step for [{metric_name}]".format(metric_name=self.metric_name,
period=self.data_period))
forecast_result['status'] = 'fail'
else:
try:
if len(timeseries) < self.minimum_timeseries_length:
logger.warn(
"the length of timeseries[{metric_name}] is too short: [{ts_length}]."
.format(metric_name=self.metric_name,
ts_length=len(timeseries)))
self.forecast_alg.fit(timeseries)
self.forecast_period = self.forecast_period.upper()
date, value = self.forecast_alg.forecast(self.forecast_period)
logger.info("forecast[{metric_name}] -> forecast length: {length}"
.format(metric_name=self.metric_name,
length=len(value)))
forecast_result['status'] = 'success'
forecast_result['metric_name'] = self.metric_name
forecast_result['detect_basis'] = self.detect_basis
forecast_result['future_date'] = date
forecast_result['future_value'] = value
except Exception as e:
logger.error(e, exc_info=True)
forecast_result['status'] = 'fail'
return forecast_result
def __repr__(self):
return 'forecastor of the metric {metric}'.format(metric=self.metric_name)

View File

@ -0,0 +1,118 @@
import os
import re
from configparser import ConfigParser
from detector.algorithms import forecast_algorithm
from task import metric_task
from utils import get_funcs
from .data_handler import DataHandler
from .forecast import Forecastor
from .monitor import Monitor
from .monitor_logger import logger
def start_monitor(config_path, metric_config_path):
if not os.path.exists(config_path):
logger.error('{config_path} does not exist.'.format(config_path=config_path))
return
if not os.path.exists(metric_config_path):
logger.error('{metric_config_path} does not exist.'.format(metric_config_path=metric_config_path))
return
return
config = ConfigParser()
config.read(config_path)
if not config.has_section('forecast') or not config.has_section('database'):
logger.error("do not has 'forecast' or 'database' section in config file.")
return
if not config.has_option('forecast', 'forecast_alg'):
logger.warn("do not find 'forecast_alg' in forecast section, use default 'fbprophet'.")
forecast_alg = forecast_algorithm('fbprophet')
else:
try:
forecast_alg = forecast_algorithm(config.get('forecast', 'forecast_alg'))
except Exception as e:
logger.warn("{error}, use default method: 'fbprophet'.".format(error=str(e)))
forecast_alg = forecast_algorithm('fbprophet')
if not config.has_option('database', 'database_path'):
logger.error("do not find 'database_path' in database section...")
return
else:
database_path = config.get('database', 'database_path')
database_path = os.path.realpath(database_path)
monitor_service = Monitor()
config.clear()
config.read(metric_config_path)
metric_task_from_py = get_funcs(metric_task)
metric_name_from_py = [item[0] for item in metric_task_from_py]
metric_name_from_config = config.sections()
default_metric_parameter_values = {'forecast_interval': '120S',
'forecast_period': '60S',
'data_period': '60S'}
for metric_name in set(metric_name_from_config).union(set(metric_name_from_py)):
if metric_name in set(metric_name_from_config).difference(set(metric_name_from_py)):
logger.error("{metric_name} is not defined in 'task/metric_task.py', abandon monitoring."
.format(metric_name=metric_name))
continue
if metric_name in set(metric_name_from_py).difference(set(metric_name_from_config)):
logger.error("{metric_name} has no config information in 'task/metric_config.conf', abandon monitoring."
.format(metric_name=metric_name))
continue
if metric_name in set(metric_name_from_py).intersection(set(metric_name_from_config)):
kwargs = {}
if not config.has_option(metric_name, 'maximum') and not config.has_option(metric_name, 'minimum'):
logger.error("{metric_name} do not provide any range parameter ('minimum' or 'maximum'), skip monitor."
.format(metric_name=metric_name))
continue
else:
if config.has_option(metric_name, 'maximum'):
kwargs['maximum'] = config.getfloat(metric_name, 'maximum')
if config.has_option(metric_name, 'minimum'):
kwargs['minimum'] = config.getfloat(metric_name, 'minimum')
for parameter, default_value in default_metric_parameter_values.items():
if not config.has_option(metric_name, parameter):
logger.warn("{metric_name} do not provide {parameter}, use default value: {default_value}."
.format(parameter=parameter,
metric_name=metric_name,
default_value=default_value))
value = default_value
else:
temp_value = config.get(metric_name, parameter)
if parameter == 'data_period' and temp_value.isdigit():
value = int(temp_value)
else:
try:
value_number, value_unit = re.match(r'(\d+)?([WDHMS])', temp_value).groups()
if value_number is None or value_unit is None or value_unit not in ('S', 'M', 'H', 'D', 'W'):
logger.error("wrong value: {metric_name} - {parameter}, only support 'S(second)' 'M(minute)'"
"'H(hour)' 'D(day)' 'W(week)', not support '{unit}', use default value: {default_value}"
.format(metric_name=metric_name,
unit=value_unit,
parameter=parameter,
default_value=default_value))
value = default_value
else:
value = temp_value
except Exception as e:
logger.error("{metric_name} - {parameter} error: {error}, use default value: {default_value}.")
value = default_value
kwargs[parameter] = value
kwargs['forecast_alg'] = forecast_alg()
kwargs['database_path'] = database_path
kwargs['data_handler'] = DataHandler
kwargs['metric_name'] = metric_name
monitor_service.apply(Forecastor(**kwargs))
monitor_service.start()

View File

@ -0,0 +1,26 @@
from utils import RepeatTimer, transform_time_string
from .monitor_logger import logger
class Monitor:
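"""Registers a RepeatTimer for every applied Forecastor and starts them all."""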
def __init__(self):
self._tasks = dict()
def apply(self, instance, args=None, kwargs=None):
if instance in self._tasks:
return False
logger.info('add [{task}] in Monitor task......'.format(task=getattr(instance, 'metric_name')))
interval = getattr(instance, 'forecast_interval')
try:
interval = transform_time_string(interval, mode='to_second')
except ValueError as e:
logger.error(e, exc_info=True)
return
timer = RepeatTimer(interval=interval, function=instance.run, args=args, kwargs=kwargs)
self._tasks[instance] = timer
return True
def start(self):
for instance, timer in self._tasks.items():
timer.start()
logger.info('begin to monitor [{task}]'.format(task=getattr(instance, 'metric_name')))

View File

@ -0,0 +1,14 @@
import os
from configparser import ConfigParser
from main import config_path
from utils import detection_logger
config = ConfigParser()
config.read(config_path)
log_dir_realpath = os.path.realpath(config.get('log', 'log_dir'))
if not os.path.exists(log_dir_realpath):
os.makedirs(log_dir_realpath)
logger = detection_logger(level='INFO',
log_name='monitor',
log_path=os.path.join(log_dir_realpath, 'monitor.log'))

View File

@ -0,0 +1,11 @@
from .database import db
from .detection_app import MyApp
def start_service(config_path):
app = MyApp()
app.initialize_config(config_path)
app.initialize_app()
app.initialize_database(db)
app.add_resources()
app.start_service()

View File

@ -0,0 +1,12 @@
class App(object):
def __init__(self):
pass
def add_resources(self):
pass
def init_database(self):
pass
def start_service(self, *args, **kwargs):
pass

View File

@ -0,0 +1,33 @@
'''
define table structure in database
'''
from flask_sqlalchemy import SQLAlchemy
from .server_logger import logger
db = SQLAlchemy()
class Base(db.Model):
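"""Abstract base table: one (timestamp, value) row per metric sample; surplus
rows beyond max_rows are trimmed every max_flush_cache inserts."""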
__abstract__ = True
rows = 0
timestamp = db.Column(db.BIGINT, nullable=False, primary_key=True)
value = db.Column(db.Float, nullable=False)
max_rows = 100000
max_flush_cache = 1000
@classmethod
def limit_max_rows(cls):
db.session.execute(db.text(
"delete from {table} where timestamp in (select timestamp from {table} order by timestamp desc limit -1 "
"offset {max_rows})".format(table=cls.__tablename__, max_rows=cls.max_rows)
))
logger.info('remove surplus rows in table [{table}]'.format(table=cls.__tablename__))
@classmethod
def on_insert(cls, mapper, connection, target):
# trim surplus rows once every max_flush_cache inserts
if cls.rows % cls.max_flush_cache == 0:
cls.limit_max_rows()
cls.rows += 1

View File

@ -0,0 +1,137 @@
import os
import ssl
from configparser import ConfigParser, NoOptionError
from flask import Flask
from flask_restful import Api
from flask_sqlalchemy import event
from task import metric_task
from utils import get_funcs, check_certificate
from .app import App
from .database import Base
from .resource import receiver
from .server_logger import logger
class MyApp(App):
def __init__(self):
App.__init__(self)
self.app = None
self.api = None
self.config = None
self.db = None
self.dirname_path = None
self.table_class_relation = {}
def initialize_config(self, config_path):
logger.info('initialize config......')
if not os.path.exists(config_path):
logger.error('{config_path} does not exist.'.format(config_path=config_path))
self.dirname_path = os.path.dirname(config_path)
self.config = ConfigParser()
self.config.read(config_path)
if not self.config.has_section('database'):
logger.error("do not find 'database' section in config file.")
else:
if not self.config.has_option('database', 'database_path'):
logger.error("do not find 'database_path' in database section.")
if not self.config.has_section('server'):
logger.error("do not find 'database' section in config file.")
else:
if not self.config.has_option('server', 'listen_host') or not self.config.has_option('server',
'listen_port'):
logger.error("do not find 'listen_host' or 'listen_port' in server section.")
def initialize_app(self):
logger.info('initialize app......')
self.app = Flask(__name__)
self.app.config['debug'] = False
database_path = os.path.realpath(self.config.get('database', 'database_path'))
database_path_dir = os.path.dirname(database_path)
if not os.path.exists(database_path_dir):
os.makedirs(database_path_dir)
if os.name == 'nt':
self.app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + database_path
elif os.name == 'posix':
self.app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:////' + database_path
else:
logger.error("do not support this {system}".format(system=os.name))
self.app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
self.api = Api(self.app)
def initialize_database(self, db):
logger.info('initialize database......')
self.db = db
self.db.init_app(self.app)
default_database_parameter_values = {'max_rows': 100000, 'max_flush_cache': 1000}
for parameter, default_value in default_database_parameter_values.items():
if not self.config.has_option('database', parameter):
logger.warn("do not find '{parameter}' in database section, use default value: '{default_value}'"
.format(parameter=parameter, default_value=default_value))
value = default_value
else:
value = self.config.getint('database', parameter)
globals()[parameter] = value
Base.max_rows = globals()['max_rows']
Base.max_flush_cache = globals()['max_flush_cache']
metric_names = [func_name for func_name, _, in get_funcs(metric_task)]
for metric_name in metric_names:
table = type(metric_name.upper(), (Base, self.db.Model), {'__tablename__': metric_name, 'rows': 0})
event.listen(table, 'after_insert', table.on_insert)
self.table_class_relation[metric_name] = table
with self.app.app_context():
self.db.create_all()
def add_resources(self):
self.api.add_resource(receiver.Source, '/sink',
resource_class_kwargs={'db': self.db, 'table_class_relation': self.table_class_relation})
def start_service(self):
context = None
listen_host = self.config.get('server', 'listen_host')
listen_port = self.config.getint('server', 'listen_port')
if self.config.has_option('security', 'tls') and self.config.getboolean('security', 'tls'):
try:
server_cert = self.config.get('security', 'server_cert')
server_key = self.config.get('security', 'server_key')
ca = self.config.get('security', 'ca')
except NoOptionError as e:
logger.error(e)
return
else:
ssl_certificate_status = check_certificate(server_cert)
ca_certificate_status = check_certificate(ca)
if ssl_certificate_status['status'] == 'fail':
logger.error("error occur when check '{certificate}'.".format(certificate=server_cert))
else:
if ssl_certificate_status['level'] == 'info':
logger.info(ssl_certificate_status['info'])
elif ssl_certificate_status['level'] == 'warn':
logger.warn(ssl_certificate_status['info'])
else:
logger.error(ssl_certificate_status['info'])
return
if ca_certificate_status['status'] == 'fail':
logger.error("error occur when check '{certificate}'.".format(certificate=ca))
else:
if ca_certificate_status['level'] == 'info':
logger.info(ca_certificate_status['info'])
elif ca_certificate_status['level'] == 'warn':
logger.warn(ca_certificate_status['info'])
else:
logger.error(ca_certificate_status['info'])
return
pw_file = os.path.join(self.dirname_path, 'certificate/pwf')
with open(pw_file, mode='r') as f:
pw = f.read().strip()
context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH, cafile=ca)
context.verify_mode = ssl.CERT_REQUIRED
context.load_cert_chain(certfile=server_cert, keyfile=server_key, password=pw)
logger.info('Start service......')
self.app.run(host=listen_host, port=listen_port, ssl_context=context)

View File

@ -0,0 +1,53 @@
from flask import request
from flask_restful import Resource
from ..server_logger import logger
class ResponseTuple:
"""generate a response tuple."""
@staticmethod
def success(result=None):
if result is None:
return {"status": "success"}, 200
return {"status": "success", "result": result}
@staticmethod
def error(msg="", status_code=400):
return {"status": "error", "msg": msg}, status_code
class Source(Resource):
def __init__(self, db, table_class_relation):
self.db = db
self.table_class_relation = table_class_relation
def post(self):
content = request.json
try:
for name, event in content.items():
tup_obj = self.table_class_relation[name](timestamp=event['timestamp'], value=event['value'])
self.db.session.add(tup_obj)
self.db.session.commit()
return ResponseTuple.success()
except Exception as e:
logger.error('error when receive data from agent: ' + str(e))
self.db.session.rollback()
return ResponseTuple.error(msg=str(e))
def get(self):
return ResponseTuple.error(status_code=400)
def delete(self):
return ResponseTuple.error(status_code=400)
class Index(Resource):
def get(self):
return ResponseTuple.success()
def delete(self):
return ResponseTuple.error(status_code=400)

View File

@ -0,0 +1,15 @@
import os
from configparser import ConfigParser
from main import config_path
from utils import detection_logger
config = ConfigParser()
config.read(config_path)
log_dir_realpath = os.path.realpath(config.get('log', 'log_dir'))
if not os.path.exists(log_dir_realpath):
os.makedirs(log_dir_realpath)
logger = detection_logger(level='INFO',
log_name='server',
log_path=os.path.join(log_dir_realpath, 'server.log'))

View File

@ -0,0 +1,45 @@
import argparse
import os
import sys
from multiprocessing import Process
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'a-detection.conf')
metric_config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'task/metric_task.conf')
__version__ = '1.0.0'
def parse_args():
parser = argparse.ArgumentParser(description='abnormal detection: detect abnormality of database metric.')
parser.add_argument('-r', '--role', required=True, choices=['agent', 'detector'], help='run on client or detector.')
parser.version = __version__
return parser.parse_args()
def main(args):
role = args.role
if role == 'agent':
agent_pid = os.getpid()
with open('./agent.pid', mode='w') as f:
f.write(str(agent_pid))
from agent import start_agent
start_agent(config_path)
else:
from detector.server import start_service
from detector.monitor import start_monitor
server_process = Process(target=start_service, args=(config_path,))
monitor_process = Process(target=start_monitor, args=(config_path, metric_config_path))
server_process.start()
monitor_process.start()
with open('./server.pid', mode='w') as f:
f.write(str(server_process.pid))
with open('./monitor.pid', mode='w') as f:
f.write(str(monitor_process.pid))
server_process.join()
monitor_process.join()
if __name__ == '__main__':
main(parse_args())

View File

@ -0,0 +1,6 @@
[req]
distinguished_name = req_distinguished_name
prompt = no
[req_distinguished_name]
O = A-Detection Certificate Authority

View File

@ -0,0 +1,4 @@
CURRENT_DIR=$(cd ../$(dirname $0); pwd)
BASENAME=$(basename $CURRENT_DIR)
PROJECT_NAME="A-Detection"

View File

@ -0,0 +1,46 @@
#!/bin/bash
source ./initialize_certificate.sh
ca_crt="ca.crt"
ca_key="ca.key"
ca_password=""
read -s -p "please input the password of ca: " ca_password
cat > ca.conf <<-EOF
[req]
distinguished_name = req_distinguished_name
prompt = no
[req_distinguished_name]
O = $PROJECT_NAME Certificate Authority
EOF
expect <<-EOF
spawn /bin/openssl genrsa -aes256 -out ${ca_key} 2048
expect "Enter pass phrase for"
send "${ca_password}\r"
expect "Verifying - Enter pass phrase for"
send "${ca_password}\r"
expect eof
EOF
expect <<-EOF
spawn /bin/openssl req -new -out ca.req -key ${ca_key} -config ca.conf
expect "Enter pass phrase for"
send "${ca_password}\r"
expect eof
EOF
expect <<-EOF
spawn /bin/openssl x509 -req -in ca.req -signkey ${ca_key} -days 7300 -out ${ca_crt}
expect "Enter pass phrase for"
send "${ca_password}\r"
expect eof
EOF
mv ${ca_crt} ${ca_key} ${CURRENT_DIR}/${CA}
rm ca.req
chmod 600 `find ${CURRENT_DIR}/${CA} -type f`

View File

@ -0,0 +1,63 @@
source ./initialize_certificate.sh
CA_CRT="${CURRENT_DIR}/${CA}/ca.crt"
CA_KEY="${CURRENT_DIR}/${CA}/ca.key"
pwf="${CURRENT_DIR}/${PW_FILE}"
local_host=""
ca_password=""
ssl_password=""
base_dir=""
file_name=""
if [ ! -f ${CA_CRT} ]; then
echo "not found ${CA_CRT}."
exit 0
fi
if [ ! -f ${CA_KEY} ]; then
echo "not found ${CA_KEY}."
exit 0
fi
read -p "please input the basename of ssl certificate: " base_dir
read -p "please input the filename of ssl certificate: " file_name
read -p "please input the loca host: " local_host
read -s -p "please input the password of ca and ssl separated by space: " ca_password ssl_password
if [ ! -d ${base_dir}/ ]; then
mkdir -p ${base_dir}
fi
key="${base_dir}/${file_name}.key"
crt="${base_dir}/${file_name}.crt"
req="${base_dir}/${file_name}.req"
expect <<-EOF
spawn /bin/openssl genrsa -aes256 -out ${key} 2048
expect "Enter pass phrase for"
send "${ssl_password}\r"
expect "Verifying - Enter pass phrase for"
send "${ssl_password}\r"
expect eof
EOF
expect <<-EOF
spawn /bin/openssl req -new -out ${req} -key ${key} -subj "/C=CN/ST=Some-State/O=${file_name}/CN=${local_host}"
expect "Enter pass phrase for"
send "${ssl_password}\r"
expect eof
EOF
expect <<-EOF
spawn /bin/openssl x509 -req -in ${req} -out ${crt} -sha256 -CAcreateserial -days 7000 -CA ${CA_CRT} -CAkey ${CA_KEY}
expect "Enter pass phrase for"
send "${ca_password}\r"
expect eof
EOF
rm ${req}
echo "${ssl_password}">${pwf}
chmod 600 ${key}
chmod 600 ${crt}
chmod 600 ${pwf}

View File

@ -0,0 +1,19 @@
source ./common.sh
SERVER="certificate/server"
AGENT="certificate/agent"
CA="certificate/ca"
PW_FILE="certificate/pwf"
if [ ! -d ${CURRENT_DIR}/${SERVER} ]; then
mkdir -p ${CURRENT_DIR}/${SERVER}
fi
if [ ! -d ${CURRENT_DIR}/${AGENT} ]; then
mkdir -p ${CURRENT_DIR}/${AGENT}
fi
if [ ! -d ${CURRENT_DIR}/${CA} ]; then
mkdir -p ${CURRENT_DIR}/${CA}
fi

Binary file not shown (structure.png, 26 KiB)

View File

@ -0,0 +1,32 @@
[cpu_usage]
minimum = 30
maximum = 100
data_period = 100S
forecast_period = 100S
[memory_usage]
minimum = 20
maximum = 100
data_period = 20
forecast_interval = 20S
forecast_period = 20S
[io_read]
minimum = 30
maximum = 100
data_period = 30
forecast_interval = 30S
forecast_period = 30S
[io_write]
minimum = 50
maximum = 100
data_period = 40S
forecast_interval = 40S
[disk_space]
minimum = 10
maximum = 100
data_period = 50S
forecast_interval = 50S
forecast_period = 50S

View File

@ -0,0 +1,67 @@
import os
import subprocess
from utils import unify_byte_unit
def cpu_usage():
child1 = subprocess.Popen(['ps', '-ux'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False)
child2 = subprocess.Popen(['grep', 'gaussd[b]'], stdin=child1.stdout, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, shell=False)
sub_chan = child2.communicate()
if not sub_chan[0]:
result = 0.0
else:
result = sub_chan[0].split()[2].decode('utf-8')
return result
def io_read():
child1 = subprocess.Popen(['pidstat', '-d'], stdout=subprocess.PIPE, shell=False)
child2 = subprocess.Popen(['grep', 'gaussd[b]'], stdin=child1.stdout, stdout=subprocess.PIPE, shell=False)
sub_chan = child2.communicate()
if not sub_chan[0]:
result = 0.0
else:
result = sub_chan[0].split()[3].decode('utf-8')
return result
def io_write():
child1 = subprocess.Popen(['pidstat', '-d'], stdout=subprocess.PIPE, shell=False)
child2 = subprocess.Popen(['grep', 'gaussd[b]'], stdin=child1.stdout, stdout=subprocess.PIPE, shell=False)
sub_chan = child2.communicate()
if not sub_chan[0]:
result = 0.0
else:
result = sub_chan[0].split()[4].decode('utf-8')
return result
def memory_usage():
child1 = subprocess.Popen(['ps', '-ux'], stdout=subprocess.PIPE, shell=False)
child2 = subprocess.Popen(['grep', 'gaussd[b]'], stdin=child1.stdout, stdout=subprocess.PIPE, shell=False)
sub_chan = child2.communicate()
if not sub_chan[0]:
result = 0.0
else:
result = sub_chan[0].split()[3].decode('utf-8')
return result
def disk_space():
pg_data = os.getenv('PGDATA')
if pg_data is None:
raise ValueError('not found PGDATA in environment.')
else:
pg_data = os.path.realpath(pg_data)
child = subprocess.Popen(['du', '-sh', pg_data], stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False)
sub_chan = child.communicate()
if sub_chan[1]:
raise ValueError('error when getting disk usage of openGauss: {error}'.
format(error=sub_chan[1].decode('utf-8')))
if not sub_chan[0]:
result = 0.0
else:
result = unify_byte_unit(sub_chan[0].decode('utf-8'))
return result

View File

@ -0,0 +1,11 @@
import os
import sys
sys.path.append((os.path.dirname(os.getcwd())))
from agent import start_agent
config_path = '../a-detection.conf'
if __name__ == '__main__':
start_agent(config_path)

View File

@ -0,0 +1,20 @@
import sys
sys.path.append('../')
from detector.monitor import data_handler
dh = data_handler.DataHandler('io_read', './data/metric.db')
#with dh('io_read', './data/metric.db') as db:
# ts = db.get_timeseries(period=10)
# print(ts)
# print(len(ts))
# ts = db.get_timeseries(period='10S')
# print(ts)
# print(len(ts))
dh.connect_db()
ts = dh.get_timeseries(period='10S')
print(ts)
dh.close()

View File

@ -0,0 +1,12 @@
import os
import sys
sys.path.append((os.path.dirname(os.getcwd())))
from detector.monitor import start_monitor
config_path = '../a-detection.conf'
metric_config_path = '../task/metric_task.conf'
if __name__ == '__main__':
start_monitor(config_path, metric_config_path)

View File

@ -0,0 +1,17 @@
import os
import sys
sys.path.append((os.path.dirname(os.getcwd())))
from detector.server import start_service
config_path = '../a-detection.conf'
if __name__ == '__main__':
start_service(config_path)

View File

@ -0,0 +1,6 @@
CURRENT_DIR=$(cd ../$(dirname $0); pwd)
BASENAME=$(basename $CURRENT_DIR)
MONITOR_PID="monitor.pid"
SERVER_PID="server.pid"
AGENT_PID="agent.pid"

View File

@ -0,0 +1,137 @@
source ./common.sh
function usage()
{
echo "usage: $0 [option]
--help
--deploy_code
--start_agent
--start_detector
--start_all
"
}
function start_agent()
{
cd ${CURRENT_DIR}
nohup python main.py -r agent > /dev/null 2>&1 &
}
function start_detector()
{
local user=""
local host=""
local detector_path=""
local password=""
local port=22
read -p "please input the user of detector: " user
read -p "please input the host of detector: " host
read -p "please input the path of detector: " detector_path
read -s -p "please input the password of ${user}@${host}: " password
expect <<-EOF
spawn ssh ${host} -p ${port} -l ${user}
expect {
"(yes/no)?" {
send "yes\r"
expect "*assword:"
send "${password}\r"
}
"*assword:" {
send "${password}\r"
}
"Last login:" {
send "\r"
}
}
send "\r"
expect "*]*"
send "cd ${detector_path}/${BASENAME}\r"
expect "*]*"
send "nohup python main.py -r detector > /dev/null 2>&1 &\r"
expect "*]*"
send "exit\r"
expect eof
EOF
}
function deploy_code()
{
local user=""
local host=""
local detector_path=""
read -p "please input the user of detector: " user
read -p "please input the host of detector: " host
read -p "please input the path of detector: " detector_path
read -s -p "please input the password of ${user}@${host}: " password
expect <<-EOF
spawn scp -r ${CURRENT_DIR} ${user}@${host}:${detector_path}
expect {
"(yes/no)?" {
send "yes\r"
expect "*assword:"
send "${password}\r"
}
"*assword" {
send "${password}\r"
}
}
expect eof
EOF
}
function start_all()
{
start_agent
start_detector
}
function main()
{
if [ $# -ne 1 ]; then
usage
exit 1
fi
case "$1" in
--help)
usage
break
;;
--start_agent)
start_agent
break
;;
--start_detector)
start_detector
break
;;
--deploy_code)
deploy_code
break
;;
--start_all)
start_all
break
;;
*)
echo "unknown arguments"
;;
esac
}
main $@

View File

@ -0,0 +1,100 @@
source ./common.sh
function usage()
{
echo "usage: $0 [option]
--help
--stop_agent
--stop_detector
--stop_all
"
}
function stop_agent()
{
cat ${CURRENT_DIR}/${AGENT_PID} | xargs kill -9
}
function stop_detector()
{
local user=""
local host=""
local detector_path=""
local password=""
local port=22
read -p "please input the user of detector: " user
read -p "please input the host of detector: " host
read -p "please input the path of detector: " detector_path
read -s -p "please input the password of ${user}@${host}: " password
expect <<-EOF
spawn ssh ${host} -p ${port} -l ${user}
expect {
"(yes/no)?" {
send "yes\r"
expect "*assword:"
send "${password}\r"
}
"*assword:" {
send "${password}\r"
}
"Last login:" {
send "\r"
}
}
send "\r"
expect "*]*"
send "cat ${detector_path}/${BASENAME}/${MONITOR_PID} | xargs kill -9\r"
expect "*]*"
send "cat ${detector_path}/${BASENAME}/${SERVER_PID} | xargs kill -9\r"
expect "*]*"
send "exit\r"
expect eof
EOF
}
function stop_all()
{
stop_agent
stop_detector
}
function main()
{
if [ $# -ne 1 ]; then
usage
exit 1
fi
case "$1" in
--help)
usage
break
;;
--stop_agent)
stop_agent
break
;;
--stop_detector)
stop_detector
break
;;
--stop_all)
stop_all
break
;;
*)
echo "unknown arguments"
;;
esac
}
main $@

View File

@ -0,0 +1,132 @@
import inspect
import logging
import pydoc
import re
import subprocess
from datetime import datetime, timedelta
from logging import handlers
from threading import Thread, Event
import dateutil.parser
class RepeatTimer(Thread):
def __init__(self, interval, function, args=None, kwargs=None):
Thread.__init__(self)
self.interval = interval
self.function = function
self.args = args if args is not None else []
self.kwargs = kwargs if kwargs is not None else {}
self.finished = Event()
def cancel(self):
self.finished.set()
def run(self):
while not self.finished.is_set():
self.finished.wait(self.interval)
# skip the final call if the timer was cancelled during the wait
if not self.finished.is_set():
self.function(*self.args, **self.kwargs)
self.finished.set()
def transform_time_string(time_str, mode='timedelta'):
"""
only support 'weeks, days, hours, minutes, seconds
W: week, D: days, H: hours, M: minutes, S: seconds
"""
if mode not in ('timedelta', 'to_second'):
raise ValueError('wrong mode {mode} in time_transfer.'.format(mode=mode))
match = re.match(r'(\d+)?([WDHMS])', time_str)
if match is None:
raise ValueError('wrong format {time_str} for time_str in time_transfer.'.format(time_str=time_str))
time_num, time_flag = match.groups()
if time_num is None:
time_num = 1
else:
time_num = int(time_num)
timedelta_mapper = {'W': timedelta(weeks=1),
'D': timedelta(days=1),
'H': timedelta(hours=1),
'M': timedelta(minutes=1),
'S': timedelta(seconds=1)}
second_mapper = {'W': 7 * 24 * 3600, 'D': 24 * 3600, 'H': 3600, 'M': 60, 'S': 1}
if mode == 'timedelta':
return timedelta_mapper.get(time_flag) * time_num
if mode == 'to_second':
return second_mapper.get(time_flag) * time_num
def detection_logger(log_name, log_path, level):
logger = logging.getLogger(log_name)
agent_handler = handlers.RotatingFileHandler(filename=log_path,
maxBytes=1024 * 1024 * 100,
backupCount=5)
agent_handler.setFormatter(logging.Formatter("[%(asctime)s %(levelname)s]-[%(name)s]: %(message)s"))
logger.addHandler(agent_handler)
logger.setLevel(getattr(logging, level.upper()) if hasattr(logging, level.upper()) else logging.INFO)
return logger
def unify_byte_unit(byte_info):
"""
func: transfer units of K, M, G, T, P to M
"""
byte_info = byte_info.upper()
bytes_num, bytes_unit = re.match(r'^(\d+|\d+\.\d+)([KMGTP])', byte_info).groups()
if bytes_num is None or bytes_unit is None or bytes_unit not in 'KMGTP':
raise ValueError('can not parse format of {bytes}'.format(bytes=byte_info))
byte_unit_mapper = {'K': 1 / 1024, 'M': 1, 'G': 1024, 'T': 1024 * 1024, 'P': 1024 * 1024 * 1024}
return byte_unit_mapper[bytes_unit] * int(float(bytes_num))
def get_funcs(thing):
"""
return functions in python file
"""
funcs = []
_object, _ = pydoc.resolve(thing)
_all = getattr(_object, '__all__', None)
for key, value in inspect.getmembers(_object, inspect.isroutine):
if _all is not None or inspect.isbuiltin(value) or inspect.getmodule(value) is _object:
if pydoc.visiblename(key, _all, _object):
funcs.append((key, value))
return funcs
def check_certificate(certificate_path):
"""
check certificate validity
"""
check_result = {}
certificate_warning_threshold = 365
child = subprocess.Popen(['openssl', 'x509', '-in', certificate_path, '-noout', '-dates'],
shell=False, stdout=subprocess.PIPE, stdin=subprocess.PIPE)
sub_chan = child.communicate()
if sub_chan[1] or not sub_chan[0]:
check_result['status'] = 'fail'
else:
check_result['status'] = 'success'
not_after = sub_chan[0].decode('utf-8').split('\n')[1].split('=')[1].strip()
end_time = dateutil.parser.parse(not_after).replace(tzinfo=None)
certificate_remaining_days = (end_time - datetime.now()).days
if 0 < certificate_remaining_days < certificate_warning_threshold:
check_result['level'] = 'warn'
check_result['info'] = "the '{certificate}' will expire in {certificate_remaining_days} days." \
.format(certificate=certificate_path,
certificate_remaining_days=certificate_remaining_days)
elif certificate_remaining_days >= certificate_warning_threshold:
check_result['level'] = 'info'
check_result['info'] = "the '{certificate}' will expire in {certificate_remaining_days} days." \
.format(certificate=certificate_path,
certificate_remaining_days=certificate_remaining_days)
else:
check_result['level'] = 'error'
check_result['info'] = "the '{certificate}' has expired." \
.format(certificate=certificate_path)
return check_result