MXS-701: added support for DDLs and STATEMENT replication
MXS-701: added support for DDLs and STATEMENT replication
This commit is contained in:
@ -94,9 +94,6 @@ int BinlogFilterSession::routeQuery(GWBUF* pPacket)
|
||||
{
|
||||
uint8_t *data = GWBUF_DATA(pPacket);
|
||||
|
||||
// We assume OK indicator, the first byte after MYSQL_HEADER_LEN is 0
|
||||
// TODO: check complete packet or
|
||||
// at least MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN bytes
|
||||
switch (MYSQL_GET_COMMAND(data))
|
||||
{
|
||||
case COM_REGISTER_SLAVE:
|
||||
@ -287,14 +284,33 @@ bool BinlogFilterSession::checkEvent(GWBUF* buffer,
|
||||
// or is the begiining of large event.
|
||||
switch(hdr.event_type)
|
||||
{
|
||||
case MARIADB10_GTID_EVENT:
|
||||
// New transaction, reset m_skip anyway
|
||||
m_skip = false;
|
||||
break;
|
||||
case MARIADB_ANNOTATE_ROWS_EVENT:
|
||||
// This even can come if replication mode is ROW
|
||||
// and it comes before TABLE_MAP event
|
||||
// m_skip can be set to true/false
|
||||
checkAnnotate(event, hdr.event_size);
|
||||
break;
|
||||
case TABLE_MAP_EVENT:
|
||||
// Check db/table and set m_skip accordingly
|
||||
skipDatabaseTable(event, hdr);
|
||||
break;
|
||||
case QUERY_EVENT:
|
||||
//TODO handle the event and extract dbname or COMMIT
|
||||
// Handle the SQL statement: DDL, DML, BEGIN, COMMIT
|
||||
// If statement is COMMIT, then continue with next case.
|
||||
if (checkStatement(event, hdr.event_size))
|
||||
{
|
||||
break;
|
||||
}
|
||||
case XID_EVENT:
|
||||
// COMMIT: reset m_skip if set and set next pos to 0
|
||||
/** Note: This case is reached when event_type is
|
||||
* XID_EVENT or QUERY_EVENT with COMMIT
|
||||
*
|
||||
* reset m_skip if set and set next pos to 0
|
||||
*/
|
||||
if (m_skip)
|
||||
{
|
||||
m_skip = false;
|
||||
@ -304,7 +320,9 @@ bool BinlogFilterSession::checkEvent(GWBUF* buffer,
|
||||
*/
|
||||
fixEvent(event + MYSQL_HEADER_LEN + 1, hdr.event_size);
|
||||
|
||||
MXS_INFO("Skipped events: Setting next_pos = 0 in XID_EVENT/COMMIT");
|
||||
MXS_INFO("Skipped events: Setting next_pos = 0 in %s",
|
||||
event[4] == XID_EVENT ?
|
||||
"XID_EVENT" : "COMMIT");
|
||||
}
|
||||
break;
|
||||
default:
|
||||
@ -324,15 +342,21 @@ bool BinlogFilterSession::checkEvent(GWBUF* buffer,
|
||||
* @param dbname Pointer to pointer to database name
|
||||
* @param tblname Pointer to pointer to table name
|
||||
*/
|
||||
static void inline extractTableInfo(const uint8_t *ptr,
|
||||
char **dbname,
|
||||
char **tblname)
|
||||
static void inline extract_table_info(const uint8_t *ptr,
|
||||
std::string& dbname,
|
||||
std::string& tblname)
|
||||
{
|
||||
// TODO: add offsets in order to facilitate reading
|
||||
int db_len = *(ptr + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 8);
|
||||
/**
|
||||
* Extract dbname and table name from Table_map_log_event/TABLE_MAP_EVENT
|
||||
* https://dev.mysql.com/doc/internals/en/event-data-for-specific-event-types.html
|
||||
*/
|
||||
|
||||
*dbname = (char *)(ptr + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 8 + 1);
|
||||
*tblname = (char *)(ptr + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 8 + 1 + db_len + 1 + 1);
|
||||
int db_len_offset = MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 6 + 2;
|
||||
int db_len = ptr[db_len_offset];
|
||||
int tbl_len = ptr[db_len_offset + 1 + db_len + 1]; // DB is NULL terminated
|
||||
|
||||
dbname.assign((char *)(ptr + db_len_offset + 1), db_len);
|
||||
tblname.assign((char *)(ptr + db_len_offset + 1 + db_len + 1 + 1), tbl_len);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -348,25 +372,61 @@ void BinlogFilterSession::skipDatabaseTable(const uint8_t* data,
|
||||
const REP_HEADER& hdr)
|
||||
{
|
||||
// Check for TABLE_MAP event:
|
||||
// Each time this event is seen the m_skip is overwritten
|
||||
// Note: Each time this event is seen the m_skip is overwritten
|
||||
if (hdr.event_type == TABLE_MAP_EVENT)
|
||||
{
|
||||
char *db = NULL;
|
||||
char *table = NULL;
|
||||
|
||||
// Get filter configuration
|
||||
const BinlogConfig& fConfig = m_filter.getConfig();
|
||||
|
||||
// Get db/table
|
||||
extractTableInfo(data, &db, &table);
|
||||
// Set m_skip to false and return with empty config values
|
||||
if (fConfig.dbname.empty() && fConfig.table.empty())
|
||||
{
|
||||
m_skip = false;
|
||||
return;
|
||||
}
|
||||
|
||||
// Check match with configuration
|
||||
m_skip = (bool)(strcmp(db, fConfig.dbname.c_str()) == 0 ||
|
||||
strcmp(table, fConfig.table.c_str()) == 0);
|
||||
std::string db;
|
||||
std::string table;
|
||||
|
||||
MXS_INFO("Dbname is [%s], Table is [%s], Skip [%s]\n",
|
||||
db ? db : "",
|
||||
table ? table : "",
|
||||
// Get db/table from event data
|
||||
extract_table_info(data, db, table);
|
||||
|
||||
/**
|
||||
* Check db/table match with configuration
|
||||
*
|
||||
* Note: currently only one dbname and table name in config.
|
||||
*
|
||||
* (1.1) config db is set and config table is not set:
|
||||
* - if current db matches m_skip = true (say "db.*")
|
||||
* (1.2) config db is set and config table is set
|
||||
* - both db and table should match for m_skip = true (db.table)
|
||||
* (2.1) No db match or config table not set:
|
||||
* - m_skip set to false;
|
||||
* (2.2) config db is not set:
|
||||
* - if config table is set and matches: m_skip = true (say *.table)
|
||||
*/
|
||||
if (!fConfig.dbname.empty() &&
|
||||
db == fConfig.dbname)
|
||||
{
|
||||
// Config Db name matches: 1.1 OR 1.2
|
||||
m_skip = fConfig.table.empty() /* 1.1 */ ||
|
||||
table == fConfig.table; /* 1.2 */
|
||||
}
|
||||
else
|
||||
{
|
||||
// No Db name match or db config is not set: 2.1 OR 2.2
|
||||
// Check only table name if set, the dbname doesn't matter
|
||||
m_skip = (!fConfig.dbname.empty() || fConfig.table.empty()) ?
|
||||
// (2.1)
|
||||
false :
|
||||
// (2.2)
|
||||
(fConfig.dbname.empty() &&
|
||||
table == fConfig.table) == 0;
|
||||
}
|
||||
|
||||
MXS_INFO("TABLE_MAP_EVENT: Dbname is [%s], Table is [%s], Skip [%s]",
|
||||
db.c_str(),
|
||||
table.c_str(),
|
||||
m_skip ? "Yes" : "No");
|
||||
}
|
||||
}
|
||||
@ -669,3 +729,233 @@ void BinlogFilterSession::handleEventData(uint32_t len,
|
||||
len,
|
||||
m_large_left);
|
||||
}
|
||||
|
||||
/**
|
||||
* The routines sets the DB.TABLE for matching in QUERY_EVENT
|
||||
*
|
||||
* The db_table can be set to empty value in case the default db is present
|
||||
* but no config is set.
|
||||
*
|
||||
* If a default db is present the match is only for table name
|
||||
*
|
||||
* @param db_table Reference to db.table string to fill
|
||||
* @param config Reference filter config
|
||||
* @param use_db The event comes with default db (caused by USE db stmt)
|
||||
*/
|
||||
void matchDbTableSQL(std::string& db_table,
|
||||
const BinlogConfig& config,
|
||||
bool use_db)
|
||||
{
|
||||
|
||||
/**
|
||||
* Db name and Table name match in a QUERY_EVENT event data
|
||||
*
|
||||
* Note:
|
||||
* - db_table comes as an empty string ("")
|
||||
* - with default db only table if set for match even if empty
|
||||
*/
|
||||
if (!use_db && !config.dbname.empty())
|
||||
{
|
||||
// Set [FULL] TABLE NAME for match
|
||||
// default db is not set and config db is set:
|
||||
// use "db."
|
||||
db_table = config.dbname;
|
||||
db_table += ".";
|
||||
// Note: table name is added at the end
|
||||
}
|
||||
|
||||
// Add the config table for matching:
|
||||
db_table += config.table;
|
||||
|
||||
/**
|
||||
* db_table can be now:
|
||||
* A) "" which disables the matching in the caller
|
||||
* B) if use_db is not set and config db is set:
|
||||
* "cfg_db.cfg_table" or "cfg_db." (say db.*)
|
||||
* C) if use_db is true or config db is not set:
|
||||
* "cfg_table" (say *.table)
|
||||
*/
|
||||
}
|
||||
|
||||
/**
|
||||
* Check wether the config for db/table filtering is found in
|
||||
* the SQL statement inside QUERY_EVENT binlog event.
|
||||
* Note: COMMIT is an exception here, routine returns false
|
||||
* and the called will set m_skip to the right value.
|
||||
*
|
||||
* In case of config match the member variable m_skip is set to true.
|
||||
* With empty config it returns true and skipping is always false.
|
||||
*
|
||||
*
|
||||
* @param event The QUERY_EVENT binlog event
|
||||
* @param event_size The binlog event size
|
||||
* @return False for COMMIT, true otherwise
|
||||
*/
|
||||
bool BinlogFilterSession::checkStatement(const uint8_t* event,
|
||||
const uint32_t event_size)
|
||||
{
|
||||
// Get filter configuration
|
||||
const BinlogConfig& fConfig = m_filter.getConfig();
|
||||
|
||||
// Set m_skip to false and return with empty config values
|
||||
if (fConfig.dbname.empty() && fConfig.table.empty())
|
||||
{
|
||||
m_skip = false;
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Handling of Query_log_event/QUERY_EVENT is based on this doc:
|
||||
* https://dev.mysql.com/doc/internals/en/event-data-for-specific-event-types.html
|
||||
*/
|
||||
int db_name_len, var_block_len, statement_len, var_block_len_offset, var_block_end;
|
||||
db_name_len = event[MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 4 + 4];
|
||||
var_block_len_offset = MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 4 + 4 + 1 + 2;
|
||||
var_block_len = gw_mysql_get_byte2(event + var_block_len_offset);
|
||||
var_block_end = var_block_len_offset + 2;
|
||||
|
||||
// SQL statement len
|
||||
statement_len = (MYSQL_HEADER_LEN + 1 + event_size) - \
|
||||
(var_block_end + var_block_len + db_name_len + 1) - \
|
||||
(m_crc ? 4 : 0);
|
||||
|
||||
// Set dbname (NULL terminated in the packet)
|
||||
std::string db_name = (char *)event + var_block_end + var_block_len;
|
||||
|
||||
// Set SQL statement, NULL is NOT present in the packet !
|
||||
std::string statement_sql((char *)event + var_block_end + \
|
||||
var_block_len + db_name_len + 1,
|
||||
statement_len);
|
||||
|
||||
// Check for BEGIN (it comes for START TRANSACTION too)
|
||||
if (statement_sql.compare(0, 5, "BEGIN") == 0)
|
||||
{
|
||||
// New transaction, reset m_skip anyway
|
||||
m_skip = false;
|
||||
return true;
|
||||
}
|
||||
|
||||
// Check for COMMIT
|
||||
if (statement_sql.compare(0, 6, "COMMIT") == 0)
|
||||
{
|
||||
// End of transaction, m_skip handled by the caller
|
||||
return false;
|
||||
}
|
||||
|
||||
// Default DB check with config
|
||||
if (!db_name_len || !checkUseDB(db_name, fConfig))
|
||||
{
|
||||
std::string db_table("");
|
||||
|
||||
// The Default db is not present in the event.
|
||||
// Create the TABLE or DB.TABLE or DB. match
|
||||
matchDbTableSQL(db_table, fConfig, db_name_len != 0);
|
||||
|
||||
// Match non empty db/table in the SQL statement
|
||||
m_skip = !db_table.empty() &&
|
||||
statement_sql.find(db_table) != std::string::npos;
|
||||
|
||||
MXS_INFO("QUERY_EVENT: config DB.TABLE is [%s], Skip [%s]",
|
||||
db_table.c_str(),
|
||||
m_skip ? "Yes" : "No");
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check whether default DB config match is enough
|
||||
* for event skipping
|
||||
*
|
||||
* The routines sets member variable m_skip to true if:
|
||||
* - config db is set, db name matches and no config table
|
||||
* Return is true in the above case or if dbname doesn't match.
|
||||
*
|
||||
* @param db_name Reference to default db in QUERY_EVENT
|
||||
* @param config Reference to filter config
|
||||
* @return True if match can be stopped, false otherwise.
|
||||
*/
|
||||
bool BinlogFilterSession::checkUseDB(const std::string& db_name,
|
||||
const BinlogConfig& config)
|
||||
{
|
||||
bool ret = false;
|
||||
|
||||
// Check match with configuration and set m_skip
|
||||
if ((!config.dbname.empty() &&
|
||||
db_name.compare(config.dbname) == 0))
|
||||
{
|
||||
// Config db exists and dbname in QUERY_EVENT matches
|
||||
// and config table is not set: set m_skip = true
|
||||
m_skip = config.table.empty();
|
||||
|
||||
if (m_skip)
|
||||
{
|
||||
// Return: no need to match the config table (say db.*)
|
||||
ret = true;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (!config.dbname.empty())
|
||||
{
|
||||
// No db match
|
||||
m_skip = false;
|
||||
// Return, no other checks
|
||||
ret = true;
|
||||
}
|
||||
}
|
||||
|
||||
MXS_INFO("QUERY_EVENT: Default DB is [%s], config [%s], Skip [%s], "
|
||||
"Stop matching [%s]",
|
||||
db_name.c_str(),
|
||||
config.dbname.c_str(),
|
||||
m_skip ? "Yes" : "No",
|
||||
ret ? "Yes" : "No");
|
||||
|
||||
// Return true stops the matching, false keeps it.
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks whether an ANNOTATE_ROWS event
|
||||
* can be skipped, based on config matching against
|
||||
* SQL statement (which start right after the even header)
|
||||
*
|
||||
* In case of match the member var m_skip is set to true
|
||||
*
|
||||
* @param event The QUERY_EVENT binlog event
|
||||
* @param event_size The binlog event size
|
||||
*/
|
||||
void BinlogFilterSession::checkAnnotate(const uint8_t* event,
|
||||
const uint32_t event_size)
|
||||
{
|
||||
// Get filter configuration
|
||||
const BinlogConfig& fConfig = m_filter.getConfig();
|
||||
|
||||
// Set m_skip to false and return with empty config values
|
||||
if (fConfig.dbname.empty() && fConfig.table.empty())
|
||||
{
|
||||
m_skip = false;
|
||||
return;
|
||||
}
|
||||
|
||||
// SQL statement len
|
||||
int statement_len = event_size - \
|
||||
(MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN) - \
|
||||
(m_crc ? 4 : 0);
|
||||
|
||||
std::string db_table("");
|
||||
std::string statement_sql((char *)event + \
|
||||
MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN,
|
||||
statement_len);
|
||||
|
||||
// Create the TABLE or DB.TABLE or DB. match
|
||||
matchDbTableSQL(db_table, fConfig, false);
|
||||
|
||||
// Match non empty db/table in the SQL statement
|
||||
m_skip = !db_table.empty() &&
|
||||
statement_sql.find(db_table) != std::string::npos;
|
||||
|
||||
MXS_INFO("ANNOTATE_ROWS_EVENT: config DB.TABLE is [%s], Skip [%s]",
|
||||
db_table.c_str(),
|
||||
m_skip ? "Yes" : "No");
|
||||
}
|
||||
|
@ -23,8 +23,12 @@
|
||||
#define TABLE_MAP_EVENT 0x0013
|
||||
#define XID_EVENT 0x0010
|
||||
#define QUERY_EVENT 0x0002
|
||||
#define MARIADB10_GTID_EVENT 0x00a2
|
||||
#define MARIADB_ANNOTATE_ROWS_EVENT 0x00a0
|
||||
#define BINLOG_EVENT_HDR_LEN 19
|
||||
|
||||
class BinlogConfig;
|
||||
|
||||
typedef struct rep_header_t
|
||||
{
|
||||
int payload_len; /*< Payload length (24 bits) */
|
||||
@ -94,6 +98,18 @@ private:
|
||||
// Handle event data
|
||||
void handleEventData(uint32_t len, const uint8_t seqno);
|
||||
|
||||
// Check SQL statement in QUERY_EVENT
|
||||
bool checkStatement(const uint8_t* event,
|
||||
const uint32_t event_size);
|
||||
|
||||
// Check Default DB name extracted from QUERY_EVENT
|
||||
bool checkUseDB(const std::string& db_name,
|
||||
const BinlogConfig& config);
|
||||
|
||||
// Check DB.TABLE in ANNOTATE_ROWS event
|
||||
void checkAnnotate(const uint8_t* event,
|
||||
const uint32_t event_size);
|
||||
|
||||
private:
|
||||
// Internal states for filter operations
|
||||
enum state_t
|
||||
|
Reference in New Issue
Block a user