MXS-1209: blr_start_master_registration() handles the replication protocol registration
New routine blr_start_master_registration() handles the replication protocol registration
This commit is contained in:
@ -139,6 +139,7 @@ static void blr_register_cache_response(ROUTER_INSTANCE *router,
|
||||
GWBUF **save_buf,
|
||||
const char *save_tag,
|
||||
GWBUF *in_buf);
|
||||
static void blr_start_master_registration(ROUTER_INSTANCE *router, GWBUF *buf);
|
||||
|
||||
static void worker_cb_start_master(int worker_id, void* data);
|
||||
static void blr_start_master_in_main(void* data);
|
||||
@ -537,252 +538,11 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf)
|
||||
atomic_add(&router->handling_threads, -1);
|
||||
return;
|
||||
}
|
||||
switch (router->master_state)
|
||||
{
|
||||
case BLRM_TIMESTAMP:
|
||||
/**
|
||||
* Previous state was BLRM_TIMESTAMP
|
||||
* No need to save the server reply
|
||||
*/
|
||||
gwbuf_free(buf);
|
||||
blr_register_send_command(router,
|
||||
"SHOW VARIABLES LIKE 'SERVER_ID'",
|
||||
BLRM_SERVERID);
|
||||
router->retry_backoff = 1;
|
||||
break;
|
||||
case BLRM_SERVERID:
|
||||
blr_register_heartbeat(router, buf);
|
||||
break;
|
||||
case BLRM_HBPERIOD:
|
||||
blr_register_setchecksum(router, buf);
|
||||
break;
|
||||
case BLRM_CHKSUM1:
|
||||
blr_register_getchecksum(router, buf);
|
||||
break;
|
||||
case BLRM_CHKSUM2:
|
||||
// Set router->master_chksum based on server reply
|
||||
blr_register_handle_checksum(router, buf);
|
||||
// Next state is BLRM_MARIADB10 or BLRM_GTIDMODE
|
||||
{
|
||||
unsigned int state = router->mariadb10_compat ?
|
||||
BLRM_MARIADB10 :
|
||||
BLRM_GTIDMODE;
|
||||
const char *command = router->mariadb10_compat ?
|
||||
"SET @mariadb_slave_capability=4" :
|
||||
"SELECT @@GLOBAL.GTID_MODE";
|
||||
|
||||
blr_register_send_command(router, command, state);
|
||||
}
|
||||
break;
|
||||
case BLRM_MARIADB10:
|
||||
// Save server response
|
||||
blr_register_cache_response(router,
|
||||
&router->saved_master.mariadb10,
|
||||
"mariadb10",
|
||||
buf);
|
||||
// Skip SERVER_UUID fetch and SET slave UUID
|
||||
blr_register_send_command(router,
|
||||
"SET NAMES latin1",
|
||||
BLRM_LATIN1);
|
||||
break;
|
||||
case BLRM_GTIDMODE: // MySQL 5.6/7 only
|
||||
blr_register_serveruuid(router, buf);
|
||||
break;
|
||||
case BLRM_MUUID: // MySQL 5.6/7 only
|
||||
blr_register_slaveuuid(router, buf);
|
||||
break;
|
||||
case BLRM_SUUID: // MySQL 5.6/7 only
|
||||
// Save server response
|
||||
blr_register_cache_response(router,
|
||||
&router->saved_master.setslaveuuid,
|
||||
"ssuuid",
|
||||
buf);
|
||||
blr_register_send_command(router,
|
||||
"SET NAMES latin1",
|
||||
BLRM_LATIN1);
|
||||
break;
|
||||
case BLRM_LATIN1:
|
||||
blr_register_utf8(router, buf);
|
||||
break;
|
||||
case BLRM_UTF8:
|
||||
// Save server response
|
||||
blr_register_cache_response(router,
|
||||
&router->saved_master.utf8,
|
||||
"utf8",
|
||||
buf);
|
||||
// Next state is MAXWELL BLRM_RESULTS_CHARSET or
|
||||
// BLRM_SELECT1
|
||||
{
|
||||
unsigned int state = router->maxwell_compat ?
|
||||
BLRM_RESULTS_CHARSET :
|
||||
BLRM_SELECT1;
|
||||
const char *command = router->maxwell_compat ?
|
||||
"SET character_set_results = NULL" :
|
||||
"SELECT 1";
|
||||
|
||||
blr_register_send_command(router, command, state);
|
||||
break;
|
||||
}
|
||||
case BLRM_RESULTS_CHARSET:
|
||||
gwbuf_free(buf); // Discard server reply, don't save it
|
||||
blr_register_send_command(router,
|
||||
MYSQL_CONNECTOR_SQL_MODE_QUERY,
|
||||
BLRM_SQL_MODE);
|
||||
break;
|
||||
case BLRM_SQL_MODE:
|
||||
gwbuf_free(buf); // Discard server reply, don't save it
|
||||
blr_register_send_command(router,
|
||||
"SELECT 1",
|
||||
BLRM_SELECT1);
|
||||
break;
|
||||
case BLRM_SELECT1:
|
||||
blr_register_selectversion(router, buf);
|
||||
break;
|
||||
case BLRM_SELECTVER:
|
||||
blr_register_selectvercomment(router, buf);
|
||||
break;
|
||||
case BLRM_SELECTVERCOM:
|
||||
blr_register_selecthostname(router, buf);
|
||||
break;
|
||||
case BLRM_SELECTHOSTNAME:
|
||||
blr_register_selectmap(router, buf);
|
||||
break;
|
||||
case BLRM_MAP:
|
||||
// Save server response
|
||||
blr_register_cache_response(router,
|
||||
&router->saved_master.map,
|
||||
"map",
|
||||
buf);
|
||||
if (router->maxwell_compat)
|
||||
{
|
||||
blr_register_send_command(router,
|
||||
MYSQL_CONNECTOR_SERVER_VARS_QUERY,
|
||||
BLRM_SERVER_VARS);
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Continue: ready for the registration, nothing to write/read
|
||||
router->master_state = BLRM_REGISTER_READY;
|
||||
}
|
||||
case BLRM_SERVER_VARS:
|
||||
/**
|
||||
* This branch could be reached as fallthrough from BLRM_MAP
|
||||
* with new state BLRM_REGISTER_READY
|
||||
* Go ahead if maxwell_compat is not set
|
||||
*/
|
||||
if (router->master_state == BLRM_SERVER_VARS && router->maxwell_compat)
|
||||
{
|
||||
blr_register_mxw_binlogvars(router, buf);
|
||||
break;
|
||||
}
|
||||
case BLRM_BINLOG_VARS:
|
||||
/**
|
||||
* This branch could be reached as fallthrough from BLRM_MAP
|
||||
* with new state BLRM_REGISTER_READY.
|
||||
* Go ahead if maxwell_compat is not set
|
||||
*/
|
||||
if (router->master_state == BLRM_BINLOG_VARS && router->maxwell_compat)
|
||||
{
|
||||
blr_register_mxw_tables(router, buf);
|
||||
break;
|
||||
}
|
||||
case BLRM_LOWER_CASE_TABLES:
|
||||
/**
|
||||
* This branch could be reached as fallthrough from BLRM_MAP
|
||||
* with new state BLRM_REGISTER_READY.
|
||||
* Go ahead if maxwell_compat is not set
|
||||
*/
|
||||
if (router->master_state == BLRM_LOWER_CASE_TABLES &&
|
||||
router->maxwell_compat)
|
||||
{
|
||||
blr_register_mxw_handlelowercase(router, buf);
|
||||
// Continue: ready for the registration, nothing to write/read
|
||||
}
|
||||
case BLRM_REGISTER_READY:
|
||||
// Prepare Slave registration request: COM_REGISTER_SLAVE
|
||||
buf = blr_make_registration(router);
|
||||
// Set new state
|
||||
router->master_state = BLRM_REGISTER;
|
||||
// Send the packet
|
||||
router->master->func.write(router->master, buf);
|
||||
break;
|
||||
case BLRM_REGISTER:
|
||||
/* discard master reply to COM_REGISTER_SLAVE */
|
||||
gwbuf_free(buf);
|
||||
|
||||
/* if semisync option is set, check for master semi-sync availability */
|
||||
if (router->request_semi_sync)
|
||||
{
|
||||
blr_register_getsemisync(router, buf);
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Continue */
|
||||
router->master_state = BLRM_REQUEST_BINLOGDUMP;
|
||||
}
|
||||
case BLRM_CHECK_SEMISYNC:
|
||||
/**
|
||||
* This branch could be reached as fallthrough from BLRM_REGISTER
|
||||
* if request_semi_sync option is false
|
||||
*/
|
||||
if (router->master_state == BLRM_CHECK_SEMISYNC)
|
||||
{
|
||||
if (blr_register_setsemisync(router, buf))
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
case BLRM_REQUEST_SEMISYNC:
|
||||
/**
|
||||
* This branch could be reached as fallthrough from BLRM_REGISTER or BLRM_CHECK_SEMISYNC
|
||||
* if request_semi_sync option is false or master doesn't support semisync or it's not enabled
|
||||
*/
|
||||
if (router->master_state == BLRM_REQUEST_SEMISYNC)
|
||||
{
|
||||
/* discard master reply */
|
||||
gwbuf_free(buf);
|
||||
|
||||
/* Continue */
|
||||
router->master_state = BLRM_REQUEST_BINLOGDUMP;
|
||||
}
|
||||
case BLRM_REQUEST_BINLOGDUMP:
|
||||
/**
|
||||
* This branch is reached after semi-sync check/request or
|
||||
* just after sending COM_REGISTER_SLAVE if request_semi_sync option is false
|
||||
*/
|
||||
|
||||
/* Request now a dump of the binlog file: COM_BINLOG_DUMP */
|
||||
buf = blr_make_binlog_dump(router);
|
||||
router->master_state = BLRM_BINLOGDUMP;
|
||||
router->master->func.write(router->master, buf);
|
||||
|
||||
MXS_NOTICE("%s: Request binlog records from %s at "
|
||||
"position %lu from master server [%s]:%d",
|
||||
router->service->name, router->binlog_name,
|
||||
router->current_pos,
|
||||
router->service->dbref->server->name,
|
||||
router->service->dbref->server->port);
|
||||
|
||||
/* Log binlog router identity */
|
||||
blr_log_identity(router);
|
||||
break;
|
||||
case BLRM_BINLOGDUMP:
|
||||
/**
|
||||
* Main body, we have received a binlog record from the master
|
||||
*/
|
||||
blr_handle_binlog_record(router, buf);
|
||||
|
||||
/**
|
||||
* Set heartbeat check task
|
||||
*/
|
||||
snprintf(task_name, BLRM_TASK_NAME_LEN, "%s heartbeat", router->service->name);
|
||||
hktask_add(task_name, blr_check_last_master_event, router, router->heartbeat);
|
||||
|
||||
break;
|
||||
}
|
||||
// Start the Slave Protocol registration with Master server
|
||||
blr_start_master_registration(router, buf);
|
||||
|
||||
// Check whether re-connect to master is needed
|
||||
if (router->reconnect_pending)
|
||||
{
|
||||
blr_restart_master(router);
|
||||
@ -3086,3 +2846,265 @@ static void blr_register_cache_response(ROUTER_INSTANCE *router,
|
||||
// New value saved to disk
|
||||
blr_cache_response(router, (char *)save_tag, in_buf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Slave Protocol registration to Master:
|
||||
*
|
||||
* Handling the registration process:
|
||||
*
|
||||
* Note: some phases are specific to MySQL 5.6/5.7,
|
||||
* others to MariaDB10.
|
||||
*
|
||||
* @param router Current router instance
|
||||
* @param in_buf GWBUF with previous phase server response
|
||||
*/
|
||||
static void blr_start_master_registration(ROUTER_INSTANCE *router, GWBUF *buf)
|
||||
{
|
||||
char task_name[BLRM_TASK_NAME_LEN + 1] = "";
|
||||
|
||||
switch (router->master_state)
|
||||
{
|
||||
case BLRM_TIMESTAMP:
|
||||
/**
|
||||
* Previous state was BLRM_TIMESTAMP
|
||||
* No need to save the server reply
|
||||
*/
|
||||
gwbuf_free(buf);
|
||||
blr_register_send_command(router,
|
||||
"SHOW VARIABLES LIKE 'SERVER_ID'",
|
||||
BLRM_SERVERID);
|
||||
router->retry_backoff = 1;
|
||||
break;
|
||||
case BLRM_SERVERID:
|
||||
blr_register_heartbeat(router, buf);
|
||||
break;
|
||||
case BLRM_HBPERIOD:
|
||||
blr_register_setchecksum(router, buf);
|
||||
break;
|
||||
case BLRM_CHKSUM1:
|
||||
blr_register_getchecksum(router, buf);
|
||||
break;
|
||||
case BLRM_CHKSUM2:
|
||||
// Set router->master_chksum based on server reply
|
||||
blr_register_handle_checksum(router, buf);
|
||||
// Next state is BLRM_MARIADB10 or BLRM_GTIDMODE
|
||||
{
|
||||
unsigned int state = router->mariadb10_compat ?
|
||||
BLRM_MARIADB10 :
|
||||
BLRM_GTIDMODE;
|
||||
const char *command = router->mariadb10_compat ?
|
||||
"SET @mariadb_slave_capability=4" :
|
||||
"SELECT @@GLOBAL.GTID_MODE";
|
||||
|
||||
blr_register_send_command(router, command, state);
|
||||
}
|
||||
break;
|
||||
case BLRM_MARIADB10:
|
||||
// Save server response
|
||||
blr_register_cache_response(router,
|
||||
&router->saved_master.mariadb10,
|
||||
"mariadb10",
|
||||
buf);
|
||||
// Skip SERVER_UUID fetch and SET slave UUID
|
||||
blr_register_send_command(router,
|
||||
"SET NAMES latin1",
|
||||
BLRM_LATIN1);
|
||||
break;
|
||||
case BLRM_GTIDMODE: // MySQL 5.6/7 only
|
||||
blr_register_serveruuid(router, buf);
|
||||
break;
|
||||
case BLRM_MUUID: // MySQL 5.6/7 only
|
||||
blr_register_slaveuuid(router, buf);
|
||||
break;
|
||||
case BLRM_SUUID: // MySQL 5.6/7 only
|
||||
// Save server response
|
||||
blr_register_cache_response(router,
|
||||
&router->saved_master.setslaveuuid,
|
||||
"ssuuid",
|
||||
buf);
|
||||
blr_register_send_command(router,
|
||||
"SET NAMES latin1",
|
||||
BLRM_LATIN1);
|
||||
break;
|
||||
case BLRM_LATIN1:
|
||||
blr_register_utf8(router, buf);
|
||||
break;
|
||||
case BLRM_UTF8:
|
||||
// Save server response
|
||||
blr_register_cache_response(router,
|
||||
&router->saved_master.utf8,
|
||||
"utf8",
|
||||
buf);
|
||||
// Next state is MAXWELL BLRM_RESULTS_CHARSET or
|
||||
// BLRM_SELECT1
|
||||
{
|
||||
unsigned int state = router->maxwell_compat ?
|
||||
BLRM_RESULTS_CHARSET :
|
||||
BLRM_SELECT1;
|
||||
const char *command = router->maxwell_compat ?
|
||||
"SET character_set_results = NULL" :
|
||||
"SELECT 1";
|
||||
|
||||
blr_register_send_command(router, command, state);
|
||||
break;
|
||||
}
|
||||
case BLRM_RESULTS_CHARSET:
|
||||
gwbuf_free(buf); // Discard server reply, don't save it
|
||||
blr_register_send_command(router,
|
||||
MYSQL_CONNECTOR_SQL_MODE_QUERY,
|
||||
BLRM_SQL_MODE);
|
||||
break;
|
||||
case BLRM_SQL_MODE:
|
||||
gwbuf_free(buf); // Discard server reply, don't save it
|
||||
blr_register_send_command(router,
|
||||
"SELECT 1",
|
||||
BLRM_SELECT1);
|
||||
break;
|
||||
case BLRM_SELECT1:
|
||||
blr_register_selectversion(router, buf);
|
||||
break;
|
||||
case BLRM_SELECTVER:
|
||||
blr_register_selectvercomment(router, buf);
|
||||
break;
|
||||
case BLRM_SELECTVERCOM:
|
||||
blr_register_selecthostname(router, buf);
|
||||
break;
|
||||
case BLRM_SELECTHOSTNAME:
|
||||
blr_register_selectmap(router, buf);
|
||||
break;
|
||||
case BLRM_MAP:
|
||||
// Save server response
|
||||
blr_register_cache_response(router,
|
||||
&router->saved_master.map,
|
||||
"map",
|
||||
buf);
|
||||
if (router->maxwell_compat)
|
||||
{
|
||||
blr_register_send_command(router,
|
||||
MYSQL_CONNECTOR_SERVER_VARS_QUERY,
|
||||
BLRM_SERVER_VARS);
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Continue: ready for the registration, nothing to write/read
|
||||
router->master_state = BLRM_REGISTER_READY;
|
||||
}
|
||||
case BLRM_SERVER_VARS:
|
||||
/**
|
||||
* This branch could be reached as fallthrough from BLRM_MAP
|
||||
* with new state BLRM_REGISTER_READY
|
||||
* Go ahead if maxwell_compat is not set
|
||||
*/
|
||||
if (router->master_state == BLRM_SERVER_VARS && router->maxwell_compat)
|
||||
{
|
||||
blr_register_mxw_binlogvars(router, buf);
|
||||
break;
|
||||
}
|
||||
case BLRM_BINLOG_VARS:
|
||||
/**
|
||||
* This branch could be reached as fallthrough from BLRM_MAP
|
||||
* with new state BLRM_REGISTER_READY.
|
||||
* Go ahead if maxwell_compat is not set
|
||||
*/
|
||||
if (router->master_state == BLRM_BINLOG_VARS && router->maxwell_compat)
|
||||
{
|
||||
blr_register_mxw_tables(router, buf);
|
||||
break;
|
||||
}
|
||||
case BLRM_LOWER_CASE_TABLES:
|
||||
/**
|
||||
* This branch could be reached as fallthrough from BLRM_MAP
|
||||
* with new state BLRM_REGISTER_READY.
|
||||
* Go ahead if maxwell_compat is not set
|
||||
*/
|
||||
if (router->master_state == BLRM_LOWER_CASE_TABLES &&
|
||||
router->maxwell_compat)
|
||||
{
|
||||
blr_register_mxw_handlelowercase(router, buf);
|
||||
// Continue: ready for the registration, nothing to write/read
|
||||
}
|
||||
case BLRM_REGISTER_READY:
|
||||
// Prepare Slave registration request: COM_REGISTER_SLAVE
|
||||
buf = blr_make_registration(router);
|
||||
// Set new state
|
||||
router->master_state = BLRM_REGISTER;
|
||||
// Send the packet
|
||||
router->master->func.write(router->master, buf);
|
||||
break;
|
||||
case BLRM_REGISTER:
|
||||
/* discard master reply to COM_REGISTER_SLAVE */
|
||||
gwbuf_free(buf);
|
||||
|
||||
/* if semisync option is set, check for master semi-sync availability */
|
||||
if (router->request_semi_sync)
|
||||
{
|
||||
blr_register_getsemisync(router, buf);
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Continue */
|
||||
router->master_state = BLRM_REQUEST_BINLOGDUMP;
|
||||
}
|
||||
case BLRM_CHECK_SEMISYNC:
|
||||
/**
|
||||
* This branch could be reached as fallthrough from BLRM_REGISTER
|
||||
* if request_semi_sync option is false
|
||||
*/
|
||||
if (router->master_state == BLRM_CHECK_SEMISYNC)
|
||||
{
|
||||
if (blr_register_setsemisync(router, buf))
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
case BLRM_REQUEST_SEMISYNC:
|
||||
/**
|
||||
* This branch could be reached as fallthrough from BLRM_REGISTER or BLRM_CHECK_SEMISYNC
|
||||
* if request_semi_sync option is false or master doesn't support semisync or it's not enabled
|
||||
*/
|
||||
if (router->master_state == BLRM_REQUEST_SEMISYNC)
|
||||
{
|
||||
/* discard master reply */
|
||||
gwbuf_free(buf);
|
||||
|
||||
/* Continue */
|
||||
router->master_state = BLRM_REQUEST_BINLOGDUMP;
|
||||
}
|
||||
case BLRM_REQUEST_BINLOGDUMP:
|
||||
/**
|
||||
* This branch is reached after semi-sync check/request or
|
||||
* just after sending COM_REGISTER_SLAVE if request_semi_sync option is false
|
||||
*/
|
||||
|
||||
/* Request now a dump of the binlog file: COM_BINLOG_DUMP */
|
||||
buf = blr_make_binlog_dump(router);
|
||||
router->master_state = BLRM_BINLOGDUMP;
|
||||
router->master->func.write(router->master, buf);
|
||||
|
||||
MXS_NOTICE("%s: Request binlog records from %s at "
|
||||
"position %lu from master server [%s]:%d",
|
||||
router->service->name, router->binlog_name,
|
||||
router->current_pos,
|
||||
router->service->dbref->server->name,
|
||||
router->service->dbref->server->port);
|
||||
|
||||
/* Log binlog router identity */
|
||||
blr_log_identity(router);
|
||||
break;
|
||||
case BLRM_BINLOGDUMP:
|
||||
/**
|
||||
* Main body, we have received a binlog record from the master
|
||||
*/
|
||||
blr_handle_binlog_record(router, buf);
|
||||
|
||||
/**
|
||||
* Set heartbeat check task
|
||||
*/
|
||||
snprintf(task_name, BLRM_TASK_NAME_LEN, "%s heartbeat", router->service->name);
|
||||
hktask_add(task_name, blr_check_last_master_event, router, router->heartbeat);
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user