Move DCB owner selection into Listener
The code that selects which worker to assign the DCB to is now completely in the Listener class. This removes the need to change the ownership of a DCB after it has been allocated.
This commit is contained in:
parent
510cae2fe0
commit
ea14331d18
@ -45,7 +45,8 @@ public:
|
||||
{
|
||||
UNIX_SOCKET, // UNIX domain socket shared between workers
|
||||
SHARED_TCP, // TCP listening socket shared between workers
|
||||
UNIQUE_TCP // Unique TCP listening socket for each worker
|
||||
UNIQUE_TCP, // Unique TCP listening socket for each worker
|
||||
MAIN_WORKER, // Listener that always moves the execution to the main worker
|
||||
};
|
||||
|
||||
/**
|
||||
|
@ -3068,68 +3068,24 @@ static bool dcb_add_to_worker(Worker* worker, DCB* dcb, uint32_t events)
|
||||
int poll_add_dcb(DCB* dcb)
|
||||
{
|
||||
dcb_sanity_check(dcb);
|
||||
|
||||
uint32_t events = poll_events;
|
||||
|
||||
/** Choose new state and worker thread ID according to the role of DCB. */
|
||||
dcb_state_t new_state;
|
||||
RoutingWorker* owner = nullptr;
|
||||
|
||||
if (dcb->role == DCB::Role::CLIENT)
|
||||
{
|
||||
if (strcasecmp(dcb->service->router_name(), "cli") == 0
|
||||
|| strcasecmp(dcb->service->router_name(), "maxinfo") == 0)
|
||||
{
|
||||
// If the DCB refers to an accepted maxadmin/maxinfo socket, we force it
|
||||
// to the main thread. That's done in order to prevent a deadlock
|
||||
// that may happen if there are multiple concurrent administrative calls,
|
||||
// handled by different worker threads.
|
||||
// See: https://jira.mariadb.org/browse/MXS-1805 and https://jira.mariadb.org/browse/MXS-1833
|
||||
owner = RoutingWorker::get(RoutingWorker::MAIN);
|
||||
}
|
||||
else if (dcb->state == DCB_STATE_NOPOLLING)
|
||||
{
|
||||
// This DCB was removed and added back to epoll. Assign it to the same worker it started with.
|
||||
owner = static_cast<RoutingWorker*>(dcb->owner);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Assign to current worker
|
||||
owner = RoutingWorker::get_current();
|
||||
}
|
||||
|
||||
new_state = DCB_STATE_POLLING;
|
||||
dcb->owner = owner;
|
||||
}
|
||||
else
|
||||
{
|
||||
mxb_assert(dcb->role == DCB::Role::BACKEND);
|
||||
mxb_assert(RoutingWorker::get_current_id() != -1);
|
||||
mxb_assert(RoutingWorker::get_current() == dcb->owner);
|
||||
|
||||
new_state = DCB_STATE_POLLING;
|
||||
owner = static_cast<RoutingWorker*>(dcb->owner);
|
||||
}
|
||||
int rc = 0;
|
||||
RoutingWorker* owner = static_cast<RoutingWorker*>(dcb->owner);
|
||||
mxb_assert(owner == RoutingWorker::get_current());
|
||||
|
||||
/**
|
||||
* Assign the new state before adding the DCB to the worker and store the
|
||||
* old state in case we need to revert it.
|
||||
*/
|
||||
dcb_state_t old_state = dcb->state;
|
||||
dcb->state = new_state;
|
||||
dcb->state = DCB_STATE_POLLING;
|
||||
|
||||
int rc = 0;
|
||||
|
||||
if (!dcb_add_to_worker(owner, dcb, events))
|
||||
if (!dcb_add_to_worker(owner, dcb, poll_events))
|
||||
{
|
||||
/**
|
||||
* We failed to add the DCB to a worker. Revert the state so that it
|
||||
* will be treated as a DCB in the correct state. As this will involve
|
||||
* cleanup, ensure that the current thread is the owner, as otherwise
|
||||
* debug asserts will be triggered.
|
||||
* will be treated as a DCB in the correct state.
|
||||
*/
|
||||
dcb->state = old_state;
|
||||
dcb->owner = RoutingWorker::get_current();
|
||||
rc = -1;
|
||||
}
|
||||
|
||||
|
@ -65,7 +65,11 @@ Listener::Listener(SERVICE* service, const std::string& name, const std::string&
|
||||
, m_proto_func(*(MXS_PROTOCOL*)load_module(protocol.c_str(), MODULE_PROTOCOL))
|
||||
, m_auth_func(*(MXS_AUTHENTICATOR*)load_module(authenticator.c_str(), MODULE_AUTHENTICATOR))
|
||||
{
|
||||
if (m_address[0] == '/')
|
||||
if (strcasecmp(service->router_name(), "cli") == 0 || strcasecmp(service->router_name(), "maxinfo") == 0)
|
||||
{
|
||||
m_type = Type::MAIN_WORKER;
|
||||
}
|
||||
else if (m_address[0] == '/')
|
||||
{
|
||||
m_type = Type::UNIX_SOCKET;
|
||||
}
|
||||
@ -1160,7 +1164,10 @@ void Listener::accept_connections()
|
||||
}
|
||||
else
|
||||
{
|
||||
auto worker = mxs::RoutingWorker::pick_worker();
|
||||
auto worker = type() == Type::MAIN_WORKER ?
|
||||
mxs::RoutingWorker::get(mxs::RoutingWorker::MAIN) :
|
||||
mxs::RoutingWorker::pick_worker();
|
||||
|
||||
worker->execute([this, conn]() {
|
||||
if (DCB* dcb = accept_one_dcb(conn.fd, &conn.addr, conn.host))
|
||||
{
|
||||
|
@ -412,6 +412,7 @@ bool RoutingWorker::add_shared_fd(int fd, uint32_t events, MXB_POLL_DATA* pData)
|
||||
ev.events = events;
|
||||
ev.data.ptr = pData;
|
||||
|
||||
// The main worker takes ownership of all shared fds
|
||||
pData->owner = RoutingWorker::get(RoutingWorker::MAIN);
|
||||
|
||||
if (epoll_ctl(this_unit.epoll_listener_fd, EPOLL_CTL_ADD, fd, &ev) != 0)
|
||||
|
@ -169,12 +169,11 @@ bool session_start(MXS_SESSION* session)
|
||||
|
||||
void session_link_backend_dcb(MXS_SESSION* session, DCB* dcb)
|
||||
{
|
||||
mxb_assert(dcb->owner == session->client_dcb->owner);
|
||||
mxb_assert(dcb->role == DCB::Role::BACKEND);
|
||||
|
||||
mxb::atomic::add(&session->refcount, 1);
|
||||
dcb->session = session;
|
||||
/** Move this DCB under the same thread */
|
||||
dcb->owner = session->client_dcb->owner;
|
||||
|
||||
Session* ses = static_cast<Session*>(session);
|
||||
ses->link_backend_dcb(dcb);
|
||||
|
@ -226,11 +226,7 @@ static void blr_start_master(void* data)
|
||||
return;
|
||||
}
|
||||
|
||||
/**
|
||||
* 'client' is the fake DCB that emulates a client session:
|
||||
* we need to set the poll.thread.id for the "dummy client"
|
||||
*/
|
||||
router->client->owner = mxs_rworker_get_current();
|
||||
mxb_assert(router->client->owner == mxs_rworker_get_current());
|
||||
|
||||
/* Connect to configured master server */
|
||||
if ((router->master = dcb_connect(router->service->dbref->server,
|
||||
|
Loading…
x
Reference in New Issue
Block a user