MXS-852: Store the internal ID in the buffer

If the internal ID is stored in the buffer when it is moving inside the
readwritesplit router, the RWBackend can manage the execution of all
commands with a statement ID by replacing the stored ID with the correct
value.
This commit is contained in:
Markus Mäkelä
2017-06-21 18:22:16 +03:00
parent 3c4e1e3b4b
commit 5fc30740b7
9 changed files with 93 additions and 45 deletions

View File

@ -212,6 +212,21 @@ extern void gwbuf_free(GWBUF *buf);
*/ */
extern GWBUF *gwbuf_clone(GWBUF *buf); extern GWBUF *gwbuf_clone(GWBUF *buf);
/**
* @brief Deep clone a GWBUF
*
* Clone the data inside a GWBUF into a new buffer. The created buffer has its
* own internal buffer and any modifications to the deep cloned buffer will not
* reflect on the original one. Any buffer objects attached to the original buffer
* will not be copied. Only the buffer type of the original buffer will be copied
* over to the cloned buffer.
*
* @param buf Buffer to clone
*
* @return Deep copy of @c buf or NULL on error
*/
extern GWBUF* gwbuf_deep_clone(const GWBUF* buf);
/** /**
* Compare two GWBUFs. Two GWBUFs are considered identical if their * Compare two GWBUFs. Two GWBUFs are considered identical if their
* content is identical, irrespective of whether one is segmented and * content is identical, irrespective of whether one is segmented and

View File

@ -54,10 +54,11 @@ public:
uint64_t get_position() const; uint64_t get_position() const;
/** /**
* @brief Creates a copy of the internal buffer * @brief Creates a deep copy of the internal buffer
* @return A copy of the internal buffer *
* @return A deep copy of the internal buffer or NULL on error
*/ */
mxs::Buffer copy_buffer() const; GWBUF* deep_copy_buffer();
/** /**
* @brief Create a new session command * @brief Create a new session command

View File

@ -86,7 +86,7 @@ bool Backend::execute_session_command()
SessionCommandList::iterator iter = m_session_commands.begin(); SessionCommandList::iterator iter = m_session_commands.begin();
SessionCommand& sescmd = *(*iter); SessionCommand& sescmd = *(*iter);
GWBUF *buffer = sescmd.copy_buffer().release(); GWBUF *buffer = sescmd.deep_copy_buffer();
bool rval = false; bool rval = false;
switch (sescmd.get_command()) switch (sescmd.get_command())

View File

@ -352,6 +352,28 @@ GWBUF* gwbuf_clone(GWBUF* buf)
return rval; return rval;
} }
GWBUF* gwbuf_deep_clone(const GWBUF* buf)
{
GWBUF* rval = NULL;
if (buf)
{
size_t buflen = gwbuf_length(buf);
rval = gwbuf_alloc(buflen);
if (rval && gwbuf_copy_data(buf, 0, buflen, GWBUF_DATA(rval)) == buflen)
{
rval->gwbuf_type = buf->gwbuf_type;
}
else
{
gwbuf_free(rval);
rval = NULL;
}
}
return rval;
}
static GWBUF *gwbuf_clone_portion(GWBUF *buf, static GWBUF *gwbuf_clone_portion(GWBUF *buf,
size_t start_offset, size_t start_offset,

View File

@ -38,9 +38,12 @@ uint64_t SessionCommand::get_position() const
return m_pos; return m_pos;
} }
Buffer SessionCommand::copy_buffer() const GWBUF* SessionCommand::deep_copy_buffer()
{ {
return m_buffer; GWBUF* temp = m_buffer.release();
GWBUF* rval = gwbuf_deep_clone(temp);
m_buffer.reset(temp);
return rval;
} }
SessionCommand::SessionCommand(GWBUF *buffer, uint64_t id): SessionCommand::SessionCommand(GWBUF *buffer, uint64_t id):

View File

@ -155,8 +155,17 @@ struct rwsplit_config_t
* been idle for too long */ * been idle for too long */
}; };
typedef std::map<uint64_t, uint32_t> BackendHandleMap; static inline bool is_ps_command(uint8_t cmd)
typedef std::map<uint32_t, uint64_t> ClientHandleMap; {
return cmd == MYSQL_COM_STMT_EXECUTE ||
cmd == MYSQL_COM_STMT_SEND_LONG_DATA ||
cmd == MYSQL_COM_STMT_CLOSE ||
cmd == MYSQL_COM_STMT_FETCH ||
cmd == MYSQL_COM_STMT_RESET;
}
typedef std::map<uint32_t, uint32_t> BackendHandleMap; /** Internal ID to external ID */
typedef std::map<uint32_t, uint32_t> ClientHandleMap; /** External ID to internal ID */
class RWBackend: public mxs::Backend class RWBackend: public mxs::Backend
{ {
@ -196,15 +205,15 @@ public:
return rval; return rval;
} }
void add_ps_handle(uint64_t id, uint32_t handle) void add_ps_handle(uint32_t id, uint32_t handle)
{ {
m_ps_handles[id] = handle; m_ps_handles[id] = handle;
MXS_INFO("PS response for %s: %lu -> %u", name(), id, handle); MXS_INFO("PS response for %s: %u -> %u", name(), id, handle);
} }
uint32_t get_ps_handle(uint64_t id) const uint32_t get_ps_handle(uint32_t id) const
{ {
HandleMap::const_iterator it = m_ps_handles.find(id); BackendHandleMap::const_iterator it = m_ps_handles.find(id);
if (it != m_ps_handles.end()) if (it != m_ps_handles.end())
{ {
@ -214,10 +223,13 @@ public:
return 0; return 0;
} }
bool write(GWBUF* buffer, response_type type = EXPECT_RESPONSE, uint64_t id = 0) bool write(GWBUF* buffer, response_type type = EXPECT_RESPONSE)
{ {
if (id) uint8_t cmd = mxs_mysql_get_command(buffer);
if (is_ps_command(cmd))
{ {
uint32_t id = mxs_mysql_extract_ps_id(buffer);
BackendHandleMap::iterator it = m_ps_handles.find(id); BackendHandleMap::iterator it = m_ps_handles.find(id);
if (it != m_ps_handles.end()) if (it != m_ps_handles.end())
@ -237,7 +249,7 @@ private:
}; };
/** Prepared statement ID to type maps for text protocols */ /** Prepared statement ID to type maps for text protocols */
typedef std::tr1::unordered_map<uint64_t, uint32_t> BinaryPSMap; typedef std::tr1::unordered_map<uint32_t, uint32_t> BinaryPSMap;
typedef std::tr1::unordered_map<std::string, uint32_t> TextPSMap; typedef std::tr1::unordered_map<std::string, uint32_t> TextPSMap;
class PSManager class PSManager
@ -256,7 +268,7 @@ public:
* prepared statement * prepared statement
* @param id The unique ID for this statement * @param id The unique ID for this statement
*/ */
void store(GWBUF* buffer, uint64_t id); void store(GWBUF* buffer, uint32_t id);
/** /**
* @brief Get the type of a stored prepared statement * @brief Get the type of a stored prepared statement
@ -266,7 +278,7 @@ public:
* *
* @return The type of the prepared statement * @return The type of the prepared statement
*/ */
uint32_t get_type(uint64_t id) const; uint32_t get_type(uint32_t id) const;
uint32_t get_type(std::string id) const; uint32_t get_type(std::string id) const;
/** /**
@ -275,7 +287,7 @@ public:
* @param id Statement identifier to remove * @param id Statement identifier to remove
*/ */
void erase(std::string id); void erase(std::string id);
void erase(uint64_t id); void erase(uint32_t id);
private: private:
BinaryPSMap m_binary_ps; BinaryPSMap m_binary_ps;

View File

@ -65,7 +65,7 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
GWBUF *querybuf); GWBUF *querybuf);
SRWBackend get_target_backend(ROUTER_CLIENT_SES *rses, backend_type_t btype, SRWBackend get_target_backend(ROUTER_CLIENT_SES *rses, backend_type_t btype,
char *name, int max_rlag); char *name, int max_rlag);
route_target_t get_route_target(ROUTER_CLIENT_SES *rses, route_target_t get_route_target(ROUTER_CLIENT_SES *rses, uint8_t command,
uint32_t qtype, HINT *hint); uint32_t qtype, HINT *hint);
void handle_multi_temp_and_load(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, void handle_multi_temp_and_load(ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
uint8_t packet_type, uint32_t *qtype); uint8_t packet_type, uint32_t *qtype);
@ -75,8 +75,7 @@ SRWBackend handle_slave_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses
bool handle_master_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, bool handle_master_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
SRWBackend* dest); SRWBackend* dest);
bool handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, bool handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
GWBUF *querybuf, SRWBackend& target, GWBUF *querybuf, SRWBackend& target, bool store);
bool store, uint64_t stmt_id);
bool route_session_write(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, bool route_session_write(ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
uint8_t command, uint32_t type); uint8_t command, uint32_t type);

