From 7a63a17278f396580dd6c750e7f903fd7a3304d3 Mon Sep 17 00:00:00 2001 From: Niclas Antti Date: Wed, 5 Jun 2019 11:13:25 +0300 Subject: [PATCH] MXS-2485 Add suport for split query packets (from client side) --- maxutils/maxsql/include/maxsql/mysql_plus.hh | 4 +- .../maxsql/include/maxsql/packet_tracker.hh | 29 ++++++--- maxutils/maxsql/src/packet_tracker.cc | 62 +++++++++++++++---- 3 files changed, 74 insertions(+), 21 deletions(-) diff --git a/maxutils/maxsql/include/maxsql/mysql_plus.hh b/maxutils/maxsql/include/maxsql/mysql_plus.hh index f73ef55f7..43b61a996 100644 --- a/maxutils/maxsql/include/maxsql/mysql_plus.hh +++ b/maxutils/maxsql/include/maxsql/mysql_plus.hh @@ -684,13 +684,13 @@ private: uint8_t m_payload_offset; }; -std::ostream& operator<<(std::ostream& os, ComResponse::Type type) +inline std::ostream& operator<<(std::ostream& os, ComResponse::Type type) { static const std::array type_names = { "Ok", "Err", "Eof", "LocalInfile", "Data" }; - auto ind = size_t(type); + auto ind = static_cast(type); return os << ((ind < type_names.size()) ? type_names[ind] : "UNKNOWN"); } diff --git a/maxutils/maxsql/include/maxsql/packet_tracker.hh b/maxutils/maxsql/include/maxsql/packet_tracker.hh index 2bdd132ec..9f2df2ae7 100644 --- a/maxutils/maxsql/include/maxsql/packet_tracker.hh +++ b/maxutils/maxsql/include/maxsql/packet_tracker.hh @@ -22,21 +22,36 @@ namespace maxsql class ComResponse; -// Minimal class to track the lifetime of a query. // TODO add documentation class PacketTracker { public: + // The State reflects the response status. For the unlikely event that the query is split, but + // no response is expected, the tracker may still be waiting for packets from the client while + // m_state = Done. The function expecting_more_packets() would return true in this case. + // TODO, maybe, rename to ResponseState. enum class State {FirstPacket, Field, FieldEof, Row, ComFieldList, ComStatistics, ComStmtFetch, Done, ErrorPacket, Error}; PacketTracker() = default; + explicit PacketTracker(GWBUF* pQuery); // Track this query - void update(GWBUF* pPacket); // Update as packets are received. + PacketTracker(const PacketTracker&) = delete; + PacketTracker& operator=(const PacketTracker&) = delete; + + PacketTracker(PacketTracker&&) = default; + PacketTracker& operator=(PacketTracker&&) = default; + + bool update_request(GWBUF* pPacket); // Updates the query (must be a split packet) + void update_response(GWBUF* pPacket); // Update as response packets are received. + + bool expecting_request_packets() const; + bool expecting_response_packets() const; + bool expecting_more_packets() const; + State state() const; - bool expecting_more_packets() const; private: // State functions. @@ -47,12 +62,12 @@ private: State com_field_list(const ComResponse& response); State com_statistics(const ComResponse& response); State com_stmt_fetch(const ComResponse& response); - - State expect_no_more(const ComResponse& response); // states: Done, ErrorPacket, Error + State expect_no_response_packets(const ComResponse& response); // states: Done, ErrorPacket, Error State m_state = State::Error; - bool m_client_packet_internal = false; - bool m_server_packet_internal = false; + bool m_client_com_packet_internal = false; + bool m_server_com_packet_internal = false; + bool m_expect_more_split_query_packets = false; int m_command; int m_total_fields; diff --git a/maxutils/maxsql/src/packet_tracker.cc b/maxutils/maxsql/src/packet_tracker.cc index f43a515a7..7c0fb0408 100644 --- a/maxutils/maxsql/src/packet_tracker.cc +++ b/maxutils/maxsql/src/packet_tracker.cc @@ -11,9 +11,7 @@ * Public License. */ -// TODO handle client split packets -// TODO handle local infile -// TODO do cursors need more special handling than ComStmtFetch has. +// LIMITATION: local infile not handled yet. #include #include @@ -38,10 +36,11 @@ std::ostream& operator<<(std::ostream& os, PacketTracker::State state) PacketTracker::PacketTracker(GWBUF* pPacket) { - ComRequest request(ComPacket(pPacket, &m_client_packet_internal)); + ComRequest request(ComPacket(pPacket, &m_client_com_packet_internal)); m_command = request.command(); + m_expect_more_split_query_packets = request.is_split_leader(); - MXS_SINFO("PacketTracker Command: " << STRPACKETTYPE(m_command)); // TODO remove or change to debug + MXS_SDEBUG("PacketTracker Command: " << STRPACKETTYPE(m_command)); if (request.server_will_respond()) { @@ -70,7 +69,38 @@ PacketTracker::PacketTracker(GWBUF* pPacket) } } -bool PacketTracker::expecting_more_packets() const +bool PacketTracker::update_request(GWBUF* pPacket) +{ + MXS_SDEBUG("PacketTracker update_request: " << STRPACKETTYPE(m_command)); + ComPacket com_packet(pPacket, &m_client_com_packet_internal); + + if (!m_expect_more_split_query_packets) + { + MXS_SERROR("PacketTracker::update_request() called while not expecting splits"); + mxb_assert(!true); + m_state = State::Error; + } + else if (!com_packet.is_split_continuation()) + { + MXS_SERROR("PacketTracker::update_request() received a non-split packet"); + mxb_assert(!true); + m_state = State::Error; + } + + if (com_packet.is_split_trailer()) + { + m_expect_more_split_query_packets = false; + } + + return m_state != State::Error; +} + +bool PacketTracker::expecting_request_packets() const +{ + return m_expect_more_split_query_packets; +} + +bool PacketTracker::expecting_response_packets() const { switch (m_state) { @@ -84,21 +114,29 @@ bool PacketTracker::expecting_more_packets() const } } +bool PacketTracker::expecting_more_packets() const +{ + return expecting_response_packets() || expecting_request_packets(); +} + static constexpr std::array data_states { - PacketTracker::State::Field, PacketTracker::State::Row, - PacketTracker::State::ComFieldList, PacketTracker::State::ComStatistics, + PacketTracker::State::Field, + PacketTracker::State::Row, + PacketTracker::State::ComFieldList, + PacketTracker::State::ComStatistics, PacketTracker::State::ComStmtFetch }; -void PacketTracker::update(GWBUF* pPacket) +void PacketTracker::update_response(GWBUF* pPacket) { - ComPacket com_packet(pPacket, &m_server_packet_internal); + ComPacket com_packet(pPacket, &m_server_com_packet_internal); bool expect_data_only = std::find(begin(data_states), end(data_states), m_state) != end(data_states); ComResponse response(com_packet, expect_data_only); if (response.is_split_continuation()) { // no state change, just more of the same data + MXS_SDEBUG("PacketTracker::update_response IGNORE trailing split packets"); return; } @@ -141,7 +179,7 @@ void PacketTracker::update(GWBUF* pPacket) case State::Done: case State::ErrorPacket: case State::Error: - m_state = expect_no_more(response); + m_state = expect_no_response_packets(response); break; } } @@ -289,7 +327,7 @@ PacketTracker::State PacketTracker::com_stmt_fetch(const maxsql::ComResponse& re return new_state; } -PacketTracker::State PacketTracker::expect_no_more(const ComResponse& response) +PacketTracker::State PacketTracker::expect_no_response_packets(const ComResponse& response) { MXS_SERROR("PacketTracker unexpected " << response.type() << " in state " << m_state); return State::Error;