diff --git a/server/modules/routing/avro/avro_file.c b/server/modules/routing/avro/avro_file.c index 4d9819f0d..fc8834cff 100644 --- a/server/modules/routing/avro/avro_file.c +++ b/server/modules/routing/avro/avro_file.c @@ -460,6 +460,17 @@ void notify_all_clients(AVRO_INSTANCE *router) } } +void do_checkpoint(AVRO_INSTANCE *router, uint64_t *total_rows, uint64_t *total_commits) +{ + update_used_tables(router); + avro_flush_all_tables(router); + avro_save_conversion_state(router); + notify_all_clients(router); + *total_rows += router->row_count; + *total_commits += router->trx_count; + router->row_count = router->trx_count = 0; +} + /** * @brief Read all replication events from a binlog file. * @@ -541,6 +552,8 @@ avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router) } else { + do_checkpoint(router, &total_rows, &total_commits); + MXS_INFO("Processed %lu transactions and %lu row events.", total_commits, total_rows); if (rotate_seen) @@ -734,13 +747,7 @@ avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router) if (router->row_count >= router->row_target || router->trx_count >= router->trx_target) { - update_used_tables(router); - avro_flush_all_tables(router); - avro_save_conversion_state(router); - notify_all_clients(router); - total_rows += router->row_count; - total_commits += router->trx_count; - router->row_count = router->trx_count = 0; + do_checkpoint(router, &total_rows, &total_commits); } }