Addition of send_slave_heartbeat option

Addition of send_slave_heartbeat option
This commit is contained in:
MassimilianoPinto
2015-09-30 14:39:26 +02:00
parent 4aa161d78e
commit f9ee9ca028
4 changed files with 35 additions and 11 deletions

View File

@ -318,6 +318,7 @@ typedef struct router_slave {
char *warning_msg; /*< Warning message */ char *warning_msg; /*< Warning message */
int heartbeat; /*< Heartbeat in seconds */ int heartbeat; /*< Heartbeat in seconds */
uint8_t lastEventReceived; /*< Last event received */ uint8_t lastEventReceived; /*< Last event received */
time_t lastReply; /*< Last event sent */
#if defined(SS_DEBUG) #if defined(SS_DEBUG)
skygw_chk_t rses_chk_tail; skygw_chk_t rses_chk_tail;
#endif #endif
@ -436,6 +437,7 @@ typedef struct router_instance {
char *set_master_hostname; /*< Send custom Hostname to slaves */ char *set_master_hostname; /*< Send custom Hostname to slaves */
char *set_master_uuid; /*< Send custom Master UUID to slaves */ char *set_master_uuid; /*< Send custom Master UUID to slaves */
char *set_master_server_id; /*< Send custom Master server_id to slaves */ char *set_master_server_id; /*< Send custom Master server_id to slaves */
int send_slave_heartbeat; /*< Enable sending heartbeat to slaves */
struct router_instance *next; struct router_instance *next;
} ROUTER_INSTANCE; } ROUTER_INSTANCE;
@ -545,7 +547,7 @@ extern int blr_write_binlog_record(ROUTER_INSTANCE *, REP_HEADER *,uint8_t *);
extern int blr_file_rotate(ROUTER_INSTANCE *, char *, uint64_t); extern int blr_file_rotate(ROUTER_INSTANCE *, char *, uint64_t);
extern void blr_file_flush(ROUTER_INSTANCE *); extern void blr_file_flush(ROUTER_INSTANCE *);
extern BLFILE *blr_open_binlog(ROUTER_INSTANCE *, char *); extern BLFILE *blr_open_binlog(ROUTER_INSTANCE *, char *);
extern GWBUF *blr_read_binlog(ROUTER_INSTANCE *, BLFILE *, unsigned int, REP_HEADER *); extern GWBUF *blr_read_binlog(ROUTER_INSTANCE *, BLFILE *, unsigned long, REP_HEADER *);
extern void blr_close_binlog(ROUTER_INSTANCE *, BLFILE *); extern void blr_close_binlog(ROUTER_INSTANCE *, BLFILE *);
extern unsigned long blr_file_size(BLFILE *); extern unsigned long blr_file_size(BLFILE *);
extern int blr_statistics(ROUTER_INSTANCE *, ROUTER_SLAVE *, GWBUF *); extern int blr_statistics(ROUTER_INSTANCE *, ROUTER_SLAVE *, GWBUF *);

View File

