Binlog router performance improvements
This commit is contained in:
@ -239,6 +239,7 @@ typedef struct router_instance {
|
|||||||
GWBUF *residual; /*< Any residual binlog event */
|
GWBUF *residual; /*< Any residual binlog event */
|
||||||
MASTER_RESPONSES saved_master; /*< Saved master responses */
|
MASTER_RESPONSES saved_master; /*< Saved master responses */
|
||||||
char *binlogdir; /*< The directory with the binlog files */
|
char *binlogdir; /*< The directory with the binlog files */
|
||||||
|
SPINLOCK binlog_lock; /*< Lock to control update of the binlog position */
|
||||||
char binlog_name[BINLOG_FNAMELEN+1];
|
char binlog_name[BINLOG_FNAMELEN+1];
|
||||||
/*< Name of the current binlog file */
|
/*< Name of the current binlog file */
|
||||||
uint64_t binlog_position;
|
uint64_t binlog_position;
|
||||||
@ -248,6 +249,7 @@ typedef struct router_instance {
|
|||||||
*/
|
*/
|
||||||
uint64_t last_written; /*< Position of last event written */
|
uint64_t last_written; /*< Position of last event written */
|
||||||
char prevbinlog[BINLOG_FNAMELEN+1];
|
char prevbinlog[BINLOG_FNAMELEN+1];
|
||||||
|
int rotating; /*< Rotation in progress flag */
|
||||||
BLFILE *files; /*< Files used by the slaves */
|
BLFILE *files; /*< Files used by the slaves */
|
||||||
SPINLOCK fileslock; /*< Lock for the files queue above */
|
SPINLOCK fileslock; /*< Lock for the files queue above */
|
||||||
unsigned int low_water; /*< Low water mark for client DCB */
|
unsigned int low_water; /*< Low water mark for client DCB */
|
||||||
|
@ -174,6 +174,7 @@ int i;
|
|||||||
spinlock_init(&inst->lock);
|
spinlock_init(&inst->lock);
|
||||||
inst->files = NULL;
|
inst->files = NULL;
|
||||||
spinlock_init(&inst->fileslock);
|
spinlock_init(&inst->fileslock);
|
||||||
|
spinlock_init(&inst->binlog_lock);
|
||||||
|
|
||||||
inst->binlog_fd = -1;
|
inst->binlog_fd = -1;
|
||||||
|
|
||||||
@ -305,6 +306,7 @@ int i;
|
|||||||
inst->active_logs = 0;
|
inst->active_logs = 0;
|
||||||
inst->reconnect_pending = 0;
|
inst->reconnect_pending = 0;
|
||||||
inst->handling_threads = 0;
|
inst->handling_threads = 0;
|
||||||
|
inst->rotating = 0;
|
||||||
inst->residual = NULL;
|
inst->residual = NULL;
|
||||||
inst->slaves = NULL;
|
inst->slaves = NULL;
|
||||||
inst->next = NULL;
|
inst->next = NULL;
|
||||||
@ -679,6 +681,8 @@ struct tm tm;
|
|||||||
spinlock_stats(&instlock, spin_reporter, dcb);
|
spinlock_stats(&instlock, spin_reporter, dcb);
|
||||||
dcb_printf(dcb, "\tSpinlock statistics (instance lock):\n");
|
dcb_printf(dcb, "\tSpinlock statistics (instance lock):\n");
|
||||||
spinlock_stats(&router_inst->lock, spin_reporter, dcb);
|
spinlock_stats(&router_inst->lock, spin_reporter, dcb);
|
||||||
|
dcb_printf(dcb, "\tSpinlock statistics (binlog position lock):\n");
|
||||||
|
spinlock_stats(&router_inst->binlog_lock, spin_reporter, dcb);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
if (router_inst->slaves)
|
if (router_inst->slaves)
|
||||||
@ -743,9 +747,11 @@ struct tm tm;
|
|||||||
dcb_printf(dcb, "\t\tNo. of distribute action 3 %u\n", session->stats.n_actions[2]);
|
dcb_printf(dcb, "\t\tNo. of distribute action 3 %u\n", session->stats.n_actions[2]);
|
||||||
if ((session->cstate & CS_UPTODATE) == 0)
|
if ((session->cstate & CS_UPTODATE) == 0)
|
||||||
{
|
{
|
||||||
dcb_printf(dcb, "\t\tSlave is in catchup mode. %s\n",
|
dcb_printf(dcb, "\t\tSlave is in catchup mode. %s%s\n",
|
||||||
((session->cstate & CS_EXPECTCB) == 0 ? "" :
|
((session->cstate & CS_EXPECTCB) == 0 ? "" :
|
||||||
"Waiting for DCB queue to drain."));
|
"Waiting for DCB queue to drain."),
|
||||||
|
((session->cstate & CS_BUSY) == 0 ? "" :
|
||||||
|
" Busy in slave catchup."));
|
||||||
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -168,8 +168,10 @@ unsigned char magic[] = BINLOG_MAGIC;
|
|||||||
}
|
}
|
||||||
fsync(fd);
|
fsync(fd);
|
||||||
close(router->binlog_fd);
|
close(router->binlog_fd);
|
||||||
|
spinlock_acquire(&router->binlog_lock);
|
||||||
strcpy(router->binlog_name, file);
|
strcpy(router->binlog_name, file);
|
||||||
router->binlog_position = 4; /* Initial position after the magic number */
|
router->binlog_position = 4; /* Initial position after the magic number */
|
||||||
|
spinlock_release(&router->binlog_lock);
|
||||||
router->binlog_fd = fd;
|
router->binlog_fd = fd;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -199,8 +201,10 @@ int fd;
|
|||||||
}
|
}
|
||||||
fsync(fd);
|
fsync(fd);
|
||||||
close(router->binlog_fd);
|
close(router->binlog_fd);
|
||||||
|
spinlock_acquire(&router->binlog_lock);
|
||||||
strcpy(router->binlog_name, file);
|
strcpy(router->binlog_name, file);
|
||||||
router->binlog_position = lseek(fd, 0L, SEEK_END);
|
router->binlog_position = lseek(fd, 0L, SEEK_END);
|
||||||
|
spinlock_release(&router->binlog_lock);
|
||||||
router->binlog_fd = fd;
|
router->binlog_fd = fd;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -215,8 +219,10 @@ void
|
|||||||
blr_write_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *buf)
|
blr_write_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *buf)
|
||||||
{
|
{
|
||||||
pwrite(router->binlog_fd, buf, hdr->event_size, hdr->next_pos - hdr->event_size);
|
pwrite(router->binlog_fd, buf, hdr->event_size, hdr->next_pos - hdr->event_size);
|
||||||
|
spinlock_acquire(&router->binlog_lock);
|
||||||
router->binlog_position = hdr->next_pos;
|
router->binlog_position = hdr->next_pos;
|
||||||
router->last_written = hdr->next_pos - hdr->event_size;
|
router->last_written = hdr->next_pos - hdr->event_size;
|
||||||
|
spinlock_release(&router->binlog_lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -308,7 +314,7 @@ struct stat statb;
|
|||||||
filelen = statb.st_size;
|
filelen = statb.st_size;
|
||||||
if (pos >= filelen)
|
if (pos >= filelen)
|
||||||
{
|
{
|
||||||
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
LOGIF(LD, (skygw_log_write(LOGFILE_ERROR,
|
||||||
"Attempting to read off the end of the binlog file %s, "
|
"Attempting to read off the end of the binlog file %s, "
|
||||||
"event at %lu.", file->binlogname, pos)));
|
"event at %lu.", file->binlogname, pos)));
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -695,6 +695,7 @@ static REP_HEADER phdr;
|
|||||||
}
|
}
|
||||||
else if (hdr.flags != LOG_EVENT_ARTIFICIAL_F)
|
else if (hdr.flags != LOG_EVENT_ARTIFICIAL_F)
|
||||||
{
|
{
|
||||||
|
router->rotating = 1;
|
||||||
ptr = ptr + 5; // We don't put the first byte of the payload
|
ptr = ptr + 5; // We don't put the first byte of the payload
|
||||||
// into the binlog file
|
// into the binlog file
|
||||||
blr_write_binlog_record(router, &hdr, ptr);
|
blr_write_binlog_record(router, &hdr, ptr);
|
||||||
@ -717,6 +718,7 @@ static REP_HEADER phdr;
|
|||||||
hdr.event_size,
|
hdr.event_size,
|
||||||
router->binlog_name,
|
router->binlog_name,
|
||||||
router->binlog_position)));
|
router->binlog_position)));
|
||||||
|
router->rotating = 1;
|
||||||
ptr += 5;
|
ptr += 5;
|
||||||
if (hdr.event_type == ROTATE_EVENT)
|
if (hdr.event_type == ROTATE_EVENT)
|
||||||
{
|
{
|
||||||
@ -852,6 +854,7 @@ char file[BINLOG_FNAMELEN+1];
|
|||||||
router->stats.n_rotates++;
|
router->stats.n_rotates++;
|
||||||
blr_file_rotate(router, file, pos);
|
blr_file_rotate(router, file, pos);
|
||||||
}
|
}
|
||||||
|
router->rotating = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -943,9 +946,9 @@ int action;
|
|||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* The slave should be up to date, check that the binlog
|
* The slave should be up to date, check that the binlog
|
||||||
* position matches the event we have to distribute or this
|
* position matches the event we have to distribute or
|
||||||
* is a rotate event. Send the event directly from memory to
|
* this is a rotate event. Send the event directly from
|
||||||
* the slave.
|
* memory to the slave.
|
||||||
*/
|
*/
|
||||||
pkt = gwbuf_alloc(hdr->event_size + 5);
|
pkt = gwbuf_alloc(hdr->event_size + 5);
|
||||||
buf = GWBUF_DATA(pkt);
|
buf = GWBUF_DATA(pkt);
|
||||||
@ -976,6 +979,17 @@ int action;
|
|||||||
}
|
}
|
||||||
spinlock_release(&slave->catch_lock);
|
spinlock_release(&slave->catch_lock);
|
||||||
}
|
}
|
||||||
|
else if (slave->binlog_pos == hdr->next_pos
|
||||||
|
&& strcmp(slave->binlogfile, router->binlog_name) == 0)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Slave has already read record from file, no
|
||||||
|
* need to distrbute this event
|
||||||
|
*/
|
||||||
|
spinlock_acquire(&slave->catch_lock);
|
||||||
|
slave->cstate &= ~CS_BUSY;
|
||||||
|
spinlock_release(&slave->catch_lock);
|
||||||
|
}
|
||||||
else if ((slave->binlog_pos > hdr->next_pos - hdr->event_size)
|
else if ((slave->binlog_pos > hdr->next_pos - hdr->event_size)
|
||||||
&& strcmp(slave->binlogfile, router->binlog_name) == 0)
|
&& strcmp(slave->binlogfile, router->binlog_name) == 0)
|
||||||
{
|
{
|
||||||
|
@ -775,9 +775,13 @@ unsigned long beat;
|
|||||||
strcmp(slave->binlogfile, router->binlog_name) == 0)
|
strcmp(slave->binlogfile, router->binlog_name) == 0)
|
||||||
{
|
{
|
||||||
int state_change = 0;
|
int state_change = 0;
|
||||||
|
spinlock_acquire(&router->binlog_lock);
|
||||||
spinlock_acquire(&slave->catch_lock);
|
spinlock_acquire(&slave->catch_lock);
|
||||||
|
|
||||||
/* Now check again since we hold the slave->catch_lock. */
|
/*
|
||||||
|
* Now check again since we hold the router->binlog_lock
|
||||||
|
* and slave->catch_lock.
|
||||||
|
*/
|
||||||
if (slave->binlog_pos != router->binlog_position ||
|
if (slave->binlog_pos != router->binlog_position ||
|
||||||
strcmp(slave->binlogfile, router->binlog_name) != 0)
|
strcmp(slave->binlogfile, router->binlog_name) != 0)
|
||||||
{
|
{
|
||||||
@ -799,6 +803,7 @@ unsigned long beat;
|
|||||||
memlog_log(slave->clog, 5);
|
memlog_log(slave->clog, 5);
|
||||||
}
|
}
|
||||||
spinlock_release(&slave->catch_lock);
|
spinlock_release(&slave->catch_lock);
|
||||||
|
spinlock_release(&router->binlog_lock);
|
||||||
|
|
||||||
if (state_change)
|
if (state_change)
|
||||||
{
|
{
|
||||||
@ -809,10 +814,15 @@ unsigned long beat;
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
if (strcmp(router->binlog_name, slave->binlogfile) != 0)
|
if (router->rotating != 0 && strcmp(router->binlog_name, slave->binlogfile) != 0)
|
||||||
{
|
{
|
||||||
/* We may have reached the end of file of a non-current
|
/* We may have reached the end of file of a non-current
|
||||||
* binlog file.
|
* binlog file.
|
||||||
|
*
|
||||||
|
* Note if the master is rotating there is a window during
|
||||||
|
* whch the rotate event has been written to the old binlog
|
||||||
|
* but the new binlog file has not yet been created. Therefore
|
||||||
|
* we ignore these issues during the rotate processing.
|
||||||
*/
|
*/
|
||||||
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
||||||
"Slave reached end of file for binlong file %s "
|
"Slave reached end of file for binlong file %s "
|
||||||
|
Reference in New Issue
Block a user