Simplify RWBackend result handling
By processing the packets one at a time, the reply state is updated correctly regardless of how many packets are received. This removes the need for the clunky code that used modutil_count_signal_packets to detect the end of the result set.
This commit is contained in:
@ -28,6 +28,7 @@ enum reply_state_t
|
|||||||
REPLY_STATE_START, /**< Query sent to backend */
|
REPLY_STATE_START, /**< Query sent to backend */
|
||||||
REPLY_STATE_DONE, /**< Complete reply received */
|
REPLY_STATE_DONE, /**< Complete reply received */
|
||||||
REPLY_STATE_RSET_COLDEF, /**< Resultset response, waiting for column definitions */
|
REPLY_STATE_RSET_COLDEF, /**< Resultset response, waiting for column definitions */
|
||||||
|
REPLY_STATE_RSET_COLDEF_EOF,/**< Resultset response, waiting for EOF for column definitions */
|
||||||
REPLY_STATE_RSET_ROWS /**< Resultset response, waiting for rows */
|
REPLY_STATE_RSET_ROWS /**< Resultset response, waiting for rows */
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -67,6 +68,9 @@ public:
|
|||||||
case REPLY_STATE_RSET_COLDEF:
|
case REPLY_STATE_RSET_COLDEF:
|
||||||
return "COLDEF";
|
return "COLDEF";
|
||||||
|
|
||||||
|
case REPLY_STATE_RSET_COLDEF_EOF:
|
||||||
|
return "COLDEF_EOF";
|
||||||
|
|
||||||
case REPLY_STATE_RSET_ROWS:
|
case REPLY_STATE_RSET_ROWS:
|
||||||
return "ROWS";
|
return "ROWS";
|
||||||
|
|
||||||
@ -137,6 +141,8 @@ public:
|
|||||||
return m_reply_state == REPLY_STATE_DONE;
|
return m_reply_state == REPLY_STATE_DONE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void process_packets(GWBUF* buffer);
|
||||||
|
|
||||||
// Controlled by the session
|
// Controlled by the session
|
||||||
ResponseStat& response_stat();
|
ResponseStat& response_stat();
|
||||||
private:
|
private:
|
||||||
@ -148,6 +154,7 @@ private:
|
|||||||
uint32_t m_expected_rows; /**< Number of rows a COM_STMT_FETCH is retrieving */
|
uint32_t m_expected_rows; /**< Number of rows a COM_STMT_FETCH is retrieving */
|
||||||
bool m_local_infile_requested; /**< Whether a LOCAL INFILE was requested */
|
bool m_local_infile_requested; /**< Whether a LOCAL INFILE was requested */
|
||||||
ResponseStat m_response_stat;
|
ResponseStat m_response_stat;
|
||||||
|
uint64_t m_num_coldefs = 0;
|
||||||
|
|
||||||
inline bool is_opening_cursor() const
|
inline bool is_opening_cursor() const
|
||||||
{
|
{
|
||||||
|
@ -129,9 +129,23 @@ void RWBackend::close(close_type type)
|
|||||||
|
|
||||||
bool RWBackend::consume_fetched_rows(GWBUF* buffer)
|
bool RWBackend::consume_fetched_rows(GWBUF* buffer)
|
||||||
{
|
{
|
||||||
|
bool rval = false;
|
||||||
|
bool more = false;
|
||||||
|
int n_eof = modutil_count_signal_packets(buffer, 0, &more, &m_modutil_state);
|
||||||
|
|
||||||
|
// If the server responded with an error, n_eof > 0
|
||||||
|
if (n_eof > 0)
|
||||||
|
{
|
||||||
|
rval = true;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
m_expected_rows -= modutil_count_packets(buffer);
|
m_expected_rows -= modutil_count_packets(buffer);
|
||||||
mxb_assert(m_expected_rows >= 0);
|
mxb_assert(m_expected_rows >= 0);
|
||||||
return m_expected_rows == 0;
|
rval = m_expected_rows == 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
return rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline bool have_next_packet(GWBUF* buffer)
|
static inline bool have_next_packet(GWBUF* buffer)
|
||||||
@ -140,6 +154,189 @@ static inline bool have_next_packet(GWBUF* buffer)
|
|||||||
return gwbuf_length(buffer) > len;
|
return gwbuf_length(buffer) > len;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template<class Iter>
|
||||||
|
uint64_t get_encoded_int(Iter it)
|
||||||
|
{
|
||||||
|
uint64_t len = *it++;
|
||||||
|
|
||||||
|
switch (len)
|
||||||
|
{
|
||||||
|
case 0xfc:
|
||||||
|
len = *it++;
|
||||||
|
len |= ((uint64_t)*it++) << 8;
|
||||||
|
break;
|
||||||
|
|
||||||
|
case 0xfd:
|
||||||
|
len = *it++;
|
||||||
|
len |= ((uint64_t)*it++) << 8;
|
||||||
|
len |= ((uint64_t)*it++) << 16;
|
||||||
|
break;
|
||||||
|
|
||||||
|
case 0xfe:
|
||||||
|
len = *it++;
|
||||||
|
len |= ((uint64_t)*it++) << 8;
|
||||||
|
len |= ((uint64_t)*it++) << 16;
|
||||||
|
len |= ((uint64_t)*it++) << 24;
|
||||||
|
len |= ((uint64_t)*it++) << 32;
|
||||||
|
len |= ((uint64_t)*it++) << 40;
|
||||||
|
len |= ((uint64_t)*it++) << 48;
|
||||||
|
len |= ((uint64_t)*it++) << 56;
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
return len;
|
||||||
|
}
|
||||||
|
|
||||||
|
template<class Iter>
|
||||||
|
Iter skip_encoded_int(Iter it)
|
||||||
|
{
|
||||||
|
switch (*it)
|
||||||
|
{
|
||||||
|
case 0xfc:
|
||||||
|
return std::next(it, 3);
|
||||||
|
|
||||||
|
case 0xfd:
|
||||||
|
return std::next(it, 4);
|
||||||
|
|
||||||
|
case 0xfe:
|
||||||
|
return std::next(it, 9);
|
||||||
|
|
||||||
|
default:
|
||||||
|
return std::next(it);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
template<class Iter>
|
||||||
|
uint64_t is_last_ok(Iter it)
|
||||||
|
{
|
||||||
|
++it; // Skip the command byte
|
||||||
|
it = skip_encoded_int(it); // Affected rows
|
||||||
|
it = skip_encoded_int(it); // Last insert ID
|
||||||
|
uint16_t status = *it++;
|
||||||
|
status |= (*it++) << 8;
|
||||||
|
return (status & SERVER_MORE_RESULTS_EXIST) == 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
template<class Iter>
|
||||||
|
uint64_t is_last_eof(Iter it)
|
||||||
|
{
|
||||||
|
std::advance(it, 3); // Skip the command byte and warning count
|
||||||
|
uint16_t status = *it++;
|
||||||
|
status |= (*it++) << 8;
|
||||||
|
return (status & SERVER_MORE_RESULTS_EXIST) == 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void RWBackend::process_packets(GWBUF* result)
|
||||||
|
{
|
||||||
|
mxs::Buffer buffer(result);
|
||||||
|
auto it = buffer.begin();
|
||||||
|
|
||||||
|
while (it != buffer.end())
|
||||||
|
{
|
||||||
|
// Extract packet length and command byte
|
||||||
|
uint32_t len = *it++;
|
||||||
|
len |= (*it++) << 8;
|
||||||
|
len |= (*it++) << 16;
|
||||||
|
++it; // Skip the sequence
|
||||||
|
mxb_assert(it != buffer.end());
|
||||||
|
auto end = std::next(it, len);
|
||||||
|
uint8_t cmd = *it;
|
||||||
|
|
||||||
|
switch (m_reply_state)
|
||||||
|
{
|
||||||
|
case REPLY_STATE_START:
|
||||||
|
m_local_infile_requested = false;
|
||||||
|
|
||||||
|
switch (cmd)
|
||||||
|
{
|
||||||
|
case MYSQL_REPLY_OK:
|
||||||
|
if (is_last_ok(it))
|
||||||
|
{
|
||||||
|
// No more results
|
||||||
|
set_reply_state(REPLY_STATE_DONE);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
case MYSQL_REPLY_LOCAL_INFILE:
|
||||||
|
m_local_infile_requested = true;
|
||||||
|
set_reply_state(REPLY_STATE_DONE);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case MYSQL_REPLY_ERR:
|
||||||
|
// Nothing ever follows an error packet
|
||||||
|
set_reply_state(REPLY_STATE_DONE);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case MYSQL_REPLY_EOF:
|
||||||
|
// EOF packets are never expected as the first response
|
||||||
|
mxb_assert(!true);
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
|
||||||
|
if (current_command() == MXS_COM_FIELD_LIST)
|
||||||
|
{
|
||||||
|
// COM_FIELD_LIST sends a strange kind of a result set
|
||||||
|
set_reply_state(REPLY_STATE_RSET_ROWS);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// Start of a result set
|
||||||
|
m_num_coldefs = get_encoded_int(it);
|
||||||
|
set_reply_state(REPLY_STATE_RSET_COLDEF);
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
case REPLY_STATE_DONE:
|
||||||
|
// This should never happen
|
||||||
|
mxb_assert(!true);
|
||||||
|
MXS_ERROR("Unexpected result state. cmd: 0x%02hhx, len: %u", cmd, len);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case REPLY_STATE_RSET_COLDEF:
|
||||||
|
mxb_assert(m_num_coldefs > 0);
|
||||||
|
--m_num_coldefs;
|
||||||
|
|
||||||
|
if (m_num_coldefs == 0)
|
||||||
|
{
|
||||||
|
set_reply_state(REPLY_STATE_RSET_COLDEF_EOF);
|
||||||
|
// Skip this state when DEPRECATE_EOF capability is supported
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
case REPLY_STATE_RSET_COLDEF_EOF:
|
||||||
|
mxb_assert(cmd == MYSQL_REPLY_EOF && len == MYSQL_EOF_PACKET_LEN - MYSQL_HEADER_LEN);
|
||||||
|
set_reply_state(REPLY_STATE_RSET_ROWS);
|
||||||
|
|
||||||
|
if (is_opening_cursor())
|
||||||
|
{
|
||||||
|
set_cursor_opened();
|
||||||
|
MXS_INFO("Cursor successfully opened");
|
||||||
|
set_reply_state(REPLY_STATE_DONE);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
case REPLY_STATE_RSET_ROWS:
|
||||||
|
if ((cmd == MYSQL_REPLY_EOF && len == MYSQL_EOF_PACKET_LEN - MYSQL_HEADER_LEN)
|
||||||
|
|| cmd == MYSQL_REPLY_ERR)
|
||||||
|
{
|
||||||
|
set_reply_state(is_last_eof(it) ? REPLY_STATE_DONE : REPLY_STATE_START);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
it = end;
|
||||||
|
}
|
||||||
|
|
||||||
|
buffer.release();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Process a possibly partial response from the backend
|
* @brief Process a possibly partial response from the backend
|
||||||
*
|
*
|
||||||
@ -149,114 +346,24 @@ void RWBackend::process_reply(GWBUF* buffer)
|
|||||||
{
|
{
|
||||||
if (current_command() == MXS_COM_STMT_FETCH)
|
if (current_command() == MXS_COM_STMT_FETCH)
|
||||||
{
|
{
|
||||||
bool more = false;
|
|
||||||
int n_eof = modutil_count_signal_packets(buffer, 0, &more, &m_modutil_state);
|
|
||||||
|
|
||||||
// If the server responded with an error, n_eof > 0
|
// If the server responded with an error, n_eof > 0
|
||||||
if (n_eof > 0 || consume_fetched_rows(buffer))
|
if (consume_fetched_rows(buffer))
|
||||||
{
|
{
|
||||||
set_reply_state(REPLY_STATE_DONE);
|
set_reply_state(REPLY_STATE_DONE);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (current_command() == MXS_COM_STATISTICS)
|
else if (current_command() == MXS_COM_STATISTICS || GWBUF_IS_COLLECTED_RESULT(buffer))
|
||||||
{
|
{
|
||||||
// COM_STATISTICS returns a single string and thus requires special handling
|
// COM_STATISTICS returns a single string and thus requires special handling.
|
||||||
|
// Collected result are all in one buffer and need no processing.
|
||||||
set_reply_state(REPLY_STATE_DONE);
|
set_reply_state(REPLY_STATE_DONE);
|
||||||
}
|
}
|
||||||
else if (get_reply_state() == REPLY_STATE_START
|
|
||||||
&& (!mxs_mysql_is_result_set(buffer) || GWBUF_IS_COLLECTED_RESULT(buffer)))
|
|
||||||
{
|
|
||||||
m_local_infile_requested = false;
|
|
||||||
|
|
||||||
if (GWBUF_IS_COLLECTED_RESULT(buffer)
|
|
||||||
|| current_command() == MXS_COM_STMT_PREPARE
|
|
||||||
|| !mxs_mysql_is_ok_packet(buffer)
|
|
||||||
|| !mxs_mysql_more_results_after_ok(buffer))
|
|
||||||
{
|
|
||||||
/** Not a result set, we have the complete response */
|
|
||||||
set_reply_state(REPLY_STATE_DONE);
|
|
||||||
|
|
||||||
if (mxs_mysql_is_local_infile(buffer))
|
|
||||||
{
|
|
||||||
m_local_infile_requested = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
// This is an OK packet and more results will follow
|
// Normal result, process it one packet at a time
|
||||||
mxb_assert(mxs_mysql_is_ok_packet(buffer)
|
process_packets(buffer);
|
||||||
&& mxs_mysql_more_results_after_ok(buffer));
|
|
||||||
|
|
||||||
if (have_next_packet(buffer))
|
|
||||||
{
|
|
||||||
// TODO: Don't clone the buffer
|
|
||||||
GWBUF* tmp = gwbuf_clone(buffer);
|
|
||||||
tmp = gwbuf_consume(tmp, mxs_mysql_get_packet_len(tmp));
|
|
||||||
|
|
||||||
// Consume repeating OK packets
|
|
||||||
while (mxs_mysql_more_results_after_ok(buffer) && have_next_packet(tmp))
|
|
||||||
{
|
|
||||||
tmp = gwbuf_consume(tmp, mxs_mysql_get_packet_len(tmp));
|
|
||||||
mxb_assert(tmp);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
process_reply(tmp);
|
|
||||||
gwbuf_free(tmp);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
bool more = false;
|
|
||||||
int n_old_eof = get_reply_state() == REPLY_STATE_RSET_ROWS ? 1 : 0;
|
|
||||||
int n_eof = modutil_count_signal_packets(buffer, n_old_eof, &more, &m_modutil_state);
|
|
||||||
|
|
||||||
if (n_eof > 2)
|
|
||||||
{
|
|
||||||
/**
|
|
||||||
* We have multiple results in the buffer, we only care about
|
|
||||||
* the state of the last one. Skip the complete result sets and act
|
|
||||||
* like we're processing a single result set.
|
|
||||||
*/
|
|
||||||
n_eof = n_eof % 2 ? 1 : 2;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (n_eof == 0)
|
|
||||||
{
|
|
||||||
/** Waiting for the EOF packet after the column definitions */
|
|
||||||
set_reply_state(REPLY_STATE_RSET_COLDEF);
|
|
||||||
}
|
|
||||||
else if (n_eof == 1 && current_command() != MXS_COM_FIELD_LIST)
|
|
||||||
{
|
|
||||||
/** Waiting for the EOF packet after the rows */
|
|
||||||
set_reply_state(REPLY_STATE_RSET_ROWS);
|
|
||||||
|
|
||||||
if (is_opening_cursor())
|
|
||||||
{
|
|
||||||
set_cursor_opened();
|
|
||||||
MXS_INFO("Cursor successfully opened");
|
|
||||||
set_reply_state(REPLY_STATE_DONE);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
/** We either have a complete result set or a response to
|
|
||||||
* a COM_FIELD_LIST command */
|
|
||||||
mxb_assert(n_eof == 2 || (n_eof == 1 && current_command() == MXS_COM_FIELD_LIST));
|
|
||||||
set_reply_state(REPLY_STATE_DONE);
|
|
||||||
|
|
||||||
if (more)
|
|
||||||
{
|
|
||||||
/** The server will send more resultsets */
|
|
||||||
set_reply_state(REPLY_STATE_START);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
MXS_DEBUG("cmd: %02hhx bytes: %u packets: %d state: %s", mxs_mysql_get_command(buffer),
|
|
||||||
gwbuf_length(buffer), modutil_count_packets(buffer), reply_state_str());
|
|
||||||
|
|
||||||
if (get_reply_state() == REPLY_STATE_DONE)
|
if (get_reply_state() == REPLY_STATE_DONE)
|
||||||
{
|
{
|
||||||
ack_write();
|
ack_write();
|
||||||
|
Reference in New Issue
Block a user