Protect updating of router when rotating.

When rotating, all state variables of router are now updated while
protected by the router->binlog_lock lock.
This commit is contained in:
Johan Wikman
2016-01-28 14:17:57 +02:00
parent 0deffbf2f2
commit d9b022db10
3 changed files with 77 additions and 40 deletions

View File

@ -48,6 +48,7 @@
#define BINLOG_FNAMELEN 255 #define BINLOG_FNAMELEN 255
#define BLR_PROTOCOL "MySQLBackend" #define BLR_PROTOCOL "MySQLBackend"
#define BINLOG_MAGIC { 0xfe, 0x62, 0x69, 0x6e } #define BINLOG_MAGIC { 0xfe, 0x62, 0x69, 0x6e }
#define BINLOG_MAGIC_SIZE 4
#define BINLOG_NAMEFMT "%s.%06d" #define BINLOG_NAMEFMT "%s.%06d"
#define BINLOG_NAME_ROOT "mysql-bin" #define BINLOG_NAME_ROOT "mysql-bin"

View File

@ -189,59 +189,77 @@ blr_file_rotate(ROUTER_INSTANCE *router, char *file, uint64_t pos)
* binlog files need an initial 4 magic bytes at the start. blr_file_add_magic() * binlog files need an initial 4 magic bytes at the start. blr_file_add_magic()
* adds them. * adds them.
* *
* @param router The router instance * @param fd file descriptor to the open binlog file
* @param fd file descriptor to the open binlog file * @return True if the magic string could be written to the file.
* @return Nothing
*/ */
static void static bool
blr_file_add_magic(ROUTER_INSTANCE *router, int fd) blr_file_add_magic(int fd)
{ {
unsigned char magic[] = BINLOG_MAGIC; static const unsigned char magic[] = BINLOG_MAGIC;
write(fd, magic, 4); ssize_t written = write(fd, magic, BINLOG_MAGIC_SIZE);
router->current_pos = 4; /* Initial position after the magic number */
router->binlog_position = 4; /* Initial position after the magic number */ return written == BINLOG_MAGIC_SIZE;
router->current_safe_event = 4;
router->last_written = 0;
} }
/** /**
* Create a new binlog file for the router to use. * Create a new binlog file for the router to use.
* *
* @param router The router instance * @param router The router instance
* @param file The binlog file name * @param file The binlog file name
* @return Non-zero if the fie creation succeeded * @return Non-zero if the fie creation succeeded
*/ */
static int static int
blr_file_create(ROUTER_INSTANCE *router, char *file) blr_file_create(ROUTER_INSTANCE *router, char *file)
{ {
char path[PATH_MAX + 1] = ""; int created = 0;
int fd; char err_msg[STRERROR_BUFLEN];
strcpy(path, router->binlogdir); char path[PATH_MAX + 1] = "";
strcat(path, "/");
strcat(path, file);
if ((fd = open(path, O_RDWR|O_CREAT, 0666)) != -1) strcpy(path, router->binlogdir);
{ strcat(path, "/");
blr_file_add_magic(router,fd); strcat(path, file);
}
else
{
char err_msg[STRERROR_BUFLEN];
MXS_ERROR("%s: Failed to create binlog file %s, %s.", int fd = open(path, O_RDWR|O_CREAT, 0666);
if (fd != -1)
{
if (blr_file_add_magic(fd))
{
close(router->binlog_fd);
spinlock_acquire(&router->binlog_lock);
strncpy(router->binlog_name, file, BINLOG_FNAMELEN);
router->binlog_fd = fd;
router->current_pos = BINLOG_MAGIC_SIZE; /* Initial position after the magic number */
router->binlog_position = BINLOG_MAGIC_SIZE;
router->current_safe_event = BINLOG_MAGIC_SIZE;
router->last_written = 0;
spinlock_release(&router->binlog_lock);
created = 1;
}
else
{
MXS_ERROR("%s: Failed to write magic string to created binlog file %s, %s.",
router->service->name, path, strerror_r(errno, err_msg, sizeof(err_msg)));
close(fd);
if (!unlink(path))
{
MXS_ERROR("%s: Failed to delete file %s, %s.",
router->service->name, path, strerror_r(errno, err_msg, sizeof(err_msg))); router->service->name, path, strerror_r(errno, err_msg, sizeof(err_msg)));
return 0; }
} }
fsync(fd); }
close(router->binlog_fd); else
spinlock_acquire(&router->binlog_lock); {
strncpy(router->binlog_name, file, BINLOG_FNAMELEN); MXS_ERROR("%s: Failed to create binlog file %s, %s.",
router->binlog_fd = fd; router->service->name, path, strerror_r(errno, err_msg, sizeof(err_msg)));
spinlock_release(&router->binlog_lock); }
return 1;
return created;
} }
/** /**
@ -273,7 +291,15 @@ int fd;
router->current_pos = lseek(fd, 0L, SEEK_END); router->current_pos = lseek(fd, 0L, SEEK_END);
if (router->current_pos < 4) { if (router->current_pos < 4) {
if (router->current_pos == 0) { if (router->current_pos == 0) {
blr_file_add_magic(router, fd); if (blr_file_add_magic(fd))
{
router->current_pos = BINLOG_MAGIC_SIZE;
router->binlog_position = BINLOG_MAGIC_SIZE;
router->current_safe_event = BINLOG_MAGIC_SIZE;
router->last_written = 0;
} else {
MXS_ERROR("%s: Could not write magic to binlog file.", router->service->name);
}
} else { } else {
/* If for any reason the file's length is between 1 and 3 bytes /* If for any reason the file's length is between 1 and 3 bytes
* then report an error. */ * then report an error. */

