diff --git a/server/modules/routing/schemarouter/schemaroutersession.cc b/server/modules/routing/schemarouter/schemaroutersession.cc index e7a3ee00a..57a479d4f 100644 --- a/server/modules/routing/schemarouter/schemaroutersession.cc +++ b/server/modules/routing/schemarouter/schemaroutersession.cc @@ -101,6 +101,11 @@ void SchemaRouterSession::close() { m_closed = true; + if (m_dcid) + { + mxb::Worker::get_current()->cancel_delayed_call(m_dcid); + } + for (SSRBackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) { SSRBackend& bref = *it; @@ -287,7 +292,7 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket) return 0; } - if (m_shard.empty()) + if (m_shard.empty() && (m_state & INIT_MAPPING) == 0) { // Check if another session has managed to update the shard cache m_shard = m_router->m_shard_manager.get_shard(m_key, m_config->refresh_min_interval); @@ -303,10 +308,17 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket) else { // Too many concurrent updates + m_queue.push_back(pPacket); - // TODO: Queue this session instead of killing it - m_pSession->kill(modutil_create_mysql_err_msg(1, 0, 1096, "HY000", "Too many updates")); - return 0; + if (m_dcid == 0) + { + // Wait for the other session to finish its update + auto worker = mxs::RoutingWorker::get_current(); + m_dcid = worker->delayed_call(1000, &SchemaRouterSession::delay_routing, this); + MXS_INFO("Waiting for the database mapping to be completed by another session"); + } + + return 1; } } } @@ -1105,6 +1117,37 @@ void SchemaRouterSession::route_queued_query() session_delay_routing(m_pSession, router_as_downstream(m_pSession), tmp, 0); } +bool SchemaRouterSession::delay_routing(mxb::Worker::Call::action_t action) +{ + bool rv = false; + + if (action == mxb::Worker::Call::EXECUTE) + { + mxb_assert(m_shard.empty()); + m_shard = m_router->m_shard_manager.get_shard(m_key, m_config->refresh_min_interval); + + if (!m_shard.empty()) + { + MXS_INFO("Another session updated the shard information, reusing the result"); + route_queued_query(); + m_dcid = 0; + } + else if (m_router->m_shard_manager.start_update(m_key)) + { + // No other sessions are doing an update, start our own update + query_databases(); + m_dcid = 0; + } + else + { + // We're still waiting for an update from another session + rv = true; + } + } + + return rv; +} + /** * * @param router_cli_ses Router client session @@ -1477,6 +1520,7 @@ enum showdb_response SchemaRouterSession::parse_mapping_response(SSRBackend& bre */ void SchemaRouterSession::query_databases() { + MXS_INFO("Mapping databases"); for (SSRBackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) { diff --git a/server/modules/routing/schemarouter/schemaroutersession.hh b/server/modules/routing/schemarouter/schemaroutersession.hh index 902b123f6..9ec3f8bf7 100644 --- a/server/modules/routing/schemarouter/schemaroutersession.hh +++ b/server/modules/routing/schemarouter/schemaroutersession.hh @@ -148,6 +148,7 @@ private: int inspect_mapping_states(SSRBackend& bref, GWBUF** wbuf); enum showdb_response parse_mapping_response(SSRBackend& bref, GWBUF** buffer); void route_queued_query(); + bool delay_routing(mxb::Worker::Call::action_t action); void synchronize_shards(); void handle_mapping_reply(SSRBackend& bref, GWBUF** pPacket); bool handle_statement(GWBUF* querybuf, SSRBackend& bref, uint8_t command, uint32_t type); @@ -170,5 +171,6 @@ private: uint64_t m_sent_sescmd; /**< The latest session command being executed */ uint64_t m_replied_sescmd;/**< The last session command reply that was sent to the client */ SERVER* m_load_target; /**< Target for LOAD DATA LOCAL INFILE */ + uint32_t m_dcid {0}; }; }