From 6b6a7fa4a1b7ae555f9a58d476e5ad4b215fe0ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Fri, 12 May 2017 11:30:19 +0300 Subject: [PATCH] Do checkpoint processing at end of binlog When the binlog has been read, it needs to be treated as if the transaction or row limit has been hit. This will cause all tables to be flushed to disk before the files are indexed. --- server/modules/routing/avro/avro_file.c | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/server/modules/routing/avro/avro_file.c b/server/modules/routing/avro/avro_file.c index 4d9819f0d..fc8834cff 100644 --- a/server/modules/routing/avro/avro_file.c +++ b/server/modules/routing/avro/avro_file.c @@ -460,6 +460,17 @@ void notify_all_clients(AVRO_INSTANCE *router) } } +void do_checkpoint(AVRO_INSTANCE *router, uint64_t *total_rows, uint64_t *total_commits) +{ + update_used_tables(router); + avro_flush_all_tables(router); + avro_save_conversion_state(router); + notify_all_clients(router); + *total_rows += router->row_count; + *total_commits += router->trx_count; + router->row_count = router->trx_count = 0; +} + /** * @brief Read all replication events from a binlog file. * @@ -541,6 +552,8 @@ avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router) } else { + do_checkpoint(router, &total_rows, &total_commits); + MXS_INFO("Processed %lu transactions and %lu row events.", total_commits, total_rows); if (rotate_seen) @@ -734,13 +747,7 @@ avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router) if (router->row_count >= router->row_target || router->trx_count >= router->trx_target) { - update_used_tables(router); - avro_flush_all_tables(router); - avro_save_conversion_state(router); - notify_all_clients(router); - total_rows += router->row_count; - total_commits += router->trx_count; - router->row_count = router->trx_count = 0; + do_checkpoint(router, &total_rows, &total_commits); } }