Fix for rotate and catchup with transaction_safety = 1

Fix for rotate and catchup with transaction_safety = 1
This commit is contained in:
MassimilianoPinto
2015-11-06 18:14:52 +01:00
parent e8c1c82265
commit 00c506b029
4 changed files with 105 additions and 50 deletions

View File

@ -181,9 +181,10 @@
#define BLRM_MASTER_REGITRATION_QUERY_LEN 255 #define BLRM_MASTER_REGITRATION_QUERY_LEN 255
/* Read Binlog position states */ /* Read Binlog position states */
#define SLAVE_POS_READ_OK 0x0 #define SLAVE_POS_READ_OK 0x00
#define SLAVE_POS_READ_ERR 0xff #define SLAVE_POS_READ_ERR 0xff
#define SLAVE_POS_READ_UNSAFE 0xfe #define SLAVE_POS_READ_UNSAFE 0xfe
/** /**
* Some useful macros for examining the MySQL Response packets * Some useful macros for examining the MySQL Response packets
*/ */

View File

@ -208,6 +208,7 @@ unsigned char magic[] = BINLOG_MAGIC;
router->current_pos = 4; /* Initial position after the magic number */ router->current_pos = 4; /* Initial position after the magic number */
router->binlog_position = 4; /* Initial position after the magic number */ router->binlog_position = 4; /* Initial position after the magic number */
router->current_safe_event = 4; router->current_safe_event = 4;
router->last_written = 0;
} }
@ -426,30 +427,54 @@ struct stat statb;
snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "Invalid file pointer for requested binlog at position %lu", pos); snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "Invalid file pointer for requested binlog at position %lu", pos);
return NULL; return NULL;
} }
spinlock_acquire(&file->lock);
if (fstat(file->fd, &statb) == 0) if (fstat(file->fd, &statb) == 0)
filelen = statb.st_size; filelen = statb.st_size;
else {
if (file->fd == -1) {
hdr->ok = SLAVE_POS_READ_OK;
snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "blr_read_binlog called with invalid file->fd, pos %lu", pos);
spinlock_release(&file->lock);
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"Slave has failed fstat %s", errmsg)));
return NULL;
}
}
spinlock_release(&file->lock);
if (pos > filelen) if (pos > filelen)
{ {
snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "Requested position %lu is beyond end of the binlog file '%s', size %lu", pos, file->binlogname, filelen); snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "Requested position %lu is beyond end of the binlog file '%s', size %lu",
pos, file->binlogname, filelen);
return NULL; return NULL;
} }
spinlock_acquire(&router->binlog_lock);
spinlock_acquire(&file->lock);
if (strcmp(router->binlog_name, file->binlogname) == 0 && if (strcmp(router->binlog_name, file->binlogname) == 0 &&
pos >= router->binlog_position) pos >= router->binlog_position)
{ {
if (pos > router->binlog_position) if (pos > router->binlog_position && !router->rotating)
{ {
/* Unsafe position, slave will be disconnected by the calling routine */ /* Unsafe position, slave will be disconnected by the calling routine */
snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "Requested binlog position %lu. Position is unsafe so disconnecting. Latest safe position %lu, end of binlog file %lu", pos, router->binlog_position, router->current_pos); snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "Requested position %lu is not available. "
"Latest safe position %lu, end of binlog '%s' is %lu",
pos, router->binlog_position, file->binlogname, router->current_pos);
hdr->ok = SLAVE_POS_READ_UNSAFE; hdr->ok = SLAVE_POS_READ_UNSAFE;
} else { } else {
/* accessing last position is ok */ /* accessing last position is ok */
hdr->ok = SLAVE_POS_READ_OK; hdr->ok = SLAVE_POS_READ_OK;
} }
spinlock_release(&file->lock);
spinlock_release(&router->binlog_lock);
return NULL; return NULL;
} }
spinlock_release(&file->lock);
spinlock_release(&router->binlog_lock);
/* Read the header information from the file */ /* Read the header information from the file */
if ((n = pread(file->fd, hdbuf, BINLOG_EVENT_HDR_LEN, pos)) != BINLOG_EVENT_HDR_LEN) if ((n = pread(file->fd, hdbuf, BINLOG_EVENT_HDR_LEN, pos)) != BINLOG_EVENT_HDR_LEN)
@ -960,9 +985,8 @@ int fde_seen = 0;
pending_transaction = 0; pending_transaction = 0;
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"Warning : pending transaction has been found. " "Binlog '%s' ends at position %lu and has an incomplete transaction at %lu. ",
"Setting safe pos to %lu, current pos %lu", router->binlog_name, router->current_pos, router->binlog_position)));
router->binlog_position, router->current_pos)));
return 0; return 0;
} else { } else {

View File

@ -96,7 +96,7 @@ void blr_master_close(ROUTER_INSTANCE *);
char *blr_extract_column(GWBUF *buf, int col); char *blr_extract_column(GWBUF *buf, int col);
void blr_cache_response(ROUTER_INSTANCE *router, char *response, GWBUF *buf); void blr_cache_response(ROUTER_INSTANCE *router, char *response, GWBUF *buf);
void poll_fake_write_event(DCB *dcb); void poll_fake_write_event(DCB *dcb);
GWBUF *blr_read_events_from_pos(ROUTER_INSTANCE *router, unsigned long long pos, REP_HEADER *hdr); GWBUF *blr_read_events_from_pos(ROUTER_INSTANCE *router, unsigned long long pos, REP_HEADER *hdr, unsigned long long pos_end);
static void blr_check_last_master_event(void *inst); static void blr_check_last_master_event(void *inst);
extern int blr_check_heartbeat(ROUTER_INSTANCE *router); extern int blr_check_heartbeat(ROUTER_INSTANCE *router);
extern char * blr_last_event_description(ROUTER_INSTANCE *router); extern char * blr_last_event_description(ROUTER_INSTANCE *router);
@ -1059,16 +1059,13 @@ int n_bufs = -1, pn_bufs = -1;
* won't be updated to router->current_pos * won't be updated to router->current_pos
*/ */
spinlock_acquire(&router->binlog_lock);
if (router->trx_safe == 0 || (router->trx_safe && router->pending_transaction == 0)) { if (router->trx_safe == 0 || (router->trx_safe && router->pending_transaction == 0)) {
/* no pending transaction: set current_pos to binlog_position */ /* no pending transaction: set current_pos to binlog_position */
spinlock_acquire(&router->lock);
router->binlog_position = router->current_pos; router->binlog_position = router->current_pos;
router->current_safe_event = router->current_pos; router->current_safe_event = router->current_pos;
spinlock_release(&router->lock);
} }
spinlock_release(&router->binlog_lock);
/** /**
* Detect transactions in events * Detect transactions in events
@ -1091,6 +1088,8 @@ int n_bufs = -1, pn_bufs = -1;
flags = *(ptr+4+20 + 8 + 4); flags = *(ptr+4+20 + 8 + 4);
if (flags == 0) { if (flags == 0) {
spinlock_acquire(&router->binlog_lock);
if (router->pending_transaction > 0) { if (router->pending_transaction > 0) {
LOGIF(LE,(skygw_log_write_flush(LOGFILE_ERROR, LOGIF(LE,(skygw_log_write_flush(LOGFILE_ERROR,
"Error: a MariaDB 10 transaction " "Error: a MariaDB 10 transaction "
@ -1104,11 +1103,9 @@ int n_bufs = -1, pn_bufs = -1;
// An action should be taken here // An action should be taken here
} }
spinlock_acquire(&router->lock);
router->pending_transaction = 1; router->pending_transaction = 1;
spinlock_release(&router->lock); spinlock_release(&router->binlog_lock);
} }
} }
} }
@ -1127,9 +1124,10 @@ int n_bufs = -1, pn_bufs = -1;
statement_sql = calloc(1, statement_len+1); statement_sql = calloc(1, statement_len+1);
strncpy(statement_sql, (char *)ptr+4+20+4+4+1+2+2+var_block_len+1+db_name_len, statement_len); strncpy(statement_sql, (char *)ptr+4+20+4+4+1+2+2+var_block_len+1+db_name_len, statement_len);
spinlock_acquire(&router->binlog_lock);
/* Check for BEGIN (it comes for START TRANSACTION too) */ /* Check for BEGIN (it comes for START TRANSACTION too) */
if (strncmp(statement_sql, "BEGIN", 5) == 0) { if (strncmp(statement_sql, "BEGIN", 5) == 0) {
if (router->pending_transaction > 0) { if (router->pending_transaction > 0) {
LOGIF(LE,(skygw_log_write_flush(LOGFILE_ERROR, LOGIF(LE,(skygw_log_write_flush(LOGFILE_ERROR,
"Error: a transaction is already open " "Error: a transaction is already open "
@ -1140,34 +1138,28 @@ int n_bufs = -1, pn_bufs = -1;
// An action should be taken here // An action should be taken here
} }
spinlock_acquire(&router->lock);
router->pending_transaction = 1; router->pending_transaction = 1;
spinlock_release(&router->lock);
} }
/* Check for COMMIT in non transactional store engines */ /* Check for COMMIT in non transactional store engines */
if (strncmp(statement_sql, "COMMIT", 6) == 0) { if (strncmp(statement_sql, "COMMIT", 6) == 0) {
spinlock_acquire(&router->lock);
router->pending_transaction = 2; router->pending_transaction = 2;
spinlock_release(&router->lock);
} }
spinlock_release(&router->binlog_lock);
free(statement_sql); free(statement_sql);
} }
/* Check for COMMIT in Transactional engines, i.e InnoDB */ /* Check for COMMIT in Transactional engines, i.e InnoDB */
if(hdr.event_type == XID_EVENT) { if(hdr.event_type == XID_EVENT) {
spinlock_acquire(&router->binlog_lock);
if (router->pending_transaction) { if (router->pending_transaction) {
spinlock_acquire(&router->lock);
router->pending_transaction = 3; router->pending_transaction = 3;
spinlock_release(&router->lock);
} }
spinlock_release(&router->binlog_lock);
} }
} }
@ -1281,13 +1273,14 @@ int n_bufs = -1, pn_bufs = -1;
* may depend on pending transaction * may depend on pending transaction
*/ */
spinlock_acquire(&router->binlog_lock);
if (router->trx_safe == 0 || (router->trx_safe && router->pending_transaction == 0)) { if (router->trx_safe == 0 || (router->trx_safe && router->pending_transaction == 0)) {
spinlock_acquire(&router->lock);
router->binlog_position = router->current_pos; router->binlog_position = router->current_pos;
router->current_safe_event = router->current_pos; router->current_safe_event = router->current_pos;
spinlock_release(&router->lock); spinlock_release(&router->binlog_lock);
/* Now distribute events */ /* Now distribute events */
blr_distribute_binlog_record(router, &hdr, ptr); blr_distribute_binlog_record(router, &hdr, ptr);
@ -1307,26 +1300,25 @@ int n_bufs = -1, pn_bufs = -1;
if (router->pending_transaction > 1) { if (router->pending_transaction > 1) {
unsigned long long pos; unsigned long long pos;
unsigned long long end_pos;
GWBUF *record; GWBUF *record;
uint8_t *raw_data; uint8_t *raw_data;
REP_HEADER new_hdr; REP_HEADER new_hdr;
int i=0; int i=0;
spinlock_acquire(&router->lock);
pos = router->binlog_position; pos = router->binlog_position;
end_pos = router->current_pos;
spinlock_release(&router->lock); spinlock_release(&router->binlog_lock);
while ((record = blr_read_events_from_pos(router, pos, &new_hdr)) != NULL) { while ((record = blr_read_events_from_pos(router, pos, &new_hdr, end_pos)) != NULL) {
i++; i++;
raw_data = GWBUF_DATA(record); raw_data = GWBUF_DATA(record);
/* distribute event */ /* distribute event */
blr_distribute_binlog_record(router, &new_hdr, raw_data); blr_distribute_binlog_record(router, &new_hdr, raw_data);
spinlock_acquire(&router->lock); spinlock_acquire(&router->binlog_lock);
/** The current safe position is only updated /** The current safe position is only updated
* if it points to the event we just distributed. */ * if it points to the event we just distributed. */
@ -1337,13 +1329,12 @@ int n_bufs = -1, pn_bufs = -1;
pos = new_hdr.next_pos; pos = new_hdr.next_pos;
spinlock_release(&router->lock); spinlock_release(&router->binlog_lock);
gwbuf_free(record); gwbuf_free(record);
} }
/* Check whether binlog records has been read in previous loop */ /* Check whether binlog records has been read in previous loop */
if (pos < router->current_pos) { if (pos < router->current_pos) {
char err_message[BINLOG_ERROR_MSG_LEN+1]; char err_message[BINLOG_ERROR_MSG_LEN+1];
@ -1376,16 +1367,17 @@ int n_bufs = -1, pn_bufs = -1;
blr_distribute_error_message(router, err_message, "HY000", 1236); blr_distribute_error_message(router, err_message, "HY000", 1236);
} }
spinlock_acquire(&router->lock); /* update binlog_position and set pending to 0 */
spinlock_acquire(&router->binlog_lock);
router->binlog_position = router->current_pos; router->binlog_position = router->current_pos;
router->pending_transaction = 0; router->pending_transaction = 0;
spinlock_release(&router->lock); spinlock_release(&router->binlog_lock);
} else {
spinlock_release(&router->binlog_lock);
} }
} }
} }
else else
{ {
@ -1654,6 +1646,8 @@ int action;
if (action == 1) if (action == 1)
{ {
spinlock_acquire(&router->binlog_lock);
slave_event_action_t slave_action = SLAVE_FORCE_CATCHUP; slave_event_action_t slave_action = SLAVE_FORCE_CATCHUP;
if(router->trx_safe && slave->binlog_pos == router->current_safe_event && if(router->trx_safe && slave->binlog_pos == router->current_safe_event &&
@ -1702,6 +1696,8 @@ int action;
hdr->next_pos - hdr->event_size))); hdr->next_pos - hdr->event_size)));
} }
spinlock_release(&router->binlog_lock);
/* /*
* If slave_action is SLAVE_FORCE_CATCHUP then * If slave_action is SLAVE_FORCE_CATCHUP then
* the slave is not at the position it should be. Force it into * the slave is not at the position it should be. Force it into
@ -1738,11 +1734,11 @@ int action;
slave->stats.n_bytes += gwbuf_length(pkt); slave->stats.n_bytes += gwbuf_length(pkt);
slave->stats.n_events++; slave->stats.n_events++;
slave->dcb->func.write(slave->dcb, pkt); slave->dcb->func.write(slave->dcb, pkt);
spinlock_acquire(&slave->catch_lock);
if (hdr->event_type != ROTATE_EVENT) if (hdr->event_type != ROTATE_EVENT)
{ {
slave->binlog_pos = hdr->next_pos; slave->binlog_pos = hdr->next_pos;
} }
spinlock_acquire(&slave->catch_lock);
if (slave->overrun) if (slave->overrun)
{ {
slave->stats.n_overrun++; slave->stats.n_overrun++;
@ -1908,7 +1904,7 @@ char *rval;
* @return The binlog record wrapped in a GWBUF structure * @return The binlog record wrapped in a GWBUF structure
*/ */
GWBUF GWBUF
*blr_read_events_from_pos(ROUTER_INSTANCE *router, unsigned long long pos, REP_HEADER *hdr) { *blr_read_events_from_pos(ROUTER_INSTANCE *router, unsigned long long pos, REP_HEADER *hdr, unsigned long long pos_end) {
unsigned long long end_pos = 0; unsigned long long end_pos = 0;
struct stat statb; struct stat statb;
uint8_t hdbuf[19]; uint8_t hdbuf[19];
@ -1918,7 +1914,7 @@ int n;
int event_limit; int event_limit;
/* Get current binnlog position */ /* Get current binnlog position */
end_pos = router->current_pos; end_pos = pos_end;
/* end of file reached, we're done */ /* end of file reached, we're done */
if (pos == end_pos) { if (pos == end_pos) {

View File

@ -1813,8 +1813,9 @@ uint32_t chksum;
LOGIF(LM, (skygw_log_write( LOGIF(LM, (skygw_log_write(
LOGFILE_MESSAGE, LOGFILE_MESSAGE,
"%s: Slave %s, server id %d requested binlog file %s from position %lu", "%s: Slave %s:%d, server id %d requested binlog file %s from position %lu",
router->service->name, slave->dcb->remote, router->service->name, slave->dcb->remote,
slave->port,
slave->serverid, slave->serverid,
slave->binlogfile, (unsigned long)slave->binlog_pos))); slave->binlogfile, (unsigned long)slave->binlog_pos)));
@ -2050,8 +2051,7 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
if (hdr.ok == SLAVE_POS_READ_ERR) { if (hdr.ok == SLAVE_POS_READ_ERR) {
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"%s Slave %s:%i, server-id %d, binlog '%s', %s", "Slave %s:%i, server-id %d, binlog '%s', blr_read_binlog failure: %s",
router->service->name,
slave->dcb->remote, slave->dcb->remote,
slave->port, slave->port,
slave->serverid, slave->serverid,
@ -2076,7 +2076,13 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
if (hdr.ok == SLAVE_POS_READ_UNSAFE) { if (hdr.ok == SLAVE_POS_READ_UNSAFE) {
ROUTER_OBJECT *router_obj= router->service->router; ROUTER_OBJECT *router_obj;
spinlock_acquire(&router->lock);
router_obj = router->service->router;
spinlock_release(&router->lock);
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"%s: Slave %s:%i, server-id %d, binlog '%s', %s", "%s: Slave %s:%i, server-id %d, binlog '%s', %s",
@ -2146,7 +2152,7 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
if (slave->stats.n_caughtup == 1) if (slave->stats.n_caughtup == 1)
{ {
LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE, LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE,
"%s: Slave %s:%d, server-id %d is up to date '%s', position %lu.", "%s: Slave %s:%d, server-id %d is now up to date '%s', position %lu.",
router->service->name, router->service->name,
slave->dcb->remote, slave->dcb->remote,
slave->port, slave->port,
@ -2232,6 +2238,34 @@ ROUTER_INSTANCE *router = slave->router;
{ {
if (slave->state == BLRS_DUMPING) if (slave->state == BLRS_DUMPING)
{ {
int do_return;
spinlock_acquire(&router->binlog_lock);
do_return = 0;
/* check for a pending transaction and not rotating */
if (router->pending_transaction && strcmp(router->binlog_name, slave->binlogfile) == 0 &&
(slave->binlog_pos > router->binlog_position) && !router->rotating) {
do_return = 1;
}
spinlock_release(&router->binlog_lock);
if (do_return) {
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"Slave %s:%d, server-id %d, binlog '%s', blr_slave_callback is not calling blr_slave_catchup, pos %lu",
slave->dcb->remote,
slave->port,
slave->serverid,
slave->binlogfile,
(unsigned long)slave->binlog_pos)));
slave->cstate |= CS_EXPECTCB;
poll_fake_write_event(slave->dcb);
return 0;
}
spinlock_acquire(&slave->catch_lock); spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~(CS_UPTODATE|CS_EXPECTCB); slave->cstate &= ~(CS_UPTODATE|CS_EXPECTCB);
spinlock_release(&slave->catch_lock); spinlock_release(&slave->catch_lock);