From a6eeed98feb84e28a3b0b2aab4dd3a506a02c617 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Fri, 6 Oct 2017 03:06:50 +0300 Subject: [PATCH] Fix handling of collected results The result collection did not reset properly when a non-resultset was returned for a request. As collected result need to be distinguishable from single packet responses, a new buffer type was added. The new buffer type is used by readwritesplit which uses result collection for preparation of prepared statements. Moved the current command tracking to the RWBackend class as the command tracked by the protocol is can change before a response to the executed command is received. Removed a false debug assertion in the mxs_mysql_extract_ps_response function that was triggered when a very large prepared statement response was processed in multiple parts. --- include/maxscale/buffer.h | 4 ++- .../MySQL/MySQLBackend/mysql_backend.c | 20 +++++++---- server/modules/protocol/MySQL/mysql_common.cc | 17 --------- .../routing/readwritesplit/readwritesplit.cc | 36 +++++++++++++------ .../routing/readwritesplit/rwsplitsession.cc | 2 ++ .../routing/readwritesplit/rwsplitsession.hh | 6 ++++ 6 files changed, 50 insertions(+), 35 deletions(-) diff --git a/include/maxscale/buffer.h b/include/maxscale/buffer.h index 362a49a3a..dd2caf59b 100644 --- a/include/maxscale/buffer.h +++ b/include/maxscale/buffer.h @@ -57,7 +57,8 @@ typedef enum GWBUF_TYPE_SESCMD = 0x04, GWBUF_TYPE_HTTP = 0x08, GWBUF_TYPE_IGNORABLE = 0x10, - GWBUF_TYPE_COLLECT_RESULT = 0x20 + GWBUF_TYPE_COLLECT_RESULT = 0x20, + GWBUF_TYPE_RESULT = 0x40, } gwbuf_type_t; #define GWBUF_IS_TYPE_UNDEFINED(b) (b->gwbuf_type == 0) @@ -65,6 +66,7 @@ typedef enum #define GWBUF_IS_TYPE_RESPONSE_END(b) (b->gwbuf_type & GWBUF_TYPE_RESPONSE_END) #define GWBUF_IS_TYPE_SESCMD(b) (b->gwbuf_type & GWBUF_TYPE_SESCMD) #define GWBUF_IS_IGNORABLE(b) (b->gwbuf_type & GWBUF_TYPE_IGNORABLE) +#define GWBUF_IS_COLLECTED_RESULT(b) (b->gwbuf_type & GWBUF_TYPE_RESULT) #define GWBUF_SHOULD_COLLECT_RESULT(b) (b->gwbuf_type & GWBUF_TYPE_COLLECT_RESULT) typedef enum diff --git a/server/modules/protocol/MySQL/MySQLBackend/mysql_backend.c b/server/modules/protocol/MySQL/MySQLBackend/mysql_backend.c index df66e2b0d..6dcdb18ca 100644 --- a/server/modules/protocol/MySQL/MySQLBackend/mysql_backend.c +++ b/server/modules/protocol/MySQL/MySQLBackend/mysql_backend.c @@ -758,14 +758,16 @@ gw_read_and_write(DCB *dcb) if (collecting_resultset(proto, capabilities)) { - if (expecting_resultset(proto) && - mxs_mysql_is_result_set(read_buffer)) + if (expecting_resultset(proto)) { - bool more = false; - if (modutil_count_signal_packets(read_buffer, 0, &more, NULL) != 2) + if (mxs_mysql_is_result_set(read_buffer)) { - dcb_readq_prepend(dcb, read_buffer); - return 0; + bool more = false; + if (modutil_count_signal_packets(read_buffer, 0, &more, NULL) != 2) + { + dcb_readq_prepend(dcb, read_buffer); + return 0; + } } // Collected the complete result @@ -937,6 +939,12 @@ gw_read_and_write(DCB *dcb) if (session_ok_to_route(dcb)) { + if (result_collected) + { + // Mark that this is a buffer containing a collected result + gwbuf_set_type(stmt, GWBUF_TYPE_RESULT); + } + session->service->router->clientReply(session->service->router_instance, session->router_session, stmt, dcb); diff --git a/server/modules/protocol/MySQL/mysql_common.cc b/server/modules/protocol/MySQL/mysql_common.cc index c4c90255c..404c56ae8 100644 --- a/server/modules/protocol/MySQL/mysql_common.cc +++ b/server/modules/protocol/MySQL/mysql_common.cc @@ -1648,23 +1648,6 @@ bool mxs_mysql_extract_ps_response(GWBUF* buffer, MXS_PS_RESPONSE* out) out->parameters = gw_mysql_get_byte2(params); out->warnings = gw_mysql_get_byte2(warnings); rval = true; - -#ifdef SS_DEBUG - // Make sure that the PS response contains the whole response - bool more; - modutil_state state; - int n_eof = modutil_count_signal_packets(buffer, 0, &more, &state); - int n_expected = 0; - if (out->columns) - { - n_expected++; - } - if (out->parameters) - { - n_expected++; - } - ss_dassert(n_eof == n_expected); -#endif } return rval; diff --git a/server/modules/routing/readwritesplit/readwritesplit.cc b/server/modules/routing/readwritesplit/readwritesplit.cc index a8ef8ffcd..b98465972 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.cc +++ b/server/modules/routing/readwritesplit/readwritesplit.cc @@ -515,6 +515,11 @@ static inline bool is_eof(GWBUF* buffer) gw_mysql_get_byte3(data) + MYSQL_HEADER_LEN == MYSQL_EOF_PACKET_LEN; } +static inline bool is_ok(GWBUF* buffer) +{ + uint8_t* data = GWBUF_DATA(buffer); + return data[MYSQL_HEADER_LEN] == MYSQL_REPLY_OK; +} static inline bool is_large(GWBUF* buffer) { return gw_mysql_get_byte3(GWBUF_DATA(buffer)) == GW_MYSQL_MAX_PACKET_LEN; @@ -549,11 +554,6 @@ static inline bool is_result_set(GWBUF *buffer) return rval; } -static inline uint8_t get_cmd(SRWBackend& backend) -{ - return mxs_mysql_current_command(backend->dcb()->session); -} - /** * @brief Check if we have received a complete reply from the backend * @@ -564,9 +564,21 @@ static inline uint8_t get_cmd(SRWBackend& backend) */ bool reply_is_complete(SRWBackend backend, GWBUF *buffer) { - if (backend->get_reply_state() == REPLY_STATE_START && !is_result_set(buffer)) + if (GWBUF_IS_COLLECTED_RESULT(buffer)) { - if (!more_results_exist(buffer) || get_cmd(backend) == MXS_COM_STMT_PREPARE) + // This branch should only be taken with a PS response + ss_dassert(backend->get_reply_state() == REPLY_STATE_START); + ss_dassert(backend->current_command() == MXS_COM_STMT_PREPARE || + backend->current_command() == MXS_COM_QUERY); + + // This is a complete result of a request + LOG_RS(backend, REPLY_STATE_DONE); + backend->set_reply_state(REPLY_STATE_DONE); + } + else if (backend->get_reply_state() == REPLY_STATE_START && !is_result_set(buffer)) + { + if (backend->current_command() == MXS_COM_STMT_PREPARE || + !is_ok(buffer) || !more_results_exist(buffer)) { /** Not a result set, we have the complete response */ LOG_RS(backend, REPLY_STATE_DONE); @@ -599,7 +611,7 @@ bool reply_is_complete(SRWBackend backend, GWBUF *buffer) LOG_RS(backend, REPLY_STATE_RSET_COLDEF); backend->set_reply_state(REPLY_STATE_RSET_COLDEF); } - else if (n_eof == 1 && get_cmd(backend) != MXS_COM_FIELD_LIST) + else if (n_eof == 1 && backend->current_command() != MXS_COM_FIELD_LIST) { /** Waiting for the EOF packet after the rows */ LOG_RS(backend, REPLY_STATE_RSET_ROWS); @@ -609,7 +621,7 @@ bool reply_is_complete(SRWBackend backend, GWBUF *buffer) { /** We either have a complete result set or a response to * a COM_FIELD_LIST command */ - ss_dassert(n_eof == 2 || (n_eof == 1 && get_cmd(backend) == MXS_COM_FIELD_LIST)); + ss_dassert(n_eof == 2 || (n_eof == 1 && backend->current_command() == MXS_COM_FIELD_LIST)); LOG_RS(backend, REPLY_STATE_DONE); backend->set_reply_state(REPLY_STATE_DONE); @@ -1172,8 +1184,10 @@ static void clientReply(MXS_ROUTER *instance, GWBUF *writebuf, DCB *backend_dcb) { - ss_dassert(GWBUF_IS_CONTIGUOUS(writebuf) && - MYSQL_GET_PAYLOAD_LEN(GWBUF_DATA(writebuf)) + MYSQL_HEADER_LEN == gwbuf_length(writebuf)); + ss_dassert((GWBUF_IS_CONTIGUOUS(writebuf) && + MYSQL_GET_PAYLOAD_LEN(GWBUF_DATA(writebuf)) + + MYSQL_HEADER_LEN == gwbuf_length(writebuf)) || + GWBUF_IS_COLLECTED_RESULT(writebuf)); RWSplitSession *rses = (RWSplitSession *)router_session; DCB *client_dcb = backend_dcb->session->client_dcb; diff --git a/server/modules/routing/readwritesplit/rwsplitsession.cc b/server/modules/routing/readwritesplit/rwsplitsession.cc index 0fe945494..88456f4c6 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.cc +++ b/server/modules/routing/readwritesplit/rwsplitsession.cc @@ -60,6 +60,8 @@ bool RWBackend::write(GWBUF* buffer, response_type type) { uint8_t cmd = mxs_mysql_get_command(buffer); + m_command = cmd; + if (is_ps_command(cmd)) { uint32_t id = mxs_mysql_extract_ps_id(buffer); diff --git a/server/modules/routing/readwritesplit/rwsplitsession.hh b/server/modules/routing/readwritesplit/rwsplitsession.hh index 3f48170fc..29511104d 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.hh +++ b/server/modules/routing/readwritesplit/rwsplitsession.hh @@ -69,12 +69,18 @@ public: return m_large_packet; } + inline uint8_t current_command() const + { + return m_command; + } + private: reply_state_t m_reply_state; BackendHandleMap m_ps_handles; /**< Internal ID to backend PS handle mapping */ bool m_large_packet; /**< Used to store the state of the EOF packet *calculation for result sets when the result * contains very large rows */ + uint8_t m_command; }; typedef std::tr1::shared_ptr SRWBackend;