diff --git a/server/modules/routing/avrorouter/avro.cc b/server/modules/routing/avrorouter/avro.cc index b6f12d8d1..d25a868d4 100644 --- a/server/modules/routing/avrorouter/avro.cc +++ b/server/modules/routing/avrorouter/avro.cc @@ -145,28 +145,13 @@ Avro::Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, SERVICE* source, SRow avrodir(config_get_string(params, "avrodir")), current_pos(4), binlog_fd(-1), - event_types(0), - event_type_hdr_lens{0}, - binlog_checksum(0), trx_count(0), trx_target(config_get_integer(params, "group_trx")), row_count(0), row_target(config_get_integer(params, "group_rows")), task_handle(0), - event_handler(handler) + handler(service, handler) { - /** For detection of CREATE/ALTER TABLE statements */ - static const char* create_table_regex = "(?i)create[a-z0-9[:space:]_]+table"; - static const char* alter_table_regex = "(?i)alter[[:space:]]+table"; - int pcreerr; - size_t erroff; - create_table_re = pcre2_compile((PCRE2_SPTR) create_table_regex, PCRE2_ZERO_TERMINATED, - 0, &pcreerr, &erroff, NULL); - ss_dassert(create_table_re); // This should never fail - alter_table_re = pcre2_compile((PCRE2_SPTR) alter_table_regex, PCRE2_ZERO_TERMINATED, - 0, &pcreerr, &erroff, NULL); - ss_dassert(alter_table_re); // This should never fail - if (source) { read_source_service_options(source); diff --git a/server/modules/routing/avrorouter/avro_converter.hh b/server/modules/routing/avrorouter/avro_converter.hh index ff43d622f..5c1b8a21f 100644 --- a/server/modules/routing/avrorouter/avro_converter.hh +++ b/server/modules/routing/avrorouter/avro_converter.hh @@ -12,8 +12,8 @@ * Public License. */ -#include "rpl_events.hh" #include "avrorouter.hh" +#include "rpl.hh" #include diff --git a/server/modules/routing/avrorouter/avro_file.cc b/server/modules/routing/avrorouter/avro_file.cc index 48618f5fc..3534737c0 100644 --- a/server/modules/routing/avrorouter/avro_file.cc +++ b/server/modules/routing/avrorouter/avro_file.cc @@ -39,7 +39,6 @@ static const char *statefile_section = "avro-conversion"; -void handle_query_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr); /** * Open a binlog file for reading @@ -111,10 +110,11 @@ bool avro_save_conversion_state(Avro *router) return false; } + gtid_pos_t gtid = router->handler.get_gtid(); fprintf(config_file, "[%s]\n", statefile_section); fprintf(config_file, "position=%lu\n", router->current_pos); - fprintf(config_file, "gtid=%lu-%lu-%lu:%lu\n", router->gtid.domain, - router->gtid.server_id, router->gtid.seq, router->gtid.event_num); + fprintf(config_file, "gtid=%lu-%lu-%lu:%lu\n", gtid.domain, + gtid.server_id, gtid.seq, gtid.event_num); fprintf(config_file, "file=%s\n", router->binlog_name.c_str()); fclose(config_file); @@ -159,10 +159,12 @@ static int conv_state_handler(void* data, const char* section, const char* key, if (domain && serv_id && seq && subseq) { - router->gtid.domain = strtol(domain, NULL, 10); - router->gtid.server_id = strtol(serv_id, NULL, 10); - router->gtid.seq = strtol(seq, NULL, 10); - router->gtid.event_num = strtol(subseq, NULL, 10); + gtid_pos_t gtid; + gtid.domain = strtol(domain, NULL, 10); + gtid.server_id = strtol(serv_id, NULL, 10); + gtid.seq = strtol(seq, NULL, 10); + gtid.event_num = strtol(subseq, NULL, 10); + router->handler.set_gtid(gtid); } } else if (strcmp(key, "position") == 0) @@ -217,10 +219,13 @@ bool avro_load_conversion_state(Avro *router) switch (rc) { case 0: - rval = true; - MXS_NOTICE("Loaded stored binary log conversion state: File: [%s] Position: [%ld] GTID: [%lu-%lu-%lu:%lu]", - router->binlog_name.c_str(), router->current_pos, router->gtid.domain, - router->gtid.server_id, router->gtid.seq, router->gtid.event_num); + { + rval = true; + gtid_pos_t gtid = router->handler.get_gtid(); + MXS_NOTICE("Loaded stored binary log conversion state: File: [%s] Position: [%ld] GTID: [%lu-%lu-%lu:%lu]", + router->binlog_name.c_str(), router->current_pos, gtid.domain, + gtid.server_id, gtid.seq, gtid.event_num); + } break; case -1: @@ -356,19 +361,34 @@ bool notify_cb(DCB* dcb, void* data) return true; } -void notify_all_clients(Avro *router) +void notify_all_clients(SERVICE* service) { - dcb_foreach(notify_cb, router->service); + dcb_foreach(notify_cb, service); } void do_checkpoint(Avro *router) { - router->event_handler->flush_tables(); + router->handler.flush(); avro_save_conversion_state(router); - notify_all_clients(router); + notify_all_clients(router->service); router->row_count = router->trx_count = 0; } +void Rpl::flush() +{ + m_handler->flush_tables(); +} + +void Rpl::add_create(STableCreateEvent create) +{ + auto it = m_created_tables.find(create->id()); + + if (it == m_created_tables.end() || create->version > it->second->version) + { + m_created_tables[create->id()] = create; + } +} + REP_HEADER construct_header(uint8_t* ptr) { REP_HEADER hdr; @@ -465,8 +485,14 @@ static bool pos_is_ok(Avro* router, const REP_HEADER& hdr, uint64_t pos) return rval; } -void handle_one_event(Avro* router, uint8_t* ptr, REP_HEADER& hdr) +void Rpl::handle_event(REP_HEADER hdr, uint8_t* ptr) { + if (m_binlog_checksum) + { + // We don't care about the checksum at this point so we ignore it + hdr.event_size -= 4; + } + // The following events are related to the actual data if (hdr.event_type == FORMAT_DESCRIPTION_EVENT) { @@ -475,45 +501,26 @@ void handle_one_event(Avro* router, uint8_t* ptr, REP_HEADER& hdr) int event_header_length = ptr[BLRM_FDE_EVENT_TYPES_OFFSET - 1]; int n_events = hdr.event_size - event_header_length - BLRM_FDE_EVENT_TYPES_OFFSET - FDE_EXTRA_BYTES; uint8_t* checksum = ptr + hdr.event_size - event_header_length - FDE_EXTRA_BYTES; - - // Precaution to prevent writing too much in case new events are added - int real_len = MXS_MIN(n_events, (int)sizeof(router->event_type_hdr_lens)); - memcpy(router->event_type_hdr_lens, ptr + BLRM_FDE_EVENT_TYPES_OFFSET, real_len); - - router->event_types = n_events; - router->binlog_checksum = checksum[0]; + m_event_type_hdr_lens.assign(ptr, ptr + n_events); + m_event_types = n_events; + m_binlog_checksum = checksum[0]; } else if (hdr.event_type == TABLE_MAP_EVENT) { - handle_table_map_event(router, &hdr, ptr); + handle_table_map_event(&hdr, ptr); } else if ((hdr.event_type >= WRITE_ROWS_EVENTv0 && hdr.event_type <= DELETE_ROWS_EVENTv1) || (hdr.event_type >= WRITE_ROWS_EVENTv2 && hdr.event_type <= DELETE_ROWS_EVENTv2)) { - router->row_count++; - handle_row_event(router, &hdr, ptr); + handle_row_event(&hdr, ptr); } else if (hdr.event_type == MARIADB10_GTID_EVENT) { - router->gtid.domain = extract_field(ptr + 8, 32); - router->gtid.server_id = hdr.serverid; - router->gtid.seq = extract_field(ptr, 64); - router->gtid.event_num = 0; - router->gtid.timestamp = hdr.timestamp; + m_gtid.extract(hdr, ptr); } else if (hdr.event_type == QUERY_EVENT) { - handle_query_event(router, &hdr, ptr); - } - else if (hdr.event_type == XID_EVENT) - { - router->trx_count++; - - if (router->row_count >= router->row_target || - router->trx_count >= router->trx_target) - { - do_checkpoint(router); - } + handle_query_event(&hdr, ptr); } } @@ -573,14 +580,14 @@ avro_binlog_end_t avro_read_all_events(Avro *router) // These events are only related to binary log files if (hdr.event_type == ROTATE_EVENT) { - int len = hdr.event_size - BINLOG_EVENT_HDR_LEN - 8 - (router->binlog_checksum ? 4 : 0); + int len = hdr.event_size - BINLOG_EVENT_HDR_LEN - 8 - (router->handler.have_checksums() ? 4 : 0); next_binlog.assign((char*)ptr + 8, len); rotate_seen = true; } else if (hdr.event_type == MARIADB_ANNOTATE_ROWS_EVENT) { // This appears to need special handling - int annotate_len = hdr.event_size - BINLOG_EVENT_HDR_LEN - (router->binlog_checksum ? 4 : 0); + int annotate_len = hdr.event_size - BINLOG_EVENT_HDR_LEN - (router->handler.have_checksums() ? 4 : 0); MXS_INFO("Annotate_rows_event: %.*s", annotate_len, ptr); pos += hdr.event_size; router->current_pos = pos; @@ -589,21 +596,27 @@ avro_binlog_end_t avro_read_all_events(Avro *router) } else { - uint32_t orig_size = hdr.event_size; - - if (router->binlog_checksum) + if ((hdr.event_type >= WRITE_ROWS_EVENTv0 && hdr.event_type <= DELETE_ROWS_EVENTv1) || + (hdr.event_type >= WRITE_ROWS_EVENTv2 && hdr.event_type <= DELETE_ROWS_EVENTv2)) { - // We don't care about the checksum at this point so we ignore it - hdr.event_size -= 4; + router->row_count++; + } + else if (hdr.event_type == XID_EVENT) + { + router->trx_count++; } - handle_one_event(router, ptr, hdr); - - hdr.event_size = orig_size; + router->handler.handle_event(hdr, ptr); } gwbuf_free(result); + if (router->row_count >= router->row_target || + router->trx_count >= router->trx_target) + { + do_checkpoint(router); + } + if (pos_is_ok(router, hdr, pos)) { pos = hdr.next_pos; @@ -662,14 +675,9 @@ void avro_load_metadata_from_schemas(Avro *router) if (versionend == suffix) { snprintf(table_ident, sizeof(table_ident), "%s.%s", db, table); - auto it = router->created_tables.find(table_ident); - - if (it == router->created_tables.end() || version > it->second->version) - { - STableCreateEvent created(table_create_from_schema(files.gl_pathv[i], - db, table, version)); - router->created_tables[table_ident] = created; - } + STableCreateEvent created(table_create_from_schema(files.gl_pathv[i], + db, table, version)); + router->handler.add_create(created); } else { @@ -688,14 +696,14 @@ void avro_load_metadata_from_schemas(Avro *router) * @param len Statement length * @return True if the statement creates a new table */ -bool is_create_table_statement(Avro *router, char* ptr, size_t len) +bool is_create_table_statement(pcre2_code* create_table_re, char* ptr, size_t len) { int rc = 0; - pcre2_match_data *mdata = pcre2_match_data_create_from_pattern(router->create_table_re, NULL); + pcre2_match_data *mdata = pcre2_match_data_create_from_pattern(create_table_re, NULL); if (mdata) { - rc = pcre2_match(router->create_table_re, (PCRE2_SPTR) ptr, len, 0, 0, mdata, NULL); + rc = pcre2_match(create_table_re, (PCRE2_SPTR) ptr, len, 0, 0, mdata, NULL); pcre2_match_data_free(mdata); } @@ -740,14 +748,14 @@ bool is_create_as_statement(const char* ptr, size_t len) * @param len Statement length * @return True if the statement alters a table */ -bool is_alter_table_statement(Avro *router, char* ptr, size_t len) +bool is_alter_table_statement(pcre2_code* alter_table_re, char* ptr, size_t len) { int rc = 0; - pcre2_match_data *mdata = pcre2_match_data_create_from_pattern(router->alter_table_re, NULL); + pcre2_match_data *mdata = pcre2_match_data_create_from_pattern(alter_table_re, NULL); if (mdata) { - rc = pcre2_match(router->alter_table_re, (PCRE2_SPTR) ptr, len, 0, 0, mdata, NULL); + rc = pcre2_match(alter_table_re, (PCRE2_SPTR) ptr, len, 0, 0, mdata, NULL); pcre2_match_data_free(mdata); } @@ -770,23 +778,23 @@ bool is_alter_table_statement(Avro *router, char* ptr, size_t len) * @param created Created table * @return False if an error occurred and true if successful */ -bool save_and_replace_table_create(Avro *router, TableCreateEvent *created) +bool Rpl::save_and_replace_table_create(STableCreateEvent created) { std::string table_ident = created->database + "." + created->table; - auto it = router->created_tables.find(table_ident); + auto it = m_created_tables.find(table_ident); - if (it != router->created_tables.end()) + if (it != m_created_tables.end()) { - auto tm_it = router->table_maps.find(table_ident); + auto tm_it = m_table_maps.find(table_ident); - if (tm_it != router->table_maps.end()) + if (tm_it != m_table_maps.end()) { - router->active_maps.erase(tm_it->second->id); - router->table_maps.erase(tm_it); + m_active_maps.erase(tm_it->second->id); + m_table_maps.erase(tm_it); } } - router->created_tables[table_ident] = STableCreateEvent(created); + m_created_tables[table_ident] = created; ss_dassert(created->columns.size() > 0); return true; } @@ -844,7 +852,7 @@ static void strip_executable_comments(char *sql, int* len) * @param pending_transaction Pointer where status of pending transaction is stored * @param ptr Pointer to the start of the event payload */ -void handle_query_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr) +void Rpl::handle_query_event(REP_HEADER *hdr, uint8_t *ptr) { int dblen = ptr[DBNM_OFF]; int vblklen = gw_mysql_get_byte2(ptr + VBLK_OFF); @@ -892,13 +900,13 @@ void handle_query_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr) char ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2]; read_table_identifier(db, sql, sql + len, ident, sizeof(ident)); - if (is_create_table_statement(router, sql, len)) + if (is_create_table_statement(m_create_table_re, sql, len)) { - TableCreateEvent *created = NULL; + STableCreateEvent created; if (is_create_like_statement(sql, len)) { - created = table_create_copy(router, sql, len, db); + created = table_create_copy(sql, len, db); } else if (is_create_as_statement(sql, len)) { @@ -914,16 +922,16 @@ void handle_query_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr) created = table_create_alloc(ident, sql, len); } - if (created && !save_and_replace_table_create(router, created)) + if (created && !save_and_replace_table_create(created)) { MXS_ERROR("Failed to save statement to disk: %.*s", len, sql); } } - else if (is_alter_table_statement(router, sql, len)) + else if (is_alter_table_statement(m_alter_table_re, sql, len)) { - auto it = router->created_tables.find(ident); + auto it = m_created_tables.find(ident); - if (it != router->created_tables.end()) + if (it != m_created_tables.end()) { table_create_alter(it->second.get(), sql, sql + len); } @@ -932,11 +940,7 @@ void handle_query_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr) MXS_ERROR("Alter statement to table '%s' has no preceding create statement.", ident); } } - /* Commit received for non transactional tables, i.e. MyISAM */ - else if (strncmp(sql, "COMMIT", 6) == 0) - { - router->trx_count++; - } + // TODO: Add COMMIT handling for non-transactional tables MXS_FREE(tmp); } diff --git a/server/modules/routing/avrorouter/avro_main.cc b/server/modules/routing/avrorouter/avro_main.cc index 09f9d89a7..0f0f2408f 100644 --- a/server/modules/routing/avrorouter/avro_main.cc +++ b/server/modules/routing/avrorouter/avro_main.cc @@ -62,7 +62,8 @@ static bool conversion_task_ctl(Avro *inst, bool start); MXS_ROUTER* createInstance(SERVICE *service, char **options) { uint64_t block_size = config_get_size(service->svc_config_param, "block_size"); - mxs_avro_codec_type codec = static_cast(config_get_enum(service->svc_config_param, "codec", codec_values)); + mxs_avro_codec_type codec = static_cast(config_get_enum(service->svc_config_param, + "codec", codec_values)); std::string avrodir = config_get_string(service->svc_config_param, "avrodir"); SRowEventHandler handler(new AvroConverter(avrodir, block_size, codec)); @@ -153,6 +154,7 @@ static void diagnostics(MXS_ROUTER *router, DCB *dcb) { Avro *router_inst = (Avro *) router; + gtid_pos_t gtid = router_inst->handler.get_gtid(); dcb_printf(dcb, "\tAVRO files directory: %s\n", router_inst->avrodir.c_str()); @@ -164,12 +166,9 @@ diagnostics(MXS_ROUTER *router, DCB *dcb) dcb_printf(dcb, "\tCurrent binlog position: %lu\n", router_inst->current_pos); dcb_printf(dcb, "\tCurrent GTID value: %lu-%lu-%lu\n", - router_inst->gtid.domain, router_inst->gtid.server_id, - router_inst->gtid.seq); - dcb_printf(dcb, "\tCurrent GTID timestamp: %u\n", - router_inst->gtid.timestamp); - dcb_printf(dcb, "\tCurrent GTID #events: %lu\n", - router_inst->gtid.event_num); + gtid.domain, gtid.server_id, gtid.seq); + dcb_printf(dcb, "\tCurrent GTID timestamp: %u\n", gtid.timestamp); + dcb_printf(dcb, "\tCurrent GTID #events: %lu\n", gtid.event_num); } /** @@ -192,11 +191,11 @@ static json_t* diagnostics_json(const MXS_ROUTER *router) json_object_set_new(rval, "binlog_name", json_string(router_inst->binlog_name.c_str())); json_object_set_new(rval, "binlog_pos", json_integer(router_inst->current_pos)); - snprintf(pathbuf, sizeof(pathbuf), "%lu-%lu-%lu", router_inst->gtid.domain, - router_inst->gtid.server_id, router_inst->gtid.seq); + gtid_pos_t gtid = router_inst->handler.get_gtid(); + snprintf(pathbuf, sizeof(pathbuf), "%lu-%lu-%lu", gtid.domain, gtid.server_id, gtid.seq); json_object_set_new(rval, "gtid", json_string(pathbuf)); - json_object_set_new(rval, "gtid_timestamp", json_integer(router_inst->gtid.timestamp)); - json_object_set_new(rval, "gtid_event_number", json_integer(router_inst->gtid.event_num)); + json_object_set_new(rval, "gtid_timestamp", json_integer(gtid.timestamp)); + json_object_set_new(rval, "gtid_event_number", json_integer(gtid.event_num)); return rval; } @@ -281,7 +280,7 @@ bool converter_func(Worker::Call::action_t action, Avro* router) /** We reached end of file, flush unwritten records to disk */ if (progress) { - router->event_handler->flush_tables(); + router->handler.flush(); avro_save_conversion_state(router); logged = false; } diff --git a/server/modules/routing/avrorouter/avro_rbr.cc b/server/modules/routing/avrorouter/avro_rbr.cc index 23bcbe163..ff9cd2fc8 100644 --- a/server/modules/routing/avrorouter/avro_rbr.cc +++ b/server/modules/routing/avrorouter/avro_rbr.cc @@ -74,23 +74,23 @@ static int get_event_type(uint8_t event) * @param hdr Replication header * @param ptr Pointer to event payload */ -bool handle_table_map_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr) +bool Rpl::handle_table_map_event(REP_HEADER *hdr, uint8_t *ptr) { bool rval = false; uint64_t id; char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2]; - int ev_len = router->event_type_hdr_lens[hdr->event_type]; + int ev_len = m_event_type_hdr_lens[hdr->event_type]; read_table_info(ptr, ev_len, &id, table_ident, sizeof(table_ident)); - auto create = router->created_tables.find(table_ident); + auto create = m_created_tables.find(table_ident); - if (create != router->created_tables.end()) + if (create != m_created_tables.end()) { ss_dassert(create->second->columns.size() > 0); - auto it = router->table_maps.find(table_ident); + auto it = m_table_maps.find(table_ident); STableMapEvent map(table_map_alloc(ptr, ev_len, create->second.get())); - if (it != router->table_maps.end()) + if (it != m_table_maps.end()) { auto old = it->second; @@ -102,28 +102,23 @@ bool handle_table_map_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr) } } - if (router->event_handler->open_table(map, create->second)) + if (m_handler->open_table(map, create->second)) { create->second->was_used = true; - auto old = router->table_maps.find(table_ident); - bool notify = old != router->table_maps.end(); + auto old = m_table_maps.find(table_ident); + bool notify = old != m_table_maps.end(); if (notify) { - router->active_maps.erase(old->second->id); + m_active_maps.erase(old->second->id); } - router->table_maps[table_ident] = map; - router->active_maps[map->id] = map; - ss_dassert(router->active_maps[id] == map); + m_table_maps[table_ident] = map; + m_active_maps[map->id] = map; + ss_dassert(m_active_maps[id] == map); MXS_DEBUG("Table %s mapped to %lu", table_ident, map->id); rval = true; - - if (notify) - { - notify_all_clients(router); - } } } else @@ -133,11 +128,6 @@ bool handle_table_map_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr) "table until a DDL statement for it is read.", table_ident); } - if (rval) - { - MXS_INFO("Table Map for '%s' at %lu", table_ident, router->current_pos); - } - return rval; } @@ -152,12 +142,11 @@ bool handle_table_map_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr) * @param ptr Pointer to the start of the event * @return True on succcess, false on error */ -bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr) +bool Rpl::handle_row_event(REP_HEADER *hdr, uint8_t *ptr) { bool rval = false; - uint8_t *start = ptr; uint8_t *end = ptr + hdr->event_size - BINLOG_EVENT_HDR_LEN; - uint8_t table_id_size = router->event_type_hdr_lens[hdr->event_type] == 6 ? 4 : 6; + uint8_t table_id_size = m_event_type_hdr_lens[hdr->event_type] == 6 ? 4 : 6; uint64_t table_id = 0; /** The first value is the ID where the table was mapped. This should be @@ -213,46 +202,45 @@ bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr) // There should always be a table map event prior to a row event. - auto it = router->active_maps.find(table_id); + auto it = m_active_maps.find(table_id); - if (it != router->active_maps.end()) + if (it != m_active_maps.end()) { STableMapEvent map = it->second; char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2]; snprintf(table_ident, sizeof(table_ident), "%s.%s", map->database.c_str(), map->table.c_str()); - bool ok = router->event_handler->prepare_table(map->database, map->table); - auto create = router->created_tables.find(table_ident); + bool ok = m_handler->prepare_table(map->database, map->table); + auto create = m_created_tables.find(table_ident); - if (ok && create != router->created_tables.end() && + if (ok && create != m_created_tables.end() && ncolumns == map->columns() && create->second->columns.size() == map->columns()) { /** Each event has one or more rows in it. The number of rows is not known * beforehand so we must continue processing them until we reach the end * of the event. */ int rows = 0; - MXS_INFO("Row Event for '%s' at %lu", table_ident, router->current_pos); + MXS_INFO("Row Event for '%s' at %u", table_ident, hdr->next_pos - hdr->event_size); while (ptr < end) { - static uint64_t total_row_count = 1; int event_type = get_event_type(hdr->event_type); // Increment the event count for this transaction - router->gtid.event_num++; + m_gtid.event_num++; - router->event_handler->prepare_row(router->gtid, *hdr, event_type); - ptr = process_row_event_data(map, create->second, router->event_handler, ptr, col_present, end); - router->event_handler->commit(router->gtid); + m_handler->prepare_row(m_gtid, *hdr, event_type); + ptr = process_row_event_data(map, create->second, m_handler, ptr, col_present, end); + m_handler->commit(m_gtid); /** Update rows events have the before and after images of the * affected rows so we'll process them as another record with * a different type */ if (event_type == UPDATE_EVENT) { - router->event_handler->prepare_row(router->gtid, *hdr, UPDATE_EVENT_AFTER); - ptr = process_row_event_data(map, create->second, router->event_handler, ptr, col_present, end); - router->event_handler->commit(router->gtid); + m_handler->prepare_row(m_gtid, *hdr, UPDATE_EVENT_AFTER); + ptr = process_row_event_data(map, create->second, m_handler, ptr, col_present, end); + m_handler->commit(m_gtid); } rows++; @@ -265,7 +253,7 @@ bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr) MXS_ERROR("Avro file handle was not found for table %s.%s. See earlier" " errors for more details.", map->database.c_str(), map->table.c_str()); } - else if (create == router->created_tables.end()) + else if (create == m_created_tables.end()) { MXS_ERROR("Create table statement for %s.%s was not found from the " "binary logs or the stored schema was not correct.", diff --git a/server/modules/routing/avrorouter/avro_schema.cc b/server/modules/routing/avrorouter/avro_schema.cc index 28ed33d05..b44e28901 100644 --- a/server/modules/routing/avrorouter/avro_schema.cc +++ b/server/modules/routing/avrorouter/avro_schema.cc @@ -514,7 +514,7 @@ int resolve_table_version(const char* db, const char* table) * * @return New CREATE_TABLE object or NULL if an error occurred */ -TableCreateEvent* table_create_alloc(char* ident, const char* sql, int len) +STableCreateEvent table_create_alloc(char* ident, const char* sql, int len) { /** Extract the table definition so we can get the column names from it */ int stmt_len = 0; @@ -533,12 +533,12 @@ TableCreateEvent* table_create_alloc(char* ident, const char* sql, int len) std::vector columns; process_column_definition(statement_sql, columns); - TableCreateEvent *rval = NULL; + STableCreateEvent rval; if (!columns.empty()) { int version = resolve_table_version(database, table); - rval = new (std::nothrow) TableCreateEvent(database, table, version, std::move(columns)); + rval.reset(new (std::nothrow) TableCreateEvent(database, table, version, std::move(columns))); } else { @@ -730,9 +730,9 @@ static bool extract_create_like_identifier(const char* sql, size_t len, char* ta /** * Create a table from another table */ -TableCreateEvent* table_create_copy(Avro *router, const char* sql, size_t len, const char* db) +STableCreateEvent Rpl::table_create_copy(const char* sql, size_t len, const char* db) { - TableCreateEvent* rval = NULL; + STableCreateEvent rval; char target[MYSQL_TABLE_MAXLEN + 1] = ""; char source[MYSQL_TABLE_MAXLEN + 1] = ""; @@ -748,11 +748,11 @@ TableCreateEvent* table_create_copy(Avro *router, const char* sql, size_t len, c strcat(table_ident, source); - auto it = router->created_tables.find(table_ident); + auto it = m_created_tables.find(table_ident); - if (it != router->created_tables.end()) + if (it != m_created_tables.end()) { - rval = new (std::nothrow) TableCreateEvent(*it->second); + rval.reset(new (std::nothrow) TableCreateEvent(*it->second)); char* table = strchr(target, '.'); table = table ? table + 1 : target; rval->table = table; diff --git a/server/modules/routing/avrorouter/avrorouter.hh b/server/modules/routing/avrorouter/avrorouter.hh index 6ddec4c19..85f7cdeb6 100644 --- a/server/modules/routing/avrorouter/avrorouter.hh +++ b/server/modules/routing/avrorouter/avrorouter.hh @@ -26,13 +26,12 @@ #include #include #include -#include #include #include #include #include -#include "rpl_events.hh" +#include "rpl.hh" MXS_BEGIN_DECLS @@ -113,9 +112,6 @@ static const MXS_ENUM_VALUE codec_values[] = {NULL} }; -typedef std::tr1::unordered_map CreatedTables; -typedef std::tr1::unordered_map MappedTables; -typedef std::tr1::unordered_map ActiveMaps; class Avro: public MXS_ROUTER { @@ -132,23 +128,12 @@ public: std::string binlog_name; /*< Name of the current binlog file */ uint64_t current_pos; /*< Current binlog position */ int binlog_fd; /*< File descriptor of the binlog file being read */ - pcre2_code* create_table_re; - pcre2_code* alter_table_re; - uint8_t event_types; - uint8_t event_type_hdr_lens[MAX_EVENT_TYPE_END]; - uint8_t binlog_checksum; - gtid_pos_t gtid; - ActiveMaps active_maps; - MappedTables table_maps; - CreatedTables created_tables; - uint64_t trx_count; /*< Transactions processed */ - uint64_t trx_target; /*< Minimum about of transactions that will trigger - * a flush of all tables */ - uint64_t row_count; /*< Row events processed */ - uint64_t row_target; /*< Minimum about of row events that will trigger - * a flush of all tables */ + uint64_t trx_count; /*< Transactions processed */ + uint64_t trx_target; /*< Number of transactions that trigger a flush */ + uint64_t row_count; /*< Row events processed */ + uint64_t row_target; /*< Number of row events that trigger a flush */ uint32_t task_handle; /**< Delayed task handle */ - SRowEventHandler event_handler; + Rpl handler; private: Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, SERVICE* source, SRowEventHandler handler); @@ -208,8 +193,7 @@ private: void read_table_info(uint8_t *ptr, uint8_t post_header_len, uint64_t *table_id, char* dest, size_t len); TableMapEvent *table_map_alloc(uint8_t *ptr, uint8_t hdr_len, TableCreateEvent* create); -TableCreateEvent* table_create_alloc(char* ident, const char* sql, int len); -TableCreateEvent* table_create_copy(Avro *router, const char* sql, size_t len, const char* db); +STableCreateEvent table_create_alloc(char* ident, const char* sql, int len); bool table_create_save(TableCreateEvent *create, const char *filename); bool table_create_alter(TableCreateEvent *create, const char *sql, const char *end); TableCreateEvent* table_create_from_schema(const char* file, const char* db, const char* table, @@ -221,7 +205,6 @@ bool avro_open_binlog(const char *binlogdir, const char *file, int *fd); void avro_close_binlog(int fd); avro_binlog_end_t avro_read_all_events(Avro *router); char* json_new_schema_from_table(const STableMapEvent& map, const STableCreateEvent& create); -bool handle_table_map_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr); bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr); void handle_one_event(Avro* router, uint8_t* ptr, REP_HEADER& hdr, uint64_t& pos); REP_HEADER construct_header(uint8_t* ptr); diff --git a/server/modules/routing/avrorouter/rpl_events.hh b/server/modules/routing/avrorouter/rpl.hh similarity index 67% rename from server/modules/routing/avrorouter/rpl_events.hh rename to server/modules/routing/avrorouter/rpl.hh index c5054916a..fefb5a4de 100644 --- a/server/modules/routing/avrorouter/rpl_events.hh +++ b/server/modules/routing/avrorouter/rpl.hh @@ -18,6 +18,8 @@ #include #include +#include +#include #include typedef std::vector Bytes; @@ -42,6 +44,15 @@ struct gtid_pos_t * is an internal representation of the position of * an event inside a GTID event and it is used to * rebuild GTID events in the correct order. */ + + void extract(const REP_HEADER& hdr, uint8_t* ptr) + { + domain = extract_field(ptr + 8, 32); + server_id = hdr.serverid; + seq = extract_field(ptr, 64); + event_num = 0; + timestamp = hdr.timestamp; + } }; /** A single column in a CREATE TABLE statement */ @@ -71,6 +82,11 @@ struct TableCreateEvent { } + std::string id() const + { + return database + '.' + table; + } + std::vector columns; std::string table; std::string database; @@ -113,6 +129,11 @@ struct TableMapEvent typedef std::tr1::shared_ptr STableCreateEvent; typedef std::tr1::shared_ptr STableMapEvent; +// Containers for the replication events +typedef std::tr1::unordered_map CreatedTables; +typedef std::tr1::unordered_map MappedTables; +typedef std::tr1::unordered_map ActiveMaps; + // Handler class for row based replication events class RowEventHandler { @@ -167,3 +188,59 @@ public: }; typedef std::auto_ptr SRowEventHandler; + +class Rpl +{ +public: + Rpl(const Rpl&) = delete; + Rpl& operator=(const Rpl&) = delete; + + // Construct a new replication stream transformer + Rpl(SERVICE* service, SRowEventHandler event_handler); + + // Add a stored TableCreateEvent + void add_create(STableCreateEvent create); + + // Handle a replicated binary log event + void handle_event(REP_HEADER hdr, uint8_t* ptr); + + // Called when processed events need to be persisted to disk + void flush(); + + // Check if binlog checksums are enabled + bool have_checksums() const + { + return m_binlog_checksum; + } + + // Set current GTID + void set_gtid(gtid_pos_t gtid) + { + m_gtid = gtid; + } + + // Get current GTID + const gtid_pos_t& get_gtid() const + { + return m_gtid; + } + +private: + SRowEventHandler m_handler; + SERVICE* m_service; + pcre2_code* m_create_table_re; + pcre2_code* m_alter_table_re; + uint8_t m_binlog_checksum; + uint8_t m_event_types; + Bytes m_event_type_hdr_lens; + gtid_pos_t m_gtid; + ActiveMaps m_active_maps; + MappedTables m_table_maps; + CreatedTables m_created_tables; + + void handle_query_event(REP_HEADER *hdr, uint8_t *ptr); + bool handle_table_map_event(REP_HEADER *hdr, uint8_t *ptr); + bool handle_row_event(REP_HEADER *hdr, uint8_t *ptr); + STableCreateEvent table_create_copy(const char* sql, size_t len, const char* db); + bool save_and_replace_table_create(STableCreateEvent created); +};