View File

@ -78,11 +78,11 @@ PSManager::~PSManager()
{ {
} }
void PSManager::erase(uint64_t id) void PSManager::erase(uint32_t id)
{ {
if (m_binary_ps.erase(id) == 0) if (m_binary_ps.erase(id) == 0)
{ {
MXS_WARNING("Closing unknown prepared statement with ID %lu", id); MXS_WARNING("Closing unknown prepared statement with ID %u", id);
} }
} }
@ -112,7 +112,7 @@ uint32_t PSManager::get_type(std::string id) const
} }
uint32_t PSManager::get_type(uint64_t id) const uint32_t PSManager::get_type(uint32_t id) const
{ {
uint32_t rval = QUERY_TYPE_UNKNOWN; uint32_t rval = QUERY_TYPE_UNKNOWN;
BinaryPSMap::const_iterator it = m_binary_ps.find(id); BinaryPSMap::const_iterator it = m_binary_ps.find(id);
@ -123,14 +123,13 @@ uint32_t PSManager::get_type(uint64_t id) const
} }
else else
{ {
MXS_WARNING("Using unknown prepared statement with ID %lu", id); MXS_WARNING("Using unknown prepared statement with ID %u", id);
} }
ss_dassert(rval != QUERY_TYPE_UNKNOWN);
return rval; return rval;
} }
void PSManager::store(GWBUF* buffer, uint64_t id) void PSManager::store(GWBUF* buffer, uint32_t id)
{ {
ss_dassert(mxs_mysql_get_command(buffer) == MYSQL_COM_STMT_PREPARE || ss_dassert(mxs_mysql_get_command(buffer) == MYSQL_COM_STMT_PREPARE ||
qc_query_is_type(qc_get_type_mask(buffer), qc_query_is_type(qc_get_type_mask(buffer),

View File

@ -91,15 +91,9 @@ void handle_connection_keepalive(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
ss_dassert(nserv < rses->rses_nbackends); ss_dassert(nserv < rses->rses_nbackends);
} }
static inline bool is_ps_command(uint8_t cmd) uint32_t get_stmt_id(ROUTER_CLIENT_SES* rses, GWBUF* buffer)
{ {
return cmd == MYSQL_COM_STMT_EXECUTE || uint32_t rval = 0;
cmd == MYSQL_COM_STMT_SEND_LONG_DATA;
}
uint64_t get_stmt_id(ROUTER_CLIENT_SES* rses, GWBUF* buffer)
{
uint64_t rval = 0;
// All COM_STMT type statements store the ID in the same place // All COM_STMT type statements store the ID in the same place
uint32_t id = mxs_mysql_extract_ps_id(buffer); uint32_t id = mxs_mysql_extract_ps_id(buffer);
@ -113,6 +107,12 @@ uint64_t get_stmt_id(ROUTER_CLIENT_SES* rses, GWBUF* buffer)
return rval; return rval;
} }
void replace_stmt_id(GWBUF* buffer, uint32_t id)
{
uint8_t* ptr = GWBUF_DATA(buffer) + MYSQL_PS_ID_OFFSET;
gw_mysql_set_byte4(ptr, id);
}
/** /**
* Routing function. Find out query type, backend type, and target DCB(s). * Routing function. Find out query type, backend type, and target DCB(s).
* Then route query to found target(s). * Then route query to found target(s).
@ -135,7 +135,6 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
/* packet_type is a problem as it is MySQL specific */ /* packet_type is a problem as it is MySQL specific */
uint8_t command = determine_packet_type(querybuf, &non_empty_packet); uint8_t command = determine_packet_type(querybuf, &non_empty_packet);
uint32_t qtype = determine_query_type(querybuf, command, non_empty_packet); uint32_t qtype = determine_query_type(querybuf, command, non_empty_packet);
uint64_t stmt_id = 0;
if (non_empty_packet) if (non_empty_packet)
{ {
@ -163,8 +162,6 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
* eventually to master * eventually to master
*/ */
uint32_t ps_type;
if (command == MYSQL_COM_QUERY && if (command == MYSQL_COM_QUERY &&
qc_get_operation(querybuf) == QUERY_OP_EXECUTE) qc_get_operation(querybuf) == QUERY_OP_EXECUTE)
{ {
@ -173,11 +170,12 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
} }
else if (is_ps_command(command)) else if (is_ps_command(command))
{ {
stmt_id = get_stmt_id(rses, querybuf); uint32_t stmt_id = get_stmt_id(rses, querybuf);
qtype = rses->ps_manager.get_type(stmt_id); qtype = rses->ps_manager.get_type(stmt_id);
replace_stmt_id(querybuf, stmt_id);
} }
route_target = get_route_target(rses, qtype, querybuf->hint); route_target = get_route_target(rses, command, qtype, querybuf->hint);
} }
else else
{ {
@ -233,7 +231,7 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
if (target && succp) /*< Have DCB of the target backend */ if (target && succp) /*< Have DCB of the target backend */
{ {
ss_dassert(!store_stmt || TARGET_IS_SLAVE(route_target)); ss_dassert(!store_stmt || TARGET_IS_SLAVE(route_target));
handle_got_target(inst, rses, querybuf, target, store_stmt, stmt_id); handle_got_target(inst, rses, querybuf, target, store_stmt);
} }
} }
@ -541,7 +539,7 @@ SRWBackend get_target_backend(ROUTER_CLIENT_SES *rses, backend_type_t btype,
* @return bitfield including the routing target, or the target server name * @return bitfield including the routing target, or the target server name
* if the query would otherwise be routed to slave. * if the query would otherwise be routed to slave.
*/ */
route_target_t get_route_target(ROUTER_CLIENT_SES *rses, route_target_t get_route_target(ROUTER_CLIENT_SES *rses, uint8_t command,
uint32_t qtype, HINT *hint) uint32_t qtype, HINT *hint)
{ {
bool trx_active = session_trx_is_active(rses->client_dcb->session); bool trx_active = session_trx_is_active(rses->client_dcb->session);
@ -1044,8 +1042,7 @@ static inline bool query_creates_reply(mysql_server_cmd_t cmd)
* @return True on success * @return True on success
*/ */
bool handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, bool handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
GWBUF *querybuf, SRWBackend& target, GWBUF *querybuf, SRWBackend& target, bool store)
bool store, uint64_t stmt_id)
{ {
/** /**
* If the transaction is READ ONLY set forced_node to this backend. * If the transaction is READ ONLY set forced_node to this backend.
@ -1074,7 +1071,7 @@ bool handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
response = mxs::Backend::EXPECT_RESPONSE; response = mxs::Backend::EXPECT_RESPONSE;
} }
if (target->write(gwbuf_clone(querybuf), response, stmt_id)) if (target->write(gwbuf_clone(querybuf), response))
{ {
if (store && !session_store_stmt(rses->client_dcb->session, querybuf, target->server())) if (store && !session_store_stmt(rses->client_dcb->session, querybuf, target->server()))
{ {