Live events distribution to slaves has been removed

The new CS_WAIT_DATA state allows slaves to receive notifications for live events.

There is no longer event distribution from blr_master.c
This commit is contained in:
MassimilianoPinto
2016-08-24 12:13:35 +02:00
parent e231236cee
commit 3e336323e8
4 changed files with 102 additions and 507 deletions

View File

@ -50,6 +50,7 @@
* for connection error and authentication failure.
* 11/07/2016 Massimiliano Pinto Added SSL backend support
* 22/07/2016 Massimiliano Pinto Added semi_sync replication support
* 24/08/2016 Massimiliano Pinto Added slave notification via CS_WAIT_DATA new state
*
* @endverbatim
*/
@ -1531,21 +1532,19 @@ diagnostics(ROUTER *router, DCB *dcb)
{
dcb_printf(dcb, "\t\tSlave_mode: connected\n");
}
else if ((session->cstate & CS_UPTODATE) == 0)
{
dcb_printf(dcb, "\t\tSlave_mode: catchup. %s%s\n",
((session->cstate & CS_EXPECTCB) == 0 ? "" :
"Waiting for DCB queue to drain."),
((session->cstate & CS_BUSY) == 0 ? "" :
" Busy in slave catchup."));
}
else
{
dcb_printf(dcb, "\t\tSlave_mode: follow\n");
if (session->binlog_pos != router_inst->binlog_position)
if ((session->cstate & CS_WAIT_DATA) == CS_WAIT_DATA)
{
dcb_printf(dcb, "\t\tSlave reports up to date however "
"the slave binlog position does not match the master\n");
dcb_printf(dcb, "\t\tSlave_mode: wait-for-data\n");
}
else
{
dcb_printf(dcb, "\t\tSlave_mode: catchup. %s%s\n",
((session->cstate & CS_EXPECTCB) == 0 ? "" :
"Waiting for DCB queue to drain."),
((session->cstate & CS_BUSY) == 0 ? "" :
" Busy in slave catchup."));
}
}
#if SPINLOCK_PROFILE

View File

