Update cache with new capabilities
Now that a filter can express that the transaction state is tracked, the cache implementation can be simplified. We do not need to cater for the case that a "too short" or "too long" a packet would be delivered. Further, since the autocommit mode and transaction state of the session are tracked, the filter can cache data when it is safe to do so. In practice that means when either AUTOCOMMIT is ON and no explicit transaction is active or when a READ ONLY transaction is active, irrespective of the autocommit state. In principle it would be possible to tentatively cache data during a transaction, and if the transaction is committed successfully flush the tentatively cached data to the actual cache, but that will be for another day.
This commit is contained in:
parent
f961f87e5e
commit
8391579206
180
server/modules/filter/cache/cache.c
vendored
180
server/modules/filter/cache/cache.c
vendored
@ -133,11 +133,6 @@ typedef enum cache_session_state
|
||||
CACHE_IGNORING_RESPONSE, // We are not interested in the data received from the server.
|
||||
} cache_session_state_t;
|
||||
|
||||
typedef struct cache_request_state
|
||||
{
|
||||
GWBUF* data; /**< Request data, possibly incomplete. */
|
||||
} CACHE_REQUEST_STATE;
|
||||
|
||||
typedef struct cache_response_state
|
||||
{
|
||||
GWBUF* data; /**< Response data, possibly incomplete. */
|
||||
@ -156,7 +151,6 @@ typedef struct cache_session_data
|
||||
CACHE_STORAGE *storage; /**< The storage to be used with this session data. */
|
||||
DOWNSTREAM down; /**< The previous filter or equivalent. */
|
||||
UPSTREAM up; /**< The next filter or equivalent. */
|
||||
CACHE_REQUEST_STATE req; /**< The request state. */
|
||||
CACHE_RESPONSE_STATE res; /**< The response state. */
|
||||
SESSION *session; /**< The session this data is associated with. */
|
||||
char key[CACHE_KEY_MAXLEN]; /**< Key storage. */
|
||||
@ -333,129 +327,113 @@ static void setUpstream(FILTER *instance, void *sdata, UPSTREAM *up)
|
||||
*
|
||||
* @param instance The filter instance data
|
||||
* @param sdata The filter session data
|
||||
* @param packets The query data
|
||||
* @param buffer Buffer containing an MySQL protocol packet.
|
||||
*/
|
||||
static int routeQuery(FILTER *instance, void *sdata, GWBUF *data)
|
||||
static int routeQuery(FILTER *instance, void *sdata, GWBUF *packet)
|
||||
{
|
||||
CACHE_INSTANCE *cinstance = (CACHE_INSTANCE*)instance;
|
||||
CACHE_SESSION_DATA *csdata = (CACHE_SESSION_DATA*)sdata;
|
||||
|
||||
if (csdata->req.data)
|
||||
{
|
||||
gwbuf_append(csdata->req.data, data);
|
||||
}
|
||||
else
|
||||
{
|
||||
csdata->req.data = data;
|
||||
}
|
||||
uint8_t *data = GWBUF_DATA(packet);
|
||||
|
||||
GWBUF *packet = modutil_get_next_MySQL_packet(&csdata->req.data);
|
||||
// All of these should be guaranteed by RCAP_TYPE_TRANSACTION_TRACKING
|
||||
ss_dassert(GWBUF_IS_CONTIGUOUS(packet));
|
||||
ss_dassert(GWBUF_LENGTH(packet) >= MYSQL_HEADER_LEN + 1);
|
||||
ss_dassert(MYSQL_GET_PACKET_LEN(data) + MYSQL_HEADER_LEN == GWBUF_LENGTH(packet));
|
||||
|
||||
bool use_default = true;
|
||||
|
||||
cache_response_state_reset(&csdata->res);
|
||||
csdata->state = CACHE_IGNORING_RESPONSE;
|
||||
|
||||
int rv;
|
||||
|
||||
if (packet)
|
||||
switch ((int)MYSQL_GET_COMMAND(data))
|
||||
{
|
||||
bool use_default = true;
|
||||
|
||||
cache_response_state_reset(&csdata->res);
|
||||
csdata->state = CACHE_IGNORING_RESPONSE;
|
||||
|
||||
if (gwbuf_length(packet) > MYSQL_HEADER_LEN + 1) // We need at least a packet with a type.
|
||||
case MYSQL_COM_INIT_DB:
|
||||
{
|
||||
uint8_t header[MYSQL_HEADER_LEN + 1];
|
||||
ss_dassert(!csdata->use_db);
|
||||
size_t len = MYSQL_GET_PACKET_LEN(data) - 1; // Remove the command byte.
|
||||
csdata->use_db = MXS_MALLOC(len + 1);
|
||||
|
||||
gwbuf_copy_data(packet, 0, sizeof(header), header);
|
||||
|
||||
switch ((int)MYSQL_GET_COMMAND(header))
|
||||
if (csdata->use_db)
|
||||
{
|
||||
case MYSQL_COM_INIT_DB:
|
||||
memcpy(csdata->use_db, data + MYSQL_HEADER_LEN + 1, len);
|
||||
csdata->use_db[len] = 0;
|
||||
csdata->state = CACHE_EXPECTING_USE_RESPONSE;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Memory allocation failed. We need to remove the default database to
|
||||
// prevent incorrect cache entries, since we won't know what the
|
||||
// default db is. But we only need to do that if "USE <db>" really
|
||||
// succeeds. The right thing will happen by itself in
|
||||
// handle_expecting_use_response(); if OK is returned, default_db will
|
||||
// become NULL, if ERR, default_db will not be changed.
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
case MYSQL_COM_QUERY:
|
||||
{
|
||||
// We do not care whether the query was fully parsed or not.
|
||||
// If a query cannot be fully parsed, the worst thing that can
|
||||
// happen is that caching is not used, even though it would be
|
||||
// possible.
|
||||
if (qc_get_operation(packet) == QUERY_OP_SELECT)
|
||||
{
|
||||
SESSION *session = csdata->session;
|
||||
|
||||
if ((session_is_autocommit(session) && !session_trx_is_active(session)) ||
|
||||
session_trx_is_read_only(session))
|
||||
{
|
||||
ss_dassert(!csdata->use_db);
|
||||
size_t len = MYSQL_GET_PACKET_LEN(header) - 1; // Remove the command byte.
|
||||
csdata->use_db = MXS_MALLOC(len + 1);
|
||||
|
||||
if (csdata->use_db)
|
||||
if (cache_rules_should_store(cinstance->rules, csdata->default_db, packet))
|
||||
{
|
||||
uint8_t *use_db = (uint8_t*)csdata->use_db;
|
||||
gwbuf_copy_data(packet, MYSQL_HEADER_LEN + 1, len, use_db);
|
||||
csdata->use_db[len] = 0;
|
||||
csdata->state = CACHE_EXPECTING_USE_RESPONSE;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Memory allocation failed. We need to remove the default database to
|
||||
// prevent incorrect cache entries, since we won't know what the
|
||||
// default db is. But we only need to do that if "USE <db>" really
|
||||
// succeeds. The right thing will happen by itself in
|
||||
// handle_expecting_use_response(); if OK is returned, default_db will
|
||||
// become NULL, if ERR, default_db will not be changed.
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
case MYSQL_COM_QUERY:
|
||||
{
|
||||
GWBUF *tmp = gwbuf_make_contiguous(packet);
|
||||
|
||||
if (tmp)
|
||||
{
|
||||
packet = tmp;
|
||||
|
||||
// We do not care whether the query was fully parsed or not.
|
||||
// If a query cannot be fully parsed, the worst thing that can
|
||||
// happen is that caching is not used, even though it would be
|
||||
// possible.
|
||||
|
||||
if (qc_get_operation(packet) == QUERY_OP_SELECT)
|
||||
if (cache_rules_should_use(cinstance->rules, csdata->session))
|
||||
{
|
||||
if (cache_rules_should_store(cinstance->rules, csdata->default_db, packet))
|
||||
GWBUF *result;
|
||||
use_default = !route_using_cache(csdata, packet, &result);
|
||||
|
||||
if (use_default)
|
||||
{
|
||||
if (cache_rules_should_use(cinstance->rules, csdata->session))
|
||||
{
|
||||
GWBUF *result;
|
||||
use_default = !route_using_cache(csdata, packet, &result);
|
||||
|
||||
if (use_default)
|
||||
{
|
||||
csdata->state = CACHE_EXPECTING_RESPONSE;
|
||||
}
|
||||
else
|
||||
{
|
||||
csdata->state = CACHE_EXPECTING_NOTHING;
|
||||
C_DEBUG("Using data from cache.");
|
||||
gwbuf_free(packet);
|
||||
DCB *dcb = csdata->session->client_dcb;
|
||||
|
||||
// TODO: This is not ok. Any filters before this filter, will not
|
||||
// TODO: see this data.
|
||||
rv = dcb->func.write(dcb, result);
|
||||
}
|
||||
}
|
||||
csdata->state = CACHE_EXPECTING_RESPONSE;
|
||||
}
|
||||
else
|
||||
{
|
||||
csdata->state = CACHE_IGNORING_RESPONSE;
|
||||
csdata->state = CACHE_EXPECTING_NOTHING;
|
||||
C_DEBUG("Using data from cache.");
|
||||
gwbuf_free(packet);
|
||||
DCB *dcb = csdata->session->client_dcb;
|
||||
|
||||
// TODO: This is not ok. Any filters before this filter, will not
|
||||
// TODO: see this data.
|
||||
rv = dcb->func.write(dcb, result);
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
csdata->state = CACHE_IGNORING_RESPONSE;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
C_DEBUG("autocommit = %s and transaction state %s => Not using or storing to cache.",
|
||||
session_is_autocommit(csdata->session) ? "ON" : "OFF",
|
||||
session_trx_state_to_string(session_get_trx_state(csdata->session)));
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
if (use_default)
|
||||
{
|
||||
C_DEBUG("Using default processing.");
|
||||
rv = csdata->down.routeQuery(csdata->down.instance, csdata->down.session, packet);
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
else
|
||||
|
||||
if (use_default)
|
||||
{
|
||||
// We need more data before we can do something.
|
||||
rv = 1;
|
||||
C_DEBUG("Using default processing.");
|
||||
rv = csdata->down.routeQuery(csdata->down.instance, csdata->down.session, packet);
|
||||
}
|
||||
|
||||
return rv;
|
||||
@ -560,7 +538,7 @@ static void diagnostics(FILTER *instance, void *sdata, DCB *dcb)
|
||||
*/
|
||||
static uint64_t getCapabilities(void)
|
||||
{
|
||||
return RCAP_TYPE_STMT_INPUT;
|
||||
return RCAP_TYPE_TRANSACTION_TRACKING;
|
||||
}
|
||||
|
||||
//
|
||||
|
Loading…
x
Reference in New Issue
Block a user