
The code that handles the Avro files is now fully abstracted behind the AvroConverter class, which implements the RowEventHandler interface. The code still has some Avro-specific behavior in a few places (parsing of JSON files into TableCreate objects). This can be replaced, if needed, by querying the master server for the CREATE TABLE statements.
1347 lines
31 KiB
C++
1347 lines
31 KiB
C++
/*
|
|
* Copyright (c) 2016 MariaDB Corporation Ab
|
|
*
|
|
* Use of this software is governed by the Business Source License included
|
|
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
|
*
|
|
* Change Date: 2020-01-01
|
|
*
|
|
* On the date above, in accordance with the Business Source License, use
|
|
* of this software will be governed by version 2 or later of the General
|
|
* Public License.
|
|
*/
|
|
|
|
/**
|
|
* @file avro_schema.c - Avro schema related functions
|
|
*/
|
|
|
|
#include "avrorouter.hh"

#include <ctype.h>
#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <string.h>
#include <strings.h>
#include <sys/stat.h>
#include <unistd.h>

#include <jansson.h>
#include <maxscale/alloc.h>
#include <maxscale/debug.h>
#include <maxscale/log_manager.h>
#include <maxscale/mysql_utils.h>
|
|
|
|
/**
|
|
* @brief Check whether the field is one that was generated by the avrorouter
|
|
*
|
|
* @param name Name of the field in the Avro schema
|
|
* @return True if field was not generated by the avrorouter
|
|
*/
|
|
static inline bool not_generated_field(const char* name)
|
|
{
|
|
return strcmp(name, avro_domain) && strcmp(name, avro_server_id) &&
|
|
strcmp(name, avro_sequence) && strcmp(name, avro_event_number) &&
|
|
strcmp(name, avro_event_type) && strcmp(name, avro_timestamp);
|
|
}
|
|
|
|
/**
|
|
* @brief Extract the field names from a JSON Avro schema file
|
|
*
|
|
* This function extracts the names of the columns from the JSON format Avro
|
|
* schema in the file @c filename. This function assumes that the field definitions
|
|
* in @c filename are in the same order as they are in the CREATE TABLE statement.
|
|
*
|
|
* @param filename The Avro schema in JSON format
|
|
* @param table The TABLE_CREATE object to populate
|
|
* @return True on success successfully, false on error
|
|
*/
|
|
bool json_extract_field_names(const char* filename, std::vector<Column>& columns)
|
|
{
|
|
bool rval = false;
|
|
json_error_t err;
|
|
err.text[0] = '\0';
|
|
json_t *obj, *arr;
|
|
|
|
if ((obj = json_load_file(filename, 0, &err)) && (arr = json_object_get(obj, "fields")))
|
|
{
|
|
if (json_is_array(arr))
|
|
{
|
|
int array_size = json_array_size(arr);
|
|
rval = true;
|
|
|
|
for (int i = 0; i < array_size; i++)
|
|
{
|
|
json_t* val = json_array_get(arr, i);
|
|
|
|
if (json_is_object(val))
|
|
{
|
|
|
|
json_t *name = json_object_get(val, "name");
|
|
|
|
if (name && json_is_string(name))
|
|
{
|
|
const char *name_str = json_string_value(name);
|
|
|
|
if (not_generated_field(name_str))
|
|
{
|
|
columns.emplace_back(name_str);
|
|
|
|
json_t* value;
|
|
|
|
if ((value = json_object_get(val, "real_type")) && json_is_string(value))
|
|
{
|
|
columns.back().type = json_string_value(value);
|
|
}
|
|
else
|
|
{
|
|
MXS_WARNING("No \"real_type\" value defined. Treating as unknown type field.");
|
|
}
|
|
|
|
if ((value = json_object_get(val, "length")) && json_is_integer(value))
|
|
{
|
|
columns.back().length = json_integer_value(value);
|
|
}
|
|
else
|
|
{
|
|
MXS_WARNING("No \"length\" value defined. Treating as default length field.");
|
|
}
|
|
}
|
|
}
|
|
else
|
|
{
|
|
MXS_ERROR("JSON value for \"name\" was not a string in "
|
|
"file '%s'.", filename);
|
|
rval = false;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
MXS_ERROR("JSON value for \"fields\" was not an array of objects in "
|
|
"file '%s'.", filename);
|
|
rval = false;
|
|
}
|
|
}
|
|
}
|
|
else
|
|
{
|
|
MXS_ERROR("JSON value for \"fields\" was not an array in file '%s'.", filename);
|
|
}
|
|
json_decref(obj);
|
|
}
|
|
else
|
|
{
|
|
MXS_ERROR("Failed to load JSON from file '%s': %s", filename,
|
|
obj && !arr ? "No 'fields' value in object." : err.text);
|
|
}
|
|
|
|
return rval;
|
|
}
|
|
|
|
/**
 * Extract the table definition from a CREATE TABLE statement
 *
 * The definition is the text between the first opening parenthesis in the
 * statement and the closing parenthesis that matches it. Parentheses are
 * counted without regard to quoting.
 *
 * @param sql  The SQL statement
 * @param len  Length of the statement
 * @param size Set to the length of the definition when one is found; left
 *             untouched otherwise
 * @return Pointer to the start of the definition or NULL if the query is
 *         malformed.
 */
static const char* get_table_definition(const char *sql, int len, int* size)
{
    const char* const end = sql + len;
    const char* open = sql;

    while (open < end && *open != '(')
    {
        open++;
    }

    /** We assume at least the parentheses are in the statement */
    if (open >= end - 2)
    {
        return NULL;
    }

    // The definition starts right after the first parenthesis
    const char* const start = open + 1;
    int depth = 0;

    for (const char* cur = start; cur < end; cur++)
    {
        if (*cur == '(')
        {
            depth++;
        }
        else if (*cur == ')' && --depth < 0)
        {
            /** We found the last closing parenthesis */
            *size = cur - start;
            return start;
        }
    }

    return NULL;
}
|
|
|
|
/**
 * Extract the table name from a CREATE TABLE statement
 *
 * The name is the last run of characters before the first opening parenthesis
 * that contains no backtick, period or whitespace. Backticks and whitespace
 * directly before the parenthesis are skipped.
 *
 * @param sql  SQL statement, must be null-terminated
 * @param dest Destination where the table name is extracted. Must be at least
 *             MYSQL_TABLE_MAXLEN bytes long. No size is passed in, so the
 *             copy itself is not bounded.
 * @return True if extraction was successful, false if the statement has no
 *         opening parenthesis or nothing that could be a name precedes it
 */
static bool get_table_name(const char* sql, char* dest)
{
    const char* paren = strchr(sql, '(');

    if (paren == NULL)
    {
        return false;
    }

    // `end` is one past the last character of the name. The scans look at
    // end[-1] / start[-1] so that no pointer ever moves before `sql`.
    const char* end = paren;

    while (end > sql && (end[-1] == '`' || isspace((unsigned char)end[-1])))
    {
        end--;
    }

    const char* start = end;

    while (start > sql && start[-1] != '`' && start[-1] != '.' &&
           !isspace((unsigned char)start[-1]))
    {
        start--;
    }

    size_t len = end - start;

    if (len == 0)
    {
        // Only reachable when the scan ran into the start of the statement
        return false;
    }

    memcpy(dest, start, len);
    dest[len] = '\0';

    return true;
}
|
|
|
|
/**
 * Extract the database name from a CREATE TABLE statement
 *
 * Scans backwards from the first opening parenthesis: over the table name,
 * over any backticks and whitespace in front of it and, if a period is found
 * there, over the identifier that precedes the period.
 *
 * @param sql  SQL statement, must be null-terminated
 * @param dest Destination where the database name is extracted. Must be at least
 *             MYSQL_DATABASE_MAXLEN bytes long. No size is passed in, so the
 *             copy itself is not bounded.
 *
 * @return True if a database name was extracted
 */
static bool get_database_name(const char* sql, char* dest)
{
    const char* pos = strchr(sql, '(');

    if (pos == NULL)
    {
        return false;
    }

    // `pos` always points one past the character being inspected so that it
    // never has to move before the start of the statement.

    // Backticks and whitespace between the table name and the parenthesis
    while (pos > sql && (pos[-1] == '`' || isspace((unsigned char)pos[-1])))
    {
        pos--;
    }

    // The table name itself
    while (pos > sql && pos[-1] != '`' && pos[-1] != '.' && !isspace((unsigned char)pos[-1]))
    {
        pos--;
    }

    // Opening backtick or whitespace in front of the table name
    while (pos > sql && (pos[-1] == '`' || isspace((unsigned char)pos[-1])))
    {
        pos--;
    }

    if (pos == sql || pos[-1] != '.')
    {
        // No period before the table name: no explicit database
        return false;
    }

    // The query defines an explicit database
    while (pos > sql && (pos[-1] == '`' || pos[-1] == '.' || isspace((unsigned char)pos[-1])))
    {
        pos--;
    }

    const char* end = pos;

    while (pos > sql && pos[-1] != '`' && pos[-1] != '.' && !isspace((unsigned char)pos[-1]))
    {
        pos--;
    }

    size_t len = end - pos;
    memcpy(dest, pos, len);
    dest[len] = '\0';

    return true;
}
|
|
|
|
/**
 * Replace every character that is not alphanumeric or an underscore with an
 * underscore
 *
 * The string is modified in place and keeps its length.
 *
 * @param ptr Null-terminated string to convert
 */
void make_valid_avro_identifier(char* ptr)
{
    for (; *ptr != '\0'; ptr++)
    {
        // The cast keeps bytes above 0x7f (e.g. UTF-8 sequences) within the
        // range of values that the <ctype.h> functions accept.
        if (!isalnum((unsigned char)*ptr) && *ptr != '_')
        {
            *ptr = '_';
        }
    }
}
|
|
|
|
/**
 * Find the start of the next field definition
 *
 * Scans forward to the first comma that is neither inside parentheses nor
 * inside a single- or double-quoted string.
 *
 * @param ptr Null-terminated text starting inside the current definition
 * @return Pointer to the character after the separating comma or to the
 *         terminating null character if there is no such comma
 */
const char* next_field_definition(const char* ptr)
{
    int depth = 0;
    char quote = '\0';  // Quote character of the open string, NUL when outside of one

    for (; *ptr != '\0'; ptr++)
    {
        const char c = *ptr;

        if (quote != '\0')
        {
            // Inside a string only the matching quote character matters
            if (c == quote)
            {
                quote = '\0';
            }
            continue;
        }

        switch (c)
        {
        case '(':
            depth++;
            break;

        case ')':
            depth--;
            break;

        case '"':
        case '\'':
            quote = c;
            break;

        case ',':
            if (depth == 0)
            {
                return ptr + 1;
            }
            break;

        default:
            break;
        }
    }

    return ptr;
}
|
|
|
|
static const char *extract_field_name(const char* ptr, char* dest, size_t size)
|
|
{
|
|
bool bt = false;
|
|
|
|
while (*ptr && (isspace(*ptr) || (bt = *ptr == '`')))
|
|
{
|
|
ptr++;
|
|
if (bt)
|
|
{
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (!bt)
|
|
{
|
|
if (strncasecmp(ptr, "constraint", 10) == 0 || strncasecmp(ptr, "index", 5) == 0 ||
|
|
strncasecmp(ptr, "key", 3) == 0 || strncasecmp(ptr, "fulltext", 8) == 0 ||
|
|
strncasecmp(ptr, "spatial", 7) == 0 || strncasecmp(ptr, "foreign", 7) == 0 ||
|
|
strncasecmp(ptr, "unique", 6) == 0 || strncasecmp(ptr, "primary", 7) == 0)
|
|
{
|
|
// Found a keyword
|
|
return NULL;
|
|
}
|
|
}
|
|
|
|
const char *start = ptr;
|
|
|
|
if (!bt)
|
|
{
|
|
while (*ptr && !isspace(*ptr))
|
|
{
|
|
ptr++;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
while (*ptr && *ptr != '`')
|
|
{
|
|
ptr++;
|
|
}
|
|
}
|
|
|
|
if (ptr > start)
|
|
{
|
|
/** Valid identifier */
|
|
size_t bytes = ptr - start;
|
|
|
|
memcpy(dest, start, bytes);
|
|
dest[bytes] = '\0';
|
|
|
|
make_valid_avro_identifier(dest);
|
|
ss_dassert(strlen(dest) > 0);
|
|
}
|
|
else
|
|
{
|
|
ptr = NULL;
|
|
}
|
|
|
|
return ptr;
|
|
}
|
|
|
|
/**
 * Extract the type name and the length of a field from its definition
 *
 * Leading whitespace and backticks are skipped (a closing backtick is where
 * a quoted field name ends), then the following run of alphabetic characters
 * is stored in lowercase as the type. If the type is followed, optionally
 * after whitespace, by a parenthesized integer such as `(255)`, that integer
 * is the length.
 *
 * @param ptr  Null-terminated text positioned right after the field name
 * @param dest Destination for the null-terminated lowercase type name. No
 *             size is passed in, so the buffer must be large enough for the
 *             longest run of letters in the input.
 * @return The length of the field or -1 if no single integer length is
 *         defined, e.g. for `INT`, `DECIMAL(10,2)` or `ENUM('a')`
 */
int extract_type_length(const char* ptr, char *dest)
{
    /** Skip any leading whitespace */
    while (*ptr && (isspace((unsigned char)*ptr) || *ptr == '`'))
    {
        ptr++;
    }

    /** Store the type: everything up to the first non-alphabetic character */
    for (; isalpha((unsigned char)*ptr); ptr++)
    {
        *dest++ = (char)tolower((unsigned char)*ptr);
    }

    *dest = '\0';

    /** Skip whitespace */
    while (isspace((unsigned char)*ptr))
    {
        ptr++;
    }

    int rval = -1;  // No length defined

    /** Start of length definition */
    if (*ptr == '(')
    {
        char* end;
        long val = strtol(ptr + 1, &end, 10);

        // Values that do not fit into the return type are treated as if no
        // length was defined.
        if (*end == ')' && val >= INT_MIN && val <= INT_MAX)
        {
            rval = (int)val;
        }
    }

    return rval;
}
|
|
|
|
/**
 * Estimate the number of columns from the commas in a definition
 *
 * @param ptr Null-terminated text
 * @return The number of commas in the text plus two
 */
int count_columns(const char* ptr)
{
    int total = 2;

    for (const char* comma = strchr(ptr, ','); comma != NULL; comma = strchr(comma + 1, ','))
    {
        total++;
    }

    return total;
}
|
|
|
|
/**
|
|
* Process a table definition into an array of column names
|
|
* @param nameptr table definition
|
|
* @return Number of processed columns or -1 on error
|
|
*/
|
|
static void process_column_definition(const char *nameptr, std::vector<Column>& columns)
|
|
{
|
|
char colname[512];
|
|
|
|
while ((nameptr = extract_field_name(nameptr, colname, sizeof(colname))))
|
|
{
|
|
char type[100] = "";
|
|
int len = extract_type_length(nameptr, type);
|
|
nameptr = next_field_definition(nameptr);
|
|
fix_reserved_word(colname);
|
|
columns.emplace_back(colname, type, len);
|
|
}
|
|
}
|
|
|
|
/**
 * Create a table definition from a JSON Avro schema file
 *
 * @param file    Path to the schema file
 * @param db      Database name
 * @param table   Table name
 * @param version Version of the table definition
 * @return New TableCreateEvent or NULL if the columns could not be read from
 *         the file or if the allocation failed
 */
TableCreateEvent* table_create_from_schema(const char* file, const char* db,
                                           const char* table, int version)
{
    std::vector<Column> columns;

    if (!json_extract_field_names(file, columns))
    {
        return NULL;
    }

    return new (std::nothrow) TableCreateEvent(db, table, version, std::move(columns));
}
|
|
|
|
/**
 * Find the first unused schema version of a table
 *
 * Starting from version 1, the file `<db>.<table>.<version>.avsc` (version
 * zero-padded to six digits) is looked up with access(). The path has no
 * directory component.
 *
 * @param db    Database name
 * @param table Table name
 * @return The lowest version, starting from 1, for which no such file exists
 */
int resolve_table_version(const char* db, const char* table)
{
    char buf[PATH_MAX + 1];
    int version = 1;

    for (;; version++)
    {
        snprintf(buf, sizeof(buf), "%s.%s.%06d.avsc", db, table, version);

        if (access(buf, F_OK) != 0)
        {
            break;
        }
    }

    return version;
}
|
|
|
|
/**
|
|
* @brief Handle a query event which contains a CREATE TABLE statement
|
|
*
|
|
* @param ident Table identifier in database.table format
|
|
* @param sql The CREATE TABLE statement
|
|
* @param len Length of @c sql
|
|
*
|
|
* @return New CREATE_TABLE object or NULL if an error occurred
|
|
*/
|
|
TableCreateEvent* table_create_alloc(char* ident, const char* sql, int len)
|
|
{
|
|
/** Extract the table definition so we can get the column names from it */
|
|
int stmt_len = 0;
|
|
const char* statement_sql = get_table_definition(sql, len, &stmt_len);
|
|
ss_dassert(statement_sql);
|
|
|
|
char* tbl_start = strchr(ident, '.');
|
|
ss_dassert(tbl_start);
|
|
*tbl_start++ = '\0';
|
|
|
|
char table[MYSQL_TABLE_MAXLEN + 1];
|
|
char database[MYSQL_DATABASE_MAXLEN + 1];
|
|
strcpy(database, ident);
|
|
strcpy(table, tbl_start);
|
|
|
|
std::vector<Column> columns;
|
|
process_column_definition(statement_sql, columns);
|
|
|
|
TableCreateEvent *rval = NULL;
|
|
|
|
if (!columns.empty())
|
|
{
|
|
int version = resolve_table_version(database, table);
|
|
rval = new (std::nothrow) TableCreateEvent(database, table, version, std::move(columns));
|
|
}
|
|
else
|
|
{
|
|
MXS_ERROR("No columns in a CREATE TABLE statement: %.*s", stmt_len, statement_sql);
|
|
}
|
|
|
|
return rval;
|
|
}
|
|
|
|
/**
 * NULL-terminated token groups used when the start of a CREATE TABLE
 * statement is consumed.
 */

/** The CREATE keyword */
static const char* TOK_CREATE[] = {"CREATE", NULL};

/** The TABLE keyword */
static const char* TOK_TABLE[] = {"TABLE", NULL};

/** The tokens of OR REPLACE */
static const char* TOK_GROUP_REPLACE[] = {"OR", "REPLACE", NULL};

/** The tokens of IF NOT EXISTS */
static const char* TOK_GROUP_EXISTS[] = {"IF", "NOT", "EXISTS", NULL};
|
|
|
|
/**
 * Read one token (i.e. SQL keyword)
 *
 * Leading whitespace is skipped and the following run of non-whitespace
 * characters is copied to @c dest as a null-terminated string. An empty
 * string is stored if only whitespace is left.
 *
 * @param ptr  Where to start reading
 * @param end  One past the last readable character
 * @param dest Destination for the token; must have room for every character
 *             between @c ptr and @c end plus the terminator
 * @return Pointer to the first character after the token
 */
static const char* get_token(const char* ptr, const char* end, char* dest)
{
    while (ptr < end && isspace((unsigned char)*ptr))
    {
        ptr++;
    }

    const char* start = ptr;

    while (ptr < end && !isspace((unsigned char)*ptr))
    {
        ptr++;
    }

    size_t len = ptr - start;
    memcpy(dest, start, len);
    dest[len] = '\0';

    return ptr;
}
|
|
|
|
/**
|
|
* Consume one token
|
|
*/
|
|
static bool chomp_one_token(const char* expected, const char** ptr, const char* end, char* buf)
|
|
{
|
|
bool rval = false;
|
|
const char* next = get_token(*ptr, end, buf);
|
|
|
|
if (strcasecmp(buf, expected) == 0)
|
|
{
|
|
rval = true;
|
|
*ptr = next;
|
|
}
|
|
|
|
return rval;
|
|
}
|
|
|
|
/**
|
|
* Consume all tokens in a group
|
|
*/
|
|
static bool chomp_tokens(const char** tokens, const char** ptr, const char* end, char* buf)
|
|
{
|
|
bool next = true;
|
|
bool rval = false;
|
|
|
|
do
|
|
{
|
|
next = false;
|
|
|
|
for (int i = 0; tokens[i]; i++)
|
|
{
|
|
if (chomp_one_token(tokens[i], ptr, end, buf))
|
|
{
|
|
rval = true;
|
|
next = true;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
while (next);
|
|
|
|
return rval;
|
|
}
|
|
|
|
/**
 * Remove any extra characters from a string
 *
 * Backticks and parentheses are stripped in place from the end and then from
 * the start of the string. Neither scan goes past the other end, so at least
 * one character of a non-empty string is always kept.
 *
 * @param str Null-terminated string to modify
 */
static void remove_extras(char* str)
{
    size_t len = strlen(str);

    if (len == 0)
    {
        // Nothing to strip; also avoids pointing before the start of the buffer
        return;
    }

    char* end = str + len - 1;

    while (end > str && (*end == '`' || *end == ')' || *end == '('))
    {
        *end-- = '\0';
    }

    char* start = str;

    while (start < end && (*start == '`' || *start == ')' || *start == '('))
    {
        start++;
    }

    len = strlen(start);

    // The regions overlap when leading characters were skipped
    memmove(str, start, len);
    str[len] = '\0';
}
|
|
|
|
static void remove_backticks(char* src)
|
|
{
|
|
char* dest = src;
|
|
|
|
while (*src)
|
|
{
|
|
if (*src != '`')
|
|
{
|
|
// Non-backtick character, keep it
|
|
*dest = *src;
|
|
dest++;
|
|
}
|
|
|
|
src++;
|
|
}
|
|
|
|
ss_dassert(dest == src || (*dest != '\0' && dest < src));
|
|
*dest = '\0';
|
|
}
|
|
|
|
/**
|
|
* Extract both tables from a `CREATE TABLE t1 LIKE t2` statement
|
|
*/
|
|
static bool extract_create_like_identifier(const char* sql, size_t len, char* target, char* source)
|
|
{
|
|
bool rval = false;
|
|
char buffer[len + 1];
|
|
buffer[0] = '\0';
|
|
const char* ptr = sql;
|
|
const char* end = ptr + sizeof(buffer);
|
|
|
|
if (chomp_tokens(TOK_CREATE, &ptr, end, buffer))
|
|
{
|
|
chomp_tokens(TOK_GROUP_REPLACE, &ptr, end, buffer);
|
|
|
|
if (chomp_tokens(TOK_TABLE, &ptr, end, buffer))
|
|
{
|
|
chomp_tokens(TOK_GROUP_EXISTS, &ptr, end, buffer);
|
|
|
|
// Read the target table name
|
|
ptr = get_token(ptr, end, buffer);
|
|
strcpy(target, buffer);
|
|
|
|
// Skip the LIKE token
|
|
ptr = get_token(ptr, end, buffer);
|
|
|
|
// Read the source table name
|
|
ptr = get_token(ptr, end, buffer);
|
|
remove_extras(buffer);
|
|
strcpy(source, buffer);
|
|
rval = true;
|
|
}
|
|
}
|
|
|
|
return rval;
|
|
}
|
|
|
|
/**
|
|
* Create a table from another table
|
|
*/
|
|
TableCreateEvent* table_create_copy(Avro *router, const char* sql, size_t len, const char* db)
|
|
{
|
|
TableCreateEvent* rval = NULL;
|
|
char target[MYSQL_TABLE_MAXLEN + 1] = "";
|
|
char source[MYSQL_TABLE_MAXLEN + 1] = "";
|
|
|
|
if (extract_create_like_identifier(sql, len, target, source))
|
|
{
|
|
char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2] = "";
|
|
|
|
if (strchr(source, '.') == NULL)
|
|
{
|
|
strcpy(table_ident, db);
|
|
strcat(table_ident, ".");
|
|
}
|
|
|
|
strcat(table_ident, source);
|
|
|
|
auto it = router->created_tables.find(table_ident);
|
|
|
|
if (it != router->created_tables.end())
|
|
{
|
|
rval = new (std::nothrow) TableCreateEvent(*it->second);
|
|
char* table = strchr(target, '.');
|
|
table = table ? table + 1 : target;
|
|
rval->table = table;
|
|
rval->version = 1;
|
|
rval->was_used = false;
|
|
}
|
|
else
|
|
{
|
|
MXS_ERROR("Could not find table '%s' that '%s' is being created from: %.*s",
|
|
table_ident, target, (int)len, sql);
|
|
}
|
|
}
|
|
|
|
return rval;
|
|
}
|
|
|
|
/**
 * Find the start of the next definition in a comma-separated list
 *
 * A comma separates definitions only when it is outside of parentheses and
 * outside of quoted text ('...', "..." and `...`), so that e.g.
 * `DEFAULT 'a,b'` is not split.
 *
 * @param sql Where to start looking
 * @param end One past the last readable character
 * @return Pointer to the character after the separating comma or NULL if
 *         there are no more definitions
 */
static const char* get_next_def(const char* sql, const char* end)
{
    int depth = 0;
    char quote = '\0';  // Quote character of the open quoted text, NUL when outside

    for (; sql < end; sql++)
    {
        const char c = *sql;

        if (quote != '\0')
        {
            if (c == quote)
            {
                quote = '\0';
            }
        }
        else if (c == '\'' || c == '"' || c == '`')
        {
            quote = c;
        }
        else if (c == ',' && depth == 0)
        {
            return sql + 1;
        }
        else if (c == '(')
        {
            depth++;
        }
        else if (c == ')')
        {
            depth--;
        }
    }

    return NULL;
}
|
|
|
|
/**
 * Read the next whitespace-delimited token
 *
 * Leading whitespace is skipped. Whitespace inside parentheses does not end
 * the token, so `ENUM('a', 'b')` is one token.
 *
 * @param sql    Where to start reading
 * @param toklen Set to the length of the token when one is found
 * @param end    One past the last readable character
 * @return Pointer to the start of the token or NULL if there are no more
 *         tokens before @c end
 */
static const char* get_tok(const char* sql, int* toklen, const char* end)
{
    const char* start = sql;

    // Bounded by `end` so that a buffer which ends in whitespace is not
    // overrun.
    while (start < end && isspace((unsigned char)*start))
    {
        start++;
    }

    int len = 0;
    int depth = 0;

    for (; start + len < end; len++)
    {
        const char c = start[len];

        if (depth == 0 && isspace((unsigned char)c))
        {
            break;
        }
        else if (c == '(')
        {
            depth++;
        }
        else if (c == ')')
        {
            depth--;
        }
    }

    if (len == 0)
    {
        return NULL;
    }

    *toklen = len;
    return start;
}
|
|
|
|
/**
 * Move a position backwards over whitespace
 *
 * @param sql Start of the text; the position never moves before it
 * @param end In: position to start from. Out: the closest position at or
 *            before it that does not hold whitespace, or @c sql.
 */
static void rskip_whitespace(const char* sql, const char** end)
{
    const char* ptr = *end;

    while (ptr > sql && isspace((unsigned char)*ptr))
    {
        ptr--;
    }

    *end = ptr;
}
|
|
|
|
/**
 * Move a position backwards over a token
 *
 * @param sql Start of the text; the position never moves before it
 * @param end In: position to start from. Out: the closest position at or
 *            before it that holds whitespace, or @c sql.
 */
static void rskip_token(const char* sql, const char** end)
{
    const char* ptr = *end;

    while (ptr > sql && !isspace((unsigned char)*ptr))
    {
        ptr--;
    }

    *end = ptr;
}
|
|
|
|
static bool get_placement_specifier(const char* sql, const char* end, const char** tgt, int* tgt_len)
|
|
{
|
|
bool rval = false;
|
|
ss_dassert(end > sql);
|
|
end--;
|
|
|
|
*tgt = NULL;
|
|
*tgt_len = 0;
|
|
|
|
// Skip any trailing whitespace
|
|
rskip_whitespace(sql, &end);
|
|
|
|
if (*end == '`')
|
|
{
|
|
// Identifier, possibly AFTER `column`
|
|
const char* id_end = end;
|
|
end--;
|
|
|
|
while (end > sql && *end != '`')
|
|
{
|
|
end--;
|
|
}
|
|
|
|
const char* id_start = end + 1;
|
|
ss_dassert(*end == '`' && *id_end == '`');
|
|
|
|
end--;
|
|
|
|
rskip_whitespace(sql, &end);
|
|
rskip_token(sql, &end);
|
|
|
|
// end points to the character _before_ the token
|
|
end++;
|
|
|
|
if (strncasecmp(end, "AFTER", 5) == 0)
|
|
{
|
|
// This column comes after the specified column
|
|
rval = true;
|
|
*tgt = id_start;
|
|
*tgt_len = id_end - id_start;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
// Something else, possibly FIRST or un-backtick'd AFTER
|
|
const char* id_end = end + 1; // Points to either a trailing space or one-after-the-end
|
|
rskip_token(sql, &end);
|
|
|
|
// end points to the character _before_ the token
|
|
end++;
|
|
|
|
if (strncasecmp(end, "FIRST", 5) == 0)
|
|
{
|
|
// Put this column first
|
|
rval = true;
|
|
}
|
|
else
|
|
{
|
|
const char* id_start = end + 1;
|
|
|
|
// Skip the whitespace and until the start of the current token
|
|
rskip_whitespace(sql, &end);
|
|
rskip_token(sql, &end);
|
|
|
|
// end points to the character _before_ the token
|
|
end++;
|
|
|
|
if (strncasecmp(end, "AFTER", 5) == 0)
|
|
{
|
|
// This column comes after the specified column
|
|
rval = true;
|
|
*tgt = id_start;
|
|
*tgt_len = id_end - id_start;
|
|
}
|
|
}
|
|
}
|
|
|
|
return rval;
|
|
}
|
|
|
|
/**
 * Compare the start of two strings without regard to case
 *
 * At most @c len characters are compared and the comparison stops at a null
 * terminator, so neither string is read past its end. Note that this is a
 * prefix comparison: `tok_eq("col", "column", 3)` is true.
 *
 * @param a   First string
 * @param b   Second string
 * @param len Maximum number of characters to compare
 * @return True if the compared characters are equal
 */
static bool tok_eq(const char *a, const char *b, size_t len)
{
    return strncasecmp(a, b, len) == 0;
}
|
|
|
|
/**
 * Advance a position over whitespace
 *
 * @param saved In: position in a null-terminated string. Out: the first
 *              position at or after it that does not hold whitespace.
 */
static void skip_whitespace(const char** saved)
{
    const char* ptr = *saved;

    // isspace() is false for the terminator, so the scan stops there
    while (isspace((unsigned char)*ptr))
    {
        ptr++;
    }

    *saved = ptr;
}
|
|
|
|
/**
 * Advance a position over a token
 *
 * A token ends at whitespace, an opening parenthesis, a period or the end of
 * the string.
 *
 * @param saved In: position in a null-terminated string. Out: the position of
 *              the character that ended the token.
 */
static void skip_token(const char** saved)
{
    const char* ptr = *saved;

    while (*ptr && !isspace((unsigned char)*ptr) && *ptr != '(' && *ptr != '.')
    {
        ptr++;
    }

    *saved = ptr;
}
|
|
|
|
/**
 * Advance a position to the next backtick
 *
 * @param saved In: position in a null-terminated string. Out: the position of
 *              the next backtick or of the terminator if there is none.
 */
static void skip_non_backtick(const char** saved)
{
    const char* cur;

    for (cur = *saved; *cur != '\0'; cur++)
    {
        if (*cur == '`')
        {
            break;
        }
    }

    *saved = cur;
}
|
|
|
|
/** Keywords that can precede the table name in a DDL statement */
const char* keywords[] =
{
    "CREATE",
    "DROP",
    "ALTER",
    "IF",
    "EXISTS",
    "REPLACE",
    "OR",
    "TABLE",
    "NOT",
    NULL
};

/**
 * Check whether a token is one of the keywords
 *
 * The whole token has to match, ignoring case. A shorter token that is merely
 * a prefix of a keyword (a table called `t` or `no`) and the empty token are
 * not keywords.
 *
 * @param tok Start of the token
 * @param len Length of the token
 * @return True if the token is a keyword
 */
static bool token_is_keyword(const char* tok, int len)
{
    if (len <= 0)
    {
        return false;
    }

    for (int i = 0; keywords[i]; i++)
    {
        if (strlen(keywords[i]) == (size_t)len && strncasecmp(keywords[i], tok, len) == 0)
        {
            return true;
        }
    }

    return false;
}
|
|
|
|
void read_table_identifier(const char* db, const char *sql, const char *end, char *dest, int size)
|
|
{
|
|
const char* start;
|
|
int len = 0;
|
|
bool is_keyword = true;
|
|
|
|
while (is_keyword)
|
|
{
|
|
skip_whitespace(&sql); // Leading whitespace
|
|
|
|
if (*sql == '`')
|
|
{
|
|
// Quoted identifier, not a keyword
|
|
is_keyword = false;
|
|
sql++;
|
|
start = sql;
|
|
skip_non_backtick(&sql);
|
|
len = sql - start;
|
|
sql++;
|
|
}
|
|
else
|
|
{
|
|
start = sql;
|
|
skip_token(&sql);
|
|
len = sql - start;
|
|
is_keyword = token_is_keyword(start, len);
|
|
}
|
|
}
|
|
|
|
skip_whitespace(&sql); // Space after first identifier
|
|
|
|
if (*sql != '.')
|
|
{
|
|
// No explicit database
|
|
snprintf(dest, size, "%s.%.*s", db, len, start);
|
|
}
|
|
else
|
|
{
|
|
// Explicit database, skip the period
|
|
sql++;
|
|
skip_whitespace(&sql); // Space after first identifier
|
|
|
|
const char* id_start;
|
|
int id_len = 0;
|
|
|
|
if (*sql == '`')
|
|
{
|
|
sql++;
|
|
id_start = sql;
|
|
skip_non_backtick(&sql);
|
|
id_len = sql - id_start;
|
|
sql++;
|
|
}
|
|
else
|
|
{
|
|
id_start = sql;
|
|
skip_token(&sql);
|
|
id_len = sql - id_start;
|
|
}
|
|
|
|
snprintf(dest, size, "%.*s.%.*s", len, start, id_len, id_start);
|
|
}
|
|
}
|
|
|
|
void make_avro_token(char* dest, const char* src, int length)
|
|
{
|
|
while (length > 0 && (*src == '(' || *src == ')' || *src == '`' || isspace(*src)))
|
|
{
|
|
src++;
|
|
length--;
|
|
}
|
|
|
|
const char *end = src;
|
|
|
|
for (int i = 0; i < length; i++)
|
|
{
|
|
if (end[i] == '(' || end[i] == ')' || end[i] == '`' || isspace(end[i]))
|
|
{
|
|
length = i;
|
|
break;
|
|
}
|
|
}
|
|
|
|
memcpy(dest, src, length);
|
|
dest[length] = '\0';
|
|
fix_reserved_word(dest);
|
|
}
|
|
|
|
/**
 * Check whether a token starts an index or constraint definition instead of
 * naming a column
 *
 * The keyword has to be a whole word at the start of the token: `KEY`,
 * `KEY(id)` and `index` match but `key_id` and `period_start` do not.
 * Characters beyond @c len are never read.
 *
 * @param tok Start of the token
 * @param len Length of the token
 * @return True if the token is a keyword of an operation that does not
 *         affect the columns
 */
static bool not_column_operation(const char* tok, int len)
{
    static const char* const non_column_words[] =
    {
        "PRIMARY",
        "UNIQUE",
        "FULLTEXT",
        "SPATIAL",
        "PERIOD",
        "KEY",
        "KEYS",
        "INDEX",
        "FOREIGN",
        "CONSTRAINT",
        NULL
    };

    if (len <= 0)
    {
        return false;
    }

    const size_t tok_len = (size_t)len;

    for (int i = 0; non_column_words[i]; i++)
    {
        const size_t word_len = strlen(non_column_words[i]);

        if (tok_len < word_len || strncasecmp(tok, non_column_words[i], word_len) != 0)
        {
            continue;
        }

        if (tok_len == word_len)
        {
            return true;
        }

        const unsigned char next = (unsigned char)tok[word_len];

        // An identifier character right after the match means that this is a
        // longer, unquoted column name that merely starts with the keyword.
        if (!(isalnum(next) || next == '_' || next == '$' || next >= 0x80))
        {
            return true;
        }
    }

    return false;
}
|
|
|
|
/** Check whether a token of @c len characters is exactly @c word, ignoring case */
static bool alter_token_is(const char* tok, int len, const char* word)
{
    return len > 0 && (size_t)len == strlen(word) && strncasecmp(tok, word, len) == 0;
}

/**
 * Apply an ALTER TABLE statement to a table definition
 *
 * The statement is read as a list of `<operation> [COLUMN] <name> ...`
 * definitions. ADD appends a column unless one with that name already exists,
 * DROP removes the column with that name and CHANGE replaces the name, type
 * and length of the column with that name. Other operations are skipped. If
 * the token after an operation is an index or constraint keyword, processing
 * stops at once.
 *
 * If anything was changed and the definition has been used, its version is
 * incremented and it is marked unused.
 *
 * @param create The table definition to modify
 * @param sql    The ALTER TABLE statement, must be null-terminated
 * @param end    One past the last character of the statement
 * @return Always true
 */
bool table_create_alter(TableCreateEvent *create, const char *sql, const char *end)
{
    const char* tbl = strcasestr(sql, "table");
    const char* def = tbl ? strchr(tbl, ' ') : NULL;

    if (def == NULL)
    {
        // Nothing follows the TABLE keyword (or there is none)
        return true;
    }

    int len = 0;
    const char* tok = get_tok(def, &len, end);

    if (tok)
    {
        MXS_INFO("Alter table '%.*s'; %.*s\n", len, tok, (int)(end - sql), sql);
    }

    int updates = 0;

    while (tok && (tok = get_tok(tok + len, &len, end)))
    {
        // The operation, e.g. ADD
        const char* ptok = tok;
        int plen = len;

        // The token after the operation
        tok = get_tok(tok + len, &len, end);

        if (tok == NULL)
        {
            break;
        }

        if (not_column_operation(tok, len))
        {
            MXS_INFO("Statement doesn't affect columns, not processing: %s", sql);
            return true;
        }

        if (alter_token_is(tok, len, "column"))
        {
            // Skip the optional COLUMN keyword
            tok = get_tok(tok + len, &len, end);

            if (tok == NULL)
            {
                break;
            }
        }

        char avro_token[len + 1];
        make_avro_token(avro_token, tok, len);

        if (alter_token_is(ptok, plen, "add"))
        {
            bool is_new = true;

            for (const auto& column : create->columns)
            {
                if (column.name == avro_token)
                {
                    is_new = false;
                    break;
                }
            }

            if (is_new)
            {
                char field_type[200] = "";  // Enough to hold all types
                int field_length = extract_type_length(tok + len, field_type);
                create->columns.emplace_back(std::string(avro_token),
                                             std::string(field_type),
                                             field_length);
                updates++;
            }

            tok = get_next_def(tok, end);
            len = 0;
        }
        else if (alter_token_is(ptok, plen, "drop"))
        {
            for (auto it = create->columns.begin(); it != create->columns.end(); it++)
            {
                if (it->name == avro_token)
                {
                    create->columns.erase(it);
                    break;
                }
            }

            updates++;

            tok = get_next_def(tok, end);
            len = 0;
        }
        else if (alter_token_is(ptok, plen, "change"))
        {
            for (auto it = create->columns.begin(); it != create->columns.end(); it++)
            {
                if (it->name == avro_token)
                {
                    // The new name follows the old one
                    int new_len = 0;
                    const char* new_tok = get_tok(tok + len, &new_len, end);

                    if (new_tok)
                    {
                        char new_name[new_len + 1];
                        make_avro_token(new_name, new_tok, new_len);
                        char field_type[200] = "";  // Enough to hold all types
                        int field_length = extract_type_length(new_tok + new_len, field_type);
                        it->name = new_name;
                        it->type = field_type;
                        it->length = field_length;
                        updates++;
                    }

                    break;
                }
            }

            tok = get_next_def(tok, end);
            len = 0;
        }
    }

    /** Only increment the create version if it has an associated .avro
     * file. The .avro file is only created if it is actually used. */
    if (updates > 0 && create->was_used)
    {
        create->version++;
        create->was_used = false;
    }

    return true;
}
|
|
|
|
/**
 * @brief Read the fully qualified name of the table
 *
 * The payload is read as: the table ID (4 bytes if the post-header is 6 bytes
 * long, otherwise 6 bytes, least significant byte first), two flag bytes, a
 * one-byte schema name length, the schema name and a terminating byte, a
 * one-byte table name length and the table name.
 *
 * @param ptr             Pointer to the start of the event payload
 * @param post_header_len Length of the event specific header, 8 or 6 bytes
 * @param tbl_id          Destination where the table ID is stored
 * @param dest            Destination where the name is stored in
 *                        schema.table format
 * @param len             Size of destination
 */
void read_table_info(uint8_t *ptr, uint8_t post_header_len, uint64_t *tbl_id, char* dest, size_t len)
{
    const size_t id_size = post_header_len == 6 ? 4 : 6;
    uint64_t table_id = 0;

    // Decode byte by byte so that the result does not depend on the byte
    // order of the host.
    for (size_t i = 0; i < id_size; i++)
    {
        table_id |= (uint64_t)ptr[i] << (8 * i);
    }

    ptr += id_size;

    // The two flag bytes are not used
    ptr += 2;

    uint8_t schema_name_len = *ptr++;
    char schema_name[schema_name_len + 1];

    // Terminate explicitly instead of relying on the byte after the name
    memcpy(schema_name, ptr, schema_name_len);
    schema_name[schema_name_len] = '\0';
    ptr += schema_name_len + 1;

    uint8_t table_name_len = *ptr++;
    char table_name[table_name_len + 1];

    memcpy(table_name, ptr, table_name_len);
    table_name[table_name_len] = '\0';

    snprintf(dest, len, "%s.%s", schema_name, table_name);
    *tbl_id = table_id;
}
|
|
|
|
/**
|
|
* @brief Extract a table map from a table map event
|
|
*
|
|
* This assumes that the complete event minus the replication header is stored
|
|
* at @p ptr
|
|
* @param ptr Pointer to the start of the event payload
|
|
* @param post_header_len Length of the event specific header, 8 or 6 bytes
|
|
* @return New TABLE_MAP or NULL if memory allocation failed
|
|
*/
|
|
TableMapEvent *table_map_alloc(uint8_t *ptr, uint8_t hdr_len, TableCreateEvent* create)
|
|
{
|
|
uint64_t table_id = 0;
|
|
size_t id_size = hdr_len == 6 ? 4 : 6;
|
|
memcpy(&table_id, ptr, id_size);
|
|
ptr += id_size;
|
|
|
|
uint16_t flags = 0;
|
|
memcpy(&flags, ptr, 2);
|
|
ptr += 2;
|
|
|
|
uint8_t schema_name_len = *ptr++;
|
|
char schema_name[schema_name_len + 2];
|
|
|
|
/** Copy the NULL byte after the schema name */
|
|
memcpy(schema_name, ptr, schema_name_len + 1);
|
|
ptr += schema_name_len + 1;
|
|
|
|
uint8_t table_name_len = *ptr++;
|
|
char table_name[table_name_len + 2];
|
|
|
|
/** Copy the NULL byte after the table name */
|
|
memcpy(table_name, ptr, table_name_len + 1);
|
|
ptr += table_name_len + 1;
|
|
|
|
uint64_t column_count = mxs_leint_value(ptr);
|
|
ptr += mxs_leint_bytes(ptr);
|
|
|
|
/** Column types */
|
|
uint8_t *column_types = ptr;
|
|
ptr += column_count;
|
|
|
|
size_t metadata_size = 0;
|
|
uint8_t* metadata = (uint8_t*)mxs_lestr_consume(&ptr, &metadata_size);
|
|
uint8_t *nullmap = ptr;
|
|
size_t nullmap_size = (column_count + 7) / 8;
|
|
|
|
Bytes cols(column_types, column_types + column_count);
|
|
Bytes nulls(nullmap, nullmap + nullmap_size);
|
|
Bytes meta(metadata, metadata + metadata_size);
|
|
return new (std::nothrow)TableMapEvent(schema_name, table_name, table_id, create->version,
|
|
std::move(cols), std::move(nulls), std::move(meta));
|
|
}
|