Change of distribution mechanism to use fake events exclusively and avoid

complex locking mechanism and blocking the master DCB eent processign thread

Addition of shared BLFILE mechanism to reduce open/close overhead
This commit is contained in:
Mark Riddoch
2014-10-03 14:33:28 +01:00
parent 501d0bcae6
commit fd1154b944
5 changed files with 205 additions and 119 deletions

View File

@ -172,6 +172,8 @@ int i;
inst->service = service;
spinlock_init(&inst->lock);
inst->files = NULL;
spinlock_init(&inst->fileslock);
inst->binlog_fd = -1;
@ -379,6 +381,7 @@ ROUTER_SLAVE *slave;
spinlock_init(&slave->catch_lock);
slave->dcb = session->client;
slave->router = inst;
slave->file = NULL;
/**
* Add this session to the list of active sessions.
@ -498,6 +501,9 @@ ROUTER_SLAVE *slave = (ROUTER_SLAVE *)router_session;
*/
slave->state = BLRS_UNREGISTERED;
if (slave->file)
blr_close_binlog(router, slave->file);
/* Unlock */
rses_end_locked_router_action(slave);
}
@ -719,8 +725,6 @@ struct tm tm;
session->stats.minavgs[minno], min5, min10,
min15, min30);
dcb_printf(dcb, "\t\tNo. flow control: %u\n", session->stats.n_flows);
dcb_printf(dcb, "\t\tNo. catchup NRs: %u\n", session->stats.n_catchupnr);
dcb_printf(dcb, "\t\tNo. already up to date: %u\n", session->stats.n_alreadyupd);
dcb_printf(dcb, "\t\tNo. up to date: %u\n", session->stats.n_upd);
dcb_printf(dcb, "\t\tNo. of drained cbs %u\n", session->stats.n_dcb);
dcb_printf(dcb, "\t\tNo. of low water cbs N/A %u\n", session->stats.n_cbna);

View File

