Addition of slave heartbeat check

Addition of slave heartbeat check
This commit is contained in:
MassimilianoPinto
2015-09-25 15:58:36 +02:00
parent 653bb57e7f
commit b6df52a68e
5 changed files with 228 additions and 35 deletions

View File

@ -55,6 +55,9 @@
* 03/09/2015 Massimiliano Pinto Added support for SHOW [GLOBAL] VARIABLES LIKE
* 04/09/2015 Massimiliano Pinto Added support for SHOW WARNINGS
* 15/09/2015 Massimiliano Pinto Added support for SHOW [GLOBAL] STATUS LIKE 'Uptime'
* 25/09/2015 Massimiliano Pinto Addition of slave heartbeat:
* the period set during registration is checked
* and heartbeat event might be sent to the affected slave.
*
* @endverbatim
*/
@ -145,6 +148,8 @@ extern int MaxScaleUptime();
static int blr_slave_send_status_variable(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, char *variable, char *value, int column_type);
static int blr_slave_handle_status_variables(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, char *stmt);
static int blr_slave_send_columndef_with_status_schema(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, char *name, int type, int len, uint8_t seqno);
static void blr_send_slave_heartbeat(void *inst);
static int blr_slave_send_heartbeat(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
void poll_fake_write_event(DCB *dcb);
@ -226,8 +231,20 @@ blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
}
break;
case COM_BINLOG_DUMP:
return blr_slave_binlog_dump(router, slave, queue);
{
char task_name[BLRM_TASK_NAME_LEN+1]="";
int rc = 0;
rc = blr_slave_binlog_dump(router, slave, queue);
if (rc) {
snprintf(task_name, BLRM_TASK_NAME_LEN, "%s slaves heartbeat send", router->service->name);
hktask_add(task_name, blr_send_slave_heartbeat, router, 1);
}
return rc;
break;
}
case COM_STATISTICS:
return blr_statistics(router, slave, queue);
break;
@ -613,6 +630,22 @@ extern char *strcasestr();
}
else if (strcasecmp(word, "@master_heartbeat_period") == 0)
{
int slave_heartbeat;
int v_len = 0;
word = strtok_r(NULL, sep, &brkb);
if (word) {
char *new_val;
v_len = strlen(word);
if (v_len > 6) {
new_val = strndup(word, v_len - 6);
slave->heartbeat = atoi(new_val) / 1000;
} else {
new_val = strndup(word, v_len);
slave->heartbeat = atoi(new_val) / 1000000;
}
free(new_val);
}
free(query_text);
return blr_slave_replay(router, slave, router->saved_master.heartbeat);
}
@ -1708,9 +1741,9 @@ uint32_t chksum;
if (slave->nocrc)
len = 19 + 8 + binlognamelen;
len = BINLOG_EVENT_HDR_LEN + 8 + binlognamelen;
else
len = 19 + 8 + 4 + binlognamelen;
len = BINLOG_EVENT_HDR_LEN + 8 + 4 + binlognamelen;
// Build a fake rotate event
resp = gwbuf_alloc(len + 5);
@ -1744,6 +1777,9 @@ uint32_t chksum;
encode_value(ptr, chksum, 32);
}
slave->lastEventTimestamp = time(0);
slave->lastEventReceived = ROTATE_EVENT;
rval = slave->dcb->func.write(slave->dcb, resp);
/* Send the FORMAT_DESCRIPTION_EVENT */
@ -1913,15 +1949,18 @@ uint8_t *ptr;
*ptr++ = 0; // OK
head = gwbuf_append(head, record);
slave->lastEventTimestamp = hdr.timestamp;
slave->lastEventReceived = hdr.event_type;
if (hdr.event_type == ROTATE_EVENT)
{
unsigned long beat1 = hkheartbeat;
unsigned long beat1 = hkheartbeat;
blr_close_binlog(router, slave->file);
if (hkheartbeat - beat1 > 1) LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR, "blr_close_binlog took %d beats",
hkheartbeat - beat1)));
if (hkheartbeat - beat1 > 1)
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR, "blr_close_binlog took %d maxscale beats",
hkheartbeat - beat1)));
blr_slave_rotate(router, slave, GWBUF_DATA(record));
beat1 = hkheartbeat;
beat1 = hkheartbeat;
if ((slave->file = blr_open_binlog(router, slave->binlogfile)) == NULL)
{
if (rotating)
@ -1941,9 +1980,10 @@ beat1 = hkheartbeat;
dcb_close(slave->dcb);
break;
}
if (hkheartbeat - beat1 > 1) LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR, "blr_open_binlog took %d beats",
hkheartbeat - beat1)));
if (hkheartbeat - beat1 > 1)
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR, "blr_open_binlog took %d beats",
hkheartbeat - beat1)));
}
slave->stats.n_bytes += gwbuf_length(head);
written = slave->dcb->func.write(slave->dcb, head);
@ -2129,12 +2169,12 @@ blr_slave_rotate(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, uint8_t *ptr)
{
int len = EXTRACT24(ptr + 9); // Extract the event length
len = len - (19 + 8); // Remove length of header and position
len = len - (BINLOG_EVENT_HDR_LEN + 8); // Remove length of header and position
if (router->master_chksum)
len -= 4;
if (len > BINLOG_FNAMELEN)
len = BINLOG_FNAMELEN;
ptr += 19; // Skip header
ptr += BINLOG_EVENT_HDR_LEN; // Skip header
slave->binlog_pos = extract_field(ptr, 32);
slave->binlog_pos += (((uint64_t)extract_field(ptr+4, 32)) << 32);
memcpy(slave->binlogfile, ptr + 8, len);
@ -2170,7 +2210,7 @@ uint32_t chksum;
return 0;
binlognamelen = strlen(slave->binlogfile);
len = 19 + 8 + 4 + binlognamelen;
len = BINLOG_EVENT_HDR_LEN + 8 + 4 + binlognamelen;
/* no slave crc, remove 4 bytes */
if (slave->nocrc)
len -= 4;
@ -2226,6 +2266,8 @@ GWBUF *record, *head;
uint8_t *ptr;
uint32_t chksum;
memset(&hdr, 0, BINLOG_EVENT_HDR_LEN);
if ((file = blr_open_binlog(router, slave->binlogfile)) == NULL)
return;
if ((record = blr_read_binlog(router, file, 4, &hdr)) == NULL)
@ -2256,6 +2298,10 @@ uint32_t chksum;
chksum = crc32(0L, NULL, 0);
chksum = crc32(chksum, GWBUF_DATA(record), hdr.event_size - 4);
encode_value(ptr, chksum, 32);
slave->lastEventTimestamp = time(0);
slave->lastEventReceived = FORMAT_DESCRIPTION_EVENT;
slave->dcb->func.write(slave->dcb, head);
}
@ -3700,6 +3746,9 @@ GWBUF *pkt;
uint8_t *ptr;
int len, vers_len;
if (value == NULL)
return blr_slave_send_ok(router, slave);
vers_len = strlen(value);
blr_slave_send_fieldcount(router, slave, 1);
blr_slave_send_columndef(router, slave, variable, column_type, vers_len, 2);
@ -3740,6 +3789,9 @@ char *p = strdup(variable);
int var_len;
char *old_ptr = p;
if (value == NULL)
return 0;
/* Remove heading and trailing "'" */
if(*p == '\'')
p++;
@ -3922,8 +3974,9 @@ char *sep = " ,=";
} else
return blr_slave_replay(router, slave, router->saved_master.server_id);
} else if (strcasecmp(word, "'SERVER_UUID'") == 0) {
if (router->set_master_uuid)
if (router->set_master_uuid) {
return blr_slave_send_variable(router, slave, "'SERVER_UUID'", router->master_uuid, BLR_TYPE_STRING);
}
else
return blr_slave_replay(router, slave, router->saved_master.uuid);
} else if (strcasecmp(word, "'MAXSCALE%'") == 0) {
@ -4243,3 +4296,121 @@ char *ptr_name_start = name;
return slave->dcb->func.write(slave->dcb, pkt);
}
/**
* The heartbeat check function called from the housekeeper for registered slaves.
*
* @param router Current router instance
*/
static void
blr_send_slave_heartbeat(void *inst) {
ROUTER_SLAVE *sptr = NULL;
ROUTER_INSTANCE *router = (ROUTER_INSTANCE *) inst;
time_t t_now = time(0);
spinlock_acquire(&router->lock);
sptr = router->slaves;
while (sptr)
{
/* skip servers with state = 0 */
if ( (sptr->state == BLRS_DUMPING) && (sptr->heartbeat > 0) && ((t_now + 1 - sptr->lastEventTimestamp) >= sptr->heartbeat) )
{
LOGIF(LM, (skygw_log_write(
LOGFILE_MESSAGE, "Sending Heartbeat to slave server-id %d in State %d, cstate %d. "
"Heartbeat interval is %d, last event time is %lu",
sptr->serverid, sptr->state, sptr->cstate, sptr->heartbeat,
(unsigned long)sptr->lastEventTimestamp)));
blr_slave_send_heartbeat(router, sptr);
sptr->lastEventTimestamp = t_now;
sptr->lastEventReceived = HEARTBEAT_EVENT;
}
sptr = sptr->next;
}
spinlock_release(&router->lock);
}
/**
* Create and send an hearbeat packet to be sent to a registered slave server
*/
static int
blr_slave_send_heartbeat(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
{
REP_HEADER hdr;
GWBUF *resp;
uint8_t *ptr;
int len = BINLOG_EVENT_HDR_LEN;
uint32_t chksum;
/* Add CRC32 4 bytes */
if (!slave->nocrc)
len +=4;
/* add binlogname to data content len */
len += strlen(slave->binlogfile);
/**
* Alloc buffer for network binlog stream:
*
* 4 bytes header (3 for pkt len + 1 seq.no)
* 1 byte for Ok / ERR
* n bytes data content
*
* Total = 5 bytes + len
*/
resp = gwbuf_alloc(5 + len);
/* The OK/Err byte is part of payload */
hdr.payload_len = len + 1;
/* Add sequence no */
hdr.seqno = slave->seqno++;
/* Add OK */
hdr.ok = 0;
/* Add timestamp: 0 */
hdr.timestamp = 0L;
/* Set Event Type */
hdr.event_type = HEARTBEAT_EVENT;
/* Add master server id */
hdr.serverid = router->masterid;
/* Add event size */
hdr.event_size = len;
/* Add Next Pos */
hdr.next_pos = slave->binlog_pos;
/* Add flags */
hdr.flags = 0x20;
/* point just after the header */
ptr = blr_build_header(resp, &hdr);
/* Copy binlog name */
memcpy(ptr, slave->binlogfile, BINLOG_FNAMELEN);
ptr += strlen(slave->binlogfile);
/* Add the CRC32 */
if (!slave->nocrc)
{
chksum = crc32(0L, NULL, 0);
chksum = crc32(chksum, GWBUF_DATA(resp) + 5, hdr.event_size - 4);
encode_value(ptr, chksum, 32);
}
/* Write the packet */
return slave->dcb->func.write(slave->dcb, resp);
}