diff --git a/server/modules/routing/readwritesplit/readwritesplit.cc b/server/modules/routing/readwritesplit/readwritesplit.cc index b98465972..3f2dd0db2 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.cc +++ b/server/modules/routing/readwritesplit/readwritesplit.cc @@ -124,6 +124,30 @@ SRWBackend get_backend_from_dcb(RWSplitSession *rses, DCB *dcb) return SRWBackend(); } +static SRWBackend emptyref; + +static SRWBackend& get_backend_ref_from_dcb(RWSplitSession *rses, DCB *dcb) +{ + ss_dassert(dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER); + CHK_DCB(dcb); + CHK_CLIENT_RSES(rses); + + for (SRWBackendList::iterator it = rses->backends.begin(); + it != rses->backends.end(); it++) + { + SRWBackend& backend = *it; + + if (backend->dcb() == dcb) + { + return backend; + } + } + + /** We should always have a valid backend reference */ + ss_dassert(false); + return emptyref; +} + /** * @brief Process router options * @@ -562,22 +586,13 @@ static inline bool is_result_set(GWBUF *buffer) * * @return True if the complete response has been received */ -bool reply_is_complete(SRWBackend backend, GWBUF *buffer) +bool reply_is_complete(SRWBackend& backend, GWBUF *buffer) { - if (GWBUF_IS_COLLECTED_RESULT(buffer)) + if (backend->get_reply_state() == REPLY_STATE_START && + (!is_result_set(buffer) || GWBUF_IS_COLLECTED_RESULT(buffer))) { - // This branch should only be taken with a PS response - ss_dassert(backend->get_reply_state() == REPLY_STATE_START); - ss_dassert(backend->current_command() == MXS_COM_STMT_PREPARE || - backend->current_command() == MXS_COM_QUERY); - - // This is a complete result of a request - LOG_RS(backend, REPLY_STATE_DONE); - backend->set_reply_state(REPLY_STATE_DONE); - } - else if (backend->get_reply_state() == REPLY_STATE_START && !is_result_set(buffer)) - { - if (backend->current_command() == MXS_COM_STMT_PREPARE || + if (GWBUF_IS_COLLECTED_RESULT(buffer) || + backend->current_command() == MXS_COM_STMT_PREPARE || !is_ok(buffer) || !more_results_exist(buffer)) { /** Not a result set, we have the complete response */ @@ -1190,16 +1205,10 @@ static void clientReply(MXS_ROUTER *instance, GWBUF_IS_COLLECTED_RESULT(writebuf)); RWSplitSession *rses = (RWSplitSession *)router_session; DCB *client_dcb = backend_dcb->session->client_dcb; - CHK_CLIENT_RSES(rses); + ss_dassert(!rses->rses_closed); - if (rses->rses_closed) - { - gwbuf_free(writebuf); - return; - } - - SRWBackend backend = get_backend_from_dcb(rses, backend_dcb); + SRWBackend& backend = get_backend_ref_from_dcb(rses, backend_dcb); if (backend->get_reply_state() == REPLY_STATE_DONE) { @@ -1253,32 +1262,23 @@ static void clientReply(MXS_ROUTER *instance, bool queue_routed = false; - if (rses->expected_responses == 0) + if (rses->expected_responses == 0 && rses->query_queue) { - for (SRWBackendList::iterator it = rses->backends.begin(); - it != rses->backends.end(); it++) - { - ss_dassert((*it)->get_reply_state() == REPLY_STATE_DONE || (*it)->is_closed()); - } - - queue_routed = rses->query_queue != NULL; + queue_routed = true; route_stored_query(rses); } - else - { - ss_dassert(rses->expected_responses > 0); - } - if (writebuf && client_dcb) + if (writebuf) { + ss_dassert(client_dcb); /** Write reply to client DCB */ MXS_SESSION_ROUTE_REPLY(backend_dcb->session, writebuf); } /** Check pending session commands */ else if (!queue_routed && backend->session_command_count()) { - MXS_INFO("Backend %s processed reply and starts to execute active cursor.", - backend->uri()); + MXS_DEBUG("Backend %s processed reply and starts to execute active cursor.", + backend->uri()); if (backend->execute_session_command()) { diff --git a/server/modules/routing/readwritesplit/rwsplit_internal.hh b/server/modules/routing/readwritesplit/rwsplit_internal.hh index cf8ff3460..ef9563dd4 100644 --- a/server/modules/routing/readwritesplit/rwsplit_internal.hh +++ b/server/modules/routing/readwritesplit/rwsplit_internal.hh @@ -47,7 +47,7 @@ bool route_single_stmt(RWSplit *inst, RWSplitSession *rses, GWBUF *querybuf); void closed_session_reply(GWBUF *querybuf); void print_error_packet(RWSplitSession *rses, GWBUF *buf, DCB *dcb); -void check_session_command_reply(GWBUF *writebuf, SRWBackend bref); +void check_session_command_reply(GWBUF *buffer, SRWBackend& backend); bool execute_sescmd_in_backend(SRWBackend& backend_ref); bool handle_target_is_all(route_target_t route_target, RWSplit *inst, RWSplitSession *rses, diff --git a/server/modules/routing/readwritesplit/rwsplit_mysql.cc b/server/modules/routing/readwritesplit/rwsplit_mysql.cc index 21d5a9229..824e5a057 100644 --- a/server/modules/routing/readwritesplit/rwsplit_mysql.cc +++ b/server/modules/routing/readwritesplit/rwsplit_mysql.cc @@ -291,7 +291,7 @@ void closed_session_reply(GWBUF *querybuf) * @param buffer Query buffer containing reply data * @param backend Router session data for a backend server */ -void check_session_command_reply(GWBUF *buffer, SRWBackend backend) +void check_session_command_reply(GWBUF *buffer, SRWBackend& backend) { if (MYSQL_IS_ERROR_PACKET(((uint8_t *)GWBUF_DATA(buffer)))) {