@ -48,6 +48,7 @@
* 23/10/2015 Markus Makela Added current_safe_event
* 26/04/2016 Massimiliano Pinto Added MariaDB 10.0 and 10.1 GTID event flags detection
* 22/07/2016 Massimiliano Pinto Added semi_sync replication support
* 24/08/2016 Massimiliano Pinto Added slave notification and blr_distribute_binlog_record removed
*
* @endverbatim
*/
@ -87,8 +88,6 @@ static GWBUF *blr_make_binlog_dump(ROUTER_INSTANCE *router);
void encode_value(unsigned char *data, unsigned int value, int len);
void blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt);
static int blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *pkt, REP_HEADER *hdr);
void blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr,
blr_thread_role_t role);
static void *CreateMySQLAuthData(char *username, char *password, char *database);
void blr_extract_header(uint8_t *pkt, REP_HEADER *hdr);
static void blr_log_packet(int priority, char *msg, uint8_t *ptr, int len);
@ -100,8 +99,6 @@ GWBUF *blr_read_events_from_pos(ROUTER_INSTANCE *router, unsigned long long pos,
static void blr_check_last_master_event(void *inst);
extern int blr_check_heartbeat(ROUTER_INSTANCE *router);
static void blr_log_identity(ROUTER_INSTANCE *router);
static void blr_distribute_error_message(ROUTER_INSTANCE *router, char *message, char *state,
unsigned int err_code);
static void blr_extract_header_semisync(uint8_t *pkt, REP_HEADER *hdr);
static int blr_send_semisync_ack (ROUTER_INSTANCE *router, uint64_t pos);
static int blr_get_master_semisync(GWBUF *buf);
@ -109,6 +106,8 @@ static int blr_get_master_semisync(GWBUF *buf);
int blr_write_data_into_binlog(ROUTER_INSTANCE *router, uint32_t data_len, uint8_t *buf);
void extract_checksum(ROUTER_INSTANCE* router, uint8_t *cksumptr, uint8_t len);
static void blr_terminate_master_replication(ROUTER_INSTANCE *router, uint8_t* ptr, int len);
void blr_notify_all_slaves(ROUTER_INSTANCE *router);
extern bool blr_notify_waiting_slave(ROUTER_SLAVE *slave);
static int keepalive = 1;
@ -1659,138 +1658,26 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
spinlock_release(&router->binlog_lock);
if (router->master_event_state == BLR_EVENT_COMPLETE)
{
/** Read the complete event from the disk */
GWBUF *record = blr_read_events_from_pos(router,
router->last_event_pos,
&hdr, hdr.next_pos);
if (record)
{
uint8_t *data = GWBUF_DATA(record);
blr_distribute_binlog_record(router, &hdr, data,
BLR_THREAD_ROLE_MASTER_LARGE_NOTRX);
gwbuf_free(record);
}
else
{
MXS_ERROR("Failed to read event at position"
"%lu with a size of %u bytes.",
router->last_event_pos, hdr.event_size);
}
}
else
{
/* Now distribute events */
blr_distribute_binlog_record(router, &hdr, ptr,
BLR_THREAD_ROLE_MASTER_NOTRX);
}
/* Notify clients events can be read */
blr_notify_all_slaves(router);
}
else
{
/**
* If transaction is closed:
*
* 1) read current binlog starting
* 1) Notify clients events can be read
* from router->binlog_position
*
* 2) distribute read event
*
* 3) set router->binlog_position to
* 2) set router->binlog_position to
* router->current_pos
*
*/
if (router->pending_transaction > BLRM_TRANSACTION_START)
{
unsigned long long pos;
unsigned long long end_pos;
GWBUF *record;
uint8_t *raw_data;
REP_HEADER new_hdr;
pos = router->binlog_position;
end_pos = router->current_pos;
spinlock_release(&router->binlog_lock);
while ((record = blr_read_events_from_pos(router,
pos,
&new_hdr,
end_pos)) != NULL)
{
raw_data = GWBUF_DATA(record);
/* distribute event */
blr_distribute_binlog_record(router, &new_hdr, raw_data,
BLR_THREAD_ROLE_MASTER_TRX);
spinlock_acquire(&router->binlog_lock);
/** The current safe position is only updated
* if it points to the event we just distributed. */
if (router->current_safe_event == pos)
{
router->current_safe_event = new_hdr.next_pos;
}
else
{
MXS_ERROR("Current safe event (%lu) does"
" not point at the event we "
"just sent (%llu) from binlog file %s. "
"Last commit at %lu, last write at %lu.",
router->current_safe_event, pos,
router->binlog_name, router->last_safe_pos,
router->last_written);
}
pos = new_hdr.next_pos;
spinlock_release(&router->binlog_lock);
gwbuf_free(record);
}
/* Check whether binlog records has been read in previous loop */
if (pos < router->current_pos)
{
char err_message[BINLOG_ERROR_MSG_LEN + 1];
err_message[BINLOG_ERROR_MSG_LEN] = '\0';
/* No event has been sent */
if (pos == router->binlog_position)
{
MXS_ERROR("No events distributed to slaves for a pending "
"transaction in %s at %lu. "
"Last event from master at %lu",
router->binlog_name,
router->binlog_position,
router->current_pos);
static const char MESSAGE[] = "No transaction events sent";
ss_dassert(sizeof(MESSAGE) <= BINLOG_ERROR_MSG_LEN);
strcpy(err_message, MESSAGE);
}
else
{
/* Some events have been sent */
MXS_ERROR("Some events were not distributed to slaves for a "
"pending transaction in %s at %lu. Last distributed "
"even at %llu, last event from master at %lu",
router->binlog_name,
router->binlog_position,
pos,
router->current_pos);
static const char MESSAGE[] = "Incomplete transaction events sent";
ss_dassert(sizeof(MESSAGE) <= BINLOG_ERROR_MSG_LEN);
strcpy(err_message, MESSAGE);
}
/* distribute error message to registered slaves */
blr_distribute_error_message(router, err_message, "HY000", 1236);
}
/* Notify clients events can be read */
blr_notify_all_slaves(router);
/* update binlog_position and set pending to 0 */
spinlock_acquire(&router->binlog_lock);
@ -2021,256 +1908,6 @@ typedef enum
SLAVE_EVENT_ALREADY_SENT /*< The slave already has the event, don't send it */
} slave_event_action_t;
/**
 * Distribute the binlog record we have just received to all the registered slaves.
 *
 * The slave list is walked while holding router->lock. Only slaves in the
 * BLRS_DUMPING state are considered. For each of them the catch-up state bits
 * (slave->cstate) select one of three actions:
 *
 *  1) CS_UPTODATE set, CS_BUSY clear: mark the slave busy and decide whether
 *     the event can be sent directly, was already sent, or the slave must be
 *     forced back into catchup mode.
 *  2) CS_UPTODATE and CS_BUSY both set: only flag an overrun on the slave.
 *  3) CS_UPTODATE clear (catchup mode): schedule a fake write event unless
 *     the slave is already expecting, or busy processing, a callback.
 *
 * @param router    The router instance
 * @param hdr       The replication event header
 * @param ptr       The raw replication event data
 * @param role      The thread role, passed through to blr_send_event()
 */
void
blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr,
                             blr_thread_role_t role)
{
    ROUTER_SLAVE *slave;
    int action;
    unsigned int cstate;

    spinlock_acquire(&router->lock);
    slave = router->slaves;
    while (slave)
    {
        if (slave->state != BLRS_DUMPING)
        {
            slave = slave->next;
            continue;
        }

        spinlock_acquire(&slave->catch_lock);
        if ((slave->cstate & (CS_UPTODATE | CS_BUSY)) == CS_UPTODATE)
        {
            /*
             * This slave is reporting it is up to date with the binlog of the
             * master running on this slave.
             * It has no thread running currently that is sending binlog
             * events.
             */
            action = 1;
            slave->cstate |= CS_BUSY;
        }
        else if ((slave->cstate & (CS_UPTODATE | CS_BUSY)) == (CS_UPTODATE | CS_BUSY))
        {
            /*
             * The slave is up to date with the binlog and a process is
             * running on this slave to send binlog events.
             */
            slave->overrun = 1;
            action = 2;
        }
        else if ((slave->cstate & CS_UPTODATE) == 0)
        {
            /* Slave is in catchup mode */
            action = 3;
        }
        else
        {
            MXS_ERROR("slave->cstate does not contain a meaningful state %d", slave->cstate);
            action = 0;
        }

        /*
         * The statistics array is indexed by (action - 1). Skip the update
         * when no valid action was selected, otherwise index -1 would be
         * written.
         */
        if (action > 0)
        {
            slave->stats.n_actions[action - 1]++;
        }
        spinlock_release(&slave->catch_lock);

        if (action == 1)
        {
            spinlock_acquire(&router->binlog_lock);

            slave_event_action_t slave_action = SLAVE_FORCE_CATCHUP;
            const bool same_file = strcmp(slave->binlogfile, router->binlog_name) == 0;
            const bool rotate = hdr->event_type == ROTATE_EVENT &&
                                strcmp(slave->binlogfile, router->prevbinlog) == 0;

            if (router->trx_safe && (same_file || rotate) &&
                slave->binlog_pos == router->current_safe_event)
            {
                /** Slave needs the current event being distributed */
                slave_action = SLAVE_SEND_EVENT;
            }
            else if (!router->trx_safe && (same_file || rotate) &&
                     slave->binlog_pos == router->last_event_pos)
            {
                /** Transaction safety is off */
                slave_action = SLAVE_SEND_EVENT;
            }
            else if (same_file)
            {
                if (slave->binlog_pos == hdr->next_pos)
                {
                    /*
                     * Slave has already read record from file, no
                     * need to distribute this event
                     */
                    slave_action = SLAVE_EVENT_ALREADY_SENT;
                }
                else if (slave->binlog_pos > hdr->next_pos - hdr->event_size)
                {
                    /*
                     * The slave is ahead of the master, this should never
                     * happen. Force the slave to catchup mode in order to
                     * try to resolve the issue.
                     */
                    MXS_ERROR("Slave %s:%d server ID %d is ahead of expected position %s@%u. "
                              "Expected position %d", slave->dcb->remote,
                              ntohs((slave->dcb->ipv4).sin_port), slave->serverid,
                              slave->binlogfile, slave->binlog_pos,
                              hdr->next_pos - hdr->event_size);
                }
                /* If none of the above conditions were met, a slave in catchup
                 * mode transitioned into up-to-date mode while we were
                 * distributing events. The slave needs to be forced into
                 * catchup mode since it isn't up to date anymore. */
            }
            else if (rotate)
            {
                /** Slave is more than one binlog file behind */
                MXS_WARNING("Slave %s:%d server ID %d is behind more than one binlog file "
                            "from the master. Slave is using '%s' with position %d "
                            "when master binlog file is '%s'.", slave->dcb->remote,
                            ntohs((slave->dcb->ipv4).sin_port), slave->serverid,
                            slave->binlogfile, slave->binlog_pos, router->binlog_name);
            }
            else
            {
                /** Slave is lagging behind */
                MXS_WARNING("Slave %s:%d server ID %d is using binlog file '%s' with "
                            "position %d. Master binlog file is '%s' at position %lu "
                            "with last safe event at %lu.", slave->dcb->remote,
                            ntohs((slave->dcb->ipv4).sin_port), slave->serverid,
                            slave->binlogfile, slave->binlog_pos, router->binlog_name,
                            router->current_pos, router->current_safe_event);
            }

            spinlock_release(&router->binlog_lock);

            /*
             * If slave_action is SLAVE_FORCE_CATCHUP then
             * the slave is not at the position it should be. Force it into
             * catchup mode rather than send this event.
             */
            switch (slave_action)
            {
            case SLAVE_SEND_EVENT:
                {
                    /* Position of the slave before a possible rotate, used
                     * for sending the event and for the failure log message. */
                    char binlog_name[BINLOG_FNAMELEN + 1];
                    uint32_t binlog_pos;

                    /*
                     * The slave should be up to date, check that the binlog
                     * position matches the event we have to distribute or
                     * this is a rotate event. Send the event directly from
                     * memory to the slave.
                     */
                    slave->lastEventTimestamp = hdr->timestamp;
                    slave->lastEventReceived = hdr->event_type;

                    /* set lastReply */
                    if (router->send_slave_heartbeat)
                    {
                        slave->lastReply = time(0);
                    }

                    strcpy(binlog_name, slave->binlogfile);
                    binlog_pos = slave->binlog_pos;

                    if (hdr->event_type == ROTATE_EVENT)
                    {
                        blr_slave_rotate(router, slave, ptr);
                    }

                    if (blr_send_event(role, binlog_name, binlog_pos, slave, hdr, ptr))
                    {
                        spinlock_acquire(&slave->catch_lock);
                        if (hdr->event_type != ROTATE_EVENT)
                        {
                            slave->binlog_pos = hdr->next_pos;
                        }
                        if (slave->overrun)
                        {
                            /* Another event arrived while this one was being
                             * sent: count it and trigger the slave callback. */
                            slave->stats.n_overrun++;
                            slave->overrun = 0;
                            poll_fake_write_event(slave->dcb);
                        }
                        else
                        {
                            slave->cstate &= ~CS_BUSY;
                        }
                        spinlock_release(&slave->catch_lock);
                    }
                    else
                    {
                        MXS_WARNING("Slave %s:%i, server-id %d, binlog '%s, position %u: "
                                    "Master-thread could not send event to slave, closing connection.",
                                    slave->dcb->remote,
                                    ntohs((slave->dcb->ipv4).sin_port),
                                    slave->serverid,
                                    binlog_name,
                                    binlog_pos);
                        slave->state = BLRS_ERRORED;
                        dcb_close(slave->dcb);
                    }
                }
                break;

            case SLAVE_EVENT_ALREADY_SENT:
                spinlock_acquire(&slave->catch_lock);
                slave->cstate &= ~CS_BUSY;
                spinlock_release(&slave->catch_lock);
                break;

            case SLAVE_FORCE_CATCHUP:
                spinlock_acquire(&slave->catch_lock);
                cstate = slave->cstate;
                slave->cstate &= ~(CS_UPTODATE | CS_BUSY);
                slave->cstate |= CS_EXPECTCB;
                spinlock_release(&slave->catch_lock);
                if ((cstate & CS_UPTODATE) == CS_UPTODATE)
                {
#ifdef STATE_CHANGE_LOGGING_ENABLED
                    MXS_NOTICE("%s: Slave %s:%d, server-id %d transition from "
                               "up-to-date to catch-up in blr_distribute_binlog_record, "
                               "binlog file '%s', position %lu.",
                               router->service->name,
                               slave->dcb->remote,
                               ntohs((slave->dcb->ipv4).sin_port),
                               slave->serverid,
                               slave->binlogfile, (unsigned long)slave->binlog_pos);
#endif
                }
                poll_fake_write_event(slave->dcb);
                break;
            }
        }
        else if (action == 3)
        {
            /* Slave is not up to date
             * Check if it is either expecting a callback or
             * is busy processing a callback
             */
            spinlock_acquire(&slave->catch_lock);
            if ((slave->cstate & (CS_EXPECTCB | CS_BUSY)) == 0)
            {
                slave->cstate |= CS_EXPECTCB;
                spinlock_release(&slave->catch_lock);
                poll_fake_write_event(slave->dcb);
            }
            else
            {
                spinlock_release(&slave->catch_lock);
            }
        }

        slave = slave->next;
    }
    spinlock_release(&router->lock);
}
/**
* Write a raw event (the first 40 bytes at most) to a log file
*
@ -2744,39 +2381,6 @@ static void blr_log_identity(ROUTER_INSTANCE *router)
}
}
/**
 * Send an error packet to every registered slave that is currently in the
 * BLRS_DUMPING state. The slave list is traversed while holding router->lock.
 *
 * @param router    The router instance
 * @param message   The message to send
 * @param state     The MySQL State for message
 * @param err_code  The MySQL error code for message
 */
