From 7c1869660800f885ade865b1ce102acce9870b0d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Thu, 7 Jun 2018 17:52:53 +0300 Subject: [PATCH] Abstract all row event processing The code that handles the Avro files is now fully abstracted behind the AvroConverter class that implements the RowEventHandler interface. The code still has some avro specific behavior in a few places (parsing of JSON files into TableCreate objects). This can be replaced, if needed, by querying the master server for the CREATE TABLE statements. --- .../modules/routing/avrorouter/CMakeLists.txt | 3 +- server/modules/routing/avrorouter/avro.cc | 11 +- .../routing/avrorouter/avro_converter.cc | 392 ++++++++++++++++++ .../routing/avrorouter/avro_converter.hh | 77 ++++ .../modules/routing/avrorouter/avro_file.cc | 92 +--- .../modules/routing/avrorouter/avro_main.cc | 4 +- server/modules/routing/avrorouter/avro_rbr.cc | 210 ++-------- .../modules/routing/avrorouter/avro_schema.cc | 134 ------ .../modules/routing/avrorouter/avrorouter.hh | 48 +-- .../modules/routing/avrorouter/rpl_events.hh | 35 +- 10 files changed, 531 insertions(+), 475 deletions(-) create mode 100644 server/modules/routing/avrorouter/avro_converter.cc create mode 100644 server/modules/routing/avrorouter/avro_converter.hh diff --git a/server/modules/routing/avrorouter/CMakeLists.txt b/server/modules/routing/avrorouter/CMakeLists.txt index ae8dd991f..bf6b4811b 100644 --- a/server/modules/routing/avrorouter/CMakeLists.txt +++ b/server/modules/routing/avrorouter/CMakeLists.txt @@ -3,7 +3,8 @@ if(AVRO_FOUND AND JANSSON_FOUND) include_directories(${JANSSON_INCLUDE_DIR}) # The common avrorouter functionality - add_library(avro-common SHARED avro.cc ../binlogrouter/binlog_common.cc avro_client.cc avro_schema.cc avro_rbr.cc avro_file.cc avro_index.cc) + add_library(avro-common SHARED avro.cc ../binlogrouter/binlog_common.cc avro_client.cc + avro_schema.cc avro_rbr.cc avro_file.cc avro_index.cc 
avro_converter.cc) set_target_properties(avro-common PROPERTIES VERSION "1.0.0") set_target_properties(avro-common PROPERTIES LINK_FLAGS -Wl,-z,defs) target_link_libraries(avro-common maxscale-common ${JANSSON_LIBRARIES} ${AVRO_LIBRARIES} maxavro lzma) diff --git a/server/modules/routing/avrorouter/avro.cc b/server/modules/routing/avrorouter/avro.cc index fb8f192a7..d15fe1100 100644 --- a/server/modules/routing/avrorouter/avro.cc +++ b/server/modules/routing/avrorouter/avro.cc @@ -18,7 +18,6 @@ #include "avrorouter.hh" -#include #include #include #include @@ -47,6 +46,8 @@ #include #include +#include "avro_converter.hh" + using namespace maxscale; #ifndef BINLOG_NAMEFMT @@ -275,12 +276,16 @@ Avro::Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, sqlite3* handle, SERV trx_target(config_get_integer(params, "group_trx")), row_count(0), row_target(config_get_integer(params, "group_rows")), - block_size(config_get_size(params, "block_size")), - codec(static_cast(config_get_enum(params, "codec", codec_values))), sqlite_handle(handle), task_handle(0), stats{0} { + uint64_t block_size = config_get_size(params, "block_size"); + mxs_avro_codec_type codec = static_cast(config_get_enum(params, "codec", codec_values)); + + // TODO: pass this as a parameter or something + event_hander = new AvroConverter(avrodir, block_size, codec); + int pcreerr; size_t erroff; create_table_re = pcre2_compile((PCRE2_SPTR) create_table_regex, PCRE2_ZERO_TERMINATED, diff --git a/server/modules/routing/avrorouter/avro_converter.cc b/server/modules/routing/avrorouter/avro_converter.cc new file mode 100644 index 000000000..b257dedcd --- /dev/null +++ b/server/modules/routing/avrorouter/avro_converter.cc @@ -0,0 +1,392 @@ +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. 
+ * + * Change Date: 2020-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include "avro_converter.hh" + +#include + +#include +#include +#include + +/** + * @brief Allocate an Avro table + * + * Create an Avro table and prepare it for writing. + * @param filepath Path to the created file + * @param json_schema The schema of the table in JSON format + */ +AvroTable* avro_table_alloc(const char* filepath, const char* json_schema, const char *codec, + size_t block_size) +{ + avro_file_writer_t avro_file; + avro_value_iface_t* avro_writer_iface; + avro_schema_t avro_schema; + + if (avro_schema_from_json_length(json_schema, strlen(json_schema), + &avro_schema)) + { + MXS_ERROR("Avro error: %s", avro_strerror()); + MXS_INFO("Avro schema: %s", json_schema); + return NULL; + } + + int rc = 0; + + if (access(filepath, F_OK) == 0) + { + rc = avro_file_writer_open_bs(filepath, &avro_file, block_size); + } + else + { + rc = avro_file_writer_create_with_codec(filepath, avro_schema, + &avro_file, codec, block_size); + } + + if (rc) + { + MXS_ERROR("Avro error: %s", avro_strerror()); + avro_schema_decref(avro_schema); + return NULL; + } + + if ((avro_writer_iface = avro_generic_class_from_schema(avro_schema)) == NULL) + { + MXS_ERROR("Avro error: %s", avro_strerror()); + avro_schema_decref(avro_schema); + avro_file_writer_close(avro_file); + return NULL; + } + + AvroTable* table = new (std::nothrow) AvroTable(avro_file, avro_writer_iface, avro_schema); + + if (!table) + { + avro_file_writer_close(avro_file); + avro_value_iface_decref(avro_writer_iface); + avro_schema_decref(avro_schema); + MXS_OOM(); + } + + return table; +} + +/** + * @brief Convert the MySQL column type to a compatible Avro type + * + * Some fields are larger than they need to be but since the Avro integer + * compression is quite efficient, the real loss in performance is
negligible. + * @param type MySQL column type + * @return String representation of the Avro type + */ +static const char* column_type_to_avro_type(uint8_t type) +{ + switch (type) + { + case TABLE_COL_TYPE_TINY: + case TABLE_COL_TYPE_SHORT: + case TABLE_COL_TYPE_LONG: + case TABLE_COL_TYPE_INT24: + case TABLE_COL_TYPE_BIT: + return "int"; + + case TABLE_COL_TYPE_FLOAT: + return "float"; + + case TABLE_COL_TYPE_DOUBLE: + case TABLE_COL_TYPE_NEWDECIMAL: + return "double"; + + case TABLE_COL_TYPE_NULL: + return "null"; + + case TABLE_COL_TYPE_LONGLONG: + return "long"; + + case TABLE_COL_TYPE_TINY_BLOB: + case TABLE_COL_TYPE_MEDIUM_BLOB: + case TABLE_COL_TYPE_LONG_BLOB: + case TABLE_COL_TYPE_BLOB: + return "bytes"; + + default: + return "string"; + } +} + +/** + * @brief Create a new JSON Avro schema from the table map and create table abstractions + * + * The schema will always have a GTID field and all records contain the current + * GTID of the transaction. + * @param map TABLE_MAP for this table + * @param create The TABLE_CREATE for this table + * @return New schema or NULL if an error occurred + */ +char* json_new_schema_from_table(const STableMapEvent& map, const STableCreateEvent& create) +{ + if (map->version != create->version) + { + MXS_ERROR("Version mismatch for table %s.%s. 
Table map version is %d and " + "the table definition version is %d.", map->database.c_str(), + map->table.c_str(), map->version, create->version); + ss_dassert(!true); // Should not happen + return NULL; + } + + json_error_t err; + memset(&err, 0, sizeof(err)); + json_t *schema = json_object(); + json_object_set_new(schema, "namespace", json_string("MaxScaleChangeDataSchema.avro")); + json_object_set_new(schema, "type", json_string("record")); + json_object_set_new(schema, "name", json_string("ChangeRecord")); + + json_t *array = json_array(); + json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name", + avro_domain, "type", "int")); + json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name", + avro_server_id, "type", "int")); + json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name", + avro_sequence, "type", "int")); + json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name", + avro_event_number, "type", "int")); + json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name", + avro_timestamp, "type", "int")); + + /** Enums and other complex types are defined with complete JSON objects + * instead of string values */ + json_t *event_types = json_pack_ex(&err, 0, "{s:s, s:s, s:[s,s,s,s]}", "type", "enum", + "name", "EVENT_TYPES", "symbols", "insert", + "update_before", "update_after", "delete"); + + // Ownership of `event_types` is stolen when using the `o` format + json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:o}", "name", avro_event_type, + "type", event_types)); + + for (uint64_t i = 0; i < map->columns() && i < create->columns.size(); i++) + { + json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s, s:s, s:i}", + "name", create->columns[i].name.c_str(), + "type", column_type_to_avro_type(map->column_types[i]), + "real_type", create->columns[i].type.c_str(), + "length", create->columns[i].length)); + } + json_object_set_new(schema, "fields", array); + char* rval = 
json_dumps(schema, JSON_PRESERVE_ORDER); + json_decref(schema); + return rval; +} + +/** + * @brief Save the Avro schema of a table to disk + * + * @param path Schema directory + * @param schema Schema in JSON format + * @param map Table map that @p schema represents + */ +void save_avro_schema(const char *path, const char* schema, const STableMapEvent& map, + const STableCreateEvent& create) +{ + char filepath[PATH_MAX]; + snprintf(filepath, sizeof(filepath), "%s/%s.%s.%06d.avsc", path, + map->database.c_str(), map->table.c_str(), map->version); + + if (access(filepath, F_OK) != 0) + { + if (!create->was_used) + { + FILE *file = fopen(filepath, "wb"); + if (file) + { + fprintf(file, "%s\n", schema); + fclose(file); + } + } + } +} + +static const char* codec_to_string(enum mxs_avro_codec_type type) +{ + switch (type) + { + case MXS_AVRO_CODEC_NULL: + return "null"; + case MXS_AVRO_CODEC_DEFLATE: + return "deflate"; + case MXS_AVRO_CODEC_SNAPPY: + return "snappy"; + default: + ss_dassert(false); + return "null"; + } +} + +AvroConverter::AvroConverter(std::string avrodir, uint64_t block_size, mxs_avro_codec_type codec): + m_avrodir(avrodir), + m_block_size(block_size), + m_codec(codec) +{ +} + +bool AvroConverter::open_table(const STableMapEvent& map, const STableCreateEvent& create) +{ + bool rval = false; + char* json_schema = json_new_schema_from_table(map, create); + + if (json_schema) + { + char filepath[PATH_MAX + 1]; + snprintf(filepath, sizeof(filepath), "%s/%s.%s.%06d.avro", m_avrodir.c_str(), + map->database.c_str(), map->table.c_str(), map->version); + + SAvroTable avro_table(avro_table_alloc(filepath, json_schema, + codec_to_string(m_codec), + m_block_size)); + + if (avro_table) + { + m_open_tables[map->database + "." 
+ map->table] = avro_table; + save_avro_schema(m_avrodir.c_str(), json_schema, map, create); + m_map = map; + m_create = create; + rval = true; + } + else + { + MXS_ERROR("Failed to open new Avro file for writing."); + } + MXS_FREE(json_schema); + } + else + { + MXS_ERROR("Failed to create JSON schema."); + } + + return rval; +} + +bool AvroConverter::prepare_table(std::string database, std::string table) +{ + bool rval = false; + auto it = m_open_tables.find(database + "." + table); + + if (it != m_open_tables.end()) + { + m_writer_iface = it->second->avro_writer_iface; + m_avro_file = &it->second->avro_file; + rval = true; + } + + return rval; +} + +void AvroConverter::flush_tables() +{ + for (auto it = m_open_tables.begin(); it != m_open_tables.end(); it++) + { + avro_file_writer_flush(it->second->avro_file); + } +} + +void AvroConverter::prepare_row(const gtid_pos_t& gtid, const REP_HEADER& hdr, int event_type) +{ + avro_generic_value_new(m_writer_iface, &m_record); + avro_value_get_by_name(&m_record, avro_domain, &m_field, NULL); + avro_value_set_int(&m_field, gtid.domain); + + avro_value_get_by_name(&m_record, avro_server_id, &m_field, NULL); + avro_value_set_int(&m_field, gtid.server_id); + + avro_value_get_by_name(&m_record, avro_sequence, &m_field, NULL); + avro_value_set_int(&m_field, gtid.seq); + + avro_value_get_by_name(&m_record, avro_event_number, &m_field, NULL); + avro_value_set_int(&m_field, gtid.event_num); + + avro_value_get_by_name(&m_record, avro_timestamp, &m_field, NULL); + avro_value_set_int(&m_field, hdr.timestamp); + + avro_value_get_by_name(&m_record, avro_event_type, &m_field, NULL); + avro_value_set_enum(&m_field, event_type); +} + +bool AvroConverter::commit(const gtid_pos_t& gtid) +{ + bool rval = true; + + if (avro_file_writer_append_value(*m_avro_file, &m_record)) + { + MXS_ERROR("Failed to write value: %s", avro_strerror()); + rval = false; + } + + return rval; +} + +void AvroConverter::column(int i, int32_t value) +{ + 
set_active(i); + avro_value_set_int(&m_field, value); +} + +void AvroConverter::column(int i, int64_t value) +{ + set_active(i); + avro_value_set_long(&m_field, value); +} + +void AvroConverter::column(int i, float value) +{ + set_active(i); + avro_value_set_float(&m_field, value); +} + +void AvroConverter::column(int i, double value) +{ + set_active(i); + avro_value_set_double(&m_field, value); +} + +void AvroConverter::column(int i, std::string value) +{ + set_active(i); + avro_value_set_string(&m_field, value.c_str()); +} + +void AvroConverter::column(int i, uint8_t* value, int len) +{ + set_active(i); + avro_value_set_bytes(&m_field, value, len); +} + +void AvroConverter::column(int i) +{ + set_active(i); + + if (column_is_blob(m_map->column_types[i])) + { + uint8_t nullvalue = 0; + avro_value_set_bytes(&m_field, &nullvalue, 1); + } + else + { + avro_value_set_null(&m_field); + } +} + +void AvroConverter::set_active(int i) +{ + ss_debug(int rc =)avro_value_get_by_name(&m_record, m_create->columns[i].name.c_str(), + &m_field, NULL); + ss_dassert(rc == 0); +} diff --git a/server/modules/routing/avrorouter/avro_converter.hh b/server/modules/routing/avrorouter/avro_converter.hh new file mode 100644 index 000000000..ff43d622f --- /dev/null +++ b/server/modules/routing/avrorouter/avro_converter.hh @@ -0,0 +1,77 @@ +#pragma once +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2020-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. 
+ */ + +#include "rpl_events.hh" +#include "avrorouter.hh" + +#include + +struct AvroTable +{ + AvroTable(avro_file_writer_t file, avro_value_iface_t* iface, avro_schema_t schema): + avro_file(file), + avro_writer_iface(iface), + avro_schema(schema) + { + } + + ~AvroTable() + { + avro_file_writer_flush(avro_file); + avro_file_writer_close(avro_file); + avro_value_iface_decref(avro_writer_iface); + avro_schema_decref(avro_schema); + } + + avro_file_writer_t avro_file; /*< Current Avro data file */ + avro_value_iface_t* avro_writer_iface; /*< Avro C API writer interface */ + avro_schema_t avro_schema; /*< Native Avro schema of the table */ +}; + +typedef std::tr1::shared_ptr SAvroTable; +typedef std::tr1::unordered_map AvroTables; + +// Converts replicated events into CDC events +class AvroConverter : public RowEventHandler +{ +public: + + AvroConverter(std::string avrodir, uint64_t block_size, mxs_avro_codec_type codec); + bool open_table(const STableMapEvent& map, const STableCreateEvent& create); + bool prepare_table(std::string database, std::string table); + void flush_tables(); + void prepare_row(const gtid_pos_t& gtid, const REP_HEADER& hdr, int event_type); + bool commit(const gtid_pos_t& gtid); + void column(int i, int32_t value); + void column(int i, int64_t value); + void column(int i, float value); + void column(int i, double value); + void column(int i, std::string value); + void column(int i, uint8_t* value, int len); + void column(int i); + +private: + avro_value_iface_t* m_writer_iface; + avro_file_writer_t* m_avro_file; + avro_value_t m_record; + avro_value_t m_field; + std::string m_avrodir; + AvroTables m_open_tables; + uint64_t m_block_size; + mxs_avro_codec_type m_codec; + STableMapEvent m_map; + STableCreateEvent m_create; + + void set_active(int i); +}; diff --git a/server/modules/routing/avrorouter/avro_file.cc b/server/modules/routing/avrorouter/avro_file.cc index d02f6b439..c2c60b777 100644 --- 
a/server/modules/routing/avrorouter/avro_file.cc +++ b/server/modules/routing/avrorouter/avro_file.cc @@ -91,68 +91,6 @@ void avro_close_binlog(int fd) close(fd); } -/** - * @brief Allocate an Avro table - * - * Create an Aro table and prepare it for writing. - * @param filepath Path to the created file - * @param json_schema The schema of the table in JSON format - */ -AvroTable* avro_table_alloc(const char* filepath, const char* json_schema, const char *codec, - size_t block_size) -{ - avro_file_writer_t avro_file; - avro_value_iface_t* avro_writer_iface; - avro_schema_t avro_schema; - - if (avro_schema_from_json_length(json_schema, strlen(json_schema), - &avro_schema)) - { - MXS_ERROR("Avro error: %s", avro_strerror()); - MXS_INFO("Avro schema: %s", json_schema); - return NULL; - } - - int rc = 0; - - if (access(filepath, F_OK) == 0) - { - rc = avro_file_writer_open_bs(filepath, &avro_file, block_size); - } - else - { - rc = avro_file_writer_create_with_codec(filepath, avro_schema, - &avro_file, codec, block_size); - } - - if (rc) - { - MXS_ERROR("Avro error: %s", avro_strerror()); - avro_schema_decref(avro_schema); - return NULL; - } - - if ((avro_writer_iface = avro_generic_class_from_schema(avro_schema)) == NULL) - { - MXS_ERROR("Avro error: %s", avro_strerror()); - avro_schema_decref(avro_schema); - avro_file_writer_close(avro_file); - return NULL; - } - - AvroTable* table = new (std::nothrow) AvroTable(avro_file, avro_writer_iface, avro_schema); - - if (!table) - { - avro_file_writer_close(avro_file); - avro_value_iface_decref(avro_writer_iface); - avro_schema_decref(avro_schema); - MXS_OOM(); - } - - return table; -} - /** * @brief Write a new ini file with current conversion status * @@ -448,7 +386,7 @@ void notify_all_clients(Avro *router) void do_checkpoint(Avro *router) { update_used_tables(router); - avro_flush_all_tables(router, AVROROUTER_FLUSH); + router->event_hander->flush_tables(); avro_save_conversion_state(router); notify_all_clients(router); 
router->row_count = router->trx_count = 0; @@ -585,13 +523,9 @@ void handle_one_event(Avro* router, uint8_t* ptr, REP_HEADER& hdr, uint64_t& pos } else if (hdr.event_type == MARIADB10_GTID_EVENT) { - uint64_t n_sequence; /* 8 bytes */ - uint32_t domainid; /* 4 bytes */ - n_sequence = extract_field(ptr, 64); - domainid = extract_field(ptr + 8, 32); - router->gtid.domain = domainid; + router->gtid.domain = extract_field(ptr + 8, 32); router->gtid.server_id = hdr.serverid; - router->gtid.seq = n_sequence; + router->gtid.seq = extract_field(ptr, 64); router->gtid.event_num = 0; router->gtid.timestamp = hdr.timestamp; } @@ -788,26 +722,6 @@ void avro_load_metadata_from_schemas(Avro *router) globfree(&files); } -/** - * @brief Flush all Avro records to disk - * @param router Avro router instance - */ -void avro_flush_all_tables(Avro *router, enum avrorouter_file_op flush) -{ - for (auto it = router->open_tables.begin(); it != router->open_tables.end(); it++) - { - if (flush == AVROROUTER_FLUSH) - { - avro_file_writer_flush(it->second->avro_file); - } - else - { - ss_dassert(flush == AVROROUTER_SYNC); - avro_file_writer_sync(it->second->avro_file); - } - } -} - /** * @brief Detection of table creation statements * @param router Avro router instance diff --git a/server/modules/routing/avrorouter/avro_main.cc b/server/modules/routing/avrorouter/avro_main.cc index 701c95658..356652f8d 100644 --- a/server/modules/routing/avrorouter/avro_main.cc +++ b/server/modules/routing/avrorouter/avro_main.cc @@ -13,7 +13,6 @@ #include "avrorouter.hh" -#include #include #include #include @@ -24,7 +23,6 @@ #include #include #include -#include #include #include #include @@ -286,7 +284,7 @@ bool converter_func(Worker::Call::action_t action, Avro* router) /** We reached end of file, flush unwritten records to disk */ if (progress) { - avro_flush_all_tables(router, AVROROUTER_FLUSH); + router->event_hander->flush_tables(); avro_save_conversion_state(router); logged = false; } diff --git 
a/server/modules/routing/avrorouter/avro_rbr.cc b/server/modules/routing/avrorouter/avro_rbr.cc index dac01a9d6..a5392a285 100644 --- a/server/modules/routing/avrorouter/avro_rbr.cc +++ b/server/modules/routing/avrorouter/avro_rbr.cc @@ -33,122 +33,6 @@ static bool warn_large_enumset = false; /**< Remove when support for ENUM/SET va void notify_all_clients(Avro *router); void add_used_table(Avro* router, const char* table); -class AvroConverter : public RowEventHandler -{ -public: - - AvroConverter(const STableMapEvent& map, const STableCreateEvent& create, SAvroTable table): - RowEventHandler(map, create), - m_writer_iface(table->avro_writer_iface), - m_avro_file(table->avro_file) - { - avro_generic_value_new(m_writer_iface, &m_record); - } - - ~AvroConverter() - { - avro_value_decref(&m_record); - } - - void prepare(const gtid_pos_t& gtid, const REP_HEADER& hdr, int event_type) - { - avro_value_get_by_name(&m_record, avro_domain, &m_field, NULL); - avro_value_set_int(&m_field, gtid.domain); - - avro_value_get_by_name(&m_record, avro_server_id, &m_field, NULL); - avro_value_set_int(&m_field, gtid.server_id); - - avro_value_get_by_name(&m_record, avro_sequence, &m_field, NULL); - avro_value_set_int(&m_field, gtid.seq); - - avro_value_get_by_name(&m_record, avro_event_number, &m_field, NULL); - avro_value_set_int(&m_field, gtid.event_num); - - avro_value_get_by_name(&m_record, avro_timestamp, &m_field, NULL); - avro_value_set_int(&m_field, hdr.timestamp); - - avro_value_get_by_name(&m_record, avro_event_type, &m_field, NULL); - avro_value_set_enum(&m_field, event_type); - } - - bool commit() - { - bool rval = true; - - if (avro_file_writer_append_value(m_avro_file, &m_record)) - { - MXS_ERROR("Failed to write value: %s", avro_strerror()); - rval = false; - } - - return rval; - } - - void column(int i, int32_t value) - { - set_active(i); - avro_value_set_int(&m_field, value); - } - - void column(int i, int64_t value) - { - set_active(i); - 
avro_value_set_long(&m_field, value); - } - - void column(int i, float value) - { - set_active(i); - avro_value_set_float(&m_field, value); - } - - void column(int i, double value) - { - set_active(i); - avro_value_set_double(&m_field, value); - } - - void column(int i, std::string value) - { - set_active(i); - avro_value_set_string(&m_field, value.c_str()); - } - - void column(int i, uint8_t* value, int len) - { - set_active(i); - avro_value_set_bytes(&m_field, value, len); - } - - void column(int i) - { - set_active(i); - - if (column_is_blob(m_map->column_types[i])) - { - uint8_t nullvalue = 0; - avro_value_set_bytes(&m_field, &nullvalue, 1); - } - else - { - avro_value_set_null(&m_field); - } - } - -private: - avro_value_iface_t* m_writer_iface; - avro_file_writer_t& m_avro_file; - avro_value_t m_record; - avro_value_t m_field; - - void set_active(int i) - { - ss_debug(int rc =)avro_value_get_by_name(&m_record, m_create->columns[i].name.c_str(), - &m_field, NULL); - ss_dassert(rc == 0); - } -}; - uint8_t* process_row_event_data(STableMapEvent map, STableCreateEvent create, RowEventHandler* conv, uint8_t *ptr, @@ -185,22 +69,6 @@ static int get_event_type(uint8_t event) } } -static const char* codec_to_string(enum mxs_avro_codec_type type) -{ - switch (type) - { - case MXS_AVRO_CODEC_NULL: - return "null"; - case MXS_AVRO_CODEC_DEFLATE: - return "deflate"; - case MXS_AVRO_CODEC_SNAPPY: - return "snappy"; - default: - ss_dassert(false); - return "null"; - } -} - /** * @brief Handle a table map event * @@ -238,50 +106,28 @@ bool handle_table_map_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr) } } - char* json_schema = json_new_schema_from_table(map, create->second); - - if (json_schema) + if (router->event_hander->open_table(map, create->second)) { - char filepath[PATH_MAX + 1]; - snprintf(filepath, sizeof(filepath), "%s/%s.%06d.avro", - router->avrodir.c_str(), table_ident, map->version); + create->second->was_used = true; - SAvroTable 
avro_table(avro_table_alloc(filepath, json_schema, - codec_to_string(router->codec), - router->block_size)); + auto old = router->table_maps.find(table_ident); + bool notify = old != router->table_maps.end(); - if (avro_table) + if (notify) { - auto old = router->table_maps.find(table_ident); - bool notify = old != router->table_maps.end(); - - if (notify) - { - router->active_maps.erase(old->second->id); - } - - router->table_maps[table_ident] = map; - router->open_tables[table_ident] = avro_table; - save_avro_schema(router->avrodir.c_str(), json_schema, map, create->second); - router->active_maps[map->id] = map; - ss_dassert(router->active_maps[id] == map); - MXS_DEBUG("Table %s mapped to %lu", table_ident, map->id); - rval = true; - - if (notify) - { - notify_all_clients(router); - } + router->active_maps.erase(old->second->id); } - else + + router->table_maps[table_ident] = map; + router->active_maps[map->id] = map; + ss_dassert(router->active_maps[id] == map); + MXS_DEBUG("Table %s mapped to %lu", table_ident, map->id); + rval = true; + + if (notify) { - MXS_ERROR("Failed to open new Avro file for writing."); + notify_all_clients(router); } - MXS_FREE(json_schema); - } - else - { - MXS_ERROR("Failed to create JSON schema."); } } else @@ -378,21 +224,13 @@ bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr) STableMapEvent map = it->second; char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2]; snprintf(table_ident, sizeof(table_ident), "%s.%s", map->database.c_str(), map->table.c_str()); - SAvroTable table; - auto it = router->open_tables.find(table_ident); - - if (it != router->open_tables.end()) - { - table = it->second; - } + bool ok = router->event_hander->prepare_table(map->database, map->table); auto create = router->created_tables.find(table_ident); - if (table && create != router->created_tables.end() && + if (ok && create != router->created_tables.end() && ncolumns == map->columns() && create->second->columns.size() == 
map->columns()) { - AvroConverter conv(map, create->second, table); - /** Each event has one or more rows in it. The number of rows is not known * beforehand so we must continue processing them until we reach the end * of the event. */ @@ -407,18 +245,18 @@ bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr) // Increment the event count for this transaction router->gtid.event_num++; - conv.prepare(router->gtid, *hdr, event_type); - ptr = process_row_event_data(map, create->second, &conv, ptr, col_present, end); - conv.commit(); + router->event_hander->prepare_row(router->gtid, *hdr, event_type); + ptr = process_row_event_data(map, create->second, router->event_hander, ptr, col_present, end); + router->event_hander->commit(router->gtid); /** Update rows events have the before and after images of the * affected rows so we'll process them as another record with * a different type */ if (event_type == UPDATE_EVENT) { - conv.prepare(router->gtid, *hdr, UPDATE_EVENT_AFTER); - ptr = process_row_event_data(map, create->second, &conv, ptr, col_present, end); - conv.commit(); + router->event_hander->prepare_row(router->gtid, *hdr, UPDATE_EVENT_AFTER); + ptr = process_row_event_data(map, create->second, router->event_hander, ptr, col_present, end); + router->event_hander->commit(router->gtid); } rows++; @@ -427,7 +265,7 @@ bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr) add_used_table(router, table_ident); rval = true; } - else if (!table) + else if (!ok) { MXS_ERROR("Avro file handle was not found for table %s.%s. 
See earlier" " errors for more details.", map->database.c_str(), map->table.c_str()); diff --git a/server/modules/routing/avrorouter/avro_schema.cc b/server/modules/routing/avrorouter/avro_schema.cc index 9cf096265..28ed33d05 100644 --- a/server/modules/routing/avrorouter/avro_schema.cc +++ b/server/modules/routing/avrorouter/avro_schema.cc @@ -30,112 +30,6 @@ #include #include -/** - * @brief Convert the MySQL column type to a compatible Avro type - * - * Some fields are larger than they need to be but since the Avro integer - * compression is quite efficient, the real loss in performance is negligible. - * @param type MySQL column type - * @return String representation of the Avro type - */ -static const char* column_type_to_avro_type(uint8_t type) -{ - switch (type) - { - case TABLE_COL_TYPE_TINY: - case TABLE_COL_TYPE_SHORT: - case TABLE_COL_TYPE_LONG: - case TABLE_COL_TYPE_INT24: - case TABLE_COL_TYPE_BIT: - return "int"; - - case TABLE_COL_TYPE_FLOAT: - return "float"; - - case TABLE_COL_TYPE_DOUBLE: - case TABLE_COL_TYPE_NEWDECIMAL: - return "double"; - - case TABLE_COL_TYPE_NULL: - return "null"; - - case TABLE_COL_TYPE_LONGLONG: - return "long"; - - case TABLE_COL_TYPE_TINY_BLOB: - case TABLE_COL_TYPE_MEDIUM_BLOB: - case TABLE_COL_TYPE_LONG_BLOB: - case TABLE_COL_TYPE_BLOB: - return "bytes"; - - default: - return "string"; - } -} - -/** - * @brief Create a new JSON Avro schema from the table map and create table abstractions - * - * The schema will always have a GTID field and all records contain the current - * GTID of the transaction. - * @param map TABLE_MAP for this table - * @param create The TABLE_CREATE for this table - * @return New schema or NULL if an error occurred - */ -char* json_new_schema_from_table(const STableMapEvent& map, const STableCreateEvent& create) -{ - if (map->version != create->version) - { - MXS_ERROR("Version mismatch for table %s.%s. 
Table map version is %d and " - "the table definition version is %d.", map->database.c_str(), - map->table.c_str(), map->version, create->version); - ss_dassert(!true); // Should not happen - return NULL; - } - - json_error_t err; - memset(&err, 0, sizeof(err)); - json_t *schema = json_object(); - json_object_set_new(schema, "namespace", json_string("MaxScaleChangeDataSchema.avro")); - json_object_set_new(schema, "type", json_string("record")); - json_object_set_new(schema, "name", json_string("ChangeRecord")); - - json_t *array = json_array(); - json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name", - avro_domain, "type", "int")); - json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name", - avro_server_id, "type", "int")); - json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name", - avro_sequence, "type", "int")); - json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name", - avro_event_number, "type", "int")); - json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name", - avro_timestamp, "type", "int")); - - /** Enums and other complex types are defined with complete JSON objects - * instead of string values */ - json_t *event_types = json_pack_ex(&err, 0, "{s:s, s:s, s:[s,s,s,s]}", "type", "enum", - "name", "EVENT_TYPES", "symbols", "insert", - "update_before", "update_after", "delete"); - - // Ownership of `event_types` is stolen when using the `o` format - json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:o}", "name", avro_event_type, - "type", event_types)); - - for (uint64_t i = 0; i < map->columns() && i < create->columns.size(); i++) - { - json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s, s:s, s:i}", - "name", create->columns[i].name.c_str(), - "type", column_type_to_avro_type(map->column_types[i]), - "real_type", create->columns[i].type.c_str(), - "length", create->columns[i].length)); - } - json_object_set_new(schema, "fields", array); - char* rval = 
json_dumps(schema, JSON_PRESERVE_ORDER); - json_decref(schema); - return rval; -} - /** * @brief Check whether the field is one that was generated by the avrorouter * @@ -242,34 +136,6 @@ bool json_extract_field_names(const char* filename, std::vector& columns return rval; } -/** - * @brief Save the Avro schema of a table to disk - * - * @param path Schema directory - * @param schema Schema in JSON format - * @param map Table map that @p schema represents - */ -void save_avro_schema(const char *path, const char* schema, STableMapEvent& map, STableCreateEvent& create) -{ - char filepath[PATH_MAX]; - snprintf(filepath, sizeof(filepath), "%s/%s.%s.%06d.avsc", path, - map->database.c_str(), map->table.c_str(), map->version); - - if (access(filepath, F_OK) != 0) - { - if (!create->was_used) - { - FILE *file = fopen(filepath, "wb"); - if (file) - { - fprintf(file, "%s\n", schema); - create->was_used = true; - fclose(file); - } - } - } -} - /** * Extract the table definition from a CREATE TABLE statement * @param sql The SQL statement diff --git a/server/modules/routing/avrorouter/avrorouter.hh b/server/modules/routing/avrorouter/avrorouter.hh index e8bea0bb4..a77106f72 100644 --- a/server/modules/routing/avrorouter/avrorouter.hh +++ b/server/modules/routing/avrorouter/avrorouter.hh @@ -19,14 +19,12 @@ #include #include #include -#include #include #include #include #include #include #include -#include #include #include #include @@ -123,28 +121,6 @@ typedef enum avro_binlog_end /** How many bytes each thread tries to send */ #define AVRO_DATA_BURST_SIZE (32 * 1024) -struct AvroTable -{ - AvroTable(avro_file_writer_t file, avro_value_iface_t* iface, avro_schema_t schema): - avro_file(file), - avro_writer_iface(iface), - avro_schema(schema) - { - } - - ~AvroTable() - { - avro_file_writer_flush(avro_file); - avro_file_writer_close(avro_file); - avro_value_iface_decref(avro_writer_iface); - avro_schema_decref(avro_schema); - } - - avro_file_writer_t avro_file; /*< Current Avro 
data file */ - avro_value_iface_t* avro_writer_iface; /*< Avro C API writer interface */ - avro_schema_t avro_schema; /*< Native Avro schema of the table */ -}; - /** Data format used when streaming data to the clients */ enum avro_data_format { @@ -169,10 +145,7 @@ static const MXS_ENUM_VALUE codec_values[] = {NULL} }; -typedef std::tr1::shared_ptr SAvroTable; - typedef std::tr1::unordered_map CreatedTables; -typedef std::tr1::unordered_map AvroTables; typedef std::tr1::unordered_map MappedTables; typedef std::tr1::unordered_map ActiveMaps; @@ -199,7 +172,6 @@ public: gtid_pos_t gtid; ActiveMaps active_maps; MappedTables table_maps; - AvroTables open_tables; CreatedTables created_tables; uint64_t trx_count; /*< Transactions processed */ uint64_t trx_target; /*< Minimum about of transactions that will trigger @@ -207,10 +179,9 @@ public: uint64_t row_count; /*< Row events processed */ uint64_t row_target; /*< Minimum about of row events that will trigger * a flush of all tables */ - uint64_t block_size; /**< Avro datablock size */ - enum mxs_avro_codec_type codec; /**< Avro codec type, defaults to `null` */ sqlite3* sqlite_handle; uint32_t task_handle; /**< Delayed task handle */ + RowEventHandler* event_hander; struct { @@ -288,10 +259,7 @@ extern void avro_client_rotate(Avro *router, AvroSession *client, uint8_t *ptr); extern bool avro_open_binlog(const char *binlogdir, const char *file, int *fd); extern void avro_close_binlog(int fd); extern avro_binlog_end_t avro_read_all_events(Avro *router); -extern AvroTable* avro_table_alloc(const char* filepath, const char* json_schema, - const char *codec, size_t block_size); extern char* json_new_schema_from_table(const STableMapEvent& map, const STableCreateEvent& create); -extern void save_avro_schema(const char *path, const char* schema, STableMapEvent& map, STableCreateEvent& create); extern bool handle_table_map_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr); extern bool handle_row_event(Avro *router, REP_HEADER 
*hdr, uint8_t *ptr); void handle_one_event(Avro* router, uint8_t* ptr, REP_HEADER& hdr, uint64_t& pos); @@ -299,20 +267,6 @@ REP_HEADER construct_header(uint8_t* ptr); bool avro_save_conversion_state(Avro *router); void avro_update_index(Avro* router); -enum avrorouter_file_op -{ - AVROROUTER_SYNC, - AVROROUTER_FLUSH -}; - -/** - * @brief Flush or sync all tables - * - * @param router Router instance - * @param flush AVROROUTER_SYNC for sync only or AVROROUTER_FLUSH for full flush - */ -extern void avro_flush_all_tables(Avro *router, enum avrorouter_file_op flush); - #define AVRO_CLIENT_UNREGISTERED 0x0000 #define AVRO_CLIENT_REGISTERED 0x0001 #define AVRO_CLIENT_REQUEST_DATA 0x0002 diff --git a/server/modules/routing/avrorouter/rpl_events.hh b/server/modules/routing/avrorouter/rpl_events.hh index 0e3d5db33..df8ff43e3 100644 --- a/server/modules/routing/avrorouter/rpl_events.hh +++ b/server/modules/routing/avrorouter/rpl_events.hh @@ -15,6 +15,10 @@ #include #include #include +#include +#include + +#include typedef std::vector Bytes; @@ -113,21 +117,32 @@ typedef std::tr1::shared_ptr STableMapEvent; class RowEventHandler { public: - RowEventHandler(const STableMapEvent& map, const STableCreateEvent& create): - m_map(map), - m_create(create) - { - } - virtual ~RowEventHandler() { } + // A table was opened + virtual bool open_table(const STableMapEvent& map, const STableCreateEvent& create) + { + return true; + } + - // Prepare a new row for processing - virtual void prepare(const gtid_pos_t& gtid, const REP_HEADER& hdr, int event_type) = 0; + // Prepare a table for processing + virtual bool prepare_table(std::string database, std::string table) + { + return true; + } + + // Flush open tables + virtual void flush_tables() + { + } + + // Prepare a new row for processing + virtual void prepare_row(const gtid_pos_t& gtid, const REP_HEADER& hdr, int event_type) = 0; // Called once all columns are processed - virtual bool commit() = 0; + virtual bool commit(const gtid_pos_t& gtid) = 0; // 32-bit integer 
handler virtual void column(int i, int32_t value) = 0; @@ -149,8 +164,4 @@ public: // Empty (NULL) value type handler virtual void column(int i) = 0; - -protected: - const STableMapEvent& m_map; // The table map event for this row - const STableCreateEvent& m_create; // The CREATE TABLE statement for this row };