 b223e6eed3
			
		
	
	b223e6eed3
	
	
	
		
			
			RENAME TABLE is now fully supported and works as expected. With the fix to table versioning, the new table name will receive the latest version number.
		
			
				
	
	
		
			300 lines
		
	
	
		
			8.2 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			300 lines
		
	
	
		
			8.2 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /*
 | |
|  * Copyright (c) 2018 MariaDB Corporation Ab
 | |
|  *
 | |
|  * Use of this software is governed by the Business Source License included
 | |
|  * in the LICENSE.TXT file and at www.mariadb.com/bsl11.
 | |
|  *
 | |
|  * Change Date: 2023-01-01
 | |
|  *
 | |
|  * On the date above, in accordance with the Business Source License, use
 | |
|  * of this software will be governed by version 2 or later of the General
 | |
|  * Public License.
 | |
|  */
 | |
| #pragma once
 | |
| 
 | |
| #include <vector>
 | |
| #include <cstdint>
 | |
| #include <string>
 | |
| #include <sstream>
 | |
| #include <memory>
 | |
| #include <unordered_map>
 | |
| 
 | |
| #include <maxscale/pcre2.h>
 | |
| #include <maxscale/service.hh>
 | |
| #include <binlog_common.hh>
 | |
| 
 | |
| typedef std::vector<uint8_t> Bytes;
 | |
| 
 | |
| // A GTID position
 | |
| struct gtid_pos_t
 | |
| {
 | |
|     gtid_pos_t()
 | |
|         : timestamp(0)
 | |
|         , domain(0)
 | |
|         , server_id(0)
 | |
|         , seq(0)
 | |
|         , event_num(0)
 | |
|     {
 | |
|     }
 | |
| 
 | |
|     uint32_t timestamp; /*< GTID event timestamp */
 | |
|     uint64_t domain;    /*< Replication domain */
 | |
|     uint64_t server_id; /*< Server ID */
 | |
|     uint64_t seq;       /*< Sequence number */
 | |
|     uint64_t event_num; /*< Subsequence number, increases monotonically. This
 | |
|                          * is an internal representation of the position of
 | |
|                          * an event inside a GTID event and it is used to
 | |
|                          * rebuild GTID events in the correct order. */
 | |
| 
 | |
|     void              extract(const REP_HEADER& hdr, uint8_t* ptr);
 | |
|     bool              parse(const char* str);
 | |
|     static gtid_pos_t from_string(std::string str);
 | |
|     std::string       to_string() const;
 | |
|     bool              empty() const;
 | |
| };
 | |
| 
 | |
| /** A single column in a CREATE TABLE statement */
 | |
| struct Column
 | |
| {
 | |
|     Column(std::string name, std::string type = "unknown", int length = -1, bool is_unsigned = false)
 | |
|         : name(name)
 | |
|         , type(type)
 | |
|         , length(length)
 | |
|         , is_unsigned(is_unsigned)
 | |
|     {
 | |
|     }
 | |
| 
 | |
|     std::string name;
 | |
|     std::string type;
 | |
|     int         length;
 | |
|     bool        is_unsigned;
 | |
| 
 | |
|     json_t*       to_json() const;
 | |
|     static Column from_json(json_t* json);
 | |
| };
 | |
| 
 | |
| struct TableCreateEvent;
 | |
| typedef std::shared_ptr<TableCreateEvent> STableCreateEvent;
 | |
| 
 | |
| /** A CREATE TABLE abstraction */
 | |
| struct TableCreateEvent
 | |
