Disconnect slave when pos is UNSAFE

Disconnect the slave when the requested binlog position is UNSAFE, instead of
sending an error that stops replication. The slave can then try to reconnect.
This commit is contained in:
MassimilianoPinto
2015-11-04 16:24:46 +01:00
parent 7c582f91ea
commit d51ac7ab7d
3 changed files with 52 additions and 12 deletions

View File

@ -180,6 +180,10 @@
/* string len for master registration query */ /* string len for master registration query */
#define BLRM_MASTER_REGITRATION_QUERY_LEN 255 #define BLRM_MASTER_REGITRATION_QUERY_LEN 255
/* Read Binlog position states */
#define SLAVE_POS_READ_OK 0x0
#define SLAVE_POS_READ_ERR 0xff
#define SLAVE_POS_READ_UNSAFE 0xfe
/** /**
* Some useful macros for examining the MySQL Response packets * Some useful macros for examining the MySQL Response packets
*/ */

View File

@ -419,7 +419,7 @@ struct stat statb;
memset(&hdbuf, '\0', BINLOG_EVENT_HDR_LEN); memset(&hdbuf, '\0', BINLOG_EVENT_HDR_LEN);
/* set error indicator */ /* set error indicator */
hdr->ok = 0xff; hdr->ok = SLAVE_POS_READ_ERR;
if (!file) if (!file)
{ {
@ -428,6 +428,11 @@ struct stat statb;
} }
if (fstat(file->fd, &statb) == 0) if (fstat(file->fd, &statb) == 0)
filelen = statb.st_size; filelen = statb.st_size;
else
{
snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "Invalide size for binlog file '%s', pos %lu", file->binlogname, pos);
return NULL;
}
if (pos > filelen) if (pos > filelen)
{ {
@ -440,11 +445,12 @@ struct stat statb;
{ {
if (pos > router->binlog_position) if (pos > router->binlog_position)
{ {
snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "Requested position %lu is not available. Latest safe position %lu, end of binlog '%s' is %lu", /* Unsafe position, slave will be disconnected by the calling routine */
pos, router->binlog_position, file->binlogname, router->current_pos); snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "Requested binlog position %lu. Position is unsafe so disconnecting. Latest safe position %lu, end of binlog file %lu", pos, router->binlog_position, router->current_pos);
hdr->ok = SLAVE_POS_READ_UNSAFE;
} else { } else {
/* accessing last position is ok */ /* accessing last position is ok */
hdr->ok = 0x0; hdr->ok = SLAVE_POS_READ_OK;
} }
return NULL; return NULL;
@ -461,7 +467,7 @@ struct stat statb;
file->binlogname, pos))); file->binlogname, pos)));
/* set ok indicator */ /* set ok indicator */
hdr->ok = 0x0; hdr->ok = SLAVE_POS_READ_OK;
break; break;
case -1: case -1:
@ -529,7 +535,7 @@ struct stat statb;
pos))); pos)));
/* set ok indicator */ /* set ok indicator */
hdr->ok = 0x0; hdr->ok = SLAVE_POS_READ_OK;
break; break;
case -1: case -1:
@ -615,7 +621,7 @@ struct stat statb;
} }
/* set OK indicator */ /* set OK indicator */
hdr->ok = 0x0; hdr->ok = SLAVE_POS_READ_OK;
return result; return result;
} }

View File

@ -2048,9 +2048,10 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
if (record == NULL) { if (record == NULL) {
slave->stats.n_failed_read++; slave->stats.n_failed_read++;
if (hdr.ok == 0xff) { if (hdr.ok == SLAVE_POS_READ_ERR) {
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Slave %s:%i, server-id %d, binlog '%s', blr_read_binlog failure: %s", "%s Slave %s:%i, server-id %d, binlog '%s', %s",
router->service->name,
slave->dcb->remote, slave->dcb->remote,
slave->port, slave->port,
slave->serverid, slave->serverid,
@ -2063,12 +2064,37 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
spinlock_release(&slave->catch_lock); spinlock_release(&slave->catch_lock);
/*
* Send an error that will stop slave replication
*/
blr_send_custom_error(slave->dcb, slave->seqno++, 0, read_errmsg, "HY000", 1236); blr_send_custom_error(slave->dcb, slave->seqno++, 0, read_errmsg, "HY000", 1236);
dcb_close(slave->dcb); dcb_close(slave->dcb);
return 0; return 0;
} }
if (hdr.ok == SLAVE_POS_READ_UNSAFE) {
ROUTER_OBJECT *router_obj= router->service->router;
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"%s: Slave %s:%i, server-id %d, binlog '%s', %s",
router->service->name,
slave->dcb->remote,
slave->port,
slave->serverid,
slave->binlogfile,
read_errmsg)));
/*
* Close the slave session and socket
* The slave will try to reconnect
*/
router_obj->closeSession(router->service->router_instance, slave);
return 0;
}
} }
spinlock_acquire(&slave->catch_lock); spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~CS_BUSY; slave->cstate &= ~CS_BUSY;
@ -2120,17 +2146,21 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
if (slave->stats.n_caughtup == 1) if (slave->stats.n_caughtup == 1)
{ {
LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE, LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE,
"%s: Slave %s is up to date %s, %lu.", "%s: Slave %s:%d, server-id %d is up to date '%s', position %lu.",
router->service->name, router->service->name,
slave->dcb->remote, slave->dcb->remote,
slave->port,
slave->serverid,
slave->binlogfile, (unsigned long)slave->binlog_pos))); slave->binlogfile, (unsigned long)slave->binlog_pos)));
} }
else if ((slave->stats.n_caughtup % 50) == 0) else if ((slave->stats.n_caughtup % 50) == 0)
{ {
LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE, LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE,
"%s: Slave %s is up to date %s, %lu.", "%s: Slave %s:%d, server-id %d is up to date '%s', position %lu.",
router->service->name, router->service->name,
slave->dcb->remote, slave->dcb->remote,
slave->port,
slave->serverid,
slave->binlogfile, (unsigned long)slave->binlog_pos))); slave->binlogfile, (unsigned long)slave->binlog_pos)));
} }
} }
@ -2348,7 +2378,7 @@ char err_msg[BINLOG_ERROR_MSG_LEN+1];
return; return;
if ((record = blr_read_binlog(router, file, 4, &hdr, err_msg)) == NULL) if ((record = blr_read_binlog(router, file, 4, &hdr, err_msg)) == NULL)
{ {
if (hdr.ok == 0xff) { if (hdr.ok != SLAVE_POS_READ_OK) {
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Slave %s:%i, server-id %d, binlog '%s', blr_read_binlog failure: %s", "Slave %s:%i, server-id %d, binlog '%s', blr_read_binlog failure: %s",
slave->dcb->remote, slave->dcb->remote,