MXS-1881: Pass the RowEventConverer as a parameter
The RowEventConverter is now passed as a parameter to the Avro instance. Wrapped the value in an std::auto_ptr to make the cleanup automatic (when it is implemented). Fixed a typo in the event handler member variable and removed the unused stats member.
This commit is contained in:
@ -104,7 +104,7 @@ void Avro::read_source_service_options(SERVICE* source)
|
|||||||
}
|
}
|
||||||
|
|
||||||
//static
|
//static
|
||||||
Avro* Avro::create(SERVICE* service)
|
Avro* Avro::create(SERVICE* service, SRowEventHandler handler)
|
||||||
{
|
{
|
||||||
SERVICE* source_service = NULL;
|
SERVICE* source_service = NULL;
|
||||||
MXS_CONFIG_PARAMETER *param = config_get_param(service->svc_config_param, "source");
|
MXS_CONFIG_PARAMETER *param = config_get_param(service->svc_config_param, "source");
|
||||||
@ -135,10 +135,10 @@ Avro* Avro::create(SERVICE* service)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return new (std::nothrow) Avro(service, service->svc_config_param, source_service);
|
return new (std::nothrow) Avro(service, service->svc_config_param, source_service, handler);
|
||||||
}
|
}
|
||||||
|
|
||||||
Avro::Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, SERVICE* source):
|
Avro::Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, SERVICE* source, SRowEventHandler handler):
|
||||||
service(service),
|
service(service),
|
||||||
filestem(config_get_string(params, "filestem")),
|
filestem(config_get_string(params, "filestem")),
|
||||||
binlogdir(config_get_string(params, "binlogdir")),
|
binlogdir(config_get_string(params, "binlogdir")),
|
||||||
@ -153,14 +153,8 @@ Avro::Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, SERVICE* source):
|
|||||||
row_count(0),
|
row_count(0),
|
||||||
row_target(config_get_integer(params, "group_rows")),
|
row_target(config_get_integer(params, "group_rows")),
|
||||||
task_handle(0),
|
task_handle(0),
|
||||||
stats{0}
|
event_handler(handler)
|
||||||
{
|
{
|
||||||
uint64_t block_size = config_get_size(params, "block_size");
|
|
||||||
mxs_avro_codec_type codec = static_cast<mxs_avro_codec_type>(config_get_enum(params, "codec", codec_values));
|
|
||||||
|
|
||||||
// TODO: pass this as a parameter or something
|
|
||||||
event_hander = new AvroConverter(avrodir, block_size, codec);
|
|
||||||
|
|
||||||
/** For detection of CREATE/ALTER TABLE statements */
|
/** For detection of CREATE/ALTER TABLE statements */
|
||||||
static const char* create_table_regex = "(?i)create[a-z0-9[:space:]_]+table";
|
static const char* create_table_regex = "(?i)create[a-z0-9[:space:]_]+table";
|
||||||
static const char* alter_table_regex = "(?i)alter[[:space:]]+table";
|
static const char* alter_table_regex = "(?i)alter[[:space:]]+table";
|
||||||
|
@ -380,7 +380,7 @@ void notify_all_clients(Avro *router)
|
|||||||
|
|
||||||
void do_checkpoint(Avro *router)
|
void do_checkpoint(Avro *router)
|
||||||
{
|
{
|
||||||
router->event_hander->flush_tables();
|
router->event_handler->flush_tables();
|
||||||
avro_save_conversion_state(router);
|
avro_save_conversion_state(router);
|
||||||
notify_all_clients(router);
|
notify_all_clients(router);
|
||||||
router->row_count = router->trx_count = 0;
|
router->row_count = router->trx_count = 0;
|
||||||
|
@ -40,6 +40,8 @@
|
|||||||
#include <maxscale/worker.hh>
|
#include <maxscale/worker.hh>
|
||||||
#include <binlog_common.h>
|
#include <binlog_common.h>
|
||||||
|
|
||||||
|
#include "avro_converter.hh"
|
||||||
|
|
||||||
using namespace mxs;
|
using namespace mxs;
|
||||||
|
|
||||||
static bool conversion_task_ctl(Avro *inst, bool start);
|
static bool conversion_task_ctl(Avro *inst, bool start);
|
||||||
@ -59,7 +61,12 @@ static bool conversion_task_ctl(Avro *inst, bool start);
|
|||||||
*/
|
*/
|
||||||
MXS_ROUTER* createInstance(SERVICE *service, char **options)
|
MXS_ROUTER* createInstance(SERVICE *service, char **options)
|
||||||
{
|
{
|
||||||
Avro* router = Avro::create(service);
|
uint64_t block_size = config_get_size(service->svc_config_param, "block_size");
|
||||||
|
mxs_avro_codec_type codec = static_cast<mxs_avro_codec_type>(config_get_enum(service->svc_config_param, "codec", codec_values));
|
||||||
|
std::string avrodir = config_get_string(service->svc_config_param, "avrodir");
|
||||||
|
SRowEventHandler handler(new AvroConverter(avrodir, block_size, codec));
|
||||||
|
|
||||||
|
Avro* router = Avro::create(service, handler);
|
||||||
|
|
||||||
if (router)
|
if (router)
|
||||||
{
|
{
|
||||||
@ -274,7 +281,7 @@ bool converter_func(Worker::Call::action_t action, Avro* router)
|
|||||||
/** We reached end of file, flush unwritten records to disk */
|
/** We reached end of file, flush unwritten records to disk */
|
||||||
if (progress)
|
if (progress)
|
||||||
{
|
{
|
||||||
router->event_hander->flush_tables();
|
router->event_handler->flush_tables();
|
||||||
avro_save_conversion_state(router);
|
avro_save_conversion_state(router);
|
||||||
logged = false;
|
logged = false;
|
||||||
}
|
}
|
||||||
|
@ -31,7 +31,7 @@ static bool warn_large_enumset = false; /**< Remove when support for ENUM/SET va
|
|||||||
* larger than 255 is added */
|
* larger than 255 is added */
|
||||||
|
|
||||||
uint8_t* process_row_event_data(STableMapEvent map, STableCreateEvent create,
|
uint8_t* process_row_event_data(STableMapEvent map, STableCreateEvent create,
|
||||||
RowEventHandler* conv, uint8_t *ptr,
|
SRowEventHandler& conv, uint8_t *ptr,
|
||||||
uint8_t *columns_present, uint8_t *end);
|
uint8_t *columns_present, uint8_t *end);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -102,7 +102,7 @@ bool handle_table_map_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (router->event_hander->open_table(map, create->second))
|
if (router->event_handler->open_table(map, create->second))
|
||||||
{
|
{
|
||||||
create->second->was_used = true;
|
create->second->was_used = true;
|
||||||
|
|
||||||
@ -221,7 +221,7 @@ bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
|
|||||||
char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2];
|
char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2];
|
||||||
snprintf(table_ident, sizeof(table_ident), "%s.%s", map->database.c_str(), map->table.c_str());
|
snprintf(table_ident, sizeof(table_ident), "%s.%s", map->database.c_str(), map->table.c_str());
|
||||||
|
|
||||||
bool ok = router->event_hander->prepare_table(map->database, map->table);
|
bool ok = router->event_handler->prepare_table(map->database, map->table);
|
||||||
auto create = router->created_tables.find(table_ident);
|
auto create = router->created_tables.find(table_ident);
|
||||||
|
|
||||||
if (ok && create != router->created_tables.end() &&
|
if (ok && create != router->created_tables.end() &&
|
||||||
@ -241,18 +241,18 @@ bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
|
|||||||
// Increment the event count for this transaction
|
// Increment the event count for this transaction
|
||||||
router->gtid.event_num++;
|
router->gtid.event_num++;
|
||||||
|
|
||||||
router->event_hander->prepare_row(router->gtid, *hdr, event_type);
|
router->event_handler->prepare_row(router->gtid, *hdr, event_type);
|
||||||
ptr = process_row_event_data(map, create->second, router->event_hander, ptr, col_present, end);
|
ptr = process_row_event_data(map, create->second, router->event_handler, ptr, col_present, end);
|
||||||
router->event_hander->commit(router->gtid);
|
router->event_handler->commit(router->gtid);
|
||||||
|
|
||||||
/** Update rows events have the before and after images of the
|
/** Update rows events have the before and after images of the
|
||||||
* affected rows so we'll process them as another record with
|
* affected rows so we'll process them as another record with
|
||||||
* a different type */
|
* a different type */
|
||||||
if (event_type == UPDATE_EVENT)
|
if (event_type == UPDATE_EVENT)
|
||||||
{
|
{
|
||||||
router->event_hander->prepare_row(router->gtid, *hdr, UPDATE_EVENT_AFTER);
|
router->event_handler->prepare_row(router->gtid, *hdr, UPDATE_EVENT_AFTER);
|
||||||
ptr = process_row_event_data(map, create->second, router->event_hander, ptr, col_present, end);
|
ptr = process_row_event_data(map, create->second, router->event_handler, ptr, col_present, end);
|
||||||
router->event_hander->commit(router->gtid);
|
router->event_handler->commit(router->gtid);
|
||||||
}
|
}
|
||||||
|
|
||||||
rows++;
|
rows++;
|
||||||
@ -304,7 +304,7 @@ bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
|
|||||||
* @param metadata Field metadata
|
* @param metadata Field metadata
|
||||||
* @param value Pointer to the start of the in-memory representation of the data
|
* @param value Pointer to the start of the in-memory representation of the data
|
||||||
*/
|
*/
|
||||||
void set_numeric_field_value(RowEventHandler* conv, int idx, uint8_t type,
|
void set_numeric_field_value(SRowEventHandler& conv, int idx, uint8_t type,
|
||||||
uint8_t *metadata, uint8_t *value)
|
uint8_t *metadata, uint8_t *value)
|
||||||
{
|
{
|
||||||
switch (type)
|
switch (type)
|
||||||
@ -464,8 +464,9 @@ static bool all_fields_null(uint8_t* null_bitmap, int ncolumns)
|
|||||||
* this row event. Currently this should be a bitfield which has all bits set.
|
* this row event. Currently this should be a bitfield which has all bits set.
|
||||||
* @return Pointer to the first byte after the current row event
|
* @return Pointer to the first byte after the current row event
|
||||||
*/
|
*/
|
||||||
uint8_t* process_row_event_data(STableMapEvent map, STableCreateEvent create, RowEventHandler* conv,
|
uint8_t* process_row_event_data(STableMapEvent map, STableCreateEvent create,
|
||||||
uint8_t *ptr, uint8_t *columns_present, uint8_t *end)
|
SRowEventHandler& conv, uint8_t *ptr,
|
||||||
|
uint8_t *columns_present, uint8_t *end)
|
||||||
{
|
{
|
||||||
int npresent = 0;
|
int npresent = 0;
|
||||||
long ncolumns = map->columns();
|
long ncolumns = map->columns();
|
||||||
|
@ -123,7 +123,7 @@ class Avro: public MXS_ROUTER
|
|||||||
Avro& operator=(const Avro&) = delete;
|
Avro& operator=(const Avro&) = delete;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
static Avro* create(SERVICE* service);
|
static Avro* create(SERVICE* service, SRowEventHandler handler);
|
||||||
|
|
||||||
SERVICE* service; /*< Pointer to the service using this router */
|
SERVICE* service; /*< Pointer to the service using this router */
|
||||||
std::string filestem; /*< Root of binlog filename */
|
std::string filestem; /*< Root of binlog filename */
|
||||||
@ -148,15 +148,10 @@ public:
|
|||||||
uint64_t row_target; /*< Minimum about of row events that will trigger
|
uint64_t row_target; /*< Minimum about of row events that will trigger
|
||||||
* a flush of all tables */
|
* a flush of all tables */
|
||||||
uint32_t task_handle; /**< Delayed task handle */
|
uint32_t task_handle; /**< Delayed task handle */
|
||||||
RowEventHandler* event_hander;
|
SRowEventHandler event_handler;
|
||||||
|
|
||||||
struct
|
|
||||||
{
|
|
||||||
int n_clients; /*< Number client sessions created */
|
|
||||||
} stats; /*< Statistics for this router */
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, SERVICE* source);
|
Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, SERVICE* source, SRowEventHandler handler);
|
||||||
void read_source_service_options(SERVICE* source);
|
void read_source_service_options(SERVICE* source);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -165,3 +165,5 @@ public:
|
|||||||
// Empty (NULL) value type handler
|
// Empty (NULL) value type handler
|
||||||
virtual void column(int i) = 0;
|
virtual void column(int i) = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
typedef std::auto_ptr<RowEventHandler> SRowEventHandler;
|
||||||
|
Reference in New Issue
Block a user