From 52b7fb93405054c2f719fd776157298eefe831b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Thu, 22 Jun 2017 13:16:12 +0300 Subject: [PATCH] MXS-852: Handle one-way session commands Session commands that will not return a response can be completed immediately. This requires some special code in the readwritesplit Backend class implementation as well as a small addition to the Backend class itself. Since not all commands expect a response from the server, the queued query routing function needs some adjustment. The routing of queued queries should be attempted until a command which expects a response is found or the queue is empty. By properly handling these types of session commands, the router can enable the execution of COM_STMT_CLOSE and COM_STMT_RESET on all servers. This will prevent resource leakages in the server and allow proper handling of COM_STMT type command. --- include/maxscale/protocol/mysql.h | 9 ++++++++ server/core/backend.cc | 2 ++ server/modules/protocol/MySQL/mysql_common.c | 7 +++++++ .../routing/readwritesplit/readwritesplit.cc | 18 +++++++++++++--- .../routing/readwritesplit/readwritesplit.hh | 3 ++- .../readwritesplit/rwsplit_internal.hh | 1 - .../routing/readwritesplit/rwsplit_mysql.cc | 21 ------------------- .../readwritesplit/rwsplit_route_stmt.cc | 13 ++++++++++-- 8 files changed, 46 insertions(+), 28 deletions(-) diff --git a/include/maxscale/protocol/mysql.h b/include/maxscale/protocol/mysql.h index 8ece64bfe..90795e36b 100644 --- a/include/maxscale/protocol/mysql.h +++ b/include/maxscale/protocol/mysql.h @@ -547,4 +547,13 @@ bool mxs_mysql_extract_ps_response(GWBUF* buffer, MXS_PS_RESPONSE* out); */ uint32_t mxs_mysql_extract_ps_id(GWBUF* buffer); +/** + * @brief Determine if a packet contains a one way message + * + * @param cmd Command to inspect + * + * @return True if a response is expected from the server + */ +bool mxs_mysql_command_will_respond(uint8_t cmd); + MXS_END_DECLS diff --git a/server/core/backend.cc b/server/core/backend.cc index ca373e38e..3ab16f860 100644 --- a/server/core/backend.cc +++ b/server/core/backend.cc @@ -93,7 +93,9 @@ bool Backend::execute_session_command() { case MYSQL_COM_QUIT: case MYSQL_COM_STMT_CLOSE: + /** These commands do not generate responses */ rval = write(buffer, NO_RESPONSE); + complete_session_command(); break; case MYSQL_COM_CHANGE_USER: diff --git a/server/modules/protocol/MySQL/mysql_common.c b/server/modules/protocol/MySQL/mysql_common.c index 839e313e0..767f0a46d 100644 --- a/server/modules/protocol/MySQL/mysql_common.c +++ b/server/modules/protocol/MySQL/mysql_common.c @@ -1649,3 +1649,10 @@ uint32_t mxs_mysql_extract_ps_id(GWBUF* buffer) return rval; } + +bool mxs_mysql_command_will_respond(uint8_t cmd) +{ + return cmd != MYSQL_COM_STMT_SEND_LONG_DATA && + cmd != MYSQL_COM_QUIT && + cmd != MYSQL_COM_STMT_CLOSE; +} diff --git a/server/modules/routing/readwritesplit/readwritesplit.cc b/server/modules/routing/readwritesplit/readwritesplit.cc index fcb253af8..20d6acf57 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.cc +++ b/server/modules/routing/readwritesplit/readwritesplit.cc @@ -547,7 +547,10 @@ bool route_stored_query(ROUTER_CLIENT_SES *rses) { bool rval = true; - if (rses->query_queue) + /** Loop over the stored statements as long as the routeQuery call doesn't + * append more data to the queue. If it appends data to the queue, we need + * to wait for a response before attempting another reroute */ + while (rses->query_queue) { GWBUF* query_queue = modutil_get_next_MySQL_packet(&rses->query_queue); query_queue = gwbuf_make_contiguous(query_queue); @@ -574,8 +577,17 @@ bool route_stored_query(ROUTER_CLIENT_SES *rses) gwbuf_free(query_queue); } - ss_dassert(rses->query_queue == NULL); - rses->query_queue = temp_storage; + if (rses->query_queue == NULL) + { + /** Query successfully routed and no responses are expected */ + rses->query_queue = temp_storage; + } + else + { + /** Routing was stopped, we need to wait for a response before retrying */ + rses->query_queue = gwbuf_append(temp_storage, rses->query_queue); + break; + } } return rval; diff --git a/server/modules/routing/readwritesplit/readwritesplit.hh b/server/modules/routing/readwritesplit/readwritesplit.hh index dafc5752e..90a829d8a 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.hh +++ b/server/modules/routing/readwritesplit/readwritesplit.hh @@ -195,9 +195,10 @@ public: bool execute_session_command() { + bool expect_response = mxs_mysql_command_will_respond(next_session_command()->get_command()); bool rval = mxs::Backend::execute_session_command(); - if (rval) + if (rval && expect_response) { set_reply_state(REPLY_STATE_START); } diff --git a/server/modules/routing/readwritesplit/rwsplit_internal.hh b/server/modules/routing/readwritesplit/rwsplit_internal.hh index 33f15a6d2..b4b2c68bb 100644 --- a/server/modules/routing/readwritesplit/rwsplit_internal.hh +++ b/server/modules/routing/readwritesplit/rwsplit_internal.hh @@ -45,7 +45,6 @@ bool handle_target_is_all(route_target_t route_target, GWBUF *querybuf, int packet_type, uint32_t qtype); uint8_t determine_packet_type(GWBUF *querybuf, bool *non_empty_packet); void log_transaction_status(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, uint32_t qtype); -bool command_will_respond(uint8_t packet_type); bool is_packet_a_query(int packet_type); bool send_readonly_error(DCB *dcb); diff --git a/server/modules/routing/readwritesplit/rwsplit_mysql.cc b/server/modules/routing/readwritesplit/rwsplit_mysql.cc index cd89e0147..a285d4c24 100644 --- a/server/modules/routing/readwritesplit/rwsplit_mysql.cc +++ b/server/modules/routing/readwritesplit/rwsplit_mysql.cc @@ -104,27 +104,6 @@ is_packet_a_query(int packet_type) return (packet_type == MYSQL_COM_QUERY); } -/* - * This looks MySQL specific - */ -/** - * @brief Determine if a packet contains a one way message - * - * Packet type tells us this, but in a DB specific way. This function is - * provided so that code that is not DB specific can find out whether a packet - * contains a one way messsage. Clearly, to be effective different functions must be - * called for different DB types. - * - * @param packet_type Type of packet (integer) - * @return bool indicating whether packet contains a one way message - */ -bool command_will_respond(uint8_t packet_type) -{ - return packet_type != MYSQL_COM_STMT_SEND_LONG_DATA && - packet_type != MYSQL_COM_QUIT && - packet_type != MYSQL_COM_STMT_CLOSE; -} - /* * This one is problematic because it is MySQL specific, but also router * specific. diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc index b3778a11b..e3fe66b79 100644 --- a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc @@ -270,7 +270,7 @@ bool route_session_write(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, /** The SessionCommand takes ownership of the buffer */ uint64_t id = rses->sescmd_count++; mxs::SSessionCommand sescmd(new mxs::SessionCommand(querybuf, id)); - bool expecting_response = command_will_respond(command); + bool expecting_response = mxs_mysql_command_will_respond(command); int nsucc = 0; uint64_t lowest_pos = id; @@ -349,6 +349,13 @@ bool route_session_write(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, if (nsucc) { rses->sent_sescmd = id; + + if (!expecting_response) + { + /** The command doesn't generate a response so we increment the + * completed session command count */ + rses->recv_sescmd++; + } } return nsucc; @@ -552,7 +559,9 @@ route_target_t get_route_target(ROUTER_CLIENT_SES *rses, uint8_t command, target = TARGET_MASTER; } else if (qc_query_is_type(qtype, QUERY_TYPE_PREPARE_STMT) || - qc_query_is_type(qtype, QUERY_TYPE_PREPARE_NAMED_STMT)) + qc_query_is_type(qtype, QUERY_TYPE_PREPARE_NAMED_STMT) || + command == MYSQL_COM_STMT_CLOSE || + command == MYSQL_COM_STMT_RESET) { target = TARGET_ALL; }