WIP: Allow the adding of any fds to the poll set

This is just a first step in a trial that will allow the addition
of any file descriptor to the general poll mechanism and hence
allow any i/o to be handled by the worker threads.

There is a structure

  typedef struct mxs_poll_data
  {
      void (*handler)(struct mxs_poll_data *data, int wid, uint32_t events);
      struct
      {
          int id;
      } thread;
  } MXS_POLL_DATA;

that any other structure (e.g. a DCB) encapsulating a file descriptor must
have as its first member (a C++ struct could basically derive from it).

That structure contains two members: 'handler' and 'thread.id'. 'handler' is a
pointer to a function taking a pointer to a struct mxs_poll_data, a worker thread
id and an epoll event mask as arguments.

So, DCB is modified to have MXS_POLL_DATA as its first member and 'handler'
is initialized with a function that *knows* the passed MXS_POLL_DATA can
be downcast to a DCB.

process_pollq no longer exists, but is now called process_pollq_dcb. The
general stuff related to statistics etc. will be moved to poll_waitevents
itself after which the whole function is moved to dcb.c. At that point,
the handler pointer will be set in dcb_alloc().

Effectively poll.[h|c] will provide a generic mechanism for listening on
whatever descriptors and the dcb stuff will be part of dcb.[h|c].
This commit is contained in:
Johan Wikman
2017-02-01 18:44:53 +02:00
parent 5704ae5ffd
commit 63141bb191
5 changed files with 315 additions and 145 deletions

View File

