Implement non-blocking alternative to mutexes for read serialisation

This commit is contained in:
Mark Riddoch
2014-06-10 17:59:49 +01:00
parent f7a177dac8
commit 2963a8448b
7 changed files with 230 additions and 51 deletions

View File

@ -120,6 +120,86 @@ perror("setsockopt");
router->stats.n_masterstarts++;
}
/**
* Reconnect to the master server.
*
* IMPORTANT - must be called with router->active_logs set by the
* thread that set active_logs.
*
* @param router The router instance
*/
static void
blr_restart_master(ROUTER_INSTANCE *router)
{
GWBUF *ptr;
dcb_close(router->master);
dcb_free(router->master);
dcb_free(router->client);
/* Discard the queued residual data */
ptr = router->residual;
while (ptr)
{
ptr = gwbuf_consume(ptr, GWBUF_LENGTH(ptr));
}
router->residual = NULL;
/* Discard the queued data */
ptr = router->queue;
while (ptr)
{
ptr = gwbuf_consume(ptr, GWBUF_LENGTH(ptr));
}
router->queue = NULL;
/* Now it is safe to unleash other threads on this router instance */
spinlock_acquire(&router->lock);
router->reconnect_pending = 0;
router->active_logs = 0;
spinlock_release(&router->lock);
blr_start_master(router);
}
/**
* Request a reconnect to the master.
*
* If another thread is active processing messages from the master
* then merely set a flag for that thread to do the restart. If no
* threads are active then directly call the restart routine to
* reconnect to the master.
*
* @param router The router instance
*/
void
blr_master_reconnect(ROUTER_INSTANCE *router)
{
int do_reconnect = 0;
spinlock_acquire(&router->lock);
if (router->active_logs)
{
/* Currently processing a response, set a flag
* and get the thread that is process a response
* to deal with the reconnect.
*/
router->reconnect_pending = 1;
router->stats.n_delayedreconnects++;
}
else
{
router->active_logs = 1;
do_reconnect = 1;
}
spinlock_release(&router->lock);
if (do_reconnect)
{
blr_restart_master(router);
spinlock_acquire(&router->lock);
router->active_logs = 0;
spinlock_release(&router->lock);
}
}
/**
* Binlog router master side state machine event handler.
*
@ -154,6 +234,7 @@ char query[128];
spinlock_acquire(&router->lock);
if (router->active_logs)
{
int length;
/*
* Thread already processing a packet and has not got
* to the point that it will not look at new packets
@ -161,7 +242,14 @@ char query[128];
*/
router->stats.n_queueadd++;
router->queue = gwbuf_append(router->queue, buf);
length = gwbuf_length(router->queue);
spinlock_release(&router->lock);
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE, "Queued data due to active log "
"handling. %s @ %d, queue length %d\n",
router->binlog_name,
router->binlog_position,
length)));
return;
}
else
@ -172,11 +260,17 @@ char query[128];
if (router->master_state < 0 || router->master_state > BLRM_MAXSTATE)
{
LOGIF(LM, (skygw_log_write(
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR, "Invalid master state machine state (%d) for binlog router.\n",
router->master_state)));
gwbuf_consume(buf, gwbuf_length(buf));
spinlock_acquire(&router->lock);
if (router->reconnect_pending)
{
spinlock_release(&router->lock);
blr_restart_master(router);
return;
}
router->active_logs = 0;
spinlock_release(&router->lock);
return;
@ -184,7 +278,7 @@ char query[128];
if (router->master_state != BLRM_BINLOGDUMP && MYSQL_RESPONSE_ERR(buf))
{
LOGIF(LM, (skygw_log_write(
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"Received error: %d, %s from master during %s phase of the master state machine.\n",
MYSQL_ERROR_CODE(buf), MYSQL_ERROR_MSG(buf), blrm_states[router->master_state]
@ -192,6 +286,12 @@ char query[128];
gwbuf_consume(buf, gwbuf_length(buf));
spinlock_acquire(&router->lock);
router->active_logs = 0;
if (router->reconnect_pending)
{
spinlock_release(&router->lock);
blr_restart_master(router);
return;
}
spinlock_release(&router->lock);
return;
}
@ -281,12 +381,20 @@ char query[128];
spinlock_acquire(&router->lock);
if ((buf = router->queue) != NULL)
{
router->queue = buf->next;
buf->next = NULL;
router->queue = NULL;
}
else
{
router->active_logs = 0;
if (router->reconnect_pending)
{
spinlock_release(&router->lock);
blr_restart_master(router);
spinlock_acquire(&router->lock);
}
else
{
router->active_logs = 0;
}
}
spinlock_release(&router->lock);
} while (buf != NULL);
@ -415,7 +523,9 @@ REP_HEADER hdr;
int len, reslen;
int no_residual = 1;
/* Prepend any residual buffer to the buffer chain we have been called with. */
/* Prepend any residual buffer to the buffer chain we have
* been called with.
*/
if (router->residual)
{
pkt = gwbuf_append(router->residual, pkt);
@ -833,14 +943,16 @@ ROUTER_SLAVE *slave;
static void
blr_log_packet(logfile_id_t file, char *msg, uint8_t *ptr, int len)
{
char buf[400], *bufp;
int i;
skygw_log_write(file, "%s length = %d: ", msg, len);
bufp = buf;
bufp += sprintf(bufp, "%s length = %d: ", msg, len);
for (i = 0; i < len && i < 40; i++)
skygw_log_write(file, "0x%02x ", ptr[i]);
bufp += sprintf(bufp, "0x%02x ", ptr[i]);
if (i < len)
skygw_log_write_flush(file, "...\n");
skygw_log_write_flush(file, "%s...\n", buf);
else
skygw_log_write_flush(file, "\n");
skygw_log_write_flush(file, "%s\n", buf);
}