static void
blr_distribute_error_message(ROUTER_INSTANCE *router, char *message, char *state, unsigned int err_code)
{
    ROUTER_SLAVE *cur;

    spinlock_acquire(&router->lock);

    for (cur = router->slaves; cur != NULL; cur = cur->next)
    {
        if (cur->state == BLRS_DUMPING)
        {
            /* send the error that stops slave replication */
            blr_send_custom_error(cur->dcb, cur->seqno++, 0, message, state, err_code);
        }
    }

    spinlock_release(&router->lock);
}
/**
* @brief Write data into binlogs (incomplete event)
*
@ -3118,3 +2722,40 @@ blr_get_master_semisync(GWBUF *buf)
return master_semisync;
}
/**
 * Notify all the registered slaves to read from binlog file
 * the new events just received
 *
 * Only slaves in the BLRS_DUMPING state are considered. The slave list is
 * walked while holding router->lock and each candidate is handed to
 * blr_notify_waiting_slave(); the number of slaves for which that call
 * returned true is logged at debug level.
 *
 * @param router    The router instance
 */
void blr_notify_all_slaves(ROUTER_INSTANCE *router)
{
    ROUTER_SLAVE *slave;
    int notified = 0;

    spinlock_acquire(&router->lock);
    slave = router->slaves;
    while (slave)
    {
        /* Notify a slave that has CS_WAIT_DATA bit set */
        if (slave->state == BLRS_DUMPING && blr_notify_waiting_slave(slave))
        {
            notified++;
        }

        slave = slave->next;
    }
    spinlock_release(&router->lock);

    if (notified > 0)
    {
        MXS_DEBUG("Notified %d slaves about new data.", notified);
    }
}

