diff --git a/server/modules/routing/avro/avro.c b/server/modules/routing/avro/avro.c index 19643dd80..663187e01 100644 --- a/server/modules/routing/avro/avro.c +++ b/server/modules/routing/avro/avro.c @@ -224,7 +224,7 @@ bool create_tables(sqlite3* handle) return true; } -static void add_conversion_task(AVRO_INSTANCE *inst) +static bool add_conversion_task(AVRO_INSTANCE *inst) { char tasknm[strlen(avro_task_name) + strlen(inst->service->name) + 2]; snprintf(tasknm, sizeof(tasknm), "%s-%s", inst->service->name, avro_task_name); @@ -232,13 +232,15 @@ static void add_conversion_task(AVRO_INSTANCE *inst) { MXS_INFO("AVRO converter task is not added due to MaxScale shutdown"); avro_close_binlog(inst->binlog_fd); - return; + return false; } MXS_INFO("Setting task for converter_func"); if (hktask_oneshot(tasknm, converter_func, inst, inst->task_delay) == 0) { MXS_ERROR("Failed to add binlog to Avro conversion task to housekeeper."); + return false; } + return true; } /** @@ -1020,13 +1022,14 @@ void converter_func(void* data) uint64_t start_pos = router->current_pos; if (router->service->svc_do_shutdown) { - MXS_INFO("AVRO converter task is exiting due to MaxScale shutdown"); - avro_close_binlog(router->binlog_fd); + MXS_INFO("AVRO converter task is not handling events due to MaxScale shutdown"); break; } if (avro_open_binlog(router->binlogdir, router->binlog_name, &router->binlog_fd)) { + binlog_end = avro_read_all_events(router); + if (router->current_pos != start_pos) { /** We processed some data, reset the conversion task delay */ @@ -1041,16 +1044,14 @@ void converter_func(void* data) } } - if (router->service->svc_do_shutdown) - { - MXS_INFO("AVRO converter task is exiting due to MaxScale shutdown"); - avro_close_binlog(router->binlog_fd); - return; - } - /** We reached end of file, flush unwritten records to disk */ if (router->task_delay == 1) { + if (router->service->svc_do_shutdown) + { + MXS_INFO("AVRO converter task is not indexing due to MaxScale shutdown"); + return; + } avro_flush_all_tables(router); avro_save_conversion_state(router); } @@ -1058,10 +1059,12 @@ void converter_func(void* data) if (binlog_end == AVRO_LAST_FILE) { router->task_delay = MXS_MIN(router->task_delay + 1, AVRO_TASK_DELAY_MAX); - add_conversion_task(router); - MXS_INFO("Stopped processing file %s at position %lu. Waiting until" - " more data is written before continuing. Next check in %d seconds.", - router->binlog_name, router->current_pos, router->task_delay); + if (add_conversion_task(router)) + { + MXS_INFO("Stopped processing file %s at position %lu. Waiting until" + " more data is written before continuing. Next check in %d seconds.", + router->binlog_name, router->current_pos, router->task_delay); + } } }