diff --git a/include/maxscale/worker.h b/include/maxscale/worker.h index a3002342b..25f4196cb 100644 --- a/include/maxscale/worker.h +++ b/include/maxscale/worker.h @@ -68,6 +68,22 @@ enum mxs_worker_msg_id */ MXS_WORKER* mxs_worker_get(int worker_id); +/** + * Return the worker of the current thread. + * + * @return The worker instance or NULL if the calling thread is not associated + * with a worker. + */ +MXS_WORKER* mxs_worker_get_current(); + +/** + * Return the id of the worker of the current thread. + * + * @return The worker id or -1 if the calling thread is not associated + * with a worker. + */ +int mxs_worker_get_current_id(); + /** * Return the id of the worker. * diff --git a/server/core/dcb.cc b/server/core/dcb.cc index 207a81b04..cb2a44163 100644 --- a/server/core/dcb.cc +++ b/server/core/dcb.cc @@ -3054,6 +3054,16 @@ void dcb_append_readqueue(DCB *dcb, GWBUF *buffer) dcb->dcb_readqueue = gwbuf_append(dcb->dcb_readqueue, buffer); } + +static void dcb_add_to_worker_list(int thread_id, void* data) +{ + DCB *dcb = (DCB*)data; + + ss_dassert(thread_id == dcb->poll.thread.id); + + dcb_add_to_list(dcb); +} + void dcb_add_to_list(DCB *dcb) { if (dcb->dcb_role != DCB_ROLE_SERVICE_LISTENER || @@ -3065,20 +3075,34 @@ void dcb_add_to_list(DCB *dcb) * as that part is done in the final zombie processing. */ - spinlock_acquire(&all_dcbs_lock[dcb->poll.thread.id]); + int worker_id = mxs_worker_get_current_id(); - if (all_dcbs[dcb->poll.thread.id] == NULL) + if (worker_id == dcb->poll.thread.id) { - all_dcbs[dcb->poll.thread.id] = dcb; - all_dcbs[dcb->poll.thread.id]->thread.tail = dcb; + if (all_dcbs[dcb->poll.thread.id] == NULL) + { + all_dcbs[dcb->poll.thread.id] = dcb; + all_dcbs[dcb->poll.thread.id]->thread.tail = dcb; + } + else + { + all_dcbs[dcb->poll.thread.id]->thread.tail->thread.next = dcb; + all_dcbs[dcb->poll.thread.id]->thread.tail = dcb; + } } else { - all_dcbs[dcb->poll.thread.id]->thread.tail->thread.next = dcb; - all_dcbs[dcb->poll.thread.id]->thread.tail = dcb; - } + MXS_WORKER* worker = mxs_worker_get(dcb->poll.thread.id); + ss_dassert(worker); - spinlock_release(&all_dcbs_lock[dcb->poll.thread.id]); + intptr_t arg1 = (intptr_t)dcb_add_to_worker_list; + intptr_t arg2 = (intptr_t)dcb; + + if (!mxs_worker_post_message(worker, MXS_WORKER_MSG_CALL, arg1, arg2)) + { + MXS_ERROR("Could not post DCB to worker."); + } + } } } @@ -3089,7 +3113,7 @@ void dcb_add_to_list(DCB *dcb) */ static void dcb_remove_from_list(DCB *dcb) { - spinlock_acquire(&all_dcbs_lock[dcb->poll.thread.id]); + ss_dassert(mxs_worker_get_current_id() == dcb->poll.thread.id); if (dcb == all_dcbs[dcb->poll.thread.id]) { @@ -3126,8 +3150,6 @@ static void dcb_remove_from_list(DCB *dcb) * again, it will be in a clean state. */ dcb->thread.next = NULL; dcb->thread.tail = NULL; - - spinlock_release(&all_dcbs_lock[dcb->poll.thread.id]); } /** diff --git a/server/core/poll.cc b/server/core/poll.cc index 0735ad289..8fbb906a4 100644 --- a/server/core/poll.cc +++ b/server/core/poll.cc @@ -317,7 +317,7 @@ static int add_fd_to_workers(int fd, uint32_t events, MXS_POLL_DATA* data) ev.events = events; ev.data.ptr = data; - data->thread.id = 0; // In this case, the data will appear to be on the main thread. + data->thread.id = current_thread_id; // The DCB will appear on the list of the calling thread. int stored_errno = 0; int rc = 0; diff --git a/server/core/worker.c b/server/core/worker.c index 1df0548b7..c2778537d 100644 --- a/server/core/worker.c +++ b/server/core/worker.c @@ -19,17 +19,32 @@ #include #include #include +#include #include "maxscale/modules.h" #include "maxscale/poll.h" +#define WORKER_ABSENT_ID -1 + /** * Unit variables. */ -static struct worker_unit +static struct this_unit { - int n_workers; - MXS_WORKER** workers; -} this_unit; + int n_workers; // How many workers there are. + MXS_WORKER** workers; // Array of worker instances. +} this_unit = +{ + 0, + NULL +}; + +static thread_local struct this_thread +{ + int current_worker_id; // The worker id of the current thread +} this_thread = +{ + WORKER_ABSENT_ID +}; /** * Structure used for sending cross-thread messages. @@ -97,6 +112,25 @@ MXS_WORKER* mxs_worker_get(int worker_id) return this_unit.workers[worker_id]; } +MXS_WORKER* mxs_worker_get_current() +{ + MXS_WORKER* worker = NULL; + + int worker_id = this_thread.current_worker_id; + + if (worker_id != WORKER_ABSENT_ID) + { + worker = mxs_worker_get(worker_id); + } + + return worker; +} + +int mxs_worker_get_current_id() +{ + return this_thread.current_worker_id; +} + bool mxs_worker_post_message(MXS_WORKER *worker, uint32_t msg_id, intptr_t arg1, intptr_t arg2) { // NOTE: No logging here, this function must be signal safe. @@ -129,7 +163,9 @@ size_t mxs_worker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg void mxs_worker_main(MXS_WORKER* worker) { + this_thread.current_worker_id = worker->id; poll_waitevents(worker); + this_thread.current_worker_id = WORKER_ABSENT_ID; MXS_NOTICE("Worker %d has shut down.", worker->id); }