Merge branch '2.1-oracle-compat' into develop-new-merge-oracle

This commit is contained in:
Johan Wikman
2017-06-28 22:30:16 +02:00
88 changed files with 15138 additions and 745 deletions

View File

@ -911,6 +911,15 @@ bool is_create_table_statement(AVRO_INSTANCE *router, char* ptr, size_t len)
return rc > 0;
}
bool is_create_like_statement(const char* ptr, size_t len)
{
char sql[len + 1];
memcpy(sql, ptr, len);
sql[len] = '\0';
// This is not pretty but it should work
return strcasestr(sql, " like ") || strcasestr(sql, "(like ");
}
/**
* @brief Detection of table alteration statements
@ -1020,7 +1029,16 @@ void handle_query_event(AVRO_INSTANCE *router, REP_HEADER *hdr, int *pending_tra
if (is_create_table_statement(router, sql, len))
{
TABLE_CREATE *created = table_create_alloc(sql, db);
TABLE_CREATE *created = NULL;
if (is_create_like_statement(sql, len))
{
created = table_create_copy(router, sql, len, db);
}
else
{
created = table_create_alloc(sql, db);
}
if (created && !save_and_replace_table_create(router, created))
{
@ -1053,7 +1071,6 @@ void handle_query_event(AVRO_INSTANCE *router, REP_HEADER *hdr, int *pending_tra
strcat(full_ident, ident);
TABLE_CREATE *created = hashtable_fetch(router->created_tables, full_ident);
ss_dassert(created);
if (created)
{

View File

@ -131,13 +131,13 @@ bool handle_table_map_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr
if (old)
{
router->active_maps[old->id % sizeof(router->active_maps)] = NULL;
router->active_maps[old->id % MAX_MAPPED_TABLES] = NULL;
}
hashtable_delete(router->table_maps, table_ident);
hashtable_add(router->table_maps, (void*) table_ident, map);
hashtable_add(router->open_tables, table_ident, avro_table);
save_avro_schema(router->avrodir, json_schema, map);
router->active_maps[map->id % sizeof(router->active_maps)] = map;
router->active_maps[map->id % MAX_MAPPED_TABLES] = map;
MXS_DEBUG("Table %s mapped to %lu", table_ident, map->id);
rval = true;
@ -164,10 +164,10 @@ bool handle_table_map_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr
}
else
{
ss_dassert(router->active_maps[old->id % sizeof(router->active_maps)] == old);
router->active_maps[old->id % sizeof(router->active_maps)] = NULL;
ss_dassert(router->active_maps[old->id % MAX_MAPPED_TABLES] == old);
router->active_maps[old->id % MAX_MAPPED_TABLES] = NULL;
table_map_remap(ptr, ev_len, old);
router->active_maps[old->id % sizeof(router->active_maps)] = old;
router->active_maps[old->id % MAX_MAPPED_TABLES] = old;
MXS_DEBUG("Table %s re-mapped to %lu", table_ident, old->id);
/** No changes in the schema */
rval = true;
@ -294,7 +294,7 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
/** There should always be a table map event prior to a row event.
* TODO: Make the active_maps dynamic */
TABLE_MAP *map = router->active_maps[table_id % sizeof(router->active_maps)];
TABLE_MAP *map = router->active_maps[table_id % MAX_MAPPED_TABLES];
if (map)
{

View File

@ -771,6 +771,227 @@ TABLE_CREATE* table_create_alloc(const char* sql, const char* event_db)
return rval;
}
static const char* TOK_CREATE[] =
{
"CREATE",
NULL
};
static const char* TOK_TABLE[] =
{
"TABLE",
NULL
};
static const char* TOK_GROUP_REPLACE[] =
{
"OR",
"REPLACE",
NULL
};
static const char* TOK_GROUP_EXISTS[] =
{
"IF",
"NOT",
"EXISTS",
NULL
};
/**
* Read one token (i.e. SQL keyword)
*/
static const char* get_token(const char* ptr, const char* end, char* dest)
{
while (ptr < end && isspace(*ptr))
{
ptr++;
}
const char* start = ptr;
while (ptr < end && !isspace(*ptr))
{
ptr++;
}
size_t len = ptr - start;
memcpy(dest, start, len);
dest[len] = '\0';
return ptr;
}
/**
* Consume one token
*/
static bool chomp_one_token(const char* expected, const char** ptr, const char* end, char* buf)
{
bool rval = false;
const char* next = get_token(*ptr, end, buf);
if (strcasecmp(buf, expected) == 0)
{
rval = true;
*ptr = next;
}
return rval;
}
/**
* Consume all tokens in a group
*/
static bool chomp_tokens(const char** tokens, const char** ptr, const char* end, char* buf)
{
bool next = true;
bool rval = false;
do
{
next = false;
for (int i = 0; tokens[i]; i++)
{
if (chomp_one_token(tokens[i], ptr, end, buf))
{
rval = true;
next = true;
break;
}
}
}
while (next);
return rval;
}
/**
* Remove any extra characters from a string
*/
static void remove_extras(char* str)
{
char* end = strchr(str, '\0') - 1;
while (end > str && (*end == '`' || *end == ')' || *end == '('))
{
*end-- = '\0';
}
char* start = str;
while (start < end && (*start == '`' || *start == ')' || *start == '('))
{
start++;
}
size_t len = strlen(start);
memmove(str, start, len);
str[len] = '\0';
ss_dassert(strlen(str) == len);
}
/**
* Extract both tables from a `CREATE TABLE t1 LIKE t2` statement
*/
static bool extract_create_like_identifier(const char* sql, size_t len, char* target, char* source)
{
bool rval = false;
char buffer[len + 1];
buffer[0] = '\0';
const char* ptr = sql;
const char* end = ptr + sizeof(buffer);
if (chomp_tokens(TOK_CREATE, &ptr, end, buffer))
{
chomp_tokens(TOK_GROUP_REPLACE, &ptr, end, buffer);
if (chomp_tokens(TOK_TABLE, &ptr, end, buffer))
{
chomp_tokens(TOK_GROUP_EXISTS, &ptr, end, buffer);
// Read the target table name
ptr = get_token(ptr, end, buffer);
strcpy(target, buffer);
// Skip the LIKE token
ptr = get_token(ptr, end, buffer);
// Read the source table name
ptr = get_token(ptr, end, buffer);
remove_extras(buffer);
strcpy(source, buffer);
rval = true;
}
}
return rval;
}
/**
* Create a table from another table
*/
TABLE_CREATE* table_create_copy(AVRO_INSTANCE *router, const char* sql, size_t len, const char* db)
{
TABLE_CREATE* rval = NULL;
char target[MYSQL_TABLE_MAXLEN + 1] = "";
char source[MYSQL_TABLE_MAXLEN + 1] = "";
if (extract_create_like_identifier(sql, len, target, source))
{
char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2] = "";
if (strchr(source, '.') == NULL)
{
strcpy(table_ident, db);
strcat(table_ident, ".");
}
strcat(table_ident, source);
TABLE_CREATE *old = hashtable_fetch(router->created_tables, table_ident);
if (old)
{
int n = old->columns;
char** names = MXS_MALLOC(sizeof(char*) * n);
char** types = MXS_MALLOC(sizeof(char*) * n);
int* lengths = MXS_MALLOC(sizeof(int) * n);
rval = MXS_MALLOC(sizeof(TABLE_CREATE));
MXS_ABORT_IF_FALSE(names && types && lengths && rval);
for (uint64_t i = 0; i < old->columns; i++)
{
names[i] = MXS_STRDUP_A(old->column_names[i]);
types[i] = MXS_STRDUP_A(old->column_types[i]);
lengths[i] = old->column_lengths[i];
}
rval->version = 1;
rval->was_used = false;
rval->column_names = names;
rval->column_lengths = lengths;
rval->column_types = types;
rval->columns = old->columns;
rval->database = MXS_STRDUP_A(db);
char* table = strchr(target, '.');
table = table ? table + 1 : target;
rval->table = MXS_STRDUP_A(table);
}
else
{
MXS_ERROR("Could not find table '%s' that '%s' is being created from: %.*s",
table_ident, target, (int)len, sql);
}
}
return rval;
}
/**
* Free a TABLE_CREATE structure
* @param value Value to free

View File

@ -310,6 +310,7 @@ extern void read_table_info(uint8_t *ptr, uint8_t post_header_len, uint64_t *tab
extern TABLE_MAP *table_map_alloc(uint8_t *ptr, uint8_t hdr_len, TABLE_CREATE* create);
extern void table_map_free(TABLE_MAP *map);
extern TABLE_CREATE* table_create_alloc(const char* sql, const char* db);
extern TABLE_CREATE* table_create_copy(AVRO_INSTANCE *router, const char* sql, size_t len, const char* db);
extern void table_create_free(TABLE_CREATE* value);
extern bool table_create_save(TABLE_CREATE *create, const char *filename);
extern bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end);