From 1832360488c83afd0a9f079b7dceb0713b465d98 Mon Sep 17 00:00:00 2001 From: flyly Date: Wed, 28 Jul 2021 20:55:05 +0800 Subject: [PATCH] Index recommendation function enhancement --- .../tools/index_advisor/database-info.conf | 25 + .../dbmind/tools/index_advisor/extract_log.py | 237 +++-- .../index_advisor/index_advisor_workload.py | 582 ++++++++--- .../index_advisor_workload_driver.py | 951 ++++++++++++++++++ .../tools/index_advisor/index_server.py | 373 +++++++ 5 files changed, 1947 insertions(+), 221 deletions(-) create mode 100644 src/gausskernel/dbmind/tools/index_advisor/database-info.conf create mode 100644 src/gausskernel/dbmind/tools/index_advisor/index_advisor_workload_driver.py create mode 100644 src/gausskernel/dbmind/tools/index_advisor/index_server.py diff --git a/src/gausskernel/dbmind/tools/index_advisor/database-info.conf b/src/gausskernel/dbmind/tools/index_advisor/database-info.conf new file mode 100644 index 000000000..ee758a561 --- /dev/null +++ b/src/gausskernel/dbmind/tools/index_advisor/database-info.conf @@ -0,0 +1,25 @@ +[server] +app_name= +database= +port= +host= +user= +workload_user= +schema= + +index_intervals= +max_index_num= +max_index_storage= +driver= + +sql_amount= +max_generate_log_size= +statement= +log_min_duration_statement= +log_statement= +output_sql_file= +datanode= +pg_log_path= + +ai_monitor_url= + diff --git a/src/gausskernel/dbmind/tools/index_advisor/extract_log.py b/src/gausskernel/dbmind/tools/index_advisor/extract_log.py index c93cf6b8e..3374ffcb3 100644 --- a/src/gausskernel/dbmind/tools/index_advisor/extract_log.py +++ b/src/gausskernel/dbmind/tools/index_advisor/extract_log.py @@ -1,89 +1,202 @@ import re import os import argparse - +import json +import random +import time +from subprocess import Popen, PIPE SQL_TYPE = ['select', 'delete', 'insert', 'update'] +SQL_AMOUNT = 0 +PLACEHOLDER = r'@@@' +SAMPLE_NUM = 5 +SQL_PATTERN = [r'\((\s*(\d+(\.\d+)?\s*)[,]?)+\)', # match integer set in the IN collection + r'([^\\])\'((\')|(.*?([^\\])\'))', # match all content in single quotes + r'(([^<>]\s*=\s*)|([^<>]\s+))(\d+)(\.\d+)?'] # match single integer + + +def get_workload_template(templates, sqls): + for sql in sqls: + sql_template = sql + for pattern in SQL_PATTERN: + sql_template = re.sub(pattern, PLACEHOLDER, sql_template) + if sql_template not in templates: + templates[sql_template] = {} + templates[sql_template]['cnt'] = 0 + templates[sql_template]['samples'] = [] + templates[sql_template]['cnt'] += 1 + # reservoir sampling + if len(templates[sql_template]['samples']) < SAMPLE_NUM: + if sql not in templates[sql_template]['samples']: + templates[sql_template]['samples'].append(sql) + else: + if random.randint(0, templates[sql_template]['cnt']) < SAMPLE_NUM: + templates[sql_template]['samples'][random.randint(0, SAMPLE_NUM - 1)] = sql def output_valid_sql(sql): - if 'from pg_' in sql.lower(): + is_quotation_valid = sql.count("'") % 2 + if 'from pg_' in sql.lower() or ' join ' in sql.lower() or is_quotation_valid: return '' if any(tp in sql.lower() for tp in SQL_TYPE[1:]): - return sql if sql.endswith('; ') else sql + ';' + sql = re.sub(r'for\s+update[\s;]*$', '', sql, flags=re.I) + return sql.strip() if sql.endswith('; ') else sql + ';' elif SQL_TYPE[0] in sql.lower() and 'from ' in sql.lower(): - return sql if sql.endswith('; ') else sql + ';' + return sql.strip() if sql.endswith('; ') else sql + ';' return '' -def extract_sql_from_log(log_path): - files = os.listdir(log_path) - for file in files: - file_path = log_path + "/" + file - if 
os.path.isfile(file_path) and re.search(r'.log$', file): - with open(file_path, mode='r') as f: - line = f.readline() - sql = '' - statement_flag = False - execute_flag = False +def get_parsed_sql(file, user, database, sql_amount, statement): + global SQL_AMOUNT + line = file.readline() + sql = '' + statement_flag = False + execute_flag = False - while line: - try: - # Identify statement scene - if re.search('statement: ', line, re.IGNORECASE): - statement_flag = True - if output_valid_sql(sql): - yield output_valid_sql(sql) - sql = re.search(r'statement: (.*)', line.strip(), - re.IGNORECASE).group(1) + ' ' - line = f.readline() + while line: + if sql_amount and SQL_AMOUNT == sql_amount: + break + try: + # Identify statement scene + if re.search('statement: ', line.lower(), re.IGNORECASE) and statement: + if output_valid_sql(sql): + SQL_AMOUNT += 1 + yield output_valid_sql(sql) + log_info = line.split(' ') + if (user and user not in log_info) or ( + database and database not in log_info): + line = file.readline() + continue + statement_flag = True + sql = re.search(r'statement: (.*)', line.strip(), re.IGNORECASE).group(1) + ' ' + line = file.readline() - # Identify execute statement scene - elif re.search(r'execute .*:', line, re.IGNORECASE): - if output_valid_sql(sql): - yield output_valid_sql(sql) - execute_flag = True - sql = re.search(r'execute .*: (.*)', line.strip(), re.IGNORECASE).group(1) - line = f.readline() - else: - if statement_flag: - if re.match(r'^\t', line): - sql += line.strip('\t\n') - else: - statement_flag = False - if output_valid_sql(sql): - yield output_valid_sql(sql) - sql = '' - if execute_flag and re.search(r'parameters: ', line, re.IGNORECASE): - execute_flag = False - param_list = re.search(r'parameters: (.*)', line.strip(), - re.IGNORECASE).group(1).split(', ') - param_list = list(param.split('=', 1) for param in param_list) - param_list.sort(key=lambda x: int(x[0].strip(' $')), - reverse=True) - for item in param_list: - if len(item[1].strip()) >= 256: - sql = sql.replace(item[0].strip(), "''") - else: - sql = sql.replace(item[0].strip(), item[1].strip()) - yield output_valid_sql(sql) - sql = '' - line = f.readline() - except: - execute_flag = False + # Identify execute statement scene + elif re.search(r'execute .*:', line, re.IGNORECASE): + if output_valid_sql(sql): + SQL_AMOUNT += 1 + yield output_valid_sql(sql) + log_info = line.split(' ') + if (user and user not in log_info) or ( + database and database not in log_info): + line = file.readline() + continue + execute_flag = True + sql = re.search(r'execute .*: (.*)', line.strip(), re.IGNORECASE).group(1) + line = file.readline() + else: + if statement_flag: + if re.match(r'^\t', line): + sql += line.strip('\t\n') + else: statement_flag = False - line = f.readline() + if output_valid_sql(sql): + SQL_AMOUNT += 1 + yield output_valid_sql(sql) + sql = '' + if execute_flag: + execute_flag = False + if re.search(r'parameters: ', line, re.IGNORECASE): + param_list = re.search(r'parameters: (.*)', line.strip(), + re.IGNORECASE).group(1).split(', $') + param_list = list(param.split('=', 1) for param in param_list) + param_list.sort(key=lambda x: int(x[0].strip(' $')), + reverse=True) + for item in param_list: + if len(item[1].strip()) >= 256: + sql = sql.replace(item[0].strip() if re.match(r'\$', item[0]) else + ('$' + item[0].strip()), "''") + else: + sql = sql.replace(item[0].strip() if re.match(r'\$', item[0]) else + ('$' + item[0].strip()), item[1].strip()) + if output_valid_sql(sql): + SQL_AMOUNT += 1 + yield 
output_valid_sql(sql) + sql = '' + line = file.readline() + except: + execute_flag = False + statement_flag = False + line = file.readline() + + +def get_start_position(start_time, file_path): + while start_time: + cmd = 'head -n $(cat %s | grep -m 1 -n "^%s" | awk -F : \'{print $1}\') %s | wc -c' % \ + (file_path, start_time, file_path) + proc = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + std, err_msg = proc.communicate() + if proc.returncode == 0 and not err_msg: + return int(std) + elif len(start_time) > 13: + start_time = start_time[0: -3] + else: + break + return -1 + + +def record_sql(valid_files, args, output_obj): + for ind, file in enumerate(valid_files): + if args.sql_amount and SQL_AMOUNT >= args.sql_amount: + break + file_path = os.path.join(args.l, file) + if os.path.isfile(file_path) and re.search(r'.log$', file): + start_position = 0 + if ind == 0: + start_position = get_start_position(args.start_time, file_path) + if start_position == -1: + continue + with open(file_path, mode='r') as f: + f.seek(start_position, 0) + if isinstance(output_obj, dict): + get_workload_template(output_obj, get_parsed_sql(f, args.U, args.d, + args.sql_amount, + args.statement)) + else: + for sql in get_parsed_sql(f, args.U, args.d, args.sql_amount, args.statement): + output_obj.write(sql + '\n') + + +def extract_sql_from_log(args): + files = os.listdir(args.l) + files = sorted(files, key=lambda x: os.path.getctime(os.path.join(args.l, x)), reverse=True) + valid_files = files + time_stamp = int(time.mktime(time.strptime(args.start_time, '%Y-%m-%d %H:%M:%S'))) + if args.start_time: + valid_files = [] + for file in files: + if os.path.getmtime(os.path.join(args.l, file)) < time_stamp: + break + valid_files.insert(0, file) + if args.json: + templates = {} + record_sql(valid_files, args, templates) + with open(args.f, 'w') as output_file: + json.dump(templates, output_file) + else: + with open(args.f, 'w') as output_file: + record_sql(valid_files, args, output_file) def main(): arg_parser = argparse.ArgumentParser() arg_parser.add_argument("l", help="The path of the log file that needs to be parsed.") arg_parser.add_argument("f", help="The output path of the extracted file.") + arg_parser.add_argument("-d", help="Name of database") + arg_parser.add_argument("-U", help="Username for database log-in") + arg_parser.add_argument("--start_time", help="Start time of extracted log") + arg_parser.add_argument("--sql_amount", help="The number of sql collected", type=int) + arg_parser.add_argument("--statement", action='store_true', help="Extract statement log type", + default=False) + arg_parser.add_argument("--json", action='store_true', + help="Whether the workload file format is json", default=False) + args = arg_parser.parse_args() - sqls = extract_sql_from_log(args.l) - with open(args.f, 'w') as file: - for sql in sqls: - file.write(sql + '\n') + if args.start_time: + time.strptime(args.start_time, '%Y-%m-%d %H:%M:%S') + if args.sql_amount and args.sql_amount <= 0: + raise argparse.ArgumentTypeError("%s is an invalid positive int value" % args.sql_amount) + extract_sql_from_log(args) if __name__ == '__main__': diff --git a/src/gausskernel/dbmind/tools/index_advisor/index_advisor_workload.py b/src/gausskernel/dbmind/tools/index_advisor/index_advisor_workload.py index 17b02aa7d..cfb8e4976 100644 --- a/src/gausskernel/dbmind/tools/index_advisor/index_advisor_workload.py +++ b/src/gausskernel/dbmind/tools/index_advisor/index_advisor_workload.py @@ -24,6 +24,7 @@ import subprocess import time import 
select import logging +import json ENABLE_MULTI_NODE = False SAMPLE_NUM = 5 @@ -31,15 +32,19 @@ MAX_INDEX_COLUMN_NUM = 5 MAX_INDEX_NUM = 10 MAX_INDEX_STORAGE = None FULL_ARRANGEMENT_THRESHOLD = 20 +NEGATIVE_RATIO_THRESHOLD = 0.2 BASE_CMD = '' SHARP = '#' SCHEMA = None +JSON_TYPE = False BLANK = ' ' -SQL_TYPE = ['select', 'delete', 'insert', 'update'] -SQL_PATTERN = [r'([^\\])\'((\')|(.*?([^\\])\'))', - r'([^\\])"((")|(.*?([^\\])"))', - r'(\s*[<>]\s*=*\s*\d+)', - r'(\'\d+\\.*?\')'] +SQL_TYPE = ['select ', 'delete ', 'insert ', 'update '] +SQL_PATTERN = [r'\((\s*(\d+(\.\d+)?\s*)[,]?)+\)', # match integer set in the IN collection + r'([^\\])\'((\')|(.*?([^\\])\'))', # match all content in single quotes + r'(([^<>]\s*=\s*)|([^<>]\s+))(\d+)(\.\d+)?'] # match single integer +SQL_DISPLAY_PATTERN = [r'\((\s*(\d+(\.\d+)?\s*)[,]?)+\)', # match integer set in the IN collection + r'\'((\')|(.*?\'))', # match all content in single quotes + r'([^\_\d])\d+(\.\d+)?'] # match single integer logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s') @@ -79,11 +84,31 @@ class QueryItem: class IndexItem: - def __init__(self, tbl, cols): + def __init__(self, tbl, cols, positive_pos=None): self.table = tbl self.columns = cols + self.atomic_pos = 0 self.benefit = 0 self.storage = 0 + self.positive_pos = positive_pos + self.ineffective_pos = [] + self.negative_pos = [] + self.total_sql_num = 0 + self.insert_sql_num = 0 + self.update_sql_num = 0 + self.delete_sql_num = 0 + self.select_sql_num = 0 + + +class IndexInfo: + def __init__(self, schema, table, indexname, columns, indexdef): + self.schema = schema + self.table = table + self.indexname = indexname + self.columns = columns + self.indexdef = indexdef + self.primary_key = False + self.redundant_obj = [] def run_shell_cmd(target_sql_list): @@ -128,6 +153,222 @@ def print_header_boundary(header): print(green(title)) +def filter_low_benefit(pos_list, candidate_indexes, multi_iter_mode, workload): + remove_list = [] + for key, index in enumerate(candidate_indexes): + sql_optimzed = 0 + for ind, pos in enumerate(index.positive_pos): + if multi_iter_mode: + cost_list_pos = index.atomic_pos + else: + cost_list_pos = pos_list[key] + 1 + sql_optimzed += 1 - workload[pos].cost_list[cost_list_pos] / workload[pos].cost_list[0] + negative_ratio = (index.insert_sql_num + index.delete_sql_num + index.update_sql_num) / \ + index.total_sql_num + # filter the candidate indexes that do not meet the conditions of optimization + if sql_optimzed / len(index.positive_pos) < 0.1: + remove_list.append(key) + elif sql_optimzed / len(index.positive_pos) < NEGATIVE_RATIO_THRESHOLD < negative_ratio: + remove_list.append(key) + for item in sorted(remove_list, reverse=True): + candidate_indexes.pop(item) + + +def display_recommend_result(workload, candidate_indexes, index_cost_total, + multi_iter_mode, display_info): + cnt = 0 + index_current_storage = 0 + pos_list = [] + if not multi_iter_mode: + pos_list = [item[0] for item in candidate_indexes] + candidate_indexes = [item[1] for item in candidate_indexes] + # filter candidate indexes with low benefit + filter_low_benefit(pos_list, candidate_indexes, multi_iter_mode, workload) + # display determine result + for key, index in enumerate(candidate_indexes): + if MAX_INDEX_STORAGE and (index_current_storage + index.storage) > MAX_INDEX_STORAGE: + continue + if MAX_INDEX_NUM and cnt == MAX_INDEX_NUM: + break + index_current_storage += index.storage + table_name = index.table.split('.')[-1] + index_name = 'idx_' + table_name 
+ '_' + '_'.join(index.columns.split(', ')) + statement = 'CREATE INDEX ' + index_name + ' ON ' + index.table + '(' + index.columns + ');' + print(statement) + cnt += 1 + if multi_iter_mode: + cost_list_pos = index.atomic_pos + else: + cost_list_pos = pos_list[key] + 1 + + sql_info = {'sqlDetails': []} + benefit_types = [index.ineffective_pos, index.positive_pos, index.negative_pos] + for category, benefit_type in enumerate(benefit_types): + sql_count = 0 + for item in benefit_type: + sql_count += workload[item].frequency + for ind, pos in enumerate(benefit_type): + sql_detail = {} + sql_template = workload[pos].statement + for pattern in SQL_DISPLAY_PATTERN: + sql_template = re.sub(pattern, '?', sql_template) + + sql_detail['sqlTemplate'] = sql_template + sql_detail['sql'] = workload[pos].statement + sql_detail['sqlCount'] = sql_count + if category == 1: + sql_optimzed = (workload[pos].cost_list[0] - + workload[pos].cost_list[cost_list_pos]) / \ + workload[pos].cost_list[cost_list_pos] + sql_detail['optimized'] = '%.3f' % sql_optimzed + sql_detail['correlationType'] = category + sql_info['sqlDetails'].append(sql_detail) + workload_optimized = (1 - index_cost_total[cost_list_pos] / index_cost_total[0]) * 100 + sql_info['workloadOptimized'] = '%.2f' % (workload_optimized if workload_optimized > 1 else 1) + sql_info['schemaName'] = SCHEMA + sql_info['tbName'] = table_name + sql_info['columns'] = index.columns + sql_info['statement'] = statement + sql_info['dmlCount'] = round(index.total_sql_num) + sql_info['selectRatio'] = round(index.select_sql_num * 100 / index.total_sql_num, 2) + sql_info['insertRatio'] = round(index.insert_sql_num * 100 / index.total_sql_num, 2) + sql_info['deleteRatio'] = round(index.delete_sql_num * 100 / index.total_sql_num, 2) + sql_info['updateRatio'] = round(100 - sql_info['selectRatio'] - sql_info['insertRatio'] + - sql_info['deleteRatio'], 2) + display_info['recommendIndexes'].append(sql_info) + return display_info + + +def record_redundant_indexes(cur_table_indexes, redundant_indexes): + cur_table_indexes = sorted(cur_table_indexes, + key=lambda index_obj: len(index_obj.columns.split(','))) + # record redundant indexes + has_restore = [] + for pos, index in enumerate(cur_table_indexes[:-1]): + is_redundant = False + for candidate_index in cur_table_indexes[pos + 1:]: + if 'UNIQUE INDEX' in index.indexdef: + # ensure that UNIQUE INDEX will not become redundant compared to normal index + if 'UNIQUE INDEX' not in candidate_index.indexdef: + continue + # ensure redundant index not is pkey + elif index.primary_key: + if re.match(r'%s' % candidate_index.columns, index.columns): + candidate_index.redundant_obj.append(index) + redundant_indexes.append(candidate_index) + has_restore.append(candidate_index) + continue + if re.match(r'%s' % index.columns, candidate_index.columns): + is_redundant = True + index.redundant_obj.append(candidate_index) + if is_redundant and index not in has_restore: + redundant_indexes.append(index) + + +def check_useless_index(tables): + whole_indexes = list() + redundant_indexes = list() + if not tables: + return whole_indexes, redundant_indexes + tables_string = ','.join(["'%s'" % table for table in tables[SCHEMA]]) + sql = "SELECT c.relname AS tablename, i.relname AS indexname, " \ + "pg_get_indexdef(i.oid) AS indexdef, p.contype AS pkey from " \ + "pg_index x JOIN pg_class c ON c.oid = x.indrelid JOIN " \ + "pg_class i ON i.oid = x.indexrelid LEFT JOIN pg_namespace n " \ + "ON n.oid = c.relnamespace LEFT JOIN pg_constraint p ON i.oid = 
p.conindid" \ + "WHERE (c.relkind = ANY (ARRAY['r'::\"char\", 'm'::\"char\"])) AND " \ + "(i.relkind = ANY (ARRAY['i'::\"char\", 'I'::\"char\"])) AND " \ + "n.nspname = '%s' AND c.relname in (%s) order by c.relname;" % (SCHEMA, tables_string) + + res = run_shell_cmd([sql]).split('\n') + if res: + cur_table_indexes = list() + for line in res: + if 'tablename' in line or re.match(r'-+', line): + continue + elif re.match(r'\(\d+ rows?\)', line): + continue + elif '|' in line: + table, index, indexdef, pkey = [item.strip() for item in line.split('|')] + cur_columns = re.search(r'\(([^\(\)]*)\)', indexdef).group(1) + cur_index_obj = IndexInfo(SCHEMA, table, index, cur_columns, indexdef) + if pkey: + cur_index_obj.primary_key = True + whole_indexes.append(cur_index_obj) + if cur_table_indexes and cur_table_indexes[-1].table != table: + record_redundant_indexes(cur_table_indexes, redundant_indexes) + cur_table_indexes = [] + cur_table_indexes.append(cur_index_obj) + if cur_table_indexes: + record_redundant_indexes(cur_table_indexes, redundant_indexes) + return whole_indexes, redundant_indexes + + +def get_whole_index(tables, detail_info): + whole_index, redundant_indexes = check_useless_index(tables) + print_header_boundary(" Created indexes ") + detail_info['createdIndexes'] = [] + if not whole_index: + print("No created index!") + else: + for index in whole_index: + index_info = {'schemaName': index.schema, 'tbName': index.table, + 'columns': index.columns, 'statement': index.indexdef + ';'} + detail_info['createdIndexes'].append(index_info) + print("%s;" % index.indexdef) + return whole_index, redundant_indexes + + +def check_unused_index_workload(whole_indexes, redundant_indexes, workload_indexes, detail_info): + indexes_name = set(index.indexname for index in whole_indexes) + unused_index = list(indexes_name.difference(workload_indexes)) + remove_list = [] + print_header_boundary(" Current workload useless indexes ") + if not unused_index: + print("No useless index!") + detail_info['uselessIndexes'] = [] + # useless index + unused_index_columns = dict() + for cur_index in unused_index: + for index in whole_indexes: + if cur_index == index.indexname: + unused_index_columns[cur_index] = index.columns + if 'UNIQUE INDEX' not in index.indexdef: + statement = "DROP INDEX %s;" % index.indexname + print(statement) + useless_index = {"schemaName": index.schema, "tbName": index.table, "type": 1, + "columns": index.columns, "statement": statement} + detail_info['uselessIndexes'].append(useless_index) + print_header_boundary(" Redundant indexes ") + # filter redundant index + for pos, index in enumerate(redundant_indexes): + is_redundant = False + for redundant_obj in index.redundant_obj: + # redundant objects are not in the useless index set or + # equal to the column value in the useless index must be redundant index + index_exist = redundant_obj.indexname not in unused_index_columns.keys() or \ + (unused_index_columns.get(redundant_obj.indexname) and + redundant_obj.columns == unused_index_columns[redundant_obj.indexname]) + if index_exist: + is_redundant = True + if not is_redundant: + remove_list.append(pos) + for item in sorted(remove_list, reverse=True): + redundant_indexes.pop(item) + + if not redundant_indexes: + print("No redundant index!") + # redundant index + for index in redundant_indexes: + statement = "DROP INDEX %s;" % index.indexname + print(statement) + existing_index = [item.indexname + ':' + item.columns for item in index.redundant_obj] + redundant_index = {"schemaName": 
index.schema, "tbName": index.table, "type": 2, + "columns": index.columns, "statement": statement, + "existingIndex": existing_index} + detail_info['uselessIndexes'].append(redundant_index) + + def load_workload(file_path): wd_dict = {} workload = [] @@ -176,14 +417,20 @@ def get_workload_template(workload): def workload_compression(input_path): compressed_workload = [] - - workload = load_workload(input_path) - templates = get_workload_template(workload) + total_num = 0 + if JSON_TYPE: + with open(input_path, 'r') as file: + templates = json.load(file) + else: + workload = load_workload(input_path) + templates = get_workload_template(workload) for _, elem in templates.items(): for sql in elem['samples']: - compressed_workload.append(QueryItem(sql, elem['cnt'] / SAMPLE_NUM)) - return compressed_workload + compressed_workload.append(QueryItem(sql.strip('\n'), + elem['cnt'] / len(elem['samples']))) + total_num += elem['cnt'] + return compressed_workload, total_num # parse the explain plan to get estimated cost by database optimizer @@ -213,15 +460,19 @@ def update_index_storage(res, index_config, hypo_index_num): index_config[hypo_index_num].storage = float(item.strip()) / 1024 / 1024 -def estimate_workload_cost_file(workload, index_config=None): +def estimate_workload_cost_file(workload, index_config=None, ori_indexes_name=None): total_cost = 0 sql_file = str(time.time()) + '.sql' found_plan = False hypo_index = False + is_computed = False + select_sql_pos = [] with open(sql_file, 'w') as file: if SCHEMA: file.write('SET current_schema = %s;\n' % SCHEMA) if index_config: + if len(index_config) == 1 and index_config[0].positive_pos: + is_computed = True # create hypo-indexes file.write('SET enable_hypo_index = on;\n') for index in index_config: @@ -230,8 +481,16 @@ def estimate_workload_cost_file(workload, index_config=None): if ENABLE_MULTI_NODE: file.write('set enable_fast_query_shipping = off;\n') file.write("set explain_perf_mode = 'normal'; \n") - for query in workload: - file.write('EXPLAIN ' + query.statement + ';\n') + + for ind, query in enumerate(workload): + if 'select ' not in query.statement.lower(): + workload[ind].cost_list.append(0) + else: + file.write('EXPLAIN ' + query.statement + ';\n') + select_sql_pos.append(ind) + # record ineffective sql and negative sql for candidate indexes + if is_computed: + record_ineffective_negative_sql(index_config[0], query, ind) result = run_shell_sql_cmd(sql_file).split('\n') if os.path.exists(sql_file): @@ -244,15 +503,18 @@ def estimate_workload_cost_file(workload, index_config=None): if 'QUERY PLAN' in line: found_plan = True if 'ERROR' in line: - workload.pop(i) + if i >= len(select_sql_pos): + raise ValueError("The size of workload is not correct!") + workload[select_sql_pos[i]].cost_list.append(0) + i += 1 if 'hypopg_create_index' in line: hypo_index = True if found_plan and '(cost=' in line: - if i >= len(workload): + if i >= len(select_sql_pos): raise ValueError("The size of workload is not correct!") query_cost = parse_explain_plan(line) - query_cost *= workload[i].frequency - workload[i].cost_list.append(query_cost) + query_cost *= workload[select_sql_pos[i]].frequency + workload[select_sql_pos[i]].cost_list.append(query_cost) total_cost += query_cost found_plan = False i += 1 @@ -261,8 +523,15 @@ def estimate_workload_cost_file(workload, index_config=None): hypo_index = False update_index_storage(line, index_config, hypo_index_num) hypo_index_num += 1 - while i < len(workload): - workload[i].cost_list.append(0) + if 'Index' in 
line and 'Scan' in line and not index_config: + ind1, ind2 = re.search(r'Index.*Scan(.*)on ([^\s]+)', + line.strip(), re.IGNORECASE).groups() + if ind1.strip(): + ori_indexes_name.add(ind1.strip().split(' ')[1]) + else: + ori_indexes_name.add(ind2) + while i < len(select_sql_pos): + workload[select_sql_pos[i]].cost_list.append(0) i += 1 if index_config: run_shell_cmd(['SELECT hypopg_reset_index();']) @@ -282,11 +551,13 @@ def make_single_advisor_sql(ori_sql): return sql -def parse_single_advisor_result(res): +def parse_single_advisor_result(res, workload_table_name): table_index_dict = {} if len(res) > 2 and res[0:2] == ' (': items = res.split(',', 1) table = items[0][2:] + workload_table_name[SCHEMA] = workload_table_name.get(SCHEMA, set()) + workload_table_name[SCHEMA].add(table) indexes = re.split('[()]', items[1][:-1].strip('\"')) for columns in indexes: if columns == '': @@ -299,7 +570,7 @@ def parse_single_advisor_result(res): # call the single-index-advisor in the database -def query_index_advisor(query): +def query_index_advisor(query, workload_table_name): table_index_dict = {} if 'select' not in query.lower(): @@ -309,7 +580,7 @@ def query_index_advisor(query): result = run_shell_cmd([sql]).split('\n') for res in result: - table_index_dict.update(parse_single_advisor_result(res)) + table_index_dict.update(parse_single_advisor_result(res, workload_table_name)) return table_index_dict @@ -329,30 +600,37 @@ def query_index_check(query, query_index_dict): if columns != '': sql_list.append("SELECT hypopg_create_index('CREATE INDEX ON %s(%s)')" % (table, columns)) + sql_list.append('SELECT hypopg_display_index()') sql_list.append("set explain_perf_mode = 'normal'; explain " + query) sql_list.append('SELECT hypopg_reset_index()') result = run_shell_cmd(sql_list).split('\n') # parse the result of explain plan + hypoid_table_column = {} + hypo_display = False for line in result: - hypo_index = '' + if hypo_display and 'btree' in line: + hypo_index_info = line.split(',', 3) + if len(hypo_index_info) == 4: + hypoid_table_column[hypo_index_info[1]] = \ + hypo_index_info[2] + ':' + hypo_index_info[3].strip('"()') + if hypo_display and re.search(r'\d+ rows', line): + hypo_display = False + if 'hypopg_display_index' in line: + hypo_display = True if 'Index' in line and 'Scan' in line and 'btree' in line: tokens = line.split(' ') for token in tokens: if 'btree' in token: - hypo_index = token.split('_', 1)[1] - if len(hypo_index) > 1: - for table in query_index_dict.keys(): - for columns in query_index_dict[table]: - index_name_list = columns.split(',') - index_name_list.insert(0, table) - index_name = "_".join(index_name_list) - if index_name != hypo_index: - continue - if table not in valid_indexes.keys(): - valid_indexes[table] = [] - if columns not in valid_indexes[table]: - valid_indexes[table].append(columns) + hypo_index_id = re.search(r'\d+', token.split('_', 1)[0]).group() + table_columns = hypoid_table_column.get(hypo_index_id) + if not table_columns: + continue + table_name, columns = table_columns.split(':') + if table_name not in valid_indexes.keys(): + valid_indexes[table_name] = [] + if columns not in valid_indexes[table_name]: + valid_indexes[table_name].append(columns) return valid_indexes @@ -373,13 +651,13 @@ def get_indexable_columns(table_index_dict): return query_indexable_columns -def generate_candidate_indexes(workload, iterate=False): +def generate_candidate_indexes(workload, workload_table_name): candidate_indexes = [] index_dict = {} for k, query in enumerate(workload): 
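+        # only SELECT statements are passed to the single-query index advisor below;
+        # INSERT/UPDATE/DELETE statements are skipped when generating candidate indexes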
- table_index_dict = query_index_advisor(query.statement) - if iterate: + if 'select ' in query.statement.lower(): + table_index_dict = query_index_advisor(query.statement, workload_table_name) need_check = False query_indexable_columns = get_indexable_columns(table_index_dict) valid_index_dict = query_index_check(query.statement, query_indexable_columns) @@ -397,56 +675,24 @@ def generate_candidate_indexes(workload, iterate=False): need_check = False else: break - else: - valid_index_dict = query_index_check(query.statement, table_index_dict) - # filter duplicate indexes - for table in valid_index_dict.keys(): - if table not in index_dict.keys(): - index_dict[table] = set() - for columns in valid_index_dict[table]: - if len(workload[k].valid_index_list) >= FULL_ARRANGEMENT_THRESHOLD: - break - workload[k].valid_index_list.append(IndexItem(table, columns)) - if columns not in index_dict[table]: - print("table: ", table, "columns: ", columns) - index_dict[table].add(columns) - candidate_indexes.append(IndexItem(table, columns)) - - return candidate_indexes - - -def generate_candidate_indexes_file(workload): - candidate_indexes = [] - index_dict = {} - sql_file = str(time.time()) + '.sql' - - if len(workload) > 0: - run_shell_cmd([workload[0].statement]) - with open(sql_file, 'w') as file: - if SCHEMA: - file.write('SET current_schema = %s;\n' % SCHEMA) - for query in workload: - if 'select' in query.statement.lower(): - file.write(make_single_advisor_sql(query.statement) + '\n') - - result = run_shell_sql_cmd(sql_file).split('\n') - if os.path.exists(sql_file): - os.remove(sql_file) - - for line in result: - table_index_dict = parse_single_advisor_result(line.strip('\n')) - # filter duplicate indexes - for table, columns in table_index_dict.items(): - if table not in index_dict.keys(): - index_dict[table] = set() - for column in columns: - if column == "": - continue - if column not in index_dict[table] and not re.match(r'\s*,\s*$', column): - print("table: ", table, "columns: ", column) - index_dict[table].add(column) - candidate_indexes.append(IndexItem(table, column)) + # filter duplicate indexes + for table in valid_index_dict.keys(): + if table not in index_dict.keys(): + index_dict[table] = {} + for columns in valid_index_dict[table]: + if len(workload[k].valid_index_list) >= FULL_ARRANGEMENT_THRESHOLD: + break + workload[k].valid_index_list.append(IndexItem(table, columns)) + if not any(re.match(r'%s' % columns, item) for item in index_dict[table]): + column_sql = {columns: [k]} + index_dict[table].update(column_sql) + elif columns in index_dict[table].keys(): + index_dict[table][columns].append(k) + for table, column_sqls in index_dict.items(): + for column, sql in column_sqls.items(): + print("table: ", table, "columns: ", column) + candidate_indexes.append(IndexItem(table, column, sql)) return candidate_indexes @@ -517,6 +763,10 @@ def find_subsets_num(config, atomic_config_total): for i, atomic_config in enumerate(atomic_config_total): if len(atomic_config) > len(config): continue + # Record the atomic index position of the newly added index + if len(atomic_config) == 1 and atomic_config[0].table == config[-1].table and \ + atomic_config[0].columns == config[-1].columns: + cur_index_atomic_pos = i for atomic_index in atomic_config: is_exist = False for index in config: @@ -529,7 +779,7 @@ def find_subsets_num(config, atomic_config_total): if is_exist: atomic_subsets_num.append(i) - return atomic_subsets_num + return atomic_subsets_num, cur_index_atomic_pos def 
get_index_num(index, atomic_config_total): @@ -541,14 +791,39 @@ def get_index_num(index, atomic_config_total): return -1 +def record_ineffective_negative_sql(candidate_index, obj, ind): + cur_table = candidate_index.table + ' ' + if cur_table in obj.statement: + candidate_index.total_sql_num += obj.frequency + if 'insert ' in obj.statement.lower(): + candidate_index.insert_sql_num += obj.frequency + candidate_index.negative_pos.append(ind) + elif 'delete ' in obj.statement.lower(): + candidate_index.delete_sql_num += obj.frequency + candidate_index.negative_pos.append(ind) + elif 'update ' in obj.statement.lower(): + candidate_index.update_sql_num += obj.frequency + if any(column in obj.statement.lower().split('where ', 1)[0] for column in + candidate_index.columns.split(',')): + candidate_index.negative_pos.append(ind) + else: + candidate_index.ineffective_pos.append(ind) + else: + candidate_index.select_sql_num += obj.frequency + if ind not in candidate_index.positive_pos and \ + any(column in obj.statement.lower() for column in candidate_index.columns): + candidate_index.ineffective_pos.append(ind) + + # infer the total cost of workload for a config according to the cost of atomic configs def infer_workload_cost(workload, config, atomic_config_total): total_cost = 0 - - atomic_subsets_num = find_subsets_num(config, atomic_config_total) + is_computed = False + atomic_subsets_num, cur_index_atomic_pos = find_subsets_num(config, atomic_config_total) if len(atomic_subsets_num) == 0: raise ValueError("No atomic configs found for current config!") - + if not config[-1].total_sql_num: + is_computed = True for ind, obj in enumerate(workload): if max(atomic_subsets_num) >= len(obj.cost_list): raise ValueError("Wrong atomic config for current query!") @@ -559,63 +834,39 @@ def infer_workload_cost(workload, config, atomic_config_total): min_cost = obj.cost_list[num] total_cost += min_cost - # compute the cost for updating indexes - if 'insert' in obj.statement.lower() or 'delete' in obj.statement.lower(): - for index in config: - index_num = get_index_num(index, atomic_config_total) - if index_num == -1: - raise ValueError("The index isn't found for current query!") - if 0 <= index_num < len(workload[ind].cost_list): - total_cost += obj.cost_list[index_num] - obj.cost_list[0] - - return total_cost + # record ineffective sql and negative sql for candidate indexes + if is_computed: + record_ineffective_negative_sql(config[-1], obj, ind) + return total_cost, cur_index_atomic_pos def simple_index_advisor(input_path, max_index_num): - workload = workload_compression(input_path) + workload, workload_count = workload_compression(input_path) print_header_boundary(" Generate candidate indexes ") - candidate_indexes = generate_candidate_indexes_file(workload) + ori_indexes_name = set() + workload_table_name = dict() + display_info = {'workloadCount': workload_count, 'recommendIndexes': []} + candidate_indexes = generate_candidate_indexes(workload, workload_table_name) if len(candidate_indexes) == 0: print("No candidate indexes generated!") - return + estimate_workload_cost_file(workload, ori_indexes_name=ori_indexes_name) + return ori_indexes_name, workload_table_name, display_info print_header_boundary(" Determine optimal indexes ") - ori_total_cost = estimate_workload_cost_file(workload) + ori_total_cost = estimate_workload_cost_file(workload, ori_indexes_name=ori_indexes_name) + index_cost_total = [ori_total_cost] for _, obj in enumerate(candidate_indexes): new_total_cost = 
estimate_workload_cost_file(workload, [obj]) + index_cost_total.append(new_total_cost) obj.benefit = ori_total_cost - new_total_cost - candidate_indexes = sorted(candidate_indexes, key=lambda item: item.benefit, reverse=True) + candidate_indexes = sorted(enumerate(candidate_indexes), + key=lambda item: item[1].benefit, reverse=True) + candidate_indexes = [item for item in candidate_indexes if item[1].benefit > 0] + global MAX_INDEX_NUM + MAX_INDEX_NUM = max_index_num - # filter out duplicate index - final_index_set = {} - final_index_list = [] - for index in candidate_indexes: - picked = True - cols = set(index.columns.split(',')) - if index.table not in final_index_set.keys(): - final_index_set[index.table] = [] - for i in range(len(final_index_set[index.table]) - 1, -1, -1): - pre_index = final_index_set[index.table][i] - pre_cols = set(pre_index.columns.split(',')) - if len(pre_cols.difference(cols)) == 0 and len(pre_cols) < len(cols): - final_index_set[index.table].pop(i) - if len(cols.difference(pre_cols)) == 0: - picked = False - break - if picked and index.benefit > 0: - final_index_set[index.table].append(index) - [final_index_list.extend(item) for item in list(final_index_set.values())] - final_index_list = sorted(final_index_list, key=lambda item: item.benefit, reverse=True) - cnt = 0 - index_current_storage = 0 - for index in final_index_list: - if max_index_num and cnt == max_index_num: - break - if MAX_INDEX_STORAGE and (index_current_storage + index.storage) > MAX_INDEX_STORAGE: - continue - index_current_storage += index.storage - print("create index ind" + str(cnt) + " on " + index.table + "(" + index.columns + ");") - cnt += 1 + display_recommend_result(workload, candidate_indexes, index_cost_total, False, display_info) + return ori_indexes_name, workload_table_name, display_info def greedy_determine_opt_config(workload, atomic_config_total, candidate_indexes): @@ -633,10 +884,12 @@ def greedy_determine_opt_config(workload, atomic_config_total, candidate_indexes continue cur_config = copy.copy(opt_config) cur_config.append(index) - cur_estimated_cost = infer_workload_cost(workload, cur_config, atomic_config_total) + cur_estimated_cost, cur_index_atomic_pos = \ + infer_workload_cost(workload, cur_config, atomic_config_total) if cur_estimated_cost < cur_min_cost: cur_min_cost = cur_estimated_cost cur_index = index + cur_index.atomic_pos = cur_index_atomic_pos cur_index_num = k if cur_index and cur_min_cost < min_cost: if MAX_INDEX_STORAGE and sum([obj.storage for obj in opt_config]) + \ @@ -654,33 +907,30 @@ def greedy_determine_opt_config(workload, atomic_config_total, candidate_indexes def complex_index_advisor(input_path): - workload = workload_compression(input_path) + workload, workload_count = workload_compression(input_path) print_header_boundary(" Generate candidate indexes ") - candidate_indexes = generate_candidate_indexes(workload, True) + ori_indexes_name = set() + workload_table_name = dict() + display_info = {'workloadCount': workload_count, 'recommendIndexes': []} + candidate_indexes = generate_candidate_indexes(workload, workload_table_name) if len(candidate_indexes) == 0: print("No candidate indexes generated!") - return + estimate_workload_cost_file(workload, ori_indexes_name=ori_indexes_name) + return ori_indexes_name, workload_table_name, display_info print_header_boundary(" Determine optimal indexes ") atomic_config_total = generate_atomic_config(workload) - if len(atomic_config_total[0]) != 0: + if atomic_config_total and len(atomic_config_total[0]) != 
0: raise ValueError("The empty atomic config isn't generated!") - + index_cost_total = [] for atomic_config in atomic_config_total: - estimate_workload_cost_file(workload, atomic_config) + index_cost_total.append(estimate_workload_cost_file(workload, atomic_config, + ori_indexes_name)) opt_config = greedy_determine_opt_config(workload, atomic_config_total, candidate_indexes) - cnt = 0 - index_current_storage = 0 - for index in opt_config: - if MAX_INDEX_STORAGE and (index_current_storage + index.storage) > MAX_INDEX_STORAGE: - continue - if cnt == MAX_INDEX_NUM: - break - index_current_storage += index.storage - print("create index ind" + str(cnt) + " on " + index.table + "(" + index.columns + ");") - cnt += 1 + display_recommend_result(workload, opt_config, index_cost_total, True, display_info) + return ori_indexes_name, workload_table_name, display_info def main(): @@ -691,7 +941,7 @@ def main(): arg_parser.add_argument("-U", help="Username for database log-in") arg_parser.add_argument("-W", help="Password for database user", nargs="?", action=PwdAction) arg_parser.add_argument("f", help="File containing workload queries (One query per line)") - arg_parser.add_argument("--schema", help="Schema name for the current business data") + arg_parser.add_argument("--schema", help="Schema name for the current business data", required=True) arg_parser.add_argument("--max_index_num", help="Maximum number of suggested indexes", type=int) arg_parser.add_argument("--max_index_storage", help="Maximum storage of suggested indexes/MB", type=int) @@ -699,9 +949,13 @@ def main(): help="Whether to use multi-iteration algorithm", default=False) arg_parser.add_argument("--multi_node", action='store_true', help="Whether to support distributed scenarios", default=False) + arg_parser.add_argument("--json", action='store_true', + help="Whether the workload file format is json", default=False) + arg_parser.add_argument("--show_detail", action='store_true', + help="Whether to show detailed sql information", default=False) args = arg_parser.parse_args() - global MAX_INDEX_NUM, BASE_CMD, ENABLE_MULTI_NODE, MAX_INDEX_STORAGE, SCHEMA + global MAX_INDEX_NUM, BASE_CMD, ENABLE_MULTI_NODE, MAX_INDEX_STORAGE, SCHEMA, JSON_TYPE if args.max_index_num is not None and args.max_index_num <= 0: raise argparse.ArgumentTypeError("%s is an invalid positive int value" % args.max_index_num) @@ -710,6 +964,7 @@ def main(): args.max_index_storage) if args.schema: SCHEMA = args.schema + JSON_TYPE = args.json MAX_INDEX_NUM = args.max_index_num or 10 ENABLE_MULTI_NODE = args.multi_node MAX_INDEX_STORAGE = args.max_index_storage @@ -725,10 +980,19 @@ def main(): BASE_CMD += ' -W ' + args.W if args.multi_iter_mode: - complex_index_advisor(args.f) + workload_indexes, tables, detail_info = complex_index_advisor(args.f) else: - simple_index_advisor(args.f, args.max_index_num) + workload_indexes, tables, detail_info = simple_index_advisor(args.f, args.max_index_num) + + whole_indexes, redundant_indexes = get_whole_index(tables, detail_info) + # check the unused indexes of the current workload based on the whole index + check_unused_index_workload(whole_indexes, redundant_indexes, workload_indexes, detail_info) + if args.show_detail: + print_header_boundary(" Display detail information ") + sql_info = json.dumps(detail_info, indent=4, separators=(',', ':')) + print(sql_info) if __name__ == '__main__': main() + diff --git a/src/gausskernel/dbmind/tools/index_advisor/index_advisor_workload_driver.py 
b/src/gausskernel/dbmind/tools/index_advisor/index_advisor_workload_driver.py new file mode 100644 index 000000000..5d9bb565b --- /dev/null +++ b/src/gausskernel/dbmind/tools/index_advisor/index_advisor_workload_driver.py @@ -0,0 +1,951 @@ +import os +import sys +import argparse +import copy +import getpass +import random +import re +import select +import logging +import psycopg2 +import json + +ENABLE_MULTI_NODE = False +SAMPLE_NUM = 5 +MAX_INDEX_COLUMN_NUM = 5 +MAX_INDEX_NUM = 10 +MAX_INDEX_STORAGE = None +FULL_ARRANGEMENT_THRESHOLD = 20 +NEGATIVE_RATIO_THRESHOLD = 0.2 +BASE_CMD = '' +SHARP = '#' +SCHEMA = None +JSON_TYPE = False +BLANK = ' ' +SQL_TYPE = ['select', 'delete', 'insert', 'update'] +SQL_PATTERN = [r'\((\s*(\d+(\.\d+)?\s*)[,]?)+\)', # match integer set in the IN collection + r'([^\\])\'((\')|(.*?([^\\])\'))', # match all content in single quotes + r'(([^<>]\s*=\s*)|([^<>]\s+))(\d+)(\.\d+)?'] # match single integer +SQL_DISPLAY_PATTERN = [r'\((\s*(\d+(\.\d+)?\s*)[,]?)+\)', # match integer set in the IN collection + r'\'((\')|(.*?\'))', # match all content in single quotes + r'([^\_\d])\d+(\.\d+)?'] # match single integer + +logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s') + + +def read_input_from_pipe(): + """ + Read stdin input if there is "echo 'str1 str2' | python xx.py", + return the input string + """ + input_str = "" + r_handle, _, _ = select.select([sys.stdin], [], [], 0) + if not r_handle: + return "" + + for item in r_handle: + if item == sys.stdin: + input_str = sys.stdin.read().strip() + return input_str + + +class PwdAction(argparse.Action): + def __call__(self, parser, namespace, values, option_string=None): + password = read_input_from_pipe() + if password: + logging.warning("Read password from pipe.") + else: + password = getpass.getpass("Password for database user:") + setattr(namespace, self.dest, password) + + +class QueryItem: + def __init__(self, sql, freq): + self.statement = sql + self.frequency = freq + self.valid_index_list = [] + self.cost_list = [] + + +class IndexItem: + def __init__(self, tbl, cols, positive_pos=None): + self.table = tbl + self.columns = cols + self.atomic_pos = 0 + self.benefit = 0 + self.storage = 0 + self.positive_pos = positive_pos + self.ineffective_pos = [] + self.negative_pos = [] + self.total_sql_num = 0 + self.insert_sql_num = 0 + self.update_sql_num = 0 + self.delete_sql_num = 0 + self.select_sql_num = 0 + + +class IndexInfo: + def __init__(self, schema, table, indexname, columns, indexdef): + self.schema = schema + self.table = table + self.indexname = indexname + self.columns = columns + self.indexdef = indexdef + self.primary_key = False + self.redundant_obj = [] + + +class DatabaseConn: + def __init__(self, dbname, user, password, host, port): + self.dbname = dbname + self.user = user + self.password = password + self.host = host + self.port = port + self.conn = None + self.cur = None + + def init_conn_handle(self): + self.conn = psycopg2.connect(dbname=self.dbname, + user=self.user, + password=self.password, + host=self.host, + port=self.port) + self.cur = self.conn.cursor() + + def execute(self, sql): + try: + self.cur.execute(sql) + self.conn.commit() + return self.cur.fetchall() + except Exception: + self.conn.commit() + + def close_conn(self): + if self.conn and self.cur: + self.cur.close() + self.conn.close() + + +def green(text): + return '\033[32m%s\033[0m' % text + + +def print_header_boundary(header): + # Output a header first, which looks more beautiful. 
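+    # fall back to zero side padding when stdout is not attached to a terminal
+    # (os.get_terminal_size() raises OSError there, or AttributeError on older Python)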
+ try: + term_width = os.get_terminal_size().columns + # The width of each of the two sides of the terminal. + side_width = (term_width - len(header)) // 2 + except (AttributeError, OSError): + side_width = 0 + title = SHARP * side_width + header + SHARP * side_width + print(green(title)) + + +def filter_low_benefit(pos_list, candidate_indexes, multi_iter_mode, workload): + remove_list = [] + for key, index in enumerate(candidate_indexes): + sql_optimzed = 0 + for ind, pos in enumerate(index.positive_pos): + if multi_iter_mode: + cost_list_pos = index.atomic_pos + else: + cost_list_pos = pos_list[key] + 1 + sql_optimzed += 1 - workload[pos].cost_list[cost_list_pos] / workload[pos].cost_list[0] + negative_ratio = (index.insert_sql_num + index.delete_sql_num + index.update_sql_num) / \ + index.total_sql_num + # filter the candidate indexes that do not meet the conditions of optimization + if sql_optimzed / len(index.positive_pos) < 0.1: + remove_list.append(key) + elif sql_optimzed / len(index.positive_pos) < NEGATIVE_RATIO_THRESHOLD < negative_ratio: + remove_list.append(key) + for item in sorted(remove_list, reverse=True): + candidate_indexes.pop(item) + + +def display_recommend_result(workload, candidate_indexes, index_cost_total, + multi_iter_mode, display_info): + cnt = 0 + index_current_storage = 0 + pos_list = [] + if not multi_iter_mode: + pos_list = [item[0] for item in candidate_indexes] + candidate_indexes = [item[1] for item in candidate_indexes] + # filter candidate indexes with low benefit + filter_low_benefit(pos_list, candidate_indexes, multi_iter_mode, workload) + # display determine result + for key, index in enumerate(candidate_indexes): + if MAX_INDEX_STORAGE and (index_current_storage + index.storage) > MAX_INDEX_STORAGE: + continue + if MAX_INDEX_NUM and cnt == MAX_INDEX_NUM: + break + index_current_storage += index.storage + table_name = index.table.split('.')[-1] + index_name = 'idx_' + table_name + '_' + '_'.join(index.columns.split(', ')) + statement = 'CREATE INDEX ' + index_name + ' ON ' + index.table + '(' + index.columns + ');' + print(statement) + cnt += 1 + if multi_iter_mode: + cost_list_pos = index.atomic_pos + else: + cost_list_pos = pos_list[key] + 1 + + sql_info = {'sqlDetails': []} + benefit_types = [index.ineffective_pos, index.positive_pos, index.negative_pos] + for category, benefit_type in enumerate(benefit_types): + sql_count = 0 + for item in benefit_type: + sql_count += workload[item].frequency + for ind, pos in enumerate(benefit_type): + sql_detail = {} + sql_template = workload[pos].statement + for pattern in SQL_DISPLAY_PATTERN: + sql_template = re.sub(pattern, '?', sql_template) + + sql_detail['sqlTemplate'] = sql_template + sql_detail['sql'] = workload[pos].statement + sql_detail['sqlCount'] = sql_count + if category == 1: + sql_optimzed = (workload[pos].cost_list[0] - + workload[pos].cost_list[cost_list_pos]) / \ + workload[pos].cost_list[cost_list_pos] + sql_detail['optimized'] = '%.3f' % sql_optimzed + sql_detail['correlationType'] = category + sql_info['sqlDetails'].append(sql_detail) + workload_optimized = (1 - index_cost_total[cost_list_pos] / index_cost_total[0]) * 100 + sql_info['workloadOptimized'] = '%.2f' % (workload_optimized if workload_optimized > 1 else 1) + sql_info['schemaName'] = SCHEMA + sql_info['tbName'] = table_name + sql_info['columns'] = index.columns + sql_info['statement'] = statement + sql_info['dmlCount'] = round(index.total_sql_num) + sql_info['selectRatio'] = round(index.select_sql_num * 100 / 
index.total_sql_num, 2) + sql_info['insertRatio'] = round(index.insert_sql_num * 100 / index.total_sql_num, 2) + sql_info['deleteRatio'] = round(index.delete_sql_num * 100 / index.total_sql_num, 2) + sql_info['updateRatio'] = round(100 - sql_info['selectRatio'] - sql_info['insertRatio'] + - sql_info['deleteRatio'], 2) + display_info['recommendIndexes'].append(sql_info) + return display_info + + +def record_redundant_indexes(cur_table_indexes, redundant_indexes): + cur_table_indexes = sorted(cur_table_indexes, + key=lambda index_obj: len(index_obj.columns.split(','))) + # record redundant indexes + has_restore = [] + for pos, index in enumerate(cur_table_indexes[:-1]): + is_redundant = False + for candidate_index in cur_table_indexes[pos + 1:]: + if 'UNIQUE INDEX' in index.indexdef: + # ensure that UNIQUE INDEX will not become redundant compared to normal index + if 'UNIQUE INDEX' not in candidate_index.indexdef: + continue + # ensure redundant index not is pkey + elif index.primary_key: + if re.match(r'%s' % candidate_index.columns, index.columns): + candidate_index.redundant_obj.append(index) + redundant_indexes.append(candidate_index) + has_restore.append(candidate_index) + continue + if re.match(r'%s' % index.columns, candidate_index.columns): + is_redundant = True + index.redundant_obj.append(candidate_index) + if is_redundant and index not in has_restore: + redundant_indexes.append(index) + + +def check_useless_index(tables, db): + whole_indexes = list() + redundant_indexes = list() + if not tables: + return whole_indexes, redundant_indexes + tables_string = ','.join(["'%s'" % table for table in tables[SCHEMA]]) + sql = "SELECT c.relname AS tablename, i.relname AS indexname, " \ + "pg_get_indexdef(i.oid) AS indexdef, p.contype AS pkey from " \ + "pg_index x JOIN pg_class c ON c.oid = x.indrelid JOIN " \ + "pg_class i ON i.oid = x.indexrelid LEFT JOIN pg_namespace n " \ + "ON n.oid = c.relnamespace LEFT JOIN pg_constraint p ON i.oid = p.conindid" \ + "WHERE (c.relkind = ANY (ARRAY['r'::\"char\", 'm'::\"char\"])) AND " \ + "(i.relkind = ANY (ARRAY['i'::\"char\", 'I'::\"char\"])) AND " \ + "n.nspname = '%s' AND c.relname in (%s) order by c.relname;" % (SCHEMA, tables_string) + + res = db.execute(sql) + if res: + cur_table_indexes = list() + for item in res: + cur_columns = re.search(r'\(([^\(\)]*)\)', item[2]).group(1) + cur_index_obj = IndexInfo(SCHEMA, item[0], item[1], cur_columns, item[2]) + if item[3]: + cur_index_obj.primary_key = True + whole_indexes.append(cur_index_obj) + if cur_table_indexes and cur_table_indexes[-1].table != item[0]: + record_redundant_indexes(cur_table_indexes, redundant_indexes) + cur_table_indexes = [] + cur_table_indexes.append(cur_index_obj) + if cur_table_indexes: + record_redundant_indexes(cur_table_indexes, redundant_indexes) + + return whole_indexes, redundant_indexes + + +def get_whole_index(tables, db, detail_info): + db.init_conn_handle() + whole_index, redundant_indexes = check_useless_index(tables, db) + db.close_conn() + print_header_boundary(" Created indexes ") + detail_info['createdIndexes'] = [] + if not whole_index: + print("No created index!") + else: + for index in whole_index: + index_info = {'schemaName': index.schema, 'tbName': index.table, + 'columns': index.columns, 'statement': index.indexdef + ';'} + detail_info['createdIndexes'].append(index_info) + print("%s;" % index.indexdef) + return whole_index, redundant_indexes + + +def check_unused_index_workload(whole_indexes, redundant_indexes, workload_indexes, detail_info): + 
indexes_name = set(index.indexname for index in whole_indexes) + unused_index = list(indexes_name.difference(workload_indexes)) + remove_list = [] + print_header_boundary(" Current workload useless indexes ") + if not unused_index: + print("No useless index!") + detail_info['uselessIndexes'] = [] + # useless index + unused_index_columns = dict() + for cur_index in unused_index: + for index in whole_indexes: + if cur_index == index.indexname: + unused_index_columns[cur_index] = index.columns + if 'UNIQUE INDEX' not in index.indexdef: + statement = "DROP INDEX %s;" % index.indexname + print(statement) + useless_index = {"schemaName": index.schema, "tbName": index.table, "type": 1, + "columns": index.columns, "statement": statement} + detail_info['uselessIndexes'].append(useless_index) + print_header_boundary(" Redundant indexes ") + # filter redundant index + for pos, index in enumerate(redundant_indexes): + is_redundant = False + for redundant_obj in index.redundant_obj: + # redundant objects are not in the useless index set or + # equal to the column value in the useless index must be redundant index + index_exist = redundant_obj.indexname not in unused_index_columns.keys() or \ + (unused_index_columns.get(redundant_obj.indexname) and + redundant_obj.columns == unused_index_columns[redundant_obj.indexname]) + if index_exist: + is_redundant = True + if not is_redundant: + remove_list.append(pos) + for item in sorted(remove_list, reverse=True): + redundant_indexes.pop(item) + + if not redundant_indexes: + print("No redundant index!") + # redundant index + for index in redundant_indexes: + statement = "DROP INDEX %s;" % index.indexname + print(statement) + existing_index = [item.indexname + ':' + item.columns for item in index.redundant_obj] + redundant_index = {"schemaName": index.schema, "tbName": index.table, "type": 2, + "columns": index.columns, "statement": statement, "existingIndex": existing_index} + detail_info['uselessIndexes'].append(redundant_index) + + +def load_workload(file_path): + wd_dict = {} + workload = [] + global BLANK + with open(file_path, 'r') as file: + raw_text = ''.join(file.readlines()) + sqls = raw_text.split(';') + for sql in sqls: + if any(tp in sql.lower() for tp in SQL_TYPE): + TWO_BLANKS = BLANK * 2 + while TWO_BLANKS in sql: + sql = sql.replace(TWO_BLANKS, BLANK) + if sql not in wd_dict.keys(): + wd_dict[sql] = 1 + else: + wd_dict[sql] += 1 + for sql, freq in wd_dict.items(): + workload.append(QueryItem(sql, freq)) + + return workload + + +def get_workload_template(workload): + templates = {} + placeholder = r'@@@' + + for item in workload: + sql_template = item.statement + for pattern in SQL_PATTERN: + sql_template = re.sub(pattern, placeholder, sql_template) + if sql_template not in templates: + templates[sql_template] = {} + templates[sql_template]['cnt'] = 0 + templates[sql_template]['samples'] = [] + templates[sql_template]['cnt'] += item.frequency + # reservoir sampling + if len(templates[sql_template]['samples']) < SAMPLE_NUM: + templates[sql_template]['samples'].append(item.statement) + else: + if random.randint(0, templates[sql_template]['cnt']) < SAMPLE_NUM: + templates[sql_template]['samples'][random.randint(0, SAMPLE_NUM - 1)] = \ + item.statement + + return templates + + +def workload_compression(input_path): + total_num = 0 + compressed_workload = [] + if JSON_TYPE: + with open(input_path, 'r') as file: + templates = json.load(file) + else: + workload = load_workload(input_path) + templates = get_workload_template(workload) + + for _, elem in 
templates.items(): + for sql in elem['samples']: + compressed_workload.append(QueryItem(sql.strip('\n'), + elem['cnt'] / len(elem['samples']))) + total_num += elem['cnt'] + return compressed_workload, total_num + + +# parse the explain plan to get estimated cost by database optimizer +def parse_explain_plan(plan, index_config, ori_indexes_name): + cost_total = -1 + cost_flag = True + for item in plan: + if '(cost=' in item[0] and cost_flag: + cost_flag = False + pattern = re.compile(r'\(cost=([^\)]*)\)', re.S) + matched_res = re.search(pattern, item[0]) + if matched_res: + cost_list = matched_res.group(1).split() + if len(cost_list) == 3: + cost_total = float(cost_list[0].split('..')[-1]) + if 'Index' in item[0] and 'Scan' in item[0] and not index_config: + ind1, ind2 = re.search(r'Index.*Scan(.*)on ([^\s]+)', + item[0].strip(), re.IGNORECASE).groups() + if ind1.strip(): + ori_indexes_name.add(ind1.strip().split(' ')[1]) + else: + ori_indexes_name.add(ind2) + break + + return cost_total + + +def update_index_storage(index_id, index_config, hypo_index_num, db): + index_size_sql = 'select * from hypopg_estimate_size(%s);' % index_id + res = db.execute(index_size_sql) + if res: + index_config[hypo_index_num].storage = float(res[0][0]) / 1024 / 1024 + + +def estimate_workload_cost_file(workload, db, index_config=None, ori_indexes_name=None): + total_cost = 0 + hypo_index_num = 0 + is_computed = False + db.execute('SET current_schema = %s' % SCHEMA) + if index_config: + if len(index_config) == 1 and index_config[0].positive_pos: + is_computed = True + # create hypo-indexes + db.execute('SET enable_hypo_index = on') + for index in index_config: + res = db.execute("SELECT * from hypopg_create_index('CREATE INDEX ON %s(%s)')" % + (index.table, index.columns)) + if MAX_INDEX_STORAGE and res: + update_index_storage(res[0][0], index_config, hypo_index_num, db) + hypo_index_num += 1 + if ENABLE_MULTI_NODE: + db.execute('set enable_fast_query_shipping = off;set enable_stream_operator = on') + db.execute("set explain_perf_mode = 'normal'") + + remove_list = [] + for ind, query in enumerate(workload): + # record ineffective sql and negative sql for candidate indexes + if is_computed: + record_ineffective_negative_sql(index_config[0], query, ind) + if 'select ' not in query.statement.lower(): + workload[ind].cost_list.append(0) + else: + res = db.execute('EXPLAIN ' + query.statement) + if res: + query_cost = parse_explain_plan(res, index_config, ori_indexes_name) + query_cost *= workload[ind].frequency + workload[ind].cost_list.append(query_cost) + total_cost += query_cost + if index_config: + db.execute('SELECT hypopg_reset_index()') + return total_cost + + +def make_single_advisor_sql(ori_sql): + sql = 'set current_schema = %s; select gs_index_advise(\'' % SCHEMA + ori_sql = ori_sql.replace('"', '\'') + for elem in ori_sql: + if elem == '\'': + sql += '\'' + sql += elem + sql += '\');' + + return sql + + +def parse_single_advisor_result(res, workload_table_name): + table_index_dict = {} + items = res.strip('()').split(',', 1) + if len(items) == 2: + table = items[0] + workload_table_name[SCHEMA] = workload_table_name.get(SCHEMA, set()) + workload_table_name[SCHEMA].add(table) + indexes = re.split('[()]', items[1].strip('\"')) + for columns in indexes: + if columns == '': + continue + if table not in table_index_dict.keys(): + table_index_dict[table] = [] + table_index_dict[table].append(columns) + + return table_index_dict + + +# call the single-index-advisor in the database +def 
query_index_advisor(query, workload_table_name, db): + table_index_dict = {} + + if 'select' not in query.lower(): + return table_index_dict + + sql = make_single_advisor_sql(query) + result = db.execute(sql=sql) + if not result: + return table_index_dict + for res in result: + table_index_dict.update(parse_single_advisor_result(res[0], workload_table_name)) + + return table_index_dict + + +# judge whether the index is used by the optimizer +def query_index_check(query, query_index_dict, db): + valid_indexes = {} + if len(query_index_dict) == 0: + return valid_indexes + + # create hypo-indexes + sqls = 'SET enable_hypo_index = on;' + if ENABLE_MULTI_NODE: + sqls += 'SET enable_fast_query_shipping = off;SET enable_stream_operator = on;' + for table in query_index_dict.keys(): + for columns in query_index_dict[table]: + if columns != '': + sqls += "SELECT hypopg_create_index('CREATE INDEX ON %s(%s)');" % \ + (table, columns) + sqls += 'SELECT * from hypopg_display_index();' + result = db.execute(sqls) + if not result: + return valid_indexes + hypoid_table_column = {} + for item in result: + if len(item) == 4: + hypoid_table_column[str(item[1])] = item[2] + ':' + item[3].strip('()') + sqls = "SET explain_perf_mode = 'normal'; explain %s" % query + result = db.execute(sqls) + if not result: + return valid_indexes + # parse the result of explain plan + for item in result: + if 'Index' in item[0] and 'Scan' in item[0] and 'btree' in item[0]: + tokens = item[0].split(' ') + for token in tokens: + if 'btree' in token: + hypo_index_id = re.search(r'\d+', token.split('_', 1)[0]).group() + table_columns = hypoid_table_column.get(hypo_index_id) + if not table_columns: + continue + table_name, columns = table_columns.split(':') + if table_name not in valid_indexes.keys(): + valid_indexes[table_name] = [] + if columns not in valid_indexes[table_name]: + valid_indexes[table_name].append(columns) + db.execute('SELECT hypopg_reset_index()') + return valid_indexes + + +# enumerate the column combinations for a suggested index +def get_indexable_columns(table_index_dict): + query_indexable_columns = {} + if len(table_index_dict) == 0: + return query_indexable_columns + + for table in table_index_dict.keys(): + query_indexable_columns[table] = [] + for columns in table_index_dict[table]: + indexable_columns = columns.split(',') + for column in indexable_columns: + query_indexable_columns[table].append(column) + + return query_indexable_columns + + +def generate_candidate_indexes(workload, workload_table_name, db): + candidate_indexes = [] + index_dict = {} + db.init_conn_handle() + for k, query in enumerate(workload): + if 'select ' in query.statement.lower(): + table_index_dict = query_index_advisor(query.statement, workload_table_name, db) + need_check = False + query_indexable_columns = get_indexable_columns(table_index_dict) + valid_index_dict = query_index_check(query.statement, query_indexable_columns, db) + + for i in range(MAX_INDEX_COLUMN_NUM): + for table in valid_index_dict.keys(): + for columns in valid_index_dict[table]: + if columns.count(',') == i: + need_check = True + for single_column in query_indexable_columns[table]: + if single_column not in columns: + valid_index_dict[table].append(columns + ',' + single_column) + if need_check: + valid_index_dict = query_index_check(query.statement, valid_index_dict, db) + need_check = False + else: + break + + # filter duplicate indexes + for table in valid_index_dict.keys(): + if table not in index_dict.keys(): + index_dict[table] = {} + for columns 
in valid_index_dict[table]: + if len(workload[k].valid_index_list) >= FULL_ARRANGEMENT_THRESHOLD: + break + workload[k].valid_index_list.append(IndexItem(table, columns)) + if not any(re.match(r'%s' % columns, item) for item in index_dict[table]): + column_sql = {columns: [k]} + index_dict[table].update(column_sql) + elif columns in index_dict[table].keys(): + index_dict[table][columns].append(k) + for table, column_sqls in index_dict.items(): + for column, sql in column_sqls.items(): + print("table: ", table, "columns: ", column) + candidate_indexes.append(IndexItem(table, column, sql)) + db.close_conn() + return candidate_indexes + + +def get_atomic_config_for_query(indexes, config, ind, atomic_configs): + if ind == len(indexes): + table_count = {} + for index in config: + if index.table not in table_count.keys(): + table_count[index.table] = 1 + else: + table_count[index.table] += 1 + if len(table_count) > 2 or table_count[index.table] > 2: + return + atomic_configs.append(config) + + return + + get_atomic_config_for_query(indexes, copy.copy(config), ind + 1, atomic_configs) + config.append(indexes[ind]) + get_atomic_config_for_query(indexes, copy.copy(config), ind + 1, atomic_configs) + + +def is_same_config(config1, config2): + if len(config1) != len(config2): + return False + + for index1 in config1: + is_found = False + for index2 in config2: + if index1.table == index2.table and index1.columns == index2.columns: + is_found = True + if not is_found: + return False + + return True + + +def generate_atomic_config(workload): + atomic_config_total = [] + + for query in workload: + if len(query.valid_index_list) == 0: + continue + + atomic_configs = [] + config = [] + get_atomic_config_for_query(query.valid_index_list, config, 0, atomic_configs) + + is_found = False + for new_config in atomic_configs: + for exist_config in atomic_config_total: + if is_same_config(new_config, exist_config): + is_found = True + break + if not is_found: + atomic_config_total.append(new_config) + is_found = False + + return atomic_config_total + + +# find the subsets of a given config in the atomic configs +def find_subsets_num(config, atomic_config_total): + atomic_subsets_num = [] + is_exist = False + + for i, atomic_config in enumerate(atomic_config_total): + if len(atomic_config) > len(config): + continue + # Record the atomic index position of the newly added index + if len(atomic_config) == 1 and atomic_config[0].table == config[-1].table and \ + atomic_config[0].columns == config[-1].columns: + cur_index_atomic_pos = i + for atomic_index in atomic_config: + is_exist = False + for index in config: + if atomic_index.table == index.table and atomic_index.columns == index.columns: + index.storage = atomic_index.storage + is_exist = True + break + if not is_exist: + break + if is_exist: + atomic_subsets_num.append(i) + + return atomic_subsets_num, cur_index_atomic_pos + + +def get_index_num(index, atomic_config_total): + for i, atomic_config in enumerate(atomic_config_total): + if len(atomic_config) == 1 and atomic_config[0].table == index.table and \ + atomic_config[0].columns == index.columns: + return i + + return -1 + + +def record_ineffective_negative_sql(candidate_index, obj, ind): + cur_table = candidate_index.table + ' ' + if cur_table in obj.statement: + candidate_index.total_sql_num += obj.frequency + if 'insert ' in obj.statement.lower(): + candidate_index.insert_sql_num += obj.frequency + candidate_index.negative_pos.append(ind) + elif 'delete ' in obj.statement.lower(): + 
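+            # a DELETE on the table covered by the candidate index is recorded as negative workload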
candidate_index.delete_sql_num += obj.frequency + candidate_index.negative_pos.append(ind) + elif 'update ' in obj.statement.lower(): + candidate_index.update_sql_num += obj.frequency + if any(column in obj.statement.lower().split('where ', 1)[0] for column in + candidate_index.columns.split(',')): + candidate_index.negative_pos.append(ind) + else: + candidate_index.ineffective_pos.append(ind) + else: + candidate_index.select_sql_num += obj.frequency + if ind not in candidate_index.positive_pos and \ + any(column in obj.statement.lower() for column in candidate_index.columns): + candidate_index.ineffective_pos.append(ind) + + +# infer the total cost of workload for a config according to the cost of atomic configs +def infer_workload_cost(workload, config, atomic_config_total): + total_cost = 0 + is_computed = False + atomic_subsets_num, cur_index_atomic_pos = find_subsets_num(config, atomic_config_total) + if len(atomic_subsets_num) == 0: + raise ValueError("No atomic configs found for current config!") + if not config[-1].total_sql_num: + is_computed = True + for ind, obj in enumerate(workload): + if max(atomic_subsets_num) >= len(obj.cost_list): + raise ValueError("Wrong atomic config for current query!") + # compute the cost for selection + min_cost = obj.cost_list[0] + for num in atomic_subsets_num: + if num < len(obj.cost_list) and obj.cost_list[num] < min_cost: + min_cost = obj.cost_list[num] + total_cost += min_cost + + # record ineffective sql and negative sql for candidate indexes + if is_computed: + record_ineffective_negative_sql(config[-1], obj, ind) + return total_cost, cur_index_atomic_pos + + +def simple_index_advisor(input_path, max_index_num, db): + workload, workload_count = workload_compression(input_path) + print_header_boundary(" Generate candidate indexes ") + ori_indexes_name = set() + workload_table_name = dict() + display_info = {'workloadCount': workload_count, 'recommendIndexes': []} + candidate_indexes = generate_candidate_indexes(workload, workload_table_name, db) + db.init_conn_handle() + if len(candidate_indexes) == 0: + print("No candidate indexes generated!") + estimate_workload_cost_file(workload, db, ori_indexes_name=ori_indexes_name) + return ori_indexes_name, workload_table_name, display_info + + print_header_boundary(" Determine optimal indexes ") + ori_total_cost = estimate_workload_cost_file(workload, db, ori_indexes_name=ori_indexes_name) + index_cost_total = [ori_total_cost] + for _, obj in enumerate(candidate_indexes): + new_total_cost = estimate_workload_cost_file(workload, db, [obj]) + index_cost_total.append(new_total_cost) + obj.benefit = ori_total_cost - new_total_cost + db.close_conn() + candidate_indexes = sorted(enumerate(candidate_indexes), + key=lambda item: item[1].benefit, reverse=True) + candidate_indexes = [item for item in candidate_indexes if item[1].benefit > 0] + global MAX_INDEX_NUM + MAX_INDEX_NUM = max_index_num + + display_recommend_result(workload, candidate_indexes, index_cost_total, False, display_info) + return ori_indexes_name, workload_table_name, display_info + + +def greedy_determine_opt_config(workload, atomic_config_total, candidate_indexes, origin_sum_cost): + opt_config = [] + index_num_record = set() + min_cost = origin_sum_cost + for i in range(len(candidate_indexes)): + if i == 1 and min_cost == origin_sum_cost: + break + cur_min_cost = origin_sum_cost + cur_index = None + cur_index_num = -1 + for k, index in enumerate(candidate_indexes): + if k in index_num_record: + continue + cur_config = 
copy.copy(opt_config) + cur_config.append(index) + cur_estimated_cost, cur_index_atomic_pos = \ + infer_workload_cost(workload, cur_config, atomic_config_total) + if cur_estimated_cost < cur_min_cost: + cur_min_cost = cur_estimated_cost + cur_index = index + cur_index.atomic_pos = cur_index_atomic_pos + cur_index_num = k + if cur_index and cur_min_cost < min_cost: + if MAX_INDEX_STORAGE and sum([obj.storage for obj in opt_config]) + \ + cur_index.storage > MAX_INDEX_STORAGE: + continue + if len(opt_config) == MAX_INDEX_NUM: + break + min_cost = cur_min_cost + opt_config.append(cur_index) + index_num_record.add(cur_index_num) + else: + break + + return opt_config + + +def complex_index_advisor(input_path, db): + workload, workload_count = workload_compression(input_path) + print_header_boundary(" Generate candidate indexes ") + ori_indexes_name = set() + workload_table_name = dict() + display_info = {'workloadCount': workload_count, 'recommendIndexes': []} + candidate_indexes = generate_candidate_indexes(workload, workload_table_name, db) + db.init_conn_handle() + if len(candidate_indexes) == 0: + print("No candidate indexes generated!") + estimate_workload_cost_file(workload, db, ori_indexes_name=ori_indexes_name) + return ori_indexes_name, workload_table_name, display_info + + print_header_boundary(" Determine optimal indexes ") + atomic_config_total = generate_atomic_config(workload) + if atomic_config_total and len(atomic_config_total[0]) != 0: + raise ValueError("The empty atomic config isn't generated!") + index_cost_total = [] + for atomic_config in atomic_config_total: + index_cost_total.append(estimate_workload_cost_file(workload, db, atomic_config, + ori_indexes_name)) + db.close_conn() + opt_config = greedy_determine_opt_config(workload, atomic_config_total, + candidate_indexes, index_cost_total[0]) + + display_recommend_result(workload, opt_config, index_cost_total, True, display_info) + return ori_indexes_name, workload_table_name, display_info + + +def main(): + arg_parser = argparse.ArgumentParser(description='Generate index set for workload.') + arg_parser.add_argument("p", help="Port of database") + arg_parser.add_argument("d", help="Name of database") + arg_parser.add_argument("--h", help="Host for database") + arg_parser.add_argument("-U", help="Username for database log-in") + arg_parser.add_argument("-W", help="Password for database user", nargs="?", action=PwdAction) + arg_parser.add_argument("f", help="File containing workload queries (One query per line)") + arg_parser.add_argument("--schema", help="Schema name for the current business data", required=True) + arg_parser.add_argument("--max_index_num", help="Maximum number of suggested indexes", type=int) + arg_parser.add_argument("--max_index_storage", + help="Maximum storage of suggested indexes/MB", type=int) + arg_parser.add_argument("--multi_iter_mode", action='store_true', + help="Whether to use multi-iteration algorithm", default=False) + arg_parser.add_argument("--multi_node", action='store_true', + help="Whether to support distributed scenarios", default=False) + arg_parser.add_argument("--json", action='store_true', + help="Whether the workload file format is json", default=False) + arg_parser.add_argument("--show_detail", action='store_true', + help="Whether to show detailed sql information", default=False) + args = arg_parser.parse_args() + + global MAX_INDEX_NUM, BASE_CMD, ENABLE_MULTI_NODE, MAX_INDEX_STORAGE, SCHEMA, JSON_TYPE + if args.max_index_num is not None and args.max_index_num <= 0: + raise 
argparse.ArgumentTypeError("%s is an invalid positive int value" % + args.max_index_num) + if args.max_index_storage is not None and args.max_index_storage <= 0: + raise argparse.ArgumentTypeError("%s is an invalid positive int value" % + args.max_index_storage) + SCHEMA = args.schema + JSON_TYPE = args.json + MAX_INDEX_NUM = args.max_index_num or 10 + ENABLE_MULTI_NODE = args.multi_node + MAX_INDEX_STORAGE = args.max_index_storage + BASE_CMD = 'gsql -p ' + args.p + ' -d ' + args.d + if args.h: + BASE_CMD += ' -h ' + args.h + if args.U: + BASE_CMD += ' -U ' + args.U + if args.U != getpass.getuser() and not args.W: + raise ValueError('Enter the \'-W\' parameter for user ' + + args.U + ' when executing the script.') + if args.W: + BASE_CMD += ' -W ' + args.W + # Initialize the connection + db = DatabaseConn(args.d, args.U, args.W, args.h, args.p) + + if args.multi_iter_mode: + workload_indexes, tables, detail_info = complex_index_advisor(args.f, db) + else: + workload_indexes, tables, detail_info = simple_index_advisor(args.f, args.max_index_num, db) + + whole_indexes, redundant_indexes = get_whole_index(tables, db, detail_info) + # check the unused indexes of the current workload based on the whole index + check_unused_index_workload(whole_indexes, redundant_indexes, workload_indexes, detail_info) + if args.show_detail: + print_header_boundary(" Display detail information ") + sql_info = json.dumps(detail_info, indent=4, separators=(',', ':')) + print(sql_info) + + +if __name__ == '__main__': + main() + diff --git a/src/gausskernel/dbmind/tools/index_advisor/index_server.py b/src/gausskernel/dbmind/tools/index_advisor/index_server.py new file mode 100644 index 000000000..9c8834101 --- /dev/null +++ b/src/gausskernel/dbmind/tools/index_advisor/index_server.py @@ -0,0 +1,373 @@ +try: + import sys + import os + import argparse + import logging + import time + import signal + import re + import select + import urllib + import json + import datetime + from urllib.request import Request + from subprocess import Popen, PIPE + from threading import Thread, Event, Timer + from configparser import ConfigParser + from logging import handlers +except ImportError as err: + sys.exit("index_server.py: Failed to import module: %s." % str(err)) + + +current_dirname = os.path.dirname(os.path.realpath(__file__)) +__description__ = 'index advise: index server tool.' + + +class RepeatTimer(Thread): + """ + This class inherits from threading.Thread, it is used for periodic execution + function at a specified time interval. + """ + + def __init__(self, interval, function, *args, **kwargs): + Thread.__init__(self) + self._interval = interval + self._function = function + self._args = args + self._kwargs = kwargs + self._finished = Event() + + def run(self): + while not self._finished.is_set(): + # Execute first, wait later. 
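+            # i.e. invoke the wrapped function immediately, then block on the event for
+            # `interval` seconds; wait() returns early once cancel() sets the event and
+            # the loop condition ends the thread.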
+            self._function(*self._args, **self._kwargs)
+            self._finished.wait(self._interval)
+        self._finished.set()
+
+    def cancel(self):
+        self._finished.set()
+
+
+class CreateLogger:
+    def __init__(self, level, log_name):
+        self.level = level
+        self.log_name = log_name
+
+    def create_log(self, log_path):
+        logger = logging.getLogger(self.log_name)
+        log_path = os.path.join(os.path.dirname(log_path), 'log')
+        # if a regular file occupies the log directory path, remove it and
+        # make sure the log directory exists
+        if os.path.isfile(log_path):
+            os.remove(log_path)
+        if not os.path.exists(log_path):
+            os.makedirs(log_path)
+        agent_handler = handlers.RotatingFileHandler(filename=os.path.join(log_path, self.log_name),
+                                                     maxBytes=1024 * 1024 * 100,
+                                                     backupCount=5)
+        agent_handler.setFormatter(logging.Formatter(
+            "[%(asctime)s %(levelname)s]-[%(filename)s][%(lineno)d]: %(message)s"))
+        logger.addHandler(agent_handler)
+        logger.setLevel(getattr(logging, self.level.upper())
+                        if hasattr(logging, self.level.upper()) else logging.INFO)
+        return logger
+
+
+class IndexServer:
+    def __init__(self, pid_file, logger, password, **kwargs):
+        self.pid_file = pid_file
+        self.logger = logger
+        self.password = password
+        self._kwargs = kwargs
+
+    def check_proc_exist(self, proc_name):
+        """
+        Check whether a process matching proc_name exists.
+        :param proc_name: process name to search for
+        :return: pids of the matched processes separated by spaces, or an empty string
+        """
+        check_proc = "ps ux | grep '%s' | grep -v grep | grep -v nohup | awk \'{print $2}\'" % proc_name
+        _, std = self.execute_cmd(check_proc)
+        current_pid = str(os.getpid())
+        pid_list = [pid for pid in std.split("\n") if pid and pid != current_pid]
+        if not pid_list:
+            return ""
+        return " ".join(pid_list)
+
+    def execute_cmd(self, cmd):
+        """
+        Execute a command through the shell.
+        :param cmd: command string
+        :return: tuple of (return code, decoded stdout)
+        """
+        proc = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True)
+        std, err_msg = proc.communicate()
+        if proc.returncode != 0:
+            self.logger.error("Failed to execute command. Error: %s."
% str(err_msg))
+        return proc.returncode, std.decode()
+
+    def save_recommendation_infos(self, recommendation_infos):
+        headers = {'Content-Type': 'application/json'}
+        data = json.dumps(recommendation_infos, default=lambda o: o.__dict__, sort_keys=True,
+                          indent=4).encode()
+        request = Request(url=self._kwargs['ai_monitor_url'], headers=headers,
+                          data=data)
+
+        response = None
+        try:
+            response = urllib.request.urlopen(request, timeout=600)
+            result = json.loads(response.read())
+        finally:
+            if response:
+                response.close()
+
+        return result
+
+    def convert_output_to_recommendation_infos(self, sql_lines):
+        detail_info_pos = 0
+        index_info = sql_lines.splitlines()
+        for pos, line in enumerate(index_info):
+            if 'Display detail information' in line:
+                detail_info_pos = pos + 1
+                break
+        detail_info_json = json.loads('\n'.join(index_info[detail_info_pos:]))
+        detail_info_json['appName'] = self._kwargs.get('app_name')
+        detail_info_json['nodeHost'] = self._kwargs.get('host')
+        detail_info_json['dbName'] = self._kwargs.get('database')
+        return detail_info_json
+
+    def execute_index_advisor(self):
+        self.logger.info('Index advisor task starting.')
+        try:
+            cmd = 'echo %s | python3 %s/index_advisor_workload.py %s %s %s -U %s -W ' \
+                  '--schema %s --json --multi_iter_mode --show_detail' % (
+                      self.password, current_dirname, self._kwargs['port'], self._kwargs['database'],
+                      self._kwargs['output_sql_file'], self._kwargs['user'], self._kwargs['schema'])
+            if self._kwargs['max_index_storage']:
+                cmd += ' --max_index_storage %s ' % self._kwargs['max_index_storage']
+            if self._kwargs['max_index_num']:
+                cmd += ' --max_index_num %s ' % self._kwargs['max_index_num']
+            if self._kwargs['driver']:
+                # check whether the psycopg2 driver is available; otherwise fall back to gsql
+                try:
+                    import psycopg2
+                    cmd = cmd.replace('index_advisor_workload.py',
+                                      'index_advisor_workload_driver.py')
+                except ImportError:
+                    self.logger.warning('Driver import failed; falling back to gsql to connect to the database.')
+            # the password is piped via stdin, so log only the part of the command after the pipe
+            self.logger.info('Index advisor cmd: %s' % cmd.split('|')[-1])
+            if os.path.exists(self._kwargs['output_sql_file']):
+                _, res = self.execute_cmd(cmd)
+                detail_info_json = self.convert_output_to_recommendation_infos(res)
+
+                self.logger.info('Index advisor result: %s.'
% detail_info_json)
+                result = self.save_recommendation_infos(detail_info_json)
+                if result['status'] is not True:
+                    self.logger.error('Failed to upload the index recommendation result. Error: %s' % result['message'])
+                else:
+                    self.logger.info('Successfully uploaded the index recommendation result.')
+        except Exception as e:
+            self.logger.error(e)
+
+    def extract_log(self, start_time):
+        extract_log_cmd = 'python3 %s %s %s --start_time "%s"' % \
+                          (os.path.join(current_dirname, 'extract_log.py'),
+                           self._kwargs['pg_log_path'],
+                           self._kwargs['output_sql_file'], start_time)
+        if self._kwargs['database']:
+            extract_log_cmd += ' -d %s ' % self._kwargs['database']
+        if self._kwargs['wl_user']:
+            extract_log_cmd += ' -U %s ' % self._kwargs['wl_user']
+        if self._kwargs['sql_amount']:
+            extract_log_cmd += ' --sql_amount %s ' % self._kwargs['sql_amount']
+        if self._kwargs['statement']:
+            extract_log_cmd += ' --statement '
+        self.logger.info('Extracting log cmd: %s' % extract_log_cmd)
+        self.execute_cmd(extract_log_cmd)
+        self.logger.info('The current log extraction is complete.')
+
+    def monitor_log_size(self, guc_reset):
+        self.logger.info('Opening GUC params for statement logging.')
+        # get the original total size of the log files
+        original_total_size = self.get_directory_size()
+        self.logger.info('Original total file size: %sM' % (original_total_size / 1024 / 1024))
+        deviation_size = 0
+        # open guc
+        guc_reload = 'gs_guc reload -Z datanode -D {datanode} -c "log_min_duration_statement = 0" && ' \
+                     'gs_guc reload -Z datanode -D {datanode} -c "log_statement= \'all\'"' \
+            .format(datanode=self._kwargs['datanode'])
+        self.execute_cmd(guc_reload)
+        start_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(time.time())))
+        # calculate the generated log size until it reaches max_generate_log_size
+        count = 0
+        while deviation_size < self._kwargs['max_generate_log_size']:
+            time.sleep(5)
+            current_size = self.get_directory_size()
+            deviation_size = (current_size - original_total_size) / 1024 / 1024
+            # if the log directory shrinks (e.g. after log rotation), give up after 60 checks
+            if current_size - original_total_size < 0:
+                if count >= 60:
+                    break
+                count += 1
+            self.logger.info('Current log size difference: %sM' % deviation_size)
+        self.logger.info('Start to reset GUC, cmd: %s' % guc_reset)
+        returncode, res = self.execute_cmd(guc_reset)
+        if returncode == 0:
+            self.logger.info('Successfully reset the GUC settings.')
+        else:
+            self.logger.error('Failed to reset GUC params,
please check it.')
+        return start_time
+
+    def get_directory_size(self):
+        files = os.listdir(self._kwargs['pg_log_path'])
+        total_size = 0
+        for file in files:
+            total_size += os.path.getsize(os.path.join(self._kwargs['pg_log_path'], file))
+        return total_size
+
+    def execute_index_recommendation(self):
+        self.logger.info('Start checking GUC settings.')
+        try:
+            guc_check = 'gs_guc check -Z datanode -D {datanode} -c "log_min_duration_statement" && ' \
+                        'gs_guc check -Z datanode -D {datanode} -c "log_statement" '\
+                .format(datanode=self._kwargs['datanode'])
+            returncode, res = self.execute_cmd(guc_check)
+            origin_min_duration = self._kwargs['log_min_duration_statement']
+            origin_log_statement = self._kwargs['log_statement']
+            if returncode == 0:
+                self.logger.info('Original GUC settings are: %s' % res)
+                match_res = re.findall(r'log_min_duration_statement=(\'?[a-zA-Z0-9]+\'?)', res)
+                if match_res:
+                    origin_min_duration = match_res[-1]
+                    if 'NULL' in origin_min_duration:
+                        origin_min_duration = '30min'
+                match_res = re.findall(r'log_statement=(\'?[a-zA-Z]+\'?)', res)
+                if match_res:
+                    origin_log_statement = match_res[-1]
+                    if 'NULL' in origin_log_statement:
+                        origin_log_statement = 'none'
+            self.logger.info('Parsed (log_min_duration_statement, log_statement) GUC params are (%s, %s)' %
+                             (origin_min_duration, origin_log_statement))
+            self.logger.info('Testing the GUC reset command...')
+            guc_reset = 'gs_guc reload -Z datanode -D %s -c "log_min_duration_statement = %s" && ' \
+                        'gs_guc reload -Z datanode -D %s -c "log_statement= %s"' % \
+                        (self._kwargs['datanode'], origin_min_duration,
+                         self._kwargs['datanode'], origin_log_statement)
+            returncode, res = self.execute_cmd(guc_reset)
+            if returncode != 0:
+                guc_reset = 'gs_guc reload -Z datanode -D %s -c "log_min_duration_statement = %s" && ' \
+                            'gs_guc reload -Z datanode -D %s -c "log_statement= %s"' % \
+                            (self._kwargs['datanode'], self._kwargs['log_min_duration_statement'],
+                             self._kwargs['datanode'], self._kwargs['log_statement'])
+                ret, res = self.execute_cmd(guc_reset)
+                if ret != 0:
+                    raise Exception('Cannot reset the GUC params to their initial values, please check it.')
+            self.logger.info('The GUC reset command test succeeded.')
+            # open the GUC params and monitor the generated log size in real time
+            start_time = self.monitor_log_size(guc_reset)
+            # extract log
+            self.extract_log(start_time)
+            # index advise
+            self.execute_index_advisor()
+        except Exception as e:
+            self.logger.error(e)
+            guc_reset = 'gs_guc reload -Z datanode -D %s -c "log_min_duration_statement = %s" && ' \
+                        'gs_guc reload -Z datanode -D %s -c "log_statement= %s"' % \
+                        (self._kwargs['datanode'], self._kwargs['log_min_duration_statement'],
+                         self._kwargs['datanode'], self._kwargs['log_statement'])
+            self.execute_cmd(guc_reset)
+
+    def start_service(self):
+        # check whether the service is already running.
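+        # only a single index_server instance may run at a time; a stale pid file
+        # without a matching process is removed so the service can start again.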
+ if os.path.isfile(self.pid_file): + pid = self.check_proc_exist("index_server") + if pid: + raise Exception("Error: Process already running, can't start again.") + else: + os.remove(self.pid_file) + + # get listen host and port + self.logger.info("Start service...") + # write process pid to file + if not os.path.isdir(os.path.dirname(self.pid_file)): + os.makedirs(os.path.dirname(self.pid_file), 0o700) + with open(self.pid_file, mode='w') as f: + f.write(str(os.getpid())) + + self.logger.info("Index advisor execution intervals is: %sh" % + self._kwargs['index_intervals']) + index_recommendation_thread = RepeatTimer(self._kwargs['index_intervals']*60*60, + self.execute_index_recommendation) + self.logger.info("Start timer...") + index_recommendation_thread.start() + + +def read_input_from_pipe(): + """ + Read stdin input if there is "echo 'str1 str2' | python xx.py", + return the input string + """ + input_str = "" + r_handle, _, _ = select.select([sys.stdin], [], [], 0) + if not r_handle: + return "" + + for item in r_handle: + if item == sys.stdin: + input_str = sys.stdin.read().strip() + return input_str + + +def parse_check_conf(config_path): + config = ConfigParser() + config.read(config_path) + config_dict = dict() + config_dict['app_name'] = config.get("server", "app_name") + config_dict['database'] = config.get("server", "database") + config_dict['port'] = config.get("server", "port") + config_dict['host'] = config.get("server", "host") + config_dict['user'] = config.get("server", "user") + config_dict['wl_user'] = config.get("server", "workload_user") + config_dict['schema'] = config.get("server", "schema") + config_dict['max_index_num'] = config.getint("server", "max_index_num") + config_dict['max_index_storage'] = config.get("server", "max_index_storage") + config_dict['driver'] = config.getboolean("server", "driver") + config_dict['index_intervals'] = config.getint("server", "index_intervals") + config_dict['sql_amount'] = config.getint("server", "sql_amount") + config_dict['output_sql_file'] = config.get("server", "output_sql_file") + config_dict['datanode'] = config.get("server", "datanode") + config_dict['pg_log_path'] = config.get("server", "pg_log_path") + config_dict['ai_monitor_url'] = config.get("server", "ai_monitor_url") + config_dict['max_generate_log_size'] = config.getfloat("server", "max_generate_log_size") + config_dict['statement'] = config.getboolean("server", "statement") + config_dict['log_min_duration_statement'] = config.get("server", "log_min_duration_statement") + config_dict['log_statement'] = config.get("server", "log_statement") + if not config_dict['log_min_duration_statement'] or \ + not re.match(r'[a-zA-Z0-9]+', config_dict['log_min_duration_statement']): + raise ValueError("Please enter a legal value of [log_min_duration_statement]") + legal_log_statement = ['none', 'all', 'ddl', 'mod'] + if config_dict['log_statement'] not in legal_log_statement: + raise ValueError("Please enter a legal value of [log_statement]") + return config_dict + + +def manage_service(): + config_path = os.path.join(current_dirname, 'database-info.conf') + config_dict = parse_check_conf(config_path) + LOGGER = CreateLogger("debug", "start_service.log").create_log(config_dict.get('output_sql_file')) + server_pid_file = os.path.join(current_dirname, 'index_server.pid') + password = read_input_from_pipe() + IndexServer(server_pid_file, LOGGER, password, **config_dict).start_service() + + +def main(): + try: + manage_service() + except Exception as err_msg: + print(err_msg) + 
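+        # exit with a non-zero status so callers can detect that the service failed to start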
sys.exit(1) + + +if __name__ == '__main__': + main() + +