From f9ee9ca028cdee9549bdd439c5bd674b3854c900 Mon Sep 17 00:00:00 2001 From: MassimilianoPinto Date: Wed, 30 Sep 2015 14:39:26 +0200 Subject: [PATCH] Addition of send_slave_heartbeat option Addition of send_slave_heartbeat option --- server/modules/include/blr.h | 4 ++- server/modules/routing/binlog/blr.c | 6 ++++ server/modules/routing/binlog/blr_master.c | 4 +++ server/modules/routing/binlog/blr_slave.c | 32 +++++++++++++++------- 4 files changed, 35 insertions(+), 11 deletions(-) diff --git a/server/modules/include/blr.h b/server/modules/include/blr.h index 06d5619dd..08f0acbea 100644 --- a/server/modules/include/blr.h +++ b/server/modules/include/blr.h @@ -318,6 +318,7 @@ typedef struct router_slave { char *warning_msg; /*< Warning message */ int heartbeat; /*< Heartbeat in seconds */ uint8_t lastEventReceived; /*< Last event received */ + time_t lastReply; /*< Last event sent */ #if defined(SS_DEBUG) skygw_chk_t rses_chk_tail; #endif @@ -436,6 +437,7 @@ typedef struct router_instance { char *set_master_hostname; /*< Send custom Hostname to slaves */ char *set_master_uuid; /*< Send custom Master UUID to slaves */ char *set_master_server_id; /*< Send custom Master server_id to slaves */ + int send_slave_heartbeat; /*< Enable sending heartbeat to slaves */ struct router_instance *next; } ROUTER_INSTANCE; @@ -545,7 +547,7 @@ extern int blr_write_binlog_record(ROUTER_INSTANCE *, REP_HEADER *,uint8_t *); extern int blr_file_rotate(ROUTER_INSTANCE *, char *, uint64_t); extern void blr_file_flush(ROUTER_INSTANCE *); extern BLFILE *blr_open_binlog(ROUTER_INSTANCE *, char *); -extern GWBUF *blr_read_binlog(ROUTER_INSTANCE *, BLFILE *, unsigned int, REP_HEADER *); +extern GWBUF *blr_read_binlog(ROUTER_INSTANCE *, BLFILE *, unsigned long, REP_HEADER *); extern void blr_close_binlog(ROUTER_INSTANCE *, BLFILE *); extern unsigned long blr_file_size(BLFILE *); extern int blr_statistics(ROUTER_INSTANCE *, ROUTER_SLAVE *, GWBUF *); diff --git a/server/modules/routing/binlog/blr.c b/server/modules/routing/binlog/blr.c index baa73f7ab..0087942ab 100644 --- a/server/modules/routing/binlog/blr.c +++ b/server/modules/routing/binlog/blr.c @@ -46,6 +46,7 @@ * If set those values are sent to slaves instead of * saved master responses * 23/08/2015 Massimiliano Pinto Added strerror_r + * 30/09/2015 Massimiliano Pinto Addition of send_slave_heartbeat option * * @endverbatim */ @@ -287,6 +288,7 @@ char task_name[BLRM_TASK_NAME_LEN+1] = ""; inst->set_master_hostname = NULL; inst->set_master_uuid = NULL; inst->set_master_server_id = NULL; + inst->send_slave_heartbeat = 0; inst->serverid = 0; @@ -468,6 +470,10 @@ char task_name[BLRM_TASK_NAME_LEN+1] = ""; inst->heartbeat = h_val; } } + else if (strcmp(options[i], "send_slave_heartbeat") == 0) + { + inst->send_slave_heartbeat = atoi(value); + } else if (strcmp(options[i], "binlogdir") == 0) { inst->binlogdir = strdup(value); diff --git a/server/modules/routing/binlog/blr_master.c b/server/modules/routing/binlog/blr_master.c index bf9509b17..3716dc370 100644 --- a/server/modules/routing/binlog/blr_master.c +++ b/server/modules/routing/binlog/blr_master.c @@ -1613,6 +1613,10 @@ int action; slave->lastEventTimestamp = hdr->timestamp; slave->lastEventReceived = hdr->event_type; + /* set lastReply */ + if (router->send_slave_heartbeat == 1) + slave->lastReply = time(0); + pkt = gwbuf_alloc(hdr->event_size + 5); buf = GWBUF_DATA(pkt); encode_value(buf, hdr->event_size + 1, 24); diff --git a/server/modules/routing/binlog/blr_slave.c b/server/modules/routing/binlog/blr_slave.c index e6bdcc035..7566e5c6a 100644 --- a/server/modules/routing/binlog/blr_slave.c +++ b/server/modules/routing/binlog/blr_slave.c @@ -237,8 +237,10 @@ blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue) rc = blr_slave_binlog_dump(router, slave, queue); - if (rc) { + if (router->send_slave_heartbeat == 1 && rc && slave->heartbeat > 0) { snprintf(task_name, BLRM_TASK_NAME_LEN, "%s slaves heartbeat send", router->service->name); + + /* Add slave heartbeat check task: it runs with 1 second frequency */ hktask_add(task_name, blr_send_slave_heartbeat, router, 1); } @@ -1784,18 +1786,28 @@ uint32_t chksum; encode_value(ptr, chksum, 32); } - slave->lastEventTimestamp = time(0); + /* Send Fake Rotate Event */ + rval = slave->dcb->func.write(slave->dcb, resp); + + /* set lastEventReceived */ slave->lastEventReceived = ROTATE_EVENT; - rval = slave->dcb->func.write(slave->dcb, resp); + /* set lastReply for slave heartbeat check */ + if (router->send_slave_heartbeat) + slave->lastReply = time(0); /* Send the FORMAT_DESCRIPTION_EVENT */ if (slave->binlog_pos != 4) blr_slave_send_fde(router, slave); + /* set lastEventReceived */ + slave->lastEventReceived = FORMAT_DESCRIPTION_EVENT; + slave->dcb->low_water = router->low_water; slave->dcb->high_water = router->high_water; + dcb_add_callback(slave->dcb, DCB_REASON_DRAINED, blr_slave_callback, slave); + slave->state = BLRS_DUMPING; LOGIF(LM, (skygw_log_write( @@ -2001,6 +2013,10 @@ uint8_t *ptr; rval = written; slave->stats.n_events++; burst_size -= hdr.event_size; + + /* set lastReply for slave heartbeat check */ + if (router->send_slave_heartbeat) + slave->lastReply = time(0); } if (record == NULL) slave->stats.n_failed_read++; @@ -2306,9 +2322,6 @@ uint32_t chksum; chksum = crc32(chksum, GWBUF_DATA(record), hdr.event_size - 4); encode_value(ptr, chksum, 32); - slave->lastEventTimestamp = time(0); - slave->lastEventReceived = FORMAT_DESCRIPTION_EVENT; - slave->dcb->func.write(slave->dcb, head); } @@ -4323,19 +4336,18 @@ time_t t_now = time(0); { /* skip servers with state = 0 */ - if ( (sptr->state == BLRS_DUMPING) && (sptr->heartbeat > 0) && ((t_now + 1 - sptr->lastEventTimestamp) >= sptr->heartbeat) ) + if ( (sptr->state == BLRS_DUMPING) && (sptr->heartbeat > 0) && ((t_now + 1 - sptr->lastReply) >= sptr->heartbeat) ) { LOGIF(LM, (skygw_log_write( LOGFILE_MESSAGE, "Sending Heartbeat to slave server-id %d in State %d, cstate %d. " "Heartbeat interval is %d, last event time is %lu", sptr->serverid, sptr->state, sptr->cstate, sptr->heartbeat, - (unsigned long)sptr->lastEventTimestamp))); + (unsigned long)sptr->lastReply))); blr_slave_send_heartbeat(router, sptr); - sptr->lastEventTimestamp = t_now; + sptr->lastReply = t_now; - sptr->lastEventReceived = HEARTBEAT_EVENT; } sptr = sptr->next;