Merge branch 'blr' into develop

This commit is contained in:
Mark Riddoch
2015-02-03 17:19:06 +00:00
10 changed files with 667 additions and 53 deletions

View File

@ -48,6 +48,7 @@
#include <dcb.h>
#include <spinlock.h>
#include <housekeeper.h>
#include <buffer.h>
#include <sys/types.h>
#include <sys/socket.h>
@ -104,6 +105,15 @@ GWBUF *buf;
return;
}
router->master_state = BLRM_CONNECTING;
/* Discard the queued residual data */
buf = router->residual;
while (buf)
{
buf = gwbuf_consume(buf, GWBUF_LENGTH(buf));
}
router->residual = NULL;
spinlock_release(&router->lock);
if ((client = dcb_alloc(DCB_ROLE_INTERNAL)) == NULL)
{
@ -141,7 +151,7 @@ GWBUF *buf;
router->master->remote = strdup(router->service->dbref->server->name);
LOGIF(LM,(skygw_log_write(
LOGFILE_MESSAGE,
"%s: atempting to connect to master server %s.",
"%s: attempting to connect to master server %s.",
router->service->name, router->master->remote)));
router->connect_time = time(0);
@ -361,7 +371,10 @@ char query[128];
break;
case BLRM_SERVERID:
// Response to fetch of master's server-id
if (router->saved_master.server_id)
GWBUF_CONSUME_ALL(router->saved_master.server_id);
router->saved_master.server_id = buf;
blr_cache_response(router, "serverid", buf);
// TODO: Extract the value of server-id and place in router->master_id
{
char str[80];
@ -373,35 +386,50 @@ char query[128];
break;
case BLRM_HBPERIOD:
// Response to set the heartbeat period
if (router->saved_master.heartbeat)
GWBUF_CONSUME_ALL(router->saved_master.heartbeat);
router->saved_master.heartbeat = buf;
blr_cache_response(router, "heartbeat", buf);
buf = blr_make_query("SET @master_binlog_checksum = @@global.binlog_checksum");
router->master_state = BLRM_CHKSUM1;
router->master->func.write(router->master, buf);
break;
case BLRM_CHKSUM1:
// Response to set the master binlog checksum
if (router->saved_master.chksum1)
GWBUF_CONSUME_ALL(router->saved_master.chksum1);
router->saved_master.chksum1 = buf;
blr_cache_response(router, "chksum1", buf);
buf = blr_make_query("SELECT @master_binlog_checksum");
router->master_state = BLRM_CHKSUM2;
router->master->func.write(router->master, buf);
break;
case BLRM_CHKSUM2:
// Response to the master_binlog_checksum, should be stored
if (router->saved_master.chksum2)
GWBUF_CONSUME_ALL(router->saved_master.chksum2);
router->saved_master.chksum2 = buf;
blr_cache_response(router, "chksum2", buf);
buf = blr_make_query("SELECT @@GLOBAL.GTID_MODE");
router->master_state = BLRM_GTIDMODE;
router->master->func.write(router->master, buf);
break;
case BLRM_GTIDMODE:
// Response to the GTID_MODE, should be stored
if (router->saved_master.gtid_mode)
GWBUF_CONSUME_ALL(router->saved_master.gtid_mode);
router->saved_master.gtid_mode = buf;
blr_cache_response(router, "gtidmode", buf);
buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_UUID'");
router->master_state = BLRM_MUUID;
router->master->func.write(router->master, buf);
break;
case BLRM_MUUID:
// Response to the SERVER_UUID, should be stored
if (router->saved_master.uuid)
GWBUF_CONSUME_ALL(router->saved_master.uuid);
router->saved_master.uuid = buf;
blr_cache_response(router, "uuid", buf);
sprintf(query, "SET @slave_uuid='%s'", router->uuid);
buf = blr_make_query(query);
router->master_state = BLRM_SUUID;
@ -409,49 +437,80 @@ char query[128];
break;
case BLRM_SUUID:
// Response to the SET @server_uuid, should be stored
if (router->saved_master.setslaveuuid)
GWBUF_CONSUME_ALL(router->saved_master.setslaveuuid);
router->saved_master.setslaveuuid = buf;
blr_cache_response(router, "ssuuid", buf);
buf = blr_make_query("SET NAMES latin1");
router->master_state = BLRM_LATIN1;
router->master->func.write(router->master, buf);
break;
case BLRM_LATIN1:
// Response to the SET NAMES latin1, should be stored
if (router->saved_master.setnames)
GWBUF_CONSUME_ALL(router->saved_master.setnames);
router->saved_master.setnames = buf;
blr_cache_response(router, "setnames", buf);
buf = blr_make_query("SET NAMES utf8");
router->master_state = BLRM_UTF8;
router->master->func.write(router->master, buf);
break;
case BLRM_UTF8:
// Response to the SET NAMES utf8, should be stored
if (router->saved_master.utf8)
GWBUF_CONSUME_ALL(router->saved_master.utf8);
router->saved_master.utf8 = buf;
blr_cache_response(router, "utf8", buf);
buf = blr_make_query("SELECT 1");
router->master_state = BLRM_SELECT1;
router->master->func.write(router->master, buf);
break;
case BLRM_SELECT1:
// Response to the SELECT 1, should be stored
if (router->saved_master.select1)
GWBUF_CONSUME_ALL(router->saved_master.select1);
router->saved_master.select1 = buf;
blr_cache_response(router, "select1", buf);
buf = blr_make_query("SELECT VERSION();");
router->master_state = BLRM_SELECTVER;
router->master->func.write(router->master, buf);
break;
case BLRM_SELECTVER:
// Response to SELECT VERSION should be stored
if (router->saved_master.selectver)
GWBUF_CONSUME_ALL(router->saved_master.selectver);
router->saved_master.selectver = buf;
blr_cache_response(router, "selectver", buf);
buf = blr_make_query("SELECT @@version_comment limit 1;");
router->master_state = BLRM_SELECTVERCOM;
router->master->func.write(router->master, buf);
break;
case BLRM_SELECTVERCOM:
// Response to SELECT @@version_comment should be stored
if (router->saved_master.selectvercom)
GWBUF_CONSUME_ALL(router->saved_master.selectvercom);
router->saved_master.selectvercom = buf;
blr_cache_response(router, "selectvercom", buf);
buf = blr_make_query("SELECT @@hostname;");
router->master_state = BLRM_SELECTHOSTNAME;
router->master->func.write(router->master, buf);
break;
case BLRM_SELECTHOSTNAME:
// Response to SELECT @@hostname should be stored
if (router->saved_master.selecthostname)
GWBUF_CONSUME_ALL(router->saved_master.selecthostname);
router->saved_master.selecthostname = buf;
blr_cache_response(router, "selecthostname", buf);
buf = blr_make_query("SELECT @@max_allowed_packet;");
router->master_state = BLRM_MAP;
router->master->func.write(router->master, buf);
break;
case BLRM_MAP:
// Response to SELECT @@max_allowed_packet should be stored
if (router->saved_master.map)
GWBUF_CONSUME_ALL(router->saved_master.map);
router->saved_master.map = buf;
blr_cache_response(router, "map", buf);
buf = blr_make_registration(router);
router->master_state = BLRM_REGISTER;
router->master->func.write(router->master, buf);
@ -622,6 +681,11 @@ static REP_HEADER phdr;
}
pkt_length = gwbuf_length(pkt);
/*
* Loop over all the packets while we still have some data
* and the packet length is enough to hold a replication event
* header.
*/
while (pkt && pkt_length > 24)
{
reslen = GWBUF_LENGTH(pkt);
@ -649,6 +713,7 @@ static REP_HEADER phdr;
{
len = EXTRACT24(pdata) + 4;
}
/* len is now the payload length for the packet we are working on */
if (reslen < len && pkt_length >= len)
{
@ -728,10 +793,17 @@ static REP_HEADER phdr;
n_bufs = 1;
}
/*
* ptr now points at the current message in a contiguous buffer,
* this buffer is either within the GWBUF or in a malloc'd
* copy if the message straddles GWBUF's.
*/
if (len < BINLOG_EVENT_HDR_LEN)
{
char *msg = "";
/* Packet is too small to be a binlog event */
if (ptr[4] == 0xfe) /* EOF Packet */
{
msg = "end of file";
@ -753,7 +825,7 @@ static REP_HEADER phdr;
blr_extract_header(ptr, &hdr);
if (hdr.event_size != len - 5)
if (hdr.event_size != len - 5) /* Sanity check */
{
LOGIF(LE,(skygw_log_write(
LOGFILE_ERROR,
@ -784,6 +856,35 @@ static REP_HEADER phdr;
phdr = hdr;
if (hdr.ok == 0)
{
/*
* First check that the checksum we calculate matches the
* checksum in the packet we received.
*/
uint32_t chksum, pktsum;
chksum = crc32(0L, NULL, 0);
chksum = crc32(chksum, ptr + 5, hdr.event_size - 4);
pktsum = EXTRACT32(ptr + hdr.event_size + 1);
if (pktsum != chksum)
{
router->stats.n_badcrc++;
if (msg)
{
free(msg);
msg = NULL;
}
LOGIF(LE,(skygw_log_write(LOGFILE_ERROR,
"%s: Checksum error in event "
"from master, "
"binlog %s @ %d. "
"Closing master connection.",
router->service->name,
router->binlog_name,
router->binlog_position)));
blr_master_close(router);
blr_master_delayed_connect(router);
return;
}
router->stats.n_binlogs++;
router->lastEventReceived = hdr.event_type;