Added support for distribution of packets larger than 2^24 bytes
Moved the sending of the replication events to a different function and added support for events that span multiple MySQL packets.
This commit is contained in:
parent
12ee568978
commit
3e04a36ac3
@ -100,6 +100,7 @@ static void blr_log_identity(ROUTER_INSTANCE *router);
|
||||
static void blr_distribute_error_message(ROUTER_INSTANCE *router, char *message, char *state, unsigned int err_code);
|
||||
|
||||
int blr_write_data_into_binlog(ROUTER_INSTANCE *router, uint32_t data_len, uint8_t *buf);
|
||||
bool blr_send_event(ROUTER_SLAVE *slave, REP_HEADER *hdr, uint8_t *buf);
|
||||
|
||||
static int keepalive = 1;
|
||||
|
||||
@ -999,11 +1000,9 @@ uint32_t partialpos = 0;
|
||||
router->m_errmsg = NULL;
|
||||
|
||||
spinlock_release(&router->lock);
|
||||
#define SHOW_EVENTS
|
||||
#ifdef SHOW_EVENTS
|
||||
printf("blr: len %lu, event type 0x%02x, flags 0x%04x, event size %d, event timestamp %lu\n", (unsigned long)len-4, hdr.event_type, hdr.flags, hdr.event_size, (unsigned long)hdr.timestamp);
|
||||
#endif
|
||||
|
||||
/*
|
||||
* First check that the checksum we calculate matches the
|
||||
* checksum in the packet we received.
|
||||
@ -1778,20 +1777,13 @@ unsigned int cstate;
|
||||
if (router->send_slave_heartbeat)
|
||||
slave->lastReply = time(0);
|
||||
|
||||
pkt = gwbuf_alloc(hdr->event_size + 5);
|
||||
buf = GWBUF_DATA(pkt);
|
||||
encode_value(buf, hdr->event_size + 1, 24);
|
||||
buf += 3;
|
||||
*buf++ = slave->seqno++;
|
||||
*buf++ = 0; // OK
|
||||
memcpy(buf, ptr, hdr->event_size);
|
||||
if (hdr->event_type == ROTATE_EVENT)
|
||||
{
|
||||
blr_slave_rotate(router, slave, ptr);
|
||||
}
|
||||
slave->stats.n_bytes += gwbuf_length(pkt);
|
||||
slave->stats.n_events++;
|
||||
slave->dcb->func.write(slave->dcb, pkt);
|
||||
|
||||
blr_send_event(slave, hdr, ptr);
|
||||
|
||||
spinlock_acquire(&slave->catch_lock);
|
||||
if (hdr->event_type != ROTATE_EVENT)
|
||||
{
|
||||
@ -2335,3 +2327,130 @@ int n;
|
||||
router->last_written += data_len;
|
||||
return n;
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a replication event packet to a slave
|
||||
*
|
||||
* The first replication event packet contains one byte set to either
|
||||
* 0x0, 0xfe or 0xff which signals what the state of the replication stream is.
|
||||
* If the data pointed by @c buf is not the start of the replication header
|
||||
* and part of the replication event is already sent, @c first must be set to
|
||||
* false so that the first status byte is not sent again.
|
||||
*
|
||||
* @param slave Slave where the packet is sent to
|
||||
* @param buf Buffer containing the data
|
||||
* @param len Length of the data
|
||||
* @param first If this is the first packet of a multi-packet event
|
||||
* @return True on success, false when memory allocation fails
|
||||
*/
|
||||
bool blr_send_packet(ROUTER_SLAVE *slave, uint8_t *buf, uint32_t len, bool first)
|
||||
{
|
||||
bool rval = true;
|
||||
unsigned int datalen = len + (first ? 1 : 0);
|
||||
GWBUF *buffer = gwbuf_alloc(datalen + MYSQL_HEADER_LEN);
|
||||
if (buffer)
|
||||
{
|
||||
uint8_t *data = GWBUF_DATA(buffer);
|
||||
encode_value(data, datalen, 24);
|
||||
data += 3;
|
||||
*data++ = slave->seqno++;
|
||||
|
||||
if (first)
|
||||
{
|
||||
*data++ = 0; // OK byte
|
||||
}
|
||||
|
||||
if (len > 0)
|
||||
{
|
||||
memcpy(data, buf, len);
|
||||
}
|
||||
|
||||
slave->stats.n_bytes += GWBUF_LENGTH(buffer);
|
||||
slave->dcb->func.write(slave->dcb, buffer);
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("failed to allocate %ld bytes of memory when writing an"
|
||||
" event.", datalen + MYSQL_HEADER_LEN);
|
||||
rval = false;
|
||||
}
|
||||
return rval;
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a single replication event to a slave
|
||||
*
|
||||
* This sends the complete replication event to a slave. If the event size exceeds
|
||||
* the maximum size of a MySQL packet, it will be sent in multiple packets.
|
||||
*
|
||||
* @param slave Slave where the event is sent to
|
||||
* @param hdr Replication header
|
||||
* @param buf Pointer to the replication event as it was read from the disk
|
||||
* @return True on success, false if memory allocation failed
|
||||
*/
|
||||
bool blr_send_event(ROUTER_SLAVE *slave, REP_HEADER *hdr, uint8_t *buf)
|
||||
{
|
||||
bool rval = true;
|
||||
|
||||
/** Check if the event and the OK byte fit into a single packet */
|
||||
if (hdr->event_size + 1 < MYSQL_PACKET_LENGTH_MAX)
|
||||
{
|
||||
rval = blr_send_packet(slave, buf, hdr->event_size, true);
|
||||
}
|
||||
else
|
||||
{
|
||||
int64_t len = hdr->event_size + 1;
|
||||
|
||||
if (blr_send_packet(slave, buf, MYSQL_PACKET_LENGTH_MAX - 1, true))
|
||||
{
|
||||
len -= MYSQL_PACKET_LENGTH_MAX;
|
||||
buf += MYSQL_PACKET_LENGTH_MAX - 1;
|
||||
|
||||
/** Check for special case where length of the MySQL packet
|
||||
* is exactly 0x00ffffff bytes long. In this case we need to send
|
||||
* an empty packet to tell the slave that the whole event is sent. */
|
||||
if (len == 0)
|
||||
{
|
||||
blr_send_packet(slave, buf, 0, false);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
rval = false;
|
||||
}
|
||||
|
||||
while (rval && len > 0)
|
||||
{
|
||||
/** Write rest of the event data */
|
||||
uint64_t payload_len = MIN(MYSQL_PACKET_LENGTH_MAX, len);
|
||||
|
||||
if (blr_send_packet(slave, buf, payload_len, false))
|
||||
{
|
||||
/** The check for exactly 0x00ffffff bytes needs to be done
|
||||
* here as well */
|
||||
if (len == MYSQL_PACKET_LENGTH_MAX)
|
||||
{
|
||||
ss_dassert(len - MYSQL_PACKET_LENGTH_MAX == 0);
|
||||
blr_send_packet(slave, buf, 0, false);
|
||||
}
|
||||
|
||||
len -= payload_len;
|
||||
buf += payload_len;
|
||||
}
|
||||
else
|
||||
{
|
||||
rval = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
slave->stats.n_events++;
|
||||
|
||||
if (!rval)
|
||||
{
|
||||
MXS_ERROR("Failed to send an event of %u bytes to slave at %s:%d.",
|
||||
hdr->event_size, slave->dcb->remote,
|
||||
ntohs(slave->dcb->ipv4.sin_port));
|
||||
}
|
||||
return rval;
|
||||
}
|
||||
|
@ -152,6 +152,7 @@ static int blr_slave_handle_status_variables(ROUTER_INSTANCE *router, ROUTER_SLA
|
||||
static int blr_slave_send_columndef_with_status_schema(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, char *name, int type, int len, uint8_t seqno);
|
||||
static void blr_send_slave_heartbeat(void *inst);
|
||||
static int blr_slave_send_heartbeat(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
|
||||
bool blr_send_event(ROUTER_SLAVE *slave, REP_HEADER *hdr, uint8_t *buf);
|
||||
|
||||
void poll_fake_write_event(DCB *dcb);
|
||||
|
||||
@ -2026,60 +2027,6 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
|
||||
while (burst-- && burst_size > 0 &&
|
||||
(record = blr_read_binlog(router, file, slave->binlog_pos, &hdr, read_errmsg)) != NULL)
|
||||
{
|
||||
uint32_t totalsize = hdr.event_size + 1;
|
||||
|
||||
if (totalsize >= 0x00ffffff)
|
||||
{
|
||||
/** Write first header with the OK byte */
|
||||
GWBUF *head_part = gwbuf_alloc(5);
|
||||
ptr = GWBUF_DATA(head_part);
|
||||
encode_value(ptr, 0x00ffffff, 24);
|
||||
ptr += 3;
|
||||
*ptr++ = slave->seqno++;
|
||||
*ptr++ = 0;
|
||||
GWBUF* record_part = gwbuf_clone_portion(record, 0, 0x00ffffff - 1);
|
||||
head_part = gwbuf_append(head_part, record_part);
|
||||
record = gwbuf_consume(record, 0x00ffffff - 1);
|
||||
written = slave->dcb->func.write(slave->dcb, head_part);
|
||||
totalsize -= 0x00ffffff;
|
||||
|
||||
/** Write all packets that are larger than 0x00ffffff */
|
||||
while (totalsize >= 0x00ffffff)
|
||||
{
|
||||
head_part = gwbuf_alloc(4);
|
||||
ptr = GWBUF_DATA(head_part);
|
||||
encode_value(ptr, 0x00ffffff, 24);
|
||||
ptr += 3;
|
||||
*ptr = slave->seqno++;
|
||||
record_part = gwbuf_clone_portion(record, 0, 0x00ffffff);
|
||||
head_part = gwbuf_append(head_part, record_part);
|
||||
record = gwbuf_consume(record, 0x00ffffff);
|
||||
written += slave->dcb->func.write(slave->dcb, head_part);
|
||||
totalsize -= 0x00ffffff;
|
||||
}
|
||||
|
||||
/** Write last packet which is smaller than 0x00ffffff */
|
||||
head_part = gwbuf_alloc(4);
|
||||
ptr = GWBUF_DATA(head_part);
|
||||
encode_value(ptr, totalsize, 24);
|
||||
ptr += 3;
|
||||
*ptr = slave->seqno++;
|
||||
head_part = gwbuf_append(head_part, record);
|
||||
ss_dassert(totalsize - (gwbuf_length(head_part) - 4) == 0);
|
||||
written += slave->dcb->func.write(slave->dcb, head_part);
|
||||
}
|
||||
else
|
||||
{
|
||||
head = gwbuf_alloc(5);
|
||||
ptr = GWBUF_DATA(head);
|
||||
encode_value(ptr, hdr.event_size + 1, 24);
|
||||
ptr += 3;
|
||||
*ptr++ = slave->seqno++;
|
||||
*ptr++ = 0; // OK
|
||||
head = gwbuf_append(head, record);
|
||||
slave->lastEventTimestamp = hdr.timestamp;
|
||||
slave->lastEventReceived = hdr.event_type;
|
||||
|
||||
if (hdr.event_type == ROTATE_EVENT)
|
||||
{
|
||||
unsigned long beat1 = hkheartbeat;
|
||||
@ -2130,11 +2077,12 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
|
||||
MXS_ERROR("blr_open_binlog took %lu beats",
|
||||
hkheartbeat - beat1);
|
||||
}
|
||||
slave->stats.n_bytes += gwbuf_length(head);
|
||||
written = slave->dcb->func.write(slave->dcb, head);
|
||||
}
|
||||
|
||||
if (written && hdr.event_type != ROTATE_EVENT)
|
||||
blr_send_event(slave, &hdr, (uint8_t*) record->start);
|
||||
gwbuf_free(record);
|
||||
record = NULL;
|
||||
|
||||
if (hdr.event_type != ROTATE_EVENT)
|
||||
{
|
||||
slave->binlog_pos = hdr.next_pos;
|
||||
}
|
||||
@ -4720,4 +4668,3 @@ int filename_len = strlen(slave->binlogfile);
|
||||
/* Write the packet */
|
||||
return slave->dcb->func.write(slave->dcb, resp);
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user