MXS-2259: Fix double addition of DCBs to worker list
When poll_add_dcb was called for a DCB that once was polling system but was subsequently removed, the DCB would appear twice in the worker's list of DCBs. This caused a hang when the DCB was the last one in the worker's list and dcb_foreach_local would be called. To prevent the aforementioned problem, the DCBs are now added and removed directly to and from the workers instead of indirectly via poll_add_dcb and poll_remove_dcb.
This commit is contained in:
@ -65,6 +65,12 @@ using maxbase::Worker;
|
|||||||
#define DCB_EH_NOTICE(s, p)
|
#define DCB_EH_NOTICE(s, p)
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#ifdef EPOLLRDHUP
|
||||||
|
constexpr uint32_t poll_events = EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLHUP | EPOLLET;
|
||||||
|
#else
|
||||||
|
constexpr uint32_t poll_events = EPOLLIN | EPOLLOUT | EPOLLHUP | EPOLLET;
|
||||||
|
#endif
|
||||||
|
|
||||||
namespace
|
namespace
|
||||||
{
|
{
|
||||||
|
|
||||||
@ -2826,6 +2832,7 @@ static void dcb_add_to_list(DCB* dcb)
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
mxb_assert(this_unit.all_dcbs[id]->thread.tail->thread.next != dcb);
|
||||||
this_unit.all_dcbs[id]->thread.tail->thread.next = dcb;
|
this_unit.all_dcbs[id]->thread.tail->thread.next = dcb;
|
||||||
this_unit.all_dcbs[id]->thread.tail = dcb;
|
this_unit.all_dcbs[id]->thread.tail = dcb;
|
||||||
}
|
}
|
||||||
@ -2989,6 +2996,8 @@ void dcb_foreach_local(bool (* func)(DCB* dcb, void* data), void* data)
|
|||||||
|
|
||||||
for (DCB* dcb = this_unit.all_dcbs[thread_id]; dcb; dcb = dcb->thread.next)
|
for (DCB* dcb = this_unit.all_dcbs[thread_id]; dcb; dcb = dcb->thread.next)
|
||||||
{
|
{
|
||||||
|
mxb_assert(dcb->thread.next != dcb);
|
||||||
|
|
||||||
if (!func(dcb, data))
|
if (!func(dcb, data))
|
||||||
{
|
{
|
||||||
break;
|
break;
|
||||||
@ -3568,13 +3577,7 @@ int poll_add_dcb(DCB* dcb)
|
|||||||
{
|
{
|
||||||
dcb_sanity_check(dcb);
|
dcb_sanity_check(dcb);
|
||||||
|
|
||||||
uint32_t events = 0;
|
uint32_t events = poll_events;
|
||||||
|
|
||||||
#ifdef EPOLLRDHUP
|
|
||||||
events = EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLHUP | EPOLLET;
|
|
||||||
#else
|
|
||||||
events = EPOLLIN | EPOLLOUT | EPOLLHUP | EPOLLET;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
/** Choose new state and worker thread ID according to the role of DCB. */
|
/** Choose new state and worker thread ID according to the role of DCB. */
|
||||||
dcb_state_t new_state;
|
dcb_state_t new_state;
|
||||||
@ -3726,13 +3729,23 @@ DCB* dcb_get_current()
|
|||||||
static int upstream_throttle_callback(DCB* dcb, DCB_REASON reason, void* userdata)
|
static int upstream_throttle_callback(DCB* dcb, DCB_REASON reason, void* userdata)
|
||||||
{
|
{
|
||||||
DCB* client_dcb = dcb->session->client_dcb;
|
DCB* client_dcb = dcb->session->client_dcb;
|
||||||
|
mxb::Worker* worker = static_cast<mxb::Worker*>(client_dcb->poll.owner);
|
||||||
|
|
||||||
|
// The fd is removed manually here due to the fact that poll_add_dcb causes the DCB to be added to the
|
||||||
|
// worker's list of DCBs but poll_remove_dcb doesn't remove it from it. This is due to the fact that the
|
||||||
|
// DCBs are only removed from the list when they are closed.
|
||||||
if (reason == DCB_REASON_HIGH_WATER)
|
if (reason == DCB_REASON_HIGH_WATER)
|
||||||
{
|
{
|
||||||
poll_remove_dcb(client_dcb);
|
MXS_INFO("High water mark hit for '%s'@'%s', not reading data until low water mark is hit",
|
||||||
|
client_dcb->user, client_dcb->remote);
|
||||||
|
worker->remove_fd(client_dcb->fd);
|
||||||
|
client_dcb->state = DCB_STATE_NOPOLLING;
|
||||||
}
|
}
|
||||||
else if (reason == DCB_REASON_LOW_WATER)
|
else if (reason == DCB_REASON_LOW_WATER)
|
||||||
{
|
{
|
||||||
poll_add_dcb(client_dcb);
|
MXS_INFO("Low water mark hit for '%s'@'%s', accepting new data", client_dcb->user, client_dcb->remote);
|
||||||
|
worker->add_fd(client_dcb->fd, poll_events, (MXB_POLL_DATA*)client_dcb);
|
||||||
|
client_dcb->state = DCB_STATE_POLLING;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
@ -3744,7 +3757,13 @@ bool backend_dcb_remove_func(DCB* dcb, void* data)
|
|||||||
|
|
||||||
if (dcb->session == session && dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER)
|
if (dcb->session == session && dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER)
|
||||||
{
|
{
|
||||||
poll_remove_dcb(dcb);
|
DCB* client_dcb = dcb->session->client_dcb;
|
||||||
|
MXS_INFO("High water mark hit for connection to '%s' from %s'@'%s', not reading data until low water "
|
||||||
|
"mark is hit", dcb->server->name, client_dcb->user, client_dcb->remote);
|
||||||
|
|
||||||
|
mxb::Worker* worker = static_cast<mxb::Worker*>(dcb->poll.owner);
|
||||||
|
worker->remove_fd(dcb->fd);
|
||||||
|
dcb->state = DCB_STATE_NOPOLLING;
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
@ -3756,7 +3775,13 @@ bool backend_dcb_add_func(DCB* dcb, void* data)
|
|||||||
|
|
||||||
if (dcb->session == session && dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER)
|
if (dcb->session == session && dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER)
|
||||||
{
|
{
|
||||||
poll_add_dcb(dcb);
|
DCB* client_dcb = dcb->session->client_dcb;
|
||||||
|
MXS_INFO("Low water mark hit for connection to '%s' from '%s'@'%s', accepting new data",
|
||||||
|
dcb->server->name, client_dcb->user, client_dcb->remote);
|
||||||
|
|
||||||
|
mxb::Worker* worker = static_cast<mxb::Worker*>(dcb->poll.owner);
|
||||||
|
worker->add_fd(dcb->fd, poll_events, (MXB_POLL_DATA*)dcb);
|
||||||
|
dcb->state = DCB_STATE_POLLING;
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
|||||||
Reference in New Issue
Block a user