Insert fake events under a lock
The thread-specific spinlock must be acquired before a fake event is inserted from a non-polling thread. The usual case is a monitor thread inserting a hangup event for a DCB. A less common case is when session timeouts are enabled and an idle DCB needs to be closed; there it is better to insert a fake hangup event than to close the DCB directly from an external thread.
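
Below is a minimal, self-contained sketch of the locking pattern this change introduces: producer threads (monitors, the housekeeper) append to a singly linked queue under a spinlock, while the polling thread first does a dirty read of the queue head and only takes the lock to swap the whole list out before processing it. The sketch uses pthread_spinlock_t in place of MaxScale's SPINLOCK, and fake_event_t, queue_push and queue_drain are simplified, hypothetical stand-ins for the code in the diff below.

/* Sketch of the fake-event queue locking pattern; not MaxScale code. */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

typedef struct fake_event
{
    int event;                    /* simplified stand-in for an epoll event mask */
    struct fake_event *next;
    struct fake_event *tail;      /* head element keeps a tail pointer for O(1) append */
} fake_event_t;

static fake_event_t *queue;              /* MaxScale keeps one such queue per polling thread */
static pthread_spinlock_t queue_lock;    /* stand-in for fake_event_lock[thread_id] */

/* Producer side (monitor/housekeeper thread): append under the lock. */
static void queue_push(fake_event_t *ev)
{
    ev->next = NULL;
    ev->tail = ev;

    pthread_spin_lock(&queue_lock);
    if (queue)
    {
        queue->tail->next = ev;
        queue->tail = ev;
    }
    else
    {
        queue = ev;
    }
    pthread_spin_unlock(&queue_lock);
}

/* Consumer side (polling thread): dirty read first, then swap the whole
 * list out under the lock and process it with the lock released. */
static void queue_drain(void)
{
    fake_event_t *ev = NULL;

    if (queue)    /* dirty read keeps the common empty case lock-free */
    {
        pthread_spin_lock(&queue_lock);
        ev = queue;
        queue = NULL;
        pthread_spin_unlock(&queue_lock);
    }

    while (ev)
    {
        fake_event_t *next = ev->next;
        printf("processing fake event 0x%x\n", ev->event);
        free(ev);
        ev = next;
    }
}

int main(void)
{
    pthread_spin_init(&queue_lock, PTHREAD_PROCESS_PRIVATE);

    for (int i = 0; i < 3; i++)
    {
        fake_event_t *ev = calloc(1, sizeof(*ev));
        ev->event = 1 << i;
        queue_push(ev);
    }

    queue_drain();
    pthread_spin_destroy(&queue_lock);
    return 0;
}

Swapping the whole list out keeps the critical section short, and the dirty read means the common case of an empty queue never touches the lock at all.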
@@ -94,6 +94,7 @@ typedef struct fake_event
 static int *epoll_fd; /*< The epoll file descriptor */
 static int next_epoll_fd = 0; /*< Which thread handles the next DCB */
 static fake_event_t **fake_events; /*< Thread-specific fake event queue */
+static SPINLOCK *fake_event_lock;
 static int do_shutdown = 0; /*< Flag the shutdown of the poll subsystem */
 static GWBITMASK poll_mask;
 #if MUTEX_EPOLL
@@ -235,11 +236,21 @@ poll_init()
         }
     }

-    if ((fake_events = MXS_CALLOC(sizeof(fake_event_t*), n_threads)) == NULL)
+    if ((fake_events = MXS_CALLOC(n_threads, sizeof(fake_event_t*))) == NULL)
     {
         exit(-1);
     }

+    if ((fake_event_lock = MXS_CALLOC(n_threads, sizeof(SPINLOCK))) == NULL)
+    {
+        exit(-1);
+    }
+
+    for (int i = 0; i < n_threads; i++)
+    {
+        spinlock_init(&fake_event_lock[i]);
+    }
+
     memset(&pollStats, 0, sizeof(pollStats));
     memset(&queueStats, 0, sizeof(queueStats));
     bitmask_init(&poll_mask);
@@ -719,17 +730,28 @@ poll_waitevents(void *arg)
                process_pollq(thread_id, &events[i]);
            }

-           /** Process fake events */
-           while (fake_events[thread_id])
-           {
-               fake_event_t *event = fake_events[thread_id];
-               fake_events[thread_id] = fake_events[thread_id]->next;
-
+           fake_event_t *event = NULL;
+
+           /** It is very likely that the queue is empty so to avoid hitting the
+            * spinlock every time we receive events, we only do a dirty read. Currently,
+            * only the monitors inject fake events from external threads. */
+           if (fake_events[thread_id])
+           {
+               spinlock_acquire(&fake_event_lock[thread_id]);
+               event = fake_events[thread_id];
+               fake_events[thread_id] = NULL;
+               spinlock_release(&fake_event_lock[thread_id]);
+           }
+
+           /** Process fake events */
+           while (event)
+           {
                struct epoll_event ev;
                event->dcb->dcb_fakequeue = event->data;
                ev.data.ptr = event->dcb;
                ev.events = event->event;
                process_pollq(thread_id, &ev);
+               event = event->next;
            }

            if (check_timeouts && hkheartbeat >= next_timeout_check)
@@ -1436,6 +1458,11 @@ static void poll_add_event_to_dcb(DCB* dcb,

        int thr = dcb->owner;

+       /** It is possible that a housekeeper or a monitor thread inserts a fake
+        * event into the thread's event queue which is why the operation needs
+        * to be protected by a spinlock */
+       spinlock_acquire(&fake_event_lock[thr]);
+
        if (fake_events[thr])
        {
            fake_events[thr]->tail->next = event;
@@ -1445,6 +1472,8 @@ static void poll_add_event_to_dcb(DCB* dcb,
        {
            fake_events[thr] = event;
        }
+
+       spinlock_release(&fake_event_lock[thr]);
    }
}

@@ -42,6 +42,7 @@
 #include <maxscale/atomic.h>
 #include <maxscale/log_manager.h>
 #include <maxscale/housekeeper.h>
+#include <maxscale/poll.h>

 /* This list of all sessions */
 LIST_CONFIG SESSIONlist =
@@ -927,7 +928,7 @@ void process_idle_sessions()
            if (all_session->service && all_session->client_dcb && all_session->client_dcb->state == DCB_STATE_POLLING &&
                hkheartbeat - all_session->client_dcb->last_read > all_session->service->conn_idle_timeout * 10)
            {
-               dcb_close(all_session->client_dcb);
+               poll_fake_hangup_event(all_session->client_dcb);
            }

            current = list_iterate(&SESSIONlist, current);