Fix for 16MB transmission in the slave part
Fix for 16MB transmission in the slave part
This commit is contained in:
committed by
Markus Makela
parent
4eccc1acb6
commit
ab1fb90d86
@ -2025,6 +2025,50 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
|
|||||||
|
|
||||||
while (burst-- && burst_size > 0 &&
|
while (burst-- && burst_size > 0 &&
|
||||||
(record = blr_read_binlog(router, file, slave->binlog_pos, &hdr, read_errmsg)) != NULL)
|
(record = blr_read_binlog(router, file, slave->binlog_pos, &hdr, read_errmsg)) != NULL)
|
||||||
|
{
|
||||||
|
uint32_t totalsize = hdr.event_size + 1;
|
||||||
|
|
||||||
|
if (totalsize >= 0x00ffffff)
|
||||||
|
{
|
||||||
|
/** Write first header with the OK byte */
|
||||||
|
GWBUF *head_part = gwbuf_alloc(5);
|
||||||
|
ptr = GWBUF_DATA(head_part);
|
||||||
|
encode_value(ptr, 0x00ffffff, 24);
|
||||||
|
ptr += 3;
|
||||||
|
*ptr++ = slave->seqno++;
|
||||||
|
*ptr++ = 0;
|
||||||
|
GWBUF* record_part = gwbuf_clone_portion(record, 0, 0x00ffffff - 1);
|
||||||
|
head_part = gwbuf_append(head_part, record_part);
|
||||||
|
record = gwbuf_consume(record, 0x00ffffff - 1);
|
||||||
|
written = slave->dcb->func.write(slave->dcb, head_part);
|
||||||
|
totalsize -= 0x00ffffff;
|
||||||
|
|
||||||
|
/** Write all packets that are larger than 0x00ffffff */
|
||||||
|
while (totalsize >= 0x00ffffff)
|
||||||
|
{
|
||||||
|
head_part = gwbuf_alloc(4);
|
||||||
|
ptr = GWBUF_DATA(head_part);
|
||||||
|
encode_value(ptr, 0x00ffffff, 24);
|
||||||
|
ptr += 3;
|
||||||
|
*ptr = slave->seqno++;
|
||||||
|
record_part = gwbuf_clone_portion(record, 0, 0x00ffffff);
|
||||||
|
head_part = gwbuf_append(head_part, record_part);
|
||||||
|
record = gwbuf_consume(record, 0x00ffffff);
|
||||||
|
written += slave->dcb->func.write(slave->dcb, head_part);
|
||||||
|
totalsize -= 0x00ffffff;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Write last packet which is smaller than 0x00ffffff */
|
||||||
|
head_part = gwbuf_alloc(4);
|
||||||
|
ptr = GWBUF_DATA(head_part);
|
||||||
|
encode_value(ptr, totalsize, 24);
|
||||||
|
ptr += 3;
|
||||||
|
*ptr = slave->seqno++;
|
||||||
|
head_part = gwbuf_append(head_part, record);
|
||||||
|
ss_dassert(totalsize - (gwbuf_length(head_part) - 4) == 0);
|
||||||
|
written += slave->dcb->func.write(slave->dcb, head_part);
|
||||||
|
}
|
||||||
|
else
|
||||||
{
|
{
|
||||||
head = gwbuf_alloc(5);
|
head = gwbuf_alloc(5);
|
||||||
ptr = GWBUF_DATA(head);
|
ptr = GWBUF_DATA(head);
|
||||||
@ -2088,11 +2132,12 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
|
|||||||
}
|
}
|
||||||
slave->stats.n_bytes += gwbuf_length(head);
|
slave->stats.n_bytes += gwbuf_length(head);
|
||||||
written = slave->dcb->func.write(slave->dcb, head);
|
written = slave->dcb->func.write(slave->dcb, head);
|
||||||
|
}
|
||||||
|
|
||||||
if (written && hdr.event_type != ROTATE_EVENT)
|
if (written && hdr.event_type != ROTATE_EVENT)
|
||||||
{
|
{
|
||||||
slave->binlog_pos = hdr.next_pos;
|
slave->binlog_pos = hdr.next_pos;
|
||||||
}
|
}
|
||||||
rval = written;
|
|
||||||
slave->stats.n_events++;
|
slave->stats.n_events++;
|
||||||
burst_size -= hdr.event_size;
|
burst_size -= hdr.event_size;
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user