MXS-936: AVRO converter_func update
MXS-936: AVRO converter_func update: added new checks for pending shutdown
This commit is contained in:
@ -224,7 +224,7 @@ bool create_tables(sqlite3* handle)
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void add_conversion_task(AVRO_INSTANCE *inst)
|
static bool add_conversion_task(AVRO_INSTANCE *inst)
|
||||||
{
|
{
|
||||||
char tasknm[strlen(avro_task_name) + strlen(inst->service->name) + 2];
|
char tasknm[strlen(avro_task_name) + strlen(inst->service->name) + 2];
|
||||||
snprintf(tasknm, sizeof(tasknm), "%s-%s", inst->service->name, avro_task_name);
|
snprintf(tasknm, sizeof(tasknm), "%s-%s", inst->service->name, avro_task_name);
|
||||||
@ -232,13 +232,15 @@ static void add_conversion_task(AVRO_INSTANCE *inst)
|
|||||||
{
|
{
|
||||||
MXS_INFO("AVRO converter task is not added due to MaxScale shutdown");
|
MXS_INFO("AVRO converter task is not added due to MaxScale shutdown");
|
||||||
avro_close_binlog(inst->binlog_fd);
|
avro_close_binlog(inst->binlog_fd);
|
||||||
return;
|
return false;
|
||||||
}
|
}
|
||||||
MXS_INFO("Setting task for converter_func");
|
MXS_INFO("Setting task for converter_func");
|
||||||
if (hktask_oneshot(tasknm, converter_func, inst, inst->task_delay) == 0)
|
if (hktask_oneshot(tasknm, converter_func, inst, inst->task_delay) == 0)
|
||||||
{
|
{
|
||||||
MXS_ERROR("Failed to add binlog to Avro conversion task to housekeeper.");
|
MXS_ERROR("Failed to add binlog to Avro conversion task to housekeeper.");
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -1020,13 +1022,14 @@ void converter_func(void* data)
|
|||||||
uint64_t start_pos = router->current_pos;
|
uint64_t start_pos = router->current_pos;
|
||||||
if (router->service->svc_do_shutdown)
|
if (router->service->svc_do_shutdown)
|
||||||
{
|
{
|
||||||
MXS_INFO("AVRO converter task is exiting due to MaxScale shutdown");
|
MXS_INFO("AVRO converter task is not handling events due to MaxScale shutdown");
|
||||||
avro_close_binlog(router->binlog_fd);
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (avro_open_binlog(router->binlogdir, router->binlog_name, &router->binlog_fd))
|
if (avro_open_binlog(router->binlogdir, router->binlog_name, &router->binlog_fd))
|
||||||
{
|
{
|
||||||
|
binlog_end = avro_read_all_events(router);
|
||||||
|
|
||||||
if (router->current_pos != start_pos)
|
if (router->current_pos != start_pos)
|
||||||
{
|
{
|
||||||
/** We processed some data, reset the conversion task delay */
|
/** We processed some data, reset the conversion task delay */
|
||||||
@ -1041,16 +1044,14 @@ void converter_func(void* data)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (router->service->svc_do_shutdown)
|
|
||||||
{
|
|
||||||
MXS_INFO("AVRO converter task is exiting due to MaxScale shutdown");
|
|
||||||
avro_close_binlog(router->binlog_fd);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** We reached end of file, flush unwritten records to disk */
|
/** We reached end of file, flush unwritten records to disk */
|
||||||
if (router->task_delay == 1)
|
if (router->task_delay == 1)
|
||||||
{
|
{
|
||||||
|
if (router->service->svc_do_shutdown)
|
||||||
|
{
|
||||||
|
MXS_INFO("AVRO converter task is not indexing due to MaxScale shutdown");
|
||||||
|
return;
|
||||||
|
}
|
||||||
avro_flush_all_tables(router);
|
avro_flush_all_tables(router);
|
||||||
avro_save_conversion_state(router);
|
avro_save_conversion_state(router);
|
||||||
}
|
}
|
||||||
@ -1058,10 +1059,12 @@ void converter_func(void* data)
|
|||||||
if (binlog_end == AVRO_LAST_FILE)
|
if (binlog_end == AVRO_LAST_FILE)
|
||||||
{
|
{
|
||||||
router->task_delay = MXS_MIN(router->task_delay + 1, AVRO_TASK_DELAY_MAX);
|
router->task_delay = MXS_MIN(router->task_delay + 1, AVRO_TASK_DELAY_MAX);
|
||||||
add_conversion_task(router);
|
if (add_conversion_task(router))
|
||||||
MXS_INFO("Stopped processing file %s at position %lu. Waiting until"
|
{
|
||||||
" more data is written before continuing. Next check in %d seconds.",
|
MXS_INFO("Stopped processing file %s at position %lu. Waiting until"
|
||||||
router->binlog_name, router->current_pos, router->task_delay);
|
" more data is written before continuing. Next check in %d seconds.",
|
||||||
|
router->binlog_name, router->current_pos, router->task_delay);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user