diff --git a/server/modules/routing/avrorouter/avro_rbr.cc b/server/modules/routing/avrorouter/avro_rbr.cc index b7a51ad4f..dac01a9d6 100644 --- a/server/modules/routing/avrorouter/avro_rbr.cc +++ b/server/modules/routing/avrorouter/avro_rbr.cc @@ -33,58 +33,12 @@ static bool warn_large_enumset = false; /**< Remove when support for ENUM/SET va void notify_all_clients(Avro *router); void add_used_table(Avro* router, const char* table); - -class EventConverter -{ -public: - EventConverter(const STableMapEvent& map, const STableCreateEvent& create): - m_map(map), - m_create(create) - { - } - - virtual ~EventConverter() - { - } - - // Prepare a new row for processing - virtual void prepare(const gtid_pos_t& gtid, const REP_HEADER& hdr, int event_type) = 0; - - // Called once all columns are processed - virtual bool commit() = 0; - - // 32-bit integer handler - virtual void column(int i, int32_t value) = 0; - - // 64-bit integer handler - virtual void column(int i, int64_t value) = 0; - - // Float handler - virtual void column(int i, float value) = 0; - - // Double handler - virtual void column(int i, double value) = 0; - - // String handler - virtual void column(int i, std::string value) = 0; - - // Bytes handler - virtual void column(int i, uint8_t* value, int len) = 0; - - // Empty (NULL) value type handler - virtual void column(int i) = 0; - -protected: - const STableMapEvent& m_map; - const STableCreateEvent& m_create; -}; - -class AvroConverter : public EventConverter +class AvroConverter : public RowEventHandler { public: AvroConverter(const STableMapEvent& map, const STableCreateEvent& create, SAvroTable table): - EventConverter(map, create), + RowEventHandler(map, create), m_writer_iface(table->avro_writer_iface), m_avro_file(table->avro_file) { @@ -117,8 +71,6 @@ public: avro_value_set_enum(&m_field, event_type); } - // Called once all columns are processed - bool commit() { bool rval = true; @@ -132,56 +84,42 @@ public: return rval; } - // 32-bit integer handler - void column(int i, int32_t value) { set_active(i); avro_value_set_int(&m_field, value); } - // 64-bit integer handler - void column(int i, int64_t value) { set_active(i); avro_value_set_long(&m_field, value); } - // Float handler - void column(int i, float value) { set_active(i); avro_value_set_float(&m_field, value); } - // Double handler - void column(int i, double value) { set_active(i); avro_value_set_double(&m_field, value); } - // String handler - void column(int i, std::string value) { set_active(i); avro_value_set_string(&m_field, value.c_str()); } - // Bytes handler - void column(int i, uint8_t* value, int len) { set_active(i); avro_value_set_bytes(&m_field, value, len); } - // Empty (NULL) value type handler - void column(int i) { set_active(i); @@ -212,8 +150,9 @@ private: }; -uint8_t* process_row_event_data(STableMapEvent map, STableCreateEvent create, EventConverter* conv, - uint8_t *ptr, uint8_t *columns_present, uint8_t *end); +uint8_t* process_row_event_data(STableMapEvent map, STableCreateEvent create, + RowEventHandler* conv, uint8_t *ptr, + uint8_t *columns_present, uint8_t *end); /** * @brief Get row event name @@ -532,7 +471,7 @@ bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr) * @param metadata Field metadata * @param value Pointer to the start of the in-memory representation of the data */ -void set_numeric_field_value(EventConverter* conv, int idx, uint8_t type, +void set_numeric_field_value(RowEventHandler* conv, int idx, uint8_t type, uint8_t *metadata, uint8_t *value) { switch (type) @@ -692,7 +631,7 @@ static bool all_fields_null(uint8_t* null_bitmap, int ncolumns) * this row event. Currently this should be a bitfield which has all bits set. * @return Pointer to the first byte after the current row event */ -uint8_t* process_row_event_data(STableMapEvent map, STableCreateEvent create, EventConverter* conv, +uint8_t* process_row_event_data(STableMapEvent map, STableCreateEvent create, RowEventHandler* conv, uint8_t *ptr, uint8_t *columns_present, uint8_t *end) { int npresent = 0; diff --git a/server/modules/routing/avrorouter/avro_schema.cc b/server/modules/routing/avrorouter/avro_schema.cc index 8e5ddd124..9cf096265 100644 --- a/server/modules/routing/avrorouter/avro_schema.cc +++ b/server/modules/routing/avrorouter/avro_schema.cc @@ -618,7 +618,7 @@ TableCreateEvent* table_create_from_schema(const char* file, const char* db, if (json_extract_field_names(file, columns)) { - newtable = new (std::nothrow)TableCreateEvent(db, table, version, columns); + newtable = new (std::nothrow)TableCreateEvent(db, table, version, std::move(columns)); } return newtable; @@ -672,7 +672,7 @@ TableCreateEvent* table_create_alloc(char* ident, const char* sql, int len) if (!columns.empty()) { int version = resolve_table_version(database, table); - rval = new (std::nothrow) TableCreateEvent(database, table, version, columns); + rval = new (std::nothrow) TableCreateEvent(database, table, version, std::move(columns)); } else { diff --git a/server/modules/routing/avrorouter/avrorouter.hh b/server/modules/routing/avrorouter/avrorouter.hh index 8f25a29f9..178f1a07d 100644 --- a/server/modules/routing/avrorouter/avrorouter.hh +++ b/server/modules/routing/avrorouter/avrorouter.hh @@ -35,6 +35,8 @@ #include #include +#include "rpl_events.hh" + MXS_BEGIN_DECLS /** @@ -121,75 +123,6 @@ typedef enum avro_binlog_end /** How many bytes each thread tries to send */ #define AVRO_DATA_BURST_SIZE (32 * 1024) -/** A single column in a CREATE TABLE statement */ -struct Column -{ - Column(std::string name, std::string type = "unknown", int length = -1): - name(name), - type(type), - length(length) - { - } - - std::string name; - std::string type; - int length; -}; - -/** A CREATE TABLE abstraction */ -struct TableCreateEvent -{ - TableCreateEvent(std::string db, std::string table, int version, std::vector& cols): - table(table), - database(db), - version(version), - was_used(false) - - { - columns.swap(cols); - } - - std::vector columns; - std::string table; - std::string database; - int version; /**< How many versions of this table have been used */ - bool was_used; /**< Has this schema been persisted to disk */ -}; - -typedef std::vector Bytes; - -/** A representation of a table map event read from a binary log. A table map - * maps a table to a unique ID which can be used to match row events to table map - * events. The table map event tells us how the table is laid out and gives us - * some meta information on the columns. */ -struct TableMapEvent -{ - TableMapEvent(const std::string& db, const std::string& table, uint64_t id, - int version, Bytes&& cols, Bytes&& nulls, Bytes&& metadata): - database(db), - table(table), - id(id), - version(version), - column_types(cols), - null_bitmap(nulls), - column_metadata(metadata) - { - } - - uint64_t columns() const - { - return column_types.size(); - } - - std::string database; - std::string table; - uint64_t id; - int version; - Bytes column_types; - Bytes null_bitmap; - Bytes column_metadata; -}; - struct AvroTable { AvroTable(avro_file_writer_t file, avro_value_iface_t* iface, avro_schema_t schema): @@ -227,30 +160,7 @@ enum mxs_avro_codec_type MXS_AVRO_CODEC_SNAPPY, /**< Not yet implemented */ }; -struct gtid_pos_t -{ - gtid_pos_t(): - timestamp(0), - domain(0), - server_id(0), - seq(0), - event_num(0) - { - } - - uint32_t timestamp; /*< GTID event timestamp */ - uint64_t domain; /*< Replication domain */ - uint64_t server_id; /*< Server ID */ - uint64_t seq; /*< Sequence number */ - uint64_t event_num; /*< Subsequence number, increases monotonically. This - * is an internal representation of the position of - * an event inside a GTID event and it is used to - * rebuild GTID events in the correct order. */ -}; - -typedef std::tr1::shared_ptr STableCreateEvent; typedef std::tr1::shared_ptr SAvroTable; -typedef std::tr1::shared_ptr STableMapEvent; typedef std::tr1::unordered_map CreatedTables; typedef std::tr1::unordered_map AvroTables; diff --git a/server/modules/routing/avrorouter/rpl_events.hh b/server/modules/routing/avrorouter/rpl_events.hh new file mode 100644 index 000000000..0e3d5db33 --- /dev/null +++ b/server/modules/routing/avrorouter/rpl_events.hh @@ -0,0 +1,156 @@ +#pragma once +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2020-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include +#include +#include + +typedef std::vector Bytes; + +// A GTID position +struct gtid_pos_t +{ + gtid_pos_t(): + timestamp(0), + domain(0), + server_id(0), + seq(0), + event_num(0) + { + } + + uint32_t timestamp; /*< GTID event timestamp */ + uint64_t domain; /*< Replication domain */ + uint64_t server_id; /*< Server ID */ + uint64_t seq; /*< Sequence number */ + uint64_t event_num; /*< Subsequence number, increases monotonically. This + * is an internal representation of the position of + * an event inside a GTID event and it is used to + * rebuild GTID events in the correct order. */ +}; + +/** A single column in a CREATE TABLE statement */ +struct Column +{ + Column(std::string name, std::string type = "unknown", int length = -1): + name(name), + type(type), + length(length) + { + } + + std::string name; + std::string type; + int length; +}; + +/** A CREATE TABLE abstraction */ +struct TableCreateEvent +{ + TableCreateEvent(std::string db, std::string table, int version, std::vector&& cols): + columns(cols), + table(table), + database(db), + version(version), + was_used(false) + { + } + + std::vector columns; + std::string table; + std::string database; + int version; /**< How many versions of this table have been used */ + bool was_used; /**< Has this schema been persisted to disk */ +}; + +/** A representation of a table map event read from a binary log. A table map + * maps a table to a unique ID which can be used to match row events to table map + * events. The table map event tells us how the table is laid out and gives us + * some meta information on the columns. */ +struct TableMapEvent +{ + TableMapEvent(const std::string& db, const std::string& table, uint64_t id, + int version, Bytes&& cols, Bytes&& nulls, Bytes&& metadata): + database(db), + table(table), + id(id), + version(version), + column_types(cols), + null_bitmap(nulls), + column_metadata(metadata) + { + } + + uint64_t columns() const + { + return column_types.size(); + } + + std::string database; + std::string table; + uint64_t id; + int version; + Bytes column_types; + Bytes null_bitmap; + Bytes column_metadata; +}; + +typedef std::tr1::shared_ptr STableCreateEvent; +typedef std::tr1::shared_ptr STableMapEvent; + +// Handler class for row based replication events +class RowEventHandler +{ +public: + RowEventHandler(const STableMapEvent& map, const STableCreateEvent& create): + m_map(map), + m_create(create) + { + } + + virtual ~RowEventHandler() + { + } + + // Prepare a new row for processing + virtual void prepare(const gtid_pos_t& gtid, const REP_HEADER& hdr, int event_type) = 0; + + // Called once all columns are processed + virtual bool commit() = 0; + + // 32-bit integer handler + virtual void column(int i, int32_t value) = 0; + + // 64-bit integer handler + virtual void column(int i, int64_t value) = 0; + + // Float handler + virtual void column(int i, float value) = 0; + + // Double handler + virtual void column(int i, double value) = 0; + + // String handler + virtual void column(int i, std::string value) = 0; + + // Bytes handler + virtual void column(int i, uint8_t* value, int len) = 0; + + // Empty (NULL) value type handler + virtual void column(int i) = 0; + +protected: + const STableMapEvent& m_map; // The table map event for this row + const STableCreateEvent& m_create; // The CREATE TABLE statement for this row +};