View File

@ -1196,7 +1196,11 @@ int n_bufs = -1, pn_bufs = -1;
ptr = ptr + 5; // We don't put the first byte of the payload ptr = ptr + 5; // We don't put the first byte of the payload
// into the binlog file // into the binlog file
if (hdr.event_type == ROTATE_EVENT) if (hdr.event_type == ROTATE_EVENT)
{
spinlock_acquire(&router->binlog_lock);
router->rotating = 1; router->rotating = 1;
spinlock_release(&router->binlog_lock);
}
/* current event is being written to disk file */ /* current event is being written to disk file */
if (blr_write_binlog_record(router, &hdr, ptr) == 0) if (blr_write_binlog_record(router, &hdr, ptr) == 0)
@ -1358,7 +1362,9 @@ int n_bufs = -1, pn_bufs = -1;
ptr += 5; ptr += 5;
if (hdr.event_type == ROTATE_EVENT) if (hdr.event_type == ROTATE_EVENT)
{ {
spinlock_acquire(&router->binlog_lock);
router->rotating = 1; router->rotating = 1;
spinlock_release(&router->binlog_lock);
if (!blr_rotate_event(router, ptr, &hdr)) if (!blr_rotate_event(router, ptr, &hdr))
{ {
/* /*
@ -1474,6 +1480,7 @@ blr_extract_header(register uint8_t *ptr, register REP_HEADER *hdr)
* @param router The instance of the router * @param router The instance of the router
* @param ptr The packet containing the rotate event * @param ptr The packet containing the rotate event
* @param hdr The replication message header * @param hdr The replication message header
* @return 1 if the file could be rotated, 0 otherwise.
*/ */
static int static int
blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *ptr, REP_HEADER *hdr) blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *ptr, REP_HEADER *hdr)
@ -1505,17 +1512,20 @@ char file[BINLOG_FNAMELEN+1];
strcpy(router->prevbinlog, router->binlog_name); strcpy(router->prevbinlog, router->binlog_name);
int rotated = 1;
if (strncmp(router->binlog_name, file, slen) != 0) if (strncmp(router->binlog_name, file, slen) != 0)
{ {
router->stats.n_rotates++; router->stats.n_rotates++;
if (blr_file_rotate(router, file, pos) == 0) if (blr_file_rotate(router, file, pos) == 0)
{ {
router->rotating = 0; rotated = 0;
return 0;
} }
} }
spinlock_acquire(&router->binlog_lock);
router->rotating = 0; router->rotating = 0;
return 1; spinlock_release(&router->binlog_lock);
return rotated;
} }
/** /**