diff --git a/include/maxscale/poll_core.h b/include/maxscale/poll_core.h index e8df31b24..9f0927c56 100644 --- a/include/maxscale/poll_core.h +++ b/include/maxscale/poll_core.h @@ -18,6 +18,7 @@ #include #include +#include MXS_BEGIN_DECLS @@ -32,10 +33,12 @@ typedef enum mxs_poll_action } mxs_poll_action_t; struct mxs_poll_data; -/** Pointer to function that knows how to handle events for a particular - * 'struct mxs_poll_data' structure. + +/** + * Pointer to function that knows how to handle events for a particular + * 'struct mxs_poll_data' structure. * - * @param data The `mxs_poll_data` instance that contained this pointer. + * @param data The `mxs_poll_data` instance that contained this function pointer. * @param wid The worker thread id. * @param events The epoll events. * @@ -43,9 +46,23 @@ struct mxs_poll_data; */ typedef uint32_t (*mxs_poll_handler_t)(struct mxs_poll_data* data, int wid, uint32_t events); +/** + * Pointer to function that knows how to free a particular + * 'struct mxs_poll_data' structure + * + * @param data The `mxs_poll_data` instance that contained this function pointer. + */ +typedef void (*mxs_poll_free_t)(struct mxs_poll_data* data); + typedef struct mxs_poll_data { mxs_poll_handler_t handler; /*< Handler for this particular kind of mxs_poll_data. */ + mxs_poll_free_t free; /*< Optional free function for mxs_poll_data. */ + uint32_t refcount; /*< Reference count, optionally used. If 'free' is provided, + refcount will be incremented before events are delivered + to the handler and decremented after. If reaches 0, then + the free function is called. + */ struct { int id; /*< The id of the worker thread. */ @@ -98,4 +115,27 @@ bool poll_add_fd_to_worker(int wid, int fd, uint32_t events, MXS_POLL_DATA* data */ bool poll_remove_fd_from_worker(int wid, int fd); +/** + * Increase the reference count of the poll data. + * + * @param data The poll data whose reference count should be increased. + */ +static inline void poll_inc_ref(MXS_POLL_DATA* data) +{ + atomic_add_uint32(&data->refcount, 1); +} + +/** + * Decrease the reference count of the poll data. + * + * @param data The poll data whose reference count should be decreased. + * + * @return The previous reference count. If the returned value is 1, then + * the caller is the last user of the data. + */ +static inline uint32_t poll_dec_ref(MXS_POLL_DATA* data) +{ + return atomic_add_uint32(&data->refcount, -1); +} + MXS_END_DECLS diff --git a/server/core/dcb.cc b/server/core/dcb.cc index 2351f327c..e463ed59a 100644 --- a/server/core/dcb.cc +++ b/server/core/dcb.cc @@ -91,28 +91,6 @@ static thread_local struct } -void dcb_global_init() -{ - this_unit.dcb_initialized.dcb_chk_top = CHK_NUM_DCB; - this_unit.dcb_initialized.fd = DCBFD_CLOSED; - this_unit.dcb_initialized.state = DCB_STATE_ALLOC; - this_unit.dcb_initialized.ssl_state = SSL_HANDSHAKE_UNKNOWN; - this_unit.dcb_initialized.dcb_chk_tail = CHK_NUM_DCB; - - int nthreads = config_threadcount(); - - if ((this_unit.all_dcbs = (DCB**)MXS_CALLOC(nthreads, sizeof(DCB*))) == NULL) - { - MXS_OOM(); - raise(SIGABRT); - } -} - -void dcb_finish() -{ - // TODO: Free all resources. -} - static void dcb_initialize(DCB *dcb); static void dcb_final_free(DCB *dcb); static void dcb_final_close(DCB *dcb); @@ -145,6 +123,30 @@ static uint32_t dcb_poll_handler(MXS_POLL_DATA *data, int thread_id, uint32_t ev static uint32_t dcb_process_poll_events(DCB *dcb, uint32_t ev); static bool dcb_session_check(DCB *dcb, const char *); +void dcb_global_init() +{ + this_unit.dcb_initialized.dcb_chk_top = CHK_NUM_DCB; + this_unit.dcb_initialized.fd = DCBFD_CLOSED; + this_unit.dcb_initialized.state = DCB_STATE_ALLOC; + this_unit.dcb_initialized.ssl_state = SSL_HANDSHAKE_UNKNOWN; + this_unit.dcb_initialized.poll.handler = dcb_poll_handler; + this_unit.dcb_initialized.poll.free = NULL; + this_unit.dcb_initialized.dcb_chk_tail = CHK_NUM_DCB; + + int nthreads = config_threadcount(); + + if ((this_unit.all_dcbs = (DCB**)MXS_CALLOC(nthreads, sizeof(DCB*))) == NULL) + { + MXS_OOM(); + raise(SIGABRT); + } +} + +void dcb_finish() +{ + // TODO: Free all resources. +} + uint64_t dcb_get_session_id(DCB *dcb) { return (dcb && dcb->session) ? dcb->session->ses_id : 0; @@ -165,8 +167,6 @@ static void dcb_initialize(DCB *dcb) { *dcb = this_unit.dcb_initialized; - - dcb->poll.handler = dcb_poll_handler; } /** diff --git a/server/core/worker.cc b/server/core/worker.cc index 707b8da5e..ff36e0391 100644 --- a/server/core/worker.cc +++ b/server/core/worker.cc @@ -162,6 +162,8 @@ Worker::Worker(int id, , m_shutdown_initiated(false) { MXS_POLL_DATA::handler = &Worker::epoll_instance_handler; + MXS_POLL_DATA::free = NULL; + MXS_POLL_DATA::refcount = 0; MXS_POLL_DATA::thread.id = id; } @@ -1139,6 +1141,15 @@ void Worker::poll_waitevents() uint64_t cycle_start = hkheartbeat; + for (int i = 0; i < nfds; i++) + { + MXS_POLL_DATA *data = (MXS_POLL_DATA*)events[i].data.ptr; + if (data->free) + { + poll_inc_ref(data); + } + } + for (int i = 0; i < nfds; i++) { /** Calculate event queue statistics */ @@ -1200,6 +1211,18 @@ void Worker::poll_waitevents() m_statistics.maxexectime = MXS_MAX(m_statistics.maxexectime, qtime); } + for (int i = 0; i < nfds; i++) + { + MXS_POLL_DATA *data = (MXS_POLL_DATA*)events[i].data.ptr; + if (data->free) + { + if (poll_dec_ref(data) == 1) + { + data->free(data); + } + } + } + dcb_process_idle_sessions(m_id); m_state = ZPROCESSING;