Integrate Replicator into avrorouter

Took the Replicator into use in avrorouter as an alternative to the
binlogrouter based setup. This also allows the avrorouter to automatically
handle master failovers and to start replication from GTID coordinates.
This commit is contained in:
Markus Mäkelä 2019-04-07 11:33:24 +03:00
parent 7723e7c933
commit 733dc491d9
No known key found for this signature in database
GPG Key ID: 72D48FCE664F7B19
6 changed files with 55 additions and 38 deletions

View File

@ -4,7 +4,7 @@ if(AVRO_FOUND AND JANSSON_FOUND)
# The common avrorouter functionality
add_library(avro-common SHARED avro.cc ../binlogrouter/binlog_common.cc avro_client.cc
avro_schema.cc avro_rbr.cc avro_file.cc avro_converter.cc rpl.cc)
avro_schema.cc avro_rbr.cc avro_file.cc avro_converter.cc rpl.cc replicator.cc sql.cc)
set_target_properties(avro-common PROPERTIES VERSION "1.0.0" LINK_FLAGS -Wl,-z,defs)
target_link_libraries(avro-common maxscale-common ${JANSSON_LIBRARIES} ${AVRO_LIBRARIES} maxavro lzma)
install_module(avro-common core)

View File

@ -133,22 +133,41 @@ Avro::Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, SERVICE* source, SRow
, handler(service, handler, params->get_compiled_regex("match", 0, NULL).release(),
params->get_compiled_regex("exclude", 0, NULL).release())
{
if (source)
if (params->contains(CN_SERVERS))
{
read_source_service_options(source);
MXS_NOTICE("Replicating directly from a master server");
cdc::Config cnf;
cnf.service = service;
cnf.statedir = avrodir;
cnf.server_id = params->get_integer("server_id");
cnf.gtid = params->get_string("gtid_start_pos");
auto worker = mxs::RoutingWorker::get(mxs::RoutingWorker::MAIN);
worker->execute([this, cnf]() {
m_replicator = cdc::Replicator::start(cnf, &this->handler);
mxb_assert(m_replicator);
}, mxs::RoutingWorker::EXECUTE_QUEUED);
}
else
{
if (source)
{
read_source_service_options(source);
}
char filename[BINLOG_FNAMELEN + 1];
snprintf(filename,
sizeof(filename),
BINLOG_NAMEFMT,
filestem.c_str(),
static_cast<int>(params->get_integer("start_index")));
binlog_name = filename;
MXS_NOTICE("Reading MySQL binlog files from %s", binlogdir.c_str());
MXS_NOTICE("First binlog is: %s", binlog_name.c_str());
}
char filename[BINLOG_FNAMELEN + 1];
snprintf(filename,
sizeof(filename),
BINLOG_NAMEFMT,
filestem.c_str(),
static_cast<int>(params->get_integer("start_index")));
binlog_name = filename;
MXS_NOTICE("Reading MySQL binlog files from %s", binlogdir.c_str());
MXS_NOTICE("Avro files stored at: %s", avrodir.c_str());
MXS_NOTICE("First binlog is: %s", binlog_name.c_str());
// TODO: Do these in Avro::create
avro_load_conversion_state(this);

View File

@ -384,7 +384,10 @@ bool notify_cb(DCB* dcb, void* data)
void notify_all_clients(SERVICE* service)
{
dcb_foreach(notify_cb, service);
auto worker = mxs::RoutingWorker::get(mxs::RoutingWorker::MAIN);
worker->execute([service]() {
dcb_foreach(notify_cb, service);
}, mxs::RoutingWorker::EXECUTE_AUTO);
}
void do_checkpoint(Avro* router)

View File

@ -62,13 +62,13 @@ MXS_ROUTER* createInstance(SERVICE* service, MXS_CONFIG_PARAMETER* params)
{
uint64_t block_size = service->svc_config_param.get_size("block_size");
mxs_avro_codec_type codec = static_cast<mxs_avro_codec_type>(
service->svc_config_param.get_enum("codec", codec_values));
service->svc_config_param.get_enum("codec", codec_values));
std::string avrodir = service->svc_config_param.get_string("avrodir");
SRowEventHandler handler(new AvroConverter(avrodir, block_size, codec));
Avro* router = Avro::create(service, handler);
if (router)
if (router && !params->contains(CN_SERVERS))
{
conversion_task_ctl(router, true);
}
@ -525,25 +525,17 @@ extern "C" MXS_MODULE* MXS_CREATE_MODULE()
| MXS_MODULE_OPT_PATH_X_OK
| MXS_MODULE_OPT_PATH_CREAT
},
{"source",
MXS_MODULE_PARAM_SERVICE},
{"filestem", MXS_MODULE_PARAM_STRING,
BINLOG_NAME_ROOT},
{"group_rows", MXS_MODULE_PARAM_COUNT,
"1000"},
{"group_trx", MXS_MODULE_PARAM_COUNT,
"1"},
{"start_index", MXS_MODULE_PARAM_COUNT,
"1"},
{"block_size", MXS_MODULE_PARAM_SIZE,
"0"},
{"codec", MXS_MODULE_PARAM_ENUM, "null",
MXS_MODULE_OPT_ENUM_UNIQUE,
codec_values},
{"match",
MXS_MODULE_PARAM_REGEX},
{"exclude",
MXS_MODULE_PARAM_REGEX},
{"source", MXS_MODULE_PARAM_SERVICE},
{"filestem", MXS_MODULE_PARAM_STRING, BINLOG_NAME_ROOT},
{"group_rows", MXS_MODULE_PARAM_COUNT, "1000"},
{"group_trx", MXS_MODULE_PARAM_COUNT, "1"},
{"start_index", MXS_MODULE_PARAM_COUNT, "1"},
{"block_size", MXS_MODULE_PARAM_SIZE, "0"},
{"codec", MXS_MODULE_PARAM_ENUM, "null", MXS_MODULE_OPT_ENUM_UNIQUE, codec_values},
{"match", MXS_MODULE_PARAM_REGEX},
{"exclude", MXS_MODULE_PARAM_REGEX},
{"server_id", MXS_MODULE_PARAM_STRING, "1234"},
{"gtid_start_pos", MXS_MODULE_PARAM_STRING},
{MXS_END_MODULE_PARAMS}
}
};

