Return results as sets of packets
Returning the results of a query as a set of packets is currently more efficient. This is mainly due to the fact that each individual packet for single packet routing is allocated from the heap which causes a significant loss in performance. Took the new capability into use in readwritesplit and modified the reply_is_complete function to work with non-contiguous results.
This commit is contained in:

committed by
Johan Wikman

parent
0e7f592bd7
commit
f3b0245c0b
@ -42,6 +42,8 @@ typedef enum routing_capability
|
|||||||
RCAP_TYPE_CONTIGUOUS_OUTPUT = 0x0030, /* 0b0000000000110000 */
|
RCAP_TYPE_CONTIGUOUS_OUTPUT = 0x0030, /* 0b0000000000110000 */
|
||||||
/** Result sets are delivered in one buffer; implies RCAP_TYPE_STMT_OUTPUT. */
|
/** Result sets are delivered in one buffer; implies RCAP_TYPE_STMT_OUTPUT. */
|
||||||
RCAP_TYPE_RESULTSET_OUTPUT = 0x0050, /* 0b0000000001110000 */
|
RCAP_TYPE_RESULTSET_OUTPUT = 0x0050, /* 0b0000000001110000 */
|
||||||
|
/** Results are delivered as a set of complete packets */
|
||||||
|
RCAP_TYPE_PACKET_OUTPUT = 0x0080, /* 0b0000000010000000 */
|
||||||
|
|
||||||
} mxs_routing_capability_t;
|
} mxs_routing_capability_t;
|
||||||
|
|
||||||
|
@ -676,6 +676,11 @@ int modutil_count_signal_packets(GWBUF *reply, int n_found, bool* more, modutil_
|
|||||||
}
|
}
|
||||||
|
|
||||||
offset += pktlen;
|
offset += pktlen;
|
||||||
|
if (offset >= GWBUF_LENGTH(reply) && reply->next)
|
||||||
|
{
|
||||||
|
offset -= GWBUF_LENGTH(reply);
|
||||||
|
reply = reply->next;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int total = err + eof + n_found;
|
int total = err + eof + n_found;
|
||||||
|
@ -745,8 +745,9 @@ gw_read_and_write(DCB *dcb)
|
|||||||
bool result_collected = false;
|
bool result_collected = false;
|
||||||
MySQLProtocol *proto = (MySQLProtocol *)dcb->protocol;
|
MySQLProtocol *proto = (MySQLProtocol *)dcb->protocol;
|
||||||
|
|
||||||
if (rcap_type_required(capabilities, RCAP_TYPE_CONTIGUOUS_OUTPUT) ||
|
if (rcap_type_required(capabilities, RCAP_TYPE_PACKET_OUTPUT) ||
|
||||||
proto->collect_result || proto->ignore_replies != 0)
|
rcap_type_required(capabilities, RCAP_TYPE_CONTIGUOUS_OUTPUT) ||
|
||||||
|
proto->ignore_replies != 0)
|
||||||
{
|
{
|
||||||
GWBUF *tmp = modutil_get_complete_packets(&read_buffer);
|
GWBUF *tmp = modutil_get_complete_packets(&read_buffer);
|
||||||
/* Put any residue into the read queue */
|
/* Put any residue into the read queue */
|
||||||
@ -761,6 +762,10 @@ gw_read_and_write(DCB *dcb)
|
|||||||
|
|
||||||
read_buffer = tmp;
|
read_buffer = tmp;
|
||||||
|
|
||||||
|
if (rcap_type_required(capabilities, RCAP_TYPE_CONTIGUOUS_OUTPUT) ||
|
||||||
|
proto->collect_result ||
|
||||||
|
proto->ignore_replies != 0)
|
||||||
|
{
|
||||||
if ((tmp = gwbuf_make_contiguous(read_buffer)))
|
if ((tmp = gwbuf_make_contiguous(read_buffer)))
|
||||||
{
|
{
|
||||||
read_buffer = tmp;
|
read_buffer = tmp;
|
||||||
@ -806,6 +811,7 @@ gw_read_and_write(DCB *dcb)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (proto->ignore_replies > 0)
|
if (proto->ignore_replies > 0)
|
||||||
{
|
{
|
||||||
@ -905,8 +911,7 @@ gw_read_and_write(DCB *dcb)
|
|||||||
* If protocol has session command set, concatenate whole
|
* If protocol has session command set, concatenate whole
|
||||||
* response into one buffer.
|
* response into one buffer.
|
||||||
*/
|
*/
|
||||||
if (proto->protocol_command.scom_cmd != MXS_COM_UNDEFINED &&
|
if (protocol_get_srv_command((MySQLProtocol *)dcb->protocol, true) != MXS_COM_UNDEFINED)
|
||||||
protocol_get_srv_command(proto, true) != MXS_COM_UNDEFINED)
|
|
||||||
{
|
{
|
||||||
if (result_collected)
|
if (result_collected)
|
||||||
{
|
{
|
||||||
@ -940,8 +945,8 @@ gw_read_and_write(DCB *dcb)
|
|||||||
!rcap_type_required(capabilities, RCAP_TYPE_RESULTSET_OUTPUT) &&
|
!rcap_type_required(capabilities, RCAP_TYPE_RESULTSET_OUTPUT) &&
|
||||||
!result_collected)
|
!result_collected)
|
||||||
{
|
{
|
||||||
if ((stmt = modutil_get_next_MySQL_packet(&read_buffer)))
|
stmt = modutil_get_next_MySQL_packet(&read_buffer);
|
||||||
{
|
|
||||||
if (!GWBUF_IS_CONTIGUOUS(stmt))
|
if (!GWBUF_IS_CONTIGUOUS(stmt))
|
||||||
{
|
{
|
||||||
// Make sure the buffer is contiguous
|
// Make sure the buffer is contiguous
|
||||||
@ -949,17 +954,6 @@ gw_read_and_write(DCB *dcb)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
|
||||||
// All complete packets are processed, store partial packets for later use
|
|
||||||
if (read_buffer)
|
|
||||||
{
|
|
||||||
dcb_readq_prepend(dcb, read_buffer);
|
|
||||||
}
|
|
||||||
|
|
||||||
return return_code;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
{
|
||||||
stmt = read_buffer;
|
stmt = read_buffer;
|
||||||
read_buffer = NULL;
|
read_buffer = NULL;
|
||||||
|
@ -515,55 +515,6 @@ static bool route_stored_query(RWSplitSession *rses)
|
|||||||
return rval;
|
return rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline bool is_eof(GWBUF* buffer, size_t len)
|
|
||||||
{
|
|
||||||
uint8_t* data = GWBUF_DATA(buffer);
|
|
||||||
return data[MYSQL_HEADER_LEN] == MYSQL_REPLY_EOF &&
|
|
||||||
len == MYSQL_EOF_PACKET_LEN - MYSQL_HEADER_LEN;
|
|
||||||
}
|
|
||||||
|
|
||||||
static inline bool is_ok(GWBUF* buffer)
|
|
||||||
{
|
|
||||||
uint8_t* data = GWBUF_DATA(buffer);
|
|
||||||
return data[MYSQL_HEADER_LEN] == MYSQL_REPLY_OK;
|
|
||||||
}
|
|
||||||
|
|
||||||
static inline bool more_results_exist(GWBUF* buffer)
|
|
||||||
{
|
|
||||||
ss_dassert(is_eof(buffer, gw_mysql_get_byte3(GWBUF_DATA(buffer))) ||
|
|
||||||
mxs_mysql_is_ok_packet(buffer));
|
|
||||||
ss_dassert(GWBUF_IS_CONTIGUOUS(buffer));
|
|
||||||
|
|
||||||
uint8_t* ptr = GWBUF_DATA(buffer) + MYSQL_HEADER_LEN + 1;
|
|
||||||
ptr += mxs_leint_bytes(ptr);
|
|
||||||
ptr += mxs_leint_bytes(ptr);
|
|
||||||
|
|
||||||
uint16_t status = gw_mysql_get_byte2(ptr);
|
|
||||||
return status & SERVER_MORE_RESULTS_EXIST;
|
|
||||||
}
|
|
||||||
|
|
||||||
static inline bool is_result_set(GWBUF *buffer)
|
|
||||||
{
|
|
||||||
bool rval = false;
|
|
||||||
|
|
||||||
switch (GWBUF_DATA(buffer)[MYSQL_HEADER_LEN])
|
|
||||||
{
|
|
||||||
|
|
||||||
case MYSQL_REPLY_OK:
|
|
||||||
case MYSQL_REPLY_ERR:
|
|
||||||
case MYSQL_REPLY_LOCAL_INFILE:
|
|
||||||
case MYSQL_REPLY_EOF:
|
|
||||||
/** Not a result set */
|
|
||||||
break;
|
|
||||||
|
|
||||||
default:
|
|
||||||
rval = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
return rval;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Check if we have received a complete reply from the backend
|
* @brief Check if we have received a complete reply from the backend
|
||||||
*
|
*
|
||||||
@ -575,11 +526,12 @@ static inline bool is_result_set(GWBUF *buffer)
|
|||||||
bool reply_is_complete(SRWBackend& backend, GWBUF *buffer)
|
bool reply_is_complete(SRWBackend& backend, GWBUF *buffer)
|
||||||
{
|
{
|
||||||
if (backend->get_reply_state() == REPLY_STATE_START &&
|
if (backend->get_reply_state() == REPLY_STATE_START &&
|
||||||
(!is_result_set(buffer) || GWBUF_IS_COLLECTED_RESULT(buffer)))
|
(!mxs_mysql_is_result_set(buffer) || GWBUF_IS_COLLECTED_RESULT(buffer)))
|
||||||
{
|
{
|
||||||
if (GWBUF_IS_COLLECTED_RESULT(buffer) ||
|
if (GWBUF_IS_COLLECTED_RESULT(buffer) ||
|
||||||
backend->current_command() == MXS_COM_STMT_PREPARE ||
|
backend->current_command() == MXS_COM_STMT_PREPARE ||
|
||||||
!is_ok(buffer) || !more_results_exist(buffer))
|
!mxs_mysql_is_ok_packet(buffer) ||
|
||||||
|
!mxs_mysql_more_results_after_ok(buffer))
|
||||||
{
|
{
|
||||||
/** Not a result set, we have the complete response */
|
/** Not a result set, we have the complete response */
|
||||||
LOG_RS(backend, REPLY_STATE_DONE);
|
LOG_RS(backend, REPLY_STATE_DONE);
|
||||||
@ -588,21 +540,11 @@ bool reply_is_complete(SRWBackend& backend, GWBUF *buffer)
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
int n_eof = backend->get_reply_state() == REPLY_STATE_RSET_ROWS ? 1 : 0;
|
bool more = false;
|
||||||
size_t len = gw_mysql_get_byte3(GWBUF_DATA(buffer));
|
modutil_state state = {backend->is_large_packet()};
|
||||||
|
int n_old_eof = backend->get_reply_state() == REPLY_STATE_RSET_ROWS ? 1 : 0;
|
||||||
if (len == GW_MYSQL_MAX_PACKET_LEN)
|
int n_eof = modutil_count_signal_packets(buffer, n_old_eof, &more, &state);
|
||||||
{
|
backend->set_large_packet(state.state);
|
||||||
backend->set_large_packet(true);
|
|
||||||
}
|
|
||||||
else if (backend->is_large_packet())
|
|
||||||
{
|
|
||||||
backend->set_large_packet(false);
|
|
||||||
}
|
|
||||||
else if (is_eof(buffer, len))
|
|
||||||
{
|
|
||||||
n_eof++;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (n_eof == 0)
|
if (n_eof == 0)
|
||||||
{
|
{
|
||||||
@ -624,7 +566,7 @@ bool reply_is_complete(SRWBackend& backend, GWBUF *buffer)
|
|||||||
LOG_RS(backend, REPLY_STATE_DONE);
|
LOG_RS(backend, REPLY_STATE_DONE);
|
||||||
backend->set_reply_state(REPLY_STATE_DONE);
|
backend->set_reply_state(REPLY_STATE_DONE);
|
||||||
|
|
||||||
if (more_results_exist(buffer))
|
if (more)
|
||||||
{
|
{
|
||||||
/** The server will send more resultsets */
|
/** The server will send more resultsets */
|
||||||
LOG_RS(backend, REPLY_STATE_START);
|
LOG_RS(backend, REPLY_STATE_START);
|
||||||
@ -1183,10 +1125,6 @@ static void clientReply(MXS_ROUTER *instance,
|
|||||||
GWBUF *writebuf,
|
GWBUF *writebuf,
|
||||||
DCB *backend_dcb)
|
DCB *backend_dcb)
|
||||||
{
|
{
|
||||||
ss_dassert((GWBUF_IS_CONTIGUOUS(writebuf) &&
|
|
||||||
MYSQL_GET_PAYLOAD_LEN(GWBUF_DATA(writebuf)) +
|
|
||||||
MYSQL_HEADER_LEN == gwbuf_length(writebuf)) ||
|
|
||||||
GWBUF_IS_COLLECTED_RESULT(writebuf));
|
|
||||||
RWSplitSession *rses = (RWSplitSession *)router_session;
|
RWSplitSession *rses = (RWSplitSession *)router_session;
|
||||||
DCB *client_dcb = backend_dcb->session->client_dcb;
|
DCB *client_dcb = backend_dcb->session->client_dcb;
|
||||||
CHK_CLIENT_RSES(rses);
|
CHK_CLIENT_RSES(rses);
|
||||||
@ -1280,7 +1218,7 @@ static void clientReply(MXS_ROUTER *instance,
|
|||||||
*/
|
*/
|
||||||
static uint64_t getCapabilities(MXS_ROUTER* instance)
|
static uint64_t getCapabilities(MXS_ROUTER* instance)
|
||||||
{
|
{
|
||||||
return RCAP_TYPE_STMT_INPUT | RCAP_TYPE_TRANSACTION_TRACKING | RCAP_TYPE_STMT_OUTPUT;
|
return RCAP_TYPE_STMT_INPUT | RCAP_TYPE_TRANSACTION_TRACKING | RCAP_TYPE_PACKET_OUTPUT;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -1431,7 +1369,7 @@ MXS_MODULE *MXS_CREATE_MODULE()
|
|||||||
MXS_MODULE_API_ROUTER, MXS_MODULE_GA, MXS_ROUTER_VERSION,
|
MXS_MODULE_API_ROUTER, MXS_MODULE_GA, MXS_ROUTER_VERSION,
|
||||||
"A Read/Write splitting router for enhancement read scalability",
|
"A Read/Write splitting router for enhancement read scalability",
|
||||||
"V1.1.0",
|
"V1.1.0",
|
||||||
RCAP_TYPE_STMT_INPUT | RCAP_TYPE_TRANSACTION_TRACKING | RCAP_TYPE_STMT_OUTPUT,
|
RCAP_TYPE_STMT_INPUT | RCAP_TYPE_TRANSACTION_TRACKING | RCAP_TYPE_PACKET_OUTPUT,
|
||||||
&MyObject,
|
&MyObject,
|
||||||
NULL, /* Process init. */
|
NULL, /* Process init. */
|
||||||
NULL, /* Process finish. */
|
NULL, /* Process finish. */
|
||||||
|
Reference in New Issue
Block a user