From 548d121699d3dc8a4e8a47287f551ee3fce43442 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Tue, 25 Sep 2018 12:51:13 +0300 Subject: [PATCH] MXS-2068: Use RWBackend in schemarouter The schemarouter now uses the RWBackend to track the response states. This fixes the debug assertions that happened with the mxs1113_schemarouter_ps test. --- include/maxscale/protocol/rwbackend.hh | 2 +- .../routing/schemarouter/schemarouter.hh | 5 +- .../schemarouter/schemaroutersession.cc | 111 +++++++++--------- 3 files changed, 57 insertions(+), 61 deletions(-) diff --git a/include/maxscale/protocol/rwbackend.hh b/include/maxscale/protocol/rwbackend.hh index a72602774..3dcbfdef8 100644 --- a/include/maxscale/protocol/rwbackend.hh +++ b/include/maxscale/protocol/rwbackend.hh @@ -47,7 +47,7 @@ public: static SRWBackendList from_servers(SERVER_REF* servers); RWBackend(SERVER_REF* ref); - ~RWBackend(); + virtual ~RWBackend(); inline reply_state_t get_reply_state() const { diff --git a/server/modules/routing/schemarouter/schemarouter.hh b/server/modules/routing/schemarouter/schemarouter.hh index f97de7de2..411d1b667 100644 --- a/server/modules/routing/schemarouter/schemarouter.hh +++ b/server/modules/routing/schemarouter/schemarouter.hh @@ -30,6 +30,7 @@ #include #include #include +#include namespace schemarouter { @@ -96,12 +97,12 @@ struct Stats * * Owned by router client session. */ -class SRBackend : public mxs::Backend +class SRBackend : public mxs::RWBackend { public: SRBackend(SERVER_REF* ref) - : mxs::Backend(ref) + : mxs::RWBackend(ref) , m_mapped(false) { } diff --git a/server/modules/routing/schemarouter/schemaroutersession.cc b/server/modules/routing/schemarouter/schemaroutersession.cc index 41d8e80f9..965ac4229 100644 --- a/server/modules/routing/schemarouter/schemaroutersession.cc +++ b/server/modules/routing/schemarouter/schemaroutersession.cc @@ -440,7 +440,7 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket) m_load_target = bref->backend()->server; } - MXS_INFO("Route query to \t%s:%d <", bref->backend()->server->address, bref->backend()->server->port); + MXS_INFO("Route query to \t%s %s <", bref->name(), bref->uri()); if (bref->has_session_commands()) { @@ -459,17 +459,25 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket) ret = 1; } } - else if (bref->write(pPacket)) - { - /** Add one query response waiter to backend reference */ - mxb::atomic::add(&m_router->m_stats.n_queries, 1, mxb::atomic::RELAXED); - mxb::atomic::add(&bref->server()->stats.packets, 1, mxb::atomic::RELAXED); - ret = 1; - } else { - MXS_ERROR("Routing query failed."); - gwbuf_free(pPacket); + uint8_t cmd = mxs_mysql_get_command(pPacket); + + auto responds = mxs_mysql_command_will_respond(cmd) ? + mxs::Backend::EXPECT_RESPONSE : + mxs::Backend::NO_RESPONSE; + + if (bref->write(pPacket, responds)) + { + /** Add one query response waiter to backend reference */ + mxb::atomic::add(&m_router->m_stats.n_queries, 1, mxb::atomic::RELAXED); + mxb::atomic::add(&bref->server()->stats.packets, 1, mxb::atomic::RELAXED); + ret = 1; + } + else + { + gwbuf_free(pPacket); + } } } @@ -511,35 +519,32 @@ void SchemaRouterSession::handle_mapping_reply(SSRBackend& bref, GWBUF** pPacket void SchemaRouterSession::process_sescmd_response(SSRBackend& bref, GWBUF** ppPacket) { - if (bref->has_session_commands()) - { - mxb_assert(GWBUF_IS_COLLECTED_RESULT(*ppPacket)); - uint8_t command = bref->next_session_command()->get_command(); - uint64_t id = bref->complete_session_command(); - MXS_PS_RESPONSE resp = {}; + mxb_assert(GWBUF_IS_COLLECTED_RESULT(*ppPacket)); + uint8_t command = bref->next_session_command()->get_command(); + uint64_t id = bref->complete_session_command(); + MXS_PS_RESPONSE resp = {}; - if (m_replied_sescmd < m_sent_sescmd && id == m_replied_sescmd + 1) + if (m_replied_sescmd < m_sent_sescmd && id == m_replied_sescmd + 1) + { + if (command == MXS_COM_STMT_PREPARE) { - if (command == MXS_COM_STMT_PREPARE) - { - mxs_mysql_extract_ps_response(*ppPacket, &resp); - MXS_INFO("ID: %lu HANDLE: %lu", (unsigned long)id, (unsigned long)resp.id); - m_shard.add_ps_handle(id, resp.id); - MXS_INFO("STMT SERVER: %s", bref->backend()->server->name); - m_shard.add_statement(id, bref->backend()->server); - uint8_t* ptr = GWBUF_DATA(*ppPacket) + MYSQL_PS_ID_OFFSET; - gw_mysql_set_byte4(ptr, id); - } - /** First reply to this session command, route it to the client */ - ++m_replied_sescmd; - } - else - { - /** The reply to this session command has already been sent to - * the client, discard it */ - gwbuf_free(*ppPacket); - *ppPacket = NULL; + mxs_mysql_extract_ps_response(*ppPacket, &resp); + MXS_INFO("ID: %lu HANDLE: %lu", (unsigned long)id, (unsigned long)resp.id); + m_shard.add_ps_handle(id, resp.id); + MXS_INFO("STMT SERVER: %s", bref->backend()->server->name); + m_shard.add_statement(id, bref->backend()->server); + uint8_t* ptr = GWBUF_DATA(*ppPacket) + MYSQL_PS_ID_OFFSET; + gw_mysql_set_byte4(ptr, id); } + /** First reply to this session command, route it to the client */ + ++m_replied_sescmd; + } + else + { + /** The reply to this session command has already been sent to + * the client, discard it */ + gwbuf_free(*ppPacket); + *ppPacket = NULL; } } @@ -553,13 +558,7 @@ void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb) return; } - MXS_DEBUG("Reply from [%s] session [%p]" - " mapping [%s] queries queued [%s]", - bref->backend()->server->name, - m_client->session, - m_state & INIT_MAPPING ? "true" : "false", - m_queue.size() == 0 ? "none" : - m_queue.size() > 0 ? "multiple" : "one"); + bref->process_reply(pPacket); if (m_state & INIT_MAPPING) { @@ -574,6 +573,9 @@ void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb) m_current_db = m_connect_db; mxb_assert(m_state == INIT_READY); + gwbuf_free(pPacket); + pPacket = NULL; + if (m_queue.size()) { route_queued_query(); @@ -585,23 +587,14 @@ void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb) mxb_assert(m_state == INIT_READY); route_queued_query(); } - else + else if (bref->reply_is_complete()) { - process_sescmd_response(bref, &pPacket); - - if (bref->is_waiting_result()) + if (bref->has_session_commands()) { - /** Set response status as replied */ - bref->ack_write(); + process_sescmd_response(bref, &pPacket); } - if (pPacket) - { - MXS_SESSION_ROUTE_REPLY(pDcb->session, pPacket); - pPacket = NULL; - } - - if (bref->execute_session_command()) + if (bref->has_session_commands() && bref->execute_session_command()) { MXS_INFO("Backend %s:%d processed reply and starts to execute active cursor.", bref->backend()->server->address, @@ -613,7 +606,10 @@ void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb) } } - gwbuf_free(pPacket); + if (pPacket) + { + MXS_SESSION_ROUTE_REPLY(pDcb->session, pPacket); + } } void SchemaRouterSession::handleError(GWBUF* pMessage, @@ -1035,7 +1031,6 @@ int SchemaRouterSession::inspect_mapping_states(SSRBackend& bref, if (rc == SHOWDB_FULL_RESPONSE) { (*it)->set_mapped(true); - (*it)->ack_write(); MXS_DEBUG("Received SHOW DATABASES reply from %s for session %p", (*it)->backend()->server->name, m_client->session);