diff --git a/include/maxscale/buffer.h b/include/maxscale/buffer.h index 7a2e754b3..9b74aeda3 100644 --- a/include/maxscale/buffer.h +++ b/include/maxscale/buffer.h @@ -423,4 +423,16 @@ extern void dprintAllBuffers(void *pdcb); */ void gwbuf_hexdump(GWBUF* buffer); +/** + * Return pointer of the byte at offset from start of chained buffer + * Warning: It not guaranteed to point to a contiguous segment of memory, + * it is only safe to modify the first byte this pointer point to. + * + * @param buffer one or more chained buffer + * @param offset Offset into the buffer + * @return if total buffer length is bigger than offset then return + * the offset byte pointer, otherwise return null + */ +extern uint8_t *gwbuf_byte_pointer(GWBUF* buffer, size_t offset); + MXS_END_DECLS diff --git a/include/maxscale/server.h b/include/maxscale/server.h index e1c82b42c..89a0e2cfc 100644 --- a/include/maxscale/server.h +++ b/include/maxscale/server.h @@ -75,6 +75,12 @@ typedef struct server_version uint32_t patch; } SERVER_VERSION; +typedef enum +{ + SERVER_TYPE_MARIADB, + SERVER_TYPE_MYSQL +} server_type_t; + static inline void server_decode_version(uint64_t version, SERVER_VERSION* server_version) { uint32_t major = version / 10000; @@ -120,6 +126,7 @@ typedef struct server struct server *nextdb; /**< Next server in list attached to a service */ char version_string[MAX_SERVER_VERSION_LEN]; /**< Server version string, i.e. MySQL server version */ uint64_t version; /**< Server version */ + server_type_t server_type; /**< Server type */ long node_id; /**< Node id, server_id for M/S or local_index for Galera */ int rlag; /**< Replication Lag for Master / Slave replication */ unsigned long node_ts; /**< Last timestamp set from M/S monitor module */ diff --git a/server/core/buffer.cc b/server/core/buffer.cc index 7e7cc1967..462e97746 100644 --- a/server/core/buffer.cc +++ b/server/core/buffer.cc @@ -883,6 +883,24 @@ size_t gwbuf_copy_data(const GWBUF *buffer, size_t offset, size_t bytes, uint8_t return bytes_read; } +uint8_t *gwbuf_byte_pointer(GWBUF *buffer, size_t offset) +{ + uint8_t *rval = NULL; + // Ignore NULL buffer and walk past empty or too short buffers. + while (buffer && (GWBUF_LENGTH(buffer) <= offset)) + { + offset -= GWBUF_LENGTH(buffer); + buffer = buffer->next; + } + + if (buffer != NULL) + { + rval = (GWBUF_DATA(buffer) + offset); + } + + return rval; +} + static std::string dump_one_buffer(GWBUF* buffer) { std::string rval; diff --git a/server/core/mysql_utils.cc b/server/core/mysql_utils.cc index e49ef39b5..512cd66b9 100644 --- a/server/core/mysql_utils.cc +++ b/server/core/mysql_utils.cc @@ -366,5 +366,14 @@ void mxs_mysql_set_server_version(MYSQL* mysql, SERVER* server) unsigned long version = mysql_get_server_version(mysql); server_set_version(server, version_string, version); + + if (strcasestr(version_string, "mariadb") != NULL) + { + server->server_type = SERVER_TYPE_MARIADB; + } + else + { + server->server_type = SERVER_TYPE_MYSQL; + } } } diff --git a/server/modules/protocol/MySQL/mysql_common.cc b/server/modules/protocol/MySQL/mysql_common.cc index 33a567c6c..ddc6d0735 100644 --- a/server/modules/protocol/MySQL/mysql_common.cc +++ b/server/modules/protocol/MySQL/mysql_common.cc @@ -1232,6 +1232,9 @@ create_capabilities(MySQLProtocol *conn, bool with_ssl, bool db_specified, bool /** add session track */ final_capabilities |= (uint32_t)GW_MYSQL_CAPABILITIES_SESSION_TRACK; + /** support multi statments */ + final_capabilities |= (uint32_t)GW_MYSQL_CAPABILITIES_MULTI_STATEMENTS; + /* Compression is not currently supported */ ss_dassert(!compress); if (compress) @@ -1809,7 +1812,11 @@ void mxs_mysql_parse_ok_packet(GWBUF *buff, size_t packet_offset, size_t packet_ case SESSION_TRACK_STATE_CHANGE: case SESSION_TRACK_SCHEMA: case SESSION_TRACK_GTIDS: - mxs_lestr_consume(&ptr, &size); + mxs_leint_consume(&ptr); // Length of the overall entity. + mxs_leint_consume(&ptr); // encoding specification + var_value = mxs_lestr_consume_dup(&ptr); + gwbuf_add_property(buff, (char *)"gtid", var_value); + MXS_FREE(var_value); break; case SESSION_TRACK_TRANSACTION_CHARACTERISTICS: mxs_leint_consume(&ptr); //length diff --git a/server/modules/routing/readwritesplit/readwritesplit.cc b/server/modules/routing/readwritesplit/readwritesplit.cc index 9ba761a2c..40f2fd140 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.cc +++ b/server/modules/routing/readwritesplit/readwritesplit.cc @@ -744,6 +744,7 @@ RWSplitSession::RWSplitSession(RWSplit* instance, MXS_SESSION* session, router(instance), sent_sescmd(0), recv_sescmd(0), + gtid_pos(""), rses_chk_tail(CHK_NUM_ROUTER_SES) { if (rses_config.rw_max_slave_conn_percent) @@ -1142,6 +1143,62 @@ static void log_unexpected_response(DCB* dcb, GWBUF* buffer) } } +/** + * @bref discard the result of wait gtid statment, the result will be an error + * packet or an error packet. + * @param buffer origin reply buffer + * @param proto MySQLProtocol + * @return reset buffer + */ +GWBUF *discard_master_wait_gtid_result(GWBUF *buffer, RWSplitSession *rses) +{ + uint8_t header_and_command[MYSQL_HEADER_LEN + 1]; + uint8_t packet_len = 0; + uint8_t offset = 0; + mxs_mysql_cmd_t com; + + gwbuf_copy_data(buffer, 0, MYSQL_HEADER_LEN + 1, header_and_command); + /* ignore error packet */ + if (MYSQL_GET_COMMAND(header_and_command) == MYSQL_REPLY_ERR) + { + rses->wait_gtid_state = EXPECTING_NOTHING; + return buffer; + } + + /* this packet must be an ok packet now */ + ss_dassert(MYSQL_GET_COMMAND(header_and_command) == MYSQL_REPLY_OK); + packet_len = MYSQL_GET_PAYLOAD_LEN(header_and_command) + MYSQL_HEADER_LEN; + rses->wait_gtid_state = EXPECTING_REAL_RESULT; + rses->next_seq = 1; + + return gwbuf_consume(buffer, packet_len); +} + +/** + * @bref After discarded the wait result, we need correct the seqence number of every packet + * + * @param buffer origin reply buffer + * @param proto MySQLProtocol + * + */ +void correct_packet_sequence(GWBUF *buffer, RWSplitSession *rses) +{ + uint8_t header[3]; + uint32_t offset = 0; + uint32_t packet_len = 0; + if (rses->wait_gtid_state == EXPECTING_REAL_RESULT) + { + while (gwbuf_copy_data(buffer, offset, 3, header) == 3) + { + packet_len = MYSQL_GET_PAYLOAD_LEN(header) + MYSQL_HEADER_LEN; + uint8_t *seq = gwbuf_byte_pointer(buffer, offset + MYSQL_SEQ_OFFSET); + *seq = rses->next_seq; + rses->next_seq++; + offset += packet_len; + } + } +} + /** * @brief Client Reply routine * @@ -1156,12 +1213,34 @@ static void clientReply(MXS_ROUTER *instance, DCB *backend_dcb) { RWSplitSession *rses = (RWSplitSession *)router_session; + RWSplit *inst = (RWSplit *)instance; DCB *client_dcb = backend_dcb->session->client_dcb; CHK_CLIENT_RSES(rses); ss_dassert(!rses->rses_closed); SRWBackend& backend = get_backend_from_dcb(rses, backend_dcb); + if (inst->config().enable_causal_read && + GWBUF_IS_REPLY_OK(writebuf) && + backend == rses->current_master) + { + /** Save gtid position */ + char *tmp = gwbuf_get_property(writebuf, (char *)"gtid"); + if (tmp) + { + rses->gtid_pos = std::string(tmp); + } + } + + if (rses->wait_gtid_state == EXPECTING_WAIT_GTID_RESULT) + { + writebuf = discard_master_wait_gtid_result(writebuf, rses); + } + if (rses->wait_gtid_state == EXPECTING_REAL_RESULT) + { + correct_packet_sequence(writebuf, rses); + } + if (backend->get_reply_state() == REPLY_STATE_DONE) { /** If we receive an unexpected response from the server, the internal @@ -1437,6 +1516,8 @@ MXS_MODULE *MXS_CREATE_MODULE() {"strict_sp_calls", MXS_MODULE_PARAM_BOOL, "false"}, {"master_accept_reads", MXS_MODULE_PARAM_BOOL, "false"}, {"connection_keepalive", MXS_MODULE_PARAM_COUNT, "0"}, + {"enable_causal_read", MXS_MODULE_PARAM_BOOL, "false"}, + {"causal_read_timeout", MXS_MODULE_PARAM_STRING, "0"}, {MXS_END_MODULE_PARAMS} } }; diff --git a/server/modules/routing/readwritesplit/readwritesplit.hh b/server/modules/routing/readwritesplit/readwritesplit.hh index d8008fbde..583c61c3e 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.hh +++ b/server/modules/routing/readwritesplit/readwritesplit.hh @@ -145,6 +145,12 @@ enum ld_state #define BACKEND_TYPE(b) (SERVER_IS_MASTER((b)->backend_server) ? BE_MASTER : \ (SERVER_IS_SLAVE((b)->backend_server) ? BE_SLAVE : BE_UNDEFINED)); +#define MARIADB_WAIT_GTID_FUNC "MASTER_GTID_WAIT" +#define MYSQL_WAIT_GTID_FUNC "WAIT_FOR_EXECUTED_GTID_SET" +static const char gtid_wait_stmt[] = + "SET @maxscale_secret_variable=(SELECT CASE WHEN %s('%s', %s) = 0 " + "THEN 1 ELSE (SELECT 1 FROM INFORMATION_SCHEMA.ENGINES) END);"; + struct Config { Config(MXS_CONFIG_PARAMETER* params): @@ -166,8 +172,14 @@ struct Config connection_keepalive(config_get_integer(params, "connection_keepalive")), max_slave_replication_lag(config_get_integer(params, "max_slave_replication_lag")), rw_max_slave_conn_percent(0), - max_slave_connections(0) + max_slave_connections(0), + enable_causal_read(config_get_bool(params, "enable_causal_read")), + causal_read_timeout(config_get_string(params, "causal_read_timeout")) { + if (enable_causal_read) + { + retry_failed_reads = true; + } } select_criteria_t slave_selection_criteria; /**< The slave selection criteria */ @@ -187,6 +199,8 @@ struct Config int rw_max_slave_conn_percent; /**< Maximum percentage of slaves to use for * each connection*/ int max_slave_connections; /**< Maximum number of slaves for each connection*/ + bool enable_causal_read; /**< Enable causual read */ + std::string causal_read_timeout; /**< Timetout, second parameter of function master_wait_gtid */ }; diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc index 95dbf8fe0..142d2abf5 100644 --- a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc @@ -23,6 +23,7 @@ #include #include #include +#include /** * The functions that support the routing of queries to back end @@ -1107,6 +1108,58 @@ static inline bool is_large_query(GWBUF* buf) return buflen == MYSQL_HEADER_LEN + GW_MYSQL_MAX_PACKET_LEN; } +/* + * Add a wait gitd query in front of user's query to achive causal read; + * + * @param inst RWSplit + * @param rses RWSplitSession + * @param server SERVER + * @param origin origin send buffer + * @return A new buffer contains wait statement and origin query + */ +GWBUF *add_prefix_wait_gtid(RWSplit *inst, RWSplitSession *rses, SERVER *server, GWBUF *origin) +{ + + /** + * Pack wait function and client query into a multistatments will save a round trip latency, + * and prevent the client query being executed on timeout. + * For example: + * SET @maxscale_secret_variable=(SELECT CASE WHEN MASTER_GTID_WAIT('232-1-1', 10) = 0 + * THEN 1 ELSE (SELECT 1 FROM INFORMATION_SCHEMA.ENGINES) END); SELECT * FROM `city`; + * when MASTER_GTID_WAIT('232-1-1', 0.05) == 1 (timeout), it will return + * an error, and SELECT * FROM `city` will not be executed, then we can retry + * on master; + **/ + + GWBUF *rval; + const char* wait_func = (server->server_type == SERVER_TYPE_MARIADB) ? + MARIADB_WAIT_GTID_FUNC : MYSQL_WAIT_GTID_FUNC; + const char *gtid_wait_timeout = inst->config().causal_read_timeout.c_str(); + const char *gtid_pos = rses->gtid_pos.c_str(); + + /* Create a new buffer to store prefix sql */ + size_t prefix_len = strlen(gtid_wait_stmt) + strlen(gtid_pos) + + strlen(gtid_wait_timeout) + strlen(wait_func); + char prefix_sql[prefix_len]; + snprintf(prefix_sql, prefix_len, gtid_wait_stmt, wait_func, gtid_pos, gtid_wait_timeout); + GWBUF *prefix_buff = modutil_create_query(prefix_sql); + + /* Trim origin to sql, Append origin buffer to the prefix buffer */ + uint8_t header[MYSQL_HEADER_LEN]; + gwbuf_copy_data(origin, 0, MYSQL_HEADER_LEN, header); + /* Command length = 1 */ + size_t origin_sql_len = MYSQL_GET_PAYLOAD_LEN(header) - 1; + /* Trim mysql header and command */ + origin = gwbuf_consume(origin, MYSQL_HEADER_LEN + 1); + rval = gwbuf_append(prefix_buff, origin); + + /* Modify totol length: Prefix sql len + origin sql len + command len */ + size_t new_payload_len = strlen(prefix_sql) + origin_sql_len + 1; + gw_mysql_set_byte3(GWBUF_DATA(rval), new_payload_len); + + return rval; +} + /** * @brief Handle writing to a target server * @@ -1134,7 +1187,14 @@ bool handle_got_target(RWSplit *inst, RWSplitSession *rses, ss_dassert(target->session_command_count() == 0); mxs::Backend::response_type response = mxs::Backend::NO_RESPONSE; + rses->wait_gtid_state = EXPECTING_NOTHING; uint8_t cmd = mxs_mysql_get_command(querybuf); + GWBUF *send_buf = gwbuf_clone(querybuf); ; + if (cmd == COM_QUERY && inst->config().enable_causal_read && rses->gtid_pos != "") + { + send_buf = add_prefix_wait_gtid(inst, rses, target->server(), send_buf); + rses->wait_gtid_state = EXPECTING_WAIT_GTID_RESULT; + } if (rses->load_data_state != LOAD_DATA_ACTIVE && query_creates_reply(cmd)) @@ -1144,7 +1204,7 @@ bool handle_got_target(RWSplit *inst, RWSplitSession *rses, bool large_query = is_large_query(querybuf); - if (target->write(gwbuf_clone(querybuf), response)) + if (target->write(send_buf, response)) { if (store && !session_store_stmt(rses->client_dcb->session, querybuf, target->server())) { diff --git a/server/modules/routing/readwritesplit/rwsplitsession.hh b/server/modules/routing/readwritesplit/rwsplitsession.hh index 29511104d..3c41f67a2 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.hh +++ b/server/modules/routing/readwritesplit/rwsplitsession.hh @@ -14,6 +14,7 @@ #include "readwritesplit.hh" #include "rwsplit_ps.hh" +#include #include @@ -27,6 +28,13 @@ enum reply_state_t REPLY_STATE_RSET_ROWS /**< Resultset response, waiting for rows */ }; +typedef enum +{ + EXPECTING_NOTHING = 0, + EXPECTING_WAIT_GTID_RESULT, + EXPECTING_REAL_RESULT +} wait_gtid_state_t; + /** Reply state change debug logging */ #define LOG_RS(a, b) MXS_DEBUG("%s %s -> %s", (a)->uri(), \ rstostr((a)->get_reply_state()), rstostr(b)); @@ -138,6 +146,9 @@ public: PSManager ps_manager; /**< Prepared statement manager*/ ClientHandleMap ps_handles; /**< Client PS handle to internal ID mapping */ ExecMap exec_map; /**< Map of COM_STMT_EXECUTE statement IDs to Backends */ + std::string gtid_pos; /**< Gtid position for causal read */ + wait_gtid_state_t wait_gtid_state; /**< Determine boundray of wait gtid result and client query result */ + uint32_t next_seq; /**< Next packet'ssequence number */ skygw_chk_t rses_chk_tail; private: