Add SessionCache
SessionCache abstracts the cache for a specific session. It then uses the actual Cache instance for storing data. It is not used yet.
This commit is contained in:
parent
10deb9f07b
commit
b5022e1deb
2
server/modules/filter/cache/CMakeLists.txt
vendored
2
server/modules/filter/cache/CMakeLists.txt
vendored
@ -1,5 +1,5 @@
|
||||
if (JANSSON_FOUND)
|
||||
add_library(cache SHARED cachefilter.cc rules.cc storage.cc storagefactory.cc)
|
||||
add_library(cache SHARED cachefilter.cc rules.cc sessioncache.cc storage.cc storagefactory.cc)
|
||||
target_link_libraries(cache maxscale-common jansson)
|
||||
set_target_properties(cache PROPERTIES VERSION "1.0.0")
|
||||
set_target_properties(cache PROPERTIES LINK_FLAGS -Wl,-z,defs)
|
||||
|
26
server/modules/filter/cache/cachefilter.cc
vendored
26
server/modules/filter/cache/cachefilter.cc
vendored
@ -94,19 +94,6 @@ extern "C" FILTER_OBJECT *GetModuleObject()
|
||||
// Implementation
|
||||
//
|
||||
|
||||
typedef struct cache_config
|
||||
{
|
||||
uint32_t max_resultset_rows;
|
||||
uint32_t max_resultset_size;
|
||||
const char *rules;
|
||||
const char *storage;
|
||||
char *storage_options;
|
||||
char **storage_argv;
|
||||
int storage_argc;
|
||||
uint32_t ttl;
|
||||
uint32_t debug;
|
||||
} CACHE_CONFIG;
|
||||
|
||||
static const CACHE_CONFIG DEFAULT_CONFIG =
|
||||
{
|
||||
CACHE_DEFAULT_MAX_RESULTSET_ROWS,
|
||||
@ -120,17 +107,6 @@ static const CACHE_CONFIG DEFAULT_CONFIG =
|
||||
CACHE_DEFAULT_DEBUG
|
||||
};
|
||||
|
||||
typedef struct cache_instance
|
||||
{
|
||||
const char *name; // The name of the instance; the section name in the config.
|
||||
CACHE_CONFIG config; // The configuration of the cache instance.
|
||||
CACHE_RULES *rules; // The rules of the cache instance.
|
||||
StorageFactory *factory; // The storage factory.
|
||||
Storage *storage; // The storage instance to use.
|
||||
HASHTABLE *pending; // Pending items; being fetched from the backend.
|
||||
SPINLOCK pending_lock; // Lock used for protecting 'pending'.
|
||||
} CACHE_INSTANCE;
|
||||
|
||||
typedef enum cache_session_state
|
||||
{
|
||||
CACHE_EXPECTING_RESPONSE, // A select has been sent, and we are waiting for the response.
|
||||
@ -190,7 +166,7 @@ static void store_result(CACHE_SESSION_DATA *csdata);
|
||||
*
|
||||
* @returns Corresponding integer hash.
|
||||
*/
|
||||
static int hash_of_key(const void* key)
|
||||
int hash_of_key(const void* key)
|
||||
{
|
||||
int hash = 0;
|
||||
|
||||
|
31
server/modules/filter/cache/cachefilter.h
vendored
31
server/modules/filter/cache/cachefilter.h
vendored
@ -16,8 +16,11 @@
|
||||
|
||||
#include <maxscale/cdefs.h>
|
||||
#include <limits.h>
|
||||
#include <maxscale/hashtable.h>
|
||||
#include "rules.h"
|
||||
|
||||
MXS_BEGIN_DECLS
|
||||
class Storage;
|
||||
class StorageFactory;
|
||||
|
||||
#define CACHE_DEBUG_NONE 0 /* 0b00000 */
|
||||
#define CACHE_DEBUG_MATCHING 1 /* 0b00001 */
|
||||
@ -40,6 +43,30 @@ MXS_BEGIN_DECLS
|
||||
// Integer value
|
||||
#define CACHE_DEFAULT_DEBUG 0
|
||||
|
||||
MXS_END_DECLS
|
||||
typedef struct cache_config
|
||||
{
|
||||
uint32_t max_resultset_rows;
|
||||
uint32_t max_resultset_size;
|
||||
const char *rules;
|
||||
const char *storage;
|
||||
char *storage_options;
|
||||
char **storage_argv;
|
||||
int storage_argc;
|
||||
uint32_t ttl;
|
||||
uint32_t debug;
|
||||
} CACHE_CONFIG;
|
||||
|
||||
typedef struct cache_instance
|
||||
{
|
||||
const char *name; // The name of the instance; the section name in the config.
|
||||
CACHE_CONFIG config; // The configuration of the cache instance.
|
||||
CACHE_RULES *rules; // The rules of the cache instance.
|
||||
StorageFactory *factory; // The storage factory.
|
||||
Storage *storage; // The storage instance to use.
|
||||
HASHTABLE *pending; // Pending items; being fetched from the backend.
|
||||
SPINLOCK pending_lock; // Lock used for protecting 'pending'.
|
||||
} CACHE_INSTANCE;
|
||||
|
||||
int hash_of_key(const void* key);
|
||||
|
||||
#endif
|
||||
|
684
server/modules/filter/cache/sessioncache.cc
vendored
Normal file
684
server/modules/filter/cache/sessioncache.cc
vendored
Normal file
@ -0,0 +1,684 @@
|
||||
/*
|
||||
* Copyright (c) 2016 MariaDB Corporation Ab
|
||||
*
|
||||
* Use of this software is governed by the Business Source License included
|
||||
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
|
||||
*
|
||||
* Change Date: 2019-07-01
|
||||
*
|
||||
* On the date above, in accordance with the Business Source License, use
|
||||
* of this software will be governed by version 2 or later of the General
|
||||
* Public License.
|
||||
*/
|
||||
|
||||
#include "sessioncache.h"
|
||||
#include <new>
|
||||
#include <maxscale/alloc.h>
|
||||
#include <maxscale/query_classifier.h>
|
||||
#include <maxscale/mysql_utils.h>
|
||||
#include "storage.h"
|
||||
|
||||
#define DUMMY_VALUE (void*)0xdeadbeef
|
||||
|
||||
SessionCache::SessionCache(CACHE_INSTANCE* pInstance, SESSION* pSession, char* zDefaultDb)
|
||||
: m_state(CACHE_EXPECTING_NOTHING)
|
||||
, m_pInstance(pInstance)
|
||||
, m_pSession(pSession)
|
||||
, m_pStorage(pInstance->storage)
|
||||
, m_zDefaultDb(zDefaultDb)
|
||||
, m_zUseDb(NULL)
|
||||
, m_refreshing(false)
|
||||
{
|
||||
memset(&m_down, 0, sizeof(m_down));
|
||||
memset(&m_up, 0, sizeof(m_up));
|
||||
memset(m_key, 0, CACHE_KEY_MAXLEN);
|
||||
|
||||
reset_response_state();
|
||||
}
|
||||
|
||||
SessionCache::~SessionCache()
|
||||
{
|
||||
MXS_FREE(m_zUseDb);
|
||||
MXS_FREE(m_zDefaultDb);
|
||||
}
|
||||
|
||||
//static
|
||||
SessionCache* SessionCache::Create(CACHE_INSTANCE* pInstance, SESSION* pSession)
|
||||
{
|
||||
SessionCache* pSessionCache = NULL;
|
||||
|
||||
ss_dassert(pSession->client_dcb);
|
||||
ss_dassert(pSession->client_dcb->data);
|
||||
|
||||
MYSQL_session *pMysqlSession = (MYSQL_session*)pSession->client_dcb->data;
|
||||
char* zDefaultDb = NULL;
|
||||
|
||||
if (pMysqlSession->db[0] != 0)
|
||||
{
|
||||
zDefaultDb = MXS_STRDUP(pMysqlSession->db);
|
||||
}
|
||||
|
||||
if ((pMysqlSession->db[0] == 0) || zDefaultDb)
|
||||
{
|
||||
pSessionCache = new (std::nothrow) SessionCache(pInstance, pSession, zDefaultDb);
|
||||
|
||||
if (!pSessionCache)
|
||||
{
|
||||
MXS_FREE(zDefaultDb);
|
||||
}
|
||||
}
|
||||
|
||||
return pSessionCache;
|
||||
}
|
||||
|
||||
void SessionCache::close()
|
||||
{
|
||||
}
|
||||
|
||||
void SessionCache::setDownstream(DOWNSTREAM* pDown)
|
||||
{
|
||||
m_down = *pDown;
|
||||
}
|
||||
|
||||
void SessionCache::setUpstream(UPSTREAM* pUp)
|
||||
{
|
||||
m_up = *pUp;
|
||||
}
|
||||
|
||||
int SessionCache::routeQuery(GWBUF* pPacket)
|
||||
{
|
||||
uint8_t* pData = static_cast<uint8_t*>(GWBUF_DATA(pPacket));
|
||||
|
||||
// All of these should be guaranteed by RCAP_TYPE_TRANSACTION_TRACKING
|
||||
ss_dassert(GWBUF_IS_CONTIGUOUS(pPacket));
|
||||
ss_dassert(GWBUF_LENGTH(pPacket) >= MYSQL_HEADER_LEN + 1);
|
||||
ss_dassert(MYSQL_GET_PACKET_LEN(pData) + MYSQL_HEADER_LEN == GWBUF_LENGTH(pPacket));
|
||||
|
||||
bool fetch_from_server = true;
|
||||
|
||||
reset_response_state();
|
||||
m_state = CACHE_IGNORING_RESPONSE;
|
||||
|
||||
int rv;
|
||||
|
||||
switch ((int)MYSQL_GET_COMMAND(pData))
|
||||
{
|
||||
case MYSQL_COM_INIT_DB:
|
||||
{
|
||||
ss_dassert(!m_zUseDb);
|
||||
size_t len = MYSQL_GET_PACKET_LEN(pData) - 1; // Remove the command byte.
|
||||
m_zUseDb = (char*)MXS_MALLOC(len + 1);
|
||||
|
||||
if (m_zUseDb)
|
||||
{
|
||||
memcpy(m_zUseDb, (char*)(pData + MYSQL_HEADER_LEN + 1), len);
|
||||
m_zUseDb[len] = 0;
|
||||
m_state = CACHE_EXPECTING_USE_RESPONSE;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Memory allocation failed. We need to remove the default database to
|
||||
// prevent incorrect cache entries, since we won't know what the
|
||||
// default db is. But we only need to do that if "USE <db>" really
|
||||
// succeeds. The right thing will happen by itself in
|
||||
// handle_expecting_use_response(); if OK is returned, default_db will
|
||||
// become NULL, if ERR, default_db will not be changed.
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
case MYSQL_COM_QUERY:
|
||||
{
|
||||
// We do not care whether the query was fully parsed or not.
|
||||
// If a query cannot be fully parsed, the worst thing that can
|
||||
// happen is that caching is not used, even though it would be
|
||||
// possible.
|
||||
if (qc_get_operation(pPacket) == QUERY_OP_SELECT)
|
||||
{
|
||||
SESSION *session = m_pSession;
|
||||
|
||||
if ((session_is_autocommit(session) && !session_trx_is_active(session)) ||
|
||||
session_trx_is_read_only(session))
|
||||
{
|
||||
if (cache_rules_should_store(m_pInstance->rules, m_zDefaultDb, pPacket))
|
||||
{
|
||||
if (cache_rules_should_use(m_pInstance->rules, m_pSession))
|
||||
{
|
||||
GWBUF* pResponse;
|
||||
cache_result_t result = get_cached_response(pPacket, &pResponse);
|
||||
|
||||
switch (result)
|
||||
{
|
||||
case CACHE_RESULT_STALE:
|
||||
{
|
||||
// The value was found, but it was stale. Now we need to
|
||||
// figure out whether somebody else is already fetching it.
|
||||
|
||||
long key = hash_of_key(m_key);
|
||||
|
||||
spinlock_acquire(&m_pInstance->pending_lock);
|
||||
// TODO: Remove the internal locking of hashtable. The internal
|
||||
// TODO: locking is no good if you need transactional behaviour.
|
||||
// TODO: Now we lock twice.
|
||||
void *value = hashtable_fetch(m_pInstance->pending, (void*)key);
|
||||
if (!value)
|
||||
{
|
||||
// It's not being fetched, so we make a note that we are.
|
||||
hashtable_add(m_pInstance->pending, (void*)key, DUMMY_VALUE);
|
||||
}
|
||||
spinlock_release(&m_pInstance->pending_lock);
|
||||
|
||||
if (!value)
|
||||
{
|
||||
// We were the first ones who hit the stale item. It's
|
||||
// our responsibility now to fetch it.
|
||||
if (log_decisions())
|
||||
{
|
||||
MXS_NOTICE("Cache data is stale, fetching fresh from server.");
|
||||
}
|
||||
m_refreshing = true;
|
||||
fetch_from_server = true;
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Somebody is already fetching the new value. So, let's
|
||||
// use the stale value. No point in hitting the server twice.
|
||||
if (log_decisions())
|
||||
{
|
||||
MXS_NOTICE("Cache data is stale but returning it, fresh "
|
||||
"data is being fetched already.");
|
||||
}
|
||||
fetch_from_server = false;
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
case CACHE_RESULT_OK:
|
||||
if (log_decisions())
|
||||
{
|
||||
MXS_NOTICE("Using fresh data from cache.");
|
||||
}
|
||||
fetch_from_server = false;
|
||||
break;
|
||||
|
||||
default:
|
||||
fetch_from_server = true;
|
||||
}
|
||||
|
||||
if (fetch_from_server)
|
||||
{
|
||||
m_state = CACHE_EXPECTING_RESPONSE;
|
||||
}
|
||||
else
|
||||
{
|
||||
m_state = CACHE_EXPECTING_NOTHING;
|
||||
gwbuf_free(pPacket);
|
||||
DCB *dcb = m_pSession->client_dcb;
|
||||
|
||||
// TODO: This is not ok. Any filters before this filter, will not
|
||||
// TODO: see this data.
|
||||
rv = dcb->func.write(dcb, pResponse);
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
m_state = CACHE_IGNORING_RESPONSE;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (m_pInstance->config.debug & CACHE_DEBUG_DECISIONS)
|
||||
{
|
||||
MXS_NOTICE("autocommit = %s and transaction state %s => Not using or "
|
||||
"storing to cache.",
|
||||
session_is_autocommit(m_pSession) ? "ON" : "OFF",
|
||||
session_trx_state_to_string(session_get_trx_state(m_pSession)));
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (fetch_from_server)
|
||||
{
|
||||
rv = m_down.routeQuery(m_down.instance, m_down.session, pPacket);
|
||||
}
|
||||
|
||||
return rv;
|
||||
}
|
||||
|
||||
int SessionCache::clientReply(GWBUF* pData)
|
||||
{
|
||||
int rv;
|
||||
|
||||
if (m_res.pData)
|
||||
{
|
||||
gwbuf_append(m_res.pData, pData);
|
||||
}
|
||||
else
|
||||
{
|
||||
m_res.pData = pData;
|
||||
}
|
||||
|
||||
if (m_state != CACHE_IGNORING_RESPONSE)
|
||||
{
|
||||
if (gwbuf_length(m_res.pData) > m_pInstance->config.max_resultset_size)
|
||||
{
|
||||
if (m_pInstance->config.debug & CACHE_DEBUG_DECISIONS)
|
||||
{
|
||||
MXS_NOTICE("Current size %uB of resultset, at least as much "
|
||||
"as maximum allowed size %uKiB. Not caching.",
|
||||
gwbuf_length(m_res.pData),
|
||||
m_pInstance->config.max_resultset_size / 1024);
|
||||
}
|
||||
|
||||
m_state = CACHE_IGNORING_RESPONSE;
|
||||
}
|
||||
}
|
||||
|
||||
switch (m_state)
|
||||
{
|
||||
case CACHE_EXPECTING_FIELDS:
|
||||
rv = handle_expecting_fields();
|
||||
break;
|
||||
|
||||
case CACHE_EXPECTING_NOTHING:
|
||||
rv = handle_expecting_nothing();
|
||||
break;
|
||||
|
||||
case CACHE_EXPECTING_RESPONSE:
|
||||
rv = handle_expecting_response();
|
||||
break;
|
||||
|
||||
case CACHE_EXPECTING_ROWS:
|
||||
rv = handle_expecting_rows();
|
||||
break;
|
||||
|
||||
case CACHE_EXPECTING_USE_RESPONSE:
|
||||
rv = handle_expecting_use_response();
|
||||
break;
|
||||
|
||||
case CACHE_IGNORING_RESPONSE:
|
||||
rv = handle_ignoring_response();
|
||||
break;
|
||||
|
||||
default:
|
||||
MXS_ERROR("Internal cache logic broken, unexpected state: %d", m_state);
|
||||
ss_dassert(!true);
|
||||
rv = send_upstream();
|
||||
reset_response_state();
|
||||
m_state = CACHE_IGNORING_RESPONSE;
|
||||
}
|
||||
|
||||
return rv;
|
||||
}
|
||||
|
||||
void SessionCache::diagnostics(DCB* pDcb)
|
||||
{
|
||||
dcb_printf(pDcb, "Hello World from Cache!\n");
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when resultset field information is handled.
|
||||
*/
|
||||
int SessionCache::handle_expecting_fields()
|
||||
{
|
||||
ss_dassert(m_state == CACHE_EXPECTING_FIELDS);
|
||||
ss_dassert(m_res.pData);
|
||||
|
||||
int rv = 1;
|
||||
|
||||
bool insufficient = false;
|
||||
|
||||
size_t buflen = gwbuf_length(m_res.pData);
|
||||
|
||||
while (!insufficient && (buflen - m_res.offset >= MYSQL_HEADER_LEN))
|
||||
{
|
||||
uint8_t header[MYSQL_HEADER_LEN + 1];
|
||||
gwbuf_copy_data(m_res.pData, m_res.offset, MYSQL_HEADER_LEN + 1, header);
|
||||
|
||||
size_t packetlen = MYSQL_HEADER_LEN + MYSQL_GET_PACKET_LEN(header);
|
||||
|
||||
if (m_res.offset + packetlen <= buflen)
|
||||
{
|
||||
// We have at least one complete packet.
|
||||
int command = (int)MYSQL_GET_COMMAND(header);
|
||||
|
||||
switch (command)
|
||||
{
|
||||
case 0xfe: // EOF, the one after the fields.
|
||||
m_res.offset += packetlen;
|
||||
m_state = CACHE_EXPECTING_ROWS;
|
||||
rv = handle_expecting_rows();
|
||||
break;
|
||||
|
||||
default: // Field information.
|
||||
m_res.offset += packetlen;
|
||||
++m_res.nFields;
|
||||
ss_dassert(m_res.nFields <= m_res.nTotalFields);
|
||||
break;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// We need more data
|
||||
insufficient = true;
|
||||
}
|
||||
}
|
||||
|
||||
return rv;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when data is received (even if nothing is expected) from the server.
|
||||
*/
|
||||
int SessionCache::handle_expecting_nothing()
|
||||
{
|
||||
ss_dassert(m_state == CACHE_EXPECTING_NOTHING);
|
||||
ss_dassert(m_res.pData);
|
||||
MXS_ERROR("Received data from the backend althoug we were expecting nothing.");
|
||||
ss_dassert(!true);
|
||||
|
||||
return send_upstream();
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when a response is received from the server.
|
||||
*/
|
||||
int SessionCache::handle_expecting_response()
|
||||
{
|
||||
ss_dassert(m_state == CACHE_EXPECTING_RESPONSE);
|
||||
ss_dassert(m_res.pData);
|
||||
|
||||
int rv = 1;
|
||||
|
||||
size_t buflen = gwbuf_length(m_res.pData);
|
||||
|
||||
if (buflen >= MYSQL_HEADER_LEN + 1) // We need the command byte.
|
||||
{
|
||||
// Reserve enough space to accomodate for the largest length encoded integer,
|
||||
// which is type field + 8 bytes.
|
||||
uint8_t header[MYSQL_HEADER_LEN + 1 + 8];
|
||||
gwbuf_copy_data(m_res.pData, 0, MYSQL_HEADER_LEN + 1, header);
|
||||
|
||||
switch ((int)MYSQL_GET_COMMAND(header))
|
||||
{
|
||||
case 0x00: // OK
|
||||
case 0xff: // ERR
|
||||
store_result();
|
||||
|
||||
rv = send_upstream();
|
||||
m_state = CACHE_IGNORING_RESPONSE;
|
||||
break;
|
||||
|
||||
case 0xfb: // GET_MORE_CLIENT_DATA/SEND_MORE_CLIENT_DATA
|
||||
rv = send_upstream();
|
||||
m_state = CACHE_IGNORING_RESPONSE;
|
||||
break;
|
||||
|
||||
default:
|
||||
if (m_res.nTotalFields != 0)
|
||||
{
|
||||
// We've seen the header and have figured out how many fields there are.
|
||||
m_state = CACHE_EXPECTING_FIELDS;
|
||||
rv = handle_expecting_fields();
|
||||
}
|
||||
else
|
||||
{
|
||||
// leint_bytes() returns the length of the int type field + the size of the
|
||||
// integer.
|
||||
size_t n_bytes = leint_bytes(&header[4]);
|
||||
|
||||
if (MYSQL_HEADER_LEN + n_bytes <= buflen)
|
||||
{
|
||||
// Now we can figure out how many fields there are, but first we
|
||||
// need to copy some more data.
|
||||
gwbuf_copy_data(m_res.pData,
|
||||
MYSQL_HEADER_LEN + 1, n_bytes - 1, &header[MYSQL_HEADER_LEN + 1]);
|
||||
|
||||
m_res.nTotalFields = leint_value(&header[4]);
|
||||
m_res.offset = MYSQL_HEADER_LEN + n_bytes;
|
||||
|
||||
m_state = CACHE_EXPECTING_FIELDS;
|
||||
rv = handle_expecting_fields();
|
||||
}
|
||||
else
|
||||
{
|
||||
// We need more data. We will be called again, when data is available.
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return rv;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when resultset rows are handled.
|
||||
*/
|
||||
int SessionCache::handle_expecting_rows()
|
||||
{
|
||||
ss_dassert(m_state == CACHE_EXPECTING_ROWS);
|
||||
ss_dassert(m_res.pData);
|
||||
|
||||
int rv = 1;
|
||||
|
||||
bool insufficient = false;
|
||||
|
||||
size_t buflen = gwbuf_length(m_res.pData);
|
||||
|
||||
while (!insufficient && (buflen - m_res.offset >= MYSQL_HEADER_LEN))
|
||||
{
|
||||
uint8_t header[MYSQL_HEADER_LEN + 1];
|
||||
gwbuf_copy_data(m_res.pData, m_res.offset, MYSQL_HEADER_LEN + 1, header);
|
||||
|
||||
size_t packetlen = MYSQL_HEADER_LEN + MYSQL_GET_PACKET_LEN(header);
|
||||
|
||||
if (m_res.offset + packetlen <= buflen)
|
||||
{
|
||||
// We have at least one complete packet.
|
||||
int command = (int)MYSQL_GET_COMMAND(header);
|
||||
|
||||
switch (command)
|
||||
{
|
||||
case 0xfe: // EOF, the one after the rows.
|
||||
m_res.offset += packetlen;
|
||||
ss_dassert(m_res.offset == buflen);
|
||||
|
||||
store_result();
|
||||
|
||||
rv = send_upstream();
|
||||
m_state = CACHE_EXPECTING_NOTHING;
|
||||
break;
|
||||
|
||||
case 0xfb: // NULL
|
||||
default: // length-encoded-string
|
||||
m_res.offset += packetlen;
|
||||
++m_res.nRows;
|
||||
|
||||
if (m_res.nRows > m_pInstance->config.max_resultset_rows)
|
||||
{
|
||||
if (m_pInstance->config.debug & CACHE_DEBUG_DECISIONS)
|
||||
{
|
||||
MXS_NOTICE("Max rows %lu reached, not caching result.", m_res.nRows);
|
||||
}
|
||||
rv = send_upstream();
|
||||
m_res.offset = buflen; // To abort the loop.
|
||||
m_state = CACHE_IGNORING_RESPONSE;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// We need more data
|
||||
insufficient = true;
|
||||
}
|
||||
}
|
||||
|
||||
return rv;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when a response to a "USE db" is received from the server.
|
||||
*/
|
||||
int SessionCache::handle_expecting_use_response()
|
||||
{
|
||||
ss_dassert(m_state == CACHE_EXPECTING_USE_RESPONSE);
|
||||
ss_dassert(m_res.pData);
|
||||
|
||||
int rv = 1;
|
||||
|
||||
size_t buflen = gwbuf_length(m_res.pData);
|
||||
|
||||
if (buflen >= MYSQL_HEADER_LEN + 1) // We need the command byte.
|
||||
{
|
||||
uint8_t command;
|
||||
|
||||
gwbuf_copy_data(m_res.pData, MYSQL_HEADER_LEN, 1, &command);
|
||||
|
||||
switch (command)
|
||||
{
|
||||
case 0x00: // OK
|
||||
// In case m_zUseDb could not be allocated in routeQuery(), we will
|
||||
// in fact reset the default db here. That's ok as it will prevent broken
|
||||
// entries in the cache.
|
||||
MXS_FREE(m_zDefaultDb);
|
||||
m_zDefaultDb = m_zUseDb;
|
||||
m_zUseDb = NULL;
|
||||
break;
|
||||
|
||||
case 0xff: // ERR
|
||||
MXS_FREE(m_zUseDb);
|
||||
m_zUseDb = NULL;
|
||||
break;
|
||||
|
||||
default:
|
||||
MXS_ERROR("\"USE %s\" received unexpected server response %d.",
|
||||
m_zUseDb ? m_zUseDb : "<db>", command);
|
||||
MXS_FREE(m_zDefaultDb);
|
||||
MXS_FREE(m_zUseDb);
|
||||
m_zDefaultDb = NULL;
|
||||
m_zUseDb = NULL;
|
||||
}
|
||||
|
||||
rv = send_upstream();
|
||||
m_state = CACHE_IGNORING_RESPONSE;
|
||||
}
|
||||
|
||||
return rv;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when all data from the server is ignored.
|
||||
*/
|
||||
int SessionCache::handle_ignoring_response()
|
||||
{
|
||||
ss_dassert(m_state == CACHE_IGNORING_RESPONSE);
|
||||
ss_dassert(m_res.pData);
|
||||
|
||||
return send_upstream();
|
||||
}
|
||||
|
||||
/**
|
||||
* Send data upstream.
|
||||
*
|
||||
* @return Whatever the upstream returns.
|
||||
*/
|
||||
int SessionCache::send_upstream()
|
||||
{
|
||||
ss_dassert(m_res.pData != NULL);
|
||||
|
||||
int rv = m_up.clientReply(m_up.instance, m_up.session, m_res.pData);
|
||||
m_res.pData = NULL;
|
||||
|
||||
return rv;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reset cache response state
|
||||
*/
|
||||
void SessionCache::reset_response_state()
|
||||
{
|
||||
m_res.pData = NULL;
|
||||
m_res.nTotalFields = 0;
|
||||
m_res.nFields = 0;
|
||||
m_res.nRows = 0;
|
||||
m_res.offset = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Route a query via the cache.
|
||||
*
|
||||
* @param key A SELECT packet.
|
||||
* @param value The result.
|
||||
* @return True if the query was satisfied from the query.
|
||||
*/
|
||||
cache_result_t SessionCache::get_cached_response(const GWBUF *pQuery, GWBUF **ppResponse)
|
||||
{
|
||||
cache_result_t result = m_pStorage->getKey(m_zDefaultDb, pQuery, m_key);
|
||||
|
||||
if (result == CACHE_RESULT_OK)
|
||||
{
|
||||
uint32_t flags = CACHE_FLAGS_INCLUDE_STALE;
|
||||
|
||||
result = m_pStorage->getValue(m_key, flags, ppResponse);
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("Could not create cache key.");
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Store the data.
|
||||
*
|
||||
* @param csdata Session data
|
||||
*/
|
||||
void SessionCache::store_result()
|
||||
{
|
||||
ss_dassert(m_res.pData);
|
||||
|
||||
GWBUF *pData = gwbuf_make_contiguous(m_res.pData);
|
||||
|
||||
if (pData)
|
||||
{
|
||||
m_res.pData = pData;
|
||||
|
||||
cache_result_t result = m_pStorage->putValue(m_key, m_res.pData);
|
||||
|
||||
if (result != CACHE_RESULT_OK)
|
||||
{
|
||||
MXS_ERROR("Could not store cache item, deleting it.");
|
||||
|
||||
result = m_pStorage->delValue(m_key);
|
||||
|
||||
if ((result != CACHE_RESULT_OK) || (result != CACHE_RESULT_NOT_FOUND))
|
||||
{
|
||||
MXS_ERROR("Could not delete cache item.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (m_refreshing)
|
||||
{
|
||||
long key = hash_of_key(m_key);
|
||||
|
||||
spinlock_acquire(&m_pInstance->pending_lock);
|
||||
ss_dassert(hashtable_fetch(m_pInstance->pending, (void*)key) == DUMMY_VALUE);
|
||||
ss_debug(int n =) hashtable_delete(m_pInstance->pending, (void*)key);
|
||||
ss_dassert(n == 1);
|
||||
spinlock_release(&m_pInstance->pending_lock);
|
||||
|
||||
m_refreshing = false;
|
||||
}
|
||||
}
|
140
server/modules/filter/cache/sessioncache.h
vendored
Normal file
140
server/modules/filter/cache/sessioncache.h
vendored
Normal file
@ -0,0 +1,140 @@
|
||||
#pragma once
|
||||
/*
|
||||
* Copyright (c) 2016 MariaDB Corporation Ab
|
||||
*
|
||||
* Use of this software is governed by the Business Source License included
|
||||
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
|
||||
*
|
||||
* Change Date: 2019-07-01
|
||||
*
|
||||
* On the date above, in accordance with the Business Source License, use
|
||||
* of this software will be governed by version 2 or later of the General
|
||||
* Public License.
|
||||
*/
|
||||
|
||||
#include <maxscale/cdefs.h>
|
||||
#include <maxscale/buffer.h>
|
||||
#include <maxscale/filter.h>
|
||||
#include "cachefilter.h"
|
||||
#include "cache_storage_api.h"
|
||||
|
||||
class SessionCache
|
||||
{
|
||||
public:
|
||||
enum cache_session_state_t
|
||||
{
|
||||
CACHE_EXPECTING_RESPONSE, // A select has been sent, and we are waiting for the response.
|
||||
CACHE_EXPECTING_FIELDS, // A select has been sent, and we want more fields.
|
||||
CACHE_EXPECTING_ROWS, // A select has been sent, and we want more rows.
|
||||
CACHE_EXPECTING_NOTHING, // We are not expecting anything from the server.
|
||||
CACHE_EXPECTING_USE_RESPONSE, // A "USE DB" was issued.
|
||||
CACHE_IGNORING_RESPONSE, // We are not interested in the data received from the server.
|
||||
};
|
||||
|
||||
struct CACHE_RESPONSE_STATE
|
||||
{
|
||||
GWBUF* pData; /**< Response data, possibly incomplete. */
|
||||
size_t nTotalFields; /**< The number of fields a resultset contains. */
|
||||
size_t nFields; /**< How many fields we have received, <= n_totalfields. */
|
||||
size_t nRows; /**< How many rows we have received. */
|
||||
size_t offset; /**< Where we are in the response buffer. */
|
||||
};
|
||||
|
||||
/**
|
||||
* Releases all resources held by the session cache.
|
||||
*/
|
||||
~SessionCache();
|
||||
|
||||
/**
|
||||
* Creates a SessionCache instance.
|
||||
*
|
||||
* @param pInstance Pointer to the cache instance to which this session cache
|
||||
* belongs. Must remain valid for the lifetime of the SessionCache
|
||||
* instance being created.
|
||||
* @param pSession Pointer to the session this session cache instance is
|
||||
* specific for. Must remain valid for the lifetime of the SessionCache
|
||||
* instance being created.
|
||||
*
|
||||
* @return A new instance or NULL if memory allocation fails.
|
||||
*/
|
||||
static SessionCache* Create(CACHE_INSTANCE* pInstance, SESSION* pSession);
|
||||
|
||||
/**
|
||||
* The session has been closed.
|
||||
*/
|
||||
void close();
|
||||
|
||||
/**
|
||||
* Set the downstream component for this session.
|
||||
*
|
||||
* @param pDown The downstream filter or router
|
||||
*/
|
||||
void setDownstream(DOWNSTREAM* pDownstream);
|
||||
|
||||
/**
|
||||
* Set the upstream component for this session.
|
||||
*
|
||||
* @param pUp The upstream filter or router
|
||||
*/
|
||||
void setUpstream(UPSTREAM* pUpstream);
|
||||
|
||||
/**
|
||||
* A request on its way to a backend is delivered to this function.
|
||||
*
|
||||
* @param pPacket Buffer containing an MySQL protocol packet.
|
||||
*/
|
||||
int routeQuery(GWBUF* pPacket);
|
||||
|
||||
/**
|
||||
* A response on its way to the client is delivered to this function.
|
||||
*
|
||||
* @param pData Response data.
|
||||
*/
|
||||
int clientReply(GWBUF* pPacket);
|
||||
|
||||
/**
|
||||
* Print diagnostics of the session cache.
|
||||
*/
|
||||
void diagnostics(DCB *dcb);
|
||||
|
||||
private:
|
||||
int handle_expecting_fields();
|
||||
int handle_expecting_nothing();
|
||||
int handle_expecting_response();
|
||||
int handle_expecting_rows();
|
||||
int handle_expecting_use_response();
|
||||
int handle_ignoring_response();
|
||||
|
||||
int send_upstream();
|
||||
|
||||
void reset_response_state();
|
||||
|
||||
cache_result_t get_cached_response(const GWBUF *pQuery, GWBUF **ppResponse);
|
||||
|
||||
bool log_decisions() const
|
||||
{
|
||||
return m_pInstance->config.debug & CACHE_DEBUG_DECISIONS ? true : false;
|
||||
}
|
||||
|
||||
void store_result();
|
||||
|
||||
private:
|
||||
SessionCache(CACHE_INSTANCE* pInstance, SESSION* pSession, char* zDefaultDb);
|
||||
|
||||
SessionCache(const SessionCache&);
|
||||
SessionCache& operator = (const SessionCache&);
|
||||
|
||||
private:
|
||||
cache_session_state_t m_state; /**< What state is the session in, what data is expected. */
|
||||
CACHE_INSTANCE* m_pInstance; /**< The cache instance the session is associated with. */
|
||||
SESSION* m_pSession; /**< The session this data is associated with. */
|
||||
Storage* m_pStorage; /**< The storage to be used with this session data. */
|
||||
DOWNSTREAM m_down; /**< The previous filter or equivalent. */
|
||||
UPSTREAM m_up; /**< The next filter or equivalent. */
|
||||
CACHE_RESPONSE_STATE m_res; /**< The response state. */
|
||||
char m_key[CACHE_KEY_MAXLEN]; /**< Key storage. */
|
||||
char* m_zDefaultDb; /**< The default database. */
|
||||
char* m_zUseDb; /**< Pending default database. Needs server response. */
|
||||
bool m_refreshing; /**< Whether the session is updating a stale cache entry. */
|
||||
};
|
||||
|
Loading…
x
Reference in New Issue
Block a user