From a32361e89407fa85a56656c43f65e748bde87c96 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Mon, 24 Sep 2018 13:57:55 +0300 Subject: [PATCH] MXS-2068: Move common functionality into RWBackend The RWBackend now updates the internal state when a new write is done in addition to acknowledging it when the reply is complete. --- include/maxscale/protocol/rwbackend.hh | 10 +++--- server/modules/protocol/MySQL/rwbackend.cc | 17 ++++++++-- server/modules/routing/cat/catsession.cc | 3 -- .../readwritesplit/rwsplit_route_stmt.cc | 33 ++++++++++--------- .../routing/readwritesplit/rwsplitsession.cc | 3 +- 5 files changed, 39 insertions(+), 27 deletions(-) diff --git a/include/maxscale/protocol/rwbackend.hh b/include/maxscale/protocol/rwbackend.hh index 2946012b0..1f15a51e4 100644 --- a/include/maxscale/protocol/rwbackend.hh +++ b/include/maxscale/protocol/rwbackend.hh @@ -54,11 +54,6 @@ public: return m_reply_state; } - inline void set_reply_state(reply_state_t state) - { - m_reply_state = state; - } - void add_ps_handle(uint32_t id, uint32_t handle); uint32_t get_ps_handle(uint32_t id) const; @@ -132,5 +127,10 @@ private: { m_opening_cursor = false; } + + inline void set_reply_state(reply_state_t state) + { + m_reply_state = state; + } }; } diff --git a/server/modules/protocol/MySQL/rwbackend.cc b/server/modules/protocol/MySQL/rwbackend.cc index e57b811fc..70289a380 100644 --- a/server/modules/protocol/MySQL/rwbackend.cc +++ b/server/modules/protocol/MySQL/rwbackend.cc @@ -74,6 +74,12 @@ uint32_t RWBackend::get_ps_handle(uint32_t id) const bool RWBackend::write(GWBUF* buffer, response_type type) { + if (type == mxs::Backend::EXPECT_RESPONSE) + { + /** The server will reply to this command */ + set_reply_state(REPLY_STATE_START); + } + uint8_t cmd = mxs_mysql_get_command(buffer); m_command = cmd; @@ -148,7 +154,6 @@ bool RWBackend::reply_is_complete(GWBUF* buffer) // If the server responded with an error, n_eof > 0 if (n_eof > 0 || consume_fetched_rows(buffer)) { - set_reply_state(REPLY_STATE_DONE); } } @@ -240,7 +245,15 @@ bool RWBackend::reply_is_complete(GWBUF* buffer) } } - return get_reply_state() == REPLY_STATE_DONE; + bool rval = false; + + if (get_reply_state() == REPLY_STATE_DONE) + { + ack_write(); + rval = true; + } + + return rval; } ResponseStat& RWBackend::response_stat() diff --git a/server/modules/routing/cat/catsession.cc b/server/modules/routing/cat/catsession.cc index beffe05a8..049b62de2 100644 --- a/server/modules/routing/cat/catsession.cc +++ b/server/modules/routing/cat/catsession.cc @@ -61,7 +61,6 @@ int32_t CatSession::routeQuery(GWBUF* pPacket) // We have a backend, write the query only to this one. It will be // propagated onwards in clientReply. rval = (*m_current)->write(gwbuf_clone(pPacket)); - (*m_current)->set_reply_state(REPLY_STATE_START); } return rval; @@ -75,7 +74,6 @@ void CatSession::clientReply(GWBUF* pPacket, DCB* pDcb) if (backend->reply_is_complete(pPacket)) { - backend->ack_write(); m_completed++; m_current++; @@ -88,7 +86,6 @@ void CatSession::clientReply(GWBUF* pPacket, DCB* pDcb) else { (*m_current)->write(gwbuf_clone(m_query)); - (*m_current)->set_reply_state(REPLY_STATE_START); } } diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc index fcf4b07c2..f1e7ac45f 100644 --- a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc @@ -1086,6 +1086,15 @@ bool RWSplitSession::handle_got_target(GWBUF* querybuf, SRWBackend& target, bool bool large_query = is_large_query(querybuf); + /** + * We should not be routing a query to a server that is busy processing a result. + * + * TODO: This effectively disables pipelining of queries, very bad for batch insert performance. Replace + * with proper, per server tracking of which responses need to be sent to the client. This would + * also solve MXS-2009 by speeding up session commands. + */ + mxb_assert(target->get_reply_state() == REPLY_STATE_DONE || m_qc.large_query()); + /** * If we are starting a new query, we use RWBackend::write, otherwise we use * RWBackend::continue_write to continue an ongoing query. RWBackend::write @@ -1107,24 +1116,18 @@ bool RWSplitSession::handle_got_target(GWBUF* querybuf, SRWBackend& target, bool mxb::atomic::add(&target->server()->stats.packets, 1, mxb::atomic::RELAXED); m_router->server_stats(target->server()).total++; - if (!m_qc.large_query()) + if (!m_qc.large_query() && response == mxs::Backend::EXPECT_RESPONSE) { - mxb_assert(target->get_reply_state() == REPLY_STATE_DONE); + /** The server will reply to this command */ + m_expected_responses++; - if (response == mxs::Backend::EXPECT_RESPONSE) + if (m_qc.load_data_state() == QueryClassifier::LOAD_DATA_END) { - /** The server will reply to this command */ - target->set_reply_state(REPLY_STATE_START); - m_expected_responses++; - - if (m_qc.load_data_state() == QueryClassifier::LOAD_DATA_END) - { - /** The final packet in a LOAD DATA LOCAL INFILE is an empty packet - * to which the server responds with an OK or an ERR packet */ - mxb_assert(gwbuf_length(querybuf) == 4); - m_qc.set_load_data_state(QueryClassifier::LOAD_DATA_INACTIVE); - session_set_load_active(m_pSession, false); - } + /** The final packet in a LOAD DATA LOCAL INFILE is an empty packet + * to which the server responds with an OK or an ERR packet */ + mxb_assert(gwbuf_length(querybuf) == 4); + m_qc.set_load_data_state(QueryClassifier::LOAD_DATA_INACTIVE); + session_set_load_active(m_pSession, false); } } diff --git a/server/modules/routing/readwritesplit/rwsplitsession.cc b/server/modules/routing/readwritesplit/rwsplitsession.cc index e0ad131db..25c38e138 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.cc +++ b/server/modules/routing/readwritesplit/rwsplitsession.cc @@ -562,8 +562,7 @@ void RWSplitSession::clientReply(GWBUF* writebuf, DCB* backend_dcb) if (backend->reply_is_complete(writebuf)) { - /** Got a complete reply, acknowledge the write and decrement expected response count */ - backend->ack_write(); + /** Got a complete reply, decrement expected response count */ m_expected_responses--; mxb_assert(m_expected_responses >= 0); mxb_assert(backend->get_reply_state() == REPLY_STATE_DONE);