| {
 | |
|     TableCreateEvent(std::string db, std::string table, int version, std::vector<Column>&& cols)
 | |
|         : columns(cols)
 | |
|         , table(table)
 | |
|         , database(db)
 | |
|         , version(version)
 | |
|         , was_used(false)
 | |
|     {
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * Get the table identifier i.e. `database.table`
 | |
|      *
 | |
|      * @return The table identifier
 | |
|      */
 | |
|     std::string id() const
 | |
|     {
 | |
|         return database + '.' + table;
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * Convert to JSON
 | |
|      *
 | |
|      * @return JSON representation of this object
 | |
|      */
 | |
|     json_t* to_json() const;
 | |
| 
 | |
|     /**
 | |
|      * Convert from JSON
 | |
|      *
 | |
|      * @param json JSON to convert from
 | |
|      *
 | |
|      * @return Object representation of JSON if it is valid or empty pointer if invalid.
 | |
|      */
 | |
|     static STableCreateEvent from_json(json_t* json);
 | |
| 
 | |
|     std::vector<Column> columns;
 | |
|     std::string         table;
 | |
|     std::string         database;
 | |
|     int                 version;        /**< How many versions of this table have been used */
 | |
|     bool                was_used;       /**< Has this schema been persisted to disk */
 | |
| };
 | |
| 
 | |
| /** A representation of a table map event read from a binary log. A table map
 | |
|  * maps a table to a unique ID which can be used to match row events to table map
 | |
|  * events. The table map event tells us how the table is laid out and gives us
 | |
|  * some meta information on the columns. */
 | |
| struct TableMapEvent
 | |
| {
 | |
|     TableMapEvent(const std::string& db,
 | |
|                   const std::string& table,
 | |
|                   uint64_t id,
 | |
|                   int version,
 | |
|                   Bytes&& cols,
 | |
|                   Bytes&& nulls,
 | |
|                   Bytes&& metadata)
 | |
|         : database(db)
 | |
|         , table(table)
 | |
|         , id(id)
 | |
|         , version(version)
 | |
|         , column_types(cols)
 | |
|         , null_bitmap(nulls)
 | |
|         , column_metadata(metadata)
 | |
|     {
 | |
|     }
 | |
| 
 | |
|     uint64_t columns() const
 | |
|     {
 | |
|         return column_types.size();
 | |
|     }
 | |
| 
 | |
|     std::string database;
 | |
|     std::string table;
 | |
|     uint64_t    id;
 | |
|     int         version;
 | |
|     Bytes       column_types;
 | |
|     Bytes       null_bitmap;
 | |
|     Bytes       column_metadata;
 | |
| };
 | |
| 
 | |
| typedef std::shared_ptr<TableMapEvent> STableMapEvent;
 | |
| 
 | |
| // Containers for the replication events
 | |
| typedef std::unordered_map<std::string, STableCreateEvent> CreatedTables;
 | |
| typedef std::unordered_map<std::string, STableMapEvent>    MappedTables;
 | |
| typedef std::unordered_map<uint64_t, STableMapEvent>       ActiveMaps;
 | |
| 
 | |
| // Handler class for row based replication events
 | |
| class RowEventHandler
 | |
| {
 | |
| public:
 | |
|     virtual ~RowEventHandler()
 | |
|     {
 | |
|     }
 | |
| 
 | |
|     // A table was created
 | |
|     virtual bool create_table(const STableCreateEvent& create)
 | |
|     {
 | |
|         return true;
 | |
|     }
 | |
| 
 | |
|     // A table was opened
 | |
|     virtual bool open_table(const STableMapEvent& map, const STableCreateEvent& create)
 | |
|     {
 | |
|         return true;
 | |
|     }
 | |
| 
 | |
|     // Prepare a table for row processing
 | |
|     virtual bool prepare_table(const STableMapEvent& map, const STableCreateEvent& create)
 | |
|     {
 | |
|         return true;
 | |
|     }
 | |
| 
 | |
|     // Flush open tables
 | |
|     virtual void flush_tables()
 | |
|     {
 | |
|     }
 | |
| 
 | |
|     // Prepare a new row for processing
 | |
|     virtual void prepare_row(const gtid_pos_t& gtid, const REP_HEADER& hdr, int event_type) = 0;
 | |
| 
 | |
|     // Called once all columns are processed
 | |
|     virtual bool commit(const gtid_pos_t& gtid) = 0;
 | |
| 
 | |
|     // Integer handler for short types (less than 32 bits)
 | |
|     virtual void column_int(int i, int32_t value) = 0;
 | |
| 
 | |
|     // Integer handler for long integer types
 | |
|     virtual void column_long(int i, int64_t value) = 0;
 | |
| 
 | |
|     // Float handler
 | |
|     virtual void column_float(int i, float value) = 0;
 | |
| 
 | |
|     // Double handler
 | |
|     virtual void column_double(int i, double value) = 0;
 | |
| 
 | |
|     // String handler
 | |
|     virtual void column_string(int i, const std::string& value) = 0;
 | |
| 
 | |
|     // Bytes handler
 | |
|     virtual void column_bytes(int i, uint8_t* value, int len) = 0;
 | |
| 
 | |
|     // Empty (NULL) value type handler
 | |
|     virtual void column_null(int i) = 0;
 | |
| };
 | |
| 
 | |
| typedef std::auto_ptr<RowEventHandler> SRowEventHandler;
 | |
| 
 | |
| class Rpl
 | |
| {
 | |
| public:
 | |
|     Rpl(const Rpl&) = delete;
 | |
|     Rpl& operator=(const Rpl&) = delete;
 | |
| 
 | |
|     // Construct a new replication stream transformer
 | |
|     Rpl(SERVICE* service,
 | |
|         SRowEventHandler event_handler,
 | |
|         pcre2_code* match,
 | |
|         pcre2_code* exclude,
 | |
|         gtid_pos_t =
 | |
|     {
 | |
|     });
 | |
| 
 | |
|     // Add a stored TableCreateEvent
 | |
|     void add_create(STableCreateEvent create);
 | |
| 
 | |
|     // Handle a replicated binary log event
 | |
|     void handle_event(REP_HEADER hdr, uint8_t* ptr);
 | |
| 
 | |
|     // Called when processed events need to be persisted to disk
 | |
|     void flush();
 | |
| 
 | |
|     // Check if binlog checksums are enabled
 | |
|     bool have_checksums() const
 | |
|     {
 | |
|         return m_binlog_checksum;
 | |
|     }
 | |
| 
 | |
|     // Set current GTID
 | |
|     void set_gtid(gtid_pos_t gtid)
 | |
|     {
 | |
|         m_gtid = gtid;
 | |
|     }
 | |
| 
 | |
|     // Get current GTID
 | |
|     const gtid_pos_t& get_gtid() const
 | |
|     {
 | |
|         return m_gtid;
 | |
|     }
 | |
| 
 | |
| private:
 | |
|     SRowEventHandler  m_handler;
 | |
|     SERVICE*          m_service;
 | |
|     pcre2_code*       m_create_table_re;
 | |
|     pcre2_code*       m_alter_table_re;
 | |
|     uint8_t           m_binlog_checksum;
 | |
|     uint8_t           m_event_types;
 | |
|     Bytes             m_event_type_hdr_lens;
 | |
|     gtid_pos_t        m_gtid;
 | |
|     ActiveMaps        m_active_maps;
 | |
|     MappedTables      m_table_maps;
 | |
|     CreatedTables     m_created_tables;
 | |
|     pcre2_code*       m_match;
 | |
|     pcre2_code*       m_exclude;
 | |
|     pcre2_match_data* m_md_match;
 | |
|     pcre2_match_data* m_md_exclude;
 | |
| 
 | |
|     std::unordered_map<std::string, int> m_versions;    // Table version numbers per identifier
 | |
| 
 | |
|     void              handle_query_event(REP_HEADER* hdr, uint8_t* ptr);
 | |
|     bool              handle_table_map_event(REP_HEADER* hdr, uint8_t* ptr);
 | |
|     bool              handle_row_event(REP_HEADER* hdr, uint8_t* ptr);
 | |
|     STableCreateEvent table_create_copy(const char* sql, size_t len, const char* db);
 | |
|     bool              save_and_replace_table_create(STableCreateEvent created);
 | |
|     bool              rename_table_create(STableCreateEvent created, const std::string& old_id);
 | |
|     bool              table_create_alter(STableCreateEvent create, const char* sql, const char* end);
 | |
|     void              table_create_rename(const std::string& db, const char* sql, const char* end);
 | |
|     bool              table_matches(const std::string& ident);
 | |
| };
 |