@ -50,9 +50,9 @@
#include <skygw_utils.h>
#include <log_manager.h>
extern int lm_enabled_logfiles_bitmask;
static void blr_file_create(ROUTER_INSTANCE *router, char *file);
static void blr_file_append(ROUTER_INSTANCE *router, char *file);
static uint32_t extract_field(uint8_t *src, int bits);
@ -85,6 +85,8 @@ struct dirent *dp;
if (access(path, R_OK) == -1)
mkdir(path, 0777);
router->binlogdir = strdup(path);
/* First try to find a binlog file number by reading the directory */
root_len = strlen(router->fileroot);
dirp = opendir(path);
@ -146,17 +148,11 @@ blr_file_rotate(ROUTER_INSTANCE *router, char *file, uint64_t pos)
static void
blr_file_create(ROUTER_INSTANCE *router, char *file)
{
char *ptr, path[1024];
char path[1024];
int fd;
unsigned char magic[] = BINLOG_MAGIC;
strcpy(path, "/usr/local/skysql/MaxScale");
if ((ptr = getenv("MAXSCALE_HOME")) != NULL)
{
strcpy(path, ptr);
}
strcat(path, "/");
strcat(path, router->service->name);
strcpy(path, router->binlogdir);
strcat(path, "/");
strcat(path, file);
@ -186,16 +182,10 @@ unsigned char magic[] = BINLOG_MAGIC;
static void
blr_file_append(ROUTER_INSTANCE *router, char *file)
{
char *ptr, path[1024];
char path[1024];
int fd;
strcpy(path, "/usr/local/skysql/MaxScale");
if ((ptr = getenv("MAXSCALE_HOME")) != NULL)
{
strcpy(path, ptr);
}
strcat(path, "/");
strcat(path, router->service->name);
strcpy(path, router->binlogdir);
strcat(path, "/");
strcat(path, file);
@ -238,57 +228,79 @@ blr_file_flush(ROUTER_INSTANCE *router)
fsync(router->binlog_fd);
}
int
/**
* Open a binlog file for reading binlog records
*
* @param router The router instance
* @param binlog The binlog filename
* @return a binlog file record
*/
BLFILE *
blr_open_binlog(ROUTER_INSTANCE *router, char *binlog)
{
char *ptr, path[1024];
int rval;
BLFILE *file;
strcpy(path, "/usr/local/skysql/MaxScale");
if ((ptr = getenv("MAXSCALE_HOME")) != NULL)
spinlock_acquire(&router->fileslock);
file = router->files;
while (file && strcmp(file->binlogname, binlog) != 0)
file = file->next;
if (file)
{
strcpy(path, ptr);
file->refcnt++;
spinlock_release(&router->fileslock);
return file;
}
strcat(path, "/");
strcat(path, router->service->name);
if ((file = (BLFILE *)malloc(sizeof(BLFILE))) == NULL)
{
spinlock_release(&router->fileslock);
return NULL;
}
strcpy(file->binlogname, binlog);
file->refcnt = 1;
file->cache = 0;
spinlock_init(&file->lock);
strcpy(path, router->binlogdir);
strcat(path, "/");
strcat(path, binlog);
if ((rval = open(path, O_RDONLY, 0666)) == -1)
if ((file->fd = open(path, O_RDONLY, 0666)) == -1)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Failed to open binlog file %s\n", path)));
free(file);
spinlock_release(&router->fileslock);
return NULL;
}
return rval;
file->next = router->files;
router->files = file;
spinlock_release(&router->fileslock);
return file;
}
/**
* Read a replication event into a GWBUF structure.
*
* @param fd File descriptor of the binlog file
* @param file File record
* @param pos Position of binlog record to read
* @param hdr Binlog header to populate
* @return The binlog record wrapped in a GWBUF structure
*/
GWBUF *
blr_read_binlog(int fd, unsigned int pos, REP_HEADER *hdr)
blr_read_binlog(BLFILE *file, unsigned int pos, REP_HEADER *hdr)
{
uint8_t hdbuf[19];
GWBUF *result;
unsigned char *data;
int n;
if (lseek(fd, pos, SEEK_SET) != pos)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Failed to seek for binlog entry, "
"at %d.\n", pos)));
return NULL;
}
/* Read the header information from the file */
if ((n = read(fd, hdbuf, 19)) != 19)
if ((n = pread(file->fd, hdbuf, 19, pos)) != 19)
{
switch (n)
{
@ -299,24 +311,26 @@ int n;
break;
case -1:
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Failed to read binlog file at position %d"
" (%s).\n", pos, strerror(errno))));
"Failed to read binlog file %s at position %d"
" (%s).\n", file->binlogname,
pos, strerror(errno))));
break;
default:
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Short read when reading the header. "
"Expected 19 bytes got %d bytes.\n",
n)));
"Expected 19 bytes got %d bytes."
"Binlog file is %s, position %d\n",
file->binlogname, pos, n)));
break;
}
return NULL;
}
hdr->timestamp = extract_field(hdbuf, 32);
hdr->timestamp = EXTRACT32(hdbuf);
hdr->event_type = hdbuf[4];
hdr->serverid = extract_field(&hdbuf[5], 32);
hdr->event_size = extract_field(&hdbuf[9], 32);
hdr->next_pos = extract_field(&hdbuf[13], 32);
hdr->flags = extract_field(&hdbuf[17], 16);
hdr->serverid = EXTRACT32(&hdbuf[5]);
hdr->event_size = EXTRACT32(&hdbuf[9]);
hdr->next_pos = EXTRACT32(&hdbuf[13]);
hdr->flags = EXTRACT16(&hdbuf[17]);
if ((result = gwbuf_alloc(hdr->event_size)) == NULL)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
@ -327,7 +341,7 @@ int n;
}
data = GWBUF_DATA(result);
memcpy(data, hdbuf, 19); // Copy the header in
if ((n = read(fd, &data[19], hdr->event_size - 19))
if ((n = pread(file->fd, &data[19], hdr->event_size - 19, pos + 19))
!= hdr->event_size - 19) // Read the balance
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
@ -340,6 +354,40 @@ int n;
return result;
}
/**
* Close a binlog file that has been opened to read binlog records
*
* The open binlog files are shared between multiple slaves that are
* reading the same binlog file.
*
* @param router The router instance
* @param file The file to close
*/
void
blr_close_binlog(ROUTER_INSTANCE *router, BLFILE *file)
{
spinlock_acquire(&file->lock);
file->refcnt--;
if (file->refcnt == 0)
{
spinlock_acquire(&router->fileslock);
if (router->files == file)
router->files = file;
else
{
BLFILE *ptr = router->files;
while (ptr && ptr->next != file)
ptr = ptr->next;
if (ptr)
ptr->next = file->next;
}
spinlock_release(&router->fileslock);
close(file->fd);
}
spinlock_release(&file->lock);
}
/**
* Extract a numeric field from a packet of the specified number of bits
*

View File

@ -955,6 +955,9 @@ int action;
{
slave->stats.n_overrun++;
slave->overrun = 0;
#if QUEUE_SLAVE
poll_fake_write_event(slave->dcb);
#else
spinlock_release(&router->lock);
slave->cstate &= ~(CS_UPTODATE|CS_DIST);
spinlock_release(&slave->catch_lock);
@ -965,6 +968,7 @@ int action;
continue;
else
break;
#endif
}
else
{
@ -991,6 +995,9 @@ int action;
*/
if (slave->cstate & CS_UPTODATE)
{
#if QUEUE_SLAVE
poll_fake_write_event(slave->dcb);
#else
nextslave = slave->next;
spinlock_release(&router->lock);
LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG,
@ -1016,6 +1023,7 @@ int action;
{
break;
}
#endif
}
}
}

View File

