Process PS responses in readwritesplit

When a prepared statement preparation is being routed to the master, the
response is now collected into one buffer before being sent back. This
allows proper processing of pipelined prepared statements.
This commit is contained in:
Markus Mäkelä 2017-04-09 04:26:21 +03:00
parent 36d06960bf
commit 9a221c46f3
2 changed files with 13 additions and 5 deletions

View File

@ -162,7 +162,7 @@ MXS_MODULE *MXS_CREATE_MODULE()
MXS_MODULE_API_ROUTER, MXS_MODULE_GA, MXS_ROUTER_VERSION,
"A Read/Write splitting router for enhancement read scalability",
"V1.1.0",
RCAP_TYPE_STMT_INPUT | RCAP_TYPE_TRANSACTION_TRACKING,
RCAP_TYPE_STMT_INPUT | RCAP_TYPE_TRANSACTION_TRACKING | RCAP_TYPE_STMT_OUTPUT,
&MyObject,
NULL, /* Process init. */
NULL, /* Process finish. */
@ -616,7 +616,9 @@ static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session,
ss_dassert(rses->expected_responses || rses->query_queue);
/** We are already processing a request from the client. Store the
* new query and wait for the previous one to complete. */
MXS_DEBUG("Storing query, expecting %d replies", rses->expected_responses);
MXS_DEBUG("Storing query (len: %d cmd: %0x), expecting %d replies",
gwbuf_length(querybuf), GWBUF_DATA(querybuf)[4],
rses->expected_responses);
rses->query_queue = gwbuf_append(rses->query_queue, querybuf);
querybuf = NULL;
rval = 1;
@ -719,9 +721,11 @@ static void diagnostics(MXS_ROUTER *instance, DCB *dcb)
*/
bool reply_is_complete(backend_ref_t* bref, GWBUF *buffer)
{
mysql_server_cmd_t cmd = mxs_mysql_current_command(bref->bref_dcb->session);
if (bref->reply_state == REPLY_STATE_START && !mxs_mysql_is_result_set(buffer))
{
if (!mxs_mysql_more_results_after_ok(buffer))
if (cmd == MYSQL_COM_STMT_PREPARE || !mxs_mysql_more_results_after_ok(buffer))
{
/** Not a result set, we have the complete response */
LOG_RS(bref, REPLY_STATE_DONE);
@ -732,9 +736,7 @@ bool reply_is_complete(backend_ref_t* bref, GWBUF *buffer)
{
bool more = false;
int old_eof = bref->reply_state == REPLY_STATE_RSET_ROWS ? 1 : 0;
int n_eof = modutil_count_signal_packets(buffer, old_eof, &more);
mysql_server_cmd_t cmd = mxs_mysql_current_command(bref->bref_dcb->session);
if (n_eof == 0)
{

View File

@ -1280,6 +1280,12 @@ handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
/** The session command cursor must not be active */
ss_dassert(!sescmd_cursor_is_active(&bref->bref_sescmd_cur));
/** We only want the complete response to the preparation */
if (MYSQL_GET_COMMAND(GWBUF_DATA(querybuf)) == MYSQL_COM_STMT_PREPARE)
{
gwbuf_set_type(querybuf, GWBUF_TYPE_COLLECT_RESULT);
}
if (target_dcb->func.write(target_dcb, gwbuf_clone(querybuf)) == 1)
{
if (store && !session_store_stmt(rses->client_dcb->session, querybuf, target_dcb->server))