diff --git a/include/maxscale/protocol/rwbackend.hh b/include/maxscale/protocol/rwbackend.hh index d8b1841d5..64c8fd91f 100644 --- a/include/maxscale/protocol/rwbackend.hh +++ b/include/maxscale/protocol/rwbackend.hh @@ -28,6 +28,7 @@ enum reply_state_t REPLY_STATE_START, /**< Query sent to backend */ REPLY_STATE_DONE, /**< Complete reply received */ REPLY_STATE_RSET_COLDEF, /**< Resultset response, waiting for column definitions */ + REPLY_STATE_RSET_COLDEF_EOF,/**< Resultset response, waiting for EOF for column definitions */ REPLY_STATE_RSET_ROWS /**< Resultset response, waiting for rows */ }; @@ -67,6 +68,9 @@ public: case REPLY_STATE_RSET_COLDEF: return "COLDEF"; + case REPLY_STATE_RSET_COLDEF_EOF: + return "COLDEF_EOF"; + case REPLY_STATE_RSET_ROWS: return "ROWS"; @@ -137,6 +141,8 @@ public: return m_reply_state == REPLY_STATE_DONE; } + void process_packets(GWBUF* buffer); + // Controlled by the session ResponseStat& response_stat(); private: @@ -148,6 +154,7 @@ private: uint32_t m_expected_rows; /**< Number of rows a COM_STMT_FETCH is retrieving */ bool m_local_infile_requested; /**< Whether a LOCAL INFILE was requested */ ResponseStat m_response_stat; + uint64_t m_num_coldefs = 0; inline bool is_opening_cursor() const { diff --git a/server/modules/protocol/MySQL/rwbackend.cc b/server/modules/protocol/MySQL/rwbackend.cc index e95b6406b..abeb0db82 100644 --- a/server/modules/protocol/MySQL/rwbackend.cc +++ b/server/modules/protocol/MySQL/rwbackend.cc @@ -129,9 +129,23 @@ void RWBackend::close(close_type type) bool RWBackend::consume_fetched_rows(GWBUF* buffer) { - m_expected_rows -= modutil_count_packets(buffer); - mxb_assert(m_expected_rows >= 0); - return m_expected_rows == 0; + bool rval = false; + bool more = false; + int n_eof = modutil_count_signal_packets(buffer, 0, &more, &m_modutil_state); + + // If the server responded with an error, n_eof > 0 + if (n_eof > 0) + { + rval = true; + } + else + { + m_expected_rows -= modutil_count_packets(buffer); + mxb_assert(m_expected_rows >= 0); + rval = m_expected_rows == 0; + } + + return rval; } static inline bool have_next_packet(GWBUF* buffer) @@ -140,6 +154,189 @@ static inline bool have_next_packet(GWBUF* buffer) return gwbuf_length(buffer) > len; } +template +uint64_t get_encoded_int(Iter it) +{ + uint64_t len = *it++; + + switch (len) + { + case 0xfc: + len = *it++; + len |= ((uint64_t)*it++) << 8; + break; + + case 0xfd: + len = *it++; + len |= ((uint64_t)*it++) << 8; + len |= ((uint64_t)*it++) << 16; + break; + + case 0xfe: + len = *it++; + len |= ((uint64_t)*it++) << 8; + len |= ((uint64_t)*it++) << 16; + len |= ((uint64_t)*it++) << 24; + len |= ((uint64_t)*it++) << 32; + len |= ((uint64_t)*it++) << 40; + len |= ((uint64_t)*it++) << 48; + len |= ((uint64_t)*it++) << 56; + break; + + default: + break; + } + + return len; +} + +template +Iter skip_encoded_int(Iter it) +{ + switch (*it) + { + case 0xfc: + return std::next(it, 3); + + case 0xfd: + return std::next(it, 4); + + case 0xfe: + return std::next(it, 9); + + default: + return std::next(it); + } +} + +template +uint64_t is_last_ok(Iter it) +{ + ++it; // Skip the command byte + it = skip_encoded_int(it); // Affected rows + it = skip_encoded_int(it); // Last insert ID + uint16_t status = *it++; + status |= (*it++) << 8; + return (status & SERVER_MORE_RESULTS_EXIST) == 0; +} + +template +uint64_t is_last_eof(Iter it) +{ + std::advance(it, 3); // Skip the command byte and warning count + uint16_t status = *it++; + status |= (*it++) << 8; + return (status & SERVER_MORE_RESULTS_EXIST) == 0; +} + +void RWBackend::process_packets(GWBUF* result) +{ + mxs::Buffer buffer(result); + auto it = buffer.begin(); + + while (it != buffer.end()) + { + // Extract packet length and command byte + uint32_t len = *it++; + len |= (*it++) << 8; + len |= (*it++) << 16; + ++it; // Skip the sequence + mxb_assert(it != buffer.end()); + auto end = std::next(it, len); + uint8_t cmd = *it; + + switch (m_reply_state) + { + case REPLY_STATE_START: + m_local_infile_requested = false; + + switch (cmd) + { + case MYSQL_REPLY_OK: + if (is_last_ok(it)) + { + // No more results + set_reply_state(REPLY_STATE_DONE); + } + break; + + case MYSQL_REPLY_LOCAL_INFILE: + m_local_infile_requested = true; + set_reply_state(REPLY_STATE_DONE); + break; + + case MYSQL_REPLY_ERR: + // Nothing ever follows an error packet + set_reply_state(REPLY_STATE_DONE); + break; + + case MYSQL_REPLY_EOF: + // EOF packets are never expected as the first response + mxb_assert(!true); + break; + + default: + + if (current_command() == MXS_COM_FIELD_LIST) + { + // COM_FIELD_LIST sends a strange kind of a result set + set_reply_state(REPLY_STATE_RSET_ROWS); + } + else + { + // Start of a result set + m_num_coldefs = get_encoded_int(it); + set_reply_state(REPLY_STATE_RSET_COLDEF); + } + + break; + } + break; + + case REPLY_STATE_DONE: + // This should never happen + mxb_assert(!true); + MXS_ERROR("Unexpected result state. cmd: 0x%02hhx, len: %u", cmd, len); + break; + + case REPLY_STATE_RSET_COLDEF: + mxb_assert(m_num_coldefs > 0); + --m_num_coldefs; + + if (m_num_coldefs == 0) + { + set_reply_state(REPLY_STATE_RSET_COLDEF_EOF); + // Skip this state when DEPRECATE_EOF capability is supported + } + break; + + case REPLY_STATE_RSET_COLDEF_EOF: + mxb_assert(cmd == MYSQL_REPLY_EOF && len == MYSQL_EOF_PACKET_LEN - MYSQL_HEADER_LEN); + set_reply_state(REPLY_STATE_RSET_ROWS); + + if (is_opening_cursor()) + { + set_cursor_opened(); + MXS_INFO("Cursor successfully opened"); + set_reply_state(REPLY_STATE_DONE); + } + break; + + case REPLY_STATE_RSET_ROWS: + if ((cmd == MYSQL_REPLY_EOF && len == MYSQL_EOF_PACKET_LEN - MYSQL_HEADER_LEN) + || cmd == MYSQL_REPLY_ERR) + { + set_reply_state(is_last_eof(it) ? REPLY_STATE_DONE : REPLY_STATE_START); + } + break; + } + + it = end; + } + + buffer.release(); +} + /** * @brief Process a possibly partial response from the backend * @@ -149,114 +346,24 @@ void RWBackend::process_reply(GWBUF* buffer) { if (current_command() == MXS_COM_STMT_FETCH) { - bool more = false; - int n_eof = modutil_count_signal_packets(buffer, 0, &more, &m_modutil_state); - // If the server responded with an error, n_eof > 0 - if (n_eof > 0 || consume_fetched_rows(buffer)) + if (consume_fetched_rows(buffer)) { set_reply_state(REPLY_STATE_DONE); } } - else if (current_command() == MXS_COM_STATISTICS) + else if (current_command() == MXS_COM_STATISTICS || GWBUF_IS_COLLECTED_RESULT(buffer)) { - // COM_STATISTICS returns a single string and thus requires special handling + // COM_STATISTICS returns a single string and thus requires special handling. + // Collected result are all in one buffer and need no processing. set_reply_state(REPLY_STATE_DONE); } - else if (get_reply_state() == REPLY_STATE_START - && (!mxs_mysql_is_result_set(buffer) || GWBUF_IS_COLLECTED_RESULT(buffer))) - { - m_local_infile_requested = false; - - if (GWBUF_IS_COLLECTED_RESULT(buffer) - || current_command() == MXS_COM_STMT_PREPARE - || !mxs_mysql_is_ok_packet(buffer) - || !mxs_mysql_more_results_after_ok(buffer)) - { - /** Not a result set, we have the complete response */ - set_reply_state(REPLY_STATE_DONE); - - if (mxs_mysql_is_local_infile(buffer)) - { - m_local_infile_requested = true; - } - } - else - { - // This is an OK packet and more results will follow - mxb_assert(mxs_mysql_is_ok_packet(buffer) - && mxs_mysql_more_results_after_ok(buffer)); - - if (have_next_packet(buffer)) - { - // TODO: Don't clone the buffer - GWBUF* tmp = gwbuf_clone(buffer); - tmp = gwbuf_consume(tmp, mxs_mysql_get_packet_len(tmp)); - - // Consume repeating OK packets - while (mxs_mysql_more_results_after_ok(buffer) && have_next_packet(tmp)) - { - tmp = gwbuf_consume(tmp, mxs_mysql_get_packet_len(tmp)); - mxb_assert(tmp); - } - - process_reply(tmp); - gwbuf_free(tmp); - return; - } - } - } else { - bool more = false; - int n_old_eof = get_reply_state() == REPLY_STATE_RSET_ROWS ? 1 : 0; - int n_eof = modutil_count_signal_packets(buffer, n_old_eof, &more, &m_modutil_state); - - if (n_eof > 2) - { - /** - * We have multiple results in the buffer, we only care about - * the state of the last one. Skip the complete result sets and act - * like we're processing a single result set. - */ - n_eof = n_eof % 2 ? 1 : 2; - } - - if (n_eof == 0) - { - /** Waiting for the EOF packet after the column definitions */ - set_reply_state(REPLY_STATE_RSET_COLDEF); - } - else if (n_eof == 1 && current_command() != MXS_COM_FIELD_LIST) - { - /** Waiting for the EOF packet after the rows */ - set_reply_state(REPLY_STATE_RSET_ROWS); - - if (is_opening_cursor()) - { - set_cursor_opened(); - MXS_INFO("Cursor successfully opened"); - set_reply_state(REPLY_STATE_DONE); - } - } - else - { - /** We either have a complete result set or a response to - * a COM_FIELD_LIST command */ - mxb_assert(n_eof == 2 || (n_eof == 1 && current_command() == MXS_COM_FIELD_LIST)); - set_reply_state(REPLY_STATE_DONE); - - if (more) - { - /** The server will send more resultsets */ - set_reply_state(REPLY_STATE_START); - } - } + // Normal result, process it one packet at a time + process_packets(buffer); } - MXS_DEBUG("cmd: %02hhx bytes: %u packets: %d state: %s", mxs_mysql_get_command(buffer), - gwbuf_length(buffer), modutil_count_packets(buffer), reply_state_str()); - if (get_reply_state() == REPLY_STATE_DONE) { ack_write();