Added blr_distribute_error_message when distributing transaction events

Added blr_distribute_error_message when distributing transaction events
This commit is contained in:
MassimilianoPinto
2015-10-02 09:46:52 +02:00
parent d31e081e45
commit 1e76de540f
2 changed files with 67 additions and 17 deletions

View File

@ -486,7 +486,7 @@ struct stat statb;
hdr->flags = EXTRACT16(&hdbuf[17]);
/* event pos & size checks */
if (hdr->next_pos < 0 || hdr->event_size <= 0 || ((hdr->next_pos != (pos + hdr->event_size)) && (hdr->event_type != ROTATE_EVENT))) {
if (hdr->event_size == 0 || ((hdr->next_pos != (pos + hdr->event_size)) && (hdr->event_type != ROTATE_EVENT))) {
snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "Client requested master to start replication from invalid position %lu", pos);
return NULL;
}

View File

@ -100,6 +100,7 @@ static void blr_check_last_master_event(void *inst);
extern int blr_check_heartbeat(ROUTER_INSTANCE *router);
extern char * blr_last_event_description(ROUTER_INSTANCE *router);
static void blr_log_identity(ROUTER_INSTANCE *router);
static void blr_distribute_error_message(ROUTER_INSTANCE *router, char *message, char *state, unsigned int err_code);
static int keepalive = 1;
@ -1321,31 +1322,48 @@ int n_bufs = -1, pn_bufs = -1;
/* distribute event */
blr_distribute_binlog_record(router, &new_hdr, raw_data);
spinlock_acquire(&router->binlog_lock);
pos = new_hdr.next_pos;
spinlock_release(&router->binlog_lock);
gwbuf_free(record);
}
/* Log whether no event has been sent */
if (pos == router->binlog_position) {
LOGIF(LE,(skygw_log_write(LOGFILE_ERROR,
"No events distributed to slaves for a pending transaction in %s at %lu."
" Last event from master at %lu",
router->binlog_name,
router->binlog_position,
router->current_pos)));
}
/* Check whether binlog records has been read in previous loop */
if (pos < router->current_pos) {
LOGIF(LE,(skygw_log_write(LOGFILE_ERROR,
"Some events were not distributed to slaves for a pending transaction "
"in %s at %lu. Last distributed even at %lu, last event from master at %lu",
router->binlog_name,
router->binlog_position,
pos,
router->current_pos)));
char err_message[BINLOG_ERROR_MSG_LEN+1];
err_message[BINLOG_ERROR_MSG_LEN] = '\0';
/* No event has been sent */
if (pos == router->binlog_position) {
LOGIF(LE,(skygw_log_write(LOGFILE_ERROR,
"No events distributed to slaves for a pending transaction in %s at %lu."
" Last event from master at %lu",
router->binlog_name,
router->binlog_position,
router->current_pos)));
strncpy(err_message, "No transaction events sent", BINLOG_ERROR_MSG_LEN);
} else {
/* Some events have been sent */
LOGIF(LE,(skygw_log_write(LOGFILE_ERROR,
"Some events were not distributed to slaves for a pending transaction "
"in %s at %lu. Last distributed even at %lu, last event from master at %lu",
router->binlog_name,
router->binlog_position,
pos,
router->current_pos)));
strncpy(err_message, "Incomplete transaction events sent", BINLOG_ERROR_MSG_LEN);
}
/* distribute error message to registered slaves */
blr_distribute_error_message(router, err_message, "HY000", 1236);
}
spinlock_acquire(&router->lock);
@ -2177,3 +2195,35 @@ static void blr_log_identity(ROUTER_INSTANCE *router) {
}
}
/**
* Distribute an error message to all the registered slaves.
*
* @param router The router instance
* @param message The message to send
* @param state The MySQL State for message
* @param err_code The MySQL error code for message
*/
static void
blr_distribute_error_message(ROUTER_INSTANCE *router, char *message, char *state, unsigned int err_code) {
ROUTER_SLAVE *slave;
spinlock_acquire(&router->lock);
slave = router->slaves;
while (slave)
{
if (slave->state != BLRS_DUMPING)
{
slave = slave->next;
continue;
}
/* send the error that stops slave replication */
blr_send_custom_error(slave->dcb, slave->seqno++, 0, message, state, err_code);
slave = slave->next;
}
spinlock_release(&router->lock);
}