MXS-2196: Remove DCB from Listener

Replaced the DCB with a single file descriptor that the listener listens
on and which is added to all of the workers. The Listener also extends the
MXB_POLL_DATA which allows it to handle epoll events.

Moved the code that creates the listening socket into listener.cc where it
belongs and did a minor cleanup of it.
This commit is contained in:
Markus Mäkelä 2018-12-01 20:59:44 +02:00
parent 3791fdded7
commit b3fbc6aa3d
No known key found for this signature in database
GPG Key ID: 72D48FCE664F7B19
5 changed files with 215 additions and 246 deletions

View File

@ -23,6 +23,7 @@
#include <maxscale/protocol.h>
#include <maxscale/ssl.h>
#include <maxscale/service.hh>
#include <maxscale/routingworker.hh>
struct DCB;
class SERVICE;
@ -34,7 +35,7 @@ using SListener = std::shared_ptr<Listener>;
* The Listener class is used to link a network port to a service. It defines the name of the
* protocol module that should be loaded as well as the authenticator that is used.
*/
class Listener
class Listener : public MXB_POLL_DATA
{
public:
@ -78,7 +79,7 @@ public:
*
* @return True if the listener was able to start listening
*/
bool listen(const SListener& self);
bool listen();
/**
* Stop the listener
@ -172,6 +173,12 @@ public:
*/
void print_users(DCB* dcb);
// TODO: Move dcb_accept code into listener.cc and remove this
int fd() const
{
return m_fd;
}
// Functions that are temporarily public
bool create_listener_config(const char* filename);
struct users* users() const;
@ -196,13 +203,27 @@ private:
std::string m_auth_options; /**< Authenticator options */
void* m_auth_instance; /**< Authenticator instance */
SSL_LISTENER* m_ssl; /**< Structure of SSL data or NULL */
DCB* m_listener; /**< The DCB for the listener */
struct users* m_users; /**< The user data for this listener */
SERVICE* m_service; /**< The service which used by this listener */
std::atomic<bool> m_active; /**< True if the port has not been deleted */
MXS_PROTOCOL m_proto_func; /**< Preloaded protocol functions */
MXS_AUTHENTICATOR m_auth_func; /**< Preloaded authenticator functions */
int m_fd; /**< File descriptor the listener listens on */
/** A shared pointer to the listener itself that is passed as the argument to
* the protocol's accept function. This allows client connections to live
* longer than the listener they started on.
*
* This will eventually be replaced with a shared_ptr of the authenticator instance as that is
* what is actually required by the client sessions.
*
* In practice as a service must outlive all sessions on it, the reference could be owned by the service
* instead of each individual client. This would remove the need to increment the listener reference
* count every time a client is accepted.
*/
SListener m_self;
/**
* Creates a new listener that points to a service
*
@ -220,6 +241,12 @@ private:
const std::string& protocol, const std::string& authenticator,
const std::string& auth_opts, void* auth_instance, SSL_LISTENER* ssl);
// Listen on a file descriptor shared between all workers
bool listen_shared(std::string config_bind);
// Handler for EPOLL_IN events
static uint32_t poll_handler(MXB_POLL_DATA* data, MXB_WORKER* worker, uint32_t events);
friend DCB* dcb_accept(const SListener& listener);
};

View File

@ -1022,7 +1022,7 @@ bool runtime_create_listener(Service* service,
MXS_NOTICE("Created %slistener '%s' at %s:%s for service '%s'",
ssl ? "TLS encrypted " : "", name, print_addr, port, service->name);
if (listener->listen(listener))
if (listener->listen())
{
rval = true;
}

View File

@ -2321,7 +2321,7 @@ DCB* dcb_accept(const SListener& listener)
int c_sock;
struct sockaddr_storage client_conn;
if ((c_sock = dcb_accept_one_connection(listener->m_listener->fd, (struct sockaddr*)&client_conn)) >= 0)
if ((c_sock = dcb_accept_one_connection(listener->fd(), (struct sockaddr*)&client_conn)) >= 0)
{
configure_network_socket(c_sock, client_conn.ss_family);
@ -2334,7 +2334,6 @@ DCB* dcb_accept(const SListener& listener)
}
else
{
client_dcb->service = listener->service();
client_dcb->session = session_set_dummy(client_dcb);
client_dcb->fd = c_sock;
@ -2497,145 +2496,6 @@ static int dcb_accept_one_connection(int fd, struct sockaddr* client_conn)
return c_sock;
}
/**
* @brief Create a listener, add new information to the given DCB
*
* First creates and opens a socket, either TCP or Unix according to the
* configuration data provided. Then try to listen on the socket and
* record the socket in the given DCB. Add the given DCB into the poll
* list. The protocol name does not affect the logic, but is used in
* log messages.
*
* @param dcb Listener DCB that is being created
* @param config Configuration for port to listen on
*
* @return 0 if new listener created successfully, otherwise -1
*/
int dcb_listen(DCB* dcb, const char* config)
{
char host[strlen(config) + 1];
strcpy(host, config);
char* port_str = strrchr(host, '|');
uint16_t port = 0;
if (port_str)
{
*port_str++ = 0;
port = atoi(port_str);
}
int listener_socket = -1;
if (strchr(host, '/'))
{
listener_socket = dcb_listen_create_socket_unix(host);
if (listener_socket != -1)
{
dcb->path = MXS_STRDUP_A(host);
}
}
else if (port > 0)
{
listener_socket = dcb_listen_create_socket_inet(host, port);
if (listener_socket == -1 && strcmp(host, "::") == 0)
{
/** Attempt to bind to the IPv4 if the default IPv6 one is used */
MXS_WARNING("Failed to bind on default IPv6 host '::', attempting "
"to bind on IPv4 version '0.0.0.0'");
strcpy(host, "0.0.0.0");
listener_socket = dcb_listen_create_socket_inet(host, port);
}
}
else
{
// We don't have a socket path or a network port
mxb_assert(false);
}
if (listener_socket < 0)
{
mxb_assert(listener_socket == -1);
return -1;
}
/**
* The use of INT_MAX for backlog length in listen() allows the end-user to
* control the backlog length with the net.ipv4.tcp_max_syn_backlog kernel
* option since the parameter is silently truncated to the configured value.
*
* @see man 2 listen
*/
if (listen(listener_socket, INT_MAX) != 0)
{
MXS_ERROR("Failed to start listening on [%s]:%u: %d, %s",
host,
port,
errno,
mxs_strerror(errno));
close(listener_socket);
return -1;
}
MXS_NOTICE("Listening for connections at [%s]:%u", host, port);
// assign listener_socket to dcb
dcb->fd = listener_socket;
// add listening socket to poll structure
if (poll_add_dcb(dcb) != 0)
{
MXS_ERROR("MaxScale encountered system limit while "
"attempting to register on an epoll instance.");
return -1;
}
return 0;
}
/**
* @brief Create a network listener socket
*
* @param host The network address to listen on
* @param port The port to listen on
* @return The opened socket or -1 on error
*/
static int dcb_listen_create_socket_inet(const char* host, uint16_t port)
{
struct sockaddr_storage server_address = {};
return open_network_socket(MXS_SOCKET_LISTENER, &server_address, host, port);
}
/**
* @brief Create a Unix domain socket
*
* @param path The socket path
* @return The opened socket or -1 on error
*/
static int dcb_listen_create_socket_unix(const char* path)
{
if (unlink(path) == -1 && errno != ENOENT)
{
MXS_ERROR("Failed to unlink Unix Socket %s: %d %s",
path,
errno,
mxs_strerror(errno));
}
struct sockaddr_un local_addr;
int listener_socket = open_unix_socket(MXS_SOCKET_LISTENER, &local_addr, path);
if (listener_socket >= 0 && chmod(path, 0777) < 0)
{
MXS_ERROR("Failed to change permissions on UNIX Domain socket '%s': %d, %s",
path,
errno,
mxs_strerror(errno));
}
return listener_socket;
}
/**
* @brief Set socket options, log an error if fails
*
@ -2989,49 +2849,28 @@ static uint32_t dcb_process_poll_events(DCB* dcb, uint32_t events)
}
if ((events & EPOLLIN) && (dcb->n_close == 0))
{
if (dcb->state == DCB_STATE_LISTENING)
MXS_DEBUG("%lu [poll_waitevents] "
"Read in dcb %p fd %d",
pthread_self(),
dcb,
dcb->fd);
rc |= MXB_POLL_READ;
if (dcb_session_check(dcb, "read"))
{
MXS_DEBUG("%lu [poll_waitevents] "
"Accept in fd %d",
pthread_self(),
dcb->fd);
rc |= MXB_POLL_ACCEPT;
if (dcb_session_check(dcb, "accept"))
int return_code = 1;
/** SSL authentication is still going on, we need to call dcb_accept_SSL
* until it return 1 for success or -1 for error */
if (dcb->ssl_state == SSL_HANDSHAKE_REQUIRED)
{
DCB* client_dcb;
while ((client_dcb = dcb_accept(dcb->listener)))
{
dcb->func.accept(client_dcb);
}
return_code = (DCB_ROLE_CLIENT_HANDLER == dcb->dcb_role) ?
dcb_accept_SSL(dcb) :
dcb_connect_SSL(dcb);
}
}
else
{
MXS_DEBUG("%lu [poll_waitevents] "
"Read in dcb %p fd %d",
pthread_self(),
dcb,
dcb->fd);
rc |= MXB_POLL_READ;
if (dcb_session_check(dcb, "read"))
if (1 == return_code)
{
int return_code = 1;
/** SSL authentication is still going on, we need to call dcb_accept_SSL
* until it return 1 for success or -1 for error */
if (dcb->ssl_state == SSL_HANDSHAKE_REQUIRED)
{
return_code = (DCB_ROLE_CLIENT_HANDLER == dcb->dcb_role) ?
dcb_accept_SSL(dcb) :
dcb_connect_SSL(dcb);
}
if (1 == return_code)
{
DCB_EH_NOTICE("Calling dcb->func.read(%p)", dcb);
dcb->func.read(dcb);
}
DCB_EH_NOTICE("Calling dcb->func.read(%p)", dcb);
dcb->func.read(dcb);
}
}
}