View File

@ -60,6 +60,7 @@
* 23/10/2015 Markus Makela Added current_safe_event
* 09/05/2016 Massimiliano Pinto Added SELECT USER()
* 11/07/2016 Massimiliano Pinto Added SSL backend support
* 24/08/2016 Massimiliano Pinto Added slave notification via CS_WAIT_DATA
*
* @endverbatim
*/
@ -2111,15 +2112,9 @@ blr_slave_binlog_dump(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue
slave->serverid,
slave->binlogfile, (unsigned long)slave->binlog_pos);
if (slave->binlog_pos != router->binlog_position ||
strcmp(slave->binlogfile, router->binlog_name) != 0)
{
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~CS_UPTODATE;
slave->cstate |= CS_EXPECTCB;
spinlock_release(&slave->catch_lock);
poll_fake_write_event(slave->dcb);
}
/* force the slave to call catchup routine */
poll_fake_write_event(slave->dcb);
return rval;
}
@ -2489,18 +2484,16 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
spinlock_acquire(&slave->catch_lock);
slave->cstate |= CS_EXPECTCB;
spinlock_release(&slave->catch_lock);
/* force slave to read events via catchup routine */
poll_fake_write_event(slave->dcb);
}
else if (slave->binlog_pos == router->binlog_position &&
strcmp(slave->binlogfile, router->binlog_name) == 0)
{
int state_change = 0;
unsigned int cstate = 0;
spinlock_acquire(&router->binlog_lock);
spinlock_acquire(&slave->catch_lock);
cstate = slave->cstate;
/*
* Now check again since we hold the router->binlog_lock
* and slave->catch_lock.
@ -2508,69 +2501,23 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
if (slave->binlog_pos != router->binlog_position ||
strcmp(slave->binlogfile, router->binlog_name) != 0)
{
slave->cstate &= ~CS_UPTODATE;
slave->cstate |= CS_EXPECTCB;
spinlock_release(&slave->catch_lock);
spinlock_release(&router->binlog_lock);
if ((cstate & CS_UPTODATE) == CS_UPTODATE)
{
#ifdef STATE_CHANGE_LOGGING_ENABLED
MXS_NOTICE("%s: Slave %s:%d, server-id %d transition from up-to-date to "
"catch-up in blr_slave_catchup, binlog file '%s', position %lu.",
router->service->name,
slave->dcb->remote,
ntohs((slave->dcb->ipv4).sin_port),
slave->serverid,
slave->binlogfile, (unsigned long)slave->binlog_pos);
#endif
}
/* force slave to read events via catchup routine */
poll_fake_write_event(slave->dcb);
}
else
{
if ((slave->cstate & CS_UPTODATE) == 0)
{
slave->stats.n_upd++;
slave->cstate |= CS_UPTODATE;
spinlock_release(&slave->catch_lock);
spinlock_release(&router->binlog_lock);
state_change = 1;
}
else
{
MXS_NOTICE("Execution entered branch were locks previously were NOT "
"released. Previously this would have caused a lock-up.");
spinlock_release(&slave->catch_lock);
spinlock_release(&router->binlog_lock);
}
}
/* Set the CS_WAIT_DATA bit, which allows notification
 * when new events are received from the master server;
 * the callback routine will be called later.
 */
