MXS-199: Support Causal Read in Read Write Splitting (#164)

* MXS-199: Support Causal Read in Read Write Splitting

* move most causal read logic into rwsplit router and get server type from monitor

* misc fix: remove new line

* refactor, move config to right place, replace ltrim with gwbuf_consume

* refactor a little for previous commit

* fix code style
This commit is contained in:
dapeng
2018-02-05 15:09:18 +08:00
committed by Johan Wikman
parent 0696f3f60a
commit 8a0c8e63f2
9 changed files with 222 additions and 3 deletions

View File

@ -423,4 +423,16 @@ extern void dprintAllBuffers(void *pdcb);
*/ */
void gwbuf_hexdump(GWBUF* buffer); void gwbuf_hexdump(GWBUF* buffer);
/**
 * Return a pointer to the byte located at @c offset from the start of a
 * chain of buffers.
 *
 * Warning: the chain is not guaranteed to be one contiguous segment of
 * memory, so only the single byte the returned pointer refers to may be
 * safely read or modified through it.
 *
 * @param buffer One or more chained buffers
 * @param offset Offset, in bytes, from the start of the chain
 * @return Pointer to the byte at @c offset if the total length of the chain
 *         is greater than @c offset, otherwise NULL
 */
extern uint8_t *gwbuf_byte_pointer(GWBUF* buffer, size_t offset);
MXS_END_DECLS MXS_END_DECLS

View File

@ -75,6 +75,12 @@ typedef struct server_version
uint32_t patch; uint32_t patch;
} SERVER_VERSION; } SERVER_VERSION;
/**
 * Flavour of a backend server.
 */
typedef enum
{
    SERVER_TYPE_MARIADB = 0, /**< Server identifies itself as MariaDB */
    SERVER_TYPE_MYSQL   = 1  /**< Any other server, treated as MySQL */
} server_type_t;
static inline void server_decode_version(uint64_t version, SERVER_VERSION* server_version) static inline void server_decode_version(uint64_t version, SERVER_VERSION* server_version)
{ {
uint32_t major = version / 10000; uint32_t major = version / 10000;
@ -120,6 +126,7 @@ typedef struct server
struct server *nextdb; /**< Next server in list attached to a service */ struct server *nextdb; /**< Next server in list attached to a service */
char version_string[MAX_SERVER_VERSION_LEN]; /**< Server version string, i.e. MySQL server version */ char version_string[MAX_SERVER_VERSION_LEN]; /**< Server version string, i.e. MySQL server version */
uint64_t version; /**< Server version */ uint64_t version; /**< Server version */
server_type_t server_type; /**< Server type */
long node_id; /**< Node id, server_id for M/S or local_index for Galera */ long node_id; /**< Node id, server_id for M/S or local_index for Galera */
int rlag; /**< Replication Lag for Master / Slave replication */ int rlag; /**< Replication Lag for Master / Slave replication */
unsigned long node_ts; /**< Last timestamp set from M/S monitor module */ unsigned long node_ts; /**< Last timestamp set from M/S monitor module */

View File

@ -883,6 +883,24 @@ size_t gwbuf_copy_data(const GWBUF *buffer, size_t offset, size_t bytes, uint8_t
return bytes_read; return bytes_read;
} }
/**
 * Find the byte at @c offset in a chain of buffers.
 *
 * @param buffer Head of the chain, may be NULL
 * @param offset Offset, in bytes, from the start of the chain
 * @return Pointer to the requested byte, or NULL if the chain holds
 *         @c offset bytes or fewer
 */
uint8_t *gwbuf_byte_pointer(GWBUF *buffer, size_t offset)
{
    for (; buffer != NULL; buffer = buffer->next)
    {
        size_t link_len = GWBUF_LENGTH(buffer);

        if (offset < link_len)
        {
            // The requested byte lives in this link of the chain.
            return GWBUF_DATA(buffer) + offset;
        }

        // Empty or too short link: make the offset relative to the next one.
        offset -= link_len;
    }

    // NULL chain, or the chain ended before the offset was reached.
    return NULL;
}
static std::string dump_one_buffer(GWBUF* buffer) static std::string dump_one_buffer(GWBUF* buffer)
{ {
std::string rval; std::string rval;

View File

@ -366,5 +366,14 @@ void mxs_mysql_set_server_version(MYSQL* mysql, SERVER* server)
unsigned long version = mysql_get_server_version(mysql); unsigned long version = mysql_get_server_version(mysql);
server_set_version(server, version_string, version); server_set_version(server, version_string, version);
if (strcasestr(version_string, "mariadb") != NULL)
{
server->server_type = SERVER_TYPE_MARIADB;
}
else
{
server->server_type = SERVER_TYPE_MYSQL;
}
} }
} }