@ -61,8 +61,8 @@ static int blr_slave_send_timestamp(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave
static int blr_slave_register(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue);
static int blr_slave_binlog_dump(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue);
int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large);
static uint8_t *blr_build_header(GWBUF *pkt, REP_HEADER *hdr);
static int blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data);
uint8_t *blr_build_header(GWBUF *pkt, REP_HEADER *hdr);
int blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data);
extern int lm_enabled_logfiles_bitmask;
@ -577,7 +577,11 @@ uint32_t chksum;
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~CS_UPTODATE;
spinlock_release(&slave->catch_lock);
#if QUEUE_SLAVE
poll_fake_write_event(slave->dcb);
#else
rval = blr_slave_catchup(router, slave, true);
#endif
}
return rval;
@ -631,7 +635,7 @@ encode_value(unsigned char *data, unsigned int value, int len)
* @param hdr The packet header to populate
* @return A pointer to the first byte following the event header
*/
static uint8_t *
uint8_t *
blr_build_header(GWBUF *pkt, REP_HEADER *hdr)
{
uint8_t *ptr;
@ -688,7 +692,7 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
{
GWBUF *head, *record;
REP_HEADER hdr;
int written, fd, rval = 1, burst;
int written, rval = 1, burst;
uint8_t *ptr;
struct timespec req;
@ -701,6 +705,12 @@ unsigned long beat;
else
burst = router->short_burst;
spinlock_acquire(&slave->catch_lock);
#if QUEUE_SLAVE
if (slave->cstate & CS_BUSY)
return 0;
slave->cstate = CS_BUSY;
spinlock_release(&slave->catch_lock);
#else
while ((slave->cstate & (CS_HOLD|CS_BUSY)) == (CS_HOLD|CS_BUSY))
{
spinlock_release(&slave->catch_lock);
@ -715,29 +725,36 @@ unsigned long beat;
if (hkheartbeat - beat > 5) LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Long wait in blr_salve_catchup %ld00ms with %s burst, return without write records.\n",
hkheartbeat - beat, large ? "long" : "short")));
return 0;
return;
}
slave->cstate |= CS_BUSY;
spinlock_release(&slave->catch_lock);
if (hkheartbeat - beat > 5) LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Long wait in blr_salve_catchup %ld00ms with %s burst.\n",
"Long wait in blr_slave_catchup %ld00ms with %s burst.\n",
hkheartbeat - beat, large ? "long" : "short")));
#endif
if ((fd = blr_open_binlog(router, slave->binlogfile)) == -1)
if (slave->file == NULL)
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"blr_slave_catchup failed to open binlog file %s\n",
slave->binlogfile)));
return 0;
if ((slave->file = blr_open_binlog(router, slave->binlogfile)) == NULL)
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"blr_slave_catchup failed to open binlog file %s\n",
slave->binlogfile)));
return 0;
}
}
slave->stats.n_bursts++;
while (burst-- &&
(record = blr_read_binlog(fd, slave->binlog_pos, &hdr)) != NULL)
(record = blr_read_binlog(slave->file, slave->binlog_pos, &hdr)) != NULL)
{
#if QUEUE_SLAVE
#else
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~CS_HOLD;
spinlock_release(&slave->catch_lock);
#endif
head = gwbuf_alloc(5);
ptr = GWBUF_DATA(head);
encode_value(ptr, hdr.event_size + 1, 24);
@ -747,9 +764,9 @@ hkheartbeat - beat, large ? "long" : "short")));
head = gwbuf_append(head, record);
if (hdr.event_type == ROTATE_EVENT)
{
close(fd);
blr_close_binlog(router, slave->file);
blr_slave_rotate(slave, GWBUF_DATA(record));
if ((fd = blr_open_binlog(router, slave->binlogfile)) == -1)
if ((slave->file = blr_open_binlog(router, slave->binlogfile)) == NULL)
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
@ -765,9 +782,12 @@ hkheartbeat - beat, large ? "long" : "short")));
}
rval = written;
slave->stats.n_events++;
#if QUEUE_SLAVE
#else
spinlock_acquire(&slave->catch_lock);
slave->cstate |= CS_HOLD;
spinlock_release(&slave->catch_lock);
#endif
}
if (record == NULL)
slave->stats.n_failed_read++;
@ -775,8 +795,6 @@ hkheartbeat - beat, large ? "long" : "short")));
slave->cstate &= ~CS_BUSY;
spinlock_release(&slave->catch_lock);
if (fd != -1)
close(fd);
if (record)
{
slave->stats.n_flows++;
@ -814,7 +832,7 @@ hkheartbeat - beat, large ? "long" : "short")));
* @param reason The reason the callback was called
* @param data The user data, in this case the server structure
*/
static int
int
blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data)
{
ROUTER_SLAVE *slave = (ROUTER_SLAVE *)data;
@ -823,8 +841,12 @@ ROUTER_INSTANCE *router = slave->router;
if (reason == DCB_REASON_DRAINED)
{
if (slave->state == BLRS_DUMPING &&
slave->binlog_pos != router->binlog_position)
(slave->binlog_pos != router->binlog_position
|| strcmp(slave->binlogfile, router->binlog_name)))
{
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~CS_UPTODATE;
spinlock_release(&slave->catch_lock);
slave->stats.n_dcb++;
blr_slave_catchup(router, slave, true);
}