Merge branch '2.1' into develop
This commit is contained in:
@ -524,8 +524,28 @@ int CacheFilterSession::handle_expecting_nothing()
|
||||
{
|
||||
ss_dassert(m_state == CACHE_EXPECTING_NOTHING);
|
||||
ss_dassert(m_res.pData);
|
||||
MXS_ERROR("Received data from the backend althoug we were expecting nothing.");
|
||||
ss_dassert(!true);
|
||||
unsigned long msg_size = gwbuf_length(m_res.pData);
|
||||
|
||||
if ((int)MYSQL_GET_COMMAND(GWBUF_DATA(m_res.pData)) == 0xff)
|
||||
{
|
||||
/**
|
||||
* Error text message is after:
|
||||
* MYSQL_HEADER_LEN offset + status flag (1) + error code (2) +
|
||||
* 6 bytes message status = MYSQL_HEADER_LEN + 9
|
||||
*/
|
||||
MXS_INFO("Error packet received from backend "
|
||||
"(possibly a server shut down ?): [%.*s].",
|
||||
(int)msg_size - (MYSQL_HEADER_LEN + 9),
|
||||
GWBUF_DATA(m_res.pData) + MYSQL_HEADER_LEN + 9);
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_WARNING("Received data from the backend although "
|
||||
"filter is expecting nothing. "
|
||||
"Packet size is %lu bytes long.",
|
||||
msg_size);
|
||||
ss_dassert(!true);
|
||||
}
|
||||
|
||||
return send_upstream();
|
||||
}
|
||||
|
@ -180,8 +180,8 @@ typedef struct maxrows_response_state
|
||||
size_t n_fields; /**< How many fields we have received, <= n_totalfields. */
|
||||
size_t n_rows; /**< How many rows we have received. */
|
||||
size_t offset; /**< Where we are in the response buffer. */
|
||||
size_t rows_offset; /**< Offset to first row in result set */
|
||||
size_t length; /**< Buffer size. */
|
||||
GWBUF* column_defs; /**< Buffer with result set columns definitions */
|
||||
} MAXROWS_RESPONSE_STATE;
|
||||
|
||||
static void maxrows_response_state_reset(MAXROWS_RESPONSE_STATE *state);
|
||||
@ -206,7 +206,7 @@ static void maxrows_session_data_free(MAXROWS_SESSION_DATA *data);
|
||||
static int handle_expecting_fields(MAXROWS_SESSION_DATA *csdata);
|
||||
static int handle_expecting_nothing(MAXROWS_SESSION_DATA *csdata);
|
||||
static int handle_expecting_response(MAXROWS_SESSION_DATA *csdata);
|
||||
static int handle_rows(MAXROWS_SESSION_DATA *csdata);
|
||||
static int handle_rows(MAXROWS_SESSION_DATA *csdata, GWBUF* buffer, size_t extra_offset);
|
||||
static int handle_ignoring_response(MAXROWS_SESSION_DATA *csdata);
|
||||
static bool process_params(char **options,
|
||||
MXS_CONFIG_PARAMETER *params,
|
||||
@ -410,8 +410,19 @@ static int clientReply(MXS_FILTER *instance,
|
||||
|
||||
if (csdata->res.data)
|
||||
{
|
||||
gwbuf_append(csdata->res.data, data);
|
||||
csdata->res.length += gwbuf_length(data);
|
||||
if (csdata->discard_resultset &&
|
||||
csdata->state == MAXROWS_EXPECTING_ROWS)
|
||||
{
|
||||
gwbuf_free(csdata->res.data);
|
||||
csdata->res.data = data;
|
||||
csdata->res.length = gwbuf_length(data);
|
||||
csdata->res.offset = 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
gwbuf_append(csdata->res.data, data);
|
||||
csdata->res.length += gwbuf_length(data);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -453,7 +464,7 @@ static int clientReply(MXS_FILTER *instance,
|
||||
break;
|
||||
|
||||
case MAXROWS_EXPECTING_ROWS:
|
||||
rv = handle_rows(csdata);
|
||||
rv = handle_rows(csdata, data, 0);
|
||||
break;
|
||||
|
||||
case MAXROWS_IGNORING_RESPONSE:
|
||||
@ -529,7 +540,7 @@ static void maxrows_response_state_reset(MAXROWS_RESPONSE_STATE *state)
|
||||
state->n_fields = 0;
|
||||
state->n_rows = 0;
|
||||
state->offset = 0;
|
||||
state->rows_offset = 0;
|
||||
state->column_defs = NULL;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -608,16 +619,18 @@ static int handle_expecting_fields(MAXROWS_SESSION_DATA *csdata)
|
||||
case 0xfe: // EOF, the one after the fields.
|
||||
csdata->res.offset += packetlen;
|
||||
|
||||
/* Now set the offset to the first resultset
|
||||
* this could be used for empty response handler
|
||||
/**
|
||||
* Set the buffer with column definitions.
|
||||
* This will be used only by the empty response handler.
|
||||
*/
|
||||
if (!csdata->res.rows_offset)
|
||||
if (!csdata->res.column_defs &&
|
||||
csdata->instance->config.m_return == MAXROWS_RETURN_EMPTY)
|
||||
{
|
||||
csdata->res.rows_offset = csdata->res.offset;
|
||||
csdata->res.column_defs = gwbuf_clone(csdata->res.data);
|
||||
}
|
||||
|
||||
csdata->state = MAXROWS_EXPECTING_ROWS;
|
||||
rv = handle_rows(csdata);
|
||||
rv = handle_rows(csdata, csdata->res.data, csdata->res.offset);
|
||||
break;
|
||||
|
||||
default: // Field information.
|
||||
@ -646,8 +659,28 @@ static int handle_expecting_nothing(MAXROWS_SESSION_DATA *csdata)
|
||||
{
|
||||
ss_dassert(csdata->state == MAXROWS_EXPECTING_NOTHING);
|
||||
ss_dassert(csdata->res.data);
|
||||
MXS_ERROR("Received data from the backend although we were expecting nothing.");
|
||||
ss_dassert(!true);
|
||||
unsigned long msg_size = gwbuf_length(csdata->res.data);
|
||||
|
||||
if ((int)MYSQL_GET_COMMAND(GWBUF_DATA(csdata->res.data)) == 0xff)
|
||||
{
|
||||
/**
|
||||
* Error text message is after:
|
||||
* MYSQL_HEADER_LEN offset + status flag (1) + error code (2) +
|
||||
* 6 bytes message status = MYSQL_HEADER_LEN + 9
|
||||
*/
|
||||
MXS_INFO("Error packet received from backend "
|
||||
"(possibly a server shut down ?): [%.*s].",
|
||||
(int)msg_size - (MYSQL_HEADER_LEN + 9),
|
||||
GWBUF_DATA(csdata->res.data) + MYSQL_HEADER_LEN + 9);
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_WARNING("Received data from the backend although "
|
||||
"filter is expecting nothing. "
|
||||
"Packet size is %lu bytes long.",
|
||||
msg_size);
|
||||
ss_dassert(!true);
|
||||
}
|
||||
|
||||
return send_upstream(csdata);
|
||||
}
|
||||
@ -772,32 +805,37 @@ static int handle_expecting_response(MAXROWS_SESSION_DATA *csdata)
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when resultset rows are handled.
|
||||
* Called when resultset rows are handled
|
||||
*
|
||||
* @param csdata The maxrows session data.
|
||||
* @param csdata The maxrows session data
|
||||
* @param buffer The buffer containing the packet
|
||||
* @param extra_offset Offset into @c buffer where the packet is stored
|
||||
*
|
||||
* @return The return value of the upstream component
|
||||
*/
|
||||
static int handle_rows(MAXROWS_SESSION_DATA *csdata)
|
||||
static int handle_rows(MAXROWS_SESSION_DATA *csdata, GWBUF* buffer, size_t extra_offset)
|
||||
{
|
||||
ss_dassert(csdata->state == MAXROWS_EXPECTING_ROWS);
|
||||
ss_dassert(csdata->res.data);
|
||||
|
||||
int rv = 1;
|
||||
bool insufficient = false;
|
||||
size_t buflen = csdata->res.length;
|
||||
size_t offset = extra_offset;
|
||||
size_t buflen = gwbuf_length(buffer);
|
||||
|
||||
while (!insufficient && (buflen - csdata->res.offset >= MYSQL_HEADER_LEN))
|
||||
while (!insufficient && (buflen - offset >= MYSQL_HEADER_LEN))
|
||||
{
|
||||
bool pending_large_data = csdata->large_packet;
|
||||
// header array holds a full EOF packet
|
||||
uint8_t header[MYSQL_EOF_PACKET_LEN];
|
||||
gwbuf_copy_data(csdata->res.data,
|
||||
csdata->res.offset,
|
||||
gwbuf_copy_data(buffer,
|
||||
offset,
|
||||
MYSQL_EOF_PACKET_LEN,
|
||||
header);
|
||||
|
||||
size_t packetlen = MYSQL_HEADER_LEN + MYSQL_GET_PAYLOAD_LEN(header);
|
||||
|
||||
if (csdata->res.offset + packetlen <= buflen)
|
||||
if (offset + packetlen <= buflen)
|
||||
{
|
||||
/* Check for large packet packet terminator:
|
||||
* min is 4 bytes "0x0 0x0 0x0 0xseq_no and
|
||||
@ -809,10 +847,8 @@ static int handle_rows(MAXROWS_SESSION_DATA *csdata)
|
||||
packetlen < MYSQL_EOF_PACKET_LEN))
|
||||
{
|
||||
// Update offset, number of rows and break
|
||||
csdata->res.offset += packetlen;
|
||||
offset += packetlen;
|
||||
csdata->res.n_rows++;
|
||||
|
||||
ss_dassert(csdata->res.offset == buflen);
|
||||
break;
|
||||
}
|
||||
|
||||
@ -826,9 +862,7 @@ static int handle_rows(MAXROWS_SESSION_DATA *csdata)
|
||||
// Mark the beginning of a large packet receiving
|
||||
csdata->large_packet = true;
|
||||
// Just update offset and break
|
||||
csdata->res.offset += packetlen;
|
||||
|
||||
ss_dassert(csdata->res.offset == buflen);
|
||||
offset += packetlen;
|
||||
break;
|
||||
}
|
||||
else
|
||||
@ -843,8 +877,7 @@ static int handle_rows(MAXROWS_SESSION_DATA *csdata)
|
||||
switch (command)
|
||||
{
|
||||
case 0xff: // ERR packet after the rows.
|
||||
csdata->res.offset += packetlen;
|
||||
ss_dassert(csdata->res.offset == buflen);
|
||||
offset += packetlen;
|
||||
|
||||
// This is the end of resultset: set big packet var to false
|
||||
csdata->large_packet = false;
|
||||
@ -883,8 +916,7 @@ static int handle_rows(MAXROWS_SESSION_DATA *csdata)
|
||||
* NOTE: not supported right now
|
||||
*/
|
||||
case 0xfe: // EOF, the one after the rows.
|
||||
csdata->res.offset += packetlen;
|
||||
ss_dassert(csdata->res.offset == buflen);
|
||||
offset += packetlen;
|
||||
|
||||
/* EOF could be the last packet in the transmission:
|
||||
* check first whether SERVER_MORE_RESULTS_EXIST flag is set.
|
||||
@ -949,7 +981,7 @@ static int handle_rows(MAXROWS_SESSION_DATA *csdata)
|
||||
|
||||
case 0xfb: // NULL
|
||||
default: // length-encoded-string
|
||||
csdata->res.offset += packetlen;
|
||||
offset += packetlen;
|
||||
// Increase res.n_rows counter while not receiving large packets
|
||||
if (!csdata->large_packet)
|
||||
{
|
||||
@ -981,6 +1013,8 @@ static int handle_rows(MAXROWS_SESSION_DATA *csdata)
|
||||
}
|
||||
}
|
||||
|
||||
csdata->res.offset += offset - extra_offset;
|
||||
|
||||
return rv;
|
||||
}
|
||||
|
||||
@ -1009,12 +1043,20 @@ static int send_upstream(MAXROWS_SESSION_DATA *csdata)
|
||||
ss_dassert(csdata->res.data != NULL);
|
||||
|
||||
/* Free a saved SQL not freed by send_error_upstream() */
|
||||
if (csdata->input_sql)
|
||||
if (csdata->instance->config.m_return == MAXROWS_RETURN_ERR)
|
||||
{
|
||||
gwbuf_free(csdata->input_sql);
|
||||
csdata->input_sql = NULL;
|
||||
}
|
||||
|
||||
/* Free a saved columndefs not freed by send_eof_upstream() */
|
||||
if (csdata->instance->config.m_return == MAXROWS_RETURN_EMPTY)
|
||||
{
|
||||
gwbuf_free(csdata->res.column_defs);
|
||||
csdata->res.column_defs = NULL;
|
||||
}
|
||||
|
||||
/* Send data to client */
|
||||
int rv = csdata->up.clientReply(csdata->up.instance,
|
||||
csdata->up.session,
|
||||
csdata->res.data);
|
||||
@ -1039,22 +1081,24 @@ static int send_eof_upstream(MAXROWS_SESSION_DATA *csdata)
|
||||
/* Sequence byte is #3 */
|
||||
uint8_t eof[MYSQL_EOF_PACKET_LEN] = {05, 00, 00, 01, 0xfe, 00, 00, 02, 00};
|
||||
GWBUF *new_pkt = NULL;
|
||||
|
||||
ss_dassert(csdata->res.data != NULL);
|
||||
ss_dassert(csdata->res.column_defs != NULL);
|
||||
|
||||
/**
|
||||
* The offset to server reply pointing to
|
||||
* next byte after column definitions EOF
|
||||
* of the first result set.
|
||||
*/
|
||||
size_t offset = csdata->res.rows_offset;
|
||||
|
||||
ss_dassert(csdata->res.data != NULL);
|
||||
size_t offset = gwbuf_length(csdata->res.column_defs);
|
||||
|
||||
/* Data to send + added EOF */
|
||||
uint8_t *new_result = MXS_MALLOC(offset + MYSQL_EOF_PACKET_LEN);
|
||||
|
||||
if (new_result)
|
||||
{
|
||||
/* Get contiguous data from beginning to specified offset */
|
||||
gwbuf_copy_data(csdata->res.data, 0, offset, new_result);
|
||||
/* Get contiguous data from saved columns defintions buffer */
|
||||
gwbuf_copy_data(csdata->res.column_defs, 0, offset, new_result);
|
||||
|
||||
/* Increment sequence number for the EOF being added for empty resultset:
|
||||
* last one if found in EOF terminating column def
|
||||
@ -1087,9 +1131,12 @@ static int send_eof_upstream(MAXROWS_SESSION_DATA *csdata)
|
||||
rv = 0;
|
||||
}
|
||||
|
||||
/* Free full input buffer */
|
||||
/* Free all data buffers */
|
||||
gwbuf_free(csdata->res.data);
|
||||
gwbuf_free(csdata->res.column_defs);
|
||||
|
||||
csdata->res.data = NULL;
|
||||
csdata->res.column_defs = NULL;
|
||||
|
||||
return rv;
|
||||
}
|
||||
@ -1127,6 +1174,8 @@ static int send_ok_upstream(MAXROWS_SESSION_DATA *csdata)
|
||||
int rv = csdata->up.clientReply(csdata->up.instance,
|
||||
csdata->up.session,
|
||||
packet);
|
||||
|
||||
/* Free server result buffer */
|
||||
gwbuf_free(csdata->res.data);
|
||||
csdata->res.data = NULL;
|
||||
|
||||
@ -1208,10 +1257,10 @@ static int send_error_upstream(MAXROWS_SESSION_DATA *csdata)
|
||||
|
||||
/* Free server result buffer */
|
||||
gwbuf_free(csdata->res.data);
|
||||
csdata->res.data = NULL;
|
||||
|
||||
/* Free input_sql buffer */
|
||||
gwbuf_free(csdata->input_sql);
|
||||
|
||||
csdata->res.data = NULL;
|
||||
csdata->input_sql = NULL;
|
||||
|
||||
return rv;
|
||||
|
@ -519,10 +519,6 @@ static void log_closed_session(mxs_mysql_cmd_t mysql_command, bool is_closed,
|
||||
{
|
||||
sprintf(msg, "Server '%s' is down.", ref->server->unique_name);
|
||||
}
|
||||
else if (!SERVER_REF_IS_ACTIVE(ref))
|
||||
{
|
||||
sprintf(msg, "Server '%s' was removed from the service.", ref->server->unique_name);
|
||||
}
|
||||
else if (SERVER_IN_MAINT(ref->server))
|
||||
{
|
||||
sprintf(msg, "Server '%s' is in maintenance.", ref->server->unique_name);
|
||||
@ -576,7 +572,6 @@ routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queu
|
||||
}
|
||||
|
||||
if (rses_is_closed || backend_dcb == NULL ||
|
||||
!SERVER_REF_IS_ACTIVE(router_cli_ses->backend) ||
|
||||
!SERVER_IS_RUNNING(router_cli_ses->backend->server))
|
||||
{
|
||||
log_closed_session(mysql_command, rses_is_closed, router_cli_ses->backend);
|
||||
|
@ -424,7 +424,7 @@ SRWBackend get_target_backend(RWSplitSession *rses, backend_type_t btype,
|
||||
|
||||
/** The server must be a valid slave, relay server, or master */
|
||||
|
||||
if (backend->in_use() && backend->is_active() &&
|
||||
if (backend->in_use() &&
|
||||
(strcasecmp(name, backend->name()) == 0) &&
|
||||
(backend->is_slave() ||
|
||||
backend->is_relay() ||
|
||||
@ -451,7 +451,7 @@ SRWBackend get_target_backend(RWSplitSession *rses, backend_type_t btype,
|
||||
* Unused backend or backend which is not master nor
|
||||
* slave can't be used
|
||||
*/
|
||||
if (!backend->in_use() || !backend->is_active() ||
|
||||
if (!backend->in_use() ||
|
||||
(!backend->is_master() && !backend->is_slave()))
|
||||
{
|
||||
continue;
|
||||
@ -527,7 +527,7 @@ SRWBackend get_target_backend(RWSplitSession *rses, backend_type_t btype,
|
||||
*/
|
||||
else if (btype == BE_MASTER)
|
||||
{
|
||||
if (master && master->is_active())
|
||||
if (master)
|
||||
{
|
||||
/** It is possible for the server status to change at any point in time
|
||||
* so copying it locally will make possible error messages
|
||||
|
Reference in New Issue
Block a user