Reorganize avrorouter event processing

Reorganized and cleaned up the binlog event processing code. Moved some of
the sanity checks into subfunctions and placed file related checks into a
separate section.
This commit is contained in:
Markus Mäkelä 2018-06-07 09:40:12 +03:00
parent 8fab725413
commit 1786fb59bb
No known key found for this signature in database
GPG Key ID: 72D48FCE664F7B19

View File

@ -518,6 +518,35 @@ bool read_header(Avro* router, unsigned long long pos, REP_HEADER* hdr, avro_bin
return rval;
}
static bool pos_is_ok(Avro* router, const REP_HEADER& hdr, uint64_t pos, uint64_t original_size)
{
bool rval = false;
if (hdr.next_pos > 0 && hdr.next_pos < pos)
{
MXS_INFO("Binlog %s: next pos %u < pos %lu, truncating to %lu",
router->binlog_name.c_str(), hdr.next_pos, pos, pos);
}
if (hdr.next_pos > 0 && hdr.next_pos != (pos + original_size))
{
MXS_INFO("Binlog %s: next pos %u != (pos %lu + event_size %u), truncating to %lu",
router->binlog_name.c_str(), hdr.next_pos, pos, hdr.event_size, pos);
}
if (hdr.next_pos > 0)
{
rval = true;
}
else
{
MXS_ERROR("Current event type %d @ %lu has nex pos = %u : exiting",
hdr.event_type, pos, hdr.next_pos);
}
return rval;
}
/**
* @brief Read all replication events from a binlog file.
*
@ -531,7 +560,7 @@ bool read_header(Avro* router, unsigned long long pos, REP_HEADER* hdr, avro_bin
*/
avro_binlog_end_t avro_read_all_events(Avro *router)
{
unsigned long long pos = router->current_pos;
uint64_t pos = router->current_pos;
char next_binlog[BINLOG_FNAMELEN + 1];
uint64_t total_commits = 0, total_rows = 0;
bool found_chksum = false;
@ -581,48 +610,14 @@ avro_binlog_end_t avro_read_all_events(Avro *router)
hdr.event_size -= 4;
}
/* check for FORMAT DESCRIPTION EVENT */
if (hdr.event_type == FORMAT_DESCRIPTION_EVENT)
{
const int BLRM_FDE_EVENT_TYPES_OFFSET = 2 + 50 + 4 + 1;
const int FDE_EXTRA_BYTES = 5;
int event_header_length = ptr[BLRM_FDE_EVENT_TYPES_OFFSET - 1];
int n_events = hdr.event_size - event_header_length - BLRM_FDE_EVENT_TYPES_OFFSET - FDE_EXTRA_BYTES;
uint8_t* checksum = ptr + hdr.event_size - event_header_length - FDE_EXTRA_BYTES;
// Precaution to prevent writing too much in case new events are added
int real_len = MXS_MIN(n_events, (int)sizeof(router->event_type_hdr_lens));
memcpy(router->event_type_hdr_lens, ptr + BLRM_FDE_EVENT_TYPES_OFFSET, real_len);
router->event_types = n_events;
router->binlog_checksum = checksum[0];
}
/* Decode CLOSE/STOP Event */
else if (hdr.event_type == STOP_EVENT)
// These events are only related to binary log files
if (hdr.event_type == STOP_EVENT)
{
char next_file[BLRM_BINLOG_NAME_STR_LEN + 1];
stop_seen = true;
snprintf(next_file, sizeof(next_file), BINLOG_NAMEFMT, router->filestem.c_str(),
blr_file_get_next_binlogname(router->binlog_name.c_str()));
}
else if (hdr.event_type == MARIADB_ANNOTATE_ROWS_EVENT)
{
MXS_INFO("Annotate_rows_event: %.*s", hdr.event_size - BINLOG_EVENT_HDR_LEN, ptr);
pos += original_size;
router->current_pos = pos;
continue;
}
else if (hdr.event_type == TABLE_MAP_EVENT)
{
handle_table_map_event(router, &hdr, ptr);
}
else if ((hdr.event_type >= WRITE_ROWS_EVENTv0 && hdr.event_type <= DELETE_ROWS_EVENTv1) ||
(hdr.event_type >= WRITE_ROWS_EVENTv2 && hdr.event_type <= DELETE_ROWS_EVENTv2))
{
router->row_count++;
handle_row_event(router, &hdr, ptr);
}
/* Decode ROTATE EVENT */
else if (hdr.event_type == ROTATE_EVENT)
{
int len = hdr.event_size - BINLOG_EVENT_HDR_LEN - 8;
@ -644,6 +639,41 @@ avro_binlog_end_t avro_read_all_events(Avro *router)
rotate_seen = true;
}
// The following events are related to the actual data
else if (hdr.event_type == FORMAT_DESCRIPTION_EVENT)
{
const int BLRM_FDE_EVENT_TYPES_OFFSET = 2 + 50 + 4 + 1;
const int FDE_EXTRA_BYTES = 5;
int event_header_length = ptr[BLRM_FDE_EVENT_TYPES_OFFSET - 1];
int n_events = hdr.event_size - event_header_length - BLRM_FDE_EVENT_TYPES_OFFSET - FDE_EXTRA_BYTES;
uint8_t* checksum = ptr + hdr.event_size - event_header_length - FDE_EXTRA_BYTES;
// Precaution to prevent writing too much in case new events are added
int real_len = MXS_MIN(n_events, (int)sizeof(router->event_type_hdr_lens));
memcpy(router->event_type_hdr_lens, ptr + BLRM_FDE_EVENT_TYPES_OFFSET, real_len);
router->event_types = n_events;
router->binlog_checksum = checksum[0];
}
else if (hdr.event_type == MARIADB_ANNOTATE_ROWS_EVENT)
{
// This appears to need special handling
MXS_INFO("Annotate_rows_event: %.*s", hdr.event_size - BINLOG_EVENT_HDR_LEN, ptr);
pos += original_size;
router->current_pos = pos;
gwbuf_free(result);
continue;
}
else if (hdr.event_type == TABLE_MAP_EVENT)
{
handle_table_map_event(router, &hdr, ptr);
}
else if ((hdr.event_type >= WRITE_ROWS_EVENTv0 && hdr.event_type <= DELETE_ROWS_EVENTv1) ||
(hdr.event_type >= WRITE_ROWS_EVENTv2 && hdr.event_type <= DELETE_ROWS_EVENTv2))
{
router->row_count++;
handle_row_event(router, &hdr, ptr);
}
else if (hdr.event_type == MARIADB10_GTID_EVENT)
{
uint64_t n_sequence; /* 8 bytes */
@ -656,12 +686,6 @@ avro_binlog_end_t avro_read_all_events(Avro *router)
router->gtid.event_num = 0;
router->gtid.timestamp = hdr.timestamp;
}
/**
* Check QUERY_EVENT
*
* Check for BEGIN ( ONLY for mysql 5.6, mariadb 5.5 )
* Check for COMMIT (not transactional engines)
*/
else if (hdr.event_type == QUERY_EVENT)
{
handle_query_event(router, &hdr, ptr);
@ -679,31 +703,13 @@ avro_binlog_end_t avro_read_all_events(Avro *router)
gwbuf_free(result);
/* pos and next_pos sanity checks */
if (hdr.next_pos > 0 && hdr.next_pos < pos)
{
MXS_INFO("Binlog %s: next pos %u < pos %llu, truncating to %llu",
router->binlog_name.c_str(), hdr.next_pos, pos, pos);
break;
}
if (hdr.next_pos > 0 && hdr.next_pos != (pos + original_size))
{
MXS_INFO("Binlog %s: next pos %u != (pos %llu + event_size %u), truncating to %llu",
router->binlog_name.c_str(), hdr.next_pos, pos, hdr.event_size, pos);
break;
}
/* set pos to new value */
if (hdr.next_pos > 0)
if (pos_is_ok(router, hdr, pos, original_size))
{
pos = hdr.next_pos;
router->current_pos = pos;
}
else
{
MXS_ERROR("Current event type %d @ %llu has nex pos = %u : exiting",
hdr.event_type, pos, hdr.next_pos);
break;
}
}