MXS-1951: Add SO_REUSEPORT support

If SO_REUSEPORT is available and the kernel supports it, listeners will
now listen on separate file descriptors. This removes the need for
cross-worker communication when in normal operation which should make
MaxScale scale better.
This commit is contained in:
Markus Mäkelä
2019-03-20 12:06:09 +02:00
parent cb957200c9
commit df3b501563
2 changed files with 142 additions and 11 deletions

View File

@ -41,6 +41,13 @@ public:
~Listener();
enum class Type
{
UNIX_SOCKET, // UNIX domain socket shared between workers
SHARED_TCP, // TCP listening socket shared between workers
UNIQUE_TCP // Unique TCP listening socket for each worker
};
/**
* Create a new listener
*
@ -173,6 +180,11 @@ public:
*/
void print_users(DCB* dcb);
Type type() const
{
return m_type;
}
// Functions that are temporarily public
bool create_listener_config(const char* filename);
struct users* users() const;
@ -203,6 +215,8 @@ private:
MXS_PROTOCOL m_proto_func; /**< Preloaded protocol functions */
MXS_AUTHENTICATOR m_auth_func; /**< Preloaded authenticator functions */
Type m_type; /**< The type of the listener */
mxs::rworker_local<int> m_fd {-1}; /**< File descriptor the listener listens on */
/** A shared pointer to the listener itself that is passed as the argument to
@ -242,6 +256,13 @@ private:
*/
bool listen_shared();
/**
* Listen with a unique file descriptor for each worker
*
* @return True if the listening was started successfully
*/
bool listen_unique();
/**
* Close all opened file descriptors for this listener
*/

View File

@ -64,6 +64,18 @@ Listener::Listener(SERVICE* service, const std::string& name, const std::string&
, m_proto_func(*(MXS_PROTOCOL*)load_module(protocol.c_str(), MODULE_PROTOCOL))
, m_auth_func(*(MXS_AUTHENTICATOR*)load_module(authenticator.c_str(), MODULE_AUTHENTICATOR))
{
if (m_address[0] == '/')
{
m_type = Type::UNIX_SOCKET;
}
else if (mxs::have_so_reuseport())
{
m_type = Type::UNIQUE_TCP;
}
else
{
m_type = Type::SHARED_TCP;
}
}
Listener::~Listener()
@ -155,14 +167,46 @@ void Listener::destroy(const SListener& listener)
all_listeners.remove(listener);
}
// Helper function that executes a function on all workers and checks the result
static bool execute_and_check(const std::function<bool ()>& func)
{
std::atomic<size_t> n_ok {0};
auto wrapper = [func, &n_ok]() {
if (func())
{
++n_ok;
}
};
size_t n_executed = mxs::RoutingWorker::execute_concurrently(wrapper);
return n_executed == n_ok;
}
bool Listener::stop()
{
bool rval = (m_state == STOPPED);
if (m_state == STARTED && mxs::RoutingWorker::remove_shared_fd(m_fd))
if (m_state == STARTED)
{
m_state = STOPPED;
rval = true;
if (m_type == Type::UNIQUE_TCP)
{
if (execute_and_check([this]() {
mxb_assert(*m_fd != -1);
return mxs::RoutingWorker::get_current()->remove_fd(*m_fd);
}))
{
m_state = STOPPED;
rval = true;
}
}
else
{
if (mxs::RoutingWorker::remove_shared_fd(m_fd))
{
m_state = STOPPED;
rval = true;
}
}
}
return rval;
@ -172,10 +216,27 @@ bool Listener::start()
{
bool rval = (m_state == STARTED);
if (m_state == STOPPED && mxs::RoutingWorker::add_shared_fd(m_fd, EPOLLIN, this))
if (m_state == STOPPED)
{
m_state = STARTED;
rval = true;
if (m_type == Type::UNIQUE_TCP)
{
if (execute_and_check([this]() {
mxb_assert(*m_fd != -1);
return mxs::RoutingWorker::get_current()->add_fd(*m_fd, EPOLLIN, this);
}))
{
m_state = STARTED;
rval = true;
}
}
else
{
if (mxs::RoutingWorker::add_shared_fd(*m_fd, EPOLLIN, this))
{
m_state = STARTED;
rval = true;
}
}
}
return rval;
@ -980,6 +1041,41 @@ bool Listener::listen_shared()
return rval;
}
bool Listener::listen_unique()
{
auto open_socket = [this]() {
bool rval = false;
int fd = start_listening(m_address.c_str(), m_port);
if (fd != -1)
{
if (mxs::RoutingWorker::get_current()->add_fd(fd, EPOLLIN, this))
{
// Set the worker-local fd to the unique value
*m_fd = fd;
rval = true;
}
else
{
close(fd);
}
}
return rval;
};
bool rval = execute_and_check(open_socket);
if (!rval)
{
close_all_fds();
MXS_ERROR("[%s] One or more workers failed to listen on '[%s]:%u'.", m_service->name(),
m_address.c_str(), m_port);
}
return rval;
}
bool Listener::listen()
{
m_state = FAILED;
@ -1006,7 +1102,14 @@ bool Listener::listen()
bool rval = false;
rval = listen_shared();
if (m_type == Type::UNIQUE_TCP)
{
rval = listen_unique();
}
else
{
rval = listen_shared();
}
if (rval)
{
@ -1024,10 +1127,17 @@ uint32_t Listener::poll_handler(MXB_POLL_DATA* data, MXB_WORKER* worker, uint32_
while ((client_dcb = listener->accept_one_dcb()))
{
auto worker = mxs::RoutingWorker::pick_worker();
worker->execute([listener, client_dcb]() {
listener->m_proto_func.accept(client_dcb);
}, mxs::RoutingWorker::EXECUTE_AUTO);
if (listener->type() == Type::UNIQUE_TCP)
{
listener->m_proto_func.accept(client_dcb);
}
else
{
auto worker = mxs::RoutingWorker::pick_worker();
worker->execute([listener, client_dcb]() {
listener->m_proto_func.accept(client_dcb);
}, mxs::RoutingWorker::EXECUTE_AUTO);
}
}
return 1;