cache: Explicitly handle COM_INIT_DB
With this change, the cache will be aware of which default database is being used. That will remove the need for the cache parameter 'allowed_references' and thus make the cache easier to configure and manage.
This commit is contained in:
197
server/modules/filter/cache/cache.c
vendored
197
server/modules/filter/cache/cache.c
vendored
@ -118,11 +118,12 @@ typedef struct cache_instance
|
|||||||
|
|
||||||
typedef enum cache_session_state
|
typedef enum cache_session_state
|
||||||
{
|
{
|
||||||
CACHE_EXPECTING_RESPONSE, // A select has been sent, and we are waiting for the response.
|
CACHE_EXPECTING_RESPONSE, // A select has been sent, and we are waiting for the response.
|
||||||
CACHE_EXPECTING_FIELDS, // A select has been sent, and we want more fields.
|
CACHE_EXPECTING_FIELDS, // A select has been sent, and we want more fields.
|
||||||
CACHE_EXPECTING_ROWS, // A select has been sent, and we want more rows.
|
CACHE_EXPECTING_ROWS, // A select has been sent, and we want more rows.
|
||||||
CACHE_EXPECTING_NOTHING, // We are not expecting anything from the server.
|
CACHE_EXPECTING_NOTHING, // We are not expecting anything from the server.
|
||||||
CACHE_IGNORING_RESPONSE, // We are not interested in the data received from the server.
|
CACHE_EXPECTING_USE_RESPONSE, // A "USE DB" was issued.
|
||||||
|
CACHE_IGNORING_RESPONSE, // We are not interested in the data received from the server.
|
||||||
} cache_session_state_t;
|
} cache_session_state_t;
|
||||||
|
|
||||||
typedef struct cache_request_state
|
typedef struct cache_request_state
|
||||||
@ -143,15 +144,17 @@ static void cache_response_state_reset(CACHE_RESPONSE_STATE *state);
|
|||||||
|
|
||||||
typedef struct cache_session_data
|
typedef struct cache_session_data
|
||||||
{
|
{
|
||||||
CACHE_INSTANCE *instance; /**< The cache instance the session is associated with. */
|
CACHE_INSTANCE *instance; /**< The cache instance the session is associated with. */
|
||||||
CACHE_STORAGE_API *api; /**< The storage API to be used. */
|
CACHE_STORAGE_API *api; /**< The storage API to be used. */
|
||||||
CACHE_STORAGE *storage; /**< The storage to be used with this session data. */
|
CACHE_STORAGE *storage; /**< The storage to be used with this session data. */
|
||||||
DOWNSTREAM down; /**< The previous filter or equivalent. */
|
DOWNSTREAM down; /**< The previous filter or equivalent. */
|
||||||
UPSTREAM up; /**< The next filter or equivalent. */
|
UPSTREAM up; /**< The next filter or equivalent. */
|
||||||
CACHE_REQUEST_STATE req; /**< The request state. */
|
CACHE_REQUEST_STATE req; /**< The request state. */
|
||||||
CACHE_RESPONSE_STATE res; /**< The response state. */
|
CACHE_RESPONSE_STATE res; /**< The response state. */
|
||||||
SESSION *session; /**< The session this data is associated with. */
|
SESSION *session; /**< The session this data is associated with. */
|
||||||
char key[CACHE_KEY_MAXLEN]; /**< Key storage. */
|
char key[CACHE_KEY_MAXLEN]; /**< Key storage. */
|
||||||
|
char *default_db; /**< The default database. */
|
||||||
|
char *use_db; /**< Pending default database. Needs server response. */
|
||||||
cache_session_state_t state;
|
cache_session_state_t state;
|
||||||
} CACHE_SESSION_DATA;
|
} CACHE_SESSION_DATA;
|
||||||
|
|
||||||
@ -162,6 +165,7 @@ static int handle_expecting_fields(CACHE_SESSION_DATA *csdata);
|
|||||||
static int handle_expecting_nothing(CACHE_SESSION_DATA *csdata);
|
static int handle_expecting_nothing(CACHE_SESSION_DATA *csdata);
|
||||||
static int handle_expecting_response(CACHE_SESSION_DATA *csdata);
|
static int handle_expecting_response(CACHE_SESSION_DATA *csdata);
|
||||||
static int handle_expecting_rows(CACHE_SESSION_DATA *csdata);
|
static int handle_expecting_rows(CACHE_SESSION_DATA *csdata);
|
||||||
|
static int handle_expecting_use_response(CACHE_SESSION_DATA *csdata);
|
||||||
static int handle_ignoring_response(CACHE_SESSION_DATA *csdata);
|
static int handle_ignoring_response(CACHE_SESSION_DATA *csdata);
|
||||||
|
|
||||||
static bool route_using_cache(CACHE_SESSION_DATA *sdata, const GWBUF *key, GWBUF **value);
|
static bool route_using_cache(CACHE_SESSION_DATA *sdata, const GWBUF *key, GWBUF **value);
|
||||||
@ -417,36 +421,78 @@ static int routeQuery(FILTER *instance, void *sdata, GWBUF *data)
|
|||||||
cache_response_state_reset(&csdata->res);
|
cache_response_state_reset(&csdata->res);
|
||||||
csdata->state = CACHE_IGNORING_RESPONSE;
|
csdata->state = CACHE_IGNORING_RESPONSE;
|
||||||
|
|
||||||
// TODO: This returns the wrong result if GWBUF_LENGTH(packet) is < 5.
|
if (gwbuf_length(packet) > MYSQL_HEADER_LEN + 1) // We need at least a packet with a type.
|
||||||
if (modutil_is_SQL(packet))
|
|
||||||
{
|
{
|
||||||
packet = gwbuf_make_contiguous(packet);
|
uint8_t header[MYSQL_HEADER_LEN + 1];
|
||||||
|
|
||||||
// We do not care whether the query was fully parsed or not.
|
gwbuf_copy_data(packet, 0, sizeof(header), header);
|
||||||
// If a query cannot be fully parsed, the worst thing that can
|
|
||||||
// happen is that caching is not used, even though it would be
|
|
||||||
// possible.
|
|
||||||
|
|
||||||
if (qc_get_operation(packet) == QUERY_OP_SELECT)
|
switch ((int)MYSQL_GET_COMMAND(header))
|
||||||
{
|
{
|
||||||
GWBUF *result;
|
case MYSQL_COM_INIT_DB:
|
||||||
use_default = !route_using_cache(csdata, packet, &result);
|
|
||||||
|
|
||||||
if (use_default)
|
|
||||||
{
|
{
|
||||||
csdata->state = CACHE_EXPECTING_RESPONSE;
|
ss_dassert(!csdata->use_db);
|
||||||
}
|
size_t len = MYSQL_GET_PACKET_LEN(header) - 1; // Remove the command byte.
|
||||||
else
|
csdata->use_db = MXS_MALLOC(len + 1);
|
||||||
{
|
|
||||||
csdata->state = CACHE_EXPECTING_NOTHING;
|
|
||||||
C_DEBUG("Using data from cache.");
|
|
||||||
gwbuf_free(packet);
|
|
||||||
DCB *dcb = csdata->session->client_dcb;
|
|
||||||
|
|
||||||
// TODO: This is not ok. Any filters before this filter, will not
|
if (csdata->use_db)
|
||||||
// TODO: see this data.
|
{
|
||||||
rv = dcb->func.write(dcb, result);
|
uint8_t *use_db = (uint8_t*)csdata->use_db;
|
||||||
|
gwbuf_copy_data(packet, MYSQL_HEADER_LEN + 1, len, use_db);
|
||||||
|
csdata->use_db[len] = 0;
|
||||||
|
csdata->state = CACHE_EXPECTING_USE_RESPONSE;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// Memory allocation failed. Let's remove the default database to
|
||||||
|
// prevent incorrect cache entries, since we won't know what the
|
||||||
|
// default db will be.
|
||||||
|
MXS_FREE(csdata->default_db);
|
||||||
|
csdata->default_db = NULL;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
case MYSQL_COM_QUERY:
|
||||||
|
{
|
||||||
|
GWBUF *tmp = gwbuf_make_contiguous(packet);
|
||||||
|
|
||||||
|
if (tmp)
|
||||||
|
{
|
||||||
|
packet = tmp;
|
||||||
|
|
||||||
|
// We do not care whether the query was fully parsed or not.
|
||||||
|
// If a query cannot be fully parsed, the worst thing that can
|
||||||
|
// happen is that caching is not used, even though it would be
|
||||||
|
// possible.
|
||||||
|
|
||||||
|
if (qc_get_operation(packet) == QUERY_OP_SELECT)
|
||||||
|
{
|
||||||
|
GWBUF *result;
|
||||||
|
use_default = !route_using_cache(csdata, packet, &result);
|
||||||
|
|
||||||
|
if (use_default)
|
||||||
|
{
|
||||||
|
csdata->state = CACHE_EXPECTING_RESPONSE;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
csdata->state = CACHE_EXPECTING_NOTHING;
|
||||||
|
C_DEBUG("Using data from cache.");
|
||||||
|
gwbuf_free(packet);
|
||||||
|
DCB *dcb = csdata->session->client_dcb;
|
||||||
|
|
||||||
|
// TODO: This is not ok. Any filters before this filter, will not
|
||||||
|
// TODO: see this data.
|
||||||
|
rv = dcb->func.write(dcb, result);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -519,6 +565,10 @@ static int clientReply(FILTER *instance, void *sdata, GWBUF *data)
|
|||||||
rv = handle_expecting_rows(csdata);
|
rv = handle_expecting_rows(csdata);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
case CACHE_EXPECTING_USE_RESPONSE:
|
||||||
|
rv = handle_expecting_use_response(csdata);
|
||||||
|
break;
|
||||||
|
|
||||||
case CACHE_IGNORING_RESPONSE:
|
case CACHE_IGNORING_RESPONSE:
|
||||||
rv = handle_ignoring_response(csdata);
|
rv = handle_ignoring_response(csdata);
|
||||||
break;
|
break;
|
||||||
@ -603,6 +653,8 @@ static void cache_session_data_free(CACHE_SESSION_DATA* data)
|
|||||||
{
|
{
|
||||||
if (data)
|
if (data)
|
||||||
{
|
{
|
||||||
|
ss_dassert(!data->use_db);
|
||||||
|
MXS_FREE(data->default_db);
|
||||||
MXS_FREE(data);
|
MXS_FREE(data);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -704,7 +756,7 @@ static int handle_expecting_response(CACHE_SESSION_DATA *csdata)
|
|||||||
store_result(csdata);
|
store_result(csdata);
|
||||||
|
|
||||||
rv = send_upstream(csdata);
|
rv = send_upstream(csdata);
|
||||||
csdata->state = CACHE_EXPECTING_NOTHING;
|
csdata->state = CACHE_IGNORING_RESPONSE;
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case 0xfb: // GET_MORE_CLIENT_DATA/SEND_MORE_CLIENT_DATA
|
case 0xfb: // GET_MORE_CLIENT_DATA/SEND_MORE_CLIENT_DATA
|
||||||
@ -818,6 +870,58 @@ static int handle_expecting_rows(CACHE_SESSION_DATA *csdata)
|
|||||||
return rv;
|
return rv;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called when a response to a "USE db" is received from the server.
|
||||||
|
*
|
||||||
|
* @param csdata The cache session data.
|
||||||
|
*/
|
||||||
|
static int handle_expecting_use_response(CACHE_SESSION_DATA *csdata)
|
||||||
|
{
|
||||||
|
ss_dassert(csdata->state == CACHE_EXPECTING_USE_RESPONSE);
|
||||||
|
ss_dassert(csdata->res.data);
|
||||||
|
|
||||||
|
int rv = 1;
|
||||||
|
|
||||||
|
size_t buflen = gwbuf_length(csdata->res.data);
|
||||||
|
|
||||||
|
if (buflen >= MYSQL_HEADER_LEN + 1) // We need the command byte.
|
||||||
|
{
|
||||||
|
uint8_t command;
|
||||||
|
|
||||||
|
gwbuf_copy_data(csdata->res.data, MYSQL_HEADER_LEN, 1, &command);
|
||||||
|
|
||||||
|
switch (command)
|
||||||
|
{
|
||||||
|
case 0x00: // OK
|
||||||
|
// In case csdata->use_db could not be allocated in routeQuery(), we will
|
||||||
|
// in fact reset the default db here. That's ok as it will prevent broken
|
||||||
|
// entries in the cache.
|
||||||
|
MXS_FREE(csdata->default_db);
|
||||||
|
csdata->default_db = csdata->use_db;
|
||||||
|
csdata->use_db = NULL;
|
||||||
|
break;
|
||||||
|
|
||||||
|
case 0xff: // ERR
|
||||||
|
MXS_FREE(csdata->use_db);
|
||||||
|
csdata->use_db = NULL;
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
MXS_ERROR("\"USE %s\" received unexpected server response %d.",
|
||||||
|
csdata->use_db ? csdata->use_db : "<db>", command);
|
||||||
|
MXS_FREE(csdata->default_db);
|
||||||
|
MXS_FREE(csdata->use_db);
|
||||||
|
csdata->default_db = NULL;
|
||||||
|
csdata->use_db = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
rv = send_upstream(csdata);
|
||||||
|
csdata->state = CACHE_IGNORING_RESPONSE;
|
||||||
|
}
|
||||||
|
|
||||||
|
return rv;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called when all data from the server is ignored.
|
* Called when all data from the server is ignored.
|
||||||
*
|
*
|
||||||
@ -883,14 +987,19 @@ static void store_result(CACHE_SESSION_DATA *csdata)
|
|||||||
{
|
{
|
||||||
ss_dassert(csdata->res.data);
|
ss_dassert(csdata->res.data);
|
||||||
|
|
||||||
csdata->res.data = gwbuf_make_contiguous(csdata->res.data);
|
GWBUF *data = gwbuf_make_contiguous(csdata->res.data);
|
||||||
|
|
||||||
cache_result_t result = csdata->api->putValue(csdata->storage,
|
if (data)
|
||||||
csdata->key,
|
|
||||||
csdata->res.data);
|
|
||||||
|
|
||||||
if (result != CACHE_RESULT_OK)
|
|
||||||
{
|
{
|
||||||
MXS_ERROR("Could not store cache item.");
|
csdata->res.data = data;
|
||||||
|
|
||||||
|
cache_result_t result = csdata->api->putValue(csdata->storage,
|
||||||
|
csdata->key,
|
||||||
|
csdata->res.data);
|
||||||
|
|
||||||
|
if (result != CACHE_RESULT_OK)
|
||||||
|
{
|
||||||
|
MXS_ERROR("Could not store cache item.");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user