diff --git a/include/maxscale/buffer.h b/include/maxscale/buffer.h index be21992a5..067c867ce 100644 --- a/include/maxscale/buffer.h +++ b/include/maxscale/buffer.h @@ -212,6 +212,21 @@ extern void gwbuf_free(GWBUF *buf); */ extern GWBUF *gwbuf_clone(GWBUF *buf); +/** + * @brief Deep clone a GWBUF + * + * Clone the data inside a GWBUF into a new buffer. The created buffer has its + * own internal buffer and any modifications to the deep cloned buffer will not + * reflect on the original one. Any buffer objects attached to the original buffer + * will not be copied. Only the buffer type of the original buffer will be copied + * over to the cloned buffer. + * + * @param buf Buffer to clone + * + * @return Deep copy of @c buf or NULL on error + */ +extern GWBUF* gwbuf_deep_clone(const GWBUF* buf); + /** * Compare two GWBUFs. Two GWBUFs are considered identical if their * content is identical, irrespective of whether one is segmented and diff --git a/include/maxscale/session_command.hh b/include/maxscale/session_command.hh index 59c198f43..5cd1e8350 100644 --- a/include/maxscale/session_command.hh +++ b/include/maxscale/session_command.hh @@ -54,10 +54,11 @@ public: uint64_t get_position() const; /** - * @brief Creates a copy of the internal buffer - * @return A copy of the internal buffer + * @brief Creates a deep copy of the internal buffer + * + * @return A deep copy of the internal buffer or NULL on error */ - mxs::Buffer copy_buffer() const; + GWBUF* deep_copy_buffer(); /** * @brief Create a new session command diff --git a/server/core/backend.cc b/server/core/backend.cc index 133ceefc6..ca373e38e 100644 --- a/server/core/backend.cc +++ b/server/core/backend.cc @@ -86,7 +86,7 @@ bool Backend::execute_session_command() SessionCommandList::iterator iter = m_session_commands.begin(); SessionCommand& sescmd = *(*iter); - GWBUF *buffer = sescmd.copy_buffer().release(); + GWBUF *buffer = sescmd.deep_copy_buffer(); bool rval = false; switch (sescmd.get_command()) diff --git a/server/core/buffer.cc b/server/core/buffer.cc index e77cbc1ab..b93adcf41 100644 --- a/server/core/buffer.cc +++ b/server/core/buffer.cc @@ -352,6 +352,28 @@ GWBUF* gwbuf_clone(GWBUF* buf) return rval; } +GWBUF* gwbuf_deep_clone(const GWBUF* buf) +{ + GWBUF* rval = NULL; + + if (buf) + { + size_t buflen = gwbuf_length(buf); + rval = gwbuf_alloc(buflen); + + if (rval && gwbuf_copy_data(buf, 0, buflen, GWBUF_DATA(rval)) == buflen) + { + rval->gwbuf_type = buf->gwbuf_type; + } + else + { + gwbuf_free(rval); + rval = NULL; + } + } + + return rval; +} static GWBUF *gwbuf_clone_portion(GWBUF *buf, size_t start_offset, diff --git a/server/core/session_command.cc b/server/core/session_command.cc index c663ee0ff..836a818bb 100644 --- a/server/core/session_command.cc +++ b/server/core/session_command.cc @@ -38,9 +38,12 @@ uint64_t SessionCommand::get_position() const return m_pos; } -Buffer SessionCommand::copy_buffer() const +GWBUF* SessionCommand::deep_copy_buffer() { - return m_buffer; + GWBUF* temp = m_buffer.release(); + GWBUF* rval = gwbuf_deep_clone(temp); + m_buffer.reset(temp); + return rval; } SessionCommand::SessionCommand(GWBUF *buffer, uint64_t id): diff --git a/server/modules/routing/readwritesplit/readwritesplit.hh b/server/modules/routing/readwritesplit/readwritesplit.hh index aba1aec4f..dafc5752e 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.hh +++ b/server/modules/routing/readwritesplit/readwritesplit.hh @@ -155,8 +155,17 @@ struct rwsplit_config_t * been idle for too long */ }; -typedef std::map BackendHandleMap; -typedef std::map ClientHandleMap; +static inline bool is_ps_command(uint8_t cmd) +{ + return cmd == MYSQL_COM_STMT_EXECUTE || + cmd == MYSQL_COM_STMT_SEND_LONG_DATA || + cmd == MYSQL_COM_STMT_CLOSE || + cmd == MYSQL_COM_STMT_FETCH || + cmd == MYSQL_COM_STMT_RESET; +} + +typedef std::map BackendHandleMap; /** Internal ID to external ID */ +typedef std::map ClientHandleMap; /** External ID to internal ID */ class RWBackend: public mxs::Backend { @@ -196,15 +205,15 @@ public: return rval; } - void add_ps_handle(uint64_t id, uint32_t handle) + void add_ps_handle(uint32_t id, uint32_t handle) { m_ps_handles[id] = handle; - MXS_INFO("PS response for %s: %lu -> %u", name(), id, handle); + MXS_INFO("PS response for %s: %u -> %u", name(), id, handle); } - uint32_t get_ps_handle(uint64_t id) const + uint32_t get_ps_handle(uint32_t id) const { - HandleMap::const_iterator it = m_ps_handles.find(id); + BackendHandleMap::const_iterator it = m_ps_handles.find(id); if (it != m_ps_handles.end()) { @@ -214,10 +223,13 @@ public: return 0; } - bool write(GWBUF* buffer, response_type type = EXPECT_RESPONSE, uint64_t id = 0) + bool write(GWBUF* buffer, response_type type = EXPECT_RESPONSE) { - if (id) + uint8_t cmd = mxs_mysql_get_command(buffer); + + if (is_ps_command(cmd)) { + uint32_t id = mxs_mysql_extract_ps_id(buffer); BackendHandleMap::iterator it = m_ps_handles.find(id); if (it != m_ps_handles.end()) @@ -237,7 +249,7 @@ private: }; /** Prepared statement ID to type maps for text protocols */ -typedef std::tr1::unordered_map BinaryPSMap; +typedef std::tr1::unordered_map BinaryPSMap; typedef std::tr1::unordered_map TextPSMap; class PSManager @@ -256,7 +268,7 @@ public: * prepared statement * @param id The unique ID for this statement */ - void store(GWBUF* buffer, uint64_t id); + void store(GWBUF* buffer, uint32_t id); /** * @brief Get the type of a stored prepared statement @@ -266,7 +278,7 @@ public: * * @return The type of the prepared statement */ - uint32_t get_type(uint64_t id) const; + uint32_t get_type(uint32_t id) const; uint32_t get_type(std::string id) const; /** @@ -275,7 +287,7 @@ public: * @param id Statement identifier to remove */ void erase(std::string id); - void erase(uint64_t id); + void erase(uint32_t id); private: BinaryPSMap m_binary_ps; diff --git a/server/modules/routing/readwritesplit/rwsplit_internal.hh b/server/modules/routing/readwritesplit/rwsplit_internal.hh index 4ef396ade..33f15a6d2 100644 --- a/server/modules/routing/readwritesplit/rwsplit_internal.hh +++ b/server/modules/routing/readwritesplit/rwsplit_internal.hh @@ -65,7 +65,7 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, GWBUF *querybuf); SRWBackend get_target_backend(ROUTER_CLIENT_SES *rses, backend_type_t btype, char *name, int max_rlag); -route_target_t get_route_target(ROUTER_CLIENT_SES *rses, +route_target_t get_route_target(ROUTER_CLIENT_SES *rses, uint8_t command, uint32_t qtype, HINT *hint); void handle_multi_temp_and_load(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, uint8_t packet_type, uint32_t *qtype); @@ -75,8 +75,7 @@ SRWBackend handle_slave_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses bool handle_master_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, SRWBackend* dest); bool handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, - GWBUF *querybuf, SRWBackend& target, - bool store, uint64_t stmt_id); + GWBUF *querybuf, SRWBackend& target, bool store); bool route_session_write(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, uint8_t command, uint32_t type); diff --git a/server/modules/routing/readwritesplit/rwsplit_ps.cc b/server/modules/routing/readwritesplit/rwsplit_ps.cc index de30536f7..138af7a1a 100644 --- a/server/modules/routing/readwritesplit/rwsplit_ps.cc +++ b/server/modules/routing/readwritesplit/rwsplit_ps.cc @@ -78,11 +78,11 @@ PSManager::~PSManager() { } -void PSManager::erase(uint64_t id) +void PSManager::erase(uint32_t id) { if (m_binary_ps.erase(id) == 0) { - MXS_WARNING("Closing unknown prepared statement with ID %lu", id); + MXS_WARNING("Closing unknown prepared statement with ID %u", id); } } @@ -112,7 +112,7 @@ uint32_t PSManager::get_type(std::string id) const } -uint32_t PSManager::get_type(uint64_t id) const +uint32_t PSManager::get_type(uint32_t id) const { uint32_t rval = QUERY_TYPE_UNKNOWN; BinaryPSMap::const_iterator it = m_binary_ps.find(id); @@ -123,14 +123,13 @@ uint32_t PSManager::get_type(uint64_t id) const } else { - MXS_WARNING("Using unknown prepared statement with ID %lu", id); + MXS_WARNING("Using unknown prepared statement with ID %u", id); } - ss_dassert(rval != QUERY_TYPE_UNKNOWN); return rval; } -void PSManager::store(GWBUF* buffer, uint64_t id) +void PSManager::store(GWBUF* buffer, uint32_t id) { ss_dassert(mxs_mysql_get_command(buffer) == MYSQL_COM_STMT_PREPARE || qc_query_is_type(qc_get_type_mask(buffer), diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc index 62ee2bbd6..b3778a11b 100644 --- a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc @@ -91,15 +91,9 @@ void handle_connection_keepalive(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, ss_dassert(nserv < rses->rses_nbackends); } -static inline bool is_ps_command(uint8_t cmd) +uint32_t get_stmt_id(ROUTER_CLIENT_SES* rses, GWBUF* buffer) { - return cmd == MYSQL_COM_STMT_EXECUTE || - cmd == MYSQL_COM_STMT_SEND_LONG_DATA; -} - -uint64_t get_stmt_id(ROUTER_CLIENT_SES* rses, GWBUF* buffer) -{ - uint64_t rval = 0; + uint32_t rval = 0; // All COM_STMT type statements store the ID in the same place uint32_t id = mxs_mysql_extract_ps_id(buffer); @@ -113,6 +107,12 @@ uint64_t get_stmt_id(ROUTER_CLIENT_SES* rses, GWBUF* buffer) return rval; } +void replace_stmt_id(GWBUF* buffer, uint32_t id) +{ + uint8_t* ptr = GWBUF_DATA(buffer) + MYSQL_PS_ID_OFFSET; + gw_mysql_set_byte4(ptr, id); +} + /** * Routing function. Find out query type, backend type, and target DCB(s). * Then route query to found target(s). @@ -135,7 +135,6 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, /* packet_type is a problem as it is MySQL specific */ uint8_t command = determine_packet_type(querybuf, &non_empty_packet); uint32_t qtype = determine_query_type(querybuf, command, non_empty_packet); - uint64_t stmt_id = 0; if (non_empty_packet) { @@ -163,8 +162,6 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, * eventually to master */ - uint32_t ps_type; - if (command == MYSQL_COM_QUERY && qc_get_operation(querybuf) == QUERY_OP_EXECUTE) { @@ -173,11 +170,12 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, } else if (is_ps_command(command)) { - stmt_id = get_stmt_id(rses, querybuf); + uint32_t stmt_id = get_stmt_id(rses, querybuf); qtype = rses->ps_manager.get_type(stmt_id); + replace_stmt_id(querybuf, stmt_id); } - route_target = get_route_target(rses, qtype, querybuf->hint); + route_target = get_route_target(rses, command, qtype, querybuf->hint); } else { @@ -233,7 +231,7 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, if (target && succp) /*< Have DCB of the target backend */ { ss_dassert(!store_stmt || TARGET_IS_SLAVE(route_target)); - handle_got_target(inst, rses, querybuf, target, store_stmt, stmt_id); + handle_got_target(inst, rses, querybuf, target, store_stmt); } } @@ -541,7 +539,7 @@ SRWBackend get_target_backend(ROUTER_CLIENT_SES *rses, backend_type_t btype, * @return bitfield including the routing target, or the target server name * if the query would otherwise be routed to slave. */ -route_target_t get_route_target(ROUTER_CLIENT_SES *rses, +route_target_t get_route_target(ROUTER_CLIENT_SES *rses, uint8_t command, uint32_t qtype, HINT *hint) { bool trx_active = session_trx_is_active(rses->client_dcb->session); @@ -1044,8 +1042,7 @@ static inline bool query_creates_reply(mysql_server_cmd_t cmd) * @return True on success */ bool handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, - GWBUF *querybuf, SRWBackend& target, - bool store, uint64_t stmt_id) + GWBUF *querybuf, SRWBackend& target, bool store) { /** * If the transaction is READ ONLY set forced_node to this backend. @@ -1074,7 +1071,7 @@ bool handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, response = mxs::Backend::EXPECT_RESPONSE; } - if (target->write(gwbuf_clone(querybuf), response, stmt_id)) + if (target->write(gwbuf_clone(querybuf), response)) { if (store && !session_store_stmt(rses->client_dcb->session, querybuf, target->server())) {