diff --git a/cmake/CheckPlatform.cmake b/cmake/CheckPlatform.cmake index 183e59dfc..a000b668b 100644 --- a/cmake/CheckPlatform.cmake +++ b/cmake/CheckPlatform.cmake @@ -54,7 +54,7 @@ if(HAVE_SYSTEMD) elseif(NOT BUILD_SYSTEM_TESTS) # If systemd is in use, require libsystemd-dev to be installed if(NOT NOT_SYSTEMD_IS_RUNNING) - message( FATAL_ERROR "systemd is running: please install libsystemd-dev" ) + message(FATAL_ERROR "systemd is running: please install libsystemd-dev (DEB) or systemd-devel (RPM)") endif() endif() diff --git a/cmake/package_rpm.cmake b/cmake/package_rpm.cmake index 52bcdc7ff..32afd8c7b 100644 --- a/cmake/package_rpm.cmake +++ b/cmake/package_rpm.cmake @@ -40,9 +40,6 @@ set(CPACK_RPM_USER_FILELIST "${IGNORED_DIRS}") if(TARGET_COMPONENT STREQUAL "core" OR TARGET_COMPONENT STREQUAL "all") set(CPACK_RPM_POST_INSTALL_SCRIPT_FILE ${CMAKE_BINARY_DIR}/postinst) set(CPACK_RPM_POST_UNINSTALL_SCRIPT_FILE ${CMAKE_BINARY_DIR}/postrm) - - # Installing this prevents RPM from deleting the /var/lib/maxscale folder - install(DIRECTORY DESTINATION ${MAXSCALE_VARDIR}/lib/maxscale) endif() if(EXTRA_PACKAGE_DEPENDENCIES) diff --git a/include/maxscale/queryclassifier.hh b/include/maxscale/queryclassifier.hh index 7a22cd305..e25518121 100644 --- a/include/maxscale/queryclassifier.hh +++ b/include/maxscale/queryclassifier.hh @@ -234,6 +234,17 @@ public: return qc_query_is_type(m_route_info.type_mask(), QUERY_TYPE_BEGIN_TRX); } + /** + * Whether the current binary protocol statement is a continuation of a previously executed statement. + * + * All COM_STMT_FETCH are continuations of a previously executed COM_STMT_EXECUTE. A COM_STMT_EXECUTE can + * be a continuation if it has parameters but it doesn't provide the metadata for them. + */ + bool is_ps_continuation() const + { + return m_ps_continuation; + } + /** * @brief Store and process a prepared statement * @@ -251,12 +262,16 @@ public: void ps_erase(GWBUF* buffer); /** - * @brief Store a mapping from an external id to the corresponding internal id + * @brief Store a prepared statement response * - * @param external_id The external id as seen by the client. - * @param internal_id The corresponding internal id. + * The response maps the internal ID to the external ID that is given to the client. It also collects + * the number of parameters in the prepared statement which are required in some cases in the routing + * process. + * + * @param internal_id The internal id (i.e. the session command number) + * @param buffer The buffer containing the OK response to a COM_STMT_PREPARE */ - void ps_id_internal_put(uint32_t external_id, uint32_t internal_id); + void ps_store_response(uint32_t internal_id, GWBUF* buffer); /** * @brief Update the current RouteInfo. @@ -373,6 +388,8 @@ private: uint8_t packet_type, uint32_t* qtype); + bool query_continues_ps(uint8_t cmd, uint32_t stmt_id, GWBUF* buffer); + private: class PSManager; typedef std::shared_ptr SPSManager; @@ -397,5 +414,6 @@ private: HandleMap m_ps_handles; /** External ID to internal ID */ RouteInfo m_route_info; bool m_trx_is_read_only; + bool m_ps_continuation; }; } diff --git a/maxscale-system-test/mariadb_nodes.cpp b/maxscale-system-test/mariadb_nodes.cpp index c97b4e156..50dcaa997 100644 --- a/maxscale-system-test/mariadb_nodes.cpp +++ b/maxscale-system-test/mariadb_nodes.cpp @@ -179,11 +179,11 @@ void Mariadb_nodes::read_env() // reading start_db_command sprintf(env_name, "%s_%03d_start_db_command", prefix, i); - start_db_command[i] = readenv(env_name, (char *) "service mysql start"); + start_db_command[i] = readenv(env_name, (char *) "systemctl start mariadb || service mysql start"); // reading stop_db_command sprintf(env_name, "%s_%03d_stop_db_command", prefix, i); - stop_db_command[i] = readenv(env_name, (char *) "service mysql stop"); + stop_db_command[i] = readenv(env_name, (char *) "systemctl stop mariadb || service mysql stop"); // reading cleanup_db_command sprintf(env_name, "%s_%03d_cleanup_db_command", prefix, i); diff --git a/maxscale-system-test/mxs2521_double_exec.cpp b/maxscale-system-test/mxs2521_double_exec.cpp index ad110ecfe..c6ef40729 100644 --- a/maxscale-system-test/mxs2521_double_exec.cpp +++ b/maxscale-system-test/mxs2521_double_exec.cpp @@ -5,63 +5,87 @@ #include "testconnections.h" -int main(int argc, char** argv) +void do_test(TestConnections& test, MYSQL* conn) { - TestConnections test(argc, argv); - test.maxscales->connect(); - - auto conn = test.maxscales->conn_rwsplit[0]; - - test.try_query(conn, "DROP TABLE IF EXISTS double_execute;"); - test.try_query(conn, "CREATE TABLE double_execute(a int);"); - test.try_query(conn, "INSERT INTO double_execute VALUES (123), (456)"); - auto stmt = mysql_stmt_init(conn); std::string sql = "select a, @@server_id from double_execute where a = ?"; test.expect(mysql_stmt_prepare(stmt, sql.c_str(), sql.length()) == 0, "Prepare should work: %s", mysql_error(conn)); - int data[2] = {0, 0}; - MYSQL_BIND my_bind[2] = {}; - char is_null = 0; - my_bind[0].buffer_type = MYSQL_TYPE_LONG; - my_bind[0].buffer = &data[0]; - my_bind[0].buffer_length = sizeof(data[0]); - my_bind[0].is_null = &is_null; - my_bind[1].buffer_type = MYSQL_TYPE_LONG; - my_bind[1].buffer = &data[1]; - my_bind[1].buffer_length = sizeof(data[2]); - my_bind[1].is_null = &is_null; - data[1] = 123; - test.expect(mysql_stmt_bind_param(stmt, my_bind) == 0, "Bind: %s", mysql_stmt_error(stmt)); + int data_out = 123; + MYSQL_BIND bind_out; + char null_out = 0; + bind_out.buffer_type = MYSQL_TYPE_LONG; + bind_out.buffer = &data_out; + bind_out.buffer_length = sizeof(data_out); + bind_out.is_null = &null_out; + test.expect(mysql_stmt_bind_param(stmt, &bind_out) == 0, "Bind: %s", mysql_stmt_error(stmt)); // The first execute is done on the master test.try_query(conn, "BEGIN"); test.expect(mysql_stmt_execute(stmt) == 0, "First execute should work: %s", mysql_stmt_error(stmt)); - data[0] = 0; + int data_in[2] = {}; + MYSQL_BIND bind_in[2] = {}; + char null_in[2] = {}; + + for (int i = 0; i < 2; i++) + { + bind_in[i].buffer_type = MYSQL_TYPE_LONG; + bind_in[i].buffer = &data_in[i]; + bind_in[i].buffer_length = sizeof(data_in[i]); + bind_in[i].is_null = &null_in[i]; + } + + mysql_stmt_bind_result(stmt, bind_in); mysql_stmt_store_result(stmt); + test.expect(mysql_stmt_fetch(stmt) == 0, "First fetch of first execute should work"); - test.expect(data[0] == 123, "Query should return one row with value 123: `%d`", data[0]); + test.expect(data_in[0] == 123, "Query should return one row with value 123: `%d`", data_in[0]); test.expect(mysql_stmt_fetch(stmt) != 0, "Second fetch of first execute should NOT work"); + int first_server = data_in[1]; + test.try_query(conn, "COMMIT"); // The second execute goes to a slave, no new parameters are sent in it - data[0] = 123; + memset(data_in, 0, sizeof(data_in)); test.expect(mysql_stmt_execute(stmt) == 0, "Second execute should work: %s", mysql_stmt_error(stmt)); - data[0] = 0; + mysql_stmt_bind_result(stmt, bind_in); mysql_stmt_store_result(stmt); test.expect(mysql_stmt_fetch(stmt) == 0, "First fetch of second execute should work"); - test.expect(data[0] == 123, "Query should return one row with value 123: `%d`", data[0]); + test.expect(data_in[0] == 123, "Query should return one row with value 123: `%d`", data_in[0]); + test.expect(data_in[1] == first_server, + "The query should be routed to the server with server_id %d, not %d", + first_server, data_in[1]); test.expect(mysql_stmt_fetch(stmt) != 0, "Second fetch of second execute should NOT work"); mysql_stmt_close(stmt); +} - test.try_query(conn, "DROP TABLE IF EXISTS double_execute;"); +int main(int argc, char** argv) +{ + TestConnections test(argc, argv); + + test.repl->connect(); + test.maxscales->connect(); + + // Prepare a table + test.try_query(test.repl->nodes[0], "DROP TABLE IF EXISTS double_execute;"); + test.try_query(test.repl->nodes[0], "CREATE TABLE double_execute(a int);"); + test.try_query(test.repl->nodes[0], "INSERT INTO double_execute VALUES (123), (456)"); + test.repl->sync_slaves(); + + test.tprintf("Running test with a direct connection"); + do_test(test, test.repl->nodes[0]); + + test.tprintf("Running test through readwritesplit"); + do_test(test, test.maxscales->conn_rwsplit[0]); + + test.try_query(test.repl->nodes[0], "DROP TABLE IF EXISTS double_execute;"); return test.global_result; } diff --git a/server/core/queryclassifier.cc b/server/core/queryclassifier.cc index c73336e66..58fb1ecc5 100644 --- a/server/core/queryclassifier.cc +++ b/server/core/queryclassifier.cc @@ -75,6 +75,19 @@ uint32_t qc_mysql_extract_ps_id(GWBUF* buffer) return rval; } +uint16_t qc_extract_ps_param_count(GWBUF* buffer) +{ + uint16_t rval = 0; + uint8_t params[MYSQL_PS_PARAMS_SIZE]; + + if (gwbuf_copy_data(buffer, MYSQL_PS_PARAMS_OFFSET, sizeof(params), params) == sizeof(params)) + { + rval = gw_mysql_get_byte2(params); + } + + return rval; +} + bool have_semicolon(const char* ptr, int len) { for (int i = 0; i < len; i++) @@ -265,7 +278,7 @@ public: break; case MXS_COM_STMT_PREPARE: - m_binary_ps[id] = get_prepare_type(buffer); + m_binary_ps[id].type = get_prepare_type(buffer); break; default: @@ -281,7 +294,7 @@ public: if (it != m_binary_ps.end()) { - rval = it->second; + rval = it->second.type; } else { @@ -342,8 +355,32 @@ public: } } + void set_param_count(uint32_t id, uint16_t param_count) + { + m_binary_ps[id].param_count = param_count; + } + + uint16_t param_count(uint32_t id) const + { + uint16_t rval = 0; + auto it = m_binary_ps.find(id); + + if (it != m_binary_ps.end()) + { + rval = it->second.param_count; + } + + return rval; + } + private: - typedef std::unordered_map BinaryPSMap; + struct BinaryPS + { + uint32_t type = 0; + uint16_t param_count = 0; + }; + + typedef std::unordered_map BinaryPSMap; typedef std::unordered_map TextPSMap; private: @@ -368,6 +405,7 @@ QueryClassifier::QueryClassifier(Handler* pHandler, , m_multi_statements_allowed(are_multi_statements_allowed(pSession)) , m_sPs_manager(new PSManager) , m_trx_is_read_only(true) + , m_ps_continuation(false) { } @@ -622,9 +660,15 @@ uint32_t QueryClassifier::ps_id_internal_get(GWBUF* pBuffer) return internal_id; } -void QueryClassifier::ps_id_internal_put(uint32_t external_id, uint32_t internal_id) +void QueryClassifier::ps_store_response(uint32_t internal_id, GWBUF* buffer) { + auto external_id = qc_mysql_extract_ps_id(buffer); m_ps_handles[external_id] = internal_id; + + if (auto param_count = qc_extract_ps_param_count(buffer)) + { + m_sPs_manager->set_param_count(internal_id, param_count); + } } void QueryClassifier::log_transaction_status(GWBUF* querybuf, uint32_t qtype) @@ -909,6 +953,38 @@ QueryClassifier::current_target_t QueryClassifier::handle_multi_temp_and_load( return rv; } +bool QueryClassifier::query_continues_ps(uint8_t cmd, uint32_t stmt_id, GWBUF* buffer) +{ + bool rval = false; + + if (cmd == COM_STMT_FETCH) + { + // COM_STMT_FETCH should always go to the same target as the COM_STMT_EXECUTE + rval = true; + } + else if (cmd == MXS_COM_STMT_EXECUTE) + { + if (auto params = m_sPs_manager->param_count(stmt_id)) + { + size_t types_offset = MYSQL_HEADER_LEN + 1 + 4 + 1 + 4 + ((params + 7) / 8); + uint8_t have_types = 0; + + if (gwbuf_copy_data(buffer, types_offset, 1, &have_types)) + { + if (have_types == 0) + { + // A previous COM_STMT_EXECUTE provided the field types, and this one relies on the + // previous one. This means that this query must be routed to the same server where the + // previous COM_STMT_EXECUTE was routed. + rval = true; + } + } + } + } + + return rval; +} + QueryClassifier::RouteInfo QueryClassifier::update_route_info( QueryClassifier::current_target_t current_target, GWBUF* pBuffer) @@ -1005,6 +1081,7 @@ QueryClassifier::RouteInfo QueryClassifier::update_route_info( { stmt_id = ps_id_internal_get(pBuffer); type_mask = ps_get_type(stmt_id); + m_ps_continuation = query_continues_ps(command, stmt_id, pBuffer); } route_target = get_route_target(command, type_mask); diff --git a/server/modules/protocol/MySQL/mysql_common.cc b/server/modules/protocol/MySQL/mysql_common.cc index 2d9b274c3..f9fd3fd9c 100644 --- a/server/modules/protocol/MySQL/mysql_common.cc +++ b/server/modules/protocol/MySQL/mysql_common.cc @@ -1277,8 +1277,8 @@ bool mxs_mysql_extract_ps_response(GWBUF* buffer, MXS_PS_RESPONSE* out) { bool rval = false; uint8_t id[MYSQL_PS_ID_SIZE]; - uint8_t cols[MYSQL_PS_ID_SIZE]; - uint8_t params[MYSQL_PS_ID_SIZE]; + uint8_t cols[MYSQL_PS_COLS_SIZE]; + uint8_t params[MYSQL_PS_PARAMS_SIZE]; uint8_t warnings[MYSQL_PS_WARN_SIZE]; if (gwbuf_copy_data(buffer, MYSQL_PS_ID_OFFSET, sizeof(id), id) == sizeof(id) diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc index dab71d6dc..79ffc485e 100644 --- a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc @@ -822,10 +822,8 @@ RWBackend* RWSplitSession::handle_slave_is_target(uint8_t cmd, uint32_t stmt_id) int rlag_max = get_max_replication_lag(); RWBackend* target = nullptr; - if (cmd == MXS_COM_STMT_FETCH) + if (m_qc.is_ps_continuation()) { - /** The COM_STMT_FETCH must be executed on the same server as the - * COM_STMT_EXECUTE was executed on */ ExecMap::iterator it = m_exec_map.find(stmt_id); if (it != m_exec_map.end()) diff --git a/server/modules/routing/readwritesplit/rwsplit_session_cmd.cc b/server/modules/routing/readwritesplit/rwsplit_session_cmd.cc index 1ac491d1a..0416150fe 100644 --- a/server/modules/routing/readwritesplit/rwsplit_session_cmd.cc +++ b/server/modules/routing/readwritesplit/rwsplit_session_cmd.cc @@ -128,7 +128,7 @@ void RWSplitSession::process_sescmd_response(RWBackend* backend, GWBUF** ppPacke { /** Map the returned response to the internal ID */ MXS_INFO("PS ID %u maps to internal ID %lu", resp.id, id); - m_qc.ps_id_internal_put(resp.id, id); + m_qc.ps_store_response(id, *ppPacket); } // Discard any slave connections that did not return the same result