MXS-1209: blr_master.c cleanup

Further optimisations will come with new registration phases
This commit is contained in:
MassimilianoPinto
2017-04-28 18:08:33 +02:00
parent 23c166ba37
commit 5cef78b7d4

View File

@ -118,7 +118,7 @@ static void blr_register_serverid(ROUTER_INSTANCE *router, GWBUF *buf);
static void blr_register_heartbeat(ROUTER_INSTANCE *router, GWBUF *buf);
static void blr_register_setchecksum(ROUTER_INSTANCE *router, GWBUF *buf);
static void blr_register_getchecksum(ROUTER_INSTANCE *router, GWBUF *buf);
static void blr_register_handlechecksum(ROUTER_INSTANCE *router, GWBUF *buf);
static void blr_register_handle_checksum(ROUTER_INSTANCE *router, GWBUF *buf);
static void blr_register_mariadb10(ROUTER_INSTANCE *router, GWBUF *buf);
static void blr_register_mysqlgtid(ROUTER_INSTANCE *router, GWBUF *buf);
static void blr_register_serveruuid(ROUTER_INSTANCE *router, GWBUF *buf);
@ -130,8 +130,20 @@ static void blr_register_selecthostname(ROUTER_INSTANCE *router, GWBUF *buf);
static void blr_register_selectmap(ROUTER_INSTANCE *router, GWBUF *buf);
static void blr_register_mxw_binlogvars(ROUTER_INSTANCE *router, GWBUF *buf);
static void blr_register_mxw_tables(ROUTER_INSTANCE *router, GWBUF *buf);
static void blr_register_checksemisync(ROUTER_INSTANCE *router, GWBUF *buf);
static bool blr_register_requestsemisync(ROUTER_INSTANCE *router, GWBUF *buf);
static void blr_register_getsemisync(ROUTER_INSTANCE *router, GWBUF *buf);
static bool blr_register_setsemisync(ROUTER_INSTANCE *router, GWBUF *buf);
static void blr_register_mxw_handlelowercase(ROUTER_INSTANCE *router,
GWBUF *buf);
static void blr_register_mxw_servervars(ROUTER_INSTANCE *router, GWBUF *buf);
static void blr_register_set_latin1(ROUTER_INSTANCE *router, GWBUF *buf);
static void blr_register_cache_map(ROUTER_INSTANCE *router, GWBUF *buf);
static void blr_register_cache_utf8(ROUTER_INSTANCE *router, GWBUF *buf);
static void blr_register_cache_utf8(ROUTER_INSTANCE *router, GWBUF *buf);
static void blr_register_cache_slaveuuid(ROUTER_INSTANCE *router, GWBUF *buf);
static void blr_register_cache_mariadb10(ROUTER_INSTANCE *router, GWBUF *buf);
static void blr_register_send_select1(ROUTER_INSTANCE *router, GWBUF *buf);
static void blr_register_mxw_sqlmode(ROUTER_INSTANCE *router, GWBUF *buf);
static void blr_register_mxw_charset(ROUTER_INSTANCE *router, GWBUF *buf);
static void worker_cb_start_master(int worker_id, void* data);
static void blr_start_master_in_main(void* data);
@ -546,8 +558,7 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf)
break;
case BLRM_CHKSUM2:
// Set router->master_chksum based on server reply
blr_register_handlechecksum(router, buf);
blr_register_getchecksum(router, buf);
// Next state is BLRM_MARIADB10 or BLRM_GTIDMODE
if (router->mariadb10_compat)
{
@ -559,75 +570,41 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf)
}
break;
case BLRM_MARIADB10:
// Response to the SET @mariadb_slave_capability=4, should be stored
if (router->saved_master.mariadb10)
{
GWBUF_CONSUME_ALL(router->saved_master.mariadb10);
}
router->saved_master.mariadb10 = buf;
blr_cache_response(router, "mariadb10", buf);
// Skip SERVER_UUID fetch and SET slave UUID (MySQL 5.6/7 only)
buf = blr_make_query(router->master, "SET NAMES latin1");
router->master_state = BLRM_LATIN1;
router->master->func.write(router->master, buf);
blr_register_cache_mariadb10(router, buf);
// Skip SERVER_UUID fetch and SET slave UUID
blr_register_set_latin1(router, buf);
break;
case BLRM_GTIDMODE:
case BLRM_GTIDMODE: // MySQL 5.6/7 only
blr_register_serveruuid(router, buf);
break;
case BLRM_MUUID:
case BLRM_MUUID: // MySQL 5.6/7 only
blr_register_slaveuuid(router, buf);
break;
case BLRM_SUUID:
// Response to the SET @server_uuid, should be stored
if (router->saved_master.setslaveuuid)
{
GWBUF_CONSUME_ALL(router->saved_master.setslaveuuid);
}
router->saved_master.setslaveuuid = buf;
blr_cache_response(router, "ssuuid", buf);
buf = blr_make_query(router->master, "SET NAMES latin1");
router->master_state = BLRM_LATIN1;
router->master->func.write(router->master, buf);
case BLRM_SUUID: // MySQL 5.6/7 only
blr_register_cache_slaveuuid(router, buf);
blr_register_set_latin1(router, buf);
break;
case BLRM_LATIN1:
blr_register_utf8(router, buf);
break;
case BLRM_UTF8:
// Response to the SET NAMES utf8, should be stored
if (router->saved_master.utf8)
{
GWBUF_CONSUME_ALL(router->saved_master.utf8);
}
router->saved_master.utf8 = buf;
blr_cache_response(router, "utf8", buf);
blr_register_cache_utf8(router, buf);
if (router->maxwell_compat)
{
buf = blr_make_query(router->master, "SET character_set_results = NULL");
router->master_state = BLRM_RESULTS_CHARSET;
router->master->func.write(router->master, buf);
blr_register_mxw_charset(router, buf);
}
else
{
buf = blr_make_query(router->master, "SELECT 1");
router->master_state = BLRM_SELECT1;
router->master->func.write(router->master, buf);
blr_register_send_select1(router, buf);
}
break;
case BLRM_RESULTS_CHARSET:
gwbuf_free(buf);
buf = blr_make_query(router->master, MYSQL_CONNECTOR_SQL_MODE_QUERY);
router->master_state = BLRM_SQL_MODE;
router->master->func.write(router->master, buf);
gwbuf_free(buf); // Discard server reply, don't save it
blr_register_mxw_sqlmode(router, buf);
break;
case BLRM_SQL_MODE:
gwbuf_free(buf);
buf = blr_make_query(router->master, "SELECT 1");
router->master_state = BLRM_SELECT1;
router->master->func.write(router->master, buf);
gwbuf_free(buf); // Discard server reply, don't save it
blr_register_send_select1(router, buf);
break;
case BLRM_SELECT1:
blr_register_selectversion(router, buf);
@ -642,20 +619,10 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf)
blr_register_selectmap(router, buf);
break;
case BLRM_MAP:
// Response to SELECT @@max_allowed_packet should be stored
if (router->saved_master.map)
{
GWBUF_CONSUME_ALL(router->saved_master.map);
}
router->saved_master.map = buf;
blr_cache_response(router, "map", buf);
blr_register_cache_map(router, buf);
if (router->maxwell_compat)
{
// Query for Server Variables
buf = blr_make_query(router->master, MYSQL_CONNECTOR_SERVER_VARS_QUERY);
router->master_state = BLRM_SERVER_VARS;
router->master->func.write(router->master, buf);
blr_register_mxw_servervars(router, buf);
break;
}
else
@ -669,7 +636,7 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf)
* with new state BLRM_REGISTER_READY
* Go ahead if maxwell_compat is not set
*/
if (router->maxwell_compat)
if (router->master_state == BLRM_SERVER_VARS && router->maxwell_compat)
{
blr_register_mxw_binlogvars(router, buf);
break;
@ -680,7 +647,7 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf)
* with new state BLRM_REGISTER_READY.
* Go ahead if maxwell_compat is not set
*/
if (router->maxwell_compat)
if (router->master_state == BLRM_BINLOG_VARS && router->maxwell_compat)
{
blr_register_mxw_tables(router, buf);
break;
@ -691,21 +658,18 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf)
* with new state BLRM_REGISTER_READY.
* Go ahead if maxwell_compat is not set
*/
if (router->maxwell_compat)
if (router->master_state == BLRM_LOWER_CASE_TABLES &&
router->maxwell_compat)
{
if (router->saved_master.lower_case_tables)
{
GWBUF_CONSUME_ALL(router->saved_master.lower_case_tables);
}
router->saved_master.lower_case_tables = buf;
blr_cache_response(router, "lower_case_tables", buf);
router->master_state = BLRM_REGISTER_READY;
blr_register_mxw_handlelowercase(router, buf);
// Continue: ready for the registration, nothing to write/read
}
case BLRM_REGISTER_READY:
// Prepare registration
// Prepare Slave registration request: COM_REGISTER_SLAVE
buf = blr_make_registration(router);
// Set new state
router->master_state = BLRM_REGISTER;
// Send the packet
router->master->func.write(router->master, buf);
break;
case BLRM_REGISTER:
@ -715,7 +679,7 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf)
/* if semisync option is set, check for master semi-sync availability */
if (router->request_semi_sync)
{
blr_register_checksemisync(router, buf);
blr_register_getsemisync(router, buf);
break;
}
else
@ -730,7 +694,7 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf)
*/
if (router->master_state == BLRM_CHECK_SEMISYNC)
{
if (blr_register_requestsemisync(router, buf))
if (blr_register_setsemisync(router, buf))
{
break;
}
@ -748,19 +712,17 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf)
/* Continue */
router->master_state = BLRM_REQUEST_BINLOGDUMP;
}
case BLRM_REQUEST_BINLOGDUMP:
/**
* This branch is reached after semi-sync check/request or
* just after sending COM_REGISTER_SLAVE if request_semi_sync option is false
*/
/* Request now a dump of the binlog file */
/* Request now a dump of the binlog file: COM_BINLOG_DUMP */
buf = blr_make_binlog_dump(router);
router->master_state = BLRM_BINLOGDUMP;
router->master->func.write(router->master, buf);
MXS_NOTICE("%s: Request binlog records from %s at "
"position %lu from master server [%s]:%d",
router->service->name, router->binlog_name,
@ -770,9 +732,7 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf)
/* Log binlog router identity */
blr_log_identity(router);
break;
case BLRM_BINLOGDUMP:
/**
* Main body, we have received a binlog record from the master
@ -3035,6 +2995,7 @@ static void blr_register_mxw_binlogvars(ROUTER_INSTANCE *router, GWBUF *buf)
*/
static void blr_register_mxw_tables(ROUTER_INSTANCE *router, GWBUF *buf)
{
// Response from master should be stored
if (router->saved_master.binlog_vars)
{
GWBUF_CONSUME_ALL(router->saved_master.binlog_vars);
@ -3049,19 +3010,43 @@ static void blr_register_mxw_tables(ROUTER_INSTANCE *router, GWBUF *buf)
router->master->func.write(router->master, buf);
}
static void blr_register_checksemisync(ROUTER_INSTANCE *router, GWBUF *buf)
/**
* Slave protocol registration: check Master SEMI-SYNC replication
*
* Ask master server for Semi-Sync replication capability
*
* Note: Master server must have rpl_semi_sync_master plugin installed
* in order to start the Semi-Sync replication
*
* @param router Current router instance
* @param buf The GWBUF to fill with new request
*/
static void blr_register_getsemisync(ROUTER_INSTANCE *router, GWBUF *buf)
{
MXS_NOTICE("%s: checking Semi-Sync replication capability for master server [%s]:%d",
router->service->name,
router->service->dbref->server->name,
router->service->dbref->server->port);
// New registration message
buf = blr_make_query(router->master, "SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled'");
// Set the new state
router->master_state = BLRM_CHECK_SEMISYNC;
router->master->func.write(router->master, buf);
}
static bool blr_register_requestsemisync(ROUTER_INSTANCE *router, GWBUF *buf)
/**
* Slave protocol registration: handle SEMI-SYNC replication
*
* Get master semisync capability
* and if installed start the SEMI-SYNC replication
*
* @param router Current router instance
* @param buf GWBUF with server reply to previous
* registration command
* @return True is semi-sync can be started or false
*/
static bool blr_register_setsemisync(ROUTER_INSTANCE *router, GWBUF *buf)
{
if (router->master_state == BLRM_CHECK_SEMISYNC)
{
@ -3089,8 +3074,8 @@ static bool blr_register_requestsemisync(ROUTER_INSTANCE *router, GWBUF *buf)
if (router->master_semi_sync == MASTER_SEMISYNC_DISABLED)
{
/* Installed but not enabled, right now */
MXS_NOTICE("%s: master server [%s]:%d doesn't have semi_sync enabled right now, "
"Requesting Semi-Sync Replication",
MXS_NOTICE("%s: master server [%s]:%d doesn't have semi_sync"
" enabled right now, Request Semi-Sync Replication anyway",
router->service->name,
router->service->dbref->server->name,
router->service->dbref->server->port);
@ -3098,7 +3083,8 @@ static bool blr_register_requestsemisync(ROUTER_INSTANCE *router, GWBUF *buf)
else
{
/* Installed and enabled */
MXS_NOTICE("%s: master server [%s]:%d has semi_sync enabled, Requesting Semi-Sync Replication",
MXS_NOTICE("%s: master server [%s]:%d has semi_sync enabled,"
" Requesting Semi-Sync Replication",
router->service->name,
router->service->dbref->server->name,
router->service->dbref->server->port);
@ -3115,3 +3101,189 @@ static bool blr_register_requestsemisync(ROUTER_INSTANCE *router, GWBUF *buf)
return false;
}
/**
* Slave Protocol registration to Master (MaxWell compatibility):
*
* Handles previous reply from Master and
* sets the state to BLRM_REGISTER_READY
*
* @param router Current router instance
* @param buf GWBUF with server reply to previous
* registration command
*/
static void blr_register_mxw_handlelowercase(ROUTER_INSTANCE *router,
GWBUF *buf)
{
// Response from master should be stored
if (router->saved_master.lower_case_tables)
{
GWBUF_CONSUME_ALL(router->saved_master.lower_case_tables);
}
router->saved_master.lower_case_tables = buf;
blr_cache_response(router, "lower_case_tables", buf);
// Set the new state
router->master_state = BLRM_REGISTER_READY;
}
/**
* Slave Protocol registration to Master:
*
* Saves previous reply from Master (Max Allowed Packet)
*
* @param router Current router instance
* @param buf GWBUF with server reply to previous
* registration command
*/
static void blr_register_cache_map(ROUTER_INSTANCE *router,
GWBUF *buf)
{
// Response to SELECT @@max_allowed_packet should be stored
if (router->saved_master.map)
{
GWBUF_CONSUME_ALL(router->saved_master.map);
}
router->saved_master.map = buf;
blr_cache_response(router, "map", buf);
}
/**
* Slave Protocol registration to Master (MaxWell compatibility):
*
* Sends MYSQL_CONNECTOR_SERVER_VARS_QUERY to Master
*
* @param router Current router instance
* @param buf GWBUF to fill with new request
*/
static void blr_register_mxw_servervars(ROUTER_INSTANCE *router, GWBUF *buf)
{
// New registration message
buf = blr_make_query(router->master, MYSQL_CONNECTOR_SERVER_VARS_QUERY);
// Set the new state
router->master_state = BLRM_SERVER_VARS;
router->master->func.write(router->master, buf);
}
/**
* Slave Protocol registration to Master:
*
* Saves previous reply from Master (UTF8)
*
* @param router Current router instance
* @param buf GWBUF with server reply to previous
* registration command
*/
static void blr_register_cache_utf8(ROUTER_INSTANCE *router, GWBUF *buf)
{
// Response to the SET NAMES utf8, should be stored
if (router->saved_master.utf8)
{
GWBUF_CONSUME_ALL(router->saved_master.utf8);
}
router->saved_master.utf8 = buf;
blr_cache_response(router, "utf8", buf);
}
/**
* Slave Protocol registration to Master:
*
* Sends SET NAMES latin1 to Master
*
* @param router Current router instance
* @param buf GWBUF to fill with new request
*/
static void blr_register_set_latin1(ROUTER_INSTANCE *router, GWBUF *buf)
{
buf = blr_make_query(router->master, "SET NAMES latin1");
// Set the new state
router->master_state = BLRM_LATIN1;
router->master->func.write(router->master, buf);
}
/**
* Slave Protocol registration to Master:
*
* Saves previous reply from Master (Slave UUID)
*
* @param router Current router instance
* @param buf GWBUF with server reply to previous
* registration command
*/
static void blr_register_cache_slaveuuid(ROUTER_INSTANCE *router, GWBUF *buf)
{
// Response to the SET @server_uuid, should be stored
if (router->saved_master.setslaveuuid)
{
GWBUF_CONSUME_ALL(router->saved_master.setslaveuuid);
}
router->saved_master.setslaveuuid = buf;
blr_cache_response(router, "ssuuid", buf);
}
/**
* Slave Protocol registration to Master:
*
* Saves previous reply from Master (MariaDB10 compatibility)
*
* @param router Current router instance
* @param buf GWBUF with server reply to previous
* registration command
*/
static void blr_register_cache_mariadb10(ROUTER_INSTANCE *router, GWBUF *buf)
{
// Response to the SET @mariadb_slave_capability=4, should be stored
if (router->saved_master.mariadb10)
{
GWBUF_CONSUME_ALL(router->saved_master.mariadb10);
}
router->saved_master.mariadb10 = buf;
blr_cache_response(router, "mariadb10", buf);
}
/**
* Slave Protocol registration to Master:
*
* Sends SELECT 1 to Master
*
* @param router Current router instance
* @param buf GWBUF to fill with new request
*/
static void blr_register_send_select1(ROUTER_INSTANCE *router, GWBUF *buf)
{
buf = blr_make_query(router->master, "SELECT 1");
// Set the new state
router->master_state = BLRM_SELECT1;
router->master->func.write(router->master, buf);
}
/**
* Slave Protocol registration to Master (MaxWell compatibility):
*
* Sends MYSQL_CONNECTOR_SQL_MODE_QUERY to Master
*
* @param router Current router instance
* @param buf GWBUF to fill with new request
*/
static void blr_register_mxw_sqlmode(ROUTER_INSTANCE *router, GWBUF *buf)
{
buf = blr_make_query(router->master, MYSQL_CONNECTOR_SQL_MODE_QUERY);
// Set the new state
router->master_state = BLRM_SQL_MODE;
router->master->func.write(router->master, buf);
}
/**
* Slave Protocol registration to Master (MaxWell compatibility):
*
* Sends SET character_set_results = NULL to Master
*
* @param router Current router instance
* @param buf GWBUF to fill with new request
*/
static void blr_register_mxw_charset(ROUTER_INSTANCE *router, GWBUF *buf)
{
buf = blr_make_query(router->master, "SET character_set_results = NULL");
// Set the new state
router->master_state = BLRM_RESULTS_CHARSET;
router->master->func.write(router->master, buf);
}