MXS-3892: Delay routing instead of canceling it
Putting the sessions that aren't doing the update on hold makes the new mechanism work the same way the old one did with the exception that it won't put any extra work on the database itself.
This commit is contained in:
@ -101,6 +101,11 @@ void SchemaRouterSession::close()
|
|||||||
{
|
{
|
||||||
m_closed = true;
|
m_closed = true;
|
||||||
|
|
||||||
|
if (m_dcid)
|
||||||
|
{
|
||||||
|
mxb::Worker::get_current()->cancel_delayed_call(m_dcid);
|
||||||
|
}
|
||||||
|
|
||||||
for (SSRBackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
|
for (SSRBackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
|
||||||
{
|
{
|
||||||
SSRBackend& bref = *it;
|
SSRBackend& bref = *it;
|
||||||
@ -287,7 +292,7 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket)
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (m_shard.empty())
|
if (m_shard.empty() && (m_state & INIT_MAPPING) == 0)
|
||||||
{
|
{
|
||||||
// Check if another session has managed to update the shard cache
|
// Check if another session has managed to update the shard cache
|
||||||
m_shard = m_router->m_shard_manager.get_shard(m_key, m_config->refresh_min_interval);
|
m_shard = m_router->m_shard_manager.get_shard(m_key, m_config->refresh_min_interval);
|
||||||
@ -303,10 +308,17 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket)
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
// Too many concurrent updates
|
// Too many concurrent updates
|
||||||
|
m_queue.push_back(pPacket);
|
||||||
|
|
||||||
// TODO: Queue this session instead of killing it
|
if (m_dcid == 0)
|
||||||
m_pSession->kill(modutil_create_mysql_err_msg(1, 0, 1096, "HY000", "Too many updates"));
|
{
|
||||||
return 0;
|
// Wait for the other session to finish its update
|
||||||
|
auto worker = mxs::RoutingWorker::get_current();
|
||||||
|
m_dcid = worker->delayed_call(1000, &SchemaRouterSession::delay_routing, this);
|
||||||
|
MXS_INFO("Waiting for the database mapping to be completed by another session");
|
||||||
|
}
|
||||||
|
|
||||||
|
return 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1105,6 +1117,37 @@ void SchemaRouterSession::route_queued_query()
|
|||||||
session_delay_routing(m_pSession, router_as_downstream(m_pSession), tmp, 0);
|
session_delay_routing(m_pSession, router_as_downstream(m_pSession), tmp, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool SchemaRouterSession::delay_routing(mxb::Worker::Call::action_t action)
|
||||||
|
{
|
||||||
|
bool rv = false;
|
||||||
|
|
||||||
|
if (action == mxb::Worker::Call::EXECUTE)
|
||||||
|
{
|
||||||
|
mxb_assert(m_shard.empty());
|
||||||
|
m_shard = m_router->m_shard_manager.get_shard(m_key, m_config->refresh_min_interval);
|
||||||
|
|
||||||
|
if (!m_shard.empty())
|
||||||
|
{
|
||||||
|
MXS_INFO("Another session updated the shard information, reusing the result");
|
||||||
|
route_queued_query();
|
||||||
|
m_dcid = 0;
|
||||||
|
}
|
||||||
|
else if (m_router->m_shard_manager.start_update(m_key))
|
||||||
|
{
|
||||||
|
// No other sessions are doing an update, start our own update
|
||||||
|
query_databases();
|
||||||
|
m_dcid = 0;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// We're still waiting for an update from another session
|
||||||
|
rv = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return rv;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
* @param router_cli_ses Router client session
|
* @param router_cli_ses Router client session
|
||||||
@ -1477,6 +1520,7 @@ enum showdb_response SchemaRouterSession::parse_mapping_response(SSRBackend& bre
|
|||||||
*/
|
*/
|
||||||
void SchemaRouterSession::query_databases()
|
void SchemaRouterSession::query_databases()
|
||||||
{
|
{
|
||||||
|
MXS_INFO("Mapping databases");
|
||||||
|
|
||||||
for (SSRBackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
|
for (SSRBackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
|
||||||
{
|
{
|
||||||
|
|||||||
@ -148,6 +148,7 @@ private:
|
|||||||
int inspect_mapping_states(SSRBackend& bref, GWBUF** wbuf);
|
int inspect_mapping_states(SSRBackend& bref, GWBUF** wbuf);
|
||||||
enum showdb_response parse_mapping_response(SSRBackend& bref, GWBUF** buffer);
|
enum showdb_response parse_mapping_response(SSRBackend& bref, GWBUF** buffer);
|
||||||
void route_queued_query();
|
void route_queued_query();
|
||||||
|
bool delay_routing(mxb::Worker::Call::action_t action);
|
||||||
void synchronize_shards();
|
void synchronize_shards();
|
||||||
void handle_mapping_reply(SSRBackend& bref, GWBUF** pPacket);
|
void handle_mapping_reply(SSRBackend& bref, GWBUF** pPacket);
|
||||||
bool handle_statement(GWBUF* querybuf, SSRBackend& bref, uint8_t command, uint32_t type);
|
bool handle_statement(GWBUF* querybuf, SSRBackend& bref, uint8_t command, uint32_t type);
|
||||||
@ -170,5 +171,6 @@ private:
|
|||||||
uint64_t m_sent_sescmd; /**< The latest session command being executed */
|
uint64_t m_sent_sescmd; /**< The latest session command being executed */
|
||||||
uint64_t m_replied_sescmd;/**< The last session command reply that was sent to the client */
|
uint64_t m_replied_sescmd;/**< The last session command reply that was sent to the client */
|
||||||
SERVER* m_load_target; /**< Target for LOAD DATA LOCAL INFILE */
|
SERVER* m_load_target; /**< Target for LOAD DATA LOCAL INFILE */
|
||||||
|
uint32_t m_dcid {0};
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user