Process file and data events separately

The various file operation related binlog events are now processed on the
upper level. This makes the actual data event processing simpler and
easier to comprehend.
This commit is contained in:
Markus Mäkelä 2018-06-07 15:42:26 +03:00
parent 9ec6293e3f
commit 5268d032c5
No known key found for this signature in database
GPG Key ID: 72D48FCE664F7B19
4 changed files with 109 additions and 87 deletions

View File

@ -1,10 +1,19 @@
if(AVRO_FOUND AND JANSSON_FOUND)
include_directories(${AVRO_INCLUDE_DIR})
include_directories(${JANSSON_INCLUDE_DIR})
add_library(avrorouter SHARED avro.cc ../binlogrouter/binlog_common.cc avro_client.cc avro_schema.cc avro_rbr.cc avro_file.cc avro_index.cc avro_main.cc)
# The common avrorouter functionality
add_library(avro-common SHARED avro.cc ../binlogrouter/binlog_common.cc avro_client.cc avro_schema.cc avro_rbr.cc avro_file.cc avro_index.cc)
set_target_properties(avro-common PROPERTIES VERSION "1.0.0")
set_target_properties(avro-common PROPERTIES LINK_FLAGS -Wl,-z,defs)
target_link_libraries(avro-common maxscale-common ${JANSSON_LIBRARIES} ${AVRO_LIBRARIES} maxavro lzma)
install_module(avro-common core)
# The actual avrorouter implementation
add_library(avrorouter SHARED avro_main.cc)
set_target_properties(avrorouter PROPERTIES VERSION "1.0.0")
set_target_properties(avrorouter PROPERTIES LINK_FLAGS -Wl,-z,defs)
target_link_libraries(avrorouter maxscale-common ${JANSSON_LIBRARIES} ${AVRO_LIBRARIES} maxavro lzma)
target_link_libraries(avrorouter avro-common)
install_module(avrorouter core)
if (BUILD_TESTS)

View File

