diff --git a/server/modules/include/blr.h b/server/modules/include/blr.h index 8715e1e13..1e0b3de5d 100644 --- a/server/modules/include/blr.h +++ b/server/modules/include/blr.h @@ -239,6 +239,7 @@ typedef struct router_instance { GWBUF *residual; /*< Any residual binlog event */ MASTER_RESPONSES saved_master; /*< Saved master responses */ char *binlogdir; /*< The directory with the binlog files */ + SPINLOCK binlog_lock; /*< Lock to control update of the binlog position */ char binlog_name[BINLOG_FNAMELEN+1]; /*< Name of the current binlog file */ uint64_t binlog_position; @@ -248,6 +249,7 @@ typedef struct router_instance { */ uint64_t last_written; /*< Position of last event written */ char prevbinlog[BINLOG_FNAMELEN+1]; + int rotating; /*< Rotation in progress flag */ BLFILE *files; /*< Files used by the slaves */ SPINLOCK fileslock; /*< Lock for the files queue above */ unsigned int low_water; /*< Low water mark for client DCB */ diff --git a/server/modules/routing/binlog/blr.c b/server/modules/routing/binlog/blr.c index c088c89d8..ce1860aac 100644 --- a/server/modules/routing/binlog/blr.c +++ b/server/modules/routing/binlog/blr.c @@ -174,6 +174,7 @@ int i; spinlock_init(&inst->lock); inst->files = NULL; spinlock_init(&inst->fileslock); + spinlock_init(&inst->binlog_lock); inst->binlog_fd = -1; @@ -305,6 +306,7 @@ int i; inst->active_logs = 0; inst->reconnect_pending = 0; inst->handling_threads = 0; + inst->rotating = 0; inst->residual = NULL; inst->slaves = NULL; inst->next = NULL; @@ -679,6 +681,8 @@ struct tm tm; spinlock_stats(&instlock, spin_reporter, dcb); dcb_printf(dcb, "\tSpinlock statistics (instance lock):\n"); spinlock_stats(&router_inst->lock, spin_reporter, dcb); + dcb_printf(dcb, "\tSpinlock statistics (binlog position lock):\n"); + spinlock_stats(&router_inst->binlog_lock, spin_reporter, dcb); #endif if (router_inst->slaves) @@ -743,9 +747,11 @@ struct tm tm; dcb_printf(dcb, "\t\tNo. of distribute action 3 %u\n", session->stats.n_actions[2]); if ((session->cstate & CS_UPTODATE) == 0) { - dcb_printf(dcb, "\t\tSlave is in catchup mode. %s\n", - ((session->cstate & CS_EXPECTCB) == 0 ? "" : - "Waiting for DCB queue to drain.")); + dcb_printf(dcb, "\t\tSlave is in catchup mode. %s%s\n", + ((session->cstate & CS_EXPECTCB) == 0 ? "" : + "Waiting for DCB queue to drain."), + ((session->cstate & CS_BUSY) == 0 ? "" : + " Busy in slave catchup.")); } else diff --git a/server/modules/routing/binlog/blr_file.c b/server/modules/routing/binlog/blr_file.c index 1ef52968a..5c4f81cb3 100644 --- a/server/modules/routing/binlog/blr_file.c +++ b/server/modules/routing/binlog/blr_file.c @@ -168,8 +168,10 @@ unsigned char magic[] = BINLOG_MAGIC; } fsync(fd); close(router->binlog_fd); + spinlock_acquire(&router->binlog_lock); strcpy(router->binlog_name, file); router->binlog_position = 4; /* Initial position after the magic number */ + spinlock_release(&router->binlog_lock); router->binlog_fd = fd; } @@ -199,8 +201,10 @@ int fd; } fsync(fd); close(router->binlog_fd); + spinlock_acquire(&router->binlog_lock); strcpy(router->binlog_name, file); router->binlog_position = lseek(fd, 0L, SEEK_END); + spinlock_release(&router->binlog_lock); router->binlog_fd = fd; } @@ -215,8 +219,10 @@ void blr_write_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *buf) { pwrite(router->binlog_fd, buf, hdr->event_size, hdr->next_pos - hdr->event_size); + spinlock_acquire(&router->binlog_lock); router->binlog_position = hdr->next_pos; router->last_written = hdr->next_pos - hdr->event_size; + spinlock_release(&router->binlog_lock); } /** @@ -308,7 +314,7 @@ struct stat statb; filelen = statb.st_size; if (pos >= filelen) { - LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, + LOGIF(LD, (skygw_log_write(LOGFILE_ERROR, "Attempting to read off the end of the binlog file %s, " "event at %lu.", file->binlogname, pos))); return NULL; diff --git a/server/modules/routing/binlog/blr_master.c b/server/modules/routing/binlog/blr_master.c index 6d1d7edfd..241364373 100644 --- a/server/modules/routing/binlog/blr_master.c +++ b/server/modules/routing/binlog/blr_master.c @@ -695,6 +695,7 @@ static REP_HEADER phdr; } else if (hdr.flags != LOG_EVENT_ARTIFICIAL_F) { + router->rotating = 1; ptr = ptr + 5; // We don't put the first byte of the payload // into the binlog file blr_write_binlog_record(router, &hdr, ptr); @@ -717,6 +718,7 @@ static REP_HEADER phdr; hdr.event_size, router->binlog_name, router->binlog_position))); + router->rotating = 1; ptr += 5; if (hdr.event_type == ROTATE_EVENT) { @@ -852,6 +854,7 @@ char file[BINLOG_FNAMELEN+1]; router->stats.n_rotates++; blr_file_rotate(router, file, pos); } + router->rotating = 0; } /** @@ -943,9 +946,9 @@ int action; { /* * The slave should be up to date, check that the binlog - * position matches the event we have to distribute or this - * is a rotate event. Send the event directly from memory to - * the slave. + * position matches the event we have to distribute or + * this is a rotate event. Send the event directly from + * memory to the slave. */ pkt = gwbuf_alloc(hdr->event_size + 5); buf = GWBUF_DATA(pkt); @@ -976,6 +979,17 @@ int action; } spinlock_release(&slave->catch_lock); } + else if (slave->binlog_pos == hdr->next_pos + && strcmp(slave->binlogfile, router->binlog_name) == 0) + { + /* + * Slave has already read record from file, no + * need to distrbute this event + */ + spinlock_acquire(&slave->catch_lock); + slave->cstate &= ~CS_BUSY; + spinlock_release(&slave->catch_lock); + } else if ((slave->binlog_pos > hdr->next_pos - hdr->event_size) && strcmp(slave->binlogfile, router->binlog_name) == 0) { diff --git a/server/modules/routing/binlog/blr_slave.c b/server/modules/routing/binlog/blr_slave.c index 487661596..880d2879f 100644 --- a/server/modules/routing/binlog/blr_slave.c +++ b/server/modules/routing/binlog/blr_slave.c @@ -775,9 +775,13 @@ unsigned long beat; strcmp(slave->binlogfile, router->binlog_name) == 0) { int state_change = 0; + spinlock_acquire(&router->binlog_lock); spinlock_acquire(&slave->catch_lock); - /* Now check again since we hold the slave->catch_lock. */ + /* + * Now check again since we hold the router->binlog_lock + * and slave->catch_lock. + */ if (slave->binlog_pos != router->binlog_position || strcmp(slave->binlogfile, router->binlog_name) != 0) { @@ -799,6 +803,7 @@ unsigned long beat; memlog_log(slave->clog, 5); } spinlock_release(&slave->catch_lock); + spinlock_release(&router->binlog_lock); if (state_change) { @@ -809,10 +814,15 @@ unsigned long beat; } else { - if (strcmp(router->binlog_name, slave->binlogfile) != 0) + if (router->rotating != 0 && strcmp(router->binlog_name, slave->binlogfile) != 0) { /* We may have reached the end of file of a non-current * binlog file. + * + * Note if the master is rotating there is a window during + * whch the rotate event has been written to the old binlog + * but the new binlog file has not yet been created. Therefore + * we ignore these issues during the rotate processing. */ LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, "Slave reached end of file for binlong file %s "