294 lines
		
	
	
		
			7.8 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			294 lines
		
	
	
		
			7.8 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/*
 | 
						|
 * Copyright (c) 2018 MariaDB Corporation Ab
 | 
						|
 *
 | 
						|
 * Use of this software is governed by the Business Source License included
 | 
						|
 * in the LICENSE.TXT file and at www.mariadb.com/bsl11.
 | 
						|
 *
 | 
						|
 * Change Date: 2022-01-01
 | 
						|
 *
 | 
						|
 * On the date above, in accordance with the Business Source License, use
 | 
						|
 * of this software will be governed by version 2 or later of the General
 | 
						|
 * Public License.
 | 
						|
 */
 | 
						|
#pragma once
 | 
						|
 | 
						|
#include <vector>
 | 
						|
#include <cstdint>
 | 
						|
#include <string>
 | 
						|
#include <sstream>
 | 
						|
#include <memory>
 | 
						|
#include <unordered_map>
 | 
						|
 | 
						|
#include <maxscale/pcre2.h>
 | 
						|
#include <maxscale/service.hh>
 | 
						|
#include <binlog_common.hh>
 | 
						|
 | 
						|
typedef std::vector<uint8_t> Bytes;
 | 
						|
 | 
						|
// A GTID position
 | 
						|
struct gtid_pos_t
 | 
						|
{
 | 
						|
    gtid_pos_t()
 | 
						|
        : timestamp(0)
 | 
						|
        , domain(0)
 | 
						|
        , server_id(0)
 | 
						|
        , seq(0)
 | 
						|
        , event_num(0)
 | 
						|
    {
 | 
						|
    }
 | 
						|
 | 
						|
    uint32_t timestamp; /*< GTID event timestamp */
 | 
						|
    uint64_t domain;    /*< Replication domain */
 | 
						|
    uint64_t server_id; /*< Server ID */
 | 
						|
    uint64_t seq;       /*< Sequence number */
 | 
						|
    uint64_t event_num; /*< Subsequence number, increases monotonically. This
 | 
						|
                         * is an internal representation of the position of
 | 
						|
                         * an event inside a GTID event and it is used to
 | 
						|
                         * rebuild GTID events in the correct order. */
 | 
						|
 | 
						|
    void              extract(const REP_HEADER& hdr, uint8_t* ptr);
 | 
						|
    bool              parse(const char* str);
 | 
						|
    static gtid_pos_t from_string(std::string str);
 | 
						|
    std::string       to_string() const;
 | 
						|
    bool              empty() const;
 | 
						|
};
 | 
						|
 | 
						|
/** A single column in a CREATE TABLE statement */
 | 
						|
struct Column
 | 
						|
{
 | 
						|
    Column(std::string name, std::string type = "unknown", int length = -1)
 | 
						|
        : name(name)
 | 
						|
        , type(type)
 | 
						|
        , length(length)
 | 
						|
    {
 | 
						|
    }
 | 
						|
 | 
						|
    std::string name;
 | 
						|
    std::string type;
 | 
						|
    int         length;
 | 
						|
 | 
						|
    json_t*       to_json() const;
 | 
						|
    static Column from_json(json_t* json);
 | 
						|
};
 | 
						|
 | 
						|
struct TableCreateEvent;
 | 
						|
typedef std::shared_ptr<TableCreateEvent> STableCreateEvent;
 | 
						|
 | 
						|
/** A CREATE TABLE abstraction */
 | 
						|
struct TableCreateEvent
 | 
						|
{
 | 
						|
    TableCreateEvent(std::string db, std::string table, int version, std::vector<Column>&& cols)
 | 
						|
        : columns(cols)
 | 
						|
        , table(table)
 | 
						|
        , database(db)
 | 
						|
        , version(version)
 | 
						|
        , was_used(false)
 | 
						|
    {
 | 
						|
    }
 | 
						|
 | 
						|
    /**
 | 
						|
     * Get the table identifier i.e. `database.table`
 | 
						|
     *
 | 
						|
     * @return The table identifier
 | 
						|
     */
 | 
						|
    std::string id() const
 | 
						|
    {
 | 
						|
        return database + '.' + table;
 | 
						|
    }
 | 
						|
 | 
						|
    /**
 | 
						|
     * Convert to JSON
 | 
						|
     *
 | 
						|
     * @return JSON representation of this object
 | 
						|
     */
 | 
						|
    json_t* to_json() const;
 | 
						|
 | 
						|
    /**
 | 
						|
     * Convert from JSON
 | 
						|
     *
 | 
						|
     * @param json JSON to convert from
 | 
						|
     *
 | 
						|
     * @return Object representation of JSON if it is valid or empty pointer if invalid.
 | 
						|
     */
 | 
						|
    static STableCreateEvent from_json(json_t* json);
 | 
						|
 | 
						|
    std::vector<Column> columns;
 | 
						|
    std::string         table;
 | 
						|
    std::string         database;
 | 
						|
    int                 version;        /**< How many versions of this table have been used */
 | 
						|
    bool                was_used;       /**< Has this schema been persisted to disk */
 | 
						|
};
 | 
						|
 | 
						|
