diff --git a/server/modules/include/blr.h b/server/modules/include/blr.h index d8f27e675..58600d259 100644 --- a/server/modules/include/blr.h +++ b/server/modules/include/blr.h @@ -302,7 +302,9 @@ typedef struct router_slave { char binlogfile[BINLOG_FNAMELEN+1]; /*< Current binlog file for this slave */ char *uuid; /*< Slave UUID */ +#ifdef BLFILE_IN_SLAVE BLFILE *file; /*< Currently open binlog file */ +#endif int serverid; /*< Server-id of the slave */ char *hostname; /*< Hostname of the slave, if known */ char *user; /*< Username if given */ diff --git a/server/modules/routing/binlog/blr.c b/server/modules/routing/binlog/blr.c index 76443e13c..629eb99c9 100644 --- a/server/modules/routing/binlog/blr.c +++ b/server/modules/routing/binlog/blr.c @@ -756,7 +756,9 @@ ROUTER_SLAVE *slave; spinlock_init(&slave->catch_lock); slave->dcb = session->client; slave->router = inst; +#ifdef BLFILE_IN_SLAVE slave->file = NULL; +#endif strcpy(slave->binlogfile, "unassigned"); slave->connect_time = time(0); slave->lastEventTimestamp = 0; @@ -901,8 +903,12 @@ ROUTER_SLAVE *slave = (ROUTER_SLAVE *)router_session; */ slave->state = BLRS_UNREGISTERED; +#if BLFILE_IN_SLAVE + // TODO: Is it really certain the file can be closed here? If other + // TODO: threads are using the slave instance, bag things will happen. [JWi]. if (slave->file) blr_close_binlog(router, slave->file); +#endif /* Unlock */ rses_end_locked_router_action(slave); diff --git a/server/modules/routing/binlog/blr_slave.c b/server/modules/routing/binlog/blr_slave.c index dbfd061e7..c799d6071 100644 --- a/server/modules/routing/binlog/blr_slave.c +++ b/server/modules/routing/binlog/blr_slave.c @@ -103,7 +103,7 @@ static int blr_slave_binlog_dump(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, G int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large); uint8_t *blr_build_header(GWBUF *pkt, REP_HEADER *hdr); int blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data); -static int blr_slave_fake_rotate(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave); +static int blr_slave_fake_rotate(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, BLFILE** filep); static void blr_slave_send_fde(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave); static int blr_slave_send_maxscale_version(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave); static int blr_slave_send_server_id(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave); @@ -1915,10 +1915,18 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1]; slave->cstate |= CS_BUSY; spinlock_release(&slave->catch_lock); - if (slave->file == NULL) + BLFILE *file; +#ifdef BLFILE_IN_SLAVE + file = slave->file; + slave->file = NULL; +#else + file = NULL; +#endif + + if (file == NULL) { rotating = router->rotating; - if ((slave->file = blr_open_binlog(router, slave->binlogfile)) == NULL) + if ((file = blr_open_binlog(router, slave->binlogfile)) == NULL) { char err_msg[BINLOG_ERROR_MSG_LEN+1]; err_msg[BINLOG_ERROR_MSG_LEN] = '\0'; @@ -1951,8 +1959,12 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1]; } slave->stats.n_bursts++; +#ifdef BLSLAVE_IN_FILE + slave->file = file; +#endif + while (burst-- && burst_size > 0 && - (record = blr_read_binlog(router, slave->file, slave->binlog_pos, &hdr, read_errmsg)) != NULL) + (record = blr_read_binlog(router, file, slave->binlog_pos, &hdr, read_errmsg)) != NULL) { head = gwbuf_alloc(5); ptr = GWBUF_DATA(head); @@ -1967,13 +1979,17 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1]; if (hdr.event_type == ROTATE_EVENT) { unsigned long beat1 = hkheartbeat; - blr_close_binlog(router, slave->file); + blr_close_binlog(router, file); if (hkheartbeat - beat1 > 1) MXS_ERROR("blr_close_binlog took %lu maxscale beats", hkheartbeat - beat1); blr_slave_rotate(router, slave, GWBUF_DATA(record)); beat1 = hkheartbeat; +#ifdef BLFILE_IN_SLAVE if ((slave->file = blr_open_binlog(router, slave->binlogfile)) == NULL) +#else + if ((file = blr_open_binlog(router, slave->binlogfile)) == NULL) +#endif { char err_msg[BINLOG_ERROR_MSG_LEN+1]; err_msg[BINLOG_ERROR_MSG_LEN] = '\0'; @@ -2003,6 +2019,9 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1]; dcb_close(slave->dcb); break; } +#ifdef BLFILE_IN_SLAVE + file = slave->file; +#endif if (hkheartbeat - beat1 > 1) MXS_ERROR("blr_open_binlog took %lu beats", hkheartbeat - beat1); @@ -2076,7 +2095,9 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1]; blr_send_custom_error(slave->dcb, slave->seqno++, 0, read_errmsg, "HY000", 1236); dcb_close(slave->dcb); - +#ifndef BLFILE_IN_SLAVE + blr_close_binlog(router, file); +#endif return 0; } @@ -2096,6 +2117,9 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1]; */ dcb_close(slave->dcb); +#ifndef BLFILE_IN_SLAVE + blr_close_binlog(router, file); +#endif return 0; } } @@ -2194,7 +2218,7 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1]; } else { - if (slave->binlog_pos >= blr_file_size(slave->file) + if (slave->binlog_pos >= blr_file_size(file) && router->rotating == 0 && strcmp(router->binlog_name, slave->binlogfile) != 0 && (blr_master_connected(router) @@ -2219,7 +2243,11 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1]; slave->binlogfile, (unsigned long)slave->binlog_pos, router->binlog_name, router->binlog_position); - if (blr_slave_fake_rotate(router, slave)) +#ifdef BLFILE_IN_SLAVE + if (blr_slave_fake_rotate(router, slave, &slave->file)) +#else + if (blr_slave_fake_rotate(router, slave, &file)) +#endif { spinlock_acquire(&slave->catch_lock); slave->cstate |= CS_EXPECTCB; @@ -2240,6 +2268,13 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1]; poll_fake_write_event(slave->dcb); } } + +#ifndef BLFILE_IN_SLAVE + if (file) + { + blr_close_binlog(router, file); + } +#endif return rval; } @@ -2371,7 +2406,7 @@ int len = EXTRACT24(ptr + 9); // Extract the event length * @return Non-zero if the rotate took place */ static int -blr_slave_fake_rotate(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave) +blr_slave_fake_rotate(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, BLFILE** filep) { char *sptr; int filenum; @@ -2383,11 +2418,11 @@ uint32_t chksum; if ((sptr = strrchr(slave->binlogfile, '.')) == NULL) return 0; - blr_close_binlog(router, slave->file); + blr_close_binlog(router, *filep); filenum = atoi(sptr + 1); sprintf(slave->binlogfile, BINLOG_NAMEFMT, router->fileroot, filenum + 1); slave->binlog_pos = 4; - if ((slave->file = blr_open_binlog(router, slave->binlogfile)) == NULL) + if ((*filep = blr_open_binlog(router, slave->binlogfile)) == NULL) return 0; binlognamelen = strlen(slave->binlogfile);