diff --git a/include/maxscale/modutil.h b/include/maxscale/modutil.h index ad6a9c17e..35a36d28f 100644 --- a/include/maxscale/modutil.h +++ b/include/maxscale/modutil.h @@ -59,13 +59,19 @@ GWBUF* modutil_create_mysql_err_msg(int packet_number, * whole packets. If partial packets are in the buffer, they are ignored. * The caller must handle the detection of partial packets in buffers. * + * On the first invocation, the value pointed by @c skip should be set to false. + * On all subsequent calls, for partial result sets, the function uses it to + * store the internal state. When the value pointed by @c skip is set to true, + * the next call must be done with only unprocessed packets in @c reply. + * * @param reply Buffer to use * @param n_found Number of previous found packets * @param more Set to true of more results exist + * @param skip Internal state of the function used for handling large payloads. * * @return Total number of EOF and ERR packets including the ones already found */ -int modutil_count_signal_packets(GWBUF *reply, int n_found, bool* more); +int modutil_count_signal_packets(GWBUF *reply, int n_found, bool* more, bool* skip); mxs_pcre2_result_t modutil_mysql_wildcard_match(const char* pattern, const char* string); diff --git a/maxscale-system-test/mxs1110_16mb.cpp b/maxscale-system-test/mxs1110_16mb.cpp index a0b014075..5497373af 100644 --- a/maxscale-system-test/mxs1110_16mb.cpp +++ b/maxscale-system-test/mxs1110_16mb.cpp @@ -13,6 +13,7 @@ int main(int argc, char *argv[]) { TestConnections::skip_maxscale_start(true); TestConnections * Test = new TestConnections(argc, argv); + Test->stop_maxscale(); Test->set_timeout(60); int chunk_size = 2500000; int chunk_num = 5; @@ -35,6 +36,7 @@ int main(int argc, char *argv[]) Test->repl->close_connections(); Test->close_maxscale_connections(); + Test->repl->sync_slaves(); Test->connect_maxscale(); Test->tprintf("Checking data via RWSplit\n"); check_longblob_data(Test, Test->conn_rwsplit, chunk_size, chunk_num, 2); diff --git a/server/core/modutil.cc b/server/core/modutil.cc index 978be9af3..3c83e3eb0 100644 --- a/server/core/modutil.cc +++ b/server/core/modutil.cc @@ -636,12 +636,13 @@ GWBUF* modutil_get_complete_packets(GWBUF **p_readbuf) return complete; } -int modutil_count_signal_packets(GWBUF *reply, int n_found, bool* more) +int modutil_count_signal_packets(GWBUF *reply, int n_found, bool* more, bool* skip) { unsigned int len = gwbuf_length(reply); int eof = 0; int err = 0; size_t offset = 0; + bool skip_next = skip ? *skip : false; while (offset < len) { @@ -649,16 +650,29 @@ int modutil_count_signal_packets(GWBUF *reply, int n_found, bool* more) gwbuf_copy_data(reply, offset, MYSQL_HEADER_LEN + 1, header); - unsigned int pktlen = MYSQL_GET_PAYLOAD_LEN(header) + MYSQL_HEADER_LEN; + unsigned int payloadlen = MYSQL_GET_PAYLOAD_LEN(header); + unsigned int pktlen = payloadlen + MYSQL_HEADER_LEN; - if (MYSQL_GET_COMMAND(header) == MYSQL_REPLY_ERR) + if (payloadlen == GW_MYSQL_MAX_PACKET_LEN) { - err++; + skip_next = true; } - else if (MYSQL_GET_COMMAND(header) == MYSQL_REPLY_EOF && - pktlen == 5 + MYSQL_HEADER_LEN) + else if (skip_next) { - eof++; + skip_next = false; + } + else + { + uint8_t command = MYSQL_GET_COMMAND(header); + + if (command == MYSQL_REPLY_ERR) + { + err++; + } + else if (command == MYSQL_REPLY_EOF && pktlen == MYSQL_EOF_PACKET_LEN) + { + eof++; + } } if (offset + pktlen >= len || (eof + err + n_found) >= 2) @@ -675,6 +689,11 @@ int modutil_count_signal_packets(GWBUF *reply, int n_found, bool* more) int total = err + eof + n_found; + if (skip) + { + *skip = skip_next; + } + return total; } diff --git a/server/modules/protocol/MySQL/MySQLBackend/mysql_backend.c b/server/modules/protocol/MySQL/MySQLBackend/mysql_backend.c index 7cb821060..97ab8c433 100644 --- a/server/modules/protocol/MySQL/MySQLBackend/mysql_backend.c +++ b/server/modules/protocol/MySQL/MySQLBackend/mysql_backend.c @@ -645,7 +645,7 @@ static inline bool complete_ps_response(GWBUF *buffer) } bool more; - int n_eof = modutil_count_signal_packets(buffer, 0, &more); + int n_eof = modutil_count_signal_packets(buffer, 0, &more, NULL); MXS_DEBUG("Expecting %u EOF, have %u", n_eof, expected_eof); @@ -740,7 +740,7 @@ gw_read_and_write(DCB *dcb) mxs_mysql_is_result_set(read_buffer)) { bool more = false; - if (modutil_count_signal_packets(read_buffer, 0, &more) != 2) + if (modutil_count_signal_packets(read_buffer, 0, &more, NULL) != 2) { dcb->dcb_readqueue = gwbuf_append(read_buffer, dcb->dcb_readqueue); return 0; diff --git a/server/modules/routing/readwritesplit/readwritesplit.cc b/server/modules/routing/readwritesplit/readwritesplit.cc index a7171422b..492324e60 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.cc +++ b/server/modules/routing/readwritesplit/readwritesplit.cc @@ -523,8 +523,10 @@ bool reply_is_complete(SRWBackend backend, GWBUF *buffer) else { bool more = false; + bool skip = backend->get_skip_packet(); int old_eof = backend->get_reply_state() == REPLY_STATE_RSET_ROWS ? 1 : 0; - int n_eof = modutil_count_signal_packets(buffer, old_eof, &more); + int n_eof = modutil_count_signal_packets(buffer, old_eof, &more, &skip); + backend->set_skip_packet(skip); if (n_eof == 0) { diff --git a/server/modules/routing/readwritesplit/rwsplitsession.cc b/server/modules/routing/readwritesplit/rwsplitsession.cc index 1007296c6..a7825e565 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.cc +++ b/server/modules/routing/readwritesplit/rwsplitsession.cc @@ -16,7 +16,8 @@ RWBackend::RWBackend(SERVER_REF* ref): mxs::Backend(ref), - m_reply_state(REPLY_STATE_DONE) + m_reply_state(REPLY_STATE_DONE), + m_skip(false) { } @@ -34,6 +35,16 @@ void RWBackend::set_reply_state(reply_state_t state) m_reply_state = state; } +void RWBackend::set_skip_packet(bool state) +{ + m_skip = state; +} + +bool RWBackend::get_skip_packet() const +{ + return m_skip; +} + bool RWBackend::execute_session_command() { bool expect_response = mxs_mysql_command_will_respond(next_session_command()->get_command()); diff --git a/server/modules/routing/readwritesplit/rwsplitsession.hh b/server/modules/routing/readwritesplit/rwsplitsession.hh index 5768bf6bd..53a4daa68 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.hh +++ b/server/modules/routing/readwritesplit/rwsplitsession.hh @@ -50,9 +50,15 @@ public: bool execute_session_command(); bool write(GWBUF* buffer, response_type type = EXPECT_RESPONSE); + void set_skip_packet(bool state); + bool get_skip_packet() const; + private: reply_state_t m_reply_state; BackendHandleMap m_ps_handles; /**< Internal ID to backend PS handle mapping */ + bool m_skip; /**< Used to store the state of the EOF packet + * calculation for result sets when the result + * contains very large rows */ }; typedef std::tr1::shared_ptr SRWBackend;