Add duplicate event detection & logging.
When an event is sent to a slave, we store information about the event and who sent it, so that we can detect if the same event is sent twice. If a duplicate event is detected, we log information about it.
This commit is contained in:
@ -308,6 +308,12 @@ typedef struct
|
|||||||
int minavgs[BLR_NSTATS_MINUTES];
|
int minavgs[BLR_NSTATS_MINUTES];
|
||||||
} SLAVE_STATS;
|
} SLAVE_STATS;
|
||||||
|
|
||||||
|
typedef enum blr_thread_role
|
||||||
|
{
|
||||||
|
BLR_THREAD_ROLE_MASTER,
|
||||||
|
BLR_THREAD_ROLE_SLAVE
|
||||||
|
} blr_thread_role_t;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The client session structure used within this router. This represents
|
* The client session structure used within this router. This represents
|
||||||
* the slaves that are replicating binlogs from MaxScale.
|
* the slaves that are replicating binlogs from MaxScale.
|
||||||
@ -350,6 +356,11 @@ typedef struct router_slave
|
|||||||
int heartbeat; /*< Heartbeat in seconds */
|
int heartbeat; /*< Heartbeat in seconds */
|
||||||
uint8_t lastEventReceived; /*< Last event received */
|
uint8_t lastEventReceived; /*< Last event received */
|
||||||
time_t lastReply; /*< Last event sent */
|
time_t lastReply; /*< Last event sent */
|
||||||
|
// lsi: Last Sent Information
|
||||||
|
blr_thread_role_t lsi_sender_role; /*< Master or slave code sent */
|
||||||
|
THREAD lsi_sender_tid; /*< Who sent */
|
||||||
|
char lsi_binlog_name[BINLOG_FNAMELEN + 1]; /*< Which binlog file */
|
||||||
|
uint32_t lsi_binlog_pos; /*< What position */
|
||||||
#if defined(SS_DEBUG)
|
#if defined(SS_DEBUG)
|
||||||
skygw_chk_t rses_chk_tail;
|
skygw_chk_t rses_chk_tail;
|
||||||
#endif
|
#endif
|
||||||
@ -605,4 +616,12 @@ extern int blr_ping(ROUTER_INSTANCE *, ROUTER_SLAVE *, GWBUF *);
|
|||||||
extern int blr_send_custom_error(DCB *, int, int, char *, char *, unsigned int);
|
extern int blr_send_custom_error(DCB *, int, int, char *, char *, unsigned int);
|
||||||
extern int blr_file_next_exists(ROUTER_INSTANCE *, ROUTER_SLAVE *);
|
extern int blr_file_next_exists(ROUTER_INSTANCE *, ROUTER_SLAVE *);
|
||||||
uint32_t extract_field(uint8_t *src, int bits);
|
uint32_t extract_field(uint8_t *src, int bits);
|
||||||
|
|
||||||
|
extern bool blr_send_event(blr_thread_role_t role,
|
||||||
|
const char* binlog_name,
|
||||||
|
uint32_t binlog_pos,
|
||||||
|
ROUTER_SLAVE *slave,
|
||||||
|
REP_HEADER *hdr,
|
||||||
|
uint8_t *buf);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|||||||
@ -102,7 +102,6 @@ static void blr_distribute_error_message(ROUTER_INSTANCE *router, char *message,
|
|||||||
unsigned int err_code);
|
unsigned int err_code);
|
||||||
|
|
||||||
int blr_write_data_into_binlog(ROUTER_INSTANCE *router, uint32_t data_len, uint8_t *buf);
|
int blr_write_data_into_binlog(ROUTER_INSTANCE *router, uint32_t data_len, uint8_t *buf);
|
||||||
bool blr_send_event(ROUTER_SLAVE *slave, REP_HEADER *hdr, uint8_t *buf);
|
|
||||||
void extract_checksum(ROUTER_INSTANCE* router, uint8_t *cksumptr, uint8_t len);
|
void extract_checksum(ROUTER_INSTANCE* router, uint8_t *cksumptr, uint8_t len);
|
||||||
|
|
||||||
static int keepalive = 1;
|
static int keepalive = 1;
|
||||||
@ -1955,6 +1954,9 @@ blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *
|
|||||||
|
|
||||||
switch (slave_action)
|
switch (slave_action)
|
||||||
{
|
{
|
||||||
|
char binlog_name[BINLOG_FNAMELEN + 1];
|
||||||
|
uint32_t binlog_pos;
|
||||||
|
|
||||||
case SLAVE_SEND_EVENT:
|
case SLAVE_SEND_EVENT:
|
||||||
/*
|
/*
|
||||||
* The slave should be up to date, check that the binlog
|
* The slave should be up to date, check that the binlog
|
||||||
@ -1971,12 +1973,15 @@ blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *
|
|||||||
slave->lastReply = time(0);
|
slave->lastReply = time(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
strcpy(binlog_name, slave->binlogfile);
|
||||||
|
binlog_pos = slave->binlog_pos;
|
||||||
|
|
||||||
if (hdr->event_type == ROTATE_EVENT)
|
if (hdr->event_type == ROTATE_EVENT)
|
||||||
{
|
{
|
||||||
blr_slave_rotate(router, slave, ptr);
|
blr_slave_rotate(router, slave, ptr);
|
||||||
}
|
}
|
||||||
|
|
||||||
blr_send_event(slave, hdr, ptr);
|
blr_send_event(BLR_THREAD_ROLE_MASTER, binlog_name, binlog_pos, slave, hdr, ptr);
|
||||||
|
|
||||||
spinlock_acquire(&slave->catch_lock);
|
spinlock_acquire(&slave->catch_lock);
|
||||||
if (hdr->event_type != ROTATE_EVENT)
|
if (hdr->event_type != ROTATE_EVENT)
|
||||||
@ -2640,15 +2645,40 @@ bool blr_send_packet(ROUTER_SLAVE *slave, uint8_t *buf, uint32_t len, bool first
|
|||||||
* This sends the complete replication event to a slave. If the event size exceeds
|
* This sends the complete replication event to a slave. If the event size exceeds
|
||||||
* the maximum size of a MySQL packet, it will be sent in multiple packets.
|
* the maximum size of a MySQL packet, it will be sent in multiple packets.
|
||||||
*
|
*
|
||||||
|
* @param role What is the role of the caller, slave or master.
|
||||||
|
* @param binlog_name The name of the binlogfile.
|
||||||
|
* @param binlog_pos The position in the binlogfile.
|
||||||
* @param slave Slave where the event is sent to
|
* @param slave Slave where the event is sent to
|
||||||
* @param hdr Replication header
|
* @param hdr Replication header
|
||||||
* @param buf Pointer to the replication event as it was read from the disk
|
* @param buf Pointer to the replication event as it was read from the disk
|
||||||
* @return True on success, false if memory allocation failed
|
* @return True on success, false if memory allocation failed
|
||||||
*/
|
*/
|
||||||
bool blr_send_event(ROUTER_SLAVE *slave, REP_HEADER *hdr, uint8_t *buf)
|
bool blr_send_event(blr_thread_role_t role,
|
||||||
|
const char* binlog_name,
|
||||||
|
uint32_t binlog_pos,
|
||||||
|
ROUTER_SLAVE *slave,
|
||||||
|
REP_HEADER *hdr,
|
||||||
|
uint8_t *buf)
|
||||||
{
|
{
|
||||||
bool rval = true;
|
bool rval = true;
|
||||||
|
|
||||||
|
if ((strcmp(slave->lsi_binlog_name, binlog_name) == 0) &&
|
||||||
|
(slave->lsi_binlog_pos == binlog_pos))
|
||||||
|
{
|
||||||
|
MXS_ERROR("Slave %s:%i, server-id %d, binlog '%s', position %u: "
|
||||||
|
"thread %lu in the role of %s could not send the event, "
|
||||||
|
"the event has already been sent by thread %lu in the role of %s.",
|
||||||
|
slave->dcb->remote,
|
||||||
|
ntohs((slave->dcb->ipv4).sin_port),
|
||||||
|
slave->serverid,
|
||||||
|
binlog_name,
|
||||||
|
binlog_pos,
|
||||||
|
THREAD_SHELF(),
|
||||||
|
role == BLR_THREAD_ROLE_MASTER ? "master" : "slave",
|
||||||
|
slave->lsi_sender_tid,
|
||||||
|
slave->lsi_sender_role == BLR_THREAD_ROLE_MASTER ? "master" : "slave");
|
||||||
|
}
|
||||||
|
|
||||||
/** Check if the event and the OK byte fit into a single packet */
|
/** Check if the event and the OK byte fit into a single packet */
|
||||||
if (hdr->event_size + 1 < MYSQL_PACKET_LENGTH_MAX)
|
if (hdr->event_size + 1 < MYSQL_PACKET_LENGTH_MAX)
|
||||||
{
|
{
|
||||||
@ -2688,7 +2718,14 @@ bool blr_send_event(ROUTER_SLAVE *slave, REP_HEADER *hdr, uint8_t *buf)
|
|||||||
|
|
||||||
slave->stats.n_events++;
|
slave->stats.n_events++;
|
||||||
|
|
||||||
if (!rval)
|
if (rval)
|
||||||
|
{
|
||||||
|
strcpy(slave->lsi_binlog_name, binlog_name);
|
||||||
|
slave->lsi_binlog_pos = binlog_pos;
|
||||||
|
slave->lsi_sender_role = role;
|
||||||
|
slave->lsi_sender_tid = THREAD_SHELF();
|
||||||
|
}
|
||||||
|
else
|
||||||
{
|
{
|
||||||
MXS_ERROR("Failed to send an event of %u bytes to slave at %s:%d.",
|
MXS_ERROR("Failed to send an event of %u bytes to slave at %s:%d.",
|
||||||
hdr->event_size, slave->dcb->remote,
|
hdr->event_size, slave->dcb->remote,
|
||||||
|
|||||||
@ -160,7 +160,6 @@ static int blr_slave_send_columndef_with_status_schema(ROUTER_INSTANCE *router,
|
|||||||
char *name, int type, int len, uint8_t seqno);
|
char *name, int type, int len, uint8_t seqno);
|
||||||
static void blr_send_slave_heartbeat(void *inst);
|
static void blr_send_slave_heartbeat(void *inst);
|
||||||
static int blr_slave_send_heartbeat(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
|
static int blr_slave_send_heartbeat(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
|
||||||
bool blr_send_event(ROUTER_SLAVE *slave, REP_HEADER *hdr, uint8_t *buf);
|
|
||||||
|
|
||||||
void poll_fake_write_event(DCB *dcb);
|
void poll_fake_write_event(DCB *dcb);
|
||||||
|
|
||||||
@ -2234,6 +2233,12 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
|
|||||||
while (burst-- && burst_size > 0 &&
|
while (burst-- && burst_size > 0 &&
|
||||||
(record = blr_read_binlog(router, file, slave->binlog_pos, &hdr, read_errmsg)) != NULL)
|
(record = blr_read_binlog(router, file, slave->binlog_pos, &hdr, read_errmsg)) != NULL)
|
||||||
{
|
{
|
||||||
|
char binlog_name[BINLOG_FNAMELEN + 1];
|
||||||
|
uint32_t binlog_pos;
|
||||||
|
|
||||||
|
strcpy(binlog_name, slave->binlogfile);
|
||||||
|
binlog_pos = slave->binlog_pos;
|
||||||
|
|
||||||
if (hdr.event_type == ROTATE_EVENT)
|
if (hdr.event_type == ROTATE_EVENT)
|
||||||
{
|
{
|
||||||
unsigned long beat1 = hkheartbeat;
|
unsigned long beat1 = hkheartbeat;
|
||||||
@ -2288,7 +2293,8 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (blr_send_event(slave, &hdr, (uint8_t*) record->start))
|
if (blr_send_event(BLR_THREAD_ROLE_SLAVE, binlog_name, binlog_pos,
|
||||||
|
slave, &hdr, (uint8_t*) record->start))
|
||||||
{
|
{
|
||||||
if (hdr.event_type != ROTATE_EVENT)
|
if (hdr.event_type != ROTATE_EVENT)
|
||||||
{
|
{
|
||||||
|
|||||||
Reference in New Issue
Block a user