MXS-1068: Maxrows filter doesn't count properly the result rows with size greater than16MBytes

16MBytes packets are now handled in MULTI result sets as well.
MAXROWS_DISCARDING_RESPONSE state has been removed.
Two new flags control large packets and result discarding
This commit is contained in:
MassimilianoPinto
2016-12-21 10:06:59 +01:00
parent 5d1b94de88
commit 6da835ee44

View File

@ -24,6 +24,8 @@
* 04/11/2016 Massimiliano Pinto Addition of SERVER_MORE_RESULTS_EXIST flag (0x0008) * 04/11/2016 Massimiliano Pinto Addition of SERVER_MORE_RESULTS_EXIST flag (0x0008)
* detection in handle_expecting_rows(). * detection in handle_expecting_rows().
* 07/11/2016 Massimiliano Pinto handle_expecting_rows renamed to handle_rows * 07/11/2016 Massimiliano Pinto handle_expecting_rows renamed to handle_rows
* 20/12/2016 Massimiliano Pinto csdata->res.n_rows counter works with MULTI_RESULT
* and large packets (> 16MB)
* *
* @endverbatim * @endverbatim
*/ */
@ -129,12 +131,11 @@ typedef struct maxrows_instance
typedef enum maxrows_session_state typedef enum maxrows_session_state
{ {
MAXROWS_EXPECTING_RESPONSE, // A select has been sent, and we are waiting for the response. MAXROWS_EXPECTING_RESPONSE = 1, // A select has been sent, and we are waiting for the response.
MAXROWS_EXPECTING_FIELDS, // A select has been sent, and we want more fields. MAXROWS_EXPECTING_FIELDS, // A select has been sent, and we want more fields.
MAXROWS_EXPECTING_ROWS, // A select has been sent, and we want more rows. MAXROWS_EXPECTING_ROWS, // A select has been sent, and we want more rows.
MAXROWS_EXPECTING_NOTHING, // We are not expecting anything from the server. MAXROWS_EXPECTING_NOTHING, // We are not expecting anything from the server.
MAXROWS_IGNORING_RESPONSE, // We are not interested in the data received from the server. MAXROWS_IGNORING_RESPONSE, // We are not interested in the data received from the server.
MAXROWS_DISCARDING_RESPONSE, // We have returned empty result set and we discard any new data.
} maxrows_session_state_t; } maxrows_session_state_t;
typedef struct maxrows_response_state typedef struct maxrows_response_state
@ -155,9 +156,9 @@ typedef struct maxrows_session_data
UPSTREAM up; /**< The next filter or equivalent. */ UPSTREAM up; /**< The next filter or equivalent. */
MAXROWS_RESPONSE_STATE res; /**< The response state. */ MAXROWS_RESPONSE_STATE res; /**< The response state. */
SESSION *session; /**< The session this data is associated with. */ SESSION *session; /**< The session this data is associated with. */
char *default_db; /**< The default database. */
char *use_db; /**< Pending default database. Needs server response. */
maxrows_session_state_t state; maxrows_session_state_t state;
bool large_packet; /**< Large packet (> 16MB)) indicator */
bool discard_resultset; /**< Discard resultset indicator */
} MAXROWS_SESSION_DATA; } MAXROWS_SESSION_DATA;
static MAXROWS_SESSION_DATA *maxrows_session_data_create(MAXROWS_INSTANCE *instance, SESSION *session); static MAXROWS_SESSION_DATA *maxrows_session_data_create(MAXROWS_INSTANCE *instance, SESSION *session);
@ -296,6 +297,8 @@ static int routeQuery(FILTER *instance, void *sdata, GWBUF *packet)
maxrows_response_state_reset(&csdata->res); maxrows_response_state_reset(&csdata->res);
csdata->state = MAXROWS_IGNORING_RESPONSE; csdata->state = MAXROWS_IGNORING_RESPONSE;
csdata->large_packet = false;
csdata->discard_resultset = false;
switch ((int)MYSQL_GET_COMMAND(data)) switch ((int)MYSQL_GET_COMMAND(data))
{ {
@ -342,7 +345,7 @@ static int clientReply(FILTER *instance, void *sdata, GWBUF *data)
if (csdata->state != MAXROWS_IGNORING_RESPONSE) if (csdata->state != MAXROWS_IGNORING_RESPONSE)
{ {
if (csdata->state != MAXROWS_DISCARDING_RESPONSE) if (!csdata->discard_resultset)
{ {
if (gwbuf_length(csdata->res.data) > csdata->instance->config.max_resultset_size) if (gwbuf_length(csdata->res.data) > csdata->instance->config.max_resultset_size)
{ {
@ -354,7 +357,7 @@ static int clientReply(FILTER *instance, void *sdata, GWBUF *data)
csdata->instance->config.max_resultset_size / 1024); csdata->instance->config.max_resultset_size / 1024);
} }
csdata->state = MAXROWS_DISCARDING_RESPONSE; csdata->discard_resultset = true;
} }
} }
} }
@ -373,7 +376,6 @@ static int clientReply(FILTER *instance, void *sdata, GWBUF *data)
rv = handle_expecting_response(csdata); rv = handle_expecting_response(csdata);
break; break;
case MAXROWS_DISCARDING_RESPONSE:
case MAXROWS_EXPECTING_ROWS: case MAXROWS_EXPECTING_ROWS:
rv = handle_rows(csdata); rv = handle_rows(csdata);
break; break;
@ -419,7 +421,7 @@ static void diagnostics(FILTER *instance, void *sdata, DCB *dcb)
*/ */
static uint64_t getCapabilities(void) static uint64_t getCapabilities(void)
{ {
return RCAP_TYPE_STMT_INPUT | RCAP_TYPE_CONTIGUOUS_OUTPUT; return RCAP_TYPE_STMT_INPUT | RCAP_TYPE_STMT_OUTPUT;
} }
/* API END */ /* API END */
@ -452,29 +454,13 @@ static MAXROWS_SESSION_DATA *maxrows_session_data_create(MAXROWS_INSTANCE *insta
if (data) if (data)
{ {
char *default_db = NULL;
ss_dassert(session->client_dcb); ss_dassert(session->client_dcb);
ss_dassert(session->client_dcb->data); ss_dassert(session->client_dcb->data);
MYSQL_session *mysql_session = (MYSQL_session*)session->client_dcb->data; MYSQL_session *mysql_session = (MYSQL_session*)session->client_dcb->data;
data->instance = instance;
if (mysql_session->db[0] != 0) data->session = session;
{ data->state = MAXROWS_EXPECTING_NOTHING;
default_db = MXS_STRDUP(mysql_session->db);
}
if ((mysql_session->db[0] == 0) || default_db)
{
data->instance = instance;
data->session = session;
data->state = MAXROWS_EXPECTING_NOTHING;
data->default_db = default_db;
}
else
{
MXS_FREE(data);
data = NULL;
}
} }
return data; return data;
@ -489,11 +475,6 @@ static void maxrows_session_data_free(MAXROWS_SESSION_DATA* data)
{ {
if (data) if (data)
{ {
// In normal circumstances, only data->default_db may be non-NULL at
// this point. However, if the authentication with the backend fails
// and the session is closed, data->use_db may be non-NULL.
MXS_FREE(data->use_db);
MXS_FREE(data->default_db);
MXS_FREE(data); MXS_FREE(data);
} }
} }
@ -577,26 +558,55 @@ static int handle_expecting_response(MAXROWS_SESSION_DATA *csdata)
ss_dassert(csdata->res.data); ss_dassert(csdata->res.data);
int rv = 1; int rv = 1;
size_t buflen = gwbuf_length(csdata->res.data); size_t buflen = gwbuf_length(csdata->res.data);
// Reset field counters
csdata->res.n_fields = 0;
csdata->res.n_totalfields = 0;
// Reset large packet var
csdata->large_packet = false;
if (buflen >= MYSQL_HEADER_LEN + 1) // We need the command byte. if (buflen >= MYSQL_HEADER_LEN + 1) // We need the command byte.
{ {
// Reserve enough space to accomodate for the largest length encoded integer, // Reserve enough space to accomodate for the largest length encoded integer,
// which is type field + 8 bytes. // which is type field + 8 bytes.
uint8_t header[MYSQL_HEADER_LEN + 1 + 8]; uint8_t header[MYSQL_HEADER_LEN + 1 + 8];
gwbuf_copy_data(csdata->res.data, 0, MYSQL_HEADER_LEN + 1, header);
// Read packet header from buffer at current offset
gwbuf_copy_data(csdata->res.data, csdata->res.offset, MYSQL_HEADER_LEN + 1, header);
switch ((int)MYSQL_GET_COMMAND(header)) switch ((int)MYSQL_GET_COMMAND(header))
{ {
case 0x00: // OK case 0x00: // OK
case 0xff: // ERR case 0xff: // ERR
/**
* This also handles the OK packet that terminates
* a Multi-Resultset seen in handle_rows()
*/
if (csdata->instance->config.debug & MAXROWS_DEBUG_DECISIONS) if (csdata->instance->config.debug & MAXROWS_DEBUG_DECISIONS)
{ {
MXS_NOTICE("OK or ERR"); if (csdata->res.n_rows)
{
MXS_NOTICE("OK or ERR seen. The resultset has %lu rows.%s",
csdata->res.n_rows,
csdata->discard_resultset ? " [Discarded]" : "");
}
else
{
MXS_NOTICE("OK or ERR");
}
}
if (csdata->discard_resultset)
{
rv = send_ok_upstream(csdata);
csdata->state = MAXROWS_EXPECTING_NOTHING;
}
else
{
rv = send_upstream(csdata);
csdata->state = MAXROWS_IGNORING_RESPONSE;
} }
rv = send_upstream(csdata);
csdata->state = MAXROWS_IGNORING_RESPONSE;
break; break;
case 0xfb: // GET_MORE_CLIENT_DATA/SEND_MORE_CLIENT_DATA case 0xfb: // GET_MORE_CLIENT_DATA/SEND_MORE_CLIENT_DATA
@ -634,7 +644,7 @@ static int handle_expecting_response(MAXROWS_SESSION_DATA *csdata)
MYSQL_HEADER_LEN + 1, n_bytes - 1, &header[MYSQL_HEADER_LEN + 1]); MYSQL_HEADER_LEN + 1, n_bytes - 1, &header[MYSQL_HEADER_LEN + 1]);
csdata->res.n_totalfields = leint_value(&header[4]); csdata->res.n_totalfields = leint_value(&header[4]);
csdata->res.offset = MYSQL_HEADER_LEN + n_bytes; csdata->res.offset += MYSQL_HEADER_LEN + n_bytes;
csdata->state = MAXROWS_EXPECTING_FIELDS; csdata->state = MAXROWS_EXPECTING_FIELDS;
rv = handle_expecting_fields(csdata); rv = handle_expecting_fields(csdata);
@ -658,22 +668,29 @@ static int handle_expecting_response(MAXROWS_SESSION_DATA *csdata)
*/ */
static int handle_rows(MAXROWS_SESSION_DATA *csdata) static int handle_rows(MAXROWS_SESSION_DATA *csdata)
{ {
ss_dassert(csdata->state == MAXROWS_EXPECTING_ROWS || csdata->state == MAXROWS_DISCARDING_RESPONSE); ss_dassert(csdata->state == MAXROWS_EXPECTING_ROWS);
ss_dassert(csdata->res.data); ss_dassert(csdata->res.data);
int rv = 1; int rv = 1;
bool insufficient = false; bool insufficient = false;
size_t buflen = gwbuf_length(csdata->res.data); size_t buflen = gwbuf_length(csdata->res.data);
// reset large packet indicator
csdata->large_packet = false;
while (!insufficient && (buflen - csdata->res.offset >= MYSQL_HEADER_LEN)) while (!insufficient && (buflen - csdata->res.offset >= MYSQL_HEADER_LEN))
{ {
uint8_t header[MAXROWS_EOF_PACKET_LEN]; //it holds a full EOF packet // header array holds a full EOF packet
uint8_t header[MAXROWS_EOF_PACKET_LEN];
gwbuf_copy_data(csdata->res.data, csdata->res.offset, MAXROWS_EOF_PACKET_LEN, header); gwbuf_copy_data(csdata->res.data, csdata->res.offset, MAXROWS_EOF_PACKET_LEN, header);
size_t packetlen = MYSQL_HEADER_LEN + MYSQL_GET_PACKET_LEN(header); size_t packetlen = MYSQL_HEADER_LEN + MYSQL_GET_PACKET_LEN(header);
// Mark the beginning of a large packet
if (packetlen >= MYSQL_PACKET_LENGTH_MAX)
{
csdata->large_packet = true;
}
if (csdata->res.offset + packetlen <= buflen) if (csdata->res.offset + packetlen <= buflen)
{ {
// We have at least one complete packet. // We have at least one complete packet.
@ -685,6 +702,9 @@ static int handle_rows(MAXROWS_SESSION_DATA *csdata)
csdata->res.offset += packetlen; csdata->res.offset += packetlen;
ss_dassert(csdata->res.offset == buflen); ss_dassert(csdata->res.offset == buflen);
// This is the end of resultset: set big packet var to false
csdata->large_packet = false;
if (csdata->instance->config.debug & MAXROWS_DEBUG_DECISIONS) if (csdata->instance->config.debug & MAXROWS_DEBUG_DECISIONS)
{ {
MXS_NOTICE("Error packet seen while handling result set"); MXS_NOTICE("Error packet seen while handling result set");
@ -692,7 +712,9 @@ static int handle_rows(MAXROWS_SESSION_DATA *csdata)
/* /*
* This is the ERR packet that could terminate a Multi-Resultset. * This is the ERR packet that could terminate a Multi-Resultset.
*/ */
if (csdata->state == MAXROWS_DISCARDING_RESPONSE)
// Send data in buffer or empty resultset
if (csdata->discard_resultset)
{ {
rv = send_ok_upstream(csdata); rv = send_ok_upstream(csdata);
} }
@ -700,6 +722,7 @@ static int handle_rows(MAXROWS_SESSION_DATA *csdata)
{ {
rv = send_upstream(csdata); rv = send_upstream(csdata);
} }
csdata->state = MAXROWS_EXPECTING_NOTHING; csdata->state = MAXROWS_EXPECTING_NOTHING;
break; break;
@ -707,7 +730,8 @@ static int handle_rows(MAXROWS_SESSION_DATA *csdata)
case 0x0: // OK packet after the rows. case 0x0: // OK packet after the rows.
/* OK could the last packet in the Multi-Resultset transmission: /* OK could the last packet in the Multi-Resultset transmission:
* handle DISCARD or send all the data. * handle DISCARD or send all the data.
* But it could also be sent instead of EOF from as in MySQL 5.7.5 *
* It could also be sent instead of EOF from as in MySQL 5.7.5
* if client sends CLIENT_DEPRECATE_EOF capability OK packet could * if client sends CLIENT_DEPRECATE_EOF capability OK packet could
* have the SERVER_MORE_RESULTS_EXIST flag. * have the SERVER_MORE_RESULTS_EXIST flag.
* Note: Flags in the OK packet are at the same offset as in EOF. * Note: Flags in the OK packet are at the same offset as in EOF.
@ -731,14 +755,25 @@ static int handle_rows(MAXROWS_SESSION_DATA *csdata)
int flags = gw_mysql_get_byte2(header + MAXROWS_MYSQL_EOF_PACKET_FLAGS_OFFSET); int flags = gw_mysql_get_byte2(header + MAXROWS_MYSQL_EOF_PACKET_FLAGS_OFFSET);
// Check whether the EOF terminates the resultset or indicates MORE_RESULTS
if (!(flags & SERVER_MORE_RESULTS_EXIST)) if (!(flags & SERVER_MORE_RESULTS_EXIST))
{ {
if (csdata->instance->config.debug & MAXROWS_DEBUG_DECISIONS) // End of the resultset
if (csdata->large_packet)
{ {
MXS_NOTICE("OK or EOF packet seen terminating the resultset"); // Reset large packet indicator
csdata->large_packet = false;
} }
if (csdata->state == MAXROWS_DISCARDING_RESPONSE) if (csdata->instance->config.debug & MAXROWS_DEBUG_DECISIONS)
{
MXS_NOTICE("OK or EOF packet seen: the resultset has %lu rows.%s",
csdata->res.n_rows,
csdata->discard_resultset ? " [Discarded]" : "");
}
// Discard data or send data
if (csdata->discard_resultset)
{ {
rv = send_ok_upstream(csdata); rv = send_ok_upstream(csdata);
} }
@ -751,9 +786,28 @@ static int handle_rows(MAXROWS_SESSION_DATA *csdata)
} }
else else
{ {
/*
* SERVER_MORE_RESULTS_EXIST flag is present: additional resultsets will come.
*
* Note: the OK packet that terminates the Multi-Resultset
* is handled by handle_expecting_response()
*/
csdata->state = MAXROWS_EXPECTING_RESPONSE;
// Increase res.n_rows counter at the beginning of a large packet
if (csdata->large_packet)
{
// Turn off large packet indicator
csdata->large_packet = false;
csdata->res.n_rows++;
}
if (csdata->instance->config.debug & MAXROWS_DEBUG_DECISIONS) if (csdata->instance->config.debug & MAXROWS_DEBUG_DECISIONS)
{ {
MXS_NOTICE("EOF or OK packet seen with SERVER_MORE_RESULTS_EXIST flag: waiting for more data"); MXS_NOTICE("EOF or OK packet seen with SERVER_MORE_RESULTS_EXIST flag:"
" waiting for more data (%lu rows so far)",
csdata->res.n_rows);
} }
} }
@ -762,18 +816,24 @@ static int handle_rows(MAXROWS_SESSION_DATA *csdata)
case 0xfb: // NULL case 0xfb: // NULL
default: // length-encoded-string default: // length-encoded-string
csdata->res.offset += packetlen; csdata->res.offset += packetlen;
++csdata->res.n_rows; // Increase res.n_rows counter while not receiving large packets
if (!csdata->large_packet)
{
csdata->res.n_rows++;
}
if (csdata->state != MAXROWS_DISCARDING_RESPONSE) // Check for max_resultset_rows limit
if (!csdata->discard_resultset)
{ {
if (csdata->res.n_rows > csdata->instance->config.max_resultset_rows) if (csdata->res.n_rows > csdata->instance->config.max_resultset_rows)
{ {
if (csdata->instance->config.debug & MAXROWS_DEBUG_DISCARDING) if (csdata->instance->config.debug & MAXROWS_DEBUG_DISCARDING)
{ {
MXS_INFO("max_resultset_rows %lu reached, not returning the result.", csdata->res.n_rows); MXS_INFO("max_resultset_rows %lu reached, not returning the resultset.", csdata->res.n_rows);
} }
csdata->state = MAXROWS_DISCARDING_RESPONSE; // Set the discard indicator
csdata->discard_resultset = true;
} }
} }
break; break;