diff --git a/server/modules/routing/binlogrouter/blr.h b/server/modules/routing/binlogrouter/blr.h
index 7e21c2c2c..4b0f294b2 100644
--- a/server/modules/routing/binlogrouter/blr.h
+++ b/server/modules/routing/binlogrouter/blr.h
@@ -494,7 +494,7 @@ typedef struct router_instance
     int pending_transaction; /*< Pending transaction */
     enum blr_event_state master_event_state; /*< Packet read state */
     REP_HEADER stored_header; /*< Relication header of the event the master is sending */
-    GWBUF *stored_event; /*< Partial even buffer */
+    GWBUF *stored_event; /*< Buffer where partial events are stored */
     uint64_t last_safe_pos; /* last committed transaction */
     char binlog_name[BINLOG_FNAMELEN + 1];
     /*< Name of the current binlog file */
diff --git a/server/modules/routing/binlogrouter/blr_master.c b/server/modules/routing/binlogrouter/blr_master.c
index c5464af80..f6f3e52a4 100644
--- a/server/modules/routing/binlogrouter/blr_master.c
+++ b/server/modules/routing/binlogrouter/blr_master.c
@@ -996,6 +996,38 @@ static bool verify_checksum(ROUTER_INSTANCE *router, size_t len, uint8_t *ptr)
     return rval;
 }
 
