Merge branch '2.1' into 2.2

This commit is contained in:
Markus Mäkelä
2017-12-12 13:23:02 +02:00
97 changed files with 5945 additions and 361 deletions

View File

@ -657,6 +657,13 @@ avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router)
snprintf(next_file, sizeof(next_file), BINLOG_NAMEFMT, router->fileroot,
blr_file_get_next_binlogname(router->binlog_name));
}
else if (hdr.event_type == MARIADB_ANNOTATE_ROWS_EVENT)
{
MXS_INFO("Annotate_rows_event: %.*s", hdr.event_size - BINLOG_EVENT_HDR_LEN, ptr);
pos += original_size;
router->current_pos = pos;
continue;
}
else if (hdr.event_type == TABLE_MAP_EVENT)
{
handle_table_map_event(router, &hdr, ptr);
@ -956,6 +963,8 @@ bool save_and_replace_table_create(AVRO_INSTANCE *router, TABLE_CREATE *created)
{
if (strcmp(key, table_ident) == 0)
{
TABLE_MAP* map = hashtable_fetch(router->table_maps, key);
router->active_maps[map->id % MAX_MAPPED_TABLES] = NULL;
hashtable_delete(router->table_maps, key);
}
}
@ -1000,13 +1009,13 @@ void handle_query_event(AVRO_INSTANCE *router, REP_HEADER *hdr, int *pending_tra
memcpy(db, (char*) ptr + PHDR_OFF + vblklen, dblen);
db[dblen] = 0;
unify_whitespace(sql, len);
size_t sqlsz = len, tmpsz = len;
char *tmp = MXS_MALLOC(len);
MXS_ABORT_IF_NULL(tmp);
remove_mysql_comments((const char**)&sql, &sqlsz, &tmp, &tmpsz);
sql = tmp;
len = tmpsz;
unify_whitespace(sql, len);
if (is_create_table_statement(router, sql, len))
{

View File

@ -104,72 +104,53 @@ bool handle_table_map_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr
{
ss_dassert(create->columns > 0);
TABLE_MAP *old = hashtable_fetch(router->table_maps, table_ident);
TABLE_MAP *map = table_map_alloc(ptr, ev_len, create);
MXS_ABORT_IF_NULL(map); // Fatal error at this point
char* json_schema = json_new_schema_from_table(map);
if (old == NULL || old->version != create->version)
if (json_schema)
{
TABLE_MAP *map = table_map_alloc(ptr, ev_len, create);
char filepath[PATH_MAX + 1];
snprintf(filepath, sizeof(filepath), "%s/%s.%06d.avro",
router->avrodir, table_ident, map->version);
if (map)
/** Close the file and open a new one */
hashtable_delete(router->open_tables, table_ident);
AVRO_TABLE *avro_table = avro_table_alloc(filepath, json_schema,
codec_to_string(router->codec),
router->block_size);
if (avro_table)
{
char* json_schema = json_new_schema_from_table(map);
bool notify = old != NULL;
if (json_schema)
if (old)
{
char filepath[PATH_MAX + 1];
snprintf(filepath, sizeof(filepath), "%s/%s.%06d.avro",
router->avrodir, table_ident, map->version);
/** Close the file and open a new one */
hashtable_delete(router->open_tables, table_ident);
AVRO_TABLE *avro_table = avro_table_alloc(filepath, json_schema,
codec_to_string(router->codec),
router->block_size);
if (avro_table)
{
bool notify = old != NULL;
if (old)
{
router->active_maps[old->id % MAX_MAPPED_TABLES] = NULL;
}
hashtable_delete(router->table_maps, table_ident);
hashtable_add(router->table_maps, (void*) table_ident, map);
hashtable_add(router->open_tables, table_ident, avro_table);
save_avro_schema(router->avrodir, json_schema, map);
router->active_maps[map->id % MAX_MAPPED_TABLES] = map;
MXS_DEBUG("Table %s mapped to %lu", table_ident, map->id);
rval = true;
hashtable_add(router->table_maps, (void*)table_ident, map);
hashtable_add(router->open_tables, table_ident, avro_table);
save_avro_schema(router->avrodir, json_schema, map);
router->active_maps[map->id % MAX_MAPPED_TABLES] = map;
ss_dassert(router->active_maps[id % MAX_MAPPED_TABLES] == map);
MXS_DEBUG("Table %s mapped to %lu", table_ident, map->id);
rval = true;
if (notify)
{
notify_all_clients(router);
}
}
else
{
MXS_ERROR("Failed to open new Avro file for writing.");
}
MXS_FREE(json_schema);
}
else
if (notify)
{
MXS_ERROR("Failed to create JSON schema.");
notify_all_clients(router);
}
}
else
{
MXS_ERROR("Failed to allocate new table map.");
MXS_ERROR("Failed to open new Avro file for writing.");
}
MXS_FREE(json_schema);
}
else
{
router->active_maps[old->id % MAX_MAPPED_TABLES] = NULL;
table_map_remap(ptr, ev_len, old);
router->active_maps[old->id % MAX_MAPPED_TABLES] = old;
MXS_DEBUG("Table %s re-mapped to %lu", table_ident, old->id);
/** No changes in the schema */
rval = true;
MXS_ERROR("Failed to create JSON schema.");
}
}
else
@ -363,8 +344,9 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
}
else
{
MXS_ERROR("Row event and table map event have different column counts."
" Only full row image is currently supported.");
MXS_ERROR("Row event and table map event have different column "
"counts for table %s.%s, only full row image is currently "
"supported.", map->database, map->table);
}
}
else
@ -606,7 +588,6 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
}
MXS_INFO("[%ld] CHAR: field: %d bytes, data: %d bytes", i, field_length, bytes);
ss_dassert(bytes || *ptr == '\0');
char str[bytes + 1];
memcpy(str, ptr, bytes);
str[bytes] = '\0';

