diff --git a/include/maxscale/listener.h b/include/maxscale/listener.h index d52cd8f11..a854b9387 100644 --- a/include/maxscale/listener.h +++ b/include/maxscale/listener.h @@ -47,6 +47,7 @@ typedef struct servlistener struct users *users; /**< The user data for this listener */ struct service* service; /**< The service which used by this listener */ SPINLOCK lock; + int active; /**< True if the port has not been deleted */ struct servlistener *next; /**< Next service protocol */ } SERV_LISTENER; diff --git a/server/core/listener.cc b/server/core/listener.cc index 8153f85d7..816ba41e3 100644 --- a/server/core/listener.cc +++ b/server/core/listener.cc @@ -119,6 +119,7 @@ listener_alloc(struct service* service, const char* name, const char *protocol, return NULL; } + proto->active = 1; proto->name = my_name; proto->listener = NULL; proto->service = service; diff --git a/server/core/service.cc b/server/core/service.cc index 4aaac0b89..a9c451ef7 100644 --- a/server/core/service.cc +++ b/server/core/service.cc @@ -94,6 +94,11 @@ static void service_internal_restart(void *data); static void service_queue_check(void *data); static void service_calculate_weights(SERVICE *service); +static inline SERV_LISTENER* load_port(SERV_LISTENER const *const *const port) +{ + return (SERV_LISTENER*)atomic_load_ptr((void**)port); +} + SERVICE* service_alloc(const char *name, const char *router) { char *my_name = MXS_STRDUP(name); @@ -499,36 +504,22 @@ int serviceInitialize(SERVICE *service) } /** - * @brief Remove a failed listener + * @brief Remove a listener from use * - * This should only be called when a newly created listener fails to start. - * - * @note The service spinlock must be held when this function is called. + * @note This does not free the memory * * @param service Service where @c port points to * @param port Port to remove */ -void serviceRemoveListener(SERVICE *service, SERV_LISTENER *port) +void serviceRemoveListener(SERVICE *service, SERV_LISTENER *target) { - - if (service->ports == port) + for (SERV_LISTENER *port = load_port(&service->ports); + port; port = load_port(&port->next)) { - service->ports = service->ports->next; - } - else - { - SERV_LISTENER *prev = service->ports; - SERV_LISTENER *current = service->ports->next; - - while (current) + if (port == target) { - if (current == port) - { - prev->next = current->next; - break; - } - prev = current; - current = current->next; + atomic_store_int32(&port->active, 0); + break; } } } @@ -544,7 +535,6 @@ bool serviceLaunchListener(SERVICE *service, SERV_LISTENER *port) { /** Failed to start the listener */ serviceRemoveListener(service, port); - listener_free(port); rval = false; } @@ -557,11 +547,10 @@ bool serviceStopListener(SERVICE *service, const char *name) { bool rval = false; - spinlock_acquire(&service->spin); - - for (SERV_LISTENER *port = service->ports; port; port = port->next) + for (SERV_LISTENER *port = load_port(&service->ports); + port; port = load_port(&port->next)) { - if (strcmp(port->name, name) == 0) + if (atomic_load_int32(&port->active) && strcmp(port->name, name) == 0) { if (poll_remove_dcb(port->listener) == 0) { @@ -572,8 +561,6 @@ bool serviceStopListener(SERVICE *service, const char *name) } } - spinlock_release(&service->spin); - return rval; } @@ -581,9 +568,8 @@ bool serviceStartListener(SERVICE *service, const char *name) { bool rval = false; - spinlock_acquire(&service->spin); - - for (SERV_LISTENER *port = service->ports; port; port = port->next) + for (SERV_LISTENER *port = load_port(&service->ports); + port; port = load_port(&port->next)) { if (strcmp(port->name, name) == 0) { @@ -597,8 +583,6 @@ bool serviceStartListener(SERVICE *service, const char *name) } } - spinlock_release(&service->spin); - return rval; } @@ -632,9 +616,11 @@ bool serviceStop(SERVICE *service) if (service) { - for (SERV_LISTENER * port = service->ports; port; port = port->next) + for (SERV_LISTENER *port = load_port(&service->ports); + port; port = load_port(&port->next)) { - if (port->listener && port->listener->session->state == SESSION_STATE_LISTENER) + if (atomic_load_int32(&port->active) && + port->listener && port->listener->session->state == SESSION_STATE_LISTENER) { if (poll_remove_dcb(port->listener) == 0) { @@ -664,9 +650,11 @@ bool serviceStart(SERVICE *service) if (service) { - for (SERV_LISTENER* port = service->ports; port; port = port->next) + for (SERV_LISTENER *port = load_port(&service->ports); + port; port = load_port(&port->next)) { - if (port->listener && port->listener->session->state == SESSION_STATE_LISTENER_STOPPED) + if (atomic_load_int32(&port->active) && + port->listener && port->listener->session->state == SESSION_STATE_LISTENER_STOPPED) { if (poll_add_dcb(port->listener) == 0) { @@ -748,10 +736,11 @@ SERV_LISTENER* serviceCreateListener(SERVICE *service, const char *name, const c if (proto) { - spinlock_acquire(&service->spin); - proto->next = service->ports; - service->ports = proto; - spinlock_release(&service->spin); + do + { + proto->next = load_port(&service->ports); + } + while (!atomic_cas_ptr((void**)&service->ports, (void**)&proto->next, proto)); } return proto; @@ -769,23 +758,20 @@ SERV_LISTENER* serviceCreateListener(SERVICE *service, const char *name, const c bool serviceHasListener(SERVICE *service, const char *protocol, const char* address, unsigned short port) { - SERV_LISTENER *proto; - spinlock_acquire(&service->spin); - proto = service->ports; - while (proto) + for (SERV_LISTENER *proto = load_port(&service->ports); + proto; proto = load_port(&proto->next)) { - if (strcmp(proto->protocol, protocol) == 0 && proto->port == port && + if (atomic_load_int32(&proto->active) && + strcmp(proto->protocol, protocol) == 0 && proto->port == port && ((address && proto->address && strcmp(proto->address, address) == 0) || (address == NULL && proto->address == NULL))) { - break; + return true; } - proto = proto->next; } - spinlock_release(&service->spin); - return proto != NULL; + return false; } /** @@ -1523,7 +1509,7 @@ void dListListeners(DCB *dcb) { SERVICE *service; - SERV_LISTENER *lptr; + SERV_LISTENER *port; spinlock_acquire(&service_spin); service = allServices; @@ -1539,19 +1525,20 @@ dListListeners(DCB *dcb) } while (service) { - lptr = service->ports; - while (lptr) + for (SERV_LISTENER *port = load_port(&service->ports); + port; port = load_port(&port->next)) { - dcb_printf(dcb, "%-20s | %-19s | %-18s | %-15s | %5d | %s\n", - lptr->name, service->name, lptr->protocol, - (lptr && lptr->address) ? lptr->address : "*", - lptr->port, - (!lptr->listener || - !lptr->listener->session || - lptr->listener->session->state == SESSION_STATE_LISTENER_STOPPED) ? - "Stopped" : "Running"); - - lptr = lptr->next; + if (atomic_load_int32(&port->active)) + { + dcb_printf(dcb, "%-20s | %-19s | %-18s | %-15s | %5d | %s\n", + port->name, service->name, port->protocol, + (port && port->address) ? port->address : "*", + port->port, + (!port->listener || + !port->listener->session || + port->listener->session->state == SESSION_STATE_LISTENER_STOPPED) ? + "Stopped" : "Running"); + } } service = service->next; } @@ -1639,10 +1626,12 @@ int service_refresh_users(SERVICE *service) ret = 0; - for (SERV_LISTENER *port = service->ports; port; port = port->next) + for (SERV_LISTENER *port = load_port(&service->ports); + port; port = load_port(&port->next)) { /** Load the authentication users before before starting the listener */ - if (port->listener && port->listener->authfunc.loadusers) + if (atomic_load_int32(&port->active) && + port->listener && port->listener->authfunc.loadusers) { switch (port->listener->authfunc.loadusers(port)) { @@ -1903,6 +1892,8 @@ serviceSessionCountAll() * Provide a row to the result set that defines the set of service * listeners * + * TODO: Replace these + * * @param set The result set * @param data The index of the row to send * @return The next row or NULL @@ -2086,7 +2077,7 @@ bool service_all_services_have_listeners() while (service) { - if (service->ports == NULL) + if (load_port(&service->ports) == NULL) { MXS_ERROR("Service '%s' has no listeners.", service->name); rval = false; @@ -2341,9 +2332,11 @@ bool service_serialize_servers(const SERVICE *service) void service_print_users(DCB *dcb, const SERVICE *service) { - for (SERV_LISTENER *port = service->ports; port; port = port->next) + for (SERV_LISTENER *port = load_port(&service->ports); + port; port = load_port(&port->next)) { - if (port->listener && port->listener->authfunc.diagnostic) + if (atomic_load_int32(&port->active) && + port->listener && port->listener->authfunc.diagnostic) { port->listener->authfunc.diagnostic(dcb, port); } @@ -2357,18 +2350,15 @@ bool service_port_is_used(unsigned short port) for (SERVICE *service = allServices; service && !rval; service = service->next) { - spinlock_acquire(&service->spin); - - for (SERV_LISTENER *proto = service->ports; proto; proto = proto->next) + for (SERV_LISTENER *proto = load_port(&service->ports); + proto; proto = load_port(&proto->next)) { - if (proto->port == port) + if (atomic_load_int32(&proto->active) && proto->port == port) { rval = true; break; } } - - spinlock_release(&service->spin); } spinlock_release(&service_spin); @@ -2463,11 +2453,12 @@ json_t* service_listeners_json_data(const SERVICE* service) { json_t* arr = json_array(); - if (service->ports) + for (SERV_LISTENER *port = load_port(&service->ports); + port; port = load_port(&port->next)) { - for (SERV_LISTENER* p = service->ports; p; p = p->next) + if (atomic_load_int32(&port->active)) { - json_array_append_new(arr, listener_to_json(p)); + json_array_append_new(arr, listener_to_json(port)); } }