From 675baad4b132fc69994e2b32ea5f8f3886d34368 Mon Sep 17 00:00:00 2001 From: Mark Riddoch Date: Wed, 17 Sep 2014 14:54:40 +0100 Subject: [PATCH 1/4] Fixed typo --- server/core/modutil.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/core/modutil.c b/server/core/modutil.c index e2800b849..7247f181d 100644 --- a/server/core/modutil.c +++ b/server/core/modutil.c @@ -125,7 +125,7 @@ unsigned char *ptr; /** * Replace the contents of a GWBUF with the new SQL statement passed as a text string. * The routine takes care of the modification needed to the MySQL packet, - * returning a GWBUF chian that cna be used to send the data to a MySQL server + * returning a GWBUF chain that can be used to send the data to a MySQL server * * @param orig The original request in a GWBUF * @param sql The SQL text to replace in the packet @@ -225,4 +225,4 @@ char* modutil_get_query( } /*< switch */ retblock: return query_str; -} \ No newline at end of file +} From 0f3db1c0918f56810196744a10b121dd34955f1b Mon Sep 17 00:00:00 2001 From: VilhoRaatikka Date: Wed, 17 Sep 2014 18:02:44 +0300 Subject: [PATCH 2/4] First changes for fixing #548, http://bugs.skysql.com/show_bug.cgi?id=548 dcb.c:dcb_process_zombies:added new parameter which tells what DCB the thread will use after calling dcb_process_zombies. Thus, processing that DCB is skipped. readwritesplit.c:routeQuery:removed double free call for query buffer. --- server/core/dcb.c | 29 +++++++++++++------ server/core/poll.c | 4 +-- server/include/dcb.h | 2 +- .../routing/readwritesplit/readwritesplit.c | 14 +++++++-- 4 files changed, 34 insertions(+), 15 deletions(-) diff --git a/server/core/dcb.c b/server/core/dcb.c index deccdb0a5..a1681caa7 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -403,18 +403,20 @@ DCB_CALLBACK *cb; * operation of clearing this bit means that no bits are set in * the memdata.bitmask then the DCB is no longer able to be * referenced and it can be finally removed. + * Thread won't clear its bit from bitmask of the DCB it is still using. * * @param threadid The thread ID of the caller + * @param dcb_in_use The DCB the thread currently uses, NULL or valid DCB. */ DCB * -dcb_process_zombies(int threadid) +dcb_process_zombies(int threadid, DCB *dcb_in_use) { DCB *ptr, *lptr; DCB* dcb_list = NULL; DCB* dcb = NULL; bool succp = false; - /*< + /** * Perform a dirty read to see if there is anything in the queue. * This avoids threads hitting the queue spinlock when the queue * is empty. This will really help when the only entry is being @@ -428,11 +430,19 @@ bool succp = false; ptr = zombies; lptr = NULL; while (ptr) - { - bitmask_clear(&ptr->memdata.bitmask, threadid); + { + CHK_DCB(ptr); + /** Don't clear the bit from DCB the user currently uses */ + if (dcb_in_use == NULL || ptr != dcb_in_use) + { + bitmask_clear(&ptr->memdata.bitmask, threadid); + } + if (ptr == dcb_in_use) + ss_dassert(!bitmask_isallclear(&ptr->memdata.bitmask)); + if (bitmask_isallclear(&ptr->memdata.bitmask)) { - /*< + /** * Remove the DCB from the zombie queue * and call the final free routine for the * DCB @@ -480,13 +490,14 @@ bool succp = false; spinlock_release(&zombiespin); dcb = dcb_list; - /*< Close, and set DISCONNECTED victims */ + /** Close, and set DISCONNECTED victims */ while (dcb != NULL) { DCB* dcb_next = NULL; int rc = 0; /*< * Close file descriptor and move to clean-up phase. */ + ss_dassert(dcb_in_use != dcb); rc = close(dcb->fd); if (rc < 0) { @@ -1928,7 +1939,7 @@ int rval = 0; * and instead implements a queuing mechanism in which nested events are * queued on the DCB such that when the thread processing the first event * returns it will read the queued event and process it. This allows the - * thread that woudl otherwise have to wait to process the nested event + * thread that would otherwise have to wait to process the nested event * to return immediately and and process other events. * * @param dcb The DCB that has data available @@ -1945,7 +1956,7 @@ dcb_pollin(DCB *dcb, int thread_id) if (dcb->readcheck) { dcb->stats.n_readrechecks++; - dcb_process_zombies(thread_id); + dcb_process_zombies(thread_id, dcb); } dcb->readcheck = 0; spinlock_release(&dcb->pollinlock); @@ -1988,7 +1999,7 @@ dcb_pollout(DCB *dcb, int thread_id) do { if (dcb->writecheck) { - dcb_process_zombies(thread_id); + dcb_process_zombies(thread_id, dcb); dcb->stats.n_writerechecks++; } dcb->writecheck = 0; diff --git a/server/core/poll.c b/server/core/poll.c index 2df68fd60..e2b284e27 100644 --- a/server/core/poll.c +++ b/server/core/poll.c @@ -633,12 +633,12 @@ DCB *zombies = NULL; } /*< for */ no_op = FALSE; } - process_zombies: +process_zombies: if (thread_data) { thread_data[thread_id].state = THREAD_ZPROCESSING; } - zombies = dcb_process_zombies(thread_id); + zombies = dcb_process_zombies(thread_id, NULL); if (zombies == NULL) { process_zombies_only = false; diff --git a/server/include/dcb.h b/server/include/dcb.h index 723acec5d..17cc0c998 100644 --- a/server/include/dcb.h +++ b/server/include/dcb.h @@ -289,7 +289,7 @@ DCB *dcb_clone(DCB *); int dcb_read(DCB *, GWBUF **); int dcb_drain_writeq(DCB *); void dcb_close(DCB *); -DCB *dcb_process_zombies(int); /* Process Zombies */ +DCB *dcb_process_zombies(int, DCB*); /* Process Zombies except the one behind the pointer */ void printAllDCBs(); /* Debug to print all DCB in the system */ void printDCB(DCB *); /* Debug print routine */ void dprintAllDCBs(DCB *); /* Debug to print all DCB in the system */ diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index 967f332a1..451b29742 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -1674,7 +1674,6 @@ static int routeQuery( { rses_is_closed = true; } - ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(querybuf)); packet = GWBUF_DATA(querybuf); @@ -1702,7 +1701,6 @@ static int routeQuery( (rses_is_closed ? "Router was closed" : "Router has no backend servers where to " "route to")))); - free(querybuf); } goto retblock; } @@ -2195,7 +2193,17 @@ static void clientReply ( goto lock_failed; } bref = get_bref_from_dcb(router_cli_ses, backend_dcb); - + +#if !defined(FOR_BUG548_FIX_ONLY) + /** This makes the issue becoming visible in poll.c */ + if (bref == NULL) + { + /** Unlock router session */ + rses_end_locked_router_action(router_cli_ses); + goto lock_failed; + } +#endif + CHK_BACKEND_REF(bref); scur = &bref->bref_sescmd_cur; /** From 2402d55de6e48e468339f7d093ee8a3ebf8f7eaa Mon Sep 17 00:00:00 2001 From: Mark Riddoch Date: Fri, 19 Sep 2014 10:50:54 +0100 Subject: [PATCH 3/4] Some general tidyup plus addition of code to block zombie processing if epoll_wait returned multiple descriptors --- server/core/dcb.c | 153 +++++++++++++++++++++++--------------- server/core/housekeeper.c | 1 + server/core/poll.c | 23 ++++-- server/include/dcb.h | 4 +- 4 files changed, 112 insertions(+), 69 deletions(-) diff --git a/server/core/dcb.c b/server/core/dcb.c index a1681caa7..b8f9108de 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -403,13 +403,16 @@ DCB_CALLBACK *cb; * operation of clearing this bit means that no bits are set in * the memdata.bitmask then the DCB is no longer able to be * referenced and it can be finally removed. - * Thread won't clear its bit from bitmask of the DCB it is still using. + * + * The excluded DCB allows a thread to exclude a DCB from zombie processing. + * It is used when a thread calls dcb_process_zombies when there is + * a DCB that the caller knows it will continue processing with. * * @param threadid The thread ID of the caller - * @param dcb_in_use The DCB the thread currently uses, NULL or valid DCB. + * @param excluded The DCB the thread currently uses, NULL or valid DCB. */ DCB * -dcb_process_zombies(int threadid, DCB *dcb_in_use) +dcb_process_zombies(int threadid, DCB *excluded) { DCB *ptr, *lptr; DCB* dcb_list = NULL; @@ -426,78 +429,100 @@ bool succp = false; if (!zombies) return NULL; + /* + * Process the zombie queue and create a list of DCB's that can be + * finally freed. This processing is down under a spinlock that + * will prevent new entries being added to the zombie queue. Therefore + * we do not want to do any expensive operations under this spinlock + * as it will block other threads. The expensive operations will be + * performed on the victim queue within holding the zombie queue + * spinlock. + */ spinlock_acquire(&zombiespin); ptr = zombies; lptr = NULL; while (ptr) { CHK_DCB(ptr); - /** Don't clear the bit from DCB the user currently uses */ - if (dcb_in_use == NULL || ptr != dcb_in_use) - { - bitmask_clear(&ptr->memdata.bitmask, threadid); - } - if (ptr == dcb_in_use) - ss_dassert(!bitmask_isallclear(&ptr->memdata.bitmask)); - - if (bitmask_isallclear(&ptr->memdata.bitmask)) - { - /** - * Remove the DCB from the zombie queue - * and call the final free routine for the - * DCB - * - * ptr is the DCB we are processing - * lptr is the previous DCB on the zombie queue - * or NULL if the DCB is at the head of the queue - * tptr is the DCB after the one we are processing - * on the zombie queue - */ - DCB *tptr = ptr->memdata.next; - if (lptr == NULL) - zombies = tptr; - else - lptr->memdata.next = tptr; - LOGIF(LD, (skygw_log_write_flush( - LOGFILE_DEBUG, - "%lu [dcb_process_zombies] Remove dcb %p fd %d " - "in state %s from zombies list.", - pthread_self(), - ptr, - ptr->fd, - STRDCBSTATE(ptr->state)))); - ss_info_dassert(ptr->state == DCB_STATE_ZOMBIE, - "dcb not in DCB_STATE_ZOMBIE state."); - /*< - * Move dcb to linked list of victim dcbs. - */ - if (dcb_list == NULL) { - dcb_list = ptr; - dcb = dcb_list; - } else { - dcb->memdata.next = ptr; - dcb = dcb->memdata.next; - } - dcb->memdata.next = NULL; - ptr = tptr; - } - else + + /* + * Skip processing of the excluded DCB + */ + if (ptr == excluded) { lptr = ptr; ptr = ptr->memdata.next; } + else + { + + bitmask_clear(&ptr->memdata.bitmask, threadid); + + if (bitmask_isallclear(&ptr->memdata.bitmask)) + { + /** + * Remove the DCB from the zombie queue + * and call the final free routine for the + * DCB + * + * ptr is the DCB we are processing + * lptr is the previous DCB on the zombie queue + * or NULL if the DCB is at the head of the + * queue tptr is the DCB after the one we are + * processing on the zombie queue + */ + DCB *tptr = ptr->memdata.next; + if (lptr == NULL) + zombies = tptr; + else + lptr->memdata.next = tptr; + LOGIF(LD, (skygw_log_write_flush( + LOGFILE_DEBUG, + "%lu [dcb_process_zombies] Remove dcb " + "%p fd %d " "in state %s from the " + "list of zombies.", + pthread_self(), + ptr, + ptr->fd, + STRDCBSTATE(ptr->state)))); + ss_info_dassert(ptr->state == DCB_STATE_ZOMBIE, + "dcb not in DCB_STATE_ZOMBIE state."); + /*< + * Move dcb to linked list of victim dcbs. + */ + if (dcb_list == NULL) { + dcb_list = ptr; + dcb = dcb_list; + } else { + dcb->memdata.next = ptr; + dcb = dcb->memdata.next; + } + dcb->memdata.next = NULL; + ptr = tptr; + } + else + { + lptr = ptr; + ptr = ptr->memdata.next; + } + } } spinlock_release(&zombiespin); + /* + * Process the victim queue. These are DCBs that are not in + * use by any thread. + * The corresponding file descriptor is closed, the DCB marked + * as disconnected and the DCB itself is fianlly freed. + */ dcb = dcb_list; - /** Close, and set DISCONNECTED victims */ while (dcb != NULL) { DCB* dcb_next = NULL; int rc = 0; /*< * Close file descriptor and move to clean-up phase. */ - ss_dassert(dcb_in_use != dcb); + ss_dassert(excluded != dcb); rc = close(dcb->fd); if (rc < 0) { @@ -1119,8 +1144,8 @@ int above_water; /** * Removes dcb from poll set, and adds it to zombies list. As a consequense, * dcb first moves to DCB_STATE_NOPOLLING, and then to DCB_STATE_ZOMBIE state. - * At the end of the function state may not be DCB_STATE_ZOMBIE because once dcb_initlock - * is released parallel threads may change the state. + * At the end of the function state may not be DCB_STATE_ZOMBIE because once + * dcb_initlock is released parallel threads may change the state. * * Parameters: * @param dcb The DCB to close @@ -1943,9 +1968,11 @@ int rval = 0; * to return immediately and and process other events. * * @param dcb The DCB that has data available + * @param thread_id The ID of the calling thread + * @param nozombies If non-zero then do not do zombie processing */ void -dcb_pollin(DCB *dcb, int thread_id) +dcb_pollin(DCB *dcb, int thread_id, int nozombies) { spinlock_acquire(&dcb->pollinlock); @@ -1956,7 +1983,8 @@ dcb_pollin(DCB *dcb, int thread_id) if (dcb->readcheck) { dcb->stats.n_readrechecks++; - dcb_process_zombies(thread_id, dcb); + if (!nozombies) + dcb_process_zombies(thread_id, dcb); } dcb->readcheck = 0; spinlock_release(&dcb->pollinlock); @@ -1987,9 +2015,11 @@ dcb_pollin(DCB *dcb, int thread_id) * to return immediately and and process other events. * * @param dcb The DCB thats available for writes + * @param thread_id The ID of the calling thread + * @param nozombies If non-zero then do not do zombie processing */ void -dcb_pollout(DCB *dcb, int thread_id) +dcb_pollout(DCB *dcb, int thread_id, int nozombies) { spinlock_acquire(&dcb->polloutlock); @@ -1999,7 +2029,8 @@ dcb_pollout(DCB *dcb, int thread_id) do { if (dcb->writecheck) { - dcb_process_zombies(thread_id, dcb); + if (!nozombies) + dcb_process_zombies(thread_id, dcb); dcb->stats.n_writerechecks++; } dcb->writecheck = 0; diff --git a/server/core/housekeeper.c b/server/core/housekeeper.c index 6180f24a5..5aa5c7a29 100644 --- a/server/core/housekeeper.c +++ b/server/core/housekeeper.c @@ -16,6 +16,7 @@ * Copyright SkySQL Ab 2014 */ #include +#include #include #include #include diff --git a/server/core/poll.c b/server/core/poll.c index e2b284e27..b3a0e6b70 100644 --- a/server/core/poll.c +++ b/server/core/poll.c @@ -49,10 +49,19 @@ extern int lm_enabled_logfiles_bitmask; * @endverbatim */ +/** + * Control the use of mutexes for the epoll_wait call. Setting to 1 will + * cause the epoll_wait calls to be moved under a mutex. This may be useful + * for debuggign purposes but should be avoided in general use. + */ +#define MUTEX_EPOLL 0 + static int epoll_fd = -1; /*< The epoll file descriptor */ static int do_shutdown = 0; /*< Flag the shutdown of the poll subsystem */ static GWBITMASK poll_mask; +#if MUTEX_EPOLL static simple_mutex_t epoll_wait_mutex; /*< serializes calls to epoll_wait */ +#endif static int n_waiting = 0; /*< No. of threads in epoll_wait */ /** @@ -154,7 +163,9 @@ int i; thread_data[i].state = THREAD_STOPPED; } } +#if MUTEX_EPOLL simple_mutex_init(&epoll_wait_mutex, "epoll_wait_mutex"); +#endif hktask_add("Load Average", poll_loadav, NULL, POLL_LOAD_FREQ); n_avg_samples = 15 * 60 / POLL_LOAD_FREQ; @@ -359,7 +370,7 @@ DCB *zombies = NULL; thread_id))); no_op = TRUE; } -#if 0 +#if MUTEX_EPOLL simple_mutex_lock(&epoll_wait_mutex, TRUE); #endif if (thread_data) @@ -385,7 +396,7 @@ DCB *zombies = NULL; { atomic_add(&n_waiting, -1); if (process_zombies_only) { -#if 0 +#if MUTEX_EPOLL simple_mutex_unlock(&epoll_wait_mutex); #endif goto process_zombies; @@ -413,7 +424,7 @@ DCB *zombies = NULL; if (n_waiting == 0) atomic_add(&pollStats.n_nothreads, 1); -#if 0 +#if MUTEX_EPOLL simple_mutex_unlock(&epoll_wait_mutex); #endif #endif /* BLOCKINGPOLL */ @@ -442,7 +453,7 @@ DCB *zombies = NULL; for (i = 0; i < nfds; i++) { - DCB *dcb = (DCB *)events[i].data.ptr; + DCB *dcb = (DCB *)events[i].data.ptr; __uint32_t ev = events[i].events; CHK_DCB(dcb); @@ -504,7 +515,7 @@ DCB *zombies = NULL; #else atomic_add(&pollStats.n_write, 1); - dcb_pollout(dcb, thread_id); + dcb_pollout(dcb, thread_id, nfds); #endif } else { LOGIF(LD, (skygw_log_write( @@ -554,7 +565,7 @@ DCB *zombies = NULL; #if MUTEX_BLOCK dcb->func.read(dcb); #else - dcb_pollin(dcb, thread_id); + dcb_pollin(dcb, thread_id, nfds); #endif } #if MUTEX_BLOCK diff --git a/server/include/dcb.h b/server/include/dcb.h index 17cc0c998..3966dfead 100644 --- a/server/include/dcb.h +++ b/server/include/dcb.h @@ -271,8 +271,8 @@ int fail_accept_errno; #define DCB_BELOW_LOW_WATER(x) ((x)->low_water && (x)->writeqlen < (x)->low_water) #define DCB_ABOVE_HIGH_WATER(x) ((x)->high_water && (x)->writeqlen > (x)->high_water) -void dcb_pollin(DCB *, int); -void dcb_pollout(DCB *, int); +void dcb_pollin(DCB *, int, int); +void dcb_pollout(DCB *, int, int); DCB *dcb_get_zombies(void); int gw_write( #if defined(SS_DEBUG) From 2d2fc28b074ef8c10aa6c0347ed4f6bb15632a04 Mon Sep 17 00:00:00 2001 From: Mark Riddoch Date: Fri, 19 Sep 2014 11:40:16 +0100 Subject: [PATCH 4/4] Addition of code to prevent multiple hangup's beign processed on the same DCB --- server/core/poll.c | 20 ++++++++++++++++++-- server/include/dcb.h | 7 +++++-- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/server/core/poll.c b/server/core/poll.c index b3a0e6b70..ff8cfe7b3 100644 --- a/server/core/poll.c +++ b/server/core/poll.c @@ -620,7 +620,15 @@ DCB *zombies = NULL; eno, strerror(eno)))); atomic_add(&pollStats.n_hup, 1); - dcb->func.hangup(dcb); + spinlock_acquire(&dcb->dcb_initlock); + if ((dcb->flags & DCBF_HUNG) == 0) + { + dcb->flags |= DCBF_HUNG; + spinlock_release(&dcb->dcb_initlock); + dcb->func.hangup(dcb); + } + else + spinlock_release(&dcb->dcb_initlock); } if (ev & EPOLLRDHUP) @@ -639,7 +647,15 @@ DCB *zombies = NULL; eno, strerror(eno)))); atomic_add(&pollStats.n_hup, 1); - dcb->func.hangup(dcb); + spinlock_acquire(&dcb->dcb_initlock); + if ((dcb->flags & DCBF_HUNG) == 0) + { + dcb->flags |= DCBF_HUNG; + spinlock_release(&dcb->dcb_initlock); + dcb->func.hangup(dcb); + } + else + spinlock_release(&dcb->dcb_initlock); } } /*< for */ no_op = FALSE; diff --git a/server/include/dcb.h b/server/include/dcb.h index 3966dfead..06687f349 100644 --- a/server/include/dcb.h +++ b/server/include/dcb.h @@ -317,6 +317,9 @@ void dcb_call_foreach (DCB_REASON reason); void dcb_call_foreach ( DCB_REASON reason); -/* DCB flags values */ -#define DCBF_CLONE 0x0001 /* DCB is a clone */ +/** + * DCB flags values + */ +#define DCBF_CLONE 0x0001 /*< DCB is a clone */ +#define DCBF_HUNG 0x0002 /*< Hangup has been dispatched */ #endif /* _DCB_H */