Only flush tables to disk at the end of the binlog
Currently, when the avrorouter finishes reading a binlog file, or when a certain number of rows or transactions is reached, it flushes all tables to disk. This is quite slow, as events can easily be written into the binlog faster than the avrorouter can process them. A solution to this would be to only sync the tables (close the Avro block) instead of flushing them to disk. This would allow more efficient processing of the files while still retaining the safe shutdown that flushing offers.
This commit is contained in:
@ -1076,7 +1076,7 @@ void converter_func(void* data)
|
||||
/** We reached end of file, flush unwritten records to disk */
|
||||
if (router->task_delay == 1)
|
||||
{
|
||||
avro_flush_all_tables(router);
|
||||
avro_flush_all_tables(router, true);
|
||||
avro_save_conversion_state(router);
|
||||
}
|
||||
|
||||
|
@ -46,7 +46,6 @@ static const char *ddl_list_name = "table-ddl.list";
|
||||
void handle_query_event(AVRO_INSTANCE *router, REP_HEADER *hdr,
|
||||
int *pending_transaction, uint8_t *ptr);
|
||||
bool is_create_table_statement(AVRO_INSTANCE *router, char* ptr, size_t len);
|
||||
void avro_flush_all_tables(AVRO_INSTANCE *router);
|
||||
void avro_notify_client(AVRO_CLIENT *client);
|
||||
void avro_update_index(AVRO_INSTANCE* router);
|
||||
void update_used_tables(AVRO_INSTANCE* router);
|
||||
@ -745,7 +744,7 @@ avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router)
|
||||
router->trx_count >= router->trx_target)
|
||||
{
|
||||
update_used_tables(router);
|
||||
avro_flush_all_tables(router);
|
||||
avro_flush_all_tables(router, false);
|
||||
avro_save_conversion_state(router);
|
||||
notify_all_clients(router);
|
||||
total_rows += router->row_count;
|
||||
@ -859,7 +858,7 @@ void avro_load_metadata_from_schemas(AVRO_INSTANCE *router)
|
||||
* @brief Flush all Avro records to disk
|
||||
* @param router Avro router instance
|
||||
*/
|
||||
void avro_flush_all_tables(AVRO_INSTANCE *router)
|
||||
void avro_flush_all_tables(AVRO_INSTANCE *router, enum avrorouter_file_op flush)
|
||||
{
|
||||
HASHITERATOR *iter = hashtable_iterator(router->open_tables);
|
||||
|
||||
@ -872,7 +871,15 @@ void avro_flush_all_tables(AVRO_INSTANCE *router)
|
||||
|
||||
if (table)
|
||||
{
|
||||
avro_file_writer_flush(table->avro_file);
|
||||
if (flush == AVROROUTER_FLUSH)
|
||||
{
|
||||
avro_file_writer_flush(table->avro_file);
|
||||
}
|
||||
else
|
||||
{
|
||||
ss_dassert(flush == AVROROUTER_SYNC);
|
||||
avro_file_writer_sync(table->avro_file);
|
||||
}
|
||||
}
|
||||
}
|
||||
hashtable_iterator_free(iter);
|
||||
|
@ -295,13 +295,26 @@ extern void avro_close_binlog(int fd);
|
||||
extern avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router);
|
||||
extern AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema);
|
||||
extern void avro_table_free(AVRO_TABLE *table);
|
||||
extern void avro_flush_all_tables(AVRO_INSTANCE *router);
|
||||
extern char* json_new_schema_from_table(TABLE_MAP *map);
|
||||
extern void save_avro_schema(const char *path, const char* schema, TABLE_MAP *map);
|
||||
extern bool handle_table_map_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr);
|
||||
extern bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr);
|
||||
extern void table_map_remap(uint8_t *ptr, uint8_t hdr_len, TABLE_MAP *map);
|
||||
|
||||
enum avrorouter_file_op
|
||||
{
|
||||
AVROROUTER_SYNC,
|
||||
AVROROUTER_FLUSH
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Flush or sync all tables
|
||||
*
|
||||
* @param router Router instance
|
||||
* @param flush AVROROUTER_SYNC for sync only or AVROROUTER_FLUSH for full flush
|
||||
*/
|
||||
extern void avro_flush_all_tables(AVRO_INSTANCE *router, enum avrorouter_file_op flush);
|
||||
|
||||
#define AVRO_CLIENT_UNREGISTERED 0x0000
|
||||
#define AVRO_CLIENT_REGISTERED 0x0001
|
||||
#define AVRO_CLIENT_REQUEST_DATA 0x0002
|
||||
|
Reference in New Issue
Block a user