Allow avrorouter converstion to be started and stopped
The modulecmd functionality allows the avrorouter to easily control the conversion process with one command. The conversion can now be started and stopped by the user. This also fixes a bug where the conversion would stop if there were no binlog files present when the service was started.
This commit is contained in:
@ -50,6 +50,7 @@
|
|||||||
#include <binlog_common.h>
|
#include <binlog_common.h>
|
||||||
#include <avro/errors.h>
|
#include <avro/errors.h>
|
||||||
#include <maxscale/alloc.h>
|
#include <maxscale/alloc.h>
|
||||||
|
#include <maxscale/modulecmd.h>
|
||||||
|
|
||||||
#ifndef BINLOG_NAMEFMT
|
#ifndef BINLOG_NAMEFMT
|
||||||
#define BINLOG_NAMEFMT "%s.%06d"
|
#define BINLOG_NAMEFMT "%s.%06d"
|
||||||
@ -93,6 +94,7 @@ bool avro_save_conversion_state(AVRO_INSTANCE *router);
|
|||||||
static void stats_func(void *);
|
static void stats_func(void *);
|
||||||
void avro_index_file(AVRO_INSTANCE *router, const char* path);
|
void avro_index_file(AVRO_INSTANCE *router, const char* path);
|
||||||
void avro_update_index(AVRO_INSTANCE* router);
|
void avro_update_index(AVRO_INSTANCE* router);
|
||||||
|
static bool conversion_task_ctl(AVRO_INSTANCE *inst, bool start);
|
||||||
|
|
||||||
/** The module object definition */
|
/** The module object definition */
|
||||||
static ROUTER_OBJECT MyObject =
|
static ROUTER_OBJECT MyObject =
|
||||||
@ -123,6 +125,26 @@ version()
|
|||||||
return version_str;
|
return version_str;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool avro_handle_convert(const MODULECMD_ARG *args)
|
||||||
|
{
|
||||||
|
bool rval = false;
|
||||||
|
|
||||||
|
if (strcmp(args->argv[1].value.string, "start") == 0 &&
|
||||||
|
conversion_task_ctl(args->argv[0].value.service->router_instance, true))
|
||||||
|
{
|
||||||
|
MXS_NOTICE("Started conversion for service '%s'.", args->argv[0].value.service->name);
|
||||||
|
rval = true;
|
||||||
|
}
|
||||||
|
else if (strcmp(args->argv[1].value.string, "stop") == 0 &&
|
||||||
|
conversion_task_ctl(args->argv[0].value.service->router_instance, false))
|
||||||
|
{
|
||||||
|
MXS_NOTICE("Stopped conversion for service '%s'.", args->argv[0].value.service->name);
|
||||||
|
rval = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return rval;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The module initialisation routine, called when the module
|
* The module initialisation routine, called when the module
|
||||||
* is first loaded.
|
* is first loaded.
|
||||||
@ -133,6 +155,13 @@ ModuleInit()
|
|||||||
MXS_NOTICE("Initialized avrorouter module %s.\n", version_str);
|
MXS_NOTICE("Initialized avrorouter module %s.\n", version_str);
|
||||||
spinlock_init(&instlock);
|
spinlock_init(&instlock);
|
||||||
instances = NULL;
|
instances = NULL;
|
||||||
|
|
||||||
|
modulecmd_arg_type_t args[] =
|
||||||
|
{
|
||||||
|
{ MODULECMD_ARG_SERVICE, "The avrorouter service" },
|
||||||
|
{ MODULECMD_ARG_STRING, "Action, whether to 'start' or 'stop' the conversion process" }
|
||||||
|
};
|
||||||
|
modulecmd_register_command("avrorouter", "convert", avro_handle_convert, 2, args);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -224,22 +253,29 @@ bool create_tables(sqlite3* handle)
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool add_conversion_task(AVRO_INSTANCE *inst)
|
static bool conversion_task_ctl(AVRO_INSTANCE *inst, bool start)
|
||||||
{
|
{
|
||||||
|
bool rval = false;
|
||||||
|
|
||||||
|
if (!inst->service->svc_do_shutdown)
|
||||||
|
{
|
||||||
char tasknm[strlen(avro_task_name) + strlen(inst->service->name) + 2];
|
char tasknm[strlen(avro_task_name) + strlen(inst->service->name) + 2];
|
||||||
snprintf(tasknm, sizeof(tasknm), "%s-%s", inst->service->name, avro_task_name);
|
snprintf(tasknm, sizeof(tasknm), "%s-%s", inst->service->name, avro_task_name);
|
||||||
if (inst->service->svc_do_shutdown)
|
|
||||||
|
/** Remove old task and create a new one */
|
||||||
|
hktask_remove(tasknm);
|
||||||
|
|
||||||
|
if (!start || hktask_add(tasknm, converter_func, inst, inst->task_delay))
|
||||||
{
|
{
|
||||||
MXS_INFO("AVRO converter task is not added due to MaxScale shutdown");
|
rval = true;
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
MXS_INFO("Setting task for converter_func");
|
else
|
||||||
if (hktask_oneshot(tasknm, converter_func, inst, inst->task_delay) == 0)
|
|
||||||
{
|
{
|
||||||
MXS_ERROR("Failed to add binlog to Avro conversion task to housekeeper.");
|
MXS_ERROR("Failed to add binlog to Avro conversion task to housekeeper.");
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
return true;
|
}
|
||||||
|
|
||||||
|
return rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -566,7 +602,7 @@ createInstance(SERVICE *service, char **options)
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
/* Start the scan, read, convert AVRO task */
|
/* Start the scan, read, convert AVRO task */
|
||||||
add_conversion_task(inst);
|
conversion_task_ctl(inst, true);
|
||||||
|
|
||||||
MXS_INFO("AVRO: current MySQL binlog file is %s, pos is %lu\n",
|
MXS_INFO("AVRO: current MySQL binlog file is %s, pos is %lu\n",
|
||||||
inst->binlog_name, inst->current_pos);
|
inst->binlog_name, inst->current_pos);
|
||||||
@ -1047,7 +1083,7 @@ void converter_func(void* data)
|
|||||||
if (binlog_end == AVRO_LAST_FILE)
|
if (binlog_end == AVRO_LAST_FILE)
|
||||||
{
|
{
|
||||||
router->task_delay = MXS_MIN(router->task_delay + 1, AVRO_TASK_DELAY_MAX);
|
router->task_delay = MXS_MIN(router->task_delay + 1, AVRO_TASK_DELAY_MAX);
|
||||||
if (add_conversion_task(router))
|
if (conversion_task_ctl(router, true))
|
||||||
{
|
{
|
||||||
MXS_INFO("Stopped processing file %s at position %lu. Waiting until"
|
MXS_INFO("Stopped processing file %s at position %lu. Waiting until"
|
||||||
" more data is written before continuing. Next check in %d seconds.",
|
" more data is written before continuing. Next check in %d seconds.",
|
||||||
|
|||||||
Reference in New Issue
Block a user