Merge '1.2.1-binlog_router_trx' into develop
This commit is contained in:
commit
5353353a3a
@ -181,9 +181,10 @@
|
||||
#define BLRM_MASTER_REGITRATION_QUERY_LEN 255
|
||||
|
||||
/* Read Binlog position states */
|
||||
#define SLAVE_POS_READ_OK 0x0
|
||||
#define SLAVE_POS_READ_OK 0x00
|
||||
#define SLAVE_POS_READ_ERR 0xff
|
||||
#define SLAVE_POS_READ_UNSAFE 0xfe
|
||||
|
||||
/**
|
||||
* Some useful macros for examining the MySQL Response packets
|
||||
*/
|
||||
|
@ -866,10 +866,10 @@ ROUTER_SLAVE *slave = (ROUTER_SLAVE *)router_session;
|
||||
atomic_add(&router->stats.n_registered, -1);
|
||||
|
||||
if (slave->state > 0) {
|
||||
MXS_NOTICE("%s: Slave %s, server id %d, disconnected after %ld seconds. "
|
||||
"%d SQL commands, %d events sent (%lu bytes), binlog '%s', "
|
||||
MXS_NOTICE("%s: Slave %s:%d, server id %d, disconnected after %ld seconds. "
|
||||
"%d SQL commands, %d events sent (%lu bytes), binlog '%s', "
|
||||
"last position %lu",
|
||||
router->service->name, slave->dcb->remote,
|
||||
router->service->name, slave->dcb->remote, ntohs((slave->dcb->ipv4).sin_port),
|
||||
slave->serverid,
|
||||
time(0) - slave->connect_time,
|
||||
slave->stats.n_queries,
|
||||
|
@ -202,6 +202,7 @@ unsigned char magic[] = BINLOG_MAGIC;
|
||||
router->current_pos = 4; /* Initial position after the magic number */
|
||||
router->binlog_position = 4; /* Initial position after the magic number */
|
||||
router->current_safe_event = 4;
|
||||
router->last_written = 0;
|
||||
}
|
||||
|
||||
|
||||
@ -415,35 +416,54 @@ struct stat statb;
|
||||
snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "Invalid file pointer for requested binlog at position %lu", pos);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
spinlock_acquire(&file->lock);
|
||||
if (fstat(file->fd, &statb) == 0)
|
||||
filelen = statb.st_size;
|
||||
else
|
||||
{
|
||||
snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "Invalid size of binlog file, pos %lu", pos);
|
||||
return NULL;
|
||||
else {
|
||||
if (file->fd == -1) {
|
||||
hdr->ok = SLAVE_POS_READ_OK;
|
||||
snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "blr_read_binlog called with invalid file->fd, pos %lu", pos);
|
||||
spinlock_release(&file->lock);
|
||||
MXS_ERROR("Slave has failed fstat %s", errmsg);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
spinlock_release(&file->lock);
|
||||
|
||||
if (pos > filelen)
|
||||
{
|
||||
snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "Requested position %lu is beyond end of the binlog file '%s', size %lu", pos, file->binlogname, filelen);
|
||||
snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "Requested position %lu is beyond end of the binlog file '%s', size %lu",
|
||||
pos, file->binlogname, filelen);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
spinlock_acquire(&router->binlog_lock);
|
||||
spinlock_acquire(&file->lock);
|
||||
|
||||
if (strcmp(router->binlog_name, file->binlogname) == 0 &&
|
||||
pos >= router->binlog_position)
|
||||
{
|
||||
if (pos > router->binlog_position)
|
||||
if (pos > router->binlog_position && !router->rotating)
|
||||
{
|
||||
/* Unsafe position, slave will be disconnected by the calling routine */
|
||||
snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "Requested binlog position %lu. Position is unsafe so disconnecting. Latest safe position %lu, end of binlog file %lu", pos, router->binlog_position, router->current_pos);
|
||||
snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "Requested binlog position %lu. Position is unsafe so disconnecting. "
|
||||
"Latest safe position %lu, end of binlog file %lu",
|
||||
pos, router->binlog_position, router->current_pos);
|
||||
|
||||
hdr->ok = SLAVE_POS_READ_UNSAFE;
|
||||
} else {
|
||||
/* accessing last position is ok */
|
||||
hdr->ok = SLAVE_POS_READ_OK;
|
||||
}
|
||||
|
||||
spinlock_release(&file->lock);
|
||||
spinlock_release(&router->binlog_lock);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
spinlock_release(&file->lock);
|
||||
spinlock_release(&router->binlog_lock);
|
||||
|
||||
/* Read the header information from the file */
|
||||
if ((n = pread(file->fd, hdbuf, BINLOG_EVENT_HDR_LEN, pos)) != BINLOG_EVENT_HDR_LEN)
|
||||
@ -944,9 +964,8 @@ int fde_seen = 0;
|
||||
router->pending_transaction = 1;
|
||||
pending_transaction = 0;
|
||||
|
||||
MXS_WARNING("pending transaction has been found. "
|
||||
"Setting safe pos to %lu, current pos %lu",
|
||||
router->binlog_position, router->current_pos);
|
||||
MXS_ERROR("Binlog '%s' ends at position %lu and has an incomplete transaction at %lu. ",
|
||||
router->binlog_name, router->current_pos, router->binlog_position);
|
||||
|
||||
return 0;
|
||||
} else {
|
||||
|
@ -92,7 +92,7 @@ void blr_master_close(ROUTER_INSTANCE *);
|
||||
char *blr_extract_column(GWBUF *buf, int col);
|
||||
void blr_cache_response(ROUTER_INSTANCE *router, char *response, GWBUF *buf);
|
||||
void poll_fake_write_event(DCB *dcb);
|
||||
GWBUF *blr_read_events_from_pos(ROUTER_INSTANCE *router, unsigned long long pos, REP_HEADER *hdr);
|
||||
GWBUF *blr_read_events_from_pos(ROUTER_INSTANCE *router, unsigned long long pos, REP_HEADER *hdr, unsigned long long pos_end);
|
||||
static void blr_check_last_master_event(void *inst);
|
||||
extern int blr_check_heartbeat(ROUTER_INSTANCE *router);
|
||||
extern char * blr_last_event_description(ROUTER_INSTANCE *router);
|
||||
@ -1024,16 +1024,13 @@ int n_bufs = -1, pn_bufs = -1;
|
||||
* won't be updated to router->current_pos
|
||||
*/
|
||||
|
||||
spinlock_acquire(&router->binlog_lock);
|
||||
if (router->trx_safe == 0 || (router->trx_safe && router->pending_transaction == 0)) {
|
||||
/* no pending transaction: set current_pos to binlog_position */
|
||||
|
||||
spinlock_acquire(&router->lock);
|
||||
|
||||
router->binlog_position = router->current_pos;
|
||||
router->current_safe_event = router->current_pos;
|
||||
|
||||
spinlock_release(&router->lock);
|
||||
}
|
||||
spinlock_release(&router->binlog_lock);
|
||||
|
||||
/**
|
||||
* Detect transactions in events
|
||||
@ -1056,6 +1053,8 @@ int n_bufs = -1, pn_bufs = -1;
|
||||
flags = *(ptr+4+20 + 8 + 4);
|
||||
|
||||
if (flags == 0) {
|
||||
spinlock_acquire(&router->binlog_lock);
|
||||
|
||||
if (router->pending_transaction > 0) {
|
||||
MXS_ERROR("A MariaDB 10 transaction "
|
||||
"is already open "
|
||||
@ -1069,11 +1068,9 @@ int n_bufs = -1, pn_bufs = -1;
|
||||
// An action should be taken here
|
||||
}
|
||||
|
||||
spinlock_acquire(&router->lock);
|
||||
|
||||
router->pending_transaction = 1;
|
||||
|
||||
spinlock_release(&router->lock);
|
||||
spinlock_release(&router->binlog_lock);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1092,9 +1089,10 @@ int n_bufs = -1, pn_bufs = -1;
|
||||
statement_sql = calloc(1, statement_len+1);
|
||||
strncpy(statement_sql, (char *)ptr+4+20+4+4+1+2+2+var_block_len+1+db_name_len, statement_len);
|
||||
|
||||
spinlock_acquire(&router->binlog_lock);
|
||||
|
||||
/* Check for BEGIN (it comes for START TRANSACTION too) */
|
||||
if (strncmp(statement_sql, "BEGIN", 5) == 0) {
|
||||
|
||||
if (router->pending_transaction > 0) {
|
||||
MXS_ERROR("A transaction is already open "
|
||||
"@ %lu and a new one starts @ %lu",
|
||||
@ -1104,34 +1102,28 @@ int n_bufs = -1, pn_bufs = -1;
|
||||
// An action should be taken here
|
||||
}
|
||||
|
||||
spinlock_acquire(&router->lock);
|
||||
|
||||
router->pending_transaction = 1;
|
||||
|
||||
spinlock_release(&router->lock);
|
||||
}
|
||||
|
||||
/* Check for COMMIT in non transactional store engines */
|
||||
if (strncmp(statement_sql, "COMMIT", 6) == 0) {
|
||||
spinlock_acquire(&router->lock);
|
||||
|
||||
router->pending_transaction = 2;
|
||||
|
||||
spinlock_release(&router->lock);
|
||||
}
|
||||
|
||||
spinlock_release(&router->binlog_lock);
|
||||
|
||||
free(statement_sql);
|
||||
}
|
||||
|
||||
/* Check for COMMIT in Transactional engines, i.e InnoDB */
|
||||
if(hdr.event_type == XID_EVENT) {
|
||||
spinlock_acquire(&router->binlog_lock);
|
||||
|
||||
if (router->pending_transaction) {
|
||||
spinlock_acquire(&router->lock);
|
||||
|
||||
router->pending_transaction = 3;
|
||||
|
||||
spinlock_release(&router->lock);
|
||||
}
|
||||
spinlock_release(&router->binlog_lock);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1193,6 +1185,8 @@ int n_bufs = -1, pn_bufs = -1;
|
||||
|
||||
router->stats.n_heartbeats++;
|
||||
|
||||
if (router->pending_transaction)
|
||||
router->stats.lastReply = time(0);
|
||||
}
|
||||
else if (hdr.flags != LOG_EVENT_ARTIFICIAL_F)
|
||||
{
|
||||
@ -1241,13 +1235,14 @@ int n_bufs = -1, pn_bufs = -1;
|
||||
* may depend on pending transaction
|
||||
*/
|
||||
|
||||
spinlock_acquire(&router->binlog_lock);
|
||||
|
||||
if (router->trx_safe == 0 || (router->trx_safe && router->pending_transaction == 0)) {
|
||||
spinlock_acquire(&router->lock);
|
||||
|
||||
router->binlog_position = router->current_pos;
|
||||
router->current_safe_event = router->current_pos;
|
||||
|
||||
spinlock_release(&router->lock);
|
||||
spinlock_release(&router->binlog_lock);
|
||||
|
||||
/* Now distribute events */
|
||||
blr_distribute_binlog_record(router, &hdr, ptr);
|
||||
@ -1267,26 +1262,25 @@ int n_bufs = -1, pn_bufs = -1;
|
||||
|
||||
if (router->pending_transaction > 1) {
|
||||
unsigned long long pos;
|
||||
unsigned long long end_pos;
|
||||
GWBUF *record;
|
||||
uint8_t *raw_data;
|
||||
REP_HEADER new_hdr;
|
||||
int i=0;
|
||||
|
||||
spinlock_acquire(&router->lock);
|
||||
|
||||
pos = router->binlog_position;
|
||||
end_pos = router->current_pos;
|
||||
|
||||
spinlock_release(&router->lock);
|
||||
|
||||
spinlock_release(&router->binlog_lock);
|
||||
|
||||
while ((record = blr_read_events_from_pos(router, pos, &new_hdr)) != NULL) {
|
||||
while ((record = blr_read_events_from_pos(router, pos, &new_hdr, end_pos)) != NULL) {
|
||||
i++;
|
||||
raw_data = GWBUF_DATA(record);
|
||||
|
||||
/* distribute event */
|
||||
blr_distribute_binlog_record(router, &new_hdr, raw_data);
|
||||
|
||||
spinlock_acquire(&router->lock);
|
||||
spinlock_acquire(&router->binlog_lock);
|
||||
|
||||
/** The current safe position is only updated
|
||||
* if it points to the event we just distributed. */
|
||||
@ -1297,13 +1291,12 @@ int n_bufs = -1, pn_bufs = -1;
|
||||
|
||||
pos = new_hdr.next_pos;
|
||||
|
||||
spinlock_release(&router->lock);
|
||||
spinlock_release(&router->binlog_lock);
|
||||
|
||||
gwbuf_free(record);
|
||||
}
|
||||
|
||||
/* Check whether binlog records has been read in previous loop */
|
||||
|
||||
if (pos < router->current_pos) {
|
||||
char err_message[BINLOG_ERROR_MSG_LEN+1];
|
||||
|
||||
@ -1336,16 +1329,17 @@ int n_bufs = -1, pn_bufs = -1;
|
||||
blr_distribute_error_message(router, err_message, "HY000", 1236);
|
||||
}
|
||||
|
||||
spinlock_acquire(&router->lock);
|
||||
/* update binlog_position and set pending to 0 */
|
||||
spinlock_acquire(&router->binlog_lock);
|
||||
|
||||
router->binlog_position = router->current_pos;
|
||||
router->pending_transaction = 0;
|
||||
|
||||
spinlock_release(&router->lock);
|
||||
spinlock_release(&router->binlog_lock);
|
||||
} else {
|
||||
spinlock_release(&router->binlog_lock);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -1609,6 +1603,8 @@ int action;
|
||||
|
||||
if (action == 1)
|
||||
{
|
||||
spinlock_acquire(&router->binlog_lock);
|
||||
|
||||
slave_event_action_t slave_action = SLAVE_FORCE_CATCHUP;
|
||||
|
||||
if(router->trx_safe && slave->binlog_pos == router->current_safe_event &&
|
||||
@ -1656,6 +1652,8 @@ int action;
|
||||
hdr->next_pos - hdr->event_size);
|
||||
}
|
||||
|
||||
spinlock_release(&router->binlog_lock);
|
||||
|
||||
/*
|
||||
* If slave_action is SLAVE_FORCE_CATCHUP then
|
||||
* the slave is not at the position it should be. Force it into
|
||||
@ -1692,11 +1690,11 @@ int action;
|
||||
slave->stats.n_bytes += gwbuf_length(pkt);
|
||||
slave->stats.n_events++;
|
||||
slave->dcb->func.write(slave->dcb, pkt);
|
||||
spinlock_acquire(&slave->catch_lock);
|
||||
if (hdr->event_type != ROTATE_EVENT)
|
||||
{
|
||||
slave->binlog_pos = hdr->next_pos;
|
||||
}
|
||||
spinlock_acquire(&slave->catch_lock);
|
||||
if (slave->overrun)
|
||||
{
|
||||
slave->stats.n_overrun++;
|
||||
@ -1861,7 +1859,7 @@ char *rval;
|
||||
* @return The binlog record wrapped in a GWBUF structure
|
||||
*/
|
||||
GWBUF
|
||||
*blr_read_events_from_pos(ROUTER_INSTANCE *router, unsigned long long pos, REP_HEADER *hdr) {
|
||||
*blr_read_events_from_pos(ROUTER_INSTANCE *router, unsigned long long pos, REP_HEADER *hdr, unsigned long long pos_end) {
|
||||
unsigned long long end_pos = 0;
|
||||
struct stat statb;
|
||||
uint8_t hdbuf[19];
|
||||
@ -1871,7 +1869,7 @@ int n;
|
||||
int event_limit;
|
||||
|
||||
/* Get current binnlog position */
|
||||
end_pos = router->current_pos;
|
||||
end_pos = pos_end;
|
||||
|
||||
/* end of file reached, we're done */
|
||||
if (pos == end_pos) {
|
||||
@ -2026,14 +2024,10 @@ blr_stop_start_master(ROUTER_INSTANCE *router) {
|
||||
}
|
||||
router->residual = NULL;
|
||||
|
||||
/* Now it is safe to unleash other threads on this router instance */
|
||||
router->reconnect_pending = 0;
|
||||
router->active_logs = 0;
|
||||
|
||||
router->master_state = BLRM_UNCONNECTED;
|
||||
spinlock_release(&router->lock);
|
||||
|
||||
blr_start_master(router);
|
||||
blr_master_reconnect(router);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -667,8 +667,19 @@ extern char *strcasestr();
|
||||
}
|
||||
else if (strcasecmp(word, "@slave_uuid") == 0)
|
||||
{
|
||||
if ((word = strtok_r(NULL, sep, &brkb)) != NULL)
|
||||
slave->uuid = strdup(word);
|
||||
if ((word = strtok_r(NULL, sep, &brkb)) != NULL) {
|
||||
int len = strlen(word);
|
||||
char *word_ptr = word;
|
||||
if (len) {
|
||||
if (word[len-1] == '\'')
|
||||
word[len-1] = '\0';
|
||||
if (word[0] == '\'') {
|
||||
word[0] = '\0';
|
||||
word_ptr++;
|
||||
}
|
||||
}
|
||||
slave->uuid = strdup(word_ptr);
|
||||
}
|
||||
free(query_text);
|
||||
return blr_slave_replay(router, slave, router->saved_master.setslaveuuid);
|
||||
}
|
||||
@ -1781,10 +1792,11 @@ uint32_t chksum;
|
||||
|
||||
slave->state = BLRS_DUMPING;
|
||||
|
||||
MXS_NOTICE("%s: Slave %s, server id %d requested binlog file %s from position %lu",
|
||||
router->service->name, slave->dcb->remote,
|
||||
slave->serverid,
|
||||
slave->binlogfile, (unsigned long)slave->binlog_pos);
|
||||
MXS_NOTICE("%s: Slave %s:%d, server id %d requested binlog file %s from position %lu",
|
||||
router->service->name, slave->dcb->remote,
|
||||
ntohs((slave->dcb->ipv4).sin_port),
|
||||
slave->serverid,
|
||||
slave->binlogfile, (unsigned long)slave->binlog_pos);
|
||||
|
||||
if (slave->binlog_pos != router->binlog_position ||
|
||||
strcmp(slave->binlogfile, router->binlog_name) != 0)
|
||||
@ -1921,9 +1933,9 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
|
||||
return rval;
|
||||
}
|
||||
MXS_ERROR("Slave %s:%i, server-id %d, binlog '%s': blr_slave_catchup "
|
||||
"failed to open binlog file",
|
||||
slave->dcb->remote, slave->port, slave->serverid,
|
||||
slave->binlogfile);
|
||||
"failed to open binlog file",
|
||||
slave->dcb->remote, ntohs((slave->dcb->ipv4).sin_port), slave->serverid,
|
||||
slave->binlogfile);
|
||||
|
||||
slave->cstate &= ~CS_BUSY;
|
||||
slave->state = BLRS_ERRORED;
|
||||
@ -1975,11 +1987,11 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
|
||||
return rval;
|
||||
}
|
||||
MXS_ERROR("Slave %s:%i, server-id %d, binlog '%s': blr_slave_catchup "
|
||||
"failed to open binlog file in rotate event",
|
||||
slave->dcb->remote,
|
||||
slave->port,
|
||||
slave->serverid,
|
||||
slave->binlogfile);
|
||||
"failed to open binlog file in rotate event",
|
||||
slave->dcb->remote,
|
||||
ntohs((slave->dcb->ipv4).sin_port),
|
||||
slave->serverid,
|
||||
slave->binlogfile);
|
||||
|
||||
slave->state = BLRS_ERRORED;
|
||||
|
||||
@ -2014,12 +2026,12 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
|
||||
|
||||
if (hdr.ok == SLAVE_POS_READ_ERR) {
|
||||
MXS_ERROR("%s Slave %s:%i, server-id %d, binlog '%s', %s",
|
||||
router->service->name,
|
||||
slave->dcb->remote,
|
||||
slave->port,
|
||||
slave->serverid,
|
||||
slave->binlogfile,
|
||||
read_errmsg);
|
||||
router->service->name,
|
||||
slave->dcb->remote,
|
||||
ntohs((slave->dcb->ipv4).sin_port),
|
||||
slave->serverid,
|
||||
slave->binlogfile,
|
||||
read_errmsg);
|
||||
|
||||
spinlock_acquire(&slave->catch_lock);
|
||||
|
||||
@ -2039,15 +2051,21 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
|
||||
|
||||
if (hdr.ok == SLAVE_POS_READ_UNSAFE) {
|
||||
|
||||
ROUTER_OBJECT *router_obj= router->service->router;
|
||||
ROUTER_OBJECT *router_obj;
|
||||
|
||||
spinlock_acquire(&router->lock);
|
||||
|
||||
router_obj = router->service->router;
|
||||
|
||||
spinlock_release(&router->lock);
|
||||
|
||||
MXS_ERROR("%s: Slave %s:%i, server-id %d, binlog '%s', %s",
|
||||
router->service->name,
|
||||
slave->dcb->remote,
|
||||
slave->port,
|
||||
slave->serverid,
|
||||
slave->binlogfile,
|
||||
read_errmsg);
|
||||
router->service->name,
|
||||
slave->dcb->remote,
|
||||
ntohs((slave->dcb->ipv4).sin_port),
|
||||
slave->serverid,
|
||||
slave->binlogfile,
|
||||
read_errmsg);
|
||||
|
||||
/*
|
||||
* Close the slave session and socket
|
||||
@ -2114,21 +2132,21 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
|
||||
slave->stats.n_caughtup++;
|
||||
if (slave->stats.n_caughtup == 1)
|
||||
{
|
||||
MXS_NOTICE("%s: Slave %s:%d, server-id %d is up to date '%s', position %lu.",
|
||||
router->service->name,
|
||||
slave->dcb->remote,
|
||||
slave->port,
|
||||
slave->serverid,
|
||||
slave->binlogfile, (unsigned long)slave->binlog_pos);
|
||||
MXS_ERROR("%s: Slave %s:%d, server-id %d is now up to date '%s', position %lu.",
|
||||
router->service->name,
|
||||
slave->dcb->remote,
|
||||
ntohs((slave->dcb->ipv4).sin_port),
|
||||
slave->serverid,
|
||||
slave->binlogfile, (unsigned long)slave->binlog_pos);
|
||||
}
|
||||
else if ((slave->stats.n_caughtup % 50) == 0)
|
||||
{
|
||||
MXS_NOTICE("%s: Slave %s:%d, server-id %d is up to date '%s', position %lu.",
|
||||
router->service->name,
|
||||
slave->dcb->remote,
|
||||
slave->port,
|
||||
slave->serverid,
|
||||
slave->binlogfile, (unsigned long)slave->binlog_pos);
|
||||
MXS_ERROR("%s: Slave %s:%d, server-id %d is up to date '%s', position %lu.",
|
||||
router->service->name,
|
||||
slave->dcb->remote,
|
||||
ntohs((slave->dcb->ipv4).sin_port),
|
||||
slave->serverid,
|
||||
slave->binlogfile, (unsigned long)slave->binlog_pos);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -2207,6 +2225,29 @@ ROUTER_INSTANCE *router = slave->router;
|
||||
{
|
||||
if (slave->state == BLRS_DUMPING)
|
||||
{
|
||||
int do_return;
|
||||
|
||||
spinlock_acquire(&router->binlog_lock);
|
||||
|
||||
do_return = 0;
|
||||
|
||||
/* check for a pending transaction and not rotating */
|
||||
if (router->pending_transaction && strcmp(router->binlog_name, slave->binlogfile) == 0 &&
|
||||
(slave->binlog_pos > router->binlog_position) && !router->rotating) {
|
||||
do_return = 1;
|
||||
}
|
||||
|
||||
spinlock_release(&router->binlog_lock);
|
||||
|
||||
if (do_return) {
|
||||
spinlock_acquire(&slave->catch_lock);
|
||||
slave->cstate |= CS_EXPECTCB;
|
||||
spinlock_release(&slave->catch_lock);
|
||||
poll_fake_write_event(slave->dcb);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
spinlock_acquire(&slave->catch_lock);
|
||||
slave->cstate &= ~(CS_UPTODATE|CS_EXPECTCB);
|
||||
spinlock_release(&slave->catch_lock);
|
||||
@ -2354,11 +2395,11 @@ char err_msg[BINLOG_ERROR_MSG_LEN+1];
|
||||
{
|
||||
if (hdr.ok != SLAVE_POS_READ_OK) {
|
||||
MXS_ERROR("Slave %s:%i, server-id %d, binlog '%s', blr_read_binlog failure: %s",
|
||||
slave->dcb->remote,
|
||||
slave->port,
|
||||
slave->serverid,
|
||||
slave->binlogfile,
|
||||
err_msg);
|
||||
slave->dcb->remote,
|
||||
ntohs((slave->dcb->ipv4).sin_port),
|
||||
slave->serverid,
|
||||
slave->binlogfile,
|
||||
err_msg);
|
||||
}
|
||||
|
||||
blr_close_binlog(router, file);
|
||||
|
Loading…
x
Reference in New Issue
Block a user