Add a mechanism to synchronize persistent pool counts

The polling system now has a concept of messages. This can be used to send
a synchronous message to the polling system which waits for all threads to
process the message before returning.

Currently this is used to flush unused DCBs when server persistent
statistics are reported.
This commit is contained in:
Markus Makela
2016-11-26 08:19:36 +02:00
parent 8573df6ee7
commit 7cbbc6f8f7
4 changed files with 75 additions and 4 deletions

View File

@ -52,6 +52,11 @@ typedef enum
POLL_STAT_MAX_EXECTIME
} POLL_STAT;
enum poll_message
{
POLL_MSG_CLEAN_PERSISTENT = 0x01
};
extern void poll_init();
extern int poll_add_dcb(DCB *);
extern int poll_remove_dcb(DCB *);
@ -71,5 +76,6 @@ extern void poll_fake_event(DCB *dcb, enum EPOLL_EVENTS ev);
extern void poll_fake_hangup_event(DCB *dcb);
extern void poll_fake_write_event(DCB *dcb);
extern void poll_fake_read_event(DCB *dcb);
extern void poll_send_message(enum poll_message msg, void *data);
MXS_END_DECLS

View File

@ -1687,7 +1687,6 @@ dcb_close(DCB *dcb)
static bool
dcb_maybe_add_persistent(DCB *dcb)
{
int poolcount = -1;
if (dcb->user != NULL
&& strlen(dcb->user)
&& dcb->server
@ -1695,7 +1694,8 @@ dcb_maybe_add_persistent(DCB *dcb)
&& (dcb->server->status & SERVER_RUNNING)
&& !dcb->dcb_errhandle_called
&& !(dcb->flags & DCBF_HUNG)
&& (poolcount = dcb_persistent_clean_count(dcb, dcb->thread.id, false)) < dcb->server->persistpoolmax)
&& dcb_persistent_clean_count(dcb, dcb->thread.id, false) < dcb->server->persistpoolmax
&& dcb->server->stats.n_persistent < dcb->server->persistpoolmax)
{
DCB_CALLBACK *loopcallback;
MXS_DEBUG("%lu [dcb_maybe_add_persistent] Adding DCB to persistent pool, user %s.\n",
@ -1742,7 +1742,7 @@ dcb_maybe_add_persistent(DCB *dcb)
dcb->dcb_errhandle_called ? "true" : "false",
(dcb->flags & DCBF_HUNG) ? "true" : "false",
dcb->server ? dcb->server->status : 0,
poolcount);
dcb->server->stats.n_persistent);
}
return false;
}
@ -2573,6 +2573,7 @@ dcb_null_auth(DCB *dcb, SERVER *server, SESSION *session, GWBUF *buf)
* Check persistent pool for expiry or excess size and count
*
* @param dcb The DCB being closed.
* @param id Thread ID
* @param cleanall Boolean, if true the whole pool is cleared for the
* server related to the given DCB
* @return A count of the DCBs remaining in the pool

View File

@ -33,6 +33,9 @@
#include <maxscale/statistics.h>
#include <maxscale/query_classifier.h>
#include <maxscale/utils.h>
#include <maxscale/server.h>
#include <maxscale/thread.h>
#include <maxscale/platform.h>
#define PROFILE_POLL 0
@ -91,12 +94,19 @@ typedef struct fake_event
struct fake_event *next; /*< The next event */
} fake_event_t;
thread_local int thread_id; /**< This thread's ID */
static int *epoll_fd; /*< The epoll file descriptor */
static int next_epoll_fd = 0; /*< Which thread handles the next DCB */
static fake_event_t **fake_events; /*< Thread-specific fake event queue */
static SPINLOCK *fake_event_lock;
static int do_shutdown = 0; /*< Flag the shutdown of the poll subsystem */
static GWBITMASK poll_mask;
/** Poll cross-thread messaging variables */
static int *poll_msg;
static void *poll_msg_data = NULL;
static SPINLOCK poll_msg_lock = SPINLOCK_INIT;
#if MUTEX_EPOLL
static simple_mutex_t epoll_wait_mutex; /*< serializes calls to epoll_wait */
#endif
@ -105,6 +115,7 @@ static int n_waiting = 0; /*< No. of threads in epoll_wait */
static int process_pollq(int thread_id, struct epoll_event *event);
static void poll_add_event_to_dcb(DCB* dcb, GWBUF* buf, uint32_t ev);
static bool poll_dcb_session_check(DCB *dcb, const char *);
static void poll_check_message(void);
DCB *eventq = NULL;
SPINLOCK pollqlock = SPINLOCK_INIT;
@ -246,6 +257,11 @@ poll_init()
exit(-1);
}
if ((poll_msg = MXS_CALLOC(n_threads, sizeof(int))) == NULL)
{
exit(-1);
}
for (int i = 0; i < n_threads; i++)
{
spinlock_init(&fake_event_lock[i]);
@ -661,7 +677,7 @@ poll_waitevents(void *arg)
{
struct epoll_event events[MAX_EVENTS];
int i, nfds, timeout_bias = 1;
intptr_t thread_id = (intptr_t)arg;
thread_id = (intptr_t)arg;
int poll_spins = 0;
/** Add this thread to the bitmask of running polling threads */
@ -816,6 +832,8 @@ poll_waitevents(void *arg)
/** Process closed DCBs */
dcb_process_zombies(thread_id);
poll_check_message();
if (thread_data)
{
thread_data[thread_id].state = THREAD_IDLE;
@ -1729,3 +1747,48 @@ eventTimesGetList()
return set;
}
void poll_send_message(enum poll_message msg, void *data)
{
spinlock_acquire(&poll_msg_lock);
int nthr = config_threadcount();
poll_msg_data = data;
for (int i = 0; i < nthr; i++)
{
if (i != thread_id)
{
/** Synchronize writes to poll_msg */
atomic_synchronize();
}
poll_msg[i] |= msg;
}
/** Handle this thread's message */
poll_check_message();
for (int i = 0; i < nthr; i++)
{
if (i != thread_id)
{
while (poll_msg[i] & msg)
{
thread_millisleep(1);
}
}
}
poll_msg_data = NULL;
spinlock_release(&poll_msg_lock);
}
static void poll_check_message()
{
if (poll_msg[thread_id] & POLL_MSG_CLEAN_PERSISTENT)
{
SERVER *server = (SERVER*)poll_msg_data;
dcb_persistent_clean_count(server->persistent[thread_id], thread_id, false);
atomic_synchronize();
poll_msg[thread_id] &= ~POLL_MSG_CLEAN_PERSISTENT;
}
}

View File

@ -553,6 +553,7 @@ dprintServer(DCB *dcb, SERVER *server)
if (server->persistpoolmax)
{
dcb_printf(dcb, "\tPersistent pool size: %d\n", server->stats.n_persistent);
poll_send_message(POLL_MSG_CLEAN_PERSISTENT, server);
dcb_printf(dcb, "\tPersistent measured pool size: %d\n", server->stats.n_persistent);
dcb_printf(dcb, "\tPersistent actual size max: %d\n", server->persistmax);
dcb_printf(dcb, "\tPersistent pool size limit: %ld\n", server->persistpoolmax);