Misc fixes for bin log rotate issues

This commit is contained in:
Mark Riddoch 2014-05-28 23:38:54 +01:00
parent 97bb1638ad
commit 4b5f801ff9
5 changed files with 78 additions and 28 deletions

View File

@ -248,6 +248,8 @@ static char *blrs_states[] = { "Created", "Unregistered", "Registered",
*/
#define CS_READING 0x0001
#define CS_INNERLOOP 0x0002
#define CS_UPTODATE 0x0004
#define CS_EXPECTCB 0x0008
/**
* MySQL protocol OpCodes needed for replication

View File

@ -8,11 +8,6 @@ MariaDB or Percona Server.
To Do List:
1. Thread safety needs to be examine, currently MaxScale has been
run with a single thread when testing this router.
1. The router does not implement the replication heartbeat mechanism.
2. Binlog rotate events have yet to be tested.
3. The router does not implement the replication heartbeat mechanism.
4. Performance measurements have yet to be made.
2. Performance measurements have yet to be made.

View File

@ -330,6 +330,8 @@ ROUTER_SLAVE *slave;
memset(&slave->stats, 0, sizeof(SLAVE_STATS));
atomic_add(&inst->stats.n_slaves, 1);
slave->state = BLRS_CREATED; /* Set initial state of the slave */
slave->cstate = 0;
spinlock_init(&slave->catch_lock);
slave->dcb = session->client;
slave->router = instance;
@ -555,6 +557,14 @@ int i = 0;
dcb_printf(dcb, "\t\tNo. events sent: %u\n", session->stats.n_events);
dcb_printf(dcb, "\t\tNo. bursts sent: %u\n", session->stats.n_bursts);
dcb_printf(dcb, "\t\tNo. flow control: %u\n", session->stats.n_flows);
if ((session->cstate & CS_UPTODATE) == 0)
{
dcb_printf(dcb, "\t\tSlave is in catchup mode. %s\n",
((session->cstate & CS_EXPECTCB) == 0 ? "" :
"Waiting for DCB queue to drain."));
}
else
dcb_printf(dcb, "\t\tSlave is in normal mode.\n");
session = session->next;
}
spinlock_release(&router_inst->lock);

View File

@ -658,7 +658,8 @@ ROUTER_SLAVE *slave;
slave = router->slaves;
while (slave)
{
if (slave->binlog_pos == hdr->next_pos - hdr->event_size)
if ((slave->binlog_pos == hdr->next_pos - hdr->event_size)
&& strcmp(slave->binlogfile, router->binlog_name) == 0)
{
pkt = gwbuf_alloc(hdr->event_size + 5);
buf = GWBUF_DATA(pkt);
@ -667,12 +668,37 @@ ROUTER_SLAVE *slave;
*buf++ = slave->seqno++;
*buf++ = 0; // OK
memcpy(buf, ptr, hdr->event_size);
slave->dcb->func.write(slave->dcb, pkt);
slave->binlog_pos = hdr->next_pos;
if (hdr->event_type == ROTATE_EVENT)
{
blr_slave_rotate(slave, ptr);
}
slave->dcb->func.write(slave->dcb, pkt);
if (hdr->event_type != ROTATE_EVENT)
{
slave->binlog_pos = hdr->next_pos;
}
}
else if ((hdr->event_type != ROTATE_EVENT)
&& (slave->binlog_pos != hdr->next_pos ||
strcmp(slave->binlogfile, router->binlog_name) != 0))
{
/* Check slave is in catchup mode and if not
* force it to go into catchup mode.
*/
if (slave->cstate & CS_UPTODATE)
{
spinlock_release(&router->lock);
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~CS_UPTODATE;
spinlock_release(&slave->catch_lock);
blr_slave_catchup(router, slave);
spinlock_acquire(&router->lock);
slave = router->slaves;
if (slave)
continue;
else
break;
}
}
slave = slave->next;

View File

@ -61,7 +61,7 @@ static void blr_slave_send_error(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, c
static int blr_slave_send_timestamp(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
static int blr_slave_register(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue);
static int blr_slave_binlog_dump(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue);
static int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
static uint8_t *blr_build_header(GWBUF *pkt, REP_HEADER *hdr);
static int blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data);
@ -88,7 +88,7 @@ blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
{
if (slave->state < 0 || slave->state > BLRS_MAXSTATE)
{
LOGIF(LM, (skygw_log_write(
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR, "Invalid slave state machine state (%d) for binlog router.\n",
slave->state)));
gwbuf_consume(queue, gwbuf_length(queue));
@ -108,6 +108,10 @@ blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
return blr_slave_binlog_dump(router, slave, queue);
break;
default:
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"Unexpected MySQL Command (%d) received from slave\n",
MYSQL_COMMAND(queue))));
break;
}
return 0;
@ -450,7 +454,13 @@ uint32_t chksum;
len = extract_field(ptr, 24);
ptr += 4; // Skip length and sequence number
if (*ptr++ != COM_BINLOG_DUMP)
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"blr_slave_binlog_dump expected a COM_BINLOG_DUMP but received %d\n",
*(ptr-1))));
return 0;
}
slave->binlog_pos = extract_field(ptr, 32);
ptr += 4;
@ -632,16 +642,19 @@ uint8_t *ptr;
* @param slave The slave that is behind
* @return The number of bytes written
*/
static int
int
blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
{
GWBUF *head, *record;
REP_HEADER hdr;
int written, fd, rval = 0, burst = 0;
int written, fd, rval = 1, burst = 0;
uint8_t *ptr;
struct timespec req;
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~CS_EXPECTCB;
spinlock_release(&slave->catch_lock);
/*
* We have a slightly complex syncronisation mechansim here,
* we need to make sure that we do not have multiple threads
@ -733,22 +746,11 @@ struct timespec req;
}
}
written = slave->dcb->func.write(slave->dcb, head);
if (written)
slave->binlog_pos = hdr.next_pos;
rval = written;
if (hdr.event_type == ROTATE_EVENT)
if (written && hdr.event_type != ROTATE_EVENT)
{
close(fd);
blr_slave_rotate(slave, GWBUF_DATA(record));
if ((fd = blr_open_binlog(router, slave->binlogfile)) == -1)
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"blr_slave_catchup failed to open binlog file %s\n",
slave->binlogfile)));
break;
}
slave->binlog_pos = hdr.next_pos;
}
rval = written;
atomic_add(&slave->stats.n_events, 1);
burst++;
}
@ -762,7 +764,21 @@ struct timespec req;
slave->cstate &= ~CS_READING;
spinlock_release(&slave->catch_lock);
if (record)
{
atomic_add(&slave->stats.n_flows, 1);
spinlock_acquire(&slave->catch_lock);
slave->cstate |= CS_EXPECTCB;
spinlock_release(&slave->catch_lock);
}
else
{
spinlock_acquire(&slave->catch_lock);
slave->cstate |= CS_UPTODATE;
spinlock_release(&slave->catch_lock);
LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE,
"blr_slave_catchup slave is up to date %s, %u\n",
slave->binlogfile, slave->binlog_pos)));
}
return rval;
}
@ -790,6 +806,7 @@ ROUTER_INSTANCE *router = slave->router;
atomic_add(&slave->stats.n_events, 1);
blr_slave_catchup(router, slave);
}
return 0;
}
/**