Some general tidyup plus addition of code to block zombie processing

if epoll_wait returned multiple descriptors
This commit is contained in:
Mark Riddoch 2014-09-19 10:50:54 +01:00
parent a5975d8cff
commit 2402d55de6
4 changed files with 112 additions and 69 deletions

View File

@ -403,13 +403,16 @@ DCB_CALLBACK *cb;
* operation of clearing this bit means that no bits are set in
* the memdata.bitmask then the DCB is no longer able to be
* referenced and it can be finally removed.
* Thread won't clear its bit from bitmask of the DCB it is still using.
*
* The excluded DCB allows a thread to exclude a DCB from zombie processing.
* It is used when a thread calls dcb_process_zombies when there is
* a DCB that the caller knows it will continue processing with.
*
* @param threadid The thread ID of the caller
* @param dcb_in_use The DCB the thread currently uses, NULL or valid DCB.
* @param excluded The DCB the thread currently uses, NULL or valid DCB.
*/
DCB *
dcb_process_zombies(int threadid, DCB *dcb_in_use)
dcb_process_zombies(int threadid, DCB *excluded)
{
DCB *ptr, *lptr;
DCB* dcb_list = NULL;
@ -426,78 +429,100 @@ bool succp = false;
if (!zombies)
return NULL;
/*
* Process the zombie queue and create a list of DCB's that can be
* finally freed. This processing is down under a spinlock that
* will prevent new entries being added to the zombie queue. Therefore
* we do not want to do any expensive operations under this spinlock
* as it will block other threads. The expensive operations will be
* performed on the victim queue within holding the zombie queue
* spinlock.
*/
spinlock_acquire(&zombiespin);
ptr = zombies;
lptr = NULL;
while (ptr)
{
CHK_DCB(ptr);
/** Don't clear the bit from DCB the user currently uses */
if (dcb_in_use == NULL || ptr != dcb_in_use)
{
bitmask_clear(&ptr->memdata.bitmask, threadid);
}
if (ptr == dcb_in_use)
ss_dassert(!bitmask_isallclear(&ptr->memdata.bitmask));
if (bitmask_isallclear(&ptr->memdata.bitmask))
{
/**
* Remove the DCB from the zombie queue
* and call the final free routine for the
* DCB
*
* ptr is the DCB we are processing
* lptr is the previous DCB on the zombie queue
* or NULL if the DCB is at the head of the queue
* tptr is the DCB after the one we are processing
* on the zombie queue
*/
DCB *tptr = ptr->memdata.next;
if (lptr == NULL)
zombies = tptr;
else
lptr->memdata.next = tptr;
LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG,
"%lu [dcb_process_zombies] Remove dcb %p fd %d "
"in state %s from zombies list.",
pthread_self(),
ptr,
ptr->fd,
STRDCBSTATE(ptr->state))));
ss_info_dassert(ptr->state == DCB_STATE_ZOMBIE,
"dcb not in DCB_STATE_ZOMBIE state.");
/*<
* Move dcb to linked list of victim dcbs.
*/
if (dcb_list == NULL) {
dcb_list = ptr;
dcb = dcb_list;
} else {
dcb->memdata.next = ptr;
dcb = dcb->memdata.next;
}
dcb->memdata.next = NULL;
ptr = tptr;
}
else
/*
* Skip processing of the excluded DCB
*/
if (ptr == excluded)
{
lptr = ptr;
ptr = ptr->memdata.next;
}
else
{
bitmask_clear(&ptr->memdata.bitmask, threadid);
if (bitmask_isallclear(&ptr->memdata.bitmask))
{
/**
* Remove the DCB from the zombie queue
* and call the final free routine for the
* DCB
*
* ptr is the DCB we are processing
* lptr is the previous DCB on the zombie queue
* or NULL if the DCB is at the head of the
* queue tptr is the DCB after the one we are
* processing on the zombie queue
*/
DCB *tptr = ptr->memdata.next;
if (lptr == NULL)
zombies = tptr;
else
lptr->memdata.next = tptr;
LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG,
"%lu [dcb_process_zombies] Remove dcb "
"%p fd %d " "in state %s from the "
"list of zombies.",
pthread_self(),
ptr,
ptr->fd,
STRDCBSTATE(ptr->state))));
ss_info_dassert(ptr->state == DCB_STATE_ZOMBIE,
"dcb not in DCB_STATE_ZOMBIE state.");
/*<
* Move dcb to linked list of victim dcbs.
*/
if (dcb_list == NULL) {
dcb_list = ptr;
dcb = dcb_list;
} else {
dcb->memdata.next = ptr;
dcb = dcb->memdata.next;
}
dcb->memdata.next = NULL;
ptr = tptr;
}
else
{
lptr = ptr;
ptr = ptr->memdata.next;
}
}
}
spinlock_release(&zombiespin);
/*
* Process the victim queue. These are DCBs that are not in
* use by any thread.
* The corresponding file descriptor is closed, the DCB marked
* as disconnected and the DCB itself is fianlly freed.
*/
dcb = dcb_list;
/** Close, and set DISCONNECTED victims */
while (dcb != NULL) {
DCB* dcb_next = NULL;
int rc = 0;
/*<
* Close file descriptor and move to clean-up phase.
*/
ss_dassert(dcb_in_use != dcb);
ss_dassert(excluded != dcb);
rc = close(dcb->fd);
if (rc < 0) {
@ -1119,8 +1144,8 @@ int above_water;
/**
* Removes dcb from poll set, and adds it to zombies list. As a consequense,
* dcb first moves to DCB_STATE_NOPOLLING, and then to DCB_STATE_ZOMBIE state.
* At the end of the function state may not be DCB_STATE_ZOMBIE because once dcb_initlock
* is released parallel threads may change the state.
* At the end of the function state may not be DCB_STATE_ZOMBIE because once
* dcb_initlock is released parallel threads may change the state.
*
* Parameters:
* @param dcb The DCB to close
@ -1943,9 +1968,11 @@ int rval = 0;
* to return immediately and and process other events.
*
* @param dcb The DCB that has data available
* @param thread_id The ID of the calling thread
* @param nozombies If non-zero then do not do zombie processing
*/
void
dcb_pollin(DCB *dcb, int thread_id)
dcb_pollin(DCB *dcb, int thread_id, int nozombies)
{
spinlock_acquire(&dcb->pollinlock);
@ -1956,7 +1983,8 @@ dcb_pollin(DCB *dcb, int thread_id)
if (dcb->readcheck)
{
dcb->stats.n_readrechecks++;
dcb_process_zombies(thread_id, dcb);
if (!nozombies)
dcb_process_zombies(thread_id, dcb);
}
dcb->readcheck = 0;
spinlock_release(&dcb->pollinlock);
@ -1987,9 +2015,11 @@ dcb_pollin(DCB *dcb, int thread_id)
* to return immediately and and process other events.
*
* @param dcb The DCB thats available for writes
* @param thread_id The ID of the calling thread
* @param nozombies If non-zero then do not do zombie processing
*/
void
dcb_pollout(DCB *dcb, int thread_id)
dcb_pollout(DCB *dcb, int thread_id, int nozombies)
{
spinlock_acquire(&dcb->polloutlock);
@ -1999,7 +2029,8 @@ dcb_pollout(DCB *dcb, int thread_id)
do {
if (dcb->writecheck)
{
dcb_process_zombies(thread_id, dcb);
if (!nozombies)
dcb_process_zombies(thread_id, dcb);
dcb->stats.n_writerechecks++;
}
dcb->writecheck = 0;

View File

@ -16,6 +16,7 @@
* Copyright SkySQL Ab 2014
*/
#include <stdlib.h>
#include <string.h>
#include <housekeeper.h>
#include <thread.h>
#include <spinlock.h>

View File

@ -49,10 +49,19 @@ extern int lm_enabled_logfiles_bitmask;
* @endverbatim
*/
/**
* Control the use of mutexes for the epoll_wait call. Setting to 1 will
* cause the epoll_wait calls to be moved under a mutex. This may be useful
* for debuggign purposes but should be avoided in general use.
*/
#define MUTEX_EPOLL 0
static int epoll_fd = -1; /*< The epoll file descriptor */
static int do_shutdown = 0; /*< Flag the shutdown of the poll subsystem */
static GWBITMASK poll_mask;
#if MUTEX_EPOLL
static simple_mutex_t epoll_wait_mutex; /*< serializes calls to epoll_wait */
#endif
static int n_waiting = 0; /*< No. of threads in epoll_wait */
/**
@ -154,7 +163,9 @@ int i;
thread_data[i].state = THREAD_STOPPED;
}
}
#if MUTEX_EPOLL
simple_mutex_init(&epoll_wait_mutex, "epoll_wait_mutex");
#endif
hktask_add("Load Average", poll_loadav, NULL, POLL_LOAD_FREQ);
n_avg_samples = 15 * 60 / POLL_LOAD_FREQ;
@ -359,7 +370,7 @@ DCB *zombies = NULL;
thread_id)));
no_op = TRUE;
}
#if 0
#if MUTEX_EPOLL
simple_mutex_lock(&epoll_wait_mutex, TRUE);
#endif
if (thread_data)
@ -385,7 +396,7 @@ DCB *zombies = NULL;
{
atomic_add(&n_waiting, -1);
if (process_zombies_only) {
#if 0
#if MUTEX_EPOLL
simple_mutex_unlock(&epoll_wait_mutex);
#endif
goto process_zombies;
@ -413,7 +424,7 @@ DCB *zombies = NULL;
if (n_waiting == 0)
atomic_add(&pollStats.n_nothreads, 1);
#if 0
#if MUTEX_EPOLL
simple_mutex_unlock(&epoll_wait_mutex);
#endif
#endif /* BLOCKINGPOLL */
@ -442,7 +453,7 @@ DCB *zombies = NULL;
for (i = 0; i < nfds; i++)
{
DCB *dcb = (DCB *)events[i].data.ptr;
DCB *dcb = (DCB *)events[i].data.ptr;
__uint32_t ev = events[i].events;
CHK_DCB(dcb);
@ -504,7 +515,7 @@ DCB *zombies = NULL;
#else
atomic_add(&pollStats.n_write,
1);
dcb_pollout(dcb, thread_id);
dcb_pollout(dcb, thread_id, nfds);
#endif
} else {
LOGIF(LD, (skygw_log_write(
@ -554,7 +565,7 @@ DCB *zombies = NULL;
#if MUTEX_BLOCK
dcb->func.read(dcb);
#else
dcb_pollin(dcb, thread_id);
dcb_pollin(dcb, thread_id, nfds);
#endif
}
#if MUTEX_BLOCK

View File

@ -271,8 +271,8 @@ int fail_accept_errno;
#define DCB_BELOW_LOW_WATER(x) ((x)->low_water && (x)->writeqlen < (x)->low_water)
#define DCB_ABOVE_HIGH_WATER(x) ((x)->high_water && (x)->writeqlen > (x)->high_water)
void dcb_pollin(DCB *, int);
void dcb_pollout(DCB *, int);
void dcb_pollin(DCB *, int, int);
void dcb_pollout(DCB *, int, int);
DCB *dcb_get_zombies(void);
int gw_write(
#if defined(SS_DEBUG)