@ -46,6 +46,7 @@
* If set those values are sent to slaves instead of * If set those values are sent to slaves instead of
* saved master responses * saved master responses
* 23/08/2015 Massimiliano Pinto Added strerror_r * 23/08/2015 Massimiliano Pinto Added strerror_r
* 30/09/2015 Massimiliano Pinto Addition of send_slave_heartbeat option
* *
* @endverbatim * @endverbatim
*/ */
@ -287,6 +288,7 @@ char task_name[BLRM_TASK_NAME_LEN+1] = "";
inst->set_master_hostname = NULL; inst->set_master_hostname = NULL;
inst->set_master_uuid = NULL; inst->set_master_uuid = NULL;
inst->set_master_server_id = NULL; inst->set_master_server_id = NULL;
inst->send_slave_heartbeat = 0;
inst->serverid = 0; inst->serverid = 0;
@ -468,6 +470,10 @@ char task_name[BLRM_TASK_NAME_LEN+1] = "";
inst->heartbeat = h_val; inst->heartbeat = h_val;
} }
} }
else if (strcmp(options[i], "send_slave_heartbeat") == 0)
{
inst->send_slave_heartbeat = atoi(value);
}
else if (strcmp(options[i], "binlogdir") == 0) else if (strcmp(options[i], "binlogdir") == 0)
{ {
inst->binlogdir = strdup(value); inst->binlogdir = strdup(value);

View File

@ -1613,6 +1613,10 @@ int action;
slave->lastEventTimestamp = hdr->timestamp; slave->lastEventTimestamp = hdr->timestamp;
slave->lastEventReceived = hdr->event_type; slave->lastEventReceived = hdr->event_type;
/* set lastReply */
if (router->send_slave_heartbeat == 1)
slave->lastReply = time(0);
pkt = gwbuf_alloc(hdr->event_size + 5); pkt = gwbuf_alloc(hdr->event_size + 5);
buf = GWBUF_DATA(pkt); buf = GWBUF_DATA(pkt);
encode_value(buf, hdr->event_size + 1, 24); encode_value(buf, hdr->event_size + 1, 24);

View File

@ -237,8 +237,10 @@ blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
rc = blr_slave_binlog_dump(router, slave, queue); rc = blr_slave_binlog_dump(router, slave, queue);
if (rc) { if (router->send_slave_heartbeat == 1 && rc && slave->heartbeat > 0) {
snprintf(task_name, BLRM_TASK_NAME_LEN, "%s slaves heartbeat send", router->service->name); snprintf(task_name, BLRM_TASK_NAME_LEN, "%s slaves heartbeat send", router->service->name);
/* Add slave heartbeat check task: it runs with 1 second frequency */
hktask_add(task_name, blr_send_slave_heartbeat, router, 1); hktask_add(task_name, blr_send_slave_heartbeat, router, 1);
} }
@ -1784,18 +1786,28 @@ uint32_t chksum;
encode_value(ptr, chksum, 32); encode_value(ptr, chksum, 32);
} }
slave->lastEventTimestamp = time(0); /* Send Fake Rotate Event */
rval = slave->dcb->func.write(slave->dcb, resp);
/* set lastEventReceived */
slave->lastEventReceived = ROTATE_EVENT; slave->lastEventReceived = ROTATE_EVENT;
rval = slave->dcb->func.write(slave->dcb, resp); /* set lastReply for slave heartbeat check */
if (router->send_slave_heartbeat)
slave->lastReply = time(0);
/* Send the FORMAT_DESCRIPTION_EVENT */ /* Send the FORMAT_DESCRIPTION_EVENT */
if (slave->binlog_pos != 4) if (slave->binlog_pos != 4)
blr_slave_send_fde(router, slave); blr_slave_send_fde(router, slave);
/* set lastEventReceived */
slave->lastEventReceived = FORMAT_DESCRIPTION_EVENT;
slave->dcb->low_water = router->low_water; slave->dcb->low_water = router->low_water;
slave->dcb->high_water = router->high_water; slave->dcb->high_water = router->high_water;
dcb_add_callback(slave->dcb, DCB_REASON_DRAINED, blr_slave_callback, slave); dcb_add_callback(slave->dcb, DCB_REASON_DRAINED, blr_slave_callback, slave);
slave->state = BLRS_DUMPING; slave->state = BLRS_DUMPING;
LOGIF(LM, (skygw_log_write( LOGIF(LM, (skygw_log_write(
@ -2001,6 +2013,10 @@ uint8_t *ptr;
rval = written; rval = written;
slave->stats.n_events++; slave->stats.n_events++;
burst_size -= hdr.event_size; burst_size -= hdr.event_size;
/* set lastReply for slave heartbeat check */
if (router->send_slave_heartbeat)
slave->lastReply = time(0);
} }
if (record == NULL) if (record == NULL)
slave->stats.n_failed_read++; slave->stats.n_failed_read++;
@ -2306,9 +2322,6 @@ uint32_t chksum;
chksum = crc32(chksum, GWBUF_DATA(record), hdr.event_size - 4); chksum = crc32(chksum, GWBUF_DATA(record), hdr.event_size - 4);
encode_value(ptr, chksum, 32); encode_value(ptr, chksum, 32);
slave->lastEventTimestamp = time(0);
slave->lastEventReceived = FORMAT_DESCRIPTION_EVENT;
slave->dcb->func.write(slave->dcb, head); slave->dcb->func.write(slave->dcb, head);
} }
@ -4323,19 +4336,18 @@ time_t t_now = time(0);
{ {
/* skip servers with state = 0 */ /* skip servers with state = 0 */
if ( (sptr->state == BLRS_DUMPING) && (sptr->heartbeat > 0) && ((t_now + 1 - sptr->lastEventTimestamp) >= sptr->heartbeat) ) if ( (sptr->state == BLRS_DUMPING) && (sptr->heartbeat > 0) && ((t_now + 1 - sptr->lastReply) >= sptr->heartbeat) )
{ {
LOGIF(LM, (skygw_log_write( LOGIF(LM, (skygw_log_write(
LOGFILE_MESSAGE, "Sending Heartbeat to slave server-id %d in State %d, cstate %d. " LOGFILE_MESSAGE, "Sending Heartbeat to slave server-id %d in State %d, cstate %d. "
"Heartbeat interval is %d, last event time is %lu", "Heartbeat interval is %d, last event time is %lu",
sptr->serverid, sptr->state, sptr->cstate, sptr->heartbeat, sptr->serverid, sptr->state, sptr->cstate, sptr->heartbeat,
(unsigned long)sptr->lastEventTimestamp))); (unsigned long)sptr->lastReply)));
blr_slave_send_heartbeat(router, sptr); blr_slave_send_heartbeat(router, sptr);
sptr->lastEventTimestamp = t_now; sptr->lastReply = t_now;
sptr->lastEventReceived = HEARTBEAT_EVENT;
} }
sptr = sptr->next; sptr = sptr->next;