Resolve transition from catchup to follow mode.

This commit is contained in:
Mark Riddoch
2014-10-14 11:43:08 +01:00
parent 52e8b33422
commit fe0e7c74d0
5 changed files with 325 additions and 164 deletions

View File

@ -56,6 +56,7 @@ extern int lm_enabled_logfiles_bitmask;
static void blr_file_create(ROUTER_INSTANCE *router, char *file);
static void blr_file_append(ROUTER_INSTANCE *router, char *file);
static uint32_t extract_field(uint8_t *src, int bits);
static void blr_log_header(logfile_id_t file, char *msg, uint8_t *ptr);
/**
* Initialise the binlog file for this instance. MaxScale will look
@ -163,7 +164,7 @@ unsigned char magic[] = BINLOG_MAGIC;
else
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Failed to create binlog file %s\n", path)));
"Failed to create binlog file %s", path)));
}
fsync(fd);
close(router->binlog_fd);
@ -192,7 +193,7 @@ int fd;
if ((fd = open(path, O_RDWR|O_APPEND, 0666)) == -1)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Failed to open binlog file %s for append.\n",
"Failed to open binlog file %s for append.",
path)));
return;
}
@ -215,6 +216,7 @@ blr_write_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *buf)
{
pwrite(router->binlog_fd, buf, hdr->event_size, hdr->next_pos - hdr->event_size);
router->binlog_position = hdr->next_pos;
router->last_written = hdr->next_pos - hdr->event_size;
}
/**
@ -270,7 +272,7 @@ BLFILE *file;
if ((file->fd = open(path, O_RDONLY, 0666)) == -1)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Failed to open binlog file %s\n", path)));
"Failed to open binlog file %s", path)));
free(file);
spinlock_release(&router->fileslock);
return NULL;
@ -286,18 +288,38 @@ BLFILE *file;
/**
* Read a replication event into a GWBUF structure.
*
* @param file File record
* @param pos Position of binlog record to read
* @param hdr Binlog header to populate
* @return The binlog record wrapped in a GWBUF structure
* @param router The router instance
* @param file File record
* @param pos Position of binlog record to read
* @param hdr Binlog header to populate
* @return The binlog record wrapped in a GWBUF structure
*/
GWBUF *
blr_read_binlog(BLFILE *file, unsigned int pos, REP_HEADER *hdr)
blr_read_binlog(ROUTER_INSTANCE *router, BLFILE *file, unsigned int pos, REP_HEADER *hdr)
{
uint8_t hdbuf[19];
GWBUF *result;
unsigned char *data;
int n;
unsigned long filelen = 0;
struct stat statb;
if (fstat(file->fd, &statb) == 0)
filelen = statb.st_size;
if (pos >= filelen)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Attempting to read off the end of the binlog file %s, "
"event at %lu.", file->binlogname, pos)));
return NULL;
}
if (strcmp(router->binlog_name, file->binlogname) == 0 &&
pos >= router->binlog_position)
{
return NULL;
}
/* Read the header information from the file */
if ((n = pread(file->fd, hdbuf, 19, pos)) != 19)
@ -306,20 +328,25 @@ int n;
{
case 0:
LOGIF(LD, (skygw_log_write(LOGFILE_DEBUG,
"Reached end of binlog file at %d.\n",
"Reached end of binlog file at %d.",
pos)));
break;
case -1:
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Failed to read binlog file %s at position %d"
" (%s).\n", file->binlogname,
" (%s).", file->binlogname,
pos, strerror(errno))));
if (errno == EBADF)
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Bad file descriptor in read binlog for file %s"
", reference count is %d, descriptor %d.",
file->binlogname, file->refcnt, file->fd)));
break;
default:
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Short read when reading the header. "
"Expected 19 bytes got %d bytes."
"Binlog file is %s, position %d\n",
"Expected 19 bytes but got %d bytes. "
"Binlog file is %s, position %d",
file->binlogname, pos, n)));
break;
}
@ -328,14 +355,73 @@ int n;
hdr->timestamp = EXTRACT32(hdbuf);
hdr->event_type = hdbuf[4];
hdr->serverid = EXTRACT32(&hdbuf[5]);
hdr->event_size = EXTRACT32(&hdbuf[9]);
hdr->event_size = extract_field(&hdbuf[9], 32);
hdr->next_pos = EXTRACT32(&hdbuf[13]);
hdr->flags = EXTRACT16(&hdbuf[17]);
if (hdr->next_pos < pos && hdr->event_type != ROTATE_EVENT)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Next position in header appears to be incorrect "
"rereading event header at pos %ul in file %s, "
"file size is %ul. Master will write %ul in %s next.",
pos, file->binlogname, filelen, router->binlog_position,
router->binlog_name)));
if ((n = pread(file->fd, hdbuf, 19, pos)) != 19)
{
switch (n)
{
case 0:
LOGIF(LD, (skygw_log_write(LOGFILE_DEBUG,
"Reached end of binlog file at %d.",
pos)));
break;
case -1:
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Failed to read binlog file %s at position %d"
" (%s).", file->binlogname,
pos, strerror(errno))));
if (errno == EBADF)
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Bad file descriptor in read binlog for file %s"
", reference count is %d, descriptor %d.",
file->binlogname, file->refcnt, file->fd)));
break;
default:
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Short read when reading the header. "
"Expected 19 bytes but got %d bytes. "
"Binlog file is %s, position %d",
file->binlogname, pos, n)));
break;
}
return NULL;
}
hdr->timestamp = EXTRACT32(hdbuf);
hdr->event_type = hdbuf[4];
hdr->serverid = EXTRACT32(&hdbuf[5]);
hdr->event_size = extract_field(&hdbuf[9], 32);
hdr->next_pos = EXTRACT32(&hdbuf[13]);
hdr->flags = EXTRACT16(&hdbuf[17]);
if (hdr->next_pos < pos && hdr->event_type != ROTATE_EVENT)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Next position still incorrect after "
"rereading")));
return NULL;
}
else
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Next position corrected by "
"rereading")));
}
}
if ((result = gwbuf_alloc(hdr->event_size)) == NULL)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Failed to allocate memory for binlog entry, "
"size %d at %d.\n",
"size %d at %d.",
hdr->event_size, pos)));
return NULL;
}
@ -344,10 +430,29 @@ int n;
if ((n = pread(file->fd, &data[19], hdr->event_size - 19, pos + 19))
!= hdr->event_size - 19) // Read the balance
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Short read when reading the event at %d. "
"Expected %d bytes got %d bytes.\n",
pos, n)));
if (n == -1)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Error reading the event at %ld in %s. "
"%s, expected %d bytes.",
pos, file->binlogname,
strerror(errno), hdr->event_size - 19)));
}
else
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Short read when reading the event at %ld in %s. "
"Expected %d bytes got %d bytes.",
pos, file->binlogname, hdr->event_size - 19, n)));
if (filelen != 0 && filelen - pos < hdr->event_size)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Binlog event is close to the end of the binlog file, "
"current file size is %u.",
filelen)));
}
blr_log_header(LOGFILE_ERROR, "Possible malformed event header", hdbuf);
}
gwbuf_consume(result, hdr->event_size);
return NULL;
}
@ -372,7 +477,7 @@ blr_close_binlog(ROUTER_INSTANCE *router, BLFILE *file)
{
spinlock_acquire(&router->fileslock);
if (router->files == file)
router->files = file;
router->files = file->next;
else
{
BLFILE *ptr = router->files;
@ -384,8 +489,11 @@ blr_close_binlog(ROUTER_INSTANCE *router, BLFILE *file)
spinlock_release(&router->fileslock);
close(file->fd);
file->fd = -1;
}
spinlock_release(&file->lock);
if (file->refcnt == 0)
free(file);
}
/**
@ -407,3 +515,17 @@ uint32_t rval = 0, shift = 0;
}
return rval;
}
static void
blr_log_header(logfile_id_t file, char *msg, uint8_t *ptr)
{
char buf[400], *bufp;
int i;
bufp = buf;
bufp += sprintf(bufp, "%s: ", msg);
for (i = 0; i < 19; i++)
bufp += sprintf(bufp, "0x%02x ", ptr[i]);
skygw_log_write_flush(file, "%s", buf);
}