Add ability to extract values from the result set

Extract and action the @master_binlog_checksum value

Store master's uuid for use in show slave status

Support for MariaDB 5.5 masters
This commit is contained in:
Mark Riddoch 2015-02-04 15:59:28 +00:00
parent 77af6b19d2
commit 07536611d3
4 changed files with 107 additions and 22 deletions

View File

@ -250,6 +250,8 @@ typedef struct router_instance {
char *user; /*< User name to use with master */
char *password; /*< Password to use with master */
char *fileroot; /*< Root of binlog filename */
bool master_chksum;/*< Does the master provide checksums */
char *master_uuid; /*< UUID of the master */
DCB *master; /*< DCB for master connection */
DCB *client; /*< DCB for dummy client */
SESSION *session; /*< Fake session for master connection */

View File

@ -181,6 +181,8 @@ unsigned char *defuuid;
spinlock_init(&inst->binlog_lock);
inst->binlog_fd = -1;
inst->master_chksum = true;
inst->master_uuid = NULL;
inst->low_water = DEF_LOW_WATER;
inst->high_water = DEF_HIGH_WATER;

View File

@ -79,6 +79,8 @@ void blr_extract_header(uint8_t *pkt, REP_HEADER *hdr);
inline uint32_t extract_field(uint8_t *src, int bits);
static void blr_log_packet(logfile_id_t file, char *msg, uint8_t *ptr, int len);
static void blr_master_close(ROUTER_INSTANCE *);
static char *blr_extract_column(GWBUF *buf, int col);
static int keepalive = 1;
/**
@ -384,6 +386,9 @@ char query[128];
router->retry_backoff = 1;
break;
case BLRM_SERVERID:
{
char *val = blr_extract_column(buf, 1);
// Response to fetch of master's server-id
if (router->saved_master.server_id)
GWBUF_CONSUME_ALL(router->saved_master.server_id);
@ -398,6 +403,7 @@ char query[128];
router->master_state = BLRM_HBPERIOD;
router->master->func.write(router->master, buf);
break;
}
case BLRM_HBPERIOD:
// Response to set the heartbeat period
if (router->saved_master.heartbeat)
@ -419,6 +425,15 @@ char query[128];
router->master->func.write(router->master, buf);
break;
case BLRM_CHKSUM2:
{
char *val = blr_extract_column(buf, 1);
if (val && strncasecmp(val, "NONE", 4) == 0)
{
router->master_chksum = false;
}
if (val)
free(val);
// Response to the master_binlog_checksum, should be stored
if (router->saved_master.chksum2)
GWBUF_CONSUME_ALL(router->saved_master.chksum2);
@ -428,6 +443,7 @@ char query[128];
router->master_state = BLRM_GTIDMODE;
router->master->func.write(router->master, buf);
break;
}
case BLRM_GTIDMODE:
// Response to the GTID_MODE, should be stored
if (router->saved_master.gtid_mode)
@ -439,6 +455,10 @@ char query[128];
router->master->func.write(router->master, buf);
break;
case BLRM_MUUID:
{
char *val = blr_extract_column(buf, 1);
router->master_uuid = val;
// Response to the SERVER_UUID, should be stored
if (router->saved_master.uuid)
GWBUF_CONSUME_ALL(router->saved_master.uuid);
@ -449,6 +469,7 @@ char query[128];
router->master_state = BLRM_SUUID;
router->master->func.write(router->master, buf);
break;
}
case BLRM_SUUID:
// Response to the SET @server_uuid, should be stored
if (router->saved_master.setslaveuuid)
@ -874,30 +895,33 @@ static REP_HEADER phdr;
* First check that the checksum we calculate matches the
* checksum in the packet we received.
*/
uint32_t chksum, pktsum;
chksum = crc32(0L, NULL, 0);
chksum = crc32(chksum, ptr + 5, hdr.event_size - 4);
pktsum = EXTRACT32(ptr + hdr.event_size + 1);
if (pktsum != chksum)
if (router->master_chksum)
{
router->stats.n_badcrc++;
if (msg)
uint32_t chksum, pktsum;
chksum = crc32(0L, NULL, 0);
chksum = crc32(chksum, ptr + 5, hdr.event_size - 4);
pktsum = EXTRACT32(ptr + hdr.event_size + 1);
if (pktsum != chksum)
{
free(msg);
msg = NULL;
router->stats.n_badcrc++;
if (msg)
{
free(msg);
msg = NULL;
}
LOGIF(LE,(skygw_log_write(LOGFILE_ERROR,
"%s: Checksum error in event "
"from master, "
"binlog %s @ %d. "
"Closing master connection.",
router->service->name,
router->binlog_name,
router->binlog_position)));
blr_master_close(router);
blr_master_delayed_connect(router);
return;
}
LOGIF(LE,(skygw_log_write(LOGFILE_ERROR,
"%s: Checksum error in event "
"from master, "
"binlog %s @ %d. "
"Closing master connection.",
router->service->name,
router->binlog_name,
router->binlog_position)));
blr_master_close(router);
blr_master_delayed_connect(router);
return;
}
router->stats.n_binlogs++;
router->lastEventReceived = hdr.event_type;
@ -1398,3 +1422,59 @@ blr_master_connected(ROUTER_INSTANCE *router)
{
return router->master_state == BLRM_BINLOGDUMP;
}
/**
* Extract a result value from the set of messages that make up a
* MySQL response packet.
*
* @param buf The GWBUF containing the response
* @param col The column number to return
* @return The result form the column or NULL. The caller must free the result
*/
static char *
blr_extract_column(GWBUF *buf, int col)
{
uint8_t *ptr;
int len, ncol, collen;
char *rval;
ptr = (uint8_t *)GWBUF_DATA(buf);
/* First packet should be the column count */
len = EXTRACT24(ptr);
ptr += 3;
if (*ptr != 1) // Check sequence number is 1
return NULL;
ptr++;
ncol = *ptr++;
if (ncol < col) // Not that many column in result
return NULL;
// Now ptr points at the column definition
while (ncol-- > 0)
{
len = EXTRACT24(ptr);
ptr += 4; // Skip to payload
ptr += len; // Skip over payload
}
// Now we should have an EOF packet
len = EXTRACT24(ptr);
ptr += 4; // Skip to payload
if (*ptr != 0xfe)
return NULL;
ptr += len;
// Finally we have reached the row
len = EXTRACT24(ptr);
ptr += 4;
while (--col > 0)
{
collen = *ptr++;
ptr += collen;
}
collen = *ptr++;
if ((rval = malloc(collen + 1)) == NULL)
return NULL;
memcpy(rval, ptr, collen);
rval[collen] = 0; // NULL terminate
return rval;
}

View File

@ -859,7 +859,8 @@ int len, actual_len, col_len, seqno, ncols, i;
*ptr++ = 0;
sprintf(column, "%s", router->uuid);
sprintf(column, "%s", router->master_uuid ?
router->master_uuid : router->uuid);
col_len = strlen(column);
*ptr++ = col_len; // Length of result string
strncpy((char *)ptr, column, col_len); // Result string