From fd1154b9445976811c87d6e65cd371a6c29dc3bd Mon Sep 17 00:00:00 2001 From: Mark Riddoch Date: Fri, 3 Oct 2014 14:33:28 +0100 Subject: [PATCH] Change of distribution mechanism to use fake events exclusively and avoid complex locking mechanism and blocking the master DCB eent processign thread Addition of shared BLFILE mechanism to reduce open/close overhead --- server/modules/include/blr.h | 106 +++++++-------- server/modules/routing/binlog/blr.c | 8 +- server/modules/routing/binlog/blr_file.c | 142 ++++++++++++++------- server/modules/routing/binlog/blr_master.c | 8 ++ server/modules/routing/binlog/blr_slave.c | 60 ++++++--- 5 files changed, 205 insertions(+), 119 deletions(-) diff --git a/server/modules/include/blr.h b/server/modules/include/blr.h index a6973593e..84240675e 100644 --- a/server/modules/include/blr.h +++ b/server/modules/include/blr.h @@ -43,6 +43,8 @@ #define BLR_STATS_FREQ 60 #define BLR_NSTATS_MINUTES 30 +#define QUEUE_SLAVE 1 + /** * High and Low water marks for the slave dcb. These values can be overriden * by the router options highwater and lowwater. @@ -66,6 +68,52 @@ #define MYSQL_ERROR_MSG(buf) ((uint8_t *)GWBUF_DATA(buf) + 6) #define MYSQL_COMMAND(buf) (*((uint8_t *)GWBUF_DATA(buf) + 4)) + +/** + * Packet header for replication messages + */ +typedef struct rep_header { + int payload_len; /*< Payload length (24 bits) */ + uint8_t seqno; /*< Response sequence number */ + uint8_t ok; /*< OK Byte from packet */ + uint32_t timestamp; /*< Timestamp - start of binlog record */ + uint8_t event_type; /*< Binlog event type */ + uint32_t serverid; /*< Server id of master */ + uint32_t event_size; /*< Size of header, post-header and body */ + uint32_t next_pos; /*< Position of next event */ + uint16_t flags; /*< Event flags */ +} REP_HEADER; + +/** + * The binlog record structure. This contains the actual packet read from the binlog + * file. + */ +typedef struct { + unsigned long position; /*< binlog record position for this cache entry */ + GWBUF *pkt; /*< The packet received from the master */ + REP_HEADER hdr; /*< The packet header */ +} BLCACHE_RECORD; + +/** + * The binlog cache. A cache exists for each file that hold cached bin log records. + * Caches will be used for all files being read by more than 1 slave. + */ +typedef struct { + BLCACHE_RECORD **records; /*< The actual binlog records */ + int current; /*< The next record that will be inserted */ + int cnt; /*< The number of records in the cache */ + SPINLOCK lock; /*< The spinlock for the cache */ +} BLCACHE; + +typedef struct blfile { + char binlogname[BINLOG_FNAMELEN+1]; /*< Name of the binlog file */ + int fd; /*< Actual file descriptor */ + int refcnt; /*< Reference count for file */ + BLCACHE *cache; /*< Record cache for this file */ + SPINLOCK lock; /*< The file lock */ + struct blfile *next; /*< Next file in list */ +} BLFILE; + /** * Slave statistics */ @@ -74,8 +122,6 @@ typedef struct { int n_bursts; /*< Number of bursts sent */ int n_requests; /*< Number of requests received */ int n_flows; /*< Number of flow control restarts */ - int n_catchupnr; /*< No. of times catchup resulted in not entering loop */ - int n_alreadyupd; int n_upd; int n_cb; int n_cbna; @@ -102,6 +148,7 @@ typedef struct router_slave { int binlog_pos; /*< Binlog position for this slave */ char binlogfile[BINLOG_FNAMELEN+1]; /*< Current binlog file for this slave */ + BLFILE *file; /*< Currently open binlog file */ int serverid; /*< Server-id of the slave */ char *hostname; /*< Hostname of the slave, if known */ char *user; /*< Username if given */ @@ -169,37 +216,6 @@ typedef struct { int fde_len; /*< Length of fde_event */ } MASTER_RESPONSES; -/** - * The binlog record structure. This contains the actual packet received from the - * master, the binlog position of the data in the packet, a point to the data and - * the length of the binlog record. - * - * This allows requests for binlog records in the cache to be serviced by simply - * sending the exact same packet as was received by MaxScale from the master. - * Items are written to the backing file as soon as they are received. The binlog - * cache is flushed of old records periodically, releasing the GWBUF's back to the - * free memory pool. - */ -typedef struct { - unsigned long position; /*< binlog record position for this cache entry */ - GWBUF *pkt; /*< The packet received from the master */ - unsigned char *data; /*< Pointer to the data within the packet */ - unsigned int record_len; /*< Binlog record length */ -} BLCACHE_RECORD; - -/** - * The binlog cache. A cache exists for each file that hold cached bin log records. - * Typically the router will hold two binlog caches, one for the current file and one - * for the previous file. - */ -typedef struct { - char filename[BINLOG_FNAMELEN+1]; - BLCACHE_RECORD *first; - BLCACHE_RECORD *current; - int cnt; -} BLCACHE; - - /** * The per instance data for the router. */ @@ -221,6 +237,7 @@ typedef struct router_instance { uint8_t lastEventReceived; GWBUF *residual; /*< Any residual binlog event */ MASTER_RESPONSES saved_master; /*< Saved master responses */ + char *binlogdir; /*< The directory with the binlog files */ char binlog_name[BINLOG_FNAMELEN+1]; /*< Name of the current binlog file */ uint64_t binlog_position; @@ -228,11 +245,12 @@ typedef struct router_instance { int binlog_fd; /*< File descriptor of the binlog * file being written */ + BLFILE *files; /*< Files used by the slaves */ + SPINLOCK fileslock; /*< Lock for the files queue above */ unsigned int low_water; /*< Low water mark for client DCB */ unsigned int high_water; /*< High water mark for client DCB */ unsigned int short_burst; /*< Short burst for slave catchup */ unsigned int long_burst; /*< Long burst for slave catchup */ - BLCACHE *cache[2]; ROUTER_STATS stats; /*< Statistics for this router */ int active_logs; int reconnect_pending; @@ -241,21 +259,6 @@ typedef struct router_instance { *next; } ROUTER_INSTANCE; -/** - * Packet header for replication messages - */ -typedef struct rep_header { - int payload_len; /*< Payload length (24 bits) */ - uint8_t seqno; /*< Response sequence number */ - uint8_t ok; /*< OK Byte from packet */ - uint32_t timestamp; /*< Timestamp - start of binlog record */ - uint8_t event_type; /*< Binlog event type */ - uint32_t serverid; /*< Server id of master */ - uint32_t event_size; /*< Size of header, post-header and body */ - uint32_t next_pos; /*< Position of next event */ - uint16_t flags; /*< Event flags */ -} REP_HEADER; - /** * State machine for the master to MaxScale replication */ @@ -399,9 +402,10 @@ extern int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool extern void blr_init_cache(ROUTER_INSTANCE *); extern void blr_file_init(ROUTER_INSTANCE *); -extern int blr_open_binlog(ROUTER_INSTANCE *, char *); extern void blr_write_binlog_record(ROUTER_INSTANCE *, REP_HEADER *,uint8_t *); extern void blr_file_rotate(ROUTER_INSTANCE *, char *, uint64_t); extern void blr_file_flush(ROUTER_INSTANCE *); -extern GWBUF *blr_read_binlog(int, unsigned int, REP_HEADER *); +extern BLFILE *blr_open_binlog(ROUTER_INSTANCE *, char *); +extern GWBUF *blr_read_binlog(BLFILE *, unsigned int, REP_HEADER *); +extern void blr_close_binlog(ROUTER_INSTANCE *, BLFILE *); #endif diff --git a/server/modules/routing/binlog/blr.c b/server/modules/routing/binlog/blr.c index 6dcff344a..d9659ddd2 100644 --- a/server/modules/routing/binlog/blr.c +++ b/server/modules/routing/binlog/blr.c @@ -172,6 +172,8 @@ int i; inst->service = service; spinlock_init(&inst->lock); + inst->files = NULL; + spinlock_init(&inst->fileslock); inst->binlog_fd = -1; @@ -379,6 +381,7 @@ ROUTER_SLAVE *slave; spinlock_init(&slave->catch_lock); slave->dcb = session->client; slave->router = inst; + slave->file = NULL; /** * Add this session to the list of active sessions. @@ -498,6 +501,9 @@ ROUTER_SLAVE *slave = (ROUTER_SLAVE *)router_session; */ slave->state = BLRS_UNREGISTERED; + if (slave->file) + blr_close_binlog(router, slave->file); + /* Unlock */ rses_end_locked_router_action(slave); } @@ -719,8 +725,6 @@ struct tm tm; session->stats.minavgs[minno], min5, min10, min15, min30); dcb_printf(dcb, "\t\tNo. flow control: %u\n", session->stats.n_flows); - dcb_printf(dcb, "\t\tNo. catchup NRs: %u\n", session->stats.n_catchupnr); - dcb_printf(dcb, "\t\tNo. already up to date: %u\n", session->stats.n_alreadyupd); dcb_printf(dcb, "\t\tNo. up to date: %u\n", session->stats.n_upd); dcb_printf(dcb, "\t\tNo. of drained cbs %u\n", session->stats.n_dcb); dcb_printf(dcb, "\t\tNo. of low water cbs N/A %u\n", session->stats.n_cbna); diff --git a/server/modules/routing/binlog/blr_file.c b/server/modules/routing/binlog/blr_file.c index aad5fea45..8bab32fae 100644 --- a/server/modules/routing/binlog/blr_file.c +++ b/server/modules/routing/binlog/blr_file.c @@ -50,9 +50,9 @@ #include #include - extern int lm_enabled_logfiles_bitmask; + static void blr_file_create(ROUTER_INSTANCE *router, char *file); static void blr_file_append(ROUTER_INSTANCE *router, char *file); static uint32_t extract_field(uint8_t *src, int bits); @@ -85,6 +85,8 @@ struct dirent *dp; if (access(path, R_OK) == -1) mkdir(path, 0777); + router->binlogdir = strdup(path); + /* First try to find a binlog file number by reading the directory */ root_len = strlen(router->fileroot); dirp = opendir(path); @@ -146,17 +148,11 @@ blr_file_rotate(ROUTER_INSTANCE *router, char *file, uint64_t pos) static void blr_file_create(ROUTER_INSTANCE *router, char *file) { -char *ptr, path[1024]; +char path[1024]; int fd; unsigned char magic[] = BINLOG_MAGIC; - strcpy(path, "/usr/local/skysql/MaxScale"); - if ((ptr = getenv("MAXSCALE_HOME")) != NULL) - { - strcpy(path, ptr); - } - strcat(path, "/"); - strcat(path, router->service->name); + strcpy(path, router->binlogdir); strcat(path, "/"); strcat(path, file); @@ -186,16 +182,10 @@ unsigned char magic[] = BINLOG_MAGIC; static void blr_file_append(ROUTER_INSTANCE *router, char *file) { -char *ptr, path[1024]; +char path[1024]; int fd; - strcpy(path, "/usr/local/skysql/MaxScale"); - if ((ptr = getenv("MAXSCALE_HOME")) != NULL) - { - strcpy(path, ptr); - } - strcat(path, "/"); - strcat(path, router->service->name); + strcpy(path, router->binlogdir); strcat(path, "/"); strcat(path, file); @@ -238,57 +228,79 @@ blr_file_flush(ROUTER_INSTANCE *router) fsync(router->binlog_fd); } -int +/** + * Open a binlog file for reading binlog records + * + * @param router The router instance + * @param binlog The binlog filename + * @return a binlog file record + */ +BLFILE * blr_open_binlog(ROUTER_INSTANCE *router, char *binlog) { char *ptr, path[1024]; -int rval; +BLFILE *file; - strcpy(path, "/usr/local/skysql/MaxScale"); - if ((ptr = getenv("MAXSCALE_HOME")) != NULL) + spinlock_acquire(&router->fileslock); + file = router->files; + while (file && strcmp(file->binlogname, binlog) != 0) + file = file->next; + + if (file) { - strcpy(path, ptr); + file->refcnt++; + spinlock_release(&router->fileslock); + return file; } - strcat(path, "/"); - strcat(path, router->service->name); + + if ((file = (BLFILE *)malloc(sizeof(BLFILE))) == NULL) + { + spinlock_release(&router->fileslock); + return NULL; + } + strcpy(file->binlogname, binlog); + file->refcnt = 1; + file->cache = 0; + spinlock_init(&file->lock); + + strcpy(path, router->binlogdir); strcat(path, "/"); strcat(path, binlog); - if ((rval = open(path, O_RDONLY, 0666)) == -1) + if ((file->fd = open(path, O_RDONLY, 0666)) == -1) { LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, "Failed to open binlog file %s\n", path))); + free(file); + spinlock_release(&router->fileslock); + return NULL; } - return rval; + file->next = router->files; + router->files = file; + spinlock_release(&router->fileslock); + + return file; } /** * Read a replication event into a GWBUF structure. * - * @param fd File descriptor of the binlog file + * @param file File record * @param pos Position of binlog record to read * @param hdr Binlog header to populate * @return The binlog record wrapped in a GWBUF structure */ GWBUF * -blr_read_binlog(int fd, unsigned int pos, REP_HEADER *hdr) +blr_read_binlog(BLFILE *file, unsigned int pos, REP_HEADER *hdr) { uint8_t hdbuf[19]; GWBUF *result; unsigned char *data; int n; - if (lseek(fd, pos, SEEK_SET) != pos) - { - LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, - "Failed to seek for binlog entry, " - "at %d.\n", pos))); - return NULL; - } - /* Read the header information from the file */ - if ((n = read(fd, hdbuf, 19)) != 19) + if ((n = pread(file->fd, hdbuf, 19, pos)) != 19) { switch (n) { @@ -299,24 +311,26 @@ int n; break; case -1: LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, - "Failed to read binlog file at position %d" - " (%s).\n", pos, strerror(errno)))); + "Failed to read binlog file %s at position %d" + " (%s).\n", file->binlogname, + pos, strerror(errno)))); break; default: LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, "Short read when reading the header. " - "Expected 19 bytes got %d bytes.\n", - n))); + "Expected 19 bytes got %d bytes." + "Binlog file is %s, position %d\n", + file->binlogname, pos, n))); break; } return NULL; } - hdr->timestamp = extract_field(hdbuf, 32); + hdr->timestamp = EXTRACT32(hdbuf); hdr->event_type = hdbuf[4]; - hdr->serverid = extract_field(&hdbuf[5], 32); - hdr->event_size = extract_field(&hdbuf[9], 32); - hdr->next_pos = extract_field(&hdbuf[13], 32); - hdr->flags = extract_field(&hdbuf[17], 16); + hdr->serverid = EXTRACT32(&hdbuf[5]); + hdr->event_size = EXTRACT32(&hdbuf[9]); + hdr->next_pos = EXTRACT32(&hdbuf[13]); + hdr->flags = EXTRACT16(&hdbuf[17]); if ((result = gwbuf_alloc(hdr->event_size)) == NULL) { LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, @@ -327,7 +341,7 @@ int n; } data = GWBUF_DATA(result); memcpy(data, hdbuf, 19); // Copy the header in - if ((n = read(fd, &data[19], hdr->event_size - 19)) + if ((n = pread(file->fd, &data[19], hdr->event_size - 19, pos + 19)) != hdr->event_size - 19) // Read the balance { LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, @@ -340,6 +354,40 @@ int n; return result; } +/** + * Close a binlog file that has been opened to read binlog records + * + * The open binlog files are shared between multiple slaves that are + * reading the same binlog file. + * + * @param router The router instance + * @param file The file to close + */ +void +blr_close_binlog(ROUTER_INSTANCE *router, BLFILE *file) +{ + spinlock_acquire(&file->lock); + file->refcnt--; + if (file->refcnt == 0) + { + spinlock_acquire(&router->fileslock); + if (router->files == file) + router->files = file; + else + { + BLFILE *ptr = router->files; + while (ptr && ptr->next != file) + ptr = ptr->next; + if (ptr) + ptr->next = file->next; + } + spinlock_release(&router->fileslock); + + close(file->fd); + } + spinlock_release(&file->lock); +} + /** * Extract a numeric field from a packet of the specified number of bits * diff --git a/server/modules/routing/binlog/blr_master.c b/server/modules/routing/binlog/blr_master.c index ad030359d..395d10ba3 100644 --- a/server/modules/routing/binlog/blr_master.c +++ b/server/modules/routing/binlog/blr_master.c @@ -955,6 +955,9 @@ int action; { slave->stats.n_overrun++; slave->overrun = 0; +#if QUEUE_SLAVE + poll_fake_write_event(slave->dcb); +#else spinlock_release(&router->lock); slave->cstate &= ~(CS_UPTODATE|CS_DIST); spinlock_release(&slave->catch_lock); @@ -965,6 +968,7 @@ int action; continue; else break; +#endif } else { @@ -991,6 +995,9 @@ int action; */ if (slave->cstate & CS_UPTODATE) { +#if QUEUE_SLAVE + poll_fake_write_event(slave->dcb); +#else nextslave = slave->next; spinlock_release(&router->lock); LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG, @@ -1016,6 +1023,7 @@ int action; { break; } +#endif } } } diff --git a/server/modules/routing/binlog/blr_slave.c b/server/modules/routing/binlog/blr_slave.c index 01e39739e..d7c691892 100644 --- a/server/modules/routing/binlog/blr_slave.c +++ b/server/modules/routing/binlog/blr_slave.c @@ -61,8 +61,8 @@ static int blr_slave_send_timestamp(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave static int blr_slave_register(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue); static int blr_slave_binlog_dump(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue); int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large); -static uint8_t *blr_build_header(GWBUF *pkt, REP_HEADER *hdr); -static int blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data); +uint8_t *blr_build_header(GWBUF *pkt, REP_HEADER *hdr); +int blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data); extern int lm_enabled_logfiles_bitmask; @@ -577,7 +577,11 @@ uint32_t chksum; spinlock_acquire(&slave->catch_lock); slave->cstate &= ~CS_UPTODATE; spinlock_release(&slave->catch_lock); +#if QUEUE_SLAVE + poll_fake_write_event(slave->dcb); +#else rval = blr_slave_catchup(router, slave, true); +#endif } return rval; @@ -631,7 +635,7 @@ encode_value(unsigned char *data, unsigned int value, int len) * @param hdr The packet header to populate * @return A pointer to the first byte following the event header */ -static uint8_t * +uint8_t * blr_build_header(GWBUF *pkt, REP_HEADER *hdr) { uint8_t *ptr; @@ -688,7 +692,7 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large) { GWBUF *head, *record; REP_HEADER hdr; -int written, fd, rval = 1, burst; +int written, rval = 1, burst; uint8_t *ptr; struct timespec req; @@ -701,6 +705,12 @@ unsigned long beat; else burst = router->short_burst; spinlock_acquire(&slave->catch_lock); +#if QUEUE_SLAVE + if (slave->cstate & CS_BUSY) + return 0; + slave->cstate = CS_BUSY; + spinlock_release(&slave->catch_lock); +#else while ((slave->cstate & (CS_HOLD|CS_BUSY)) == (CS_HOLD|CS_BUSY)) { spinlock_release(&slave->catch_lock); @@ -715,29 +725,36 @@ unsigned long beat; if (hkheartbeat - beat > 5) LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, "Long wait in blr_salve_catchup %ld00ms with %s burst, return without write records.\n", hkheartbeat - beat, large ? "long" : "short"))); - return 0; + return; } slave->cstate |= CS_BUSY; spinlock_release(&slave->catch_lock); if (hkheartbeat - beat > 5) LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, - "Long wait in blr_salve_catchup %ld00ms with %s burst.\n", + "Long wait in blr_slave_catchup %ld00ms with %s burst.\n", hkheartbeat - beat, large ? "long" : "short"))); +#endif - if ((fd = blr_open_binlog(router, slave->binlogfile)) == -1) + if (slave->file == NULL) { - LOGIF(LE, (skygw_log_write( - LOGFILE_ERROR, - "blr_slave_catchup failed to open binlog file %s\n", - slave->binlogfile))); - return 0; + if ((slave->file = blr_open_binlog(router, slave->binlogfile)) == NULL) + { + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "blr_slave_catchup failed to open binlog file %s\n", + slave->binlogfile))); + return 0; + } } slave->stats.n_bursts++; while (burst-- && - (record = blr_read_binlog(fd, slave->binlog_pos, &hdr)) != NULL) + (record = blr_read_binlog(slave->file, slave->binlog_pos, &hdr)) != NULL) { +#if QUEUE_SLAVE +#else spinlock_acquire(&slave->catch_lock); slave->cstate &= ~CS_HOLD; spinlock_release(&slave->catch_lock); +#endif head = gwbuf_alloc(5); ptr = GWBUF_DATA(head); encode_value(ptr, hdr.event_size + 1, 24); @@ -747,9 +764,9 @@ hkheartbeat - beat, large ? "long" : "short"))); head = gwbuf_append(head, record); if (hdr.event_type == ROTATE_EVENT) { - close(fd); + blr_close_binlog(router, slave->file); blr_slave_rotate(slave, GWBUF_DATA(record)); - if ((fd = blr_open_binlog(router, slave->binlogfile)) == -1) + if ((slave->file = blr_open_binlog(router, slave->binlogfile)) == NULL) { LOGIF(LE, (skygw_log_write( LOGFILE_ERROR, @@ -765,9 +782,12 @@ hkheartbeat - beat, large ? "long" : "short"))); } rval = written; slave->stats.n_events++; +#if QUEUE_SLAVE +#else spinlock_acquire(&slave->catch_lock); slave->cstate |= CS_HOLD; spinlock_release(&slave->catch_lock); +#endif } if (record == NULL) slave->stats.n_failed_read++; @@ -775,8 +795,6 @@ hkheartbeat - beat, large ? "long" : "short"))); slave->cstate &= ~CS_BUSY; spinlock_release(&slave->catch_lock); - if (fd != -1) - close(fd); if (record) { slave->stats.n_flows++; @@ -814,7 +832,7 @@ hkheartbeat - beat, large ? "long" : "short"))); * @param reason The reason the callback was called * @param data The user data, in this case the server structure */ -static int +int blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data) { ROUTER_SLAVE *slave = (ROUTER_SLAVE *)data; @@ -823,8 +841,12 @@ ROUTER_INSTANCE *router = slave->router; if (reason == DCB_REASON_DRAINED) { if (slave->state == BLRS_DUMPING && - slave->binlog_pos != router->binlog_position) + (slave->binlog_pos != router->binlog_position + || strcmp(slave->binlogfile, router->binlog_name))) { + spinlock_acquire(&slave->catch_lock); + slave->cstate &= ~CS_UPTODATE; + spinlock_release(&slave->catch_lock); slave->stats.n_dcb++; blr_slave_catchup(router, slave, true); }