Remove file from slave
The binlog file is now always opened when it is needed and closed when we are finished with it. That will remove any potential file concurrency issues between different threads dealing with the same slave.
This commit is contained in:

committed by
MassimilianoPinto

parent
28f05198dd
commit
40cfacfec4
@ -301,7 +301,9 @@ typedef struct router_slave {
|
|||||||
char binlogfile[BINLOG_FNAMELEN+1];
|
char binlogfile[BINLOG_FNAMELEN+1];
|
||||||
/*< Current binlog file for this slave */
|
/*< Current binlog file for this slave */
|
||||||
char *uuid; /*< Slave UUID */
|
char *uuid; /*< Slave UUID */
|
||||||
|
#ifdef BLFILE_IN_SLAVE
|
||||||
BLFILE *file; /*< Currently open binlog file */
|
BLFILE *file; /*< Currently open binlog file */
|
||||||
|
#endif
|
||||||
int serverid; /*< Server-id of the slave */
|
int serverid; /*< Server-id of the slave */
|
||||||
char *hostname; /*< Hostname of the slave, if known */
|
char *hostname; /*< Hostname of the slave, if known */
|
||||||
char *user; /*< Username if given */
|
char *user; /*< Username if given */
|
||||||
|
@ -747,7 +747,9 @@ ROUTER_SLAVE *slave;
|
|||||||
spinlock_init(&slave->catch_lock);
|
spinlock_init(&slave->catch_lock);
|
||||||
slave->dcb = session->client;
|
slave->dcb = session->client;
|
||||||
slave->router = inst;
|
slave->router = inst;
|
||||||
|
#ifdef BLFILE_IN_SLAVE
|
||||||
slave->file = NULL;
|
slave->file = NULL;
|
||||||
|
#endif
|
||||||
strcpy(slave->binlogfile, "unassigned");
|
strcpy(slave->binlogfile, "unassigned");
|
||||||
slave->connect_time = time(0);
|
slave->connect_time = time(0);
|
||||||
slave->lastEventTimestamp = 0;
|
slave->lastEventTimestamp = 0;
|
||||||
@ -892,8 +894,12 @@ ROUTER_SLAVE *slave = (ROUTER_SLAVE *)router_session;
|
|||||||
*/
|
*/
|
||||||
slave->state = BLRS_UNREGISTERED;
|
slave->state = BLRS_UNREGISTERED;
|
||||||
|
|
||||||
|
#if BLFILE_IN_SLAVE
|
||||||
|
// TODO: Is it really certain the file can be closed here? If other
|
||||||
|
// TODO: threads are using the slave instance, bag things will happen. [JWi].
|
||||||
if (slave->file)
|
if (slave->file)
|
||||||
blr_close_binlog(router, slave->file);
|
blr_close_binlog(router, slave->file);
|
||||||
|
#endif
|
||||||
|
|
||||||
/* Unlock */
|
/* Unlock */
|
||||||
rses_end_locked_router_action(slave);
|
rses_end_locked_router_action(slave);
|
||||||
|
@ -103,7 +103,7 @@ static int blr_slave_binlog_dump(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, G
|
|||||||
int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large);
|
int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large);
|
||||||
uint8_t *blr_build_header(GWBUF *pkt, REP_HEADER *hdr);
|
uint8_t *blr_build_header(GWBUF *pkt, REP_HEADER *hdr);
|
||||||
int blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data);
|
int blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data);
|
||||||
static int blr_slave_fake_rotate(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
|
static int blr_slave_fake_rotate(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, BLFILE** filep);
|
||||||
static void blr_slave_send_fde(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
|
static void blr_slave_send_fde(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
|
||||||
static int blr_slave_send_maxscale_version(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
|
static int blr_slave_send_maxscale_version(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
|
||||||
static int blr_slave_send_server_id(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
|
static int blr_slave_send_server_id(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
|
||||||
@ -1915,10 +1915,18 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
|
|||||||
slave->cstate |= CS_BUSY;
|
slave->cstate |= CS_BUSY;
|
||||||
spinlock_release(&slave->catch_lock);
|
spinlock_release(&slave->catch_lock);
|
||||||
|
|
||||||
if (slave->file == NULL)
|
BLFILE *file;
|
||||||
|
#ifdef BLFILE_IN_SLAVE
|
||||||
|
file = slave->file;
|
||||||
|
slave->file = NULL;
|
||||||
|
#else
|
||||||
|
file = NULL;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
if (file == NULL)
|
||||||
{
|
{
|
||||||
rotating = router->rotating;
|
rotating = router->rotating;
|
||||||
if ((slave->file = blr_open_binlog(router, slave->binlogfile)) == NULL)
|
if ((file = blr_open_binlog(router, slave->binlogfile)) == NULL)
|
||||||
{
|
{
|
||||||
char err_msg[BINLOG_ERROR_MSG_LEN+1];
|
char err_msg[BINLOG_ERROR_MSG_LEN+1];
|
||||||
err_msg[BINLOG_ERROR_MSG_LEN] = '\0';
|
err_msg[BINLOG_ERROR_MSG_LEN] = '\0';
|
||||||
@ -1951,8 +1959,12 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
|
|||||||
}
|
}
|
||||||
slave->stats.n_bursts++;
|
slave->stats.n_bursts++;
|
||||||
|
|
||||||
|
#ifdef BLSLAVE_IN_FILE
|
||||||
|
slave->file = file;
|
||||||
|
#endif
|
||||||
|
|
||||||
while (burst-- && burst_size > 0 &&
|
while (burst-- && burst_size > 0 &&
|
||||||
(record = blr_read_binlog(router, slave->file, slave->binlog_pos, &hdr, read_errmsg)) != NULL)
|
(record = blr_read_binlog(router, file, slave->binlog_pos, &hdr, read_errmsg)) != NULL)
|
||||||
{
|
{
|
||||||
head = gwbuf_alloc(5);
|
head = gwbuf_alloc(5);
|
||||||
ptr = GWBUF_DATA(head);
|
ptr = GWBUF_DATA(head);
|
||||||
@ -1967,13 +1979,17 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
|
|||||||
if (hdr.event_type == ROTATE_EVENT)
|
if (hdr.event_type == ROTATE_EVENT)
|
||||||
{
|
{
|
||||||
unsigned long beat1 = hkheartbeat;
|
unsigned long beat1 = hkheartbeat;
|
||||||
blr_close_binlog(router, slave->file);
|
blr_close_binlog(router, file);
|
||||||
if (hkheartbeat - beat1 > 1)
|
if (hkheartbeat - beat1 > 1)
|
||||||
MXS_ERROR("blr_close_binlog took %lu maxscale beats",
|
MXS_ERROR("blr_close_binlog took %lu maxscale beats",
|
||||||
hkheartbeat - beat1);
|
hkheartbeat - beat1);
|
||||||
blr_slave_rotate(router, slave, GWBUF_DATA(record));
|
blr_slave_rotate(router, slave, GWBUF_DATA(record));
|
||||||
beat1 = hkheartbeat;
|
beat1 = hkheartbeat;
|
||||||
|
#ifdef BLFILE_IN_SLAVE
|
||||||
if ((slave->file = blr_open_binlog(router, slave->binlogfile)) == NULL)
|
if ((slave->file = blr_open_binlog(router, slave->binlogfile)) == NULL)
|
||||||
|
#else
|
||||||
|
if ((file = blr_open_binlog(router, slave->binlogfile)) == NULL)
|
||||||
|
#endif
|
||||||
{
|
{
|
||||||
char err_msg[BINLOG_ERROR_MSG_LEN+1];
|
char err_msg[BINLOG_ERROR_MSG_LEN+1];
|
||||||
err_msg[BINLOG_ERROR_MSG_LEN] = '\0';
|
err_msg[BINLOG_ERROR_MSG_LEN] = '\0';
|
||||||
@ -2003,6 +2019,9 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
|
|||||||
dcb_close(slave->dcb);
|
dcb_close(slave->dcb);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
#ifdef BLFILE_IN_SLAVE
|
||||||
|
file = slave->file;
|
||||||
|
#endif
|
||||||
if (hkheartbeat - beat1 > 1)
|
if (hkheartbeat - beat1 > 1)
|
||||||
MXS_ERROR("blr_open_binlog took %lu beats",
|
MXS_ERROR("blr_open_binlog took %lu beats",
|
||||||
hkheartbeat - beat1);
|
hkheartbeat - beat1);
|
||||||
@ -2055,7 +2074,9 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
|
|||||||
blr_send_custom_error(slave->dcb, slave->seqno++, 0, read_errmsg, "HY000", 1236);
|
blr_send_custom_error(slave->dcb, slave->seqno++, 0, read_errmsg, "HY000", 1236);
|
||||||
|
|
||||||
dcb_close(slave->dcb);
|
dcb_close(slave->dcb);
|
||||||
|
#ifndef BLFILE_IN_SLAVE
|
||||||
|
blr_close_binlog(router, file);
|
||||||
|
#endif
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2075,6 +2096,9 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
|
|||||||
*/
|
*/
|
||||||
dcb_close(slave->dcb);
|
dcb_close(slave->dcb);
|
||||||
|
|
||||||
|
#ifndef BLFILE_IN_SLAVE
|
||||||
|
blr_close_binlog(router, file);
|
||||||
|
#endif
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -2173,7 +2197,7 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
if (slave->binlog_pos >= blr_file_size(slave->file)
|
if (slave->binlog_pos >= blr_file_size(file)
|
||||||
&& router->rotating == 0
|
&& router->rotating == 0
|
||||||
&& strcmp(router->binlog_name, slave->binlogfile) != 0
|
&& strcmp(router->binlog_name, slave->binlogfile) != 0
|
||||||
&& (blr_master_connected(router)
|
&& (blr_master_connected(router)
|
||||||
@ -2198,7 +2222,11 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
|
|||||||
slave->binlogfile, (unsigned long)slave->binlog_pos,
|
slave->binlogfile, (unsigned long)slave->binlog_pos,
|
||||||
router->binlog_name, router->binlog_position);
|
router->binlog_name, router->binlog_position);
|
||||||
|
|
||||||
if (blr_slave_fake_rotate(router, slave))
|
#ifdef BLFILE_IN_SLAVE
|
||||||
|
if (blr_slave_fake_rotate(router, slave, &slave->file))
|
||||||
|
#else
|
||||||
|
if (blr_slave_fake_rotate(router, slave, &file))
|
||||||
|
#endif
|
||||||
{
|
{
|
||||||
spinlock_acquire(&slave->catch_lock);
|
spinlock_acquire(&slave->catch_lock);
|
||||||
slave->cstate |= CS_EXPECTCB;
|
slave->cstate |= CS_EXPECTCB;
|
||||||
@ -2219,6 +2247,13 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
|
|||||||
poll_fake_write_event(slave->dcb);
|
poll_fake_write_event(slave->dcb);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifndef BLFILE_IN_SLAVE
|
||||||
|
if (file)
|
||||||
|
{
|
||||||
|
blr_close_binlog(router, file);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
return rval;
|
return rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2350,7 +2385,7 @@ int len = EXTRACT24(ptr + 9); // Extract the event length
|
|||||||
* @return Non-zero if the rotate took place
|
* @return Non-zero if the rotate took place
|
||||||
*/
|
*/
|
||||||
static int
|
static int
|
||||||
blr_slave_fake_rotate(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
|
blr_slave_fake_rotate(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, BLFILE** filep)
|
||||||
{
|
{
|
||||||
char *sptr;
|
char *sptr;
|
||||||
int filenum;
|
int filenum;
|
||||||
@ -2362,11 +2397,11 @@ uint32_t chksum;
|
|||||||
|
|
||||||
if ((sptr = strrchr(slave->binlogfile, '.')) == NULL)
|
if ((sptr = strrchr(slave->binlogfile, '.')) == NULL)
|
||||||
return 0;
|
return 0;
|
||||||
blr_close_binlog(router, slave->file);
|
blr_close_binlog(router, *filep);
|
||||||
filenum = atoi(sptr + 1);
|
filenum = atoi(sptr + 1);
|
||||||
sprintf(slave->binlogfile, BINLOG_NAMEFMT, router->fileroot, filenum + 1);
|
sprintf(slave->binlogfile, BINLOG_NAMEFMT, router->fileroot, filenum + 1);
|
||||||
slave->binlog_pos = 4;
|
slave->binlog_pos = 4;
|
||||||
if ((slave->file = blr_open_binlog(router, slave->binlogfile)) == NULL)
|
if ((*filep = blr_open_binlog(router, slave->binlogfile)) == NULL)
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
binlognamelen = strlen(slave->binlogfile);
|
binlognamelen = strlen(slave->binlogfile);
|
||||||
|
Reference in New Issue
Block a user