/*
 * Copyright (c) 2016 MariaDB Corporation Ab
 *
 * Use of this software is governed by the Business Source License included
 * in the LICENSE.TXT file and at www.mariadb.com/bsl11.
 *
 * Change Date: 2019-07-01
 *
 * On the date above, in accordance with the Business Source License, use
 * of this software will be governed by version 2 or later of the General
 * Public License.
 */

/**
 * @file avro_schema.c - Avro schema related functions
 */

#include "avrorouter.h"

/*
 * NOTE(review): the targets of the twelve #include directives that followed
 * "avrorouter.h" were not readable in the reviewed copy of this file. The
 * list below is reconstructed from the symbols this file uses; confirm it
 * against the project tree (and restore any header missing here) before
 * merging.
 */
#include <ctype.h>
#include <errno.h>
#include <limits.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <strings.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <jansson.h>
#include <maxscale/alloc.h>
#include <maxscale/debug.h>
#include <maxscale/log_manager.h>
#include <maxscale/mysql_utils.h>

/**
 * @brief Convert the MySQL column type to a compatible Avro type
 *
 * Some fields are larger than they need to be but since the Avro integer
 * compression is quite efficient, the real loss in performance is negligible.
 *
 * @param type MySQL column type
 * @return String representation of the Avro type
 */
static const char* column_type_to_avro_type(uint8_t type)
{
    switch (type)
    {
    case TABLE_COL_TYPE_TINY:
    case TABLE_COL_TYPE_SHORT:
    case TABLE_COL_TYPE_LONG:
    case TABLE_COL_TYPE_INT24:
    case TABLE_COL_TYPE_BIT:
        return "int";

    case TABLE_COL_TYPE_FLOAT:
        return "float";

    case TABLE_COL_TYPE_DOUBLE:
    case TABLE_COL_TYPE_NEWDECIMAL:
        return "double";

    case TABLE_COL_TYPE_NULL:
        return "null";

    case TABLE_COL_TYPE_LONGLONG:
        return "long";

    case TABLE_COL_TYPE_TINY_BLOB:
    case TABLE_COL_TYPE_MEDIUM_BLOB:
    case TABLE_COL_TYPE_LONG_BLOB:
    case TABLE_COL_TYPE_BLOB:
        return "bytes";

    default:
        return "string";
    }
}

/**
 * @brief Create a new JSON Avro schema from the table map and its table definition
 *
 * The schema always starts with the fields generated by the avrorouter (GTID
 * components, event number, timestamp and event type) followed by one field
 * per column of the table.
 *
 * @param map TABLE_MAP for this table; its @c table_create member provides
 *            the column names
 * @return New schema as a string allocated by json_dumps(), or NULL if an
 *         error occurred
 */
char* json_new_schema_from_table(TABLE_MAP *map)
{
    TABLE_CREATE *create = map->table_create;

    if (map->version != create->version)
    {
        MXS_ERROR("Version mismatch for table %s.%s. Table map version is %d and "
                  "the table definition version is %d.", map->database, map->table,
                  map->version, create->version);
        return NULL;
    }

    /** The loop below indexes create->column_names with the column index of
     * the map, so the definition must have at least as many columns. */
    if (map->columns > create->columns)
    {
        MXS_ERROR("Column count mismatch for table %s.%s: the table map has more "
                  "columns than the table definition.", map->database, map->table);
        return NULL;
    }

    json_error_t err;
    memset(&err, 0, sizeof(err));
    json_t *schema = json_object();
    json_object_set_new(schema, "namespace", json_string("MaxScaleChangeDataSchema.avro"));
    json_object_set_new(schema, "type", json_string("record"));
    json_object_set_new(schema, "name", json_string("ChangeRecord"));

    /** The *_new variant hands the only reference of each packed value over
     * to the array; plain json_array_append() would leak all of them. */
    json_t *array = json_array();
    json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name",
                                              avro_domain, "type", "int"));
    json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name",
                                              avro_server_id, "type", "int"));
    json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name",
                                              avro_sequence, "type", "int"));
    json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name",
                                              avro_event_number, "type", "int"));
    json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name",
                                              avro_timestamp, "type", "int"));

    /** Enums and other complex types are defined with complete JSON objects
     * instead of string values */
    json_t *event_types = json_pack_ex(&err, 0, "{s:s, s:s, s:[s,s,s,s]}", "type", "enum",
                                       "name", "EVENT_TYPES", "symbols", "insert",
                                       "update_before", "update_after", "delete");

    /** The "o" format steals the reference to event_types */
    json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:o}", "name", avro_event_type,
                                              "type", event_types));

    for (uint64_t i = 0; i < map->columns; i++)
    {
        json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name",
                                                  create->column_names[i], "type",
                                                  column_type_to_avro_type(map->column_types[i])));
    }

    json_object_set_new(schema, "fields", array);
    char* rval = json_dumps(schema, JSON_PRESERVE_ORDER);
    json_decref(schema);
    return rval;
}

