Files
MaxScale/server/core/poll.cc
Johan Wikman 8ea7d8898a MXS-1915 Remove id from mxs::Worker
The id has now been moved from mxs::Worker to mxs::RoutingWorker
and the implications are felt in many places.

The primary need for the id was to be able to access worker specific
data, maintained outside of a routing worker, when given a worker
(the id is used to index into an array). Slightly related to that
was the need to be able to iterate over all workers. That obviously
implies some kind of collection.

That causes all sorts of issues if there is a need for being able
to create and destroy a worker at runtime. With the id removed from
mxs::Worker all those issues are gone, and it's perfectly ok to create
and destroy mxs::Workers as needed.

Further, while there is a need to broadcast a particular message to
all _routing_ workers, it hardly makes sense to broadcast a particular
message to _all_ workers. Consequently, only routing workers are kept
in a collection and all static member functions dealing with all
workers (e.g. broadcast) have now been moved to mxs::RoutingWorker.

Now, instead of passing the id around we instead deal directly
with the worker pointer. Later the data in all those external arrays
will be moved into mxs::[Worker|RoutingWorker] so that worker related
data is maintained in exactly one place.
2018-06-26 09:19:46 +03:00

324 lines
9.3 KiB
C++

/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2022-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
/**
* @file poll.cc - Abstraction of the epoll functionality
*/
#include <maxscale/poll.h>
#include <errno.h>
#include <inttypes.h>
#include <signal.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <mysql.h>
#include <sys/epoll.h>
#include <maxscale/alloc.h>
#include <maxscale/atomic.h>
#include <maxscale/config.h>
#include <maxscale/clock.h>
#include <maxscale/platform.h>
#include <maxscale/server.h>
#include <maxscale/statistics.h>
#include "internal/poll.h"
#include "internal/routingworker.hh"
using maxscale::Worker;
using maxscale::RoutingWorker;
/**
 * Number of threads, as reported by config_threadcount() when poll_init()
 * was called. dShowThreads() uses it as the upper bound when iterating over
 * the routing workers.
 */
static int n_threads;
/**
 * Initialise the polling subsystem.
 *
 * The only work done here is caching the configured thread count, as
 * returned by config_threadcount(), in the file-level n_threads variable.
 * Until this has been called n_threads is zero, so dShowThreads() will
 * list no workers.
 */
void
poll_init()
{
n_threads = config_threadcount();
}
/**
 * Set the number of non-blocking poll cycles that will be done before
 * a blocking poll will take place. Whenever an event arrives on a thread
 * or the thread sees a pending event to execute it will reset its
 * poll_spin count to zero and will then poll with a 0 timeout until the
 * poll_spin value is greater than the value set here.
 *
 * The value is forwarded unchanged to RoutingWorker::set_nonblocking_polls().
 *
 * @param nbpolls Number of non-block polls to perform before blocking
 */
void
poll_set_nonblocking_polls(unsigned int nbpolls)
{
RoutingWorker::set_nonblocking_polls(nbpolls);
}
/**
 * Set the maximum amount of time, in milliseconds, the polling thread
 * will block before it will wake and check the event queue for work
 * that may have been added by another thread.
 *
 * The value is forwarded unchanged to RoutingWorker::set_maxwait().
 *
 * @param maxwait Maximum wait time in milliseconds
 */
void
poll_set_maxwait(unsigned int maxwait)
{
RoutingWorker::set_maxwait(maxwait);
}
/**
 * Display an entry from the spinlock statistics data
 *
 * Prints a single tab-indented line to the DCB: the description,
 * left-justified in a 40 character wide column, followed by the value.
 * The DCB is received as a void pointer and cast back to DCB* here;
 * presumably so that the function matches a generic reporting callback
 * signature (TODO confirm, no caller is visible in this part of the file).
 *
 * @param dcb The DCB to print to, passed as a void pointer
 * @param desc Description of the statistic
 * @param value The statistic value
 */
