From 5e8ba4b519d731de64515a1dcecc224b2b2ba6e7 Mon Sep 17 00:00:00 2001 From: Johan Wikman Date: Tue, 26 Jan 2016 14:00:32 +0200 Subject: [PATCH] Introduce MySQL-embedded based query classifier plugin. Currently not used, but the existing query_classifier will be changed into being just a wrapper around a dynamically loaded query classifier plugin, i.e. qc_mysqlembedded. --- query_classifier/CMakeLists.txt | 3 + .../qc_mysqlembedded/CMakeLists.txt | 7 + .../qc_mysqlembedded/qc_mysqlembedded.cc | 1865 +++++++++++++++++ .../qc_mysqlembedded/qc_mysqlembedded.map | 9 + 4 files changed, 1884 insertions(+) create mode 100644 query_classifier/qc_mysqlembedded/CMakeLists.txt create mode 100644 query_classifier/qc_mysqlembedded/qc_mysqlembedded.cc create mode 100644 query_classifier/qc_mysqlembedded/qc_mysqlembedded.map diff --git a/query_classifier/CMakeLists.txt b/query_classifier/CMakeLists.txt index d9c6f6b13..4c411eb1e 100644 --- a/query_classifier/CMakeLists.txt +++ b/query_classifier/CMakeLists.txt @@ -5,6 +5,9 @@ set_target_properties(query_classifier PROPERTIES VERSION "1.0.0") set_target_properties(query_classifier PROPERTIES LINK_FLAGS -Wl,--version-script=${CMAKE_CURRENT_SOURCE_DIR}/query_classifier.map) #set_target_properties(query_classifier PROPERTIES LINK_FLAGS -Wl,-z,defs) install(TARGETS query_classifier COMPONENT lib DESTINATION ${MAXSCALE_LIBDIR}) + +add_subdirectory(qc_mysqlembedded) + if(BUILD_TESTS) add_subdirectory(test) endif() diff --git a/query_classifier/qc_mysqlembedded/CMakeLists.txt b/query_classifier/qc_mysqlembedded/CMakeLists.txt new file mode 100644 index 000000000..73bf5f2cf --- /dev/null +++ b/query_classifier/qc_mysqlembedded/CMakeLists.txt @@ -0,0 +1,7 @@ +add_library(qc_mysqlembedded SHARED qc_mysqlembedded.cc) + +target_link_libraries(qc_mysqlembedded ${EMBEDDED_LIB} aio crypt crypto dl m ${PCRE_LINK_FLAGS} ssl stdc++ z) +set_target_properties(qc_mysqlembedded PROPERTIES VERSION "1.0.0") +set_target_properties(qc_mysqlembedded 
PROPERTIES LINK_FLAGS -Wl,--version-script=${CMAKE_CURRENT_SOURCE_DIR}/qc_mysqlembedded.map) +#set_target_properties(qc_mysqlembedded PROPERTIES LINK_FLAGS -Wl,-z,defs) +install(TARGETS qc_mysqlembedded COMPONENT lib DESTINATION ${MAXSCALE_LIBDIR}) diff --git a/query_classifier/qc_mysqlembedded/qc_mysqlembedded.cc b/query_classifier/qc_mysqlembedded/qc_mysqlembedded.cc new file mode 100644 index 000000000..8c312497b --- /dev/null +++ b/query_classifier/qc_mysqlembedded/qc_mysqlembedded.cc @@ -0,0 +1,1865 @@ +/** + * @section LICENCE + * + * This file is distributed as part of the MariaDB Corporation MaxScale. It is + * free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the + * Free Software Foundation, version 2. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301 USA. + * + * Copyright MariaDB Corporation Ab + * + * @file + * + */ + +#define EMBEDDED_LIBRARY +#define MYSQL_YACC +#define MYSQL_LEX012 +#define MYSQL_SERVER +#if defined(MYSQL_CLIENT) +#undef MYSQL_CLIENT +#endif + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +// In client_settings.h mysql_server_init and mysql_server_end are defined to +// mysql_client_plugin_init and mysql_client_plugin_deinit respectively. +// Those must be undefined, so that we here really call mysql_server_[init|end]. 
+#undef mysql_server_init +#undef mysql_server_end +#include +#include +#include + +#include "../utils/skygw_types.h" +#include "../utils/skygw_debug.h" +#include +#include +#include + +#include +#include +#include +#include + +#define MYSQL_COM_QUERY_HEADER_SIZE 5 /*< 3 bytes size, 1 sequence, 1 command */ +#define MAX_QUERYBUF_SIZE 2048 +typedef struct parsing_info_st +{ +#if defined(SS_DEBUG) + skygw_chk_t pi_chk_top; +#endif + void* pi_handle; /*< parsing info object pointer */ + char* pi_query_plain_str; /*< query as plain string */ + void (*pi_done_fp)(void *); /*< clean-up function for parsing info */ +#if defined(SS_DEBUG) + skygw_chk_t pi_chk_tail; +#endif +} parsing_info_t; + +#define QTYPE_LESS_RESTRICTIVE_THAN_WRITE(t) (tpi_handle; + + /** Find out the query type */ + if (mysql != NULL) + { + qtype = resolve_query_type((THD *) mysql->thd); + } + } + } + +retblock: + return qtype; +} + +/** + * Create parsing info and try to parse the query included in the query buffer. + * Store pointer to created parse_tree_t object to buffer. 
+ *
+ * The query text is copied out of the COM_QUERY packet into a newly
+ * allocated, NUL-terminated string. That string is handed to the parsing
+ * info object and is released together with it by parsing_info_done().
+ *
+ * @param querybuf buffer including the query and possibly the parsing information
+ *
+ * @return true if succeed, false otherwise (NULL or already parsed buffer,
+ *         packet shorter than its header claims, allocation failure or
+ *         failure to set up the embedded thread context)
+ */
+static bool parse_query(GWBUF* querybuf)
+{
+    THD* thd;
+    uint8_t* data;
+    size_t buflen;
+    size_t len;
+    char* query_str;
+    parsing_info_t* pi;
+
+    CHK_GWBUF(querybuf);
+    /** Do not parse without releasing previous parse info first */
+    ss_dassert(!query_is_parsed(querybuf));
+
+    if (querybuf == NULL || query_is_parsed(querybuf))
+    {
+        return false;
+    }
+
+    /**
+     * The packet header has to be present before the length can be read
+     * from it, and a buffer holding nothing but the header has no query.
+     */
+    buflen = (size_t) GWBUF_LENGTH(querybuf);
+
+    if (buflen <= MYSQL_COM_QUERY_HEADER_SIZE)
+    {
+        return false;
+    }
+
+    /** Extract query and copy it to different buffer */
+    data = (uint8_t*) GWBUF_DATA(querybuf);
+    len = MYSQL_GET_PACKET_LEN(data) - 1; /*< subtract 1 for packet type byte */
+
+    /**
+     * The length comes from the packet itself, so it is compared with the
+     * number of bytes the buffer really holds before anything is copied.
+     * The upper bound also rejects a length that wrapped around in the
+     * subtraction above.
+     */
+    if (len < 1 || len > buflen - MYSQL_COM_QUERY_HEADER_SIZE)
+    {
+        return false;
+    }
+
+    /** Create parsing info */
+    pi = parsing_info_init(parsing_info_done);
+
+    if (pi == NULL)
+    {
+        return false;
+    }
+
+    query_str = (char *) malloc(len + 1);
+
+    if (query_str == NULL)
+    {
+        /** Free parsing info data */
+        parsing_info_done(pi);
+        return false;
+    }
+
+    memcpy(query_str, &data[MYSQL_COM_QUERY_HEADER_SIZE], len);
+    query_str[len] = '\0';
+    /** From here on parsing_info_done(pi) also frees query_str */
+    parsing_info_set_plain_str(pi, query_str);
+
+    /** Get one or create new THD object to be use in parsing */
+    thd = get_or_create_thd_for_parsing((MYSQL *) pi->pi_handle, query_str);
+
+    if (thd == NULL)
+    {
+        /** Free parsing info data */
+        parsing_info_done(pi);
+        return false;
+    }
+
+    /**
+     * Create parse_tree inside thd.
+     * thd and lex are readable even if creating parse tree fails, which is
+     * why the result of create_parse_tree is deliberately not checked.
+     */
+    create_parse_tree(thd);
+    /** Add complete parsing info struct to the query buffer */
+    gwbuf_add_buffer_object(querybuf,
+                            GWBUF_PARSING_INFO,
+                            (void *) pi,
+                            parsing_info_done);
+
+    return true;
+}
+
+/**
+ * If buffer has non-NULL gwbuf_parsing_info it is parsed and it has parsing
+ * information included.
+ *
+ * @param buf buffer being examined
+ *
+ * @return true if buf is non-NULL and GWBUF_IS_PARSED(buf) holds, false
+ *         otherwise
+ */
+static bool query_is_parsed(GWBUF* buf)
+{
+    CHK_GWBUF(buf);
+    return (buf != NULL && GWBUF_IS_PARSED(buf));
+}
+
+/**
+ * Create a thread context, thd, init embedded server, connect to it, and allocate
+ * query to thd.
+ *
+ * On success the new thd is also stored in mysql->thd. On failure after the
+ * thd was created, the thd is released through the handle's
+ * free_embedded_thd method and mysql->thd is reset.
+ *
+ * @param mysql     Database handle
+ *
+ * @param query_str Query in plain text string; the pointer itself (not a
+ *                  copy) is stored in thd->extra_data
+ *
+ * @return Thread context pointer, or NULL if the context could not be
+ *         created, the embedded connection check failed or the handle was
+ *         not in MYSQL_STATUS_READY state
+ *
+ */
+static THD* get_or_create_thd_for_parsing(MYSQL* mysql, char* query_str)
+{
+    THD* thd = NULL;
+    unsigned long client_flags;
+    /* NOTE(review): mysql is dereferenced here, before the debug assertion
+     * below has had a chance to check it for NULL. */
+    char* db = mysql->options.db;
+    bool failp = FALSE;
+    size_t query_len;
+
+    ss_info_dassert(mysql != NULL, ("mysql is NULL"));
+    ss_info_dassert(query_str != NULL, ("query_str is NULL"));
+
+    query_len = strlen(query_str);
+    client_flags = set_client_flags(mysql);
+
+    /** Get THD.
+     * NOTE: Instead of creating new every time, THD instance could
+     * be get from a pool of them.
+     */
+    thd = (THD *) create_embedded_thd(client_flags);
+
+    if (thd == NULL)
+    {
+        MXS_ERROR("Failed to create thread context for parsing.");
+        goto return_thd;
+    }
+
+    mysql->thd = thd;
+    init_embedded_mysql(mysql, client_flags);
+    /** A non-zero (true) result is treated as failure */
+    failp = check_embedded_connection(mysql, db);
+
+    if (failp)
+    {
+        MXS_ERROR("Call to check_embedded_connection failed.");
+        goto return_err_with_thd;
+    }
+
+    thd->clear_data_list();
+
+    /** Check that we are calling the client functions in right order */
+    if (mysql->status != MYSQL_STATUS_READY)
+    {
+        set_mysql_error(mysql, CR_COMMANDS_OUT_OF_SYNC, unknown_sqlstate);
+        MXS_ERROR("Invalid status %d in embedded server.",
+                  mysql->status);
+        goto return_err_with_thd;
+    }
+
+    /** Clear result variables */
+    thd->current_stmt = NULL;
+    thd->store_globals();
+    /**
+     * We have to call free_old_query before we start to fill mysql->fields
+     * for new query. In the case of embedded server we collect field data
+     * during query execution (not during data retrieval as it is in remote
+     * client). So we have to call free_old_query here
+     */
+    free_old_query(mysql);
+    thd->extra_length = query_len;
+    thd->extra_data = query_str;
+    alloc_query(thd, query_str, query_len);
+    goto return_thd;
+
+    /** Error exit: give the thd back and make sure nobody can reach it */
+return_err_with_thd:
+    (*mysql->methods->free_embedded_thd)(mysql);
+    thd = 0;
+    mysql->thd = 0;
+return_thd:
+    return thd;
+}
+
+/**
+ * Build the client capability flags for the embedded connection. This is
+ * copied from libmysqld.c:mysql_real_connect
+ *
+ * The result starts from mysql->options.client_flag and CLIENT_CAPABILITIES,
+ * adds CLIENT_MULTI_RESULTS when CLIENT_MULTI_STATEMENTS is set, removes
+ * CLIENT_COMPRESS and CLIENT_PLUGIN_AUTH, and adds CLIENT_CONNECT_WITH_DB
+ * when a default database is configured in mysql->options.db.
+ *
+ * @param mysql Database handle whose options are read; it is not modified
+ *
+ * @return The combined flag set
+ *
+ */
+static unsigned long set_client_flags(MYSQL* mysql)
+{
+    unsigned long f = 0;
+
+    f |= mysql->options.client_flag;
+
+    /* Send client information for access check */
+    f |= CLIENT_CAPABILITIES;
+
+    if (f & CLIENT_MULTI_STATEMENTS)
+    {
+        f |= CLIENT_MULTI_RESULTS;
+    }
+
+    /**
+     * No compression in embedded as we don't send any data,
+     * and no pluggable auth, as we cannot do a client-server dialog
+     */
+    f &= ~(CLIENT_COMPRESS | CLIENT_PLUGIN_AUTH);
+
+    if (mysql->options.db != NULL)
+    {
+        f |= CLIENT_CONNECT_WITH_DB;
+    }
+
+    return f;
+}
+
+/**
+ * Parse the query that was previously attached to the thread context.
+ *
+ * @param thd Thread context holding the query text (thd->query())
+ *
+ * @return TRUE if the parser state could not be initialised or parse_sql
+ *         reported failure, FALSE otherwise
+ */
+static bool create_parse_tree(THD* thd)
+{
+    Parser_state parser_state;
+    bool failp = FALSE;
+    const char* virtual_db = "skygw_virtual";
+
+    if (parser_state.init(thd, thd->query(), thd->query_length()))
+    {
+        failp = TRUE;
+        goto return_here;
+    }
+
+    thd->reset_for_next_command();
+
+    /**
+     * Set some database to thd so that parsing won't fail because of
+     * missing database. Then parse.
+     */
+    failp = thd->set_db(virtual_db, strlen(virtual_db));
+
+    if (failp)
+    {
+        MXS_ERROR("Failed to set database in thread context.");
+    }
+
+    /**
+     * A set_db failure is only logged: parsing is attempted regardless and
+     * the value returned to the caller is the outcome of parse_sql alone.
+     */
+    failp = parse_sql(thd, &parser_state, NULL);
+
+    if (failp)
+    {
+        MXS_DEBUG("%lu [readwritesplit:create_parse_tree] failed to "
+                  "create parse tree.",
+                  pthread_self());
+    }
+
+return_here:
+    return failp;
+}
+
+/**
+ * Detect query type by examining parsed representation of it.
+ *
+ * @param thd MariaDB thread context.
+ *
+ * @return Copy of query type value.
+ *
+ *
+ * @details Query type is deduced by checking for certain properties
+ * of them. The order is essential. Some SQL commands have multiple
+ * flags set and changing the order in which flags are tested,
+ * the resulting type may be different.
+ *
+ * The type bits are accumulated in the local bit mask `type` and converted
+ * to qc_query_type_t only at the single exit label, return_qtype.
+ *
+ */
+static qc_query_type_t resolve_query_type(THD* thd)
+{
+    qc_query_type_t qtype = QUERY_TYPE_UNKNOWN;
+    u_int32_t type = QUERY_TYPE_UNKNOWN;
+    int set_autocommit_stmt = -1; /*< -1 no, 0 disable, 1 enable */
+    LEX* lex;
+    Item* item;
+    /**
+     * By default, if sql_log_bin, that is, recording data modifications
+     * to binary log, is disabled, gateway treats operations normally.
+     * Effectively nothing is replicated.
+     * When force_data_modify_op_replication is TRUE, gateway distributes
+     * all write operations to all nodes.
+     */
+#if defined(NOT_IN_USE)
+    bool force_data_modify_op_replication;
+    force_data_modify_op_replication = FALSE;
+#endif /* NOT_IN_USE */
+    ss_info_dassert(thd != NULL, ("thd is NULL\n"));
+
+    lex = thd->lex;
+
+    /** SELECT ..INTO variable|OUTFILE|DUMPFILE */
+    if (lex->result != NULL)
+    {
+        type = QUERY_TYPE_GSYSVAR_WRITE;
+        goto return_qtype;
+    }
+
+    /**
+     * set_autocommit_stmt is only written by the callee for some statements;
+     * otherwise it keeps the initial -1.
+     */
+    if (skygw_stmt_causes_implicit_commit(lex, &set_autocommit_stmt))
+    {
+        if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO))
+        {
+            if (sql_command_flags[lex->sql_command] &
+                CF_IMPLICT_COMMIT_BEGIN)
+            {
+                MXS_INFO("Implicit COMMIT before executing the next command.");
+            }
+            else if (sql_command_flags[lex->sql_command] &
+                     CF_IMPLICIT_COMMIT_END)
+            {
+                MXS_INFO("Implicit COMMIT after executing the next command.");
+            }
+        }
+
+        if (set_autocommit_stmt == 1)
+        {
+            type |= QUERY_TYPE_ENABLE_AUTOCOMMIT;
+        }
+
+        type |= QUERY_TYPE_COMMIT;
+    }
+
+    if (set_autocommit_stmt == 0)
+    {
+        if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO))
+        {
+            MXS_INFO("Disable autocommit : implicit START TRANSACTION"
+                     " before executing the next command.");
+        }
+
+        type |= QUERY_TYPE_DISABLE_AUTOCOMMIT;
+        type |= QUERY_TYPE_BEGIN_TRX;
+    }
+
+    if (lex->option_type == OPT_GLOBAL)
+    {
+        /**
+         * SHOW syntax http://dev.mysql.com/doc/refman/5.6/en/show.html
+         */
+        if (lex->sql_command == SQLCOM_SHOW_VARIABLES)
+        {
+            type |= QUERY_TYPE_GSYSVAR_READ;
+        }
+        /**
+         * SET syntax http://dev.mysql.com/doc/refman/5.6/en/set-statement.html
+         */
+        else if (lex->sql_command == SQLCOM_SET_OPTION)
+        {
+            type |= QUERY_TYPE_GSYSVAR_WRITE;
+        }
+
+        /*
+         * SHOW GLOBAL STATUS - Route to master
+         *
+         * Note that this is a plain assignment: any bits accumulated in
+         * `type` above are discarded for this statement.
+         */
+        else if (lex->sql_command == SQLCOM_SHOW_STATUS)
+        {
+            type = QUERY_TYPE_WRITE;
+        }
+        /**
+         * REVOKE ALL, ASSIGN_TO_KEYCACHE,
+         * PRELOAD_KEYS, FLUSH, RESET, CREATE|ALTER|DROP SERVER
+         */
+
+        else
+        {
+            type |= QUERY_TYPE_GSYSVAR_WRITE;
+        }
+
+        goto return_qtype;
+    }
+    else if (lex->option_type == OPT_SESSION)
+    {
+        /**
+         * SHOW syntax http://dev.mysql.com/doc/refman/5.6/en/show.html
+         */
+        if (lex->sql_command == SQLCOM_SHOW_VARIABLES)
+        {
+            type |= QUERY_TYPE_SYSVAR_READ;
+        }
+        /**
+         * SET syntax http://dev.mysql.com/doc/refman/5.6/en/set-statement.html
+         */
+        else if (lex->sql_command == SQLCOM_SET_OPTION)
+        {
+            /** Either user- or system variable write */
+            type |= QUERY_TYPE_GSYSVAR_WRITE;
+        }
+
+        goto return_qtype;
+    }
+
+    /**
+     * 1:ALTER TABLE, TRUNCATE, REPAIR, OPTIMIZE, ANALYZE, CHECK.
+     * 2:CREATE|ALTER|DROP|TRUNCATE|RENAME TABLE, LOAD, CREATE|DROP|ALTER DB,
+     *   CREATE|DROP INDEX, CREATE|DROP VIEW, CREATE|DROP TRIGGER,
+     *   CREATE|ALTER|DROP EVENT, UPDATE, INSERT, INSERT(SELECT),
+     *   DELETE, REPLACE, REPLACE(SELECT), CREATE|RENAME|DROP USER,
+     *   GRANT, REVOKE, OPTIMIZE, CREATE|ALTER|DROP FUNCTION|PROCEDURE,
+     *   CREATE SPFUNCTION, INSTALL|UNINSTALL PLUGIN
+     */
+    if (is_log_table_write_query(lex->sql_command) ||
+        is_update_query(lex->sql_command))
+    {
+#if defined(NOT_IN_USE)
+
+        if (thd->variables.sql_log_bin == 0 &&
+            force_data_modify_op_replication)
+        {
+            /** Not replicated */
+            type |= QUERY_TYPE_SESSION_WRITE;
+        }
+        else
+#endif /* NOT_IN_USE */
+        {
+            /** Written to binlog, that is, replicated except tmp tables */
+            type |= QUERY_TYPE_WRITE; /*< to master */
+
+            if (lex->sql_command == SQLCOM_CREATE_TABLE &&
+                (lex->create_info.options & HA_LEX_CREATE_TMP_TABLE))
+            {
+                type |= QUERY_TYPE_CREATE_TMP_TABLE; /*< remember in router */
+            }
+        }
+
+        goto return_qtype;
+    }
+
+    /**
+     * Try to catch session modifications here.
+     *
+     * Cases ending in `break` continue to the function scan below; cases
+     * using `goto return_qtype` return their type as it is.
+     */
+    switch (lex->sql_command)
+    {
+        /** fallthrough */
+    case SQLCOM_CHANGE_DB:
+    case SQLCOM_DEALLOCATE_PREPARE:
+        type |= QUERY_TYPE_SESSION_WRITE;
+        break;
+
+    case SQLCOM_SELECT:
+    case SQLCOM_SHOW_SLAVE_STAT:
+        type |= QUERY_TYPE_READ;
+        break;
+
+    case SQLCOM_CALL:
+        type |= QUERY_TYPE_WRITE;
+        break;
+
+    case SQLCOM_BEGIN:
+        type |= QUERY_TYPE_BEGIN_TRX;
+        goto return_qtype;
+        break;
+
+    case SQLCOM_COMMIT:
+        type |= QUERY_TYPE_COMMIT;
+        goto return_qtype;
+        break;
+
+    case SQLCOM_ROLLBACK:
+        type |= QUERY_TYPE_ROLLBACK;
+        goto return_qtype;
+        break;
+
+    case SQLCOM_PREPARE:
+        type |= QUERY_TYPE_PREPARE_NAMED_STMT;
+        goto return_qtype;
+        break;
+
+    case SQLCOM_SHOW_DATABASES:
+        type |= QUERY_TYPE_SHOW_DATABASES;
+        goto return_qtype;
+        break;
+
+    case SQLCOM_SHOW_TABLES:
+        type |= QUERY_TYPE_SHOW_TABLES;
+        goto return_qtype;
+        break;
+
+    default:
+        break;
+    }
+
+#if defined(UPDATE_VAR_SUPPORT)
+
+    if (QTYPE_LESS_RESTRICTIVE_THAN_WRITE(type))
+#endif
+        /**
+         * NOTE(review): the tests below read `qtype`, which still holds its
+         * initial QUERY_TYPE_UNKNOWN at this point; the bits collected so
+         * far are in `type`. Confirm whether `type` was intended.
+         */
+        if (QUERY_IS_TYPE(qtype, QUERY_TYPE_UNKNOWN) ||
+            QUERY_IS_TYPE(qtype, QUERY_TYPE_LOCAL_READ) ||
+            QUERY_IS_TYPE(qtype, QUERY_TYPE_READ) ||
+            QUERY_IS_TYPE(qtype, QUERY_TYPE_USERVAR_READ) ||
+            QUERY_IS_TYPE(qtype, QUERY_TYPE_SYSVAR_READ) ||
+            QUERY_IS_TYPE(qtype, QUERY_TYPE_GSYSVAR_READ))
+        {
+            /**
+             * These values won't change qtype more restrictive than write.
+             * UDFs and procedures could possibly cause session-wide write,
+             * but unless their content is replicated this is a limitation
+             * of this implementation.
+             * In other words : UDFs and procedures are not allowed to
+             * perform writes which are not replicated but need to repeat
+             * in every node.
+             * It is not sure if such statements exist. vraa 25.10.13
+             */
+
+            /**
+             * Search for system functions, UDFs and stored procedures.
+             * The walk follows the thd's free_list chain of items.
+             */
+            for (item = thd->free_list; item != NULL; item = item->next)
+            {
+                Item::Type itype;
+
+                itype = item->type();
+                MXS_DEBUG("%lu [resolve_query_type] Item %s:%s",
+                          pthread_self(), item->name, STRITEMTYPE(itype));
+
+                if (itype == Item::SUBSELECT_ITEM)
+                {
+                    continue;
+                }
+                else if (itype == Item::FUNC_ITEM)
+                {
+                    int func_qtype = QUERY_TYPE_UNKNOWN;
+                    /**
+                     * Item types:
+                     * FIELD_ITEM = 0, FUNC_ITEM,
+                     * SUM_FUNC_ITEM, STRING_ITEM, INT_ITEM,
+                     * REAL_ITEM, NULL_ITEM, VARBIN_ITEM,
+                     * COPY_STR_ITEM, FIELD_AVG_ITEM,
+                     * DEFAULT_VALUE_ITEM, PROC_ITEM,
+                     * COND_ITEM, REF_ITEM, FIELD_STD_ITEM,
+                     * FIELD_VARIANCE_ITEM,
+                     * INSERT_VALUE_ITEM,
+                     * SUBSELECT_ITEM, ROW_ITEM, CACHE_ITEM,
+                     * TYPE_HOLDER, PARAM_ITEM,
+                     * TRIGGER_FIELD_ITEM, DECIMAL_ITEM,
+                     * XPATH_NODESET, XPATH_NODESET_CMP,
+                     * VIEW_FIXER_ITEM,
+                     * EXPR_CACHE_ITEM == 27
+                     **/
+
+                    Item_func::Functype ftype;
+                    ftype = ((Item_func*) item)->functype();
+
+                    /**
+                     * Item_func types:
+                     *
+                     * UNKNOWN_FUNC = 0,EQ_FUNC, EQUAL_FUNC,
+                     * NE_FUNC, LT_FUNC, LE_FUNC,
+                     * GE_FUNC, GT_FUNC, FT_FUNC,
+                     * LIKE_FUNC == 10, ISNULL_FUNC, ISNOTNULL_FUNC,
+                     * COND_AND_FUNC, COND_OR_FUNC, XOR_FUNC,
+                     * BETWEEN, IN_FUNC,
+                     * MULT_EQUAL_FUNC, INTERVAL_FUNC,
+                     * ISNOTNULLTEST_FUNC == 20,
+                     * SP_EQUALS_FUNC, SP_DISJOINT_FUNC,
+                     * SP_INTERSECTS_FUNC,
+                     * SP_TOUCHES_FUNC, SP_CROSSES_FUNC,
+                     * SP_WITHIN_FUNC, SP_CONTAINS_FUNC,
+                     * SP_OVERLAPS_FUNC,
+                     * SP_STARTPOINT, SP_ENDPOINT == 30,
+                     * SP_EXTERIORRING, SP_POINTN, SP_GEOMETRYN,
+                     * SP_INTERIORRINGN,NOT_FUNC, NOT_ALL_FUNC,
+                     * NOW_FUNC, TRIG_COND_FUNC,
+                     * SUSERVAR_FUNC, GUSERVAR_FUNC == 40,
+                     * COLLATE_FUNC, EXTRACT_FUNC,
+                     * CHAR_TYPECAST_FUNC,
+                     * FUNC_SP, UDF_FUNC, NEG_FUNC,
+                     * GSYSVAR_FUNC == 47
+                     **/
+                    switch (ftype)
+                    {
+                    case Item_func::FUNC_SP:
+                        /**
+                         * An unknown (for maxscale) function / sp
+                         * belongs to this category.
+                         */
+                        func_qtype |= QUERY_TYPE_WRITE;
+                        MXS_DEBUG("%lu [resolve_query_type] "
+                                  "functype FUNC_SP, stored proc "
+                                  "or unknown function.",
+                                  pthread_self());
+                        break;
+
+                    case Item_func::UDF_FUNC:
+                        func_qtype |= QUERY_TYPE_WRITE;
+                        MXS_DEBUG("%lu [resolve_query_type] "
+                                  "functype UDF_FUNC, user-defined "
+                                  "function.",
+                                  pthread_self());
+                        break;
+
+                    case Item_func::NOW_FUNC:
+                        func_qtype |= QUERY_TYPE_LOCAL_READ;
+                        MXS_DEBUG("%lu [resolve_query_type] "
+                                  "functype NOW_FUNC, could be "
+                                  "executed in MaxScale.",
+                                  pthread_self());
+                        break;
+
+                        /** System session variable */
+                    case Item_func::GSYSVAR_FUNC:
+                        func_qtype |= QUERY_TYPE_SYSVAR_READ;
+                        MXS_DEBUG("%lu [resolve_query_type] "
+                                  "functype GSYSVAR_FUNC, system "
+                                  "variable read.",
+                                  pthread_self());
+                        break;
+
+                        /** User-defined variable read */
+                    case Item_func::GUSERVAR_FUNC:
+                        func_qtype |= QUERY_TYPE_USERVAR_READ;
+                        MXS_DEBUG("%lu [resolve_query_type] "
+                                  "functype GUSERVAR_FUNC, user "
+                                  "variable read.",
+                                  pthread_self());
+                        break;
+
+                        /** User-defined variable modification */
+                    case Item_func::SUSERVAR_FUNC:
+                        /**
+                         * Really it is user variable but we
+                         * don't separate sql variables atm.
+                         * 15.9.14
+                         */
+                        func_qtype |= QUERY_TYPE_GSYSVAR_WRITE;
+                        MXS_DEBUG("%lu [resolve_query_type] "
+                                  "functype SUSERVAR_FUNC, user "
+                                  "variable write.",
+                                  pthread_self());
+                        break;
+
+                    case Item_func::UNKNOWN_FUNC:
+
+                        /**
+                         * last_insert_id() is singled out by name and
+                         * typed as a master read; every other function of
+                         * this kind is an ordinary read.
+                         */
+                        if (((Item_func*) item)->func_name() != NULL &&
+                            strcmp((char*) ((Item_func*) item)->func_name(), "last_insert_id") == 0)
+                        {
+                            func_qtype |= QUERY_TYPE_MASTER_READ;
+                        }
+                        else
+                        {
+                            func_qtype |= QUERY_TYPE_READ;
+                        }
+
+                        /**
+                         * Many built-in functions are of this
+                         * type, for example, rand(), soundex(),
+                         * repeat() .
+                         */
+                        MXS_DEBUG("%lu [resolve_query_type] "
+                                  "functype UNKNOWN_FUNC, "
+                                  "typically some system function.",
+                                  pthread_self());
+                        break;
+
+                    default:
+                        MXS_DEBUG("%lu [resolve_query_type] "
+                                  "Functype %d.",
+                                  pthread_self(),
+                                  ftype);
+                        break;
+                    } /**< switch */
+
+                    /**< Set new query type */
+                    type |= func_qtype;
+                }
+
+#if defined(UPDATE_VAR_SUPPORT)
+
+                /**
+                 * Write is as restrictive as it gets due functions,
+                 * so break.
+                 */
+                if ((type & QUERY_TYPE_WRITE) == QUERY_TYPE_WRITE)
+                {
+                    break;
+                }
+
+#endif
+            } /**< for */
+        } /**< if */
+
+    /** Single exit point: publish the accumulated bit mask */
+return_qtype:
+    qtype = (qc_query_type_t) type;
+    return qtype;
+}
+
+/**
+ * Checks if statement causes implicit COMMIT.
+ * autocommit_stmt gets values 1, 0 or -1 if stmt is enable, disable or
+ * something else than autocommit.
+ * + * @param lex Parse tree + * @param autocommit_stmt memory address for autocommit status + * + * @return true if statement causes implicit commit and false otherwise + */ +static bool skygw_stmt_causes_implicit_commit(LEX* lex, int* autocommit_stmt) +{ + bool succp; + + if (!(sql_command_flags[lex->sql_command] & CF_AUTO_COMMIT_TRANS)) + { + succp = false; + goto return_succp; + } + + switch (lex->sql_command) + { + case SQLCOM_DROP_TABLE: + succp = !(lex->create_info.options & HA_LEX_CREATE_TMP_TABLE); + break; + + case SQLCOM_ALTER_TABLE: + case SQLCOM_CREATE_TABLE: + /* If CREATE TABLE of non-temporary table, do implicit commit */ + succp = !(lex->create_info.options & HA_LEX_CREATE_TMP_TABLE); + break; + + case SQLCOM_SET_OPTION: + if ((*autocommit_stmt = is_autocommit_stmt(lex)) == 1) + { + succp = true; + } + else + { + succp = false; + } + + break; + + default: + succp = true; + break; + } + +return_succp: + return succp; +} + +/** + * Finds out if stmt is SET autocommit + * and if the new value matches with the enable_cmd argument. + * + * @param lex parse tree + * + * @return 1, 0, or -1 if command was: + * enable, disable, or not autocommit, respectively. 
+ */ +static int is_autocommit_stmt(LEX* lex) +{ + struct list_node* node; + set_var* setvar; + int rc = -1; + static char target[8]; /*< for converted string */ + Item* item = NULL; + + node = lex->var_list.first_node(); + setvar = (set_var*) node->info; + + if (setvar == NULL) + { + goto return_rc; + } + + do /*< Search for the last occurrence of 'autocommit' */ + { + if ((sys_var*) setvar->var == Sys_autocommit_ptr) + { + item = setvar->value; + } + + node = node->next; + } + while ((setvar = (set_var*) node->info) != NULL); + + if (item != NULL) /*< found autocommit command */ + { + if (item->type() == Item::INT_ITEM) /*< '0' or '1' */ + { + rc = item->val_int(); + + if (rc > 1 || rc < 0) + { + rc = -1; + } + } + else if (item->type() == Item::STRING_ITEM) /*< 'on' or 'off' */ + { + String str(target, sizeof (target), system_charset_info); + String* res = item->val_str(&str); + + if ((rc = find_type(&bool_typelib, res->ptr(), res->length(), false))) + { + ss_dassert(rc >= 0 && rc <= 2); + /** + * rc is the position of matchin string in + * typelib's value array. + * 1=OFF, 2=ON. + */ + rc -= 1; + } + } + } + +return_rc: + return rc; +} + +#if defined(NOT_USED) + +char* qc_get_stmtname(GWBUF* buf) +{ + MYSQL* mysql; + + if (buf == NULL || + buf->gwbuf_bufobj == NULL || + buf->gwbuf_bufobj->bo_data == NULL || + (mysql = (MYSQL *) ((parsing_info_t *) buf->gwbuf_bufobj->bo_data)->pi_handle) == NULL || + mysql->thd == NULL || + (THD *) (mysql->thd))->lex == NULL || + (THD *) (mysql->thd))->lex->prepared_stmt_name == NULL) + { + return NULL; + } + + return ((THD *) (mysql->thd))->lex->prepared_stmt_name.str; +} +#endif + +/** + * Get the parse tree from parsed querybuf. 
+ * @param querybuf The parsed GWBUF + * + * @return Pointer to the LEX struct or NULL if an error occurred or the query + * was not parsed + */ +LEX* get_lex(GWBUF* querybuf) +{ + + parsing_info_t* pi; + MYSQL* mysql; + THD* thd; + + if (querybuf == NULL || !GWBUF_IS_PARSED(querybuf)) + { + return NULL; + } + + pi = (parsing_info_t *) gwbuf_get_buffer_object_data(querybuf, GWBUF_PARSING_INFO); + + if (pi == NULL) + { + return NULL; + } + + if ((mysql = (MYSQL *) pi->pi_handle) == NULL || + (thd = (THD *) mysql->thd) == NULL) + { + ss_dassert(mysql != NULL && thd != NULL); + return NULL; + } + + return thd->lex; +} + +/** + * Finds the head of the list of tables affected by the current select statement. + * @param thd Pointer to a valid THD + * @return Pointer to the head of the TABLE_LIST chain or NULL in case of an error + */ +static void* skygw_get_affected_tables(void* lexptr) +{ + LEX* lex = (LEX*) lexptr; + + if (lex == NULL || lex->current_select == NULL) + { + ss_dassert(lex != NULL && lex->current_select != NULL); + return NULL; + } + + return (void*) lex->current_select->table_list.first; +} + +/** + * Reads the parsetree and lists all the affected tables and views in the query. + * In the case of an error, the size of the table is set to zero and no memory + * is allocated. The caller must free the allocated memory. 
+ * + * @param querybuf GWBUF where the table names are extracted from + * @param tblsize Pointer where the number of tables is written + * @return Array of null-terminated strings with the table names + */ +char** qc_get_table_names(GWBUF* querybuf, int* tblsize, bool fullnames) +{ + LEX* lex; + TABLE_LIST* tbl; + int i = 0, currtblsz = 0; + char **tables = NULL, **tmp = NULL; + + if (querybuf == NULL || tblsize == NULL) + { + goto retblock; + } + + if (!ensure_query_is_parsed(querybuf)) + { + goto retblock; + } + + if ((lex = get_lex(querybuf)) == NULL || lex->current_select == NULL) + { + goto retblock; + } + + lex->current_select = lex->all_selects_list; + + while (lex->current_select) + { + tbl = (TABLE_LIST*) skygw_get_affected_tables(lex); + + while (tbl) + { + if (i >= currtblsz) + { + tmp = (char**) malloc(sizeof (char*)*(currtblsz * 2 + 1)); + + if (tmp) + { + if (currtblsz > 0) + { + for (int x = 0; x < currtblsz; x++) + { + tmp[x] = tables[x]; + } + + free(tables); + } + + tables = tmp; + currtblsz = currtblsz * 2 + 1; + } + } + + if (tmp != NULL) + { + char *catnm = NULL; + + if (fullnames) + { + if (tbl->db && + strcmp(tbl->db, "skygw_virtual") != 0) + { + catnm = (char*) calloc(strlen(tbl->db) + + strlen(tbl->table_name) + + 2, + sizeof (char)); + strcpy(catnm, tbl->db); + strcat(catnm, "."); + strcat(catnm, tbl->table_name); + } + } + + if (catnm) + { + tables[i++] = catnm; + } + else + { + tables[i++] = strdup(tbl->table_name); + } + + tbl = tbl->next_local; + } + } /*< while (tbl) */ + + lex->current_select = lex->current_select->next_select_in_list(); + } /*< while(lex->current_select) */ + +retblock: + + if (tblsize) + { + *tblsize = i; + } + + return tables; +} + +/** + * Extract, allocate memory and copy the name of the created table. + * @param querybuf Buffer to use. 
+ * @return A pointer to the name if a table was created, otherwise NULL
+ */
+char* qc_get_created_table_name(GWBUF* querybuf)
+{
+    if (querybuf == NULL || !ensure_query_is_parsed(querybuf))
+    {
+        return NULL;
+    }
+
+    LEX* parse_tree = get_lex(querybuf);
+
+    if (parse_tree == NULL ||
+        parse_tree->create_last_non_select_table == NULL ||
+        parse_tree->create_last_non_select_table->table_name == NULL)
+    {
+        return NULL;
+    }
+
+    /** The copy belongs to the caller */
+    return strdup(parse_tree->create_last_non_select_table->table_name);
+}
+
+/**
+ * Tells whether the query is a "real" query, that is, one of SELECT, UPDATE,
+ * INSERT, DELETE or a variation of these. Statements that target specific
+ * row or variable data count as real queries; statements that affect the
+ * underlying database do not. A SELECT counts only if it reads from at
+ * least one table.
+ *
+ * @param querybuf GWBUF to analyze
+ *
+ * @return true if the query is a real query, otherwise false
+ */
+bool qc_is_real_query(GWBUF* querybuf)
+{
+    if (querybuf == NULL || !ensure_query_is_parsed(querybuf))
+    {
+        return false;
+    }
+
+    LEX* parse_tree = get_lex(querybuf);
+
+    if (parse_tree == NULL)
+    {
+        return false;
+    }
+
+    switch (parse_tree->sql_command)
+    {
+    case SQLCOM_SELECT:
+        return parse_tree->all_selects_list->table_list.elements > 0;
+
+    case SQLCOM_UPDATE:
+    case SQLCOM_INSERT:
+    case SQLCOM_INSERT_SELECT:
+    case SQLCOM_DELETE:
+    case SQLCOM_TRUNCATE:
+    case SQLCOM_REPLACE:
+    case SQLCOM_REPLACE_SELECT:
+    case SQLCOM_PREPARE:
+    case SQLCOM_EXECUTE:
+        return true;
+
+    default:
+        return false;
+    }
+}
+
+/**
+ * Checks whether the buffer contains a DROP TABLE... query.
+ * @param querybuf Buffer to inspect + * @return true if it contains the query otherwise false + */ +bool qc_is_drop_table_query(GWBUF* querybuf) +{ + bool answer = false; + + if (querybuf) + { + if (ensure_query_is_parsed(querybuf)) + { + LEX* lex = get_lex(querybuf); + + answer = lex && lex->sql_command == SQLCOM_DROP_TABLE; + } + } + + return answer; +} + +inline void add_str(char** buf, int* buflen, int* bufsize, char* str) +{ + int isize = strlen(str) + 1; + + if (*buf == NULL || isize + *buflen >= *bufsize) + { + *bufsize = (*bufsize) * 2 + isize; + char *tmp = (char*) realloc(*buf, (*bufsize) * sizeof (char)); + + if (tmp == NULL) + { + MXS_ERROR("Error: memory reallocation failed."); + free(*buf); + *buf = NULL; + *bufsize = 0; + } + + *buf = tmp; + } + + if (*buflen > 0) + { + if (*buf) + { + strcat(*buf, " "); + } + } + + if (*buf) + { + strcat(*buf, str); + } + + *buflen += isize; + +} + +/** + * Returns all the fields that the query affects. + * @param buf Buffer to parse + * @return Pointer to newly allocated string or NULL if nothing was found + */ +char* qc_get_affected_fields(GWBUF* buf) +{ + LEX* lex; + int buffsz = 0, bufflen = 0; + char* where = NULL; + Item* item; + Item::Type itype; + + if (!buf) + { + return NULL; + } + + if (!ensure_query_is_parsed(buf)) + { + return NULL; + } + + if ((lex = get_lex(buf)) == NULL) + { + return NULL; + } + + lex->current_select = lex->all_selects_list; + + if ((where = (char*) malloc(sizeof (char)*1)) == NULL) + { + MXS_ERROR("Memory allocation failed."); + return NULL; + } + + *where = '\0'; + + while (lex->current_select) + { + + List_iterator ilist(lex->current_select->item_list); + item = (Item*) ilist.next(); + + for (; item != NULL; item = (Item*) ilist.next()) + { + + itype = item->type(); + + if (item->name && itype == Item::FIELD_ITEM) + { + add_str(&where, &buffsz, &bufflen, item->name); + } + } + + + if (lex->current_select->where) + { + for (item = lex->current_select->where; item != NULL; item = 
item->next) + { + + itype = item->type(); + + if (item->name && itype == Item::FIELD_ITEM) + { + add_str(&where, &buffsz, &bufflen, item->name); + } + } + } + + if (lex->current_select->having) + { + for (item = lex->current_select->having; item != NULL; item = item->next) + { + + itype = item->type(); + + if (item->name && itype == Item::FIELD_ITEM) + { + add_str(&where, &buffsz, &bufflen, item->name); + } + } + } + + lex->current_select = lex->current_select->next_select_in_list(); + } + + return where; +} + +bool qc_query_has_clause(GWBUF* buf) +{ + bool clause = false; + + if (buf) + { + if (ensure_query_is_parsed(buf)) + { + LEX* lex = get_lex(buf); + + if (lex) + { + SELECT_LEX* current = lex->all_selects_list; + + while (current && !clause) + { + if (current->where || current->having) + { + clause = true; + } + + current = current->next_select_in_list(); + } + } + } + } + + return clause; +} + +/* + * Replace user-provided literals with question marks. + * + * @param querybuf GWBUF with a COM_QUERY statement + * @return A copy of the query in its canonical form or NULL if an error occurred. + */ +char* qc_get_canonical(GWBUF* querybuf) +{ + char *querystr = NULL; + if (GWBUF_LENGTH(querybuf) > MYSQL_COM_QUERY_HEADER_SIZE && GWBUF_IS_SQL(querybuf)) + { + size_t srcsize = GWBUF_LENGTH(querybuf) - MYSQL_COM_QUERY_HEADER_SIZE; + char *src = (char*) malloc(srcsize); + size_t destsize = 0; + char *dest = NULL; + if (src) + { + memcpy(src, (uint8_t*) GWBUF_DATA(querybuf) + MYSQL_COM_QUERY_HEADER_SIZE, + srcsize); + if (replace_quoted((const char**) &src, &srcsize, &dest, &destsize) && + remove_mysql_comments((const char**) &dest, &destsize, &src, &srcsize) && + replace_values((const char**) &src, &srcsize, &dest, &destsize)) + { + querystr = dest; + } + free(src); + } + } + return querystr; +} + +/** + * Create parsing information; initialize mysql handle, allocate parsing info + * struct and set handle and free function pointer to it. 
+ * + * @param donefun pointer to free function + * + * @return pointer to parsing information + */ +static parsing_info_t* parsing_info_init(void (*donefun)(void *)) +{ + parsing_info_t* pi = NULL; + MYSQL* mysql; + const char* user = "skygw"; + const char* db = "skygw"; + + ss_dassert(donefun != NULL); + + /** Get server handle */ + mysql = mysql_init(NULL); + + if (mysql == NULL) + { + MXS_ERROR("Call to mysql_real_connect failed due %d, %s.", + mysql_errno(mysql), + mysql_error(mysql)); + ss_dassert(mysql != NULL); + goto retblock; + } + + /** Set methods and authentication to mysql */ + mysql_options(mysql, MYSQL_READ_DEFAULT_GROUP, "libmysqld_skygw"); + mysql_options(mysql, MYSQL_OPT_USE_EMBEDDED_CONNECTION, NULL); + mysql->methods = &embedded_methods; + mysql->user = my_strdup(user, MYF(0)); + mysql->db = my_strdup(db, MYF(0)); + mysql->passwd = NULL; + + pi = (parsing_info_t*) calloc(1, sizeof (parsing_info_t)); + + if (pi == NULL) + { + mysql_close(mysql); + goto retblock; + } + +#if defined(SS_DEBUG) + pi->pi_chk_top = CHK_NUM_PINFO; + pi->pi_chk_tail = CHK_NUM_PINFO; +#endif + /** Set handle and free function to parsing info struct */ + pi->pi_handle = mysql; + pi->pi_done_fp = donefun; + +retblock: + return pi; +} + +/** + * Free function for parsing info. Called by gwbuf_free or in case initialization + * of parsing information fails. 
+ *
+ * @param ptr Pointer to parsing information, cast required
+ *
+ * @return void
+ *
+ */
+static void parsing_info_done(void* ptr)
+{
+    parsing_info_t* pi;
+    THD* thd;
+
+    if (ptr)
+    {
+        pi = (parsing_info_t *) ptr;
+
+        if (pi->pi_handle != NULL)
+        {
+            MYSQL* mysql = (MYSQL *) pi->pi_handle;
+
+            if (mysql->thd != NULL)
+            {
+                /** End the statement before the embedded THD is released. */
+                thd = (THD*) mysql->thd;
+                thd->end_statement();
+                (*mysql->methods->free_embedded_thd)(mysql);
+                mysql->thd = NULL;
+            }
+
+            mysql_close(mysql);
+        }
+
+        /** Free plain text query string; free(NULL) is a no-op */
+        free(pi->pi_query_plain_str);
+
+        free(pi);
+    }
+}
+
+/**
+ * Add plain text query string to parsing info.
+ *
+ * The pointer is stored as such; parsing_info_done later frees it together
+ * with the parsing info.
+ *
+ * @param ptr Pointer to parsing info struct, cast required
+ * @param str String to be added
+ *
+ * @return void
+ */
+static void parsing_info_set_plain_str(void* ptr, char* str)
+{
+    parsing_info_t* pi = (parsing_info_t *) ptr;
+    CHK_PARSING_INFO(pi);
+
+    pi->pi_query_plain_str = str;
+}
+
+/**
+ * Generate a string of query type value.
+ * Caller must free the memory of the resulting string.
+ *
+ * @param qtype Query type value, combination of values listed in
+ *              query_classifier.h
+ *
+ * @return string representing the query type value, the names of the set
+ *         bits separated by '|'; NULL if no bit is set or if memory
+ *         allocation failed
+ */
+char* qc_get_qtype_str(qc_query_type_t qtype)
+{
+    /** Unsigned, so that shifting up to the highest bit is well defined. */
+    unsigned int remaining = (unsigned int) qtype;
+    unsigned int bit = 1;
+    char* qtype_str = NULL;
+
+    /**
+     * Test values (bits) and clear matching bits from remaining one by one
+     * until remaining is completely cleared.
+     */
+    while (remaining != 0)
+    {
+        if (remaining & bit)
+        {
+            qc_query_type_t t = (qc_query_type_t) bit;
+            const char* name = STRQTYPE(t);
+
+            if (qtype_str == NULL)
+            {
+                qtype_str = strdup(name);
+
+                if (qtype_str == NULL)
+                {
+                    MXS_ERROR("Memory allocation failed.");
+                    return NULL;
+                }
+            }
+            else
+            {
+                size_t oldlen = strlen(qtype_str);
+                size_t len = strlen(name);
+                /** reallocate space for delimiter, new string and termination */
+                char* tmp = (char *) realloc(qtype_str, oldlen + 1 + len + 1);
+
+                if (tmp == NULL)
+                {
+                    /** The old block is still valid; release it before giving up. */
+                    MXS_ERROR("Memory allocation failed.");
+                    free(qtype_str);
+                    return NULL;
+                }
+
+                qtype_str = tmp;
+                snprintf(qtype_str + oldlen, 1 + len + 1, "|%s", name);
+            }
+
+            /** Remove found value from remaining */
+            remaining &= ~bit;
+        }
+
+        bit <<= 1;
+    }
+
+    return qtype_str;
+}
+
+/**
+ * Returns an array of strings of databases that this query uses.
+ * If the database isn't defined in the query, it is assumed that this query
+ * only targets the current database.
+ * The value of @p size is set to the number of allocated strings. The caller is
+ * responsible for freeing all the allocated memory.
+ * @param querybuf GWBUF containing the query
+ * @param size Size of the resulting array
+ * @return A new array of strings containing the database names or NULL if no
+ * databases were found.
+ */ +char** qc_get_database_names(GWBUF* querybuf, int* size) +{ + LEX* lex; + TABLE_LIST* tbl; + char **databases = NULL, **tmp = NULL; + int currsz = 0, i = 0; + + if (!querybuf) + { + goto retblock; + } + + if (!ensure_query_is_parsed(querybuf)) + { + goto retblock; + } + + if ((lex = get_lex(querybuf)) == NULL) + { + goto retblock; + } + + lex->current_select = lex->all_selects_list; + + while (lex->current_select) + { + tbl = lex->current_select->table_list.first; + + while (tbl) + { + if (strcmp(tbl->db, "skygw_virtual") != 0) + { + if (i >= currsz) + { + tmp = (char**) realloc(databases, + sizeof (char*)*(currsz * 2 + 1)); + + if (tmp == NULL) + { + goto retblock; + } + + databases = tmp; + currsz = currsz * 2 + 1; + } + + databases[i++] = strdup(tbl->db); + } + + tbl = tbl->next_local; + } + + lex->current_select = lex->current_select->next_select_in_list(); + } + +retblock: + *size = i; + return databases; +} + +qc_query_op_t qc_get_operation(GWBUF* querybuf) +{ + qc_query_op_t operation = QUERY_OP_UNDEFINED; + + if (querybuf) + { + if (ensure_query_is_parsed(querybuf)) + { + LEX* lex = get_lex(querybuf); + + if (lex) + { + switch (lex->sql_command) + { + case SQLCOM_SELECT: + operation = QUERY_OP_SELECT; + break; + + case SQLCOM_CREATE_TABLE: + operation = QUERY_OP_CREATE_TABLE; + break; + + case SQLCOM_CREATE_INDEX: + operation = QUERY_OP_CREATE_INDEX; + break; + + case SQLCOM_ALTER_TABLE: + operation = QUERY_OP_ALTER_TABLE; + break; + + case SQLCOM_UPDATE: + operation = QUERY_OP_UPDATE; + break; + + case SQLCOM_INSERT: + operation = QUERY_OP_INSERT; + break; + + case SQLCOM_INSERT_SELECT: + operation = QUERY_OP_INSERT_SELECT; + break; + + case SQLCOM_DELETE: + operation = QUERY_OP_DELETE; + break; + + case SQLCOM_TRUNCATE: + operation = QUERY_OP_TRUNCATE; + break; + + case SQLCOM_DROP_TABLE: + operation = QUERY_OP_DROP_TABLE; + break; + + case SQLCOM_DROP_INDEX: + operation = QUERY_OP_DROP_INDEX; + break; + + case SQLCOM_CHANGE_DB: + operation = 
QUERY_OP_CHANGE_DB; + break; + + case SQLCOM_LOAD: + operation = QUERY_OP_LOAD; + break; + + default: + operation = QUERY_OP_UNDEFINED; + } + } + } + } + + return operation; +} + + +bool qc_init(int argc, char** argv, char** groups) +{ + int rc = mysql_library_init(argc, argv, groups); + + if (rc != 0) + { + MXS_ERROR("mysql_library_init() failed. Error code: %d", rc); + } + else + { + MXS_NOTICE("Query classifier initialized."); + } + + return rc == 0; +} + +void qc_end() +{ + mysql_library_end(); +} + +bool qc_thread_init() +{ + bool inited = (mysql_thread_init() == 0); + + if (!inited) + { + MXS_ERROR("mysql_thread_init() failed."); + } + + return inited; +} + +void qc_thread_end() +{ + mysql_thread_end(); +} + +/** + * EXPORTS + */ + +static char version_string[] = "V1.0.0"; + +static QUERY_CLASSIFIER qc = +{ + qc_init, + qc_end, + qc_thread_init, + qc_thread_end, + qc_get_type, + qc_get_operation, + qc_get_created_table_name, + qc_is_drop_table_query, + qc_is_real_query, + qc_get_table_names, + qc_get_canonical, + qc_query_has_clause, + qc_get_qtype_str, + qc_get_affected_fields, + qc_get_database_names, +}; + + +MODULE_INFO info = +{ + MODULE_API_QUERY_CLASSIFIER, + MODULE_IN_DEVELOPMENT, + QUERY_CLASSIFIER_VERSION, + const_cast("Query classifier based upon MySQL Embedded"), +}; + +char* version() +{ + return const_cast(version_string); +} + +void ModuleInit() +{ +} + +QUERY_CLASSIFIER* GetModuleObject() +{ + return &qc; +} + + diff --git a/query_classifier/qc_mysqlembedded/qc_mysqlembedded.map b/query_classifier/qc_mysqlembedded/qc_mysqlembedded.map new file mode 100644 index 000000000..baad642c0 --- /dev/null +++ b/query_classifier/qc_mysqlembedded/qc_mysqlembedded.map @@ -0,0 +1,9 @@ +{ + global: + info; + version; + GetModuleInfo; + ModuleInit; + local: + *; +};