MXS-701: Clean up binlogfilter
Removed unused code, ordered function definitions that declarations aren't needed, changed functions to pass pointers to the event body instead of the start of the protocol packet.
This commit is contained in:
parent
5869f5369c
commit
82b3ffdf60
@ -55,21 +55,12 @@
|
||||
#include "binlogfilter.hh"
|
||||
#include "binlogfiltersession.hh"
|
||||
|
||||
|
||||
// New packet which replaces the skipped events has 0 payload
|
||||
#define NEW_PACKET_PAYLOD BINLOG_EVENT_HDR_LEN
|
||||
|
||||
static char* extract_column(GWBUF* buf, int col);
|
||||
static void event_set_crc32(uint8_t* event, uint32_t event_size);
|
||||
static void extract_header(register const uint8_t* event,
|
||||
register REP_HEADER* hdr);
|
||||
/**
|
||||
* BinlogFilterSession constructor
|
||||
*
|
||||
* @param pSession The calling routing/filter session
|
||||
* @param pFilter Pointer to filter configuration
|
||||
*/
|
||||
|
||||
BinlogFilterSession::BinlogFilterSession(MXS_SESSION* pSession,
|
||||
const BinlogFilter* pFilter)
|
||||
: mxs::FilterSession(pSession)
|
||||
@ -154,6 +145,35 @@ int BinlogFilterSession::routeQuery(GWBUF* pPacket)
|
||||
return mxs::FilterSession::routeQuery(pPacket);
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract binlog replication header from event data
|
||||
*
|
||||
* @param event The replication event
|
||||
* @param hdr Pointer to repliction header to fill
|
||||
*/
|
||||
static void extract_header(register const uint8_t* event,
|
||||
register REP_HEADER* hdr)
|
||||
{
|
||||
hdr->seqno = event[3];
|
||||
hdr->payload_len = gw_mysql_get_byte3(event);
|
||||
hdr->ok = event[MYSQL_HEADER_LEN];
|
||||
if (hdr->ok != 0)
|
||||
{
|
||||
// Don't parse data in case of Error in Replication Stream
|
||||
return;
|
||||
}
|
||||
|
||||
// event points to Event Header (19 bytes)
|
||||
event += MYSQL_HEADER_LEN + 1;
|
||||
hdr->timestamp = gw_mysql_get_byte4(event);
|
||||
hdr->event_type = event[4];
|
||||
// TODO: add offsets in order to facilitate reading
|
||||
hdr->serverid = gw_mysql_get_byte4(event + 4 + 1);
|
||||
hdr->event_size = gw_mysql_get_byte4(event + 4 + 1 + 4);
|
||||
hdr->next_pos = gw_mysql_get_byte4(event + 4 + 1 + 4 + 4);
|
||||
hdr->flags = gw_mysql_get_byte2(event + 4 + 1 + 4 + 4 + 4);
|
||||
}
|
||||
|
||||
/**
|
||||
* Reply data to client: Binlog events can be filtered
|
||||
*
|
||||
@ -201,7 +221,7 @@ int BinlogFilterSession::clientReply(GWBUF* pPacket)
|
||||
{
|
||||
// Handle data part of a large event:
|
||||
// Packet sequence is at offset 3
|
||||
handleEventData(len, event[3]);
|
||||
handleEventData(len);
|
||||
}
|
||||
|
||||
// If transaction events need to be skipped,
|
||||
@ -225,40 +245,6 @@ int BinlogFilterSession::clientReply(GWBUF* pPacket)
|
||||
*/
|
||||
void BinlogFilterSession::close()
|
||||
{
|
||||
if (m_state == BINLOG_MODE)
|
||||
{
|
||||
MXS_DEBUG("Slave server %" PRIu32 ": replication stopped.",
|
||||
m_serverid);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract binlog replication header from event data
|
||||
*
|
||||
* @param event The replication event
|
||||
* @param hdr Pointer to repliction header to fill
|
||||
*/
|
||||
static void extract_header(register const uint8_t* event,
|
||||
register REP_HEADER* hdr)
|
||||
{
|
||||
hdr->seqno = event[3];
|
||||
hdr->payload_len = gw_mysql_get_byte3(event);
|
||||
hdr->ok = event[MYSQL_HEADER_LEN];
|
||||
if (hdr->ok != 0)
|
||||
{
|
||||
// Don't parse data in case of Error in Replication Stream
|
||||
return;
|
||||
}
|
||||
|
||||
// event points to Event Header (19 bytes)
|
||||
event += MYSQL_HEADER_LEN + 1;
|
||||
hdr->timestamp = gw_mysql_get_byte4(event);
|
||||
hdr->event_type = event[4];
|
||||
// TODO: add offsets in order to facilitate reading
|
||||
hdr->serverid = gw_mysql_get_byte4(event + 4 + 1);
|
||||
hdr->event_size = gw_mysql_get_byte4(event + 4 + 1 + 4);
|
||||
hdr->next_pos = gw_mysql_get_byte4(event + 4 + 1 + 4 + 4);
|
||||
hdr->flags = gw_mysql_get_byte2(event + 4 + 1 + 4 + 4 + 4);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -278,15 +264,15 @@ bool BinlogFilterSession::checkEvent(GWBUF* buffer,
|
||||
mxb_assert(!m_is_large);
|
||||
|
||||
uint8_t* event = GWBUF_DATA(buffer);
|
||||
uint8_t* body = event + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN;
|
||||
uint32_t body_size = hdr.event_size - BINLOG_EVENT_HDR_LEN;
|
||||
|
||||
if (hdr.ok != 0)
|
||||
{
|
||||
// Error in binlog stream: no filter
|
||||
m_state = ERRORED;
|
||||
m_skip = false;
|
||||
MXS_ERROR("Slave server %" PRIu32 " received error in replication stream, packet #%u",
|
||||
m_serverid,
|
||||
event[3]);
|
||||
MXS_INFO("Slave server %" PRIu32 " received error in replication stream", m_serverid);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -307,18 +293,18 @@ bool BinlogFilterSession::checkEvent(GWBUF* buffer,
|
||||
case MARIADB_ANNOTATE_ROWS_EVENT:
|
||||
// This even can come if replication mode is ROW and it comes before TABLE_MAP event. It has no
|
||||
// effect so it can be safely replicated.
|
||||
checkAnnotate(event, hdr.event_size);
|
||||
checkAnnotate(body, body_size);
|
||||
break;
|
||||
|
||||
case TABLE_MAP_EVENT:
|
||||
// Check db/table and set m_skip accordingly
|
||||
skipDatabaseTable(event, hdr);
|
||||
skipDatabaseTable(body);
|
||||
break;
|
||||
|
||||
case QUERY_EVENT:
|
||||
// Handle the SQL statement: DDL, DML, BEGIN, COMMIT
|
||||
// If statement is COMMIT, then continue with next case.
|
||||
if (checkStatement(event, hdr.event_size))
|
||||
if (checkStatement(body, body_size))
|
||||
{
|
||||
break;
|
||||
}
|
||||
@ -364,7 +350,7 @@ static std::string inline extract_table_info(const uint8_t* ptr)
|
||||
* https://dev.mysql.com/doc/internals/en/event-data-for-specific-event-types.html
|
||||
*/
|
||||
|
||||
int db_len_offset = MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 6 + 2;
|
||||
int db_len_offset = 6 + 2;
|
||||
int db_len = ptr[db_len_offset];
|
||||
int tbl_len = ptr[db_len_offset + 1 + db_len + 1]; // DB is NULL terminated
|
||||
|
||||
@ -432,17 +418,24 @@ static bool should_skip_query(const BinlogConfig& config, const std::string& sql
|
||||
* @param data Binlog event data
|
||||
* @param hdr Reference to replication event header
|
||||
*/
|
||||
void BinlogFilterSession::skipDatabaseTable(const uint8_t* data,
|
||||
const REP_HEADER& hdr)
|
||||
void BinlogFilterSession::skipDatabaseTable(const uint8_t* data)
|
||||
{
|
||||
// Check for TABLE_MAP event:
|
||||
// Note: Each time this event is seen the m_skip is overwritten
|
||||
if (hdr.event_type == TABLE_MAP_EVENT)
|
||||
{
|
||||
std::string table = extract_table_info(data);
|
||||
m_skip = should_skip(m_filter.getConfig(), table);
|
||||
MXS_INFO("[%s] TABLE_MAP: %s", m_skip ? "SKIP" : " ", table.c_str());
|
||||
}
|
||||
std::string table = extract_table_info(data);
|
||||
m_skip = should_skip(m_filter.getConfig(), table);
|
||||
MXS_INFO("[%s] TABLE_MAP: %s", m_skip ? "SKIP" : " ", table.c_str());
|
||||
}
|
||||
|
||||
/**
|
||||
* Set CRC32 in the event buffer
|
||||
*
|
||||
* @param event Pointer to event data
|
||||
* @param event_size The event size
|
||||
*/
|
||||
static void event_set_crc32(uint8_t* event, uint32_t event_size)
|
||||
{
|
||||
uint32_t chksum = crc32(0L, NULL, 0);
|
||||
chksum = crc32(chksum, event, event_size - 4);
|
||||
gw_mysql_set_byte4(event + event_size - 4, chksum);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -604,19 +597,6 @@ void BinlogFilterSession::replaceEvent(GWBUF** ppPacket)
|
||||
|
||||
// Fix Event Next pos = 0 and set new CRC32
|
||||
fixEvent(ptr + MYSQL_HEADER_LEN + 1, new_event_size);
|
||||
|
||||
// Log the replaced event
|
||||
// Now point to event_size offset
|
||||
event_header_offset = MYSQL_HEADER_LEN + 1 + 4 + 1 + 4;
|
||||
MXS_DEBUG("Filtered event #%d, "
|
||||
"ok %d, type %d, flags %d, size %d, next_pos %d, packet_size %d\n",
|
||||
ptr[3],
|
||||
ptr[4],
|
||||
RAND_EVENT,
|
||||
gw_mysql_get_byte2(ptr + event_header_offset + 4 + 4),
|
||||
gw_mysql_get_byte4(ptr + event_header_offset),
|
||||
gw_mysql_get_byte4(ptr + event_header_offset + 4),
|
||||
gw_mysql_get_byte3(ptr));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -701,21 +681,6 @@ static char* extract_column(GWBUF* buf, int col)
|
||||
return rval;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set CRC32 in the event buffer
|
||||
*
|
||||
* @param event Pointer to event data
|
||||
* @param event_size The event size
|
||||
*/
|
||||
static void event_set_crc32(uint8_t* event, uint32_t event_size)
|
||||
{
|
||||
uint32_t chksum = crc32(0L, NULL, 0);
|
||||
chksum = crc32(chksum,
|
||||
event,
|
||||
event_size - 4);
|
||||
gw_mysql_set_byte4(event + event_size - 4, chksum);
|
||||
}
|
||||
|
||||
/**
|
||||
* Abort filter operation
|
||||
*
|
||||
@ -772,12 +737,6 @@ void BinlogFilterSession::handlePackets(uint32_t len, const REP_HEADER& hdr)
|
||||
|
||||
// Set remaining data to receive accordingy to hdr.event_size
|
||||
m_large_left = hdr.event_size - (MYSQL_PACKET_LENGTH_MAX - 1);
|
||||
|
||||
// Log large event receiving
|
||||
MXS_DEBUG("Large event start: size %" PRIu32 ", "
|
||||
"remaining %" PRIu32 " bytes",
|
||||
hdr.event_size,
|
||||
m_large_left);
|
||||
}
|
||||
}
|
||||
|
||||
@ -791,8 +750,7 @@ void BinlogFilterSession::handlePackets(uint32_t len, const REP_HEADER& hdr)
|
||||
* @param len Received payload size
|
||||
* @param seqno Packet seqno, logging only
|
||||
*/
|
||||
void BinlogFilterSession::handleEventData(uint32_t len,
|
||||
const uint8_t seqno)
|
||||
void BinlogFilterSession::handleEventData(uint32_t len)
|
||||
{
|
||||
/**
|
||||
* Received bytes are part of a large event transmission
|
||||
@ -824,19 +782,13 @@ void BinlogFilterSession::handleEventData(uint32_t len,
|
||||
* @param event_size The binlog event size
|
||||
* @return False for COMMIT, true otherwise
|
||||
*/
|
||||
bool BinlogFilterSession::checkStatement(const uint8_t* event,
|
||||
const uint32_t event_size)
|
||||
bool BinlogFilterSession::checkStatement(const uint8_t* event, const uint32_t event_size)
|
||||
{
|
||||
int db_name_len, var_block_len, statement_len, var_block_len_offset, var_block_end;
|
||||
db_name_len = event[MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 4 + 4];
|
||||
var_block_len_offset = MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 4 + 4 + 1 + 2;
|
||||
var_block_len = gw_mysql_get_byte2(event + var_block_len_offset);
|
||||
var_block_end = var_block_len_offset + 2;
|
||||
|
||||
// SQL statement len
|
||||
statement_len = (MYSQL_HEADER_LEN + 1 + event_size)
|
||||
- (var_block_end + var_block_len + db_name_len + 1)
|
||||
- (m_crc ? 4 : 0);
|
||||
int db_name_len = event[4 + 4];
|
||||
int var_block_len_offset = 4 + 4 + 1 + 2;
|
||||
int var_block_len = gw_mysql_get_byte2(event + var_block_len_offset);
|
||||
int var_block_end = var_block_len_offset + 2;
|
||||
int statement_len = event_size - var_block_end - var_block_len - db_name_len - 1 - (m_crc ? 4 : 0);
|
||||
|
||||
// Set SQL statement, NULL is NOT present in the packet !
|
||||
std::string sql((char*)event + var_block_end + var_block_len + db_name_len + 1, statement_len);
|
||||
@ -856,9 +808,7 @@ bool BinlogFilterSession::checkStatement(const uint8_t* event,
|
||||
|
||||
void BinlogFilterSession::checkAnnotate(const uint8_t* event, const uint32_t event_size)
|
||||
{
|
||||
int statement_len = event_size - (MYSQL_HEADER_LEN + BINLOG_EVENT_HDR_LEN);
|
||||
std::string sql((char*)event + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN - 1, statement_len);
|
||||
|
||||
std::string sql((char*)event, event_size - (m_crc ? 4 : 0));
|
||||
m_skip = should_skip_query(m_filter.getConfig(), sql);
|
||||
MXS_INFO("[%s] Annotate: %s", m_skip ? "SKIP" : " ", sql.c_str());
|
||||
}
|
||||
|
@ -76,7 +76,7 @@ private:
|
||||
const BinlogFilter& m_filter;
|
||||
|
||||
// Skip database/table events in current trasaction
|
||||
void skipDatabaseTable(const uint8_t* data, const REP_HEADER& hdr);
|
||||
void skipDatabaseTable(const uint8_t* data);
|
||||
|
||||
// Get Replication Checksum from registration query
|
||||
void getReplicationChecksum(GWBUF* pPacket);
|
||||
@ -97,7 +97,7 @@ private:
|
||||
void handlePackets(uint32_t len, const REP_HEADER& hdr);
|
||||
|
||||
// Handle event data
|
||||
void handleEventData(uint32_t len, const uint8_t seqno);
|
||||
void handleEventData(uint32_t len);
|
||||
|
||||
// Check SQL statement in QUERY_EVENT
|
||||
bool checkStatement(const uint8_t* event,
|
||||
|
Loading…
x
Reference in New Issue
Block a user