MXS-2196: Store listener reference in DCB

By storing the reference in the DCB, the two-way dependency between the
listeners and services is severed. Now the services have no direct link to
listeners and after the destruction of a listener it will be freed once
all connections through it have closed.

Due to the fact that a listener itself has a DCB that must point to a
valid listener, a self-reference is stored in the listener DCB. This is
extremely confusing and is only here to keep the code functional until the
DCB part of the listener can be factored out.
This commit is contained in:
Markus Mäkelä 2018-11-30 17:48:32 +02:00
parent 7a87ff9ce1
commit 45827dd433
No known key found for this signature in database
GPG Key ID: 72D48FCE664F7B19
12 changed files with 39 additions and 71 deletions

View File

@ -26,8 +26,11 @@
#include <maxscale/protocol.h>
#include <maxscale/ssl.h>
#include <memory>
class SERVICE;
class Listener;
using SListener = std::shared_ptr<Listener>;
MXS_BEGIN_DECLS
@ -174,7 +177,7 @@ typedef enum
*/
struct DCB : public MXB_POLL_DATA
{
DCB(dcb_role_t role, Listener* listener, SERVICE* service);
DCB(dcb_role_t role, const SListener& listener, SERVICE* service);
~DCB();
bool dcb_errhandle_called = false; /**< this can be called only once */
@ -189,7 +192,7 @@ struct DCB : public MXB_POLL_DATA
size_t protocol_packet_length = 0; /**< protocol packet length */
size_t protocol_bytes_processed = 0; /**< How many bytes have been read */
struct session* session = nullptr; /**< The owning session */
Listener* listener = nullptr; /**< For a client DCB, the listener data */
SListener listener; /**< The origin of the connection */
MXS_PROTOCOL func = {}; /**< Protocol functions for the DCB */
MXS_AUTHENTICATOR authfunc = {}; /**< Authenticator functions for the DCB */
uint64_t writeqlen = 0; /**< Bytes in writeq */
@ -260,8 +263,8 @@ typedef enum
void dcb_global_init();
int dcb_write(DCB*, GWBUF*);
DCB* dcb_accept(Listener* listener);
DCB* dcb_alloc(dcb_role_t, Listener*, SERVICE* service);
DCB* dcb_accept(const SListener& listener);
DCB* dcb_alloc(dcb_role_t, const SListener&, SERVICE* service);
DCB* dcb_connect(struct server*, struct session*, const char*);
int dcb_read(DCB*, GWBUF**, int);
int dcb_drain_writeq(DCB*);

View File

@ -78,7 +78,7 @@ public:
*
* @return True if the listener was able to start listening
*/
bool listen();
bool listen(const SListener& self);
/**
* Stop the listener
@ -220,7 +220,7 @@ private:
const std::string& protocol, const std::string& authenticator,
const std::string& auth_opts, void* auth_instance, SSL_LISTENER* ssl);
friend DCB* dcb_accept(Listener* listener);
friend DCB* dcb_accept(const SListener& listener);
};
/**

View File

@ -3892,11 +3892,7 @@ int create_new_listener(CONFIG_CONTEXT* obj)
auto listener = Listener::create(service, obj->object, protocol, socket ? socket : address,
net_port, authenticator, authenticator_options, ssl_info);
if (listener)
{
service->add_listener(listener);
}
else
if (!listener)
{
++error_count;
}

View File

@ -1015,23 +1015,14 @@ bool runtime_create_listener(Service* service,
else
{
const char* print_addr = addr ? addr : "::";
SListener listener = Listener::create(service,
name,
proto,
addr,
u_port,
auth,
auth_opt,
ssl);
auto listener = Listener::create(service, name, proto, addr, u_port, auth, auth_opt, ssl);
if (listener && listener_serialize(listener))
{
MXS_NOTICE("Created %slistener '%s' at %s:%s for service '%s'",
ssl ? "TLS encrypted " : "", name, print_addr, port, service->name);
service->add_listener(listener);
if (listener->listen())
if (listener->listen(listener))
{
rval = true;
}

View File

@ -160,7 +160,7 @@ static MXB_WORKER* get_dcb_owner(dcb_role_t role)
return owner;
}
DCB::DCB(dcb_role_t role, Listener* listener, SERVICE* service)
DCB::DCB(dcb_role_t role, const SListener& listener, SERVICE* service)
: MXB_POLL_DATA{dcb_poll_handler, get_dcb_owner(role)}
, dcb_role(role)
, listener(listener)
@ -216,12 +216,13 @@ DCB::~DCB()
* Remaining fields are set from the given parameters, and then the DCB is
* flagged as ready for use.
*
* @param dcb_role_t The role for the new DCB
* @param listener The listener if applicable.
* @param role The role for the new DCB
* @param listener The listener if applicable.
* @param service The service which is used
*
* @return An available DCB or NULL if none could be allocated.
*/
DCB* dcb_alloc(dcb_role_t role, Listener* listener, SERVICE* service)
DCB* dcb_alloc(dcb_role_t role, const SListener& listener, SERVICE* service)
{
return new(std::nothrow) DCB(role, listener, service);
}
@ -2314,7 +2315,7 @@ int dcb_connect_SSL(DCB* dcb)
*
* @return DCB - The new client DCB for the new connection, or NULL if failed
*/
DCB* dcb_accept(Listener* listener)
DCB* dcb_accept(const SListener& listener)
{
DCB* client_dcb = NULL;
int c_sock;

View File

@ -81,20 +81,6 @@ public:
*/
const FilterList& get_filters() const;
/**
* Add a listener to a service
*
* @param listener Listener to add
*/
void add_listener(const SListener& listener);
/**
* Remove a listener from a service
*
* @param listener Listener to remove
*/
void remove_listener(const SListener& listener);
/**
* Reload users for all listeners
*
@ -118,16 +104,15 @@ public:
mutable std::mutex lock;
private:
FilterList m_filters; /**< Ordered list of filters */
std::vector<SListener> m_listeners; /**< Listeners pointing to this service */
std::string m_name; /**< Name of the service */
std::string m_router_name; /**< Router module */
std::string m_user; /**< Username */
std::string m_password; /**< Password */
std::string m_weightby; /**< Weighting parameter name */
std::string m_version_string;/**< Version string sent to clients */
RateLimits m_rate_limits; /**< The refresh rate limits for users of each thread */
uint64_t m_wkey; /**< Key for worker local data */
FilterList m_filters; /**< Ordered list of filters */
std::string m_name; /**< Name of the service */
std::string m_router_name; /**< Router module */
std::string m_user; /**< Username */
std::string m_password; /**< Password */
std::string m_weightby; /**< Weighting parameter name */
std::string m_version_string; /**< Version string sent to clients */
RateLimits m_rate_limits; /**< The refresh rate limits for users of each thread */
uint64_t m_wkey; /**< Key for worker local data */
// Get the worker local filter list
FilterList* get_local_filters() const;

View File

@ -136,6 +136,9 @@ void Listener::destroy(const SListener& listener)
listener->m_listener->fd = -1;
listener->m_state = DESTROYED;
// This frees the self-reference the listener's own DCB has to itself
listener->m_listener->listener.reset();
std::lock_guard<std::mutex> guard(listener_lock);
all_listeners.remove(listener);
}
@ -731,10 +734,12 @@ void Listener::set_users(struct users* u)
m_users = u;
}
bool Listener::listen()
bool Listener::listen(const SListener& self)
{
m_state = FAILED;
m_listener = dcb_alloc(DCB_ROLE_SERVICE_LISTENER, this, m_service);
// This is a temporary workaround until the DCBs are removed from listeners
m_listener = dcb_alloc(DCB_ROLE_SERVICE_LISTENER, self, m_service);
if (!m_listener)
{

View File

@ -323,7 +323,7 @@ bool service_isvalid(Service* service)
static int serviceStartPort(Service* service, const SListener& port)
{
mxb_assert(service && service->router && service->router_instance);
return port->listen();
return port->listen(port);
}
/**
@ -835,16 +835,6 @@ const Service::FilterList& Service::get_filters() const
return *get_local_filters();
}
void Service::add_listener(const SListener& listener)
{
m_listeners.push_back(listener);
}
void Service::remove_listener(const SListener& listener)
{
m_listeners.erase(std::remove(m_listeners.begin(), m_listeners.end(), listener), m_listeners.end());
}
Service* service_internal_find(const char* name)
{
LockGuard guard(this_unit.lock);

View File

@ -48,10 +48,9 @@
static int test1()
{
DCB* dcb;
Listener* dummy = nullptr;
/* Single buffer tests */
fprintf(stderr, "testdcb : creating buffer with type DCB_ROLE_INTERNAL");
dcb = dcb_alloc(DCB_ROLE_INTERNAL, dummy, NULL);
dcb = dcb_alloc(DCB_ROLE_INTERNAL, nullptr, nullptr);
printDCB(dcb);
fprintf(stderr, "\t..done\nAllocated dcb.");
// TODO: Without running workers, the following will hang. As it does not

View File

@ -47,9 +47,7 @@
static int test1()
{
DCB* dcb;
int result;
int eno = 0;
Listener* dummy = nullptr;
SERVICE service;
service.routerModule = (char*)"required by a check in dcb.cc";
@ -59,7 +57,7 @@ static int test1()
"testpoll : Initialise the polling system.");
init_test_env(NULL);
fprintf(stderr, "\t..done\nAdd a DCB");
dcb = dcb_alloc(DCB_ROLE_CLIENT_HANDLER, dummy, NULL);
dcb = dcb_alloc(DCB_ROLE_CLIENT_HANDLER, nullptr, nullptr);
if (dcb == NULL)
{

View File

@ -80,7 +80,7 @@ static int test1()
"localhost",
9876,
"MySQLAuth",
NULL,
"",
NULL),
"Add Protocol should succeed");
mxb_assert_message(service_find_listener(service, "", "localhost", 9876),

View File

@ -263,7 +263,7 @@ static int cdc_auth_authenticate(DCB* dcb)
cdc_auth_check(dcb, protocol, client_data->user, client_data->auth_data, client_data->flags);
/* On failed authentication try to reload users and authenticate again */
if (CDC_STATE_AUTH_OK != auth_ret && cdc_replace_users(dcb->listener) == MXS_AUTH_LOADUSERS_OK)
if (CDC_STATE_AUTH_OK != auth_ret && cdc_replace_users(dcb->listener.get()) == MXS_AUTH_LOADUSERS_OK)
{
auth_ret = cdc_auth_check(dcb,
protocol,