MXS-1915 Remove id from mxs::Worker
The id has now been moved from mxs::Worker to mxs::RoutingWorker and the implications are felt in many places. The primary need for the id was to be able to access worker specfic data, maintained outside of a routing worker, when given a worker (the id is used to index into an array). Slightly related to that was the need to be able to iterate over all workers. That obviously implies some kind of collection. That causes all sorts of issues if there is a need for being able to create and destroy a worker at runtime. With the id removed from mxs::Worker all those issues are gone, and its perfectly ok to create and destory mxs::Workers as needed. Further, while there is a need to broadcast a particular message to all _routing_ workers, it hardly makes sense to broadcast a particular message too _all_ workers. Consequently, only routing workers are kept in a collection and all static member functions dealing with all workers (e.g. broadcast) have now been moved to mxs::RoutingWorker. Now, instead of passing the id around we instead deal directly with the worker pointer. Later the data in all those external arrays will be moved into mxs::[Worker|RoutingWorker] so that worker related data is maintained in exactly one place.
This commit is contained in:
@ -115,11 +115,11 @@ static int dcb_listen_create_socket_unix(const char *path);
|
||||
static int dcb_set_socket_option(int sockfd, int level, int optname, void *optval, socklen_t optlen);
|
||||
static void dcb_add_to_all_list(DCB *dcb);
|
||||
static void dcb_add_to_list(DCB *dcb);
|
||||
static bool dcb_add_to_worker(int worker_id, DCB *dcb, uint32_t events);
|
||||
static bool dcb_add_to_worker(Worker* worker, DCB *dcb, uint32_t events);
|
||||
static DCB *dcb_find_free();
|
||||
static void dcb_remove_from_list(DCB *dcb);
|
||||
|
||||
static uint32_t dcb_poll_handler(MXS_POLL_DATA *data, int thread_id, uint32_t events);
|
||||
static uint32_t dcb_poll_handler(MXS_POLL_DATA *data, void* worker, uint32_t events);
|
||||
static uint32_t dcb_process_poll_events(DCB *dcb, uint32_t ev);
|
||||
static bool dcb_session_check(DCB *dcb, const char *);
|
||||
static int upstream_throttle_callback(DCB *dcb, DCB_REASON reason, void *userdata);
|
||||
@ -1102,12 +1102,12 @@ void dcb_close(DCB *dcb)
|
||||
CHK_DCB(dcb);
|
||||
|
||||
#if defined(SS_DEBUG)
|
||||
int wid = Worker::get_current_id();
|
||||
RoutingWorker* current = RoutingWorker::get_current();
|
||||
RoutingWorker* owner = static_cast<RoutingWorker*>(dcb->poll.owner);
|
||||
if ((wid != -1) && (owner->id() != wid))
|
||||
if (current && (current != owner))
|
||||
{
|
||||
MXS_ALERT("dcb_close(%p) called by %d, owned by %d.",
|
||||
dcb, wid, owner->id());
|
||||
dcb, current->id(), owner->id());
|
||||
ss_dassert(owner == RoutingWorker::get_current());
|
||||
}
|
||||
#endif
|
||||
@ -1166,7 +1166,7 @@ void dcb_close(DCB *dcb)
|
||||
}
|
||||
}
|
||||
|
||||
static void cb_dcb_close_in_owning_thread(int worker_id, void* data)
|
||||
static void cb_dcb_close_in_owning_thread(MXS_WORKER*, void* data)
|
||||
{
|
||||
DCB* dcb = static_cast<DCB*>(data);
|
||||
ss_dassert(dcb);
|
||||
@ -1197,13 +1197,13 @@ void dcb_close_in_owning_thread(DCB* dcb)
|
||||
void dcb_final_close(DCB* dcb)
|
||||
{
|
||||
#if defined(SS_DEBUG)
|
||||
int wid = Worker::get_current_id();
|
||||
RoutingWorker* current = RoutingWorker::get_current();
|
||||
RoutingWorker* owner = static_cast<RoutingWorker*>(dcb->poll.owner);
|
||||
if ((wid != -1) && (owner->id() != wid))
|
||||
if (current && (current != owner))
|
||||
{
|
||||
MXS_ALERT("dcb_final_close(%p) called by %d, owned by %d.",
|
||||
dcb, wid, owner->id());
|
||||
ss_dassert(owner->id() == Worker::get_current_id());
|
||||
dcb, current->id(), owner->id());
|
||||
ss_dassert(owner == current);
|
||||
}
|
||||
#endif
|
||||
ss_dassert(dcb->n_close != 0);
|
||||
@ -2000,9 +2000,12 @@ dcb_call_callback(DCB *dcb, DCB_REASON reason)
|
||||
}
|
||||
}
|
||||
|
||||
static void dcb_hangup_foreach_worker(int thread_id, struct server* server)
|
||||
static void dcb_hangup_foreach_worker(MXS_WORKER* worker, struct server* server)
|
||||
{
|
||||
for (DCB *dcb = this_unit.all_dcbs[thread_id]; dcb; dcb = dcb->thread.next)
|
||||
RoutingWorker* rworker = static_cast<RoutingWorker*>(worker);
|
||||
int id = rworker->id();
|
||||
|
||||
for (DCB *dcb = this_unit.all_dcbs[id]; dcb; dcb = dcb->thread.next)
|
||||
{
|
||||
if (dcb->state == DCB_STATE_POLLING && dcb->server &&
|
||||
dcb->server == server)
|
||||
@ -2024,7 +2027,7 @@ dcb_hangup_foreach(struct server* server)
|
||||
intptr_t arg1 = (intptr_t)dcb_hangup_foreach_worker;
|
||||
intptr_t arg2 = (intptr_t)server;
|
||||
|
||||
Worker::broadcast_message(MXS_WORKER_MSG_CALL, arg1, arg2);
|
||||
RoutingWorker::broadcast_message(MXS_WORKER_MSG_CALL, arg1, arg2);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -2917,7 +2920,8 @@ public:
|
||||
|
||||
void execute(Worker& worker)
|
||||
{
|
||||
int thread_id = worker.id();
|
||||
RoutingWorker& rworker = static_cast<RoutingWorker&>(worker);
|
||||
int thread_id = rworker.id();
|
||||
|
||||
for (DCB *dcb = this_unit.all_dcbs[thread_id];
|
||||
dcb && atomic_load_int32(&m_more);
|
||||
@ -2950,7 +2954,7 @@ private:
|
||||
bool dcb_foreach(bool(*func)(DCB *dcb, void *data), void *data)
|
||||
{
|
||||
SerialDcbTask task(func, data);
|
||||
Worker::execute_serially(task);
|
||||
RoutingWorker::execute_serially(task);
|
||||
return task.more();
|
||||
}
|
||||
|
||||
@ -2967,7 +2971,8 @@ public:
|
||||
|
||||
void execute(Worker& worker)
|
||||
{
|
||||
int thread_id = worker.id();
|
||||
RoutingWorker& rworker = static_cast<RoutingWorker&>(worker);
|
||||
int thread_id = rworker.id();
|
||||
|
||||
for (DCB *dcb = this_unit.all_dcbs[thread_id]; dcb; dcb = dcb->thread.next)
|
||||
{
|
||||
@ -2986,7 +2991,7 @@ private:
|
||||
void dcb_foreach_parallel(bool(*func)(DCB *dcb, void *data), void **data)
|
||||
{
|
||||
ParallelDcbTask task(func, data);
|
||||
Worker::execute_concurrently(task);
|
||||
RoutingWorker::execute_concurrently(task);
|
||||
}
|
||||
|
||||
int dcb_get_port(const DCB *dcb)
|
||||
@ -3014,7 +3019,7 @@ int dcb_get_port(const DCB *dcb)
|
||||
static uint32_t dcb_process_poll_events(DCB *dcb, uint32_t events)
|
||||
{
|
||||
RoutingWorker* owner = static_cast<RoutingWorker*>(dcb->poll.owner);
|
||||
ss_dassert(owner->id() == mxs::Worker::get_current_id() ||
|
||||
ss_dassert(owner == RoutingWorker::get_current() ||
|
||||
dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER);
|
||||
|
||||
CHK_DCB(dcb);
|
||||
@ -3220,7 +3225,7 @@ static uint32_t dcb_handler(DCB* dcb, uint32_t events)
|
||||
return rv;
|
||||
}
|
||||
|
||||
static uint32_t dcb_poll_handler(MXS_POLL_DATA *data, int thread_id, uint32_t events)
|
||||
static uint32_t dcb_poll_handler(MXS_POLL_DATA *data, void* worker, uint32_t events)
|
||||
{
|
||||
uint32_t rval = 0;
|
||||
DCB *dcb = (DCB*)data;
|
||||
@ -3277,7 +3282,10 @@ public:
|
||||
|
||||
void execute(Worker& worker)
|
||||
{
|
||||
if (dcb_is_still_valid(m_dcb, worker.get_current_id()))
|
||||
ss_dassert(&worker == RoutingWorker::get_current());
|
||||
|
||||
RoutingWorker& rworker = static_cast<RoutingWorker&>(worker);
|
||||
if (dcb_is_still_valid(m_dcb, rworker.id()))
|
||||
{
|
||||
m_dcb->fakeq = m_buffer;
|
||||
dcb_handler(m_dcb, m_ev);
|
||||
@ -3423,7 +3431,7 @@ public:
|
||||
|
||||
ss_dassert(rworker.id() == static_cast<RoutingWorker*>(m_dcb->poll.owner)->id());
|
||||
|
||||
bool added = dcb_add_to_worker(worker.id(), m_dcb, m_events);
|
||||
bool added = dcb_add_to_worker(&rworker, m_dcb, m_events);
|
||||
ss_dassert(added);
|
||||
|
||||
if (!added)
|
||||
@ -3439,20 +3447,54 @@ private:
|
||||
|
||||
}
|
||||
|
||||
static bool dcb_add_to_worker(int worker_id, DCB* dcb, uint32_t events)
|
||||
static bool add_fd_to_routing_workers(int fd, uint32_t events, MXS_POLL_DATA* data)
|
||||
{
|
||||
bool rv = true;
|
||||
void* previous_owner = data->owner;
|
||||
|
||||
rv = RoutingWorker::add_shared_fd(fd, events, data);
|
||||
|
||||
if (rv)
|
||||
{
|
||||
// The DCB will appear on the list of the calling thread.
|
||||
RoutingWorker* worker = RoutingWorker::get_current();
|
||||
|
||||
if (!worker)
|
||||
{
|
||||
// TODO: Listeners are created before the workers have been started.
|
||||
// TODO: Hence there will be no current worker. So, we just store them
|
||||
// TODO: in the main worker.
|
||||
worker = RoutingWorker::get(RoutingWorker::MAIN);
|
||||
}
|
||||
|
||||
data->owner = worker;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Restore the situation.
|
||||
data->owner = previous_owner;
|
||||
}
|
||||
|
||||
return rv;
|
||||
}
|
||||
|
||||
static bool dcb_add_to_worker(Worker* worker, DCB* dcb, uint32_t events)
|
||||
{
|
||||
bool rv = false;
|
||||
|
||||
if (worker_id == MXS_WORKER_ALL)
|
||||
if (!worker)
|
||||
{
|
||||
// A listening DCB, we add it immediately (poll_add_fd_to_worker() is thread-safe).
|
||||
if (poll_add_fd_to_worker(worker_id, dcb->fd, events, (MXS_POLL_DATA*)dcb))
|
||||
// No specific worker; indicates the DCB is a listening DCB.
|
||||
ss_dassert(dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER);
|
||||
|
||||
// A listening DCB, we add it immediately.
|
||||
if (add_fd_to_routing_workers(dcb->fd, events, (MXS_POLL_DATA*)dcb))
|
||||
{
|
||||
// If this takes place on the main thread (all listening DCBs are
|
||||
// stored on the main thread),
|
||||
// stored on the main thread)...
|
||||
if (dcb->poll.owner == RoutingWorker::get_current())
|
||||
{
|
||||
// we'll add it immediately to the list,
|
||||
// ..we'll add it immediately to the list,
|
||||
dcb_add_to_list(dcb);
|
||||
}
|
||||
else
|
||||
@ -3476,13 +3518,13 @@ static bool dcb_add_to_worker(int worker_id, DCB* dcb, uint32_t events)
|
||||
}
|
||||
else
|
||||
{
|
||||
ss_dassert(worker_id == static_cast<RoutingWorker*>(dcb->poll.owner)->id());
|
||||
ss_dassert(worker == dcb->poll.owner);
|
||||
|
||||
if (worker_id == RoutingWorker::get_current_id())
|
||||
if (worker == RoutingWorker::get_current())
|
||||
{
|
||||
// If the DCB should end up on the current thread, we can both add it
|
||||
// to the epoll-instance and to the DCB book-keeping immediately.
|
||||
if (poll_add_fd_to_worker(worker_id, dcb->fd, events, (MXS_POLL_DATA*)dcb))
|
||||
if (worker->add_fd(dcb->fd, events, (MXS_POLL_DATA*)dcb))
|
||||
{
|
||||
dcb_add_to_list(dcb);
|
||||
rv = true;
|
||||
@ -3534,7 +3576,7 @@ int poll_add_dcb(DCB *dcb)
|
||||
|
||||
/** Choose new state and worker thread ID according to the role of DCB. */
|
||||
dcb_state_t new_state;
|
||||
int worker_id = 0;
|
||||
RoutingWorker* owner = nullptr;
|
||||
|
||||
if (dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER)
|
||||
{
|
||||
@ -3545,7 +3587,6 @@ int poll_add_dcb(DCB *dcb)
|
||||
*/
|
||||
events = EPOLLIN;
|
||||
new_state = DCB_STATE_LISTENING;
|
||||
worker_id = MXS_WORKER_ALL;
|
||||
}
|
||||
else if (dcb->dcb_role == DCB_ROLE_CLIENT_HANDLER &&
|
||||
(strcasecmp(dcb->service->routerModule, "cli") == 0 ||
|
||||
@ -3557,9 +3598,8 @@ int poll_add_dcb(DCB *dcb)
|
||||
// handled by different worker threads.
|
||||
// See: https://jira.mariadb.org/browse/MXS-1805 and https://jira.mariadb.org/browse/MXS-1833
|
||||
new_state = DCB_STATE_POLLING;
|
||||
RoutingWorker* owner = RoutingWorker::get(RoutingWorker::MAIN);
|
||||
owner = RoutingWorker::get(RoutingWorker::MAIN);
|
||||
dcb->poll.owner = owner;
|
||||
worker_id = owner->id();
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -3569,7 +3609,7 @@ int poll_add_dcb(DCB *dcb)
|
||||
ss_dassert(RoutingWorker::get_current() == dcb->poll.owner);
|
||||
|
||||
new_state = DCB_STATE_POLLING;
|
||||
worker_id = static_cast<RoutingWorker*>(dcb->poll.owner)->id();
|
||||
owner = static_cast<RoutingWorker*>(dcb->poll.owner);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -3581,7 +3621,7 @@ int poll_add_dcb(DCB *dcb)
|
||||
|
||||
int rc = 0;
|
||||
|
||||
if (!dcb_add_to_worker(worker_id, dcb, events))
|
||||
if (!dcb_add_to_worker(owner, dcb, events))
|
||||
{
|
||||
/**
|
||||
* We failed to add the DCB to a worker. Revert the state so that it
|
||||
@ -3630,24 +3670,24 @@ int poll_remove_dcb(DCB *dcb)
|
||||
|
||||
if (dcbfd > 0)
|
||||
{
|
||||
int worker_id;
|
||||
rc = -1;
|
||||
|
||||
if (dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER)
|
||||
{
|
||||
worker_id = MXS_WORKER_ALL;
|
||||
if (RoutingWorker::remove_shared_fd(dcbfd))
|
||||
{
|
||||
rc = 0;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
worker_id = static_cast<RoutingWorker*>(dcb->poll.owner)->id();
|
||||
}
|
||||
Worker* worker = static_cast<Worker*>(dcb->poll.owner);
|
||||
ss_dassert(worker);
|
||||
|
||||
if (poll_remove_fd_from_worker(worker_id, dcbfd))
|
||||
{
|
||||
rc = 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
rc = -1;
|
||||
if (worker->remove_fd(dcbfd))
|
||||
{
|
||||
rc = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
return rc;
|
||||
|
||||
Reference in New Issue
Block a user