MXS-1929: Cache filter lists in workers
The services will now store a local copy of the filter lists in the worker local data associated with the service. This removes the instance level lock and removes the performance penalty that was previously imposed by it. The only remaining performance "regression" compared to 2.2 is the extra two atomic operations per filter that a session does when it is being created. This is quite hard to get rid of without significant amounts code and will hopefully be just a drop in the ocean.
This commit is contained in:
@ -76,11 +76,12 @@ public:
|
||||
/**
|
||||
* Get the list of filters this service uses
|
||||
*
|
||||
* @note This locks the service
|
||||
* @note This can lock the service if this is the first time this worker
|
||||
* accesses the filter list
|
||||
*
|
||||
* @return A list of filters or an empty list of no filters are in use
|
||||
*/
|
||||
std::vector<SFilterDef> get_filters() const;
|
||||
const FilterList& get_filters() const;
|
||||
|
||||
inline bool has_filters() const
|
||||
{
|
||||
@ -114,6 +115,20 @@ private:
|
||||
std::string m_weightby; /**< Weighting parameter name */
|
||||
std::string m_version_string; /**< Version string sent to clients */
|
||||
RateLimits m_rate_limits; /**< The refresh rate limits for users of each thread */
|
||||
uint64_t m_wkey; /**< Key for worker local data */
|
||||
|
||||
// Get the worker local filter list
|
||||
FilterList* get_local_filters() const;
|
||||
|
||||
// Update the local filter list on the current worker
|
||||
void update_local_filters();
|
||||
|
||||
// Callback for updating the local filter list
|
||||
static void update_filters_cb(void* data)
|
||||
{
|
||||
Service* service = static_cast<Service*>(data);
|
||||
service->update_local_filters();
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
|
||||
@ -69,7 +69,8 @@
|
||||
using std::string;
|
||||
using std::set;
|
||||
using namespace maxscale;
|
||||
using Guard = std::lock_guard<std::mutex>;
|
||||
using LockGuard = std::lock_guard<std::mutex>;
|
||||
using UniqueLock = std::unique_lock<std::mutex>;
|
||||
|
||||
/** Base value for server weights */
|
||||
#define SERVICE_BASE_SERVER_WEIGHT 1000
|
||||
@ -121,7 +122,7 @@ Service* service_alloc(const char *name, const char *router, MXS_CONFIG_PARAMETE
|
||||
service->capabilities |= router_api->getCapabilities(service->router_instance);
|
||||
}
|
||||
|
||||
Guard guard(service_spin);
|
||||
LockGuard guard(service_spin);
|
||||
all_services.push_back(service);
|
||||
|
||||
return service;
|
||||
@ -157,7 +158,8 @@ Service::Service(const std::string& service_name, const std::string& router_name
|
||||
m_password(config_get_string(params, CN_PASSWORD)),
|
||||
m_weightby(config_get_string(params, CN_WEIGHTBY)),
|
||||
m_version_string(get_version_string(params)),
|
||||
m_rate_limits(config_threadcount())
|
||||
m_rate_limits(config_threadcount()),
|
||||
m_wkey(mxs_rworker_create_key())
|
||||
{
|
||||
const MXS_MODULE* module = get_module(router_name.c_str(), MODULE_ROUTER);
|
||||
ss_dassert(module);
|
||||
@ -222,6 +224,8 @@ Service::Service(const std::string& service_name, const std::string& router_name
|
||||
|
||||
Service::~Service()
|
||||
{
|
||||
mxs_rworker_delete_data(m_wkey);
|
||||
|
||||
while (auto tmp = ports)
|
||||
{
|
||||
ports = ports->next;
|
||||
@ -250,7 +254,7 @@ void service_free(Service* service)
|
||||
ss_dassert(!service->active || maxscale_teardown_in_progress());
|
||||
|
||||
{
|
||||
Guard guard(service_spin);
|
||||
LockGuard guard(service_spin);
|
||||
auto it = std::remove(all_services.begin(), all_services.end(), service);
|
||||
ss_dassert(it != all_services.end());
|
||||
all_services.erase(it);
|
||||
@ -295,7 +299,7 @@ void service_destroy(Service* service)
|
||||
*/
|
||||
bool service_isvalid(Service *service)
|
||||
{
|
||||
Guard guard(service_spin);
|
||||
LockGuard guard(service_spin);
|
||||
return std::find(all_services.begin(), all_services.end(), service) != all_services.end();
|
||||
}
|
||||
|
||||
@ -540,7 +544,7 @@ bool serviceLaunchListener(Service *service, SERV_LISTENER *port)
|
||||
{
|
||||
ss_dassert(service->state != SERVICE_STATE_FAILED);
|
||||
bool rval = true;
|
||||
Guard guard(service->lock);
|
||||
LockGuard guard(service->lock);
|
||||
|
||||
if (serviceStartPort(service, port) == 0)
|
||||
{
|
||||
@ -921,7 +925,7 @@ bool serviceAddBackend(SERVICE *svc, SERVER *server)
|
||||
if (new_ref)
|
||||
{
|
||||
rval = true;
|
||||
Guard guard(service->lock);
|
||||
LockGuard guard(service->lock);
|
||||
|
||||
service->n_dbref++;
|
||||
|
||||
@ -974,7 +978,7 @@ bool serviceAddBackend(SERVICE *svc, SERVER *server)
|
||||
*/
|
||||
void serviceRemoveBackend(Service *service, const SERVER *server)
|
||||
{
|
||||
Guard guard(service->lock);
|
||||
LockGuard guard(service->lock);
|
||||
|
||||
for (SERVER_REF *ref = service->dbref; ref; ref = ref->next)
|
||||
{
|
||||
@ -998,7 +1002,7 @@ bool serviceHasBackend(Service *service, SERVER *server)
|
||||
{
|
||||
SERVER_REF *ptr;
|
||||
|
||||
Guard guard(service->lock);
|
||||
LockGuard guard(service->lock);
|
||||
ptr = service->dbref;
|
||||
while (ptr)
|
||||
{
|
||||
@ -1082,18 +1086,49 @@ bool Service::set_filters(const std::string& filters)
|
||||
|
||||
if (rval)
|
||||
{
|
||||
Guard guard(lock);
|
||||
UniqueLock guard(lock);
|
||||
m_filters = flist;
|
||||
capabilities |= my_capabilities;
|
||||
capabilities |= capabilities;
|
||||
guard.unlock();
|
||||
|
||||
// Broadcast a message to other workers to update their filter lists
|
||||
mxs_rworker_broadcast(update_filters_cb, this);
|
||||
}
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
std::vector<SFilterDef> Service::get_filters() const
|
||||
static void destroy_filter_list(void* data)
|
||||
{
|
||||
Guard guard(lock);
|
||||
return m_filters;
|
||||
Service::FilterList* filters = static_cast<Service::FilterList*>(data);
|
||||
delete filters;
|
||||
}
|
||||
|
||||
Service::FilterList* Service::get_local_filters() const
|
||||
{
|
||||
FilterList* filters = static_cast<FilterList*>(mxs_rworker_get_data(m_wkey));
|
||||
|
||||
if (filters == nullptr)
|
||||
{
|
||||
UniqueLock guard(lock);
|
||||
filters = new FilterList(m_filters);
|
||||
guard.unlock();
|
||||
mxs_rworker_set_data(m_wkey, filters, destroy_filter_list);
|
||||
}
|
||||
|
||||
return filters;
|
||||
}
|
||||
|
||||
void Service::update_local_filters()
|
||||
{
|
||||
FilterList* filters = get_local_filters();
|
||||
LockGuard guard(lock);
|
||||
*filters = m_filters;
|
||||
}
|
||||
|
||||
const Service::FilterList& Service::get_filters() const
|
||||
{
|
||||
return *get_local_filters();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1112,7 +1147,7 @@ bool service_set_filters(Service* service, const char* filters)
|
||||
|
||||
Service* service_internal_find(const char *name)
|
||||
{
|
||||
Guard guard(service_spin);
|
||||
LockGuard guard(service_spin);
|
||||
|
||||
for (Service* s : all_services)
|
||||
{
|
||||
@ -1145,7 +1180,7 @@ SERVICE* service_find(const char *servname)
|
||||
void
|
||||
dprintAllServices(DCB *dcb)
|
||||
{
|
||||
Guard guard(service_spin);
|
||||
LockGuard guard(service_spin);
|
||||
|
||||
for (Service* s : all_services)
|
||||
{
|
||||
@ -1237,7 +1272,7 @@ dListServices(DCB *dcb)
|
||||
{
|
||||
const char HORIZ_SEPARATOR[] = "--------------------------+-------------------"
|
||||
"+--------+----------------+-------------------\n";
|
||||
Guard guard(service_spin);
|
||||
LockGuard guard(service_spin);
|
||||
|
||||
if (!all_services.empty())
|
||||
{
|
||||
@ -1285,7 +1320,7 @@ dListServices(DCB *dcb)
|
||||
*/
|
||||
void dListListeners(DCB *dcb)
|
||||
{
|
||||
Guard guard(service_spin);
|
||||
LockGuard guard(service_spin);
|
||||
|
||||
if (!all_services.empty())
|
||||
{
|
||||
@ -1329,7 +1364,7 @@ bool Service::refresh_users()
|
||||
time_t now = time(NULL);
|
||||
|
||||
// Use unique_lock instead of lock_guard to make the locking conditional
|
||||
std::unique_lock<std::mutex> guard(lock, std::defer_lock);
|
||||
UniqueLock guard(lock, std::defer_lock);
|
||||
|
||||
if ((capabilities & ACAP_TYPE_ASYNC) == 0)
|
||||
{
|
||||
@ -1510,7 +1545,7 @@ int
|
||||
serviceSessionCountAll()
|
||||
{
|
||||
int rval = 0;
|
||||
Guard guard(service_spin);
|
||||
LockGuard guard(service_spin);
|
||||
|
||||
for (Service* service : all_services)
|
||||
{
|
||||
@ -1528,7 +1563,7 @@ serviceSessionCountAll()
|
||||
std::unique_ptr<ResultSet> serviceGetListenerList()
|
||||
{
|
||||
std::unique_ptr<ResultSet> set = ResultSet::create({"Service Name", "Protocol Module", "Address", "Port", "State"});
|
||||
Guard guard(service_spin);
|
||||
LockGuard guard(service_spin);
|
||||
|
||||
for (Service* service : all_services)
|
||||
{
|
||||
@ -1553,7 +1588,7 @@ std::unique_ptr<ResultSet> serviceGetListenerList()
|
||||
std::unique_ptr<ResultSet> serviceGetList()
|
||||
{
|
||||
std::unique_ptr<ResultSet> set = ResultSet::create({"Service Name", "Router Module", "No. Sessions", "Total Sessions"});
|
||||
Guard guard(service_spin);
|
||||
LockGuard guard(service_spin);
|
||||
|
||||
for (Service* s : all_services)
|
||||
{
|
||||
@ -1582,7 +1617,7 @@ static bool service_internal_restart(void *data)
|
||||
bool service_all_services_have_listeners()
|
||||
{
|
||||
bool rval = true;
|
||||
Guard guard(service_spin);
|
||||
LockGuard guard(service_spin);
|
||||
|
||||
for (Service* service : all_services)
|
||||
{
|
||||
@ -1675,7 +1710,7 @@ static void service_calculate_weights(SERVICE *service)
|
||||
|
||||
void service_update_weights()
|
||||
{
|
||||
Guard guard(service_spin);
|
||||
LockGuard guard(service_spin);
|
||||
|
||||
for (Service* service : all_services)
|
||||
{
|
||||
@ -1685,11 +1720,11 @@ void service_update_weights()
|
||||
|
||||
bool service_server_in_use(const SERVER *server)
|
||||
{
|
||||
Guard guard(service_spin);
|
||||
LockGuard guard(service_spin);
|
||||
|
||||
for (Service* service : all_services)
|
||||
{
|
||||
Guard guard(service->lock);
|
||||
LockGuard guard(service->lock);
|
||||
|
||||
for (SERVER_REF *ref = service->dbref; ref; ref = ref->next)
|
||||
{
|
||||
@ -1706,7 +1741,7 @@ bool service_server_in_use(const SERVER *server)
|
||||
bool service_filter_in_use(const SFilterDef& filter)
|
||||
{
|
||||
ss_dassert(filter);
|
||||
Guard guard(service_spin);
|
||||
LockGuard guard(service_spin);
|
||||
|
||||
for (Service* service : all_services)
|
||||
{
|
||||
@ -1910,7 +1945,7 @@ void service_print_users(DCB *dcb, const SERVICE *service)
|
||||
bool service_port_is_used(unsigned short port)
|
||||
{
|
||||
bool rval = false;
|
||||
Guard guard(service_spin);
|
||||
LockGuard guard(service_spin);
|
||||
|
||||
for (Service* service : all_services)
|
||||
{
|
||||
@ -2112,7 +2147,7 @@ json_t* service_json_data(const SERVICE* svc, const char* host)
|
||||
{
|
||||
const Service* service = static_cast<const Service*>(svc);
|
||||
json_t* rval = json_object();
|
||||
Guard guard(service->lock);
|
||||
LockGuard guard(service->lock);
|
||||
|
||||
json_object_set_new(rval, CN_ID, json_string(service->name));
|
||||
json_object_set_new(rval, CN_TYPE, json_string(CN_SERVICES));
|
||||
@ -2156,7 +2191,7 @@ json_t* service_listener_to_json(const Service* service, const char* name, const
|
||||
json_t* service_list_to_json(const char* host)
|
||||
{
|
||||
json_t* arr = json_array();
|
||||
Guard guard(service_spin);
|
||||
LockGuard guard(service_spin);
|
||||
|
||||
for (Service* service : all_services)
|
||||
{
|
||||
@ -2174,7 +2209,7 @@ json_t* service_list_to_json(const char* host)
|
||||
json_t* service_relations_to_filter(const SFilterDef& filter, const char* host)
|
||||
{
|
||||
json_t* rel = mxs_json_relationship(host, MXS_JSON_API_SERVICES);
|
||||
Guard guard(service_spin);
|
||||
LockGuard guard(service_spin);
|
||||
|
||||
for (Service* service : all_services)
|
||||
{
|
||||
@ -2194,11 +2229,11 @@ json_t* service_relations_to_filter(const SFilterDef& filter, const char* host)
|
||||
json_t* service_relations_to_server(const SERVER* server, const char* host)
|
||||
{
|
||||
std::vector<std::string> names;
|
||||
Guard guard(service_spin);
|
||||
LockGuard guard(service_spin);
|
||||
|
||||
for (Service* service : all_services)
|
||||
{
|
||||
Guard guard(service->lock);
|
||||
LockGuard guard(service->lock);
|
||||
|
||||
for (SERVER_REF *ref = service->dbref; ref; ref = ref->next)
|
||||
{
|
||||
@ -2306,7 +2341,7 @@ uint64_t service_get_version(const SERVICE *svc, service_version_which_t which)
|
||||
|
||||
bool service_thread_init()
|
||||
{
|
||||
Guard guard(service_spin);
|
||||
LockGuard guard(service_spin);
|
||||
|
||||
for (Service* service : all_services)
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user