/**
 * @brief Check whether the field is one that was generated by the avrorouter
 *
 * @param name Name of the field in the Avro schema
 * @return True if field was not generated by the avrorouter
 */
static inline bool not_generated_field(const char* name)
{
    return strcmp(name, avro_domain) && strcmp(name, avro_server_id) &&
           strcmp(name, avro_sequence) && strcmp(name, avro_event_number) &&
           strcmp(name, avro_event_type) && strcmp(name, avro_timestamp);
}

/**
 * @brief Extract the field names from a JSON Avro schema file
 *
 * This function extracts the names of the columns from the JSON format Avro
 * schema in the file @c filename. This function assumes that the field definitions
 * in @c filename are in the same order as they are in the CREATE TABLE statement.
 *
 * On success @c table->column_names and @c table->columns describe the
 * extracted names. If the fields could not be processed, all memory allocated
 * by this function is released and the two members are reset to NULL and 0.
 *
 * @param filename The Avro schema in JSON format
 * @param table    The TABLE_CREATE object to populate
 * @return True on success, false on error
 */
bool json_extract_field_names(const char* filename, TABLE_CREATE *table)
{
    json_error_t err;
    err.text[0] = '\0';
    json_t *obj = json_load_file(filename, 0, &err);

    if (obj == NULL)
    {
        MXS_ERROR("Failed to load JSON from file '%s': %s", filename, err.text);
        return false;
    }

    json_t *arr = json_object_get(obj, "fields");

    if (arr == NULL)
    {
        MXS_ERROR("Failed to load JSON from file '%s': %s", filename,
                  "No 'fields' value in object.");
        json_decref(obj);
        return false;
    }

    ss_dassert(json_is_array(arr));

    if (!json_is_array(arr))
    {
        MXS_ERROR("JSON value for \"fields\" was not an array in file '%s'.", filename);
        json_decref(obj);
        return false;
    }

    size_t array_size = json_array_size(arr);

    /** Always request at least one slot so that an empty "fields" array is
     * not mistaken for an allocation failure */
    char **names = MXS_MALLOC(sizeof(char*) * (array_size ? array_size : 1));

    if (names == NULL)
    {
        table->column_names = NULL;
        json_decref(obj);
        return false;
    }

    bool rval = true;
    int columns = 0;

    for (size_t i = 0; i < array_size; i++)
    {
        json_t* val = json_array_get(arr, i);

        if (json_is_object(val))
        {
            json_t *name = json_object_get(val, "name");

            if (name && json_is_string(name))
            {
                const char *name_str = json_string_value(name);

                /** Only the fields that map to table columns are stored */
                if (not_generated_field(name_str))
                {
                    names[columns++] = MXS_STRDUP_A(name_str);
                }
            }
            else
            {
                MXS_ERROR("JSON value for \"name\" was not a string in "
                          "file '%s'.", filename);
                rval = false;
            }
        }
        else
        {
            MXS_ERROR("JSON value for \"fields\" was not an array of objects in "
                      "file '%s'.", filename);
            rval = false;
        }
    }

    if (rval)
    {
        table->column_names = names;
        table->columns = columns;
    }
    else
    {
        /** Do not hand a half-built array to the caller */
        for (int i = 0; i < columns; i++)
        {
            MXS_FREE(names[i]);
        }
        MXS_FREE(names);
        table->column_names = NULL;
        table->columns = 0;
    }

    json_decref(obj);
    return rval;
}

/**
 * @brief Save the Avro schema of a table to disk
 *
 * The schema is written to `<path>/<database>.<table>.<version>.avsc` unless
 * that file already exists or the table definition has already been used.
 *
 * @param path   Schema directory
 * @param schema Schema in JSON format
 * @param map    Table map that @p schema represents
 */
