Cache: Assume we need to handle one packet at a time
This commit is contained in:
80
server/modules/filter/cache/cache.c
vendored
80
server/modules/filter/cache/cache.c
vendored
@ -364,84 +364,58 @@ static int routeQuery(FILTER *instance, void *sdata, GWBUF *packets)
|
|||||||
|
|
||||||
if (csdata->packets)
|
if (csdata->packets)
|
||||||
{
|
{
|
||||||
C_DEBUG("Old packets exist.");
|
|
||||||
gwbuf_append(csdata->packets, packets);
|
gwbuf_append(csdata->packets, packets);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
C_DEBUG("NO old packets exist.");
|
|
||||||
csdata->packets = packets;
|
csdata->packets = packets;
|
||||||
}
|
}
|
||||||
|
|
||||||
packets = modutil_get_complete_packets(&csdata->packets);
|
GWBUF *packet = modutil_get_next_MySQL_packet(&csdata->packets);
|
||||||
|
|
||||||
int rv;
|
int rv;
|
||||||
|
|
||||||
if (packets)
|
if (packet)
|
||||||
{
|
{
|
||||||
C_DEBUG("At least one complete packet exist.");
|
bool use_default = true;
|
||||||
GWBUF *packet;
|
|
||||||
|
|
||||||
// TODO: Is it really possible to get more that one packet
|
// TODO: This returns the wrong result if GWBUF_LENGTH(packet) is < 5.
|
||||||
// TODO: is this loop? If so, can those packets be sent
|
if (modutil_is_SQL(packet))
|
||||||
// TODO: after one and other, or do we need to wait for
|
|
||||||
// TODO: a replies? If there are more complete packets
|
|
||||||
// TODO: than one, then either CACHE_SESSION_DATA::key
|
|
||||||
// TODO: needs to be a queue
|
|
||||||
|
|
||||||
// TODO: modutil_get_next_MySQL_packet *copies* the data.
|
|
||||||
while ((packet = modutil_get_next_MySQL_packet(&packets)))
|
|
||||||
{
|
{
|
||||||
C_DEBUG("Processing packet.");
|
packet = gwbuf_make_contiguous(packet);
|
||||||
bool use_default = true;
|
|
||||||
|
|
||||||
if (modutil_is_SQL(packet))
|
// We do not care whether the query was fully parsed or not.
|
||||||
|
// If a query cannot be fully parsed, the worst thing that can
|
||||||
|
// happen is that caching is not used, even though it would be
|
||||||
|
// possible.
|
||||||
|
|
||||||
|
if (qc_get_operation(packet) == QUERY_OP_SELECT)
|
||||||
{
|
{
|
||||||
C_DEBUG("Is SQL.");
|
GWBUF *result;
|
||||||
// We do not care whether the query was fully parsed or not.
|
use_default = !route_using_cache(cinstance, csdata, packet, &result);
|
||||||
// If a query cannot be fully parsed, the worst thing that can
|
|
||||||
// happen is that caching is not used, even though it would be
|
|
||||||
// possible.
|
|
||||||
|
|
||||||
if (qc_get_operation(packet) == QUERY_OP_SELECT)
|
if (!use_default)
|
||||||
{
|
{
|
||||||
C_DEBUG("Is a SELECT");
|
C_DEBUG("Using data from cache.");
|
||||||
|
gwbuf_free(packet);
|
||||||
|
DCB *dcb = csdata->session->client_dcb;
|
||||||
|
|
||||||
GWBUF *result;
|
// TODO: This is not ok. Any filters before this filter, will not
|
||||||
use_default = !route_using_cache(cinstance, csdata, packet, &result);
|
// TODO: see this data.
|
||||||
|
rv = dcb->func.write(dcb, result);
|
||||||
if (!use_default)
|
|
||||||
{
|
|
||||||
C_DEBUG("Using data from cache.");
|
|
||||||
gwbuf_free(packet);
|
|
||||||
DCB *dcb = csdata->session->client_dcb;
|
|
||||||
|
|
||||||
// TODO: This is not ok. Any filters before this filter, will not
|
|
||||||
// TODO: see this data.
|
|
||||||
rv = dcb->func.write(dcb, result);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
C_DEBUG("Is NOT a SELECT");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
}
|
||||||
{
|
|
||||||
C_DEBUG("Is NOT SQL.");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (use_default)
|
if (use_default)
|
||||||
{
|
{
|
||||||
C_DEBUG("Using default processing.");
|
C_DEBUG("Using default processing.");
|
||||||
rv = csdata->down.routeQuery(csdata->down.instance, csdata->down.session, packet);
|
rv = csdata->down.routeQuery(csdata->down.instance, csdata->down.session, packet);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
C_DEBUG("Not even one complete packet exist; more data needed.");
|
// We need more data before we can do something.
|
||||||
// Ok, we need more data before we can do something.
|
|
||||||
rv = 1;
|
rv = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -168,6 +168,7 @@ cache_result_t RocksDBStorage::getValue(const char* pKey, GWBUF** ppResult)
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
MXS_NOTICE("Cache item is stale, not using.");
|
MXS_NOTICE("Cache item is stale, not using.");
|
||||||
|
result = CACHE_RESULT_NOT_FOUND;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -190,7 +191,7 @@ cache_result_t RocksDBStorage::getValue(const char* pKey, GWBUF** ppResult)
|
|||||||
|
|
||||||
cache_result_t RocksDBStorage::putValue(const char* pKey, const GWBUF* pValue)
|
cache_result_t RocksDBStorage::putValue(const char* pKey, const GWBUF* pValue)
|
||||||
{
|
{
|
||||||
// ss_dassert(gwbuf_is_contiguous(pValue));
|
ss_dassert(GWBUF_IS_CONTIGUOUS(pValue));
|
||||||
|
|
||||||
rocksdb::Slice key(pKey, ROCKSDB_KEY_LENGTH);
|
rocksdb::Slice key(pKey, ROCKSDB_KEY_LENGTH);
|
||||||
rocksdb::Slice value(static_cast<const char*>(GWBUF_DATA(pValue)), GWBUF_LENGTH(pValue));
|
rocksdb::Slice value(static_cast<const char*>(GWBUF_DATA(pValue)), GWBUF_LENGTH(pValue));
|
||||||
|
|||||||
@ -25,6 +25,8 @@ bool initialize()
|
|||||||
|
|
||||||
CACHE_STORAGE* createInstance(const char* zName, uint32_t ttl, int argc, char* argv[])
|
CACHE_STORAGE* createInstance(const char* zName, uint32_t ttl, int argc, char* argv[])
|
||||||
{
|
{
|
||||||
|
ss_dassert(zName);
|
||||||
|
|
||||||
CACHE_STORAGE* pStorage = 0;
|
CACHE_STORAGE* pStorage = 0;
|
||||||
|
|
||||||
try
|
try
|
||||||
@ -56,6 +58,10 @@ cache_result_t getKey(CACHE_STORAGE* pStorage,
|
|||||||
const GWBUF* pQuery,
|
const GWBUF* pQuery,
|
||||||
char* pKey)
|
char* pKey)
|
||||||
{
|
{
|
||||||
|
ss_dassert(pStorage);
|
||||||
|
ss_dassert(pQuery);
|
||||||
|
ss_dassert(pKey);
|
||||||
|
|
||||||
cache_result_t result = CACHE_RESULT_ERROR;
|
cache_result_t result = CACHE_RESULT_ERROR;
|
||||||
|
|
||||||
try
|
try
|
||||||
@ -80,6 +86,10 @@ cache_result_t getKey(CACHE_STORAGE* pStorage,
|
|||||||
|
|
||||||
cache_result_t getValue(CACHE_STORAGE* pStorage, const char* pKey, GWBUF** ppResult)
|
cache_result_t getValue(CACHE_STORAGE* pStorage, const char* pKey, GWBUF** ppResult)
|
||||||
{
|
{
|
||||||
|
ss_dassert(pStorage);
|
||||||
|
ss_dassert(pKey);
|
||||||
|
ss_dassert(ppResult);
|
||||||
|
|
||||||
cache_result_t result = CACHE_RESULT_ERROR;
|
cache_result_t result = CACHE_RESULT_ERROR;
|
||||||
|
|
||||||
try
|
try
|
||||||
@ -106,6 +116,10 @@ cache_result_t putValue(CACHE_STORAGE* pStorage,
|
|||||||
const char* pKey,
|
const char* pKey,
|
||||||
const GWBUF* pValue)
|
const GWBUF* pValue)
|
||||||
{
|
{
|
||||||
|
ss_dassert(pStorage);
|
||||||
|
ss_dassert(pKey);
|
||||||
|
ss_dassert(pValue);
|
||||||
|
|
||||||
cache_result_t result = CACHE_RESULT_ERROR;
|
cache_result_t result = CACHE_RESULT_ERROR;
|
||||||
|
|
||||||
try
|
try
|
||||||
|
|||||||
Reference in New Issue
Block a user