From df1ff25be4fb1e3e497c76f7b96051087384a1b9 Mon Sep 17 00:00:00 2001 From: Mark Riddoch Date: Wed, 10 Dec 2014 09:07:14 +0000 Subject: [PATCH] Changes for file system full detection --- server/modules/include/blr.h | 6 +- server/modules/routing/binlog/blr_file.c | 19 ++++- server/modules/routing/binlog/blr_master.c | 89 ++++++++++++++++++++-- 3 files changed, 102 insertions(+), 12 deletions(-) diff --git a/server/modules/include/blr.h b/server/modules/include/blr.h index 2225ff655..48e3a6c9a 100644 --- a/server/modules/include/blr.h +++ b/server/modules/include/blr.h @@ -429,9 +429,9 @@ extern void blr_slave_rotate(ROUTER_SLAVE *slave, uint8_t *ptr); extern int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large); extern void blr_init_cache(ROUTER_INSTANCE *); -extern void blr_file_init(ROUTER_INSTANCE *); -extern void blr_write_binlog_record(ROUTER_INSTANCE *, REP_HEADER *,uint8_t *); -extern void blr_file_rotate(ROUTER_INSTANCE *, char *, uint64_t); +extern int blr_file_init(ROUTER_INSTANCE *); +extern int blr_write_binlog_record(ROUTER_INSTANCE *, REP_HEADER *,uint8_t *); +extern int blr_file_rotate(ROUTER_INSTANCE *, char *, uint64_t); extern void blr_file_flush(ROUTER_INSTANCE *); extern BLFILE *blr_open_binlog(ROUTER_INSTANCE *, char *); extern GWBUF *blr_read_binlog(ROUTER_INSTANCE *, BLFILE *, unsigned int, REP_HEADER *); diff --git a/server/modules/routing/binlog/blr_file.c b/server/modules/routing/binlog/blr_file.c index 52b60e66c..e87e219ff 100644 --- a/server/modules/routing/binlog/blr_file.c +++ b/server/modules/routing/binlog/blr_file.c @@ -242,15 +242,30 @@ int fd; * @param router The router instance * @param buf The binlog record * @param len The length of the binlog record + * @return Return the number of bytes written */ -void +int blr_write_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *buf) { - pwrite(router->binlog_fd, buf, hdr->event_size, hdr->next_pos - hdr->event_size); +int n; + + if ((n = 
pwrite(router->binlog_fd, buf, hdr->event_size, + hdr->next_pos - hdr->event_size)) != hdr->event_size) + { + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, + "%s: Failed to write binlog record at %d of %s. " + "Truncating to previous record.", + router->service->name, hdr->next_pos - hdr->event_size, + router->binlog_name))); + /* Remove any partial event that was written */ + ftruncate(router->binlog_fd, hdr->next_pos - hdr->event_size); + return 0; + } spinlock_acquire(&router->binlog_lock); router->binlog_position = hdr->next_pos; router->last_written = hdr->next_pos - hdr->event_size; spinlock_release(&router->binlog_lock); + return n; } /** diff --git a/server/modules/routing/binlog/blr_master.c b/server/modules/routing/binlog/blr_master.c index db95cf6c5..ecf702b5e 100644 --- a/server/modules/routing/binlog/blr_master.c +++ b/server/modules/routing/binlog/blr_master.c @@ -71,13 +71,13 @@ static GWBUF *blr_make_registration(ROUTER_INSTANCE *router); static GWBUF *blr_make_binlog_dump(ROUTER_INSTANCE *router); void encode_value(unsigned char *data, unsigned int value, int len); void blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt); -static void blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *pkt, REP_HEADER *hdr); +static int blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *pkt, REP_HEADER *hdr); void blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr); static void *CreateMySQLAuthData(char *username, char *password, char *database); void blr_extract_header(uint8_t *pkt, REP_HEADER *hdr); inline uint32_t extract_field(uint8_t *src, int bits); static void blr_log_packet(logfile_id_t file, char *msg, uint8_t *ptr, int len); - +static void blr_master_close(ROUTER_INSTANCE *); static int keepalive = 1; /** @@ -247,6 +247,37 @@ int do_reconnect = 0; } } +/** + * Shutdown a connection to the master + * + * @param router The router instance + */ +void +blr_master_close(ROUTER_INSTANCE *router) +{ + 
dcb_close(router->master); + router->master_state = BLRM_UNCONNECTED; +} + +/** + * Mark this master connection for a delayed reconnect, used during + * error recovery to cause a reconnect after 60 seconds. If the task name cannot be allocated, no reconnect is scheduled. + * + * @param router The router instance + */ +void +blr_master_delayed_connect(ROUTER_INSTANCE *router) +{ +char *name; + + if ((name = malloc(strlen(router->service->name) + + strlen(" Master Recovery")+1)) != NULL) + { + sprintf(name, "%s Master Recovery", router->service->name); + hktask_oneshot(name, blr_start_master, router, 60); + } +} + /** * Binlog router master side state machine event handler. * @@ -809,10 +840,36 @@ static REP_HEADER phdr; // into the binlog file if (hdr.event_type == ROTATE_EVENT) router->rotating = 1; - blr_write_binlog_record(router, &hdr, ptr); + if (blr_write_binlog_record(router, &hdr, ptr) == 0) + { + /* + * Failed to write to the + * binlog file, destroy the + * buffer chain and close the + * connection with the master + */ + while ((pkt = gwbuf_consume(pkt, + GWBUF_LENGTH(pkt))) != NULL); + blr_master_close(router); + blr_master_delayed_connect(router); + return; + } if (hdr.event_type == ROTATE_EVENT) { - blr_rotate_event(router, ptr, &hdr); + if (!blr_rotate_event(router, ptr, &hdr)) + { + /* + * Failed to write to the + * binlog file, destroy the + * buffer chain and close the + * connection with the master + */ + while ((pkt = gwbuf_consume(pkt, + GWBUF_LENGTH(pkt))) != NULL); + blr_master_close(router); + blr_master_delayed_connect(router); + return; + } } blr_distribute_binlog_record(router, &hdr, ptr); } @@ -833,7 +890,20 @@ static REP_HEADER phdr; if (hdr.event_type == ROTATE_EVENT) { router->rotating = 1; - blr_rotate_event(router, ptr, &hdr); + if (!blr_rotate_event(router, ptr, &hdr)) + { + /* + * Failed to write to the + * binlog file, destroy the + * buffer chain and close the + * connection with the master + */ + while ((pkt = gwbuf_consume(pkt, + GWBUF_LENGTH(pkt))) != NULL); + blr_master_close(router); + 
blr_master_delayed_connect(router); + return; + } } } } @@ -933,7 +1003,7 @@ register uint32_t rval = 0, shift = 0; * @param ptr The packet containing the rotate event * @param hdr The replication message header */ -static void +static int blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *ptr, REP_HEADER *hdr) { int len, slen; @@ -963,9 +1033,14 @@ char file[BINLOG_FNAMELEN+1]; if (strncmp(router->binlog_name, file, slen) != 0) { router->stats.n_rotates++; - blr_file_rotate(router, file, pos); + if (blr_file_rotate(router, file, pos) == 0) + { + router->rotating = 0; + return 0; + } } router->rotating = 0; + return 1; } /**