MXS-1881: Rename event processing related objects
The main objects involved in replication event processing now use camel case.
This commit is contained in:
@ -44,7 +44,7 @@ bool is_create_table_statement(Avro *router, char* ptr, size_t len);
|
|||||||
void avro_notify_client(AvroSession *client);
|
void avro_notify_client(AvroSession *client);
|
||||||
void avro_update_index(Avro* router);
|
void avro_update_index(Avro* router);
|
||||||
void update_used_tables(Avro* router);
|
void update_used_tables(Avro* router);
|
||||||
TABLE_CREATE* table_create_from_schema(const char* file, const char* db,
|
TableCreateEvent* table_create_from_schema(const char* file, const char* db,
|
||||||
const char* table, int version);
|
const char* table, int version);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -99,7 +99,7 @@ void avro_close_binlog(int fd)
|
|||||||
* @param filepath Path to the created file
|
* @param filepath Path to the created file
|
||||||
* @param json_schema The schema of the table in JSON format
|
* @param json_schema The schema of the table in JSON format
|
||||||
*/
|
*/
|
||||||
AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema, const char *codec,
|
AvroTable* avro_table_alloc(const char* filepath, const char* json_schema, const char *codec,
|
||||||
size_t block_size)
|
size_t block_size)
|
||||||
{
|
{
|
||||||
avro_file_writer_t avro_file;
|
avro_file_writer_t avro_file;
|
||||||
@ -141,7 +141,7 @@ AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema, cons
|
|||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
AVRO_TABLE* table = new (std::nothrow) AVRO_TABLE(avro_file, avro_writer_iface, avro_schema);
|
AvroTable* table = new (std::nothrow) AvroTable(avro_file, avro_writer_iface, avro_schema);
|
||||||
|
|
||||||
if (!table)
|
if (!table)
|
||||||
{
|
{
|
||||||
@ -759,7 +759,7 @@ void avro_load_metadata_from_schemas(Avro *router)
|
|||||||
|
|
||||||
if (it == router->created_tables.end() || version > it->second->version)
|
if (it == router->created_tables.end() || version > it->second->version)
|
||||||
{
|
{
|
||||||
STableCreate created(table_create_from_schema(files.gl_pathv[i],
|
STableCreateEvent created(table_create_from_schema(files.gl_pathv[i],
|
||||||
db, table, version));
|
db, table, version));
|
||||||
router->created_tables[table_ident] = created;
|
router->created_tables[table_ident] = created;
|
||||||
}
|
}
|
||||||
@ -883,7 +883,7 @@ bool is_alter_table_statement(Avro *router, char* ptr, size_t len)
|
|||||||
* @param created Created table
|
* @param created Created table
|
||||||
* @return False if an error occurred and true if successful
|
* @return False if an error occurred and true if successful
|
||||||
*/
|
*/
|
||||||
bool save_and_replace_table_create(Avro *router, TABLE_CREATE *created)
|
bool save_and_replace_table_create(Avro *router, TableCreateEvent *created)
|
||||||
{
|
{
|
||||||
std::string table_ident = created->database + "." + created->table;
|
std::string table_ident = created->database + "." + created->table;
|
||||||
auto it = router->created_tables.find(table_ident);
|
auto it = router->created_tables.find(table_ident);
|
||||||
@ -899,7 +899,7 @@ bool save_and_replace_table_create(Avro *router, TABLE_CREATE *created)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
router->created_tables[table_ident] = STableCreate(created);
|
router->created_tables[table_ident] = STableCreateEvent(created);
|
||||||
ss_dassert(created->columns.size() > 0);
|
ss_dassert(created->columns.size() > 0);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -1001,7 +1001,7 @@ void handle_query_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
|
|||||||
|
|
||||||
if (is_create_table_statement(router, sql, len))
|
if (is_create_table_statement(router, sql, len))
|
||||||
{
|
{
|
||||||
TABLE_CREATE *created = NULL;
|
TableCreateEvent *created = NULL;
|
||||||
|
|
||||||
if (is_create_like_statement(sql, len))
|
if (is_create_like_statement(sql, len))
|
||||||
{
|
{
|
||||||
|
|||||||
@ -30,7 +30,7 @@ static bool warn_bit = false; /**< Remove when support for BIT is added */
|
|||||||
static bool warn_large_enumset = false; /**< Remove when support for ENUM/SET values
|
static bool warn_large_enumset = false; /**< Remove when support for ENUM/SET values
|
||||||
* larger than 255 is added */
|
* larger than 255 is added */
|
||||||
|
|
||||||
uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create,
|
uint8_t* process_row_event_data(TableMapEvent *map, TableCreateEvent *create,
|
||||||
avro_value_t *record, uint8_t *ptr,
|
avro_value_t *record, uint8_t *ptr,
|
||||||
uint8_t *columns_present, uint8_t *end);
|
uint8_t *columns_present, uint8_t *end);
|
||||||
void notify_all_clients(Avro *router);
|
void notify_all_clients(Avro *router);
|
||||||
@ -106,7 +106,7 @@ bool handle_table_map_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
|
|||||||
{
|
{
|
||||||
ss_dassert(create->second->columns.size() > 0);
|
ss_dassert(create->second->columns.size() > 0);
|
||||||
auto it = router->table_maps.find(table_ident);
|
auto it = router->table_maps.find(table_ident);
|
||||||
STableMap map(table_map_alloc(ptr, ev_len, create->second.get()));
|
STableMapEvent map(table_map_alloc(ptr, ev_len, create->second.get()));
|
||||||
|
|
||||||
if (it != router->table_maps.end())
|
if (it != router->table_maps.end())
|
||||||
{
|
{
|
||||||
@ -292,7 +292,7 @@ bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
|
|||||||
|
|
||||||
if (it != router->active_maps.end())
|
if (it != router->active_maps.end())
|
||||||
{
|
{
|
||||||
TABLE_MAP* map = it->second.get();
|
TableMapEvent* map = it->second.get();
|
||||||
char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2];
|
char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2];
|
||||||
snprintf(table_ident, sizeof(table_ident), "%s.%s", map->database.c_str(), map->table.c_str());
|
snprintf(table_ident, sizeof(table_ident), "%s.%s", map->database.c_str(), map->table.c_str());
|
||||||
SAvroTable table;
|
SAvroTable table;
|
||||||
@ -556,7 +556,7 @@ static bool all_fields_null(uint8_t* null_bitmap, int ncolumns)
|
|||||||
* this row event. Currently this should be a bitfield which has all bits set.
|
* this row event. Currently this should be a bitfield which has all bits set.
|
||||||
* @return Pointer to the first byte after the current row event
|
* @return Pointer to the first byte after the current row event
|
||||||
*/
|
*/
|
||||||
uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value_t *record,
|
uint8_t* process_row_event_data(TableMapEvent *map, TableCreateEvent *create, avro_value_t *record,
|
||||||
uint8_t *ptr, uint8_t *columns_present, uint8_t *end)
|
uint8_t *ptr, uint8_t *columns_present, uint8_t *end)
|
||||||
{
|
{
|
||||||
int npresent = 0;
|
int npresent = 0;
|
||||||
|
|||||||
@ -82,7 +82,7 @@ static const char* column_type_to_avro_type(uint8_t type)
|
|||||||
* @param create The TABLE_CREATE for this table
|
* @param create The TABLE_CREATE for this table
|
||||||
* @return New schema or NULL if an error occurred
|
* @return New schema or NULL if an error occurred
|
||||||
*/
|
*/
|
||||||
char* json_new_schema_from_table(const STableMap& map, const STableCreate& create)
|
char* json_new_schema_from_table(const STableMapEvent& map, const STableCreateEvent& create)
|
||||||
{
|
{
|
||||||
if (map->version != create->version)
|
if (map->version != create->version)
|
||||||
{
|
{
|
||||||
@ -249,7 +249,7 @@ bool json_extract_field_names(const char* filename, std::vector<Column>& columns
|
|||||||
* @param schema Schema in JSON format
|
* @param schema Schema in JSON format
|
||||||
* @param map Table map that @p schema represents
|
* @param map Table map that @p schema represents
|
||||||
*/
|
*/
|
||||||
void save_avro_schema(const char *path, const char* schema, STableMap& map, STableCreate& create)
|
void save_avro_schema(const char *path, const char* schema, STableMapEvent& map, STableCreateEvent& create)
|
||||||
{
|
{
|
||||||
char filepath[PATH_MAX];
|
char filepath[PATH_MAX];
|
||||||
snprintf(filepath, sizeof(filepath), "%s/%s.%s.%06d.avsc", path,
|
snprintf(filepath, sizeof(filepath), "%s/%s.%s.%06d.avsc", path,
|
||||||
@ -610,15 +610,15 @@ static void process_column_definition(const char *nameptr, std::vector<Column>&
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
TABLE_CREATE* table_create_from_schema(const char* file, const char* db,
|
TableCreateEvent* table_create_from_schema(const char* file, const char* db,
|
||||||
const char* table, int version)
|
const char* table, int version)
|
||||||
{
|
{
|
||||||
TABLE_CREATE* newtable = NULL;
|
TableCreateEvent* newtable = NULL;
|
||||||
std::vector<Column> columns;
|
std::vector<Column> columns;
|
||||||
|
|
||||||
if (json_extract_field_names(file, columns))
|
if (json_extract_field_names(file, columns))
|
||||||
{
|
{
|
||||||
newtable = new (std::nothrow)TABLE_CREATE(db, table, version, columns);
|
newtable = new (std::nothrow)TableCreateEvent(db, table, version, columns);
|
||||||
}
|
}
|
||||||
|
|
||||||
return newtable;
|
return newtable;
|
||||||
@ -648,7 +648,7 @@ int resolve_table_version(const char* db, const char* table)
|
|||||||
*
|
*
|
||||||
* @return New CREATE_TABLE object or NULL if an error occurred
|
* @return New CREATE_TABLE object or NULL if an error occurred
|
||||||
*/
|
*/
|
||||||
TABLE_CREATE* table_create_alloc(char* ident, const char* sql, int len)
|
TableCreateEvent* table_create_alloc(char* ident, const char* sql, int len)
|
||||||
{
|
{
|
||||||
/** Extract the table definition so we can get the column names from it */
|
/** Extract the table definition so we can get the column names from it */
|
||||||
int stmt_len = 0;
|
int stmt_len = 0;
|
||||||
@ -667,12 +667,12 @@ TABLE_CREATE* table_create_alloc(char* ident, const char* sql, int len)
|
|||||||
std::vector<Column> columns;
|
std::vector<Column> columns;
|
||||||
process_column_definition(statement_sql, columns);
|
process_column_definition(statement_sql, columns);
|
||||||
|
|
||||||
TABLE_CREATE *rval = NULL;
|
TableCreateEvent *rval = NULL;
|
||||||
|
|
||||||
if (!columns.empty())
|
if (!columns.empty())
|
||||||
{
|
{
|
||||||
int version = resolve_table_version(database, table);
|
int version = resolve_table_version(database, table);
|
||||||
rval = new (std::nothrow) TABLE_CREATE(database, table, version, columns);
|
rval = new (std::nothrow) TableCreateEvent(database, table, version, columns);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -864,9 +864,9 @@ static bool extract_create_like_identifier(const char* sql, size_t len, char* ta
|
|||||||
/**
|
/**
|
||||||
* Create a table from another table
|
* Create a table from another table
|
||||||
*/
|
*/
|
||||||
TABLE_CREATE* table_create_copy(Avro *router, const char* sql, size_t len, const char* db)
|
TableCreateEvent* table_create_copy(Avro *router, const char* sql, size_t len, const char* db)
|
||||||
{
|
{
|
||||||
TABLE_CREATE* rval = NULL;
|
TableCreateEvent* rval = NULL;
|
||||||
char target[MYSQL_TABLE_MAXLEN + 1] = "";
|
char target[MYSQL_TABLE_MAXLEN + 1] = "";
|
||||||
char source[MYSQL_TABLE_MAXLEN + 1] = "";
|
char source[MYSQL_TABLE_MAXLEN + 1] = "";
|
||||||
|
|
||||||
@ -886,7 +886,7 @@ TABLE_CREATE* table_create_copy(Avro *router, const char* sql, size_t len, const
|
|||||||
|
|
||||||
if (it != router->created_tables.end())
|
if (it != router->created_tables.end())
|
||||||
{
|
{
|
||||||
rval = new (std::nothrow) TABLE_CREATE(*it->second);
|
rval = new (std::nothrow) TableCreateEvent(*it->second);
|
||||||
char* table = strchr(target, '.');
|
char* table = strchr(target, '.');
|
||||||
table = table ? table + 1 : target;
|
table = table ? table + 1 : target;
|
||||||
rval->table = table;
|
rval->table = table;
|
||||||
@ -1265,7 +1265,7 @@ static bool not_column_operation(const char* tok, int len)
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end)
|
bool table_create_alter(TableCreateEvent *create, const char *sql, const char *end)
|
||||||
{
|
{
|
||||||
const char *tbl = strcasestr(sql, "table"), *def;
|
const char *tbl = strcasestr(sql, "table"), *def;
|
||||||
|
|
||||||
@ -1435,7 +1435,7 @@ void read_table_info(uint8_t *ptr, uint8_t post_header_len, uint64_t *tbl_id, ch
|
|||||||
* @param post_header_len Length of the event specific header, 8 or 6 bytes
|
* @param post_header_len Length of the event specific header, 8 or 6 bytes
|
||||||
* @return New TABLE_MAP or NULL if memory allocation failed
|
* @return New TABLE_MAP or NULL if memory allocation failed
|
||||||
*/
|
*/
|
||||||
TABLE_MAP *table_map_alloc(uint8_t *ptr, uint8_t hdr_len, TABLE_CREATE* create)
|
TableMapEvent *table_map_alloc(uint8_t *ptr, uint8_t hdr_len, TableCreateEvent* create)
|
||||||
{
|
{
|
||||||
uint64_t table_id = 0;
|
uint64_t table_id = 0;
|
||||||
size_t id_size = hdr_len == 6 ? 4 : 6;
|
size_t id_size = hdr_len == 6 ? 4 : 6;
|
||||||
@ -1475,6 +1475,6 @@ TABLE_MAP *table_map_alloc(uint8_t *ptr, uint8_t hdr_len, TABLE_CREATE* create)
|
|||||||
Bytes cols(column_types, column_types + column_count);
|
Bytes cols(column_types, column_types + column_count);
|
||||||
Bytes nulls(nullmap, nullmap + nullmap_size);
|
Bytes nulls(nullmap, nullmap + nullmap_size);
|
||||||
Bytes meta(metadata, metadata + metadata_size);
|
Bytes meta(metadata, metadata + metadata_size);
|
||||||
return new (std::nothrow)TABLE_MAP(schema_name, table_name, table_id, create->version,
|
return new (std::nothrow)TableMapEvent(schema_name, table_name, table_id, create->version,
|
||||||
std::move(cols), std::move(nulls), std::move(meta));
|
std::move(cols), std::move(nulls), std::move(meta));
|
||||||
}
|
}
|
||||||
|
|||||||
@ -137,9 +137,9 @@ struct Column
|
|||||||
};
|
};
|
||||||
|
|
||||||
/** A CREATE TABLE abstraction */
|
/** A CREATE TABLE abstraction */
|
||||||
struct TABLE_CREATE
|
struct TableCreateEvent
|
||||||
{
|
{
|
||||||
TABLE_CREATE(std::string db, std::string table, int version, std::vector<Column>& cols):
|
TableCreateEvent(std::string db, std::string table, int version, std::vector<Column>& cols):
|
||||||
table(table),
|
table(table),
|
||||||
database(db),
|
database(db),
|
||||||
version(version),
|
version(version),
|
||||||
@ -162,9 +162,9 @@ typedef std::vector<uint8_t> Bytes;
|
|||||||
* maps a table to a unique ID which can be used to match row events to table map
|
* maps a table to a unique ID which can be used to match row events to table map
|
||||||
* events. The table map event tells us how the table is laid out and gives us
|
* events. The table map event tells us how the table is laid out and gives us
|
||||||
* some meta information on the columns. */
|
* some meta information on the columns. */
|
||||||
struct TABLE_MAP
|
struct TableMapEvent
|
||||||
{
|
{
|
||||||
TABLE_MAP(const std::string& db, const std::string& table, uint64_t id,
|
TableMapEvent(const std::string& db, const std::string& table, uint64_t id,
|
||||||
int version, Bytes&& cols, Bytes&& nulls, Bytes&& metadata):
|
int version, Bytes&& cols, Bytes&& nulls, Bytes&& metadata):
|
||||||
database(db),
|
database(db),
|
||||||
table(table),
|
table(table),
|
||||||
@ -190,16 +190,16 @@ struct TABLE_MAP
|
|||||||
Bytes column_metadata;
|
Bytes column_metadata;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct AVRO_TABLE
|
struct AvroTable
|
||||||
{
|
{
|
||||||
AVRO_TABLE(avro_file_writer_t file, avro_value_iface_t* iface, avro_schema_t schema):
|
AvroTable(avro_file_writer_t file, avro_value_iface_t* iface, avro_schema_t schema):
|
||||||
avro_file(file),
|
avro_file(file),
|
||||||
avro_writer_iface(iface),
|
avro_writer_iface(iface),
|
||||||
avro_schema(schema)
|
avro_schema(schema)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
~AVRO_TABLE()
|
~AvroTable()
|
||||||
{
|
{
|
||||||
avro_file_writer_flush(avro_file);
|
avro_file_writer_flush(avro_file);
|
||||||
avro_file_writer_close(avro_file);
|
avro_file_writer_close(avro_file);
|
||||||
@ -248,14 +248,14 @@ struct gtid_pos_t
|
|||||||
* rebuild GTID events in the correct order. */
|
* rebuild GTID events in the correct order. */
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef std::tr1::shared_ptr<TABLE_CREATE> STableCreate;
|
typedef std::tr1::shared_ptr<TableCreateEvent> STableCreateEvent;
|
||||||
typedef std::tr1::shared_ptr<AVRO_TABLE> SAvroTable;
|
typedef std::tr1::shared_ptr<AvroTable> SAvroTable;
|
||||||
typedef std::tr1::shared_ptr<TABLE_MAP> STableMap;
|
typedef std::tr1::shared_ptr<TableMapEvent> STableMapEvent;
|
||||||
|
|
||||||
typedef std::tr1::unordered_map<std::string, STableCreate> CreatedTables;
|
typedef std::tr1::unordered_map<std::string, STableCreateEvent> CreatedTables;
|
||||||
typedef std::tr1::unordered_map<std::string, SAvroTable> AvroTables;
|
typedef std::tr1::unordered_map<std::string, SAvroTable> AvroTables;
|
||||||
typedef std::tr1::unordered_map<std::string, STableMap> MappedTables;
|
typedef std::tr1::unordered_map<std::string, STableMapEvent> MappedTables;
|
||||||
typedef std::tr1::unordered_map<uint64_t, STableMap> ActiveMaps;
|
typedef std::tr1::unordered_map<uint64_t, STableMapEvent> ActiveMaps;
|
||||||
|
|
||||||
class Avro: public MXS_ROUTER
|
class Avro: public MXS_ROUTER
|
||||||
{
|
{
|
||||||
@ -358,21 +358,21 @@ private:
|
|||||||
|
|
||||||
extern void read_table_info(uint8_t *ptr, uint8_t post_header_len, uint64_t *table_id,
|
extern void read_table_info(uint8_t *ptr, uint8_t post_header_len, uint64_t *table_id,
|
||||||
char* dest, size_t len);
|
char* dest, size_t len);
|
||||||
extern TABLE_MAP *table_map_alloc(uint8_t *ptr, uint8_t hdr_len, TABLE_CREATE* create);
|
extern TableMapEvent *table_map_alloc(uint8_t *ptr, uint8_t hdr_len, TableCreateEvent* create);
|
||||||
extern TABLE_CREATE* table_create_alloc(char* ident, const char* sql, int len);
|
extern TableCreateEvent* table_create_alloc(char* ident, const char* sql, int len);
|
||||||
extern TABLE_CREATE* table_create_copy(Avro *router, const char* sql, size_t len, const char* db);
|
extern TableCreateEvent* table_create_copy(Avro *router, const char* sql, size_t len, const char* db);
|
||||||
extern bool table_create_save(TABLE_CREATE *create, const char *filename);
|
extern bool table_create_save(TableCreateEvent *create, const char *filename);
|
||||||
extern bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end);
|
extern bool table_create_alter(TableCreateEvent *create, const char *sql, const char *end);
|
||||||
extern void read_table_identifier(const char* db, const char *sql, const char *end, char *dest, int size);
|
extern void read_table_identifier(const char* db, const char *sql, const char *end, char *dest, int size);
|
||||||
extern int avro_client_handle_request(Avro *, AvroSession *, GWBUF *);
|
extern int avro_client_handle_request(Avro *, AvroSession *, GWBUF *);
|
||||||
extern void avro_client_rotate(Avro *router, AvroSession *client, uint8_t *ptr);
|
extern void avro_client_rotate(Avro *router, AvroSession *client, uint8_t *ptr);
|
||||||
extern bool avro_open_binlog(const char *binlogdir, const char *file, int *fd);
|
extern bool avro_open_binlog(const char *binlogdir, const char *file, int *fd);
|
||||||
extern void avro_close_binlog(int fd);
|
extern void avro_close_binlog(int fd);
|
||||||
extern avro_binlog_end_t avro_read_all_events(Avro *router);
|
extern avro_binlog_end_t avro_read_all_events(Avro *router);
|
||||||
extern AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema,
|
extern AvroTable* avro_table_alloc(const char* filepath, const char* json_schema,
|
||||||
const char *codec, size_t block_size);
|
const char *codec, size_t block_size);
|
||||||
extern char* json_new_schema_from_table(const STableMap& map, const STableCreate& create);
|
extern char* json_new_schema_from_table(const STableMapEvent& map, const STableCreateEvent& create);
|
||||||
extern void save_avro_schema(const char *path, const char* schema, STableMap& map, STableCreate& create);
|
extern void save_avro_schema(const char *path, const char* schema, STableMapEvent& map, STableCreateEvent& create);
|
||||||
extern bool handle_table_map_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr);
|
extern bool handle_table_map_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr);
|
||||||
extern bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr);
|
extern bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr);
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user