diff --git a/server/core/dcb.cc b/server/core/dcb.cc index 49b8e67b7..c592dec8c 100644 --- a/server/core/dcb.cc +++ b/server/core/dcb.cc @@ -65,6 +65,12 @@ using maxbase::Worker; #define DCB_EH_NOTICE(s, p) #endif +#ifdef EPOLLRDHUP +constexpr uint32_t poll_events = EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLHUP | EPOLLET; +#else +constexpr uint32_t poll_events = EPOLLIN | EPOLLOUT | EPOLLHUP | EPOLLET; +#endif + namespace { @@ -2826,6 +2832,7 @@ static void dcb_add_to_list(DCB* dcb) } else { + mxb_assert(this_unit.all_dcbs[id]->thread.tail->thread.next != dcb); this_unit.all_dcbs[id]->thread.tail->thread.next = dcb; this_unit.all_dcbs[id]->thread.tail = dcb; } @@ -2989,6 +2996,8 @@ void dcb_foreach_local(bool (* func)(DCB* dcb, void* data), void* data) for (DCB* dcb = this_unit.all_dcbs[thread_id]; dcb; dcb = dcb->thread.next) { + mxb_assert(dcb->thread.next != dcb); + if (!func(dcb, data)) { break; @@ -3568,13 +3577,7 @@ int poll_add_dcb(DCB* dcb) { dcb_sanity_check(dcb); - uint32_t events = 0; - -#ifdef EPOLLRDHUP - events = EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLHUP | EPOLLET; -#else - events = EPOLLIN | EPOLLOUT | EPOLLHUP | EPOLLET; -#endif + uint32_t events = poll_events; /** Choose new state and worker thread ID according to the role of DCB. */ dcb_state_t new_state; @@ -3726,13 +3729,23 @@ DCB* dcb_get_current() static int upstream_throttle_callback(DCB* dcb, DCB_REASON reason, void* userdata) { DCB* client_dcb = dcb->session->client_dcb; + mxb::Worker* worker = static_cast(client_dcb->poll.owner); + + // The fd is removed manually here due to the fact that poll_add_dcb causes the DCB to be added to the + // worker's list of DCBs but poll_remove_dcb doesn't remove it from it. This is due to the fact that the + // DCBs are only removed from the list when they are closed. if (reason == DCB_REASON_HIGH_WATER) { - poll_remove_dcb(client_dcb); + MXS_INFO("High water mark hit for '%s'@'%s', not reading data until low water mark is hit", + client_dcb->user, client_dcb->remote); + worker->remove_fd(client_dcb->fd); + client_dcb->state = DCB_STATE_NOPOLLING; } else if (reason == DCB_REASON_LOW_WATER) { - poll_add_dcb(client_dcb); + MXS_INFO("Low water mark hit for '%s'@'%s', accepting new data", client_dcb->user, client_dcb->remote); + worker->add_fd(client_dcb->fd, poll_events, (MXB_POLL_DATA*)client_dcb); + client_dcb->state = DCB_STATE_POLLING; } return 0; @@ -3744,7 +3757,13 @@ bool backend_dcb_remove_func(DCB* dcb, void* data) if (dcb->session == session && dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER) { - poll_remove_dcb(dcb); + DCB* client_dcb = dcb->session->client_dcb; + MXS_INFO("High water mark hit for connection to '%s' from %s'@'%s', not reading data until low water " + "mark is hit", dcb->server->name, client_dcb->user, client_dcb->remote); + + mxb::Worker* worker = static_cast(dcb->poll.owner); + worker->remove_fd(dcb->fd); + dcb->state = DCB_STATE_NOPOLLING; } return true; @@ -3756,7 +3775,13 @@ bool backend_dcb_add_func(DCB* dcb, void* data) if (dcb->session == session && dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER) { - poll_add_dcb(dcb); + DCB* client_dcb = dcb->session->client_dcb; + MXS_INFO("Low water mark hit for connection to '%s' from '%s'@'%s', accepting new data", + dcb->server->name, client_dcb->user, client_dcb->remote); + + mxb::Worker* worker = static_cast(dcb->poll.owner); + worker->add_fd(dcb->fd, poll_events, (MXB_POLL_DATA*)dcb); + dcb->state = DCB_STATE_POLLING; } return true;