MXS-2068: Use RWBackend in schemarouter

The schemarouter now uses the RWBackend to track the response states. This
fixes the debug assertions that happened with the mxs1113_schemarouter_ps
test.
This commit is contained in:
Markus Mäkelä 2018-09-25 12:51:13 +03:00
parent 24b438c9b6
commit 548d121699
No known key found for this signature in database
GPG Key ID: 72D48FCE664F7B19
3 changed files with 57 additions and 61 deletions

View File

@ -47,7 +47,7 @@ public:
static SRWBackendList from_servers(SERVER_REF* servers);
RWBackend(SERVER_REF* ref);
~RWBackend();
virtual ~RWBackend();
inline reply_state_t get_reply_state() const
{

View File

@ -30,6 +30,7 @@
#include <maxscale/pcre2.h>
#include <maxscale/service.h>
#include <maxscale/backend.hh>
#include <maxscale/protocol/rwbackend.hh>
namespace schemarouter
{
@ -96,12 +97,12 @@ struct Stats
*
* Owned by router client session.
*/
class SRBackend : public mxs::Backend
class SRBackend : public mxs::RWBackend
{
public:
SRBackend(SERVER_REF* ref)
: mxs::Backend(ref)
: mxs::RWBackend(ref)
, m_mapped(false)
{
}

View File

@ -440,7 +440,7 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket)
m_load_target = bref->backend()->server;
}
MXS_INFO("Route query to \t%s:%d <", bref->backend()->server->address, bref->backend()->server->port);
MXS_INFO("Route query to \t%s %s <", bref->name(), bref->uri());
if (bref->has_session_commands())
{
@ -459,17 +459,25 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket)
ret = 1;
}
}
else if (bref->write(pPacket))
{
/** Add one query response waiter to backend reference */
mxb::atomic::add(&m_router->m_stats.n_queries, 1, mxb::atomic::RELAXED);
mxb::atomic::add(&bref->server()->stats.packets, 1, mxb::atomic::RELAXED);
ret = 1;
}
else
{
MXS_ERROR("Routing query failed.");
gwbuf_free(pPacket);
uint8_t cmd = mxs_mysql_get_command(pPacket);
auto responds = mxs_mysql_command_will_respond(cmd) ?
mxs::Backend::EXPECT_RESPONSE :
mxs::Backend::NO_RESPONSE;
if (bref->write(pPacket, responds))
{
/** Add one query response waiter to backend reference */
mxb::atomic::add(&m_router->m_stats.n_queries, 1, mxb::atomic::RELAXED);
mxb::atomic::add(&bref->server()->stats.packets, 1, mxb::atomic::RELAXED);
ret = 1;
}
else
{
gwbuf_free(pPacket);
}
}
}
@ -511,35 +519,32 @@ void SchemaRouterSession::handle_mapping_reply(SSRBackend& bref, GWBUF** pPacket
void SchemaRouterSession::process_sescmd_response(SSRBackend& bref, GWBUF** ppPacket)
{
if (bref->has_session_commands())
{
mxb_assert(GWBUF_IS_COLLECTED_RESULT(*ppPacket));
uint8_t command = bref->next_session_command()->get_command();
uint64_t id = bref->complete_session_command();
MXS_PS_RESPONSE resp = {};
mxb_assert(GWBUF_IS_COLLECTED_RESULT(*ppPacket));
uint8_t command = bref->next_session_command()->get_command();
uint64_t id = bref->complete_session_command();
MXS_PS_RESPONSE resp = {};
if (m_replied_sescmd < m_sent_sescmd && id == m_replied_sescmd + 1)
if (m_replied_sescmd < m_sent_sescmd && id == m_replied_sescmd + 1)
{
if (command == MXS_COM_STMT_PREPARE)
{
if (command == MXS_COM_STMT_PREPARE)
{
mxs_mysql_extract_ps_response(*ppPacket, &resp);
MXS_INFO("ID: %lu HANDLE: %lu", (unsigned long)id, (unsigned long)resp.id);
m_shard.add_ps_handle(id, resp.id);
MXS_INFO("STMT SERVER: %s", bref->backend()->server->name);
m_shard.add_statement(id, bref->backend()->server);
uint8_t* ptr = GWBUF_DATA(*ppPacket) + MYSQL_PS_ID_OFFSET;
gw_mysql_set_byte4(ptr, id);
}
/** First reply to this session command, route it to the client */
++m_replied_sescmd;
}
else
{
/** The reply to this session command has already been sent to
* the client, discard it */
gwbuf_free(*ppPacket);
*ppPacket = NULL;
mxs_mysql_extract_ps_response(*ppPacket, &resp);
MXS_INFO("ID: %lu HANDLE: %lu", (unsigned long)id, (unsigned long)resp.id);
m_shard.add_ps_handle(id, resp.id);
MXS_INFO("STMT SERVER: %s", bref->backend()->server->name);
m_shard.add_statement(id, bref->backend()->server);
uint8_t* ptr = GWBUF_DATA(*ppPacket) + MYSQL_PS_ID_OFFSET;
gw_mysql_set_byte4(ptr, id);
}
/** First reply to this session command, route it to the client */
++m_replied_sescmd;
}
else
{
/** The reply to this session command has already been sent to
* the client, discard it */
gwbuf_free(*ppPacket);
*ppPacket = NULL;
}
}
@ -553,13 +558,7 @@ void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb)
return;
}
MXS_DEBUG("Reply from [%s] session [%p]"
" mapping [%s] queries queued [%s]",
bref->backend()->server->name,
m_client->session,
m_state & INIT_MAPPING ? "true" : "false",
m_queue.size() == 0 ? "none" :
m_queue.size() > 0 ? "multiple" : "one");
bref->process_reply(pPacket);
if (m_state & INIT_MAPPING)
{
@ -574,6 +573,9 @@ void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb)
m_current_db = m_connect_db;
mxb_assert(m_state == INIT_READY);
gwbuf_free(pPacket);
pPacket = NULL;
if (m_queue.size())
{
route_queued_query();
@ -585,23 +587,14 @@ void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb)
mxb_assert(m_state == INIT_READY);
route_queued_query();
}
else
else if (bref->reply_is_complete())
{
process_sescmd_response(bref, &pPacket);
if (bref->is_waiting_result())
if (bref->has_session_commands())
{
/** Set response status as replied */
bref->ack_write();
process_sescmd_response(bref, &pPacket);
}
if (pPacket)
{
MXS_SESSION_ROUTE_REPLY(pDcb->session, pPacket);
pPacket = NULL;
}
if (bref->execute_session_command())
if (bref->has_session_commands() && bref->execute_session_command())
{
MXS_INFO("Backend %s:%d processed reply and starts to execute active cursor.",
bref->backend()->server->address,
@ -613,7 +606,10 @@ void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb)
}
}
gwbuf_free(pPacket);
if (pPacket)
{
MXS_SESSION_ROUTE_REPLY(pDcb->session, pPacket);
}
}
void SchemaRouterSession::handleError(GWBUF* pMessage,
@ -1035,7 +1031,6 @@ int SchemaRouterSession::inspect_mapping_states(SSRBackend& bref,
if (rc == SHOWDB_FULL_RESPONSE)
{
(*it)->set_mapped(true);
(*it)->ack_write();
MXS_DEBUG("Received SHOW DATABASES reply from %s for session %p",
(*it)->backend()->server->name,
m_client->session);