Add listener iterator

The listener iterator hides the details of the listener iteration behind a
small set of functions.

The following for-loop demonstrates the main use-case for the iterator:

    LISTENER_ITERATOR iter;

    for (SERV_LISTENER *listener = listener_iterator_init(service, &iter);
         listener; listener = listener_iterator_next(&iter))
    {
        /** Do something with the listener */
    }

As the listeners are mostly iterated to either check a fact about them or
print their information, the functions cater to that use-case. For this
reason, they should always be allocated off the stack.
This commit is contained in:
Markus Mäkelä 2017-05-08 14:17:14 +03:00
parent c30b817abc
commit 613b924f2e
3 changed files with 179 additions and 73 deletions

View File

@ -49,7 +49,12 @@ typedef struct servlistener
SPINLOCK lock;
int active; /**< True if the port has not been deleted */
struct servlistener *next; /**< Next service protocol */
} SERV_LISTENER;
} SERV_LISTENER; // TODO: Rename to LISTENER
typedef struct listener_iterator
{
SERV_LISTENER* current;
} LISTENER_ITERATOR;
/**
* @brief Serialize a listener to a file
@ -80,4 +85,40 @@ int listener_set_ssl_version(SSL_LISTENER *ssl_listener, char* version);
void listener_set_certificates(SSL_LISTENER *ssl_listener, char* cert, char* key, char* ca_cert);
int listener_init_SSL(SSL_LISTENER *ssl_listener);
/**
* @brief Check if listener is active
*
* @param listener Listener to check
*
* @return True if listener is active
*/
bool listener_is_active(SERV_LISTENER* listener);
/**
* @brief Modify listener active state
*
* @param listener Listener to modify
* @param active True to activate, false to disable
*/
void listener_set_active(SERV_LISTENER* listener, bool active);
/**
* @brief Initialize a listener iterator for iterating service listeners
*
* @param service Service whose listeners are iterated
* @param iter Pointer to iterator to initialize
*
* @return The first value pointed by the iterator
*/
SERV_LISTENER* listener_iterator_init(const struct service* service, LISTENER_ITERATOR* iter);
/**
* @brief Get the next listener
*
* @param iter Listener iterator
*
* @return The next listener or NULL on end of list
*/
SERV_LISTENER* listener_iterator_next(LISTENER_ITERATOR* iter);
MXS_END_DECLS

View File

@ -545,3 +545,37 @@ json_t* listener_to_json(const SERV_LISTENER* listener)
return rval;
}
void listener_set_active(SERV_LISTENER* listener, bool active)
{
atomic_store_int32(&listener->active, active ? 1 : 0);
}
bool listener_is_active(SERV_LISTENER* listener)
{
return atomic_load_int32(&listener->active);
}
static inline SERV_LISTENER* load_port(SERV_LISTENER const *const *const port)
{
return (SERV_LISTENER*)atomic_load_ptr((void**)port);
}
SERV_LISTENER* listener_iterator_init(const SERVICE* service, LISTENER_ITERATOR* iter)
{
ss_dassert(iter);
iter->current = load_port(&service->ports);
return iter->current;
}
SERV_LISTENER* listener_iterator_next(LISTENER_ITERATOR* iter)
{
ss_dassert(iter);
if (iter->current)
{
iter->current = load_port(&iter->current->next);
}
return iter->current;
}

View File

