Implement collecting behaviour in qc_sqlite

By default, only the essentials - the type and the operation - of
a statement will be collected and only if fields, tables, functions
and databases are explicitly asked for, will they be collected.
However, a statement will be parsed at most twice; if parsing is
needed a second time then all information will be collected.

If it is known that some particular information is needed, then
qc_parse() can be called explicitly to ensure it is collected
at first parsing.
This commit is contained in:
Johan Wikman 2017-03-15 17:08:57 +02:00
parent d0a9571da0
commit d70dad260a
3 changed files with 206 additions and 100 deletions

View File

@ -34,10 +34,11 @@ typedef enum qc_init_kind
*/
typedef enum qc_collect_info
{
QC_COLLECT_TABLES = 0x01, /*< Collect table names. */
QC_COLLECT_DATABASES = 0x02, /*< Collect database names. */
QC_COLLECT_FIELDS = 0x04, /*< Collect field information. */
QC_COLLECT_FUNCTIONS = 0x08, /*< Collect function information. */
QC_COLLECT_ESSENTIALS = 0x00, /*< Collect only the base minimum. */
QC_COLLECT_TABLES = 0x01, /*< Collect table names. */
QC_COLLECT_DATABASES = 0x02, /*< Collect database names. */
QC_COLLECT_FIELDS = 0x04, /*< Collect field information. */
QC_COLLECT_FUNCTIONS = 0x08, /*< Collect function information. */
QC_COLLECT_ALL = (QC_COLLECT_TABLES|QC_COLLECT_DATABASES|QC_COLLECT_FIELDS|QC_COLLECT_FUNCTIONS)
} qc_collect_info_t;

View File

