Return results as sets of packets

Returning the results of a query as a set of complete packets is currently
more efficient than returning them one packet at a time. This is mainly
because single packet routing allocates each individual packet from the
heap, which causes a significant loss in performance.

Took the new capability into use in readwritesplit and modified the
reply_is_complete function to work with non-contiguous results.
This commit is contained in:
Markus Mäkelä
2017-10-09 16:14:36 +03:00
parent 3015e6c97d
commit d64cd5cab8
4 changed files with 65 additions and 126 deletions

View File

@ -42,6 +42,8 @@ typedef enum routing_capability
RCAP_TYPE_CONTIGUOUS_OUTPUT = 0x0030, /* 0b0000000000110000 */
/** Result sets are delivered in one buffer; implies RCAP_TYPE_STMT_OUTPUT. */
RCAP_TYPE_RESULTSET_OUTPUT = 0x0050, /* 0b0000000001010000 */
/** Results are delivered as a set of complete packets */
RCAP_TYPE_PACKET_OUTPUT = 0x0080, /* 0b0000000010000000 */
} mxs_routing_capability_t;

View File

@ -676,6 +676,11 @@ int modutil_count_signal_packets(GWBUF *reply, int n_found, bool* more, modutil_
}
offset += pktlen;
if (offset >= GWBUF_LENGTH(reply) && reply->next)
{
offset -= GWBUF_LENGTH(reply);
reply = reply->next;
}
}
int total = err + eof + n_found;

View File