void save_avro_schema(const char *path, const char* schema, TABLE_MAP *map)
{
    char filepath[PATH_MAX];
    int written = snprintf(filepath, sizeof(filepath), "%s/%s.%s.%06d.avsc",
                           path, map->database, map->table, map->version);

    if (written < 0 || (size_t)written >= sizeof(filepath))
    {
        /** A truncated path would name the wrong file */
        MXS_ERROR("Schema file path for table %s.%s is too long.",
                  map->database, map->table);
        return;
    }

    if (access(filepath, F_OK) == 0)
    {
        MXS_NOTICE("Schema version %d already exists: %s", map->version, filepath);
        return;
    }

    if (map->table_create->was_used)
    {
        return;
    }

    FILE *file = fopen(filepath, "wb");

    if (file == NULL)
    {
        MXS_ERROR("Failed to open schema file '%s' for writing, error %d.",
                  filepath, errno);
        return;
    }

    bool ok = fprintf(file, "%s\n", schema) >= 0;
    map->table_create->was_used = true;

    if (fclose(file) != 0)
    {
        ok = false;
    }

    if (!ok)
    {
        MXS_ERROR("Failed to write schema file '%s'.", filepath);
    }
}

/**
 * Extract the table definition from a CREATE TABLE statement
 *
 * @param sql  The SQL statement
 * @param size Set to the length of the definition when one is found
 * @return Pointer to the first character after the opening parenthesis of the
 *         definition, or NULL if the query is malformed.
 */
static const char* get_table_definition(const char *sql, int* size)
{
    const char *rval = NULL;
    const char *ptr = sql;
    const char *end = strchr(sql, '\0');

    while (ptr < end && *ptr != '(')
    {
        ptr++;
    }

    /** We assume at least the parentheses are in the statement */
    if (ptr < end - 2)
    {
        int depth = 0;
        ptr++;
        const char *start = ptr; // Skip first parenthesis

        while (ptr < end)
        {
            switch (*ptr)
            {
            case '(':
                depth++;
                break;

            case ')':
                depth--;
                break;

            default:
                break;
            }

            /** We found the last closing parenthesis */
            if (depth < 0)
            {
                *size = (int)(ptr - start);
                rval = start;
                break;
            }

            ptr++;
        }
    }

    return rval;
}

/** Check whether @c c separates the parts of a (possibly qualified) table name */
static inline bool is_name_separator(char c)
{
    return c == '`' || c == '.' || isspace((unsigned char)c);
}

/**
 * Extract the table name from a CREATE TABLE statement
 *
 * The name is the identifier immediately preceding the first opening
 * parenthesis. The backward scan never moves before the start of @c sql.
 *
 * @param sql  SQL statement
 * @param dest Destination where the table name is extracted. Must be at least
 *             MYSQL_TABLE_MAXLEN + 1 bytes long.
 * @return True if extraction was successful
 */
static bool get_table_name(const char* sql, char* dest)
{
    const char *paren = strchr(sql, '(');

    if (paren == NULL)
    {
        return false;
    }

    /** Step back over the whitespace and backticks before the parenthesis */
    const char *end = paren;

    while (end > sql && (end[-1] == '`' || isspace((unsigned char)end[-1])))
    {
        end--;
    }

    /** Walk back to the start of the identifier */
    const char *start = end;

    while (start > sql && !is_name_separator(start[-1]))
    {
        start--;
    }

    size_t len = end - start;

    if (len == 0 || len > MYSQL_TABLE_MAXLEN)
    {
        return false;
    }

    memcpy(dest, start, len);
    dest[len] = '\0';
    return true;
}

/**
 * Extract the database name from a CREATE TABLE statement
 *
 * The database name is the identifier that precedes the table name and is
 * separated from it by a dot. A statement without such a qualifier has no
 * database name and the extraction fails.
 *
 * @param sql  SQL statement
 * @param dest Destination where the database name is extracted. Must be at least
 *             MYSQL_DATABASE_MAXLEN + 1 bytes long.
 * @return True if extraction was successful
 */
static bool get_database_name(const char* sql, char* dest)
{
    const char *paren = strchr(sql, '(');

    if (paren == NULL)
    {
        return false;
    }

    const char *end = paren;

    /** Whitespace and backticks between the table name and the parenthesis */
    while (end > sql && (end[-1] == '`' || isspace((unsigned char)end[-1])))
    {
        end--;
    }

    /** The table name itself */
    while (end > sql && !is_name_separator(end[-1]))
    {
        end--;
    }

    /** The separators between the database and the table name. Without a dot
     * the preceding word is a keyword, not a database name. */
    bool qualified = false;

    while (end > sql && is_name_separator(end[-1]))
    {
        if (end[-1] == '.')
        {
            qualified = true;
        }
        end--;
    }

    if (!qualified)
    {
        return false;
    }

    const char *start = end;

    while (start > sql && !is_name_separator(start[-1]))
    {
        start--;
    }

    size_t len = end - start;

    if (len == 0 || len > MYSQL_DATABASE_MAXLEN)
    {
        return false;
    }

    memcpy(dest, start, len);
    dest[len] = '\0';
    return true;
}

/**
 * Replace every character that is not alphanumeric or an underscore with an
 * underscore
 *
 * @param ptr NUL-terminated string that is modified in place
 */
