STOP/START SLAVE first implementation

STOP/START SLAVE first implementation
This commit is contained in:
MassimilianoPinto
2015-05-25 17:37:39 +02:00
parent c1b956af2c
commit 7634f22a94
3 changed files with 126 additions and 13 deletions

View File

@ -26,6 +26,7 @@
* *
* Date Who Description * Date Who Description
* 02/04/14 Mark Riddoch Initial implementation * 02/04/14 Mark Riddoch Initial implementation
* 25/05/15 Massimiliano Pinto Added BLRM_SLAVE_STOPPED state
* *
* @endverbatim * @endverbatim
*/ */
@ -314,15 +315,16 @@ typedef struct router_instance {
#define BLRM_MAP 0x0011 #define BLRM_MAP 0x0011
#define BLRM_REGISTER 0x0012 #define BLRM_REGISTER 0x0012
#define BLRM_BINLOGDUMP 0x0013 #define BLRM_BINLOGDUMP 0x0013
#define BLRM_SLAVE_STOPPED 0x0014
#define BLRM_MAXSTATE 0x0013 #define BLRM_MAXSTATE 0x0014
static char *blrm_states[] = { "Unconnected", "Connecting", "Authenticated", "Timestamp retrieval", static char *blrm_states[] = { "Unconnected", "Connecting", "Authenticated", "Timestamp retrieval",
"Server ID retrieval", "HeartBeat Period setup", "binlog checksum config", "Server ID retrieval", "HeartBeat Period setup", "binlog checksum config",
"binlog checksum rerieval", "GTID Mode retrieval", "Master UUID retrieval", "binlog checksum rerieval", "GTID Mode retrieval", "Master UUID retrieval",
"Set Slave UUID", "Set Names latin1", "Set Names utf8", "select 1", "Set Slave UUID", "Set Names latin1", "Set Names utf8", "select 1",
"select version()", "select @@version_comment", "select @@hostname", "select version()", "select @@version_comment", "select @@hostname",
"select @@mx_allowed_packet", "Register slave", "Binlog Dump" }; "select @@mx_allowed_packet", "Register slave", "Binlog Dump" , "Slave stopped"};
#define BLRS_CREATED 0x0000 #define BLRS_CREATED 0x0000
#define BLRS_UNREGISTERED 0x0001 #define BLRS_UNREGISTERED 0x0001

View File

@ -33,6 +33,7 @@
* *
* Date Who Description * Date Who Description
* 02/04/2014 Mark Riddoch Initial implementation * 02/04/2014 Mark Riddoch Initial implementation
* 25/05/2015 Massimiliano Pinto Addev BLRM_SLAVE_STOPPED state
* *
* @endverbatim * @endverbatim
*/ */
@ -78,7 +79,7 @@ static void *CreateMySQLAuthData(char *username, char *password, char *database)
void blr_extract_header(uint8_t *pkt, REP_HEADER *hdr); void blr_extract_header(uint8_t *pkt, REP_HEADER *hdr);
inline uint32_t extract_field(uint8_t *src, int bits); inline uint32_t extract_field(uint8_t *src, int bits);
static void blr_log_packet(logfile_id_t file, char *msg, uint8_t *ptr, int len); static void blr_log_packet(logfile_id_t file, char *msg, uint8_t *ptr, int len);
static void blr_master_close(ROUTER_INSTANCE *); void blr_master_close(ROUTER_INSTANCE *);
static char *blr_extract_column(GWBUF *buf, int col); static char *blr_extract_column(GWBUF *buf, int col);
static int keepalive = 1; static int keepalive = 1;
@ -100,9 +101,15 @@ GWBUF *buf;
spinlock_acquire(&router->lock); spinlock_acquire(&router->lock);
if (router->master_state != BLRM_UNCONNECTED) if (router->master_state != BLRM_UNCONNECTED)
{ {
if (router->master_state != BLRM_SLAVE_STOPPED) {
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"%s: Master Connect: Unexpected master state %s\n", "%s: Master Connect: Unexpected master state %s\n",
router->service->name, blrm_states[router->master_state]))); router->service->name, blrm_states[router->master_state])));
} else {
LOGIF(LM, (skygw_log_write_flush(LOGFILE_MESSAGE,
"%s: Master Connect: binlog state is %s\n",
router->service->name, blrm_states[router->master_state])));
}
spinlock_release(&router->lock); spinlock_release(&router->lock);
return; return;
} }

View File

