From 3c4e1e3b4b82f0384298bb8f12e40f8f94a45c7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Wed, 21 Jun 2017 16:27:31 +0300 Subject: [PATCH] MXS-852: Use stored query type for COM_STMT_EXECUTE When a COM_STMT_EXECUTE or a COM_STMT_SEND_LONG_DATA command is executed, the query type of the prepared statement is used. This allows read-only prepared statements to be load balanced across slaves. --- include/maxscale/backend.hh | 2 +- include/maxscale/protocol/mysql.h | 10 ++-- server/modules/protocol/MySQL/mysql_common.c | 2 +- .../routing/readwritesplit/readwritesplit.hh | 18 ++++++ .../readwritesplit/rwsplit_internal.hh | 3 +- .../readwritesplit/rwsplit_route_stmt.cc | 57 +++++++++++-------- 6 files changed, 61 insertions(+), 31 deletions(-) diff --git a/include/maxscale/backend.hh b/include/maxscale/backend.hh index a7c84911c..b3df34430 100644 --- a/include/maxscale/backend.hh +++ b/include/maxscale/backend.hh @@ -159,7 +159,7 @@ public: * * @return True if data was written successfully */ - bool write(GWBUF* buffer, response_type type = EXPECT_RESPONSE); + virtual bool write(GWBUF* buffer, response_type type = EXPECT_RESPONSE); /** * @brief Write an authentication switch request to the backend server diff --git a/include/maxscale/protocol/mysql.h b/include/maxscale/protocol/mysql.h index 7b1ca1e73..8ece64bfe 100644 --- a/include/maxscale/protocol/mysql.h +++ b/include/maxscale/protocol/mysql.h @@ -537,12 +537,14 @@ uint8_t mxs_mysql_get_command(GWBUF* buffer); bool mxs_mysql_extract_ps_response(GWBUF* buffer, MXS_PS_RESPONSE* out); /** - * @brief Extract the ID of a COM_STMT_EXECUTE + * @brief Extract the ID from a COM_STMT command * - * @param buffer Buffer containing a COM_STMT_EXECUTE packet + * All the COM_STMT type commands store the statement ID in the same place. * - * @return The ID of the prepared statement being executed or 0 on failure + * @param buffer Buffer containing one of the COM_STMT commands (not COM_STMT_PREPARE) + * + * @return The statement ID */ -uint32_t mxs_mysql_extract_execute(GWBUF* buffer); +uint32_t mxs_mysql_extract_ps_id(GWBUF* buffer); MXS_END_DECLS diff --git a/server/modules/protocol/MySQL/mysql_common.c b/server/modules/protocol/MySQL/mysql_common.c index e39e1f948..839e313e0 100644 --- a/server/modules/protocol/MySQL/mysql_common.c +++ b/server/modules/protocol/MySQL/mysql_common.c @@ -1637,7 +1637,7 @@ bool mxs_mysql_extract_ps_response(GWBUF* buffer, MXS_PS_RESPONSE* out) return rval; } -uint32_t mxs_mysql_extract_execute(GWBUF* buffer) +uint32_t mxs_mysql_extract_ps_id(GWBUF* buffer) { uint32_t rval = 0; uint8_t id[MYSQL_PS_ID_SIZE]; diff --git a/server/modules/routing/readwritesplit/readwritesplit.hh b/server/modules/routing/readwritesplit/readwritesplit.hh index 5dea83441..aba1aec4f 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.hh +++ b/server/modules/routing/readwritesplit/readwritesplit.hh @@ -32,6 +32,7 @@ #include #include #include +#include enum backend_type_t { @@ -213,6 +214,23 @@ public: return 0; } + bool write(GWBUF* buffer, response_type type = EXPECT_RESPONSE, uint64_t id = 0) + { + if (id) + { + BackendHandleMap::iterator it = m_ps_handles.find(id); + + if (it != m_ps_handles.end()) + { + /** Replace the client handle with the real PS handle */ + uint8_t* ptr = GWBUF_DATA(buffer) + MYSQL_PS_ID_OFFSET; + gw_mysql_set_byte4(ptr, it->second); + } + } + + return mxs::Backend::write(buffer); + } + private: reply_state_t m_reply_state; BackendHandleMap m_ps_handles; /**< Internal ID to backend PS handle mapping */ diff --git a/server/modules/routing/readwritesplit/rwsplit_internal.hh b/server/modules/routing/readwritesplit/rwsplit_internal.hh index fae399edd..4ef396ade 100644 --- a/server/modules/routing/readwritesplit/rwsplit_internal.hh +++ b/server/modules/routing/readwritesplit/rwsplit_internal.hh @@ -75,7 +75,8 @@ SRWBackend handle_slave_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses bool handle_master_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, SRWBackend* dest); bool handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, - GWBUF *querybuf, SRWBackend& target, bool store); + GWBUF *querybuf, SRWBackend& target, + bool store, uint64_t stmt_id); bool route_session_write(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, uint8_t command, uint32_t type); diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc index 05d80b91a..62ee2bbd6 100644 --- a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc @@ -91,6 +91,28 @@ void handle_connection_keepalive(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, ss_dassert(nserv < rses->rses_nbackends); } +static inline bool is_ps_command(uint8_t cmd) +{ + return cmd == MYSQL_COM_STMT_EXECUTE || + cmd == MYSQL_COM_STMT_SEND_LONG_DATA; +} + +uint64_t get_stmt_id(ROUTER_CLIENT_SES* rses, GWBUF* buffer) +{ + uint64_t rval = 0; + + // All COM_STMT type statements store the ID in the same place + uint32_t id = mxs_mysql_extract_ps_id(buffer); + ClientHandleMap::iterator it = rses->ps_handles.find(id); + + if (it != rses->ps_handles.end()) + { + rval = it->second; + } + + return rval; +} + /** * Routing function. Find out query type, backend type, and target DCB(s). * Then route query to found target(s). @@ -113,6 +135,7 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, /* packet_type is a problem as it is MySQL specific */ uint8_t command = determine_packet_type(querybuf, &non_empty_packet); uint32_t qtype = determine_query_type(querybuf, command, non_empty_packet); + uint64_t stmt_id = 0; if (non_empty_packet) { @@ -148,17 +171,10 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, std::string id = extract_text_ps_id(querybuf); qtype = rses->ps_manager.get_type(id); } - else if (command == MYSQL_COM_STMT_EXECUTE) + else if (is_ps_command(command)) { - uint32_t id = mxs_mysql_extract_execute(querybuf); - ClientHandleMap::iterator it = rses->ps_handles.find(id); - - if (it != rses->ps_handles.end()) - { - char *qtypestr = qc_typemask_to_string(rses->ps_manager.get_type(it->second)); - MXS_INFO("Client handle %u maps to %lu of type %s", id, it->second, qtypestr); - MXS_FREE(qtypestr); - } + stmt_id = get_stmt_id(rses, querybuf); + qtype = rses->ps_manager.get_type(stmt_id); } route_target = get_route_target(rses, qtype, querybuf->hint); @@ -217,7 +233,7 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, if (target && succp) /*< Have DCB of the target backend */ { ss_dassert(!store_stmt || TARGET_IS_SLAVE(route_target)); - handle_got_target(inst, rses, querybuf, target, store_stmt); + handle_got_target(inst, rses, querybuf, target, store_stmt, stmt_id); } } @@ -1023,20 +1039,13 @@ static inline bool query_creates_reply(mysql_server_cmd_t cmd) } /** - * @brief Handle got a target + * @brief Handle writing to a target server * - * One of the possible types of handling required when a request is routed - * - * @param inst Router instance - * @param ses Router session - * @param querybuf Buffer containing query to be routed - * @param target_dcb DCB for the target server - * - * @return bool - true if succeeded, false otherwise + * @return True on success */ -bool -handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, - GWBUF *querybuf, SRWBackend& target, bool store) +bool handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, + GWBUF *querybuf, SRWBackend& target, + bool store, uint64_t stmt_id) { /** * If the transaction is READ ONLY set forced_node to this backend. @@ -1065,7 +1074,7 @@ handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, response = mxs::Backend::EXPECT_RESPONSE; } - if (target->write(gwbuf_clone(querybuf), response)) + if (target->write(gwbuf_clone(querybuf), response, stmt_id)) { if (store && !session_store_stmt(rses->client_dcb->session, querybuf, target->server())) {