diff --git a/server/modules/routing/avrorouter/avro_file.cc b/server/modules/routing/avrorouter/avro_file.cc index 2c3b0af6e..48618f5fc 100644 --- a/server/modules/routing/avrorouter/avro_file.cc +++ b/server/modules/routing/avrorouter/avro_file.cc @@ -249,7 +249,7 @@ bool avro_load_conversion_state(Avro *router) * @return AVRO_OK if the next file exists, AVRO_LAST_FILE if this is the last * available file. */ -static avro_binlog_end_t rotate_to_next_file_if_exists(Avro* router, uint64_t pos, bool stop_seen) +static avro_binlog_end_t rotate_to_next_file_if_exists(Avro* router, uint64_t pos) { avro_binlog_end_t rval = AVRO_LAST_FILE; @@ -266,30 +266,13 @@ static avro_binlog_end_t rotate_to_next_file_if_exists(Avro* router, uint64_t po } else { - if (stop_seen) - { - MXS_NOTICE("End of binlog file [%s] at %lu with a " - "close event. Rotating to next binlog file [%s].", - router->binlog_name.c_str(), pos, next_binlog); - } - else - { - MXS_NOTICE("End of binlog file [%s] at %lu with no " - "close or rotate event. Rotating to next binlog file [%s].", - router->binlog_name.c_str(), pos, next_binlog); - } - + MXS_INFO("End of binlog file [%s] at %lu. Rotating to next binlog file [%s].", + router->binlog_name.c_str(), pos, next_binlog); rval = AVRO_OK; router->binlog_name = next_binlog; router->current_pos = 4; } } - else if (stop_seen) - { - MXS_NOTICE("End of binlog file [%s] at %lu with a close event. " - "Next binlog file does not exist, pausing file conversion.", - router->binlog_name.c_str(), pos); - } return rval; } @@ -455,7 +438,7 @@ bool read_header(Avro* router, unsigned long long pos, REP_HEADER* hdr, avro_bin return rval; } -static bool pos_is_ok(Avro* router, const REP_HEADER& hdr, uint64_t pos, uint64_t original_size) +static bool pos_is_ok(Avro* router, const REP_HEADER& hdr, uint64_t pos) { bool rval = false; @@ -464,7 +447,7 @@ static bool pos_is_ok(Avro* router, const REP_HEADER& hdr, uint64_t pos, uint64_ MXS_INFO("Binlog %s: next pos %u < pos %lu, truncating to %lu", router->binlog_name.c_str(), hdr.next_pos, pos, pos); } - else if (hdr.next_pos > 0 && hdr.next_pos != (pos + original_size)) + else if (hdr.next_pos > 0 && hdr.next_pos != (pos + hdr.event_size)) { MXS_INFO("Binlog %s: next pos %u != (pos %lu + event_size %u), truncating to %lu", router->binlog_name.c_str(), hdr.next_pos, pos, hdr.event_size, pos); @@ -482,13 +465,8 @@ static bool pos_is_ok(Avro* router, const REP_HEADER& hdr, uint64_t pos, uint64_ return rval; } -void handle_one_event(Avro* router, uint8_t* ptr, REP_HEADER& hdr, uint64_t& pos) +void handle_one_event(Avro* router, uint8_t* ptr, REP_HEADER& hdr) { - if (router->binlog_checksum) - { - hdr.event_size -= 4; - } - // The following events are related to the actual data if (hdr.event_type == FORMAT_DESCRIPTION_EVENT) { @@ -553,10 +531,8 @@ void handle_one_event(Avro* router, uint8_t* ptr, REP_HEADER& hdr, uint64_t& pos avro_binlog_end_t avro_read_all_events(Avro *router) { uint64_t pos = router->current_pos; - char next_binlog[BINLOG_FNAMELEN + 1]; - bool found_chksum = false; + std::string next_binlog; bool rotate_seen = false; - bool stop_seen = false; ss_dassert(router->binlog_fd != -1); @@ -573,11 +549,11 @@ avro_binlog_end_t avro_read_all_events(Avro *router) if (rotate_seen) { - rotate_to_file(router, pos, next_binlog); + rotate_to_file(router, pos, next_binlog.c_str()); } else { - rc = rotate_to_next_file_if_exists(router, pos, stop_seen); + rc = rotate_to_next_file_if_exists(router, pos); } } return rc; @@ -591,44 +567,20 @@ avro_binlog_end_t avro_read_all_events(Avro *router) return AVRO_BINLOG_ERROR; } - uint64_t original_size = hdr.event_size; - /* get event content */ uint8_t* ptr = GWBUF_DATA(result); // These events are only related to binary log files - if (hdr.event_type == STOP_EVENT) + if (hdr.event_type == ROTATE_EVENT) { - char next_file[BLRM_BINLOG_NAME_STR_LEN + 1]; - stop_seen = true; - snprintf(next_file, sizeof(next_file), BINLOG_NAMEFMT, router->filestem.c_str(), - blr_file_get_next_binlogname(router->binlog_name.c_str())); - } - else if (hdr.event_type == ROTATE_EVENT) - { - int len = hdr.event_size - BINLOG_EVENT_HDR_LEN - 8; - - if (found_chksum || router->binlog_checksum) - { - len -= 4; - } - - if (len > BINLOG_FNAMELEN) - { - MXS_WARNING("Truncated binlog name from %d to %d characters.", - len, BINLOG_FNAMELEN); - len = BINLOG_FNAMELEN; - } - - memcpy(next_binlog, ptr + 8, len); - next_binlog[len] = 0; + int len = hdr.event_size - BINLOG_EVENT_HDR_LEN - 8 - (router->binlog_checksum ? 4 : 0); + next_binlog.assign((char*)ptr + 8, len); rotate_seen = true; - } else if (hdr.event_type == MARIADB_ANNOTATE_ROWS_EVENT) { // This appears to need special handling - int annotate_len = hdr.event_size - BINLOG_EVENT_HDR_LEN - (found_chksum || router->binlog_checksum ? 4 : 0); + int annotate_len = hdr.event_size - BINLOG_EVENT_HDR_LEN - (router->binlog_checksum ? 4 : 0); MXS_INFO("Annotate_rows_event: %.*s", annotate_len, ptr); pos += hdr.event_size; router->current_pos = pos; @@ -637,10 +589,22 @@ avro_binlog_end_t avro_read_all_events(Avro *router) } else { - handle_one_event(router, ptr, hdr, pos); + uint32_t orig_size = hdr.event_size; + + if (router->binlog_checksum) + { + // We don't care about the checksum at this point so we ignore it + hdr.event_size -= 4; + } + + handle_one_event(router, ptr, hdr); + + hdr.event_size = orig_size; } - if (pos_is_ok(router, hdr, pos, original_size)) + gwbuf_free(result); + + if (pos_is_ok(router, hdr, pos)) { pos = hdr.next_pos; router->current_pos = pos;