MXS-2264: Add support for RENAME TABLE

RENAME TABLE is now fully supported and works as expected. With the fix to
table versioning, the new table name will receive the latest version
number.
This commit is contained in:
Markus Mäkelä 2019-09-30 09:57:07 +03:00
parent 7767231c8f
commit b223e6eed3
No known key found for this signature in database
GPG Key ID: 72D48FCE664F7B19
3 changed files with 66 additions and 2 deletions

View File

@ -804,6 +804,13 @@ bool is_alter_table_statement(pcre2_code* alter_table_re, char* ptr, size_t len)
return rc > 0;
}
bool is_rename_table_statement(const char* sql)
{
int err = 0;
const char* pattern = "(?i)^\\s*rename\\s*table\\s*";
return mxs_pcre2_simple_match(pattern, sql, 0, &err) == MXS_PCRE2_MATCH;
}
/** Database name offset */
#define DBNM_OFF 8
@ -1006,7 +1013,10 @@ void Rpl::handle_query_event(REP_HEADER* hdr, uint8_t* ptr)
MXS_ERROR("Alter statement to table '%s' has no preceding create statement.", ident);
}
}
// TODO: Add COMMIT handling for non-transactional tables
else if (is_rename_table_statement(sql))
{
table_create_rename(db, sql, sql + len);
}
MXS_FREE(tmp);
}

View File

@ -934,7 +934,7 @@ static const char* get_tok(const char* sql, int* toklen, const char* end)
int depth = 0;
while (start + len < end)
{
if (isspace(start[len]) && depth == 0)
if (depth == 0 && (isspace(start[len]) || start[len] == ','))
{
*toklen = len;
return start;
@ -1429,6 +1429,59 @@ bool Rpl::table_create_alter(STableCreateEvent create, const char* sql, const ch
return true;
}
void Rpl::table_create_rename(const std::string& db, const char* sql, const char* end)
{
const char* tbl = strcasestr(sql, "table");
if (const char* def = strchr(tbl, ' '))
{
int len = 0;
const char* tok = def;
while (tok && (tok = get_tok(tok + len, &len, end)))
{
char old_name[len + 1];
make_avro_token(old_name, tok, len);
// Skip over the TO keyword
tok = get_tok(tok + len, &len, end);
tok = get_tok(tok + len, &len, end);
char new_name[len + 1];
make_avro_token(new_name, tok, len);
std::string from = strchr(old_name, '.') ? old_name : db + "." + old_name;
auto it = m_created_tables.find(from);
if (it != m_created_tables.end())
{
auto& create = it->second;
if (char* p = strchr(new_name, '.'))
{
*p++ = '\0';
create->database = new_name;
create->table = p;
}
else
{
create->database = db;
create->table = new_name;
}
MXS_INFO("Renamed '%s' to '%s'", from.c_str(), create->id().c_str());
create->version = ++m_versions[create->id()];
create->was_used = false;
rename_table_create(create, from);
}
tok = get_next_def(tok, end);
len = 0;
}
}
}
bool Rpl::table_matches(const std::string& ident)
{
bool rval = false;

View File

@ -294,5 +294,6 @@ private:
bool save_and_replace_table_create(STableCreateEvent created);
bool rename_table_create(STableCreateEvent created, const std::string& old_id);
bool table_create_alter(STableCreateEvent create, const char* sql, const char* end);
void table_create_rename(const std::string& db, const char* sql, const char* end);
bool table_matches(const std::string& ident);
};