From 5eba688c1bcd194e7ab2d0a2e18f2bcc2354cf48 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Wed, 26 Jun 2019 12:33:46 +0300 Subject: [PATCH] MXS-2521: Detect COM_STMT_EXECUTE without metadata If a COM_STMT_EXECUTE has no metadata in it and it has more than one parameter, it must be routed to the same backend where the previous COM_STMT_EXECUTE with the same ID was routed to. This prevents MDEV-19811 that is triggered by MaxScale routing the queries to different backends. --- include/maxscale/queryclassifier.hh | 26 +++++- server/core/queryclassifier.cc | 85 ++++++++++++++++++- .../readwritesplit/rwsplit_route_stmt.cc | 4 +- .../readwritesplit/rwsplit_session_cmd.cc | 2 +- 4 files changed, 105 insertions(+), 12 deletions(-) diff --git a/include/maxscale/queryclassifier.hh b/include/maxscale/queryclassifier.hh index 3af22b6a5..78ea4fed6 100644 --- a/include/maxscale/queryclassifier.hh +++ b/include/maxscale/queryclassifier.hh @@ -234,6 +234,17 @@ public: return qc_query_is_type(m_route_info.type_mask(), QUERY_TYPE_BEGIN_TRX); } + /** + * Whether the current binary protocol statement is a continuation of a previously executed statement. + * + * All COM_STMT_FETCH are continuations of a previously executed COM_STMT_EXECUTE. A COM_STMT_EXECUTE can + * be a continuation if it has parameters but it doesn't provide the metadata for them. + */ + bool is_ps_continuation() const + { + return m_ps_continuation; + } + /** * @brief Store and process a prepared statement * @@ -251,12 +262,16 @@ public: void ps_erase(GWBUF* buffer); /** - * @brief Store a mapping from an external id to the corresponding internal id + * @brief Store a prepared statement response * - * @param external_id The external id as seen by the client. - * @param internal_id The corresponding internal id. + * The response maps the internal ID to the external ID that is given to the client. It also collects + * the number of parameters in the prepared statement which are required in some cases in the routing + * process. + * + * @param internal_id The internal id (i.e. the session command number) + * @param buffer The buffer containing the OK response to a COM_STMT_PREPARE */ - void ps_id_internal_put(uint32_t external_id, uint32_t internal_id); + void ps_store_response(uint32_t internal_id, GWBUF* buffer); /** * @brief Update the current RouteInfo. @@ -373,6 +388,8 @@ private: uint8_t packet_type, uint32_t* qtype); + bool query_continues_ps(uint8_t cmd, uint32_t stmt_id, GWBUF* buffer); + private: class PSManager; typedef std::shared_ptr SPSManager; @@ -397,5 +414,6 @@ private: HandleMap m_ps_handles; /** External ID to internal ID */ RouteInfo m_route_info; bool m_trx_is_read_only; + bool m_ps_continuation; }; } diff --git a/server/core/queryclassifier.cc b/server/core/queryclassifier.cc index 7ebe25395..84ccd41d4 100644 --- a/server/core/queryclassifier.cc +++ b/server/core/queryclassifier.cc @@ -75,6 +75,19 @@ uint32_t qc_mysql_extract_ps_id(GWBUF* buffer) return rval; } +uint16_t qc_extract_ps_param_count(GWBUF* buffer) +{ + uint16_t rval = 0; + uint8_t params[MYSQL_PS_PARAMS_SIZE]; + + if (gwbuf_copy_data(buffer, MYSQL_PS_PARAMS_OFFSET, sizeof(params), params) == sizeof(params)) + { + rval = gw_mysql_get_byte2(params); + } + + return rval; +} + bool have_semicolon(const char* ptr, int len) { for (int i = 0; i < len; i++) @@ -265,7 +278,7 @@ public: break; case MXS_COM_STMT_PREPARE: - m_binary_ps[id] = get_prepare_type(buffer); + m_binary_ps[id].type = get_prepare_type(buffer); break; default: @@ -281,7 +294,7 @@ public: if (it != m_binary_ps.end()) { - rval = it->second; + rval = it->second.type; } else { @@ -342,8 +355,32 @@ public: } } + void set_param_count(uint32_t id, uint16_t param_count) + { + m_binary_ps[id].param_count = param_count; + } + + uint16_t param_count(uint32_t id) const + { + uint16_t rval = 0; + auto it = m_binary_ps.find(id); + + if (it != m_binary_ps.end()) + { + rval = it->second.param_count; + } + + return rval; + } + private: - typedef std::unordered_map BinaryPSMap; + struct BinaryPS + { + uint32_t type = 0; + uint16_t param_count = 0; + }; + + typedef std::unordered_map BinaryPSMap; typedef std::unordered_map TextPSMap; private: @@ -368,6 +405,7 @@ QueryClassifier::QueryClassifier(Handler* pHandler, , m_multi_statements_allowed(are_multi_statements_allowed(pSession)) , m_sPs_manager(new PSManager) , m_trx_is_read_only(true) + , m_ps_continuation(false) { } @@ -622,9 +660,15 @@ uint32_t QueryClassifier::ps_id_internal_get(GWBUF* pBuffer) return internal_id; } -void QueryClassifier::ps_id_internal_put(uint32_t external_id, uint32_t internal_id) +void QueryClassifier::ps_store_response(uint32_t internal_id, GWBUF* buffer) { + auto external_id = qc_mysql_extract_ps_id(buffer); m_ps_handles[external_id] = internal_id; + + if (auto param_count = qc_extract_ps_param_count(buffer)) + { + m_sPs_manager->set_param_count(internal_id, param_count); + } } void QueryClassifier::log_transaction_status(GWBUF* querybuf, uint32_t qtype) @@ -909,6 +953,38 @@ QueryClassifier::current_target_t QueryClassifier::handle_multi_temp_and_load( return rv; } +bool QueryClassifier::query_continues_ps(uint8_t cmd, uint32_t stmt_id, GWBUF* buffer) +{ + bool rval = false; + + if (cmd == COM_STMT_FETCH) + { + // COM_STMT_FETCH should always go to the same target as the COM_STMT_EXECUTE + rval = true; + } + else if (cmd == MXS_COM_STMT_EXECUTE) + { + if (auto params = m_sPs_manager->param_count(stmt_id)) + { + size_t types_offset = MYSQL_HEADER_LEN + 1 + 4 + 1 + 4 + ((params + 7) / 8); + uint8_t have_types = 0; + + if (gwbuf_copy_data(buffer, types_offset, 1, &have_types)) + { + if (have_types == 0) + { + // A previous COM_STMT_EXECUTE provided the field types, and this one relies on the + // previous one. This means that this query must be routed to the same server where the + // previous COM_STMT_EXECUTE was routed. + rval = true; + } + } + } + } + + return rval; +} + QueryClassifier::RouteInfo QueryClassifier::update_route_info( QueryClassifier::current_target_t current_target, GWBUF* pBuffer) @@ -1005,6 +1081,7 @@ QueryClassifier::RouteInfo QueryClassifier::update_route_info( { stmt_id = ps_id_internal_get(pBuffer); type_mask = ps_get_type(stmt_id); + m_ps_continuation = query_continues_ps(command, stmt_id, pBuffer); } route_target = get_route_target(command, type_mask); diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc index da86d25fe..d88ae1ca7 100644 --- a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc @@ -895,10 +895,8 @@ SRWBackend RWSplitSession::handle_slave_is_target(uint8_t cmd, uint32_t stmt_id) int rlag_max = get_max_replication_lag(); SRWBackend target; - if (cmd == MXS_COM_STMT_FETCH) + if (m_qc.is_ps_continuation()) { - /** The COM_STMT_FETCH must be executed on the same server as the - * COM_STMT_EXECUTE was executed on */ ExecMap::iterator it = m_exec_map.find(stmt_id); if (it != m_exec_map.end()) diff --git a/server/modules/routing/readwritesplit/rwsplit_session_cmd.cc b/server/modules/routing/readwritesplit/rwsplit_session_cmd.cc index 57963013a..762cd10ef 100644 --- a/server/modules/routing/readwritesplit/rwsplit_session_cmd.cc +++ b/server/modules/routing/readwritesplit/rwsplit_session_cmd.cc @@ -126,7 +126,7 @@ void RWSplitSession::process_sescmd_response(SRWBackend& backend, GWBUF** ppPack { /** Map the returned response to the internal ID */ MXS_INFO("PS ID %u maps to internal ID %lu", resp.id, id); - m_qc.ps_id_internal_put(resp.id, id); + m_qc.ps_store_response(id, *ppPacket); } // Discard any slave connections that did not return the same result