View File

@ -17,6 +17,7 @@
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <algorithm>
#include <list>
@ -33,6 +34,7 @@
#include <maxscale/users.h>
#include <maxscale/service.hh>
#include <maxscale/poll.h>
#include <maxscale/routingworker.hh>
#include "internal/modules.hh"
@ -46,7 +48,8 @@ static RSA* tmp_rsa_callback(SSL* s, int is_export, int keylength);
Listener::Listener(SERVICE* service, const std::string& name, const std::string& address,
uint16_t port, const std::string& protocol, const std::string& authenticator,
const std::string& auth_opts, void* auth_instance, SSL_LISTENER* ssl)
: m_name(name)
: MXB_POLL_DATA{Listener::poll_handler}
, m_name(name)
, m_state(CREATED)
, m_protocol(protocol)
, m_port(port)
@ -55,7 +58,6 @@ Listener::Listener(SERVICE* service, const std::string& name, const std::string&
, m_auth_options(auth_opts)
, m_auth_instance(auth_instance)
, m_ssl(ssl)
, m_listener(nullptr)
, m_users(nullptr)
, m_service(service)
, m_proto_func(*(MXS_PROTOCOL*)load_module(protocol.c_str(), MODULE_PROTOCOL))
@ -70,11 +72,6 @@ Listener::~Listener()
users_free(m_users);
}
if (m_listener)
{
dcb_close(m_listener);
}
SSL_LISTENER_free(m_ssl);
}
@ -116,6 +113,10 @@ SListener Listener::create(SERVICE* service,
if (listener)
{
// Storing a self-reference to the listener makes it possible to easily
// increment the reference count when new connections are accepted.
listener->m_self = listener;
// Note: This isn't good: we modify the service from a listener and the service itself should do this.
service->capabilities |= proto_mod->module_capabilities | auth_mod->module_capabilities;
@ -128,17 +129,14 @@ SListener Listener::create(SERVICE* service,
void Listener::destroy(const SListener& listener)
{
// Remove the listener from all workers. This makes sure that there's no concurrent access while we're
// closing things up.
listener->stop();
// TODO: This is not pretty but it works, revise when listeners are refactored. This is
// thread-safe as the listener is freed on the same thread that closes the socket.
::close(listener->m_listener->fd);
listener->m_listener->fd = -1;
close(listener->m_fd);
listener->m_fd = -1;
listener->m_state = DESTROYED;
// This frees the self-reference the listener's own DCB has to itself
listener->m_listener->listener.reset();
std::lock_guard<std::mutex> guard(listener_lock);
all_listeners.remove(listener);
}
@ -146,9 +144,8 @@ void Listener::destroy(const SListener& listener)
bool Listener::stop()
{
bool rval = (m_state == STOPPED);
mxb_assert(m_listener);
if (m_state == STARTED && poll_remove_dcb(m_listener) == 0)
if (m_state == STARTED && mxs::RoutingWorker::remove_shared_fd(m_fd))
{
m_state = STOPPED;
rval = true;
@ -160,9 +157,8 @@ bool Listener::stop()
bool Listener::start()
{
bool rval = (m_state == STARTED);
mxb_assert(m_listener);
if (m_state == STOPPED && poll_add_dcb(m_listener) == 0)
if (m_state == STOPPED && mxs::RoutingWorker::add_shared_fd(m_fd, EPOLLIN, this))
{
m_state = STARTED;
rval = true;
@ -701,11 +697,11 @@ const char* Listener::state() const
void Listener::print_users(DCB* dcb)
{
if (m_listener && m_listener->authfunc.diagnostic)
if (m_auth_func.diagnostic)
{
dcb_printf(dcb, "User names (%s): ", name());
m_listener->authfunc.diagnostic(dcb, this);
m_auth_func.diagnostic(dcb, this);
dcb_printf(dcb, "\n");
}
@ -715,9 +711,9 @@ int Listener::load_users()
{
int rval = MXS_AUTH_LOADUSERS_OK;
if (m_listener && m_listener->authfunc.loadusers)
if (m_auth_func.loadusers)
{
rval = m_listener->authfunc.loadusers(this);
rval = m_auth_func.loadusers(this);
}
return rval;
@ -734,34 +730,138 @@ void Listener::set_users(struct users* u)
m_users = u;
}
bool Listener::listen(const SListener& self)
namespace
{
/**
* @brief Create a Unix domain socket
*
* @param path The socket path
* @return The opened socket or -1 on error
*/
static int create_unix_socket(const char* path)
{
if (unlink(path) == -1 && errno != ENOENT)
{
MXS_ERROR("Failed to unlink Unix Socket %s: %d %s", path, errno, mxs_strerror(errno));
}
struct sockaddr_un local_addr;
int listener_socket = open_unix_socket(MXS_SOCKET_LISTENER, &local_addr, path);
if (listener_socket >= 0 && chmod(path, 0777) < 0)
{
MXS_ERROR("Failed to change permissions on UNIX Domain socket '%s': %d, %s",
path, errno, mxs_strerror(errno));
}
return listener_socket;
}
/**
* @brief Create a listener, add new information to the given DCB
*
* First creates and opens a socket, either TCP or Unix according to the
* configuration data provided. Then try to listen on the socket and
* record the socket in the given DCB. Add the given DCB into the poll
* list. The protocol name does not affect the logic, but is used in
* log messages.
*
* @param config Configuration for port to listen on
*
* @return New socket or -1 on error
*/
int start_listening(const char* config)
{
char host[strlen(config) + 1];
strcpy(host, config);
char* port_str = strrchr(host, '|');
uint16_t port = 0;
if (port_str)
{
*port_str++ = 0;
port = atoi(port_str);
}
mxb_assert(strchr(host, '/') || port > 0);
int listener_socket = -1;
if (strchr(host, '/'))
{
listener_socket = create_unix_socket(host);
}
else if (port > 0)
{
struct sockaddr_storage server_address = {};
listener_socket = open_network_socket(MXS_SOCKET_LISTENER, &server_address, host, port);
if (listener_socket == -1 && strcmp(host, "::") == 0)
{
/** Attempt to bind to the IPv4 if the default IPv6 one is used */
MXS_WARNING("Failed to bind on default IPv6 host '::', attempting "
"to bind on IPv4 version '0.0.0.0'");
strcpy(host, "0.0.0.0");
listener_socket = open_network_socket(MXS_SOCKET_LISTENER, &server_address, host, port);
}
}
if (listener_socket != -1)
{
/**
* The use of INT_MAX for backlog length in listen() allows the end-user to
* control the backlog length with the net.ipv4.tcp_max_syn_backlog kernel
* option since the parameter is silently truncated to the configured value.
*
* @see man 2 listen
*/
if (listen(listener_socket, INT_MAX) != 0)
{
MXS_ERROR("Failed to start listening on [%s]:%u: %d, %s", host, port, errno, mxs_strerror(errno));
close(listener_socket);
return -1;
}
}
return listener_socket;
}
}
bool Listener::listen_shared(std::string config_bind)
{
bool rval = false;
int fd = start_listening(config_bind.data());
if (fd != -1)
{
if (mxs::RoutingWorker::add_shared_fd(fd, EPOLLIN, this))
{
m_fd = fd;
rval = true;
m_state = STARTED;
}
else
{
close(fd);
}
}
else
{
MXS_ERROR("[%s] Failed to listen on %s", m_service->name, config_bind.c_str());
}
return rval;
}
bool Listener::listen()
{
m_state = FAILED;
// This is a temporary workaround until the DCBs are removed from listeners
m_listener = dcb_alloc(DCB_ROLE_SERVICE_LISTENER, self, m_service);
if (!m_listener)
{
MXS_ERROR("Failed to create listener for service %s.", m_service->name);
return false;
}
m_listener->func = m_proto_func;
m_listener->authfunc = m_auth_func;
/**
* Normally, we'd allocate the DCB specific authentication data. As the
* listeners aren't normal DCBs, we can skip that.
*/
std::stringstream ss;
ss << m_address << "|" << m_port;
auto config_bind = ss.str();
/** Load the authentication users before before starting the listener */
if (m_listener->authfunc.loadusers)
if (m_auth_func.loadusers)
{
switch (m_listener->authfunc.loadusers(this))
switch (m_auth_func.loadusers(this))
{
case MXS_AUTH_LOADUSERS_FATAL:
MXS_ERROR("[%s] Fatal error when loading users for listener '%s', "
@ -779,26 +879,29 @@ bool Listener::listen(const SListener& self)
}
bool rval = false;
std::stringstream ss;
ss << m_address << "|" << m_port;
if (dcb_listen(m_listener, config_bind.data()) == 0)
{
m_listener->session = session_alloc(m_service, m_listener);
// TODO: Detect the need for SO_REUSEPORT here
rval = listen_shared(ss.str());
if (m_listener->session != NULL)
{
m_listener->session->state = SESSION_STATE_LISTENER;
m_state = STARTED;
rval = true;
}
else
{
MXS_ERROR("[%s] Failed to create listener session.", m_service->name);
}
}
else
if (rval)
{
MXS_ERROR("[%s] Failed to listen on %s", m_service->name, config_bind.c_str());
MXS_NOTICE("Listening for connections at [%s]:%u", m_address.c_str(), m_port);
}
return rval;
}
uint32_t Listener::poll_handler(MXB_POLL_DATA* data, MXB_WORKER* worker, uint32_t events)
{
Listener* listener = static_cast<Listener*>(data);
DCB* client_dcb;
while ((client_dcb = dcb_accept(listener->m_self)))
{
listener->m_proto_func.accept(client_dcb);
}
return 1;
}

View File

@ -323,7 +323,7 @@ bool service_isvalid(Service* service)
static int serviceStartPort(Service* service, const SListener& port)
{
mxb_assert(service && service->router && service->router_instance);
return port->listen(port);
return port->listen();
}
/**