From cd2af6ffef226dc5874de2406516219366517458 Mon Sep 17 00:00:00 2001 From: Markus Makela Date: Mon, 15 Feb 2016 16:41:45 +0200 Subject: [PATCH] Cleaned up the code based on the code review Added missing error condition checks and cleaned up code. --- server/modules/include/blr.h | 11 ++- server/modules/routing/binlog/blr_master.c | 86 ++++++++++------------ server/modules/routing/binlog/blr_slave.c | 17 +++-- 3 files changed, 55 insertions(+), 59 deletions(-) diff --git a/server/modules/include/blr.h b/server/modules/include/blr.h index 75fcd7d47..c9a22fd0c 100644 --- a/server/modules/include/blr.h +++ b/server/modules/include/blr.h @@ -199,12 +199,15 @@ #define MYSQL_ERROR_MSG(buf) ((uint8_t *)GWBUF_DATA(buf) + 7) #define MYSQL_COMMAND(buf) (*((uint8_t *)GWBUF_DATA(buf) + 4)) +/** Possible states of an event sent by the master */ enum blr_event_state { - BLR_EVENT_DONE, - BLR_EVENT_STARTED, - BLR_EVENT_ONGOING, - BLR_EVENT_COMPLETE + BLR_EVENT_DONE, /*< No event being processed */ + BLR_EVENT_STARTED, /*< The first packet of an event which spans multiple packets + * has been received */ + BLR_EVENT_ONGOING, /*< Other packets of a multi-packet event are being processed */ + BLR_EVENT_COMPLETE /*< A multi-packet event has been successfully processed + * but the router is not yet ready to process another one */ }; /* Master Server configuration struct */ diff --git a/server/modules/routing/binlog/blr_master.c b/server/modules/routing/binlog/blr_master.c index 29ff2567c..8787bcb36 100644 --- a/server/modules/routing/binlog/blr_master.c +++ b/server/modules/routing/binlog/blr_master.c @@ -795,9 +795,6 @@ int no_residual = 1; int preslen = -1; int prev_length = -1; int n_bufs = -1, pn_bufs = -1; -int event_limit; -uint32_t totalsize = 0; -uint32_t partialpos = 0; /* * Prepend any residual buffer to the buffer chain we have @@ -985,6 +982,11 @@ uint32_t partialpos = 0; /** Store the header for later use */ memcpy(&router->stored_header, &hdr, sizeof(hdr)); } + + /** Prepare the checksum variables for this event */ + router->stored_checksum = crc32(0L, NULL, 0); + router->checksum_size = hdr.event_size - MYSQL_CHECKSUM_LEN; + router->partial_checksum_bytes = 0; } if (hdr.ok == 0) @@ -1012,7 +1014,7 @@ uint32_t partialpos = 0; if (len - MYSQL_HEADER_LEN < MYSQL_PACKET_LENGTH_MAX) { /** This is the last packet, we can now proceed to distribute - * the event */ + * the event afer it has been written to disk */ ss_dassert(router->master_event_state != BLR_EVENT_COMPLETE); router->master_event_state = BLR_EVENT_COMPLETE; memcpy(&hdr, &router->stored_header, sizeof(hdr)); @@ -1028,16 +1030,7 @@ uint32_t partialpos = 0; { offset = MYSQL_HEADER_LEN + 1; router->master_event_state = BLR_EVENT_ONGOING; - - /** Initialize the checksum and calculate it for - * the first packet */ - if (router->master_chksum) - { - router->stored_checksum = crc32(0L, NULL, 0); - router->checksum_size = hdr.event_size - MYSQL_CHECKSUM_LEN; - router->partial_checksum_bytes = 0; - extra_bytes = MYSQL_HEADER_LEN + 1; - } + extra_bytes = MYSQL_HEADER_LEN + 1; } if (router->master_chksum) @@ -1048,7 +1041,7 @@ uint32_t partialpos = 0; size); router->checksum_size -= size; - if(router->checksum_size == 0 && size < len - offset) + if (router->checksum_size == 0 && size < len - offset) { extract_checksum(router, ptr + offset + size, len - offset - size); } @@ -1083,11 +1076,8 @@ uint32_t partialpos = 0; if (router->master_event_state == BLR_EVENT_DONE) { - /** Initialize the checksum and set the pointer offset to - * the first byte after the header and OK byte */ - router->stored_checksum = crc32(0L, NULL, 0); - router->checksum_size = hdr.event_size - MYSQL_CHECKSUM_LEN; - router->partial_checksum_bytes = 0; + /** Set the pointer offset to the first byte after + * the header and OK byte */ offset = MYSQL_HEADER_LEN + 1; size = len - MYSQL_HEADER_LEN - MYSQL_CHECKSUM_LEN - 1; } @@ -1248,7 +1238,9 @@ uint32_t partialpos = 0; } } - event_limit = router->mariadb10_compat ? MAX_EVENT_TYPE_MARIADB10 : MAX_EVENT_TYPE; + /** Gather statistics about the replication event types */ + int event_limit = router->mariadb10_compat ? + MAX_EVENT_TYPE_MARIADB10 : MAX_EVENT_TYPE; if (hdr.event_type >= 0 && hdr.event_type <= event_limit) router->stats.events[hdr.event_type]++; @@ -2396,7 +2388,13 @@ int n; strerror_r(errno, err_msg, sizeof(err_msg))); /* Remove any partial event that was written */ - ftruncate(router->binlog_fd, router->last_written); + if(ftruncate(router->binlog_fd, router->last_written)) + { + MXS_ERROR("%s: Failed to truncate binlog record at %lu of %s, %s. ", + router->service->name, router->last_written, + router->binlog_name, + strerror_r(errno, err_msg, sizeof(err_msg))); + } return 0; } router->last_written += data_len; @@ -2474,43 +2472,28 @@ bool blr_send_event(ROUTER_SLAVE *slave, REP_HEADER *hdr, uint8_t *buf) } else { - int64_t len = hdr->event_size + 1; - - if (blr_send_packet(slave, buf, MYSQL_PACKET_LENGTH_MAX - 1, true)) - { - len -= MYSQL_PACKET_LENGTH_MAX; - buf += MYSQL_PACKET_LENGTH_MAX - 1; - - /** Check for special case where length of the MySQL packet - * is exactly 0x00ffffff bytes long. In this case we need to send - * an empty packet to tell the slave that the whole event is sent. */ - if (len == 0) - { - blr_send_packet(slave, buf, 0, false); - } - } - else - { - rval = false; - } + /** Total size of all the payloads in all the packets */ + int64_t len = hdr->event_size; + bool first = true; while (rval && len > 0) { - /** Write rest of the event data */ - uint64_t payload_len = MIN(MYSQL_PACKET_LENGTH_MAX, len); + uint64_t payload_len = first ? MYSQL_PACKET_LENGTH_MAX - 1 : + MIN(MYSQL_PACKET_LENGTH_MAX, len); - if (blr_send_packet(slave, buf, payload_len, false)) + if (blr_send_packet(slave, buf, payload_len, first)) { /** The check for exactly 0x00ffffff bytes needs to be done * here as well */ if (len == MYSQL_PACKET_LENGTH_MAX) { - ss_dassert(len - MYSQL_PACKET_LENGTH_MAX == 0); blr_send_packet(slave, buf, 0, false); } - len -= payload_len; + /** Add the extra byte written by blr_send_packet */ + len -= first ? payload_len + 1 : payload_len; buf += payload_len; + first = false; } else { @@ -2530,6 +2513,15 @@ bool blr_send_event(ROUTER_SLAVE *slave, REP_HEADER *hdr, uint8_t *buf) return rval; } +/** + * Extract the checksum from the binlogs + * + * This updates the internal state of the router and will allow us to detect + * if the checksum is split across two packets. + * @param router Router instance + * @param cksumptr Pointer to the checksum + * @param len How much of the data is readable + */ void extract_checksum(ROUTER_INSTANCE* router, uint8_t *cksumptr, uint8_t len) { uint8_t *ptr = cksumptr; @@ -2539,4 +2531,4 @@ void extract_checksum(ROUTER_INSTANCE* router, uint8_t *cksumptr, uint8_t len) ptr++; router->partial_checksum_bytes++; } -} \ No newline at end of file +} diff --git a/server/modules/routing/binlog/blr_slave.c b/server/modules/routing/binlog/blr_slave.c index 08a11793a..bccca4bbf 100644 --- a/server/modules/routing/binlog/blr_slave.c +++ b/server/modules/routing/binlog/blr_slave.c @@ -2078,17 +2078,18 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1]; hkheartbeat - beat1); } - blr_send_event(slave, &hdr, (uint8_t*) record->start); + if (blr_send_event(slave, &hdr, (uint8_t*) record->start)) + { + if (hdr.event_type != ROTATE_EVENT) + { + slave->binlog_pos = hdr.next_pos; + } + slave->stats.n_events++; + burst_size -= hdr.event_size; + } gwbuf_free(record); record = NULL; - if (hdr.event_type != ROTATE_EVENT) - { - slave->binlog_pos = hdr.next_pos; - } - slave->stats.n_events++; - burst_size -= hdr.event_size; - /* set lastReply for slave heartbeat check */ if (router->send_slave_heartbeat) slave->lastReply = time(0);