From 733dc491d9d60e23e7bf76141c153048282f3bb1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Sun, 7 Apr 2019 11:33:24 +0300 Subject: [PATCH] Integrate Replicator into avrorouter Took the Replicator into use in avrorouter as an alternative to the binlogrouter based setup. This also allows the avrorouter to automatically handle master failovers and to start replication from GTID coordinates. --- .../modules/routing/avrorouter/CMakeLists.txt | 2 +- server/modules/routing/avrorouter/avro.cc | 43 +++++++++++++------ .../modules/routing/avrorouter/avro_file.cc | 5 ++- .../modules/routing/avrorouter/avro_main.cc | 34 ++++++--------- server/modules/routing/avrorouter/avro_rbr.cc | 2 +- .../modules/routing/avrorouter/avrorouter.hh | 7 ++- 6 files changed, 55 insertions(+), 38 deletions(-) diff --git a/server/modules/routing/avrorouter/CMakeLists.txt b/server/modules/routing/avrorouter/CMakeLists.txt index 9d640104d..363950609 100644 --- a/server/modules/routing/avrorouter/CMakeLists.txt +++ b/server/modules/routing/avrorouter/CMakeLists.txt @@ -4,7 +4,7 @@ if(AVRO_FOUND AND JANSSON_FOUND) # The common avrorouter functionality add_library(avro-common SHARED avro.cc ../binlogrouter/binlog_common.cc avro_client.cc - avro_schema.cc avro_rbr.cc avro_file.cc avro_converter.cc rpl.cc) + avro_schema.cc avro_rbr.cc avro_file.cc avro_converter.cc rpl.cc replicator.cc sql.cc) set_target_properties(avro-common PROPERTIES VERSION "1.0.0" LINK_FLAGS -Wl,-z,defs) target_link_libraries(avro-common maxscale-common ${JANSSON_LIBRARIES} ${AVRO_LIBRARIES} maxavro lzma) install_module(avro-common core) diff --git a/server/modules/routing/avrorouter/avro.cc b/server/modules/routing/avrorouter/avro.cc index 1ac50d0a8..6d460fc1a 100644 --- a/server/modules/routing/avrorouter/avro.cc +++ b/server/modules/routing/avrorouter/avro.cc @@ -133,22 +133,41 @@ Avro::Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, SERVICE* source, SRow , handler(service, handler, params->get_compiled_regex("match", 0, NULL).release(), params->get_compiled_regex("exclude", 0, NULL).release()) { - if (source) + if (params->contains(CN_SERVERS)) { - read_source_service_options(source); + MXS_NOTICE("Replicating directly from a master server"); + cdc::Config cnf; + cnf.service = service; + cnf.statedir = avrodir; + cnf.server_id = params->get_integer("server_id"); + cnf.gtid = params->get_string("gtid_start_pos"); + + auto worker = mxs::RoutingWorker::get(mxs::RoutingWorker::MAIN); + worker->execute([this, cnf]() { + m_replicator = cdc::Replicator::start(cnf, &this->handler); + mxb_assert(m_replicator); + }, mxs::RoutingWorker::EXECUTE_QUEUED); + } + else + { + if (source) + { + read_source_service_options(source); + } + + char filename[BINLOG_FNAMELEN + 1]; + snprintf(filename, + sizeof(filename), + BINLOG_NAMEFMT, + filestem.c_str(), + static_cast(params->get_integer("start_index"))); + binlog_name = filename; + + MXS_NOTICE("Reading MySQL binlog files from %s", binlogdir.c_str()); + MXS_NOTICE("First binlog is: %s", binlog_name.c_str()); } - char filename[BINLOG_FNAMELEN + 1]; - snprintf(filename, - sizeof(filename), - BINLOG_NAMEFMT, - filestem.c_str(), - static_cast(params->get_integer("start_index"))); - binlog_name = filename; - - MXS_NOTICE("Reading MySQL binlog files from %s", binlogdir.c_str()); MXS_NOTICE("Avro files stored at: %s", avrodir.c_str()); - MXS_NOTICE("First binlog is: %s", binlog_name.c_str()); // TODO: Do these in Avro::create avro_load_conversion_state(this); diff --git a/server/modules/routing/avrorouter/avro_file.cc b/server/modules/routing/avrorouter/avro_file.cc index efe46fac3..946b70827 100644 --- a/server/modules/routing/avrorouter/avro_file.cc +++ b/server/modules/routing/avrorouter/avro_file.cc @@ -384,7 +384,10 @@ bool notify_cb(DCB* dcb, void* data) void notify_all_clients(SERVICE* service) { - dcb_foreach(notify_cb, service); + auto worker = mxs::RoutingWorker::get(mxs::RoutingWorker::MAIN); + worker->execute([service]() { + dcb_foreach(notify_cb, service); + }, mxs::RoutingWorker::EXECUTE_AUTO); } void do_checkpoint(Avro* router) diff --git a/server/modules/routing/avrorouter/avro_main.cc b/server/modules/routing/avrorouter/avro_main.cc index 2e240ae0f..634df31fc 100644 --- a/server/modules/routing/avrorouter/avro_main.cc +++ b/server/modules/routing/avrorouter/avro_main.cc @@ -62,13 +62,13 @@ MXS_ROUTER* createInstance(SERVICE* service, MXS_CONFIG_PARAMETER* params) { uint64_t block_size = service->svc_config_param.get_size("block_size"); mxs_avro_codec_type codec = static_cast( - service->svc_config_param.get_enum("codec", codec_values)); + service->svc_config_param.get_enum("codec", codec_values)); std::string avrodir = service->svc_config_param.get_string("avrodir"); SRowEventHandler handler(new AvroConverter(avrodir, block_size, codec)); Avro* router = Avro::create(service, handler); - if (router) + if (router && !params->contains(CN_SERVERS)) { conversion_task_ctl(router, true); } @@ -525,25 +525,17 @@ extern "C" MXS_MODULE* MXS_CREATE_MODULE() | MXS_MODULE_OPT_PATH_X_OK | MXS_MODULE_OPT_PATH_CREAT }, - {"source", - MXS_MODULE_PARAM_SERVICE}, - {"filestem", MXS_MODULE_PARAM_STRING, - BINLOG_NAME_ROOT}, - {"group_rows", MXS_MODULE_PARAM_COUNT, - "1000"}, - {"group_trx", MXS_MODULE_PARAM_COUNT, - "1"}, - {"start_index", MXS_MODULE_PARAM_COUNT, - "1"}, - {"block_size", MXS_MODULE_PARAM_SIZE, - "0"}, - {"codec", MXS_MODULE_PARAM_ENUM, "null", - MXS_MODULE_OPT_ENUM_UNIQUE, - codec_values}, - {"match", - MXS_MODULE_PARAM_REGEX}, - {"exclude", - MXS_MODULE_PARAM_REGEX}, + {"source", MXS_MODULE_PARAM_SERVICE}, + {"filestem", MXS_MODULE_PARAM_STRING, BINLOG_NAME_ROOT}, + {"group_rows", MXS_MODULE_PARAM_COUNT, "1000"}, + {"group_trx", MXS_MODULE_PARAM_COUNT, "1"}, + {"start_index", MXS_MODULE_PARAM_COUNT, "1"}, + {"block_size", MXS_MODULE_PARAM_SIZE, "0"}, + {"codec", MXS_MODULE_PARAM_ENUM, "null", MXS_MODULE_OPT_ENUM_UNIQUE, codec_values}, + {"match", MXS_MODULE_PARAM_REGEX}, + {"exclude", MXS_MODULE_PARAM_REGEX}, + {"server_id", MXS_MODULE_PARAM_STRING, "1234"}, + {"gtid_start_pos", MXS_MODULE_PARAM_STRING}, {MXS_END_MODULE_PARAMS} } }; diff --git a/server/modules/routing/avrorouter/avro_rbr.cc b/server/modules/routing/avrorouter/avro_rbr.cc index f9c157662..e07d3a97e 100644 --- a/server/modules/routing/avrorouter/avro_rbr.cc +++ b/server/modules/routing/avrorouter/avro_rbr.cc @@ -998,7 +998,7 @@ void Rpl::handle_event(REP_HEADER hdr, uint8_t* ptr) { handle_row_event(&hdr, ptr); } - else if (hdr.event_type == MARIADB10_GTID_EVENT) + else if (hdr.event_type == GTID_EVENT) { m_gtid.extract(hdr, ptr); } diff --git a/server/modules/routing/avrorouter/avrorouter.hh b/server/modules/routing/avrorouter/avrorouter.hh index 05546925b..50b45fdfc 100644 --- a/server/modules/routing/avrorouter/avrorouter.hh +++ b/server/modules/routing/avrorouter/avrorouter.hh @@ -31,6 +31,7 @@ #include #include "rpl.hh" +#include "replicator.hh" MXS_BEGIN_DECLS @@ -104,7 +105,7 @@ enum mxs_avro_codec_type static const MXS_ENUM_VALUE codec_values[] = { - {"null", MXS_AVRO_CODEC_NULL }, + {"null", MXS_AVRO_CODEC_NULL}, {"deflate", MXS_AVRO_CODEC_DEFLATE}, // Not yet implemented // {"snappy", MXS_AVRO_CODEC_SNAPPY}, @@ -135,6 +136,8 @@ public: Rpl handler; private: + std::unique_ptr m_replicator; + Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, SERVICE* source, SRowEventHandler handler); void read_source_service_options(SERVICE* source); }; @@ -219,6 +222,6 @@ REP_HEADER construct_header(uint8_t* ptr); bool avro_save_conversion_state(Avro* router); bool avro_load_conversion_state(Avro* router); void avro_load_metadata_from_schemas(Avro* router); -void notify_all_clients(Avro* router); +void notify_all_clients(SERVICE* router); MXS_END_DECLS