diff --git a/include/maxscale/poll_core.h b/include/maxscale/poll_core.h index 0cc90e22b..5390df3a3 100644 --- a/include/maxscale/poll_core.h +++ b/include/maxscale/poll_core.h @@ -49,10 +49,7 @@ typedef uint32_t (*mxs_poll_handler_t)(struct mxs_poll_data* data, int wid, uint typedef struct mxs_poll_data { mxs_poll_handler_t handler; /*< Handler for this particular kind of mxs_poll_data. */ - struct - { - int id; /*< The id of the worker thread. */ - } thread; + void* owner; /*< Owning worker. */ } MXS_POLL_DATA; /** diff --git a/include/maxscale/poll_core.hh b/include/maxscale/poll_core.hh index 71e1018b0..60a642ffc 100644 --- a/include/maxscale/poll_core.hh +++ b/include/maxscale/poll_core.hh @@ -23,13 +23,13 @@ struct MxsPollData : MXS_POLL_DATA MxsPollData() { handler = NULL; - thread.id = 0; + owner = nullptr; } MxsPollData(mxs_poll_handler_t h) { handler = h; - thread.id = 0; + owner = nullptr; } }; diff --git a/server/core/dcb.cc b/server/core/dcb.cc index 38cb15c0e..9ea12fdb9 100644 --- a/server/core/dcb.cc +++ b/server/core/dcb.cc @@ -205,13 +205,13 @@ dcb_alloc(dcb_role_t role, SERV_LISTENER *listener) if (role == DCB_ROLE_SERVICE_LISTENER) { /** All listeners are owned by the main thread (i.e. thread no. 0) */ - newdcb->poll.thread.id = 0; + newdcb->poll.owner = RoutingWorker::get(RoutingWorker::MAIN); } else { /** Otherwise the DCB is owned by the thread that allocates it */ ss_dassert(RoutingWorker::get_current_id() != -1); - newdcb->poll.thread.id = RoutingWorker::get_current_id(); + newdcb->poll.owner = RoutingWorker::get_current(); } return newdcb; @@ -345,7 +345,7 @@ dcb_free_all_memory(DCB *dcb) } // Ensure that id is immediately the wrong one. - dcb->poll.thread.id = 0xdeadbeef; + dcb->poll.owner = reinterpret_cast(0xdeadbeef); MXS_FREE(dcb); } @@ -393,8 +393,8 @@ dcb_connect(SERVER *server, MXS_SESSION *session, const char *protocol) if (user && strlen(user)) { MXS_DEBUG("Looking for persistent connection DCB user %s protocol %s", user, protocol); - dcb = server_get_persistent(server, user, session->client_dcb->remote, - protocol, session->client_dcb->poll.thread.id); + dcb = server_get_persistent(server, user, session->client_dcb->remote, protocol, + static_cast(session->client_dcb->poll.owner)->id()); if (dcb) { /** @@ -1103,11 +1103,12 @@ void dcb_close(DCB *dcb) #if defined(SS_DEBUG) int wid = Worker::get_current_id(); - if ((wid != -1) && (dcb->poll.thread.id != wid)) + RoutingWorker* owner = static_cast(dcb->poll.owner); + if ((wid != -1) && (owner->id() != wid)) { MXS_ALERT("dcb_close(%p) called by %d, owned by %d.", - dcb, wid, dcb->poll.thread.id); - ss_dassert(dcb->poll.thread.id == Worker::get_current_id()); + dcb, wid, owner->id()); + ss_dassert(owner == RoutingWorker::get_current()); } #endif @@ -1150,7 +1151,7 @@ void dcb_close(DCB *dcb) } else { - RoutingWorker* worker = RoutingWorker::get(dcb->poll.thread.id); + RoutingWorker* worker = static_cast(dcb->poll.owner); ss_dassert(worker); worker->register_zombie(dcb); @@ -1181,7 +1182,7 @@ void dcb_close_in_owning_thread(DCB* dcb) // TODO: reference counted, so that we could addref before posting, thus // TODO: preventing too early a deletion. - MXS_WORKER* worker = mxs_rworker_get(dcb->poll.thread.id); // The owning worker + MXS_WORKER* worker = static_cast(dcb->poll.owner); // The owning worker ss_dassert(worker); intptr_t arg1 = (intptr_t)cb_dcb_close_in_owning_thread; @@ -1197,11 +1198,12 @@ void dcb_final_close(DCB* dcb) { #if defined(SS_DEBUG) int wid = Worker::get_current_id(); - if ((wid != -1) && (dcb->poll.thread.id != wid)) + RoutingWorker* owner = static_cast(dcb->poll.owner); + if ((wid != -1) && (owner->id() != wid)) { MXS_ALERT("dcb_final_close(%p) called by %d, owned by %d.", - dcb, wid, dcb->poll.thread.id); - ss_dassert(dcb->poll.thread.id == Worker::get_current_id()); + dcb, wid, owner->id()); + ss_dassert(owner->id() == Worker::get_current_id()); } #endif ss_dassert(dcb->n_close != 0); @@ -1298,6 +1300,7 @@ void dcb_final_close(DCB* dcb) static bool dcb_maybe_add_persistent(DCB *dcb) { + RoutingWorker* owner = static_cast(dcb->poll.owner); if (dcb->user != NULL && (dcb->func.established == NULL || dcb->func.established(dcb)) && strlen(dcb->user) @@ -1308,7 +1311,7 @@ dcb_maybe_add_persistent(DCB *dcb) && (dcb->server->status & SERVER_RUNNING) && !dcb->dcb_errhandle_called && !(dcb->flags & DCBF_HUNG) - && dcb_persistent_clean_count(dcb, dcb->poll.thread.id, false) < dcb->server->persistpoolmax + && dcb_persistent_clean_count(dcb, owner->id(), false) < dcb->server->persistpoolmax && dcb->server->stats.n_persistent < dcb->server->persistpoolmax) { DCB_CALLBACK *loopcallback; @@ -1345,8 +1348,8 @@ dcb_maybe_add_persistent(DCB *dcb) dcb->delayq = NULL; dcb->writeq = NULL; - dcb->nextpersistent = dcb->server->persistent[dcb->poll.thread.id]; - dcb->server->persistent[dcb->poll.thread.id] = dcb; + dcb->nextpersistent = dcb->server->persistent[owner->id()]; + dcb->server->persistent[owner->id()] = dcb; atomic_add(&dcb->server->stats.n_persistent, 1); atomic_add(&dcb->server->stats.n_current, -1); return true; @@ -2768,7 +2771,7 @@ static void dcb_add_to_list_cb(int thread_id, void* data) { DCB *dcb = (DCB*)data; - ss_dassert(thread_id == dcb->poll.thread.id); + ss_dassert(thread_id == static_cast(dcb->poll.owner)->id()); dcb_add_to_list(dcb); } @@ -2783,17 +2786,18 @@ static void dcb_add_to_list(DCB *dcb) * is not in the list. Stopped listeners are not removed from the list. */ - ss_dassert(dcb->poll.thread.id == RoutingWorker::get_current_id()); + int id = static_cast(dcb->poll.owner)->id(); + ss_dassert(id == RoutingWorker::get_current_id()); - if (this_unit.all_dcbs[dcb->poll.thread.id] == NULL) + if (this_unit.all_dcbs[id] == NULL) { - this_unit.all_dcbs[dcb->poll.thread.id] = dcb; - this_unit.all_dcbs[dcb->poll.thread.id]->thread.tail = dcb; + this_unit.all_dcbs[id] = dcb; + this_unit.all_dcbs[id]->thread.tail = dcb; } else { - this_unit.all_dcbs[dcb->poll.thread.id]->thread.tail->thread.next = dcb; - this_unit.all_dcbs[dcb->poll.thread.id]->thread.tail = dcb; + this_unit.all_dcbs[id]->thread.tail->thread.next = dcb; + this_unit.all_dcbs[id]->thread.tail = dcb; } } } @@ -2807,14 +2811,16 @@ static void dcb_remove_from_list(DCB *dcb) { if (dcb->dcb_role != DCB_ROLE_SERVICE_LISTENER) { - if (dcb == this_unit.all_dcbs[dcb->poll.thread.id]) - { - DCB *tail = this_unit.all_dcbs[dcb->poll.thread.id]->thread.tail; - this_unit.all_dcbs[dcb->poll.thread.id] = this_unit.all_dcbs[dcb->poll.thread.id]->thread.next; + int id = static_cast(dcb->poll.owner)->id(); - if (this_unit.all_dcbs[dcb->poll.thread.id]) + if (dcb == this_unit.all_dcbs[id]) + { + DCB *tail = this_unit.all_dcbs[id]->thread.tail; + this_unit.all_dcbs[id] = this_unit.all_dcbs[id]->thread.next; + + if (this_unit.all_dcbs[id]) { - this_unit.all_dcbs[dcb->poll.thread.id]->thread.tail = tail; + this_unit.all_dcbs[id]->thread.tail = tail; } } else @@ -2822,16 +2828,16 @@ static void dcb_remove_from_list(DCB *dcb) // If the creation of the DCB failed, it will not have been added // to the list at all. And if it happened to be the first DCB to be // created, then `prev` is NULL at this point. - DCB *prev = this_unit.all_dcbs[dcb->poll.thread.id]; + DCB *prev = this_unit.all_dcbs[id]; DCB *current = prev ? prev->thread.next : NULL; while (current) { if (current == dcb) { - if (current == this_unit.all_dcbs[dcb->poll.thread.id]->thread.tail) + if (current == this_unit.all_dcbs[id]->thread.tail) { - this_unit.all_dcbs[dcb->poll.thread.id]->thread.tail = prev; + this_unit.all_dcbs[id]->thread.tail = prev; } prev->thread.next = current->thread.next; break; @@ -3007,7 +3013,8 @@ int dcb_get_port(const DCB *dcb) static uint32_t dcb_process_poll_events(DCB *dcb, uint32_t events) { - ss_dassert(dcb->poll.thread.id == mxs::Worker::get_current_id() || + RoutingWorker* owner = static_cast(dcb->poll.owner); + ss_dassert(owner->id() == mxs::Worker::get_current_id() || dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER); CHK_DCB(dcb); @@ -3031,7 +3038,7 @@ static uint32_t dcb_process_poll_events(DCB *dcb, uint32_t events) if (dcb->n_close != 0) { MXS_WARNING("Events reported for dcb(%p), owned by %d, that has been closed %" PRIu32 " times.", - dcb, dcb->poll.thread.id, dcb->n_close); + dcb, owner->id(), dcb->n_close); ss_dassert(!true); return rc; } @@ -3313,7 +3320,7 @@ static void poll_add_event_to_dcb(DCB* dcb, GWBUF* buf, uint32_t ev) if (task) { - Worker* worker = Worker::get(dcb->poll.thread.id); + RoutingWorker* worker = static_cast(dcb->poll.owner); worker->post(std::auto_ptr(task), mxs::Worker::EXECUTE_QUEUED); } else @@ -3412,7 +3419,9 @@ public: void execute(Worker& worker) { - ss_dassert(worker.id() == m_dcb->poll.thread.id); + RoutingWorker& rworker = static_cast(worker); + + ss_dassert(rworker.id() == static_cast(m_dcb->poll.owner)->id()); bool added = dcb_add_to_worker(worker.id(), m_dcb, m_events); ss_dassert(added); @@ -3441,7 +3450,7 @@ static bool dcb_add_to_worker(int worker_id, DCB* dcb, uint32_t events) { // If this takes place on the main thread (all listening DCBs are // stored on the main thread), - if (dcb->poll.thread.id == RoutingWorker::get_current_id()) + if (dcb->poll.owner == RoutingWorker::get_current()) { // we'll add it immediately to the list, dcb_add_to_list(dcb); @@ -3450,7 +3459,7 @@ static bool dcb_add_to_worker(int worker_id, DCB* dcb, uint32_t events) { // otherwise we must move the adding to the main thread. // TODO: Separate listening and other DCBs, as this is a mess. - Worker* worker = RoutingWorker::get(dcb->poll.thread.id); + RoutingWorker* worker = static_cast(dcb->poll.owner); ss_dassert(worker); intptr_t arg1 = (intptr_t)dcb_add_to_list_cb; @@ -3467,7 +3476,7 @@ static bool dcb_add_to_worker(int worker_id, DCB* dcb, uint32_t events) } else { - ss_dassert(worker_id == dcb->poll.thread.id); + ss_dassert(worker_id == static_cast(dcb->poll.owner)->id()); if (worker_id == RoutingWorker::get_current_id()) { @@ -3489,7 +3498,7 @@ static bool dcb_add_to_worker(int worker_id, DCB* dcb, uint32_t events) if (task) { - Worker* worker = RoutingWorker::get(dcb->poll.thread.id); + Worker* worker = static_cast(dcb->poll.owner); ss_dassert(worker); if (worker->post(std::auto_ptr(task), mxs::Worker::EXECUTE_QUEUED)) @@ -3548,18 +3557,19 @@ int poll_add_dcb(DCB *dcb) // handled by different worker threads. // See: https://jira.mariadb.org/browse/MXS-1805 and https://jira.mariadb.org/browse/MXS-1833 new_state = DCB_STATE_POLLING; - dcb->poll.thread.id = 0; - worker_id = dcb->poll.thread.id; + RoutingWorker* owner = RoutingWorker::get(RoutingWorker::MAIN); + dcb->poll.owner = owner; + worker_id = owner->id(); } else { ss_dassert(dcb->dcb_role == DCB_ROLE_CLIENT_HANDLER || dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER); ss_dassert(RoutingWorker::get_current_id() != -1); - ss_dassert(RoutingWorker::get_current_id() == dcb->poll.thread.id); + ss_dassert(RoutingWorker::get_current() == dcb->poll.owner); new_state = DCB_STATE_POLLING; - worker_id = dcb->poll.thread.id; + worker_id = static_cast(dcb->poll.owner)->id(); } /** @@ -3580,7 +3590,7 @@ int poll_add_dcb(DCB *dcb) * debug asserts will be triggered. */ dcb->state = old_state; - dcb->poll.thread.id = Worker::get_current_id(); + dcb->poll.owner = RoutingWorker::get_current(); rc = -1; } @@ -3628,7 +3638,7 @@ int poll_remove_dcb(DCB *dcb) } else { - worker_id = dcb->poll.thread.id; + worker_id = static_cast(dcb->poll.owner)->id(); } if (poll_remove_fd_from_worker(worker_id, dcbfd)) diff --git a/server/core/poll.cc b/server/core/poll.cc index 5a114985c..c6d1cc4fc 100644 --- a/server/core/poll.cc +++ b/server/core/poll.cc @@ -67,30 +67,30 @@ static bool add_fd_to_worker(int wid, int fd, uint32_t events, MXS_POLL_DATA* da static bool add_fd_to_routing_workers(int fd, uint32_t events, MXS_POLL_DATA* data) { bool rv = true; - int thread_id = data->thread.id; + void* previous_owner = data->owner; rv = RoutingWorker::add_shared_fd(fd, events, data); if (rv) { // The DCB will appear on the list of the calling thread. - int wid = RoutingWorker::get_current_id(); + RoutingWorker* worker = RoutingWorker::get_current(); - if (wid == -1) + if (!worker) { // TODO: Listeners are created before the workers have been started. // TODO: Hence the returned id will be -1. We change it to 0, which in // TODO: practice will mean that they will end up on the Worker running // TODO: in the main thread. This needs to be sorted out. - wid = 0; + worker = RoutingWorker::get(RoutingWorker::MAIN); } - data->thread.id = wid; + data->owner = worker; } else { // Restore the situation. - data->thread.id = thread_id; + data->owner = previous_owner; } return rv; diff --git a/server/core/routingworker.cc b/server/core/routingworker.cc index f50531aec..472dbd6f8 100644 --- a/server/core/routingworker.cc +++ b/server/core/routingworker.cc @@ -163,7 +163,7 @@ namespace maxscale RoutingWorker::RoutingWorker() { MXS_POLL_DATA::handler = &RoutingWorker::epoll_instance_handler; - MXS_POLL_DATA::thread.id = m_id; + MXS_POLL_DATA::owner = this; } RoutingWorker::~RoutingWorker() @@ -306,7 +306,7 @@ bool RoutingWorker::add_shared_fd(int fd, uint32_t events, MXS_POLL_DATA* pData) ev.events = events; ev.data.ptr = pData; - pData->thread.id = 0; // TODO: Remove the thread id altogether. + pData->owner = RoutingWorker::get(RoutingWorker::MAIN); if (epoll_ctl(this_unit.epoll_listener_fd, EPOLL_CTL_ADD, fd, &ev) != 0) { @@ -431,7 +431,7 @@ RoutingWorker::SessionsById& RoutingWorker::session_registry() void RoutingWorker::register_zombie(DCB* pDcb) { - ss_dassert(pDcb->poll.thread.id == m_id); + ss_dassert(pDcb->poll.owner == this); m_zombies.push_back(pDcb); } diff --git a/server/core/session.cc b/server/core/session.cc index 95abd6da4..379fe9e4b 100644 --- a/server/core/session.cc +++ b/server/core/session.cc @@ -281,7 +281,7 @@ void session_link_backend_dcb(MXS_SESSION *session, DCB *dcb) dcb->session = session; dcb->service = session->service; /** Move this DCB under the same thread */ - dcb->poll.thread.id = session->client_dcb->poll.thread.id; + dcb->poll.owner = session->client_dcb->poll.owner; session->dcb_set->insert(dcb); } @@ -1410,7 +1410,7 @@ bool session_delay_routing(MXS_SESSION* session, MXS_DOWNSTREAM down, GWBUF* buf try { Worker* worker = Worker::get_current(); - ss_dassert(worker == Worker::get(session->client_dcb->poll.thread.id)); + ss_dassert(worker == session->client_dcb->poll.owner); std::auto_ptr task(new DelayedRoutingTask(session, down, buffer)); // Delay the routing for at least a millisecond diff --git a/server/core/worker.cc b/server/core/worker.cc index cef585ba6..0d26d5fa3 100644 --- a/server/core/worker.cc +++ b/server/core/worker.cc @@ -204,7 +204,7 @@ WorkerTimer::WorkerTimer(Worker* pWorker) , m_pWorker(pWorker) { MXS_POLL_DATA::handler = handler; - MXS_POLL_DATA::thread.id = m_pWorker->id(); + MXS_POLL_DATA::owner = m_pWorker; if (m_fd != -1) { @@ -571,7 +571,7 @@ bool Worker::add_fd(int fd, uint32_t events, MXS_POLL_DATA* pData) ev.events = events; ev.data.ptr = pData; - pData->thread.id = m_id; + pData->owner = this; if (epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, fd, &ev) == 0) { diff --git a/server/modules/routing/binlogrouter/blr_master.cc b/server/modules/routing/binlogrouter/blr_master.cc index f3d9031f8..2e27a97d0 100644 --- a/server/modules/routing/binlogrouter/blr_master.cc +++ b/server/modules/routing/binlogrouter/blr_master.cc @@ -211,7 +211,7 @@ static void blr_start_master(void* data) * 'client' is the fake DCB that emulates a client session: * we need to set the poll.thread.id for the "dummy client" */ - client->session->client_dcb->poll.thread.id = mxs_rworker_get_current_id(); + client->session->client_dcb->poll.owner = mxs_rworker_get_current(); /* Connect to configured master server */ if ((router->master = dcb_connect(router->service->dbref->server,