diff --git a/server/modules/filter/maxrows/maxrows.c b/server/modules/filter/maxrows/maxrows.c index d3c12076c..c4401b53e 100644 --- a/server/modules/filter/maxrows/maxrows.c +++ b/server/modules/filter/maxrows/maxrows.c @@ -24,6 +24,8 @@ * 04/11/2016 Massimiliano Pinto Addition of SERVER_MORE_RESULTS_EXIST flag (0x0008) * detection in handle_expecting_rows(). * 07/11/2016 Massimiliano Pinto handle_expecting_rows renamed to handle_rows + * 20/12/2016 Massimiliano Pinto csdata->res.n_rows counter works with MULTI_RESULT + * and large packets (> 16MB) * * @endverbatim */ @@ -129,12 +131,11 @@ typedef struct maxrows_instance typedef enum maxrows_session_state { - MAXROWS_EXPECTING_RESPONSE, // A select has been sent, and we are waiting for the response. + MAXROWS_EXPECTING_RESPONSE = 1, // A select has been sent, and we are waiting for the response. MAXROWS_EXPECTING_FIELDS, // A select has been sent, and we want more fields. MAXROWS_EXPECTING_ROWS, // A select has been sent, and we want more rows. MAXROWS_EXPECTING_NOTHING, // We are not expecting anything from the server. MAXROWS_IGNORING_RESPONSE, // We are not interested in the data received from the server. - MAXROWS_DISCARDING_RESPONSE, // We have returned empty result set and we discard any new data. } maxrows_session_state_t; typedef struct maxrows_response_state @@ -155,9 +156,9 @@ typedef struct maxrows_session_data UPSTREAM up; /**< The next filter or equivalent. */ MAXROWS_RESPONSE_STATE res; /**< The response state. */ SESSION *session; /**< The session this data is associated with. */ - char *default_db; /**< The default database. */ - char *use_db; /**< Pending default database. Needs server response. */ maxrows_session_state_t state; + bool large_packet; /**< Large packet (> 16MB)) indicator */ + bool discard_resultset; /**< Discard resultset indicator */ } MAXROWS_SESSION_DATA; static MAXROWS_SESSION_DATA *maxrows_session_data_create(MAXROWS_INSTANCE *instance, SESSION *session); @@ -296,6 +297,8 @@ static int routeQuery(FILTER *instance, void *sdata, GWBUF *packet) maxrows_response_state_reset(&csdata->res); csdata->state = MAXROWS_IGNORING_RESPONSE; + csdata->large_packet = false; + csdata->discard_resultset = false; switch ((int)MYSQL_GET_COMMAND(data)) { @@ -342,7 +345,7 @@ static int clientReply(FILTER *instance, void *sdata, GWBUF *data) if (csdata->state != MAXROWS_IGNORING_RESPONSE) { - if (csdata->state != MAXROWS_DISCARDING_RESPONSE) + if (!csdata->discard_resultset) { if (gwbuf_length(csdata->res.data) > csdata->instance->config.max_resultset_size) { @@ -354,7 +357,7 @@ static int clientReply(FILTER *instance, void *sdata, GWBUF *data) csdata->instance->config.max_resultset_size / 1024); } - csdata->state = MAXROWS_DISCARDING_RESPONSE; + csdata->discard_resultset = true; } } } @@ -373,7 +376,6 @@ static int clientReply(FILTER *instance, void *sdata, GWBUF *data) rv = handle_expecting_response(csdata); break; - case MAXROWS_DISCARDING_RESPONSE: case MAXROWS_EXPECTING_ROWS: rv = handle_rows(csdata); break; @@ -419,7 +421,7 @@ static void diagnostics(FILTER *instance, void *sdata, DCB *dcb) */ static uint64_t getCapabilities(void) { - return RCAP_TYPE_STMT_INPUT | RCAP_TYPE_CONTIGUOUS_OUTPUT; + return RCAP_TYPE_STMT_INPUT | RCAP_TYPE_STMT_OUTPUT; } /* API END */ @@ -452,29 +454,13 @@ static MAXROWS_SESSION_DATA *maxrows_session_data_create(MAXROWS_INSTANCE *insta if (data) { - char *default_db = NULL; - ss_dassert(session->client_dcb); ss_dassert(session->client_dcb->data); + MYSQL_session *mysql_session = (MYSQL_session*)session->client_dcb->data; - - if (mysql_session->db[0] != 0) - { - default_db = MXS_STRDUP(mysql_session->db); - } - - if ((mysql_session->db[0] == 0) || default_db) - { - data->instance = instance; - data->session = session; - data->state = MAXROWS_EXPECTING_NOTHING; - data->default_db = default_db; - } - else - { - MXS_FREE(data); - data = NULL; - } + data->instance = instance; + data->session = session; + data->state = MAXROWS_EXPECTING_NOTHING; } return data; @@ -489,11 +475,6 @@ static void maxrows_session_data_free(MAXROWS_SESSION_DATA* data) { if (data) { - // In normal circumstances, only data->default_db may be non-NULL at - // this point. However, if the authentication with the backend fails - // and the session is closed, data->use_db may be non-NULL. - MXS_FREE(data->use_db); - MXS_FREE(data->default_db); MXS_FREE(data); } } @@ -577,26 +558,55 @@ static int handle_expecting_response(MAXROWS_SESSION_DATA *csdata) ss_dassert(csdata->res.data); int rv = 1; - size_t buflen = gwbuf_length(csdata->res.data); + // Reset field counters + csdata->res.n_fields = 0; + csdata->res.n_totalfields = 0; + // Reset large packet var + csdata->large_packet = false; + if (buflen >= MYSQL_HEADER_LEN + 1) // We need the command byte. { // Reserve enough space to accomodate for the largest length encoded integer, // which is type field + 8 bytes. uint8_t header[MYSQL_HEADER_LEN + 1 + 8]; - gwbuf_copy_data(csdata->res.data, 0, MYSQL_HEADER_LEN + 1, header); + + // Read packet header from buffer at current offset + gwbuf_copy_data(csdata->res.data, csdata->res.offset, MYSQL_HEADER_LEN + 1, header); switch ((int)MYSQL_GET_COMMAND(header)) { case 0x00: // OK case 0xff: // ERR + /** + * This also handles the OK packet that terminates + * a Multi-Resultset seen in handle_rows() + */ if (csdata->instance->config.debug & MAXROWS_DEBUG_DECISIONS) { - MXS_NOTICE("OK or ERR"); + if (csdata->res.n_rows) + { + MXS_NOTICE("OK or ERR seen. The resultset has %lu rows.%s", + csdata->res.n_rows, + csdata->discard_resultset ? " [Discarded]" : ""); + } + else + { + MXS_NOTICE("OK or ERR"); + } + } + + if (csdata->discard_resultset) + { + rv = send_ok_upstream(csdata); + csdata->state = MAXROWS_EXPECTING_NOTHING; + } + else + { + rv = send_upstream(csdata); + csdata->state = MAXROWS_IGNORING_RESPONSE; } - rv = send_upstream(csdata); - csdata->state = MAXROWS_IGNORING_RESPONSE; break; case 0xfb: // GET_MORE_CLIENT_DATA/SEND_MORE_CLIENT_DATA @@ -634,7 +644,7 @@ static int handle_expecting_response(MAXROWS_SESSION_DATA *csdata) MYSQL_HEADER_LEN + 1, n_bytes - 1, &header[MYSQL_HEADER_LEN + 1]); csdata->res.n_totalfields = leint_value(&header[4]); - csdata->res.offset = MYSQL_HEADER_LEN + n_bytes; + csdata->res.offset += MYSQL_HEADER_LEN + n_bytes; csdata->state = MAXROWS_EXPECTING_FIELDS; rv = handle_expecting_fields(csdata); @@ -658,22 +668,29 @@ static int handle_expecting_response(MAXROWS_SESSION_DATA *csdata) */ static int handle_rows(MAXROWS_SESSION_DATA *csdata) { - ss_dassert(csdata->state == MAXROWS_EXPECTING_ROWS || csdata->state == MAXROWS_DISCARDING_RESPONSE); + ss_dassert(csdata->state == MAXROWS_EXPECTING_ROWS); ss_dassert(csdata->res.data); int rv = 1; - bool insufficient = false; - size_t buflen = gwbuf_length(csdata->res.data); + // reset large packet indicator + csdata->large_packet = false; while (!insufficient && (buflen - csdata->res.offset >= MYSQL_HEADER_LEN)) { - uint8_t header[MAXROWS_EOF_PACKET_LEN]; //it holds a full EOF packet + // header array holds a full EOF packet + uint8_t header[MAXROWS_EOF_PACKET_LEN]; gwbuf_copy_data(csdata->res.data, csdata->res.offset, MAXROWS_EOF_PACKET_LEN, header); size_t packetlen = MYSQL_HEADER_LEN + MYSQL_GET_PACKET_LEN(header); + // Mark the beginning of a large packet + if (packetlen >= MYSQL_PACKET_LENGTH_MAX) + { + csdata->large_packet = true; + } + if (csdata->res.offset + packetlen <= buflen) { // We have at least one complete packet. @@ -685,6 +702,9 @@ static int handle_rows(MAXROWS_SESSION_DATA *csdata) csdata->res.offset += packetlen; ss_dassert(csdata->res.offset == buflen); + // This is the end of resultset: set big packet var to false + csdata->large_packet = false; + if (csdata->instance->config.debug & MAXROWS_DEBUG_DECISIONS) { MXS_NOTICE("Error packet seen while handling result set"); @@ -692,7 +712,9 @@ static int handle_rows(MAXROWS_SESSION_DATA *csdata) /* * This is the ERR packet that could terminate a Multi-Resultset. */ - if (csdata->state == MAXROWS_DISCARDING_RESPONSE) + + // Send data in buffer or empty resultset + if (csdata->discard_resultset) { rv = send_ok_upstream(csdata); } @@ -700,6 +722,7 @@ static int handle_rows(MAXROWS_SESSION_DATA *csdata) { rv = send_upstream(csdata); } + csdata->state = MAXROWS_EXPECTING_NOTHING; break; @@ -707,7 +730,8 @@ static int handle_rows(MAXROWS_SESSION_DATA *csdata) case 0x0: // OK packet after the rows. /* OK could the last packet in the Multi-Resultset transmission: * handle DISCARD or send all the data. - * But it could also be sent instead of EOF from as in MySQL 5.7.5 + * + * It could also be sent instead of EOF from as in MySQL 5.7.5 * if client sends CLIENT_DEPRECATE_EOF capability OK packet could * have the SERVER_MORE_RESULTS_EXIST flag. * Note: Flags in the OK packet are at the same offset as in EOF. @@ -731,14 +755,25 @@ static int handle_rows(MAXROWS_SESSION_DATA *csdata) int flags = gw_mysql_get_byte2(header + MAXROWS_MYSQL_EOF_PACKET_FLAGS_OFFSET); + // Check whether the EOF terminates the resultset or indicates MORE_RESULTS if (!(flags & SERVER_MORE_RESULTS_EXIST)) { - if (csdata->instance->config.debug & MAXROWS_DEBUG_DECISIONS) + // End of the resultset + if (csdata->large_packet) { - MXS_NOTICE("OK or EOF packet seen terminating the resultset"); + // Reset large packet indicator + csdata->large_packet = false; } - if (csdata->state == MAXROWS_DISCARDING_RESPONSE) + if (csdata->instance->config.debug & MAXROWS_DEBUG_DECISIONS) + { + MXS_NOTICE("OK or EOF packet seen: the resultset has %lu rows.%s", + csdata->res.n_rows, + csdata->discard_resultset ? " [Discarded]" : ""); + } + + // Discard data or send data + if (csdata->discard_resultset) { rv = send_ok_upstream(csdata); } @@ -751,9 +786,28 @@ static int handle_rows(MAXROWS_SESSION_DATA *csdata) } else { + /* + * SERVER_MORE_RESULTS_EXIST flag is present: additional resultsets will come. + * + * Note: the OK packet that terminates the Multi-Resultset + * is handled by handle_expecting_response() + */ + + csdata->state = MAXROWS_EXPECTING_RESPONSE; + + // Increase res.n_rows counter at the beginning of a large packet + if (csdata->large_packet) + { + // Turn off large packet indicator + csdata->large_packet = false; + csdata->res.n_rows++; + } + if (csdata->instance->config.debug & MAXROWS_DEBUG_DECISIONS) { - MXS_NOTICE("EOF or OK packet seen with SERVER_MORE_RESULTS_EXIST flag: waiting for more data"); + MXS_NOTICE("EOF or OK packet seen with SERVER_MORE_RESULTS_EXIST flag:" + " waiting for more data (%lu rows so far)", + csdata->res.n_rows); } } @@ -762,18 +816,24 @@ static int handle_rows(MAXROWS_SESSION_DATA *csdata) case 0xfb: // NULL default: // length-encoded-string csdata->res.offset += packetlen; - ++csdata->res.n_rows; + // Increase res.n_rows counter while not receiving large packets + if (!csdata->large_packet) + { + csdata->res.n_rows++; + } - if (csdata->state != MAXROWS_DISCARDING_RESPONSE) + // Check for max_resultset_rows limit + if (!csdata->discard_resultset) { if (csdata->res.n_rows > csdata->instance->config.max_resultset_rows) { if (csdata->instance->config.debug & MAXROWS_DEBUG_DISCARDING) { - MXS_INFO("max_resultset_rows %lu reached, not returning the result.", csdata->res.n_rows); + MXS_INFO("max_resultset_rows %lu reached, not returning the resultset.", csdata->res.n_rows); } - csdata->state = MAXROWS_DISCARDING_RESPONSE; + // Set the discard indicator + csdata->discard_resultset = true; } } break;