View File

@ -321,11 +321,11 @@ void save_avro_schema(const char *path, const char* schema, TABLE_MAP *map)
* @return Pointer to the start of the definition of NULL if the query is
* malformed.
*/
static const char* get_table_definition(const char *sql, int* size)
static const char* get_table_definition(const char *sql, int len, int* size)
{
const char *rval = NULL;
const char *ptr = sql;
const char *end = strchr(sql, '\0');
const char *end = sql + len;
while (ptr < end && *ptr != '(')
{
ptr++;
@ -403,10 +403,12 @@ static bool get_table_name(const char* sql, char* dest)
/**
* Extract the database name from a CREATE TABLE statement
*
* @param sql SQL statement
* @param dest Destination where the database name is extracted. Must be at least
* MYSQL_DATABASE_MAXLEN bytes long.
* @return True if extraction was successful
* MYSQL_DATABASE_MAXLEN bytes long.
*
* @return True if a database name was extracted
*/
static bool get_database_name(const char* sql, char* dest)
{
@ -426,22 +428,27 @@ static bool get_database_name(const char* sql, char* dest)
ptr--;
}
while (*ptr == '`' || *ptr == '.' || isspace(*ptr))
if (*ptr == '.')
{
ptr--;
// The query defines an explicit database
while (*ptr == '`' || *ptr == '.' || isspace(*ptr))
{
ptr--;
}
const char* end = ptr + 1;
while (*ptr != '`' && *ptr != '.' && !isspace(*ptr))
{
ptr--;
}
ptr++;
memcpy(dest, ptr, end - ptr);
dest[end - ptr] = '\0';
rval = true;
}
const char* end = ptr + 1;
while (*ptr != '`' && *ptr != '.' && !isspace(*ptr))
{
ptr--;
}
ptr++;
memcpy(dest, ptr, end - ptr);
dest[end - ptr] = '\0';
rval = true;
}
return rval;
@ -512,12 +519,16 @@ static const char *extract_field_name(const char* ptr, char* dest, size_t size)
}
}
if (strncasecmp(ptr, "constraint", 10) == 0 || strncasecmp(ptr, "index", 5) == 0 ||
strncasecmp(ptr, "key", 3) == 0 || strncasecmp(ptr, "fulltext", 8) == 0 ||
strncasecmp(ptr, "spatial", 7) == 0 || strncasecmp(ptr, "foreign", 7) == 0 ||
strncasecmp(ptr, "unique", 6) == 0 || strncasecmp(ptr, "primary", 7) == 0)
if (!bt)
{
return NULL;
if (strncasecmp(ptr, "constraint", 10) == 0 || strncasecmp(ptr, "index", 5) == 0 ||
strncasecmp(ptr, "key", 3) == 0 || strncasecmp(ptr, "fulltext", 8) == 0 ||
strncasecmp(ptr, "spatial", 7) == 0 || strncasecmp(ptr, "foreign", 7) == 0 ||
strncasecmp(ptr, "unique", 6) == 0 || strncasecmp(ptr, "primary", 7) == 0)
{
// Found a keyword
return NULL;
}
}
const char *start = ptr;
@ -694,35 +705,42 @@ TABLE_CREATE* table_create_from_schema(const char* file, const char* db,
* @param db Database where this query was executed
* @return New CREATE_TABLE object or NULL if an error occurred
*/
TABLE_CREATE* table_create_alloc(const char* sql, int len, const char* event_db)
TABLE_CREATE* table_create_alloc(const char* sql, int len, const char* db)
{
/** Extract the table definition so we can get the column names from it */
int stmt_len = 0;
const char* statement_sql = get_table_definition(sql, &stmt_len);
const char* statement_sql = get_table_definition(sql, len, &stmt_len);
ss_dassert(statement_sql);
char table[MYSQL_TABLE_MAXLEN + 1];
char database[MYSQL_DATABASE_MAXLEN + 1];
const char *db = event_db;
const char* err = NULL;
MXS_INFO("Create table: %.*s", len, sql);
if (!get_table_name(sql, table))
if (!statement_sql)
{
MXS_ERROR("Malformed CREATE TABLE statement, could not extract table name: %s", sql);
return NULL;
err = "table definition";
}
else if (!get_table_name(sql, table))
{
err = "table name";
}
/** The CREATE statement contains the database name */
if (strlen(db) == 0)
if (get_database_name(sql, database))
{
if (!get_database_name(sql, database))
{
MXS_ERROR("Malformed CREATE TABLE statement, could not extract "
"database name: %s", sql);
return NULL;
}
// The CREATE statement contains the database name
db = database;
}
else if (*db == '\0')
{
// No explicit or current database
err = "database name";
}
if (err)
{
MXS_ERROR("Malformed CREATE TABLE statement, could not extract %s: %.*s", err, len, sql);
return NULL;
}
int* lengths = NULL;
char **names = NULL;
@ -893,6 +911,27 @@ static void remove_extras(char* str)
ss_dassert(strlen(str) == len);
}
static void remove_backticks(char* src)
{
char* dest = src;
while (*src)
{
if (*src != '`')
{
// Non-backtick character, keep it
*dest = *src;
dest++;
}
src++;
}
ss_dassert(dest == src || (*dest != '\0' && dest < src));
*dest = '\0';
}
/**
* Extract both tables from a `CREATE TABLE t1 LIKE t2` statement
*/
@ -1095,10 +1134,12 @@ static bool tok_eq(const char *a, const char *b, size_t len)
void read_alter_identifier(const char *sql, const char *end, char *dest, int size)
{
int len = 0;
const char *tok = get_tok(sql, &len, end);
if (tok && (tok = get_tok(tok + len, &len, end)) && (tok = get_tok(tok + len, &len, end)))
const char *tok = get_tok(sql, &len, end); // ALTER
if (tok && (tok = get_tok(tok + len, &len, end)) // TABLE
&& (tok = get_tok(tok + len, &len, end))) // Table identifier
{
snprintf(dest, size, "%.*s", len, tok);
remove_backticks(dest);
}
}
@ -1174,20 +1215,33 @@ bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end)
if (tok_eq(ptok, "add", plen) && tok_eq(tok, "column", len))
{
tok = get_tok(tok + len, &len, end);
create->column_names = MXS_REALLOC(create->column_names, sizeof(char*) * (create->columns + 1));
create->column_types = MXS_REALLOC(create->column_types, sizeof(char*) * (create->columns + 1));
create->column_lengths = MXS_REALLOC(create->column_lengths, sizeof(int) * (create->columns + 1));
char avro_token[len + 1];
make_avro_token(avro_token, tok, len);
char field_type[200] = ""; // Enough to hold all types
int field_length = extract_type_length(tok + len, field_type);
create->column_names[create->columns] = MXS_STRDUP_A(avro_token);
create->column_types[create->columns] = MXS_STRDUP_A(field_type);
create->column_lengths[create->columns] = field_length;
create->columns++;
updates++;
bool is_new = true;
for (uint64_t i = 0; i < create->columns; i++)
{
if (strcmp(avro_token, create->column_names[i]) == 0)
{
is_new = false;
break;
}
}
if (is_new)
{
create->column_names = MXS_REALLOC(create->column_names, sizeof(char*) * (create->columns + 1));
create->column_types = MXS_REALLOC(create->column_types, sizeof(char*) * (create->columns + 1));
create->column_lengths = MXS_REALLOC(create->column_lengths, sizeof(int) * (create->columns + 1));
char field_type[200] = ""; // Enough to hold all types
int field_length = extract_type_length(tok + len, field_type);
create->column_names[create->columns] = MXS_STRDUP_A(avro_token);
create->column_types[create->columns] = MXS_STRDUP_A(field_type);
create->column_lengths[create->columns] = field_length;
create->columns++;
updates++;
}
tok = get_next_def(tok, end);
len = 0;
}
@ -1393,20 +1447,3 @@ void table_map_free(TABLE_MAP *map)
MXS_FREE(map);
}
}
/**
* @brief Map a table to a different ID
*
* This updates the table ID that the @c TABLE_MAP object is assigned with
*
* @param ptr Pointer to the start of a table map event
* @param hdr_len Post-header length
* @param map Table map to remap
*/
void table_map_remap(uint8_t *ptr, uint8_t hdr_len, TABLE_MAP *map)
{
uint64_t table_id = 0;
size_t id_size = hdr_len == 6 ? 4 : 6;
memcpy(&table_id, ptr, id_size);
map->id = table_id;
}

View File

@ -328,7 +328,6 @@ extern char* json_new_schema_from_table(TABLE_MAP *map);
extern void save_avro_schema(const char *path, const char* schema, TABLE_MAP *map);
extern bool handle_table_map_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr);
extern bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr);
extern void table_map_remap(uint8_t *ptr, uint8_t hdr_len, TABLE_MAP *map);
enum avrorouter_file_op
{

View File

@ -229,6 +229,7 @@ static void blr_start_master(void* data)
return;
}
client->session = router->session;
client->service = router->service;
/**
* 'client' is the fake DCB that emulates a client session:
@ -265,6 +266,7 @@ static void blr_start_master(void* data)
return;
}
router->master->remote = MXS_STRDUP_A(router->service->dbref->server->name);
router->master->service = router->service;
MXS_NOTICE("%s: attempting to connect to master"
" server [%s]:%d, binlog='%s', pos=%lu%s%s",