diff --git a/server/modules/routing/binlogrouter/blr_master.c b/server/modules/routing/binlogrouter/blr_master.c index fb01ccf37..f2016ce21 100644 --- a/server/modules/routing/binlogrouter/blr_master.c +++ b/server/modules/routing/binlogrouter/blr_master.c @@ -139,6 +139,7 @@ static void blr_register_cache_response(ROUTER_INSTANCE *router, GWBUF **save_buf, const char *save_tag, GWBUF *in_buf); +static void blr_start_master_registration(ROUTER_INSTANCE *router, GWBUF *buf); static void worker_cb_start_master(int worker_id, void* data); static void blr_start_master_in_main(void* data); @@ -537,252 +538,11 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf) atomic_add(&router->handling_threads, -1); return; } - switch (router->master_state) - { - case BLRM_TIMESTAMP: - /** - * Previous state was BLRM_TIMESTAMP - * No need to save the server reply - */ - gwbuf_free(buf); - blr_register_send_command(router, - "SHOW VARIABLES LIKE 'SERVER_ID'", - BLRM_SERVERID); - router->retry_backoff = 1; - break; - case BLRM_SERVERID: - blr_register_heartbeat(router, buf); - break; - case BLRM_HBPERIOD: - blr_register_setchecksum(router, buf); - break; - case BLRM_CHKSUM1: - blr_register_getchecksum(router, buf); - break; - case BLRM_CHKSUM2: - // Set router->master_chksum based on server reply - blr_register_handle_checksum(router, buf); - // Next state is BLRM_MARIADB10 or BLRM_GTIDMODE - { - unsigned int state = router->mariadb10_compat ? - BLRM_MARIADB10 : - BLRM_GTIDMODE; - const char *command = router->mariadb10_compat ? - "SET @mariadb_slave_capability=4" : - "SELECT @@GLOBAL.GTID_MODE"; - blr_register_send_command(router, command, state); - } - break; - case BLRM_MARIADB10: - // Save server response - blr_register_cache_response(router, - &router->saved_master.mariadb10, - "mariadb10", - buf); - // Skip SERVER_UUID fetch and SET slave UUID - blr_register_send_command(router, - "SET NAMES latin1", - BLRM_LATIN1); - break; - case BLRM_GTIDMODE: // MySQL 5.6/7 only - blr_register_serveruuid(router, buf); - break; - case BLRM_MUUID: // MySQL 5.6/7 only - blr_register_slaveuuid(router, buf); - break; - case BLRM_SUUID: // MySQL 5.6/7 only - // Save server response - blr_register_cache_response(router, - &router->saved_master.setslaveuuid, - "ssuuid", - buf); - blr_register_send_command(router, - "SET NAMES latin1", - BLRM_LATIN1); - break; - case BLRM_LATIN1: - blr_register_utf8(router, buf); - break; - case BLRM_UTF8: - // Save server response - blr_register_cache_response(router, - &router->saved_master.utf8, - "utf8", - buf); - // Next state is MAXWELL BLRM_RESULTS_CHARSET or - // BLRM_SELECT1 - { - unsigned int state = router->maxwell_compat ? - BLRM_RESULTS_CHARSET : - BLRM_SELECT1; - const char *command = router->maxwell_compat ? - "SET character_set_results = NULL" : - "SELECT 1"; - - blr_register_send_command(router, command, state); - break; - } - case BLRM_RESULTS_CHARSET: - gwbuf_free(buf); // Discard server reply, don't save it - blr_register_send_command(router, - MYSQL_CONNECTOR_SQL_MODE_QUERY, - BLRM_SQL_MODE); - break; - case BLRM_SQL_MODE: - gwbuf_free(buf); // Discard server reply, don't save it - blr_register_send_command(router, - "SELECT 1", - BLRM_SELECT1); - break; - case BLRM_SELECT1: - blr_register_selectversion(router, buf); - break; - case BLRM_SELECTVER: - blr_register_selectvercomment(router, buf); - break; - case BLRM_SELECTVERCOM: - blr_register_selecthostname(router, buf); - break; - case BLRM_SELECTHOSTNAME: - blr_register_selectmap(router, buf); - break; - case BLRM_MAP: - // Save server response - blr_register_cache_response(router, - &router->saved_master.map, - "map", - buf); - if (router->maxwell_compat) - { - blr_register_send_command(router, - MYSQL_CONNECTOR_SERVER_VARS_QUERY, - BLRM_SERVER_VARS); - break; - } - else - { - // Continue: ready for the registration, nothing to write/read - router->master_state = BLRM_REGISTER_READY; - } - case BLRM_SERVER_VARS: - /** - * This branch could be reached as fallthrough from BLRM_MAP - * with new state BLRM_REGISTER_READY - * Go ahead if maxwell_compat is not set - */ - if (router->master_state == BLRM_SERVER_VARS && router->maxwell_compat) - { - blr_register_mxw_binlogvars(router, buf); - break; - } - case BLRM_BINLOG_VARS: - /** - * This branch could be reached as fallthrough from BLRM_MAP - * with new state BLRM_REGISTER_READY. - * Go ahead if maxwell_compat is not set - */ - if (router->master_state == BLRM_BINLOG_VARS && router->maxwell_compat) - { - blr_register_mxw_tables(router, buf); - break; - } - case BLRM_LOWER_CASE_TABLES: - /** - * This branch could be reached as fallthrough from BLRM_MAP - * with new state BLRM_REGISTER_READY. - * Go ahead if maxwell_compat is not set - */ - if (router->master_state == BLRM_LOWER_CASE_TABLES && - router->maxwell_compat) - { - blr_register_mxw_handlelowercase(router, buf); - // Continue: ready for the registration, nothing to write/read - } - case BLRM_REGISTER_READY: - // Prepare Slave registration request: COM_REGISTER_SLAVE - buf = blr_make_registration(router); - // Set new state - router->master_state = BLRM_REGISTER; - // Send the packet - router->master->func.write(router->master, buf); - break; - case BLRM_REGISTER: - /* discard master reply to COM_REGISTER_SLAVE */ - gwbuf_free(buf); - - /* if semisync option is set, check for master semi-sync availability */ - if (router->request_semi_sync) - { - blr_register_getsemisync(router, buf); - break; - } - else - { - /* Continue */ - router->master_state = BLRM_REQUEST_BINLOGDUMP; - } - case BLRM_CHECK_SEMISYNC: - /** - * This branch could be reached as fallthrough from BLRM_REGISTER - * if request_semi_sync option is false - */ - if (router->master_state == BLRM_CHECK_SEMISYNC) - { - if (blr_register_setsemisync(router, buf)) - { - break; - } - } - case BLRM_REQUEST_SEMISYNC: - /** - * This branch could be reached as fallthrough from BLRM_REGISTER or BLRM_CHECK_SEMISYNC - * if request_semi_sync option is false or master doesn't support semisync or it's not enabled - */ - if (router->master_state == BLRM_REQUEST_SEMISYNC) - { - /* discard master reply */ - gwbuf_free(buf); - - /* Continue */ - router->master_state = BLRM_REQUEST_BINLOGDUMP; - } - case BLRM_REQUEST_BINLOGDUMP: - /** - * This branch is reached after semi-sync check/request or - * just after sending COM_REGISTER_SLAVE if request_semi_sync option is false - */ - - /* Request now a dump of the binlog file: COM_BINLOG_DUMP */ - buf = blr_make_binlog_dump(router); - router->master_state = BLRM_BINLOGDUMP; - router->master->func.write(router->master, buf); - - MXS_NOTICE("%s: Request binlog records from %s at " - "position %lu from master server [%s]:%d", - router->service->name, router->binlog_name, - router->current_pos, - router->service->dbref->server->name, - router->service->dbref->server->port); - - /* Log binlog router identity */ - blr_log_identity(router); - break; - case BLRM_BINLOGDUMP: - /** - * Main body, we have received a binlog record from the master - */ - blr_handle_binlog_record(router, buf); - - /** - * Set heartbeat check task - */ - snprintf(task_name, BLRM_TASK_NAME_LEN, "%s heartbeat", router->service->name); - hktask_add(task_name, blr_check_last_master_event, router, router->heartbeat); - - break; - } + // Start the Slave Protocol registration with Master server + blr_start_master_registration(router, buf); + // Check whether re-connect to master is needed if (router->reconnect_pending) { blr_restart_master(router); @@ -3086,3 +2846,265 @@ static void blr_register_cache_response(ROUTER_INSTANCE *router, // New value saved to disk blr_cache_response(router, (char *)save_tag, in_buf); } + +/** + * Slave Protocol registration to Master: + * + * Handling the registration process: + * + * Note: some phases are specific to MySQL 5.6/5.7, + * others to MariaDB10. + * + * @param router Current router instance + * @param in_buf GWBUF with previous phase server response + */ +static void blr_start_master_registration(ROUTER_INSTANCE *router, GWBUF *buf) +{ + char task_name[BLRM_TASK_NAME_LEN + 1] = ""; + + switch (router->master_state) + { + case BLRM_TIMESTAMP: + /** + * Previous state was BLRM_TIMESTAMP + * No need to save the server reply + */ + gwbuf_free(buf); + blr_register_send_command(router, + "SHOW VARIABLES LIKE 'SERVER_ID'", + BLRM_SERVERID); + router->retry_backoff = 1; + break; + case BLRM_SERVERID: + blr_register_heartbeat(router, buf); + break; + case BLRM_HBPERIOD: + blr_register_setchecksum(router, buf); + break; + case BLRM_CHKSUM1: + blr_register_getchecksum(router, buf); + break; + case BLRM_CHKSUM2: + // Set router->master_chksum based on server reply + blr_register_handle_checksum(router, buf); + // Next state is BLRM_MARIADB10 or BLRM_GTIDMODE + { + unsigned int state = router->mariadb10_compat ? + BLRM_MARIADB10 : + BLRM_GTIDMODE; + const char *command = router->mariadb10_compat ? + "SET @mariadb_slave_capability=4" : + "SELECT @@GLOBAL.GTID_MODE"; + + blr_register_send_command(router, command, state); + } + break; + case BLRM_MARIADB10: + // Save server response + blr_register_cache_response(router, + &router->saved_master.mariadb10, + "mariadb10", + buf); + // Skip SERVER_UUID fetch and SET slave UUID + blr_register_send_command(router, + "SET NAMES latin1", + BLRM_LATIN1); + break; + case BLRM_GTIDMODE: // MySQL 5.6/7 only + blr_register_serveruuid(router, buf); + break; + case BLRM_MUUID: // MySQL 5.6/7 only + blr_register_slaveuuid(router, buf); + break; + case BLRM_SUUID: // MySQL 5.6/7 only + // Save server response + blr_register_cache_response(router, + &router->saved_master.setslaveuuid, + "ssuuid", + buf); + blr_register_send_command(router, + "SET NAMES latin1", + BLRM_LATIN1); + break; + case BLRM_LATIN1: + blr_register_utf8(router, buf); + break; + case BLRM_UTF8: + // Save server response + blr_register_cache_response(router, + &router->saved_master.utf8, + "utf8", + buf); + // Next state is MAXWELL BLRM_RESULTS_CHARSET or + // BLRM_SELECT1 + { + unsigned int state = router->maxwell_compat ? + BLRM_RESULTS_CHARSET : + BLRM_SELECT1; + const char *command = router->maxwell_compat ? + "SET character_set_results = NULL" : + "SELECT 1"; + + blr_register_send_command(router, command, state); + break; + } + case BLRM_RESULTS_CHARSET: + gwbuf_free(buf); // Discard server reply, don't save it + blr_register_send_command(router, + MYSQL_CONNECTOR_SQL_MODE_QUERY, + BLRM_SQL_MODE); + break; + case BLRM_SQL_MODE: + gwbuf_free(buf); // Discard server reply, don't save it + blr_register_send_command(router, + "SELECT 1", + BLRM_SELECT1); + break; + case BLRM_SELECT1: + blr_register_selectversion(router, buf); + break; + case BLRM_SELECTVER: + blr_register_selectvercomment(router, buf); + break; + case BLRM_SELECTVERCOM: + blr_register_selecthostname(router, buf); + break; + case BLRM_SELECTHOSTNAME: + blr_register_selectmap(router, buf); + break; + case BLRM_MAP: + // Save server response + blr_register_cache_response(router, + &router->saved_master.map, + "map", + buf); + if (router->maxwell_compat) + { + blr_register_send_command(router, + MYSQL_CONNECTOR_SERVER_VARS_QUERY, + BLRM_SERVER_VARS); + break; + } + else + { + // Continue: ready for the registration, nothing to write/read + router->master_state = BLRM_REGISTER_READY; + } + case BLRM_SERVER_VARS: + /** + * This branch could be reached as fallthrough from BLRM_MAP + * with new state BLRM_REGISTER_READY + * Go ahead if maxwell_compat is not set + */ + if (router->master_state == BLRM_SERVER_VARS && router->maxwell_compat) + { + blr_register_mxw_binlogvars(router, buf); + break; + } + case BLRM_BINLOG_VARS: + /** + * This branch could be reached as fallthrough from BLRM_MAP + * with new state BLRM_REGISTER_READY. + * Go ahead if maxwell_compat is not set + */ + if (router->master_state == BLRM_BINLOG_VARS && router->maxwell_compat) + { + blr_register_mxw_tables(router, buf); + break; + } + case BLRM_LOWER_CASE_TABLES: + /** + * This branch could be reached as fallthrough from BLRM_MAP + * with new state BLRM_REGISTER_READY. + * Go ahead if maxwell_compat is not set + */ + if (router->master_state == BLRM_LOWER_CASE_TABLES && + router->maxwell_compat) + { + blr_register_mxw_handlelowercase(router, buf); + // Continue: ready for the registration, nothing to write/read + } + case BLRM_REGISTER_READY: + // Prepare Slave registration request: COM_REGISTER_SLAVE + buf = blr_make_registration(router); + // Set new state + router->master_state = BLRM_REGISTER; + // Send the packet + router->master->func.write(router->master, buf); + break; + case BLRM_REGISTER: + /* discard master reply to COM_REGISTER_SLAVE */ + gwbuf_free(buf); + + /* if semisync option is set, check for master semi-sync availability */ + if (router->request_semi_sync) + { + blr_register_getsemisync(router, buf); + break; + } + else + { + /* Continue */ + router->master_state = BLRM_REQUEST_BINLOGDUMP; + } + case BLRM_CHECK_SEMISYNC: + /** + * This branch could be reached as fallthrough from BLRM_REGISTER + * if request_semi_sync option is false + */ + if (router->master_state == BLRM_CHECK_SEMISYNC) + { + if (blr_register_setsemisync(router, buf)) + { + break; + } + } + case BLRM_REQUEST_SEMISYNC: + /** + * This branch could be reached as fallthrough from BLRM_REGISTER or BLRM_CHECK_SEMISYNC + * if request_semi_sync option is false or master doesn't support semisync or it's not enabled + */ + if (router->master_state == BLRM_REQUEST_SEMISYNC) + { + /* discard master reply */ + gwbuf_free(buf); + + /* Continue */ + router->master_state = BLRM_REQUEST_BINLOGDUMP; + } + case BLRM_REQUEST_BINLOGDUMP: + /** + * This branch is reached after semi-sync check/request or + * just after sending COM_REGISTER_SLAVE if request_semi_sync option is false + */ + + /* Request now a dump of the binlog file: COM_BINLOG_DUMP */ + buf = blr_make_binlog_dump(router); + router->master_state = BLRM_BINLOGDUMP; + router->master->func.write(router->master, buf); + + MXS_NOTICE("%s: Request binlog records from %s at " + "position %lu from master server [%s]:%d", + router->service->name, router->binlog_name, + router->current_pos, + router->service->dbref->server->name, + router->service->dbref->server->port); + + /* Log binlog router identity */ + blr_log_identity(router); + break; + case BLRM_BINLOGDUMP: + /** + * Main body, we have received a binlog record from the master + */ + blr_handle_binlog_record(router, buf); + + /** + * Set heartbeat check task + */ + snprintf(task_name, BLRM_TASK_NAME_LEN, "%s heartbeat", router->service->name); + hktask_add(task_name, blr_check_last_master_event, router, router->heartbeat); + + break; + } +}