diff --git a/Documentation/Tutorials/Replication-Proxy-Binlog-Router-Tutorial.md b/Documentation/Tutorials/Replication-Proxy-Binlog-Router-Tutorial.md index 23995abcc..d1003a24b 100644 --- a/Documentation/Tutorials/Replication-Proxy-Binlog-Router-Tutorial.md +++ b/Documentation/Tutorials/Replication-Proxy-Binlog-Router-Tutorial.md @@ -117,7 +117,7 @@ This parameter is used to define the maximum amount of data that will be sent to This parameter is used to enable/disable incomplete transactions detection in binlog router. When MaxScale starts an error message may appear if current binlog file is corrupted or an incomplete transaction is found. During normal operations binlog events are not distributed to the slaves until a COMMIT is seen. -The default value is on, set transaction_safety=off to completely disable the incomplete transactions detection. +The default value is off, set transaction_safety=on to enable the incomplete transactions detection. A complete example of a service entry for a binlog router service would be as follows. ``` diff --git a/cmake/package_deb.cmake b/cmake/package_deb.cmake index 90d66d4bc..ea07b6451 100644 --- a/cmake/package_deb.cmake +++ b/cmake/package_deb.cmake @@ -3,4 +3,3 @@ set(CPACK_GENERATOR "${CPACK_GENERATOR};DEB") set(CPACK_DEBIAN_PACKAGE_CONTROL_EXTRA "${CMAKE_BINARY_DIR}/postinst;{CMAKE_BINARY_DIR}/postrm") execute_process(COMMAND dpgk --print-architecture OUTPUT_VARIABLE DEB_ARCHITECTURE) set(CPACK_DEBIAN_PACKAGE_ARCHITECTURE ${DEB_ARCHITECTURE}) -set (CPACK_DEBIAN_PACKAGE_SHLIBDEPS ON) diff --git a/query_classifier/query_classifier.cc b/query_classifier/query_classifier.cc index 6a0831efc..48783db4d 100644 --- a/query_classifier/query_classifier.cc +++ b/query_classifier/query_classifier.cc @@ -1,25 +1,25 @@ /** * @section LICENCE - * + * * This file is distributed as part of the MariaDB Corporation MaxScale. It is * free software: you can redistribute it and/or modify it under * the terms of the GNU General Public License as published by the * Free Software Foundation, version 2. - * + * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. - * + * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA * 02110-1301 USA. - * + * * Copyright MariaDB Corporation Ab - * - * @file - * + * + * @file + * */ #define EMBEDDED_LIBRARY @@ -27,7 +27,7 @@ #define MYSQL_LEX012 #define MYSQL_SERVER #if defined(MYSQL_CLIENT) -# undef MYSQL_CLIENT +#undef MYSQL_CLIENT #endif #include @@ -67,894 +67,929 @@ static THD* get_or_create_thd_for_parsing(MYSQL* mysql, char* query_str); static unsigned long set_client_flags(MYSQL* mysql); static bool create_parse_tree(THD* thd); static skygw_query_type_t resolve_query_type(THD* thd); -static bool skygw_stmt_causes_implicit_commit( - LEX* lex, - int* autocommit_stmt); +static bool skygw_stmt_causes_implicit_commit(LEX* lex, int* autocommit_stmt); static int is_autocommit_stmt(LEX* lex); static void parsing_info_set_plain_str(void* ptr, char* str); static void* skygw_get_affected_tables(void* lexptr); - /** - * Calls parser for the query includede in the buffer. Creates and adds parsing - * information to buffer if it doesn't exist already. Resolves the query type. - * + * Calls parser for the query includede in the buffer. Creates and adds parsing + * information to buffer if it doesn't exist already. Resolves the query type. + * * @param querybuf buffer including the query and possibly the parsing information - * + * * @return query type */ -skygw_query_type_t query_classifier_get_type( - GWBUF* querybuf) +skygw_query_type_t query_classifier_get_type(GWBUF* querybuf) { - MYSQL* mysql; - skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN; - bool succp; - - ss_info_dassert(querybuf != NULL, ("querybuf is NULL")); - - if (querybuf == NULL) - { - succp = false; - goto retblock; - } - /** Create parsing info for the query and store it to buffer */ - succp = query_is_parsed(querybuf); - - if (!succp) - { - succp = parse_query(querybuf); - } - /** Read thd pointer and resolve the query type with it. */ - if (succp) - { - parsing_info_t* pi; - - pi = (parsing_info_t*)gwbuf_get_buffer_object_data(querybuf, - GWBUF_PARSING_INFO); - - if (pi != NULL) - { - mysql = (MYSQL *)pi->pi_handle; + MYSQL* mysql; + skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN; + bool succp; - /** Find out the query type */ - if (mysql != NULL) - { - qtype = resolve_query_type((THD *)mysql->thd); - } - } + ss_info_dassert(querybuf != NULL, ("querybuf is NULL")); + + if (querybuf == NULL) + { + succp = false; + goto retblock; + } + + /** Create parsing info for the query and store it to buffer */ + succp = query_is_parsed(querybuf); + + if (!succp) + { + succp = parse_query(querybuf); + } + + /** Read thd pointer and resolve the query type with it. */ + if (succp) + { + parsing_info_t* pi; + + pi = (parsing_info_t*) gwbuf_get_buffer_object_data(querybuf, + GWBUF_PARSING_INFO); + + if (pi != NULL) + { + mysql = (MYSQL *) pi->pi_handle; + + /** Find out the query type */ + if (mysql != NULL) + { + qtype = resolve_query_type((THD *) mysql->thd); + } } + } + retblock: - return qtype; + return qtype; } /** * Create parsing info and try to parse the query included in the query buffer. * Store pointer to created parse_tree_t object to buffer. - * + * * @param querybuf buffer including the query and possibly the parsing information - * + * * @return true if succeed, false otherwise */ -bool parse_query ( - GWBUF* querybuf) +bool parse_query(GWBUF* querybuf) { - bool succp; - THD* thd; - uint8_t* data; - size_t len; - char* query_str = NULL; - parsing_info_t* pi; - - CHK_GWBUF(querybuf); - /** Do not parse without releasing previous parse info first */ - ss_dassert(!query_is_parsed(querybuf)); - - if (querybuf == NULL || query_is_parsed(querybuf)) - { - return false; - } - /** Create parsing info */ - pi = parsing_info_init(parsing_info_done); - - if (pi == NULL) - { - succp = false; - goto retblock; - } - /** Extract query and copy it to different buffer */ - data = (uint8_t*)GWBUF_DATA(querybuf); - len = MYSQL_GET_PACKET_LEN(data)-1; /*< distract 1 for packet type byte */ - + bool succp; + THD* thd; + uint8_t* data; + size_t len; + char* query_str = NULL; + parsing_info_t* pi; - if (len < 1 || len >= ~((size_t)0) - 1 || (query_str = (char *)malloc(len+1)) == NULL) - { - /** Free parsing info data */ - parsing_info_done(pi); - succp = false; - goto retblock; - } - memcpy(query_str, &data[5], len); - memset(&query_str[len], 0, 1); - parsing_info_set_plain_str(pi, query_str); - - /** Get one or create new THD object to be use in parsing */ - thd = get_or_create_thd_for_parsing((MYSQL *)pi->pi_handle, query_str); - - if (thd == NULL) - { - /** Free parsing info data */ - parsing_info_done(pi); - succp = false; - goto retblock; - } - /** - * Create parse_tree inside thd. - * thd and lex are readable even if creating parse tree fails. - */ - create_parse_tree(thd); - /** Add complete parsing info struct to the query buffer */ - gwbuf_add_buffer_object(querybuf, - GWBUF_PARSING_INFO, - (void *)pi, - parsing_info_done); - - succp = true; + CHK_GWBUF(querybuf); + /** Do not parse without releasing previous parse info first */ + ss_dassert(!query_is_parsed(querybuf)); + + if (querybuf == NULL || query_is_parsed(querybuf)) + { + return false; + } + + /** Create parsing info */ + pi = parsing_info_init(parsing_info_done); + + if (pi == NULL) + { + succp = false; + goto retblock; + } + + /** Extract query and copy it to different buffer */ + data = (uint8_t*) GWBUF_DATA(querybuf); + len = MYSQL_GET_PACKET_LEN(data) - 1; /*< distract 1 for packet type byte */ + + + if (len < 1 || len >= ~((size_t) 0) - 1 || (query_str = (char *) malloc(len + 1)) == NULL) + { + /** Free parsing info data */ + parsing_info_done(pi); + succp = false; + goto retblock; + } + + memcpy(query_str, &data[5], len); + memset(&query_str[len], 0, 1); + parsing_info_set_plain_str(pi, query_str); + + /** Get one or create new THD object to be use in parsing */ + thd = get_or_create_thd_for_parsing((MYSQL *) pi->pi_handle, query_str); + + if (thd == NULL) + { + /** Free parsing info data */ + parsing_info_done(pi); + succp = false; + goto retblock; + } + + /** + * Create parse_tree inside thd. + * thd and lex are readable even if creating parse tree fails. + */ + create_parse_tree(thd); + /** Add complete parsing info struct to the query buffer */ + gwbuf_add_buffer_object(querybuf, + GWBUF_PARSING_INFO, + (void *) pi, + parsing_info_done); + + succp = true; retblock: - return succp; + return succp; } - /** * If buffer has non-NULL gwbuf_parsing_info it is parsed and it has parsing * information included. - * + * * @param buf buffer being examined - * + * * @return true or false */ -bool query_is_parsed( - GWBUF* buf) +bool query_is_parsed(GWBUF* buf) { - CHK_GWBUF(buf); - return (buf != NULL && GWBUF_IS_PARSED(buf)); + CHK_GWBUF(buf); + return (buf != NULL && GWBUF_IS_PARSED(buf)); } - /** * Create a thread context, thd, init embedded server, connect to it, and allocate * query to thd. * * Parameters: * @param mysql Database handle - * + * * @param query_str Query in plain txt string * * @return Thread context pointer * */ -static THD* get_or_create_thd_for_parsing( - MYSQL* mysql, - char* query_str) +static THD* get_or_create_thd_for_parsing(MYSQL* mysql, char* query_str) { - THD* thd = NULL; - unsigned long client_flags; - char* db = mysql->options.db; - bool failp = FALSE; - size_t query_len; + THD* thd = NULL; + unsigned long client_flags; + char* db = mysql->options.db; + bool failp = FALSE; + size_t query_len; - ss_info_dassert(mysql != NULL, ("mysql is NULL")); - ss_info_dassert(query_str != NULL, ("query_str is NULL")); + ss_info_dassert(mysql != NULL, ("mysql is NULL")); + ss_info_dassert(query_str != NULL, ("query_str is NULL")); - query_len = strlen(query_str); - client_flags = set_client_flags(mysql); - - /** Get THD. - * NOTE: Instead of creating new every time, THD instance could - * be get from a pool of them. - */ - thd = (THD *)create_embedded_thd(client_flags); + query_len = strlen(query_str); + client_flags = set_client_flags(mysql); - if (thd == NULL) { - MXS_ERROR("Failed to create thread context for parsing."); - goto return_thd; - } - mysql->thd = thd; - init_embedded_mysql(mysql, client_flags); - failp = check_embedded_connection(mysql, db); + /** Get THD. + * NOTE: Instead of creating new every time, THD instance could + * be get from a pool of them. + */ + thd = (THD *) create_embedded_thd(client_flags); - if (failp) { - MXS_ERROR("Call to check_embedded_connection failed."); - goto return_err_with_thd; - } - thd->clear_data_list(); - - /** Check that we are calling the client functions in right order */ - if (mysql->status != MYSQL_STATUS_READY) { - set_mysql_error(mysql, CR_COMMANDS_OUT_OF_SYNC, unknown_sqlstate); - MXS_ERROR("Invalid status %d in embedded server.", - mysql->status); - goto return_err_with_thd; - } - /** Clear result variables */ - thd->current_stmt= NULL; - thd->store_globals(); - /** - * We have to call free_old_query before we start to fill mysql->fields - * for new query. In the case of embedded server we collect field data - * during query execution (not during data retrieval as it is in remote - * client). So we have to call free_old_query here - */ - free_old_query(mysql); - thd->extra_length = query_len; - thd->extra_data = query_str; - alloc_query(thd, query_str, query_len); + if (thd == NULL) + { + MXS_ERROR("Failed to create thread context for parsing."); goto return_thd; - + } + + mysql->thd = thd; + init_embedded_mysql(mysql, client_flags); + failp = check_embedded_connection(mysql, db); + + if (failp) + { + MXS_ERROR("Call to check_embedded_connection failed."); + goto return_err_with_thd; + } + + thd->clear_data_list(); + + /** Check that we are calling the client functions in right order */ + if (mysql->status != MYSQL_STATUS_READY) + { + set_mysql_error(mysql, CR_COMMANDS_OUT_OF_SYNC, unknown_sqlstate); + MXS_ERROR("Invalid status %d in embedded server.", + mysql->status); + goto return_err_with_thd; + } + + /** Clear result variables */ + thd->current_stmt = NULL; + thd->store_globals(); + /** + * We have to call free_old_query before we start to fill mysql->fields + * for new query. In the case of embedded server we collect field data + * during query execution (not during data retrieval as it is in remote + * client). So we have to call free_old_query here + */ + free_old_query(mysql); + thd->extra_length = query_len; + thd->extra_data = query_str; + alloc_query(thd, query_str, query_len); + goto return_thd; + return_err_with_thd: - (*mysql->methods->free_embedded_thd)(mysql); - thd = 0; - mysql->thd = 0; + (*mysql->methods->free_embedded_thd)(mysql); + thd = 0; + mysql->thd = 0; return_thd: - return thd; + return thd; } - - -/** - * @node Set client flags. This is copied from libmysqld.c:mysql_real_connect +/** + * @node Set client flags. This is copied from libmysqld.c:mysql_real_connect * * Parameters: * @param mysql - * * - * @return + * @return + * * - * * @details (write detailed description here) * */ -static unsigned long set_client_flags( - MYSQL* mysql) +static unsigned long set_client_flags(MYSQL* mysql) { - unsigned long f = 0; + unsigned long f = 0; - f |= mysql->options.client_flag; - - /* Send client information for access check */ - f |= CLIENT_CAPABILITIES; - - if (f & CLIENT_MULTI_STATEMENTS) { - f |= CLIENT_MULTI_RESULTS; - } - /** - * No compression in embedded as we don't send any data, - * and no pluggable auth, as we cannot do a client-server dialog - */ - f &= ~(CLIENT_COMPRESS | CLIENT_PLUGIN_AUTH); - - if (mysql->options.db != NULL) { - f |= CLIENT_CONNECT_WITH_DB; - } - return f; + f |= mysql->options.client_flag; + + /* Send client information for access check */ + f |= CLIENT_CAPABILITIES; + + if (f & CLIENT_MULTI_STATEMENTS) + { + f |= CLIENT_MULTI_RESULTS; + } + + /** + * No compression in embedded as we don't send any data, + * and no pluggable auth, as we cannot do a client-server dialog + */ + f &= ~(CLIENT_COMPRESS | CLIENT_PLUGIN_AUTH); + + if (mysql->options.db != NULL) + { + f |= CLIENT_CONNECT_WITH_DB; + } + + return f; } - -static bool create_parse_tree( - THD* thd) +static bool create_parse_tree(THD* thd) { - Parser_state parser_state; - bool failp = FALSE; - const char* virtual_db = "skygw_virtual"; + Parser_state parser_state; + bool failp = FALSE; + const char* virtual_db = "skygw_virtual"; - if (parser_state.init(thd, thd->query(), thd->query_length())) - { - failp = TRUE; - goto return_here; - } - thd->reset_for_next_command(); + if (parser_state.init(thd, thd->query(), thd->query_length())) + { + failp = TRUE; + goto return_here; + } - /** - * Set some database to thd so that parsing won't fail because of - * missing database. Then parse. - */ - failp = thd->set_db(virtual_db, strlen(virtual_db)); - if (failp) - { - MXS_ERROR("Failed to set database in thread context."); - } - failp = parse_sql(thd, &parser_state, NULL); + thd->reset_for_next_command(); + + /** + * Set some database to thd so that parsing won't fail because of + * missing database. Then parse. + */ + failp = thd->set_db(virtual_db, strlen(virtual_db)); + + if (failp) + { + MXS_ERROR("Failed to set database in thread context."); + } + + failp = parse_sql(thd, &parser_state, NULL); + + if (failp) + { + MXS_DEBUG("%lu [readwritesplit:create_parse_tree] failed to " + "create parse tree.", + pthread_self()); + } - if (failp) - { - MXS_DEBUG("%lu [readwritesplit:create_parse_tree] failed to " - "create parse tree.", - pthread_self()); - } return_here: - return failp; + return failp; } - -/** +/** * Detect query type by examining parsed representation of it. * - * @param thd MariaDB thread context. + * @param thd MariaDB thread context. + * + * @return Copy of query type value. * - * @return Copy of query type value. * - * * @details Query type is deduced by checking for certain properties * of them. The order is essential. Some SQL commands have multiple * flags set and changing the order in which flags are tested, * the resulting type may be different. * */ -static skygw_query_type_t resolve_query_type( - THD* thd) +static skygw_query_type_t resolve_query_type(THD* thd) { - skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN; - u_int32_t type = QUERY_TYPE_UNKNOWN; - int set_autocommit_stmt = -1; /*< -1 no, 0 disable, 1 enable */ - LEX* lex; - Item* item; - /** - * By default, if sql_log_bin, that is, recording data modifications - * to binary log, is disabled, gateway treats operations normally. - * Effectively nothing is replicated. - * When force_data_modify_op_replication is TRUE, gateway distributes - * all write operations to all nodes. - */ + skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN; + u_int32_t type = QUERY_TYPE_UNKNOWN; + int set_autocommit_stmt = -1; /*< -1 no, 0 disable, 1 enable */ + LEX* lex; + Item* item; + /** + * By default, if sql_log_bin, that is, recording data modifications + * to binary log, is disabled, gateway treats operations normally. + * Effectively nothing is replicated. + * When force_data_modify_op_replication is TRUE, gateway distributes + * all write operations to all nodes. + */ #if defined(NOT_IN_USE) - bool force_data_modify_op_replication; - force_data_modify_op_replication = FALSE; + bool force_data_modify_op_replication; + force_data_modify_op_replication = FALSE; #endif /* NOT_IN_USE */ - ss_info_dassert(thd != NULL, ("thd is NULL\n")); + ss_info_dassert(thd != NULL, ("thd is NULL\n")); - lex = thd->lex; - - /** SELECT ..INTO variable|OUTFILE|DUMPFILE */ - if (lex->result != NULL) { - type = QUERY_TYPE_GSYSVAR_WRITE; - goto return_qtype; - } - - if (skygw_stmt_causes_implicit_commit( - lex, - &set_autocommit_stmt)) + lex = thd->lex; + + /** SELECT ..INTO variable|OUTFILE|DUMPFILE */ + if (lex->result != NULL) + { + type = QUERY_TYPE_GSYSVAR_WRITE; + goto return_qtype; + } + + if (skygw_stmt_causes_implicit_commit(lex, &set_autocommit_stmt)) + { + if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO)) { - if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO)) - { - if (sql_command_flags[lex->sql_command] & - CF_IMPLICT_COMMIT_BEGIN) - { - MXS_INFO("Implicit COMMIT before executing the next command."); - } - else if (sql_command_flags[lex->sql_command] & - CF_IMPLICIT_COMMIT_END) - { - MXS_INFO("Implicit COMMIT after executing the next command."); - } - } - - if (set_autocommit_stmt == 1) - { - type |= QUERY_TYPE_ENABLE_AUTOCOMMIT; - } - type |= QUERY_TYPE_COMMIT; - } - - if (set_autocommit_stmt == 0) - { - if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO)) - { - MXS_INFO("Disable autocommit : implicit START TRANSACTION" - " before executing the next command."); - } - type |= QUERY_TYPE_DISABLE_AUTOCOMMIT; - type |= QUERY_TYPE_BEGIN_TRX; + if (sql_command_flags[lex->sql_command] & + CF_IMPLICT_COMMIT_BEGIN) + { + MXS_INFO("Implicit COMMIT before executing the next command."); + } + else if (sql_command_flags[lex->sql_command] & + CF_IMPLICIT_COMMIT_END) + { + MXS_INFO("Implicit COMMIT after executing the next command."); + } } - if (lex->option_type == OPT_GLOBAL) + if (set_autocommit_stmt == 1) { - /** - * SHOW syntax http://dev.mysql.com/doc/refman/5.6/en/show.html - */ - if (lex->sql_command == SQLCOM_SHOW_VARIABLES) - { - type |= QUERY_TYPE_GSYSVAR_READ; - } - /** - * SET syntax http://dev.mysql.com/doc/refman/5.6/en/set-statement.html - */ - else if (lex->sql_command == SQLCOM_SET_OPTION) - { - type |= QUERY_TYPE_GSYSVAR_WRITE; - } - - /* - * SHOW GLOBAL STATUS - Route to master - */ - else if (lex->sql_command == SQLCOM_SHOW_STATUS) - { - type = QUERY_TYPE_WRITE; - } - /** - * REVOKE ALL, ASSIGN_TO_KEYCACHE, - * PRELOAD_KEYS, FLUSH, RESET, CREATE|ALTER|DROP SERVER - */ - - else - { - type |= QUERY_TYPE_GSYSVAR_WRITE; - } - goto return_qtype; + type |= QUERY_TYPE_ENABLE_AUTOCOMMIT; } - else if (lex->option_type == OPT_SESSION) + + type |= QUERY_TYPE_COMMIT; + } + + if (set_autocommit_stmt == 0) + { + if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO)) { - /** - * SHOW syntax http://dev.mysql.com/doc/refman/5.6/en/show.html - */ - if (lex->sql_command == SQLCOM_SHOW_VARIABLES) - { - type |= QUERY_TYPE_SYSVAR_READ; - } - /** - * SET syntax http://dev.mysql.com/doc/refman/5.6/en/set-statement.html - */ - else if (lex->sql_command == SQLCOM_SET_OPTION) - { - /** Either user- or system variable write */ - type |= QUERY_TYPE_GSYSVAR_WRITE; - } - goto return_qtype; + MXS_INFO("Disable autocommit : implicit START TRANSACTION" + " before executing the next command."); } + + type |= QUERY_TYPE_DISABLE_AUTOCOMMIT; + type |= QUERY_TYPE_BEGIN_TRX; + } + + if (lex->option_type == OPT_GLOBAL) + { /** - * 1:ALTER TABLE, TRUNCATE, REPAIR, OPTIMIZE, ANALYZE, CHECK. - * 2:CREATE|ALTER|DROP|TRUNCATE|RENAME TABLE, LOAD, CREATE|DROP|ALTER DB, - * CREATE|DROP INDEX, CREATE|DROP VIEW, CREATE|DROP TRIGGER, - * CREATE|ALTER|DROP EVENT, UPDATE, INSERT, INSERT(SELECT), - * DELETE, REPLACE, REPLACE(SELECT), CREATE|RENAME|DROP USER, - * GRANT, REVOKE, OPTIMIZE, CREATE|ALTER|DROP FUNCTION|PROCEDURE, - * CREATE SPFUNCTION, INSTALL|UNINSTALL PLUGIN + * SHOW syntax http://dev.mysql.com/doc/refman/5.6/en/show.html */ - if (is_log_table_write_query(lex->sql_command) || - is_update_query(lex->sql_command)) + if (lex->sql_command == SQLCOM_SHOW_VARIABLES) { + type |= QUERY_TYPE_GSYSVAR_READ; + } + /** + * SET syntax http://dev.mysql.com/doc/refman/5.6/en/set-statement.html + */ + else if (lex->sql_command == SQLCOM_SET_OPTION) + { + type |= QUERY_TYPE_GSYSVAR_WRITE; + } + + /* + * SHOW GLOBAL STATUS - Route to master + */ + else if (lex->sql_command == SQLCOM_SHOW_STATUS) + { + type = QUERY_TYPE_WRITE; + } + /** + * REVOKE ALL, ASSIGN_TO_KEYCACHE, + * PRELOAD_KEYS, FLUSH, RESET, CREATE|ALTER|DROP SERVER + */ + + else + { + type |= QUERY_TYPE_GSYSVAR_WRITE; + } + + goto return_qtype; + } + else if (lex->option_type == OPT_SESSION) + { + /** + * SHOW syntax http://dev.mysql.com/doc/refman/5.6/en/show.html + */ + if (lex->sql_command == SQLCOM_SHOW_VARIABLES) + { + type |= QUERY_TYPE_SYSVAR_READ; + } + /** + * SET syntax http://dev.mysql.com/doc/refman/5.6/en/set-statement.html + */ + else if (lex->sql_command == SQLCOM_SET_OPTION) + { + /** Either user- or system variable write */ + type |= QUERY_TYPE_GSYSVAR_WRITE; + } + + goto return_qtype; + } + + /** + * 1:ALTER TABLE, TRUNCATE, REPAIR, OPTIMIZE, ANALYZE, CHECK. + * 2:CREATE|ALTER|DROP|TRUNCATE|RENAME TABLE, LOAD, CREATE|DROP|ALTER DB, + * CREATE|DROP INDEX, CREATE|DROP VIEW, CREATE|DROP TRIGGER, + * CREATE|ALTER|DROP EVENT, UPDATE, INSERT, INSERT(SELECT), + * DELETE, REPLACE, REPLACE(SELECT), CREATE|RENAME|DROP USER, + * GRANT, REVOKE, OPTIMIZE, CREATE|ALTER|DROP FUNCTION|PROCEDURE, + * CREATE SPFUNCTION, INSTALL|UNINSTALL PLUGIN + */ + if (is_log_table_write_query(lex->sql_command) || + is_update_query(lex->sql_command)) + { #if defined(NOT_IN_USE) - if (thd->variables.sql_log_bin == 0 && - force_data_modify_op_replication) - { - /** Not replicated */ - type |= QUERY_TYPE_SESSION_WRITE; - } - else + + if (thd->variables.sql_log_bin == 0 && + force_data_modify_op_replication) + { + /** Not replicated */ + type |= QUERY_TYPE_SESSION_WRITE; + } + else #endif /* NOT_IN_USE */ - { - /** Written to binlog, that is, replicated except tmp tables */ - type |= QUERY_TYPE_WRITE; /*< to master */ - - if (lex->sql_command == SQLCOM_CREATE_TABLE && - (lex->create_info.options & HA_LEX_CREATE_TMP_TABLE)) - { - type |= QUERY_TYPE_CREATE_TMP_TABLE; /*< remember in router */ - } - } - goto return_qtype; + { + /** Written to binlog, that is, replicated except tmp tables */ + type |= QUERY_TYPE_WRITE; /*< to master */ + + if (lex->sql_command == SQLCOM_CREATE_TABLE && + (lex->create_info.options & HA_LEX_CREATE_TMP_TABLE)) + { + type |= QUERY_TYPE_CREATE_TMP_TABLE; /*< remember in router */ + } } - - /** Try to catch session modifications here */ - switch (lex->sql_command) { - /** fallthrough */ - case SQLCOM_CHANGE_DB: - case SQLCOM_DEALLOCATE_PREPARE: - type |= QUERY_TYPE_SESSION_WRITE; - break; - case SQLCOM_SELECT: - case SQLCOM_SHOW_SLAVE_STAT: - type |= QUERY_TYPE_READ; - break; + goto return_qtype; + } - case SQLCOM_CALL: - type |= QUERY_TYPE_WRITE; - break; - - case SQLCOM_BEGIN: - type |= QUERY_TYPE_BEGIN_TRX; - goto return_qtype; - break; - - case SQLCOM_COMMIT: - type |= QUERY_TYPE_COMMIT; - goto return_qtype; - break; - - case SQLCOM_ROLLBACK: - type |= QUERY_TYPE_ROLLBACK; - goto return_qtype; - break; - - case SQLCOM_PREPARE: - type |= QUERY_TYPE_PREPARE_NAMED_STMT; - goto return_qtype; - break; + /** Try to catch session modifications here */ + switch (lex->sql_command) + { + /** fallthrough */ + case SQLCOM_CHANGE_DB: + case SQLCOM_DEALLOCATE_PREPARE: + type |= QUERY_TYPE_SESSION_WRITE; + break; + + case SQLCOM_SELECT: + case SQLCOM_SHOW_SLAVE_STAT: + type |= QUERY_TYPE_READ; + break; + + case SQLCOM_CALL: + type |= QUERY_TYPE_WRITE; + break; + + case SQLCOM_BEGIN: + type |= QUERY_TYPE_BEGIN_TRX; + goto return_qtype; + break; + + case SQLCOM_COMMIT: + type |= QUERY_TYPE_COMMIT; + goto return_qtype; + break; + + case SQLCOM_ROLLBACK: + type |= QUERY_TYPE_ROLLBACK; + goto return_qtype; + break; + + case SQLCOM_PREPARE: + type |= QUERY_TYPE_PREPARE_NAMED_STMT; + goto return_qtype; + break; + + case SQLCOM_SHOW_DATABASES: + type |= QUERY_TYPE_SHOW_DATABASES; + goto return_qtype; + break; + + case SQLCOM_SHOW_TABLES: + type |= QUERY_TYPE_SHOW_TABLES; + goto return_qtype; + break; + + default: + break; + } - case SQLCOM_SHOW_DATABASES: - type |= QUERY_TYPE_SHOW_DATABASES; - goto return_qtype; - break; - - case SQLCOM_SHOW_TABLES: - type |= QUERY_TYPE_SHOW_TABLES; - goto return_qtype; - break; - - default: - break; - } #if defined(UPDATE_VAR_SUPPORT) - if (QTYPE_LESS_RESTRICTIVE_THAN_WRITE(type)) + + if (QTYPE_LESS_RESTRICTIVE_THAN_WRITE(type)) #endif - if (QUERY_IS_TYPE(qtype, QUERY_TYPE_UNKNOWN) || - QUERY_IS_TYPE(qtype, QUERY_TYPE_LOCAL_READ) || - QUERY_IS_TYPE(qtype, QUERY_TYPE_READ) || - QUERY_IS_TYPE(qtype, QUERY_TYPE_USERVAR_READ) || - QUERY_IS_TYPE(qtype, QUERY_TYPE_SYSVAR_READ) || - QUERY_IS_TYPE(qtype, QUERY_TYPE_GSYSVAR_READ)) - { - /** - * These values won't change qtype more restrictive than write. - * UDFs and procedures could possibly cause session-wide write, - * but unless their content is replicated this is a limitation - * of this implementation. - * In other words : UDFs and procedures are not allowed to - * perform writes which are not replicated but need to repeat - * in every node. - * It is not sure if such statements exist. vraa 25.10.13 - */ + if (QUERY_IS_TYPE(qtype, QUERY_TYPE_UNKNOWN) || + QUERY_IS_TYPE(qtype, QUERY_TYPE_LOCAL_READ) || + QUERY_IS_TYPE(qtype, QUERY_TYPE_READ) || + QUERY_IS_TYPE(qtype, QUERY_TYPE_USERVAR_READ) || + QUERY_IS_TYPE(qtype, QUERY_TYPE_SYSVAR_READ) || + QUERY_IS_TYPE(qtype, QUERY_TYPE_GSYSVAR_READ)) + { + /** + * These values won't change qtype more restrictive than write. + * UDFs and procedures could possibly cause session-wide write, + * but unless their content is replicated this is a limitation + * of this implementation. + * In other words : UDFs and procedures are not allowed to + * perform writes which are not replicated but need to repeat + * in every node. + * It is not sure if such statements exist. vraa 25.10.13 + */ - /** - * Search for system functions, UDFs and stored procedures. - */ - for (item=thd->free_list; item != NULL; item=item->next) { - Item::Type itype; - - itype = item->type(); - MXS_DEBUG("%lu [resolve_query_type] Item %s:%s", - pthread_self(), - item->name, - STRITEMTYPE(itype)); - - if (itype == Item::SUBSELECT_ITEM) { - continue; - } - else if (itype == Item::FUNC_ITEM) - { - int func_qtype = QUERY_TYPE_UNKNOWN; - /** - * Item types: - * FIELD_ITEM = 0, FUNC_ITEM, - * SUM_FUNC_ITEM, STRING_ITEM, INT_ITEM, - * REAL_ITEM, NULL_ITEM, VARBIN_ITEM, - * COPY_STR_ITEM, FIELD_AVG_ITEM, - * DEFAULT_VALUE_ITEM, PROC_ITEM, - * COND_ITEM, REF_ITEM, FIELD_STD_ITEM, - * FIELD_VARIANCE_ITEM, - * INSERT_VALUE_ITEM, - * SUBSELECT_ITEM, ROW_ITEM, CACHE_ITEM, - * TYPE_HOLDER, PARAM_ITEM, - * TRIGGER_FIELD_ITEM, DECIMAL_ITEM, - * XPATH_NODESET, XPATH_NODESET_CMP, - * VIEW_FIXER_ITEM, - * EXPR_CACHE_ITEM == 27 - **/ - - Item_func::Functype ftype; - ftype = ((Item_func*)item)->functype(); - /** - * Item_func types: - * - * UNKNOWN_FUNC = 0,EQ_FUNC, EQUAL_FUNC, - * NE_FUNC, LT_FUNC, LE_FUNC, - * GE_FUNC, GT_FUNC, FT_FUNC, - * LIKE_FUNC == 10, ISNULL_FUNC, ISNOTNULL_FUNC, - * COND_AND_FUNC, COND_OR_FUNC, XOR_FUNC, - * BETWEEN, IN_FUNC, - * MULT_EQUAL_FUNC, INTERVAL_FUNC, - * ISNOTNULLTEST_FUNC == 20, - * SP_EQUALS_FUNC, SP_DISJOINT_FUNC, - * SP_INTERSECTS_FUNC, - * SP_TOUCHES_FUNC, SP_CROSSES_FUNC, - * SP_WITHIN_FUNC, SP_CONTAINS_FUNC, - * SP_OVERLAPS_FUNC, - * SP_STARTPOINT, SP_ENDPOINT == 30, - * SP_EXTERIORRING, SP_POINTN, SP_GEOMETRYN, - * SP_INTERIORRINGN,NOT_FUNC, NOT_ALL_FUNC, - * NOW_FUNC, TRIG_COND_FUNC, - * SUSERVAR_FUNC, GUSERVAR_FUNC == 40, - * COLLATE_FUNC, EXTRACT_FUNC, - * CHAR_TYPECAST_FUNC, - * FUNC_SP, UDF_FUNC, NEG_FUNC, - * GSYSVAR_FUNC == 47 - **/ - switch (ftype) { - case Item_func::FUNC_SP: - /** - * An unknown (for maxscale) function / sp - * belongs to this category. - */ - func_qtype |= QUERY_TYPE_WRITE; - MXS_DEBUG("%lu [resolve_query_type] " - "functype FUNC_SP, stored proc " - "or unknown function.", - pthread_self()); - break; - case Item_func::UDF_FUNC: - func_qtype |= QUERY_TYPE_WRITE; - MXS_DEBUG("%lu [resolve_query_type] " - "functype UDF_FUNC, user-defined " - "function.", - pthread_self()); - break; - case Item_func::NOW_FUNC: - func_qtype |= QUERY_TYPE_LOCAL_READ; - MXS_DEBUG("%lu [resolve_query_type] " - "functype NOW_FUNC, could be " - "executed in MaxScale.", - pthread_self()); - break; - /** System session variable */ - case Item_func::GSYSVAR_FUNC: - func_qtype |= QUERY_TYPE_SYSVAR_READ; - MXS_DEBUG("%lu [resolve_query_type] " - "functype GSYSVAR_FUNC, system " - "variable read.", - pthread_self()); - break; - /** User-defined variable read */ - case Item_func::GUSERVAR_FUNC: - func_qtype |= QUERY_TYPE_USERVAR_READ; - MXS_DEBUG("%lu [resolve_query_type] " - "functype GUSERVAR_FUNC, user " - "variable read.", - pthread_self()); - break; - /** User-defined variable modification */ - case Item_func::SUSERVAR_FUNC: - /** - * Really it is user variable but we - * don't separate sql variables atm. - * 15.9.14 - */ - func_qtype |= QUERY_TYPE_GSYSVAR_WRITE; - MXS_DEBUG("%lu [resolve_query_type] " - "functype SUSERVAR_FUNC, user " - "variable write.", - pthread_self()); - break; - case Item_func::UNKNOWN_FUNC: + /** + * Search for system functions, UDFs and stored procedures. + */ + for (item = thd->free_list; item != NULL; item = item->next) + { + Item::Type itype; - if (((Item_func*)item)->func_name () != NULL && - strcmp((char*)((Item_func*)item)->func_name (), "last_insert_id") == 0) - { - func_qtype |= QUERY_TYPE_MASTER_READ; - } - else - { - func_qtype |= QUERY_TYPE_READ; - } - /** - * Many built-in functions are of this - * type, for example, rand(), soundex(), - * repeat() . - */ - MXS_DEBUG("%lu [resolve_query_type] " - "functype UNKNOWN_FUNC, " - "typically some system function.", - pthread_self()); - break; - default: - MXS_DEBUG("%lu [resolve_query_type] " - "Functype %d.", - pthread_self(), - ftype); - break; - } /**< switch */ - /**< Set new query type */ - type |= func_qtype; - } -#if defined(UPDATE_VAR_SUPPORT) + itype = item->type(); + MXS_DEBUG("%lu [resolve_query_type] Item %s:%s", + pthread_self(), item->name, STRITEMTYPE(itype)); + + if (itype == Item::SUBSELECT_ITEM) + { + continue; + } + else if (itype == Item::FUNC_ITEM) + { + int func_qtype = QUERY_TYPE_UNKNOWN; + /** + * Item types: + * FIELD_ITEM = 0, FUNC_ITEM, + * SUM_FUNC_ITEM, STRING_ITEM, INT_ITEM, + * REAL_ITEM, NULL_ITEM, VARBIN_ITEM, + * COPY_STR_ITEM, FIELD_AVG_ITEM, + * DEFAULT_VALUE_ITEM, PROC_ITEM, + * COND_ITEM, REF_ITEM, FIELD_STD_ITEM, + * FIELD_VARIANCE_ITEM, + * INSERT_VALUE_ITEM, + * SUBSELECT_ITEM, ROW_ITEM, CACHE_ITEM, + * TYPE_HOLDER, PARAM_ITEM, + * TRIGGER_FIELD_ITEM, DECIMAL_ITEM, + * XPATH_NODESET, XPATH_NODESET_CMP, + * VIEW_FIXER_ITEM, + * EXPR_CACHE_ITEM == 27 + **/ + + Item_func::Functype ftype; + ftype = ((Item_func*) item)->functype(); + + /** + * Item_func types: + * + * UNKNOWN_FUNC = 0,EQ_FUNC, EQUAL_FUNC, + * NE_FUNC, LT_FUNC, LE_FUNC, + * GE_FUNC, GT_FUNC, FT_FUNC, + * LIKE_FUNC == 10, ISNULL_FUNC, ISNOTNULL_FUNC, + * COND_AND_FUNC, COND_OR_FUNC, XOR_FUNC, + * BETWEEN, IN_FUNC, + * MULT_EQUAL_FUNC, INTERVAL_FUNC, + * ISNOTNULLTEST_FUNC == 20, + * SP_EQUALS_FUNC, SP_DISJOINT_FUNC, + * SP_INTERSECTS_FUNC, + * SP_TOUCHES_FUNC, SP_CROSSES_FUNC, + * SP_WITHIN_FUNC, SP_CONTAINS_FUNC, + * SP_OVERLAPS_FUNC, + * SP_STARTPOINT, SP_ENDPOINT == 30, + * SP_EXTERIORRING, SP_POINTN, SP_GEOMETRYN, + * SP_INTERIORRINGN,NOT_FUNC, NOT_ALL_FUNC, + * NOW_FUNC, TRIG_COND_FUNC, + * SUSERVAR_FUNC, GUSERVAR_FUNC == 40, + * COLLATE_FUNC, EXTRACT_FUNC, + * CHAR_TYPECAST_FUNC, + * FUNC_SP, UDF_FUNC, NEG_FUNC, + * GSYSVAR_FUNC == 47 + **/ + switch (ftype) + { + case Item_func::FUNC_SP: /** - * Write is as restrictive as it gets due functions, - * so break. + * An unknown (for maxscale) function / sp + * belongs to this category. */ - if ((type & QUERY_TYPE_WRITE) == QUERY_TYPE_WRITE) { - break; + func_qtype |= QUERY_TYPE_WRITE; + MXS_DEBUG("%lu [resolve_query_type] " + "functype FUNC_SP, stored proc " + "or unknown function.", + pthread_self()); + break; + + case Item_func::UDF_FUNC: + func_qtype |= QUERY_TYPE_WRITE; + MXS_DEBUG("%lu [resolve_query_type] " + "functype UDF_FUNC, user-defined " + "function.", + pthread_self()); + break; + + case Item_func::NOW_FUNC: + func_qtype |= QUERY_TYPE_LOCAL_READ; + MXS_DEBUG("%lu [resolve_query_type] " + "functype NOW_FUNC, could be " + "executed in MaxScale.", + pthread_self()); + break; + + /** System session variable */ + case Item_func::GSYSVAR_FUNC: + func_qtype |= QUERY_TYPE_SYSVAR_READ; + MXS_DEBUG("%lu [resolve_query_type] " + "functype GSYSVAR_FUNC, system " + "variable read.", + pthread_self()); + break; + + /** User-defined variable read */ + case Item_func::GUSERVAR_FUNC: + func_qtype |= QUERY_TYPE_USERVAR_READ; + MXS_DEBUG("%lu [resolve_query_type] " + "functype GUSERVAR_FUNC, user " + "variable read.", + pthread_self()); + break; + + /** User-defined variable modification */ + case Item_func::SUSERVAR_FUNC: + /** + * Really it is user variable but we + * don't separate sql variables atm. + * 15.9.14 + */ + func_qtype |= QUERY_TYPE_GSYSVAR_WRITE; + MXS_DEBUG("%lu [resolve_query_type] " + "functype SUSERVAR_FUNC, user " + "variable write.", + pthread_self()); + break; + + case Item_func::UNKNOWN_FUNC: + + if (((Item_func*) item)->func_name() != NULL && + strcmp((char*) ((Item_func*) item)->func_name(), "last_insert_id") == 0) + { + func_qtype |= QUERY_TYPE_MASTER_READ; } + else + { + func_qtype |= QUERY_TYPE_READ; + } + + /** + * Many built-in functions are of this + * type, for example, rand(), soundex(), + * repeat() . + */ + MXS_DEBUG("%lu [resolve_query_type] " + "functype UNKNOWN_FUNC, " + "typically some system function.", + pthread_self()); + break; + + default: + MXS_DEBUG("%lu [resolve_query_type] " + "Functype %d.", + pthread_self(), + ftype); + break; + } /**< switch */ + + /**< Set new query type */ + type |= func_qtype; + } + +#if defined(UPDATE_VAR_SUPPORT) + + /** + * Write is as restrictive as it gets due functions, + * so break. + */ + if ((type & QUERY_TYPE_WRITE) == QUERY_TYPE_WRITE) + { + break; + } + #endif - } /**< for */ - } /**< if */ + } /**< for */ + } /**< if */ + return_qtype: - qtype = (skygw_query_type_t)type; - return qtype; + qtype = (skygw_query_type_t) type; + return qtype; } /** * Checks if statement causes implicit COMMIT. - * autocommit_stmt gets values 1, 0 or -1 if stmt is enable, disable or - * something else than autocommit. - * - * @param lex Parse tree - * @param autocommit_stmt memory address for autocommit status - * + * autocommit_stmt gets values 1, 0 or -1 if stmt is enable, disable or + * something else than autocommit. + * + * @param lex Parse tree + * @param autocommit_stmt memory address for autocommit status + * * @return true if statement causes implicit commit and false otherwise */ -static bool skygw_stmt_causes_implicit_commit( - LEX* lex, - int* autocommit_stmt) +static bool skygw_stmt_causes_implicit_commit(LEX* lex, int* autocommit_stmt) { - bool succp; + bool succp; - if (!(sql_command_flags[lex->sql_command] & CF_AUTO_COMMIT_TRANS)) + if (!(sql_command_flags[lex->sql_command] & CF_AUTO_COMMIT_TRANS)) + { + succp = false; + goto return_succp; + } + + switch (lex->sql_command) + { + case SQLCOM_DROP_TABLE: + succp = !(lex->create_info.options & HA_LEX_CREATE_TMP_TABLE); + break; + + case SQLCOM_ALTER_TABLE: + case SQLCOM_CREATE_TABLE: + /* If CREATE TABLE of non-temporary table, do implicit commit */ + succp = !(lex->create_info.options & HA_LEX_CREATE_TMP_TABLE); + break; + + case SQLCOM_SET_OPTION: + if ((*autocommit_stmt = is_autocommit_stmt(lex)) == 1) { - succp = false; - goto return_succp; + succp = true; } - - switch (lex->sql_command) { - case SQLCOM_DROP_TABLE: - succp = !(lex->create_info.options & HA_LEX_CREATE_TMP_TABLE); - break; - case SQLCOM_ALTER_TABLE: - case SQLCOM_CREATE_TABLE: - /* If CREATE TABLE of non-temporary table, do implicit commit */ - succp = !(lex->create_info.options & HA_LEX_CREATE_TMP_TABLE); - break; - case SQLCOM_SET_OPTION: - if ((*autocommit_stmt = is_autocommit_stmt(lex)) == 1) - { - succp = true; - } - else - { - succp = false; - } - break; - default: - succp = true; - break; + else + { + succp = false; } -return_succp: - return succp; -} + break; + + default: + succp = true; + break; + } + +return_succp: + return succp; +} /** * Finds out if stmt is SET autocommit * and if the new value matches with the enable_cmd argument. - * - * @param lex parse tree - * + * + * @param lex parse tree + * * @return 1, 0, or -1 if command was: * enable, disable, or not autocommit, respectively. */ -static int is_autocommit_stmt( - LEX* lex) +static int is_autocommit_stmt(LEX* lex) { - struct list_node* node; - set_var* setvar; - int rc = -1; - static char target[8]; /*< for converted string */ - Item* item = NULL; - - node = lex->var_list.first_node(); - setvar=(set_var*)node->info; - - if (setvar == NULL) + struct list_node* node; + set_var* setvar; + int rc = -1; + static char target[8]; /*< for converted string */ + Item* item = NULL; + + node = lex->var_list.first_node(); + setvar = (set_var*) node->info; + + if (setvar == NULL) + { + goto return_rc; + } + + do /*< Search for the last occurrence of 'autocommit' */ + { + if ((sys_var*) setvar->var == Sys_autocommit_ptr) { - goto return_rc; - } - - do /*< Search for the last occurrence of 'autocommit' */ - { - if ((sys_var*)setvar->var == Sys_autocommit_ptr) - { - item = setvar->value; - } - node = node->next; - } while ((setvar = (set_var*)node->info) != NULL); - - if (item != NULL) /*< found autocommit command */ - { - if (item->type() == Item::INT_ITEM) /*< '0' or '1' */ - { - rc = item->val_int(); - - if (rc > 1 || rc < 0) - { - rc = -1; - } - } - else if (item->type() == Item::STRING_ITEM) /*< 'on' or 'off' */ - { - String str(target, sizeof(target), system_charset_info); - String* res = item->val_str(&str); - - if ((rc = find_type(&bool_typelib, res->ptr(), res->length(), false))) - { - ss_dassert(rc >= 0 && rc <= 2); - /** - * rc is the position of matchin string in - * typelib's value array. - * 1=OFF, 2=ON. - */ - rc -= 1; - } - } + item = setvar->value; } + node = node->next; + } + while ((setvar = (set_var*) node->info) != NULL); + + if (item != NULL) /*< found autocommit command */ + { + if (item->type() == Item::INT_ITEM) /*< '0' or '1' */ + { + rc = item->val_int(); + + if (rc > 1 || rc < 0) + { + rc = -1; + } + } + else if (item->type() == Item::STRING_ITEM) /*< 'on' or 'off' */ + { + String str(target, sizeof (target), system_charset_info); + String* res = item->val_str(&str); + + if ((rc = find_type(&bool_typelib, res->ptr(), res->length(), false))) + { + ss_dassert(rc >= 0 && rc <= 2); + /** + * rc is the position of matchin string in + * typelib's value array. + * 1=OFF, 2=ON. + */ + rc -= 1; + } + } + } + return_rc: - return rc; + return rc; } #if defined(NOT_USED) -char* skygw_query_classifier_get_stmtname( - GWBUF* buf) + +char* +skygw_query_classifier_get_stmtname( + GWBUF* buf) { - MYSQL* mysql; - - if (buf == NULL || - buf->gwbuf_bufobj == NULL || - buf->gwbuf_bufobj->bo_data == NULL || - (mysql = (MYSQL *)((parsing_info_t *)buf->gwbuf_bufobj->bo_data)->pi_handle) == NULL || - mysql->thd == NULL || - (THD *)(mysql->thd))->lex == NULL || - (THD *)(mysql->thd))->lex->prepared_stmt_name == NULL) - { - return NULL; - } - return ((THD *)(mysql->thd))->lex->prepared_stmt_name.str; + MYSQL* mysql; + + if (buf == NULL || + buf->gwbuf_bufobj == NULL || + buf->gwbuf_bufobj->bo_data == NULL || + (mysql = (MYSQL *) ((parsing_info_t *) buf->gwbuf_bufobj->bo_data)->pi_handle) == NULL || + mysql->thd == NULL || + (THD *) (mysql->thd))->lex == NULL || + (THD *) (mysql->thd))->lex->prepared_stmt_name == NULL) + { + return NULL; + } + + return ((THD *) (mysql->thd))->lex->prepared_stmt_name.str; } #endif /** * Get the parse tree from parsed querybuf. - * @param querybuf The parsed GWBUF - * + * @param querybuf The parsed GWBUF + * * @return Pointer to the LEX struct or NULL if an error occurred or the query * was not parsed */ LEX* get_lex(GWBUF* querybuf) { - parsing_info_t* pi; - MYSQL* mysql; - THD* thd; - - if (querybuf == NULL || !GWBUF_IS_PARSED(querybuf)) - { - return NULL; - } - pi = (parsing_info_t *)gwbuf_get_buffer_object_data(querybuf, - GWBUF_PARSING_INFO); + parsing_info_t* pi; + MYSQL* mysql; + THD* thd; - if (pi == NULL) - { - return NULL; - } - - if ((mysql = (MYSQL *)pi->pi_handle) == NULL || - (thd = (THD *)mysql->thd) == NULL) - { - ss_dassert(mysql != NULL && thd != NULL); - return NULL; - } - return thd->lex; + if (querybuf == NULL || !GWBUF_IS_PARSED(querybuf)) + { + return NULL; + } + + pi = (parsing_info_t *) gwbuf_get_buffer_object_data(querybuf, GWBUF_PARSING_INFO); + + if (pi == NULL) + { + return NULL; + } + + if ((mysql = (MYSQL *) pi->pi_handle) == NULL || + (thd = (THD *) mysql->thd) == NULL) + { + ss_dassert(mysql != NULL && thd != NULL); + return NULL; + } + + return thd->lex; } - - /** * Finds the head of the list of tables affected by the current select statement. * @param thd Pointer to a valid THD @@ -962,23 +997,20 @@ LEX* get_lex(GWBUF* querybuf) */ static void* skygw_get_affected_tables(void* lexptr) { - LEX* lex = (LEX*)lexptr; - - if(lex == NULL || - lex->current_select == NULL) - { - ss_dassert(lex != NULL && - lex->current_select != NULL); - return NULL; - } + LEX* lex = (LEX*) lexptr; - return (void*)lex->current_select->table_list.first; + if (lex == NULL || lex->current_select == NULL) + { + ss_dassert(lex != NULL && lex->current_select != NULL); + return NULL; + } + + return (void*) lex->current_select->table_list.first; } - /** * Reads the parsetree and lists all the affected tables and views in the query. - * In the case of an error, the size of the table is set to zero and no memory + * In the case of an error, the size of the table is set to zero and no memory * is allocated. The caller must free the allocated memory. * * @param querybuf GWBUF where the table names are extracted from @@ -987,85 +1019,89 @@ static void* skygw_get_affected_tables(void* lexptr) */ char** skygw_get_table_names(GWBUF* querybuf, int* tblsize, bool fullnames) { - LEX* lex; - TABLE_LIST* tbl; - int i = 0, - currtblsz = 0; - char **tables = NULL, - **tmp = NULL; + LEX* lex; + TABLE_LIST* tbl; + int i = 0, currtblsz = 0; + char **tables = NULL, **tmp = NULL; - if(querybuf == NULL || - tblsize == NULL || - (lex = get_lex(querybuf)) == NULL || - lex->current_select == NULL) - { - goto retblock; - } + if (querybuf == NULL || tblsize == NULL || + (lex = get_lex(querybuf)) == NULL || lex->current_select == NULL) + { + goto retblock; + } - lex->current_select = lex->all_selects_list; + lex->current_select = lex->all_selects_list; - while(lex->current_select) - { - tbl = (TABLE_LIST*)skygw_get_affected_tables(lex); + while (lex->current_select) + { + tbl = (TABLE_LIST*) skygw_get_affected_tables(lex); - while (tbl) - { - if (i >= currtblsz) - { - tmp = (char**)malloc(sizeof(char*)*(currtblsz*2+1)); + while (tbl) + { + if (i >= currtblsz) + { + tmp = (char**) malloc(sizeof (char*)*(currtblsz * 2 + 1)); + + if (tmp) + { + if (currtblsz > 0) + { + for (int x = 0; x < currtblsz; x++) + { + tmp[x] = tables[x]; + } + + free(tables); + } + + tables = tmp; + currtblsz = currtblsz * 2 + 1; + } + } + + if (tmp != NULL) + { + char *catnm = NULL; + + if (fullnames) + { + if (tbl->db && + strcmp(tbl->db, "skygw_virtual") != 0) + { + catnm = (char*) calloc(strlen(tbl->db) + + strlen(tbl->table_name) + + 2, + sizeof (char)); + strcpy(catnm, tbl->db); + strcat(catnm, "."); + strcat(catnm, tbl->table_name); + } + } + + if (catnm) + { + tables[i++] = catnm; + } + else + { + tables[i++] = strdup(tbl->table_name); + } + + tbl = tbl->next_local; + } + } /*< while (tbl) */ + + lex->current_select = lex->current_select->next_select_in_list(); + } /*< while(lex->current_select) */ - if(tmp) - { - if(currtblsz > 0) - { - int x; - for(x = 0; xdb && - strcmp(tbl->db,"skygw_virtual") != 0) - { - catnm = (char*)calloc(strlen(tbl->db) + - strlen(tbl->table_name) + - 2, - sizeof(char)); - strcpy(catnm,tbl->db); - strcat(catnm,"."); - strcat(catnm,tbl->table_name); - } - } - - if(catnm) - { - tables[i++] = catnm; - } - else - { - tables[i++] = strdup(tbl->table_name); - } - tbl=tbl->next_local; - } - } /*< while (tbl) */ - lex->current_select = lex->current_select->next_select_in_list(); - } /*< while(lex->current_select) */ retblock: - if(tblsize) - *tblsize = i; - return tables; + + if (tblsize) + { + *tblsize = i; + } + + return tables; } /** @@ -1075,72 +1111,76 @@ retblock: */ char* skygw_get_created_table_name(GWBUF* querybuf) { - LEX* lex; - - if(querybuf == NULL || (lex = get_lex(querybuf)) == NULL) - { - return NULL; - } + LEX* lex; - if (lex->create_last_non_select_table && - lex->create_last_non_select_table->table_name) - { - char* name = strdup(lex->create_last_non_select_table->table_name); - return name; - } - else - { - return NULL; - } + if (querybuf == NULL || (lex = get_lex(querybuf)) == NULL) + { + return NULL; + } + + if (lex->create_last_non_select_table && + lex->create_last_non_select_table->table_name) + { + char* name = strdup(lex->create_last_non_select_table->table_name); + return name; + } + else + { + return NULL; + } } /** - * Checks whether the query is a "real" query ie. SELECT,UPDATE,INSERT,DELETE or - * any variation of these. Queries that affect the underlying database are not - * considered as real queries and the queries that target specific row or + * Checks whether the query is a "real" query ie. SELECT,UPDATE,INSERT,DELETE or + * any variation of these. Queries that affect the underlying database are not + * considered as real queries and the queries that target specific row or * variable data are regarded as the real queries. - * + * * @param GWBUF to analyze - * + * * @return true if the query is a real query, otherwise false */ bool skygw_is_real_query(GWBUF* querybuf) { - bool succp; - LEX* lex; - - if (querybuf == NULL || - (lex = get_lex(querybuf)) == NULL) - { - succp = false; - goto retblock; - } - switch(lex->sql_command) { - case SQLCOM_SELECT: - succp = lex->all_selects_list->table_list.elements > 0; - goto retblock; - break; - case SQLCOM_UPDATE: - case SQLCOM_INSERT: - case SQLCOM_INSERT_SELECT: - case SQLCOM_DELETE: - case SQLCOM_TRUNCATE: - case SQLCOM_REPLACE: - case SQLCOM_REPLACE_SELECT: - case SQLCOM_PREPARE: - case SQLCOM_EXECUTE: - succp = true; - goto retblock; - break; - default: - succp = false; - goto retblock; - break; - } -retblock: - return succp; -} + bool succp; + LEX* lex; + if (querybuf == NULL || + (lex = get_lex(querybuf)) == NULL) + { + succp = false; + goto retblock; + } + + switch (lex->sql_command) + { + case SQLCOM_SELECT: + succp = lex->all_selects_list->table_list.elements > 0; + goto retblock; + break; + + case SQLCOM_UPDATE: + case SQLCOM_INSERT: + case SQLCOM_INSERT_SELECT: + case SQLCOM_DELETE: + case SQLCOM_TRUNCATE: + case SQLCOM_REPLACE: + case SQLCOM_REPLACE_SELECT: + case SQLCOM_PREPARE: + case SQLCOM_EXECUTE: + succp = true; + goto retblock; + break; + + default: + succp = false; + goto retblock; + break; + } + +retblock: + return succp; +} /** * Checks whether the buffer contains a DROP TABLE... query. @@ -1149,42 +1189,50 @@ retblock: */ bool is_drop_table_query(GWBUF* querybuf) { - LEX* lex; - - return (querybuf != NULL && - (lex = get_lex(querybuf)) != NULL && - lex->sql_command == SQLCOM_DROP_TABLE); + LEX* lex; + + return (querybuf != NULL && + (lex = get_lex(querybuf)) != NULL && + lex->sql_command == SQLCOM_DROP_TABLE); } - - inline void add_str(char** buf, int* buflen, int* bufsize, char* str) { - int isize = strlen(str) + 1; - if(*buf == NULL || isize + *buflen >= *bufsize) - { - *bufsize = (*bufsize) * 2 + isize; - char *tmp = (char*)realloc(*buf,(*bufsize)* sizeof(char)); - if(tmp == NULL){ - MXS_ERROR("Error: memory reallocation failed."); - free(*buf); - *buf = NULL; - *bufsize = 0; - } - *buf = tmp; - } + int isize = strlen(str) + 1; + + if (*buf == NULL || isize + *buflen >= *bufsize) + { + *bufsize = (*bufsize) * 2 + isize; + char *tmp = (char*) realloc(*buf, (*bufsize) * sizeof (char)); + + if (tmp == NULL) + { + MXS_ERROR("Error: memory reallocation failed."); + free(*buf); + *buf = NULL; + *bufsize = 0; + } + + *buf = tmp; + } + + if (*buflen > 0) + { + if (*buf) + { + strcat(*buf, " "); + } + } + + if (*buf) + { + strcat(*buf, str); + } + + *buflen += isize; - if(*buflen > 0){ - if(*buf) - strcat(*buf," "); - } - if(*buf) - strcat(*buf,str); - *buflen += isize; - } - /** * Returns all the fields that the query affects. * @param buf Buffer to parse @@ -1192,413 +1240,444 @@ inline void add_str(char** buf, int* buflen, int* bufsize, char* str) */ char* skygw_get_affected_fields(GWBUF* buf) { - LEX* lex; - int buffsz = 0,bufflen = 0; - char* where = NULL; - Item* item; - Item::Type itype; + LEX* lex; + int buffsz = 0, bufflen = 0; + char* where = NULL; + Item* item; + Item::Type itype; - if(!query_is_parsed(buf)){ - parse_query(buf); - } + if (!query_is_parsed(buf)) + { + parse_query(buf); + } - if((lex = get_lex(buf)) == NULL){ - return NULL; - } - - lex->current_select = lex->all_selects_list; - if((where = (char*)malloc(sizeof(char)*1)) == NULL) + if ((lex = get_lex(buf)) == NULL) + { + return NULL; + } + + lex->current_select = lex->all_selects_list; + + if ((where = (char*) malloc(sizeof (char)*1)) == NULL) + { + MXS_ERROR("Memory allocation failed."); + return NULL; + } + + *where = '\0'; + + while (lex->current_select) + { + + List_iterator ilist(lex->current_select->item_list); + item = (Item*) ilist.next(); + + for (; item != NULL; item = (Item*) ilist.next()) { - MXS_ERROR("Memory allocation failed."); - return NULL; + + itype = item->type(); + + if (item->name && itype == Item::FIELD_ITEM) + { + add_str(&where, &buffsz, &bufflen, item->name); + } } - *where = '\0'; - while(lex->current_select) - { - - List_iterator ilist(lex->current_select->item_list); - item = (Item*)ilist.next(); - for (; item != NULL; item=(Item*)ilist.next()) - { - itype = item->type(); - if(item->name && itype == Item::FIELD_ITEM){ - add_str(&where,&buffsz,&bufflen,item->name); - } - } - - if(lex->current_select->where){ - for (item=lex->current_select->where; item != NULL; item=item->next) - { + if (lex->current_select->where) + { + for (item = lex->current_select->where; item != NULL; item = item->next) + { - itype = item->type(); - if(item->name && itype == Item::FIELD_ITEM){ - add_str(&where,&buffsz,&bufflen,item->name); - } - } - } + itype = item->type(); - if(lex->current_select->having){ - for (item=lex->current_select->having; item != NULL; item=item->next) - { - - itype = item->type(); - if(item->name && itype == Item::FIELD_ITEM){ - add_str(&where,&buffsz,&bufflen,item->name); - } - } - } + if (item->name && itype == Item::FIELD_ITEM) + { + add_str(&where, &buffsz, &bufflen, item->name); + } + } + } - lex->current_select = lex->current_select->next_select_in_list(); - } - return where; + if (lex->current_select->having) + { + for (item = lex->current_select->having; item != NULL; item = item->next) + { + + itype = item->type(); + + if (item->name && itype == Item::FIELD_ITEM) + { + add_str(&where, &buffsz, &bufflen, item->name); + } + } + } + + lex->current_select = lex->current_select->next_select_in_list(); + } + + return where; } bool skygw_query_has_clause(GWBUF* buf) { - LEX* lex; - SELECT_LEX* current; - bool clause = false; - - if(!query_is_parsed(buf)){ - parse_query(buf); - } + LEX* lex; + SELECT_LEX* current; + bool clause = false; + + if (!query_is_parsed(buf)) + { + parse_query(buf); + } + + if ((lex = get_lex(buf)) == NULL) + { + return false; + } - if((lex = get_lex(buf)) == NULL){ - return false; - } - current = lex->all_selects_list; - - while(current) - { - if(current->where || current->having){ - clause = true; - } - - current = current->next_select_in_list(); - } - return clause; + + while (current) + { + if (current->where || current->having) + { + clause = true; + } + + current = current->next_select_in_list(); + } + + return clause; } /* * Replace user-provided literals with question marks. Return a copy of the * querystr with replacements. - * + * * @param querybuf GWBUF buffer including necessary parsing info - * + * * @return Copy of querystr where literals are replaces with question marks or * NULL if querystr is NULL, thread context or lex are NULL or if replacement * function fails. - * + * * Replaced literal types are STRING_ITEM,INT_ITEM,DECIMAL_ITEM,REAL_ITEM, * VARBIN_ITEM,NULL_ITEM */ -char* skygw_get_canonical( - GWBUF* querybuf) +char* skygw_get_canonical(GWBUF* querybuf) { - parsing_info_t* pi; - MYSQL* mysql; - THD* thd; - LEX* lex; - Item* item; - char* querystr; - - if (querybuf == NULL || - !GWBUF_IS_PARSED(querybuf)) - { - querystr = NULL; - goto retblock; - } - pi = (parsing_info_t *)gwbuf_get_buffer_object_data(querybuf, - GWBUF_PARSING_INFO); - CHK_PARSING_INFO(pi); - - if (pi == NULL) - { - querystr = NULL; - goto retblock; - } - - if (pi->pi_query_plain_str == NULL || - (mysql = (MYSQL *)pi->pi_handle) == NULL || - (thd = (THD *)mysql->thd) == NULL || - (lex = thd->lex) == NULL) - { - ss_dassert(pi->pi_query_plain_str != NULL && - mysql != NULL && - thd != NULL && - lex != NULL); - querystr = NULL; - goto retblock; - } - querystr = strdup(pi->pi_query_plain_str); + parsing_info_t* pi; + MYSQL* mysql; + THD* thd; + LEX* lex; + Item* item; + char* querystr; - for (item=thd->free_list; item != NULL; item=item->next) - { - Item::Type itype; - - if (item->name == NULL) - { - continue; - } - itype = item->type(); + if (querybuf == NULL || + !GWBUF_IS_PARSED(querybuf)) + { + querystr = NULL; + goto retblock; + } - if (itype == Item::STRING_ITEM) - { - String tokenstr; - String* res = item->val_str_ascii(&tokenstr); - - if (res->is_empty()) /*< empty string */ - { - querystr = replace_literal(querystr, "\"\"", "\"?\""); - } - else - { - querystr = replace_literal(querystr, res->ptr(), "?"); - } - } - else if (itype == Item::INT_ITEM || - itype == Item::DECIMAL_ITEM || - itype == Item::REAL_ITEM || - itype == Item::VARBIN_ITEM || - itype == Item::NULL_ITEM) - { - querystr = replace_literal(querystr, item->name, "?"); - } - } /*< for */ + pi = (parsing_info_t *) gwbuf_get_buffer_object_data(querybuf, + GWBUF_PARSING_INFO); + CHK_PARSING_INFO(pi); + + if (pi == NULL) + { + querystr = NULL; + goto retblock; + } + + if (pi->pi_query_plain_str == NULL || + (mysql = (MYSQL *) pi->pi_handle) == NULL || + (thd = (THD *) mysql->thd) == NULL || + (lex = thd->lex) == NULL) + { + ss_dassert(pi->pi_query_plain_str != NULL && + mysql != NULL && + thd != NULL && + lex != NULL); + querystr = NULL; + goto retblock; + } + + querystr = strdup(pi->pi_query_plain_str); + + for (item = thd->free_list; item != NULL; item = item->next) + { + Item::Type itype; + + if (item->name == NULL) + { + continue; + } + + itype = item->type(); + + if (itype == Item::STRING_ITEM) + { + String tokenstr; + String* res = item->val_str_ascii(&tokenstr); + + if (res->is_empty()) /*< empty string */ + { + querystr = replace_literal(querystr, "\"\"", "\"?\""); + } + else + { + querystr = replace_literal(querystr, res->ptr(), "?"); + } + } + else if (itype == Item::INT_ITEM || + itype == Item::DECIMAL_ITEM || + itype == Item::REAL_ITEM || + itype == Item::VARBIN_ITEM || + itype == Item::NULL_ITEM) + { + querystr = replace_literal(querystr, item->name, "?"); + } + } /*< for */ /** Check for SET ... options with no Item classes */ if (thd->free_list == NULL) { char *replaced = replace_quoted(querystr); + if (replaced) { free(querystr); querystr = replaced; } } + retblock: - return querystr; + return querystr; } - /** - * Create parsing information; initialize mysql handle, allocate parsing info + * Create parsing information; initialize mysql handle, allocate parsing info * struct and set handle and free function pointer to it. - * + * * @param donefun pointer to free function - * + * * @return pointer to parsing information */ -parsing_info_t* parsing_info_init( - void (*donefun)(void *)) +parsing_info_t* parsing_info_init(void (*donefun)(void *)) { - parsing_info_t* pi = NULL; - MYSQL* mysql; - const char* user = "skygw"; - const char* db = "skygw"; - - ss_dassert(donefun != NULL); - - /** Get server handle */ - mysql = mysql_init(NULL); - ss_dassert(mysql != NULL); - - if (mysql == NULL) { - MXS_ERROR("Call to mysql_real_connect failed due %d, %s.", - mysql_errno(mysql), - mysql_error(mysql)); - - goto retblock; - } - /** Set methods and authentication to mysql */ - mysql_options(mysql, MYSQL_READ_DEFAULT_GROUP, "libmysqld_skygw"); - mysql_options(mysql, MYSQL_OPT_USE_EMBEDDED_CONNECTION, NULL); - mysql->methods = &embedded_methods; - mysql->user = my_strdup(user, MYF(0)); - mysql->db = my_strdup(db, MYF(0)); - mysql->passwd = NULL; - - pi = (parsing_info_t*)calloc(1, sizeof(parsing_info_t)); - - if (pi == NULL) - { - mysql_close(mysql); - goto retblock; - } + parsing_info_t* pi = NULL; + MYSQL* mysql; + const char* user = "skygw"; + const char* db = "skygw"; + + ss_dassert(donefun != NULL); + + /** Get server handle */ + mysql = mysql_init(NULL); + ss_dassert(mysql != NULL); + + if (mysql == NULL) + { + MXS_ERROR("Call to mysql_real_connect failed due %d, %s.", + mysql_errno(mysql), + mysql_error(mysql)); + + goto retblock; + } + + /** Set methods and authentication to mysql */ + mysql_options(mysql, MYSQL_READ_DEFAULT_GROUP, "libmysqld_skygw"); + mysql_options(mysql, MYSQL_OPT_USE_EMBEDDED_CONNECTION, NULL); + mysql->methods = &embedded_methods; + mysql->user = my_strdup(user, MYF(0)); + mysql->db = my_strdup(db, MYF(0)); + mysql->passwd = NULL; + + pi = (parsing_info_t*) calloc(1, sizeof (parsing_info_t)); + + if (pi == NULL) + { + mysql_close(mysql); + goto retblock; + } + #if defined(SS_DEBUG) - pi->pi_chk_top = CHK_NUM_PINFO; - pi->pi_chk_tail = CHK_NUM_PINFO; + pi->pi_chk_top = CHK_NUM_PINFO; + pi->pi_chk_tail = CHK_NUM_PINFO; #endif - /** Set handle and free function to parsing info struct */ - pi->pi_handle = mysql; - pi->pi_done_fp = donefun; - + /** Set handle and free function to parsing info struct */ + pi->pi_handle = mysql; + pi->pi_done_fp = donefun; + retblock: - return pi; + return pi; } /** * Free function for parsing info. Called by gwbuf_free or in case initialization * of parsing information fails. - * + * * @param ptr Pointer to parsing information, cast required - * + * * @return void - * + * */ -void parsing_info_done( - void* ptr) +void parsing_info_done(void* ptr) { - parsing_info_t* pi; - THD* thd; + parsing_info_t* pi; + THD* thd; - if (ptr) - { - pi = (parsing_info_t *)ptr; - - if (pi->pi_handle != NULL) - { - MYSQL* mysql = (MYSQL *)pi->pi_handle; - - if (mysql->thd != NULL) - { - thd = (THD*)mysql->thd; - thd->end_statement (); - (*mysql->methods->free_embedded_thd)(mysql); - mysql->thd = NULL; - } - mysql_close(mysql); - } - /** Free plain text query string */ - if (pi->pi_query_plain_str != NULL) - { - free(pi->pi_query_plain_str); - } - free(pi); - } + if (ptr) + { + pi = (parsing_info_t *) ptr; + + if (pi->pi_handle != NULL) + { + MYSQL* mysql = (MYSQL *) pi->pi_handle; + + if (mysql->thd != NULL) + { + thd = (THD*) mysql->thd; + thd->end_statement(); + (*mysql->methods->free_embedded_thd)(mysql); + mysql->thd = NULL; + } + + mysql_close(mysql); + } + + /** Free plain text query string */ + if (pi->pi_query_plain_str != NULL) + { + free(pi->pi_query_plain_str); + } + + free(pi); + } } /** * Add plain text query string to parsing info. - * + * * @param ptr Pointer to parsing info struct, cast required * @param str String to be added - * + * * @return void */ -static void parsing_info_set_plain_str( - void* ptr, - char* str) +static void parsing_info_set_plain_str(void* ptr, char* str) { - parsing_info_t* pi = (parsing_info_t *)ptr; - CHK_PARSING_INFO(pi); - - pi->pi_query_plain_str = str; + parsing_info_t* pi = (parsing_info_t *) ptr; + CHK_PARSING_INFO(pi); + + pi->pi_query_plain_str = str; } /** * Generate a string of query type value. * Caller must free the memory of the resulting string. - * - * @param qtype Query type value, combination of values listed in - * query_classifier.h - * - * @return string representing the query type value + * + * @param qtype Query type value, combination of values listed in + * query_classifier.h + * + * @return string representing the query type value */ -char* skygw_get_qtype_str( - skygw_query_type_t qtype) +char* skygw_get_qtype_str(skygw_query_type_t qtype) { - int t1 = (int)qtype; - int t2 = 1; - skygw_query_type_t t = QUERY_TYPE_UNKNOWN; - char* qtype_str = NULL; + int t1 = (int) qtype; + int t2 = 1; + skygw_query_type_t t = QUERY_TYPE_UNKNOWN; + char* qtype_str = NULL; - /** - * Test values (bits) and clear matching bits from t1 one by one until - * t1 is completely cleared. - */ - while (t1 != 0) - { - if (t1&t2) - { - t = (skygw_query_type_t)t2; + /** + * Test values (bits) and clear matching bits from t1 one by one until + * t1 is completely cleared. + */ + while (t1 != 0) + { + if (t1 & t2) + { + t = (skygw_query_type_t) t2; - if (qtype_str == NULL) - { - qtype_str = strdup(STRQTYPE(t)); - } - else - { - size_t len = strlen(STRQTYPE(t)); - /** reallocate space for delimiter, new string and termination */ - qtype_str = (char *)realloc(qtype_str, strlen(qtype_str)+1+len+1); - snprintf(qtype_str+strlen(qtype_str), 1+len+1, "|%s", STRQTYPE(t)); - } - /** Remove found value from t1 */ - t1 &= ~t2; - } - t2 <<= 1; - } - return qtype_str; + if (qtype_str == NULL) + { + qtype_str = strdup(STRQTYPE(t)); + } + else + { + size_t len = strlen(STRQTYPE(t)); + /** reallocate space for delimiter, new string and termination */ + qtype_str = (char *) realloc(qtype_str, strlen(qtype_str) + 1 + len + 1); + snprintf(qtype_str + strlen(qtype_str), 1 + len + 1, "|%s", STRQTYPE(t)); + } + + /** Remove found value from t1 */ + t1 &= ~t2; + } + + t2 <<= 1; + } + + return qtype_str; } /** * Returns an array of strings of databases that this query uses. - * If the database isn't defined in the query, it is assumed that this query + * If the database isn't defined in the query, it is assumed that this query * only targets the current database. - * The value of @p size is set to the number of allocated strings. The caller is + * The value of @p size is set to the number of allocated strings. The caller is * responsible for freeing all the allocated memory. * @param querybuf GWBUF containing the query * @param size Size of the resulting array - * @return A new array of strings containing the database names or NULL if no + * @return A new array of strings containing the database names or NULL if no * databases were found. */ -char** skygw_get_database_names(GWBUF* querybuf,int* size) +char** skygw_get_database_names(GWBUF* querybuf, int* size) { - LEX* lex; - TABLE_LIST* tbl; - char **databases = NULL, **tmp = NULL; - int currsz = 0,i = 0; + LEX* lex; + TABLE_LIST* tbl; + char **databases = NULL, **tmp = NULL; + int currsz = 0, i = 0; - if( (lex = get_lex(querybuf)) == NULL) - { - goto retblock; - } + if ((lex = get_lex(querybuf)) == NULL) + { + goto retblock; + } - lex->current_select = lex->all_selects_list; - - while(lex->current_select) - { - tbl = lex->current_select->table_list.first; + lex->current_select = lex->all_selects_list; - while(tbl) - { - if(strcmp(tbl->db,"skygw_virtual") != 0) - { - if(i>= currsz) - { - tmp = (char**)realloc(databases, - sizeof(char*)*(currsz*2 + 1)); - if(tmp == NULL) - { - goto retblock; - } - databases = tmp; - currsz = currsz*2 + 1; - } - databases[i++] = strdup(tbl->db); - } - tbl=tbl->next_local; - } - lex->current_select = lex->current_select->next_select_in_list(); - } + while (lex->current_select) + { + tbl = lex->current_select->table_list.first; + + while (tbl) + { + if (strcmp(tbl->db, "skygw_virtual") != 0) + { + if (i >= currsz) + { + tmp = (char**) realloc(databases, + sizeof (char*)*(currsz * 2 + 1)); + + if (tmp == NULL) + { + goto retblock; + } + + databases = tmp; + currsz = currsz * 2 + 1; + } + + databases[i++] = strdup(tbl->db); + } + + tbl = tbl->next_local; + } + + lex->current_select = lex->current_select->next_select_in_list(); + } retblock: - *size = i; - return databases; + *size = i; + return databases; } skygw_query_op_t query_classifier_get_operation(GWBUF* querybuf) @@ -1608,53 +1687,69 @@ skygw_query_op_t query_classifier_get_operation(GWBUF* querybuf) parse_query(querybuf); } - LEX* lex = get_lex(querybuf); - skygw_query_op_t operation = QUERY_OP_UNDEFINED; - if(lex){ - switch(lex->sql_command){ - case SQLCOM_SELECT: - operation = QUERY_OP_SELECT; - break; - case SQLCOM_CREATE_TABLE: - operation = QUERY_OP_CREATE_TABLE; - break; - case SQLCOM_CREATE_INDEX: - operation = QUERY_OP_CREATE_INDEX; - break; - case SQLCOM_ALTER_TABLE: - operation = QUERY_OP_ALTER_TABLE; - break; - case SQLCOM_UPDATE: - operation = QUERY_OP_UPDATE; - break; - case SQLCOM_INSERT: - operation = QUERY_OP_INSERT; - break; - case SQLCOM_INSERT_SELECT: - operation = QUERY_OP_INSERT_SELECT; - break; - case SQLCOM_DELETE: - operation = QUERY_OP_DELETE; - break; - case SQLCOM_TRUNCATE: - operation = QUERY_OP_TRUNCATE; - break; - case SQLCOM_DROP_TABLE: - operation = QUERY_OP_DROP_TABLE; - break; - case SQLCOM_DROP_INDEX: - operation = QUERY_OP_DROP_INDEX; - break; - case SQLCOM_CHANGE_DB: - operation = QUERY_OP_CHANGE_DB; - break; - case SQLCOM_LOAD: - operation = QUERY_OP_LOAD; - break; + LEX* lex = get_lex(querybuf); + skygw_query_op_t operation = QUERY_OP_UNDEFINED; - default: - operation = QUERY_OP_UNDEFINED; - } - } - return operation; + if (lex) + { + switch (lex->sql_command) + { + case SQLCOM_SELECT: + operation = QUERY_OP_SELECT; + break; + + case SQLCOM_CREATE_TABLE: + operation = QUERY_OP_CREATE_TABLE; + break; + + case SQLCOM_CREATE_INDEX: + operation = QUERY_OP_CREATE_INDEX; + break; + + case SQLCOM_ALTER_TABLE: + operation = QUERY_OP_ALTER_TABLE; + break; + + case SQLCOM_UPDATE: + operation = QUERY_OP_UPDATE; + break; + + case SQLCOM_INSERT: + operation = QUERY_OP_INSERT; + break; + + case SQLCOM_INSERT_SELECT: + operation = QUERY_OP_INSERT_SELECT; + break; + + case SQLCOM_DELETE: + operation = QUERY_OP_DELETE; + break; + + case SQLCOM_TRUNCATE: + operation = QUERY_OP_TRUNCATE; + break; + + case SQLCOM_DROP_TABLE: + operation = QUERY_OP_DROP_TABLE; + break; + + case SQLCOM_DROP_INDEX: + operation = QUERY_OP_DROP_INDEX; + break; + + case SQLCOM_CHANGE_DB: + operation = QUERY_OP_CHANGE_DB; + break; + + case SQLCOM_LOAD: + operation = QUERY_OP_LOAD; + break; + + default: + operation = QUERY_OP_UNDEFINED; + } + } + + return operation; } diff --git a/query_classifier/query_classifier.h b/query_classifier/query_classifier.h index e49022647..11822fdfb 100644 --- a/query_classifier/query_classifier.h +++ b/query_classifier/query_classifier.h @@ -17,7 +17,7 @@ Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. Copyright MariaDB Corporation Ab -*/ + */ /** getpid */ #include @@ -33,90 +33,96 @@ EXTERN_C_BLOCK_BEGIN * The meaninful difference is where operation is done and whether master data * is modified */ -typedef enum { - QUERY_TYPE_UNKNOWN = 0x000000, /*< Initial value, can't be tested bitwisely */ - QUERY_TYPE_LOCAL_READ = 0x000001, /*< Read non-database data, execute in MaxScale:any */ - QUERY_TYPE_READ = 0x000002, /*< Read database data:any */ - QUERY_TYPE_WRITE = 0x000004, /*< Master data will be modified:master */ - QUERY_TYPE_MASTER_READ = 0x000008, /*< Read from the master:master */ - QUERY_TYPE_SESSION_WRITE = 0x000010, /*< Session data will be modified:master or all */ +typedef enum +{ + QUERY_TYPE_UNKNOWN = 0x000000, /*< Initial value, can't be tested bitwisely */ + QUERY_TYPE_LOCAL_READ = 0x000001, /*< Read non-database data, execute in MaxScale:any */ + QUERY_TYPE_READ = 0x000002, /*< Read database data:any */ + QUERY_TYPE_WRITE = 0x000004, /*< Master data will be modified:master */ + QUERY_TYPE_MASTER_READ = 0x000008, /*< Read from the master:master */ + QUERY_TYPE_SESSION_WRITE = 0x000010, /*< Session data will be modified:master or all */ /** Not implemented yet */ -// QUERY_TYPE_USERVAR_WRITE = 0x000020, /*< Write a user variable:master or all */ - QUERY_TYPE_USERVAR_READ = 0x000040, /*< Read a user variable:master or any */ - QUERY_TYPE_SYSVAR_READ = 0x000080, /*< Read a system variable:master or any */ + // QUERY_TYPE_USERVAR_WRITE = 0x000020, /*< Write a user variable:master or all */ + QUERY_TYPE_USERVAR_READ = 0x000040, /*< Read a user variable:master or any */ + QUERY_TYPE_SYSVAR_READ = 0x000080, /*< Read a system variable:master or any */ /** Not implemented yet */ -// QUERY_TYPE_SYSVAR_WRITE = 0x000100, /*< Write a system variable:master or all */ - QUERY_TYPE_GSYSVAR_READ = 0x000200, /*< Read global system variable:master or any */ - QUERY_TYPE_GSYSVAR_WRITE = 0x000400, /*< Write global system variable:master or all */ - QUERY_TYPE_BEGIN_TRX = 0x000800, /*< BEGIN or START TRANSACTION */ - QUERY_TYPE_ENABLE_AUTOCOMMIT = 0x001000, /*< SET autocommit=1 */ - QUERY_TYPE_DISABLE_AUTOCOMMIT = 0x002000, /*< SET autocommit=0 */ - QUERY_TYPE_ROLLBACK = 0x004000, /*< ROLLBACK */ - QUERY_TYPE_COMMIT = 0x008000, /*< COMMIT */ - QUERY_TYPE_PREPARE_NAMED_STMT = 0x010000, /*< Prepared stmt with name from user:all */ - QUERY_TYPE_PREPARE_STMT = 0x020000, /*< Prepared stmt with id provided by server:all */ - QUERY_TYPE_EXEC_STMT = 0x040000, /*< Execute prepared statement:master or any */ - QUERY_TYPE_CREATE_TMP_TABLE = 0x080000, /*< Create temporary table:master (could be all) */ - QUERY_TYPE_READ_TMP_TABLE = 0x100000, /*< Read temporary table:master (could be any) */ - QUERY_TYPE_SHOW_DATABASES = 0x200000, /*< Show list of databases */ - QUERY_TYPE_SHOW_TABLES = 0x400000 /*< Show list of tables */ + // QUERY_TYPE_SYSVAR_WRITE = 0x000100, /*< Write a system variable:master or all */ + QUERY_TYPE_GSYSVAR_READ = 0x000200, /*< Read global system variable:master or any */ + QUERY_TYPE_GSYSVAR_WRITE = 0x000400, /*< Write global system variable:master or all */ + QUERY_TYPE_BEGIN_TRX = 0x000800, /*< BEGIN or START TRANSACTION */ + QUERY_TYPE_ENABLE_AUTOCOMMIT = 0x001000, /*< SET autocommit=1 */ + QUERY_TYPE_DISABLE_AUTOCOMMIT = 0x002000, /*< SET autocommit=0 */ + QUERY_TYPE_ROLLBACK = 0x004000, /*< ROLLBACK */ + QUERY_TYPE_COMMIT = 0x008000, /*< COMMIT */ + QUERY_TYPE_PREPARE_NAMED_STMT = 0x010000, /*< Prepared stmt with name from user:all */ + QUERY_TYPE_PREPARE_STMT = 0x020000, /*< Prepared stmt with id provided by server:all */ + QUERY_TYPE_EXEC_STMT = 0x040000, /*< Execute prepared statement:master or any */ + QUERY_TYPE_CREATE_TMP_TABLE = 0x080000, /*< Create temporary table:master (could be all) */ + QUERY_TYPE_READ_TMP_TABLE = 0x100000, /*< Read temporary table:master (could be any) */ + QUERY_TYPE_SHOW_DATABASES = 0x200000, /*< Show list of databases */ + QUERY_TYPE_SHOW_TABLES = 0x400000 /*< Show list of tables */ } skygw_query_type_t; -typedef enum { - QUERY_OP_UNDEFINED = 0, - QUERY_OP_SELECT = 1, - QUERY_OP_UPDATE = (1 << 1), - QUERY_OP_INSERT = (1 << 2), - QUERY_OP_DELETE = (1 << 3), - QUERY_OP_INSERT_SELECT = (1 << 4), - QUERY_OP_TRUNCATE = (1 << 5), - QUERY_OP_ALTER_TABLE = (1 << 6), - QUERY_OP_CREATE_TABLE = (1 << 7), - QUERY_OP_CREATE_INDEX = (1 << 8), - QUERY_OP_DROP_TABLE = (1 << 9), - QUERY_OP_DROP_INDEX = (1 << 10), - QUERY_OP_CHANGE_DB = (1 << 11), - QUERY_OP_LOAD = (1 << 12) -}skygw_query_op_t; +typedef enum +{ + QUERY_OP_UNDEFINED = 0, + QUERY_OP_SELECT = 1, + QUERY_OP_UPDATE = (1 << 1), + QUERY_OP_INSERT = (1 << 2), + QUERY_OP_DELETE = (1 << 3), + QUERY_OP_INSERT_SELECT = (1 << 4), + QUERY_OP_TRUNCATE = (1 << 5), + QUERY_OP_ALTER_TABLE = (1 << 6), + QUERY_OP_CREATE_TABLE = (1 << 7), + QUERY_OP_CREATE_INDEX = (1 << 8), + QUERY_OP_DROP_TABLE = (1 << 9), + QUERY_OP_DROP_INDEX = (1 << 10), + QUERY_OP_CHANGE_DB = (1 << 11), + QUERY_OP_LOAD = (1 << 12) +} skygw_query_op_t; -typedef struct parsing_info_st { +typedef struct parsing_info_st +{ #if defined(SS_DEBUG) - skygw_chk_t pi_chk_top; + skygw_chk_t pi_chk_top; #endif - void* pi_handle; /*< parsing info object pointer */ - char* pi_query_plain_str; /*< query as plain string */ - void (*pi_done_fp)(void *); /*< clean-up function for parsing info */ + void* pi_handle; /*< parsing info object pointer */ + char* pi_query_plain_str; /*< query as plain string */ + void (*pi_done_fp)(void *); /*< clean-up function for parsing info */ #if defined(SS_DEBUG) - skygw_chk_t pi_chk_tail; + skygw_chk_t pi_chk_tail; #endif } parsing_info_t; #define QUERY_IS_TYPE(mask,type) ((mask & type) == type) -/** - * Create THD and use it for creating parse tree. Examine parse tree and +/** + * Create THD and use it for creating parse tree. Examine parse tree and * classify the query. */ skygw_query_type_t query_classifier_get_type(GWBUF* querybuf); skygw_query_op_t query_classifier_get_operation(GWBUF* querybuf); -/** Free THD context and close MYSQL */ + #if defined(NOT_USED) -char* skygw_query_classifier_get_stmtname(GWBUF* buf); +char* skygw_query_classifier_get_stmtname(GWBUF* buf); #endif -char* skygw_get_created_table_name(GWBUF* querybuf); -bool is_drop_table_query(GWBUF* querybuf); -bool skygw_is_real_query(GWBUF* querybuf); -char** skygw_get_table_names(GWBUF* querybuf, int* tblsize, bool fullnames); -char* skygw_get_canonical(GWBUF* querybuf); -bool parse_query (GWBUF* querybuf); + +char* skygw_get_created_table_name(GWBUF* querybuf); +bool is_drop_table_query(GWBUF* querybuf); +bool skygw_is_real_query(GWBUF* querybuf); +char** skygw_get_table_names(GWBUF* querybuf, int* tblsize, bool fullnames); +char* skygw_get_canonical(GWBUF* querybuf); +bool parse_query(GWBUF* querybuf); parsing_info_t* parsing_info_init(void (*donefun)(void *)); -void parsing_info_done(void* ptr); -bool query_is_parsed(GWBUF* buf); -bool skygw_query_has_clause(GWBUF* buf); -char* skygw_get_qtype_str(skygw_query_type_t qtype); -char* skygw_get_affected_fields(GWBUF* buf); -char** skygw_get_database_names(GWBUF* querybuf,int* size); + +/** Free THD context and close MYSQL */ +void parsing_info_done(void* ptr); +bool query_is_parsed(GWBUF* buf); +bool skygw_query_has_clause(GWBUF* buf); +char* skygw_get_qtype_str(skygw_query_type_t qtype); +char* skygw_get_affected_fields(GWBUF* buf); +char** skygw_get_database_names(GWBUF* querybuf, int* size); EXTERN_C_BLOCK_END diff --git a/server/core/gateway.c b/server/core/gateway.c index 57e63630d..d90b7d48f 100644 --- a/server/core/gateway.c +++ b/server/core/gateway.c @@ -210,7 +210,7 @@ static bool resolve_maxscale_conf_fname( char* cnf_file_arg); static char* check_dir_access(char* dirname, bool, bool); -static int set_user(); +static int set_user(const char* user); bool pid_file_exists(); void write_child_exit_code(int fd, int code); /** SSL multi-threading functions and structures */ @@ -968,7 +968,7 @@ static void usage(void) " (default: /etc/)\n" " -D, --datadir=PATH path to data directory, stored embedded mysql tables\n" " (default: /var/cache/maxscale)\n" - " -N, --language=PATH apth to errmsg.sys file\n" + " -N, --language=PATH path to errmsg.sys file\n" " (default: /var/lib/maxscale)\n" " -P, --piddir=PATH path to PID file directory\n" " (default: /var/run/maxscale)\n" @@ -2433,7 +2433,7 @@ static int cnf_preparser(void* data, const char* section, const char* name, cons return 1; } -static int set_user(char* user) +static int set_user(const char* user) { errno = 0; struct passwd *pwname; diff --git a/server/core/gwbitmask.c b/server/core/gwbitmask.c index 8287bb849..b0cea9758 100644 --- a/server/core/gwbitmask.c +++ b/server/core/gwbitmask.c @@ -109,13 +109,13 @@ bitmask_set(GWBITMASK *bitmask, int bit) unsigned char mask; spinlock_acquire(&bitmask->lock); - if (bit >= bitmask->length) + while (bit >= bitmask->length) { bitmask->bits = realloc(bitmask->bits, (bitmask->length + BIT_LENGTH_INC) / 8); memset(bitmask->bits + (bitmask->length / 8), 0, BIT_LENGTH_INC / 8); - bitmask->length += (BIT_LENGTH_INC / 8); + bitmask->length += BIT_LENGTH_INC; } ptr = bitmask->bits + (bit / 8); mask = 1 << (bit % 8); diff --git a/server/include/gwbitmask.h b/server/include/gwbitmask.h index 6b7b504f1..77c023bd6 100644 --- a/server/include/gwbitmask.h +++ b/server/include/gwbitmask.h @@ -33,8 +33,8 @@ */ /* Both these numbers MUST be exact multiples of 8 */ -#define BIT_LENGTH_INITIAL 32 /**< Initial number of bits in the bitmask */ -#define BIT_LENGTH_INC 32 /**< Number of bits to add on each increment */ +#define BIT_LENGTH_INITIAL 256 /**< Initial number of bits in the bitmask */ +#define BIT_LENGTH_INC 256 /**< Number of bits to add on each increment */ /** * The bitmask structure used to store an arbitrary large bitmask diff --git a/server/modules/include/readconnection.h b/server/modules/include/readconnection.h index 45eb2b51d..c8ff2f21d 100644 --- a/server/modules/include/readconnection.h +++ b/server/modules/include/readconnection.h @@ -37,52 +37,55 @@ * connections to. This provides the storage for routing module specific data * that is required for each of the backend servers. */ -typedef struct backend { - SERVER *server; /*< The server itself */ - int current_connection_count; /*< Number of connections to the server */ - int weight; /*< Desired routing weight */ +typedef struct backend +{ + SERVER *server; /*< The server itself */ + int current_connection_count; /*< Number of connections to the server */ + int weight; /*< Desired routing weight */ } BACKEND; /** * The client session structure used within this router. */ -typedef struct router_client_session { +typedef struct router_client_session +{ #if defined(SS_DEBUG) - skygw_chk_t rses_chk_top; + skygw_chk_t rses_chk_top; #endif - SPINLOCK rses_lock; /*< protects rses_deleted */ - int rses_versno; /*< even = no active update, else odd */ - bool rses_closed; /*< true when closeSession is called */ - BACKEND *backend; /*< Backend used by the client session */ - DCB *backend_dcb; /*< DCB Connection to the backend */ - struct router_client_session *next; - int rses_capabilities; /*< input type, for example */ + SPINLOCK rses_lock; /*< protects rses_deleted */ + int rses_versno; /*< even = no active update, else odd */ + bool rses_closed; /*< true when closeSession is called */ + BACKEND *backend; /*< Backend used by the client session */ + DCB *backend_dcb; /*< DCB Connection to the backend */ + struct router_client_session *next; + int rses_capabilities; /*< input type, for example */ #if defined(SS_DEBUG) - skygw_chk_t rses_chk_tail; + skygw_chk_t rses_chk_tail; #endif } ROUTER_CLIENT_SES; /** * The statistics for this router instance */ -typedef struct { - int n_sessions; /*< Number sessions created */ - int n_queries; /*< Number of queries forwarded */ +typedef struct +{ + int n_sessions; /*< Number sessions created */ + int n_queries; /*< Number of queries forwarded */ } ROUTER_STATS; - /** * The per instance data for the router. */ -typedef struct router_instance { - SERVICE *service; /*< Pointer to the service using this router */ - ROUTER_CLIENT_SES *connections; /*< Link list of all the client connections */ - SPINLOCK lock; /*< Spinlock for the instance data */ - BACKEND **servers; /*< List of backend servers */ - unsigned int bitmask; /*< Bitmask to apply to server->status */ - unsigned int bitvalue; /*< Required value of server->status */ - ROUTER_STATS stats; /*< Statistics for this router */ - struct router_instance - *next; +typedef struct router_instance +{ + SERVICE *service; /*< Pointer to the service using this router */ + ROUTER_CLIENT_SES *connections; /*< Link list of all the client connections */ + SPINLOCK lock; /*< Spinlock for the instance data */ + BACKEND **servers; /*< List of backend servers */ + unsigned int bitmask; /*< Bitmask to apply to server->status */ + unsigned int bitvalue; /*< Required value of server->status */ + ROUTER_STATS stats; /*< Statistics for this router */ + struct router_instance + *next; } ROUTER_INSTANCE; #endif diff --git a/server/modules/routing/binlog/blr.c b/server/modules/routing/binlog/blr.c index d69d8fca4..e005aefe5 100644 --- a/server/modules/routing/binlog/blr.c +++ b/server/modules/routing/binlog/blr.c @@ -284,7 +284,7 @@ char task_name[BLRM_TASK_NAME_LEN+1] = ""; inst->m_errno = 0; inst->m_errmsg = NULL; - inst->trx_safe = 1; + inst->trx_safe = 0; inst->pending_transaction = 0; inst->last_safe_pos = 0; diff --git a/server/modules/routing/readconnroute.c b/server/modules/routing/readconnroute.c index 299e422b2..890a9efe4 100644 --- a/server/modules/routing/readconnroute.c +++ b/server/modules/routing/readconnroute.c @@ -96,39 +96,33 @@ #include "modutil.h" -MODULE_INFO info = { - MODULE_API_ROUTER, - MODULE_GA, - ROUTER_VERSION, - "A connection based router to load balance based on connections" +MODULE_INFO info = +{ + MODULE_API_ROUTER, + MODULE_GA, + ROUTER_VERSION, + "A connection based router to load balance based on connections" }; static char *version_str = "V1.1.0"; /* The router entry points */ -static ROUTER *createInstance(SERVICE *service, char **options); -static void *newSession(ROUTER *instance, SESSION *session); -static void closeSession(ROUTER *instance, void *router_session); -static void freeSession(ROUTER *instance, void *router_session); -static int routeQuery(ROUTER *instance, void *router_session, GWBUF *queue); -static void diagnostics(ROUTER *instance, DCB *dcb); -static void clientReply( - ROUTER *instance, - void *router_session, - GWBUF *queue, - DCB *backend_dcb); -static void handleError( - ROUTER *instance, - void *router_session, - GWBUF *errbuf, - DCB *problem_dcb, - error_action_t action, - bool *succp); -static int getCapabilities (); +static ROUTER *createInstance(SERVICE *service, char **options); +static void *newSession(ROUTER *instance, SESSION *session); +static void closeSession(ROUTER *instance, void *router_session); +static void freeSession(ROUTER *instance, void *router_session); +static int routeQuery(ROUTER *instance, void *router_session, GWBUF *queue); +static void diagnostics(ROUTER *instance, DCB *dcb); +static void clientReply(ROUTER *instance, void *router_session, GWBUF *queue, + DCB *backend_dcb); +static void handleError(ROUTER *instance, void *router_session, GWBUF *errbuf, + DCB *problem_dcb, error_action_t action, bool *succp); +static int getCapabilities(); /** The module object definition */ -static ROUTER_OBJECT MyObject = { +static ROUTER_OBJECT MyObject = +{ createInstance, newSession, closeSession, @@ -140,17 +134,13 @@ static ROUTER_OBJECT MyObject = { getCapabilities }; -static bool rses_begin_locked_router_action( - ROUTER_CLIENT_SES* rses); +static bool rses_begin_locked_router_action(ROUTER_CLIENT_SES* rses); -static void rses_end_locked_router_action( - ROUTER_CLIENT_SES* rses); +static void rses_end_locked_router_action(ROUTER_CLIENT_SES* rses); -static BACKEND *get_root_master( - BACKEND **servers); -static int handle_state_switch( - DCB* dcb,DCB_REASON reason, void * routersession); -static SPINLOCK instlock; +static BACKEND *get_root_master(BACKEND **servers); +static int handle_state_switch(DCB* dcb, DCB_REASON reason, void * routersession); +static SPINLOCK instlock; static ROUTER_INSTANCE *instances; /** @@ -161,7 +151,7 @@ static ROUTER_INSTANCE *instances; char * version() { - return version_str; + return version_str; } /** @@ -187,66 +177,71 @@ ModuleInit() ROUTER_OBJECT * GetModuleObject() { - return &MyObject; + return &MyObject; } /** * Create an instance of the router for a particular service * within the gateway. - * + * * @param service The service this router is being create for * @param options An array of options for this query router * * @return The instance data for this new instance */ -static ROUTER * +static ROUTER * createInstance(SERVICE *service, char **options) { -ROUTER_INSTANCE *inst; -SERVER *server; -SERVER_REF *sref; -int i, n; -BACKEND *backend; -char *weightby; + ROUTER_INSTANCE *inst; + SERVER *server; + SERVER_REF *sref; + int i, n; + BACKEND *backend; + char *weightby; - if ((inst = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL) { - return NULL; + if ((inst = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL) + { + return NULL; + } + + inst->service = service; + spinlock_init(&inst->lock); + + /* + * We need an array of the backend servers in the instance structure so + * that we can maintain a count of the number of connections to each + * backend server. + */ + for (sref = service->dbref, n = 0; sref; sref = sref->next) + { + n++; + } + + inst->servers = (BACKEND **) calloc(n + 1, sizeof(BACKEND *)); + if (!inst->servers) + { + free(inst); + return NULL; + } + + for (sref = service->dbref, n = 0; sref; sref = sref->next) + { + if ((inst->servers[n] = malloc(sizeof(BACKEND))) == NULL) + { + for (i = 0; i < n; i++) + { + free(inst->servers[i]); + } + free(inst->servers); + free(inst); + return NULL; } - - inst->service = service; - spinlock_init(&inst->lock); - - /* - * We need an array of the backend servers in the instance structure so - * that we can maintain a count of the number of connections to each - * backend server. - */ - for (sref = service->dbref, n = 0; sref; sref = sref->next) - n++; - - inst->servers = (BACKEND **)calloc(n + 1, sizeof(BACKEND *)); - if (!inst->servers) - { - free(inst); - return NULL; - } - - for (sref = service->dbref, n = 0; sref; sref = sref->next) - { - if ((inst->servers[n] = malloc(sizeof(BACKEND))) == NULL) - { - for (i = 0; i < n; i++) - free(inst->servers[i]); - free(inst->servers); - free(inst); - return NULL; - } - inst->servers[n]->server = sref->server; - inst->servers[n]->current_connection_count = 0; - inst->servers[n]->weight = 1000; - n++; - } - inst->servers[n] = NULL; + inst->servers[n]->server = sref->server; + inst->servers[n]->current_connection_count = 0; + inst->servers[n]->weight = 1000; + n++; + } + inst->servers[n] = NULL; if ((weightby = serviceGetWeightingParameter(service)) != NULL) { @@ -314,67 +309,67 @@ char *weightby; } } - /* - * Process the options - */ - inst->bitmask = 0; - inst->bitvalue = 0; - if (options) - { - for (i = 0; options[i]; i++) - { - if (!strcasecmp(options[i], "master")) - { - inst->bitmask |= (SERVER_MASTER|SERVER_SLAVE); - inst->bitvalue |= SERVER_MASTER; - } - else if (!strcasecmp(options[i], "slave")) - { - inst->bitmask |= (SERVER_MASTER|SERVER_SLAVE); - inst->bitvalue |= SERVER_SLAVE; - } - else if (!strcasecmp(options[i], "running")) - { - inst->bitmask |= (SERVER_RUNNING); - inst->bitvalue |= SERVER_RUNNING; - } - else if (!strcasecmp(options[i], "synced")) - { - inst->bitmask |= (SERVER_JOINED); - inst->bitvalue |= SERVER_JOINED; - } - else if (!strcasecmp(options[i], "ndb")) - { - inst->bitmask |= (SERVER_NDB); - inst->bitvalue |= SERVER_NDB; - } - else - { - MXS_WARNING("Unsupported router " - "option \'%s\' for readconnroute. " - "Expected router options are " - "[slave|master|synced|ndb]", - options[i]); - } - } - } - if(inst->bitmask == 0 && inst->bitvalue == 0) - { - /** No parameters given, use RUNNING as a valid server */ - inst->bitmask |= (SERVER_RUNNING); - inst->bitvalue |= SERVER_RUNNING; - } - /* - * We have completed the creation of the instance data, so now - * insert this router instance into the linked list of routers - * that have been created with this module. - */ - spinlock_acquire(&instlock); - inst->next = instances; - instances = inst; - spinlock_release(&instlock); + /* + * Process the options + */ + inst->bitmask = 0; + inst->bitvalue = 0; + if (options) + { + for (i = 0; options[i]; i++) + { + if (!strcasecmp(options[i], "master")) + { + inst->bitmask |= (SERVER_MASTER | SERVER_SLAVE); + inst->bitvalue |= SERVER_MASTER; + } + else if (!strcasecmp(options[i], "slave")) + { + inst->bitmask |= (SERVER_MASTER | SERVER_SLAVE); + inst->bitvalue |= SERVER_SLAVE; + } + else if (!strcasecmp(options[i], "running")) + { + inst->bitmask |= (SERVER_RUNNING); + inst->bitvalue |= SERVER_RUNNING; + } + else if (!strcasecmp(options[i], "synced")) + { + inst->bitmask |= (SERVER_JOINED); + inst->bitvalue |= SERVER_JOINED; + } + else if (!strcasecmp(options[i], "ndb")) + { + inst->bitmask |= (SERVER_NDB); + inst->bitvalue |= SERVER_NDB; + } + else + { + MXS_WARNING("Unsupported router " + "option \'%s\' for readconnroute. " + "Expected router options are " + "[slave|master|synced|ndb]", + options[i]); + } + } + } + if (inst->bitmask == 0 && inst->bitvalue == 0) + { + /** No parameters given, use RUNNING as a valid server */ + inst->bitmask |= (SERVER_RUNNING); + inst->bitvalue |= SERVER_RUNNING; + } + /* + * We have completed the creation of the instance data, so now + * insert this router instance into the linked list of routers + * that have been created with this module. + */ + spinlock_acquire(&instlock); + inst->next = instances; + instances = inst; + spinlock_release(&instlock); - return (ROUTER *)inst; + return(ROUTER *) inst; } /** @@ -384,210 +379,228 @@ char *weightby; * @param session The session itself * @return Session specific data for this session */ -static void * +static void * newSession(ROUTER *instance, SESSION *session) { -ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *)instance; -ROUTER_CLIENT_SES *client_rses; -BACKEND *candidate = NULL; -int i; -BACKEND *master_host = NULL; + ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *) instance; + ROUTER_CLIENT_SES *client_rses; + BACKEND *candidate = NULL; + int i; + BACKEND *master_host = NULL; - MXS_DEBUG("%lu [newSession] new router session with session " - "%p, and inst %p.", - pthread_self(), - session, - inst); + MXS_DEBUG("%lu [newSession] new router session with session " + "%p, and inst %p.", + pthread_self(), + session, + inst); - client_rses = (ROUTER_CLIENT_SES *)calloc(1, sizeof(ROUTER_CLIENT_SES)); + client_rses = (ROUTER_CLIENT_SES *) calloc(1, sizeof(ROUTER_CLIENT_SES)); - if (client_rses == NULL) { - return NULL; - } + if (client_rses == NULL) + { + return NULL; + } #if defined(SS_DEBUG) - client_rses->rses_chk_top = CHK_NUM_ROUTER_SES; - client_rses->rses_chk_tail = CHK_NUM_ROUTER_SES; + client_rses->rses_chk_top = CHK_NUM_ROUTER_SES; + client_rses->rses_chk_tail = CHK_NUM_ROUTER_SES; #endif - /** - * Find the Master host from available servers - */ - master_host = get_root_master(inst->servers); + /** + * Find the Master host from available servers + */ + master_host = get_root_master(inst->servers); - /** - * Find a backend server to connect to. This is the extent of the - * load balancing algorithm we need to implement for this simple - * connection router. - */ + /** + * Find a backend server to connect to. This is the extent of the + * load balancing algorithm we need to implement for this simple + * connection router. + */ - /* - * Loop over all the servers and find any that have fewer connections - * than the candidate server. - * - * If a server has less connections than the current candidate we mark this - * as the new candidate to connect to. - * - * If a server has the same number of connections currently as the candidate - * and has had less connections over time than the candidate it will also - * become the new candidate. This has the effect of spreading the - * connections over different servers during periods of very low load. - */ - for (i = 0; inst->servers[i]; i++) { - if(inst->servers[i]) { - MXS_DEBUG("%lu [newSession] Examine server in port %d with " - "%d connections. Status is %s, " - "inst->bitvalue is %d", - pthread_self(), - inst->servers[i]->server->port, - inst->servers[i]->current_connection_count, - STRSRVSTATUS(inst->servers[i]->server), - inst->bitmask); - } + /* + * Loop over all the servers and find any that have fewer connections + * than the candidate server. + * + * If a server has less connections than the current candidate we mark this + * as the new candidate to connect to. + * + * If a server has the same number of connections currently as the candidate + * and has had less connections over time than the candidate it will also + * become the new candidate. This has the effect of spreading the + * connections over different servers during periods of very low load. + */ + for (i = 0; inst->servers[i]; i++) + { + if (inst->servers[i]) + { + MXS_DEBUG("%lu [newSession] Examine server in port %d with " + "%d connections. Status is %s, " + "inst->bitvalue is %d", + pthread_self(), + inst->servers[i]->server->port, + inst->servers[i]->current_connection_count, + STRSRVSTATUS(inst->servers[i]->server), + inst->bitmask); + } - if (SERVER_IN_MAINT(inst->servers[i]->server)) - continue; + if (SERVER_IN_MAINT(inst->servers[i]->server)) + { + continue; + } - if (inst->servers[i]->weight == 0) - continue; + if (inst->servers[i]->weight == 0) + { + continue; + } - /* Check server status bits against bitvalue from router_options */ - if (inst->servers[i] && - SERVER_IS_RUNNING(inst->servers[i]->server) && - (inst->servers[i]->server->status & inst->bitmask & inst->bitvalue)) + /* Check server status bits against bitvalue from router_options */ + if (inst->servers[i] && + SERVER_IS_RUNNING(inst->servers[i]->server) && + (inst->servers[i]->server->status & inst->bitmask & inst->bitvalue)) + { + if (master_host) + { + if (inst->servers[i] == master_host && (inst->bitvalue & SERVER_SLAVE)) { - if (master_host) { - if (inst->servers[i] == master_host && (inst->bitvalue & SERVER_SLAVE)) { - /* skip root Master here, as it could also be slave of an external server - * that is not in the configuration. - * Intermediate masters (Relay Servers) are also slave and will be selected - * as Slave(s) - */ + /* skip root Master here, as it could also be slave of an external server + * that is not in the configuration. + * Intermediate masters (Relay Servers) are also slave and will be selected + * as Slave(s) + */ - continue; - } - if (inst->servers[i] == master_host && (inst->bitvalue & SERVER_MASTER)) { - /* If option is "master" return only the root Master as there - * could be intermediate masters (Relay Servers) - * and they must not be selected. - */ + continue; + } + if (inst->servers[i] == master_host && (inst->bitvalue & SERVER_MASTER)) + { + /* If option is "master" return only the root Master as there + * could be intermediate masters (Relay Servers) + * and they must not be selected. + */ - candidate = master_host; - break; - } - } else { - /* master_host is NULL, no master server. - * If requested router_option is 'master' - * candidate wll be NULL. - */ - if (inst->bitvalue & SERVER_MASTER) { - candidate = NULL; - break; - } - } + candidate = master_host; + break; + } + } + else + { + /* master_host is NULL, no master server. + * If requested router_option is 'master' + * candidate wll be NULL. + */ + if (inst->bitvalue & SERVER_MASTER) + { + candidate = NULL; + break; + } + } - /* If no candidate set, set first running server as - our initial candidate server */ - if (candidate == NULL) - { - candidate = inst->servers[i]; - } - else if ((inst->servers[i]->current_connection_count - * 1000) / inst->servers[i]->weight < - (candidate->current_connection_count * - 1000) / candidate->weight) - { - /* This running server has fewer - connections, set it as a new candidate */ - candidate = inst->servers[i]; - } - else if ((inst->servers[i]->current_connection_count - * 1000) / inst->servers[i]->weight == - (candidate->current_connection_count * - 1000) / candidate->weight && - inst->servers[i]->server->stats.n_connections < - candidate->server->stats.n_connections) - { - /* This running server has the same number - of connections currently as the candidate - but has had fewer connections over time - than candidate, set this server to candidate*/ - candidate = inst->servers[i]; - } - } - } + /* If no candidate set, set first running server as + our initial candidate server */ + if (candidate == NULL) + { + candidate = inst->servers[i]; + } + else if (((inst->servers[i]->current_connection_count + 1) + * 1000) / inst->servers[i]->weight < + ((candidate->current_connection_count + 1) * + 1000) / candidate->weight) + { + /* This running server has fewer + connections, set it as a new candidate */ + candidate = inst->servers[i]; + } + else if (((inst->servers[i]->current_connection_count + 1) + * 1000) / inst->servers[i]->weight == + ((candidate->current_connection_count + 1) * + 1000) / candidate->weight && + inst->servers[i]->server->stats.n_connections < + candidate->server->stats.n_connections) + { + /* This running server has the same number + of connections currently as the candidate + but has had fewer connections over time + than candidate, set this server to candidate*/ + candidate = inst->servers[i]; + } + } + } - /* There is no candidate server here! - * With router_option=slave a master_host could be set, so route traffic there. - * Otherwise, just clean up and return NULL - */ - if (!candidate) { - if (master_host) { - candidate = master_host; - } else { - MXS_ERROR("Failed to create new routing session. " - "Couldn't find eligible candidate server. Freeing " - "allocated resources."); - free(client_rses); - return NULL; - } - } + /* There is no candidate server here! + * With router_option=slave a master_host could be set, so route traffic there. + * Otherwise, just clean up and return NULL + */ + if (!candidate) + { + if (master_host) + { + candidate = master_host; + } + else + { + MXS_ERROR("Failed to create new routing session. " + "Couldn't find eligible candidate server. Freeing " + "allocated resources."); + free(client_rses); + return NULL; + } + } - client_rses->rses_capabilities = RCAP_TYPE_PACKET_INPUT; - - /* - * We now have the server with the least connections. - * Bump the connection count for this server - */ - atomic_add(&candidate->current_connection_count, 1); - client_rses->backend = candidate; - MXS_DEBUG("%lu [newSession] Selected server in port %d. " - "Connections : %d\n", - pthread_self(), - candidate->server->port, - candidate->current_connection_count); + client_rses->rses_capabilities = RCAP_TYPE_PACKET_INPUT; - /* - * Open a backend connection, putting the DCB for this - * connection in the client_rses->backend_dcb - */ - client_rses->backend_dcb = dcb_connect(candidate->server, - session, - candidate->server->protocol); - if (client_rses->backend_dcb == NULL) - { - atomic_add(&candidate->current_connection_count, -1); - free(client_rses); - return NULL; - } - dcb_add_callback( - client_rses->backend_dcb, - DCB_REASON_NOT_RESPONDING, - &handle_state_switch, - client_rses); - inst->stats.n_sessions++; + /* + * We now have the server with the least connections. + * Bump the connection count for this server + */ + atomic_add(&candidate->current_connection_count, 1); + client_rses->backend = candidate; + MXS_DEBUG("%lu [newSession] Selected server in port %d. " + "Connections : %d\n", + pthread_self(), + candidate->server->port, + candidate->current_connection_count); - /** - * Add this session to the list of active sessions. - */ - spinlock_acquire(&inst->lock); - client_rses->next = inst->connections; - inst->connections = client_rses; - spinlock_release(&inst->lock); + /* + * Open a backend connection, putting the DCB for this + * connection in the client_rses->backend_dcb + */ + client_rses->backend_dcb = dcb_connect(candidate->server, + session, + candidate->server->protocol); + if (client_rses->backend_dcb == NULL) + { + atomic_add(&candidate->current_connection_count, -1); + free(client_rses); + return NULL; + } + dcb_add_callback( + client_rses->backend_dcb, + DCB_REASON_NOT_RESPONDING, + &handle_state_switch, + client_rses); + inst->stats.n_sessions++; - CHK_CLIENT_RSES(client_rses); + /** + * Add this session to the list of active sessions. + */ + spinlock_acquire(&inst->lock); + client_rses->next = inst->connections; + inst->connections = client_rses; + spinlock_release(&inst->lock); - MXS_INFO("Readconnroute: New session for server %s. " - "Connections : %d", - candidate->server->unique_name, - candidate->current_connection_count); - return (void *)client_rses; + CHK_CLIENT_RSES(client_rses); + + MXS_INFO("Readconnroute: New session for server %s. " + "Connections : %d", + candidate->server->unique_name, + candidate->current_connection_count); + + return(void *) client_rses; } /** * @node Unlink from backend server, unlink from router's connection list, - * and free memory of a router client session. + * and free memory of a router client session. * * Parameters: * @param router - @@ -598,51 +611,53 @@ BACKEND *master_host = NULL; * * @return void * - * + * * @details (write detailed description here) * */ -static void freeSession( - ROUTER* router_instance, - void* router_client_ses) +static void freeSession(ROUTER* router_instance, void* router_client_ses) { - ROUTER_INSTANCE* router = (ROUTER_INSTANCE *)router_instance; - ROUTER_CLIENT_SES* router_cli_ses = - (ROUTER_CLIENT_SES *)router_client_ses; - int prev_val; - - prev_val = atomic_add(&router_cli_ses->backend->current_connection_count, -1); - ss_dassert(prev_val > 0); - - spinlock_acquire(&router->lock); - - if (router->connections == router_cli_ses) { - router->connections = router_cli_ses->next; - } else { - ROUTER_CLIENT_SES *ptr = router->connections; - - while (ptr != NULL && ptr->next != router_cli_ses) { - ptr = ptr->next; - } - - if (ptr != NULL) { - ptr->next = router_cli_ses->next; - } - } - spinlock_release(&router->lock); + ROUTER_INSTANCE* router = (ROUTER_INSTANCE *) router_instance; + ROUTER_CLIENT_SES* router_cli_ses = + (ROUTER_CLIENT_SES *) router_client_ses; + int prev_val; - MXS_DEBUG("%lu [freeSession] Unlinked router_client_session %p from " - "router %p and from server on port %d. Connections : %d. ", - pthread_self(), - router_cli_ses, - router, - router_cli_ses->backend->server->port, - prev_val-1); + prev_val = atomic_add(&router_cli_ses->backend->current_connection_count, -1); + ss_dassert(prev_val > 0); - free(router_cli_ses); + spinlock_acquire(&router->lock); + + if (router->connections == router_cli_ses) + { + router->connections = router_cli_ses->next; + } + else + { + ROUTER_CLIENT_SES *ptr = router->connections; + + while (ptr != NULL && ptr->next != router_cli_ses) + { + ptr = ptr->next; + } + + if (ptr != NULL) + { + ptr->next = router_cli_ses->next; + } + } + spinlock_release(&router->lock); + + MXS_DEBUG("%lu [freeSession] Unlinked router_client_session %p from " + "router %p and from server on port %d. Connections : %d. ", + pthread_self(), + router_cli_ses, + router, + router_cli_ses->backend->server->port, + prev_val - 1); + + free(router_cli_ses); } - /** * Close a session with the router, this is the mechanism * by which a router may cleanup data structure etc. @@ -650,34 +665,35 @@ static void freeSession( * @param instance The router instance data * @param router_session The session being closed */ -static void +static void closeSession(ROUTER *instance, void *router_session) { -ROUTER_CLIENT_SES *router_cli_ses = (ROUTER_CLIENT_SES *)router_session; -DCB* backend_dcb; + ROUTER_CLIENT_SES *router_cli_ses = (ROUTER_CLIENT_SES *) router_session; + DCB* backend_dcb; + + CHK_CLIENT_RSES(router_cli_ses); + /** + * Lock router client session for secure read and update. + */ + if (rses_begin_locked_router_action(router_cli_ses)) + { + /* decrease server current connection counter */ + + backend_dcb = router_cli_ses->backend_dcb; + router_cli_ses->backend_dcb = NULL; + router_cli_ses->rses_closed = true; + /** Unlock */ + rses_end_locked_router_action(router_cli_ses); - CHK_CLIENT_RSES(router_cli_ses); /** - * Lock router client session for secure read and update. + * Close the backend server connection */ - if (rses_begin_locked_router_action(router_cli_ses)) + if (backend_dcb != NULL) { - /* decrease server current connection counter */ - - backend_dcb = router_cli_ses->backend_dcb; - router_cli_ses->backend_dcb = NULL; - router_cli_ses->rses_closed = true; - /** Unlock */ - rses_end_locked_router_action(router_cli_ses); - - /** - * Close the backend server connection - */ - if (backend_dcb != NULL) { - CHK_DCB(backend_dcb); - dcb_close(backend_dcb); - } + CHK_DCB(backend_dcb); + dcb_close(backend_dcb); } + } } /** @@ -690,80 +706,83 @@ DCB* backend_dcb; * @param queue The queue of data buffers to route * @return if succeed 1, otherwise 0 */ -static int +static int routeQuery(ROUTER *instance, void *router_session, GWBUF *queue) { - ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *)instance; - ROUTER_CLIENT_SES *router_cli_ses = (ROUTER_CLIENT_SES *)router_session; - uint8_t *payload = GWBUF_DATA(queue); - int mysql_command; - int rc; - DCB* backend_dcb; - bool rses_is_closed; - - inst->stats.n_queries++; - mysql_command = MYSQL_GET_COMMAND(payload); + ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *) instance; + ROUTER_CLIENT_SES *router_cli_ses = (ROUTER_CLIENT_SES *) router_session; + uint8_t *payload = GWBUF_DATA(queue); + int mysql_command; + int rc; + DCB* backend_dcb; + bool rses_is_closed; - /** Dirty read for quick check if router is closed. */ - if (router_cli_ses->rses_closed) + inst->stats.n_queries++; + mysql_command = MYSQL_GET_COMMAND(payload); + + /** Dirty read for quick check if router is closed. */ + if (router_cli_ses->rses_closed) + { + rses_is_closed = true; + } + else + { + /** + * Lock router client session for secure read of DCBs + */ + rses_is_closed = !(rses_begin_locked_router_action(router_cli_ses)); + } + + if (!rses_is_closed) + { + backend_dcb = router_cli_ses->backend_dcb; + /** unlock */ + rses_end_locked_router_action(router_cli_ses); + } + + if (rses_is_closed || backend_dcb == NULL || + SERVER_IS_DOWN(router_cli_ses->backend->server)) + { + MXS_ERROR("Failed to route MySQL command %d to backend " + "server.%s", + mysql_command, rses_is_closed ? " Session is closed." : ""); + rc = 0; + while ((queue = GWBUF_CONSUME_ALL(queue)) != NULL) { - rses_is_closed = true; - } - else - { - /** - * Lock router client session for secure read of DCBs - */ - rses_is_closed = !(rses_begin_locked_router_action(router_cli_ses)); + ; } + goto return_rc; - if (!rses_is_closed) - { - backend_dcb = router_cli_ses->backend_dcb; - /** unlock */ - rses_end_locked_router_action(router_cli_ses); - } + } - if (rses_is_closed || backend_dcb == NULL || - SERVER_IS_DOWN(router_cli_ses->backend->server)) - { - MXS_ERROR("Failed to route MySQL command %d to backend " - "server.%s", - mysql_command,rses_is_closed ? " Session is closed." : ""); - rc = 0; - while((queue = GWBUF_CONSUME_ALL(queue)) != NULL); - goto return_rc; + char* trc = NULL; - } + switch (mysql_command) + { + case MYSQL_COM_CHANGE_USER: + rc = backend_dcb->func.auth(backend_dcb, NULL, backend_dcb->session, + queue); + break; + case MYSQL_COM_QUERY: + if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO)) + { + trc = modutil_get_SQL(queue); + } + default: + rc = backend_dcb->func.write(backend_dcb, queue); + break; + } - char* trc = NULL; + MXS_INFO("Routed [%s] to '%s'%s%s", + STRPACKETTYPE(mysql_command), + backend_dcb->server->unique_name, + trc ? ": " : ".", + trc ? trc : ""); + free(trc); - switch(mysql_command) { - case MYSQL_COM_CHANGE_USER: - rc = backend_dcb->func.auth( - backend_dcb, - NULL, - backend_dcb->session, - queue); - break; - case MYSQL_COM_QUERY: - if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO)) - { - trc = modutil_get_SQL(queue); - } - default: - rc = backend_dcb->func.write(backend_dcb, queue); - break; - } - - MXS_INFO("Routed [%s] to '%s'%s%s", - STRPACKETTYPE(mysql_command), - backend_dcb->server->unique_name, - trc?": ":".", - trc?trc:""); - free(trc); return_rc: - return rc; + + return rc; } /** @@ -772,47 +791,47 @@ return_rc: * @param instance Instance of the router * @param dcb DCB to send diagnostics to */ -static void +static void diagnostics(ROUTER *router, DCB *dcb) { -ROUTER_INSTANCE *router_inst = (ROUTER_INSTANCE *)router; -ROUTER_CLIENT_SES *session; -int i = 0; -BACKEND *backend; -char *weightby; + ROUTER_INSTANCE *router_inst = (ROUTER_INSTANCE *) router; + ROUTER_CLIENT_SES *session; + int i = 0; + BACKEND *backend; + char *weightby; - spinlock_acquire(&router_inst->lock); - session = router_inst->connections; - while (session) - { - i++; - session = session->next; - } - spinlock_release(&router_inst->lock); - - dcb_printf(dcb, "\tNumber of router sessions: %d\n", - router_inst->stats.n_sessions); - dcb_printf(dcb, "\tCurrent no. of router sessions: %d\n", i); - dcb_printf(dcb, "\tNumber of queries forwarded: %d\n", - router_inst->stats.n_queries); - if ((weightby = serviceGetWeightingParameter(router_inst->service)) - != NULL) - { - dcb_printf(dcb, "\tConnection distribution based on %s " - "server parameter.\n", - weightby); - dcb_printf(dcb, - "\t\tServer Target %% Connections\n"); - for (i = 0; router_inst->servers[i]; i++) - { - backend = router_inst->servers[i]; - dcb_printf(dcb, "\t\t%-20s %3.1f%% %d\n", - backend->server->unique_name, - (float)backend->weight / 10, - backend->current_connection_count); - } - - } + spinlock_acquire(&router_inst->lock); + session = router_inst->connections; + while (session) + { + i++; + session = session->next; + } + spinlock_release(&router_inst->lock); + + dcb_printf(dcb, "\tNumber of router sessions: %d\n", + router_inst->stats.n_sessions); + dcb_printf(dcb, "\tCurrent no. of router sessions: %d\n", i); + dcb_printf(dcb, "\tNumber of queries forwarded: %d\n", + router_inst->stats.n_queries); + if ((weightby = serviceGetWeightingParameter(router_inst->service)) + != NULL) + { + dcb_printf(dcb, "\tConnection distribution based on %s " + "server parameter.\n", + weightby); + dcb_printf(dcb, + "\t\tServer Target %% Connections\n"); + for (i = 0; router_inst->servers[i]; i++) + { + backend = router_inst->servers[i]; + dcb_printf(dcb, "\t\t%-20s %3.1f%% %d\n", + backend->server->unique_name, + (float) backend->weight / 10, + backend->current_connection_count); + } + + } } /** @@ -825,15 +844,11 @@ char *weightby; * @param backend_dcb The backend DCB * @param queue The GWBUF with reply data */ -static void -clientReply( - ROUTER *instance, - void *router_session, - GWBUF *queue, - DCB *backend_dcb) +static void +clientReply(ROUTER *instance, void *router_session, GWBUF *queue, DCB *backend_dcb) { - ss_dassert(backend_dcb->session->client != NULL); - SESSION_ROUTE_REPLY(backend_dcb->session, queue); + ss_dassert(backend_dcb->session->client != NULL); + SESSION_ROUTE_REPLY(backend_dcb->session, queue); } /** @@ -849,19 +864,14 @@ clientReply( * @param succp Result of action: true if router can continue * */ -static void handleError( - ROUTER *instance, - void *router_session, - GWBUF *errbuf, - DCB *problem_dcb, - error_action_t action, - bool *succp) +static void handleError(ROUTER *instance, void *router_session, GWBUF *errbuf, + DCB *problem_dcb, error_action_t action, bool *succp) { - DCB *client_dcb; - SESSION *session = problem_dcb->session; + DCB *client_dcb; + SESSION *session = problem_dcb->session; session_state_t sesstate; - ROUTER_CLIENT_SES *router_cli_ses = (ROUTER_CLIENT_SES *)router_session; + ROUTER_CLIENT_SES *router_cli_ses = (ROUTER_CLIENT_SES *) router_session; /** Don't handle same error twice on same DCB */ if (problem_dcb->dcb_errhandle_called) @@ -877,14 +887,14 @@ static void handleError( spinlock_acquire(&session->ses_lock); sesstate = session->state; client_dcb = session->client; - + if (sesstate == SESSION_STATE_ROUTER_READY) { CHK_DCB(client_dcb); - spinlock_release(&session->ses_lock); + spinlock_release(&session->ses_lock); client_dcb->func.write(client_dcb, gwbuf_clone(errbuf)); } - else + else { spinlock_release(&session->ses_lock); } @@ -904,44 +914,47 @@ static void handleError( } /** to be inline'd */ -/** + +/** * @node Acquires lock to router client session if it is not closed. * * Parameters: * @param rses - in, use - * + * * * @return true if router session was not closed. If return value is true * it means that router is locked, and must be unlocked later. False, if * router was closed before lock was acquired. * - * + * * @details (write detailed description here) * */ -static bool rses_begin_locked_router_action( - ROUTER_CLIENT_SES* rses) +static bool rses_begin_locked_router_action(ROUTER_CLIENT_SES* rses) { - bool succp = false; - - CHK_CLIENT_RSES(rses); + bool succp = false; + + CHK_CLIENT_RSES(rses); + + if (rses->rses_closed) + { + goto return_succp; + } + spinlock_acquire(&rses->rses_lock); + if (rses->rses_closed) + { + spinlock_release(&rses->rses_lock); + goto return_succp; + } + succp = true; - if (rses->rses_closed) { - goto return_succp; - } - spinlock_acquire(&rses->rses_lock); - if (rses->rses_closed) { - spinlock_release(&rses->rses_lock); - goto return_succp; - } - succp = true; - return_succp: - return succp; + return succp; } /** to be inline'd */ -/** + +/** * @node Releases router client session lock. * * Parameters: @@ -950,21 +963,19 @@ return_succp: * * @return void * - * + * * @details (write detailed description here) * */ -static void rses_end_locked_router_action( - ROUTER_CLIENT_SES* rses) +static void rses_end_locked_router_action(ROUTER_CLIENT_SES* rses) { - CHK_CLIENT_RSES(rses); - spinlock_release(&rses->rses_lock); + CHK_CLIENT_RSES(rses); + spinlock_release(&rses->rses_lock); } - static int getCapabilities() { - return RCAP_TYPE_PACKET_INPUT; + return RCAP_TYPE_PACKET_INPUT; } /******************************** @@ -980,65 +991,69 @@ static int getCapabilities() * */ -static BACKEND *get_root_master(BACKEND **servers) { - int i = 0; - BACKEND *master_host = NULL; +static BACKEND *get_root_master(BACKEND **servers) +{ + int i = 0; + BACKEND *master_host = NULL; - for (i = 0; servers[i]; i++) { - if (servers[i] && (servers[i]->server->status & (SERVER_MASTER|SERVER_MAINT)) == SERVER_MASTER) { - if (master_host && servers[i]->server->depth < master_host->server->depth) { - master_host = servers[i]; - } else { - if (master_host == NULL) { - master_host = servers[i]; - } - } - } - } - return master_host; + for (i = 0; servers[i]; i++) + { + if (servers[i] && (servers[i]->server->status & (SERVER_MASTER | SERVER_MAINT)) == SERVER_MASTER) + { + if (master_host && servers[i]->server->depth < master_host->server->depth) + { + master_host = servers[i]; + } + else if (master_host == NULL) + { + master_host = servers[i]; + } + } + } + return master_host; } -static int handle_state_switch(DCB* dcb,DCB_REASON reason, void * routersession) +static int handle_state_switch(DCB* dcb, DCB_REASON reason, void * routersession) { ss_dassert(dcb != NULL); SESSION* session = dcb->session; - ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES*)routersession; + ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES*) routersession; SERVICE* service = session->service; - ROUTER* router = (ROUTER *)service->router; + ROUTER* router = (ROUTER *) service->router; - if (NULL == dcb->session->router_session && DCB_REASON_ERROR != reason) + if (NULL == dcb->session->router_session && DCB_REASON_ERROR != reason) { - /* + /* * We cannot handle a DCB that does not have a router session, * except in the case where error processing is invoked. */ return 0; } - switch(reason) + switch (reason) { - case DCB_REASON_CLOSE: - dcb->func.close(dcb); - break; - case DCB_REASON_DRAINED: - /** Do we need to do anything? */ - break; - case DCB_REASON_HIGH_WATER: - /** Do we need to do anything? */ - break; - case DCB_REASON_LOW_WATER: - /** Do we need to do anything? */ - break; - case DCB_REASON_ERROR: - dcb->func.error(dcb); - break; - case DCB_REASON_HUP: - dcb->func.hangup(dcb); - break; - case DCB_REASON_NOT_RESPONDING: - dcb->func.hangup(dcb); - break; - default: - break; + case DCB_REASON_CLOSE: + dcb->func.close(dcb); + break; + case DCB_REASON_DRAINED: + /** Do we need to do anything? */ + break; + case DCB_REASON_HIGH_WATER: + /** Do we need to do anything? */ + break; + case DCB_REASON_LOW_WATER: + /** Do we need to do anything? */ + break; + case DCB_REASON_ERROR: + dcb->func.error(dcb); + break; + case DCB_REASON_HUP: + dcb->func.hangup(dcb); + break; + case DCB_REASON_NOT_RESPONDING: + dcb->func.hangup(dcb); + break; + default: + break; } return 0; diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index 433fcf24e..6be727e56 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -4653,7 +4653,7 @@ static void rwsplit_process_router_options( /** * Error Handler routine to resolve _backend_ failures. If it succeeds then there - * are enough operative backends available and connected. Otherwise it fails, + * are enough operative backends available and connected. Otherwise it fails, * and session is terminated. * * @param instance The router instance @@ -4662,7 +4662,7 @@ static void rwsplit_process_router_options( * @param backend_dcb The backend DCB * @param action The action: ERRACT_NEW_CONNECTION or ERRACT_REPLY_CLIENT * @param succp Result of action: true iff router can continue - * + * * Even if succp == true connecting to new slave may have failed. succp is to * tell whether router has enough master/slave connections to continue work. */ @@ -4670,7 +4670,7 @@ static void handleError ( ROUTER* instance, void* router_session, GWBUF* errmsgbuf, - DCB* backend_dcb, + DCB* problem_dcb, error_action_t action, bool* succp) { @@ -4678,10 +4678,10 @@ static void handleError ( ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance; ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES *)router_session; - CHK_DCB(backend_dcb); + CHK_DCB(problem_dcb); /** Don't handle same error twice on same DCB */ - if (backend_dcb->dcb_errhandle_called) + if (problem_dcb->dcb_errhandle_called) { /** we optimistically assume that previous call succeed */ /* @@ -4693,24 +4693,28 @@ static void handleError ( } else { - backend_dcb->dcb_errhandle_called = true; + problem_dcb->dcb_errhandle_called = true; } - session = backend_dcb->session; - + session = problem_dcb->session; + if (session == NULL || rses == NULL) { *succp = false; - } + } + else if (dcb_isclient(problem_dcb)) + { + *succp = false; + } else { CHK_SESSION(session); CHK_CLIENT_RSES(rses); - + switch (action) { case ERRACT_NEW_CONNECTION: { SERVER* srv; - + if (!rses_begin_locked_router_action(rses)) { *succp = false; @@ -4718,14 +4722,14 @@ static void handleError ( } srv = rses->rses_master_ref->bref_backend->backend_server; /** - * If master has lost its Master status error can't be + * If master has lost its Master status error can't be * handled so that session could continue. */ - if (rses->rses_master_ref->bref_dcb == backend_dcb && + if (rses->rses_master_ref->bref_dcb == problem_dcb && !SERVER_IS_MASTER(srv)) { backend_ref_t* bref; - bref = get_bref_from_dcb(rses, backend_dcb); + bref = get_bref_from_dcb(rses, problem_dcb); if (bref != NULL) { CHK_BACKEND_REF(bref); @@ -4739,7 +4743,7 @@ static void handleError ( "corresponding backend ref.", srv->name, srv->port); - dcb_close(backend_dcb); + dcb_close(problem_dcb); } if (!srv->master_err_is_logged) { @@ -4756,35 +4760,35 @@ static void handleError ( else { /** - * This is called in hope of getting replacement for + * This is called in hope of getting replacement for * failed slave(s). This call may free rses. */ - *succp = handle_error_new_connection(inst, - &rses, - backend_dcb, + *succp = handle_error_new_connection(inst, + &rses, + problem_dcb, errmsgbuf); } /* Free the lock if rses still exists */ if (rses) rses_end_locked_router_action(rses); break; } - + case ERRACT_REPLY_CLIENT: { - handle_error_reply_client(session, - rses, - backend_dcb, + handle_error_reply_client(session, + rses, + problem_dcb, errmsgbuf); *succp = false; /*< no new backend servers were made available */ - break; + break; } - - default: + + default: *succp = false; break; } } - dcb_close(backend_dcb); + dcb_close(problem_dcb); } @@ -4797,7 +4801,7 @@ static void handle_error_reply_client( session_state_t sesstate; DCB* client_dcb; backend_ref_t* bref; - + spinlock_acquire(&ses->ses_lock); sesstate = ses->state; client_dcb = ses->client; @@ -4812,7 +4816,7 @@ static void handle_error_reply_client( bref_clear_state(bref, BREF_IN_USE); bref_set_state(bref, BREF_CLOSED); } - + if (sesstate == SESSION_STATE_ROUTER_READY) { CHK_DCB(client_dcb); diff --git a/server/modules/routing/schemarouter/schemarouter.c b/server/modules/routing/schemarouter/schemarouter.c index 382e5e248..4e5daaf4a 100644 --- a/server/modules/routing/schemarouter/schemarouter.c +++ b/server/modules/routing/schemarouter/schemarouter.c @@ -4001,7 +4001,7 @@ return_succp: /** * Error Handler routine to resolve _backend_ failures. If it succeeds then there - * are enough operative backends available and connected. Otherwise it fails, + * are enough operative backends available and connected. Otherwise it fails, * and session is terminated. * * @param instance The router instance @@ -4010,7 +4010,7 @@ return_succp: * @param backend_dcb The backend DCB * @param action The action: ERRACT_NEW_CONNECTION or ERRACT_REPLY_CLIENT * @param succp Result of action: true iff router can continue - * + * * Even if succp == true connecting to new slave may have failed. succp is to * tell whether router has enough master/slave connections to continue work. */ @@ -4018,7 +4018,7 @@ static void handleError ( ROUTER* instance, void* router_session, GWBUF* errmsgbuf, - DCB* backend_dcb, + DCB* problem_dcb, error_action_t action, bool* succp) { @@ -4026,10 +4026,10 @@ static void handleError ( ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance; ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES *)router_session; - CHK_DCB(backend_dcb); - + CHK_DCB(problem_dcb); + /** Don't handle same error twice on same DCB */ - if (backend_dcb->dcb_errhandle_called) + if (problem_dcb->dcb_errhandle_called) { /** we optimistically assume that previous call succeed */ *succp = true; @@ -4037,19 +4037,23 @@ static void handleError ( } else { - backend_dcb->dcb_errhandle_called = true; + problem_dcb->dcb_errhandle_called = true; } - session = backend_dcb->session; - + session = problem_dcb->session; + if (session == NULL || rses == NULL) { *succp = false; } + else if (dcb_isclient(problem_dcb)) + { + *succp = false; + } else { CHK_SESSION(session); CHK_CLIENT_RSES(rses); - + switch (action) { case ERRACT_NEW_CONNECTION: { @@ -4059,33 +4063,33 @@ static void handleError ( break; } /** - * This is called in hope of getting replacement for + * This is called in hope of getting replacement for * failed slave(s). */ - *succp = handle_error_new_connection(inst, - rses, - backend_dcb, + *succp = handle_error_new_connection(inst, + rses, + problem_dcb, errmsgbuf); rses_end_locked_router_action(rses); break; } - + case ERRACT_REPLY_CLIENT: { - handle_error_reply_client(session, - rses, - backend_dcb, + handle_error_reply_client(session, + rses, + problem_dcb, errmsgbuf); *succp = false; /*< no new backend servers were made available */ - break; + break; } - + default: *succp = false; break; } } - dcb_close(backend_dcb); + dcb_close(problem_dcb); }