Fix EOF packet calculation for large rows

The EOF packet calculation function in modutil.cc didn't handle the case
where the payload exceeded maximum packet size and could mistake binary
data for a ERR packet.

The state of a multi-packet payload is now exposed by the
modutil_count_signal_packets function. This allows proper handling of
large multi-packet payloads.

Added minor improvements to mxs1110_16mb to handle testing of this change.
This commit is contained in:
Markus Mäkelä
2017-08-25 14:23:14 +03:00
parent 5832352a08
commit 13f7015e7b
7 changed files with 58 additions and 12 deletions

View File

@ -59,13 +59,19 @@ GWBUF* modutil_create_mysql_err_msg(int packet_number,
* whole packets. If partial packets are in the buffer, they are ignored.
* The caller must handle the detection of partial packets in buffers.
*
* On the first invocation, the value pointed by @c skip should be set to false.
* On all subsequent calls, for partial result sets, the function uses it to
* store the internal state. When the value pointed by @c skip is set to true,
* the next call must be done with only unprocessed packets in @c reply.
*
* @param reply Buffer to use
* @param n_found Number of previous found packets
* @param more Set to true of more results exist
* @param skip Internal state of the function used for handling large payloads.
*
* @return Total number of EOF and ERR packets including the ones already found
*/
int modutil_count_signal_packets(GWBUF *reply, int n_found, bool* more);
int modutil_count_signal_packets(GWBUF *reply, int n_found, bool* more, bool* skip);
mxs_pcre2_result_t modutil_mysql_wildcard_match(const char* pattern, const char* string);

View File

@ -13,6 +13,7 @@ int main(int argc, char *argv[])
{
TestConnections::skip_maxscale_start(true);
TestConnections * Test = new TestConnections(argc, argv);
Test->stop_maxscale();
Test->set_timeout(60);
int chunk_size = 2500000;
int chunk_num = 5;
@ -35,6 +36,7 @@ int main(int argc, char *argv[])
Test->repl->close_connections();
Test->close_maxscale_connections();
Test->repl->sync_slaves();
Test->connect_maxscale();
Test->tprintf("Checking data via RWSplit\n");
check_longblob_data(Test, Test->conn_rwsplit, chunk_size, chunk_num, 2);

View File

@ -636,12 +636,13 @@ GWBUF* modutil_get_complete_packets(GWBUF **p_readbuf)
return complete;
}
int modutil_count_signal_packets(GWBUF *reply, int n_found, bool* more)
int modutil_count_signal_packets(GWBUF *reply, int n_found, bool* more, bool* skip)
{
unsigned int len = gwbuf_length(reply);
int eof = 0;
int err = 0;
size_t offset = 0;
bool skip_next = skip ? *skip : false;
while (offset < len)
{
@ -649,16 +650,29 @@ int modutil_count_signal_packets(GWBUF *reply, int n_found, bool* more)
gwbuf_copy_data(reply, offset, MYSQL_HEADER_LEN + 1, header);
unsigned int pktlen = MYSQL_GET_PAYLOAD_LEN(header) + MYSQL_HEADER_LEN;
unsigned int payloadlen = MYSQL_GET_PAYLOAD_LEN(header);
unsigned int pktlen = payloadlen + MYSQL_HEADER_LEN;
if (MYSQL_GET_COMMAND(header) == MYSQL_REPLY_ERR)
if (payloadlen == GW_MYSQL_MAX_PACKET_LEN)
{
err++;
skip_next = true;
}
else if (MYSQL_GET_COMMAND(header) == MYSQL_REPLY_EOF &&
pktlen == 5 + MYSQL_HEADER_LEN)
else if (skip_next)
{
eof++;
skip_next = false;
}
else
{
uint8_t command = MYSQL_GET_COMMAND(header);
if (command == MYSQL_REPLY_ERR)
{
err++;
}
else if (command == MYSQL_REPLY_EOF && pktlen == MYSQL_EOF_PACKET_LEN)
{
eof++;
}
}
if (offset + pktlen >= len || (eof + err + n_found) >= 2)
@ -675,6 +689,11 @@ int modutil_count_signal_packets(GWBUF *reply, int n_found, bool* more)
int total = err + eof + n_found;
if (skip)
{
*skip = skip_next;
}
return total;
}

View File

@ -645,7 +645,7 @@ static inline bool complete_ps_response(GWBUF *buffer)
}
bool more;
int n_eof = modutil_count_signal_packets(buffer, 0, &more);
int n_eof = modutil_count_signal_packets(buffer, 0, &more, NULL);
MXS_DEBUG("Expecting %u EOF, have %u", n_eof, expected_eof);
@ -740,7 +740,7 @@ gw_read_and_write(DCB *dcb)
mxs_mysql_is_result_set(read_buffer))
{
bool more = false;
if (modutil_count_signal_packets(read_buffer, 0, &more) != 2)
if (modutil_count_signal_packets(read_buffer, 0, &more, NULL) != 2)
{
dcb->dcb_readqueue = gwbuf_append(read_buffer, dcb->dcb_readqueue);
return 0;

View File

@ -523,8 +523,10 @@ bool reply_is_complete(SRWBackend backend, GWBUF *buffer)
else
{
bool more = false;
bool skip = backend->get_skip_packet();
int old_eof = backend->get_reply_state() == REPLY_STATE_RSET_ROWS ? 1 : 0;
int n_eof = modutil_count_signal_packets(buffer, old_eof, &more);
int n_eof = modutil_count_signal_packets(buffer, old_eof, &more, &skip);
backend->set_skip_packet(skip);
if (n_eof == 0)
{

View File

@ -16,7 +16,8 @@
RWBackend::RWBackend(SERVER_REF* ref):
mxs::Backend(ref),
m_reply_state(REPLY_STATE_DONE)
m_reply_state(REPLY_STATE_DONE),
m_skip(false)
{
}
@ -34,6 +35,16 @@ void RWBackend::set_reply_state(reply_state_t state)
m_reply_state = state;
}
void RWBackend::set_skip_packet(bool state)
{
m_skip = state;
}
bool RWBackend::get_skip_packet() const
{
return m_skip;
}
bool RWBackend::execute_session_command()
{
bool expect_response = mxs_mysql_command_will_respond(next_session_command()->get_command());

View File

@ -50,9 +50,15 @@ public:
bool execute_session_command();
bool write(GWBUF* buffer, response_type type = EXPECT_RESPONSE);
void set_skip_packet(bool state);
bool get_skip_packet() const;
private:
reply_state_t m_reply_state;
BackendHandleMap m_ps_handles; /**< Internal ID to backend PS handle mapping */
bool m_skip; /**< Used to store the state of the EOF packet
* calculation for result sets when the result
* contains very large rows */
};
typedef std::tr1::shared_ptr<RWBackend> SRWBackend;