MXS-1392 Add reference count to MXS_POLL_DATA

The polling mechanism can now optionally be used for managing
the lifetime of an object placed into the poll set.

If a MXS_POLL_DATA has a non-null 'free', then the reference count
of the data will be increased before calling the handler and
decreased after. In that case, if the reference count reaches 0,
the free function will be called.

Note that the reference counts of *all* MXS_POLL_DATAs returned
by 'epoll_wait' will be increased before the events are delivered
to the handlers individually for each MXS_POLL_DATA, and then once
all events have been delivered, the reference count of each
MXS_POLL_DATA will be decreased.

This ensure that if there are interdependencies between different
MXS_POLL_DATAs returned by one call to 'epoll_wait', the case that
an MXS_POLL_DATA is deleted before its events have been delivered
can be avoided by using the reference count for lifetime management.

In subsequent commits, the reference count will be taken into use
in the lifetime management of DCBs.
This commit is contained in:
Johan Wikman 2017-09-06 15:47:07 +03:00
parent daef8ad5d7
commit 7e17e2cd56
3 changed files with 90 additions and 27 deletions

View File

@ -18,6 +18,7 @@
#include <maxscale/cdefs.h>
#include <sys/epoll.h>
#include <maxscale/atomic.h>
MXS_BEGIN_DECLS
@ -32,10 +33,12 @@ typedef enum mxs_poll_action
} mxs_poll_action_t;
struct mxs_poll_data;
/** Pointer to function that knows how to handle events for a particular
* 'struct mxs_poll_data' structure.
/**
* Pointer to function that knows how to handle events for a particular
* 'struct mxs_poll_data' structure.
*
* @param data The `mxs_poll_data` instance that contained this pointer.
* @param data The `mxs_poll_data` instance that contained this function pointer.
* @param wid The worker thread id.
* @param events The epoll events.
*
@ -43,9 +46,23 @@ struct mxs_poll_data;
*/
typedef uint32_t (*mxs_poll_handler_t)(struct mxs_poll_data* data, int wid, uint32_t events);
/**
* Pointer to function that knows how to free a particular
* 'struct mxs_poll_data' structure
*
* @param data The `mxs_poll_data` instance that contained this function pointer.
*/
typedef void (*mxs_poll_free_t)(struct mxs_poll_data* data);
typedef struct mxs_poll_data
{
mxs_poll_handler_t handler; /*< Handler for this particular kind of mxs_poll_data. */
mxs_poll_free_t free; /*< Optional free function for mxs_poll_data. */
uint32_t refcount; /*< Reference count, optionally used. If 'free' is provided,
refcount will be incremented before events are delivered
to the handler and decremented after. If reaches 0, then
the free function is called.
*/
struct
{
int id; /*< The id of the worker thread. */
@ -98,4 +115,27 @@ bool poll_add_fd_to_worker(int wid, int fd, uint32_t events, MXS_POLL_DATA* data
*/
bool poll_remove_fd_from_worker(int wid, int fd);
/**
* Increase the reference count of the poll data.
*
* @param data The poll data whose reference count should be increased.
*/
static inline void poll_inc_ref(MXS_POLL_DATA* data)
{
atomic_add_uint32(&data->refcount, 1);
}
/**
* Decrease the reference count of the poll data.
*
* @param data The poll data whose reference count should be decreased.
*
* @return The previous reference count. If the returned value is 1, then
* the caller is the last user of the data.
*/
static inline uint32_t poll_dec_ref(MXS_POLL_DATA* data)
{
return atomic_add_uint32(&data->refcount, -1);
}
MXS_END_DECLS

View File

@ -91,28 +91,6 @@ static thread_local struct
}
void dcb_global_init()
{
this_unit.dcb_initialized.dcb_chk_top = CHK_NUM_DCB;
this_unit.dcb_initialized.fd = DCBFD_CLOSED;
this_unit.dcb_initialized.state = DCB_STATE_ALLOC;
this_unit.dcb_initialized.ssl_state = SSL_HANDSHAKE_UNKNOWN;
this_unit.dcb_initialized.dcb_chk_tail = CHK_NUM_DCB;
int nthreads = config_threadcount();
if ((this_unit.all_dcbs = (DCB**)MXS_CALLOC(nthreads, sizeof(DCB*))) == NULL)
{
MXS_OOM();
raise(SIGABRT);
}
}
void dcb_finish()
{
// TODO: Free all resources.
}
static void dcb_initialize(DCB *dcb);
static void dcb_final_free(DCB *dcb);
static void dcb_final_close(DCB *dcb);
@ -145,6 +123,30 @@ static uint32_t dcb_poll_handler(MXS_POLL_DATA *data, int thread_id, uint32_t ev
static uint32_t dcb_process_poll_events(DCB *dcb, uint32_t ev);
static bool dcb_session_check(DCB *dcb, const char *);
void dcb_global_init()
{
this_unit.dcb_initialized.dcb_chk_top = CHK_NUM_DCB;
this_unit.dcb_initialized.fd = DCBFD_CLOSED;
this_unit.dcb_initialized.state = DCB_STATE_ALLOC;
this_unit.dcb_initialized.ssl_state = SSL_HANDSHAKE_UNKNOWN;
this_unit.dcb_initialized.poll.handler = dcb_poll_handler;
this_unit.dcb_initialized.poll.free = NULL;
this_unit.dcb_initialized.dcb_chk_tail = CHK_NUM_DCB;
int nthreads = config_threadcount();
if ((this_unit.all_dcbs = (DCB**)MXS_CALLOC(nthreads, sizeof(DCB*))) == NULL)
{
MXS_OOM();
raise(SIGABRT);
}
}
void dcb_finish()
{
// TODO: Free all resources.
}
uint64_t dcb_get_session_id(DCB *dcb)
{
return (dcb && dcb->session) ? dcb->session->ses_id : 0;
@ -165,8 +167,6 @@ static void
dcb_initialize(DCB *dcb)
{
*dcb = this_unit.dcb_initialized;
dcb->poll.handler = dcb_poll_handler;
}
/**

View File

@ -162,6 +162,8 @@ Worker::Worker(int id,
, m_shutdown_initiated(false)
{
MXS_POLL_DATA::handler = &Worker::epoll_instance_handler;
MXS_POLL_DATA::free = NULL;
MXS_POLL_DATA::refcount = 0;
MXS_POLL_DATA::thread.id = id;
}
@ -1139,6 +1141,15 @@ void Worker::poll_waitevents()
uint64_t cycle_start = hkheartbeat;
for (int i = 0; i < nfds; i++)
{
MXS_POLL_DATA *data = (MXS_POLL_DATA*)events[i].data.ptr;
if (data->free)
{
poll_inc_ref(data);
}
}
for (int i = 0; i < nfds; i++)
{
/** Calculate event queue statistics */
@ -1200,6 +1211,18 @@ void Worker::poll_waitevents()
m_statistics.maxexectime = MXS_MAX(m_statistics.maxexectime, qtime);
}
for (int i = 0; i < nfds; i++)
{
MXS_POLL_DATA *data = (MXS_POLL_DATA*)events[i].data.ptr;
if (data->free)
{
if (poll_dec_ref(data) == 1)
{
data->free(data);
}
}
}
dcb_process_idle_sessions(m_id);
m_state = ZPROCESSING;