diff --git a/server/modules/routing/binlogrouter/blr.c b/server/modules/routing/binlogrouter/blr.c index 2ad65d887..e15632af3 100644 --- a/server/modules/routing/binlogrouter/blr.c +++ b/server/modules/routing/binlogrouter/blr.c @@ -530,7 +530,6 @@ createInstance(SERVICE *service, char **options) inst->reconnect_pending = 0; inst->handling_threads = 0; inst->rotating = 0; - inst->residual = NULL; inst->slaves = NULL; inst->next = NULL; inst->lastEventTimestamp = 0; @@ -2334,13 +2333,6 @@ destroyInstance(ROUTER *instance) } } - /* Discard the queued residual data */ - while (inst->residual) - { - inst->residual = gwbuf_consume(inst->residual, GWBUF_LENGTH(inst->residual)); - } - inst->residual = NULL; - MXS_INFO("%s is being stopped by MaxScale shudown. Disconnecting from master %s:%d, " "read up to log %s, pos %lu, transaction safe pos %lu", inst->service->name, diff --git a/server/modules/routing/binlogrouter/blr.h b/server/modules/routing/binlogrouter/blr.h index 576542ff4..7e21c2c2c 100644 --- a/server/modules/routing/binlogrouter/blr.h +++ b/server/modules/routing/binlogrouter/blr.h @@ -231,12 +231,9 @@ static const char BLR_DBUSERS_FILE[] = "dbusers"; /** Possible states of an event sent by the master */ enum blr_event_state { - BLR_EVENT_DONE, /*< No event being processed */ - BLR_EVENT_STARTED, /*< The first packet of an event which spans multiple packets - * has been received */ + BLR_EVENT_STARTED, /*< The first packet of an event has been received */ BLR_EVENT_ONGOING, /*< Other packets of a multi-packet event are being processed */ - BLR_EVENT_COMPLETE /*< A multi-packet event has been successfully processed - * but the router is not yet ready to process another one */ + BLR_EVENT_DONE, /*< The complete event was received */ }; /* Master Server configuration struct */ @@ -490,19 +487,14 @@ typedef struct router_instance unsigned int master_state; /*< State of the master FSM */ uint8_t lastEventReceived; /*< Last even received */ uint32_t 
lastEventTimestamp; /*< Timestamp from last event */ - GWBUF *residual; /*< Any residual binlog event */ MASTER_RESPONSES saved_master; /*< Saved master responses */ char *binlogdir; /*< The directory with the binlog files */ SPINLOCK binlog_lock; /*< Lock to control update of the binlog position */ int trx_safe; /*< Detect and handle partial transactions */ int pending_transaction; /*< Pending transaction */ enum blr_event_state master_event_state; /*< Packet read state */ - uint32_t stored_checksum; /*< The current value of the checksum */ - uint8_t partial_checksum[MYSQL_CHECKSUM_LEN]; /*< The partial value of the checksum - * received from the master */ - uint8_t partial_checksum_bytes; /*< How many bytes of the checksum we have read */ - uint64_t checksum_size; /*< Data size for the checksum */ REP_HEADER stored_header; /*< Relication header of the event the master is sending */ + GWBUF *stored_event; /*< Partial event buffer: packets of the event being received are gathered here */ uint64_t last_safe_pos; /* last committed transaction */ char binlog_name[BINLOG_FNAMELEN + 1]; /*< Name of the current binlog file */ diff --git a/server/modules/routing/binlogrouter/blr_master.c b/server/modules/routing/binlogrouter/blr_master.c index 7599277ba..c5464af80 100644 --- a/server/modules/routing/binlogrouter/blr_master.c +++ b/server/modules/routing/binlogrouter/blr_master.c @@ -102,8 +102,6 @@ static void blr_extract_header_semisync(uint8_t *pkt, REP_HEADER *hdr); static int blr_send_semisync_ack (ROUTER_INSTANCE *router, uint64_t pos); static int blr_get_master_semisync(GWBUF *buf); -int blr_write_data_into_binlog(ROUTER_INSTANCE *router, uint32_t data_len, uint8_t *buf); -void extract_checksum(ROUTER_INSTANCE* router, uint8_t *cksumptr, uint8_t len); static void blr_terminate_master_replication(ROUTER_INSTANCE *router, uint8_t* ptr, int len); void blr_notify_all_slaves(ROUTER_INSTANCE *router); extern bool blr_notify_waiting_slave(ROUTER_SLAVE *slave); @@ -166,13 +164,6 @@ blr_start_master(void* data) }
router->master_state = BLRM_CONNECTING; - /* Discard the queued residual data */ - while (router->residual) - { - router->residual = gwbuf_consume(router->residual, GWBUF_LENGTH(router->residual)); - } - router->residual = NULL; - spinlock_release(&router->lock); if ((client = dcb_alloc(DCB_ROLE_INTERNAL, NULL)) == NULL) { @@ -240,13 +231,6 @@ blr_restart_master(ROUTER_INSTANCE *router) { dcb_close(router->client); - /* Discard the queued residual data */ - while (router->residual) - { - router->residual = gwbuf_consume(router->residual, GWBUF_LENGTH(router->residual)); - } - router->residual = NULL; - /* Now it is safe to unleash other threads on this router instance */ spinlock_acquire(&router->lock); router->reconnect_pending = 0; @@ -333,6 +317,8 @@ blr_master_close(ROUTER_INSTANCE *router) dcb_close(router->master); router->master_state = BLRM_UNCONNECTED; router->master_event_state = BLR_EVENT_DONE; + gwbuf_free(router->stored_event); + router->stored_event = NULL; } /** @@ -985,6 +971,31 @@ encode_value(unsigned char *data, unsigned int value, int len) } } +/** + * Check that the checksum stored in the last MYSQL_CHECKSUM_LEN bytes of the event matches the CRC32 calculated over the event data that follows the network header and OK byte; on a mismatch an error is logged, stats.n_badcrc is incremented and false is returned, otherwise true is returned + */ +static bool verify_checksum(ROUTER_INSTANCE *router, size_t len, uint8_t *ptr) +{ + bool rval = true; + uint32_t offset = MYSQL_HEADER_LEN + 1; + uint32_t size = len - (offset + MYSQL_CHECKSUM_LEN); + + uint32_t checksum = crc32(0L, ptr + offset, size); + uint32_t pktsum = EXTRACT32(ptr + offset + size); + + if (pktsum != checksum) + { + rval = false; + MXS_ERROR("%s: Checksum error in event from master, " + "binlog %s @ %lu. Closing master connection.", + router->service->name, router->binlog_name, + router->current_pos); + router->stats.n_badcrc++; + } + + return rval; +} + /** * blr_handle_binlog_record - we have received binlog records from * the master and we must now work out what to do with them.
@@ -998,30 +1009,18 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) uint8_t *msg = NULL, *ptr; REP_HEADER hdr; unsigned int len = 0; - unsigned int pkt_length; int prev_length = -1; int n_bufs = -1, pn_bufs = -1; int check_packet_len; int semisync_bytes; int semi_sync_send_ack = 0; - /* - * Prepend any residual buffer to the buffer chain we have - * been called with. - */ - if (router->residual) - { - pkt = gwbuf_append(router->residual, pkt); - router->residual = NULL; - } - - pkt_length = gwbuf_length(pkt); /* * Loop over all the packets while we still have some data * and the packet length is enough to hold a replication event * header. */ - while (pkt && pkt_length > 24) + while (pkt) { ptr = GWBUF_DATA(pkt); len = gw_mysql_get_byte3(ptr); @@ -1035,9 +1034,7 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) if (len < BINLOG_EVENT_HDR_LEN && router->master_event_state != BLR_EVENT_ONGOING) { - // Dead code? - - char *event_msg = ""; + char *event_msg = "unknown"; /* Packet is too small to be a binlog event */ if (ptr[4] == 0xfe) /* EOF Packet */ @@ -1049,6 +1046,7 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) event_msg = "error"; } MXS_NOTICE("Non-event message (%s) from master.", event_msg); + pkt = gwbuf_consume(pkt, len); } else { @@ -1089,7 +1087,7 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) /* Sanity check */ if (hdr.ok == 0) { - if (hdr.event_size != len - check_packet_len && + if (hdr.event_size != len - (check_packet_len - MYSQL_HEADER_LEN) && (hdr.event_size + (check_packet_len - MYSQL_HEADER_LEN)) < MYSQL_PACKET_LENGTH_MAX) { MXS_ERROR("Packet length is %d, but event size is %d, " @@ -1114,18 +1112,10 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) break; } - else if (hdr.event_size + (check_packet_len - MYSQL_HEADER_LEN) >= MYSQL_PACKET_LENGTH_MAX) - { - router->master_event_state = BLR_EVENT_STARTED; - /** Store the header for later use */ - 
memcpy(&router->stored_header, &hdr, sizeof(hdr)); - } - - /** Prepare the checksum variables for this event */ - router->stored_checksum = crc32(0L, NULL, 0); - router->checksum_size = hdr.event_size - MYSQL_CHECKSUM_LEN; - router->partial_checksum_bytes = 0; + /** Store the header for later use */ + memcpy(&router->stored_header, &hdr, sizeof(hdr)); + router->master_event_state = BLR_EVENT_STARTED; } else { @@ -1134,7 +1124,6 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) gwbuf_free(pkt); pkt = NULL; - pkt_length = 0; break; } @@ -1166,125 +1155,63 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) } } - /* pending large event */ - if (router->master_event_state != BLR_EVENT_DONE) + /** Gather the event into one big buffer */ + GWBUF *part = gwbuf_split(&pkt, len + MYSQL_HEADER_LEN); + + if (semisync_bytes) { - if (len - MYSQL_HEADER_LEN < MYSQL_PACKET_LENGTH_MAX) + /** Consume the two semi-sync bytes */ + part = gwbuf_consume(part, semisync_bytes); + } + + ss_dassert(router->master_event_state == BLR_EVENT_STARTED || + router->master_event_state == BLR_EVENT_ONGOING); + + if (router->master_event_state == BLR_EVENT_ONGOING) + { + /** Only consume the network header */ + part = gwbuf_consume(part, MYSQL_HEADER_LEN); + } + + router->stored_event = gwbuf_append(router->stored_event, part); + + if (len < MYSQL_PACKET_LENGTH_MAX) + { + /** This is the last packet, we can now write it to disk */ + ss_dassert(router->master_event_state != BLR_EVENT_DONE); + + if (router->master_event_state != BLR_EVENT_STARTED) { - /** This is the last packet, we can now proceed to distribute - * the event afer it has been written to disk */ - ss_dassert(router->master_event_state != BLR_EVENT_COMPLETE); - router->master_event_state = BLR_EVENT_COMPLETE; + /** This is not the first packet of the event, copy the + * stored header */ memcpy(&hdr, &router->stored_header, sizeof(hdr)); } - else - { - /* current partial event is being written to disk file */ 
- uint32_t offset = MYSQL_HEADER_LEN; - uint32_t extra_bytes = MYSQL_HEADER_LEN; - - /** Don't write the OK byte into the binlog */ - if (router->master_event_state == BLR_EVENT_STARTED) - { - offset = MYSQL_HEADER_LEN + 1; - router->master_event_state = BLR_EVENT_ONGOING; - extra_bytes = MYSQL_HEADER_LEN + 1; - } - - ss_dassert(len - extra_bytes - semisync_bytes > 0); - uint32_t bytes_available = len - extra_bytes - semisync_bytes; - - if (router->master_chksum) - { - uint32_t size = MXS_MIN(len - extra_bytes - semisync_bytes, - router->checksum_size); - - router->stored_checksum = crc32(router->stored_checksum, - ptr + offset, - size); - router->checksum_size -= size; - - if (router->checksum_size == 0 && size < bytes_available) - { - extract_checksum(router, ptr + offset + size, - bytes_available - size); - } - } - - if (blr_write_data_into_binlog(router, bytes_available, - ptr + offset) == 0) - { - /** Failed to write to the binlog file, destroy the buffer - * chain and close the connection with the master */ - while (pkt) - { - pkt = GWBUF_CONSUME_ALL(pkt); - } - blr_master_close(router); - blr_master_delayed_connect(router); - return; - } - pkt = gwbuf_consume(pkt, len); - pkt_length -= len; - continue; - } + router->master_event_state = BLR_EVENT_DONE; } + else + { + router->master_event_state = BLR_EVENT_ONGOING; + continue; + } + + /** We have the complete event in one contiguous buffer */ + router->stored_event = gwbuf_make_contiguous(router->stored_event); + ptr = GWBUF_DATA(router->stored_event); + + /** len is now the length of the complete event plus 4 bytes of network + * header and one OK byte. Semi-sync bytes are never stored. */ + len = gwbuf_length(router->stored_event); /* * First check that the checksum we calculate matches the * checksum in the packet we received. 
*/ - if (router->master_chksum) + if (router->master_chksum && !verify_checksum(router, len, ptr)) { - uint32_t offset = MYSQL_HEADER_LEN; - uint32_t size = len - MYSQL_HEADER_LEN - MYSQL_CHECKSUM_LEN; - - if (router->master_event_state == BLR_EVENT_DONE) - { - /** Set the pointer offset to the first byte after - * the header and OK byte */ - offset = MYSQL_HEADER_LEN + 1; - size = len - (check_packet_len + MYSQL_CHECKSUM_LEN); - } - - size = MXS_MIN(size, router->checksum_size); - - if (router->checksum_size > 0) - { - router->stored_checksum = crc32(router->stored_checksum, - ptr + offset, size); - router->checksum_size -= size; - } - - if(router->checksum_size == 0 && size < (len - offset - semisync_bytes)) - { - extract_checksum(router, ptr + offset + size, - len - offset - size - semisync_bytes); - } - - if (router->partial_checksum_bytes == MYSQL_CHECKSUM_LEN) - { - uint32_t pktsum = EXTRACT32(router->partial_checksum); - if (pktsum != router->stored_checksum) - { - router->stats.n_badcrc++; - MXS_FREE(msg); - /* msg = NULL; Not needed unless msg will be referred to again */ - MXS_ERROR("%s: Checksum error in event from master, " - "binlog %s @ %lu. 
Closing master connection.", - router->service->name, router->binlog_name, - router->current_pos); - blr_master_close(router); - blr_master_delayed_connect(router); - return; - } - } - else - { - pkt = gwbuf_consume(pkt, len); - pkt_length -= len; - continue; - } + MXS_FREE(msg); + blr_master_close(router); + blr_master_delayed_connect(router); + return; } if (hdr.ok == 0) @@ -1479,15 +1406,8 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) } else if (hdr.flags != LOG_EVENT_ARTIFICIAL_F) { - ptr = ptr + 4; // Skip header - uint32_t offset = 4; - - if (router->master_event_state == BLR_EVENT_STARTED || - router->master_event_state == BLR_EVENT_DONE) - { - ptr++; - offset++; - } + ptr = ptr + MYSQL_HEADER_LEN + 1; // Skip header and OK byte + uint32_t offset = MYSQL_HEADER_LEN + 1; if (hdr.event_type == ROTATE_EVENT) { @@ -1496,23 +1416,10 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) spinlock_release(&router->binlog_lock); } - /* Current event is being written to disk file. - * It is possible for an empty packet to be sent if an - * event is exactly 2^24 bytes long. In this case the - * empty packet should be discarded. 
*/ - if (len > MYSQL_HEADER_LEN && - blr_write_binlog_record(router, &hdr, len - offset - semisync_bytes, ptr) == 0) + /* Write event to disk */ + if (blr_write_binlog_record(router, &hdr, len - offset, ptr) == 0) { - /* - * Failed to write to the - * binlog file, destroy the - * buffer chain and close the - * connection with the master - */ - while ((pkt = gwbuf_consume(pkt, GWBUF_LENGTH(pkt))) != NULL) - { - ; - } + gwbuf_free(pkt); blr_master_close(router); blr_master_delayed_connect(router); return; @@ -1523,16 +1430,7 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) { if (!blr_rotate_event(router, ptr, &hdr)) { - /* - * Failed to write to the - * binlog file, destroy the - * buffer chain and close the - * connection with the master - */ - while ((pkt = gwbuf_consume(pkt, GWBUF_LENGTH(pkt))) != NULL) - { - ; - } + gwbuf_free(pkt); blr_master_close(router); blr_master_delayed_connect(router); return; @@ -1542,8 +1440,7 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) /* Handle semi-sync request fom master */ if (router->master_semi_sync != MASTER_SEMISYNC_NOT_AVAILABLE && semi_sync_send_ack == BLR_MASTER_SEMI_SYNC_ACK_REQ && - (router->master_event_state == BLR_EVENT_COMPLETE || - router->master_event_state == BLR_EVENT_DONE)) + (router->master_event_state == BLR_EVENT_DONE)) { MXS_DEBUG("%s: binlog record in file %s, pos %lu has " @@ -1629,16 +1526,7 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) spinlock_release(&router->binlog_lock); if (!blr_rotate_event(router, ptr, &hdr)) { - /* - * Failed to write to the - * binlog file, destroy the - * buffer chain and close the - * connection with the master - */ - while ((pkt = gwbuf_consume(pkt, GWBUF_LENGTH(pkt))) != NULL) - { - ; - } + gwbuf_free(pkt); blr_master_close(router); blr_master_delayed_connect(router); return; @@ -1646,17 +1534,15 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) } } } - - /** A large event is now fully received and 
processed */ - if (router->master_event_state == BLR_EVENT_COMPLETE) - { - router->master_event_state = BLR_EVENT_DONE; - } } else { blr_terminate_master_replication(router, ptr, len); } + + /** Finished processing the event */ + gwbuf_free(router->stored_event); + router->stored_event = NULL; } if (msg) @@ -1664,33 +1550,8 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) MXS_FREE(msg); msg = NULL; } - prev_length = len; - while (len > 0) - { - unsigned int n, plen; - plen = GWBUF_LENGTH(pkt); - n = (plen < len ? plen : len); - pkt = gwbuf_consume(pkt, n); - len -= n; - pkt_length -= n; - } - - pn_bufs = n_bufs; } - /* - * Check if we have a residual, part binlog message to deal with. - * Just simply store the GWBUF for next time - */ - if (pkt) - { - router->residual = pkt; - ss_dassert(pkt_length != 0); - } - else - { - ss_dassert(pkt_length == 0); - } blr_file_flush(router); } @@ -2129,13 +1990,6 @@ blr_stop_start_master(ROUTER_INSTANCE *router) } } - /* Discard the queued residual data */ - while (router->residual) - { - router->residual = gwbuf_consume(router->residual, GWBUF_LENGTH(router->residual)); - } - router->residual = NULL; - router->master_state = BLRM_UNCONNECTED; spinlock_release(&router->lock); @@ -2298,45 +2152,6 @@ static void blr_log_identity(ROUTER_INSTANCE *router) } } -/** - * @brief Write data into binlogs (incomplete event) - * - * Writes @c data_len bytes of data from @c buf into the current binlog being processed. - * - * @param router Router instance - * @param data_len Number of bytes to write - * @param buf Pointer where the data is read - * @return Number of bytes written or 0 on error - */ -int -blr_write_data_into_binlog(ROUTER_INSTANCE *router, uint32_t data_len, uint8_t *buf) -{ - int n; - - if ((n = pwrite(router->binlog_fd, buf, data_len, - router->last_written)) != data_len) - { - char err_msg[MXS_STRERROR_BUFLEN]; - MXS_ERROR("%s: Failed to write binlog record at %lu of %s, %s. 
" - "Truncating to previous record.", - router->service->name, router->last_written, - router->binlog_name, - strerror_r(errno, err_msg, sizeof(err_msg))); - - /* Remove any partial event that was written */ - if (ftruncate(router->binlog_fd, router->last_written)) - { - MXS_ERROR("%s: Failed to truncate binlog record at %lu of %s, %s. ", - router->service->name, router->last_written, - router->binlog_name, - strerror_r(errno, err_msg, sizeof(err_msg))); - } - return 0; - } - router->last_written += data_len; - return n; -} - /** * Send a replication event packet to a slave * @@ -2485,26 +2300,6 @@ bool blr_send_event(blr_thread_role_t role, return rval; } -/** - * Extract the checksum from the binlogs - * - * This updates the internal state of the router and will allow us to detect - * if the checksum is split across two packets. - * @param router Router instance - * @param cksumptr Pointer to the checksum - * @param len How much of the data is readable - */ -void extract_checksum(ROUTER_INSTANCE* router, uint8_t *cksumptr, uint8_t len) -{ - uint8_t *ptr = cksumptr; - while (ptr - cksumptr < len && router->partial_checksum_bytes < MYSQL_CHECKSUM_LEN) - { - router->partial_checksum[router->partial_checksum_bytes] = *ptr; - ptr++; - router->partial_checksum_bytes++; - } -} - /** * Stop the slave connection and log errors * diff --git a/server/modules/routing/binlogrouter/blr_slave.c b/server/modules/routing/binlogrouter/blr_slave.c index 9de254e0c..bf4763fd9 100644 --- a/server/modules/routing/binlogrouter/blr_slave.c +++ b/server/modules/routing/binlogrouter/blr_slave.c @@ -3332,13 +3332,6 @@ blr_stop_slave(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave) } } - /* Discard the queued residual data */ - while (router->residual) - { - router->residual = gwbuf_consume(router->residual, GWBUF_LENGTH(router->residual)); - } - router->residual = NULL; - /* Now it is safe to unleash other threads on this router instance */ router->reconnect_pending = 0; router->active_logs = 
0;