MXS-1506: Refactor causal read reply processing
The state could be factored out into a boolean variable as the reply processing can be in two states: Either waiting for the response to MASTER_GTID_WAIT or updating packet numbers. The packet number updating can always be done as long as a buffer is available. The discard_master_wait_gtid_result function discards the OK packet before the packet numbers are updated so any trailing packets get corrected properly.
This commit is contained in:
@ -945,13 +945,13 @@ bool RWSplitSession::handle_got_target(GWBUF* querybuf, SRWBackend& target, bool
|
||||
ss_dassert(!target->has_session_commands());
|
||||
|
||||
mxs::Backend::response_type response = mxs::Backend::NO_RESPONSE;
|
||||
m_wait_gtid_state = EXPECTING_NOTHING;
|
||||
uint8_t cmd = mxs_mysql_get_command(querybuf);
|
||||
GWBUF *send_buf = gwbuf_clone(querybuf);
|
||||
if (cmd == COM_QUERY && m_router->config().enable_causal_read && m_gtid_pos != "")
|
||||
|
||||
if (cmd == COM_QUERY && m_router->config().enable_causal_read && !m_gtid_pos .empty())
|
||||
{
|
||||
send_buf = add_prefix_wait_gtid(target->server(), send_buf);
|
||||
m_wait_gtid_state = EXPECTING_WAIT_GTID_RESULT;
|
||||
m_waiting_for_gtid = true;
|
||||
}
|
||||
|
||||
if (m_qc.load_data_state() != QueryClassifier::LOAD_DATA_ACTIVE &&
|
||||
|
@ -34,7 +34,7 @@ RWSplitSession::RWSplitSession(RWSplit* instance, MXS_SESSION* session,
|
||||
m_sent_sescmd(0),
|
||||
m_recv_sescmd(0),
|
||||
m_gtid_pos(""),
|
||||
m_wait_gtid_state(EXPECTING_NOTHING),
|
||||
m_waiting_for_gtid(false),
|
||||
m_next_seq(0),
|
||||
m_qc(this, session, instance->config().use_sql_variables_in),
|
||||
m_retry_duration(0)
|
||||
@ -213,31 +213,31 @@ bool RWSplitSession::route_stored_query()
|
||||
}
|
||||
|
||||
/**
|
||||
* @bref discard the result of wait gtid statment, the result will be an error
|
||||
* packet or an error packet.
|
||||
* @param buffer origin reply buffer
|
||||
* @param proto MySQLProtocol
|
||||
* @return reset buffer
|
||||
* @bref discard the result of MASTER_GTID_WAIT statement
|
||||
*
|
||||
* The result will be an error or an OK packet.
|
||||
*
|
||||
* @param buffer Original reply buffer
|
||||
*
|
||||
* @return Any data after the ERR/OK packet, NULL for no data
|
||||
*/
|
||||
GWBUF* RWSplitSession::discard_master_wait_gtid_result(GWBUF *buffer)
|
||||
{
|
||||
uint8_t header_and_command[MYSQL_HEADER_LEN + 1];
|
||||
// MASTER_WAIT_GTID is complete, discard the OK packet or return the ERR packet
|
||||
m_waiting_for_gtid = false;
|
||||
|
||||
uint8_t header_and_command[MYSQL_HEADER_LEN + 1];
|
||||
gwbuf_copy_data(buffer, 0, MYSQL_HEADER_LEN + 1, header_and_command);
|
||||
/* ignore error packet */
|
||||
if (MYSQL_GET_COMMAND(header_and_command) == MYSQL_REPLY_ERR)
|
||||
|
||||
if (MYSQL_GET_COMMAND(header_and_command) == MYSQL_REPLY_OK)
|
||||
{
|
||||
m_wait_gtid_state = EXPECTING_NOTHING;
|
||||
return buffer;
|
||||
// Discard the OK packet and start updating sequence numbers
|
||||
uint8_t packet_len = MYSQL_GET_PAYLOAD_LEN(header_and_command) + MYSQL_HEADER_LEN;
|
||||
m_next_seq = 1;
|
||||
buffer = gwbuf_consume(buffer, packet_len);
|
||||
}
|
||||
|
||||
/* this packet must be an ok packet now */
|
||||
ss_dassert(MYSQL_GET_COMMAND(header_and_command) == MYSQL_REPLY_OK);
|
||||
uint8_t packet_len = MYSQL_GET_PAYLOAD_LEN(header_and_command) + MYSQL_HEADER_LEN;
|
||||
m_wait_gtid_state = EXPECTING_REAL_RESULT;
|
||||
m_next_seq = 1;
|
||||
|
||||
return gwbuf_consume(buffer, packet_len);
|
||||
return buffer;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -341,12 +341,12 @@ GWBUF* RWSplitSession::handle_causal_read_reply(GWBUF *writebuf, SRWBackend& bac
|
||||
}
|
||||
}
|
||||
|
||||
if (m_wait_gtid_state == EXPECTING_WAIT_GTID_RESULT)
|
||||
if (m_waiting_for_gtid)
|
||||
{
|
||||
writebuf = discard_master_wait_gtid_result(writebuf);
|
||||
}
|
||||
|
||||
if (writebuf && m_wait_gtid_state == EXPECTING_REAL_RESULT)
|
||||
if (writebuf)
|
||||
{
|
||||
correct_packet_sequence(writebuf);
|
||||
}
|
||||
|
@ -23,13 +23,6 @@
|
||||
#include <maxscale/modutil.h>
|
||||
#include <maxscale/queryclassifier.hh>
|
||||
|
||||
typedef enum
|
||||
{
|
||||
EXPECTING_NOTHING = 0,
|
||||
EXPECTING_WAIT_GTID_RESULT,
|
||||
EXPECTING_REAL_RESULT
|
||||
} wait_gtid_state_t;
|
||||
|
||||
typedef std::map<uint32_t, uint32_t> ClientHandleMap; /** External ID to internal ID */
|
||||
|
||||
typedef std::tr1::unordered_set<std::string> TableSet;
|
||||
@ -124,7 +117,7 @@ public:
|
||||
ClientHandleMap m_ps_handles; /**< Client PS handle to internal ID mapping */
|
||||
ExecMap m_exec_map; /**< Map of COM_STMT_EXECUTE statement IDs to Backends */
|
||||
std::string m_gtid_pos; /**< Gtid position for causal read */
|
||||
wait_gtid_state_t m_wait_gtid_state; /**< Determine boundary of generated query result */
|
||||
bool m_waiting_for_gtid; /**< Waiting for MASTER_GTID_WAIT reply */
|
||||
uint32_t m_next_seq; /**< Next packet's sequence number */
|
||||
mxs::QueryClassifier m_qc; /**< The query classifier. */
|
||||
uint64_t m_retry_duration; /**< Total time spent retrying queries */
|
||||
|
Reference in New Issue
Block a user