MXS-1776: Handle recursive COM_STMT_EXECUTE commands

Readwritesplit would not handle multiple overlapping COM_STMT_EXECUTE
commands properly if they opened cursors. This was due to the fact that
the result would not be marked as complete and COM_STMT_FETCH commands
were executed as if they did not return results.

The correct implementation is to consider a COM_STMT_EXECUTE that opens a
cursor complete only when the first EOF packet is read (that is, when the
resultset header is read). This allows subsequent COM_STMT_FETCH commands
to be handled separately.

The separate COM_STMT_FETCH handling must count the number of packets that
are being fetched. This allows correct tracking of the state of a
COM_STMT_FETCH by checking that the number of packets is correct or the
second EOF/ERR packet is read.
This commit is contained in:
Markus Mäkelä
2018-04-11 15:16:22 +03:00
parent 252475cdc5
commit 311adf817f
6 changed files with 79 additions and 5 deletions

View File

@ -45,6 +45,7 @@ int modutil_MySQL_query_len(GWBUF* buf, int* nbytes_missing);
void modutil_reply_parse_error(DCB* backend_dcb, char* errstr, uint32_t flags);
void modutil_reply_auth_error(DCB* backend_dcb, char* errstr, uint32_t flags);
int modutil_count_statements(GWBUF* buffer);
int modutil_count_packets(GWBUF* buffer);
GWBUF* modutil_create_query(const char* query);
GWBUF* modutil_create_mysql_err_msg(int packet_number,
int affected_rows,

View File

@ -1068,6 +1068,21 @@ int modutil_count_statements(GWBUF* buffer)
return num;
}
int modutil_count_packets(GWBUF* buffer)
{
int packets = 0;
size_t offset = 0;
uint8_t len[3];
while (gwbuf_copy_data(buffer, offset, 3, len) == 3)
{
++packets;
offset += gw_mysql_get_byte3(len) + MYSQL_HEADER_LEN;
}
return packets;
}
/**
* Initialize the PCRE2 patterns used when converting MySQL wildcards to PCRE syntax.
*/

View File

@ -520,7 +520,20 @@ static inline bool have_next_packet(GWBUF* buffer)
*/
bool reply_is_complete(SRWBackend& backend, GWBUF *buffer)
{
if (backend->get_reply_state() == REPLY_STATE_START &&
if (backend->current_command() == MXS_COM_STMT_FETCH)
{
bool more = false;
modutil_state state = {backend->is_large_packet()};
int n_eof = modutil_count_signal_packets(buffer, 0, &more, &state);
backend->set_large_packet(state.state);
if (n_eof > 0 || backend->consume_fetched_rows(buffer))
{
LOG_RS(backend, REPLY_STATE_DONE);
backend->set_reply_state(REPLY_STATE_DONE);
}
}
else if (backend->get_reply_state() == REPLY_STATE_START &&
(!mxs_mysql_is_result_set(buffer) || GWBUF_IS_COLLECTED_RESULT(buffer)))
{
if (GWBUF_IS_COLLECTED_RESULT(buffer) ||
@ -575,6 +588,13 @@ bool reply_is_complete(SRWBackend& backend, GWBUF *buffer)
/** Waiting for the EOF packet after the rows */
LOG_RS(backend, REPLY_STATE_RSET_ROWS);
backend->set_reply_state(REPLY_STATE_RSET_ROWS);
if (backend->cursor_is_open())
{
MXS_INFO("Cursor successfully opened");
LOG_RS(backend, REPLY_STATE_DONE);
backend->set_reply_state(REPLY_STATE_DONE);
}
}
else
{
@ -927,7 +947,6 @@ static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session,
{
if (rses->query_queue == NULL &&
(rses->expected_responses == 0 ||
mxs_mysql_get_command(querybuf) == MXS_COM_STMT_FETCH ||
rses->load_data_state == LOAD_DATA_ACTIVE ||
rses->large_query))
{

View File

@ -1111,8 +1111,7 @@ static inline bool query_creates_reply(uint8_t cmd)
{
return cmd != MXS_COM_QUIT &&
cmd != MXS_COM_STMT_SEND_LONG_DATA &&
cmd != MXS_COM_STMT_CLOSE &&
cmd != MXS_COM_STMT_FETCH; // Fetch is done mid-result
cmd != MXS_COM_STMT_CLOSE;
}
static inline bool is_large_query(GWBUF* buf)

View File

@ -17,7 +17,9 @@
RWBackend::RWBackend(SERVER_REF* ref):
mxs::Backend(ref),
m_reply_state(REPLY_STATE_DONE),
m_large_packet(false)
m_large_packet(false),
m_open_cursor(false),
m_expected_rows(0)
{
}
@ -72,12 +74,40 @@ bool RWBackend::write(GWBUF* buffer, response_type type)
/** Replace the client handle with the real PS handle */
uint8_t* ptr = GWBUF_DATA(buffer) + MYSQL_PS_ID_OFFSET;
gw_mysql_set_byte4(ptr, it->second);
uint8_t buf[4];
if (cmd == MXS_COM_STMT_EXECUTE &&
// Skip header and command byte for the 4 byte ID
gwbuf_copy_data(buffer, 5, 4, buf) == sizeof(buf) &&
// A flag value of 0 is no cursor
gw_mysql_get_byte4(buf) != 0)
{
m_open_cursor = true;
}
else if (cmd == MXS_COM_STMT_FETCH)
{
ss_dassert(m_open_cursor);
// Number of rows to fetch is a 4 byte integer after the ID
gwbuf_copy_data(buffer, 5 + 4, 4, buf);
m_expected_rows = gw_mysql_get_byte4(buf);
}
else
{
m_open_cursor = false;
}
}
}
return mxs::Backend::write(buffer);
}
bool RWBackend::consume_fetched_rows(GWBUF* buffer)
{
m_expected_rows -= modutil_count_packets(buffer);
ss_dassert(m_expected_rows >= 0);
return m_expected_rows == 0;
}
uint32_t get_internal_ps_id(RWSplitSession* rses, GWBUF* buffer)
{
uint32_t rval = 0;

View File

@ -59,6 +59,9 @@ public:
bool execute_session_command();
bool write(GWBUF* buffer, response_type type = EXPECT_RESPONSE);
// For COM_STMT_FETCH processing
bool consume_fetched_rows(GWBUF* buffer);
inline void set_large_packet(bool value)
{
m_large_packet = value;
@ -74,6 +77,11 @@ public:
return m_command;
}
inline bool cursor_is_open() const
{
return m_open_cursor;
}
private:
reply_state_t m_reply_state;
BackendHandleMap m_ps_handles; /**< Internal ID to backend PS handle mapping */
@ -81,6 +89,8 @@ private:
*calculation for result sets when the result
* contains very large rows */
uint8_t m_command;
bool m_open_cursor; /**< Whether we have an open cursor */
uint32_t m_expected_rows; /**< Number of rows a COM_STMT_FETCH is retrieving */
};
typedef std::tr1::shared_ptr<RWBackend> SRWBackend;