MXS-1075: MariaDB 10 slave connecting with GTID
A MariaDB 10 slave connects to binlog server with GTID request
This commit is contained in:
@ -2204,31 +2204,186 @@ blr_slave_binlog_dump(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue
|
||||
REP_HEADER hdr;
|
||||
uint32_t chksum;
|
||||
uint32_t fde_end_pos;
|
||||
uint32_t requested_pos;
|
||||
|
||||
ptr = GWBUF_DATA(queue);
|
||||
len = extract_field(ptr, 24);
|
||||
binlognamelen = len - 11;
|
||||
if (binlognamelen > BINLOG_FNAMELEN)
|
||||
{
|
||||
MXS_ERROR("blr_slave_binlog_dump truncating binlog filename "
|
||||
"from %d to %d",
|
||||
binlognamelen, BINLOG_FNAMELEN);
|
||||
binlognamelen = BINLOG_FNAMELEN;
|
||||
}
|
||||
|
||||
ptr += 4; // Skip length and sequence number
|
||||
if (*ptr++ != COM_BINLOG_DUMP)
|
||||
{
|
||||
MXS_ERROR("blr_slave_binlog_dump expected a COM_BINLOG_DUMP but received %d",
|
||||
*(ptr - 1));
|
||||
return 0;
|
||||
return 1;
|
||||
}
|
||||
|
||||
slave->binlog_pos = extract_field(ptr, 32);
|
||||
/* Get the current router binlog file */
|
||||
spinlock_acquire(&router->binlog_lock);
|
||||
strcpy(slave->binlogfile, router->binlog_name);
|
||||
spinlock_release(&router->binlog_lock);
|
||||
|
||||
/* Set the safe pos */
|
||||
slave->binlog_pos = 4;
|
||||
|
||||
/* Get the requested pos from packet */
|
||||
requested_pos = extract_field(ptr, 32);
|
||||
|
||||
/* Go ahead: after 4 bytes pos, 2 bytes flag and 4 bytes serverid */
|
||||
ptr += 4;
|
||||
ptr += 2;
|
||||
ptr += 4;
|
||||
memcpy(slave->binlogfile, (char *)ptr, binlognamelen);
|
||||
slave->binlogfile[binlognamelen] = 0;
|
||||
|
||||
/* ptr now points to requested filename, if present */
|
||||
if (binlognamelen)
|
||||
{
|
||||
if (binlognamelen > BINLOG_FNAMELEN)
|
||||
{
|
||||
char req_file[binlognamelen + 1];
|
||||
char errmsg[BINLOG_ERROR_MSG_LEN + 1];
|
||||
memcpy(req_file, (char *)ptr, binlognamelen);
|
||||
|
||||
MXS_ERROR("Slave %lu requests COM_BINLOG_DUMP with a filename %s"
|
||||
" longer than max %d chars. Aborting.",
|
||||
(unsigned long)slave->serverid,
|
||||
req_file,
|
||||
BINLOG_FNAMELEN);
|
||||
snprintf(errmsg, BINLOG_ERROR_MSG_LEN,
|
||||
"Connecting slave requested binlog"
|
||||
" file name %s longer than max %d chars.",
|
||||
req_file,
|
||||
BINLOG_FNAMELEN);
|
||||
|
||||
errmsg[BINLOG_ERROR_MSG_LEN] = '\0';
|
||||
|
||||
blr_send_custom_error(slave->dcb,
|
||||
slave->seqno + 1,
|
||||
0,
|
||||
errmsg,
|
||||
"HY000",
|
||||
BINLOG_FATAL_ERROR_READING);
|
||||
slave->state = BLRS_ERRORED;
|
||||
dcb_close(slave->dcb);
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* Set the received filename */
|
||||
memcpy(slave->binlogfile, (char *)ptr, binlognamelen);
|
||||
slave->binlogfile[binlognamelen] = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check MariaDB GTID request
|
||||
*/
|
||||
if (slave->mariadb10_compat &&
|
||||
slave->mariadb_gtid)
|
||||
{
|
||||
MXS_INFO("Slave %lu is registering with MariaDB GTID '%s'",
|
||||
(unsigned long)slave->serverid,
|
||||
slave->mariadb_gtid);
|
||||
|
||||
if (!slave->mariadb_gtid[0])
|
||||
{
|
||||
/* Note: file is the router current file and pos is 4 */
|
||||
MXS_INFO("Slave %lu is registering with empty GTID:"
|
||||
" sending events from current binlog file %s, pos %lu",
|
||||
(unsigned long)slave->serverid,
|
||||
slave->binlogfile,
|
||||
(unsigned long)slave->binlog_pos);
|
||||
}
|
||||
else
|
||||
{
|
||||
/* TODO */
|
||||
/* Shall we avoid the lookup if file & pos is set? */
|
||||
|
||||
/* Fetch the GTID from the storage */
|
||||
MARIADB_GTID_INFO *f_gtid = hashtable_fetch(router->gtid_repo,
|
||||
slave->mariadb_gtid);
|
||||
/* Not Found */
|
||||
if (!f_gtid)
|
||||
{
|
||||
MXS_WARNING("Requested MariaDB GTID '%s' by server %lu"
|
||||
" has not been found",
|
||||
slave->mariadb_gtid,
|
||||
(unsigned long)slave->serverid);
|
||||
|
||||
/* Check strict mode */
|
||||
if (slave->gtid_strict_mode)
|
||||
{
|
||||
strcpy(slave->binlogfile, "");
|
||||
slave->binlog_pos = 0;
|
||||
blr_send_custom_error(slave->dcb,
|
||||
slave->seqno + 1,
|
||||
0,
|
||||
"connecting slave requested to start"
|
||||
" from non existent GTID.",
|
||||
"HY000",
|
||||
BINLOG_FATAL_ERROR_READING);
|
||||
slave->state = BLRS_ERRORED;
|
||||
dcb_close(slave->dcb);
|
||||
return 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
/**
|
||||
* Right now: use current router binlog file pos 4
|
||||
*/
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/* A Gtid has been found */
|
||||
MXS_INFO("Found GTID '%s' for slave %lu at %s:%lu",
|
||||
slave->mariadb_gtid,
|
||||
(unsigned long)slave->serverid,
|
||||
f_gtid->file,
|
||||
f_gtid->end);
|
||||
|
||||
/**
|
||||
* Check whether GTID request has file & pos:
|
||||
*/
|
||||
if (!binlognamelen)
|
||||
{
|
||||
/**
|
||||
* No filename: use GTID file & pos
|
||||
*/
|
||||
|
||||
/* Set binlog file to the GTID one */
|
||||
strcpy(slave->binlogfile, f_gtid->file);
|
||||
|
||||
/**
|
||||
* Set position to GTID event pos:
|
||||
* i.e the next pos of COMMIT)
|
||||
*/
|
||||
slave->binlog_pos = f_gtid->end;
|
||||
}
|
||||
else
|
||||
{
|
||||
/**
|
||||
* The slave has requested a GTID while
|
||||
* requesting a file & pos which can be different
|
||||
* from file @ pos of that GTID:
|
||||
*
|
||||
* i.e: Log file is different (after rotate)
|
||||
* or other events (if any) could have been
|
||||
* written to binlog after GTID.
|
||||
*
|
||||
* Binlog file is the requested one, so just set
|
||||
* the pos to the requested one
|
||||
*/
|
||||
slave->binlog_pos = requested_pos;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/**
|
||||
* Binlog file has been set above.
|
||||
* Just set the received pos
|
||||
*/
|
||||
slave->binlog_pos = requested_pos;
|
||||
}
|
||||
|
||||
if (router->trx_safe)
|
||||
{
|
||||
@ -2272,6 +2427,8 @@ blr_slave_binlog_dump(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue
|
||||
}
|
||||
}
|
||||
|
||||
binlognamelen = strlen(slave->binlogfile);
|
||||
|
||||
MXS_DEBUG("%s: COM_BINLOG_DUMP: binlog name '%s', length %d, "
|
||||
"from position %lu.", router->service->name,
|
||||
slave->binlogfile, binlognamelen,
|
||||
@ -2401,7 +2558,6 @@ encode_value(unsigned char *data, unsigned int value, int len)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Populate a header structure for a replication message from a GWBUF structure.
|
||||
*
|
||||
@ -2547,7 +2703,12 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
|
||||
snprintf(err_msg, BINLOG_ERROR_MSG_LEN, "Failed to open binlog '%s'", slave->binlogfile);
|
||||
|
||||
/* Send error that stops slave replication */
|
||||
blr_send_custom_error(slave->dcb, slave->seqno++, 0, err_msg, "HY000", 1236);
|
||||
blr_send_custom_error(slave->dcb,
|
||||
slave->seqno++,
|
||||
0,
|
||||
err_msg,
|
||||
"HY000",
|
||||
BINLOG_FATAL_ERROR_READING);
|
||||
|
||||
dcb_close(slave->dcb);
|
||||
return 0;
|
||||
@ -2672,7 +2833,12 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
|
||||
"Failed to open binlog '%s' in rotate event", slave->binlogfile);
|
||||
|
||||
/* Send error that stops slave replication */
|
||||
blr_send_custom_error(slave->dcb, (slave->seqno - 1), 0, err_msg, "HY000", 1236);
|
||||
blr_send_custom_error(slave->dcb,
|
||||
(slave->seqno - 1),
|
||||
0,
|
||||
err_msg,
|
||||
"HY000",
|
||||
BINLOG_FATAL_ERROR_READING);
|
||||
|
||||
dcb_close(slave->dcb);
|
||||
break;
|
||||
@ -2783,7 +2949,12 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
|
||||
/*
|
||||
* Send an error that will stop slave replication
|
||||
*/
|
||||
blr_send_custom_error(slave->dcb, slave->seqno++, 0, read_errmsg, "HY000", 1236);
|
||||
blr_send_custom_error(slave->dcb,
|
||||
slave->seqno++,
|
||||
0,
|
||||
read_errmsg,
|
||||
"HY000",
|
||||
BINLOG_FATAL_ERROR_READING);
|
||||
|
||||
dcb_close(slave->dcb);
|
||||
#ifndef BLFILE_IN_SLAVE
|
||||
|
Reference in New Issue
Block a user