Fix handling of EPOLLHUP events

Fix possible double free in maxscaled on close

Add binlog router debugging/tracing
This commit is contained in:
Mark Riddoch
2014-09-05 17:29:17 +01:00
parent 877442c941
commit c273988e51
11 changed files with 160 additions and 63 deletions

View File

@ -308,6 +308,8 @@ int rval = 0;
/** /**
* Trim bytes form the end of a GWBUF structure * Trim bytes form the end of a GWBUF structure
* *
* This routine assumes the buffer is not part of a chain
*
* @param buf The buffer to trim * @param buf The buffer to trim
* @param nbytes The number of bytes to trim off * @param nbytes The number of bytes to trim off
* @return The buffer chain * @return The buffer chain
@ -315,6 +317,8 @@ int rval = 0;
GWBUF * GWBUF *
gwbuf_trim(GWBUF *buf, unsigned int n_bytes) gwbuf_trim(GWBUF *buf, unsigned int n_bytes)
{ {
ss_dassert(buf->next == NULL);
if (GWBUF_LENGTH(buf) <= n_bytes) if (GWBUF_LENGTH(buf) <= n_bytes)
{ {
gwbuf_consume(buf, GWBUF_LENGTH(buf)); gwbuf_consume(buf, GWBUF_LENGTH(buf));

View File

@ -388,11 +388,6 @@ DCB_CALLBACK *cb;
} }
spinlock_release(&dcb->cb_lock); spinlock_release(&dcb->cb_lock);
if (dcb->dcb_readqueue)
{
GWBUF* queue = dcb->dcb_readqueue;
while ((queue = gwbuf_consume(queue, GWBUF_LENGTH(queue))) != NULL);
}
bitmask_free(&dcb->memdata.bitmask); bitmask_free(&dcb->memdata.bitmask);
simple_mutex_done(&dcb->dcb_read_lock); simple_mutex_done(&dcb->dcb_read_lock);
simple_mutex_done(&dcb->dcb_write_lock); simple_mutex_done(&dcb->dcb_write_lock);
@ -411,7 +406,7 @@ DCB_CALLBACK *cb;
* *
* @param threadid The thread ID of the caller * @param threadid The thread ID of the caller
*/ */
DCB* DCB *
dcb_process_zombies(int threadid) dcb_process_zombies(int threadid)
{ {
DCB *ptr, *lptr; DCB *ptr, *lptr;
@ -1255,6 +1250,12 @@ void dprintAllDCBs(DCB *pdcb)
DCB *dcb; DCB *dcb;
spinlock_acquire(&dcbspin); spinlock_acquire(&dcbspin);
#if SPINLOCK_PROFILE
dcb_printf(pdcb, "DCB List Spinlock Statistics:\n");
spinlock_stats(&dcbspin, spin_reporter, pdcb);
dcb_printf(pdcb, "Zombie Queue Lock Statistics:\n");
spinlock_stats(&zombiespin, spin_reporter, pdcb);
#endif
dcb = allDCBs; dcb = allDCBs;
while (dcb) while (dcb)
{ {
@ -1280,7 +1281,7 @@ DCB *dcb;
dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts); dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts);
dcb_printf(pdcb, "\t\tNo. of busy polls: %d\n", dcb->stats.n_busypolls); dcb_printf(pdcb, "\t\tNo. of busy polls: %d\n", dcb->stats.n_busypolls);
dcb_printf(pdcb, "\t\tNo. of read rechecks: %d\n", dcb->stats.n_readrechecks); dcb_printf(pdcb, "\t\tNo. of read rechecks: %d\n", dcb->stats.n_readrechecks);
dcb_printf(pdcb, "\t\tNo. of busy write polls: %d\n", dcb->stats.n_busywrpolls); dcb_printf(pdcb, "\t\tNo. of busy write polls: %d\n", dcb->stats.n_busywrpolls);
dcb_printf(pdcb, "\t\tNo. of write rechecks: %d\n", dcb->stats.n_writerechecks); dcb_printf(pdcb, "\t\tNo. of write rechecks: %d\n", dcb->stats.n_writerechecks);
dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water); dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water);
dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water); dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water);
@ -1304,21 +1305,20 @@ DCB *dcb;
spinlock_acquire(&dcbspin); spinlock_acquire(&dcbspin);
dcb = allDCBs; dcb = allDCBs;
dcb_printf(pdcb, "Descriptor Control Blocks\n"); dcb_printf(pdcb, "Descriptor Control Blocks\n");
dcb_printf(pdcb, "------------+----------------------------+----------------------+----------\n"); dcb_printf(pdcb, "------------------+----------------------------+--------------------+----------\n");
dcb_printf(pdcb, " %-10s | %-26s | %-20s | %s\n", dcb_printf(pdcb, " %-16s | %-26s | %-18s | %s\n",
"DCB", "State", "Service", "Remote"); "DCB", "State", "Service", "Remote");
dcb_printf(pdcb, "------------+----------------------------+----------------------+----------\n"); dcb_printf(pdcb, "------------------+----------------------------+--------------------+----------\n");
while (dcb) while (dcb)
{ {
dcb_printf(pdcb, " %-14p | %-26s | %-20s | %s\n", dcb_printf(pdcb, " %-16p | %-26s | %-18s | %s\n",
dcb, gw_dcb_state2string(dcb->state), dcb, gw_dcb_state2string(dcb->state),
(dcb->session->service ? dcb->session->service->name : ""),
(dcb->session->service ? ((dcb->session && dcb->session->service) ? dcb->session->service->name : ""),
dcb->session->service->name : ""),
(dcb->remote ? dcb->remote : "")); (dcb->remote ? dcb->remote : ""));
dcb = dcb->next; dcb = dcb->next;
} }
dcb_printf(pdcb, "------------+----------------------------+----------------------+----------\n\n"); dcb_printf(pdcb, "------------------+----------------------------+--------------------+----------\n\n");
spinlock_release(&dcbspin); spinlock_release(&dcbspin);
} }
@ -1335,16 +1335,16 @@ DCB *dcb;
spinlock_acquire(&dcbspin); spinlock_acquire(&dcbspin);
dcb = allDCBs; dcb = allDCBs;
dcb_printf(pdcb, "Client Connections\n"); dcb_printf(pdcb, "Client Connections\n");
dcb_printf(pdcb, "-----------------+------------+----------------------+------------\n"); dcb_printf(pdcb, "-----------------+------------------+----------------------+------------\n");
dcb_printf(pdcb, " %-15s | %-10s | %-20s | %s\n", dcb_printf(pdcb, " %-15s | %-16s | %-20s | %s\n",
"Client", "DCB", "Service", "Session"); "Client", "DCB", "Service", "Session");
dcb_printf(pdcb, "-----------------+------------+----------------------+------------\n"); dcb_printf(pdcb, "-----------------+------------------+----------------------+------------\n");
while (dcb) while (dcb)
{ {
if (dcb_isclient(dcb) if (dcb_isclient(dcb)
&& dcb->dcb_role == DCB_ROLE_REQUEST_HANDLER) && dcb->dcb_role == DCB_ROLE_REQUEST_HANDLER)
{ {
dcb_printf(pdcb, " %-15s | %10p | %-20s | %10p\n", dcb_printf(pdcb, " %-15s | %16p | %-20s | %10p\n",
(dcb->remote ? dcb->remote : ""), (dcb->remote ? dcb->remote : ""),
dcb, (dcb->session->service ? dcb, (dcb->session->service ?
dcb->session->service->name : ""), dcb->session->service->name : ""),
@ -1352,7 +1352,7 @@ DCB *dcb;
} }
dcb = dcb->next; dcb = dcb->next;
} }
dcb_printf(pdcb, "-----------------+------------+----------------------+------------\n\n"); dcb_printf(pdcb, "-----------------+------------------+----------------------+------------\n\n");
spinlock_release(&dcbspin); spinlock_release(&dcbspin);
} }
@ -1392,7 +1392,7 @@ dprintDCB(DCB *pdcb, DCB *dcb)
dcb->stats.n_accepts); dcb->stats.n_accepts);
dcb_printf(pdcb, "\t\tNo. of busy polls: %d\n", dcb->stats.n_busypolls); dcb_printf(pdcb, "\t\tNo. of busy polls: %d\n", dcb->stats.n_busypolls);
dcb_printf(pdcb, "\t\tNo. of read rechecks: %d\n", dcb->stats.n_readrechecks); dcb_printf(pdcb, "\t\tNo. of read rechecks: %d\n", dcb->stats.n_readrechecks);
dcb_printf(pdcb, "\t\tNo. of busy write polls: %d\n", dcb->stats.n_busywrpolls); dcb_printf(pdcb, "\t\tNo. of busy write polls: %d\n", dcb->stats.n_busywrpolls);
dcb_printf(pdcb, "\t\tNo. of write rechecks: %d\n", dcb->stats.n_writerechecks); dcb_printf(pdcb, "\t\tNo. of write rechecks: %d\n", dcb->stats.n_writerechecks);
dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n", dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n",
dcb->stats.n_high_water); dcb->stats.n_high_water);
@ -1929,7 +1929,7 @@ int rval = 0;
* @param dcb The DCB that has data available * @param dcb The DCB that has data available
*/ */
void void
dcb_pollin(DCB *dcb) dcb_pollin(DCB *dcb, int thread_id)
{ {
spinlock_acquire(&dcb->pollinlock); spinlock_acquire(&dcb->pollinlock);
@ -1938,7 +1938,10 @@ dcb_pollin(DCB *dcb)
dcb->pollinbusy = 1; dcb->pollinbusy = 1;
do { do {
if (dcb->readcheck) if (dcb->readcheck)
{
dcb->stats.n_readrechecks++; dcb->stats.n_readrechecks++;
dcb_process_zombies(thread_id);
}
dcb->readcheck = 0; dcb->readcheck = 0;
spinlock_release(&dcb->pollinlock); spinlock_release(&dcb->pollinlock);
dcb->func.read(dcb); dcb->func.read(dcb);
@ -1970,7 +1973,7 @@ dcb_pollin(DCB *dcb)
* @param dcb The DCB thats available for writes * @param dcb The DCB thats available for writes
*/ */
void void
dcb_pollout(DCB *dcb) dcb_pollout(DCB *dcb, int thread_id)
{ {
spinlock_acquire(&dcb->polloutlock); spinlock_acquire(&dcb->polloutlock);
@ -1979,7 +1982,10 @@ dcb_pollout(DCB *dcb)
dcb->polloutbusy = 1; dcb->polloutbusy = 1;
do { do {
if (dcb->writecheck) if (dcb->writecheck)
{
dcb_process_zombies(thread_id);
dcb->stats.n_writerechecks++; dcb->stats.n_writerechecks++;
}
dcb->writecheck = 0; dcb->writecheck = 0;
spinlock_release(&dcb->polloutlock); spinlock_release(&dcb->polloutlock);
dcb->func.write_ready(dcb); dcb->func.write_ready(dcb);

View File

@ -181,7 +181,7 @@ poll_add_dcb(DCB *dcb)
CHK_DCB(dcb); CHK_DCB(dcb);
ev.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLET; ev.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLHUP | EPOLLET;
ev.data.ptr = dcb; ev.data.ptr = dcb;
/*< /*<
@ -501,7 +501,7 @@ DCB *zombies = NULL;
#else #else
atomic_add(&pollStats.n_write, atomic_add(&pollStats.n_write,
1); 1);
dcb_pollout(dcb); dcb_pollout(dcb, thread_id);
#endif #endif
} else { } else {
LOGIF(LD, (skygw_log_write( LOGIF(LD, (skygw_log_write(
@ -551,7 +551,7 @@ DCB *zombies = NULL;
#if MUTEX_BLOCK #if MUTEX_BLOCK
dcb->func.read(dcb); dcb->func.read(dcb);
#else #else
dcb_pollin(dcb); dcb_pollin(dcb, thread_id);
#endif #endif
} }
#if MUTEX_BLOCK #if MUTEX_BLOCK
@ -706,7 +706,7 @@ int i;
dcb_printf(dcb, "Number of times no threads polling: %d\n", dcb_printf(dcb, "Number of times no threads polling: %d\n",
pollStats.n_nothreads); pollStats.n_nothreads);
dcb_printf(dcb, "No of poll completions with dscriptors\n"); dcb_printf(dcb, "No of poll completions with descriptors\n");
dcb_printf(dcb, "\tNo. of descriptors\tNo. of poll completions.\n"); dcb_printf(dcb, "\tNo. of descriptors\tNo. of poll completions.\n");
for (i = 0; i < MAXNFDS - 1; i++) for (i = 0; i < MAXNFDS - 1; i++)
{ {

View File

@ -271,8 +271,8 @@ int fail_accept_errno;
#define DCB_BELOW_LOW_WATER(x) ((x)->low_water && (x)->writeqlen < (x)->low_water) #define DCB_BELOW_LOW_WATER(x) ((x)->low_water && (x)->writeqlen < (x)->low_water)
#define DCB_ABOVE_HIGH_WATER(x) ((x)->high_water && (x)->writeqlen > (x)->high_water) #define DCB_ABOVE_HIGH_WATER(x) ((x)->high_water && (x)->writeqlen > (x)->high_water)
void dcb_pollin(DCB *); void dcb_pollin(DCB *, int);
void dcb_pollout(DCB *); void dcb_pollout(DCB *, int);
DCB *dcb_get_zombies(void); DCB *dcb_get_zombies(void);
int gw_write( int gw_write(
#if defined(SS_DEBUG) #if defined(SS_DEBUG)

View File

@ -216,6 +216,7 @@ typedef struct router_instance {
unsigned int high_water; /*< High water mark for client DCB */ unsigned int high_water; /*< High water mark for client DCB */
BLCACHE *cache[2]; BLCACHE *cache[2];
ROUTER_STATS stats; /*< Statistics for this router */ ROUTER_STATS stats; /*< Statistics for this router */
SPINLOCK alock;
int active_logs; int active_logs;
int reconnect_pending; int reconnect_pending;
GWBUF *queue; GWBUF *queue;

View File

@ -235,6 +235,7 @@ maxscaled_error(DCB *dcb)
static int static int
maxscaled_hangup(DCB *dcb) maxscaled_hangup(DCB *dcb)
{ {
dcb_close(dcb);
return 0; return 0;
} }
@ -313,9 +314,11 @@ maxscaled_close(DCB *dcb)
MAXSCALED *maxscaled = dcb->protocol; MAXSCALED *maxscaled = dcb->protocol;
if (maxscaled && maxscaled->username) if (maxscaled && maxscaled->username)
{
free(maxscaled->username); free(maxscaled->username);
maxscaled->username = NULL;
}
dcb_close(dcb);
return 0; return 0;
} }

View File

@ -75,10 +75,12 @@ libreadwritesplit.so:
clean: clean:
rm -f $(OBJ) $(MODULES) rm -f $(OBJ) $(MODULES)
(cd readwritesplit; touch depend.mk; make clean) (cd readwritesplit; touch depend.mk; make clean)
(cd binlog; touch depend.mk; make clean)
tags: tags:
ctags $(SRCS) $(HDRS) ctags $(SRCS) $(HDRS)
(cd readwritesplit; make tags) (cd readwritesplit; make tags)
(cd binlog; make tags)
depend: depend:
@rm -f depend.mk @rm -f depend.mk

View File

@ -275,8 +275,13 @@ int i;
instances = inst; instances = inst;
spinlock_release(&instlock); spinlock_release(&instlock);
spinlock_init(&inst->alock);
inst->active_logs = 0; inst->active_logs = 0;
inst->reconnect_pending = 0; inst->reconnect_pending = 0;
inst->queue = NULL;
inst->residual = NULL;
inst->slaves = NULL;
inst->next = NULL;
/* /*
* Initialise the binlog file and position * Initialise the binlog file and position
@ -347,7 +352,7 @@ ROUTER_SLAVE *slave;
slave->overrun = 0; slave->overrun = 0;
spinlock_init(&slave->catch_lock); spinlock_init(&slave->catch_lock);
slave->dcb = session->client; slave->dcb = session->client;
slave->router = instance; slave->router = inst;
/** /**
* Add this session to the list of active sessions. * Add this session to the list of active sessions.
@ -606,6 +611,8 @@ struct tm tm;
spinlock_stats(&instlock, spin_reporter, dcb); spinlock_stats(&instlock, spin_reporter, dcb);
dcb_printf(dcb, "\tSpinlock statistics (instance lock):\n"); dcb_printf(dcb, "\tSpinlock statistics (instance lock):\n");
spinlock_stats(&router_inst->lock, spin_reporter, dcb); spinlock_stats(&router_inst->lock, spin_reporter, dcb);
dcb_printf(dcb, "\tSpinlock statistics (active log lock):\n");
spinlock_stats(&router_inst->alock, spin_reporter, dcb);
#endif #endif
if (router_inst->slaves) if (router_inst->slaves)

View File

@ -160,6 +160,11 @@ unsigned char magic[] = BINLOG_MAGIC;
{ {
write(fd, magic, 4); write(fd, magic, 4);
} }
else
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Failed to create binlog file %s\n", path)));
}
fsync(fd); fsync(fd);
close(router->binlog_fd); close(router->binlog_fd);
strcpy(router->binlog_name, file); strcpy(router->binlog_name, file);
@ -190,7 +195,13 @@ int fd;
strcat(path, "/"); strcat(path, "/");
strcat(path, file); strcat(path, file);
fd = open(path, O_RDWR|O_APPEND, 0666); if ((fd = open(path, O_RDWR|O_APPEND, 0666)) == -1)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Failed to open binlog file %s for append.\n",
path)));
return;
}
fsync(fd); fsync(fd);
close(router->binlog_fd); close(router->binlog_fd);
strcpy(router->binlog_name, file); strcpy(router->binlog_name, file);
@ -227,6 +238,7 @@ int
blr_open_binlog(ROUTER_INSTANCE *router, char *binlog) blr_open_binlog(ROUTER_INSTANCE *router, char *binlog)
{ {
char *ptr, path[1024]; char *ptr, path[1024];
int rval;
strcpy(path, "/usr/local/skysql/MaxScale"); strcpy(path, "/usr/local/skysql/MaxScale");
if ((ptr = getenv("MAXSCALE_HOME")) != NULL) if ((ptr = getenv("MAXSCALE_HOME")) != NULL)
@ -238,7 +250,13 @@ char *ptr, path[1024];
strcat(path, "/"); strcat(path, "/");
strcat(path, binlog); strcat(path, binlog);
return open(path, O_RDONLY, 0666); if ((rval = open(path, O_RDONLY, 0666)) == -1)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Failed to open binlog file %s\n", path)));
}
return rval;
} }
/** /**
@ -255,6 +273,7 @@ blr_read_binlog(int fd, unsigned int pos, REP_HEADER *hdr)
uint8_t hdbuf[19]; uint8_t hdbuf[19];
GWBUF *result; GWBUF *result;
unsigned char *data; unsigned char *data;
int n;
if (lseek(fd, pos, SEEK_SET) != pos) if (lseek(fd, pos, SEEK_SET) != pos)
{ {
@ -265,11 +284,16 @@ unsigned char *data;
} }
/* Read the header information from the file */ /* Read the header information from the file */
if (read(fd, hdbuf, 19) != 19) if ((n = read(fd, hdbuf, 19)) != 19)
{ {
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Failed to read header for binlog entry, " "Failed to read header for binlog entry, "
"at %d (%s).\n", pos, strerror(errno)))); "at %d (%s).\n", pos, strerror(errno))));
if (n> 0 && n < 19)
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Short read when reading the header. "
"Expected 19 bytes got %d bytes.\n",
n)));
return NULL; return NULL;
} }
hdr->timestamp = extract_field(hdbuf, 32); hdr->timestamp = extract_field(hdbuf, 32);
@ -288,7 +312,16 @@ unsigned char *data;
} }
data = GWBUF_DATA(result); data = GWBUF_DATA(result);
memcpy(data, hdbuf, 19); // Copy the header in memcpy(data, hdbuf, 19); // Copy the header in
read(fd, &data[19], hdr->event_size - 19); // Read the balance if ((n = read(fd, &data[19], hdr->event_size - 19))
!= hdr->event_size - 19) // Read the balance
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Short read when reading the event at %d. "
"Expected %d bytes got %d bytes.\n",
pos, n)));
gwbuf_consume(result, hdr->event_size);
return NULL;
}
return result; return result;
} }

