MXS-852: Track COM_STMT_EXECUTE by statement ID

The COM_STMT_EXECUTE targets are now tracked per statement ID. This should
theoretically allow parallel execution of COM_STMT_EXECUTE commands that
use cursors but the current implementation of the reply state processing
does not yet allow it.
This commit is contained in:
Markus Mäkelä 2017-06-23 12:47:33 +03:00
parent 40120264e7
commit 296c1001a2
4 changed files with 30 additions and 14 deletions

View File

@ -933,7 +933,7 @@ static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session,
ss_dassert(rses->expected_responses || rses->query_queue); ss_dassert(rses->expected_responses || rses->query_queue);
/** We are already processing a request from the client. Store the /** We are already processing a request from the client. Store the
* new query and wait for the previous one to complete. */ * new query and wait for the previous one to complete. */
MXS_DEBUG("Storing query (len: %d cmd: %0x), expecting %d replies", MXS_INFO("Storing query (len: %d cmd: %0x), expecting %d replies to current command",
gwbuf_length(querybuf), GWBUF_DATA(querybuf)[4], gwbuf_length(querybuf), GWBUF_DATA(querybuf)[4],
rses->expected_responses); rses->expected_responses);
rses->query_queue = gwbuf_append(rses->query_queue, querybuf); rses->query_queue = gwbuf_append(rses->query_queue, querybuf);

View File

@ -301,6 +301,9 @@ typedef std::list<SRWBackend> SRWBackendList;
typedef std::tr1::unordered_set<std::string> TableSet; typedef std::tr1::unordered_set<std::string> TableSet;
typedef std::map<uint64_t, uint8_t> ResponseMap; typedef std::map<uint64_t, uint8_t> ResponseMap;
/** Map of COM_STMT_EXECUTE targets by internal ID */
typedef std::tr1::unordered_map<uint32_t, SRWBackend> ExecMap;
/** /**
* The client session structure used within this router. * The client session structure used within this router.
*/ */
@ -311,7 +314,6 @@ struct ROUTER_CLIENT_SES
SRWBackendList backends; /**< List of backend servers */ SRWBackendList backends; /**< List of backend servers */
SRWBackend current_master; /**< Current master server */ SRWBackend current_master; /**< Current master server */
SRWBackend target_node; /**< The currently locked target node */ SRWBackend target_node; /**< The currently locked target node */
SRWBackend last_exec_target; /**< Node where the latest COM_STMT_EXECUTE was sent */
rwsplit_config_t rses_config; /**< copied config info from router instance */ rwsplit_config_t rses_config; /**< copied config info from router instance */
int rses_nbackends; int rses_nbackends;
enum ld_state load_data_state; /**< Current load data state */ enum ld_state load_data_state; /**< Current load data state */
@ -330,6 +332,7 @@ struct ROUTER_CLIENT_SES
uint64_t recv_sescmd; /**< ID of the most recently completed session command */ uint64_t recv_sescmd; /**< ID of the most recently completed session command */
PSManager ps_manager; /**< Prepared statement manager*/ PSManager ps_manager; /**< Prepared statement manager*/
ClientHandleMap ps_handles; /**< Client PS handle to internal ID mapping */ ClientHandleMap ps_handles; /**< Client PS handle to internal ID mapping */
ExecMap exec_map; /**< Map of COM_STMT_EXECUTE statement IDs to Backends */
skygw_chk_t rses_chk_tail; skygw_chk_t rses_chk_tail;
}; };

View File

@ -70,7 +70,8 @@ void handle_multi_temp_and_load(ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
uint8_t packet_type, uint32_t *qtype); uint8_t packet_type, uint32_t *qtype);
SRWBackend handle_hinted_target(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, SRWBackend handle_hinted_target(ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
route_target_t route_target); route_target_t route_target);
SRWBackend handle_slave_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, uint8_t cmd); SRWBackend handle_slave_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
uint8_t cmd, uint32_t id);
bool handle_master_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, bool handle_master_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
SRWBackend* dest); SRWBackend* dest);
bool handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, bool handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,

View File

@ -129,6 +129,7 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
route_target_t route_target; route_target_t route_target;
bool succp = false; bool succp = false;
bool non_empty_packet; bool non_empty_packet;
uint32_t stmt_id = 0;
ss_dassert(querybuf->next == NULL); // The buffer must be contiguous. ss_dassert(querybuf->next == NULL); // The buffer must be contiguous.
@ -170,7 +171,7 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
} }
else if (is_ps_command(command)) else if (is_ps_command(command))
{ {
uint32_t stmt_id = get_stmt_id(rses, querybuf); stmt_id = get_stmt_id(rses, querybuf);
qtype = rses->ps_manager.get_type(stmt_id); qtype = rses->ps_manager.get_type(stmt_id);
replace_stmt_id(querybuf, stmt_id); replace_stmt_id(querybuf, stmt_id);
} }
@ -210,7 +211,7 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
} }
else if (TARGET_IS_SLAVE(route_target)) else if (TARGET_IS_SLAVE(route_target))
{ {
if ((target = handle_slave_is_target(inst, rses, command))) if ((target = handle_slave_is_target(inst, rses, command, stmt_id)))
{ {
succp = true; succp = true;
store_stmt = rses->rses_config.retry_failed_reads; store_stmt = rses->rses_config.retry_failed_reads;
@ -235,10 +236,11 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
if (succp && command == MYSQL_COM_STMT_EXECUTE) if (succp && command == MYSQL_COM_STMT_EXECUTE)
{ {
/** Track where the target of the last COM_STMT_EXECUTE. This /** Track the targets of the COM_STMT_EXECUTE statements. This
* information is used to route all COM_STMT_FETCH commands * information is used to route all COM_STMT_FETCH commands
* to the same server where the COM_STMT_EXECUTE was done. */ * to the same server where the COM_STMT_EXECUTE was done. */
rses->last_exec_target = target; ss_dassert(stmt_id > 0);
rses->exec_map[stmt_id] = target;
MXS_INFO("COM_STMT_EXECUTE on %s", target->uri()); MXS_INFO("COM_STMT_EXECUTE on %s", target->uri());
} }
} }
@ -929,19 +931,29 @@ SRWBackend handle_hinted_target(ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
* @return bool - true if succeeded, false otherwise * @return bool - true if succeeded, false otherwise
*/ */
SRWBackend handle_slave_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, SRWBackend handle_slave_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
uint8_t cmd) uint8_t cmd, uint32_t stmt_id)
{ {
int rlag_max = rses_get_max_replication_lag(rses); int rlag_max = rses_get_max_replication_lag(rses);
SRWBackend target; SRWBackend target;
if (cmd == MYSQL_COM_STMT_FETCH && rses->last_exec_target) if (cmd == MYSQL_COM_STMT_FETCH)
{ {
/** The COM_STMT_FETCH must be executed on the same server as the /** The COM_STMT_FETCH must be executed on the same server as the
* COM_STMT_EXECUTE was executed on */ * COM_STMT_EXECUTE was executed on */
target = rses->last_exec_target; ExecMap::iterator it = rses->exec_map.find(stmt_id);
if (it != rses->exec_map.end())
{
target = it->second;
MXS_INFO("COM_STMT_FETCH on %s", target->uri()); MXS_INFO("COM_STMT_FETCH on %s", target->uri());
} }
else else
{
MXS_WARNING("Unknown statement ID %u used in COM_STMT_FETCH", stmt_id);
}
}
if (!target)
{ {
target = get_target_backend(rses, BE_SLAVE, NULL, rlag_max); target = get_target_backend(rses, BE_SLAVE, NULL, rlag_max);
} }