Merge branch 'blr' into cenh

Conflicts:
	Documentation/MaxScale Configuration And Usage Scenarios.pdf
	server/core/config.c
	server/core/dcb.c
	server/core/service.c
	server/modules/routing/Makefile
	server/modules/routing/debugcmd.c
	server/modules/routing/readwritesplit/readwritesplit.c
	server/modules/routing/readwritesplit/test/rwsplit.sh
This commit is contained in:
Mark Riddoch
2014-05-23 16:39:39 +01:00
19 changed files with 681 additions and 89 deletions

View File

@ -47,6 +47,7 @@
* error and 0 bytes to read.
* This fixes a bug with many reads from
* backend
* 07/05/2014 Mark Riddoch Addition of callback mechanism
*
* @endverbatim
*/
@ -80,6 +81,7 @@ static bool dcb_set_state_nomutex(
DCB* dcb,
const dcb_state_t new_state,
dcb_state_t* old_state);
static void dcb_call_callback(DCB *dcb, DCB_REASON reason);
DCB* dcb_get_zombies(void)
{
@ -94,8 +96,8 @@ DCB* dcb_get_zombies(void)
*
* @return A newly allocated DCB or NULL if non could be allocated.
*/
DCB * dcb_alloc(
dcb_role_t role)
DCB *
dcb_alloc(dcb_role_t role)
{
DCB *rval;
@ -119,11 +121,16 @@ DCB *rval;
spinlock_init(&rval->delayqlock);
spinlock_init(&rval->dcb_readqlock);
spinlock_init(&rval->authlock);
spinlock_init(&rval->cb_lock);
rval->fd = -1;
memset(&rval->stats, 0, sizeof(DCBSTATS)); // Zero the statistics
rval->state = DCB_STATE_ALLOC;
bitmask_init(&rval->memdata.bitmask);
rval->writeqlen = 0;
rval->high_water = 0;
rval->low_water = 0;
rval->next = NULL;
rval->callbacks = NULL;
spinlock_acquire(&dcbspin);
if (allDCBs == NULL)
@ -249,6 +256,8 @@ dcb_add_to_zombieslist(DCB *dcb)
static void
dcb_final_free(DCB *dcb)
{
DCB_CALLBACK *cb;
CHK_DCB(dcb);
ss_info_dassert(dcb->state == DCB_STATE_DISCONNECTED,
"dcb not in DCB_STATE_DISCONNECTED state.");
@ -303,17 +312,27 @@ dcb_final_free(DCB *dcb)
if (dcb->remote)
free(dcb->remote);
/* Clear write and read buffers */
if (dcb->delayq) {
GWBUF *queue = dcb->delayq;
while ((queue = gwbuf_consume(queue, GWBUF_LENGTH(queue))) != NULL);
}
if (dcb->dcb_readqueue)
{
GWBUF* queue = dcb->dcb_readqueue;
while ((queue = gwbuf_consume(queue, GWBUF_LENGTH(queue))) != NULL);
}
spinlock_acquire(&dcb->cb_lock);
while ((cb = dcb->callbacks) != NULL)
{
dcb->callbacks = cb->next;
free(cb);
}
spinlock_release(&dcb->cb_lock);
bitmask_free(&dcb->memdata.bitmask);
simple_mutex_done(&dcb->dcb_read_lock);
simple_mutex_done(&dcb->dcb_write_lock);
@ -689,9 +708,11 @@ return_n:
int
dcb_write(DCB *dcb, GWBUF *queue)
{
int w;
int saved_errno = 0;
int w, qlen;
int saved_errno = 0;
int below_water;
below_water = (dcb->high_water && dcb->writeqlen < dcb->high_water) ? 1 : 0;
ss_dassert(queue != NULL);
if (queue == NULL ||
@ -725,6 +746,8 @@ dcb_write(DCB *dcb, GWBUF *queue)
* the routine that drains the queue data, so we should
* not have a race condition on the event.
*/
qlen = gwbuf_length(queue);
atomic_add(&dcb->writeqlen, qlen);
dcb->writeq = gwbuf_append(dcb->writeq, queue);
dcb->stats.n_buffered++;
LOGIF(LD, (skygw_log_write(
@ -837,6 +860,8 @@ dcb_write(DCB *dcb, GWBUF *queue)
* for suspended write.
*/
dcb->writeq = queue;
qlen = gwbuf_length(queue);
atomic_add(&dcb->writeqlen, qlen);
if (queue != NULL)
{
@ -860,6 +885,13 @@ dcb_write(DCB *dcb, GWBUF *queue)
return 0;
}
spinlock_release(&dcb->writeqlock);
if (dcb->high_water && dcb->writeqlen > dcb->high_water && below_water)
{
atomic_add(&dcb->stats.n_high_water, 1);
dcb_call_callback(dcb, DCB_REASON_HIGH_WATER);
}
return 1;
}
@ -874,9 +906,12 @@ dcb_write(DCB *dcb, GWBUF *queue)
int
dcb_drain_writeq(DCB *dcb)
{
int n = 0;
int w;
int saved_errno = 0;
int n = 0;
int w;
int saved_errno = 0;
int above_water;
above_water = (dcb->low_water && dcb->writeqlen > dcb->low_water) ? 1 : 0;
spinlock_acquire(&dcb->writeqlock);
if (dcb->writeq)
@ -937,6 +972,17 @@ int saved_errno = 0;
}
}
spinlock_release(&dcb->writeqlock);
atomic_add(&dcb->writeqlen, -n);
/* The write queue has drained, potentially need to call a callback function */
if (dcb->writeq == NULL)
dcb_call_callback(dcb, DCB_REASON_DRAINED);
if (above_water && dcb->writeqlen < dcb->low_water)
{
atomic_add(&dcb->stats.n_low_water, 1);
dcb_call_callback(dcb, DCB_REASON_LOW_WATER);
}
return n;
}
@ -979,6 +1025,8 @@ dcb_close(DCB *dcb)
ss_dassert(dcb->state == DCB_STATE_NOPOLLING ||
dcb->state == DCB_STATE_ZOMBIE);
dcb_call_callback(dcb, DCB_REASON_CLOSE);
if (rc == 0) {
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
@ -1021,6 +1069,8 @@ printDCB(DCB *dcb)
printf("\t\tNo. of Writes: %d\n", dcb->stats.n_writes);
printf("\t\tNo. of Buffered Writes: %d\n", dcb->stats.n_buffered);
printf("\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts);
printf("\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water);
printf("\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water);
}
/**
@ -1067,6 +1117,8 @@ DCB *dcb;
dcb_printf(pdcb, "\t\tNo. of Writes: %d\n", dcb->stats.n_writes);
dcb_printf(pdcb, "\t\tNo. of Buffered Writes: %d\n", dcb->stats.n_buffered);
dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts);
dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water);
dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water);
dcb = dcb->next;
}
spinlock_release(&dcbspin);
@ -1092,6 +1144,8 @@ dprintDCB(DCB *pdcb, DCB *dcb)
dcb_printf(pdcb, "\t\tNo. of Writes: %d\n", dcb->stats.n_writes);
dcb_printf(pdcb, "\t\tNo. of Buffered Writes: %d\n", dcb->stats.n_buffered);
dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts);
dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water);
dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water);
}
/**
@ -1428,4 +1482,163 @@ int gw_write(
return w;
}
/**
* Add a callback
*
* Duplicate registrations are not allowed, therefore an error will be returned if
* the specific function, reason and userdata triple are already registered.
* An error will also be returned if the is insufficient memeory available to
* create the registration.
*
* @param dcb The DCB to add the callback to
* @param reason The callback reason
* @param cb The callback function to call
* @param userdata User data to send in the call
* @return Non-zero (true) if the callback was added
*/
int
dcb_add_callback(DCB *dcb, DCB_REASON reason, int (*callback)(struct dcb *, DCB_REASON, void *), void *userdata)
{
DCB_CALLBACK *cb, *ptr;
int rval = 1;
if ((ptr = (DCB_CALLBACK *)malloc(sizeof(DCB_CALLBACK))) == NULL)
{
return 0;
}
ptr->reason = reason;
ptr->cb = callback;
ptr->userdata = userdata;
ptr->next = NULL;
spinlock_acquire(&dcb->cb_lock);
cb = dcb->callbacks;
if (cb == NULL)
{
dcb->callbacks = ptr;
spinlock_release(&dcb->cb_lock);
}
else
{
while (cb)
{
if (cb->reason == reason && cb->cb == callback &&
cb->userdata == userdata)
{
free(ptr);
spinlock_release(&dcb->cb_lock);
return 0;
}
if (cb->next == NULL)
cb->next = ptr;
cb = cb->next;
}
spinlock_release(&dcb->cb_lock);
}
return rval;
}
/**
* Remove a callback from the callback list for the DCB
*
* Searches down the linked list to find he callback with a matching reason, function
* and userdata.
*
* @param dcb The DCB to add the callback to
* @param reason The callback reason
* @param cb The callback function to call
* @param userdata User data to send in the call
* @return Non-zero (true) if the callback was removed
*/
int
dcb_remove_callback(DCB *dcb, DCB_REASON reason, int (*callback)(struct dcb *, DCB_REASON), void *userdata)
{
DCB_CALLBACK *cb, *pcb = NULL;
int rval = 0;
spinlock_acquire(&dcb->cb_lock);
cb = dcb->callbacks;
if (cb == NULL)
{
rval = 0;
}
else
{
while (cb)
{
if (cb->reason == reason && cb->cb == callback
&& cb->userdata == userdata)
{
if (pcb == NULL)
pcb->next = cb->next;
else
dcb->callbacks = cb->next;
spinlock_release(&dcb->cb_lock);
free(cb);
rval = 1;
break;
}
pcb = cb;
cb = cb->next;
}
}
if (!rval)
spinlock_release(&dcb->cb_lock);
return rval;
}
/**
* Call the set of callbacks registered for a particular reason.
*
* @param dcb The DCB to call the callbacks regarding
* @param reason The reason that has triggered the call
*/
static void
dcb_call_callback(DCB *dcb, DCB_REASON reason)
{
DCB_CALLBACK *cb, *nextcb;
spinlock_acquire(&dcb->cb_lock);
cb = dcb->callbacks;
while (cb)
{
if (cb->reason == reason)
{
nextcb = cb->next;
spinlock_release(&dcb->cb_lock);
cb->cb(dcb, reason, cb->userdata);
spinlock_acquire(&dcb->cb_lock);
cb = nextcb;
}
else
cb = cb->next;
}
spinlock_release(&dcb->cb_lock);
}
/**
* Check the passed DCB to ensure it is in the list of allDCBS
*
* @param DCB The DCB to check
* @return 1 if the DCB is in the list, otherwise 0
*/
int
dcb_isvalid(DCB *dcb)
{
DCB *ptr;
int rval = 0;
spinlock_acquire(&dcbspin);
ptr = allDCBs;
while (ptr)
{
if (ptr == dcb)
{
rval = 1;
break;
}
ptr = ptr->next;
}
spinlock_release(&dcbspin);
return rval;
}