Create thread specific zombie queues
Because each thread has their own epoll file descriptor and only one thread can process a DCB, it makes sense to move to a per thread zombie queue. This removes one of the last restrictions on scalability.
This commit is contained in:
@ -100,18 +100,30 @@ static LIST_CONFIG DCBlist =
|
||||
/* A DCB with null values, used for initialization */
|
||||
static DCB dcb_initialized = DCB_INIT;
|
||||
|
||||
static DCB *zombies = NULL;
|
||||
static int nzombies = 0;
|
||||
static DCB **zombies;
|
||||
static int *nzombies;
|
||||
static int maxzombies = 0;
|
||||
static SPINLOCK zombiespin = SPINLOCK_INIT;
|
||||
|
||||
void dcb_global_init()
|
||||
{
|
||||
int nthreads = config_threadcount();
|
||||
|
||||
if ((zombies = MXS_CALLOC(nthreads, sizeof(DCB*))) == NULL ||
|
||||
(nzombies = MXS_CALLOC(nthreads, sizeof(int))) == NULL)
|
||||
{
|
||||
MXS_OOM();
|
||||
raise(SIGABRT);
|
||||
}
|
||||
}
|
||||
|
||||
static void dcb_initialize(void *dcb);
|
||||
static void dcb_final_free(DCB *dcb);
|
||||
static void dcb_call_callback(DCB *dcb, DCB_REASON reason);
|
||||
static int dcb_null_write(DCB *dcb, GWBUF *buf);
|
||||
static int dcb_null_auth(DCB *dcb, SERVER *server, SESSION *session, GWBUF *buf);
|
||||
static inline DCB * dcb_find_in_list(DCB *dcb);
|
||||
static inline void dcb_process_victim_queue(DCB *listofdcb);
|
||||
static inline void dcb_process_victim_queue(int threadid);
|
||||
static void dcb_stop_polling_and_shutdown (DCB *dcb);
|
||||
static bool dcb_maybe_add_persistent(DCB *);
|
||||
static inline bool dcb_write_parameter_check(DCB *dcb, GWBUF *queue);
|
||||
@ -166,17 +178,6 @@ bool dcb_get_ses_log_info(
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the pointer to the list of zombie DCB's
|
||||
*
|
||||
* @return Zombies DCB list
|
||||
*/
|
||||
DCB *
|
||||
dcb_get_zombies(void)
|
||||
{
|
||||
return zombies;
|
||||
}
|
||||
|
||||
/*
|
||||
* @brief Pre-allocate memory for a number of DCBs
|
||||
*
|
||||
@ -455,109 +456,12 @@ dcb_free_all_memory(DCB *dcb)
|
||||
*
|
||||
* @param threadid The thread ID of the caller
|
||||
*/
|
||||
DCB *
|
||||
dcb_process_zombies(int threadid)
|
||||
void dcb_process_zombies(int threadid)
|
||||
{
|
||||
DCB *zombiedcb;
|
||||
DCB *previousdcb = NULL, *nextdcb;
|
||||
DCB *listofdcb = NULL;
|
||||
|
||||
/**
|
||||
* Perform a dirty read to see if there is anything in the queue.
|
||||
* This avoids threads hitting the queue spinlock when the queue
|
||||
* is empty. This will really help when the only entry is being
|
||||
* freed, since the queue is updated before the expensive call to
|
||||
* dcb_final_free.
|
||||
*/
|
||||
if (!zombies)
|
||||
if (zombies[threadid])
|
||||
{
|
||||
return NULL;
|
||||
dcb_process_victim_queue(threadid);
|
||||
}
|
||||
|
||||
/*
|
||||
* Process the zombie queue and create a list of DCB's that can be
|
||||
* finally freed. This processing is down under a spinlock that
|
||||
* will prevent new entries being added to the zombie queue. Therefore
|
||||
* we do not want to do any expensive operations under this spinlock
|
||||
* as it will block other threads. The expensive operations will be
|
||||
* performed on the victim queue within holding the zombie queue
|
||||
* spinlock.
|
||||
*/
|
||||
spinlock_acquire(&zombiespin);
|
||||
zombiedcb = zombies;
|
||||
while (zombiedcb)
|
||||
{
|
||||
CHK_DCB(zombiedcb);
|
||||
nextdcb = zombiedcb->memdata.next;
|
||||
/*
|
||||
* Skip processing of DCB's that are
|
||||
* in the event queue waiting to be processed.
|
||||
*/
|
||||
if (zombiedcb->evq.next || zombiedcb->evq.prev)
|
||||
{
|
||||
previousdcb = zombiedcb;
|
||||
}
|
||||
else
|
||||
{
|
||||
|
||||
if (bitmask_clear_without_spinlock(&zombiedcb->memdata.bitmask, threadid))
|
||||
{
|
||||
/**
|
||||
* Remove the DCB from the zombie queue
|
||||
* and call the final free routine for the
|
||||
* DCB
|
||||
*
|
||||
* zombiedcb is the DCB we are processing
|
||||
* previousdcb is the previous DCB on the zombie
|
||||
* queue or NULL if the DCB is at the head of the
|
||||
* queue. Remove zombiedcb from the zombies list.
|
||||
*/
|
||||
if (NULL == previousdcb)
|
||||
{
|
||||
zombies = zombiedcb->memdata.next;
|
||||
}
|
||||
else
|
||||
{
|
||||
previousdcb->memdata.next = zombiedcb->memdata.next;
|
||||
}
|
||||
|
||||
MXS_DEBUG("%lu [%s] Remove dcb "
|
||||
"%p fd %d in state %s from the "
|
||||
"list of zombies.",
|
||||
pthread_self(),
|
||||
__func__,
|
||||
zombiedcb,
|
||||
zombiedcb->fd,
|
||||
STRDCBSTATE(zombiedcb->state));
|
||||
/*<
|
||||
* Move zombie dcb to linked list of victim dcbs.
|
||||
* The variable dcb is used to hold the last DCB
|
||||
* to have been added to the linked list, or NULL
|
||||
* if none has yet been added. If the list
|
||||
* (listofdcb) is not NULL, then it follows that
|
||||
* dcb will also not be null.
|
||||
*/
|
||||
nzombies--;
|
||||
zombiedcb->memdata.next = listofdcb;
|
||||
listofdcb = zombiedcb;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Since we didn't remove this dcb from the zombies
|
||||
list, we need to advance the previous pointer */
|
||||
previousdcb = zombiedcb;
|
||||
}
|
||||
}
|
||||
zombiedcb = nextdcb;
|
||||
}
|
||||
spinlock_release(&zombiespin);
|
||||
|
||||
if (listofdcb)
|
||||
{
|
||||
dcb_process_victim_queue(listofdcb);
|
||||
}
|
||||
|
||||
return zombies;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -570,17 +474,17 @@ dcb_process_zombies(int threadid)
|
||||
* @param listofdcb The first victim DCB
|
||||
*/
|
||||
static inline void
|
||||
dcb_process_victim_queue(DCB *listofdcb)
|
||||
dcb_process_victim_queue(int threadid)
|
||||
{
|
||||
DCB *dcb = listofdcb;
|
||||
/** Grab the zombie queue to a local queue. This allows us to add back DCBs
|
||||
* that should not yet be closed. */
|
||||
DCB *dcblist = zombies[threadid];
|
||||
zombies[threadid] = NULL;
|
||||
|
||||
while (dcb != NULL)
|
||||
while (dcblist)
|
||||
{
|
||||
DCB *nextdcb;
|
||||
/*<
|
||||
* Stop dcb's listening and modify state accordingly.
|
||||
*/
|
||||
spinlock_acquire(&dcb->dcb_initlock);
|
||||
DCB *dcb = dcblist;
|
||||
|
||||
if (dcb->state == DCB_STATE_POLLING || dcb->state == DCB_STATE_LISTENING)
|
||||
{
|
||||
if (dcb->state == DCB_STATE_LISTENING)
|
||||
@ -595,34 +499,28 @@ dcb_process_victim_queue(DCB *listofdcb)
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Must be DCB_STATE_POLLING */
|
||||
spinlock_release(&dcb->dcb_initlock);
|
||||
if (0 == dcb->persistentstart && dcb_maybe_add_persistent(dcb))
|
||||
{
|
||||
/* Have taken DCB into persistent pool, no further killing */
|
||||
dcb = dcb->memdata.next;
|
||||
continue;
|
||||
dcblist = dcblist->memdata.next;
|
||||
}
|
||||
else
|
||||
{
|
||||
DCB *next2dcb;
|
||||
/** The DCB is still polling. Shut it down and process it later. */
|
||||
dcb_stop_polling_and_shutdown(dcb);
|
||||
spinlock_acquire(&zombiespin);
|
||||
bitmask_copy(&dcb->memdata.bitmask, poll_bitmask());
|
||||
next2dcb = dcb->memdata.next;
|
||||
dcb->memdata.next = zombies;
|
||||
zombies = dcb;
|
||||
nzombies++;
|
||||
if (nzombies > maxzombies)
|
||||
{
|
||||
maxzombies = nzombies;
|
||||
}
|
||||
spinlock_release(&zombiespin);
|
||||
dcb = next2dcb;
|
||||
continue;
|
||||
DCB *newzombie = dcblist;
|
||||
dcblist = dcblist->memdata.next;
|
||||
newzombie->memdata.next = zombies[threadid];
|
||||
zombies[threadid] = newzombie;
|
||||
}
|
||||
|
||||
/** Nothing to do here but to process the next DCB */
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
nzombies[threadid]--;
|
||||
|
||||
/*
|
||||
* Into the final close logic, so if DCB is for backend server, we
|
||||
* must decrement the number of current connections.
|
||||
@ -690,11 +588,14 @@ dcb_process_victim_queue(DCB *listofdcb)
|
||||
&mxs_log_tls.li_sesid,
|
||||
&mxs_log_tls.li_enabled_priorities);
|
||||
|
||||
/** Move to the next DCB before freeing the previous one */
|
||||
dcblist = dcblist->memdata.next;
|
||||
|
||||
/** After these calls, the DCB should be treated as if it were freed.
|
||||
* Whether it is actually freed depends on the type of the DCB and how
|
||||
* many DCBs are linked to it via the SESSION object. */
|
||||
dcb->state = DCB_STATE_DISCONNECTED;
|
||||
nextdcb = dcb->memdata.next;
|
||||
spinlock_release(&dcb->dcb_initlock);
|
||||
dcb_final_free(dcb);
|
||||
dcb = nextdcb;
|
||||
}
|
||||
/** Reset threads session data */
|
||||
mxs_log_tls.li_sesid = 0;
|
||||
@ -1748,20 +1649,15 @@ dcb_close(DCB *dcb)
|
||||
if (dcb->state == DCB_STATE_ALLOC && dcb->fd == DCBFD_CLOSED)
|
||||
{
|
||||
dcb_final_free(dcb);
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* If DCB is in persistent pool, mark it as an error and exit
|
||||
*/
|
||||
if (dcb->persistentstart > 0)
|
||||
else if (dcb->persistentstart > 0)
|
||||
{
|
||||
dcb->dcb_errhandle_called = true;
|
||||
return;
|
||||
}
|
||||
|
||||
spinlock_acquire(&zombiespin);
|
||||
if (!dcb->dcb_is_zombie)
|
||||
else if (!dcb->dcb_is_zombie)
|
||||
{
|
||||
if (DCB_ROLE_BACKEND_HANDLER == dcb->dcb_role && 0 == dcb->persistentstart
|
||||
&& dcb->server && DCB_STATE_POLLING == dcb->state)
|
||||
@ -1777,23 +1673,21 @@ dcb_close(DCB *dcb)
|
||||
/*<
|
||||
* Add closing dcb to the top of the list, setting zombie marker
|
||||
*/
|
||||
int owner = dcb->owner;
|
||||
dcb->dcb_is_zombie = true;
|
||||
dcb->memdata.next = zombies;
|
||||
zombies = dcb;
|
||||
nzombies++;
|
||||
if (nzombies > maxzombies)
|
||||
dcb->memdata.next = zombies[owner];
|
||||
zombies[owner] = dcb;
|
||||
nzombies[owner]++;
|
||||
if (nzombies[owner] > maxzombies)
|
||||
{
|
||||
maxzombies = nzombies;
|
||||
}
|
||||
/*< Set bit for each maxscale thread. This should be done before
|
||||
* the state is changed, so as to protect the DCB from premature
|
||||
* destruction. */
|
||||
if (dcb->server)
|
||||
{
|
||||
bitmask_copy(&dcb->memdata.bitmask, poll_bitmask());
|
||||
maxzombies = nzombies[owner];
|
||||
}
|
||||
}
|
||||
spinlock_release(&zombiespin);
|
||||
else
|
||||
{
|
||||
/** DCBs in the zombie queue can still receive events which means that
|
||||
* a DCB can be closed multiple times while it's in the zombie queue. */
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -2621,50 +2515,6 @@ dcb_isvalid(DCB *dcb)
|
||||
return (int)list_is_entry_in_use(&DCBlist, (list_entry_t *)dcb);
|
||||
}
|
||||
|
||||
/**
|
||||
* Call all the callbacks on all DCB's that match the server and the reason given
|
||||
*
|
||||
* @param reason The DCB_REASON that triggers the callback
|
||||
*/
|
||||
void
|
||||
dcb_call_foreach(struct server* server, DCB_REASON reason)
|
||||
{
|
||||
MXS_DEBUG("%lu [dcb_call_foreach]", pthread_self());
|
||||
|
||||
switch (reason) {
|
||||
case DCB_REASON_DRAINED:
|
||||
case DCB_REASON_HIGH_WATER:
|
||||
case DCB_REASON_LOW_WATER:
|
||||
case DCB_REASON_ERROR:
|
||||
case DCB_REASON_HUP:
|
||||
case DCB_REASON_NOT_RESPONDING:
|
||||
{
|
||||
DCB *dcb;
|
||||
list_entry_t *current;
|
||||
|
||||
current = list_start_iteration(&DCBlist);
|
||||
|
||||
while (current)
|
||||
{
|
||||
dcb = (DCB *)current;
|
||||
spinlock_acquire(&dcb->dcb_initlock);
|
||||
if (dcb->state == DCB_STATE_POLLING && dcb->server &&
|
||||
strcmp(dcb->server->unique_name,server->unique_name) == 0)
|
||||
{
|
||||
dcb_call_callback(dcb, DCB_REASON_NOT_RESPONDING);
|
||||
}
|
||||
spinlock_release(&dcb->dcb_initlock);
|
||||
current = list_iterate(&DCBlist, current);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
/**
|
||||
* Call all the callbacks on all DCB's that match the server and the reason given
|
||||
*
|
||||
|
@ -1941,6 +1941,7 @@ int main(int argc, char **argv)
|
||||
/* Init MaxScale poll system */
|
||||
poll_init();
|
||||
|
||||
dcb_global_init();
|
||||
/**
|
||||
* Init mysql thread context for main thread as well. Needed when users
|
||||
* are queried from backends.
|
||||
|
@ -450,6 +450,7 @@ poll_remove_dcb(DCB *dcb)
|
||||
*/
|
||||
dcbfd = dcb->fd;
|
||||
spinlock_release(&dcb->dcb_initlock);
|
||||
|
||||
if (dcbfd > 0)
|
||||
{
|
||||
rc = epoll_ctl(epoll_fd[dcb->owner], EPOLL_CTL_DEL, dcbfd, &ev);
|
||||
@ -842,12 +843,11 @@ poll_set_maxwait(unsigned int maxwait)
|
||||
static int
|
||||
process_pollq(int thread_id, struct epoll_event *event)
|
||||
{
|
||||
int found = 0;
|
||||
uint32_t ev = event->events;
|
||||
unsigned long qtime;
|
||||
|
||||
DCB *dcb = event->data.ptr;
|
||||
|
||||
ss_dassert(dcb->owner == thread_id);
|
||||
#if PROFILE_POLL
|
||||
memlog_log(plog, hkheartbeat - dcb->evq.inserted);
|
||||
#endif
|
||||
|
@ -319,6 +319,8 @@ session_link_dcb(SESSION *session, DCB *dcb)
|
||||
}
|
||||
atomic_add(&session->refcount, 1);
|
||||
dcb->session = session;
|
||||
/** Move this DCB under the same thread */
|
||||
dcb->owner = session->client_dcb->owner;
|
||||
spinlock_release(&session->ses_lock);
|
||||
return true;
|
||||
}
|
||||
|
@ -67,7 +67,6 @@ test1()
|
||||
ss_dfprintf(stderr, "\t..done\nMake clone DCB a zombie");
|
||||
clone->state = DCB_STATE_NOPOLLING;
|
||||
dcb_close(clone);
|
||||
ss_info_dassert(dcb_get_zombies() == clone, "Clone DCB must be start of zombie list now");
|
||||
ss_dfprintf(stderr, "\t..done\nProcess the zombies list");
|
||||
dcb_process_zombies(0);
|
||||
ss_dfprintf(stderr, "\t..done\nCheck clone no longer valid");
|
||||
|
Reference in New Issue
Block a user