MXS-701: binlog filtering

MXS-701: first implementation of binlog filter
This commit is contained in:
MassimilianoPinto
2017-11-10 19:54:15 +01:00
parent e586d84c47
commit 50b3c6a2de
6 changed files with 548 additions and 0 deletions

View File

@ -14,3 +14,4 @@ add_subdirectory(topfilter)
add_subdirectory(tpmfilter)
add_subdirectory(masking)
add_subdirectory(insertstream)
add_subdirectory(binlogfilter)

View File

@ -0,0 +1,3 @@
add_library(binlogfilter SHARED binlogfilter.cc binlogfiltersession.cc)
set_target_properties(binlogfilter PROPERTIES VERSION "1.0.0")
install_module(binlogfilter core)

View File

@ -0,0 +1,92 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
// All log messages from this module are prefixed with this
#define MXS_MODULE_NAME "binlogfilter"
#include "binlogfilter.hh"
// This declares a module in MaxScale
extern "C" MXS_MODULE* MXS_CREATE_MODULE()
{
static MXS_MODULE info =
{
MXS_MODULE_API_FILTER,
MXS_MODULE_IN_DEVELOPMENT,
MXS_FILTER_VERSION,
"A binlog event filter for slave servers",
"V1.0.0",
RCAP_TYPE_NONE,
&BinlogFilter::s_object, // This is defined in the MaxScale filter template
NULL, /* Process init. */
NULL, /* Process finish. */
NULL, /* Thread init. */
NULL, /* Thread finish. */
{
{"filter_events", MXS_MODULE_PARAM_BOOL, "false"},
{"skip_table", MXS_MODULE_PARAM_STRING, ""},
{"skip_db", MXS_MODULE_PARAM_STRING, ""},
{ MXS_END_MODULE_PARAMS }
}
};
return &info;
}
// BinlogFilter constructor
BinlogFilter::BinlogFilter(const MXS_CONFIG_PARAMETER* pParams)
: m_config(pParams)
{
}
// BinlogFilter destructor
BinlogFilter::~BinlogFilter()
{
}
// static: filter create routine
BinlogFilter* BinlogFilter::create(const char* zName,
char** pzOptions,
MXS_CONFIG_PARAMETER* pParams)
{
return new BinlogFilter(pParams);
}
// BinlogFilterSession create routine
BinlogFilterSession* BinlogFilter::newSession(MXS_SESSION* pSession)
{
return BinlogFilterSession::create(pSession, this);
}
// static
void BinlogFilter::diagnostics(DCB* pDcb) const
{
}
// static
json_t* BinlogFilter::diagnostics_json() const
{
return NULL;
}
// static
uint64_t BinlogFilter::getCapabilities()
{
return RCAP_TYPE_NONE;
}
// Return BinlogFilter active state
bool BinlogFilter::is_active() const
{
return m_config.active;
}

View File

@ -0,0 +1,93 @@
#pragma once
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <string>
#include <maxscale/cppdefs.hh>
#include <maxscale/filter.hh>
#include "binlogfiltersession.hh"
// Binlog Filter configuration
struct BinlogConfig
{
// Constructor
BinlogConfig(const MXS_CONFIG_PARAMETER* pParams)
: active(config_get_bool(pParams, "filter_events"))
, dbname(config_get_string(pParams, "skip_db"))
, table(config_get_string(pParams, "skip_table"))
{
};
// Destructor
~BinlogConfig() {};
// Members mapped to config options
bool active;
std::string dbname;
std::string table;
};
class BinlogFilter : public maxscale::Filter<BinlogFilter, BinlogFilterSession>
{
// Prevent copy-constructor and assignment operator usage
BinlogFilter(const BinlogFilter&);
BinlogFilter& operator = (const BinlogFilter&);
public:
~BinlogFilter();
// Creates a new filter instance
static BinlogFilter* create(const char* zName,
char** pzOptions,
MXS_CONFIG_PARAMETER* ppParams);
// Creates a new session for this filter
BinlogFilterSession* newSession(MXS_SESSION* pSession);
// Print diagnostics to a DCB
void diagnostics(DCB* pDcb) const;
// Returns JSON form diagnostic data
json_t* diagnostics_json() const;
// Get filter capabilities
uint64_t getCapabilities();
// Filter is active
bool is_active() const;
// Return reference to filter config
const BinlogConfig& getConfig() const
{
return m_config;
}
private:
// Constructor: used in the create function
BinlogFilter(const MXS_CONFIG_PARAMETER* pParams);
private:
/**
* Current configuration in maxscale.cnf
*
* [BinlogFilter]
* type=filter
* module=binlogfilter
* filter_events=On
* skip_table=t4
* skip_db=test
*
* Note: Only one table and one db right now
*/
BinlogConfig m_config;
};

