diff --git a/server/core/dcb.c b/server/core/dcb.c
index a1681caa7..b8f9108de 100644
--- a/server/core/dcb.c
+++ b/server/core/dcb.c
@@ -403,13 +403,16 @@ DCB_CALLBACK *cb;
  * operation of clearing this bit means that no bits are set in
  * the memdata.bitmask then the DCB is no longer able to be
  * referenced and it can be finally removed.
- * Thread won't clear its bit from bitmask of the DCB it is still using.
+ *
+ * The excluded parameter allows a thread to exclude a DCB from zombie processing.
+ * It is used when a thread calls dcb_process_zombies while there is
+ * a DCB that the caller knows it will continue processing.
  *
  * @param threadid	The thread ID of the caller
- * @param dcb_in_use	The DCB the thread currently uses, NULL or valid DCB.
+ * @param excluded	The DCB the thread currently uses, NULL or a valid DCB.
  */
 DCB *
-dcb_process_zombies(int threadid, DCB *dcb_in_use)
+dcb_process_zombies(int threadid, DCB *excluded)
 {
 DCB	*ptr, *lptr;
 DCB*	dcb_list = NULL;
@@ -426,78 +429,100 @@ bool	succp = false;
 	if (!zombies)
 		return NULL;
 
+	/*
+	 * Process the zombie queue and create a list of DCB's that can be
+	 * finally freed. This processing is done under a spinlock that
+	 * will prevent new entries being added to the zombie queue. Therefore
+	 * we do not want to do any expensive operations under this spinlock
+	 * as they would block other threads. The expensive operations will be
+	 * performed on the victim queue without holding the zombie queue
+	 * spinlock.
+	 */
 	spinlock_acquire(&zombiespin);
 	ptr = zombies;
 	lptr = NULL;
 	while (ptr)
 	{
 		CHK_DCB(ptr);
-		/** Don't clear the bit from DCB the user currently uses */
-		if (dcb_in_use == NULL || ptr != dcb_in_use)
-		{
-			bitmask_clear(&ptr->memdata.bitmask, threadid);
-		}
-		if (ptr == dcb_in_use)
-			ss_dassert(!bitmask_isallclear(&ptr->memdata.bitmask));
-
-		if (bitmask_isallclear(&ptr->memdata.bitmask))
-		{
-			/**
-			 * Remove the DCB from the zombie queue
-			 * and call the final free routine for the
-			 * DCB
-			 *
-			 * ptr is the DCB we are processing
-			 * lptr is the previous DCB on the zombie queue
-			 * or NULL if the DCB is at the head of the queue
-			 * tptr is the DCB after the one we are processing
-			 * on the zombie queue
-			 */
-			DCB	*tptr = ptr->memdata.next;
-			if (lptr == NULL)
-				zombies = tptr;
-			else
-				lptr->memdata.next = tptr;
-			LOGIF(LD, (skygw_log_write_flush(
-				LOGFILE_DEBUG,
-				"%lu [dcb_process_zombies] Remove dcb %p fd %d "
-				"in state %s from zombies list.",
-				pthread_self(),
-				ptr,
-				ptr->fd,
-				STRDCBSTATE(ptr->state))));
-			ss_info_dassert(ptr->state == DCB_STATE_ZOMBIE,
-					"dcb not in DCB_STATE_ZOMBIE state.");
-			/*<
-			 * Move dcb to linked list of victim dcbs.
-			 */
-			if (dcb_list == NULL) {
-				dcb_list = ptr;
-				dcb = dcb_list;
-			} else {
-				dcb->memdata.next = ptr;
-				dcb = dcb->memdata.next;
-			}
-			dcb->memdata.next = NULL;
-			ptr = tptr;
-		}
-		else
+
+		/*
+		 * Skip processing of the excluded DCB
+		 */
+		if (ptr == excluded)
 		{
 			lptr = ptr;
 			ptr = ptr->memdata.next;
 		}
+		else
+		{
+
+			bitmask_clear(&ptr->memdata.bitmask, threadid);
+
+			if (bitmask_isallclear(&ptr->memdata.bitmask))
+			{
+				/**
+				 * Remove the DCB from the zombie queue
+				 * and call the final free routine for the
+				 * DCB
+				 *
+				 * ptr is the DCB we are processing
+				 * lptr is the previous DCB on the zombie queue
+				 * or NULL if the DCB is at the head of the
+				 * queue tptr is the DCB after the one we are
+				 * processing on the zombie queue
+				 */
+				DCB	*tptr = ptr->memdata.next;
+				if (lptr == NULL)
+					zombies = tptr;
+				else
+					lptr->memdata.next = tptr;
+				LOGIF(LD, (skygw_log_write_flush(
+					LOGFILE_DEBUG,
+					"%lu [dcb_process_zombies] Remove dcb "
+					"%p fd %d " "in state %s from the "
+					"list of zombies.",
+					pthread_self(),
+					ptr,
+					ptr->fd,
+					STRDCBSTATE(ptr->state))));
+				ss_info_dassert(ptr->state == DCB_STATE_ZOMBIE,
+					"dcb not in DCB_STATE_ZOMBIE state.");
+				/*<
+				 * Move dcb to linked list of victim dcbs.
+				 */
+				if (dcb_list == NULL) {
+					dcb_list = ptr;
+					dcb = dcb_list;
+				} else {
+					dcb->memdata.next = ptr;
+					dcb = dcb->memdata.next;
+				}
+				dcb->memdata.next = NULL;
+				ptr = tptr;
+			}
+			else
+			{
+				lptr = ptr;
+				ptr = ptr->memdata.next;
+			}
+		}
 	}
 	spinlock_release(&zombiespin);
 
+	/*
+	 * Process the victim queue. These are DCBs that are not in
+	 * use by any thread.
+	 * The corresponding file descriptor is closed, the DCB marked
+	 * as disconnected and the DCB itself is finally freed.
+	 */
 	dcb = dcb_list;
-	/** Close, and set DISCONNECTED victims */
 	while (dcb != NULL) {
 		DCB* dcb_next = NULL;
 		int  rc = 0;
 		/*<
 		 * Close file descriptor and move to clean-up phase.
 		 */
-		ss_dassert(dcb_in_use != dcb);
+		ss_dassert(excluded != dcb);
 		rc = close(dcb->fd);
 
 		if (rc < 0) {
@@ -1119,8 +1144,8 @@ int	above_water;
 /**
  * Removes dcb from poll set, and adds it to zombies list. As a consequense,
  * dcb first moves to DCB_STATE_NOPOLLING, and then to DCB_STATE_ZOMBIE state.
- * At the end of the function state may not be DCB_STATE_ZOMBIE because once dcb_initlock
- * is released parallel threads may change the state.
+ * At the end of the function state may not be DCB_STATE_ZOMBIE because once
+ * dcb_initlock is released parallel threads may change the state.
  *
  * Parameters:
  * @param dcb The DCB to close
@@ -1943,9 +1968,11 @@ int	rval = 0;
  * to return immediately and and process other events.
  *
  * @param dcb	The DCB that has data available
+ * @param thread_id	The ID of the calling thread
+ * @param nozombies	If non-zero then do not do zombie processing
  */
 void
-dcb_pollin(DCB *dcb, int thread_id)
+dcb_pollin(DCB *dcb, int thread_id, int nozombies)
 {
 
 	spinlock_acquire(&dcb->pollinlock);
@@ -1956,7 +1983,8 @@ dcb_pollin(DCB *dcb, int thread_id)
 		if (dcb->readcheck)
 		{
 			dcb->stats.n_readrechecks++;
-			dcb_process_zombies(thread_id, dcb);
+			if (!nozombies)
+				dcb_process_zombies(thread_id, dcb);
 		}
 		dcb->readcheck = 0;
 		spinlock_release(&dcb->pollinlock);
@@ -1987,9 +2015,11 @@ dcb_pollin(DCB *dcb, int thread_id)
  * to return immediately and and process other events.
  *
  * @param dcb	The DCB thats available for writes
+ * @param thread_id	The ID of the calling thread
+ * @param nozombies	If non-zero then do not do zombie processing
  */
 void
-dcb_pollout(DCB *dcb, int thread_id)
+dcb_pollout(DCB *dcb, int thread_id, int nozombies)
 {
 
 	spinlock_acquire(&dcb->polloutlock);
@@ -1999,7 +2029,8 @@ dcb_pollout(DCB *dcb, int thread_id)
 	do {
 		if (dcb->writecheck)
 		{
-			dcb_process_zombies(thread_id, dcb);
+			if (!nozombies)
+				dcb_process_zombies(thread_id, dcb);
 			dcb->stats.n_writerechecks++;
 		}
 		dcb->writecheck = 0;
diff --git a/server/core/housekeeper.c b/server/core/housekeeper.c
index 6180f24a5..5aa5c7a29 100644
--- a/server/core/housekeeper.c
+++ b/server/core/housekeeper.c
@@ -16,6 +16,7 @@
  * Copyright SkySQL Ab 2014
  */
 #include
+#include
 #include
 #include
 #include
diff --git a/server/core/poll.c b/server/core/poll.c
index e2b284e27..b3a0e6b70 100644
--- a/server/core/poll.c
+++ b/server/core/poll.c
@@ -49,10 +49,19 @@ extern int lm_enabled_logfiles_bitmask;
  * @endverbatim
  */
 
+/**
+ * Control the use of mutexes for the epoll_wait call. Setting this to 1 will
+ * cause the epoll_wait calls to be moved under a mutex. This may be useful
+ * for debugging purposes but should be avoided in general use.
+ */
+#define MUTEX_EPOLL	0
+
 static	int		epoll_fd = -1;	  /*< The epoll file descriptor */
 static	int		do_shutdown = 0;  /*< Flag the shutdown of the poll subsystem */
 static	GWBITMASK	poll_mask;
+#if MUTEX_EPOLL
 static	simple_mutex_t	epoll_wait_mutex; /*< serializes calls to epoll_wait */
+#endif
 static	int		n_waiting = 0;	  /*< No. of threads in epoll_wait */
 
 /**
@@ -154,7 +163,9 @@ int	i;
 			thread_data[i].state = THREAD_STOPPED;
 		}
 	}
+#if MUTEX_EPOLL
 	simple_mutex_init(&epoll_wait_mutex, "epoll_wait_mutex");
+#endif
 
 	hktask_add("Load Average", poll_loadav, NULL, POLL_LOAD_FREQ);
 	n_avg_samples = 15 * 60 / POLL_LOAD_FREQ;
@@ -359,7 +370,7 @@ DCB *zombies = NULL;
 						   thread_id)));
 		no_op = TRUE;
 	}
-#if 0
+#if MUTEX_EPOLL
 	simple_mutex_lock(&epoll_wait_mutex, TRUE);
 #endif
 	if (thread_data)
@@ -385,7 +396,7 @@ DCB *zombies = NULL;
 		{
 			atomic_add(&n_waiting, -1);
 			if (process_zombies_only) {
-#if 0
+#if MUTEX_EPOLL
 				simple_mutex_unlock(&epoll_wait_mutex);
 #endif
 				goto process_zombies;
@@ -413,7 +424,7 @@ DCB *zombies = NULL;
 
 	if (n_waiting == 0)
 		atomic_add(&pollStats.n_nothreads, 1);
-#if 0
+#if MUTEX_EPOLL
 	simple_mutex_unlock(&epoll_wait_mutex);
 #endif
 #endif /* BLOCKINGPOLL */
@@ -442,7 +453,7 @@ DCB *zombies = NULL;
 
 		for (i = 0; i < nfds; i++)
 		{
-			DCB		*dcb = (DCB *)events[i].data.ptr;
+			DCB		*dcb = (DCB *)events[i].data.ptr;
 			__uint32_t	ev = events[i].events;
 
 			CHK_DCB(dcb);
@@ -504,7 +515,7 @@ DCB *zombies = NULL;
 #else
 				atomic_add(&pollStats.n_write, 1);
 
-				dcb_pollout(dcb, thread_id);
+				dcb_pollout(dcb, thread_id, nfds);
 #endif
 			} else {
 				LOGIF(LD, (skygw_log_write(
@@ -554,7 +565,7 @@ DCB *zombies = NULL;
 #if MUTEX_BLOCK
 				dcb->func.read(dcb);
 #else
-				dcb_pollin(dcb, thread_id);
+				dcb_pollin(dcb, thread_id, nfds);
 #endif
 			}
 #if MUTEX_BLOCK
diff --git a/server/include/dcb.h b/server/include/dcb.h
index 17cc0c998..3966dfead 100644
--- a/server/include/dcb.h
+++ b/server/include/dcb.h
@@ -271,8 +271,8 @@ int	fail_accept_errno;
 #define DCB_BELOW_LOW_WATER(x) ((x)->low_water && (x)->writeqlen < (x)->low_water)
 #define DCB_ABOVE_HIGH_WATER(x) ((x)->high_water && (x)->writeqlen > (x)->high_water)
 
-void dcb_pollin(DCB *, int);
-void dcb_pollout(DCB *, int);
+void dcb_pollin(DCB *, int, int);
+void dcb_pollout(DCB *, int, int);
 DCB *dcb_get_zombies(void);
 int gw_write(
 #if defined(SS_DEBUG)
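
For illustration only, and not part of the patch itself: the following is a minimal, self-contained sketch of the idea the patch implements, in case the intent of the new excluded parameter is easier to see outside the diff. Each poll thread clears its own bit on every zombie DCB except the one it is still working with; only DCBs whose bitmask has gone empty are moved to a victim list, and the expensive close/free work is then done on that list outside the lock. The SKETCH_DCB type, process_zombies_sketch() and the plain unsigned bitmask below are simplified stand-ins invented for this example; the real code uses the DCB, GWBITMASK and spinlock types from the MaxScale headers and is the dcb_process_zombies() shown above.

#include <stdio.h>
#include <stddef.h>

/* Simplified stand-in for a DCB: one bit per poll thread plus a queue link. */
typedef struct sketch_dcb {
	unsigned int		thread_bits;	/* which threads still reference us */
	struct sketch_dcb	*next;		/* zombie queue link */
	int			fd;
} SKETCH_DCB;

static SKETCH_DCB *zombies;			/* head of the zombie queue */

/*
 * Clear this thread's bit on every zombie except `excluded` and collect
 * the DCBs whose bitmask became empty onto a victim list.  The caller frees
 * the victims afterwards, outside any lock, which mirrors the flow of the
 * patched dcb_process_zombies().
 */
static SKETCH_DCB *
process_zombies_sketch(int threadid, SKETCH_DCB *excluded)
{
	SKETCH_DCB	*victims = NULL;
	SKETCH_DCB	**link = &zombies;

	while (*link)
	{
		SKETCH_DCB *ptr = *link;

		if (ptr == excluded)
		{
			/* Skip the DCB the caller is still working with. */
			link = &ptr->next;
			continue;
		}
		ptr->thread_bits &= ~(1u << threadid);
		if (ptr->thread_bits == 0)
		{
			/* No thread references it: unlink and move to victims. */
			*link = ptr->next;
			ptr->next = victims;
			victims = ptr;
		}
		else
		{
			link = &ptr->next;
		}
	}
	return victims;
}

int
main(void)
{
	SKETCH_DCB	a = { 1u << 0, NULL, 10 };	/* only thread 0 references it */
	SKETCH_DCB	b = { 1u << 0, &a, 11 };	/* ditto, but still in use by us */
	SKETCH_DCB	*victims;
	SKETCH_DCB	*v;

	zombies = &b;

	/* Thread 0 processes zombies but excludes b, which it still holds. */
	victims = process_zombies_sketch(0, &b);

	for (v = victims; v != NULL; v = v->next)
		printf("would close fd %d and free its DCB\n", v->fd);

	/* b stays on the zombie queue until a later pass runs without excluding it. */
	return 0;
}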