Addition of new tuning parameters for epoll spins and wait time

This commit is contained in:
Mark Riddoch
2014-10-17 12:28:27 +01:00
parent 2edab598b2
commit e729a17fd4
6 changed files with 162 additions and 17 deletions

View File

@ -1106,6 +1106,31 @@ config_threadcount()
return gateway.n_threads; return gateway.n_threads;
} }
/**
* Return the number of non-blocking polls to be done before a blocking poll
* is issued.
*
* @return The number of blocking poll calls to make before a blocking call
*/
unsigned int
config_nbpolls()
{
return gateway.n_nbpoll;
}
/**
* Return the configured number of milliseconds for which we wait when we do
* a blocking poll call.
*
* @return The number of milliseconds to sleep in a blocking poll call
*/
unsigned int
config_pollsleep()
{
return gateway.pollsleep;
}
static struct { static struct {
char *logname; char *logname;
logfile_id_t logfile; logfile_id_t logfile;
@ -1126,9 +1151,20 @@ static int
handle_global_item(const char *name, const char *value) handle_global_item(const char *name, const char *value)
{ {
int i; int i;
if (strcmp(name, "threads") == 0) { if (strcmp(name, "threads") == 0)
{
gateway.n_threads = atoi(value); gateway.n_threads = atoi(value);
} else { }
else if (strcmp(name, "non_blocking_polls") == 0)
{
gateway.n_nbpoll = atoi(value);
}
else if (strcmp(name, "poll_sleep") == 0)
{
gateway.pollsleep = atoi(value);
}
else
{
for (i = 0; lognames[i].logname; i++) for (i = 0; lognames[i].logname; i++)
{ {
if (strcasecmp(name, lognames[i].logname) == 0) if (strcasecmp(name, lognames[i].logname) == 0)
@ -1150,6 +1186,8 @@ static void
global_defaults() global_defaults()
{ {
gateway.n_threads = 1; gateway.n_threads = 1;
gateway.n_nbpoll = DEFAULT_NBPOLLS;
gateway.pollsleep = DEFAULT_POLLSLEEP;
if (version_string != NULL) if (version_string != NULL)
gateway.version_string = strdup(version_string); gateway.version_string = strdup(version_string);
else else

View File

@ -44,6 +44,9 @@ MEMLOG *plog;
extern int lm_enabled_logfiles_bitmask; extern int lm_enabled_logfiles_bitmask;
int number_poll_spins;
int max_poll_sleep;
/** /**
* @file poll.c - Abstraction of the epoll functionality * @file poll.c - Abstraction of the epoll functionality
* *
@ -70,7 +73,7 @@ extern int lm_enabled_logfiles_bitmask;
/** /**
* Control the use of mutexes for the epoll_wait call. Setting to 1 will * Control the use of mutexes for the epoll_wait call. Setting to 1 will
* cause the epoll_wait calls to be moved under a mutex. This may be useful * cause the epoll_wait calls to be moved under a mutex. This may be useful
* for debuggign purposes but should be avoided in general use. * for debugging purposes but should be avoided in general use.
*/ */
#define MUTEX_EPOLL 0 #define MUTEX_EPOLL 0
@ -151,6 +154,7 @@ static struct {
int evq_pending; /*< Number of pending descriptors in event queue */ int evq_pending; /*< Number of pending descriptors in event queue */
int evq_max; /*< Maximum event queue length */ int evq_max; /*< Maximum event queue length */
int wake_evqpending;/*< Woken from epoll_wait with pending events in queue */ int wake_evqpending;/*< Woken from epoll_wait with pending events in queue */
int blockingpolls; /*< Number of epoll_waits with a timeout specified */
} pollStats; } pollStats;
/** /**
@ -204,6 +208,9 @@ int i;
for (i = 0; i < n_avg_samples; i++) for (i = 0; i < n_avg_samples; i++)
evqp_samples[i] = 0.0; evqp_samples[i] = 0.0;
number_poll_spins = config_nbpolls();
max_poll_sleep = config_pollsleep();
#if PROFILE_POLL #if PROFILE_POLL
plog = memlog_create("EventQueueWaitTime", ML_LONG, 10000); plog = memlog_create("EventQueueWaitTime", ML_LONG, 10000);
#endif #endif
@ -370,7 +377,7 @@ return_rc:
* deschedule a process if a timeout is included, but will not do this if a 0 timeout * deschedule a process if a timeout is included, but will not do this if a 0 timeout
* value is given. this improves performance when the gateway is under heavy load. * value is given. this improves performance when the gateway is under heavy load.
* *
* In order to provide a fairer means of sharign the threads between the different * In order to provide a fairer means of sharing the threads between the different
* DCB's the poll mechanism has been decoupled from the processing of the events. * DCB's the poll mechanism has been decoupled from the processing of the events.
* The events are now recieved via the epoll_wait call, a queue of DCB's that have * The events are now recieved via the epoll_wait call, a queue of DCB's that have
* events pending is maintained and as new events arrive the DCB is added to the end * events pending is maintained and as new events arrive the DCB is added to the end
@ -383,7 +390,7 @@ return_rc:
* *
* The introduction of the ability to inject "fake" write events into the event queue meant * The introduction of the ability to inject "fake" write events into the event queue meant
* that there was a possibility to "starve" new events sicne the polling loop would * that there was a possibility to "starve" new events sicne the polling loop would
* consume the event queue before lookign for new events. If the DCB that inject * consume the event queue before looking for new events. If the DCB that inject
* the fake event then injected another fake event as a result of the first it meant * the fake event then injected another fake event as a result of the first it meant
* that new events did not get added to the queue. The strategy has been updated to * that new events did not get added to the queue. The strategy has been updated to
* not consume the entire event queue, but process one event before doing a non-blocking * not consume the entire event queue, but process one event before doing a non-blocking
@ -407,6 +414,7 @@ struct epoll_event events[MAX_EVENTS];
int i, nfds, timeout_bias = 1; int i, nfds, timeout_bias = 1;
int thread_id = (int)arg; int thread_id = (int)arg;
DCB *zombies = NULL; DCB *zombies = NULL;
int poll_spins = 0;
/** Add this thread to the bitmask of running polling threads */ /** Add this thread to the bitmask of running polling threads */
bitmask_set(&poll_mask, thread_id); bitmask_set(&poll_mask, thread_id);
@ -460,14 +468,18 @@ DCB *zombies = NULL;
* We calculate a timeout bias to alter the length of the blocking * We calculate a timeout bias to alter the length of the blocking
* call based on the time since we last received an event to process * call based on the time since we last received an event to process
*/ */
else if (nfds == 0 && pollStats.evq_pending > 0) else if (nfds == 0 && pollStats.evq_pending == 0 && poll_spins++ > number_poll_spins)
{ {
atomic_add(&pollStats.blockingpolls, 1);
nfds = epoll_wait(epoll_fd, nfds = epoll_wait(epoll_fd,
events, events,
MAX_EVENTS, MAX_EVENTS,
(EPOLL_TIMEOUT * timeout_bias) / 10); (max_poll_sleep * timeout_bias) / 10);
if (nfds == 0 && pollStats.evq_pending) if (nfds == 0 && pollStats.evq_pending)
{
atomic_add(&pollStats.wake_evqpending, 1); atomic_add(&pollStats.wake_evqpending, 1);
poll_spins = 0;
}
} }
else else
{ {
@ -483,6 +495,7 @@ DCB *zombies = NULL;
if (nfds > 0) if (nfds > 0)
{ {
timeout_bias = 1; timeout_bias = 1;
poll_spins = 0;
LOGIF(LD, (skygw_log_write( LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG, LOGFILE_DEBUG,
"%lu [poll_waitevents] epoll_wait found %d fds", "%lu [poll_waitevents] epoll_wait found %d fds",
@ -598,6 +611,34 @@ DCB *zombies = NULL;
} /*< while(1) */ } /*< while(1) */
} }
/**
* Set the number of non-blocking poll cycles that will be done before
* a blocking poll will take place. Whenever an event arrives on a thread
* or the thread sees a pending event to execute it will reset it's
* poll_spin coutn to zero and will then poll with a 0 timeout until the
* poll_spin value is greater than the value set here.
*
* @param nbpolls Number of non-block polls to perform before blocking
*/
void
poll_set_nonblocking_polls(unsigned int nbpolls)
{
number_poll_spins = nbpolls;
}
/**
* Set the maximum amount of time, in milliseconds, the polling thread
* will block before it will wake and check the event queue for work
* that may have been added by another thread.
*
* @param maxwait Maximum wait time in milliseconds
*/
void
poll_set_maxwait(unsigned int maxwait)
{
max_poll_sleep = maxwait;
}
/** /**
* Process of the queue of DCB's that have outstanding events * Process of the queue of DCB's that have outstanding events
* *
@ -654,6 +695,7 @@ uint32_t ev;
if (found) if (found)
{ {
ev = dcb->evq.pending_events; ev = dcb->evq.pending_events;
dcb->evq.processing_events = ev;
dcb->evq.pending_events = 0; dcb->evq.pending_events = 0;
pollStats.evq_pending--; pollStats.evq_pending--;
} }
@ -869,6 +911,8 @@ uint32_t ev;
#endif #endif
spinlock_acquire(&pollqlock); spinlock_acquire(&pollqlock);
dcb->evq.processing_events = 0;
if (dcb->evq.pending_events == 0) if (dcb->evq.pending_events == 0)
{ {
/* No pending events so remove from the queue */ /* No pending events so remove from the queue */
@ -963,6 +1007,8 @@ int i;
dcb_printf(dcb, "Number of epoll cycles: %d\n", dcb_printf(dcb, "Number of epoll cycles: %d\n",
pollStats.n_polls); pollStats.n_polls);
dcb_printf(dcb, "Number of epoll cycles with wait: %d\n",
pollStats.blockingpolls);
dcb_printf(dcb, "Number of read events: %d\n", dcb_printf(dcb, "Number of read events: %d\n",
pollStats.n_read); pollStats.n_read);
dcb_printf(dcb, "Number of write events: %d\n", dcb_printf(dcb, "Number of write events: %d\n",
@ -1220,6 +1266,8 @@ uint32_t ev = EPOLLOUT;
if (DCB_POLL_BUSY(dcb)) if (DCB_POLL_BUSY(dcb))
{ {
if (dcb->evq.pending_events == 0)
pollStats.evq_pending++;
dcb->evq.pending_events |= ev; dcb->evq.pending_events |= ev;
} }
else else
@ -1271,11 +1319,13 @@ uint32_t ev;
return 0; return 0;
} }
dcb = eventq; dcb = eventq;
dcb_printf(pdcb, "%16s | %10s | %s\n", "DCB", "Status", "Events"); dcb_printf(pdcb, "%-16s | %-10s | %-18s | %s\n", "DCB", "Status", "Processing Events",
dcb_printf(pdcb, "-----------------+------------+--------------------\n"); "Pending Events");
dcb_printf(pdcb, "-----------------+------------+--------------------+-------------------\n");
do { do {
dcb_printf(pdcb, "%16p | %10s | %s\n", dcb, dcb_printf(pdcb, "%-16p | %-10s | %-18s | %-18s\n", dcb,
dcb->evq.processing ? "Processing" : "Pending", dcb->evq.processing ? "Processing" : "Pending",
event_to_string(dcb->evq.processing_events),
event_to_string(dcb->evq.pending_events)); event_to_string(dcb->evq.pending_events));
dcb = dcb->evq.next; dcb = dcb->evq.next;
} while (dcb != eventq); } while (dcb != eventq);

View File

@ -29,10 +29,13 @@
* 21/06/13 Mark Riddoch Initial implementation * 21/06/13 Mark Riddoch Initial implementation
* 07/05/14 Massimiliano Pinto Added version_string to global configuration * 07/05/14 Massimiliano Pinto Added version_string to global configuration
* 23/05/14 Massimiliano Pinto Added id to global configuration * 23/05/14 Massimiliano Pinto Added id to global configuration
* 17/10/14 Mark Riddoch Added poll tuning configuration parameters
* *
* @endverbatim * @endverbatim
*/ */
#define DEFAULT_NBPOLLS 3 /**< Default number of non block polls before we block */
#define DEFAULT_POLLSLEEP 1000 /**< Default poll wait time (milliseconds) */
/** /**
* Maximum length for configuration parameter value. * Maximum length for configuration parameter value.
*/ */
@ -92,11 +95,15 @@ typedef struct {
int n_threads; /**< Number of polling threads */ int n_threads; /**< Number of polling threads */
char *version_string; /**< The version string of embedded database library */ char *version_string; /**< The version string of embedded database library */
unsigned long id; /**< MaxScale ID */ unsigned long id; /**< MaxScale ID */
unsigned int n_nbpoll; /**< Tune number of non-blocking polls */
unsigned int pollsleep; /**< Wait time in blocking polls */
} GATEWAY_CONF; } GATEWAY_CONF;
extern int config_load(char *); extern int config_load(char *);
extern int config_reload(); extern int config_reload();
extern int config_threadcount(); extern int config_threadcount();
extern unsigned int config_nbpolls();
extern unsigned int config_pollsleep();
CONFIG_PARAMETER* config_get_param(CONFIG_PARAMETER* params, const char* name); CONFIG_PARAMETER* config_get_param(CONFIG_PARAMETER* params, const char* name);
config_param_type_t config_get_paramtype(CONFIG_PARAMETER* param); config_param_type_t config_get_paramtype(CONFIG_PARAMETER* param);
CONFIG_PARAMETER* config_clone_param(CONFIG_PARAMETER* param); CONFIG_PARAMETER* config_clone_param(CONFIG_PARAMETER* param);

View File

@ -105,6 +105,7 @@ typedef struct gw_protocol {
* next The next DCB in the event queue * next The next DCB in the event queue
* prev The previous DCB in the event queue * prev The previous DCB in the event queue
* pending_events The events that are pending processing * pending_events The events that are pending processing
* processing_events The evets currently being processed
* processing Flag to indicate the processing status of the DCB * processing Flag to indicate the processing status of the DCB
* eventqlock Spinlock to protect this structure * eventqlock Spinlock to protect this structure
* inserted Insertion time for logging purposes * inserted Insertion time for logging purposes
@ -113,6 +114,7 @@ typedef struct {
struct dcb *next; struct dcb *next;
struct dcb *prev; struct dcb *prev;
uint32_t pending_events; uint32_t pending_events;
uint32_t processing_events;
int processing; int processing;
SPINLOCK eventqlock; SPINLOCK eventqlock;
unsigned long inserted; unsigned long inserted;

View File

@ -32,7 +32,6 @@
* @endverbatim * @endverbatim
*/ */
#define MAX_EVENTS 1000 #define MAX_EVENTS 1000
#define EPOLL_TIMEOUT 1000 /**< The epoll timeout in milliseconds */
extern void poll_init(); extern void poll_init();
extern int poll_add_dcb(DCB *); extern int poll_add_dcb(DCB *);
@ -40,6 +39,8 @@ extern int poll_remove_dcb(DCB *);
extern void poll_waitevents(void *); extern void poll_waitevents(void *);
extern void poll_shutdown(); extern void poll_shutdown();
extern GWBITMASK *poll_bitmask(); extern GWBITMASK *poll_bitmask();
extern void poll_set_maxwait(unsigned int);
extern void poll_set_nonblocking_polls(unsigned int);
extern void dprintPollStats(DCB *); extern void dprintPollStats(DCB *);
extern void dShowThreads(DCB *dcb); extern void dShowThreads(DCB *dcb);
extern void dShowEventQ(DCB *dcb); extern void dShowEventQ(DCB *dcb);

View File

@ -66,6 +66,7 @@
#include <adminusers.h> #include <adminusers.h>
#include <monitor.h> #include <monitor.h>
#include <debugcli.h> #include <debugcli.h>
#include <poll.h>
#include <skygw_utils.h> #include <skygw_utils.h>
#include <log_manager.h> #include <log_manager.h>
@ -81,6 +82,7 @@
#define ARG_TYPE_DCB 7 #define ARG_TYPE_DCB 7
#define ARG_TYPE_MONITOR 8 #define ARG_TYPE_MONITOR 8
#define ARG_TYPE_FILTER 9 #define ARG_TYPE_FILTER 9
#define ARG_TYPE_NUMERIC 10
/** /**
* The subcommand structure * The subcommand structure
@ -286,6 +288,8 @@ struct subcommand restartoptions[] = {
}; };
static void set_server(DCB *dcb, SERVER *server, char *bit); static void set_server(DCB *dcb, SERVER *server, char *bit);
static void set_pollsleep(DCB *dcb, int);
static void set_nbpoll(DCB *dcb, int);
/** /**
* The subcommands of the set command * The subcommands of the set command
*/ */
@ -294,6 +298,15 @@ struct subcommand setoptions[] = {
"Set the status of a server. E.g. set server dbnode4 master", "Set the status of a server. E.g. set server dbnode4 master",
"Set the status of a server. E.g. set server 0x4838320 master", "Set the status of a server. E.g. set server 0x4838320 master",
{ARG_TYPE_SERVER, ARG_TYPE_STRING, 0} }, {ARG_TYPE_SERVER, ARG_TYPE_STRING, 0} },
{ "pollsleep", 1, set_pollsleep,
"Set the maximum poll sleep period in milliseconds",
"Set the maximum poll sleep period in milliseconds",
{ARG_TYPE_NUMERIC, 0, 0} },
{ "nbpolls", 1, set_nbpoll,
"Set the number of non-blocking polls",
"Set the number of non-blocking polls",
{ARG_TYPE_NUMERIC, 0, 0} },
{ NULL, 0, NULL, NULL, NULL, { NULL, 0, NULL, NULL, NULL,
{0, 0, 0} } {0, 0, 0} }
}; };
@ -577,6 +590,16 @@ SERVICE *service;
if (mode == CLIM_USER || (rval = (unsigned long)strtol(arg, NULL, 0)) == 0) if (mode == CLIM_USER || (rval = (unsigned long)strtol(arg, NULL, 0)) == 0)
rval = (unsigned long)filter_find(arg); rval = (unsigned long)filter_find(arg);
return rval; return rval;
case ARG_TYPE_NUMERIC:
{
int i;
for (i = 0; arg[i]; i++)
{
if (arg[i] < '0' || arg[i] > '9')
return 0;
}
return atoi(arg);
}
} }
return 0; return 0;
} }
@ -1117,6 +1140,30 @@ static void disable_log_action(DCB *dcb, char *arg1) {
skygw_log_disable(type); skygw_log_disable(type);
} }
/**
* Set the duration of the sleep passed to the poll wait
*
* @param dcb DCB for output
* @param sleeptime Sleep time in milliseconds
*/
static void
set_pollsleep(DCB *dcb, int sleeptime)
{
poll_set_maxwait(sleeptime);
}
/**
* Set the number of non-blockign spins to make
*
* @param dcb DCB for output
* @param nb Number of spins
*/
static void
set_nbpoll(DCB *dcb, int nb)
{
poll_set_nonblocking_polls(nb);
}
#if defined(SS_DEBUG) #if defined(SS_DEBUG)
static void fail_backendfd(void) static void fail_backendfd(void)
{ {