MXS-2785: Prevent broken replication setups
When rewrite_src and rewrite_dest have different lengths, the slave must use GTID based replication. This removes the need for one-to-one matching between the slave's relay log and the master's binlog which gets broken when event lengths are modified due to event rewriting.
This commit is contained in:
@ -90,16 +90,24 @@ BinlogFilterSession* BinlogFilterSession::create(MXS_SESSION* pSession,
|
|||||||
return new BinlogFilterSession(pSession, pFilter);
|
return new BinlogFilterSession(pSession, pFilter);
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool is_master_binlog_checksum(GWBUF* buffer)
|
static bool is_matching_query(GWBUF* buffer, const char* target)
|
||||||
{
|
{
|
||||||
const char target[] = "SELECT @master_binlog_checksum";
|
|
||||||
char query[1024]; // Large enough for most practical cases
|
char query[1024]; // Large enough for most practical cases
|
||||||
size_t bytes = gwbuf_copy_data(buffer, MYSQL_HEADER_LEN + 1, sizeof(query) - 1, (uint8_t*)query);
|
size_t bytes = gwbuf_copy_data(buffer, MYSQL_HEADER_LEN + 1, sizeof(query) - 1, (uint8_t*)query);
|
||||||
query[bytes] = '\0';
|
query[bytes] = '\0';
|
||||||
|
|
||||||
return strcasestr(query, target);
|
return strcasestr(query, target);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool is_master_binlog_checksum(GWBUF* buffer)
|
||||||
|
{
|
||||||
|
return is_matching_query(buffer, "SELECT @master_binlog_checksum");
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool is_using_gtid(GWBUF* buffer)
|
||||||
|
{
|
||||||
|
return is_matching_query(buffer, "SET @slave_connect_state=");
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Route input data from client.
|
* Route input data from client.
|
||||||
*
|
*
|
||||||
@ -129,6 +137,19 @@ int BinlogFilterSession::routeQuery(GWBUF* pPacket)
|
|||||||
// Connected Slave server is waiting for binlog events
|
// Connected Slave server is waiting for binlog events
|
||||||
m_state = BINLOG_MODE;
|
m_state = BINLOG_MODE;
|
||||||
MXS_INFO("Slave server %u is waiting for binlog events.", m_serverid);
|
MXS_INFO("Slave server %u is waiting for binlog events.", m_serverid);
|
||||||
|
|
||||||
|
if (!m_is_gtid
|
||||||
|
&& m_filter.getConfig().rewrite_src.length() != m_filter.getConfig().rewrite_dest.length())
|
||||||
|
{
|
||||||
|
gwbuf_free(pPacket);
|
||||||
|
std::ostringstream ss;
|
||||||
|
ss << "GTID replication is required when '"
|
||||||
|
<< REWRITE_SRC << "' and '" << REWRITE_DEST << "' are of different length";
|
||||||
|
mxs::FilterSession::clientReply(
|
||||||
|
modutil_create_mysql_err_msg(1, 0, ER_MASTER_FATAL_ERROR_READING_BINLOG, "HY000",
|
||||||
|
ss.str().c_str()));
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case MXS_COM_QUERY:
|
case MXS_COM_QUERY:
|
||||||
@ -136,6 +157,11 @@ int BinlogFilterSession::routeQuery(GWBUF* pPacket)
|
|||||||
m_state = COMMAND_MODE;
|
m_state = COMMAND_MODE;
|
||||||
m_reading_checksum = is_master_binlog_checksum(pPacket);
|
m_reading_checksum = is_master_binlog_checksum(pPacket);
|
||||||
gwbuf_set_type(pPacket, GWBUF_TYPE_COLLECT_RESULT);
|
gwbuf_set_type(pPacket, GWBUF_TYPE_COLLECT_RESULT);
|
||||||
|
|
||||||
|
if (is_using_gtid(pPacket))
|
||||||
|
{
|
||||||
|
m_is_gtid = true;
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
|
|
||||||
default:
|
default:
|
||||||
|
@ -124,4 +124,5 @@ private:
|
|||||||
uint32_t m_large_left = 0; // Remaining bytes of a large event
|
uint32_t m_large_left = 0; // Remaining bytes of a large event
|
||||||
bool m_is_large = false; // Large Event indicator
|
bool m_is_large = false; // Large Event indicator
|
||||||
bool m_reading_checksum = false;// Whether we are waiting for the binlog checksum response
|
bool m_reading_checksum = false;// Whether we are waiting for the binlog checksum response
|
||||||
|
bool m_is_gtid = false;
|
||||||
};
|
};
|
||||||
|
Reference in New Issue
Block a user