MXS-1881: Remove unneeded code
The code that checks for stop events is not needed as it is only used for log messages. These aren't really useful to the end user so they can be removed. Moved the modification of the event size in case binlog checksums are enabled outside of the handle_one_event function. This will make it possible to move most of the processing done inside it into a reusable class of its own. Also fixed the memory leak for the event data.
This commit is contained in:
@ -249,7 +249,7 @@ bool avro_load_conversion_state(Avro *router)
|
||||
* @return AVRO_OK if the next file exists, AVRO_LAST_FILE if this is the last
|
||||
* available file.
|
||||
*/
|
||||
static avro_binlog_end_t rotate_to_next_file_if_exists(Avro* router, uint64_t pos, bool stop_seen)
|
||||
static avro_binlog_end_t rotate_to_next_file_if_exists(Avro* router, uint64_t pos)
|
||||
{
|
||||
avro_binlog_end_t rval = AVRO_LAST_FILE;
|
||||
|
||||
@ -266,30 +266,13 @@ static avro_binlog_end_t rotate_to_next_file_if_exists(Avro* router, uint64_t po
|
||||
}
|
||||
else
|
||||
{
|
||||
if (stop_seen)
|
||||
{
|
||||
MXS_NOTICE("End of binlog file [%s] at %lu with a "
|
||||
"close event. Rotating to next binlog file [%s].",
|
||||
MXS_INFO("End of binlog file [%s] at %lu. Rotating to next binlog file [%s].",
|
||||
router->binlog_name.c_str(), pos, next_binlog);
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_NOTICE("End of binlog file [%s] at %lu with no "
|
||||
"close or rotate event. Rotating to next binlog file [%s].",
|
||||
router->binlog_name.c_str(), pos, next_binlog);
|
||||
}
|
||||
|
||||
rval = AVRO_OK;
|
||||
router->binlog_name = next_binlog;
|
||||
router->current_pos = 4;
|
||||
}
|
||||
}
|
||||
else if (stop_seen)
|
||||
{
|
||||
MXS_NOTICE("End of binlog file [%s] at %lu with a close event. "
|
||||
"Next binlog file does not exist, pausing file conversion.",
|
||||
router->binlog_name.c_str(), pos);
|
||||
}
|
||||
|
||||
return rval;
|
||||
}
|
||||
@ -455,7 +438,7 @@ bool read_header(Avro* router, unsigned long long pos, REP_HEADER* hdr, avro_bin
|
||||
return rval;
|
||||
}
|
||||
|
||||
static bool pos_is_ok(Avro* router, const REP_HEADER& hdr, uint64_t pos, uint64_t original_size)
|
||||
static bool pos_is_ok(Avro* router, const REP_HEADER& hdr, uint64_t pos)
|
||||
{
|
||||
bool rval = false;
|
||||
|
||||
@ -464,7 +447,7 @@ static bool pos_is_ok(Avro* router, const REP_HEADER& hdr, uint64_t pos, uint64_
|
||||
MXS_INFO("Binlog %s: next pos %u < pos %lu, truncating to %lu",
|
||||
router->binlog_name.c_str(), hdr.next_pos, pos, pos);
|
||||
}
|
||||
else if (hdr.next_pos > 0 && hdr.next_pos != (pos + original_size))
|
||||
else if (hdr.next_pos > 0 && hdr.next_pos != (pos + hdr.event_size))
|
||||
{
|
||||
MXS_INFO("Binlog %s: next pos %u != (pos %lu + event_size %u), truncating to %lu",
|
||||
router->binlog_name.c_str(), hdr.next_pos, pos, hdr.event_size, pos);
|
||||
@ -482,13 +465,8 @@ static bool pos_is_ok(Avro* router, const REP_HEADER& hdr, uint64_t pos, uint64_
|
||||
return rval;
|
||||
}
|
||||
|
||||
void handle_one_event(Avro* router, uint8_t* ptr, REP_HEADER& hdr, uint64_t& pos)
|
||||
void handle_one_event(Avro* router, uint8_t* ptr, REP_HEADER& hdr)
|
||||
{
|
||||
if (router->binlog_checksum)
|
||||
{
|
||||
hdr.event_size -= 4;
|
||||
}
|
||||
|
||||
// The following events are related to the actual data
|
||||
if (hdr.event_type == FORMAT_DESCRIPTION_EVENT)
|
||||
{
|
||||
@ -553,10 +531,8 @@ void handle_one_event(Avro* router, uint8_t* ptr, REP_HEADER& hdr, uint64_t& pos
|
||||
avro_binlog_end_t avro_read_all_events(Avro *router)
|
||||
{
|
||||
uint64_t pos = router->current_pos;
|
||||
char next_binlog[BINLOG_FNAMELEN + 1];
|
||||
bool found_chksum = false;
|
||||
std::string next_binlog;
|
||||
bool rotate_seen = false;
|
||||
bool stop_seen = false;
|
||||
|
||||
ss_dassert(router->binlog_fd != -1);
|
||||
|
||||
@ -573,11 +549,11 @@ avro_binlog_end_t avro_read_all_events(Avro *router)
|
||||
|
||||
if (rotate_seen)
|
||||
{
|
||||
rotate_to_file(router, pos, next_binlog);
|
||||
rotate_to_file(router, pos, next_binlog.c_str());
|
||||
}
|
||||
else
|
||||
{
|
||||
rc = rotate_to_next_file_if_exists(router, pos, stop_seen);
|
||||
rc = rotate_to_next_file_if_exists(router, pos);
|
||||
}
|
||||
}
|
||||
return rc;
|
||||
@ -591,44 +567,20 @@ avro_binlog_end_t avro_read_all_events(Avro *router)
|
||||
return AVRO_BINLOG_ERROR;
|
||||
}
|
||||
|
||||
uint64_t original_size = hdr.event_size;
|
||||
|
||||
/* get event content */
|
||||
uint8_t* ptr = GWBUF_DATA(result);
|
||||
|
||||
// These events are only related to binary log files
|
||||
if (hdr.event_type == STOP_EVENT)
|
||||
if (hdr.event_type == ROTATE_EVENT)
|
||||
{
|
||||
char next_file[BLRM_BINLOG_NAME_STR_LEN + 1];
|
||||
stop_seen = true;
|
||||
snprintf(next_file, sizeof(next_file), BINLOG_NAMEFMT, router->filestem.c_str(),
|
||||
blr_file_get_next_binlogname(router->binlog_name.c_str()));
|
||||
}
|
||||
else if (hdr.event_type == ROTATE_EVENT)
|
||||
{
|
||||
int len = hdr.event_size - BINLOG_EVENT_HDR_LEN - 8;
|
||||
|
||||
if (found_chksum || router->binlog_checksum)
|
||||
{
|
||||
len -= 4;
|
||||
}
|
||||
|
||||
if (len > BINLOG_FNAMELEN)
|
||||
{
|
||||
MXS_WARNING("Truncated binlog name from %d to %d characters.",
|
||||
len, BINLOG_FNAMELEN);
|
||||
len = BINLOG_FNAMELEN;
|
||||
}
|
||||
|
||||
memcpy(next_binlog, ptr + 8, len);
|
||||
next_binlog[len] = 0;
|
||||
int len = hdr.event_size - BINLOG_EVENT_HDR_LEN - 8 - (router->binlog_checksum ? 4 : 0);
|
||||
next_binlog.assign((char*)ptr + 8, len);
|
||||
rotate_seen = true;
|
||||
|
||||
}
|
||||
else if (hdr.event_type == MARIADB_ANNOTATE_ROWS_EVENT)
|
||||
{
|
||||
// This appears to need special handling
|
||||
int annotate_len = hdr.event_size - BINLOG_EVENT_HDR_LEN - (found_chksum || router->binlog_checksum ? 4 : 0);
|
||||
int annotate_len = hdr.event_size - BINLOG_EVENT_HDR_LEN - (router->binlog_checksum ? 4 : 0);
|
||||
MXS_INFO("Annotate_rows_event: %.*s", annotate_len, ptr);
|
||||
pos += hdr.event_size;
|
||||
router->current_pos = pos;
|
||||
@ -637,10 +589,22 @@ avro_binlog_end_t avro_read_all_events(Avro *router)
|
||||
}
|
||||
else
|
||||
{
|
||||
handle_one_event(router, ptr, hdr, pos);
|
||||
uint32_t orig_size = hdr.event_size;
|
||||
|
||||
if (router->binlog_checksum)
|
||||
{
|
||||
// We don't care about the checksum at this point so we ignore it
|
||||
hdr.event_size -= 4;
|
||||
}
|
||||
|
||||
if (pos_is_ok(router, hdr, pos, original_size))
|
||||
handle_one_event(router, ptr, hdr);
|
||||
|
||||
hdr.event_size = orig_size;
|
||||
}
|
||||
|
||||
gwbuf_free(result);
|
||||
|
||||
if (pos_is_ok(router, hdr, pos))
|
||||
{
|
||||
pos = hdr.next_pos;
|
||||
router->current_pos = pos;
|
||||
|
Reference in New Issue
Block a user