diff --git a/server/modules/routing/binlogrouter/blr_master.c b/server/modules/routing/binlogrouter/blr_master.c index f0db993ca..af84159c2 100644 --- a/server/modules/routing/binlogrouter/blr_master.c +++ b/server/modules/routing/binlogrouter/blr_master.c @@ -118,7 +118,7 @@ static void blr_register_serverid(ROUTER_INSTANCE *router, GWBUF *buf); static void blr_register_heartbeat(ROUTER_INSTANCE *router, GWBUF *buf); static void blr_register_setchecksum(ROUTER_INSTANCE *router, GWBUF *buf); static void blr_register_getchecksum(ROUTER_INSTANCE *router, GWBUF *buf); -static void blr_register_handlechecksum(ROUTER_INSTANCE *router, GWBUF *buf); +static void blr_register_handle_checksum(ROUTER_INSTANCE *router, GWBUF *buf); static void blr_register_mariadb10(ROUTER_INSTANCE *router, GWBUF *buf); static void blr_register_mysqlgtid(ROUTER_INSTANCE *router, GWBUF *buf); static void blr_register_serveruuid(ROUTER_INSTANCE *router, GWBUF *buf); @@ -130,8 +130,20 @@ static void blr_register_selecthostname(ROUTER_INSTANCE *router, GWBUF *buf); static void blr_register_selectmap(ROUTER_INSTANCE *router, GWBUF *buf); static void blr_register_mxw_binlogvars(ROUTER_INSTANCE *router, GWBUF *buf); static void blr_register_mxw_tables(ROUTER_INSTANCE *router, GWBUF *buf); -static void blr_register_checksemisync(ROUTER_INSTANCE *router, GWBUF *buf); -static bool blr_register_requestsemisync(ROUTER_INSTANCE *router, GWBUF *buf); +static void blr_register_getsemisync(ROUTER_INSTANCE *router, GWBUF *buf); +static bool blr_register_setsemisync(ROUTER_INSTANCE *router, GWBUF *buf); +static void blr_register_mxw_handlelowercase(ROUTER_INSTANCE *router, + GWBUF *buf); +static void blr_register_mxw_servervars(ROUTER_INSTANCE *router, GWBUF *buf); +static void blr_register_set_latin1(ROUTER_INSTANCE *router, GWBUF *buf); +static void blr_register_cache_map(ROUTER_INSTANCE *router, GWBUF *buf); +static void blr_register_cache_utf8(ROUTER_INSTANCE *router, GWBUF *buf); +static void blr_register_cache_utf8(ROUTER_INSTANCE *router, GWBUF *buf); +static void blr_register_cache_slaveuuid(ROUTER_INSTANCE *router, GWBUF *buf); +static void blr_register_cache_mariadb10(ROUTER_INSTANCE *router, GWBUF *buf); +static void blr_register_send_select1(ROUTER_INSTANCE *router, GWBUF *buf); +static void blr_register_mxw_sqlmode(ROUTER_INSTANCE *router, GWBUF *buf); +static void blr_register_mxw_charset(ROUTER_INSTANCE *router, GWBUF *buf); static void worker_cb_start_master(int worker_id, void* data); static void blr_start_master_in_main(void* data); @@ -546,8 +558,7 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf) break; case BLRM_CHKSUM2: // Set router->master_chksum based on server reply - blr_register_handlechecksum(router, buf); - + blr_register_getchecksum(router, buf); // Next state is BLRM_MARIADB10 or BLRM_GTIDMODE if (router->mariadb10_compat) { @@ -559,75 +570,41 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf) } break; case BLRM_MARIADB10: - // Response to the SET @mariadb_slave_capability=4, should be stored - if (router->saved_master.mariadb10) - { - GWBUF_CONSUME_ALL(router->saved_master.mariadb10); - } - router->saved_master.mariadb10 = buf; - blr_cache_response(router, "mariadb10", buf); - - // Skip SERVER_UUID fetch and SET slave UUID (MySQL 5.6/7 only) - buf = blr_make_query(router->master, "SET NAMES latin1"); - router->master_state = BLRM_LATIN1; - router->master->func.write(router->master, buf); + blr_register_cache_mariadb10(router, buf); + // Skip SERVER_UUID fetch and SET slave UUID + blr_register_set_latin1(router, buf); break; - case BLRM_GTIDMODE: + case BLRM_GTIDMODE: // MySQL 5.6/7 only blr_register_serveruuid(router, buf); break; - case BLRM_MUUID: + case BLRM_MUUID: // MySQL 5.6/7 only blr_register_slaveuuid(router, buf); break; - case BLRM_SUUID: - // Response to the SET @server_uuid, should be stored - if (router->saved_master.setslaveuuid) - { - GWBUF_CONSUME_ALL(router->saved_master.setslaveuuid); - } - router->saved_master.setslaveuuid = buf; - blr_cache_response(router, "ssuuid", buf); - buf = blr_make_query(router->master, "SET NAMES latin1"); - router->master_state = BLRM_LATIN1; - router->master->func.write(router->master, buf); + case BLRM_SUUID: // MySQL 5.6/7 only + blr_register_cache_slaveuuid(router, buf); + blr_register_set_latin1(router, buf); break; case BLRM_LATIN1: blr_register_utf8(router, buf); break; case BLRM_UTF8: - // Response to the SET NAMES utf8, should be stored - if (router->saved_master.utf8) - { - GWBUF_CONSUME_ALL(router->saved_master.utf8); - } - router->saved_master.utf8 = buf; - blr_cache_response(router, "utf8", buf); - + blr_register_cache_utf8(router, buf); if (router->maxwell_compat) { - buf = blr_make_query(router->master, "SET character_set_results = NULL"); - router->master_state = BLRM_RESULTS_CHARSET; - router->master->func.write(router->master, buf); + blr_register_mxw_charset(router, buf); } else { - buf = blr_make_query(router->master, "SELECT 1"); - router->master_state = BLRM_SELECT1; - router->master->func.write(router->master, buf); + blr_register_send_select1(router, buf); } break; case BLRM_RESULTS_CHARSET: - gwbuf_free(buf); - - buf = blr_make_query(router->master, MYSQL_CONNECTOR_SQL_MODE_QUERY); - router->master_state = BLRM_SQL_MODE; - router->master->func.write(router->master, buf); + gwbuf_free(buf); // Discard server reply, don't save it + blr_register_mxw_sqlmode(router, buf); break; case BLRM_SQL_MODE: - gwbuf_free(buf); - - buf = blr_make_query(router->master, "SELECT 1"); - router->master_state = BLRM_SELECT1; - router->master->func.write(router->master, buf); + gwbuf_free(buf); // Discard server reply, don't save it + blr_register_send_select1(router, buf); break; case BLRM_SELECT1: blr_register_selectversion(router, buf); @@ -642,20 +619,10 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf) blr_register_selectmap(router, buf); break; case BLRM_MAP: - // Response to SELECT @@max_allowed_packet should be stored - if (router->saved_master.map) - { - GWBUF_CONSUME_ALL(router->saved_master.map); - } - router->saved_master.map = buf; - blr_cache_response(router, "map", buf); - + blr_register_cache_map(router, buf); if (router->maxwell_compat) { - // Query for Server Variables - buf = blr_make_query(router->master, MYSQL_CONNECTOR_SERVER_VARS_QUERY); - router->master_state = BLRM_SERVER_VARS; - router->master->func.write(router->master, buf); + blr_register_mxw_servervars(router, buf); break; } else @@ -669,7 +636,7 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf) * with new state BLRM_REGISTER_READY * Go ahead if maxwell_compat is not set */ - if (router->maxwell_compat) + if (router->master_state == BLRM_SERVER_VARS && router->maxwell_compat) { blr_register_mxw_binlogvars(router, buf); break; @@ -680,7 +647,7 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf) * with new state BLRM_REGISTER_READY. * Go ahead if maxwell_compat is not set */ - if (router->maxwell_compat) + if (router->master_state == BLRM_BINLOG_VARS && router->maxwell_compat) { blr_register_mxw_tables(router, buf); break; @@ -691,21 +658,18 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf) * with new state BLRM_REGISTER_READY. * Go ahead if maxwell_compat is not set */ - if (router->maxwell_compat) + if (router->master_state == BLRM_LOWER_CASE_TABLES && + router->maxwell_compat) { - if (router->saved_master.lower_case_tables) - { - GWBUF_CONSUME_ALL(router->saved_master.lower_case_tables); - } - router->saved_master.lower_case_tables = buf; - blr_cache_response(router, "lower_case_tables", buf); - router->master_state = BLRM_REGISTER_READY; + blr_register_mxw_handlelowercase(router, buf); // Continue: ready for the registration, nothing to write/read } case BLRM_REGISTER_READY: - // Prepare registration + // Prepare Slave registration request: COM_REGISTER_SLAVE buf = blr_make_registration(router); + // Set new state router->master_state = BLRM_REGISTER; + // Send the packet router->master->func.write(router->master, buf); break; case BLRM_REGISTER: @@ -715,7 +679,7 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf) /* if semisync option is set, check for master semi-sync availability */ if (router->request_semi_sync) { - blr_register_checksemisync(router, buf); + blr_register_getsemisync(router, buf); break; } else @@ -730,7 +694,7 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf) */ if (router->master_state == BLRM_CHECK_SEMISYNC) { - if (blr_register_requestsemisync(router, buf)) + if (blr_register_setsemisync(router, buf)) { break; } @@ -748,19 +712,17 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf) /* Continue */ router->master_state = BLRM_REQUEST_BINLOGDUMP; } - case BLRM_REQUEST_BINLOGDUMP: /** * This branch is reached after semi-sync check/request or * just after sending COM_REGISTER_SLAVE if request_semi_sync option is false */ - /* Request now a dump of the binlog file */ + /* Request now a dump of the binlog file: COM_BINLOG_DUMP */ buf = blr_make_binlog_dump(router); - router->master_state = BLRM_BINLOGDUMP; - router->master->func.write(router->master, buf); + MXS_NOTICE("%s: Request binlog records from %s at " "position %lu from master server [%s]:%d", router->service->name, router->binlog_name, @@ -770,9 +732,7 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf) /* Log binlog router identity */ blr_log_identity(router); - break; - case BLRM_BINLOGDUMP: /** * Main body, we have received a binlog record from the master @@ -3035,6 +2995,7 @@ static void blr_register_mxw_binlogvars(ROUTER_INSTANCE *router, GWBUF *buf) */ static void blr_register_mxw_tables(ROUTER_INSTANCE *router, GWBUF *buf) { + // Response from master should be stored if (router->saved_master.binlog_vars) { GWBUF_CONSUME_ALL(router->saved_master.binlog_vars); @@ -3049,19 +3010,43 @@ static void blr_register_mxw_tables(ROUTER_INSTANCE *router, GWBUF *buf) router->master->func.write(router->master, buf); } -static void blr_register_checksemisync(ROUTER_INSTANCE *router, GWBUF *buf) +/** + * Slave protocol registration: check Master SEMI-SYNC replication + * + * Ask master server for Semi-Sync replication capability + * + * Note: Master server must have rpl_semi_sync_master plugin installed + * in order to start the Semi-Sync replication + * + * @param router Current router instance + * @param buf The GWBUF to fill with new request + */ +static void blr_register_getsemisync(ROUTER_INSTANCE *router, GWBUF *buf) { MXS_NOTICE("%s: checking Semi-Sync replication capability for master server [%s]:%d", router->service->name, router->service->dbref->server->name, router->service->dbref->server->port); + // New registration message buf = blr_make_query(router->master, "SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled'"); + // Set the new state router->master_state = BLRM_CHECK_SEMISYNC; router->master->func.write(router->master, buf); } -static bool blr_register_requestsemisync(ROUTER_INSTANCE *router, GWBUF *buf) +/** + * Slave protocol registration: handle SEMI-SYNC replication + * + * Get master semisync capability + * and if installed start the SEMI-SYNC replication + * + * @param router Current router instance + * @param buf GWBUF with server reply to previous + * registration command + * @return True is semi-sync can be started or false + */ +static bool blr_register_setsemisync(ROUTER_INSTANCE *router, GWBUF *buf) { if (router->master_state == BLRM_CHECK_SEMISYNC) { @@ -3089,8 +3074,8 @@ static bool blr_register_requestsemisync(ROUTER_INSTANCE *router, GWBUF *buf) if (router->master_semi_sync == MASTER_SEMISYNC_DISABLED) { /* Installed but not enabled, right now */ - MXS_NOTICE("%s: master server [%s]:%d doesn't have semi_sync enabled right now, " - "Requesting Semi-Sync Replication", + MXS_NOTICE("%s: master server [%s]:%d doesn't have semi_sync" + " enabled right now, Request Semi-Sync Replication anyway", router->service->name, router->service->dbref->server->name, router->service->dbref->server->port); @@ -3098,7 +3083,8 @@ static bool blr_register_requestsemisync(ROUTER_INSTANCE *router, GWBUF *buf) else { /* Installed and enabled */ - MXS_NOTICE("%s: master server [%s]:%d has semi_sync enabled, Requesting Semi-Sync Replication", + MXS_NOTICE("%s: master server [%s]:%d has semi_sync enabled," + " Requesting Semi-Sync Replication", router->service->name, router->service->dbref->server->name, router->service->dbref->server->port); @@ -3115,3 +3101,189 @@ static bool blr_register_requestsemisync(ROUTER_INSTANCE *router, GWBUF *buf) return false; } + +/** + * Slave Protocol registration to Master (MaxWell compatibility): + * + * Handles previous reply from Master and + * sets the state to BLRM_REGISTER_READY + * + * @param router Current router instance + * @param buf GWBUF with server reply to previous + * registration command + */ +static void blr_register_mxw_handlelowercase(ROUTER_INSTANCE *router, + GWBUF *buf) +{ + // Response from master should be stored + if (router->saved_master.lower_case_tables) + { + GWBUF_CONSUME_ALL(router->saved_master.lower_case_tables); + } + router->saved_master.lower_case_tables = buf; + blr_cache_response(router, "lower_case_tables", buf); + // Set the new state + router->master_state = BLRM_REGISTER_READY; +} + +/** + * Slave Protocol registration to Master: + * + * Saves previous reply from Master (Max Allowed Packet) + * + * @param router Current router instance + * @param buf GWBUF with server reply to previous + * registration command + */ +static void blr_register_cache_map(ROUTER_INSTANCE *router, + GWBUF *buf) +{ + // Response to SELECT @@max_allowed_packet should be stored + if (router->saved_master.map) + { + GWBUF_CONSUME_ALL(router->saved_master.map); + } + router->saved_master.map = buf; + blr_cache_response(router, "map", buf); +} + +/** + * Slave Protocol registration to Master (MaxWell compatibility): + * + * Sends MYSQL_CONNECTOR_SERVER_VARS_QUERY to Master + * + * @param router Current router instance + * @param buf GWBUF to fill with new request + */ +static void blr_register_mxw_servervars(ROUTER_INSTANCE *router, GWBUF *buf) +{ + // New registration message + buf = blr_make_query(router->master, MYSQL_CONNECTOR_SERVER_VARS_QUERY); + // Set the new state + router->master_state = BLRM_SERVER_VARS; + router->master->func.write(router->master, buf); +} + +/** + * Slave Protocol registration to Master: + * + * Saves previous reply from Master (UTF8) + * + * @param router Current router instance + * @param buf GWBUF with server reply to previous + * registration command + */ +static void blr_register_cache_utf8(ROUTER_INSTANCE *router, GWBUF *buf) +{ + // Response to the SET NAMES utf8, should be stored + if (router->saved_master.utf8) + { + GWBUF_CONSUME_ALL(router->saved_master.utf8); + } + router->saved_master.utf8 = buf; + blr_cache_response(router, "utf8", buf); +} + +/** + * Slave Protocol registration to Master: + * + * Sends SET NAMES latin1 to Master + * + * @param router Current router instance + * @param buf GWBUF to fill with new request + */ +static void blr_register_set_latin1(ROUTER_INSTANCE *router, GWBUF *buf) +{ + buf = blr_make_query(router->master, "SET NAMES latin1"); + // Set the new state + router->master_state = BLRM_LATIN1; + router->master->func.write(router->master, buf); +} + +/** + * Slave Protocol registration to Master: + * + * Saves previous reply from Master (Slave UUID) + * + * @param router Current router instance + * @param buf GWBUF with server reply to previous + * registration command + */ +static void blr_register_cache_slaveuuid(ROUTER_INSTANCE *router, GWBUF *buf) +{ + // Response to the SET @server_uuid, should be stored + if (router->saved_master.setslaveuuid) + { + GWBUF_CONSUME_ALL(router->saved_master.setslaveuuid); + } + router->saved_master.setslaveuuid = buf; + blr_cache_response(router, "ssuuid", buf); +} + +/** + * Slave Protocol registration to Master: + * + * Saves previous reply from Master (MariaDB10 compatibility) + * + * @param router Current router instance + * @param buf GWBUF with server reply to previous + * registration command + */ +static void blr_register_cache_mariadb10(ROUTER_INSTANCE *router, GWBUF *buf) +{ + // Response to the SET @mariadb_slave_capability=4, should be stored + if (router->saved_master.mariadb10) + { + GWBUF_CONSUME_ALL(router->saved_master.mariadb10); + } + router->saved_master.mariadb10 = buf; + blr_cache_response(router, "mariadb10", buf); +} + +/** + * Slave Protocol registration to Master: + * + * Sends SELECT 1 to Master + * + * @param router Current router instance + * @param buf GWBUF to fill with new request + */ +static void blr_register_send_select1(ROUTER_INSTANCE *router, GWBUF *buf) +{ + buf = blr_make_query(router->master, "SELECT 1"); + // Set the new state + router->master_state = BLRM_SELECT1; + router->master->func.write(router->master, buf); +} + +/** + * Slave Protocol registration to Master (MaxWell compatibility): + * + * Sends MYSQL_CONNECTOR_SQL_MODE_QUERY to Master + * + * @param router Current router instance + * @param buf GWBUF to fill with new request + */ +static void blr_register_mxw_sqlmode(ROUTER_INSTANCE *router, GWBUF *buf) +{ + buf = blr_make_query(router->master, MYSQL_CONNECTOR_SQL_MODE_QUERY); + // Set the new state + router->master_state = BLRM_SQL_MODE; + router->master->func.write(router->master, buf); +} + +/** + * Slave Protocol registration to Master (MaxWell compatibility): + * + * Sends SET character_set_results = NULL to Master + * + * @param router Current router instance + * @param buf GWBUF to fill with new request + */ +static void blr_register_mxw_charset(ROUTER_INSTANCE *router, GWBUF *buf) +{ + buf = blr_make_query(router->master, "SET character_set_results = NULL"); + // Set the new state + router->master_state = BLRM_RESULTS_CHARSET; + router->master->func.write(router->master, buf); +}