static void
spin_reporter(void *dcb, char *desc, int value)
{
dcb_printf((DCB *)dcb, "\t%-40s %d\n", desc, value);
}
/**
 * Debug routine that prints the polling statistics.
 *
 * A statistics snapshot is obtained from RoutingWorker::get_statistics()
 * and written to the DCB as a list of counters, followed by a table
 * showing how many poll completions returned a given number of
 * descriptors. The final table row collects every completion that
 * returned MAXNFDS descriptors or more.
 *
 * @param dcb DCB to print to
 */
void
dprintPollStats(DCB *dcb)
{
    Worker::STATISTICS stats = RoutingWorker::get_statistics();

    // Index of the last n_fds bucket; it holds the ">= MAXNFDS" count.
    const int last_bucket = Worker::STATISTICS::MAXNFDS - 1;

    dcb_printf(dcb, "\nPoll Statistics.\n\n");
    dcb_printf(dcb, "No. of epoll cycles: %" PRId64 "\n", stats.n_polls);
    dcb_printf(dcb, "No. of epoll cycles with wait: %" PRId64 "\n", stats.blockingpolls);
    dcb_printf(dcb, "No. of epoll calls returning events: %" PRId64 "\n", stats.n_pollev);
    dcb_printf(dcb, "No. of non-blocking calls returning events: %" PRId64 "\n", stats.n_nbpollev);
    dcb_printf(dcb, "No. of read events: %" PRId64 "\n", stats.n_read);
    dcb_printf(dcb, "No. of write events: %" PRId64 "\n", stats.n_write);
    dcb_printf(dcb, "No. of error events: %" PRId64 "\n", stats.n_error);
    dcb_printf(dcb, "No. of hangup events: %" PRId64 "\n", stats.n_hup);
    dcb_printf(dcb, "No. of accept events: %" PRId64 "\n", stats.n_accept);

    // NOTE(review): the "Total" and "Average" lines both print evq_length;
    // one of the two labels is presumably wrong - confirm against the
    // definition of Worker::STATISTICS.
    dcb_printf(dcb, "Total event queue length: %" PRId64 "\n", stats.evq_length);
    dcb_printf(dcb, "Average event queue length: %" PRId64 "\n", stats.evq_length);
    dcb_printf(dcb, "Maximum event queue length: %" PRId64 "\n", stats.evq_max);

    dcb_printf(dcb, "No of poll completions with descriptors\n");
    dcb_printf(dcb, "\tNo. of descriptors\tNo. of poll completions.\n");

    // Bucket k (zero based) counts the completions that returned k + 1 descriptors.
    for (int nfds = 1; nfds <= last_bucket; nfds++)
    {
        dcb_printf(dcb, "\t%2d\t\t\t%" PRId64 "\n", nfds, stats.n_fds[nfds - 1]);
    }

    dcb_printf(dcb, "\t>= %d\t\t\t%" PRId64 "\n",
               Worker::STATISTICS::MAXNFDS, stats.n_fds[last_bucket]);
}
/**
 * Print the thread status for all the polling threads
 *
 * Writes a table with one row per routing worker id in [0, n_threads).
 * Each row contains the id, a textual form of the worker's state, the
 * current and total descriptor counts and the worker's load figures for
 * the one second, one minute and one hour intervals.
 *
 * @param dcb The DCB to send the thread status data
 */