@ -67,10 +67,8 @@ bool avro_load_conversion_state(Avro *router);
void avro_load_metadata_from_schemas(Avro *router);
int avro_client_callback(DCB *dcb, DCB_REASON reason, void *userdata);
static bool ensure_dir_ok(std::string path, int mode);
bool avro_save_conversion_state(Avro *router);
static void stats_func(void *);
void avro_index_file(Avro *router, const char* path);
void avro_update_index(Avro* router);
static SPINLOCK instlock;
static Avro *instances;
@ -150,7 +148,6 @@ bool create_tables(sqlite3* handle)
return true;
}
/**
* @brief Read router options from an external binlogrouter service
*

View File

@ -42,7 +42,6 @@ static const char *ddl_list_name = "table-ddl.list";
void handle_query_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr);
bool is_create_table_statement(Avro *router, char* ptr, size_t len);
void avro_notify_client(AvroSession *client);
void avro_update_index(Avro* router);
void update_used_tables(Avro* router);
TableCreateEvent* table_create_from_schema(const char* file, const char* db,
const char* table, int version);
@ -446,17 +445,29 @@ void notify_all_clients(Avro *router)
dcb_foreach(notify_cb, router->service);
}
void do_checkpoint(Avro *router, uint64_t *total_rows, uint64_t *total_commits)
void do_checkpoint(Avro *router)
{
update_used_tables(router);
avro_flush_all_tables(router, AVROROUTER_FLUSH);
avro_save_conversion_state(router);
notify_all_clients(router);
*total_rows += router->row_count;
*total_commits += router->trx_count;
router->row_count = router->trx_count = 0;
}
REP_HEADER construct_header(uint8_t* ptr)
{
REP_HEADER hdr;
hdr.timestamp = EXTRACT32(ptr);
hdr.event_type = ptr[4];
hdr.serverid = EXTRACT32(&ptr[5]);
hdr.event_size = extract_field(&ptr[9], 32);
hdr.next_pos = EXTRACT32(&ptr[13]);
hdr.flags = EXTRACT16(&ptr[17]);
return hdr;
}
bool read_header(Avro* router, unsigned long long pos, REP_HEADER* hdr, avro_binlog_end_t* rc)
{
uint8_t hdbuf[BINLOG_EVENT_HDR_LEN];
@ -489,16 +500,10 @@ bool read_header(Avro* router, unsigned long long pos, REP_HEADER* hdr, avro_bin
return false;
}
/* fill replication header struct */
hdr->timestamp = EXTRACT32(hdbuf);
hdr->event_type = hdbuf[4];
hdr->serverid = EXTRACT32(&hdbuf[5]);
hdr->event_size = extract_field(&hdbuf[9], 32);
hdr->next_pos = EXTRACT32(&hdbuf[13]);
hdr->flags = EXTRACT16(&hdbuf[17]);
bool rval = true;
*hdr = construct_header(hdbuf);
if (hdr->event_type > MAX_EVENT_TYPE_MARIADB10)
{
MXS_ERROR("Invalid MariaDB 10 event type 0x%x. Binlog file is %s, position %llu",
@ -527,14 +532,12 @@ static bool pos_is_ok(Avro* router, const REP_HEADER& hdr, uint64_t pos, uint64_
MXS_INFO("Binlog %s: next pos %u < pos %lu, truncating to %lu",
router->binlog_name.c_str(), hdr.next_pos, pos, pos);
}
if (hdr.next_pos > 0 && hdr.next_pos != (pos + original_size))
else if (hdr.next_pos > 0 && hdr.next_pos != (pos + original_size))
{
MXS_INFO("Binlog %s: next pos %u != (pos %lu + event_size %u), truncating to %lu",
router->binlog_name.c_str(), hdr.next_pos, pos, hdr.event_size, pos);
}
if (hdr.next_pos > 0)
else if (hdr.next_pos > 0)
{
rval = true;
}
@ -547,6 +550,67 @@ static bool pos_is_ok(Avro* router, const REP_HEADER& hdr, uint64_t pos, uint64_
return rval;
}
void handle_one_event(Avro* router, uint8_t* ptr, REP_HEADER& hdr, uint64_t& pos)
{
if (router->binlog_checksum)
{
hdr.event_size -= 4;
}
// The following events are related to the actual data
if (hdr.event_type == FORMAT_DESCRIPTION_EVENT)
{
const int BLRM_FDE_EVENT_TYPES_OFFSET = 2 + 50 + 4 + 1;
const int FDE_EXTRA_BYTES = 5;
int event_header_length = ptr[BLRM_FDE_EVENT_TYPES_OFFSET - 1];
int n_events = hdr.event_size - event_header_length - BLRM_FDE_EVENT_TYPES_OFFSET - FDE_EXTRA_BYTES;
uint8_t* checksum = ptr + hdr.event_size - event_header_length - FDE_EXTRA_BYTES;
// Precaution to prevent writing too much in case new events are added
int real_len = MXS_MIN(n_events, (int)sizeof(router->event_type_hdr_lens));
memcpy(router->event_type_hdr_lens, ptr + BLRM_FDE_EVENT_TYPES_OFFSET, real_len);
router->event_types = n_events;
router->binlog_checksum = checksum[0];
}
else if (hdr.event_type == TABLE_MAP_EVENT)
{
handle_table_map_event(router, &hdr, ptr);
}
else if ((hdr.event_type >= WRITE_ROWS_EVENTv0 && hdr.event_type <= DELETE_ROWS_EVENTv1) ||
(hdr.event_type >= WRITE_ROWS_EVENTv2 && hdr.event_type <= DELETE_ROWS_EVENTv2))
{
router->row_count++;
handle_row_event(router, &hdr, ptr);
}
else if (hdr.event_type == MARIADB10_GTID_EVENT)
{
uint64_t n_sequence; /* 8 bytes */
uint32_t domainid; /* 4 bytes */
n_sequence = extract_field(ptr, 64);
domainid = extract_field(ptr + 8, 32);
router->gtid.domain = domainid;
router->gtid.server_id = hdr.serverid;
router->gtid.seq = n_sequence;
router->gtid.event_num = 0;
router->gtid.timestamp = hdr.timestamp;
}
else if (hdr.event_type == QUERY_EVENT)
{
handle_query_event(router, &hdr, ptr);
}
else if (hdr.event_type == XID_EVENT)
{
router->trx_count++;
if (router->row_count >= router->row_target ||
router->trx_count >= router->trx_target)
{
do_checkpoint(router);
}
}
}
/**
* @brief Read all replication events from a binlog file.
*
@ -562,7 +626,6 @@ avro_binlog_end_t avro_read_all_events(Avro *router)
{
uint64_t pos = router->current_pos;
char next_binlog[BINLOG_FNAMELEN + 1];
uint64_t total_commits = 0, total_rows = 0;
bool found_chksum = false;
bool rotate_seen = false;
bool stop_seen = false;
@ -578,7 +641,7 @@ avro_binlog_end_t avro_read_all_events(Avro *router)
{
if (rc == AVRO_OK)
{
do_checkpoint(router, &total_rows, &total_commits);
do_checkpoint(router);
if (rotate_seen)
{
@ -600,16 +663,11 @@ avro_binlog_end_t avro_read_all_events(Avro *router)
return AVRO_BINLOG_ERROR;
}
uint64_t original_size = hdr.event_size;
/* get event content */
uint8_t* ptr = GWBUF_DATA(result);
uint32_t original_size = hdr.event_size;
if (router->binlog_checksum)
{
hdr.event_size -= 4;
}
// These events are only related to binary log files
if (hdr.event_type == STOP_EVENT)
{
@ -639,69 +697,19 @@ avro_binlog_end_t avro_read_all_events(Avro *router)
rotate_seen = true;
}
// The following events are related to the actual data
else if (hdr.event_type == FORMAT_DESCRIPTION_EVENT)
{
const int BLRM_FDE_EVENT_TYPES_OFFSET = 2 + 50 + 4 + 1;
const int FDE_EXTRA_BYTES = 5;
int event_header_length = ptr[BLRM_FDE_EVENT_TYPES_OFFSET - 1];
int n_events = hdr.event_size - event_header_length - BLRM_FDE_EVENT_TYPES_OFFSET - FDE_EXTRA_BYTES;
uint8_t* checksum = ptr + hdr.event_size - event_header_length - FDE_EXTRA_BYTES;
// Precaution to prevent writing too much in case new events are added
int real_len = MXS_MIN(n_events, (int)sizeof(router->event_type_hdr_lens));
memcpy(router->event_type_hdr_lens, ptr + BLRM_FDE_EVENT_TYPES_OFFSET, real_len);
router->event_types = n_events;
router->binlog_checksum = checksum[0];
}
else if (hdr.event_type == MARIADB_ANNOTATE_ROWS_EVENT)
{
// This appears to need special handling
MXS_INFO("Annotate_rows_event: %.*s", hdr.event_size - BINLOG_EVENT_HDR_LEN, ptr);
pos += original_size;
pos += hdr.event_size;
router->current_pos = pos;
gwbuf_free(result);
continue;
}
else if (hdr.event_type == TABLE_MAP_EVENT)
else
{
handle_table_map_event(router, &hdr, ptr);
handle_one_event(router, ptr, hdr, pos);
}
else if ((hdr.event_type >= WRITE_ROWS_EVENTv0 && hdr.event_type <= DELETE_ROWS_EVENTv1) ||
(hdr.event_type >= WRITE_ROWS_EVENTv2 && hdr.event_type <= DELETE_ROWS_EVENTv2))
{
router->row_count++;
handle_row_event(router, &hdr, ptr);
}
else if (hdr.event_type == MARIADB10_GTID_EVENT)
{
uint64_t n_sequence; /* 8 bytes */
uint32_t domainid; /* 4 bytes */
n_sequence = extract_field(ptr, 64);
domainid = extract_field(ptr + 8, 32);
router->gtid.domain = domainid;
router->gtid.server_id = hdr.serverid;
router->gtid.seq = n_sequence;
router->gtid.event_num = 0;
router->gtid.timestamp = hdr.timestamp;
}
else if (hdr.event_type == QUERY_EVENT)
{
handle_query_event(router, &hdr, ptr);
}
else if (hdr.event_type == XID_EVENT)
{
router->trx_count++;
if (router->row_count >= router->row_target ||
router->trx_count >= router->trx_target)
{
do_checkpoint(router, &total_rows, &total_commits);
}
}
gwbuf_free(result);
if (pos_is_ok(router, hdr, pos, original_size))
{
@ -766,7 +774,7 @@ void avro_load_metadata_from_schemas(Avro *router)
if (it == router->created_tables.end() || version > it->second->version)
{
STableCreateEvent created(table_create_from_schema(files.gl_pathv[i],
db, table, version));
db, table, version));
router->created_tables[table_ident] = created;
}
}
@ -966,7 +974,7 @@ static void strip_executable_comments(char *sql, int* len)
void handle_query_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
{
int dblen = ptr[DBNM_OFF];
int vblklen = ptr[VBLK_OFF];
int vblklen = gw_mysql_get_byte2(ptr + VBLK_OFF);
int len = hdr->event_size - BINLOG_EVENT_HDR_LEN - (PHDR_OFF + vblklen + 1 + dblen);
char *sql = (char *) ptr + PHDR_OFF + vblklen + 1 + dblen;
char db[dblen + 1];
@ -983,6 +991,12 @@ void handle_query_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
strip_executable_comments(sql, &len);
sql[len] = '\0';
if (*sql == '\0')
{
MXS_FREE(tmp);
return;
}
static bool warn_not_row_format = true;
if (warn_not_row_format)

View File

@ -34,6 +34,7 @@
#include <maxscale/sqlite3.h>
#include <maxscale/protocol/mysql.h>
#include <blr_constants.h>
#include "rpl_events.hh"
MXS_BEGIN_DECLS
@ -290,10 +291,11 @@ extern avro_binlog_end_t avro_read_all_events(Avro *router);
extern AvroTable* avro_table_alloc(const char* filepath, const char* json_schema,
const char *codec, size_t block_size);
extern char* json_new_schema_from_table(const STableMapEvent& map, const STableCreateEvent& create);
extern void save_avro_schema(const char *path, const char* schema, STableMapEvent& map,
STableCreateEvent& create);
extern void save_avro_schema(const char *path, const char* schema, STableMapEvent& map, STableCreateEvent& create);
extern bool handle_table_map_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr);
extern bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr);
void handle_one_event(Avro* router, uint8_t* ptr, REP_HEADER& hdr, uint64_t& pos);
REP_HEADER construct_header(uint8_t* ptr);
bool avro_save_conversion_state(Avro *router);
void avro_update_index(Avro* router);