From 7cbbc6f8f7ae535a3ab3b1015155a934b2938823 Mon Sep 17 00:00:00 2001 From: Markus Makela Date: Sat, 26 Nov 2016 08:19:36 +0200 Subject: [PATCH] Add a mechanism to synchronize persistent pool counts The polling system now has a concept of messages. This can be used to send a synchronous message to the polling system which waits for all threads to process the message before returning. Currently this is used to flush unused DCBs when server persistent statistics are reported. --- include/maxscale/poll.h | 6 ++++ server/core/dcb.c | 7 +++-- server/core/poll.c | 65 ++++++++++++++++++++++++++++++++++++++++- server/core/server.c | 1 + 4 files changed, 75 insertions(+), 4 deletions(-) diff --git a/include/maxscale/poll.h b/include/maxscale/poll.h index 315423cb2..28e9451ba 100644 --- a/include/maxscale/poll.h +++ b/include/maxscale/poll.h @@ -52,6 +52,11 @@ typedef enum POLL_STAT_MAX_EXECTIME } POLL_STAT; +enum poll_message +{ + POLL_MSG_CLEAN_PERSISTENT = 0x01 +}; + extern void poll_init(); extern int poll_add_dcb(DCB *); extern int poll_remove_dcb(DCB *); @@ -71,5 +76,6 @@ extern void poll_fake_event(DCB *dcb, enum EPOLL_EVENTS ev); extern void poll_fake_hangup_event(DCB *dcb); extern void poll_fake_write_event(DCB *dcb); extern void poll_fake_read_event(DCB *dcb); +extern void poll_send_message(enum poll_message msg, void *data); MXS_END_DECLS diff --git a/server/core/dcb.c b/server/core/dcb.c index 7f1274ea5..f7f307f67 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -1687,7 +1687,6 @@ dcb_close(DCB *dcb) static bool dcb_maybe_add_persistent(DCB *dcb) { - int poolcount = -1; if (dcb->user != NULL && strlen(dcb->user) && dcb->server @@ -1695,7 +1694,8 @@ dcb_maybe_add_persistent(DCB *dcb) && (dcb->server->status & SERVER_RUNNING) && !dcb->dcb_errhandle_called && !(dcb->flags & DCBF_HUNG) - && (poolcount = dcb_persistent_clean_count(dcb, dcb->thread.id, false)) < dcb->server->persistpoolmax) + && dcb_persistent_clean_count(dcb, dcb->thread.id, false) < dcb->server->persistpoolmax + && dcb->server->stats.n_persistent < dcb->server->persistpoolmax) { DCB_CALLBACK *loopcallback; MXS_DEBUG("%lu [dcb_maybe_add_persistent] Adding DCB to persistent pool, user %s.\n", @@ -1742,7 +1742,7 @@ dcb_maybe_add_persistent(DCB *dcb) dcb->dcb_errhandle_called ? "true" : "false", (dcb->flags & DCBF_HUNG) ? "true" : "false", dcb->server ? dcb->server->status : 0, - poolcount); + dcb->server->stats.n_persistent); } return false; } @@ -2573,6 +2573,7 @@ dcb_null_auth(DCB *dcb, SERVER *server, SESSION *session, GWBUF *buf) * Check persistent pool for expiry or excess size and count * * @param dcb The DCB being closed. + * @param id Thread ID * @param cleanall Boolean, if true the whole pool is cleared for the * server related to the given DCB * @return A count of the DCBs remaining in the pool diff --git a/server/core/poll.c b/server/core/poll.c index 9a5e65772..764b47a47 100644 --- a/server/core/poll.c +++ b/server/core/poll.c @@ -33,6 +33,9 @@ #include #include #include +#include +#include +#include #define PROFILE_POLL 0 @@ -91,12 +94,19 @@ typedef struct fake_event struct fake_event *next; /*< The next event */ } fake_event_t; +thread_local int thread_id; /**< This thread's ID */ static int *epoll_fd; /*< The epoll file descriptor */ static int next_epoll_fd = 0; /*< Which thread handles the next DCB */ static fake_event_t **fake_events; /*< Thread-specific fake event queue */ static SPINLOCK *fake_event_lock; static int do_shutdown = 0; /*< Flag the shutdown of the poll subsystem */ static GWBITMASK poll_mask; + +/** Poll cross-thread messaging variables */ +static int *poll_msg; +static void *poll_msg_data = NULL; +static SPINLOCK poll_msg_lock = SPINLOCK_INIT; + #if MUTEX_EPOLL static simple_mutex_t epoll_wait_mutex; /*< serializes calls to epoll_wait */ #endif @@ -105,6 +115,7 @@ static int n_waiting = 0; /*< No. of threads in epoll_wait */ static int process_pollq(int thread_id, struct epoll_event *event); static void poll_add_event_to_dcb(DCB* dcb, GWBUF* buf, uint32_t ev); static bool poll_dcb_session_check(DCB *dcb, const char *); +static void poll_check_message(void); DCB *eventq = NULL; SPINLOCK pollqlock = SPINLOCK_INIT; @@ -246,6 +257,11 @@ poll_init() exit(-1); } + if ((poll_msg = MXS_CALLOC(n_threads, sizeof(int))) == NULL) + { + exit(-1); + } + for (int i = 0; i < n_threads; i++) { spinlock_init(&fake_event_lock[i]); @@ -661,7 +677,7 @@ poll_waitevents(void *arg) { struct epoll_event events[MAX_EVENTS]; int i, nfds, timeout_bias = 1; - intptr_t thread_id = (intptr_t)arg; + thread_id = (intptr_t)arg; int poll_spins = 0; /** Add this thread to the bitmask of running polling threads */ @@ -816,6 +832,8 @@ poll_waitevents(void *arg) /** Process closed DCBs */ dcb_process_zombies(thread_id); + poll_check_message(); + if (thread_data) { thread_data[thread_id].state = THREAD_IDLE; @@ -1729,3 +1747,48 @@ eventTimesGetList() return set; } + +void poll_send_message(enum poll_message msg, void *data) +{ + spinlock_acquire(&poll_msg_lock); + int nthr = config_threadcount(); + poll_msg_data = data; + + for (int i = 0; i < nthr; i++) + { + if (i != thread_id) + { + /** Synchronize writes to poll_msg */ + atomic_synchronize(); + } + poll_msg[i] |= msg; + } + + /** Handle this thread's message */ + poll_check_message(); + + for (int i = 0; i < nthr; i++) + { + if (i != thread_id) + { + while (poll_msg[i] & msg) + { + thread_millisleep(1); + } + } + } + + poll_msg_data = NULL; + spinlock_release(&poll_msg_lock); +} + +static void poll_check_message() +{ + if (poll_msg[thread_id] & POLL_MSG_CLEAN_PERSISTENT) + { + SERVER *server = (SERVER*)poll_msg_data; + dcb_persistent_clean_count(server->persistent[thread_id], thread_id, false); + atomic_synchronize(); + poll_msg[thread_id] &= ~POLL_MSG_CLEAN_PERSISTENT; + } +} diff --git a/server/core/server.c b/server/core/server.c index 664b54d67..3daf47179 100644 --- a/server/core/server.c +++ b/server/core/server.c @@ -553,6 +553,7 @@ dprintServer(DCB *dcb, SERVER *server) if (server->persistpoolmax) { dcb_printf(dcb, "\tPersistent pool size: %d\n", server->stats.n_persistent); + poll_send_message(POLL_MSG_CLEAN_PERSISTENT, server); dcb_printf(dcb, "\tPersistent measured pool size: %d\n", server->stats.n_persistent); dcb_printf(dcb, "\tPersistent actual size max: %d\n", server->persistmax); dcb_printf(dcb, "\tPersistent pool size limit: %ld\n", server->persistpoolmax);