Added a variable for current event being processed after a transaction is committed.
This commit is contained in:
@ -415,6 +415,8 @@ typedef struct router_instance {
|
|||||||
* file being written
|
* file being written
|
||||||
*/
|
*/
|
||||||
uint64_t last_written; /*< Position of last event written */
|
uint64_t last_written; /*< Position of last event written */
|
||||||
|
uint64_t current_safe_event;
|
||||||
|
/*< Position of the latest safe eventbeing sent to slaves */
|
||||||
char prevbinlog[BINLOG_FNAMELEN+1];
|
char prevbinlog[BINLOG_FNAMELEN+1];
|
||||||
int rotating; /*< Rotation in progress flag */
|
int rotating; /*< Rotation in progress flag */
|
||||||
BLFILE *files; /*< Files used by the slaves */
|
BLFILE *files; /*< Files used by the slaves */
|
||||||
|
|||||||
@ -509,6 +509,7 @@ char task_name[BLRM_TASK_NAME_LEN+1] = "";
|
|||||||
|
|
||||||
inst->binlog_position = 0;
|
inst->binlog_position = 0;
|
||||||
inst->current_pos = 0;
|
inst->current_pos = 0;
|
||||||
|
inst->current_safe_event = 0;
|
||||||
|
|
||||||
strcpy(inst->binlog_name, "");
|
strcpy(inst->binlog_name, "");
|
||||||
strcpy(inst->prevbinlog, "");
|
strcpy(inst->prevbinlog, "");
|
||||||
|
|||||||
@ -199,6 +199,7 @@ unsigned char magic[] = BINLOG_MAGIC;
|
|||||||
write(fd, magic, 4);
|
write(fd, magic, 4);
|
||||||
router->current_pos = 4; /* Initial position after the magic number */
|
router->current_pos = 4; /* Initial position after the magic number */
|
||||||
router->binlog_position = 4; /* Initial position after the magic number */
|
router->binlog_position = 4; /* Initial position after the magic number */
|
||||||
|
router->current_safe_event = 4;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -852,6 +853,7 @@ double average_bytes = 0;
|
|||||||
|
|
||||||
router->current_pos = 4;
|
router->current_pos = 4;
|
||||||
router->binlog_position = 4;
|
router->binlog_position = 4;
|
||||||
|
router->current_safe_event = 4;
|
||||||
|
|
||||||
while (1){
|
while (1){
|
||||||
|
|
||||||
@ -934,6 +936,7 @@ double average_bytes = 0;
|
|||||||
|
|
||||||
if (pending_transaction) {
|
if (pending_transaction) {
|
||||||
router->binlog_position = last_known_commit;
|
router->binlog_position = last_known_commit;
|
||||||
|
router->current_safe_event = last_known_commit;
|
||||||
router->current_pos = pos;
|
router->current_pos = pos;
|
||||||
router->pending_transaction = 1;
|
router->pending_transaction = 1;
|
||||||
pending_transaction = 0;
|
pending_transaction = 0;
|
||||||
@ -948,6 +951,7 @@ double average_bytes = 0;
|
|||||||
/* any error */
|
/* any error */
|
||||||
if (n != 0) {
|
if (n != 0) {
|
||||||
router->binlog_position = last_known_commit;
|
router->binlog_position = last_known_commit;
|
||||||
|
router->current_safe_event = last_known_commit;
|
||||||
router->current_pos = pos;
|
router->current_pos = pos;
|
||||||
|
|
||||||
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
|
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
|
||||||
@ -967,6 +971,7 @@ double average_bytes = 0;
|
|||||||
return 1;
|
return 1;
|
||||||
} else {
|
} else {
|
||||||
router->binlog_position = pos;
|
router->binlog_position = pos;
|
||||||
|
router->current_safe_event = pos;
|
||||||
router->current_pos = pos;
|
router->current_pos = pos;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
@ -1009,6 +1014,7 @@ double average_bytes = 0;
|
|||||||
if (event_error) {
|
if (event_error) {
|
||||||
|
|
||||||
router->binlog_position = last_known_commit;
|
router->binlog_position = last_known_commit;
|
||||||
|
router->current_safe_event = last_known_commit;
|
||||||
router->current_pos = pos;
|
router->current_pos = pos;
|
||||||
|
|
||||||
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
|
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
|
||||||
@ -1039,6 +1045,7 @@ double average_bytes = 0;
|
|||||||
hdr.event_size, pos)));
|
hdr.event_size, pos)));
|
||||||
|
|
||||||
router->binlog_position = last_known_commit;
|
router->binlog_position = last_known_commit;
|
||||||
|
router->current_safe_event = last_known_commit;
|
||||||
router->current_pos = pos;
|
router->current_pos = pos;
|
||||||
|
|
||||||
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
|
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
|
||||||
@ -1067,6 +1074,7 @@ double average_bytes = 0;
|
|||||||
hdr.event_size, pos)));
|
hdr.event_size, pos)));
|
||||||
|
|
||||||
router->binlog_position = last_known_commit;
|
router->binlog_position = last_known_commit;
|
||||||
|
router->current_safe_event = last_known_commit;
|
||||||
router->current_pos = pos;
|
router->current_pos = pos;
|
||||||
|
|
||||||
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
|
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
|
||||||
@ -1123,6 +1131,7 @@ double average_bytes = 0;
|
|||||||
gwbuf_free(result);
|
gwbuf_free(result);
|
||||||
|
|
||||||
router->binlog_position = last_known_commit;
|
router->binlog_position = last_known_commit;
|
||||||
|
router->current_safe_event = last_known_commit;
|
||||||
router->current_pos = pos;
|
router->current_pos = pos;
|
||||||
|
|
||||||
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
|
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
|
||||||
@ -1367,6 +1376,7 @@ double average_bytes = 0;
|
|||||||
pos)));
|
pos)));
|
||||||
|
|
||||||
router->binlog_position = last_known_commit;
|
router->binlog_position = last_known_commit;
|
||||||
|
router->current_safe_event = last_known_commit;
|
||||||
router->current_pos = pos;
|
router->current_pos = pos;
|
||||||
|
|
||||||
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
|
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
|
||||||
@ -1396,6 +1406,7 @@ double average_bytes = 0;
|
|||||||
pos)));
|
pos)));
|
||||||
|
|
||||||
router->binlog_position = last_known_commit;
|
router->binlog_position = last_known_commit;
|
||||||
|
router->current_safe_event = last_known_commit;
|
||||||
router->current_pos = pos;
|
router->current_pos = pos;
|
||||||
|
|
||||||
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
|
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
|
||||||
@ -1445,6 +1456,7 @@ double average_bytes = 0;
|
|||||||
last_known_commit)));
|
last_known_commit)));
|
||||||
|
|
||||||
router->binlog_position = last_known_commit;
|
router->binlog_position = last_known_commit;
|
||||||
|
router->current_safe_event = last_known_commit;
|
||||||
router->current_pos = pos;
|
router->current_pos = pos;
|
||||||
router->pending_transaction = 1;
|
router->pending_transaction = 1;
|
||||||
|
|
||||||
@ -1456,6 +1468,7 @@ double average_bytes = 0;
|
|||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
router->binlog_position = pos;
|
router->binlog_position = pos;
|
||||||
|
router->current_safe_event = pos;
|
||||||
router->current_pos = pos;
|
router->current_pos = pos;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|||||||
@ -1064,6 +1064,7 @@ int n_bufs = -1, pn_bufs = -1;
|
|||||||
spinlock_acquire(&router->binlog_lock);
|
spinlock_acquire(&router->binlog_lock);
|
||||||
|
|
||||||
router->binlog_position = router->current_pos;
|
router->binlog_position = router->current_pos;
|
||||||
|
router->current_safe_event = router->current_pos;
|
||||||
|
|
||||||
spinlock_release(&router->binlog_lock);
|
spinlock_release(&router->binlog_lock);
|
||||||
}
|
}
|
||||||
@ -1283,6 +1284,7 @@ int n_bufs = -1, pn_bufs = -1;
|
|||||||
spinlock_acquire(&router->binlog_lock);
|
spinlock_acquire(&router->binlog_lock);
|
||||||
|
|
||||||
router->binlog_position = router->current_pos;
|
router->binlog_position = router->current_pos;
|
||||||
|
router->current_safe_event = router->current_pos;
|
||||||
|
|
||||||
spinlock_release(&router->binlog_lock);
|
spinlock_release(&router->binlog_lock);
|
||||||
|
|
||||||
@ -1325,6 +1327,13 @@ int n_bufs = -1, pn_bufs = -1;
|
|||||||
|
|
||||||
spinlock_acquire(&router->binlog_lock);
|
spinlock_acquire(&router->binlog_lock);
|
||||||
|
|
||||||
|
/** The current safe position is only updated
|
||||||
|
* if it points to the event we just distributed. */
|
||||||
|
if(router->current_safe_event == pos)
|
||||||
|
{
|
||||||
|
router->current_safe_event = new_hdr.next_pos;
|
||||||
|
}
|
||||||
|
|
||||||
pos = new_hdr.next_pos;
|
pos = new_hdr.next_pos;
|
||||||
|
|
||||||
spinlock_release(&router->binlog_lock);
|
spinlock_release(&router->binlog_lock);
|
||||||
@ -1596,6 +1605,10 @@ uint8_t *buf;
|
|||||||
ROUTER_SLAVE *slave, *nextslave;
|
ROUTER_SLAVE *slave, *nextslave;
|
||||||
int action;
|
int action;
|
||||||
|
|
||||||
|
spinlock_acquire(&router->binlog_lock);
|
||||||
|
uint64_t current_safe_event = router->current_safe_event;
|
||||||
|
spinlock_release(&router->binlog_lock);
|
||||||
|
|
||||||
spinlock_acquire(&router->lock);
|
spinlock_acquire(&router->lock);
|
||||||
slave = router->slaves;
|
slave = router->slaves;
|
||||||
while (slave)
|
while (slave)
|
||||||
@ -1634,13 +1647,69 @@ int action;
|
|||||||
slave->stats.n_actions[action-1]++;
|
slave->stats.n_actions[action-1]++;
|
||||||
spinlock_release(&slave->catch_lock);
|
spinlock_release(&slave->catch_lock);
|
||||||
|
|
||||||
|
bool sendevent = false;
|
||||||
|
bool forcecatchup = false;
|
||||||
|
bool alreadysent = false;
|
||||||
|
|
||||||
if (action == 1)
|
if (action == 1)
|
||||||
{
|
{
|
||||||
if (slave->binlog_pos <= router->last_written &&
|
if(router->trx_safe && slave->binlog_pos == current_safe_event &&
|
||||||
|
(strcmp(slave->binlogfile, router->binlog_name) == 0 ||
|
||||||
|
(hdr->event_type == ROTATE_EVENT &&
|
||||||
|
strcmp(slave->binlogfile, router->prevbinlog))))
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* Slave needs the current event being distributed
|
||||||
|
*/
|
||||||
|
sendevent = true;
|
||||||
|
}
|
||||||
|
else if (slave->binlog_pos == router->last_written &&
|
||||||
(strcmp(slave->binlogfile, router->binlog_name) == 0 ||
|
(strcmp(slave->binlogfile, router->binlog_name) == 0 ||
|
||||||
(hdr->event_type == ROTATE_EVENT &&
|
(hdr->event_type == ROTATE_EVENT &&
|
||||||
strcmp(slave->binlogfile, router->prevbinlog))))
|
strcmp(slave->binlogfile, router->prevbinlog))))
|
||||||
{
|
{
|
||||||
|
/**
|
||||||
|
* Transaction safety is off or there are no pending transactions
|
||||||
|
*/
|
||||||
|
|
||||||
|
sendevent = true;
|
||||||
|
}
|
||||||
|
else if (slave->binlog_pos == hdr->next_pos
|
||||||
|
&& strcmp(slave->binlogfile, router->binlog_name) == 0)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Slave has already read record from file, no
|
||||||
|
* need to distrbute this event
|
||||||
|
*/
|
||||||
|
alreadysent = true;
|
||||||
|
}
|
||||||
|
else if ((slave->binlog_pos > hdr->next_pos - hdr->event_size)
|
||||||
|
&& strcmp(slave->binlogfile, router->binlog_name) == 0)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* The slave is ahead of the master, this should never
|
||||||
|
* happen. Force the slave to catchup mode in order to
|
||||||
|
* try to resolve the issue.
|
||||||
|
*/
|
||||||
|
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
|
||||||
|
"Slave %d is ahead of expected position %s@%d. "
|
||||||
|
"Expected position %d",
|
||||||
|
slave->serverid, slave->binlogfile,
|
||||||
|
(unsigned long)slave->binlog_pos,
|
||||||
|
hdr->next_pos - hdr->event_size)));
|
||||||
|
forcecatchup = true;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* The slave is not at the position it should be. Force it into
|
||||||
|
* catchup mode rather than send this event.
|
||||||
|
*/
|
||||||
|
forcecatchup = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(sendevent)
|
||||||
|
{
|
||||||
/*
|
/*
|
||||||
* The slave should be up to date, check that the binlog
|
* The slave should be up to date, check that the binlog
|
||||||
* position matches the event we have to distribute or
|
* position matches the event we have to distribute or
|
||||||
@ -1685,49 +1754,20 @@ int action;
|
|||||||
}
|
}
|
||||||
spinlock_release(&slave->catch_lock);
|
spinlock_release(&slave->catch_lock);
|
||||||
}
|
}
|
||||||
else if (slave->binlog_pos == hdr->next_pos
|
else if (alreadysent)
|
||||||
&& strcmp(slave->binlogfile, router->binlog_name) == 0)
|
{
|
||||||
{
|
spinlock_acquire(&slave->catch_lock);
|
||||||
/*
|
|
||||||
* Slave has already read record from file, no
|
|
||||||
* need to distrbute this event
|
|
||||||
*/
|
|
||||||
spinlock_acquire(&slave->catch_lock);
|
|
||||||
slave->cstate &= ~CS_BUSY;
|
slave->cstate &= ~CS_BUSY;
|
||||||
spinlock_release(&slave->catch_lock);
|
spinlock_release(&slave->catch_lock);
|
||||||
}
|
}
|
||||||
else if ((slave->binlog_pos > hdr->next_pos - hdr->event_size)
|
else if (forcecatchup)
|
||||||
&& strcmp(slave->binlogfile, router->binlog_name) == 0)
|
{
|
||||||
{
|
spinlock_acquire(&slave->catch_lock);
|
||||||
/*
|
|
||||||
* The slave is ahead of the master, this should never
|
|
||||||
* happen. Force the slave to catchup mode in order to
|
|
||||||
* try to resolve the issue.
|
|
||||||
*/
|
|
||||||
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
|
|
||||||
"Slave %d is ahead of expected position %s@%d. "
|
|
||||||
"Expected position %d",
|
|
||||||
slave->serverid, slave->binlogfile,
|
|
||||||
(unsigned long)slave->binlog_pos,
|
|
||||||
hdr->next_pos - hdr->event_size)));
|
|
||||||
spinlock_acquire(&slave->catch_lock);
|
|
||||||
slave->cstate &= ~(CS_UPTODATE|CS_BUSY);
|
slave->cstate &= ~(CS_UPTODATE|CS_BUSY);
|
||||||
slave->cstate |= CS_EXPECTCB;
|
slave->cstate |= CS_EXPECTCB;
|
||||||
spinlock_release(&slave->catch_lock);
|
spinlock_release(&slave->catch_lock);
|
||||||
poll_fake_write_event(slave->dcb);
|
poll_fake_write_event(slave->dcb);
|
||||||
}
|
}
|
||||||
else
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* The slave is not at the position it should be. Force it into
|
|
||||||
* catchup mode rather than send this event.
|
|
||||||
*/
|
|
||||||
spinlock_acquire(&slave->catch_lock);
|
|
||||||
slave->cstate &= ~(CS_UPTODATE|CS_BUSY);
|
|
||||||
slave->cstate |= CS_EXPECTCB;
|
|
||||||
spinlock_release(&slave->catch_lock);
|
|
||||||
poll_fake_write_event(slave->dcb);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
else if (action == 3)
|
else if (action == 3)
|
||||||
{
|
{
|
||||||
|
|||||||
@ -2925,6 +2925,7 @@ blr_start_slave(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave)
|
|||||||
router->master_state = BLRM_UNCONNECTED;
|
router->master_state = BLRM_UNCONNECTED;
|
||||||
router->current_pos = 4;
|
router->current_pos = 4;
|
||||||
router->binlog_position = 4;
|
router->binlog_position = 4;
|
||||||
|
router->current_safe_event = 4;
|
||||||
|
|
||||||
spinlock_release(&router->lock);
|
spinlock_release(&router->lock);
|
||||||
|
|
||||||
@ -3225,6 +3226,7 @@ int blr_handle_change_master(ROUTER_INSTANCE* router, char *command, char *error
|
|||||||
|
|
||||||
router->current_pos = 4;
|
router->current_pos = 4;
|
||||||
router->binlog_position = 4;
|
router->binlog_position = 4;
|
||||||
|
router->current_safe_event = 4;
|
||||||
|
|
||||||
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "%s: New MASTER_LOG_FILE is [%s]",
|
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "%s: New MASTER_LOG_FILE is [%s]",
|
||||||
router->service->name,
|
router->service->name,
|
||||||
@ -3282,6 +3284,7 @@ int blr_handle_change_master(ROUTER_INSTANCE* router, char *command, char *error
|
|||||||
if (router->master_state == BLRM_UNCONFIGURED) {
|
if (router->master_state == BLRM_UNCONFIGURED) {
|
||||||
router->current_pos = 4;
|
router->current_pos = 4;
|
||||||
router->binlog_position = 4;
|
router->binlog_position = 4;
|
||||||
|
router->current_safe_event = 4;
|
||||||
memset(router->binlog_name, '\0', sizeof(router->binlog_name));
|
memset(router->binlog_name, '\0', sizeof(router->binlog_name));
|
||||||
strncpy(router->binlog_name, master_logfile, BINLOG_FNAMELEN);
|
strncpy(router->binlog_name, master_logfile, BINLOG_FNAMELEN);
|
||||||
|
|
||||||
@ -3550,6 +3553,7 @@ blr_master_set_empty_config(ROUTER_INSTANCE *router) {
|
|||||||
|
|
||||||
router->current_pos = 4;
|
router->current_pos = 4;
|
||||||
router->binlog_position = 4;
|
router->binlog_position = 4;
|
||||||
|
router->current_safe_event = 4;
|
||||||
strcpy(router->binlog_name, "");
|
strcpy(router->binlog_name, "");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -3565,6 +3569,7 @@ blr_master_apply_config(ROUTER_INSTANCE *router, MASTER_SERVER_CFG *prev_master)
|
|||||||
server_update_port(router->service->dbref->server, prev_master->port);
|
server_update_port(router->service->dbref->server, prev_master->port);
|
||||||
router->current_pos = prev_master->pos;
|
router->current_pos = prev_master->pos;
|
||||||
router->binlog_position = prev_master->safe_pos;
|
router->binlog_position = prev_master->safe_pos;
|
||||||
|
router->current_safe_event = prev_master->safe_pos;
|
||||||
strcpy(router->binlog_name, prev_master->logfile);
|
strcpy(router->binlog_name, prev_master->logfile);
|
||||||
if (router->user) {
|
if (router->user) {
|
||||||
free(router->user);
|
free(router->user);
|
||||||
|
|||||||
Reference in New Issue
Block a user