@ -55,6 +55,8 @@ static inline bool qc_info_was_parsed(qc_parse_result_t status)
typedef struct qc_sqlite_info
{
qc_parse_result_t status; // The validity of the information in this structure.
uint32_t collect; // What information should be collected.
uint32_t collected; // What information has been collected.
const char* query; // The query passed to sqlite.
size_t query_len; // The length of the query.
@ -128,18 +130,18 @@ typedef enum qc_token_position
static void buffer_object_free(void* data);
static char** copy_string_array(char** strings, int* pn);
static void enlarge_string_array(size_t n, size_t len, char*** ppzStrings, size_t* pCapacity);
static bool ensure_query_is_parsed(GWBUF* query);
static bool ensure_query_is_parsed(GWBUF* query, uint32_t collect);
static void free_field_infos(QC_FIELD_INFO* infos, size_t n_infos);
static void free_string_array(char** sa);
static QC_SQLITE_INFO* get_query_info(GWBUF* query);
static QC_SQLITE_INFO* info_alloc(void);
static QC_SQLITE_INFO* get_query_info(GWBUF* query, uint32_t collect);
static QC_SQLITE_INFO* info_alloc(uint32_t collect);
static void info_finish(QC_SQLITE_INFO* info);
static void info_free(QC_SQLITE_INFO* info);
static QC_SQLITE_INFO* info_init(QC_SQLITE_INFO* info);
static QC_SQLITE_INFO* info_init(QC_SQLITE_INFO* info, uint32_t collect);
static void log_invalid_data(GWBUF* query, const char* message);
static bool parse_query(GWBUF* query);
static bool parse_query(GWBUF* query, uint32_t collect);
static void parse_query_string(const char* query, size_t len);
static bool query_is_parsed(GWBUF* query);
static bool query_is_parsed(GWBUF* query, uint32_t collect);
static bool should_exclude(const char* zName, const ExprList* pExclude);
static void update_field_info(QC_SQLITE_INFO* info,
const char* database,
@ -259,13 +261,13 @@ static void enlarge_string_array(size_t n, size_t len, char*** ppzStrings, size_
}
}
static bool ensure_query_is_parsed(GWBUF* query)
static bool ensure_query_is_parsed(GWBUF* query, uint32_t collect)
{
bool parsed = query_is_parsed(query);
bool parsed = query_is_parsed(query, collect);
if (!parsed)
{
parsed = parse_query(query);
parsed = parse_query(query, collect);
}
return parsed;
@ -315,11 +317,11 @@ static void free_string_array(char** sa)
}
}
static QC_SQLITE_INFO* get_query_info(GWBUF* query)
static QC_SQLITE_INFO* get_query_info(GWBUF* query, uint32_t collect)
{
QC_SQLITE_INFO* info = NULL;
if (ensure_query_is_parsed(query))
if (ensure_query_is_parsed(query, collect))
{
info = (QC_SQLITE_INFO*) gwbuf_get_buffer_object_data(query, GWBUF_PARSING_INFO);
ss_dassert(info);
@ -328,12 +330,12 @@ static QC_SQLITE_INFO* get_query_info(GWBUF* query)
return info;
}
static QC_SQLITE_INFO* info_alloc(void)
static QC_SQLITE_INFO* info_alloc(uint32_t collect)
{
QC_SQLITE_INFO* info = MXS_MALLOC(sizeof(*info));
MXS_ABORT_IF_NULL(info);
info_init(info);
info_init(info, collect);
return info;
}
@ -359,11 +361,13 @@ static void info_free(QC_SQLITE_INFO* info)
}
}
static QC_SQLITE_INFO* info_init(QC_SQLITE_INFO* info)
static QC_SQLITE_INFO* info_init(QC_SQLITE_INFO* info, uint32_t collect)
{
memset(info, 0, sizeof(*info));
info->status = QC_QUERY_INVALID;
info->collect = collect;
info->collected = 0;
info->type_mask = QUERY_TYPE_UNKNOWN;
info->operation = QUERY_OP_UNDEFINED;
@ -495,10 +499,10 @@ static void parse_query_string(const char* query, size_t len)
}
}
static bool parse_query(GWBUF* query)
static bool parse_query(GWBUF* query, uint32_t collect)
{
bool parsed = false;
ss_dassert(!query_is_parsed(query));
ss_dassert(!query_is_parsed(query, collect));
if (GWBUF_IS_CONTIGUOUS(query))
{
@ -511,7 +515,29 @@ static bool parse_query(GWBUF* query)
if ((command == MYSQL_COM_QUERY) || (command == MYSQL_COM_STMT_PREPARE))
{
QC_SQLITE_INFO* info = info_alloc();
QC_SQLITE_INFO* info =
(QC_SQLITE_INFO*) gwbuf_get_buffer_object_data(query, GWBUF_PARSING_INFO);
if (info)
{
ss_dassert((~info->collect & collect) != 0);
ss_dassert((~info->collected & collect) != 0);
// If we get here, then the statement has been parsed once, but
// not all needed was collected. Now we turn on all blinkelichts to
// ensure that a statement is parsed at most twice.
info->collect = QC_COLLECT_ALL;
}
else
{
info = info_alloc(collect);
if (info)
{
// TODO: Add return value to gwbuf_add_buffer_object.
gwbuf_add_buffer_object(query, GWBUF_PARSING_INFO, info, buffer_object_free);
}
}
if (info)
{
@ -532,10 +558,8 @@ static bool parse_query(GWBUF* query)
info->type_mask |= QUERY_TYPE_PREPARE_STMT;
}
// TODO: Add return value to gwbuf_add_buffer_object.
// Always added; also when it was not recognized. If it was not recognized now,
// it won't be if we try a second time.
gwbuf_add_buffer_object(query, GWBUF_PARSING_INFO, info, buffer_object_free);
info->collected = info->collect;
parsed = true;
this_thread.info = NULL;
@ -566,9 +590,24 @@ static bool parse_query(GWBUF* query)
return parsed;
}
static bool query_is_parsed(GWBUF* query)
static bool query_is_parsed(GWBUF* query, uint32_t collect)
{
return query && GWBUF_IS_PARSED(query);
bool rc = query && GWBUF_IS_PARSED(query);
if (rc)
{
QC_SQLITE_INFO* info = (QC_SQLITE_INFO*) gwbuf_get_buffer_object_data(query, GWBUF_PARSING_INFO);
ss_dassert(info);
if ((~info->collected & collect) != 0)
{
// The statement has been parsed once, but the needed information
// was not collected at that time.
rc = false;
}
}
return rc;
}
/**
@ -652,6 +691,13 @@ static void update_field_info(QC_SQLITE_INFO* info,
{
ss_dassert(column);
if (!(info->collect & QC_COLLECT_FIELDS) || (info->collected & QC_COLLECT_FIELDS))
{
// If field information should not be collected, or if field information
// has already been collected, we just return.
return;
}
QC_FIELD_INFO item = { (char*)database, (char*)table, (char*)column, usage };
int i;
@ -737,6 +783,13 @@ static void update_function_info(QC_SQLITE_INFO* info,
{
ss_dassert(name);
if (!(info->collect & QC_COLLECT_FUNCTIONS) || (info->collected & QC_COLLECT_FUNCTIONS))
{
// If function information should not be collected, or if function information
// has already been collected, we just return.
return;
}
QC_FUNCTION_INFO item = { (char*)name, usage };
int i;
@ -1220,37 +1273,46 @@ static void update_database_names(QC_SQLITE_INFO* info, const char* zDatabase)
static void update_names(QC_SQLITE_INFO* info, const char* zDatabase, const char* zTable)
{
char* zCopy = MXS_STRDUP(zTable);
MXS_ABORT_IF_NULL(zCopy);
// TODO: Is this call really needed. Check also sqlite3Dequote.
exposed_sqlite3Dequote(zCopy);
enlarge_string_array(1, info->table_names_len, &info->table_names, &info->table_names_capacity);
info->table_names[info->table_names_len++] = zCopy;
info->table_names[info->table_names_len] = NULL;
if (zDatabase)
if ((info->collect & QC_COLLECT_TABLES) && !(info->collected & QC_COLLECT_TABLES))
{
zCopy = MXS_MALLOC(strlen(zDatabase) + 1 + strlen(zTable) + 1);
char* zCopy = MXS_STRDUP(zTable);
MXS_ABORT_IF_NULL(zCopy);
strcpy(zCopy, zDatabase);
strcat(zCopy, ".");
strcat(zCopy, zTable);
// TODO: Is this call really needed. Check also sqlite3Dequote.
exposed_sqlite3Dequote(zCopy);
update_database_names(info, zDatabase);
}
else
{
zCopy = MXS_STRDUP(zCopy);
MXS_ABORT_IF_NULL(zCopy);
enlarge_string_array(1, info->table_names_len, &info->table_names, &info->table_names_capacity);
info->table_names[info->table_names_len++] = zCopy;
info->table_names[info->table_names_len] = NULL;
if (zDatabase)
{
zCopy = MXS_MALLOC(strlen(zDatabase) + 1 + strlen(zTable) + 1);
MXS_ABORT_IF_NULL(zCopy);
strcpy(zCopy, zDatabase);
strcat(zCopy, ".");
strcat(zCopy, zTable);
exposed_sqlite3Dequote(zCopy);
}
else
{
zCopy = MXS_STRDUP(zCopy);
MXS_ABORT_IF_NULL(zCopy);
}
enlarge_string_array(1, info->table_fullnames_len,
&info->table_fullnames, &info->table_fullnames_capacity);
info->table_fullnames[info->table_fullnames_len++] = zCopy;
info->table_fullnames[info->table_fullnames_len] = NULL;
}
enlarge_string_array(1, info->table_fullnames_len,
&info->table_fullnames, &info->table_fullnames_capacity);
info->table_fullnames[info->table_fullnames_len++] = zCopy;
info->table_fullnames[info->table_fullnames_len] = NULL;
if ((info->collect & QC_COLLECT_DATABASES) && !(info->collected & QC_COLLECT_DATABASES))
{
if (zDatabase)
{
update_database_names(info, zDatabase);
}
}
}
static void update_names_from_srclist(QC_SQLITE_INFO* info, const SrcList* pSrc)
@ -1736,8 +1798,21 @@ void mxs_sqlite3StartTable(Parse *pParse, /* Parser context */
update_names(info, NULL, name);
}
info->created_table_name = MXS_STRDUP(info->table_names[0]);
MXS_ABORT_IF_NULL(info->created_table_name);
if (info->collect & QC_COLLECT_TABLES)
{
// If information is collected in several passes, then we may
// this information already.
if (!info->created_table_name)
{
info->created_table_name = MXS_STRDUP(info->table_names[0]);
MXS_ABORT_IF_NULL(info->created_table_name);
}
else
{
ss_dassert(info->collect != info->collected);
ss_dassert(strcmp(info->created_table_name, info->table_names[0]) == 0);
}
}
}
else
{
@ -1899,11 +1974,21 @@ void maxscaleDeallocate(Parse* pParse, Token* pName)
info->status = QC_QUERY_PARSED;
info->type_mask = QUERY_TYPE_WRITE;
info->prepare_name = MXS_MALLOC(pName->n + 1);
if (info->prepare_name)
// If information is collected in several passes, then we may
// this information already.
if (!info->prepare_name)
{
memcpy(info->prepare_name, pName->z, pName->n);
info->prepare_name[pName->n] = 0;
info->prepare_name = MXS_MALLOC(pName->n + 1);
if (info->prepare_name)
{
memcpy(info->prepare_name, pName->z, pName->n);
info->prepare_name[pName->n] = 0;
}
}
else
{
ss_dassert(info->collect != info->collected);
ss_dassert(strncmp(info->prepare_name, pName->z, pName->n) == 0);
}
}
@ -1942,11 +2027,21 @@ void maxscaleExecute(Parse* pParse, Token* pName)
info->status = QC_QUERY_PARSED;
info->type_mask = QUERY_TYPE_WRITE;
info->prepare_name = MXS_MALLOC(pName->n + 1);
if (info->prepare_name)
// If information is collected in several passes, then we may
// this information already.
if (!info->prepare_name)
{
memcpy(info->prepare_name, pName->z, pName->n);
info->prepare_name[pName->n] = 0;
info->prepare_name = MXS_MALLOC(pName->n + 1);
if (info->prepare_name)
{
memcpy(info->prepare_name, pName->z, pName->n);
info->prepare_name[pName->n] = 0;
}
}
else
{
ss_dassert(info->collect != info->collected);
ss_dassert(strncmp(info->prepare_name, pName->z, pName->n) == 0);
}
}
@ -2312,32 +2407,42 @@ void maxscalePrepare(Parse* pParse, Token* pName, Token* pStmt)
info->status = QC_QUERY_PARSED;
info->type_mask = QUERY_TYPE_PREPARE_NAMED_STMT;
info->prepare_name = MXS_MALLOC(pName->n + 1);
if (info->prepare_name)
// If information is collected in several passes, then we may
// this information already.
if (!info->prepare_name)
{
memcpy(info->prepare_name, pName->z, pName->n);
info->prepare_name[pName->n] = 0;
info->prepare_name = MXS_MALLOC(pName->n + 1);
if (info->prepare_name)
{
memcpy(info->prepare_name, pName->z, pName->n);
info->prepare_name[pName->n] = 0;
}
size_t preparable_stmt_len = pStmt->n - 2;
size_t payload_len = 1 + preparable_stmt_len;
size_t packet_len = MYSQL_HEADER_LEN + payload_len;
info->preparable_stmt = gwbuf_alloc(packet_len);
if (info->preparable_stmt)
{
uint8_t* ptr = GWBUF_DATA(info->preparable_stmt);
// Payload length
*ptr++ = payload_len;
*ptr++ = (payload_len >> 8);
*ptr++ = (payload_len >> 16);
// Sequence id
*ptr++ = 0x00;
// Command
*ptr++ = MYSQL_COM_QUERY;
memcpy(ptr, pStmt->z + 1, pStmt->n - 2);
}
}
size_t preparable_stmt_len = pStmt->n - 2;
size_t payload_len = 1 + preparable_stmt_len;
size_t packet_len = MYSQL_HEADER_LEN + payload_len;
info->preparable_stmt = gwbuf_alloc(packet_len);
if (info->preparable_stmt)
else
{
uint8_t* ptr = GWBUF_DATA(info->preparable_stmt);
// Payload length
*ptr++ = payload_len;
*ptr++ = (payload_len >> 8);
*ptr++ = (payload_len >> 16);
// Sequence id
*ptr++ = 0x00;
// Command
*ptr++ = MYSQL_COM_QUERY;
memcpy(ptr, pStmt->z + 1, pStmt->n - 2);
ss_dassert(info->collect != info->collected);
ss_dassert(strncmp(info->prepare_name, pName->z, pName->n) == 0);
}
}
@ -2952,7 +3057,7 @@ static int32_t qc_sqlite_thread_init(void)
MXS_INFO("In-memory sqlite database successfully opened for thread %lu.",
(unsigned long) pthread_self());
QC_SQLITE_INFO* info = info_alloc();
QC_SQLITE_INFO* info = info_alloc(QC_COLLECT_ALL);
if (info)
{
@ -3016,7 +3121,7 @@ static int32_t qc_sqlite_parse(GWBUF* query, uint32_t collect, int32_t* result)
ss_dassert(this_unit.initialized);
ss_dassert(this_thread.initialized);
QC_SQLITE_INFO* info = get_query_info(query);
QC_SQLITE_INFO* info = get_query_info(query, collect);
if (info)
{
@ -3038,7 +3143,7 @@ static int32_t qc_sqlite_get_type_mask(GWBUF* query, uint32_t* type_mask)
ss_dassert(this_thread.initialized);
*type_mask = QUERY_TYPE_UNKNOWN;
QC_SQLITE_INFO* info = get_query_info(query);
QC_SQLITE_INFO* info = get_query_info(query, QC_COLLECT_ESSENTIALS);
if (info)
{
@ -3068,7 +3173,7 @@ static int32_t qc_sqlite_get_operation(GWBUF* query, int32_t* op)
ss_dassert(this_thread.initialized);
*op = QUERY_OP_UNDEFINED;
QC_SQLITE_INFO* info = get_query_info(query);
QC_SQLITE_INFO* info = get_query_info(query, QC_COLLECT_ESSENTIALS);
if (info)
{
@ -3098,7 +3203,7 @@ static int32_t qc_sqlite_get_created_table_name(GWBUF* query, char** created_tab
ss_dassert(this_thread.initialized);
*created_table_name = NULL;
QC_SQLITE_INFO* info = get_query_info(query);
QC_SQLITE_INFO* info = get_query_info(query, QC_COLLECT_TABLES);
if (info)
{
@ -3132,7 +3237,7 @@ static int32_t qc_sqlite_is_drop_table_query(GWBUF* query, int32_t* is_drop_tabl
ss_dassert(this_thread.initialized);
*is_drop_table = 0;
QC_SQLITE_INFO* info = get_query_info(query);
QC_SQLITE_INFO* info = get_query_info(query, QC_COLLECT_ESSENTIALS);
if (info)
{
@ -3166,7 +3271,7 @@ static int32_t qc_sqlite_get_table_names(GWBUF* query,
*table_names = NULL;
*tblsize = 0;
QC_SQLITE_INFO* info = get_query_info(query);
QC_SQLITE_INFO* info = get_query_info(query, QC_COLLECT_TABLES);
if (info)
{
@ -3227,7 +3332,7 @@ static int32_t qc_sqlite_query_has_clause(GWBUF* query, int32_t* has_clause)
ss_dassert(this_thread.initialized);
*has_clause = false;
QC_SQLITE_INFO* info = get_query_info(query);
QC_SQLITE_INFO* info = get_query_info(query, QC_COLLECT_ESSENTIALS);
if (info)
{
@ -3258,7 +3363,7 @@ static int32_t qc_sqlite_get_database_names(GWBUF* query, char*** database_names
*database_names = NULL;
*sizep = 0;
QC_SQLITE_INFO* info = get_query_info(query);
QC_SQLITE_INFO* info = get_query_info(query, QC_COLLECT_DATABASES);
if (info)
{
@ -3292,7 +3397,7 @@ static int32_t qc_sqlite_get_prepare_name(GWBUF* query, char** prepare_name)
ss_dassert(this_thread.initialized);
*prepare_name = NULL;
QC_SQLITE_INFO* info = get_query_info(query);
QC_SQLITE_INFO* info = get_query_info(query, QC_COLLECT_ESSENTIALS);
if (info)
{
@ -3328,7 +3433,7 @@ int32_t qc_sqlite_get_field_info(GWBUF* query, const QC_FIELD_INFO** infos, uint
*infos = NULL;
*n_infos = 0;
QC_SQLITE_INFO* info = get_query_info(query);
QC_SQLITE_INFO* info = get_query_info(query, QC_COLLECT_FIELDS);
if (info)
{
@ -3362,7 +3467,7 @@ int32_t qc_sqlite_get_function_info(GWBUF* query, const QC_FUNCTION_INFO** infos
*infos = NULL;
*n_infos = 0;
QC_SQLITE_INFO* info = get_query_info(query);
QC_SQLITE_INFO* info = get_query_info(query, QC_COLLECT_FUNCTIONS);
if (info)
{
@ -3395,7 +3500,7 @@ int32_t qc_sqlite_get_preparable_stmt(GWBUF* stmt, GWBUF** preparable_stmt)
*preparable_stmt = NULL;
QC_SQLITE_INFO* info = get_query_info(stmt);
QC_SQLITE_INFO* info = get_query_info(stmt, QC_COLLECT_ESSENTIALS);
if (info)
{

View File

@ -312,13 +312,13 @@ bool compare_parse(QUERY_CLASSIFIER* pClassifier1, GWBUF* pCopy1,
clock_gettime(CLOCK_MONOTONIC_RAW, &start);
int32_t rv1;
pClassifier1->qc_parse(pCopy1, QC_COLLECT_ALL, &rv1);
pClassifier1->qc_parse(pCopy1, QC_COLLECT_ESSENTIALS, &rv1);
clock_gettime(CLOCK_MONOTONIC_RAW, &finish);
update_time(&global.time1, start, finish);
clock_gettime(CLOCK_MONOTONIC_RAW, &start);
int32_t rv2;
pClassifier2->qc_parse(pCopy2, QC_COLLECT_ALL, &rv2);
pClassifier2->qc_parse(pCopy2, QC_COLLECT_ESSENTIALS, &rv2);
clock_gettime(CLOCK_MONOTONIC_RAW, &finish);
update_time(&global.time2, start, finish);