From 9794ae1f46abc50fa9203c50d4b861e8c941459f Mon Sep 17 00:00:00 2001
From: wangtq
Date: Mon, 26 Apr 2021 22:16:47 +0800
Subject: [PATCH] fix(dbmind): fix some bugs in the index advisor

Remove the ENABLE_MULTIPLE_NODES guards from the hypopg and advisor
entry points, harden the kernel advisor (dropped relations, multiple
table aliases, unsupported table types), and extend the workload tool
with the --schema, --max_index_storage and --multi_node options.
---
 .../dbmind/kernel/hypopg_index.cpp            |   27 +-
 .../dbmind/kernel/index_advisor.cpp           |  278 ++--
 .../dbmind/tools/index_advisor/README.md      |    5 +-
 .../index_advisor/index_advisor_workload.py   | 1367 +++++++++--------
 4 files changed, 911 insertions(+), 766 deletions(-)

diff --git a/src/gausskernel/dbmind/kernel/hypopg_index.cpp b/src/gausskernel/dbmind/kernel/hypopg_index.cpp
index af4830657..621a08ab1 100644
--- a/src/gausskernel/dbmind/kernel/hypopg_index.cpp
+++ b/src/gausskernel/dbmind/kernel/hypopg_index.cpp
@@ -1021,10 +1021,6 @@ const char *hypo_explain_get_index_name_hook(Oid indexId)
  */
 Datum hypopg_display_index(PG_FUNCTION_ARGS)
 {
-#ifdef ENABLE_MULTIPLE_NODES
-    ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("not support for distributed scenarios yet.")));
-#endif
-
     ReturnSetInfo *rsinfo = (ReturnSetInfo *)fcinfo->resultinfo;
     MemoryContext per_query_ctx;
     MemoryContext oldcontext;
@@ -1065,6 +1061,7 @@ Datum hypopg_display_index(PG_FUNCTION_ARGS)
         Datum values[HYPO_INDEX_NB_COLS];
         bool nulls[HYPO_INDEX_NB_COLS];
         StringInfoData index_columns;
+        char *rel_name = NULL;
         int i = 0;
         int keyno;
@@ -1073,9 +1070,13 @@ Datum hypopg_display_index(PG_FUNCTION_ARGS)
         rc = memset_s(nulls, sizeof(nulls), 0, sizeof(nulls));
         securec_check(rc, "\0", "\0");
 
+        rel_name = get_rel_name(entry->relid);
+        if (rel_name == NULL) {
+            break;
+        }
         values[i++] = CStringGetTextDatum(entry->indexname);
         values[i++] = ObjectIdGetDatum(entry->oid);
-        values[i++] = CStringGetTextDatum(get_rel_name(entry->relid));
+        values[i++] = CStringGetTextDatum(rel_name);
 
         initStringInfo(&index_columns);
         appendStringInfo(&index_columns, "(");
@@ -1104,10 +1105,6 @@
  */
 Datum hypopg_create_index(PG_FUNCTION_ARGS)
 {
-#ifdef ENABLE_MULTIPLE_NODES
-    ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("not support for distributed scenarios yet.")));
-#endif
-
     char *sql = TextDatumGetCString(PG_GETARG_TEXT_PP(0));
     List *parsetree_list;
     ListCell *parsetree_item;
@@ -1183,10 +1180,6 @@
  */
 Datum hypopg_drop_index(PG_FUNCTION_ARGS)
 {
-#ifdef ENABLE_MULTIPLE_NODES
-    ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("not support for distributed scenarios yet.")));
-#endif
-
     Oid indexid = PG_GETARG_OID(0);
 
     PG_RETURN_BOOL(hypo_index_remove(indexid));
@@ -1197,10 +1190,6 @@
  */
 Datum hypopg_estimate_size(PG_FUNCTION_ARGS)
 {
-#ifdef ENABLE_MULTIPLE_NODES
-    ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("not support for distributed scenarios yet.")));
-#endif
-
     BlockNumber pages;
     double tuples;
     Oid indexid = PG_GETARG_OID(0);
@@ -1228,10 +1217,6 @@
  */
 Datum hypopg_reset_index(PG_FUNCTION_ARGS)
 {
-#ifdef ENABLE_MULTIPLE_NODES
-    ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("not support for distributed scenarios yet.")));
-#endif
-
     hypo_index_reset();
     PG_RETURN_VOID();
 }
diff --git a/src/gausskernel/dbmind/kernel/index_advisor.cpp b/src/gausskernel/dbmind/kernel/index_advisor.cpp
index 89e8bcd46..333b525fe 100644
--- a/src/gausskernel/dbmind/kernel/index_advisor.cpp
+++ b/src/gausskernel/dbmind/kernel/index_advisor.cpp
@@ -31,6 +31,7 @@
 #include "catalog/indexing.h"
 #include "catalog/pg_attribute.h"
 #include "funcapi.h"
+#include "nodes/makefuncs.h"
"nodes/makefuncs.h" #include "nodes/nodes.h" #include "nodes/parsenodes.h" #include "pg_config_manual.h" @@ -67,7 +68,7 @@ typedef struct { typedef struct { char *table_name; - char *alias_name; + List *alias_name; List *index; List *join_cond; List *index_print; @@ -105,6 +106,8 @@ static void startup(DestReceiver *self, int operation, TupleDesc typeinfo); static void destroy(DestReceiver *self); static void free_global_resource(); static void find_select_stmt(Node *); +static void extract_stmt_from_clause(List *); +static void extract_stmt_where_clause(Node *); static void parse_where_clause(Node *); static void field_value_trans(_out_ char *, A_Const *); static void parse_field_expr(List *, List *, List *); @@ -114,12 +117,12 @@ static uint4 calculate_field_cardinality(const char *, const char *); static bool is_tmp_table(const char *); static char *find_field_name(List *); static char *find_table_name(List *); +static bool check_relation_type_valid(Oid); static TableCell *find_or_create_tblcell(char *, char *); static void add_index_from_field(char *, IndexCell *); static char *parse_group_clause(List *, List *); static char *parse_order_clause(List *, List *); static void add_index_from_group_order(TableCell *, List *, List *, bool); -static Oid find_table_oid(List *, const char *); static void generate_final_index(TableCell *, Oid); static void parse_from_clause(List *); static void add_drived_tables(RangeVar *); @@ -134,10 +137,6 @@ static void add_index_for_drived_tables(); Datum gs_index_advise(PG_FUNCTION_ARGS) { -#ifdef ENABLE_MULTIPLE_NODES - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("not support for distributed scenarios yet."))); -#endif - FuncCallContext *func_ctx = NULL; SuggestedIndex *array = NULL; @@ -233,6 +232,8 @@ SuggestedIndex *suggest_index(const char *query_string, _out_ int *len) } Node *parsetree = (Node *)lfirst(list_head(parse_tree_list)); + Node* parsetree_copy = (Node*)copyObject(parsetree); + (void)parse_analyze(parsetree_copy, query_string, NULL, 0); find_select_stmt(parsetree); if (!g_stmt_list) { @@ -245,9 +246,6 @@ SuggestedIndex *suggest_index(const char *query_string, _out_ int *len) foreach (item, g_stmt_list) { SelectStmt *stmt = (SelectStmt *)lfirst(item); parse_from_clause(stmt->fromClause); - /* Note: the structure JoinExpr will be modified after 'parse_analyze', so 'parse_from_clause' - should be executed first. */ - Query *query_tree = parse_analyze((Node *)stmt, query_string, NULL, 0); if (g_table_list) { parse_where_clause(stmt->whereClause); @@ -267,8 +265,9 @@ SuggestedIndex *suggest_index(const char *query_string, _out_ int *len) foreach (table_item, g_table_list) { TableCell *table = (TableCell *)lfirst(table_item); if (table->index != NIL) { - Oid table_oid = find_table_oid(query_tree->rtable, table->table_name); - if (table_oid == 0) { + RangeVar* rtable = makeRangeVar(NULL, table->table_name, -1); + Oid table_oid = RangeVarGetRelid(rtable, NoLock, true); + if (table_oid == InvalidOid) { continue; } generate_final_index(table, table_oid); @@ -277,9 +276,12 @@ SuggestedIndex *suggest_index(const char *query_string, _out_ int *len) g_driver_table = NULL; } } + if (g_table_list == NIL) { + ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("can not advise for query: %s.", query_string))); + } // Format the returned result, e.g., 'table1, "(col1,col2),(col3)"'. - int array_len = g_table_list == NIL ? 
+    int array_len = g_table_list->length;
     *len = array_len;
     SuggestedIndex *array = (SuggestedIndex *)palloc0(sizeof(SuggestedIndex) * array_len);
     errno_t rc = EOK;