/** A representation of a table map event read from a binary log. A table map
 | 
						|
 * maps a table to a unique ID which can be used to match row events to table map
 | 
						|
 * events. The table map event tells us how the table is laid out and gives us
 | 
						|
 * some meta information on the columns. */
 | 
						|
struct TableMapEvent
 | 
						|
{
 | 
						|
    TableMapEvent(const std::string& db,
 | 
						|
                  const std::string& table,
 | 
						|
                  uint64_t id,
 | 
						|
                  int version,
 | 
						|
                  Bytes&& cols,
 | 
						|
                  Bytes&& nulls,
 | 
						|
                  Bytes&& metadata)
 | 
						|
        : database(db)
 | 
						|
        , table(table)
 | 
						|
        , id(id)
 | 
						|
        , version(version)
 | 
						|
        , column_types(cols)
 | 
						|
        , null_bitmap(nulls)
 | 
						|
        , column_metadata(metadata)
 | 
						|
    {
 | 
						|
    }
 | 
						|
 | 
						|
    uint64_t columns() const
 | 
						|
    {
 | 
						|
        return column_types.size();
 | 
						|
    }
 | 
						|
 | 
						|
    std::string database;
 | 
						|
    std::string table;
 | 
						|
    uint64_t    id;
 | 
						|
    int         version;
 | 
						|
    Bytes       column_types;
 | 
						|
    Bytes       null_bitmap;
 | 
						|
    Bytes       column_metadata;
 | 
						|
};
 | 
						|
 | 
						|
typedef std::shared_ptr<TableMapEvent> STableMapEvent;
 | 
						|
 | 
						|
// Containers for the replication events
 | 
						|
typedef std::unordered_map<std::string, STableCreateEvent> CreatedTables;
 | 
						|
typedef std::unordered_map<std::string, STableMapEvent>    MappedTables;
 | 
						|
typedef std::unordered_map<uint64_t, STableMapEvent>       ActiveMaps;
 | 
						|
 | 
						|
// Handler class for row based replication events
 | 
						|
class RowEventHandler
 | 
						|
{
 | 
						|
public:
 | 
						|
    virtual ~RowEventHandler()
 | 
						|
    {
 | 
						|
    }
 | 
						|
 | 
						|
    // A table was created
 | 
						|
    virtual bool create_table(const STableCreateEvent& create)
 | 
						|
    {
 | 
						|
        return true;
 | 
						|
    }
 | 
						|
 | 
						|
    // A table was opened
 | 
						|
    virtual bool open_table(const STableMapEvent& map, const STableCreateEvent& create)
 | 
						|
    {
 | 
						|
        return true;
 | 
						|
    }
 | 
						|
 | 
						|
    // Prepare a table for row processing
 | 
						|
    virtual bool prepare_table(const STableMapEvent& map, const STableCreateEvent& create)
 | 
						|
    {
 | 
						|
        return true;
 | 
						|
    }
 | 
						|
 | 
						|
    // Flush open tables
 | 
						|
    virtual void flush_tables()
 | 
						|
    {
 | 
						|
    }
 | 
						|
 | 
						|
    // Prepare a new row for processing
 | 
						|
    virtual void prepare_row(const gtid_pos_t& gtid, const REP_HEADER& hdr, int event_type) = 0;
 | 
						|
 | 
						|
    // Called once all columns are processed
 | 
						|
    virtual bool commit(const gtid_pos_t& gtid) = 0;
 | 
						|
 | 
						|
    // 32-bit integer handler
 | 
						|
    virtual void column(int i, int32_t value) = 0;
 | 
						|
 | 
						|
    // 64-bit integer handler
 | 
						|
    virtual void column(int i, int64_t value) = 0;
 | 
						|
 | 
						|
    // Float handler
 | 
						|
    virtual void column(int i, float value) = 0;
 | 
						|
 | 
						|
    // Double handler
 | 
						|
    virtual void column(int i, double value) = 0;
 | 
						|
 | 
						|
    // String handler
 | 
						|
    virtual void column(int i, std::string value) = 0;
 | 
						|
 | 
						|
    // Bytes handler
 | 
						|
    virtual void column(int i, uint8_t* value, int len) = 0;
 | 
						|
 | 
						|
    // Empty (NULL) value type handler
 | 
						|
    virtual void column(int i) = 0;
 | 
						|
};
 | 
						|
 | 
						|
