l00280231
2021-05-08 15:56:26 +08:00
31 changed files with 9042 additions and 8627 deletions

View File

@ -59,7 +59,7 @@ bool open_join_children = true;
bool will_shutdown = false;
/* hard-wired binary version number */
const uint32 GRAND_VERSION_NUM = 92298;
const uint32 GRAND_VERSION_NUM = 92299;
const uint32 MATVIEW_VERSION_NUM = 92213;
const uint32 PARTIALPUSH_VERSION_NUM = 92087;

View File

@ -34,11 +34,11 @@ FULL_ARRANGEMENT_THRESHOLD = 20
BASE_CMD = ''
SHARP = '#'
SCHEMA = None
BLANK = ' '
SQL_TYPE = ['select', 'delete', 'insert', 'update']
SQL_PATTERN = [r'([^\\])\'((\')|(.*?([^\\])\'))',
r'([^\\])"((")|(.*?([^\\])"))',
r'([^a-zA-Z])-?\d+(\.\d+)?',
r'([^a-zA-Z])-?\d+(\.\d+)?',
r'(\s*[<>]\s*=*\s*\d+)',
r'(\'\d+\\.*?\')']
logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')
@ -86,7 +86,6 @@ class IndexItem:
self.storage = 0
def run_shell_cmd(target_sql_list):
cmd = BASE_CMD + ' -c \"'
if SCHEMA:
@ -94,7 +93,7 @@ def run_shell_cmd(target_sql_list):
for target_sql in target_sql_list:
cmd += target_sql + ';'
cmd += '\"'
proc = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
(stdout, stderr) = proc.communicate()
stdout, stderr = stdout.decode(), stderr.decode()
if 'gsql' in stderr or 'failed to connect' in stderr:
@ -132,12 +131,15 @@ def print_header_boundary(header):
def load_workload(file_path):
wd_dict = {}
workload = []
global BLANK
with open(file_path, 'r') as file:
raw_text = ''.join(file.readlines())
sqls = raw_text.split(';')
for sql in sqls:
if any(tp in sql.lower() for tp in SQL_TYPE):
TWO_BLANKS = BLANK * 2
while TWO_BLANKS in sql:
sql = sql.replace(TWO_BLANKS, BLANK)
if sql not in wd_dict.keys():
wd_dict[sql] = 1
else:
@ -228,7 +230,7 @@ def estimate_workload_cost_file(workload, index_config=None):
if ENABLE_MULTI_NODE:
file.write('set enable_fast_query_shipping = off;\n')
for query in workload:
file.write('EXPLAIN ' + query.statement + ';\n')
file.write('set explain_perf_mode = \'normal\'; EXPLAIN ' + query.statement + ';\n')
result = run_shell_sql_cmd(sql_file).split('\n')
if os.path.exists(sql_file):
@ -269,6 +271,7 @@ def estimate_workload_cost_file(workload, index_config=None):
def make_single_advisor_sql(ori_sql):
sql = 'select gs_index_advise(\''
ori_sql = ori_sql.replace('"', '\'')
for elem in ori_sql:
if elem == '\'':
sql += '\''

View File

@ -1,52 +1,94 @@
## Introduction to Sqldiag
**Sqldiag** is a robust forecasting framework that allows a DBMS to predict execute time for unknown workload based on
historical data, The prediction is based on similarity of unknown queries and historical queries. We do not use
execute-plan since achieving execute-plan will claim resource on user database, also inapplicable for __OLTP__ workload.
**Sqldiag** is a robust forecasting framework that allows a DBMS to predict the execution time of an unknown workload based on historical data. The prediction is based on the similarity between unknown queries and historical queries. We do not use execution plans, since collecting them consumes resources on the user database and is also inapplicable to __OLTP__ workloads.
The framework pipeline is:
* prepare train dataset: prepare the dataset following the required format below.
* file_processor: load the dataset from a CSV file.
* sql2vector: train an adaptive model to transform SQL statements into vectors.
* sql_template: divide historical queries into categories based on a SQL similarity algorithm.
* predict: take an unknown query and return execution information including 'execute_time', 'cluster id', 'point'.
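A minimal end-to-end sketch of this pipeline using the SQLDiag class from algorithm/diag.py, assuming it is run from the sqldiag/ directory; the file paths and the choice of the template model are illustrative:
```python
from configparser import ConfigParser

from algorithm.diag import SQLDiag

config = ConfigParser()
config.read('sqldiag.conf')          # provides the [template] / [dnn] sections

# train: each line of the CSV is "SQL statement,duration time"
with open('sample_data/train.csv') as csv_file:
    model = SQLDiag('template', csv_file, config)
    model.fit()
    model.save('./template')         # writes template.json under ./template

# predict: each line of the CSV is a single SQL statement
with open('sample_data/predict.csv') as csv_file:
    model = SQLDiag('template', csv_file, config)
    model.load('./template')
    predictions = model.transform()  # list of [sql, predicted execution time]
    print(predictions)
```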
## Run sqldiag on sample data:
```
sqldiag/
├── algorithm
│   ├── diag.py
│   ├── duration_time_model
│   │   ├── dnn.py
│   │   ├── __init__.py
│   │   └── template.py
│   ├── sql_similarity
│   │   ├── __init__.py
│   │   ├── levenshtein.py
│   │   ├── list_distance.py
│   │   └── parse_tree.py
│   └── word2vec.py
├── load_sql_from_wdr.py
├── main.py
├── preprocessing.py
├── README.md
├── requirements.txt
├── result.png
├── sample_data
│   ├── predict.csv
│   └── train.csv
├── sqldiag.conf
└── utils.py
```
# train based on train.csv
python main.py -m train -f data/train.csv
# predict unknown dataset
python main.py -m predict -f data/predict.csv
* sqldiag.conf: configuration parameters such as the model training path.
* requirements.txt: dependency packages installed via pip.
## Sqldiag Dependencies
python3.5+
gensim
tensorflow==2.3.1
pandas
matplotlib
gensim==3.8.3
sqlparse
sklearn
numpy
After installing Python, run `pip install -r requirements.txt` to install all required packages.
## Quick guide
## prepare dataset
# train dataset
EXEC_TIME,SQL
SQL,EXEC_TIME
# predict dataset
SQL
note: you can use the script [load_sql_from_wdr](load_sql_from_wdr.py) to collect the training dataset based on WDR.
usage: load_sql_from_wdr.py [-h] --port PORT --start_time START_TIME
--finish_time FINISH_TIME [--save_path SAVE_PATH]
example:
python load_sql_from_wdr.py --start_time "2021-04-25 00:00:00" --finish_time "2021-04-26 14:00:00" --port 8000
* _EXEC_TIME_: execution time of the SQL statement.
* _SQL_: the query string.
note: the SQL statement and execution time should be separated by a comma.
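For illustration only (the statements and timings below are made up), a two-column training CSV could look like:
```
select i_name from item where i_id = 4,0.31
update item set i_name = 'abc' where i_id = 7,0.02
```
and a prediction CSV lists one SQL statement per line:
```
select i_name from item where i_id = 10
```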
train dataset sample: [sample_data/train.csv](data/train.csv)
train dataset sample: [data/train.csv](data/train.csv)
predict dataset sample: [sample_data/predict.csv](data/predict.csv)
predict dataset sample: [data/predict.csv](data/predict.csv)
## example for template method using sample dataset
## train based on dataset
# training
python main.py train -f ./sample_data/train.csv --model template --model-path ./template
python main.py -m train -f train_file
# predict
python main.py predict -f ./sample_data/predict.csv --model template --model-path ./template
--predicted-file ./result/t_result
## predict unknown dataset
# update model
python main.py finetune -f ./sample_data/train.csv --model template --model-path ./template
python main.py -m predict -f predict_file
## example for dnn method using sample dataset
# training
python main.py train -f ./sample_data/train.csv --model dnn --model-path ./dnn_model
# predict
python main.py predict -f ./sample_data/predict.csv --model dnn --model-path ./dnn_model
--predicted-file ./result/result
# update model
python main.py finetune -f ./sample_data/train.csv --model dnn --model-path ./dnn_model

View File

@ -0,0 +1,92 @@
import logging
import sys
from preprocessing import LoadData
from .duration_time_model.dnn import DnnModel
from .duration_time_model.template import TemplateModel
W2V_SUFFIX = 'word2vector'
def check_template_algorithm(param):
if param and param not in ["list", "levenshtein", "parse_tree"]:
raise ValueError("The similarity algorithm '%s' is invaild, "
"please choose from ['list', 'levenshtein', 'parse_tree']" % param)
class ModelConfig(object):
def __init__(self):
pass
def init_from(self, config):
pass
@classmethod
def init_from_config_parser(cls, config):
config_instance = cls()
config_instance.init_from(config)
return config_instance
class DnnConfig(ModelConfig):
def __init__(self):
super().__init__()
self.epoch = 300
def init_from(self, config):
self.epoch = config.get("dnn", "epoch") if \
config.get("dnn", "epoch") else self.epoch
self.epoch = int(self.epoch)
class TemplateConfig(ModelConfig):
def __init__(self):
super().__init__()
self.similarity_algorithm = "list"
self.time_list_size = 10
self.knn_number = 3
def init_from(self, config):
self.similarity_algorithm = config.get("template", "similarity_algorithm") if \
config.get("template", "similarity_algorithm") else self.similarity_algorithm
check_template_algorithm(self.similarity_algorithm)
self.time_list_size = config.get("template", "time_list_size") if \
config.get("template", "time_list_size") else self.time_list_size
self.knn_number = config.get("template", "knn_number") if \
config.get("template", "knn_number") else self.knn_number
self.time_list_size = int(self.time_list_size)
self.knn_number = int(self.knn_number)
SUPPORTED_ALGORITHM = {'dnn': lambda config: DnnModel(DnnConfig.init_from_config_parser(config)),
'template': lambda config: TemplateModel(
TemplateConfig.init_from_config_parser(config))}
class SQLDiag:
def __init__(self, model_algorithm, csv_file, params):
if model_algorithm not in SUPPORTED_ALGORITHM:
raise NotImplementedError("do not support {}".format(model_algorithm))
try:
self._model = SUPPORTED_ALGORITHM.get(model_algorithm)(params)
except ValueError as e:
logging.error(e, exc_info=True)
sys.exit(1)
self.load_data = LoadData(csv_file)
def __getattr__(self, item):
return getattr(self.load_data, item)
def fit(self):
self._model.fit(self.train_data)
def transform(self):
return self._model.transform(self.predict_data)
def fine_tune(self, filepath):
self._model.load(filepath)
self._model.fit(self.train_data)
def load(self, filepath):
self._model.load(filepath)
def save(self, filepath):
self._model.save(filepath)

View File

@ -0,0 +1,22 @@
import abc
class AbstractModel:
def __init__(self, params):
self.params = params
@abc.abstractmethod
def fit(self, data):
pass
@abc.abstractmethod
def transform(self, data):
pass
@abc.abstractmethod
def load(self, filepath):
pass
@abc.abstractmethod
def save(self, filepath):
pass

View File

@ -0,0 +1,155 @@
import logging
import os
import pickle
import stat
from abc import ABC
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from algorithm.word2vec import Word2Vector
from preprocessing import templatize_sql
from utils import check_illegal_sql
from . import AbstractModel
class KerasRegression:
def __init__(self, encoding_dim=1):
self.model = None
self.encoding_dim = encoding_dim
@staticmethod
def build_model(shape, encoding_dim):
from tensorflow.keras import Input, Model
from tensorflow.keras.layers import Dense
inputs = Input(shape=(shape,))
layer_dense1 = Dense(128, activation='relu', kernel_initializer='he_normal')(inputs)
layer_dense2 = Dense(256, activation='relu', kernel_initializer='he_normal')(layer_dense1)
layer_dense3 = Dense(256, activation='relu', kernel_initializer='he_normal')(layer_dense2)
layer_dense4 = Dense(256, activation='relu', kernel_initializer='he_normal', name='vectors')(layer_dense3)
y_pred = Dense(encoding_dim)(layer_dense4)
model = Model(inputs=inputs, outputs=y_pred)
model.compile(optimizer='adam', loss='mse', metrics=['mae'])
return model
def fit(self, features, labels, batch_size=128, epochs=300):
shape = features.shape[1]
if self.model is None:
self.model = self.build_model(shape=shape, encoding_dim=self.encoding_dim)
self.model.fit(features, labels, epochs=epochs, batch_size=batch_size, shuffle=True, verbose=2)
def predict(self, features):
predict_result = self.model.predict(features)
return predict_result
def save(self, filepath):
self.model.save(filepath)
def load(self, filepath):
from tensorflow.keras.models import load_model
self.model = load_model(filepath)
class DnnModel(AbstractModel, ABC):
def __init__(self, params):
super().__init__(params)
self.w2v_model_parameter = {'max_len': 150,
'sg': 1,
'hs': 1,
'min_count': 0,
'window': 1,
'size': 5,
'iter': 30,
'workers': 8
}
self.w2v = Word2Vector(**self.w2v_model_parameter)
self.epoch = params.epoch
self.scaler = None
self.regression = KerasRegression(encoding_dim=1)
self.data = None
def build_word2vector(self, data):
self.data = list(data)
if self.w2v.model:
self.w2v.update(self.data)
else:
self.w2v.fit(self.data)
def fit(self, data):
self.build_word2vector(data)
list_vec = []
list_cost = []
for sql, duration_time in self.data:
if check_illegal_sql(sql):
continue
filter_template = templatize_sql(sql)
vector = self.w2v.str2vec(filter_template)
list_vec.append(vector)
list_cost.append(duration_time)
features = np.array(list_vec)
labels = np.array(list_cost)
labels = labels.reshape(-1, 1)
self.scaler = MinMaxScaler(feature_range=(0, 1))
self.scaler.fit(labels)
labels = self.scaler.transform(labels)
self.regression.fit(features, labels, epochs=self.epoch)
def transform(self, data):
feature_list = []
data_backup = list(data)
error_list = []
for idx_error, sql in enumerate(data_backup):
if check_illegal_sql(sql):
error_list.append(idx_error)
continue
filter_template = templatize_sql(sql)
vector = self.w2v.str2vec(filter_template)
feature_list.append(vector)
features = np.array(feature_list)
predictions = self.regression.predict(features)
predictions = np.abs(predictions)
score = self.scaler.inverse_transform(predictions)
if error_list:
for item in error_list:
score = np.insert(score, item, -1)
score = np.hstack((np.array(data_backup).reshape(-1, 1), score.reshape(-1, 1))).tolist()
return score
def load(self, filepath):
realpath = os.path.realpath(filepath)
if os.path.exists(realpath):
dnn_path = os.path.join(realpath, 'dnn_model.h5')
word2vector_path = os.path.join(realpath, 'w2v.model')
scaler_path = os.path.join(realpath, 'scaler.pkl')
self.regression.load(dnn_path)
self.w2v.load(word2vector_path)
with open(scaler_path, 'rb') as f:
self.scaler = pickle.load(f)
logging.info("dnn model is loaded: '{}'; w2v model is loaded: '{}'; scaler model is loaded: '{}'."
.format(dnn_path,
word2vector_path,
scaler_path))
else:
logging.error("{} not exist.".format(realpath))
def save(self, filepath):
realpath = os.path.realpath(filepath)
if not os.path.exists(realpath):
os.makedirs(realpath, mode=0o700)
if oct(os.stat(realpath).st_mode)[-3:] != '700':
os.chmod(realpath, stat.S_IRWXU)
dnn_path = os.path.join(realpath, 'dnn_model.h5')
word2vector_path = os.path.join(realpath, 'w2v.model')
scaler_path = os.path.join(realpath, 'scaler.pkl')
self.regression.save(dnn_path)
self.w2v.save(word2vector_path)
with open(scaler_path, 'wb') as f:
pickle.dump(self.scaler, f)
logging.info("dnn model is saved: '{}'; w2v model is saved: '{}'; scaler model is saved: '{}'."
.format(dnn_path,
word2vector_path,
scaler_path))

View File

@ -0,0 +1,154 @@
"""
Copyright (c) 2020 Huawei Technologies Co.,Ltd.
openGauss is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
"""
import heapq
import json
import logging
import os
import stat
from functools import reduce
from algorithm.sql_similarity import calc_sql_distance
from preprocessing import get_sql_template
from utils import check_illegal_sql, LRUCache
from . import AbstractModel
class TemplateModel(AbstractModel):
def __init__(self, params):
super().__init__(params)
self.bias = 1e-5
self.__hash_table = dict(INSERT=dict(), UPDATE=dict(), DELETE=dict(), SELECT=dict(),
OTHER=dict())
self.time_list_size = params.time_list_size
self.knn_number = params.knn_number
self.similarity_algorithm = calc_sql_distance(params.similarity_algorithm)
# training method for template model
def fit(self, data):
for sql, duration_time in data:
if check_illegal_sql(sql):
continue
# get 'fine_template' and 'rough_template' of SQL
fine_template, rough_template = get_sql_template(sql)
sql_prefix = fine_template.split()[0]
# if prefix of SQL is not in 'update', 'delete', 'select' and 'insert',
# then convert prefix to 'other'
if sql_prefix not in self.__hash_table:
sql_prefix = 'OTHER'
if rough_template not in self.__hash_table[sql_prefix]:
self.__hash_table[sql_prefix][rough_template] = dict()
self.__hash_table[sql_prefix][rough_template]['info'] = dict()
if fine_template not in self.__hash_table[sql_prefix][rough_template]['info']:
self.__hash_table[sql_prefix][rough_template]['info'][fine_template] = \
dict(time_list=[], count=0, mean_time=0.0, iter_time=0.0)
# count the number of occurrences of fine template
self.__hash_table[sql_prefix][rough_template]['info'][fine_template]['count'] += 1
# store the execution time of the matched template in the corresponding list
self.__hash_table[sql_prefix][rough_template]['info'][fine_template][
'time_list'].append(duration_time)
# iterative calculation of execution time based on historical data
if not self.__hash_table[sql_prefix][rough_template]['info'][fine_template][
'iter_time']:
self.__hash_table[sql_prefix][rough_template]['info'][fine_template][
'iter_time'] = duration_time
else:
self.__hash_table[sql_prefix][rough_template]['info'][fine_template]['iter_time'] = \
(self.__hash_table[sql_prefix][rough_template]['info'][fine_template][
'iter_time'] + duration_time) / 2
# calculate the average execution time of each template
for sql_prefix, sql_prefix_info in self.__hash_table.items():
for rough_template, rough_template_info in sql_prefix_info.items():
for _, fine_template_info in rough_template_info['info'].items():
del fine_template_info['time_list'][:-self.time_list_size]
fine_template_info['mean_time'] = \
sum(fine_template_info['time_list']) / len(
fine_template_info['time_list'])
rough_template_info['count'] = len(rough_template_info['info'])
rough_template_info['mean_time'] = sum(
[value['mean_time'] for key, value in
rough_template_info['info'].items()]) / len(
rough_template_info['info'])
def transform(self, data):
predict_time_list = []
for sql in data:
predict_time = self.predict_duration_time(sql)
predict_time_list.append([sql, predict_time])
return predict_time_list
@LRUCache(max_size=1024)
def predict_duration_time(self, sql):
if check_illegal_sql(sql):
return -1
sql_prefix = sql.strip().split()[0]
# get 'fine_template' and 'rough_template' of SQL
fine_template, rough_template = get_sql_template(sql)
if sql_prefix not in self.__hash_table:
sql_prefix = 'OTHER'
if not self.__hash_table[sql_prefix]:
logging.warning("'{}' not in the templates.".format(sql))
predict_time = -1
elif rough_template not in self.__hash_table[sql_prefix] or fine_template not in \
self.__hash_table[sql_prefix][rough_template]['info']:
similarity_info = []
"""
if the template does not exist in the hash table,
then calculate the possible execution time based on template
similarity and KNN algorithm in all other templates
"""
if rough_template not in self.__hash_table[sql_prefix]:
for local_rough_template, local_rough_template_info in self.__hash_table[sql_prefix].items():
similarity_info.append(
(self.similarity_algorithm(rough_template, local_rough_template),
local_rough_template_info['mean_time']))
else:
for local_fine_template, local_fine_template_info in \
self.__hash_table[sql_prefix][rough_template]['info'].items():
similarity_info.append(
(self.similarity_algorithm(fine_template, local_fine_template),
local_fine_template_info['iter_time']))
topn_similarity_info = heapq.nlargest(self.knn_number, similarity_info)
sum_similarity_scores = sum(item[0] for item in topn_similarity_info) + self.bias
similarity_proportions = (item[0] / sum_similarity_scores for item in
topn_similarity_info)
topn_duration_time = (item[1] for item in topn_similarity_info)
predict_time = reduce(lambda x, y: x + y,
map(lambda x, y: x * y, similarity_proportions,
topn_duration_time))
else:
predict_time = self.__hash_table[sql_prefix][rough_template]['info'][fine_template]['iter_time']
return predict_time
def load(self, filepath):
realpath = os.path.realpath(filepath)
if os.path.exists(realpath):
template_path = os.path.join(realpath, 'template.json')
with open(template_path, mode='r') as f:
self.__hash_table = json.load(f)
logging.info("template model '{}' is loaded.".format(template_path))
else:
logging.error("{} not exist.".format(realpath))
def save(self, filepath):
realpath = os.path.realpath(filepath)
if not os.path.exists(realpath):
os.makedirs(realpath, mode=0o700)
if oct(os.stat(realpath).st_mode)[-3:] != '700':
os.chmod(realpath, stat.S_IRWXU)
template_path = os.path.join(realpath, 'template.json')
with open(template_path, mode='w') as f:
json.dump(self.__hash_table, f, indent=4)
logging.info("template model is stored in '{}'".format(realpath))

View File

@ -0,0 +1,10 @@
def calc_sql_distance(algorithm):
if algorithm == 'list':
from .list_distance import distance
elif algorithm == 'levenshtein':
from .levenshtein import distance
elif algorithm == 'parse_tree':
from .parse_tree import distance
else:
raise NotImplementedError("do not support '{}'".format(algorithm))
return distance

View File

@ -0,0 +1,22 @@
def distance(str1, str2):
"""
func: calculate levenshtein distance between two strings.
:param str1: string1
:param str2: string2
:return: distance
"""
m, n = len(str1) + 1, len(str2) + 1
matrix = [[0] * n for _ in range(m)]
matrix[0][0] = 0
for i in range(1, m):
matrix[i][0] = matrix[i - 1][0] + 1
for j in range(1, n):
matrix[0][j] = matrix[0][j - 1] + 1
for i in range(1, m):
for j in range(1, n):
if str1[i - 1] == str2[j - 1]:
matrix[i][j] = matrix[i - 1][j - 1]
else:
matrix[i][j] = max(matrix[i - 1][j - 1], matrix[i - 1][j], matrix[i][j - 1]) + 1
return matrix[m - 1][n - 1]

View File

@ -0,0 +1,15 @@
def distance(str1, str2):
sql_distance = 0.0
list1 = str1.split()
list2 = str2.split()
sorted_list1 = sorted(list1)
sorted_list2 = sorted(list2)
max_len = max(len(sorted_list1), len(sorted_list2))
min_len = min(len(sorted_list1), len(sorted_list2))
short_list = sorted_list1 if len(sorted_list1) < len(sorted_list2) else sorted_list2
long_list = sorted_list1 if len(sorted_list1) > len(sorted_list2) else sorted_list2
for item in short_list:
if item in long_list:
sql_distance += 1.0
length_similarity = float(min_len / max_len)
return sql_distance + length_similarity

View File

@ -0,0 +1,84 @@
import sqlparse
from sqlparse.sql import IdentifierList, Identifier, Where, Comparison, Parenthesis, Values
from sqlparse.tokens import Keyword, DML, DDL
ALPHA = 0.8
def token2value(tokens):
value_list = []
for token in tokens:
if isinstance(token, IdentifierList):
for item in token.tokens:
value_list.append(item.value)
elif isinstance(token, Identifier) or isinstance(token, Comparison) or isinstance(token, Values):
value_list.append(token.value)
return value_list
def build_child_nodes(tokens, root=None):
if root is None:
root = {}
st_child = False
sub_tokens = []
sub_sql_count = 0
for token in tokens:
if token.ttype in [Keyword, DDL, DML] or isinstance(token, Where) or isinstance(token, Values) \
or isinstance(token, Parenthesis):
if st_child:
root[child_key] = build_child_nodes(sub_tokens)
if isinstance(token, Where):
root['Where'] = []
for item in token.tokens:
if isinstance(item, Comparison):
root['Where'].append(item.value)
st_child = False
elif isinstance(token, Values):
root['Values'] = token.value
st_child = False
elif isinstance(token, Parenthesis):
sub_sql_count = sub_sql_count + 1
child_key = 'sub_sql' + str(sub_sql_count)
root[child_key] = build_child_nodes(token.tokens)
st_child = False
else:
st_child = True
sub_tokens = []
child_key = token.value.lower()
else:
sub_tokens.append(token)
if st_child:
root[child_key] = build_child_nodes(sub_tokens)
else:
root = token2value(tokens)
return root
def build_tree(sql):
parsed_sql = sqlparse.parse(sql)[0]
parse_tree = dict()
build_child_nodes(parsed_sql.tokens, parse_tree)
return parse_tree
def compare_two_tree(tree1, tree2, total_score, cur_score):
if isinstance(tree1, dict) and isinstance(tree2, dict):
for i in range(len(tree1)):
key = list(tree1.keys())[i]
if key in tree2.keys():
total_score[0] += cur_score
compare_two_tree(tree1[key], tree2[key], total_score, cur_score * ALPHA)
if isinstance(tree1, list) and isinstance(tree2, list):
for value in tree1:
if value in tree2:
total_score[0] += cur_score
def distance(sql1, sql2):
parse_tree1 = build_tree(sql1)
parse_tree2 = build_tree(sql2)
similarity = [0]
compare_two_tree(parse_tree1, parse_tree2, similarity, 1)
return similarity[0]

View File

@ -0,0 +1,50 @@
from gensim.models import word2vec
from preprocessing import templatize_sql
class Sentence(object):
def __init__(self, data):
self.data = data
def __iter__(self):
for sql, _ in self.data:
yield templatize_sql(sql).split()
class Word2Vector(object):
def __init__(self, max_len=150, **kwargs):
self.model = None
self.params = kwargs
self.max_len = max_len
def fit(self, sentence):
sentence = Sentence(sentence)
self.model = word2vec.Word2Vec(sentence, **self.params)
def update(self, sentence):
sentence = Sentence(sentence)
self.model.build_vocab(sentence, update=True)
self.model.train(sentence, total_examples=self.model.corpus_count, epochs=self.model.iter)
def str2vec(self, string):
vector = list()
string = templatize_sql(string)
for item in string.strip().split():
if item in self.model:
vector.extend(self.model[item])
else:
vector.extend([0.0] * self.params.get('size'))
if len(vector) >= self.max_len:
del vector[self.max_len:]
else:
vector.extend([0.0] * (self.max_len - len(vector)))
return vector
def save(self, filepath):
self.model.save(filepath)
def load(self, filepath):
self.model = word2vec.Word2Vec.load(filepath)

File diff suppressed because it is too large

View File

@ -0,0 +1,68 @@
import os
import re
import sys
import argparse
from preprocessing import templatize_sql
from utils import DBAgent, check_time_legality
__description__ = "Get sql information based on wdr."
def parse_args():
parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter,
description=__description__)
parser.add_argument('--port', help="Port of the database server.", type=int, required=True)
parser.add_argument('--start_time', help="Start time of query", required=True)
parser.add_argument('--finish_time', help="Finish time of query", required=True)
parser.add_argument('--save_path', default='sample_data/data.csv', help="Path to save result")
return parser.parse_args()
def mapper_function(value):
query = templatize_sql(value[0])
execution_time = float(value[1]) / 1000000
return (query, execution_time)
def wdr_features(start_time, end_time, port, database='postgres'):
sql = 'select query, execution_time from statement_history '
if start_time and end_time:
sql = "select query, execution_time from dbe_perf.get_global_slow_sql_by_timestamp" \
" (\'{start_time}\',\'{end_time}\')" \
.format(start_time=start_time, end_time=end_time)
with DBAgent(port=port, database=database) as db:
result = db.fetch_all_result(sql)
if result:
result = list(filter(lambda x: re.match(r'UPDATE|SELECT|DELETE|INSERT', x[0]) and x[1] != 0, result))
result = list(map(mapper_function, result))
return result
def save_csv(result, save_path):
if save_path:
save_path = os.path.realpath(save_path)
if not os.path.exists(os.path.dirname(save_path)):
os.makedirs(os.path.dirname(save_path))
with open(save_path, mode='w') as f:
for query, execution_time in result:
f.write(query + ',' + str(execution_time) + '\n')
if __name__ == '__main__':
args = parse_args()
start_time, finish_time = args.start_time, args.finish_time
port = args.port
save_path = args.save_path
if start_time and not check_time_legality(start_time):
print("error time format '{time}', using: {date_format}.".format(time=start_time,
date_format='%Y-%m-%d %H:%M:%S'))
sys.exit(-1)
if finish_time and not check_time_legality(finish_time):
print("error time format '{time}', using: {date_format}.".format(time=finish_time,
date_format='%Y-%m-%d %H:%M:%S'))
sys.exit(-1)
res = wdr_features(start_time, finish_time, port)
save_csv(res, save_path)

View File

@ -13,71 +13,63 @@ MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
"""
import argparse
import os
import stat
import logging
import sys
from configparser import ConfigParser
from src.file_processor import get_train_dataset, get_test_dataset
from src.sql_template import SqlTemplate
from src.w2vector import Word2Vector
from algorithm.diag import SQLDiag
from utils import ResultSaver
__version__ = '1.0.0'
w2v_model_name = 'w2v.model'
w2v_model_parameter = {'max_len': 300,
'sg': 1,
'hs': 1,
'min_count': 0,
'window': 1,
'size': 5,
'iter': 50,
'workers': 4
}
def train(template_dir, data):
w2v_model_path = os.path.join(template_dir, w2v_model_name)
w2v = Word2Vector(**w2v_model_parameter)
if not os.path.exists(w2v_model_path):
w2v.train(sentence=data)
else:
w2v.load_model(w2v_model_path)
w2v.update(sentence=data)
w2v.save_model(w2v_model_path)
sql_template = SqlTemplate(template_dir, word2vec_model=w2v)
sql_template.template = data
def predict(template_dir, data):
w2v_model_path = os.path.join(template_dir, w2v_model_name)
w2v = Word2Vector(**w2v_model_parameter)
w2v.load_model(w2v_model_path)
sql_template = SqlTemplate(template_dir, word2vec_model=w2v)
result = sql_template.predict_batch_exec_time(data)
return result
__version__ = '2.0.0'
__description__ = 'SQLdiag integrated by openGauss.'
def parse_args():
parser = argparse.ArgumentParser(description='sqldiag')
parser.add_argument('-m', '--mode', required=True, choices=['train', 'predict'])
parser.add_argument('-f', '--file', required=True)
parser = argparse.ArgumentParser(description=__description__)
parser.add_argument('mode', choices=['train', 'predict', 'finetune'],
help='The training mode is to perform feature extraction and '
'model training based on historical SQL statements. '
'The prediction mode is to predict the execution time of '
'a new SQL statement through the trained model.')
parser.add_argument('-f', '--csv-file', type=argparse.FileType('r'),
help='The data set for training or prediction. '
'The file format is CSV. '
'If it is two columns, the format is (SQL statement, duration time). '
'If it is three columns, '
'the format is (timestamp of SQL statement execution time, SQL statement, duration time).')
parser.add_argument('--predicted-file', help='The file path to save the predicted result.')
parser.add_argument('--model', default='template', choices=['template', 'dnn'],
help='Choose the model model to use.')
parser.add_argument('--model-path', required=True,
help='The storage path of the model file, used to read or save the model file.')
parser.add_argument('--config-file', default='sqldiag.conf')
parser.version = __version__
return parser.parse_args()
def get_config(filepath):
cp = ConfigParser()
cp.read(filepath, encoding='UTF-8')
return cp
def main(args):
mode = args.mode
filepath = args.file
template_dir = os.path.realpath('./template')
if not os.path.exists(template_dir):
os.makedirs(template_dir, mode=0o700)
if oct(os.stat(template_dir).st_mode)[-3:] != '700':
os.chmod(template_dir, stat.S_IRWXU)
if mode == 'train':
train_data = get_train_dataset(filepath)
train(template_dir, train_data)
if mode == 'predict':
predict_data = get_test_dataset(filepath)
result = predict(template_dir, predict_data)
print(result)
logging.basicConfig(level=logging.INFO)
model = SQLDiag(args.model, args.csv_file, get_config(args.config_file))
if args.mode == 'train':
model.fit()
model.save(args.model_path)
elif args.mode == 'predict':
if not args.predicted_file:
logging.error("The [--predicted-file] parameter is required for predict mode")
sys.exit(1)
model.load(args.model_path)
pred_result = model.transform()
ResultSaver().save(pred_result, args.predicted_file)
logging.info('predicted result is saved in {}'.format(args.predicted_file))
elif args.mode == 'finetune':
model.fine_tune(args.model_path)
model.save(args.model_path)
if __name__ == '__main__':

View File

@ -18,8 +18,6 @@ import sqlparse
from sqlparse.sql import Identifier, IdentifierList
from sqlparse.tokens import Keyword, DML
from .similarity import list_distance
# split flag in SQL
split_flag = ('!=', '<=', '>=', '==', '<', '>', '=', ',', '(', ')', '*', ';', '%', '+', ',', ';')
@ -81,7 +79,7 @@ EQUALS_FILTER = r'(= .*? |= .*$)'
LIMIT_DIGIT = r'LIMIT \d+'
def unify_sql(sql):
def _unify_sql(sql):
"""
function: unify sql format
"""
@ -106,11 +104,11 @@ def unify_sql(sql):
return sql.strip()
def sql_filter(sql):
def templatize_sql(sql):
"""
function: replace the message which is not important in sql
"""
sql = unify_sql(sql)
sql = _unify_sql(sql)
sql = re.sub(r';', r'', sql)
@ -148,7 +146,7 @@ def sql_filter(sql):
return sql
def check_select(parsed_sql):
def _is_select_clause(parsed_sql):
if not parsed_sql.is_group:
return False
for token in parsed_sql.tokens:
@ -157,26 +155,27 @@ def check_select(parsed_sql):
return False
def get_table_token_list(parsed_sql, token_list):
# todo: what is token list? from list?
def _get_table_token_list(parsed_sql, token_list):
flag = False
for token in parsed_sql.tokens:
if not flag:
if token.ttype is Keyword and token.value.upper() == 'FROM':
flag = True
else:
if check_select(token):
get_table_token_list(token, token_list)
if _is_select_clause(token):
_get_table_token_list(token, token_list)
elif token.ttype is Keyword:
return
else:
token_list.append(token)
def extract_table(sql):
def _extract_table(sql):
tables = []
table_token_list = []
sql_parsed = sqlparse.parse(sql)[0]
get_table_token_list(sql_parsed, table_token_list)
_get_table_token_list(sql_parsed, table_token_list)
for table_token in table_token_list:
if isinstance(table_token, Identifier):
tables.append(table_token.get_name())
@ -190,13 +189,13 @@ def extract_table(sql):
return tables
def get_sql_table_name(sql):
def _get_sql_table_name(sql):
"""
function: get table name in sql
has many problems in code, especially in 'delete', 'update', 'insert into' sql
"""
if sql.startswith('SELECT'):
tables = extract_table(sql)
tables = _extract_table(sql)
elif sql.startswith('DELETE'):
if 'WHERE' not in sql:
tables = re.findall(r'FROM\s+([^\s]*)[;\s ]?', sql)
@ -212,9 +211,9 @@ def get_sql_table_name(sql):
return tables
def get_table_column_name(sql):
def _get_table_column_name(sql):
remove_sign = (r'=', r'<', r'>')
tables = get_sql_table_name(sql)
tables = _get_sql_table_name(sql)
sql = re.sub(r'[?]', r'', sql)
sql = re.sub(r'[()]', r'', sql)
sql = re.sub(r'`', r'', sql)
@ -245,28 +244,50 @@ def get_sql_template(sql):
"""
function: derive skeleton of sql
"""
filtered_sql = sql_filter(sql)
sql_template = filtered_sql
tables, columns = get_table_column_name(sql_template)
if filtered_sql.startswith('INSERT INTO'):
fine_template = templatize_sql(sql)
rough_template = fine_template
tables, columns = _get_table_column_name(fine_template)
if rough_template.startswith('INSERT INTO'):
table = tables[0]
sql_template = re.sub(r'INTO ' + table + r' \(.*?\)', r'INTO tab ()', sql_template)
rough_template = re.sub(r'INTO ' + table + r' \(.*?\)', r'INTO tab ()', rough_template)
for table in tables:
sql_template = re.sub(r'(\s+{table}\.|\s+{table}\s+|\s+{table})'.format(table=table), r' tab ', sql_template)
rough_template = re.sub(r'(\s+{table}\.|\s+{table}\s+|\s+{table})'.format(table=table), r' tab ',
rough_template)
for column in columns:
if column in ['*', '.', '+', '?']:
continue
sql_template = re.sub(r'\s+' + column + r'\s+', r' col ', sql_template)
return filtered_sql, sql_template
rough_template = re.sub(r'\s+' + column + r'\s+', r' col ', rough_template)
return fine_template, rough_template
def sql_similarity(sql1, sql2):
"""
calculate similarity of sql
"""
if sql1.split()[0] != sql2.split()[0]:
return 0.0
similarity_of_sql = list_distance(sql1.split(), sql2.split())
return similarity_of_sql
class LoadData:
def __init__(self, csv_file):
self.csv_file = csv_file
def load_predict_file(self):
for line in self.csv_file:
line = line.strip()
if line:
yield line
def load_train_file(self):
for line in self.csv_file:
line = line.strip()
if not line or ',' not in line:
continue
last_delimater_pos = line.rindex(',')
if re.search(r'\d+(\.\d+)?$', line[last_delimater_pos + 1:]) is None:
continue
sql = line[:last_delimater_pos]
duration_time = float(line[last_delimater_pos + 1:])
yield sql, duration_time
def __getattr__(self, name):
if name not in ('train_data', 'predict_data'):
raise AttributeError('{} has no attribute {}.'.format(LoadData.__name__, name))
if name == 'train_data':
return self.load_train_file()
else:
return self.load_predict_file()

View File

@ -0,0 +1,7 @@
tensorflow==2.3.1
pandas
matplotlib
gensim==3.8.3
sqlparse
sklearn
numpy

Binary file not shown.


File diff suppressed because it is too large

View File

@ -0,0 +1,31 @@
# -----------------------------
# SQLdiag configuration file
# -----------------------------
#
# This file consists of lines of the form:
#
# name = value
#
# Comments are introduced with "#" anywhere on a line.
#------------------------------------------------------------------------------
# Main Configurations
#------------------------------------------------------------------------------
[Master]
#------------------------------------------------------------------------------
# DNN Learning
#------------------------------------------------------------------------------
[dnn]
epoch = 30
max_limit = 115.46
min_limit = 6.78e-05
#------------------------------------------------------------------------------
# Template Method
#------------------------------------------------------------------------------
[template]
similarity_algorithm =
time_list_size =
knn_number =

View File

@ -1,47 +0,0 @@
"""
Copyright (c) 2020 Huawei Technologies Co.,Ltd.
openGauss is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
"""
import os
def get_train_dataset(filepath):
"""
return all message from train file onetime
the content must be separated by '\t'
:return numpy array
"""
def line_processor(line):
first_delimater_pos = line.index(',')
exec_time = float(line[:first_delimater_pos])
sql = line[first_delimater_pos + 1:].strip().strip('"')
if sql in ('NULL',) or not sql:
return
return exec_time, sql
if not os.path.exists(filepath):
raise FileNotFoundError('%s not exists.' % filepath)
with open(filepath, mode='r') as f:
contents = f.readlines()
contents = tuple(filter(lambda item: item, list(map(line_processor, contents))))
return contents
def get_test_dataset(filepath):
if not os.path.exists(filepath):
raise FileNotFoundError('%s not exists.' % filepath)
with open(filepath, mode='r') as f:
contents = f.readlines()
contents = tuple(map(lambda item: item.strip().strip('"'), contents))
return contents

View File

@ -1,160 +0,0 @@
"""
Copyright (c) 2020 Huawei Technologies Co.,Ltd.
openGauss is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
"""
import sqlparse
from sqlparse.sql import IdentifierList, Identifier, Where, Comparison, Parenthesis, Values
from sqlparse.tokens import Keyword, DML, DDL
ALPHA = 0.8
def levenshtein_distance(str1, str2):
"""
func: caculate levenshten distance between two string
:param str1: string1
:param str2: string2
:return: distance
"""
m, n = len(str1) + 1, len(str2) + 1
matrix = [[0] * n for _ in range(m)]
matrix[0][0] = 0
for i in range(1, m):
matrix[i][0] = matrix[i - 1][0] + 1
for j in range(1, n):
matrix[0][j] = matrix[0][j - 1] + 1
for i in range(1, m):
for j in range(1, n):
if str1[i - 1] == str2[j - 1]:
matrix[i][j] = matrix[i - 1][j - 1]
else:
matrix[i][j] = min(matrix[i - 1][j - 1], matrix[i - 1][j], matrix[i][j - 1]) + 1
return matrix[m - 1][n - 1]
def dtw_distance(template1, template2):
distance_matrix = []
for i in template1:
row_dist = []
for j in template2:
row_dist.append(0 if i == j else 1)
distance_matrix.append(row_dist)
m, n = len(template1), len(template2)
dtw = [[0] * n for _ in range(m)]
for i in range(m):
for j in range(n):
if not i:
if not j:
dtw[i][j] = 2 * distance_matrix[i][j]
else:
dtw[i][j] = dtw[i][j - 1] + distance_matrix[i][j]
else:
if not j:
dtw[i][j] = dtw[i - 1][j] + distance_matrix[i][j]
else:
dtw[i][j] = min(dtw[i - 1][j - 1] + 2 * distance_matrix[i][j],
dtw[i - 1][j] + distance_matrix[i][j],
dtw[i][j - 1] + distance_matrix[i][j])
return dtw[-1][-1]
def list_distance(list1, list2):
distance = 0.0
sorted_list1 = sorted(list1)
sorted_list2 = sorted(list2)
max_len = max(len(sorted_list1), len(sorted_list2))
min_len = min(len(sorted_list1), len(sorted_list2))
short_list = sorted_list1 if len(sorted_list1) < len(sorted_list2) else sorted_list2
long_list = sorted_list1 if len(sorted_list1) > len(sorted_list2) else sorted_list2
for item in short_list:
if item in long_list:
distance += 1.0
length_similarity = float(min_len / max_len)
return distance + length_similarity
def token2value(tokens):
value_list = []
for token in tokens:
if isinstance(token, IdentifierList):
for item in token.tokens:
value_list.append(item.value)
elif isinstance(token, Identifier) or isinstance(token, Comparison) or isinstance(token, Values):
value_list.append(token.value)
return value_list
def build_child_nodes(tokens, root={}):
st_child = False
sub_tokens = []
sub_sql_count = 0
for token in tokens:
if token.ttype in [Keyword, DDL, DML] or isinstance(token, Where) or isinstance(token, Values) \
or isinstance(token, Parenthesis):
if st_child:
root[child_key] = build_child_nodes(sub_tokens)
if isinstance(token, Where):
root['Where'] = []
for item in token.tokens:
if isinstance(item, Comparison):
root['Where'].append(item.value)
st_child = False
elif isinstance(token, Values):
root['Values'] = token.value
st_child = False
elif isinstance(token, Parenthesis):
sub_sql_count = sub_sql_count + 1
child_key = 'sub_sql' + str(sub_sql_count)
root[child_key] = build_child_nodes(token.tokens)
st_child = False
else:
st_child = True
sub_tokens = []
child_key = token.value.lower()
else:
sub_tokens.append(token)
if st_child:
root[child_key] = build_child_nodes(sub_tokens)
else:
root = token2value(tokens)
return root
def build_tree(sql):
parsed_sql = sqlparse.parse(sql)[0]
parse_tree = dict()
build_child_nodes(parsed_sql.tokens, parse_tree)
return parse_tree
def compare_two_tree(tree1, tree2, total_score, cur_score):
if isinstance(tree1, dict) and isinstance(tree2, dict):
for i in range(len(tree1)):
key = list(tree1.keys())[i]
if key in tree2.keys():
total_score[0] += cur_score
compare_two_tree(tree1[key], tree2[key], total_score, cur_score * ALPHA)
if isinstance(tree1, list) and isinstance(tree2, list):
for value in tree1:
if value in tree2:
total_score[0] += cur_score
def parse_tree_distance(parse_tree1, parse_tree2):
similarity = [0]
compare_two_tree(parse_tree1, parse_tree2, similarity, 1)
return -similarity[0]

View File

@ -1,242 +0,0 @@
"""
Copyright (c) 2020 Huawei Technologies Co.,Ltd.
openGauss is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
"""
import json
import os
import pickle
import random
from collections import defaultdict
from functools import lru_cache, reduce
from sklearn.decomposition import PCA
from .sql_processor import sql_similarity, get_sql_template
TIME_LIST_SIZE = 10
KNN_NUMBER = 3
SAMPLE_NUMBER = 5
HISTORY_VECTORS_LIMIT = 3000
POINT_NUMBER = 3
class SqlTemplate:
def __init__(self, template_path, word2vec_model):
self._template_path = template_path
self._w2v_model = word2vec_model
self._hash_table = dict(INSERT=dict(), UPDATE=dict(), DELETE=dict(), SELECT=dict(), OTHER=dict())
self._pca_model = None
@property
def template(self):
return self._hash_table
@template.setter
def template(self, data):
history_vectors = list()
db_template_path = os.path.join(self._template_path, 'template.json')
pca_model_path = os.path.join(self._template_path, 'decomposition.pickle')
history_sql_vecs_path = os.path.join(self._template_path, 'history_sql_vectors')
if os.path.exists(db_template_path):
self.load_template(db_template_path)
if os.path.exists(history_sql_vecs_path):
history_vectors = self.load_history_vectors(history_sql_vecs_path)
sql_number = list(data)
if len(sql_number) <= 1:
self._pca_model = None
else:
self._pca_model = PCA(n_components=2)
self.worker(data, history_vectors)
self.save_template(db_template_path)
self.save_decompose_model(pca_model_path)
if len(history_vectors) > HISTORY_VECTORS_LIMIT:
del history_vectors[:-HISTORY_VECTORS_LIMIT]
self.save_history_vectors(history_sql_vecs_path, history_vectors)
self.predict_exec_time.cache_clear()
def worker(self, db_data, history_vectors):
for exec_time, sql in db_data:
filtered_sql, sql_template = get_sql_template(sql)
history_vectors.append(self._w2v_model.str2vec(filtered_sql))
sql_prefix = filtered_sql.split()[0]
if sql_prefix not in self._hash_table.keys():
sql_prefix = 'OTHER'
if sql_template in self._hash_table[sql_prefix]:
if filtered_sql in self._hash_table[sql_prefix][sql_template]['sql_info']:
self._hash_table[sql_prefix][sql_template]['sql_info'][filtered_sql]['time_list'].append(exec_time)
else:
self._hash_table[sql_prefix][sql_template]['sql_info'][filtered_sql] = dict()
self._hash_table[sql_prefix][sql_template]['sql_info'][filtered_sql]['time_list'] = []
self._hash_table[sql_prefix][sql_template]['sql_info'][filtered_sql]['time_list'].append(exec_time)
else:
self._hash_table[sql_prefix][sql_template] = dict()
self._hash_table[sql_prefix][sql_template]['sql_info'] = dict()
self._hash_table[sql_prefix][sql_template]['sql_info'][filtered_sql] = dict()
self._hash_table[sql_prefix][sql_template]['sql_info'][filtered_sql]['time_list'] = []
self._hash_table[sql_prefix][sql_template]['sql_info'][filtered_sql]['time_list'].append(exec_time)
if self._pca_model:
self._pca_model.fit(history_vectors)
template_id_index = 0
for sql_prefix, sql_prefix_info in self._hash_table.items():
for sql_template, sql_template_info in sql_prefix_info.items():
decomposition_str_vecs = []
sql_template_info['template_id'] = template_id_index
template_id_index += 1
for filtered_sql, filtered_sql_info in sql_template_info['sql_info'].items():
del filtered_sql_info['time_list'][:-TIME_LIST_SIZE]
filtered_sql_info['time'] = reduce(lambda x, y: (x + y) / 2, filtered_sql_info['time_list'])
filtered_sql_info['point'] = [0.0, 0.0] if self._pca_model is None else \
self._pca_model.transform([self._w2v_model.str2vec(filtered_sql)]).tolist()[0]
decomposition_str_vecs.append([filtered_sql_info['point']])
filtered_sql_info['mean_time'] = sum(filtered_sql_info['time_list']) / \
len(filtered_sql_info['time_list'])
sql_template_info['center'] = [sum([item[0][0] for item in decomposition_str_vecs]) / \
len(sql_template_info['sql_info']),
sum([item[0][1] for item in decomposition_str_vecs]) / \
len(sql_template_info['sql_info'])]
sql_template_info['mean_time'] = sum(
[value['mean_time'] for key, value in sql_template_info['sql_info'].items()]) / \
len(sql_template_info['sql_info'])
@lru_cache(maxsize=10240)
def predict_exec_time(self, sql):
if not sql.strip():
exec_time = 0.0
template_id = -1
point = [-100, -100]
return exec_time, template_id, point
else:
filtered_sql, sql_template = get_sql_template(sql)
sql_prefix = filtered_sql.split()[0]
if sql_prefix not in self._hash_table.keys():
sql_prefix = "OTHER"
if not self._hash_table[sql_prefix]:
exec_time = 0.0
template_id = -1
point = [0.0, 0.0] if self._pca_model is None else \
self._pca_model.transform([self._w2v_model.str2vec(filtered_sql)]).tolist()[0]
return exec_time, template_id, point
elif sql_template in self._hash_table[sql_prefix]:
if filtered_sql in self._hash_table[sql_prefix][sql_template]['sql_info']:
exec_time = self._hash_table[sql_prefix][sql_template]['sql_info'][filtered_sql]['time']
template_id = self._hash_table[sql_prefix][sql_template]['template_id']
point = self._hash_table[sql_prefix][sql_template]['sql_info'][filtered_sql]['point']
return exec_time, template_id, point
else:
similarity_result = defaultdict(list)
for sql_info in self._hash_table[sql_prefix][sql_template]['sql_info']:
exec_time = self._hash_table[sql_prefix][sql_template]['sql_info'][sql_info]['time']
similarity_of_sql = sql_similarity(filtered_sql, sql_info)
similarity_result[sql_info].extend([similarity_of_sql, exec_time])
topn_sql_info = sorted(similarity_result.items(), key=lambda x: x[1][0], reverse=True)[:KNN_NUMBER]
topn_exec_times = [item[1][1] for item in topn_sql_info]
similarity_of_sqls = [item[1][0] for item in topn_sql_info]
similarity_proportions = [item / sum(similarity_of_sqls) for item in similarity_of_sqls]
exec_time = reduce(lambda x, y: x + y,
map(lambda x, y: x * y, similarity_proportions, topn_exec_times))
template_id = self._hash_table[sql_prefix][sql_template]['template_id']
point = [0.0, 0.0] if self._pca_model is None else \
self._pca_model.transform([self._w2v_model.str2vec(filtered_sql)]).tolist()[0]
return exec_time, template_id, point
else:
most_similar_template = ''
similarity_value = -1
for template in self._hash_table[sql_prefix]:
if sql_similarity(sql_template, template) > similarity_value:
most_similar_template = template
exec_time = self._hash_table[sql_prefix][most_similar_template]['mean_time']
template_id = self._hash_table[sql_prefix][most_similar_template]['template_id']
point = [0.0, 0.0] if self._pca_model is None else \
self._pca_model.transform([self._w2v_model.str2vec(filtered_sql)]).tolist()[0]
return exec_time, template_id, point
def predict_batch_exec_time(self, sqls):
result = self._init_result()
db_template_path = os.path.join(self._template_path, 'template.json')
pca_model_path = os.path.join(self._template_path, 'decomposition.pickle')
if not os.path.exists(db_template_path):
result['data']['background'] = dict()
return result
else:
self.load_template(db_template_path)
if not sqls:
result['data']['background'] = self.get_template_background()
return result
if os.path.exists(pca_model_path):
self.load_decompose_model(pca_model_path)
else:
self._pca_model = None
for sql in sqls:
exec_time, template_id, point = self.predict_exec_time(sql)
result['data']['time'].append(exec_time)
result['data']['cluster'].append(str(template_id))
result['data']['points'].append(point)
result['data']['background'] = self.get_template_background()
return result
def get_template_background(self):
background = dict()
for sql_prefix, sql_prefix_info in self._hash_table.items():
for sql_template, sql_template_info in sql_prefix_info.items():
background[sql_template_info['template_id']] = dict()
if len(sql_template_info['sql_info']) > SAMPLE_NUMBER:
background[sql_template_info['template_id']]['stmts'] = \
random.sample(sql_template_info['sql_info'].keys(), SAMPLE_NUMBER)
else:
background[sql_template_info['template_id']]['stmts'] = list(sql_template_info['sql_info'].keys())
background[sql_template_info['template_id']]['center'] = sql_template_info['center']
background[sql_template_info['template_id']]['avg_time'] = sql_template_info['mean_time']
background[sql_template_info['template_id']]['points'] = \
[sql_template_info['sql_info'][sql]['point'] \
for sql in background[sql_template_info['template_id']]['stmts']]
return background
@staticmethod
def _init_result():
result = dict()
result['status'] = 'success'
result['data'] = dict()
result['data']['time'] = list()
result['data']['points'] = list()
result['data']['cluster'] = list()
return result
def save_template(self, template_path):
with open(template_path, mode='w') as f:
json.dump(self._hash_table, f, indent=4)
def load_template(self, template_path):
if os.path.exists(template_path):
with open(template_path, mode='r') as f:
self._hash_table = json.load(f)
def save_decompose_model(self, pca_path):
with open(pca_path, mode='wb') as f:
pickle.dump(self._pca_model, f)
def load_decompose_model(self, pca_path):
if os.path.exists(pca_path):
with open(pca_path, mode='rb') as f:
self._pca_model = pickle.load(f)
@staticmethod
def save_history_vectors(history_vectors_path, history_vectors):
with open(history_vectors_path, mode='wb') as f:
pickle.dump(history_vectors, f)
@staticmethod
def load_history_vectors(history_vectors_path):
with open(history_vectors_path, mode='rb') as f:
return pickle.load(f)

View File

@ -1,58 +0,0 @@
"""
Copyright (c) 2020 Huawei Technologies Co.,Ltd.
openGauss is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
"""
from gensim.models import word2vec
from .sql_processor import sql_filter
class MySentence(object):
def __init__(self, data):
self.data = data
def __iter__(self):
for _, sql in self.data:
yield sql_filter(sql).split()
class Word2Vector(object):
def __init__(self, max_len=150, **kwargs):
self.model = None
self.params = kwargs
self.max_len = max_len
def train(self, sentence):
sentence = MySentence(sentence)
self.model = word2vec.Word2Vec(sentence, **self.params)
def update(self, sentence):
sentence = MySentence(sentence)
self.model.build_vocab(sentence, update=True)
self.model.train(sentence, total_examples=self.model.corpus_count, epochs=self.model.iter)
def str2vec(self, string):
str_vec = list()
for item in string.strip().split():
if item in self.model:
str_vec.extend(self.model[item])
else:
str_vec.extend([0.0] * self.params.get('size'))
if len(str_vec) >= self.max_len:
del str_vec[self.max_len:]
else:
str_vec.extend([0.0] * (self.max_len - len(str_vec)))
return str_vec
def save_model(self, model_path):
self.model.save(model_path)
def load_model(self, model_path):
self.model = word2vec.Word2Vec.load(model_path)

View File

@ -0,0 +1,133 @@
import csv
import datetime
import logging
import os
import stat
from collections import OrderedDict
import pandas as pd
import psycopg2
class ResultSaver:
"""
This class is used for saving result, and now support 'list', 'dict', 'tuple'.
"""
def __init__(self):
pass
def save(self, data, path):
realpath = os.path.realpath(path)
dirname = os.path.dirname(realpath)
if not os.path.exists(dirname):
os.makedirs(dirname, mode=0o700)
if oct(os.stat(dirname).st_mode)[-3:] != '700':
os.chmod(dirname, stat.S_IRWXU)
if isinstance(data, (list, tuple)):
self.save_list(data, realpath)
elif isinstance(data, dict):
self.save_dict(data, path)
else:
raise TypeError("mode should be 'list', 'tuple' or 'dict', but input type is '{}'".format(str(type(data))))
@staticmethod
def save_list(data, path):
data = pd.DataFrame(data)
data.to_csv(path, index=False, sep=',', header=False, quoting=csv.QUOTE_NONE, escapechar='\"')
@staticmethod
def save_dict(data, path):
data = pd.DataFrame(data.items())
data.to_csv(path, index=False, sep=',', header=False, quoting=csv.QUOTE_NONE, escapechar='\"')
class DBAgent:
def __init__(self, port, host=None, user=None, password=None, database=None):
self.host = host
self.port = port
self.user = user
self.database = database
self.password = password
self.conn = None
self.cursor = None
self.connect()
def __enter__(self):
self.connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
def connect(self):
self.conn = psycopg2.connect(host=self.host,
user=self.user,
password=self.password,
database=self.database,
port=self.port)
self.conn.set_client_encoding('latin9')
self.cursor = self.conn.cursor()
def fetch_all_result(self, sql):
try:
self.cursor.execute(sql)
result = list(self.cursor.fetchall())
return result
except Exception as e:
logging.getLogger('agent').warning(str(e))
def close(self):
self.cursor.close()
self.conn.close()
class LRUCache:
"""
LRU algorithm based on ordered dictionary
"""
def __init__(self, max_size=1000):
self.cache = OrderedDict()
self.max_size = max_size
def set(self, key, val):
if len(self.cache) == self.max_size:
self.cache.popitem(last=False)
self.cache[key] = val
else:
self.cache[key] = val
def __call__(self, func):
def wrapper(*args, **kwargs):
key = make_key(args, kwargs)
if key in self.cache:
return self.cache.get(key)
else:
value = func(*args, **kwargs)
self.set(key, value)
return value
return wrapper
def make_key(args, kwargs):
key = args
if kwargs:
for item in kwargs.items():
key += item
return str(key)
def check_illegal_sql(sql):
if not sql or sql.strip().split()[0].upper() not in (
'INSERT', 'SELECT', 'UPDATE', 'DELETE', 'CREATE', 'DROP', 'ALTER', 'GRANT', 'DENY', 'REVOKE'):
return True
return False
def check_time_legality(time_string):
try:
datetime.datetime.strptime(time_string, "%Y-%m-%d %H:%M:%S")
return True
except ValueError:
return False

View File

@ -0,0 +1,2 @@
DROP FUNCTION IF EXISTS pg_catalog.gs_decrypt(IN decryptstr text, IN keystr text, IN type text,OUT decrypt_result_str text) CASCADE;
DROP FUNCTION IF EXISTS pg_catalog.gs_encrypt(IN encryptstr text, IN keystr text, IN type text,OUT encrypt_result_str text) CASCADE;

View File

@ -0,0 +1,2 @@
DROP FUNCTION IF EXISTS pg_catalog.gs_decrypt(IN decryptstr text, IN keystr text, IN type text,OUT decrypt_result_str text) CASCADE;
DROP FUNCTION IF EXISTS pg_catalog.gs_encrypt(IN encryptstr text, IN keystr text, IN type text,OUT encrypt_result_str text) CASCADE;

View File

@ -0,0 +1,8 @@
DROP FUNCTION IF EXISTS pg_catalog.gs_decrypt(IN decryptstr text, IN keystr text, IN type text,OUT decrypt_result_str text) CASCADE;
SET LOCAL inplace_upgrade_next_system_object_oids = IUO_PROC, 6322;
CREATE FUNCTION pg_catalog.gs_decrypt(IN decryptstr text, IN keystr text, IN type text, OUT decrypt_result_str text) RETURNS text LANGUAGE INTERNAL as 'gs_decrypt';
DROP FUNCTION IF EXISTS pg_catalog.gs_encrypt(IN encryptstr text, IN keystr text, IN type text,OUT decrypt_result_str text) CASCADE;
SET LOCAL inplace_upgrade_next_system_object_oids = IUO_PROC, 6323;
CREATE FUNCTION pg_catalog.gs_encrypt(IN encryptstr text, IN keystr text, IN type text, OUT encrypt_result_str text) RETURNS text LANGUAGE INTERNAL as 'gs_encrypt';

View File

@ -0,0 +1,8 @@
DROP FUNCTION IF EXISTS pg_catalog.gs_decrypt(IN decryptstr text, IN keystr text, IN type text,OUT decrypt_result_str text) CASCADE;
SET LOCAL inplace_upgrade_next_system_object_oids = IUO_PROC, 6322;
CREATE FUNCTION pg_catalog.gs_decrypt(IN decryptstr text, IN keystr text, IN type text, OUT decrypt_result_str text) RETURNS text LANGUAGE INTERNAL as 'gs_decrypt';
DROP FUNCTION IF EXISTS pg_catalog.gs_encrypt(IN encryptstr text, IN keystr text, IN type text,OUT decrypt_result_str text) CASCADE;
SET LOCAL inplace_upgrade_next_system_object_oids = IUO_PROC, 6323;
CREATE FUNCTION pg_catalog.gs_encrypt(IN encryptstr text, IN keystr text, IN type text, OUT encrypt_result_str text) RETURNS text LANGUAGE INTERNAL as 'gs_encrypt';