diff --git a/include/maxscale/dcb.h b/include/maxscale/dcb.h
index 535a55422..dee34ef13 100644
--- a/include/maxscale/dcb.h
+++ b/include/maxscale/dcb.h
@@ -23,6 +23,7 @@
 #include
 #include
 #include
+#include <maxscale/poll_core.h>
 #include

 MXS_BEGIN_DECLS
@@ -184,6 +185,7 @@ typedef enum
  */
 typedef struct dcb
 {
+    MXS_POLL_DATA   poll;
     skygw_chk_t     dcb_chk_top;
     bool            dcb_errhandle_called;   /*< this can be called only once */
     bool            dcb_is_zombie;          /**< Whether the DCB is in the zombie list */
@@ -228,7 +230,6 @@ typedef struct dcb
     bool            was_persistent;         /**< Whether this DCB was in the persistent pool */
     struct
     {
-        int id;                 /**< The owning thread's ID */
         struct dcb *next;       /**< Next DCB in owning thread's list */
         struct dcb *tail;       /**< Last DCB in owning thread's list */
     } thread;
diff --git a/include/maxscale/poll_core.h b/include/maxscale/poll_core.h
new file mode 100644
index 000000000..ef5a0c8cc
--- /dev/null
+++ b/include/maxscale/poll_core.h
@@ -0,0 +1,83 @@
+#pragma once
+/*
+ * Copyright (c) 2016 MariaDB Corporation Ab
+ *
+ * Use of this software is governed by the Business Source License included
+ * in the LICENSE.TXT file and at www.mariadb.com/bsl.
+ *
+ * Change Date: 2019-07-01
+ *
+ * On the date above, in accordance with the Business Source License, use
+ * of this software will be governed by version 2 or later of the General
+ * Public License.
+ */
+
+/**
+ * @file poll_core.h  The core polling mechanism.
+ */
+
+#include
+
+typedef struct mxs_poll_data
+{
+    /** Pointer to a function that knows how to handle events for this
+     *  particular 'struct mxs_poll_data' structure.
+     *
+     *  @param data    The `mxs_poll_data` instance that contained this pointer.
+     *  @param wid     The worker thread id.
+     *  @param events  The epoll events.
+     */
+    void (*handler)(struct mxs_poll_data *data, int wid, uint32_t events);
+
+    struct
+    {
+        /**
+         * The id of the worker thread.
+         */
+        int id;
+    } thread;
+} MXS_POLL_DATA;
+
+/**
+ * A file descriptor should be added to the poll set of all workers.
+ */
+#define MXS_WORKER_ALL -1
+
+/**
+ * A file descriptor should be added to the poll set of some worker.
+ */
+#define MXS_WORKER_ANY -2
+
+/**
+ * Add a file descriptor with associated data to the poll set.
+ *
+ * @param wid    `MXS_WORKER_ALL` if the file descriptor should be added to the
+ *               poll set of all workers, `MXS_WORKER_ANY` if the file descriptor
+ *               should be added to some worker, otherwise the id of a worker.
+ * @param fd     The file descriptor to be added.
+ * @param events Mask of epoll event types.
+ * @param data   The structure containing the file descriptor to be added.
+ *
+ *               data->handler  : Handler that knows how to deal with events
+ *                                for this particular type of 'struct mxs_poll_data'.
+ *               data->thread.id: Will be updated by `poll_add_fd_to_worker`.
+ *
+ * @attention If the descriptor should be added to all workers, then the worker
+ *            thread id will be 0.
+ *
+ * @return 0 on success, non-zero on failure.
+ */
+int poll_add_fd_to_worker(int wid, int fd, uint32_t events, MXS_POLL_DATA* data);
+
+/**
+ * Remove a file descriptor from a poll set.
+ *
+ * @param wid `MXS_WORKER_ALL` if the file descriptor should be removed from
+ *            the poll set of all workers; otherwise the id of a worker.
+ * @param fd  The file descriptor to be removed.
+ *
+ * @return 0 on success, non-zero on failure.
+ */
+int poll_remove_fd_from_worker(int wid, int fd);
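The intended usage pattern of this header is worth spelling out. Below is a minimal sketch; MY_POLLER, my_handler, my_poller_start and my_poller_stop are hypothetical names that do not exist in this patch. The header only requires that MXS_POLL_DATA is embedded as the first member, so the pointer stored in epoll's data.ptr can be cast back and forth, and that handler is set before registration:

#include <sys/epoll.h>
#include <maxscale/poll_core.h>

typedef struct my_poller
{
    MXS_POLL_DATA poll; /* must be first: epoll's data.ptr is cast both ways */
    int fd;             /* the descriptor being watched */
} MY_POLLER;

static void my_handler(MXS_POLL_DATA *data, int wid, uint32_t events)
{
    MY_POLLER *p = (MY_POLLER*)data; /* valid because poll is the first member */
    /* ... handle 'events' for p->fd on worker 'wid' ... */
}

static int my_poller_start(MY_POLLER *p)
{
    p->poll.handler = my_handler;
    /* Let the core pick a worker; it records the choice in p->poll.thread.id. */
    return poll_add_fd_to_worker(MXS_WORKER_ANY, p->fd, EPOLLIN | EPOLLET, &p->poll);
}

static int my_poller_stop(MY_POLLER *p)
{
    /* Remove from the same worker the descriptor was assigned to. */
    return poll_remove_fd_from_worker(p->poll.thread.id, p->fd);
}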
diff --git a/server/core/dcb.cc b/server/core/dcb.cc
index 4d27c20f2..6354f9e6b 100644
--- a/server/core/dcb.cc
+++ b/server/core/dcb.cc
@@ -277,7 +277,7 @@ dcb_clone(DCB *orig)
         clonedcb->ssl_state = orig->ssl_state;
         clonedcb->remote = remote;
         clonedcb->user = user;
-        clonedcb->thread.id = orig->thread.id;
+        clonedcb->poll.thread.id = orig->poll.thread.id;
         clonedcb->protocol = orig->protocol;

         clonedcb->func.write = dcb_null_write;
@@ -619,7 +619,7 @@ dcb_connect(SERVER *server, MXS_SESSION *session, const char *protocol)
    {
        MXS_DEBUG("%lu [dcb_connect] Looking for persistent connection DCB "
                  "user %s protocol %s\n", pthread_self(), user, protocol);
-       dcb = server_get_persistent(server, user, protocol, session->client_dcb->thread.id);
+       dcb = server_get_persistent(server, user, protocol, session->client_dcb->poll.thread.id);
        if (dcb)
        {
            /**
@@ -1428,7 +1428,7 @@ dcb_close(DCB *dcb)
        /*<
        * Add closing dcb to the top of the list, setting zombie marker
        */
-       int owner = dcb->thread.id;
+       int owner = dcb->poll.thread.id;
        dcb->dcb_is_zombie = true;
        dcb->memdata.next = zombies[owner];
        zombies[owner] = dcb;
@@ -1462,7 +1462,7 @@ dcb_maybe_add_persistent(DCB *dcb)
        && (dcb->server->status & SERVER_RUNNING)
        && !dcb->dcb_errhandle_called
        && !(dcb->flags & DCBF_HUNG)
-       && dcb_persistent_clean_count(dcb, dcb->thread.id, false) < dcb->server->persistpoolmax
+       && dcb_persistent_clean_count(dcb, dcb->poll.thread.id, false) < dcb->server->persistpoolmax
        && dcb->server->stats.n_persistent < dcb->server->persistpoolmax)
    {
        DCB_CALLBACK *loopcallback;
@@ -1492,8 +1492,8 @@ dcb_maybe_add_persistent(DCB *dcb)
            MXS_FREE(loopcallback);
        }

-       dcb->nextpersistent = dcb->server->persistent[dcb->thread.id];
-       dcb->server->persistent[dcb->thread.id] = dcb;
+       dcb->nextpersistent = dcb->server->persistent[dcb->poll.thread.id];
+       dcb->server->persistent[dcb->poll.thread.id] = dcb;
        atomic_add(&dcb->server->stats.n_persistent, 1);
        atomic_add(&dcb->server->stats.n_current, -1);
        return true;
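These dcb.cc changes are mechanical renames, but the pattern they preserve deserves a note: every shared per-DCB structure (zombie lists, persistent pools, the all_dcbs list) is partitioned by the owning worker's id, so each slot is effectively single-threaded. A standalone sketch of the idiom, with hypothetical names (CONN, pool_push; the real code uses DCB and server->persistent):

#define N_WORKERS 4 /* assumed fixed worker count; MaxScale uses n_threads */

typedef struct conn CONN;
struct conn
{
    int   owner;          /* owning worker id, i.e. what poll.thread.id holds */
    CONN *nextpersistent; /* link within the owner's pool */
};

static CONN *persistent[N_WORKERS]; /* one pool head per worker */

static void pool_push(CONN *c)
{
    /* Only code running on (or on behalf of) the owner touches its slot,
     * so the push itself needs no locking. */
    c->nextpersistent = persistent[c->owner];
    persistent[c->owner] = c;
}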
@@ -3035,20 +3035,20 @@ void dcb_add_to_list(DCB *dcb)
          * as that part is done in the final zombie processing.
          */
-        spinlock_acquire(&all_dcbs_lock[dcb->thread.id]);
+        spinlock_acquire(&all_dcbs_lock[dcb->poll.thread.id]);

-        if (all_dcbs[dcb->thread.id] == NULL)
+        if (all_dcbs[dcb->poll.thread.id] == NULL)
         {
-            all_dcbs[dcb->thread.id] = dcb;
-            all_dcbs[dcb->thread.id]->thread.tail = dcb;
+            all_dcbs[dcb->poll.thread.id] = dcb;
+            all_dcbs[dcb->poll.thread.id]->thread.tail = dcb;
         }
         else
         {
-            all_dcbs[dcb->thread.id]->thread.tail->thread.next = dcb;
-            all_dcbs[dcb->thread.id]->thread.tail = dcb;
+            all_dcbs[dcb->poll.thread.id]->thread.tail->thread.next = dcb;
+            all_dcbs[dcb->poll.thread.id]->thread.tail = dcb;
         }
-        spinlock_release(&all_dcbs_lock[dcb->thread.id]);
+        spinlock_release(&all_dcbs_lock[dcb->poll.thread.id]);
     }
 }
@@ -3059,30 +3059,30 @@ void dcb_add_to_list(DCB *dcb)
  */
 static void dcb_remove_from_list(DCB *dcb)
 {
-    spinlock_acquire(&all_dcbs_lock[dcb->thread.id]);
+    spinlock_acquire(&all_dcbs_lock[dcb->poll.thread.id]);

-    if (dcb == all_dcbs[dcb->thread.id])
+    if (dcb == all_dcbs[dcb->poll.thread.id])
     {
-        DCB *tail = all_dcbs[dcb->thread.id]->thread.tail;
-        all_dcbs[dcb->thread.id] = all_dcbs[dcb->thread.id]->thread.next;
+        DCB *tail = all_dcbs[dcb->poll.thread.id]->thread.tail;
+        all_dcbs[dcb->poll.thread.id] = all_dcbs[dcb->poll.thread.id]->thread.next;

-        if (all_dcbs[dcb->thread.id])
+        if (all_dcbs[dcb->poll.thread.id])
         {
-            all_dcbs[dcb->thread.id]->thread.tail = tail;
+            all_dcbs[dcb->poll.thread.id]->thread.tail = tail;
         }
     }
     else
     {
-        DCB *current = all_dcbs[dcb->thread.id]->thread.next;
-        DCB *prev = all_dcbs[dcb->thread.id];
+        DCB *current = all_dcbs[dcb->poll.thread.id]->thread.next;
+        DCB *prev = all_dcbs[dcb->poll.thread.id];
         while (current)
         {
             if (current == dcb)
             {
-                if (current == all_dcbs[dcb->thread.id]->thread.tail)
+                if (current == all_dcbs[dcb->poll.thread.id]->thread.tail)
                 {
-                    all_dcbs[dcb->thread.id]->thread.tail = prev;
+                    all_dcbs[dcb->poll.thread.id]->thread.tail = prev;
                 }
                 prev->thread.next = current->thread.next;
                 break;
@@ -3097,7 +3097,7 @@ static void dcb_remove_from_list(DCB *dcb)
     dcb->thread.next = NULL;
     dcb->thread.tail = NULL;

-    spinlock_release(&all_dcbs_lock[dcb->thread.id]);
+    spinlock_release(&all_dcbs_lock[dcb->poll.thread.id]);
 }

 /**
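dcb_add_to_list and dcb_remove_from_list rely on a small trick: the list is singly linked, but the head element caches the tail pointer, making appends O(1) without keeping a separate tail variable per slot. The trick in isolation (NODE and list_append are illustrative names, not part of this patch):

#include <stddef.h>

typedef struct node NODE;
struct node
{
    NODE *next;
    NODE *tail; /* meaningful only in the head node */
};

static void list_append(NODE **head, NODE *n)
{
    n->next = NULL;

    if (*head == NULL)
    {
        *head = n;
        (*head)->tail = n;       /* the head caches itself as tail */
    }
    else
    {
        (*head)->tail->next = n; /* O(1) append via the cached tail */
        (*head)->tail = n;
    }
}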
diff --git a/server/core/poll.cc b/server/core/poll.cc
index 92930e688..f859d3eb4 100644
--- a/server/core/poll.cc
+++ b/server/core/poll.cc
@@ -111,7 +111,9 @@ static simple_mutex_t epoll_wait_mutex; /*< serializes calls to epoll_wait */
 #endif
 static int n_waiting = 0;   /*< No. of threads in epoll_wait */

-static int process_pollq(int thread_id, struct epoll_event *event);
+static int process_pollq_dcb(DCB *dcb, int thread_id, uint32_t ev);
+static void dcb_poll_handler(MXS_POLL_DATA *data, int wid, uint32_t events);
+
 static void poll_add_event_to_dcb(DCB* dcb, GWBUF* buf, uint32_t ev);
 static bool poll_dcb_session_check(DCB *dcb, const char *);
 static void poll_check_message(void);
@@ -218,7 +220,7 @@ static void poll_loadav(void *);
 /**
  * Function to analyse error return from epoll_ctl
  */
-static int poll_resolve_error(DCB *, int, bool);
+static int poll_resolve_error(int fd, int error, int op);

 /**
  * Initialise the polling system we are using for the gateway.
@@ -320,21 +322,153 @@ poll_init()
     max_poll_sleep = config_pollsleep();
 }

+static int add_fd_to_worker(int wid, int fd, uint32_t events, MXS_POLL_DATA* data)
+{
+    ss_dassert((wid >= 0) && (wid < n_threads));
+
+    struct epoll_event ev;
+
+    ev.events = events;
+    ev.data.ptr = data;
+
+    data->thread.id = wid;
+
+    int rc = epoll_ctl(epoll_fd[wid], EPOLL_CTL_ADD, fd, &ev);
+
+    if (rc != 0)
+    {
+        rc = poll_resolve_error(fd, errno, EPOLL_CTL_ADD);
+    }
+
+    return rc;
+}
+
+static int add_fd_to_workers(int fd, uint32_t events, MXS_POLL_DATA* data)
+{
+    struct epoll_event ev;
+
+    ev.events = events;
+    ev.data.ptr = data;
+
+    // In this case, the dcb will appear to be on the main thread.
+    data->thread.id = 0;
+
+    int stored_errno = 0;
+    int rc = 0;
+
+    for (int i = 0; i < n_threads; i++)
+    {
+        rc = epoll_ctl(epoll_fd[i], EPOLL_CTL_ADD, fd, &ev);
+
+        if (rc != 0)
+        {
+            stored_errno = errno;
+            /** Remove the fd from the previous epoll instances */
+            for (int j = 0; j < i; j++)
+            {
+                epoll_ctl(epoll_fd[j], EPOLL_CTL_DEL, fd, &ev);
+            }
+            break;
+        }
+    }
+
+    if (rc != 0)
+    {
+        rc = poll_resolve_error(fd, stored_errno, EPOLL_CTL_ADD);
+    }
+
+    return rc;
+}
+
+int poll_add_fd_to_worker(int wid, int fd, uint32_t events, MXS_POLL_DATA* data)
+{
+    int rc;
+
+    if (wid == MXS_WORKER_ANY)
+    {
+        wid = (int)atomic_add(&next_epoll_fd, 1) % n_threads;
+
+        rc = add_fd_to_worker(wid, fd, events, data);
+    }
+    else if (wid == MXS_WORKER_ALL)
+    {
+        rc = add_fd_to_workers(fd, events, data);
+    }
+    else
+    {
+        ss_dassert((wid >= 0) && (wid < n_threads));
+
+        if ((wid >= 0) && (wid < n_threads))
+        {
+            rc = add_fd_to_worker(wid, fd, events, data);
+        }
+        else
+        {
+            errno = EINVAL;
+            rc = -1;
+        }
+    }
+
+    return rc;
+}
+
+static int remove_fd_from_worker(int wid, int fd)
+{
+    ss_dassert((wid >= 0) && (wid < n_threads));
+
+    struct epoll_event ev = {};
+
+    int rc = epoll_ctl(epoll_fd[wid], EPOLL_CTL_DEL, fd, &ev);
+
+    if (rc == -1)
+    {
+        rc = poll_resolve_error(fd, errno, EPOLL_CTL_DEL);
+    }
+
+    return rc;
+}
+
+static int remove_fd_from_workers(int fd)
+{
+    for (int i = 0; i < n_threads; ++i)
+    {
+        // We don't store the error; anything serious and the process will
+        // have been taken down in poll_resolve_error().
+        remove_fd_from_worker(i, fd);
+    }
+
+    return 0;
+}
+
+int poll_remove_fd_from_worker(int wid, int fd)
+{
+    int rc;
+
+    if (wid == MXS_WORKER_ALL)
+    {
+        rc = remove_fd_from_workers(fd);
+    }
+    else
+    {
+        rc = remove_fd_from_worker(wid, fd);
+    }
+
+    return rc;
+}
+
 int poll_add_dcb(DCB *dcb)
 {
     int rc = -1;
     dcb_state_t old_state = dcb->state;
     dcb_state_t new_state;
-    struct epoll_event ev;
+    uint32_t events = 0;

     CHK_DCB(dcb);

 #ifdef EPOLLRDHUP
-    ev.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLHUP | EPOLLET;
+    events = EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLHUP | EPOLLET;
 #else
-    ev.events = EPOLLIN | EPOLLOUT | EPOLLHUP | EPOLLET;
+    events = EPOLLIN | EPOLLOUT | EPOLLHUP | EPOLLET;
 #endif
-    ev.data.ptr = dcb;

     /*<
      * Choose new state according to the role of dcb.
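The MXS_WORKER_ANY branch above balances descriptors across workers with a simple atomic round-robin: MaxScale's atomic_add returns the value the variable held before the addition, so concurrent callers receive distinct tickets. The idiom in isolation, with illustrative names (next_worker and pick_worker stand in for next_epoll_fd and the inline expression above):

static int next_worker = 0;

static int pick_worker(int n_workers)
{
    /* fetch-and-add hands each caller a unique ticket; modulo maps the
     * tickets onto workers cyclically. The unsigned cast keeps the result
     * non-negative even if the counter wraps around. */
    return (int)((unsigned int)atomic_add(&next_worker, 1) % n_workers);
}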
@@ -377,57 +511,28 @@ int poll_add_dcb(DCB *dcb)
      * The only possible failure that will not cause a crash is
      * running out of system resources.
      */
-    int owner = 0;
-
-    if (dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER)
-    {
-        owner = dcb->session->client_dcb->thread.id;
-    }
-    else
-    {
-        owner = (unsigned int)atomic_add(&next_epoll_fd, 1) % n_threads;
-    }
-
-    dcb->thread.id = owner;
-
-    dcb_add_to_list(dcb);
-
-    int error_num = 0;
+    int worker_id = 0;

     if (dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER)
     {
-        /** Listeners are added to all epoll instances */
-        int nthr = config_threadcount();
-
-        for (int i = 0; i < nthr; i++)
-        {
-            if ((rc = epoll_ctl(epoll_fd[i], EPOLL_CTL_ADD, dcb->fd, &ev)))
-            {
-                error_num = errno;
-                /** Remove the listener from the previous epoll instances */
-                for (int j = 0; j < i; j++)
-                {
-                    epoll_ctl(epoll_fd[j], EPOLL_CTL_DEL, dcb->fd, &ev);
-                }
-                break;
-            }
-        }
+        worker_id = MXS_WORKER_ALL;
+    }
+    else if (dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER)
+    {
+        worker_id = dcb->session->client_dcb->poll.thread.id;
     }
     else
     {
-        if ((rc = epoll_ctl(epoll_fd[owner], EPOLL_CTL_ADD, dcb->fd, &ev)))
-        {
-            error_num = errno;
-        }
+        worker_id = (unsigned int)atomic_add(&next_epoll_fd, 1) % n_threads;
     }

-    if (rc)
-    {
-        /* Some errors are actually considered acceptable */
-        rc = poll_resolve_error(dcb, error_num, true);
-    }
+    dcb->poll.handler = dcb_poll_handler;
+
+    rc = poll_add_fd_to_worker(worker_id, dcb->fd, events, (MXS_POLL_DATA*)dcb);
+
+    if (rc == 0)
+    {
+        dcb_add_to_list(dcb);
+
         MXS_DEBUG("%lu [poll_add_dcb] Added dcb %p in state %s to poll set.",
                   pthread_self(),
                   dcb,
@@ -477,46 +582,18 @@ int poll_remove_dcb(DCB *dcb)

     if (dcbfd > 0)
     {
-        int error_num = 0;
+        int worker_id;

         if (dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER)
         {
-            /** Listeners are added to all epoll instances */
-            int nthr = config_threadcount();
-
-            for (int i = 0; i < nthr; i++)
-            {
-                int tmp_rc = epoll_ctl(epoll_fd[i], EPOLL_CTL_DEL, dcb->fd, &ev);
-                if (tmp_rc && rc == 0)
-                {
-                    /** Even if one of the instances failed to remove it, try
-                     * to remove it from all the others */
-                    rc = tmp_rc;
-                    error_num = errno;
-                    ss_dassert(error_num);
-                }
-            }
+            worker_id = MXS_WORKER_ALL;
         }
         else
         {
-            if ((rc = epoll_ctl(epoll_fd[dcb->thread.id], EPOLL_CTL_DEL, dcbfd, &ev)))
-            {
-                error_num = errno;
-            }
-        }
-        /**
-         * The poll_resolve_error function will always
-         * return 0 or crash. So if it returns non-zero result,
-         * things have gone wrong and we crash.
-         */
-        if (rc)
-        {
-            rc = poll_resolve_error(dcb, error_num, false);
-        }
-        if (rc)
-        {
-            raise(SIGABRT);
+            worker_id = dcb->poll.thread.id;
         }
+
+        rc = poll_remove_fd_from_worker(worker_id, dcbfd);
     }
     return rc;
 }
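One portability detail in remove_fd_from_worker above: the event argument is ignored for EPOLL_CTL_DEL, but Linux kernels before 2.6.9 rejected a NULL pointer there, which is why a zeroed dummy struct is passed. The same call in isolation (epfd and fd are placeholders):

#include <sys/epoll.h>

struct epoll_event ev = {}; /* contents ignored for EPOLL_CTL_DEL, but the
                             * pointer must be non-NULL on kernels < 2.6.9 */
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &ev);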
@@ -530,20 +607,20 @@ int poll_remove_dcb(DCB *dcb)
  * here to record the problem.
  *
  * @param errornum The errno set by epoll_ctl
- * @param adding   True for adding to poll list, false for removing
+ * @param op       Either EPOLL_CTL_ADD or EPOLL_CTL_DEL.
  *
  * @return -1 on error or 0 for possibly revised return code
  */
 static int
-poll_resolve_error(DCB *dcb, int errornum, bool adding)
+poll_resolve_error(int fd, int errornum, int op)
 {
-    if (adding)
+    if (op == EPOLL_CTL_ADD)
     {
         if (EEXIST == errornum)
         {
             MXS_ERROR("%lu [poll_resolve_error] Error : epoll_ctl could not add, "
-                      "already exists for DCB %p.",
+                      "already exists for descriptor %d.",
                       pthread_self(),
-                      dcb);
+                      fd);
             // Assume another thread added and no serious harm done
             return 0;
         }
@@ -552,22 +629,24 @@ poll_resolve_error(DCB *dcb, int errornum, bool adding)
             MXS_ERROR("%lu [poll_resolve_error] The limit imposed by "
                       "/proc/sys/fs/epoll/max_user_watches was "
                       "encountered while trying to register (EPOLL_CTL_ADD) a new "
-                      "file descriptor on an epoll instance for dcb %p.",
+                      "file descriptor on an epoll instance for descriptor %d.",
                       pthread_self(),
-                      dcb);
+                      fd);
             /* Failure - assume handled by callers */
             return -1;
         }
     }
     else
     {
+        ss_dassert(op == EPOLL_CTL_DEL);
+
         /* Must be removing */
         if (ENOENT == errornum)
         {
             MXS_ERROR("%lu [poll_resolve_error] Error : epoll_ctl could not remove, "
-                      "not found, for dcb %p.",
+                      "not found, for descriptor %d.",
                       pthread_self(),
-                      dcb);
+                      fd);
             // Assume another thread removed and no serious harm done
             return 0;
         }
@@ -759,32 +838,9 @@ poll_waitevents(void *arg)
             /* Process of the queue of waiting requests */
             for (int i = 0; i < nfds; i++)
             {
-                process_pollq(thread_id, &events[i]);
-            }
+                MXS_POLL_DATA *data = (MXS_POLL_DATA*)events[i].data.ptr;

-            fake_event_t *event = NULL;
-
-            /** It is very likely that the queue is empty so to avoid hitting the
-             * spinlock every time we receive events, we only do a dirty read. Currently,
-             * only the monitors inject fake events from external threads. */
-            if (fake_events[thread_id])
-            {
-                spinlock_acquire(&fake_event_lock[thread_id]);
-                event = fake_events[thread_id];
-                fake_events[thread_id] = NULL;
-                spinlock_release(&fake_event_lock[thread_id]);
-            }
-
-            while (event)
-            {
-                struct epoll_event ev;
-                event->dcb->dcb_fakequeue = event->data;
-                ev.data.ptr = event->dcb;
-                ev.events = event->event;
-                process_pollq(thread_id, &ev);
-                fake_event_t *tmp = event;
-                event = event->next;
-                MXS_FREE(tmp);
+                data->handler(data, thread_id, events[i].events);
             }

             dcb_process_idle_sessions(thread_id);
@@ -863,11 +919,9 @@ poll_set_maxwait(unsigned int maxwait)
  * @return 0 if no DCB's have been processed
  */
 static int
-process_pollq(int thread_id, struct epoll_event *event)
+process_pollq_dcb(DCB *dcb, int thread_id, uint32_t ev)
 {
-    uint32_t ev = event->events;
-    DCB *dcb = (DCB*)event->data.ptr;
-    ss_dassert(dcb->thread.id == thread_id || dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER);
+    ss_dassert(dcb->poll.thread.id == thread_id || dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER);

     /** Calculate event queue statistics */
     uint64_t started = hkheartbeat;
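Because the DCB events registered in poll_add_dcb include EPOLLET, every handler dispatched from the loop above sees edge-triggered notifications: an event fires only on a state change, so the handler must drain the descriptor or it will stall until new data arrives. The standard read discipline this imposes, sketched with a placeholder descriptor fd:

#include <errno.h>
#include <unistd.h>

char buf[4096];
ssize_t n;

do
{
    n = read(fd, buf, sizeof(buf));
    /* ... process n bytes ... */
}
while (n > 0);

if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK)
{
    /* genuine error; EAGAIN merely means the descriptor is drained */
}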
Currently, + * only the monitors inject fake events from external threads. */ + if (fake_events[wid]) + { + spinlock_acquire(&fake_event_lock[wid]); + event = fake_events[wid]; + fake_events[wid] = NULL; + spinlock_release(&fake_event_lock[wid]); + } + + while (event) + { + event->dcb->dcb_fakequeue = event->data; + process_pollq_dcb(event->dcb, wid, event->event); + fake_event_t *tmp = event; + event = event->next; + MXS_FREE(tmp); + } +} + /** * * Check that the DCB has a session link before processing. @@ -1398,7 +1484,7 @@ static void poll_add_event_to_dcb(DCB* dcb, event->next = NULL; event->tail = event; - int thr = dcb->thread.id; + int thr = dcb->poll.thread.id; /** It is possible that a housekeeper or a monitor thread inserts a fake * event into the thread's event queue which is why the operation needs diff --git a/server/core/session.cc b/server/core/session.cc index 07d8ba7e3..879aa66dd 100644 --- a/server/core/session.cc +++ b/server/core/session.cc @@ -277,7 +277,7 @@ session_link_dcb(MXS_SESSION *session, DCB *dcb) atomic_add(&session->refcount, 1); dcb->session = session; /** Move this DCB under the same thread */ - dcb->thread.id = session->client_dcb->thread.id; + dcb->poll.thread.id = session->client_dcb->poll.thread.id; return true; }