diff --git a/server/modules/routing/avrorouter/avro_file.c b/server/modules/routing/avrorouter/avro_file.c index 404737d72..d0772aeb9 100644 --- a/server/modules/routing/avrorouter/avro_file.c +++ b/server/modules/routing/avrorouter/avro_file.c @@ -481,6 +481,45 @@ void do_checkpoint(AVRO_INSTANCE *router, uint64_t *total_rows, uint64_t *total_ router->row_count = router->trx_count = 0; } +void process_fde(AVRO_INSTANCE* router, uint8_t* ptr, uint32_t event_size) +{ + const int BLRM_FDE_EVENT_TYPES_OFFSET = 2 + 50 + 4 + 1; + const int FDE_EXTRA_BYTES = 5; + int event_header_length = ptr[BLRM_FDE_EVENT_TYPES_OFFSET - 1]; + int n_events = event_size - event_header_length - BLRM_FDE_EVENT_TYPES_OFFSET - FDE_EXTRA_BYTES; + uint8_t* checksum = ptr + event_size - event_header_length - FDE_EXTRA_BYTES; + + // Precaution to prevent writing too much in case new events are added + int real_len = MXS_MIN(n_events, sizeof(router->event_type_hdr_lens)); + memcpy(router->event_type_hdr_lens, ptr + BLRM_FDE_EVENT_TYPES_OFFSET, real_len); + + router->event_types = n_events; + router->binlog_checksum = checksum[0]; +} + +bool read_fde(AVRO_INSTANCE* router) +{ + bool rval = false; + uint8_t hdbuf[BINLOG_EVENT_HDR_LEN]; + errno = 0; // Helps prevent false positives + + if (pread(router->binlog_fd, hdbuf, BINLOG_EVENT_HDR_LEN, 4) == BINLOG_EVENT_HDR_LEN) + { + uint32_t event_size = extract_field(&hdbuf[9], 32); + uint8_t evbuf[event_size]; + uint8_t event_type = hdbuf[4]; + + if (event_type == FORMAT_DESCRIPTION_EVENT && + pread(router->binlog_fd, evbuf, event_size, 4 + BINLOG_EVENT_HDR_LEN) == event_size) + { + process_fde(router, evbuf, event_size); + rval = true; + } + } + + return rval; +} + /** * @brief Read all replication events from a binlog file. * @@ -512,6 +551,13 @@ avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router) return AVRO_BINLOG_ERROR; } + if (!read_fde(router)) + { + MXS_ERROR("Failed to read the FDE event from the binary log: %d, %s", + errno, mxs_strerror(errno)); + return AVRO_BINLOG_ERROR; + } + while (!router->service->svc_do_shutdown) { int n; @@ -638,24 +684,8 @@ avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router) hdr.event_size -= 4; } - /* check for FORMAT DESCRIPTION EVENT */ - if (hdr.event_type == FORMAT_DESCRIPTION_EVENT) - { - const int BLRM_FDE_EVENT_TYPES_OFFSET = 2 + 50 + 4 + 1; - const int FDE_EXTRA_BYTES = 5; - int event_header_length = ptr[BLRM_FDE_EVENT_TYPES_OFFSET - 1]; - int n_events = hdr.event_size - event_header_length - BLRM_FDE_EVENT_TYPES_OFFSET - FDE_EXTRA_BYTES; - uint8_t* checksum = ptr + hdr.event_size - event_header_length - FDE_EXTRA_BYTES; - - // Precaution to prevent writing too much in case new events are added - int real_len = MXS_MIN(n_events, sizeof(router->event_type_hdr_lens)); - memcpy(router->event_type_hdr_lens, ptr + BLRM_FDE_EVENT_TYPES_OFFSET, real_len); - - router->event_types = n_events; - router->binlog_checksum = checksum[0]; - } /* Decode CLOSE/STOP Event */ - else if (hdr.event_type == STOP_EVENT) + if (hdr.event_type == STOP_EVENT) { char next_file[BLRM_BINLOG_NAME_STR_LEN + 1]; stop_seen = true;