Merge branch 'develop' into blr

Add instrumentation

Remove mutexes

Improve gwbuf_append performance

Conflicts:
	server/core/dcb.c
	server/modules/protocol/mysql_backend.c
This commit is contained in:
Mark Riddoch
2014-08-28 11:41:26 +01:00
96 changed files with 9409 additions and 1986 deletions

View File

@ -58,7 +58,7 @@
extern int lm_enabled_logfiles_bitmask;
static char *version_str = "V1.0.1";
static char *version_str = "V1.0.6";
/* The router entry points */
static ROUTER *createInstance(SERVICE *service, char **options);
@ -343,6 +343,8 @@ ROUTER_SLAVE *slave;
atomic_add(&inst->stats.n_slaves, 1);
slave->state = BLRS_CREATED; /* Set initial state of the slave */
slave->cstate = 0;
slave->pthread = 0;
slave->overrun = 0;
spinlock_init(&slave->catch_lock);
slave->dcb = session->client;
slave->router = instance;
@ -501,6 +503,20 @@ static char *event_names[] = {
"Update Rows Event (v2)", "Delete Rows Event (v2)", "GTID Event",
"Anonymous GTID Event", "Previous GTIDS Event"
};
/**
* Display an entry from the spinlock statistics data
*
* @param dcb The DCB to print to
* @param desc Description of the statistic
* @param value The statistic value
*/
static void
spin_reporter(void *dcb, char *desc, int value)
{
dcb_printf((DCB *)dcb, "\t\t%-35s %d\n", desc, value);
}
/**
* Display router diagnostics
*
@ -585,6 +601,13 @@ struct tm tm;
dcb_printf(dcb, "\t\t%-38s: %u\n", event_names[i], router_inst->stats.events[i]);
}
#if SPINLOCK_PROFILE
dcb_printf(dcb, "\tSpinlock statistics (instlock):\n");
spinlock_stats(&instlock, spin_reporter, dcb);
dcb_printf(dcb, "\tSpinlock statistics (instance lock):\n");
spinlock_stats(&router_inst->lock, spin_reporter, dcb);
#endif
if (router_inst->slaves)
{
dcb_printf(dcb, "\tSlaves:\n");
@ -592,26 +615,55 @@ struct tm tm;
session = router_inst->slaves;
while (session)
{
dcb_printf(dcb, "\t\tServer-id: %d\n", session->serverid);
dcb_printf(dcb, "\t\tServer-id: %d\n", session->serverid);
if (session->hostname)
dcb_printf(dcb, "\t\tHostname: %s\n", session->hostname);
dcb_printf(dcb, "\t\tSlave DCB: %p\n", session->dcb);
dcb_printf(dcb, "\t\tNext Sequence No: %d\n", session->seqno);
dcb_printf(dcb, "\t\tState: %s\n", blrs_states[session->state]);
dcb_printf(dcb, "\t\tBinlog file: %s\n", session->binlogfile);
dcb_printf(dcb, "\t\tBinlog position: %u\n", session->binlog_pos);
dcb_printf(dcb, "\t\tNo. requests: %u\n", session->stats.n_requests);
dcb_printf(dcb, "\t\tNo. events sent: %u\n", session->stats.n_events);
dcb_printf(dcb, "\t\tNo. bursts sent: %u\n", session->stats.n_bursts);
dcb_printf(dcb, "\t\tNo. flow control: %u\n", session->stats.n_flows);
dcb_printf(dcb, "\t\tHostname: %s\n", session->hostname);
dcb_printf(dcb, "\t\tSlave DCB: %p\n", session->dcb);
dcb_printf(dcb, "\t\tNext Sequence No: %d\n", session->seqno);
dcb_printf(dcb, "\t\tState: %s\n", blrs_states[session->state]);
dcb_printf(dcb, "\t\tBinlog file: %s\n", session->binlogfile);
dcb_printf(dcb, "\t\tBinlog position: %u\n", session->binlog_pos);
if (session->nocrc)
dcb_printf(dcb, "\t\tMaster Binlog CRC: None\n");
dcb_printf(dcb, "\t\tNo. requests: %u\n", session->stats.n_requests);
dcb_printf(dcb, "\t\tNo. events sent: %u\n", session->stats.n_events);
dcb_printf(dcb, "\t\tNo. bursts sent: %u\n", session->stats.n_bursts);
dcb_printf(dcb, "\t\tNo. flow control: %u\n", session->stats.n_flows);
dcb_printf(dcb, "\t\tNo. catchup NRs: %u\n", session->stats.n_catchupnr);
dcb_printf(dcb, "\t\tNo. already up to date: %u\n", session->stats.n_alreadyupd);
dcb_printf(dcb, "\t\tNo. up to date: %u\n", session->stats.n_upd);
dcb_printf(dcb, "\t\tNo. of low water cbs %u\n", session->stats.n_cb);
dcb_printf(dcb, "\t\tNo. of drained cbs %u\n", session->stats.n_dcb);
dcb_printf(dcb, "\t\tNo. of low water cbs N/A %u\n", session->stats.n_cbna);
dcb_printf(dcb, "\t\tNo. of events > high water %u\n", session->stats.n_above);
dcb_printf(dcb, "\t\tNo. of failed reads %u\n", session->stats.n_failed_read);
dcb_printf(dcb, "\t\tNo. of nested distribute events %u\n", session->stats.n_overrun);
dcb_printf(dcb, "\t\tNo. of distribute action 1 %u\n", session->stats.n_actions[0]);
dcb_printf(dcb, "\t\tNo. of distribute action 2 %u\n", session->stats.n_actions[1]);
dcb_printf(dcb, "\t\tNo. of distribute action 3 %u\n", session->stats.n_actions[2]);
if ((session->cstate & CS_UPTODATE) == 0)
{
dcb_printf(dcb, "\t\tSlave is in catchup mode. %s\n",
((session->cstate & CS_EXPECTCB) == 0 ? "" :
"Waiting for DCB queue to drain."));
}
else
{
dcb_printf(dcb, "\t\tSlave is in normal mode.\n");
if (session->binlog_pos != router_inst->binlog_position)
{
dcb_printf(dcb, "\t\tSlave reports up to date however "
"the slave binlog position does not match the master\n");
}
}
#if SPINLOCK_PROFILE
dcb_printf(dcb, "\tSpinlock statistics (catch_lock):\n");
spinlock_stats(&session->catch_lock, spin_reporter, dcb);
dcb_printf(dcb, "\tSpinlock statistics (rses_lock):\n");
spinlock_stats(&session->rses_lock, spin_reporter, dcb);
#endif
session = session->next;
}
spinlock_release(&router_inst->lock);

View File

@ -30,6 +30,7 @@
*/
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
@ -257,12 +258,20 @@ unsigned char *data;
if (lseek(fd, pos, SEEK_SET) != pos)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Failed to seek for binlog entry, "
"at %d.\n", pos)));
return NULL;
}
/* Read the header information from the file */
if (read(fd, hdbuf, 19) != 19)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Failed to read header for binlog entry, "
"at %d (%s).\n", pos, strerror(errno))));
return NULL;
}
hdr->timestamp = extract_field(hdbuf, 32);
hdr->event_type = hdbuf[4];
hdr->serverid = extract_field(&hdbuf[5], 32);
@ -272,7 +281,9 @@ unsigned char *data;
if ((result = gwbuf_alloc(hdr->event_size)) == NULL)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Failed to allocate memory for binlog entry.\n")));
"Failed to allocate memory for binlog entry, "
"size %d at %d.\n",
hdr->event_size, pos)));
return NULL;
}
data = GWBUF_DATA(result);

