diff --git a/server/core/admin.cc b/server/core/admin.cc index cdfd087b6..f92cd5c47 100644 --- a/server/core/admin.cc +++ b/server/core/admin.cc @@ -12,6 +12,7 @@ */ #include "maxscale/admin.hh" +#include "maxscale/hk_heartbeat.h" #include #include @@ -26,6 +27,7 @@ static AdminListener* admin = NULL; static THREAD admin_thread; +static THREAD timeout_thread; // TODO: Read values from the configuration static AdminConfig config = {DEFAULT_ADMIN_HOST, DEFAULT_ADMIN_PORT}; @@ -36,6 +38,12 @@ void admin_main(void* data) admin->start(); } +void timeout_main(void *data) +{ + AdminListener* admin = reinterpret_cast(data); + admin->check_timeouts(); +} + AdminConfig& mxs_admin_get_config() { return config; @@ -56,12 +64,14 @@ bool mxs_admin_init() if (admin) { - if (thread_start(&admin_thread, admin_main, admin)) + if (thread_start(&admin_thread, admin_main, admin) && + thread_start(&timeout_thread, timeout_main, admin)) { rval = true; } else { + admin->stop(); delete admin; admin = NULL; } @@ -87,6 +97,7 @@ void mxs_admin_shutdown() if (admin) { admin->stop(); + thread_wait(timeout_thread); thread_wait(admin_thread); delete admin; admin = NULL; @@ -95,7 +106,7 @@ void mxs_admin_shutdown() AdminListener::AdminListener(int sock): m_socket(sock), - m_active(0), + m_active(1), m_timeout(10) { } @@ -111,15 +122,15 @@ void AdminListener::handle_clients() if (client) { - client->process(); - delete client; + SAdminClient sclient(client); + ClientList::iterator it = m_clients.insert(m_clients.begin(), sclient); + sclient->process(); + m_clients.erase(it); } } void AdminListener::start() { - atomic_write(&m_active, 1); - while (atomic_read(&m_active)) { MXS_EXCEPTION_GUARD(handle_clients()); @@ -140,7 +151,6 @@ AdminClient* AdminListener::accept_client() if (fd > -1) { - setnonblocking(fd); rval = new AdminClient(fd, addr, m_timeout); } else if (errno == EAGAIN || errno == EWOULDBLOCK) @@ -155,3 +165,29 @@ AdminClient* AdminListener::accept_client() return rval; } + +void AdminListener::handle_timeouts() +{ + int64_t now = hkheartbeat; + + for (ClientList::iterator it = m_clients.begin(); it != m_clients.end(); it++) + { + SAdminClient& client = *it; + + if (now - client->last_activity() > m_timeout * 10) + { + client->close_connection(); + } + } + + /** Sleep for roughly one housekeeper heartbeat */ + thread_millisleep(100); +} + +void AdminListener::check_timeouts() +{ + while (atomic_read(&m_active)) + { + MXS_EXCEPTION_GUARD(handle_timeouts()); + } +} diff --git a/server/core/adminclient.cc b/server/core/adminclient.cc index 848cb2eb7..87629427a 100644 --- a/server/core/adminclient.cc +++ b/server/core/adminclient.cc @@ -12,90 +12,92 @@ */ #include "maxscale/adminclient.hh" +#include "maxscale/httpparser.hh" #include +#include #include #include #include +#include +#include using std::string; +using std::stringstream; +using mxs::SpinLockGuard; AdminClient::AdminClient(int fd, const struct sockaddr_storage& addr, int timeout): m_fd(fd), - m_timeout(timeout), + m_last_activity(atomic_read_int64(&hkheartbeat)), m_addr(addr) { +} +void AdminClient::close_connection() +{ + SpinLockGuard guard(m_lock); + + if (m_fd != -1) + { + close(m_fd); + m_fd = -1; + } } AdminClient::~AdminClient() { - close(m_fd); + close_connection(); } -static bool read_request_header(int fd, int timeout, string& output) +static bool read_request(int fd, string& output) { - int64_t start = atomic_read_int64(&hkheartbeat); - - while ((atomic_read_int64(&hkheartbeat) - start) / 10 < timeout) + while (true) { char buf[1024]; - int rc = read(fd, buf, sizeof(buf)); + int bufsize = sizeof(buf) - 1; - if (rc == -1 && errno != EAGAIN && errno != EWOULDBLOCK) + int rc = read(fd, buf, bufsize); + + if (rc == -1) { return false; } - else if (rc > 0) - { - buf[rc] = '\0'; - output += buf; - if (output.find("\r\n\r\n") != std::string::npos) - { - break; - } + buf[rc] = '\0'; + output += buf; + + if (rc < bufsize) + { + /** Complete request read */ + break; } } return true; } -static bool write_response(int fd, int timeout, string input) +static bool write_response(int fd, string input) { - int64_t start = atomic_read_int64(&hkheartbeat); - - while ((atomic_read_int64(&hkheartbeat) - start) / 10 < timeout && input.length() > 0) - { - int rc = write(fd, input.c_str(), input.length()); - - if (rc == -1 && errno != EAGAIN && errno != EWOULDBLOCK) - { - return false; - } - else if (rc > 0) - { - input.erase(0, rc); - } - } - - return true; - + return write(fd, input.c_str(), input.length()) != -1; } void AdminClient::process() { string request; + atomic_write_int64(&m_last_activity, hkheartbeat); - if (read_request_header(m_fd, m_timeout, request)) + if (read_request(m_fd, request)) { - /** Send the Status-Line part of the response */ - string response = "HTTP/1.1 200 OK\r\n"; + SHttpParser parser(HttpParser::parse(request)); - response += "\r\n"; + string status = parser.get() ? "200 OK" : "400 Bad Request"; - write_response(m_fd, m_timeout, response); + stringstream resp; + resp << "HTTP/1.1 " << status << "\r\n\r\n" << parser->get_body() << "\r\n"; + + atomic_write_int64(&m_last_activity, hkheartbeat); + write_response(m_fd, resp.str()); } else { diff --git a/server/core/maxscale/admin.hh b/server/core/maxscale/admin.hh index 9e6a150bc..03d005a65 100644 --- a/server/core/maxscale/admin.hh +++ b/server/core/maxscale/admin.hh @@ -15,13 +15,17 @@ #include #include +#include #include #include "adminclient.hh" +using std::deque; using std::string; +typedef deque ClientList; + /** The admin interface configuration */ struct AdminConfig { @@ -50,13 +54,20 @@ public: */ void stop(); + /** + * Close timed out connections + */ + void check_timeouts(); + private: void handle_clients(); + void handle_timeouts(); AdminClient* accept_client(); - int m_socket; /**< The network socket we listen on */ - int m_active; /**< Positive value if the admin is active */ - int m_timeout; /**< Network timeout in seconds */ + int m_socket; /**< The network socket we listen on */ + int m_active; /**< Positive value if the admin is active */ + int m_timeout; /**< Network timeout in seconds */ + ClientList m_clients; }; /** diff --git a/server/core/maxscale/adminclient.hh b/server/core/maxscale/adminclient.hh index 55e6928f9..8f06ee714 100644 --- a/server/core/maxscale/adminclient.hh +++ b/server/core/maxscale/adminclient.hh @@ -17,6 +17,12 @@ #include #include +#include + +#include +#include + +using mxs::SpinLock; class AdminClient { @@ -33,12 +39,30 @@ public: ~AdminClient(); /** - * Process one request + * @brief Process one request */ void process(); + /** + * @brief Close the connection + */ + void close_connection(); + + /** + * @brief Get last activity timestamp + * + * @return The hkheartbeat of the last activity + */ + int64_t last_activity() + { + return atomic_read_int64(&m_last_activity); + } + private: int m_fd; /**< The client socket */ - int m_timeout; /**< Network timeout for reads and writes */ + int64_t m_last_activity; struct sockaddr_storage m_addr; /**< Network info for the client */ + SpinLock m_lock; }; + +typedef std::shared_ptr SAdminClient;