!929 Fix issue: fix some bugs in the index advisor

Merge pull request !929 from wangtq/master
Authored by opengauss-bot on 2021-04-27 11:08:55 +08:00; committed by Gitee.
4 changed files with 911 additions and 766 deletions

File 1 of 4 (hypopg hypothetical-index functions)

@ -1021,10 +1021,6 @@ const char *hypo_explain_get_index_name_hook(Oid indexId)
*/
Datum hypopg_display_index(PG_FUNCTION_ARGS)
{
#ifdef ENABLE_MULTIPLE_NODES
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("not support for distributed scenarios yet.")));
#endif
ReturnSetInfo *rsinfo = (ReturnSetInfo *)fcinfo->resultinfo;
MemoryContext per_query_ctx;
MemoryContext oldcontext;
@ -1065,6 +1061,7 @@ Datum hypopg_display_index(PG_FUNCTION_ARGS)
Datum values[HYPO_INDEX_NB_COLS];
bool nulls[HYPO_INDEX_NB_COLS];
StringInfoData index_columns;
char *rel_name = NULL;
int i = 0;
int keyno;
@ -1073,9 +1070,13 @@ Datum hypopg_display_index(PG_FUNCTION_ARGS)
rc = memset_s(nulls, sizeof(nulls), 0, sizeof(nulls));
securec_check(rc, "\0", "\0");
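/* get_rel_name() returns NULL when the base relation has been dropped; skip such an entry instead of passing NULL to CStringGetTextDatum(). */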
rel_name = get_rel_name(entry->relid);
if (rel_name == NULL) {
break;
}
values[i++] = CStringGetTextDatum(entry->indexname);
values[i++] = ObjectIdGetDatum(entry->oid);
values[i++] = CStringGetTextDatum(get_rel_name(entry->relid));
values[i++] = CStringGetTextDatum(rel_name);
initStringInfo(&index_columns);
appendStringInfo(&index_columns, "(");
@ -1104,10 +1105,6 @@ Datum hypopg_display_index(PG_FUNCTION_ARGS)
*/
Datum hypopg_create_index(PG_FUNCTION_ARGS)
{
#ifdef ENABLE_MULTIPLE_NODES
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("not support for distributed scenarios yet.")));
#endif
char *sql = TextDatumGetCString(PG_GETARG_TEXT_PP(0));
List *parsetree_list;
ListCell *parsetree_item;
@ -1183,10 +1180,6 @@ Datum hypopg_create_index(PG_FUNCTION_ARGS)
*/
Datum hypopg_drop_index(PG_FUNCTION_ARGS)
{
#ifdef ENABLE_MULTIPLE_NODES
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("not support for distributed scenarios yet.")));
#endif
Oid indexid = PG_GETARG_OID(0);
PG_RETURN_BOOL(hypo_index_remove(indexid));
@ -1197,10 +1190,6 @@ Datum hypopg_drop_index(PG_FUNCTION_ARGS)
*/
Datum hypopg_estimate_size(PG_FUNCTION_ARGS)
{
#ifdef ENABLE_MULTIPLE_NODES
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("not support for distributed scenarios yet.")));
#endif
BlockNumber pages;
double tuples;
Oid indexid = PG_GETARG_OID(0);
@ -1228,10 +1217,6 @@ Datum hypopg_estimate_size(PG_FUNCTION_ARGS)
*/
Datum hypopg_reset_index(PG_FUNCTION_ARGS)
{
#ifdef ENABLE_MULTIPLE_NODES
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("not support for distributed scenarios yet.")));
#endif
hypo_index_reset();
PG_RETURN_VOID();
}
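Across this file the commit drops the #ifdef ENABLE_MULTIPLE_NODES guards that made every hypopg_* entry point reject distributed builds with FEATURE_NOT_SUPPORTED, pairing with the new --multi_node switch in the Python tool below. A minimal sketch of exercising the now-unblocked functions through gsql, mirroring the run_shell_cmd() helper from the last file (port, database, and table names are assumptions):

import shlex
import subprocess

BASE_CMD = 'gsql -p 5432 -d postgres'  # assumed connection values

def run_sql(statements):
    # Same pattern as run_shell_cmd(): one gsql call, statements joined by ';'.
    cmd = BASE_CMD + ' -c "' + '; '.join(statements) + '"'
    proc = subprocess.Popen(shlex.split(cmd),
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, _ = proc.communicate()
    return stdout.decode()

# Assumes a table t1(a int) already exists.
print(run_sql(["SET enable_hypo_index = on",
               "SELECT hypopg_create_index('CREATE INDEX ON t1(a)')",
               "EXPLAIN SELECT * FROM t1 WHERE a = 1",
               "SELECT hypopg_reset_index()"]))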

File 2 of 4 (gs_index_advise, C implementation)

@ -31,6 +31,7 @@
#include "catalog/indexing.h"
#include "catalog/pg_attribute.h"
#include "funcapi.h"
#include "nodes/makefuncs.h"
#include "nodes/nodes.h"
#include "nodes/parsenodes.h"
#include "pg_config_manual.h"
@ -67,7 +68,7 @@ typedef struct {
typedef struct {
char *table_name;
char *alias_name;
List *alias_name;
List *index;
List *join_cond;
List *index_print;
@ -105,6 +106,8 @@ static void startup(DestReceiver *self, int operation, TupleDesc typeinfo);
static void destroy(DestReceiver *self);
static void free_global_resource();
static void find_select_stmt(Node *);
static void extract_stmt_from_clause(List *);
static void extract_stmt_where_clause(Node *);
static void parse_where_clause(Node *);
static void field_value_trans(_out_ char *, A_Const *);
static void parse_field_expr(List *, List *, List *);
@ -114,12 +117,12 @@ static uint4 calculate_field_cardinality(const char *, const char *);
static bool is_tmp_table(const char *);
static char *find_field_name(List *);
static char *find_table_name(List *);
static bool check_relation_type_valid(Oid);
static TableCell *find_or_create_tblcell(char *, char *);
static void add_index_from_field(char *, IndexCell *);
static char *parse_group_clause(List *, List *);
static char *parse_order_clause(List *, List *);
static void add_index_from_group_order(TableCell *, List *, List *, bool);
static Oid find_table_oid(List *, const char *);
static void generate_final_index(TableCell *, Oid);
static void parse_from_clause(List *);
static void add_drived_tables(RangeVar *);
@ -134,10 +137,6 @@ static void add_index_for_drived_tables();
Datum gs_index_advise(PG_FUNCTION_ARGS)
{
#ifdef ENABLE_MULTIPLE_NODES
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("not support for distributed scenarios yet.")));
#endif
FuncCallContext *func_ctx = NULL;
SuggestedIndex *array = NULL;
@ -233,6 +232,8 @@ SuggestedIndex *suggest_index(const char *query_string, _out_ int *len)
}
Node *parsetree = (Node *)lfirst(list_head(parse_tree_list));
Node* parsetree_copy = (Node*)copyObject(parsetree);
(void)parse_analyze(parsetree_copy, query_string, NULL, 0);
find_select_stmt(parsetree);
if (!g_stmt_list) {
@ -245,9 +246,6 @@ SuggestedIndex *suggest_index(const char *query_string, _out_ int *len)
foreach (item, g_stmt_list) {
SelectStmt *stmt = (SelectStmt *)lfirst(item);
parse_from_clause(stmt->fromClause);
/* Note: the structure JoinExpr will be modified after 'parse_analyze', so 'parse_from_clause'
should be executed first. */
Query *query_tree = parse_analyze((Node *)stmt, query_string, NULL, 0);
if (g_table_list) {
parse_where_clause(stmt->whereClause);
@ -267,8 +265,9 @@ SuggestedIndex *suggest_index(const char *query_string, _out_ int *len)
foreach (table_item, g_table_list) {
TableCell *table = (TableCell *)lfirst(table_item);
if (table->index != NIL) {
Oid table_oid = find_table_oid(query_tree->rtable, table->table_name);
if (table_oid == 0) {
RangeVar* rtable = makeRangeVar(NULL, table->table_name, -1);
Oid table_oid = RangeVarGetRelid(rtable, NoLock, true);
if (table_oid == InvalidOid) {
continue;
}
generate_final_index(table, table_oid);
@ -277,9 +276,12 @@ SuggestedIndex *suggest_index(const char *query_string, _out_ int *len)
g_driver_table = NULL;
}
}
if (g_table_list == NIL) {
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("can not advise for query: %s.", query_string)));
}
// Format the returned result, e.g., 'table1, "(col1,col2),(col3)"'.
int array_len = g_table_list == NIL ? 0 : g_table_list->length;
int array_len = g_table_list->length;
*len = array_len;
SuggestedIndex *array = (SuggestedIndex *)palloc0(sizeof(SuggestedIndex) * array_len);
errno_t rc = EOK;
@ -299,6 +301,9 @@ SuggestedIndex *suggest_index(const char *query_string, _out_ int *len)
ListCell *cur_index = NULL;
int j = 0;
foreach (cur_index, index_list) {
if (strlen((char *)lfirst(cur_index)) + strlen((array + i)->column) + 3 > NAMEDATALEN) {
continue;
}
if (j > 0) {
rc = strcat_s((array + i)->column, NAMEDATALEN, ",(");
} else {
@ -551,15 +556,15 @@ void destroy(DestReceiver *self)
*/
void find_select_stmt(Node *parsetree)
{
switch (nodeTag(parsetree)) {
case T_SelectStmt: {
if (parsetree == NULL || nodeTag(parsetree) != T_SelectStmt) {
return;
}
SelectStmt *stmt = (SelectStmt *)parsetree;
bool has_substmt = false;
g_stmt_list = lappend(g_stmt_list, stmt);
switch (stmt->op) {
case SETOP_UNION: {
// analyze the set operation: union
has_substmt = true;
find_select_stmt((Node *)stmt->larg);
find_select_stmt((Node *)stmt->rarg);
break;
@ -568,7 +573,6 @@ void find_select_stmt(Node *parsetree)
// analyze the 'with' clause
if (stmt->withClause) {
List *cte_list = stmt->withClause->ctes;
has_substmt = true;
ListCell *item = NULL;
foreach (item, cte_list) {
@ -581,49 +585,12 @@ void find_select_stmt(Node *parsetree)
// analyze the 'from' clause
if (stmt->fromClause) {
List *from_list = stmt->fromClause;
ListCell *item = NULL;
foreach (item, from_list) {
Node *from = (Node *)lfirst(item);
if (IsA(from, RangeSubselect)) {
has_substmt = true;
find_select_stmt(((RangeSubselect *)from)->subquery);
}
}
extract_stmt_from_clause(stmt->fromClause);
}
// analyze the 'where' clause
if (stmt->whereClause) {
Node *item_where = stmt->whereClause;
if (IsA(item_where, SubLink)) {
has_substmt = true;
find_select_stmt(((SubLink *)item_where)->subselect);
}
while (IsA(item_where, A_Expr)) {
A_Expr *expr = (A_Expr *)item_where;
Node *lexpr = expr->lexpr;
Node *rexpr = expr->rexpr;
if (IsA(lexpr, SubLink)) {
has_substmt = true;
find_select_stmt(((SubLink *)lexpr)->subselect);
}
if (IsA(rexpr, SubLink)) {
has_substmt = true;
find_select_stmt(((SubLink *)rexpr)->subselect);
}
item_where = lexpr;
}
}
if (!has_substmt) {
g_stmt_list = lappend(g_stmt_list, stmt);
}
break;
}
default: {
break;
}
extract_stmt_where_clause(stmt->whereClause);
}
break;
}
@ -633,20 +600,45 @@ void find_select_stmt(Node *parsetree)
}
}
Oid find_table_oid(List *list, const char *table_name)
void extract_stmt_from_clause(List *from_list)
{
ListCell *item = NULL;
foreach (item, list) {
RangeTblEntry *entry = (RangeTblEntry *)lfirst(item);
if (entry != NULL && entry->relname != NULL) {
if (strcasecmp(table_name, entry->relname) == 0) {
return entry->relid;
foreach (item, from_list) {
Node *from = (Node *)lfirst(item);
if (from && IsA(from, RangeSubselect)) {
find_select_stmt(((RangeSubselect *)from)->subquery);
}
if (from && IsA(from, JoinExpr)) {
Node *larg = ((JoinExpr *)from)->larg;
Node *rarg = ((JoinExpr *)from)->rarg;
if (larg && IsA(larg, RangeSubselect)) {
find_select_stmt(((RangeSubselect *)larg)->subquery);
}
if (rarg && IsA(rarg, RangeSubselect)) {
find_select_stmt(((RangeSubselect *)rarg)->subquery);
}
}
}
}
return 0;
void extract_stmt_where_clause(Node *item_where)
{
if (IsA(item_where, SubLink)) {
find_select_stmt(((SubLink *)item_where)->subselect);
}
while (item_where && IsA(item_where, A_Expr)) {
A_Expr *expr = (A_Expr *)item_where;
Node *lexpr = expr->lexpr;
Node *rexpr = expr->rexpr;
if (lexpr && IsA(lexpr, SubLink)) {
find_select_stmt(((SubLink *)lexpr)->subselect);
}
if (rexpr && IsA(rexpr, SubLink)) {
find_select_stmt(((SubLink *)rexpr)->subselect);
}
item_where = lexpr;
}
}
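Two behavioural changes land here: find_select_stmt() now records every SelectStmt it visits instead of only leaf statements (the has_substmt flag is gone), and the new extract_stmt_from_clause() also descends into JoinExpr nodes, so subselects on either side of an explicit JOIN are collected; the old loop matched only bare RangeSubselect items. A quick sketch of the two query shapes, printed as gs_index_advise() calls with the same quote doubling used by make_single_advisor_sql() in the Python file below (table and column names are made up):

queries = [
    # reached before this fix: comma-style subselect in FROM
    "SELECT * FROM (SELECT a FROM t1) s, t2 WHERE s.a = t2.a",
    # reached only after this fix: subselect under an explicit JOIN
    "SELECT * FROM t2 JOIN (SELECT a FROM t1) s ON s.a = t2.a",
]
for q in queries:
    print("select gs_index_advise('%s');" % q.replace("'", "''"))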
/*
@ -739,6 +731,10 @@ void parse_where_clause(Node *item_where)
Node *rexpr = expr->rexpr;
List *field_value = NIL;
if (lexpr == NULL || rexpr == NULL) {
return;
}
switch (expr->kind) {
case AEXPR_OP: { // normal operator
if (IsA(lexpr, ColumnRef) && IsA(rexpr, ColumnRef)) {
@ -806,8 +802,13 @@ void parse_from_clause(List *from_list)
void add_drived_tables(RangeVar *join_node)
{
char *table_name = ((RangeVar *)join_node)->relname;
TableCell *join_table = find_or_create_tblcell(table_name, NULL);
TableCell *join_table = NULL;
if (join_node->alias) {
join_table = find_or_create_tblcell(join_node->relname, join_node->alias->aliasname);
} else {
join_table = find_or_create_tblcell(join_node->relname, NULL);
}
if (!join_table) {
return;
@ -928,7 +929,7 @@ void parse_join_expr(JoinExpr *join_tree)
// convert field value to string
void field_value_trans(_out_ char *target, A_Const *field_value)
{
Value value = ((A_Const *)field_value)->val;
Value value = field_value->val;
if (value.type == T_Integer) {
pg_itoa(value.val.ival, target);
@ -963,6 +964,9 @@ void parse_field_expr(List *field, List *op, List *lfield_values)
// get field values
foreach (item, lfield_values) {
char *str = (char *)palloc0(MAX_QUERY_LEN);
if (!IsA(lfirst(item), A_Const)) {
continue;
}
field_value_trans(str, (A_Const *)lfirst(item));
if (i == 0) {
rc = strcpy_s(field_value, MAX_QUERY_LEN, str);
@ -976,6 +980,12 @@ void parse_field_expr(List *field, List *op, List *lfield_values)
pfree(str);
}
if (i == 0) {
pfree(field_value);
pfree(field_expr);
return;
}
// get field expression, e.g., 'id = 100'
if (strcasecmp(op_type, "~~") == 0) {
// ...like...
@ -1076,19 +1086,20 @@ char *find_table_name(List *fields)
char *table = NULL;
ListCell *item = NULL;
ListCell *sub_item = NULL;
// if fields have table name
if (fields->length > 1) {
table = strVal(linitial(fields));
// check temporary tables and existing tables
if (is_tmp_table(table)) {
free_global_resource();
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("can not advise for temporary table: %s.", table)));
} else {
// check existing tables
foreach (item, g_table_list) {
TableCell *cur_table = (TableCell *)lfirst(item);
if (strcasecmp(table, cur_table->table_name) == 0 ||
(cur_table->alias_name && strcasecmp(table, cur_table->alias_name) == 0)) {
if (strcasecmp(table, cur_table->table_name) == 0) {
return cur_table->table_name;
}
foreach (sub_item, cur_table->alias_name) {
char *cur_alias_name = (char *)lfirst(sub_item);
if (strcasecmp(table, cur_alias_name) == 0) {
return cur_table->table_name;
}
}
@ -1127,7 +1138,33 @@ char *find_table_name(List *fields)
return NULL;
}
// Check whether the table is temporary.
// Check whether the table type is supported.
bool check_relation_type_valid(Oid relid)
{
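/* Advise only for ordinary, non-partitioned, permanent, row-store tables; views, partitioned tables, temp/unlogged tables, and column store are rejected. */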
Relation relation;
bool result = false;
relation = heap_open(relid, AccessShareLock);
if (RelationIsValid(relation) == false) {
heap_close(relation, AccessShareLock);
return result;
}
if (RelationIsRelation(relation) &&
RelationGetPartType(relation) == PARTTYPE_NON_PARTITIONED_RELATION &&
RelationGetRelPersistence(relation) == RELPERSISTENCE_PERMANENT) {
const char *format = ((relation->rd_options) && (((StdRdOptions *)(relation->rd_options))->orientation)) ?
((char *)(relation->rd_options) + *(int *)&(((StdRdOptions *)(relation->rd_options))->orientation)) :
ORIENTATION_ROW;
if (pg_strcasecmp(format, ORIENTATION_ROW) == 0) {
result = true;
}
}
heap_close(relation, AccessShareLock);
return result;
}
bool is_tmp_table(const char *table_name)
{
ListCell *item = NULL;
@ -1138,6 +1175,7 @@ bool is_tmp_table(const char *table_name)
return true;
}
}
return false;
}
@ -1152,30 +1190,54 @@ TableCell *find_or_create_tblcell(char *table_name, char *alias_name)
return NULL;
}
if (is_tmp_table(table_name)) {
free_global_resource();
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("can not advise for temporary table: %s.", table_name)));
ereport(WARNING, (errmsg("can not advise for: %s.", table_name)));
return NULL;
}
// search the table among existing tables
ListCell *item = NULL;
ListCell *sub_item = NULL;
if (g_table_list != NIL) {
foreach (item, g_table_list) {
TableCell *cur_table = (TableCell *)lfirst(item);
char *cur_table_name = cur_table->table_name;
char *cur_alias_name = cur_table->alias_name;
if (strcasecmp(cur_table_name, table_name) == 0 ||
(cur_alias_name && strcasecmp(cur_alias_name, table_name) == 0)) {
if (strcasecmp(cur_table_name, table_name) == 0) {
if (alias_name) {
foreach (sub_item, cur_table->alias_name) {
char *cur_alias_name = (char *)lfirst(sub_item);
if (strcasecmp(cur_alias_name, alias_name) == 0) {
return cur_table;
}
}
cur_table->alias_name = lappend(cur_table->alias_name, alias_name);
}
return cur_table;
}
foreach (sub_item, cur_table->alias_name) {
char *cur_alias_name = (char *)lfirst(sub_item);
if (strcasecmp(cur_alias_name, table_name) == 0) {
return cur_table;
}
}
}
}
RangeVar* rtable = makeRangeVar(NULL, table_name, -1);
Oid table_oid = RangeVarGetRelid(rtable, NoLock, true);
if (check_relation_type_valid(table_oid) == false) {
ereport(WARNING, (errmsg("can not advise for: %s.", table_name)));
return NULL;
}
// create a new table
TableCell *new_table = NULL;
new_table = (TableCell *)palloc0(sizeof(*new_table));
new_table->table_name = table_name;
new_table->alias_name = alias_name;
new_table->alias_name = NIL;
if (alias_name) {
new_table->alias_name = lappend(new_table->alias_name, alias_name);
}
new_table->index = NIL;
new_table->join_cond = NIL;
new_table->index_print = NIL;

File 3 of 4 (README, usage section)

@ -11,5 +11,6 @@ benefit of it for the workload.
## Usage
python index_advisor_workload.py [p PORT] [d DATABASE] [f FILE] [--h HOST] [-U USERNAME] [-W PASSWORD]
[--max_index_num MAX_INDEX_NUM] [--multi_iter_mode]
python index_advisor_workload.py [p PORT] [d DATABASE] [f FILE] [--h HOST] [-U USERNAME] [-W PASSWORD] [--schema SCHEMA]
[--max_index_num MAX_INDEX_NUM] [--max_index_storage MAX_INDEX_STORAGE] [--multi_iter_mode] [--multi_node]
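For example, with placeholder connection values (the password is then read from a pipe or an interactive prompt):

python index_advisor_workload.py 5432 postgres workload.sql --h 127.0.0.1 -U omm -W --schema public --max_index_num 10 --max_index_storage 100 --multi_iter_mode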

File 4 of 4 (index_advisor_workload.py)

@ -22,11 +22,18 @@ import re
import shlex
import subprocess
import time
import select
import logging
ENABLE_MULTI_NODE = False
SAMPLE_NUM = 5
MAX_INDEX_COLUMN_NUM = 5
MAX_INDEX_NUM = 10
MAX_INDEX_STORAGE = None
FULL_ARRANGEMENT_THRESHOLD = 20
BASE_CMD = ''
SHARP = '#'
SCHEMA = None
SQL_TYPE = ['select', 'delete', 'insert', 'update']
SQL_PATTERN = [r'([^\\])\'((\')|(.*?([^\\])\'))',
r'([^\\])"((")|(.*?([^\\])"))',
@ -34,12 +41,33 @@ SQL_PATTERN = [r'([^\\])\'((\')|(.*?([^\\])\'))',
r'([^a-zA-Z])-?\d+(\.\d+)?',
r'(\'\d+\\.*?\')']
logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')
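These SQL_PATTERN regexes strip literals so that queries differing only in their constants collapse into one workload template; a toy illustration of the intended normalization (the '?' placeholder and the single re.sub() call are assumptions, the real templating lives in get_workload_template):

import re

SQL_PATTERN = [r'([^\\])\'((\')|(.*?([^\\])\'))',
               r'([^\\])"((")|(.*?([^\\])"))',
               r'([^a-zA-Z])-?\d+(\.\d+)?',
               r'(\'\d+\\.*?\')']

sql = "select * from t1 where a = 42 and b = 'x'"
for pattern in SQL_PATTERN:
    sql = re.sub(pattern, r'\1?', sql)
print(sql)  # -> select * from t1 where a = ? and b = ?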
def read_input_from_pipe():
"""
Read piped stdin input, e.g. from "echo 'str1 str2' | python xx.py",
and return it; return an empty string when nothing is piped.
"""
input_str = ""
r_handle, _, _ = select.select([sys.stdin], [], [], 0)
if not r_handle:
return ""
for item in r_handle:
if item == sys.stdin:
input_str = sys.stdin.read().strip()
return input_str
class PwdAction(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
if values is None:
values = getpass.getpass()
setattr(namespace, self.dest, values)
password = read_input_from_pipe()
if password:
logging.warning("Read password from pipe.")
else:
password = getpass.getpass("Password for database user:")
setattr(namespace, self.dest, password)
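Because -W now uses nargs="?" (see the argparse change at the end of this file), the option may appear without a value; argparse then invokes the action with values=None, and the body shown here resolves the password from a pipe first and an interactive prompt second. A hedged sketch of the wiring:

import argparse

parser = argparse.ArgumentParser()
# Same wiring as main(): optional value plus the custom action above.
parser.add_argument("-W", help="Password for database user",
                    nargs="?", action=PwdAction)

# echo 'secret' | python index_advisor_workload.py ... -W
#     -> read_input_from_pipe() returns 'secret' (a warning is logged)
# plain interactive run with -W
#     -> getpass.getpass("Password for database user:") prompts
args = parser.parse_args(["-W"])  # prompts here when stdin is a TTY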
class QueryItem:
@ -55,17 +83,14 @@ class IndexItem:
self.table = tbl
self.columns = cols
self.benefit = 0
self.storage = 0
def get_file_size(filename):
if os.path.isfile(filename):
return os.stat(filename).st_size
else:
return -1
def run_shell_cmd(target_sql_list):
cmd = BASE_CMD + ' -c \"'
if SCHEMA:
cmd += 'set current_schema = %s; ' % SCHEMA
for target_sql in target_sql_list:
cmd += target_sql + ';'
cmd += '\"'
@ -73,8 +98,8 @@ def run_shell_cmd(target_sql_list):
(stdout, stderr) = proc.communicate()
stdout, stderr = stdout.decode(), stderr.decode()
if 'gsql' in stderr or 'failed to connect' in stderr:
raise ConnectionError("An error occurred while connecting to the database.\n" + "Details: " + stderr)
raise ConnectionError("An error occurred while connecting to the database.\n"
+ "Details: " + stderr)
return stdout
@ -84,26 +109,34 @@ def run_shell_sql_cmd(sql_file):
proc = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(stdout, stderr) = proc.communicate()
stdout, stderr = stdout.decode(), stderr.decode()
if stderr:
print(stderr)
return stdout
def print_header_boundary(header):
term_width = int(os.get_terminal_size().columns / 2)
side_width = (term_width - len(header))
def green(text):
return '\033[32m%s\033[0m' % text
print('#' * side_width + header + '#' * side_width)
def print_header_boundary(header):
# Print a decorated banner so the section header stands out.
try:
term_width = os.get_terminal_size().columns
# The width of each of the two sides of the terminal.
side_width = (term_width - len(header)) // 2
except (AttributeError, OSError):
side_width = 0
title = SHARP * side_width + header + SHARP * side_width
print(green(title))
def load_workload(file_path):
wd_dict = {}
workload = []
with open(file_path, 'r') as f:
for sql in f.readlines():
sql = sql.strip('\n;')
with open(file_path, 'r') as file:
raw_text = ''.join(file.readlines())
sqls = raw_text.split(';')
for sql in sqls:
if any(tp in sql.lower() for tp in SQL_TYPE):
if sql not in wd_dict.keys():
wd_dict[sql] = 1
@ -133,7 +166,8 @@ def get_workload_template(workload):
templates[sql_template]['samples'].append(item.statement)
else:
if random.randint(0, templates[sql_template]['cnt']) < SAMPLE_NUM:
templates[sql_template]['samples'][random.randint(0, SAMPLE_NUM - 1)] = item.statement
templates[sql_template]['samples'][random.randint(0, SAMPLE_NUM - 1)] = \
item.statement
return templates
@ -144,9 +178,9 @@ def workload_compression(input_path):
workload = load_workload(input_path)
templates = get_workload_template(workload)
for key in templates.keys():
for sql in templates[key]['samples']:
compressed_workload.append(QueryItem(sql, templates[key]['cnt'] / SAMPLE_NUM))
for _, elem in templates.items():
for sql in elem['samples']:
compressed_workload.append(QueryItem(sql, elem['cnt'] / SAMPLE_NUM))
return compressed_workload
@ -168,22 +202,33 @@ def parse_explain_plan(plan):
return cost_total
def update_index_storage(res, index_config, hypo_index_num):
index_id = re.findall(r'<\d+>', res)[0].strip('<>')
index_size_sql = 'select * from hypopg_estimate_size(%s);' % index_id
res = run_shell_cmd([index_size_sql]).split('\n')
for item in res:
if re.match(r'\d+', item.strip()):
index_config[hypo_index_num].storage = float(item.strip()) / 1024 / 1024
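update_index_storage() depends on two output shapes: hypopg_create_index() echoes the new hypothetical index OID in angle brackets, and hypopg_estimate_size() returns a size in bytes, which is stored on the IndexItem in MB. A toy run with made-up result strings:

import re

res = ' <329588>btree_t1_a'  # hypothetical hypopg_create_index output
index_id = re.findall(r'<\d+>', res)[0].strip('<>')
print(index_id)  # 329588

rows = ['hypopg_estimate_size', '----------------------', ' 1245184', '(1 row)']
for item in rows:  # hypothetical gsql column output
    if re.match(r'\d+', item.strip()):
        print(float(item.strip()) / 1024 / 1024, 'MB')  # ~1.19 MB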
def estimate_workload_cost_file(workload, index_config=None):
total_cost = 0
sql_file = str(time.time()) + '.sql'
found_plan = False
with open(sql_file, 'w') as wf:
hypo_index = False
with open(sql_file, 'w') as file:
if index_config:
# create hypo-indexes
wf.write('SET enable_hypo_index = on;\n')
if SCHEMA:
file.write('SET current_schema = %s;\n' % SCHEMA)
file.write('SET enable_hypo_index = on;\n')
for index in index_config:
wf.write('SELECT hypopg_create_index(\'CREATE INDEX ON ' + index.table
+ '(' + index.columns + ')\');\n')
file.write("SELECT hypopg_create_index('CREATE INDEX ON %s(%s)');\n" %
(index.table, index.columns))
if ENABLE_MULTI_NODE:
file.write('set enable_fast_query_shipping = off;\n')
for query in workload:
wf.write('EXPLAIN ' + query.statement + ';\n')
if index_config:
wf.write('SELECT hypopg_reset_index();')
file.write('EXPLAIN ' + query.statement + ';\n')
result = run_shell_sql_cmd(sql_file).split('\n')
if os.path.exists(sql_file):
@ -191,11 +236,14 @@ def estimate_workload_cost_file(workload, index_config=None):
# parse the result of explain plans
i = 0
hypo_index_num = 0
for line in result:
if 'QUERY PLAN' in line:
found_plan = True
if 'ERROR' in line:
workload.pop(i)
if 'hypopg_create_index' in line:
hypo_index = True
if found_plan and '(cost=' in line:
if i >= len(workload):
raise ValueError("The size of workload is not correct!")
@ -205,19 +253,26 @@ def estimate_workload_cost_file(workload, index_config=None):
total_cost += query_cost
found_plan = False
i += 1
if hypo_index:
if 'btree' in line and MAX_INDEX_STORAGE:
hypo_index = False
update_index_storage(line, index_config, hypo_index_num)
hypo_index_num += 1
while i < len(workload):
workload[i].cost_list.append(0)
i += 1
if index_config:
run_shell_cmd(['SELECT hypopg_reset_index();'])
return total_cost
def make_single_advisor_sql(ori_sql):
sql = 'select gs_index_advise(\''
for ch in ori_sql:
if ch == '\'':
for elem in ori_sql:
if elem == '\'':
sql += '\''
sql += ch
sql += elem
sql += '\');'
return sql
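make_single_advisor_sql() doubles embedded single quotes so the original query survives being nested inside gs_index_advise('...'); for example (table and column names are made up):

print(make_single_advisor_sql("select * from t1 where name = 'bob'"))
# -> select gs_index_advise('select * from t1 where name = ''bob''');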
@ -225,7 +280,7 @@ def make_single_advisor_sql(ori_sql):
def parse_single_advisor_result(res):
table_index_dict = {}
if len(res) > 2 and res[0:2] == (' ('):
if len(res) > 2 and res[0:2] == ' (':
items = res.split(',', 1)
table = items[0][2:]
indexes = re.split('[()]', items[1][:-1].strip('\"'))
@ -263,10 +318,13 @@ def query_index_check(query, query_index_dict):
# create hypo-indexes
sql_list = ['SET enable_hypo_index = on;']
if ENABLE_MULTI_NODE:
sql_list.append('SET enable_fast_query_shipping = off;')
for table in query_index_dict.keys():
for columns in query_index_dict[table]:
if columns != '':
sql_list.append('SELECT hypopg_create_index(\'CREATE INDEX ON ' + table + '(' + columns + ')\')')
sql_list.append("SELECT hypopg_create_index('CREATE INDEX ON %s(%s)')" %
(table, columns))
sql_list.append('explain ' + query)
sql_list.append('SELECT hypopg_reset_index()')
result = run_shell_cmd(sql_list).split('\n')
@ -289,6 +347,7 @@ def query_index_check(query, query_index_dict):
continue
if table not in valid_indexes.keys():
valid_indexes[table] = []
if columns not in valid_indexes[table]:
valid_indexes[table].append(columns)
return valid_indexes
@ -342,6 +401,8 @@ def generate_candidate_indexes(workload, iterate=False):
if table not in index_dict.keys():
index_dict[table] = set()
for columns in valid_index_dict[table]:
if len(workload[k].valid_index_list) >= FULL_ARRANGEMENT_THRESHOLD:
break
workload[k].valid_index_list.append(IndexItem(table, columns))
if columns not in index_dict[table]:
print("table: ", table, "columns: ", columns)
@ -358,10 +419,12 @@ def generate_candidate_indexes_file(workload):
if len(workload) > 0:
run_shell_cmd([workload[0].statement])
with open(sql_file, 'w') as wf:
with open(sql_file, 'w') as file:
if SCHEMA:
file.write('SET current_schema = %s;\n' % SCHEMA)
for query in workload:
if 'select' in query.statement.lower():
wf.write(make_single_advisor_sql(query.statement) + '\n')
file.write(make_single_advisor_sql(query.statement) + '\n')
result = run_shell_sql_cmd(sql_file).split('\n')
if os.path.exists(sql_file):
@ -370,16 +433,16 @@ def generate_candidate_indexes_file(workload):
for line in result:
table_index_dict = parse_single_advisor_result(line.strip('\n'))
# filter duplicate indexes
for table in table_index_dict.keys():
for table, columns in table_index_dict.items():
if table not in index_dict.keys():
index_dict[table] = set()
for columns in table_index_dict[table]:
if columns == "":
for column in columns:
if column == "":
continue
if columns not in index_dict[table]:
print("table: ", table, "columns: ", columns)
index_dict[table].add(columns)
candidate_indexes.append(IndexItem(table, columns))
if column not in index_dict[table]:
print("table: ", table, "columns: ", column)
index_dict[table].add(column)
candidate_indexes.append(IndexItem(table, column))
return candidate_indexes
@ -454,6 +517,7 @@ def find_subsets_num(config, atomic_config_total):
is_exist = False
for index in config:
if atomic_index.table == index.table and atomic_index.columns == index.columns:
index.storage = atomic_index.storage
is_exist = True
break
if not is_exist:
@ -481,24 +545,24 @@ def infer_workload_cost(workload, config, atomic_config_total):
if len(atomic_subsets_num) == 0:
raise ValueError("No atomic configs found for current config!")
for i in range(len(workload)):
if max(atomic_subsets_num) >= len(workload[i].cost_list):
for ind, obj in enumerate(workload):
if max(atomic_subsets_num) >= len(obj.cost_list):
raise ValueError("Wrong atomic config for current query!")
# compute the cost for selection
min_cost = sys.maxsize
for num in atomic_subsets_num:
if num < len(workload[i].cost_list) and workload[i].cost_list[num] < min_cost:
min_cost = workload[i].cost_list[num]
if num < len(obj.cost_list) and obj.cost_list[num] < min_cost:
min_cost = obj.cost_list[num]
total_cost += min_cost
# compute the cost for updating indexes
if 'insert' in workload[i].statement.lower() or 'delete' in workload[i].statement.lower():
if 'insert' in obj.statement.lower() or 'delete' in obj.statement.lower():
for index in config:
index_num = get_index_num(index, atomic_config_total)
if index_num == -1:
raise ValueError("The index isn't found for current query!")
if 0 <= index_num < len(workload[i].cost_list):
total_cost += workload[i].cost_list[index_num] - workload[i].cost_list[0]
if 0 <= index_num < len(workload[ind].cost_list):
total_cost += obj.cost_list[index_num] - obj.cost_list[0]
return total_cost
@ -513,9 +577,9 @@ def simple_index_advisor(input_path):
print_header_boundary(" Determine optimal indexes ")
ori_total_cost = estimate_workload_cost_file(workload)
for i in range(len(candidate_indexes)):
new_total_cost = estimate_workload_cost_file(workload, [candidate_indexes[i]])
candidate_indexes[i].benefit = ori_total_cost - new_total_cost
for _, obj in enumerate(candidate_indexes):
new_total_cost = estimate_workload_cost_file(workload, [obj])
obj.benefit = ori_total_cost - new_total_cost
candidate_indexes = sorted(candidate_indexes, key=lambda item: item.benefit, reverse=True)
# filter out duplicate index
@ -537,11 +601,18 @@ def simple_index_advisor(input_path):
final_index_set[index.table].append(index)
cnt = 0
for table in final_index_set.keys():
for index in final_index_set[table]:
index_current_storage = 0
for table, indexs in final_index_set.items():
for index in indexs:
if cnt == MAX_INDEX_NUM:
break
if MAX_INDEX_STORAGE and (index_current_storage + index.storage) > MAX_INDEX_STORAGE:
continue
index_current_storage += index.storage
print("create index ind" + str(cnt) + " on " + table + "(" + index.columns + ");")
cnt += 1
if cnt == MAX_INDEX_NUM:
else:
continue
break
@ -549,8 +620,9 @@ def greedy_determine_opt_config(workload, atomic_config_total, candidate_indexes
opt_config = []
index_num_record = set()
min_cost = sys.maxsize
while len(opt_config) < MAX_INDEX_NUM:
for i in range(len(candidate_indexes)):
if i == 1 and min_cost == sys.maxsize:
break
cur_min_cost = sys.maxsize
cur_index = None
cur_index_num = -1
@ -565,6 +637,11 @@ def greedy_determine_opt_config(workload, atomic_config_total, candidate_indexes
cur_index = index
cur_index_num = k
if cur_index and cur_min_cost < min_cost:
if MAX_INDEX_STORAGE and sum([obj.storage for obj in opt_config]) + \
cur_index.storage > MAX_INDEX_STORAGE:
continue
if len(opt_config) == MAX_INDEX_NUM:
break
min_cost = cur_min_cost
opt_config.append(cur_index)
index_num_record.add(cur_index_num)
@ -593,11 +670,15 @@ def complex_index_advisor(input_path):
opt_config = greedy_determine_opt_config(workload, atomic_config_total, candidate_indexes)
cnt = 0
index_current_storage = 0
for index in opt_config:
print("create index ind" + str(cnt) + " on " + index.table + "(" + index.columns + ");")
cnt += 1
if MAX_INDEX_STORAGE and (index_current_storage + index.storage) > MAX_INDEX_STORAGE:
continue
if cnt == MAX_INDEX_NUM:
break
index_current_storage += index.storage
print("create index ind" + str(cnt) + " on " + index.table + "(" + index.columns + ");")
cnt += 1
def main():
@ -606,22 +687,38 @@ def main():
arg_parser.add_argument("d", help="Name of database")
arg_parser.add_argument("--h", help="Host for database")
arg_parser.add_argument("-U", help="Username for database log-in")
arg_parser.add_argument("-W", help="Password for database user", action=PwdAction)
arg_parser.add_argument("-W", help="Password for database user", nargs="?", action=PwdAction)
arg_parser.add_argument("f", help="File containing workload queries (One query per line)")
arg_parser.add_argument("--schema", help="Schema name for the current business data")
arg_parser.add_argument("--max_index_num", help="Maximum number of suggested indexes", type=int)
arg_parser.add_argument("--multi_iter_mode", action='store_true', help="Whether to use multi-iteration algorithm",
default=False)
arg_parser.add_argument("--max_index_storage",
help="Maximum storage of suggested indexes/MB", type=int)
arg_parser.add_argument("--multi_iter_mode", action='store_true',
help="Whether to use multi-iteration algorithm", default=False)
arg_parser.add_argument("--multi_node", action='store_true',
help="Whether to support distributed scenarios", default=False)
args = arg_parser.parse_args()
global MAX_INDEX_NUM, BASE_CMD
global MAX_INDEX_NUM, BASE_CMD, ENABLE_MULTI_NODE, MAX_INDEX_STORAGE, SCHEMA
if args.max_index_num is not None and args.max_index_num <= 0:
raise argparse.ArgumentTypeError("%s is an invalid positive int value" %
args.max_index_num)
if args.max_index_storage is not None and args.max_index_storage <= 0:
raise argparse.ArgumentTypeError("%s is an invalid positive int value" %
args.max_index_storage)
if args.schema:
SCHEMA = args.schema
MAX_INDEX_NUM = args.max_index_num or 10
ENABLE_MULTI_NODE = args.multi_node
MAX_INDEX_STORAGE = args.max_index_storage
BASE_CMD = 'gsql -p ' + args.p + ' -d ' + args.d
if args.h:
BASE_CMD += ' -h ' + args.h
if args.U:
BASE_CMD += ' -U ' + args.U
if args.U != getpass.getuser() and not args.W:
raise ValueError('Enter the \'-W\' parameter for user ' + args.U + ' when executing the script.')
raise ValueError('Enter the \'-W\' parameter for user '
+ args.U + ' when executing the script.')
if args.W:
BASE_CMD += ' -W ' + args.W