diff --git a/server/modules/filter/cache/cache.c b/server/modules/filter/cache/cache.c index ba28389f3..62a9d40c5 100644 --- a/server/modules/filter/cache/cache.c +++ b/server/modules/filter/cache/cache.c @@ -118,11 +118,12 @@ typedef struct cache_instance typedef enum cache_session_state { - CACHE_EXPECTING_RESPONSE, // A select has been sent, and we are waiting for the response. - CACHE_EXPECTING_FIELDS, // A select has been sent, and we want more fields. - CACHE_EXPECTING_ROWS, // A select has been sent, and we want more rows. - CACHE_EXPECTING_NOTHING, // We are not expecting anything from the server. - CACHE_IGNORING_RESPONSE, // We are not interested in the data received from the server. + CACHE_EXPECTING_RESPONSE, // A select has been sent, and we are waiting for the response. + CACHE_EXPECTING_FIELDS, // A select has been sent, and we want more fields. + CACHE_EXPECTING_ROWS, // A select has been sent, and we want more rows. + CACHE_EXPECTING_NOTHING, // We are not expecting anything from the server. + CACHE_EXPECTING_USE_RESPONSE, // A "USE DB" was issued. + CACHE_IGNORING_RESPONSE, // We are not interested in the data received from the server. } cache_session_state_t; typedef struct cache_request_state @@ -143,15 +144,17 @@ static void cache_response_state_reset(CACHE_RESPONSE_STATE *state); typedef struct cache_session_data { - CACHE_INSTANCE *instance; /**< The cache instance the session is associated with. */ - CACHE_STORAGE_API *api; /**< The storage API to be used. */ - CACHE_STORAGE *storage; /**< The storage to be used with this session data. */ - DOWNSTREAM down; /**< The previous filter or equivalent. */ - UPSTREAM up; /**< The next filter or equivalent. */ - CACHE_REQUEST_STATE req; /**< The request state. */ - CACHE_RESPONSE_STATE res; /**< The response state. */ - SESSION *session; /**< The session this data is associated with. */ + CACHE_INSTANCE *instance; /**< The cache instance the session is associated with. */ + CACHE_STORAGE_API *api; /**< The storage API to be used. */ + CACHE_STORAGE *storage; /**< The storage to be used with this session data. */ + DOWNSTREAM down; /**< The previous filter or equivalent. */ + UPSTREAM up; /**< The next filter or equivalent. */ + CACHE_REQUEST_STATE req; /**< The request state. */ + CACHE_RESPONSE_STATE res; /**< The response state. */ + SESSION *session; /**< The session this data is associated with. */ char key[CACHE_KEY_MAXLEN]; /**< Key storage. */ + char *default_db; /**< The default database. */ + char *use_db; /**< Pending default database. Needs server response. */ cache_session_state_t state; } CACHE_SESSION_DATA; @@ -162,6 +165,7 @@ static int handle_expecting_fields(CACHE_SESSION_DATA *csdata); static int handle_expecting_nothing(CACHE_SESSION_DATA *csdata); static int handle_expecting_response(CACHE_SESSION_DATA *csdata); static int handle_expecting_rows(CACHE_SESSION_DATA *csdata); +static int handle_expecting_use_response(CACHE_SESSION_DATA *csdata); static int handle_ignoring_response(CACHE_SESSION_DATA *csdata); static bool route_using_cache(CACHE_SESSION_DATA *sdata, const GWBUF *key, GWBUF **value); @@ -417,36 +421,78 @@ static int routeQuery(FILTER *instance, void *sdata, GWBUF *data) cache_response_state_reset(&csdata->res); csdata->state = CACHE_IGNORING_RESPONSE; - // TODO: This returns the wrong result if GWBUF_LENGTH(packet) is < 5. - if (modutil_is_SQL(packet)) + if (gwbuf_length(packet) > MYSQL_HEADER_LEN + 1) // We need at least a packet with a type. { - packet = gwbuf_make_contiguous(packet); + uint8_t header[MYSQL_HEADER_LEN + 1]; - // We do not care whether the query was fully parsed or not. - // If a query cannot be fully parsed, the worst thing that can - // happen is that caching is not used, even though it would be - // possible. + gwbuf_copy_data(packet, 0, sizeof(header), header); - if (qc_get_operation(packet) == QUERY_OP_SELECT) + switch ((int)MYSQL_GET_COMMAND(header)) { - GWBUF *result; - use_default = !route_using_cache(csdata, packet, &result); - - if (use_default) + case MYSQL_COM_INIT_DB: { - csdata->state = CACHE_EXPECTING_RESPONSE; - } - else - { - csdata->state = CACHE_EXPECTING_NOTHING; - C_DEBUG("Using data from cache."); - gwbuf_free(packet); - DCB *dcb = csdata->session->client_dcb; + ss_dassert(!csdata->use_db); + size_t len = MYSQL_GET_PACKET_LEN(header) - 1; // Remove the command byte. + csdata->use_db = MXS_MALLOC(len + 1); - // TODO: This is not ok. Any filters before this filter, will not - // TODO: see this data. - rv = dcb->func.write(dcb, result); + if (csdata->use_db) + { + uint8_t *use_db = (uint8_t*)csdata->use_db; + gwbuf_copy_data(packet, MYSQL_HEADER_LEN + 1, len, use_db); + csdata->use_db[len] = 0; + csdata->state = CACHE_EXPECTING_USE_RESPONSE; + } + else + { + // Memory allocation failed. Let's remove the default database to + // prevent incorrect cache entries, since we won't know what the + // default db will be. + MXS_FREE(csdata->default_db); + csdata->default_db = NULL; + } } + break; + + case MYSQL_COM_QUERY: + { + GWBUF *tmp = gwbuf_make_contiguous(packet); + + if (tmp) + { + packet = tmp; + + // We do not care whether the query was fully parsed or not. + // If a query cannot be fully parsed, the worst thing that can + // happen is that caching is not used, even though it would be + // possible. + + if (qc_get_operation(packet) == QUERY_OP_SELECT) + { + GWBUF *result; + use_default = !route_using_cache(csdata, packet, &result); + + if (use_default) + { + csdata->state = CACHE_EXPECTING_RESPONSE; + } + else + { + csdata->state = CACHE_EXPECTING_NOTHING; + C_DEBUG("Using data from cache."); + gwbuf_free(packet); + DCB *dcb = csdata->session->client_dcb; + + // TODO: This is not ok. Any filters before this filter, will not + // TODO: see this data. + rv = dcb->func.write(dcb, result); + } + } + } + } + break; + + default: + break; } } @@ -519,6 +565,10 @@ static int clientReply(FILTER *instance, void *sdata, GWBUF *data) rv = handle_expecting_rows(csdata); break; + case CACHE_EXPECTING_USE_RESPONSE: + rv = handle_expecting_use_response(csdata); + break; + case CACHE_IGNORING_RESPONSE: rv = handle_ignoring_response(csdata); break; @@ -603,6 +653,8 @@ static void cache_session_data_free(CACHE_SESSION_DATA* data) { if (data) { + ss_dassert(!data->use_db); + MXS_FREE(data->default_db); MXS_FREE(data); } } @@ -704,7 +756,7 @@ static int handle_expecting_response(CACHE_SESSION_DATA *csdata) store_result(csdata); rv = send_upstream(csdata); - csdata->state = CACHE_EXPECTING_NOTHING; + csdata->state = CACHE_IGNORING_RESPONSE; break; case 0xfb: // GET_MORE_CLIENT_DATA/SEND_MORE_CLIENT_DATA @@ -818,6 +870,58 @@ static int handle_expecting_rows(CACHE_SESSION_DATA *csdata) return rv; } +/** + * Called when a response to a "USE db" is received from the server. + * + * @param csdata The cache session data. + */ +static int handle_expecting_use_response(CACHE_SESSION_DATA *csdata) +{ + ss_dassert(csdata->state == CACHE_EXPECTING_USE_RESPONSE); + ss_dassert(csdata->res.data); + + int rv = 1; + + size_t buflen = gwbuf_length(csdata->res.data); + + if (buflen >= MYSQL_HEADER_LEN + 1) // We need the command byte. + { + uint8_t command; + + gwbuf_copy_data(csdata->res.data, MYSQL_HEADER_LEN, 1, &command); + + switch (command) + { + case 0x00: // OK + // In case csdata->use_db could not be allocated in routeQuery(), we will + // in fact reset the default db here. That's ok as it will prevent broken + // entries in the cache. + MXS_FREE(csdata->default_db); + csdata->default_db = csdata->use_db; + csdata->use_db = NULL; + break; + + case 0xff: // ERR + MXS_FREE(csdata->use_db); + csdata->use_db = NULL; + break; + + default: + MXS_ERROR("\"USE %s\" received unexpected server response %d.", + csdata->use_db ? csdata->use_db : "", command); + MXS_FREE(csdata->default_db); + MXS_FREE(csdata->use_db); + csdata->default_db = NULL; + csdata->use_db = NULL; + } + + rv = send_upstream(csdata); + csdata->state = CACHE_IGNORING_RESPONSE; + } + + return rv; +} + /** * Called when all data from the server is ignored. * @@ -883,14 +987,19 @@ static void store_result(CACHE_SESSION_DATA *csdata) { ss_dassert(csdata->res.data); - csdata->res.data = gwbuf_make_contiguous(csdata->res.data); + GWBUF *data = gwbuf_make_contiguous(csdata->res.data); - cache_result_t result = csdata->api->putValue(csdata->storage, - csdata->key, - csdata->res.data); - - if (result != CACHE_RESULT_OK) + if (data) { - MXS_ERROR("Could not store cache item."); + csdata->res.data = data; + + cache_result_t result = csdata->api->putValue(csdata->storage, + csdata->key, + csdata->res.data); + + if (result != CACHE_RESULT_OK) + { + MXS_ERROR("Could not store cache item."); + } } }