void make_valid_avro_identifier(char* ptr)
{
    while (*ptr)
    {
        if (!isalnum((unsigned char)*ptr) && *ptr != '_')
        {
            *ptr = '_';
        }
        ptr++;
    }
}

/**
 * Find the start of the next field definition
 *
 * Commas inside parentheses or inside single or double quoted strings do not
 * terminate a definition.
 *
 * @param ptr Current position inside a field definition
 * @return Pointer to the character after the terminating comma or to the
 *         terminating NUL byte if there are no more definitions
 */
const char* next_field_definition(const char* ptr)
{
    int depth = 0;
    bool quoted = false;
    char qchar = '\0';

    while (*ptr)
    {
        if (!quoted)
        {
            if (*ptr == '(')
            {
                depth++;
            }
            else if (*ptr == ')')
            {
                depth--;
            }
            else if (*ptr == '"' || *ptr == '\'')
            {
                qchar = *ptr;
                quoted = true;
            }
            else if (*ptr == ',' && depth == 0)
            {
                ptr++;
                break;
            }
        }
        else if (qchar == *ptr)
        {
            quoted = false;
        }

        ptr++;
    }

    return ptr;
}

/**
 * Check whether an unquoted definition starts with a keyword that introduces
 * an index or a constraint instead of a column
 *
 * The keyword must be a whole word: a column called `key_id` is not a keyword.
 */
static bool is_definition_keyword(const char *ptr)
{
    static const char *keywords[] =
    {
        "constraint", "index", "key", "fulltext",
        "spatial", "foreign", "unique", "primary"
    };

    for (size_t i = 0; i < sizeof(keywords) / sizeof(keywords[0]); i++)
    {
        size_t len = strlen(keywords[i]);

        if (strncasecmp(ptr, keywords[i], len) == 0 &&
            !isalnum((unsigned char)ptr[len]) && ptr[len] != '_')
        {
            return true;
        }
    }

    return false;
}

/**
 * Extract the name of the next column from a table definition
 *
 * @param ptr  Start of a field definition
 * @param dest Destination where the name is stored as a valid Avro identifier
 * @param size Size of @c dest; longer names are truncated
 * @return Pointer to the next field definition or NULL if no column name was
 *         found, i.e. the end of the column definitions was reached
 */
static const char *extract_field_name(const char* ptr, char* dest, size_t size)
{
    if (size == 0)
    {
        return NULL;
    }

    while (*ptr && isspace((unsigned char)*ptr))
    {
        ptr++;
    }

    /** A name that starts with a backtick ends at the closing backtick */
    bool bt = (*ptr == '`');

    if (bt)
    {
        ptr++;
    }
    else if (is_definition_keyword(ptr))
    {
        /** Quoted names are always identifiers, only bare words can be keywords */
        return NULL;
    }

    const char *start = ptr;

    if (bt)
    {
        while (*ptr && *ptr != '`')
        {
            ptr++;
        }
    }
    else
    {
        while (*ptr && !isspace((unsigned char)*ptr))
        {
            ptr++;
        }
    }

    if (ptr == start)
    {
        return NULL;
    }

    /** Valid identifier */
    size_t bytes = ptr - start;

    if (bytes >= size)
    {
        bytes = size - 1;
    }

    memcpy(dest, start, bytes);
    dest[bytes] = '\0';
    make_valid_avro_identifier(dest);

    return next_field_definition(ptr);
}

/**
 * Process a table definition into an array of column names
 *
 * @param nameptr Table definition
 * @param dest    Set to the allocated array of allocated names on success.
 *                The caller owns the array even when zero columns are found.
 * @return Number of processed columns or -1 on error
 */
static int process_column_definition(const char *nameptr, char*** dest)
{
    /** Process columns in groups of 8 */
    size_t chunks = 1;
    const size_t chunk_size = 8;
    int i = 0;
    char **names = MXS_MALLOC(sizeof(char*) * (chunks * chunk_size + 1));

    if (names == NULL)
    {
        return -1;
    }

    char colname[512];

    while ((nameptr = extract_field_name(nameptr, colname, sizeof(colname))))
    {
        if ((size_t)i >= chunks * chunk_size)
        {
            char **tmp = MXS_REALLOC(names, (++chunks * chunk_size + 1) * sizeof(char*));

            if (tmp == NULL)
            {
                for (int x = 0; x < i; x++)
                {
                    MXS_FREE(names[x]);
                }
                MXS_FREE(names);
                return -1;
            }

            names = tmp;
        }

        if ((names[i++] = MXS_STRDUP(colname)) == NULL)
        {
            /** The last slot holds NULL, freeing it is harmless */
            for (int x = 0; x < i; x++)
            {
                MXS_FREE(names[x]);
            }
            MXS_FREE(names);
            return -1;
        }
    }

    *dest = names;

    return i;
}

