MXS-701: added support for large packets

MXS-701: added support for large packets.

Set timestamp and server id to 0 into replaced events
This commit is contained in:
MassimilianoPinto
2017-11-21 09:52:17 +01:00
parent f5d3fa80ba
commit 954847f8d9
2 changed files with 183 additions and 44 deletions

View File

@ -27,6 +27,8 @@
static char* extract_column(GWBUF *buf, int col); static char* extract_column(GWBUF *buf, int col);
static void event_set_crc32(uint8_t* event, uint32_t event_size); static void event_set_crc32(uint8_t* event, uint32_t event_size);
static void extract_header(register const uint8_t *event,
register REP_HEADER *hdr);
/** /**
* BinlogFilterSession constructor * BinlogFilterSession constructor
@ -42,8 +44,9 @@ BinlogFilterSession::BinlogFilterSession(MXS_SESSION* pSession,
, m_serverid(0) , m_serverid(0)
, m_state(pFilter->is_active() ? COMMAND_MODE : INACTIVE) , m_state(pFilter->is_active() ? COMMAND_MODE : INACTIVE)
, m_skip(false) , m_skip(false)
, m_complete_packet(true)
, m_crc(false) , m_crc(false)
, m_large_left(0)
, m_is_large(0)
, m_sql_query(NULL) , m_sql_query(NULL)
{ {
MXS_NOTICE("Filter [%s] is %s", MXS_NOTICE("Filter [%s] is %s",
@ -150,6 +153,10 @@ int BinlogFilterSession::routeQuery(GWBUF* pPacket)
*/ */
int BinlogFilterSession::clientReply(GWBUF* pPacket) int BinlogFilterSession::clientReply(GWBUF* pPacket)
{ {
uint8_t* event = GWBUF_DATA(pPacket);
uint32_t len = MYSQL_GET_PAYLOAD_LEN(event);
REP_HEADER hdr;
switch (m_state) switch (m_state)
{ {
/** /**
@ -168,12 +175,32 @@ int BinlogFilterSession::clientReply(GWBUF* pPacket)
break; break;
case BINLOG_MODE: case BINLOG_MODE:
if (skipEvent(pPacket)) if (!m_is_large)
{ {
// This binlog event contains:
// OK byte
// replication event header
// event data, partial or total (if > 16 MBytes)
extract_header(event, &hdr);
// Check whether this event and next ones can be filtered
checkEvent(pPacket, hdr);
// Check whether this event is part of a large event being sent
handlePackets(len, hdr);
}
else
{
// Handle data part of a large event
handleEventData(len, event[3]);
}
// Assuming ROW replication format: // Assuming ROW replication format:
// If transaction events need to be skipped, // If transaction events need to be skipped,
// they are replaced by an empty paylod packet // they are replaced by an empty paylod packet
filterEvent(pPacket); if (m_skip)
{
replaceEvent(&pPacket);
} }
break; break;
@ -203,23 +230,26 @@ void BinlogFilterSession::close()
* @param event The replication event * @param event The replication event
* @param hdr Pointer to repliction header to fill * @param hdr Pointer to repliction header to fill
*/ */
static inline void extractHeader(register const uint8_t *event, static void extract_header(register const uint8_t *event,
register REP_HEADER *hdr) register REP_HEADER *hdr)
{ {
hdr->payload_len = gw_mysql_get_byte3(event);
hdr->seqno = event[3]; hdr->seqno = event[3];
hdr->payload_len = gw_mysql_get_byte3(event);
hdr->ok = event[MYSQL_HEADER_LEN]; hdr->ok = event[MYSQL_HEADER_LEN];
hdr->timestamp = gw_mysql_get_byte4(event + MYSQL_HEADER_LEN + 1); event++;
hdr->event_type = event[MYSQL_HEADER_LEN + 1 + 4]; hdr->timestamp = gw_mysql_get_byte4(event + MYSQL_HEADER_LEN);
hdr->event_type = event[MYSQL_HEADER_LEN + 4];
// TODO: add offsets in order to facilitate reading // TODO: add offsets in order to facilitate reading
hdr->serverid = gw_mysql_get_byte4(event + MYSQL_HEADER_LEN + 1 + 4 + 1); hdr->serverid = gw_mysql_get_byte4(event + MYSQL_HEADER_LEN + 4 + 1);
hdr->event_size = gw_mysql_get_byte4(event + MYSQL_HEADER_LEN + 1 + 4 + 1 + 4); hdr->event_size = gw_mysql_get_byte4(event + MYSQL_HEADER_LEN + 4 + 1 + 4);
hdr->next_pos = gw_mysql_get_byte4(event + MYSQL_HEADER_LEN + 1 + 4 + 1 + 4 + 4); hdr->next_pos = gw_mysql_get_byte4(event + MYSQL_HEADER_LEN + 4 + 1 + 4 + 4);
hdr->flags = gw_mysql_get_byte2(event + MYSQL_HEADER_LEN + 1 + 4 + 1 + 4 + 4 + 4); hdr->flags = gw_mysql_get_byte2(event + MYSQL_HEADER_LEN + 4 + 1 + 4 + 4 + 4);
MXS_INFO("Event Header: serverId %" PRIu32 ", event_type [%d], " MXS_INFO("Binlog Event, Header: pkt #%d, "
"serverId %" PRIu32 ", event_type [%d], "
"flags %d, event_size %" PRIu32 ", next_pos %" PRIu32 ", " "flags %d, event_size %" PRIu32 ", next_pos %" PRIu32 ", "
"packet size %" PRIu32 "", "packet size %" PRIu32 "",
hdr->seqno,
hdr->serverid, hdr->serverid,
hdr->event_type, hdr->event_type,
hdr->flags, hdr->flags,
@ -235,25 +265,34 @@ static inline void extractHeader(register const uint8_t *event,
* Member variable m_skip is set accordingly to db/table match. * Member variable m_skip is set accordingly to db/table match.
* *
* @param buffer The GWBUF with binlog event data * @param buffer The GWBUF with binlog event data
* @param hdr Reference to repliction event header
* @return True id TABLE_MAP_EVENT contains * @return True id TABLE_MAP_EVENT contains
* db/table names to skip * db/table names to skip
*/ */
bool BinlogFilterSession::skipEvent(GWBUF* buffer) bool BinlogFilterSession::checkEvent(GWBUF* buffer,
const REP_HEADER& hdr)
{ {
uint8_t *event = GWBUF_DATA(buffer); uint8_t *event = GWBUF_DATA(buffer);
REP_HEADER hdr;
// Extract Replication header event from event data if (hdr.ok != 0)
extractHeader(event, &hdr);
if (hdr.ok == 0)
{ {
// Error in binlog stream: no filter
m_skip = false;
return m_skip;
}
if (!m_is_large)
{
// Current event size is less than MYSQL_PACKET_LENGTH_MAX
// or is the begiining of large event.
switch(hdr.event_type) switch(hdr.event_type)
{ {
case TABLE_MAP_EVENT: case TABLE_MAP_EVENT:
// Check db/table and set m_skip accordingly // Check db/table and set m_skip accordingly
skipDatabaseTable(event, hdr); skipDatabaseTable(event, hdr);
break; break;
case QUERY_EVENT:
//TODO handle the event and extract dbname or COMMIT
case XID_EVENT: case XID_EVENT:
// COMMIT: reset m_skip if set and set next pos to 0 // COMMIT: reset m_skip if set and set next pos to 0
if (m_skip) if (m_skip)
@ -265,21 +304,18 @@ bool BinlogFilterSession::skipEvent(GWBUF* buffer)
*/ */
fixEvent(event + MYSQL_HEADER_LEN + 1, hdr.event_size); fixEvent(event + MYSQL_HEADER_LEN + 1, hdr.event_size);
MXS_INFO("Skipped events: Setting next_pos = 0 in XID_EVENT"); MXS_INFO("Skipped events: Setting next_pos = 0 in XID_EVENT/COMMIT");
} }
break; break;
default: default:
// Other events can be skipped or not, depending on m_skip value // Other events can be skipped or not, depending on m_skip value
break; break;
} }
}
// m_skip could be true or false // m_skip could be true or false
return m_skip; return m_skip;
} }
else
{
return false; // always false: no filtering
}
}
/** /**
* Extract Dbname and Table name from TABLE_MAP event * Extract Dbname and Table name from TABLE_MAP event
@ -313,7 +349,7 @@ void BinlogFilterSession::skipDatabaseTable(const uint8_t* data,
{ {
// Check for TABLE_MAP event: // Check for TABLE_MAP event:
// Each time this event is seen the m_skip is overwritten // Each time this event is seen the m_skip is overwritten
if (hdr.ok == 0 && hdr.event_type == TABLE_MAP_EVENT) if (hdr.event_type == TABLE_MAP_EVENT)
{ {
char *db = NULL; char *db = NULL;
char *table = NULL; char *table = NULL;
@ -360,15 +396,41 @@ void BinlogFilterSession::fixEvent(uint8_t* event, uint32_t event_size)
* *
* @param pPacket The GWBUF with event data * @param pPacket The GWBUF with event data
*/ */
void BinlogFilterSession::filterEvent(GWBUF* pPacket) void BinlogFilterSession::replaceEvent(GWBUF** pPacket)
{ {
//GWBUF* packet;
uint32_t event_len = gwbuf_length(*pPacket);
// If size < BINLOG_EVENT_HDR_LEN + crc32 add rand_event to buff contiguos
ss_dassert(m_skip == true); ss_dassert(m_skip == true);
uint8_t *ptr = GWBUF_DATA(pPacket);
// size of empty rand_event (header + 0 bytes + CRC32) // size of empty rand_event (header + 0 bytes + CRC32)
uint32_t new_event_size = BINLOG_EVENT_HDR_LEN + 0; uint32_t new_event_size = BINLOG_EVENT_HDR_LEN + 0;
new_event_size += m_crc ? 4 : 0; new_event_size += m_crc ? 4 : 0;
// If size < BINLOG_EVENT_HDR_LEN + crc32, then create rand_event
if (event_len < (MYSQL_HEADER_LEN + 1 + new_event_size))
{
GWBUF* tmp_buff;
tmp_buff = gwbuf_alloc(MYSQL_HEADER_LEN + 1 + (new_event_size - event_len));
// Append new buff to current one
*pPacket = gwbuf_append(*pPacket, tmp_buff);
// Make current buff contiguous
*pPacket = gwbuf_make_contiguous(*pPacket);
}
// point do data
uint8_t *ptr = GWBUF_DATA(*pPacket);
// Force OK flag
ptr[MYSQL_HEADER_LEN] = 0;
// TODO Add offsets
// Force Set timestamp to 0
gw_mysql_set_byte4(ptr + MYSQL_HEADER_LEN + 1, 0);
// Force Set server_id to 0
gw_mysql_set_byte4(ptr + MYSQL_HEADER_LEN + 1 + 4 + 1, 0);
// Set NEW event_type // Set NEW event_type
ptr[MYSQL_HEADER_LEN + 1 + 4] = RAND_EVENT; ptr[MYSQL_HEADER_LEN + 1 + 4] = RAND_EVENT;
// SET ignorable flags // SET ignorable flags
@ -382,7 +444,18 @@ void BinlogFilterSession::filterEvent(GWBUF* pPacket)
// Set New Packet size: new event_size + 1 byte replication status // Set New Packet size: new event_size + 1 byte replication status
gw_mysql_set_byte3(ptr, new_event_size + 1); gw_mysql_set_byte3(ptr, new_event_size + 1);
MXS_INFO("Filtered event #%d," // Remove the useless bytes in the buffer
if (gwbuf_length(*pPacket) > (new_event_size + 1 + MYSQL_HEADER_LEN))
{
uint32_t remove_bytes = gwbuf_length(*pPacket) - (new_event_size + 1 + MYSQL_HEADER_LEN);
*pPacket = gwbuf_rtrim(*pPacket, remove_bytes);
}
// Fix Event Next pos = 0 and set new CRC32
fixEvent(ptr + MYSQL_HEADER_LEN + 1, new_event_size);
// Log Filtered event
MXS_DEBUG("Filtered event #%d,"
"ok %d, type %d, flags %d, size %d, next_pos %d, packet_size %d\n", "ok %d, type %d, flags %d, size %d, next_pos %d, packet_size %d\n",
ptr[3], ptr[3],
ptr[4], ptr[4],
@ -391,13 +464,6 @@ void BinlogFilterSession::filterEvent(GWBUF* pPacket)
gw_mysql_get_byte4(ptr + MYSQL_HEADER_LEN + 1 + 4 + 1 + 4), gw_mysql_get_byte4(ptr + MYSQL_HEADER_LEN + 1 + 4 + 1 + 4),
gw_mysql_get_byte4(ptr + MYSQL_HEADER_LEN + 1 + 4 + 1 + 4 + 4), gw_mysql_get_byte4(ptr + MYSQL_HEADER_LEN + 1 + 4 + 1 + 4 + 4),
gw_mysql_get_byte3(ptr)); gw_mysql_get_byte3(ptr));
// Remove the useless bytes in the buffer
uint32_t remove_bytes = gwbuf_length(pPacket) - (new_event_size + 1 + MYSQL_HEADER_LEN);
pPacket = gwbuf_rtrim(pPacket, remove_bytes);
// Fix Event Next pos = 0 and set new CRC32
fixEvent(ptr + MYSQL_HEADER_LEN + 1, new_event_size);
} }
/** /**
@ -537,3 +603,69 @@ bool BinlogFilterSession::getReplicationChecksum(GWBUF* pPacket)
return true; return true;
} }
/**
* Handles the event size and sets member variables
* 'm_is_large' and 'm_large_left'
*
* If received data len is MYSQL_PACKET_LENGTH_MAX
* then the beginning of a large event receiving is set.
*
* Also remaininf data are set
*
* @param len The binlog event paylod len
* @param hdr The reference to binlog event header
*/
void BinlogFilterSession::handlePackets(uint32_t len, const REP_HEADER& hdr)
{
// Mark the beginning of a lrage event transmission
if (len == MYSQL_PACKET_LENGTH_MAX)
{
// Mark the beginning of a large event transmission
m_is_large = true;
// Set remaining data to receive accordingy to hdr.event_size
m_large_left = hdr.event_size - (MYSQL_PACKET_LENGTH_MAX - 1);
// Log large event receiving
MXS_DEBUG("Large event start: size %" PRIu32 ", "
"remaining %" PRIu32 " bytes",
hdr.event_size,
m_large_left);
}
}
/**
* Process received data size of a large event trasmission
* Incoming data don't carry the OK byte and event header
*
* This sets member variables
* 'm_is_large' and 'm_large_left'
*
* @param len Received payload size
* @param seqno Packet seqno, logging only
*/
void BinlogFilterSession::handleEventData(uint32_t len,
const uint8_t seqno)
{
/**
* Received bytes are part of a large event transmission
* Network packet has 4 bytes header + data:
* no ok byte, no event header!
*/
// Decrement remaining bytes
m_large_left -= len;
// Mark the end of a large event transmission
if (m_large_left == 0)
{
m_is_large = false;
}
MXS_INFO("Binlog Event, data_only: pkt #%d, received %" PRIu32 ", "
"remaining %" PRIu32 " bytes\n",
seqno,
len,
m_large_left);
}

View File

@ -22,6 +22,7 @@
#define RAND_EVENT 0x000D #define RAND_EVENT 0x000D
#define TABLE_MAP_EVENT 0x0013 #define TABLE_MAP_EVENT 0x0013
#define XID_EVENT 0x0010 #define XID_EVENT 0x0010
#define QUERY_EVENT 0x0002
#define BINLOG_EVENT_HDR_LEN 19 #define BINLOG_EVENT_HDR_LEN 19
typedef struct rep_header_t typedef struct rep_header_t
@ -82,10 +83,16 @@ private:
void fixEvent(uint8_t* data, uint32_t event_size); void fixEvent(uint8_t* data, uint32_t event_size);
// Whether to skip current event // Whether to skip current event
bool skipEvent(GWBUF* data); bool checkEvent(GWBUF* data, const REP_HEADER& hdr);
// Filter the replication event // Filter the replication event
void filterEvent(GWBUF* data); void replaceEvent(GWBUF** data);
// Handle event size
void handlePackets(uint32_t len, const REP_HEADER& hdr);
// Handle event data
void handleEventData(uint32_t len, const uint8_t seqno);
private: private:
// Internal states for filter operations // Internal states for filter operations
@ -102,8 +109,8 @@ private:
uint32_t m_serverid; // server-id of connected slave uint32_t m_serverid; // server-id of connected slave
state_t m_state; // Internal state state_t m_state; // Internal state
bool m_skip; // Mark event skipping bool m_skip; // Mark event skipping
bool m_complete_packet; // A complete received. Not implemented
bool m_crc; // CRC32 for events. Not implemented bool m_crc; // CRC32 for events. Not implemented
bool m_large_payload; // Packet bigger than 16MB. Not implemented uint32_t m_large_left; // Remaining bytes of a large event
bool m_is_large; // Large Event indicator
GWBUF* m_sql_query; // SQL query buffer GWBUF* m_sql_query; // SQL query buffer
}; };