MXS-1203: Process multiple results correctly with readwritesplit

The readwritesplit didn't correctly process the response packets that
contained more than one part of a multi-result response. By processing the
packets in a loop, this problem is avoided.

Removed some of the more "unique" ways of sending error messages in favor
of simply writing the error to the client DCB. This removes the need for
extra logic in the clientReply response handling.
This commit is contained in:
Markus Mäkelä 2017-04-03 14:44:08 +03:00
parent d7258fffd0
commit ea38d511e3
2 changed files with 79 additions and 82 deletions

View File

@ -605,6 +605,7 @@ static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session,
{
/** We are already processing a request from the client. Store the
* new query and wait for the previous one to complete. */
MXS_DEBUG("Storing query, expecting %d replies", rses->expected_responses);
rses->query_queue = gwbuf_append(rses->query_queue, querybuf);
querybuf = NULL;
rval = 1;
@ -711,41 +712,66 @@ static void diagnostics(MXS_ROUTER *instance, DCB *dcb)
*/
bool reply_is_complete(backend_ref_t* bref, GWBUF *buffer)
{
size_t offset = 0;
size_t len = gwbuf_length(buffer);
if (bref->reply_state == REPLY_STATE_START &&
!mxs_mysql_is_result_set(buffer))
while (offset < len)
{
/** Not a result set, we have the complete response */
LOG_RS(bref, REPLY_STATE_DONE);
bref->reply_state = REPLY_STATE_DONE;
}
else
{
int more;
int n_eof = bref->reply_state == REPLY_STATE_RSET_ROWS ? 1 : 0;
n_eof += modutil_count_signal_packets(buffer, 0, n_eof, &more);
mysql_server_cmd_t cmd = mxs_mysql_current_command(bref->bref_dcb->session);
if (n_eof == 0)
if (bref->reply_state == REPLY_STATE_START &&
!mxs_mysql_is_result_set(buffer, offset))
{
/** Waiting for the EOF packet after the column definitions */
LOG_RS(bref, REPLY_STATE_RSET_COLDEF);
bref->reply_state = REPLY_STATE_RSET_COLDEF;
}
else if (n_eof == 1 && cmd != MYSQL_COM_FIELD_LIST)
{
/** Waiting for the EOF packet after the rows */
LOG_RS(bref, REPLY_STATE_RSET_ROWS);
bref->reply_state = REPLY_STATE_RSET_ROWS;
if (!mxs_mysql_more_results_after_ok(buffer, offset))
{
/** Not a result set, we have the complete response */
LOG_RS(bref, REPLY_STATE_DONE);
bref->reply_state = REPLY_STATE_DONE;
break;
}
uint8_t header[MYSQL_HEADER_LEN];
gwbuf_copy_data(buffer, offset, sizeof(header), header);
offset += MYSQL_GET_PAYLOAD_LEN(header) + MYSQL_HEADER_LEN;
}
else
{
/** We either have a complete result set or a response to
* a COM_FIELD_LIST command */
ss_dassert(n_eof == 2 || (n_eof == 1 && cmd == MYSQL_COM_FIELD_LIST));
LOG_RS(bref, REPLY_STATE_DONE);
bref->reply_state = REPLY_STATE_DONE;
bool more = false;
int old_eof = bref->reply_state == REPLY_STATE_RSET_ROWS ? 1 : 0;
int n_eof = modutil_count_signal_packets(buffer, old_eof, &more, &offset);
mysql_server_cmd_t cmd = mxs_mysql_current_command(bref->bref_dcb->session);
if (n_eof == 0)
{
/** Waiting for the EOF packet after the column definitions */
LOG_RS(bref, REPLY_STATE_RSET_COLDEF);
bref->reply_state = REPLY_STATE_RSET_COLDEF;
}
else if (n_eof == 1 && cmd != MYSQL_COM_FIELD_LIST)
{
/** Waiting for the EOF packet after the rows */
LOG_RS(bref, REPLY_STATE_RSET_ROWS);
bref->reply_state = REPLY_STATE_RSET_ROWS;
}
else
{
/** We either have a complete result set or a response to
* a COM_FIELD_LIST command */
ss_dassert(n_eof == 2 || (n_eof == 1 && cmd == MYSQL_COM_FIELD_LIST));
LOG_RS(bref, REPLY_STATE_DONE);
bref->reply_state = REPLY_STATE_DONE;
if (more)
{
/** The server will send more resultsets */
LOG_RS(bref, REPLY_STATE_START);
bref->reply_state = REPLY_STATE_START;
}
else
{
ss_dassert(offset == len);
break;
}
}
}
}
@ -808,6 +834,11 @@ static void clientReply(MXS_ROUTER *instance,
ss_dassert(router_cli_ses->expected_responses >= 0);
ss_dassert(bref->reply_state == REPLY_STATE_DONE);
}
else
{
MXS_DEBUG("Reply not yet complete, waiting for %d replies", router_cli_ses->expected_responses);
}
/**
* Active cursor means that reply is from session command
* execution.

View File

@ -218,80 +218,46 @@ log_transaction_status(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, qc_query_type_t
* @param qtype Query type
* @return bool indicating whether the session can continue
*/
bool
handle_target_is_all(route_target_t route_target,
ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
GWBUF *querybuf, int packet_type, qc_query_type_t qtype)
bool handle_target_is_all(route_target_t route_target, ROUTER_INSTANCE *inst,
ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
int packet_type, qc_query_type_t qtype)
{
bool result;
bool result = false;
/** Multiple, conflicting routing target. Return error */
if (TARGET_IS_MASTER(route_target) || TARGET_IS_SLAVE(route_target))
{
backend_ref_t *bref = rses->rses_backend_ref;
/**
* Conflicting routing targets. Return an error to the client.
*/
/* NOTE: modutil_get_query is MySQL specific */
char *query_str = modutil_get_query(querybuf);
char *qtype_str = qc_typemask_to_string(qtype);
/* NOTE: packet_type is MySQL specific */
MXS_ERROR("Can't route %s:%s:\"%s\". SELECT with session data "
"modification is not supported if configuration parameter "
"use_sql_variables_in=all .", STRPACKETTYPE(packet_type),
qtype_str, (query_str == NULL ? "(empty)" : query_str));
MXS_INFO("Unable to route the query without losing session data "
"modification from other servers. <");
GWBUF *errbuf = modutil_create_mysql_err_msg(1, 0, 1064, "42000",
"Routing query to backend failed. "
"See the error log for further details.");
while (bref != NULL && !BREF_IS_IN_USE(bref))
if (errbuf)
{
bref++;
}
if (bref != NULL && BREF_IS_IN_USE(bref))
{
/** Create and add MySQL error to eventqueue */
modutil_reply_parse_error(bref->bref_dcb,
MXS_STRDUP_A("Routing query to backend failed. "
"See the error log for further "
"details."), 0);
rses->client_dcb->func.write(rses->client_dcb, errbuf);
result = true;
}
else
{
/**
* If there were no available backend references
* available return false - session will be closed
*/
MXS_ERROR("Sending error message to client "
"failed. Router doesn't have any "
"available backends. Session will be "
"closed.");
result = false;
}
/* Test shouldn't be needed */
if (query_str)
{
MXS_FREE(query_str);
}
if (qtype_str)
{
MXS_FREE(qtype_str);
}
return result;
}
/**
* It is not sure if the session command in question requires
* response. Statement is examined in route_session_write.
* Router locking is done inside the function.
*/
result = route_session_write(rses, gwbuf_clone(querybuf), inst,
packet_type, qtype);
if (result)
MXS_FREE(query_str);
MXS_FREE(qtype_str);
}
else if (route_session_write(rses, gwbuf_clone(querybuf), inst, packet_type, qtype))
{
result = true;
atomic_add_uint64(&inst->stats.n_all, 1);
}
return result;
}