DCBs now added/removed without locks

This is not globally safe yet, but all other access is directly or
indirectly related to maxadmin, which is irrelevant as far as
performance testing is concerned.
This commit is contained in:
Johan Wikman
2017-02-17 09:47:40 +02:00
parent 6c3c96cb7e
commit 86b7eb622e
4 changed files with 90 additions and 16 deletions

View File

@ -68,6 +68,22 @@ enum mxs_worker_msg_id
*/ */
MXS_WORKER* mxs_worker_get(int worker_id); MXS_WORKER* mxs_worker_get(int worker_id);
/**
* Return the worker of the current thread.
*
* @return The worker instance or NULL if the calling thread is not associated
* with a worker.
*/
MXS_WORKER* mxs_worker_get_current();
/**
* Return the id of the worker of the current thread.
*
* @return The worker id or -1 if the calling thread is not associated
* with a worker.
*/
int mxs_worker_get_current_id();
/** /**
* Return the id of the worker. * Return the id of the worker.
* *

View File

@ -3054,6 +3054,16 @@ void dcb_append_readqueue(DCB *dcb, GWBUF *buffer)
dcb->dcb_readqueue = gwbuf_append(dcb->dcb_readqueue, buffer); dcb->dcb_readqueue = gwbuf_append(dcb->dcb_readqueue, buffer);
} }
static void dcb_add_to_worker_list(int thread_id, void* data)
{
DCB *dcb = (DCB*)data;
ss_dassert(thread_id == dcb->poll.thread.id);
dcb_add_to_list(dcb);
}
void dcb_add_to_list(DCB *dcb) void dcb_add_to_list(DCB *dcb)
{ {
if (dcb->dcb_role != DCB_ROLE_SERVICE_LISTENER || if (dcb->dcb_role != DCB_ROLE_SERVICE_LISTENER ||
@ -3065,20 +3075,34 @@ void dcb_add_to_list(DCB *dcb)
* as that part is done in the final zombie processing. * as that part is done in the final zombie processing.
*/ */
spinlock_acquire(&all_dcbs_lock[dcb->poll.thread.id]); int worker_id = mxs_worker_get_current_id();
if (all_dcbs[dcb->poll.thread.id] == NULL) if (worker_id == dcb->poll.thread.id)
{ {
all_dcbs[dcb->poll.thread.id] = dcb; if (all_dcbs[dcb->poll.thread.id] == NULL)
all_dcbs[dcb->poll.thread.id]->thread.tail = dcb; {
all_dcbs[dcb->poll.thread.id] = dcb;
all_dcbs[dcb->poll.thread.id]->thread.tail = dcb;
}
else
{
all_dcbs[dcb->poll.thread.id]->thread.tail->thread.next = dcb;
all_dcbs[dcb->poll.thread.id]->thread.tail = dcb;
}
} }
else else
{ {
all_dcbs[dcb->poll.thread.id]->thread.tail->thread.next = dcb; MXS_WORKER* worker = mxs_worker_get(dcb->poll.thread.id);
all_dcbs[dcb->poll.thread.id]->thread.tail = dcb; ss_dassert(worker);
}
spinlock_release(&all_dcbs_lock[dcb->poll.thread.id]); intptr_t arg1 = (intptr_t)dcb_add_to_worker_list;
intptr_t arg2 = (intptr_t)dcb;
if (!mxs_worker_post_message(worker, MXS_WORKER_MSG_CALL, arg1, arg2))
{
MXS_ERROR("Could not post DCB to worker.");
}
}
} }
} }
@ -3089,7 +3113,7 @@ void dcb_add_to_list(DCB *dcb)
*/ */
static void dcb_remove_from_list(DCB *dcb) static void dcb_remove_from_list(DCB *dcb)
{ {
spinlock_acquire(&all_dcbs_lock[dcb->poll.thread.id]); ss_dassert(mxs_worker_get_current_id() == dcb->poll.thread.id);
if (dcb == all_dcbs[dcb->poll.thread.id]) if (dcb == all_dcbs[dcb->poll.thread.id])
{ {
@ -3126,8 +3150,6 @@ static void dcb_remove_from_list(DCB *dcb)
* again, it will be in a clean state. */ * again, it will be in a clean state. */
dcb->thread.next = NULL; dcb->thread.next = NULL;
dcb->thread.tail = NULL; dcb->thread.tail = NULL;
spinlock_release(&all_dcbs_lock[dcb->poll.thread.id]);
} }
/** /**

View File

@ -317,7 +317,7 @@ static int add_fd_to_workers(int fd, uint32_t events, MXS_POLL_DATA* data)
ev.events = events; ev.events = events;
ev.data.ptr = data; ev.data.ptr = data;
data->thread.id = 0; // In this case, the data will appear to be on the main thread. data->thread.id = current_thread_id; // The DCB will appear on the list of the calling thread.
int stored_errno = 0; int stored_errno = 0;
int rc = 0; int rc = 0;

View File

@ -19,17 +19,32 @@
#include <maxscale/alloc.h> #include <maxscale/alloc.h>
#include <maxscale/config.h> #include <maxscale/config.h>
#include <maxscale/log_manager.h> #include <maxscale/log_manager.h>
#include <maxscale/platform.h>
#include "maxscale/modules.h" #include "maxscale/modules.h"
#include "maxscale/poll.h" #include "maxscale/poll.h"
#define WORKER_ABSENT_ID -1
/** /**
* Unit variables. * Unit variables.
*/ */
static struct worker_unit static struct this_unit
{ {
int n_workers; int n_workers; // How many workers there are.
MXS_WORKER** workers; MXS_WORKER** workers; // Array of worker instances.
} this_unit; } this_unit =
{
0,
NULL
};
static thread_local struct this_thread
{
int current_worker_id; // The worker id of the current thread
} this_thread =
{
WORKER_ABSENT_ID
};
/** /**
* Structure used for sending cross-thread messages. * Structure used for sending cross-thread messages.
@ -97,6 +112,25 @@ MXS_WORKER* mxs_worker_get(int worker_id)
return this_unit.workers[worker_id]; return this_unit.workers[worker_id];
} }
MXS_WORKER* mxs_worker_get_current()
{
MXS_WORKER* worker = NULL;
int worker_id = this_thread.current_worker_id;
if (worker_id != WORKER_ABSENT_ID)
{
worker = mxs_worker_get(worker_id);
}
return worker;
}
int mxs_worker_get_current_id()
{
return this_thread.current_worker_id;
}
bool mxs_worker_post_message(MXS_WORKER *worker, uint32_t msg_id, intptr_t arg1, intptr_t arg2) bool mxs_worker_post_message(MXS_WORKER *worker, uint32_t msg_id, intptr_t arg1, intptr_t arg2)
{ {
// NOTE: No logging here, this function must be signal safe. // NOTE: No logging here, this function must be signal safe.
@ -129,7 +163,9 @@ size_t mxs_worker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg
void mxs_worker_main(MXS_WORKER* worker) void mxs_worker_main(MXS_WORKER* worker)
{ {
this_thread.current_worker_id = worker->id;
poll_waitevents(worker); poll_waitevents(worker);
this_thread.current_worker_id = WORKER_ABSENT_ID;
MXS_NOTICE("Worker %d has shut down.", worker->id); MXS_NOTICE("Worker %d has shut down.", worker->id);
} }