MXS-852: Use stored query type for COM_STMT_EXECUTE
When a COM_STMT_EXECUTE or a COM_STMT_SEND_LONG_DATA command is executed, the query type of the prepared statement is used. This allows read-only prepared statements to be load balanced across slaves.
This commit is contained in:
@ -159,7 +159,7 @@ public:
|
|||||||
*
|
*
|
||||||
* @return True if data was written successfully
|
* @return True if data was written successfully
|
||||||
*/
|
*/
|
||||||
bool write(GWBUF* buffer, response_type type = EXPECT_RESPONSE);
|
virtual bool write(GWBUF* buffer, response_type type = EXPECT_RESPONSE);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Write an authentication switch request to the backend server
|
* @brief Write an authentication switch request to the backend server
|
||||||
|
@ -537,12 +537,14 @@ uint8_t mxs_mysql_get_command(GWBUF* buffer);
|
|||||||
bool mxs_mysql_extract_ps_response(GWBUF* buffer, MXS_PS_RESPONSE* out);
|
bool mxs_mysql_extract_ps_response(GWBUF* buffer, MXS_PS_RESPONSE* out);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Extract the ID of a COM_STMT_EXECUTE
|
* @brief Extract the ID from a COM_STMT command
|
||||||
*
|
*
|
||||||
* @param buffer Buffer containing a COM_STMT_EXECUTE packet
|
* All the COM_STMT type commands store the statement ID in the same place.
|
||||||
*
|
*
|
||||||
* @return The ID of the prepared statement being executed or 0 on failure
|
* @param buffer Buffer containing one of the COM_STMT commands (not COM_STMT_PREPARE)
|
||||||
|
*
|
||||||
|
* @return The statement ID
|
||||||
*/
|
*/
|
||||||
uint32_t mxs_mysql_extract_execute(GWBUF* buffer);
|
uint32_t mxs_mysql_extract_ps_id(GWBUF* buffer);
|
||||||
|
|
||||||
MXS_END_DECLS
|
MXS_END_DECLS
|
||||||
|
@ -1637,7 +1637,7 @@ bool mxs_mysql_extract_ps_response(GWBUF* buffer, MXS_PS_RESPONSE* out)
|
|||||||
return rval;
|
return rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint32_t mxs_mysql_extract_execute(GWBUF* buffer)
|
uint32_t mxs_mysql_extract_ps_id(GWBUF* buffer)
|
||||||
{
|
{
|
||||||
uint32_t rval = 0;
|
uint32_t rval = 0;
|
||||||
uint8_t id[MYSQL_PS_ID_SIZE];
|
uint8_t id[MYSQL_PS_ID_SIZE];
|
||||||
|
@ -32,6 +32,7 @@
|
|||||||
#include <maxscale/service.h>
|
#include <maxscale/service.h>
|
||||||
#include <maxscale/backend.hh>
|
#include <maxscale/backend.hh>
|
||||||
#include <maxscale/session_command.hh>
|
#include <maxscale/session_command.hh>
|
||||||
|
#include <maxscale/protocol/mysql.h>
|
||||||
|
|
||||||
enum backend_type_t
|
enum backend_type_t
|
||||||
{
|
{
|
||||||
@ -213,6 +214,23 @@ public:
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool write(GWBUF* buffer, response_type type = EXPECT_RESPONSE, uint64_t id = 0)
|
||||||
|
{
|
||||||
|
if (id)
|
||||||
|
{
|
||||||
|
BackendHandleMap::iterator it = m_ps_handles.find(id);
|
||||||
|
|
||||||
|
if (it != m_ps_handles.end())
|
||||||
|
{
|
||||||
|
/** Replace the client handle with the real PS handle */
|
||||||
|
uint8_t* ptr = GWBUF_DATA(buffer) + MYSQL_PS_ID_OFFSET;
|
||||||
|
gw_mysql_set_byte4(ptr, it->second);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return mxs::Backend::write(buffer);
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
reply_state_t m_reply_state;
|
reply_state_t m_reply_state;
|
||||||
BackendHandleMap m_ps_handles; /**< Internal ID to backend PS handle mapping */
|
BackendHandleMap m_ps_handles; /**< Internal ID to backend PS handle mapping */
|
||||||
|
@ -75,7 +75,8 @@ SRWBackend handle_slave_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses
|
|||||||
bool handle_master_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
|
bool handle_master_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
|
||||||
SRWBackend* dest);
|
SRWBackend* dest);
|
||||||
bool handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
|
bool handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
|
||||||
GWBUF *querybuf, SRWBackend& target, bool store);
|
GWBUF *querybuf, SRWBackend& target,
|
||||||
|
bool store, uint64_t stmt_id);
|
||||||
bool route_session_write(ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
|
bool route_session_write(ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
|
||||||
uint8_t command, uint32_t type);
|
uint8_t command, uint32_t type);
|
||||||
|
|
||||||
|
@ -91,6 +91,28 @@ void handle_connection_keepalive(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
|
|||||||
ss_dassert(nserv < rses->rses_nbackends);
|
ss_dassert(nserv < rses->rses_nbackends);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static inline bool is_ps_command(uint8_t cmd)
|
||||||
|
{
|
||||||
|
return cmd == MYSQL_COM_STMT_EXECUTE ||
|
||||||
|
cmd == MYSQL_COM_STMT_SEND_LONG_DATA;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t get_stmt_id(ROUTER_CLIENT_SES* rses, GWBUF* buffer)
|
||||||
|
{
|
||||||
|
uint64_t rval = 0;
|
||||||
|
|
||||||
|
// All COM_STMT type statements store the ID in the same place
|
||||||
|
uint32_t id = mxs_mysql_extract_ps_id(buffer);
|
||||||
|
ClientHandleMap::iterator it = rses->ps_handles.find(id);
|
||||||
|
|
||||||
|
if (it != rses->ps_handles.end())
|
||||||
|
{
|
||||||
|
rval = it->second;
|
||||||
|
}
|
||||||
|
|
||||||
|
return rval;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Routing function. Find out query type, backend type, and target DCB(s).
|
* Routing function. Find out query type, backend type, and target DCB(s).
|
||||||
* Then route query to found target(s).
|
* Then route query to found target(s).
|
||||||
@ -113,6 +135,7 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
|
|||||||
/* packet_type is a problem as it is MySQL specific */
|
/* packet_type is a problem as it is MySQL specific */
|
||||||
uint8_t command = determine_packet_type(querybuf, &non_empty_packet);
|
uint8_t command = determine_packet_type(querybuf, &non_empty_packet);
|
||||||
uint32_t qtype = determine_query_type(querybuf, command, non_empty_packet);
|
uint32_t qtype = determine_query_type(querybuf, command, non_empty_packet);
|
||||||
|
uint64_t stmt_id = 0;
|
||||||
|
|
||||||
if (non_empty_packet)
|
if (non_empty_packet)
|
||||||
{
|
{
|
||||||
@ -148,17 +171,10 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
|
|||||||
std::string id = extract_text_ps_id(querybuf);
|
std::string id = extract_text_ps_id(querybuf);
|
||||||
qtype = rses->ps_manager.get_type(id);
|
qtype = rses->ps_manager.get_type(id);
|
||||||
}
|
}
|
||||||
else if (command == MYSQL_COM_STMT_EXECUTE)
|
else if (is_ps_command(command))
|
||||||
{
|
{
|
||||||
uint32_t id = mxs_mysql_extract_execute(querybuf);
|
stmt_id = get_stmt_id(rses, querybuf);
|
||||||
ClientHandleMap::iterator it = rses->ps_handles.find(id);
|
qtype = rses->ps_manager.get_type(stmt_id);
|
||||||
|
|
||||||
if (it != rses->ps_handles.end())
|
|
||||||
{
|
|
||||||
char *qtypestr = qc_typemask_to_string(rses->ps_manager.get_type(it->second));
|
|
||||||
MXS_INFO("Client handle %u maps to %lu of type %s", id, it->second, qtypestr);
|
|
||||||
MXS_FREE(qtypestr);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
route_target = get_route_target(rses, qtype, querybuf->hint);
|
route_target = get_route_target(rses, qtype, querybuf->hint);
|
||||||
@ -217,7 +233,7 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
|
|||||||
if (target && succp) /*< Have DCB of the target backend */
|
if (target && succp) /*< Have DCB of the target backend */
|
||||||
{
|
{
|
||||||
ss_dassert(!store_stmt || TARGET_IS_SLAVE(route_target));
|
ss_dassert(!store_stmt || TARGET_IS_SLAVE(route_target));
|
||||||
handle_got_target(inst, rses, querybuf, target, store_stmt);
|
handle_got_target(inst, rses, querybuf, target, store_stmt, stmt_id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1023,20 +1039,13 @@ static inline bool query_creates_reply(mysql_server_cmd_t cmd)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Handle got a target
|
* @brief Handle writing to a target server
|
||||||
*
|
*
|
||||||
* One of the possible types of handling required when a request is routed
|
* @return True on success
|
||||||
*
|
|
||||||
* @param inst Router instance
|
|
||||||
* @param ses Router session
|
|
||||||
* @param querybuf Buffer containing query to be routed
|
|
||||||
* @param target_dcb DCB for the target server
|
|
||||||
*
|
|
||||||
* @return bool - true if succeeded, false otherwise
|
|
||||||
*/
|
*/
|
||||||
bool
|
bool handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
|
||||||
handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
|
GWBUF *querybuf, SRWBackend& target,
|
||||||
GWBUF *querybuf, SRWBackend& target, bool store)
|
bool store, uint64_t stmt_id)
|
||||||
{
|
{
|
||||||
/**
|
/**
|
||||||
* If the transaction is READ ONLY set forced_node to this backend.
|
* If the transaction is READ ONLY set forced_node to this backend.
|
||||||
@ -1065,7 +1074,7 @@ handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
|
|||||||
response = mxs::Backend::EXPECT_RESPONSE;
|
response = mxs::Backend::EXPECT_RESPONSE;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (target->write(gwbuf_clone(querybuf), response))
|
if (target->write(gwbuf_clone(querybuf), response, stmt_id))
|
||||||
{
|
{
|
||||||
if (store && !session_store_stmt(rses->client_dcb->session, querybuf, target->server()))
|
if (store && !session_store_stmt(rses->client_dcb->session, querybuf, target->server()))
|
||||||
{
|
{
|
||||||
|
Reference in New Issue
Block a user