+/**
+ * @brief Clear the router's stored master error number and error message
+ *
+ * @param router Router instance; m_errno and m_errmsg are reset while holding router->lock
+ * @param hdr Replication header; only read by the optional SHOW_EVENTS debug print
+ */
+static void reset_errors(ROUTER_INSTANCE *router, REP_HEADER *hdr)
+{
+    spinlock_acquire(&router->lock);
+
+    /* set mysql errno to 0 */
+    router->m_errno = 0;
+
+    /* Remove error message */
+    if (router->m_errmsg)
+    {
+        MXS_FREE(router->m_errmsg);
+    }
+    router->m_errmsg = NULL;
+
+    spinlock_release(&router->lock);
+#ifdef SHOW_EVENTS
+    printf("blr: len %lu, event type 0x%02x, flags 0x%04x, "
+           "event size %d, event timestamp %lu\n",
+           (unsigned long)hdr->payload_len - 4, /* caller's 'len' is not in scope here; NOTE(review): assumes payload_len holds the same value - confirm */
+           hdr->event_type,
+           hdr->flags,
+           hdr->event_size,
+           (unsigned long)hdr->timestamp);
+#endif
+}
+
 /**
  * blr_handle_binlog_record - we have received binlog records from
  * the master and we must now work out what to do with them.
@@ -1052,6 +1084,7 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) { if (router->master_event_state == BLR_EVENT_DONE) { + /** This is the start of a new event */ spinlock_acquire(&router->lock); router->stats.n_binlogs++; router->stats.n_binlogs_ses++; @@ -1113,9 +1146,12 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) break; } - /** Store the header for later use */ - memcpy(&router->stored_header, &hdr, sizeof(hdr)); + /** This is the first (and possibly last) packet of a replication + * event. We store the header in case the event is large and + * it is transmitted over multiple network packets. */ router->master_event_state = BLR_EVENT_STARTED; + memcpy(&router->stored_header, &hdr, sizeof(hdr)); + reset_errors(router, &hdr); } else { @@ -1127,32 +1163,11 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) break; } - - if (hdr.ok == 0) - { - spinlock_acquire(&router->lock); - - /* set mysql errno to 0 */ - router->m_errno = 0; - - /* Remove error message */ - if (router->m_errmsg) - { - MXS_FREE(router->m_errmsg); - } - router->m_errmsg = NULL; - - spinlock_release(&router->lock); -#ifdef SHOW_EVENTS - printf("blr: len %lu, event type 0x%02x, flags 0x%04x, " - "event size %d, event timestamp %lu\n", - (unsigned long)len - 4, - hdr.event_type, - hdr.flags, - hdr.event_size, - (unsigned long)hdr.timestamp); -#endif - } + } + else + { + /** We're processing a multi-packet replication event */ + ss_dassert(router->master_event_state == BLR_EVENT_ONGOING); } /** Gather the event into one big buffer */ @@ -1169,7 +1184,11 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) if (router->master_event_state == BLR_EVENT_ONGOING) { - /** Only consume the network header */ + /** + * Consume the network header so that we can append the raw + * event data to the original buffer. 
This allows both checksum + * calculations and encryption to process it as a contiguous event + */ part = gwbuf_consume(part, MYSQL_HEADER_LEN); } @@ -1177,24 +1196,43 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) if (len < MYSQL_PACKET_LENGTH_MAX) { - /** This is the last packet, we can now write it to disk */ + /** + * This is either the only packet for the event or the last + * packet in a series for this event. The buffer now contains + * the network header of the first packet (4 bytes) and one OK byte. + * The semi-sync bytes are always consumed at an earlier stage. + */ ss_dassert(router->master_event_state != BLR_EVENT_DONE); if (router->master_event_state != BLR_EVENT_STARTED) { - /** This is not the first packet of the event, copy the - * stored header */ + /** + * This is not the first packet for this event. We must use + * the stored header. + */ memcpy(&hdr, &router->stored_header, sizeof(hdr)); } + + /** The event is now complete */ router->master_event_state = BLR_EVENT_DONE; } else { + /** + * This packet is a part of a series of packets that contain an + * event larger than MYSQL_PACKET_LENGTH_MAX bytes. + * + * For each partial event chunk, we remove the network header and + * append it to router->stored_event. The first event is an + * exception to this and it is appended as-is with the network + * header and the extra OK byte. + */ + ss_dassert(len == MYSQL_PACKET_LENGTH_MAX); router->master_event_state = BLR_EVENT_ONGOING; continue; } - /** We have the complete event in one contiguous buffer */ + /** We now have the complete event in one contiguous buffer */ router->stored_event = gwbuf_make_contiguous(router->stored_event); ptr = GWBUF_DATA(router->stored_event); @@ -1202,9 +1240,9 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) * header and one OK byte. Semi-sync bytes are never stored. 
*/ len = gwbuf_length(router->stored_event); - /* - * First check that the checksum we calculate matches the - * checksum in the packet we received. + /** + * If checksums are enabled, verify that the stored checksum + * matches the one we calculated */ if (router->master_chksum && !verify_checksum(router, len, ptr)) { @@ -1406,9 +1444,6 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) } else if (hdr.flags != LOG_EVENT_ARTIFICIAL_F) { - ptr = ptr + MYSQL_HEADER_LEN + 1; // Skip header and OK byte - uint32_t offset = MYSQL_HEADER_LEN + 1; - if (hdr.event_type == ROTATE_EVENT) { spinlock_acquire(&router->binlog_lock); @@ -1416,8 +1451,13 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) spinlock_release(&router->binlog_lock); } - /* Write event to disk */ - if (blr_write_binlog_record(router, &hdr, len - offset, ptr) == 0) + uint32_t offset = MYSQL_HEADER_LEN + 1; // Skip header and OK byte + + /** + * Write the raw event data to disk without the network + * header or the OK byte + */ + if (blr_write_binlog_record(router, &hdr, len - offset, ptr + offset) == 0) { gwbuf_free(pkt); blr_master_close(router); @@ -1425,7 +1465,7 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) return; } - /* Check for rotete event */ + /* Check for rotate event */ if (hdr.event_type == ROTATE_EVENT) { if (!blr_rotate_event(router, ptr, &hdr)) @@ -1437,7 +1477,7 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) } } - /* Handle semi-sync request fom master */ + /* Handle semi-sync request from master */ if (router->master_semi_sync != MASTER_SEMISYNC_NOT_AVAILABLE && semi_sync_send_ack == BLR_MASTER_SEMI_SYNC_ACK_REQ && (router->master_event_state == BLR_EVENT_DONE))