Impose locking on dcb_call_foreach DCB callback mechanism. Add counters and maxima for DCBs and zombies to aid diagnosis.

This commit is contained in:
counterpoint
2015-09-29 11:58:31 +01:00
parent 2231d0870c
commit d582c5880c

View File

@ -56,6 +56,8 @@
* remove dcb_set_state etc, simplifications. * remove dcb_set_state etc, simplifications.
* 10/07/2015 Martin Brampton Simplify, merge dcb_read and dcb_read_n * 10/07/2015 Martin Brampton Simplify, merge dcb_read and dcb_read_n
* 04/09/2015 Martin Brampton Changes to ensure DCB always has session pointer * 04/09/2015 Martin Brampton Changes to ensure DCB always has session pointer
* 28/09/2015 Martin Brampton Add counters, maxima for DCBs and zombies
* 29/05/2015 Martin Brampton Impose locking in dcb_call_foreach callbacks
* *
* @endverbatim * @endverbatim
*/ */
@ -87,7 +89,11 @@ extern size_t log_ses_count[];
extern __thread log_info_t tls_log_info; extern __thread log_info_t tls_log_info;
static DCB *allDCBs = NULL; /* Diagnostics need a list of DCBs */ static DCB *allDCBs = NULL; /* Diagnostics need a list of DCBs */
static int nDCBs = 0;
static int maxDCBs = 0;
static DCB *zombies = NULL; static DCB *zombies = NULL;
static int nzombies = 0;
static int maxzombies = 0;
static SPINLOCK dcbspin = SPINLOCK_INIT; static SPINLOCK dcbspin = SPINLOCK_INIT;
static SPINLOCK zombiespin = SPINLOCK_INIT; static SPINLOCK zombiespin = SPINLOCK_INIT;
@ -227,6 +233,8 @@ DCB *newdcb;
ptr = ptr->next; ptr = ptr->next;
ptr->next = newdcb; ptr->next = newdcb;
} }
nDCBs++;
if (nDCBs > maxDCBs) maxDCBs = nDCBs;
spinlock_release(&dcbspin); spinlock_release(&dcbspin);
return newdcb; return newdcb;
} }
@ -328,6 +336,7 @@ dcb_final_free(DCB *dcb)
if (ptr) if (ptr)
ptr->next = dcb->next; ptr->next = dcb->next;
} }
nDCBs--;
spinlock_release(&dcbspin); spinlock_release(&dcbspin);
if (dcb->session) { if (dcb->session) {
@ -492,6 +501,7 @@ DCB *listofdcb = NULL;
* (listofdcb) is not NULL, then it follows that * (listofdcb) is not NULL, then it follows that
* dcb will also not be null. * dcb will also not be null.
*/ */
nzombies--;
zombiedcb->memdata.next = listofdcb; zombiedcb->memdata.next = listofdcb;
listofdcb = zombiedcb; listofdcb = zombiedcb;
} }
@ -565,6 +575,8 @@ dcb_process_victim_queue(DCB *listofdcb)
nextdcb = dcb->memdata.next; nextdcb = dcb->memdata.next;
dcb->memdata.next = zombies; dcb->memdata.next = zombies;
zombies = dcb; zombies = dcb;
nzombies++;
if (nzombies > maxzombies) maxzombies = nzombies;
spinlock_release(&zombiespin); spinlock_release(&zombiespin);
if (dcb->server) if (dcb->server)
{ {
@ -1800,6 +1812,8 @@ dcb_close(DCB *dcb)
dcb->dcb_is_zombie = true; dcb->dcb_is_zombie = true;
dcb->memdata.next = zombies; dcb->memdata.next = zombies;
zombies = dcb; zombies = dcb;
nzombies++;
if (nzombies > maxzombies) maxzombies = nzombies;
/*< Set bit for each maxscale thread. This should be done before /*< Set bit for each maxscale thread. This should be done before
* the state is changed, so as to protect the DCB from premature * the state is changed, so as to protect the DCB from premature
* destruction. */ * destruction. */
@ -2695,17 +2709,21 @@ dcb_call_foreach(struct server* server, DCB_REASON reason)
case DCB_REASON_NOT_RESPONDING: case DCB_REASON_NOT_RESPONDING:
{ {
DCB *dcb; DCB *dcb;
dcb = dcb_get_next(NULL); spinlock_acquire(&dcbspin);
dcb = allDCBs;
while (dcb != NULL) while (dcb != NULL)
{ {
spinlock_acquire(&dcb->dcb_initlock);
if (dcb->state == DCB_STATE_POLLING && dcb->server && if (dcb->state == DCB_STATE_POLLING && dcb->server &&
strcmp(dcb->server->unique_name,server->unique_name) == 0) strcmp(dcb->server->unique_name,server->unique_name) == 0)
{ {
dcb_call_callback(dcb, DCB_REASON_NOT_RESPONDING); dcb_call_callback(dcb, DCB_REASON_NOT_RESPONDING);
} }
dcb = dcb_get_next(dcb); spinlock_release(&dcb->dcb_initlock);
dcb = dcb->next;
} }
spinlock_release(&dcbspin);
break; break;
} }