!1130 Update index advisor: index recommendation function enhancement

Merge pull request !1130 from liuly/master
This commit is contained in: opengauss-bot, 2021-07-29 01:47:48 +00:00 (committed by Gitee)
5 changed files with 1947 additions and 221 deletions

View File

@ -0,0 +1,25 @@
[server]
app_name=
database=
port=
host=
user=
workload_user=
schema=
index_intervals=
max_index_num=
max_index_storage=
driver=
sql_amount=
max_generate_log_size=
statement=
log_min_duration_statement=
log_statement=
output_sql_file=
datanode=
pg_log_path=
ai_monitor_url=
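
The block above is an empty configuration template. As a rough sketch (not part of the patch; the file name and variable names below are illustrative), such an INI-style [server] section could be read with Python's standard configparser:

from configparser import ConfigParser

config = ConfigParser()
config.read('server.conf')                      # hypothetical file name
server = config['server']
port = server.get('port') or '5432'             # fall back when a value is left empty
max_index_num = int(server.get('max_index_num') or 10)
pg_log_path = server.get('pg_log_path', '')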

View File

@ -1,89 +1,202 @@
import re
import os
import argparse
import json
import random
import time
from subprocess import Popen, PIPE
SQL_TYPE = ['select', 'delete', 'insert', 'update']
SQL_AMOUNT = 0
PLACEHOLDER = r'@@@'
SAMPLE_NUM = 5
SQL_PATTERN = [r'\((\s*(\d+(\.\d+)?\s*)[,]?)+\)', # match integer set in the IN collection
r'([^\\])\'((\')|(.*?([^\\])\'))', # match all content in single quotes
r'(([^<>]\s*=\s*)|([^<>]\s+))(\d+)(\.\d+)?'] # match single integer
def get_workload_template(templates, sqls):
for sql in sqls:
sql_template = sql
for pattern in SQL_PATTERN:
sql_template = re.sub(pattern, PLACEHOLDER, sql_template)
if sql_template not in templates:
templates[sql_template] = {}
templates[sql_template]['cnt'] = 0
templates[sql_template]['samples'] = []
templates[sql_template]['cnt'] += 1
# reservoir sampling
if len(templates[sql_template]['samples']) < SAMPLE_NUM:
if sql not in templates[sql_template]['samples']:
templates[sql_template]['samples'].append(sql)
else:
if random.randint(0, templates[sql_template]['cnt']) < SAMPLE_NUM:
templates[sql_template]['samples'][random.randint(0, SAMPLE_NUM - 1)] = sql
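
To illustrate the templating step above (a standalone sketch that reuses the SQL_PATTERN and PLACEHOLDER constants defined earlier; the table name and literals are made up), statements that differ only in their constants collapse onto the same template key:

# illustrative only: normalise one statement with the patterns defined above
sample = "select * from bmsql_item where i_id in (1, 2, 3) and i_name = 'foo'"
template = sample
for pattern in SQL_PATTERN:
    template = re.sub(pattern, PLACEHOLDER, template)
# the IN-list and the quoted string literal are both replaced by @@@,
# so differently parameterised statements share one entry in the templates dict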
def output_valid_sql(sql):
if 'from pg_' in sql.lower():
is_quotation_valid = sql.count("'") % 2
if 'from pg_' in sql.lower() or ' join ' in sql.lower() or is_quotation_valid:
return ''
if any(tp in sql.lower() for tp in SQL_TYPE[1:]):
return sql if sql.endswith('; ') else sql + ';'
sql = re.sub(r'for\s+update[\s;]*$', '', sql, flags=re.I)
return sql.strip() if sql.endswith('; ') else sql + ';'
elif SQL_TYPE[0] in sql.lower() and 'from ' in sql.lower():
return sql if sql.endswith('; ') else sql + ';'
return sql.strip() if sql.endswith('; ') else sql + ';'
return ''
def extract_sql_from_log(log_path):
files = os.listdir(log_path)
for file in files:
file_path = log_path + "/" + file
if os.path.isfile(file_path) and re.search(r'.log$', file):
with open(file_path, mode='r') as f:
line = f.readline()
sql = ''
statement_flag = False
execute_flag = False
def get_parsed_sql(file, user, database, sql_amount, statement):
global SQL_AMOUNT
line = file.readline()
sql = ''
statement_flag = False
execute_flag = False
while line:
try:
# Identify statement scene
if re.search('statement: ', line, re.IGNORECASE):
statement_flag = True
if output_valid_sql(sql):
yield output_valid_sql(sql)
sql = re.search(r'statement: (.*)', line.strip(),
re.IGNORECASE).group(1) + ' '
line = f.readline()
while line:
if sql_amount and SQL_AMOUNT == sql_amount:
break
try:
# Identify statement scene
if re.search('statement: ', line.lower(), re.IGNORECASE) and statement:
if output_valid_sql(sql):
SQL_AMOUNT += 1
yield output_valid_sql(sql)
log_info = line.split(' ')
if (user and user not in log_info) or (
database and database not in log_info):
line = file.readline()
continue
statement_flag = True
sql = re.search(r'statement: (.*)', line.strip(), re.IGNORECASE).group(1) + ' '
line = file.readline()
# Identify execute statement scene
elif re.search(r'execute .*:', line, re.IGNORECASE):
if output_valid_sql(sql):
yield output_valid_sql(sql)
execute_flag = True
sql = re.search(r'execute .*: (.*)', line.strip(), re.IGNORECASE).group(1)
line = f.readline()
else:
if statement_flag:
if re.match(r'^\t', line):
sql += line.strip('\t\n')
else:
statement_flag = False
if output_valid_sql(sql):
yield output_valid_sql(sql)
sql = ''
if execute_flag and re.search(r'parameters: ', line, re.IGNORECASE):
execute_flag = False
param_list = re.search(r'parameters: (.*)', line.strip(),
re.IGNORECASE).group(1).split(', ')
param_list = list(param.split('=', 1) for param in param_list)
param_list.sort(key=lambda x: int(x[0].strip(' $')),
reverse=True)
for item in param_list:
if len(item[1].strip()) >= 256:
sql = sql.replace(item[0].strip(), "''")
else:
sql = sql.replace(item[0].strip(), item[1].strip())
yield output_valid_sql(sql)
sql = ''
line = f.readline()
except:
execute_flag = False
# Identify execute statement scene
elif re.search(r'execute .*:', line, re.IGNORECASE):
if output_valid_sql(sql):
SQL_AMOUNT += 1
yield output_valid_sql(sql)
log_info = line.split(' ')
if (user and user not in log_info) or (
database and database not in log_info):
line = file.readline()
continue
execute_flag = True
sql = re.search(r'execute .*: (.*)', line.strip(), re.IGNORECASE).group(1)
line = file.readline()
else:
if statement_flag:
if re.match(r'^\t', line):
sql += line.strip('\t\n')
else:
statement_flag = False
line = f.readline()
if output_valid_sql(sql):
SQL_AMOUNT += 1
yield output_valid_sql(sql)
sql = ''
if execute_flag:
execute_flag = False
if re.search(r'parameters: ', line, re.IGNORECASE):
param_list = re.search(r'parameters: (.*)', line.strip(),
re.IGNORECASE).group(1).split(', $')
param_list = list(param.split('=', 1) for param in param_list)
param_list.sort(key=lambda x: int(x[0].strip(' $')),
reverse=True)
for item in param_list:
if len(item[1].strip()) >= 256:
sql = sql.replace(item[0].strip() if re.match(r'\$', item[0]) else
('$' + item[0].strip()), "''")
else:
sql = sql.replace(item[0].strip() if re.match(r'\$', item[0]) else
('$' + item[0].strip()), item[1].strip())
if output_valid_sql(sql):
SQL_AMOUNT += 1
yield output_valid_sql(sql)
sql = ''
line = file.readline()
except:
execute_flag = False
statement_flag = False
line = file.readline()
def get_start_position(start_time, file_path):
while start_time:
cmd = 'head -n $(cat %s | grep -m 1 -n "^%s" | awk -F : \'{print $1}\') %s | wc -c' % \
(file_path, start_time, file_path)
proc = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True)
std, err_msg = proc.communicate()
if proc.returncode == 0 and not err_msg:
return int(std)
elif len(start_time) > 13:
start_time = start_time[0: -3]
else:
break
return -1
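
The shell pipeline above returns the byte offset just past the first log line that begins with start_time, retrying with a progressively truncated timestamp when nothing matches. A rough pure-Python equivalent (a sketch only, not part of the patch) would be:

def find_start_offset(file_path, start_time):
    # return the offset just past the first line starting with start_time, or -1
    offset = 0
    prefix = start_time.encode()
    with open(file_path, 'rb') as log_file:
        for line in log_file:
            offset += len(line)
            if line.startswith(prefix):
                return offset
    return -1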
def record_sql(valid_files, args, output_obj):
for ind, file in enumerate(valid_files):
if args.sql_amount and SQL_AMOUNT >= args.sql_amount:
break
file_path = os.path.join(args.l, file)
if os.path.isfile(file_path) and re.search(r'.log$', file):
start_position = 0
if ind == 0:
start_position = get_start_position(args.start_time, file_path)
if start_position == -1:
continue
with open(file_path, mode='r') as f:
f.seek(start_position, 0)
if isinstance(output_obj, dict):
get_workload_template(output_obj, get_parsed_sql(f, args.U, args.d,
args.sql_amount,
args.statement))
else:
for sql in get_parsed_sql(f, args.U, args.d, args.sql_amount, args.statement):
output_obj.write(sql + '\n')
def extract_sql_from_log(args):
files = os.listdir(args.l)
files = sorted(files, key=lambda x: os.path.getctime(os.path.join(args.l, x)), reverse=True)
valid_files = files
time_stamp = int(time.mktime(time.strptime(args.start_time, '%Y-%m-%d %H:%M:%S')))
if args.start_time:
valid_files = []
for file in files:
if os.path.getmtime(os.path.join(args.l, file)) < time_stamp:
break
valid_files.insert(0, file)
if args.json:
templates = {}
record_sql(valid_files, args, templates)
with open(args.f, 'w') as output_file:
json.dump(templates, output_file)
else:
with open(args.f, 'w') as output_file:
record_sql(valid_files, args, output_file)
def main():
arg_parser = argparse.ArgumentParser()
arg_parser.add_argument("l", help="The path of the log file that needs to be parsed.")
arg_parser.add_argument("f", help="The output path of the extracted file.")
arg_parser.add_argument("-d", help="Name of database")
arg_parser.add_argument("-U", help="Username for database log-in")
arg_parser.add_argument("--start_time", help="Start time of extracted log")
arg_parser.add_argument("--sql_amount", help="The number of sql collected", type=int)
arg_parser.add_argument("--statement", action='store_true', help="Extract statement log type",
default=False)
arg_parser.add_argument("--json", action='store_true',
help="Whether the workload file format is json", default=False)
args = arg_parser.parse_args()
sqls = extract_sql_from_log(args.l)
with open(args.f, 'w') as file:
for sql in sqls:
file.write(sql + '\n')
if args.start_time:
time.strptime(args.start_time, '%Y-%m-%d %H:%M:%S')
if args.sql_amount and args.sql_amount <= 0:
raise argparse.ArgumentTypeError("%s is an invalid positive int value" % args.sql_amount)
extract_sql_from_log(args)
if __name__ == '__main__':

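Assuming the script above is saved as extract_log.py (the script name and paths here are illustrative), it might be invoked as:

python extract_log.py /var/log/opengauss/pg_log ./workload.sql -d tpcc -U omm --start_time '2021-07-28 00:00:00' --statement --json
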
View File

@ -24,6 +24,7 @@ import subprocess
import time
import select
import logging
import json
ENABLE_MULTI_NODE = False
SAMPLE_NUM = 5
@ -31,15 +32,19 @@ MAX_INDEX_COLUMN_NUM = 5
MAX_INDEX_NUM = 10
MAX_INDEX_STORAGE = None
FULL_ARRANGEMENT_THRESHOLD = 20
NEGATIVE_RATIO_THRESHOLD = 0.2
BASE_CMD = ''
SHARP = '#'
SCHEMA = None
JSON_TYPE = False
BLANK = ' '
SQL_TYPE = ['select', 'delete', 'insert', 'update']
SQL_PATTERN = [r'([^\\])\'((\')|(.*?([^\\])\'))',
r'([^\\])"((")|(.*?([^\\])"))',
r'(\s*[<>]\s*=*\s*\d+)',
r'(\'\d+\\.*?\')']
SQL_TYPE = ['select ', 'delete ', 'insert ', 'update ']
SQL_PATTERN = [r'\((\s*(\d+(\.\d+)?\s*)[,]?)+\)', # match integer set in the IN collection
r'([^\\])\'((\')|(.*?([^\\])\'))', # match all content in single quotes
r'(([^<>]\s*=\s*)|([^<>]\s+))(\d+)(\.\d+)?'] # match single integer
SQL_DISPLAY_PATTERN = [r'\((\s*(\d+(\.\d+)?\s*)[,]?)+\)', # match integer set in the IN collection
r'\'((\')|(.*?\'))', # match all content in single quotes
r'([^\_\d])\d+(\.\d+)?'] # match single integer
logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')
@ -79,11 +84,31 @@ class QueryItem:
class IndexItem:
def __init__(self, tbl, cols):
def __init__(self, tbl, cols, positive_pos=None):
self.table = tbl
self.columns = cols
self.atomic_pos = 0
self.benefit = 0
self.storage = 0
self.positive_pos = positive_pos
self.ineffective_pos = []
self.negative_pos = []
self.total_sql_num = 0
self.insert_sql_num = 0
self.update_sql_num = 0
self.delete_sql_num = 0
self.select_sql_num = 0
class IndexInfo:
def __init__(self, schema, table, indexname, columns, indexdef):
self.schema = schema
self.table = table
self.indexname = indexname
self.columns = columns
self.indexdef = indexdef
self.primary_key = False
self.redundant_obj = []
def run_shell_cmd(target_sql_list):
@ -128,6 +153,222 @@ def print_header_boundary(header):
print(green(title))
def filter_low_benefit(pos_list, candidate_indexes, multi_iter_mode, workload):
remove_list = []
for key, index in enumerate(candidate_indexes):
sql_optimzed = 0
for ind, pos in enumerate(index.positive_pos):
if multi_iter_mode:
cost_list_pos = index.atomic_pos
else:
cost_list_pos = pos_list[key] + 1
sql_optimzed += 1 - workload[pos].cost_list[cost_list_pos] / workload[pos].cost_list[0]
negative_ratio = (index.insert_sql_num + index.delete_sql_num + index.update_sql_num) / \
index.total_sql_num
# filter the candidate indexes that do not meet the conditions of optimization
if sql_optimzed / len(index.positive_pos) < 0.1:
remove_list.append(key)
elif sql_optimzed / len(index.positive_pos) < NEGATIVE_RATIO_THRESHOLD < negative_ratio:
remove_list.append(key)
for item in sorted(remove_list, reverse=True):
candidate_indexes.pop(item)
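In other words, a candidate whose positive statements improve by less than 10% on average is dropped outright; one that averages, say, 15% is still dropped when more than NEGATIVE_RATIO_THRESHOLD (20%) of the statements touching its table are INSERT, DELETE or UPDATE, since index maintenance would presumably erode the gain.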
def display_recommend_result(workload, candidate_indexes, index_cost_total,
multi_iter_mode, display_info):
cnt = 0
index_current_storage = 0
pos_list = []
if not multi_iter_mode:
pos_list = [item[0] for item in candidate_indexes]
candidate_indexes = [item[1] for item in candidate_indexes]
# filter candidate indexes with low benefit
filter_low_benefit(pos_list, candidate_indexes, multi_iter_mode, workload)
# display the determined result
for key, index in enumerate(candidate_indexes):
if MAX_INDEX_STORAGE and (index_current_storage + index.storage) > MAX_INDEX_STORAGE:
continue
if MAX_INDEX_NUM and cnt == MAX_INDEX_NUM:
break
index_current_storage += index.storage
table_name = index.table.split('.')[-1]
index_name = 'idx_' + table_name + '_' + '_'.join(index.columns.split(', '))
statement = 'CREATE INDEX ' + index_name + ' ON ' + index.table + '(' + index.columns + ');'
print(statement)
cnt += 1
if multi_iter_mode:
cost_list_pos = index.atomic_pos
else:
cost_list_pos = pos_list[key] + 1
sql_info = {'sqlDetails': []}
benefit_types = [index.ineffective_pos, index.positive_pos, index.negative_pos]
for category, benefit_type in enumerate(benefit_types):
sql_count = 0
for item in benefit_type:
sql_count += workload[item].frequency
for ind, pos in enumerate(benefit_type):
sql_detail = {}
sql_template = workload[pos].statement
for pattern in SQL_DISPLAY_PATTERN:
sql_template = re.sub(pattern, '?', sql_template)
sql_detail['sqlTemplate'] = sql_template
sql_detail['sql'] = workload[pos].statement
sql_detail['sqlCount'] = sql_count
if category == 1:
sql_optimzed = (workload[pos].cost_list[0] -
workload[pos].cost_list[cost_list_pos]) / \
workload[pos].cost_list[cost_list_pos]
sql_detail['optimized'] = '%.3f' % sql_optimzed
sql_detail['correlationType'] = category
sql_info['sqlDetails'].append(sql_detail)
workload_optimized = (1 - index_cost_total[cost_list_pos] / index_cost_total[0]) * 100
sql_info['workloadOptimized'] = '%.2f' % (workload_optimized if workload_optimized > 1 else 1)
sql_info['schemaName'] = SCHEMA
sql_info['tbName'] = table_name
sql_info['columns'] = index.columns
sql_info['statement'] = statement
sql_info['dmlCount'] = round(index.total_sql_num)
sql_info['selectRatio'] = round(index.select_sql_num * 100 / index.total_sql_num, 2)
sql_info['insertRatio'] = round(index.insert_sql_num * 100 / index.total_sql_num, 2)
sql_info['deleteRatio'] = round(index.delete_sql_num * 100 / index.total_sql_num, 2)
sql_info['updateRatio'] = round(100 - sql_info['selectRatio'] - sql_info['insertRatio']
- sql_info['deleteRatio'], 2)
display_info['recommendIndexes'].append(sql_info)
return display_info
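
For reference, each entry appended to display_info['recommendIndexes'] has roughly the following shape; the values below are placeholders rather than real measurements, and 'optimized' is only present for positive (correlationType 1) statements:

# shape of one recommendation entry (placeholder values only)
{
    'schemaName': '<schema>', 'tbName': '<table>', 'columns': '<col1, col2>',
    'statement': 'CREATE INDEX idx_<table>_<col1>_<col2> ON <table>(<col1, col2>);',
    'workloadOptimized': '<percent>', 'dmlCount': 0, 'selectRatio': 0.0,
    'insertRatio': 0.0, 'deleteRatio': 0.0, 'updateRatio': 0.0,
    'sqlDetails': [{'sqlTemplate': '...', 'sql': '...', 'sqlCount': 0,
                    'optimized': '<ratio>', 'correlationType': 1}]
}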
def record_redundant_indexes(cur_table_indexes, redundant_indexes):
cur_table_indexes = sorted(cur_table_indexes,
key=lambda index_obj: len(index_obj.columns.split(',')))
# record redundant indexes
has_restore = []
for pos, index in enumerate(cur_table_indexes[:-1]):
is_redundant = False
for candidate_index in cur_table_indexes[pos + 1:]:
if 'UNIQUE INDEX' in index.indexdef:
# ensure that UNIQUE INDEX will not become redundant compared to normal index
if 'UNIQUE INDEX' not in candidate_index.indexdef:
continue
# ensure the primary key index itself is not reported as redundant
elif index.primary_key:
if re.match(r'%s' % candidate_index.columns, index.columns):
candidate_index.redundant_obj.append(index)
redundant_indexes.append(candidate_index)
has_restore.append(candidate_index)
continue
if re.match(r'%s' % index.columns, candidate_index.columns):
is_redundant = True
index.redundant_obj.append(candidate_index)
if is_redundant and index not in has_restore:
redundant_indexes.append(index)
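For example (illustrative column names), an index on (a) is recorded as redundant when another index on (a, b) exists on the same table, because the narrower column list is a prefix of the wider one; UNIQUE indexes are only compared against other UNIQUE indexes, and when the narrower index is a primary key it is the overlapping secondary index that gets reported instead.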
def check_useless_index(tables):
whole_indexes = list()
redundant_indexes = list()
if not tables:
return whole_indexes, redundant_indexes
tables_string = ','.join(["'%s'" % table for table in tables[SCHEMA]])
sql = "SELECT c.relname AS tablename, i.relname AS indexname, " \
"pg_get_indexdef(i.oid) AS indexdef, p.contype AS pkey from " \
"pg_index x JOIN pg_class c ON c.oid = x.indrelid JOIN " \
"pg_class i ON i.oid = x.indexrelid LEFT JOIN pg_namespace n " \
"ON n.oid = c.relnamespace LEFT JOIN pg_constraint p ON i.oid = p.conindid" \
"WHERE (c.relkind = ANY (ARRAY['r'::\"char\", 'm'::\"char\"])) AND " \
"(i.relkind = ANY (ARRAY['i'::\"char\", 'I'::\"char\"])) AND " \
"n.nspname = '%s' AND c.relname in (%s) order by c.relname;" % (SCHEMA, tables_string)
res = run_shell_cmd([sql]).split('\n')
if res:
cur_table_indexes = list()
for line in res:
if 'tablename' in line or re.match(r'-+', line):
continue
elif re.match(r'\(\d+ rows?\)', line):
continue
elif '|' in line:
table, index, indexdef, pkey = [item.strip() for item in line.split('|')]
cur_columns = re.search(r'\(([^\(\)]*)\)', indexdef).group(1)
cur_index_obj = IndexInfo(SCHEMA, table, index, cur_columns, indexdef)
if pkey:
cur_index_obj.primary_key = True
whole_indexes.append(cur_index_obj)
if cur_table_indexes and cur_table_indexes[-1].table != table:
record_redundant_indexes(cur_table_indexes, redundant_indexes)
cur_table_indexes = []
cur_table_indexes.append(cur_index_obj)
if cur_table_indexes:
record_redundant_indexes(cur_table_indexes, redundant_indexes)
return whole_indexes, redundant_indexes
def get_whole_index(tables, detail_info):
whole_index, redundant_indexes = check_useless_index(tables)
print_header_boundary(" Created indexes ")
detail_info['createdIndexes'] = []
if not whole_index:
print("No created index!")
else:
for index in whole_index:
index_info = {'schemaName': index.schema, 'tbName': index.table,
'columns': index.columns, 'statement': index.indexdef + ';'}
detail_info['createdIndexes'].append(index_info)
print("%s;" % index.indexdef)
return whole_index, redundant_indexes
def check_unused_index_workload(whole_indexes, redundant_indexes, workload_indexes, detail_info):
indexes_name = set(index.indexname for index in whole_indexes)
unused_index = list(indexes_name.difference(workload_indexes))
remove_list = []
print_header_boundary(" Current workload useless indexes ")
if not unused_index:
print("No useless index!")
detail_info['uselessIndexes'] = []
# useless index
unused_index_columns = dict()
for cur_index in unused_index:
for index in whole_indexes:
if cur_index == index.indexname:
unused_index_columns[cur_index] = index.columns
if 'UNIQUE INDEX' not in index.indexdef:
statement = "DROP INDEX %s;" % index.indexname
print(statement)
useless_index = {"schemaName": index.schema, "tbName": index.table, "type": 1,
"columns": index.columns, "statement": statement}
detail_info['uselessIndexes'].append(useless_index)
print_header_boundary(" Redundant indexes ")
# filter redundant index
for pos, index in enumerate(redundant_indexes):
is_redundant = False
for redundant_obj in index.redundant_obj:
# the current index is only reported as redundant if at least one covering index
# is still used by the workload, or is unused but has exactly the same column list
index_exist = redundant_obj.indexname not in unused_index_columns.keys() or \
(unused_index_columns.get(redundant_obj.indexname) and
redundant_obj.columns == unused_index_columns[redundant_obj.indexname])
if index_exist:
is_redundant = True
if not is_redundant:
remove_list.append(pos)
for item in sorted(remove_list, reverse=True):
redundant_indexes.pop(item)
if not redundant_indexes:
print("No redundant index!")
# redundant index
for index in redundant_indexes:
statement = "DROP INDEX %s;" % index.indexname
print(statement)
existing_index = [item.indexname + ':' + item.columns for item in index.redundant_obj]
redundant_index = {"schemaName": index.schema, "tbName": index.table, "type": 2,
"columns": index.columns, "statement": statement,
"existingIndex": existing_index}
detail_info['uselessIndexes'].append(redundant_index)
def load_workload(file_path):
wd_dict = {}
workload = []
@ -176,14 +417,20 @@ def get_workload_template(workload):
def workload_compression(input_path):
compressed_workload = []
workload = load_workload(input_path)
templates = get_workload_template(workload)
total_num = 0
if JSON_TYPE:
with open(input_path, 'r') as file:
templates = json.load(file)
else:
workload = load_workload(input_path)
templates = get_workload_template(workload)
for _, elem in templates.items():
for sql in elem['samples']:
compressed_workload.append(QueryItem(sql, elem['cnt'] / SAMPLE_NUM))
return compressed_workload
compressed_workload.append(QueryItem(sql.strip('\n'),
elem['cnt'] / len(elem['samples'])))
total_num += elem['cnt']
return compressed_workload, total_num
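Each retained sample therefore carries the template's count spread across its samples: a template seen 100 times with 5 retained samples produces five QueryItem objects with a frequency of 20 each, while total_num still reports the full 100 statements.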
# parse the explain plan to get estimated cost by database optimizer
@ -213,15 +460,19 @@ def update_index_storage(res, index_config, hypo_index_num):
index_config[hypo_index_num].storage = float(item.strip()) / 1024 / 1024
def estimate_workload_cost_file(workload, index_config=None):
def estimate_workload_cost_file(workload, index_config=None, ori_indexes_name=None):
total_cost = 0
sql_file = str(time.time()) + '.sql'
found_plan = False
hypo_index = False
is_computed = False
select_sql_pos = []
with open(sql_file, 'w') as file:
if SCHEMA:
file.write('SET current_schema = %s;\n' % SCHEMA)
if index_config:
if len(index_config) == 1 and index_config[0].positive_pos:
is_computed = True
# create hypo-indexes
file.write('SET enable_hypo_index = on;\n')
for index in index_config:
@ -230,8 +481,16 @@ def estimate_workload_cost_file(workload, index_config=None):
if ENABLE_MULTI_NODE:
file.write('set enable_fast_query_shipping = off;\n')
file.write("set explain_perf_mode = 'normal'; \n")
for query in workload:
file.write('EXPLAIN ' + query.statement + ';\n')
for ind, query in enumerate(workload):
if 'select ' not in query.statement.lower():
workload[ind].cost_list.append(0)
else:
file.write('EXPLAIN ' + query.statement + ';\n')
select_sql_pos.append(ind)
# record ineffective sql and negative sql for candidate indexes
if is_computed:
record_ineffective_negative_sql(index_config[0], query, ind)
result = run_shell_sql_cmd(sql_file).split('\n')
if os.path.exists(sql_file):
@ -244,15 +503,18 @@ def estimate_workload_cost_file(workload, index_config=None):
if 'QUERY PLAN' in line:
found_plan = True
if 'ERROR' in line:
workload.pop(i)
if i >= len(select_sql_pos):
raise ValueError("The size of workload is not correct!")
workload[select_sql_pos[i]].cost_list.append(0)
i += 1
if 'hypopg_create_index' in line:
hypo_index = True
if found_plan and '(cost=' in line:
if i >= len(workload):
if i >= len(select_sql_pos):
raise ValueError("The size of workload is not correct!")
query_cost = parse_explain_plan(line)
query_cost *= workload[i].frequency
workload[i].cost_list.append(query_cost)
query_cost *= workload[select_sql_pos[i]].frequency
workload[select_sql_pos[i]].cost_list.append(query_cost)
total_cost += query_cost
found_plan = False
i += 1
@ -261,8 +523,15 @@ def estimate_workload_cost_file(workload, index_config=None):
hypo_index = False
update_index_storage(line, index_config, hypo_index_num)
hypo_index_num += 1
while i < len(workload):
workload[i].cost_list.append(0)
if 'Index' in line and 'Scan' in line and not index_config:
ind1, ind2 = re.search(r'Index.*Scan(.*)on ([^\s]+)',
line.strip(), re.IGNORECASE).groups()
if ind1.strip():
ori_indexes_name.add(ind1.strip().split(' ')[1])
else:
ori_indexes_name.add(ind2)
while i < len(select_sql_pos):
workload[select_sql_pos[i]].cost_list.append(0)
i += 1
if index_config:
run_shell_cmd(['SELECT hypopg_reset_index();'])
@ -282,11 +551,13 @@ def make_single_advisor_sql(ori_sql):
return sql
def parse_single_advisor_result(res):
def parse_single_advisor_result(res, workload_table_name):
table_index_dict = {}
if len(res) > 2 and res[0:2] == ' (':
items = res.split(',', 1)
table = items[0][2:]
workload_table_name[SCHEMA] = workload_table_name.get(SCHEMA, set())
workload_table_name[SCHEMA].add(table)
indexes = re.split('[()]', items[1][:-1].strip('\"'))
for columns in indexes:
if columns == '':
@ -299,7 +570,7 @@ def parse_single_advisor_result(res):
# call the single-index-advisor in the database
def query_index_advisor(query):
def query_index_advisor(query, workload_table_name):
table_index_dict = {}
if 'select' not in query.lower():
@ -309,7 +580,7 @@ def query_index_advisor(query):
result = run_shell_cmd([sql]).split('\n')
for res in result:
table_index_dict.update(parse_single_advisor_result(res))
table_index_dict.update(parse_single_advisor_result(res, workload_table_name))
return table_index_dict
@ -329,30 +600,37 @@ def query_index_check(query, query_index_dict):
if columns != '':
sql_list.append("SELECT hypopg_create_index('CREATE INDEX ON %s(%s)')" %
(table, columns))
sql_list.append('SELECT hypopg_display_index()')
sql_list.append("set explain_perf_mode = 'normal'; explain " + query)
sql_list.append('SELECT hypopg_reset_index()')
result = run_shell_cmd(sql_list).split('\n')
# parse the result of explain plan
hypoid_table_column = {}
hypo_display = False
for line in result:
hypo_index = ''
if hypo_display and 'btree' in line:
hypo_index_info = line.split(',', 3)
if len(hypo_index_info) == 4:
hypoid_table_column[hypo_index_info[1]] = \
hypo_index_info[2] + ':' + hypo_index_info[3].strip('"()')
if hypo_display and re.search(r'\d+ rows', line):
hypo_display = False
if 'hypopg_display_index' in line:
hypo_display = True
if 'Index' in line and 'Scan' in line and 'btree' in line:
tokens = line.split(' ')
for token in tokens:
if 'btree' in token:
hypo_index = token.split('_', 1)[1]
if len(hypo_index) > 1:
for table in query_index_dict.keys():
for columns in query_index_dict[table]:
index_name_list = columns.split(',')
index_name_list.insert(0, table)
index_name = "_".join(index_name_list)
if index_name != hypo_index:
continue
if table not in valid_indexes.keys():
valid_indexes[table] = []
if columns not in valid_indexes[table]:
valid_indexes[table].append(columns)
hypo_index_id = re.search(r'\d+', token.split('_', 1)[0]).group()
table_columns = hypoid_table_column.get(hypo_index_id)
if not table_columns:
continue
table_name, columns = table_columns.split(':')
if table_name not in valid_indexes.keys():
valid_indexes[table_name] = []
if columns not in valid_indexes[table_name]:
valid_indexes[table_name].append(columns)
return valid_indexes
@ -373,13 +651,13 @@ def get_indexable_columns(table_index_dict):
return query_indexable_columns
def generate_candidate_indexes(workload, iterate=False):
def generate_candidate_indexes(workload, workload_table_name):
candidate_indexes = []
index_dict = {}
for k, query in enumerate(workload):
table_index_dict = query_index_advisor(query.statement)
if iterate:
if 'select ' in query.statement.lower():
table_index_dict = query_index_advisor(query.statement, workload_table_name)
need_check = False
query_indexable_columns = get_indexable_columns(table_index_dict)
valid_index_dict = query_index_check(query.statement, query_indexable_columns)
@ -397,56 +675,24 @@ def generate_candidate_indexes(workload, iterate=False):
need_check = False
else:
break
else:
valid_index_dict = query_index_check(query.statement, table_index_dict)
# filter duplicate indexes
for table in valid_index_dict.keys():
if table not in index_dict.keys():
index_dict[table] = set()
for columns in valid_index_dict[table]:
if len(workload[k].valid_index_list) >= FULL_ARRANGEMENT_THRESHOLD:
break
workload[k].valid_index_list.append(IndexItem(table, columns))
if columns not in index_dict[table]:
print("table: ", table, "columns: ", columns)
index_dict[table].add(columns)
candidate_indexes.append(IndexItem(table, columns))
return candidate_indexes
def generate_candidate_indexes_file(workload):
candidate_indexes = []
index_dict = {}
sql_file = str(time.time()) + '.sql'
if len(workload) > 0:
run_shell_cmd([workload[0].statement])
with open(sql_file, 'w') as file:
if SCHEMA:
file.write('SET current_schema = %s;\n' % SCHEMA)
for query in workload:
if 'select' in query.statement.lower():
file.write(make_single_advisor_sql(query.statement) + '\n')
result = run_shell_sql_cmd(sql_file).split('\n')
if os.path.exists(sql_file):
os.remove(sql_file)
for line in result:
table_index_dict = parse_single_advisor_result(line.strip('\n'))
# filter duplicate indexes
for table, columns in table_index_dict.items():
if table not in index_dict.keys():
index_dict[table] = set()
for column in columns:
if column == "":
continue
if column not in index_dict[table] and not re.match(r'\s*,\s*$', column):
print("table: ", table, "columns: ", column)
index_dict[table].add(column)
candidate_indexes.append(IndexItem(table, column))
# filter duplicate indexes
for table in valid_index_dict.keys():
if table not in index_dict.keys():
index_dict[table] = {}
for columns in valid_index_dict[table]:
if len(workload[k].valid_index_list) >= FULL_ARRANGEMENT_THRESHOLD:
break
workload[k].valid_index_list.append(IndexItem(table, columns))
if not any(re.match(r'%s' % columns, item) for item in index_dict[table]):
column_sql = {columns: [k]}
index_dict[table].update(column_sql)
elif columns in index_dict[table].keys():
index_dict[table][columns].append(k)
for table, column_sqls in index_dict.items():
for column, sql in column_sqls.items():
print("table: ", table, "columns: ", column)
candidate_indexes.append(IndexItem(table, column, sql))
return candidate_indexes
@ -517,6 +763,10 @@ def find_subsets_num(config, atomic_config_total):
for i, atomic_config in enumerate(atomic_config_total):
if len(atomic_config) > len(config):
continue
# Record the atomic index position of the newly added index
if len(atomic_config) == 1 and atomic_config[0].table == config[-1].table and \
atomic_config[0].columns == config[-1].columns:
cur_index_atomic_pos = i
for atomic_index in atomic_config:
is_exist = False
for index in config:
@ -529,7 +779,7 @@ def find_subsets_num(config, atomic_config_total):
if is_exist:
atomic_subsets_num.append(i)
return atomic_subsets_num
return atomic_subsets_num, cur_index_atomic_pos
def get_index_num(index, atomic_config_total):
@ -541,14 +791,39 @@ def get_index_num(index, atomic_config_total):
return -1
def record_ineffective_negative_sql(candidate_index, obj, ind):
cur_table = candidate_index.table + ' '
if cur_table in obj.statement:
candidate_index.total_sql_num += obj.frequency
if 'insert ' in obj.statement.lower():
candidate_index.insert_sql_num += obj.frequency
candidate_index.negative_pos.append(ind)
elif 'delete ' in obj.statement.lower():
candidate_index.delete_sql_num += obj.frequency
candidate_index.negative_pos.append(ind)
elif 'update ' in obj.statement.lower():
candidate_index.update_sql_num += obj.frequency
if any(column in obj.statement.lower().split('where ', 1)[0] for column in
candidate_index.columns.split(',')):
candidate_index.negative_pos.append(ind)
else:
candidate_index.ineffective_pos.append(ind)
else:
candidate_index.select_sql_num += obj.frequency
if ind not in candidate_index.positive_pos and \
any(column in obj.statement.lower() for column in candidate_index.columns.split(',')):
candidate_index.ineffective_pos.append(ind)
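So, for a candidate index on store(s_id) (illustrative names), an INSERT or DELETE against store counts as negative, an UPDATE counts as negative only when an indexed column appears before the WHERE clause (i.e. is being rewritten) and is otherwise merely ineffective, and a SELECT that references an indexed column but gained no cost benefit from the index is recorded as ineffective.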
# infer the total cost of workload for a config according to the cost of atomic configs
def infer_workload_cost(workload, config, atomic_config_total):
total_cost = 0
atomic_subsets_num = find_subsets_num(config, atomic_config_total)
is_computed = False
atomic_subsets_num, cur_index_atomic_pos = find_subsets_num(config, atomic_config_total)
if len(atomic_subsets_num) == 0:
raise ValueError("No atomic configs found for current config!")
if not config[-1].total_sql_num:
is_computed = True
for ind, obj in enumerate(workload):
if max(atomic_subsets_num) >= len(obj.cost_list):
raise ValueError("Wrong atomic config for current query!")
@ -559,63 +834,39 @@ def infer_workload_cost(workload, config, atomic_config_total):
min_cost = obj.cost_list[num]
total_cost += min_cost
# compute the cost for updating indexes
if 'insert' in obj.statement.lower() or 'delete' in obj.statement.lower():
for index in config:
index_num = get_index_num(index, atomic_config_total)
if index_num == -1:
raise ValueError("The index isn't found for current query!")
if 0 <= index_num < len(workload[ind].cost_list):
total_cost += obj.cost_list[index_num] - obj.cost_list[0]
return total_cost
# record ineffective sql and negative sql for candidate indexes
if is_computed:
record_ineffective_negative_sql(config[-1], obj, ind)
return total_cost, cur_index_atomic_pos
def simple_index_advisor(input_path, max_index_num):
workload = workload_compression(input_path)
workload, workload_count = workload_compression(input_path)
print_header_boundary(" Generate candidate indexes ")
candidate_indexes = generate_candidate_indexes_file(workload)
ori_indexes_name = set()
workload_table_name = dict()
display_info = {'workloadCount': workload_count, 'recommendIndexes': []}
candidate_indexes = generate_candidate_indexes(workload, workload_table_name)
if len(candidate_indexes) == 0:
print("No candidate indexes generated!")
return
estimate_workload_cost_file(workload, ori_indexes_name=ori_indexes_name)
return ori_indexes_name, workload_table_name, display_info
print_header_boundary(" Determine optimal indexes ")
ori_total_cost = estimate_workload_cost_file(workload)
ori_total_cost = estimate_workload_cost_file(workload, ori_indexes_name=ori_indexes_name)
index_cost_total = [ori_total_cost]
for _, obj in enumerate(candidate_indexes):
new_total_cost = estimate_workload_cost_file(workload, [obj])
index_cost_total.append(new_total_cost)
obj.benefit = ori_total_cost - new_total_cost
candidate_indexes = sorted(candidate_indexes, key=lambda item: item.benefit, reverse=True)
candidate_indexes = sorted(enumerate(candidate_indexes),
key=lambda item: item[1].benefit, reverse=True)
candidate_indexes = [item for item in candidate_indexes if item[1].benefit > 0]
global MAX_INDEX_NUM
MAX_INDEX_NUM = max_index_num
# filter out duplicate index
final_index_set = {}
final_index_list = []
for index in candidate_indexes:
picked = True
cols = set(index.columns.split(','))
if index.table not in final_index_set.keys():
final_index_set[index.table] = []
for i in range(len(final_index_set[index.table]) - 1, -1, -1):
pre_index = final_index_set[index.table][i]
pre_cols = set(pre_index.columns.split(','))
if len(pre_cols.difference(cols)) == 0 and len(pre_cols) < len(cols):
final_index_set[index.table].pop(i)
if len(cols.difference(pre_cols)) == 0:
picked = False
break
if picked and index.benefit > 0:
final_index_set[index.table].append(index)
[final_index_list.extend(item) for item in list(final_index_set.values())]
final_index_list = sorted(final_index_list, key=lambda item: item.benefit, reverse=True)
cnt = 0
index_current_storage = 0
for index in final_index_list:
if max_index_num and cnt == max_index_num:
break
if MAX_INDEX_STORAGE and (index_current_storage + index.storage) > MAX_INDEX_STORAGE:
continue
index_current_storage += index.storage
print("create index ind" + str(cnt) + " on " + index.table + "(" + index.columns + ");")
cnt += 1
display_recommend_result(workload, candidate_indexes, index_cost_total, False, display_info)
return ori_indexes_name, workload_table_name, display_info
def greedy_determine_opt_config(workload, atomic_config_total, candidate_indexes):
@ -633,10 +884,12 @@ def greedy_determine_opt_config(workload, atomic_config_total, candidate_indexes
continue
cur_config = copy.copy(opt_config)
cur_config.append(index)
cur_estimated_cost = infer_workload_cost(workload, cur_config, atomic_config_total)
cur_estimated_cost, cur_index_atomic_pos = \
infer_workload_cost(workload, cur_config, atomic_config_total)
if cur_estimated_cost < cur_min_cost:
cur_min_cost = cur_estimated_cost
cur_index = index
cur_index.atomic_pos = cur_index_atomic_pos
cur_index_num = k
if cur_index and cur_min_cost < min_cost:
if MAX_INDEX_STORAGE and sum([obj.storage for obj in opt_config]) + \
@ -654,33 +907,30 @@ def greedy_determine_opt_config(workload, atomic_config_total, candidate_indexes
def complex_index_advisor(input_path):
workload = workload_compression(input_path)
workload, workload_count = workload_compression(input_path)
print_header_boundary(" Generate candidate indexes ")
candidate_indexes = generate_candidate_indexes(workload, True)
ori_indexes_name = set()
workload_table_name = dict()
display_info = {'workloadCount': workload_count, 'recommendIndexes': []}
candidate_indexes = generate_candidate_indexes(workload, workload_table_name)
if len(candidate_indexes) == 0:
print("No candidate indexes generated!")
return
estimate_workload_cost_file(workload, ori_indexes_name=ori_indexes_name)
return ori_indexes_name, workload_table_name, display_info
print_header_boundary(" Determine optimal indexes ")
atomic_config_total = generate_atomic_config(workload)
if len(atomic_config_total[0]) != 0:
if atomic_config_total and len(atomic_config_total[0]) != 0:
raise ValueError("The empty atomic config isn't generated!")
index_cost_total = []
for atomic_config in atomic_config_total:
estimate_workload_cost_file(workload, atomic_config)
index_cost_total.append(estimate_workload_cost_file(workload, atomic_config,
ori_indexes_name))
opt_config = greedy_determine_opt_config(workload, atomic_config_total, candidate_indexes)
cnt = 0
index_current_storage = 0
for index in opt_config:
if MAX_INDEX_STORAGE and (index_current_storage + index.storage) > MAX_INDEX_STORAGE:
continue
if cnt == MAX_INDEX_NUM:
break
index_current_storage += index.storage
print("create index ind" + str(cnt) + " on " + index.table + "(" + index.columns + ");")
cnt += 1
display_recommend_result(workload, opt_config, index_cost_total, True, display_info)
return ori_indexes_name, workload_table_name, display_info
def main():
@ -691,7 +941,7 @@ def main():
arg_parser.add_argument("-U", help="Username for database log-in")
arg_parser.add_argument("-W", help="Password for database user", nargs="?", action=PwdAction)
arg_parser.add_argument("f", help="File containing workload queries (One query per line)")
arg_parser.add_argument("--schema", help="Schema name for the current business data")
arg_parser.add_argument("--schema", help="Schema name for the current business data", required=True)
arg_parser.add_argument("--max_index_num", help="Maximum number of suggested indexes", type=int)
arg_parser.add_argument("--max_index_storage",
help="Maximum storage of suggested indexes/MB", type=int)
@ -699,9 +949,13 @@ def main():
help="Whether to use multi-iteration algorithm", default=False)
arg_parser.add_argument("--multi_node", action='store_true',
help="Whether to support distributed scenarios", default=False)
arg_parser.add_argument("--json", action='store_true',
help="Whether the workload file format is json", default=False)
arg_parser.add_argument("--show_detail", action='store_true',
help="Whether to show detailed sql information", default=False)
args = arg_parser.parse_args()
global MAX_INDEX_NUM, BASE_CMD, ENABLE_MULTI_NODE, MAX_INDEX_STORAGE, SCHEMA
global MAX_INDEX_NUM, BASE_CMD, ENABLE_MULTI_NODE, MAX_INDEX_STORAGE, SCHEMA, JSON_TYPE
if args.max_index_num is not None and args.max_index_num <= 0:
raise argparse.ArgumentTypeError("%s is an invalid positive int value" %
args.max_index_num)
@ -710,6 +964,7 @@ def main():
args.max_index_storage)
if args.schema:
SCHEMA = args.schema
JSON_TYPE = args.json
MAX_INDEX_NUM = args.max_index_num or 10
ENABLE_MULTI_NODE = args.multi_node
MAX_INDEX_STORAGE = args.max_index_storage
@ -725,10 +980,19 @@ def main():
BASE_CMD += ' -W ' + args.W
if args.multi_iter_mode:
complex_index_advisor(args.f)
workload_indexes, tables, detail_info = complex_index_advisor(args.f)
else:
simple_index_advisor(args.f, args.max_index_num)
workload_indexes, tables, detail_info = simple_index_advisor(args.f, args.max_index_num)
whole_indexes, redundant_indexes = get_whole_index(tables, detail_info)
# check the current workload for unused indexes against the full set of existing indexes
check_unused_index_workload(whole_indexes, redundant_indexes, workload_indexes, detail_info)
if args.show_detail:
print_header_boundary(" Display detail information ")
sql_info = json.dumps(detail_info, indent=4, separators=(',', ':'))
print(sql_info)
if __name__ == '__main__':
main()

View File

@ -0,0 +1,951 @@
import os
import sys
import argparse
import copy
import getpass
import random
import re
import select
import logging
import psycopg2
import json
ENABLE_MULTI_NODE = False
SAMPLE_NUM = 5
MAX_INDEX_COLUMN_NUM = 5
MAX_INDEX_NUM = 10
MAX_INDEX_STORAGE = None
FULL_ARRANGEMENT_THRESHOLD = 20
NEGATIVE_RATIO_THRESHOLD = 0.2
BASE_CMD = ''
SHARP = '#'
SCHEMA = None
JSON_TYPE = False
BLANK = ' '
SQL_TYPE = ['select', 'delete', 'insert', 'update']
SQL_PATTERN = [r'\((\s*(\d+(\.\d+)?\s*)[,]?)+\)', # match integer set in the IN collection
r'([^\\])\'((\')|(.*?([^\\])\'))', # match all content in single quotes
r'(([^<>]\s*=\s*)|([^<>]\s+))(\d+)(\.\d+)?'] # match single integer
SQL_DISPLAY_PATTERN = [r'\((\s*(\d+(\.\d+)?\s*)[,]?)+\)', # match integer set in the IN collection
r'\'((\')|(.*?\'))', # match all content in single quotes
r'([^\_\d])\d+(\.\d+)?'] # match single integer
logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')
def read_input_from_pipe():
"""
Read piped stdin input (e.g. echo 'str1 str2' | python xx.py) and
return it as a string
"""
input_str = ""
r_handle, _, _ = select.select([sys.stdin], [], [], 0)
if not r_handle:
return ""
for item in r_handle:
if item == sys.stdin:
input_str = sys.stdin.read().strip()
return input_str
class PwdAction(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
password = read_input_from_pipe()
if password:
logging.warning("Read password from pipe.")
else:
password = getpass.getpass("Password for database user:")
setattr(namespace, self.dest, password)
class QueryItem:
def __init__(self, sql, freq):
self.statement = sql
self.frequency = freq
self.valid_index_list = []
self.cost_list = []
class IndexItem:
def __init__(self, tbl, cols, positive_pos=None):
self.table = tbl
self.columns = cols
self.atomic_pos = 0
self.benefit = 0
self.storage = 0
self.positive_pos = positive_pos
self.ineffective_pos = []
self.negative_pos = []
self.total_sql_num = 0
self.insert_sql_num = 0
self.update_sql_num = 0
self.delete_sql_num = 0
self.select_sql_num = 0
class IndexInfo:
def __init__(self, schema, table, indexname, columns, indexdef):
self.schema = schema
self.table = table
self.indexname = indexname
self.columns = columns
self.indexdef = indexdef
self.primary_key = False
self.redundant_obj = []
class DatabaseConn:
def __init__(self, dbname, user, password, host, port):
self.dbname = dbname
self.user = user
self.password = password
self.host = host
self.port = port
self.conn = None
self.cur = None
def init_conn_handle(self):
self.conn = psycopg2.connect(dbname=self.dbname,
user=self.user,
password=self.password,
host=self.host,
port=self.port)
self.cur = self.conn.cursor()
def execute(self, sql):
try:
self.cur.execute(sql)
self.conn.commit()
return self.cur.fetchall()
except Exception:
self.conn.commit()
def close_conn(self):
if self.conn and self.cur:
self.cur.close()
self.conn.close()
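
A minimal usage sketch for this helper (the connection parameters are placeholders):

# sketch only: open a connection, run one statement, close
db = DatabaseConn(dbname='postgres', user='omm', password='***',
                  host='127.0.0.1', port=5432)
db.init_conn_handle()
rows = db.execute('SELECT 1;')    # returns cur.fetchall(), or None if the statement fails
db.close_conn()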
def green(text):
return '\033[32m%s\033[0m' % text
def print_header_boundary(header):
# Print a decorated header line so the section stands out.
try:
term_width = os.get_terminal_size().columns
# The width of each of the two sides of the terminal.
side_width = (term_width - len(header)) // 2
except (AttributeError, OSError):
side_width = 0
title = SHARP * side_width + header + SHARP * side_width
print(green(title))
def filter_low_benefit(pos_list, candidate_indexes, multi_iter_mode, workload):
remove_list = []
for key, index in enumerate(candidate_indexes):
sql_optimzed = 0
for ind, pos in enumerate(index.positive_pos):
if multi_iter_mode:
cost_list_pos = index.atomic_pos
else:
cost_list_pos = pos_list[key] + 1
sql_optimzed += 1 - workload[pos].cost_list[cost_list_pos] / workload[pos].cost_list[0]
negative_ratio = (index.insert_sql_num + index.delete_sql_num + index.update_sql_num) / \
index.total_sql_num
# filter the candidate indexes that do not meet the conditions of optimization
if sql_optimzed / len(index.positive_pos) < 0.1:
remove_list.append(key)
elif sql_optimzed / len(index.positive_pos) < NEGATIVE_RATIO_THRESHOLD < negative_ratio:
remove_list.append(key)
for item in sorted(remove_list, reverse=True):
candidate_indexes.pop(item)
def display_recommend_result(workload, candidate_indexes, index_cost_total,
multi_iter_mode, display_info):
cnt = 0
index_current_storage = 0
pos_list = []
if not multi_iter_mode:
pos_list = [item[0] for item in candidate_indexes]
candidate_indexes = [item[1] for item in candidate_indexes]
# filter candidate indexes with low benefit
filter_low_benefit(pos_list, candidate_indexes, multi_iter_mode, workload)
# display the determined result
for key, index in enumerate(candidate_indexes):
if MAX_INDEX_STORAGE and (index_current_storage + index.storage) > MAX_INDEX_STORAGE:
continue
if MAX_INDEX_NUM and cnt == MAX_INDEX_NUM:
break
index_current_storage += index.storage
table_name = index.table.split('.')[-1]
index_name = 'idx_' + table_name + '_' + '_'.join(index.columns.split(', '))
statement = 'CREATE INDEX ' + index_name + ' ON ' + index.table + '(' + index.columns + ');'
print(statement)
cnt += 1
if multi_iter_mode:
cost_list_pos = index.atomic_pos
else:
cost_list_pos = pos_list[key] + 1
sql_info = {'sqlDetails': []}
benefit_types = [index.ineffective_pos, index.positive_pos, index.negative_pos]
for category, benefit_type in enumerate(benefit_types):
sql_count = 0
for item in benefit_type:
sql_count += workload[item].frequency
for ind, pos in enumerate(benefit_type):
sql_detail = {}
sql_template = workload[pos].statement
for pattern in SQL_DISPLAY_PATTERN:
sql_template = re.sub(pattern, '?', sql_template)
sql_detail['sqlTemplate'] = sql_template
sql_detail['sql'] = workload[pos].statement
sql_detail['sqlCount'] = sql_count
if category == 1:
sql_optimzed = (workload[pos].cost_list[0] -
workload[pos].cost_list[cost_list_pos]) / \
workload[pos].cost_list[cost_list_pos]
sql_detail['optimized'] = '%.3f' % sql_optimzed
sql_detail['correlationType'] = category
sql_info['sqlDetails'].append(sql_detail)
workload_optimized = (1 - index_cost_total[cost_list_pos] / index_cost_total[0]) * 100
sql_info['workloadOptimized'] = '%.2f' % (workload_optimized if workload_optimized > 1 else 1)
sql_info['schemaName'] = SCHEMA
sql_info['tbName'] = table_name
sql_info['columns'] = index.columns
sql_info['statement'] = statement
sql_info['dmlCount'] = round(index.total_sql_num)
sql_info['selectRatio'] = round(index.select_sql_num * 100 / index.total_sql_num, 2)
sql_info['insertRatio'] = round(index.insert_sql_num * 100 / index.total_sql_num, 2)
sql_info['deleteRatio'] = round(index.delete_sql_num * 100 / index.total_sql_num, 2)
sql_info['updateRatio'] = round(100 - sql_info['selectRatio'] - sql_info['insertRatio']
- sql_info['deleteRatio'], 2)
display_info['recommendIndexes'].append(sql_info)
return display_info
def record_redundant_indexes(cur_table_indexes, redundant_indexes):
cur_table_indexes = sorted(cur_table_indexes,
key=lambda index_obj: len(index_obj.columns.split(',')))
# record redundant indexes
has_restore = []
for pos, index in enumerate(cur_table_indexes[:-1]):
is_redundant = False
for candidate_index in cur_table_indexes[pos + 1:]:
if 'UNIQUE INDEX' in index.indexdef:
# ensure that UNIQUE INDEX will not become redundant compared to normal index
if 'UNIQUE INDEX' not in candidate_index.indexdef:
continue
# ensure the primary key index itself is not reported as redundant
elif index.primary_key:
if re.match(r'%s' % candidate_index.columns, index.columns):
candidate_index.redundant_obj.append(index)
redundant_indexes.append(candidate_index)
has_restore.append(candidate_index)
continue
if re.match(r'%s' % index.columns, candidate_index.columns):
is_redundant = True
index.redundant_obj.append(candidate_index)
if is_redundant and index not in has_restore:
redundant_indexes.append(index)
def check_useless_index(tables, db):
whole_indexes = list()
redundant_indexes = list()
if not tables:
return whole_indexes, redundant_indexes
tables_string = ','.join(["'%s'" % table for table in tables[SCHEMA]])
sql = "SELECT c.relname AS tablename, i.relname AS indexname, " \
"pg_get_indexdef(i.oid) AS indexdef, p.contype AS pkey from " \
"pg_index x JOIN pg_class c ON c.oid = x.indrelid JOIN " \
"pg_class i ON i.oid = x.indexrelid LEFT JOIN pg_namespace n " \
"ON n.oid = c.relnamespace LEFT JOIN pg_constraint p ON i.oid = p.conindid" \
"WHERE (c.relkind = ANY (ARRAY['r'::\"char\", 'm'::\"char\"])) AND " \
"(i.relkind = ANY (ARRAY['i'::\"char\", 'I'::\"char\"])) AND " \
"n.nspname = '%s' AND c.relname in (%s) order by c.relname;" % (SCHEMA, tables_string)
res = db.execute(sql)
if res:
cur_table_indexes = list()
for item in res:
cur_columns = re.search(r'\(([^\(\)]*)\)', item[2]).group(1)
cur_index_obj = IndexInfo(SCHEMA, item[0], item[1], cur_columns, item[2])
if item[3]:
cur_index_obj.primary_key = True
whole_indexes.append(cur_index_obj)
if cur_table_indexes and cur_table_indexes[-1].table != item[0]:
record_redundant_indexes(cur_table_indexes, redundant_indexes)
cur_table_indexes = []
cur_table_indexes.append(cur_index_obj)
if cur_table_indexes:
record_redundant_indexes(cur_table_indexes, redundant_indexes)
return whole_indexes, redundant_indexes
def get_whole_index(tables, db, detail_info):
db.init_conn_handle()
whole_index, redundant_indexes = check_useless_index(tables, db)
db.close_conn()
print_header_boundary(" Created indexes ")
detail_info['createdIndexes'] = []
if not whole_index:
print("No created index!")
else:
for index in whole_index:
index_info = {'schemaName': index.schema, 'tbName': index.table,
'columns': index.columns, 'statement': index.indexdef + ';'}
detail_info['createdIndexes'].append(index_info)
print("%s;" % index.indexdef)
return whole_index, redundant_indexes
def check_unused_index_workload(whole_indexes, redundant_indexes, workload_indexes, detail_info):
indexes_name = set(index.indexname for index in whole_indexes)
unused_index = list(indexes_name.difference(workload_indexes))
remove_list = []
print_header_boundary(" Current workload useless indexes ")
if not unused_index:
print("No useless index!")
detail_info['uselessIndexes'] = []
# useless index
unused_index_columns = dict()
for cur_index in unused_index:
for index in whole_indexes:
if cur_index == index.indexname:
unused_index_columns[cur_index] = index.columns
if 'UNIQUE INDEX' not in index.indexdef:
statement = "DROP INDEX %s;" % index.indexname
print(statement)
useless_index = {"schemaName": index.schema, "tbName": index.table, "type": 1,
"columns": index.columns, "statement": statement}
detail_info['uselessIndexes'].append(useless_index)
print_header_boundary(" Redundant indexes ")
# filter redundant index
for pos, index in enumerate(redundant_indexes):
is_redundant = False
for redundant_obj in index.redundant_obj:
# the current index is only reported as redundant if at least one covering index
# is still used by the workload, or is unused but has exactly the same column list
index_exist = redundant_obj.indexname not in unused_index_columns.keys() or \
(unused_index_columns.get(redundant_obj.indexname) and
redundant_obj.columns == unused_index_columns[redundant_obj.indexname])
if index_exist:
is_redundant = True
if not is_redundant:
remove_list.append(pos)
for item in sorted(remove_list, reverse=True):
redundant_indexes.pop(item)
if not redundant_indexes:
print("No redundant index!")
# redundant index
for index in redundant_indexes:
statement = "DROP INDEX %s;" % index.indexname
print(statement)
existing_index = [item.indexname + ':' + item.columns for item in index.redundant_obj]
redundant_index = {"schemaName": index.schema, "tbName": index.table, "type": 2,
"columns": index.columns, "statement": statement, "existingIndex": existing_index}
detail_info['uselessIndexes'].append(redundant_index)
def load_workload(file_path):
wd_dict = {}
workload = []
global BLANK
with open(file_path, 'r') as file:
raw_text = ''.join(file.readlines())
sqls = raw_text.split(';')
for sql in sqls:
if any(tp in sql.lower() for tp in SQL_TYPE):
TWO_BLANKS = BLANK * 2
while TWO_BLANKS in sql:
sql = sql.replace(TWO_BLANKS, BLANK)
if sql not in wd_dict.keys():
wd_dict[sql] = 1
else:
wd_dict[sql] += 1
for sql, freq in wd_dict.items():
workload.append(QueryItem(sql, freq))
return workload
def get_workload_template(workload):
templates = {}
placeholder = r'@@@'
for item in workload:
sql_template = item.statement
for pattern in SQL_PATTERN:
sql_template = re.sub(pattern, placeholder, sql_template)
if sql_template not in templates:
templates[sql_template] = {}
templates[sql_template]['cnt'] = 0
templates[sql_template]['samples'] = []
templates[sql_template]['cnt'] += item.frequency
# reservoir sampling
if len(templates[sql_template]['samples']) < SAMPLE_NUM:
templates[sql_template]['samples'].append(item.statement)
else:
if random.randint(0, templates[sql_template]['cnt']) < SAMPLE_NUM:
templates[sql_template]['samples'][random.randint(0, SAMPLE_NUM - 1)] = \
item.statement
return templates
def workload_compression(input_path):
total_num = 0
compressed_workload = []
if JSON_TYPE:
with open(input_path, 'r') as file:
templates = json.load(file)
else:
workload = load_workload(input_path)
templates = get_workload_template(workload)
for _, elem in templates.items():
for sql in elem['samples']:
compressed_workload.append(QueryItem(sql.strip('\n'),
elem['cnt'] / len(elem['samples'])))
total_num += elem['cnt']
return compressed_workload, total_num
# parse the explain plan to get estimated cost by database optimizer
def parse_explain_plan(plan, index_config, ori_indexes_name):
cost_total = -1
cost_flag = True
for item in plan:
if '(cost=' in item[0] and cost_flag:
cost_flag = False
pattern = re.compile(r'\(cost=([^\)]*)\)', re.S)
matched_res = re.search(pattern, item[0])
if matched_res:
cost_list = matched_res.group(1).split()
if len(cost_list) == 3:
cost_total = float(cost_list[0].split('..')[-1])
if 'Index' in item[0] and 'Scan' in item[0] and not index_config:
ind1, ind2 = re.search(r'Index.*Scan(.*)on ([^\s]+)',
item[0].strip(), re.IGNORECASE).groups()
if ind1.strip():
ori_indexes_name.add(ind1.strip().split(' ')[1])
else:
ori_indexes_name.add(ind2)
break
return cost_total
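
For a plan line such as 'Seq Scan on bmsql_item  (cost=0.00..431.00 rows=10000 width=8)' (illustrative), the regex captures '0.00..431.00 rows=10000 width=8'; the three space-separated fields pass the length check and the total cost becomes 431.00. When no hypothetical index config is active, index scan lines additionally contribute the scanned index name (or, if it cannot be parsed out, the relation name) to ori_indexes_name.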
def update_index_storage(index_id, index_config, hypo_index_num, db):
index_size_sql = 'select * from hypopg_estimate_size(%s);' % index_id
res = db.execute(index_size_sql)
if res:
index_config[hypo_index_num].storage = float(res[0][0]) / 1024 / 1024
def estimate_workload_cost_file(workload, db, index_config=None, ori_indexes_name=None):
total_cost = 0
hypo_index_num = 0
is_computed = False
db.execute('SET current_schema = %s' % SCHEMA)
if index_config:
if len(index_config) == 1 and index_config[0].positive_pos:
is_computed = True
# create hypo-indexes
db.execute('SET enable_hypo_index = on')
for index in index_config:
res = db.execute("SELECT * from hypopg_create_index('CREATE INDEX ON %s(%s)')" %
(index.table, index.columns))
if MAX_INDEX_STORAGE and res:
update_index_storage(res[0][0], index_config, hypo_index_num, db)
hypo_index_num += 1
if ENABLE_MULTI_NODE:
db.execute('set enable_fast_query_shipping = off;set enable_stream_operator = on')
db.execute("set explain_perf_mode = 'normal'")
remove_list = []
for ind, query in enumerate(workload):
# record ineffective sql and negative sql for candidate indexes
if is_computed:
record_ineffective_negative_sql(index_config[0], query, ind)
if 'select ' not in query.statement.lower():
workload[ind].cost_list.append(0)
else:
res = db.execute('EXPLAIN ' + query.statement)
if res:
query_cost = parse_explain_plan(res, index_config, ori_indexes_name)
query_cost *= workload[ind].frequency
workload[ind].cost_list.append(query_cost)
total_cost += query_cost
if index_config:
db.execute('SELECT hypopg_reset_index()')
return total_cost
def make_single_advisor_sql(ori_sql):
sql = 'set current_schema = %s; select gs_index_advise(\'' % SCHEMA
ori_sql = ori_sql.replace('"', '\'')
for elem in ori_sql:
if elem == '\'':
sql += '\''
sql += elem
sql += '\');'
return sql
def parse_single_advisor_result(res, workload_table_name):
table_index_dict = {}
items = res.strip('()').split(',', 1)
if len(items) == 2:
table = items[0]
workload_table_name[SCHEMA] = workload_table_name.get(SCHEMA, set())
workload_table_name[SCHEMA].add(table)
indexes = re.split('[()]', items[1].strip('\"'))
for columns in indexes:
if columns == '':
continue
if table not in table_index_dict.keys():
table_index_dict[table] = []
table_index_dict[table].append(columns)
return table_index_dict
# call the single-index-advisor in the database
def query_index_advisor(query, workload_table_name, db):
table_index_dict = {}
if 'select' not in query.lower():
return table_index_dict
sql = make_single_advisor_sql(query)
result = db.execute(sql=sql)
if not result:
return table_index_dict
for res in result:
table_index_dict.update(parse_single_advisor_result(res[0], workload_table_name))
return table_index_dict
# judge whether the index is used by the optimizer
def query_index_check(query, query_index_dict, db):
valid_indexes = {}
if len(query_index_dict) == 0:
return valid_indexes
# create hypo-indexes
sqls = 'SET enable_hypo_index = on;'
if ENABLE_MULTI_NODE:
sqls += 'SET enable_fast_query_shipping = off;SET enable_stream_operator = on;'
for table in query_index_dict.keys():
for columns in query_index_dict[table]:
if columns != '':
sqls += "SELECT hypopg_create_index('CREATE INDEX ON %s(%s)');" % \
(table, columns)
sqls += 'SELECT * from hypopg_display_index();'
result = db.execute(sqls)
if not result:
return valid_indexes
hypoid_table_column = {}
for item in result:
if len(item) == 4:
hypoid_table_column[str(item[1])] = item[2] + ':' + item[3].strip('()')
sqls = "SET explain_perf_mode = 'normal'; explain %s" % query
result = db.execute(sqls)
if not result:
return valid_indexes
# parse the result of explain plan
for item in result:
if 'Index' in item[0] and 'Scan' in item[0] and 'btree' in item[0]:
tokens = item[0].split(' ')
for token in tokens:
if 'btree' in token:
hypo_index_id = re.search(r'\d+', token.split('_', 1)[0]).group()
table_columns = hypoid_table_column.get(hypo_index_id)
if not table_columns:
continue
table_name, columns = table_columns.split(':')
if table_name not in valid_indexes.keys():
valid_indexes[table_name] = []
if columns not in valid_indexes[table_name]:
valid_indexes[table_name].append(columns)
db.execute('SELECT hypopg_reset_index()')
return valid_indexes
# enumerate the column combinations for a suggested index
def get_indexable_columns(table_index_dict):
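    # Flatten the advisor's multi-column suggestions into single columns per table;
    # multi-column combinations are rebuilt later, one column at a time.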
query_indexable_columns = {}
if len(table_index_dict) == 0:
return query_indexable_columns
for table in table_index_dict.keys():
query_indexable_columns[table] = []
for columns in table_index_dict[table]:
indexable_columns = columns.split(',')
for column in indexable_columns:
query_indexable_columns[table].append(column)
return query_indexable_columns
def generate_candidate_indexes(workload, workload_table_name, db):
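    # For every SELECT in the workload: ask the in-database single-query advisor for
    # indexable columns, grow them into multi-column combinations (up to
    # MAX_INDEX_COLUMN_NUM) that the optimizer still uses, then deduplicate the
    # surviving indexes across the whole workload.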
candidate_indexes = []
index_dict = {}
db.init_conn_handle()
for k, query in enumerate(workload):
if 'select ' in query.statement.lower():
table_index_dict = query_index_advisor(query.statement, workload_table_name, db)
need_check = False
query_indexable_columns = get_indexable_columns(table_index_dict)
valid_index_dict = query_index_check(query.statement, query_indexable_columns, db)
for i in range(MAX_INDEX_COLUMN_NUM):
for table in valid_index_dict.keys():
for columns in valid_index_dict[table]:
if columns.count(',') == i:
need_check = True
for single_column in query_indexable_columns[table]:
if single_column not in columns:
valid_index_dict[table].append(columns + ',' + single_column)
if need_check:
valid_index_dict = query_index_check(query.statement, valid_index_dict, db)
need_check = False
else:
break
# filter duplicate indexes
for table in valid_index_dict.keys():
if table not in index_dict.keys():
index_dict[table] = {}
for columns in valid_index_dict[table]:
if len(workload[k].valid_index_list) >= FULL_ARRANGEMENT_THRESHOLD:
break
workload[k].valid_index_list.append(IndexItem(table, columns))
if not any(re.match(r'%s' % columns, item) for item in index_dict[table]):
column_sql = {columns: [k]}
index_dict[table].update(column_sql)
elif columns in index_dict[table].keys():
index_dict[table][columns].append(k)
for table, column_sqls in index_dict.items():
for column, sql in column_sqls.items():
print("table: ", table, "columns: ", column)
candidate_indexes.append(IndexItem(table, column, sql))
db.close_conn()
return candidate_indexes
def get_atomic_config_for_query(indexes, config, ind, atomic_configs):
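    # Recursively enumerate subsets of the query's valid indexes; a subset is kept
    # as an atomic config only if it covers at most two tables and at most two
    # indexes per table.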
if ind == len(indexes):
table_count = {}
for index in config:
if index.table not in table_count.keys():
table_count[index.table] = 1
else:
table_count[index.table] += 1
if len(table_count) > 2 or table_count[index.table] > 2:
return
atomic_configs.append(config)
return
get_atomic_config_for_query(indexes, copy.copy(config), ind + 1, atomic_configs)
config.append(indexes[ind])
get_atomic_config_for_query(indexes, copy.copy(config), ind + 1, atomic_configs)
def is_same_config(config1, config2):
if len(config1) != len(config2):
return False
for index1 in config1:
is_found = False
for index2 in config2:
if index1.table == index2.table and index1.columns == index2.columns:
is_found = True
if not is_found:
return False
return True
def generate_atomic_config(workload):
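    # Merge the atomic configs of all queries into one duplicate-free list; the
    # empty config (no indexes) is generated first and serves as the baseline.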
atomic_config_total = []
for query in workload:
if len(query.valid_index_list) == 0:
continue
atomic_configs = []
config = []
get_atomic_config_for_query(query.valid_index_list, config, 0, atomic_configs)
is_found = False
for new_config in atomic_configs:
for exist_config in atomic_config_total:
if is_same_config(new_config, exist_config):
is_found = True
break
if not is_found:
atomic_config_total.append(new_config)
is_found = False
return atomic_config_total
# find the subsets of a given config in the atomic configs
def find_subsets_num(config, atomic_config_total):
atomic_subsets_num = []
is_exist = False
for i, atomic_config in enumerate(atomic_config_total):
if len(atomic_config) > len(config):
continue
# Record the atomic index position of the newly added index
if len(atomic_config) == 1 and atomic_config[0].table == config[-1].table and \
atomic_config[0].columns == config[-1].columns:
cur_index_atomic_pos = i
for atomic_index in atomic_config:
is_exist = False
for index in config:
if atomic_index.table == index.table and atomic_index.columns == index.columns:
index.storage = atomic_index.storage
is_exist = True
break
if not is_exist:
break
if is_exist:
atomic_subsets_num.append(i)
return atomic_subsets_num, cur_index_atomic_pos
def get_index_num(index, atomic_config_total):
for i, atomic_config in enumerate(atomic_config_total):
if len(atomic_config) == 1 and atomic_config[0].table == index.table and \
atomic_config[0].columns == index.columns:
return i
return -1
def record_ineffective_negative_sql(candidate_index, obj, ind):
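    # Classify each statement that references the candidate's table: INSERT and
    # DELETE are negative, an UPDATE is negative only if it modifies an indexed
    # column (otherwise ineffective), and a SELECT that mentions an indexed column
    # but is not already a positive hit is recorded as ineffective.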
cur_table = candidate_index.table + ' '
if cur_table in obj.statement:
candidate_index.total_sql_num += obj.frequency
if 'insert ' in obj.statement.lower():
candidate_index.insert_sql_num += obj.frequency
candidate_index.negative_pos.append(ind)
elif 'delete ' in obj.statement.lower():
candidate_index.delete_sql_num += obj.frequency
candidate_index.negative_pos.append(ind)
elif 'update ' in obj.statement.lower():
candidate_index.update_sql_num += obj.frequency
if any(column in obj.statement.lower().split('where ', 1)[0] for column in
candidate_index.columns.split(',')):
candidate_index.negative_pos.append(ind)
else:
candidate_index.ineffective_pos.append(ind)
else:
candidate_index.select_sql_num += obj.frequency
if ind not in candidate_index.positive_pos and \
                    any(column in obj.statement.lower() for column in
                        candidate_index.columns.split(',')):
candidate_index.ineffective_pos.append(ind)
# infer the total cost of workload for a config according to the cost of atomic configs
def infer_workload_cost(workload, config, atomic_config_total):
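    # For every query, the cost under the full config is approximated by the
    # cheapest cost among the atomic configs that are subsets of this config.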
total_cost = 0
is_computed = False
atomic_subsets_num, cur_index_atomic_pos = find_subsets_num(config, atomic_config_total)
if len(atomic_subsets_num) == 0:
raise ValueError("No atomic configs found for current config!")
if not config[-1].total_sql_num:
is_computed = True
for ind, obj in enumerate(workload):
if max(atomic_subsets_num) >= len(obj.cost_list):
raise ValueError("Wrong atomic config for current query!")
# compute the cost for selection
min_cost = obj.cost_list[0]
for num in atomic_subsets_num:
if num < len(obj.cost_list) and obj.cost_list[num] < min_cost:
min_cost = obj.cost_list[num]
total_cost += min_cost
# record ineffective sql and negative sql for candidate indexes
if is_computed:
record_ineffective_negative_sql(config[-1], obj, ind)
return total_cost, cur_index_atomic_pos
def simple_index_advisor(input_path, max_index_num, db):
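    # Single-iteration mode: cost each candidate index in isolation against the
    # whole workload and recommend the ones with a positive benefit.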
workload, workload_count = workload_compression(input_path)
print_header_boundary(" Generate candidate indexes ")
ori_indexes_name = set()
workload_table_name = dict()
display_info = {'workloadCount': workload_count, 'recommendIndexes': []}
candidate_indexes = generate_candidate_indexes(workload, workload_table_name, db)
db.init_conn_handle()
if len(candidate_indexes) == 0:
print("No candidate indexes generated!")
estimate_workload_cost_file(workload, db, ori_indexes_name=ori_indexes_name)
return ori_indexes_name, workload_table_name, display_info
print_header_boundary(" Determine optimal indexes ")
ori_total_cost = estimate_workload_cost_file(workload, db, ori_indexes_name=ori_indexes_name)
index_cost_total = [ori_total_cost]
for _, obj in enumerate(candidate_indexes):
new_total_cost = estimate_workload_cost_file(workload, db, [obj])
index_cost_total.append(new_total_cost)
obj.benefit = ori_total_cost - new_total_cost
db.close_conn()
candidate_indexes = sorted(enumerate(candidate_indexes),
key=lambda item: item[1].benefit, reverse=True)
candidate_indexes = [item for item in candidate_indexes if item[1].benefit > 0]
global MAX_INDEX_NUM
MAX_INDEX_NUM = max_index_num
display_recommend_result(workload, candidate_indexes, index_cost_total, False, display_info)
return ori_indexes_name, workload_table_name, display_info
def greedy_determine_opt_config(workload, atomic_config_total, candidate_indexes, origin_sum_cost):
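    # Greedy selection: in each round add the single candidate index that lowers the
    # inferred workload cost the most, skip additions that would exceed
    # MAX_INDEX_STORAGE, and stop when no candidate helps or MAX_INDEX_NUM is reached.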
opt_config = []
index_num_record = set()
min_cost = origin_sum_cost
for i in range(len(candidate_indexes)):
if i == 1 and min_cost == origin_sum_cost:
break
cur_min_cost = origin_sum_cost
cur_index = None
cur_index_num = -1
for k, index in enumerate(candidate_indexes):
if k in index_num_record:
continue
cur_config = copy.copy(opt_config)
cur_config.append(index)
cur_estimated_cost, cur_index_atomic_pos = \
infer_workload_cost(workload, cur_config, atomic_config_total)
if cur_estimated_cost < cur_min_cost:
cur_min_cost = cur_estimated_cost
cur_index = index
cur_index.atomic_pos = cur_index_atomic_pos
cur_index_num = k
if cur_index and cur_min_cost < min_cost:
if MAX_INDEX_STORAGE and sum([obj.storage for obj in opt_config]) + \
cur_index.storage > MAX_INDEX_STORAGE:
continue
if len(opt_config) == MAX_INDEX_NUM:
break
min_cost = cur_min_cost
opt_config.append(cur_index)
index_num_record.add(cur_index_num)
else:
break
return opt_config
def complex_index_advisor(input_path, db):
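    # Multi-iteration mode: build atomic index configurations from the per-query
    # valid indexes, cost each atomic config once, then greedily assemble the final
    # index set from those costs.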
workload, workload_count = workload_compression(input_path)
print_header_boundary(" Generate candidate indexes ")
ori_indexes_name = set()
workload_table_name = dict()
display_info = {'workloadCount': workload_count, 'recommendIndexes': []}
candidate_indexes = generate_candidate_indexes(workload, workload_table_name, db)
db.init_conn_handle()
if len(candidate_indexes) == 0:
print("No candidate indexes generated!")
estimate_workload_cost_file(workload, db, ori_indexes_name=ori_indexes_name)
return ori_indexes_name, workload_table_name, display_info
print_header_boundary(" Determine optimal indexes ")
atomic_config_total = generate_atomic_config(workload)
if atomic_config_total and len(atomic_config_total[0]) != 0:
raise ValueError("The empty atomic config isn't generated!")
index_cost_total = []
for atomic_config in atomic_config_total:
index_cost_total.append(estimate_workload_cost_file(workload, db, atomic_config,
ori_indexes_name))
db.close_conn()
opt_config = greedy_determine_opt_config(workload, atomic_config_total,
candidate_indexes, index_cost_total[0])
display_recommend_result(workload, opt_config, index_cost_total, True, display_info)
return ori_indexes_name, workload_table_name, display_info
def main():
arg_parser = argparse.ArgumentParser(description='Generate index set for workload.')
arg_parser.add_argument("p", help="Port of database")
arg_parser.add_argument("d", help="Name of database")
arg_parser.add_argument("--h", help="Host for database")
arg_parser.add_argument("-U", help="Username for database log-in")
arg_parser.add_argument("-W", help="Password for database user", nargs="?", action=PwdAction)
arg_parser.add_argument("f", help="File containing workload queries (One query per line)")
arg_parser.add_argument("--schema", help="Schema name for the current business data", required=True)
arg_parser.add_argument("--max_index_num", help="Maximum number of suggested indexes", type=int)
arg_parser.add_argument("--max_index_storage",
help="Maximum storage of suggested indexes/MB", type=int)
arg_parser.add_argument("--multi_iter_mode", action='store_true',
help="Whether to use multi-iteration algorithm", default=False)
arg_parser.add_argument("--multi_node", action='store_true',
help="Whether to support distributed scenarios", default=False)
arg_parser.add_argument("--json", action='store_true',
help="Whether the workload file format is json", default=False)
arg_parser.add_argument("--show_detail", action='store_true',
help="Whether to show detailed sql information", default=False)
args = arg_parser.parse_args()
global MAX_INDEX_NUM, BASE_CMD, ENABLE_MULTI_NODE, MAX_INDEX_STORAGE, SCHEMA, JSON_TYPE
if args.max_index_num is not None and args.max_index_num <= 0:
raise argparse.ArgumentTypeError("%s is an invalid positive int value" %
args.max_index_num)
if args.max_index_storage is not None and args.max_index_storage <= 0:
raise argparse.ArgumentTypeError("%s is an invalid positive int value" %
args.max_index_storage)
SCHEMA = args.schema
JSON_TYPE = args.json
MAX_INDEX_NUM = args.max_index_num or 10
ENABLE_MULTI_NODE = args.multi_node
MAX_INDEX_STORAGE = args.max_index_storage
BASE_CMD = 'gsql -p ' + args.p + ' -d ' + args.d
if args.h:
BASE_CMD += ' -h ' + args.h
if args.U:
BASE_CMD += ' -U ' + args.U
if args.U != getpass.getuser() and not args.W:
raise ValueError('Enter the \'-W\' parameter for user '
+ args.U + ' when executing the script.')
if args.W:
BASE_CMD += ' -W ' + args.W
# Initialize the connection
db = DatabaseConn(args.d, args.U, args.W, args.h, args.p)
if args.multi_iter_mode:
workload_indexes, tables, detail_info = complex_index_advisor(args.f, db)
else:
workload_indexes, tables, detail_info = simple_index_advisor(args.f, args.max_index_num, db)
whole_indexes, redundant_indexes = get_whole_index(tables, db, detail_info)
# check the unused indexes of the current workload based on the whole index
check_unused_index_workload(whole_indexes, redundant_indexes, workload_indexes, detail_info)
if args.show_detail:
print_header_boundary(" Display detail information ")
sql_info = json.dumps(detail_info, indent=4, separators=(',', ':'))
print(sql_info)
if __name__ == '__main__':
main()

View File

@ -0,0 +1,373 @@
try:
import sys
import os
import argparse
import logging
import time
import signal
import re
import select
import urllib
import json
import datetime
from urllib.request import Request
from subprocess import Popen, PIPE
from threading import Thread, Event, Timer
from configparser import ConfigParser
from logging import handlers
except ImportError as err:
sys.exit("index_server.py: Failed to import module: %s." % str(err))
current_dirname = os.path.dirname(os.path.realpath(__file__))
__description__ = 'index advise: index server tool.'
class RepeatTimer(Thread):
"""
This class inherits from threading.Thread, it is used for periodic execution
function at a specified time interval.
"""
def __init__(self, interval, function, *args, **kwargs):
Thread.__init__(self)
self._interval = interval
self._function = function
self._args = args
self._kwargs = kwargs
self._finished = Event()
def run(self):
while not self._finished.is_set():
# Execute first, wait later.
self._function(*self._args, **self._kwargs)
self._finished.wait(self._interval)
self._finished.set()
def cancel(self):
self._finished.set()
class CreateLogger:
def __init__(self, level, log_name):
self.level = level
self.log_name = log_name
def create_log(self, log_path):
logger = logging.getLogger(self.log_name)
log_path = os.path.join(os.path.dirname(log_path), 'log')
if os.path.exists(log_path):
if os.path.isfile(log_path):
os.remove(log_path)
os.mkdir(log_path)
else:
os.makedirs(log_path)
agent_handler = handlers.RotatingFileHandler(filename=os.path.join(log_path, self.log_name),
maxBytes=1024 * 1024 * 100,
backupCount=5)
agent_handler.setFormatter(logging.Formatter(
"[%(asctime)s %(levelname)s]-[%(filename)s][%(lineno)d]: %(message)s"))
logger.addHandler(agent_handler)
logger.setLevel(getattr(logging, self.level.upper())
if hasattr(logging, self.level.upper()) else logging.INFO)
return logger
class IndexServer:
def __init__(self, pid_file, logger, password, **kwargs):
self.pid_file = pid_file
self.logger = logger
self.password = password
self._kwargs = kwargs
def check_proc_exist(self, proc_name):
"""
        Check whether a process with the given name is running.
        :param proc_name: process name
        :return: space-separated pids of matching processes, or '' if none
"""
check_proc = "ps ux | grep '%s' | grep -v grep | grep -v nohup | awk \'{print $2}\'" % proc_name
_, std = self.execute_cmd(check_proc)
current_pid = str(os.getpid())
pid_list = [pid for pid in std.split("\n") if pid and pid != current_pid]
if not pid_list:
return ""
return " ".join(pid_list)
def execute_cmd(self, cmd):
"""
        Execute a shell command.
        :param cmd: command string
        :return: (return code, decoded stdout)
"""
proc = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True)
std, err_msg = proc.communicate()
if proc.returncode != 0:
self.logger.error("Failed to execute command. Error: %s." % str(err_msg))
return proc.returncode, std.decode()
def save_recommendation_infos(self, recommendation_infos):
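        """Post the recommendation result as JSON to ai_monitor_url and return the decoded response."""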
headers = {'Content-Type': 'application/json'}
data = json.dumps(recommendation_infos, default=lambda o: o.__dict__, sort_keys=True,
indent=4).encode()
request = Request(url=self._kwargs['ai_monitor_url'], headers=headers,
data=data)
response = None
try:
response = urllib.request.urlopen(request, timeout=600)
result = json.loads(response.read())
finally:
if response:
response.close()
return result
def convert_output_to_recommendation_infos(self, sql_lines):
detail_info_pos = 0
index_info = sql_lines.splitlines()
for pos, line in enumerate(index_info):
if 'Display detail information' in line:
detail_info_pos = pos + 1
break
detail_info_json = json.loads('\n'.join(index_info[detail_info_pos:]))
detail_info_json['appName'] = self._kwargs.get('app_name')
detail_info_json['nodeHost'] = self._kwargs.get('host')
detail_info_json['dbName'] = self._kwargs.get('database')
return detail_info_json
def execute_index_advisor(self):
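        """
        Run index_advisor_workload.py (or the driver-based variant when psycopg2 is
        importable) on the extracted SQL file, piping in the database password, then
        parse the JSON detail output and upload it to the AI monitor service.
        """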
self.logger.info('Index advisor task starting.')
try:
cmd = 'echo %s | python3 %s/index_advisor_workload.py %s %s %s -U %s -W ' \
'--schema %s --json --multi_iter_mode --show_detail' % (
self.password, current_dirname, self._kwargs['port'], self._kwargs['database'],
self._kwargs['output_sql_file'], self._kwargs['user'], self._kwargs['schema'])
if self._kwargs['max_index_storage']:
cmd += ' --max_index_storage %s ' % self._kwargs['max_index_storage']
if self._kwargs['max_index_num']:
cmd += ' --max_index_num %s ' % self._kwargs['max_index_num']
if self._kwargs['driver']:
try:
import psycopg2
cmd = cmd.replace('index_advisor_workload.py',
'index_advisor_workload_driver.py')
except ImportError:
self.logger.warning('Driver import failed, use gsql to connect to the database.')
self.logger.info('Index advisor cmd:%s' % cmd.split('|')[-1])
if os.path.exists(self._kwargs['output_sql_file']):
_, res = self.execute_cmd(cmd)
detail_info_json = self.convert_output_to_recommendation_infos(res)
self.logger.info('Index advisor result: %s.' % detail_info_json)
result = self.save_recommendation_infos(detail_info_json)
if result['status'] is not True:
                    self.logger.error('Failed to upload index result. Error: %s' % result['message'])
                else:
                    self.logger.info('Successfully uploaded index result.')
except Exception as e:
self.logger.error(e)
def extract_log(self, start_time):
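        """Run extract_log.py to turn the pg_log files written since start_time into the workload SQL file."""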
extract_log_cmd = 'python3 %s %s %s --start_time "%s"' % \
(os.path.join(current_dirname, 'extract_log.py'),
self._kwargs['pg_log_path'],
self._kwargs['output_sql_file'], start_time)
if self._kwargs['database']:
extract_log_cmd += ' -d %s ' % self._kwargs['database']
if self._kwargs['wl_user']:
extract_log_cmd += ' -U %s ' % self._kwargs['wl_user']
if self._kwargs['sql_amount']:
extract_log_cmd += ' --sql_amount %s ' % self._kwargs['sql_amount']
if self._kwargs['statement']:
extract_log_cmd += ' --statement '
self.logger.info('Extracting log cmd: %s' % extract_log_cmd)
self.execute_cmd(extract_log_cmd)
self.logger.info('The current log extraction is complete.')
def monitor_log_size(self, guc_reset):
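        """
        Enable full statement logging via gs_guc, wait until the pg_log directory has
        grown by max_generate_log_size MB, then restore the original GUC settings and
        return the time at which collection started.
        """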
self.logger.info('Open GUC params.')
# get original all file size
original_total_size = self.get_directory_size()
self.logger.info('Original total file size: %sM' % (original_total_size / 1024 / 1024))
deviation_size = 0
# open guc
guc_reload = 'gs_guc reload -Z datanode -D {datanode} -c "log_min_duration_statement = 0" && ' \
'gs_guc reload -Z datanode -D {datanode} -c "log_statement= \'all\'"' \
.format(datanode=self._kwargs['datanode'])
self.execute_cmd(guc_reload)
start_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(time.time())))
        # calculate log size
count = 0
while deviation_size < self._kwargs['max_generate_log_size']:
time.sleep(5)
current_size = self.get_directory_size()
deviation_size = (current_size - original_total_size) / 1024 / 1024
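            # If the directory shrank (e.g. if old log files were rotated away), give
            # up after roughly 60 such checks instead of waiting indefinitely.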
if current_size - original_total_size < 0:
if count >= 60:
break
count += 1
self.logger.info('Current log size difference: %sM' % deviation_size)
self.logger.info('Start to reset GUC, cmd: %s' % guc_reset)
returncode, res = self.execute_cmd(guc_reset)
if returncode == 0:
            self.logger.info('Successfully reset GUC settings.')
        else:
            self.logger.error('Failed to reset GUC params, please check it.')
return start_time
def get_directory_size(self):
files = os.listdir(self._kwargs['pg_log_path'])
total_size = 0
for file in files:
total_size += os.path.getsize(os.path.join(self._kwargs['pg_log_path'], file))
return total_size
def execute_index_recommendation(self):
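        """
        One full recommendation round: read the current logging GUCs, verify that
        they can be restored, collect workload SQL from the logs, and run the index
        advisor; on failure the original GUC values are reset.
        """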
        self.logger.info('Start checking GUC.')
try:
guc_check = 'gs_guc check -Z datanode -D {datanode} -c "log_min_duration_statement" && ' \
'gs_guc check -Z datanode -D {datanode} -c "log_statement" '\
.format(datanode=self._kwargs['datanode'])
returncode, res = self.execute_cmd(guc_check)
origin_min_duration = self._kwargs['log_min_duration_statement']
origin_log_statement = self._kwargs['log_statement']
if returncode == 0:
                self.logger.info('Original GUC settings are: %s' % res)
match_res = re.findall(r'log_min_duration_statement=(\'?[a-zA-Z0-9]+\'?)', res)
if match_res:
origin_min_duration = match_res[-1]
if 'NULL' in origin_min_duration:
origin_min_duration = '30min'
match_res = re.findall(r'log_statement=(\'?[a-zA-Z]+\'?)', res)
if match_res:
origin_log_statement = match_res[-1]
if 'NULL' in origin_log_statement:
origin_log_statement = 'none'
self.logger.info('Parsed (log_min_duration_statement, log_statement) GUC params are (%s, %s)' %
(origin_min_duration, origin_log_statement))
            self.logger.info('Testing the GUC reset command...')
guc_reset = 'gs_guc reload -Z datanode -D %s -c "log_min_duration_statement = %s" && ' \
'gs_guc reload -Z datanode -D %s -c "log_statement= %s"' % \
(self._kwargs['datanode'], origin_min_duration,
self._kwargs['datanode'], origin_log_statement)
returncode, res = self.execute_cmd(guc_reset)
if returncode != 0:
guc_reset = 'gs_guc reload -Z datanode -D %s -c "log_min_duration_statement = %s" && ' \
'gs_guc reload -Z datanode -D %s -c "log_statement= %s"' % \
(self._kwargs['datanode'], self._kwargs['log_min_duration_statement'],
self._kwargs['datanode'], self._kwargs['log_statement'])
ret, res = self.execute_cmd(guc_reset)
if ret != 0:
raise Exception('Cannot reset GUC initial value, please check it.')
            self.logger.info('Test succeeded.')
# open guc and monitor log real-time size
start_time = self.monitor_log_size(guc_reset)
# extract log
self.extract_log(start_time)
# index advise
self.execute_index_advisor()
except Exception as e:
self.logger.error(e)
guc_reset = 'gs_guc reload -Z datanode -D %s -c "log_min_duration_statement = %s" && ' \
'gs_guc reload -Z datanode -D %s -c "log_statement= %s"' % \
(self._kwargs['datanode'], self._kwargs['log_min_duration_statement'],
self._kwargs['datanode'], self._kwargs['log_statement'])
self.execute_cmd(guc_reset)
def start_service(self):
# check service is running or not.
if os.path.isfile(self.pid_file):
pid = self.check_proc_exist("index_server")
if pid:
raise Exception("Error: Process already running, can't start again.")
else:
os.remove(self.pid_file)
self.logger.info("Start service...")
# write process pid to file
if not os.path.isdir(os.path.dirname(self.pid_file)):
os.makedirs(os.path.dirname(self.pid_file), 0o700)
with open(self.pid_file, mode='w') as f:
f.write(str(os.getpid()))
self.logger.info("Index advisor execution intervals is: %sh" %
self._kwargs['index_intervals'])
index_recommendation_thread = RepeatTimer(self._kwargs['index_intervals']*60*60,
self.execute_index_recommendation)
self.logger.info("Start timer...")
index_recommendation_thread.start()
def read_input_from_pipe():
"""
    Read piped stdin input, e.g. from "echo 'str1 str2' | python xx.py",
    and return the input string.
"""
input_str = ""
r_handle, _, _ = select.select([sys.stdin], [], [], 0)
if not r_handle:
return ""
for item in r_handle:
if item == sys.stdin:
input_str = sys.stdin.read().strip()
return input_str
def parse_check_conf(config_path):
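    """
    Load the [server] section of the configuration file into a dict and validate
    the log_min_duration_statement and log_statement values.
    """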
config = ConfigParser()
config.read(config_path)
config_dict = dict()
config_dict['app_name'] = config.get("server", "app_name")
config_dict['database'] = config.get("server", "database")
config_dict['port'] = config.get("server", "port")
config_dict['host'] = config.get("server", "host")
config_dict['user'] = config.get("server", "user")
config_dict['wl_user'] = config.get("server", "workload_user")
config_dict['schema'] = config.get("server", "schema")
config_dict['max_index_num'] = config.getint("server", "max_index_num")
config_dict['max_index_storage'] = config.get("server", "max_index_storage")
config_dict['driver'] = config.getboolean("server", "driver")
config_dict['index_intervals'] = config.getint("server", "index_intervals")
config_dict['sql_amount'] = config.getint("server", "sql_amount")
config_dict['output_sql_file'] = config.get("server", "output_sql_file")
config_dict['datanode'] = config.get("server", "datanode")
config_dict['pg_log_path'] = config.get("server", "pg_log_path")
config_dict['ai_monitor_url'] = config.get("server", "ai_monitor_url")
config_dict['max_generate_log_size'] = config.getfloat("server", "max_generate_log_size")
config_dict['statement'] = config.getboolean("server", "statement")
config_dict['log_min_duration_statement'] = config.get("server", "log_min_duration_statement")
config_dict['log_statement'] = config.get("server", "log_statement")
if not config_dict['log_min_duration_statement'] or \
not re.match(r'[a-zA-Z0-9]+', config_dict['log_min_duration_statement']):
raise ValueError("Please enter a legal value of [log_min_duration_statement]")
legal_log_statement = ['none', 'all', 'ddl', 'mod']
if config_dict['log_statement'] not in legal_log_statement:
raise ValueError("Please enter a legal value of [log_statement]")
return config_dict
def manage_service():
config_path = os.path.join(current_dirname, 'database-info.conf')
config_dict = parse_check_conf(config_path)
LOGGER = CreateLogger("debug", "start_service.log").create_log(config_dict.get('output_sql_file'))
server_pid_file = os.path.join(current_dirname, 'index_server.pid')
password = read_input_from_pipe()
IndexServer(server_pid_file, LOGGER, password, **config_dict).start_service()
def main():
try:
manage_service()
except Exception as err_msg:
print(err_msg)
sys.exit(1)
if __name__ == '__main__':
main()