diff --git a/server/core/internal/service.hh b/server/core/internal/service.hh index f96a7ed2f..3f002b89b 100644 --- a/server/core/internal/service.hh +++ b/server/core/internal/service.hh @@ -76,11 +76,12 @@ public: /** * Get the list of filters this service uses * - * @note This locks the service + * @note This can lock the service if this is the first time this worker + * accesses the filter list * * @return A list of filters or an empty list of no filters are in use */ - std::vector get_filters() const; + const FilterList& get_filters() const; inline bool has_filters() const { @@ -114,6 +115,20 @@ private: std::string m_weightby; /**< Weighting parameter name */ std::string m_version_string; /**< Version string sent to clients */ RateLimits m_rate_limits; /**< The refresh rate limits for users of each thread */ + uint64_t m_wkey; /**< Key for worker local data */ + + // Get the worker local filter list + FilterList* get_local_filters() const; + + // Update the local filter list on the current worker + void update_local_filters(); + + // Callback for updating the local filter list + static void update_filters_cb(void* data) + { + Service* service = static_cast(data); + service->update_local_filters(); + } }; /** diff --git a/server/core/service.cc b/server/core/service.cc index 0c4720aba..cc73c6552 100644 --- a/server/core/service.cc +++ b/server/core/service.cc @@ -69,7 +69,8 @@ using std::string; using std::set; using namespace maxscale; -using Guard = std::lock_guard; +using LockGuard = std::lock_guard; +using UniqueLock = std::unique_lock; /** Base value for server weights */ #define SERVICE_BASE_SERVER_WEIGHT 1000 @@ -121,7 +122,7 @@ Service* service_alloc(const char *name, const char *router, MXS_CONFIG_PARAMETE service->capabilities |= router_api->getCapabilities(service->router_instance); } - Guard guard(service_spin); + LockGuard guard(service_spin); all_services.push_back(service); return service; @@ -157,7 +158,8 @@ Service::Service(const std::string& service_name, const std::string& router_name m_password(config_get_string(params, CN_PASSWORD)), m_weightby(config_get_string(params, CN_WEIGHTBY)), m_version_string(get_version_string(params)), - m_rate_limits(config_threadcount()) + m_rate_limits(config_threadcount()), + m_wkey(mxs_rworker_create_key()) { const MXS_MODULE* module = get_module(router_name.c_str(), MODULE_ROUTER); ss_dassert(module); @@ -222,6 +224,8 @@ Service::Service(const std::string& service_name, const std::string& router_name Service::~Service() { + mxs_rworker_delete_data(m_wkey); + while (auto tmp = ports) { ports = ports->next; @@ -250,7 +254,7 @@ void service_free(Service* service) ss_dassert(!service->active || maxscale_teardown_in_progress()); { - Guard guard(service_spin); + LockGuard guard(service_spin); auto it = std::remove(all_services.begin(), all_services.end(), service); ss_dassert(it != all_services.end()); all_services.erase(it); @@ -295,7 +299,7 @@ void service_destroy(Service* service) */ bool service_isvalid(Service *service) { - Guard guard(service_spin); + LockGuard guard(service_spin); return std::find(all_services.begin(), all_services.end(), service) != all_services.end(); } @@ -540,7 +544,7 @@ bool serviceLaunchListener(Service *service, SERV_LISTENER *port) { ss_dassert(service->state != SERVICE_STATE_FAILED); bool rval = true; - Guard guard(service->lock); + LockGuard guard(service->lock); if (serviceStartPort(service, port) == 0) { @@ -921,7 +925,7 @@ bool serviceAddBackend(SERVICE *svc, SERVER *server) if (new_ref) { rval = true; - Guard guard(service->lock); + LockGuard guard(service->lock); service->n_dbref++; @@ -974,7 +978,7 @@ bool serviceAddBackend(SERVICE *svc, SERVER *server) */ void serviceRemoveBackend(Service *service, const SERVER *server) { - Guard guard(service->lock); + LockGuard guard(service->lock); for (SERVER_REF *ref = service->dbref; ref; ref = ref->next) { @@ -998,7 +1002,7 @@ bool serviceHasBackend(Service *service, SERVER *server) { SERVER_REF *ptr; - Guard guard(service->lock); + LockGuard guard(service->lock); ptr = service->dbref; while (ptr) { @@ -1082,18 +1086,49 @@ bool Service::set_filters(const std::string& filters) if (rval) { - Guard guard(lock); + UniqueLock guard(lock); m_filters = flist; - capabilities |= my_capabilities; + capabilities |= capabilities; + guard.unlock(); + + // Broadcast a message to other workers to update their filter lists + mxs_rworker_broadcast(update_filters_cb, this); } return rval; } -std::vector Service::get_filters() const +static void destroy_filter_list(void* data) { - Guard guard(lock); - return m_filters; + Service::FilterList* filters = static_cast(data); + delete filters; +} + +Service::FilterList* Service::get_local_filters() const +{ + FilterList* filters = static_cast(mxs_rworker_get_data(m_wkey)); + + if (filters == nullptr) + { + UniqueLock guard(lock); + filters = new FilterList(m_filters); + guard.unlock(); + mxs_rworker_set_data(m_wkey, filters, destroy_filter_list); + } + + return filters; +} + +void Service::update_local_filters() +{ + FilterList* filters = get_local_filters(); + LockGuard guard(lock); + *filters = m_filters; +} + +const Service::FilterList& Service::get_filters() const +{ + return *get_local_filters(); } /** @@ -1112,7 +1147,7 @@ bool service_set_filters(Service* service, const char* filters) Service* service_internal_find(const char *name) { - Guard guard(service_spin); + LockGuard guard(service_spin); for (Service* s : all_services) { @@ -1145,7 +1180,7 @@ SERVICE* service_find(const char *servname) void dprintAllServices(DCB *dcb) { - Guard guard(service_spin); + LockGuard guard(service_spin); for (Service* s : all_services) { @@ -1237,7 +1272,7 @@ dListServices(DCB *dcb) { const char HORIZ_SEPARATOR[] = "--------------------------+-------------------" "+--------+----------------+-------------------\n"; - Guard guard(service_spin); + LockGuard guard(service_spin); if (!all_services.empty()) { @@ -1285,7 +1320,7 @@ dListServices(DCB *dcb) */ void dListListeners(DCB *dcb) { - Guard guard(service_spin); + LockGuard guard(service_spin); if (!all_services.empty()) { @@ -1329,7 +1364,7 @@ bool Service::refresh_users() time_t now = time(NULL); // Use unique_lock instead of lock_guard to make the locking conditional - std::unique_lock guard(lock, std::defer_lock); + UniqueLock guard(lock, std::defer_lock); if ((capabilities & ACAP_TYPE_ASYNC) == 0) { @@ -1510,7 +1545,7 @@ int serviceSessionCountAll() { int rval = 0; - Guard guard(service_spin); + LockGuard guard(service_spin); for (Service* service : all_services) { @@ -1528,7 +1563,7 @@ serviceSessionCountAll() std::unique_ptr serviceGetListenerList() { std::unique_ptr set = ResultSet::create({"Service Name", "Protocol Module", "Address", "Port", "State"}); - Guard guard(service_spin); + LockGuard guard(service_spin); for (Service* service : all_services) { @@ -1553,7 +1588,7 @@ std::unique_ptr serviceGetListenerList() std::unique_ptr serviceGetList() { std::unique_ptr set = ResultSet::create({"Service Name", "Router Module", "No. Sessions", "Total Sessions"}); - Guard guard(service_spin); + LockGuard guard(service_spin); for (Service* s : all_services) { @@ -1582,7 +1617,7 @@ static bool service_internal_restart(void *data) bool service_all_services_have_listeners() { bool rval = true; - Guard guard(service_spin); + LockGuard guard(service_spin); for (Service* service : all_services) { @@ -1675,7 +1710,7 @@ static void service_calculate_weights(SERVICE *service) void service_update_weights() { - Guard guard(service_spin); + LockGuard guard(service_spin); for (Service* service : all_services) { @@ -1685,11 +1720,11 @@ void service_update_weights() bool service_server_in_use(const SERVER *server) { - Guard guard(service_spin); + LockGuard guard(service_spin); for (Service* service : all_services) { - Guard guard(service->lock); + LockGuard guard(service->lock); for (SERVER_REF *ref = service->dbref; ref; ref = ref->next) { @@ -1706,7 +1741,7 @@ bool service_server_in_use(const SERVER *server) bool service_filter_in_use(const SFilterDef& filter) { ss_dassert(filter); - Guard guard(service_spin); + LockGuard guard(service_spin); for (Service* service : all_services) { @@ -1910,7 +1945,7 @@ void service_print_users(DCB *dcb, const SERVICE *service) bool service_port_is_used(unsigned short port) { bool rval = false; - Guard guard(service_spin); + LockGuard guard(service_spin); for (Service* service : all_services) { @@ -2112,7 +2147,7 @@ json_t* service_json_data(const SERVICE* svc, const char* host) { const Service* service = static_cast(svc); json_t* rval = json_object(); - Guard guard(service->lock); + LockGuard guard(service->lock); json_object_set_new(rval, CN_ID, json_string(service->name)); json_object_set_new(rval, CN_TYPE, json_string(CN_SERVICES)); @@ -2156,7 +2191,7 @@ json_t* service_listener_to_json(const Service* service, const char* name, const json_t* service_list_to_json(const char* host) { json_t* arr = json_array(); - Guard guard(service_spin); + LockGuard guard(service_spin); for (Service* service : all_services) { @@ -2174,7 +2209,7 @@ json_t* service_list_to_json(const char* host) json_t* service_relations_to_filter(const SFilterDef& filter, const char* host) { json_t* rel = mxs_json_relationship(host, MXS_JSON_API_SERVICES); - Guard guard(service_spin); + LockGuard guard(service_spin); for (Service* service : all_services) { @@ -2194,11 +2229,11 @@ json_t* service_relations_to_filter(const SFilterDef& filter, const char* host) json_t* service_relations_to_server(const SERVER* server, const char* host) { std::vector names; - Guard guard(service_spin); + LockGuard guard(service_spin); for (Service* service : all_services) { - Guard guard(service->lock); + LockGuard guard(service->lock); for (SERVER_REF *ref = service->dbref; ref; ref = ref->next) { @@ -2306,7 +2341,7 @@ uint64_t service_get_version(const SERVICE *svc, service_version_which_t which) bool service_thread_init() { - Guard guard(service_spin); + LockGuard guard(service_spin); for (Service* service : all_services) {