diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index 47c3a252b..fa10970c5 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -605,6 +605,7 @@ static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, { /** We are already processing a request from the client. Store the * new query and wait for the previous one to complete. */ + MXS_DEBUG("Storing query, expecting %d replies", rses->expected_responses); rses->query_queue = gwbuf_append(rses->query_queue, querybuf); querybuf = NULL; rval = 1; @@ -711,41 +712,66 @@ static void diagnostics(MXS_ROUTER *instance, DCB *dcb) */ bool reply_is_complete(backend_ref_t* bref, GWBUF *buffer) { + size_t offset = 0; + size_t len = gwbuf_length(buffer); - if (bref->reply_state == REPLY_STATE_START && - !mxs_mysql_is_result_set(buffer)) + while (offset < len) { - /** Not a result set, we have the complete response */ - LOG_RS(bref, REPLY_STATE_DONE); - bref->reply_state = REPLY_STATE_DONE; - } - else - { - int more; - int n_eof = bref->reply_state == REPLY_STATE_RSET_ROWS ? 1 : 0; - n_eof += modutil_count_signal_packets(buffer, 0, n_eof, &more); - - mysql_server_cmd_t cmd = mxs_mysql_current_command(bref->bref_dcb->session); - - if (n_eof == 0) + if (bref->reply_state == REPLY_STATE_START && + !mxs_mysql_is_result_set(buffer, offset)) { - /** Waiting for the EOF packet after the column definitions */ - LOG_RS(bref, REPLY_STATE_RSET_COLDEF); - bref->reply_state = REPLY_STATE_RSET_COLDEF; - } - else if (n_eof == 1 && cmd != MYSQL_COM_FIELD_LIST) - { - /** Waiting for the EOF packet after the rows */ - LOG_RS(bref, REPLY_STATE_RSET_ROWS); - bref->reply_state = REPLY_STATE_RSET_ROWS; + if (!mxs_mysql_more_results_after_ok(buffer, offset)) + { + /** Not a result set, we have the complete response */ + LOG_RS(bref, REPLY_STATE_DONE); + bref->reply_state = REPLY_STATE_DONE; + break; + } + + uint8_t header[MYSQL_HEADER_LEN]; + gwbuf_copy_data(buffer, offset, sizeof(header), header); + offset += MYSQL_GET_PAYLOAD_LEN(header) + MYSQL_HEADER_LEN; } else { - /** We either have a complete result set or a response to - * a COM_FIELD_LIST command */ - ss_dassert(n_eof == 2 || (n_eof == 1 && cmd == MYSQL_COM_FIELD_LIST)); - LOG_RS(bref, REPLY_STATE_DONE); - bref->reply_state = REPLY_STATE_DONE; + bool more = false; + int old_eof = bref->reply_state == REPLY_STATE_RSET_ROWS ? 1 : 0; + + int n_eof = modutil_count_signal_packets(buffer, old_eof, &more, &offset); + mysql_server_cmd_t cmd = mxs_mysql_current_command(bref->bref_dcb->session); + + if (n_eof == 0) + { + /** Waiting for the EOF packet after the column definitions */ + LOG_RS(bref, REPLY_STATE_RSET_COLDEF); + bref->reply_state = REPLY_STATE_RSET_COLDEF; + } + else if (n_eof == 1 && cmd != MYSQL_COM_FIELD_LIST) + { + /** Waiting for the EOF packet after the rows */ + LOG_RS(bref, REPLY_STATE_RSET_ROWS); + bref->reply_state = REPLY_STATE_RSET_ROWS; + } + else + { + /** We either have a complete result set or a response to + * a COM_FIELD_LIST command */ + ss_dassert(n_eof == 2 || (n_eof == 1 && cmd == MYSQL_COM_FIELD_LIST)); + LOG_RS(bref, REPLY_STATE_DONE); + bref->reply_state = REPLY_STATE_DONE; + + if (more) + { + /** The server will send more resultsets */ + LOG_RS(bref, REPLY_STATE_START); + bref->reply_state = REPLY_STATE_START; + } + else + { + ss_dassert(offset == len); + break; + } + } } } @@ -808,6 +834,11 @@ static void clientReply(MXS_ROUTER *instance, ss_dassert(router_cli_ses->expected_responses >= 0); ss_dassert(bref->reply_state == REPLY_STATE_DONE); } + else + { + MXS_DEBUG("Reply not yet complete, waiting for %d replies", router_cli_ses->expected_responses); + } + /** * Active cursor means that reply is from session command * execution. diff --git a/server/modules/routing/readwritesplit/rwsplit_mysql.c b/server/modules/routing/readwritesplit/rwsplit_mysql.c index 3f0d6b5f1..0a0e30863 100644 --- a/server/modules/routing/readwritesplit/rwsplit_mysql.c +++ b/server/modules/routing/readwritesplit/rwsplit_mysql.c @@ -218,80 +218,46 @@ log_transaction_status(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, qc_query_type_t * @param qtype Query type * @return bool indicating whether the session can continue */ -bool -handle_target_is_all(route_target_t route_target, - ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, - GWBUF *querybuf, int packet_type, qc_query_type_t qtype) +bool handle_target_is_all(route_target_t route_target, ROUTER_INSTANCE *inst, + ROUTER_CLIENT_SES *rses, GWBUF *querybuf, + int packet_type, qc_query_type_t qtype) { - bool result; + bool result = false; - /** Multiple, conflicting routing target. Return error */ if (TARGET_IS_MASTER(route_target) || TARGET_IS_SLAVE(route_target)) { - backend_ref_t *bref = rses->rses_backend_ref; + /** + * Conflicting routing targets. Return an error to the client. + */ - /* NOTE: modutil_get_query is MySQL specific */ char *query_str = modutil_get_query(querybuf); char *qtype_str = qc_typemask_to_string(qtype); - /* NOTE: packet_type is MySQL specific */ MXS_ERROR("Can't route %s:%s:\"%s\". SELECT with session data " "modification is not supported if configuration parameter " "use_sql_variables_in=all .", STRPACKETTYPE(packet_type), qtype_str, (query_str == NULL ? "(empty)" : query_str)); - MXS_INFO("Unable to route the query without losing session data " - "modification from other servers. <"); + GWBUF *errbuf = modutil_create_mysql_err_msg(1, 0, 1064, "42000", + "Routing query to backend failed. " + "See the error log for further details."); - while (bref != NULL && !BREF_IS_IN_USE(bref)) + if (errbuf) { - bref++; - } - - if (bref != NULL && BREF_IS_IN_USE(bref)) - { - /** Create and add MySQL error to eventqueue */ - modutil_reply_parse_error(bref->bref_dcb, - MXS_STRDUP_A("Routing query to backend failed. " - "See the error log for further " - "details."), 0); + rses->client_dcb->func.write(rses->client_dcb, errbuf); result = true; } - else - { - /** - * If there were no available backend references - * available return false - session will be closed - */ - MXS_ERROR("Sending error message to client " - "failed. Router doesn't have any " - "available backends. Session will be " - "closed."); - result = false; - } - /* Test shouldn't be needed */ - if (query_str) - { - MXS_FREE(query_str); - } - if (qtype_str) - { - MXS_FREE(qtype_str); - } - return result; - } - /** - * It is not sure if the session command in question requires - * response. Statement is examined in route_session_write. - * Router locking is done inside the function. - */ - result = route_session_write(rses, gwbuf_clone(querybuf), inst, - packet_type, qtype); - if (result) + MXS_FREE(query_str); + MXS_FREE(qtype_str); + } + else if (route_session_write(rses, gwbuf_clone(querybuf), inst, packet_type, qtype)) { + + result = true; atomic_add_uint64(&inst->stats.n_all, 1); } + return result; }