Merge branch 'release-1.0beta-refresh' into blr
This commit is contained in:
@ -404,17 +404,22 @@ DCB_CALLBACK *cb;
|
||||
* the memdata.bitmask then the DCB is no longer able to be
|
||||
* referenced and it can be finally removed.
|
||||
*
|
||||
* The excluded DCB allows a thread to exclude a DCB from zombie processing.
|
||||
* It is used when a thread calls dcb_process_zombies when there is
|
||||
* a DCB that the caller knows it will continue processing with.
|
||||
*
|
||||
* @param threadid The thread ID of the caller
|
||||
* @param excluded The DCB the thread currently uses, NULL or valid DCB.
|
||||
*/
|
||||
DCB *
|
||||
dcb_process_zombies(int threadid)
|
||||
dcb_process_zombies(int threadid, DCB *excluded)
|
||||
{
|
||||
DCB *ptr, *lptr;
|
||||
DCB* dcb_list = NULL;
|
||||
DCB* dcb = NULL;
|
||||
bool succp = false;
|
||||
|
||||
/*<
|
||||
/**
|
||||
* Perform a dirty read to see if there is anything in the queue.
|
||||
* This avoids threads hitting the queue spinlock when the queue
|
||||
* is empty. This will really help when the only entry is being
|
||||
@ -424,69 +429,100 @@ bool succp = false;
|
||||
if (!zombies)
|
||||
return NULL;
|
||||
|
||||
/*
|
||||
* Process the zombie queue and create a list of DCB's that can be
|
||||
* finally freed. This processing is down under a spinlock that
|
||||
* will prevent new entries being added to the zombie queue. Therefore
|
||||
* we do not want to do any expensive operations under this spinlock
|
||||
* as it will block other threads. The expensive operations will be
|
||||
* performed on the victim queue within holding the zombie queue
|
||||
* spinlock.
|
||||
*/
|
||||
spinlock_acquire(&zombiespin);
|
||||
ptr = zombies;
|
||||
lptr = NULL;
|
||||
while (ptr)
|
||||
{
|
||||
bitmask_clear(&ptr->memdata.bitmask, threadid);
|
||||
if (bitmask_isallclear(&ptr->memdata.bitmask))
|
||||
{
|
||||
/*<
|
||||
* Remove the DCB from the zombie queue
|
||||
* and call the final free routine for the
|
||||
* DCB
|
||||
*
|
||||
* ptr is the DCB we are processing
|
||||
* lptr is the previous DCB on the zombie queue
|
||||
* or NULL if the DCB is at the head of the queue
|
||||
* tptr is the DCB after the one we are processing
|
||||
* on the zombie queue
|
||||
*/
|
||||
DCB *tptr = ptr->memdata.next;
|
||||
if (lptr == NULL)
|
||||
zombies = tptr;
|
||||
else
|
||||
lptr->memdata.next = tptr;
|
||||
LOGIF(LD, (skygw_log_write_flush(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [dcb_process_zombies] Remove dcb %p fd %d "
|
||||
"in state %s from zombies list.",
|
||||
pthread_self(),
|
||||
ptr,
|
||||
ptr->fd,
|
||||
STRDCBSTATE(ptr->state))));
|
||||
ss_info_dassert(ptr->state == DCB_STATE_ZOMBIE,
|
||||
"dcb not in DCB_STATE_ZOMBIE state.");
|
||||
/*<
|
||||
* Move dcb to linked list of victim dcbs.
|
||||
*/
|
||||
if (dcb_list == NULL) {
|
||||
dcb_list = ptr;
|
||||
dcb = dcb_list;
|
||||
} else {
|
||||
dcb->memdata.next = ptr;
|
||||
dcb = dcb->memdata.next;
|
||||
}
|
||||
dcb->memdata.next = NULL;
|
||||
ptr = tptr;
|
||||
}
|
||||
else
|
||||
{
|
||||
CHK_DCB(ptr);
|
||||
|
||||
/*
|
||||
* Skip processing of the excluded DCB
|
||||
*/
|
||||
if (ptr == excluded)
|
||||
{
|
||||
lptr = ptr;
|
||||
ptr = ptr->memdata.next;
|
||||
}
|
||||
else
|
||||
{
|
||||
|
||||
bitmask_clear(&ptr->memdata.bitmask, threadid);
|
||||
|
||||
if (bitmask_isallclear(&ptr->memdata.bitmask))
|
||||
{
|
||||
/**
|
||||
* Remove the DCB from the zombie queue
|
||||
* and call the final free routine for the
|
||||
* DCB
|
||||
*
|
||||
* ptr is the DCB we are processing
|
||||
* lptr is the previous DCB on the zombie queue
|
||||
* or NULL if the DCB is at the head of the
|
||||
* queue tptr is the DCB after the one we are
|
||||
* processing on the zombie queue
|
||||
*/
|
||||
DCB *tptr = ptr->memdata.next;
|
||||
if (lptr == NULL)
|
||||
zombies = tptr;
|
||||
else
|
||||
lptr->memdata.next = tptr;
|
||||
LOGIF(LD, (skygw_log_write_flush(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [dcb_process_zombies] Remove dcb "
|
||||
"%p fd %d " "in state %s from the "
|
||||
"list of zombies.",
|
||||
pthread_self(),
|
||||
ptr,
|
||||
ptr->fd,
|
||||
STRDCBSTATE(ptr->state))));
|
||||
ss_info_dassert(ptr->state == DCB_STATE_ZOMBIE,
|
||||
"dcb not in DCB_STATE_ZOMBIE state.");
|
||||
/*<
|
||||
* Move dcb to linked list of victim dcbs.
|
||||
*/
|
||||
if (dcb_list == NULL) {
|
||||
dcb_list = ptr;
|
||||
dcb = dcb_list;
|
||||
} else {
|
||||
dcb->memdata.next = ptr;
|
||||
dcb = dcb->memdata.next;
|
||||
}
|
||||
dcb->memdata.next = NULL;
|
||||
ptr = tptr;
|
||||
}
|
||||
else
|
||||
{
|
||||
lptr = ptr;
|
||||
ptr = ptr->memdata.next;
|
||||
}
|
||||
}
|
||||
}
|
||||
spinlock_release(&zombiespin);
|
||||
|
||||
/*
|
||||
* Process the victim queue. These are DCBs that are not in
|
||||
* use by any thread.
|
||||
* The corresponding file descriptor is closed, the DCB marked
|
||||
* as disconnected and the DCB itself is fianlly freed.
|
||||
*/
|
||||
dcb = dcb_list;
|
||||
/*< Close, and set DISCONNECTED victims */
|
||||
while (dcb != NULL) {
|
||||
DCB* dcb_next = NULL;
|
||||
int rc = 0;
|
||||
/*<
|
||||
* Close file descriptor and move to clean-up phase.
|
||||
*/
|
||||
ss_dassert(excluded != dcb);
|
||||
rc = close(dcb->fd);
|
||||
|
||||
if (rc < 0) {
|
||||
@ -1108,8 +1144,8 @@ int above_water;
|
||||
/**
|
||||
* Removes dcb from poll set, and adds it to zombies list. As a consequense,
|
||||
* dcb first moves to DCB_STATE_NOPOLLING, and then to DCB_STATE_ZOMBIE state.
|
||||
* At the end of the function state may not be DCB_STATE_ZOMBIE because once dcb_initlock
|
||||
* is released parallel threads may change the state.
|
||||
* At the end of the function state may not be DCB_STATE_ZOMBIE because once
|
||||
* dcb_initlock is released parallel threads may change the state.
|
||||
*
|
||||
* Parameters:
|
||||
* @param dcb The DCB to close
|
||||
@ -1928,13 +1964,15 @@ int rval = 0;
|
||||
* and instead implements a queuing mechanism in which nested events are
|
||||
* queued on the DCB such that when the thread processing the first event
|
||||
* returns it will read the queued event and process it. This allows the
|
||||
* thread that woudl otherwise have to wait to process the nested event
|
||||
* thread that would otherwise have to wait to process the nested event
|
||||
* to return immediately and and process other events.
|
||||
*
|
||||
* @param dcb The DCB that has data available
|
||||
* @param thread_id The ID of the calling thread
|
||||
* @param nozombies If non-zero then do not do zombie processing
|
||||
*/
|
||||
void
|
||||
dcb_pollin(DCB *dcb, int thread_id)
|
||||
dcb_pollin(DCB *dcb, int thread_id, int nozombies)
|
||||
{
|
||||
|
||||
spinlock_acquire(&dcb->pollinlock);
|
||||
@ -1945,7 +1983,8 @@ dcb_pollin(DCB *dcb, int thread_id)
|
||||
if (dcb->readcheck)
|
||||
{
|
||||
dcb->stats.n_readrechecks++;
|
||||
dcb_process_zombies(thread_id);
|
||||
if (!nozombies)
|
||||
dcb_process_zombies(thread_id, dcb);
|
||||
}
|
||||
dcb->readcheck = 0;
|
||||
spinlock_release(&dcb->pollinlock);
|
||||
@ -1976,9 +2015,11 @@ dcb_pollin(DCB *dcb, int thread_id)
|
||||
* to return immediately and and process other events.
|
||||
*
|
||||
* @param dcb The DCB thats available for writes
|
||||
* @param thread_id The ID of the calling thread
|
||||
* @param nozombies If non-zero then do not do zombie processing
|
||||
*/
|
||||
void
|
||||
dcb_pollout(DCB *dcb, int thread_id)
|
||||
dcb_pollout(DCB *dcb, int thread_id, int nozombies)
|
||||
{
|
||||
|
||||
spinlock_acquire(&dcb->polloutlock);
|
||||
@ -1988,7 +2029,8 @@ dcb_pollout(DCB *dcb, int thread_id)
|
||||
do {
|
||||
if (dcb->writecheck)
|
||||
{
|
||||
dcb_process_zombies(thread_id);
|
||||
if (!nozombies)
|
||||
dcb_process_zombies(thread_id, dcb);
|
||||
dcb->stats.n_writerechecks++;
|
||||
}
|
||||
dcb->writecheck = 0;
|
||||
|
Reference in New Issue
Block a user