diff --git a/server/modules/filter/binlogfilter/binlogfiltersession.cc b/server/modules/filter/binlogfilter/binlogfiltersession.cc index 881c868cc..e75b7dfee 100644 --- a/server/modules/filter/binlogfilter/binlogfiltersession.cc +++ b/server/modules/filter/binlogfilter/binlogfiltersession.cc @@ -55,21 +55,12 @@ #include "binlogfilter.hh" #include "binlogfiltersession.hh" - -// New packet which replaces the skipped events has 0 payload -#define NEW_PACKET_PAYLOD BINLOG_EVENT_HDR_LEN - -static char* extract_column(GWBUF* buf, int col); -static void event_set_crc32(uint8_t* event, uint32_t event_size); -static void extract_header(register const uint8_t* event, - register REP_HEADER* hdr); /** * BinlogFilterSession constructor * * @param pSession The calling routing/filter session * @param pFilter Pointer to filter configuration */ - BinlogFilterSession::BinlogFilterSession(MXS_SESSION* pSession, const BinlogFilter* pFilter) : mxs::FilterSession(pSession) @@ -154,6 +145,35 @@ int BinlogFilterSession::routeQuery(GWBUF* pPacket) return mxs::FilterSession::routeQuery(pPacket); } +/** + * Extract binlog replication header from event data + * + * @param event The replication event + * @param hdr Pointer to repliction header to fill + */ +static void extract_header(register const uint8_t* event, + register REP_HEADER* hdr) +{ + hdr->seqno = event[3]; + hdr->payload_len = gw_mysql_get_byte3(event); + hdr->ok = event[MYSQL_HEADER_LEN]; + if (hdr->ok != 0) + { + // Don't parse data in case of Error in Replication Stream + return; + } + + // event points to Event Header (19 bytes) + event += MYSQL_HEADER_LEN + 1; + hdr->timestamp = gw_mysql_get_byte4(event); + hdr->event_type = event[4]; + // TODO: add offsets in order to facilitate reading + hdr->serverid = gw_mysql_get_byte4(event + 4 + 1); + hdr->event_size = gw_mysql_get_byte4(event + 4 + 1 + 4); + hdr->next_pos = gw_mysql_get_byte4(event + 4 + 1 + 4 + 4); + hdr->flags = gw_mysql_get_byte2(event + 4 + 1 + 4 + 4 + 4); +} + /** * Reply data to client: Binlog events can be filtered * @@ -201,7 +221,7 @@ int BinlogFilterSession::clientReply(GWBUF* pPacket) { // Handle data part of a large event: // Packet sequence is at offset 3 - handleEventData(len, event[3]); + handleEventData(len); } // If transaction events need to be skipped, @@ -225,40 +245,6 @@ int BinlogFilterSession::clientReply(GWBUF* pPacket) */ void BinlogFilterSession::close() { - if (m_state == BINLOG_MODE) - { - MXS_DEBUG("Slave server %" PRIu32 ": replication stopped.", - m_serverid); - } -} - -/** - * Extract binlog replication header from event data - * - * @param event The replication event - * @param hdr Pointer to repliction header to fill - */ -static void extract_header(register const uint8_t* event, - register REP_HEADER* hdr) -{ - hdr->seqno = event[3]; - hdr->payload_len = gw_mysql_get_byte3(event); - hdr->ok = event[MYSQL_HEADER_LEN]; - if (hdr->ok != 0) - { - // Don't parse data in case of Error in Replication Stream - return; - } - - // event points to Event Header (19 bytes) - event += MYSQL_HEADER_LEN + 1; - hdr->timestamp = gw_mysql_get_byte4(event); - hdr->event_type = event[4]; - // TODO: add offsets in order to facilitate reading - hdr->serverid = gw_mysql_get_byte4(event + 4 + 1); - hdr->event_size = gw_mysql_get_byte4(event + 4 + 1 + 4); - hdr->next_pos = gw_mysql_get_byte4(event + 4 + 1 + 4 + 4); - hdr->flags = gw_mysql_get_byte2(event + 4 + 1 + 4 + 4 + 4); } /** @@ -278,15 +264,15 @@ bool BinlogFilterSession::checkEvent(GWBUF* buffer, mxb_assert(!m_is_large); uint8_t* event = GWBUF_DATA(buffer); + uint8_t* body = event + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN; + uint32_t body_size = hdr.event_size - BINLOG_EVENT_HDR_LEN; if (hdr.ok != 0) { // Error in binlog stream: no filter m_state = ERRORED; m_skip = false; - MXS_ERROR("Slave server %" PRIu32 " received error in replication stream, packet #%u", - m_serverid, - event[3]); + MXS_INFO("Slave server %" PRIu32 " received error in replication stream", m_serverid); } else { @@ -307,18 +293,18 @@ bool BinlogFilterSession::checkEvent(GWBUF* buffer, case MARIADB_ANNOTATE_ROWS_EVENT: // This even can come if replication mode is ROW and it comes before TABLE_MAP event. It has no // effect so it can be safely replicated. - checkAnnotate(event, hdr.event_size); + checkAnnotate(body, body_size); break; case TABLE_MAP_EVENT: // Check db/table and set m_skip accordingly - skipDatabaseTable(event, hdr); + skipDatabaseTable(body); break; case QUERY_EVENT: // Handle the SQL statement: DDL, DML, BEGIN, COMMIT // If statement is COMMIT, then continue with next case. - if (checkStatement(event, hdr.event_size)) + if (checkStatement(body, body_size)) { break; } @@ -364,7 +350,7 @@ static std::string inline extract_table_info(const uint8_t* ptr) * https://dev.mysql.com/doc/internals/en/event-data-for-specific-event-types.html */ - int db_len_offset = MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 6 + 2; + int db_len_offset = 6 + 2; int db_len = ptr[db_len_offset]; int tbl_len = ptr[db_len_offset + 1 + db_len + 1]; // DB is NULL terminated @@ -432,17 +418,24 @@ static bool should_skip_query(const BinlogConfig& config, const std::string& sql * @param data Binlog event data * @param hdr Reference to replication event header */ -void BinlogFilterSession::skipDatabaseTable(const uint8_t* data, - const REP_HEADER& hdr) +void BinlogFilterSession::skipDatabaseTable(const uint8_t* data) { - // Check for TABLE_MAP event: - // Note: Each time this event is seen the m_skip is overwritten - if (hdr.event_type == TABLE_MAP_EVENT) - { - std::string table = extract_table_info(data); - m_skip = should_skip(m_filter.getConfig(), table); - MXS_INFO("[%s] TABLE_MAP: %s", m_skip ? "SKIP" : " ", table.c_str()); - } + std::string table = extract_table_info(data); + m_skip = should_skip(m_filter.getConfig(), table); + MXS_INFO("[%s] TABLE_MAP: %s", m_skip ? "SKIP" : " ", table.c_str()); +} + +/** + * Set CRC32 in the event buffer + * + * @param event Pointer to event data + * @param event_size The event size + */ +static void event_set_crc32(uint8_t* event, uint32_t event_size) +{ + uint32_t chksum = crc32(0L, NULL, 0); + chksum = crc32(chksum, event, event_size - 4); + gw_mysql_set_byte4(event + event_size - 4, chksum); } /** @@ -604,19 +597,6 @@ void BinlogFilterSession::replaceEvent(GWBUF** ppPacket) // Fix Event Next pos = 0 and set new CRC32 fixEvent(ptr + MYSQL_HEADER_LEN + 1, new_event_size); - - // Log the replaced event - // Now point to event_size offset - event_header_offset = MYSQL_HEADER_LEN + 1 + 4 + 1 + 4; - MXS_DEBUG("Filtered event #%d, " - "ok %d, type %d, flags %d, size %d, next_pos %d, packet_size %d\n", - ptr[3], - ptr[4], - RAND_EVENT, - gw_mysql_get_byte2(ptr + event_header_offset + 4 + 4), - gw_mysql_get_byte4(ptr + event_header_offset), - gw_mysql_get_byte4(ptr + event_header_offset + 4), - gw_mysql_get_byte3(ptr)); } /** @@ -701,21 +681,6 @@ static char* extract_column(GWBUF* buf, int col) return rval; } -/** - * Set CRC32 in the event buffer - * - * @param event Pointer to event data - * @param event_size The event size - */ -static void event_set_crc32(uint8_t* event, uint32_t event_size) -{ - uint32_t chksum = crc32(0L, NULL, 0); - chksum = crc32(chksum, - event, - event_size - 4); - gw_mysql_set_byte4(event + event_size - 4, chksum); -} - /** * Abort filter operation * @@ -772,12 +737,6 @@ void BinlogFilterSession::handlePackets(uint32_t len, const REP_HEADER& hdr) // Set remaining data to receive accordingy to hdr.event_size m_large_left = hdr.event_size - (MYSQL_PACKET_LENGTH_MAX - 1); - - // Log large event receiving - MXS_DEBUG("Large event start: size %" PRIu32 ", " - "remaining %" PRIu32 " bytes", - hdr.event_size, - m_large_left); } } @@ -791,8 +750,7 @@ void BinlogFilterSession::handlePackets(uint32_t len, const REP_HEADER& hdr) * @param len Received payload size * @param seqno Packet seqno, logging only */ -void BinlogFilterSession::handleEventData(uint32_t len, - const uint8_t seqno) +void BinlogFilterSession::handleEventData(uint32_t len) { /** * Received bytes are part of a large event transmission @@ -824,19 +782,13 @@ void BinlogFilterSession::handleEventData(uint32_t len, * @param event_size The binlog event size * @return False for COMMIT, true otherwise */ -bool BinlogFilterSession::checkStatement(const uint8_t* event, - const uint32_t event_size) +bool BinlogFilterSession::checkStatement(const uint8_t* event, const uint32_t event_size) { - int db_name_len, var_block_len, statement_len, var_block_len_offset, var_block_end; - db_name_len = event[MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 4 + 4]; - var_block_len_offset = MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 4 + 4 + 1 + 2; - var_block_len = gw_mysql_get_byte2(event + var_block_len_offset); - var_block_end = var_block_len_offset + 2; - - // SQL statement len - statement_len = (MYSQL_HEADER_LEN + 1 + event_size) - - (var_block_end + var_block_len + db_name_len + 1) - - (m_crc ? 4 : 0); + int db_name_len = event[4 + 4]; + int var_block_len_offset = 4 + 4 + 1 + 2; + int var_block_len = gw_mysql_get_byte2(event + var_block_len_offset); + int var_block_end = var_block_len_offset + 2; + int statement_len = event_size - var_block_end - var_block_len - db_name_len - 1 - (m_crc ? 4 : 0); // Set SQL statement, NULL is NOT present in the packet ! std::string sql((char*)event + var_block_end + var_block_len + db_name_len + 1, statement_len); @@ -856,9 +808,7 @@ bool BinlogFilterSession::checkStatement(const uint8_t* event, void BinlogFilterSession::checkAnnotate(const uint8_t* event, const uint32_t event_size) { - int statement_len = event_size - (MYSQL_HEADER_LEN + BINLOG_EVENT_HDR_LEN); - std::string sql((char*)event + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN - 1, statement_len); - + std::string sql((char*)event, event_size - (m_crc ? 4 : 0)); m_skip = should_skip_query(m_filter.getConfig(), sql); MXS_INFO("[%s] Annotate: %s", m_skip ? "SKIP" : " ", sql.c_str()); } diff --git a/server/modules/filter/binlogfilter/binlogfiltersession.hh b/server/modules/filter/binlogfilter/binlogfiltersession.hh index 65b7c31e1..8b34c1ef2 100644 --- a/server/modules/filter/binlogfilter/binlogfiltersession.hh +++ b/server/modules/filter/binlogfilter/binlogfiltersession.hh @@ -76,7 +76,7 @@ private: const BinlogFilter& m_filter; // Skip database/table events in current trasaction - void skipDatabaseTable(const uint8_t* data, const REP_HEADER& hdr); + void skipDatabaseTable(const uint8_t* data); // Get Replication Checksum from registration query void getReplicationChecksum(GWBUF* pPacket); @@ -97,7 +97,7 @@ private: void handlePackets(uint32_t len, const REP_HEADER& hdr); // Handle event data - void handleEventData(uint32_t len, const uint8_t seqno); + void handleEventData(uint32_t len); // Check SQL statement in QUERY_EVENT bool checkStatement(const uint8_t* event,