From 30927455efee4aafbd004016eb6064fc55941c57 Mon Sep 17 00:00:00 2001 From: Markus Makela Date: Sun, 20 Nov 2016 10:17:04 +0200 Subject: [PATCH] Add each DCB to the owning thread's list Each DCB needs to be added to the owning thread's list so that they can be iterated through. As sessions always have a client DCB, the sessions don't need to be added to a similar per thread list. This change fixes a problem with dcb_hangup_foreach that the removal of the list manager introduced. Now the hangup events are properly injected for the DCBs that connect to the server in question. --- include/maxscale/dcb.h | 11 ++- server/core/dcb.c | 177 +++++++++++++++++++++++++++++------------ server/core/poll.c | 47 ++--------- 3 files changed, 144 insertions(+), 91 deletions(-) diff --git a/include/maxscale/dcb.h b/include/maxscale/dcb.h index f687ae0b4..4b39d4b9c 100644 --- a/include/maxscale/dcb.h +++ b/include/maxscale/dcb.h @@ -278,6 +278,8 @@ typedef struct dcb bool ssl_write_want_write; /*< Flag */ int dcb_port; /**< port of target server */ bool was_persistent; /**< Whether this DCB was in the persistent pool */ + struct dcb *thr_next; /**< Next DCB in owning thread's list */ + struct dcb *thr_tail; /**< Last DCB in owning thread's list */ skygw_chk_t dcb_chk_tail; } DCB; @@ -288,7 +290,7 @@ typedef struct dcb .cb_lock = SPINLOCK_INIT, .pollinlock = SPINLOCK_INIT, \ .fd = DCBFD_CLOSED, .stats = DCBSTATS_INIT, .ssl_state = SSL_HANDSHAKE_UNKNOWN, \ .state = DCB_STATE_ALLOC, .polloutlock = SPINLOCK_INIT, .dcb_chk_tail = CHK_NUM_DCB, \ - .authenticator_data = NULL} + .authenticator_data = NULL, .thr_next = NULL, .thr_tail = NULL} /** * The DCB usage filer used for returning DCB's in use for a certain reason @@ -344,6 +346,13 @@ void dcb_close(DCB *); */ void dcb_process_zombies(int threadid); +/** + * Add a DCB to the owner's list + * + * @param dcb DCB to add + */ +void dcb_add_to_list(DCB *dcb); + void printAllDCBs(); /* Debug to print all DCB in the system */ void printDCB(DCB *); /* Debug print routine */ void dprintDCBList(DCB *); /* Debug print DCB list statistics */ diff --git a/server/core/dcb.c b/server/core/dcb.c index 87b9bca5a..471e9df08 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -100,6 +100,8 @@ static LIST_CONFIG DCBlist = /* A DCB with null values, used for initialization */ static DCB dcb_initialized = DCB_INIT; +static DCB **all_dcbs; +static SPINLOCK *all_dcbs_lock; static DCB **zombies; static int *nzombies; static int maxzombies = 0; @@ -110,11 +112,18 @@ void dcb_global_init() int nthreads = config_threadcount(); if ((zombies = MXS_CALLOC(nthreads, sizeof(DCB*))) == NULL || + (all_dcbs = MXS_CALLOC(nthreads, sizeof(DCB*))) == NULL || + (all_dcbs_lock = MXS_CALLOC(nthreads, sizeof(SPINLOCK))) == NULL || (nzombies = MXS_CALLOC(nthreads, sizeof(int))) == NULL) { MXS_OOM(); raise(SIGABRT); } + + for (int i = 0; i < nthreads; i++) + { + spinlock_init(&all_dcbs_lock[i]); + } } static void dcb_initialize(void *dcb); @@ -145,6 +154,7 @@ static int dcb_set_socket_option(int sockfd, int level, int optname, void *optva static void dcb_add_to_all_list(DCB *dcb); static DCB *dcb_find_free(); static GWBUF *dcb_grab_writeq(DCB *dcb, bool first_time); +static void dcb_remove_from_list(DCB *dcb); size_t dcb_get_session_id( DCB *dcb) @@ -597,6 +607,7 @@ dcb_process_victim_queue(int threadid) * Whether it is actually freed depends on the type of the DCB and how * many DCBs are linked to it via the SESSION object. */ dcb->state = DCB_STATE_DISCONNECTED; + dcb_remove_from_list(dcb); dcb_final_free(dcb); } /** Reset threads session data */ @@ -1940,7 +1951,6 @@ dprintOneDCB(DCB *pdcb, DCB *dcb) void dprintDCBList(DCB *pdcb) { - dprintListStats(pdcb, &DCBlist, "All DCBs"); } /** @@ -1951,19 +1961,19 @@ dprintDCBList(DCB *pdcb) void dprintAllDCBs(DCB *pdcb) { - list_entry_t *current; - current = list_start_iteration(&DCBlist); -#if SPINLOCK_PROFILE - dcb_printf(pdcb, "DCB List Spinlock Statistics:\n"); - spinlock_stats(&DCBlist->list_lock, spin_reporter, pdcb); - dcb_printf(pdcb, "Zombie Queue Lock Statistics:\n"); - spinlock_stats(&zombiespin, spin_reporter, pdcb); -#endif - while (current) + int nthr = config_threadcount(); + + for (int i = 0; i < nthr; i++) { - dprintOneDCB(pdcb, (DCB *)current); - current = list_iterate(&DCBlist, current); + spinlock_acquire(&all_dcbs_lock[i]); + + for (DCB *dcb = all_dcbs[i]; dcb; dcb = dcb->memdata.next) + { + dprintOneDCB(pdcb, dcb); + } + + spinlock_release(&all_dcbs_lock[i]); } } @@ -1975,24 +1985,28 @@ dprintAllDCBs(DCB *pdcb) void dListDCBs(DCB *pdcb) { - DCB *dcb; - list_entry_t *current; - - current = list_start_iteration(&DCBlist); dcb_printf(pdcb, "Descriptor Control Blocks\n"); dcb_printf(pdcb, "------------------+----------------------------+--------------------+----------\n"); dcb_printf(pdcb, " %-16s | %-26s | %-18s | %s\n", "DCB", "State", "Service", "Remote"); dcb_printf(pdcb, "------------------+----------------------------+--------------------+----------\n"); - while (current) + + int nthr = config_threadcount(); + + for (int i = 0; i < nthr; i++) { - dcb = (DCB *)current; - dcb_printf(pdcb, " %-16p | %-26s | %-18s | %s\n", - dcb, gw_dcb_state2string(dcb->state), - ((dcb->session && dcb->session->service) ? dcb->session->service->name : ""), - (dcb->remote ? dcb->remote : "")); - current = list_iterate(&DCBlist, current); + spinlock_acquire(&all_dcbs_lock[i]); + for (DCB *dcb = all_dcbs[i]; dcb; dcb = dcb->memdata.next) + { + dcb_printf(pdcb, " %-16p | %-26s | %-18s | %s\n", + dcb, gw_dcb_state2string(dcb->state), + ((dcb->session && dcb->session->service) ? dcb->session->service->name : ""), + (dcb->remote ? dcb->remote : "")); + } + + spinlock_release(&all_dcbs_lock[i]); } + dcb_printf(pdcb, "------------------+----------------------------+--------------------+----------\n\n"); } @@ -2004,33 +2018,35 @@ dListDCBs(DCB *pdcb) void dListClients(DCB *pdcb) { - DCB *dcb; - list_entry_t *current; - - current = list_start_iteration(&DCBlist); - dcb_printf(pdcb, "Client Connections\n"); dcb_printf(pdcb, "-----------------+------------------+----------------------+------------\n"); dcb_printf(pdcb, " %-15s | %-16s | %-20s | %s\n", "Client", "DCB", "Service", "Session"); dcb_printf(pdcb, "-----------------+------------------+----------------------+------------\n"); - while (current) + + int nthr = config_threadcount(); + + for (int i = 0; i < nthr; i++) { - dcb = (DCB *)current; - if (dcb->dcb_role == DCB_ROLE_CLIENT_HANDLER) + spinlock_acquire(&all_dcbs_lock[i]); + for (DCB *dcb = all_dcbs[i]; dcb; dcb = dcb->memdata.next) { - dcb_printf(pdcb, " %-15s | %16p | %-20s | %10p\n", - (dcb->remote ? dcb->remote : ""), - dcb, (dcb->session->service ? - dcb->session->service->name : ""), - dcb->session); + if (dcb->dcb_role == DCB_ROLE_CLIENT_HANDLER) + { + dcb_printf(pdcb, " %-15s | %16p | %-20s | %10p\n", + (dcb->remote ? dcb->remote : ""), + dcb, (dcb->session->service ? + dcb->session->service->name : ""), + dcb->session); + } } - current = list_iterate(&DCBlist, current); + + spinlock_release(&all_dcbs_lock[i]); } + dcb_printf(pdcb, "-----------------+------------------+----------------------+------------\n\n"); } - /** * Diagnostic to print a DCB to another DCB * @@ -2502,7 +2518,7 @@ dcb_call_callback(DCB *dcb, DCB_REASON reason) int dcb_isvalid(DCB *dcb) { - return (int)list_is_entry_in_use(&DCBlist, (list_entry_t *)dcb); + return !dcb->dcb_is_zombie; } /** @@ -2513,26 +2529,28 @@ dcb_isvalid(DCB *dcb) void dcb_hangup_foreach(struct server* server) { - DCB *dcb; - list_entry_t *current; + int nthr = config_threadcount(); - current = list_start_iteration(&DCBlist); - while (current) + for (int i = 0; i < nthr; i++) { - dcb = (DCB *)current; - spinlock_acquire(&dcb->dcb_initlock); - if (dcb->state == DCB_STATE_POLLING && dcb->server && - dcb->server == server) + spinlock_acquire(&all_dcbs_lock[i]); + + for (DCB *dcb = all_dcbs[i]; dcb; dcb = dcb->memdata.next) { - poll_fake_hangup_event(dcb); + spinlock_acquire(&dcb->dcb_initlock); + if (dcb->state == DCB_STATE_POLLING && dcb->server && + dcb->server == server) + { + poll_fake_hangup_event(dcb); + } + spinlock_release(&dcb->dcb_initlock); } - spinlock_release(&dcb->dcb_initlock); - current = list_iterate(&DCBlist, current); + + spinlock_release(&all_dcbs_lock[i]); } } - /** * Null protocol write routine used for cloned dcb's. It merely consumes * buffers written on the cloned DCB and sets the DCB_REPLIED flag. @@ -3404,3 +3422,60 @@ void dcb_append_readqueue(DCB *dcb, GWBUF *buffer) { dcb->dcb_readqueue = gwbuf_append(dcb->dcb_readqueue, buffer); } + +void dcb_add_to_list(DCB *dcb) +{ + spinlock_acquire(&all_dcbs_lock[dcb->owner]); + + if (all_dcbs[dcb->owner] == NULL) + { + all_dcbs[dcb->owner] = dcb; + all_dcbs[dcb->owner]->thr_tail = dcb; + } + else + { + all_dcbs[dcb->owner]->thr_tail->thr_next = dcb; + all_dcbs[dcb->owner]->thr_tail = dcb; + } + + spinlock_release(&all_dcbs_lock[dcb->owner]); +} + +/** + * Remove a DCB from the owner's list + * + * @param dcb DCB to remove + */ +static void dcb_remove_from_list(DCB *dcb) +{ + spinlock_acquire(&all_dcbs_lock[dcb->owner]); + + if (dcb == all_dcbs[dcb->owner]) + { + DCB *tail = all_dcbs[dcb->owner]->thr_tail; + all_dcbs[dcb->owner] = all_dcbs[dcb->owner]->thr_next; + all_dcbs[dcb->owner]->thr_tail = tail; + } + else + { + DCB *current = all_dcbs[dcb->owner]->thr_next; + DCB *prev = all_dcbs[dcb->owner]; + + while (current) + { + if (current == dcb) + { + if (current == all_dcbs[dcb->owner]->thr_tail) + { + all_dcbs[dcb->owner]->thr_tail = prev; + } + prev->thr_next = current->thr_next; + break; + } + prev = current; + current = current->thr_next; + } + } + + spinlock_release(&all_dcbs_lock[dcb->owner]); +} diff --git a/server/core/poll.c b/server/core/poll.c index e2cc58d0c..0955ba20f 100644 --- a/server/core/poll.c +++ b/server/core/poll.c @@ -385,6 +385,8 @@ poll_add_dcb(DCB *dcb) dcb->owner = owner; spinlock_release(&dcb->dcb_initlock); + dcb_add_to_list(dcb); + rc = epoll_ctl(epoll_fd[owner], EPOLL_CTL_ADD, dcb->fd, &ev); if (rc) { @@ -719,13 +721,7 @@ poll_waitevents(void *arg) */ } - /* - * Process of the queue of waiting requests - * This is done without checking the evq_pending count as a - * precautionary measure to avoid issues if the house keeping - * of the count goes wrong. - */ - + /* Process of the queue of waiting requests */ for (int i = 0; i < nfds; i++) { process_pollq(thread_id, &events[i]); @@ -744,7 +740,6 @@ poll_waitevents(void *arg) spinlock_release(&fake_event_lock[thread_id]); } - /** Process fake events */ while (event) { struct epoll_event ev; @@ -752,7 +747,9 @@ poll_waitevents(void *arg) ev.data.ptr = event->dcb; ev.events = event->event; process_pollq(thread_id, &ev); + fake_event_t *tmp = event; event = event->next; + MXS_FREE(tmp); } if (check_timeouts && hkheartbeat >= next_timeout_check) @@ -764,7 +761,10 @@ poll_waitevents(void *arg) { thread_data[thread_id].state = THREAD_ZPROCESSING; } + + /** Process closed DCBs */ dcb_process_zombies(thread_id); + if (thread_data) { thread_data[thread_id].state = THREAD_IDLE; @@ -1195,10 +1195,6 @@ dprintPollStats(DCB *dcb) dcb_printf(dcb, "\t>= %d\t\t\t%" PRId32 "\n", MAXNFDS, pollStats.n_fds[MAXNFDS - 1]); -#if SPINLOCK_PROFILE - dcb_printf(dcb, "Event queue lock statistics:\n"); - spinlock_stats(&pollqlock, spin_reporter, dcb); -#endif } /** @@ -1541,33 +1537,6 @@ poll_fake_hangup_event(DCB *dcb) void dShowEventQ(DCB *pdcb) { - DCB *dcb; - char *tmp1, *tmp2; - - spinlock_acquire(&pollqlock); - if (eventq == NULL) - { - /* Nothing to process */ - spinlock_release(&pollqlock); - return; - } - dcb = eventq; - dcb_printf(pdcb, "\nEvent Queue.\n"); - dcb_printf(pdcb, "%-16s | %-10s | %-18s | %s\n", "DCB", "Status", "Processing Events", - "Pending Events"); - dcb_printf(pdcb, "-----------------+------------+--------------------+-------------------\n"); - do - { - dcb_printf(pdcb, "%-16p | %-10s | %-18s | %-18s\n", dcb, - dcb->evq.processing ? "Processing" : "Pending", - (tmp1 = event_to_string(dcb->evq.processing_events)), - (tmp2 = event_to_string(dcb->evq.pending_events))); - MXS_FREE(tmp1); - MXS_FREE(tmp2); - dcb = dcb->evq.next; - } - while (dcb != eventq); - spinlock_release(&pollqlock); }