Changed from router->binlog_lock to router->lock for transaction safety code and cleaned up code.
This commit is contained in:
@ -47,6 +47,7 @@
|
||||
* saved master responses
|
||||
* 23/08/2015 Massimiliano Pinto Added strerror_r
|
||||
* 30/09/2015 Massimiliano Pinto Addition of send_slave_heartbeat option
|
||||
* 23/10/2015 Markus Makela Added current_safe_event
|
||||
*
|
||||
* @endverbatim
|
||||
*/
|
||||
@ -509,7 +510,7 @@ char task_name[BLRM_TASK_NAME_LEN+1] = "";
|
||||
|
||||
inst->binlog_position = 0;
|
||||
inst->current_pos = 0;
|
||||
inst->current_safe_event = 0;
|
||||
inst->current_safe_event = 0;
|
||||
|
||||
strcpy(inst->binlog_name, "");
|
||||
strcpy(inst->prevbinlog, "");
|
||||
|
@ -37,6 +37,7 @@
|
||||
* This is the current supported condition for detecting
|
||||
* MariaDB 10 transaction start point.
|
||||
* It's no longer using QUERY_EVENT with BEGIN
|
||||
* 23/10/15 Markus Makela Added current_safe_event
|
||||
*
|
||||
* @endverbatim
|
||||
*/
|
||||
@ -199,7 +200,7 @@ unsigned char magic[] = BINLOG_MAGIC;
|
||||
write(fd, magic, 4);
|
||||
router->current_pos = 4; /* Initial position after the magic number */
|
||||
router->binlog_position = 4; /* Initial position after the magic number */
|
||||
router->current_safe_event = 4;
|
||||
router->current_safe_event = 4;
|
||||
}
|
||||
|
||||
|
||||
@ -853,7 +854,7 @@ double average_bytes = 0;
|
||||
|
||||
router->current_pos = 4;
|
||||
router->binlog_position = 4;
|
||||
router->current_safe_event = 4;
|
||||
router->current_safe_event = 4;
|
||||
|
||||
while (1){
|
||||
|
||||
@ -951,7 +952,7 @@ double average_bytes = 0;
|
||||
/* any error */
|
||||
if (n != 0) {
|
||||
router->binlog_position = last_known_commit;
|
||||
router->current_safe_event = last_known_commit;
|
||||
router->current_safe_event = last_known_commit;
|
||||
router->current_pos = pos;
|
||||
|
||||
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
|
||||
@ -971,7 +972,7 @@ double average_bytes = 0;
|
||||
return 1;
|
||||
} else {
|
||||
router->binlog_position = pos;
|
||||
router->current_safe_event = pos;
|
||||
router->current_safe_event = pos;
|
||||
router->current_pos = pos;
|
||||
|
||||
return 0;
|
||||
@ -1014,7 +1015,7 @@ double average_bytes = 0;
|
||||
if (event_error) {
|
||||
|
||||
router->binlog_position = last_known_commit;
|
||||
router->current_safe_event = last_known_commit;
|
||||
router->current_safe_event = last_known_commit;
|
||||
router->current_pos = pos;
|
||||
|
||||
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
|
||||
@ -1456,7 +1457,7 @@ double average_bytes = 0;
|
||||
last_known_commit)));
|
||||
|
||||
router->binlog_position = last_known_commit;
|
||||
router->current_safe_event = last_known_commit;
|
||||
router->current_safe_event = last_known_commit;
|
||||
router->current_pos = pos;
|
||||
router->pending_transaction = 1;
|
||||
|
||||
|
@ -47,6 +47,7 @@
|
||||
* MariaDB 10 transaction start point.
|
||||
* It's no longer using QUERY_EVENT with BEGIN
|
||||
* 25/09/2015 Massimiliano Pinto Addition of lastEventReceived for slaves
|
||||
* 23/10/15 Markus Makela Added current_safe_event
|
||||
*
|
||||
* @endverbatim
|
||||
*/
|
||||
@ -1061,12 +1062,12 @@ int n_bufs = -1, pn_bufs = -1;
|
||||
if (router->trx_safe == 0 || (router->trx_safe && router->pending_transaction == 0)) {
|
||||
/* no pending transaction: set current_pos to binlog_position */
|
||||
|
||||
spinlock_acquire(&router->binlog_lock);
|
||||
spinlock_acquire(&router->lock);
|
||||
|
||||
router->binlog_position = router->current_pos;
|
||||
router->current_safe_event = router->current_pos;
|
||||
router->current_safe_event = router->current_pos;
|
||||
|
||||
spinlock_release(&router->binlog_lock);
|
||||
spinlock_release(&router->lock);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1281,12 +1282,12 @@ int n_bufs = -1, pn_bufs = -1;
|
||||
*/
|
||||
|
||||
if (router->trx_safe == 0 || (router->trx_safe && router->pending_transaction == 0)) {
|
||||
spinlock_acquire(&router->binlog_lock);
|
||||
spinlock_acquire(&router->lock);
|
||||
|
||||
router->binlog_position = router->current_pos;
|
||||
router->current_safe_event = router->current_pos;
|
||||
router->current_safe_event = router->current_pos;
|
||||
|
||||
spinlock_release(&router->binlog_lock);
|
||||
spinlock_release(&router->lock);
|
||||
|
||||
/* Now distribute events */
|
||||
blr_distribute_binlog_record(router, &hdr, ptr);
|
||||
@ -1311,11 +1312,11 @@ int n_bufs = -1, pn_bufs = -1;
|
||||
REP_HEADER new_hdr;
|
||||
int i=0;
|
||||
|
||||
spinlock_acquire(&router->binlog_lock);
|
||||
spinlock_acquire(&router->lock);
|
||||
|
||||
pos = router->binlog_position;
|
||||
|
||||
spinlock_release(&router->binlog_lock);
|
||||
spinlock_release(&router->lock);
|
||||
|
||||
|
||||
while ((record = blr_read_events_from_pos(router, pos, &new_hdr)) != NULL) {
|
||||
@ -1325,18 +1326,18 @@ int n_bufs = -1, pn_bufs = -1;
|
||||
/* distribute event */
|
||||
blr_distribute_binlog_record(router, &new_hdr, raw_data);
|
||||
|
||||
spinlock_acquire(&router->binlog_lock);
|
||||
spinlock_acquire(&router->lock);
|
||||
|
||||
/** The current safe position is only updated
|
||||
* if it points to the event we just distributed. */
|
||||
if(router->current_safe_event == pos)
|
||||
{
|
||||
router->current_safe_event = new_hdr.next_pos;
|
||||
}
|
||||
/** The current safe position is only updated
|
||||
* if it points to the event we just distributed. */
|
||||
if(router->current_safe_event == pos)
|
||||
{
|
||||
router->current_safe_event = new_hdr.next_pos;
|
||||
}
|
||||
|
||||
pos = new_hdr.next_pos;
|
||||
|
||||
spinlock_release(&router->binlog_lock);
|
||||
spinlock_release(&router->lock);
|
||||
|
||||
gwbuf_free(record);
|
||||
}
|
||||
@ -1590,6 +1591,14 @@ MYSQL_session *auth_info;
|
||||
return auth_info;
|
||||
}
|
||||
|
||||
/** Actions that can be taken when an event is being distributed to the slaves*/
|
||||
typedef enum
|
||||
{
|
||||
SLAVE_SEND_EVENT, /*< Send the event to the slave */
|
||||
SLAVE_FORCE_CATCHUP, /*< Force the slave into catchup mode */
|
||||
SLAVE_EVENT_ALREADY_SENT /*< The slave already has the event, don't send it */
|
||||
} slave_event_action_t;
|
||||
|
||||
/**
|
||||
* Distribute the binlog record we have just received to all the registered slaves.
|
||||
*
|
||||
@ -1605,10 +1614,6 @@ uint8_t *buf;
|
||||
ROUTER_SLAVE *slave, *nextslave;
|
||||
int action;
|
||||
|
||||
spinlock_acquire(&router->binlog_lock);
|
||||
uint64_t current_safe_event = router->current_safe_event;
|
||||
spinlock_release(&router->binlog_lock);
|
||||
|
||||
spinlock_acquire(&router->lock);
|
||||
slave = router->slaves;
|
||||
while (slave)
|
||||
@ -1647,32 +1652,30 @@ int action;
|
||||
slave->stats.n_actions[action-1]++;
|
||||
spinlock_release(&slave->catch_lock);
|
||||
|
||||
bool sendevent = false;
|
||||
bool forcecatchup = false;
|
||||
bool alreadysent = false;
|
||||
|
||||
if (action == 1)
|
||||
{
|
||||
if(router->trx_safe && slave->binlog_pos == current_safe_event &&
|
||||
(strcmp(slave->binlogfile, router->binlog_name) == 0 ||
|
||||
slave_event_action_t slave_action = SLAVE_FORCE_CATCHUP;
|
||||
|
||||
if(router->trx_safe && slave->binlog_pos == router->current_safe_event &&
|
||||
(strcmp(slave->binlogfile, router->binlog_name) == 0 ||
|
||||
(hdr->event_type == ROTATE_EVENT &&
|
||||
strcmp(slave->binlogfile, router->prevbinlog))))
|
||||
{
|
||||
/**
|
||||
* Slave needs the current event being distributed
|
||||
*/
|
||||
sendevent = true;
|
||||
}
|
||||
{
|
||||
/**
|
||||
* Slave needs the current event being distributed
|
||||
*/
|
||||
slave_action = SLAVE_SEND_EVENT;
|
||||
}
|
||||
else if (slave->binlog_pos == router->last_written &&
|
||||
(strcmp(slave->binlogfile, router->binlog_name) == 0 ||
|
||||
(hdr->event_type == ROTATE_EVENT &&
|
||||
strcmp(slave->binlogfile, router->prevbinlog))))
|
||||
{
|
||||
/**
|
||||
* Transaction safety is off or there are no pending transactions
|
||||
*/
|
||||
/**
|
||||
* Transaction safety is off or there are no pending transactions
|
||||
*/
|
||||
|
||||
sendevent = true;
|
||||
slave_action = SLAVE_SEND_EVENT;
|
||||
}
|
||||
else if (slave->binlog_pos == hdr->next_pos
|
||||
&& strcmp(slave->binlogfile, router->binlog_name) == 0)
|
||||
@ -1681,7 +1684,7 @@ int action;
|
||||
* Slave has already read record from file, no
|
||||
* need to distrbute this event
|
||||
*/
|
||||
alreadysent = true;
|
||||
slave_action = SLAVE_EVENT_ALREADY_SENT;
|
||||
}
|
||||
else if ((slave->binlog_pos > hdr->next_pos - hdr->event_size)
|
||||
&& strcmp(slave->binlogfile, router->binlog_name) == 0)
|
||||
@ -1697,77 +1700,76 @@ int action;
|
||||
slave->serverid, slave->binlogfile,
|
||||
(unsigned long)slave->binlog_pos,
|
||||
hdr->next_pos - hdr->event_size)));
|
||||
forcecatchup = true;
|
||||
}
|
||||
else
|
||||
|
||||
/*
|
||||
* If slave_action is SLAVE_FORCE_CATCHUP then
|
||||
* the slave is not at the position it should be. Force it into
|
||||
* catchup mode rather than send this event.
|
||||
*/
|
||||
|
||||
switch(slave_action)
|
||||
{
|
||||
/*
|
||||
* The slave is not at the position it should be. Force it into
|
||||
* catchup mode rather than send this event.
|
||||
*/
|
||||
forcecatchup = true;
|
||||
}
|
||||
case SLAVE_SEND_EVENT:
|
||||
/*
|
||||
* The slave should be up to date, check that the binlog
|
||||
* position matches the event we have to distribute or
|
||||
* this is a rotate event. Send the event directly from
|
||||
* memory to the slave.
|
||||
*/
|
||||
slave->lastEventTimestamp = hdr->timestamp;
|
||||
slave->lastEventReceived = hdr->event_type;
|
||||
|
||||
if(sendevent)
|
||||
{
|
||||
/*
|
||||
* The slave should be up to date, check that the binlog
|
||||
* position matches the event we have to distribute or
|
||||
* this is a rotate event. Send the event directly from
|
||||
* memory to the slave.
|
||||
*/
|
||||
slave->lastEventTimestamp = hdr->timestamp;
|
||||
slave->lastEventReceived = hdr->event_type;
|
||||
/* set lastReply */
|
||||
if (router->send_slave_heartbeat)
|
||||
slave->lastReply = time(0);
|
||||
|
||||
/* set lastReply */
|
||||
if (router->send_slave_heartbeat)
|
||||
slave->lastReply = time(0);
|
||||
pkt = gwbuf_alloc(hdr->event_size + 5);
|
||||
buf = GWBUF_DATA(pkt);
|
||||
encode_value(buf, hdr->event_size + 1, 24);
|
||||
buf += 3;
|
||||
*buf++ = slave->seqno++;
|
||||
*buf++ = 0; // OK
|
||||
memcpy(buf, ptr, hdr->event_size);
|
||||
if (hdr->event_type == ROTATE_EVENT)
|
||||
{
|
||||
blr_slave_rotate(router, slave, ptr);
|
||||
}
|
||||
slave->stats.n_bytes += gwbuf_length(pkt);
|
||||
slave->stats.n_events++;
|
||||
slave->dcb->func.write(slave->dcb, pkt);
|
||||
if (hdr->event_type != ROTATE_EVENT)
|
||||
{
|
||||
slave->binlog_pos = hdr->next_pos;
|
||||
}
|
||||
spinlock_acquire(&slave->catch_lock);
|
||||
if (slave->overrun)
|
||||
{
|
||||
slave->stats.n_overrun++;
|
||||
slave->overrun = 0;
|
||||
poll_fake_write_event(slave->dcb);
|
||||
}
|
||||
else
|
||||
{
|
||||
slave->cstate &= ~CS_BUSY;
|
||||
}
|
||||
spinlock_release(&slave->catch_lock);
|
||||
break;
|
||||
|
||||
pkt = gwbuf_alloc(hdr->event_size + 5);
|
||||
buf = GWBUF_DATA(pkt);
|
||||
encode_value(buf, hdr->event_size + 1, 24);
|
||||
buf += 3;
|
||||
*buf++ = slave->seqno++;
|
||||
*buf++ = 0; // OK
|
||||
memcpy(buf, ptr, hdr->event_size);
|
||||
if (hdr->event_type == ROTATE_EVENT)
|
||||
{
|
||||
blr_slave_rotate(router, slave, ptr);
|
||||
}
|
||||
slave->stats.n_bytes += gwbuf_length(pkt);
|
||||
slave->stats.n_events++;
|
||||
slave->dcb->func.write(slave->dcb, pkt);
|
||||
if (hdr->event_type != ROTATE_EVENT)
|
||||
{
|
||||
slave->binlog_pos = hdr->next_pos;
|
||||
}
|
||||
spinlock_acquire(&slave->catch_lock);
|
||||
if (slave->overrun)
|
||||
{
|
||||
slave->stats.n_overrun++;
|
||||
slave->overrun = 0;
|
||||
poll_fake_write_event(slave->dcb);
|
||||
}
|
||||
else
|
||||
{
|
||||
case SLAVE_EVENT_ALREADY_SENT:
|
||||
spinlock_acquire(&slave->catch_lock);
|
||||
slave->cstate &= ~CS_BUSY;
|
||||
}
|
||||
spinlock_release(&slave->catch_lock);
|
||||
spinlock_release(&slave->catch_lock);
|
||||
break;
|
||||
|
||||
case SLAVE_FORCE_CATCHUP:
|
||||
spinlock_acquire(&slave->catch_lock);
|
||||
slave->cstate &= ~(CS_UPTODATE|CS_BUSY);
|
||||
slave->cstate |= CS_EXPECTCB;
|
||||
spinlock_release(&slave->catch_lock);
|
||||
poll_fake_write_event(slave->dcb);
|
||||
break;
|
||||
}
|
||||
else if (alreadysent)
|
||||
{
|
||||
spinlock_acquire(&slave->catch_lock);
|
||||
slave->cstate &= ~CS_BUSY;
|
||||
spinlock_release(&slave->catch_lock);
|
||||
}
|
||||
else if (forcecatchup)
|
||||
{
|
||||
spinlock_acquire(&slave->catch_lock);
|
||||
slave->cstate &= ~(CS_UPTODATE|CS_BUSY);
|
||||
slave->cstate |= CS_EXPECTCB;
|
||||
spinlock_release(&slave->catch_lock);
|
||||
poll_fake_write_event(slave->dcb);
|
||||
}
|
||||
}
|
||||
else if (action == 3)
|
||||
{
|
||||
|
@ -58,6 +58,7 @@
|
||||
* 25/09/2015 Massimiliano Pinto Addition of slave heartbeat:
|
||||
* the period set during registration is checked
|
||||
* and heartbeat event might be sent to the affected slave.
|
||||
* 23/10/15 Markus Makela Added current_safe_event
|
||||
*
|
||||
* @endverbatim
|
||||
*/
|
||||
@ -2925,7 +2926,7 @@ blr_start_slave(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave)
|
||||
router->master_state = BLRM_UNCONNECTED;
|
||||
router->current_pos = 4;
|
||||
router->binlog_position = 4;
|
||||
router->current_safe_event = 4;
|
||||
router->current_safe_event = 4;
|
||||
|
||||
spinlock_release(&router->lock);
|
||||
|
||||
@ -3226,7 +3227,7 @@ int blr_handle_change_master(ROUTER_INSTANCE* router, char *command, char *error
|
||||
|
||||
router->current_pos = 4;
|
||||
router->binlog_position = 4;
|
||||
router->current_safe_event = 4;
|
||||
router->current_safe_event = 4;
|
||||
|
||||
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "%s: New MASTER_LOG_FILE is [%s]",
|
||||
router->service->name,
|
||||
@ -3284,7 +3285,7 @@ int blr_handle_change_master(ROUTER_INSTANCE* router, char *command, char *error
|
||||
if (router->master_state == BLRM_UNCONFIGURED) {
|
||||
router->current_pos = 4;
|
||||
router->binlog_position = 4;
|
||||
router->current_safe_event = 4;
|
||||
router->current_safe_event = 4;
|
||||
memset(router->binlog_name, '\0', sizeof(router->binlog_name));
|
||||
strncpy(router->binlog_name, master_logfile, BINLOG_FNAMELEN);
|
||||
|
||||
@ -3553,7 +3554,7 @@ blr_master_set_empty_config(ROUTER_INSTANCE *router) {
|
||||
|
||||
router->current_pos = 4;
|
||||
router->binlog_position = 4;
|
||||
router->current_safe_event = 4;
|
||||
router->current_safe_event = 4;
|
||||
strcpy(router->binlog_name, "");
|
||||
}
|
||||
|
||||
@ -3569,7 +3570,7 @@ blr_master_apply_config(ROUTER_INSTANCE *router, MASTER_SERVER_CFG *prev_master)
|
||||
server_update_port(router->service->dbref->server, prev_master->port);
|
||||
router->current_pos = prev_master->pos;
|
||||
router->binlog_position = prev_master->safe_pos;
|
||||
router->current_safe_event = prev_master->safe_pos;
|
||||
router->current_safe_event = prev_master->safe_pos;
|
||||
strcpy(router->binlog_name, prev_master->logfile);
|
||||
if (router->user) {
|
||||
free(router->user);
|
||||
|
Reference in New Issue
Block a user