MXS-1220: Simplify admin request handling

The admin requests are now processed in blocking mode. The timing out of
connecttions is handled by a specific timeout thread that checks the state
of each admin request.

The simplification will help with the JSON parsing with PUT/POST
commands. If non-blocking IO is used, the network reading code and JSON
parsing needs a lot more work to handle partial reads.

If the administrative interface requires higher performance and
concurrency, a multi-threaded solution could be created.
This commit is contained in:
Markus Mäkelä
2017-04-15 06:26:10 +03:00
committed by Markus Mäkelä
parent 439d67d129
commit e34b65658e
4 changed files with 125 additions and 52 deletions

View File

@ -12,6 +12,7 @@
*/ */
#include "maxscale/admin.hh" #include "maxscale/admin.hh"
#include "maxscale/hk_heartbeat.h"
#include <climits> #include <climits>
#include <new> #include <new>
@ -26,6 +27,7 @@
static AdminListener* admin = NULL; static AdminListener* admin = NULL;
static THREAD admin_thread; static THREAD admin_thread;
static THREAD timeout_thread;
// TODO: Read values from the configuration // TODO: Read values from the configuration
static AdminConfig config = {DEFAULT_ADMIN_HOST, DEFAULT_ADMIN_PORT}; static AdminConfig config = {DEFAULT_ADMIN_HOST, DEFAULT_ADMIN_PORT};
@ -36,6 +38,12 @@ void admin_main(void* data)
admin->start(); admin->start();
} }
void timeout_main(void *data)
{
AdminListener* admin = reinterpret_cast<AdminListener*>(data);
admin->check_timeouts();
}
AdminConfig& mxs_admin_get_config() AdminConfig& mxs_admin_get_config()
{ {
return config; return config;
@ -56,12 +64,14 @@ bool mxs_admin_init()
if (admin) if (admin)
{ {
if (thread_start(&admin_thread, admin_main, admin)) if (thread_start(&admin_thread, admin_main, admin) &&
thread_start(&timeout_thread, timeout_main, admin))
{ {
rval = true; rval = true;
} }
else else
{ {
admin->stop();
delete admin; delete admin;
admin = NULL; admin = NULL;
} }
@ -87,6 +97,7 @@ void mxs_admin_shutdown()
if (admin) if (admin)
{ {
admin->stop(); admin->stop();
thread_wait(timeout_thread);
thread_wait(admin_thread); thread_wait(admin_thread);
delete admin; delete admin;
admin = NULL; admin = NULL;
@ -95,7 +106,7 @@ void mxs_admin_shutdown()
AdminListener::AdminListener(int sock): AdminListener::AdminListener(int sock):
m_socket(sock), m_socket(sock),
m_active(0), m_active(1),
m_timeout(10) m_timeout(10)
{ {
} }
@ -111,15 +122,15 @@ void AdminListener::handle_clients()
if (client) if (client)
{ {
client->process(); SAdminClient sclient(client);
delete client; ClientList::iterator it = m_clients.insert(m_clients.begin(), sclient);
sclient->process();
m_clients.erase(it);
} }
} }
void AdminListener::start() void AdminListener::start()
{ {
atomic_write(&m_active, 1);
while (atomic_read(&m_active)) while (atomic_read(&m_active))
{ {
MXS_EXCEPTION_GUARD(handle_clients()); MXS_EXCEPTION_GUARD(handle_clients());
@ -140,7 +151,6 @@ AdminClient* AdminListener::accept_client()
if (fd > -1) if (fd > -1)
{ {
setnonblocking(fd);
rval = new AdminClient(fd, addr, m_timeout); rval = new AdminClient(fd, addr, m_timeout);
} }
else if (errno == EAGAIN || errno == EWOULDBLOCK) else if (errno == EAGAIN || errno == EWOULDBLOCK)
@ -155,3 +165,29 @@ AdminClient* AdminListener::accept_client()
return rval; return rval;
} }
void AdminListener::handle_timeouts()
{
int64_t now = hkheartbeat;
for (ClientList::iterator it = m_clients.begin(); it != m_clients.end(); it++)
{
SAdminClient& client = *it;
if (now - client->last_activity() > m_timeout * 10)
{
client->close_connection();
}
}
/** Sleep for roughly one housekeeper heartbeat */
thread_millisleep(100);
}
void AdminListener::check_timeouts()
{
while (atomic_read(&m_active))
{
MXS_EXCEPTION_GUARD(handle_timeouts());
}
}

View File

@ -12,90 +12,92 @@
*/ */
#include "maxscale/adminclient.hh" #include "maxscale/adminclient.hh"
#include "maxscale/httpparser.hh"
#include <string> #include <string>
#include <sstream>
#include <maxscale/atomic.h> #include <maxscale/atomic.h>
#include <maxscale/hk_heartbeat.h> #include <maxscale/hk_heartbeat.h>
#include <maxscale/log_manager.h> #include <maxscale/log_manager.h>
#include <maxscale/thread.h>
#include <maxscale/spinlock.hh>
using std::string; using std::string;
using std::stringstream;
using mxs::SpinLockGuard;
AdminClient::AdminClient(int fd, const struct sockaddr_storage& addr, int timeout): AdminClient::AdminClient(int fd, const struct sockaddr_storage& addr, int timeout):
m_fd(fd), m_fd(fd),
m_timeout(timeout), m_last_activity(atomic_read_int64(&hkheartbeat)),
m_addr(addr) m_addr(addr)
{ {
}
void AdminClient::close_connection()
{
SpinLockGuard guard(m_lock);
if (m_fd != -1)
{
close(m_fd);
m_fd = -1;
}
} }
AdminClient::~AdminClient() AdminClient::~AdminClient()
{ {
close(m_fd); close_connection();
} }
static bool read_request_header(int fd, int timeout, string& output) static bool read_request(int fd, string& output)
{ {
int64_t start = atomic_read_int64(&hkheartbeat); while (true)
while ((atomic_read_int64(&hkheartbeat) - start) / 10 < timeout)
{ {
char buf[1024]; char buf[1024];
int rc = read(fd, buf, sizeof(buf)); int bufsize = sizeof(buf) - 1;
if (rc == -1 && errno != EAGAIN && errno != EWOULDBLOCK) int rc = read(fd, buf, bufsize);
if (rc == -1)
{ {
return false; return false;
} }
else if (rc > 0)
{
buf[rc] = '\0'; buf[rc] = '\0';
output += buf; output += buf;
if (output.find("\r\n\r\n") != std::string::npos) if (rc < bufsize)
{ {
/** Complete request read */
break; break;
} }
} }
}
return true; return true;
} }
static bool write_response(int fd, int timeout, string input) static bool write_response(int fd, string input)
{ {
int64_t start = atomic_read_int64(&hkheartbeat); return write(fd, input.c_str(), input.length()) != -1;
while ((atomic_read_int64(&hkheartbeat) - start) / 10 < timeout && input.length() > 0)
{
int rc = write(fd, input.c_str(), input.length());
if (rc == -1 && errno != EAGAIN && errno != EWOULDBLOCK)
{
return false;
}
else if (rc > 0)
{
input.erase(0, rc);
}
}
return true;
} }
void AdminClient::process() void AdminClient::process()
{ {
string request; string request;
atomic_write_int64(&m_last_activity, hkheartbeat);
if (read_request_header(m_fd, m_timeout, request)) if (read_request(m_fd, request))
{ {
/** Send the Status-Line part of the response */ SHttpParser parser(HttpParser::parse(request));
string response = "HTTP/1.1 200 OK\r\n";
response += "\r\n"; string status = parser.get() ? "200 OK" : "400 Bad Request";
write_response(m_fd, m_timeout, response); stringstream resp;
resp << "HTTP/1.1 " << status << "\r\n\r\n" << parser->get_body() << "\r\n";
atomic_write_int64(&m_last_activity, hkheartbeat);
write_response(m_fd, resp.str());
} }
else else
{ {

View File

@ -15,13 +15,17 @@
#include <maxscale/cppdefs.hh> #include <maxscale/cppdefs.hh>
#include <string> #include <string>
#include <deque>
#include <maxscale/thread.h> #include <maxscale/thread.h>
#include "adminclient.hh" #include "adminclient.hh"
using std::deque;
using std::string; using std::string;
typedef deque<SAdminClient> ClientList;
/** The admin interface configuration */ /** The admin interface configuration */
struct AdminConfig struct AdminConfig
{ {
@ -50,13 +54,20 @@ public:
*/ */
void stop(); void stop();
/**
* Close timed out connections
*/
void check_timeouts();
private: private:
void handle_clients(); void handle_clients();
void handle_timeouts();
AdminClient* accept_client(); AdminClient* accept_client();
int m_socket; /**< The network socket we listen on */ int m_socket; /**< The network socket we listen on */
int m_active; /**< Positive value if the admin is active */ int m_active; /**< Positive value if the admin is active */
int m_timeout; /**< Network timeout in seconds */ int m_timeout; /**< Network timeout in seconds */
ClientList m_clients;
}; };
/** /**

View File

@ -17,6 +17,12 @@
#include <map> #include <map>
#include <sys/socket.h> #include <sys/socket.h>
#include <tr1/memory>
#include <maxscale/atomic.h>
#include <maxscale/spinlock.hh>
using mxs::SpinLock;
class AdminClient class AdminClient
{ {
@ -33,12 +39,30 @@ public:
~AdminClient(); ~AdminClient();
/** /**
* Process one request * @brief Process one request
*/ */
void process(); void process();
/**
* @brief Close the connection
*/
void close_connection();
/**
* @brief Get last activity timestamp
*
* @return The hkheartbeat of the last activity
*/
int64_t last_activity()
{
return atomic_read_int64(&m_last_activity);
}
private: private:
int m_fd; /**< The client socket */ int m_fd; /**< The client socket */
int m_timeout; /**< Network timeout for reads and writes */ int64_t m_last_activity;
struct sockaddr_storage m_addr; /**< Network info for the client */ struct sockaddr_storage m_addr; /**< Network info for the client */
SpinLock m_lock;
}; };
typedef std::shared_ptr<AdminClient> SAdminClient;