From 42bf95eb82035d06c926cde0b00bce5b704b49a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Tue, 25 Jun 2019 10:15:06 +0300 Subject: [PATCH 01/11] Fix MariaDB startup commands Newer systems only define the mariadb service. --- maxscale-system-test/mariadb_nodes.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/maxscale-system-test/mariadb_nodes.cpp b/maxscale-system-test/mariadb_nodes.cpp index e350d655a..deb6e0146 100644 --- a/maxscale-system-test/mariadb_nodes.cpp +++ b/maxscale-system-test/mariadb_nodes.cpp @@ -144,11 +144,11 @@ void Mariadb_nodes::read_env() //reading start_db_command sprintf(env_name, "%s_%03d_start_db_command", prefix, i); - start_db_command[i] = readenv(env_name, (char *) "service mysql start"); + start_db_command[i] = readenv(env_name, (char *) "systemctl start mariadb || service mysql start"); //reading stop_db_command sprintf(env_name, "%s_%03d_stop_db_command", prefix, i); - stop_db_command[i] = readenv(env_name, (char *) "service mysql stop"); + stop_db_command[i] = readenv(env_name, (char *) "systemctl stop mariadb || service mysql stop"); //reading cleanup_db_command sprintf(env_name, "%s_%03d_cleanup_db_command", prefix, i); From d15582d26d8656596d8edf202d4532603137241a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Tue, 25 Jun 2019 17:32:24 +0300 Subject: [PATCH 02/11] Allow posting to stopped workers Due to there being no distinction between a temporarily stopped worker and a permanently stopped one, we must allow posting of messages to stopped workers. --- server/core/worker.cc | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/server/core/worker.cc b/server/core/worker.cc index b699e740b..14cc7df3a 100644 --- a/server/core/worker.cc +++ b/server/core/worker.cc @@ -719,17 +719,8 @@ size_t Worker::execute_concurrently(Task& task) bool Worker::post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) { // NOTE: No logging here, this function must be signal safe. - bool rval = false; - // TODO: Fix this, it will be hit - // ss_dassert(state() != Worker::STOPPED); - - if (state() != Worker::STOPPED) - { - MessageQueue::Message message(msg_id, arg1, arg2); - rval = m_pQueue->post(message); - } - - return rval; + MessageQueue::Message message(msg_id, arg1, arg2); + return m_pQueue->post(message); } bool mxs_worker_post_message(MXS_WORKER* pWorker, uint32_t msg_id, intptr_t arg1, intptr_t arg2) From 991067372dde0d4665afe462f7ecca704c289a76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Wed, 26 Jun 2019 08:06:32 +0300 Subject: [PATCH 03/11] Improve systemd check error message The error now displays the correct package name for systems that use RPMs. --- cmake/CheckPlatform.cmake | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake/CheckPlatform.cmake b/cmake/CheckPlatform.cmake index 183e59dfc..a000b668b 100644 --- a/cmake/CheckPlatform.cmake +++ b/cmake/CheckPlatform.cmake @@ -54,7 +54,7 @@ if(HAVE_SYSTEMD) elseif(NOT BUILD_SYSTEM_TESTS) # If systemd is in use, require libsystemd-dev to be installed if(NOT NOT_SYSTEMD_IS_RUNNING) - message( FATAL_ERROR "systemd is running: please install libsystemd-dev" ) + message(FATAL_ERROR "systemd is running: please install libsystemd-dev (DEB) or systemd-devel (RPM)") endif() endif() From 25a076d0723ac2c55f36d866be2d54b81a7577cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Wed, 26 Jun 2019 12:27:41 +0300 Subject: [PATCH 04/11] Fix buffer sizes in PS extraction code The code used 4 byte buffers for 2 byte values. --- server/modules/protocol/MySQL/mysql_common.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/modules/protocol/MySQL/mysql_common.cc b/server/modules/protocol/MySQL/mysql_common.cc index 648ce60bc..4efded961 100644 --- a/server/modules/protocol/MySQL/mysql_common.cc +++ b/server/modules/protocol/MySQL/mysql_common.cc @@ -1299,8 +1299,8 @@ bool mxs_mysql_extract_ps_response(GWBUF* buffer, MXS_PS_RESPONSE* out) { bool rval = false; uint8_t id[MYSQL_PS_ID_SIZE]; - uint8_t cols[MYSQL_PS_ID_SIZE]; - uint8_t params[MYSQL_PS_ID_SIZE]; + uint8_t cols[MYSQL_PS_COLS_SIZE]; + uint8_t params[MYSQL_PS_PARAMS_SIZE]; uint8_t warnings[MYSQL_PS_WARN_SIZE]; if (gwbuf_copy_data(buffer, MYSQL_PS_ID_OFFSET, sizeof(id), id) == sizeof(id) From 8d50450b5a9eddb8fc4f7f2e6c5848e996f583a0 Mon Sep 17 00:00:00 2001 From: "wuzang.hdp" Date: Thu, 20 Jun 2019 16:10:19 +0800 Subject: [PATCH 05/11] MXS-2521:Route subseqenct COM_STMT_EXECUTE to the same server which first COM_STMT_EXECUTE was executed on --- .../readwritesplit/rwsplit_internal.hh | 2 +- .../readwritesplit/rwsplit_route_stmt.cc | 59 +++++++++++++++---- .../readwritesplit/rwsplit_session_cmd.cc | 2 +- .../routing/readwritesplit/rwsplitsession.cc | 8 ++- .../routing/readwritesplit/rwsplitsession.hh | 6 +- 5 files changed, 60 insertions(+), 17 deletions(-) diff --git a/server/modules/routing/readwritesplit/rwsplit_internal.hh b/server/modules/routing/readwritesplit/rwsplit_internal.hh index fd4745a9d..aa0d80b5e 100644 --- a/server/modules/routing/readwritesplit/rwsplit_internal.hh +++ b/server/modules/routing/readwritesplit/rwsplit_internal.hh @@ -78,7 +78,7 @@ void handle_multi_temp_and_load(RWSplitSession *rses, GWBUF *querybuf, SRWBackend handle_hinted_target(RWSplitSession *rses, GWBUF *querybuf, route_target_t route_target); SRWBackend handle_slave_is_target(RWSplit *inst, RWSplitSession *rses, - uint8_t cmd, uint32_t id); + const GWBUF *query, const RouteInfo& info); bool handle_master_is_target(RWSplit *inst, RWSplitSession *rses, SRWBackend* dest); bool handle_got_target(RWSplit *inst, RWSplitSession *rses, diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc index 0ac458292..741977a54 100644 --- a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc @@ -93,7 +93,7 @@ void handle_connection_keepalive(RWSplit *inst, RWSplitSession *rses, } route_target_t get_target_type(RWSplitSession *rses, GWBUF *buffer, - uint8_t* command, uint32_t* type, uint32_t* stmt_id) + uint8_t* command, uint32_t* type, uint32_t* stmt_id, uint16_t* n_params) { route_target_t route_target = TARGET_MASTER; bool in_read_only_trx = rses->target_node && session_trx_is_read_only(rses->client_dcb->session); @@ -161,7 +161,7 @@ route_target_t get_target_type(RWSplitSession *rses, GWBUF *buffer, } else if (is_ps_command(*command)) { - *stmt_id = get_internal_ps_id(rses, buffer); + *stmt_id = get_internal_ps_id(rses, buffer, n_params); *type = rses->ps_manager.get_type(*stmt_id); } @@ -250,7 +250,7 @@ bool route_single_stmt(RWSplit *inst, RWSplitSession *rses, GWBUF *querybuf, con } else if (TARGET_IS_SLAVE(route_target)) { - if ((target = handle_slave_is_target(inst, rses, command, stmt_id))) + if ((target = handle_slave_is_target(inst, rses, querybuf, info))) { succp = true; @@ -999,6 +999,38 @@ SRWBackend handle_hinted_target(RWSplitSession *rses, GWBUF *querybuf, return target; } +/** + * @brief Determine whether this stmt is subsequent COM_STMT_EXECUTE + * + * @param cmd command type + * @param query GWBUF including the query + * @param info Route info + * + * return is subsequent COM_STMT_EXECUTE + */ +bool is_sub_stmt_exec(uint8_t cmd, const GWBUF *query, uint16_t n_params) +{ + if (cmd != COM_STMT_EXECUTE || n_params == 0) + { + return false; + } + + bool rval = true; + + /*https://mariadb.com/kb/en/library/com_stmt_execute/*/ + /*need n_params to parse new_params_bound_flag(alias send type to server)*/ + int new_params_bound_flag_offset = MYSQL_HEADER_LEN + 10 + (n_params + 7) / 8; + ss_dassert(gwbuf_length(query) <= new_params_bound_flag_offset); + uint8_t data[new_params_bound_flag_offset]; + gwbuf_copy_data(query, 0, new_params_bound_flag_offset, data); + if (data[new_params_bound_flag_offset]) + { + rval = false; + } + + return rval; +} + /** * @brief Handle slave is the target * @@ -1006,20 +1038,27 @@ SRWBackend handle_hinted_target(RWSplitSession *rses, GWBUF *querybuf, * * @param inst Router instance * @param ses Router session - * @param target_dcb DCB for the target server + * @param query GWBUF including the query + * @param info Holding routing related information * * @return bool - true if succeeded, false otherwise */ SRWBackend handle_slave_is_target(RWSplit *inst, RWSplitSession *rses, - uint8_t cmd, uint32_t stmt_id) + const GWBUF *query, const RouteInfo& info) { int rlag_max = rses_get_max_replication_lag(rses); SRWBackend target; + uint8_t cmd = info.command; + uint32_t stmt_id = info.stmt_id; + uint16_t n_params = info.n_params; - if (cmd == MXS_COM_STMT_FETCH) + if (cmd == MXS_COM_STMT_FETCH || is_sub_stmt_exec(cmd, query, n_params)) { /** The COM_STMT_FETCH must be executed on the same server as the - * COM_STMT_EXECUTE was executed on */ + * COM_STMT_EXECUTE was executed on, the subsequent COM_STMT_EXECUTE also + */ + + const char* command_str = cmd == MXS_COM_STMT_FETCH ? "COM_STMT_FETCH" : "subseqent COM_STMT_EXECUTE"; ExecMap::iterator it = rses->exec_map.find(stmt_id); if (it != rses->exec_map.end()) @@ -1027,17 +1066,17 @@ SRWBackend handle_slave_is_target(RWSplit *inst, RWSplitSession *rses, if (it->second->in_use()) { target = it->second; - MXS_INFO("COM_STMT_FETCH on %s", target->name()); + MXS_INFO("%s on %s", command_str, target->name()); } else { MXS_ERROR("Old COM_STMT_EXECUTE target %s not in use, cannot " - "proceed with COM_STMT_FETCH", it->second->name()); + "proceed with %s", it->second->name(), command_str); } } else { - MXS_WARNING("Unknown statement ID %u used in COM_STMT_FETCH", stmt_id); + MXS_WARNING("Unknown statement ID %u used in %s", stmt_id, command_str); } } else diff --git a/server/modules/routing/readwritesplit/rwsplit_session_cmd.cc b/server/modules/routing/readwritesplit/rwsplit_session_cmd.cc index f9bb2684e..6324ea778 100644 --- a/server/modules/routing/readwritesplit/rwsplit_session_cmd.cc +++ b/server/modules/routing/readwritesplit/rwsplit_session_cmd.cc @@ -122,7 +122,7 @@ void process_sescmd_response(RWSplitSession* rses, SRWBackend& backend, { /** Map the returned response to the internal ID */ MXS_INFO("PS ID %u maps to internal ID %lu", resp.id, id); - rses->ps_handles[resp.id] = id; + rses->ps_handles[resp.id] = (id << 16) + resp.parameters; } // Discard any slave connections that did not return the same result diff --git a/server/modules/routing/readwritesplit/rwsplitsession.cc b/server/modules/routing/readwritesplit/rwsplitsession.cc index 858614f3b..1079e7761 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.cc +++ b/server/modules/routing/readwritesplit/rwsplitsession.cc @@ -110,7 +110,7 @@ bool RWBackend::consume_fetched_rows(GWBUF* buffer) return m_expected_rows == 0; } -uint32_t get_internal_ps_id(RWSplitSession* rses, GWBUF* buffer) +uint32_t get_internal_ps_id(RWSplitSession* rses, GWBUF* buffer, uint16_t* n_params) { uint32_t rval = 0; @@ -120,7 +120,8 @@ uint32_t get_internal_ps_id(RWSplitSession* rses, GWBUF* buffer) if (it != rses->ps_handles.end()) { - rval = it->second; + rval = it->second >> 16; + *n_params = it->second & 0xffff; } else { @@ -135,7 +136,8 @@ RouteInfo::RouteInfo(RWSplitSession* rses, GWBUF* buffer): target(TARGET_UNDEFINED), command(0xff), type(QUERY_TYPE_UNKNOWN), - stmt_id(0) + stmt_id(0), + n_params(0) { target = get_target_type(rses, buffer, &command, &type, &stmt_id); } diff --git a/server/modules/routing/readwritesplit/rwsplitsession.hh b/server/modules/routing/readwritesplit/rwsplitsession.hh index 33e09c87b..4e5b49ff1 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.hh +++ b/server/modules/routing/readwritesplit/rwsplitsession.hh @@ -32,7 +32,7 @@ enum reply_state_t rstostr((a)->get_reply_state()), rstostr(b)); typedef std::map BackendHandleMap; /** Internal ID to external ID */ -typedef std::map ClientHandleMap; /** External ID to internal ID */ +typedef std::map ClientHandleMap; /** External ID to internal ID */ class RWBackend: public mxs::Backend { @@ -171,6 +171,7 @@ struct RouteInfo uint8_t command; /**< The command byte, 0xff for unknown commands */ uint32_t type; /**< The query type, QUERY_TYPE_UNKNOWN for unknown types*/ uint32_t stmt_id; /**< Prepared statement ID, 0 for unknown */ + uint16_t n_params; /**< Prepared statement params count */ }; /** @@ -202,7 +203,8 @@ static inline const char* rstostr(reply_state_t state) * * @param rses Router client session * @param buffer Buffer containing a binary protocol statement other than COM_STMT_PREPARE + * @param n_params statement parmas number * * @return The internal ID of the prepared statement that the buffer contents refer to */ -uint32_t get_internal_ps_id(RWSplitSession* rses, GWBUF* buffer); +uint32_t get_internal_ps_id(RWSplitSession* rses, GWBUF* buffer, uint16_t* n_params); From 6b31b80e76069669456ca7c2949ab6c72e87b54f Mon Sep 17 00:00:00 2001 From: "wuzang.hdp" Date: Thu, 20 Jun 2019 16:38:07 +0800 Subject: [PATCH 06/11] fix wrong assert --- server/modules/routing/readwritesplit/rwsplit_route_stmt.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc index 741977a54..23f2f7e0d 100644 --- a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc @@ -1020,7 +1020,7 @@ bool is_sub_stmt_exec(uint8_t cmd, const GWBUF *query, uint16_t n_params) /*https://mariadb.com/kb/en/library/com_stmt_execute/*/ /*need n_params to parse new_params_bound_flag(alias send type to server)*/ int new_params_bound_flag_offset = MYSQL_HEADER_LEN + 10 + (n_params + 7) / 8; - ss_dassert(gwbuf_length(query) <= new_params_bound_flag_offset); + ss_dassert((int)gwbuf_length(query) >= new_params_bound_flag_offset); uint8_t data[new_params_bound_flag_offset]; gwbuf_copy_data(query, 0, new_params_bound_flag_offset, data); if (data[new_params_bound_flag_offset]) From 51ce3c53fd74bb2788e83d3d4d7ca17d0bb23ec9 Mon Sep 17 00:00:00 2001 From: "wuzang.hdp" Date: Thu, 20 Jun 2019 19:52:20 +0800 Subject: [PATCH 07/11] shift 32 --- server/modules/routing/readwritesplit/rwsplit_session_cmd.cc | 2 +- server/modules/routing/readwritesplit/rwsplitsession.cc | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/server/modules/routing/readwritesplit/rwsplit_session_cmd.cc b/server/modules/routing/readwritesplit/rwsplit_session_cmd.cc index 6324ea778..7bd56edbc 100644 --- a/server/modules/routing/readwritesplit/rwsplit_session_cmd.cc +++ b/server/modules/routing/readwritesplit/rwsplit_session_cmd.cc @@ -122,7 +122,7 @@ void process_sescmd_response(RWSplitSession* rses, SRWBackend& backend, { /** Map the returned response to the internal ID */ MXS_INFO("PS ID %u maps to internal ID %lu", resp.id, id); - rses->ps_handles[resp.id] = (id << 16) + resp.parameters; + rses->ps_handles[resp.id] = (id << 32) + resp.parameters; } // Discard any slave connections that did not return the same result diff --git a/server/modules/routing/readwritesplit/rwsplitsession.cc b/server/modules/routing/readwritesplit/rwsplitsession.cc index 1079e7761..4d168c573 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.cc +++ b/server/modules/routing/readwritesplit/rwsplitsession.cc @@ -120,8 +120,8 @@ uint32_t get_internal_ps_id(RWSplitSession* rses, GWBUF* buffer, uint16_t* n_par if (it != rses->ps_handles.end()) { - rval = it->second >> 16; - *n_params = it->second & 0xffff; + rval = it->second >> 32; + *n_params = it->second & 0xffffffff; } else { From 5c762bb84137fd8f571084e69fd3a2eb32273861 Mon Sep 17 00:00:00 2001 From: "wuzang.hdp" Date: Thu, 27 Jun 2019 10:54:56 +0800 Subject: [PATCH 08/11] misc fix --- .../modules/routing/readwritesplit/rwsplit_route_stmt.cc | 8 ++++---- server/modules/routing/readwritesplit/rwsplitsession.hh | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc index 23f2f7e0d..2b2d0ec15 100644 --- a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc @@ -1002,7 +1002,7 @@ SRWBackend handle_hinted_target(RWSplitSession *rses, GWBUF *querybuf, /** * @brief Determine whether this stmt is subsequent COM_STMT_EXECUTE * - * @param cmd command type + * @param cmd command type * @param query GWBUF including the query * @param info Route info * @@ -1021,9 +1021,9 @@ bool is_sub_stmt_exec(uint8_t cmd, const GWBUF *query, uint16_t n_params) /*need n_params to parse new_params_bound_flag(alias send type to server)*/ int new_params_bound_flag_offset = MYSQL_HEADER_LEN + 10 + (n_params + 7) / 8; ss_dassert((int)gwbuf_length(query) >= new_params_bound_flag_offset); - uint8_t data[new_params_bound_flag_offset]; - gwbuf_copy_data(query, 0, new_params_bound_flag_offset, data); - if (data[new_params_bound_flag_offset]) + uint8_t flag; + gwbuf_copy_data(query, new_params_bound_flag_offset, 1, &flag); + if (flag) { rval = false; } diff --git a/server/modules/routing/readwritesplit/rwsplitsession.hh b/server/modules/routing/readwritesplit/rwsplitsession.hh index 4e5b49ff1..0ff32d5c0 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.hh +++ b/server/modules/routing/readwritesplit/rwsplitsession.hh @@ -203,7 +203,7 @@ static inline const char* rstostr(reply_state_t state) * * @param rses Router client session * @param buffer Buffer containing a binary protocol statement other than COM_STMT_PREPARE - * @param n_params statement parmas number + * @param n_params Statement parmas count * * @return The internal ID of the prepared statement that the buffer contents refer to */ From 83e276ebd3e00d857b11cea34d48d9f213552b4a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Wed, 26 Jun 2019 15:26:45 +0300 Subject: [PATCH 09/11] MXS-2521: Fix the test Fixed the test to correctly set the parameters and also to first verify that a direct connection works before doing the test via MaxScale. --- maxscale-system-test/mxs2521_double_exec.cpp | 82 +++++++++++++------- 1 file changed, 53 insertions(+), 29 deletions(-) diff --git a/maxscale-system-test/mxs2521_double_exec.cpp b/maxscale-system-test/mxs2521_double_exec.cpp index ad110ecfe..c6ef40729 100644 --- a/maxscale-system-test/mxs2521_double_exec.cpp +++ b/maxscale-system-test/mxs2521_double_exec.cpp @@ -5,63 +5,87 @@ #include "testconnections.h" -int main(int argc, char** argv) +void do_test(TestConnections& test, MYSQL* conn) { - TestConnections test(argc, argv); - test.maxscales->connect(); - - auto conn = test.maxscales->conn_rwsplit[0]; - - test.try_query(conn, "DROP TABLE IF EXISTS double_execute;"); - test.try_query(conn, "CREATE TABLE double_execute(a int);"); - test.try_query(conn, "INSERT INTO double_execute VALUES (123), (456)"); - auto stmt = mysql_stmt_init(conn); std::string sql = "select a, @@server_id from double_execute where a = ?"; test.expect(mysql_stmt_prepare(stmt, sql.c_str(), sql.length()) == 0, "Prepare should work: %s", mysql_error(conn)); - int data[2] = {0, 0}; - MYSQL_BIND my_bind[2] = {}; - char is_null = 0; - my_bind[0].buffer_type = MYSQL_TYPE_LONG; - my_bind[0].buffer = &data[0]; - my_bind[0].buffer_length = sizeof(data[0]); - my_bind[0].is_null = &is_null; - my_bind[1].buffer_type = MYSQL_TYPE_LONG; - my_bind[1].buffer = &data[1]; - my_bind[1].buffer_length = sizeof(data[2]); - my_bind[1].is_null = &is_null; - data[1] = 123; - test.expect(mysql_stmt_bind_param(stmt, my_bind) == 0, "Bind: %s", mysql_stmt_error(stmt)); + int data_out = 123; + MYSQL_BIND bind_out; + char null_out = 0; + bind_out.buffer_type = MYSQL_TYPE_LONG; + bind_out.buffer = &data_out; + bind_out.buffer_length = sizeof(data_out); + bind_out.is_null = &null_out; + test.expect(mysql_stmt_bind_param(stmt, &bind_out) == 0, "Bind: %s", mysql_stmt_error(stmt)); // The first execute is done on the master test.try_query(conn, "BEGIN"); test.expect(mysql_stmt_execute(stmt) == 0, "First execute should work: %s", mysql_stmt_error(stmt)); - data[0] = 0; + int data_in[2] = {}; + MYSQL_BIND bind_in[2] = {}; + char null_in[2] = {}; + + for (int i = 0; i < 2; i++) + { + bind_in[i].buffer_type = MYSQL_TYPE_LONG; + bind_in[i].buffer = &data_in[i]; + bind_in[i].buffer_length = sizeof(data_in[i]); + bind_in[i].is_null = &null_in[i]; + } + + mysql_stmt_bind_result(stmt, bind_in); mysql_stmt_store_result(stmt); + test.expect(mysql_stmt_fetch(stmt) == 0, "First fetch of first execute should work"); - test.expect(data[0] == 123, "Query should return one row with value 123: `%d`", data[0]); + test.expect(data_in[0] == 123, "Query should return one row with value 123: `%d`", data_in[0]); test.expect(mysql_stmt_fetch(stmt) != 0, "Second fetch of first execute should NOT work"); + int first_server = data_in[1]; + test.try_query(conn, "COMMIT"); // The second execute goes to a slave, no new parameters are sent in it - data[0] = 123; + memset(data_in, 0, sizeof(data_in)); test.expect(mysql_stmt_execute(stmt) == 0, "Second execute should work: %s", mysql_stmt_error(stmt)); - data[0] = 0; + mysql_stmt_bind_result(stmt, bind_in); mysql_stmt_store_result(stmt); test.expect(mysql_stmt_fetch(stmt) == 0, "First fetch of second execute should work"); - test.expect(data[0] == 123, "Query should return one row with value 123: `%d`", data[0]); + test.expect(data_in[0] == 123, "Query should return one row with value 123: `%d`", data_in[0]); + test.expect(data_in[1] == first_server, + "The query should be routed to the server with server_id %d, not %d", + first_server, data_in[1]); test.expect(mysql_stmt_fetch(stmt) != 0, "Second fetch of second execute should NOT work"); mysql_stmt_close(stmt); +} - test.try_query(conn, "DROP TABLE IF EXISTS double_execute;"); +int main(int argc, char** argv) +{ + TestConnections test(argc, argv); + + test.repl->connect(); + test.maxscales->connect(); + + // Prepare a table + test.try_query(test.repl->nodes[0], "DROP TABLE IF EXISTS double_execute;"); + test.try_query(test.repl->nodes[0], "CREATE TABLE double_execute(a int);"); + test.try_query(test.repl->nodes[0], "INSERT INTO double_execute VALUES (123), (456)"); + test.repl->sync_slaves(); + + test.tprintf("Running test with a direct connection"); + do_test(test, test.repl->nodes[0]); + + test.tprintf("Running test through readwritesplit"); + do_test(test, test.maxscales->conn_rwsplit[0]); + + test.try_query(test.repl->nodes[0], "DROP TABLE IF EXISTS double_execute;"); return test.global_result; } From a6617f52fc6df62164bdc02184db53da5a978a32 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Wed, 26 Jun 2019 16:53:19 +0300 Subject: [PATCH 10/11] MXS-2578: Remove installation of /var/lib/maxscale The directory was installed as the root user but later on in the installation process the owner would be changed to the maxscale user. This causes some validation programs to fail as they expect installed files to retain the original ownership. --- cmake/package_rpm.cmake | 3 --- 1 file changed, 3 deletions(-) diff --git a/cmake/package_rpm.cmake b/cmake/package_rpm.cmake index 52bcdc7ff..32afd8c7b 100644 --- a/cmake/package_rpm.cmake +++ b/cmake/package_rpm.cmake @@ -40,9 +40,6 @@ set(CPACK_RPM_USER_FILELIST "${IGNORED_DIRS}") if(TARGET_COMPONENT STREQUAL "core" OR TARGET_COMPONENT STREQUAL "all") set(CPACK_RPM_POST_INSTALL_SCRIPT_FILE ${CMAKE_BINARY_DIR}/postinst) set(CPACK_RPM_POST_UNINSTALL_SCRIPT_FILE ${CMAKE_BINARY_DIR}/postrm) - - # Installing this prevents RPM from deleting the /var/lib/maxscale folder - install(DIRECTORY DESTINATION ${MAXSCALE_VARDIR}/lib/maxscale) endif() if(EXTRA_PACKAGE_DEPENDENCIES) From 5eba688c1bcd194e7ab2d0a2e18f2bcc2354cf48 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Wed, 26 Jun 2019 12:33:46 +0300 Subject: [PATCH 11/11] MXS-2521: Detect COM_STMT_EXECUTE without metadata If a COM_STMT_EXECUTE has no metadata in it and it has more than one parameter, it must be routed to the same backend where the previous COM_STMT_EXECUTE with the same ID was routed to. This prevents MDEV-19811 that is triggered by MaxScale routing the queries to different backends. --- include/maxscale/queryclassifier.hh | 26 +++++- server/core/queryclassifier.cc | 85 ++++++++++++++++++- .../readwritesplit/rwsplit_route_stmt.cc | 4 +- .../readwritesplit/rwsplit_session_cmd.cc | 2 +- 4 files changed, 105 insertions(+), 12 deletions(-) diff --git a/include/maxscale/queryclassifier.hh b/include/maxscale/queryclassifier.hh index 3af22b6a5..78ea4fed6 100644 --- a/include/maxscale/queryclassifier.hh +++ b/include/maxscale/queryclassifier.hh @@ -234,6 +234,17 @@ public: return qc_query_is_type(m_route_info.type_mask(), QUERY_TYPE_BEGIN_TRX); } + /** + * Whether the current binary protocol statement is a continuation of a previously executed statement. + * + * All COM_STMT_FETCH are continuations of a previously executed COM_STMT_EXECUTE. A COM_STMT_EXECUTE can + * be a continuation if it has parameters but it doesn't provide the metadata for them. + */ + bool is_ps_continuation() const + { + return m_ps_continuation; + } + /** * @brief Store and process a prepared statement * @@ -251,12 +262,16 @@ public: void ps_erase(GWBUF* buffer); /** - * @brief Store a mapping from an external id to the corresponding internal id + * @brief Store a prepared statement response * - * @param external_id The external id as seen by the client. - * @param internal_id The corresponding internal id. + * The response maps the internal ID to the external ID that is given to the client. It also collects + * the number of parameters in the prepared statement which are required in some cases in the routing + * process. + * + * @param internal_id The internal id (i.e. the session command number) + * @param buffer The buffer containing the OK response to a COM_STMT_PREPARE */ - void ps_id_internal_put(uint32_t external_id, uint32_t internal_id); + void ps_store_response(uint32_t internal_id, GWBUF* buffer); /** * @brief Update the current RouteInfo. @@ -373,6 +388,8 @@ private: uint8_t packet_type, uint32_t* qtype); + bool query_continues_ps(uint8_t cmd, uint32_t stmt_id, GWBUF* buffer); + private: class PSManager; typedef std::shared_ptr SPSManager; @@ -397,5 +414,6 @@ private: HandleMap m_ps_handles; /** External ID to internal ID */ RouteInfo m_route_info; bool m_trx_is_read_only; + bool m_ps_continuation; }; } diff --git a/server/core/queryclassifier.cc b/server/core/queryclassifier.cc index 7ebe25395..84ccd41d4 100644 --- a/server/core/queryclassifier.cc +++ b/server/core/queryclassifier.cc @@ -75,6 +75,19 @@ uint32_t qc_mysql_extract_ps_id(GWBUF* buffer) return rval; } +uint16_t qc_extract_ps_param_count(GWBUF* buffer) +{ + uint16_t rval = 0; + uint8_t params[MYSQL_PS_PARAMS_SIZE]; + + if (gwbuf_copy_data(buffer, MYSQL_PS_PARAMS_OFFSET, sizeof(params), params) == sizeof(params)) + { + rval = gw_mysql_get_byte2(params); + } + + return rval; +} + bool have_semicolon(const char* ptr, int len) { for (int i = 0; i < len; i++) @@ -265,7 +278,7 @@ public: break; case MXS_COM_STMT_PREPARE: - m_binary_ps[id] = get_prepare_type(buffer); + m_binary_ps[id].type = get_prepare_type(buffer); break; default: @@ -281,7 +294,7 @@ public: if (it != m_binary_ps.end()) { - rval = it->second; + rval = it->second.type; } else { @@ -342,8 +355,32 @@ public: } } + void set_param_count(uint32_t id, uint16_t param_count) + { + m_binary_ps[id].param_count = param_count; + } + + uint16_t param_count(uint32_t id) const + { + uint16_t rval = 0; + auto it = m_binary_ps.find(id); + + if (it != m_binary_ps.end()) + { + rval = it->second.param_count; + } + + return rval; + } + private: - typedef std::unordered_map BinaryPSMap; + struct BinaryPS + { + uint32_t type = 0; + uint16_t param_count = 0; + }; + + typedef std::unordered_map BinaryPSMap; typedef std::unordered_map TextPSMap; private: @@ -368,6 +405,7 @@ QueryClassifier::QueryClassifier(Handler* pHandler, , m_multi_statements_allowed(are_multi_statements_allowed(pSession)) , m_sPs_manager(new PSManager) , m_trx_is_read_only(true) + , m_ps_continuation(false) { } @@ -622,9 +660,15 @@ uint32_t QueryClassifier::ps_id_internal_get(GWBUF* pBuffer) return internal_id; } -void QueryClassifier::ps_id_internal_put(uint32_t external_id, uint32_t internal_id) +void QueryClassifier::ps_store_response(uint32_t internal_id, GWBUF* buffer) { + auto external_id = qc_mysql_extract_ps_id(buffer); m_ps_handles[external_id] = internal_id; + + if (auto param_count = qc_extract_ps_param_count(buffer)) + { + m_sPs_manager->set_param_count(internal_id, param_count); + } } void QueryClassifier::log_transaction_status(GWBUF* querybuf, uint32_t qtype) @@ -909,6 +953,38 @@ QueryClassifier::current_target_t QueryClassifier::handle_multi_temp_and_load( return rv; } +bool QueryClassifier::query_continues_ps(uint8_t cmd, uint32_t stmt_id, GWBUF* buffer) +{ + bool rval = false; + + if (cmd == COM_STMT_FETCH) + { + // COM_STMT_FETCH should always go to the same target as the COM_STMT_EXECUTE + rval = true; + } + else if (cmd == MXS_COM_STMT_EXECUTE) + { + if (auto params = m_sPs_manager->param_count(stmt_id)) + { + size_t types_offset = MYSQL_HEADER_LEN + 1 + 4 + 1 + 4 + ((params + 7) / 8); + uint8_t have_types = 0; + + if (gwbuf_copy_data(buffer, types_offset, 1, &have_types)) + { + if (have_types == 0) + { + // A previous COM_STMT_EXECUTE provided the field types, and this one relies on the + // previous one. This means that this query must be routed to the same server where the + // previous COM_STMT_EXECUTE was routed. + rval = true; + } + } + } + } + + return rval; +} + QueryClassifier::RouteInfo QueryClassifier::update_route_info( QueryClassifier::current_target_t current_target, GWBUF* pBuffer) @@ -1005,6 +1081,7 @@ QueryClassifier::RouteInfo QueryClassifier::update_route_info( { stmt_id = ps_id_internal_get(pBuffer); type_mask = ps_get_type(stmt_id); + m_ps_continuation = query_continues_ps(command, stmt_id, pBuffer); } route_target = get_route_target(command, type_mask); diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc index da86d25fe..d88ae1ca7 100644 --- a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc @@ -895,10 +895,8 @@ SRWBackend RWSplitSession::handle_slave_is_target(uint8_t cmd, uint32_t stmt_id) int rlag_max = get_max_replication_lag(); SRWBackend target; - if (cmd == MXS_COM_STMT_FETCH) + if (m_qc.is_ps_continuation()) { - /** The COM_STMT_FETCH must be executed on the same server as the - * COM_STMT_EXECUTE was executed on */ ExecMap::iterator it = m_exec_map.find(stmt_id); if (it != m_exec_map.end()) diff --git a/server/modules/routing/readwritesplit/rwsplit_session_cmd.cc b/server/modules/routing/readwritesplit/rwsplit_session_cmd.cc index 57963013a..762cd10ef 100644 --- a/server/modules/routing/readwritesplit/rwsplit_session_cmd.cc +++ b/server/modules/routing/readwritesplit/rwsplit_session_cmd.cc @@ -126,7 +126,7 @@ void RWSplitSession::process_sescmd_response(SRWBackend& backend, GWBUF** ppPack { /** Map the returned response to the internal ID */ MXS_INFO("PS ID %u maps to internal ID %lu", resp.id, id); - m_qc.ps_id_internal_put(resp.id, id); + m_qc.ps_store_response(id, *ppPacket); } // Discard any slave connections that did not return the same result