typedef std::auto_ptr<RowEventHandler> SRowEventHandler;
 | 
						|
 | 
						|
class Rpl
 | 
						|
{
 | 
						|
public:
 | 
						|
    Rpl(const Rpl&) = delete;
 | 
						|
    Rpl& operator=(const Rpl&) = delete;
 | 
						|
 | 
						|
    // Construct a new replication stream transformer
 | 
						|
    Rpl(SERVICE* service,
 | 
						|
        SRowEventHandler event_handler,
 | 
						|
        pcre2_code* match,
 | 
						|
        pcre2_code* exclude,
 | 
						|
        gtid_pos_t =
 | 
						|
    {
 | 
						|
    });
 | 
						|
 | 
						|
    // Add a stored TableCreateEvent
 | 
						|
    void add_create(STableCreateEvent create);
 | 
						|
 | 
						|
    // Handle a replicated binary log event
 | 
						|
    void handle_event(REP_HEADER hdr, uint8_t* ptr);
 | 
						|
 | 
						|
    // Called when processed events need to be persisted to disk
 | 
						|
    void flush();
 | 
						|
 | 
						|
    // Check if binlog checksums are enabled
 | 
						|
    bool have_checksums() const
 | 
						|
    {
 | 
						|
        return m_binlog_checksum;
 | 
						|
    }
 | 
						|
 | 
						|
    // Set current GTID
 | 
						|
    void set_gtid(gtid_pos_t gtid)
 | 
						|
    {
 | 
						|
        m_gtid = gtid;
 | 
						|
    }
 | 
						|
 | 
						|
    // Get current GTID
 | 
						|
    const gtid_pos_t& get_gtid() const
 | 
						|
    {
 | 
						|
        return m_gtid;
 | 
						|
    }
 | 
						|
 | 
						|
private:
 | 
						|
    SRowEventHandler  m_handler;
 | 
						|
    SERVICE*          m_service;
 | 
						|
    pcre2_code*       m_create_table_re;
 | 
						|
    pcre2_code*       m_alter_table_re;
 | 
						|
    uint8_t           m_binlog_checksum;
 | 
						|
    uint8_t           m_event_types;
 | 
						|
    Bytes             m_event_type_hdr_lens;
 | 
						|
    gtid_pos_t        m_gtid;
 | 
						|
    ActiveMaps        m_active_maps;
 | 
						|
    MappedTables      m_table_maps;
 | 
						|
    CreatedTables     m_created_tables;
 | 
						|
    pcre2_code*       m_match;
 | 
						|
    pcre2_code*       m_exclude;
 | 
						|
    pcre2_match_data* m_md_match;
 | 
						|
    pcre2_match_data* m_md_exclude;
 | 
						|
 | 
						|
    void              handle_query_event(REP_HEADER* hdr, uint8_t* ptr);
 | 
						|
    bool              handle_table_map_event(REP_HEADER* hdr, uint8_t* ptr);
 | 
						|
    bool              handle_row_event(REP_HEADER* hdr, uint8_t* ptr);
 | 
						|
    STableCreateEvent table_create_copy(const char* sql, size_t len, const char* db);
 | 
						|
    bool              save_and_replace_table_create(STableCreateEvent created);
 | 
						|
    bool              table_create_alter(STableCreateEvent create, const char* sql, const char* end);
 | 
						|
    bool              table_matches(const std::string& ident);
 | 
						|
};
 |