diff --git a/server/modules/routing/avrorouter/CMakeLists.txt b/server/modules/routing/avrorouter/CMakeLists.txt index ae8dd991f..bf6b4811b 100644 --- a/server/modules/routing/avrorouter/CMakeLists.txt +++ b/server/modules/routing/avrorouter/CMakeLists.txt @@ -3,7 +3,8 @@ if(AVRO_FOUND AND JANSSON_FOUND) include_directories(${JANSSON_INCLUDE_DIR}) # The common avrorouter functionality - add_library(avro-common SHARED avro.cc ../binlogrouter/binlog_common.cc avro_client.cc avro_schema.cc avro_rbr.cc avro_file.cc avro_index.cc) + add_library(avro-common SHARED avro.cc ../binlogrouter/binlog_common.cc avro_client.cc + avro_schema.cc avro_rbr.cc avro_file.cc avro_index.cc avro_converter.cc) set_target_properties(avro-common PROPERTIES VERSION "1.0.0") set_target_properties(avro-common PROPERTIES LINK_FLAGS -Wl,-z,defs) target_link_libraries(avro-common maxscale-common ${JANSSON_LIBRARIES} ${AVRO_LIBRARIES} maxavro lzma) diff --git a/server/modules/routing/avrorouter/avro.cc b/server/modules/routing/avrorouter/avro.cc index fb8f192a7..d15fe1100 100644 --- a/server/modules/routing/avrorouter/avro.cc +++ b/server/modules/routing/avrorouter/avro.cc @@ -18,7 +18,6 @@ #include "avrorouter.hh" -#include #include #include #include @@ -47,6 +46,8 @@ #include #include +#include "avro_converter.hh" + using namespace maxscale; #ifndef BINLOG_NAMEFMT @@ -275,12 +276,16 @@ Avro::Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, sqlite3* handle, SERV trx_target(config_get_integer(params, "group_trx")), row_count(0), row_target(config_get_integer(params, "group_rows")), - block_size(config_get_size(params, "block_size")), - codec(static_cast(config_get_enum(params, "codec", codec_values))), sqlite_handle(handle), task_handle(0), stats{0} { + uint64_t block_size = config_get_size(params, "block_size"); + mxs_avro_codec_type codec = static_cast(config_get_enum(params, "codec", codec_values)); + + // TODO: pass this as a parameter or something + event_hander = new AvroConverter(avrodir, block_size, codec); + int pcreerr; size_t erroff; create_table_re = pcre2_compile((PCRE2_SPTR) create_table_regex, PCRE2_ZERO_TERMINATED, diff --git a/server/modules/routing/avrorouter/avro_converter.cc b/server/modules/routing/avrorouter/avro_converter.cc new file mode 100644 index 000000000..b257dedcd --- /dev/null +++ b/server/modules/routing/avrorouter/avro_converter.cc @@ -0,0 +1,392 @@ +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2020-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include "avro_converter.hh" + +#include + +#include +#include +#include + +/** + * @brief Allocate an Avro table + * + * Create an Aro table and prepare it for writing. + * @param filepath Path to the created file + * @param json_schema The schema of the table in JSON format + */ +AvroTable* avro_table_alloc(const char* filepath, const char* json_schema, const char *codec, + size_t block_size) +{ + avro_file_writer_t avro_file; + avro_value_iface_t* avro_writer_iface; + avro_schema_t avro_schema; + + if (avro_schema_from_json_length(json_schema, strlen(json_schema), + &avro_schema)) + { + MXS_ERROR("Avro error: %s", avro_strerror()); + MXS_INFO("Avro schema: %s", json_schema); + return NULL; + } + + int rc = 0; + + if (access(filepath, F_OK) == 0) + { + rc = avro_file_writer_open_bs(filepath, &avro_file, block_size); + } + else + { + rc = avro_file_writer_create_with_codec(filepath, avro_schema, + &avro_file, codec, block_size); + } + + if (rc) + { + MXS_ERROR("Avro error: %s", avro_strerror()); + avro_schema_decref(avro_schema); + return NULL; + } + + if ((avro_writer_iface = avro_generic_class_from_schema(avro_schema)) == NULL) + { + MXS_ERROR("Avro error: %s", avro_strerror()); + avro_schema_decref(avro_schema); + avro_file_writer_close(avro_file); + return NULL; + } + + AvroTable* table = new (std::nothrow) AvroTable(avro_file, avro_writer_iface, avro_schema); + + if (!table) + { + avro_file_writer_close(avro_file); + avro_value_iface_decref(avro_writer_iface); + avro_schema_decref(avro_schema); + MXS_OOM(); + } + + return table; +} + +/** + * @brief Convert the MySQL column type to a compatible Avro type + * + * Some fields are larger than they need to be but since the Avro integer + * compression is quite efficient, the real loss in performance is negligible. + * @param type MySQL column type + * @return String representation of the Avro type + */ +static const char* column_type_to_avro_type(uint8_t type) +{ + switch (type) + { + case TABLE_COL_TYPE_TINY: + case TABLE_COL_TYPE_SHORT: + case TABLE_COL_TYPE_LONG: + case TABLE_COL_TYPE_INT24: + case TABLE_COL_TYPE_BIT: + return "int"; + + case TABLE_COL_TYPE_FLOAT: + return "float"; + + case TABLE_COL_TYPE_DOUBLE: + case TABLE_COL_TYPE_NEWDECIMAL: + return "double"; + + case TABLE_COL_TYPE_NULL: + return "null"; + + case TABLE_COL_TYPE_LONGLONG: + return "long"; + + case TABLE_COL_TYPE_TINY_BLOB: + case TABLE_COL_TYPE_MEDIUM_BLOB: + case TABLE_COL_TYPE_LONG_BLOB: + case TABLE_COL_TYPE_BLOB: + return "bytes"; + + default: + return "string"; + } +} + +/** + * @brief Create a new JSON Avro schema from the table map and create table abstractions + * + * The schema will always have a GTID field and all records contain the current + * GTID of the transaction. + * @param map TABLE_MAP for this table + * @param create The TABLE_CREATE for this table + * @return New schema or NULL if an error occurred + */ +char* json_new_schema_from_table(const STableMapEvent& map, const STableCreateEvent& create) +{ + if (map->version != create->version) + { + MXS_ERROR("Version mismatch for table %s.%s. Table map version is %d and " + "the table definition version is %d.", map->database.c_str(), + map->table.c_str(), map->version, create->version); + ss_dassert(!true); // Should not happen + return NULL; + } + + json_error_t err; + memset(&err, 0, sizeof(err)); + json_t *schema = json_object(); + json_object_set_new(schema, "namespace", json_string("MaxScaleChangeDataSchema.avro")); + json_object_set_new(schema, "type", json_string("record")); + json_object_set_new(schema, "name", json_string("ChangeRecord")); + + json_t *array = json_array(); + json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name", + avro_domain, "type", "int")); + json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name", + avro_server_id, "type", "int")); + json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name", + avro_sequence, "type", "int")); + json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name", + avro_event_number, "type", "int")); + json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name", + avro_timestamp, "type", "int")); + + /** Enums and other complex types are defined with complete JSON objects + * instead of string values */ + json_t *event_types = json_pack_ex(&err, 0, "{s:s, s:s, s:[s,s,s,s]}", "type", "enum", + "name", "EVENT_TYPES", "symbols", "insert", + "update_before", "update_after", "delete"); + + // Ownership of `event_types` is stolen when using the `o` format + json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:o}", "name", avro_event_type, + "type", event_types)); + + for (uint64_t i = 0; i < map->columns() && i < create->columns.size(); i++) + { + json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s, s:s, s:i}", + "name", create->columns[i].name.c_str(), + "type", column_type_to_avro_type(map->column_types[i]), + "real_type", create->columns[i].type.c_str(), + "length", create->columns[i].length)); + } + json_object_set_new(schema, "fields", array); + char* rval = json_dumps(schema, JSON_PRESERVE_ORDER); + json_decref(schema); + return rval; +} + +/** + * @brief Save the Avro schema of a table to disk + * + * @param path Schema directory + * @param schema Schema in JSON format + * @param map Table map that @p schema represents + */ +void save_avro_schema(const char *path, const char* schema, const STableMapEvent& map, + const STableCreateEvent& create) +{ + char filepath[PATH_MAX]; + snprintf(filepath, sizeof(filepath), "%s/%s.%s.%06d.avsc", path, + map->database.c_str(), map->table.c_str(), map->version); + + if (access(filepath, F_OK) != 0) + { + if (!create->was_used) + { + FILE *file = fopen(filepath, "wb"); + if (file) + { + fprintf(file, "%s\n", schema); + fclose(file); + } + } + } +} + +static const char* codec_to_string(enum mxs_avro_codec_type type) +{ + switch (type) + { + case MXS_AVRO_CODEC_NULL: + return "null"; + case MXS_AVRO_CODEC_DEFLATE: + return "deflate"; + case MXS_AVRO_CODEC_SNAPPY: + return "snappy"; + default: + ss_dassert(false); + return "null"; + } +} + +AvroConverter::AvroConverter(std::string avrodir, uint64_t block_size, mxs_avro_codec_type codec): + m_avrodir(avrodir), + m_block_size(block_size), + m_codec(codec) +{ +} + +bool AvroConverter::open_table(const STableMapEvent& map, const STableCreateEvent& create) +{ + bool rval = false; + char* json_schema = json_new_schema_from_table(map, create); + + if (json_schema) + { + char filepath[PATH_MAX + 1]; + snprintf(filepath, sizeof(filepath), "%s/%s.%s.%06d.avro", m_avrodir.c_str(), + map->database.c_str(), map->table.c_str(), map->version); + + SAvroTable avro_table(avro_table_alloc(filepath, json_schema, + codec_to_string(m_codec), + m_block_size)); + + if (avro_table) + { + m_open_tables[map->database + "." + map->table] = avro_table; + save_avro_schema(m_avrodir.c_str(), json_schema, map, create); + m_map = map; + m_create = create; + rval = true; + } + else + { + MXS_ERROR("Failed to open new Avro file for writing."); + } + MXS_FREE(json_schema); + } + else + { + MXS_ERROR("Failed to create JSON schema."); + } + + return rval; +} + +bool AvroConverter::prepare_table(std::string database, std::string table) +{ + bool rval = false; + auto it = m_open_tables.find(database + "." + table); + + if (it != m_open_tables.end()) + { + m_writer_iface = it->second->avro_writer_iface; + m_avro_file = &it->second->avro_file; + rval = true; + } + + return rval; +} + +void AvroConverter::flush_tables() +{ + for (auto it = m_open_tables.begin(); it != m_open_tables.end(); it++) + { + avro_file_writer_flush(it->second->avro_file); + } +} + +void AvroConverter::prepare_row(const gtid_pos_t& gtid, const REP_HEADER& hdr, int event_type) +{ + avro_generic_value_new(m_writer_iface, &m_record); + avro_value_get_by_name(&m_record, avro_domain, &m_field, NULL); + avro_value_set_int(&m_field, gtid.domain); + + avro_value_get_by_name(&m_record, avro_server_id, &m_field, NULL); + avro_value_set_int(&m_field, gtid.server_id); + + avro_value_get_by_name(&m_record, avro_sequence, &m_field, NULL); + avro_value_set_int(&m_field, gtid.seq); + + avro_value_get_by_name(&m_record, avro_event_number, &m_field, NULL); + avro_value_set_int(&m_field, gtid.event_num); + + avro_value_get_by_name(&m_record, avro_timestamp, &m_field, NULL); + avro_value_set_int(&m_field, hdr.timestamp); + + avro_value_get_by_name(&m_record, avro_event_type, &m_field, NULL); + avro_value_set_enum(&m_field, event_type); +} + +bool AvroConverter::commit(const gtid_pos_t& gtid) +{ + bool rval = true; + + if (avro_file_writer_append_value(*m_avro_file, &m_record)) + { + MXS_ERROR("Failed to write value: %s", avro_strerror()); + rval = false; + } + + return rval; +} + +void AvroConverter::column(int i, int32_t value) +{ + set_active(i); + avro_value_set_int(&m_field, value); +} + +void AvroConverter::column(int i, int64_t value) +{ + set_active(i); + avro_value_set_long(&m_field, value); +} + +void AvroConverter::column(int i, float value) +{ + set_active(i); + avro_value_set_float(&m_field, value); +} + +void AvroConverter::column(int i, double value) +{ + set_active(i); + avro_value_set_double(&m_field, value); +} + +void AvroConverter::column(int i, std::string value) +{ + set_active(i); + avro_value_set_string(&m_field, value.c_str()); +} + +void AvroConverter::column(int i, uint8_t* value, int len) +{ + set_active(i); + avro_value_set_bytes(&m_field, value, len); +} + +void AvroConverter::column(int i) +{ + set_active(i); + + if (column_is_blob(m_map->column_types[i])) + { + uint8_t nullvalue = 0; + avro_value_set_bytes(&m_field, &nullvalue, 1); + } + else + { + avro_value_set_null(&m_field); + } +} + +void AvroConverter::set_active(int i) +{ + ss_debug(int rc =)avro_value_get_by_name(&m_record, m_create->columns[i].name.c_str(), + &m_field, NULL); + ss_dassert(rc == 0); +} diff --git a/server/modules/routing/avrorouter/avro_converter.hh b/server/modules/routing/avrorouter/avro_converter.hh new file mode 100644 index 000000000..ff43d622f --- /dev/null +++ b/server/modules/routing/avrorouter/avro_converter.hh @@ -0,0 +1,77 @@ +#pragma once +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2020-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include "rpl_events.hh" +#include "avrorouter.hh" + +#include + +struct AvroTable +{ + AvroTable(avro_file_writer_t file, avro_value_iface_t* iface, avro_schema_t schema): + avro_file(file), + avro_writer_iface(iface), + avro_schema(schema) + { + } + + ~AvroTable() + { + avro_file_writer_flush(avro_file); + avro_file_writer_close(avro_file); + avro_value_iface_decref(avro_writer_iface); + avro_schema_decref(avro_schema); + } + + avro_file_writer_t avro_file; /*< Current Avro data file */ + avro_value_iface_t* avro_writer_iface; /*< Avro C API writer interface */ + avro_schema_t avro_schema; /*< Native Avro schema of the table */ +}; + +typedef std::tr1::shared_ptr SAvroTable; +typedef std::tr1::unordered_map AvroTables; + +// Converts replicated events into CDC events +class AvroConverter : public RowEventHandler +{ +public: + + AvroConverter(std::string avrodir, uint64_t block_size, mxs_avro_codec_type codec); + bool open_table(const STableMapEvent& map, const STableCreateEvent& create); + bool prepare_table(std::string database, std::string table); + void flush_tables(); + void prepare_row(const gtid_pos_t& gtid, const REP_HEADER& hdr, int event_type); + bool commit(const gtid_pos_t& gtid); + void column(int i, int32_t value); + void column(int i, int64_t value); + void column(int i, float value); + void column(int i, double value); + void column(int i, std::string value); + void column(int i, uint8_t* value, int len); + void column(int i); + +private: + avro_value_iface_t* m_writer_iface; + avro_file_writer_t* m_avro_file; + avro_value_t m_record; + avro_value_t m_field; + std::string m_avrodir; + AvroTables m_open_tables; + uint64_t m_block_size; + mxs_avro_codec_type m_codec; + STableMapEvent m_map; + STableCreateEvent m_create; + + void set_active(int i); +}; diff --git a/server/modules/routing/avrorouter/avro_file.cc b/server/modules/routing/avrorouter/avro_file.cc index d02f6b439..c2c60b777 100644 --- a/server/modules/routing/avrorouter/avro_file.cc +++ b/server/modules/routing/avrorouter/avro_file.cc @@ -91,68 +91,6 @@ void avro_close_binlog(int fd) close(fd); } -/** - * @brief Allocate an Avro table - * - * Create an Aro table and prepare it for writing. - * @param filepath Path to the created file - * @param json_schema The schema of the table in JSON format - */ -AvroTable* avro_table_alloc(const char* filepath, const char* json_schema, const char *codec, - size_t block_size) -{ - avro_file_writer_t avro_file; - avro_value_iface_t* avro_writer_iface; - avro_schema_t avro_schema; - - if (avro_schema_from_json_length(json_schema, strlen(json_schema), - &avro_schema)) - { - MXS_ERROR("Avro error: %s", avro_strerror()); - MXS_INFO("Avro schema: %s", json_schema); - return NULL; - } - - int rc = 0; - - if (access(filepath, F_OK) == 0) - { - rc = avro_file_writer_open_bs(filepath, &avro_file, block_size); - } - else - { - rc = avro_file_writer_create_with_codec(filepath, avro_schema, - &avro_file, codec, block_size); - } - - if (rc) - { - MXS_ERROR("Avro error: %s", avro_strerror()); - avro_schema_decref(avro_schema); - return NULL; - } - - if ((avro_writer_iface = avro_generic_class_from_schema(avro_schema)) == NULL) - { - MXS_ERROR("Avro error: %s", avro_strerror()); - avro_schema_decref(avro_schema); - avro_file_writer_close(avro_file); - return NULL; - } - - AvroTable* table = new (std::nothrow) AvroTable(avro_file, avro_writer_iface, avro_schema); - - if (!table) - { - avro_file_writer_close(avro_file); - avro_value_iface_decref(avro_writer_iface); - avro_schema_decref(avro_schema); - MXS_OOM(); - } - - return table; -} - /** * @brief Write a new ini file with current conversion status * @@ -448,7 +386,7 @@ void notify_all_clients(Avro *router) void do_checkpoint(Avro *router) { update_used_tables(router); - avro_flush_all_tables(router, AVROROUTER_FLUSH); + router->event_hander->flush_tables(); avro_save_conversion_state(router); notify_all_clients(router); router->row_count = router->trx_count = 0; @@ -585,13 +523,9 @@ void handle_one_event(Avro* router, uint8_t* ptr, REP_HEADER& hdr, uint64_t& pos } else if (hdr.event_type == MARIADB10_GTID_EVENT) { - uint64_t n_sequence; /* 8 bytes */ - uint32_t domainid; /* 4 bytes */ - n_sequence = extract_field(ptr, 64); - domainid = extract_field(ptr + 8, 32); - router->gtid.domain = domainid; + router->gtid.domain = extract_field(ptr + 8, 32); router->gtid.server_id = hdr.serverid; - router->gtid.seq = n_sequence; + router->gtid.seq = extract_field(ptr, 64); router->gtid.event_num = 0; router->gtid.timestamp = hdr.timestamp; } @@ -788,26 +722,6 @@ void avro_load_metadata_from_schemas(Avro *router) globfree(&files); } -/** - * @brief Flush all Avro records to disk - * @param router Avro router instance - */ -void avro_flush_all_tables(Avro *router, enum avrorouter_file_op flush) -{ - for (auto it = router->open_tables.begin(); it != router->open_tables.end(); it++) - { - if (flush == AVROROUTER_FLUSH) - { - avro_file_writer_flush(it->second->avro_file); - } - else - { - ss_dassert(flush == AVROROUTER_SYNC); - avro_file_writer_sync(it->second->avro_file); - } - } -} - /** * @brief Detection of table creation statements * @param router Avro router instance diff --git a/server/modules/routing/avrorouter/avro_main.cc b/server/modules/routing/avrorouter/avro_main.cc index 701c95658..356652f8d 100644 --- a/server/modules/routing/avrorouter/avro_main.cc +++ b/server/modules/routing/avrorouter/avro_main.cc @@ -13,7 +13,6 @@ #include "avrorouter.hh" -#include #include #include #include @@ -24,7 +23,6 @@ #include #include #include -#include #include #include #include @@ -286,7 +284,7 @@ bool converter_func(Worker::Call::action_t action, Avro* router) /** We reached end of file, flush unwritten records to disk */ if (progress) { - avro_flush_all_tables(router, AVROROUTER_FLUSH); + router->event_hander->flush_tables(); avro_save_conversion_state(router); logged = false; } diff --git a/server/modules/routing/avrorouter/avro_rbr.cc b/server/modules/routing/avrorouter/avro_rbr.cc index dac01a9d6..a5392a285 100644 --- a/server/modules/routing/avrorouter/avro_rbr.cc +++ b/server/modules/routing/avrorouter/avro_rbr.cc @@ -33,122 +33,6 @@ static bool warn_large_enumset = false; /**< Remove when support for ENUM/SET va void notify_all_clients(Avro *router); void add_used_table(Avro* router, const char* table); -class AvroConverter : public RowEventHandler -{ -public: - - AvroConverter(const STableMapEvent& map, const STableCreateEvent& create, SAvroTable table): - RowEventHandler(map, create), - m_writer_iface(table->avro_writer_iface), - m_avro_file(table->avro_file) - { - avro_generic_value_new(m_writer_iface, &m_record); - } - - ~AvroConverter() - { - avro_value_decref(&m_record); - } - - void prepare(const gtid_pos_t& gtid, const REP_HEADER& hdr, int event_type) - { - avro_value_get_by_name(&m_record, avro_domain, &m_field, NULL); - avro_value_set_int(&m_field, gtid.domain); - - avro_value_get_by_name(&m_record, avro_server_id, &m_field, NULL); - avro_value_set_int(&m_field, gtid.server_id); - - avro_value_get_by_name(&m_record, avro_sequence, &m_field, NULL); - avro_value_set_int(&m_field, gtid.seq); - - avro_value_get_by_name(&m_record, avro_event_number, &m_field, NULL); - avro_value_set_int(&m_field, gtid.event_num); - - avro_value_get_by_name(&m_record, avro_timestamp, &m_field, NULL); - avro_value_set_int(&m_field, hdr.timestamp); - - avro_value_get_by_name(&m_record, avro_event_type, &m_field, NULL); - avro_value_set_enum(&m_field, event_type); - } - - bool commit() - { - bool rval = true; - - if (avro_file_writer_append_value(m_avro_file, &m_record)) - { - MXS_ERROR("Failed to write value: %s", avro_strerror()); - rval = false; - } - - return rval; - } - - void column(int i, int32_t value) - { - set_active(i); - avro_value_set_int(&m_field, value); - } - - void column(int i, int64_t value) - { - set_active(i); - avro_value_set_long(&m_field, value); - } - - void column(int i, float value) - { - set_active(i); - avro_value_set_float(&m_field, value); - } - - void column(int i, double value) - { - set_active(i); - avro_value_set_double(&m_field, value); - } - - void column(int i, std::string value) - { - set_active(i); - avro_value_set_string(&m_field, value.c_str()); - } - - void column(int i, uint8_t* value, int len) - { - set_active(i); - avro_value_set_bytes(&m_field, value, len); - } - - void column(int i) - { - set_active(i); - - if (column_is_blob(m_map->column_types[i])) - { - uint8_t nullvalue = 0; - avro_value_set_bytes(&m_field, &nullvalue, 1); - } - else - { - avro_value_set_null(&m_field); - } - } - -private: - avro_value_iface_t* m_writer_iface; - avro_file_writer_t& m_avro_file; - avro_value_t m_record; - avro_value_t m_field; - - void set_active(int i) - { - ss_debug(int rc =)avro_value_get_by_name(&m_record, m_create->columns[i].name.c_str(), - &m_field, NULL); - ss_dassert(rc == 0); - } -}; - uint8_t* process_row_event_data(STableMapEvent map, STableCreateEvent create, RowEventHandler* conv, uint8_t *ptr, @@ -185,22 +69,6 @@ static int get_event_type(uint8_t event) } } -static const char* codec_to_string(enum mxs_avro_codec_type type) -{ - switch (type) - { - case MXS_AVRO_CODEC_NULL: - return "null"; - case MXS_AVRO_CODEC_DEFLATE: - return "deflate"; - case MXS_AVRO_CODEC_SNAPPY: - return "snappy"; - default: - ss_dassert(false); - return "null"; - } -} - /** * @brief Handle a table map event * @@ -238,50 +106,28 @@ bool handle_table_map_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr) } } - char* json_schema = json_new_schema_from_table(map, create->second); - - if (json_schema) + if (router->event_hander->open_table(map, create->second)) { - char filepath[PATH_MAX + 1]; - snprintf(filepath, sizeof(filepath), "%s/%s.%06d.avro", - router->avrodir.c_str(), table_ident, map->version); + create->second->was_used = true; - SAvroTable avro_table(avro_table_alloc(filepath, json_schema, - codec_to_string(router->codec), - router->block_size)); + auto old = router->table_maps.find(table_ident); + bool notify = old != router->table_maps.end(); - if (avro_table) + if (notify) { - auto old = router->table_maps.find(table_ident); - bool notify = old != router->table_maps.end(); - - if (notify) - { - router->active_maps.erase(old->second->id); - } - - router->table_maps[table_ident] = map; - router->open_tables[table_ident] = avro_table; - save_avro_schema(router->avrodir.c_str(), json_schema, map, create->second); - router->active_maps[map->id] = map; - ss_dassert(router->active_maps[id] == map); - MXS_DEBUG("Table %s mapped to %lu", table_ident, map->id); - rval = true; - - if (notify) - { - notify_all_clients(router); - } + router->active_maps.erase(old->second->id); } - else + + router->table_maps[table_ident] = map; + router->active_maps[map->id] = map; + ss_dassert(router->active_maps[id] == map); + MXS_DEBUG("Table %s mapped to %lu", table_ident, map->id); + rval = true; + + if (notify) { - MXS_ERROR("Failed to open new Avro file for writing."); + notify_all_clients(router); } - MXS_FREE(json_schema); - } - else - { - MXS_ERROR("Failed to create JSON schema."); } } else @@ -378,21 +224,13 @@ bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr) STableMapEvent map = it->second; char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2]; snprintf(table_ident, sizeof(table_ident), "%s.%s", map->database.c_str(), map->table.c_str()); - SAvroTable table; - auto it = router->open_tables.find(table_ident); - - if (it != router->open_tables.end()) - { - table = it->second; - } + bool ok = router->event_hander->prepare_table(map->database, map->table); auto create = router->created_tables.find(table_ident); - if (table && create != router->created_tables.end() && + if (ok && create != router->created_tables.end() && ncolumns == map->columns() && create->second->columns.size() == map->columns()) { - AvroConverter conv(map, create->second, table); - /** Each event has one or more rows in it. The number of rows is not known * beforehand so we must continue processing them until we reach the end * of the event. */ @@ -407,18 +245,18 @@ bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr) // Increment the event count for this transaction router->gtid.event_num++; - conv.prepare(router->gtid, *hdr, event_type); - ptr = process_row_event_data(map, create->second, &conv, ptr, col_present, end); - conv.commit(); + router->event_hander->prepare_row(router->gtid, *hdr, event_type); + ptr = process_row_event_data(map, create->second, router->event_hander, ptr, col_present, end); + router->event_hander->commit(router->gtid); /** Update rows events have the before and after images of the * affected rows so we'll process them as another record with * a different type */ if (event_type == UPDATE_EVENT) { - conv.prepare(router->gtid, *hdr, UPDATE_EVENT_AFTER); - ptr = process_row_event_data(map, create->second, &conv, ptr, col_present, end); - conv.commit(); + router->event_hander->prepare_row(router->gtid, *hdr, UPDATE_EVENT_AFTER); + ptr = process_row_event_data(map, create->second, router->event_hander, ptr, col_present, end); + router->event_hander->commit(router->gtid); } rows++; @@ -427,7 +265,7 @@ bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr) add_used_table(router, table_ident); rval = true; } - else if (!table) + else if (!ok) { MXS_ERROR("Avro file handle was not found for table %s.%s. See earlier" " errors for more details.", map->database.c_str(), map->table.c_str()); diff --git a/server/modules/routing/avrorouter/avro_schema.cc b/server/modules/routing/avrorouter/avro_schema.cc index 9cf096265..28ed33d05 100644 --- a/server/modules/routing/avrorouter/avro_schema.cc +++ b/server/modules/routing/avrorouter/avro_schema.cc @@ -30,112 +30,6 @@ #include #include -/** - * @brief Convert the MySQL column type to a compatible Avro type - * - * Some fields are larger than they need to be but since the Avro integer - * compression is quite efficient, the real loss in performance is negligible. - * @param type MySQL column type - * @return String representation of the Avro type - */ -static const char* column_type_to_avro_type(uint8_t type) -{ - switch (type) - { - case TABLE_COL_TYPE_TINY: - case TABLE_COL_TYPE_SHORT: - case TABLE_COL_TYPE_LONG: - case TABLE_COL_TYPE_INT24: - case TABLE_COL_TYPE_BIT: - return "int"; - - case TABLE_COL_TYPE_FLOAT: - return "float"; - - case TABLE_COL_TYPE_DOUBLE: - case TABLE_COL_TYPE_NEWDECIMAL: - return "double"; - - case TABLE_COL_TYPE_NULL: - return "null"; - - case TABLE_COL_TYPE_LONGLONG: - return "long"; - - case TABLE_COL_TYPE_TINY_BLOB: - case TABLE_COL_TYPE_MEDIUM_BLOB: - case TABLE_COL_TYPE_LONG_BLOB: - case TABLE_COL_TYPE_BLOB: - return "bytes"; - - default: - return "string"; - } -} - -/** - * @brief Create a new JSON Avro schema from the table map and create table abstractions - * - * The schema will always have a GTID field and all records contain the current - * GTID of the transaction. - * @param map TABLE_MAP for this table - * @param create The TABLE_CREATE for this table - * @return New schema or NULL if an error occurred - */ -char* json_new_schema_from_table(const STableMapEvent& map, const STableCreateEvent& create) -{ - if (map->version != create->version) - { - MXS_ERROR("Version mismatch for table %s.%s. Table map version is %d and " - "the table definition version is %d.", map->database.c_str(), - map->table.c_str(), map->version, create->version); - ss_dassert(!true); // Should not happen - return NULL; - } - - json_error_t err; - memset(&err, 0, sizeof(err)); - json_t *schema = json_object(); - json_object_set_new(schema, "namespace", json_string("MaxScaleChangeDataSchema.avro")); - json_object_set_new(schema, "type", json_string("record")); - json_object_set_new(schema, "name", json_string("ChangeRecord")); - - json_t *array = json_array(); - json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name", - avro_domain, "type", "int")); - json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name", - avro_server_id, "type", "int")); - json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name", - avro_sequence, "type", "int")); - json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name", - avro_event_number, "type", "int")); - json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name", - avro_timestamp, "type", "int")); - - /** Enums and other complex types are defined with complete JSON objects - * instead of string values */ - json_t *event_types = json_pack_ex(&err, 0, "{s:s, s:s, s:[s,s,s,s]}", "type", "enum", - "name", "EVENT_TYPES", "symbols", "insert", - "update_before", "update_after", "delete"); - - // Ownership of `event_types` is stolen when using the `o` format - json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:o}", "name", avro_event_type, - "type", event_types)); - - for (uint64_t i = 0; i < map->columns() && i < create->columns.size(); i++) - { - json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s, s:s, s:i}", - "name", create->columns[i].name.c_str(), - "type", column_type_to_avro_type(map->column_types[i]), - "real_type", create->columns[i].type.c_str(), - "length", create->columns[i].length)); - } - json_object_set_new(schema, "fields", array); - char* rval = json_dumps(schema, JSON_PRESERVE_ORDER); - json_decref(schema); - return rval; -} - /** * @brief Check whether the field is one that was generated by the avrorouter * @@ -242,34 +136,6 @@ bool json_extract_field_names(const char* filename, std::vector& columns return rval; } -/** - * @brief Save the Avro schema of a table to disk - * - * @param path Schema directory - * @param schema Schema in JSON format - * @param map Table map that @p schema represents - */ -void save_avro_schema(const char *path, const char* schema, STableMapEvent& map, STableCreateEvent& create) -{ - char filepath[PATH_MAX]; - snprintf(filepath, sizeof(filepath), "%s/%s.%s.%06d.avsc", path, - map->database.c_str(), map->table.c_str(), map->version); - - if (access(filepath, F_OK) != 0) - { - if (!create->was_used) - { - FILE *file = fopen(filepath, "wb"); - if (file) - { - fprintf(file, "%s\n", schema); - create->was_used = true; - fclose(file); - } - } - } -} - /** * Extract the table definition from a CREATE TABLE statement * @param sql The SQL statement diff --git a/server/modules/routing/avrorouter/avrorouter.hh b/server/modules/routing/avrorouter/avrorouter.hh index e8bea0bb4..a77106f72 100644 --- a/server/modules/routing/avrorouter/avrorouter.hh +++ b/server/modules/routing/avrorouter/avrorouter.hh @@ -19,14 +19,12 @@ #include #include #include -#include #include #include #include #include #include #include -#include #include #include #include @@ -123,28 +121,6 @@ typedef enum avro_binlog_end /** How many bytes each thread tries to send */ #define AVRO_DATA_BURST_SIZE (32 * 1024) -struct AvroTable -{ - AvroTable(avro_file_writer_t file, avro_value_iface_t* iface, avro_schema_t schema): - avro_file(file), - avro_writer_iface(iface), - avro_schema(schema) - { - } - - ~AvroTable() - { - avro_file_writer_flush(avro_file); - avro_file_writer_close(avro_file); - avro_value_iface_decref(avro_writer_iface); - avro_schema_decref(avro_schema); - } - - avro_file_writer_t avro_file; /*< Current Avro data file */ - avro_value_iface_t* avro_writer_iface; /*< Avro C API writer interface */ - avro_schema_t avro_schema; /*< Native Avro schema of the table */ -}; - /** Data format used when streaming data to the clients */ enum avro_data_format { @@ -169,10 +145,7 @@ static const MXS_ENUM_VALUE codec_values[] = {NULL} }; -typedef std::tr1::shared_ptr SAvroTable; - typedef std::tr1::unordered_map CreatedTables; -typedef std::tr1::unordered_map AvroTables; typedef std::tr1::unordered_map MappedTables; typedef std::tr1::unordered_map ActiveMaps; @@ -199,7 +172,6 @@ public: gtid_pos_t gtid; ActiveMaps active_maps; MappedTables table_maps; - AvroTables open_tables; CreatedTables created_tables; uint64_t trx_count; /*< Transactions processed */ uint64_t trx_target; /*< Minimum about of transactions that will trigger @@ -207,10 +179,9 @@ public: uint64_t row_count; /*< Row events processed */ uint64_t row_target; /*< Minimum about of row events that will trigger * a flush of all tables */ - uint64_t block_size; /**< Avro datablock size */ - enum mxs_avro_codec_type codec; /**< Avro codec type, defaults to `null` */ sqlite3* sqlite_handle; uint32_t task_handle; /**< Delayed task handle */ + RowEventHandler* event_hander; struct { @@ -288,10 +259,7 @@ extern void avro_client_rotate(Avro *router, AvroSession *client, uint8_t *ptr); extern bool avro_open_binlog(const char *binlogdir, const char *file, int *fd); extern void avro_close_binlog(int fd); extern avro_binlog_end_t avro_read_all_events(Avro *router); -extern AvroTable* avro_table_alloc(const char* filepath, const char* json_schema, - const char *codec, size_t block_size); extern char* json_new_schema_from_table(const STableMapEvent& map, const STableCreateEvent& create); -extern void save_avro_schema(const char *path, const char* schema, STableMapEvent& map, STableCreateEvent& create); extern bool handle_table_map_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr); extern bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr); void handle_one_event(Avro* router, uint8_t* ptr, REP_HEADER& hdr, uint64_t& pos); @@ -299,20 +267,6 @@ REP_HEADER construct_header(uint8_t* ptr); bool avro_save_conversion_state(Avro *router); void avro_update_index(Avro* router); -enum avrorouter_file_op -{ - AVROROUTER_SYNC, - AVROROUTER_FLUSH -}; - -/** - * @brief Flush or sync all tables - * - * @param router Router instance - * @param flush AVROROUTER_SYNC for sync only or AVROROUTER_FLUSH for full flush - */ -extern void avro_flush_all_tables(Avro *router, enum avrorouter_file_op flush); - #define AVRO_CLIENT_UNREGISTERED 0x0000 #define AVRO_CLIENT_REGISTERED 0x0001 #define AVRO_CLIENT_REQUEST_DATA 0x0002 diff --git a/server/modules/routing/avrorouter/rpl_events.hh b/server/modules/routing/avrorouter/rpl_events.hh index 0e3d5db33..df8ff43e3 100644 --- a/server/modules/routing/avrorouter/rpl_events.hh +++ b/server/modules/routing/avrorouter/rpl_events.hh @@ -15,6 +15,10 @@ #include #include #include +#include +#include + +#include typedef std::vector Bytes; @@ -113,21 +117,32 @@ typedef std::tr1::shared_ptr STableMapEvent; class RowEventHandler { public: - RowEventHandler(const STableMapEvent& map, const STableCreateEvent& create): - m_map(map), - m_create(create) - { - } - virtual ~RowEventHandler() { } + // A table was opened + virtual bool open_table(const STableMapEvent& map, const STableCreateEvent& create) + { + return true; + } + // Prepare a new row for processing - virtual void prepare(const gtid_pos_t& gtid, const REP_HEADER& hdr, int event_type) = 0; + virtual bool prepare_table(std::string database, std::string table) + { + return true; + } + + // Flush open tables + virtual void flush_tables() + { + } + + // Prepare a new row for processing + virtual void prepare_row(const gtid_pos_t& gtid, const REP_HEADER& hdr, int event_type) = 0; // Called once all columns are processed - virtual bool commit() = 0; + virtual bool commit(const gtid_pos_t& gtid) = 0; // 32-bit integer handler virtual void column(int i, int32_t value) = 0; @@ -149,8 +164,4 @@ public: // Empty (NULL) value type handler virtual void column(int i) = 0; - -protected: - const STableMapEvent& m_map; // The table map event for this row - const STableCreateEvent& m_create; // The CREATE TABLE statement for this row };