From 5268d032c55d128205d97922a6042f13b4897a84 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Thu, 7 Jun 2018 15:42:26 +0300 Subject: [PATCH] Process file and data events separately The various file operation related binlog events are now processed on the upper level. This makes the actual data event processing simpler and easier to comprehend. --- .../modules/routing/avrorouter/CMakeLists.txt | 13 +- server/modules/routing/avrorouter/avro.cc | 3 - .../modules/routing/avrorouter/avro_file.cc | 174 ++++++++++-------- .../modules/routing/avrorouter/avrorouter.hh | 6 +- 4 files changed, 109 insertions(+), 87 deletions(-) diff --git a/server/modules/routing/avrorouter/CMakeLists.txt b/server/modules/routing/avrorouter/CMakeLists.txt index 0b726173e..ae8dd991f 100644 --- a/server/modules/routing/avrorouter/CMakeLists.txt +++ b/server/modules/routing/avrorouter/CMakeLists.txt @@ -1,10 +1,19 @@ if(AVRO_FOUND AND JANSSON_FOUND) include_directories(${AVRO_INCLUDE_DIR}) include_directories(${JANSSON_INCLUDE_DIR}) - add_library(avrorouter SHARED avro.cc ../binlogrouter/binlog_common.cc avro_client.cc avro_schema.cc avro_rbr.cc avro_file.cc avro_index.cc avro_main.cc) + + # The common avrorouter functionality + add_library(avro-common SHARED avro.cc ../binlogrouter/binlog_common.cc avro_client.cc avro_schema.cc avro_rbr.cc avro_file.cc avro_index.cc) + set_target_properties(avro-common PROPERTIES VERSION "1.0.0") + set_target_properties(avro-common PROPERTIES LINK_FLAGS -Wl,-z,defs) + target_link_libraries(avro-common maxscale-common ${JANSSON_LIBRARIES} ${AVRO_LIBRARIES} maxavro lzma) + install_module(avro-common core) + + # The actual avrorouter implementation + add_library(avrorouter SHARED avro_main.cc) set_target_properties(avrorouter PROPERTIES VERSION "1.0.0") set_target_properties(avrorouter PROPERTIES LINK_FLAGS -Wl,-z,defs) - target_link_libraries(avrorouter maxscale-common ${JANSSON_LIBRARIES} ${AVRO_LIBRARIES} maxavro lzma) + target_link_libraries(avrorouter avro-common) install_module(avrorouter core) if (BUILD_TESTS) diff --git a/server/modules/routing/avrorouter/avro.cc b/server/modules/routing/avrorouter/avro.cc index 58d651618..fb8f192a7 100644 --- a/server/modules/routing/avrorouter/avro.cc +++ b/server/modules/routing/avrorouter/avro.cc @@ -67,10 +67,8 @@ bool avro_load_conversion_state(Avro *router); void avro_load_metadata_from_schemas(Avro *router); int avro_client_callback(DCB *dcb, DCB_REASON reason, void *userdata); static bool ensure_dir_ok(std::string path, int mode); -bool avro_save_conversion_state(Avro *router); static void stats_func(void *); void avro_index_file(Avro *router, const char* path); -void avro_update_index(Avro* router); static SPINLOCK instlock; static Avro *instances; @@ -150,7 +148,6 @@ bool create_tables(sqlite3* handle) return true; } - /** * @brief Read router options from an external binlogrouter service * diff --git a/server/modules/routing/avrorouter/avro_file.cc b/server/modules/routing/avrorouter/avro_file.cc index 573bfe6f7..d02f6b439 100644 --- a/server/modules/routing/avrorouter/avro_file.cc +++ b/server/modules/routing/avrorouter/avro_file.cc @@ -42,7 +42,6 @@ static const char *ddl_list_name = "table-ddl.list"; void handle_query_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr); bool is_create_table_statement(Avro *router, char* ptr, size_t len); void avro_notify_client(AvroSession *client); -void avro_update_index(Avro* router); void update_used_tables(Avro* router); TableCreateEvent* table_create_from_schema(const char* file, const char* db, const char* table, int version); @@ -446,17 +445,29 @@ void notify_all_clients(Avro *router) dcb_foreach(notify_cb, router->service); } -void do_checkpoint(Avro *router, uint64_t *total_rows, uint64_t *total_commits) +void do_checkpoint(Avro *router) { update_used_tables(router); avro_flush_all_tables(router, AVROROUTER_FLUSH); avro_save_conversion_state(router); notify_all_clients(router); - *total_rows += router->row_count; - *total_commits += router->trx_count; router->row_count = router->trx_count = 0; } +REP_HEADER construct_header(uint8_t* ptr) +{ + REP_HEADER hdr; + + hdr.timestamp = EXTRACT32(ptr); + hdr.event_type = ptr[4]; + hdr.serverid = EXTRACT32(&ptr[5]); + hdr.event_size = extract_field(&ptr[9], 32); + hdr.next_pos = EXTRACT32(&ptr[13]); + hdr.flags = EXTRACT16(&ptr[17]); + + return hdr; +} + bool read_header(Avro* router, unsigned long long pos, REP_HEADER* hdr, avro_binlog_end_t* rc) { uint8_t hdbuf[BINLOG_EVENT_HDR_LEN]; @@ -489,16 +500,10 @@ bool read_header(Avro* router, unsigned long long pos, REP_HEADER* hdr, avro_bin return false; } - /* fill replication header struct */ - hdr->timestamp = EXTRACT32(hdbuf); - hdr->event_type = hdbuf[4]; - hdr->serverid = EXTRACT32(&hdbuf[5]); - hdr->event_size = extract_field(&hdbuf[9], 32); - hdr->next_pos = EXTRACT32(&hdbuf[13]); - hdr->flags = EXTRACT16(&hdbuf[17]); - bool rval = true; + *hdr = construct_header(hdbuf); + if (hdr->event_type > MAX_EVENT_TYPE_MARIADB10) { MXS_ERROR("Invalid MariaDB 10 event type 0x%x. Binlog file is %s, position %llu", @@ -527,14 +532,12 @@ static bool pos_is_ok(Avro* router, const REP_HEADER& hdr, uint64_t pos, uint64_ MXS_INFO("Binlog %s: next pos %u < pos %lu, truncating to %lu", router->binlog_name.c_str(), hdr.next_pos, pos, pos); } - - if (hdr.next_pos > 0 && hdr.next_pos != (pos + original_size)) + else if (hdr.next_pos > 0 && hdr.next_pos != (pos + original_size)) { MXS_INFO("Binlog %s: next pos %u != (pos %lu + event_size %u), truncating to %lu", router->binlog_name.c_str(), hdr.next_pos, pos, hdr.event_size, pos); } - - if (hdr.next_pos > 0) + else if (hdr.next_pos > 0) { rval = true; } @@ -547,6 +550,67 @@ static bool pos_is_ok(Avro* router, const REP_HEADER& hdr, uint64_t pos, uint64_ return rval; } +void handle_one_event(Avro* router, uint8_t* ptr, REP_HEADER& hdr, uint64_t& pos) +{ + if (router->binlog_checksum) + { + hdr.event_size -= 4; + } + + // The following events are related to the actual data + if (hdr.event_type == FORMAT_DESCRIPTION_EVENT) + { + const int BLRM_FDE_EVENT_TYPES_OFFSET = 2 + 50 + 4 + 1; + const int FDE_EXTRA_BYTES = 5; + int event_header_length = ptr[BLRM_FDE_EVENT_TYPES_OFFSET - 1]; + int n_events = hdr.event_size - event_header_length - BLRM_FDE_EVENT_TYPES_OFFSET - FDE_EXTRA_BYTES; + uint8_t* checksum = ptr + hdr.event_size - event_header_length - FDE_EXTRA_BYTES; + + // Precaution to prevent writing too much in case new events are added + int real_len = MXS_MIN(n_events, (int)sizeof(router->event_type_hdr_lens)); + memcpy(router->event_type_hdr_lens, ptr + BLRM_FDE_EVENT_TYPES_OFFSET, real_len); + + router->event_types = n_events; + router->binlog_checksum = checksum[0]; + } + else if (hdr.event_type == TABLE_MAP_EVENT) + { + handle_table_map_event(router, &hdr, ptr); + } + else if ((hdr.event_type >= WRITE_ROWS_EVENTv0 && hdr.event_type <= DELETE_ROWS_EVENTv1) || + (hdr.event_type >= WRITE_ROWS_EVENTv2 && hdr.event_type <= DELETE_ROWS_EVENTv2)) + { + router->row_count++; + handle_row_event(router, &hdr, ptr); + } + else if (hdr.event_type == MARIADB10_GTID_EVENT) + { + uint64_t n_sequence; /* 8 bytes */ + uint32_t domainid; /* 4 bytes */ + n_sequence = extract_field(ptr, 64); + domainid = extract_field(ptr + 8, 32); + router->gtid.domain = domainid; + router->gtid.server_id = hdr.serverid; + router->gtid.seq = n_sequence; + router->gtid.event_num = 0; + router->gtid.timestamp = hdr.timestamp; + } + else if (hdr.event_type == QUERY_EVENT) + { + handle_query_event(router, &hdr, ptr); + } + else if (hdr.event_type == XID_EVENT) + { + router->trx_count++; + + if (router->row_count >= router->row_target || + router->trx_count >= router->trx_target) + { + do_checkpoint(router); + } + } +} + /** * @brief Read all replication events from a binlog file. * @@ -562,7 +626,6 @@ avro_binlog_end_t avro_read_all_events(Avro *router) { uint64_t pos = router->current_pos; char next_binlog[BINLOG_FNAMELEN + 1]; - uint64_t total_commits = 0, total_rows = 0; bool found_chksum = false; bool rotate_seen = false; bool stop_seen = false; @@ -578,7 +641,7 @@ avro_binlog_end_t avro_read_all_events(Avro *router) { if (rc == AVRO_OK) { - do_checkpoint(router, &total_rows, &total_commits); + do_checkpoint(router); if (rotate_seen) { @@ -600,16 +663,11 @@ avro_binlog_end_t avro_read_all_events(Avro *router) return AVRO_BINLOG_ERROR; } + uint64_t original_size = hdr.event_size; + /* get event content */ uint8_t* ptr = GWBUF_DATA(result); - uint32_t original_size = hdr.event_size; - - if (router->binlog_checksum) - { - hdr.event_size -= 4; - } - // These events are only related to binary log files if (hdr.event_type == STOP_EVENT) { @@ -639,69 +697,19 @@ avro_binlog_end_t avro_read_all_events(Avro *router) rotate_seen = true; } - // The following events are related to the actual data - else if (hdr.event_type == FORMAT_DESCRIPTION_EVENT) - { - const int BLRM_FDE_EVENT_TYPES_OFFSET = 2 + 50 + 4 + 1; - const int FDE_EXTRA_BYTES = 5; - int event_header_length = ptr[BLRM_FDE_EVENT_TYPES_OFFSET - 1]; - int n_events = hdr.event_size - event_header_length - BLRM_FDE_EVENT_TYPES_OFFSET - FDE_EXTRA_BYTES; - uint8_t* checksum = ptr + hdr.event_size - event_header_length - FDE_EXTRA_BYTES; - - // Precaution to prevent writing too much in case new events are added - int real_len = MXS_MIN(n_events, (int)sizeof(router->event_type_hdr_lens)); - memcpy(router->event_type_hdr_lens, ptr + BLRM_FDE_EVENT_TYPES_OFFSET, real_len); - - router->event_types = n_events; - router->binlog_checksum = checksum[0]; - } else if (hdr.event_type == MARIADB_ANNOTATE_ROWS_EVENT) { // This appears to need special handling MXS_INFO("Annotate_rows_event: %.*s", hdr.event_size - BINLOG_EVENT_HDR_LEN, ptr); - pos += original_size; + pos += hdr.event_size; router->current_pos = pos; gwbuf_free(result); continue; } - else if (hdr.event_type == TABLE_MAP_EVENT) + else { - handle_table_map_event(router, &hdr, ptr); + handle_one_event(router, ptr, hdr, pos); } - else if ((hdr.event_type >= WRITE_ROWS_EVENTv0 && hdr.event_type <= DELETE_ROWS_EVENTv1) || - (hdr.event_type >= WRITE_ROWS_EVENTv2 && hdr.event_type <= DELETE_ROWS_EVENTv2)) - { - router->row_count++; - handle_row_event(router, &hdr, ptr); - } - else if (hdr.event_type == MARIADB10_GTID_EVENT) - { - uint64_t n_sequence; /* 8 bytes */ - uint32_t domainid; /* 4 bytes */ - n_sequence = extract_field(ptr, 64); - domainid = extract_field(ptr + 8, 32); - router->gtid.domain = domainid; - router->gtid.server_id = hdr.serverid; - router->gtid.seq = n_sequence; - router->gtid.event_num = 0; - router->gtid.timestamp = hdr.timestamp; - } - else if (hdr.event_type == QUERY_EVENT) - { - handle_query_event(router, &hdr, ptr); - } - else if (hdr.event_type == XID_EVENT) - { - router->trx_count++; - - if (router->row_count >= router->row_target || - router->trx_count >= router->trx_target) - { - do_checkpoint(router, &total_rows, &total_commits); - } - } - - gwbuf_free(result); if (pos_is_ok(router, hdr, pos, original_size)) { @@ -766,7 +774,7 @@ void avro_load_metadata_from_schemas(Avro *router) if (it == router->created_tables.end() || version > it->second->version) { STableCreateEvent created(table_create_from_schema(files.gl_pathv[i], - db, table, version)); + db, table, version)); router->created_tables[table_ident] = created; } } @@ -966,7 +974,7 @@ static void strip_executable_comments(char *sql, int* len) void handle_query_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr) { int dblen = ptr[DBNM_OFF]; - int vblklen = ptr[VBLK_OFF]; + int vblklen = gw_mysql_get_byte2(ptr + VBLK_OFF); int len = hdr->event_size - BINLOG_EVENT_HDR_LEN - (PHDR_OFF + vblklen + 1 + dblen); char *sql = (char *) ptr + PHDR_OFF + vblklen + 1 + dblen; char db[dblen + 1]; @@ -983,6 +991,12 @@ void handle_query_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr) strip_executable_comments(sql, &len); sql[len] = '\0'; + if (*sql == '\0') + { + MXS_FREE(tmp); + return; + } + static bool warn_not_row_format = true; if (warn_not_row_format) diff --git a/server/modules/routing/avrorouter/avrorouter.hh b/server/modules/routing/avrorouter/avrorouter.hh index 4cfd39d4b..e8bea0bb4 100644 --- a/server/modules/routing/avrorouter/avrorouter.hh +++ b/server/modules/routing/avrorouter/avrorouter.hh @@ -34,6 +34,7 @@ #include #include #include + #include "rpl_events.hh" MXS_BEGIN_DECLS @@ -290,10 +291,11 @@ extern avro_binlog_end_t avro_read_all_events(Avro *router); extern AvroTable* avro_table_alloc(const char* filepath, const char* json_schema, const char *codec, size_t block_size); extern char* json_new_schema_from_table(const STableMapEvent& map, const STableCreateEvent& create); -extern void save_avro_schema(const char *path, const char* schema, STableMapEvent& map, - STableCreateEvent& create); +extern void save_avro_schema(const char *path, const char* schema, STableMapEvent& map, STableCreateEvent& create); extern bool handle_table_map_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr); extern bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr); +void handle_one_event(Avro* router, uint8_t* ptr, REP_HEADER& hdr, uint64_t& pos); +REP_HEADER construct_header(uint8_t* ptr); bool avro_save_conversion_state(Avro *router); void avro_update_index(Avro* router);