diff --git a/server/modules/routing/avrorouter/CMakeLists.txt b/server/modules/routing/avrorouter/CMakeLists.txt index 9d640104d..363950609 100644 --- a/server/modules/routing/avrorouter/CMakeLists.txt +++ b/server/modules/routing/avrorouter/CMakeLists.txt @@ -4,7 +4,7 @@ if(AVRO_FOUND AND JANSSON_FOUND) # The common avrorouter functionality add_library(avro-common SHARED avro.cc ../binlogrouter/binlog_common.cc avro_client.cc - avro_schema.cc avro_rbr.cc avro_file.cc avro_converter.cc rpl.cc) + avro_schema.cc avro_rbr.cc avro_file.cc avro_converter.cc rpl.cc replicator.cc sql.cc) set_target_properties(avro-common PROPERTIES VERSION "1.0.0" LINK_FLAGS -Wl,-z,defs) target_link_libraries(avro-common maxscale-common ${JANSSON_LIBRARIES} ${AVRO_LIBRARIES} maxavro lzma) install_module(avro-common core) diff --git a/server/modules/routing/avrorouter/avro.cc b/server/modules/routing/avrorouter/avro.cc index 1ac50d0a8..6d460fc1a 100644 --- a/server/modules/routing/avrorouter/avro.cc +++ b/server/modules/routing/avrorouter/avro.cc @@ -133,22 +133,41 @@ Avro::Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, SERVICE* source, SRow , handler(service, handler, params->get_compiled_regex("match", 0, NULL).release(), params->get_compiled_regex("exclude", 0, NULL).release()) { - if (source) + if (params->contains(CN_SERVERS)) { - read_source_service_options(source); + MXS_NOTICE("Replicating directly from a master server"); + cdc::Config cnf; + cnf.service = service; + cnf.statedir = avrodir; + cnf.server_id = params->get_integer("server_id"); + cnf.gtid = params->get_string("gtid_start_pos"); + + auto worker = mxs::RoutingWorker::get(mxs::RoutingWorker::MAIN); + worker->execute([this, cnf]() { + m_replicator = cdc::Replicator::start(cnf, &this->handler); + mxb_assert(m_replicator); + }, mxs::RoutingWorker::EXECUTE_QUEUED); + } + else + { + if (source) + { + read_source_service_options(source); + } + + char filename[BINLOG_FNAMELEN + 1]; + snprintf(filename, + sizeof(filename), + BINLOG_NAMEFMT, + filestem.c_str(), + static_cast(params->get_integer("start_index"))); + binlog_name = filename; + + MXS_NOTICE("Reading MySQL binlog files from %s", binlogdir.c_str()); + MXS_NOTICE("First binlog is: %s", binlog_name.c_str()); } - char filename[BINLOG_FNAMELEN + 1]; - snprintf(filename, - sizeof(filename), - BINLOG_NAMEFMT, - filestem.c_str(), - static_cast(params->get_integer("start_index"))); - binlog_name = filename; - - MXS_NOTICE("Reading MySQL binlog files from %s", binlogdir.c_str()); MXS_NOTICE("Avro files stored at: %s", avrodir.c_str()); - MXS_NOTICE("First binlog is: %s", binlog_name.c_str()); // TODO: Do these in Avro::create avro_load_conversion_state(this); diff --git a/server/modules/routing/avrorouter/avro_file.cc b/server/modules/routing/avrorouter/avro_file.cc index efe46fac3..946b70827 100644 --- a/server/modules/routing/avrorouter/avro_file.cc +++ b/server/modules/routing/avrorouter/avro_file.cc @@ -384,7 +384,10 @@ bool notify_cb(DCB* dcb, void* data) void notify_all_clients(SERVICE* service) { - dcb_foreach(notify_cb, service); + auto worker = mxs::RoutingWorker::get(mxs::RoutingWorker::MAIN); + worker->execute([service]() { + dcb_foreach(notify_cb, service); + }, mxs::RoutingWorker::EXECUTE_AUTO); } void do_checkpoint(Avro* router) diff --git a/server/modules/routing/avrorouter/avro_main.cc b/server/modules/routing/avrorouter/avro_main.cc index 2e240ae0f..634df31fc 100644 --- a/server/modules/routing/avrorouter/avro_main.cc +++ b/server/modules/routing/avrorouter/avro_main.cc @@ -62,13 +62,13 @@ MXS_ROUTER* createInstance(SERVICE* service, MXS_CONFIG_PARAMETER* params) { uint64_t block_size = service->svc_config_param.get_size("block_size"); mxs_avro_codec_type codec = static_cast( - service->svc_config_param.get_enum("codec", codec_values)); + service->svc_config_param.get_enum("codec", codec_values)); std::string avrodir = service->svc_config_param.get_string("avrodir"); SRowEventHandler handler(new AvroConverter(avrodir, block_size, codec)); Avro* router = Avro::create(service, handler); - if (router) + if (router && !params->contains(CN_SERVERS)) { conversion_task_ctl(router, true); } @@ -525,25 +525,17 @@ extern "C" MXS_MODULE* MXS_CREATE_MODULE() | MXS_MODULE_OPT_PATH_X_OK | MXS_MODULE_OPT_PATH_CREAT }, - {"source", - MXS_MODULE_PARAM_SERVICE}, - {"filestem", MXS_MODULE_PARAM_STRING, - BINLOG_NAME_ROOT}, - {"group_rows", MXS_MODULE_PARAM_COUNT, - "1000"}, - {"group_trx", MXS_MODULE_PARAM_COUNT, - "1"}, - {"start_index", MXS_MODULE_PARAM_COUNT, - "1"}, - {"block_size", MXS_MODULE_PARAM_SIZE, - "0"}, - {"codec", MXS_MODULE_PARAM_ENUM, "null", - MXS_MODULE_OPT_ENUM_UNIQUE, - codec_values}, - {"match", - MXS_MODULE_PARAM_REGEX}, - {"exclude", - MXS_MODULE_PARAM_REGEX}, + {"source", MXS_MODULE_PARAM_SERVICE}, + {"filestem", MXS_MODULE_PARAM_STRING, BINLOG_NAME_ROOT}, + {"group_rows", MXS_MODULE_PARAM_COUNT, "1000"}, + {"group_trx", MXS_MODULE_PARAM_COUNT, "1"}, + {"start_index", MXS_MODULE_PARAM_COUNT, "1"}, + {"block_size", MXS_MODULE_PARAM_SIZE, "0"}, + {"codec", MXS_MODULE_PARAM_ENUM, "null", MXS_MODULE_OPT_ENUM_UNIQUE, codec_values}, + {"match", MXS_MODULE_PARAM_REGEX}, + {"exclude", MXS_MODULE_PARAM_REGEX}, + {"server_id", MXS_MODULE_PARAM_STRING, "1234"}, + {"gtid_start_pos", MXS_MODULE_PARAM_STRING}, {MXS_END_MODULE_PARAMS} } }; diff --git a/server/modules/routing/avrorouter/avro_rbr.cc b/server/modules/routing/avrorouter/avro_rbr.cc index f9c157662..e07d3a97e 100644 --- a/server/modules/routing/avrorouter/avro_rbr.cc +++ b/server/modules/routing/avrorouter/avro_rbr.cc @@ -998,7 +998,7 @@ void Rpl::handle_event(REP_HEADER hdr, uint8_t* ptr) { handle_row_event(&hdr, ptr); } - else if (hdr.event_type == MARIADB10_GTID_EVENT) + else if (hdr.event_type == GTID_EVENT) { m_gtid.extract(hdr, ptr); } diff --git a/server/modules/routing/avrorouter/avrorouter.hh b/server/modules/routing/avrorouter/avrorouter.hh index 05546925b..50b45fdfc 100644 --- a/server/modules/routing/avrorouter/avrorouter.hh +++ b/server/modules/routing/avrorouter/avrorouter.hh @@ -31,6 +31,7 @@ #include #include "rpl.hh" +#include "replicator.hh" MXS_BEGIN_DECLS @@ -104,7 +105,7 @@ enum mxs_avro_codec_type static const MXS_ENUM_VALUE codec_values[] = { - {"null", MXS_AVRO_CODEC_NULL }, + {"null", MXS_AVRO_CODEC_NULL}, {"deflate", MXS_AVRO_CODEC_DEFLATE}, // Not yet implemented // {"snappy", MXS_AVRO_CODEC_SNAPPY}, @@ -135,6 +136,8 @@ public: Rpl handler; private: + std::unique_ptr m_replicator; + Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, SERVICE* source, SRowEventHandler handler); void read_source_service_options(SERVICE* source); }; @@ -219,6 +222,6 @@ REP_HEADER construct_header(uint8_t* ptr); bool avro_save_conversion_state(Avro* router); bool avro_load_conversion_state(Avro* router); void avro_load_metadata_from_schemas(Avro* router); -void notify_all_clients(Avro* router); +void notify_all_clients(SERVICE* router); MXS_END_DECLS