diff --git a/server/core/poll.c b/server/core/poll.c index b77f59304..e0acd30b1 100644 --- a/server/core/poll.c +++ b/server/core/poll.c @@ -387,7 +387,29 @@ poll_add_dcb(DCB *dcb) dcb_add_to_list(dcb); - rc = epoll_ctl(epoll_fd[owner], EPOLL_CTL_ADD, dcb->fd, &ev); + if (dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER) + { + /** Listeners are added to all epoll instances */ + int nthr = config_threadcount(); + + for (int i = 0; i < nthr; i++) + { + if ((rc = epoll_ctl(epoll_fd[i], EPOLL_CTL_ADD, dcb->fd, &ev))) + { + /** Remove the listener from the previous epoll instances */ + for (int j = 0; j < i; j++) + { + epoll_ctl(epoll_fd[j], EPOLL_CTL_DEL, dcb->fd, &ev); + } + break; + } + } + } + else + { + rc = epoll_ctl(epoll_fd[owner], EPOLL_CTL_ADD, dcb->fd, &ev); + } + if (rc) { /* Some errors are actually considered acceptable */ @@ -455,7 +477,26 @@ poll_remove_dcb(DCB *dcb) if (dcbfd > 0) { - rc = epoll_ctl(epoll_fd[dcb->owner], EPOLL_CTL_DEL, dcbfd, &ev); + if (dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER) + { + /** Listeners are added to all epoll instances */ + int nthr = config_threadcount(); + + for (int i = 0; i < nthr; i++) + { + int tmp_rc = epoll_ctl(epoll_fd[i], EPOLL_CTL_DEL, dcb->fd, &ev); + if (tmp_rc) + { + /** Even if one of the instances failed to remove it, try + * to remove it from all the others */ + rc = tmp_rc; + } + } + } + else + { + rc = epoll_ctl(epoll_fd[dcb->owner], EPOLL_CTL_DEL, dcbfd, &ev); + } /** * The poll_resolve_error function will always * return 0 or crash. So if it returns non-zero result, @@ -844,7 +885,7 @@ process_pollq(int thread_id, struct epoll_event *event) unsigned long qtime; DCB *dcb = event->data.ptr; - ss_dassert(dcb->owner == thread_id); + ss_dassert(dcb->owner == thread_id || dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER); #if PROFILE_POLL memlog_log(plog, hkheartbeat - dcb->evq.inserted); #endif