MXS-1778: Add support for MariaDB GTID tracking

The MariaDB implementation allows the last GTID to be tracked with the
`last_gtid` variable. To do this, the configuration option
`session_track_system_variables=last_gtid` must be used or it must be
enabled at runtime.
This commit is contained in:
Markus Mäkelä
2018-05-20 22:35:03 +03:00
parent 6d1c0e5ba6
commit 4ba0ac434b
5 changed files with 41 additions and 16 deletions

View File

@ -314,6 +314,12 @@ typedef enum
static const mxs_mysql_cmd_t MXS_COM_UNDEFINED = (mxs_mysql_cmd_t) - 1; static const mxs_mysql_cmd_t MXS_COM_UNDEFINED = (mxs_mysql_cmd_t) - 1;
/**
* A GWBUF property with this name will contain the latest GTID in string form.
* This information is only available in OK packets.
*/
static const char* const MXS_LAST_GTID = "last_gtid";
/** /**
* List of server commands, and number of response packets are stored here. * List of server commands, and number of response packets are stored here.
* server_command_t is used in MySQLProtocol structure, so for each DCB there is * server_command_t is used in MySQLProtocol structure, so for each DCB there is

View File

@ -1871,13 +1871,13 @@ void mxs_mysql_parse_ok_packet(GWBUF *buff, size_t packet_offset, size_t packet_
mxs_leint_consume(&ptr); // Length of the overall entity. mxs_leint_consume(&ptr); // Length of the overall entity.
mxs_leint_consume(&ptr); // encoding specification mxs_leint_consume(&ptr); // encoding specification
var_value = mxs_lestr_consume_dup(&ptr); var_value = mxs_lestr_consume_dup(&ptr);
gwbuf_add_property(buff, (char *)"gtid", var_value); gwbuf_add_property(buff, MXS_LAST_GTID, var_value);
MXS_FREE(var_value); MXS_FREE(var_value);
break; break;
case SESSION_TRACK_TRANSACTION_CHARACTERISTICS: case SESSION_TRACK_TRANSACTION_CHARACTERISTICS:
mxs_leint_consume(&ptr); //length mxs_leint_consume(&ptr); //length
var_value = mxs_lestr_consume_dup(&ptr); var_value = mxs_lestr_consume_dup(&ptr);
gwbuf_add_property(buff, (char *)"trx_characteristics", var_value); gwbuf_add_property(buff, "trx_characteristics", var_value);
MXS_FREE(var_value); MXS_FREE(var_value);
break; break;
case SESSION_TRACK_SYSTEM_VARIABLES: case SESSION_TRACK_SYSTEM_VARIABLES:

View File

@ -955,7 +955,7 @@ bool RWSplitSession::handle_got_target(GWBUF* querybuf, SRWBackend& target, bool
if (cmd == COM_QUERY && m_router->config().enable_causal_read && !m_gtid_pos .empty()) if (cmd == COM_QUERY && m_router->config().enable_causal_read && !m_gtid_pos .empty())
{ {
send_buf = add_prefix_wait_gtid(target->server(), send_buf); send_buf = add_prefix_wait_gtid(target->server(), send_buf);
m_waiting_for_gtid = true; m_wait_gtid = WAITING_FOR_HEADER;
} }
if (m_qc.load_data_state() != QueryClassifier::LOAD_DATA_ACTIVE && if (m_qc.load_data_state() != QueryClassifier::LOAD_DATA_ACTIVE &&

View File

@ -36,7 +36,7 @@ RWSplitSession::RWSplitSession(RWSplit* instance, MXS_SESSION* session,
m_sent_sescmd(0), m_sent_sescmd(0),
m_recv_sescmd(0), m_recv_sescmd(0),
m_gtid_pos(""), m_gtid_pos(""),
m_waiting_for_gtid(false), m_wait_gtid(NONE),
m_next_seq(0), m_next_seq(0),
m_qc(this, session, instance->config().use_sql_variables_in), m_qc(this, session, instance->config().use_sql_variables_in),
m_retry_duration(0), m_retry_duration(0),
@ -245,19 +245,24 @@ bool RWSplitSession::route_stored_query()
*/ */
GWBUF* RWSplitSession::discard_master_wait_gtid_result(GWBUF *buffer) GWBUF* RWSplitSession::discard_master_wait_gtid_result(GWBUF *buffer)
{ {
// MASTER_WAIT_GTID is complete, discard the OK packet or return the ERR packet
m_waiting_for_gtid = false;
uint8_t header_and_command[MYSQL_HEADER_LEN + 1]; uint8_t header_and_command[MYSQL_HEADER_LEN + 1];
gwbuf_copy_data(buffer, 0, MYSQL_HEADER_LEN + 1, header_and_command); gwbuf_copy_data(buffer, 0, MYSQL_HEADER_LEN + 1, header_and_command);
if (MYSQL_GET_COMMAND(header_and_command) == MYSQL_REPLY_OK) if (MYSQL_GET_COMMAND(header_and_command) == MYSQL_REPLY_OK)
{ {
// MASTER_WAIT_GTID is complete, discard the OK packet or return the ERR packet
m_wait_gtid = UPDATING_PACKETS;
// Discard the OK packet and start updating sequence numbers // Discard the OK packet and start updating sequence numbers
uint8_t packet_len = MYSQL_GET_PAYLOAD_LEN(header_and_command) + MYSQL_HEADER_LEN; uint8_t packet_len = MYSQL_GET_PAYLOAD_LEN(header_and_command) + MYSQL_HEADER_LEN;
m_next_seq = 1; m_next_seq = 1;
buffer = gwbuf_consume(buffer, packet_len); buffer = gwbuf_consume(buffer, packet_len);
} }
else if (MYSQL_GET_COMMAND(header_and_command) == MYSQL_REPLY_ERR)
{
// The MASTER_WAIT_GTID command failed and no further packets will come
m_wait_gtid = NONE;
}
return buffer; return buffer;
} }
@ -356,19 +361,19 @@ GWBUF* RWSplitSession::handle_causal_read_reply(GWBUF *writebuf, SRWBackend& bac
if (GWBUF_IS_REPLY_OK(writebuf) && backend == m_current_master) if (GWBUF_IS_REPLY_OK(writebuf) && backend == m_current_master)
{ {
/** Save gtid position */ /** Save gtid position */
char *tmp = gwbuf_get_property(writebuf, (char *)"gtid"); char *tmp = gwbuf_get_property(writebuf, MXS_LAST_GTID);
if (tmp) if (tmp)
{ {
m_gtid_pos = std::string(tmp); m_gtid_pos = std::string(tmp);
} }
} }
if (m_waiting_for_gtid) if (m_wait_gtid == WAITING_FOR_HEADER)
{ {
writebuf = discard_master_wait_gtid_result(writebuf); writebuf = discard_master_wait_gtid_result(writebuf);
} }
if (writebuf) if (m_wait_gtid == UPDATING_PACKETS && writebuf)
{ {
correct_packet_sequence(writebuf); correct_packet_sequence(writebuf);
} }
@ -420,11 +425,6 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb)
SRWBackend& backend = get_backend_from_dcb(backend_dcb); SRWBackend& backend = get_backend_from_dcb(backend_dcb);
if ((writebuf = handle_causal_read_reply(writebuf, backend)) == NULL)
{
return; // Nothing to route, return
}
if (backend->get_reply_state() == REPLY_STATE_DONE) if (backend->get_reply_state() == REPLY_STATE_DONE)
{ {
/** If we receive an unexpected response from the server, the internal /** If we receive an unexpected response from the server, the internal
@ -435,6 +435,11 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb)
return; return;
} }
if ((writebuf = handle_causal_read_reply(writebuf, backend)) == NULL)
{
return; // Nothing to route, return
}
if (m_config.transaction_replay && m_can_replay_trx && if (m_config.transaction_replay && m_can_replay_trx &&
session_trx_is_active(m_client->session)) session_trx_is_active(m_client->session))
{ {
@ -469,6 +474,13 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb)
ss_dassert(backend->get_reply_state() == REPLY_STATE_DONE); ss_dassert(backend->get_reply_state() == REPLY_STATE_DONE);
MXS_INFO("Reply complete, last reply from %s", backend->name()); MXS_INFO("Reply complete, last reply from %s", backend->name());
if (m_config.enable_causal_read)
{
// The reply should never be complete while we are still waiting for the header.
ss_dassert(m_wait_gtid != WAITING_FOR_HEADER);
m_wait_gtid = NONE;
}
if (backend->local_infile_requested()) if (backend->local_infile_requested())
{ {
// Server requested a local file, go into data streaming mode // Server requested a local file, go into data streaming mode

View File

@ -59,6 +59,13 @@ public:
TARGET_RLAG_MAX = maxscale::QueryClassifier::TARGET_RLAG_MAX, TARGET_RLAG_MAX = maxscale::QueryClassifier::TARGET_RLAG_MAX,
}; };
enum wait_gtid_state
{
NONE,
WAITING_FOR_HEADER,
UPDATING_PACKETS
};
virtual ~RWSplitSession() virtual ~RWSplitSession()
{ {
} }
@ -132,7 +139,7 @@ public:
ClientHandleMap m_ps_handles; /**< Client PS handle to internal ID mapping */ ClientHandleMap m_ps_handles; /**< Client PS handle to internal ID mapping */
ExecMap m_exec_map; /**< Map of COM_STMT_EXECUTE statement IDs to Backends */ ExecMap m_exec_map; /**< Map of COM_STMT_EXECUTE statement IDs to Backends */
std::string m_gtid_pos; /**< Gtid position for causal read */ std::string m_gtid_pos; /**< Gtid position for causal read */
bool m_waiting_for_gtid; /**< Waiting for MASTER_GTID_WAIT reply */ wait_gtid_state m_wait_gtid; /**< State of MASTER_GTID_WAIT reply */
uint32_t m_next_seq; /**< Next packet's sequence number */ uint32_t m_next_seq; /**< Next packet's sequence number */
mxs::QueryClassifier m_qc; /**< The query classifier. */ mxs::QueryClassifier m_qc; /**< The query classifier. */
uint64_t m_retry_duration; /**< Total time spent retrying queries */ uint64_t m_retry_duration; /**< Total time spent retrying queries */