diff --git a/server/modules/filter/binlogfilter/binlogfiltersession.cc b/server/modules/filter/binlogfilter/binlogfiltersession.cc index 61f689276..5337d8934 100644 --- a/server/modules/filter/binlogfilter/binlogfiltersession.cc +++ b/server/modules/filter/binlogfilter/binlogfiltersession.cc @@ -27,6 +27,8 @@ static char* extract_column(GWBUF *buf, int col); static void event_set_crc32(uint8_t* event, uint32_t event_size); +static void extract_header(register const uint8_t *event, + register REP_HEADER *hdr); /** * BinlogFilterSession constructor @@ -42,8 +44,9 @@ BinlogFilterSession::BinlogFilterSession(MXS_SESSION* pSession, , m_serverid(0) , m_state(pFilter->is_active() ? COMMAND_MODE : INACTIVE) , m_skip(false) - , m_complete_packet(true) , m_crc(false) + , m_large_left(0) + , m_is_large(0) , m_sql_query(NULL) { MXS_NOTICE("Filter [%s] is %s", @@ -150,6 +153,10 @@ int BinlogFilterSession::routeQuery(GWBUF* pPacket) */ int BinlogFilterSession::clientReply(GWBUF* pPacket) { + uint8_t* event = GWBUF_DATA(pPacket); + uint32_t len = MYSQL_GET_PAYLOAD_LEN(event); + REP_HEADER hdr; + switch (m_state) { /** @@ -168,12 +175,32 @@ int BinlogFilterSession::clientReply(GWBUF* pPacket) break; case BINLOG_MODE: - if (skipEvent(pPacket)) + if (!m_is_large) { - // Assuming ROW replication format: - // If transaction events need to be skipped, - // they are replaced by an empty paylod packet - filterEvent(pPacket); + // This binlog event contains: + // OK byte + // replication event header + // event data, partial or total (if > 16 MBytes) + extract_header(event, &hdr); + + // Check whether this event and next ones can be filtered + checkEvent(pPacket, hdr); + + // Check whether this event is part of a large event being sent + handlePackets(len, hdr); + } + else + { + // Handle data part of a large event + handleEventData(len, event[3]); + } + + // Assuming ROW replication format: + // If transaction events need to be skipped, + // they are replaced by an empty paylod packet + 
if (m_skip) + { + replaceEvent(&pPacket); } break; @@ -203,23 +230,26 @@ void BinlogFilterSession::close() * @param event The replication event * @param hdr Pointer to repliction header to fill */ -static inline void extractHeader(register const uint8_t *event, - register REP_HEADER *hdr) +static void extract_header(register const uint8_t *event, + register REP_HEADER *hdr) { - hdr->payload_len = gw_mysql_get_byte3(event); hdr->seqno = event[3]; + hdr->payload_len = gw_mysql_get_byte3(event); hdr->ok = event[MYSQL_HEADER_LEN]; - hdr->timestamp = gw_mysql_get_byte4(event + MYSQL_HEADER_LEN + 1); - hdr->event_type = event[MYSQL_HEADER_LEN + 1 + 4]; + event++; + hdr->timestamp = gw_mysql_get_byte4(event + MYSQL_HEADER_LEN); + hdr->event_type = event[MYSQL_HEADER_LEN + 4]; // TODO: add offsets in order to facilitate reading - hdr->serverid = gw_mysql_get_byte4(event + MYSQL_HEADER_LEN + 1 + 4 + 1); - hdr->event_size = gw_mysql_get_byte4(event + MYSQL_HEADER_LEN + 1 + 4 + 1 + 4); - hdr->next_pos = gw_mysql_get_byte4(event + MYSQL_HEADER_LEN + 1 + 4 + 1 + 4 + 4); - hdr->flags = gw_mysql_get_byte2(event + MYSQL_HEADER_LEN + 1 + 4 + 1 + 4 + 4 + 4); + hdr->serverid = gw_mysql_get_byte4(event + MYSQL_HEADER_LEN + 4 + 1); + hdr->event_size = gw_mysql_get_byte4(event + MYSQL_HEADER_LEN + 4 + 1 + 4); + hdr->next_pos = gw_mysql_get_byte4(event + MYSQL_HEADER_LEN + 4 + 1 + 4 + 4); + hdr->flags = gw_mysql_get_byte2(event + MYSQL_HEADER_LEN + 4 + 1 + 4 + 4 + 4); - MXS_INFO("Event Header: serverId %" PRIu32 ", event_type [%d], " + MXS_INFO("Binlog Event, Header: pkt #%d, " + "serverId %" PRIu32 ", event_type [%d], " "flags %d, event_size %" PRIu32 ", next_pos %" PRIu32 ", " "packet size %" PRIu32 "", + hdr->seqno, hdr->serverid, hdr->event_type, hdr->flags, @@ -235,25 +265,34 @@ static inline void extractHeader(register const uint8_t *event, * Member variable m_skip is set accordingly to db/table match. 
* * @param buffer The GWBUF with binlog event data + * @param hdr Reference to replication event header * @return True id TABLE_MAP_EVENT contains * db/table names to skip */ -bool BinlogFilterSession::skipEvent(GWBUF* buffer) +bool BinlogFilterSession::checkEvent(GWBUF* buffer, + const REP_HEADER& hdr) { uint8_t *event = GWBUF_DATA(buffer); - REP_HEADER hdr; - // Extract Replication header event from event data - extractHeader(event, &hdr); - - if (hdr.ok == 0) + if (hdr.ok != 0) { + // Error in binlog stream: no filter + m_skip = false; + return m_skip; + } + + if (!m_is_large) + { + // Current event size is less than MYSQL_PACKET_LENGTH_MAX + // or is the beginning of a large event. switch(hdr.event_type) { case TABLE_MAP_EVENT: // Check db/table and set m_skip accordingly skipDatabaseTable(event, hdr); break; + case QUERY_EVENT: + //TODO handle the event and extract dbname or COMMIT case XID_EVENT: // COMMIT: reset m_skip if set and set next pos to 0 if (m_skip) @@ -265,20 +304,17 @@ bool BinlogFilterSession::skipEvent(GWBUF* buffer) */ fixEvent(event + MYSQL_HEADER_LEN + 1, hdr.event_size); - MXS_INFO("Skipped events: Setting next_pos = 0 in XID_EVENT"); + MXS_INFO("Skipped events: Setting next_pos = 0 in XID_EVENT/COMMIT"); } break; default: // Other events can be skipped or not, depending on m_skip value break; } - // m_skip could be true or false - return m_skip; - } - else - { - return false; // always false: no filtering } + + // m_skip could be true or false + return m_skip; } /** @@ -313,7 +349,7 @@ void BinlogFilterSession::skipDatabaseTable(const uint8_t* data, { // Check for TABLE_MAP event: // Each time this event is seen the m_skip is overwritten - if (hdr.ok == 0 && hdr.event_type == TABLE_MAP_EVENT) + if (hdr.event_type == TABLE_MAP_EVENT) { char *db = NULL; char *table = NULL; @@ -360,15 +396,41 @@ void BinlogFilterSession::fixEvent(uint8_t* event, uint32_t event_size) * * @param pPacket The GWBUF with event data */ -void 
BinlogFilterSession::filterEvent(GWBUF* pPacket) +void BinlogFilterSession::replaceEvent(GWBUF** pPacket) { + + //GWBUF* packet; + uint32_t event_len = gwbuf_length(*pPacket); + + // If size < BINLOG_EVENT_HDR_LEN + crc32 add rand_event to buff contiguous ss_dassert(m_skip == true); - uint8_t *ptr = GWBUF_DATA(pPacket); // size of empty rand_event (header + 0 bytes + CRC32) uint32_t new_event_size = BINLOG_EVENT_HDR_LEN + 0; new_event_size += m_crc ? 4 : 0; + // If size < BINLOG_EVENT_HDR_LEN + crc32, then create rand_event + if (event_len < (MYSQL_HEADER_LEN + 1 + new_event_size)) + { + GWBUF* tmp_buff; + tmp_buff = gwbuf_alloc(MYSQL_HEADER_LEN + 1 + (new_event_size - event_len)); + // Append new buff to current one + *pPacket = gwbuf_append(*pPacket, tmp_buff); + // Make current buff contiguous + *pPacket = gwbuf_make_contiguous(*pPacket); + } + + // point to data + uint8_t *ptr = GWBUF_DATA(*pPacket); + // Force OK flag + ptr[MYSQL_HEADER_LEN] = 0; + + // TODO Add offsets + + // Force Set timestamp to 0 + gw_mysql_set_byte4(ptr + MYSQL_HEADER_LEN + 1, 0); + // Force Set server_id to 0 + gw_mysql_set_byte4(ptr + MYSQL_HEADER_LEN + 1 + 4 + 1, 0); // Set NEW event_type ptr[MYSQL_HEADER_LEN + 1 + 4] = RAND_EVENT; // SET ignorable flags @@ -382,7 +444,18 @@ void BinlogFilterSession::filterEvent(GWBUF* pPacket) // Set New Packet size: new event_size + 1 byte replication status gw_mysql_set_byte3(ptr, new_event_size + 1); - MXS_INFO("Filtered event #%d," + // Remove the useless bytes in the buffer + if (gwbuf_length(*pPacket) > (new_event_size + 1 + MYSQL_HEADER_LEN)) + { + uint32_t remove_bytes = gwbuf_length(*pPacket) - (new_event_size + 1 + MYSQL_HEADER_LEN); + *pPacket = gwbuf_rtrim(*pPacket, remove_bytes); + } + + // Fix Event Next pos = 0 and set new CRC32 + fixEvent(ptr + MYSQL_HEADER_LEN + 1, new_event_size); + + // Log Filtered event + MXS_DEBUG("Filtered event #%d," "ok %d, type %d, flags %d, size %d, next_pos %d, packet_size %d\n", ptr[3], ptr[4], @@ -391,13 
+464,6 @@ void BinlogFilterSession::filterEvent(GWBUF* pPacket) gw_mysql_get_byte4(ptr + MYSQL_HEADER_LEN + 1 + 4 + 1 + 4), gw_mysql_get_byte4(ptr + MYSQL_HEADER_LEN + 1 + 4 + 1 + 4 + 4), gw_mysql_get_byte3(ptr)); - - // Remove the useless bytes in the buffer - uint32_t remove_bytes = gwbuf_length(pPacket) - (new_event_size + 1 + MYSQL_HEADER_LEN); - pPacket = gwbuf_rtrim(pPacket, remove_bytes); - - // Fix Event Next pos = 0 and set new CRC32 - fixEvent(ptr + MYSQL_HEADER_LEN + 1, new_event_size); } /** @@ -537,3 +603,69 @@ bool BinlogFilterSession::getReplicationChecksum(GWBUF* pPacket) return true; } + +/** + * Handles the event size and sets member variables + * 'm_is_large' and 'm_large_left' + * + * If received data len is MYSQL_PACKET_LENGTH_MAX + * then the beginning of a large event receiving is set. + * + * Also remaining data are set + * + * @param len The binlog event payload len + * @param hdr The reference to binlog event header + */ +void BinlogFilterSession::handlePackets(uint32_t len, const REP_HEADER& hdr) +{ + // Mark the beginning of a large event transmission + if (len == MYSQL_PACKET_LENGTH_MAX) + { + // Mark the beginning of a large event transmission + m_is_large = true; + + // Set remaining data to receive according to hdr.event_size + m_large_left = hdr.event_size - (MYSQL_PACKET_LENGTH_MAX - 1); + + // Log large event receiving + MXS_DEBUG("Large event start: size %" PRIu32 ", " + "remaining %" PRIu32 " bytes", + hdr.event_size, + m_large_left); + } +} + +/** + * Process received data size of a large event transmission + * Incoming data don't carry the OK byte and event header + * + * This sets member variables + * 'm_is_large' and 'm_large_left' + * + * @param len Received payload size + * @param seqno Packet seqno, logging only + */ +void BinlogFilterSession::handleEventData(uint32_t len, + const uint8_t seqno) +{ + /** + * Received bytes are part of a large event transmission + * Network packet has 4 bytes header + data: + * no ok byte, 
no event header! + */ + + // Decrement remaining bytes + m_large_left -= len; + + // Mark the end of a large event transmission + if (m_large_left == 0) + { + m_is_large = false; + } + + MXS_INFO("Binlog Event, data_only: pkt #%d, received %" PRIu32 ", " + "remaining %" PRIu32 " bytes\n", + seqno, + len, + m_large_left); +} diff --git a/server/modules/filter/binlogfilter/binlogfiltersession.hh b/server/modules/filter/binlogfilter/binlogfiltersession.hh index c06ed1adf..1f4669893 100644 --- a/server/modules/filter/binlogfilter/binlogfiltersession.hh +++ b/server/modules/filter/binlogfilter/binlogfiltersession.hh @@ -22,6 +22,7 @@ #define RAND_EVENT 0x000D #define TABLE_MAP_EVENT 0x0013 #define XID_EVENT 0x0010 +#define QUERY_EVENT 0x0002 #define BINLOG_EVENT_HDR_LEN 19 typedef struct rep_header_t @@ -82,10 +83,16 @@ private: void fixEvent(uint8_t* data, uint32_t event_size); // Whether to skip current event - bool skipEvent(GWBUF* data); + bool checkEvent(GWBUF* data, const REP_HEADER& hdr); // Filter the replication event - void filterEvent(GWBUF* data); + void replaceEvent(GWBUF** data); + + // Handle event size + void handlePackets(uint32_t len, const REP_HEADER& hdr); + + // Handle event data + void handleEventData(uint32_t len, const uint8_t seqno); private: // Internal states for filter operations @@ -102,8 +109,8 @@ private: uint32_t m_serverid; // server-id of connected slave state_t m_state; // Internal state bool m_skip; // Mark event skipping - bool m_complete_packet; // A complete received. Not implemented bool m_crc; // CRC32 for events. Not implemented - bool m_large_payload; // Packet bigger than 16MB. Not implemented + uint32_t m_large_left; // Remaining bytes of a large event + bool m_is_large; // Large Event indicator GWBUF* m_sql_query; // SQL query buffer };