Merge branch '1.2.1-binlog_router_trx' into develop

This commit is contained in:
MassimilianoPinto
2015-11-04 16:42:30 +01:00
3 changed files with 52 additions and 12 deletions

View File

@ -180,6 +180,10 @@
/* string len for master registration query */ /* string len for master registration query */
#define BLRM_MASTER_REGITRATION_QUERY_LEN 255 #define BLRM_MASTER_REGITRATION_QUERY_LEN 255
/* Read Binlog position states */
#define SLAVE_POS_READ_OK 0x0
#define SLAVE_POS_READ_ERR 0xff
#define SLAVE_POS_READ_UNSAFE 0xfe
/** /**
* Some useful macros for examining the MySQL Response packets * Some useful macros for examining the MySQL Response packets
*/ */

View File

@ -415,7 +415,7 @@ struct stat statb;
memset(&hdbuf, '\0', BINLOG_EVENT_HDR_LEN); memset(&hdbuf, '\0', BINLOG_EVENT_HDR_LEN);
/* set error indicator */ /* set error indicator */
hdr->ok = 0xff; hdr->ok = SLAVE_POS_READ_ERR;
if (!file) if (!file)
{ {
@ -424,6 +424,11 @@ struct stat statb;
} }
if (fstat(file->fd, &statb) == 0) if (fstat(file->fd, &statb) == 0)
filelen = statb.st_size; filelen = statb.st_size;
else
{
snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "Invalid size of binlog file, pos %lu", pos);
return NULL;
}
if (pos > filelen) if (pos > filelen)
{ {
@ -436,11 +441,12 @@ struct stat statb;
{ {
if (pos > router->binlog_position) if (pos > router->binlog_position)
{ {
snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "Requested position %lu is not available. Latest safe position %lu, end of binlog '%s' is %lu", /* Unsafe position, slave will be disconnected by the calling routine */
pos, router->binlog_position, file->binlogname, router->current_pos); snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "Requested binlog position %lu. Position is unsafe so disconnecting. Latest safe position %lu, end of binlog file %lu", pos, router->binlog_position, router->current_pos);
hdr->ok = SLAVE_POS_READ_UNSAFE;
} else { } else {
/* accessing last position is ok */ /* accessing last position is ok */
hdr->ok = 0x0; hdr->ok = SLAVE_POS_READ_OK;
} }
return NULL; return NULL;
@ -457,7 +463,7 @@ struct stat statb;
file->binlogname, pos))); file->binlogname, pos)));
/* set ok indicator */ /* set ok indicator */
hdr->ok = 0x0; hdr->ok = SLAVE_POS_READ_OK;
break; break;
case -1: case -1:
@ -525,7 +531,7 @@ struct stat statb;
pos))); pos)));
/* set ok indicator */ /* set ok indicator */
hdr->ok = 0x0; hdr->ok = SLAVE_POS_READ_OK;
break; break;
case -1: case -1:
@ -611,7 +617,7 @@ struct stat statb;
} }
/* set OK indicator */ /* set OK indicator */
hdr->ok = 0x0; hdr->ok = SLAVE_POS_READ_OK;
return result; return result;
} }

View File

@ -2044,9 +2044,10 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
if (record == NULL) { if (record == NULL) {
slave->stats.n_failed_read++; slave->stats.n_failed_read++;
if (hdr.ok == 0xff) { if (hdr.ok == SLAVE_POS_READ_ERR) {
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Slave %s:%i, server-id %d, binlog '%s', blr_read_binlog failure: %s", "%s Slave %s:%i, server-id %d, binlog '%s', %s",
router->service->name,
slave->dcb->remote, slave->dcb->remote,
slave->port, slave->port,
slave->serverid, slave->serverid,
@ -2059,12 +2060,37 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
spinlock_release(&slave->catch_lock); spinlock_release(&slave->catch_lock);
/*
* Send an error that will stop slave replication
*/
blr_send_custom_error(slave->dcb, slave->seqno++, 0, read_errmsg, "HY000", 1236); blr_send_custom_error(slave->dcb, slave->seqno++, 0, read_errmsg, "HY000", 1236);
dcb_close(slave->dcb); dcb_close(slave->dcb);
return 0; return 0;
} }
if (hdr.ok == SLAVE_POS_READ_UNSAFE) {
ROUTER_OBJECT *router_obj= router->service->router;
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"%s: Slave %s:%i, server-id %d, binlog '%s', %s",
router->service->name,
slave->dcb->remote,
slave->port,
slave->serverid,
slave->binlogfile,
read_errmsg)));
/*
* Close the slave session and socket
* The slave will try to reconnect
*/
router_obj->closeSession(router->service->router_instance, slave);
return 0;
}
} }
spinlock_acquire(&slave->catch_lock); spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~CS_BUSY; slave->cstate &= ~CS_BUSY;
@ -2116,17 +2142,21 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
if (slave->stats.n_caughtup == 1) if (slave->stats.n_caughtup == 1)
{ {
LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE, LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE,
"%s: Slave %s is up to date %s, %lu.", "%s: Slave %s:%d, server-id %d is up to date '%s', position %lu.",
router->service->name, router->service->name,
slave->dcb->remote, slave->dcb->remote,
slave->port,
slave->serverid,
slave->binlogfile, (unsigned long)slave->binlog_pos))); slave->binlogfile, (unsigned long)slave->binlog_pos)));
} }
else if ((slave->stats.n_caughtup % 50) == 0) else if ((slave->stats.n_caughtup % 50) == 0)
{ {
LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE, LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE,
"%s: Slave %s is up to date %s, %lu.", "%s: Slave %s:%d, server-id %d is up to date '%s', position %lu.",
router->service->name, router->service->name,
slave->dcb->remote, slave->dcb->remote,
slave->port,
slave->serverid,
slave->binlogfile, (unsigned long)slave->binlog_pos))); slave->binlogfile, (unsigned long)slave->binlog_pos)));
} }
} }
@ -2344,7 +2374,7 @@ char err_msg[BINLOG_ERROR_MSG_LEN+1];
return; return;
if ((record = blr_read_binlog(router, file, 4, &hdr, err_msg)) == NULL) if ((record = blr_read_binlog(router, file, 4, &hdr, err_msg)) == NULL)
{ {
if (hdr.ok == 0xff) { if (hdr.ok != SLAVE_POS_READ_OK) {
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Slave %s:%i, server-id %d, binlog '%s', blr_read_binlog failure: %s", "Slave %s:%i, server-id %d, binlog '%s', blr_read_binlog failure: %s",
slave->dcb->remote, slave->dcb->remote,