From 3e336323e829507cd86607ef3b77cb5a8c5a0d60 Mon Sep 17 00:00:00 2001 From: MassimilianoPinto Date: Wed, 24 Aug 2016 12:13:35 +0200 Subject: [PATCH] Live events distribution to slaves has been removed New CS_WAIT_DATA allows slave to receive notifications for live events. There is no longer event distribution from blr_master.c --- server/modules/include/blr.h | 13 +- server/modules/routing/binlog/blr.c | 23 +- server/modules/routing/binlog/blr_master.c | 451 +++------------------ server/modules/routing/binlog/blr_slave.c | 122 ++---- 4 files changed, 102 insertions(+), 507 deletions(-) diff --git a/server/modules/include/blr.h b/server/modules/include/blr.h index 9f0bcd2e2..586942ee1 100644 --- a/server/modules/include/blr.h +++ b/server/modules/include/blr.h @@ -33,6 +33,8 @@ * 26/04/16 Massimiliano Pinto Added MariaDB 10.0 and 10.1 GTID event flags detection * 11/07/16 Massimiliano Pinto Added SSL backend support * 22/07/16 Massimiliano Pinto Added Semi-Sync replication support + * 24/08/16 Massimiliano Pinto Added slave notification state CS_WAIT_DATA. + * State CS_UPTODATE removed. * * @endverbatim */ @@ -590,13 +592,10 @@ static char *blrs_states[] = /** * Slave catch-up status */ -#define CS_UPTODATE 0x0004 -#define CS_EXPECTCB 0x0008 -#define CS_DIST 0x0010 -#define CS_DISTLATCH 0x0020 -#define CS_THRDWAIT 0x0040 -#define CS_BUSY 0x0100 -#define CS_HOLD 0x0200 +#define CS_UNDEFINED 0x0000 +#define CS_EXPECTCB 0x0004 +#define CS_WAIT_DATA 0x0020 +#define CS_BUSY 0x0040 /** * MySQL protocol OpCodes needed for replication diff --git a/server/modules/routing/binlog/blr.c b/server/modules/routing/binlog/blr.c index 3e1f3d96b..1d8a3a39e 100644 --- a/server/modules/routing/binlog/blr.c +++ b/server/modules/routing/binlog/blr.c @@ -50,6 +50,7 @@ * for connection error and authentication failure. * 11/07/2016 Massimiliano Pinto Added SSL backend support * 22/07/2016 Massimiliano Pinto Added semi_sync replication support + * 24/08/2016 Massimiliano Pinto Added slave notification via CS_WAIT_DATA new state * * @endverbatim */ @@ -1531,21 +1532,19 @@ diagnostics(ROUTER *router, DCB *dcb) { dcb_printf(dcb, "\t\tSlave_mode: connected\n"); } - else if ((session->cstate & CS_UPTODATE) == 0) - { - dcb_printf(dcb, "\t\tSlave_mode: catchup. %s%s\n", - ((session->cstate & CS_EXPECTCB) == 0 ? "" : - "Waiting for DCB queue to drain."), - ((session->cstate & CS_BUSY) == 0 ? "" : - " Busy in slave catchup.")); - } else { - dcb_printf(dcb, "\t\tSlave_mode: follow\n"); - if (session->binlog_pos != router_inst->binlog_position) + if ((session->cstate & CS_WAIT_DATA) == CS_WAIT_DATA) { - dcb_printf(dcb, "\t\tSlave reports up to date however " - "the slave binlog position does not match the master\n"); + dcb_printf(dcb, "\t\tSlave_mode: wait-for-data\n"); + } + else + { + dcb_printf(dcb, "\t\tSlave_mode: catchup. %s%s\n", + ((session->cstate & CS_EXPECTCB) == 0 ? "" : + "Waiting for DCB queue to drain."), + ((session->cstate & CS_BUSY) == 0 ? "" : + " Busy in slave catchup.")); } } #if SPINLOCK_PROFILE diff --git a/server/modules/routing/binlog/blr_master.c b/server/modules/routing/binlog/blr_master.c index fe92566f1..52b27ce10 100644 --- a/server/modules/routing/binlog/blr_master.c +++ b/server/modules/routing/binlog/blr_master.c @@ -48,6 +48,7 @@ * 23/10/2015 Markus Makela Added current_safe_event * 26/04/2016 Massimiliano Pinto Added MariaDB 10.0 and 10.1 GTID event flags detection * 22/07/2016 Massimiliano Pinto Added semi_sync replication support + * 24/08/2016 Massimiliano Pinto Added slave notification and blr_distribute_binlog_record removed * * @endverbatim */ @@ -87,8 +88,6 @@ static GWBUF *blr_make_binlog_dump(ROUTER_INSTANCE *router); void encode_value(unsigned char *data, unsigned int value, int len); void blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt); static int blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *pkt, REP_HEADER *hdr); -void blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr, - blr_thread_role_t role); static void *CreateMySQLAuthData(char *username, char *password, char *database); void blr_extract_header(uint8_t *pkt, REP_HEADER *hdr); static void blr_log_packet(int priority, char *msg, uint8_t *ptr, int len); @@ -100,8 +99,6 @@ GWBUF *blr_read_events_from_pos(ROUTER_INSTANCE *router, unsigned long long pos, static void blr_check_last_master_event(void *inst); extern int blr_check_heartbeat(ROUTER_INSTANCE *router); static void blr_log_identity(ROUTER_INSTANCE *router); -static void blr_distribute_error_message(ROUTER_INSTANCE *router, char *message, char *state, - unsigned int err_code); static void blr_extract_header_semisync(uint8_t *pkt, REP_HEADER *hdr); static int blr_send_semisync_ack (ROUTER_INSTANCE *router, uint64_t pos); static int blr_get_master_semisync(GWBUF *buf); @@ -109,6 +106,8 @@ static int blr_get_master_semisync(GWBUF *buf); int blr_write_data_into_binlog(ROUTER_INSTANCE *router, uint32_t data_len, uint8_t *buf); void extract_checksum(ROUTER_INSTANCE* router, uint8_t *cksumptr, uint8_t len); static void blr_terminate_master_replication(ROUTER_INSTANCE *router, uint8_t* ptr, int len); +void blr_notify_all_slaves(ROUTER_INSTANCE *router); +extern bool blr_notify_waiting_slave(ROUTER_SLAVE *slave); static int keepalive = 1; @@ -1659,138 +1658,26 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) spinlock_release(&router->binlog_lock); - if (router->master_event_state == BLR_EVENT_COMPLETE) - { - /** Read the complete event from the disk */ - GWBUF *record = blr_read_events_from_pos(router, - router->last_event_pos, - &hdr, hdr.next_pos); - if (record) - { - uint8_t *data = GWBUF_DATA(record); - blr_distribute_binlog_record(router, &hdr, data, - BLR_THREAD_ROLE_MASTER_LARGE_NOTRX); - gwbuf_free(record); - } - else - { - MXS_ERROR("Failed to read event at position" - "%lu with a size of %u bytes.", - router->last_event_pos, hdr.event_size); - } - } - else - { - /* Now distribute events */ - blr_distribute_binlog_record(router, &hdr, ptr, - BLR_THREAD_ROLE_MASTER_NOTRX); - } + /* Notify clients events can be read */ + blr_notify_all_slaves(router); } else { /** * If transaction is closed: * - * 1) read current binlog starting + * 1) Notify clients events can be read * from router->binlog_position - * - * 2) distribute read event - * - * 3) set router->binlog_position to + * 2) set router->binlog_position to * router->current_pos - * */ if (router->pending_transaction > BLRM_TRANSACTION_START) { - unsigned long long pos; - unsigned long long end_pos; - GWBUF *record; - uint8_t *raw_data; - REP_HEADER new_hdr; - - pos = router->binlog_position; - end_pos = router->current_pos; - spinlock_release(&router->binlog_lock); - while ((record = blr_read_events_from_pos(router, - pos, - &new_hdr, - end_pos)) != NULL) - { - raw_data = GWBUF_DATA(record); - - /* distribute event */ - blr_distribute_binlog_record(router, &new_hdr, raw_data, - BLR_THREAD_ROLE_MASTER_TRX); - - spinlock_acquire(&router->binlog_lock); - - /** The current safe position is only updated - * if it points to the event we just distributed. */ - if (router->current_safe_event == pos) - { - router->current_safe_event = new_hdr.next_pos; - } - else - { - MXS_ERROR("Current safe event (%lu) does" - " not point at the event we " - "just sent (%llu) from binlog file %s. " - "Last commit at %lu, last write at %lu.", - router->current_safe_event, pos, - router->binlog_name, router->last_safe_pos, - router->last_written); - } - - pos = new_hdr.next_pos; - - spinlock_release(&router->binlog_lock); - - gwbuf_free(record); - } - - /* Check whether binlog records has been read in previous loop */ - if (pos < router->current_pos) - { - char err_message[BINLOG_ERROR_MSG_LEN + 1]; - - err_message[BINLOG_ERROR_MSG_LEN] = '\0'; - - /* No event has been sent */ - if (pos == router->binlog_position) - { - MXS_ERROR("No events distributed to slaves for a pending " - "transaction in %s at %lu. " - "Last event from master at %lu", - router->binlog_name, - router->binlog_position, - router->current_pos); - - static const char MESSAGE[] = "No transaction events sent"; - ss_dassert(sizeof(MESSAGE) <= BINLOG_ERROR_MSG_LEN); - strcpy(err_message, MESSAGE); - } - else - { - /* Some events have been sent */ - MXS_ERROR("Some events were not distributed to slaves for a " - "pending transaction in %s at %lu. Last distributed " - "even at %llu, last event from master at %lu", - router->binlog_name, - router->binlog_position, - pos, - router->current_pos); - - static const char MESSAGE[] = "Incomplete transaction events sent"; - ss_dassert(sizeof(MESSAGE) <= BINLOG_ERROR_MSG_LEN); - strcpy(err_message, MESSAGE); - } - - /* distribute error message to registered slaves */ - blr_distribute_error_message(router, err_message, "HY000", 1236); - } + /* Notify clients events can be read */ + blr_notify_all_slaves(router); /* update binlog_position and set pending to 0 */ spinlock_acquire(&router->binlog_lock); @@ -2021,256 +1908,6 @@ typedef enum SLAVE_EVENT_ALREADY_SENT /*< The slave already has the event, don't send it */ } slave_event_action_t; -/** - * Distribute the binlog record we have just received to all the registered slaves. - * - * @param router The router instance - * @param hdr The replication event header - * @param ptr The raw replication event data - */ -void -blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr, - blr_thread_role_t role) -{ - ROUTER_SLAVE *slave; - int action; - unsigned int cstate; - - spinlock_acquire(&router->lock); - slave = router->slaves; - while (slave) - { - if (slave->state != BLRS_DUMPING) - { - slave = slave->next; - continue; - } - spinlock_acquire(&slave->catch_lock); - if ((slave->cstate & (CS_UPTODATE | CS_BUSY)) == CS_UPTODATE) - { - /* - * This slave is reporting it is to date with the binlog of the - * master running on this slave. - * It has no thread running currently that is sending binlog - * events. - */ - action = 1; - slave->cstate |= CS_BUSY; - } - else if ((slave->cstate & (CS_UPTODATE | CS_BUSY)) == (CS_UPTODATE | CS_BUSY)) - { - /* - * The slave is up to date with the binlog and a process is - * running on this slave to send binlog events. - */ - slave->overrun = 1; - action = 2; - } - else if ((slave->cstate & CS_UPTODATE) == 0) - { - /* Slave is in catchup mode */ - action = 3; - } - else - { - MXS_ERROR("slave->cstate does not contain a meaningful state %d", slave->cstate); - action = 0; - } - slave->stats.n_actions[action - 1]++; - spinlock_release(&slave->catch_lock); - - if (action == 1) - { - spinlock_acquire(&router->binlog_lock); - - slave_event_action_t slave_action = SLAVE_FORCE_CATCHUP; - const bool same_file = strcmp(slave->binlogfile, router->binlog_name) == 0; - const bool rotate = hdr->event_type == ROTATE_EVENT && - strcmp(slave->binlogfile, router->prevbinlog) == 0; - - if (router->trx_safe && (same_file || rotate) && - slave->binlog_pos == router->current_safe_event) - { - /** Slave needs the current event being distributed */ - slave_action = SLAVE_SEND_EVENT; - } - else if (!router->trx_safe && (same_file || rotate) && - slave->binlog_pos == router->last_event_pos) - { - /** Transaction safety is off */ - slave_action = SLAVE_SEND_EVENT; - } - else if (same_file) - { - if (slave->binlog_pos == hdr->next_pos) - { - /* - * Slave has already read record from file, no - * need to distrbute this event - */ - slave_action = SLAVE_EVENT_ALREADY_SENT; - } - else if ((slave->binlog_pos > hdr->next_pos - hdr->event_size)) - { - /* - * The slave is ahead of the master, this should never - * happen. Force the slave to catchup mode in order to - * try to resolve the issue. - */ - MXS_ERROR("Slave %s:%d server ID %d is ahead of expected position %s@%u. " - "Expected position %d", slave->dcb->remote, - ntohs((slave->dcb->ipv4).sin_port), slave->serverid, - slave->binlogfile, slave->binlog_pos, - hdr->next_pos - hdr->event_size); - } - /* If none of the above conditions were met, a slave in catchup - * mode transitioned into up-to-date mode while we were - * distributing events. The slave needs to be forced into - * catchup mode since it isn't up to date anymore. */ - } - else if (rotate) - { - /** Slave is more than one binlog file behind */ - MXS_WARNING("Slave %s:%d server ID %d is behind more than one binlog file " - "from the master. Slave is using '%s' with position %d " - "when master binlog file is '%s'.", slave->dcb->remote, - ntohs((slave->dcb->ipv4).sin_port), slave->serverid, - slave->binlogfile, slave->binlog_pos, router->binlog_name); - } - else - { - /** Slave is lagging behind */ - MXS_WARNING("Slave %s:%d server ID %d is using binlog file '%s' with " - "position %d. Master binlog file is '%s' at position %lu " - "with last safe event at %lu.", slave->dcb->remote, - ntohs((slave->dcb->ipv4).sin_port), slave->serverid, - slave->binlogfile, slave->binlog_pos, router->binlog_name, - router->current_pos, router->current_safe_event); - } - - spinlock_release(&router->binlog_lock); - - /* - * If slave_action is SLAVE_FORCE_CATCHUP then - * the slave is not at the position it should be. Force it into - * catchup mode rather than send this event. - */ - - switch (slave_action) - { - char binlog_name[BINLOG_FNAMELEN + 1]; - uint32_t binlog_pos; - - case SLAVE_SEND_EVENT: - /* - * The slave should be up to date, check that the binlog - * position matches the event we have to distribute or - * this is a rotate event. Send the event directly from - * memory to the slave. - */ - slave->lastEventTimestamp = hdr->timestamp; - slave->lastEventReceived = hdr->event_type; - - /* set lastReply */ - if (router->send_slave_heartbeat) - { - slave->lastReply = time(0); - } - - strcpy(binlog_name, slave->binlogfile); - binlog_pos = slave->binlog_pos; - - if (hdr->event_type == ROTATE_EVENT) - { - blr_slave_rotate(router, slave, ptr); - } - - if (blr_send_event(role, binlog_name, binlog_pos, slave, hdr, ptr)) - { - spinlock_acquire(&slave->catch_lock); - if (hdr->event_type != ROTATE_EVENT) - { - slave->binlog_pos = hdr->next_pos; - } - if (slave->overrun) - { - slave->stats.n_overrun++; - slave->overrun = 0; - poll_fake_write_event(slave->dcb); - } - else - { - slave->cstate &= ~CS_BUSY; - } - spinlock_release(&slave->catch_lock); - } - else - { - MXS_WARNING("Slave %s:%i, server-id %d, binlog '%s, position %u: " - "Master-thread could not send event to slave, closing connection.", - slave->dcb->remote, - ntohs((slave->dcb->ipv4).sin_port), - slave->serverid, - binlog_name, - binlog_pos); - slave->state = BLRS_ERRORED; - dcb_close(slave->dcb); - } - break; - - case SLAVE_EVENT_ALREADY_SENT: - spinlock_acquire(&slave->catch_lock); - slave->cstate &= ~CS_BUSY; - spinlock_release(&slave->catch_lock); - break; - - case SLAVE_FORCE_CATCHUP: - spinlock_acquire(&slave->catch_lock); - cstate = slave->cstate; - slave->cstate &= ~(CS_UPTODATE | CS_BUSY); - slave->cstate |= CS_EXPECTCB; - spinlock_release(&slave->catch_lock); - if ((cstate & CS_UPTODATE) == CS_UPTODATE) - { -#ifdef STATE_CHANGE_LOGGING_ENABLED - MXS_NOTICE("%s: Slave %s:%d, server-id %d transition from " - "up-to-date to catch-up in blr_distribute_binlog_record, " - "binlog file '%s', position %lu.", - router->service->name, - slave->dcb->remote, - ntohs((slave->dcb->ipv4).sin_port), - slave->serverid, - slave->binlogfile, (unsigned long)slave->binlog_pos); -#endif - } - poll_fake_write_event(slave->dcb); - break; - } - } - else if (action == 3) - { - /* Slave is not up to date - * Check if it is either expecting a callback or - * is busy processing a callback - */ - spinlock_acquire(&slave->catch_lock); - if ((slave->cstate & (CS_EXPECTCB | CS_BUSY)) == 0) - { - slave->cstate |= CS_EXPECTCB; - spinlock_release(&slave->catch_lock); - poll_fake_write_event(slave->dcb); - } - else - { - spinlock_release(&slave->catch_lock); - } - } - - slave = slave->next; - } - spinlock_release(&router->lock); -} - /** * Write a raw event (the first 40 bytes at most) to a log file * @@ -2744,39 +2381,6 @@ static void blr_log_identity(ROUTER_INSTANCE *router) } } -/** - * Distribute an error message to all the registered slaves. - * - * @param router The router instance - * @param message The message to send - * @param state The MySQL State for message - * @param err_code The MySQL error code for message - */ -static void -blr_distribute_error_message(ROUTER_INSTANCE *router, char *message, char *state, unsigned int err_code) -{ - ROUTER_SLAVE *slave; - - spinlock_acquire(&router->lock); - - slave = router->slaves; - while (slave) - { - if (slave->state != BLRS_DUMPING) - { - slave = slave->next; - continue; - } - - /* send the error that stops slave replication */ - blr_send_custom_error(slave->dcb, slave->seqno++, 0, message, state, err_code); - - slave = slave->next; - } - - spinlock_release(&router->lock); -} - /** * @brief Write data into binlogs (incomplete event) * @@ -3118,3 +2722,40 @@ blr_get_master_semisync(GWBUF *buf) return master_semisync; } + +/** + * Notify all the registered slaves to read from binlog file + * the new events just received + * + * @param router The router instance + */ +void blr_notify_all_slaves(ROUTER_INSTANCE *router) +{ + ROUTER_SLAVE *slave; + int notified = 0; + + spinlock_acquire(&router->lock); + slave = router->slaves; + while (slave) + { + if (slave->state != BLRS_DUMPING) + { + slave = slave->next; + continue; + } + + /* Notify a slave that has CS_WAIT_DATA bit set */ + if (blr_notify_waiting_slave(slave)) + { + notified++; + } + + slave = slave->next; + } + spinlock_release(&router->lock); + + if (notified > 0) + { + MXS_BEBUG("Notified %d slaves about new data.", notified); + } +} diff --git a/server/modules/routing/binlog/blr_slave.c b/server/modules/routing/binlog/blr_slave.c index 6a6c72798..e8756883d 100644 --- a/server/modules/routing/binlog/blr_slave.c +++ b/server/modules/routing/binlog/blr_slave.c @@ -60,6 +60,7 @@ * 23/10/2015 Markus Makela Added current_safe_event * 09/05/2016 Massimiliano Pinto Added SELECT USER() * 11/07/2016 Massimiliano Pinto Added SSL backend support + * 24/08/2016 Massimiliano Pinto Added slave notification via CS_WAIT_DATA * * @endverbatim */ @@ -2111,15 +2112,9 @@ blr_slave_binlog_dump(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue slave->serverid, slave->binlogfile, (unsigned long)slave->binlog_pos); - if (slave->binlog_pos != router->binlog_position || - strcmp(slave->binlogfile, router->binlog_name) != 0) - { - spinlock_acquire(&slave->catch_lock); - slave->cstate &= ~CS_UPTODATE; - slave->cstate |= CS_EXPECTCB; - spinlock_release(&slave->catch_lock); - poll_fake_write_event(slave->dcb); - } + /* force the slave to call catchup routine */ + poll_fake_write_event(slave->dcb); + return rval; } @@ -2489,18 +2484,16 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large) spinlock_acquire(&slave->catch_lock); slave->cstate |= CS_EXPECTCB; spinlock_release(&slave->catch_lock); + + /* force slave to read events via catchup routine */ poll_fake_write_event(slave->dcb); } else if (slave->binlog_pos == router->binlog_position && strcmp(slave->binlogfile, router->binlog_name) == 0) { - int state_change = 0; - unsigned int cstate = 0; spinlock_acquire(&router->binlog_lock); spinlock_acquire(&slave->catch_lock); - cstate = slave->cstate; - /* * Now check again since we hold the router->binlog_lock * and slave->catch_lock. @@ -2508,69 +2501,23 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large) if (slave->binlog_pos != router->binlog_position || strcmp(slave->binlogfile, router->binlog_name) != 0) { - slave->cstate &= ~CS_UPTODATE; slave->cstate |= CS_EXPECTCB; spinlock_release(&slave->catch_lock); spinlock_release(&router->binlog_lock); - if ((cstate & CS_UPTODATE) == CS_UPTODATE) - { -#ifdef STATE_CHANGE_LOGGING_ENABLED - MXS_NOTICE("%s: Slave %s:%d, server-id %d transition from up-to-date to " - "catch-up in blr_slave_catchup, binlog file '%s', position %lu.", - router->service->name, - slave->dcb->remote, - ntohs((slave->dcb->ipv4).sin_port), - slave->serverid, - slave->binlogfile, (unsigned long)slave->binlog_pos); -#endif - } - + /* force slave to read events via catchup routine */ poll_fake_write_event(slave->dcb); } else { - if ((slave->cstate & CS_UPTODATE) == 0) - { - slave->stats.n_upd++; - slave->cstate |= CS_UPTODATE; - spinlock_release(&slave->catch_lock); - spinlock_release(&router->binlog_lock); - state_change = 1; - } - else - { - MXS_NOTICE("Execution entered branch were locks previously were NOT " - "released. Previously this would have caused a lock-up."); - spinlock_release(&slave->catch_lock); - spinlock_release(&router->binlog_lock); - } - } + /* set the CS_WAIT_DATA that allows notification + * when new events are received form master server + * call back routine will be called later. + */ + slave->cstate |= CS_WAIT_DATA; - if (state_change) - { - slave->stats.n_caughtup++; -#ifdef STATE_CHANGE_LOGGING_ENABLED - // TODO: The % 50 should be removed. Now only every 50th state change is logged. - if (slave->stats.n_caughtup == 1) - { - MXS_NOTICE("%s: Slave %s:%d, server-id %d is now up to date '%s', position %lu.", - router->service->name, - slave->dcb->remote, - ntohs((slave->dcb->ipv4).sin_port), - slave->serverid, - slave->binlogfile, (unsigned long)slave->binlog_pos); - } - else if ((slave->stats.n_caughtup % 50) == 0) - { - MXS_NOTICE("%s: Slave %s:%d, server-id %d is up to date '%s', position %lu.", - router->service->name, - slave->dcb->remote, - ntohs((slave->dcb->ipv4).sin_port), - slave->serverid, - slave->binlogfile, (unsigned long)slave->binlog_pos); - } -#endif + spinlock_release(&slave->catch_lock); + spinlock_release(&router->binlog_lock); } } else @@ -2649,7 +2596,6 @@ blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data) { ROUTER_SLAVE *slave = (ROUTER_SLAVE *)data; ROUTER_INSTANCE *router = slave->router; - unsigned int cstate; if (NULL == dcb->session->router_session) { @@ -2670,25 +2616,12 @@ blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data) spinlock_release(&slave->catch_lock); return 0; } - cstate = slave->cstate; - slave->cstate &= ~(CS_UPTODATE | CS_EXPECTCB); + slave->cstate &= ~(CS_EXPECTCB); slave->cstate |= CS_BUSY; spinlock_release(&slave->catch_lock); - if ((cstate & CS_UPTODATE) == CS_UPTODATE) - { -#ifdef STATE_CHANGE_LOGGING_ENABLED - MXS_NOTICE("%s: Slave %s:%d, server-id %d transition from up-to-date to " - "catch-up in blr_slave_callback, binlog file '%s', position %lu.", - router->service->name, - slave->dcb->remote, - ntohs((slave->dcb->ipv4).sin_port), - slave->serverid, - slave->binlogfile, (unsigned long)slave->binlog_pos); -#endif - } - slave->stats.n_dcb++; + blr_slave_catchup(router, slave, true); } else @@ -5609,3 +5542,26 @@ blr_set_master_ssl(ROUTER_INSTANCE *router, CHANGE_MASTER_OPTIONS config, char * else return 0; } + +/** + * Notify a waiting slave that new events are stored in binlog file + * + * @param slave Current connected slave + * @return true if slave has been notified + * + */ +bool blr_notify_waiting_slave(ROUTER_SLAVE *slave) +{ + bool ret = false; + spinlock_acquire(&slave->catch_lock); + if (slave->cstate & CS_WAIT_DATA) + { + ret = true; + /* Add fake event that will call the blr_slave_callback routine */ + poll_fake_write_event(slave->dcb); + slave->cstate &= ~CS_WAIT_DATA; + } + spinlock_release(&slave->catch_lock); + + return ret; +}