Remove file from slave

The binlog file is now always opened when it is needed and closed
when we are finished with it. That will remove any potential
file concurrency issues between different threads dealing with
the same slave.
This commit is contained in:
Johan Wikman
2015-12-04 11:43:00 +02:00
committed by Markus Makela
parent 71212a824b
commit 04f807290a
3 changed files with 54 additions and 11 deletions

View File

@ -756,7 +756,9 @@ ROUTER_SLAVE *slave;
spinlock_init(&slave->catch_lock);
slave->dcb = session->client;
slave->router = inst;
#ifdef BLFILE_IN_SLAVE
slave->file = NULL;
#endif
strcpy(slave->binlogfile, "unassigned");
slave->connect_time = time(0);
slave->lastEventTimestamp = 0;
@ -901,8 +903,12 @@ ROUTER_SLAVE *slave = (ROUTER_SLAVE *)router_session;
*/
slave->state = BLRS_UNREGISTERED;
#if BLFILE_IN_SLAVE
// TODO: Is it really certain the file can be closed here? If other
// TODO: threads are using the slave instance, bag things will happen. [JWi].
if (slave->file)
blr_close_binlog(router, slave->file);
#endif
/* Unlock */
rses_end_locked_router_action(slave);

View File

@ -103,7 +103,7 @@ static int blr_slave_binlog_dump(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, G
int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large);
uint8_t *blr_build_header(GWBUF *pkt, REP_HEADER *hdr);
int blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data);
static int blr_slave_fake_rotate(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
static int blr_slave_fake_rotate(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, BLFILE** filep);
static void blr_slave_send_fde(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
static int blr_slave_send_maxscale_version(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
static int blr_slave_send_server_id(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
@ -1915,10 +1915,18 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
slave->cstate |= CS_BUSY;
spinlock_release(&slave->catch_lock);
if (slave->file == NULL)
BLFILE *file;
#ifdef BLFILE_IN_SLAVE
file = slave->file;
slave->file = NULL;
#else
file = NULL;
#endif
if (file == NULL)
{
rotating = router->rotating;
if ((slave->file = blr_open_binlog(router, slave->binlogfile)) == NULL)
if ((file = blr_open_binlog(router, slave->binlogfile)) == NULL)
{
char err_msg[BINLOG_ERROR_MSG_LEN+1];
err_msg[BINLOG_ERROR_MSG_LEN] = '\0';
@ -1951,8 +1959,12 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
}
slave->stats.n_bursts++;
#ifdef BLSLAVE_IN_FILE
slave->file = file;
#endif
while (burst-- && burst_size > 0 &&
(record = blr_read_binlog(router, slave->file, slave->binlog_pos, &hdr, read_errmsg)) != NULL)
(record = blr_read_binlog(router, file, slave->binlog_pos, &hdr, read_errmsg)) != NULL)
{
head = gwbuf_alloc(5);
ptr = GWBUF_DATA(head);
@ -1967,13 +1979,17 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
if (hdr.event_type == ROTATE_EVENT)
{
unsigned long beat1 = hkheartbeat;
blr_close_binlog(router, slave->file);
blr_close_binlog(router, file);
if (hkheartbeat - beat1 > 1)
MXS_ERROR("blr_close_binlog took %lu maxscale beats",
hkheartbeat - beat1);
blr_slave_rotate(router, slave, GWBUF_DATA(record));
beat1 = hkheartbeat;
#ifdef BLFILE_IN_SLAVE
if ((slave->file = blr_open_binlog(router, slave->binlogfile)) == NULL)
#else
if ((file = blr_open_binlog(router, slave->binlogfile)) == NULL)
#endif
{
char err_msg[BINLOG_ERROR_MSG_LEN+1];
err_msg[BINLOG_ERROR_MSG_LEN] = '\0';
@ -2003,6 +2019,9 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
dcb_close(slave->dcb);
break;
}
#ifdef BLFILE_IN_SLAVE
file = slave->file;
#endif
if (hkheartbeat - beat1 > 1)
MXS_ERROR("blr_open_binlog took %lu beats",
hkheartbeat - beat1);
@ -2076,7 +2095,9 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
blr_send_custom_error(slave->dcb, slave->seqno++, 0, read_errmsg, "HY000", 1236);
dcb_close(slave->dcb);
#ifndef BLFILE_IN_SLAVE
blr_close_binlog(router, file);
#endif
return 0;
}
@ -2096,6 +2117,9 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
*/
dcb_close(slave->dcb);
#ifndef BLFILE_IN_SLAVE
blr_close_binlog(router, file);
#endif
return 0;
}
}
@ -2194,7 +2218,7 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
}
else
{
if (slave->binlog_pos >= blr_file_size(slave->file)
if (slave->binlog_pos >= blr_file_size(file)
&& router->rotating == 0
&& strcmp(router->binlog_name, slave->binlogfile) != 0
&& (blr_master_connected(router)
@ -2219,7 +2243,11 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
slave->binlogfile, (unsigned long)slave->binlog_pos,
router->binlog_name, router->binlog_position);
if (blr_slave_fake_rotate(router, slave))
#ifdef BLFILE_IN_SLAVE
if (blr_slave_fake_rotate(router, slave, &slave->file))
#else
if (blr_slave_fake_rotate(router, slave, &file))
#endif
{
spinlock_acquire(&slave->catch_lock);
slave->cstate |= CS_EXPECTCB;
@ -2240,6 +2268,13 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
poll_fake_write_event(slave->dcb);
}
}
#ifndef BLFILE_IN_SLAVE
if (file)
{
blr_close_binlog(router, file);
}
#endif
return rval;
}
@ -2371,7 +2406,7 @@ int len = EXTRACT24(ptr + 9); // Extract the event length
* @return Non-zero if the rotate took place
*/
static int
blr_slave_fake_rotate(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
blr_slave_fake_rotate(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, BLFILE** filep)
{
char *sptr;
int filenum;
@ -2383,11 +2418,11 @@ uint32_t chksum;
if ((sptr = strrchr(slave->binlogfile, '.')) == NULL)
return 0;
blr_close_binlog(router, slave->file);
blr_close_binlog(router, *filep);
filenum = atoi(sptr + 1);
sprintf(slave->binlogfile, BINLOG_NAMEFMT, router->fileroot, filenum + 1);
slave->binlog_pos = 4;
if ((slave->file = blr_open_binlog(router, slave->binlogfile)) == NULL)
if ((*filep = blr_open_binlog(router, slave->binlogfile)) == NULL)
return 0;
binlognamelen = strlen(slave->binlogfile);