Merge branch '2.2' into 2.2-mrm

This commit is contained in:
Johan Wikman
2017-10-31 16:24:10 +02:00
8 changed files with 304 additions and 130 deletions

View File

@ -138,6 +138,15 @@ static const MXS_ENUM_VALUE parameter_selects_values[] =
{NULL}
};
// Enumeration values for `cache_in_transaction`
static const MXS_ENUM_VALUE parameter_cache_in_trxs_values[] =
{
{"never", CACHE_IN_TRXS_NEVER},
{"read_only_transactions", CACHE_IN_TRXS_READ_ONLY},
{"all_transactions", CACHE_IN_TRXS_ALL},
{NULL}
};
extern "C" MXS_MODULE* MXS_CREATE_MODULE()
{
static modulecmd_arg_type_t show_argv[] =
@ -227,6 +236,13 @@ extern "C" MXS_MODULE* MXS_CREATE_MODULE()
MXS_MODULE_OPT_NONE,
parameter_selects_values
},
{
"cache_in_transactions",
MXS_MODULE_PARAM_ENUM,
CACHE_DEFAULT_CACHE_IN_TRXS,
MXS_MODULE_OPT_NONE,
parameter_cache_in_trxs_values
},
{MXS_END_MODULE_PARAMS}
}
};
@ -332,6 +348,9 @@ bool CacheFilter::process_params(char **pzOptions, MXS_CONFIG_PARAMETER *ppParam
config.selects = static_cast<cache_selects_t>(config_get_enum(ppParams,
"selects",
parameter_selects_values));
config.cache_in_trxs = static_cast<cache_in_trxs_t>(config_get_enum(ppParams,
"cache_in_transactions",
parameter_cache_in_trxs_values));
if (!config.storage)
{

View File

@ -58,6 +58,8 @@
#define CACHE_DEFAULT_SELECTS "verify_cacheable"
// Storage
#define CACHE_DEFAULT_STORAGE "storage_inmemory"
// Transaction behaviour
#define CACHE_DEFAULT_CACHE_IN_TRXS "all_transactions"
typedef enum cache_selects
{
@ -65,6 +67,14 @@ typedef enum cache_selects
CACHE_SELECTS_VERIFY_CACHEABLE,
} cache_selects_t;
typedef enum cache_in_trxs
{
// Do NOT change the order. Code relies upon NEVER < READ_ONLY < ALL.
CACHE_IN_TRXS_NEVER,
CACHE_IN_TRXS_READ_ONLY,
CACHE_IN_TRXS_ALL,
} cache_in_trxs_t;
typedef struct cache_config
{
uint64_t max_resultset_rows; /**< The maximum number of rows of a resultset for it to be cached. */
@ -81,4 +91,5 @@ typedef struct cache_config
uint32_t debug; /**< Debug settings. */
cache_thread_model_t thread_model; /**< Thread model. */
cache_selects_t selects; /**< Assume/verify that selects are cacheable. */
cache_in_trxs_t cache_in_trxs; /**< To cache or not to cache inside transactions. */
} CACHE_CONFIG;

View File

@ -239,12 +239,12 @@ int CacheFilterSession::routeQuery(GWBUF* pPacket)
ss_dassert(GWBUF_LENGTH(pPacket) >= MYSQL_HEADER_LEN + 1);
ss_dassert(MYSQL_GET_PAYLOAD_LEN(pData) + MYSQL_HEADER_LEN == GWBUF_LENGTH(pPacket));
bool fetch_from_server = true;
routing_action_t action = ROUTING_CONTINUE;
reset_response_state();
m_state = CACHE_IGNORING_RESPONSE;
int rv;
int rv = 1;
switch ((int)MYSQL_GET_COMMAND(pData))
{
@ -287,113 +287,14 @@ int CacheFilterSession::routeQuery(GWBUF* pPacket)
break;
case MXS_COM_QUERY:
if (should_consult_cache(pPacket))
{
if (m_pCache->should_store(m_zDefaultDb, pPacket))
{
cache_result_t result = m_pCache->get_key(m_zDefaultDb, pPacket, &m_key);
if (CACHE_RESULT_IS_OK(result))
{
if (m_pCache->should_use(m_pSession))
{
uint32_t flags = CACHE_FLAGS_INCLUDE_STALE;
GWBUF* pResponse;
result = m_pCache->get_value(m_key, flags, &pResponse);
if (CACHE_RESULT_IS_OK(result))
{
if (CACHE_RESULT_IS_STALE(result))
{
// The value was found, but it was stale. Now we need to
// figure out whether somebody else is already fetching it.
if (m_pCache->must_refresh(m_key, this))
{
// We were the first ones who hit the stale item. It's
// our responsibility now to fetch it.
if (log_decisions())
{
MXS_NOTICE("Cache data is stale, fetching fresh from server.");
}
// As we don't use the response it must be freed.
gwbuf_free(pResponse);
m_refreshing = true;
fetch_from_server = true;
}
else
{
// Somebody is already fetching the new value. So, let's
// use the stale value. No point in hitting the server twice.
if (log_decisions())
{
MXS_NOTICE("Cache data is stale but returning it, fresh "
"data is being fetched already.");
}
fetch_from_server = false;
}
}
else
{
if (log_decisions())
{
MXS_NOTICE("Using fresh data from cache.");
}
fetch_from_server = false;
}
}
else
{
fetch_from_server = true;
}
if (fetch_from_server)
{
m_state = CACHE_EXPECTING_RESPONSE;
}
else
{
m_state = CACHE_EXPECTING_NOTHING;
gwbuf_free(pPacket);
DCB *dcb = m_pSession->client_dcb;
// TODO: This is not ok. Any filters before this filter, will not
// TODO: see this data.
rv = dcb->func.write(dcb, pResponse);
}
}
else
{
// We will not use any value in the cache, but we will update
// the existing value.
if (log_decisions())
{
MXS_NOTICE("Unconditionally fetching data from the server, "
"refreshing cache entry.");
}
m_state = CACHE_EXPECTING_RESPONSE;
}
}
else
{
MXS_ERROR("Could not create cache key.");
m_state = CACHE_IGNORING_RESPONSE;
}
}
else
{
m_state = CACHE_IGNORING_RESPONSE;
}
}
action = route_COM_QUERY(pPacket);
break;
default:
break;
}
if (fetch_from_server)
if (action == ROUTING_CONTINUE)
{
rv = m_down.routeQuery(pPacket);
}
@ -839,15 +740,16 @@ void CacheFilterSession::store_result()
*
* @param pParam The GWBUF being handled.
*
* @return True, if the cache should be consulted, false otherwise.
* @return Enum value indicating appropriate action.
*/
bool CacheFilterSession::should_consult_cache(GWBUF* pPacket)
CacheFilterSession::cache_action_t CacheFilterSession::get_cache_action(GWBUF* pPacket)
{
bool consult_cache = false;
cache_action_t action = CACHE_IGNORE;
uint32_t type_mask = qc_get_trx_type_mask(pPacket); // Note, only trx-related type mask
const char* zReason = NULL;
const CACHE_CONFIG& config = m_pCache->config();
if (qc_query_is_type(type_mask, QUERY_TYPE_BEGIN_TRX))
{
@ -865,23 +767,55 @@ bool CacheFilterSession::should_consult_cache(GWBUF* pPacket)
{
zReason = "no transaction";
}
consult_cache = true;
action = CACHE_USE_AND_POPULATE;
}
else if (session_trx_is_read_only(m_pSession))
{
if (log_decisions())
if (config.cache_in_trxs >= CACHE_IN_TRXS_READ_ONLY)
{
zReason = "explicitly read-only transaction";
if (log_decisions())
{
zReason = "explicitly read-only transaction";
}
action = CACHE_USE_AND_POPULATE;
}
else
{
ss_dassert(config.cache_in_trxs == CACHE_IN_TRXS_NEVER);
if (log_decisions())
{
zReason = "populating but not using cache inside read-only transactions";
}
action = CACHE_POPULATE;
}
consult_cache = true;
}
else if (m_is_read_only)
{
if (log_decisions())
// There is a transaction and it is *not* explicitly read-only,
// although so far there has only been SELECTs.
if (config.cache_in_trxs >= CACHE_IN_TRXS_ALL)
{
zReason = "ordinary transaction that has so far been read-only";
if (log_decisions())
{
zReason = "ordinary transaction that has so far been read-only";
}
action = CACHE_USE_AND_POPULATE;
}
else
{
ss_dassert((config.cache_in_trxs == CACHE_IN_TRXS_NEVER) ||
(config.cache_in_trxs == CACHE_IN_TRXS_READ_ONLY));
if (log_decisions())
{
zReason =
"populating but not using cache inside transaction that is not "
"explicitly read-only, but that has used only SELECTs sofar";
}
action = CACHE_POPULATE;
}
consult_cache = true;
}
else
{
@ -891,11 +825,11 @@ bool CacheFilterSession::should_consult_cache(GWBUF* pPacket)
}
}
if (consult_cache)
if (action != CACHE_IGNORE)
{
if (is_select_statement(pPacket))
{
if (m_pCache->config().selects == CACHE_SELECTS_VERIFY_CACHEABLE)
if (config.selects == CACHE_SELECTS_VERIFY_CACHEABLE)
{
// Note that the type mask must be obtained a new. A few lines
// above we only got the transaction state related type mask.
@ -903,22 +837,22 @@ bool CacheFilterSession::should_consult_cache(GWBUF* pPacket)
if (qc_query_is_type(type_mask, QUERY_TYPE_USERVAR_READ))
{
consult_cache = false;
action = CACHE_IGNORE;
zReason = "user variables are read";
}
else if (qc_query_is_type(type_mask, QUERY_TYPE_SYSVAR_READ))
{
consult_cache = false;
action = CACHE_IGNORE;
zReason = "system variables are read";
}
else if (uses_non_cacheable_function(pPacket))
{
consult_cache = false;
action = CACHE_IGNORE;
zReason = "uses non-cacheable function";
}
else if (uses_non_cacheable_variable(pPacket))
{
consult_cache = false;
action = CACHE_IGNORE;
zReason = "uses non-cacheable variable";
}
}
@ -927,9 +861,11 @@ bool CacheFilterSession::should_consult_cache(GWBUF* pPacket)
{
// A bit broad, as e.g. SHOW will cause the read only state to be turned
// off. However, during normal use this will always be an UPDATE, INSERT
// or DELETE.
// or DELETE. Note that 'm_is_read_only' only affects transactions that
// are not explicitly read-only.
m_is_read_only = false;
consult_cache = false;
action = CACHE_IGNORE;
zReason = "statement is not SELECT";
}
}
@ -955,11 +891,154 @@ bool CacheFilterSession::should_consult_cache(GWBUF* pPacket)
length = max_length - 3; // strlen("...");
}
const char* zDecision = (consult_cache ? "CONSULT" : "IGNORE ");
const char* zDecision = (action == CACHE_IGNORE) ? "IGNORE" : "CONSULT";
ss_dassert(zReason);
MXS_NOTICE(zFormat, zDecision, length, pSql, zReason);
}
return consult_cache;
return action;
}
/**
* Routes a COM_QUERY packet.
*
* @param pPacket A contiguous COM_QUERY packet.
*
* @return ROUTING_ABORT if the processing of the packet should be aborted
* (as the data is obtained from the cache) or
* ROUTING_CONTINUE if the normal processing should continue.
*/
CacheFilterSession::routing_action_t CacheFilterSession::route_COM_QUERY(GWBUF* pPacket)
{
ss_debug(uint8_t* pData = static_cast<uint8_t*>(GWBUF_DATA(pPacket)));
ss_dassert((int)MYSQL_GET_COMMAND(pData) == MXS_COM_QUERY);
routing_action_t routing_action = ROUTING_CONTINUE;
cache_action_t cache_action = get_cache_action(pPacket);
if (cache_action != CACHE_IGNORE)
{
if (m_pCache->should_store(m_zDefaultDb, pPacket))
{
cache_result_t result = m_pCache->get_key(m_zDefaultDb, pPacket, &m_key);
if (CACHE_RESULT_IS_OK(result))
{
routing_action = route_SELECT(cache_action, pPacket);
}
else
{
MXS_ERROR("Could not create cache key.");
m_state = CACHE_IGNORING_RESPONSE;
}
}
else
{
m_state = CACHE_IGNORING_RESPONSE;
}
}
return routing_action;
}
/**
* Routes a SELECT packet.
*
* @param cache_action The desired action.
* @param pPacket A contiguous COM_QUERY packet containing a SELECT.
*
* @return ROUTING_ABORT if the processing of the packet should be aborted
* (as the data is obtained from the cache) or
* ROUTING_CONTINUE if the normal processing should continue.
*/
CacheFilterSession::routing_action_t CacheFilterSession::route_SELECT(cache_action_t cache_action, GWBUF* pPacket)
{
routing_action_t routing_action = ROUTING_CONTINUE;
if (should_use(cache_action) && m_pCache->should_use(m_pSession))
{
uint32_t flags = CACHE_FLAGS_INCLUDE_STALE;
GWBUF* pResponse;
cache_result_t result = m_pCache->get_value(m_key, flags, &pResponse);
if (CACHE_RESULT_IS_OK(result))
{
if (CACHE_RESULT_IS_STALE(result))
{
// The value was found, but it was stale. Now we need to
// figure out whether somebody else is already fetching it.
if (m_pCache->must_refresh(m_key, this))
{
// We were the first ones who hit the stale item. It's
// our responsibility now to fetch it.
if (log_decisions())
{
MXS_NOTICE("Cache data is stale, fetching fresh from server.");
}
// As we don't use the response it must be freed.
gwbuf_free(pResponse);
m_refreshing = true;
routing_action = ROUTING_CONTINUE;
}
else
{
// Somebody is already fetching the new value. So, let's
// use the stale value. No point in hitting the server twice.
if (log_decisions())
{
MXS_NOTICE("Cache data is stale but returning it, fresh "
"data is being fetched already.");
}
routing_action = ROUTING_ABORT;
}
}
else
{
if (log_decisions())
{
MXS_NOTICE("Using fresh data from cache.");
}
routing_action = ROUTING_ABORT;
}
}
else
{
routing_action = ROUTING_CONTINUE;
}
if (routing_action == ROUTING_CONTINUE)
{
m_state = CACHE_EXPECTING_RESPONSE;
}
else
{
m_state = CACHE_EXPECTING_NOTHING;
gwbuf_free(pPacket);
DCB *dcb = m_pSession->client_dcb;
// TODO: This is not ok. Any filters before this filter, will not
// TODO: see this data.
dcb->func.write(dcb, pResponse);
}
}
else
{
ss_dassert(should_populate(cache_action));
// We will not use any value in the cache, but we will update
// the existing value.
if (log_decisions())
{
MXS_NOTICE("Unconditionally fetching data from the server, "
"refreshing cache entry.");
}
m_state = CACHE_EXPECTING_RESPONSE;
}
return routing_action;
}

View File

@ -109,7 +109,34 @@ private:
void store_result();
bool should_consult_cache(GWBUF* pPacket);
enum cache_action_t
{
CACHE_IGNORE = 0,
CACHE_USE = 1,
CACHE_POPULATE = 2,
CACHE_USE_AND_POPULATE = (CACHE_USE | CACHE_POPULATE)
};
static bool should_use(cache_action_t action)
{
return action & CACHE_USE ? true : false;
}
static bool should_populate(cache_action_t action)
{
return action & CACHE_POPULATE ? true : false;
}
cache_action_t get_cache_action(GWBUF* pPacket);
enum routing_action_t
{
ROUTING_ABORT, /**< Abort normal routing activity, data is coming from cache. */
ROUTING_CONTINUE, /**< Continue normal routing activity. */
};
routing_action_t route_COM_QUERY(GWBUF* pPacket);
routing_action_t route_SELECT(cache_action_t action, GWBUF* pPacket);
private:
CacheFilterSession(MXS_SESSION* pSession, Cache* pCache, char* zDefaultDb);