/**
 * @brief Create a table definition from a stored Avro schema file
 *
 * @param file    Path to the JSON format Avro schema
 * @param db      Database name
 * @param table   Table name
 * @param version Version of the table definition
 * @return New TABLE_CREATE marked as already used, or NULL on error
 */
TABLE_CREATE* table_create_from_schema(const char* file, const char* db,
                                       const char* table, int version)
{
    char *db_copy = MXS_STRDUP(db);
    char *table_copy = MXS_STRDUP(table);
    TABLE_CREATE* newtable = MXS_MALLOC(sizeof(TABLE_CREATE));

    if (!db_copy || !table_copy || !newtable)
    {
        MXS_FREE(db_copy);
        MXS_FREE(table_copy);
        MXS_FREE(newtable);
        return NULL;
    }

    newtable->table = table_copy;
    newtable->database = db_copy;
    newtable->version = version;
    newtable->was_used = true;
    newtable->column_names = NULL;
    newtable->columns = 0;

    if (!json_extract_field_names(file, newtable))
    {
        MXS_FREE(newtable->table);
        MXS_FREE(newtable->database);
        MXS_FREE(newtable);
        newtable = NULL;
    }

    return newtable;
}

/**
 * @brief Handle a query event which contains a CREATE TABLE statement
 *
 * @param sql      Query SQL
 * @param event_db Database where this query was executed. If it is empty, the
 *                 database name is taken from the statement itself.
 * @return New TABLE_CREATE object or NULL if an error occurred
 */
TABLE_CREATE* table_create_alloc(const char* sql, const char* event_db)
{
    /** Extract the table definition so we can get the column names from it */
    int stmt_len = 0;
    const char* statement_sql = get_table_definition(sql, &stmt_len);
    ss_dassert(statement_sql);

    if (statement_sql == NULL)
    {
        MXS_ERROR("Malformed CREATE TABLE statement, could not find the "
                  "column definitions: %s", sql);
        return NULL;
    }

    char table[MYSQL_TABLE_MAXLEN + 1];
    char database[MYSQL_DATABASE_MAXLEN + 1];
    const char *db = event_db;

    MXS_DEBUG("Create table statement: %.*s", stmt_len, statement_sql);

    if (!get_table_name(sql, table))
    {
        MXS_ERROR("Malformed CREATE TABLE statement, could not extract table name: %s", sql);
        return NULL;
    }

    /** The CREATE statement contains the database name */
    if (db == NULL || *db == '\0')
    {
        if (!get_database_name(sql, database))
        {
            MXS_ERROR("Malformed CREATE TABLE statement, could not extract "
                      "database name: %s", sql);
            return NULL;
        }
        db = database;
    }

    char **names = NULL;
    int n_columns = process_column_definition(statement_sql, &names);
    ss_dassert(n_columns > 0);

    /** We appear to have a valid CREATE TABLE statement */
    TABLE_CREATE *rval = NULL;

    if (n_columns > 0)
    {
        if ((rval = MXS_MALLOC(sizeof(TABLE_CREATE))))
        {
            rval->version = 1;
            rval->was_used = false;
            rval->column_names = names;
            rval->columns = n_columns;
            rval->database = MXS_STRDUP(db);
            rval->table = MXS_STRDUP(table);
        }

        if (rval == NULL || rval->database == NULL || rval->table == NULL)
        {
            if (rval)
            {
                MXS_FREE(rval->database);
                MXS_FREE(rval->table);
                MXS_FREE(rval);
            }

            for (int i = 0; i < n_columns; i++)
            {
                MXS_FREE(names[i]);
            }

            MXS_FREE(names);
            rval = NULL;
        }
    }
    else
    {
        MXS_ERROR("No columns in a CREATE TABLE statement: %.*s", stmt_len, statement_sql);
        /** With zero columns the empty array is still ours to release; on
         * error it was never assigned and is NULL */
        MXS_FREE(names);
    }

    return rval;
}

/**
 * Free a TABLE_CREATE structure
 *
 * @param value Value to free
 */
void table_create_free(TABLE_CREATE* value)
{
    if (value)
    {
        for (uint64_t i = 0; i < value->columns; i++)
        {
            MXS_FREE(value->column_names[i]);
        }
        MXS_FREE(value->column_names);
        MXS_FREE(value->table);
        MXS_FREE(value->database);
        MXS_FREE(value);
    }
}

