From 075968e947ff1720d610285b3c58a4cd7db6a84a Mon Sep 17 00:00:00 2001 From: Markus Makela Date: Mon, 28 Nov 2016 23:22:28 +0200 Subject: [PATCH] Allow avrorouter converstion to be started and stopped The modulecmd functionality allows the avrorouter to easily control the conversion process with one command. The conversion can now be started and stopped by the user. This also fixes a bug where the conversion would stop if there were no binlog files present when the service was started. --- server/modules/routing/avro/avro.c | 66 +++++++++++++++++++++++------- 1 file changed, 51 insertions(+), 15 deletions(-) diff --git a/server/modules/routing/avro/avro.c b/server/modules/routing/avro/avro.c index 73a361525..4cf5ad365 100644 --- a/server/modules/routing/avro/avro.c +++ b/server/modules/routing/avro/avro.c @@ -50,6 +50,7 @@ #include #include #include +#include #ifndef BINLOG_NAMEFMT #define BINLOG_NAMEFMT "%s.%06d" @@ -93,6 +94,7 @@ bool avro_save_conversion_state(AVRO_INSTANCE *router); static void stats_func(void *); void avro_index_file(AVRO_INSTANCE *router, const char* path); void avro_update_index(AVRO_INSTANCE* router); +static bool conversion_task_ctl(AVRO_INSTANCE *inst, bool start); /** The module object definition */ static ROUTER_OBJECT MyObject = @@ -123,6 +125,26 @@ version() return version_str; } +bool avro_handle_convert(const MODULECMD_ARG *args) +{ + bool rval = false; + + if (strcmp(args->argv[1].value.string, "start") == 0 && + conversion_task_ctl(args->argv[0].value.service->router_instance, true)) + { + MXS_NOTICE("Started conversion for service '%s'.", args->argv[0].value.service->name); + rval = true; + } + else if (strcmp(args->argv[1].value.string, "stop") == 0 && + conversion_task_ctl(args->argv[0].value.service->router_instance, false)) + { + MXS_NOTICE("Stopped conversion for service '%s'.", args->argv[0].value.service->name); + rval = true; + } + + return rval; +} + /** * The module initialisation routine, called when the module * is first loaded. @@ -133,6 +155,13 @@ ModuleInit() MXS_NOTICE("Initialized avrorouter module %s.\n", version_str); spinlock_init(&instlock); instances = NULL; + + modulecmd_arg_type_t args[] = + { + { MODULECMD_ARG_SERVICE, "The avrorouter service" }, + { MODULECMD_ARG_STRING, "Action, whether to 'start' or 'stop' the conversion process" } + }; + modulecmd_register_command("avrorouter", "convert", avro_handle_convert, 2, args); } /** @@ -224,22 +253,29 @@ bool create_tables(sqlite3* handle) return true; } -static bool add_conversion_task(AVRO_INSTANCE *inst) +static bool conversion_task_ctl(AVRO_INSTANCE *inst, bool start) { - char tasknm[strlen(avro_task_name) + strlen(inst->service->name) + 2]; - snprintf(tasknm, sizeof(tasknm), "%s-%s", inst->service->name, avro_task_name); - if (inst->service->svc_do_shutdown) + bool rval = false; + + if (!inst->service->svc_do_shutdown) { - MXS_INFO("AVRO converter task is not added due to MaxScale shutdown"); - return false; + char tasknm[strlen(avro_task_name) + strlen(inst->service->name) + 2]; + snprintf(tasknm, sizeof(tasknm), "%s-%s", inst->service->name, avro_task_name); + + /** Remove old task and create a new one */ + hktask_remove(tasknm); + + if (!start || hktask_add(tasknm, converter_func, inst, inst->task_delay)) + { + rval = true; + } + else + { + MXS_ERROR("Failed to add binlog to Avro conversion task to housekeeper."); + } } - MXS_INFO("Setting task for converter_func"); - if (hktask_oneshot(tasknm, converter_func, inst, inst->task_delay) == 0) - { - MXS_ERROR("Failed to add binlog to Avro conversion task to housekeeper."); - return false; - } - return true; + + return rval; } /** @@ -566,7 +602,7 @@ createInstance(SERVICE *service, char **options) */ /* Start the scan, read, convert AVRO task */ - add_conversion_task(inst); + conversion_task_ctl(inst, true); MXS_INFO("AVRO: current MySQL binlog file is %s, pos is %lu\n", inst->binlog_name, inst->current_pos); @@ -1047,7 +1083,7 @@ void converter_func(void* data) if (binlog_end == AVRO_LAST_FILE) { router->task_delay = MXS_MIN(router->task_delay + 1, AVRO_TASK_DELAY_MAX); - if (add_conversion_task(router)) + if (conversion_task_ctl(router, true)) { MXS_INFO("Stopped processing file %s at position %lu. Waiting until" " more data is written before continuing. Next check in %d seconds.",