diff --git a/server/modules/routing/avrorouter/avro.c b/server/modules/routing/avrorouter/avro.c index 4cf5ad365..3cdbb3d4e 100644 --- a/server/modules/routing/avrorouter/avro.c +++ b/server/modules/routing/avrorouter/avro.c @@ -1076,7 +1076,7 @@ void converter_func(void* data) /** We reached end of file, flush unwritten records to disk */ if (router->task_delay == 1) { - avro_flush_all_tables(router); + avro_flush_all_tables(router, true); avro_save_conversion_state(router); } diff --git a/server/modules/routing/avrorouter/avro_file.c b/server/modules/routing/avrorouter/avro_file.c index 269662af5..5892a3c73 100644 --- a/server/modules/routing/avrorouter/avro_file.c +++ b/server/modules/routing/avrorouter/avro_file.c @@ -46,7 +46,6 @@ static const char *ddl_list_name = "table-ddl.list"; void handle_query_event(AVRO_INSTANCE *router, REP_HEADER *hdr, int *pending_transaction, uint8_t *ptr); bool is_create_table_statement(AVRO_INSTANCE *router, char* ptr, size_t len); -void avro_flush_all_tables(AVRO_INSTANCE *router); void avro_notify_client(AVRO_CLIENT *client); void avro_update_index(AVRO_INSTANCE* router); void update_used_tables(AVRO_INSTANCE* router); @@ -745,7 +744,7 @@ avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router) router->trx_count >= router->trx_target) { update_used_tables(router); - avro_flush_all_tables(router); + avro_flush_all_tables(router, false); avro_save_conversion_state(router); notify_all_clients(router); total_rows += router->row_count; @@ -859,7 +858,7 @@ void avro_load_metadata_from_schemas(AVRO_INSTANCE *router) * @brief Flush all Avro records to disk * @param router Avro router instance */ -void avro_flush_all_tables(AVRO_INSTANCE *router) +void avro_flush_all_tables(AVRO_INSTANCE *router, enum avrorouter_file_op flush) { HASHITERATOR *iter = hashtable_iterator(router->open_tables); @@ -872,7 +871,15 @@ void avro_flush_all_tables(AVRO_INSTANCE *router) if (table) { - avro_file_writer_flush(table->avro_file); + if (flush == AVROROUTER_FLUSH) + { + avro_file_writer_flush(table->avro_file); + } + else + { + ss_dassert(flush == AVROROUTER_SYNC); + avro_file_writer_sync(table->avro_file); + } } } hashtable_iterator_free(iter); diff --git a/server/modules/routing/avrorouter/avrorouter.h b/server/modules/routing/avrorouter/avrorouter.h index 6bdeecceb..ff0b7bb93 100644 --- a/server/modules/routing/avrorouter/avrorouter.h +++ b/server/modules/routing/avrorouter/avrorouter.h @@ -295,13 +295,26 @@ extern void avro_close_binlog(int fd); extern avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router); extern AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema); extern void avro_table_free(AVRO_TABLE *table); -extern void avro_flush_all_tables(AVRO_INSTANCE *router); extern char* json_new_schema_from_table(TABLE_MAP *map); extern void save_avro_schema(const char *path, const char* schema, TABLE_MAP *map); extern bool handle_table_map_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr); extern bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr); extern void table_map_remap(uint8_t *ptr, uint8_t hdr_len, TABLE_MAP *map); +enum avrorouter_file_op +{ + AVROROUTER_SYNC, + AVROROUTER_FLUSH +}; + +/** + * @brief Flush or sync all tables + * + * @param router Router instance + * @param flush AVROROUTER_SYNC for sync only or AVROROUTER_FLUSH for full flush + */ +extern void avro_flush_all_tables(AVRO_INSTANCE *router, enum avrorouter_file_op flush); + #define AVRO_CLIENT_UNREGISTERED 0x0000 #define AVRO_CLIENT_REGISTERED 0x0001 #define AVRO_CLIENT_REQUEST_DATA 0x0002