MXS-1474 Factor out functionality

More changes coming, so better to factor out the COM_QUERY handling.
This commit is contained in:
Johan Wikman
2017-10-25 14:06:54 +03:00
parent 5068d49cf7
commit 20bb825882
2 changed files with 123 additions and 101 deletions

View File

@ -244,7 +244,7 @@ int CacheFilterSession::routeQuery(GWBUF* pPacket)
reset_response_state();
m_state = CACHE_IGNORING_RESPONSE;
int rv;
int rv = 1;
switch ((int)MYSQL_GET_COMMAND(pData))
{
@ -287,106 +287,7 @@ int CacheFilterSession::routeQuery(GWBUF* pPacket)
break;
case MXS_COM_QUERY:
if (should_consult_cache(pPacket))
{
if (m_pCache->should_store(m_zDefaultDb, pPacket))
{
cache_result_t result = m_pCache->get_key(m_zDefaultDb, pPacket, &m_key);
if (CACHE_RESULT_IS_OK(result))
{
if (m_pCache->should_use(m_pSession))
{
uint32_t flags = CACHE_FLAGS_INCLUDE_STALE;
GWBUF* pResponse;
result = m_pCache->get_value(m_key, flags, &pResponse);
if (CACHE_RESULT_IS_OK(result))
{
if (CACHE_RESULT_IS_STALE(result))
{
// The value was found, but it was stale. Now we need to
// figure out whether somebody else is already fetching it.
if (m_pCache->must_refresh(m_key, this))
{
// We were the first ones who hit the stale item. It's
// our responsibility now to fetch it.
if (log_decisions())
{
MXS_NOTICE("Cache data is stale, fetching fresh from server.");
}
// As we don't use the response it must be freed.
gwbuf_free(pResponse);
m_refreshing = true;
fetch_from_server = true;
}
else
{
// Somebody is already fetching the new value. So, let's
// use the stale value. No point in hitting the server twice.
if (log_decisions())
{
MXS_NOTICE("Cache data is stale but returning it, fresh "
"data is being fetched already.");
}
fetch_from_server = false;
}
}
else
{
if (log_decisions())
{
MXS_NOTICE("Using fresh data from cache.");
}
fetch_from_server = false;
}
}
else
{
fetch_from_server = true;
}
if (fetch_from_server)
{
m_state = CACHE_EXPECTING_RESPONSE;
}
else
{
m_state = CACHE_EXPECTING_NOTHING;
gwbuf_free(pPacket);
DCB *dcb = m_pSession->client_dcb;
// TODO: This is not ok. Any filters before this filter, will not
// TODO: see this data.
rv = dcb->func.write(dcb, pResponse);
}
}
else
{
// We will not use any value in the cache, but we will update
// the existing value.
if (log_decisions())
{
MXS_NOTICE("Unconditionally fetching data from the server, "
"refreshing cache entry.");
}
m_state = CACHE_EXPECTING_RESPONSE;
}
}
else
{
MXS_ERROR("Could not create cache key.");
m_state = CACHE_IGNORING_RESPONSE;
}
}
else
{
m_state = CACHE_IGNORING_RESPONSE;
}
}
fetch_from_server = route_COM_QUERY(pPacket);
break;
default:
@ -992,3 +893,122 @@ bool CacheFilterSession::should_consult_cache(GWBUF* pPacket)
return consult_cache;
}
/**
* Routes a COM_QUERY packet.
*
* @param pPacket A contiguousCOM_QUERY packet.
*
* @return True if the data should be fetched from the backend,
* false otherwise.
*/
bool CacheFilterSession::route_COM_QUERY(GWBUF* pPacket)
{
ss_debug(uint8_t* pData = static_cast<uint8_t*>(GWBUF_DATA(pPacket)));
ss_dassert((int)MYSQL_GET_COMMAND(pData) == MXS_COM_QUERY);
bool fetch_from_server = true;
if (should_consult_cache(pPacket))
{
if (m_pCache->should_store(m_zDefaultDb, pPacket))
{
cache_result_t result = m_pCache->get_key(m_zDefaultDb, pPacket, &m_key);
if (CACHE_RESULT_IS_OK(result))
{
if (m_pCache->should_use(m_pSession))
{
uint32_t flags = CACHE_FLAGS_INCLUDE_STALE;
GWBUF* pResponse;
result = m_pCache->get_value(m_key, flags, &pResponse);
if (CACHE_RESULT_IS_OK(result))
{
if (CACHE_RESULT_IS_STALE(result))
{
// The value was found, but it was stale. Now we need to
// figure out whether somebody else is already fetching it.
if (m_pCache->must_refresh(m_key, this))
{
// We were the first ones who hit the stale item. It's
// our responsibility now to fetch it.
if (log_decisions())
{
MXS_NOTICE("Cache data is stale, fetching fresh from server.");
}
// As we don't use the response it must be freed.
gwbuf_free(pResponse);
m_refreshing = true;
fetch_from_server = true;
}
else
{
// Somebody is already fetching the new value. So, let's
// use the stale value. No point in hitting the server twice.
if (log_decisions())
{
MXS_NOTICE("Cache data is stale but returning it, fresh "
"data is being fetched already.");
}
fetch_from_server = false;
}
}
else
{
if (log_decisions())
{
MXS_NOTICE("Using fresh data from cache.");
}
fetch_from_server = false;
}
}
else
{
fetch_from_server = true;
}
if (fetch_from_server)
{
m_state = CACHE_EXPECTING_RESPONSE;
}
else
{
m_state = CACHE_EXPECTING_NOTHING;
gwbuf_free(pPacket);
DCB *dcb = m_pSession->client_dcb;
// TODO: This is not ok. Any filters before this filter, will not
// TODO: see this data.
dcb->func.write(dcb, pResponse);
}
}
else
{
// We will not use any value in the cache, but we will update
// the existing value.
if (log_decisions())
{
MXS_NOTICE("Unconditionally fetching data from the server, "
"refreshing cache entry.");
}
m_state = CACHE_EXPECTING_RESPONSE;
}
}
else
{
MXS_ERROR("Could not create cache key.");
m_state = CACHE_IGNORING_RESPONSE;
}
}
else
{
m_state = CACHE_IGNORING_RESPONSE;
}
}
return fetch_from_server;
}

View File

@ -111,6 +111,8 @@ private:
bool should_consult_cache(GWBUF* pPacket);
bool route_COM_QUERY(GWBUF* pPacket);
private:
CacheFilterSession(MXS_SESSION* pSession, Cache* pCache, char* zDefaultDb);