diff --git a/server/modules/include/blr.h b/server/modules/include/blr.h index 722775478..48117d10a 100644 --- a/server/modules/include/blr.h +++ b/server/modules/include/blr.h @@ -248,6 +248,8 @@ static char *blrs_states[] = { "Created", "Unregistered", "Registered", */ #define CS_READING 0x0001 #define CS_INNERLOOP 0x0002 +#define CS_UPTODATE 0x0004 +#define CS_EXPECTCB 0x0008 /** * MySQL protocol OpCodes needed for replication diff --git a/server/modules/routing/binlog/STATUS b/server/modules/routing/binlog/STATUS index bd981306b..db3a190f5 100644 --- a/server/modules/routing/binlog/STATUS +++ b/server/modules/routing/binlog/STATUS @@ -8,11 +8,6 @@ MariaDB or Percona Server. To Do List: -1. Thread safety needs to be examine, currently MaxScale has been -run with a single thread when testing this router. +1. The router does not implement the replication heartbeat mechanism. -2. Binlog rotate events have yet to be tested. - -3. The router does not implement the replication heartbeat mechanism. - -4. Performance measurements have yet to be made. +2. Performance measurements have yet to be made. diff --git a/server/modules/routing/binlog/blr.c b/server/modules/routing/binlog/blr.c index 6560bd609..8d1e2b39e 100644 --- a/server/modules/routing/binlog/blr.c +++ b/server/modules/routing/binlog/blr.c @@ -330,6 +330,8 @@ ROUTER_SLAVE *slave; memset(&slave->stats, 0, sizeof(SLAVE_STATS)); atomic_add(&inst->stats.n_slaves, 1); slave->state = BLRS_CREATED; /* Set initial state of the slave */ + slave->cstate = 0; + spinlock_init(&slave->catch_lock); slave->dcb = session->client; slave->router = instance; @@ -555,6 +557,14 @@ int i = 0; dcb_printf(dcb, "\t\tNo. events sent: %u\n", session->stats.n_events); dcb_printf(dcb, "\t\tNo. bursts sent: %u\n", session->stats.n_bursts); dcb_printf(dcb, "\t\tNo. flow control: %u\n", session->stats.n_flows); + if ((session->cstate & CS_UPTODATE) == 0) + { + dcb_printf(dcb, "\t\tSlave is in catchup mode. %s\n", + ((session->cstate & CS_EXPECTCB) == 0 ? "" : + "Waiting for DCB queue to drain.")); + } + else + dcb_printf(dcb, "\t\tSlave is in normal mode.\n"); session = session->next; } spinlock_release(&router_inst->lock); diff --git a/server/modules/routing/binlog/blr_master.c b/server/modules/routing/binlog/blr_master.c index fee0be5db..c26c68376 100644 --- a/server/modules/routing/binlog/blr_master.c +++ b/server/modules/routing/binlog/blr_master.c @@ -658,7 +658,8 @@ ROUTER_SLAVE *slave; slave = router->slaves; while (slave) { - if (slave->binlog_pos == hdr->next_pos - hdr->event_size) + if ((slave->binlog_pos == hdr->next_pos - hdr->event_size) + && strcmp(slave->binlogfile, router->binlog_name) == 0) { pkt = gwbuf_alloc(hdr->event_size + 5); buf = GWBUF_DATA(pkt); @@ -667,12 +668,37 @@ ROUTER_SLAVE *slave; *buf++ = slave->seqno++; *buf++ = 0; // OK memcpy(buf, ptr, hdr->event_size); - slave->dcb->func.write(slave->dcb, pkt); - slave->binlog_pos = hdr->next_pos; if (hdr->event_type == ROTATE_EVENT) { blr_slave_rotate(slave, ptr); } + slave->dcb->func.write(slave->dcb, pkt); + if (hdr->event_type != ROTATE_EVENT) + { + slave->binlog_pos = hdr->next_pos; + } + } + else if ((hdr->event_type != ROTATE_EVENT) + && (slave->binlog_pos != hdr->next_pos || + strcmp(slave->binlogfile, router->binlog_name) != 0)) + { + /* Check slave is in catchup mode and if not + * force it to go into catchup mode. + */ + if (slave->cstate & CS_UPTODATE) + { + spinlock_release(&router->lock); + spinlock_acquire(&slave->catch_lock); + slave->cstate &= ~CS_UPTODATE; + spinlock_release(&slave->catch_lock); + blr_slave_catchup(router, slave); + spinlock_acquire(&router->lock); + slave = router->slaves; + if (slave) + continue; + else + break; + } } slave = slave->next; diff --git a/server/modules/routing/binlog/blr_slave.c b/server/modules/routing/binlog/blr_slave.c index 0a52fe2c0..da7d7f8a2 100644 --- a/server/modules/routing/binlog/blr_slave.c +++ b/server/modules/routing/binlog/blr_slave.c @@ -61,7 +61,7 @@ static void blr_slave_send_error(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, c static int blr_slave_send_timestamp(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave); static int blr_slave_register(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue); static int blr_slave_binlog_dump(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue); -static int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave); +int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave); static uint8_t *blr_build_header(GWBUF *pkt, REP_HEADER *hdr); static int blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data); @@ -88,7 +88,7 @@ blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue) { if (slave->state < 0 || slave->state > BLRS_MAXSTATE) { - LOGIF(LM, (skygw_log_write( + LOGIF(LE, (skygw_log_write( LOGFILE_ERROR, "Invalid slave state machine state (%d) for binlog router.\n", slave->state))); gwbuf_consume(queue, gwbuf_length(queue)); @@ -108,6 +108,10 @@ blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue) return blr_slave_binlog_dump(router, slave, queue); break; default: + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "Unexpected MySQL Command (%d) received from slave\n", + MYSQL_COMMAND(queue)))); break; } return 0; @@ -450,7 +454,13 @@ uint32_t chksum; len = extract_field(ptr, 24); ptr += 4; // Skip length and sequence number if (*ptr++ != COM_BINLOG_DUMP) + { + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "blr_slave_binlog_dump expected a COM_BINLOG_DUMP but received %d\n", + *(ptr-1)))); return 0; + } slave->binlog_pos = extract_field(ptr, 32); ptr += 4; @@ -632,16 +642,19 @@ uint8_t *ptr; * @param slave The slave that is behind * @return The number of bytes written */ -static int +int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave) { GWBUF *head, *record; REP_HEADER hdr; -int written, fd, rval = 0, burst = 0; +int written, fd, rval = 1, burst = 0; uint8_t *ptr; struct timespec req; + spinlock_acquire(&slave->catch_lock); + slave->cstate &= ~CS_EXPECTCB; + spinlock_release(&slave->catch_lock); /* * We have a slightly complex syncronisation mechansim here, * we need to make sure that we do not have multiple threads @@ -733,22 +746,11 @@ struct timespec req; } } written = slave->dcb->func.write(slave->dcb, head); - if (written) - slave->binlog_pos = hdr.next_pos; - rval = written; - if (hdr.event_type == ROTATE_EVENT) + if (written && hdr.event_type != ROTATE_EVENT) { - close(fd); - blr_slave_rotate(slave, GWBUF_DATA(record)); - if ((fd = blr_open_binlog(router, slave->binlogfile)) == -1) - { - LOGIF(LE, (skygw_log_write( - LOGFILE_ERROR, - "blr_slave_catchup failed to open binlog file %s\n", - slave->binlogfile))); - break; - } + slave->binlog_pos = hdr.next_pos; } + rval = written; atomic_add(&slave->stats.n_events, 1); burst++; } @@ -762,7 +764,21 @@ struct timespec req; slave->cstate &= ~CS_READING; spinlock_release(&slave->catch_lock); if (record) + { atomic_add(&slave->stats.n_flows, 1); + spinlock_acquire(&slave->catch_lock); + slave->cstate |= CS_EXPECTCB; + spinlock_release(&slave->catch_lock); + } + else + { + spinlock_acquire(&slave->catch_lock); + slave->cstate |= CS_UPTODATE; + spinlock_release(&slave->catch_lock); + LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE, + "blr_slave_catchup slave is up to date %s, %u\n", + slave->binlogfile, slave->binlog_pos))); + } return rval; } @@ -790,6 +806,7 @@ ROUTER_INSTANCE *router = slave->router; atomic_add(&slave->stats.n_events, 1); blr_slave_catchup(router, slave); } + return 0; } /**