Addition of thread data to commands

This commit is contained in:
Mark Riddoch 2014-08-29 11:24:58 +01:00
parent cf55f271f0
commit 531dfd017c
5 changed files with 210 additions and 3 deletions

View File

@ -28,7 +28,7 @@
* and value and to free them.
*
* The hashtable is arrange as a set of linked lists, the number of linked
* lists beign the hashsize as requested by the user. Entries are hashed by
* lists being the hashsize as requested by the user. Entries are hashed by
* calling the hash function that is passed in by the user, this is used as
* an index into the array of linked lists, usign modulo hashsize.
*

View File

@ -28,6 +28,7 @@
#include <skygw_utils.h>
#include <log_manager.h>
#include <gw.h>
#include <config.h>
extern int lm_enabled_logfiles_bitmask;
@ -41,6 +42,8 @@ extern int lm_enabled_logfiles_bitmask;
* 19/06/13 Mark Riddoch Initial implementation
* 28/06/13 Mark Riddoch Added poll mask support and DCB
* zombie management
* 29/08/14 Mark Riddoch Addition of thread status data, load average
* etc.
*
* @endverbatim
*/
@ -51,6 +54,45 @@ static GWBITMASK poll_mask;
static simple_mutex_t epoll_wait_mutex; /*< serializes calls to epoll_wait */
static int n_waiting = 0; /*< No. of threads in epoll_wait */
/**
* Thread load average, this is the average number of descriptors in each
* poll completion, a value of 1 or less is he ideal.
*/
static double load_average = 0.0;
static int load_samples = 0;
/* Thread statistics data */
static int n_threads; /*< No. of threads */
/**
* Internal MaxScale thread states
*/
typedef enum { THREAD_STOPPED, THREAD_IDLE,
THREAD_POLLING, THREAD_PROCESSING,
THREAD_ZPROCESSING } THREAD_STATE;
/**
* Thread data used to report the current state and activity related to
* a thread
*/
typedef struct {
THREAD_STATE state; /*< Current thread state */
int n_fds; /*< No. of descriptors thread is processing */
DCB *cur_dcb; /*< Current DCB being processed */
uint32_t event; /*< Current event being processed */
} THREAD_DATA;
static THREAD_DATA *thread_data = NULL; /*< Status of each thread */
/**
* The number of buckets used to gather statistics about how many
* descriptors where processed on each epoll completion.
*
* An array of wakeup counts is created, with the number of descriptors used
* to index that array. Each time a completion occurs the n_fds - 1 value is
* used to index this array and increment the count held there.
* If n_fds - 1 >= MAXFDS then the count at MAXFDS -1 is incremented.
*/
#define MAXNFDS 10
/**
@ -77,6 +119,8 @@ static struct {
void
poll_init()
{
int i;
if (epoll_fd != -1)
return;
if ((epoll_fd = epoll_create(MAX_EVENTS)) == -1)
@ -86,6 +130,15 @@ poll_init()
}
memset(&pollStats, 0, sizeof(pollStats));
bitmask_init(&poll_mask);
n_threads = config_threadcount();
if ((thread_data =
(THREAD_DATA *)malloc(n_threads * sizeof(THREAD_DATA))) != NULL)
{
for (i = 0; i < n_threads; i++)
{
thread_data[i].state = THREAD_STOPPED;
}
}
simple_mutex_init(&epoll_wait_mutex, "epoll_wait_mutex");
}
@ -260,6 +313,10 @@ DCB *zombies = NULL;
/* Add this thread to the bitmask of running polling threads */
bitmask_set(&poll_mask, thread_id);
if (thread_data)
{
thread_data[thread_id].state = THREAD_IDLE;
}
while (1)
{
@ -280,6 +337,10 @@ DCB *zombies = NULL;
#if 0
simple_mutex_lock(&epoll_wait_mutex, TRUE);
#endif
if (thread_data)
{
thread_data[thread_id].state = THREAD_POLLING;
}
if ((nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, 0)) == -1)
{
@ -339,8 +400,19 @@ DCB *zombies = NULL;
pthread_self(),
nfds)));
atomic_add(&pollStats.n_polls, 1);
if (thread_data)
{
thread_data[thread_id].n_fds = nfds;
thread_data[thread_id].cur_dcb = NULL;
thread_data[thread_id].event = 0;
thread_data[thread_id].state = THREAD_PROCESSING;
}
pollStats.n_fds[(nfds < MAXNFDS ? (nfds - 1) : MAXNFDS)]++;
pollStats.n_fds[(nfds < MAXNFDS ? (nfds - 1) : MAXNFDS - 1)]++;
load_average = (load_average * load_samples + nfds)
/ (load_samples + 1);
atomic_add(&load_samples, 1);
for (i = 0; i < nfds; i++)
{
@ -348,6 +420,11 @@ DCB *zombies = NULL;
__uint32_t ev = events[i].events;
CHK_DCB(dcb);
if (thread_data)
{
thread_data[thread_id].cur_dcb = dcb;
thread_data[thread_id].event = ev;
}
#if defined(SS_DEBUG)
if (dcb_fake_write_ev[dcb->fd] != 0) {
@ -531,6 +608,10 @@ DCB *zombies = NULL;
no_op = FALSE;
}
process_zombies:
if (thread_data)
{
thread_data[thread_id].state = THREAD_ZPROCESSING;
}
zombies = dcb_process_zombies(thread_id);
if (zombies == NULL) {
@ -543,9 +624,17 @@ DCB *zombies = NULL;
* Remove the thread from the bitmask of running
* polling threads.
*/
if (thread_data)
{
thread_data[thread_id].state = THREAD_STOPPED;
}
bitmask_clear(&poll_mask, thread_id);
return;
}
if (thread_data)
{
thread_data[thread_id].state = THREAD_IDLE;
}
} /*< while(1) */
}
@ -600,6 +689,114 @@ int i;
{
dcb_printf(dcb, "\t%2d\t\t\t%d\n", i + 1, pollStats.n_fds[i]);
}
dcb_printf(dcb, "\t> %d\t\t\t%d\n", MAXNFDS,
dcb_printf(dcb, "\t>= %d\t\t\t%d\n", MAXNFDS,
pollStats.n_fds[MAXNFDS-1]);
}
/**
* Convert an EPOLL event mask into a printable string
*
* @param event The event mask
* @return A string representation, the caller must free the string
*/
static char *
event_to_string(uint32_t event)
{
char *str;
str = malloc(22); // 22 is max returned string length
if (str == NULL)
return NULL;
*str = 0;
if (event & EPOLLIN)
{
strcat(str, "IN");
}
if (event & EPOLLOUT)
{
if (*str)
strcat(str, "|");
strcat(str, "OUT");
}
if (event & EPOLLERR)
{
if (*str)
strcat(str, "|");
strcat(str, "ERR");
}
if (event & EPOLLHUP)
{
if (*str)
strcat(str, "|");
strcat(str, "HUP");
}
if (event & EPOLLRDHUP)
{
if (*str)
strcat(str, "|");
strcat(str, "RDHUP");
}
return str;
}
/**
* Print the thread status for all the polling threads
*
* @param dcb The DCB to send the thread status data
*/
void
dShowThreads(DCB *dcb)
{
int i;
char *state;
dcb_printf(dcb, "Polling Threads.\n\n");
dcb_printf(dcb, "Thread Load Average: %.2f.\n", load_average);
if (thread_data == NULL)
return;
dcb_printf(dcb, " ID | State | # fds | Descriptor | Event\n");
dcb_printf(dcb, "----+------------+--------+------------------+---------------\n");
for (i = 0; i < n_threads; i++)
{
switch (thread_data[i].state)
{
case THREAD_STOPPED:
state = "Stopped";
break;
case THREAD_IDLE:
state = "Idle";
break;
case THREAD_POLLING:
state = "Polling";
break;
case THREAD_PROCESSING:
state = "Processing";
break;
case THREAD_ZPROCESSING:
state = "Collecting";
break;
}
if (thread_data[i].state != THREAD_PROCESSING)
dcb_printf(dcb,
" %2d | %-10s | | |\n",
i, state);
else if (thread_data[i].cur_dcb == NULL)
dcb_printf(dcb,
" %2d | %-10s | %6d | |\n",
i, state, thread_data[i].n_fds);
else
{
char *event_string
= event_to_string(thread_data[i].event);
if (event_string == NULL)
event_string = "??";
dcb_printf(dcb,
" %2d | %-10s | %6d | %-16p | %s\n",
i, state, thread_data[i].n_fds,
thread_data[i].cur_dcb, event_string);
free(event_string);
}
}
}

