MXS-1881: Move RBR objects into a separate header
The RBR event handling related objects are now all in the rpl_events.hh header. The intention is to combine all replication processing related events used in the binlogrouter and avrorouter into this header to make them reusable. Also fixed the TableCreateEvent constructor to use an rvalue instead of stealing an lvalue.
This commit is contained in:
parent
e9dee55245
commit
8fab725413
@ -33,58 +33,12 @@ static bool warn_large_enumset = false; /**< Remove when support for ENUM/SET va
|
||||
void notify_all_clients(Avro *router);
|
||||
void add_used_table(Avro* router, const char* table);
|
||||
|
||||
|
||||
class EventConverter
|
||||
{
|
||||
public:
|
||||
EventConverter(const STableMapEvent& map, const STableCreateEvent& create):
|
||||
m_map(map),
|
||||
m_create(create)
|
||||
{
|
||||
}
|
||||
|
||||
virtual ~EventConverter()
|
||||
{
|
||||
}
|
||||
|
||||
// Prepare a new row for processing
|
||||
virtual void prepare(const gtid_pos_t& gtid, const REP_HEADER& hdr, int event_type) = 0;
|
||||
|
||||
// Called once all columns are processed
|
||||
virtual bool commit() = 0;
|
||||
|
||||
// 32-bit integer handler
|
||||
virtual void column(int i, int32_t value) = 0;
|
||||
|
||||
// 64-bit integer handler
|
||||
virtual void column(int i, int64_t value) = 0;
|
||||
|
||||
// Float handler
|
||||
virtual void column(int i, float value) = 0;
|
||||
|
||||
// Double handler
|
||||
virtual void column(int i, double value) = 0;
|
||||
|
||||
// String handler
|
||||
virtual void column(int i, std::string value) = 0;
|
||||
|
||||
// Bytes handler
|
||||
virtual void column(int i, uint8_t* value, int len) = 0;
|
||||
|
||||
// Empty (NULL) value type handler
|
||||
virtual void column(int i) = 0;
|
||||
|
||||
protected:
|
||||
const STableMapEvent& m_map;
|
||||
const STableCreateEvent& m_create;
|
||||
};
|
||||
|
||||
class AvroConverter : public EventConverter
|
||||
class AvroConverter : public RowEventHandler
|
||||
{
|
||||
public:
|
||||
|
||||
AvroConverter(const STableMapEvent& map, const STableCreateEvent& create, SAvroTable table):
|
||||
EventConverter(map, create),
|
||||
RowEventHandler(map, create),
|
||||
m_writer_iface(table->avro_writer_iface),
|
||||
m_avro_file(table->avro_file)
|
||||
{
|
||||
@ -117,8 +71,6 @@ public:
|
||||
avro_value_set_enum(&m_field, event_type);
|
||||
}
|
||||
|
||||
// Called once all columns are processed
|
||||
|
||||
bool commit()
|
||||
{
|
||||
bool rval = true;
|
||||
@ -132,56 +84,42 @@ public:
|
||||
return rval;
|
||||
}
|
||||
|
||||
// 32-bit integer handler
|
||||
|
||||
void column(int i, int32_t value)
|
||||
{
|
||||
set_active(i);
|
||||
avro_value_set_int(&m_field, value);
|
||||
}
|
||||
|
||||
// 64-bit integer handler
|
||||
|
||||
void column(int i, int64_t value)
|
||||
{
|
||||
set_active(i);
|
||||
avro_value_set_long(&m_field, value);
|
||||
}
|
||||
|
||||
// Float handler
|
||||
|
||||
void column(int i, float value)
|
||||
{
|
||||
set_active(i);
|
||||
avro_value_set_float(&m_field, value);
|
||||
}
|
||||
|
||||
// Double handler
|
||||
|
||||
void column(int i, double value)
|
||||
{
|
||||
set_active(i);
|
||||
avro_value_set_double(&m_field, value);
|
||||
}
|
||||
|
||||
// String handler
|
||||
|
||||
void column(int i, std::string value)
|
||||
{
|
||||
set_active(i);
|
||||
avro_value_set_string(&m_field, value.c_str());
|
||||
}
|
||||
|
||||
// Bytes handler
|
||||
|
||||
void column(int i, uint8_t* value, int len)
|
||||
{
|
||||
set_active(i);
|
||||
avro_value_set_bytes(&m_field, value, len);
|
||||
}
|
||||
|
||||
// Empty (NULL) value type handler
|
||||
|
||||
void column(int i)
|
||||
{
|
||||
set_active(i);
|
||||
@ -212,8 +150,9 @@ private:
|
||||
};
|
||||
|
||||
|
||||
uint8_t* process_row_event_data(STableMapEvent map, STableCreateEvent create, EventConverter* conv,
|
||||
uint8_t *ptr, uint8_t *columns_present, uint8_t *end);
|
||||
uint8_t* process_row_event_data(STableMapEvent map, STableCreateEvent create,
|
||||
RowEventHandler* conv, uint8_t *ptr,
|
||||
uint8_t *columns_present, uint8_t *end);
|
||||
|
||||
/**
|
||||
* @brief Get row event name
|
||||
@ -532,7 +471,7 @@ bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
|
||||
* @param metadata Field metadata
|
||||
* @param value Pointer to the start of the in-memory representation of the data
|
||||
*/
|
||||
void set_numeric_field_value(EventConverter* conv, int idx, uint8_t type,
|
||||
void set_numeric_field_value(RowEventHandler* conv, int idx, uint8_t type,
|
||||
uint8_t *metadata, uint8_t *value)
|
||||
{
|
||||
switch (type)
|
||||
@ -692,7 +631,7 @@ static bool all_fields_null(uint8_t* null_bitmap, int ncolumns)
|
||||
* this row event. Currently this should be a bitfield which has all bits set.
|
||||
* @return Pointer to the first byte after the current row event
|
||||
*/
|
||||
uint8_t* process_row_event_data(STableMapEvent map, STableCreateEvent create, EventConverter* conv,
|
||||
uint8_t* process_row_event_data(STableMapEvent map, STableCreateEvent create, RowEventHandler* conv,
|
||||
uint8_t *ptr, uint8_t *columns_present, uint8_t *end)
|
||||
{
|
||||
int npresent = 0;
|
||||
|
@ -618,7 +618,7 @@ TableCreateEvent* table_create_from_schema(const char* file, const char* db,
|
||||
|
||||
if (json_extract_field_names(file, columns))
|
||||
{
|
||||
newtable = new (std::nothrow)TableCreateEvent(db, table, version, columns);
|
||||
newtable = new (std::nothrow)TableCreateEvent(db, table, version, std::move(columns));
|
||||
}
|
||||
|
||||
return newtable;
|
||||
@ -672,7 +672,7 @@ TableCreateEvent* table_create_alloc(char* ident, const char* sql, int len)
|
||||
if (!columns.empty())
|
||||
{
|
||||
int version = resolve_table_version(database, table);
|
||||
rval = new (std::nothrow) TableCreateEvent(database, table, version, columns);
|
||||
rval = new (std::nothrow) TableCreateEvent(database, table, version, std::move(columns));
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -35,6 +35,8 @@
|
||||
#include <maxscale/sqlite3.h>
|
||||
#include <maxscale/protocol/mysql.h>
|
||||
|
||||
#include "rpl_events.hh"
|
||||
|
||||
MXS_BEGIN_DECLS
|
||||
|
||||
/**
|
||||
@ -121,75 +123,6 @@ typedef enum avro_binlog_end
|
||||
/** How many bytes each thread tries to send */
|
||||
#define AVRO_DATA_BURST_SIZE (32 * 1024)
|
||||
|
||||
/** A single column in a CREATE TABLE statement */
|
||||
struct Column
|
||||
{
|
||||
Column(std::string name, std::string type = "unknown", int length = -1):
|
||||
name(name),
|
||||
type(type),
|
||||
length(length)
|
||||
{
|
||||
}
|
||||
|
||||
std::string name;
|
||||
std::string type;
|
||||
int length;
|
||||
};
|
||||
|
||||
/** A CREATE TABLE abstraction */
|
||||
struct TableCreateEvent
|
||||
{
|
||||
TableCreateEvent(std::string db, std::string table, int version, std::vector<Column>& cols):
|
||||
table(table),
|
||||
database(db),
|
||||
version(version),
|
||||
was_used(false)
|
||||
|
||||
{
|
||||
columns.swap(cols);
|
||||
}
|
||||
|
||||
std::vector<Column> columns;
|
||||
std::string table;
|
||||
std::string database;
|
||||
int version; /**< How many versions of this table have been used */
|
||||
bool was_used; /**< Has this schema been persisted to disk */
|
||||
};
|
||||
|
||||
typedef std::vector<uint8_t> Bytes;
|
||||
|
||||
/** A representation of a table map event read from a binary log. A table map
|
||||
* maps a table to a unique ID which can be used to match row events to table map
|
||||
* events. The table map event tells us how the table is laid out and gives us
|
||||
* some meta information on the columns. */
|
||||
struct TableMapEvent
|
||||
{
|
||||
TableMapEvent(const std::string& db, const std::string& table, uint64_t id,
|
||||
int version, Bytes&& cols, Bytes&& nulls, Bytes&& metadata):
|
||||
database(db),
|
||||
table(table),
|
||||
id(id),
|
||||
version(version),
|
||||
column_types(cols),
|
||||
null_bitmap(nulls),
|
||||
column_metadata(metadata)
|
||||
{
|
||||
}
|
||||
|
||||
uint64_t columns() const
|
||||
{
|
||||
return column_types.size();
|
||||
}
|
||||
|
||||
std::string database;
|
||||
std::string table;
|
||||
uint64_t id;
|
||||
int version;
|
||||
Bytes column_types;
|
||||
Bytes null_bitmap;
|
||||
Bytes column_metadata;
|
||||
};
|
||||
|
||||
struct AvroTable
|
||||
{
|
||||
AvroTable(avro_file_writer_t file, avro_value_iface_t* iface, avro_schema_t schema):
|
||||
@ -227,30 +160,7 @@ enum mxs_avro_codec_type
|
||||
MXS_AVRO_CODEC_SNAPPY, /**< Not yet implemented */
|
||||
};
|
||||
|
||||
struct gtid_pos_t
|
||||
{
|
||||
gtid_pos_t():
|
||||
timestamp(0),
|
||||
domain(0),
|
||||
server_id(0),
|
||||
seq(0),
|
||||
event_num(0)
|
||||
{
|
||||
}
|
||||
|
||||
uint32_t timestamp; /*< GTID event timestamp */
|
||||
uint64_t domain; /*< Replication domain */
|
||||
uint64_t server_id; /*< Server ID */
|
||||
uint64_t seq; /*< Sequence number */
|
||||
uint64_t event_num; /*< Subsequence number, increases monotonically. This
|
||||
* is an internal representation of the position of
|
||||
* an event inside a GTID event and it is used to
|
||||
* rebuild GTID events in the correct order. */
|
||||
};
|
||||
|
||||
typedef std::tr1::shared_ptr<TableCreateEvent> STableCreateEvent;
|
||||
typedef std::tr1::shared_ptr<AvroTable> SAvroTable;
|
||||
typedef std::tr1::shared_ptr<TableMapEvent> STableMapEvent;
|
||||
|
||||
typedef std::tr1::unordered_map<std::string, STableCreateEvent> CreatedTables;
|
||||
typedef std::tr1::unordered_map<std::string, SAvroTable> AvroTables;
|
||||
|
156
server/modules/routing/avrorouter/rpl_events.hh
Normal file
156
server/modules/routing/avrorouter/rpl_events.hh
Normal file
@ -0,0 +1,156 @@
|
||||
#pragma once
|
||||
/*
|
||||
* Copyright (c) 2016 MariaDB Corporation Ab
|
||||
*
|
||||
* Use of this software is governed by the Business Source License included
|
||||
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
||||
*
|
||||
* Change Date: 2020-01-01
|
||||
*
|
||||
* On the date above, in accordance with the Business Source License, use
|
||||
* of this software will be governed by version 2 or later of the General
|
||||
* Public License.
|
||||
*/
|
||||
|
||||
#include <vector>
|
||||
#include <cstdint>
|
||||
#include <string>
|
||||
|
||||
typedef std::vector<uint8_t> Bytes;
|
||||
|
||||
// A GTID position
|
||||
struct gtid_pos_t
|
||||
{
|
||||
gtid_pos_t():
|
||||
timestamp(0),
|
||||
domain(0),
|
||||
server_id(0),
|
||||
seq(0),
|
||||
event_num(0)
|
||||
{
|
||||
}
|
||||
|
||||
uint32_t timestamp; /*< GTID event timestamp */
|
||||
uint64_t domain; /*< Replication domain */
|
||||
uint64_t server_id; /*< Server ID */
|
||||
uint64_t seq; /*< Sequence number */
|
||||
uint64_t event_num; /*< Subsequence number, increases monotonically. This
|
||||
* is an internal representation of the position of
|
||||
* an event inside a GTID event and it is used to
|
||||
* rebuild GTID events in the correct order. */
|
||||
};
|
||||
|
||||
/** A single column in a CREATE TABLE statement */
|
||||
struct Column
|
||||
{
|
||||
Column(std::string name, std::string type = "unknown", int length = -1):
|
||||
name(name),
|
||||
type(type),
|
||||
length(length)
|
||||
{
|
||||
}
|
||||
|
||||
std::string name;
|
||||
std::string type;
|
||||
int length;
|
||||
};
|
||||
|
||||
/** A CREATE TABLE abstraction */
|
||||
struct TableCreateEvent
|
||||
{
|
||||
TableCreateEvent(std::string db, std::string table, int version, std::vector<Column>&& cols):
|
||||
columns(cols),
|
||||
table(table),
|
||||
database(db),
|
||||
version(version),
|
||||
was_used(false)
|
||||
{
|
||||
}
|
||||
|
||||
std::vector<Column> columns;
|
||||
std::string table;
|
||||
std::string database;
|
||||
int version; /**< How many versions of this table have been used */
|
||||
bool was_used; /**< Has this schema been persisted to disk */
|
||||
};
|
||||
|
||||
/** A representation of a table map event read from a binary log. A table map
|
||||
* maps a table to a unique ID which can be used to match row events to table map
|
||||
* events. The table map event tells us how the table is laid out and gives us
|
||||
* some meta information on the columns. */
|
||||
struct TableMapEvent
|
||||
{
|
||||
TableMapEvent(const std::string& db, const std::string& table, uint64_t id,
|
||||
int version, Bytes&& cols, Bytes&& nulls, Bytes&& metadata):
|
||||
database(db),
|
||||
table(table),
|
||||
id(id),
|
||||
version(version),
|
||||
column_types(cols),
|
||||
null_bitmap(nulls),
|
||||
column_metadata(metadata)
|
||||
{
|
||||
}
|
||||
|
||||
uint64_t columns() const
|
||||
{
|
||||
return column_types.size();
|
||||
}
|
||||
|
||||
std::string database;
|
||||
std::string table;
|
||||
uint64_t id;
|
||||
int version;
|
||||
Bytes column_types;
|
||||
Bytes null_bitmap;
|
||||
Bytes column_metadata;
|
||||
};
|
||||
|
||||
typedef std::tr1::shared_ptr<TableCreateEvent> STableCreateEvent;
|
||||
typedef std::tr1::shared_ptr<TableMapEvent> STableMapEvent;
|
||||
|
||||
// Handler class for row based replication events
|
||||
class RowEventHandler
|
||||
{
|
||||
public:
|
||||
RowEventHandler(const STableMapEvent& map, const STableCreateEvent& create):
|
||||
m_map(map),
|
||||
m_create(create)
|
||||
{
|
||||
}
|
||||
|
||||
virtual ~RowEventHandler()
|
||||
{
|
||||
}
|
||||
|
||||
// Prepare a new row for processing
|
||||
virtual void prepare(const gtid_pos_t& gtid, const REP_HEADER& hdr, int event_type) = 0;
|
||||
|
||||
// Called once all columns are processed
|
||||
virtual bool commit() = 0;
|
||||
|
||||
// 32-bit integer handler
|
||||
virtual void column(int i, int32_t value) = 0;
|
||||
|
||||
// 64-bit integer handler
|
||||
virtual void column(int i, int64_t value) = 0;
|
||||
|
||||
// Float handler
|
||||
virtual void column(int i, float value) = 0;
|
||||
|
||||
// Double handler
|
||||
virtual void column(int i, double value) = 0;
|
||||
|
||||
// String handler
|
||||
virtual void column(int i, std::string value) = 0;
|
||||
|
||||
// Bytes handler
|
||||
virtual void column(int i, uint8_t* value, int len) = 0;
|
||||
|
||||
// Empty (NULL) value type handler
|
||||
virtual void column(int i) = 0;
|
||||
|
||||
protected:
|
||||
const STableMapEvent& m_map; // The table map event for this row
|
||||
const STableCreateEvent& m_create; // The CREATE TABLE statement for this row
|
||||
};
|
Loading…
x
Reference in New Issue
Block a user