MXS-1474 Refactor for forthcoming changes
This commit is contained in:
181
server/modules/filter/cache/cachefiltersession.cc
vendored
181
server/modules/filter/cache/cachefiltersession.cc
vendored
@ -897,7 +897,7 @@ bool CacheFilterSession::should_consult_cache(GWBUF* pPacket)
|
|||||||
/**
|
/**
|
||||||
* Routes a COM_QUERY packet.
|
* Routes a COM_QUERY packet.
|
||||||
*
|
*
|
||||||
* @param pPacket A contiguousCOM_QUERY packet.
|
* @param pPacket A contiguous COM_QUERY packet.
|
||||||
*
|
*
|
||||||
* @return ROUTING_ABORT if the processing of the packet should be aborted
|
* @return ROUTING_ABORT if the processing of the packet should be aborted
|
||||||
* (as the data is obtained from the cache) or
|
* (as the data is obtained from the cache) or
|
||||||
@ -918,86 +918,7 @@ CacheFilterSession::routing_action_t CacheFilterSession::route_COM_QUERY(GWBUF*
|
|||||||
|
|
||||||
if (CACHE_RESULT_IS_OK(result))
|
if (CACHE_RESULT_IS_OK(result))
|
||||||
{
|
{
|
||||||
if (m_pCache->should_use(m_pSession))
|
action = route_SELECT(pPacket);
|
||||||
{
|
|
||||||
uint32_t flags = CACHE_FLAGS_INCLUDE_STALE;
|
|
||||||
GWBUF* pResponse;
|
|
||||||
result = m_pCache->get_value(m_key, flags, &pResponse);
|
|
||||||
|
|
||||||
if (CACHE_RESULT_IS_OK(result))
|
|
||||||
{
|
|
||||||
if (CACHE_RESULT_IS_STALE(result))
|
|
||||||
{
|
|
||||||
// The value was found, but it was stale. Now we need to
|
|
||||||
// figure out whether somebody else is already fetching it.
|
|
||||||
|
|
||||||
if (m_pCache->must_refresh(m_key, this))
|
|
||||||
{
|
|
||||||
// We were the first ones who hit the stale item. It's
|
|
||||||
// our responsibility now to fetch it.
|
|
||||||
if (log_decisions())
|
|
||||||
{
|
|
||||||
MXS_NOTICE("Cache data is stale, fetching fresh from server.");
|
|
||||||
}
|
|
||||||
|
|
||||||
// As we don't use the response it must be freed.
|
|
||||||
gwbuf_free(pResponse);
|
|
||||||
|
|
||||||
m_refreshing = true;
|
|
||||||
action = ROUTING_CONTINUE;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
// Somebody is already fetching the new value. So, let's
|
|
||||||
// use the stale value. No point in hitting the server twice.
|
|
||||||
if (log_decisions())
|
|
||||||
{
|
|
||||||
MXS_NOTICE("Cache data is stale but returning it, fresh "
|
|
||||||
"data is being fetched already.");
|
|
||||||
}
|
|
||||||
action = ROUTING_ABORT;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
if (log_decisions())
|
|
||||||
{
|
|
||||||
MXS_NOTICE("Using fresh data from cache.");
|
|
||||||
}
|
|
||||||
action = ROUTING_ABORT;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
action = ROUTING_CONTINUE;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (action == ROUTING_CONTINUE)
|
|
||||||
{
|
|
||||||
m_state = CACHE_EXPECTING_RESPONSE;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
m_state = CACHE_EXPECTING_NOTHING;
|
|
||||||
gwbuf_free(pPacket);
|
|
||||||
DCB *dcb = m_pSession->client_dcb;
|
|
||||||
|
|
||||||
// TODO: This is not ok. Any filters before this filter, will not
|
|
||||||
// TODO: see this data.
|
|
||||||
dcb->func.write(dcb, pResponse);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
// We will not use any value in the cache, but we will update
|
|
||||||
// the existing value.
|
|
||||||
if (log_decisions())
|
|
||||||
{
|
|
||||||
MXS_NOTICE("Unconditionally fetching data from the server, "
|
|
||||||
"refreshing cache entry.");
|
|
||||||
}
|
|
||||||
m_state = CACHE_EXPECTING_RESPONSE;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -1013,3 +934,101 @@ CacheFilterSession::routing_action_t CacheFilterSession::route_COM_QUERY(GWBUF*
|
|||||||
|
|
||||||
return action;
|
return action;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Routes a SELECT packet.
|
||||||
|
*
|
||||||
|
* @param pPacket A contiguous COM_QUERY packet containing a SELECT.
|
||||||
|
*
|
||||||
|
* @return ROUTING_ABORT if the processing of the packet should be aborted
|
||||||
|
* (as the data is obtained from the cache) or
|
||||||
|
* ROUTING_CONTINUE if the normal processing should continue.
|
||||||
|
*/
|
||||||
|
CacheFilterSession::routing_action_t CacheFilterSession::route_SELECT(GWBUF* pPacket)
|
||||||
|
{
|
||||||
|
routing_action_t action = ROUTING_CONTINUE;
|
||||||
|
|
||||||
|
if (m_pCache->should_use(m_pSession))
|
||||||
|
{
|
||||||
|
uint32_t flags = CACHE_FLAGS_INCLUDE_STALE;
|
||||||
|
GWBUF* pResponse;
|
||||||
|
cache_result_t result = m_pCache->get_value(m_key, flags, &pResponse);
|
||||||
|
|
||||||
|
if (CACHE_RESULT_IS_OK(result))
|
||||||
|
{
|
||||||
|
if (CACHE_RESULT_IS_STALE(result))
|
||||||
|
{
|
||||||
|
// The value was found, but it was stale. Now we need to
|
||||||
|
// figure out whether somebody else is already fetching it.
|
||||||
|
|
||||||
|
if (m_pCache->must_refresh(m_key, this))
|
||||||
|
{
|
||||||
|
// We were the first ones who hit the stale item. It's
|
||||||
|
// our responsibility now to fetch it.
|
||||||
|
if (log_decisions())
|
||||||
|
{
|
||||||
|
MXS_NOTICE("Cache data is stale, fetching fresh from server.");
|
||||||
|
}
|
||||||
|
|
||||||
|
// As we don't use the response it must be freed.
|
||||||
|
gwbuf_free(pResponse);
|
||||||
|
|
||||||
|
m_refreshing = true;
|
||||||
|
action = ROUTING_CONTINUE;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// Somebody is already fetching the new value. So, let's
|
||||||
|
// use the stale value. No point in hitting the server twice.
|
||||||
|
if (log_decisions())
|
||||||
|
{
|
||||||
|
MXS_NOTICE("Cache data is stale but returning it, fresh "
|
||||||
|
"data is being fetched already.");
|
||||||
|
}
|
||||||
|
action = ROUTING_ABORT;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if (log_decisions())
|
||||||
|
{
|
||||||
|
MXS_NOTICE("Using fresh data from cache.");
|
||||||
|
}
|
||||||
|
action = ROUTING_ABORT;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
action = ROUTING_CONTINUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (action == ROUTING_CONTINUE)
|
||||||
|
{
|
||||||
|
m_state = CACHE_EXPECTING_RESPONSE;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
m_state = CACHE_EXPECTING_NOTHING;
|
||||||
|
gwbuf_free(pPacket);
|
||||||
|
DCB *dcb = m_pSession->client_dcb;
|
||||||
|
|
||||||
|
// TODO: This is not ok. Any filters before this filter, will not
|
||||||
|
// TODO: see this data.
|
||||||
|
dcb->func.write(dcb, pResponse);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// We will not use any value in the cache, but we will update
|
||||||
|
// the existing value.
|
||||||
|
if (log_decisions())
|
||||||
|
{
|
||||||
|
MXS_NOTICE("Unconditionally fetching data from the server, "
|
||||||
|
"refreshing cache entry.");
|
||||||
|
}
|
||||||
|
m_state = CACHE_EXPECTING_RESPONSE;
|
||||||
|
}
|
||||||
|
|
||||||
|
return action;
|
||||||
|
}
|
||||||
|
|||||||
@ -118,6 +118,7 @@ private:
|
|||||||
};
|
};
|
||||||
|
|
||||||
routing_action_t route_COM_QUERY(GWBUF* pPacket);
|
routing_action_t route_COM_QUERY(GWBUF* pPacket);
|
||||||
|
routing_action_t route_SELECT(GWBUF* pPacket);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
CacheFilterSession(MXS_SESSION* pSession, Cache* pCache, char* zDefaultDb);
|
CacheFilterSession(MXS_SESSION* pSession, Cache* pCache, char* zDefaultDb);
|
||||||
|
|||||||
Reference in New Issue
Block a user