MXS-1412: Process response buffers individually
By processing each buffer individually, the need to iterate over the whole resultset is removed. Profiling showed that most of the time was spent navigating the linked list of buffers when an offset into the whole resultset was used instead of an offset to the individual response buffer.
This commit is contained in:
@ -211,7 +211,7 @@ static void maxrows_session_data_free(MAXROWS_SESSION_DATA *data);
|
||||
static int handle_expecting_fields(MAXROWS_SESSION_DATA *csdata);
|
||||
static int handle_expecting_nothing(MAXROWS_SESSION_DATA *csdata);
|
||||
static int handle_expecting_response(MAXROWS_SESSION_DATA *csdata);
|
||||
static int handle_rows(MAXROWS_SESSION_DATA *csdata);
|
||||
static int handle_rows(MAXROWS_SESSION_DATA *csdata, GWBUF* buffer, size_t extra_offset);
|
||||
static int handle_ignoring_response(MAXROWS_SESSION_DATA *csdata);
|
||||
static bool process_params(char **options,
|
||||
MXS_CONFIG_PARAMETER *params,
|
||||
@ -458,7 +458,7 @@ static int clientReply(MXS_FILTER *instance,
|
||||
break;
|
||||
|
||||
case MAXROWS_EXPECTING_ROWS:
|
||||
rv = handle_rows(csdata);
|
||||
rv = handle_rows(csdata, data, 0);
|
||||
break;
|
||||
|
||||
case MAXROWS_IGNORING_RESPONSE:
|
||||
@ -608,7 +608,7 @@ static int handle_expecting_fields(MAXROWS_SESSION_DATA *csdata)
|
||||
}
|
||||
|
||||
csdata->state = MAXROWS_EXPECTING_ROWS;
|
||||
rv = handle_rows(csdata);
|
||||
rv = handle_rows(csdata, csdata->res.data,csdata->res.offset);
|
||||
break;
|
||||
|
||||
default: // Field information.
|
||||
@ -767,28 +767,29 @@ static int handle_expecting_response(MAXROWS_SESSION_DATA *csdata)
|
||||
*
|
||||
* @param csdata The maxrows session data.
|
||||
*/
|
||||
static int handle_rows(MAXROWS_SESSION_DATA *csdata)
|
||||
static int handle_rows(MAXROWS_SESSION_DATA *csdata, GWBUF* buffer, size_t extra_offset)
|
||||
{
|
||||
ss_dassert(csdata->state == MAXROWS_EXPECTING_ROWS);
|
||||
ss_dassert(csdata->res.data);
|
||||
|
||||
int rv = 1;
|
||||
bool insufficient = false;
|
||||
size_t buflen = csdata->res.length;
|
||||
size_t offset = extra_offset;
|
||||
size_t buflen = gwbuf_length(buffer);
|
||||
|
||||
while (!insufficient && (buflen - csdata->res.offset >= MYSQL_HEADER_LEN))
|
||||
while (!insufficient && (buflen - offset >= MYSQL_HEADER_LEN))
|
||||
{
|
||||
bool pending_large_data = csdata->large_packet;
|
||||
// header array holds a full EOF packet
|
||||
uint8_t header[MYSQL_EOF_PACKET_LEN];
|
||||
gwbuf_copy_data(csdata->res.data,
|
||||
csdata->res.offset,
|
||||
gwbuf_copy_data(buffer,
|
||||
offset,
|
||||
MYSQL_EOF_PACKET_LEN,
|
||||
header);
|
||||
|
||||
size_t packetlen = MYSQL_HEADER_LEN + MYSQL_GET_PAYLOAD_LEN(header);
|
||||
|
||||
if (csdata->res.offset + packetlen <= buflen)
|
||||
if (offset + packetlen <= buflen)
|
||||
{
|
||||
/* Check for large packet packet terminator:
|
||||
* min is 4 bytes "0x0 0x0 0x0 0xseq_no and
|
||||
@ -800,10 +801,8 @@ static int handle_rows(MAXROWS_SESSION_DATA *csdata)
|
||||
packetlen < MYSQL_EOF_PACKET_LEN))
|
||||
{
|
||||
// Update offset, number of rows and break
|
||||
csdata->res.offset += packetlen;
|
||||
offset += packetlen;
|
||||
csdata->res.n_rows++;
|
||||
|
||||
ss_dassert(csdata->res.offset == buflen);
|
||||
break;
|
||||
}
|
||||
|
||||
@ -817,9 +816,7 @@ static int handle_rows(MAXROWS_SESSION_DATA *csdata)
|
||||
// Mark the beginning of a large packet receiving
|
||||
csdata->large_packet = true;
|
||||
// Just update offset and break
|
||||
csdata->res.offset += packetlen;
|
||||
|
||||
ss_dassert(csdata->res.offset == buflen);
|
||||
offset += packetlen;
|
||||
break;
|
||||
}
|
||||
else
|
||||
@ -834,8 +831,7 @@ static int handle_rows(MAXROWS_SESSION_DATA *csdata)
|
||||
switch (command)
|
||||
{
|
||||
case 0xff: // ERR packet after the rows.
|
||||
csdata->res.offset += packetlen;
|
||||
ss_dassert(csdata->res.offset == buflen);
|
||||
offset += packetlen;
|
||||
|
||||
// This is the end of resultset: set big packet var to false
|
||||
csdata->large_packet = false;
|
||||
@ -874,8 +870,7 @@ static int handle_rows(MAXROWS_SESSION_DATA *csdata)
|
||||
* NOTE: not supported right now
|
||||
*/
|
||||
case 0xfe: // EOF, the one after the rows.
|
||||
csdata->res.offset += packetlen;
|
||||
ss_dassert(csdata->res.offset == buflen);
|
||||
offset += packetlen;
|
||||
|
||||
/* EOF could be the last packet in the transmission:
|
||||
* check first whether SERVER_MORE_RESULTS_EXIST flag is set.
|
||||
@ -940,7 +935,7 @@ static int handle_rows(MAXROWS_SESSION_DATA *csdata)
|
||||
|
||||
case 0xfb: // NULL
|
||||
default: // length-encoded-string
|
||||
csdata->res.offset += packetlen;
|
||||
offset += packetlen;
|
||||
// Increase res.n_rows counter while not receiving large packets
|
||||
if (!csdata->large_packet)
|
||||
{
|
||||
@ -972,6 +967,8 @@ static int handle_rows(MAXROWS_SESSION_DATA *csdata)
|
||||
}
|
||||
}
|
||||
|
||||
csdata->res.offset += offset;
|
||||
|
||||
return rv;
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user