diff --git a/server/modules/routing/avrorouter/avro_file.cc b/server/modules/routing/avrorouter/avro_file.cc index 8664fff99..573bfe6f7 100644 --- a/server/modules/routing/avrorouter/avro_file.cc +++ b/server/modules/routing/avrorouter/avro_file.cc @@ -518,6 +518,35 @@ bool read_header(Avro* router, unsigned long long pos, REP_HEADER* hdr, avro_bin return rval; } +static bool pos_is_ok(Avro* router, const REP_HEADER& hdr, uint64_t pos, uint64_t original_size) +{ + bool rval = false; + + if (hdr.next_pos > 0 && hdr.next_pos < pos) + { + MXS_INFO("Binlog %s: next pos %u < pos %lu, truncating to %lu", + router->binlog_name.c_str(), hdr.next_pos, pos, pos); + } + + if (hdr.next_pos > 0 && hdr.next_pos != (pos + original_size)) + { + MXS_INFO("Binlog %s: next pos %u != (pos %lu + event_size %u), truncating to %lu", + router->binlog_name.c_str(), hdr.next_pos, pos, hdr.event_size, pos); + } + + if (hdr.next_pos > 0) + { + rval = true; + } + else + { + MXS_ERROR("Current event type %d @ %lu has nex pos = %u : exiting", + hdr.event_type, pos, hdr.next_pos); + } + + return rval; +} + /** * @brief Read all replication events from a binlog file. * @@ -531,7 +560,7 @@ bool read_header(Avro* router, unsigned long long pos, REP_HEADER* hdr, avro_bin */ avro_binlog_end_t avro_read_all_events(Avro *router) { - unsigned long long pos = router->current_pos; + uint64_t pos = router->current_pos; char next_binlog[BINLOG_FNAMELEN + 1]; uint64_t total_commits = 0, total_rows = 0; bool found_chksum = false; @@ -581,48 +610,14 @@ avro_binlog_end_t avro_read_all_events(Avro *router) hdr.event_size -= 4; } - /* check for FORMAT DESCRIPTION EVENT */ - if (hdr.event_type == FORMAT_DESCRIPTION_EVENT) - { - const int BLRM_FDE_EVENT_TYPES_OFFSET = 2 + 50 + 4 + 1; - const int FDE_EXTRA_BYTES = 5; - int event_header_length = ptr[BLRM_FDE_EVENT_TYPES_OFFSET - 1]; - int n_events = hdr.event_size - event_header_length - BLRM_FDE_EVENT_TYPES_OFFSET - FDE_EXTRA_BYTES; - uint8_t* checksum = ptr + hdr.event_size - event_header_length - FDE_EXTRA_BYTES; - - // Precaution to prevent writing too much in case new events are added - int real_len = MXS_MIN(n_events, (int)sizeof(router->event_type_hdr_lens)); - memcpy(router->event_type_hdr_lens, ptr + BLRM_FDE_EVENT_TYPES_OFFSET, real_len); - - router->event_types = n_events; - router->binlog_checksum = checksum[0]; - } - /* Decode CLOSE/STOP Event */ - else if (hdr.event_type == STOP_EVENT) + // These events are only related to binary log files + if (hdr.event_type == STOP_EVENT) { char next_file[BLRM_BINLOG_NAME_STR_LEN + 1]; stop_seen = true; snprintf(next_file, sizeof(next_file), BINLOG_NAMEFMT, router->filestem.c_str(), blr_file_get_next_binlogname(router->binlog_name.c_str())); } - else if (hdr.event_type == MARIADB_ANNOTATE_ROWS_EVENT) - { - MXS_INFO("Annotate_rows_event: %.*s", hdr.event_size - BINLOG_EVENT_HDR_LEN, ptr); - pos += original_size; - router->current_pos = pos; - continue; - } - else if (hdr.event_type == TABLE_MAP_EVENT) - { - handle_table_map_event(router, &hdr, ptr); - } - else if ((hdr.event_type >= WRITE_ROWS_EVENTv0 && hdr.event_type <= DELETE_ROWS_EVENTv1) || - (hdr.event_type >= WRITE_ROWS_EVENTv2 && hdr.event_type <= DELETE_ROWS_EVENTv2)) - { - router->row_count++; - handle_row_event(router, &hdr, ptr); - } - /* Decode ROTATE EVENT */ else if (hdr.event_type == ROTATE_EVENT) { int len = hdr.event_size - BINLOG_EVENT_HDR_LEN - 8; @@ -644,6 +639,41 @@ avro_binlog_end_t avro_read_all_events(Avro *router) rotate_seen = true; } + // The following events are related to the actual data + else if (hdr.event_type == FORMAT_DESCRIPTION_EVENT) + { + const int BLRM_FDE_EVENT_TYPES_OFFSET = 2 + 50 + 4 + 1; + const int FDE_EXTRA_BYTES = 5; + int event_header_length = ptr[BLRM_FDE_EVENT_TYPES_OFFSET - 1]; + int n_events = hdr.event_size - event_header_length - BLRM_FDE_EVENT_TYPES_OFFSET - FDE_EXTRA_BYTES; + uint8_t* checksum = ptr + hdr.event_size - event_header_length - FDE_EXTRA_BYTES; + + // Precaution to prevent writing too much in case new events are added + int real_len = MXS_MIN(n_events, (int)sizeof(router->event_type_hdr_lens)); + memcpy(router->event_type_hdr_lens, ptr + BLRM_FDE_EVENT_TYPES_OFFSET, real_len); + + router->event_types = n_events; + router->binlog_checksum = checksum[0]; + } + else if (hdr.event_type == MARIADB_ANNOTATE_ROWS_EVENT) + { + // This appears to need special handling + MXS_INFO("Annotate_rows_event: %.*s", hdr.event_size - BINLOG_EVENT_HDR_LEN, ptr); + pos += original_size; + router->current_pos = pos; + gwbuf_free(result); + continue; + } + else if (hdr.event_type == TABLE_MAP_EVENT) + { + handle_table_map_event(router, &hdr, ptr); + } + else if ((hdr.event_type >= WRITE_ROWS_EVENTv0 && hdr.event_type <= DELETE_ROWS_EVENTv1) || + (hdr.event_type >= WRITE_ROWS_EVENTv2 && hdr.event_type <= DELETE_ROWS_EVENTv2)) + { + router->row_count++; + handle_row_event(router, &hdr, ptr); + } else if (hdr.event_type == MARIADB10_GTID_EVENT) { uint64_t n_sequence; /* 8 bytes */ @@ -656,12 +686,6 @@ avro_binlog_end_t avro_read_all_events(Avro *router) router->gtid.event_num = 0; router->gtid.timestamp = hdr.timestamp; } - /** - * Check QUERY_EVENT - * - * Check for BEGIN ( ONLY for mysql 5.6, mariadb 5.5 ) - * Check for COMMIT (not transactional engines) - */ else if (hdr.event_type == QUERY_EVENT) { handle_query_event(router, &hdr, ptr); @@ -679,31 +703,13 @@ avro_binlog_end_t avro_read_all_events(Avro *router) gwbuf_free(result); - /* pos and next_pos sanity checks */ - if (hdr.next_pos > 0 && hdr.next_pos < pos) - { - MXS_INFO("Binlog %s: next pos %u < pos %llu, truncating to %llu", - router->binlog_name.c_str(), hdr.next_pos, pos, pos); - break; - } - - if (hdr.next_pos > 0 && hdr.next_pos != (pos + original_size)) - { - MXS_INFO("Binlog %s: next pos %u != (pos %llu + event_size %u), truncating to %llu", - router->binlog_name.c_str(), hdr.next_pos, pos, hdr.event_size, pos); - break; - } - - /* set pos to new value */ - if (hdr.next_pos > 0) + if (pos_is_ok(router, hdr, pos, original_size)) { pos = hdr.next_pos; router->current_pos = pos; } else { - MXS_ERROR("Current event type %d @ %llu has nex pos = %u : exiting", - hdr.event_type, pos, hdr.next_pos); break; } }