@ -36,6 +36,7 @@
* 18/02/2015 Massimiliano Pinto Addition of DISCONNECT ALL and DISCONNECT SERVER server_id * 18/02/2015 Massimiliano Pinto Addition of DISCONNECT ALL and DISCONNECT SERVER server_id
* 18/03/2015 Markus Makela Better detection of CRC32 | NONE checksum * 18/03/2015 Markus Makela Better detection of CRC32 | NONE checksum
* 19/03/2015 Massimiliano Pinto Addition of basic MariaDB 10 compatibility support * 19/03/2015 Massimiliano Pinto Addition of basic MariaDB 10 compatibility support
* 25/05/2015 Massimiliano Pinto Addition BLRM_SLAVE_STOPPED state and blr_start/stop_slave
* *
* @endverbatim * @endverbatim
*/ */
@ -83,6 +84,8 @@ static int blr_slave_send_disconnected_server(ROUTER_INSTANCE *router, ROUTER_SL
static int blr_slave_disconnect_all(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave); static int blr_slave_disconnect_all(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
static int blr_slave_disconnect_server(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, int server_id); static int blr_slave_disconnect_server(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, int server_id);
static int blr_slave_send_ok(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave); static int blr_slave_send_ok(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave);
static int blr_stop_slave(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave);
static int blr_start_slave(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave);
extern int lm_enabled_logfiles_bitmask; extern int lm_enabled_logfiles_bitmask;
extern size_t log_ses_count[]; extern size_t log_ses_count[];
@ -411,13 +414,41 @@ int query_len;
} }
} }
} }
/* start replication from the current configured master */
else if (strcasecmp(query_text, "START") == 0)
{
if ((word = strtok_r(NULL, sep, &brkb)) == NULL)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, "%s: Incomplete START command.",
router->service->name)));
}
else if (strcasecmp(word, "SLAVE") == 0)
{
free(query_text);
return blr_start_slave(router, slave);
}
}
/* stop replication from the current */
else if (strcasecmp(query_text, "STOP") == 0)
{
if ((word = strtok_r(NULL, sep, &brkb)) == NULL)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, "%s: Incomplete STOP command.",
router->service->name)));
}
else if (strcasecmp(word, "SLAVE") == 0)
{
free(query_text);
return blr_stop_slave(router, slave);
}
}
else if (strcasecmp(query_text, "DISCONNECT") == 0) else if (strcasecmp(query_text, "DISCONNECT") == 0)
{ {
if ((word = strtok_r(NULL, sep, &brkb)) == NULL) if ((word = strtok_r(NULL, sep, &brkb)) == NULL)
{ {
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, "%s: Incomplete DISCONNECT command.", LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, "%s: Incomplete DISCONNECT command.",
router->service->name))); router->service->name)));
} }
else if (strcasecmp(word, "ALL") == 0) else if (strcasecmp(word, "ALL") == 0)
{ {
@ -2084,12 +2115,16 @@ blr_slave_disconnect_all(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
return 1; return 1;
} }
/** /**
* Send a MySQL OK packet to the DCB * Send a MySQL OK packet to the slave backend
*
* @param router The binlog router instance
* @param slave The slave server to which we are sending the response
* *
* @param dcb The DCB to send the OK packet to
* @return result of a write call, non-zero if write was successful * @return result of a write call, non-zero if write was successful
*/ */
static int static int
blr_slave_send_ok(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave) blr_slave_send_ok(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave)
{ {
@ -2110,5 +2145,74 @@ uint8_t *ptr;
*ptr++ = 0; *ptr++ = 0;
*ptr++ = 0; *ptr++ = 0;
*ptr++ = 0; *ptr++ = 0;
return slave->dcb->func.write(slave->dcb, pkt); return slave->dcb->func.write(slave->dcb, pkt);
} }
/**
* Stop current replication from master
*
* @param router The binlog router instance
* @param slave The slave server to which we are sending the response*
*
*/
static int
blr_stop_slave(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave)
{
LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE, "%s: STOP SLAVE received by %s@%s. Disconnecting from master %s:%d",
router->service->name,
slave->dcb->user,
slave->dcb->remote,
router->service->dbref->server->name,
router->service->dbref->server->port)));
if (router->master_state != BLRM_SLAVE_STOPPED) {
if (router->master->fd != -1)
blr_master_close(router);
spinlock_acquire(&router->lock);
router->master_state = BLRM_SLAVE_STOPPED;
spinlock_release(&router->lock);
return blr_slave_send_ok(router, slave);
} else {
blr_slave_send_error(router, slave, "Slave connection is not running");
return 1;
}
}
/**
* Start replication from current configured master
*
* @param router The binlog router instance
* @param slave The slave server to which we are sending the response
*
*/
static int
blr_start_slave(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave)
{
LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE, "%s: START SLAVE received by %s@%s. Trying connection to master %s:%d",
router->service->name,
slave->dcb->user,
slave->dcb->remote,
router->service->dbref->server->name,
router->service->dbref->server->port)));
if ( (router->master_state == BLRM_UNCONNECTED) || (router->master_state == BLRM_SLAVE_STOPPED) ) {
spinlock_acquire(&router->lock);
router->master_state = BLRM_UNCONNECTED;
spinlock_release(&router->lock);
blr_start_master(router);
return blr_slave_send_ok(router, slave);
} else {
blr_slave_send_error(router, slave, "Slave connection is already running");
return 1;
}
}