From 50b3c6a2debf29b779067d3b2f94905b94e6b58f Mon Sep 17 00:00:00 2001 From: MassimilianoPinto Date: Fri, 10 Nov 2017 19:54:15 +0100 Subject: [PATCH] MXS-701: binlog filtering MXS-701: first implementation of binlog filter --- server/modules/filter/CMakeLists.txt | 1 + .../filter/binlogfilter/CMakeLists.txt | 3 + .../filter/binlogfilter/binlogfilter.cc | 92 ++++++ .../filter/binlogfilter/binlogfilter.hh | 93 +++++++ .../binlogfilter/binlogfiltersession.cc | 261 ++++++++++++++++++ .../binlogfilter/binlogfiltersession.hh | 98 +++++++ 6 files changed, 548 insertions(+) create mode 100644 server/modules/filter/binlogfilter/CMakeLists.txt create mode 100644 server/modules/filter/binlogfilter/binlogfilter.cc create mode 100644 server/modules/filter/binlogfilter/binlogfilter.hh create mode 100644 server/modules/filter/binlogfilter/binlogfiltersession.cc create mode 100644 server/modules/filter/binlogfilter/binlogfiltersession.hh diff --git a/server/modules/filter/CMakeLists.txt b/server/modules/filter/CMakeLists.txt index 7a6bcead4..36752ba42 100644 --- a/server/modules/filter/CMakeLists.txt +++ b/server/modules/filter/CMakeLists.txt @@ -14,3 +14,4 @@ add_subdirectory(topfilter) add_subdirectory(tpmfilter) add_subdirectory(masking) add_subdirectory(insertstream) +add_subdirectory(binlogfilter) diff --git a/server/modules/filter/binlogfilter/CMakeLists.txt b/server/modules/filter/binlogfilter/CMakeLists.txt new file mode 100644 index 000000000..d62d608da --- /dev/null +++ b/server/modules/filter/binlogfilter/CMakeLists.txt @@ -0,0 +1,3 @@ +add_library(binlogfilter SHARED binlogfilter.cc binlogfiltersession.cc) +set_target_properties(binlogfilter PROPERTIES VERSION "1.0.0") +install_module(binlogfilter core) diff --git a/server/modules/filter/binlogfilter/binlogfilter.cc b/server/modules/filter/binlogfilter/binlogfilter.cc new file mode 100644 index 000000000..72fa6ee7f --- /dev/null +++ b/server/modules/filter/binlogfilter/binlogfilter.cc @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2020-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +// All log messages from this module are prefixed with this +#define MXS_MODULE_NAME "binlogfilter" + +#include "binlogfilter.hh" + +// This declares a module in MaxScale +extern "C" MXS_MODULE* MXS_CREATE_MODULE() +{ + static MXS_MODULE info = + { + MXS_MODULE_API_FILTER, + MXS_MODULE_IN_DEVELOPMENT, + MXS_FILTER_VERSION, + "A binlog event filter for slave servers", + "V1.0.0", + RCAP_TYPE_NONE, + &BinlogFilter::s_object, // This is defined in the MaxScale filter template + NULL, /* Process init. */ + NULL, /* Process finish. */ + NULL, /* Thread init. */ + NULL, /* Thread finish. */ + { + {"filter_events", MXS_MODULE_PARAM_BOOL, "false"}, + {"skip_table", MXS_MODULE_PARAM_STRING, ""}, + {"skip_db", MXS_MODULE_PARAM_STRING, ""}, + { MXS_END_MODULE_PARAMS } + } + }; + + return &info; +} + +// BinlogFilter constructor +BinlogFilter::BinlogFilter(const MXS_CONFIG_PARAMETER* pParams) + : m_config(pParams) +{ +} + +// BinlogFilter destructor +BinlogFilter::~BinlogFilter() +{ +} + +// static: filter create routine +BinlogFilter* BinlogFilter::create(const char* zName, + char** pzOptions, + MXS_CONFIG_PARAMETER* pParams) +{ + return new BinlogFilter(pParams); +} + +// BinlogFilterSession create routine +BinlogFilterSession* BinlogFilter::newSession(MXS_SESSION* pSession) +{ + return BinlogFilterSession::create(pSession, this); +} + +// static +void BinlogFilter::diagnostics(DCB* pDcb) const +{ +} + +// static +json_t* BinlogFilter::diagnostics_json() const +{ + return NULL; +} + +// static +uint64_t BinlogFilter::getCapabilities() +{ + return RCAP_TYPE_NONE; +} + +// Return BinlogFilter active state +bool BinlogFilter::is_active() const +{ + return m_config.active; +} diff --git a/server/modules/filter/binlogfilter/binlogfilter.hh b/server/modules/filter/binlogfilter/binlogfilter.hh new file mode 100644 index 000000000..db47ef502 --- /dev/null +++ b/server/modules/filter/binlogfilter/binlogfilter.hh @@ -0,0 +1,93 @@ +#pragma once +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2020-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include +#include +#include +#include "binlogfiltersession.hh" + +// Binlog Filter configuration +struct BinlogConfig +{ + // Constructor + BinlogConfig(const MXS_CONFIG_PARAMETER* pParams) + : active(config_get_bool(pParams, "filter_events")) + , dbname(config_get_string(pParams, "skip_db")) + , table(config_get_string(pParams, "skip_table")) + { + }; + + // Destructor + ~BinlogConfig() {}; + + // Members mapped to config options + bool active; + std::string dbname; + std::string table; +}; + +class BinlogFilter : public maxscale::Filter +{ + // Prevent copy-constructor and assignment operator usage + BinlogFilter(const BinlogFilter&); + BinlogFilter& operator = (const BinlogFilter&); + +public: + ~BinlogFilter(); + + // Creates a new filter instance + static BinlogFilter* create(const char* zName, + char** pzOptions, + MXS_CONFIG_PARAMETER* ppParams); + + // Creates a new session for this filter + BinlogFilterSession* newSession(MXS_SESSION* pSession); + + // Print diagnostics to a DCB + void diagnostics(DCB* pDcb) const; + + // Returns JSON form diagnostic data + json_t* diagnostics_json() const; + + // Get filter capabilities + uint64_t getCapabilities(); + + // Filter is active + bool is_active() const; + + // Return reference to filter config + const BinlogConfig& getConfig() const + { + return m_config; + } + +private: + // Constructor: used in the create function + BinlogFilter(const MXS_CONFIG_PARAMETER* pParams); + +private: + /** + * Current configuration in maxscale.cnf + * + * [BinlogFilter] + * type=filter + * module=binlogfilter + * filter_events=On + * skip_table=t4 + * skip_db=test + * + * Note: Only one table and one db right now + */ + BinlogConfig m_config; +}; diff --git a/server/modules/filter/binlogfilter/binlogfiltersession.cc b/server/modules/filter/binlogfilter/binlogfiltersession.cc new file mode 100644 index 000000000..709439296 --- /dev/null +++ b/server/modules/filter/binlogfilter/binlogfiltersession.cc @@ -0,0 +1,261 @@ +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2020-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +// All log messages from this module are prefixed with this +#define MXS_MODULE_NAME "binlogfilter" + +#include +#include "binlogfilter.hh" +#include "binlogfiltersession.hh" +#include + +// New packet which replaces the skipped events has 0 payload +#define NEW_PACKET_PAYLOD BINLOG_EVENT_HDR_LEN + +// BinlogFilterSession constructor +BinlogFilterSession::BinlogFilterSession(MXS_SESSION* pSession, + const BinlogFilter* pFilter) + : mxs::FilterSession(pSession) + , m_filter(*pFilter) + , m_serverid(0) + , m_state(pFilter->is_active() ? COMMAND_MODE : INACTIVE) + , m_skip(false) + , m_complete_packet(true) + , m_crc(false) +{ + MXS_NOTICE("Filter [%s] is %s", + MXS_MODULE_NAME, + m_filter.getConfig().active ? "enabled" : "disabled"); +} + +// BinlogFilterSession destructor +BinlogFilterSession::~BinlogFilterSession() +{ +} + +//static: create new session +BinlogFilterSession* BinlogFilterSession::create(MXS_SESSION* pSession, + const BinlogFilter* pFilter) +{ + return new BinlogFilterSession(pSession, pFilter); +} + +// route input data from client +int BinlogFilterSession::routeQuery(GWBUF* pPacket) +{ + if (m_state != INACTIVE) + { + uint8_t *data = GWBUF_DATA(pPacket); + + // We assume OK indicator, the first byte after MYSQL_HEADER_LEN is 0 + // TODO: check complete packet or + // at least MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN bytes + switch ((int)MYSQL_GET_COMMAND(data)) + { + case COM_REGISTER_SLAVE: + // Connected client is registering as Slave Server + m_serverid = gw_mysql_get_byte4(data + MYSQL_HEADER_LEN + 1); + MXS_INFO("Client is registering as " + "Slave server with ID %" PRIu32 "", + m_serverid); + break; + case COM_BINLOG_DUMP: + // Connected Slave server waits for binlog events + m_state = BINLOG_MODE; + MXS_INFO("Slave server %" PRIu32 " is waiting for binlog events.", + m_serverid); + break; + default: + // Connected client is using SQL mode + m_state = COMMAND_MODE; + break; + } + } + + // Route input data + return mxs::FilterSession::routeQuery(pPacket); +} + +// Reply data to client +int BinlogFilterSession::clientReply(GWBUF* pPacket) +{ + if (m_state == BINLOG_MODE) + { + if (skipEvent(pPacket)) + { + // Assuming ROW replication format: + // If transaction events needs to be skipped, + // they are replaced by an empty paylod packet + filterEvent(pPacket); + } + } + + // Send data + return mxs::FilterSession::clientReply(pPacket); +} + +// close session +void BinlogFilterSession::close() +{ + if (m_state == BINLOG_MODE) + { + MXS_INFO("Slave server %" PRIu32 ": replication stopped.", + m_serverid); + } +} + +// extract binlog replication header from event data +static inline void extractHeader(register const uint8_t *event, + register REP_HEADER *hdr) +{ + hdr->payload_len = gw_mysql_get_byte3(event); + hdr->seqno = event[3]; + hdr->ok = event[MYSQL_HEADER_LEN]; + hdr->timestamp = gw_mysql_get_byte4(event + MYSQL_HEADER_LEN + 1); + hdr->event_type = event[MYSQL_HEADER_LEN + 1 + 4]; + // TODO: add offsets in order to facilitate reading + hdr->serverid = gw_mysql_get_byte4(event + MYSQL_HEADER_LEN + 1 + 4 + 1); + hdr->event_size = gw_mysql_get_byte4(event + MYSQL_HEADER_LEN + 1 + 4 + 1 + 4); + hdr->next_pos = gw_mysql_get_byte4(event + MYSQL_HEADER_LEN + 1 + 4 + 1 + 4 + 4); + hdr->flags = gw_mysql_get_byte2(event + MYSQL_HEADER_LEN + 1 + 4 + 1 + 4 + 4 + 4); + + MXS_INFO("Slave server %" PRIu32 ": clientReply, event_type [%d], " + "flags %d, event_size %" PRIu32 ", next_pos %" PRIu32 ", packet size %" PRIu32 "", + hdr->serverid, + hdr->event_type, + hdr->flags, + hdr->event_size, + hdr->next_pos, + hdr->payload_len); +} + +// Check whether events in a transaction can be skipped. +// The triggering event is TABLE_MAP_EVENT. +bool BinlogFilterSession::skipEvent(GWBUF* buffer) +{ + uint8_t *event = GWBUF_DATA(buffer); + REP_HEADER hdr; + + // Extract Replication header event from event data + extractHeader(event, &hdr); + + if (hdr.ok == 0) + { + switch(hdr.event_type) + { + case TABLE_MAP_EVENT: + // Check db/table and set m_skip accordingly + skipDatabaseTable(event, hdr); + break; + case XID_EVENT: + // COMMIT: reset m_skip if set and set next pos to 0 + if (m_skip) + { + m_skip = false; + // Some events skipped. + // Set next pos to 0 instead of real one. + gw_mysql_set_byte4(event + MYSQL_HEADER_LEN + 1 + 4 + 1 + 4 + 4, 0); + + MXS_INFO("Skipped events: Setting next_pos = 0 in XID_EVENT"); + } + break; + default: + // Other events can be skipped or not, depending on m_skip value + break; + } + // m_skip could be true or false + return m_skip; + } + else + { + return false; // always false: no filtering + } +} + +// Extract Dbname and Tabe name from TABLE_MAP event +static void inline extractTableInfo(const uint8_t *ptr, + char **dbname, + char **tblname) +{ + // TODO: add offsets in order to facilitate reading + int db_len = *(ptr + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 8); + + *dbname = (char *)(ptr + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 8 + 1); + *tblname = (char *)(ptr + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 8 + 1 + db_len + 1 + 1); +} + +// Check whether a db/table can be skipped based on configuration +void BinlogFilterSession::skipDatabaseTable(const uint8_t* data, + const REP_HEADER& hdr) +{ + // Check for TABBLE_MAP event: + // Each time this event is seen the m_skip is overwritten + if (hdr.ok == 0 && hdr.event_type == TABLE_MAP_EVENT) + { + char *db = NULL; + char *table = NULL; + const BinlogConfig& fConfig = m_filter.getConfig(); + + // Get db/table + extractTableInfo(data, &db, &table); + + // Check match with configuration + m_skip = (bool)(strcmp(db, fConfig.dbname.c_str()) == 0 || + strcmp(table, fConfig.table.c_str()) == 0); + + MXS_INFO("Dbname is [%s], Table is [%s], Skip [%s]\n", + db ? db : "", + table ? table : "", + m_skip ? "Yes" : "No"); + } +} + +// Replace the current event: no memory allocation +void BinlogFilterSession::filterEvent(GWBUF* pPacket) +{ + ss_dassert(m_skip == true); + + uint8_t *ptr = GWBUF_DATA(pPacket); + + // Set NEW event_type + ptr[MYSQL_HEADER_LEN + 1 + 4] = RAND_EVENT; + // SET ignorable flags + gw_mysql_set_byte2(ptr + MYSQL_HEADER_LEN + 1 + 4 + 1 + 4 + 4 + 4, + LOG_EVENT_IGNORABLE_F | LOG_EVENT_SKIP_REPLICATION_F); + + // Set event_len, size of empty rand_event (header + 0 bytes) + gw_mysql_set_byte4(ptr + MYSQL_HEADER_LEN + 1 + 4 + 1 + 4, + BINLOG_EVENT_HDR_LEN + 0); + + // Set next pos to 0 + gw_mysql_set_byte4(ptr + MYSQL_HEADER_LEN + 1 + 4 + 1 + 4 + 4, 0); + + // Set New Packet size: even_len + 1 byte replication status + gw_mysql_set_byte3(ptr, BINLOG_EVENT_HDR_LEN + 0 + 1); + + MXS_INFO("All events belonging to this table will be skipped"); + + MXS_INFO("Filtered event #%d," + "ok %d, type %d, flags %d, size %d, next_pos %d, packet_size %d\n", + ptr[3], + ptr[4], + RAND_EVENT, + gw_mysql_get_byte2(ptr + MYSQL_HEADER_LEN + 1 + 4 + 1 + 4 + 4 + 4), + gw_mysql_get_byte4(ptr + MYSQL_HEADER_LEN + 1 + 4 + 1 + 4), + gw_mysql_get_byte4(ptr + MYSQL_HEADER_LEN + 1 + 4 + 1 + 4 + 4), + gw_mysql_get_byte3(ptr)); + + // Remove useless bytes + pPacket = gwbuf_rtrim(pPacket, + gwbuf_length(pPacket) - (BINLOG_EVENT_HDR_LEN + 1 + 4)); +} diff --git a/server/modules/filter/binlogfilter/binlogfiltersession.hh b/server/modules/filter/binlogfilter/binlogfiltersession.hh new file mode 100644 index 000000000..64d676a8a --- /dev/null +++ b/server/modules/filter/binlogfilter/binlogfiltersession.hh @@ -0,0 +1,98 @@ +#pragma once +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2020-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include +#include +#include "binlogfilter.hh" + +//TODO: add in a separate .h file in common to BinlogRouter code +#define LOG_EVENT_IGNORABLE_F 0x0080 +#define LOG_EVENT_SKIP_REPLICATION_F 0x8000 +#define RAND_EVENT 0x000D +#define TABLE_MAP_EVENT 0x0013 +#define XID_EVENT 0x0010 +#define BINLOG_EVENT_HDR_LEN 19 + +typedef struct rep_header_t +{ + int payload_len; /*< Payload length (24 bits) */ + uint8_t seqno; /*< Response sequence number */ + uint8_t ok; /*< OK Byte from packet */ + uint32_t timestamp; /*< Timestamp - start of binlog record */ + uint8_t event_type; /*< Binlog event type */ + uint32_t serverid; /*< Server id of master */ + uint32_t event_size; /*< Size of header, post-header and body */ + uint32_t next_pos; /*< Position of next event */ + uint16_t flags; /*< Event flags */ +} REP_HEADER; +// End TODO +// +class BinlogFilter; + +class BinlogFilterSession : public maxscale::FilterSession +{ + // Prevent copy-constructor and assignment operator usage + BinlogFilterSession(const BinlogFilterSession&); + BinlogFilterSession& operator = (const BinlogFilterSession&); + +public: + ~BinlogFilterSession(); + + // Create a new filter session + static BinlogFilterSession* create(MXS_SESSION* pSession, + const BinlogFilter* pFilter); + + // Called when a client session has been closed + void close(); + + // Handle a query from the client + int routeQuery(GWBUF* pPacket); + + // Handle a reply from server + int clientReply(GWBUF* pPacket); + + // Whether to skip current event + bool skipEvent(GWBUF* data); + + // Skip database/table events in current trasaction + void skipDatabaseTable(const uint8_t* data, const REP_HEADER& hdr); + + // Filter the replication event + void filterEvent(GWBUF* data); + +private: + // Used in the create function + BinlogFilterSession(MXS_SESSION* pSession, const BinlogFilter* pFilter); + + // Reference to Filter instance + const BinlogFilter& m_filter; + +private: + // Internal states for filter operations + enum state_t + { + INACTIVE, // Fitering is not active + COMMAND_MODE, // Connected client in SQL mode: no filtering + BINLOG_MODE // Connected client in BINLOG_MODE: filter events + }; + +private: + // Event filtering member vars + uint32_t m_serverid; // server-id of connected slave + state_t m_state; // Internal state + bool m_skip; // Mark event skipping + bool m_complete_packet; // A complete received. Not implemented + bool m_crc; // CRC32 for events. Not implemented + bool m_large_payload; // Packet bigger than 16MB. Not implemented +};