From 3fce61a61572e6375dec83b955ee7a650e0194e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Tue, 22 May 2018 14:49:12 +0300 Subject: [PATCH] Do avro conversion in main thread The avrorouter no longer uses the housekeeper for the conversion task. This prevents the deadlock which could occur when clients were notified at the same time that the binlogrouter was adding a master reconnection task. --- server/modules/routing/avrorouter/avro.cc | 36 +++++++++---------- .../modules/routing/avrorouter/avrorouter.hh | 1 + 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/server/modules/routing/avrorouter/avro.cc b/server/modules/routing/avrorouter/avro.cc index 5b932b67d..dba8bad0e 100644 --- a/server/modules/routing/avrorouter/avro.cc +++ b/server/modules/routing/avrorouter/avro.cc @@ -33,7 +33,6 @@ #include #include #include -#include #include #include #include @@ -44,15 +43,18 @@ #include #include #include +#include +#include #include +using namespace maxscale; + #ifndef BINLOG_NAMEFMT #define BINLOG_NAMEFMT "%s.%06d" #endif #define AVRO_TASK_DELAY_MAX 15 -static const char* avro_task_name = "binlog_to_avro"; static const char* index_task_name = "avro_indexing"; static const char* avro_index_name = "avro.index"; @@ -77,7 +79,7 @@ static void errorReply(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, static uint64_t getCapabilities(MXS_ROUTER* instance); extern int MaxScaleUptime(); extern void avro_get_used_tables(Avro *router, DCB *dcb); -bool converter_func(void* data); +bool converter_func(Worker::Call::action_t action, Avro* router); bool binlog_next_file_exists(const char* binlogdir, const char* binlog); int blr_file_get_next_binlogname(const char *router); bool avro_load_conversion_state(Avro *router); @@ -363,15 +365,17 @@ static bool conversion_task_ctl(Avro *inst, bool start) if (!inst->service->svc_do_shutdown) { - char tasknm[strlen(avro_task_name) + strlen(inst->service->name) + 2]; - snprintf(tasknm, sizeof(tasknm), "%s-%s", inst->service->name, avro_task_name); + Worker* worker = static_cast(mxs_rworker_get(MXS_RWORKER_MAIN)); - /** Remove old task and create a new one */ - hktask_remove(tasknm); + if (inst->task_handle) + { + worker->cancel_delayed_call(inst->task_handle); + inst->task_handle = 0; + } if (start) { - hktask_add(tasknm, converter_func, inst, inst->task_delay); + inst->task_handle = worker->delayed_call(inst->task_delay * 1000, converter_func, inst); } rval = true; @@ -643,14 +647,6 @@ createInstance(SERVICE *service, char **options) avro_load_conversion_state(inst); avro_load_metadata_from_schemas(inst); - /* - * Add tasks for statistic computation - */ - /** Not used currenly - snprintf(task_name, BLRM_TASK_NAME_LEN, "%s stats", service->name); - hktask_add(task_name, stats_func, inst, AVRO_STATS_FREQ); - */ - /* Start the scan, read, convert AVRO task */ conversion_task_ctl(inst, true); @@ -901,9 +897,13 @@ static uint64_t getCapabilities(MXS_ROUTER* instance) /** * Conversion task: MySQL binlogs to AVRO files */ -bool converter_func(void* data) +bool converter_func(Worker::Call::action_t action, Avro* router) { - Avro* router = (Avro*) data; + if (action == Worker::Call::CANCEL) + { + return false; + } + bool ok = true; avro_binlog_end_t binlog_end = AVRO_OK; diff --git a/server/modules/routing/avrorouter/avrorouter.hh b/server/modules/routing/avrorouter/avrorouter.hh index e90b8a190..fca244fa4 100644 --- a/server/modules/routing/avrorouter/avrorouter.hh +++ b/server/modules/routing/avrorouter/avrorouter.hh @@ -229,6 +229,7 @@ struct Avro * a flush of all tables */ uint64_t block_size; /**< Avro datablock size */ enum mxs_avro_codec_type codec; /**< Avro codec type, defaults to `null` */ + uint32_t task_handle; /**< Delayed task handle */ }; struct AvroSession