void
dShowThreads(DCB *dcb)
{
    dcb_printf(dcb, "Polling Threads.\n\n");
    dcb_printf(dcb, " ID | State | #descriptors (curr) | #descriptors (tot) | Load (1s) | Load (1m) | Load (1h) |\n");
    dcb_printf(dcb, "----+------------+---------------------+---------------------+-----------+-----------+-----------+\n");

    for (int id = 0; id < n_threads; ++id)
    {
        Worker* pWorker = RoutingWorker::get(id);
        ss_dassert(pWorker);

        // Stays "Unknown" only if the worker reports a state not handled below.
        const char *zState = "Unknown";

        switch (pWorker->state())
        {
        case Worker::STOPPED:
            zState = "Stopped";
            break;

        case Worker::IDLE:
            zState = "Idle";
            break;

        case Worker::POLLING:
            zState = "Polling";
            break;

        case Worker::PROCESSING:
            zState = "Processing";
            break;

        case Worker::ZPROCESSING:
            zState = "Collecting";
            break;

        default:
            ss_dassert(!true);
        }

        uint32_t current_descriptors;
        uint64_t total_descriptors;
        pWorker->get_descriptor_counts(&current_descriptors, &total_descriptors);

        dcb_printf(dcb, " %2d | %10s | %19" PRIu32 " | %19" PRIu64 " | %9d | %9d | %9d |\n",
                   id, zState, current_descriptors, total_descriptors,
                   pWorker->load(Worker::Load::ONE_SECOND),
                   pWorker->load(Worker::Load::ONE_MINUTE),
                   pWorker->load(Worker::Load::ONE_HOUR));
    }
}
/**
 * Print the event queue statistics
 *
 * A statistics snapshot is obtained from RoutingWorker::get_statistics().
 * The maximum queue and execution times and the queue lengths are printed
 * first, followed by a table with the number of events queued and
 * executed per duration bucket: bucket 0 is "< 100ms", buckets
 * 1 .. N_QUEUE_TIMES - 1 are 100ms wide and bucket N_QUEUE_TIMES collects
 * everything above that.
 *
 * The per-bucket counters are printed with %u so that the format agrees
 * with the one eventTimesRowCallback() uses for the same counters
 * (presumably they are unsigned; verify against Worker::STATISTICS).
 *
 * @param pdcb The DCB to print the event queue to
 */
void
dShowEventStats(DCB *pdcb)
{
    Worker::STATISTICS s = RoutingWorker::get_statistics();

    dcb_printf(pdcb, "\nEvent statistics.\n");
    dcb_printf(pdcb, "Maximum queue time: %3" PRId64 "00ms\n", s.maxqtime);
    dcb_printf(pdcb, "Maximum execution time: %3" PRId64 "00ms\n", s.maxexectime);
    dcb_printf(pdcb, "Maximum event queue length: %3" PRId64 "\n", s.evq_max);
    dcb_printf(pdcb, "Average event queue length: %3" PRId64 "\n", s.evq_length);
    dcb_printf(pdcb, "\n");
    dcb_printf(pdcb, " | Number of events\n");
    dcb_printf(pdcb, "Duration | Queued | Executed\n");
    dcb_printf(pdcb, "---------------+------------+-----------\n");

    // First bucket: everything that took less than 100ms.
    dcb_printf(pdcb, " < 100ms | %-10u | %-10u\n", s.qtimes[0], s.exectimes[0]);

    // The 100ms wide buckets in between.
    for (int i = 1; i < Worker::STATISTICS::N_QUEUE_TIMES; i++)
    {
        dcb_printf(pdcb, " %2d00 - %2d00ms | %-10u | %-10u\n", i, i + 1, s.qtimes[i], s.exectimes[i]);
    }

    // Overflow bucket, stored one past the regular buckets.
    dcb_printf(pdcb, " > %2d00ms | %-10u | %-10u\n", Worker::STATISTICS::N_QUEUE_TIMES,
               s.qtimes[Worker::STATISTICS::N_QUEUE_TIMES], s.exectimes[Worker::STATISTICS::N_QUEUE_TIMES]);
}
/**
 * Return a poll statistic from the polling subsystem
 *
 * The request is delegated to RoutingWorker::get_one_statistic() and its
 * result is returned as is.
 *
 * @param what The required statistic
 * @return The value of that statistic
 */