View File

@ -1232,6 +1232,9 @@ create_capabilities(MySQLProtocol *conn, bool with_ssl, bool db_specified, bool
/** add session track */ /** add session track */
final_capabilities |= (uint32_t)GW_MYSQL_CAPABILITIES_SESSION_TRACK; final_capabilities |= (uint32_t)GW_MYSQL_CAPABILITIES_SESSION_TRACK;
/** support multi statments */
final_capabilities |= (uint32_t)GW_MYSQL_CAPABILITIES_MULTI_STATEMENTS;
/* Compression is not currently supported */ /* Compression is not currently supported */
ss_dassert(!compress); ss_dassert(!compress);
if (compress) if (compress)
@ -1809,7 +1812,11 @@ void mxs_mysql_parse_ok_packet(GWBUF *buff, size_t packet_offset, size_t packet_
case SESSION_TRACK_STATE_CHANGE: case SESSION_TRACK_STATE_CHANGE:
case SESSION_TRACK_SCHEMA: case SESSION_TRACK_SCHEMA:
case SESSION_TRACK_GTIDS: case SESSION_TRACK_GTIDS:
mxs_lestr_consume(&ptr, &size); mxs_leint_consume(&ptr); // Length of the overall entity.
mxs_leint_consume(&ptr); // encoding specification
var_value = mxs_lestr_consume_dup(&ptr);
gwbuf_add_property(buff, (char *)"gtid", var_value);
MXS_FREE(var_value);
break; break;
case SESSION_TRACK_TRANSACTION_CHARACTERISTICS: case SESSION_TRACK_TRANSACTION_CHARACTERISTICS:
mxs_leint_consume(&ptr); //length mxs_leint_consume(&ptr); //length

View File

@ -744,6 +744,7 @@ RWSplitSession::RWSplitSession(RWSplit* instance, MXS_SESSION* session,
router(instance), router(instance),
sent_sescmd(0), sent_sescmd(0),
recv_sescmd(0), recv_sescmd(0),
gtid_pos(""),
rses_chk_tail(CHK_NUM_ROUTER_SES) rses_chk_tail(CHK_NUM_ROUTER_SES)
{ {
if (rses_config.rw_max_slave_conn_percent) if (rses_config.rw_max_slave_conn_percent)
@ -1142,6 +1143,62 @@ static void log_unexpected_response(DCB* dcb, GWBUF* buffer)
} }
} }
/**
 * @brief Discard the result of the wait-GTID statement
 *
 * The reply to the wait statement is either an OK packet or an error packet.
 * An error packet is left in the buffer untouched and the wait state is reset
 * to EXPECTING_NOTHING. An OK packet is removed from the front of the buffer,
 * the wait state advances to EXPECTING_REAL_RESULT and the sequence counter
 * restarts from 1.
 *
 * @param buffer Original reply buffer
 * @param rses   Router client session whose wait state is updated
 * @return The original buffer if it starts with an error packet, otherwise
 *         the value returned by gwbuf_consume() after removing the OK packet
 */
GWBUF *discard_master_wait_gtid_result(GWBUF *buffer, RWSplitSession *rses)
{
    uint8_t header_and_command[MYSQL_HEADER_LEN + 1];
    gwbuf_copy_data(buffer, 0, MYSQL_HEADER_LEN + 1, header_and_command);

    /* An error packet is not discarded, only the wait state is reset */
    if (MYSQL_GET_COMMAND(header_and_command) == MYSQL_REPLY_ERR)
    {
        rses->wait_gtid_state = EXPECTING_NOTHING;
        return buffer;
    }

    /* This packet must be an OK packet now */
    ss_dassert(MYSQL_GET_COMMAND(header_and_command) == MYSQL_REPLY_OK);

    /* The payload length is a 3-byte value: it must not be stored in a single
     * byte or packets longer than 255 bytes are only partially consumed. */
    uint32_t packet_len = MYSQL_GET_PAYLOAD_LEN(header_and_command) + MYSQL_HEADER_LEN;

    rses->wait_gtid_state = EXPECTING_REAL_RESULT;
    rses->next_seq = 1;

    return gwbuf_consume(buffer, packet_len);
}
/**
 * @brief Renumber the packets of the real result
 *
 * Once the result of the wait statement has been discarded, the sequence
 * number of every following packet has to be corrected. Does nothing unless
 * the session is in the EXPECTING_REAL_RESULT state.
 *
 * NOTE(review): the scan starts at offset 0 on every call, so this appears to
 * assume that @c buffer begins on a packet boundary - confirm against callers.
 *
 * @param buffer Reply buffer, may be NULL
 * @param rses   Router client session holding the next sequence number
 */
void correct_packet_sequence(GWBUF *buffer, RWSplitSession *rses)
{
    if (buffer == NULL || rses->wait_gtid_state != EXPECTING_REAL_RESULT)
    {
        return;
    }

    uint8_t header[3]; /* the 3-byte payload length at the start of a packet header */
    uint32_t offset = 0;

    while (gwbuf_copy_data(buffer, offset, 3, header) == 3)
    {
        uint32_t packet_len = MYSQL_GET_PAYLOAD_LEN(header) + MYSQL_HEADER_LEN;
        uint8_t *seq = gwbuf_byte_pointer(buffer, offset + MYSQL_SEQ_OFFSET);

        if (seq == NULL)
        {
            /* The length bytes are present but the sequence byte is not:
             * there is nothing to rewrite, and dereferencing would crash. */
            break;
        }

        /* The sequence number is a single byte on the wire */
        *seq = static_cast<uint8_t>(rses->next_seq);
        rses->next_seq++;
        offset += packet_len;
    }
}
/** /**
* @brief Client Reply routine * @brief Client Reply routine
* *
@ -1156,12 +1213,34 @@ static void clientReply(MXS_ROUTER *instance,
DCB *backend_dcb) DCB *backend_dcb)
{ {
RWSplitSession *rses = (RWSplitSession *)router_session; RWSplitSession *rses = (RWSplitSession *)router_session;
RWSplit *inst = (RWSplit *)instance;
DCB *client_dcb = backend_dcb->session->client_dcb; DCB *client_dcb = backend_dcb->session->client_dcb;
CHK_CLIENT_RSES(rses); CHK_CLIENT_RSES(rses);
ss_dassert(!rses->rses_closed); ss_dassert(!rses->rses_closed);
SRWBackend& backend = get_backend_from_dcb(rses, backend_dcb); SRWBackend& backend = get_backend_from_dcb(rses, backend_dcb);
if (inst->config().enable_causal_read &&
GWBUF_IS_REPLY_OK(writebuf) &&
backend == rses->current_master)
{
/** Save gtid position */
char *tmp = gwbuf_get_property(writebuf, (char *)"gtid");
if (tmp)
{
rses->gtid_pos = std::string(tmp);
}
}
if (rses->wait_gtid_state == EXPECTING_WAIT_GTID_RESULT)
{
writebuf = discard_master_wait_gtid_result(writebuf, rses);
}
if (rses->wait_gtid_state == EXPECTING_REAL_RESULT)
{
correct_packet_sequence(writebuf, rses);
}
if (backend->get_reply_state() == REPLY_STATE_DONE) if (backend->get_reply_state() == REPLY_STATE_DONE)
{ {
/** If we receive an unexpected response from the server, the internal /** If we receive an unexpected response from the server, the internal
@ -1437,6 +1516,8 @@ MXS_MODULE *MXS_CREATE_MODULE()
{"strict_sp_calls", MXS_MODULE_PARAM_BOOL, "false"}, {"strict_sp_calls", MXS_MODULE_PARAM_BOOL, "false"},
{"master_accept_reads", MXS_MODULE_PARAM_BOOL, "false"}, {"master_accept_reads", MXS_MODULE_PARAM_BOOL, "false"},
{"connection_keepalive", MXS_MODULE_PARAM_COUNT, "0"}, {"connection_keepalive", MXS_MODULE_PARAM_COUNT, "0"},
{"enable_causal_read", MXS_MODULE_PARAM_BOOL, "false"},
{"causal_read_timeout", MXS_MODULE_PARAM_STRING, "0"},
{MXS_END_MODULE_PARAMS} {MXS_END_MODULE_PARAMS}
} }
}; };

View File

@ -145,6 +145,12 @@ enum ld_state
#define BACKEND_TYPE(b) (SERVER_IS_MASTER((b)->backend_server) ? BE_MASTER : \ #define BACKEND_TYPE(b) (SERVER_IS_MASTER((b)->backend_server) ? BE_MASTER : \
(SERVER_IS_SLAVE((b)->backend_server) ? BE_SLAVE : BE_UNDEFINED)); (SERVER_IS_SLAVE((b)->backend_server) ? BE_SLAVE : BE_UNDEFINED));
/** Name of the GTID wait function used when the server type is MariaDB */
#define MARIADB_WAIT_GTID_FUNC "MASTER_GTID_WAIT"
/** Name of the GTID wait function used for any other server type */
#define MYSQL_WAIT_GTID_FUNC "WAIT_FOR_EXECUTED_GTID_SET"
/**
 * printf-style template of the statement that is placed in front of a client
 * query for causal reads. The three %s placeholders are, in order: the name
 * of the wait function, the GTID position (placed inside single quotes) and
 * the timeout (inserted as-is, without quotes).
 */
static const char gtid_wait_stmt[] =
"SET @maxscale_secret_variable=(SELECT CASE WHEN %s('%s', %s) = 0 "
"THEN 1 ELSE (SELECT 1 FROM INFORMATION_SCHEMA.ENGINES) END);";
struct Config struct Config
{ {
Config(MXS_CONFIG_PARAMETER* params): Config(MXS_CONFIG_PARAMETER* params):
@ -166,8 +172,14 @@ struct Config
connection_keepalive(config_get_integer(params, "connection_keepalive")), connection_keepalive(config_get_integer(params, "connection_keepalive")),
max_slave_replication_lag(config_get_integer(params, "max_slave_replication_lag")), max_slave_replication_lag(config_get_integer(params, "max_slave_replication_lag")),
rw_max_slave_conn_percent(0), rw_max_slave_conn_percent(0),
max_slave_connections(0) max_slave_connections(0),
enable_causal_read(config_get_bool(params, "enable_causal_read")),
causal_read_timeout(config_get_string(params, "causal_read_timeout"))
{ {
if (enable_causal_read)
{
retry_failed_reads = true;
}
} }
select_criteria_t slave_selection_criteria; /**< The slave selection criteria */ select_criteria_t slave_selection_criteria; /**< The slave selection criteria */
@ -187,6 +199,8 @@ struct Config
int rw_max_slave_conn_percent; /**< Maximum percentage of slaves to use for int rw_max_slave_conn_percent; /**< Maximum percentage of slaves to use for
* each connection*/ * each connection*/
int max_slave_connections; /**< Maximum number of slaves for each connection*/ int max_slave_connections; /**< Maximum number of slaves for each connection*/
bool enable_causal_read; /**< Enable causual read */
std::string causal_read_timeout; /**< Timetout, second parameter of function master_wait_gtid */
}; };

View File

@ -23,6 +23,7 @@
#include <maxscale/hk_heartbeat.h> #include <maxscale/hk_heartbeat.h>
#include <maxscale/modutil.h> #include <maxscale/modutil.h>
#include <maxscale/router.h> #include <maxscale/router.h>
#include <maxscale/server.h>
/** /**
* The functions that support the routing of queries to back end * The functions that support the routing of queries to back end
@ -1107,6 +1108,58 @@ static inline bool is_large_query(GWBUF* buf)
return buflen == MYSQL_HEADER_LEN + GW_MYSQL_MAX_PACKET_LEN; return buflen == MYSQL_HEADER_LEN + GW_MYSQL_MAX_PACKET_LEN;
} }
/**
 * Add a wait-GTID query in front of the user's query to achieve causal reads
 *
 * Packing the wait function and the client query into one multi-statement
 * saves a round trip and prevents the client query from being executed when
 * the wait does not succeed. For example:
 *
 *   SET @maxscale_secret_variable=(SELECT CASE WHEN MASTER_GTID_WAIT('232-1-1', 10) = 0
 *   THEN 1 ELSE (SELECT 1 FROM INFORMATION_SCHEMA.ENGINES) END); SELECT * FROM `city`;
 *
 * When the wait function returns anything other than 0 (e.g. on timeout), the
 * ELSE branch is taken and the SET statement returns an error, so
 * SELECT * FROM `city` is not executed and the query can be retried on the
 * master.
 *
 * @param inst   RWSplit instance, provides the configured wait timeout
 * @param rses   RWSplitSession, provides the GTID position to wait for
 * @param server SERVER the query is sent to, selects the wait function
 * @param origin Original send buffer; it is consumed by this function
 * @return A new buffer that contains the wait statement followed by the
 *         original query
 */
GWBUF *add_prefix_wait_gtid(RWSplit *inst, RWSplitSession *rses, SERVER *server, GWBUF *origin)
{
    const char *wait_func = (server->server_type == SERVER_TYPE_MARIADB) ?
        MARIADB_WAIT_GTID_FUNC : MYSQL_WAIT_GTID_FUNC;
    const char *gtid_wait_timeout = inst->config().causal_read_timeout.c_str();
    const char *gtid_pos = rses->gtid_pos.c_str();

    /* Upper bound of the formatted statement including the terminating NUL:
     * each "%s" in the template is replaced by its argument, so the result is
     * always shorter than the sum of the lengths. Heap-backed storage is used
     * instead of a variable-length array, which is not standard C++. */
    size_t prefix_len = strlen(gtid_wait_stmt) + strlen(gtid_pos) +
        strlen(gtid_wait_timeout) + strlen(wait_func);
    std::string prefix_sql(prefix_len, '\0');
    snprintf(&prefix_sql[0], prefix_len, gtid_wait_stmt, wait_func, gtid_pos, gtid_wait_timeout);
    size_t prefix_sql_len = strlen(prefix_sql.c_str());

    /* Create a new buffer to store the prefix SQL */
    GWBUF *prefix_buff = modutil_create_query(prefix_sql.c_str());

    /* Read the payload length of the original query; command length = 1 */
    uint8_t header[MYSQL_HEADER_LEN];
    gwbuf_copy_data(origin, 0, MYSQL_HEADER_LEN, header);
    size_t origin_sql_len = MYSQL_GET_PAYLOAD_LEN(header) - 1;

    /* Trim the MySQL header and the command byte, leaving only the SQL, and
     * append what remains to the prefix buffer */
    origin = gwbuf_consume(origin, MYSQL_HEADER_LEN + 1);
    GWBUF *rval = gwbuf_append(prefix_buff, origin);

    /* Modify the total length: prefix SQL len + original SQL len + command len */
    size_t new_payload_len = prefix_sql_len + origin_sql_len + 1;
    gw_mysql_set_byte3(GWBUF_DATA(rval), new_payload_len);

    return rval;
}
/** /**
* @brief Handle writing to a target server * @brief Handle writing to a target server
* *
@ -1134,7 +1187,14 @@ bool handle_got_target(RWSplit *inst, RWSplitSession *rses,
ss_dassert(target->session_command_count() == 0); ss_dassert(target->session_command_count() == 0);
mxs::Backend::response_type response = mxs::Backend::NO_RESPONSE; mxs::Backend::response_type response = mxs::Backend::NO_RESPONSE;
rses->wait_gtid_state = EXPECTING_NOTHING;
uint8_t cmd = mxs_mysql_get_command(querybuf); uint8_t cmd = mxs_mysql_get_command(querybuf);
GWBUF *send_buf = gwbuf_clone(querybuf); ;
if (cmd == COM_QUERY && inst->config().enable_causal_read && rses->gtid_pos != "")
{
send_buf = add_prefix_wait_gtid(inst, rses, target->server(), send_buf);
rses->wait_gtid_state = EXPECTING_WAIT_GTID_RESULT;
}
if (rses->load_data_state != LOAD_DATA_ACTIVE && if (rses->load_data_state != LOAD_DATA_ACTIVE &&
query_creates_reply(cmd)) query_creates_reply(cmd))
@ -1144,7 +1204,7 @@ bool handle_got_target(RWSplit *inst, RWSplitSession *rses,
bool large_query = is_large_query(querybuf); bool large_query = is_large_query(querybuf);
if (target->write(gwbuf_clone(querybuf), response)) if (target->write(send_buf, response))
{ {
if (store && !session_store_stmt(rses->client_dcb->session, querybuf, target->server())) if (store && !session_store_stmt(rses->client_dcb->session, querybuf, target->server()))
{ {

View File

@ -14,6 +14,7 @@
#include "readwritesplit.hh" #include "readwritesplit.hh"
#include "rwsplit_ps.hh" #include "rwsplit_ps.hh"
#include <string>
#include <maxscale/modutil.h> #include <maxscale/modutil.h>
@ -27,6 +28,13 @@ enum reply_state_t
REPLY_STATE_RSET_ROWS /**< Resultset response, waiting for rows */ REPLY_STATE_RSET_ROWS /**< Resultset response, waiting for rows */
}; };
/** Tracks which reply is expected next when a wait-GTID statement was sent */
enum wait_gtid_state_t
{
    EXPECTING_NOTHING          = 0, /**< No wait-GTID result is pending */
    EXPECTING_WAIT_GTID_RESULT = 1, /**< Waiting for the result of the wait-GTID statement */
    EXPECTING_REAL_RESULT      = 2  /**< Wait result handled, the result of the client query follows */
};
/** Reply state change debug logging */ /** Reply state change debug logging */
#define LOG_RS(a, b) MXS_DEBUG("%s %s -> %s", (a)->uri(), \ #define LOG_RS(a, b) MXS_DEBUG("%s %s -> %s", (a)->uri(), \
rstostr((a)->get_reply_state()), rstostr(b)); rstostr((a)->get_reply_state()), rstostr(b));
@ -138,6 +146,9 @@ public:
PSManager ps_manager; /**< Prepared statement manager*/ PSManager ps_manager; /**< Prepared statement manager*/
ClientHandleMap ps_handles; /**< Client PS handle to internal ID mapping */ ClientHandleMap ps_handles; /**< Client PS handle to internal ID mapping */
ExecMap exec_map; /**< Map of COM_STMT_EXECUTE statement IDs to Backends */ ExecMap exec_map; /**< Map of COM_STMT_EXECUTE statement IDs to Backends */
std::string gtid_pos; /**< Gtid position for causal read */
wait_gtid_state_t wait_gtid_state; /**< Determine boundray of wait gtid result and client query result */
uint32_t next_seq; /**< Next packet'ssequence number */
skygw_chk_t rses_chk_tail; skygw_chk_t rses_chk_tail;
private: private: