MXS-1220: Use libmicrohttpd for the HTTP handling

The HTTP side of the REST API is better handled with an actual
library. The libmicrohttpd library provides a convenient way of handling
the HTTP traffic between the clients and MaxScale.
This commit is contained in:
Markus Mäkelä
2017-04-18 12:19:01 +03:00
committed by Markus Mäkelä
parent eb3ff1cc7b
commit d242203279
14 changed files with 136 additions and 1357 deletions

View File

@ -15,6 +15,7 @@
#include <climits>
#include <new>
#include <microhttpd.h>
#include <maxscale/atomic.h>
#include <maxscale/debug.h>
@ -23,171 +24,66 @@
#include "maxscale/admin.hh"
#include "maxscale/hk_heartbeat.h"
#include "maxscale/resource.hh"
#define DEFAULT_ADMIN_HOST "127.0.0.1"
#define DEFAULT_ADMIN_PORT 8080
#define DEFAULT_ADMIN_AUTH HTTP_AUTH_NONE
static AdminListener* admin = NULL;
static THREAD admin_thread;
static THREAD timeout_thread;
static struct MHD_Daemon* http_daemon = NULL;
// TODO: Read values from the configuration
static AdminConfig config = {DEFAULT_ADMIN_HOST, DEFAULT_ADMIN_PORT, DEFAULT_ADMIN_AUTH};
int handle_client(void *cls,
struct MHD_Connection *connection,
const char *url,
const char *method,
const char *version,
const char *upload_data,
size_t *upload_data_size,
void **con_cls)
void admin_main(void* data)
{
AdminListener* admin = reinterpret_cast<AdminListener*>(data);
admin->start();
}
string verb(method);
json_t* json = NULL;
void timeout_main(void *data)
{
AdminListener* admin = reinterpret_cast<AdminListener*>(data);
admin->check_timeouts();
}
if (verb == "POST" || verb == "PUT" || verb == "PATCH")
{
json_error_t err = {};
if ((json = json_loadb(upload_data, *upload_data_size, 0, &err)) == NULL)
{
return MHD_NO;
}
}
AdminConfig& mxs_admin_get_config()
{
return config;
HttpRequest request(connection, url, method, json);
HttpResponse reply = resource_handle_request(request);
string data = reply.get_response();
struct MHD_Response *response = MHD_create_response_from_buffer(data.size(),
(void*)data.c_str(),
MHD_RESPMEM_MUST_COPY);
for (map<string, string>::const_iterator it = reply.get_headers().begin();
it != reply.get_headers().end(); it++)
{
MHD_add_response_header(response, it->first.c_str(), it->second.c_str());
}
MHD_queue_response(connection, reply.get_code(), response);
MHD_destroy_response(response);
return MHD_YES;
}
bool mxs_admin_init()
{
ss_dassert(admin == NULL);
bool rval = false;
struct sockaddr_storage addr = {};
int sock = open_network_socket(MXS_SOCKET_LISTENER, &addr, config.host.c_str(), config.port);
http_daemon = MHD_start_daemon(MHD_USE_EPOLL_INTERNALLY | MHD_USE_DUAL_STACK,
DEFAULT_ADMIN_PORT, NULL, NULL,
handle_client, NULL, MHD_OPTION_END);
return http_daemon != NULL;
if (sock > -1)
{
setblocking(sock);
if (listen(sock, INT_MAX) == 0)
{
admin = new (std::nothrow) AdminListener(sock);
if (admin)
{
if (thread_start(&admin_thread, admin_main, admin) &&
thread_start(&timeout_thread, timeout_main, admin))
{
rval = true;
}
else
{
admin->stop();
delete admin;
admin = NULL;
}
}
else
{
MXS_OOM();
}
}
else
{
MXS_ERROR("Failed to start listening on '[%s]:%u': %d, %s",
config.host.c_str(), config.port, errno, mxs_strerror(errno));
close(sock);
}
}
return rval;
}
void mxs_admin_shutdown()
{
if (admin)
{
admin->stop();
thread_wait(timeout_thread);
thread_wait(admin_thread);
delete admin;
admin = NULL;
}
}
AdminListener::AdminListener(int sock):
m_socket(sock),
m_active(1),
m_timeout(10)
{
}
AdminListener::~AdminListener()
{
close(m_socket);
}
void AdminListener::handle_clients()
{
AdminClient* client;
while ((client = accept_client()))
{
SAdminClient sclient(client);
ClientList::iterator it = m_clients.insert(m_clients.begin(), sclient);
sclient->process();
m_clients.erase(it);
}
}
void AdminListener::start()
{
while (atomic_read(&m_active))
{
MXS_EXCEPTION_GUARD(handle_clients());
}
}
void AdminListener::stop()
{
atomic_write(&m_active, 0);
}
AdminClient* AdminListener::accept_client()
{
AdminClient* rval = NULL;
struct sockaddr_storage addr = {};
socklen_t len = sizeof (addr);
int fd = accept(m_socket, (struct sockaddr*) &addr, &len);
if (fd > -1)
{
rval = new AdminClient(fd, addr, m_timeout);
}
else
{
MXS_ERROR("Failed to accept client: %d, %s\n", errno, mxs_strerror(errno));
}
return rval;
}
void AdminListener::handle_timeouts()
{
int64_t now = hkheartbeat;
for (ClientList::iterator it = m_clients.begin(); it != m_clients.end(); it++)
{
SAdminClient& client = *it;
if (now - client->last_activity() > m_timeout * 10)
{
client->close_connection();
}
}
/** Sleep for roughly one housekeeper heartbeat */
thread_millisleep(100);
}
void AdminListener::check_timeouts()
{
while (atomic_read(&m_active))
{
MXS_EXCEPTION_GUARD(handle_timeouts());
}
MHD_stop_daemon(http_daemon);
}