View File

@ -272,6 +272,7 @@ int fail_accept_errno;
#define DCB_ABOVE_HIGH_WATER(x) ((x)->high_water && (x)->writeqlen > (x)->high_water)
void dcb_pollin(DCB *);
void dcb_pollout(DCB *);
DCB *dcb_get_zombies(void);
int gw_write(
#if defined(SS_DEBUG)

View File

@ -41,4 +41,5 @@ extern void poll_waitevents(void *);
extern void poll_shutdown();
extern GWBITMASK *poll_bitmask();
extern void dprintPollStats(DCB *);
extern void dShowThreads(DCB *dcb);
#endif

View File

@ -160,6 +160,10 @@ struct subcommand showoptions[] = {
"Show all active sessions in MaxScale",
"Show all active sessions in MaxScale",
{0, 0, 0} },
{ "threads", 0, dShowThreads,
"Show the status of the polling threads in MaxScale",
"Show the status of the polling threads in MaxScale",
{0, 0, 0} },
{ "users", 0, telnetdShowUsers,
"Show statistics and user names for the debug interface",
"Show statistics and user names for the debug interface",
@ -208,6 +212,10 @@ struct subcommand listoptions[] = {
"List all the active sessions within MaxScale",
"List all the active sessions within MaxScale",
{0, 0, 0} },
{ "threads", 0, dShowThreads,
"List the status of the polling threads in MaxScale",
"List the status of the polling threads in MaxScale",
{0, 0, 0} },
{ NULL, 0, NULL, NULL, NULL,
{0, 0, 0} }
};