From 28609a2c776b552f94ced6f77e10cb20bdae25b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Mon, 16 Jul 2018 19:43:06 +0300 Subject: [PATCH] Remove session command processing from mariadbbackend With the removal of the old session command implementation, the code that used it can be removed or replaced with newer constructs. As a result, the backend protocol no longer does any session command processing. The three buffer types, GWBUF_TYPE_SESCMD_RESPONSE, GWBUF_TYPE_RESPONSE_END and GWBUF_TYPE_SESCMD as well as their related macros are no longer used and can be removed. --- include/maxscale/buffer.h | 28 +-- server/core/backend.cc | 7 +- .../MySQL/mariadbbackend/mysql_backend.cc | 235 +----------------- .../readwritesplit/rwsplit_session_cmd.cc | 111 ++++----- .../schemarouter/schemaroutersession.cc | 51 +--- 5 files changed, 87 insertions(+), 345 deletions(-) diff --git a/include/maxscale/buffer.h b/include/maxscale/buffer.h index ab13d7c68..46b53bbf2 100644 --- a/include/maxscale/buffer.h +++ b/include/maxscale/buffer.h @@ -51,25 +51,19 @@ typedef struct buf_property typedef enum { - GWBUF_TYPE_UNDEFINED = 0x00, - GWBUF_TYPE_SESCMD_RESPONSE = 0x01, - GWBUF_TYPE_RESPONSE_END = 0x02, - GWBUF_TYPE_SESCMD = 0x04, - GWBUF_TYPE_HTTP = 0x08, - GWBUF_TYPE_IGNORABLE = 0x10, - GWBUF_TYPE_COLLECT_RESULT = 0x20, - GWBUF_TYPE_RESULT = 0x40, - GWBUF_TYPE_REPLY_OK = 0x80, + GWBUF_TYPE_UNDEFINED = 0, + GWBUF_TYPE_HTTP = (1 << 0), + GWBUF_TYPE_IGNORABLE = (1 << 1), + GWBUF_TYPE_COLLECT_RESULT = (1 << 2), + GWBUF_TYPE_RESULT = (1 << 3), + GWBUF_TYPE_REPLY_OK = (1 << 4), } gwbuf_type_t; -#define GWBUF_IS_TYPE_UNDEFINED(b) (b->gwbuf_type == 0) -#define GWBUF_IS_TYPE_SESCMD_RESPONSE(b) (b->gwbuf_type & GWBUF_TYPE_SESCMD_RESPONSE) -#define GWBUF_IS_TYPE_RESPONSE_END(b) (b->gwbuf_type & GWBUF_TYPE_RESPONSE_END) -#define GWBUF_IS_TYPE_SESCMD(b) (b->gwbuf_type & GWBUF_TYPE_SESCMD) -#define GWBUF_IS_IGNORABLE(b) (b->gwbuf_type & GWBUF_TYPE_IGNORABLE) -#define GWBUF_IS_COLLECTED_RESULT(b) (b->gwbuf_type & GWBUF_TYPE_RESULT) -#define GWBUF_SHOULD_COLLECT_RESULT(b) (b->gwbuf_type & GWBUF_TYPE_COLLECT_RESULT) -#define GWBUF_IS_REPLY_OK(b) (b->gwbuf_type & GWBUF_TYPE_REPLY_OK) +#define GWBUF_IS_TYPE_UNDEFINED(b) ((b)->gwbuf_type == 0) +#define GWBUF_IS_IGNORABLE(b) ((b)->gwbuf_type & GWBUF_TYPE_IGNORABLE) +#define GWBUF_IS_COLLECTED_RESULT(b) ((b)->gwbuf_type & GWBUF_TYPE_RESULT) +#define GWBUF_SHOULD_COLLECT_RESULT(b) ((b)->gwbuf_type & GWBUF_TYPE_COLLECT_RESULT) +#define GWBUF_IS_REPLY_OK(b) ((b)->gwbuf_type & GWBUF_TYPE_REPLY_OK) typedef enum { diff --git a/server/core/backend.cc b/server/core/backend.cc index 0dd202849..50d68cc51 100644 --- a/server/core/backend.cc +++ b/server/core/backend.cc @@ -103,16 +103,13 @@ bool Backend::execute_session_command() break; case MXS_COM_CHANGE_USER: - /** This makes it possible to handle replies correctly */ - gwbuf_set_type(buffer, GWBUF_TYPE_SESCMD); rval = auth(buffer); break; case MXS_COM_QUERY: default: - // TODO: Remove use of GWBUF_TYPE_SESCMD - //Mark session command buffer, it triggers writing MySQL command to protocol - gwbuf_set_type(buffer, GWBUF_TYPE_SESCMD); + // We want the complete response in one packet + gwbuf_set_type(buffer, GWBUF_TYPE_COLLECT_RESULT); rval = write(buffer, EXPECT_RESPONSE); ss_dassert(is_waiting_result()); break; diff --git a/server/modules/protocol/MySQL/mariadbbackend/mysql_backend.cc b/server/modules/protocol/MySQL/mariadbbackend/mysql_backend.cc index fef393f07..4324e2118 100644 --- a/server/modules/protocol/MySQL/mariadbbackend/mysql_backend.cc +++ b/server/modules/protocol/MySQL/mariadbbackend/mysql_backend.cc @@ -976,43 +976,16 @@ gw_read_and_write(DCB *dcb) do { GWBUF *stmt = NULL; - /** - * If protocol has session command set, concatenate whole - * response into one buffer. - */ - if (protocol_get_srv_command((MySQLProtocol *)dcb->protocol, true) != MXS_COM_UNDEFINED) + + if (result_collected) { - if (result_collected) - { - /** The result set or PS response was collected, we know it's complete */ - stmt = read_buffer; - read_buffer = NULL; - gwbuf_set_type(stmt, GWBUF_TYPE_RESPONSE_END | GWBUF_TYPE_SESCMD_RESPONSE); - } - else - { - stmt = process_response_data(dcb, &read_buffer, gwbuf_length(read_buffer)); - /** - * Received incomplete response to session command. - * Store it to readqueue and return. - */ - if (!sescmd_response_complete(dcb)) - { - stmt = gwbuf_append(stmt, read_buffer); - dcb_readq_prepend(dcb, stmt); - return 0; - } - } - if (!stmt) - { - MXS_ERROR("Read buffer unexpectedly null, even though response " - "not marked as complete. User: %s", dcb->session->client_dcb->user); - return 0; - } + /** The result set or PS response was collected, we know it's complete */ + stmt = read_buffer; + read_buffer = NULL; + gwbuf_set_type(stmt, GWBUF_TYPE_RESULT); } else if (rcap_type_required(capabilities, RCAP_TYPE_STMT_OUTPUT) && - !rcap_type_required(capabilities, RCAP_TYPE_RESULTSET_OUTPUT) && - !result_collected) + !rcap_type_required(capabilities, RCAP_TYPE_RESULTSET_OUTPUT)) { stmt = modutil_get_next_MySQL_packet(&read_buffer); @@ -1664,195 +1637,6 @@ retblock: return rv; } -/** - * Move packets or parts of packets from readbuf to outbuf as the packet headers - * and lengths have been noticed and counted. - * Session commands need to be marked so that they can be handled properly in - * the router's clientReply. - * - * @param dcb Backend's DCB where data was read from - * @param readbuf GWBUF where data was read to - * @param nbytes_to_process Number of bytes that has been read and need to be processed - * - * @return GWBUF which includes complete MySQL packet - */ -static GWBUF* process_response_data(DCB* dcb, - GWBUF** readbuf, - int nbytes_to_process) -{ - int npackets_left = 0; /*< response's packet count */ - int nbytes_left = 0; /*< nbytes to be read for the packet */ - MySQLProtocol* p; - GWBUF* outbuf = NULL; - int initial_packets = npackets_left; - int initial_bytes = nbytes_left; - - /** Get command which was stored in gw_MySQLWrite_backend */ - p = DCB_PROTOCOL(dcb, MySQLProtocol); - CHK_PROTOCOL(p); - - /** All buffers processed here are sescmd responses */ - gwbuf_set_type(*readbuf, GWBUF_TYPE_SESCMD_RESPONSE); - - /** - * Now it is known how many packets there should be and how much - * is read earlier. - */ - while (nbytes_to_process != 0) - { - mxs_mysql_cmd_t srvcmd; - bool succp; - - srvcmd = protocol_get_srv_command(p, false); - - MXS_DEBUG("Read command %s for DCB %p fd %d.", STRPACKETTYPE(srvcmd), dcb, dcb->fd); - /** - * Read values from protocol structure, fails if values are - * uninitialized. - */ - if (npackets_left == 0) - { - size_t bytes; // nbytes_left is int, but the type must be size_t. - succp = protocol_get_response_status(p, &npackets_left, &bytes); - nbytes_left = bytes; - - if (!succp || npackets_left == 0) - { - /** - * Examine command type and the readbuf. Conclude response - * packet count from the command type or from the first - * packet content. Fails if read buffer doesn't include - * enough data to read the packet length. - */ - init_response_status(*readbuf, srvcmd, &npackets_left, &bytes); - nbytes_left = bytes; - } - - initial_packets = npackets_left; - initial_bytes = nbytes_left; - } - /** Only session commands with responses should be processed */ - ss_dassert(npackets_left > 0); - - /** Read incomplete packet. */ - if ((int)nbytes_left > nbytes_to_process) - { - /** Includes length info so it can be processed */ - if (nbytes_to_process >= 5) - { - /** discard source buffer */ - *readbuf = gwbuf_consume(*readbuf, GWBUF_LENGTH(*readbuf)); - nbytes_left -= nbytes_to_process; - } - nbytes_to_process = 0; - } - /** Packet was read. All bytes belonged to the last packet. */ - else if ((int)nbytes_left == nbytes_to_process) - { - nbytes_left = 0; - nbytes_to_process = 0; - ss_dassert(npackets_left > 0); - npackets_left -= 1; - outbuf = gwbuf_append(outbuf, *readbuf); - *readbuf = NULL; - } - /** - * Buffer contains more data than we need. Split the complete packet and - * the extra data into two separate buffers. - */ - else - { - ss_dassert((int)nbytes_left < nbytes_to_process); - ss_dassert(nbytes_left > 0); - ss_dassert(npackets_left > 0); - outbuf = gwbuf_append(outbuf, gwbuf_split(readbuf, nbytes_left)); - nbytes_to_process -= nbytes_left; - npackets_left -= 1; - nbytes_left = 0; - } - - /** Store new status to protocol structure */ - protocol_set_response_status(p, npackets_left, nbytes_left); - - /** A complete packet was read */ - if (nbytes_left == 0) - { - /** No more packets in this response */ - if (npackets_left == 0 && outbuf != NULL) - { - GWBUF* b = outbuf; - - while (b->next != NULL) - { - b = b->next; - } - /** Mark last as end of response */ - gwbuf_set_type(b, GWBUF_TYPE_RESPONSE_END); - - /** Archive the command */ - protocol_archive_srv_command(p); - - /** Ignore the rest of the response */ - nbytes_to_process = 0; - } - /** Read next packet */ - else - { - uint8_t* data; - - /** Read next packet length if there is at least - * three bytes left. If there is less than three - * bytes in the buffer or it is NULL, we need to - wait for more data from the backend server.*/ - if (*readbuf == NULL || gwbuf_length(*readbuf) < 3) - { - MXS_DEBUG("[%s] Read %d packets. Waiting for %d more " - "packets for a total of %d packets.", __FUNCTION__, - initial_packets - npackets_left, - npackets_left, initial_packets); - - /** Store the already read data into the readqueue of the DCB - * and restore the response status to the initial number of packets */ - - dcb_readq_prepend(dcb, outbuf); - - protocol_set_response_status(p, initial_packets, initial_bytes); - return NULL; - } - uint8_t packet_len[3]; - gwbuf_copy_data(*readbuf, 0, 3, packet_len); - nbytes_left = gw_mysql_get_byte3(packet_len) + MYSQL_HEADER_LEN; - /** Store new status to protocol structure */ - protocol_set_response_status(p, npackets_left, nbytes_left); - } - } - } - return outbuf; -} - -static bool sescmd_response_complete(DCB* dcb) -{ - int npackets_left; - size_t nbytes_left; - MySQLProtocol* p; - bool succp; - - p = DCB_PROTOCOL(dcb, MySQLProtocol); - CHK_PROTOCOL(p); - - protocol_get_response_status(p, &npackets_left, &nbytes_left); - - if (npackets_left == 0) - { - succp = true; - } - else - { - succp = false; - } - return succp; -} - /** * Create COM_CHANGE_USER packet and store it to GWBUF * @@ -1931,11 +1715,6 @@ gw_create_change_user_packet(MYSQL_session* mses, bytes += 4; buffer = gwbuf_alloc(bytes); - /** - * Set correct type to GWBUF so that it will be handled like session - * commands - */ - buffer->gwbuf_type = GWBUF_TYPE_SESCMD; payload = GWBUF_DATA(buffer); memset(payload, '\0', bytes); payload_start = payload; diff --git a/server/modules/routing/readwritesplit/rwsplit_session_cmd.cc b/server/modules/routing/readwritesplit/rwsplit_session_cmd.cc index c913a84d0..28aad59ed 100644 --- a/server/modules/routing/readwritesplit/rwsplit_session_cmd.cc +++ b/server/modules/routing/readwritesplit/rwsplit_session_cmd.cc @@ -75,76 +75,73 @@ void RWSplitSession::process_sescmd_response(SRWBackend& backend, GWBUF** ppPack { if (backend->has_session_commands()) { - /** We are executing a session command */ - if (GWBUF_IS_TYPE_SESCMD_RESPONSE((*ppPacket))) + ss_dassert(GWBUF_IS_COLLECTED_RESULT(*ppPacket)); + uint8_t cmd; + gwbuf_copy_data(*ppPacket, MYSQL_HEADER_LEN, 1, &cmd); + uint8_t command = backend->next_session_command()->get_command(); + mxs::SSessionCommand sescmd = backend->next_session_command(); + uint64_t id = backend->complete_session_command(); + MXS_PS_RESPONSE resp = {}; + bool discard = true; + + if (command == MXS_COM_STMT_PREPARE && cmd != MYSQL_REPLY_ERR) { - uint8_t cmd; - gwbuf_copy_data(*ppPacket, MYSQL_HEADER_LEN, 1, &cmd); - uint8_t command = backend->next_session_command()->get_command(); - mxs::SSessionCommand sescmd = backend->next_session_command(); - uint64_t id = backend->complete_session_command(); - MXS_PS_RESPONSE resp = {}; - bool discard = true; + // This should never fail or the backend protocol is broken + ss_debug(bool b = )mxs_mysql_extract_ps_response(*ppPacket, &resp); + ss_dassert(b); + backend->add_ps_handle(id, resp.id); + } - if (command == MXS_COM_STMT_PREPARE && cmd != MYSQL_REPLY_ERR) + if (m_recv_sescmd < m_sent_sescmd && id == m_recv_sescmd + 1) + { + if (!m_current_master || !m_current_master->in_use() || // Session doesn't have a master + m_current_master == backend) // This is the master's response { - // This should never fail or the backend protocol is broken - ss_debug(bool b = )mxs_mysql_extract_ps_response(*ppPacket, &resp); - ss_dassert(b); - backend->add_ps_handle(id, resp.id); - } + /** First reply to this session command, route it to the client */ + ++m_recv_sescmd; + discard = false; - if (m_recv_sescmd < m_sent_sescmd && id == m_recv_sescmd + 1) - { - if (!m_current_master || !m_current_master->in_use() || // Session doesn't have a master - m_current_master == backend) // This is the master's response + /** Store the master's response so that the slave responses can + * be compared to it */ + m_sescmd_responses[id] = cmd; + + if (cmd == MYSQL_REPLY_ERR) { - /** First reply to this session command, route it to the client */ - ++m_recv_sescmd; - discard = false; - - /** Store the master's response so that the slave responses can - * be compared to it */ - m_sescmd_responses[id] = cmd; - - if (cmd == MYSQL_REPLY_ERR) - { - MXS_INFO("Session command no. %lu failed: %s", - id, extract_error(*ppPacket).c_str()); - } - else if (command == MXS_COM_STMT_PREPARE) - { - /** Map the returned response to the internal ID */ - MXS_INFO("PS ID %u maps to internal ID %lu", resp.id, id); - m_qc.ps_id_internal_put(resp.id, id); - } - - // Discard any slave connections that did not return the same result - for (SlaveResponseList::iterator it = m_slave_responses.begin(); - it != m_slave_responses.end(); it++) - { - discard_if_response_differs(it->first, cmd, it->second, sescmd); - } - - m_slave_responses.clear(); + MXS_INFO("Session command no. %lu failed: %s", + id, extract_error(*ppPacket).c_str()); } - else + else if (command == MXS_COM_STMT_PREPARE) { - /** Record slave command so that the response can be validated - * against the master's response when it arrives. */ - m_slave_responses.push_back(std::make_pair(backend, cmd)); + /** Map the returned response to the internal ID */ + MXS_INFO("PS ID %u maps to internal ID %lu", resp.id, id); + m_qc.ps_id_internal_put(resp.id, id); } + + // Discard any slave connections that did not return the same result + for (SlaveResponseList::iterator it = m_slave_responses.begin(); + it != m_slave_responses.end(); it++) + { + discard_if_response_differs(it->first, cmd, it->second, sescmd); + } + + m_slave_responses.clear(); } else { - discard_if_response_differs(backend, m_sescmd_responses[id], cmd, sescmd); + /** Record slave command so that the response can be validated + * against the master's response when it arrives. */ + m_slave_responses.push_back(std::make_pair(backend, cmd)); } + } + else + { + discard_if_response_differs(backend, m_sescmd_responses[id], cmd, sescmd); + } - if (discard) - { - gwbuf_free(*ppPacket); - *ppPacket = NULL; - } + if (discard) + { + gwbuf_free(*ppPacket); + *ppPacket = NULL; } } } diff --git a/server/modules/routing/schemarouter/schemaroutersession.cc b/server/modules/routing/schemarouter/schemaroutersession.cc index e7552b54f..0347558ee 100644 --- a/server/modules/routing/schemarouter/schemaroutersession.cc +++ b/server/modules/routing/schemarouter/schemaroutersession.cc @@ -501,23 +501,20 @@ void SchemaRouterSession::process_sescmd_response(SSRBackend& bref, GWBUF** ppPa { if (bref->has_session_commands()) { - /** We are executing a session command */ - if (GWBUF_IS_TYPE_SESCMD_RESPONSE((*ppPacket))) - { - uint64_t id = bref->complete_session_command(); + ss_dassert(GWBUF_IS_COLLECTED_RESULT(*ppPacket)); + uint64_t id = bref->complete_session_command(); - if (m_replied_sescmd < m_sent_sescmd && id == m_replied_sescmd + 1) - { - /** First reply to this session command, route it to the client */ - ++m_replied_sescmd; - } - else - { - /** The reply to this session command has already been sent to - * the client, discard it */ - gwbuf_free(*ppPacket); - *ppPacket = NULL; - } + if (m_replied_sescmd < m_sent_sescmd && id == m_replied_sescmd + 1) + { + /** First reply to this session command, route it to the client */ + ++m_replied_sescmd; + } + else + { + /** The reply to this session command has already been sent to + * the client, discard it */ + gwbuf_free(*ppPacket); + *ppPacket = NULL; } } } @@ -1096,28 +1093,6 @@ int SchemaRouterSession::inspect_mapping_states(SSRBackend& bref, return mapped ? 1 : 0; } -/** - * Create a fake error message from a DCB. - * @param fail_str Custom error message - * @param dcb DCB to use as the origin of the error - */ -void create_error_reply(char* fail_str, DCB* dcb) -{ - MXS_INFO("change_current_db: failed to change database: %s", fail_str); - GWBUF* errbuf = modutil_create_mysql_err_msg(1, 0, 1049, "42000", fail_str); - - if (errbuf == NULL) - { - MXS_ERROR("Creating buffer for error message failed."); - return; - } - /** Set flags that help router to identify session commands reply */ - gwbuf_set_type(errbuf, GWBUF_TYPE_SESCMD_RESPONSE); - gwbuf_set_type(errbuf, GWBUF_TYPE_RESPONSE_END); - - poll_add_epollin_event_to_dcb(dcb, errbuf); -} - /** * Read new database name from COM_INIT_DB packet or a literal USE ... COM_QUERY * packet, check that it exists in the hashtable and copy its name to MYSQL_session.