From b3fbc6aa3df6b232ae2d95de8d783adcdb8717fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Sat, 1 Dec 2018 20:59:44 +0200 Subject: [PATCH] MXS-2196: Remove DCB from Listener Replaced the DCB with a single file descriptor that the listener listens on and which is added to all of the workers. The Listener also extends the MXB_POLL_DATA which allows it to handle epoll events. Moved the code that creates the listening socket into listener.cc where it belongs and did a minor cleanup of it. --- include/maxscale/listener.hh | 33 ++++- server/core/config_runtime.cc | 2 +- server/core/dcb.cc | 199 +++--------------------------- server/core/listener.cc | 225 +++++++++++++++++++++++++--------- server/core/service.cc | 2 +- 5 files changed, 215 insertions(+), 246 deletions(-) diff --git a/include/maxscale/listener.hh b/include/maxscale/listener.hh index eea25a5cf..4a4b7fad9 100644 --- a/include/maxscale/listener.hh +++ b/include/maxscale/listener.hh @@ -23,6 +23,7 @@ #include #include #include +#include struct DCB; class SERVICE; @@ -34,7 +35,7 @@ using SListener = std::shared_ptr; * The Listener class is used to link a network port to a service. It defines the name of the * protocol module that should be loaded as well as the authenticator that is used. */ -class Listener +class Listener : public MXB_POLL_DATA { public: @@ -78,7 +79,7 @@ public: * * @return True if the listener was able to start listening */ - bool listen(const SListener& self); + bool listen(); /** * Stop the listener @@ -172,6 +173,12 @@ public: */ void print_users(DCB* dcb); + // TODO: Move dcb_accept code into listener.cc and remove this + int fd() const + { + return m_fd; + } + // Functions that are temporarily public bool create_listener_config(const char* filename); struct users* users() const; @@ -196,13 +203,27 @@ private: std::string m_auth_options; /**< Authenticator options */ void* m_auth_instance; /**< Authenticator instance */ SSL_LISTENER* m_ssl; /**< Structure of SSL data or NULL */ - DCB* m_listener; /**< The DCB for the listener */ struct users* m_users; /**< The user data for this listener */ SERVICE* m_service; /**< The service which used by this listener */ std::atomic m_active; /**< True if the port has not been deleted */ MXS_PROTOCOL m_proto_func; /**< Preloaded protocol functions */ MXS_AUTHENTICATOR m_auth_func; /**< Preloaded authenticator functions */ + int m_fd; /**< File descriptor the listener listens on */ + + /** A shared pointer to the listener itself that is passed as the argument to + * the protocol's accept function. This allows client connections to live + * longer than the listener they started on. + * + * This will eventually be replaced with a shared_ptr of the authenticator instance as that is + * what is actually required by the client sessions. + * + * In practice as a service must outlive all sessions on it, the reference could be owned by the service + * instead of each individual client. This would remove the need to increment the listener reference + * count every time a client is accepted. + */ + SListener m_self; + /** * Creates a new listener that points to a service * @@ -220,6 +241,12 @@ private: const std::string& protocol, const std::string& authenticator, const std::string& auth_opts, void* auth_instance, SSL_LISTENER* ssl); + // Listen on a file descriptor shared between all workers + bool listen_shared(std::string config_bind); + + // Handler for EPOLL_IN events + static uint32_t poll_handler(MXB_POLL_DATA* data, MXB_WORKER* worker, uint32_t events); + friend DCB* dcb_accept(const SListener& listener); }; diff --git a/server/core/config_runtime.cc b/server/core/config_runtime.cc index ad15beb50..029414659 100644 --- a/server/core/config_runtime.cc +++ b/server/core/config_runtime.cc @@ -1022,7 +1022,7 @@ bool runtime_create_listener(Service* service, MXS_NOTICE("Created %slistener '%s' at %s:%s for service '%s'", ssl ? "TLS encrypted " : "", name, print_addr, port, service->name); - if (listener->listen(listener)) + if (listener->listen()) { rval = true; } diff --git a/server/core/dcb.cc b/server/core/dcb.cc index c3c51e0da..ca0fabbae 100644 --- a/server/core/dcb.cc +++ b/server/core/dcb.cc @@ -2321,7 +2321,7 @@ DCB* dcb_accept(const SListener& listener) int c_sock; struct sockaddr_storage client_conn; - if ((c_sock = dcb_accept_one_connection(listener->m_listener->fd, (struct sockaddr*)&client_conn)) >= 0) + if ((c_sock = dcb_accept_one_connection(listener->fd(), (struct sockaddr*)&client_conn)) >= 0) { configure_network_socket(c_sock, client_conn.ss_family); @@ -2334,7 +2334,6 @@ DCB* dcb_accept(const SListener& listener) } else { - client_dcb->service = listener->service(); client_dcb->session = session_set_dummy(client_dcb); client_dcb->fd = c_sock; @@ -2497,145 +2496,6 @@ static int dcb_accept_one_connection(int fd, struct sockaddr* client_conn) return c_sock; } -/** - * @brief Create a listener, add new information to the given DCB - * - * First creates and opens a socket, either TCP or Unix according to the - * configuration data provided. Then try to listen on the socket and - * record the socket in the given DCB. Add the given DCB into the poll - * list. The protocol name does not affect the logic, but is used in - * log messages. - * - * @param dcb Listener DCB that is being created - * @param config Configuration for port to listen on - * - * @return 0 if new listener created successfully, otherwise -1 - */ -int dcb_listen(DCB* dcb, const char* config) -{ - char host[strlen(config) + 1]; - strcpy(host, config); - char* port_str = strrchr(host, '|'); - uint16_t port = 0; - - if (port_str) - { - *port_str++ = 0; - port = atoi(port_str); - } - - int listener_socket = -1; - - if (strchr(host, '/')) - { - listener_socket = dcb_listen_create_socket_unix(host); - - if (listener_socket != -1) - { - dcb->path = MXS_STRDUP_A(host); - } - } - else if (port > 0) - { - listener_socket = dcb_listen_create_socket_inet(host, port); - - if (listener_socket == -1 && strcmp(host, "::") == 0) - { - /** Attempt to bind to the IPv4 if the default IPv6 one is used */ - MXS_WARNING("Failed to bind on default IPv6 host '::', attempting " - "to bind on IPv4 version '0.0.0.0'"); - strcpy(host, "0.0.0.0"); - listener_socket = dcb_listen_create_socket_inet(host, port); - } - } - else - { - // We don't have a socket path or a network port - mxb_assert(false); - } - - if (listener_socket < 0) - { - mxb_assert(listener_socket == -1); - return -1; - } - - /** - * The use of INT_MAX for backlog length in listen() allows the end-user to - * control the backlog length with the net.ipv4.tcp_max_syn_backlog kernel - * option since the parameter is silently truncated to the configured value. - * - * @see man 2 listen - */ - if (listen(listener_socket, INT_MAX) != 0) - { - MXS_ERROR("Failed to start listening on [%s]:%u: %d, %s", - host, - port, - errno, - mxs_strerror(errno)); - close(listener_socket); - return -1; - } - - MXS_NOTICE("Listening for connections at [%s]:%u", host, port); - - // assign listener_socket to dcb - dcb->fd = listener_socket; - - // add listening socket to poll structure - if (poll_add_dcb(dcb) != 0) - { - MXS_ERROR("MaxScale encountered system limit while " - "attempting to register on an epoll instance."); - return -1; - } - return 0; -} - -/** - * @brief Create a network listener socket - * - * @param host The network address to listen on - * @param port The port to listen on - * @return The opened socket or -1 on error - */ -static int dcb_listen_create_socket_inet(const char* host, uint16_t port) -{ - struct sockaddr_storage server_address = {}; - return open_network_socket(MXS_SOCKET_LISTENER, &server_address, host, port); -} - -/** - * @brief Create a Unix domain socket - * - * @param path The socket path - * @return The opened socket or -1 on error - */ -static int dcb_listen_create_socket_unix(const char* path) -{ - if (unlink(path) == -1 && errno != ENOENT) - { - MXS_ERROR("Failed to unlink Unix Socket %s: %d %s", - path, - errno, - mxs_strerror(errno)); - } - - struct sockaddr_un local_addr; - int listener_socket = open_unix_socket(MXS_SOCKET_LISTENER, &local_addr, path); - - if (listener_socket >= 0 && chmod(path, 0777) < 0) - { - MXS_ERROR("Failed to change permissions on UNIX Domain socket '%s': %d, %s", - path, - errno, - mxs_strerror(errno)); - } - - return listener_socket; -} - /** * @brief Set socket options, log an error if fails * @@ -2989,49 +2849,28 @@ static uint32_t dcb_process_poll_events(DCB* dcb, uint32_t events) } if ((events & EPOLLIN) && (dcb->n_close == 0)) { - if (dcb->state == DCB_STATE_LISTENING) + MXS_DEBUG("%lu [poll_waitevents] " + "Read in dcb %p fd %d", + pthread_self(), + dcb, + dcb->fd); + rc |= MXB_POLL_READ; + + if (dcb_session_check(dcb, "read")) { - MXS_DEBUG("%lu [poll_waitevents] " - "Accept in fd %d", - pthread_self(), - dcb->fd); - rc |= MXB_POLL_ACCEPT; - - if (dcb_session_check(dcb, "accept")) + int return_code = 1; + /** SSL authentication is still going on, we need to call dcb_accept_SSL + * until it return 1 for success or -1 for error */ + if (dcb->ssl_state == SSL_HANDSHAKE_REQUIRED) { - DCB* client_dcb; - - while ((client_dcb = dcb_accept(dcb->listener))) - { - dcb->func.accept(client_dcb); - } + return_code = (DCB_ROLE_CLIENT_HANDLER == dcb->dcb_role) ? + dcb_accept_SSL(dcb) : + dcb_connect_SSL(dcb); } - } - else - { - MXS_DEBUG("%lu [poll_waitevents] " - "Read in dcb %p fd %d", - pthread_self(), - dcb, - dcb->fd); - rc |= MXB_POLL_READ; - - if (dcb_session_check(dcb, "read")) + if (1 == return_code) { - int return_code = 1; - /** SSL authentication is still going on, we need to call dcb_accept_SSL - * until it return 1 for success or -1 for error */ - if (dcb->ssl_state == SSL_HANDSHAKE_REQUIRED) - { - return_code = (DCB_ROLE_CLIENT_HANDLER == dcb->dcb_role) ? - dcb_accept_SSL(dcb) : - dcb_connect_SSL(dcb); - } - if (1 == return_code) - { - DCB_EH_NOTICE("Calling dcb->func.read(%p)", dcb); - dcb->func.read(dcb); - } + DCB_EH_NOTICE("Calling dcb->func.read(%p)", dcb); + dcb->func.read(dcb); } } } diff --git a/server/core/listener.cc b/server/core/listener.cc index 74fe5388b..654e88f4c 100644 --- a/server/core/listener.cc +++ b/server/core/listener.cc @@ -17,6 +17,7 @@ #include #include #include +#include #include #include @@ -33,6 +34,7 @@ #include #include #include +#include #include "internal/modules.hh" @@ -46,7 +48,8 @@ static RSA* tmp_rsa_callback(SSL* s, int is_export, int keylength); Listener::Listener(SERVICE* service, const std::string& name, const std::string& address, uint16_t port, const std::string& protocol, const std::string& authenticator, const std::string& auth_opts, void* auth_instance, SSL_LISTENER* ssl) - : m_name(name) + : MXB_POLL_DATA{Listener::poll_handler} + , m_name(name) , m_state(CREATED) , m_protocol(protocol) , m_port(port) @@ -55,7 +58,6 @@ Listener::Listener(SERVICE* service, const std::string& name, const std::string& , m_auth_options(auth_opts) , m_auth_instance(auth_instance) , m_ssl(ssl) - , m_listener(nullptr) , m_users(nullptr) , m_service(service) , m_proto_func(*(MXS_PROTOCOL*)load_module(protocol.c_str(), MODULE_PROTOCOL)) @@ -70,11 +72,6 @@ Listener::~Listener() users_free(m_users); } - if (m_listener) - { - dcb_close(m_listener); - } - SSL_LISTENER_free(m_ssl); } @@ -116,6 +113,10 @@ SListener Listener::create(SERVICE* service, if (listener) { + // Storing a self-reference to the listener makes it possible to easily + // increment the reference count when new connections are accepted. + listener->m_self = listener; + // Note: This isn't good: we modify the service from a listener and the service itself should do this. service->capabilities |= proto_mod->module_capabilities | auth_mod->module_capabilities; @@ -128,17 +129,14 @@ SListener Listener::create(SERVICE* service, void Listener::destroy(const SListener& listener) { + // Remove the listener from all workers. This makes sure that there's no concurrent access while we're + // closing things up. listener->stop(); - // TODO: This is not pretty but it works, revise when listeners are refactored. This is - // thread-safe as the listener is freed on the same thread that closes the socket. - ::close(listener->m_listener->fd); - listener->m_listener->fd = -1; + close(listener->m_fd); + listener->m_fd = -1; listener->m_state = DESTROYED; - // This frees the self-reference the listener's own DCB has to itself - listener->m_listener->listener.reset(); - std::lock_guard guard(listener_lock); all_listeners.remove(listener); } @@ -146,9 +144,8 @@ void Listener::destroy(const SListener& listener) bool Listener::stop() { bool rval = (m_state == STOPPED); - mxb_assert(m_listener); - if (m_state == STARTED && poll_remove_dcb(m_listener) == 0) + if (m_state == STARTED && mxs::RoutingWorker::remove_shared_fd(m_fd)) { m_state = STOPPED; rval = true; @@ -160,9 +157,8 @@ bool Listener::stop() bool Listener::start() { bool rval = (m_state == STARTED); - mxb_assert(m_listener); - if (m_state == STOPPED && poll_add_dcb(m_listener) == 0) + if (m_state == STOPPED && mxs::RoutingWorker::add_shared_fd(m_fd, EPOLLIN, this)) { m_state = STARTED; rval = true; @@ -701,11 +697,11 @@ const char* Listener::state() const void Listener::print_users(DCB* dcb) { - if (m_listener && m_listener->authfunc.diagnostic) + if (m_auth_func.diagnostic) { dcb_printf(dcb, "User names (%s): ", name()); - m_listener->authfunc.diagnostic(dcb, this); + m_auth_func.diagnostic(dcb, this); dcb_printf(dcb, "\n"); } @@ -715,9 +711,9 @@ int Listener::load_users() { int rval = MXS_AUTH_LOADUSERS_OK; - if (m_listener && m_listener->authfunc.loadusers) + if (m_auth_func.loadusers) { - rval = m_listener->authfunc.loadusers(this); + rval = m_auth_func.loadusers(this); } return rval; @@ -734,34 +730,138 @@ void Listener::set_users(struct users* u) m_users = u; } -bool Listener::listen(const SListener& self) +namespace +{ + +/** + * @brief Create a Unix domain socket + * + * @param path The socket path + * @return The opened socket or -1 on error + */ +static int create_unix_socket(const char* path) +{ + if (unlink(path) == -1 && errno != ENOENT) + { + MXS_ERROR("Failed to unlink Unix Socket %s: %d %s", path, errno, mxs_strerror(errno)); + } + + struct sockaddr_un local_addr; + int listener_socket = open_unix_socket(MXS_SOCKET_LISTENER, &local_addr, path); + + if (listener_socket >= 0 && chmod(path, 0777) < 0) + { + MXS_ERROR("Failed to change permissions on UNIX Domain socket '%s': %d, %s", + path, errno, mxs_strerror(errno)); + } + + return listener_socket; +} + +/** + * @brief Create a listener, add new information to the given DCB + * + * First creates and opens a socket, either TCP or Unix according to the + * configuration data provided. Then try to listen on the socket and + * record the socket in the given DCB. Add the given DCB into the poll + * list. The protocol name does not affect the logic, but is used in + * log messages. + * + * @param config Configuration for port to listen on + * + * @return New socket or -1 on error + */ +int start_listening(const char* config) +{ + char host[strlen(config) + 1]; + strcpy(host, config); + char* port_str = strrchr(host, '|'); + uint16_t port = 0; + + if (port_str) + { + *port_str++ = 0; + port = atoi(port_str); + } + + mxb_assert(strchr(host, '/') || port > 0); + + int listener_socket = -1; + + if (strchr(host, '/')) + { + listener_socket = create_unix_socket(host); + } + else if (port > 0) + { + struct sockaddr_storage server_address = {}; + listener_socket = open_network_socket(MXS_SOCKET_LISTENER, &server_address, host, port); + + if (listener_socket == -1 && strcmp(host, "::") == 0) + { + /** Attempt to bind to the IPv4 if the default IPv6 one is used */ + MXS_WARNING("Failed to bind on default IPv6 host '::', attempting " + "to bind on IPv4 version '0.0.0.0'"); + strcpy(host, "0.0.0.0"); + listener_socket = open_network_socket(MXS_SOCKET_LISTENER, &server_address, host, port); + } + } + + if (listener_socket != -1) + { + /** + * The use of INT_MAX for backlog length in listen() allows the end-user to + * control the backlog length with the net.ipv4.tcp_max_syn_backlog kernel + * option since the parameter is silently truncated to the configured value. + * + * @see man 2 listen + */ + if (listen(listener_socket, INT_MAX) != 0) + { + MXS_ERROR("Failed to start listening on [%s]:%u: %d, %s", host, port, errno, mxs_strerror(errno)); + close(listener_socket); + return -1; + } + } + + return listener_socket; +} +} + +bool Listener::listen_shared(std::string config_bind) +{ + bool rval = false; + int fd = start_listening(config_bind.data()); + + if (fd != -1) + { + if (mxs::RoutingWorker::add_shared_fd(fd, EPOLLIN, this)) + { + m_fd = fd; + rval = true; + m_state = STARTED; + } + else + { + close(fd); + } + } + else + { + MXS_ERROR("[%s] Failed to listen on %s", m_service->name, config_bind.c_str()); + } + + return rval; +} + +bool Listener::listen() { m_state = FAILED; - // This is a temporary workaround until the DCBs are removed from listeners - m_listener = dcb_alloc(DCB_ROLE_SERVICE_LISTENER, self, m_service); - - if (!m_listener) - { - MXS_ERROR("Failed to create listener for service %s.", m_service->name); - return false; - } - - m_listener->func = m_proto_func; - m_listener->authfunc = m_auth_func; - - /** - * Normally, we'd allocate the DCB specific authentication data. As the - * listeners aren't normal DCBs, we can skip that. - */ - std::stringstream ss; - ss << m_address << "|" << m_port; - auto config_bind = ss.str(); - /** Load the authentication users before before starting the listener */ - if (m_listener->authfunc.loadusers) + if (m_auth_func.loadusers) { - switch (m_listener->authfunc.loadusers(this)) + switch (m_auth_func.loadusers(this)) { case MXS_AUTH_LOADUSERS_FATAL: MXS_ERROR("[%s] Fatal error when loading users for listener '%s', " @@ -779,26 +879,29 @@ bool Listener::listen(const SListener& self) } bool rval = false; + std::stringstream ss; + ss << m_address << "|" << m_port; - if (dcb_listen(m_listener, config_bind.data()) == 0) - { - m_listener->session = session_alloc(m_service, m_listener); + // TODO: Detect the need for SO_REUSEPORT here + rval = listen_shared(ss.str()); - if (m_listener->session != NULL) - { - m_listener->session->state = SESSION_STATE_LISTENER; - m_state = STARTED; - rval = true; - } - else - { - MXS_ERROR("[%s] Failed to create listener session.", m_service->name); - } - } - else + if (rval) { - MXS_ERROR("[%s] Failed to listen on %s", m_service->name, config_bind.c_str()); + MXS_NOTICE("Listening for connections at [%s]:%u", m_address.c_str(), m_port); } return rval; } + +uint32_t Listener::poll_handler(MXB_POLL_DATA* data, MXB_WORKER* worker, uint32_t events) +{ + Listener* listener = static_cast(data); + DCB* client_dcb; + + while ((client_dcb = dcb_accept(listener->m_self))) + { + listener->m_proto_func.accept(client_dcb); + } + + return 1; +} diff --git a/server/core/service.cc b/server/core/service.cc index d95343e7c..3337230c8 100644 --- a/server/core/service.cc +++ b/server/core/service.cc @@ -323,7 +323,7 @@ bool service_isvalid(Service* service) static int serviceStartPort(Service* service, const SListener& port) { mxb_assert(service && service->router && service->router_instance); - return port->listen(port); + return port->listen(); } /**