From d6c44aaf5279a96bfe57ddddee7dbf461e32c7c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Wed, 2 May 2018 17:47:59 +0300 Subject: [PATCH] MXS-1804: Allow large session commands Session commands that span multiple packets are now allowed and will work. However, if one is executed the session command history is disabled as no interface for appending to session commands exists. The backend protocol modules now also correctly track the current command. This was a pre-requisite for large session commands as they needed to be gathered into a single buffer and to do this the current command had to be accurate. Updated tests to expect success instead of failure for large prepared statements. --- include/maxscale/protocol/mysql.h | 1 + maxscale-system-test/mxs1804_long_ps_hang.cpp | 2 +- .../MySQL/mariadbbackend/mysql_backend.cc | 14 +++++++- server/modules/protocol/MySQL/mysql_common.cc | 1 + .../routing/readwritesplit/rwbackend.cc | 5 +++ .../routing/readwritesplit/rwbackend.hh | 1 + .../routing/readwritesplit/rwsplit_mysql.cc | 13 +++++++- .../readwritesplit/rwsplit_route_stmt.cc | 33 ++++++++----------- .../routing/readwritesplit/rwsplitsession.cc | 5 ++- .../routing/readwritesplit/rwsplitsession.hh | 13 ++++++++ 10 files changed, 64 insertions(+), 24 deletions(-) diff --git a/include/maxscale/protocol/mysql.h b/include/maxscale/protocol/mysql.h index dafdf31b5..303ad016d 100644 --- a/include/maxscale/protocol/mysql.h +++ b/include/maxscale/protocol/mysql.h @@ -355,6 +355,7 @@ typedef struct bool collect_result; /*< Collect the next result set as one buffer */ bool changing_user; uint32_t num_eof_packets; /*< Encountered eof packet number, used for check packet type */ + bool large_query; /*< Whether to ignore the command byte of the next packet*/ #if defined(SS_DEBUG) skygw_chk_t protocol_chk_tail; #endif diff --git a/maxscale-system-test/mxs1804_long_ps_hang.cpp b/maxscale-system-test/mxs1804_long_ps_hang.cpp index 2479bee33..4d4b18b16 100644 --- a/maxscale-system-test/mxs1804_long_ps_hang.cpp +++ b/maxscale-system-test/mxs1804_long_ps_hang.cpp @@ -33,7 +33,7 @@ int main(int argc, char** argv) test.maxscales->connect(); MYSQL_STMT* stmt = mysql_stmt_init(test.maxscales->conn_rwsplit[0]); - test.assert(mysql_stmt_prepare(stmt, sqlstr, strlen(sqlstr)) != 0, "Prepare should fail in 2.2 but not hang", + test.assert(mysql_stmt_prepare(stmt, sqlstr, strlen(sqlstr)) == 0, "Prepare should not fail", mysql_stmt_error(stmt)); mysql_stmt_close(stmt); diff --git a/server/modules/protocol/MySQL/mariadbbackend/mysql_backend.cc b/server/modules/protocol/MySQL/mariadbbackend/mysql_backend.cc index 962146369..7b1d5bea4 100644 --- a/server/modules/protocol/MySQL/mariadbbackend/mysql_backend.cc +++ b/server/modules/protocol/MySQL/mariadbbackend/mysql_backend.cc @@ -409,7 +409,19 @@ static inline void prepare_for_write(DCB *dcb, GWBUF *buffer) */ if (rcap_type_required(capabilities, RCAP_TYPE_STMT_INPUT)) { - proto->current_command = (mxs_mysql_cmd_t)MYSQL_GET_COMMAND(GWBUF_DATA(buffer)); + uint8_t* data = GWBUF_DATA(buffer); + + if (!proto->large_query) + { + proto->current_command = (mxs_mysql_cmd_t)MYSQL_GET_COMMAND(data); + } + + /** + * If the buffer contains a large query, we have to skip the command + * byte extraction for the next packet. This way current_command always + * contains the latest command executed on this backend. + */ + proto->large_query = MYSQL_GET_PAYLOAD_LEN(data) == MYSQL_PACKET_LENGTH_MAX; } else if (dcb->session->client_dcb && dcb->session->client_dcb->protocol) { diff --git a/server/modules/protocol/MySQL/mysql_common.cc b/server/modules/protocol/MySQL/mysql_common.cc index 3c4470420..14a3b2466 100644 --- a/server/modules/protocol/MySQL/mysql_common.cc +++ b/server/modules/protocol/MySQL/mysql_common.cc @@ -73,6 +73,7 @@ MySQLProtocol* mysql_protocol_init(DCB* dcb, int fd) p->collect_result = false; p->changing_user = false; p->num_eof_packets = 0; + p->large_query = false; #if defined(SS_DEBUG) p->protocol_chk_top = CHK_NUM_PROTOCOL; p->protocol_chk_tail = CHK_NUM_PROTOCOL; diff --git a/server/modules/routing/readwritesplit/rwbackend.cc b/server/modules/routing/readwritesplit/rwbackend.cc index b7b1e32ad..d962ac22d 100644 --- a/server/modules/routing/readwritesplit/rwbackend.cc +++ b/server/modules/routing/readwritesplit/rwbackend.cc @@ -35,6 +35,11 @@ bool RWBackend::execute_session_command() return rval; } +bool RWBackend::continue_session_command(GWBUF* buffer) +{ + return Backend::write(buffer, NO_RESPONSE); +} + void RWBackend::add_ps_handle(uint32_t id, uint32_t handle) { m_ps_handles[id] = handle; diff --git a/server/modules/routing/readwritesplit/rwbackend.hh b/server/modules/routing/readwritesplit/rwbackend.hh index bb0edbe04..4db872016 100644 --- a/server/modules/routing/readwritesplit/rwbackend.hh +++ b/server/modules/routing/readwritesplit/rwbackend.hh @@ -64,6 +64,7 @@ public: uint32_t get_ps_handle(uint32_t id) const; bool execute_session_command(); + bool continue_session_command(GWBUF* buffer); bool write(GWBUF* buffer, response_type type = EXPECT_RESPONSE); void close(close_type type = CLOSE_NORMAL); diff --git a/server/modules/routing/readwritesplit/rwsplit_mysql.cc b/server/modules/routing/readwritesplit/rwsplit_mysql.cc index 32906365d..96bd254ad 100644 --- a/server/modules/routing/readwritesplit/rwsplit_mysql.cc +++ b/server/modules/routing/readwritesplit/rwsplit_mysql.cc @@ -83,6 +83,7 @@ bool RWSplitSession::handle_target_is_all(route_target_t route_target, GWBUF *qu int packet_type, uint32_t qtype) { bool result = false; + bool is_large = is_large_query(querybuf); if (TARGET_IS_MASTER(route_target) || TARGET_IS_SLAVE(route_target)) { @@ -111,13 +112,23 @@ bool RWSplitSession::handle_target_is_all(route_target_t route_target, GWBUF *qu MXS_FREE(query_str); MXS_FREE(qtype_str); } + else if (m_qc.large_query()) + { + // TODO: Append to the already stored session command instead of disabling history + MXS_INFO("Large session write, have to disable session command history"); + m_config.disable_sescmd_history = true; + + continue_large_session_write(querybuf, qtype); + result = true; + } else if (route_session_write(gwbuf_clone(querybuf), packet_type, qtype)) { - result = true; atomic_add_uint64(&m_router->stats().n_all, 1); } + m_qc.set_large_query(is_large); + return result; } diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc index 796e6f2e1..843e6d83f 100644 --- a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc @@ -159,7 +159,6 @@ bool RWSplitSession::route_single_stmt(GWBUF *querybuf) if (TARGET_IS_ALL(route_target)) { - // TODO: Handle payloads larger than (2^24 - 1) bytes that are routed to all servers succp = handle_target_is_all(route_target, querybuf, command, qtype); } else @@ -263,18 +262,6 @@ bool RWSplitSession::route_single_stmt(GWBUF *querybuf) return succp; } -static inline bool is_large_query(GWBUF* buf) -{ - uint32_t buflen = gwbuf_length(buf); - - // The buffer should contain at most (2^24 - 1) + 4 bytes ... - ss_dassert(buflen <= MYSQL_HEADER_LEN + GW_MYSQL_MAX_PACKET_LEN); - // ... and the payload should be buflen - 4 bytes - ss_dassert(MYSQL_GET_PAYLOAD_LEN(GWBUF_DATA(buf)) == buflen - MYSQL_HEADER_LEN); - - return buflen == MYSQL_HEADER_LEN + GW_MYSQL_MAX_PACKET_LEN; -} - /** * Purge session command history * @@ -322,6 +309,19 @@ void RWSplitSession::purge_history(mxs::SSessionCommand& sescmd) } } +void RWSplitSession::continue_large_session_write(GWBUF *querybuf, uint32_t type) +{ + for (auto it = m_backends.begin(); it != m_backends.end(); it++) + { + SRWBackend& backend = *it; + + if (backend->in_use()) + { + backend->continue_session_command(gwbuf_clone(querybuf)); + } + } +} + /** * Execute in backends used by current router session. * Save session variable commands to router session property @@ -343,13 +343,6 @@ void RWSplitSession::purge_history(mxs::SSessionCommand& sescmd) */ bool RWSplitSession::route_session_write(GWBUF *querybuf, uint8_t command, uint32_t type) { - if (is_large_query(querybuf)) - { - MXS_ERROR("Session command is too large, session cannot continue. " - "Large session commands are not supported in 2.2."); - return false; - } - /** The SessionCommand takes ownership of the buffer */ uint64_t id = m_sescmd_count++; mxs::SSessionCommand sescmd(new mxs::SessionCommand(querybuf, id)); diff --git a/server/modules/routing/readwritesplit/rwsplitsession.cc b/server/modules/routing/readwritesplit/rwsplitsession.cc index bf2021003..4a01c3699 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.cc +++ b/server/modules/routing/readwritesplit/rwsplitsession.cc @@ -141,7 +141,10 @@ int32_t RWSplitSession::routeQuery(GWBUF* querybuf) current_target = QueryClassifier::CURRENT_TARGET_SLAVE; } - m_qc.update_route_info(current_target, querybuf); + if (!m_qc.large_query()) + { + m_qc.update_route_info(current_target, querybuf); + } /** No active or pending queries */ if (route_single_stmt(querybuf)) diff --git a/server/modules/routing/readwritesplit/rwsplitsession.hh b/server/modules/routing/readwritesplit/rwsplitsession.hh index d476bc01f..61f82dda3 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.hh +++ b/server/modules/routing/readwritesplit/rwsplitsession.hh @@ -151,6 +151,7 @@ private: void purge_history(mxs::SSessionCommand& sescmd); bool route_session_write(GWBUF *querybuf, uint8_t command, uint32_t type); + void continue_large_session_write(GWBUF *querybuf, uint32_t type); bool route_single_stmt(GWBUF *querybuf); bool route_stored_query(); @@ -217,6 +218,18 @@ private: { return !m_config.disable_sescmd_history || m_recv_sescmd == 0; } + + inline bool is_large_query(GWBUF* buf) + { + uint32_t buflen = gwbuf_length(buf); + + // The buffer should contain at most (2^24 - 1) + 4 bytes ... + ss_dassert(buflen <= MYSQL_HEADER_LEN + GW_MYSQL_MAX_PACKET_LEN); + // ... and the payload should be buflen - 4 bytes + ss_dassert(MYSQL_GET_PAYLOAD_LEN(GWBUF_DATA(buf)) == buflen - MYSQL_HEADER_LEN); + + return buflen == MYSQL_HEADER_LEN + GW_MYSQL_MAX_PACKET_LEN; + } }; /**