MXS-701: Improve binlogfilter
Added `match` and `exclude` functionality. This allows versatile filtering without a large investment of development time by leveraging the benefits of PCRE2 regular expressions. Also cleaned up the filter and removed the single table matching and active parameter that were obsoleted by the regular expression parameters.
This commit is contained in:
parent
f3d96822d0
commit
047242a721
@ -761,6 +761,13 @@ qc_sql_mode_t qc_get_sql_mode();
|
||||
*/
|
||||
char** qc_get_table_names(GWBUF* stmt, int* size, bool fullnames);
|
||||
|
||||
/**
|
||||
* Free tables returned by qc_get_table_names
|
||||
*
|
||||
* @param names List of names
|
||||
* @param size Size of @c names
|
||||
*/
|
||||
void qc_free_table_names(char** names, int size);
|
||||
|
||||
/**
|
||||
* Returns a bitmask specifying the type(s) of the statement. The result
|
||||
|
@ -654,6 +654,19 @@ char** qc_get_table_names(GWBUF* query, int* tblsize, bool fullnames)
|
||||
return names;
|
||||
}
|
||||
|
||||
void qc_free_table_names(char** names, int tblsize)
|
||||
{
|
||||
if (names)
|
||||
{
|
||||
for (int i = 0; i < tblsize; i++)
|
||||
{
|
||||
MXS_FREE(names[i]);
|
||||
}
|
||||
|
||||
MXS_FREE(names);
|
||||
}
|
||||
}
|
||||
|
||||
char* qc_get_canonical(GWBUF* query)
|
||||
{
|
||||
QC_TRACE();
|
||||
|
@ -19,23 +19,23 @@
|
||||
// This declares a module in MaxScale
|
||||
extern "C" MXS_MODULE* MXS_CREATE_MODULE()
|
||||
{
|
||||
static const char desc[] = "A binlog event filter for slave servers";
|
||||
static MXS_MODULE info =
|
||||
{
|
||||
MXS_MODULE_API_FILTER,
|
||||
MXS_MODULE_IN_DEVELOPMENT,
|
||||
MXS_FILTER_VERSION,
|
||||
"A binlog event filter for slave servers",
|
||||
desc,
|
||||
"V1.0.0",
|
||||
RCAP_TYPE_NONE,
|
||||
&BinlogFilter::s_object, // This is defined in the MaxScale filter template
|
||||
NULL, /* Process init. */
|
||||
NULL, /* Process finish. */
|
||||
NULL, /* Thread init. */
|
||||
NULL, /* Thread finish. */
|
||||
&BinlogFilter::s_object,
|
||||
NULL,
|
||||
NULL,
|
||||
NULL,
|
||||
NULL,
|
||||
{
|
||||
{"filter_events", MXS_MODULE_PARAM_BOOL, "false"},
|
||||
{"skip_table", MXS_MODULE_PARAM_STRING, "" },
|
||||
{"skip_db", MXS_MODULE_PARAM_STRING, "" },
|
||||
{"match", MXS_MODULE_PARAM_REGEX },
|
||||
{"exclude", MXS_MODULE_PARAM_REGEX },
|
||||
{MXS_END_MODULE_PARAMS}
|
||||
}
|
||||
};
|
||||
@ -83,9 +83,3 @@ uint64_t BinlogFilter::getCapabilities()
|
||||
{
|
||||
return RCAP_TYPE_NONE;
|
||||
}
|
||||
|
||||
// Return BinlogFilter active state
|
||||
bool BinlogFilter::is_active() const
|
||||
{
|
||||
return m_config.active;
|
||||
}
|
||||
|
@ -18,26 +18,20 @@
|
||||
#include "binlogfiltersession.hh"
|
||||
|
||||
// Binlog Filter configuration
|
||||
class BinlogConfig
|
||||
struct BinlogConfig
|
||||
{
|
||||
public:
|
||||
// Constructor
|
||||
BinlogConfig(const MXS_CONFIG_PARAMETER* pParams)
|
||||
: active(config_get_bool(pParams, "filter_events"))
|
||||
, dbname(config_get_string(pParams, "skip_db"))
|
||||
, table(config_get_string(pParams, "skip_table"))
|
||||
: match(config_get_compiled_regex(pParams, "match", 0, nullptr))
|
||||
, md_match(match ? pcre2_match_data_create_from_pattern(match, nullptr) : nullptr)
|
||||
, exclude(config_get_compiled_regex(pParams, "exclude", 0, nullptr))
|
||||
, md_exclude(exclude ? pcre2_match_data_create_from_pattern(exclude, nullptr) : nullptr)
|
||||
{
|
||||
}
|
||||
|
||||
// Destructor
|
||||
~BinlogConfig()
|
||||
{
|
||||
}
|
||||
|
||||
// Members mapped to config options
|
||||
bool active;
|
||||
std::string dbname;
|
||||
std::string table;
|
||||
pcre2_code* match;
|
||||
pcre2_match_data* md_match;
|
||||
pcre2_code* exclude;
|
||||
pcre2_match_data* md_exclude;
|
||||
};
|
||||
|
||||
class BinlogFilter : public maxscale::Filter<BinlogFilter, BinlogFilterSession>
|
||||
@ -65,9 +59,6 @@ public:
|
||||
// Get filter capabilities
|
||||
uint64_t getCapabilities();
|
||||
|
||||
// Filter is active
|
||||
bool is_active() const;
|
||||
|
||||
// Return reference to filter config
|
||||
const BinlogConfig& getConfig() const
|
||||
{
|
||||
@ -79,17 +70,5 @@ private:
|
||||
BinlogFilter(const MXS_CONFIG_PARAMETER* pParams);
|
||||
|
||||
private:
|
||||
/**
|
||||
* Current configuration in maxscale.cnf
|
||||
*
|
||||
* [BinlogFilter]
|
||||
* type=filter
|
||||
* module=binlogfilter
|
||||
* filter_events=On
|
||||
* skip_table=t4
|
||||
* skip_db=test
|
||||
*
|
||||
* Note: Only one table and one db right now
|
||||
*/
|
||||
BinlogConfig m_config;
|
||||
};
|
||||
|
@ -44,13 +44,17 @@
|
||||
// All log messages from this module are prefixed with this
|
||||
#define MXS_MODULE_NAME "binlogfilter"
|
||||
|
||||
#include <zlib.h>
|
||||
#include <inttypes.h>
|
||||
#include <algorithm>
|
||||
|
||||
#include <maxscale/protocol/mysql.h>
|
||||
#include <maxscale/alloc.h>
|
||||
#include <maxscale/poll.h>
|
||||
|
||||
#include "binlogfilter.hh"
|
||||
#include "binlogfiltersession.hh"
|
||||
#include <zlib.h>
|
||||
#include <inttypes.h>
|
||||
|
||||
|
||||
// New packet which replaces the skipped events has 0 payload
|
||||
#define NEW_PACKET_PAYLOD BINLOG_EVENT_HDR_LEN
|
||||
@ -70,17 +74,7 @@ BinlogFilterSession::BinlogFilterSession(MXS_SESSION* pSession,
|
||||
const BinlogFilter* pFilter)
|
||||
: mxs::FilterSession(pSession)
|
||||
, m_filter(*pFilter)
|
||||
, m_serverid(0)
|
||||
, m_state(pFilter->is_active() ? COMMAND_MODE : INACTIVE)
|
||||
, m_skip(false)
|
||||
, m_crc(false)
|
||||
, m_large_left(0)
|
||||
, m_is_large(0)
|
||||
, m_sql_query(NULL)
|
||||
{
|
||||
MXS_NOTICE("Filter [%s] is %s",
|
||||
MXS_MODULE_NAME,
|
||||
m_filter.getConfig().active ? "enabled" : "disabled");
|
||||
}
|
||||
|
||||
/**
|
||||
@ -104,6 +98,16 @@ BinlogFilterSession* BinlogFilterSession::create(MXS_SESSION* pSession,
|
||||
return new BinlogFilterSession(pSession, pFilter);
|
||||
}
|
||||
|
||||
static bool is_master_binlog_checksum(GWBUF* buffer)
|
||||
{
|
||||
const char target[] = "SELECT @master_binlog_checksum";
|
||||
char query[1024]; // Large enough for most practical cases
|
||||
size_t bytes = gwbuf_copy_data(buffer, MYSQL_HEADER_LEN + 1, sizeof(query) - 1, (uint8_t*)query);
|
||||
query[bytes] = '\0';
|
||||
|
||||
return strcasestr(query, target);
|
||||
}
|
||||
|
||||
/**
|
||||
* Route input data from client.
|
||||
*
|
||||
@ -119,52 +123,31 @@ BinlogFilterSession* BinlogFilterSession::create(MXS_SESSION* pSession,
|
||||
*/
|
||||
int BinlogFilterSession::routeQuery(GWBUF* pPacket)
|
||||
{
|
||||
if (m_state != INACTIVE)
|
||||
uint8_t* data = GWBUF_DATA(pPacket);
|
||||
|
||||
switch (MYSQL_GET_COMMAND(data))
|
||||
{
|
||||
uint8_t* data = GWBUF_DATA(pPacket);
|
||||
case MXS_COM_REGISTER_SLAVE:
|
||||
// Connected client is registering as Slave Server
|
||||
m_serverid = gw_mysql_get_byte4(data + MYSQL_HEADER_LEN + 1);
|
||||
MXS_INFO("Client is registering as Slave server with ID %" PRIu32 "", m_serverid);
|
||||
break;
|
||||
|
||||
switch (MYSQL_GET_COMMAND(data))
|
||||
{
|
||||
case MXS_COM_REGISTER_SLAVE:
|
||||
// Connected client is registering as Slave Server
|
||||
m_serverid = gw_mysql_get_byte4(data + MYSQL_HEADER_LEN + 1);
|
||||
MXS_INFO("Client is registering as "
|
||||
"Slave server with ID %" PRIu32 "",
|
||||
m_serverid);
|
||||
break;
|
||||
case MXS_COM_BINLOG_DUMP:
|
||||
// Connected Slave server is waiting for binlog events
|
||||
m_state = BINLOG_MODE;
|
||||
MXS_INFO("Slave server %" PRIu32 " is waiting for binlog events.", m_serverid);
|
||||
break;
|
||||
|
||||
case MXS_COM_BINLOG_DUMP:
|
||||
// Connected Slave server is waiting for binlog events
|
||||
m_state = BINLOG_MODE;
|
||||
MXS_INFO("Slave server %" PRIu32 " is waiting for binlog events.",
|
||||
m_serverid);
|
||||
break;
|
||||
case MXS_COM_QUERY:
|
||||
// Connected client is using SQL mode
|
||||
m_state = COMMAND_MODE;
|
||||
m_reading_checksum = is_master_binlog_checksum(pPacket);
|
||||
break;
|
||||
|
||||
default:
|
||||
// Connected client is using SQL mode
|
||||
m_state = COMMAND_MODE;
|
||||
/**
|
||||
* TODO: remove this code when filters
|
||||
* will be able to pass some session informations
|
||||
* from "session->router_session"
|
||||
* The calling session (ROUTER_SLAVE from blr.h) is not accessible.
|
||||
*
|
||||
* With new maxscale filter features, simply add:
|
||||
* in 'case COM_REGISTER_SLAVE:'
|
||||
* m_crc = (SOME_STRUCT* )get_calling_session_info()->crc;
|
||||
*
|
||||
*/
|
||||
if (strcasestr((char*)data + MYSQL_HEADER_LEN + 1,
|
||||
"SELECT @master_binlog_checksum") != NULL)
|
||||
{
|
||||
if ((m_sql_query = gwbuf_clone(pPacket)) == NULL)
|
||||
{
|
||||
filterError(pPacket);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
// Not something we care about, just pass it through
|
||||
break;
|
||||
}
|
||||
|
||||
// Route input data
|
||||
@ -192,12 +175,10 @@ int BinlogFilterSession::clientReply(GWBUF* pPacket)
|
||||
* m_crc will be thus set in routeQuery.
|
||||
*/
|
||||
case COMMAND_MODE:
|
||||
if (m_sql_query != NULL
|
||||
&& !getReplicationChecksum(pPacket))
|
||||
if (m_reading_checksum)
|
||||
{
|
||||
// Free buffer and close client connection
|
||||
filterError(pPacket);
|
||||
return 0;
|
||||
getReplicationChecksum(pPacket);
|
||||
m_reading_checksum = false;
|
||||
}
|
||||
break;
|
||||
|
||||
@ -315,8 +296,7 @@ bool BinlogFilterSession::checkEvent(GWBUF* buffer,
|
||||
// Error in binlog stream: no filter
|
||||
m_state = ERRORED;
|
||||
m_skip = false;
|
||||
MXS_ERROR("Slave server %" PRIu32 " received error "
|
||||
"in replication stream, packet #%u",
|
||||
MXS_ERROR("Slave server %" PRIu32 " received error in replication stream, packet #%u",
|
||||
m_serverid,
|
||||
event[3]);
|
||||
}
|
||||
@ -337,9 +317,8 @@ bool BinlogFilterSession::checkEvent(GWBUF* buffer,
|
||||
break;
|
||||
|
||||
case MARIADB_ANNOTATE_ROWS_EVENT:
|
||||
// This even can come if replication mode is ROW
|
||||
// and it comes before TABLE_MAP event
|
||||
// m_skip can be set to true/false
|
||||
// This even can come if replication mode is ROW and it comes before TABLE_MAP event. It has no
|
||||
// effect so it can be safely replicated.
|
||||
checkAnnotate(event, hdr.event_size);
|
||||
break;
|
||||
|
||||
@ -372,8 +351,7 @@ bool BinlogFilterSession::checkEvent(GWBUF* buffer,
|
||||
fixEvent(event + MYSQL_HEADER_LEN + 1, hdr.event_size);
|
||||
|
||||
MXS_INFO("Skipped events: Setting next_pos = 0 in %s",
|
||||
event[4] == XID_EVENT ?
|
||||
"XID_EVENT" : "COMMIT");
|
||||
event[4] == XID_EVENT ? "XID_EVENT" : "COMMIT");
|
||||
}
|
||||
break;
|
||||
|
||||
@ -394,9 +372,7 @@ bool BinlogFilterSession::checkEvent(GWBUF* buffer,
|
||||
* @param dbname Pointer to pointer to database name
|
||||
* @param tblname Pointer to pointer to table name
|
||||
*/
|
||||
static void inline extract_table_info(const uint8_t* ptr,
|
||||
std::string& dbname,
|
||||
std::string& tblname)
|
||||
static std::string inline extract_table_info(const uint8_t* ptr)
|
||||
{
|
||||
/**
|
||||
* Extract dbname and table name from Table_map_log_event/TABLE_MAP_EVENT
|
||||
@ -407,8 +383,59 @@ static void inline extract_table_info(const uint8_t* ptr,
|
||||
int db_len = ptr[db_len_offset];
|
||||
int tbl_len = ptr[db_len_offset + 1 + db_len + 1]; // DB is NULL terminated
|
||||
|
||||
dbname.assign((char*)(ptr + db_len_offset + 1), db_len);
|
||||
tblname.assign((char*)(ptr + db_len_offset + 1 + db_len + 1 + 1), tbl_len);
|
||||
std::string dbname((char*)(ptr + db_len_offset + 1), db_len);
|
||||
std::string tblname((char*)(ptr + db_len_offset + 1 + db_len + 1 + 1), tbl_len);
|
||||
return dbname + "." + tblname;
|
||||
}
|
||||
|
||||
static bool should_skip(const BinlogConfig& config, const std::string& str)
|
||||
{
|
||||
bool skip = true;
|
||||
|
||||
if (!config.match
|
||||
|| pcre2_match(config.match, (PCRE2_SPTR)str.c_str(), PCRE2_ZERO_TERMINATED,
|
||||
0, 0, config.md_match, NULL) >= 0)
|
||||
{
|
||||
if (!config.exclude
|
||||
|| pcre2_match(config.exclude, (PCRE2_SPTR)str.c_str(), PCRE2_ZERO_TERMINATED, 0, 0,
|
||||
config.md_exclude, NULL) == PCRE2_ERROR_NOMATCH)
|
||||
{
|
||||
skip = false;
|
||||
}
|
||||
}
|
||||
|
||||
return skip;
|
||||
}
|
||||
|
||||
static bool should_skip_query(const BinlogConfig& config, const std::string& sql)
|
||||
{
|
||||
uint32_t pktlen = sql.size() + 1; // Payload and command byte
|
||||
GWBUF* buf = gwbuf_alloc(MYSQL_HEADER_LEN + pktlen);
|
||||
uint8_t* data = GWBUF_DATA(buf);
|
||||
|
||||
data[0] = pktlen;
|
||||
data[1] = pktlen >> 8;
|
||||
data[2] = pktlen >> 16;
|
||||
data[3] = 0;
|
||||
data[4] = (uint8_t)MXS_COM_QUERY;
|
||||
strcpy((char*)&data[5], sql.c_str());
|
||||
|
||||
bool rval = false;
|
||||
int n = 0;
|
||||
char** names = qc_get_table_names(buf, &n, true);
|
||||
|
||||
for (int i = 0; i < n; i++)
|
||||
{
|
||||
if (should_skip(config, names[i]))
|
||||
{
|
||||
rval = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
qc_free_table_names(names, n);
|
||||
gwbuf_free(buf);
|
||||
return rval;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -427,59 +454,7 @@ void BinlogFilterSession::skipDatabaseTable(const uint8_t* data,
|
||||
// Note: Each time this event is seen the m_skip is overwritten
|
||||
if (hdr.event_type == TABLE_MAP_EVENT)
|
||||
{
|
||||
// Get filter configuration
|
||||
const BinlogConfig& fConfig = m_filter.getConfig();
|
||||
|
||||
// Set m_skip to false and return with empty config values
|
||||
if (fConfig.dbname.empty() && fConfig.table.empty())
|
||||
{
|
||||
m_skip = false;
|
||||
return;
|
||||
}
|
||||
|
||||
std::string db;
|
||||
std::string table;
|
||||
|
||||
// Get db/table from event data
|
||||
extract_table_info(data, db, table);
|
||||
|
||||
/**
|
||||
* Check db/table match with configuration
|
||||
*
|
||||
* Note: currently only one dbname and table name in config.
|
||||
*
|
||||
* (1.1) config db is set and config table is not set:
|
||||
* - if current db matches m_skip = true (say "db.*")
|
||||
* (1.2) config db is set and config table is set
|
||||
* - both db and table should match for m_skip = true (db.table)
|
||||
* (2.1) No db match or config table not set:
|
||||
* - m_skip set to false;
|
||||
* (2.2) config db is not set:
|
||||
* - if config table is set and matches: m_skip = true (say *.table)
|
||||
*/
|
||||
if (!fConfig.dbname.empty()
|
||||
&& db == fConfig.dbname)
|
||||
{
|
||||
// Config Db name matches: 1.1 OR 1.2
|
||||
m_skip = fConfig.table.empty() /* 1.1 */
|
||||
|| table == fConfig.table; /* 1.2 */
|
||||
}
|
||||
else
|
||||
{
|
||||
// No Db name match or db config is not set: 2.1 OR 2.2
|
||||
// Check only table name if set, the dbname doesn't matter
|
||||
m_skip = (!fConfig.dbname.empty() || fConfig.table.empty())
|
||||
? // (2.1)
|
||||
false
|
||||
: // (2.2)
|
||||
(fConfig.dbname.empty()
|
||||
&& table == fConfig.table) == 0;
|
||||
}
|
||||
|
||||
MXS_INFO("TABLE_MAP_EVENT: Dbname is [%s], Table is [%s], Skip [%s]",
|
||||
db.c_str(),
|
||||
table.c_str(),
|
||||
m_skip ? "Yes" : "No");
|
||||
m_skip = should_skip(m_filter.getConfig(), extract_table_info(data));
|
||||
}
|
||||
}
|
||||
|
||||
@ -775,24 +750,17 @@ void BinlogFilterSession::filterError(GWBUF* pPacket)
|
||||
* @param pPacket The resultset
|
||||
* @return False on error
|
||||
*/
|
||||
bool BinlogFilterSession::getReplicationChecksum(GWBUF* pPacket)
|
||||
void BinlogFilterSession::getReplicationChecksum(GWBUF* pPacket)
|
||||
{
|
||||
char* crc;
|
||||
if ((crc = extract_column(pPacket, 1)) == NULL)
|
||||
if (char* crc = extract_column(pPacket, 1))
|
||||
{
|
||||
return false;
|
||||
if (strcasestr(crc, "CRC32"))
|
||||
{
|
||||
m_crc = true;
|
||||
}
|
||||
|
||||
MXS_FREE(crc);
|
||||
}
|
||||
|
||||
if (strcasestr(crc, "CRC32"))
|
||||
{
|
||||
m_crc = true;
|
||||
}
|
||||
|
||||
MXS_FREE(crc);
|
||||
gwbuf_free(m_sql_query);
|
||||
m_sql_query = NULL;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -854,58 +822,8 @@ void BinlogFilterSession::handleEventData(uint32_t len,
|
||||
m_is_large = false;
|
||||
}
|
||||
|
||||
MXS_INFO("Binlog Event, data_only: pkt #%d, received %" PRIu32 ", "
|
||||
"remaining %" PRIu32 " bytes\n",
|
||||
seqno,
|
||||
len,
|
||||
m_large_left);
|
||||
}
|
||||
|
||||
/**
|
||||
* The routines sets the DB.TABLE for matching in QUERY_EVENT
|
||||
*
|
||||
* The db_table can be set to empty value in case the default db is present
|
||||
* but no config is set.
|
||||
*
|
||||
* If a default db is present the match is only for table name
|
||||
*
|
||||
* @param db_table Reference to db.table string to fill
|
||||
* @param config Reference filter config
|
||||
* @param use_db The event comes with default db (caused by USE db stmt)
|
||||
*/
|
||||
void matchDbTableSQL(std::string& db_table,
|
||||
const BinlogConfig& config,
|
||||
bool use_db)
|
||||
{
|
||||
|
||||
/**
|
||||
* Db name and Table name match in a QUERY_EVENT event data
|
||||
*
|
||||
* Note:
|
||||
* - db_table comes as an empty string ("")
|
||||
* - with default db only table if set for match even if empty
|
||||
*/
|
||||
if (!use_db && !config.dbname.empty())
|
||||
{
|
||||
// Set [FULL] TABLE NAME for match
|
||||
// default db is not set and config db is set:
|
||||
// use "db."
|
||||
db_table = config.dbname;
|
||||
db_table += ".";
|
||||
// Note: table name is added at the end
|
||||
}
|
||||
|
||||
// Add the config table for matching:
|
||||
db_table += config.table;
|
||||
|
||||
/**
|
||||
* db_table can be now:
|
||||
* A) "" which disables the matching in the caller
|
||||
* B) if use_db is not set and config db is set:
|
||||
* "cfg_db.cfg_table" or "cfg_db." (say db.*)
|
||||
* C) if use_db is true or config db is not set:
|
||||
* "cfg_table" (say *.table)
|
||||
*/
|
||||
MXS_INFO("Binlog Event, data_only: pkt #%d, received %" PRIu32 ", remaining %" PRIu32 " bytes\n",
|
||||
seqno, len, m_large_left);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -925,20 +843,6 @@ void matchDbTableSQL(std::string& db_table,
|
||||
bool BinlogFilterSession::checkStatement(const uint8_t* event,
|
||||
const uint32_t event_size)
|
||||
{
|
||||
// Get filter configuration
|
||||
const BinlogConfig& fConfig = m_filter.getConfig();
|
||||
|
||||
// Set m_skip to false and return with empty config values
|
||||
if (fConfig.dbname.empty() && fConfig.table.empty())
|
||||
{
|
||||
m_skip = false;
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Handling of Query_log_event/QUERY_EVENT is based on this doc:
|
||||
* https://dev.mysql.com/doc/internals/en/event-data-for-specific-event-types.html
|
||||
*/
|
||||
int db_name_len, var_block_len, statement_len, var_block_len_offset, var_block_end;
|
||||
db_name_len = event[MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 4 + 4];
|
||||
var_block_len_offset = MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 4 + 4 + 1 + 2;
|
||||
@ -946,147 +850,28 @@ bool BinlogFilterSession::checkStatement(const uint8_t* event,
|
||||
var_block_end = var_block_len_offset + 2;
|
||||
|
||||
// SQL statement len
|
||||
statement_len = (MYSQL_HEADER_LEN + 1 + event_size) \
|
||||
- (var_block_end + var_block_len + db_name_len + 1) \
|
||||
statement_len = (MYSQL_HEADER_LEN + 1 + event_size)
|
||||
- (var_block_end + var_block_len + db_name_len + 1)
|
||||
- (m_crc ? 4 : 0);
|
||||
|
||||
// Set dbname (NULL terminated in the packet)
|
||||
std::string db_name = (char*)event + var_block_end + var_block_len;
|
||||
|
||||
// Set SQL statement, NULL is NOT present in the packet !
|
||||
std::string statement_sql((char*)event + var_block_end \
|
||||
+ var_block_len + db_name_len + 1,
|
||||
statement_len);
|
||||
std::string sql((char*)event + var_block_end + var_block_len + db_name_len + 1, statement_len);
|
||||
std::string lower_sql;
|
||||
std::transform(sql.begin(), sql.end(), std::back_inserter(lower_sql), tolower);
|
||||
|
||||
// Check for BEGIN (it comes for START TRANSACTION too)
|
||||
if (statement_sql.compare(0, 5, "BEGIN") == 0)
|
||||
if (lower_sql.find("commit") != std::string::npos)
|
||||
{
|
||||
// New transaction, reset m_skip anyway
|
||||
m_skip = false;
|
||||
return true;
|
||||
}
|
||||
|
||||
// Check for COMMIT
|
||||
if (statement_sql.compare(0, 6, "COMMIT") == 0)
|
||||
{
|
||||
// End of transaction, m_skip handled by the caller
|
||||
return false;
|
||||
}
|
||||
|
||||
// Default DB check with config
|
||||
if (!db_name_len || !checkUseDB(db_name, fConfig))
|
||||
{
|
||||
std::string db_table("");
|
||||
m_skip = should_skip_query(m_filter.getConfig(), sql);
|
||||
|
||||
// The Default db is not present in the event.
|
||||
// Create the TABLE or DB.TABLE or DB. match
|
||||
matchDbTableSQL(db_table, fConfig, db_name_len != 0);
|
||||
|
||||
// Match non empty db/table in the SQL statement
|
||||
m_skip = !db_table.empty()
|
||||
&& statement_sql.find(db_table) != std::string::npos;
|
||||
|
||||
MXS_INFO("QUERY_EVENT: config DB.TABLE is [%s], Skip [%s]",
|
||||
db_table.c_str(),
|
||||
m_skip ? "Yes" : "No");
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check whether default DB config match is enough
|
||||
* for event skipping
|
||||
*
|
||||
* The routines sets member variable m_skip to true if:
|
||||
* - config db is set, db name matches and no config table
|
||||
* Return is true in the above case or if dbname doesn't match.
|
||||
*
|
||||
* @param db_name Reference to default db in QUERY_EVENT
|
||||
* @param config Reference to filter config
|
||||
* @return True if match can be stopped, false otherwise.
|
||||
*/
|
||||
bool BinlogFilterSession::checkUseDB(const std::string& db_name,
|
||||
const BinlogConfig& config)
|
||||
void BinlogFilterSession::checkAnnotate(const uint8_t* event, const uint32_t event_size)
|
||||
{
|
||||
bool ret = false;
|
||||
|
||||
// Check match with configuration and set m_skip
|
||||
if ((!config.dbname.empty()
|
||||
&& db_name.compare(config.dbname) == 0))
|
||||
{
|
||||
// Config db exists and dbname in QUERY_EVENT matches
|
||||
// and config table is not set: set m_skip = true
|
||||
m_skip = config.table.empty();
|
||||
|
||||
if (m_skip)
|
||||
{
|
||||
// Return: no need to match the config table (say db.*)
|
||||
ret = true;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (!config.dbname.empty())
|
||||
{
|
||||
// No db match
|
||||
m_skip = false;
|
||||
// Return, no other checks
|
||||
ret = true;
|
||||
}
|
||||
}
|
||||
|
||||
MXS_INFO("QUERY_EVENT: Default DB is [%s], config [%s], Skip [%s], "
|
||||
"Stop matching [%s]",
|
||||
db_name.c_str(),
|
||||
config.dbname.c_str(),
|
||||
m_skip ? "Yes" : "No",
|
||||
ret ? "Yes" : "No");
|
||||
|
||||
// Return true stops the matching, false keeps it.
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks whether an ANNOTATE_ROWS event
|
||||
* can be skipped, based on config matching against
|
||||
* SQL statement (which start right after the even header)
|
||||
*
|
||||
* In case of match the member var m_skip is set to true
|
||||
*
|
||||
* @param event The QUERY_EVENT binlog event
|
||||
* @param event_size The binlog event size
|
||||
*/
|
||||
void BinlogFilterSession::checkAnnotate(const uint8_t* event,
|
||||
const uint32_t event_size)
|
||||
{
|
||||
// Get filter configuration
|
||||
const BinlogConfig& fConfig = m_filter.getConfig();
|
||||
|
||||
// Set m_skip to false and return with empty config values
|
||||
if (fConfig.dbname.empty() && fConfig.table.empty())
|
||||
{
|
||||
m_skip = false;
|
||||
return;
|
||||
}
|
||||
|
||||
// SQL statement len
|
||||
int statement_len = event_size \
|
||||
- (MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN) \
|
||||
- (m_crc ? 4 : 0);
|
||||
|
||||
std::string db_table("");
|
||||
std::string statement_sql((char*)event \
|
||||
+ MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN,
|
||||
statement_len);
|
||||
|
||||
// Create the TABLE or DB.TABLE or DB. match
|
||||
matchDbTableSQL(db_table, fConfig, false);
|
||||
|
||||
// Match non empty db/table in the SQL statement
|
||||
m_skip = !db_table.empty()
|
||||
&& statement_sql.find(db_table) != std::string::npos;
|
||||
|
||||
MXS_INFO("ANNOTATE_ROWS_EVENT: config DB.TABLE is [%s], Skip [%s]",
|
||||
db_table.c_str(),
|
||||
m_skip ? "Yes" : "No");
|
||||
int statement_len = event_size - (MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN) - (m_crc ? 4 : 0);
|
||||
std::string sql((char*)event + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN, statement_len);
|
||||
m_skip = should_skip_query(m_filter.getConfig(), sql);
|
||||
}
|
||||
|
@ -79,7 +79,7 @@ private:
|
||||
void skipDatabaseTable(const uint8_t* data, const REP_HEADER& hdr);
|
||||
|
||||
// Get Replication Checksum from registration query
|
||||
bool getReplicationChecksum(GWBUF* pPacket);
|
||||
void getReplicationChecksum(GWBUF* pPacket);
|
||||
|
||||
// Abort filter operations
|
||||
void filterError(GWBUF* pPacket);
|
||||
@ -103,10 +103,6 @@ private:
|
||||
bool checkStatement(const uint8_t* event,
|
||||
const uint32_t event_size);
|
||||
|
||||
// Check Default DB name extracted from QUERY_EVENT
|
||||
bool checkUseDB(const std::string& db_name,
|
||||
const BinlogConfig& config);
|
||||
|
||||
// Check DB.TABLE in ANNOTATE_ROWS event
|
||||
void checkAnnotate(const uint8_t* event,
|
||||
const uint32_t event_size);
|
||||
@ -116,18 +112,17 @@ private:
|
||||
enum state_t
|
||||
{
|
||||
ERRORED, // A blocking error occurred
|
||||
INACTIVE, // Fitering is not active
|
||||
COMMAND_MODE, // Connected client in SQL mode: no filtering
|
||||
BINLOG_MODE // Connected client in BINLOG_MODE: filter events
|
||||
};
|
||||
|
||||
private:
|
||||
// Event filtering member vars
|
||||
uint32_t m_serverid; // server-id of connected slave
|
||||
state_t m_state; // Internal state
|
||||
bool m_skip; // Mark event skipping
|
||||
bool m_crc; // CRC32 for events. Not implemented
|
||||
uint32_t m_large_left; // Remaining bytes of a large event
|
||||
bool m_is_large; // Large Event indicator
|
||||
GWBUF* m_sql_query; // SQL query buffer
|
||||
uint32_t m_serverid = 0; // server-id of connected slave
|
||||
state_t m_state = COMMAND_MODE; // Internal state
|
||||
bool m_skip = false; // Mark event skipping
|
||||
bool m_crc = false; // CRC32 for events. Not implemented
|
||||
uint32_t m_large_left = 0; // Remaining bytes of a large event
|
||||
bool m_is_large = false; // Large Event indicator
|
||||
bool m_reading_checksum = false;// Whether we are waiting for the binlog checksum response
|
||||
};
|
||||
|
Loading…
x
Reference in New Issue
Block a user