Merge branch 'blr' into release-1.0GA

Conflicts:
	server/modules/protocol/mysql_backend.c
	server/modules/routing/binlog/blr_master.c
This commit is contained in:
Mark Riddoch
2014-12-05 11:42:02 +00:00
7 changed files with 483 additions and 247 deletions

View File

@ -428,6 +428,7 @@ ROUTER_SLAVE *slave;
slave->router = inst;
slave->file = NULL;
strcpy(slave->binlogfile, "unassigned");
slave->connect_time = time(0);
/**
* Add this session to the list of active sessions.
@ -521,9 +522,13 @@ ROUTER_SLAVE *slave = (ROUTER_SLAVE *)router_session;
{
/*
* We must be closing the master session.
*
* TODO: Handle closure of master session
*/
LOGIF(LM, (skygw_log_write_flush(
LOGFILE_MESSAGE,
"%s: Master %s disconnected after %ld seconds. "
"%d events read,",
router->service->name, router->master->remote,
time(0) - router->connect_time, router->stats.n_binlogs_ses)));
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Binlog router close session with master server %s",
@ -541,6 +546,15 @@ ROUTER_SLAVE *slave = (ROUTER_SLAVE *)router_session;
/* decrease server registered slaves counter */
atomic_add(&router->stats.n_registered, -1);
LOGIF(LM, (skygw_log_write_flush(
LOGFILE_MESSAGE,
"%s: Slave %s, server id %d, disconnected after %ld seconds. "
"%d events sent, %lu bytes.",
router->service->name, slave->dcb->remote,
slave->serverid,
time(0) - slave->connect_time, slave->stats.n_events,
slave->stats.n_bytes)));
/*
* Mark the slave as unregistered to prevent the forwarding
* of any more binlog records to this slave.
@ -755,19 +769,44 @@ struct tm tm;
min15 /= 15.0;
min10 /= 10.0;
min5 /= 5.0;
dcb_printf(dcb, "\t\tServer-id: %d\n", session->serverid);
dcb_printf(dcb,
"\t\tServer-id: %d\n",
session->serverid);
if (session->hostname)
dcb_printf(dcb, "\t\tHostname: %s\n", session->hostname);
dcb_printf(dcb, "\t\tSlave DCB: %p\n", session->dcb);
dcb_printf(dcb, "\t\tNext Sequence No: %d\n", session->seqno);
dcb_printf(dcb, "\t\tState: %s\n", blrs_states[session->state]);
dcb_printf(dcb, "\t\tBinlog file: %s\n", session->binlogfile);
dcb_printf(dcb, "\t\tBinlog position: %u\n", session->binlog_pos);
dcb_printf(dcb, "\t\tHostname: %s\n", session->hostname);
dcb_printf(dcb,
"\t\tSlave: %d\n",
session->dcb->remote);
dcb_printf(dcb,
"\t\tSlave DCB: %p\n",
session->dcb);
dcb_printf(dcb,
"\t\tNext Sequence No: %d\n",
session->seqno);
dcb_printf(dcb,
"\t\tState: %s\n",
blrs_states[session->state]);
dcb_printf(dcb,
"\t\tBinlog file: %s\n",
session->binlogfile);
dcb_printf(dcb,
"\t\tBinlog position: %u\n",
session->binlog_pos);
if (session->nocrc)
dcb_printf(dcb, "\t\tMaster Binlog CRC: None\n");
dcb_printf(dcb, "\t\tNo. requests: %u\n", session->stats.n_requests);
dcb_printf(dcb, "\t\tNo. events sent: %u\n", session->stats.n_events);
dcb_printf(dcb, "\t\tNo. bursts sent: %u\n", session->stats.n_bursts);
dcb_printf(dcb,
"\t\tMaster Binlog CRC: None\n");
dcb_printf(dcb,
"\t\tNo. requests: %u\n",
session->stats.n_requests);
dcb_printf(dcb,
"\t\tNo. events sent: %u\n",
session->stats.n_events);
dcb_printf(dcb,
"\t\tNo. bursts sent: %u\n",
session->stats.n_bursts);
dcb_printf(dcb,
"\t\tNo. transitions to follow mode: %u\n",
session->stats.n_bursts);
minno = session->stats.minno - 1;
if (minno == -1)
minno = 30;
@ -776,15 +815,18 @@ struct tm tm;
dcb_printf(dcb, "\t\t %6d %8.1f %8.1f %8.1f %8.1f\n",
session->stats.minavgs[minno], min5, min10,
min15, min30);
dcb_printf(dcb, "\t\tNo. flow control: %u\n", session->stats.n_flows);
dcb_printf(dcb, "\t\tNo. up to date: %u\n", session->stats.n_upd);
dcb_printf(dcb, "\t\tNo. of drained cbs %u\n", session->stats.n_dcb);
dcb_printf(dcb, "\t\tNo. of low water cbs N/A %u\n", session->stats.n_cbna);
dcb_printf(dcb, "\t\tNo. of failed reads %u\n", session->stats.n_failed_read);
dcb_printf(dcb, "\t\tNo. of nested distribute events %u\n", session->stats.n_overrun);
dcb_printf(dcb, "\t\tNo. of distribute action 1 %u\n", session->stats.n_actions[0]);
dcb_printf(dcb, "\t\tNo. of distribute action 2 %u\n", session->stats.n_actions[1]);
dcb_printf(dcb, "\t\tNo. of distribute action 3 %u\n", session->stats.n_actions[2]);
dcb_printf(dcb, "\t\tNo. flow control: %u\n", session->stats.n_flows);
dcb_printf(dcb, "\t\tNo. up to date: %u\n", session->stats.n_upd);
dcb_printf(dcb, "\t\tNo. of drained cbs %u\n", session->stats.n_dcb);
dcb_printf(dcb, "\t\tNo. of failed reads %u\n", session->stats.n_failed_read);
#if DETAILED_DIAG
dcb_printf(dcb, "\t\tNo. of nested distribute events %u\n", session->stats.n_overrun);
dcb_printf(dcb, "\t\tNo. of distribute action 1 %u\n", session->stats.n_actions[0]);
dcb_printf(dcb, "\t\tNo. of distribute action 2 %u\n", session->stats.n_actions[1]);
dcb_printf(dcb, "\t\tNo. of distribute action 3 %u\n", session->stats.n_actions[2]);
#endif
if ((session->cstate & CS_UPTODATE) == 0)
{
dcb_printf(dcb, "\t\tSlave is in catchup mode. %s%s\n",
@ -809,7 +851,7 @@ struct tm tm;
dcb_printf(dcb, "\tSpinlock statistics (rses_lock):\n");
spinlock_stats(&session->rses_lock, spin_reporter, dcb);
#endif
dcb_printf(dcb, "\n");
dcb_printf(dcb, "\t\t--------------------\n\n");
session = session->next;
}
spinlock_release(&router_inst->lock);
@ -838,6 +880,24 @@ ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance;
router->stats.lastReply = time(0);
}
static char *
extract_message(GWBUF *errpkt)
{
char *rval;
int len;
len = EXTRACT24(errpkt->start);
if ((rval = (char *)malloc(len)) == NULL)
return NULL;
memcpy(rval, (char *)(errpkt->start) + 7, 6);
rval[6] = ' ';
memcpy(&rval[7], (char *)(errpkt->start) + 13, len - 8);
rval[len-2] = 0;
return rval;
}
/**
* Error Reply routine
*
@ -857,10 +917,10 @@ errorReply(ROUTER *instance, void *router_session, GWBUF *message, DCB *backend_
{
ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance;
int error, len;
char msg[85];
char msg[85], *errmsg;
len = sizeof(error);
if (getsockopt(router->master->fd, SOL_SOCKET, SO_ERROR, &error, &len) != 0)
if (router->master && getsockopt(router->master->fd, SOL_SOCKET, SO_ERROR, &error, &len) == 0 && error != 0)
{
strerror_r(error, msg, 80);
strcat(msg, " ");
@ -868,10 +928,21 @@ char msg[85];
else
strcpy(msg, "");
errmsg = extract_message(message);
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR, "Master connection '%s', %sattempting reconnect to master",
message, msg)));
LOGFILE_ERROR, "%s: Master connection error '%s' in state '%s', "
"%sattempting reconnect to master",
router->service->name, errmsg,
blrm_states[router->master_state], msg)));
if (errmsg)
free(errmsg);
*succp = true;
LOGIF(LM, (skygw_log_write_flush(
LOGFILE_MESSAGE,
"%s: Master %s disconnected after %ld seconds. "
"%d events read.",
router->service->name, router->master->remote,
time(0) - router->connect_time, router->stats.n_binlogs_ses)));
blr_master_reconnect(router);
}

View File

@ -364,7 +364,7 @@ struct stat statb;
"Short read when reading the header. "
"Expected 19 bytes but got %d bytes. "
"Binlog file is %s, position %d",
file->binlogname, pos, n)));
n, file->binlogname, pos)));
break;
}
return NULL;
@ -375,6 +375,17 @@ struct stat statb;
hdr->event_size = extract_field(&hdbuf[9], 32);
hdr->next_pos = EXTRACT32(&hdbuf[13]);
hdr->flags = EXTRACT16(&hdbuf[17]);
if (hdr->event_type > MAX_EVENT_TYPE)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Invalid event type 0x%x. "
"Binlog file is %s, position %d",
hdr->event_type,
file->binlogname, pos)));
return NULL;
}
if (hdr->next_pos < pos && hdr->event_type != ROTATE_EVENT)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,

View File

@ -94,6 +94,17 @@ DCB *client;
GWBUF *buf;
router->stats.n_binlogs_ses = 0;
spinlock_acquire(&router->lock);
if (router->master_state != BLRM_UNCONNECTED)
{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"%s: Master Connect: Unexpected master state %s\n",
router->service->name, blrm_states[router->master_state])));
spinlock_release(&router->lock);
return;
}
router->master_state = BLRM_CONNECTING;
spinlock_release(&router->lock);
if ((client = dcb_alloc(DCB_ROLE_INTERNAL)) == NULL)
{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
@ -101,8 +112,8 @@ GWBUF *buf;
return;
}
router->client = client;
client->state = DCB_STATE_POLLING; /* Fake the client is reading */
client->data = CreateMySQLAuthData(router->user, router->password, "");
client->state = DCB_STATE_POLLING; // Lie to keep the protocol module happy
if ((router->session = session_alloc(router->service, client)) == NULL)
{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
@ -112,17 +123,27 @@ GWBUF *buf;
client->session = router->session;
if ((router->master = dcb_connect(router->service->databases, router->session, BLR_PROTOCOL)) == NULL)
{
char *name = malloc(strlen(router->service->name) + strlen(" Master") + 1);
sprintf(name, "%s Master", router->service->name);
hktask_oneshot(name, blr_start_master, router,
BLR_MASTER_BACKOFF_TIME * router->retry_backoff++);
char *name;
if ((name = malloc(strlen(router->service->name)
+ strlen(" Master") + 1)) != NULL)
{
sprintf(name, "%s Master", router->service->name);
hktask_oneshot(name, blr_start_master, router,
BLR_MASTER_BACKOFF_TIME * router->retry_backoff++);
}
if (router->retry_backoff > BLR_MAX_BACKOFF)
router->retry_backoff = 1;
router->retry_backoff = BLR_MAX_BACKOFF;
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"Binlog router: failed to connect to master server '%s'",
router->service->databases->unique_name)));
return;
}
router->master->remote = strdup(router->service->databases->name);
LOGIF(LM,(skygw_log_write(
LOGFILE_MESSAGE,
"%s: atempting to connect to master server %s.",
router->service->name, router->master->remote)));
router->connect_time = time(0);
if (setsockopt(router->master->fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive , sizeof(keepalive )))
perror("setsockopt");
@ -133,7 +154,6 @@ perror("setsockopt");
router->master_state = BLRM_TIMESTAMP;
router->stats.n_masterstarts++;
router->retry_backoff = 1;
}
/**
@ -164,7 +184,27 @@ GWBUF *ptr;
router->reconnect_pending = 0;
router->active_logs = 0;
spinlock_release(&router->lock);
blr_start_master(router);
if (router->master_state < BLRM_BINLOGDUMP)
{
char *name;
router->master_state = BLRM_UNCONNECTED;
if ((name = malloc(strlen(router->service->name)
+ strlen(" Master")+1)) != NULL);
{
sprintf(name, "%s Master", router->service->name);
hktask_oneshot(name, blr_start_master, router,
BLR_MASTER_BACKOFF_TIME * router->retry_backoff++);
}
if (router->retry_backoff > BLR_MAX_BACKOFF)
router->retry_backoff = BLR_MAX_BACKOFF;
}
else
{
router->master_state = BLRM_UNCONNECTED;
blr_start_master(router);
}
}
/**
@ -229,8 +269,9 @@ char query[128];
if (router->master_state < 0 || router->master_state > BLRM_MAXSTATE)
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR, "Invalid master state machine state (%d) for binlog router.",
router->master_state)));
LOGFILE_ERROR,
"Invalid master state machine state (%d) for binlog router.",
router->master_state)));
gwbuf_consume(buf, gwbuf_length(buf));
spinlock_acquire(&router->lock);
if (router->reconnect_pending)
@ -238,6 +279,12 @@ char query[128];
router->active_logs = 0;
spinlock_release(&router->lock);
atomic_add(&router->handling_threads, -1);
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"%s: Pending reconnect in state %s.",
router->service->name,
blrm_states[router->master_state]
)));
blr_restart_master(router);
return;
}
@ -251,8 +298,11 @@ char query[128];
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"Received error: %d, %s from master during %s phase of the master state machine.",
MYSQL_ERROR_CODE(buf), MYSQL_ERROR_MSG(buf), blrm_states[router->master_state]
"%s: Received error: %d, %s from master during %s phase "
"of the master state machine.",
router->service->name,
MYSQL_ERROR_CODE(buf), MYSQL_ERROR_MSG(buf),
blrm_states[router->master_state]
)));
gwbuf_consume(buf, gwbuf_length(buf));
spinlock_acquire(&router->lock);
@ -276,6 +326,7 @@ char query[128];
buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_ID'");
router->master_state = BLRM_SERVERID;
router->master->func.write(router->master, buf);
router->retry_backoff = 1;
break;
case BLRM_SERVERID:
// Response to fetch of master's server-id
@ -365,6 +416,12 @@ char query[128];
buf = blr_make_binlog_dump(router);
router->master_state = BLRM_BINLOGDUMP;
router->master->func.write(router->master, buf);
LOGIF(LM,(skygw_log_write(
LOGFILE_MESSAGE,
"%s: Request binlog records from %s at "
"position %d from master server %s.",
router->service->name, router->binlog_name,
router->binlog_position, router->master->remote)));
break;
case BLRM_BINLOGDUMP:
// Main body, we have received a binlog record from the master
@ -626,134 +683,172 @@ static REP_HEADER phdr;
n_bufs = 1;
}
blr_extract_header(ptr, &hdr);
if (hdr.event_size != len - 5)
if (len < BINLOG_EVENT_HDR_LEN)
{
LOGIF(LE,(skygw_log_write(
LOGFILE_ERROR,
"Packet length is %d, but event size is %d, "
"binlog file %s position %d"
"reslen is %d and preslen is %d, "
"length of previous event %d. %s",
len, hdr.event_size,
router->binlog_name,
router->binlog_position,
reslen, preslen, prev_length,
(prev_length == -1 ?
(no_residual ? "No residual data from previous call" : "Residual data from previous call") : "")
)));
blr_log_packet(LOGFILE_ERROR, "Packet:", ptr, len);
LOGIF(LE,(skygw_log_write(
LOGFILE_ERROR,
"This event (0x%x) was contained in %d GWBUFs, "
"the previous events was contained in %d GWBUFs",
router->lastEventReceived, n_bufs, pn_bufs)));
if (msg)
char *msg = "";
if (ptr[4] == 0xfe) /* EOF Packet */
{
free(msg);
msg = NULL;
msg = "end of file";
}
break;
else if (ptr[4] == 0xff) /* EOF Packet */
{
msg = "error";
}
LOGIF(LM,(skygw_log_write(
LOGFILE_MESSAGE,
"Non-event message (%s) from master.",
msg)));
}
phdr = hdr;
if (hdr.ok == 0)
else
{
router->stats.n_binlogs++;
router->stats.n_binlogs_ses++;
router->lastEventReceived = hdr.event_type;
blr_extract_header(ptr, &hdr);
if (hdr.event_size != len - 5)
{
LOGIF(LE,(skygw_log_write(
LOGFILE_ERROR,
"Packet length is %d, but event size is %d, "
"binlog file %s position %d "
"reslen is %d and preslen is %d, "
"length of previous event %d. %s",
len, hdr.event_size,
router->binlog_name,
router->binlog_position,
reslen, preslen, prev_length,
(prev_length == -1 ?
(no_residual ? "No residual data from previous call" : "Residual data from previous call") : "")
)));
blr_log_packet(LOGFILE_ERROR, "Packet:", ptr, len);
LOGIF(LE,(skygw_log_write(
LOGFILE_ERROR,
"This event (0x%x) was contained in %d GWBUFs, "
"the previous events was contained in %d GWBUFs",
router->lastEventReceived, n_bufs, pn_bufs)));
if (msg)
{
free(msg);
msg = NULL;
}
break;
}
phdr = hdr;
if (hdr.ok == 0)
{
router->stats.n_binlogs++;
router->lastEventReceived = hdr.event_type;
// #define SHOW_EVENTS
#ifdef SHOW_EVENTS
printf("blr: event type 0x%02x, flags 0x%04x, event size %d", hdr.event_type, hdr.flags, hdr.event_size);
printf("blr: event type 0x%02x, flags 0x%04x, event size %d", hdr.event_type, hdr.flags, hdr.event_size);
#endif
if (hdr.event_type >= 0 && hdr.event_type < 0x24)
router->stats.events[hdr.event_type]++;
if (hdr.event_type == FORMAT_DESCRIPTION_EVENT && hdr.next_pos == 0)
{
// Fake format description message
LOGIF(LD,(skygw_log_write(LOGFILE_DEBUG,
"Replication fake event. "
"Binlog %s @ %d.",
router->binlog_name,
router->binlog_position)));
router->stats.n_fakeevents++;
if (hdr.event_type == FORMAT_DESCRIPTION_EVENT)
if (hdr.event_type >= 0 && hdr.event_type < 0x24)
router->stats.events[hdr.event_type]++;
if (hdr.event_type == FORMAT_DESCRIPTION_EVENT && hdr.next_pos == 0)
{
/*
* We need to save this to replay to new
* slaves that attach later.
*/
if (router->saved_master.fde_event)
free(router->saved_master.fde_event);
router->saved_master.fde_len = hdr.event_size;
router->saved_master.fde_event = malloc(hdr.event_size);
if (router->saved_master.fde_event)
memcpy(router->saved_master.fde_event,
ptr + 5, hdr.event_size);
// Fake format description message
LOGIF(LD,(skygw_log_write(LOGFILE_DEBUG,
"Replication fake event. "
"Binlog %s @ %d.",
router->binlog_name,
router->binlog_position)));
router->stats.n_fakeevents++;
if (hdr.event_type == FORMAT_DESCRIPTION_EVENT)
{
uint8_t *new_fde;
unsigned int new_fde_len;
/*
* We need to save this to replay to new
* slaves that attach later.
*/
new_fde_len = hdr.event_size;
new_fde = malloc(hdr.event_size);
if (new_fde)
{
memcpy(new_fde, ptr + 5, hdr.event_size);
if (router->saved_master.fde_event)
free(router->saved_master.fde_event);
router->saved_master.fde_event = new_fde;
router->saved_master.fde_len = new_fde_len;
}
else
{
LOGIF(LE,(skygw_log_write(LOGFILE_ERROR,
"%s: Received a format description "
"event that MaxScale was unable to "
"record. Event length is %d.",
router->service->name,
hdr.event_size)));
blr_log_packet(LOGFILE_ERROR,
"Format Description Event:", ptr, len);
}
}
}
else
{
if (hdr.event_type == HEARTBEAT_EVENT)
{
#ifdef SHOW_EVENTS
printf("Replication heartbeat\n");
#endif
LOGIF(LD,(skygw_log_write(
LOGFILE_DEBUG,
"Replication heartbeat. "
"Binlog %s @ %d.",
router->binlog_name,
router->binlog_position)));
router->stats.n_heartbeats++;
}
else if (hdr.flags != LOG_EVENT_ARTIFICIAL_F)
{
ptr = ptr + 5; // We don't put the first byte of the payload
// into the binlog file
if (hdr.event_type == ROTATE_EVENT)
router->rotating = 1;
blr_write_binlog_record(router, &hdr, ptr);
if (hdr.event_type == ROTATE_EVENT)
{
blr_rotate_event(router, ptr, &hdr);
}
blr_distribute_binlog_record(router, &hdr, ptr);
}
else
{
router->stats.n_artificial++;
LOGIF(LD,(skygw_log_write(
LOGFILE_DEBUG,
"Artificial event not written "
"to disk or distributed. "
"Type 0x%x, Length %d, Binlog "
"%s @ %d.",
hdr.event_type,
hdr.event_size,
router->binlog_name,
router->binlog_position)));
ptr += 5;
if (hdr.event_type == ROTATE_EVENT)
{
router->rotating = 1;
blr_rotate_event(router, ptr, &hdr);
}
}
}
}
else
{
if (hdr.event_type == HEARTBEAT_EVENT)
{
#ifdef SHOW_EVENTS
printf("Replication heartbeat\n");
#endif
LOGIF(LD,(skygw_log_write(
LOGFILE_DEBUG,
"Replication heartbeat. "
"Binlog %s @ %d.",
router->binlog_name,
router->binlog_position)));
router->stats.n_heartbeats++;
}
else if (hdr.flags != LOG_EVENT_ARTIFICIAL_F)
{
ptr = ptr + 5; // We don't put the first byte of the payload
// into the binlog file
if (hdr.event_type == ROTATE_EVENT)
router->rotating = 1;
blr_write_binlog_record(router, &hdr, ptr);
if (hdr.event_type == ROTATE_EVENT)
{
blr_rotate_event(router, ptr, &hdr);
}
blr_distribute_binlog_record(router, &hdr, ptr);
}
else
{
router->stats.n_artificial++;
LOGIF(LD,(skygw_log_write(
LOGFILE_DEBUG,
"Artificial event not written "
"to disk or distributed. "
"Type 0x%x, Length %d, Binlog "
"%s @ %d.",
hdr.event_type,
hdr.event_size,
router->binlog_name,
router->binlog_position)));
ptr += 5;
if (hdr.event_type == ROTATE_EVENT)
{
router->rotating = 1;
blr_rotate_event(router, ptr, &hdr);
}
}
LOGIF(LE,(skygw_log_write(LOGFILE_ERROR,
"Error packet in binlog stream.%s @ %d.",
router->binlog_name,
router->binlog_position)));
blr_log_packet(LOGFILE_ERROR, "Error Packet:",
ptr, len);
router->stats.n_binlog_errors++;
}
}
else
{
printf("Binlog router error: %s\n", &ptr[7]);
LOGIF(LE,(skygw_log_write(LOGFILE_ERROR,
"Error packet in binlog stream.%s @ %d.",
router->binlog_name,
router->binlog_position)));
blr_log_packet(LOGFILE_ERROR, "Error Packet:",
ptr, len);
router->stats.n_binlog_errors++;
}
if (msg)
{
@ -977,6 +1072,7 @@ int action;
{
blr_slave_rotate(slave, ptr);
}
slave->stats.n_bytes += gwbuf_length(pkt);
slave->dcb->func.write(slave->dcb, pkt);
if (hdr->event_type != ROTATE_EVENT)
{

View File

@ -65,6 +65,7 @@ int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large);
uint8_t *blr_build_header(GWBUF *pkt, REP_HEADER *hdr);
int blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data);
static int blr_slave_fake_rotate(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
static void blr_slave_send_fde(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
extern int lm_enabled_logfiles_bitmask;
extern size_t log_ses_count[];
@ -546,29 +547,8 @@ uint32_t chksum;
rval = slave->dcb->func.write(slave->dcb, resp);
/* Send the FORMAT_DESCRIPTION_EVENT */
if (router->saved_master.fde_event)
{
resp = gwbuf_alloc(router->saved_master.fde_len + 5);
ptr = GWBUF_DATA(resp);
encode_value(ptr, router->saved_master.fde_len + 1, 24); // Payload length
ptr += 3;
*ptr++ = slave->seqno++;
*ptr++ = 0; // OK
memcpy(ptr, router->saved_master.fde_event, router->saved_master.fde_len);
encode_value(ptr, time(0), 32); // Overwrite timestamp
/*
* Since we have changed the timestamp we must recalculate the CRC
*
* Position ptr to the start of the event header,
* calculate a new checksum
* and write it into the header
*/
ptr = GWBUF_DATA(resp) + 5 + router->saved_master.fde_len - 4;
chksum = crc32(0L, NULL, 0);
chksum = crc32(chksum, GWBUF_DATA(resp) + 5, router->saved_master.fde_len - 4);
encode_value(ptr, chksum, 32);
rval = slave->dcb->func.write(slave->dcb, resp);
}
if (slave->binlog_pos != 4)
blr_slave_send_fde(router, slave);
slave->dcb->low_water = router->low_water;
slave->dcb->high_water = router->high_water;
@ -785,6 +765,7 @@ if (hkheartbeat - beat1 > 1) LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR, "blr_open_binlog took %d beats",
hkheartbeat - beat1)));
}
slave->stats.n_bytes += gwbuf_length(head);
written = slave->dcb->func.write(slave->dcb, head);
if (written && hdr.event_type != ROTATE_EVENT)
{
@ -842,11 +823,23 @@ if (hkheartbeat - beat1 > 1) LOGIF(LE, (skygw_log_write(
if (state_change)
{
LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE,
"%s: Slave %s is up to date %s, %u.",
slave->stats.n_caughtup++;
if (slave->stats.n_caughtup == 1)
{
LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE,
"%s: Slave %s is up to date %s, %u.",
router->service->name,
slave->dcb->remote,
slave->binlogfile, slave->binlog_pos)));
}
else if ((slave->stats.n_caughtup % 50) == 0)
{
LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE,
"%s: Slave %s is up to date %s, %u.",
router->service->name,
slave->dcb->remote,
slave->binlogfile, slave->binlog_pos)));
}
}
}
else
@ -1034,3 +1027,51 @@ uint32_t chksum;
slave->dcb->func.write(slave->dcb, resp);
return 1;
}
/**
* Send a "fake" format description event to the newly connected slave
*
* @param router The router instance
* @param slave The slave to send the event to
*/
static void
blr_slave_send_fde(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
{
BLFILE *file;
REP_HEADER hdr;
GWBUF *record, *head;
uint8_t *ptr;
uint32_t chksum;
if ((file = blr_open_binlog(router, slave->binlogfile)) == NULL)
return;
if ((record = blr_read_binlog(router, file, 4, &hdr)) == NULL)
{
blr_close_binlog(router, file);
return;
}
blr_close_binlog(router, file);
head = gwbuf_alloc(5);
ptr = GWBUF_DATA(head);
encode_value(ptr, hdr.event_size + 1, 24); // Payload length
ptr += 3;
*ptr++ = slave->seqno++;
*ptr++ = 0; // OK
head = gwbuf_append(head, record);
ptr = GWBUF_DATA(record);
encode_value(ptr, time(0), 32); // Overwrite timestamp
ptr += 13;
encode_value(ptr, 0, 32); // Set next position to 0
/*
* Since we have changed the timestamp we must recalculate the CRC
*
* Position ptr to the start of the event header,
* calculate a new checksum
* and write it into the header
*/
ptr = GWBUF_DATA(record) + hdr.event_size - 4;
chksum = crc32(0L, NULL, 0);
chksum = crc32(chksum, GWBUF_DATA(record), hdr.event_size - 4);
encode_value(ptr, chksum, 32);
slave->dcb->func.write(slave->dcb, head);
}