MXS-1503: Queue command if executing a session command

When a non-connected target is chosed as the target server and the session
command history is not empty, the query needs to be placed into the query
queue and routed only after the session commands have been executed.
This commit is contained in:
Markus Mäkelä
2018-03-28 16:26:46 +03:00
parent e57ac4b0a3
commit 7f0745e552
2 changed files with 28 additions and 26 deletions

View File

@ -1272,11 +1272,15 @@ static void clientReply(MXS_ROUTER *instance,
process_sescmd_response(rses, backend, &writebuf);
}
bool queue_routed = false;
if (rses->expected_responses == 0 && rses->query_queue)
if (backend->session_command_count())
{
if (backend->execute_session_command())
{
rses->expected_responses++;
}
}
else if (rses->expected_responses == 0 && rses->query_queue)
{
queue_routed = true;
route_stored_query(rses);
}
@ -1286,17 +1290,6 @@ static void clientReply(MXS_ROUTER *instance,
/** Write reply to client DCB */
MXS_SESSION_ROUTE_REPLY(backend_dcb->session, writebuf);
}
/** Check pending session commands */
else if (!queue_routed && backend->session_command_count())
{
MXS_DEBUG("Backend %s processed reply and starts to execute active cursor.",
backend->uri());
if (backend->execute_session_command())
{
rses->expected_responses++;
}
}
}

View File

@ -211,18 +211,27 @@ bool route_single_stmt(RWSplit *inst, RWSplitSession *rses, GWBUF *querybuf, con
if (succp && target && prepare_target(rses, target, route_target))
{
// Target server was found and is in the correct state
ss_dassert(!store_stmt || TARGET_IS_SLAVE(route_target));
succp = handle_got_target(inst, rses, querybuf, target, store_stmt);
if (succp && command == MXS_COM_STMT_EXECUTE && not_locked_to_master)
if (target->session_command_count())
{
/** Track the targets of the COM_STMT_EXECUTE statements. This
* information is used to route all COM_STMT_FETCH commands
* to the same server where the COM_STMT_EXECUTE was done. */
ss_dassert(stmt_id > 0);
rses->exec_map[stmt_id] = target;
MXS_INFO("COM_STMT_EXECUTE on %s", target->uri());
// We need to wait until the session commands are executed
rses->expected_responses++;
rses->query_queue = gwbuf_append(rses->query_queue, gwbuf_clone(querybuf));
}
else
{
// Target server was found and is in the correct state
ss_dassert(!store_stmt || TARGET_IS_SLAVE(route_target));
succp = handle_got_target(inst, rses, querybuf, target, store_stmt);
if (succp && command == MXS_COM_STMT_EXECUTE && not_locked_to_master)
{
/** Track the targets of the COM_STMT_EXECUTE statements. This
* information is used to route all COM_STMT_FETCH commands
* to the same server where the COM_STMT_EXECUTE was done. */
ss_dassert(stmt_id > 0);
rses->exec_map[stmt_id] = target;
MXS_INFO("COM_STMT_EXECUTE on %s", target->uri());
}
}
}
else