MXS-1220: Add threads resource

The threads are now a REST API resource exposed via the /maxscale/threads
resource collection.
This commit is contained in:
Markus Mäkelä
2017-05-05 09:48:30 +03:00
parent ac21443529
commit 0e57bec4ef
4 changed files with 151 additions and 9 deletions

View File

@ -12,11 +12,15 @@
*/
#include "maxscale/worker.hh"
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <stdlib.h>
#include <unistd.h>
#include <vector>
#include <sstream>
#include <maxscale/alloc.h>
#include <maxscale/atomic.h>
#include <maxscale/config.h>
@ -24,13 +28,21 @@
#include <maxscale/log_manager.h>
#include <maxscale/platform.h>
#include <maxscale/semaphore.hh>
#include <maxscale/json_api.h>
#include <maxscale/utils.hh>
#include "maxscale/modules.h"
#include "maxscale/poll.h"
#include "maxscale/statistics.h"
#include "maxscale/workertask.hh"
#define WORKER_ABSENT_ID -1
using maxscale::Worker;
using maxscale::Closer;
using maxscale::Semaphore;
using std::vector;
using std::stringstream;
namespace
{
@ -767,6 +779,87 @@ MXS_SESSION* Worker::find_session(uint64_t id)
return rval;
}
class WorkerInfoTask: public maxscale::WorkerTask
{
public:
WorkerInfoTask(const char* host, uint32_t nthreads):
m_host(host)
{
m_data.resize(nthreads);
}
void execute(Worker& worker)
{
json_t* stats = json_object();
const Worker::STATISTICS& s = worker.get_local_statistics();
json_object_set_new(stats, "reads", json_integer(s.n_read));
json_object_set_new(stats, "writes", json_integer(s.n_write));
json_object_set_new(stats, "errors", json_integer(s.n_error));
json_object_set_new(stats, "hangups", json_integer(s.n_hup));
json_object_set_new(stats, "accepts", json_integer(s.n_accept));
json_object_set_new(stats, "blocking_polls", json_integer(s.blockingpolls));
json_object_set_new(stats, "event_queue_length", json_integer(s.evq_length));
json_object_set_new(stats, "max_event_queue_length", json_integer(s.evq_max));
json_object_set_new(stats, "max_exec_time", json_integer(s.maxexectime));
json_object_set_new(stats, "max_queue_time", json_integer(s.maxqtime));
json_t* attr = json_object();
json_object_set_new(attr, "stats", stats);
int idx = worker.get_current_id();
stringstream ss;
ss << idx;
json_t* json = json_object();
json_object_set_new(json, CN_ID, json_string(ss.str().c_str()));
json_object_set_new(json, CN_TYPE, json_string(CN_THREADS));
json_object_set_new(json, CN_ATTRIBUTES, attr);
ss_dassert((size_t)idx < m_data.size());
m_data[idx] = json;
}
json_t* resource()
{
json_t* arr = json_array();
for (vector<json_t*>::iterator it = m_data.begin(); it != m_data.end(); it++)
{
json_array_append_new(arr, *it);
}
return mxs_json_resource(m_host, MXS_JSON_API_THREADS, arr);
}
json_t* resource(int id)
{
return mxs_json_resource(m_host, MXS_JSON_API_THREADS, m_data[id]);
}
private:
vector<json_t*> m_data;
const char* m_host;
};
json_t* mxs_worker_to_json(const char* host, int id)
{
Worker* target = Worker::get(id);
WorkerInfoTask task(host, id + 1);
Semaphore sem;
target->post(&task, &sem);
sem.wait();
return task.resource(id);
}
json_t* mxs_worker_list_to_json(const char* host)
{
WorkerInfoTask task(host, config_threadcount());
Worker::execute_concurrently(task);
return task.resource();
}
void Worker::run()
{
this_thread.current_worker_id = m_id;