From 41c2a6ee8eda9f4586d106443f7e624ecfdfcad0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Thu, 23 Dec 2021 08:44:29 +0200 Subject: [PATCH] MXS-3892: Limit concurrent mapping of databases As there are no practical benefits to multiple sessions for the same user mapping the databases at the same time, limiting them to one update per user is sensible. This is especially true now that we know the information_schema tables aren't the most efficient things in the world. The current code implements this rate limiting by closing any extra sessions that would start a second update. The final implementation should suspend them for the duration of the update as it is far more user-friendly. The limits are currently global as the shard caches are also global. This is a performance bottleneck and it could be solved by storing the shard cache inside of a mxs::WorkerGlobal instead of having it as a global cache. --- .../schemarouter/schemaroutersession.cc | 32 ++++++++++++++--- .../schemarouter/schemaroutersession.hh | 1 + .../modules/routing/schemarouter/shard_map.cc | 30 ++++++++++++++++ .../modules/routing/schemarouter/shard_map.hh | 34 ++++++++++++++++++- 4 files changed, 92 insertions(+), 5 deletions(-) diff --git a/server/modules/routing/schemarouter/schemaroutersession.cc b/server/modules/routing/schemarouter/schemaroutersession.cc index d522abe1c..e7a3ee00a 100644 --- a/server/modules/routing/schemarouter/schemaroutersession.cc +++ b/server/modules/routing/schemarouter/schemaroutersession.cc @@ -45,7 +45,8 @@ SchemaRouterSession::SchemaRouterSession(MXS_SESSION* session, , m_backends(backends) , m_config(router->m_config) , m_router(router) - , m_shard(m_router->m_shard_manager.get_shard(get_cache_key(), m_config->refresh_min_interval)) + , m_key(get_cache_key()) + , m_shard(m_router->m_shard_manager.get_shard(m_key, m_config->refresh_min_interval)) , m_state(0) , m_sent_sescmd(0) , m_replied_sescmd(0) @@ -111,6 +112,11 @@ void SchemaRouterSession::close() } } + if (m_state & INIT_MAPPING) + { + m_router->m_shard_manager.cancel_update(m_key); + } + std::lock_guard guard(m_router->m_lock); if (m_router->m_stats.longest_sescmd < m_stats.longest_sescmd) @@ -283,8 +289,26 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket) if (m_shard.empty()) { - /* Generate database list */ - query_databases(); + // Check if another session has managed to update the shard cache + m_shard = m_router->m_shard_manager.get_shard(m_key, m_config->refresh_min_interval); + + if (m_shard.empty()) + { + // No entries in the cache, try to start an update + if (m_router->m_shard_manager.start_update(m_key)) + { + // No other sessions are doing an update for this user, start one + query_databases(); + } + else + { + // Too many concurrent updates + + // TODO: Queue this session instead of killing it + m_pSession->kill(modutil_create_mysql_err_msg(1, 0, 1096, "HY000", "Too many updates")); + return 0; + } + } } int ret = 0; @@ -753,7 +777,7 @@ void SchemaRouterSession::handleError(GWBUF* pMessage, void SchemaRouterSession::synchronize_shards() { m_router->m_stats.shmap_cache_miss++; - m_router->m_shard_manager.update_shard(m_shard, get_cache_key()); + m_router->m_shard_manager.update_shard(m_shard, m_key); } /** diff --git a/server/modules/routing/schemarouter/schemaroutersession.hh b/server/modules/routing/schemarouter/schemaroutersession.hh index bfb35e6d9..902b123f6 100644 --- a/server/modules/routing/schemarouter/schemaroutersession.hh +++ b/server/modules/routing/schemarouter/schemaroutersession.hh @@ -160,6 +160,7 @@ private: SSRBackendList m_backends; /**< Backend references */ SConfig m_config; /**< Session specific configuration */ SchemaRouter* m_router; /**< The router instance */ + std::string m_key; /**< Shard cache key */ Shard m_shard; /**< Database to server mapping */ std::string m_connect_db; /**< Database the user was trying to connect to */ std::string m_current_db; /**< Current active database */ diff --git a/server/modules/routing/schemarouter/shard_map.cc b/server/modules/routing/schemarouter/shard_map.cc index c653709f9..9204e51b0 100644 --- a/server/modules/routing/schemarouter/shard_map.cc +++ b/server/modules/routing/schemarouter/shard_map.cc @@ -209,4 +209,34 @@ void ShardManager::update_shard(Shard& shard, std::string user) { m_maps[user] = shard; } + + mxb_assert(m_limits[user] > 0); + --m_limits[user]; +} + +void ShardManager::set_update_limit(int64_t limit) +{ + std::lock_guard guard(m_lock); + m_update_limit = limit; +} + +bool ShardManager::start_update(const std::string& user) +{ + bool rval = false; + std::lock_guard guard(m_lock); + + if (m_limits[user] < m_update_limit) + { + ++m_limits[user]; + rval = true; + } + + return rval; +} + +void ShardManager::cancel_update(const std::string& user) +{ + std::lock_guard guard(m_lock); + mxb_assert(m_limits[user] > 0); + --m_limits[user]; } diff --git a/server/modules/routing/schemarouter/shard_map.hh b/server/modules/routing/schemarouter/shard_map.hh index fd242c0a0..a12b4c91a 100644 --- a/server/modules/routing/schemarouter/shard_map.hh +++ b/server/modules/routing/schemarouter/shard_map.hh @@ -111,7 +111,8 @@ private: time_t m_last_updated; }; -typedef std::unordered_map ShardMap; +typedef std::unordered_map ShardMap; +typedef std::unordered_map MapLimits; class ShardManager { @@ -141,7 +142,38 @@ public: */ void update_shard(Shard& shard, std::string user); + /** + * Set how many concurrent shard updates are allowed per user + * + * By default only one update per user is allowed. + * + * @param limit Number of concurrent users to allow + */ + void set_update_limit(int64_t limit); + + /** + * Start a shard update + * + * The update is considered finished when either update_shard() or cancel_update() is called. One of these + * two must be called by the session once start_update() has returned true. + * + * @param user The user whose shard is about to be updated + * + * @return True if an update can be done. False if there are too many concurrent + * updates being done by this user. + */ + bool start_update(const std::string& user); + + /** + * Cancels a started shard update + * + * @param user The user whose shard was being updated + */ + void cancel_update(const std::string& user); + private: mutable std::mutex m_lock; ShardMap m_maps; + MapLimits m_limits; + int64_t m_update_limit {1}; };