MXS-1299: Fix CREATE TABLE t1 LIKE t2 processing

The creation of tables from other tables was not working even though the
information was available inside the avrorouter.
This commit is contained in:
Markus Mäkelä 2017-06-28 11:25:57 +03:00
parent 84925b3f10
commit 40b1739249
3 changed files with 241 additions and 2 deletions

View File

@ -914,6 +914,15 @@ bool is_create_table_statement(AVRO_INSTANCE *router, char* ptr, size_t len)
return rc > 0;
}
bool is_create_like_statement(const char* ptr, size_t len)
{
char sql[len + 1];
memcpy(sql, ptr, len);
sql[len] = '\0';
// This is not pretty but it should work
return strcasestr(sql, " like ") || strcasestr(sql, "(like ");
}
/**
* @brief Detection of table alteration statements
@ -1023,7 +1032,16 @@ void handle_query_event(AVRO_INSTANCE *router, REP_HEADER *hdr, int *pending_tra
if (is_create_table_statement(router, sql, len))
{
TABLE_CREATE *created = table_create_alloc(sql, db);
TABLE_CREATE *created = NULL;
if (is_create_like_statement(sql, len))
{
created = table_create_copy(router, sql, len, db);
}
else
{
created = table_create_alloc(sql, db);
}
if (created && !save_and_replace_table_create(router, created))
{
@ -1056,7 +1074,6 @@ void handle_query_event(AVRO_INSTANCE *router, REP_HEADER *hdr, int *pending_tra
strcat(full_ident, ident);
TABLE_CREATE *created = hashtable_fetch(router->created_tables, full_ident);
ss_dassert(created);
if (created)
{

View File

@ -771,6 +771,227 @@ TABLE_CREATE* table_create_alloc(const char* sql, const char* event_db)
return rval;
}
static const char* TOK_CREATE[] =
{
"CREATE",
NULL
};
static const char* TOK_TABLE[] =
{
"TABLE",
NULL
};
static const char* TOK_GROUP_REPLACE[] =
{
"OR",
"REPLACE",
NULL
};
static const char* TOK_GROUP_EXISTS[] =
{
"IF",
"NOT",
"EXISTS",
NULL
};
/**
* Read one token (i.e. SQL keyword)
*/
static const char* get_token(const char* ptr, const char* end, char* dest)
{
while (ptr < end && isspace(*ptr))
{
ptr++;
}
const char* start = ptr;
while (ptr < end && !isspace(*ptr))
{
ptr++;
}
size_t len = ptr - start;
memcpy(dest, start, len);
dest[len] = '\0';
return ptr;
}
/**
* Consume one token
*/
static bool chomp_one_token(const char* expected, const char** ptr, const char* end, char* buf)
{
bool rval = false;
const char* next = get_token(*ptr, end, buf);
if (strcasecmp(buf, expected) == 0)
{
rval = true;
*ptr = next;
}
return rval;
}
/**
* Consume all tokens in a group
*/
static bool chomp_tokens(const char** tokens, const char** ptr, const char* end, char* buf)
{
bool next = true;
bool rval = false;
do
{
next = false;
for (int i = 0; tokens[i]; i++)
{
if (chomp_one_token(tokens[i], ptr, end, buf))
{
rval = true;
next = true;
break;
}
}
}
while (next);
return rval;
}
/**
* Remove any extra characters from a string
*/
static void remove_extras(char* str)
{
char* end = strchr(str, '\0') - 1;
while (end > str && (*end == '`' || *end == ')' || *end == '('))
{
*end-- = '\0';
}
char* start = str;
while (start < end && (*start == '`' || *start == ')' || *start == '('))
{
start++;
}
size_t len = strlen(start);
memmove(str, start, len);
str[len] = '\0';
ss_dassert(strlen(str) == len);
}
/**
* Extract both tables from a `CREATE TABLE t1 LIKE t2` statement
*/
static bool extract_create_like_identifier(const char* sql, size_t len, char* target, char* source)
{
bool rval = false;
char buffer[len + 1];
buffer[0] = '\0';
const char* ptr = sql;
const char* end = ptr + sizeof(buffer);
if (chomp_tokens(TOK_CREATE, &ptr, end, buffer))
{
chomp_tokens(TOK_GROUP_REPLACE, &ptr, end, buffer);
if (chomp_tokens(TOK_TABLE, &ptr, end, buffer))
{
chomp_tokens(TOK_GROUP_EXISTS, &ptr, end, buffer);
// Read the target table name
ptr = get_token(ptr, end, buffer);
strcpy(target, buffer);
// Skip the LIKE token
ptr = get_token(ptr, end, buffer);
// Read the source table name
ptr = get_token(ptr, end, buffer);
remove_extras(buffer);
strcpy(source, buffer);
rval = true;
}
}
return rval;
}
/**
* Create a table from another table
*/
TABLE_CREATE* table_create_copy(AVRO_INSTANCE *router, const char* sql, size_t len, const char* db)
{
TABLE_CREATE* rval = NULL;
char target[MYSQL_TABLE_MAXLEN + 1] = "";
char source[MYSQL_TABLE_MAXLEN + 1] = "";
if (extract_create_like_identifier(sql, len, target, source))
{
char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2] = "";
if (strchr(source, '.') == NULL)
{
strcpy(table_ident, db);
strcat(table_ident, ".");
}
strcat(table_ident, source);
TABLE_CREATE *old = hashtable_fetch(router->created_tables, table_ident);
if (old)
{
int n = old->columns;
char** names = MXS_MALLOC(sizeof(char*) * n);
char** types = MXS_MALLOC(sizeof(char*) * n);
int* lengths = MXS_MALLOC(sizeof(int) * n);
rval = MXS_MALLOC(sizeof(TABLE_CREATE));
MXS_ABORT_IF_FALSE(names && types && lengths && rval);
for (uint64_t i = 0; i < old->columns; i++)
{
names[i] = MXS_STRDUP_A(old->column_names[i]);
types[i] = MXS_STRDUP_A(old->column_types[i]);
lengths[i] = old->column_lengths[i];
}
rval->version = 1;
rval->was_used = false;
rval->column_names = names;
rval->column_lengths = lengths;
rval->column_types = types;
rval->columns = old->columns;
rval->database = MXS_STRDUP_A(db);
char* table = strchr(target, '.');
table = table ? table + 1 : target;
rval->table = MXS_STRDUP_A(table);
}
else
{
MXS_ERROR("Could not find table '%s' that '%s' is being created from: %.*s",
table_ident, target, (int)len, sql);
}
}
return rval;
}
/**
* Free a TABLE_CREATE structure
* @param value Value to free

View File

@ -302,6 +302,7 @@ extern void read_table_info(uint8_t *ptr, uint8_t post_header_len, uint64_t *tab
extern TABLE_MAP *table_map_alloc(uint8_t *ptr, uint8_t hdr_len, TABLE_CREATE* create);
extern void table_map_free(TABLE_MAP *map);
extern TABLE_CREATE* table_create_alloc(const char* sql, const char* db);
extern TABLE_CREATE* table_create_copy(AVRO_INSTANCE *router, const char* sql, size_t len, const char* db);
extern void table_create_free(TABLE_CREATE* value);
extern bool table_create_save(TABLE_CREATE *create, const char *filename);
extern bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end);