From 13f7015e7ba73d70eaf9590440daef3fef04a3c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Fri, 25 Aug 2017 14:23:14 +0300 Subject: [PATCH] Fix EOF packet calculation for large rows The EOF packet calculation function in modutil.cc didn't handle the case where the payload exceeded maximum packet size and could mistake binary data for a ERR packet. The state of a multi-packet payload is now exposed by the modutil_count_signal_packets function. This allows proper handling of large multi-packet payloads. Added minor improvements to mxs1110_16mb to handle testing of this change. --- include/maxscale/modutil.h | 8 ++++- maxscale-system-test/mxs1110_16mb.cpp | 2 ++ server/core/modutil.cc | 33 +++++++++++++++---- .../MySQL/MySQLBackend/mysql_backend.c | 4 +-- .../routing/readwritesplit/readwritesplit.cc | 4 ++- .../routing/readwritesplit/rwsplitsession.cc | 13 +++++++- .../routing/readwritesplit/rwsplitsession.hh | 6 ++++ 7 files changed, 58 insertions(+), 12 deletions(-) diff --git a/include/maxscale/modutil.h b/include/maxscale/modutil.h index ad6a9c17e..35a36d28f 100644 --- a/include/maxscale/modutil.h +++ b/include/maxscale/modutil.h @@ -59,13 +59,19 @@ GWBUF* modutil_create_mysql_err_msg(int packet_number, * whole packets. If partial packets are in the buffer, they are ignored. * The caller must handle the detection of partial packets in buffers. * + * On the first invocation, the value pointed by @c skip should be set to false. + * On all subsequent calls, for partial result sets, the function uses it to + * store the internal state. When the value pointed by @c skip is set to true, + * the next call must be done with only unprocessed packets in @c reply. + * * @param reply Buffer to use * @param n_found Number of previous found packets * @param more Set to true of more results exist + * @param skip Internal state of the function used for handling large payloads. * * @return Total number of EOF and ERR packets including the ones already found */ -int modutil_count_signal_packets(GWBUF *reply, int n_found, bool* more); +int modutil_count_signal_packets(GWBUF *reply, int n_found, bool* more, bool* skip); mxs_pcre2_result_t modutil_mysql_wildcard_match(const char* pattern, const char* string); diff --git a/maxscale-system-test/mxs1110_16mb.cpp b/maxscale-system-test/mxs1110_16mb.cpp index a0b014075..5497373af 100644 --- a/maxscale-system-test/mxs1110_16mb.cpp +++ b/maxscale-system-test/mxs1110_16mb.cpp @@ -13,6 +13,7 @@ int main(int argc, char *argv[]) { TestConnections::skip_maxscale_start(true); TestConnections * Test = new TestConnections(argc, argv); + Test->stop_maxscale(); Test->set_timeout(60); int chunk_size = 2500000; int chunk_num = 5; @@ -35,6 +36,7 @@ int main(int argc, char *argv[]) Test->repl->close_connections(); Test->close_maxscale_connections(); + Test->repl->sync_slaves(); Test->connect_maxscale(); Test->tprintf("Checking data via RWSplit\n"); check_longblob_data(Test, Test->conn_rwsplit, chunk_size, chunk_num, 2); diff --git a/server/core/modutil.cc b/server/core/modutil.cc index 978be9af3..3c83e3eb0 100644 --- a/server/core/modutil.cc +++ b/server/core/modutil.cc @@ -636,12 +636,13 @@ GWBUF* modutil_get_complete_packets(GWBUF **p_readbuf) return complete; } -int modutil_count_signal_packets(GWBUF *reply, int n_found, bool* more) +int modutil_count_signal_packets(GWBUF *reply, int n_found, bool* more, bool* skip) { unsigned int len = gwbuf_length(reply); int eof = 0; int err = 0; size_t offset = 0; + bool skip_next = skip ? *skip : false; while (offset < len) { @@ -649,16 +650,29 @@ int modutil_count_signal_packets(GWBUF *reply, int n_found, bool* more) gwbuf_copy_data(reply, offset, MYSQL_HEADER_LEN + 1, header); - unsigned int pktlen = MYSQL_GET_PAYLOAD_LEN(header) + MYSQL_HEADER_LEN; + unsigned int payloadlen = MYSQL_GET_PAYLOAD_LEN(header); + unsigned int pktlen = payloadlen + MYSQL_HEADER_LEN; - if (MYSQL_GET_COMMAND(header) == MYSQL_REPLY_ERR) + if (payloadlen == GW_MYSQL_MAX_PACKET_LEN) { - err++; + skip_next = true; } - else if (MYSQL_GET_COMMAND(header) == MYSQL_REPLY_EOF && - pktlen == 5 + MYSQL_HEADER_LEN) + else if (skip_next) { - eof++; + skip_next = false; + } + else + { + uint8_t command = MYSQL_GET_COMMAND(header); + + if (command == MYSQL_REPLY_ERR) + { + err++; + } + else if (command == MYSQL_REPLY_EOF && pktlen == MYSQL_EOF_PACKET_LEN) + { + eof++; + } } if (offset + pktlen >= len || (eof + err + n_found) >= 2) @@ -675,6 +689,11 @@ int modutil_count_signal_packets(GWBUF *reply, int n_found, bool* more) int total = err + eof + n_found; + if (skip) + { + *skip = skip_next; + } + return total; } diff --git a/server/modules/protocol/MySQL/MySQLBackend/mysql_backend.c b/server/modules/protocol/MySQL/MySQLBackend/mysql_backend.c index 7cb821060..97ab8c433 100644 --- a/server/modules/protocol/MySQL/MySQLBackend/mysql_backend.c +++ b/server/modules/protocol/MySQL/MySQLBackend/mysql_backend.c @@ -645,7 +645,7 @@ static inline bool complete_ps_response(GWBUF *buffer) } bool more; - int n_eof = modutil_count_signal_packets(buffer, 0, &more); + int n_eof = modutil_count_signal_packets(buffer, 0, &more, NULL); MXS_DEBUG("Expecting %u EOF, have %u", n_eof, expected_eof); @@ -740,7 +740,7 @@ gw_read_and_write(DCB *dcb) mxs_mysql_is_result_set(read_buffer)) { bool more = false; - if (modutil_count_signal_packets(read_buffer, 0, &more) != 2) + if (modutil_count_signal_packets(read_buffer, 0, &more, NULL) != 2) { dcb->dcb_readqueue = gwbuf_append(read_buffer, dcb->dcb_readqueue); return 0; diff --git a/server/modules/routing/readwritesplit/readwritesplit.cc b/server/modules/routing/readwritesplit/readwritesplit.cc index a7171422b..492324e60 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.cc +++ b/server/modules/routing/readwritesplit/readwritesplit.cc @@ -523,8 +523,10 @@ bool reply_is_complete(SRWBackend backend, GWBUF *buffer) else { bool more = false; + bool skip = backend->get_skip_packet(); int old_eof = backend->get_reply_state() == REPLY_STATE_RSET_ROWS ? 1 : 0; - int n_eof = modutil_count_signal_packets(buffer, old_eof, &more); + int n_eof = modutil_count_signal_packets(buffer, old_eof, &more, &skip); + backend->set_skip_packet(skip); if (n_eof == 0) { diff --git a/server/modules/routing/readwritesplit/rwsplitsession.cc b/server/modules/routing/readwritesplit/rwsplitsession.cc index 1007296c6..a7825e565 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.cc +++ b/server/modules/routing/readwritesplit/rwsplitsession.cc @@ -16,7 +16,8 @@ RWBackend::RWBackend(SERVER_REF* ref): mxs::Backend(ref), - m_reply_state(REPLY_STATE_DONE) + m_reply_state(REPLY_STATE_DONE), + m_skip(false) { } @@ -34,6 +35,16 @@ void RWBackend::set_reply_state(reply_state_t state) m_reply_state = state; } +void RWBackend::set_skip_packet(bool state) +{ + m_skip = state; +} + +bool RWBackend::get_skip_packet() const +{ + return m_skip; +} + bool RWBackend::execute_session_command() { bool expect_response = mxs_mysql_command_will_respond(next_session_command()->get_command()); diff --git a/server/modules/routing/readwritesplit/rwsplitsession.hh b/server/modules/routing/readwritesplit/rwsplitsession.hh index 5768bf6bd..53a4daa68 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.hh +++ b/server/modules/routing/readwritesplit/rwsplitsession.hh @@ -50,9 +50,15 @@ public: bool execute_session_command(); bool write(GWBUF* buffer, response_type type = EXPECT_RESPONSE); + void set_skip_packet(bool state); + bool get_skip_packet() const; + private: reply_state_t m_reply_state; BackendHandleMap m_ps_handles; /**< Internal ID to backend PS handle mapping */ + bool m_skip; /**< Used to store the state of the EOF packet + * calculation for result sets when the result + * contains very large rows */ }; typedef std::tr1::shared_ptr SRWBackend;