diff --git a/server/modules/routing/avrorouter/avro.cc b/server/modules/routing/avrorouter/avro.cc index 5b932b67d..dba8bad0e 100644 --- a/server/modules/routing/avrorouter/avro.cc +++ b/server/modules/routing/avrorouter/avro.cc @@ -33,7 +33,6 @@ #include #include #include -#include #include #include #include @@ -44,15 +43,18 @@ #include #include #include +#include +#include #include +using namespace maxscale; + #ifndef BINLOG_NAMEFMT #define BINLOG_NAMEFMT "%s.%06d" #endif #define AVRO_TASK_DELAY_MAX 15 -static const char* avro_task_name = "binlog_to_avro"; static const char* index_task_name = "avro_indexing"; static const char* avro_index_name = "avro.index"; @@ -77,7 +79,7 @@ static void errorReply(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, static uint64_t getCapabilities(MXS_ROUTER* instance); extern int MaxScaleUptime(); extern void avro_get_used_tables(Avro *router, DCB *dcb); -bool converter_func(void* data); +bool converter_func(Worker::Call::action_t action, Avro* router); bool binlog_next_file_exists(const char* binlogdir, const char* binlog); int blr_file_get_next_binlogname(const char *router); bool avro_load_conversion_state(Avro *router); @@ -363,15 +365,17 @@ static bool conversion_task_ctl(Avro *inst, bool start) if (!inst->service->svc_do_shutdown) { - char tasknm[strlen(avro_task_name) + strlen(inst->service->name) + 2]; - snprintf(tasknm, sizeof(tasknm), "%s-%s", inst->service->name, avro_task_name); + Worker* worker = static_cast(mxs_rworker_get(MXS_RWORKER_MAIN)); - /** Remove old task and create a new one */ - hktask_remove(tasknm); + if (inst->task_handle) + { + worker->cancel_delayed_call(inst->task_handle); + inst->task_handle = 0; + } if (start) { - hktask_add(tasknm, converter_func, inst, inst->task_delay); + inst->task_handle = worker->delayed_call(inst->task_delay * 1000, converter_func, inst); } rval = true; @@ -643,14 +647,6 @@ createInstance(SERVICE *service, char **options) avro_load_conversion_state(inst); avro_load_metadata_from_schemas(inst); - /* - * Add tasks for statistic computation - */ - /** Not used currenly - snprintf(task_name, BLRM_TASK_NAME_LEN, "%s stats", service->name); - hktask_add(task_name, stats_func, inst, AVRO_STATS_FREQ); - */ - /* Start the scan, read, convert AVRO task */ conversion_task_ctl(inst, true); @@ -901,9 +897,13 @@ static uint64_t getCapabilities(MXS_ROUTER* instance) /** * Conversion task: MySQL binlogs to AVRO files */ -bool converter_func(void* data) +bool converter_func(Worker::Call::action_t action, Avro* router) { - Avro* router = (Avro*) data; + if (action == Worker::Call::CANCEL) + { + return false; + } + bool ok = true; avro_binlog_end_t binlog_end = AVRO_OK; diff --git a/server/modules/routing/avrorouter/avrorouter.hh b/server/modules/routing/avrorouter/avrorouter.hh index e90b8a190..fca244fa4 100644 --- a/server/modules/routing/avrorouter/avrorouter.hh +++ b/server/modules/routing/avrorouter/avrorouter.hh @@ -229,6 +229,7 @@ struct Avro * a flush of all tables */ uint64_t block_size; /**< Avro datablock size */ enum mxs_avro_codec_type codec; /**< Avro codec type, defaults to `null` */ + uint32_t task_handle; /**< Delayed task handle */ }; struct AvroSession