MXS-2196: Store listener references in Service
By storing a shared pointer to the listeners in the services, they will be available as long as the service using them exists. This enables clean destruction of listeners that still have open sessions. The listener creation code now separately creates the listener and links it to the service. Also replaced relevant parts of the related code with the listener implemented versions of it.
This commit is contained in:
parent
a6063b5e85
commit
330719c8f9
@ -3887,28 +3887,18 @@ int create_new_listener(CONFIG_CONTEXT* obj)
|
||||
// authenticators that are specific to each protocol module
|
||||
char* authenticator = config_get_value(obj->parameters, CN_AUTHENTICATOR);
|
||||
char* authenticator_options = config_get_value(obj->parameters, CN_AUTHENTICATOR_OPTIONS);
|
||||
int net_port = socket ? 0 : atoi(port);
|
||||
|
||||
if (socket)
|
||||
auto listener = Listener::create(service, obj->object, protocol, socket ? socket : address,
|
||||
net_port, authenticator, authenticator_options, ssl_info);
|
||||
|
||||
if (listener)
|
||||
{
|
||||
listener_alloc(service,
|
||||
obj->object,
|
||||
protocol,
|
||||
socket,
|
||||
0,
|
||||
authenticator,
|
||||
authenticator_options,
|
||||
ssl_info);
|
||||
service->add_listener(listener);
|
||||
}
|
||||
else if (port)
|
||||
else
|
||||
{
|
||||
listener_alloc(service,
|
||||
obj->object,
|
||||
protocol,
|
||||
address,
|
||||
atoi(port),
|
||||
authenticator,
|
||||
authenticator_options,
|
||||
ssl_info);
|
||||
++error_count;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1015,24 +1015,23 @@ bool runtime_create_listener(Service* service,
|
||||
else
|
||||
{
|
||||
const char* print_addr = addr ? addr : "::";
|
||||
SListener listener = listener_alloc(service,
|
||||
name,
|
||||
proto,
|
||||
addr,
|
||||
u_port,
|
||||
auth,
|
||||
auth_opt,
|
||||
ssl);
|
||||
SListener listener = Listener::create(service,
|
||||
name,
|
||||
proto,
|
||||
addr,
|
||||
u_port,
|
||||
auth,
|
||||
auth_opt,
|
||||
ssl);
|
||||
|
||||
if (listener && listener_serialize(listener))
|
||||
{
|
||||
MXS_NOTICE("Created %slistener '%s' at %s:%s for service '%s'",
|
||||
ssl ? "TLS encrypted " : "",
|
||||
name,
|
||||
print_addr,
|
||||
port,
|
||||
service->name);
|
||||
if (serviceLaunchListener(service, listener))
|
||||
ssl ? "TLS encrypted " : "", name, print_addr, port, service->name);
|
||||
|
||||
service->add_listener(listener);
|
||||
|
||||
if (listener->listen())
|
||||
{
|
||||
rval = true;
|
||||
}
|
||||
|
@ -81,6 +81,20 @@ public:
|
||||
*/
|
||||
const FilterList& get_filters() const;
|
||||
|
||||
/**
|
||||
* Add a listener to a service
|
||||
*
|
||||
* @param listener Listener to add
|
||||
*/
|
||||
void add_listener(const SListener& listener);
|
||||
|
||||
/**
|
||||
* Remove a listener from a service
|
||||
*
|
||||
* @param listener Listener to remove
|
||||
*/
|
||||
void remove_listener(const SListener& listener);
|
||||
|
||||
/**
|
||||
* Reload users for all listeners
|
||||
*
|
||||
@ -104,15 +118,16 @@ public:
|
||||
mutable std::mutex lock;
|
||||
|
||||
private:
|
||||
FilterList m_filters; /**< Ordered list of filters */
|
||||
std::string m_name; /**< Name of the service */
|
||||
std::string m_router_name; /**< Router module */
|
||||
std::string m_user; /**< Username */
|
||||
std::string m_password; /**< Password */
|
||||
std::string m_weightby; /**< Weighting parameter name */
|
||||
std::string m_version_string; /**< Version string sent to clients */
|
||||
RateLimits m_rate_limits; /**< The refresh rate limits for users of each thread */
|
||||
uint64_t m_wkey; /**< Key for worker local data */
|
||||
FilterList m_filters; /**< Ordered list of filters */
|
||||
std::vector<SListener> m_listeners; /**< Listeners pointing to this service */
|
||||
std::string m_name; /**< Name of the service */
|
||||
std::string m_router_name; /**< Router module */
|
||||
std::string m_user; /**< Username */
|
||||
std::string m_password; /**< Password */
|
||||
std::string m_weightby; /**< Weighting parameter name */
|
||||
std::string m_version_string;/**< Version string sent to clients */
|
||||
RateLimits m_rate_limits; /**< The refresh rate limits for users of each thread */
|
||||
uint64_t m_wkey; /**< Key for worker local data */
|
||||
|
||||
// Get the worker local filter list
|
||||
FilterList* get_local_filters() const;
|
||||
|
@ -328,129 +328,8 @@ bool service_isvalid(Service* service)
|
||||
*/
|
||||
static int serviceStartPort(Service* service, const SListener& port)
|
||||
{
|
||||
const size_t ANY_IPV4_ADDRESS_LEN = 7; // strlen("0:0:0:0");
|
||||
|
||||
int listeners = 0;
|
||||
size_t config_bind_len =
|
||||
(!port->address.empty() ? port->address.length() : ANY_IPV4_ADDRESS_LEN) + 1 + UINTLEN(port->port);
|
||||
char config_bind[config_bind_len + 1]; // +1 for NULL
|
||||
MXS_PROTOCOL* funcs;
|
||||
|
||||
if (service == NULL || service->router == NULL || service->router_instance == NULL)
|
||||
{
|
||||
/* Should never happen, this guarantees it can't */
|
||||
MXS_ERROR("Attempt to start port with null or incomplete service");
|
||||
mxb_assert(false);
|
||||
return 0;
|
||||
}
|
||||
|
||||
port->listener = dcb_alloc(DCB_ROLE_SERVICE_LISTENER, port.get());
|
||||
|
||||
if (port->listener == NULL)
|
||||
{
|
||||
MXS_ERROR("Failed to create listener for service %s.", service->name);
|
||||
return 0;
|
||||
}
|
||||
|
||||
port->listener->service = service;
|
||||
|
||||
if ((funcs = (MXS_PROTOCOL*)load_module(port->protocol.c_str(), MODULE_PROTOCOL)) == NULL)
|
||||
{
|
||||
MXS_ERROR("Unable to load protocol module %s. Listener for service %s not started.",
|
||||
port->protocol.c_str(),
|
||||
service->name);
|
||||
return 0;
|
||||
}
|
||||
|
||||
memcpy(&(port->listener->func), funcs, sizeof(MXS_PROTOCOL));
|
||||
|
||||
const char* authenticator_name = "NullAuthDeny";
|
||||
|
||||
if (!port->authenticator.empty())
|
||||
{
|
||||
authenticator_name = port->authenticator.c_str();
|
||||
}
|
||||
else if (port->listener->func.auth_default)
|
||||
{
|
||||
authenticator_name = port->listener->func.auth_default();
|
||||
}
|
||||
|
||||
MXS_AUTHENTICATOR* authfuncs = (MXS_AUTHENTICATOR*)load_module(authenticator_name, MODULE_AUTHENTICATOR);
|
||||
|
||||
if (authfuncs == NULL)
|
||||
{
|
||||
MXS_ERROR("Failed to load authenticator module '%s' for listener '%s'",
|
||||
authenticator_name,
|
||||
port->name.c_str());
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Add protocol and authenticator capabilities from the listener
|
||||
const MXS_MODULE* proto_mod = get_module(port->protocol.c_str(), MODULE_PROTOCOL);
|
||||
const MXS_MODULE* auth_mod = get_module(authenticator_name, MODULE_AUTHENTICATOR);
|
||||
mxb_assert(proto_mod && auth_mod);
|
||||
service->capabilities |= proto_mod->module_capabilities | auth_mod->module_capabilities;
|
||||
|
||||
memcpy(&port->listener->authfunc, authfuncs, sizeof(MXS_AUTHENTICATOR));
|
||||
|
||||
/**
|
||||
* Normally, we'd allocate the DCB specific authentication data. As the
|
||||
* listeners aren't normal DCBs, we can skip that.
|
||||
*/
|
||||
|
||||
if (!port->address.empty())
|
||||
{
|
||||
sprintf(config_bind, "%s|%d", port->address.c_str(), port->port);
|
||||
}
|
||||
else
|
||||
{
|
||||
sprintf(config_bind, "::|%d", port->port);
|
||||
}
|
||||
|
||||
/** Load the authentication users before before starting the listener */
|
||||
if (port->listener->authfunc.loadusers)
|
||||
{
|
||||
switch (port->listener->authfunc.loadusers(port.get()))
|
||||
{
|
||||
case MXS_AUTH_LOADUSERS_FATAL:
|
||||
MXS_ERROR("[%s] Fatal error when loading users for listener '%s', "
|
||||
"service is not started.",
|
||||
service->name,
|
||||
port->name.c_str());
|
||||
return 0;
|
||||
|
||||
case MXS_AUTH_LOADUSERS_ERROR:
|
||||
MXS_WARNING("[%s] Failed to load users for listener '%s', authentication"
|
||||
" might not work.",
|
||||
service->name,
|
||||
port->name.c_str());
|
||||
break;
|
||||
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (port->listener->func.listen(port->listener, config_bind))
|
||||
{
|
||||
port->listener->session = session_alloc(service, port->listener);
|
||||
|
||||
if (port->listener->session != NULL)
|
||||
{
|
||||
port->listener->session->state = SESSION_STATE_LISTENER;
|
||||
listeners += 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("[%s] Failed to create listener session.", service->name);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("[%s] Failed to listen on %s", service->name, config_bind);
|
||||
}
|
||||
|
||||
return listeners;
|
||||
mxb_assert(service && service->router && service->router_instance);
|
||||
return port->listen();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -566,13 +445,13 @@ bool serviceLaunchListener(Service* service, const SListener& port)
|
||||
bool serviceStopListener(SERVICE* svc, const char* name)
|
||||
{
|
||||
auto listener = listener_find(name);
|
||||
return listener && listener->service == svc && listener_stop(listener);
|
||||
return listener && listener->service() == svc && listener->stop();
|
||||
}
|
||||
|
||||
bool serviceStartListener(SERVICE* svc, const char* name)
|
||||
{
|
||||
auto listener = listener_find(name);
|
||||
return listener && listener->service == svc && listener_start(listener);
|
||||
return listener && listener->service() == svc && listener->start();
|
||||
}
|
||||
|
||||
int service_launch_all()
|
||||
@ -612,7 +491,7 @@ bool serviceStop(SERVICE* service)
|
||||
{
|
||||
for (const auto& listener : listener_find_by_service(service))
|
||||
{
|
||||
if (listener_stop(listener))
|
||||
if (listener->stop())
|
||||
{
|
||||
listeners++;
|
||||
}
|
||||
@ -640,7 +519,7 @@ bool serviceStart(SERVICE* service)
|
||||
{
|
||||
for (const auto& listener : listener_find_by_service(service))
|
||||
{
|
||||
if (listener_start(listener))
|
||||
if (listener->start())
|
||||
{
|
||||
listeners++;
|
||||
}
|
||||
@ -657,9 +536,9 @@ bool service_remove_listener(Service* service, const char* target)
|
||||
bool rval = false;
|
||||
auto listener = listener_find(target);
|
||||
|
||||
if (listener && listener->service == service)
|
||||
if (listener && listener->service() == service)
|
||||
{
|
||||
listener_destroy(listener);
|
||||
listener->close();
|
||||
rval = true;
|
||||
}
|
||||
|
||||
@ -992,6 +871,16 @@ const Service::FilterList& Service::get_filters() const
|
||||
return *get_local_filters();
|
||||
}
|
||||
|
||||
void Service::add_listener(const SListener& listener)
|
||||
{
|
||||
m_listeners.push_back(listener);
|
||||
}
|
||||
|
||||
void Service::remove_listener(const SListener& listener)
|
||||
{
|
||||
m_listeners.erase(std::remove(m_listeners.begin(), m_listeners.end(), listener), m_listeners.end());
|
||||
}
|
||||
|
||||
Service* service_internal_find(const char* name)
|
||||
{
|
||||
LockGuard guard(this_unit.lock);
|
||||
@ -1262,29 +1151,27 @@ bool Service::refresh_users()
|
||||
for (const auto& listener : listener_find_by_service(this))
|
||||
{
|
||||
/** Load the authentication users before before starting the listener */
|
||||
if (listener->listener && listener->listener->authfunc.loadusers)
|
||||
|
||||
switch (listener->load_users())
|
||||
{
|
||||
switch (listener->listener->authfunc.loadusers(listener.get()))
|
||||
{
|
||||
case MXS_AUTH_LOADUSERS_FATAL:
|
||||
MXS_ERROR("[%s] Fatal error when loading users for listener '%s',"
|
||||
" authentication will not work.",
|
||||
m_name.c_str(),
|
||||
listener->name.c_str());
|
||||
ret = false;
|
||||
break;
|
||||
case MXS_AUTH_LOADUSERS_FATAL:
|
||||
MXS_ERROR("[%s] Fatal error when loading users for listener '%s',"
|
||||
" authentication will not work.",
|
||||
m_name.c_str(),
|
||||
listener->name());
|
||||
ret = false;
|
||||
break;
|
||||
|
||||
case MXS_AUTH_LOADUSERS_ERROR:
|
||||
MXS_WARNING("[%s] Failed to load users for listener '%s', authentication"
|
||||
" might not work.",
|
||||
m_name.c_str(),
|
||||
listener->name.c_str());
|
||||
ret = false;
|
||||
break;
|
||||
case MXS_AUTH_LOADUSERS_ERROR:
|
||||
MXS_WARNING("[%s] Failed to load users for listener '%s', authentication"
|
||||
" might not work.",
|
||||
m_name.c_str(),
|
||||
listener->name());
|
||||
ret = false;
|
||||
break;
|
||||
|
||||
default:
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1723,14 +1610,7 @@ void service_print_users(DCB* dcb, const SERVICE* service)
|
||||
{
|
||||
for (const auto& listener : listener_find_by_service(service))
|
||||
{
|
||||
if (listener->listener && listener->listener->authfunc.diagnostic)
|
||||
{
|
||||
dcb_printf(dcb, "User names (%s): ", listener->name.c_str());
|
||||
|
||||
listener->listener->authfunc.diagnostic(dcb, listener.get());
|
||||
|
||||
dcb_printf(dcb, "\n");
|
||||
}
|
||||
listener->print_users(dcb);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -74,14 +74,14 @@ static int test1()
|
||||
mxb_assert_message(0 != service_isvalid(service), "Service must be valid after creation");
|
||||
mxb_assert_message(0 == strcmp("MyService", service->name), "Service must have given name");
|
||||
fprintf(stderr, "\t..done\nAdding protocol testprotocol.");
|
||||
mxb_assert_message(listener_alloc(service,
|
||||
"TestProtocol",
|
||||
"mariadbclient",
|
||||
"localhost",
|
||||
9876,
|
||||
"MySQLAuth",
|
||||
NULL,
|
||||
NULL),
|
||||
mxb_assert_message(Listener::create(service,
|
||||
"TestProtocol",
|
||||
"mariadbclient",
|
||||
"localhost",
|
||||
9876,
|
||||
"MySQLAuth",
|
||||
NULL,
|
||||
NULL),
|
||||
"Add Protocol should succeed");
|
||||
mxb_assert_message(0 != serviceHasListener(service, "TestProtocol", "mariadbclient", "localhost", 9876),
|
||||
"Service should have new protocol as requested");
|
||||
|
Loading…
x
Reference in New Issue
Block a user