Binlogserver sends GTID_LIST Fake Event when a MariaDB 10 slave connects with GTID
Now sending GTID_LIST Fake Event when a MariaDB10 Slave connects with GTID
This commit is contained in:
@ -270,6 +270,11 @@ static bool blr_slave_gtid_request(ROUTER_INSTANCE *router,
|
||||
bool req_file,
|
||||
unsigned long req_pos);
|
||||
|
||||
static int blr_send_fake_gtid_list(ROUTER_SLAVE *slave,
|
||||
const char *gtid,
|
||||
uint32_t serverid);
|
||||
static bool blr_parse_gtid(const char *gtid, MARIADB_GTID_ELEMS *info);
|
||||
|
||||
void poll_fake_write_event(DCB *dcb);
|
||||
|
||||
/**
|
||||
@ -2416,6 +2421,23 @@ blr_slave_binlog_dump(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue
|
||||
blr_slave_read_ste(router, slave, fde_end_pos);
|
||||
}
|
||||
|
||||
/* Add GTID_LIST Fake Event before sending any new event */
|
||||
if (slave->mariadb10_compat &&
|
||||
slave->mariadb_gtid)
|
||||
{
|
||||
if (!blr_send_fake_gtid_list(slave,
|
||||
slave->mariadb_gtid,
|
||||
router->masterid))
|
||||
{
|
||||
// ERROR
|
||||
slave->state = BLRS_ERRORED;
|
||||
dcb_close(slave->dcb);
|
||||
return 1;
|
||||
}
|
||||
slave->lastEventReceived = MARIADB10_GTID_GTID_LIST_EVENT;
|
||||
}
|
||||
|
||||
/* Set dcb_callback for the events reading routine */
|
||||
dcb_add_callback(slave->dcb, DCB_REASON_DRAINED, blr_slave_callback, slave);
|
||||
|
||||
slave->state = BLRS_DUMPING;
|
||||
@ -6935,3 +6957,173 @@ static bool blr_slave_gtid_request(ROUTER_INSTANCE *router,
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a Fake GTID_LIST event
|
||||
*
|
||||
* The routine creates a Fake GTID_LIST event
|
||||
*
|
||||
* @param slave The connected client
|
||||
* @param gtid The requested GTID from client
|
||||
* @param serverid The router server_id to add
|
||||
* in the replication event header
|
||||
* @return Fake GTID_LIST event on success or NULL
|
||||
*
|
||||
*/
|
||||
static GWBUF *blr_build_fake_gtid_list_event(ROUTER_SLAVE *slave,
|
||||
const char *gtid,
|
||||
uint32_t serverid)
|
||||
{
|
||||
int len;
|
||||
GWBUF *gl_event;
|
||||
uint8_t *ptr;
|
||||
REP_HEADER hdr;
|
||||
uint32_t chksum;
|
||||
MARIADB_GTID_ELEMS req_gtid = {};
|
||||
|
||||
if (!blr_parse_gtid(gtid, &req_gtid))
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/**
|
||||
* We only support one GTID in the GTID_LIST event
|
||||
*
|
||||
* Paylod is:
|
||||
* BINLOG_EVENT_HDR_LEN + 4 bytes GTID count + 1 GTID
|
||||
*/
|
||||
len = BINLOG_EVENT_HDR_LEN + 4 + 1 * (4 + 4 + 8);
|
||||
|
||||
/* Add CRC32 bytes if needed */
|
||||
len += slave->nocrc ? 0 : BINLOG_EVENT_CRC_SIZE;
|
||||
|
||||
/* Allocate space for packet header, status and data */
|
||||
if ((gl_event = gwbuf_alloc(MYSQL_HEADER_LEN + 1 + len)) == NULL)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
/* Add 1 byte to paylod for status indicator */
|
||||
hdr.payload_len = len + 1;
|
||||
|
||||
/* Add sequence and increment it */
|
||||
hdr.seqno = slave->seqno++;
|
||||
|
||||
/* Set status indicator byte to OK */
|
||||
hdr.ok = 0;
|
||||
|
||||
/* No timestamp is required */
|
||||
hdr.timestamp = 0L;
|
||||
|
||||
/* GTID Event Type */
|
||||
hdr.event_type = MARIADB10_GTID_GTID_LIST_EVENT;
|
||||
hdr.serverid = serverid;
|
||||
hdr.event_size = len;
|
||||
|
||||
/* Next pos is set */
|
||||
hdr.next_pos = slave->binlog_pos;
|
||||
|
||||
/* Artificial Event Flag */
|
||||
hdr.flags = 0x20;
|
||||
|
||||
/* Add replication hdr to resp */
|
||||
ptr = blr_build_header(gl_event, &hdr);
|
||||
|
||||
/* Add 4 bytes count */
|
||||
/* Note: We set only 1 GTID in GTID_LIST Event */
|
||||
encode_value(ptr, 1, 32);
|
||||
ptr += 4;
|
||||
|
||||
/* Add 4 bytes domain id */
|
||||
encode_value(ptr, req_gtid.domain_id, 32);
|
||||
ptr += 4;
|
||||
|
||||
/* Add 4 bytes server id*/
|
||||
encode_value(ptr, req_gtid.server_id, 32);
|
||||
ptr += 4;
|
||||
|
||||
/* Add 8 bytes sequence */
|
||||
encode_value(ptr, req_gtid.seq_no, 64);
|
||||
ptr += 8;
|
||||
|
||||
/* Now add the CRC to the fake binlog rotate event */
|
||||
if (!slave->nocrc)
|
||||
{
|
||||
/*
|
||||
* First checksum of an empty buffer
|
||||
* then the checksum of the event portion of the message:
|
||||
* we do not include the len, seq number and ok byte that are part of
|
||||
* first 5 bytes of the message.
|
||||
* We also do not include the 4 byte checksum itself.
|
||||
*/
|
||||
chksum = crc32(0L, NULL, 0);
|
||||
chksum = crc32(chksum,
|
||||
GWBUF_DATA(gl_event) + MYSQL_HEADER_LEN + 1,
|
||||
hdr.event_size - BINLOG_EVENT_CRC_SIZE);
|
||||
encode_value(ptr, chksum, 32);
|
||||
}
|
||||
|
||||
return gl_event;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create and send a Fake GTID_LIST event
|
||||
*
|
||||
* @param slave Current slave server
|
||||
* @param gtid The requested GTID from client
|
||||
* @oaram serverid The server_id to use in replication header
|
||||
* @return Non-zero if data has been sent
|
||||
*/
|
||||
static int blr_send_fake_gtid_list(ROUTER_SLAVE *slave,
|
||||
const char *gtid,
|
||||
uint32_t serverid)
|
||||
{
|
||||
/* Build Fake GTID_LIST Event */
|
||||
GWBUF *gl_event = blr_build_fake_gtid_list_event(slave,
|
||||
gtid,
|
||||
serverid);
|
||||
|
||||
/* Send Fake GTID_LIST Event or return 0*/
|
||||
return gl_event ? slave->dcb->func.write(slave->dcb, gl_event) : 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract the GTID the client requested
|
||||
*
|
||||
* @param gtid Then input GTID
|
||||
* @param info The GTID structure to fil
|
||||
* @return True for a parsed GTID string or false
|
||||
*/
|
||||
static bool blr_parse_gtid(const char *gtid, MARIADB_GTID_ELEMS *info)
|
||||
{
|
||||
const char *ptr = gtid;
|
||||
int read = 0;
|
||||
int len = strlen(gtid);
|
||||
|
||||
while (ptr < gtid + len)
|
||||
{
|
||||
if (!isdigit(*ptr))
|
||||
{
|
||||
ptr++;
|
||||
}
|
||||
else
|
||||
{
|
||||
char *end;
|
||||
switch (read)
|
||||
{
|
||||
case 0:
|
||||
info->domain_id = strtoul(ptr, &end, 10);
|
||||
break;
|
||||
case 1:
|
||||
info->server_id = strtoul(ptr, &end, 10);
|
||||
break;
|
||||
case 2:
|
||||
info->seq_no = strtoul(ptr, &end, 10);
|
||||
break;
|
||||
}
|
||||
read++;
|
||||
ptr = end;
|
||||
}
|
||||
}
|
||||
|
||||
return (info->server_id && info->seq_no) ? true : false;
|
||||
}
|
||||
|
Reference in New Issue
Block a user