diff --git a/include/maxscale/poll.h b/include/maxscale/poll.h index 315423cb2..28e9451ba 100644 --- a/include/maxscale/poll.h +++ b/include/maxscale/poll.h @@ -52,6 +52,11 @@ typedef enum POLL_STAT_MAX_EXECTIME } POLL_STAT; +enum poll_message +{ + POLL_MSG_CLEAN_PERSISTENT = 0x01 +}; + extern void poll_init(); extern int poll_add_dcb(DCB *); extern int poll_remove_dcb(DCB *); @@ -71,5 +76,6 @@ extern void poll_fake_event(DCB *dcb, enum EPOLL_EVENTS ev); extern void poll_fake_hangup_event(DCB *dcb); extern void poll_fake_write_event(DCB *dcb); extern void poll_fake_read_event(DCB *dcb); +extern void poll_send_message(enum poll_message msg, void *data); MXS_END_DECLS diff --git a/server/core/dcb.c b/server/core/dcb.c index 7f1274ea5..f7f307f67 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -1687,7 +1687,6 @@ dcb_close(DCB *dcb) static bool dcb_maybe_add_persistent(DCB *dcb) { - int poolcount = -1; if (dcb->user != NULL && strlen(dcb->user) && dcb->server @@ -1695,7 +1694,8 @@ dcb_maybe_add_persistent(DCB *dcb) && (dcb->server->status & SERVER_RUNNING) && !dcb->dcb_errhandle_called && !(dcb->flags & DCBF_HUNG) - && (poolcount = dcb_persistent_clean_count(dcb, dcb->thread.id, false)) < dcb->server->persistpoolmax) + && dcb_persistent_clean_count(dcb, dcb->thread.id, false) < dcb->server->persistpoolmax + && dcb->server->stats.n_persistent < dcb->server->persistpoolmax) { DCB_CALLBACK *loopcallback; MXS_DEBUG("%lu [dcb_maybe_add_persistent] Adding DCB to persistent pool, user %s.\n", @@ -1742,7 +1742,7 @@ dcb_maybe_add_persistent(DCB *dcb) dcb->dcb_errhandle_called ? "true" : "false", (dcb->flags & DCBF_HUNG) ? "true" : "false", dcb->server ? dcb->server->status : 0, - poolcount); + dcb->server->stats.n_persistent); } return false; } @@ -2573,6 +2573,7 @@ dcb_null_auth(DCB *dcb, SERVER *server, SESSION *session, GWBUF *buf) * Check persistent pool for expiry or excess size and count * * @param dcb The DCB being closed. + * @param id Thread ID * @param cleanall Boolean, if true the whole pool is cleared for the * server related to the given DCB * @return A count of the DCBs remaining in the pool diff --git a/server/core/poll.c b/server/core/poll.c index 9a5e65772..764b47a47 100644 --- a/server/core/poll.c +++ b/server/core/poll.c @@ -33,6 +33,9 @@ #include #include #include +#include +#include +#include #define PROFILE_POLL 0 @@ -91,12 +94,19 @@ typedef struct fake_event struct fake_event *next; /*< The next event */ } fake_event_t; +thread_local int thread_id; /**< This thread's ID */ static int *epoll_fd; /*< The epoll file descriptor */ static int next_epoll_fd = 0; /*< Which thread handles the next DCB */ static fake_event_t **fake_events; /*< Thread-specific fake event queue */ static SPINLOCK *fake_event_lock; static int do_shutdown = 0; /*< Flag the shutdown of the poll subsystem */ static GWBITMASK poll_mask; + +/** Poll cross-thread messaging variables */ +static int *poll_msg; +static void *poll_msg_data = NULL; +static SPINLOCK poll_msg_lock = SPINLOCK_INIT; + #if MUTEX_EPOLL static simple_mutex_t epoll_wait_mutex; /*< serializes calls to epoll_wait */ #endif @@ -105,6 +115,7 @@ static int n_waiting = 0; /*< No. of threads in epoll_wait */ static int process_pollq(int thread_id, struct epoll_event *event); static void poll_add_event_to_dcb(DCB* dcb, GWBUF* buf, uint32_t ev); static bool poll_dcb_session_check(DCB *dcb, const char *); +static void poll_check_message(void); DCB *eventq = NULL; SPINLOCK pollqlock = SPINLOCK_INIT; @@ -246,6 +257,11 @@ poll_init() exit(-1); } + if ((poll_msg = MXS_CALLOC(n_threads, sizeof(int))) == NULL) + { + exit(-1); + } + for (int i = 0; i < n_threads; i++) { spinlock_init(&fake_event_lock[i]); @@ -661,7 +677,7 @@ poll_waitevents(void *arg) { struct epoll_event events[MAX_EVENTS]; int i, nfds, timeout_bias = 1; - intptr_t thread_id = (intptr_t)arg; + thread_id = (intptr_t)arg; int poll_spins = 0; /** Add this thread to the bitmask of running polling threads */ @@ -816,6 +832,8 @@ poll_waitevents(void *arg) /** Process closed DCBs */ dcb_process_zombies(thread_id); + poll_check_message(); + if (thread_data) { thread_data[thread_id].state = THREAD_IDLE; @@ -1729,3 +1747,48 @@ eventTimesGetList() return set; } + +void poll_send_message(enum poll_message msg, void *data) +{ + spinlock_acquire(&poll_msg_lock); + int nthr = config_threadcount(); + poll_msg_data = data; + + for (int i = 0; i < nthr; i++) + { + if (i != thread_id) + { + /** Synchronize writes to poll_msg */ + atomic_synchronize(); + } + poll_msg[i] |= msg; + } + + /** Handle this thread's message */ + poll_check_message(); + + for (int i = 0; i < nthr; i++) + { + if (i != thread_id) + { + while (poll_msg[i] & msg) + { + thread_millisleep(1); + } + } + } + + poll_msg_data = NULL; + spinlock_release(&poll_msg_lock); +} + +static void poll_check_message() +{ + if (poll_msg[thread_id] & POLL_MSG_CLEAN_PERSISTENT) + { + SERVER *server = (SERVER*)poll_msg_data; + dcb_persistent_clean_count(server->persistent[thread_id], thread_id, false); + atomic_synchronize(); + poll_msg[thread_id] &= ~POLL_MSG_CLEAN_PERSISTENT; + } +} diff --git a/server/core/server.c b/server/core/server.c index 664b54d67..3daf47179 100644 --- a/server/core/server.c +++ b/server/core/server.c @@ -553,6 +553,7 @@ dprintServer(DCB *dcb, SERVER *server) if (server->persistpoolmax) { dcb_printf(dcb, "\tPersistent pool size: %d\n", server->stats.n_persistent); + poll_send_message(POLL_MSG_CLEAN_PERSISTENT, server); dcb_printf(dcb, "\tPersistent measured pool size: %d\n", server->stats.n_persistent); dcb_printf(dcb, "\tPersistent actual size max: %d\n", server->persistmax); dcb_printf(dcb, "\tPersistent pool size limit: %ld\n", server->persistpoolmax);