MXS-1211: maxrows should be configurable to return error when limit has been exceeded
New parameter added to maxsrows filter: max_resultset_return=empty|error|ok Default, 'empty' is to return an empty set, as the current implementation. 'err' will return an ERR reply with the input SQL statement 'ok' will return an OK packet
This commit is contained in:
@ -74,6 +74,7 @@ MXS_BEGIN_DECLS
|
|||||||
#define MYSQL_CHECKSUM_LEN 4
|
#define MYSQL_CHECKSUM_LEN 4
|
||||||
#define MYSQL_EOF_PACKET_LEN 9
|
#define MYSQL_EOF_PACKET_LEN 9
|
||||||
#define MYSQL_OK_PACKET_MIN_LEN 11
|
#define MYSQL_OK_PACKET_MIN_LEN 11
|
||||||
|
#define MYSQL_ERR_PACKET_MIN_LEN 9
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Offsets and sizes of various parts of the client packet. If the offset is
|
* Offsets and sizes of various parts of the client packet. If the offset is
|
||||||
|
@ -46,17 +46,47 @@
|
|||||||
#include <maxscale/debug.h>
|
#include <maxscale/debug.h>
|
||||||
#include "maxrows.h"
|
#include "maxrows.h"
|
||||||
|
|
||||||
static MXS_FILTER *createInstance(const char *name, char **options, MXS_CONFIG_PARAMETER *);
|
static MXS_FILTER *createInstance(const char *name,
|
||||||
static MXS_FILTER_SESSION *newSession(MXS_FILTER *instance, MXS_SESSION *session);
|
char **options,
|
||||||
static void closeSession(MXS_FILTER *instance, MXS_FILTER_SESSION *sdata);
|
MXS_CONFIG_PARAMETER *);
|
||||||
static void freeSession(MXS_FILTER *instance, MXS_FILTER_SESSION *sdata);
|
static MXS_FILTER_SESSION *newSession(MXS_FILTER *instance,
|
||||||
static void setDownstream(MXS_FILTER *instance, MXS_FILTER_SESSION *sdata, MXS_DOWNSTREAM *downstream);
|
MXS_SESSION *session);
|
||||||
static void setUpstream(MXS_FILTER *instance, MXS_FILTER_SESSION *sdata, MXS_UPSTREAM *upstream);
|
static void closeSession(MXS_FILTER *instance,
|
||||||
static int routeQuery(MXS_FILTER *instance, MXS_FILTER_SESSION *sdata, GWBUF *queue);
|
MXS_FILTER_SESSION *sdata);
|
||||||
static int clientReply(MXS_FILTER *instance, MXS_FILTER_SESSION *sdata, GWBUF *queue);
|
static void freeSession(MXS_FILTER *instance,
|
||||||
static void diagnostics(MXS_FILTER *instance, MXS_FILTER_SESSION *sdata, DCB *dcb);
|
MXS_FILTER_SESSION *sdata);
|
||||||
|
static void setDownstream(MXS_FILTER *instance,
|
||||||
|
MXS_FILTER_SESSION *sdata,
|
||||||
|
MXS_DOWNSTREAM *downstream);
|
||||||
|
static void setUpstream(MXS_FILTER *instance,
|
||||||
|
MXS_FILTER_SESSION *sdata,
|
||||||
|
MXS_UPSTREAM *upstream);
|
||||||
|
static int routeQuery(MXS_FILTER *instance,
|
||||||
|
MXS_FILTER_SESSION *sdata,
|
||||||
|
GWBUF *queue);
|
||||||
|
static int clientReply(MXS_FILTER *instance,
|
||||||
|
MXS_FILTER_SESSION *sdata,
|
||||||
|
GWBUF *queue);
|
||||||
|
static void diagnostics(MXS_FILTER *instance,
|
||||||
|
MXS_FILTER_SESSION *sdata,
|
||||||
|
DCB *dcb);
|
||||||
static uint64_t getCapabilities(MXS_FILTER *instance);
|
static uint64_t getCapabilities(MXS_FILTER *instance);
|
||||||
|
|
||||||
|
enum maxrows_return_mode
|
||||||
|
{
|
||||||
|
MAXROWS_RETURN_EMPTY = 0,
|
||||||
|
MAXROWS_RETURN_ERR,
|
||||||
|
MAXROWS_RETURN_OK
|
||||||
|
};
|
||||||
|
|
||||||
|
static const MXS_ENUM_VALUE return_option_values[] =
|
||||||
|
{
|
||||||
|
{"empty", MAXROWS_RETURN_EMPTY},
|
||||||
|
{"error", MAXROWS_RETURN_ERR},
|
||||||
|
{"ok", MAXROWS_RETURN_OK},
|
||||||
|
{NULL}
|
||||||
|
};
|
||||||
|
|
||||||
/* Global symbols of the Module */
|
/* Global symbols of the Module */
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -109,6 +139,13 @@ MXS_MODULE* MXS_CREATE_MODULE()
|
|||||||
MXS_MODULE_PARAM_COUNT,
|
MXS_MODULE_PARAM_COUNT,
|
||||||
MAXROWS_DEFAULT_DEBUG
|
MAXROWS_DEFAULT_DEBUG
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"max_resultset_return",
|
||||||
|
MXS_MODULE_PARAM_ENUM,
|
||||||
|
"empty",
|
||||||
|
MXS_MODULE_OPT_ENUM_UNIQUE,
|
||||||
|
return_option_values
|
||||||
|
},
|
||||||
{MXS_END_MODULE_PARAMS}
|
{MXS_END_MODULE_PARAMS}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -120,9 +157,10 @@ MXS_MODULE* MXS_CREATE_MODULE()
|
|||||||
|
|
||||||
typedef struct maxrows_config
|
typedef struct maxrows_config
|
||||||
{
|
{
|
||||||
uint32_t max_resultset_rows;
|
uint32_t max_resultset_rows;
|
||||||
uint32_t max_resultset_size;
|
uint32_t max_resultset_size;
|
||||||
uint32_t debug;
|
uint32_t debug;
|
||||||
|
enum maxrows_return_mode m_return;
|
||||||
} MAXROWS_CONFIG;
|
} MAXROWS_CONFIG;
|
||||||
|
|
||||||
typedef struct maxrows_instance
|
typedef struct maxrows_instance
|
||||||
@ -154,7 +192,7 @@ static void maxrows_response_state_reset(MAXROWS_RESPONSE_STATE *state);
|
|||||||
|
|
||||||
typedef struct maxrows_session_data
|
typedef struct maxrows_session_data
|
||||||
{
|
{
|
||||||
MAXROWS_INSTANCE *instance; /**< The maxrows instance the session is associated with. */
|
MAXROWS_INSTANCE *instance; /**< The maxrows instance the session is associated with. */
|
||||||
MXS_DOWNSTREAM down; /**< The previous filter or equivalent. */
|
MXS_DOWNSTREAM down; /**< The previous filter or equivalent. */
|
||||||
MXS_UPSTREAM up; /**< The next filter or equivalent. */
|
MXS_UPSTREAM up; /**< The next filter or equivalent. */
|
||||||
MAXROWS_RESPONSE_STATE res; /**< The response state. */
|
MAXROWS_RESPONSE_STATE res; /**< The response state. */
|
||||||
@ -162,9 +200,11 @@ typedef struct maxrows_session_data
|
|||||||
maxrows_session_state_t state;
|
maxrows_session_state_t state;
|
||||||
bool large_packet; /**< Large packet (> 16MB)) indicator */
|
bool large_packet; /**< Large packet (> 16MB)) indicator */
|
||||||
bool discard_resultset; /**< Discard resultset indicator */
|
bool discard_resultset; /**< Discard resultset indicator */
|
||||||
|
GWBUF *input_sql; /**< Input query */
|
||||||
} MAXROWS_SESSION_DATA;
|
} MAXROWS_SESSION_DATA;
|
||||||
|
|
||||||
static MAXROWS_SESSION_DATA *maxrows_session_data_create(MAXROWS_INSTANCE *instance, MXS_SESSION *session);
|
static MAXROWS_SESSION_DATA *maxrows_session_data_create(MAXROWS_INSTANCE *instance,
|
||||||
|
MXS_SESSION *session);
|
||||||
static void maxrows_session_data_free(MAXROWS_SESSION_DATA *data);
|
static void maxrows_session_data_free(MAXROWS_SESSION_DATA *data);
|
||||||
|
|
||||||
static int handle_expecting_fields(MAXROWS_SESSION_DATA *csdata);
|
static int handle_expecting_fields(MAXROWS_SESSION_DATA *csdata);
|
||||||
@ -172,10 +212,14 @@ static int handle_expecting_nothing(MAXROWS_SESSION_DATA *csdata);
|
|||||||
static int handle_expecting_response(MAXROWS_SESSION_DATA *csdata);
|
static int handle_expecting_response(MAXROWS_SESSION_DATA *csdata);
|
||||||
static int handle_rows(MAXROWS_SESSION_DATA *csdata);
|
static int handle_rows(MAXROWS_SESSION_DATA *csdata);
|
||||||
static int handle_ignoring_response(MAXROWS_SESSION_DATA *csdata);
|
static int handle_ignoring_response(MAXROWS_SESSION_DATA *csdata);
|
||||||
static bool process_params(char **options, MXS_CONFIG_PARAMETER *params, MAXROWS_CONFIG* config);
|
static bool process_params(char **options,
|
||||||
|
MXS_CONFIG_PARAMETER *params,
|
||||||
|
MAXROWS_CONFIG* config);
|
||||||
|
|
||||||
static int send_upstream(MAXROWS_SESSION_DATA *csdata);
|
static int send_upstream(MAXROWS_SESSION_DATA *csdata);
|
||||||
static int send_eof_upstream(MAXROWS_SESSION_DATA *csdata, size_t offset);
|
static int send_eof_upstream(MAXROWS_SESSION_DATA *csdata);
|
||||||
|
static int send_error_upstream(MAXROWS_SESSION_DATA *csdata);
|
||||||
|
static int send_maxrows_reply_limit(MAXROWS_SESSION_DATA *csdata);
|
||||||
|
|
||||||
/* API BEGIN */
|
/* API BEGIN */
|
||||||
|
|
||||||
@ -189,15 +233,22 @@ static int send_eof_upstream(MAXROWS_SESSION_DATA *csdata, size_t offset);
|
|||||||
*
|
*
|
||||||
* @return The instance data for this new instance
|
* @return The instance data for this new instance
|
||||||
*/
|
*/
|
||||||
static MXS_FILTER *createInstance(const char *name, char **options, MXS_CONFIG_PARAMETER *params)
|
static MXS_FILTER *createInstance(const char *name,
|
||||||
|
char **options,
|
||||||
|
MXS_CONFIG_PARAMETER *params)
|
||||||
{
|
{
|
||||||
MAXROWS_INSTANCE *cinstance = MXS_CALLOC(1, sizeof(MAXROWS_INSTANCE));
|
MAXROWS_INSTANCE *cinstance = MXS_CALLOC(1, sizeof(MAXROWS_INSTANCE));
|
||||||
|
|
||||||
if (cinstance)
|
if (cinstance)
|
||||||
{
|
{
|
||||||
cinstance->name = name;
|
cinstance->name = name;
|
||||||
cinstance->config.max_resultset_rows = config_get_integer(params, "max_resultset_rows");
|
cinstance->config.max_resultset_rows = config_get_integer(params,
|
||||||
cinstance->config.max_resultset_size = config_get_integer(params, "max_resultset_size");
|
"max_resultset_rows");
|
||||||
|
cinstance->config.max_resultset_size = config_get_integer(params,
|
||||||
|
"max_resultset_size");
|
||||||
|
cinstance->config.m_return = config_get_enum(params,
|
||||||
|
"max_resultset_return",
|
||||||
|
return_option_values);
|
||||||
cinstance->config.debug = config_get_integer(params, "debug");
|
cinstance->config.debug = config_get_integer(params, "debug");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -283,7 +334,9 @@ static void setUpstream(MXS_FILTER *instance, MXS_FILTER_SESSION *sdata, MXS_UPS
|
|||||||
* @param sdata The filter session data
|
* @param sdata The filter session data
|
||||||
* @param buffer Buffer containing an MySQL protocol packet.
|
* @param buffer Buffer containing an MySQL protocol packet.
|
||||||
*/
|
*/
|
||||||
static int routeQuery(MXS_FILTER *instance, MXS_FILTER_SESSION *sdata, GWBUF *packet)
|
static int routeQuery(MXS_FILTER *instance,
|
||||||
|
MXS_FILTER_SESSION *sdata,
|
||||||
|
GWBUF *packet)
|
||||||
{
|
{
|
||||||
MAXROWS_INSTANCE *cinstance = (MAXROWS_INSTANCE*)instance;
|
MAXROWS_INSTANCE *cinstance = (MAXROWS_INSTANCE*)instance;
|
||||||
MAXROWS_SESSION_DATA *csdata = (MAXROWS_SESSION_DATA*)sdata;
|
MAXROWS_SESSION_DATA *csdata = (MAXROWS_SESSION_DATA*)sdata;
|
||||||
@ -293,7 +346,8 @@ static int routeQuery(MXS_FILTER *instance, MXS_FILTER_SESSION *sdata, GWBUF *pa
|
|||||||
// All of these should be guaranteed by RCAP_TYPE_TRANSACTION_TRACKING
|
// All of these should be guaranteed by RCAP_TYPE_TRANSACTION_TRACKING
|
||||||
ss_dassert(GWBUF_IS_CONTIGUOUS(packet));
|
ss_dassert(GWBUF_IS_CONTIGUOUS(packet));
|
||||||
ss_dassert(GWBUF_LENGTH(packet) >= MYSQL_HEADER_LEN + 1);
|
ss_dassert(GWBUF_LENGTH(packet) >= MYSQL_HEADER_LEN + 1);
|
||||||
ss_dassert(MYSQL_GET_PAYLOAD_LEN(data) + MYSQL_HEADER_LEN == GWBUF_LENGTH(packet));
|
ss_dassert(MYSQL_GET_PAYLOAD_LEN(data) +
|
||||||
|
MYSQL_HEADER_LEN == GWBUF_LENGTH(packet));
|
||||||
|
|
||||||
maxrows_response_state_reset(&csdata->res);
|
maxrows_response_state_reset(&csdata->res);
|
||||||
csdata->state = MAXROWS_IGNORING_RESPONSE;
|
csdata->state = MAXROWS_IGNORING_RESPONSE;
|
||||||
@ -305,6 +359,23 @@ static int routeQuery(MXS_FILTER *instance, MXS_FILTER_SESSION *sdata, GWBUF *pa
|
|||||||
case MYSQL_COM_QUERY:
|
case MYSQL_COM_QUERY:
|
||||||
case MYSQL_COM_STMT_EXECUTE:
|
case MYSQL_COM_STMT_EXECUTE:
|
||||||
{
|
{
|
||||||
|
/* Set input query only with MAXROWS_RETURN_ERR */
|
||||||
|
if (csdata->instance->config.m_return == MAXROWS_RETURN_ERR &&
|
||||||
|
(csdata->input_sql = gwbuf_clone(packet)) == NULL)
|
||||||
|
{
|
||||||
|
csdata->state = MAXROWS_EXPECTING_NOTHING;
|
||||||
|
|
||||||
|
/* Abort client connection on copy failure */
|
||||||
|
poll_fake_hangup_event(csdata->session->client_dcb);
|
||||||
|
gwbuf_free(csdata->res.data);
|
||||||
|
gwbuf_free(packet);
|
||||||
|
MXS_FREE(csdata);
|
||||||
|
csdata->res.data = NULL;
|
||||||
|
packet = NULL;
|
||||||
|
csdata = NULL;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
csdata->state = MAXROWS_EXPECTING_RESPONSE;
|
csdata->state = MAXROWS_EXPECTING_RESPONSE;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -318,7 +389,9 @@ static int routeQuery(MXS_FILTER *instance, MXS_FILTER_SESSION *sdata, GWBUF *pa
|
|||||||
MXS_NOTICE("Maxrows filter is sending data.");
|
MXS_NOTICE("Maxrows filter is sending data.");
|
||||||
}
|
}
|
||||||
|
|
||||||
return csdata->down.routeQuery(csdata->down.instance, csdata->down.session, packet);
|
return csdata->down.routeQuery(csdata->down.instance,
|
||||||
|
csdata->down.session,
|
||||||
|
packet);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -328,7 +401,9 @@ static int routeQuery(MXS_FILTER *instance, MXS_FILTER_SESSION *sdata, GWBUF *pa
|
|||||||
* @param sdata The filter session data
|
* @param sdata The filter session data
|
||||||
* @param queue The query data
|
* @param queue The query data
|
||||||
*/
|
*/
|
||||||
static int clientReply(MXS_FILTER *instance, MXS_FILTER_SESSION *sdata, GWBUF *data)
|
static int clientReply(MXS_FILTER *instance,
|
||||||
|
MXS_FILTER_SESSION *sdata,
|
||||||
|
GWBUF *data)
|
||||||
{
|
{
|
||||||
MAXROWS_INSTANCE *cinstance = (MAXROWS_INSTANCE*)instance;
|
MAXROWS_INSTANCE *cinstance = (MAXROWS_INSTANCE*)instance;
|
||||||
MAXROWS_SESSION_DATA *csdata = (MAXROWS_SESSION_DATA*)sdata;
|
MAXROWS_SESSION_DATA *csdata = (MAXROWS_SESSION_DATA*)sdata;
|
||||||
@ -386,7 +461,8 @@ static int clientReply(MXS_FILTER *instance, MXS_FILTER_SESSION *sdata, GWBUF *d
|
|||||||
break;
|
break;
|
||||||
|
|
||||||
default:
|
default:
|
||||||
MXS_ERROR("Internal filter logic broken, unexpected state: %d", csdata->state);
|
MXS_ERROR("Internal filter logic broken, unexpected state: %d",
|
||||||
|
csdata->state);
|
||||||
ss_dassert(!true);
|
ss_dassert(!true);
|
||||||
rv = send_upstream(csdata);
|
rv = send_upstream(csdata);
|
||||||
maxrows_response_state_reset(&csdata->res);
|
maxrows_response_state_reset(&csdata->res);
|
||||||
@ -462,6 +538,7 @@ static MAXROWS_SESSION_DATA *maxrows_session_data_create(MAXROWS_INSTANCE *insta
|
|||||||
MYSQL_session *mysql_session = (MYSQL_session*)session->client_dcb->data;
|
MYSQL_session *mysql_session = (MYSQL_session*)session->client_dcb->data;
|
||||||
data->instance = instance;
|
data->instance = instance;
|
||||||
data->session = session;
|
data->session = session;
|
||||||
|
data->input_sql = NULL;
|
||||||
data->state = MAXROWS_EXPECTING_NOTHING;
|
data->state = MAXROWS_EXPECTING_NOTHING;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -500,7 +577,10 @@ static int handle_expecting_fields(MAXROWS_SESSION_DATA *csdata)
|
|||||||
while (!insufficient && (buflen - csdata->res.offset >= MYSQL_HEADER_LEN))
|
while (!insufficient && (buflen - csdata->res.offset >= MYSQL_HEADER_LEN))
|
||||||
{
|
{
|
||||||
uint8_t header[MYSQL_HEADER_LEN + 1];
|
uint8_t header[MYSQL_HEADER_LEN + 1];
|
||||||
gwbuf_copy_data(csdata->res.data, csdata->res.offset, MYSQL_HEADER_LEN + 1, header);
|
gwbuf_copy_data(csdata->res.data,
|
||||||
|
csdata->res.offset,
|
||||||
|
MYSQL_HEADER_LEN + 1,
|
||||||
|
header);
|
||||||
|
|
||||||
size_t packetlen = MYSQL_HEADER_LEN + MYSQL_GET_PAYLOAD_LEN(header);
|
size_t packetlen = MYSQL_HEADER_LEN + MYSQL_GET_PAYLOAD_LEN(header);
|
||||||
|
|
||||||
@ -584,7 +664,10 @@ static int handle_expecting_response(MAXROWS_SESSION_DATA *csdata)
|
|||||||
uint8_t header[MYSQL_HEADER_LEN + 1 + 8];
|
uint8_t header[MYSQL_HEADER_LEN + 1 + 8];
|
||||||
|
|
||||||
// Read packet header from buffer at current offset
|
// Read packet header from buffer at current offset
|
||||||
gwbuf_copy_data(csdata->res.data, csdata->res.offset, MYSQL_HEADER_LEN + 1, header);
|
gwbuf_copy_data(csdata->res.data,
|
||||||
|
csdata->res.offset,
|
||||||
|
MYSQL_HEADER_LEN + 1,
|
||||||
|
header);
|
||||||
|
|
||||||
switch ((int)MYSQL_GET_COMMAND(header))
|
switch ((int)MYSQL_GET_COMMAND(header))
|
||||||
{
|
{
|
||||||
@ -610,7 +693,7 @@ static int handle_expecting_response(MAXROWS_SESSION_DATA *csdata)
|
|||||||
|
|
||||||
if (csdata->discard_resultset)
|
if (csdata->discard_resultset)
|
||||||
{
|
{
|
||||||
rv = send_eof_upstream(csdata, csdata->res.rows_offset);
|
rv = send_maxrows_reply_limit(csdata);
|
||||||
csdata->state = MAXROWS_EXPECTING_NOTHING;
|
csdata->state = MAXROWS_EXPECTING_NOTHING;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -652,7 +735,9 @@ static int handle_expecting_response(MAXROWS_SESSION_DATA *csdata)
|
|||||||
// Now we can figure out how many fields there are, but first we
|
// Now we can figure out how many fields there are, but first we
|
||||||
// need to copy some more data.
|
// need to copy some more data.
|
||||||
gwbuf_copy_data(csdata->res.data,
|
gwbuf_copy_data(csdata->res.data,
|
||||||
MYSQL_HEADER_LEN + 1, n_bytes - 1, &header[MYSQL_HEADER_LEN + 1]);
|
MYSQL_HEADER_LEN + 1,
|
||||||
|
n_bytes - 1,
|
||||||
|
&header[MYSQL_HEADER_LEN + 1]);
|
||||||
|
|
||||||
csdata->res.n_totalfields = mxs_leint_value(&header[4]);
|
csdata->res.n_totalfields = mxs_leint_value(&header[4]);
|
||||||
csdata->res.offset += MYSQL_HEADER_LEN + n_bytes;
|
csdata->res.offset += MYSQL_HEADER_LEN + n_bytes;
|
||||||
@ -691,7 +776,10 @@ static int handle_rows(MAXROWS_SESSION_DATA *csdata)
|
|||||||
bool pending_large_data = csdata->large_packet;
|
bool pending_large_data = csdata->large_packet;
|
||||||
// header array holds a full EOF packet
|
// header array holds a full EOF packet
|
||||||
uint8_t header[MYSQL_EOF_PACKET_LEN];
|
uint8_t header[MYSQL_EOF_PACKET_LEN];
|
||||||
gwbuf_copy_data(csdata->res.data, csdata->res.offset, MYSQL_EOF_PACKET_LEN, header);
|
gwbuf_copy_data(csdata->res.data,
|
||||||
|
csdata->res.offset,
|
||||||
|
MYSQL_EOF_PACKET_LEN,
|
||||||
|
header);
|
||||||
|
|
||||||
size_t packetlen = MYSQL_HEADER_LEN + MYSQL_GET_PAYLOAD_LEN(header);
|
size_t packetlen = MYSQL_HEADER_LEN + MYSQL_GET_PAYLOAD_LEN(header);
|
||||||
|
|
||||||
@ -702,7 +790,9 @@ static int handle_rows(MAXROWS_SESSION_DATA *csdata)
|
|||||||
* max is 1 byte less than EOF_PACKET_LEN
|
* max is 1 byte less than EOF_PACKET_LEN
|
||||||
* If true skip data processing.
|
* If true skip data processing.
|
||||||
*/
|
*/
|
||||||
if (pending_large_data && (packetlen >= MYSQL_HEADER_LEN && packetlen < MYSQL_EOF_PACKET_LEN))
|
if (pending_large_data &&
|
||||||
|
(packetlen >= MYSQL_HEADER_LEN &&
|
||||||
|
packetlen < MYSQL_EOF_PACKET_LEN))
|
||||||
{
|
{
|
||||||
// Update offset, number of rows and break
|
// Update offset, number of rows and break
|
||||||
csdata->res.offset += packetlen;
|
csdata->res.offset += packetlen;
|
||||||
@ -757,7 +847,7 @@ static int handle_rows(MAXROWS_SESSION_DATA *csdata)
|
|||||||
// Send data in buffer or empty resultset
|
// Send data in buffer or empty resultset
|
||||||
if (csdata->discard_resultset)
|
if (csdata->discard_resultset)
|
||||||
{
|
{
|
||||||
rv = send_eof_upstream(csdata, csdata->res.rows_offset);
|
rv = send_maxrows_reply_limit(csdata);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -789,8 +879,10 @@ static int handle_rows(MAXROWS_SESSION_DATA *csdata)
|
|||||||
*/
|
*/
|
||||||
if (packetlen < MYSQL_EOF_PACKET_LEN)
|
if (packetlen < MYSQL_EOF_PACKET_LEN)
|
||||||
{
|
{
|
||||||
MXS_ERROR("EOF packet has size of %lu instead of %d", packetlen, MYSQL_EOF_PACKET_LEN);
|
MXS_ERROR("EOF packet has size of %lu instead of %d",
|
||||||
rv = send_eof_upstream(csdata, csdata->res.rows_offset);
|
packetlen,
|
||||||
|
MYSQL_EOF_PACKET_LEN);
|
||||||
|
rv = send_maxrows_reply_limit(csdata);
|
||||||
csdata->state = MAXROWS_EXPECTING_NOTHING;
|
csdata->state = MAXROWS_EXPECTING_NOTHING;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -811,7 +903,7 @@ static int handle_rows(MAXROWS_SESSION_DATA *csdata)
|
|||||||
// Discard data or send data
|
// Discard data or send data
|
||||||
if (csdata->discard_resultset)
|
if (csdata->discard_resultset)
|
||||||
{
|
{
|
||||||
rv = send_eof_upstream(csdata, csdata->res.rows_offset);
|
rv = send_maxrows_reply_limit(csdata);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -857,7 +949,8 @@ static int handle_rows(MAXROWS_SESSION_DATA *csdata)
|
|||||||
{
|
{
|
||||||
if (csdata->instance->config.debug & MAXROWS_DEBUG_DISCARDING)
|
if (csdata->instance->config.debug & MAXROWS_DEBUG_DISCARDING)
|
||||||
{
|
{
|
||||||
MXS_INFO("max_resultset_rows %lu reached, not returning the resultset.", csdata->res.n_rows);
|
MXS_INFO("max_resultset_rows %lu reached, not returning the resultset.",
|
||||||
|
csdata->res.n_rows);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set the discard indicator
|
// Set the discard indicator
|
||||||
@ -901,7 +994,16 @@ static int send_upstream(MAXROWS_SESSION_DATA *csdata)
|
|||||||
{
|
{
|
||||||
ss_dassert(csdata->res.data != NULL);
|
ss_dassert(csdata->res.data != NULL);
|
||||||
|
|
||||||
int rv = csdata->up.clientReply(csdata->up.instance, csdata->up.session, csdata->res.data);
|
/* Free a saved SQL not freed by send_error_upstream() */
|
||||||
|
if (csdata->input_sql)
|
||||||
|
{
|
||||||
|
gwbuf_free(csdata->input_sql);
|
||||||
|
csdata->input_sql = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int rv = csdata->up.clientReply(csdata->up.instance,
|
||||||
|
csdata->up.session,
|
||||||
|
csdata->res.data);
|
||||||
csdata->res.data = NULL;
|
csdata->res.data = NULL;
|
||||||
|
|
||||||
return rv;
|
return rv;
|
||||||
@ -914,18 +1016,21 @@ static int send_upstream(MAXROWS_SESSION_DATA *csdata)
|
|||||||
* at the end.
|
* at the end.
|
||||||
*
|
*
|
||||||
* @param csdata Session data
|
* @param csdata Session data
|
||||||
* @param offset The offset to server reply pointing to
|
|
||||||
* next byte after column definitions EOF
|
|
||||||
* of the first result set.
|
|
||||||
*
|
*
|
||||||
* @return Whatever the upstream returns.
|
* @return Non-Zero if successful, 0 on errors
|
||||||
*/
|
*/
|
||||||
static int send_eof_upstream(MAXROWS_SESSION_DATA *csdata, size_t offset)
|
static int send_eof_upstream(MAXROWS_SESSION_DATA *csdata)
|
||||||
{
|
{
|
||||||
int rv = -1;
|
int rv = -1;
|
||||||
/* Sequence byte is #3 */
|
/* Sequence byte is #3 */
|
||||||
uint8_t eof[MYSQL_EOF_PACKET_LEN] = {05, 00, 00, 01, 0xfe, 00, 00, 02, 00};
|
uint8_t eof[MYSQL_EOF_PACKET_LEN] = {05, 00, 00, 01, 0xfe, 00, 00, 02, 00};
|
||||||
GWBUF *new_pkt = NULL;
|
GWBUF *new_pkt = NULL;
|
||||||
|
/**
|
||||||
|
* The offset to server reply pointing to
|
||||||
|
* next byte after column definitions EOF
|
||||||
|
* of the first result set.
|
||||||
|
*/
|
||||||
|
size_t offset = csdata->res.rows_offset;
|
||||||
|
|
||||||
ss_dassert(csdata->res.data != NULL);
|
ss_dassert(csdata->res.data != NULL);
|
||||||
|
|
||||||
@ -954,7 +1059,9 @@ static int send_eof_upstream(MAXROWS_SESSION_DATA *csdata, size_t offset)
|
|||||||
if (new_pkt)
|
if (new_pkt)
|
||||||
{
|
{
|
||||||
/* new_pkt will be freed by write routine */
|
/* new_pkt will be freed by write routine */
|
||||||
rv = csdata->up.clientReply(csdata->up.instance, csdata->up.session, new_pkt);
|
rv = csdata->up.clientReply(csdata->up.instance,
|
||||||
|
csdata->up.session,
|
||||||
|
new_pkt);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -972,3 +1079,153 @@ static int send_eof_upstream(MAXROWS_SESSION_DATA *csdata, size_t offset)
|
|||||||
|
|
||||||
return rv;
|
return rv;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send OK packet data upstream.
|
||||||
|
*
|
||||||
|
* @param csdata Session data
|
||||||
|
*
|
||||||
|
* @return Non-Zero if successful, 0 on errors
|
||||||
|
*/
|
||||||
|
static int send_ok_upstream(MAXROWS_SESSION_DATA *csdata)
|
||||||
|
{
|
||||||
|
/* Note: sequence id is always 01 (4th byte) */
|
||||||
|
const static uint8_t ok[MYSQL_OK_PACKET_MIN_LEN] = { 07, 00, 00, 01, 00, 00,
|
||||||
|
00, 02, 00, 00, 00 };
|
||||||
|
|
||||||
|
ss_dassert(csdata->res.data != NULL);
|
||||||
|
|
||||||
|
GWBUF *packet = gwbuf_alloc(MYSQL_OK_PACKET_MIN_LEN);
|
||||||
|
if(!packet)
|
||||||
|
{
|
||||||
|
/* Abort clienrt connection */
|
||||||
|
poll_fake_hangup_event(csdata->session->client_dcb);
|
||||||
|
gwbuf_free(csdata->res.data);
|
||||||
|
csdata->res.data = NULL;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint8_t *ptr = GWBUF_DATA(packet);
|
||||||
|
memcpy(ptr, &ok, MYSQL_OK_PACKET_MIN_LEN);
|
||||||
|
|
||||||
|
ss_dassert(csdata->res.data != NULL);
|
||||||
|
|
||||||
|
int rv = csdata->up.clientReply(csdata->up.instance,
|
||||||
|
csdata->up.session,
|
||||||
|
packet);
|
||||||
|
gwbuf_free(csdata->res.data);
|
||||||
|
csdata->res.data = NULL;
|
||||||
|
|
||||||
|
return rv;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send ERR packet data upstream.
|
||||||
|
*
|
||||||
|
* An error packet is sent to client including
|
||||||
|
* a message prefix plus the original SQL input
|
||||||
|
*
|
||||||
|
* @param csdata Session data
|
||||||
|
* @return Non-Zero if successful, 0 on errors
|
||||||
|
*/
|
||||||
|
static int send_error_upstream(MAXROWS_SESSION_DATA *csdata)
|
||||||
|
{
|
||||||
|
GWBUF *err_pkt;
|
||||||
|
uint8_t hdr_err[MYSQL_ERR_PACKET_MIN_LEN];
|
||||||
|
unsigned long bytes_copied;
|
||||||
|
char *err_msg_prefix = "Row limit/size exceeded for query: ";
|
||||||
|
int err_prefix_len = strlen(err_msg_prefix);
|
||||||
|
unsigned long pkt_len = MYSQL_ERR_PACKET_MIN_LEN + err_prefix_len;
|
||||||
|
unsigned long sql_len = gwbuf_length(csdata->input_sql) -
|
||||||
|
(MYSQL_HEADER_LEN + 1);
|
||||||
|
/**
|
||||||
|
* The input SQL statement added in the error message
|
||||||
|
* has a limit of MAXROWS_INPUT_SQL_MAX_LEN bytes
|
||||||
|
*/
|
||||||
|
sql_len = (sql_len > MAXROWS_INPUT_SQL_MAX_LEN) ?
|
||||||
|
MAXROWS_INPUT_SQL_MAX_LEN : sql_len;
|
||||||
|
uint8_t sql[sql_len];
|
||||||
|
|
||||||
|
ss_dassert(csdata->res.data != NULL);
|
||||||
|
|
||||||
|
pkt_len += sql_len;
|
||||||
|
|
||||||
|
bytes_copied = gwbuf_copy_data(csdata->input_sql,
|
||||||
|
MYSQL_HEADER_LEN + 1,
|
||||||
|
sql_len,
|
||||||
|
sql);
|
||||||
|
|
||||||
|
if (!bytes_copied ||
|
||||||
|
(err_pkt = gwbuf_alloc(MYSQL_HEADER_LEN + pkt_len)) == NULL)
|
||||||
|
{
|
||||||
|
/* Abort client connection */
|
||||||
|
poll_fake_hangup_event(csdata->session->client_dcb);
|
||||||
|
gwbuf_free(csdata->res.data);
|
||||||
|
gwbuf_free(csdata->input_sql);
|
||||||
|
csdata->res.data = NULL;
|
||||||
|
csdata->input_sql = NULL;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint8_t *ptr = GWBUF_DATA(err_pkt);
|
||||||
|
memcpy(ptr, &hdr_err, MYSQL_ERR_PACKET_MIN_LEN);
|
||||||
|
unsigned int err_errno = 1415;
|
||||||
|
char err_state[7] = "#0A000";
|
||||||
|
|
||||||
|
/* Set the payload length of the whole error message */
|
||||||
|
gw_mysql_set_byte3(&ptr[0], pkt_len);
|
||||||
|
/* Note: sequence id is always 01 (4th byte) */
|
||||||
|
ptr[3] = 1;
|
||||||
|
/* Error indicator */
|
||||||
|
ptr[4] = 0xff;
|
||||||
|
/* MySQL error code: 2 bytes */
|
||||||
|
gw_mysql_set_byte2(&ptr[5], err_errno);
|
||||||
|
/* Status Message 6 bytes */
|
||||||
|
memcpy((char *)&ptr[7], err_state, 6);
|
||||||
|
/* Copy error message prefix */
|
||||||
|
memcpy(&ptr[13], err_msg_prefix, err_prefix_len);
|
||||||
|
/* Copy SQL input */
|
||||||
|
memcpy(&ptr[13 + err_prefix_len], sql, sql_len);
|
||||||
|
|
||||||
|
int rv = csdata->up.clientReply(csdata->up.instance,
|
||||||
|
csdata->up.session,
|
||||||
|
err_pkt);
|
||||||
|
|
||||||
|
/* Free server result buffer */
|
||||||
|
gwbuf_free(csdata->res.data);
|
||||||
|
/* Free input_sql buffer */
|
||||||
|
gwbuf_free(csdata->input_sql);
|
||||||
|
|
||||||
|
csdata->res.data = NULL;
|
||||||
|
csdata->input_sql = NULL;
|
||||||
|
|
||||||
|
return rv;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send the proper reply to client when the maxrows
|
||||||
|
* limit/size is hit.
|
||||||
|
*
|
||||||
|
* @param csdata Session data
|
||||||
|
* @return Non-Zero if successful, 0 on errors
|
||||||
|
*/
|
||||||
|
static int send_maxrows_reply_limit(MAXROWS_SESSION_DATA *csdata)
|
||||||
|
{
|
||||||
|
switch(csdata->instance->config.m_return)
|
||||||
|
{
|
||||||
|
case MAXROWS_RETURN_EMPTY:
|
||||||
|
return send_eof_upstream(csdata);
|
||||||
|
break;
|
||||||
|
case MAXROWS_RETURN_OK:
|
||||||
|
return send_ok_upstream(csdata);
|
||||||
|
break;
|
||||||
|
case MAXROWS_RETURN_ERR:
|
||||||
|
return send_error_upstream(csdata);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
MXS_ERROR("MaxRows config value not expected!");
|
||||||
|
ss_dassert(!true);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -36,5 +36,7 @@ MXS_BEGIN_DECLS
|
|||||||
#define MAXROWS_DEFAULT_MAX_RESULTSET_SIZE "65536"
|
#define MAXROWS_DEFAULT_MAX_RESULTSET_SIZE "65536"
|
||||||
// Integer value
|
// Integer value
|
||||||
#define MAXROWS_DEFAULT_DEBUG "0"
|
#define MAXROWS_DEFAULT_DEBUG "0"
|
||||||
|
// Max size of copied input SQL
|
||||||
|
#define MAXROWS_INPUT_SQL_MAX_LEN 1024
|
||||||
|
|
||||||
MXS_END_DECLS
|
MXS_END_DECLS
|
||||||
|
Reference in New Issue
Block a user