@@ -299,6 +301,9 @@ SuggestedIndex *suggest_index(const char *query_string, _out_ int *len)
         ListCell *cur_index = NULL;
         int j = 0;
         foreach (cur_index, index_list) {
+            if (strlen((char *)lfirst(cur_index)) + strlen((array + i)->column) + 3 > NAMEDATALEN) {
+                continue;
+            }
             if (j > 0) {
                 rc = strcat_s((array + i)->column, NAMEDATALEN, ",(");
             } else {
@@ -551,79 +556,41 @@ void destroy(DestReceiver *self)
  */
 void find_select_stmt(Node *parsetree)
 {
-    switch (nodeTag(parsetree)) {
-        case T_SelectStmt: {
-            SelectStmt *stmt = (SelectStmt *)parsetree;
-            bool has_substmt = false;
-
-            switch (stmt->op) {
-                case SETOP_UNION: {
-                    // analyze the set operation: union
-                    has_substmt = true;
-                    find_select_stmt((Node *)stmt->larg);
-                    find_select_stmt((Node *)stmt->rarg);
-                    break;
-                }
-                case SETOP_NONE: {
-                    // analyze the 'with' clause
-                    if (stmt->withClause) {
-                        List *cte_list = stmt->withClause->ctes;
-                        has_substmt = true;
-                        ListCell *item = NULL;
-
-                        foreach (item, cte_list) {
-                            CommonTableExpr *cte = (CommonTableExpr *)lfirst(item);
-                            g_tmp_table_list = lappend(g_tmp_table_list, cte->ctename);
-                            find_select_stmt(cte->ctequery);
-                        }
-                        break;
-                    }
-
-                    // analyze the 'from' clause
-                    if (stmt->fromClause) {
-                        List *from_list = stmt->fromClause;
-                        ListCell *item = NULL;
-
-                        foreach (item, from_list) {
-                            Node *from = (Node *)lfirst(item);
-                            if (IsA(from, RangeSubselect)) {
-                                has_substmt = true;
-                                find_select_stmt(((RangeSubselect *)from)->subquery);
-                            }
-                        }
-                    }
-
-                    // analyze the 'where' clause
-                    if (stmt->whereClause) {
-                        Node *item_where = stmt->whereClause;
-                        if (IsA(item_where, SubLink)) {
-                            has_substmt = true;
-                            find_select_stmt(((SubLink *)item_where)->subselect);
-                        }
-                        while (IsA(item_where, A_Expr)) {
-                            A_Expr *expr = (A_Expr *)item_where;
-                            Node *lexpr = expr->lexpr;
-                            Node *rexpr = expr->rexpr;
-                            if (IsA(lexpr, SubLink)) {
-                                has_substmt = true;
-                                find_select_stmt(((SubLink *)lexpr)->subselect);
-                            }
-                            if (IsA(rexpr, SubLink)) {
-                                has_substmt = true;
-                                find_select_stmt(((SubLink *)rexpr)->subselect);
-                            }
-                            item_where = lexpr;
-                        }
-                    }
-
-                    if (!has_substmt) {
-                        g_stmt_list = lappend(g_stmt_list, stmt);
-                    }
-                    break;
-                }
-                default: {
-                    break;
-                }
+    if (parsetree == NULL || nodeTag(parsetree) != T_SelectStmt) {
+        return;
+    }
+    SelectStmt *stmt = (SelectStmt *)parsetree;
+    g_stmt_list = lappend(g_stmt_list, stmt);
+
+    switch (stmt->op) {
+        case SETOP_UNION: {
+            // analyze the set operation: union
+            find_select_stmt((Node *)stmt->larg);
+            find_select_stmt((Node *)stmt->rarg);
+            break;
+        }
+        case SETOP_NONE: {
+            // analyze the 'with' clause
+            if (stmt->withClause) {
+                List *cte_list = stmt->withClause->ctes;
+                ListCell *item = NULL;
+
+                foreach (item, cte_list) {
+                    CommonTableExpr *cte = (CommonTableExpr *)lfirst(item);
+                    g_tmp_table_list = lappend(g_tmp_table_list, cte->ctename);
+                    find_select_stmt(cte->ctequery);
+                }
+                break;
+            }
+
+            // analyze the 'from' clause
+            if (stmt->fromClause) {
+                extract_stmt_from_clause(stmt->fromClause);
+            }
+
+            // analyze the 'where' clause
+            if (stmt->whereClause) {
+                extract_stmt_where_clause(stmt->whereClause);
             }
             break;
         }
         default: {
             break;
         }
     }
 }
 
@@ -633,20 +600,45 @@ void find_select_stmt(Node *parsetree)
-Oid find_table_oid(List *list, const char *table_name)
-{
-    ListCell *item = NULL;
-
-    foreach (item, list) {
-        RangeTblEntry *entry = (RangeTblEntry *)lfirst(item);
-        if (entry != NULL && entry->relname != NULL) {
-            if (strcasecmp(table_name, entry->relname) == 0) {
-                return entry->relid;
-            }
-        }
-    }
-
-    return 0;
-}
+void extract_stmt_from_clause(List *from_list)
+{
+    ListCell *item = NULL;
+
+    foreach (item, from_list) {
+        Node *from = (Node *)lfirst(item);
+        if (from && IsA(from, RangeSubselect)) {
+            find_select_stmt(((RangeSubselect *)from)->subquery);
+        }
+        if (from && IsA(from, JoinExpr)) {
+            Node *larg = ((JoinExpr *)from)->larg;
+            Node *rarg = ((JoinExpr *)from)->rarg;
+            if (larg && IsA(larg, RangeSubselect)) {
+                find_select_stmt(((RangeSubselect *)larg)->subquery);
+            }
+            if (rarg && IsA(rarg, RangeSubselect)) {
+                find_select_stmt(((RangeSubselect *)rarg)->subquery);
+            }
+        }
+    }
+}
+
+void extract_stmt_where_clause(Node *item_where)
+{
+    if (IsA(item_where, SubLink)) {
+        find_select_stmt(((SubLink *)item_where)->subselect);
+    }
+    while (item_where && IsA(item_where, A_Expr)) {
+        A_Expr *expr = (A_Expr *)item_where;
+        Node *lexpr = expr->lexpr;
+        Node *rexpr = expr->rexpr;
+        if (lexpr && IsA(lexpr, SubLink)) {
+            find_select_stmt(((SubLink *)lexpr)->subselect);
+        }
+        if (rexpr && IsA(rexpr, SubLink)) {
+            find_select_stmt(((SubLink *)rexpr)->subselect);
+        }
+        item_where = lexpr;
+    }
+}
 
@@ -739,6 +731,10 @@ void parse_where_clause(Node *item_where)
         Node *rexpr = expr->rexpr;
         List *field_value = NIL;
 
+        if (lexpr == NULL || rexpr == NULL) {
+            return;
+        }
+
         switch (expr->kind) {
             case AEXPR_OP: { // normal operator
                 if (IsA(lexpr, ColumnRef) && IsA(rexpr, ColumnRef)) {
@@ -806,8 +802,13 @@ void parse_from_clause(List *from_list)
 
 void add_drived_tables(RangeVar *join_node)
 {
-    char *table_name = ((RangeVar *)join_node)->relname;
-    TableCell *join_table = find_or_create_tblcell(table_name, NULL);
+    TableCell *join_table = NULL;
+
+    if (join_node->alias) {
+        join_table = find_or_create_tblcell(join_node->relname, join_node->alias->aliasname);
+    } else {
+        join_table = find_or_create_tblcell(join_node->relname, NULL);
+    }
 
     if (!join_table) {
         return;
@@ -928,7 +929,7 @@ void parse_join_expr(JoinExpr *join_tree)
 // convert field value to string
 void field_value_trans(_out_ char *target, A_Const *field_value)
 {
-    Value value = ((A_Const *)field_value)->val;
+    Value value = field_value->val;
 
     if (value.type == T_Integer) {
         pg_itoa(value.val.ival, target);
@@ -963,6 +964,9 @@ void parse_field_expr(List *field, List *op, List *lfield_values)
     // get field values
     foreach (item, lfield_values) {
         char *str = (char *)palloc0(MAX_QUERY_LEN);
+        if (!IsA(lfirst(item), A_Const)) {
+            continue;
+        }
         field_value_trans(str, (A_Const *)lfirst(item));
         if (i == 0) {
             rc = strcpy_s(field_value, MAX_QUERY_LEN, str);
@@ -976,6 +980,12 @@ void parse_field_expr(List *field, List *op, List *lfield_values)
         pfree(str);
     }
 
+    if (i == 0) {
+        pfree(field_value);
+        pfree(field_expr);
+        return;
+    }
+
     // get field expression, e.g., 'id = 100'
     if (strcasecmp(op_type, "~~") == 0) { // ...like...
@@ -1076,21 +1086,22 @@ char *find_table_name(List *fields)
 
     char *table = NULL;
     ListCell *item = NULL;
+    ListCell *sub_item = NULL;
 
     // if fields have table name
     if (fields->length > 1) {
         table = strVal(linitial(fields));
-        // check temporary tables and existed tables
-        if (is_tmp_table(table)) {
-            free_global_resource();
-            ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("can not advise for temporary table: %s.", table)));
-        } else {
-            foreach (item, g_table_list) {
-                TableCell *cur_table = (TableCell *)lfirst(item);
-                if (strcasecmp(table, cur_table->table_name) == 0 ||
-                    (cur_table->alias_name && strcasecmp(table, cur_table->alias_name) == 0)) {
+        // check existed tables
+        foreach (item, g_table_list) {
+            TableCell *cur_table = (TableCell *)lfirst(item);
+            if (strcasecmp(table, cur_table->table_name) == 0) {
+                return cur_table->table_name;
+            }
+            foreach (sub_item, cur_table->alias_name) {
+                char *cur_alias_name = (char *)lfirst(sub_item);
+                if (strcasecmp(table, cur_alias_name) == 0) {
                     return cur_table->table_name;
-                }
+                }
             }
         }
         return NULL;
@@ -1127,7 +1138,33 @@ char *find_table_name(List *fields)
     return NULL;
 }
 
-// Check whether the table is temporary.
+// Check whether the table type is supported.
+bool check_relation_type_valid(Oid relid)
+{
+    Relation relation;
+    bool result = false;
+
+    relation = heap_open(relid, AccessShareLock);
+    if (RelationIsValid(relation) == false) {
+        heap_close(relation, AccessShareLock);
+        return result;
+    }
+    if (RelationIsRelation(relation) &&
+        RelationGetPartType(relation) == PARTTYPE_NON_PARTITIONED_RELATION &&
+        RelationGetRelPersistence(relation) == RELPERSISTENCE_PERMANENT) {
+        const char *format = ((relation->rd_options) && (((StdRdOptions *)(relation->rd_options))->orientation)) ?
+            ((char *)(relation->rd_options) + *(int *)&(((StdRdOptions *)(relation->rd_options))->orientation)) :
+            ORIENTATION_ROW;
+        if (pg_strcasecmp(format, ORIENTATION_ROW) == 0) {
+            result = true;
+        }
+    }
+
+    heap_close(relation, AccessShareLock);
+
+    return result;
+}
+
 bool is_tmp_table(const char *table_name)
 {
     ListCell *item = NULL;
@@ -1138,6 +1175,7 @@ bool is_tmp_table(const char *table_name)
             return true;
         }
     }
+
     return false;
 }
 
@@ -1152,30 +1190,54 @@ TableCell *find_or_create_tblcell(char *table_name, char *alias_name)
         return NULL;
     }
     if (is_tmp_table(table_name)) {
-        free_global_resource();
-        ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("can not advise for temporary table: %s.", table_name)));
+        ereport(WARNING, (errmsg("cannot advise for: %s.", table_name)));
+        return NULL;
     }
 
     // seach the table among existed tables
     ListCell *item = NULL;
+    ListCell *sub_item = NULL;
     if (g_table_list != NIL) {
         foreach (item, g_table_list) {
             TableCell *cur_table = (TableCell *)lfirst(item);
             char *cur_table_name = cur_table->table_name;
-            char *cur_alias_name = cur_table->alias_name;
-            if (strcasecmp(cur_table_name, table_name) == 0 ||
-                (cur_alias_name && strcasecmp(cur_alias_name, table_name) == 0)) {
+            if (strcasecmp(cur_table_name, table_name) == 0) {
+                if (alias_name) {
+                    foreach (sub_item, cur_table->alias_name) {
+                        char *cur_alias_name = (char *)lfirst(sub_item);
+                        if (strcasecmp(cur_alias_name, alias_name) == 0) {
+                            return cur_table;
+                        }
+                    }
+                    cur_table->alias_name = lappend(cur_table->alias_name, alias_name);
+                }
                 return cur_table;
             }
+            foreach (sub_item, cur_table->alias_name) {
+                char *cur_alias_name = (char *)lfirst(sub_item);
+                if (strcasecmp(cur_alias_name, table_name) == 0) {
+                    return cur_table;
+                }
+            }
         }
     }
 
+    RangeVar* rtable = makeRangeVar(NULL, table_name, -1);
+    Oid table_oid = RangeVarGetRelid(rtable, NoLock, true);
+    if (check_relation_type_valid(table_oid) == false) {
+        ereport(WARNING, (errmsg("cannot advise for: %s.", table_name)));
+        return NULL;
+    }
+
     // create a new table
     TableCell *new_table = NULL;
     new_table = (TableCell *)palloc0(sizeof(*new_table));
     new_table->table_name = table_name;
-    new_table->alias_name = alias_name;
+    new_table->alias_name = NIL;
+    if (alias_name) {
+        new_table->alias_name = lappend(new_table->alias_name, alias_name);
+    }
     new_table->index = NIL;
     new_table->join_cond = NIL;
     new_table->index_print = NIL;
diff --git a/src/gausskernel/dbmind/tools/index_advisor/README.md b/src/gausskernel/dbmind/tools/index_advisor/README.md
index 76fe85c4b..180010da1 100644
--- a/src/gausskernel/dbmind/tools/index_advisor/README.md
+++ b/src/gausskernel/dbmind/tools/index_advisor/README.md
@@ -11,5 +11,6 @@ benefit of it for the workload.
 
 ## Usage
 
-    python index_advisor_workload.py [p PORT] [d DATABASE] [f FILE] [--h HOST] [-U USERNAME] [-W PASSWORD]
-    [--max_index_num MAX_INDEX_NUM] [--multi_iter_mode]
+    python index_advisor_workload.py [p PORT] [d DATABASE] [f FILE] [--h HOST] [-U USERNAME] [-W PASSWORD] [--schema SCHEMA]
+    [--max_index_num MAX_INDEX_NUM] [--max_index_storage MAX_INDEX_STORAGE] [--multi_iter_mode] [--multi_node]
+
diff --git a/src/gausskernel/dbmind/tools/index_advisor/index_advisor_workload.py b/src/gausskernel/dbmind/tools/index_advisor/index_advisor_workload.py
index ea16a7578..8056a12f5 100644
--- a/src/gausskernel/dbmind/tools/index_advisor/index_advisor_workload.py
+++ b/src/gausskernel/dbmind/tools/index_advisor/index_advisor_workload.py
@@ -1,635 +1,732 @@
-"""
-Copyright (c) 2020 Huawei Technologies Co.,Ltd.
-
-openGauss is licensed under Mulan PSL v2.
-You can use this software according to the terms and conditions of the Mulan PSL v2.
-You may obtain a copy of Mulan PSL v2 at:
-
-        http://license.coscl.org.cn/MulanPSL2
-
-THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
-EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
-MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
-See the Mulan PSL v2 for more details.
-""" -import os -import sys -import argparse -import copy -import getpass -import random -import re -import shlex -import subprocess -import time - -SAMPLE_NUM = 5 -MAX_INDEX_COLUMN_NUM = 5 -MAX_INDEX_NUM = 10 -BASE_CMD = '' -SQL_TYPE = ['select', 'delete', 'insert', 'update'] -SQL_PATTERN = [r'([^\\])\'((\')|(.*?([^\\])\'))', - r'([^\\])"((")|(.*?([^\\])"))', - r'([^a-zA-Z])-?\d+(\.\d+)?', - r'([^a-zA-Z])-?\d+(\.\d+)?', - r'(\'\d+\\.*?\')'] - - -class PwdAction(argparse.Action): - def __call__(self, parser, namespace, values, option_string=None): - if values is None: - values = getpass.getpass() - setattr(namespace, self.dest, values) - - -class QueryItem: - def __init__(self, sql, freq): - self.statement = sql - self.frequency = freq - self.valid_index_list = [] - self.cost_list = [] - - -class IndexItem: - def __init__(self, tbl, cols): - self.table = tbl - self.columns = cols - self.benefit = 0 - - -def get_file_size(filename): - if os.path.isfile(filename): - return os.stat(filename).st_size - else: - return -1 - - -def run_shell_cmd(target_sql_list): - cmd = BASE_CMD + ' -c \"' - for target_sql in target_sql_list: - cmd += target_sql + ';' - cmd += '\"' - proc = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE) - (stdout, stderr) = proc.communicate() - stdout, stderr = stdout.decode(), stderr.decode() - if 'gsql' in stderr or 'failed to connect' in stderr: - raise ConnectionError("An error occurred while connecting to the database.\n" + "Details: " + stderr) - - return stdout - - -def run_shell_sql_cmd(sql_file): - cmd = BASE_CMD + ' -f ./' + sql_file - - proc = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE) - (stdout, stderr) = proc.communicate() - stdout, stderr = stdout.decode(), stderr.decode() - if stderr: - print(stderr) - - return stdout - - -def print_header_boundary(header): - term_width = int(os.get_terminal_size().columns / 2) - side_width = (term_width - len(header)) - - print('#' * side_width + header + '#' * side_width) - - -def load_workload(file_path): - wd_dict = {} - workload = [] - - with open(file_path, 'r') as f: - for sql in f.readlines(): - sql = sql.strip('\n;') - if any(tp in sql.lower() for tp in SQL_TYPE): - if sql not in wd_dict.keys(): - wd_dict[sql] = 1 - else: - wd_dict[sql] += 1 - for sql, freq in wd_dict.items(): - workload.append(QueryItem(sql, freq)) - - return workload - - -def get_workload_template(workload): - templates = {} - placeholder = r'@@@' - - for item in workload: - sql_template = item.statement - for pattern in SQL_PATTERN: - sql_template = re.sub(pattern, placeholder, sql_template) - if sql_template not in templates: - templates[sql_template] = {} - templates[sql_template]['cnt'] = 0 - templates[sql_template]['samples'] = [] - templates[sql_template]['cnt'] += item.frequency - # reservoir sampling - if len(templates[sql_template]['samples']) < SAMPLE_NUM: - templates[sql_template]['samples'].append(item.statement) - else: - if random.randint(0, templates[sql_template]['cnt']) < SAMPLE_NUM: - templates[sql_template]['samples'][random.randint(0, SAMPLE_NUM - 1)] = item.statement - - return templates - - -def workload_compression(input_path): - compressed_workload = [] - - workload = load_workload(input_path) - templates = get_workload_template(workload) - - for key in templates.keys(): - for sql in templates[key]['samples']: - compressed_workload.append(QueryItem(sql, templates[key]['cnt'] / SAMPLE_NUM)) - return compressed_workload - - -# parse the explain plan to get 
-def parse_explain_plan(plan):
-    cost_total = -1
-
-    plan_list = plan.split('\n')
-    for line in plan_list:
-        if '(cost=' in line:
-            pattern = re.compile(r'\(cost=([^\)]*)\)', re.S)
-            matched_res = re.search(pattern, line)
-            if matched_res:
-                cost_list = matched_res.group(1).split()
-                if len(cost_list) == 3:
-                    cost_total = float(cost_list[0].split('..')[-1])
-            break
-
-    return cost_total
-
-
-def estimate_workload_cost_file(workload, index_config=None):
-    total_cost = 0
-    sql_file = str(time.time()) + '.sql'
-    found_plan = False
-
-    with open(sql_file, 'w') as wf:
-        if index_config:
-            # create hypo-indexes
-            wf.write('SET enable_hypo_index = on;\n')
-            for index in index_config:
-                wf.write('SELECT hypopg_create_index(\'CREATE INDEX ON ' + index.table
-                         + '(' + index.columns + ')\');\n')
-        for query in workload:
-            wf.write('EXPLAIN ' + query.statement + ';\n')
-        if index_config:
-            wf.write('SELECT hypopg_reset_index();')
-
-    result = run_shell_sql_cmd(sql_file).split('\n')
-    if os.path.exists(sql_file):
-        os.remove(sql_file)
-
-    # parse the result of explain plans
-    i = 0
-    for line in result:
-        if 'QUERY PLAN' in line:
-            found_plan = True
-        if 'ERROR' in line:
-            workload.pop(i)
-        if found_plan and '(cost=' in line:
-            if i >= len(workload):
-                raise ValueError("The size of workload is not correct!")
-            query_cost = parse_explain_plan(line)
-            query_cost *= workload[i].frequency
-            workload[i].cost_list.append(query_cost)
-            total_cost += query_cost
-            found_plan = False
-            i += 1
-    while i < len(workload):
-        workload[i].cost_list.append(0)
-        i += 1
-
-    return total_cost
-
-
-def make_single_advisor_sql(ori_sql):
-    sql = 'select gs_index_advise(\''
-    for ch in ori_sql:
-        if ch == '\'':
-            sql += '\''
-        sql += ch
-    sql += '\');'
-
-    return sql
-
-
-def parse_single_advisor_result(res):
-    table_index_dict = {}
-    if len(res) > 2 and res[0:2] == (' ('):
-        items = res.split(',', 1)
-        table = items[0][2:]
-        indexes = re.split('[()]', items[1][:-1].strip('\"'))
-        for columns in indexes:
-            if columns == '':
-                continue
-            if table not in table_index_dict.keys():
-                table_index_dict[table] = []
-            table_index_dict[table].append(columns)
-
-    return table_index_dict
-
-
-# call the single-index-advisor in the database
-def query_index_advisor(query):
-    table_index_dict = {}
-
-    if 'select' not in query.lower():
-        return table_index_dict
-
-    sql = make_single_advisor_sql(query)
-    result = run_shell_cmd([sql]).split('\n')
-
-    for res in result:
-        table_index_dict.update(parse_single_advisor_result(res))
-
-    return table_index_dict
-
-
-# judge whether the index is used by the optimizer
-def query_index_check(query, query_index_dict):
-    valid_indexes = {}
-    if len(query_index_dict) == 0:
-        return valid_indexes
-
-    # create hypo-indexes
-    sql_list = ['SET enable_hypo_index = on;']
-    for table in query_index_dict.keys():
-        for columns in query_index_dict[table]:
-            if columns != '':
-                sql_list.append('SELECT hypopg_create_index(\'CREATE INDEX ON ' + table + '(' + columns + ')\')')
-    sql_list.append('explain ' + query)
-    sql_list.append('SELECT hypopg_reset_index()')
-    result = run_shell_cmd(sql_list).split('\n')
-
-    # parse the result of explain plan
-    for line in result:
-        hypo_index = ''
-        if 'Index' in line and 'Scan' in line and 'btree' in line:
-            tokens = line.split(' ')
-            for token in tokens:
-                if 'btree' in token:
-                    hypo_index = token.split('_', 1)[1]
-        if len(hypo_index) > 1:
-            for table in query_index_dict.keys():
-                for columns in query_index_dict[table]:
-                    index_name_list = columns.split(',')
-                    index_name_list.insert(0, table)
-                    index_name = "_".join(index_name_list)
-                    if index_name != hypo_index:
-                        continue
-                    if table not in valid_indexes.keys():
-                        valid_indexes[table] = []
-                    valid_indexes[table].append(columns)
-
-    return valid_indexes
-
-
-# enumerate the column combinations for a suggested index
-def get_indexable_columns(table_index_dict):
-    query_indexable_columns = {}
-    if len(table_index_dict) == 0:
-        return query_indexable_columns
-
-    for table in table_index_dict.keys():
-        query_indexable_columns[table] = []
-        for columns in table_index_dict[table]:
-            indexable_columns = columns.split(',')
-            for column in indexable_columns:
-                query_indexable_columns[table].append(column)
-
-    return query_indexable_columns
-
-
-def generate_candidate_indexes(workload, iterate=False):
-    candidate_indexes = []
-    index_dict = {}
-
-    for k, query in enumerate(workload):
-        table_index_dict = query_index_advisor(query.statement)
-        if iterate:
-            need_check = False
-            query_indexable_columns = get_indexable_columns(table_index_dict)
-            valid_index_dict = query_index_check(query.statement, query_indexable_columns)
-
-            for i in range(MAX_INDEX_COLUMN_NUM):
-                for table in valid_index_dict.keys():
-                    for columns in valid_index_dict[table]:
-                        if columns.count(',') == i:
-                            need_check = True
-                            for single_column in query_indexable_columns[table]:
-                                if single_column not in columns:
-                                    valid_index_dict[table].append(columns + ',' + single_column)
-                if need_check:
-                    valid_index_dict = query_index_check(query.statement, valid_index_dict)
-                    need_check = False
-                else:
-                    break
-        else:
-            valid_index_dict = query_index_check(query.statement, table_index_dict)
-
-        # filter duplicate indexes
-        for table in valid_index_dict.keys():
-            if table not in index_dict.keys():
-                index_dict[table] = set()
-            for columns in valid_index_dict[table]:
-                workload[k].valid_index_list.append(IndexItem(table, columns))
-                if columns not in index_dict[table]:
-                    print("table: ", table, "columns: ", columns)
-                    index_dict[table].add(columns)
-                    candidate_indexes.append(IndexItem(table, columns))
-
-    return candidate_indexes
-
-
-def generate_candidate_indexes_file(workload):
-    candidate_indexes = []
-    index_dict = {}
-    sql_file = str(time.time()) + '.sql'
-
-    if len(workload) > 0:
-        run_shell_cmd([workload[0].statement])
-    with open(sql_file, 'w') as wf:
-        for query in workload:
-            if 'select' in query.statement.lower():
-                wf.write(make_single_advisor_sql(query.statement) + '\n')
-
-    result = run_shell_sql_cmd(sql_file).split('\n')
-    if os.path.exists(sql_file):
-        os.remove(sql_file)
-
-    for line in result:
-        table_index_dict = parse_single_advisor_result(line.strip('\n'))
-        # filter duplicate indexes
-        for table in table_index_dict.keys():
-            if table not in index_dict.keys():
-                index_dict[table] = set()
-            for columns in table_index_dict[table]:
-                if columns == "":
-                    continue
-                if columns not in index_dict[table]:
-                    print("table: ", table, "columns: ", columns)
-                    index_dict[table].add(columns)
-                    candidate_indexes.append(IndexItem(table, columns))
-
-    return candidate_indexes
-
-
-def get_atomic_config_for_query(indexes, config, ind, atomic_configs):
-    if ind == len(indexes):
-        table_count = {}
-        for index in config:
-            if index.table not in table_count.keys():
-                table_count[index.table] = 1
-            else:
-                table_count[index.table] += 1
-            if len(table_count) > 2 or table_count[index.table] > 2:
-                return
-        atomic_configs.append(config)
-
-        return
-
-    get_atomic_config_for_query(indexes, copy.copy(config), ind + 1, atomic_configs)
-
-    config.append(indexes[ind])
-    get_atomic_config_for_query(indexes, copy.copy(config), ind + 1, atomic_configs)
-
-
-def is_same_config(config1, config2):
-    if len(config1) != len(config2):
-        return False
-
-    for index1 in config1:
-        is_found = False
-        for index2 in config2:
-            if index1.table == index2.table and index1.columns == index2.columns:
-                is_found = True
-        if not is_found:
-            return False
-
-    return True
-
-
-def generate_atomic_config(workload):
-    atomic_config_total = []
-
-    for query in workload:
-        if len(query.valid_index_list) == 0:
-            continue
-
-        atomic_configs = []
-        config = []
-        get_atomic_config_for_query(query.valid_index_list, config, 0, atomic_configs)
-
-        is_found = False
-        for new_config in atomic_configs:
-            for exist_config in atomic_config_total:
-                if is_same_config(new_config, exist_config):
-                    is_found = True
-                    break
-            if not is_found:
-                atomic_config_total.append(new_config)
-            is_found = False
-
-    return atomic_config_total
-
-
-# find the subsets of a given config in the atomic configs
-def find_subsets_num(config, atomic_config_total):
-    atomic_subsets_num = []
-    is_exist = False
-
-    for i, atomic_config in enumerate(atomic_config_total):
-        if len(atomic_config) > len(config):
-            continue
-        for atomic_index in atomic_config:
-            is_exist = False
-            for index in config:
-                if atomic_index.table == index.table and atomic_index.columns == index.columns:
-                    is_exist = True
-                    break
-            if not is_exist:
-                break
-        if is_exist:
-            atomic_subsets_num.append(i)
-
-    return atomic_subsets_num
-
-
-def get_index_num(index, atomic_config_total):
-    for i, atomic_config in enumerate(atomic_config_total):
-        if len(atomic_config) == 1 and atomic_config[0].table == index.table and \
-                atomic_config[0].columns == index.columns:
-            return i
-
-    return -1
-
-
-# infer the total cost of workload for a config according to the cost of atomic configs
-def infer_workload_cost(workload, config, atomic_config_total):
-    total_cost = 0
-
-    atomic_subsets_num = find_subsets_num(config, atomic_config_total)
-    if len(atomic_subsets_num) == 0:
-        raise ValueError("No atomic configs found for current config!")
-
-    for i in range(len(workload)):
-        if max(atomic_subsets_num) >= len(workload[i].cost_list):
-            raise ValueError("Wrong atomic config for current query!")
-        # compute the cost for selection
-        min_cost = sys.maxsize
-        for num in atomic_subsets_num:
-            if num < len(workload[i].cost_list) and workload[i].cost_list[num] < min_cost:
-                min_cost = workload[i].cost_list[num]
-        total_cost += min_cost
-
-        # compute the cost for updating indexes
-        if 'insert' in workload[i].statement.lower() or 'delete' in workload[i].statement.lower():
-            for index in config:
-                index_num = get_index_num(index, atomic_config_total)
-                if index_num == -1:
-                    raise ValueError("The index isn't found for current query!")
-                if 0 <= index_num < len(workload[i].cost_list):
-                    total_cost += workload[i].cost_list[index_num] - workload[i].cost_list[0]
-
-    return total_cost
-
-
-def simple_index_advisor(input_path):
-    workload = workload_compression(input_path)
-    print_header_boundary(" Generate candidate indexes ")
-    candidate_indexes = generate_candidate_indexes_file(workload)
-    if len(candidate_indexes) == 0:
-        print("No candidate indexes generated!")
-        return
-
-    print_header_boundary(" Determine optimal indexes ")
-    ori_total_cost = estimate_workload_cost_file(workload)
-    for i in range(len(candidate_indexes)):
-        new_total_cost = estimate_workload_cost_file(workload, [candidate_indexes[i]])
-        candidate_indexes[i].benefit = ori_total_cost - new_total_cost
-    candidate_indexes = sorted(candidate_indexes, key=lambda item: item.benefit, reverse=True)
-
-    # filter out duplicate index
-    final_index_set = {}
-    for index in candidate_indexes:
-        picked = True
-        cols = set(index.columns.split(','))
-        if index.table not in final_index_set.keys():
-            final_index_set[index.table] = []
-        for i in range(len(final_index_set[index.table]) - 1, -1, -1):
-            pre_index = final_index_set[index.table][i]
-            pre_cols = set(pre_index.columns.split(','))
-            if len(pre_cols.difference(cols)) == 0 and len(pre_cols) < len(cols):
-                final_index_set[index.table].pop(i)
-            if len(cols.difference(pre_cols)) == 0:
-                picked = False
-                break
-        if picked:
-            final_index_set[index.table].append(index)
-
-    cnt = 0
-    for table in final_index_set.keys():
-        for index in final_index_set[table]:
-            print("create index ind" + str(cnt) + " on " + table + "(" + index.columns + ");")
-            cnt += 1
-            if cnt == MAX_INDEX_NUM:
-                break
-
-
-def greedy_determine_opt_config(workload, atomic_config_total, candidate_indexes):
-    opt_config = []
-    index_num_record = set()
-    min_cost = sys.maxsize
-
-    while len(opt_config) < MAX_INDEX_NUM:
-        cur_min_cost = sys.maxsize
-        cur_index = None
-        cur_index_num = -1
-        for k, index in enumerate(candidate_indexes):
-            if k in index_num_record:
-                continue
-            cur_config = copy.copy(opt_config)
-            cur_config.append(index)
-            cur_estimated_cost = infer_workload_cost(workload, cur_config, atomic_config_total)
-            if cur_estimated_cost < cur_min_cost:
-                cur_min_cost = cur_estimated_cost
-                cur_index = index
-                cur_index_num = k
-        if cur_index and cur_min_cost < min_cost:
-            min_cost = cur_min_cost
-            opt_config.append(cur_index)
-            index_num_record.add(cur_index_num)
-        else:
-            break
-
-    return opt_config
-
-
-def complex_index_advisor(input_path):
-    workload = workload_compression(input_path)
-    print_header_boundary(" Generate candidate indexes ")
-    candidate_indexes = generate_candidate_indexes(workload, True)
-    if len(candidate_indexes) == 0:
-        print("No candidate indexes generated!")
-        return
-
-    print_header_boundary(" Determine optimal indexes ")
-    atomic_config_total = generate_atomic_config(workload)
-    if len(atomic_config_total[0]) != 0:
-        raise ValueError("The empty atomic config isn't generated!")
-
-    for atomic_config in atomic_config_total:
-        estimate_workload_cost_file(workload, atomic_config)
-
-    opt_config = greedy_determine_opt_config(workload, atomic_config_total, candidate_indexes)
-
-    cnt = 0
-    for index in opt_config:
-        print("create index ind" + str(cnt) + " on " + index.table + "(" + index.columns + ");")
-        cnt += 1
-        if cnt == MAX_INDEX_NUM:
-            break
-
-
-def main():
-    arg_parser = argparse.ArgumentParser(description='Generate index set for workload.')
-    arg_parser.add_argument("p", help="Port of database")
-    arg_parser.add_argument("d", help="Name of database")
-    arg_parser.add_argument("--h", help="Host for database")
-    arg_parser.add_argument("-U", help="Username for database log-in")
-    arg_parser.add_argument("-W", help="Password for database user", action=PwdAction)
-    arg_parser.add_argument("f", help="File containing workload queries (One query per line)")
-    arg_parser.add_argument("--max_index_num", help="Maximum number of suggested indexes", type=int)
-    arg_parser.add_argument("--multi_iter_mode", action='store_true', help="Whether to use multi-iteration algorithm",
-                            default=False)
-    args = arg_parser.parse_args()
-
-    global MAX_INDEX_NUM, BASE_CMD
-    MAX_INDEX_NUM = args.max_index_num or 10
-    BASE_CMD = 'gsql -p ' + args.p + ' -d ' + args.d
-    if args.h:
-        BASE_CMD += ' -h ' + args.h
-    if args.U:
-        BASE_CMD += ' -U ' + args.U
-        if args.U != getpass.getuser() and not args.W:
-            raise ValueError('Enter the \'-W\' parameter for user ' + args.U + ' when executing the script.')
-    if args.W:
-        BASE_CMD += ' -W ' + args.W
-
-    if args.multi_iter_mode:
-        complex_index_advisor(args.f)
-    else:
-        simple_index_advisor(args.f)
-
-
-if __name__ == '__main__':
-    main()
+"""
+Copyright (c) 2020 Huawei Technologies Co.,Ltd.
+
+openGauss is licensed under Mulan PSL v2.
+You can use this software according to the terms and conditions of the Mulan PSL v2.
+You may obtain a copy of Mulan PSL v2 at:
+
+        http://license.coscl.org.cn/MulanPSL2
+
+THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+See the Mulan PSL v2 for more details.
+"""
+import os
+import sys
+import argparse
+import copy
+import getpass
+import random
+import re
+import shlex
+import subprocess
+import time
+import select
+import logging
+
+ENABLE_MULTI_NODE = False
+SAMPLE_NUM = 5
+MAX_INDEX_COLUMN_NUM = 5
+MAX_INDEX_NUM = 10
+MAX_INDEX_STORAGE = None
+FULL_ARRANGEMENT_THRESHOLD = 20
+BASE_CMD = ''
+SHARP = '#'
+SCHEMA = None
+SQL_TYPE = ['select', 'delete', 'insert', 'update']
+SQL_PATTERN = [r'([^\\])\'((\')|(.*?([^\\])\'))',
+               r'([^\\])"((")|(.*?([^\\])"))',
+               r'([^a-zA-Z])-?\d+(\.\d+)?',
+               r'([^a-zA-Z])-?\d+(\.\d+)?',
+               r'(\'\d+\\.*?\')']
+
+logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')
+
+
+def read_input_from_pipe():
+    """
+    Read stdin input if there is "echo 'str1 str2' | python xx.py",
+    return the input string.
+    """
+    input_str = ""
+    r_handle, _, _ = select.select([sys.stdin], [], [], 0)
+    if not r_handle:
+        return ""
+
+    for item in r_handle:
+        if item == sys.stdin:
+            input_str = sys.stdin.read().strip()
+    return input_str
+
+
+class PwdAction(argparse.Action):
+    def __call__(self, parser, namespace, values, option_string=None):
+        password = read_input_from_pipe()
+        if password:
+            logging.warning("Read password from pipe.")
+        else:
+            password = getpass.getpass("Password for database user:")
+        setattr(namespace, self.dest, password)
+
+
+class QueryItem:
+    def __init__(self, sql, freq):
+        self.statement = sql
+        self.frequency = freq
+        self.valid_index_list = []
+        self.cost_list = []
+
+
+class IndexItem:
+    def __init__(self, tbl, cols):
+        self.table = tbl
+        self.columns = cols
+        self.benefit = 0
+        self.storage = 0
+
+
+def run_shell_cmd(target_sql_list):
+    cmd = BASE_CMD + ' -c \"'
+    if SCHEMA:
+        cmd += 'set current_schema = %s; ' % SCHEMA
+    for target_sql in target_sql_list:
+        cmd += target_sql + ';'
+    cmd += '\"'
+    proc = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+    (stdout, stderr) = proc.communicate()
+    stdout, stderr = stdout.decode(), stderr.decode()
+    if 'gsql' in stderr or 'failed to connect' in stderr:
+        raise ConnectionError("An error occurred while connecting to the database.\n" +
+                              "Details: " + stderr)
+    return stdout
+
+
+def run_shell_sql_cmd(sql_file):
+    cmd = BASE_CMD + ' -f ./' + sql_file
+
+    proc = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+    (stdout, stderr) = proc.communicate()
+    stdout, stderr = stdout.decode(), stderr.decode()
+
+    return stdout
+
+
+def green(text):
+    return '\033[32m%s\033[0m' % text
+
+
+def print_header_boundary(header):
+    # Output a header first, which looks more beautiful.
+    try:
+        term_width = os.get_terminal_size().columns
+        # The width of each of the two sides of the terminal.
+        side_width = (term_width - len(header)) // 2
+    except (AttributeError, OSError):
+        side_width = 0
+    title = SHARP * side_width + header + SHARP * side_width
+    print(green(title))
+
+
+def load_workload(file_path):
+    wd_dict = {}
+    workload = []
+
+    with open(file_path, 'r') as file:
+        raw_text = ''.join(file.readlines())
+        sqls = raw_text.split(';')
+        for sql in sqls:
+            if any(tp in sql.lower() for tp in SQL_TYPE):
+                if sql not in wd_dict.keys():
+                    wd_dict[sql] = 1
+                else:
+                    wd_dict[sql] += 1
+    for sql, freq in wd_dict.items():
+        workload.append(QueryItem(sql, freq))
+
+    return workload
+
+
+def get_workload_template(workload):
+    templates = {}
+    placeholder = r'@@@'
+
+    for item in workload:
+        sql_template = item.statement
+        for pattern in SQL_PATTERN:
+            sql_template = re.sub(pattern, placeholder, sql_template)
+        if sql_template not in templates:
+            templates[sql_template] = {}
+            templates[sql_template]['cnt'] = 0
+            templates[sql_template]['samples'] = []
+        templates[sql_template]['cnt'] += item.frequency
+        # reservoir sampling
+        if len(templates[sql_template]['samples']) < SAMPLE_NUM:
+            templates[sql_template]['samples'].append(item.statement)
+        else:
+            if random.randint(0, templates[sql_template]['cnt']) < SAMPLE_NUM:
+                templates[sql_template]['samples'][random.randint(0, SAMPLE_NUM - 1)] = \
+                    item.statement
+
+    return templates
+
+
+def workload_compression(input_path):
+    compressed_workload = []
+
+    workload = load_workload(input_path)
+    templates = get_workload_template(workload)
+
+    for _, elem in templates.items():
+        for sql in elem['samples']:
+            compressed_workload.append(QueryItem(sql, elem['cnt'] / SAMPLE_NUM))
+    return compressed_workload
+
+
+# parse the explain plan to get estimated cost by database optimizer
+def parse_explain_plan(plan):
+    cost_total = -1
+
+    plan_list = plan.split('\n')
+    for line in plan_list:
+        if '(cost=' in line:
+            pattern = re.compile(r'\(cost=([^\)]*)\)', re.S)
+            matched_res = re.search(pattern, line)
+            if matched_res:
+                cost_list = matched_res.group(1).split()
+                if len(cost_list) == 3:
+                    cost_total = float(cost_list[0].split('..')[-1])
+            break
+
+    return cost_total
+
+
+def update_index_storage(res, index_config, hypo_index_num):
+    index_id = re.findall(r'<\d+>', res)[0].strip('<>')
+    index_size_sql = 'select * from hypopg_estimate_size(%s);' % index_id
+    res = run_shell_cmd([index_size_sql]).split('\n')
+    for item in res:
+        if re.match(r'\d+', item.strip()):
+            index_config[hypo_index_num].storage = float(item.strip()) / 1024 / 1024
+
+
+def estimate_workload_cost_file(workload, index_config=None):
+    total_cost = 0
+    sql_file = str(time.time()) + '.sql'
+    found_plan = False
+    hypo_index = False
+    with open(sql_file, 'w') as file:
+        if index_config:
+            # create hypo-indexes
+            if SCHEMA:
+                file.write('SET current_schema = %s;\n' % SCHEMA)
+            file.write('SET enable_hypo_index = on;\n')
+            for index in index_config:
+                file.write("SELECT hypopg_create_index('CREATE INDEX ON %s(%s)');\n" %
+                           (index.table, index.columns))
+        if ENABLE_MULTI_NODE:
+            file.write('set enable_fast_query_shipping = off;\n')
+        for query in workload:
+            file.write('EXPLAIN ' + query.statement + ';\n')
+
+    result = run_shell_sql_cmd(sql_file).split('\n')
+    if os.path.exists(sql_file):
+        os.remove(sql_file)
+
+    # parse the result of explain plans
+    i = 0
+    hypo_index_num = 0
+    for line in result:
+        if 'QUERY PLAN' in line:
+            found_plan = True
+        if 'ERROR' in line:
+            workload.pop(i)
+        if 'hypopg_create_index' in line:
+            hypo_index = True
+        if found_plan and '(cost=' in line:
+            if i >= len(workload):
+                raise ValueError("The size of workload is not correct!")
+            query_cost = parse_explain_plan(line)
+            query_cost *= workload[i].frequency
+            workload[i].cost_list.append(query_cost)
+            total_cost += query_cost
+            found_plan = False
+            i += 1
+        if hypo_index:
+            if 'btree' in line and MAX_INDEX_STORAGE:
+                hypo_index = False
+                update_index_storage(line, index_config, hypo_index_num)
+                hypo_index_num += 1
+    while i < len(workload):
+        workload[i].cost_list.append(0)
+        i += 1
+    if index_config:
+        run_shell_cmd(['SELECT hypopg_reset_index();'])
+
+    return total_cost
+
+
+def make_single_advisor_sql(ori_sql):
+    sql = 'select gs_index_advise(\''
+    for elem in ori_sql:
+        if elem == '\'':
+            sql += '\''
+        sql += elem
+    sql += '\');'
+
+    return sql
+
+
+def parse_single_advisor_result(res):
+    table_index_dict = {}
+    if len(res) > 2 and res[0:2] == ' (':
+        items = res.split(',', 1)
+        table = items[0][2:]
+        indexes = re.split('[()]', items[1][:-1].strip('\"'))
+        for columns in indexes:
+            if columns == '':
+                continue
+            if table not in table_index_dict.keys():
+                table_index_dict[table] = []
+            table_index_dict[table].append(columns)
+
+    return table_index_dict
+
+
+# call the single-index-advisor in the database
+def query_index_advisor(query):
+    table_index_dict = {}
+
+    if 'select' not in query.lower():
+        return table_index_dict
+
+    sql = make_single_advisor_sql(query)
+    result = run_shell_cmd([sql]).split('\n')
+
+    for res in result:
+        table_index_dict.update(parse_single_advisor_result(res))
+
+    return table_index_dict
+
+
+# judge whether the index is used by the optimizer
+def query_index_check(query, query_index_dict):
+    valid_indexes = {}
+    if len(query_index_dict) == 0:
+        return valid_indexes
+
+    # create hypo-indexes
+    sql_list = ['SET enable_hypo_index = on;']
+    if ENABLE_MULTI_NODE:
+        sql_list.append('SET enable_fast_query_shipping = off;')
+    for table in query_index_dict.keys():
+        for columns in query_index_dict[table]:
+            if columns != '':
+                sql_list.append("SELECT hypopg_create_index('CREATE INDEX ON %s(%s)')" %
+                                (table, columns))
+    sql_list.append('explain ' + query)
+    sql_list.append('SELECT hypopg_reset_index()')
+    result = run_shell_cmd(sql_list).split('\n')
+
+    # parse the result of explain plan
+    for line in result:
+        hypo_index = ''
+        if 'Index' in line and 'Scan' in line and 'btree' in line:
+            tokens = line.split(' ')
+            for token in tokens:
+                if 'btree' in token:
+                    hypo_index = token.split('_', 1)[1]
+        if len(hypo_index) > 1:
+            for table in query_index_dict.keys():
+                for columns in query_index_dict[table]:
+                    index_name_list = columns.split(',')
+                    index_name_list.insert(0, table)
+                    index_name = "_".join(index_name_list)
+                    if index_name != hypo_index:
+                        continue
+                    if table not in valid_indexes.keys():
+                        valid_indexes[table] = []
+                    if columns not in valid_indexes[table]:
+                        valid_indexes[table].append(columns)
+
+    return valid_indexes
+
+
+# enumerate the column combinations for a suggested index
+def get_indexable_columns(table_index_dict):
+    query_indexable_columns = {}
+    if len(table_index_dict) == 0:
+        return query_indexable_columns
+
+    for table in table_index_dict.keys():
+        query_indexable_columns[table] = []
+        for columns in table_index_dict[table]:
+            indexable_columns = columns.split(',')
+            for column in indexable_columns:
+                query_indexable_columns[table].append(column)
+
+    return query_indexable_columns
+
+
+def generate_candidate_indexes(workload, iterate=False):
+    candidate_indexes = []
+    index_dict = {}
+
+    for k, query in enumerate(workload):
+        table_index_dict = query_index_advisor(query.statement)
+        if iterate:
+            need_check = False
+            query_indexable_columns = get_indexable_columns(table_index_dict)
+            valid_index_dict = query_index_check(query.statement, query_indexable_columns)
+
+            for i in range(MAX_INDEX_COLUMN_NUM):
+                for table in valid_index_dict.keys():
+                    for columns in valid_index_dict[table]:
+                        if columns.count(',') == i:
+                            need_check = True
+                            for single_column in query_indexable_columns[table]:
+                                if single_column not in columns:
+                                    valid_index_dict[table].append(columns + ',' + single_column)
+                if need_check:
+                    valid_index_dict = query_index_check(query.statement, valid_index_dict)
+                    need_check = False
+                else:
+                    break
+        else:
+            valid_index_dict = query_index_check(query.statement, table_index_dict)
+
+        # filter duplicate indexes
+        for table in valid_index_dict.keys():
+            if table not in index_dict.keys():
+                index_dict[table] = set()
+            for columns in valid_index_dict[table]:
+                if len(workload[k].valid_index_list) >= FULL_ARRANGEMENT_THRESHOLD:
+                    break
+                workload[k].valid_index_list.append(IndexItem(table, columns))
+                if columns not in index_dict[table]:
+                    print("table: ", table, "columns: ", columns)
+                    index_dict[table].add(columns)
+                    candidate_indexes.append(IndexItem(table, columns))
+
+    return candidate_indexes
+
+
+def generate_candidate_indexes_file(workload):
+    candidate_indexes = []
+    index_dict = {}
+    sql_file = str(time.time()) + '.sql'
+
+    if len(workload) > 0:
+        run_shell_cmd([workload[0].statement])
+    with open(sql_file, 'w') as file:
+        if SCHEMA:
+            file.write('SET current_schema = %s;\n' % SCHEMA)
+        for query in workload:
+            if 'select' in query.statement.lower():
+                file.write(make_single_advisor_sql(query.statement) + '\n')
+
+    result = run_shell_sql_cmd(sql_file).split('\n')
+    if os.path.exists(sql_file):
+        os.remove(sql_file)
+
+    for line in result:
+        table_index_dict = parse_single_advisor_result(line.strip('\n'))
+        # filter duplicate indexes
+        for table, columns in table_index_dict.items():
+            if table not in index_dict.keys():
+                index_dict[table] = set()
+            for column in columns:
+                if column == "":
+                    continue
+                if column not in index_dict[table]:
+                    print("table: ", table, "columns: ", column)
+                    index_dict[table].add(column)
+                    candidate_indexes.append(IndexItem(table, column))
+
+    return candidate_indexes
+
+
+def get_atomic_config_for_query(indexes, config, ind, atomic_configs):
+    if ind == len(indexes):
+        table_count = {}
+        for index in config:
+            if index.table not in table_count.keys():
+                table_count[index.table] = 1
+            else:
+                table_count[index.table] += 1
+            if len(table_count) > 2 or table_count[index.table] > 2:
+                return
+        atomic_configs.append(config)
+
+        return
+
+    get_atomic_config_for_query(indexes, copy.copy(config), ind + 1, atomic_configs)
+    config.append(indexes[ind])
+    get_atomic_config_for_query(indexes, copy.copy(config), ind + 1, atomic_configs)
+
+
+def is_same_config(config1, config2):
+    if len(config1) != len(config2):
+        return False
+
+    for index1 in config1:
+        is_found = False
+        for index2 in config2:
+            if index1.table == index2.table and index1.columns == index2.columns:
+                is_found = True
+        if not is_found:
+            return False
+
+    return True
+
+
+def generate_atomic_config(workload):
+    atomic_config_total = []
+
+    for query in workload:
+        if len(query.valid_index_list) == 0:
+            continue
+
+        atomic_configs = []
+        config = []
+        get_atomic_config_for_query(query.valid_index_list, config, 0, atomic_configs)
+
+        is_found = False
+        for new_config in atomic_configs:
+            for exist_config in atomic_config_total:
+                if is_same_config(new_config, exist_config):
+                    is_found = True
+                    break
+            if not is_found:
+                atomic_config_total.append(new_config)
+            is_found = False
+
+    return atomic_config_total
+
+
+# find the subsets of a given config in the atomic configs
+def find_subsets_num(config, atomic_config_total):
+    atomic_subsets_num = []
+    is_exist = False
+
+    for i, atomic_config in enumerate(atomic_config_total):
+        if len(atomic_config) > len(config):
+            continue
+        for atomic_index in atomic_config:
+            is_exist = False
+            for index in config:
+                if atomic_index.table == index.table and atomic_index.columns == index.columns:
+                    index.storage = atomic_index.storage
+                    is_exist = True
+                    break
+            if not is_exist:
+                break
+        if is_exist:
+            atomic_subsets_num.append(i)
+
+    return atomic_subsets_num
+
+
+def get_index_num(index, atomic_config_total):
+    for i, atomic_config in enumerate(atomic_config_total):
+        if len(atomic_config) == 1 and atomic_config[0].table == index.table and \
+                atomic_config[0].columns == index.columns:
+            return i
+
+    return -1
+
+
+# infer the total cost of workload for a config according to the cost of atomic configs
+def infer_workload_cost(workload, config, atomic_config_total):
+    total_cost = 0
+
+    atomic_subsets_num = find_subsets_num(config, atomic_config_total)
+    if len(atomic_subsets_num) == 0:
+        raise ValueError("No atomic configs found for current config!")
+
+    for ind, obj in enumerate(workload):
+        if max(atomic_subsets_num) >= len(obj.cost_list):
+            raise ValueError("Wrong atomic config for current query!")
+        # compute the cost for selection
+        min_cost = sys.maxsize
+        for num in atomic_subsets_num:
+            if num < len(obj.cost_list) and obj.cost_list[num] < min_cost:
+                min_cost = obj.cost_list[num]
+        total_cost += min_cost
+
+        # compute the cost for updating indexes
+        if 'insert' in obj.statement.lower() or 'delete' in obj.statement.lower():
+            for index in config:
+                index_num = get_index_num(index, atomic_config_total)
+                if index_num == -1:
+                    raise ValueError("The index isn't found for current query!")
+                if 0 <= index_num < len(workload[ind].cost_list):
+                    total_cost += obj.cost_list[index_num] - obj.cost_list[0]
+
+    return total_cost
+
+
+def simple_index_advisor(input_path):
+    workload = workload_compression(input_path)
+    print_header_boundary(" Generate candidate indexes ")
+    candidate_indexes = generate_candidate_indexes_file(workload)
+    if len(candidate_indexes) == 0:
+        print("No candidate indexes generated!")
+        return
+
+    print_header_boundary(" Determine optimal indexes ")
+    ori_total_cost = estimate_workload_cost_file(workload)
+    for _, obj in enumerate(candidate_indexes):
+        new_total_cost = estimate_workload_cost_file(workload, [obj])
+        obj.benefit = ori_total_cost - new_total_cost
+    candidate_indexes = sorted(candidate_indexes, key=lambda item: item.benefit, reverse=True)
+
+    # filter out duplicate index
+    final_index_set = {}
+    for index in candidate_indexes:
+        picked = True
+        cols = set(index.columns.split(','))
+        if index.table not in final_index_set.keys():
+            final_index_set[index.table] = []
+        for i in range(len(final_index_set[index.table]) - 1, -1, -1):
+            pre_index = final_index_set[index.table][i]
+            pre_cols = set(pre_index.columns.split(','))
+            if len(pre_cols.difference(cols)) == 0 and len(pre_cols) < len(cols):
+                final_index_set[index.table].pop(i)
+            if len(cols.difference(pre_cols)) == 0:
+                picked = False
+                break
+        if picked:
+            final_index_set[index.table].append(index)
+
+    cnt = 0
+    index_current_storage = 0
+    for table, indexs in final_index_set.items():
+        for index in indexs:
+            if cnt == MAX_INDEX_NUM:
+                break
+            if MAX_INDEX_STORAGE and (index_current_storage + index.storage) > MAX_INDEX_STORAGE:
+                continue
+            index_current_storage += index.storage
+            print("create index ind" + str(cnt) + " on " + table + "(" + index.columns + ");")
+            cnt += 1
+        else:
+            continue
+        break
+
+
+def greedy_determine_opt_config(workload, atomic_config_total, candidate_indexes):
+    opt_config = []
+    index_num_record = set()
+    min_cost = sys.maxsize
+    for i in range(len(candidate_indexes)):
+        if i == 1 and min_cost == sys.maxsize:
+            break
+        cur_min_cost = sys.maxsize
+        cur_index = None
+        cur_index_num = -1
+        for k, index in enumerate(candidate_indexes):
+            if k in index_num_record:
+                continue
+            cur_config = copy.copy(opt_config)
+            cur_config.append(index)
+            cur_estimated_cost = infer_workload_cost(workload, cur_config, atomic_config_total)
+            if cur_estimated_cost < cur_min_cost:
+                cur_min_cost = cur_estimated_cost
+                cur_index = index
+                cur_index_num = k
+        if cur_index and cur_min_cost < min_cost:
+            if MAX_INDEX_STORAGE and sum([obj.storage for obj in opt_config]) + \
+                    cur_index.storage > MAX_INDEX_STORAGE:
+                continue
+            if len(opt_config) == MAX_INDEX_NUM:
+                break
+            min_cost = cur_min_cost
+            opt_config.append(cur_index)
+            index_num_record.add(cur_index_num)
+        else:
+            break
+
+    return opt_config
+
+
+def complex_index_advisor(input_path):
+    workload = workload_compression(input_path)
+    print_header_boundary(" Generate candidate indexes ")
+    candidate_indexes = generate_candidate_indexes(workload, True)
+    if len(candidate_indexes) == 0:
+        print("No candidate indexes generated!")
+        return
+
+    print_header_boundary(" Determine optimal indexes ")
+    atomic_config_total = generate_atomic_config(workload)
+    if len(atomic_config_total[0]) != 0:
+        raise ValueError("The empty atomic config isn't generated!")
+
+    for atomic_config in atomic_config_total:
+        estimate_workload_cost_file(workload, atomic_config)
+
+    opt_config = greedy_determine_opt_config(workload, atomic_config_total, candidate_indexes)
+
+    cnt = 0
+    index_current_storage = 0
+    for index in opt_config:
+        if MAX_INDEX_STORAGE and (index_current_storage + index.storage) > MAX_INDEX_STORAGE:
+            continue
+        if cnt == MAX_INDEX_NUM:
+            break
+        index_current_storage += index.storage
+        print("create index ind" + str(cnt) + " on " + index.table + "(" + index.columns + ");")
+        cnt += 1
+
+
+def main():
+    arg_parser = argparse.ArgumentParser(description='Generate index set for workload.')
+    arg_parser.add_argument("p", help="Port of database")
+    arg_parser.add_argument("d", help="Name of database")
+    arg_parser.add_argument("--h", help="Host for database")
+    arg_parser.add_argument("-U", help="Username for database log-in")
+    arg_parser.add_argument("-W", help="Password for database user", nargs="?", action=PwdAction)
+    arg_parser.add_argument("f", help="File containing workload queries (One query per line)")
+    arg_parser.add_argument("--schema", help="Schema name for the current business data")
+    arg_parser.add_argument("--max_index_num", help="Maximum number of suggested indexes", type=int)
+    arg_parser.add_argument("--max_index_storage",
+                            help="Maximum storage of suggested indexes/MB", type=int)
+    arg_parser.add_argument("--multi_iter_mode", action='store_true',
+                            help="Whether to use multi-iteration algorithm", default=False)
+    arg_parser.add_argument("--multi_node", action='store_true',
help="Whether to support distributed scenarios", default=False) + args = arg_parser.parse_args() + + global MAX_INDEX_NUM, BASE_CMD, ENABLE_MULTI_NODE, MAX_INDEX_STORAGE, SCHEMA + if args.max_index_num is not None and args.max_index_num <= 0: + raise argparse.ArgumentTypeError("%s is an invalid positive int value" % + args.max_index_num) + if args.max_index_storage is not None and args.max_index_storage <= 0: + raise argparse.ArgumentTypeError("%s is an invalid positive int value" % + args.max_index_storage) + if args.schema: + SCHEMA = args.schema + MAX_INDEX_NUM = args.max_index_num or 10 + ENABLE_MULTI_NODE = args.multi_node + MAX_INDEX_STORAGE = args.max_index_storage + BASE_CMD = 'gsql -p ' + args.p + ' -d ' + args.d + if args.h: + BASE_CMD += ' -h ' + args.h + if args.U: + BASE_CMD += ' -U ' + args.U + if args.U != getpass.getuser() and not args.W: + raise ValueError('Enter the \'-W\' parameter for user ' + + args.U + ' when executing the script.') + if args.W: + BASE_CMD += ' -W ' + args.W + + if args.multi_iter_mode: + complex_index_advisor(args.f) + else: + simple_index_advisor(args.f) + + +if __name__ == '__main__': + main()