MXS-2485 Add suport for split query packets (from client side)
This commit is contained in:
@ -684,13 +684,13 @@ private:
|
|||||||
uint8_t m_payload_offset;
|
uint8_t m_payload_offset;
|
||||||
};
|
};
|
||||||
|
|
||||||
std::ostream& operator<<(std::ostream& os, ComResponse::Type type)
|
inline std::ostream& operator<<(std::ostream& os, ComResponse::Type type)
|
||||||
{
|
{
|
||||||
static const std::array<std::string, 6> type_names = {
|
static const std::array<std::string, 6> type_names = {
|
||||||
"Ok", "Err", "Eof", "LocalInfile", "Data"
|
"Ok", "Err", "Eof", "LocalInfile", "Data"
|
||||||
};
|
};
|
||||||
|
|
||||||
auto ind = size_t(type);
|
auto ind = static_cast<size_t>(type);
|
||||||
return os << ((ind < type_names.size()) ? type_names[ind] : "UNKNOWN");
|
return os << ((ind < type_names.size()) ? type_names[ind] : "UNKNOWN");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -22,21 +22,36 @@ namespace maxsql
|
|||||||
|
|
||||||
class ComResponse;
|
class ComResponse;
|
||||||
|
|
||||||
// Minimal class to track the lifetime of a query.
|
|
||||||
// TODO add documentation
|
// TODO add documentation
|
||||||
class PacketTracker
|
class PacketTracker
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
// The State reflects the response status. For the unlikely event that the query is split, but
|
||||||
|
// no response is expected, the tracker may still be waiting for packets from the client while
|
||||||
|
// m_state = Done. The function expecting_more_packets() would return true in this case.
|
||||||
|
// TODO, maybe, rename to ResponseState.
|
||||||
enum class State {FirstPacket, Field, FieldEof, Row,
|
enum class State {FirstPacket, Field, FieldEof, Row,
|
||||||
ComFieldList, ComStatistics, ComStmtFetch,
|
ComFieldList, ComStatistics, ComStmtFetch,
|
||||||
Done, ErrorPacket, Error};
|
Done, ErrorPacket, Error};
|
||||||
|
|
||||||
PacketTracker() = default;
|
PacketTracker() = default;
|
||||||
|
|
||||||
explicit PacketTracker(GWBUF* pQuery); // Track this query
|
explicit PacketTracker(GWBUF* pQuery); // Track this query
|
||||||
|
|
||||||
void update(GWBUF* pPacket); // Update as packets are received.
|
PacketTracker(const PacketTracker&) = delete;
|
||||||
|
PacketTracker& operator=(const PacketTracker&) = delete;
|
||||||
|
|
||||||
|
PacketTracker(PacketTracker&&) = default;
|
||||||
|
PacketTracker& operator=(PacketTracker&&) = default;
|
||||||
|
|
||||||
|
bool update_request(GWBUF* pPacket); // Updates the query (must be a split packet)
|
||||||
|
void update_response(GWBUF* pPacket); // Update as response packets are received.
|
||||||
|
|
||||||
|
bool expecting_request_packets() const;
|
||||||
|
bool expecting_response_packets() const;
|
||||||
|
bool expecting_more_packets() const;
|
||||||
|
|
||||||
State state() const;
|
State state() const;
|
||||||
bool expecting_more_packets() const;
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
// State functions.
|
// State functions.
|
||||||
@ -47,12 +62,12 @@ private:
|
|||||||
State com_field_list(const ComResponse& response);
|
State com_field_list(const ComResponse& response);
|
||||||
State com_statistics(const ComResponse& response);
|
State com_statistics(const ComResponse& response);
|
||||||
State com_stmt_fetch(const ComResponse& response);
|
State com_stmt_fetch(const ComResponse& response);
|
||||||
|
State expect_no_response_packets(const ComResponse& response); // states: Done, ErrorPacket, Error
|
||||||
State expect_no_more(const ComResponse& response); // states: Done, ErrorPacket, Error
|
|
||||||
|
|
||||||
State m_state = State::Error;
|
State m_state = State::Error;
|
||||||
bool m_client_packet_internal = false;
|
bool m_client_com_packet_internal = false;
|
||||||
bool m_server_packet_internal = false;
|
bool m_server_com_packet_internal = false;
|
||||||
|
bool m_expect_more_split_query_packets = false;
|
||||||
|
|
||||||
int m_command;
|
int m_command;
|
||||||
int m_total_fields;
|
int m_total_fields;
|
||||||
|
|||||||
@ -11,9 +11,7 @@
|
|||||||
* Public License.
|
* Public License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
// TODO handle client split packets
|
// LIMITATION: local infile not handled yet.
|
||||||
// TODO handle local infile
|
|
||||||
// TODO do cursors need more special handling than ComStmtFetch has.
|
|
||||||
|
|
||||||
#include <maxsql/packet_tracker.hh>
|
#include <maxsql/packet_tracker.hh>
|
||||||
#include <maxsql/mysql_plus.hh>
|
#include <maxsql/mysql_plus.hh>
|
||||||
@ -38,10 +36,11 @@ std::ostream& operator<<(std::ostream& os, PacketTracker::State state)
|
|||||||
|
|
||||||
PacketTracker::PacketTracker(GWBUF* pPacket)
|
PacketTracker::PacketTracker(GWBUF* pPacket)
|
||||||
{
|
{
|
||||||
ComRequest request(ComPacket(pPacket, &m_client_packet_internal));
|
ComRequest request(ComPacket(pPacket, &m_client_com_packet_internal));
|
||||||
m_command = request.command();
|
m_command = request.command();
|
||||||
|
m_expect_more_split_query_packets = request.is_split_leader();
|
||||||
|
|
||||||
MXS_SINFO("PacketTracker Command: " << STRPACKETTYPE(m_command)); // TODO remove or change to debug
|
MXS_SDEBUG("PacketTracker Command: " << STRPACKETTYPE(m_command));
|
||||||
|
|
||||||
if (request.server_will_respond())
|
if (request.server_will_respond())
|
||||||
{
|
{
|
||||||
@ -70,7 +69,38 @@ PacketTracker::PacketTracker(GWBUF* pPacket)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool PacketTracker::expecting_more_packets() const
|
bool PacketTracker::update_request(GWBUF* pPacket)
|
||||||
|
{
|
||||||
|
MXS_SDEBUG("PacketTracker update_request: " << STRPACKETTYPE(m_command));
|
||||||
|
ComPacket com_packet(pPacket, &m_client_com_packet_internal);
|
||||||
|
|
||||||
|
if (!m_expect_more_split_query_packets)
|
||||||
|
{
|
||||||
|
MXS_SERROR("PacketTracker::update_request() called while not expecting splits");
|
||||||
|
mxb_assert(!true);
|
||||||
|
m_state = State::Error;
|
||||||
|
}
|
||||||
|
else if (!com_packet.is_split_continuation())
|
||||||
|
{
|
||||||
|
MXS_SERROR("PacketTracker::update_request() received a non-split packet");
|
||||||
|
mxb_assert(!true);
|
||||||
|
m_state = State::Error;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (com_packet.is_split_trailer())
|
||||||
|
{
|
||||||
|
m_expect_more_split_query_packets = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return m_state != State::Error;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool PacketTracker::expecting_request_packets() const
|
||||||
|
{
|
||||||
|
return m_expect_more_split_query_packets;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool PacketTracker::expecting_response_packets() const
|
||||||
{
|
{
|
||||||
switch (m_state)
|
switch (m_state)
|
||||||
{
|
{
|
||||||
@ -84,21 +114,29 @@ bool PacketTracker::expecting_more_packets() const
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool PacketTracker::expecting_more_packets() const
|
||||||
|
{
|
||||||
|
return expecting_response_packets() || expecting_request_packets();
|
||||||
|
}
|
||||||
|
|
||||||
static constexpr std::array<PacketTracker::State, 5> data_states {
|
static constexpr std::array<PacketTracker::State, 5> data_states {
|
||||||
PacketTracker::State::Field, PacketTracker::State::Row,
|
PacketTracker::State::Field,
|
||||||
PacketTracker::State::ComFieldList, PacketTracker::State::ComStatistics,
|
PacketTracker::State::Row,
|
||||||
|
PacketTracker::State::ComFieldList,
|
||||||
|
PacketTracker::State::ComStatistics,
|
||||||
PacketTracker::State::ComStmtFetch
|
PacketTracker::State::ComStmtFetch
|
||||||
};
|
};
|
||||||
|
|
||||||
void PacketTracker::update(GWBUF* pPacket)
|
void PacketTracker::update_response(GWBUF* pPacket)
|
||||||
{
|
{
|
||||||
ComPacket com_packet(pPacket, &m_server_packet_internal);
|
ComPacket com_packet(pPacket, &m_server_com_packet_internal);
|
||||||
|
|
||||||
bool expect_data_only = std::find(begin(data_states), end(data_states), m_state) != end(data_states);
|
bool expect_data_only = std::find(begin(data_states), end(data_states), m_state) != end(data_states);
|
||||||
ComResponse response(com_packet, expect_data_only);
|
ComResponse response(com_packet, expect_data_only);
|
||||||
|
|
||||||
if (response.is_split_continuation())
|
if (response.is_split_continuation())
|
||||||
{ // no state change, just more of the same data
|
{ // no state change, just more of the same data
|
||||||
|
MXS_SDEBUG("PacketTracker::update_response IGNORE trailing split packets");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -141,7 +179,7 @@ void PacketTracker::update(GWBUF* pPacket)
|
|||||||
case State::Done:
|
case State::Done:
|
||||||
case State::ErrorPacket:
|
case State::ErrorPacket:
|
||||||
case State::Error:
|
case State::Error:
|
||||||
m_state = expect_no_more(response);
|
m_state = expect_no_response_packets(response);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -289,7 +327,7 @@ PacketTracker::State PacketTracker::com_stmt_fetch(const maxsql::ComResponse& re
|
|||||||
return new_state;
|
return new_state;
|
||||||
}
|
}
|
||||||
|
|
||||||
PacketTracker::State PacketTracker::expect_no_more(const ComResponse& response)
|
PacketTracker::State PacketTracker::expect_no_response_packets(const ComResponse& response)
|
||||||
{
|
{
|
||||||
MXS_SERROR("PacketTracker unexpected " << response.type() << " in state " << m_state);
|
MXS_SERROR("PacketTracker unexpected " << response.type() << " in state " << m_state);
|
||||||
return State::Error;
|
return State::Error;
|
||||||
|
|||||||
Reference in New Issue
Block a user