MXS-1209: Cleaning up Master registration phase
Master registration phase is using new routines
This commit is contained in:
@ -96,7 +96,9 @@ static void blr_log_packet(int priority, char *msg, uint8_t *ptr, int len);
|
|||||||
void blr_master_close(ROUTER_INSTANCE *);
|
void blr_master_close(ROUTER_INSTANCE *);
|
||||||
char *blr_extract_column(GWBUF *buf, int col);
|
char *blr_extract_column(GWBUF *buf, int col);
|
||||||
void poll_fake_write_event(DCB *dcb);
|
void poll_fake_write_event(DCB *dcb);
|
||||||
GWBUF *blr_read_events_from_pos(ROUTER_INSTANCE *router, unsigned long long pos, REP_HEADER *hdr,
|
GWBUF *blr_read_events_from_pos(ROUTER_INSTANCE *router,
|
||||||
|
unsigned long long pos,
|
||||||
|
REP_HEADER *hdr,
|
||||||
unsigned long long pos_end);
|
unsigned long long pos_end);
|
||||||
static void blr_check_last_master_event(void *inst);
|
static void blr_check_last_master_event(void *inst);
|
||||||
extern int blr_check_heartbeat(ROUTER_INSTANCE *router);
|
extern int blr_check_heartbeat(ROUTER_INSTANCE *router);
|
||||||
@ -105,10 +107,19 @@ static void blr_extract_header_semisync(uint8_t *pkt, REP_HEADER *hdr);
|
|||||||
static int blr_send_semisync_ack (ROUTER_INSTANCE *router, uint64_t pos);
|
static int blr_send_semisync_ack (ROUTER_INSTANCE *router, uint64_t pos);
|
||||||
static int blr_get_master_semisync(GWBUF *buf);
|
static int blr_get_master_semisync(GWBUF *buf);
|
||||||
|
|
||||||
static void blr_terminate_master_replication(ROUTER_INSTANCE *router, uint8_t* ptr, int len);
|
static void blr_terminate_master_replication(ROUTER_INSTANCE *router,
|
||||||
|
uint8_t* ptr,
|
||||||
|
int len);
|
||||||
void blr_notify_all_slaves(ROUTER_INSTANCE *router);
|
void blr_notify_all_slaves(ROUTER_INSTANCE *router);
|
||||||
extern bool blr_notify_waiting_slave(ROUTER_SLAVE *slave);
|
extern bool blr_notify_waiting_slave(ROUTER_SLAVE *slave);
|
||||||
extern bool blr_save_mariadb_gtid(ROUTER_INSTANCE *inst);
|
extern bool blr_save_mariadb_gtid(ROUTER_INSTANCE *inst);
|
||||||
|
static void blr_register_serverid(ROUTER_INSTANCE *router, GWBUF *buf);
|
||||||
|
static void blr_register_heartbeat(ROUTER_INSTANCE *router, GWBUF *buf);
|
||||||
|
static void blr_register_setchecksum(ROUTER_INSTANCE *router, GWBUF *buf);
|
||||||
|
static void blr_register_getchecksum(ROUTER_INSTANCE *router, GWBUF *buf);
|
||||||
|
static void blr_register_handlechecksum(ROUTER_INSTANCE *router, GWBUF *buf);
|
||||||
|
static void blr_register_mariadb10(ROUTER_INSTANCE *router, GWBUF *buf);
|
||||||
|
static void blr_register_mysqlgtid(ROUTER_INSTANCE *router, GWBUF *buf);
|
||||||
|
|
||||||
static int keepalive = 1;
|
static int keepalive = 1;
|
||||||
|
|
||||||
@ -207,7 +218,9 @@ blr_start_master(void* data)
|
|||||||
}
|
}
|
||||||
|
|
||||||
router->master_state = BLRM_AUTHENTICATED;
|
router->master_state = BLRM_AUTHENTICATED;
|
||||||
router->master->func.write(router->master, blr_make_query(router->master, "SELECT UNIX_TIMESTAMP()"));
|
router->master->func.write(router->master,
|
||||||
|
blr_make_query(router->master,
|
||||||
|
"SELECT UNIX_TIMESTAMP()"));
|
||||||
router->master_state = BLRM_TIMESTAMP;
|
router->master_state = BLRM_TIMESTAMP;
|
||||||
|
|
||||||
router->stats.n_masterstarts++;
|
router->stats.n_masterstarts++;
|
||||||
@ -444,98 +457,31 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf)
|
|||||||
switch (router->master_state)
|
switch (router->master_state)
|
||||||
{
|
{
|
||||||
case BLRM_TIMESTAMP:
|
case BLRM_TIMESTAMP:
|
||||||
// Response to a timestamp message, no need to save this.
|
blr_register_serverid(router, buf);
|
||||||
gwbuf_free(buf);
|
|
||||||
buf = blr_make_query(router->master, "SHOW VARIABLES LIKE 'SERVER_ID'");
|
|
||||||
router->master_state = BLRM_SERVERID;
|
|
||||||
router->master->func.write(router->master, buf);
|
|
||||||
router->retry_backoff = 1;
|
|
||||||
break;
|
break;
|
||||||
case BLRM_SERVERID:
|
case BLRM_SERVERID:
|
||||||
{
|
blr_register_heartbeat(router, buf);
|
||||||
char *val = blr_extract_column(buf, 2);
|
|
||||||
|
|
||||||
// Response to fetch of master's server-id
|
|
||||||
if (router->saved_master.server_id)
|
|
||||||
{
|
|
||||||
GWBUF_CONSUME_ALL(router->saved_master.server_id);
|
|
||||||
}
|
|
||||||
router->saved_master.server_id = buf;
|
|
||||||
blr_cache_response(router, "serverid", buf);
|
|
||||||
|
|
||||||
// set router->masterid from master server-id if it's not set by the config option
|
|
||||||
if (router->masterid == 0)
|
|
||||||
{
|
|
||||||
router->masterid = atoi(val);
|
|
||||||
}
|
|
||||||
|
|
||||||
{
|
|
||||||
char str[BLRM_SET_HEARTBEAT_QUERY_LEN];
|
|
||||||
sprintf(str, "SET @master_heartbeat_period = %lu000000000", router->heartbeat);
|
|
||||||
buf = blr_make_query(router->master, str);
|
|
||||||
}
|
|
||||||
router->master_state = BLRM_HBPERIOD;
|
|
||||||
router->master->func.write(router->master, buf);
|
|
||||||
MXS_FREE(val);
|
|
||||||
break;
|
break;
|
||||||
}
|
|
||||||
case BLRM_HBPERIOD:
|
case BLRM_HBPERIOD:
|
||||||
// Response to set the heartbeat period
|
blr_register_setchecksum(router, buf);
|
||||||
if (router->saved_master.heartbeat)
|
|
||||||
{
|
|
||||||
GWBUF_CONSUME_ALL(router->saved_master.heartbeat);
|
|
||||||
}
|
|
||||||
router->saved_master.heartbeat = buf;
|
|
||||||
blr_cache_response(router, "heartbeat", buf);
|
|
||||||
buf = blr_make_query(router->master, "SET @master_binlog_checksum = @@global.binlog_checksum");
|
|
||||||
router->master_state = BLRM_CHKSUM1;
|
|
||||||
router->master->func.write(router->master, buf);
|
|
||||||
break;
|
break;
|
||||||
case BLRM_CHKSUM1:
|
case BLRM_CHKSUM1:
|
||||||
// Response to set the master binlog checksum
|
blr_register_getchecksum(router, buf);
|
||||||
if (router->saved_master.chksum1)
|
|
||||||
{
|
|
||||||
GWBUF_CONSUME_ALL(router->saved_master.chksum1);
|
|
||||||
}
|
|
||||||
router->saved_master.chksum1 = buf;
|
|
||||||
blr_cache_response(router, "chksum1", buf);
|
|
||||||
buf = blr_make_query(router->master, "SELECT @master_binlog_checksum");
|
|
||||||
router->master_state = BLRM_CHKSUM2;
|
|
||||||
router->master->func.write(router->master, buf);
|
|
||||||
break;
|
break;
|
||||||
case BLRM_CHKSUM2:
|
case BLRM_CHKSUM2:
|
||||||
{
|
// Set router->master_chksum based on server reply
|
||||||
char *val = blr_extract_column(buf, 1);
|
blr_register_handlechecksum(router, buf);
|
||||||
|
|
||||||
if (val && strncasecmp(val, "NONE", 4) == 0)
|
|
||||||
{
|
|
||||||
router->master_chksum = false;
|
|
||||||
}
|
|
||||||
if (val)
|
|
||||||
{
|
|
||||||
MXS_FREE(val);
|
|
||||||
}
|
|
||||||
// Response to the master_binlog_checksum, should be stored
|
|
||||||
if (router->saved_master.chksum2)
|
|
||||||
{
|
|
||||||
GWBUF_CONSUME_ALL(router->saved_master.chksum2);
|
|
||||||
}
|
|
||||||
router->saved_master.chksum2 = buf;
|
|
||||||
blr_cache_response(router, "chksum2", buf);
|
|
||||||
|
|
||||||
|
// Next state is BLRM_MARIADB10 or BLRM_GTIDMODE
|
||||||
if (router->mariadb10_compat)
|
if (router->mariadb10_compat)
|
||||||
{
|
{
|
||||||
buf = blr_make_query(router->master, "SET @mariadb_slave_capability=4");
|
blr_register_mariadb10(router, buf);
|
||||||
router->master_state = BLRM_MARIADB10;
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
buf = blr_make_query(router->master, "SELECT @@GLOBAL.GTID_MODE");
|
blr_register_mysqlgtid(router, buf);
|
||||||
router->master_state = BLRM_GTIDMODE;
|
|
||||||
}
|
}
|
||||||
router->master->func.write(router->master, buf);
|
|
||||||
break;
|
break;
|
||||||
}
|
|
||||||
case BLRM_MARIADB10:
|
case BLRM_MARIADB10:
|
||||||
// Response to the SET @mariadb_slave_capability=4, should be stored
|
// Response to the SET @mariadb_slave_capability=4, should be stored
|
||||||
if (router->saved_master.mariadb10)
|
if (router->saved_master.mariadb10)
|
||||||
@ -2724,3 +2670,201 @@ void blr_notify_all_slaves(ROUTER_INSTANCE *router)
|
|||||||
MXS_DEBUG("Notified %d slaves about new data.", notified);
|
MXS_DEBUG("Notified %d slaves about new data.", notified);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Slave Protocol registration to Master:
|
||||||
|
*
|
||||||
|
* Handles previous reply from Master and sends
|
||||||
|
* SHOW VARIABLES LIKE 'SERVER_ID to master
|
||||||
|
*
|
||||||
|
* @param router Current router instance
|
||||||
|
* @param buf GWBUF with server reply to previous
|
||||||
|
* registration command
|
||||||
|
*/
|
||||||
|
static void blr_register_serverid(ROUTER_INSTANCE *router, GWBUF *buf)
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* Previous state was BLRM_TIMESTAMP
|
||||||
|
* No need to save the server reply
|
||||||
|
*/
|
||||||
|
gwbuf_free(buf);
|
||||||
|
|
||||||
|
// Prepare the new message
|
||||||
|
buf = blr_make_query(router->master, "SHOW VARIABLES LIKE 'SERVER_ID'");
|
||||||
|
|
||||||
|
// Set the new state
|
||||||
|
router->master_state = BLRM_SERVERID;
|
||||||
|
router->master->func.write(router->master, buf);
|
||||||
|
router->retry_backoff = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Slave Protocol registration to Master:
|
||||||
|
*
|
||||||
|
* Handles previous reply from Master and sends
|
||||||
|
* SET @master_heartbeat_period to master
|
||||||
|
*
|
||||||
|
* @param router Current router instance
|
||||||
|
* @param buf GWBUF with server reply to previous
|
||||||
|
* registration command
|
||||||
|
*/
|
||||||
|
static void blr_register_heartbeat(ROUTER_INSTANCE *router, GWBUF *buf)
|
||||||
|
{
|
||||||
|
char str[BLRM_SET_HEARTBEAT_QUERY_LEN];
|
||||||
|
char *val = blr_extract_column(buf, 2);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Previous state was BLRM_SERVERID:
|
||||||
|
* Save the response to fetch of master's server-id
|
||||||
|
*/
|
||||||
|
if (router->saved_master.server_id)
|
||||||
|
{
|
||||||
|
GWBUF_CONSUME_ALL(router->saved_master.server_id);
|
||||||
|
}
|
||||||
|
router->saved_master.server_id = buf;
|
||||||
|
blr_cache_response(router, "serverid", buf);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set router->masterid from master server-id
|
||||||
|
* if it's not set by the config option
|
||||||
|
*/
|
||||||
|
if (router->masterid == 0)
|
||||||
|
{
|
||||||
|
router->masterid = atoi(val);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Prepare new registration message
|
||||||
|
sprintf(str,
|
||||||
|
"SET @master_heartbeat_period = %lu000000000",
|
||||||
|
router->heartbeat);
|
||||||
|
|
||||||
|
buf = blr_make_query(router->master, str);
|
||||||
|
|
||||||
|
// Set the new state
|
||||||
|
router->master_state = BLRM_HBPERIOD;
|
||||||
|
router->master->func.write(router->master, buf);
|
||||||
|
MXS_FREE(val);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Slave Protocol registration to Master:
|
||||||
|
*
|
||||||
|
* Handles previous reply from Master and sends
|
||||||
|
* SET @master_binlog_checksum to master
|
||||||
|
*
|
||||||
|
* @param router Current router instance
|
||||||
|
* @param buf GWBUF with server reply to previous
|
||||||
|
* registration command
|
||||||
|
*/
|
||||||
|
static void blr_register_setchecksum(ROUTER_INSTANCE *router, GWBUF *buf)
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* Previous state was BLRM_HBPERIOD:
|
||||||
|
* Save the response to SET @master_heartbeat_period
|
||||||
|
*/
|
||||||
|
if (router->saved_master.heartbeat)
|
||||||
|
{
|
||||||
|
GWBUF_CONSUME_ALL(router->saved_master.heartbeat);
|
||||||
|
}
|
||||||
|
router->saved_master.heartbeat = buf;
|
||||||
|
blr_cache_response(router, "heartbeat", buf);
|
||||||
|
|
||||||
|
// New registration message
|
||||||
|
buf = blr_make_query(router->master,
|
||||||
|
"SET @master_binlog_checksum = @@global.binlog_checksum");
|
||||||
|
|
||||||
|
// Set the new state
|
||||||
|
router->master_state = BLRM_CHKSUM1;
|
||||||
|
router->master->func.write(router->master, buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Slave Protocol registration to Master:
|
||||||
|
*
|
||||||
|
* Handles previous reply from Master and sends
|
||||||
|
* SELECT @master_binlog_checksum to master
|
||||||
|
*
|
||||||
|
* @param router Current router instance
|
||||||
|
* @param buf GWBUF with server reply to previous
|
||||||
|
* registration command
|
||||||
|
*/
|
||||||
|
static void blr_register_getchecksum(ROUTER_INSTANCE *router, GWBUF *buf)
|
||||||
|
{
|
||||||
|
/* Previous state was BLRM_CHKSUM1: save the response */
|
||||||
|
if (router->saved_master.chksum1)
|
||||||
|
{
|
||||||
|
GWBUF_CONSUME_ALL(router->saved_master.chksum1);
|
||||||
|
}
|
||||||
|
router->saved_master.chksum1 = buf;
|
||||||
|
blr_cache_response(router, "chksum1", buf);
|
||||||
|
|
||||||
|
// New registration message
|
||||||
|
buf = blr_make_query(router->master, "SELECT @master_binlog_checksum");
|
||||||
|
|
||||||
|
// Set the new state
|
||||||
|
router->master_state = BLRM_CHKSUM2;
|
||||||
|
router->master->func.write(router->master, buf);
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Slave Protocol registration to Master:
|
||||||
|
*
|
||||||
|
* Handles the reply from Master which
|
||||||
|
* contains the Binlog checksum algorithm in use:
|
||||||
|
* NONE or CRC32.
|
||||||
|
*
|
||||||
|
* @param router Current router instance
|
||||||
|
* @param buf GWBUF with server reply to previous
|
||||||
|
* registration command
|
||||||
|
*/
|
||||||
|
static void blr_register_handlechecksum(ROUTER_INSTANCE *router, GWBUF *buf)
|
||||||
|
{
|
||||||
|
char *val = blr_extract_column(buf, 1);
|
||||||
|
|
||||||
|
if (val && strncasecmp(val, "NONE", 4) == 0)
|
||||||
|
{
|
||||||
|
router->master_chksum = false;
|
||||||
|
}
|
||||||
|
if (val)
|
||||||
|
{
|
||||||
|
MXS_FREE(val);
|
||||||
|
}
|
||||||
|
// Previous state was BLRM_CHKSUM2: save the response */
|
||||||
|
if (router->saved_master.chksum2)
|
||||||
|
{
|
||||||
|
GWBUF_CONSUME_ALL(router->saved_master.chksum2);
|
||||||
|
}
|
||||||
|
router->saved_master.chksum2 = buf;
|
||||||
|
blr_cache_response(router, "chksum2", buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Slave Protocol registration to Master:
|
||||||
|
*
|
||||||
|
* Send SET @mariadb_slave_capability=4 to MariaDB10 Master
|
||||||
|
*
|
||||||
|
* @param router Current router instance
|
||||||
|
* @param buf GWBUF with server reply to previous
|
||||||
|
* registration command
|
||||||
|
*/
|
||||||
|
static void blr_register_mariadb10(ROUTER_INSTANCE *router, GWBUF *buf)
|
||||||
|
{
|
||||||
|
buf = blr_make_query(router->master, "SET @mariadb_slave_capability=4");
|
||||||
|
router->master_state = BLRM_MARIADB10;
|
||||||
|
router->master->func.write(router->master, buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Slave Protocol registration to Master:
|
||||||
|
*
|
||||||
|
* Send SELECT @@GLOBAL.GTID_MODE to MySQL 5.6/5.7 Master
|
||||||
|
*
|
||||||
|
* @param router Current router instance
|
||||||
|
* @param buf GWBUF with server reply to previous
|
||||||
|
* registration command
|
||||||
|
*/
|
||||||
|
static void blr_register_mysqlgtid(ROUTER_INSTANCE *router, GWBUF *buf)
|
||||||
|
{
|
||||||
|
buf = blr_make_query(router->master, "SELECT @@GLOBAL.GTID_MODE");
|
||||||
|
router->master_state = BLRM_GTIDMODE;
|
||||||
|
router->master->func.write(router->master, buf);
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user