MXS-1881: Refactor TABLE_MAP initialization
Changed TABLE_MAP to use STL containers and types. The initialization is now done in the constructor. Removed unnecessary linkage between TABLE_MAP and TABLE_CREATE.
This commit is contained in:
parent
e35d9dfc10
commit
f61c56228c
@ -113,15 +113,14 @@ bool handle_table_map_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
|
||||
auto old = it->second;
|
||||
|
||||
if (old->id == map->id && old->version == map->version &&
|
||||
strcmp(old->table, map->table) == 0 &&
|
||||
strcmp(old->database, map->database) == 0)
|
||||
old->table == map->table && old->database == map->database)
|
||||
{
|
||||
// We can reuse the table map object
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
char* json_schema = json_new_schema_from_table(map.get());
|
||||
char* json_schema = json_new_schema_from_table(map, create->second);
|
||||
|
||||
if (json_schema)
|
||||
{
|
||||
@ -145,7 +144,7 @@ bool handle_table_map_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
|
||||
|
||||
router->table_maps[table_ident] = map;
|
||||
router->open_tables[table_ident] = avro_table;
|
||||
save_avro_schema(router->avrodir.c_str(), json_schema, map.get());
|
||||
save_avro_schema(router->avrodir.c_str(), json_schema, map, create->second);
|
||||
router->active_maps[map->id] = map;
|
||||
ss_dassert(router->active_maps[id] == map);
|
||||
MXS_DEBUG("Table %s mapped to %lu", table_ident, map->id);
|
||||
@ -295,7 +294,7 @@ bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
|
||||
{
|
||||
TABLE_MAP* map = it->second.get();
|
||||
char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2];
|
||||
snprintf(table_ident, sizeof(table_ident), "%s.%s", map->database, map->table);
|
||||
snprintf(table_ident, sizeof(table_ident), "%s.%s", map->database.c_str(), map->table.c_str());
|
||||
SAvroTable table;
|
||||
auto it = router->open_tables.find(table_ident);
|
||||
|
||||
@ -304,9 +303,10 @@ bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
|
||||
table = it->second;
|
||||
}
|
||||
|
||||
TABLE_CREATE* create = map->table_create;
|
||||
auto create = router->created_tables.find(table_ident);
|
||||
|
||||
if (table && create && ncolumns == map->columns && create->columns.size() == map->columns)
|
||||
if (table && create != router->created_tables.end() &&
|
||||
ncolumns == map->columns() && create->second->columns.size() == map->columns())
|
||||
{
|
||||
avro_value_t record;
|
||||
avro_generic_value_new(table->avro_writer_iface, &record);
|
||||
@ -325,7 +325,7 @@ bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
|
||||
/** Add the current GTID and timestamp */
|
||||
int event_type = get_event_type(hdr->event_type);
|
||||
prepare_record(router, hdr, event_type, &record);
|
||||
ptr = process_row_event_data(map, create, &record, ptr, col_present, end);
|
||||
ptr = process_row_event_data(map, create->second.get(), &record, ptr, col_present, end);
|
||||
if (avro_file_writer_append_value(table->avro_file, &record))
|
||||
{
|
||||
MXS_ERROR("Failed to write value at position %ld: %s",
|
||||
@ -338,7 +338,8 @@ bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
|
||||
if (event_type == UPDATE_EVENT)
|
||||
{
|
||||
prepare_record(router, hdr, UPDATE_EVENT_AFTER, &record);
|
||||
ptr = process_row_event_data(map, create, &record, ptr, col_present, end);
|
||||
ptr = process_row_event_data(map, create->second.get(), &record, ptr, col_present, end);
|
||||
|
||||
if (avro_file_writer_append_value(table->avro_file, &record))
|
||||
{
|
||||
MXS_ERROR("Failed to write value at position %ld: %s",
|
||||
@ -356,25 +357,25 @@ bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
|
||||
else if (!table)
|
||||
{
|
||||
MXS_ERROR("Avro file handle was not found for table %s.%s. See earlier"
|
||||
" errors for more details.", map->database, map->table);
|
||||
" errors for more details.", map->database.c_str(), map->table.c_str());
|
||||
}
|
||||
else if (create == NULL)
|
||||
else if (create == router->created_tables.end())
|
||||
{
|
||||
MXS_ERROR("Create table statement for %s.%s was not found from the "
|
||||
"binary logs or the stored schema was not correct.",
|
||||
map->database, map->table);
|
||||
map->database.c_str(), map->table.c_str());
|
||||
}
|
||||
else if (ncolumns == map->columns && create->columns.size() != map->columns)
|
||||
else if (ncolumns == map->columns() && create->second->columns.size() != map->columns())
|
||||
{
|
||||
MXS_ERROR("Table map event has a different column count for table "
|
||||
"%s.%s than the CREATE TABLE statement. Possible "
|
||||
"unsupported DDL detected.", map->database, map->table);
|
||||
"unsupported DDL detected.", map->database.c_str(), map->table.c_str());
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("Row event and table map event have different column "
|
||||
"counts for table %s.%s, only full row image is currently "
|
||||
"supported.", map->database, map->table);
|
||||
"supported.", map->database.c_str(), map->table.c_str());
|
||||
}
|
||||
}
|
||||
else
|
||||
@ -560,8 +561,8 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
|
||||
{
|
||||
int npresent = 0;
|
||||
avro_value_t field;
|
||||
long ncolumns = map->columns;
|
||||
uint8_t *metadata = map->column_metadata;
|
||||
long ncolumns = map->columns();
|
||||
uint8_t *metadata = &map->column_metadata[0];
|
||||
size_t metadata_offset = 0;
|
||||
|
||||
/** BIT type values use the extra bits in the row event header */
|
||||
@ -741,7 +742,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
|
||||
sprintf(trace[i], "[%ld] %s", i, column_type_to_string(map->column_types[i]));
|
||||
check_overflow(ptr <= end);
|
||||
}
|
||||
ss_dassert(metadata_offset <= map->column_metadata_size);
|
||||
ss_dassert(metadata_offset <= map->column_metadata.size());
|
||||
metadata_offset += get_metadata_len(map->column_types[i]);
|
||||
}
|
||||
else
|
||||
|
@ -82,15 +82,14 @@ static const char* column_type_to_avro_type(uint8_t type)
|
||||
* @param create The TABLE_CREATE for this table
|
||||
* @return New schema or NULL if an error occurred
|
||||
*/
|
||||
char* json_new_schema_from_table(TABLE_MAP *map)
|
||||
char* json_new_schema_from_table(const STableMap& map, const STableCreate& create)
|
||||
{
|
||||
TABLE_CREATE *create = map->table_create;
|
||||
|
||||
if (map->version != create->version)
|
||||
{
|
||||
MXS_ERROR("Version mismatch for table %s.%s. Table map version is %d and "
|
||||
"the table definition version is %d.", map->database, map->table,
|
||||
map->version, create->version);
|
||||
"the table definition version is %d.", map->database.c_str(),
|
||||
map->table.c_str(), map->version, create->version);
|
||||
ss_dassert(!true); // Should not happen
|
||||
return NULL;
|
||||
}
|
||||
|
||||
@ -123,7 +122,7 @@ char* json_new_schema_from_table(TABLE_MAP *map)
|
||||
json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:o}", "name", avro_event_type,
|
||||
"type", event_types));
|
||||
|
||||
for (uint64_t i = 0; i < map->columns && i < create->columns.size(); i++)
|
||||
for (uint64_t i = 0; i < map->columns() && i < create->columns.size(); i++)
|
||||
{
|
||||
json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s, s:s, s:i}",
|
||||
"name", create->columns[i].name.c_str(),
|
||||
@ -250,21 +249,21 @@ bool json_extract_field_names(const char* filename, std::vector<Column>& columns
|
||||
* @param schema Schema in JSON format
|
||||
* @param map Table map that @p schema represents
|
||||
*/
|
||||
void save_avro_schema(const char *path, const char* schema, TABLE_MAP *map)
|
||||
void save_avro_schema(const char *path, const char* schema, STableMap& map, STableCreate& create)
|
||||
{
|
||||
char filepath[PATH_MAX];
|
||||
snprintf(filepath, sizeof(filepath), "%s/%s.%s.%06d.avsc", path, map->database,
|
||||
map->table, map->version);
|
||||
snprintf(filepath, sizeof(filepath), "%s/%s.%s.%06d.avsc", path,
|
||||
map->database.c_str(), map->table.c_str(), map->version);
|
||||
|
||||
if (access(filepath, F_OK) != 0)
|
||||
{
|
||||
if (!map->table_create->was_used)
|
||||
if (!create->was_used)
|
||||
{
|
||||
FILE *file = fopen(filepath, "wb");
|
||||
if (file)
|
||||
{
|
||||
fprintf(file, "%s\n", schema);
|
||||
map->table_create->was_used = true;
|
||||
create->was_used = true;
|
||||
fclose(file);
|
||||
}
|
||||
}
|
||||
@ -1472,40 +1471,10 @@ TABLE_MAP *table_map_alloc(uint8_t *ptr, uint8_t hdr_len, TABLE_CREATE* create)
|
||||
uint8_t* metadata = (uint8_t*)mxs_lestr_consume(&ptr, &metadata_size);
|
||||
uint8_t *nullmap = ptr;
|
||||
size_t nullmap_size = (column_count + 7) / 8;
|
||||
TABLE_MAP *map = new (std::nothrow)TABLE_MAP;
|
||||
|
||||
if (map)
|
||||
{
|
||||
map->id = table_id;
|
||||
map->version = create->version;
|
||||
map->flags = flags;
|
||||
map->columns = column_count;
|
||||
map->column_types = static_cast<uint8_t*>(MXS_MALLOC(column_count));
|
||||
/** Allocate at least one byte for the metadata */
|
||||
map->column_metadata = static_cast<uint8_t*>(MXS_CALLOC(1, metadata_size + 1));
|
||||
map->column_metadata_size = metadata_size;
|
||||
map->null_bitmap = static_cast<uint8_t*>(MXS_MALLOC(nullmap_size));
|
||||
map->database = MXS_STRDUP(schema_name);
|
||||
map->table = MXS_STRDUP(table_name);
|
||||
map->table_create = create;
|
||||
if (map->column_types && map->database && map->table &&
|
||||
map->column_metadata && map->null_bitmap)
|
||||
{
|
||||
memcpy(map->column_types, column_types, column_count);
|
||||
memcpy(map->null_bitmap, nullmap, nullmap_size);
|
||||
memcpy(map->column_metadata, metadata, metadata_size);
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_FREE(map->null_bitmap);
|
||||
MXS_FREE(map->column_metadata);
|
||||
MXS_FREE(map->column_types);
|
||||
MXS_FREE(map->database);
|
||||
MXS_FREE(map->table);
|
||||
MXS_FREE(map);
|
||||
map = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
return map;
|
||||
Bytes cols(column_types, column_types + column_count);
|
||||
Bytes nulls(nullmap, nullmap + nullmap_size);
|
||||
Bytes meta(metadata, metadata + metadata_size);
|
||||
return new (std::nothrow)TABLE_MAP(schema_name, table_name, table_id, create->version,
|
||||
std::move(cols), std::move(nulls), std::move(meta));
|
||||
}
|
||||
|
@ -156,33 +156,38 @@ struct TABLE_CREATE
|
||||
bool was_used; /**< Has this schema been persisted to disk */
|
||||
};
|
||||
|
||||
typedef std::vector<uint8_t> Bytes;
|
||||
|
||||
/** A representation of a table map event read from a binary log. A table map
|
||||
* maps a table to a unique ID which can be used to match row events to table map
|
||||
* events. The table map event tells us how the table is laid out and gives us
|
||||
* some meta information on the columns. */
|
||||
struct TABLE_MAP
|
||||
{
|
||||
~TABLE_MAP()
|
||||
TABLE_MAP(const std::string& db, const std::string& table, uint64_t id,
|
||||
int version, Bytes&& cols, Bytes&& nulls, Bytes&& metadata):
|
||||
database(db),
|
||||
table(table),
|
||||
id(id),
|
||||
version(version),
|
||||
column_types(cols),
|
||||
null_bitmap(nulls),
|
||||
column_metadata(metadata)
|
||||
{
|
||||
MXS_FREE(column_types);
|
||||
MXS_FREE(column_metadata);
|
||||
MXS_FREE(null_bitmap);
|
||||
MXS_FREE(database);
|
||||
MXS_FREE(table);
|
||||
}
|
||||
|
||||
uint64_t id;
|
||||
uint64_t columns;
|
||||
uint16_t flags;
|
||||
uint8_t* column_types;
|
||||
uint8_t* null_bitmap;
|
||||
uint8_t* column_metadata;
|
||||
size_t column_metadata_size;
|
||||
TABLE_CREATE* table_create; /*< The definition of the table */
|
||||
int version;
|
||||
char version_string[TABLE_MAP_VERSION_DIGITS + 1];
|
||||
char* table;
|
||||
char* database;
|
||||
uint64_t columns() const
|
||||
{
|
||||
return column_types.size();
|
||||
}
|
||||
|
||||
std::string database;
|
||||
std::string table;
|
||||
uint64_t id;
|
||||
int version;
|
||||
Bytes column_types;
|
||||
Bytes null_bitmap;
|
||||
Bytes column_metadata;
|
||||
};
|
||||
|
||||
struct AVRO_TABLE
|
||||
@ -363,8 +368,8 @@ extern void avro_close_binlog(int fd);
|
||||
extern avro_binlog_end_t avro_read_all_events(Avro *router);
|
||||
extern AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema,
|
||||
const char *codec, size_t block_size);
|
||||
extern char* json_new_schema_from_table(TABLE_MAP *map);
|
||||
extern void save_avro_schema(const char *path, const char* schema, TABLE_MAP *map);
|
||||
extern char* json_new_schema_from_table(const STableMap& map, const STableCreate& create);
|
||||
extern void save_avro_schema(const char *path, const char* schema, STableMap& map, STableCreate& create);
|
||||
extern bool handle_table_map_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr);
|
||||
extern bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr);
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user