slave->cstate |= CS_WAIT_DATA;
if (state_change)
{
slave->stats.n_caughtup++;
#ifdef STATE_CHANGE_LOGGING_ENABLED
// TODO: The % 50 should be removed. Now only every 50th state change is logged.
if (slave->stats.n_caughtup == 1)
{
MXS_NOTICE("%s: Slave %s:%d, server-id %d is now up to date '%s', position %lu.",
router->service->name,
slave->dcb->remote,
ntohs((slave->dcb->ipv4).sin_port),
slave->serverid,
slave->binlogfile, (unsigned long)slave->binlog_pos);
}
else if ((slave->stats.n_caughtup % 50) == 0)
{
MXS_NOTICE("%s: Slave %s:%d, server-id %d is up to date '%s', position %lu.",
router->service->name,
slave->dcb->remote,
ntohs((slave->dcb->ipv4).sin_port),
slave->serverid,
slave->binlogfile, (unsigned long)slave->binlog_pos);
}
#endif
spinlock_release(&slave->catch_lock);
spinlock_release(&router->binlog_lock);
}
}
else
@ -2649,7 +2596,6 @@ blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data)
{
ROUTER_SLAVE *slave = (ROUTER_SLAVE *)data;
ROUTER_INSTANCE *router = slave->router;
unsigned int cstate;
if (NULL == dcb->session->router_session)
{
@ -2670,25 +2616,12 @@ blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data)
spinlock_release(&slave->catch_lock);
return 0;
}
cstate = slave->cstate;
slave->cstate &= ~(CS_UPTODATE | CS_EXPECTCB);
slave->cstate &= ~(CS_EXPECTCB);
slave->cstate |= CS_BUSY;
spinlock_release(&slave->catch_lock);
if ((cstate & CS_UPTODATE) == CS_UPTODATE)
{
#ifdef STATE_CHANGE_LOGGING_ENABLED
MXS_NOTICE("%s: Slave %s:%d, server-id %d transition from up-to-date to "
"catch-up in blr_slave_callback, binlog file '%s', position %lu.",
router->service->name,
slave->dcb->remote,
ntohs((slave->dcb->ipv4).sin_port),
slave->serverid,
slave->binlogfile, (unsigned long)slave->binlog_pos);
#endif
}
slave->stats.n_dcb++;
blr_slave_catchup(router, slave, true);
}
else
@ -5609,3 +5542,26 @@ blr_set_master_ssl(ROUTER_INSTANCE *router, CHANGE_MASTER_OPTIONS config, char *
else
return 0;
}
/**
 * Wake up a slave that is waiting for new binlog events.
 *
 * While holding slave->catch_lock, checks whether CS_WAIT_DATA is set in
 * slave->cstate. If so, a fake write event is queued on the slave DCB and
 * the CS_WAIT_DATA bit is cleared.
 *
 * @param slave     Current connected slave
 * @return          true if slave has been notified
 */
bool blr_notify_waiting_slave(ROUTER_SLAVE *slave)
{
    bool notified;

    spinlock_acquire(&slave->catch_lock);

    notified = (slave->cstate & CS_WAIT_DATA) != 0;
    if (notified)
    {
        /* Add fake event that will call the blr_slave_callback routine */
        poll_fake_write_event(slave->dcb);
        slave->cstate &= ~CS_WAIT_DATA;
    }

    spinlock_release(&slave->catch_lock);

    return notified;
}