/**
 * Find the next comma separated definition of an ALTER TABLE statement
 *
 * @param sql Current position in the statement
 * @param end End of the statement
 * @return Pointer to the character after the next comma that is not inside
 *         parentheses, or NULL if there is none
 */
static const char* get_next_def(const char* sql, const char* end)
{
    int depth = 0;

    while (sql < end)
    {
        if (*sql == ',' && depth == 0)
        {
            return sql + 1;
        }
        else if (*sql == '(')
        {
            depth++;
        }
        else if (*sql == ')')
        {
            depth--;
        }

        sql++;
    }

    return NULL;
}

/**
 * Read the next whitespace delimited token
 *
 * Whitespace inside parentheses does not terminate a token.
 *
 * @param sql    Position where the search starts
 * @param toklen Set to the length of the token when one is found
 * @param end    End of the statement; nothing at or after it is read
 * @return Pointer to the start of the token or NULL if there are no more tokens
 */
static const char* get_tok(const char* sql, int* toklen, const char* end)
{
    const char *start = sql;

    while (start < end && isspace((unsigned char)*start))
    {
        start++;
    }

    int len = 0;
    int depth = 0;

    while (start + len < end)
    {
        if (isspace((unsigned char)start[len]) && depth == 0)
        {
            *toklen = len;
            return start;
        }
        else if (start[len] == '(')
        {
            depth++;
        }
        else if (start[len] == ')')
        {
            depth--;
        }

        len++;
    }

    if (len > 0 && start + len <= end)
    {
        *toklen = len;
        return start;
    }

    return NULL;
}

/**
 * Case-insensitive comparison of a token to a keyword
 *
 * @param a   Token, not necessarily NUL-terminated
 * @param b   NUL-terminated keyword
 * @param len Length of the token
 * @return True if the token is exactly the keyword
 */
static bool tok_eq(const char *a, const char *b, size_t len)
{
    for (size_t i = 0; i < len; i++)
    {
        if (b[i] == '\0' ||
            tolower((unsigned char)a[i]) != tolower((unsigned char)b[i]))
        {
            return false;
        }
    }

    /** A token that is only a prefix of the keyword is not a match */
    return b[len] == '\0';
}

/**
 * Copy the third token of a statement into @c dest
 *
 * If the statement has fewer than three tokens, @c dest is left untouched.
 *
 * @param sql  Start of the statement
 * @param end  End of the statement
 * @param dest Destination buffer
 * @param size Size of @c dest
 */
void read_alter_identifier(const char *sql, const char *end, char *dest, int size)
{
    int len = 0;
    const char *tok = get_tok(sql, &len, end);

    if (tok && (tok = get_tok(tok + len, &len, end)) && (tok = get_tok(tok + len, &len, end)))
    {
        snprintf(dest, size, "%.*s", len, tok);
    }
}

/** Characters that are stripped from the start and terminate the end of a token */
static inline bool is_token_delimiter(char c)
{
    return c == '(' || c == ')' || c == '`' || isspace((unsigned char)c);
}

/**
 * Copy a token without the surrounding parentheses, backticks and whitespace
 *
 * @param dest   Destination, must be at least @c length + 1 bytes long
 * @param src    Start of the token
 * @param length Length of the token
 */
void make_avro_token(char* dest, const char* src, int length)
{
    /** Never consume more than the token itself */
    while (length > 0 && is_token_delimiter(*src))
    {
        src++;
        length--;
    }

    int n = 0;

    while (n < length && !is_token_delimiter(src[n]))
    {
        n++;
    }

    memcpy(dest, src, n);
    dest[n] = '\0';
}

/**
 * Convert a column name token of an ALTER TABLE statement into the form in
 * which column names are stored
 *
 * @param dest Destination, must be at least @c len + 1 bytes long
 * @param tok  Start of the token
 * @param len  Length of the token
 */
static void alter_column_name(char *dest, const char *tok, int len)
{
    make_avro_token(dest, tok, len);
    /** A definition separator can be glued to the name, e.g. "DROP COLUMN a," */
    dest[strcspn(dest, ",;")] = '\0';
    make_valid_avro_identifier(dest);
}

/**
 * Find a column by name
 *
 * @param create Table definition
 * @param name   Name to look for, compared without regard to case
 * @param index  Set to the index of the column if it is found
 * @return True if the column was found
 */
static bool find_column(const TABLE_CREATE *create, const char *name, uint64_t *index)
{
    for (uint64_t i = 0; i < create->columns; i++)
    {
        if (create->column_names[i] && strcasecmp(create->column_names[i], name) == 0)
        {
            *index = i;
            return true;
        }
    }

    return false;
}