View File

@ -359,6 +359,27 @@ char query[128];
case BLRM_LATIN1:
// Response to the SET NAMES latin1, should be stored
router->saved_master.setnames = buf;
buf = blr_make_query("SET NAMES utf8");
router->master_state = BLRM_UTF8;
router->master->func.write(router->master, buf);
break;
case BLRM_UTF8:
// Response to the SET NAMES utf8, should be stored
router->saved_master.utf8 = buf;
buf = blr_make_query("SELECT 1");
router->master_state = BLRM_SELECT1;
router->master->func.write(router->master, buf);
break;
case BLRM_SELECT1:
// Response to the SELECT 1, should be stored
router->saved_master.select1 = buf;
buf = blr_make_query("SELECT VERSION();");
router->master_state = BLRM_SELECTVER;
router->master->func.write(router->master, buf);
break;
case BLRM_SELECTVER:
// Response to SELECT VERSION should be stored
router->saved_master.selectver = buf;
buf = blr_make_registration(router);
router->master_state = BLRM_REGISTER;
router->master->func.write(router->master, buf);
@ -879,59 +900,123 @@ MYSQL_session *auth_info;
*
* @param router The router instance
* @param hdr The replication event header
* @param ptr The raw replication eent data
* @param ptr The raw replication event data
*/
static void
blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
{
GWBUF *pkt;
GWBUF *pkt, *distq;
uint8_t *buf;
ROUTER_SLAVE *slave;
int action;
spinlock_acquire(&router->lock);
slave = router->slaves;
while (slave)
{
if ((slave->binlog_pos == hdr->next_pos - hdr->event_size)
&& strcmp(slave->binlogfile, router->binlog_name) == 0)
spinlock_acquire(&slave->catch_lock);
if ((slave->cstate & (CS_UPTODATE|CS_DIST)) == CS_UPTODATE)
{
pkt = gwbuf_alloc(hdr->event_size + 5);
buf = GWBUF_DATA(pkt);
encode_value(buf, hdr->event_size + 1, 24);
buf += 3;
*buf++ = slave->seqno++;
*buf++ = 0; // OK
memcpy(buf, ptr, hdr->event_size);
if (hdr->event_type == ROTATE_EVENT)
{
blr_slave_rotate(slave, ptr);
}
slave->dcb->func.write(slave->dcb, pkt);
if (hdr->event_type != ROTATE_EVENT)
{
slave->binlog_pos = hdr->next_pos;
}
}
else if ((hdr->event_type != ROTATE_EVENT)
&& (slave->binlog_pos != hdr->next_pos ||
strcmp(slave->binlogfile, router->binlog_name) != 0))
{
/* Check slave is in catchup mode and if not
* force it to go into catchup mode.
/* Slave is up to date with the binlog and no distribute is
* running on this slave.
*/
if (slave->cstate & CS_UPTODATE)
action = 1;
slave->cstate |= CS_DIST;
}
else if ((slave->cstate & (CS_UPTODATE|CS_DIST)) == (CS_UPTODATE|CS_DIST))
{
/* Slave is up to date with the binlog and a distribute is
* running on this slave.
*/
slave->overrun = 1;
action = 2;
}
else if ((slave->cstate & CS_UPTODATE) == 0)
{
/* Slave is in catchup mode */
action = 3;
}
slave->stats.n_actions[action-1]++;
spinlock_release(&slave->catch_lock);
if (action == 1)
{
if ((slave->binlog_pos == hdr->next_pos - hdr->event_size)
&& (strcmp(slave->binlogfile, router->binlog_name) == 0 ||
hdr->event_type == ROTATE_EVENT))
{
spinlock_release(&router->lock);
pkt = gwbuf_alloc(hdr->event_size + 5);
buf = GWBUF_DATA(pkt);
encode_value(buf, hdr->event_size + 1, 24);
buf += 3;
*buf++ = slave->seqno++;
*buf++ = 0; // OK
memcpy(buf, ptr, hdr->event_size);
if (hdr->event_type == ROTATE_EVENT)
{
blr_slave_rotate(slave, ptr);
}
slave->dcb->func.write(slave->dcb, pkt);
if (hdr->event_type != ROTATE_EVENT)
{
slave->binlog_pos = hdr->next_pos;
}
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~CS_UPTODATE;
spinlock_release(&slave->catch_lock);
blr_slave_catchup(router, slave);
spinlock_acquire(&router->lock);
slave = router->slaves;
if (slave)
continue;
if (slave->overrun)
{
slave->stats.n_overrun++;
slave->overrun = 0;
spinlock_release(&router->lock);
slave->cstate &= ~(CS_UPTODATE|CS_DIST);
spinlock_release(&slave->catch_lock);
blr_slave_catchup(router, slave);
spinlock_acquire(&router->lock);
slave = router->slaves;
if (slave)
continue;
else
break;
}
else
break;
{
slave->cstate &= ~CS_DIST;
}
spinlock_release(&slave->catch_lock);
}
else if ((slave->binlog_pos > hdr->next_pos - hdr->event_size)
&& strcmp(slave->binlogfile, router->binlog_name) == 0)
{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"Slave %d is ahead of expected position %s@%d. "
"Expected position %d",
slave->serverid, slave->binlogfile,
slave->binlog_pos,
hdr->next_pos - hdr->event_size)));
}
else if ((hdr->event_type != ROTATE_EVENT)
&& (slave->binlog_pos != hdr->next_pos - hdr->event_size ||
strcmp(slave->binlogfile, router->binlog_name) != 0))
{
/* Check slave is in catchup mode and if not
* force it to go into catchup mode.
*/
if (slave->cstate & CS_UPTODATE)
{
spinlock_release(&router->lock);
LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG,
"Force slave %d into catchup mode %s@%d\n",
slave->serverid, slave->binlogfile,
slave->binlog_pos)));
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~(CS_UPTODATE|CS_DIST);
spinlock_release(&slave->catch_lock);
blr_slave_catchup(router, slave);
spinlock_acquire(&router->lock);
slave = router->slaves;
if (slave)
continue;
else
break;
}
}
}

