MXS-1825: Fix PS output parameter tracking for MySQL variants
The resultset processing for MySQL requires some extra work as it lacks the proper SERVER_MORE_RESULTS_EXIST flag in the last EOF packet. Instead, the first EOF packet has the SERVER_PS_OUT_PARAMS flag which needs to be interpreted as a SERVER_MORE_RESULTS_EXIST flag for the second EOF packet. Also corrected the EOF packet handling to do the flag checks in the code that deals with the EOF packets. As the modutil_state parameter is now used for more than large packet tracking, the correct solution is to store this state object in the readwritesplit session instead of interpreting it to a boolean value.
This commit is contained in:
parent
260fcf85ec
commit
c97d2c94eb
@ -630,14 +630,20 @@ GWBUF* modutil_get_complete_packets(GWBUF **p_readbuf)
|
||||
|
||||
int modutil_count_signal_packets(GWBUF *reply, int n_found, bool* more_out, modutil_state* state)
|
||||
{
|
||||
enum
|
||||
{
|
||||
SKIP_NEXT = 0x1,
|
||||
PS_OUT_PARAM = 0x2,
|
||||
};
|
||||
|
||||
unsigned int len = gwbuf_length(reply);
|
||||
int eof = 0;
|
||||
int err = 0;
|
||||
size_t offset = 0;
|
||||
bool skip_next = state ? state->state : false;
|
||||
bool more = false;
|
||||
bool only_ok = true;
|
||||
uint64_t num_packets = 0;
|
||||
uint8_t internal_state = state ? state->state : 0;
|
||||
|
||||
while (offset < len)
|
||||
{
|
||||
@ -652,12 +658,12 @@ int modutil_count_signal_packets(GWBUF *reply, int n_found, bool* more_out, modu
|
||||
if (payloadlen == GW_MYSQL_MAX_PACKET_LEN)
|
||||
{
|
||||
only_ok = false;
|
||||
skip_next = true;
|
||||
internal_state |= SKIP_NEXT;
|
||||
}
|
||||
else if (skip_next)
|
||||
else if (internal_state & SKIP_NEXT)
|
||||
{
|
||||
only_ok = false;
|
||||
skip_next = false;
|
||||
internal_state &= ~SKIP_NEXT;
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -675,6 +681,28 @@ int modutil_count_signal_packets(GWBUF *reply, int n_found, bool* more_out, modu
|
||||
{
|
||||
eof++;
|
||||
only_ok = false;
|
||||
|
||||
uint8_t status[2]; // Two byte server status
|
||||
gwbuf_copy_data(reply, offset + MYSQL_HEADER_LEN + 1 + 2, sizeof(status), status);
|
||||
more = gw_mysql_get_byte2(status) & SERVER_MORE_RESULTS_EXIST;
|
||||
|
||||
/**
|
||||
* MySQL 5.6 and 5.7 have a "feature" that doesn't set
|
||||
* the SERVER_MORE_RESULTS_EXIST flag in the last EOF packet of
|
||||
* a result set if the SERVER_PS_OUT_PARAMS flag was set in
|
||||
* the first result set. To handle this, we have to store
|
||||
* the information from the first EOF packet until we process
|
||||
* the second EOF packet.
|
||||
*/
|
||||
if (gw_mysql_get_byte2(status) & SERVER_PS_OUT_PARAMS)
|
||||
{
|
||||
internal_state |= PS_OUT_PARAM;
|
||||
}
|
||||
else if (internal_state & PS_OUT_PARAM)
|
||||
{
|
||||
more = true;
|
||||
internal_state &= ~PS_OUT_PARAM;
|
||||
}
|
||||
}
|
||||
else if (command == MYSQL_REPLY_OK && pktlen >= MYSQL_OK_PACKET_MIN_LEN &&
|
||||
(eof + n_found) % 2 == 0)
|
||||
@ -696,13 +724,6 @@ int modutil_count_signal_packets(GWBUF *reply, int n_found, bool* more_out, modu
|
||||
}
|
||||
}
|
||||
|
||||
if (offset + pktlen >= len || (eof + err + n_found) >= 2)
|
||||
{
|
||||
gwbuf_copy_data(reply, offset, sizeof(header), header);
|
||||
uint16_t* status = (uint16_t*)(header + MYSQL_HEADER_LEN + 1 + 2); // Skip command and warning count
|
||||
more = ((*status) & SERVER_MORE_RESULTS_EXIST);
|
||||
}
|
||||
|
||||
offset += pktlen;
|
||||
|
||||
if (offset >= GWBUF_LENGTH(reply) && reply->next)
|
||||
@ -717,7 +738,7 @@ int modutil_count_signal_packets(GWBUF *reply, int n_found, bool* more_out, modu
|
||||
|
||||
if (state)
|
||||
{
|
||||
state->state = skip_next;
|
||||
state->state = internal_state;
|
||||
}
|
||||
|
||||
*more_out = more;
|
||||
|
@ -523,9 +523,9 @@ bool reply_is_complete(SRWBackend& backend, GWBUF *buffer)
|
||||
if (backend->current_command() == MXS_COM_STMT_FETCH)
|
||||
{
|
||||
bool more = false;
|
||||
modutil_state state = {backend->is_large_packet()};
|
||||
modutil_state state = backend->get_modutil_state();
|
||||
int n_eof = modutil_count_signal_packets(buffer, 0, &more, &state);
|
||||
backend->set_large_packet(state.state);
|
||||
backend->set_modutil_state(state);
|
||||
|
||||
// If the server responded with an error, n_eof > 0
|
||||
if (n_eof > 0 || backend->consume_fetched_rows(buffer))
|
||||
@ -569,10 +569,10 @@ bool reply_is_complete(SRWBackend& backend, GWBUF *buffer)
|
||||
else
|
||||
{
|
||||
bool more = false;
|
||||
modutil_state state = {backend->is_large_packet()};
|
||||
modutil_state state = backend->get_modutil_state();
|
||||
int n_old_eof = backend->get_reply_state() == REPLY_STATE_RSET_ROWS ? 1 : 0;
|
||||
int n_eof = modutil_count_signal_packets(buffer, n_old_eof, &more, &state);
|
||||
backend->set_large_packet(state.state);
|
||||
backend->set_modutil_state(state);
|
||||
|
||||
if (n_eof > 2)
|
||||
{
|
||||
|
@ -17,7 +17,7 @@
|
||||
RWBackend::RWBackend(SERVER_REF* ref):
|
||||
mxs::Backend(ref),
|
||||
m_reply_state(REPLY_STATE_DONE),
|
||||
m_large_packet(false),
|
||||
m_modutil_state({}),
|
||||
m_command(0),
|
||||
m_opening_cursor(false),
|
||||
m_expected_rows(0)
|
||||
|
@ -62,14 +62,14 @@ public:
|
||||
// For COM_STMT_FETCH processing
|
||||
bool consume_fetched_rows(GWBUF* buffer);
|
||||
|
||||
inline void set_large_packet(bool value)
|
||||
inline void set_modutil_state(modutil_state value)
|
||||
{
|
||||
m_large_packet = value;
|
||||
m_modutil_state = value;
|
||||
}
|
||||
|
||||
inline bool is_large_packet() const
|
||||
inline modutil_state get_modutil_state() const
|
||||
{
|
||||
return m_large_packet;
|
||||
return m_modutil_state;
|
||||
}
|
||||
|
||||
inline uint8_t current_command() const
|
||||
@ -90,9 +90,7 @@ public:
|
||||
private:
|
||||
reply_state_t m_reply_state;
|
||||
BackendHandleMap m_ps_handles; /**< Internal ID to backend PS handle mapping */
|
||||
bool m_large_packet; /**< Used to store the state of the EOF packet
|
||||
*calculation for result sets when the result
|
||||
* contains very large rows */
|
||||
modutil_state m_modutil_state; /**< @see modutil_count_signal_packets */
|
||||
uint8_t m_command;
|
||||
bool m_opening_cursor; /**< Whether we are opening a cursor */
|
||||
uint32_t m_expected_rows; /**< Number of rows a COM_STMT_FETCH is retrieving */
|
||||
|
Loading…
x
Reference in New Issue
Block a user