MXS-701: RAND_EVENT nows has the two seeds filled
MXS-701: RAND_EVENT nows has the two seeds filled with original packet size and original event type: #700101 1:00:00 server id 0 end_log_pos 0 CRC32 0x858bdfd5 Rand SET @@RAND_SEED1=62, @@RAND_SEED2=19/*!*/;
This commit is contained in:
@ -469,77 +469,156 @@ void BinlogFilterSession::fixEvent(uint8_t* event, uint32_t event_size)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Replace data in the current event: no memory allocation
|
* Replace data in the current packet with binlog event
|
||||||
|
* with a RAND_EVENT
|
||||||
|
* No memory allocation is done if current packet size
|
||||||
|
* is bigger than (MYSQL_HEADER_LEN + 1 + RAND_EVENT)
|
||||||
*
|
*
|
||||||
* @param pPacket The GWBUF with event data
|
* @param pPacket The GWBUF with event data
|
||||||
*/
|
*/
|
||||||
void BinlogFilterSession::replaceEvent(GWBUF** pPacket)
|
void BinlogFilterSession::replaceEvent(GWBUF** ppPacket)
|
||||||
{
|
{
|
||||||
|
|
||||||
//GWBUF* packet;
|
uint32_t buf_len = gwbuf_length(*ppPacket);
|
||||||
uint32_t event_len = gwbuf_length(*pPacket);
|
uint32_t orig_event_type = 0;
|
||||||
|
|
||||||
// If size < BINLOG_EVENT_HDR_LEN + crc32 add rand_event to buff contiguos
|
// If size < BINLOG_EVENT_HDR_LEN + crc32 add rand_event to buff contiguos
|
||||||
ss_dassert(m_skip == true);
|
ss_dassert(m_skip == true);
|
||||||
|
|
||||||
// size of empty rand_event (header + 0 bytes + CRC32)
|
/**
|
||||||
uint32_t new_event_size = BINLOG_EVENT_HDR_LEN + 0;
|
* RAND_EVENT is:
|
||||||
|
* - 19 bytes header
|
||||||
|
* - 8 bytes first seed
|
||||||
|
* - 8 bytes second seed
|
||||||
|
* - 4 bytes CRC32 (if required)
|
||||||
|
*/
|
||||||
|
// size of rand_event (header + 16 bytes + CRC32)
|
||||||
|
uint32_t new_event_size = BINLOG_EVENT_HDR_LEN + 16;
|
||||||
new_event_size += m_crc ? 4 : 0;
|
new_event_size += m_crc ? 4 : 0;
|
||||||
|
|
||||||
// If size < BINLOG_EVENT_HDR_LEN + crc32, then create rand_event
|
/**
|
||||||
if (event_len < (MYSQL_HEADER_LEN + 1 + new_event_size))
|
* If buf_len < (network packet len with RAND_EVENT),
|
||||||
|
* then create a new complete rand_event network packet.
|
||||||
|
*
|
||||||
|
* This might happen in case of:
|
||||||
|
* - any "small" binlog event
|
||||||
|
* or
|
||||||
|
* - remaining bytes of a large event transmission
|
||||||
|
*/
|
||||||
|
if (buf_len < (MYSQL_HEADER_LEN + 1 + new_event_size))
|
||||||
{
|
{
|
||||||
GWBUF* tmp_buff;
|
GWBUF* pTmpbuf;
|
||||||
tmp_buff = gwbuf_alloc(MYSQL_HEADER_LEN + 1 + (new_event_size - event_len));
|
pTmpbuf = gwbuf_alloc(MYSQL_HEADER_LEN + 1 + \
|
||||||
|
(new_event_size - buf_len));
|
||||||
// Append new buff to current one
|
// Append new buff to current one
|
||||||
*pPacket = gwbuf_append(*pPacket, tmp_buff);
|
*ppPacket = gwbuf_append(*ppPacket, pTmpbuf);
|
||||||
// Make current buff contiguous
|
// Make current buff contiguous
|
||||||
*pPacket = gwbuf_make_contiguous(*pPacket);
|
*ppPacket = gwbuf_make_contiguous(*ppPacket);
|
||||||
}
|
}
|
||||||
|
|
||||||
// point do data
|
// point to data
|
||||||
uint8_t *ptr = GWBUF_DATA(*pPacket);
|
uint8_t *ptr = GWBUF_DATA(*ppPacket);
|
||||||
// Force OK flag
|
|
||||||
ptr[MYSQL_HEADER_LEN] = 0;
|
|
||||||
|
|
||||||
// TODO Add offsets
|
|
||||||
|
|
||||||
// Force Set timestamp to 0
|
|
||||||
gw_mysql_set_byte4(ptr + MYSQL_HEADER_LEN + 1, 0);
|
|
||||||
// Force Set server_id to 0
|
|
||||||
gw_mysql_set_byte4(ptr + MYSQL_HEADER_LEN + 1 + 4 + 1, 0);
|
|
||||||
// Set NEW event_type
|
|
||||||
ptr[MYSQL_HEADER_LEN + 1 + 4] = RAND_EVENT;
|
|
||||||
// SET ignorable flags
|
|
||||||
gw_mysql_set_byte2(ptr + MYSQL_HEADER_LEN + 1 + 4 + 1 + 4 + 4 + 4,
|
|
||||||
LOG_EVENT_IGNORABLE_F | LOG_EVENT_SKIP_REPLICATION_F);
|
|
||||||
|
|
||||||
// Set event_event size)
|
|
||||||
gw_mysql_set_byte4(ptr + MYSQL_HEADER_LEN + 1 + 4 + 1 + 4,
|
|
||||||
new_event_size);
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Replication protocol:
|
||||||
|
* 1) set 3 bytes for packet size
|
||||||
|
* 2) the packet sequence is not touched!!!
|
||||||
|
* 3) set 1 byte OK indicator
|
||||||
|
*/
|
||||||
// Set New Packet size: new event_size + 1 byte replication status
|
// Set New Packet size: new event_size + 1 byte replication status
|
||||||
gw_mysql_set_byte3(ptr, new_event_size + 1);
|
gw_mysql_set_byte3(ptr, new_event_size + 1);
|
||||||
|
|
||||||
// Remove the useless bytes in the buffer
|
// Force OK flag after 3 bytes packet size
|
||||||
if (gwbuf_length(*pPacket) > (new_event_size + 1 + MYSQL_HEADER_LEN))
|
ptr[MYSQL_HEADER_LEN] = 0;
|
||||||
|
|
||||||
|
// Now modify the event header fields (19 bytes)
|
||||||
|
// 4 bytes timestamp
|
||||||
|
// 1 byte event type
|
||||||
|
// 4 bytes server_id
|
||||||
|
// 4 bytes event_size
|
||||||
|
// 4 bytes next_pos
|
||||||
|
// 2 bytes flags
|
||||||
|
int event_header_offset = MYSQL_HEADER_LEN + 1;
|
||||||
|
|
||||||
|
// Force Set timestamp to 0 [4 bytes]
|
||||||
|
gw_mysql_set_byte4(ptr + event_header_offset, 0);
|
||||||
|
// Point to event_type
|
||||||
|
event_header_offset += 4;
|
||||||
|
|
||||||
|
// Save current event type only for standard packets
|
||||||
|
if (!m_is_large)
|
||||||
{
|
{
|
||||||
uint32_t remove_bytes = gwbuf_length(*pPacket) - (new_event_size + 1 + MYSQL_HEADER_LEN);
|
orig_event_type = ptr[event_header_offset];
|
||||||
*pPacket = gwbuf_rtrim(*pPacket, remove_bytes);
|
}
|
||||||
|
// Set NEW event_type [1 byte]
|
||||||
|
ptr[event_header_offset] = RAND_EVENT;
|
||||||
|
// Point to server_id
|
||||||
|
event_header_offset++;
|
||||||
|
|
||||||
|
// Force Set server_id to 0 [4 bytes]
|
||||||
|
gw_mysql_set_byte4(ptr + event_header_offset, 0);
|
||||||
|
// Point to event_size
|
||||||
|
event_header_offset += 4;
|
||||||
|
|
||||||
|
// Set event_event size [4 bytes]
|
||||||
|
gw_mysql_set_byte4(ptr + event_header_offset, new_event_size);
|
||||||
|
// Next pos [4 bytes] is set by fixEvent(), go ahead!!!
|
||||||
|
event_header_offset += 4;
|
||||||
|
// Point to event_flags,
|
||||||
|
event_header_offset += 4;
|
||||||
|
|
||||||
|
// Set LOG_EVENT_SKIP_REPLICATION_F flags [2 bytes]
|
||||||
|
gw_mysql_set_byte2(ptr + event_header_offset,
|
||||||
|
LOG_EVENT_SKIP_REPLICATION_F);
|
||||||
|
|
||||||
|
// Point to RAND_EVENT content now
|
||||||
|
event_header_offset += 2;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* We set now the value for the first and second seed
|
||||||
|
* as input packet size and event_size
|
||||||
|
* event_size is 0 for all packets belonging to a large event
|
||||||
|
*/
|
||||||
|
// Set first seed as the input packet size (4 bytes only)
|
||||||
|
gw_mysql_set_byte4(ptr + event_header_offset,
|
||||||
|
buf_len - (MYSQL_HEADER_LEN + 1));
|
||||||
|
event_header_offset += 4;
|
||||||
|
// Set 0 for next 4 bytes of first seed
|
||||||
|
gw_mysql_set_byte4(ptr + event_header_offset, 0);
|
||||||
|
|
||||||
|
// Point to second seed
|
||||||
|
event_header_offset += 4;
|
||||||
|
// Set second seed as the input event type (4 bytes only)
|
||||||
|
gw_mysql_set_byte4(ptr + event_header_offset, orig_event_type);
|
||||||
|
event_header_offset += 4;
|
||||||
|
// Set 0 for next 4 bytes of second seed
|
||||||
|
gw_mysql_set_byte4(ptr + event_header_offset, 0);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Now we remove the useless bytes in the buffer
|
||||||
|
* in case of inout packet is bigger than RAND_EVENT packet
|
||||||
|
*/
|
||||||
|
if (gwbuf_length(*ppPacket) > (new_event_size + 1 + MYSQL_HEADER_LEN))
|
||||||
|
{
|
||||||
|
uint32_t remove_bytes = gwbuf_length(*ppPacket) - \
|
||||||
|
(new_event_size + 1 + MYSQL_HEADER_LEN);
|
||||||
|
*ppPacket = gwbuf_rtrim(*ppPacket, remove_bytes);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fix Event Next pos = 0 and set new CRC32
|
// Fix Event Next pos = 0 and set new CRC32
|
||||||
fixEvent(ptr + MYSQL_HEADER_LEN + 1, new_event_size);
|
fixEvent(ptr + MYSQL_HEADER_LEN + 1, new_event_size);
|
||||||
|
|
||||||
// Log Filtered event
|
// Log the replaced event
|
||||||
|
// Now point to event_size offset
|
||||||
|
event_header_offset = MYSQL_HEADER_LEN + 1 + 4 + 1 + 4;
|
||||||
MXS_DEBUG("Filtered event #%d, "
|
MXS_DEBUG("Filtered event #%d, "
|
||||||
"ok %d, type %d, flags %d, size %d, next_pos %d, packet_size %d\n",
|
"ok %d, type %d, flags %d, size %d, next_pos %d, packet_size %d\n",
|
||||||
ptr[3],
|
ptr[3],
|
||||||
ptr[4],
|
ptr[4],
|
||||||
RAND_EVENT,
|
RAND_EVENT,
|
||||||
gw_mysql_get_byte2(ptr + MYSQL_HEADER_LEN + 1 + 4 + 1 + 4 + 4 + 4),
|
gw_mysql_get_byte2(ptr + event_header_offset + 4 + 4),
|
||||||
gw_mysql_get_byte4(ptr + MYSQL_HEADER_LEN + 1 + 4 + 1 + 4),
|
gw_mysql_get_byte4(ptr + event_header_offset),
|
||||||
gw_mysql_get_byte4(ptr + MYSQL_HEADER_LEN + 1 + 4 + 1 + 4 + 4),
|
gw_mysql_get_byte4(ptr + event_header_offset + 4),
|
||||||
gw_mysql_get_byte3(ptr));
|
gw_mysql_get_byte3(ptr));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user