Make service ports lock-free

The service port list is now iterated in a safe and lock-free manner. This
makes the handling of service ports somewhat simpler since once an item
has been added to the list it will never be removed. It also removes the
need to lock the service when checking whether a service listens on a port
which caused potential deadlocks.
This commit is contained in:
Markus Mäkelä 2017-05-07 11:18:54 +03:00
parent 461cd6afd9
commit 717f883839
3 changed files with 71 additions and 78 deletions

View File

@ -47,6 +47,7 @@ typedef struct servlistener
struct users *users; /**< The user data for this listener */
struct service* service; /**< The service which used by this listener */
SPINLOCK lock;
int active; /**< True if the port has not been deleted */
struct servlistener *next; /**< Next service protocol */
} SERV_LISTENER;

View File

@ -119,6 +119,7 @@ listener_alloc(struct service* service, const char* name, const char *protocol,
return NULL;
}
proto->active = 1;
proto->name = my_name;
proto->listener = NULL;
proto->service = service;

View File

@ -94,6 +94,11 @@ static void service_internal_restart(void *data);
static void service_queue_check(void *data);
static void service_calculate_weights(SERVICE *service);
static inline SERV_LISTENER* load_port(SERV_LISTENER const *const *const port)
{
return (SERV_LISTENER*)atomic_load_ptr((void**)port);
}
SERVICE* service_alloc(const char *name, const char *router)
{
char *my_name = MXS_STRDUP(name);
@ -499,36 +504,22 @@ int serviceInitialize(SERVICE *service)
}
/**
* @brief Remove a failed listener
* @brief Remove a listener from use
*
* This should only be called when a newly created listener fails to start.
*
* @note The service spinlock must be held when this function is called.
* @note This does not free the memory
*
* @param service Service where @c port points to
* @param port Port to remove
*/
void serviceRemoveListener(SERVICE *service, SERV_LISTENER *port)
void serviceRemoveListener(SERVICE *service, SERV_LISTENER *target)
{
if (service->ports == port)
for (SERV_LISTENER *port = load_port(&service->ports);
port; port = load_port(&port->next))
{
service->ports = service->ports->next;
}
else
{
SERV_LISTENER *prev = service->ports;
SERV_LISTENER *current = service->ports->next;
while (current)
if (port == target)
{
if (current == port)
{
prev->next = current->next;
break;
}
prev = current;
current = current->next;
atomic_store_int32(&port->active, 0);
break;
}
}
}
@ -544,7 +535,6 @@ bool serviceLaunchListener(SERVICE *service, SERV_LISTENER *port)
{
/** Failed to start the listener */
serviceRemoveListener(service, port);
listener_free(port);
rval = false;
}
@ -557,11 +547,10 @@ bool serviceStopListener(SERVICE *service, const char *name)
{
bool rval = false;
spinlock_acquire(&service->spin);
for (SERV_LISTENER *port = service->ports; port; port = port->next)
for (SERV_LISTENER *port = load_port(&service->ports);
port; port = load_port(&port->next))
{
if (strcmp(port->name, name) == 0)
if (atomic_load_int32(&port->active) && strcmp(port->name, name) == 0)
{
if (poll_remove_dcb(port->listener) == 0)
{
@ -572,8 +561,6 @@ bool serviceStopListener(SERVICE *service, const char *name)
}
}
spinlock_release(&service->spin);
return rval;
}
@ -581,9 +568,8 @@ bool serviceStartListener(SERVICE *service, const char *name)
{
bool rval = false;
spinlock_acquire(&service->spin);
for (SERV_LISTENER *port = service->ports; port; port = port->next)
for (SERV_LISTENER *port = load_port(&service->ports);
port; port = load_port(&port->next))
{
if (strcmp(port->name, name) == 0)
{
@ -597,8 +583,6 @@ bool serviceStartListener(SERVICE *service, const char *name)
}
}
spinlock_release(&service->spin);
return rval;
}
@ -632,9 +616,11 @@ bool serviceStop(SERVICE *service)
if (service)
{
for (SERV_LISTENER * port = service->ports; port; port = port->next)
for (SERV_LISTENER *port = load_port(&service->ports);
port; port = load_port(&port->next))
{
if (port->listener && port->listener->session->state == SESSION_STATE_LISTENER)
if (atomic_load_int32(&port->active) &&
port->listener && port->listener->session->state == SESSION_STATE_LISTENER)
{
if (poll_remove_dcb(port->listener) == 0)
{
@ -664,9 +650,11 @@ bool serviceStart(SERVICE *service)
if (service)
{
for (SERV_LISTENER* port = service->ports; port; port = port->next)
for (SERV_LISTENER *port = load_port(&service->ports);
port; port = load_port(&port->next))
{
if (port->listener && port->listener->session->state == SESSION_STATE_LISTENER_STOPPED)
if (atomic_load_int32(&port->active) &&
port->listener && port->listener->session->state == SESSION_STATE_LISTENER_STOPPED)
{
if (poll_add_dcb(port->listener) == 0)
{
@ -748,10 +736,11 @@ SERV_LISTENER* serviceCreateListener(SERVICE *service, const char *name, const c
if (proto)
{
spinlock_acquire(&service->spin);
proto->next = service->ports;
service->ports = proto;
spinlock_release(&service->spin);
do
{
proto->next = load_port(&service->ports);
}
while (!atomic_cas_ptr((void**)&service->ports, (void**)&proto->next, proto));
}
return proto;
@ -769,23 +758,20 @@ SERV_LISTENER* serviceCreateListener(SERVICE *service, const char *name, const c
bool serviceHasListener(SERVICE *service, const char *protocol,
const char* address, unsigned short port)
{
SERV_LISTENER *proto;
spinlock_acquire(&service->spin);
proto = service->ports;
while (proto)
for (SERV_LISTENER *proto = load_port(&service->ports);
proto; proto = load_port(&proto->next))
{
if (strcmp(proto->protocol, protocol) == 0 && proto->port == port &&
if (atomic_load_int32(&proto->active) &&
strcmp(proto->protocol, protocol) == 0 && proto->port == port &&
((address && proto->address && strcmp(proto->address, address) == 0) ||
(address == NULL && proto->address == NULL)))
{
break;
return true;
}
proto = proto->next;
}
spinlock_release(&service->spin);
return proto != NULL;
return false;
}
/**
@ -1523,7 +1509,7 @@ void
dListListeners(DCB *dcb)
{
SERVICE *service;
SERV_LISTENER *lptr;
SERV_LISTENER *port;
spinlock_acquire(&service_spin);
service = allServices;
@ -1539,19 +1525,20 @@ dListListeners(DCB *dcb)
}
while (service)
{
lptr = service->ports;
while (lptr)
for (SERV_LISTENER *port = load_port(&service->ports);
port; port = load_port(&port->next))
{
dcb_printf(dcb, "%-20s | %-19s | %-18s | %-15s | %5d | %s\n",
lptr->name, service->name, lptr->protocol,
(lptr && lptr->address) ? lptr->address : "*",
lptr->port,
(!lptr->listener ||
!lptr->listener->session ||
lptr->listener->session->state == SESSION_STATE_LISTENER_STOPPED) ?
"Stopped" : "Running");
lptr = lptr->next;
if (atomic_load_int32(&port->active))
{
dcb_printf(dcb, "%-20s | %-19s | %-18s | %-15s | %5d | %s\n",
port->name, service->name, port->protocol,
(port && port->address) ? port->address : "*",
port->port,
(!port->listener ||
!port->listener->session ||
port->listener->session->state == SESSION_STATE_LISTENER_STOPPED) ?
"Stopped" : "Running");
}
}
service = service->next;
}
@ -1639,10 +1626,12 @@ int service_refresh_users(SERVICE *service)
ret = 0;
for (SERV_LISTENER *port = service->ports; port; port = port->next)
for (SERV_LISTENER *port = load_port(&service->ports);
port; port = load_port(&port->next))
{
/** Load the authentication users before before starting the listener */
if (port->listener && port->listener->authfunc.loadusers)
if (atomic_load_int32(&port->active) &&
port->listener && port->listener->authfunc.loadusers)
{
switch (port->listener->authfunc.loadusers(port))
{
@ -1903,6 +1892,8 @@ serviceSessionCountAll()
* Provide a row to the result set that defines the set of service
* listeners
*
* TODO: Replace these
*
* @param set The result set
* @param data The index of the row to send
* @return The next row or NULL
@ -2086,7 +2077,7 @@ bool service_all_services_have_listeners()
while (service)
{
if (service->ports == NULL)
if (load_port(&service->ports) == NULL)
{
MXS_ERROR("Service '%s' has no listeners.", service->name);
rval = false;
@ -2341,9 +2332,11 @@ bool service_serialize_servers(const SERVICE *service)
void service_print_users(DCB *dcb, const SERVICE *service)
{
for (SERV_LISTENER *port = service->ports; port; port = port->next)
for (SERV_LISTENER *port = load_port(&service->ports);
port; port = load_port(&port->next))
{
if (port->listener && port->listener->authfunc.diagnostic)
if (atomic_load_int32(&port->active) &&
port->listener && port->listener->authfunc.diagnostic)
{
port->listener->authfunc.diagnostic(dcb, port);
}
@ -2357,18 +2350,15 @@ bool service_port_is_used(unsigned short port)
for (SERVICE *service = allServices; service && !rval; service = service->next)
{
spinlock_acquire(&service->spin);
for (SERV_LISTENER *proto = service->ports; proto; proto = proto->next)
for (SERV_LISTENER *proto = load_port(&service->ports);
proto; proto = load_port(&proto->next))
{
if (proto->port == port)
if (atomic_load_int32(&proto->active) && proto->port == port)
{
rval = true;
break;
}
}
spinlock_release(&service->spin);
}
spinlock_release(&service_spin);
@ -2463,11 +2453,12 @@ json_t* service_listeners_json_data(const SERVICE* service)
{
json_t* arr = json_array();
if (service->ports)
for (SERV_LISTENER *port = load_port(&service->ports);
port; port = load_port(&port->next))
{
for (SERV_LISTENER* p = service->ports; p; p = p->next)
if (atomic_load_int32(&port->active))
{
json_array_append_new(arr, listener_to_json(p));
json_array_append_new(arr, listener_to_json(port));
}
}