@ -94,11 +94,6 @@ static void service_internal_restart(void *data);
static void service_queue_check(void *data);
static void service_calculate_weights(SERVICE *service);
static inline SERV_LISTENER* load_port(SERV_LISTENER const *const *const port)
{
return (SERV_LISTENER*)atomic_load_ptr((void**)port);
}
SERVICE* service_alloc(const char *name, const char *router)
{
char *my_name = MXS_STRDUP(name);
@ -513,12 +508,14 @@ int serviceInitialize(SERVICE *service)
*/
void serviceRemoveListener(SERVICE *service, SERV_LISTENER *target)
{
for (SERV_LISTENER *port = load_port(&service->ports);
port; port = load_port(&port->next))
LISTENER_ITERATOR iter;
for (SERV_LISTENER *listener = listener_iterator_init(service, &iter);
listener; listener = listener_iterator_next(&iter))
{
if (port == target)
if (listener == target)
{
atomic_store_int32(&port->active, 0);
listener_set_active(listener, false);
break;
}
}
@ -546,15 +543,16 @@ bool serviceLaunchListener(SERVICE *service, SERV_LISTENER *port)
bool serviceStopListener(SERVICE *service, const char *name)
{
bool rval = false;
LISTENER_ITERATOR iter;
for (SERV_LISTENER *port = load_port(&service->ports);
port; port = load_port(&port->next))
for (SERV_LISTENER *listener = listener_iterator_init(service, &iter);
listener; listener = listener_iterator_next(&iter))
{
if (atomic_load_int32(&port->active) && strcmp(port->name, name) == 0)
if (listener_is_active(listener) && strcmp(listener->name, name) == 0)
{
if (poll_remove_dcb(port->listener) == 0)
if (poll_remove_dcb(listener->listener) == 0)
{
port->listener->session->state = SESSION_STATE_LISTENER_STOPPED;
listener->listener->session->state = SESSION_STATE_LISTENER_STOPPED;
rval = true;
}
break;
@ -567,16 +565,17 @@ bool serviceStopListener(SERVICE *service, const char *name)
bool serviceStartListener(SERVICE *service, const char *name)
{
bool rval = false;
LISTENER_ITERATOR iter;
for (SERV_LISTENER *port = load_port(&service->ports);
port; port = load_port(&port->next))
for (SERV_LISTENER *listener = listener_iterator_init(service, &iter);
listener; listener = listener_iterator_next(&iter))
{
if (strcmp(port->name, name) == 0)
if (listener_is_active(listener) && strcmp(listener->name, name) == 0)
{
if (port->listener && port->listener->session->state == SESSION_STATE_LISTENER_STOPPED &&
poll_add_dcb(port->listener) == 0)
if (listener->listener && listener->listener->session->state == SESSION_STATE_LISTENER_STOPPED &&
poll_add_dcb(listener->listener) == 0)
{
port->listener->session->state = SESSION_STATE_LISTENER;
listener->listener->session->state = SESSION_STATE_LISTENER;
rval = true;
}
break;
@ -616,15 +615,17 @@ bool serviceStop(SERVICE *service)
if (service)
{
for (SERV_LISTENER *port = load_port(&service->ports);
port; port = load_port(&port->next))
LISTENER_ITERATOR iter;
for (SERV_LISTENER *listener = listener_iterator_init(service, &iter);
listener; listener = listener_iterator_next(&iter))
{
if (atomic_load_int32(&port->active) &&
port->listener && port->listener->session->state == SESSION_STATE_LISTENER)
if (listener_is_active(listener) &&
listener->listener && listener->listener->session->state == SESSION_STATE_LISTENER)
{
if (poll_remove_dcb(port->listener) == 0)
if (poll_remove_dcb(listener->listener) == 0)
{
port->listener->session->state = SESSION_STATE_LISTENER_STOPPED;
listener->listener->session->state = SESSION_STATE_LISTENER_STOPPED;
listeners++;
}
}
@ -650,15 +651,17 @@ bool serviceStart(SERVICE *service)
if (service)
{
for (SERV_LISTENER *port = load_port(&service->ports);
port; port = load_port(&port->next))
LISTENER_ITERATOR iter;
for (SERV_LISTENER *listener = listener_iterator_init(service, &iter);
listener; listener = listener_iterator_next(&iter))
{
if (atomic_load_int32(&port->active) &&
port->listener && port->listener->session->state == SESSION_STATE_LISTENER_STOPPED)
if (listener_is_active(listener) &&
listener->listener && listener->listener->session->state == SESSION_STATE_LISTENER_STOPPED)
{
if (poll_add_dcb(port->listener) == 0)
if (poll_add_dcb(listener->listener) == 0)
{
port->listener->session->state = SESSION_STATE_LISTENER;
listener->listener->session->state = SESSION_STATE_LISTENER;
listeners++;
}
}
@ -715,6 +718,25 @@ void service_free(SERVICE *service)
MXS_FREE(service);
}
/**
* Add a listener to a service
*
* @param service Service where listener is added
* @param proto Listener to add
*/
static void service_add_listener(SERVICE* service, SERV_LISTENER* proto)
{
do
{
/** Read the current value of the list's head. This will be our expected
* value for the following compare-and-swap operation. */
proto->next = (SERV_LISTENER*)atomic_load_ptr((void**)&service->ports);
}
/** Compare the current value to our expected value and if they match, replace
* the current value with our new value. */
while (!atomic_cas_ptr((void**)&service->ports, (void**)&proto->next, proto));
}
/**
* Create a listener for the service
*
@ -736,11 +758,7 @@ SERV_LISTENER* serviceCreateListener(SERVICE *service, const char *name, const c
if (proto)
{
do
{
proto->next = load_port(&service->ports);
}
while (!atomic_cas_ptr((void**)&service->ports, (void**)&proto->next, proto));
service_add_listener(service, proto);
}
return proto;
@ -758,14 +776,15 @@ SERV_LISTENER* serviceCreateListener(SERVICE *service, const char *name, const c
bool serviceHasListener(SERVICE *service, const char *protocol,
const char* address, unsigned short port)
{
LISTENER_ITERATOR iter;
for (SERV_LISTENER *proto = load_port(&service->ports);
proto; proto = load_port(&proto->next))
for (SERV_LISTENER *listener = listener_iterator_init(service, &iter);
listener; listener = listener_iterator_next(&iter))
{
if (atomic_load_int32(&proto->active) &&
strcmp(proto->protocol, protocol) == 0 && proto->port == port &&
((address && proto->address && strcmp(proto->address, address) == 0) ||
(address == NULL && proto->address == NULL)))
if (listener_is_active(listener) &&
strcmp(listener->protocol, protocol) == 0 && listener->port == port &&
((address && listener->address && strcmp(listener->address, address) == 0) ||
(address == NULL && listener->address == NULL)))
{
return true;
}
@ -1525,18 +1544,20 @@ dListListeners(DCB *dcb)
}
while (service)
{
for (SERV_LISTENER *port = load_port(&service->ports);
port; port = load_port(&port->next))
LISTENER_ITERATOR iter;
for (SERV_LISTENER *listener = listener_iterator_init(service, &iter);
listener; listener = listener_iterator_next(&iter))
{
if (atomic_load_int32(&port->active))
if (listener_is_active(listener))
{
dcb_printf(dcb, "%-20s | %-19s | %-18s | %-15s | %5d | %s\n",
port->name, service->name, port->protocol,
(port && port->address) ? port->address : "*",
port->port,
(!port->listener ||
!port->listener->session ||
port->listener->session->state == SESSION_STATE_LISTENER_STOPPED) ?
listener->name, service->name, listener->protocol,
(listener && listener->address) ? listener->address : "*",
listener->port,
(!listener->listener ||
!listener->listener->session ||
listener->listener->session->state == SESSION_STATE_LISTENER_STOPPED) ?
"Stopped" : "Running");
}
}
@ -1625,25 +1646,26 @@ int service_refresh_users(SERVICE *service)
}
ret = 0;
LISTENER_ITERATOR iter;
for (SERV_LISTENER *port = load_port(&service->ports);
port; port = load_port(&port->next))
for (SERV_LISTENER *listener = listener_iterator_init(service, &iter);
listener; listener = listener_iterator_next(&iter))
{
/** Load the authentication users before before starting the listener */
if (atomic_load_int32(&port->active) &&
port->listener && port->listener->authfunc.loadusers)
if (listener_is_active(listener) && listener->listener &&
listener->listener->authfunc.loadusers)
{
switch (port->listener->authfunc.loadusers(port))
switch (listener->listener->authfunc.loadusers(listener))
{
case MXS_AUTH_LOADUSERS_FATAL:
MXS_ERROR("[%s] Fatal error when loading users for listener '%s',"
" authentication will not work.", service->name, port->name);
" authentication will not work.", service->name, listener->name);
ret = 1;
break;
case MXS_AUTH_LOADUSERS_ERROR:
MXS_WARNING("[%s] Failed to load users for listener '%s', authentication"
" might not work.", service->name, port->name);
" might not work.", service->name, listener->name);
ret = 1;
break;
@ -2077,11 +2099,15 @@ bool service_all_services_have_listeners()
while (service)
{
if (load_port(&service->ports) == NULL)
LISTENER_ITERATOR iter;
SERV_LISTENER *listener = listener_iterator_init(service, &iter);
if (listener == NULL)
{
MXS_ERROR("Service '%s' has no listeners.", service->name);
rval = false;
}
service = service->next;
}
@ -2332,13 +2358,15 @@ bool service_serialize_servers(const SERVICE *service)
void service_print_users(DCB *dcb, const SERVICE *service)
{
for (SERV_LISTENER *port = load_port(&service->ports);
port; port = load_port(&port->next))
LISTENER_ITERATOR iter;
for (SERV_LISTENER *listener = listener_iterator_init(service, &iter);
listener; listener = listener_iterator_next(&iter))
{
if (atomic_load_int32(&port->active) &&
port->listener && port->listener->authfunc.diagnostic)
if (listener_is_active(listener) && listener->listener &&
listener->listener->authfunc.diagnostic)
{
port->listener->authfunc.diagnostic(dcb, port);
listener->listener->authfunc.diagnostic(dcb, listener);
}
}
}
@ -2350,10 +2378,12 @@ bool service_port_is_used(unsigned short port)
for (SERVICE *service = allServices; service && !rval; service = service->next)
{
for (SERV_LISTENER *proto = load_port(&service->ports);
proto; proto = load_port(&proto->next))
LISTENER_ITERATOR iter;
for (SERV_LISTENER *listener = listener_iterator_init(service, &iter);
listener; listener = listener_iterator_next(&iter))
{
if (atomic_load_int32(&proto->active) && proto->port == port)
if (listener_is_active(listener) && listener->port == port)
{
rval = true;
break;
@ -2452,13 +2482,14 @@ static inline bool have_active_servers(const SERVICE* service)
json_t* service_listeners_json_data(const SERVICE* service)
{
json_t* arr = json_array();
LISTENER_ITERATOR iter;
for (SERV_LISTENER *port = load_port(&service->ports);
port; port = load_port(&port->next))
for (SERV_LISTENER *listener = listener_iterator_init(service, &iter);
listener; listener = listener_iterator_next(&iter))
{
if (atomic_load_int32(&port->active))
if (listener_is_active(listener))
{
json_array_append_new(arr, listener_to_json(port));
json_array_append_new(arr, listener_to_json(listener));
}
}