@ -23,6 +23,7 @@
#include <maxscale/authenticator.h>
#include <maxscale/ssl.h>
#include <maxscale/modinfo.h>
#include <maxscale/poll_core.h>
#include <netinet/in.h>
MXS_BEGIN_DECLS
@ -184,6 +185,7 @@ typedef enum
*/
typedef struct dcb
{
MXS_POLL_DATA poll;
skygw_chk_t dcb_chk_top;
bool dcb_errhandle_called; /*< this can be called only once */
bool dcb_is_zombie; /**< Whether the DCB is in the zombie list */
@ -228,7 +230,6 @@ typedef struct dcb
bool was_persistent; /**< Whether this DCB was in the persistent pool */
struct
{
int id; /**< The owning thread's ID */
struct dcb *next; /**< Next DCB in owning thread's list */
struct dcb *tail; /**< Last DCB in owning thread's list */
} thread;

View File

@ -0,0 +1,83 @@
#pragma once
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
/**
* @file poll_core.h The generic poll mechanism, allowing arbitrary file
* descriptors to be added to the poll sets of the worker threads.
*/
#include <maxscale/cdefs.h>
typedef struct mxs_poll_data
{
    /** Pointer to the function that knows how to handle events for this
     * particular 'struct mxs_poll_data' structure.
     *
     * Any structure wrapping a file descriptor (e.g. a DCB) must have an
     * MXS_POLL_DATA as its *first* member, so that the pointer stored as
     * epoll user data can be safely downcast to the concrete type by the
     * handler.
     *
     * @param data   The `mxs_poll_data` instance that contained this pointer.
     * @param wid    The worker thread id.
     * @param events The epoll events.
     */
    void (*handler)(struct mxs_poll_data *data, int wid, uint32_t events);
    struct
    {
        /**
         * The id of the owning worker thread; updated by
         * `poll_add_fd_to_worker`. Set to 0 when the descriptor is added
         * to the poll sets of all workers.
         */
        int id;
    } thread;
} MXS_POLL_DATA;
/**
* A file descriptor should be added to the poll set of all workers.
*/
#define MXS_WORKER_ALL -1
/**
* A file descriptor should be added to the poll set of some worker.
*/
#define MXS_WORKER_ANY -2
/**
* Add a file descriptor with associated data to the poll set.
*
* @param wid `MXS_WORKER_ALL` if the file descriptor should be added to the
* poll set of all workers, `MXS_WORKER_ANY` if the file descriptor
* should be added to some worker, otherwise the id of a worker.
* @param fd The file descriptor to be added.
* @param events Mask of epoll event types.
* @param data The structure containing the file descriptor to be
* added.
*
* data->handler : Handler that knows how to deal with events
* for this particular type of 'struct mxs_poll_data'.
* data->thread.id: Will be updated by `poll_add_fd_to_worker`.
*
* @attention If the descriptor should be added to all workers, then the worker
* thread id will be 0.
*
* @return 0 on success, non-zero on failure.
*/
int poll_add_fd_to_worker(int wid, int fd, uint32_t events, MXS_POLL_DATA* data);
/**
* Remove a file descriptor from a poll set.
*
* @param wid `MXS_WORKER_ALL` if the file descriptor should be removed from
* the poll set of all workers; otherwise the id of a worker.
* @param fd The file descriptor to be removed.
*
* @return 0 on success, non-zero on failure.
*/
int poll_remove_fd_from_worker(int wid, int fd);

View File

@ -277,7 +277,7 @@ dcb_clone(DCB *orig)
clonedcb->ssl_state = orig->ssl_state;
clonedcb->remote = remote;
clonedcb->user = user;
clonedcb->thread.id = orig->thread.id;
clonedcb->poll.thread.id = orig->poll.thread.id;
clonedcb->protocol = orig->protocol;
clonedcb->func.write = dcb_null_write;
@ -619,7 +619,7 @@ dcb_connect(SERVER *server, MXS_SESSION *session, const char *protocol)
{
MXS_DEBUG("%lu [dcb_connect] Looking for persistent connection DCB "
"user %s protocol %s\n", pthread_self(), user, protocol);
dcb = server_get_persistent(server, user, protocol, session->client_dcb->thread.id);
dcb = server_get_persistent(server, user, protocol, session->client_dcb->poll.thread.id);
if (dcb)
{
/**
@ -1428,7 +1428,7 @@ dcb_close(DCB *dcb)
/*<
* Add closing dcb to the top of the list, setting zombie marker
*/
int owner = dcb->thread.id;
int owner = dcb->poll.thread.id;
dcb->dcb_is_zombie = true;
dcb->memdata.next = zombies[owner];
zombies[owner] = dcb;
@ -1462,7 +1462,7 @@ dcb_maybe_add_persistent(DCB *dcb)
&& (dcb->server->status & SERVER_RUNNING)
&& !dcb->dcb_errhandle_called
&& !(dcb->flags & DCBF_HUNG)
&& dcb_persistent_clean_count(dcb, dcb->thread.id, false) < dcb->server->persistpoolmax
&& dcb_persistent_clean_count(dcb, dcb->poll.thread.id, false) < dcb->server->persistpoolmax
&& dcb->server->stats.n_persistent < dcb->server->persistpoolmax)
{
DCB_CALLBACK *loopcallback;
@ -1492,8 +1492,8 @@ dcb_maybe_add_persistent(DCB *dcb)
MXS_FREE(loopcallback);
}
dcb->nextpersistent = dcb->server->persistent[dcb->thread.id];
dcb->server->persistent[dcb->thread.id] = dcb;
dcb->nextpersistent = dcb->server->persistent[dcb->poll.thread.id];
dcb->server->persistent[dcb->poll.thread.id] = dcb;
atomic_add(&dcb->server->stats.n_persistent, 1);
atomic_add(&dcb->server->stats.n_current, -1);
return true;
@ -3035,20 +3035,20 @@ void dcb_add_to_list(DCB *dcb)
* as that part is done in the final zombie processing.
*/
spinlock_acquire(&all_dcbs_lock[dcb->thread.id]);
spinlock_acquire(&all_dcbs_lock[dcb->poll.thread.id]);
if (all_dcbs[dcb->thread.id] == NULL)
if (all_dcbs[dcb->poll.thread.id] == NULL)
{
all_dcbs[dcb->thread.id] = dcb;
all_dcbs[dcb->thread.id]->thread.tail = dcb;
all_dcbs[dcb->poll.thread.id] = dcb;
all_dcbs[dcb->poll.thread.id]->thread.tail = dcb;
}
else
{
all_dcbs[dcb->thread.id]->thread.tail->thread.next = dcb;
all_dcbs[dcb->thread.id]->thread.tail = dcb;
all_dcbs[dcb->poll.thread.id]->thread.tail->thread.next = dcb;
all_dcbs[dcb->poll.thread.id]->thread.tail = dcb;
}
spinlock_release(&all_dcbs_lock[dcb->thread.id]);
spinlock_release(&all_dcbs_lock[dcb->poll.thread.id]);
}
}
@ -3059,30 +3059,30 @@ void dcb_add_to_list(DCB *dcb)
*/
static void dcb_remove_from_list(DCB *dcb)
{
spinlock_acquire(&all_dcbs_lock[dcb->thread.id]);
spinlock_acquire(&all_dcbs_lock[dcb->poll.thread.id]);
if (dcb == all_dcbs[dcb->thread.id])
if (dcb == all_dcbs[dcb->poll.thread.id])
{
DCB *tail = all_dcbs[dcb->thread.id]->thread.tail;
all_dcbs[dcb->thread.id] = all_dcbs[dcb->thread.id]->thread.next;
DCB *tail = all_dcbs[dcb->poll.thread.id]->thread.tail;
all_dcbs[dcb->poll.thread.id] = all_dcbs[dcb->poll.thread.id]->thread.next;
if (all_dcbs[dcb->thread.id])
if (all_dcbs[dcb->poll.thread.id])
{
all_dcbs[dcb->thread.id]->thread.tail = tail;
all_dcbs[dcb->poll.thread.id]->thread.tail = tail;
}
}
else
{
DCB *current = all_dcbs[dcb->thread.id]->thread.next;
DCB *prev = all_dcbs[dcb->thread.id];
DCB *current = all_dcbs[dcb->poll.thread.id]->thread.next;
DCB *prev = all_dcbs[dcb->poll.thread.id];
while (current)
{
if (current == dcb)
{
if (current == all_dcbs[dcb->thread.id]->thread.tail)
if (current == all_dcbs[dcb->poll.thread.id]->thread.tail)
{
all_dcbs[dcb->thread.id]->thread.tail = prev;
all_dcbs[dcb->poll.thread.id]->thread.tail = prev;
}
prev->thread.next = current->thread.next;
break;
@ -3097,7 +3097,7 @@ static void dcb_remove_from_list(DCB *dcb)
dcb->thread.next = NULL;
dcb->thread.tail = NULL;
spinlock_release(&all_dcbs_lock[dcb->thread.id]);
spinlock_release(&all_dcbs_lock[dcb->poll.thread.id]);
}
/**

View File

@ -111,7 +111,9 @@ static simple_mutex_t epoll_wait_mutex; /*< serializes calls to epoll_wait */
#endif
static int n_waiting = 0; /*< No. of threads in epoll_wait */
static int process_pollq(int thread_id, struct epoll_event *event);
static int process_pollq_dcb(DCB *dcb, int thread_id, uint32_t ev);
static void dcb_poll_handler(MXS_POLL_DATA *data, int wid, uint32_t events);
static void poll_add_event_to_dcb(DCB* dcb, GWBUF* buf, uint32_t ev);
static bool poll_dcb_session_check(DCB *dcb, const char *);
static void poll_check_message(void);
@ -218,7 +220,7 @@ static void poll_loadav(void *);
/**
* Function to analyse error return from epoll_ctl
*/
static int poll_resolve_error(DCB *, int, bool);
static int poll_resolve_error(int fd, int error, int op);
/**
* Initialise the polling system we are using for the gateway.
@ -320,21 +322,153 @@ poll_init()
max_poll_sleep = config_pollsleep();
}
/**
 * Add a file descriptor to the epoll instance of one specific worker.
 *
 * @param wid    The id of the worker; must be in the range [0, n_threads).
 * @param fd     The file descriptor to add.
 * @param events Mask of epoll event types.
 * @param data   Poll data associated with the descriptor; data->thread.id
 *               is set to @c wid before the descriptor is registered.
 *
 * @return 0 on success, non-zero on failure.
 */
static int add_fd_to_worker(int wid, int fd, uint32_t events, MXS_POLL_DATA* data)
{
    // A valid worker index is in [0, n_threads); the previous '<=' accepted
    // the out-of-range index n_threads, which would overrun epoll_fd[].
    ss_dassert((wid >= 0) && (wid < n_threads));

    struct epoll_event ev;
    ev.events = events;
    ev.data.ptr = data;

    // Record the owning thread before registering, so the handler never
    // observes a stale id once events start arriving.
    data->thread.id = wid;

    int rc = epoll_ctl(epoll_fd[wid], EPOLL_CTL_ADD, fd, &ev);

    if (rc != 0)
    {
        // poll_resolve_error() may downgrade benign errors (e.g. EEXIST)
        // to success.
        rc = poll_resolve_error(fd, errno, EPOLL_CTL_ADD);
    }

    return rc;
}
/**
 * Add a file descriptor to the epoll instances of all workers.
 *
 * @param fd     The file descriptor to add.
 * @param events Mask of epoll event types.
 * @param data   Poll data associated with the descriptor.
 *
 * @return 0 on success, non-zero on failure.
 */
static int add_fd_to_workers(int fd, uint32_t events, MXS_POLL_DATA* data)
{
    struct epoll_event ev;
    ev.events = events;
    ev.data.ptr = data;

    data->thread.id = 0; // In this case, the dcb will appear to be on the main thread.

    int rc = 0;

    for (int i = 0; i < n_threads; i++)
    {
        if (epoll_ctl(epoll_fd[i], EPOLL_CTL_ADD, fd, &ev) != 0)
        {
            int stored_errno = errno;

            /** Roll back: remove the descriptor from every epoll instance
             * it was already added to, so no partial registration remains. */
            for (int j = i - 1; j >= 0; j--)
            {
                epoll_ctl(epoll_fd[j], EPOLL_CTL_DEL, fd, &ev);
            }

            rc = poll_resolve_error(fd, stored_errno, EPOLL_CTL_ADD);
            break;
        }
    }

    return rc;
}
/**
 * Add a file descriptor with associated data to the poll set.
 *
 * @param wid    MXS_WORKER_ALL, MXS_WORKER_ANY or the id of a specific worker.
 * @param fd     The file descriptor to add.
 * @param events Mask of epoll event types.
 * @param data   Poll data associated with the descriptor.
 *
 * @return 0 on success, non-zero on failure.
 */
int poll_add_fd_to_worker(int wid, int fd, uint32_t events, MXS_POLL_DATA* data)
{
    int rc = -1;

    switch (wid)
    {
    case MXS_WORKER_ANY:
        // Round-robin selection spreads the descriptors over the workers.
        wid = (int)atomic_add(&next_epoll_fd, 1) % n_threads;
        rc = add_fd_to_worker(wid, fd, events, data);
        break;

    case MXS_WORKER_ALL:
        rc = add_fd_to_workers(fd, events, data);
        break;

    default:
        ss_dassert((wid >= 0) && (wid < n_threads));

        if ((wid >= 0) && (wid < n_threads))
        {
            rc = add_fd_to_worker(wid, fd, events, data);
        }
        else
        {
            errno = EINVAL;
            rc = -1;
        }
        break;
    }

    return rc;
}
/**
 * Remove a file descriptor from the epoll instance of one specific worker.
 *
 * @param wid The id of the worker; must be in the range [0, n_threads).
 * @param fd  The file descriptor to remove.
 *
 * @return 0 on success, non-zero on failure.
 */
static int remove_fd_from_worker(int wid, int fd)
{
    ss_dassert((wid >= 0) && (wid < n_threads));

    struct epoll_event dummy = {};
    int rc = 0;

    if (epoll_ctl(epoll_fd[wid], EPOLL_CTL_DEL, fd, &dummy) == -1)
    {
        // poll_resolve_error() may downgrade benign errors (e.g. ENOENT)
        // to success.
        rc = poll_resolve_error(fd, errno, EPOLL_CTL_DEL);
    }

    return rc;
}
/**
 * Remove a file descriptor from the epoll instances of all workers.
 *
 * @param fd The file descriptor to remove.
 *
 * @return Always 0; individual failures are resolved (or the process taken
 *         down) inside poll_resolve_error() via remove_fd_from_worker().
 */
static int remove_fd_from_workers(int fd)
{
    // The previously declared local 'rc' was never used; it has been removed.
    for (int i = 0; i < n_threads; ++i)
    {
        // We don't store the error, anything serious and the process will
        // have been taken down in poll_resolve_error().
        remove_fd_from_worker(i, fd);
    }

    return 0;
}
/**
 * Remove a file descriptor from a poll set.
 *
 * @param wid MXS_WORKER_ALL to remove from every worker's poll set,
 *            otherwise the id of a specific worker.
 * @param fd  The file descriptor to remove.
 *
 * @return 0 on success, non-zero on failure.
 */
int poll_remove_fd_from_worker(int wid, int fd)
{
    return (wid == MXS_WORKER_ALL)
           ? remove_fd_from_workers(fd)
           : remove_fd_from_worker(wid, fd);
}
int poll_add_dcb(DCB *dcb)
{
int rc = -1;
dcb_state_t old_state = dcb->state;
dcb_state_t new_state;
struct epoll_event ev;
uint32_t events = 0;
CHK_DCB(dcb);
#ifdef EPOLLRDHUP
ev.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLHUP | EPOLLET;
events = EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLHUP | EPOLLET;
#else
ev.events = EPOLLIN | EPOLLOUT | EPOLLHUP | EPOLLET;
events = EPOLLIN | EPOLLOUT | EPOLLHUP | EPOLLET;
#endif
ev.data.ptr = dcb;
/*<
* Choose new state according to the role of dcb.
@ -377,57 +511,28 @@ int poll_add_dcb(DCB *dcb)
* The only possible failure that will not cause a crash is
* running out of system resources.
*/
int owner = 0;
if (dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER)
{
owner = dcb->session->client_dcb->thread.id;
}
else
{
owner = (unsigned int)atomic_add(&next_epoll_fd, 1) % n_threads;
}
dcb->thread.id = owner;
dcb_add_to_list(dcb);
int error_num = 0;
int worker_id = 0;
if (dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER)
{
/** Listeners are added to all epoll instances */
int nthr = config_threadcount();
for (int i = 0; i < nthr; i++)
{
if ((rc = epoll_ctl(epoll_fd[i], EPOLL_CTL_ADD, dcb->fd, &ev)))
{
error_num = errno;
/** Remove the listener from the previous epoll instances */
for (int j = 0; j < i; j++)
{
epoll_ctl(epoll_fd[j], EPOLL_CTL_DEL, dcb->fd, &ev);
}
break;
}
worker_id = MXS_WORKER_ALL;
}
else if (dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER)
{
worker_id = dcb->session->client_dcb->poll.thread.id;
}
else
{
if ((rc = epoll_ctl(epoll_fd[owner], EPOLL_CTL_ADD, dcb->fd, &ev)))
{
error_num = errno;
}
worker_id = (unsigned int)atomic_add(&next_epoll_fd, 1) % n_threads;
}
if (rc)
{
/* Some errors are actually considered acceptable */
rc = poll_resolve_error(dcb, error_num, true);
}
dcb->poll.handler = dcb_poll_handler;
rc = poll_add_fd_to_worker(worker_id, dcb->fd, events, (MXS_POLL_DATA*)dcb);
if (0 == rc)
{
dcb_add_to_list(dcb);
MXS_DEBUG("%lu [poll_add_dcb] Added dcb %p in state %s to poll set.",
pthread_self(),
dcb,
@ -477,46 +582,18 @@ int poll_remove_dcb(DCB *dcb)
if (dcbfd > 0)
{
int error_num = 0;
int worker_id;
if (dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER)
{
/** Listeners are added to all epoll instances */
int nthr = config_threadcount();
for (int i = 0; i < nthr; i++)
{
int tmp_rc = epoll_ctl(epoll_fd[i], EPOLL_CTL_DEL, dcb->fd, &ev);
if (tmp_rc && rc == 0)
{
/** Even if one of the instances failed to remove it, try
* to remove it from all the others */
rc = tmp_rc;
error_num = errno;
ss_dassert(error_num);
}
}
worker_id = MXS_WORKER_ALL;
}
else
{
if ((rc = epoll_ctl(epoll_fd[dcb->thread.id], EPOLL_CTL_DEL, dcbfd, &ev)))
{
error_num = errno;
}
}
/**
* The poll_resolve_error function will always
* return 0 or crash. So if it returns non-zero result,
* things have gone wrong and we crash.
*/
if (rc)
{
rc = poll_resolve_error(dcb, error_num, false);
}
if (rc)
{
raise(SIGABRT);
worker_id = dcb->poll.thread.id;
}
rc = poll_remove_fd_from_worker(worker_id, dcbfd);
}
return rc;
}
@ -530,20 +607,20 @@ int poll_remove_dcb(DCB *dcb)
* here to record the problem.
*
* @param errornum The errno set by epoll_ctl
* @param adding True for adding to poll list, false for removing
* @param op Either EPOLL_CTL_ADD or EPOLL_CTL_DEL.
* @return -1 on error or 0 for possibly revised return code
*/
static int
poll_resolve_error(DCB *dcb, int errornum, bool adding)
poll_resolve_error(int fd, int errornum, int op)
{
if (adding)
if (op == EPOLL_CTL_ADD)
{
if (EEXIST == errornum)
{
MXS_ERROR("%lu [poll_resolve_error] Error : epoll_ctl could not add, "
"already exists for DCB %p.",
"already exists for descriptor %d.",
pthread_self(),
dcb);
fd);
// Assume another thread added and no serious harm done
return 0;
}
@ -552,22 +629,24 @@ poll_resolve_error(DCB *dcb, int errornum, bool adding)
MXS_ERROR("%lu [poll_resolve_error] The limit imposed by "
"/proc/sys/fs/epoll/max_user_watches was "
"encountered while trying to register (EPOLL_CTL_ADD) a new "
"file descriptor on an epoll instance for dcb %p.",
"file descriptor on an epoll instance for descriptor %d.",
pthread_self(),
dcb);
fd);
/* Failure - assume handled by callers */
return -1;
}
}
else
{
ss_dassert(op == EPOLL_CTL_DEL);
/* Must be removing */
if (ENOENT == errornum)
{
MXS_ERROR("%lu [poll_resolve_error] Error : epoll_ctl could not remove, "
"not found, for dcb %p.",
"not found, for dcb %d.",
pthread_self(),
dcb);
fd);
// Assume another thread removed and no serious harm done
return 0;
}
@ -759,32 +838,9 @@ poll_waitevents(void *arg)
/* Process of the queue of waiting requests */
for (int i = 0; i < nfds; i++)
{
process_pollq(thread_id, &events[i]);
}
MXS_POLL_DATA *data = (MXS_POLL_DATA*)events[i].data.ptr;
fake_event_t *event = NULL;
/** It is very likely that the queue is empty so to avoid hitting the
* spinlock every time we receive events, we only do a dirty read. Currently,
* only the monitors inject fake events from external threads. */
if (fake_events[thread_id])
{
spinlock_acquire(&fake_event_lock[thread_id]);
event = fake_events[thread_id];
fake_events[thread_id] = NULL;
spinlock_release(&fake_event_lock[thread_id]);
}
while (event)
{
struct epoll_event ev;
event->dcb->dcb_fakequeue = event->data;
ev.data.ptr = event->dcb;
ev.events = event->event;
process_pollq(thread_id, &ev);
fake_event_t *tmp = event;
event = event->next;
MXS_FREE(tmp);
data->handler(data, thread_id, events[i].events);
}
dcb_process_idle_sessions(thread_id);
@ -863,11 +919,9 @@ poll_set_maxwait(unsigned int maxwait)
* @return 0 if no DCB's have been processed
*/
static int
process_pollq(int thread_id, struct epoll_event *event)
process_pollq_dcb(DCB *dcb, int thread_id, uint32_t ev)
{
uint32_t ev = event->events;
DCB *dcb = (DCB*)event->data.ptr;
ss_dassert(dcb->thread.id == thread_id || dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER);
ss_dassert(dcb->poll.thread.id == thread_id || dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER);
/** Calculate event queue statistics */
uint64_t started = hkheartbeat;
@ -1057,6 +1111,38 @@ process_pollq(int thread_id, struct epoll_event *event)
return 1;
}
/**
 * Poll handler installed into MXS_POLL_DATA for DCBs: downcasts the poll
 * data to a DCB and processes the event, then drains any fake events
 * queued for this worker.
 */
static void dcb_poll_handler(MXS_POLL_DATA *data, int wid, uint32_t events)
{
    /* The MXS_POLL_DATA is the first member of the DCB, so the downcast
     * is safe. */
    process_pollq_dcb((DCB*)data, wid, events);

    /* Drain fake events injected from external threads (currently only the
     * monitors do this). A dirty read keeps the common empty case cheap;
     * the spinlock is taken only when the queue looks non-empty.
     *
     * Since this drain now runs here, it happens once per extracted epoll
     * event rather than once per extraction of events, but as this is
     * temporary code that's ok. Once cross-thread messages are possible,
     * the fake event list will disappear. */
    fake_event_t *queue = NULL;

    if (fake_events[wid])
    {
        spinlock_acquire(&fake_event_lock[wid]);
        queue = fake_events[wid];
        fake_events[wid] = NULL;
        spinlock_release(&fake_event_lock[wid]);
    }

    for (fake_event_t *ev = queue; ev != NULL; )
    {
        fake_event_t *next = ev->next;

        ev->dcb->dcb_fakequeue = ev->data;
        process_pollq_dcb(ev->dcb, wid, ev->event);

        MXS_FREE(ev);
        ev = next;
    }
}
/**
*
* Check that the DCB has a session link before processing.
@ -1398,7 +1484,7 @@ static void poll_add_event_to_dcb(DCB* dcb,
event->next = NULL;
event->tail = event;
int thr = dcb->thread.id;
int thr = dcb->poll.thread.id;
/** It is possible that a housekeeper or a monitor thread inserts a fake
* event into the thread's event queue which is why the operation needs

View File

@ -277,7 +277,7 @@ session_link_dcb(MXS_SESSION *session, DCB *dcb)
atomic_add(&session->refcount, 1);
dcb->session = session;
/** Move this DCB under the same thread */
dcb->thread.id = session->client_dcb->thread.id;
dcb->poll.thread.id = session->client_dcb->poll.thread.id;
return true;
}