Merge branch 'develop' into MAX-324
This commit is contained in:
@ -181,6 +181,8 @@ unsigned char *defuuid;
|
||||
spinlock_init(&inst->binlog_lock);
|
||||
|
||||
inst->binlog_fd = -1;
|
||||
inst->master_chksum = true;
|
||||
inst->master_uuid = NULL;
|
||||
|
||||
inst->low_water = DEF_LOW_WATER;
|
||||
inst->high_water = DEF_HIGH_WATER;
|
||||
|
||||
@ -62,7 +62,7 @@ static void blr_log_header(logfile_id_t file, char *msg, uint8_t *ptr);
|
||||
|
||||
/**
|
||||
* Initialise the binlog file for this instance. MaxScale will look
|
||||
* for all the binlogs that it has on local disk, determien the next
|
||||
* for all the binlogs that it has on local disk, determine the next
|
||||
* binlog to use and initialise it for writing, determining the
|
||||
* next record to be fetched from the real master.
|
||||
*
|
||||
@ -354,6 +354,10 @@ int n;
|
||||
unsigned long filelen = 0;
|
||||
struct stat statb;
|
||||
|
||||
if (!file)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
if (fstat(file->fd, &statb) == 0)
|
||||
filelen = statb.st_size;
|
||||
if (pos >= filelen)
|
||||
@ -695,3 +699,26 @@ GWBUF *buf;
|
||||
close(fd);
|
||||
return buf;
|
||||
}
|
||||
|
||||
/**
|
||||
* Does the next binlog file in the sequence for the slave exist.
|
||||
*
|
||||
* @param router The router instance
|
||||
* @param slave The slave in question
|
||||
* @retuen 0 if the next file does not exist
|
||||
*/
|
||||
int
|
||||
blr_file_next_exists(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
|
||||
{
|
||||
char *sptr, buf[80], bigbuf[4096];
|
||||
int filenum;
|
||||
|
||||
if ((sptr = strrchr(slave->binlogfile, '.')) == NULL)
|
||||
return 0;
|
||||
filenum = atoi(sptr + 1);
|
||||
sprintf(buf, BINLOG_NAMEFMT, router->fileroot, filenum + 1);
|
||||
sprintf(bigbuf, "%s/%s", router->binlogdir, buf);
|
||||
if (access(bigbuf, R_OK) == -1)
|
||||
return 0;
|
||||
return 1;
|
||||
}
|
||||
|
||||
@ -79,6 +79,8 @@ void blr_extract_header(uint8_t *pkt, REP_HEADER *hdr);
|
||||
inline uint32_t extract_field(uint8_t *src, int bits);
|
||||
static void blr_log_packet(logfile_id_t file, char *msg, uint8_t *ptr, int len);
|
||||
static void blr_master_close(ROUTER_INSTANCE *);
|
||||
static char *blr_extract_column(GWBUF *buf, int col);
|
||||
|
||||
static int keepalive = 1;
|
||||
|
||||
/**
|
||||
@ -384,6 +386,9 @@ char query[128];
|
||||
router->retry_backoff = 1;
|
||||
break;
|
||||
case BLRM_SERVERID:
|
||||
{
|
||||
char *val = blr_extract_column(buf, 1);
|
||||
|
||||
// Response to fetch of master's server-id
|
||||
if (router->saved_master.server_id)
|
||||
GWBUF_CONSUME_ALL(router->saved_master.server_id);
|
||||
@ -398,6 +403,7 @@ char query[128];
|
||||
router->master_state = BLRM_HBPERIOD;
|
||||
router->master->func.write(router->master, buf);
|
||||
break;
|
||||
}
|
||||
case BLRM_HBPERIOD:
|
||||
// Response to set the heartbeat period
|
||||
if (router->saved_master.heartbeat)
|
||||
@ -419,6 +425,15 @@ char query[128];
|
||||
router->master->func.write(router->master, buf);
|
||||
break;
|
||||
case BLRM_CHKSUM2:
|
||||
{
|
||||
char *val = blr_extract_column(buf, 1);
|
||||
|
||||
if (val && strncasecmp(val, "NONE", 4) == 0)
|
||||
{
|
||||
router->master_chksum = false;
|
||||
}
|
||||
if (val)
|
||||
free(val);
|
||||
// Response to the master_binlog_checksum, should be stored
|
||||
if (router->saved_master.chksum2)
|
||||
GWBUF_CONSUME_ALL(router->saved_master.chksum2);
|
||||
@ -428,6 +443,7 @@ char query[128];
|
||||
router->master_state = BLRM_GTIDMODE;
|
||||
router->master->func.write(router->master, buf);
|
||||
break;
|
||||
}
|
||||
case BLRM_GTIDMODE:
|
||||
// Response to the GTID_MODE, should be stored
|
||||
if (router->saved_master.gtid_mode)
|
||||
@ -439,6 +455,10 @@ char query[128];
|
||||
router->master->func.write(router->master, buf);
|
||||
break;
|
||||
case BLRM_MUUID:
|
||||
{
|
||||
char *val = blr_extract_column(buf, 1);
|
||||
router->master_uuid = val;
|
||||
|
||||
// Response to the SERVER_UUID, should be stored
|
||||
if (router->saved_master.uuid)
|
||||
GWBUF_CONSUME_ALL(router->saved_master.uuid);
|
||||
@ -449,6 +469,7 @@ char query[128];
|
||||
router->master_state = BLRM_SUUID;
|
||||
router->master->func.write(router->master, buf);
|
||||
break;
|
||||
}
|
||||
case BLRM_SUUID:
|
||||
// Response to the SET @server_uuid, should be stored
|
||||
if (router->saved_master.setslaveuuid)
|
||||
@ -874,30 +895,33 @@ static REP_HEADER phdr;
|
||||
* First check that the checksum we calculate matches the
|
||||
* checksum in the packet we received.
|
||||
*/
|
||||
uint32_t chksum, pktsum;
|
||||
|
||||
chksum = crc32(0L, NULL, 0);
|
||||
chksum = crc32(chksum, ptr + 5, hdr.event_size - 4);
|
||||
pktsum = EXTRACT32(ptr + hdr.event_size + 1);
|
||||
if (pktsum != chksum)
|
||||
if (router->master_chksum)
|
||||
{
|
||||
router->stats.n_badcrc++;
|
||||
if (msg)
|
||||
uint32_t chksum, pktsum;
|
||||
|
||||
chksum = crc32(0L, NULL, 0);
|
||||
chksum = crc32(chksum, ptr + 5, hdr.event_size - 4);
|
||||
pktsum = EXTRACT32(ptr + hdr.event_size + 1);
|
||||
if (pktsum != chksum)
|
||||
{
|
||||
free(msg);
|
||||
msg = NULL;
|
||||
router->stats.n_badcrc++;
|
||||
if (msg)
|
||||
{
|
||||
free(msg);
|
||||
msg = NULL;
|
||||
}
|
||||
LOGIF(LE,(skygw_log_write(LOGFILE_ERROR,
|
||||
"%s: Checksum error in event "
|
||||
"from master, "
|
||||
"binlog %s @ %d. "
|
||||
"Closing master connection.",
|
||||
router->service->name,
|
||||
router->binlog_name,
|
||||
router->binlog_position)));
|
||||
blr_master_close(router);
|
||||
blr_master_delayed_connect(router);
|
||||
return;
|
||||
}
|
||||
LOGIF(LE,(skygw_log_write(LOGFILE_ERROR,
|
||||
"%s: Checksum error in event "
|
||||
"from master, "
|
||||
"binlog %s @ %d. "
|
||||
"Closing master connection.",
|
||||
router->service->name,
|
||||
router->binlog_name,
|
||||
router->binlog_position)));
|
||||
blr_master_close(router);
|
||||
blr_master_delayed_connect(router);
|
||||
return;
|
||||
}
|
||||
router->stats.n_binlogs++;
|
||||
router->lastEventReceived = hdr.event_type;
|
||||
@ -1145,6 +1169,8 @@ char file[BINLOG_FNAMELEN+1];
|
||||
pos <<= 32;
|
||||
pos |= extract_field(ptr, 32);
|
||||
slen = len - (8 + 4); // Allow for position and CRC
|
||||
if (router->master_chksum == 0)
|
||||
slen += 4;
|
||||
if (slen > BINLOG_FNAMELEN)
|
||||
slen = BINLOG_FNAMELEN;
|
||||
memcpy(file, ptr + 8, slen);
|
||||
@ -1274,7 +1300,7 @@ int action;
|
||||
memcpy(buf, ptr, hdr->event_size);
|
||||
if (hdr->event_type == ROTATE_EVENT)
|
||||
{
|
||||
blr_slave_rotate(slave, ptr);
|
||||
blr_slave_rotate(router, slave, ptr);
|
||||
}
|
||||
slave->stats.n_bytes += gwbuf_length(pkt);
|
||||
slave->dcb->func.write(slave->dcb, pkt);
|
||||
@ -1398,3 +1424,59 @@ blr_master_connected(ROUTER_INSTANCE *router)
|
||||
{
|
||||
return router->master_state == BLRM_BINLOGDUMP;
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract a result value from the set of messages that make up a
|
||||
* MySQL response packet.
|
||||
*
|
||||
* @param buf The GWBUF containing the response
|
||||
* @param col The column number to return
|
||||
* @return The result form the column or NULL. The caller must free the result
|
||||
*/
|
||||
static char *
|
||||
blr_extract_column(GWBUF *buf, int col)
|
||||
{
|
||||
uint8_t *ptr;
|
||||
int len, ncol, collen;
|
||||
char *rval;
|
||||
|
||||
ptr = (uint8_t *)GWBUF_DATA(buf);
|
||||
/* First packet should be the column count */
|
||||
len = EXTRACT24(ptr);
|
||||
ptr += 3;
|
||||
if (*ptr != 1) // Check sequence number is 1
|
||||
return NULL;
|
||||
ptr++;
|
||||
ncol = *ptr++;
|
||||
if (ncol < col) // Not that many column in result
|
||||
return NULL;
|
||||
// Now ptr points at the column definition
|
||||
while (ncol-- > 0)
|
||||
{
|
||||
len = EXTRACT24(ptr);
|
||||
ptr += 4; // Skip to payload
|
||||
ptr += len; // Skip over payload
|
||||
}
|
||||
// Now we should have an EOF packet
|
||||
len = EXTRACT24(ptr);
|
||||
ptr += 4; // Skip to payload
|
||||
if (*ptr != 0xfe)
|
||||
return NULL;
|
||||
ptr += len;
|
||||
|
||||
// Finally we have reached the row
|
||||
len = EXTRACT24(ptr);
|
||||
ptr += 4;
|
||||
while (--col > 0)
|
||||
{
|
||||
collen = *ptr++;
|
||||
ptr += collen;
|
||||
}
|
||||
collen = *ptr++;
|
||||
if ((rval = malloc(collen + 1)) == NULL)
|
||||
return NULL;
|
||||
memcpy(rval, ptr, collen);
|
||||
rval[collen] = 0; // NULL terminate
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
@ -614,7 +614,7 @@ int len, file_len;
|
||||
|
||||
sprintf(file, "%s", router->binlog_name);
|
||||
file_len = strlen(file);
|
||||
sprintf(position, "%d", router->binlog_position);
|
||||
sprintf(position, "%ld", router->binlog_position);
|
||||
len = 5 + file_len + strlen(position) + 1 + 3;
|
||||
if ((pkt = gwbuf_alloc(len)) == NULL)
|
||||
return 0;
|
||||
@ -859,7 +859,8 @@ int len, actual_len, col_len, seqno, ncols, i;
|
||||
|
||||
*ptr++ = 0;
|
||||
|
||||
sprintf(column, "%s", router->uuid);
|
||||
sprintf(column, "%s", router->master_uuid ?
|
||||
router->master_uuid : router->uuid);
|
||||
col_len = strlen(column);
|
||||
*ptr++ = col_len; // Length of result string
|
||||
strncpy((char *)ptr, column, col_len); // Result string
|
||||
@ -1078,6 +1079,15 @@ uint32_t chksum;
|
||||
ptr = GWBUF_DATA(queue);
|
||||
len = extract_field(ptr, 24);
|
||||
binlognamelen = len - 11;
|
||||
if (binlognamelen > BINLOG_FNAMELEN)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write(
|
||||
LOGFILE_ERROR,
|
||||
"blr_slave_binlog_dump truncating binlog filename "
|
||||
"from %d to %d",
|
||||
binlognamelen, BINLOG_FNAMELEN)));
|
||||
binlognamelen = BINLOG_FNAMELEN;
|
||||
}
|
||||
ptr += 4; // Skip length and sequence number
|
||||
if (*ptr++ != COM_BINLOG_DUMP)
|
||||
{
|
||||
@ -1097,6 +1107,13 @@ uint32_t chksum;
|
||||
strncpy(slave->binlogfile, (char *)ptr, binlognamelen);
|
||||
slave->binlogfile[binlognamelen] = 0;
|
||||
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%s: COM_BINLOG_DUMP: binlog name '%s', length %d, "
|
||||
"from position %d.", router->service->name,
|
||||
slave->binlogfile, binlognamelen,
|
||||
slave->binlog_pos)));
|
||||
|
||||
slave->seqno = 1;
|
||||
|
||||
|
||||
@ -1333,7 +1350,7 @@ unsigned long beat1 = hkheartbeat;
|
||||
if (hkheartbeat - beat1 > 1) LOGIF(LE, (skygw_log_write(
|
||||
LOGFILE_ERROR, "blr_close_binlog took %d beats",
|
||||
hkheartbeat - beat1)));
|
||||
blr_slave_rotate(slave, GWBUF_DATA(record));
|
||||
blr_slave_rotate(router, slave, GWBUF_DATA(record));
|
||||
beat1 = hkheartbeat;
|
||||
if ((slave->file = blr_open_binlog(router, slave->binlogfile)) == NULL)
|
||||
{
|
||||
@ -1440,7 +1457,8 @@ if (hkheartbeat - beat1 > 1) LOGIF(LE, (skygw_log_write(
|
||||
if (slave->binlog_pos >= blr_file_size(slave->file)
|
||||
&& router->rotating == 0
|
||||
&& strcmp(router->binlog_name, slave->binlogfile) != 0
|
||||
&& blr_master_connected(router))
|
||||
&& (blr_master_connected(router)
|
||||
|| blr_file_next_exists(router, slave)))
|
||||
{
|
||||
/* We may have reached the end of file of a non-current
|
||||
* binlog file.
|
||||
@ -1470,7 +1488,7 @@ if (hkheartbeat - beat1 > 1) LOGIF(LE, (skygw_log_write(
|
||||
dcb_close(slave->dcb);
|
||||
}
|
||||
}
|
||||
else
|
||||
else if (blr_master_connected(router))
|
||||
{
|
||||
spinlock_acquire(&slave->catch_lock);
|
||||
slave->cstate |= CS_EXPECTCB;
|
||||
@ -1537,11 +1555,13 @@ ROUTER_INSTANCE *router = slave->router;
|
||||
* @param ptr The rotate event (minus header and OK byte)
|
||||
*/
|
||||
void
|
||||
blr_slave_rotate(ROUTER_SLAVE *slave, uint8_t *ptr)
|
||||
blr_slave_rotate(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, uint8_t *ptr)
|
||||
{
|
||||
int len = EXTRACT24(ptr + 9); // Extract the event length
|
||||
|
||||
len = len - (19 + 8 + 4); // Remove length of header, checksum and position
|
||||
len = len - (19 + 8); // Remove length of header and position
|
||||
if (router->master_chksum)
|
||||
len -= 4;
|
||||
if (len > BINLOG_FNAMELEN)
|
||||
len = BINLOG_FNAMELEN;
|
||||
ptr += 19; // Skip header
|
||||
|
||||
@ -325,7 +325,7 @@ static int hashkeyfun(
|
||||
while((c = *ptr++)){
|
||||
hash = c + (hash << 6) + (hash << 16) - hash;
|
||||
}
|
||||
return *(int *)key;
|
||||
return hash;
|
||||
}
|
||||
|
||||
static int hashcmpfun(
|
||||
|
||||
Reference in New Issue
Block a user