Use STL containers in avrorouter
The HASHTABLE can be replaced with std::unordered_map. This simplifies the management by making the deletion of old objects automatic. More cleanup and refactoring is needed to make the contained classes cleaner.
This commit is contained in:
@ -440,33 +440,6 @@ void read_source_service_options(Avro *inst, const char** options,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* TABLE_CREATE free function for use with hashtable.
|
|
||||||
* @param v Pointer to a TABLE_CREATE
|
|
||||||
*/
|
|
||||||
static void table_create_hfree(void* v)
|
|
||||||
{
|
|
||||||
table_create_free((TABLE_CREATE*)v);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* AVRO_TABLE free function for use with hashtable.
|
|
||||||
* @param v Pointer to a AVRO_TABLE
|
|
||||||
*/
|
|
||||||
static void avro_table_hfree(void* v)
|
|
||||||
{
|
|
||||||
avro_table_free((AVRO_TABLE*)v);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* TABLE_MAP free function for use with hashtable.
|
|
||||||
* @param v Pointer to a TABLE_MAP
|
|
||||||
*/
|
|
||||||
static void table_map_hfree(void* v)
|
|
||||||
{
|
|
||||||
table_map_free((TABLE_MAP*)v);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create an instance of the router for a particular service
|
* Create an instance of the router for a particular service
|
||||||
* within MaxScale.
|
* within MaxScale.
|
||||||
@ -573,24 +546,6 @@ createInstance(SERVICE *service, char **options)
|
|||||||
MXS_NOTICE("[%s] First binlog is: %s", service->name, inst->binlog_name);
|
MXS_NOTICE("[%s] First binlog is: %s", service->name, inst->binlog_name);
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((inst->table_maps = hashtable_alloc(1000, hashtable_item_strhash, hashtable_item_strcmp)) &&
|
|
||||||
(inst->open_tables = hashtable_alloc(1000, hashtable_item_strhash, hashtable_item_strcmp)) &&
|
|
||||||
(inst->created_tables = hashtable_alloc(1000, hashtable_item_strhash, hashtable_item_strcmp)))
|
|
||||||
{
|
|
||||||
hashtable_memory_fns(inst->table_maps, hashtable_item_strdup, NULL,
|
|
||||||
hashtable_item_free, table_map_hfree);
|
|
||||||
hashtable_memory_fns(inst->open_tables, hashtable_item_strdup, NULL,
|
|
||||||
hashtable_item_free, avro_table_hfree);
|
|
||||||
hashtable_memory_fns(inst->created_tables, hashtable_item_strdup, NULL,
|
|
||||||
hashtable_item_free, table_create_hfree);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
MXS_ERROR("Hashtable allocation failed. This is most likely caused "
|
|
||||||
"by a lack of available memory.");
|
|
||||||
err = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
int pcreerr;
|
int pcreerr;
|
||||||
size_t erroff;
|
size_t erroff;
|
||||||
pcre2_code *create_re = pcre2_compile((PCRE2_SPTR) create_table_regex,
|
pcre2_code *create_re = pcre2_compile((PCRE2_SPTR) create_table_regex,
|
||||||
@ -633,13 +588,10 @@ createInstance(SERVICE *service, char **options)
|
|||||||
if (err)
|
if (err)
|
||||||
{
|
{
|
||||||
sqlite3_close_v2(inst->sqlite_handle);
|
sqlite3_close_v2(inst->sqlite_handle);
|
||||||
hashtable_free(inst->table_maps);
|
|
||||||
hashtable_free(inst->open_tables);
|
|
||||||
hashtable_free(inst->created_tables);
|
|
||||||
MXS_FREE(inst->avrodir);
|
MXS_FREE(inst->avrodir);
|
||||||
MXS_FREE(inst->binlogdir);
|
MXS_FREE(inst->binlogdir);
|
||||||
MXS_FREE(inst->fileroot);
|
MXS_FREE(inst->fileroot);
|
||||||
MXS_FREE(inst);
|
delete inst;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -103,7 +103,8 @@ void avro_close_binlog(int fd)
|
|||||||
AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema, const char *codec,
|
AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema, const char *codec,
|
||||||
size_t block_size)
|
size_t block_size)
|
||||||
{
|
{
|
||||||
AVRO_TABLE *table = static_cast<AVRO_TABLE*>(MXS_CALLOC(1, sizeof(AVRO_TABLE)));
|
AVRO_TABLE *table = new (std::nothrow)AVRO_TABLE;
|
||||||
|
|
||||||
if (table)
|
if (table)
|
||||||
{
|
{
|
||||||
if (avro_schema_from_json_length(json_schema, strlen(json_schema),
|
if (avro_schema_from_json_length(json_schema, strlen(json_schema),
|
||||||
@ -147,6 +148,7 @@ AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema, cons
|
|||||||
table->json_schema = MXS_STRDUP_A(json_schema);
|
table->json_schema = MXS_STRDUP_A(json_schema);
|
||||||
table->filename = MXS_STRDUP_A(filepath);
|
table->filename = MXS_STRDUP_A(filepath);
|
||||||
}
|
}
|
||||||
|
|
||||||
return table;
|
return table;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -304,24 +306,6 @@ bool avro_load_conversion_state(Avro *router)
|
|||||||
return rval;
|
return rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Free an AVRO_TABLE
|
|
||||||
*
|
|
||||||
* @param table Table to free
|
|
||||||
*/
|
|
||||||
void avro_table_free(AVRO_TABLE *table)
|
|
||||||
{
|
|
||||||
if (table)
|
|
||||||
{
|
|
||||||
avro_file_writer_flush(table->avro_file);
|
|
||||||
avro_file_writer_close(table->avro_file);
|
|
||||||
avro_value_iface_decref(table->avro_writer_iface);
|
|
||||||
avro_schema_decref(table->avro_schema);
|
|
||||||
MXS_FREE(table->json_schema);
|
|
||||||
MXS_FREE(table->filename);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Rotate to next file if it exists
|
* @brief Rotate to next file if it exists
|
||||||
*
|
*
|
||||||
@ -818,18 +802,13 @@ void avro_load_metadata_from_schemas(Avro *router)
|
|||||||
if (versionend == suffix)
|
if (versionend == suffix)
|
||||||
{
|
{
|
||||||
snprintf(table_ident, sizeof(table_ident), "%s.%s", db, table);
|
snprintf(table_ident, sizeof(table_ident), "%s.%s", db, table);
|
||||||
TABLE_CREATE *old =
|
auto it = router->created_tables.find(table_ident);
|
||||||
static_cast<TABLE_CREATE*>(hashtable_fetch(router->created_tables, table_ident));
|
|
||||||
|
|
||||||
if (old == NULL || version > old->version)
|
if (it == router->created_tables.end() || version > it->second->version)
|
||||||
{
|
{
|
||||||
TABLE_CREATE *created = table_create_from_schema(files.gl_pathv[i],
|
STableCreate created(table_create_from_schema(files.gl_pathv[i],
|
||||||
db, table, version);
|
db, table, version));
|
||||||
if (old)
|
router->created_tables[table_ident] = created;
|
||||||
{
|
|
||||||
hashtable_delete(router->created_tables, table_ident);
|
|
||||||
}
|
|
||||||
hashtable_add(router->created_tables, table_ident, created);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -848,29 +827,17 @@ void avro_load_metadata_from_schemas(Avro *router)
|
|||||||
*/
|
*/
|
||||||
void avro_flush_all_tables(Avro *router, enum avrorouter_file_op flush)
|
void avro_flush_all_tables(Avro *router, enum avrorouter_file_op flush)
|
||||||
{
|
{
|
||||||
HASHITERATOR *iter = hashtable_iterator(router->open_tables);
|
for (auto it = router->open_tables.begin(); it != router->open_tables.end(); it++)
|
||||||
|
|
||||||
if (iter)
|
|
||||||
{
|
{
|
||||||
char *key;
|
if (flush == AVROROUTER_FLUSH)
|
||||||
while ((key = (char*)hashtable_next(iter)))
|
|
||||||
{
|
{
|
||||||
AVRO_TABLE *table = static_cast<AVRO_TABLE*>(hashtable_fetch(router->open_tables, key));
|
avro_file_writer_flush(it->second->avro_file);
|
||||||
|
}
|
||||||
if (table)
|
else
|
||||||
{
|
{
|
||||||
if (flush == AVROROUTER_FLUSH)
|
ss_dassert(flush == AVROROUTER_SYNC);
|
||||||
{
|
avro_file_writer_sync(it->second->avro_file);
|
||||||
avro_file_writer_flush(table->avro_file);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
ss_dassert(flush == AVROROUTER_SYNC);
|
|
||||||
avro_file_writer_sync(table->avro_file);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
hashtable_iterator_free(iter);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -968,29 +935,22 @@ bool save_and_replace_table_create(Avro *router, TABLE_CREATE *created)
|
|||||||
char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2];
|
char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2];
|
||||||
snprintf(table_ident, sizeof(table_ident), "%s.%s", created->database, created->table);
|
snprintf(table_ident, sizeof(table_ident), "%s.%s", created->database, created->table);
|
||||||
|
|
||||||
TABLE_CREATE *old = static_cast<TABLE_CREATE*>(hashtable_fetch(router->created_tables, table_ident));
|
auto it = router->created_tables.find(table_ident);
|
||||||
|
|
||||||
if (old)
|
if (it != router->created_tables.end())
|
||||||
{
|
{
|
||||||
HASHITERATOR *iter = hashtable_iterator(router->table_maps);
|
for (auto a = router->table_maps.begin(); a != router->table_maps.end(); a++)
|
||||||
|
|
||||||
char *key;
|
|
||||||
while ((key = static_cast<char*>(hashtable_next(iter))))
|
|
||||||
{
|
{
|
||||||
if (strcmp(key, table_ident) == 0)
|
if (a->first == table_ident)
|
||||||
{
|
{
|
||||||
TABLE_MAP* map = static_cast<TABLE_MAP*>(hashtable_fetch(router->table_maps, key));
|
router->active_maps[a->second->id % MAX_MAPPED_TABLES] = NULL;
|
||||||
router->active_maps[map->id % MAX_MAPPED_TABLES] = NULL;
|
|
||||||
hashtable_delete(router->table_maps, key);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
hashtable_iterator_free(iter);
|
router->table_maps.erase(table_ident);
|
||||||
|
|
||||||
hashtable_delete(router->created_tables, table_ident);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
hashtable_add(router->created_tables, table_ident, created);
|
router->created_tables[table_ident] = STableCreate(created);
|
||||||
ss_dassert(created->columns > 0);
|
ss_dassert(created->columns > 0);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -1119,11 +1079,11 @@ void handle_query_event(Avro *router, REP_HEADER *hdr, int *pending_transaction,
|
|||||||
}
|
}
|
||||||
else if (is_alter_table_statement(router, sql, len))
|
else if (is_alter_table_statement(router, sql, len))
|
||||||
{
|
{
|
||||||
TABLE_CREATE *created = static_cast<TABLE_CREATE*>(hashtable_fetch(router->created_tables, ident));
|
auto it = router->created_tables.find(ident);
|
||||||
|
|
||||||
if (created)
|
if (it != router->created_tables.end())
|
||||||
{
|
{
|
||||||
table_create_alter(created, sql, sql + len);
|
table_create_alter(it->second.get(), sql, sql + len);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -100,24 +100,28 @@ bool handle_table_map_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
|
|||||||
int ev_len = router->event_type_hdr_lens[hdr->event_type];
|
int ev_len = router->event_type_hdr_lens[hdr->event_type];
|
||||||
|
|
||||||
read_table_info(ptr, ev_len, &id, table_ident, sizeof(table_ident));
|
read_table_info(ptr, ev_len, &id, table_ident, sizeof(table_ident));
|
||||||
TABLE_CREATE* create = static_cast<TABLE_CREATE*>(hashtable_fetch(router->created_tables, table_ident));
|
auto create = router->created_tables.find(table_ident);
|
||||||
|
|
||||||
if (create)
|
if (create != router->created_tables.end())
|
||||||
{
|
{
|
||||||
ss_dassert(create->columns > 0);
|
ss_dassert(create->second->columns > 0);
|
||||||
TABLE_MAP *old = static_cast<TABLE_MAP*>(hashtable_fetch(router->table_maps, table_ident));
|
auto it = router->table_maps.find(table_ident);
|
||||||
TABLE_MAP *map = table_map_alloc(ptr, ev_len, create);
|
STableMap map(table_map_alloc(ptr, ev_len, create->second.get()));
|
||||||
MXS_ABORT_IF_NULL(map); // Fatal error at this point
|
|
||||||
|
|
||||||
if (old && old->id == map->id && old->version == map->version &&
|
if (it != router->table_maps.end())
|
||||||
strcmp(old->table, map->table) == 0 &&
|
|
||||||
strcmp(old->database, map->database) == 0)
|
|
||||||
{
|
{
|
||||||
table_map_free(map);
|
auto old = it->second;
|
||||||
return true;
|
|
||||||
|
if (old->id == map->id && old->version == map->version &&
|
||||||
|
strcmp(old->table, map->table) == 0 &&
|
||||||
|
strcmp(old->database, map->database) == 0)
|
||||||
|
{
|
||||||
|
// We can reuse the table map object
|
||||||
|
return true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
char* json_schema = json_new_schema_from_table(map);
|
char* json_schema = json_new_schema_from_table(map.get());
|
||||||
|
|
||||||
if (json_schema)
|
if (json_schema)
|
||||||
{
|
{
|
||||||
@ -125,26 +129,25 @@ bool handle_table_map_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
|
|||||||
snprintf(filepath, sizeof(filepath), "%s/%s.%06d.avro",
|
snprintf(filepath, sizeof(filepath), "%s/%s.%06d.avro",
|
||||||
router->avrodir, table_ident, map->version);
|
router->avrodir, table_ident, map->version);
|
||||||
|
|
||||||
/** Close the file and open a new one */
|
SAvroTable avro_table(avro_table_alloc(filepath, json_schema,
|
||||||
hashtable_delete(router->open_tables, table_ident);
|
codec_to_string(router->codec),
|
||||||
AVRO_TABLE *avro_table = avro_table_alloc(filepath, json_schema,
|
router->block_size));
|
||||||
codec_to_string(router->codec),
|
|
||||||
router->block_size);
|
|
||||||
|
|
||||||
if (avro_table)
|
if (avro_table)
|
||||||
{
|
{
|
||||||
bool notify = old != NULL;
|
auto old = router->table_maps.find(table_ident);
|
||||||
|
bool notify = old != router->table_maps.end();
|
||||||
|
|
||||||
if (old)
|
if (notify)
|
||||||
{
|
{
|
||||||
router->active_maps[old->id % MAX_MAPPED_TABLES] = NULL;
|
router->active_maps[old->second->id % MAX_MAPPED_TABLES] = NULL;
|
||||||
}
|
}
|
||||||
hashtable_delete(router->table_maps, table_ident);
|
|
||||||
hashtable_add(router->table_maps, (void*)table_ident, map);
|
router->table_maps[table_ident] = map;
|
||||||
hashtable_add(router->open_tables, table_ident, avro_table);
|
router->open_tables[table_ident] = avro_table;
|
||||||
save_avro_schema(router->avrodir, json_schema, map);
|
save_avro_schema(router->avrodir, json_schema, map.get());
|
||||||
router->active_maps[map->id % MAX_MAPPED_TABLES] = map;
|
router->active_maps[map->id % MAX_MAPPED_TABLES] = map.get();
|
||||||
ss_dassert(router->active_maps[id % MAX_MAPPED_TABLES] == map);
|
ss_dassert(router->active_maps[id % MAX_MAPPED_TABLES] == map.get());
|
||||||
MXS_DEBUG("Table %s mapped to %lu", table_ident, map->id);
|
MXS_DEBUG("Table %s mapped to %lu", table_ident, map->id);
|
||||||
rval = true;
|
rval = true;
|
||||||
|
|
||||||
@ -292,9 +295,15 @@ bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
|
|||||||
{
|
{
|
||||||
char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2];
|
char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2];
|
||||||
snprintf(table_ident, sizeof(table_ident), "%s.%s", map->database, map->table);
|
snprintf(table_ident, sizeof(table_ident), "%s.%s", map->database, map->table);
|
||||||
AVRO_TABLE* table = static_cast<AVRO_TABLE*>(hashtable_fetch(router->open_tables, table_ident));
|
SAvroTable table;
|
||||||
|
auto it = router->open_tables.find(table_ident);
|
||||||
|
|
||||||
|
if (it != router->open_tables.end())
|
||||||
|
{
|
||||||
|
table = it->second;
|
||||||
|
}
|
||||||
|
|
||||||
TABLE_CREATE* create = map->table_create;
|
TABLE_CREATE* create = map->table_create;
|
||||||
ss_dassert(hashtable_fetch(router->created_tables, table_ident) == create);
|
|
||||||
|
|
||||||
if (table && create && ncolumns == map->columns && create->columns == map->columns)
|
if (table && create && ncolumns == map->columns && create->columns == map->columns)
|
||||||
{
|
{
|
||||||
@ -343,7 +352,7 @@ bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
|
|||||||
avro_value_decref(&record);
|
avro_value_decref(&record);
|
||||||
rval = true;
|
rval = true;
|
||||||
}
|
}
|
||||||
else if (table == NULL)
|
else if (!table)
|
||||||
{
|
{
|
||||||
MXS_ERROR("Avro file handle was not found for table %s.%s. See earlier"
|
MXS_ERROR("Avro file handle was not found for table %s.%s. See earlier"
|
||||||
" errors for more details.", map->database, map->table);
|
" errors for more details.", map->database, map->table);
|
||||||
|
@ -753,7 +753,7 @@ TABLE_CREATE* table_create_alloc(char* ident, const char* sql, int len)
|
|||||||
TABLE_CREATE *rval = NULL;
|
TABLE_CREATE *rval = NULL;
|
||||||
if (n_columns > 0)
|
if (n_columns > 0)
|
||||||
{
|
{
|
||||||
if ((rval = static_cast<TABLE_CREATE*>(MXS_MALLOC(sizeof(TABLE_CREATE)))))
|
if ((rval = new (std::nothrow) TABLE_CREATE))
|
||||||
{
|
{
|
||||||
rval->version = resolve_table_version(database, table);
|
rval->version = resolve_table_version(database, table);
|
||||||
rval->was_used = false;
|
rval->was_used = false;
|
||||||
@ -771,7 +771,7 @@ TABLE_CREATE* table_create_alloc(char* ident, const char* sql, int len)
|
|||||||
{
|
{
|
||||||
MXS_FREE(rval->database);
|
MXS_FREE(rval->database);
|
||||||
MXS_FREE(rval->table);
|
MXS_FREE(rval->table);
|
||||||
MXS_FREE(rval);
|
delete rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < n_columns; i++)
|
for (int i = 0; i < n_columns; i++)
|
||||||
@ -990,15 +990,16 @@ TABLE_CREATE* table_create_copy(Avro *router, const char* sql, size_t len, const
|
|||||||
|
|
||||||
strcat(table_ident, source);
|
strcat(table_ident, source);
|
||||||
|
|
||||||
TABLE_CREATE *old = static_cast<TABLE_CREATE*>(hashtable_fetch(router->created_tables, table_ident));
|
auto it = router->created_tables.find(table_ident);
|
||||||
|
|
||||||
if (old)
|
if (it != router->created_tables.end())
|
||||||
{
|
{
|
||||||
|
auto old = it->second;
|
||||||
int n = old->columns;
|
int n = old->columns;
|
||||||
char** names = static_cast<char**>(MXS_MALLOC(sizeof(char*) * n));
|
char** names = static_cast<char**>(MXS_MALLOC(sizeof(char*) * n));
|
||||||
char** types = static_cast<char**>(MXS_MALLOC(sizeof(char*) * n));
|
char** types = static_cast<char**>(MXS_MALLOC(sizeof(char*) * n));
|
||||||
int* lengths = static_cast<int*>(MXS_MALLOC(sizeof(int) * n));
|
int* lengths = static_cast<int*>(MXS_MALLOC(sizeof(int) * n));
|
||||||
rval = static_cast<TABLE_CREATE*>(MXS_MALLOC(sizeof(TABLE_CREATE)));
|
rval = new (std::nothrow) TABLE_CREATE;
|
||||||
|
|
||||||
MXS_ABORT_IF_FALSE(names && types && lengths && rval);
|
MXS_ABORT_IF_FALSE(names && types && lengths && rval);
|
||||||
|
|
||||||
@ -1031,28 +1032,6 @@ TABLE_CREATE* table_create_copy(Avro *router, const char* sql, size_t len, const
|
|||||||
return rval;
|
return rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Free a TABLE_CREATE structure
|
|
||||||
* @param value Value to free
|
|
||||||
*/
|
|
||||||
void table_create_free(TABLE_CREATE* value)
|
|
||||||
{
|
|
||||||
if (value)
|
|
||||||
{
|
|
||||||
for (uint64_t i = 0; i < value->columns; i++)
|
|
||||||
{
|
|
||||||
MXS_FREE(value->column_names[i]);
|
|
||||||
MXS_FREE(value->column_types[i]);
|
|
||||||
}
|
|
||||||
MXS_FREE(value->column_names);
|
|
||||||
MXS_FREE(value->column_types);
|
|
||||||
MXS_FREE(value->column_lengths);
|
|
||||||
MXS_FREE(value->table);
|
|
||||||
MXS_FREE(value->database);
|
|
||||||
MXS_FREE(value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static const char* get_next_def(const char* sql, const char* end)
|
static const char* get_next_def(const char* sql, const char* end)
|
||||||
{
|
{
|
||||||
int depth = 0;
|
int depth = 0;
|
||||||
@ -1669,7 +1648,7 @@ TABLE_MAP *table_map_alloc(uint8_t *ptr, uint8_t hdr_len, TABLE_CREATE* create)
|
|||||||
uint8_t* metadata = (uint8_t*)mxs_lestr_consume(&ptr, &metadata_size);
|
uint8_t* metadata = (uint8_t*)mxs_lestr_consume(&ptr, &metadata_size);
|
||||||
uint8_t *nullmap = ptr;
|
uint8_t *nullmap = ptr;
|
||||||
size_t nullmap_size = (column_count + 7) / 8;
|
size_t nullmap_size = (column_count + 7) / 8;
|
||||||
TABLE_MAP *map = static_cast<TABLE_MAP*>(MXS_MALLOC(sizeof(TABLE_MAP)));
|
TABLE_MAP *map = new (std::nothrow)TABLE_MAP;
|
||||||
|
|
||||||
if (map)
|
if (map)
|
||||||
{
|
{
|
||||||
@ -1706,20 +1685,3 @@ TABLE_MAP *table_map_alloc(uint8_t *ptr, uint8_t hdr_len, TABLE_CREATE* create)
|
|||||||
|
|
||||||
return map;
|
return map;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Free a table map
|
|
||||||
* @param map Table map to free
|
|
||||||
*/
|
|
||||||
void table_map_free(TABLE_MAP *map)
|
|
||||||
{
|
|
||||||
if (map)
|
|
||||||
{
|
|
||||||
MXS_FREE(map->column_types);
|
|
||||||
MXS_FREE(map->column_metadata);
|
|
||||||
MXS_FREE(map->null_bitmap);
|
|
||||||
MXS_FREE(map->database);
|
|
||||||
MXS_FREE(map->table);
|
|
||||||
MXS_FREE(map);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@ -17,7 +17,10 @@
|
|||||||
#include <maxscale/cdefs.h>
|
#include <maxscale/cdefs.h>
|
||||||
#include <stdbool.h>
|
#include <stdbool.h>
|
||||||
#include <stdint.h>
|
#include <stdint.h>
|
||||||
|
#include <string>
|
||||||
|
#include <tr1/memory>
|
||||||
#include <blr_constants.h>
|
#include <blr_constants.h>
|
||||||
|
#include <maxscale/alloc.h>
|
||||||
#include <maxscale/dcb.h>
|
#include <maxscale/dcb.h>
|
||||||
#include <maxscale/service.h>
|
#include <maxscale/service.h>
|
||||||
#include <maxscale/spinlock.h>
|
#include <maxscale/spinlock.h>
|
||||||
@ -116,8 +119,22 @@ typedef enum avro_binlog_end
|
|||||||
#define AVRO_DATA_BURST_SIZE (32 * 1024)
|
#define AVRO_DATA_BURST_SIZE (32 * 1024)
|
||||||
|
|
||||||
/** A CREATE TABLE abstraction */
|
/** A CREATE TABLE abstraction */
|
||||||
typedef struct table_create
|
struct TABLE_CREATE
|
||||||
{
|
{
|
||||||
|
~TABLE_CREATE()
|
||||||
|
{
|
||||||
|
for (uint64_t i = 0; i < columns; i++)
|
||||||
|
{
|
||||||
|
MXS_FREE(column_names[i]);
|
||||||
|
MXS_FREE(column_types[i]);
|
||||||
|
}
|
||||||
|
MXS_FREE(column_names);
|
||||||
|
MXS_FREE(column_types);
|
||||||
|
MXS_FREE(column_lengths);
|
||||||
|
MXS_FREE(table);
|
||||||
|
MXS_FREE(database);
|
||||||
|
}
|
||||||
|
|
||||||
uint64_t columns;
|
uint64_t columns;
|
||||||
char** column_names;
|
char** column_names;
|
||||||
char** column_types;
|
char** column_types;
|
||||||
@ -126,14 +143,23 @@ typedef struct table_create
|
|||||||
char* database;
|
char* database;
|
||||||
int version; /**< How many versions of this table have been used */
|
int version; /**< How many versions of this table have been used */
|
||||||
bool was_used; /**< Has this schema been persisted to disk */
|
bool was_used; /**< Has this schema been persisted to disk */
|
||||||
} TABLE_CREATE;
|
};
|
||||||
|
|
||||||
/** A representation of a table map event read from a binary log. A table map
|
/** A representation of a table map event read from a binary log. A table map
|
||||||
* maps a table to a unique ID which can be used to match row events to table map
|
* maps a table to a unique ID which can be used to match row events to table map
|
||||||
* events. The table map event tells us how the table is laid out and gives us
|
* events. The table map event tells us how the table is laid out and gives us
|
||||||
* some meta information on the columns. */
|
* some meta information on the columns. */
|
||||||
typedef struct table_map
|
struct TABLE_MAP
|
||||||
{
|
{
|
||||||
|
~TABLE_MAP()
|
||||||
|
{
|
||||||
|
MXS_FREE(column_types);
|
||||||
|
MXS_FREE(column_metadata);
|
||||||
|
MXS_FREE(null_bitmap);
|
||||||
|
MXS_FREE(database);
|
||||||
|
MXS_FREE(table);
|
||||||
|
}
|
||||||
|
|
||||||
uint64_t id;
|
uint64_t id;
|
||||||
uint64_t columns;
|
uint64_t columns;
|
||||||
uint16_t flags;
|
uint16_t flags;
|
||||||
@ -146,7 +172,7 @@ typedef struct table_map
|
|||||||
char version_string[TABLE_MAP_VERSION_DIGITS + 1];
|
char version_string[TABLE_MAP_VERSION_DIGITS + 1];
|
||||||
char* table;
|
char* table;
|
||||||
char* database;
|
char* database;
|
||||||
} TABLE_MAP;
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The statistics for this AVRO router instance
|
* The statistics for this AVRO router instance
|
||||||
@ -156,14 +182,24 @@ typedef struct
|
|||||||
int n_clients; /*< Number client sessions created */
|
int n_clients; /*< Number client sessions created */
|
||||||
} AVRO_ROUTER_STATS;
|
} AVRO_ROUTER_STATS;
|
||||||
|
|
||||||
typedef struct avro_table_t
|
struct AVRO_TABLE
|
||||||
{
|
{
|
||||||
|
~AVRO_TABLE()
|
||||||
|
{
|
||||||
|
avro_file_writer_flush(avro_file);
|
||||||
|
avro_file_writer_close(avro_file);
|
||||||
|
avro_value_iface_decref(avro_writer_iface);
|
||||||
|
avro_schema_decref(avro_schema);
|
||||||
|
MXS_FREE(json_schema);
|
||||||
|
MXS_FREE(filename);
|
||||||
|
}
|
||||||
|
|
||||||
char* filename; /*< Absolute filename */
|
char* filename; /*< Absolute filename */
|
||||||
char* json_schema; /*< JSON representation of the schema */
|
char* json_schema; /*< JSON representation of the schema */
|
||||||
avro_file_writer_t avro_file; /*< Current Avro data file */
|
avro_file_writer_t avro_file; /*< Current Avro data file */
|
||||||
avro_value_iface_t* avro_writer_iface; /*< Avro C API writer interface */
|
avro_value_iface_t* avro_writer_iface; /*< Avro C API writer interface */
|
||||||
avro_schema_t avro_schema; /*< Native Avro schema of the table */
|
avro_schema_t avro_schema; /*< Native Avro schema of the table */
|
||||||
} AVRO_TABLE;
|
};
|
||||||
|
|
||||||
/** Data format used when streaming data to the clients */
|
/** Data format used when streaming data to the clients */
|
||||||
enum avro_data_format
|
enum avro_data_format
|
||||||
@ -192,6 +228,14 @@ typedef struct gtid_pos
|
|||||||
* rebuild GTID events in the correct order. */
|
* rebuild GTID events in the correct order. */
|
||||||
} gtid_pos_t;
|
} gtid_pos_t;
|
||||||
|
|
||||||
|
typedef std::tr1::shared_ptr<TABLE_CREATE> STableCreate;
|
||||||
|
typedef std::tr1::shared_ptr<AVRO_TABLE> SAvroTable;
|
||||||
|
typedef std::tr1::shared_ptr<TABLE_MAP> STableMap;
|
||||||
|
|
||||||
|
typedef std::tr1::unordered_map<std::string, STableCreate> CreatedTables;
|
||||||
|
typedef std::tr1::unordered_map<std::string, SAvroTable> AvroTables;
|
||||||
|
typedef std::tr1::unordered_map<std::string, STableMap> MappedTables;
|
||||||
|
|
||||||
struct Avro
|
struct Avro
|
||||||
{
|
{
|
||||||
SERVICE* service; /*< Pointer to the service using this router */
|
SERVICE* service; /*< Pointer to the service using this router */
|
||||||
@ -213,9 +257,9 @@ struct Avro
|
|||||||
uint8_t binlog_checksum;
|
uint8_t binlog_checksum;
|
||||||
gtid_pos_t gtid;
|
gtid_pos_t gtid;
|
||||||
TABLE_MAP* active_maps[MAX_MAPPED_TABLES];
|
TABLE_MAP* active_maps[MAX_MAPPED_TABLES];
|
||||||
HASHTABLE* table_maps;
|
MappedTables table_maps;
|
||||||
HASHTABLE* open_tables;
|
AvroTables open_tables;
|
||||||
HASHTABLE* created_tables;
|
CreatedTables created_tables;
|
||||||
sqlite3* sqlite_handle;
|
sqlite3* sqlite_handle;
|
||||||
char prevbinlog[BINLOG_FNAMELEN + 1];
|
char prevbinlog[BINLOG_FNAMELEN + 1];
|
||||||
int rotating; /*< Rotation in progress flag */
|
int rotating; /*< Rotation in progress flag */
|
||||||
@ -254,10 +298,8 @@ struct AvroSession
|
|||||||
extern void read_table_info(uint8_t *ptr, uint8_t post_header_len, uint64_t *table_id,
|
extern void read_table_info(uint8_t *ptr, uint8_t post_header_len, uint64_t *table_id,
|
||||||
char* dest, size_t len);
|
char* dest, size_t len);
|
||||||
extern TABLE_MAP *table_map_alloc(uint8_t *ptr, uint8_t hdr_len, TABLE_CREATE* create);
|
extern TABLE_MAP *table_map_alloc(uint8_t *ptr, uint8_t hdr_len, TABLE_CREATE* create);
|
||||||
extern void table_map_free(TABLE_MAP *map);
|
|
||||||
extern TABLE_CREATE* table_create_alloc(char* ident, const char* sql, int len);
|
extern TABLE_CREATE* table_create_alloc(char* ident, const char* sql, int len);
|
||||||
extern TABLE_CREATE* table_create_copy(Avro *router, const char* sql, size_t len, const char* db);
|
extern TABLE_CREATE* table_create_copy(Avro *router, const char* sql, size_t len, const char* db);
|
||||||
extern void table_create_free(TABLE_CREATE* value);
|
|
||||||
extern bool table_create_save(TABLE_CREATE *create, const char *filename);
|
extern bool table_create_save(TABLE_CREATE *create, const char *filename);
|
||||||
extern bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end);
|
extern bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end);
|
||||||
extern void read_table_identifier(const char* db, const char *sql, const char *end, char *dest, int size);
|
extern void read_table_identifier(const char* db, const char *sql, const char *end, char *dest, int size);
|
||||||
@ -268,7 +310,6 @@ extern void avro_close_binlog(int fd);
|
|||||||
extern avro_binlog_end_t avro_read_all_events(Avro *router);
|
extern avro_binlog_end_t avro_read_all_events(Avro *router);
|
||||||
extern AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema,
|
extern AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema,
|
||||||
const char *codec, size_t block_size);
|
const char *codec, size_t block_size);
|
||||||
extern void avro_table_free(AVRO_TABLE *table);
|
|
||||||
extern char* json_new_schema_from_table(TABLE_MAP *map);
|
extern char* json_new_schema_from_table(TABLE_MAP *map);
|
||||||
extern void save_avro_schema(const char *path, const char* schema, TABLE_MAP *map);
|
extern void save_avro_schema(const char *path, const char* schema, TABLE_MAP *map);
|
||||||
extern bool handle_table_map_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr);
|
extern bool handle_table_map_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr);
|
||||||
|
Reference in New Issue
Block a user