diff --git a/include/maxscale/listener.hh b/include/maxscale/listener.hh index 1540acf4e..9cfa62f83 100644 --- a/include/maxscale/listener.hh +++ b/include/maxscale/listener.hh @@ -41,6 +41,13 @@ public: ~Listener(); + enum class Type + { + UNIX_SOCKET, // UNIX domain socket shared between workers + SHARED_TCP, // TCP listening socket shared between workers + UNIQUE_TCP // Unique TCP listening socket for each worker + }; + /** * Create a new listener * @@ -173,6 +180,11 @@ public: */ void print_users(DCB* dcb); + Type type() const + { + return m_type; + } + // Functions that are temporarily public bool create_listener_config(const char* filename); struct users* users() const; @@ -203,6 +215,8 @@ private: MXS_PROTOCOL m_proto_func; /**< Preloaded protocol functions */ MXS_AUTHENTICATOR m_auth_func; /**< Preloaded authenticator functions */ + Type m_type; /**< The type of the listener */ + mxs::rworker_local m_fd {-1}; /**< File descriptor the listener listens on */ /** A shared pointer to the listener itself that is passed as the argument to @@ -242,6 +256,13 @@ private: */ bool listen_shared(); + /** + * Listen with a unique file descriptor for each worker + * + * @return True if the listening was started successfully + */ + bool listen_unique(); + /** * Close all opened file descriptors for this listener */ diff --git a/server/core/listener.cc b/server/core/listener.cc index d4b6eade6..049b2677d 100644 --- a/server/core/listener.cc +++ b/server/core/listener.cc @@ -64,6 +64,18 @@ Listener::Listener(SERVICE* service, const std::string& name, const std::string& , m_proto_func(*(MXS_PROTOCOL*)load_module(protocol.c_str(), MODULE_PROTOCOL)) , m_auth_func(*(MXS_AUTHENTICATOR*)load_module(authenticator.c_str(), MODULE_AUTHENTICATOR)) { + if (m_address[0] == '/') + { + m_type = Type::UNIX_SOCKET; + } + else if (mxs::have_so_reuseport()) + { + m_type = Type::UNIQUE_TCP; + } + else + { + m_type = Type::SHARED_TCP; + } } Listener::~Listener() @@ -155,14 +167,46 @@ void Listener::destroy(const SListener& listener) all_listeners.remove(listener); } +// Helper function that executes a function on all workers and checks the result +static bool execute_and_check(const std::function& func) +{ + std::atomic n_ok {0}; + auto wrapper = [func, &n_ok]() { + if (func()) + { + ++n_ok; + } + }; + + size_t n_executed = mxs::RoutingWorker::execute_concurrently(wrapper); + return n_executed == n_ok; +} + bool Listener::stop() { bool rval = (m_state == STOPPED); - if (m_state == STARTED && mxs::RoutingWorker::remove_shared_fd(m_fd)) + if (m_state == STARTED) { - m_state = STOPPED; - rval = true; + if (m_type == Type::UNIQUE_TCP) + { + if (execute_and_check([this]() { + mxb_assert(*m_fd != -1); + return mxs::RoutingWorker::get_current()->remove_fd(*m_fd); + })) + { + m_state = STOPPED; + rval = true; + } + } + else + { + if (mxs::RoutingWorker::remove_shared_fd(m_fd)) + { + m_state = STOPPED; + rval = true; + } + } } return rval; @@ -172,10 +216,27 @@ bool Listener::start() { bool rval = (m_state == STARTED); - if (m_state == STOPPED && mxs::RoutingWorker::add_shared_fd(m_fd, EPOLLIN, this)) + if (m_state == STOPPED) { - m_state = STARTED; - rval = true; + if (m_type == Type::UNIQUE_TCP) + { + if (execute_and_check([this]() { + mxb_assert(*m_fd != -1); + return mxs::RoutingWorker::get_current()->add_fd(*m_fd, EPOLLIN, this); + })) + { + m_state = STARTED; + rval = true; + } + } + else + { + if (mxs::RoutingWorker::add_shared_fd(*m_fd, EPOLLIN, this)) + { + m_state = STARTED; + rval = true; + } + } } return rval; @@ -980,6 +1041,41 @@ bool Listener::listen_shared() return rval; } +bool Listener::listen_unique() +{ + auto open_socket = [this]() { + bool rval = false; + int fd = start_listening(m_address.c_str(), m_port); + + if (fd != -1) + { + if (mxs::RoutingWorker::get_current()->add_fd(fd, EPOLLIN, this)) + { + // Set the worker-local fd to the unique value + *m_fd = fd; + rval = true; + } + else + { + close(fd); + } + } + + return rval; + }; + + bool rval = execute_and_check(open_socket); + + if (!rval) + { + close_all_fds(); + MXS_ERROR("[%s] One or more workers failed to listen on '[%s]:%u'.", m_service->name(), + m_address.c_str(), m_port); + } + + return rval; +} + bool Listener::listen() { m_state = FAILED; @@ -1006,7 +1102,14 @@ bool Listener::listen() bool rval = false; - rval = listen_shared(); + if (m_type == Type::UNIQUE_TCP) + { + rval = listen_unique(); + } + else + { + rval = listen_shared(); + } if (rval) { @@ -1024,10 +1127,17 @@ uint32_t Listener::poll_handler(MXB_POLL_DATA* data, MXB_WORKER* worker, uint32_ while ((client_dcb = listener->accept_one_dcb())) { - auto worker = mxs::RoutingWorker::pick_worker(); - worker->execute([listener, client_dcb]() { - listener->m_proto_func.accept(client_dcb); - }, mxs::RoutingWorker::EXECUTE_AUTO); + if (listener->type() == Type::UNIQUE_TCP) + { + listener->m_proto_func.accept(client_dcb); + } + else + { + auto worker = mxs::RoutingWorker::pick_worker(); + worker->execute([listener, client_dcb]() { + listener->m_proto_func.accept(client_dcb); + }, mxs::RoutingWorker::EXECUTE_AUTO); + } } return 1;