|
|
|
|
@ -46,17 +46,47 @@
|
|
|
|
|
#include <maxscale/debug.h>
|
|
|
|
|
#include "maxrows.h"
|
|
|
|
|
|
|
|
|
|
static MXS_FILTER *createInstance(const char *name, char **options, MXS_CONFIG_PARAMETER *);
|
|
|
|
|
static MXS_FILTER_SESSION *newSession(MXS_FILTER *instance, MXS_SESSION *session);
|
|
|
|
|
static void closeSession(MXS_FILTER *instance, MXS_FILTER_SESSION *sdata);
|
|
|
|
|
static void freeSession(MXS_FILTER *instance, MXS_FILTER_SESSION *sdata);
|
|
|
|
|
static void setDownstream(MXS_FILTER *instance, MXS_FILTER_SESSION *sdata, MXS_DOWNSTREAM *downstream);
|
|
|
|
|
static void setUpstream(MXS_FILTER *instance, MXS_FILTER_SESSION *sdata, MXS_UPSTREAM *upstream);
|
|
|
|
|
static int routeQuery(MXS_FILTER *instance, MXS_FILTER_SESSION *sdata, GWBUF *queue);
|
|
|
|
|
static int clientReply(MXS_FILTER *instance, MXS_FILTER_SESSION *sdata, GWBUF *queue);
|
|
|
|
|
static void diagnostics(MXS_FILTER *instance, MXS_FILTER_SESSION *sdata, DCB *dcb);
|
|
|
|
|
static MXS_FILTER *createInstance(const char *name,
|
|
|
|
|
char **options,
|
|
|
|
|
MXS_CONFIG_PARAMETER *);
|
|
|
|
|
static MXS_FILTER_SESSION *newSession(MXS_FILTER *instance,
|
|
|
|
|
MXS_SESSION *session);
|
|
|
|
|
static void closeSession(MXS_FILTER *instance,
|
|
|
|
|
MXS_FILTER_SESSION *sdata);
|
|
|
|
|
static void freeSession(MXS_FILTER *instance,
|
|
|
|
|
MXS_FILTER_SESSION *sdata);
|
|
|
|
|
static void setDownstream(MXS_FILTER *instance,
|
|
|
|
|
MXS_FILTER_SESSION *sdata,
|
|
|
|
|
MXS_DOWNSTREAM *downstream);
|
|
|
|
|
static void setUpstream(MXS_FILTER *instance,
|
|
|
|
|
MXS_FILTER_SESSION *sdata,
|
|
|
|
|
MXS_UPSTREAM *upstream);
|
|
|
|
|
static int routeQuery(MXS_FILTER *instance,
|
|
|
|
|
MXS_FILTER_SESSION *sdata,
|
|
|
|
|
GWBUF *queue);
|
|
|
|
|
static int clientReply(MXS_FILTER *instance,
|
|
|
|
|
MXS_FILTER_SESSION *sdata,
|
|
|
|
|
GWBUF *queue);
|
|
|
|
|
static void diagnostics(MXS_FILTER *instance,
|
|
|
|
|
MXS_FILTER_SESSION *sdata,
|
|
|
|
|
DCB *dcb);
|
|
|
|
|
static uint64_t getCapabilities(MXS_FILTER *instance);
|
|
|
|
|
|
|
|
|
|
enum maxrows_return_mode
|
|
|
|
|
{
|
|
|
|
|
MAXROWS_RETURN_EMPTY = 0,
|
|
|
|
|
MAXROWS_RETURN_ERR,
|
|
|
|
|
MAXROWS_RETURN_OK
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
static const MXS_ENUM_VALUE return_option_values[] =
|
|
|
|
|
{
|
|
|
|
|
{"empty", MAXROWS_RETURN_EMPTY},
|
|
|
|
|
{"error", MAXROWS_RETURN_ERR},
|
|
|
|
|
{"ok", MAXROWS_RETURN_OK},
|
|
|
|
|
{NULL}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
/* Global symbols of the Module */
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
@ -102,7 +132,7 @@ MXS_MODULE* MXS_CREATE_MODULE()
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
"max_resultset_size",
|
|
|
|
|
MXS_MODULE_PARAM_COUNT,
|
|
|
|
|
MXS_MODULE_PARAM_SIZE,
|
|
|
|
|
MAXROWS_DEFAULT_MAX_RESULTSET_SIZE
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
@ -110,6 +140,13 @@ MXS_MODULE* MXS_CREATE_MODULE()
|
|
|
|
|
MXS_MODULE_PARAM_COUNT,
|
|
|
|
|
MAXROWS_DEFAULT_DEBUG
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
"max_resultset_return",
|
|
|
|
|
MXS_MODULE_PARAM_ENUM,
|
|
|
|
|
"empty",
|
|
|
|
|
MXS_MODULE_OPT_ENUM_UNIQUE,
|
|
|
|
|
return_option_values
|
|
|
|
|
},
|
|
|
|
|
{MXS_END_MODULE_PARAMS}
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
@ -121,9 +158,10 @@ MXS_MODULE* MXS_CREATE_MODULE()
|
|
|
|
|
|
|
|
|
|
typedef struct maxrows_config
|
|
|
|
|
{
|
|
|
|
|
uint32_t max_resultset_rows;
|
|
|
|
|
uint32_t max_resultset_size;
|
|
|
|
|
uint32_t debug;
|
|
|
|
|
uint32_t max_resultset_rows;
|
|
|
|
|
uint32_t max_resultset_size;
|
|
|
|
|
uint32_t debug;
|
|
|
|
|
enum maxrows_return_mode m_return;
|
|
|
|
|
} MAXROWS_CONFIG;
|
|
|
|
|
|
|
|
|
|
typedef struct maxrows_instance
|
|
|
|
|
@ -155,7 +193,7 @@ static void maxrows_response_state_reset(MAXROWS_RESPONSE_STATE *state);
|
|
|
|
|
|
|
|
|
|
typedef struct maxrows_session_data
|
|
|
|
|
{
|
|
|
|
|
MAXROWS_INSTANCE *instance; /**< The maxrows instance the session is associated with. */
|
|
|
|
|
MAXROWS_INSTANCE *instance; /**< The maxrows instance the session is associated with. */
|
|
|
|
|
MXS_DOWNSTREAM down; /**< The previous filter or equivalent. */
|
|
|
|
|
MXS_UPSTREAM up; /**< The next filter or equivalent. */
|
|
|
|
|
MAXROWS_RESPONSE_STATE res; /**< The response state. */
|
|
|
|
|
@ -163,9 +201,11 @@ typedef struct maxrows_session_data
|
|
|
|
|
maxrows_session_state_t state;
|
|
|
|
|
bool large_packet; /**< Large packet (> 16MB)) indicator */
|
|
|
|
|
bool discard_resultset; /**< Discard resultset indicator */
|
|
|
|
|
GWBUF *input_sql; /**< Input query */
|
|
|
|
|
} MAXROWS_SESSION_DATA;
|
|
|
|
|
|
|
|
|
|
static MAXROWS_SESSION_DATA *maxrows_session_data_create(MAXROWS_INSTANCE *instance, MXS_SESSION *session);
|
|
|
|
|
static MAXROWS_SESSION_DATA *maxrows_session_data_create(MAXROWS_INSTANCE *instance,
|
|
|
|
|
MXS_SESSION *session);
|
|
|
|
|
static void maxrows_session_data_free(MAXROWS_SESSION_DATA *data);
|
|
|
|
|
|
|
|
|
|
static int handle_expecting_fields(MAXROWS_SESSION_DATA *csdata);
|
|
|
|
|
@ -173,10 +213,14 @@ static int handle_expecting_nothing(MAXROWS_SESSION_DATA *csdata);
|
|
|
|
|
static int handle_expecting_response(MAXROWS_SESSION_DATA *csdata);
|
|
|
|
|
static int handle_rows(MAXROWS_SESSION_DATA *csdata);
|
|
|
|
|
static int handle_ignoring_response(MAXROWS_SESSION_DATA *csdata);
|
|
|
|
|
static bool process_params(char **options, MXS_CONFIG_PARAMETER *params, MAXROWS_CONFIG* config);
|
|
|
|
|
static bool process_params(char **options,
|
|
|
|
|
MXS_CONFIG_PARAMETER *params,
|
|
|
|
|
MAXROWS_CONFIG* config);
|
|
|
|
|
|
|
|
|
|
static int send_upstream(MAXROWS_SESSION_DATA *csdata);
|
|
|
|
|
static int send_eof_upstream(MAXROWS_SESSION_DATA *csdata, size_t offset);
|
|
|
|
|
static int send_eof_upstream(MAXROWS_SESSION_DATA *csdata);
|
|
|
|
|
static int send_error_upstream(MAXROWS_SESSION_DATA *csdata);
|
|
|
|
|
static int send_maxrows_reply_limit(MAXROWS_SESSION_DATA *csdata);
|
|
|
|
|
|
|
|
|
|
/* API BEGIN */
|
|
|
|
|
|
|
|
|
|
@ -190,15 +234,22 @@ static int send_eof_upstream(MAXROWS_SESSION_DATA *csdata, size_t offset);
|
|
|
|
|
*
|
|
|
|
|
* @return The instance data for this new instance
|
|
|
|
|
*/
|
|
|
|
|
static MXS_FILTER *createInstance(const char *name, char **options, MXS_CONFIG_PARAMETER *params)
|
|
|
|
|
static MXS_FILTER *createInstance(const char *name,
|
|
|
|
|
char **options,
|
|
|
|
|
MXS_CONFIG_PARAMETER *params)
|
|
|
|
|
{
|
|
|
|
|
MAXROWS_INSTANCE *cinstance = MXS_CALLOC(1, sizeof(MAXROWS_INSTANCE));
|
|
|
|
|
|
|
|
|
|
if (cinstance)
|
|
|
|
|
{
|
|
|
|
|
cinstance->name = name;
|
|
|
|
|
cinstance->config.max_resultset_rows = config_get_integer(params, "max_resultset_rows");
|
|
|
|
|
cinstance->config.max_resultset_size = config_get_integer(params, "max_resultset_size");
|
|
|
|
|
cinstance->config.max_resultset_rows = config_get_integer(params,
|
|
|
|
|
"max_resultset_rows");
|
|
|
|
|
cinstance->config.max_resultset_size = config_get_size(params,
|
|
|
|
|
"max_resultset_size");
|
|
|
|
|
cinstance->config.m_return = config_get_enum(params,
|
|
|
|
|
"max_resultset_return",
|
|
|
|
|
return_option_values);
|
|
|
|
|
cinstance->config.debug = config_get_integer(params, "debug");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@ -284,7 +335,9 @@ static void setUpstream(MXS_FILTER *instance, MXS_FILTER_SESSION *sdata, MXS_UPS
|
|
|
|
|
* @param sdata The filter session data
|
|
|
|
|
* @param buffer Buffer containing an MySQL protocol packet.
|
|
|
|
|
*/
|
|
|
|
|
static int routeQuery(MXS_FILTER *instance, MXS_FILTER_SESSION *sdata, GWBUF *packet)
|
|
|
|
|
static int routeQuery(MXS_FILTER *instance,
|
|
|
|
|
MXS_FILTER_SESSION *sdata,
|
|
|
|
|
GWBUF *packet)
|
|
|
|
|
{
|
|
|
|
|
MAXROWS_INSTANCE *cinstance = (MAXROWS_INSTANCE*)instance;
|
|
|
|
|
MAXROWS_SESSION_DATA *csdata = (MAXROWS_SESSION_DATA*)sdata;
|
|
|
|
|
@ -294,7 +347,8 @@ static int routeQuery(MXS_FILTER *instance, MXS_FILTER_SESSION *sdata, GWBUF *pa
|
|
|
|
|
// All of these should be guaranteed by RCAP_TYPE_TRANSACTION_TRACKING
|
|
|
|
|
ss_dassert(GWBUF_IS_CONTIGUOUS(packet));
|
|
|
|
|
ss_dassert(GWBUF_LENGTH(packet) >= MYSQL_HEADER_LEN + 1);
|
|
|
|
|
ss_dassert(MYSQL_GET_PAYLOAD_LEN(data) + MYSQL_HEADER_LEN == GWBUF_LENGTH(packet));
|
|
|
|
|
ss_dassert(MYSQL_GET_PAYLOAD_LEN(data) +
|
|
|
|
|
MYSQL_HEADER_LEN == GWBUF_LENGTH(packet));
|
|
|
|
|
|
|
|
|
|
maxrows_response_state_reset(&csdata->res);
|
|
|
|
|
csdata->state = MAXROWS_IGNORING_RESPONSE;
|
|
|
|
|
@ -306,6 +360,23 @@ static int routeQuery(MXS_FILTER *instance, MXS_FILTER_SESSION *sdata, GWBUF *pa
|
|
|
|
|
case MYSQL_COM_QUERY:
|
|
|
|
|
case MYSQL_COM_STMT_EXECUTE:
|
|
|
|
|
{
|
|
|
|
|
/* Set input query only with MAXROWS_RETURN_ERR */
|
|
|
|
|
if (csdata->instance->config.m_return == MAXROWS_RETURN_ERR &&
|
|
|
|
|
(csdata->input_sql = gwbuf_clone(packet)) == NULL)
|
|
|
|
|
{
|
|
|
|
|
csdata->state = MAXROWS_EXPECTING_NOTHING;
|
|
|
|
|
|
|
|
|
|
/* Abort client connection on copy failure */
|
|
|
|
|
poll_fake_hangup_event(csdata->session->client_dcb);
|
|
|
|
|
gwbuf_free(csdata->res.data);
|
|
|
|
|
gwbuf_free(packet);
|
|
|
|
|
MXS_FREE(csdata);
|
|
|
|
|
csdata->res.data = NULL;
|
|
|
|
|
packet = NULL;
|
|
|
|
|
csdata = NULL;
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
csdata->state = MAXROWS_EXPECTING_RESPONSE;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
@ -319,7 +390,9 @@ static int routeQuery(MXS_FILTER *instance, MXS_FILTER_SESSION *sdata, GWBUF *pa
|
|
|
|
|
MXS_NOTICE("Maxrows filter is sending data.");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return csdata->down.routeQuery(csdata->down.instance, csdata->down.session, packet);
|
|
|
|
|
return csdata->down.routeQuery(csdata->down.instance,
|
|
|
|
|
csdata->down.session,
|
|
|
|
|
packet);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
@ -329,7 +402,9 @@ static int routeQuery(MXS_FILTER *instance, MXS_FILTER_SESSION *sdata, GWBUF *pa
|
|
|
|
|
* @param sdata The filter session data
|
|
|
|
|
* @param queue The query data
|
|
|
|
|
*/
|
|
|
|
|
static int clientReply(MXS_FILTER *instance, MXS_FILTER_SESSION *sdata, GWBUF *data)
|
|
|
|
|
static int clientReply(MXS_FILTER *instance,
|
|
|
|
|
MXS_FILTER_SESSION *sdata,
|
|
|
|
|
GWBUF *data)
|
|
|
|
|
{
|
|
|
|
|
MAXROWS_INSTANCE *cinstance = (MAXROWS_INSTANCE*)instance;
|
|
|
|
|
MAXROWS_SESSION_DATA *csdata = (MAXROWS_SESSION_DATA*)sdata;
|
|
|
|
|
@ -387,7 +462,8 @@ static int clientReply(MXS_FILTER *instance, MXS_FILTER_SESSION *sdata, GWBUF *d
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
default:
|
|
|
|
|
MXS_ERROR("Internal filter logic broken, unexpected state: %d", csdata->state);
|
|
|
|
|
MXS_ERROR("Internal filter logic broken, unexpected state: %d",
|
|
|
|
|
csdata->state);
|
|
|
|
|
ss_dassert(!true);
|
|
|
|
|
rv = send_upstream(csdata);
|
|
|
|
|
maxrows_response_state_reset(&csdata->res);
|
|
|
|
|
@ -463,6 +539,7 @@ static MAXROWS_SESSION_DATA *maxrows_session_data_create(MAXROWS_INSTANCE *insta
|
|
|
|
|
MYSQL_session *mysql_session = (MYSQL_session*)session->client_dcb->data;
|
|
|
|
|
data->instance = instance;
|
|
|
|
|
data->session = session;
|
|
|
|
|
data->input_sql = NULL;
|
|
|
|
|
data->state = MAXROWS_EXPECTING_NOTHING;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@ -501,7 +578,10 @@ static int handle_expecting_fields(MAXROWS_SESSION_DATA *csdata)
|
|
|
|
|
while (!insufficient && (buflen - csdata->res.offset >= MYSQL_HEADER_LEN))
|
|
|
|
|
{
|
|
|
|
|
uint8_t header[MYSQL_HEADER_LEN + 1];
|
|
|
|
|
gwbuf_copy_data(csdata->res.data, csdata->res.offset, MYSQL_HEADER_LEN + 1, header);
|
|
|
|
|
gwbuf_copy_data(csdata->res.data,
|
|
|
|
|
csdata->res.offset,
|
|
|
|
|
MYSQL_HEADER_LEN + 1,
|
|
|
|
|
header);
|
|
|
|
|
|
|
|
|
|
size_t packetlen = MYSQL_HEADER_LEN + MYSQL_GET_PAYLOAD_LEN(header);
|
|
|
|
|
|
|
|
|
|
@ -585,7 +665,10 @@ static int handle_expecting_response(MAXROWS_SESSION_DATA *csdata)
|
|
|
|
|
uint8_t header[MYSQL_HEADER_LEN + 1 + 8];
|
|
|
|
|
|
|
|
|
|
// Read packet header from buffer at current offset
|
|
|
|
|
gwbuf_copy_data(csdata->res.data, csdata->res.offset, MYSQL_HEADER_LEN + 1, header);
|
|
|
|
|
gwbuf_copy_data(csdata->res.data,
|
|
|
|
|
csdata->res.offset,
|
|
|
|
|
MYSQL_HEADER_LEN + 1,
|
|
|
|
|
header);
|
|
|
|
|
|
|
|
|
|
switch ((int)MYSQL_GET_COMMAND(header))
|
|
|
|
|
{
|
|
|
|
|
@ -611,7 +694,7 @@ static int handle_expecting_response(MAXROWS_SESSION_DATA *csdata)
|
|
|
|
|
|
|
|
|
|
if (csdata->discard_resultset)
|
|
|
|
|
{
|
|
|
|
|
rv = send_eof_upstream(csdata, csdata->res.rows_offset);
|
|
|
|
|
rv = send_maxrows_reply_limit(csdata);
|
|
|
|
|
csdata->state = MAXROWS_EXPECTING_NOTHING;
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
@ -653,7 +736,9 @@ static int handle_expecting_response(MAXROWS_SESSION_DATA *csdata)
|
|
|
|
|
// Now we can figure out how many fields there are, but first we
|
|
|
|
|
// need to copy some more data.
|
|
|
|
|
gwbuf_copy_data(csdata->res.data,
|
|
|
|
|
MYSQL_HEADER_LEN + 1, n_bytes - 1, &header[MYSQL_HEADER_LEN + 1]);
|
|
|
|
|
MYSQL_HEADER_LEN + 1,
|
|
|
|
|
n_bytes - 1,
|
|
|
|
|
&header[MYSQL_HEADER_LEN + 1]);
|
|
|
|
|
|
|
|
|
|
csdata->res.n_totalfields = mxs_leint_value(&header[4]);
|
|
|
|
|
csdata->res.offset += MYSQL_HEADER_LEN + n_bytes;
|
|
|
|
|
@ -692,7 +777,10 @@ static int handle_rows(MAXROWS_SESSION_DATA *csdata)
|
|
|
|
|
bool pending_large_data = csdata->large_packet;
|
|
|
|
|
// header array holds a full EOF packet
|
|
|
|
|
uint8_t header[MYSQL_EOF_PACKET_LEN];
|
|
|
|
|
gwbuf_copy_data(csdata->res.data, csdata->res.offset, MYSQL_EOF_PACKET_LEN, header);
|
|
|
|
|
gwbuf_copy_data(csdata->res.data,
|
|
|
|
|
csdata->res.offset,
|
|
|
|
|
MYSQL_EOF_PACKET_LEN,
|
|
|
|
|
header);
|
|
|
|
|
|
|
|
|
|
size_t packetlen = MYSQL_HEADER_LEN + MYSQL_GET_PAYLOAD_LEN(header);
|
|
|
|
|
|
|
|
|
|
@ -703,7 +791,9 @@ static int handle_rows(MAXROWS_SESSION_DATA *csdata)
|
|
|
|
|
* max is 1 byte less than EOF_PACKET_LEN
|
|
|
|
|
* If true skip data processing.
|
|
|
|
|
*/
|
|
|
|
|
if (pending_large_data && (packetlen >= MYSQL_HEADER_LEN && packetlen < MYSQL_EOF_PACKET_LEN))
|
|
|
|
|
if (pending_large_data &&
|
|
|
|
|
(packetlen >= MYSQL_HEADER_LEN &&
|
|
|
|
|
packetlen < MYSQL_EOF_PACKET_LEN))
|
|
|
|
|
{
|
|
|
|
|
// Update offset, number of rows and break
|
|
|
|
|
csdata->res.offset += packetlen;
|
|
|
|
|
@ -758,7 +848,7 @@ static int handle_rows(MAXROWS_SESSION_DATA *csdata)
|
|
|
|
|
// Send data in buffer or empty resultset
|
|
|
|
|
if (csdata->discard_resultset)
|
|
|
|
|
{
|
|
|
|
|
rv = send_eof_upstream(csdata, csdata->res.rows_offset);
|
|
|
|
|
rv = send_maxrows_reply_limit(csdata);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
@ -790,8 +880,10 @@ static int handle_rows(MAXROWS_SESSION_DATA *csdata)
|
|
|
|
|
*/
|
|
|
|
|
if (packetlen < MYSQL_EOF_PACKET_LEN)
|
|
|
|
|
{
|
|
|
|
|
MXS_ERROR("EOF packet has size of %lu instead of %d", packetlen, MYSQL_EOF_PACKET_LEN);
|
|
|
|
|
rv = send_eof_upstream(csdata, csdata->res.rows_offset);
|
|
|
|
|
MXS_ERROR("EOF packet has size of %lu instead of %d",
|
|
|
|
|
packetlen,
|
|
|
|
|
MYSQL_EOF_PACKET_LEN);
|
|
|
|
|
rv = send_maxrows_reply_limit(csdata);
|
|
|
|
|
csdata->state = MAXROWS_EXPECTING_NOTHING;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
@ -812,7 +904,7 @@ static int handle_rows(MAXROWS_SESSION_DATA *csdata)
|
|
|
|
|
// Discard data or send data
|
|
|
|
|
if (csdata->discard_resultset)
|
|
|
|
|
{
|
|
|
|
|
rv = send_eof_upstream(csdata, csdata->res.rows_offset);
|
|
|
|
|
rv = send_maxrows_reply_limit(csdata);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
@ -858,7 +950,8 @@ static int handle_rows(MAXROWS_SESSION_DATA *csdata)
|
|
|
|
|
{
|
|
|
|
|
if (csdata->instance->config.debug & MAXROWS_DEBUG_DISCARDING)
|
|
|
|
|
{
|
|
|
|
|
MXS_INFO("max_resultset_rows %lu reached, not returning the resultset.", csdata->res.n_rows);
|
|
|
|
|
MXS_INFO("max_resultset_rows %lu reached, not returning the resultset.",
|
|
|
|
|
csdata->res.n_rows);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Set the discard indicator
|
|
|
|
|
@ -902,7 +995,16 @@ static int send_upstream(MAXROWS_SESSION_DATA *csdata)
|
|
|
|
|
{
|
|
|
|
|
ss_dassert(csdata->res.data != NULL);
|
|
|
|
|
|
|
|
|
|
int rv = csdata->up.clientReply(csdata->up.instance, csdata->up.session, csdata->res.data);
|
|
|
|
|
/* Free a saved SQL not freed by send_error_upstream() */
|
|
|
|
|
if (csdata->input_sql)
|
|
|
|
|
{
|
|
|
|
|
gwbuf_free(csdata->input_sql);
|
|
|
|
|
csdata->input_sql = NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int rv = csdata->up.clientReply(csdata->up.instance,
|
|
|
|
|
csdata->up.session,
|
|
|
|
|
csdata->res.data);
|
|
|
|
|
csdata->res.data = NULL;
|
|
|
|
|
|
|
|
|
|
return rv;
|
|
|
|
|
@ -915,18 +1017,21 @@ static int send_upstream(MAXROWS_SESSION_DATA *csdata)
|
|
|
|
|
* at the end.
|
|
|
|
|
*
|
|
|
|
|
* @param csdata Session data
|
|
|
|
|
* @param offset The offset to server reply pointing to
|
|
|
|
|
* next byte after column definitions EOF
|
|
|
|
|
* of the first result set.
|
|
|
|
|
*
|
|
|
|
|
* @return Whatever the upstream returns.
|
|
|
|
|
* @return Non-Zero if successful, 0 on errors
|
|
|
|
|
*/
|
|
|
|
|
static int send_eof_upstream(MAXROWS_SESSION_DATA *csdata, size_t offset)
|
|
|
|
|
static int send_eof_upstream(MAXROWS_SESSION_DATA *csdata)
|
|
|
|
|
{
|
|
|
|
|
int rv = -1;
|
|
|
|
|
/* Sequence byte is #3 */
|
|
|
|
|
uint8_t eof[MYSQL_EOF_PACKET_LEN] = {05, 00, 00, 01, 0xfe, 00, 00, 02, 00};
|
|
|
|
|
GWBUF *new_pkt = NULL;
|
|
|
|
|
/**
|
|
|
|
|
* The offset to server reply pointing to
|
|
|
|
|
* next byte after column definitions EOF
|
|
|
|
|
* of the first result set.
|
|
|
|
|
*/
|
|
|
|
|
size_t offset = csdata->res.rows_offset;
|
|
|
|
|
|
|
|
|
|
ss_dassert(csdata->res.data != NULL);
|
|
|
|
|
|
|
|
|
|
@ -955,7 +1060,9 @@ static int send_eof_upstream(MAXROWS_SESSION_DATA *csdata, size_t offset)
|
|
|
|
|
if (new_pkt)
|
|
|
|
|
{
|
|
|
|
|
/* new_pkt will be freed by write routine */
|
|
|
|
|
rv = csdata->up.clientReply(csdata->up.instance, csdata->up.session, new_pkt);
|
|
|
|
|
rv = csdata->up.clientReply(csdata->up.instance,
|
|
|
|
|
csdata->up.session,
|
|
|
|
|
new_pkt);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@ -973,3 +1080,154 @@ static int send_eof_upstream(MAXROWS_SESSION_DATA *csdata, size_t offset)
|
|
|
|
|
|
|
|
|
|
return rv;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Send OK packet data upstream.
|
|
|
|
|
*
|
|
|
|
|
* @param csdata Session data
|
|
|
|
|
*
|
|
|
|
|
* @return Non-Zero if successful, 0 on errors
|
|
|
|
|
*/
|
|
|
|
|
static int send_ok_upstream(MAXROWS_SESSION_DATA *csdata)
|
|
|
|
|
{
|
|
|
|
|
/* Note: sequence id is always 01 (4th byte) */
|
|
|
|
|
const static uint8_t ok[MYSQL_OK_PACKET_MIN_LEN] = { 07, 00, 00, 01, 00, 00,
|
|
|
|
|
00, 02, 00, 00, 00 };
|
|
|
|
|
|
|
|
|
|
ss_dassert(csdata->res.data != NULL);
|
|
|
|
|
|
|
|
|
|
GWBUF *packet = gwbuf_alloc(MYSQL_OK_PACKET_MIN_LEN);
|
|
|
|
|
if(!packet)
|
|
|
|
|
{
|
|
|
|
|
/* Abort clienrt connection */
|
|
|
|
|
poll_fake_hangup_event(csdata->session->client_dcb);
|
|
|
|
|
gwbuf_free(csdata->res.data);
|
|
|
|
|
csdata->res.data = NULL;
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
uint8_t *ptr = GWBUF_DATA(packet);
|
|
|
|
|
memcpy(ptr, &ok, MYSQL_OK_PACKET_MIN_LEN);
|
|
|
|
|
|
|
|
|
|
ss_dassert(csdata->res.data != NULL);
|
|
|
|
|
|
|
|
|
|
int rv = csdata->up.clientReply(csdata->up.instance,
|
|
|
|
|
csdata->up.session,
|
|
|
|
|
packet);
|
|
|
|
|
gwbuf_free(csdata->res.data);
|
|
|
|
|
csdata->res.data = NULL;
|
|
|
|
|
|
|
|
|
|
return rv;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Send ERR packet data upstream.
|
|
|
|
|
*
|
|
|
|
|
* An error packet is sent to client including
|
|
|
|
|
* a message prefix plus the original SQL input
|
|
|
|
|
*
|
|
|
|
|
* @param csdata Session data
|
|
|
|
|
* @return Non-Zero if successful, 0 on errors
|
|
|
|
|
*/
|
|
|
|
|
static int send_error_upstream(MAXROWS_SESSION_DATA *csdata)
|
|
|
|
|
{
|
|
|
|
|
GWBUF *err_pkt;
|
|
|
|
|
uint8_t hdr_err[MYSQL_ERR_PACKET_MIN_LEN];
|
|
|
|
|
unsigned long bytes_copied;
|
|
|
|
|
char *err_msg_prefix = "Row limit/size exceeded for query: ";
|
|
|
|
|
int err_prefix_len = strlen(err_msg_prefix);
|
|
|
|
|
unsigned long pkt_len = MYSQL_ERR_PACKET_MIN_LEN + err_prefix_len;
|
|
|
|
|
unsigned long sql_len = gwbuf_length(csdata->input_sql) -
|
|
|
|
|
(MYSQL_HEADER_LEN + 1);
|
|
|
|
|
/**
|
|
|
|
|
* The input SQL statement added in the error message
|
|
|
|
|
* has a limit of MAXROWS_INPUT_SQL_MAX_LEN bytes
|
|
|
|
|
*/
|
|
|
|
|
sql_len = (sql_len > MAXROWS_INPUT_SQL_MAX_LEN) ?
|
|
|
|
|
MAXROWS_INPUT_SQL_MAX_LEN : sql_len;
|
|
|
|
|
uint8_t sql[sql_len];
|
|
|
|
|
|
|
|
|
|
ss_dassert(csdata->res.data != NULL);
|
|
|
|
|
|
|
|
|
|
pkt_len += sql_len;
|
|
|
|
|
|
|
|
|
|
bytes_copied = gwbuf_copy_data(csdata->input_sql,
|
|
|
|
|
MYSQL_HEADER_LEN + 1,
|
|
|
|
|
sql_len,
|
|
|
|
|
sql);
|
|
|
|
|
|
|
|
|
|
if (!bytes_copied ||
|
|
|
|
|
(err_pkt = gwbuf_alloc(MYSQL_HEADER_LEN + pkt_len)) == NULL)
|
|
|
|
|
{
|
|
|
|
|
/* Abort client connection */
|
|
|
|
|
poll_fake_hangup_event(csdata->session->client_dcb);
|
|
|
|
|
gwbuf_free(csdata->res.data);
|
|
|
|
|
gwbuf_free(csdata->input_sql);
|
|
|
|
|
csdata->res.data = NULL;
|
|
|
|
|
csdata->input_sql = NULL;
|
|
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
uint8_t *ptr = GWBUF_DATA(err_pkt);
|
|
|
|
|
memcpy(ptr, &hdr_err, MYSQL_ERR_PACKET_MIN_LEN);
|
|
|
|
|
unsigned int err_errno = 1415;
|
|
|
|
|
char err_state[7] = "#0A000";
|
|
|
|
|
|
|
|
|
|
/* Set the payload length of the whole error message */
|
|
|
|
|
gw_mysql_set_byte3(&ptr[0], pkt_len);
|
|
|
|
|
/* Note: sequence id is always 01 (4th byte) */
|
|
|
|
|
ptr[3] = 1;
|
|
|
|
|
/* Error indicator */
|
|
|
|
|
ptr[4] = 0xff;
|
|
|
|
|
/* MySQL error code: 2 bytes */
|
|
|
|
|
gw_mysql_set_byte2(&ptr[5], err_errno);
|
|
|
|
|
/* Status Message 6 bytes */
|
|
|
|
|
memcpy((char *)&ptr[7], err_state, 6);
|
|
|
|
|
/* Copy error message prefix */
|
|
|
|
|
memcpy(&ptr[13], err_msg_prefix, err_prefix_len);
|
|
|
|
|
/* Copy SQL input */
|
|
|
|
|
memcpy(&ptr[13 + err_prefix_len], sql, sql_len);
|
|
|
|
|
|
|
|
|
|
int rv = csdata->up.clientReply(csdata->up.instance,
|
|
|
|
|
csdata->up.session,
|
|
|
|
|
err_pkt);
|
|
|
|
|
|
|
|
|
|
/* Free server result buffer */
|
|
|
|
|
gwbuf_free(csdata->res.data);
|
|
|
|
|
/* Free input_sql buffer */
|
|
|
|
|
gwbuf_free(csdata->input_sql);
|
|
|
|
|
|
|
|
|
|
csdata->res.data = NULL;
|
|
|
|
|
csdata->input_sql = NULL;
|
|
|
|
|
|
|
|
|
|
return rv;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Send the proper reply to client when the maxrows
|
|
|
|
|
* limit/size is hit.
|
|
|
|
|
*
|
|
|
|
|
* @param csdata Session data
|
|
|
|
|
* @return Non-Zero if successful, 0 on errors
|
|
|
|
|
*/
|
|
|
|
|
static int send_maxrows_reply_limit(MAXROWS_SESSION_DATA *csdata)
|
|
|
|
|
{
|
|
|
|
|
switch(csdata->instance->config.m_return)
|
|
|
|
|
{
|
|
|
|
|
case MAXROWS_RETURN_EMPTY:
|
|
|
|
|
return send_eof_upstream(csdata);
|
|
|
|
|
break;
|
|
|
|
|
case MAXROWS_RETURN_OK:
|
|
|
|
|
return send_ok_upstream(csdata);
|
|
|
|
|
break;
|
|
|
|
|
case MAXROWS_RETURN_ERR:
|
|
|
|
|
return send_error_upstream(csdata);
|
|
|
|
|
break;
|
|
|
|
|
default:
|
|
|
|
|
MXS_ERROR("MaxRows config value not expected!");
|
|
|
|
|
ss_dassert(!true);
|
|
|
|
|
return 0;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|