@ -745,8 +745,9 @@ gw_read_and_write(DCB *dcb)
bool result_collected = false;
MySQLProtocol *proto = (MySQLProtocol *)dcb->protocol;
if (rcap_type_required(capabilities, RCAP_TYPE_CONTIGUOUS_OUTPUT) ||
proto->collect_result || proto->ignore_replies != 0)
if (rcap_type_required(capabilities, RCAP_TYPE_PACKET_OUTPUT) ||
rcap_type_required(capabilities, RCAP_TYPE_CONTIGUOUS_OUTPUT) ||
proto->ignore_replies != 0)
{
GWBUF *tmp = modutil_get_complete_packets(&read_buffer);
/* Put any residue into the read queue */
@ -761,48 +762,53 @@ gw_read_and_write(DCB *dcb)
read_buffer = tmp;
if ((tmp = gwbuf_make_contiguous(read_buffer)))
if (rcap_type_required(capabilities, RCAP_TYPE_CONTIGUOUS_OUTPUT) ||
proto->collect_result ||
proto->ignore_replies != 0)
{
read_buffer = tmp;
}
else
{
/** Failed to make the buffer contiguous */
gwbuf_free(read_buffer);
poll_fake_hangup_event(dcb);
return 0;
}
if (collecting_resultset(proto, capabilities))
{
if (expecting_resultset(proto))
if ((tmp = gwbuf_make_contiguous(read_buffer)))
{
if (mxs_mysql_is_result_set(read_buffer))
read_buffer = tmp;
}
else
{
/** Failed to make the buffer contiguous */
gwbuf_free(read_buffer);
poll_fake_hangup_event(dcb);
return 0;
}
if (collecting_resultset(proto, capabilities))
{
if (expecting_resultset(proto))
{
bool more = false;
if (modutil_count_signal_packets(read_buffer, 0, &more, NULL) != 2)
if (mxs_mysql_is_result_set(read_buffer))
{
bool more = false;
if (modutil_count_signal_packets(read_buffer, 0, &more, NULL) != 2)
{
dcb_readq_prepend(dcb, read_buffer);
return 0;
}
}
// Collected the complete result
proto->collect_result = false;
result_collected = true;
}
else if (expecting_ps_response(proto) &&
mxs_mysql_is_prep_stmt_ok(read_buffer))
{
if (!complete_ps_response(read_buffer))
{
dcb_readq_prepend(dcb, read_buffer);
return 0;
}
}
// Collected the complete result
proto->collect_result = false;
result_collected = true;
}
else if (expecting_ps_response(proto) &&
mxs_mysql_is_prep_stmt_ok(read_buffer))
{
if (!complete_ps_response(read_buffer))
{
dcb_readq_prepend(dcb, read_buffer);
return 0;
// Collected the complete result
proto->collect_result = false;
result_collected = true;
}
// Collected the complete result
proto->collect_result = false;
result_collected = true;
}
}
}
@ -905,8 +911,7 @@ gw_read_and_write(DCB *dcb)
* If protocol has session command set, concatenate whole
* response into one buffer.
*/
if (proto->protocol_command.scom_cmd != MXS_COM_UNDEFINED &&
protocol_get_srv_command(proto, true) != MXS_COM_UNDEFINED)
if (protocol_get_srv_command((MySQLProtocol *)dcb->protocol, true) != MXS_COM_UNDEFINED)
{
if (result_collected)
{
@ -940,23 +945,12 @@ gw_read_and_write(DCB *dcb)
!rcap_type_required(capabilities, RCAP_TYPE_RESULTSET_OUTPUT) &&
!result_collected)
{
if ((stmt = modutil_get_next_MySQL_packet(&read_buffer)))
{
if (!GWBUF_IS_CONTIGUOUS(stmt))
{
// Make sure the buffer is contiguous
stmt = gwbuf_make_contiguous(stmt);
}
}
else
{
// All complete packets are processed, store partial packets for later use
if (read_buffer)
{
dcb_readq_prepend(dcb, read_buffer);
}
stmt = modutil_get_next_MySQL_packet(&read_buffer);
return return_code;
if (!GWBUF_IS_CONTIGUOUS(stmt))
{
// Make sure the buffer is contiguous
stmt = gwbuf_make_contiguous(stmt);
}
}
else

View File

@ -515,55 +515,6 @@ static bool route_stored_query(RWSplitSession *rses)
return rval;
}
/**
 * @brief Check whether a buffer starts with an EOF packet
 *
 * The packet is treated as an EOF packet when the first byte after the
 * packet header is MYSQL_REPLY_EOF and the given payload length matches
 * the payload size of an EOF packet.
 *
 * @param buffer Buffer whose first packet is inspected
 * @param len    Payload length of that packet; callers pass the value
 *               extracted from the packet header
 *
 * @return True if both the marker byte and the length match
 */
static inline bool is_eof(GWBUF* buffer, size_t len)
{
    const uint8_t* payload = GWBUF_DATA(buffer) + MYSQL_HEADER_LEN;
    const bool has_eof_marker = (*payload == MYSQL_REPLY_EOF);
    const bool has_eof_length = (len == MYSQL_EOF_PACKET_LEN - MYSQL_HEADER_LEN);

    return has_eof_marker && has_eof_length;
}
/**
 * @brief Check whether a buffer starts with an OK packet
 *
 * Only the first byte after the packet header is examined.
 *
 * @param buffer Buffer whose first packet is inspected
 *
 * @return True if the byte following the header is MYSQL_REPLY_OK
 */
static inline bool is_ok(GWBUF* buffer)
{
    const uint8_t* payload = GWBUF_DATA(buffer) + MYSQL_HEADER_LEN;

    return *payload == MYSQL_REPLY_OK;
}
/**
 * @brief Check whether the SERVER_MORE_RESULTS_EXIST status bit is set
 *
 * Starting from the byte after the command byte, two variable-length
 * fields are stepped over using mxs_leint_bytes() and the following two
 * bytes are read as the status word.
 *
 * NOTE(review): the layout walked here (two length-encoded integers
 * followed by the status word) looks like that of an OK packet, while the
 * debug assertion also admits EOF packets -- confirm the offsets are valid
 * for both.
 *
 * @param buffer Contiguous buffer starting with an OK or an EOF packet
 *               (both conditions are checked by debug assertions only)
 *
 * @return True if the status word has SERVER_MORE_RESULTS_EXIST set
 */
static inline bool more_results_exist(GWBUF* buffer)
{
    ss_dassert(is_eof(buffer, gw_mysql_get_byte3(GWBUF_DATA(buffer))) ||
               mxs_mysql_is_ok_packet(buffer));
    ss_dassert(GWBUF_IS_CONTIGUOUS(buffer));

    // Start right after the packet header and the one-byte command
    uint8_t* cursor = GWBUF_DATA(buffer) + MYSQL_HEADER_LEN + 1;

    // Step over the two fields that precede the status word
    for (int skipped = 0; skipped < 2; skipped++)
    {
        cursor += mxs_leint_bytes(cursor);
    }

    const uint16_t flags = gw_mysql_get_byte2(cursor);
    return (flags & SERVER_MORE_RESULTS_EXIST) != 0;
}
/**
 * @brief Check whether a response starts a result set
 *
 * The first byte after the packet header is compared against the OK, ERR,
 * LOCAL INFILE and EOF reply markers. Any other value is treated as the
 * start of a result set.
 *
 * @param buffer Buffer containing the start of the response
 *
 * @return True if the response is a result set, false for the four
 *         markers listed above
 */
static inline bool is_result_set(GWBUF *buffer)
{
    const uint8_t* payload = GWBUF_DATA(buffer) + MYSQL_HEADER_LEN;
    const uint8_t marker = *payload;

    const bool is_plain_reply = marker == MYSQL_REPLY_OK ||
                                marker == MYSQL_REPLY_ERR ||
                                marker == MYSQL_REPLY_LOCAL_INFILE ||
                                marker == MYSQL_REPLY_EOF;

    return !is_plain_reply;
}
/**
* @brief Check if we have received a complete reply from the backend
*
@ -575,11 +526,12 @@ static inline bool is_result_set(GWBUF *buffer)
bool reply_is_complete(SRWBackend& backend, GWBUF *buffer)
{
if (backend->get_reply_state() == REPLY_STATE_START &&
(!is_result_set(buffer) || GWBUF_IS_COLLECTED_RESULT(buffer)))
(!mxs_mysql_is_result_set(buffer) || GWBUF_IS_COLLECTED_RESULT(buffer)))
{
if (GWBUF_IS_COLLECTED_RESULT(buffer) ||
backend->current_command() == MXS_COM_STMT_PREPARE ||
!is_ok(buffer) || !more_results_exist(buffer))
!mxs_mysql_is_ok_packet(buffer) ||
!mxs_mysql_more_results_after_ok(buffer))
{
/** Not a result set, we have the complete response */
LOG_RS(backend, REPLY_STATE_DONE);
@ -588,21 +540,11 @@ bool reply_is_complete(SRWBackend& backend, GWBUF *buffer)
}
else
{
int n_eof = backend->get_reply_state() == REPLY_STATE_RSET_ROWS ? 1 : 0;
size_t len = gw_mysql_get_byte3(GWBUF_DATA(buffer));
if (len == GW_MYSQL_MAX_PACKET_LEN)
{
backend->set_large_packet(true);
}
else if (backend->is_large_packet())
{
backend->set_large_packet(false);
}
else if (is_eof(buffer, len))
{
n_eof++;
}
bool more = false;
modutil_state state = {backend->is_large_packet()};
int n_old_eof = backend->get_reply_state() == REPLY_STATE_RSET_ROWS ? 1 : 0;
int n_eof = modutil_count_signal_packets(buffer, n_old_eof, &more, &state);
backend->set_large_packet(state.state);
if (n_eof == 0)
{
@ -624,7 +566,7 @@ bool reply_is_complete(SRWBackend& backend, GWBUF *buffer)
LOG_RS(backend, REPLY_STATE_DONE);
backend->set_reply_state(REPLY_STATE_DONE);
if (more_results_exist(buffer))
if (more)
{
/** The server will send more resultsets */
LOG_RS(backend, REPLY_STATE_START);
@ -1183,10 +1125,6 @@ static void clientReply(MXS_ROUTER *instance,
GWBUF *writebuf,
DCB *backend_dcb)
{
ss_dassert((GWBUF_IS_CONTIGUOUS(writebuf) &&
MYSQL_GET_PAYLOAD_LEN(GWBUF_DATA(writebuf)) +
MYSQL_HEADER_LEN == gwbuf_length(writebuf)) ||
GWBUF_IS_COLLECTED_RESULT(writebuf));
RWSplitSession *rses = (RWSplitSession *)router_session;
DCB *client_dcb = backend_dcb->session->client_dcb;
CHK_CLIENT_RSES(rses);
@ -1280,7 +1218,7 @@ static void clientReply(MXS_ROUTER *instance,
*/
static uint64_t getCapabilities(MXS_ROUTER* instance)
{
return RCAP_TYPE_STMT_INPUT | RCAP_TYPE_TRANSACTION_TRACKING | RCAP_TYPE_STMT_OUTPUT;
return RCAP_TYPE_STMT_INPUT | RCAP_TYPE_TRANSACTION_TRACKING | RCAP_TYPE_PACKET_OUTPUT;
}
/**
@ -1431,7 +1369,7 @@ MXS_MODULE *MXS_CREATE_MODULE()
MXS_MODULE_API_ROUTER, MXS_MODULE_GA, MXS_ROUTER_VERSION,
"A Read/Write splitting router for enhancement read scalability",
"V1.1.0",
RCAP_TYPE_STMT_INPUT | RCAP_TYPE_TRANSACTION_TRACKING | RCAP_TYPE_STMT_OUTPUT,
RCAP_TYPE_STMT_INPUT | RCAP_TYPE_TRANSACTION_TRACKING | RCAP_TYPE_PACKET_OUTPUT,
&MyObject,
NULL, /* Process init. */
NULL, /* Process finish. */