diff --git a/include/maxscale/protocol/mysql.h b/include/maxscale/protocol/mysql.h index dafdf31b5..303ad016d 100644 --- a/include/maxscale/protocol/mysql.h +++ b/include/maxscale/protocol/mysql.h @@ -355,6 +355,7 @@ typedef struct bool collect_result; /*< Collect the next result set as one buffer */ bool changing_user; uint32_t num_eof_packets; /*< Encountered eof packet number, used for check packet type */ + bool large_query; /*< Whether to ignore the command byte of the next packet*/ #if defined(SS_DEBUG) skygw_chk_t protocol_chk_tail; #endif diff --git a/maxscale-system-test/mxs1804_long_ps_hang.cpp b/maxscale-system-test/mxs1804_long_ps_hang.cpp index 2479bee33..4d4b18b16 100644 --- a/maxscale-system-test/mxs1804_long_ps_hang.cpp +++ b/maxscale-system-test/mxs1804_long_ps_hang.cpp @@ -33,7 +33,7 @@ int main(int argc, char** argv) test.maxscales->connect(); MYSQL_STMT* stmt = mysql_stmt_init(test.maxscales->conn_rwsplit[0]); - test.assert(mysql_stmt_prepare(stmt, sqlstr, strlen(sqlstr)) != 0, "Prepare should fail in 2.2 but not hang", + test.assert(mysql_stmt_prepare(stmt, sqlstr, strlen(sqlstr)) == 0, "Prepare should not fail", mysql_stmt_error(stmt)); mysql_stmt_close(stmt); diff --git a/server/modules/protocol/MySQL/mariadbbackend/mysql_backend.cc b/server/modules/protocol/MySQL/mariadbbackend/mysql_backend.cc index 962146369..7b1d5bea4 100644 --- a/server/modules/protocol/MySQL/mariadbbackend/mysql_backend.cc +++ b/server/modules/protocol/MySQL/mariadbbackend/mysql_backend.cc @@ -409,7 +409,19 @@ static inline void prepare_for_write(DCB *dcb, GWBUF *buffer) */ if (rcap_type_required(capabilities, RCAP_TYPE_STMT_INPUT)) { - proto->current_command = (mxs_mysql_cmd_t)MYSQL_GET_COMMAND(GWBUF_DATA(buffer)); + uint8_t* data = GWBUF_DATA(buffer); + + if (!proto->large_query) + { + proto->current_command = (mxs_mysql_cmd_t)MYSQL_GET_COMMAND(data); + } + + /** + * If the buffer contains a large query, we have to skip the command + * byte extraction for the next packet. This way current_command always + * contains the latest command executed on this backend. + */ + proto->large_query = MYSQL_GET_PAYLOAD_LEN(data) == MYSQL_PACKET_LENGTH_MAX; } else if (dcb->session->client_dcb && dcb->session->client_dcb->protocol) { diff --git a/server/modules/protocol/MySQL/mysql_common.cc b/server/modules/protocol/MySQL/mysql_common.cc index 3c4470420..14a3b2466 100644 --- a/server/modules/protocol/MySQL/mysql_common.cc +++ b/server/modules/protocol/MySQL/mysql_common.cc @@ -73,6 +73,7 @@ MySQLProtocol* mysql_protocol_init(DCB* dcb, int fd) p->collect_result = false; p->changing_user = false; p->num_eof_packets = 0; + p->large_query = false; #if defined(SS_DEBUG) p->protocol_chk_top = CHK_NUM_PROTOCOL; p->protocol_chk_tail = CHK_NUM_PROTOCOL; diff --git a/server/modules/routing/readwritesplit/rwbackend.cc b/server/modules/routing/readwritesplit/rwbackend.cc index b7b1e32ad..d962ac22d 100644 --- a/server/modules/routing/readwritesplit/rwbackend.cc +++ b/server/modules/routing/readwritesplit/rwbackend.cc @@ -35,6 +35,11 @@ bool RWBackend::execute_session_command() return rval; } +bool RWBackend::continue_session_command(GWBUF* buffer) +{ + return Backend::write(buffer, NO_RESPONSE); +} + void RWBackend::add_ps_handle(uint32_t id, uint32_t handle) { m_ps_handles[id] = handle; diff --git a/server/modules/routing/readwritesplit/rwbackend.hh b/server/modules/routing/readwritesplit/rwbackend.hh index bb0edbe04..4db872016 100644 --- a/server/modules/routing/readwritesplit/rwbackend.hh +++ b/server/modules/routing/readwritesplit/rwbackend.hh @@ -64,6 +64,7 @@ public: uint32_t get_ps_handle(uint32_t id) const; bool execute_session_command(); + bool continue_session_command(GWBUF* buffer); bool write(GWBUF* buffer, response_type type = EXPECT_RESPONSE); void close(close_type type = CLOSE_NORMAL); diff --git a/server/modules/routing/readwritesplit/rwsplit_mysql.cc b/server/modules/routing/readwritesplit/rwsplit_mysql.cc index 32906365d..96bd254ad 100644 --- a/server/modules/routing/readwritesplit/rwsplit_mysql.cc +++ b/server/modules/routing/readwritesplit/rwsplit_mysql.cc @@ -83,6 +83,7 @@ bool RWSplitSession::handle_target_is_all(route_target_t route_target, GWBUF *qu int packet_type, uint32_t qtype) { bool result = false; + bool is_large = is_large_query(querybuf); if (TARGET_IS_MASTER(route_target) || TARGET_IS_SLAVE(route_target)) { @@ -111,13 +112,23 @@ bool RWSplitSession::handle_target_is_all(route_target_t route_target, GWBUF *qu MXS_FREE(query_str); MXS_FREE(qtype_str); } + else if (m_qc.large_query()) + { + // TODO: Append to the already stored session command instead of disabling history + MXS_INFO("Large session write, have to disable session command history"); + m_config.disable_sescmd_history = true; + + continue_large_session_write(querybuf, qtype); + result = true; + } else if (route_session_write(gwbuf_clone(querybuf), packet_type, qtype)) { - result = true; atomic_add_uint64(&m_router->stats().n_all, 1); } + m_qc.set_large_query(is_large); + return result; } diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc index 796e6f2e1..843e6d83f 100644 --- a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc @@ -159,7 +159,6 @@ bool RWSplitSession::route_single_stmt(GWBUF *querybuf) if (TARGET_IS_ALL(route_target)) { - // TODO: Handle payloads larger than (2^24 - 1) bytes that are routed to all servers succp = handle_target_is_all(route_target, querybuf, command, qtype); } else @@ -263,18 +262,6 @@ bool RWSplitSession::route_single_stmt(GWBUF *querybuf) return succp; } -static inline bool is_large_query(GWBUF* buf) -{ - uint32_t buflen = gwbuf_length(buf); - - // The buffer should contain at most (2^24 - 1) + 4 bytes ... - ss_dassert(buflen <= MYSQL_HEADER_LEN + GW_MYSQL_MAX_PACKET_LEN); - // ... and the payload should be buflen - 4 bytes - ss_dassert(MYSQL_GET_PAYLOAD_LEN(GWBUF_DATA(buf)) == buflen - MYSQL_HEADER_LEN); - - return buflen == MYSQL_HEADER_LEN + GW_MYSQL_MAX_PACKET_LEN; -} - /** * Purge session command history * @@ -322,6 +309,19 @@ void RWSplitSession::purge_history(mxs::SSessionCommand& sescmd) } } +void RWSplitSession::continue_large_session_write(GWBUF *querybuf, uint32_t type) +{ + for (auto it = m_backends.begin(); it != m_backends.end(); it++) + { + SRWBackend& backend = *it; + + if (backend->in_use()) + { + backend->continue_session_command(gwbuf_clone(querybuf)); + } + } +} + /** * Execute in backends used by current router session. * Save session variable commands to router session property @@ -343,13 +343,6 @@ void RWSplitSession::purge_history(mxs::SSessionCommand& sescmd) */ bool RWSplitSession::route_session_write(GWBUF *querybuf, uint8_t command, uint32_t type) { - if (is_large_query(querybuf)) - { - MXS_ERROR("Session command is too large, session cannot continue. " - "Large session commands are not supported in 2.2."); - return false; - } - /** The SessionCommand takes ownership of the buffer */ uint64_t id = m_sescmd_count++; mxs::SSessionCommand sescmd(new mxs::SessionCommand(querybuf, id)); diff --git a/server/modules/routing/readwritesplit/rwsplitsession.cc b/server/modules/routing/readwritesplit/rwsplitsession.cc index bf2021003..4a01c3699 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.cc +++ b/server/modules/routing/readwritesplit/rwsplitsession.cc @@ -141,7 +141,10 @@ int32_t RWSplitSession::routeQuery(GWBUF* querybuf) current_target = QueryClassifier::CURRENT_TARGET_SLAVE; } - m_qc.update_route_info(current_target, querybuf); + if (!m_qc.large_query()) + { + m_qc.update_route_info(current_target, querybuf); + } /** No active or pending queries */ if (route_single_stmt(querybuf)) diff --git a/server/modules/routing/readwritesplit/rwsplitsession.hh b/server/modules/routing/readwritesplit/rwsplitsession.hh index d476bc01f..61f82dda3 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.hh +++ b/server/modules/routing/readwritesplit/rwsplitsession.hh @@ -151,6 +151,7 @@ private: void purge_history(mxs::SSessionCommand& sescmd); bool route_session_write(GWBUF *querybuf, uint8_t command, uint32_t type); + void continue_large_session_write(GWBUF *querybuf, uint32_t type); bool route_single_stmt(GWBUF *querybuf); bool route_stored_query(); @@ -217,6 +218,18 @@ private: { return !m_config.disable_sescmd_history || m_recv_sescmd == 0; } + + inline bool is_large_query(GWBUF* buf) + { + uint32_t buflen = gwbuf_length(buf); + + // The buffer should contain at most (2^24 - 1) + 4 bytes ... + ss_dassert(buflen <= MYSQL_HEADER_LEN + GW_MYSQL_MAX_PACKET_LEN); + // ... and the payload should be buflen - 4 bytes + ss_dassert(MYSQL_GET_PAYLOAD_LEN(GWBUF_DATA(buf)) == buflen - MYSQL_HEADER_LEN); + + return buflen == MYSQL_HEADER_LEN + GW_MYSQL_MAX_PACKET_LEN; + } }; /**