diff --git a/include/maxscale/routing.h b/include/maxscale/routing.h index 9a381e927..b7ac075b7 100644 --- a/include/maxscale/routing.h +++ b/include/maxscale/routing.h @@ -42,6 +42,8 @@ typedef enum routing_capability RCAP_TYPE_CONTIGUOUS_OUTPUT = 0x0030, /* 0b0000000000110000 */ /** Result sets are delivered in one buffer; implies RCAP_TYPE_STMT_OUTPUT. */ RCAP_TYPE_RESULTSET_OUTPUT = 0x0050, /* 0b0000000001110000 */ + /** Results are delivered as a set of complete packets */ + RCAP_TYPE_PACKET_OUTPUT = 0x0080, /* 0b0000000010000000 */ } mxs_routing_capability_t; diff --git a/server/core/modutil.cc b/server/core/modutil.cc index 317f4f9ae..3b2577a57 100644 --- a/server/core/modutil.cc +++ b/server/core/modutil.cc @@ -676,6 +676,11 @@ int modutil_count_signal_packets(GWBUF *reply, int n_found, bool* more, modutil_ } offset += pktlen; + if (offset >= GWBUF_LENGTH(reply) && reply->next) + { + offset -= GWBUF_LENGTH(reply); + reply = reply->next; + } } int total = err + eof + n_found; diff --git a/server/modules/protocol/MySQL/MySQLBackend/mysql_backend.c b/server/modules/protocol/MySQL/MySQLBackend/mysql_backend.c index d4f332fe3..faa779e1f 100644 --- a/server/modules/protocol/MySQL/MySQLBackend/mysql_backend.c +++ b/server/modules/protocol/MySQL/MySQLBackend/mysql_backend.c @@ -745,8 +745,9 @@ gw_read_and_write(DCB *dcb) bool result_collected = false; MySQLProtocol *proto = (MySQLProtocol *)dcb->protocol; - if (rcap_type_required(capabilities, RCAP_TYPE_CONTIGUOUS_OUTPUT) || - proto->collect_result || proto->ignore_replies != 0) + if (rcap_type_required(capabilities, RCAP_TYPE_PACKET_OUTPUT) || + rcap_type_required(capabilities, RCAP_TYPE_CONTIGUOUS_OUTPUT) || + proto->ignore_replies != 0) { GWBUF *tmp = modutil_get_complete_packets(&read_buffer); /* Put any residue into the read queue */ @@ -761,48 +762,53 @@ gw_read_and_write(DCB *dcb) read_buffer = tmp; - if ((tmp = gwbuf_make_contiguous(read_buffer))) + if (rcap_type_required(capabilities, RCAP_TYPE_CONTIGUOUS_OUTPUT) || + proto->collect_result || + proto->ignore_replies != 0) { - read_buffer = tmp; - } - else - { - /** Failed to make the buffer contiguous */ - gwbuf_free(read_buffer); - poll_fake_hangup_event(dcb); - return 0; - } - - if (collecting_resultset(proto, capabilities)) - { - if (expecting_resultset(proto)) + if ((tmp = gwbuf_make_contiguous(read_buffer))) { - if (mxs_mysql_is_result_set(read_buffer)) + read_buffer = tmp; + } + else + { + /** Failed to make the buffer contiguous */ + gwbuf_free(read_buffer); + poll_fake_hangup_event(dcb); + return 0; + } + + if (collecting_resultset(proto, capabilities)) + { + if (expecting_resultset(proto)) { - bool more = false; - if (modutil_count_signal_packets(read_buffer, 0, &more, NULL) != 2) + if (mxs_mysql_is_result_set(read_buffer)) + { + bool more = false; + if (modutil_count_signal_packets(read_buffer, 0, &more, NULL) != 2) + { + dcb_readq_prepend(dcb, read_buffer); + return 0; + } + } + + // Collected the complete result + proto->collect_result = false; + result_collected = true; + } + else if (expecting_ps_response(proto) && + mxs_mysql_is_prep_stmt_ok(read_buffer)) + { + if (!complete_ps_response(read_buffer)) { dcb_readq_prepend(dcb, read_buffer); return 0; } - } - // Collected the complete result - proto->collect_result = false; - result_collected = true; - } - else if (expecting_ps_response(proto) && - mxs_mysql_is_prep_stmt_ok(read_buffer)) - { - if (!complete_ps_response(read_buffer)) - { - dcb_readq_prepend(dcb, read_buffer); - return 0; + // Collected the complete result + proto->collect_result = false; + result_collected = true; } - - // Collected the complete result - proto->collect_result = false; - result_collected = true; } } } @@ -905,8 +911,7 @@ gw_read_and_write(DCB *dcb) * If protocol has session command set, concatenate whole * response into one buffer. */ - if (proto->protocol_command.scom_cmd != MXS_COM_UNDEFINED && - protocol_get_srv_command(proto, true) != MXS_COM_UNDEFINED) + if (protocol_get_srv_command((MySQLProtocol *)dcb->protocol, true) != MXS_COM_UNDEFINED) { if (result_collected) { @@ -940,23 +945,12 @@ gw_read_and_write(DCB *dcb) !rcap_type_required(capabilities, RCAP_TYPE_RESULTSET_OUTPUT) && !result_collected) { - if ((stmt = modutil_get_next_MySQL_packet(&read_buffer))) - { - if (!GWBUF_IS_CONTIGUOUS(stmt)) - { - // Make sure the buffer is contiguous - stmt = gwbuf_make_contiguous(stmt); - } - } - else - { - // All complete packets are processed, store partial packets for later use - if (read_buffer) - { - dcb_readq_prepend(dcb, read_buffer); - } + stmt = modutil_get_next_MySQL_packet(&read_buffer); - return return_code; + if (!GWBUF_IS_CONTIGUOUS(stmt)) + { + // Make sure the buffer is contiguous + stmt = gwbuf_make_contiguous(stmt); } } else diff --git a/server/modules/routing/readwritesplit/readwritesplit.cc b/server/modules/routing/readwritesplit/readwritesplit.cc index b16edbe5c..24260c7b7 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.cc +++ b/server/modules/routing/readwritesplit/readwritesplit.cc @@ -515,55 +515,6 @@ static bool route_stored_query(RWSplitSession *rses) return rval; } -static inline bool is_eof(GWBUF* buffer, size_t len) -{ - uint8_t* data = GWBUF_DATA(buffer); - return data[MYSQL_HEADER_LEN] == MYSQL_REPLY_EOF && - len == MYSQL_EOF_PACKET_LEN - MYSQL_HEADER_LEN; -} - -static inline bool is_ok(GWBUF* buffer) -{ - uint8_t* data = GWBUF_DATA(buffer); - return data[MYSQL_HEADER_LEN] == MYSQL_REPLY_OK; -} - -static inline bool more_results_exist(GWBUF* buffer) -{ - ss_dassert(is_eof(buffer, gw_mysql_get_byte3(GWBUF_DATA(buffer))) || - mxs_mysql_is_ok_packet(buffer)); - ss_dassert(GWBUF_IS_CONTIGUOUS(buffer)); - - uint8_t* ptr = GWBUF_DATA(buffer) + MYSQL_HEADER_LEN + 1; - ptr += mxs_leint_bytes(ptr); - ptr += mxs_leint_bytes(ptr); - - uint16_t status = gw_mysql_get_byte2(ptr); - return status & SERVER_MORE_RESULTS_EXIST; -} - -static inline bool is_result_set(GWBUF *buffer) -{ - bool rval = false; - - switch (GWBUF_DATA(buffer)[MYSQL_HEADER_LEN]) - { - - case MYSQL_REPLY_OK: - case MYSQL_REPLY_ERR: - case MYSQL_REPLY_LOCAL_INFILE: - case MYSQL_REPLY_EOF: - /** Not a result set */ - break; - - default: - rval = true; - break; - } - - return rval; -} - /** * @brief Check if we have received a complete reply from the backend * @@ -575,11 +526,12 @@ static inline bool is_result_set(GWBUF *buffer) bool reply_is_complete(SRWBackend& backend, GWBUF *buffer) { if (backend->get_reply_state() == REPLY_STATE_START && - (!is_result_set(buffer) || GWBUF_IS_COLLECTED_RESULT(buffer))) + (!mxs_mysql_is_result_set(buffer) || GWBUF_IS_COLLECTED_RESULT(buffer))) { if (GWBUF_IS_COLLECTED_RESULT(buffer) || backend->current_command() == MXS_COM_STMT_PREPARE || - !is_ok(buffer) || !more_results_exist(buffer)) + !mxs_mysql_is_ok_packet(buffer) || + !mxs_mysql_more_results_after_ok(buffer)) { /** Not a result set, we have the complete response */ LOG_RS(backend, REPLY_STATE_DONE); @@ -588,21 +540,11 @@ bool reply_is_complete(SRWBackend& backend, GWBUF *buffer) } else { - int n_eof = backend->get_reply_state() == REPLY_STATE_RSET_ROWS ? 1 : 0; - size_t len = gw_mysql_get_byte3(GWBUF_DATA(buffer)); - - if (len == GW_MYSQL_MAX_PACKET_LEN) - { - backend->set_large_packet(true); - } - else if (backend->is_large_packet()) - { - backend->set_large_packet(false); - } - else if (is_eof(buffer, len)) - { - n_eof++; - } + bool more = false; + modutil_state state = {backend->is_large_packet()}; + int n_old_eof = backend->get_reply_state() == REPLY_STATE_RSET_ROWS ? 1 : 0; + int n_eof = modutil_count_signal_packets(buffer, n_old_eof, &more, &state); + backend->set_large_packet(state.state); if (n_eof == 0) { @@ -624,7 +566,7 @@ bool reply_is_complete(SRWBackend& backend, GWBUF *buffer) LOG_RS(backend, REPLY_STATE_DONE); backend->set_reply_state(REPLY_STATE_DONE); - if (more_results_exist(buffer)) + if (more) { /** The server will send more resultsets */ LOG_RS(backend, REPLY_STATE_START); @@ -1183,10 +1125,6 @@ static void clientReply(MXS_ROUTER *instance, GWBUF *writebuf, DCB *backend_dcb) { - ss_dassert((GWBUF_IS_CONTIGUOUS(writebuf) && - MYSQL_GET_PAYLOAD_LEN(GWBUF_DATA(writebuf)) + - MYSQL_HEADER_LEN == gwbuf_length(writebuf)) || - GWBUF_IS_COLLECTED_RESULT(writebuf)); RWSplitSession *rses = (RWSplitSession *)router_session; DCB *client_dcb = backend_dcb->session->client_dcb; CHK_CLIENT_RSES(rses); @@ -1280,7 +1218,7 @@ static void clientReply(MXS_ROUTER *instance, */ static uint64_t getCapabilities(MXS_ROUTER* instance) { - return RCAP_TYPE_STMT_INPUT | RCAP_TYPE_TRANSACTION_TRACKING | RCAP_TYPE_STMT_OUTPUT; + return RCAP_TYPE_STMT_INPUT | RCAP_TYPE_TRANSACTION_TRACKING | RCAP_TYPE_PACKET_OUTPUT; } /** @@ -1431,7 +1369,7 @@ MXS_MODULE *MXS_CREATE_MODULE() MXS_MODULE_API_ROUTER, MXS_MODULE_GA, MXS_ROUTER_VERSION, "A Read/Write splitting router for enhancement read scalability", "V1.1.0", - RCAP_TYPE_STMT_INPUT | RCAP_TYPE_TRANSACTION_TRACKING | RCAP_TYPE_STMT_OUTPUT, + RCAP_TYPE_STMT_INPUT | RCAP_TYPE_TRANSACTION_TRACKING | RCAP_TYPE_PACKET_OUTPUT, &MyObject, NULL, /* Process init. */ NULL, /* Process finish. */