Files
MaxScale/server/modules/routing/avrorouter/avro_converter.cc
Markus Mäkelä 7c18696608 Abstract all row event processing
The code that handles the Avro files is now fully abstracted behind the
AvroConverter class that implements the RowEventHandler interface.

The code still has some avro specific behavior in a few places (parsing of
JSON files into TableCreate objects). This can be replaced, if needed, by
querying the master server for the CREATE TABLE statements.
2018-06-08 12:18:14 +03:00

393 lines
12 KiB
C++

/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "avro_converter.hh"
#include <limits.h>
#include <maxscale/alloc.h>
#include <maxscale/debug.h>
#include <maxscale/log_manager.h>
/**
* @brief Allocate an Avro table
*
* Create an Aro table and prepare it for writing.
* @param filepath Path to the created file
* @param json_schema The schema of the table in JSON format
*/
AvroTable* avro_table_alloc(const char* filepath, const char* json_schema, const char *codec,
size_t block_size)
{
avro_file_writer_t avro_file;
avro_value_iface_t* avro_writer_iface;
avro_schema_t avro_schema;
if (avro_schema_from_json_length(json_schema, strlen(json_schema),
&avro_schema))
{
MXS_ERROR("Avro error: %s", avro_strerror());
MXS_INFO("Avro schema: %s", json_schema);
return NULL;
}
int rc = 0;
if (access(filepath, F_OK) == 0)
{
rc = avro_file_writer_open_bs(filepath, &avro_file, block_size);
}
else
{
rc = avro_file_writer_create_with_codec(filepath, avro_schema,
&avro_file, codec, block_size);
}
if (rc)
{
MXS_ERROR("Avro error: %s", avro_strerror());
avro_schema_decref(avro_schema);
return NULL;
}
if ((avro_writer_iface = avro_generic_class_from_schema(avro_schema)) == NULL)
{
MXS_ERROR("Avro error: %s", avro_strerror());
avro_schema_decref(avro_schema);
avro_file_writer_close(avro_file);
return NULL;
}
AvroTable* table = new (std::nothrow) AvroTable(avro_file, avro_writer_iface, avro_schema);
if (!table)
{
avro_file_writer_close(avro_file);
avro_value_iface_decref(avro_writer_iface);
avro_schema_decref(avro_schema);
MXS_OOM();
}
return table;
}
/**
* @brief Convert the MySQL column type to a compatible Avro type
*
* Some fields are larger than they need to be but since the Avro integer
* compression is quite efficient, the real loss in performance is negligible.
* @param type MySQL column type
* @return String representation of the Avro type
*/
static const char* column_type_to_avro_type(uint8_t type)
{
switch (type)
{
case TABLE_COL_TYPE_TINY:
case TABLE_COL_TYPE_SHORT:
case TABLE_COL_TYPE_LONG:
case TABLE_COL_TYPE_INT24:
case TABLE_COL_TYPE_BIT:
return "int";
case TABLE_COL_TYPE_FLOAT:
return "float";
case TABLE_COL_TYPE_DOUBLE:
case TABLE_COL_TYPE_NEWDECIMAL:
return "double";
case TABLE_COL_TYPE_NULL:
return "null";
case TABLE_COL_TYPE_LONGLONG:
return "long";
case TABLE_COL_TYPE_TINY_BLOB:
case TABLE_COL_TYPE_MEDIUM_BLOB:
case TABLE_COL_TYPE_LONG_BLOB:
case TABLE_COL_TYPE_BLOB:
return "bytes";
default:
return "string";
}
}
/**
* @brief Create a new JSON Avro schema from the table map and create table abstractions
*
* The schema will always have a GTID field and all records contain the current
* GTID of the transaction.
* @param map TABLE_MAP for this table
* @param create The TABLE_CREATE for this table
* @return New schema or NULL if an error occurred
*/
char* json_new_schema_from_table(const STableMapEvent& map, const STableCreateEvent& create)
{
if (map->version != create->version)
{
MXS_ERROR("Version mismatch for table %s.%s. Table map version is %d and "
"the table definition version is %d.", map->database.c_str(),
map->table.c_str(), map->version, create->version);
ss_dassert(!true); // Should not happen
return NULL;
}
json_error_t err;
memset(&err, 0, sizeof(err));
json_t *schema = json_object();
json_object_set_new(schema, "namespace", json_string("MaxScaleChangeDataSchema.avro"));
json_object_set_new(schema, "type", json_string("record"));
json_object_set_new(schema, "name", json_string("ChangeRecord"));
json_t *array = json_array();
json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name",
avro_domain, "type", "int"));
json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name",
avro_server_id, "type", "int"));
json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name",
avro_sequence, "type", "int"));
json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name",
avro_event_number, "type", "int"));
json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name",
avro_timestamp, "type", "int"));
/** Enums and other complex types are defined with complete JSON objects
* instead of string values */
json_t *event_types = json_pack_ex(&err, 0, "{s:s, s:s, s:[s,s,s,s]}", "type", "enum",
"name", "EVENT_TYPES", "symbols", "insert",
"update_before", "update_after", "delete");
// Ownership of `event_types` is stolen when using the `o` format
json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:o}", "name", avro_event_type,
"type", event_types));
for (uint64_t i = 0; i < map->columns() && i < create->columns.size(); i++)
{
json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s, s:s, s:i}",
"name", create->columns[i].name.c_str(),
"type", column_type_to_avro_type(map->column_types[i]),
"real_type", create->columns[i].type.c_str(),
"length", create->columns[i].length));
}
json_object_set_new(schema, "fields", array);
char* rval = json_dumps(schema, JSON_PRESERVE_ORDER);
json_decref(schema);
return rval;
}
/**
* @brief Save the Avro schema of a table to disk
*
* @param path Schema directory
* @param schema Schema in JSON format
* @param map Table map that @p schema represents
*/
void save_avro_schema(const char *path, const char* schema, const STableMapEvent& map,
const STableCreateEvent& create)
{
char filepath[PATH_MAX];
snprintf(filepath, sizeof(filepath), "%s/%s.%s.%06d.avsc", path,
map->database.c_str(), map->table.c_str(), map->version);
if (access(filepath, F_OK) != 0)
{
if (!create->was_used)
{
FILE *file = fopen(filepath, "wb");
if (file)
{
fprintf(file, "%s\n", schema);
fclose(file);
}
}
}
}
static const char* codec_to_string(enum mxs_avro_codec_type type)
{
switch (type)
{
case MXS_AVRO_CODEC_NULL:
return "null";
case MXS_AVRO_CODEC_DEFLATE:
return "deflate";
case MXS_AVRO_CODEC_SNAPPY:
return "snappy";
default:
ss_dassert(false);
return "null";
}
}
AvroConverter::AvroConverter(std::string avrodir, uint64_t block_size, mxs_avro_codec_type codec):
m_avrodir(avrodir),
m_block_size(block_size),
m_codec(codec)
{
}
bool AvroConverter::open_table(const STableMapEvent& map, const STableCreateEvent& create)
{
bool rval = false;
char* json_schema = json_new_schema_from_table(map, create);
if (json_schema)
{
char filepath[PATH_MAX + 1];
snprintf(filepath, sizeof(filepath), "%s/%s.%s.%06d.avro", m_avrodir.c_str(),
map->database.c_str(), map->table.c_str(), map->version);
SAvroTable avro_table(avro_table_alloc(filepath, json_schema,
codec_to_string(m_codec),
m_block_size));
if (avro_table)
{
m_open_tables[map->database + "." + map->table] = avro_table;
save_avro_schema(m_avrodir.c_str(), json_schema, map, create);
m_map = map;
m_create = create;
rval = true;
}
else
{
MXS_ERROR("Failed to open new Avro file for writing.");
}
MXS_FREE(json_schema);
}
else
{
MXS_ERROR("Failed to create JSON schema.");
}
return rval;
}
bool AvroConverter::prepare_table(std::string database, std::string table)
{
bool rval = false;
auto it = m_open_tables.find(database + "." + table);
if (it != m_open_tables.end())
{
m_writer_iface = it->second->avro_writer_iface;
m_avro_file = &it->second->avro_file;
rval = true;
}
return rval;
}
void AvroConverter::flush_tables()
{
for (auto it = m_open_tables.begin(); it != m_open_tables.end(); it++)
{
avro_file_writer_flush(it->second->avro_file);
}
}
void AvroConverter::prepare_row(const gtid_pos_t& gtid, const REP_HEADER& hdr, int event_type)
{
avro_generic_value_new(m_writer_iface, &m_record);
avro_value_get_by_name(&m_record, avro_domain, &m_field, NULL);
avro_value_set_int(&m_field, gtid.domain);
avro_value_get_by_name(&m_record, avro_server_id, &m_field, NULL);
avro_value_set_int(&m_field, gtid.server_id);
avro_value_get_by_name(&m_record, avro_sequence, &m_field, NULL);
avro_value_set_int(&m_field, gtid.seq);
avro_value_get_by_name(&m_record, avro_event_number, &m_field, NULL);
avro_value_set_int(&m_field, gtid.event_num);
avro_value_get_by_name(&m_record, avro_timestamp, &m_field, NULL);
avro_value_set_int(&m_field, hdr.timestamp);
avro_value_get_by_name(&m_record, avro_event_type, &m_field, NULL);
avro_value_set_enum(&m_field, event_type);
}
bool AvroConverter::commit(const gtid_pos_t& gtid)
{
bool rval = true;
if (avro_file_writer_append_value(*m_avro_file, &m_record))
{
MXS_ERROR("Failed to write value: %s", avro_strerror());
rval = false;
}
return rval;
}
void AvroConverter::column(int i, int32_t value)
{
set_active(i);
avro_value_set_int(&m_field, value);
}
void AvroConverter::column(int i, int64_t value)
{
set_active(i);
avro_value_set_long(&m_field, value);
}
void AvroConverter::column(int i, float value)
{
set_active(i);
avro_value_set_float(&m_field, value);
}
void AvroConverter::column(int i, double value)
{
set_active(i);
avro_value_set_double(&m_field, value);
}
void AvroConverter::column(int i, std::string value)
{
set_active(i);
avro_value_set_string(&m_field, value.c_str());
}
void AvroConverter::column(int i, uint8_t* value, int len)
{
set_active(i);
avro_value_set_bytes(&m_field, value, len);
}
void AvroConverter::column(int i)
{
set_active(i);
if (column_is_blob(m_map->column_types[i]))
{
uint8_t nullvalue = 0;
avro_value_set_bytes(&m_field, &nullvalue, 1);
}
else
{
avro_value_set_null(&m_field);
}
}
void AvroConverter::set_active(int i)
{
ss_debug(int rc =)avro_value_get_by_name(&m_record, m_create->columns[i].name.c_str(),
&m_field, NULL);
ss_dassert(rc == 0);
}