/**
 * @brief Apply an ALTER TABLE statement to a table definition
 *
 * Only `ADD COLUMN`, `DROP COLUMN` and `CHANGE COLUMN` are processed and
 * only the column names are tracked. `DROP` and `CHANGE` locate the column
 * by name; if the name is not known they fall back to the last column, so
 * the column count still follows the statement.
 *
 * @param create Table definition to modify
 * @param sql    Start of the ALTER TABLE statement
 * @param end    End of the statement
 * @return False if memory allocation failed, otherwise true
 */
bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end)
{
    const char *tbl = strcasestr(sql, "table");
    const char *def = tbl ? strchr(tbl, ' ') : NULL;

    if (def == NULL)
    {
        /** Nothing that could be processed */
        return true;
    }

    int len = 0;
    const char *tok = get_tok(def, &len, end);

    if (tok)
    {
        MXS_DEBUG("Altering table %.*s\n", len, tok);
    }

    int updates = 0;

    while (tok && (tok = get_tok(tok + len, &len, end)))
    {
        const char *ptok = tok;
        int plen = len;
        tok = get_tok(tok + len, &len, end);

        if (tok == NULL)
        {
            break;
        }

        if (!tok_eq(tok, "column", len))
        {
            continue;
        }

        if (tok_eq(ptok, "add", plen))
        {
            tok = get_tok(tok + len, &len, end);

            if (tok == NULL)
            {
                break;
            }

            /** Room for one more pointer, not for one more byte */
            char **tmp = MXS_REALLOC(create->column_names,
                                     sizeof(char*) * (create->columns + 1));
            ss_dassert(tmp);

            if (tmp == NULL)
            {
                return false;
            }

            create->column_names = tmp;
            char avro_token[len + 1];
            alter_column_name(avro_token, tok, len);
            create->column_names[create->columns] = MXS_STRDUP_A(avro_token);
            create->columns++;
            updates++;
            tok = get_next_def(tok, end);
            len = 0;
        }
        else if (tok_eq(ptok, "drop", plen))
        {
            tok = get_tok(tok + len, &len, end);

            if (tok == NULL)
            {
                break;
            }

            if (create->columns > 0)
            {
                char name[len + 1];
                alter_column_name(name, tok, len);

                uint64_t last = create->columns - 1;
                uint64_t idx = last;
                find_column(create, name, &idx);

                /** Close the gap so that the remaining names keep their
                 * order. The array is not shrunk: a failed or zero-sized
                 * reallocation must not invalidate the definition. */
                MXS_FREE(create->column_names[idx]);
                memmove(&create->column_names[idx], &create->column_names[idx + 1],
                        (last - idx) * sizeof(char*));
                create->column_names[last] = NULL;
                create->columns--;
                updates++;
            }

            tok = get_next_def(tok, end);
            len = 0;
        }
        else if (tok_eq(ptok, "change", plen))
        {
            /** CHANGE COLUMN old_name new_name definition */
            const char *old_tok = get_tok(tok + len, &len, end);

            if (old_tok == NULL)
            {
                break;
            }

            int old_len = len;
            const char *new_tok = get_tok(old_tok + old_len, &len, end);

            if (new_tok == NULL)
            {
                break;
            }

            if (create->columns > 0)
            {
                char old_name[old_len + 1];
                char new_name[len + 1];
                alter_column_name(old_name, old_tok, old_len);
                alter_column_name(new_name, new_tok, len);

                uint64_t idx = create->columns - 1;
                find_column(create, old_name, &idx);

                char *replacement = MXS_STRDUP(new_name);

                if (replacement == NULL)
                {
                    return false;
                }

                MXS_FREE(create->column_names[idx]);
                create->column_names[idx] = replacement;
                updates++;
            }

            tok = get_next_def(new_tok, end);
            len = 0;
        }
    }

    /** Only increment the create version if it has an associated .avro
     * file. The .avro file is only created if it is actually used. */
    if (updates > 0 && create->was_used)
    {
        create->version++;
        create->was_used = false;
    }

    return true;
}

/**
 * @brief Read the fully qualified name of the table
 *
 * @param ptr             Pointer to the start of the event payload
 * @param post_header_len Length of the event specific header, 8 or 6 bytes
 * @param tbl_id          Set to the table ID read from the event
 * @param dest            Destination where the `database.table` string is stored
 * @param len             Size of destination
 */
