Abstract all row event processing
The code that handles the Avro files is now fully abstracted behind the AvroConverter class that implements the RowEventHandler interface. The code still has some avro specific behavior in a few places (parsing of JSON files into TableCreate objects). This can be replaced, if needed, by querying the master server for the CREATE TABLE statements.
This commit is contained in:
parent
d094e93209
commit
7c18696608
@ -3,7 +3,8 @@ if(AVRO_FOUND AND JANSSON_FOUND)
|
||||
include_directories(${JANSSON_INCLUDE_DIR})
|
||||
|
||||
# The common avrorouter functionality
|
||||
add_library(avro-common SHARED avro.cc ../binlogrouter/binlog_common.cc avro_client.cc avro_schema.cc avro_rbr.cc avro_file.cc avro_index.cc)
|
||||
add_library(avro-common SHARED avro.cc ../binlogrouter/binlog_common.cc avro_client.cc
|
||||
avro_schema.cc avro_rbr.cc avro_file.cc avro_index.cc avro_converter.cc)
|
||||
set_target_properties(avro-common PROPERTIES VERSION "1.0.0")
|
||||
set_target_properties(avro-common PROPERTIES LINK_FLAGS -Wl,-z,defs)
|
||||
target_link_libraries(avro-common maxscale-common ${JANSSON_LIBRARIES} ${AVRO_LIBRARIES} maxavro lzma)
|
||||
|
@ -18,7 +18,6 @@
|
||||
|
||||
#include "avrorouter.hh"
|
||||
|
||||
#include <avro/errors.h>
|
||||
#include <ctype.h>
|
||||
#include <ini.h>
|
||||
#include <stdio.h>
|
||||
@ -47,6 +46,8 @@
|
||||
#include <maxscale/worker.hh>
|
||||
#include <binlog_common.h>
|
||||
|
||||
#include "avro_converter.hh"
|
||||
|
||||
using namespace maxscale;
|
||||
|
||||
#ifndef BINLOG_NAMEFMT
|
||||
@ -275,12 +276,16 @@ Avro::Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, sqlite3* handle, SERV
|
||||
trx_target(config_get_integer(params, "group_trx")),
|
||||
row_count(0),
|
||||
row_target(config_get_integer(params, "group_rows")),
|
||||
block_size(config_get_size(params, "block_size")),
|
||||
codec(static_cast<mxs_avro_codec_type>(config_get_enum(params, "codec", codec_values))),
|
||||
sqlite_handle(handle),
|
||||
task_handle(0),
|
||||
stats{0}
|
||||
{
|
||||
uint64_t block_size = config_get_size(params, "block_size");
|
||||
mxs_avro_codec_type codec = static_cast<mxs_avro_codec_type>(config_get_enum(params, "codec", codec_values));
|
||||
|
||||
// TODO: pass this as a parameter or something
|
||||
event_hander = new AvroConverter(avrodir, block_size, codec);
|
||||
|
||||
int pcreerr;
|
||||
size_t erroff;
|
||||
create_table_re = pcre2_compile((PCRE2_SPTR) create_table_regex, PCRE2_ZERO_TERMINATED,
|
||||
|
392
server/modules/routing/avrorouter/avro_converter.cc
Normal file
392
server/modules/routing/avrorouter/avro_converter.cc
Normal file
@ -0,0 +1,392 @@
|
||||
/*
|
||||
* Copyright (c) 2016 MariaDB Corporation Ab
|
||||
*
|
||||
* Use of this software is governed by the Business Source License included
|
||||
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
||||
*
|
||||
* Change Date: 2020-01-01
|
||||
*
|
||||
* On the date above, in accordance with the Business Source License, use
|
||||
* of this software will be governed by version 2 or later of the General
|
||||
* Public License.
|
||||
*/
|
||||
|
||||
#include "avro_converter.hh"
|
||||
|
||||
#include <limits.h>
|
||||
|
||||
#include <maxscale/alloc.h>
|
||||
#include <maxscale/debug.h>
|
||||
#include <maxscale/log_manager.h>
|
||||
|
||||
/**
|
||||
* @brief Allocate an Avro table
|
||||
*
|
||||
* Create an Aro table and prepare it for writing.
|
||||
* @param filepath Path to the created file
|
||||
* @param json_schema The schema of the table in JSON format
|
||||
*/
|
||||
AvroTable* avro_table_alloc(const char* filepath, const char* json_schema, const char *codec,
|
||||
size_t block_size)
|
||||
{
|
||||
avro_file_writer_t avro_file;
|
||||
avro_value_iface_t* avro_writer_iface;
|
||||
avro_schema_t avro_schema;
|
||||
|
||||
if (avro_schema_from_json_length(json_schema, strlen(json_schema),
|
||||
&avro_schema))
|
||||
{
|
||||
MXS_ERROR("Avro error: %s", avro_strerror());
|
||||
MXS_INFO("Avro schema: %s", json_schema);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int rc = 0;
|
||||
|
||||
if (access(filepath, F_OK) == 0)
|
||||
{
|
||||
rc = avro_file_writer_open_bs(filepath, &avro_file, block_size);
|
||||
}
|
||||
else
|
||||
{
|
||||
rc = avro_file_writer_create_with_codec(filepath, avro_schema,
|
||||
&avro_file, codec, block_size);
|
||||
}
|
||||
|
||||
if (rc)
|
||||
{
|
||||
MXS_ERROR("Avro error: %s", avro_strerror());
|
||||
avro_schema_decref(avro_schema);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if ((avro_writer_iface = avro_generic_class_from_schema(avro_schema)) == NULL)
|
||||
{
|
||||
MXS_ERROR("Avro error: %s", avro_strerror());
|
||||
avro_schema_decref(avro_schema);
|
||||
avro_file_writer_close(avro_file);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
AvroTable* table = new (std::nothrow) AvroTable(avro_file, avro_writer_iface, avro_schema);
|
||||
|
||||
if (!table)
|
||||
{
|
||||
avro_file_writer_close(avro_file);
|
||||
avro_value_iface_decref(avro_writer_iface);
|
||||
avro_schema_decref(avro_schema);
|
||||
MXS_OOM();
|
||||
}
|
||||
|
||||
return table;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Convert the MySQL column type to a compatible Avro type
|
||||
*
|
||||
* Some fields are larger than they need to be but since the Avro integer
|
||||
* compression is quite efficient, the real loss in performance is negligible.
|
||||
* @param type MySQL column type
|
||||
* @return String representation of the Avro type
|
||||
*/
|
||||
static const char* column_type_to_avro_type(uint8_t type)
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
case TABLE_COL_TYPE_TINY:
|
||||
case TABLE_COL_TYPE_SHORT:
|
||||
case TABLE_COL_TYPE_LONG:
|
||||
case TABLE_COL_TYPE_INT24:
|
||||
case TABLE_COL_TYPE_BIT:
|
||||
return "int";
|
||||
|
||||
case TABLE_COL_TYPE_FLOAT:
|
||||
return "float";
|
||||
|
||||
case TABLE_COL_TYPE_DOUBLE:
|
||||
case TABLE_COL_TYPE_NEWDECIMAL:
|
||||
return "double";
|
||||
|
||||
case TABLE_COL_TYPE_NULL:
|
||||
return "null";
|
||||
|
||||
case TABLE_COL_TYPE_LONGLONG:
|
||||
return "long";
|
||||
|
||||
case TABLE_COL_TYPE_TINY_BLOB:
|
||||
case TABLE_COL_TYPE_MEDIUM_BLOB:
|
||||
case TABLE_COL_TYPE_LONG_BLOB:
|
||||
case TABLE_COL_TYPE_BLOB:
|
||||
return "bytes";
|
||||
|
||||
default:
|
||||
return "string";
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Create a new JSON Avro schema from the table map and create table abstractions
|
||||
*
|
||||
* The schema will always have a GTID field and all records contain the current
|
||||
* GTID of the transaction.
|
||||
* @param map TABLE_MAP for this table
|
||||
* @param create The TABLE_CREATE for this table
|
||||
* @return New schema or NULL if an error occurred
|
||||
*/
|
||||
char* json_new_schema_from_table(const STableMapEvent& map, const STableCreateEvent& create)
|
||||
{
|
||||
if (map->version != create->version)
|
||||
{
|
||||
MXS_ERROR("Version mismatch for table %s.%s. Table map version is %d and "
|
||||
"the table definition version is %d.", map->database.c_str(),
|
||||
map->table.c_str(), map->version, create->version);
|
||||
ss_dassert(!true); // Should not happen
|
||||
return NULL;
|
||||
}
|
||||
|
||||
json_error_t err;
|
||||
memset(&err, 0, sizeof(err));
|
||||
json_t *schema = json_object();
|
||||
json_object_set_new(schema, "namespace", json_string("MaxScaleChangeDataSchema.avro"));
|
||||
json_object_set_new(schema, "type", json_string("record"));
|
||||
json_object_set_new(schema, "name", json_string("ChangeRecord"));
|
||||
|
||||
json_t *array = json_array();
|
||||
json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name",
|
||||
avro_domain, "type", "int"));
|
||||
json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name",
|
||||
avro_server_id, "type", "int"));
|
||||
json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name",
|
||||
avro_sequence, "type", "int"));
|
||||
json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name",
|
||||
avro_event_number, "type", "int"));
|
||||
json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name",
|
||||
avro_timestamp, "type", "int"));
|
||||
|
||||
/** Enums and other complex types are defined with complete JSON objects
|
||||
* instead of string values */
|
||||
json_t *event_types = json_pack_ex(&err, 0, "{s:s, s:s, s:[s,s,s,s]}", "type", "enum",
|
||||
"name", "EVENT_TYPES", "symbols", "insert",
|
||||
"update_before", "update_after", "delete");
|
||||
|
||||
// Ownership of `event_types` is stolen when using the `o` format
|
||||
json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:o}", "name", avro_event_type,
|
||||
"type", event_types));
|
||||
|
||||
for (uint64_t i = 0; i < map->columns() && i < create->columns.size(); i++)
|
||||
{
|
||||
json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s, s:s, s:i}",
|
||||
"name", create->columns[i].name.c_str(),
|
||||
"type", column_type_to_avro_type(map->column_types[i]),
|
||||
"real_type", create->columns[i].type.c_str(),
|
||||
"length", create->columns[i].length));
|
||||
}
|
||||
json_object_set_new(schema, "fields", array);
|
||||
char* rval = json_dumps(schema, JSON_PRESERVE_ORDER);
|
||||
json_decref(schema);
|
||||
return rval;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Save the Avro schema of a table to disk
|
||||
*
|
||||
* @param path Schema directory
|
||||
* @param schema Schema in JSON format
|
||||
* @param map Table map that @p schema represents
|
||||
*/
|
||||
void save_avro_schema(const char *path, const char* schema, const STableMapEvent& map,
|
||||
const STableCreateEvent& create)
|
||||
{
|
||||
char filepath[PATH_MAX];
|
||||
snprintf(filepath, sizeof(filepath), "%s/%s.%s.%06d.avsc", path,
|
||||
map->database.c_str(), map->table.c_str(), map->version);
|
||||
|
||||
if (access(filepath, F_OK) != 0)
|
||||
{
|
||||
if (!create->was_used)
|
||||
{
|
||||
FILE *file = fopen(filepath, "wb");
|
||||
if (file)
|
||||
{
|
||||
fprintf(file, "%s\n", schema);
|
||||
fclose(file);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static const char* codec_to_string(enum mxs_avro_codec_type type)
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
case MXS_AVRO_CODEC_NULL:
|
||||
return "null";
|
||||
case MXS_AVRO_CODEC_DEFLATE:
|
||||
return "deflate";
|
||||
case MXS_AVRO_CODEC_SNAPPY:
|
||||
return "snappy";
|
||||
default:
|
||||
ss_dassert(false);
|
||||
return "null";
|
||||
}
|
||||
}
|
||||
|
||||
AvroConverter::AvroConverter(std::string avrodir, uint64_t block_size, mxs_avro_codec_type codec):
|
||||
m_avrodir(avrodir),
|
||||
m_block_size(block_size),
|
||||
m_codec(codec)
|
||||
{
|
||||
}
|
||||
|
||||
bool AvroConverter::open_table(const STableMapEvent& map, const STableCreateEvent& create)
|
||||
{
|
||||
bool rval = false;
|
||||
char* json_schema = json_new_schema_from_table(map, create);
|
||||
|
||||
if (json_schema)
|
||||
{
|
||||
char filepath[PATH_MAX + 1];
|
||||
snprintf(filepath, sizeof(filepath), "%s/%s.%s.%06d.avro", m_avrodir.c_str(),
|
||||
map->database.c_str(), map->table.c_str(), map->version);
|
||||
|
||||
SAvroTable avro_table(avro_table_alloc(filepath, json_schema,
|
||||
codec_to_string(m_codec),
|
||||
m_block_size));
|
||||
|
||||
if (avro_table)
|
||||
{
|
||||
m_open_tables[map->database + "." + map->table] = avro_table;
|
||||
save_avro_schema(m_avrodir.c_str(), json_schema, map, create);
|
||||
m_map = map;
|
||||
m_create = create;
|
||||
rval = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("Failed to open new Avro file for writing.");
|
||||
}
|
||||
MXS_FREE(json_schema);
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("Failed to create JSON schema.");
|
||||
}
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
bool AvroConverter::prepare_table(std::string database, std::string table)
|
||||
{
|
||||
bool rval = false;
|
||||
auto it = m_open_tables.find(database + "." + table);
|
||||
|
||||
if (it != m_open_tables.end())
|
||||
{
|
||||
m_writer_iface = it->second->avro_writer_iface;
|
||||
m_avro_file = &it->second->avro_file;
|
||||
rval = true;
|
||||
}
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
void AvroConverter::flush_tables()
|
||||
{
|
||||
for (auto it = m_open_tables.begin(); it != m_open_tables.end(); it++)
|
||||
{
|
||||
avro_file_writer_flush(it->second->avro_file);
|
||||
}
|
||||
}
|
||||
|
||||
void AvroConverter::prepare_row(const gtid_pos_t& gtid, const REP_HEADER& hdr, int event_type)
|
||||
{
|
||||
avro_generic_value_new(m_writer_iface, &m_record);
|
||||
avro_value_get_by_name(&m_record, avro_domain, &m_field, NULL);
|
||||
avro_value_set_int(&m_field, gtid.domain);
|
||||
|
||||
avro_value_get_by_name(&m_record, avro_server_id, &m_field, NULL);
|
||||
avro_value_set_int(&m_field, gtid.server_id);
|
||||
|
||||
avro_value_get_by_name(&m_record, avro_sequence, &m_field, NULL);
|
||||
avro_value_set_int(&m_field, gtid.seq);
|
||||
|
||||
avro_value_get_by_name(&m_record, avro_event_number, &m_field, NULL);
|
||||
avro_value_set_int(&m_field, gtid.event_num);
|
||||
|
||||
avro_value_get_by_name(&m_record, avro_timestamp, &m_field, NULL);
|
||||
avro_value_set_int(&m_field, hdr.timestamp);
|
||||
|
||||
avro_value_get_by_name(&m_record, avro_event_type, &m_field, NULL);
|
||||
avro_value_set_enum(&m_field, event_type);
|
||||
}
|
||||
|
||||
bool AvroConverter::commit(const gtid_pos_t& gtid)
|
||||
{
|
||||
bool rval = true;
|
||||
|
||||
if (avro_file_writer_append_value(*m_avro_file, &m_record))
|
||||
{
|
||||
MXS_ERROR("Failed to write value: %s", avro_strerror());
|
||||
rval = false;
|
||||
}
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
void AvroConverter::column(int i, int32_t value)
|
||||
{
|
||||
set_active(i);
|
||||
avro_value_set_int(&m_field, value);
|
||||
}
|
||||
|
||||
void AvroConverter::column(int i, int64_t value)
|
||||
{
|
||||
set_active(i);
|
||||
avro_value_set_long(&m_field, value);
|
||||
}
|
||||
|
||||
void AvroConverter::column(int i, float value)
|
||||
{
|
||||
set_active(i);
|
||||
avro_value_set_float(&m_field, value);
|
||||
}
|
||||
|
||||
void AvroConverter::column(int i, double value)
|
||||
{
|
||||
set_active(i);
|
||||
avro_value_set_double(&m_field, value);
|
||||
}
|
||||
|
||||
void AvroConverter::column(int i, std::string value)
|
||||
{
|
||||
set_active(i);
|
||||
avro_value_set_string(&m_field, value.c_str());
|
||||
}
|
||||
|
||||
void AvroConverter::column(int i, uint8_t* value, int len)
|
||||
{
|
||||
set_active(i);
|
||||
avro_value_set_bytes(&m_field, value, len);
|
||||
}
|
||||
|
||||
void AvroConverter::column(int i)
|
||||
{
|
||||
set_active(i);
|
||||
|
||||
if (column_is_blob(m_map->column_types[i]))
|
||||
{
|
||||
uint8_t nullvalue = 0;
|
||||
avro_value_set_bytes(&m_field, &nullvalue, 1);
|
||||
}
|
||||
else
|
||||
{
|
||||
avro_value_set_null(&m_field);
|
||||
}
|
||||
}
|
||||
|
||||
void AvroConverter::set_active(int i)
|
||||
{
|
||||
ss_debug(int rc =)avro_value_get_by_name(&m_record, m_create->columns[i].name.c_str(),
|
||||
&m_field, NULL);
|
||||
ss_dassert(rc == 0);
|
||||
}
|
77
server/modules/routing/avrorouter/avro_converter.hh
Normal file
77
server/modules/routing/avrorouter/avro_converter.hh
Normal file
@ -0,0 +1,77 @@
|
||||
#pragma once
|
||||
/*
|
||||
* Copyright (c) 2016 MariaDB Corporation Ab
|
||||
*
|
||||
* Use of this software is governed by the Business Source License included
|
||||
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
||||
*
|
||||
* Change Date: 2020-01-01
|
||||
*
|
||||
* On the date above, in accordance with the Business Source License, use
|
||||
* of this software will be governed by version 2 or later of the General
|
||||
* Public License.
|
||||
*/
|
||||
|
||||
#include "rpl_events.hh"
|
||||
#include "avrorouter.hh"
|
||||
|
||||
#include <avro.h>
|
||||
|
||||
struct AvroTable
|
||||
{
|
||||
AvroTable(avro_file_writer_t file, avro_value_iface_t* iface, avro_schema_t schema):
|
||||
avro_file(file),
|
||||
avro_writer_iface(iface),
|
||||
avro_schema(schema)
|
||||
{
|
||||
}
|
||||
|
||||
~AvroTable()
|
||||
{
|
||||
avro_file_writer_flush(avro_file);
|
||||
avro_file_writer_close(avro_file);
|
||||
avro_value_iface_decref(avro_writer_iface);
|
||||
avro_schema_decref(avro_schema);
|
||||
}
|
||||
|
||||
avro_file_writer_t avro_file; /*< Current Avro data file */
|
||||
avro_value_iface_t* avro_writer_iface; /*< Avro C API writer interface */
|
||||
avro_schema_t avro_schema; /*< Native Avro schema of the table */
|
||||
};
|
||||
|
||||
typedef std::tr1::shared_ptr<AvroTable> SAvroTable;
|
||||
typedef std::tr1::unordered_map<std::string, SAvroTable> AvroTables;
|
||||
|
||||
// Converts replicated events into CDC events
|
||||
class AvroConverter : public RowEventHandler
|
||||
{
|
||||
public:
|
||||
|
||||
AvroConverter(std::string avrodir, uint64_t block_size, mxs_avro_codec_type codec);
|
||||
bool open_table(const STableMapEvent& map, const STableCreateEvent& create);
|
||||
bool prepare_table(std::string database, std::string table);
|
||||
void flush_tables();
|
||||
void prepare_row(const gtid_pos_t& gtid, const REP_HEADER& hdr, int event_type);
|
||||
bool commit(const gtid_pos_t& gtid);
|
||||
void column(int i, int32_t value);
|
||||
void column(int i, int64_t value);
|
||||
void column(int i, float value);
|
||||
void column(int i, double value);
|
||||
void column(int i, std::string value);
|
||||
void column(int i, uint8_t* value, int len);
|
||||
void column(int i);
|
||||
|
||||
private:
|
||||
avro_value_iface_t* m_writer_iface;
|
||||
avro_file_writer_t* m_avro_file;
|
||||
avro_value_t m_record;
|
||||
avro_value_t m_field;
|
||||
std::string m_avrodir;
|
||||
AvroTables m_open_tables;
|
||||
uint64_t m_block_size;
|
||||
mxs_avro_codec_type m_codec;
|
||||
STableMapEvent m_map;
|
||||
STableCreateEvent m_create;
|
||||
|
||||
void set_active(int i);
|
||||
};
|
@ -91,68 +91,6 @@ void avro_close_binlog(int fd)
|
||||
close(fd);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Allocate an Avro table
|
||||
*
|
||||
* Create an Aro table and prepare it for writing.
|
||||
* @param filepath Path to the created file
|
||||
* @param json_schema The schema of the table in JSON format
|
||||
*/
|
||||
AvroTable* avro_table_alloc(const char* filepath, const char* json_schema, const char *codec,
|
||||
size_t block_size)
|
||||
{
|
||||
avro_file_writer_t avro_file;
|
||||
avro_value_iface_t* avro_writer_iface;
|
||||
avro_schema_t avro_schema;
|
||||
|
||||
if (avro_schema_from_json_length(json_schema, strlen(json_schema),
|
||||
&avro_schema))
|
||||
{
|
||||
MXS_ERROR("Avro error: %s", avro_strerror());
|
||||
MXS_INFO("Avro schema: %s", json_schema);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int rc = 0;
|
||||
|
||||
if (access(filepath, F_OK) == 0)
|
||||
{
|
||||
rc = avro_file_writer_open_bs(filepath, &avro_file, block_size);
|
||||
}
|
||||
else
|
||||
{
|
||||
rc = avro_file_writer_create_with_codec(filepath, avro_schema,
|
||||
&avro_file, codec, block_size);
|
||||
}
|
||||
|
||||
if (rc)
|
||||
{
|
||||
MXS_ERROR("Avro error: %s", avro_strerror());
|
||||
avro_schema_decref(avro_schema);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if ((avro_writer_iface = avro_generic_class_from_schema(avro_schema)) == NULL)
|
||||
{
|
||||
MXS_ERROR("Avro error: %s", avro_strerror());
|
||||
avro_schema_decref(avro_schema);
|
||||
avro_file_writer_close(avro_file);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
AvroTable* table = new (std::nothrow) AvroTable(avro_file, avro_writer_iface, avro_schema);
|
||||
|
||||
if (!table)
|
||||
{
|
||||
avro_file_writer_close(avro_file);
|
||||
avro_value_iface_decref(avro_writer_iface);
|
||||
avro_schema_decref(avro_schema);
|
||||
MXS_OOM();
|
||||
}
|
||||
|
||||
return table;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Write a new ini file with current conversion status
|
||||
*
|
||||
@ -448,7 +386,7 @@ void notify_all_clients(Avro *router)
|
||||
void do_checkpoint(Avro *router)
|
||||
{
|
||||
update_used_tables(router);
|
||||
avro_flush_all_tables(router, AVROROUTER_FLUSH);
|
||||
router->event_hander->flush_tables();
|
||||
avro_save_conversion_state(router);
|
||||
notify_all_clients(router);
|
||||
router->row_count = router->trx_count = 0;
|
||||
@ -585,13 +523,9 @@ void handle_one_event(Avro* router, uint8_t* ptr, REP_HEADER& hdr, uint64_t& pos
|
||||
}
|
||||
else if (hdr.event_type == MARIADB10_GTID_EVENT)
|
||||
{
|
||||
uint64_t n_sequence; /* 8 bytes */
|
||||
uint32_t domainid; /* 4 bytes */
|
||||
n_sequence = extract_field(ptr, 64);
|
||||
domainid = extract_field(ptr + 8, 32);
|
||||
router->gtid.domain = domainid;
|
||||
router->gtid.domain = extract_field(ptr + 8, 32);
|
||||
router->gtid.server_id = hdr.serverid;
|
||||
router->gtid.seq = n_sequence;
|
||||
router->gtid.seq = extract_field(ptr, 64);
|
||||
router->gtid.event_num = 0;
|
||||
router->gtid.timestamp = hdr.timestamp;
|
||||
}
|
||||
@ -788,26 +722,6 @@ void avro_load_metadata_from_schemas(Avro *router)
|
||||
globfree(&files);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Flush all Avro records to disk
|
||||
* @param router Avro router instance
|
||||
*/
|
||||
void avro_flush_all_tables(Avro *router, enum avrorouter_file_op flush)
|
||||
{
|
||||
for (auto it = router->open_tables.begin(); it != router->open_tables.end(); it++)
|
||||
{
|
||||
if (flush == AVROROUTER_FLUSH)
|
||||
{
|
||||
avro_file_writer_flush(it->second->avro_file);
|
||||
}
|
||||
else
|
||||
{
|
||||
ss_dassert(flush == AVROROUTER_SYNC);
|
||||
avro_file_writer_sync(it->second->avro_file);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Detection of table creation statements
|
||||
* @param router Avro router instance
|
||||
|
@ -13,7 +13,6 @@
|
||||
|
||||
#include "avrorouter.hh"
|
||||
|
||||
#include <avro/errors.h>
|
||||
#include <ctype.h>
|
||||
#include <ini.h>
|
||||
#include <stdio.h>
|
||||
@ -24,7 +23,6 @@
|
||||
#include <glob.h>
|
||||
#include <ini.h>
|
||||
#include <sys/stat.h>
|
||||
#include <avro/errors.h>
|
||||
#include <maxscale/alloc.h>
|
||||
#include <maxscale/atomic.h>
|
||||
#include <maxscale/dcb.h>
|
||||
@ -286,7 +284,7 @@ bool converter_func(Worker::Call::action_t action, Avro* router)
|
||||
/** We reached end of file, flush unwritten records to disk */
|
||||
if (progress)
|
||||
{
|
||||
avro_flush_all_tables(router, AVROROUTER_FLUSH);
|
||||
router->event_hander->flush_tables();
|
||||
avro_save_conversion_state(router);
|
||||
logged = false;
|
||||
}
|
||||
|
@ -33,122 +33,6 @@ static bool warn_large_enumset = false; /**< Remove when support for ENUM/SET va
|
||||
void notify_all_clients(Avro *router);
|
||||
void add_used_table(Avro* router, const char* table);
|
||||
|
||||
class AvroConverter : public RowEventHandler
|
||||
{
|
||||
public:
|
||||
|
||||
AvroConverter(const STableMapEvent& map, const STableCreateEvent& create, SAvroTable table):
|
||||
RowEventHandler(map, create),
|
||||
m_writer_iface(table->avro_writer_iface),
|
||||
m_avro_file(table->avro_file)
|
||||
{
|
||||
avro_generic_value_new(m_writer_iface, &m_record);
|
||||
}
|
||||
|
||||
~AvroConverter()
|
||||
{
|
||||
avro_value_decref(&m_record);
|
||||
}
|
||||
|
||||
void prepare(const gtid_pos_t& gtid, const REP_HEADER& hdr, int event_type)
|
||||
{
|
||||
avro_value_get_by_name(&m_record, avro_domain, &m_field, NULL);
|
||||
avro_value_set_int(&m_field, gtid.domain);
|
||||
|
||||
avro_value_get_by_name(&m_record, avro_server_id, &m_field, NULL);
|
||||
avro_value_set_int(&m_field, gtid.server_id);
|
||||
|
||||
avro_value_get_by_name(&m_record, avro_sequence, &m_field, NULL);
|
||||
avro_value_set_int(&m_field, gtid.seq);
|
||||
|
||||
avro_value_get_by_name(&m_record, avro_event_number, &m_field, NULL);
|
||||
avro_value_set_int(&m_field, gtid.event_num);
|
||||
|
||||
avro_value_get_by_name(&m_record, avro_timestamp, &m_field, NULL);
|
||||
avro_value_set_int(&m_field, hdr.timestamp);
|
||||
|
||||
avro_value_get_by_name(&m_record, avro_event_type, &m_field, NULL);
|
||||
avro_value_set_enum(&m_field, event_type);
|
||||
}
|
||||
|
||||
bool commit()
|
||||
{
|
||||
bool rval = true;
|
||||
|
||||
if (avro_file_writer_append_value(m_avro_file, &m_record))
|
||||
{
|
||||
MXS_ERROR("Failed to write value: %s", avro_strerror());
|
||||
rval = false;
|
||||
}
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
void column(int i, int32_t value)
|
||||
{
|
||||
set_active(i);
|
||||
avro_value_set_int(&m_field, value);
|
||||
}
|
||||
|
||||
void column(int i, int64_t value)
|
||||
{
|
||||
set_active(i);
|
||||
avro_value_set_long(&m_field, value);
|
||||
}
|
||||
|
||||
void column(int i, float value)
|
||||
{
|
||||
set_active(i);
|
||||
avro_value_set_float(&m_field, value);
|
||||
}
|
||||
|
||||
void column(int i, double value)
|
||||
{
|
||||
set_active(i);
|
||||
avro_value_set_double(&m_field, value);
|
||||
}
|
||||
|
||||
void column(int i, std::string value)
|
||||
{
|
||||
set_active(i);
|
||||
avro_value_set_string(&m_field, value.c_str());
|
||||
}
|
||||
|
||||
void column(int i, uint8_t* value, int len)
|
||||
{
|
||||
set_active(i);
|
||||
avro_value_set_bytes(&m_field, value, len);
|
||||
}
|
||||
|
||||
void column(int i)
|
||||
{
|
||||
set_active(i);
|
||||
|
||||
if (column_is_blob(m_map->column_types[i]))
|
||||
{
|
||||
uint8_t nullvalue = 0;
|
||||
avro_value_set_bytes(&m_field, &nullvalue, 1);
|
||||
}
|
||||
else
|
||||
{
|
||||
avro_value_set_null(&m_field);
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
avro_value_iface_t* m_writer_iface;
|
||||
avro_file_writer_t& m_avro_file;
|
||||
avro_value_t m_record;
|
||||
avro_value_t m_field;
|
||||
|
||||
void set_active(int i)
|
||||
{
|
||||
ss_debug(int rc =)avro_value_get_by_name(&m_record, m_create->columns[i].name.c_str(),
|
||||
&m_field, NULL);
|
||||
ss_dassert(rc == 0);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
uint8_t* process_row_event_data(STableMapEvent map, STableCreateEvent create,
|
||||
RowEventHandler* conv, uint8_t *ptr,
|
||||
@ -185,22 +69,6 @@ static int get_event_type(uint8_t event)
|
||||
}
|
||||
}
|
||||
|
||||
static const char* codec_to_string(enum mxs_avro_codec_type type)
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
case MXS_AVRO_CODEC_NULL:
|
||||
return "null";
|
||||
case MXS_AVRO_CODEC_DEFLATE:
|
||||
return "deflate";
|
||||
case MXS_AVRO_CODEC_SNAPPY:
|
||||
return "snappy";
|
||||
default:
|
||||
ss_dassert(false);
|
||||
return "null";
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Handle a table map event
|
||||
*
|
||||
@ -238,50 +106,28 @@ bool handle_table_map_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
|
||||
}
|
||||
}
|
||||
|
||||
char* json_schema = json_new_schema_from_table(map, create->second);
|
||||
|
||||
if (json_schema)
|
||||
if (router->event_hander->open_table(map, create->second))
|
||||
{
|
||||
char filepath[PATH_MAX + 1];
|
||||
snprintf(filepath, sizeof(filepath), "%s/%s.%06d.avro",
|
||||
router->avrodir.c_str(), table_ident, map->version);
|
||||
create->second->was_used = true;
|
||||
|
||||
SAvroTable avro_table(avro_table_alloc(filepath, json_schema,
|
||||
codec_to_string(router->codec),
|
||||
router->block_size));
|
||||
auto old = router->table_maps.find(table_ident);
|
||||
bool notify = old != router->table_maps.end();
|
||||
|
||||
if (avro_table)
|
||||
if (notify)
|
||||
{
|
||||
auto old = router->table_maps.find(table_ident);
|
||||
bool notify = old != router->table_maps.end();
|
||||
|
||||
if (notify)
|
||||
{
|
||||
router->active_maps.erase(old->second->id);
|
||||
}
|
||||
|
||||
router->table_maps[table_ident] = map;
|
||||
router->open_tables[table_ident] = avro_table;
|
||||
save_avro_schema(router->avrodir.c_str(), json_schema, map, create->second);
|
||||
router->active_maps[map->id] = map;
|
||||
ss_dassert(router->active_maps[id] == map);
|
||||
MXS_DEBUG("Table %s mapped to %lu", table_ident, map->id);
|
||||
rval = true;
|
||||
|
||||
if (notify)
|
||||
{
|
||||
notify_all_clients(router);
|
||||
}
|
||||
router->active_maps.erase(old->second->id);
|
||||
}
|
||||
else
|
||||
|
||||
router->table_maps[table_ident] = map;
|
||||
router->active_maps[map->id] = map;
|
||||
ss_dassert(router->active_maps[id] == map);
|
||||
MXS_DEBUG("Table %s mapped to %lu", table_ident, map->id);
|
||||
rval = true;
|
||||
|
||||
if (notify)
|
||||
{
|
||||
MXS_ERROR("Failed to open new Avro file for writing.");
|
||||
notify_all_clients(router);
|
||||
}
|
||||
MXS_FREE(json_schema);
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("Failed to create JSON schema.");
|
||||
}
|
||||
}
|
||||
else
|
||||
@ -378,21 +224,13 @@ bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
|
||||
STableMapEvent map = it->second;
|
||||
char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2];
|
||||
snprintf(table_ident, sizeof(table_ident), "%s.%s", map->database.c_str(), map->table.c_str());
|
||||
SAvroTable table;
|
||||
auto it = router->open_tables.find(table_ident);
|
||||
|
||||
if (it != router->open_tables.end())
|
||||
{
|
||||
table = it->second;
|
||||
}
|
||||
|
||||
bool ok = router->event_hander->prepare_table(map->database, map->table);
|
||||
auto create = router->created_tables.find(table_ident);
|
||||
|
||||
if (table && create != router->created_tables.end() &&
|
||||
if (ok && create != router->created_tables.end() &&
|
||||
ncolumns == map->columns() && create->second->columns.size() == map->columns())
|
||||
{
|
||||
AvroConverter conv(map, create->second, table);
|
||||
|
||||
/** Each event has one or more rows in it. The number of rows is not known
|
||||
* beforehand so we must continue processing them until we reach the end
|
||||
* of the event. */
|
||||
@ -407,18 +245,18 @@ bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
|
||||
// Increment the event count for this transaction
|
||||
router->gtid.event_num++;
|
||||
|
||||
conv.prepare(router->gtid, *hdr, event_type);
|
||||
ptr = process_row_event_data(map, create->second, &conv, ptr, col_present, end);
|
||||
conv.commit();
|
||||
router->event_hander->prepare_row(router->gtid, *hdr, event_type);
|
||||
ptr = process_row_event_data(map, create->second, router->event_hander, ptr, col_present, end);
|
||||
router->event_hander->commit(router->gtid);
|
||||
|
||||
/** Update rows events have the before and after images of the
|
||||
* affected rows so we'll process them as another record with
|
||||
* a different type */
|
||||
if (event_type == UPDATE_EVENT)
|
||||
{
|
||||
conv.prepare(router->gtid, *hdr, UPDATE_EVENT_AFTER);
|
||||
ptr = process_row_event_data(map, create->second, &conv, ptr, col_present, end);
|
||||
conv.commit();
|
||||
router->event_hander->prepare_row(router->gtid, *hdr, UPDATE_EVENT_AFTER);
|
||||
ptr = process_row_event_data(map, create->second, router->event_hander, ptr, col_present, end);
|
||||
router->event_hander->commit(router->gtid);
|
||||
}
|
||||
|
||||
rows++;
|
||||
@ -427,7 +265,7 @@ bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
|
||||
add_used_table(router, table_ident);
|
||||
rval = true;
|
||||
}
|
||||
else if (!table)
|
||||
else if (!ok)
|
||||
{
|
||||
MXS_ERROR("Avro file handle was not found for table %s.%s. See earlier"
|
||||
" errors for more details.", map->database.c_str(), map->table.c_str());
|
||||
|
@ -30,112 +30,6 @@
|
||||
#include <strings.h>
|
||||
#include <maxscale/alloc.h>
|
||||
|
||||
/**
|
||||
* @brief Convert the MySQL column type to a compatible Avro type
|
||||
*
|
||||
* Some fields are larger than they need to be but since the Avro integer
|
||||
* compression is quite efficient, the real loss in performance is negligible.
|
||||
* @param type MySQL column type
|
||||
* @return String representation of the Avro type
|
||||
*/
|
||||
static const char* column_type_to_avro_type(uint8_t type)
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
case TABLE_COL_TYPE_TINY:
|
||||
case TABLE_COL_TYPE_SHORT:
|
||||
case TABLE_COL_TYPE_LONG:
|
||||
case TABLE_COL_TYPE_INT24:
|
||||
case TABLE_COL_TYPE_BIT:
|
||||
return "int";
|
||||
|
||||
case TABLE_COL_TYPE_FLOAT:
|
||||
return "float";
|
||||
|
||||
case TABLE_COL_TYPE_DOUBLE:
|
||||
case TABLE_COL_TYPE_NEWDECIMAL:
|
||||
return "double";
|
||||
|
||||
case TABLE_COL_TYPE_NULL:
|
||||
return "null";
|
||||
|
||||
case TABLE_COL_TYPE_LONGLONG:
|
||||
return "long";
|
||||
|
||||
case TABLE_COL_TYPE_TINY_BLOB:
|
||||
case TABLE_COL_TYPE_MEDIUM_BLOB:
|
||||
case TABLE_COL_TYPE_LONG_BLOB:
|
||||
case TABLE_COL_TYPE_BLOB:
|
||||
return "bytes";
|
||||
|
||||
default:
|
||||
return "string";
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Create a new JSON Avro schema from the table map and create table abstractions
|
||||
*
|
||||
* The schema will always have a GTID field and all records contain the current
|
||||
* GTID of the transaction.
|
||||
* @param map TABLE_MAP for this table
|
||||
* @param create The TABLE_CREATE for this table
|
||||
* @return New schema or NULL if an error occurred
|
||||
*/
|
||||
char* json_new_schema_from_table(const STableMapEvent& map, const STableCreateEvent& create)
|
||||
{
|
||||
if (map->version != create->version)
|
||||
{
|
||||
MXS_ERROR("Version mismatch for table %s.%s. Table map version is %d and "
|
||||
"the table definition version is %d.", map->database.c_str(),
|
||||
map->table.c_str(), map->version, create->version);
|
||||
ss_dassert(!true); // Should not happen
|
||||
return NULL;
|
||||
}
|
||||
|
||||
json_error_t err;
|
||||
memset(&err, 0, sizeof(err));
|
||||
json_t *schema = json_object();
|
||||
json_object_set_new(schema, "namespace", json_string("MaxScaleChangeDataSchema.avro"));
|
||||
json_object_set_new(schema, "type", json_string("record"));
|
||||
json_object_set_new(schema, "name", json_string("ChangeRecord"));
|
||||
|
||||
json_t *array = json_array();
|
||||
json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name",
|
||||
avro_domain, "type", "int"));
|
||||
json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name",
|
||||
avro_server_id, "type", "int"));
|
||||
json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name",
|
||||
avro_sequence, "type", "int"));
|
||||
json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name",
|
||||
avro_event_number, "type", "int"));
|
||||
json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name",
|
||||
avro_timestamp, "type", "int"));
|
||||
|
||||
/** Enums and other complex types are defined with complete JSON objects
|
||||
* instead of string values */
|
||||
json_t *event_types = json_pack_ex(&err, 0, "{s:s, s:s, s:[s,s,s,s]}", "type", "enum",
|
||||
"name", "EVENT_TYPES", "symbols", "insert",
|
||||
"update_before", "update_after", "delete");
|
||||
|
||||
// Ownership of `event_types` is stolen when using the `o` format
|
||||
json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:o}", "name", avro_event_type,
|
||||
"type", event_types));
|
||||
|
||||
for (uint64_t i = 0; i < map->columns() && i < create->columns.size(); i++)
|
||||
{
|
||||
json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s, s:s, s:i}",
|
||||
"name", create->columns[i].name.c_str(),
|
||||
"type", column_type_to_avro_type(map->column_types[i]),
|
||||
"real_type", create->columns[i].type.c_str(),
|
||||
"length", create->columns[i].length));
|
||||
}
|
||||
json_object_set_new(schema, "fields", array);
|
||||
char* rval = json_dumps(schema, JSON_PRESERVE_ORDER);
|
||||
json_decref(schema);
|
||||
return rval;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Check whether the field is one that was generated by the avrorouter
|
||||
*
|
||||
@ -242,34 +136,6 @@ bool json_extract_field_names(const char* filename, std::vector<Column>& columns
|
||||
return rval;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Save the Avro schema of a table to disk
|
||||
*
|
||||
* @param path Schema directory
|
||||
* @param schema Schema in JSON format
|
||||
* @param map Table map that @p schema represents
|
||||
*/
|
||||
void save_avro_schema(const char *path, const char* schema, STableMapEvent& map, STableCreateEvent& create)
|
||||
{
|
||||
char filepath[PATH_MAX];
|
||||
snprintf(filepath, sizeof(filepath), "%s/%s.%s.%06d.avsc", path,
|
||||
map->database.c_str(), map->table.c_str(), map->version);
|
||||
|
||||
if (access(filepath, F_OK) != 0)
|
||||
{
|
||||
if (!create->was_used)
|
||||
{
|
||||
FILE *file = fopen(filepath, "wb");
|
||||
if (file)
|
||||
{
|
||||
fprintf(file, "%s\n", schema);
|
||||
create->was_used = true;
|
||||
fclose(file);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract the table definition from a CREATE TABLE statement
|
||||
* @param sql The SQL statement
|
||||
|
@ -19,14 +19,12 @@
|
||||
#include <stdint.h>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
#include <tr1/memory>
|
||||
#include <maxscale/alloc.h>
|
||||
#include <maxscale/dcb.h>
|
||||
#include <maxscale/service.h>
|
||||
#include <maxscale/spinlock.h>
|
||||
#include <maxscale/mysql_binlog.h>
|
||||
#include <maxscale/users.h>
|
||||
#include <avro.h>
|
||||
#include <cdc.h>
|
||||
#include <maxscale/pcre2.h>
|
||||
#include <maxavro.h>
|
||||
@ -123,28 +121,6 @@ typedef enum avro_binlog_end
|
||||
/** How many bytes each thread tries to send */
|
||||
#define AVRO_DATA_BURST_SIZE (32 * 1024)
|
||||
|
||||
struct AvroTable
|
||||
{
|
||||
AvroTable(avro_file_writer_t file, avro_value_iface_t* iface, avro_schema_t schema):
|
||||
avro_file(file),
|
||||
avro_writer_iface(iface),
|
||||
avro_schema(schema)
|
||||
{
|
||||
}
|
||||
|
||||
~AvroTable()
|
||||
{
|
||||
avro_file_writer_flush(avro_file);
|
||||
avro_file_writer_close(avro_file);
|
||||
avro_value_iface_decref(avro_writer_iface);
|
||||
avro_schema_decref(avro_schema);
|
||||
}
|
||||
|
||||
avro_file_writer_t avro_file; /*< Current Avro data file */
|
||||
avro_value_iface_t* avro_writer_iface; /*< Avro C API writer interface */
|
||||
avro_schema_t avro_schema; /*< Native Avro schema of the table */
|
||||
};
|
||||
|
||||
/** Data format used when streaming data to the clients */
|
||||
enum avro_data_format
|
||||
{
|
||||
@ -169,10 +145,7 @@ static const MXS_ENUM_VALUE codec_values[] =
|
||||
{NULL}
|
||||
};
|
||||
|
||||
typedef std::tr1::shared_ptr<AvroTable> SAvroTable;
|
||||
|
||||
typedef std::tr1::unordered_map<std::string, STableCreateEvent> CreatedTables;
|
||||
typedef std::tr1::unordered_map<std::string, SAvroTable> AvroTables;
|
||||
typedef std::tr1::unordered_map<std::string, STableMapEvent> MappedTables;
|
||||
typedef std::tr1::unordered_map<uint64_t, STableMapEvent> ActiveMaps;
|
||||
|
||||
@ -199,7 +172,6 @@ public:
|
||||
gtid_pos_t gtid;
|
||||
ActiveMaps active_maps;
|
||||
MappedTables table_maps;
|
||||
AvroTables open_tables;
|
||||
CreatedTables created_tables;
|
||||
uint64_t trx_count; /*< Transactions processed */
|
||||
uint64_t trx_target; /*< Minimum about of transactions that will trigger
|
||||
@ -207,10 +179,9 @@ public:
|
||||
uint64_t row_count; /*< Row events processed */
|
||||
uint64_t row_target; /*< Minimum about of row events that will trigger
|
||||
* a flush of all tables */
|
||||
uint64_t block_size; /**< Avro datablock size */
|
||||
enum mxs_avro_codec_type codec; /**< Avro codec type, defaults to `null` */
|
||||
sqlite3* sqlite_handle;
|
||||
uint32_t task_handle; /**< Delayed task handle */
|
||||
RowEventHandler* event_hander;
|
||||
|
||||
struct
|
||||
{
|
||||
@ -288,10 +259,7 @@ extern void avro_client_rotate(Avro *router, AvroSession *client, uint8_t *ptr);
|
||||
extern bool avro_open_binlog(const char *binlogdir, const char *file, int *fd);
|
||||
extern void avro_close_binlog(int fd);
|
||||
extern avro_binlog_end_t avro_read_all_events(Avro *router);
|
||||
extern AvroTable* avro_table_alloc(const char* filepath, const char* json_schema,
|
||||
const char *codec, size_t block_size);
|
||||
extern char* json_new_schema_from_table(const STableMapEvent& map, const STableCreateEvent& create);
|
||||
extern void save_avro_schema(const char *path, const char* schema, STableMapEvent& map, STableCreateEvent& create);
|
||||
extern bool handle_table_map_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr);
|
||||
extern bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr);
|
||||
void handle_one_event(Avro* router, uint8_t* ptr, REP_HEADER& hdr, uint64_t& pos);
|
||||
@ -299,20 +267,6 @@ REP_HEADER construct_header(uint8_t* ptr);
|
||||
bool avro_save_conversion_state(Avro *router);
|
||||
void avro_update_index(Avro* router);
|
||||
|
||||
enum avrorouter_file_op
|
||||
{
|
||||
AVROROUTER_SYNC,
|
||||
AVROROUTER_FLUSH
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Flush or sync all tables
|
||||
*
|
||||
* @param router Router instance
|
||||
* @param flush AVROROUTER_SYNC for sync only or AVROROUTER_FLUSH for full flush
|
||||
*/
|
||||
extern void avro_flush_all_tables(Avro *router, enum avrorouter_file_op flush);
|
||||
|
||||
#define AVRO_CLIENT_UNREGISTERED 0x0000
|
||||
#define AVRO_CLIENT_REGISTERED 0x0001
|
||||
#define AVRO_CLIENT_REQUEST_DATA 0x0002
|
||||
|
@ -15,6 +15,10 @@
|
||||
#include <vector>
|
||||
#include <cstdint>
|
||||
#include <string>
|
||||
#include <tr1/memory>
|
||||
#include <tr1/unordered_map>
|
||||
|
||||
#include <binlog_common.h>
|
||||
|
||||
typedef std::vector<uint8_t> Bytes;
|
||||
|
||||
@ -113,21 +117,32 @@ typedef std::tr1::shared_ptr<TableMapEvent> STableMapEvent;
|
||||
class RowEventHandler
|
||||
{
|
||||
public:
|
||||
RowEventHandler(const STableMapEvent& map, const STableCreateEvent& create):
|
||||
m_map(map),
|
||||
m_create(create)
|
||||
{
|
||||
}
|
||||
|
||||
virtual ~RowEventHandler()
|
||||
{
|
||||
}
|
||||
|
||||
// A table was opened
|
||||
virtual bool open_table(const STableMapEvent& map, const STableCreateEvent& create)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
// Prepare a new row for processing
|
||||
virtual void prepare(const gtid_pos_t& gtid, const REP_HEADER& hdr, int event_type) = 0;
|
||||
virtual bool prepare_table(std::string database, std::string table)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
// Flush open tables
|
||||
virtual void flush_tables()
|
||||
{
|
||||
}
|
||||
|
||||
// Prepare a new row for processing
|
||||
virtual void prepare_row(const gtid_pos_t& gtid, const REP_HEADER& hdr, int event_type) = 0;
|
||||
|
||||
// Called once all columns are processed
|
||||
virtual bool commit() = 0;
|
||||
virtual bool commit(const gtid_pos_t& gtid) = 0;
|
||||
|
||||
// 32-bit integer handler
|
||||
virtual void column(int i, int32_t value) = 0;
|
||||
@ -149,8 +164,4 @@ public:
|
||||
|
||||
// Empty (NULL) value type handler
|
||||
virtual void column(int i) = 0;
|
||||
|
||||
protected:
|
||||
const STableMapEvent& m_map; // The table map event for this row
|
||||
const STableCreateEvent& m_create; // The CREATE TABLE statement for this row
|
||||
};
|
||||
|
Loading…
x
Reference in New Issue
Block a user