MXS-2785: Allow event size changes due to rewrites
The replication events use a redundant format that has both the length of the event and the position of the next event. The length can be modified so that the next event position of the previous event and the length of the curren event can be different. This includes overlap of the events where the next event position of an event is "inside" the current event. The next event position must retain its original value as that allows replication slaves to reconnect with the correct position when file and position based replication is used. For GTID replication, the slave asks for the coordinates from the master and uses those. When a slave receives a heartbeat event from a master, it checks that the binlog name matches and that the next event position in the event is not behind the slave's relay log position. These events must be modified to contain a fake next event position that will never be reached by the slave. This makes sure that the simple sanity checks never fail even if we've caused the slave's relay log to be ahead of the master's binlog.
This commit is contained in:
parent
babce13ec6
commit
3de5e4edcd
@ -68,10 +68,6 @@ BinlogFilter* BinlogFilter::create(const char* zName,
|
||||
{
|
||||
MXS_ERROR("Both '%s' and '%s' must be defined", REWRITE_SRC, REWRITE_DEST);
|
||||
}
|
||||
else if (src.length() != dest.length())
|
||||
{
|
||||
MXS_ERROR("Both '%s' and '%s' must have the same length", REWRITE_SRC, REWRITE_DEST);
|
||||
}
|
||||
else
|
||||
{
|
||||
rval = new BinlogFilter(pParams);
|
||||
|
@ -214,7 +214,7 @@ int BinlogFilterSession::clientReply(GWBUF* pPacket)
|
||||
extract_header(event, &hdr);
|
||||
|
||||
// Check whether this event and next ones can be filtered
|
||||
checkEvent(pPacket, hdr);
|
||||
checkEvent(&pPacket, hdr);
|
||||
|
||||
// Check whether this event is part of a large event being sent
|
||||
handlePackets(len, hdr);
|
||||
@ -260,12 +260,11 @@ void BinlogFilterSession::close()
|
||||
* @return True id TABLE_MAP_EVENT contains
|
||||
* db/table names to skip
|
||||
*/
|
||||
bool BinlogFilterSession::checkEvent(GWBUF* buffer,
|
||||
const REP_HEADER& hdr)
|
||||
bool BinlogFilterSession::checkEvent(GWBUF** buffer, const REP_HEADER& hdr)
|
||||
{
|
||||
mxb_assert(!m_is_large);
|
||||
|
||||
uint8_t* event = GWBUF_DATA(buffer);
|
||||
uint8_t* event = GWBUF_DATA(*buffer);
|
||||
uint8_t* body = event + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN;
|
||||
uint32_t body_size = hdr.event_size - BINLOG_EVENT_HDR_LEN;
|
||||
|
||||
@ -283,8 +282,22 @@ bool BinlogFilterSession::checkEvent(GWBUF* buffer,
|
||||
switch (hdr.event_type)
|
||||
{
|
||||
case HEARTBEAT_EVENT:
|
||||
// Set m_skip = false anyway: cannot alter this event
|
||||
m_skip = false;
|
||||
{
|
||||
// The slave server that receives this event will compare the binlog name and the next
|
||||
// position of the heartbeat event to its own. The binlog name check will pass but the
|
||||
// position check will fail if the slave's relay log is ahead of the master's binlog. Since
|
||||
// the slave only checks if it's ahead of the master, by setting the next event position to a
|
||||
// fake value we bypass this. This is safe as heartbeat events are never written into the
|
||||
// relay log and thus do not affect replication.
|
||||
REP_HEADER hdr_copy = hdr;
|
||||
hdr_copy.next_pos = 0xffffffff;
|
||||
fixEvent(GWBUF_DATA(*buffer) + MYSQL_HEADER_LEN + 1,
|
||||
gwbuf_length(*buffer) - MYSQL_HEADER_LEN - 1,
|
||||
hdr_copy);
|
||||
|
||||
// Set m_skip = false anyway: cannot alter this event
|
||||
m_skip = false;
|
||||
}
|
||||
break;
|
||||
|
||||
case MARIADB10_GTID_EVENT:
|
||||
@ -305,8 +318,12 @@ bool BinlogFilterSession::checkEvent(GWBUF* buffer,
|
||||
|
||||
case QUERY_EVENT:
|
||||
// Handle the SQL statement: DDL, DML, BEGIN, COMMIT
|
||||
checkStatement(body, body_size);
|
||||
fixEvent(event + MYSQL_HEADER_LEN + 1, hdr.event_size, hdr);
|
||||
checkStatement(buffer, hdr);
|
||||
|
||||
// checkStatement can reallocate the buffer in case the size changes: use fresh pointers
|
||||
fixEvent(GWBUF_DATA(*buffer) + MYSQL_HEADER_LEN + 1,
|
||||
gwbuf_length(*buffer) - MYSQL_HEADER_LEN - 1,
|
||||
hdr);
|
||||
break;
|
||||
|
||||
case XID_EVENT:
|
||||
@ -452,6 +469,9 @@ static void event_set_crc32(uint8_t* event, uint32_t event_size)
|
||||
*/
|
||||
void BinlogFilterSession::fixEvent(uint8_t* event, uint32_t event_size, const REP_HEADER& hdr)
|
||||
{
|
||||
// Update event length in case we changed it
|
||||
gw_mysql_set_byte4(event + 4 + 1 + 4, event_size);
|
||||
|
||||
// Set next pos to 0.
|
||||
// The next_pos offset is the 13th byte in replication event header 19 bytes
|
||||
// + 4 (time) + 1 (type) + 4 (server_id) + 4 (event_size)
|
||||
@ -775,23 +795,21 @@ void BinlogFilterSession::handleEventData(uint32_t len)
|
||||
}
|
||||
|
||||
/**
|
||||
* Check wether the config for db/table filtering is found in
|
||||
* the SQL statement inside QUERY_EVENT binlog event.
|
||||
* Note: COMMIT is an exception here, routine returns false
|
||||
* and the called will set m_skip to the right value.
|
||||
* Check QUERY_EVENT events.
|
||||
*
|
||||
* @see https://mariadb.com/kb/en/library/query_event/
|
||||
*
|
||||
* In case of config match the member variable m_skip is set to true.
|
||||
* With empty config it returns true and skipping is always false.
|
||||
* This function checks whether the statement should be replicated and whether the database/table name should
|
||||
* be rewritten. If a rewrite takes place, the buffer can be reallocated.
|
||||
*
|
||||
*
|
||||
* @param event The QUERY_EVENT binlog event
|
||||
* @param event_size The binlog event size
|
||||
* @return False for COMMIT, true otherwise
|
||||
* @param bufer Pointer to the buffer containing the event
|
||||
* @param hdr The extracted replication header
|
||||
*/
|
||||
bool BinlogFilterSession::checkStatement(uint8_t* event, const uint32_t event_size)
|
||||
void BinlogFilterSession::checkStatement(GWBUF** buffer, const REP_HEADER& hdr)
|
||||
{
|
||||
uint8_t* event = GWBUF_DATA(*buffer) + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN;
|
||||
uint32_t event_size = hdr.event_size - BINLOG_EVENT_HDR_LEN;
|
||||
|
||||
int db_name_len = event[4 + 4];
|
||||
int var_block_len_offset = 4 + 4 + 1 + 2;
|
||||
int var_block_len = gw_mysql_get_byte2(event + var_block_len_offset);
|
||||
@ -807,24 +825,53 @@ bool BinlogFilterSession::checkStatement(uint8_t* event, const uint32_t event_si
|
||||
|
||||
if (!m_skip && !config.rewrite_src.empty())
|
||||
{
|
||||
bool replace = false;
|
||||
|
||||
if (db == config.rewrite_src)
|
||||
{
|
||||
replace = true;
|
||||
db = config.rewrite_dest;
|
||||
}
|
||||
|
||||
while (char* p = strstr(&sql[0], config.rewrite_src.c_str()))
|
||||
size_t pos = 0;
|
||||
|
||||
while ((pos = sql.find(config.rewrite_src, pos)) != std::string::npos)
|
||||
{
|
||||
memcpy(p, config.rewrite_dest.c_str(), config.rewrite_dest.length());
|
||||
replace = true;
|
||||
sql.replace(pos, config.rewrite_src.length(), config.rewrite_dest);
|
||||
pos += config.rewrite_dest.length();
|
||||
}
|
||||
|
||||
mxb_assert(db.length() == (size_t)db_name_len);
|
||||
mxb_assert(sql.length() == (size_t)statement_len);
|
||||
memcpy(event + static_size + var_block_len, db.c_str(), db_name_len);
|
||||
memcpy(event + static_size + var_block_len + db_name_len + 1, sql.c_str(), statement_len);
|
||||
MXS_INFO("Rename: (%s) %s", db.c_str(), sql.c_str());
|
||||
}
|
||||
if (replace)
|
||||
{
|
||||
int len = sql.length() + db.length() - statement_len - db_name_len;
|
||||
|
||||
return true;
|
||||
|
||||
if (len > 0)
|
||||
{
|
||||
// Buffer is too short, extend it
|
||||
*buffer = gwbuf_make_contiguous(gwbuf_append(*buffer, gwbuf_alloc(len)));
|
||||
}
|
||||
else if (len < 0)
|
||||
{
|
||||
// Make the buffer shorter (len is negative so we add it to the total length)
|
||||
GWBUF* tmp = gwbuf_alloc_and_load(gwbuf_length(*buffer) + len, GWBUF_DATA(*buffer));
|
||||
gwbuf_free(*buffer);
|
||||
*buffer = tmp;
|
||||
}
|
||||
|
||||
event = GWBUF_DATA(*buffer) + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN;
|
||||
|
||||
memcpy(event + static_size + var_block_len, db.c_str(), db.length() + 1);
|
||||
memcpy(event + static_size + var_block_len + db.length() + 1, sql.c_str(), sql.length());
|
||||
event[4 + 4] = db.length();
|
||||
|
||||
// Also fix the packet length
|
||||
gw_mysql_set_byte3(GWBUF_DATA(*buffer), gwbuf_length(*buffer) - MYSQL_HEADER_LEN);
|
||||
|
||||
MXS_INFO("Rewrote query: (%s) %s", db.c_str(), sql.c_str());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void BinlogFilterSession::checkAnnotate(const uint8_t* event, const uint32_t event_size)
|
||||
|
@ -88,7 +88,7 @@ private:
|
||||
void fixEvent(uint8_t* data, uint32_t event_size, const REP_HEADER& hdr);
|
||||
|
||||
// Whether to skip current event
|
||||
bool checkEvent(GWBUF* data, const REP_HEADER& hdr);
|
||||
bool checkEvent(GWBUF** data, const REP_HEADER& hdr);
|
||||
|
||||
// Filter the replication event
|
||||
void replaceEvent(GWBUF** data, const REP_HEADER& hdr);
|
||||
@ -100,7 +100,7 @@ private:
|
||||
void handleEventData(uint32_t len);
|
||||
|
||||
// Check SQL statement in QUERY_EVENT
|
||||
bool checkStatement(uint8_t* event, const uint32_t event_size);
|
||||
void checkStatement(GWBUF** buffer, const REP_HEADER& hdr);
|
||||
|
||||
// Check DB.TABLE in ANNOTATE_ROWS event
|
||||
void checkAnnotate(const uint8_t* event,
|
||||
|
Loading…
x
Reference in New Issue
Block a user