void read_table_info(uint8_t *ptr, uint8_t post_header_len, uint64_t *tbl_id, char* dest, size_t len)
{
    uint64_t table_id = 0;
    size_t id_size = post_header_len == 6 ? 4 : 6;
    memcpy(&table_id, ptr, id_size);
    ptr += id_size;

    /** Skip the two byte flags field */
    ptr += 2;

    uint8_t schema_name_len = *ptr++;
    char schema_name[schema_name_len + 2];

    /** Copy the NULL byte after the schema name */
    memcpy(schema_name, ptr, schema_name_len + 1);
    ptr += schema_name_len + 1;

    uint8_t table_name_len = *ptr++;
    char table_name[table_name_len + 2];

    /** Copy the NULL byte after the table name */
    memcpy(table_name, ptr, table_name_len + 1);

    snprintf(dest, len, "%s.%s", schema_name, table_name);
    *tbl_id = table_id;
}

/**
 * @brief Extract a table map from a table map event
 *
 * This assumes that the complete event minus the replication header is stored
 * at @p ptr
 *
 * @param ptr     Pointer to the start of the event payload
 * @param hdr_len Length of the event specific header, 8 or 6 bytes
 * @param create  Table definition that the new map refers to; the map does
 *                not take a copy of it
 * @return New TABLE_MAP or NULL if memory allocation failed
 */
TABLE_MAP *table_map_alloc(uint8_t *ptr, uint8_t hdr_len, TABLE_CREATE* create)
{
    uint64_t table_id = 0;
    size_t id_size = hdr_len == 6 ? 4 : 6;
    memcpy(&table_id, ptr, id_size);
    ptr += id_size;

    uint16_t flags = 0;
    memcpy(&flags, ptr, 2);
    ptr += 2;

    uint8_t schema_name_len = *ptr++;
    char schema_name[schema_name_len + 2];

    /** Copy the NULL byte after the schema name */
    memcpy(schema_name, ptr, schema_name_len + 1);
    ptr += schema_name_len + 1;

    uint8_t table_name_len = *ptr++;
    char table_name[table_name_len + 2];

    /** Copy the NULL byte after the table name */
    memcpy(table_name, ptr, table_name_len + 1);
    ptr += table_name_len + 1;

    uint64_t column_count = mxs_leint_value(ptr);
    ptr += mxs_leint_bytes(ptr);

    /** Column types */
    uint8_t *column_types = ptr;
    ptr += column_count;

    size_t metadata_size = 0;
    uint8_t* metadata = (uint8_t*)mxs_lestr_consume(&ptr, &metadata_size);
    uint8_t *nullmap = ptr;
    size_t nullmap_size = (column_count + 7) / 8;

    TABLE_MAP *map = MXS_MALLOC(sizeof(TABLE_MAP));

    if (map)
    {
        map->id = table_id;
        map->version = create->version;
        map->flags = flags;
        ss_dassert(column_count == create->columns);
        map->columns = column_count;
        map->column_types = MXS_MALLOC(column_count);
        /** Allocate at least one byte for the metadata */
        map->column_metadata = MXS_CALLOC(1, metadata_size + 1);
        map->column_metadata_size = metadata_size;
        map->null_bitmap = MXS_MALLOC(nullmap_size);
        map->database = MXS_STRDUP(schema_name);
        map->table = MXS_STRDUP(table_name);
        map->table_create = create;

        if (map->column_types && map->database && map->table &&
            map->column_metadata && map->null_bitmap)
        {
            memcpy(map->column_types, column_types, column_count);
            memcpy(map->null_bitmap, nullmap, nullmap_size);
            memcpy(map->column_metadata, metadata, metadata_size);
        }
        else
        {
            MXS_FREE(map->null_bitmap);
            MXS_FREE(map->column_metadata);
            MXS_FREE(map->column_types);
            MXS_FREE(map->database);
            MXS_FREE(map->table);
            MXS_FREE(map);
            map = NULL;
        }
    }

    return map;
}

/**
 * @brief Free a table map
 *
 * Releases everything allocated by table_map_alloc(). The table definition
 * that the map refers to is not freed.
 *
 * @param map Table map to free
 */
void table_map_free(TABLE_MAP *map)
{
    if (map)
    {
        MXS_FREE(map->null_bitmap);
        MXS_FREE(map->column_metadata);
        MXS_FREE(map->column_types);
        MXS_FREE(map->database);
        MXS_FREE(map->table);
        MXS_FREE(map);
    }
}

/**
 * @brief Map a table to a different ID
 *
 * This updates the table ID that the @c TABLE_MAP object is assigned with
 *
 * @param ptr     Pointer to the start of a table map event
 * @param hdr_len Post-header length
 * @param map     Table map to remap
 */
void table_map_remap(uint8_t *ptr, uint8_t hdr_len, TABLE_MAP *map)
{
    uint64_t table_id = 0;
    size_t id_size = hdr_len == 6 ? 4 : 6;
    memcpy(&table_id, ptr, id_size);
    map->id = table_id;
}