View File

@ -107,6 +107,11 @@ blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
case COM_BINLOG_DUMP:
return blr_slave_binlog_dump(router, slave, queue);
break;
case COM_QUIT:
LOGIF(LD, (skygw_log_write(LOGFILE_DEBUG,
"COM_QUIT received from slave with server_id %d\n",
slave->serverid)));
break;
default:
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
@ -124,20 +129,23 @@ blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
* when MaxScale registered as a slave. The exception to the rule is the
* request to obtain the current timestamp value of the server.
*
* Three select statements are currently supported:
* Five select statements are currently supported:
* SELECT UNIX_TIMESTAMP();
* SELECT @master_binlog_checksum
* SELECT @@GLOBAL.GTID_MODE
* SELECT VERSION()
* SELECT 1
*
* Two show commands are supported:
* SHOW VARIABLES LIKE 'SERVER_ID'
* SHOW VARIABLES LIKE 'SERVER_UUID'
*
* Four set commands are supported:
* Five set commands are supported:
* SET @master_binlog_checksum = @@global.binlog_checksum
* SET @master_heartbeat_period=...
* SET @slave_slave_uuid=...
* SET NAMES latin1
* SET NAMES utf8
*
* @param router The router instance this defines the master for this replication chain
* @param slave The slave specific data
@ -186,6 +194,16 @@ int query_len;
free(query_text);
return blr_slave_replay(router, slave, router->saved_master.gtid_mode);
}
else if (strcasecmp(word, "1") == 0)
{
free(query_text);
return blr_slave_replay(router, slave, router->saved_master.select1);
}
else if (strcasecmp(word, "VERSION()") == 0)
{
free(query_text);
return blr_slave_replay(router, slave, router->saved_master.selectver);
}
}
else if (strcasecmp(word, "SHOW") == 0)
{
@ -219,6 +237,11 @@ int query_len;
}
else if (strcasecmp(word, "@master_binlog_checksum") == 0)
{
word = strtok_r(NULL, sep, &brkb);
if (strcasecmp(word, "'none'") == 0)
slave->nocrc = 1;
else
slave->nocrc = 0;
free(query_text);
return blr_slave_replay(router, slave, router->saved_master.chksum1);
}
@ -235,6 +258,11 @@ int query_len;
free(query_text);
return blr_slave_replay(router, slave, router->saved_master.setnames);
}
else if (strcasecmp(word, "utf8") == 0)
{
free(query_text);
return blr_slave_replay(router, slave, router->saved_master.utf8);
}
}
}
free(query_text);
@ -473,34 +501,42 @@ uint32_t chksum;
slave->state = BLRS_DUMPING;
slave->seqno = 1;
if (slave->nocrc)
len = 0x2b;
else
len = 0x2f;
// Build a fake rotate event
resp = gwbuf_alloc(0x34);
hdr.payload_len = 0x30;
resp = gwbuf_alloc(len + 5);
hdr.payload_len = len + 1;
hdr.seqno = slave->seqno++;
hdr.ok = 0;
hdr.timestamp = 0L;
hdr.event_type = ROTATE_EVENT;
hdr.serverid = router->masterid;
hdr.event_size = 0x2f;
hdr.next_pos = slave->binlog_pos;
hdr.flags = 0;
hdr.event_size = len;
hdr.next_pos = 0;
hdr.flags = 0x20;
ptr = blr_build_header(resp, &hdr);
encode_value(ptr, slave->binlog_pos, 64);
ptr += 8;
memcpy(ptr, slave->binlogfile, BINLOG_FNAMELEN);
ptr += BINLOG_FNAMELEN;
/*
* Now add the CRC to the fake binlog rotate event.
*
* The algorithm is first to compute the checksum of an empty buffer
* and then the checksum of the event portion of the message, ie we do not
* include the length, sequence number and ok byte that makes up the first
* 5 bytes of the message. We also do not include the 4 byte checksum itself.
*/
chksum = crc32(0L, NULL, 0);
chksum = crc32(chksum, GWBUF_DATA(resp) + 5, hdr.event_size - 4);
encode_value(ptr, chksum, 32);
if (!slave->nocrc)
{
/*
* Now add the CRC to the fake binlog rotate event.
*
* The algorithm is first to compute the checksum of an empty buffer
* and then the checksum of the event portion of the message, ie we do not
* include the length, sequence number and ok byte that makes up the first
* 5 bytes of the message. We also do not include the 4 byte checksum itself.
*/
chksum = crc32(0L, NULL, 0);
chksum = crc32(chksum, GWBUF_DATA(resp) + 5, hdr.event_size - 4);
encode_value(ptr, chksum, 32);
}
rval = slave->dcb->func.write(slave->dcb, resp);
@ -532,8 +568,16 @@ uint32_t chksum;
slave->dcb->low_water = router->low_water;
slave->dcb->high_water = router->high_water;
dcb_add_callback(slave->dcb, DCB_REASON_LOW_WATER, blr_slave_callback, slave);
dcb_add_callback(slave->dcb, DCB_REASON_DRAINED, blr_slave_callback, slave);
rval = blr_slave_catchup(router, slave);
if (slave->binlog_pos != router->binlog_position ||
strcmp(slave->binlogfile, router->binlog_name) != 0)
{
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~CS_UPTODATE;
spinlock_release(&slave->catch_lock);
rval = blr_slave_catchup(router, slave);
}
return rval;
}
@ -655,6 +699,7 @@ struct timespec req;
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~CS_EXPECTCB;
spinlock_release(&slave->catch_lock);
doitagain:
/*
* We have a slightly complex synchronisation mechanism here,
* we need to make sure that we do not have multiple threads
@ -670,9 +715,9 @@ struct timespec req;
* in the outer loop and the CS_INNERLOOP, to say we are in
* the inner loop.
*
* If just CS_READING is set the thread other may be about to
* If just CS_READING is set the other thread may be about to
* enter the inner loop or may be about to exit the function
* completely. therefore we have to wait to see if CS_READING
* completely. Therefore we have to wait to see if CS_READING
* is cleared or CS_INNERLOOP is set.
*
* If CS_READING gets cleared then this thread should proceed
@ -687,24 +732,57 @@ struct timespec req;
req.tv_sec = 0;
req.tv_nsec = 1000;
spinlock_acquire(&slave->catch_lock);
if (slave->cstate & CS_READING)
if (slave->cstate & CS_UPTODATE)
{
LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE,
"blr_slave_catchup called with up to date slave %d at "
"%s@%d. Reading position %s@%d\n",
slave->serverid, slave->binlogfile,
slave->binlog_pos, router->binlog_name,
router->binlog_position)));
slave->stats.n_alreadyupd++;
spinlock_release(&slave->catch_lock);
return 1;
}
while (slave->cstate & CS_READING)
{
// Wait until we know what the other thread is doing
while ((slave->cstate & (CS_READING|CS_INNERLOOP)) == CS_READING)
{
spinlock_release(&slave->catch_lock);
nanosleep(&req, NULL);
spinlock_acquire(&slave->catch_lock);
}
if (slave->cstate & CS_READING)
// Other thread is in the innerloop
if ((slave->cstate & (CS_READING|CS_INNERLOOP)) == (CS_READING|CS_INNERLOOP))
{
spinlock_release(&slave->catch_lock);
LOGIF(LM, (skygw_log_write(
LOGFILE_MESSAGE,
"blr_slave_catchup thread returning due to "
"lock being held by another thread. %s@%d\n",
slave->binlogfile,
slave->binlog_pos)));
slave->stats.n_catchupnr++;
return 1; // We cheat here and return 1 because otherwise
// an error would be sent and we do not want that
}
/* Release the lock for a short time to allow the other
* thread to exit the outer reading loop.
*/
spinlock_release(&slave->catch_lock);
nanosleep(&req, NULL);
spinlock_acquire(&slave->catch_lock);
}
if (slave->pthread)
LOGIF(LD, (skygw_log_write(LOGFILE_DEBUG, "Multiple threads sending to same thread.\n")));
slave->pthread = pthread_self();
slave->cstate |= CS_READING;
spinlock_release(&slave->catch_lock);
if (DCB_ABOVE_HIGH_WATER(slave->dcb))
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "blr_slave_catchup above high water on entry.\n")));
do {
if ((fd = blr_open_binlog(router, slave->binlogfile)) == -1)
@ -725,6 +803,7 @@ struct timespec req;
while ((!DCB_ABOVE_HIGH_WATER(slave->dcb)) &&
(record = blr_read_binlog(fd, slave->binlog_pos, &hdr)) != NULL)
{
if (hdr.event_size > DEF_HIGH_WATER) slave->stats.n_above++;
head = gwbuf_alloc(5);
ptr = GWBUF_DATA(head);
encode_value(ptr, hdr.event_size + 1, 24);
@ -754,15 +833,14 @@ struct timespec req;
atomic_add(&slave->stats.n_events, 1);
burst++;
}
if (record == NULL)
slave->stats.n_failed_read++;
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~CS_INNERLOOP;
spinlock_release(&slave->catch_lock);
close(fd);
} while (record && DCB_BELOW_LOW_WATER(slave->dcb));
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~CS_READING;
spinlock_release(&slave->catch_lock);
if (record)
{
atomic_add(&slave->stats.n_flows, 1);
@ -772,14 +850,39 @@ struct timespec req;
}
else
{
int state_change = 0;
spinlock_acquire(&slave->catch_lock);
slave->cstate |= CS_UPTODATE;
if ((slave->cstate & CS_UPTODATE) == 0)
{
atomic_add(&slave->stats.n_upd, 1);
slave->cstate |= CS_UPTODATE;
state_change = 1;
}
spinlock_release(&slave->catch_lock);
LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE,
"blr_slave_catchup slave is up to date %s, %u\n",
if (state_change)
LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE,
"blr_slave_catchup slave is up to date %s, %u\n",
slave->binlogfile, slave->binlog_pos)));
}
return rval;
spinlock_acquire(&slave->catch_lock);
#if 0
if (slave->pthread != pthread_self())
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, "Multple threads in catchup for same slave: %x and %x\n", slave->pthread, pthread_self())));
abort();
}
#endif
slave->pthread = 0;
#if 0
if (DCB_BELOW_LOW_WATER(slave->dcb) && slave->binlog_pos != router->binlog_position) abort();
#endif
slave->cstate &= ~CS_READING;
spinlock_release(&slave->catch_lock);
if (DCB_BELOW_LOW_WATER(slave->dcb) && slave->binlog_pos != router->binlog_position)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, "Expected to be above low water\n")));
goto doitagain;
}
}
/**
@ -798,13 +901,27 @@ blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data)
ROUTER_SLAVE *slave = (ROUTER_SLAVE *)data;
ROUTER_INSTANCE *router = slave->router;
if (reason != DCB_REASON_LOW_WATER)
return 0;
if (slave->state == BLRS_DUMPING)
if (reason == DCB_REASON_DRAINED)
{
atomic_add(&slave->stats.n_events, 1);
blr_slave_catchup(router, slave);
if (slave->state == BLRS_DUMPING &&
slave->binlog_pos != router->binlog_position)
{
atomic_add(&slave->stats.n_dcb, 1);
blr_slave_catchup(router, slave);
}
}
if (reason == DCB_REASON_LOW_WATER)
{
if (slave->state == BLRS_DUMPING)
{
atomic_add(&slave->stats.n_cb, 1);
blr_slave_catchup(router, slave);
}
else
{
atomic_add(&slave->stats.n_cbna, 1);
}
}
return 0;
}