View File

@ -0,0 +1,261 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
// All log messages from this module are prefixed with this
#define MXS_MODULE_NAME "binlogfilter"
#include <maxscale/protocol/mysql.h>
#include "binlogfilter.hh"
#include "binlogfiltersession.hh"
#include <inttypes.h>
// New packet which replaces the skipped events has 0 payload
#define NEW_PACKET_PAYLOD BINLOG_EVENT_HDR_LEN
// BinlogFilterSession constructor
BinlogFilterSession::BinlogFilterSession(MXS_SESSION* pSession,
const BinlogFilter* pFilter)
: mxs::FilterSession(pSession)
, m_filter(*pFilter)
, m_serverid(0)
, m_state(pFilter->is_active() ? COMMAND_MODE : INACTIVE)
, m_skip(false)
, m_complete_packet(true)
, m_crc(false)
{
MXS_NOTICE("Filter [%s] is %s",
MXS_MODULE_NAME,
m_filter.getConfig().active ? "enabled" : "disabled");
}
// BinlogFilterSession destructor
BinlogFilterSession::~BinlogFilterSession()
{
}
//static: create new session
BinlogFilterSession* BinlogFilterSession::create(MXS_SESSION* pSession,
const BinlogFilter* pFilter)
{
return new BinlogFilterSession(pSession, pFilter);
}
// route input data from client
int BinlogFilterSession::routeQuery(GWBUF* pPacket)
{
if (m_state != INACTIVE)
{
uint8_t *data = GWBUF_DATA(pPacket);
// We assume OK indicator, the first byte after MYSQL_HEADER_LEN is 0
// TODO: check complete packet or
// at least MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN bytes
switch ((int)MYSQL_GET_COMMAND(data))
{
case COM_REGISTER_SLAVE:
// Connected client is registering as Slave Server
m_serverid = gw_mysql_get_byte4(data + MYSQL_HEADER_LEN + 1);
MXS_INFO("Client is registering as "
"Slave server with ID %" PRIu32 "",
m_serverid);
break;
case COM_BINLOG_DUMP:
// Connected Slave server waits for binlog events
m_state = BINLOG_MODE;
MXS_INFO("Slave server %" PRIu32 " is waiting for binlog events.",
m_serverid);
break;
default:
// Connected client is using SQL mode
m_state = COMMAND_MODE;
break;
}
}
// Route input data
return mxs::FilterSession::routeQuery(pPacket);
}
// Reply data to client
int BinlogFilterSession::clientReply(GWBUF* pPacket)
{
if (m_state == BINLOG_MODE)
{
if (skipEvent(pPacket))
{
// Assuming ROW replication format:
// If transaction events needs to be skipped,
// they are replaced by an empty paylod packet
filterEvent(pPacket);
}
}
// Send data
return mxs::FilterSession::clientReply(pPacket);
}
// close session
void BinlogFilterSession::close()
{
if (m_state == BINLOG_MODE)
{
MXS_INFO("Slave server %" PRIu32 ": replication stopped.",
m_serverid);
}
}
// extract binlog replication header from event data
static inline void extractHeader(register const uint8_t *event,
register REP_HEADER *hdr)
{
hdr->payload_len = gw_mysql_get_byte3(event);
hdr->seqno = event[3];
hdr->ok = event[MYSQL_HEADER_LEN];
hdr->timestamp = gw_mysql_get_byte4(event + MYSQL_HEADER_LEN + 1);
hdr->event_type = event[MYSQL_HEADER_LEN + 1 + 4];
// TODO: add offsets in order to facilitate reading
hdr->serverid = gw_mysql_get_byte4(event + MYSQL_HEADER_LEN + 1 + 4 + 1);
hdr->event_size = gw_mysql_get_byte4(event + MYSQL_HEADER_LEN + 1 + 4 + 1 + 4);
hdr->next_pos = gw_mysql_get_byte4(event + MYSQL_HEADER_LEN + 1 + 4 + 1 + 4 + 4);
hdr->flags = gw_mysql_get_byte2(event + MYSQL_HEADER_LEN + 1 + 4 + 1 + 4 + 4 + 4);
MXS_INFO("Slave server %" PRIu32 ": clientReply, event_type [%d], "
"flags %d, event_size %" PRIu32 ", next_pos %" PRIu32 ", packet size %" PRIu32 "",
hdr->serverid,
hdr->event_type,
hdr->flags,
hdr->event_size,
hdr->next_pos,
hdr->payload_len);
}
// Check whether events in a transaction can be skipped.
// The triggering event is TABLE_MAP_EVENT.
bool BinlogFilterSession::skipEvent(GWBUF* buffer)
{
uint8_t *event = GWBUF_DATA(buffer);
REP_HEADER hdr;
// Extract Replication header event from event data
extractHeader(event, &hdr);
if (hdr.ok == 0)
{
switch(hdr.event_type)
{
case TABLE_MAP_EVENT:
// Check db/table and set m_skip accordingly
skipDatabaseTable(event, hdr);
break;
case XID_EVENT:
// COMMIT: reset m_skip if set and set next pos to 0
if (m_skip)
{
m_skip = false;
// Some events skipped.
// Set next pos to 0 instead of real one.
gw_mysql_set_byte4(event + MYSQL_HEADER_LEN + 1 + 4 + 1 + 4 + 4, 0);
MXS_INFO("Skipped events: Setting next_pos = 0 in XID_EVENT");
}
break;
default:
// Other events can be skipped or not, depending on m_skip value
break;
}
// m_skip could be true or false
return m_skip;
}
else
{
return false; // always false: no filtering
}
}
// Extract Dbname and Tabe name from TABLE_MAP event
static void inline extractTableInfo(const uint8_t *ptr,
char **dbname,
char **tblname)
{
// TODO: add offsets in order to facilitate reading
int db_len = *(ptr + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 8);
*dbname = (char *)(ptr + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 8 + 1);
*tblname = (char *)(ptr + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 8 + 1 + db_len + 1 + 1);
}
// Check whether a db/table can be skipped based on configuration
void BinlogFilterSession::skipDatabaseTable(const uint8_t* data,
const REP_HEADER& hdr)
{
// Check for TABBLE_MAP event:
// Each time this event is seen the m_skip is overwritten
if (hdr.ok == 0 && hdr.event_type == TABLE_MAP_EVENT)
{
char *db = NULL;
char *table = NULL;
const BinlogConfig& fConfig = m_filter.getConfig();
// Get db/table
extractTableInfo(data, &db, &table);
// Check match with configuration
m_skip = (bool)(strcmp(db, fConfig.dbname.c_str()) == 0 ||
strcmp(table, fConfig.table.c_str()) == 0);
MXS_INFO("Dbname is [%s], Table is [%s], Skip [%s]\n",
db ? db : "",
table ? table : "",
m_skip ? "Yes" : "No");
}
}
// Replace the current event: no memory allocation
void BinlogFilterSession::filterEvent(GWBUF* pPacket)
{
ss_dassert(m_skip == true);
uint8_t *ptr = GWBUF_DATA(pPacket);
// Set NEW event_type
ptr[MYSQL_HEADER_LEN + 1 + 4] = RAND_EVENT;
// SET ignorable flags
gw_mysql_set_byte2(ptr + MYSQL_HEADER_LEN + 1 + 4 + 1 + 4 + 4 + 4,
LOG_EVENT_IGNORABLE_F | LOG_EVENT_SKIP_REPLICATION_F);
// Set event_len, size of empty rand_event (header + 0 bytes)
gw_mysql_set_byte4(ptr + MYSQL_HEADER_LEN + 1 + 4 + 1 + 4,
BINLOG_EVENT_HDR_LEN + 0);
// Set next pos to 0
gw_mysql_set_byte4(ptr + MYSQL_HEADER_LEN + 1 + 4 + 1 + 4 + 4, 0);
// Set New Packet size: even_len + 1 byte replication status
gw_mysql_set_byte3(ptr, BINLOG_EVENT_HDR_LEN + 0 + 1);
MXS_INFO("All events belonging to this table will be skipped");
MXS_INFO("Filtered event #%d,"
"ok %d, type %d, flags %d, size %d, next_pos %d, packet_size %d\n",
ptr[3],
ptr[4],
RAND_EVENT,
gw_mysql_get_byte2(ptr + MYSQL_HEADER_LEN + 1 + 4 + 1 + 4 + 4 + 4),
gw_mysql_get_byte4(ptr + MYSQL_HEADER_LEN + 1 + 4 + 1 + 4),
gw_mysql_get_byte4(ptr + MYSQL_HEADER_LEN + 1 + 4 + 1 + 4 + 4),
gw_mysql_get_byte3(ptr));
// Remove useless bytes
pPacket = gwbuf_rtrim(pPacket,
gwbuf_length(pPacket) - (BINLOG_EVENT_HDR_LEN + 1 + 4));
}

