From fe0e7c74d050c33104efb3302eca04d4529a87f1 Mon Sep 17 00:00:00 2001 From: Mark Riddoch Date: Tue, 14 Oct 2014 11:43:08 +0100 Subject: [PATCH] Resolve transition from catchup to follow mode. --- server/modules/include/blr.h | 14 +- server/modules/routing/binlog/blr.c | 8 + server/modules/routing/binlog/blr_file.c | 160 +++++++++++++++++--- server/modules/routing/binlog/blr_master.c | 163 +++++++++++---------- server/modules/routing/binlog/blr_slave.c | 144 ++++++++++-------- 5 files changed, 325 insertions(+), 164 deletions(-) diff --git a/server/modules/include/blr.h b/server/modules/include/blr.h index 84240675e..8715e1e13 100644 --- a/server/modules/include/blr.h +++ b/server/modules/include/blr.h @@ -33,6 +33,8 @@ #include #include +#include + #define BINLOG_FNAMELEN 16 #define BLR_PROTOCOL "MySQLBackend" #define BINLOG_MAGIC { 0xfe, 0x62, 0x69, 0x6e } @@ -43,8 +45,6 @@ #define BLR_STATS_FREQ 60 #define BLR_NSTATS_MINUTES 30 -#define QUEUE_SLAVE 1 - /** * High and Low water marks for the slave dcb. These values can be overriden * by the router options highwater and lowwater. @@ -166,6 +166,7 @@ typedef struct router_slave { *router; /*< Pointer to the owning router */ struct router_slave *next; SLAVE_STATS stats; /*< Slave statistics */ + MEMLOG *clog; #if defined(SS_DEBUG) skygw_chk_t rses_chk_tail; #endif @@ -245,6 +246,8 @@ typedef struct router_instance { int binlog_fd; /*< File descriptor of the binlog * file being written */ + uint64_t last_written; /*< Position of last event written */ + char prevbinlog[BINLOG_FNAMELEN+1]; BLFILE *files; /*< Files used by the slaves */ SPINLOCK fileslock; /*< Lock for the files queue above */ unsigned int low_water; /*< Low water mark for client DCB */ @@ -291,11 +294,12 @@ static char *blrm_states[] = { "Unconnected", "Authenticated", "Timestamp retrie #define BLRS_UNREGISTERED 0x0001 #define BLRS_REGISTERED 0x0002 #define BLRS_DUMPING 0x0003 +#define BLRS_ERRORED 0x0004 -#define BLRS_MAXSTATE 0x0003 +#define BLRS_MAXSTATE 0x0004 static char *blrs_states[] = { "Created", "Unregistered", "Registered", - "Sending binlogs" }; + "Sending binlogs", "Errored" }; /** * Slave catch-up status @@ -406,6 +410,6 @@ extern void blr_write_binlog_record(ROUTER_INSTANCE *, REP_HEADER *,uint8_t *); extern void blr_file_rotate(ROUTER_INSTANCE *, char *, uint64_t); extern void blr_file_flush(ROUTER_INSTANCE *); extern BLFILE *blr_open_binlog(ROUTER_INSTANCE *, char *); -extern GWBUF *blr_read_binlog(BLFILE *, unsigned int, REP_HEADER *); +extern GWBUF *blr_read_binlog(ROUTER_INSTANCE *, BLFILE *, unsigned int, REP_HEADER *); extern void blr_close_binlog(ROUTER_INSTANCE *, BLFILE *); #endif diff --git a/server/modules/routing/binlog/blr.c b/server/modules/routing/binlog/blr.c index d9659ddd2..c088c89d8 100644 --- a/server/modules/routing/binlog/blr.c +++ b/server/modules/routing/binlog/blr.c @@ -382,6 +382,14 @@ ROUTER_SLAVE *slave; slave->dcb = session->client; slave->router = inst; slave->file = NULL; + strcpy(slave->binlogfile, "unassigned"); + { + char buf[1000]; + sprintf(buf, "Slave History %x", slave); + slave->clog = memlog_create(buf, ML_INT, 2000); + if (slave->clog) + memlog_set(slave->clog, MLNOAUTOFLUSH); + } /** * Add this session to the list of active sessions. diff --git a/server/modules/routing/binlog/blr_file.c b/server/modules/routing/binlog/blr_file.c index 8bab32fae..1ef52968a 100644 --- a/server/modules/routing/binlog/blr_file.c +++ b/server/modules/routing/binlog/blr_file.c @@ -56,6 +56,7 @@ extern int lm_enabled_logfiles_bitmask; static void blr_file_create(ROUTER_INSTANCE *router, char *file); static void blr_file_append(ROUTER_INSTANCE *router, char *file); static uint32_t extract_field(uint8_t *src, int bits); +static void blr_log_header(logfile_id_t file, char *msg, uint8_t *ptr); /** * Initialise the binlog file for this instance. MaxScale will look @@ -163,7 +164,7 @@ unsigned char magic[] = BINLOG_MAGIC; else { LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, - "Failed to create binlog file %s\n", path))); + "Failed to create binlog file %s", path))); } fsync(fd); close(router->binlog_fd); @@ -192,7 +193,7 @@ int fd; if ((fd = open(path, O_RDWR|O_APPEND, 0666)) == -1) { LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, - "Failed to open binlog file %s for append.\n", + "Failed to open binlog file %s for append.", path))); return; } @@ -215,6 +216,7 @@ blr_write_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *buf) { pwrite(router->binlog_fd, buf, hdr->event_size, hdr->next_pos - hdr->event_size); router->binlog_position = hdr->next_pos; + router->last_written = hdr->next_pos - hdr->event_size; } /** @@ -270,7 +272,7 @@ BLFILE *file; if ((file->fd = open(path, O_RDONLY, 0666)) == -1) { LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, - "Failed to open binlog file %s\n", path))); + "Failed to open binlog file %s", path))); free(file); spinlock_release(&router->fileslock); return NULL; @@ -286,18 +288,38 @@ BLFILE *file; /** * Read a replication event into a GWBUF structure. * - * @param file File record - * @param pos Position of binlog record to read - * @param hdr Binlog header to populate - * @return The binlog record wrapped in a GWBUF structure + * @param router The router instance + * @param file File record + * @param pos Position of binlog record to read + * @param hdr Binlog header to populate + * @return The binlog record wrapped in a GWBUF structure */ GWBUF * -blr_read_binlog(BLFILE *file, unsigned int pos, REP_HEADER *hdr) +blr_read_binlog(ROUTER_INSTANCE *router, BLFILE *file, unsigned int pos, REP_HEADER *hdr) { uint8_t hdbuf[19]; GWBUF *result; unsigned char *data; int n; +unsigned long filelen = 0; +struct stat statb; + + if (fstat(file->fd, &statb) == 0) + filelen = statb.st_size; + if (pos >= filelen) + { + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, + "Attempting to read off the end of the binlog file %s, " + "event at %lu.", file->binlogname, pos))); + return NULL; + } + + if (strcmp(router->binlog_name, file->binlogname) == 0 && + pos >= router->binlog_position) + { + return NULL; + } + /* Read the header information from the file */ if ((n = pread(file->fd, hdbuf, 19, pos)) != 19) @@ -306,20 +328,25 @@ int n; { case 0: LOGIF(LD, (skygw_log_write(LOGFILE_DEBUG, - "Reached end of binlog file at %d.\n", + "Reached end of binlog file at %d.", pos))); break; case -1: LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, "Failed to read binlog file %s at position %d" - " (%s).\n", file->binlogname, + " (%s).", file->binlogname, pos, strerror(errno)))); + if (errno == EBADF) + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, + "Bad file descriptor in read binlog for file %s" + ", reference count is %d, descriptor %d.", + file->binlogname, file->refcnt, file->fd))); break; default: LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, "Short read when reading the header. " - "Expected 19 bytes got %d bytes." - "Binlog file is %s, position %d\n", + "Expected 19 bytes but got %d bytes. " + "Binlog file is %s, position %d", file->binlogname, pos, n))); break; } @@ -328,14 +355,73 @@ int n; hdr->timestamp = EXTRACT32(hdbuf); hdr->event_type = hdbuf[4]; hdr->serverid = EXTRACT32(&hdbuf[5]); - hdr->event_size = EXTRACT32(&hdbuf[9]); + hdr->event_size = extract_field(&hdbuf[9], 32); hdr->next_pos = EXTRACT32(&hdbuf[13]); hdr->flags = EXTRACT16(&hdbuf[17]); + if (hdr->next_pos < pos && hdr->event_type != ROTATE_EVENT) + { + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, + "Next position in header appears to be incorrect " + "rereading event header at pos %ul in file %s, " + "file size is %ul. Master will write %ul in %s next.", + pos, file->binlogname, filelen, router->binlog_position, + router->binlog_name))); + if ((n = pread(file->fd, hdbuf, 19, pos)) != 19) + { + switch (n) + { + case 0: + LOGIF(LD, (skygw_log_write(LOGFILE_DEBUG, + "Reached end of binlog file at %d.", + pos))); + break; + case -1: + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, + "Failed to read binlog file %s at position %d" + " (%s).", file->binlogname, + pos, strerror(errno)))); + if (errno == EBADF) + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, + "Bad file descriptor in read binlog for file %s" + ", reference count is %d, descriptor %d.", + file->binlogname, file->refcnt, file->fd))); + break; + default: + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, + "Short read when reading the header. " + "Expected 19 bytes but got %d bytes. " + "Binlog file is %s, position %d", + file->binlogname, pos, n))); + break; + } + return NULL; + } + hdr->timestamp = EXTRACT32(hdbuf); + hdr->event_type = hdbuf[4]; + hdr->serverid = EXTRACT32(&hdbuf[5]); + hdr->event_size = extract_field(&hdbuf[9], 32); + hdr->next_pos = EXTRACT32(&hdbuf[13]); + hdr->flags = EXTRACT16(&hdbuf[17]); + + if (hdr->next_pos < pos && hdr->event_type != ROTATE_EVENT) + { + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, + "Next position still incorrect after " + "rereading"))); + return NULL; + } + else + { + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, + "Next position corrected by " + "rereading"))); + } + } if ((result = gwbuf_alloc(hdr->event_size)) == NULL) { LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, "Failed to allocate memory for binlog entry, " - "size %d at %d.\n", + "size %d at %d.", hdr->event_size, pos))); return NULL; } @@ -344,10 +430,29 @@ int n; if ((n = pread(file->fd, &data[19], hdr->event_size - 19, pos + 19)) != hdr->event_size - 19) // Read the balance { - LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, - "Short read when reading the event at %d. " - "Expected %d bytes got %d bytes.\n", - pos, n))); + if (n == -1) + { + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, + "Error reading the event at %ld in %s. " + "%s, expected %d bytes.", + pos, file->binlogname, + strerror(errno), hdr->event_size - 19))); + } + else + { + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, + "Short read when reading the event at %ld in %s. " + "Expected %d bytes got %d bytes.", + pos, file->binlogname, hdr->event_size - 19, n))); + if (filelen != 0 && filelen - pos < hdr->event_size) + { + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, + "Binlog event is close to the end of the binlog file, " + "current file size is %u.", + filelen))); + } + blr_log_header(LOGFILE_ERROR, "Possible malformed event header", hdbuf); + } gwbuf_consume(result, hdr->event_size); return NULL; } @@ -372,7 +477,7 @@ blr_close_binlog(ROUTER_INSTANCE *router, BLFILE *file) { spinlock_acquire(&router->fileslock); if (router->files == file) - router->files = file; + router->files = file->next; else { BLFILE *ptr = router->files; @@ -384,8 +489,11 @@ blr_close_binlog(ROUTER_INSTANCE *router, BLFILE *file) spinlock_release(&router->fileslock); close(file->fd); + file->fd = -1; } spinlock_release(&file->lock); + if (file->refcnt == 0) + free(file); } /** @@ -407,3 +515,17 @@ uint32_t rval = 0, shift = 0; } return rval; } + +static void +blr_log_header(logfile_id_t file, char *msg, uint8_t *ptr) +{ +char buf[400], *bufp; +int i; + + bufp = buf; + bufp += sprintf(bufp, "%s: ", msg); + for (i = 0; i < 19; i++) + bufp += sprintf(bufp, "0x%02x ", ptr[i]); + skygw_log_write_flush(file, "%s", buf); + +} diff --git a/server/modules/routing/binlog/blr_master.c b/server/modules/routing/binlog/blr_master.c index 395d10ba3..6d1d7edfd 100644 --- a/server/modules/routing/binlog/blr_master.c +++ b/server/modules/routing/binlog/blr_master.c @@ -93,7 +93,7 @@ GWBUF *buf; if ((client = dcb_alloc(DCB_ROLE_INTERNAL)) == NULL) { LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, - "Binlog router: failed to create DCB for dummy client\n"))); + "Binlog router: failed to create DCB for dummy client"))); return; } router->client = client; @@ -101,14 +101,14 @@ GWBUF *buf; if ((router->session = session_alloc(router->service, client)) == NULL) { LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, - "Binlog router: failed to create session for connection to master\n"))); + "Binlog router: failed to create session for connection to master"))); return; } client->session = router->session; if ((router->master = dcb_connect(router->service->databases, router->session, BLR_PROTOCOL)) == NULL) { LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, - "Binlog router: failed to connect to master server '%s'\n", + "Binlog router: failed to connect to master server '%s'", router->service->databases->unique_name))); return; } @@ -219,7 +219,7 @@ char query[128]; if (router->master_state < 0 || router->master_state > BLRM_MAXSTATE) { LOGIF(LE, (skygw_log_write( - LOGFILE_ERROR, "Invalid master state machine state (%d) for binlog router.\n", + LOGFILE_ERROR, "Invalid master state machine state (%d) for binlog router.", router->master_state))); gwbuf_consume(buf, gwbuf_length(buf)); spinlock_acquire(&router->lock); @@ -241,7 +241,7 @@ char query[128]; { LOGIF(LE, (skygw_log_write( LOGFILE_ERROR, - "Received error: %d, %s from master during %s phase of the master state machine.\n", + "Received error: %d, %s from master during %s phase of the master state machine.", MYSQL_ERROR_CODE(buf), MYSQL_ERROR_MSG(buf), blrm_states[router->master_state] ))); gwbuf_consume(buf, gwbuf_length(buf)); @@ -552,7 +552,7 @@ static REP_HEADER phdr; LOGIF(LE,(skygw_log_write( LOGFILE_ERROR, "Insufficient memory to buffer event " - "of %d bytes. Binlog %s @ %d\n.", + "of %d bytes. Binlog %s @ %d.", len, router->binlog_name, router->binlog_position))); break; @@ -577,7 +577,7 @@ static REP_HEADER phdr; LOGFILE_ERROR, "Expected entire message in buffer " "chain, but failed to create complete " - "message as expected. %s @ %d\n", + "message as expected. %s @ %d", router->binlog_name, router->binlog_position))); free(msg); @@ -598,7 +598,7 @@ static REP_HEADER phdr; router->stats.n_residuals++; LOGIF(LD,(skygw_log_write( LOGFILE_DEBUG, - "Residual data left after %d records. %s @ %d\n", + "Residual data left after %d records. %s @ %d", router->stats.n_binlogs, router->binlog_name, router->binlog_position))); break; @@ -650,7 +650,7 @@ static REP_HEADER phdr; // #define SHOW_EVENTS #ifdef SHOW_EVENTS - printf("blr: event type 0x%02x, flags 0x%04x, event size %d\n", hdr.event_type, hdr.flags, hdr.event_size); + printf("blr: event type 0x%02x, flags 0x%04x, event size %d", hdr.event_type, hdr.flags, hdr.event_size); #endif if (hdr.event_type >= 0 && hdr.event_type < 0x24) router->stats.events[hdr.event_type]++; @@ -659,7 +659,7 @@ static REP_HEADER phdr; // Fake format description message LOGIF(LD,(skygw_log_write(LOGFILE_DEBUG, "Replication fake event. " - "Binlog %s @ %d.\n", + "Binlog %s @ %d.", router->binlog_name, router->binlog_position))); router->stats.n_fakeevents++; @@ -688,7 +688,7 @@ static REP_HEADER phdr; LOGIF(LD,(skygw_log_write( LOGFILE_DEBUG, "Replication heartbeat. " - "Binlog %s @ %d.\n", + "Binlog %s @ %d.", router->binlog_name, router->binlog_position))); router->stats.n_heartbeats++; @@ -712,7 +712,7 @@ static REP_HEADER phdr; "Artificial event not written " "to disk or distributed. " "Type 0x%x, Length %d, Binlog " - "%s @ %d\n.", + "%s @ %d.", hdr.event_type, hdr.event_size, router->binlog_name, @@ -729,7 +729,7 @@ static REP_HEADER phdr; { printf("Binlog router error: %s\n", &ptr[7]); LOGIF(LE,(skygw_log_write(LOGFILE_ERROR, - "Error packet in binlog stream.%s @ %d\n.", + "Error packet in binlog stream.%s @ %d.", router->binlog_name, router->binlog_position))); blr_log_packet(LOGFILE_ERROR, "Error Packet:", @@ -846,6 +846,7 @@ char file[BINLOG_FNAMELEN+1]; printf("New file: %s @ %ld\n", file, pos); #endif + strcpy(router->prevbinlog, router->binlog_name); if (strncmp(router->binlog_name, file, slen) != 0) { router->stats.n_rotates++; @@ -905,18 +906,22 @@ int action; continue; } spinlock_acquire(&slave->catch_lock); - if ((slave->cstate & (CS_UPTODATE|CS_DIST)) == CS_UPTODATE) + if ((slave->cstate & (CS_UPTODATE|CS_BUSY)) == CS_UPTODATE) { - /* Slave is up to date with the binlog and no distribute is - * running on this slave. + /* + * This slave is reporting it is to date with the binlog of the + * master running on this slave. + * It has no thread running currently that is sending binlog + * events. */ action = 1; - slave->cstate |= CS_DIST; + slave->cstate |= CS_BUSY; } - else if ((slave->cstate & (CS_UPTODATE|CS_DIST)) == (CS_UPTODATE|CS_DIST)) + else if ((slave->cstate & (CS_UPTODATE|CS_BUSY)) == (CS_UPTODATE|CS_BUSY)) { - /* Slave is up to date with the binlog and a distribute is - * running on this slave. + /* + * The slave is up to date with the binlog and a process is + * running on this slave to send binlog events. */ slave->overrun = 1; action = 2; @@ -928,12 +933,20 @@ int action; } slave->stats.n_actions[action-1]++; spinlock_release(&slave->catch_lock); + if (action == 1) { - if ((slave->binlog_pos == hdr->next_pos - hdr->event_size) - && (strcmp(slave->binlogfile, router->binlog_name) == 0 || - hdr->event_type == ROTATE_EVENT)) + if (slave->binlog_pos == router->last_written && + (strcmp(slave->binlogfile, router->binlog_name) == 0 || + (hdr->event_type == ROTATE_EVENT && + strcmp(slave->binlogfile, router->prevbinlog)))) { + /* + * The slave should be up to date, check that the binlog + * position matches the event we have to distribute or this + * is a rotate event. Send the event directly from memory to + * the slave. + */ pkt = gwbuf_alloc(hdr->event_size + 5); buf = GWBUF_DATA(pkt); encode_value(buf, hdr->event_size + 1, 24); @@ -955,84 +968,82 @@ int action; { slave->stats.n_overrun++; slave->overrun = 0; -#if QUEUE_SLAVE poll_fake_write_event(slave->dcb); -#else - spinlock_release(&router->lock); - slave->cstate &= ~(CS_UPTODATE|CS_DIST); - spinlock_release(&slave->catch_lock); - blr_slave_catchup(router, slave, false); - spinlock_acquire(&router->lock); - slave = router->slaves; - if (slave) - continue; - else - break; -#endif } else { - slave->cstate &= ~CS_DIST; + slave->cstate &= ~CS_BUSY; } spinlock_release(&slave->catch_lock); } else if ((slave->binlog_pos > hdr->next_pos - hdr->event_size) && strcmp(slave->binlogfile, router->binlog_name) == 0) { + /* + * The slave is ahead of the master, this should never + * happen. Force the slave to catchup mode in order to + * try to resolve the issue. + */ LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, "Slave %d is ahead of expected position %s@%d. " "Expected position %d", slave->serverid, slave->binlogfile, slave->binlog_pos, hdr->next_pos - hdr->event_size))); + spinlock_acquire(&slave->catch_lock); + slave->cstate &= ~(CS_UPTODATE|CS_BUSY); + slave->cstate |= CS_EXPECTCB; + spinlock_release(&slave->catch_lock); + memlog_log(slave->clog, 22); + memlog_flush(slave->clog); + poll_fake_write_event(slave->dcb); } - else if ((hdr->event_type != ROTATE_EVENT) - && (slave->binlog_pos != hdr->next_pos - hdr->event_size || - strcmp(slave->binlogfile, router->binlog_name) != 0)) + else { - /* Check slave is in catchup mode and if not - * force it to go into catchup mode. + /* + * The slave is not at the position it should be. Force it into + * catchup mode rather than send this event. */ - if (slave->cstate & CS_UPTODATE) - { -#if QUEUE_SLAVE - poll_fake_write_event(slave->dcb); -#else - nextslave = slave->next; - spinlock_release(&router->lock); - LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG, - "Force slave %d into catchup mode %s@%d\n", - slave->serverid, slave->binlogfile, - slave->binlog_pos))); - spinlock_acquire(&slave->catch_lock); - slave->cstate &= ~(CS_UPTODATE|CS_DIST); - spinlock_release(&slave->catch_lock); - blr_slave_catchup(router, slave, false); - spinlock_acquire(&router->lock); - slave = router->slaves; - if (slave) - { - while (slave && slave != nextslave) - slave = slave->next; - if (slave) - continue; - else - break; - } - else - { - break; - } -#endif - } + spinlock_acquire(&slave->catch_lock); + slave->cstate &= ~(CS_UPTODATE|CS_BUSY); + slave->cstate |= CS_EXPECTCB; + spinlock_release(&slave->catch_lock); + memlog_log(slave->clog, 21); + memlog_flush(slave->clog); + poll_fake_write_event(slave->dcb); } } + else if (action == 3) + { + /* Slave is not up to date + * Check if it is either expecting a callback or + * is busy processing a callback + */ + spinlock_acquire(&slave->catch_lock); + if ((slave->cstate & (CS_EXPECTCB|CS_BUSY)) == 0) + { + slave->cstate |= CS_EXPECTCB; + spinlock_release(&slave->catch_lock); + memlog_log(slave->clog, 20); + poll_fake_write_event(slave->dcb); + } + else + spinlock_release(&slave->catch_lock); + } slave = slave->next; } spinlock_release(&router->lock); } +/** + * Write a raw event (the first 40 bytes at most) to a log file + * + * @param file The logfile to write to + * @param msg A textual message to write before the packet + * @param ptr Pointer to the message buffer + * @param len Length of message packet + */ static void blr_log_packet(logfile_id_t file, char *msg, uint8_t *ptr, int len) { @@ -1044,8 +1055,8 @@ int i; for (i = 0; i < len && i < 40; i++) bufp += sprintf(bufp, "0x%02x ", ptr[i]); if (i < len) - skygw_log_write_flush(file, "%s...\n", buf); + skygw_log_write_flush(file, "%s...", buf); else - skygw_log_write_flush(file, "%s\n", buf); + skygw_log_write_flush(file, "%s", buf); } diff --git a/server/modules/routing/binlog/blr_slave.c b/server/modules/routing/binlog/blr_slave.c index d7c691892..487661596 100644 --- a/server/modules/routing/binlog/blr_slave.c +++ b/server/modules/routing/binlog/blr_slave.c @@ -88,7 +88,7 @@ blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue) if (slave->state < 0 || slave->state > BLRS_MAXSTATE) { LOGIF(LE, (skygw_log_write( - LOGFILE_ERROR, "Invalid slave state machine state (%d) for binlog router.\n", + LOGFILE_ERROR, "Invalid slave state machine state (%d) for binlog router.", slave->state))); gwbuf_consume(queue, gwbuf_length(queue)); return 0; @@ -108,13 +108,13 @@ blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue) break; case COM_QUIT: LOGIF(LD, (skygw_log_write(LOGFILE_DEBUG, - "COM_QUIT received from slave with server_id %d\n", + "COM_QUIT received from slave with server_id %d", slave->serverid))); break; default: LOGIF(LE, (skygw_log_write( LOGFILE_ERROR, - "Unexpected MySQL Command (%d) received from slave\n", + "Unexpected MySQL Command (%d) received from slave", MYSQL_COMMAND(queue)))); break; } @@ -165,7 +165,7 @@ int query_len; query_text = strndup(qtext, query_len); LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, "Execute statement from the slave '%s'\n", query_text))); + LOGFILE_TRACE, "Execute statement from the slave '%s'", query_text))); /* * Implement a very rudimental "parsing" of the query text by extarcting the * words from the statement and matchng them against the subset of queries we @@ -268,7 +268,7 @@ int query_len; query_text = strndup(qtext, query_len); LOGIF(LE, (skygw_log_write( - LOGFILE_ERROR, "Unexpected query from slave server %s\n", query_text))); + LOGFILE_ERROR, "Unexpected query from slave server %s", query_text))); free(query_text); blr_slave_send_error(router, slave, "Unexpected SQL query received from slave."); return 0; @@ -299,7 +299,7 @@ GWBUF *clone; else { LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, - "Failed to clone server response to send to slave.\n"))); + "Failed to clone server response to send to slave."))); return 0; } } @@ -485,7 +485,7 @@ uint32_t chksum; { LOGIF(LE, (skygw_log_write( LOGFILE_ERROR, - "blr_slave_binlog_dump expected a COM_BINLOG_DUMP but received %d\n", + "blr_slave_binlog_dump expected a COM_BINLOG_DUMP but received %d", *(ptr-1)))); return 0; } @@ -499,7 +499,6 @@ uint32_t chksum; strncpy(slave->binlogfile, (char *)ptr, binlognamelen); slave->binlogfile[binlognamelen] = 0; - slave->state = BLRS_DUMPING; slave->seqno = 1; if (slave->nocrc) @@ -521,8 +520,8 @@ uint32_t chksum; ptr = blr_build_header(resp, &hdr); encode_value(ptr, slave->binlog_pos, 64); ptr += 8; - memcpy(ptr, slave->binlogfile, BINLOG_FNAMELEN); - ptr += BINLOG_FNAMELEN; + memcpy(ptr, slave->binlogfile, binlognamelen); + ptr += binlognamelen; if (!slave->nocrc) { @@ -568,22 +567,18 @@ uint32_t chksum; slave->dcb->low_water = router->low_water; slave->dcb->high_water = router->high_water; -// dcb_add_callback(slave->dcb, DCB_REASON_LOW_WATER, blr_slave_callback, slave); dcb_add_callback(slave->dcb, DCB_REASON_DRAINED, blr_slave_callback, slave); + slave->state = BLRS_DUMPING; if (slave->binlog_pos != router->binlog_position || strcmp(slave->binlogfile, router->binlog_name) != 0) { spinlock_acquire(&slave->catch_lock); slave->cstate &= ~CS_UPTODATE; + slave->cstate |= CS_EXPECTCB; spinlock_release(&slave->catch_lock); -#if QUEUE_SLAVE poll_fake_write_event(slave->dcb); -#else - rval = blr_slave_catchup(router, slave, true); -#endif } - return rval; } @@ -705,34 +700,14 @@ unsigned long beat; else burst = router->short_burst; spinlock_acquire(&slave->catch_lock); -#if QUEUE_SLAVE if (slave->cstate & CS_BUSY) + { + spinlock_release(&slave->catch_lock); + memlog_log(slave->clog, 1); return 0; - slave->cstate = CS_BUSY; - spinlock_release(&slave->catch_lock); -#else - while ((slave->cstate & (CS_HOLD|CS_BUSY)) == (CS_HOLD|CS_BUSY)) - { - spinlock_release(&slave->catch_lock); - req.tv_sec = 0; - req.tv_nsec = 100; - nanosleep(&req, NULL); - spinlock_acquire(&slave->catch_lock); - } - if (slave->cstate & CS_BUSY) - { - spinlock_release(&slave->catch_lock); -if (hkheartbeat - beat > 5) LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, - "Long wait in blr_salve_catchup %ld00ms with %s burst, return without write records.\n", -hkheartbeat - beat, large ? "long" : "short"))); - return; } slave->cstate |= CS_BUSY; spinlock_release(&slave->catch_lock); -if (hkheartbeat - beat > 5) LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, - "Long wait in blr_slave_catchup %ld00ms with %s burst.\n", -hkheartbeat - beat, large ? "long" : "short"))); -#endif if (slave->file == NULL) { @@ -740,21 +715,18 @@ hkheartbeat - beat, large ? "long" : "short"))); { LOGIF(LE, (skygw_log_write( LOGFILE_ERROR, - "blr_slave_catchup failed to open binlog file %s\n", + "blr_slave_catchup failed to open binlog file %s", slave->binlogfile))); + slave->cstate &= ~CS_BUSY; + slave->state = BLRS_ERRORED; + memlog_log(slave->clog, 2); return 0; } } slave->stats.n_bursts++; while (burst-- && - (record = blr_read_binlog(slave->file, slave->binlog_pos, &hdr)) != NULL) + (record = blr_read_binlog(router, slave->file, slave->binlog_pos, &hdr)) != NULL) { -#if QUEUE_SLAVE -#else - spinlock_acquire(&slave->catch_lock); - slave->cstate &= ~CS_HOLD; - spinlock_release(&slave->catch_lock); -#endif head = gwbuf_alloc(5); ptr = GWBUF_DATA(head); encode_value(ptr, hdr.event_size + 1, 24); @@ -770,8 +742,9 @@ hkheartbeat - beat, large ? "long" : "short"))); { LOGIF(LE, (skygw_log_write( LOGFILE_ERROR, - "blr_slave_catchup failed to open binlog file %s\n", + "blr_slave_catchup failed to open binlog file %s", slave->binlogfile))); + slave->state = BLRS_ERRORED; break; } } @@ -782,12 +755,6 @@ hkheartbeat - beat, large ? "long" : "short"))); } rval = written; slave->stats.n_events++; -#if QUEUE_SLAVE -#else - spinlock_acquire(&slave->catch_lock); - slave->cstate |= CS_HOLD; - spinlock_release(&slave->catch_lock); -#endif } if (record == NULL) slave->stats.n_failed_read++; @@ -801,23 +768,67 @@ hkheartbeat - beat, large ? "long" : "short"))); spinlock_acquire(&slave->catch_lock); slave->cstate |= CS_EXPECTCB; spinlock_release(&slave->catch_lock); + memlog_log(slave->clog, 3); poll_fake_write_event(slave->dcb); } - else + else if (slave->binlog_pos == router->binlog_position && + strcmp(slave->binlogfile, router->binlog_name) == 0) { int state_change = 0; spinlock_acquire(&slave->catch_lock); - if ((slave->cstate & CS_UPTODATE) == 0) + + /* Now check again since we hold the slave->catch_lock. */ + if (slave->binlog_pos != router->binlog_position || + strcmp(slave->binlogfile, router->binlog_name) != 0) { - slave->stats.n_upd++; - slave->cstate |= CS_UPTODATE; - state_change = 1; + slave->cstate &= ~CS_UPTODATE; + slave->cstate |= CS_EXPECTCB; + memlog_log(slave->clog, 30); + poll_fake_write_event(slave->dcb); + } + else + { + if ((slave->cstate & CS_UPTODATE) == 0) + { + slave->stats.n_upd++; + slave->cstate |= CS_UPTODATE; + state_change = 1; + memlog_log(slave->clog, 4); + } + else + memlog_log(slave->clog, 5); } spinlock_release(&slave->catch_lock); + if (state_change) + { LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE, - "blr_slave_catchup slave is up to date %s, %u\n", + "blr_slave_catchup slave is up to date %s, %u.", slave->binlogfile, slave->binlog_pos))); + } + } + else + { + if (strcmp(router->binlog_name, slave->binlogfile) != 0) + { + /* We may have reached the end of file of a non-current + * binlog file. + */ + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, + "Slave reached end of file for binlong file %s " + "which is not the file currently being downloaded.", + slave->binlogfile))); + slave->state = BLRS_ERRORED; + memlog_log(slave->clog, 6); + } + else + { + spinlock_acquire(&slave->catch_lock); + slave->cstate |= CS_EXPECTCB; + spinlock_release(&slave->catch_lock); + memlog_log(slave->clog, 7); + poll_fake_write_event(slave->dcb); + } } return rval; } @@ -840,16 +851,21 @@ ROUTER_INSTANCE *router = slave->router; if (reason == DCB_REASON_DRAINED) { - if (slave->state == BLRS_DUMPING && - (slave->binlog_pos != router->binlog_position - || strcmp(slave->binlogfile, router->binlog_name))) + if (slave->state == BLRS_DUMPING) { spinlock_acquire(&slave->catch_lock); - slave->cstate &= ~CS_UPTODATE; + slave->cstate &= ~(CS_UPTODATE|CS_EXPECTCB); spinlock_release(&slave->catch_lock); slave->stats.n_dcb++; blr_slave_catchup(router, slave, true); } + else + { + LOGIF(LM, (skygw_log_write( + LOGFILE_MESSAGE, "Ignored callback due to slave state %s", + blrs_states[slave->state]))); + memlog_log(slave->clog, 8); + } } if (reason == DCB_REASON_LOW_WATER)