MXS-1915 Replace worker id with worker pointer

To get rid of the need that a Worker must have an id, we store
in the MXS_POLL_DATA structure a pointer to the owning worker
instead of the id of the owning worker. This also allows some
further cleanup as the need for switching back and forth between
the id and the worker disappears.

The id will be moved from Worker to RoutingWorker as there
currently is a fair amount of code that assumes that the id of
routing workers start from 0.
This commit is contained in:
Johan Wikman 2018-06-20 14:54:20 +03:00
parent 241c9b645d
commit 86b5238aaf
8 changed files with 73 additions and 66 deletions

View File

@ -49,10 +49,7 @@ typedef uint32_t (*mxs_poll_handler_t)(struct mxs_poll_data* data, int wid, uint
typedef struct mxs_poll_data
{
mxs_poll_handler_t handler; /*< Handler for this particular kind of mxs_poll_data. */
struct
{
int id; /*< The id of the worker thread. */
} thread;
void* owner; /*< Owning worker. */
} MXS_POLL_DATA;
/**

View File

@ -23,13 +23,13 @@ struct MxsPollData : MXS_POLL_DATA
MxsPollData()
{
handler = NULL;
thread.id = 0;
owner = nullptr;
}
MxsPollData(mxs_poll_handler_t h)
{
handler = h;
thread.id = 0;
owner = nullptr;
}
};

View File

@ -205,13 +205,13 @@ dcb_alloc(dcb_role_t role, SERV_LISTENER *listener)
if (role == DCB_ROLE_SERVICE_LISTENER)
{
/** All listeners are owned by the main thread (i.e. thread no. 0) */
newdcb->poll.thread.id = 0;
newdcb->poll.owner = RoutingWorker::get(RoutingWorker::MAIN);
}
else
{
/** Otherwise the DCB is owned by the thread that allocates it */
ss_dassert(RoutingWorker::get_current_id() != -1);
newdcb->poll.thread.id = RoutingWorker::get_current_id();
newdcb->poll.owner = RoutingWorker::get_current();
}
return newdcb;
@ -345,7 +345,7 @@ dcb_free_all_memory(DCB *dcb)
}
// Ensure that id is immediately the wrong one.
dcb->poll.thread.id = 0xdeadbeef;
dcb->poll.owner = reinterpret_cast<void*>(0xdeadbeef);
MXS_FREE(dcb);
}
@ -393,8 +393,8 @@ dcb_connect(SERVER *server, MXS_SESSION *session, const char *protocol)
if (user && strlen(user))
{
MXS_DEBUG("Looking for persistent connection DCB user %s protocol %s", user, protocol);
dcb = server_get_persistent(server, user, session->client_dcb->remote,
protocol, session->client_dcb->poll.thread.id);
dcb = server_get_persistent(server, user, session->client_dcb->remote, protocol,
static_cast<RoutingWorker*>(session->client_dcb->poll.owner)->id());
if (dcb)
{
/**
@ -1103,11 +1103,12 @@ void dcb_close(DCB *dcb)
#if defined(SS_DEBUG)
int wid = Worker::get_current_id();
if ((wid != -1) && (dcb->poll.thread.id != wid))
RoutingWorker* owner = static_cast<RoutingWorker*>(dcb->poll.owner);
if ((wid != -1) && (owner->id() != wid))
{
MXS_ALERT("dcb_close(%p) called by %d, owned by %d.",
dcb, wid, dcb->poll.thread.id);
ss_dassert(dcb->poll.thread.id == Worker::get_current_id());
dcb, wid, owner->id());
ss_dassert(owner == RoutingWorker::get_current());
}
#endif
@ -1150,7 +1151,7 @@ void dcb_close(DCB *dcb)
}
else
{
RoutingWorker* worker = RoutingWorker::get(dcb->poll.thread.id);
RoutingWorker* worker = static_cast<RoutingWorker*>(dcb->poll.owner);
ss_dassert(worker);
worker->register_zombie(dcb);
@ -1181,7 +1182,7 @@ void dcb_close_in_owning_thread(DCB* dcb)
// TODO: reference counted, so that we could addref before posting, thus
// TODO: preventing too early a deletion.
MXS_WORKER* worker = mxs_rworker_get(dcb->poll.thread.id); // The owning worker
MXS_WORKER* worker = static_cast<MXS_WORKER*>(dcb->poll.owner); // The owning worker
ss_dassert(worker);
intptr_t arg1 = (intptr_t)cb_dcb_close_in_owning_thread;
@ -1197,11 +1198,12 @@ void dcb_final_close(DCB* dcb)
{
#if defined(SS_DEBUG)
int wid = Worker::get_current_id();
if ((wid != -1) && (dcb->poll.thread.id != wid))
RoutingWorker* owner = static_cast<RoutingWorker*>(dcb->poll.owner);
if ((wid != -1) && (owner->id() != wid))
{
MXS_ALERT("dcb_final_close(%p) called by %d, owned by %d.",
dcb, wid, dcb->poll.thread.id);
ss_dassert(dcb->poll.thread.id == Worker::get_current_id());
dcb, wid, owner->id());
ss_dassert(owner->id() == Worker::get_current_id());
}
#endif
ss_dassert(dcb->n_close != 0);
@ -1298,6 +1300,7 @@ void dcb_final_close(DCB* dcb)
static bool
dcb_maybe_add_persistent(DCB *dcb)
{
RoutingWorker* owner = static_cast<RoutingWorker*>(dcb->poll.owner);
if (dcb->user != NULL
&& (dcb->func.established == NULL || dcb->func.established(dcb))
&& strlen(dcb->user)
@ -1308,7 +1311,7 @@ dcb_maybe_add_persistent(DCB *dcb)
&& (dcb->server->status & SERVER_RUNNING)
&& !dcb->dcb_errhandle_called
&& !(dcb->flags & DCBF_HUNG)
&& dcb_persistent_clean_count(dcb, dcb->poll.thread.id, false) < dcb->server->persistpoolmax
&& dcb_persistent_clean_count(dcb, owner->id(), false) < dcb->server->persistpoolmax
&& dcb->server->stats.n_persistent < dcb->server->persistpoolmax)
{
DCB_CALLBACK *loopcallback;
@ -1345,8 +1348,8 @@ dcb_maybe_add_persistent(DCB *dcb)
dcb->delayq = NULL;
dcb->writeq = NULL;
dcb->nextpersistent = dcb->server->persistent[dcb->poll.thread.id];
dcb->server->persistent[dcb->poll.thread.id] = dcb;
dcb->nextpersistent = dcb->server->persistent[owner->id()];
dcb->server->persistent[owner->id()] = dcb;
atomic_add(&dcb->server->stats.n_persistent, 1);
atomic_add(&dcb->server->stats.n_current, -1);
return true;
@ -2768,7 +2771,7 @@ static void dcb_add_to_list_cb(int thread_id, void* data)
{
DCB *dcb = (DCB*)data;
ss_dassert(thread_id == dcb->poll.thread.id);
ss_dassert(thread_id == static_cast<RoutingWorker*>(dcb->poll.owner)->id());
dcb_add_to_list(dcb);
}
@ -2783,17 +2786,18 @@ static void dcb_add_to_list(DCB *dcb)
* is not in the list. Stopped listeners are not removed from the list.
*/
ss_dassert(dcb->poll.thread.id == RoutingWorker::get_current_id());
int id = static_cast<RoutingWorker*>(dcb->poll.owner)->id();
ss_dassert(id == RoutingWorker::get_current_id());
if (this_unit.all_dcbs[dcb->poll.thread.id] == NULL)
if (this_unit.all_dcbs[id] == NULL)
{
this_unit.all_dcbs[dcb->poll.thread.id] = dcb;
this_unit.all_dcbs[dcb->poll.thread.id]->thread.tail = dcb;
this_unit.all_dcbs[id] = dcb;
this_unit.all_dcbs[id]->thread.tail = dcb;
}
else
{
this_unit.all_dcbs[dcb->poll.thread.id]->thread.tail->thread.next = dcb;
this_unit.all_dcbs[dcb->poll.thread.id]->thread.tail = dcb;
this_unit.all_dcbs[id]->thread.tail->thread.next = dcb;
this_unit.all_dcbs[id]->thread.tail = dcb;
}
}
}
@ -2807,14 +2811,16 @@ static void dcb_remove_from_list(DCB *dcb)
{
if (dcb->dcb_role != DCB_ROLE_SERVICE_LISTENER)
{
if (dcb == this_unit.all_dcbs[dcb->poll.thread.id])
{
DCB *tail = this_unit.all_dcbs[dcb->poll.thread.id]->thread.tail;
this_unit.all_dcbs[dcb->poll.thread.id] = this_unit.all_dcbs[dcb->poll.thread.id]->thread.next;
int id = static_cast<RoutingWorker*>(dcb->poll.owner)->id();
if (this_unit.all_dcbs[dcb->poll.thread.id])
if (dcb == this_unit.all_dcbs[id])
{
DCB *tail = this_unit.all_dcbs[id]->thread.tail;
this_unit.all_dcbs[id] = this_unit.all_dcbs[id]->thread.next;
if (this_unit.all_dcbs[id])
{
this_unit.all_dcbs[dcb->poll.thread.id]->thread.tail = tail;
this_unit.all_dcbs[id]->thread.tail = tail;
}
}
else
@ -2822,16 +2828,16 @@ static void dcb_remove_from_list(DCB *dcb)
// If the creation of the DCB failed, it will not have been added
// to the list at all. And if it happened to be the first DCB to be
// created, then `prev` is NULL at this point.
DCB *prev = this_unit.all_dcbs[dcb->poll.thread.id];
DCB *prev = this_unit.all_dcbs[id];
DCB *current = prev ? prev->thread.next : NULL;
while (current)
{
if (current == dcb)
{
if (current == this_unit.all_dcbs[dcb->poll.thread.id]->thread.tail)
if (current == this_unit.all_dcbs[id]->thread.tail)
{
this_unit.all_dcbs[dcb->poll.thread.id]->thread.tail = prev;
this_unit.all_dcbs[id]->thread.tail = prev;
}
prev->thread.next = current->thread.next;
break;
@ -3007,7 +3013,8 @@ int dcb_get_port(const DCB *dcb)
static uint32_t dcb_process_poll_events(DCB *dcb, uint32_t events)
{
ss_dassert(dcb->poll.thread.id == mxs::Worker::get_current_id() ||
RoutingWorker* owner = static_cast<RoutingWorker*>(dcb->poll.owner);
ss_dassert(owner->id() == mxs::Worker::get_current_id() ||
dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER);
CHK_DCB(dcb);
@ -3031,7 +3038,7 @@ static uint32_t dcb_process_poll_events(DCB *dcb, uint32_t events)
if (dcb->n_close != 0)
{
MXS_WARNING("Events reported for dcb(%p), owned by %d, that has been closed %" PRIu32 " times.",
dcb, dcb->poll.thread.id, dcb->n_close);
dcb, owner->id(), dcb->n_close);
ss_dassert(!true);
return rc;
}
@ -3313,7 +3320,7 @@ static void poll_add_event_to_dcb(DCB* dcb, GWBUF* buf, uint32_t ev)
if (task)
{
Worker* worker = Worker::get(dcb->poll.thread.id);
RoutingWorker* worker = static_cast<RoutingWorker*>(dcb->poll.owner);
worker->post(std::auto_ptr<FakeEventTask>(task), mxs::Worker::EXECUTE_QUEUED);
}
else
@ -3412,7 +3419,9 @@ public:
void execute(Worker& worker)
{
ss_dassert(worker.id() == m_dcb->poll.thread.id);
RoutingWorker& rworker = static_cast<RoutingWorker&>(worker);
ss_dassert(rworker.id() == static_cast<RoutingWorker*>(m_dcb->poll.owner)->id());
bool added = dcb_add_to_worker(worker.id(), m_dcb, m_events);
ss_dassert(added);
@ -3441,7 +3450,7 @@ static bool dcb_add_to_worker(int worker_id, DCB* dcb, uint32_t events)
{
// If this takes place on the main thread (all listening DCBs are
// stored on the main thread),
if (dcb->poll.thread.id == RoutingWorker::get_current_id())
if (dcb->poll.owner == RoutingWorker::get_current())
{
// we'll add it immediately to the list,
dcb_add_to_list(dcb);
@ -3450,7 +3459,7 @@ static bool dcb_add_to_worker(int worker_id, DCB* dcb, uint32_t events)
{
// otherwise we must move the adding to the main thread.
// TODO: Separate listening and other DCBs, as this is a mess.
Worker* worker = RoutingWorker::get(dcb->poll.thread.id);
RoutingWorker* worker = static_cast<RoutingWorker*>(dcb->poll.owner);
ss_dassert(worker);
intptr_t arg1 = (intptr_t)dcb_add_to_list_cb;
@ -3467,7 +3476,7 @@ static bool dcb_add_to_worker(int worker_id, DCB* dcb, uint32_t events)
}
else
{
ss_dassert(worker_id == dcb->poll.thread.id);
ss_dassert(worker_id == static_cast<RoutingWorker*>(dcb->poll.owner)->id());
if (worker_id == RoutingWorker::get_current_id())
{
@ -3489,7 +3498,7 @@ static bool dcb_add_to_worker(int worker_id, DCB* dcb, uint32_t events)
if (task)
{
Worker* worker = RoutingWorker::get(dcb->poll.thread.id);
Worker* worker = static_cast<RoutingWorker*>(dcb->poll.owner);
ss_dassert(worker);
if (worker->post(std::auto_ptr<AddDcbToWorker>(task), mxs::Worker::EXECUTE_QUEUED))
@ -3548,18 +3557,19 @@ int poll_add_dcb(DCB *dcb)
// handled by different worker threads.
// See: https://jira.mariadb.org/browse/MXS-1805 and https://jira.mariadb.org/browse/MXS-1833
new_state = DCB_STATE_POLLING;
dcb->poll.thread.id = 0;
worker_id = dcb->poll.thread.id;
RoutingWorker* owner = RoutingWorker::get(RoutingWorker::MAIN);
dcb->poll.owner = owner;
worker_id = owner->id();
}
else
{
ss_dassert(dcb->dcb_role == DCB_ROLE_CLIENT_HANDLER ||
dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER);
ss_dassert(RoutingWorker::get_current_id() != -1);
ss_dassert(RoutingWorker::get_current_id() == dcb->poll.thread.id);
ss_dassert(RoutingWorker::get_current() == dcb->poll.owner);
new_state = DCB_STATE_POLLING;
worker_id = dcb->poll.thread.id;
worker_id = static_cast<RoutingWorker*>(dcb->poll.owner)->id();
}
/**
@ -3580,7 +3590,7 @@ int poll_add_dcb(DCB *dcb)
* debug asserts will be triggered.
*/
dcb->state = old_state;
dcb->poll.thread.id = Worker::get_current_id();
dcb->poll.owner = RoutingWorker::get_current();
rc = -1;
}
@ -3628,7 +3638,7 @@ int poll_remove_dcb(DCB *dcb)
}
else
{
worker_id = dcb->poll.thread.id;
worker_id = static_cast<RoutingWorker*>(dcb->poll.owner)->id();
}
if (poll_remove_fd_from_worker(worker_id, dcbfd))

View File

@ -67,30 +67,30 @@ static bool add_fd_to_worker(int wid, int fd, uint32_t events, MXS_POLL_DATA* da
static bool add_fd_to_routing_workers(int fd, uint32_t events, MXS_POLL_DATA* data)
{
bool rv = true;
int thread_id = data->thread.id;
void* previous_owner = data->owner;
rv = RoutingWorker::add_shared_fd(fd, events, data);
if (rv)
{
// The DCB will appear on the list of the calling thread.
int wid = RoutingWorker::get_current_id();
RoutingWorker* worker = RoutingWorker::get_current();
if (wid == -1)
if (!worker)
{
// TODO: Listeners are created before the workers have been started.
// TODO: Hence the returned id will be -1. We change it to 0, which in
// TODO: practice will mean that they will end up on the Worker running
// TODO: in the main thread. This needs to be sorted out.
wid = 0;
worker = RoutingWorker::get(RoutingWorker::MAIN);
}
data->thread.id = wid;
data->owner = worker;
}
else
{
// Restore the situation.
data->thread.id = thread_id;
data->owner = previous_owner;
}
return rv;

View File

@ -163,7 +163,7 @@ namespace maxscale
RoutingWorker::RoutingWorker()
{
MXS_POLL_DATA::handler = &RoutingWorker::epoll_instance_handler;
MXS_POLL_DATA::thread.id = m_id;
MXS_POLL_DATA::owner = this;
}
RoutingWorker::~RoutingWorker()
@ -306,7 +306,7 @@ bool RoutingWorker::add_shared_fd(int fd, uint32_t events, MXS_POLL_DATA* pData)
ev.events = events;
ev.data.ptr = pData;
pData->thread.id = 0; // TODO: Remove the thread id altogether.
pData->owner = RoutingWorker::get(RoutingWorker::MAIN);
if (epoll_ctl(this_unit.epoll_listener_fd, EPOLL_CTL_ADD, fd, &ev) != 0)
{
@ -431,7 +431,7 @@ RoutingWorker::SessionsById& RoutingWorker::session_registry()
void RoutingWorker::register_zombie(DCB* pDcb)
{
ss_dassert(pDcb->poll.thread.id == m_id);
ss_dassert(pDcb->poll.owner == this);
m_zombies.push_back(pDcb);
}

View File

@ -281,7 +281,7 @@ void session_link_backend_dcb(MXS_SESSION *session, DCB *dcb)
dcb->session = session;
dcb->service = session->service;
/** Move this DCB under the same thread */
dcb->poll.thread.id = session->client_dcb->poll.thread.id;
dcb->poll.owner = session->client_dcb->poll.owner;
session->dcb_set->insert(dcb);
}
@ -1410,7 +1410,7 @@ bool session_delay_routing(MXS_SESSION* session, MXS_DOWNSTREAM down, GWBUF* buf
try
{
Worker* worker = Worker::get_current();
ss_dassert(worker == Worker::get(session->client_dcb->poll.thread.id));
ss_dassert(worker == session->client_dcb->poll.owner);
std::auto_ptr<DelayedRoutingTask> task(new DelayedRoutingTask(session, down, buffer));
// Delay the routing for at least a millisecond

View File

@ -204,7 +204,7 @@ WorkerTimer::WorkerTimer(Worker* pWorker)
, m_pWorker(pWorker)
{
MXS_POLL_DATA::handler = handler;
MXS_POLL_DATA::thread.id = m_pWorker->id();
MXS_POLL_DATA::owner = m_pWorker;
if (m_fd != -1)
{
@ -571,7 +571,7 @@ bool Worker::add_fd(int fd, uint32_t events, MXS_POLL_DATA* pData)
ev.events = events;
ev.data.ptr = pData;
pData->thread.id = m_id;
pData->owner = this;
if (epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, fd, &ev) == 0)
{

View File

@ -211,7 +211,7 @@ static void blr_start_master(void* data)
* 'client' is the fake DCB that emulates a client session:
* we need to set the poll.thread.id for the "dummy client"
*/
client->session->client_dcb->poll.thread.id = mxs_rworker_get_current_id();
client->session->client_dcb->poll.owner = mxs_rworker_get_current();
/* Connect to configured master server */
if ((router->master = dcb_connect(router->service->dbref->server,