Added blr_check_heartbeat as task
Added blr_check_heartbeat as task
This commit is contained in:
@ -40,6 +40,7 @@
|
||||
* when an error is encountered in BLRM_BINLOGDUMP state.
|
||||
* Server error code and msg are reported via SHOW SLAVE STATUS
|
||||
* 03/08/2015 Massimiliano Pinto Initial implementation of transaction safety
|
||||
* 13/08/2015 Massimiliano Pinto Addition of heartbeat check
|
||||
*
|
||||
* @endverbatim
|
||||
*/
|
||||
@ -89,6 +90,8 @@ static char *blr_extract_column(GWBUF *buf, int col);
|
||||
void blr_cache_response(ROUTER_INSTANCE *router, char *response, GWBUF *buf);
|
||||
void poll_fake_write_event(DCB *dcb);
|
||||
GWBUF *blr_read_events_from_pos(ROUTER_INSTANCE *router, unsigned long long pos, REP_HEADER *hdr);
|
||||
static void blr_check_last_master_event(void *inst);
|
||||
extern int blr_check_heartbeat(ROUTER_INSTANCE *router);
|
||||
static int keepalive = 1;
|
||||
|
||||
/**
|
||||
@ -638,9 +641,22 @@ char query[128];
|
||||
router->service->dbref->server->port)));
|
||||
break;
|
||||
case BLRM_BINLOGDUMP:
|
||||
{
|
||||
char *name;
|
||||
|
||||
// Main body, we have received a binlog record from the master
|
||||
blr_handle_binlog_record(router, buf);
|
||||
|
||||
// set heartbeat check task
|
||||
if ((name = (char *)malloc(80)) != NULL)
|
||||
{
|
||||
sprintf(name, "%s heartbeat", router->service->name);
|
||||
hktask_add(name, blr_check_last_master_event, router, router->heartbeat);
|
||||
free(name);
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (router->reconnect_pending)
|
||||
@ -978,6 +994,14 @@ int n_bufs = -1, pn_bufs = -1;
|
||||
free(router->m_errmsg);
|
||||
router->m_errmsg = NULL;
|
||||
|
||||
router->stats.n_binlogs++;
|
||||
router->lastEventReceived = hdr.event_type;
|
||||
router->lastEventTimestamp = hdr.timestamp;
|
||||
|
||||
#ifdef SHOW_EVENTS
|
||||
printf("blr: event type 0x%02x, flags 0x%04x, event size %d, event timestamp %lu\n", hdr.event_type, hdr.flags, hdr.event_size, hdr.timestamp);
|
||||
#endif
|
||||
|
||||
spinlock_release(&router->lock);
|
||||
|
||||
/*
|
||||
@ -1012,13 +1036,7 @@ int n_bufs = -1, pn_bufs = -1;
|
||||
return;
|
||||
}
|
||||
}
|
||||
router->stats.n_binlogs++;
|
||||
router->lastEventReceived = hdr.event_type;
|
||||
router->lastEventTimestamp = hdr.timestamp;
|
||||
|
||||
#ifdef SHOW_EVENTS
|
||||
printf("blr: event type 0x%02x, flags 0x%04x, event size %d", hdr.event_type, hdr.flags, hdr.event_size);
|
||||
#endif
|
||||
/**
|
||||
* Check for an open transaction, if the option is set
|
||||
* Only complete transactions should be sent to sleves
|
||||
@ -1118,6 +1136,7 @@ int n_bufs = -1, pn_bufs = -1;
|
||||
router->binlog_name,
|
||||
router->current_pos)));
|
||||
router->stats.n_fakeevents++;
|
||||
|
||||
if (hdr.event_type == FORMAT_DESCRIPTION_EVENT)
|
||||
{
|
||||
uint8_t *new_fde;
|
||||
@ -1162,7 +1181,9 @@ int n_bufs = -1, pn_bufs = -1;
|
||||
"Binlog %s @ %d.",
|
||||
router->binlog_name,
|
||||
router->current_pos)));
|
||||
|
||||
router->stats.n_heartbeats++;
|
||||
|
||||
}
|
||||
else if (hdr.flags != LOG_EVENT_ARTIFICIAL_F)
|
||||
{
|
||||
@ -1870,3 +1891,103 @@ int n;
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop and start the master connection
|
||||
*
|
||||
* @param router The router instance
|
||||
*/
|
||||
void
|
||||
blr_stop_start_master(ROUTER_INSTANCE *router) {
|
||||
|
||||
if (router->master) {
|
||||
if (router->master->fd != -1 && router->master->state == DCB_STATE_POLLING) {
|
||||
blr_master_close(router);
|
||||
}
|
||||
}
|
||||
|
||||
spinlock_acquire(&router->lock);
|
||||
|
||||
router->master_state = BLRM_SLAVE_STOPPED;
|
||||
|
||||
/* set last_safe_pos */
|
||||
router->last_safe_pos = router->binlog_position;
|
||||
|
||||
/**
|
||||
* Set router->prevbinlog to router->binlog_name
|
||||
* The FDE event with current filename may arrive after STOP SLAVE is received
|
||||
*/
|
||||
|
||||
if (strcmp(router->binlog_name, router->prevbinlog) != 0)
|
||||
strncpy(router->prevbinlog, router->binlog_name, BINLOG_FNAMELEN);
|
||||
|
||||
if (router->client) {
|
||||
if (router->client->fd != -1 && router->client->state == DCB_STATE_POLLING) {
|
||||
dcb_close(router->client);
|
||||
router->client = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
/* Discard the queued residual data */
|
||||
while (router->residual)
|
||||
{
|
||||
router->residual = gwbuf_consume(router->residual, GWBUF_LENGTH(router->residual));
|
||||
}
|
||||
router->residual = NULL;
|
||||
|
||||
/* Now it is safe to unleash other threads on this router instance */
|
||||
router->reconnect_pending = 0;
|
||||
router->active_logs = 0;
|
||||
|
||||
router->master_state = BLRM_UNCONNECTED;
|
||||
spinlock_release(&router->lock);
|
||||
|
||||
blr_start_master(router);
|
||||
}
|
||||
|
||||
/**
|
||||
* The heartbeat check function called from the housekeeper.
|
||||
* We can try a new master connection if current one is seen out of date
|
||||
*
|
||||
* @param router Current router instance
|
||||
*/
|
||||
|
||||
static void
|
||||
blr_check_last_master_event(void *inst) {
|
||||
ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)inst;
|
||||
int master_check = 1;
|
||||
int master_state = BLRM_UNCONNECTED;
|
||||
char *name = NULL;
|
||||
|
||||
spinlock_acquire(&router->lock);
|
||||
|
||||
master_check = blr_check_heartbeat(router);
|
||||
|
||||
master_state = router->master_state;
|
||||
|
||||
spinlock_release(&router->lock);
|
||||
|
||||
if (!master_check) {
|
||||
/*
|
||||
* stop current master connection
|
||||
* and try a new connection
|
||||
*/
|
||||
blr_stop_start_master(router);
|
||||
}
|
||||
|
||||
if ( (!master_check) || (master_state != BLRM_BINLOGDUMP) ) {
|
||||
/*
|
||||
* Remove the task, it will be added again
|
||||
* when master state is back to BLRM_BINLOGDUMP
|
||||
* by blr_master_response()
|
||||
*/
|
||||
if ((name = (char *)malloc(80)) != NULL) {
|
||||
sprintf(name, "%s heartbeat", router->service->name);
|
||||
|
||||
hktask_remove(name);
|
||||
|
||||
free(name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user