diff --git a/include/maxscale/listener.hh b/include/maxscale/listener.hh index 5b39e183d..f890d3af7 100644 --- a/include/maxscale/listener.hh +++ b/include/maxscale/listener.hh @@ -45,7 +45,8 @@ public: { UNIX_SOCKET, // UNIX domain socket shared between workers SHARED_TCP, // TCP listening socket shared between workers - UNIQUE_TCP // Unique TCP listening socket for each worker + UNIQUE_TCP, // Unique TCP listening socket for each worker + MAIN_WORKER, // Listener that always moves the execution to the main worker }; /** diff --git a/server/core/dcb.cc b/server/core/dcb.cc index 2fdec9032..8914fb0e7 100644 --- a/server/core/dcb.cc +++ b/server/core/dcb.cc @@ -3068,68 +3068,24 @@ static bool dcb_add_to_worker(Worker* worker, DCB* dcb, uint32_t events) int poll_add_dcb(DCB* dcb) { dcb_sanity_check(dcb); - - uint32_t events = poll_events; - - /** Choose new state and worker thread ID according to the role of DCB. */ - dcb_state_t new_state; - RoutingWorker* owner = nullptr; - - if (dcb->role == DCB::Role::CLIENT) - { - if (strcasecmp(dcb->service->router_name(), "cli") == 0 - || strcasecmp(dcb->service->router_name(), "maxinfo") == 0) - { - // If the DCB refers to an accepted maxadmin/maxinfo socket, we force it - // to the main thread. That's done in order to prevent a deadlock - // that may happen if there are multiple concurrent administrative calls, - // handled by different worker threads. - // See: https://jira.mariadb.org/browse/MXS-1805 and https://jira.mariadb.org/browse/MXS-1833 - owner = RoutingWorker::get(RoutingWorker::MAIN); - } - else if (dcb->state == DCB_STATE_NOPOLLING) - { - // This DCB was removed and added back to epoll. Assign it to the same worker it started with. - owner = static_cast(dcb->owner); - } - else - { - // Assign to current worker - owner = RoutingWorker::get_current(); - } - - new_state = DCB_STATE_POLLING; - dcb->owner = owner; - } - else - { - mxb_assert(dcb->role == DCB::Role::BACKEND); - mxb_assert(RoutingWorker::get_current_id() != -1); - mxb_assert(RoutingWorker::get_current() == dcb->owner); - - new_state = DCB_STATE_POLLING; - owner = static_cast(dcb->owner); - } + int rc = 0; + RoutingWorker* owner = static_cast(dcb->owner); + mxb_assert(owner == RoutingWorker::get_current()); /** * Assign the new state before adding the DCB to the worker and store the * old state in case we need to revert it. */ dcb_state_t old_state = dcb->state; - dcb->state = new_state; + dcb->state = DCB_STATE_POLLING; - int rc = 0; - - if (!dcb_add_to_worker(owner, dcb, events)) + if (!dcb_add_to_worker(owner, dcb, poll_events)) { /** * We failed to add the DCB to a worker. Revert the state so that it - * will be treated as a DCB in the correct state. As this will involve - * cleanup, ensure that the current thread is the owner, as otherwise - * debug asserts will be triggered. + * will be treated as a DCB in the correct state. */ dcb->state = old_state; - dcb->owner = RoutingWorker::get_current(); rc = -1; } diff --git a/server/core/listener.cc b/server/core/listener.cc index 3f9eab14c..53edaac57 100644 --- a/server/core/listener.cc +++ b/server/core/listener.cc @@ -65,7 +65,11 @@ Listener::Listener(SERVICE* service, const std::string& name, const std::string& , m_proto_func(*(MXS_PROTOCOL*)load_module(protocol.c_str(), MODULE_PROTOCOL)) , m_auth_func(*(MXS_AUTHENTICATOR*)load_module(authenticator.c_str(), MODULE_AUTHENTICATOR)) { - if (m_address[0] == '/') + if (strcasecmp(service->router_name(), "cli") == 0 || strcasecmp(service->router_name(), "maxinfo") == 0) + { + m_type = Type::MAIN_WORKER; + } + else if (m_address[0] == '/') { m_type = Type::UNIX_SOCKET; } @@ -1160,7 +1164,10 @@ void Listener::accept_connections() } else { - auto worker = mxs::RoutingWorker::pick_worker(); + auto worker = type() == Type::MAIN_WORKER ? + mxs::RoutingWorker::get(mxs::RoutingWorker::MAIN) : + mxs::RoutingWorker::pick_worker(); + worker->execute([this, conn]() { if (DCB* dcb = accept_one_dcb(conn.fd, &conn.addr, conn.host)) { diff --git a/server/core/routingworker.cc b/server/core/routingworker.cc index dac4a2ac8..93b2319ac 100644 --- a/server/core/routingworker.cc +++ b/server/core/routingworker.cc @@ -412,6 +412,7 @@ bool RoutingWorker::add_shared_fd(int fd, uint32_t events, MXB_POLL_DATA* pData) ev.events = events; ev.data.ptr = pData; + // The main worker takes ownership of all shared fds pData->owner = RoutingWorker::get(RoutingWorker::MAIN); if (epoll_ctl(this_unit.epoll_listener_fd, EPOLL_CTL_ADD, fd, &ev) != 0) diff --git a/server/core/session.cc b/server/core/session.cc index 1bf883f95..52e61c9b6 100644 --- a/server/core/session.cc +++ b/server/core/session.cc @@ -169,12 +169,11 @@ bool session_start(MXS_SESSION* session) void session_link_backend_dcb(MXS_SESSION* session, DCB* dcb) { + mxb_assert(dcb->owner == session->client_dcb->owner); mxb_assert(dcb->role == DCB::Role::BACKEND); mxb::atomic::add(&session->refcount, 1); dcb->session = session; - /** Move this DCB under the same thread */ - dcb->owner = session->client_dcb->owner; Session* ses = static_cast(session); ses->link_backend_dcb(dcb); diff --git a/server/modules/routing/binlogrouter/blr_master.cc b/server/modules/routing/binlogrouter/blr_master.cc index 558f062a2..3560c22a6 100644 --- a/server/modules/routing/binlogrouter/blr_master.cc +++ b/server/modules/routing/binlogrouter/blr_master.cc @@ -226,11 +226,7 @@ static void blr_start_master(void* data) return; } - /** - * 'client' is the fake DCB that emulates a client session: - * we need to set the poll.thread.id for the "dummy client" - */ - router->client->owner = mxs_rworker_get_current(); + mxb_assert(router->client->owner == mxs_rworker_get_current()); /* Connect to configured master server */ if ((router->master = dcb_connect(router->service->dbref->server,