diff --git a/server/modules/routing/binlog/blr_master.c b/server/modules/routing/binlog/blr_master.c index 94c9b9632..819b460c6 100644 --- a/server/modules/routing/binlog/blr_master.c +++ b/server/modules/routing/binlog/blr_master.c @@ -100,6 +100,7 @@ static void blr_log_identity(ROUTER_INSTANCE *router); static void blr_distribute_error_message(ROUTER_INSTANCE *router, char *message, char *state, unsigned int err_code); int blr_write_data_into_binlog(ROUTER_INSTANCE *router, uint32_t data_len, uint8_t *buf); +bool blr_send_event(ROUTER_SLAVE *slave, REP_HEADER *hdr, uint8_t *buf); static int keepalive = 1; @@ -999,11 +1000,9 @@ uint32_t partialpos = 0; router->m_errmsg = NULL; spinlock_release(&router->lock); -#define SHOW_EVENTS #ifdef SHOW_EVENTS printf("blr: len %lu, event type 0x%02x, flags 0x%04x, event size %d, event timestamp %lu\n", (unsigned long)len-4, hdr.event_type, hdr.flags, hdr.event_size, (unsigned long)hdr.timestamp); #endif - /* * First check that the checksum we calculate matches the * checksum in the packet we received. @@ -1778,20 +1777,13 @@ unsigned int cstate; if (router->send_slave_heartbeat) slave->lastReply = time(0); - pkt = gwbuf_alloc(hdr->event_size + 5); - buf = GWBUF_DATA(pkt); - encode_value(buf, hdr->event_size + 1, 24); - buf += 3; - *buf++ = slave->seqno++; - *buf++ = 0; // OK - memcpy(buf, ptr, hdr->event_size); if (hdr->event_type == ROTATE_EVENT) { blr_slave_rotate(router, slave, ptr); } - slave->stats.n_bytes += gwbuf_length(pkt); - slave->stats.n_events++; - slave->dcb->func.write(slave->dcb, pkt); + + blr_send_event(slave, hdr, ptr); + spinlock_acquire(&slave->catch_lock); if (hdr->event_type != ROTATE_EVENT) { @@ -2335,3 +2327,130 @@ int n; router->last_written += data_len; return n; } + +/** + * Send a replication event packet to a slave + * + * The first replication event packet contains one byte set to either + * 0x0, 0xfe or 0xff which signals what the state of the replication stream is. + * If the data pointed by @c buf is not the start of the replication header + * and part of the replication event is already sent, @c first must be set to + * false so that the first status byte is not sent again. + * + * @param slave Slave where the packet is sent to + * @param buf Buffer containing the data + * @param len Length of the data + * @param first If this is the first packet of a multi-packet event + * @return True on success, false when memory allocation fails + */ +bool blr_send_packet(ROUTER_SLAVE *slave, uint8_t *buf, uint32_t len, bool first) +{ + bool rval = true; + unsigned int datalen = len + (first ? 1 : 0); + GWBUF *buffer = gwbuf_alloc(datalen + MYSQL_HEADER_LEN); + if (buffer) + { + uint8_t *data = GWBUF_DATA(buffer); + encode_value(data, datalen, 24); + data += 3; + *data++ = slave->seqno++; + + if (first) + { + *data++ = 0; // OK byte + } + + if (len > 0) + { + memcpy(data, buf, len); + } + + slave->stats.n_bytes += GWBUF_LENGTH(buffer); + slave->dcb->func.write(slave->dcb, buffer); + } + else + { + MXS_ERROR("failed to allocate %ld bytes of memory when writing an" + " event.", datalen + MYSQL_HEADER_LEN); + rval = false; + } + return rval; +} + +/** + * Send a single replication event to a slave + * + * This sends the complete replication event to a slave. If the event size exceeds + * the maximum size of a MySQL packet, it will be sent in multiple packets. + * + * @param slave Slave where the event is sent to + * @param hdr Replication header + * @param buf Pointer to the replication event as it was read from the disk + * @return True on success, false if memory allocation failed + */ +bool blr_send_event(ROUTER_SLAVE *slave, REP_HEADER *hdr, uint8_t *buf) +{ + bool rval = true; + + /** Check if the event and the OK byte fit into a single packet */ + if (hdr->event_size + 1 < MYSQL_PACKET_LENGTH_MAX) + { + rval = blr_send_packet(slave, buf, hdr->event_size, true); + } + else + { + int64_t len = hdr->event_size + 1; + + if (blr_send_packet(slave, buf, MYSQL_PACKET_LENGTH_MAX - 1, true)) + { + len -= MYSQL_PACKET_LENGTH_MAX; + buf += MYSQL_PACKET_LENGTH_MAX - 1; + + /** Check for special case where length of the MySQL packet + * is exactly 0x00ffffff bytes long. In this case we need to send + * an empty packet to tell the slave that the whole event is sent. */ + if (len == 0) + { + blr_send_packet(slave, buf, 0, false); + } + } + else + { + rval = false; + } + + while (rval && len > 0) + { + /** Write rest of the event data */ + uint64_t payload_len = MIN(MYSQL_PACKET_LENGTH_MAX, len); + + if (blr_send_packet(slave, buf, payload_len, false)) + { + /** The check for exactly 0x00ffffff bytes needs to be done + * here as well */ + if (len == MYSQL_PACKET_LENGTH_MAX) + { + ss_dassert(len - MYSQL_PACKET_LENGTH_MAX == 0); + blr_send_packet(slave, buf, 0, false); + } + + len -= payload_len; + buf += payload_len; + } + else + { + rval = false; + } + } + } + + slave->stats.n_events++; + + if (!rval) + { + MXS_ERROR("Failed to send an event of %u bytes to slave at %s:%d.", + hdr->event_size, slave->dcb->remote, + ntohs(slave->dcb->ipv4.sin_port)); + } + return rval; +} diff --git a/server/modules/routing/binlog/blr_slave.c b/server/modules/routing/binlog/blr_slave.c index 3496cd92a..08a11793a 100644 --- a/server/modules/routing/binlog/blr_slave.c +++ b/server/modules/routing/binlog/blr_slave.c @@ -152,6 +152,7 @@ static int blr_slave_handle_status_variables(ROUTER_INSTANCE *router, ROUTER_SLA static int blr_slave_send_columndef_with_status_schema(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, char *name, int type, int len, uint8_t seqno); static void blr_send_slave_heartbeat(void *inst); static int blr_slave_send_heartbeat(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave); +bool blr_send_event(ROUTER_SLAVE *slave, REP_HEADER *hdr, uint8_t *buf); void poll_fake_write_event(DCB *dcb); @@ -2026,60 +2027,6 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1]; while (burst-- && burst_size > 0 && (record = blr_read_binlog(router, file, slave->binlog_pos, &hdr, read_errmsg)) != NULL) { - uint32_t totalsize = hdr.event_size + 1; - - if (totalsize >= 0x00ffffff) - { - /** Write first header with the OK byte */ - GWBUF *head_part = gwbuf_alloc(5); - ptr = GWBUF_DATA(head_part); - encode_value(ptr, 0x00ffffff, 24); - ptr += 3; - *ptr++ = slave->seqno++; - *ptr++ = 0; - GWBUF* record_part = gwbuf_clone_portion(record, 0, 0x00ffffff - 1); - head_part = gwbuf_append(head_part, record_part); - record = gwbuf_consume(record, 0x00ffffff - 1); - written = slave->dcb->func.write(slave->dcb, head_part); - totalsize -= 0x00ffffff; - - /** Write all packets that are larger than 0x00ffffff */ - while (totalsize >= 0x00ffffff) - { - head_part = gwbuf_alloc(4); - ptr = GWBUF_DATA(head_part); - encode_value(ptr, 0x00ffffff, 24); - ptr += 3; - *ptr = slave->seqno++; - record_part = gwbuf_clone_portion(record, 0, 0x00ffffff); - head_part = gwbuf_append(head_part, record_part); - record = gwbuf_consume(record, 0x00ffffff); - written += slave->dcb->func.write(slave->dcb, head_part); - totalsize -= 0x00ffffff; - } - - /** Write last packet which is smaller than 0x00ffffff */ - head_part = gwbuf_alloc(4); - ptr = GWBUF_DATA(head_part); - encode_value(ptr, totalsize, 24); - ptr += 3; - *ptr = slave->seqno++; - head_part = gwbuf_append(head_part, record); - ss_dassert(totalsize - (gwbuf_length(head_part) - 4) == 0); - written += slave->dcb->func.write(slave->dcb, head_part); - } - else - { - head = gwbuf_alloc(5); - ptr = GWBUF_DATA(head); - encode_value(ptr, hdr.event_size + 1, 24); - ptr += 3; - *ptr++ = slave->seqno++; - *ptr++ = 0; // OK - head = gwbuf_append(head, record); - slave->lastEventTimestamp = hdr.timestamp; - slave->lastEventReceived = hdr.event_type; - if (hdr.event_type == ROTATE_EVENT) { unsigned long beat1 = hkheartbeat; @@ -2130,11 +2077,12 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1]; MXS_ERROR("blr_open_binlog took %lu beats", hkheartbeat - beat1); } - slave->stats.n_bytes += gwbuf_length(head); - written = slave->dcb->func.write(slave->dcb, head); - } - if (written && hdr.event_type != ROTATE_EVENT) + blr_send_event(slave, &hdr, (uint8_t*) record->start); + gwbuf_free(record); + record = NULL; + + if (hdr.event_type != ROTATE_EVENT) { slave->binlog_pos = hdr.next_pos; } @@ -4720,4 +4668,3 @@ int filename_len = strlen(slave->binlogfile); /* Write the packet */ return slave->dcb->func.write(slave->dcb, resp); } -