MXS-2068: Move common functionality into RWBackend

The RWBackend now updates the internal state when a new write is done in
addition to acknowledging it when the reply is complete.
This commit is contained in:
Markus Mäkelä
2018-09-24 13:57:55 +03:00
parent 09a64753f1
commit a32361e894
5 changed files with 39 additions and 27 deletions

View File

@ -54,11 +54,6 @@ public:
return m_reply_state; return m_reply_state;
} }
inline void set_reply_state(reply_state_t state)
{
m_reply_state = state;
}
void add_ps_handle(uint32_t id, uint32_t handle); void add_ps_handle(uint32_t id, uint32_t handle);
uint32_t get_ps_handle(uint32_t id) const; uint32_t get_ps_handle(uint32_t id) const;
@ -132,5 +127,10 @@ private:
{ {
m_opening_cursor = false; m_opening_cursor = false;
} }
inline void set_reply_state(reply_state_t state)
{
m_reply_state = state;
}
}; };
} }

View File

@ -74,6 +74,12 @@ uint32_t RWBackend::get_ps_handle(uint32_t id) const
bool RWBackend::write(GWBUF* buffer, response_type type) bool RWBackend::write(GWBUF* buffer, response_type type)
{ {
if (type == mxs::Backend::EXPECT_RESPONSE)
{
/** The server will reply to this command */
set_reply_state(REPLY_STATE_START);
}
uint8_t cmd = mxs_mysql_get_command(buffer); uint8_t cmd = mxs_mysql_get_command(buffer);
m_command = cmd; m_command = cmd;
@ -148,7 +154,6 @@ bool RWBackend::reply_is_complete(GWBUF* buffer)
// If the server responded with an error, n_eof > 0 // If the server responded with an error, n_eof > 0
if (n_eof > 0 || consume_fetched_rows(buffer)) if (n_eof > 0 || consume_fetched_rows(buffer))
{ {
set_reply_state(REPLY_STATE_DONE); set_reply_state(REPLY_STATE_DONE);
} }
} }
@ -240,7 +245,15 @@ bool RWBackend::reply_is_complete(GWBUF* buffer)
} }
} }
return get_reply_state() == REPLY_STATE_DONE; bool rval = false;
if (get_reply_state() == REPLY_STATE_DONE)
{
ack_write();
rval = true;
}
return rval;
} }
ResponseStat& RWBackend::response_stat() ResponseStat& RWBackend::response_stat()

View File

@ -61,7 +61,6 @@ int32_t CatSession::routeQuery(GWBUF* pPacket)
// We have a backend, write the query only to this one. It will be // We have a backend, write the query only to this one. It will be
// propagated onwards in clientReply. // propagated onwards in clientReply.
rval = (*m_current)->write(gwbuf_clone(pPacket)); rval = (*m_current)->write(gwbuf_clone(pPacket));
(*m_current)->set_reply_state(REPLY_STATE_START);
} }
return rval; return rval;
@ -75,7 +74,6 @@ void CatSession::clientReply(GWBUF* pPacket, DCB* pDcb)
if (backend->reply_is_complete(pPacket)) if (backend->reply_is_complete(pPacket))
{ {
backend->ack_write();
m_completed++; m_completed++;
m_current++; m_current++;
@ -88,7 +86,6 @@ void CatSession::clientReply(GWBUF* pPacket, DCB* pDcb)
else else
{ {
(*m_current)->write(gwbuf_clone(m_query)); (*m_current)->write(gwbuf_clone(m_query));
(*m_current)->set_reply_state(REPLY_STATE_START);
} }
} }

View File

@ -1086,6 +1086,15 @@ bool RWSplitSession::handle_got_target(GWBUF* querybuf, SRWBackend& target, bool
bool large_query = is_large_query(querybuf); bool large_query = is_large_query(querybuf);
/**
* We should not be routing a query to a server that is busy processing a result.
*
* TODO: This effectively disables pipelining of queries, very bad for batch insert performance. Replace
* with proper, per server tracking of which responses need to be sent to the client. This would
* also solve MXS-2009 by speeding up session commands.
*/
mxb_assert(target->get_reply_state() == REPLY_STATE_DONE || m_qc.large_query());
/** /**
* If we are starting a new query, we use RWBackend::write, otherwise we use * If we are starting a new query, we use RWBackend::write, otherwise we use
* RWBackend::continue_write to continue an ongoing query. RWBackend::write * RWBackend::continue_write to continue an ongoing query. RWBackend::write
@ -1107,14 +1116,9 @@ bool RWSplitSession::handle_got_target(GWBUF* querybuf, SRWBackend& target, bool
mxb::atomic::add(&target->server()->stats.packets, 1, mxb::atomic::RELAXED); mxb::atomic::add(&target->server()->stats.packets, 1, mxb::atomic::RELAXED);
m_router->server_stats(target->server()).total++; m_router->server_stats(target->server()).total++;
if (!m_qc.large_query()) if (!m_qc.large_query() && response == mxs::Backend::EXPECT_RESPONSE)
{
mxb_assert(target->get_reply_state() == REPLY_STATE_DONE);
if (response == mxs::Backend::EXPECT_RESPONSE)
{ {
/** The server will reply to this command */ /** The server will reply to this command */
target->set_reply_state(REPLY_STATE_START);
m_expected_responses++; m_expected_responses++;
if (m_qc.load_data_state() == QueryClassifier::LOAD_DATA_END) if (m_qc.load_data_state() == QueryClassifier::LOAD_DATA_END)
@ -1126,7 +1130,6 @@ bool RWSplitSession::handle_got_target(GWBUF* querybuf, SRWBackend& target, bool
session_set_load_active(m_pSession, false); session_set_load_active(m_pSession, false);
} }
} }
}
m_qc.set_large_query(large_query); m_qc.set_large_query(large_query);

View File

@ -562,8 +562,7 @@ void RWSplitSession::clientReply(GWBUF* writebuf, DCB* backend_dcb)
if (backend->reply_is_complete(writebuf)) if (backend->reply_is_complete(writebuf))
{ {
/** Got a complete reply, acknowledge the write and decrement expected response count */ /** Got a complete reply, decrement expected response count */
backend->ack_write();
m_expected_responses--; m_expected_responses--;
mxb_assert(m_expected_responses >= 0); mxb_assert(m_expected_responses >= 0);
mxb_assert(backend->get_reply_state() == REPLY_STATE_DONE); mxb_assert(backend->get_reply_state() == REPLY_STATE_DONE);