diff --git a/server/modules/routing/avrorouter/avro.cc b/server/modules/routing/avrorouter/avro.cc index f017e5ca8..b6f12d8d1 100644 --- a/server/modules/routing/avrorouter/avro.cc +++ b/server/modules/routing/avrorouter/avro.cc @@ -104,7 +104,7 @@ void Avro::read_source_service_options(SERVICE* source) } //static -Avro* Avro::create(SERVICE* service) +Avro* Avro::create(SERVICE* service, SRowEventHandler handler) { SERVICE* source_service = NULL; MXS_CONFIG_PARAMETER *param = config_get_param(service->svc_config_param, "source"); @@ -135,10 +135,10 @@ Avro* Avro::create(SERVICE* service) } } - return new (std::nothrow) Avro(service, service->svc_config_param, source_service); + return new (std::nothrow) Avro(service, service->svc_config_param, source_service, handler); } -Avro::Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, SERVICE* source): +Avro::Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, SERVICE* source, SRowEventHandler handler): service(service), filestem(config_get_string(params, "filestem")), binlogdir(config_get_string(params, "binlogdir")), @@ -153,14 +153,8 @@ Avro::Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, SERVICE* source): row_count(0), row_target(config_get_integer(params, "group_rows")), task_handle(0), - stats{0} + event_handler(handler) { - uint64_t block_size = config_get_size(params, "block_size"); - mxs_avro_codec_type codec = static_cast(config_get_enum(params, "codec", codec_values)); - - // TODO: pass this as a parameter or something - event_hander = new AvroConverter(avrodir, block_size, codec); - /** For detection of CREATE/ALTER TABLE statements */ static const char* create_table_regex = "(?i)create[a-z0-9[:space:]_]+table"; static const char* alter_table_regex = "(?i)alter[[:space:]]+table"; diff --git a/server/modules/routing/avrorouter/avro_file.cc b/server/modules/routing/avrorouter/avro_file.cc index d5f2c1fca..2c3b0af6e 100644 --- a/server/modules/routing/avrorouter/avro_file.cc +++ b/server/modules/routing/avrorouter/avro_file.cc @@ -380,7 +380,7 @@ void notify_all_clients(Avro *router) void do_checkpoint(Avro *router) { - router->event_hander->flush_tables(); + router->event_handler->flush_tables(); avro_save_conversion_state(router); notify_all_clients(router); router->row_count = router->trx_count = 0; diff --git a/server/modules/routing/avrorouter/avro_main.cc b/server/modules/routing/avrorouter/avro_main.cc index 9aa849ecd..09f9d89a7 100644 --- a/server/modules/routing/avrorouter/avro_main.cc +++ b/server/modules/routing/avrorouter/avro_main.cc @@ -40,6 +40,8 @@ #include #include +#include "avro_converter.hh" + using namespace mxs; static bool conversion_task_ctl(Avro *inst, bool start); @@ -59,7 +61,12 @@ static bool conversion_task_ctl(Avro *inst, bool start); */ MXS_ROUTER* createInstance(SERVICE *service, char **options) { - Avro* router = Avro::create(service); + uint64_t block_size = config_get_size(service->svc_config_param, "block_size"); + mxs_avro_codec_type codec = static_cast(config_get_enum(service->svc_config_param, "codec", codec_values)); + std::string avrodir = config_get_string(service->svc_config_param, "avrodir"); + SRowEventHandler handler(new AvroConverter(avrodir, block_size, codec)); + + Avro* router = Avro::create(service, handler); if (router) { @@ -274,7 +281,7 @@ bool converter_func(Worker::Call::action_t action, Avro* router) /** We reached end of file, flush unwritten records to disk */ if (progress) { - router->event_hander->flush_tables(); + router->event_handler->flush_tables(); avro_save_conversion_state(router); logged = false; } diff --git a/server/modules/routing/avrorouter/avro_rbr.cc b/server/modules/routing/avrorouter/avro_rbr.cc index 9c8f6a96c..23bcbe163 100644 --- a/server/modules/routing/avrorouter/avro_rbr.cc +++ b/server/modules/routing/avrorouter/avro_rbr.cc @@ -31,7 +31,7 @@ static bool warn_large_enumset = false; /**< Remove when support for ENUM/SET va * larger than 255 is added */ uint8_t* process_row_event_data(STableMapEvent map, STableCreateEvent create, - RowEventHandler* conv, uint8_t *ptr, + SRowEventHandler& conv, uint8_t *ptr, uint8_t *columns_present, uint8_t *end); /** @@ -102,7 +102,7 @@ bool handle_table_map_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr) } } - if (router->event_hander->open_table(map, create->second)) + if (router->event_handler->open_table(map, create->second)) { create->second->was_used = true; @@ -221,7 +221,7 @@ bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr) char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2]; snprintf(table_ident, sizeof(table_ident), "%s.%s", map->database.c_str(), map->table.c_str()); - bool ok = router->event_hander->prepare_table(map->database, map->table); + bool ok = router->event_handler->prepare_table(map->database, map->table); auto create = router->created_tables.find(table_ident); if (ok && create != router->created_tables.end() && @@ -241,18 +241,18 @@ bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr) // Increment the event count for this transaction router->gtid.event_num++; - router->event_hander->prepare_row(router->gtid, *hdr, event_type); - ptr = process_row_event_data(map, create->second, router->event_hander, ptr, col_present, end); - router->event_hander->commit(router->gtid); + router->event_handler->prepare_row(router->gtid, *hdr, event_type); + ptr = process_row_event_data(map, create->second, router->event_handler, ptr, col_present, end); + router->event_handler->commit(router->gtid); /** Update rows events have the before and after images of the * affected rows so we'll process them as another record with * a different type */ if (event_type == UPDATE_EVENT) { - router->event_hander->prepare_row(router->gtid, *hdr, UPDATE_EVENT_AFTER); - ptr = process_row_event_data(map, create->second, router->event_hander, ptr, col_present, end); - router->event_hander->commit(router->gtid); + router->event_handler->prepare_row(router->gtid, *hdr, UPDATE_EVENT_AFTER); + ptr = process_row_event_data(map, create->second, router->event_handler, ptr, col_present, end); + router->event_handler->commit(router->gtid); } rows++; @@ -304,7 +304,7 @@ bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr) * @param metadata Field metadata * @param value Pointer to the start of the in-memory representation of the data */ -void set_numeric_field_value(RowEventHandler* conv, int idx, uint8_t type, +void set_numeric_field_value(SRowEventHandler& conv, int idx, uint8_t type, uint8_t *metadata, uint8_t *value) { switch (type) @@ -464,8 +464,9 @@ static bool all_fields_null(uint8_t* null_bitmap, int ncolumns) * this row event. Currently this should be a bitfield which has all bits set. * @return Pointer to the first byte after the current row event */ -uint8_t* process_row_event_data(STableMapEvent map, STableCreateEvent create, RowEventHandler* conv, - uint8_t *ptr, uint8_t *columns_present, uint8_t *end) +uint8_t* process_row_event_data(STableMapEvent map, STableCreateEvent create, + SRowEventHandler& conv, uint8_t *ptr, + uint8_t *columns_present, uint8_t *end) { int npresent = 0; long ncolumns = map->columns(); diff --git a/server/modules/routing/avrorouter/avrorouter.hh b/server/modules/routing/avrorouter/avrorouter.hh index 5532c61be..6ddec4c19 100644 --- a/server/modules/routing/avrorouter/avrorouter.hh +++ b/server/modules/routing/avrorouter/avrorouter.hh @@ -123,7 +123,7 @@ class Avro: public MXS_ROUTER Avro& operator=(const Avro&) = delete; public: - static Avro* create(SERVICE* service); + static Avro* create(SERVICE* service, SRowEventHandler handler); SERVICE* service; /*< Pointer to the service using this router */ std::string filestem; /*< Root of binlog filename */ @@ -148,15 +148,10 @@ public: uint64_t row_target; /*< Minimum about of row events that will trigger * a flush of all tables */ uint32_t task_handle; /**< Delayed task handle */ - RowEventHandler* event_hander; - - struct - { - int n_clients; /*< Number client sessions created */ - } stats; /*< Statistics for this router */ + SRowEventHandler event_handler; private: - Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, SERVICE* source); + Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, SERVICE* source, SRowEventHandler handler); void read_source_service_options(SERVICE* source); }; diff --git a/server/modules/routing/avrorouter/rpl_events.hh b/server/modules/routing/avrorouter/rpl_events.hh index df8ff43e3..c5054916a 100644 --- a/server/modules/routing/avrorouter/rpl_events.hh +++ b/server/modules/routing/avrorouter/rpl_events.hh @@ -165,3 +165,5 @@ public: // Empty (NULL) value type handler virtual void column(int i) = 0; }; + +typedef std::auto_ptr SRowEventHandler;