View File

@ -0,0 +1,98 @@
#pragma once
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <maxscale/cppdefs.hh>
#include <maxscale/filter.hh>
#include "binlogfilter.hh"
//TODO: add in a separate .h file in common to BinlogRouter code
#define LOG_EVENT_IGNORABLE_F 0x0080
#define LOG_EVENT_SKIP_REPLICATION_F 0x8000
#define RAND_EVENT 0x000D
#define TABLE_MAP_EVENT 0x0013
#define XID_EVENT 0x0010
#define BINLOG_EVENT_HDR_LEN 19
typedef struct rep_header_t
{
int payload_len; /*< Payload length (24 bits) */
uint8_t seqno; /*< Response sequence number */
uint8_t ok; /*< OK Byte from packet */
uint32_t timestamp; /*< Timestamp - start of binlog record */
uint8_t event_type; /*< Binlog event type */
uint32_t serverid; /*< Server id of master */
uint32_t event_size; /*< Size of header, post-header and body */
uint32_t next_pos; /*< Position of next event */
uint16_t flags; /*< Event flags */
} REP_HEADER;
// End TODO
//
class BinlogFilter;
class BinlogFilterSession : public maxscale::FilterSession
{
// Prevent copy-constructor and assignment operator usage
BinlogFilterSession(const BinlogFilterSession&);
BinlogFilterSession& operator = (const BinlogFilterSession&);
public:
~BinlogFilterSession();
// Create a new filter session
static BinlogFilterSession* create(MXS_SESSION* pSession,
const BinlogFilter* pFilter);
// Called when a client session has been closed
void close();
// Handle a query from the client
int routeQuery(GWBUF* pPacket);
// Handle a reply from server
int clientReply(GWBUF* pPacket);
// Whether to skip current event
bool skipEvent(GWBUF* data);
// Skip database/table events in current trasaction
void skipDatabaseTable(const uint8_t* data, const REP_HEADER& hdr);
// Filter the replication event
void filterEvent(GWBUF* data);
private:
// Used in the create function
BinlogFilterSession(MXS_SESSION* pSession, const BinlogFilter* pFilter);
// Reference to Filter instance
const BinlogFilter& m_filter;
private:
// Internal states for filter operations
enum state_t
{
INACTIVE, // Fitering is not active
COMMAND_MODE, // Connected client in SQL mode: no filtering
BINLOG_MODE // Connected client in BINLOG_MODE: filter events
};
private:
// Event filtering member vars
uint32_t m_serverid; // server-id of connected slave
state_t m_state; // Internal state
bool m_skip; // Mark event skipping
bool m_complete_packet; // A complete received. Not implemented
bool m_crc; // CRC32 for events. Not implemented
bool m_large_payload; // Packet bigger than 16MB. Not implemented
};