From 5fc30740b74dd3d9f12d412c9e77be28156e7a6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Wed, 21 Jun 2017 18:22:16 +0300 Subject: [PATCH] MXS-852: Store the internal ID in the buffer If the internal ID is stored in the buffer when it is moving inside the readwritesplit router, the RWBackend can manage the execution of all commands with a statement ID by replacing the stored ID with the correct value. --- include/maxscale/buffer.h | 15 ++++++++ include/maxscale/session_command.hh | 7 ++-- server/core/backend.cc | 2 +- server/core/buffer.cc | 22 ++++++++++++ server/core/session_command.cc | 7 ++-- .../routing/readwritesplit/readwritesplit.hh | 36 ++++++++++++------- .../readwritesplit/rwsplit_internal.hh | 5 ++- .../routing/readwritesplit/rwsplit_ps.cc | 11 +++--- .../readwritesplit/rwsplit_route_stmt.cc | 33 ++++++++--------- 9 files changed, 93 insertions(+), 45 deletions(-) diff --git a/include/maxscale/buffer.h b/include/maxscale/buffer.h index be21992a5..067c867ce 100644 --- a/include/maxscale/buffer.h +++ b/include/maxscale/buffer.h @@ -212,6 +212,21 @@ extern void gwbuf_free(GWBUF *buf); */ extern GWBUF *gwbuf_clone(GWBUF *buf); +/** + * @brief Deep clone a GWBUF + * + * Clone the data inside a GWBUF into a new buffer. The created buffer has its + * own internal buffer and any modifications to the deep cloned buffer will not + * reflect on the original one. Any buffer objects attached to the original buffer + * will not be copied. Only the buffer type of the original buffer will be copied + * over to the cloned buffer. + * + * @param buf Buffer to clone + * + * @return Deep copy of @c buf or NULL on error + */ +extern GWBUF* gwbuf_deep_clone(const GWBUF* buf); + /** * Compare two GWBUFs. Two GWBUFs are considered identical if their * content is identical, irrespective of whether one is segmented and diff --git a/include/maxscale/session_command.hh b/include/maxscale/session_command.hh index 59c198f43..5cd1e8350 100644 --- a/include/maxscale/session_command.hh +++ b/include/maxscale/session_command.hh @@ -54,10 +54,11 @@ public: uint64_t get_position() const; /** - * @brief Creates a copy of the internal buffer - * @return A copy of the internal buffer + * @brief Creates a deep copy of the internal buffer + * + * @return A deep copy of the internal buffer or NULL on error */ - mxs::Buffer copy_buffer() const; + GWBUF* deep_copy_buffer(); /** * @brief Create a new session command diff --git a/server/core/backend.cc b/server/core/backend.cc index 133ceefc6..ca373e38e 100644 --- a/server/core/backend.cc +++ b/server/core/backend.cc @@ -86,7 +86,7 @@ bool Backend::execute_session_command() SessionCommandList::iterator iter = m_session_commands.begin(); SessionCommand& sescmd = *(*iter); - GWBUF *buffer = sescmd.copy_buffer().release(); + GWBUF *buffer = sescmd.deep_copy_buffer(); bool rval = false; switch (sescmd.get_command()) diff --git a/server/core/buffer.cc b/server/core/buffer.cc index e77cbc1ab..b93adcf41 100644 --- a/server/core/buffer.cc +++ b/server/core/buffer.cc @@ -352,6 +352,28 @@ GWBUF* gwbuf_clone(GWBUF* buf) return rval; } +GWBUF* gwbuf_deep_clone(const GWBUF* buf) +{ + GWBUF* rval = NULL; + + if (buf) + { + size_t buflen = gwbuf_length(buf); + rval = gwbuf_alloc(buflen); + + if (rval && gwbuf_copy_data(buf, 0, buflen, GWBUF_DATA(rval)) == buflen) + { + rval->gwbuf_type = buf->gwbuf_type; + } + else + { + gwbuf_free(rval); + rval = NULL; + } + } + + return rval; +} static GWBUF *gwbuf_clone_portion(GWBUF *buf, size_t start_offset, diff --git a/server/core/session_command.cc b/server/core/session_command.cc index c663ee0ff..836a818bb 100644 --- a/server/core/session_command.cc +++ b/server/core/session_command.cc @@ -38,9 +38,12 @@ uint64_t SessionCommand::get_position() const return m_pos; } -Buffer SessionCommand::copy_buffer() const +GWBUF* SessionCommand::deep_copy_buffer() { - return m_buffer; + GWBUF* temp = m_buffer.release(); + GWBUF* rval = gwbuf_deep_clone(temp); + m_buffer.reset(temp); + return rval; } SessionCommand::SessionCommand(GWBUF *buffer, uint64_t id): diff --git a/server/modules/routing/readwritesplit/readwritesplit.hh b/server/modules/routing/readwritesplit/readwritesplit.hh index aba1aec4f..dafc5752e 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.hh +++ b/server/modules/routing/readwritesplit/readwritesplit.hh @@ -155,8 +155,17 @@ struct rwsplit_config_t * been idle for too long */ }; -typedef std::map BackendHandleMap; -typedef std::map ClientHandleMap; +static inline bool is_ps_command(uint8_t cmd) +{ + return cmd == MYSQL_COM_STMT_EXECUTE || + cmd == MYSQL_COM_STMT_SEND_LONG_DATA || + cmd == MYSQL_COM_STMT_CLOSE || + cmd == MYSQL_COM_STMT_FETCH || + cmd == MYSQL_COM_STMT_RESET; +} + +typedef std::map BackendHandleMap; /** Internal ID to external ID */ +typedef std::map ClientHandleMap; /** External ID to internal ID */ class RWBackend: public mxs::Backend { @@ -196,15 +205,15 @@ public: return rval; } - void add_ps_handle(uint64_t id, uint32_t handle) + void add_ps_handle(uint32_t id, uint32_t handle) { m_ps_handles[id] = handle; - MXS_INFO("PS response for %s: %lu -> %u", name(), id, handle); + MXS_INFO("PS response for %s: %u -> %u", name(), id, handle); } - uint32_t get_ps_handle(uint64_t id) const + uint32_t get_ps_handle(uint32_t id) const { - HandleMap::const_iterator it = m_ps_handles.find(id); + BackendHandleMap::const_iterator it = m_ps_handles.find(id); if (it != m_ps_handles.end()) { @@ -214,10 +223,13 @@ public: return 0; } - bool write(GWBUF* buffer, response_type type = EXPECT_RESPONSE, uint64_t id = 0) + bool write(GWBUF* buffer, response_type type = EXPECT_RESPONSE) { - if (id) + uint8_t cmd = mxs_mysql_get_command(buffer); + + if (is_ps_command(cmd)) { + uint32_t id = mxs_mysql_extract_ps_id(buffer); BackendHandleMap::iterator it = m_ps_handles.find(id); if (it != m_ps_handles.end()) @@ -237,7 +249,7 @@ private: }; /** Prepared statement ID to type maps for text protocols */ -typedef std::tr1::unordered_map BinaryPSMap; +typedef std::tr1::unordered_map BinaryPSMap; typedef std::tr1::unordered_map TextPSMap; class PSManager @@ -256,7 +268,7 @@ public: * prepared statement * @param id The unique ID for this statement */ - void store(GWBUF* buffer, uint64_t id); + void store(GWBUF* buffer, uint32_t id); /** * @brief Get the type of a stored prepared statement @@ -266,7 +278,7 @@ public: * * @return The type of the prepared statement */ - uint32_t get_type(uint64_t id) const; + uint32_t get_type(uint32_t id) const; uint32_t get_type(std::string id) const; /** @@ -275,7 +287,7 @@ public: * @param id Statement identifier to remove */ void erase(std::string id); - void erase(uint64_t id); + void erase(uint32_t id); private: BinaryPSMap m_binary_ps; diff --git a/server/modules/routing/readwritesplit/rwsplit_internal.hh b/server/modules/routing/readwritesplit/rwsplit_internal.hh index 4ef396ade..33f15a6d2 100644 --- a/server/modules/routing/readwritesplit/rwsplit_internal.hh +++ b/server/modules/routing/readwritesplit/rwsplit_internal.hh @@ -65,7 +65,7 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, GWBUF *querybuf); SRWBackend get_target_backend(ROUTER_CLIENT_SES *rses, backend_type_t btype, char *name, int max_rlag); -route_target_t get_route_target(ROUTER_CLIENT_SES *rses, +route_target_t get_route_target(ROUTER_CLIENT_SES *rses, uint8_t command, uint32_t qtype, HINT *hint); void handle_multi_temp_and_load(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, uint8_t packet_type, uint32_t *qtype); @@ -75,8 +75,7 @@ SRWBackend handle_slave_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses bool handle_master_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, SRWBackend* dest); bool handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, - GWBUF *querybuf, SRWBackend& target, - bool store, uint64_t stmt_id); + GWBUF *querybuf, SRWBackend& target, bool store); bool route_session_write(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, uint8_t command, uint32_t type); diff --git a/server/modules/routing/readwritesplit/rwsplit_ps.cc b/server/modules/routing/readwritesplit/rwsplit_ps.cc index de30536f7..138af7a1a 100644 --- a/server/modules/routing/readwritesplit/rwsplit_ps.cc +++ b/server/modules/routing/readwritesplit/rwsplit_ps.cc @@ -78,11 +78,11 @@ PSManager::~PSManager() { } -void PSManager::erase(uint64_t id) +void PSManager::erase(uint32_t id) { if (m_binary_ps.erase(id) == 0) { - MXS_WARNING("Closing unknown prepared statement with ID %lu", id); + MXS_WARNING("Closing unknown prepared statement with ID %u", id); } } @@ -112,7 +112,7 @@ uint32_t PSManager::get_type(std::string id) const } -uint32_t PSManager::get_type(uint64_t id) const +uint32_t PSManager::get_type(uint32_t id) const { uint32_t rval = QUERY_TYPE_UNKNOWN; BinaryPSMap::const_iterator it = m_binary_ps.find(id); @@ -123,14 +123,13 @@ uint32_t PSManager::get_type(uint64_t id) const } else { - MXS_WARNING("Using unknown prepared statement with ID %lu", id); + MXS_WARNING("Using unknown prepared statement with ID %u", id); } - ss_dassert(rval != QUERY_TYPE_UNKNOWN); return rval; } -void PSManager::store(GWBUF* buffer, uint64_t id) +void PSManager::store(GWBUF* buffer, uint32_t id) { ss_dassert(mxs_mysql_get_command(buffer) == MYSQL_COM_STMT_PREPARE || qc_query_is_type(qc_get_type_mask(buffer), diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc index 62ee2bbd6..b3778a11b 100644 --- a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc @@ -91,15 +91,9 @@ void handle_connection_keepalive(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, ss_dassert(nserv < rses->rses_nbackends); } -static inline bool is_ps_command(uint8_t cmd) +uint32_t get_stmt_id(ROUTER_CLIENT_SES* rses, GWBUF* buffer) { - return cmd == MYSQL_COM_STMT_EXECUTE || - cmd == MYSQL_COM_STMT_SEND_LONG_DATA; -} - -uint64_t get_stmt_id(ROUTER_CLIENT_SES* rses, GWBUF* buffer) -{ - uint64_t rval = 0; + uint32_t rval = 0; // All COM_STMT type statements store the ID in the same place uint32_t id = mxs_mysql_extract_ps_id(buffer); @@ -113,6 +107,12 @@ uint64_t get_stmt_id(ROUTER_CLIENT_SES* rses, GWBUF* buffer) return rval; } +void replace_stmt_id(GWBUF* buffer, uint32_t id) +{ + uint8_t* ptr = GWBUF_DATA(buffer) + MYSQL_PS_ID_OFFSET; + gw_mysql_set_byte4(ptr, id); +} + /** * Routing function. Find out query type, backend type, and target DCB(s). * Then route query to found target(s). @@ -135,7 +135,6 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, /* packet_type is a problem as it is MySQL specific */ uint8_t command = determine_packet_type(querybuf, &non_empty_packet); uint32_t qtype = determine_query_type(querybuf, command, non_empty_packet); - uint64_t stmt_id = 0; if (non_empty_packet) { @@ -163,8 +162,6 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, * eventually to master */ - uint32_t ps_type; - if (command == MYSQL_COM_QUERY && qc_get_operation(querybuf) == QUERY_OP_EXECUTE) { @@ -173,11 +170,12 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, } else if (is_ps_command(command)) { - stmt_id = get_stmt_id(rses, querybuf); + uint32_t stmt_id = get_stmt_id(rses, querybuf); qtype = rses->ps_manager.get_type(stmt_id); + replace_stmt_id(querybuf, stmt_id); } - route_target = get_route_target(rses, qtype, querybuf->hint); + route_target = get_route_target(rses, command, qtype, querybuf->hint); } else { @@ -233,7 +231,7 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, if (target && succp) /*< Have DCB of the target backend */ { ss_dassert(!store_stmt || TARGET_IS_SLAVE(route_target)); - handle_got_target(inst, rses, querybuf, target, store_stmt, stmt_id); + handle_got_target(inst, rses, querybuf, target, store_stmt); } } @@ -541,7 +539,7 @@ SRWBackend get_target_backend(ROUTER_CLIENT_SES *rses, backend_type_t btype, * @return bitfield including the routing target, or the target server name * if the query would otherwise be routed to slave. */ -route_target_t get_route_target(ROUTER_CLIENT_SES *rses, +route_target_t get_route_target(ROUTER_CLIENT_SES *rses, uint8_t command, uint32_t qtype, HINT *hint) { bool trx_active = session_trx_is_active(rses->client_dcb->session); @@ -1044,8 +1042,7 @@ static inline bool query_creates_reply(mysql_server_cmd_t cmd) * @return True on success */ bool handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, - GWBUF *querybuf, SRWBackend& target, - bool store, uint64_t stmt_id) + GWBUF *querybuf, SRWBackend& target, bool store) { /** * If the transaction is READ ONLY set forced_node to this backend. @@ -1074,7 +1071,7 @@ bool handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, response = mxs::Backend::EXPECT_RESPONSE; } - if (target->write(gwbuf_clone(querybuf), response, stmt_id)) + if (target->write(gwbuf_clone(querybuf), response)) { if (store && !session_store_stmt(rses->client_dcb->session, querybuf, target->server())) {