MXS-936: avro router converter routine handles shutdown

MXS-936: avro router converter routine handles shutdown
This commit is contained in:
MassimilianoPinto
2016-11-09 16:26:05 +01:00
parent da152353dd
commit 802152d7d3

View File

@ -228,6 +228,13 @@ static void add_conversion_task(AVRO_INSTANCE *inst)
{
char tasknm[strlen(avro_task_name) + strlen(inst->service->name) + 2];
snprintf(tasknm, sizeof(tasknm), "%s-%s", inst->service->name, avro_task_name);
if (inst->service->svc_do_shutdown)
{
MXS_INFO("AVRO converter task is not added due to MaxScale shutdown");
avro_close_binlog(inst->binlog_fd);
return;
}
MXS_INFO("Setting task for converter_func");
if (hktask_oneshot(tasknm, converter_func, inst, inst->task_delay) == 0)
{
MXS_ERROR("Failed to add binlog to Avro conversion task to housekeeper.");
@ -1007,13 +1014,19 @@ void converter_func(void* data)
AVRO_INSTANCE* router = (AVRO_INSTANCE*) data;
bool ok = true;
avro_binlog_end_t binlog_end = AVRO_OK;
while (ok && binlog_end == AVRO_OK)
{
uint64_t start_pos = router->current_pos;
if (router->service->svc_do_shutdown)
{
MXS_INFO("AVRO converter task is exiting due to MaxScale shutdown");
avro_close_binlog(router->binlog_fd);
break;
}
if (avro_open_binlog(router->binlogdir, router->binlog_name, &router->binlog_fd))
{
binlog_end = avro_read_all_events(router);
if (router->current_pos != start_pos)
{
/** We processed some data, reset the conversion task delay */
@ -1028,6 +1041,13 @@ void converter_func(void* data)
}
}
if (router->service->svc_do_shutdown)
{
MXS_INFO("AVRO converter task is exiting due to MaxScale shutdown");
avro_close_binlog(router->binlog_fd);
return;
}
/** We reached end of file, flush unwritten records to disk */
if (router->task_delay == 1)
{