Fix hang with queries larger than 2^24 bytes

Readwritesplit didn't track multi-packet queries which resulted in them
being confused for pipelined queries.
This commit is contained in:
Markus Mäkelä
2017-08-23 12:33:10 +03:00
parent 2ef9fbc47a
commit ae2b9fd30d
3 changed files with 40 additions and 11 deletions

View File

@ -681,6 +681,7 @@ RWSplitSession::RWSplitSession(RWSplit* instance, MXS_SESSION* session,
rses_closed(false), rses_closed(false),
backends(backends), backends(backends),
current_master(master), current_master(master),
large_query(false),
rses_config(instance->config()), rses_config(instance->config()),
rses_nbackends(instance->service()->n_dbref), rses_nbackends(instance->service()->n_dbref),
load_data_state(LOAD_DATA_INACTIVE), load_data_state(LOAD_DATA_INACTIVE),
@ -892,7 +893,8 @@ static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session,
if (rses->query_queue == NULL && if (rses->query_queue == NULL &&
(rses->expected_responses == 0 || (rses->expected_responses == 0 ||
info.command == MYSQL_COM_STMT_FETCH || info.command == MYSQL_COM_STMT_FETCH ||
rses->load_data_state == LOAD_DATA_ACTIVE)) rses->load_data_state == LOAD_DATA_ACTIVE ||
rses->large_query))
{ {
/** No active or pending queries */ /** No active or pending queries */
if (route_single_stmt(inst, rses, querybuf, info)) if (route_single_stmt(inst, rses, querybuf, info))

View File

@ -181,9 +181,10 @@ bool route_single_stmt(RWSplit *inst, RWSplitSession *rses, GWBUF *querybuf, con
uint8_t command = info.command; uint8_t command = info.command;
uint32_t qtype = info.type; uint32_t qtype = info.type;
route_target_t route_target = info.target; route_target_t route_target = info.target;
bool not_locked_to_master = !rses->target_node || rses->target_node != rses->current_master; bool not_locked_to_master = !rses->prev_target &&
(!rses->target_node || rses->target_node != rses->current_master);
if (is_ps_command(command) && not_locked_to_master) if (not_locked_to_master && is_ps_command(command))
{ {
/** Replace the client statement ID with our internal one only if the /** Replace the client statement ID with our internal one only if the
* target node is not the current master */ * target node is not the current master */
@ -194,19 +195,28 @@ bool route_single_stmt(RWSplit *inst, RWSplitSession *rses, GWBUF *querybuf, con
if (TARGET_IS_ALL(route_target)) if (TARGET_IS_ALL(route_target))
{ {
// TODO: Handle payloads larger than (2^24 - 1) bytes that are routed to all servers
succp = handle_target_is_all(route_target, inst, rses, querybuf, command, qtype); succp = handle_target_is_all(route_target, inst, rses, querybuf, command, qtype);
} }
else else
{ {
bool store_stmt = false; bool store_stmt = false;
if (rses->large_query)
{
/** We're processing a large query that's split across multiple packets.
* Route it to the same backend where we routed the previous packet. */
ss_dassert(rses->prev_target);
target = rses->prev_target;
succp = true;
}
else if (TARGET_IS_NAMED_SERVER(route_target) || TARGET_IS_RLAG_MAX(route_target))
{
/** /**
* There is a hint which either names the target backend or * There is a hint which either names the target backend or
* hint which sets maximum allowed replication lag for the * hint which sets maximum allowed replication lag for the
* backend. * backend.
*/ */
if (TARGET_IS_NAMED_SERVER(route_target) ||
TARGET_IS_RLAG_MAX(route_target))
{
if ((target = handle_hinted_target(rses, querybuf, route_target))) if ((target = handle_hinted_target(rses, querybuf, route_target)))
{ {
succp = true; succp = true;
@ -1077,6 +1087,13 @@ static inline bool query_creates_reply(uint8_t cmd)
cmd != MYSQL_COM_STMT_FETCH; // Fetch is done mid-result cmd != MYSQL_COM_STMT_FETCH; // Fetch is done mid-result
} }
static inline bool is_large_query(GWBUF* buf)
{
uint32_t buflen = gwbuf_length(buf);
ss_dassert(buflen <= MYSQL_HEADER_LEN + GW_MYSQL_MAX_PACKET_LEN);
return buflen == MYSQL_HEADER_LEN + GW_MYSQL_MAX_PACKET_LEN;
}
/** /**
* @brief Handle writing to a target server * @brief Handle writing to a target server
* *
@ -1112,6 +1129,8 @@ bool handle_got_target(RWSplit *inst, RWSplitSession *rses,
response = mxs::Backend::EXPECT_RESPONSE; response = mxs::Backend::EXPECT_RESPONSE;
} }
bool large_query = is_large_query(querybuf);
if (target->write(gwbuf_clone(querybuf), response)) if (target->write(gwbuf_clone(querybuf), response))
{ {
if (store && !session_store_stmt(rses->client_dcb->session, querybuf, target->server())) if (store && !session_store_stmt(rses->client_dcb->session, querybuf, target->server()))
@ -1121,7 +1140,7 @@ bool handle_got_target(RWSplit *inst, RWSplitSession *rses,
atomic_add_uint64(&inst->stats().n_queries, 1); atomic_add_uint64(&inst->stats().n_queries, 1);
if (response == mxs::Backend::EXPECT_RESPONSE) if (!rses->large_query && response == mxs::Backend::EXPECT_RESPONSE)
{ {
/** The server will reply to this command */ /** The server will reply to this command */
ss_dassert(target->get_reply_state() == REPLY_STATE_DONE); ss_dassert(target->get_reply_state() == REPLY_STATE_DONE);
@ -1145,6 +1164,12 @@ bool handle_got_target(RWSplit *inst, RWSplitSession *rses,
} }
} }
/** Store the previous target if we're processing a multi-packet query */
if ((rses->large_query = large_query))
{
rses->prev_target = target;
}
/** /**
* If a READ ONLY transaction is ending set forced_node to NULL * If a READ ONLY transaction is ending set forced_node to NULL
*/ */

View File

@ -90,6 +90,8 @@ public:
SRWBackendList backends; /**< List of backend servers */ SRWBackendList backends; /**< List of backend servers */
SRWBackend current_master; /**< Current master server */ SRWBackend current_master; /**< Current master server */
SRWBackend target_node; /**< The currently locked target node */ SRWBackend target_node; /**< The currently locked target node */
SRWBackend prev_target; /**< The previous target where a query was sent */
bool large_query; /**< Set to true when processing payloads >= 2^24 bytes */
Config rses_config; /**< copied config info from router instance */ Config rses_config; /**< copied config info from router instance */
int rses_nbackends; int rses_nbackends;
enum ld_state load_data_state; /**< Current load data state */ enum ld_state load_data_state; /**< Current load data state */