Addition of MAXROWS_DISCARDING_RESPONSE
Addition of MAXROWS_DISCARDING_RESPONSE Packets received after sent OK are consumed and not sent
This commit is contained in:
@ -132,6 +132,7 @@ typedef enum maxrows_session_state
|
|||||||
MAXROWS_EXPECTING_ROWS, // A select has been sent, and we want more rows.
|
MAXROWS_EXPECTING_ROWS, // A select has been sent, and we want more rows.
|
||||||
MAXROWS_EXPECTING_NOTHING, // We are not expecting anything from the server.
|
MAXROWS_EXPECTING_NOTHING, // We are not expecting anything from the server.
|
||||||
MAXROWS_IGNORING_RESPONSE, // We are not interested in the data received from the server.
|
MAXROWS_IGNORING_RESPONSE, // We are not interested in the data received from the server.
|
||||||
|
MAXROWS_DISCARDING_RESPONSE, // We have returned empty result set and we discard any new data.
|
||||||
} maxrows_session_state_t;
|
} maxrows_session_state_t;
|
||||||
|
|
||||||
typedef struct maxrows_response_state
|
typedef struct maxrows_response_state
|
||||||
@ -145,9 +146,9 @@ typedef struct maxrows_response_state
|
|||||||
|
|
||||||
static void maxrows_response_state_reset(MAXROWS_RESPONSE_STATE *state);
|
static void maxrows_response_state_reset(MAXROWS_RESPONSE_STATE *state);
|
||||||
|
|
||||||
typedef struct cache_session_data
|
typedef struct maxrows_session_data
|
||||||
{
|
{
|
||||||
MAXROWS_INSTANCE *instance; /**< The cache instance the session is associated with. */
|
MAXROWS_INSTANCE *instance; /**< The maxrows instance the session is associated with. */
|
||||||
DOWNSTREAM down; /**< The previous filter or equivalent. */
|
DOWNSTREAM down; /**< The previous filter or equivalent. */
|
||||||
UPSTREAM up; /**< The next filter or equivalent. */
|
UPSTREAM up; /**< The next filter or equivalent. */
|
||||||
MAXROWS_RESPONSE_STATE res; /**< The response state. */
|
MAXROWS_RESPONSE_STATE res; /**< The response state. */
|
||||||
@ -169,11 +170,12 @@ static bool process_params(char **options, FILTER_PARAMETER **params, MAXROWS_CO
|
|||||||
|
|
||||||
static int send_upstream(MAXROWS_SESSION_DATA *csdata);
|
static int send_upstream(MAXROWS_SESSION_DATA *csdata);
|
||||||
static int send_ok_upstream(MAXROWS_SESSION_DATA *csdata);
|
static int send_ok_upstream(MAXROWS_SESSION_DATA *csdata);
|
||||||
|
static int handle_discarding_response(MAXROWS_SESSION_DATA *csdata);
|
||||||
|
|
||||||
/* API BEGIN */
|
/* API BEGIN */
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create an instance of the cache filter for a particular service
|
* Create an instance of the maxrows filter for a particular service
|
||||||
* within MaxScale.
|
* within MaxScale.
|
||||||
*
|
*
|
||||||
* @param name The name of the instance (as defined in the config file).
|
* @param name The name of the instance (as defined in the config file).
|
||||||
@ -203,7 +205,7 @@ static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER
|
|||||||
/**
|
/**
|
||||||
* Associate a new session with this instance of the filter.
|
* Associate a new session with this instance of the filter.
|
||||||
*
|
*
|
||||||
* @param instance The cache instance data
|
* @param instance The maxrows instance data
|
||||||
* @param session The session itself
|
* @param session The session itself
|
||||||
*
|
*
|
||||||
* @return Session specific data for this session
|
* @return Session specific data for this session
|
||||||
@ -219,7 +221,7 @@ static void *newSession(FILTER *instance, SESSION *session)
|
|||||||
/**
|
/**
|
||||||
* A session has been closed.
|
* A session has been closed.
|
||||||
*
|
*
|
||||||
* @param instance The cache instance data
|
* @param instance The maxrows instance data
|
||||||
* @param sdata The session data of the session being closed
|
* @param sdata The session data of the session being closed
|
||||||
*/
|
*/
|
||||||
static void closeSession(FILTER *instance, void *sdata)
|
static void closeSession(FILTER *instance, void *sdata)
|
||||||
@ -231,7 +233,7 @@ static void closeSession(FILTER *instance, void *sdata)
|
|||||||
/**
|
/**
|
||||||
* Free the session data.
|
* Free the session data.
|
||||||
*
|
*
|
||||||
* @param instance The cache instance data
|
* @param instance The maxrows instance data
|
||||||
* @param sdata The session data of the session being closed
|
* @param sdata The session data of the session being closed
|
||||||
*/
|
*/
|
||||||
static void freeSession(FILTER *instance, void *sdata)
|
static void freeSession(FILTER *instance, void *sdata)
|
||||||
@ -245,7 +247,7 @@ static void freeSession(FILTER *instance, void *sdata)
|
|||||||
/**
|
/**
|
||||||
* Set the downstream component for this filter.
|
* Set the downstream component for this filter.
|
||||||
*
|
*
|
||||||
* @param instance The cache instance data
|
* @param instance The maxrowsinstance data
|
||||||
* @param sdata The session data of the session
|
* @param sdata The session data of the session
|
||||||
* @param down The downstream filter or router
|
* @param down The downstream filter or router
|
||||||
*/
|
*/
|
||||||
@ -260,7 +262,7 @@ static void setDownstream(FILTER *instance, void *sdata, DOWNSTREAM *down)
|
|||||||
/**
|
/**
|
||||||
* Set the upstream component for this filter.
|
* Set the upstream component for this filter.
|
||||||
*
|
*
|
||||||
* @param instance The cache instance data
|
* @param instance The maxrows instance data
|
||||||
* @param sdata The session data of the session
|
* @param sdata The session data of the session
|
||||||
* @param up The upstream filter or router
|
* @param up The upstream filter or router
|
||||||
*/
|
*/
|
||||||
@ -320,7 +322,7 @@ static int routeQuery(FILTER *instance, void *sdata, GWBUF *packet)
|
|||||||
|
|
||||||
if (use_default)
|
if (use_default)
|
||||||
{
|
{
|
||||||
C_DEBUG("Maxrows filter is sends data.");
|
C_DEBUG("Maxrows filter is sending data.");
|
||||||
rv = csdata->down.routeQuery(csdata->down.instance, csdata->down.session, packet);
|
rv = csdata->down.routeQuery(csdata->down.instance, csdata->down.session, packet);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -355,11 +357,11 @@ static int clientReply(FILTER *instance, void *sdata, GWBUF *data)
|
|||||||
if (gwbuf_length(csdata->res.data) > csdata->instance->config.max_resultset_size)
|
if (gwbuf_length(csdata->res.data) > csdata->instance->config.max_resultset_size)
|
||||||
{
|
{
|
||||||
C_DEBUG("Current size %uB of resultset, at least as much "
|
C_DEBUG("Current size %uB of resultset, at least as much "
|
||||||
"as maximum allowed size %uKiB. Not caching.",
|
"as maximum allowed size %uKiB. Not returning data.",
|
||||||
gwbuf_length(csdata->res.data),
|
gwbuf_length(csdata->res.data),
|
||||||
csdata->instance->config.max_resultset_size / 1024);
|
csdata->instance->config.max_resultset_size / 1024);
|
||||||
|
|
||||||
csdata->state = MAXROWS_IGNORING_RESPONSE;
|
csdata->state = MAXROWS_DISCARDING_RESPONSE;
|
||||||
|
|
||||||
/* Send empty result set */
|
/* Send empty result set */
|
||||||
return send_ok_upstream(csdata);
|
return send_ok_upstream(csdata);
|
||||||
@ -388,6 +390,10 @@ static int clientReply(FILTER *instance, void *sdata, GWBUF *data)
|
|||||||
rv = handle_ignoring_response(csdata);
|
rv = handle_ignoring_response(csdata);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
case MAXROWS_DISCARDING_RESPONSE:
|
||||||
|
rv = handle_discarding_response(csdata);
|
||||||
|
break;
|
||||||
|
|
||||||
default:
|
default:
|
||||||
MXS_ERROR("Internal filter logic broken, unexpected state: %d", csdata->state);
|
MXS_ERROR("Internal filter logic broken, unexpected state: %d", csdata->state);
|
||||||
ss_dassert(!true);
|
ss_dassert(!true);
|
||||||
@ -431,7 +437,7 @@ static uint64_t getCapabilities(void)
|
|||||||
/* API END */
|
/* API END */
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reset cache response state
|
* Reset maxrows response state
|
||||||
*
|
*
|
||||||
* @param state Pointer to object.
|
* @param state Pointer to object.
|
||||||
*/
|
*/
|
||||||
@ -445,9 +451,9 @@ static void maxrows_response_state_reset(MAXROWS_RESPONSE_STATE *state)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create cache session data
|
* Create maxrows session data
|
||||||
*
|
*
|
||||||
* @param instance The cache instance this data is associated with.
|
* @param instance The maxrows instance this data is associated with.
|
||||||
*
|
*
|
||||||
* @return Session data or NULL if creation fails.
|
* @return Session data or NULL if creation fails.
|
||||||
*/
|
*/
|
||||||
@ -487,9 +493,9 @@ static MAXROWS_SESSION_DATA *maxrows_session_data_create(MAXROWS_INSTANCE *insta
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Free cache session data.
|
* Free maxrows session data.
|
||||||
*
|
*
|
||||||
* @param A cache session data previously allocated using session_data_create().
|
* @param A maxrows session data previously allocated using session_data_create().
|
||||||
*/
|
*/
|
||||||
static void maxrows_session_data_free(MAXROWS_SESSION_DATA* data)
|
static void maxrows_session_data_free(MAXROWS_SESSION_DATA* data)
|
||||||
{
|
{
|
||||||
@ -507,7 +513,7 @@ static void maxrows_session_data_free(MAXROWS_SESSION_DATA* data)
|
|||||||
/**
|
/**
|
||||||
* Called when resultset field information is handled.
|
* Called when resultset field information is handled.
|
||||||
*
|
*
|
||||||
* @param csdata The cache session data.
|
* @param csdata The maxrows session data.
|
||||||
*/
|
*/
|
||||||
static int handle_expecting_fields(MAXROWS_SESSION_DATA *csdata)
|
static int handle_expecting_fields(MAXROWS_SESSION_DATA *csdata)
|
||||||
{
|
{
|
||||||
@ -560,7 +566,7 @@ static int handle_expecting_fields(MAXROWS_SESSION_DATA *csdata)
|
|||||||
/**
|
/**
|
||||||
* Called when data is received (even if nothing is expected) from the server.
|
* Called when data is received (even if nothing is expected) from the server.
|
||||||
*
|
*
|
||||||
* @param csdata The cache session data.
|
* @param csdata The maxrows session data.
|
||||||
*/
|
*/
|
||||||
static int handle_expecting_nothing(MAXROWS_SESSION_DATA *csdata)
|
static int handle_expecting_nothing(MAXROWS_SESSION_DATA *csdata)
|
||||||
{
|
{
|
||||||
@ -575,7 +581,7 @@ static int handle_expecting_nothing(MAXROWS_SESSION_DATA *csdata)
|
|||||||
/**
|
/**
|
||||||
* Called when a response is received from the server.
|
* Called when a response is received from the server.
|
||||||
*
|
*
|
||||||
* @param csdata The cache session data.
|
* @param csdata The maxrows session data.
|
||||||
*/
|
*/
|
||||||
static int handle_expecting_response(MAXROWS_SESSION_DATA *csdata)
|
static int handle_expecting_response(MAXROWS_SESSION_DATA *csdata)
|
||||||
{
|
{
|
||||||
@ -651,7 +657,7 @@ static int handle_expecting_response(MAXROWS_SESSION_DATA *csdata)
|
|||||||
/**
|
/**
|
||||||
* Called when resultset rows are handled.
|
* Called when resultset rows are handled.
|
||||||
*
|
*
|
||||||
* @param csdata The cache session data.
|
* @param csdata The maxrows session data.
|
||||||
*/
|
*/
|
||||||
static int handle_expecting_rows(MAXROWS_SESSION_DATA *csdata)
|
static int handle_expecting_rows(MAXROWS_SESSION_DATA *csdata)
|
||||||
{
|
{
|
||||||
@ -682,11 +688,7 @@ static int handle_expecting_rows(MAXROWS_SESSION_DATA *csdata)
|
|||||||
csdata->res.offset += packetlen;
|
csdata->res.offset += packetlen;
|
||||||
ss_dassert(csdata->res.offset == buflen);
|
ss_dassert(csdata->res.offset == buflen);
|
||||||
|
|
||||||
/* Send data only if number of rows is below the limit */
|
rv = send_upstream(csdata);
|
||||||
if (csdata->state != MAXROWS_IGNORING_RESPONSE)
|
|
||||||
{
|
|
||||||
rv = send_upstream(csdata);
|
|
||||||
}
|
|
||||||
|
|
||||||
csdata->state = MAXROWS_EXPECTING_NOTHING;
|
csdata->state = MAXROWS_EXPECTING_NOTHING;
|
||||||
break;
|
break;
|
||||||
@ -698,11 +700,16 @@ static int handle_expecting_rows(MAXROWS_SESSION_DATA *csdata)
|
|||||||
|
|
||||||
if (csdata->res.n_rows > csdata->instance->config.max_resultset_rows)
|
if (csdata->res.n_rows > csdata->instance->config.max_resultset_rows)
|
||||||
{
|
{
|
||||||
C_DEBUG("Max rows %lu reached, not caching result.", csdata->res.n_rows);
|
C_DEBUG("max_resultset_rows %lu reached, not returning the result.", csdata->res.n_rows);
|
||||||
|
|
||||||
/* Just return 0 result set */
|
/* Just return 0 result set */
|
||||||
rv = send_ok_upstream(csdata);
|
rv = send_ok_upstream(csdata);
|
||||||
csdata->res.offset = buflen; // To abort the loop.
|
csdata->res.offset = buflen; // To abort the loop.
|
||||||
csdata->state = MAXROWS_IGNORING_RESPONSE;
|
|
||||||
|
/* If additional packets from server will come
|
||||||
|
* they will be "consumed" and not sent to client
|
||||||
|
*/
|
||||||
|
csdata->state = MAXROWS_DISCARDING_RESPONSE;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -720,7 +727,7 @@ static int handle_expecting_rows(MAXROWS_SESSION_DATA *csdata)
|
|||||||
/**
|
/**
|
||||||
* Called when all data from the server is ignored.
|
* Called when all data from the server is ignored.
|
||||||
*
|
*
|
||||||
* @param csdata The cache session data.
|
* @param csdata The maxrows session data.
|
||||||
*/
|
*/
|
||||||
static int handle_ignoring_response(MAXROWS_SESSION_DATA *csdata)
|
static int handle_ignoring_response(MAXROWS_SESSION_DATA *csdata)
|
||||||
{
|
{
|
||||||
@ -731,7 +738,7 @@ static int handle_ignoring_response(MAXROWS_SESSION_DATA *csdata)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Processes the cache params
|
* Processes the maxrows params
|
||||||
*
|
*
|
||||||
* @param options Options as passed to the filter.
|
* @param options Options as passed to the filter.
|
||||||
* @param params Parameters as passed to the filter.
|
* @param params Parameters as passed to the filter.
|
||||||
@ -847,3 +854,21 @@ static int send_ok_upstream(MAXROWS_SESSION_DATA *csdata)
|
|||||||
return rv;
|
return rv;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called when all data from the server should be consumed and not sent.
|
||||||
|
*
|
||||||
|
* @param csdata The maxrows session data.
|
||||||
|
*/
|
||||||
|
static int handle_discarding_response(MAXROWS_SESSION_DATA *csdata)
|
||||||
|
{
|
||||||
|
ss_dassert(csdata->state == MAXROWS_DISCARDING_RESPONSE);
|
||||||
|
ss_dassert(csdata->res.data);
|
||||||
|
|
||||||
|
gwbuf_free(csdata->res.data);
|
||||||
|
csdata->res.data = NULL;
|
||||||
|
|
||||||
|
MXS_INFO("maxrows filter is receiving response packets from the backend in 'discarding_response' state.");
|
||||||
|
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user