diff --git a/server/modules/routing/schemarouter/schemaroutersession.cc b/server/modules/routing/schemarouter/schemaroutersession.cc index d522abe1c..e7a3ee00a 100644 --- a/server/modules/routing/schemarouter/schemaroutersession.cc +++ b/server/modules/routing/schemarouter/schemaroutersession.cc @@ -45,7 +45,8 @@ SchemaRouterSession::SchemaRouterSession(MXS_SESSION* session, , m_backends(backends) , m_config(router->m_config) , m_router(router) - , m_shard(m_router->m_shard_manager.get_shard(get_cache_key(), m_config->refresh_min_interval)) + , m_key(get_cache_key()) + , m_shard(m_router->m_shard_manager.get_shard(m_key, m_config->refresh_min_interval)) , m_state(0) , m_sent_sescmd(0) , m_replied_sescmd(0) @@ -111,6 +112,11 @@ void SchemaRouterSession::close() } } + if (m_state & INIT_MAPPING) + { + m_router->m_shard_manager.cancel_update(m_key); + } + std::lock_guard guard(m_router->m_lock); if (m_router->m_stats.longest_sescmd < m_stats.longest_sescmd) @@ -283,8 +289,26 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket) if (m_shard.empty()) { - /* Generate database list */ - query_databases(); + // Check if another session has managed to update the shard cache + m_shard = m_router->m_shard_manager.get_shard(m_key, m_config->refresh_min_interval); + + if (m_shard.empty()) + { + // No entries in the cache, try to start an update + if (m_router->m_shard_manager.start_update(m_key)) + { + // No other sessions are doing an update for this user, start one + query_databases(); + } + else + { + // Too many concurrent updates + + // TODO: Queue this session instead of killing it + m_pSession->kill(modutil_create_mysql_err_msg(1, 0, 1096, "HY000", "Too many updates")); + return 0; + } + } } int ret = 0; @@ -753,7 +777,7 @@ void SchemaRouterSession::handleError(GWBUF* pMessage, void SchemaRouterSession::synchronize_shards() { m_router->m_stats.shmap_cache_miss++; - m_router->m_shard_manager.update_shard(m_shard, get_cache_key()); + m_router->m_shard_manager.update_shard(m_shard, m_key); } /** diff --git a/server/modules/routing/schemarouter/schemaroutersession.hh b/server/modules/routing/schemarouter/schemaroutersession.hh index bfb35e6d9..902b123f6 100644 --- a/server/modules/routing/schemarouter/schemaroutersession.hh +++ b/server/modules/routing/schemarouter/schemaroutersession.hh @@ -160,6 +160,7 @@ private: SSRBackendList m_backends; /**< Backend references */ SConfig m_config; /**< Session specific configuration */ SchemaRouter* m_router; /**< The router instance */ + std::string m_key; /**< Shard cache key */ Shard m_shard; /**< Database to server mapping */ std::string m_connect_db; /**< Database the user was trying to connect to */ std::string m_current_db; /**< Current active database */ diff --git a/server/modules/routing/schemarouter/shard_map.cc b/server/modules/routing/schemarouter/shard_map.cc index c653709f9..9204e51b0 100644 --- a/server/modules/routing/schemarouter/shard_map.cc +++ b/server/modules/routing/schemarouter/shard_map.cc @@ -209,4 +209,34 @@ void ShardManager::update_shard(Shard& shard, std::string user) { m_maps[user] = shard; } + + mxb_assert(m_limits[user] > 0); + --m_limits[user]; +} + +void ShardManager::set_update_limit(int64_t limit) +{ + std::lock_guard guard(m_lock); + m_update_limit = limit; +} + +bool ShardManager::start_update(const std::string& user) +{ + bool rval = false; + std::lock_guard guard(m_lock); + + if (m_limits[user] < m_update_limit) + { + ++m_limits[user]; + rval = true; + } + + return rval; +} + +void ShardManager::cancel_update(const std::string& user) +{ + std::lock_guard guard(m_lock); + mxb_assert(m_limits[user] > 0); + --m_limits[user]; } diff --git a/server/modules/routing/schemarouter/shard_map.hh b/server/modules/routing/schemarouter/shard_map.hh index fd242c0a0..a12b4c91a 100644 --- a/server/modules/routing/schemarouter/shard_map.hh +++ b/server/modules/routing/schemarouter/shard_map.hh @@ -111,7 +111,8 @@ private: time_t m_last_updated; }; -typedef std::unordered_map ShardMap; +typedef std::unordered_map ShardMap; +typedef std::unordered_map MapLimits; class ShardManager { @@ -141,7 +142,38 @@ public: */ void update_shard(Shard& shard, std::string user); + /** + * Set how many concurrent shard updates are allowed per user + * + * By default only one update per user is allowed. + * + * @param limit Number of concurrent users to allow + */ + void set_update_limit(int64_t limit); + + /** + * Start a shard update + * + * The update is considered finished when either update_shard() or cancel_update() is called. One of these + * two must be called by the session once start_update() has returned true. + * + * @param user The user whose shard is about to be updated + * + * @return True if an update can be done. False if there are too many concurrent + * updates being done by this user. + */ + bool start_update(const std::string& user); + + /** + * Cancels a started shard update + * + * @param user The user whose shard was being updated + */ + void cancel_update(const std::string& user); + private: mutable std::mutex m_lock; ShardMap m_maps; + MapLimits m_limits; + int64_t m_update_limit {1}; };