Make directory name equal to the library name

* avro -> avrorouter
* binlog -> binlogrouter
Johan Wikman
2016-12-01 15:23:42 +02:00
parent a6df875495
commit aef6c7b099
22 changed files with 2 additions and 2 deletions


@@ -0,0 +1,11 @@
if(AVRO_FOUND AND JANSSON_FOUND)
include_directories(${AVRO_INCLUDE_DIR})
include_directories(${JANSSON_INCLUDE_DIR})
add_library(avrorouter SHARED avro.c ../binlogrouter/binlog_common.c avro_client.c avro_schema.c avro_rbr.c avro_file.c avro_index.c)
set_target_properties(avrorouter PROPERTIES VERSION "1.0.0")
set_target_properties(avrorouter PROPERTIES LINK_FLAGS -Wl,-z,defs)
target_link_libraries(avrorouter maxscale-common ${JANSSON_LIBRARIES} ${AVRO_LIBRARIES} maxavro sqlite3 lzma)
install_module(avrorouter core)
else()
message(STATUS "No Avro C or Jansson libraries found, not building avrorouter.")
endif()

File diff suppressed because it is too large

File diff suppressed because it is too large

File diff suppressed because it is too large


@@ -0,0 +1,249 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
/**
* @file avro_index.c - GTID to file position index
*
* This file contains functions used to store index information
* about GTID positions in an Avro file. Since all records in the Avro files
* that the avrorouter writes contain the common GTID field, we can use it to
* create an index. This can then be used to speed up retrieval of Avro records
* by seeking directly to the stored offset in the file and reading the record
* there, instead of iterating through all the records looking for a match.
*
* The index is stored as an SQLite3 database.
*
* @verbatim
* Revision History
*
* Date Who Description
* 2/04/2016 Markus Mäkelä Initial implementation
*
* @endverbatim
*/
#include "avrorouter.h"
#include <maxscale/debug.h>
#include <glob.h>
void* safe_key_free(void *data);
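/** Template for inserting a GTID position into the index database */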
static const char insert_template[] = "INSERT INTO gtid(domain, server_id, "
"sequence, avrofile, position) values (%lu, %lu, %lu, \"%s\", %ld);";
static void set_gtid(gtid_pos_t *gtid, json_t *row)
{
json_t *obj = json_object_get(row, avro_sequence);
ss_dassert(json_is_integer(obj));
gtid->seq = json_integer_value(obj);
obj = json_object_get(row, avro_server_id);
ss_dassert(json_is_integer(obj));
gtid->server_id = json_integer_value(obj);
obj = json_object_get(row, avro_domain);
ss_dassert(json_is_integer(obj));
gtid->domain = json_integer_value(obj);
}
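/**
 * @brief sqlite3_exec callback for reading an indexed position
 *
 * Stores the first non-NULL column value of the result as a long.
 *
 * @param data Pointer to a long where the value is stored
 * @param rows Number of columns in the result row
 * @param values Column values
 * @param names Column names
 * @return Always 0
 */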
int index_query_cb(void *data, int rows, char** values, char** names)
{
for (int i = 0; i < rows; i++)
{
if (values[i])
{
*((long*)data) = strtol(values[i], NULL, 10);
return 0;
}
}
return 0;
}
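/**
 * @brief Index a single Avro file
 *
 * Adds the file position of each new GTID in the file to the index database,
 * continuing from the last indexed position if the file has been indexed before.
 *
 * @param router Avro router instance
 * @param filename Path of the Avro file to index
 */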
void avro_index_file(AVRO_INSTANCE *router, const char* filename)
{
MAXAVRO_FILE *file = maxavro_file_open(filename);
if (file)
{
char *name = strrchr(filename, '/');
ss_dassert(name);
if (name)
{
char sql[AVRO_SQL_BUFFER_SIZE];
char *errmsg;
long pos = -1;
name++;
snprintf(sql, sizeof(sql), "SELECT position FROM "INDEX_TABLE_NAME
" WHERE filename=\"%s\";", name);
if (sqlite3_exec(router->sqlite_handle, sql, index_query_cb, &pos, &errmsg) != SQLITE_OK)
{
MXS_ERROR("Failed to read last indexed position of file '%s': %s",
name, errmsg);
}
else if (pos > 0)
{
/** Continue from last position */
maxavro_record_set_pos(file, pos);
}
sqlite3_free(errmsg);
errmsg = NULL;
gtid_pos_t prev_gtid = {0, 0, 0, 0, 0};
if (sqlite3_exec(router->sqlite_handle, "BEGIN", NULL, NULL, &errmsg) != SQLITE_OK)
{
MXS_ERROR("Failed to start transaction: %s", errmsg);
}
sqlite3_free(errmsg);
do
{
json_t *row = maxavro_record_read_json(file);
if (row)
{
gtid_pos_t gtid;
set_gtid(&gtid, row);
if (prev_gtid.domain != gtid.domain ||
prev_gtid.server_id != gtid.server_id ||
prev_gtid.seq != gtid.seq)
{
snprintf(sql, sizeof(sql), insert_template, gtid.domain,
gtid.server_id, gtid.seq, name, file->block_start_pos);
if (sqlite3_exec(router->sqlite_handle, sql, NULL, NULL,
&errmsg) != SQLITE_OK)
{
MXS_ERROR("Failed to insert GTID %lu-%lu-%lu for %s "
"into index database: %s", gtid.domain,
gtid.server_id, gtid.seq, name, errmsg);
}
sqlite3_free(errmsg);
errmsg = NULL;
prev_gtid = gtid;
}
}
else
{
break;
}
}
while (maxavro_next_block(file));
if (sqlite3_exec(router->sqlite_handle, "COMMIT", NULL, NULL, &errmsg) != SQLITE_OK)
{
MXS_ERROR("Failed to commit transaction: %s", errmsg);
}
sqlite3_free(errmsg);
snprintf(sql, sizeof(sql), "INSERT OR REPLACE INTO "INDEX_TABLE_NAME
" values (%lu, \"%s\");", file->block_start_pos, name);
if (sqlite3_exec(router->sqlite_handle, sql, NULL, NULL,
&errmsg) != SQLITE_OK)
{
MXS_ERROR("Failed to update indexing progress: %s", errmsg);
}
sqlite3_free(errmsg);
errmsg = NULL;
}
else
{
MXS_ERROR("Malformed filename: %s", filename);
}
maxavro_file_close(file);
}
}
/**
* @brief Avro file indexing task
*
* Builds an index of filenames, GTIDs and positions in the Avro file.
* This allows all tables that contain a GTID to be fetched in an efficient
* manner.
* @param router The router instance
*/
void avro_update_index(AVRO_INSTANCE* router)
{
char path[PATH_MAX + 1];
snprintf(path, sizeof(path), "%s/*.avro", router->avrodir);
glob_t files;
if (glob(path, 0, NULL, &files) != GLOB_NOMATCH)
{
for (int i = 0; i < files.gl_pathc; i++)
{
avro_index_file(router, files.gl_pathv[i]);
}
}
globfree(&files);
}
/** The SQL for the in-memory used_tables table */
static const char *insert_sql = "INSERT OR IGNORE INTO "MEMORY_TABLE_NAME
"(domain, server_id, sequence, binlog_timestamp, table_name)"
" VALUES (%lu, %lu, %lu, %lu, \"%s\")";
/**
* @brief Add a used table to the current transaction
*
* This adds a table to the in-memory table used to store tables used by
* transactions. These are later flushed to disk with the Avro records.
*
* @param router Avro router instance
* @param table Table to add
*/
void add_used_table(AVRO_INSTANCE* router, const char* table)
{
char sql[AVRO_SQL_BUFFER_SIZE], *errmsg;
snprintf(sql, sizeof(sql), insert_sql, router->gtid.domain, router->gtid.server_id,
router->gtid.seq, router->gtid.timestamp, table);
if (sqlite3_exec(router->sqlite_handle, sql, NULL, NULL, &errmsg) != SQLITE_OK)
{
MXS_ERROR("Failed to add used table %s for GTID %lu-%lu-%lu: %s",
table, router->gtid.domain, router->gtid.server_id,
router->gtid.seq, errmsg);
}
sqlite3_free(errmsg);
}
/**
* @brief Update the tables used in a transaction
*
* This flushes the in-memory table to disk and should be called after the
* Avro records have been flushed to disk.
*
* @param router Avro router instance
*/
void update_used_tables(AVRO_INSTANCE* router)
{
char *errmsg;
if (sqlite3_exec(router->sqlite_handle, "INSERT INTO "USED_TABLES_TABLE_NAME
" SELECT * FROM "MEMORY_TABLE_NAME, NULL, NULL, &errmsg) != SQLITE_OK)
{
MXS_ERROR("Failed to transfer used table data from memory to disk: %s", errmsg);
}
sqlite3_free(errmsg);
if (sqlite3_exec(router->sqlite_handle, "DELETE FROM "MEMORY_TABLE_NAME,
NULL, NULL, &errmsg) != SQLITE_OK)
{
MXS_ERROR("Failed to transfer used table data from memory to disk: %s", errmsg);
}
sqlite3_free(errmsg);
}
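
The index built above maps a GTID to a byte position inside a named Avro file, as the file comment describes: instead of scanning every record, a reader can look the position up and seek straight to it. The sketch below is not part of this commit; it is one way such a lookup could be written against the gtid table filled by insert_template. The names gtid_seek and gtid_seek_cb are hypothetical, and the sketch assumes the avrorouter headers above (for GTID_TABLE_NAME, AVRO_SQL_BUFFER_SIZE, MXS_ERROR and the sqlite3/maxavro APIs) are included.

/* Hypothetical lookup against the GTID index (not part of this commit). */
static int gtid_seek_cb(void *data, int ncols, char **values, char **names)
{
    /* Store the first non-NULL column value (the position) as a long */
    if (ncols > 0 && values[0])
    {
        *((long*)data) = strtol(values[0], NULL, 10);
    }
    return 0;
}

static bool gtid_seek(sqlite3 *handle, MAXAVRO_FILE *file, const char *avrofile,
                      uint64_t domain, uint64_t server_id, uint64_t seq)
{
    char sql[AVRO_SQL_BUFFER_SIZE];
    char *errmsg = NULL;
    long pos = -1;

    snprintf(sql, sizeof(sql), "SELECT position FROM " GTID_TABLE_NAME
             " WHERE domain=%lu AND server_id=%lu AND sequence=%lu"
             " AND avrofile=\"%s\";", domain, server_id, seq, avrofile);

    if (sqlite3_exec(handle, sql, gtid_seek_cb, &pos, &errmsg) != SQLITE_OK)
    {
        MXS_ERROR("GTID lookup failed: %s", errmsg);
        sqlite3_free(errmsg);
        return false;
    }

    sqlite3_free(errmsg);

    if (pos > 0)
    {
        /* Jump straight to the stored offset instead of scanning the file */
        maxavro_record_set_pos(file, pos);
        return true;
    }

    return false;
}

A caller that already has a MAXAVRO_FILE handle open could then call gtid_seek(router->sqlite_handle, file, name, 0, 1, 123) to position the file at GTID 0-1-123, falling back to a full scan when the lookup returns false.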


@@ -0,0 +1,604 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <maxscale/mysql_utils.h>
#include <jansson.h>
#include <maxscale/alloc.h>
#include "avrorouter.h"
#include <strings.h>
#define WRITE_EVENT 0
#define UPDATE_EVENT 1
#define UPDATE_EVENT_AFTER 2
#define DELETE_EVENT 3
static bool warn_decimal = false; /**< Remove when support for DECIMAL is added */
static bool warn_bit = false; /**< Remove when support for BIT is added */
static bool warn_large_enumset = false; /**< Remove when support for ENUM/SET values
* larger than 255 is added */
uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create,
avro_value_t *record, uint8_t *ptr,
uint8_t *columns_present);
void notify_all_clients(AVRO_INSTANCE *router);
void add_used_table(AVRO_INSTANCE* router, const char* table);
/**
* @brief Get the row event type
* @param event Replication event type
* @return One of WRITE_EVENT, UPDATE_EVENT or DELETE_EVENT, or -1 on an
* unexpected event type
*/
static int get_event_type(uint8_t event)
{
switch (event)
{
case WRITE_ROWS_EVENTv0:
case WRITE_ROWS_EVENTv1:
case WRITE_ROWS_EVENTv2:
return WRITE_EVENT;
case UPDATE_ROWS_EVENTv0:
case UPDATE_ROWS_EVENTv1:
case UPDATE_ROWS_EVENTv2:
return UPDATE_EVENT;
case DELETE_ROWS_EVENTv0:
case DELETE_ROWS_EVENTv1:
case DELETE_ROWS_EVENTv2:
return DELETE_EVENT;
default:
MXS_ERROR("Unexpected event type: %d (%0x)", event, event);
return -1;
}
}
/**
* @brief Handle a table map event
*
* This converts a table map event into table metadata that will be used when
* converting binlogs to Avro format.
* @param router Avro router instance
* @param hdr Replication header
* @param ptr Pointer to event payload
*/
bool handle_table_map_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
{
bool rval = false;
uint64_t id;
char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2];
int ev_len = router->event_type_hdr_lens[hdr->event_type];
read_table_info(ptr, ev_len, &id, table_ident, sizeof(table_ident));
TABLE_CREATE* create = hashtable_fetch(router->created_tables, table_ident);
if (create)
{
ss_dassert(create->columns > 0);
TABLE_MAP *old = hashtable_fetch(router->table_maps, table_ident);
if (old == NULL || old->version != create->version)
{
TABLE_MAP *map = table_map_alloc(ptr, ev_len, create);
if (map)
{
char* json_schema = json_new_schema_from_table(map);
if (json_schema)
{
char filepath[PATH_MAX + 1];
snprintf(filepath, sizeof(filepath), "%s/%s.%06d.avro",
router->avrodir, table_ident, map->version);
/** Close the file and open a new one */
hashtable_delete(router->open_tables, table_ident);
AVRO_TABLE *avro_table = avro_table_alloc(filepath, json_schema);
if (avro_table)
{
bool notify = old != NULL;
if (old)
{
router->active_maps[old->id % MAX_MAPPED_TABLES] = NULL;
}
hashtable_delete(router->table_maps, table_ident);
hashtable_add(router->table_maps, (void*) table_ident, map);
hashtable_add(router->open_tables, table_ident, avro_table);
save_avro_schema(router->avrodir, json_schema, map);
router->active_maps[map->id % MAX_MAPPED_TABLES] = map;
MXS_DEBUG("Table %s mapped to %lu", table_ident, map->id);
rval = true;
if (notify)
{
notify_all_clients(router);
}
}
else
{
MXS_ERROR("Failed to open new Avro file for writing.");
}
MXS_FREE(json_schema);
}
else
{
MXS_ERROR("Failed to create JSON schema.");
}
}
else
{
MXS_ERROR("Failed to allocate new table map.");
}
}
else
{
ss_dassert(router->active_maps[old->id % MAX_MAPPED_TABLES] == old);
router->active_maps[old->id % MAX_MAPPED_TABLES] = NULL;
table_map_remap(ptr, ev_len, old);
router->active_maps[old->id % MAX_MAPPED_TABLES] = old;
MXS_DEBUG("Table %s re-mapped to %lu", table_ident, old->id);
/** No changes in the schema */
rval = true;
}
}
else
{
MXS_WARNING("Table map event for table '%s' read before the DDL statement "
"for that table was read. Data will not be processed for this "
"table until a DDL statement for it is read.", table_ident);
}
return rval;
}
/**
* @brief Set common field values and update the GTID subsequence counter
*
* This sets the domain, server ID, sequence and event number fields of the
* record and increments the GTID event counter. It also sets the event
* timestamp and event type fields.
*
* @param router Avro router instance
* @param hdr Replication header
* @param event_type Event type
* @param record Record to prepare
*/
static void prepare_record(AVRO_INSTANCE *router, REP_HEADER *hdr,
int event_type, avro_value_t *record)
{
avro_value_t field;
avro_value_get_by_name(record, avro_domain, &field, NULL);
avro_value_set_int(&field, router->gtid.domain);
avro_value_get_by_name(record, avro_server_id, &field, NULL);
avro_value_set_int(&field, router->gtid.server_id);
avro_value_get_by_name(record, avro_sequence, &field, NULL);
avro_value_set_int(&field, router->gtid.seq);
router->gtid.event_num++;
avro_value_get_by_name(record, avro_event_number, &field, NULL);
avro_value_set_int(&field, router->gtid.event_num);
avro_value_get_by_name(record, avro_timestamp, &field, NULL);
avro_value_set_int(&field, hdr->timestamp);
avro_value_get_by_name(record, avro_event_type, &field, NULL);
avro_value_set_enum(&field, event_type);
}
/**
* @brief Handle a single RBR row event
*
* These events contain the changes in the data. This function assumes that full
* row image is sent in every row event.
*
* @param router Avro router instance
* @param hdr Replication header
* @param ptr Pointer to the start of the event
* @return True on success, false on error
*/
bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
{
bool rval = false;
uint8_t *start = ptr;
uint8_t table_id_size = router->event_type_hdr_lens[hdr->event_type] == 6 ? 4 : 6;
uint64_t table_id = 0;
/** The first value is the ID to which the table was mapped. This should be
* the same as the ID in the table map event which was processed before this
* row event. */
memcpy(&table_id, ptr, table_id_size);
ptr += table_id_size;
/** Replication flags, currently ignored for the most part. */
uint16_t flags = 0;
memcpy(&flags, ptr, 2);
ptr += 2;
if (table_id == TABLE_DUMMY_ID && flags & ROW_EVENT_END_STATEMENT)
{
/** This is a dummy event which should release all table maps. Right
* now we just return without processing the rows. */
return true;
}
/** Newer replication events have extra data stored in the header. MariaDB
* 10.1 does not use these and instead uses the v1 events */
if (hdr->event_type > DELETE_ROWS_EVENTv1)
{
/** Version 2 row event, skip extra data */
uint16_t extra_len = 0;
memcpy(&extra_len, ptr, 2);
ptr += 2 + extra_len;
}
/** Number of columns in the table */
uint64_t ncolumns = leint_consume(&ptr);
/** If the full row image is used, all columns are present. Currently only the
* full row image is supported and thus the bitfield should be all ones. In the
* future, partial row images could be supported by honoring the bitfield of
* columns that are present in this event. */
const int coldata_size = (ncolumns + 7) / 8;
uint8_t col_present[coldata_size];
memcpy(&col_present, ptr, coldata_size);
ptr += coldata_size;
/** Update events have the before and after images of the row. This can be
* used to calculate a "delta" of sorts if necessary. Currently we store
* both the before and the after images. */
uint8_t col_update[coldata_size];
if (hdr->event_type == UPDATE_ROWS_EVENTv1 ||
hdr->event_type == UPDATE_ROWS_EVENTv2)
{
memcpy(&col_update, ptr, coldata_size);
ptr += coldata_size;
}
/** There should always be a table map event prior to a row event.
* TODO: Make the active_maps dynamic */
TABLE_MAP *map = router->active_maps[table_id % MAX_MAPPED_TABLES];
ss_dassert(map);
if (map)
{
char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2];
snprintf(table_ident, sizeof(table_ident), "%s.%s", map->database, map->table);
AVRO_TABLE* table = hashtable_fetch(router->open_tables, table_ident);
TABLE_CREATE* create = map->table_create;
if (table && create && ncolumns == map->columns)
{
avro_value_t record;
avro_generic_value_new(table->avro_writer_iface, &record);
/** Each event has one or more rows in it. The number of rows is not known
* beforehand so we must continue processing them until we reach the end
* of the event. */
int rows = 0;
while (ptr - start < hdr->event_size - BINLOG_EVENT_HDR_LEN)
{
/** Add the current GTID and timestamp */
int event_type = get_event_type(hdr->event_type);
prepare_record(router, hdr, event_type, &record);
ptr = process_row_event_data(map, create, &record, ptr, col_present);
avro_file_writer_append_value(table->avro_file, &record);
/** Update rows events have the before and after images of the
* affected rows so we'll process them as another record with
* a different type */
if (event_type == UPDATE_EVENT)
{
prepare_record(router, hdr, UPDATE_EVENT_AFTER, &record);
ptr = process_row_event_data(map, create, &record, ptr, col_present);
avro_file_writer_append_value(table->avro_file, &record);
}
rows++;
}
add_used_table(router, table_ident);
avro_value_decref(&record);
rval = true;
}
else if (table == NULL)
{
MXS_ERROR("Avro file handle was not found for table %s.%s. See earlier"
" errors for more details.", map->database, map->table);
}
else if (create == NULL)
{
MXS_ERROR("Create table statement for %s.%s was not found from the "
"binary logs or the stored schema was not correct.",
map->database, map->table);
}
else
{
MXS_ERROR("Row event and table map event have different column counts."
" Only full row image is currently supported.");
}
}
else
{
MXS_ERROR("Row event for unknown table mapped to ID %lu. Data will not "
"be processed.", table_id);
}
return rval;
}
/**
* @brief Unpack numeric types
*
* Convert the stored value into an Avro value and pack it in the record.
*
* @param field Avro value in a record
* @param type Type of the field
* @param metadata Field metadata
* @param value Pointer to the start of the in-memory representation of the data
*/
void set_numeric_field_value(avro_value_t *field, uint8_t type, uint8_t *metadata, uint8_t *value)
{
int64_t i = 0;
switch (type)
{
case TABLE_COL_TYPE_TINY:
i = *value;
avro_value_set_int(field, i);
break;
case TABLE_COL_TYPE_SHORT:
memcpy(&i, value, 2);
avro_value_set_int(field, i);
break;
case TABLE_COL_TYPE_INT24:
memcpy(&i, value, 3);
avro_value_set_int(field, i);
break;
case TABLE_COL_TYPE_LONG:
memcpy(&i, value, 4);
avro_value_set_int(field, i);
break;
case TABLE_COL_TYPE_LONGLONG:
memcpy(&i, value, 8);
avro_value_set_int(field, i);
break;
case TABLE_COL_TYPE_FLOAT:
{
float f = 0;
memcpy(&f, value, 4);
avro_value_set_float(field, f);
break;
}
case TABLE_COL_TYPE_DOUBLE:
{
double d = 0;
memcpy(&d, value, 8);
avro_value_set_double(field, d);
break;
}
default:
break;
}
}
/**
* @brief Check if a bit is set
*
* @param ptr Pointer to start of bitfield
* @param columns Number of columns (bits)
* @param current_column Zero indexed column number
* @return True if the bit is set
*/
static bool bit_is_set(uint8_t *ptr, int columns, int current_column)
{
if (current_column >= 8)
{
ptr += current_column / 8;
current_column = current_column % 8;
}
return ((*ptr) & (1 << current_column));
}
/**
* @brief Get the length of the metadata for a particular field
*
* @param type Type of the field
* @return Length of the metadata for this field
*/
int get_metadata_len(uint8_t type)
{
switch (type)
{
case TABLE_COL_TYPE_STRING:
case TABLE_COL_TYPE_VAR_STRING:
case TABLE_COL_TYPE_VARCHAR:
case TABLE_COL_TYPE_DECIMAL:
case TABLE_COL_TYPE_NEWDECIMAL:
case TABLE_COL_TYPE_ENUM:
case TABLE_COL_TYPE_SET:
case TABLE_COL_TYPE_BIT:
return 2;
case TABLE_COL_TYPE_BLOB:
case TABLE_COL_TYPE_FLOAT:
case TABLE_COL_TYPE_DOUBLE:
case TABLE_COL_TYPE_DATETIME2:
case TABLE_COL_TYPE_TIMESTAMP2:
case TABLE_COL_TYPE_TIME2:
return 1;
default:
return 0;
}
}
/**
* @brief Extract the values from a single row in a row event
*
* @param map Table map event associated with this row
* @param create Table creation associated with this row
* @param record Avro record used for storing this row
* @param ptr Pointer to the start of the row data, should be after the row event header
* @param columns_present The bitfield holding the columns that are present for
* this row event. Currently this should be a bitfield which has all bits set.
* @return Pointer to the first byte after the current row event
*/
uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value_t *record,
uint8_t *ptr, uint8_t *columns_present)
{
int npresent = 0;
avro_value_t field;
long ncolumns = map->columns;
uint8_t *metadata = map->column_metadata;
size_t metadata_offset = 0;
/** BIT type values use the extra bits in the row event header */
int extra_bits = (((ncolumns + 7) / 8) * 8) - ncolumns;
/** Store the null value bitmap */
uint8_t *null_bitmap = ptr;
ptr += (ncolumns + 7) / 8;
for (long i = 0; i < map->columns && npresent < ncolumns; i++)
{
ss_dassert(create->columns == map->columns);
avro_value_get_by_name(record, create->column_names[i], &field, NULL);
if (bit_is_set(columns_present, ncolumns, i))
{
npresent++;
if (bit_is_set(null_bitmap, ncolumns, i))
{
avro_value_set_null(&field);
}
else if (column_is_fixed_string(map->column_types[i]))
{
/** ENUM and SET are stored as STRING types with the type stored
* in the metadata. */
if (fixed_string_is_enum(metadata[metadata_offset]))
{
uint8_t val[metadata[metadata_offset + 1]];
uint64_t bytes = unpack_enum(ptr, &metadata[metadata_offset], val);
char strval[32];
/** Right now only ENUMs/SETs with less than 256 values
* are printed correctly */
snprintf(strval, sizeof(strval), "%hhu", val[0]);
if (bytes > 1 && !warn_large_enumset)
{
warn_large_enumset = true;
MXS_WARNING("ENUM/SET types with more than 255 values are not supported.");
}
avro_value_set_string(&field, strval);
ptr += bytes;
}
else
{
uint8_t bytes = *ptr;
char str[bytes + 1];
memcpy(str, ptr + 1, bytes);
str[bytes] = '\0';
avro_value_set_string(&field, str);
ptr += bytes + 1;
}
}
else if (column_is_bit(map->column_types[i]))
{
uint64_t value = 0;
int width = metadata[metadata_offset] + metadata[metadata_offset + 1] * 8;
int bits_in_nullmap = MXS_MIN(width, extra_bits);
extra_bits -= bits_in_nullmap;
width -= bits_in_nullmap;
size_t bytes = width / 8;
// TODO: extract the bytes
if (!warn_bit)
{
warn_bit = true;
MXS_WARNING("BIT is not currently supported, values are stored as 0.");
}
avro_value_set_int(&field, value);
ptr += bytes;
}
else if (column_is_decimal(map->column_types[i]))
{
const int dec_dig = 9;
int precision = metadata[metadata_offset];
int decimals = metadata[metadata_offset + 1];
int dig_bytes[] = {0, 1, 1, 2, 2, 3, 3, 4, 4, 4};
int ipart = precision - decimals;
int ipart1 = ipart / dec_dig;
int fpart1 = decimals / dec_dig;
int ipart2 = ipart - ipart1 * dec_dig;
int fpart2 = decimals - fpart1 * dec_dig;
int ibytes = ipart1 * 4 + dig_bytes[ipart2];
int fbytes = fpart1 * 4 + dig_bytes[fpart2];
ptr += ibytes + fbytes;
// TODO: Add support for DECIMAL
if (!warn_decimal)
{
warn_decimal = true;
MXS_WARNING("DECIMAL is not currently supported, values are stored as 0.");
}
avro_value_set_int(&field, 0);
}
else if (column_is_variable_string(map->column_types[i]))
{
size_t sz;
char *str = lestr_consume(&ptr, &sz);
char buf[sz + 1];
memcpy(buf, str, sz);
buf[sz] = '\0';
avro_value_set_string(&field, buf);
}
else if (column_is_blob(map->column_types[i]))
{
uint8_t bytes = metadata[metadata_offset];
uint64_t len = 0;
memcpy(&len, ptr, bytes);
ptr += bytes;
avro_value_set_bytes(&field, ptr, len);
ptr += len;
}
else if (column_is_temporal(map->column_types[i]))
{
char buf[80];
struct tm tm;
ptr += unpack_temporal_value(map->column_types[i], ptr, &metadata[metadata_offset], &tm);
format_temporal_value(buf, sizeof(buf), map->column_types[i], &tm);
avro_value_set_string(&field, buf);
}
/** All numeric types (INT, LONG, FLOAT etc.) */
else
{
uint8_t lval[16];
memset(lval, 0, sizeof(lval));
ptr += unpack_numeric_field(ptr, map->column_types[i],
&metadata[metadata_offset], lval);
set_numeric_field_value(&field, map->column_types[i], &metadata[metadata_offset], lval);
}
ss_dassert(metadata_offset <= map->column_metadata_size);
metadata_offset += get_metadata_len(map->column_types[i]);
}
}
return ptr;
}
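
process_row_event_data() above consumes two bitmaps per row: the columns-present bitmap passed down from handle_row_event() and the NULL bitmap at the start of the row data. Both use one bit per column, least significant bit first, packed into (ncolumns + 7) / 8 bytes, which is what bit_is_set() decodes. The standalone program below is an illustration only (not part of this commit); it uses a hypothetical 10-column table to show how the two bitmaps are read, with bit_is_set() copied from the handler above.

#include <stdio.h>
#include <stdint.h>
#include <stdbool.h>

/* Same logic as bit_is_set() in the row event handler above */
static bool bit_is_set(uint8_t *ptr, int columns, int current_column)
{
    if (current_column >= 8)
    {
        ptr += current_column / 8;
        current_column = current_column % 8;
    }

    return ((*ptr) & (1 << current_column));
}

int main(void)
{
    const int ncolumns = 10;              /* hypothetical table with 10 columns */
    uint8_t col_present[] = {0xff, 0x03}; /* all 10 columns present (full row image) */
    uint8_t null_bitmap[] = {0x04, 0x02}; /* columns 2 and 9 are NULL */

    for (int i = 0; i < ncolumns; i++)
    {
        printf("column %d: %s%s\n", i,
               bit_is_set(col_present, ncolumns, i) ? "present" : "absent",
               bit_is_set(null_bitmap, ncolumns, i) ? ", NULL" : "");
    }

    return 0;
}

Running it prints every column as present, with columns 2 and 9 additionally flagged as NULL, which mirrors how the handler chooses between avro_value_set_null() and unpacking the stored value.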

File diff suppressed because it is too large


@@ -0,0 +1,319 @@
#pragma once
#ifndef _MXS_AVRO_H
#define _MXS_AVRO_H
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
/**
* MaxScale AVRO router
*
*/
#include <maxscale/cdefs.h>
#include <stdbool.h>
#include <stdint.h>
#include <blr_constants.h>
#include <maxscale/dcb.h>
#include <maxscale/service.h>
#include <maxscale/spinlock.h>
#include <maxscale/mysql_binlog.h>
#include <maxscale/users.h>
#include <avro.h>
#include <cdc.h>
#include <maxscale/pcre2.h>
#include <maxavro.h>
#include <binlog_common.h>
#include <sqlite3.h>
#include <maxscale/protocol/mysql.h>
MXS_BEGIN_DECLS
/** SQLite3 version 3.7.14 introduced the new v2 close interface */
#if SQLITE_VERSION_NUMBER < 3007014
#define sqlite3_close_v2 sqlite3_close
#endif
/**
* How often to call the router status function (seconds)
*/
#define AVRO_STATS_FREQ 60
#define AVRO_NSTATS_MINUTES 30
/**
* Avro block grouping defaults
*/
#define AVRO_DEFAULT_BLOCK_TRX_COUNT 1
#define AVRO_DEFAULT_BLOCK_ROW_COUNT 1000
#define MAX_MAPPED_TABLES 1024
#define GTID_TABLE_NAME "gtid"
#define USED_TABLES_TABLE_NAME "used_tables"
#define MEMORY_DATABASE_NAME "memory"
#define MEMORY_TABLE_NAME MEMORY_DATABASE_NAME".mem_used_tables"
#define INDEX_TABLE_NAME "indexing_progress"
/** Name of the file where the binlog to Avro conversion progress is stored */
#define AVRO_PROGRESS_FILE "avro-conversion.ini"
/** Buffer limits */
#define AVRO_SQL_BUFFER_SIZE 2048
/** Avro filename maxlen */
#ifdef NAME_MAX
#define AVRO_MAX_FILENAME_LEN NAME_MAX
#else
#define AVRO_MAX_FILENAME_LEN 255
#endif
static char *avro_client_states[] = { "Unregistered", "Registered", "Processing", "Errored" };
static char *avro_client_client_mode[] = { "Catch-up", "Busy", "Wait_for_data" };
static const char *avro_domain = "domain";
static const char *avro_server_id = "server_id";
static const char *avro_sequence = "sequence";
static const char *avro_event_number = "event_number";
static const char *avro_event_type = "event_type";
static const char *avro_timestamp = "timestamp";
static char *avro_client_ouput[] = { "Undefined", "JSON", "Avro" };
/** How a binlog file is closed */
typedef enum avro_binlog_end
{
AVRO_OK = 0, /**< A newer binlog file exists with a rotate event to that file */
AVRO_LAST_FILE, /**< Last binlog which is closed */
AVRO_OPEN_TRANSACTION, /**< The binlog ends with an open transaction */
AVRO_BINLOG_ERROR /**< An error occurred while processing the binlog file */
} avro_binlog_end_t;
/** How many digits each table version number has (db.table.000001.avro) */
#define TABLE_MAP_VERSION_DIGITS 6
/** Maximum version number*/
#define TABLE_MAP_VERSION_MAX 999999
/** Maximum column name length */
#define TABLE_MAP_MAX_NAME_LEN 64
/** How many bytes each thread tries to send */
#define AVRO_DATA_BURST_SIZE (32 * 1024)
/** A CREATE TABLE abstraction */
typedef struct table_create
{
uint64_t columns;
char **column_names;
char *table;
char *database;
int version; /**< How many versions of this table have been used */
bool was_used; /**< Has this schema been persisted to disk */
} TABLE_CREATE;
/** A representation of a table map event read from a binary log. A table map
* maps a table to a unique ID which can be used to match row events to table map
* events. The table map event tells us how the table is laid out and gives us
* some meta information on the columns. */
typedef struct table_map
{
uint64_t id;
uint64_t columns;
uint16_t flags;
uint8_t *column_types;
uint8_t *null_bitmap;
uint8_t *column_metadata;
size_t column_metadata_size;
TABLE_CREATE *table_create; /*< The definition of the table */
int version;
char version_string[TABLE_MAP_VERSION_DIGITS + 1];
char *table;
char *database;
} TABLE_MAP;
/**
* The statistics for this AVRO router instance
*/
typedef struct
{
int n_clients; /*< Number of slave sessions created */
int n_reads; /*< Number of record reads */
uint64_t n_binlogs; /*< Number of binlog records from master */
uint64_t n_rotates; /*< Number of binlog rotate events */
int n_masterstarts; /*< Number of times connection restarted */
time_t lastReply;
uint64_t events[MAX_EVENT_TYPE_END + 1]; /*< Per event counters */
uint64_t lastsample;
int minno;
int minavgs[AVRO_NSTATS_MINUTES];
} AVRO_ROUTER_STATS;
/**
* Client statistics
*/
typedef struct
{
int n_events; /*< Number of events sent */
unsigned long n_bytes; /*< Number of bytes sent */
int n_requests; /*< Number of requests received */
int n_queries; /*< Number of queries */
int n_failed_read;
uint64_t lastsample;
int minno;
int minavgs[AVRO_NSTATS_MINUTES];
} AVRO_CLIENT_STATS;
typedef struct avro_table_t
{
char* filename; /*< Absolute filename */
char* json_schema; /*< JSON representation of the schema */
avro_file_writer_t avro_file; /*< Current Avro data file */
avro_value_iface_t *avro_writer_iface; /*< Avro C API writer interface */
avro_schema_t avro_schema; /*< Native Avro schema of the table */
} AVRO_TABLE;
/** Data format used when streaming data to the clients */
enum avro_data_format
{
AVRO_FORMAT_UNDEFINED,
AVRO_FORMAT_JSON,
AVRO_FORMAT_AVRO,
};
typedef struct gtid_pos
{
uint32_t timestamp; /*< GTID event timestamp */
uint64_t domain; /*< Replication domain */
uint64_t server_id; /*< Server ID */
uint64_t seq; /*< Sequence number */
uint64_t event_num; /*< Subsequence number, increases monotonically. This
* is an internal representation of the position of
* an event inside a GTID event and it is used to
* rebuild GTID events in the correct order. */
} gtid_pos_t;
/**
* The client structure used within this router.
* This represents the clients that are requesting AVRO files from MaxScale.
*/
typedef struct avro_client
{
#if defined(SS_DEBUG)
skygw_chk_t rses_chk_top;
#endif
DCB *dcb; /*< The client DCB */
int state; /*< The state of this client */
enum avro_data_format format; /*< Stream JSON or Avro data */
char *uuid; /*< Client UUID */
SPINLOCK catch_lock; /*< Event catchup lock */
SPINLOCK file_lock; /*< Protects rses_deleted */
struct avro_instance *router; /*< Pointer to the owning router */
struct avro_client *next;
MAXAVRO_FILE *file_handle; /*< Current open file handle */
uint64_t last_sent_pos; /*< The last record we sent */
AVRO_CLIENT_STATS stats; /*< Slave statistics */
time_t connect_time; /*< Connect time of slave */
MAXAVRO_FILE avro_file; /*< Avro file struct */
char avro_binfile[AVRO_MAX_FILENAME_LEN + 1];
bool requested_gtid; /*< Whether the client requested a specific GTID */
gtid_pos_t gtid; /*< Current/requested GTID */
gtid_pos_t gtid_start; /*< First sent GTID */
unsigned int cstate; /*< Catch up state */
sqlite3 *sqlite_handle;
#if defined(SS_DEBUG)
skygw_chk_t rses_chk_tail;
#endif
} AVRO_CLIENT;
/**
* The per instance data for the AVRO router.
*/
typedef struct avro_instance
{
SERVICE *service; /*< Pointer to the service using this router */
AVRO_CLIENT *clients; /*< Link list of all the CDC client connections */
SPINLOCK lock; /*< Spinlock for the instance data */
int initbinlog; /*< Initial binlog file number */
char *fileroot; /*< Root of binlog filename */
unsigned int state; /*< State of the AVRO router */
uint8_t lastEventReceived; /*< Last event received */
uint32_t lastEventTimestamp; /*< Timestamp from last event */
char *binlogdir; /*< The directory where the binlog files are stored */
char *avrodir; /*< The directory with the AVRO files */
char binlog_name[BINLOG_FNAMELEN + 1];
/*< Name of the current binlog file */
uint64_t binlog_position;
/*< last committed transaction position */
uint64_t current_pos;
/*< Current binlog position */
int binlog_fd; /*< File descriptor of the binlog file being read */
pcre2_code *create_table_re;
pcre2_code *alter_table_re;
uint8_t event_types;
uint8_t event_type_hdr_lens[MAX_EVENT_TYPE_END];
gtid_pos_t gtid;
TABLE_MAP *active_maps[MAX_MAPPED_TABLES];
HASHTABLE *table_maps;
HASHTABLE *open_tables;
HASHTABLE *created_tables;
sqlite3 *sqlite_handle;
char prevbinlog[BINLOG_FNAMELEN + 1];
int rotating; /*< Rotation in progress flag */
SPINLOCK fileslock; /*< Lock for the files queue above */
AVRO_ROUTER_STATS stats; /*< Statistics for this router */
int task_delay; /*< Delay in seconds until the next conversion takes place */
uint64_t trx_count; /*< Transactions processed */
uint64_t trx_target; /*< Minimum number of transactions that will trigger
* a flush of all tables */
uint64_t row_count; /*< Row events processed */
uint64_t row_target; /*< Minimum number of row events that will trigger
* a flush of all tables */
struct avro_instance *next;
} AVRO_INSTANCE;
extern void read_table_info(uint8_t *ptr, uint8_t post_header_len, uint64_t *table_id,
char* dest, size_t len);
extern TABLE_MAP *table_map_alloc(uint8_t *ptr, uint8_t hdr_len, TABLE_CREATE* create);
extern void table_map_free(TABLE_MAP *map);
extern TABLE_CREATE* table_create_alloc(const char* sql, const char* db);
extern void table_create_free(TABLE_CREATE* value);
extern bool table_create_save(TABLE_CREATE *create, const char *filename);
extern bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end);
extern void read_alter_identifier(const char *sql, const char *end, char *dest, int size);
extern int avro_client_handle_request(AVRO_INSTANCE *, AVRO_CLIENT *, GWBUF *);
extern void avro_client_rotate(AVRO_INSTANCE *router, AVRO_CLIENT *client, uint8_t *ptr);
extern bool avro_open_binlog(const char *binlogdir, const char *file, int *fd);
extern void avro_close_binlog(int fd);
extern avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router);
extern AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema);
extern void avro_table_free(AVRO_TABLE *table);
extern void avro_flush_all_tables(AVRO_INSTANCE *router);
extern char* json_new_schema_from_table(TABLE_MAP *map);
extern void save_avro_schema(const char *path, const char* schema, TABLE_MAP *map);
extern bool handle_table_map_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr);
extern bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr);
extern void table_map_remap(uint8_t *ptr, uint8_t hdr_len, TABLE_MAP *map);
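/**
 * Client states
 */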
#define AVRO_CLIENT_UNREGISTERED 0x0000
#define AVRO_CLIENT_REGISTERED 0x0001
#define AVRO_CLIENT_REQUEST_DATA 0x0002
#define AVRO_CLIENT_ERRORED 0x0003
#define AVRO_CLIENT_MAXSTATE 0x0003
/**
* Client catch-up status
*/
#define AVRO_CS_BUSY 0x0001
#define AVRO_WAIT_DATA 0x0002
MXS_END_DECLS
#endif
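
The gtid_pos_t structure above is printed throughout the router as domain-server_id-sequence (see the MXS_ERROR calls in the indexing code). The snippet below is a hedged sketch, not part of this header or the router's API, of how that textual form could be turned back into a gtid_pos_t; the function name parse_gtid_string is an assumption.

/* Hypothetical helper, not part of the router's API: fills a gtid_pos_t from
 * the "domain-server_id-sequence" form used in the router's log messages.
 * Assumes avrorouter.h is included for the gtid_pos_t definition. */
#include <inttypes.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

static bool parse_gtid_string(const char *str, gtid_pos_t *gtid)
{
    /* timestamp and event_num are internal counters and start from zero */
    memset(gtid, 0, sizeof(*gtid));

    return sscanf(str, "%" SCNu64 "-%" SCNu64 "-%" SCNu64,
                  &gtid->domain, &gtid->server_id, &gtid->seq) == 3;
}

With this, a request for GTID 0-1-123 would yield domain = 0, server_id = 1 and seq = 123, the same field order used by the insert_template query in the indexing code.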