Use worker messages for fake events
Fake events now use worker messages to trigger execution of the events.
This commit is contained in:
@ -98,11 +98,4 @@ bool poll_add_fd_to_worker(int wid, int fd, uint32_t events, MXS_POLL_DATA* data
|
|||||||
*/
|
*/
|
||||||
bool poll_remove_fd_from_worker(int wid, int fd);
|
bool poll_remove_fd_from_worker(int wid, int fd);
|
||||||
|
|
||||||
/**
|
|
||||||
* Check whether there are cross-thread messages for current thread.
|
|
||||||
*
|
|
||||||
* @attention Only to be called by the system.
|
|
||||||
*/
|
|
||||||
void poll_check_message(void);
|
|
||||||
|
|
||||||
MXS_END_DECLS
|
MXS_END_DECLS
|
||||||
|
@ -69,19 +69,6 @@ using maxscale::Semaphore;
|
|||||||
/* A DCB with null values, used for initialization */
|
/* A DCB with null values, used for initialization */
|
||||||
static DCB dcb_initialized;
|
static DCB dcb_initialized;
|
||||||
|
|
||||||
/** Fake epoll event struct */
|
|
||||||
typedef struct fake_event
|
|
||||||
{
|
|
||||||
DCB *dcb; /*< The DCB where this event was generated */
|
|
||||||
GWBUF *data; /*< Fake data, placed in the DCB's read queue */
|
|
||||||
uint32_t event; /*< The EPOLL event type */
|
|
||||||
struct fake_event *tail; /*< The last event */
|
|
||||||
struct fake_event *next; /*< The next event */
|
|
||||||
} fake_event_t;
|
|
||||||
|
|
||||||
static fake_event_t **fake_events; /*< Thread-specific fake event queue */
|
|
||||||
static SPINLOCK *fake_event_lock;
|
|
||||||
|
|
||||||
static DCB **all_dcbs;
|
static DCB **all_dcbs;
|
||||||
static SPINLOCK *all_dcbs_lock;
|
static SPINLOCK *all_dcbs_lock;
|
||||||
static DCB **zombies;
|
static DCB **zombies;
|
||||||
@ -106,9 +93,7 @@ void dcb_global_init()
|
|||||||
if ((zombies = (DCB**)MXS_CALLOC(nthreads, sizeof(DCB*))) == NULL ||
|
if ((zombies = (DCB**)MXS_CALLOC(nthreads, sizeof(DCB*))) == NULL ||
|
||||||
(all_dcbs = (DCB**)MXS_CALLOC(nthreads, sizeof(DCB*))) == NULL ||
|
(all_dcbs = (DCB**)MXS_CALLOC(nthreads, sizeof(DCB*))) == NULL ||
|
||||||
(all_dcbs_lock = (SPINLOCK*)MXS_CALLOC(nthreads, sizeof(SPINLOCK))) == NULL ||
|
(all_dcbs_lock = (SPINLOCK*)MXS_CALLOC(nthreads, sizeof(SPINLOCK))) == NULL ||
|
||||||
(nzombies = (int*)MXS_CALLOC(nthreads, sizeof(int))) == NULL ||
|
(nzombies = (int*)MXS_CALLOC(nthreads, sizeof(int))) == NULL)
|
||||||
(fake_events = (fake_event_t**)MXS_CALLOC(nthreads, sizeof(fake_event_t*))) == NULL ||
|
|
||||||
(fake_event_lock = (SPINLOCK*)MXS_CALLOC(nthreads, sizeof(SPINLOCK))) == NULL)
|
|
||||||
{
|
{
|
||||||
MXS_OOM();
|
MXS_OOM();
|
||||||
raise(SIGABRT);
|
raise(SIGABRT);
|
||||||
@ -118,11 +103,6 @@ void dcb_global_init()
|
|||||||
{
|
{
|
||||||
spinlock_init(&all_dcbs_lock[i]);
|
spinlock_init(&all_dcbs_lock[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < nthreads; i++)
|
|
||||||
{
|
|
||||||
spinlock_init(&fake_event_lock[i]);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void dcb_finish()
|
void dcb_finish()
|
||||||
@ -159,8 +139,7 @@ static DCB *dcb_find_free();
|
|||||||
static void dcb_remove_from_list(DCB *dcb);
|
static void dcb_remove_from_list(DCB *dcb);
|
||||||
|
|
||||||
static uint32_t dcb_poll_handler(MXS_POLL_DATA *data, int thread_id, uint32_t events);
|
static uint32_t dcb_poll_handler(MXS_POLL_DATA *data, int thread_id, uint32_t events);
|
||||||
static uint32_t dcb_process_poll_events(DCB *dcb, int thread_id, uint32_t ev);
|
static uint32_t dcb_process_poll_events(DCB *dcb, uint32_t ev);
|
||||||
static void dcb_process_fake_events(DCB *dcb, int thread_id);
|
|
||||||
static bool dcb_session_check(DCB *dcb, const char *);
|
static bool dcb_session_check(DCB *dcb, const char *);
|
||||||
|
|
||||||
uint64_t dcb_get_session_id(DCB *dcb)
|
uint64_t dcb_get_session_id(DCB *dcb)
|
||||||
@ -2026,12 +2005,6 @@ static void dcb_hangup_foreach_worker(int thread_id, struct server* server)
|
|||||||
dcb->server == server)
|
dcb->server == server)
|
||||||
{
|
{
|
||||||
poll_fake_hangup_event(dcb);
|
poll_fake_hangup_event(dcb);
|
||||||
// dcb_hangup_foreach_worker() is called via the message loop,
|
|
||||||
// so immediately after the hangup event has been added, we can
|
|
||||||
// also process it. Indeed, it is necessary to do that because
|
|
||||||
// otherwise, unless there is a real event for the DCB descriptor,
|
|
||||||
// the fake event would not be handled.
|
|
||||||
dcb_process_fake_events(dcb, thread_id);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -3034,9 +3007,10 @@ int dcb_get_port(const DCB *dcb)
|
|||||||
return rval;
|
return rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
static uint32_t dcb_process_poll_events(DCB *dcb, int thread_id, uint32_t events)
|
static uint32_t dcb_process_poll_events(DCB *dcb, uint32_t events)
|
||||||
{
|
{
|
||||||
ss_dassert(dcb->poll.thread.id == thread_id || dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER);
|
ss_dassert(dcb->poll.thread.id == mxs::Worker::get_current_id() ||
|
||||||
|
dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER);
|
||||||
|
|
||||||
CHK_DCB(dcb);
|
CHK_DCB(dcb);
|
||||||
|
|
||||||
@ -3198,89 +3172,56 @@ static uint32_t dcb_process_poll_events(DCB *dcb, int thread_id, uint32_t events
|
|||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dcb_process_fake_events(DCB *dcb, int thread_id)
|
|
||||||
{
|
|
||||||
// Since this loop is now here, it will be processed once per extracted epoll
|
|
||||||
// event and not once per extraction of events, but as this is temporary code
|
|
||||||
// that's ok. Once it'll be possible to send cross-thread messages, the need
|
|
||||||
// for the fake event list will disappear.
|
|
||||||
|
|
||||||
fake_event_t *event = NULL;
|
|
||||||
|
|
||||||
/** It is very likely that the queue is empty so to avoid hitting the
|
|
||||||
* spinlock every time we receive events, we only do a dirty read. Currently,
|
|
||||||
* only the monitors inject fake events from external threads. */
|
|
||||||
if (fake_events[thread_id])
|
|
||||||
{
|
|
||||||
spinlock_acquire(&fake_event_lock[thread_id]);
|
|
||||||
event = fake_events[thread_id];
|
|
||||||
fake_events[thread_id] = NULL;
|
|
||||||
spinlock_release(&fake_event_lock[thread_id]);
|
|
||||||
}
|
|
||||||
|
|
||||||
while (event)
|
|
||||||
{
|
|
||||||
event->dcb->dcb_fakequeue = event->data;
|
|
||||||
dcb_process_poll_events(event->dcb, thread_id, event->event);
|
|
||||||
fake_event_t *tmp = event;
|
|
||||||
event = event->next;
|
|
||||||
MXS_FREE(tmp);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static uint32_t dcb_poll_handler(MXS_POLL_DATA *data, int thread_id, uint32_t events)
|
static uint32_t dcb_poll_handler(MXS_POLL_DATA *data, int thread_id, uint32_t events)
|
||||||
{
|
{
|
||||||
DCB *dcb = (DCB*)data;
|
DCB *dcb = (DCB*)data;
|
||||||
|
|
||||||
uint32_t rc = dcb_process_poll_events(dcb, thread_id, events);
|
return dcb_process_poll_events(dcb, events);
|
||||||
|
|
||||||
dcb_process_fake_events(dcb, thread_id);
|
|
||||||
|
|
||||||
return rc;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void poll_add_event_to_dcb(DCB* dcb,
|
class FakeEventTask: public mxs::WorkerDisposableTask
|
||||||
GWBUF* buf,
|
|
||||||
uint32_t ev)
|
|
||||||
{
|
{
|
||||||
fake_event_t *event = (fake_event_t*)MXS_MALLOC(sizeof(*event));
|
FakeEventTask(const FakeEventTask&);
|
||||||
|
FakeEventTask& operator=(const FakeEventTask&);
|
||||||
|
|
||||||
if (event)
|
public:
|
||||||
|
FakeEventTask(DCB* dcb, GWBUF* buf, uint32_t ev):
|
||||||
|
m_dcb(dcb),
|
||||||
|
m_buffer(buf),
|
||||||
|
m_ev(ev)
|
||||||
{
|
{
|
||||||
event->data = buf;
|
}
|
||||||
event->dcb = dcb;
|
|
||||||
event->event = ev;
|
|
||||||
event->next = NULL;
|
|
||||||
event->tail = event;
|
|
||||||
|
|
||||||
int thr = dcb->poll.thread.id;
|
void execute(Worker& worker)
|
||||||
|
{
|
||||||
|
m_dcb->dcb_fakequeue = m_buffer;
|
||||||
|
dcb_process_poll_events(m_dcb, m_ev);
|
||||||
|
}
|
||||||
|
|
||||||
/** It is possible that a housekeeper or a monitor thread inserts a fake
|
private:
|
||||||
* event into the thread's event queue which is why the operation needs
|
DCB* m_dcb;
|
||||||
* to be protected by a spinlock */
|
GWBUF* m_buffer;
|
||||||
spinlock_acquire(&fake_event_lock[thr]);
|
uint32_t m_ev;
|
||||||
|
};
|
||||||
|
|
||||||
if (fake_events[thr])
|
static void poll_add_event_to_dcb(DCB* dcb, GWBUF* buf, uint32_t ev)
|
||||||
{
|
{
|
||||||
fake_events[thr]->tail->next = event;
|
FakeEventTask* task = new (std::nothrow) FakeEventTask(dcb, buf, ev);
|
||||||
fake_events[thr]->tail = event;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
fake_events[thr] = event;
|
|
||||||
}
|
|
||||||
|
|
||||||
spinlock_release(&fake_event_lock[thr]);
|
if (task)
|
||||||
|
{
|
||||||
|
Worker* worker = Worker::get(dcb->poll.thread.id);
|
||||||
|
worker->post(std::auto_ptr<FakeEventTask>(task), mxs::Worker::EXECUTE_QUEUED);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
MXS_OOM();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void poll_add_epollin_event_to_dcb(DCB* dcb, GWBUF* buf)
|
void poll_add_epollin_event_to_dcb(DCB* dcb, GWBUF* buf)
|
||||||
{
|
{
|
||||||
uint32_t ev;
|
poll_add_event_to_dcb(dcb, buf, EPOLLIN);
|
||||||
|
|
||||||
ev = EPOLLIN;
|
|
||||||
|
|
||||||
poll_add_event_to_dcb(dcb, buf, ev);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void poll_fake_write_event(DCB *dcb)
|
void poll_fake_write_event(DCB *dcb)
|
||||||
|
Reference in New Issue
Block a user