diff --git a/server/core/dcb.cc b/server/core/dcb.cc index 1ccabce27..a4ff2ac48 100644 --- a/server/core/dcb.cc +++ b/server/core/dcb.cc @@ -196,6 +196,18 @@ dcb_alloc(dcb_role_t role, SERV_LISTENER *listener) newdcb->listener = listener; newdcb->last_read = hkheartbeat; + if (role == DCB_ROLE_SERVICE_LISTENER) + { + /** All listeners are owned by the main thread (i.e. thread no. 0) */ + newdcb->poll.thread.id = 0; + } + else + { + /** Otherwise the DCB is owned by the thread that allocates it */ + ss_dassert(Worker::get_current_id() != -1); + newdcb->poll.thread.id = Worker::get_current_id(); + } + return newdcb; } @@ -3273,14 +3285,31 @@ dcb_session_check(DCB *dcb, const char *function) } } +/** DCB Sanity checks */ +static inline void dcb_sanity_check(DCB* dcb) +{ + CHK_DCB(dcb); + + if (dcb->state == DCB_STATE_DISCONNECTED || dcb->state == DCB_STATE_UNDEFINED) + { + MXS_ERROR("%lu [poll_add_dcb] Error : existing state of dcb %p " + "is %s, but this should be impossible, crashing.", + thread_self(), dcb, STRDCBSTATE(dcb->state)); + raise(SIGABRT); + } + else if (dcb->state == DCB_STATE_POLLING || dcb->state == DCB_STATE_LISTENING) + { + MXS_ERROR("%lu [poll_add_dcb] Error : existing state of dcb %p " + "is %s, but this is probably an error, not crashing.", + thread_self(), dcb, STRDCBSTATE(dcb->state)); + } +} + int poll_add_dcb(DCB *dcb) { - int rc = -1; - dcb_state_t old_state = dcb->state; - dcb_state_t new_state; - uint32_t events = 0; + dcb_sanity_check(dcb); - CHK_DCB(dcb); + uint32_t events = 0; #ifdef EPOLLRDHUP events = EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLHUP | EPOLLET; @@ -3288,14 +3317,11 @@ int poll_add_dcb(DCB *dcb) events = EPOLLIN | EPOLLOUT | EPOLLHUP | EPOLLET; #endif - /*< - * Choose new state according to the role of dcb. - */ - if (dcb->dcb_role == DCB_ROLE_CLIENT_HANDLER || dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER) - { - new_state = DCB_STATE_POLLING; - } - else + /** Choose new state and worker thread ID according to the role of DCB. */ + dcb_state_t new_state; + int worker_id = 0; + + if (dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER) { /** * Listeners are always added in level triggered mode. This will cause @@ -3303,72 +3329,45 @@ int poll_add_dcb(DCB *dcb) * to accept. */ events = EPOLLIN; - ss_dassert(dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER); new_state = DCB_STATE_LISTENING; - } - /* - * Check DCB current state seems sensible - */ - if (DCB_STATE_DISCONNECTED == dcb->state - || DCB_STATE_UNDEFINED == dcb->state) - { - MXS_ERROR("%lu [poll_add_dcb] Error : existing state of dcb %p " - "is %s, but this should be impossible, crashing.", - pthread_self(), - dcb, - STRDCBSTATE(dcb->state)); - raise(SIGABRT); - } - if (DCB_STATE_POLLING == dcb->state - || DCB_STATE_LISTENING == dcb->state) - { - MXS_ERROR("%lu [poll_add_dcb] Error : existing state of dcb %p " - "is %s, but this is probably an error, not crashing.", - pthread_self(), - dcb, - STRDCBSTATE(dcb->state)); - } - dcb->state = new_state; - - /* - * The only possible failure that will not cause a crash is - * running out of system resources. - */ - int worker_id = 0; - - if (dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER) - { worker_id = MXS_WORKER_ALL; } - else if (dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER) - { - ss_dassert(Worker::get_current_id() != -1); - - worker_id = dcb->session->client_dcb->poll.thread.id; - ss_dassert(worker_id == Worker::get_current_id()); - } else { + ss_dassert(dcb->dcb_role == DCB_ROLE_CLIENT_HANDLER || + dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER); ss_dassert(Worker::get_current_id() != -1); + ss_dassert(Worker::get_current_id() == dcb->poll.thread.id); - worker_id = Worker::get_current_id(); + new_state = DCB_STATE_POLLING; + worker_id = dcb->poll.thread.id; } + /** + * Assign the new state before adding the DCB to the worker and store the + * old state in case we need to revert it. + */ + dcb_state_t old_state = dcb->state; + dcb->state = new_state; + + int rc = 0; + if (poll_add_fd_to_worker(worker_id, dcb->fd, events, (MXS_POLL_DATA*)dcb)) { dcb_add_to_list(dcb); - MXS_DEBUG("%lu [poll_add_dcb] Added dcb %p in state %s to poll set.", - pthread_self(), - dcb, - STRDCBSTATE(dcb->state)); - rc = 0; + thread_self(), dcb, STRDCBSTATE(dcb->state)); } else { + /** + * We failed to add the DCB to a worker. Revert the state so that it + * will be treated as a DCB in the correct state. + */ dcb->state = old_state; rc = -1; } + return rc; }