View File

@ -998,7 +998,7 @@ void Rpl::handle_event(REP_HEADER hdr, uint8_t* ptr)
{
handle_row_event(&hdr, ptr);
}
else if (hdr.event_type == MARIADB10_GTID_EVENT)
else if (hdr.event_type == GTID_EVENT)
{
m_gtid.extract(hdr, ptr);
}

View File

@ -31,6 +31,7 @@
#include <blr_constants.hh>
#include "rpl.hh"
#include "replicator.hh"
MXS_BEGIN_DECLS
@ -104,7 +105,7 @@ enum mxs_avro_codec_type
static const MXS_ENUM_VALUE codec_values[] =
{
{"null", MXS_AVRO_CODEC_NULL },
{"null", MXS_AVRO_CODEC_NULL},
{"deflate", MXS_AVRO_CODEC_DEFLATE},
// Not yet implemented
// {"snappy", MXS_AVRO_CODEC_SNAPPY},
@ -135,6 +136,8 @@ public:
Rpl handler;
private:
std::unique_ptr<cdc::Replicator> m_replicator;
Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, SERVICE* source, SRowEventHandler handler);
void read_source_service_options(SERVICE* source);
};
@ -219,6 +222,6 @@ REP_HEADER construct_header(uint8_t* ptr);
bool avro_save_conversion_state(Avro* router);
bool avro_load_conversion_state(Avro* router);
void avro_load_metadata_from_schemas(Avro* router);
void notify_all_clients(Avro* router);
void notify_all_clients(SERVICE* router);
MXS_END_DECLS