int64_t
poll_get_stat(POLL_STAT what)
{
return RoutingWorker::get_one_statistic(what);
}
namespace
{
/**
 * Iteration state handed to eventTimesRowCallback() through the opaque
 * data pointer of the result set. An instance is allocated by
 * eventTimesGetList() and freed by the callback when it has no more rows
 * to return.
 */
struct EVENT_TIMES_CB_DATA
{
int rowno; /*< Index of the next row the callback will produce */
Worker::STATISTICS stats; /*< Statistics copied when the result set was created */
};
}
/**
 * Provide a row to the result set that defines the event queue statistics
 *
 * The rows mirror the table printed by dShowEventStats(): row 0 is the
 * "< 100ms" bucket, rows 1 .. N_QUEUE_TIMES - 1 are the 100ms wide buckets
 * and row N_QUEUE_TIMES is the overflow bucket, labelled "> N00ms".
 * Earlier the overflow label was attached to bucket N_QUEUE_TIMES - 1 and
 * the real overflow bucket was never returned.
 *
 * When there are no more rows the callback data is freed and NULL is
 * returned, so the callback must not be invoked again with the same data
 * after it has returned NULL.
 *
 * @param set The result set
 * @param v   The EVENT_TIMES_CB_DATA holding the next row index and the statistics
 * @return The next row or NULL
 */
static RESULT_ROW *
eventTimesRowCallback(RESULTSET *set, void *v)
{
    EVENT_TIMES_CB_DATA* data = (EVENT_TIMES_CB_DATA*)v;

    // The overflow bucket lives at index N_QUEUE_TIMES, see dShowEventStats().
    const int last_row = Worker::STATISTICS::N_QUEUE_TIMES;

    if (data->rowno > last_row)
    {
        MXS_FREE(data);
        return NULL;
    }

    RESULT_ROW *row = resultset_make_row(set);

    if (row == NULL)
    {
        // Presumably an allocation failure; end the iteration the same way
        // as when the rows run out, so that the callback data is not leaked.
        MXS_FREE(data);
        return NULL;
    }

    char buf[40];

    // Column 0: the duration label of the bucket.
    if (data->rowno == 0)
    {
        resultset_row_set(row, 0, "< 100ms");
    }
    else if (data->rowno == last_row)
    {
        snprintf(buf, sizeof(buf), "> %2d00ms", last_row);
        resultset_row_set(row, 0, buf);
    }
    else
    {
        snprintf(buf, sizeof(buf), "%2d00 - %2d00ms", data->rowno, data->rowno + 1);
        resultset_row_set(row, 0, buf);
    }

    // Column 1: number of events queued for that long.
    snprintf(buf, sizeof(buf), "%u", data->stats.qtimes[data->rowno]);
    resultset_row_set(row, 1, buf);

    // Column 2: number of events that took that long to execute.
    snprintf(buf, sizeof(buf), "%u", data->stats.exectimes[data->rowno]);
    resultset_row_set(row, 2, buf);

    data->rowno++;
    return row;
}
/**
 * Return a result set with the event queue time distribution
 *
 * Allocates the callback state, fills it with a statistics snapshot from
 * RoutingWorker::get_statistics() and creates a result set that produces
 * its rows through eventTimesRowCallback(). The result set has three
 * columns: the duration bucket and the number of events queued and
 * executed in that bucket.
 *
 * @return A Result set, or NULL if the callback state or the result set
 *         could not be created
 */
RESULTSET *
eventTimesGetList()
{
    EVENT_TIMES_CB_DATA *cb_data = (EVENT_TIMES_CB_DATA*)MXS_MALLOC(sizeof(EVENT_TIMES_CB_DATA));

    if (cb_data == NULL)
    {
        return NULL;
    }

    cb_data->rowno = 0;
    cb_data->stats = RoutingWorker::get_statistics();

    RESULTSET *result = resultset_create(eventTimesRowCallback, cb_data);

    if (result == NULL)
    {
        // No result set took ownership of the callback state; release it here.
        MXS_FREE(cb_data);
        return NULL;
    }

    resultset_add_column(result, "Duration", 20, COL_TYPE_VARCHAR);
    resultset_add_column(result, "No. Events Queued", 12, COL_TYPE_VARCHAR);
    resultset_add_column(result, "No. Events Executed", 12, COL_TYPE_VARCHAR);

    return result;
}