View File

@ -153,10 +153,10 @@ GWBUF *ptr;
} }
router->queue = NULL; router->queue = NULL;
/* Now it is safe to unleash other threads on this router instance */ /* Now it is safe to unleash other threads on this router instance */
spinlock_acquire(&router->lock); spinlock_acquire(&router->alock);
router->reconnect_pending = 0; router->reconnect_pending = 0;
router->active_logs = 0; router->active_logs = 0;
spinlock_release(&router->lock); spinlock_release(&router->alock);
blr_start_master(router); blr_start_master(router);
} }
@ -175,7 +175,7 @@ blr_master_reconnect(ROUTER_INSTANCE *router)
{ {
int do_reconnect = 0; int do_reconnect = 0;
spinlock_acquire(&router->lock); spinlock_acquire(&router->alock);
if (router->active_logs) if (router->active_logs)
{ {
/* Currently processing a response, set a flag /* Currently processing a response, set a flag
@ -190,13 +190,13 @@ int do_reconnect = 0;
router->active_logs = 1; router->active_logs = 1;
do_reconnect = 1; do_reconnect = 1;
} }
spinlock_release(&router->lock); spinlock_release(&router->alock);
if (do_reconnect) if (do_reconnect)
{ {
blr_restart_master(router); blr_restart_master(router);
spinlock_acquire(&router->lock); spinlock_acquire(&router->alock);
router->active_logs = 0; router->active_logs = 0;
spinlock_release(&router->lock); spinlock_release(&router->alock);
} }
} }
@ -231,10 +231,9 @@ char query[128];
* manipulate the queue or the flag the router spinlock must * manipulate the queue or the flag the router spinlock must
* be held. * be held.
*/ */
spinlock_acquire(&router->lock); spinlock_acquire(&router->alock);
if (router->active_logs) if (router->active_logs)
{ {
int length;
/* /*
* Thread already processing a packet and has not got * Thread already processing a packet and has not got
* to the point that it will not look at new packets * to the point that it will not look at new packets
@ -242,21 +241,32 @@ char query[128];
*/ */
router->stats.n_queueadd++; router->stats.n_queueadd++;
router->queue = gwbuf_append(router->queue, buf); router->queue = gwbuf_append(router->queue, buf);
length = gwbuf_length(router->queue); spinlock_release(&router->alock);
spinlock_release(&router->lock);
LOGIF(LT, (skygw_log_write( LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE, "Queued data due to active log " LOGFILE_TRACE, "Queued data due to active log "
"handling. %s @ %d, queue length %d\n", "handling. %s @ %d, queue length %d\n",
router->binlog_name, router->binlog_name,
router->binlog_position, router->binlog_position,
length))); gwbuf_length(router->queue))));
return; return;
} }
else else
{ {
router->active_logs = 1; router->active_logs = 1;
if (router->queue)
{
GWBUF *tmp;
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR, "Found an unexpected queue item"
" prepending queue of length %d.\n",
gwbuf_length(router->queue))));
tmp = gwbuf_append(router->queue, buf);
buf = tmp;
router->queue = NULL;
}
} }
spinlock_release(&router->lock); spinlock_release(&router->alock);
if (router->master_state < 0 || router->master_state > BLRM_MAXSTATE) if (router->master_state < 0 || router->master_state > BLRM_MAXSTATE)
{ {
@ -264,15 +274,16 @@ char query[128];
LOGFILE_ERROR, "Invalid master state machine state (%d) for binlog router.\n", LOGFILE_ERROR, "Invalid master state machine state (%d) for binlog router.\n",
router->master_state))); router->master_state)));
gwbuf_consume(buf, gwbuf_length(buf)); gwbuf_consume(buf, gwbuf_length(buf));
spinlock_acquire(&router->lock); spinlock_acquire(&router->alock);
if (router->reconnect_pending) if (router->reconnect_pending)
{ {
spinlock_release(&router->lock); router->active_logs = 0;
spinlock_release(&router->alock);
blr_restart_master(router); blr_restart_master(router);
return; return;
} }
router->active_logs = 0; router->active_logs = 0;
spinlock_release(&router->lock); spinlock_release(&router->alock);
return; return;
} }
@ -284,15 +295,15 @@ char query[128];
MYSQL_ERROR_CODE(buf), MYSQL_ERROR_MSG(buf), blrm_states[router->master_state] MYSQL_ERROR_CODE(buf), MYSQL_ERROR_MSG(buf), blrm_states[router->master_state]
))); )));
gwbuf_consume(buf, gwbuf_length(buf)); gwbuf_consume(buf, gwbuf_length(buf));
spinlock_acquire(&router->lock); spinlock_acquire(&router->alock);
router->active_logs = 0; router->active_logs = 0;
if (router->reconnect_pending) if (router->reconnect_pending)
{ {
spinlock_release(&router->lock); spinlock_release(&router->alock);
blr_restart_master(router); blr_restart_master(router);
return; return;
} }
spinlock_release(&router->lock); spinlock_release(&router->alock);
return; return;
} }
do { do {
@ -399,7 +410,7 @@ char query[128];
/* /*
* Check for messages queued by other threads. * Check for messages queued by other threads.
*/ */
spinlock_acquire(&router->lock); spinlock_acquire(&router->alock);
if ((buf = router->queue) != NULL) if ((buf = router->queue) != NULL)
{ {
router->queue = NULL; router->queue = NULL;
@ -408,16 +419,14 @@ char query[128];
{ {
if (router->reconnect_pending) if (router->reconnect_pending)
{ {
spinlock_release(&router->lock);
blr_restart_master(router); blr_restart_master(router);
spinlock_acquire(&router->lock);
} }
else else
{ {
router->active_logs = 0; router->active_logs = 0;
} }
} }
spinlock_release(&router->lock); spinlock_release(&router->alock);
} while (buf != NULL); } while (buf != NULL);
} }
@ -543,8 +552,12 @@ uint8_t *msg = NULL, *ptr, *pdata;
REP_HEADER hdr; REP_HEADER hdr;
int len, reslen; int len, reslen;
int no_residual = 1; int no_residual = 1;
int preslen = -1;
int prev_length = -1;
int n_bufs = -1, pn_bufs = -1;
/* Prepend any residual buffer to the buffer chain we have /*
* Prepend any residual buffer to the buffer chain we have
* been called with. * been called with.
*/ */
if (router->residual) if (router->residual)
@ -606,6 +619,7 @@ int no_residual = 1;
break; break;
} }
n_bufs = 0;
ptr = msg; ptr = msg;
while (p && remainder > 0) while (p && remainder > 0)
{ {
@ -616,6 +630,7 @@ int no_residual = 1;
ptr += n; ptr += n;
if (remainder > 0) if (remainder > 0)
p = p->next; p = p->next;
n_bufs++;
} }
if (remainder) if (remainder)
{ {
@ -626,6 +641,8 @@ int no_residual = 1;
"message as expected. %s @ %d\n", "message as expected. %s @ %d\n",
router->binlog_name, router->binlog_name,
router->binlog_position))); router->binlog_position)));
free(msg);
msg = NULL;
break; break;
} }
@ -653,6 +670,7 @@ int no_residual = 1;
* The message is fully contained in the current buffer * The message is fully contained in the current buffer
*/ */
ptr = pdata; ptr = pdata;
n_bufs = 1;
} }
blr_extract_header(ptr, &hdr); blr_extract_header(ptr, &hdr);
@ -662,11 +680,27 @@ int no_residual = 1;
LOGIF(LE,(skygw_log_write( LOGIF(LE,(skygw_log_write(
LOGFILE_ERROR, LOGFILE_ERROR,
"Packet length is %d, but event size is %d, " "Packet length is %d, but event size is %d, "
"binlog file %s position %d", "binlog file %s position %d"
"reslen is %d and preslen is %d, "
"length of previous event %d. %s",
len, hdr.event_size, len, hdr.event_size,
router->binlog_name, router->binlog_name,
router->binlog_position))); router->binlog_position,
reslen, preslen, prev_length,
(prev_length == -1 ?
(no_residual ? "No residual data from previous call" : "Residual data from previous call") : "")
)));
blr_log_packet(LOGFILE_ERROR, "Packet:", ptr, len); blr_log_packet(LOGFILE_ERROR, "Packet:", ptr, len);
LOGIF(LE,(skygw_log_write(
LOGFILE_ERROR,
"This event (0x%x) was contained in %d GWBUFs, "
"the previous events was contained in %d GWBUFs",
router->lastEventReceived, n_bufs, pn_bufs)));
if (msg)
{
free(msg);
msg = NULL;
}
break; break;
} }
if (hdr.ok == 0) if (hdr.ok == 0)
@ -768,6 +802,7 @@ int no_residual = 1;
free(msg); free(msg);
msg = NULL; msg = NULL;
} }
prev_length = len;
while (len > 0) while (len > 0)
{ {
int n, plen; int n, plen;
@ -776,6 +811,8 @@ int no_residual = 1;
pkt = gwbuf_consume(pkt, n); pkt = gwbuf_consume(pkt, n);
len -= n; len -= n;
} }
preslen = reslen;
pn_bufs = n_bufs;
} }
/* /*
@ -846,7 +883,9 @@ char file[BINLOG_FNAMELEN+1];
ptr += 19; // Skip event header ptr += 19; // Skip event header
len = hdr->event_size - 19; // Event size minus header len = hdr->event_size - 19; // Event size minus header
pos = extract_field(ptr, 32) + (extract_field(ptr+4, 32) << 32); pos = extract_field(ptr+4, 32);
pos <<= 32;
pos |= extract_field(ptr, 32);
slen = len - 8; slen = len - 8;
if (slen > BINLOG_FNAMELEN) if (slen > BINLOG_FNAMELEN)
slen = BINLOG_FNAMELEN; slen = BINLOG_FNAMELEN;
@ -905,7 +944,7 @@ MYSQL_session *auth_info;
static void static void
blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr) blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
{ {
GWBUF *pkt, *distq; GWBUF *pkt;
uint8_t *buf; uint8_t *buf;
ROUTER_SLAVE *slave; ROUTER_SLAVE *slave;
int action; int action;

View File

@ -883,6 +883,7 @@ if (DCB_BELOW_LOW_WATER(slave->dcb) && slave->binlog_pos != router->binlog_posit
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, "Expected to be above low water\n"))); LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, "Expected to be above low water\n")));
goto doitagain; goto doitagain;
} }
return rval;
} }
/** /**
@ -936,7 +937,8 @@ void
blr_slave_rotate(ROUTER_SLAVE *slave, uint8_t *ptr) blr_slave_rotate(ROUTER_SLAVE *slave, uint8_t *ptr)
{ {
ptr += 19; // Skip header ptr += 19; // Skip header
slave->binlog_pos = extract_field(ptr, 32) + (extract_field(ptr+4, 32) << 32); slave->binlog_pos = extract_field(ptr, 32);
slave->binlog_pos += (extract_field(ptr+4, 32) << 32);
memcpy(slave->binlogfile, ptr + 8, BINLOG_FNAMELEN); memcpy(slave->binlogfile, ptr + 8, BINLOG_FNAMELEN);
slave->binlogfile[BINLOG_FNAMELEN] = 0; slave->binlogfile[BINLOG_FNAMELEN] = 0;
} }