Merge branch 'develop' into blr

Conflicts:
	server/core/dcb.c
	server/modules/include/blr.h
	server/modules/routing/binlog/STATUS
	server/modules/routing/binlog/blr.c
	server/modules/routing/binlog/blr_file.c
	server/modules/routing/binlog/blr_master.c
	server/modules/routing/binlog/blr_slave.c
This commit is contained in:
Mark Riddoch
2014-06-08 00:50:36 +01:00
69 changed files with 5108 additions and 900 deletions

View File

@ -68,6 +68,7 @@
#include <atomic.h>
#include <skygw_utils.h>
#include <log_manager.h>
#include <hashtable.h>
extern int lm_enabled_logfiles_bitmask;
@ -311,11 +312,16 @@ DCB_CALLBACK *cb;
if (dcb->remote)
free(dcb->remote);
/* Consume dcb->delayq buffer */
/* Clear write and read buffers */
if (dcb->delayq) {
GWBUF *queue = dcb->delayq;
while ((queue = gwbuf_consume(queue, GWBUF_LENGTH(queue))) != NULL);
}
if (dcb->dcb_readqueue)
{
GWBUF* queue = dcb->dcb_readqueue;
while ((queue = gwbuf_consume(queue, GWBUF_LENGTH(queue))) != NULL);
}
spinlock_acquire(&dcb->cb_lock);
while ((cb = dcb->callbacks) != NULL)
@ -325,6 +331,11 @@ DCB_CALLBACK *cb;
}
spinlock_release(&dcb->cb_lock);
if (dcb->dcb_readqueue)
{
GWBUF* queue = dcb->dcb_readqueue;
while ((queue = gwbuf_consume(queue, GWBUF_LENGTH(queue))) != NULL);
}
bitmask_free(&dcb->memdata.bitmask);
simple_mutex_done(&dcb->dcb_read_lock);
simple_mutex_done(&dcb->dcb_write_lock);
@ -707,11 +718,20 @@ int below_water;
below_water = (dcb->high_water && dcb->writeqlen < dcb->high_water) ? 1 : 0;
ss_dassert(queue != NULL);
/**
* SESSION_STATE_STOPPING means that one of the backends is closing
* the router session. Some backends may have not completed
* authentication yet and thus they have no information about router
* being closed. Session state is changed to SESSION_STATE_STOPPING
* before router's closeSession is called and that tells that DCB may
* still be writable.
*/
if (queue == NULL ||
(dcb->state != DCB_STATE_ALLOC &&
dcb->state != DCB_STATE_POLLING &&
dcb->state != DCB_STATE_LISTENING &&
dcb->state != DCB_STATE_NOPOLLING))
dcb->state != DCB_STATE_NOPOLLING &&
dcb->session->state != SESSION_STATE_STOPPING))
{
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
@ -722,6 +742,7 @@ int below_water;
dcb,
STRDCBSTATE(dcb->state),
dcb->fd)));
ss_dassert(false);
return 0;
}
@ -738,7 +759,10 @@ int below_water;
* the routine that drains the queue data, so we should
* not have a race condition on the event.
*/
qlen = gwbuf_length(queue);
if (queue)
qlen = gwbuf_length(queue);
else
qlen = 0;
atomic_add(&dcb->writeqlen, qlen);
dcb->writeq = gwbuf_append(dcb->writeq, queue);
dcb->stats.n_buffered++;
@ -852,7 +876,14 @@ int below_water;
* for suspended write.
*/
dcb->writeq = queue;
qlen = gwbuf_length(queue);
if (queue)
{
qlen = gwbuf_length(queue);
}
else
{
qlen = 0;
}
atomic_add(&dcb->writeqlen, qlen);
if (queue != NULL)
@ -1055,14 +1086,21 @@ printDCB(DCB *dcb)
printf("\tDCB state: %s\n", gw_dcb_state2string(dcb->state));
if (dcb->remote)
printf("\tConnected to: %s\n", dcb->remote);
printf("\tQueued write data: %d\n", gwbuf_length(dcb->writeq));
if (dcb->writeq)
printf("\tQueued write data: %d\n",gwbuf_length(dcb->writeq));
printf("\tStatistics:\n");
printf("\t\tNo. of Reads: %d\n", dcb->stats.n_reads);
printf("\t\tNo. of Writes: %d\n", dcb->stats.n_writes);
printf("\t\tNo. of Buffered Writes: %d\n", dcb->stats.n_buffered);
printf("\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts);
printf("\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water);
printf("\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water);
printf("\t\tNo. of Reads: %d\n",
dcb->stats.n_reads);
printf("\t\tNo. of Writes: %d\n",
dcb->stats.n_writes);
printf("\t\tNo. of Buffered Writes: %d\n",
dcb->stats.n_buffered);
printf("\t\tNo. of Accepts: %d\n",
dcb->stats.n_accepts);
printf("\t\tNo. of High Water Events: %d\n",
dcb->stats.n_high_water);
printf("\t\tNo. of Low Water Events: %d\n",
dcb->stats.n_low_water);
}
/**
@ -1097,13 +1135,17 @@ DCB *dcb;
while (dcb)
{
dcb_printf(pdcb, "DCB: %p\n", (void *)dcb);
dcb_printf(pdcb, "\tDCB state: %s\n", gw_dcb_state2string(dcb->state));
dcb_printf(pdcb, "\tDCB state: %s\n",
gw_dcb_state2string(dcb->state));
if (dcb->session && dcb->session->service)
dcb_printf(pdcb, "\tService: %s\n", dcb->session->service->name);
dcb_printf(pdcb, "\tService: %s\n",
dcb->session->service->name);
if (dcb->remote)
dcb_printf(pdcb, "\tConnected to: %s\n", dcb->remote);
dcb_printf(pdcb, "\tConnected to: %s\n",
dcb->remote);
if (dcb->writeq)
dcb_printf(pdcb, "\tQueued write data: %d\n", gwbuf_length(dcb->writeq));
dcb_printf(pdcb, "\tQueued write data: %d\n",
gwbuf_length(dcb->writeq));
dcb_printf(pdcb, "\tStatistics:\n");
dcb_printf(pdcb, "\t\tNo. of Reads: %d\n", dcb->stats.n_reads);
dcb_printf(pdcb, "\t\tNo. of Writes: %d\n", dcb->stats.n_writes);
@ -1116,6 +1158,32 @@ DCB *dcb;
spinlock_release(&dcbspin);
}
/**
* Diagnotic routine to print DCB data in a tabular form.
*
* @param pdcb DCB to print results to
*/
void
dListDCBs(DCB *pdcb)
{
DCB *dcb;
spinlock_acquire(&dcbspin);
dcb = allDCBs;
dcb_printf(pdcb, " %-14s | %-26s | %-20s | %s\n",
"DCB", "State", "Service", "Remote");
dcb_printf(pdcb, "-----------------------------------------------------------------------------\n");
while (dcb)
{
dcb_printf(pdcb, " %-14p | %-26s | %-20s | %s\n",
dcb, gw_dcb_state2string(dcb->state),
(dcb->session->service ? dcb->session->service->name : ""),
(dcb->remote ? dcb->remote : ""));
dcb = dcb->next;
}
spinlock_release(&dcbspin);
}
/**
* Diagnostic to print a DCB to another DCB
*
@ -1129,16 +1197,24 @@ dprintDCB(DCB *pdcb, DCB *dcb)
dcb_printf(pdcb, "\tDCB state: %s\n", gw_dcb_state2string(dcb->state));
if (dcb->remote)
dcb_printf(pdcb, "\tConnected to: %s\n", dcb->remote);
dcb_printf(pdcb, "\tOwning Session: %d\n", dcb->session);
dcb_printf(pdcb, "\tQueued write data: %d\n", gwbuf_length(dcb->writeq));
dcb_printf(pdcb, "\tDelayed write data: %d\n", gwbuf_length(dcb->delayq));
dcb_printf(pdcb, "\tOwning Session: %p\n", dcb->session);
if (dcb->writeq)
dcb_printf(pdcb, "\tQueued write data: %d\n", gwbuf_length(dcb->writeq));
if (dcb->delayq)
dcb_printf(pdcb, "\tDelayed write data: %d\n", gwbuf_length(dcb->delayq));
dcb_printf(pdcb, "\tStatistics:\n");
dcb_printf(pdcb, "\t\tNo. of Reads: %d\n", dcb->stats.n_reads);
dcb_printf(pdcb, "\t\tNo. of Writes: %d\n", dcb->stats.n_writes);
dcb_printf(pdcb, "\t\tNo. of Buffered Writes: %d\n", dcb->stats.n_buffered);
dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts);
dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water);
dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water);
dcb_printf(pdcb, "\t\tNo. of Reads: %d\n",
dcb->stats.n_reads);
dcb_printf(pdcb, "\t\tNo. of Writes: %d\n",
dcb->stats.n_writes);
dcb_printf(pdcb, "\t\tNo. of Buffered Writes: %d\n",
dcb->stats.n_buffered);
dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n",
dcb->stats.n_accepts);
dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n",
dcb->stats.n_high_water);
dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n",
dcb->stats.n_low_water);
}
/**
@ -1533,7 +1609,7 @@ int rval = 1;
/**
* Remove a callback from the callback list for the DCB
*
* Searches down the linked list to find he callback with a matching reason, function
* Searches down the linked list to find the callback with a matching reason, function
* and userdata.
*
* @param dcb The DCB to add the callback to