diff --git a/maxscale-system-test/CMakeLists.txt b/maxscale-system-test/CMakeLists.txt index 9d6945acf..b9521bbcf 100644 --- a/maxscale-system-test/CMakeLists.txt +++ b/maxscale-system-test/CMakeLists.txt @@ -693,6 +693,10 @@ add_test_executable(mxs1804_long_ps_hang.cpp mxs1804_long_ps_hang replication LA # https://jira.mariadb.org/browse/MXS-1808 add_test_executable(mxs1808_long_data.cpp mxs1808_long_data replication LABELS readwritesplit REPL_BACKEND) +# MXS-1824: Debug assertion with two open cursors +# https://jira.mariadb.org/browse/MXS-1824 +add_test_executable(mxs1824_double_cursor.cpp mxs1824_double_cursor replication LABELS readwritesplit REPL_BACKEND) + # 'namedserverfilter' test add_test_executable(namedserverfilter.cpp namedserverfilter namedserverfilter LABELS namedserverfilter LIGHT REPL_BACKEND) diff --git a/maxscale-system-test/mxs1824_double_cursor.cpp b/maxscale-system-test/mxs1824_double_cursor.cpp new file mode 100644 index 000000000..01d5e9020 --- /dev/null +++ b/maxscale-system-test/mxs1824_double_cursor.cpp @@ -0,0 +1,66 @@ +/** + * MXS-1824: Debug assertion with two open cursors + * + * https://jira.mariadb.org/browse/MXS-1824 + */ + +#include "testconnections.h" + +void double_cursor(TestConnections& test, MYSQL* conn) +{ + test.try_query(conn, "CREATE OR REPLACE TABLE test.t1(id int)"); + test.try_query(conn, "INSERT INTO test.t1 VALUES (1), (2), (3)"); + + test.repl->connect(); + test.repl->sync_slaves(); + test.repl->disconnect(); + + MYSQL_STMT* stmt1 = mysql_stmt_init(conn); + const char* query = "SELECT id FROM test.t1"; + int rc = mysql_stmt_prepare(stmt1, query, strlen(query)); + test.assert(rc == 0, "Prepare should work: %s %s", mysql_stmt_error(stmt1), mysql_error(conn)); + int type = CURSOR_TYPE_READ_ONLY; + mysql_stmt_attr_set(stmt1, STMT_ATTR_CURSOR_TYPE, &type); + + MYSQL_BIND bind[1] {}; + uint32_t id; + bind[0].buffer_type = MYSQL_TYPE_LONG; + bind[0].buffer = &id; + mysql_stmt_bind_result(stmt1, bind); + + test.assert(mysql_stmt_execute(stmt1) == 0, "Execute of first statement should work: %s %s", + mysql_stmt_error(stmt1), mysql_error(conn)); + test.assert(mysql_stmt_fetch(stmt1) == 0, "First fetch should work: %s %s", + mysql_stmt_error(stmt1), mysql_error(conn)); + + MYSQL_STMT* stmt2 = mysql_stmt_init(conn); + rc = mysql_stmt_prepare(stmt2, query, strlen(query)); + test.assert(rc == 0, "Prepare should work: %s %s", mysql_stmt_error(stmt2), mysql_error(conn)); + mysql_stmt_attr_set(stmt2, STMT_ATTR_CURSOR_TYPE, &type); + mysql_stmt_bind_result(stmt2, bind); + + test.assert(mysql_stmt_execute(stmt2) == 0, "Execute of second statement should work: %s %s", + mysql_stmt_error(stmt2), mysql_error(conn)); + test.assert(mysql_stmt_fetch(stmt2) == 0, "Second fetch should work: %s %s", mysql_stmt_error(stmt2), + mysql_error(conn)); + mysql_stmt_reset(stmt2); + + test.assert(mysql_stmt_fetch(stmt1) == 0, "Third fetch should work: %s %s", mysql_stmt_error(stmt1), + mysql_error(conn)); + + mysql_stmt_close(stmt1); + mysql_stmt_close(stmt2); + + test.try_query(conn, "DROP TABLE test.t1"); +} + +int main(int argc, char** argv) +{ + TestConnections test(argc, argv); + + test.maxscales->connect(); + double_cursor(test, test.maxscales->conn_rwsplit[0]); + test.maxscales->disconnect(); + + return test.global_result; +} diff --git a/maxscale-system-test/sequence.cpp b/maxscale-system-test/sequence.cpp index 1f2a67128..12953adb2 100644 --- a/maxscale-system-test/sequence.cpp +++ b/maxscale-system-test/sequence.cpp @@ -21,11 +21,23 @@ int main(int argc, char** argv) { "SELECT PREVIOUS VALUE FOR seq", "1" }, { "SELECT NEXTVAL(seq)", "2" }, { "SELECT LASTVAL(seq)", "2" }, + }; + + for (auto a : statements) + { + test.assert(execute_query_check_one(test.maxscales->conn_rwsplit[0], a.first, a.second) == 0, + "Expected '%s' for query: %s", a.second, a.first); + } + + test.try_query(test.maxscales->conn_rwsplit[0], "SET SQL_MODE='ORACLE'"); + + std::vector< std::pair > oracle_statements = + { { "SELECT seq.nextval", "3" }, { "SELECT seq.currval", "3" }, }; - for (auto a : statements) + for (auto a : oracle_statements) { test.assert(execute_query_check_one(test.maxscales->conn_rwsplit[0], a.first, a.second) == 0, "Expected '%s' for query: %s", a.second, a.first); diff --git a/server/core/modutil.cc b/server/core/modutil.cc index d381dd340..65d7beae2 100644 --- a/server/core/modutil.cc +++ b/server/core/modutil.cc @@ -633,14 +633,20 @@ GWBUF* modutil_get_complete_packets(GWBUF **p_readbuf) int modutil_count_signal_packets(GWBUF *reply, int n_found, bool* more_out, modutil_state* state) { + enum + { + SKIP_NEXT = 0x1, + PS_OUT_PARAM = 0x2, + }; + unsigned int len = gwbuf_length(reply); int eof = 0; int err = 0; size_t offset = 0; - bool skip_next = state ? state->state : false; bool more = false; bool only_ok = true; uint64_t num_packets = 0; + uint8_t internal_state = state ? state->state : 0; while (offset < len) { @@ -655,12 +661,12 @@ int modutil_count_signal_packets(GWBUF *reply, int n_found, bool* more_out, modu if (payloadlen == GW_MYSQL_MAX_PACKET_LEN) { only_ok = false; - skip_next = true; + internal_state |= SKIP_NEXT; } - else if (skip_next) + else if (internal_state & SKIP_NEXT) { only_ok = false; - skip_next = false; + internal_state &= ~SKIP_NEXT; } else { @@ -678,6 +684,28 @@ int modutil_count_signal_packets(GWBUF *reply, int n_found, bool* more_out, modu { eof++; only_ok = false; + + uint8_t status[2]; // Two byte server status + gwbuf_copy_data(reply, offset + MYSQL_HEADER_LEN + 1 + 2, sizeof(status), status); + more = gw_mysql_get_byte2(status) & SERVER_MORE_RESULTS_EXIST; + + /** + * MySQL 5.6 and 5.7 have a "feature" that doesn't set + * the SERVER_MORE_RESULTS_EXIST flag in the last EOF packet of + * a result set if the SERVER_PS_OUT_PARAMS flag was set in + * the first result set. To handle this, we have to store + * the information from the first EOF packet until we process + * the second EOF packet. + */ + if (gw_mysql_get_byte2(status) & SERVER_PS_OUT_PARAMS) + { + internal_state |= PS_OUT_PARAM; + } + else if (internal_state & PS_OUT_PARAM) + { + more = true; + internal_state &= ~PS_OUT_PARAM; + } } else if (command == MYSQL_REPLY_OK && pktlen >= MYSQL_OK_PACKET_MIN_LEN && (eof + n_found) % 2 == 0) @@ -699,13 +727,6 @@ int modutil_count_signal_packets(GWBUF *reply, int n_found, bool* more_out, modu } } - if (offset + pktlen >= len || (eof + err + n_found) >= 2) - { - gwbuf_copy_data(reply, offset, sizeof(header), header); - uint16_t* status = (uint16_t*)(header + MYSQL_HEADER_LEN + 1 + 2); // Skip command and warning count - more = ((*status) & SERVER_MORE_RESULTS_EXIST); - } - offset += pktlen; if (offset >= GWBUF_LENGTH(reply) && reply->next) @@ -720,7 +741,7 @@ int modutil_count_signal_packets(GWBUF *reply, int n_found, bool* more_out, modu if (state) { - state->state = skip_next; + state->state = internal_state; } *more_out = more; diff --git a/server/modules/routing/readwritesplit/rwbackend.cc b/server/modules/routing/readwritesplit/rwbackend.cc index 015f7acba..b7b1e32ad 100644 --- a/server/modules/routing/readwritesplit/rwbackend.cc +++ b/server/modules/routing/readwritesplit/rwbackend.cc @@ -10,9 +10,9 @@ namespace maxscale RWBackend::RWBackend(SERVER_REF* ref): mxs::Backend(ref), m_reply_state(REPLY_STATE_DONE), - m_large_packet(false), + m_modutil_state({}), m_command(0), - m_open_cursor(false), + m_opening_cursor(false), m_expected_rows(0) { } @@ -77,20 +77,15 @@ bool RWBackend::write(GWBUF* buffer, response_type type) gwbuf_copy_data(buffer, MYSQL_PS_ID_OFFSET + MYSQL_PS_ID_SIZE, 1, &flags); // Any non-zero flag value means that we have an open cursor - m_open_cursor = flags != 0; + m_opening_cursor = flags != 0; } else if (cmd == MXS_COM_STMT_FETCH) { - ss_dassert(m_open_cursor); // Number of rows to fetch is a 4 byte integer after the ID uint8_t buf[4]; gwbuf_copy_data(buffer, MYSQL_PS_ID_OFFSET + MYSQL_PS_ID_SIZE, 4, buf); m_expected_rows = gw_mysql_get_byte4(buf); } - else - { - m_open_cursor = false; - } } } @@ -129,9 +124,7 @@ bool RWBackend::reply_is_complete(GWBUF *buffer) if (current_command() == MXS_COM_STMT_FETCH) { bool more = false; - modutil_state state = {is_large_packet()}; - int n_eof = modutil_count_signal_packets(buffer, 0, &more, &state); - set_large_packet(state.state); + int n_eof = modutil_count_signal_packets(buffer, 0, &more, &m_modutil_state); // If the server responded with an error, n_eof > 0 if (n_eof > 0 || consume_fetched_rows(buffer)) @@ -172,10 +165,8 @@ bool RWBackend::reply_is_complete(GWBUF *buffer) else { bool more = false; - modutil_state state = {is_large_packet()}; int n_old_eof = get_reply_state() == REPLY_STATE_RSET_ROWS ? 1 : 0; - int n_eof = modutil_count_signal_packets(buffer, n_old_eof, &more, &state); - set_large_packet(state.state); + int n_eof = modutil_count_signal_packets(buffer, n_old_eof, &more, &m_modutil_state); if (n_eof > 2) { @@ -197,8 +188,9 @@ bool RWBackend::reply_is_complete(GWBUF *buffer) /** Waiting for the EOF packet after the rows */ set_reply_state(REPLY_STATE_RSET_ROWS); - if (cursor_is_open()) + if (is_opening_cursor()) { + set_cursor_opened(); MXS_INFO("Cursor successfully opened"); set_reply_state(REPLY_STATE_DONE); } diff --git a/server/modules/routing/readwritesplit/rwbackend.hh b/server/modules/routing/readwritesplit/rwbackend.hh index 2340c909e..bb0edbe04 100644 --- a/server/modules/routing/readwritesplit/rwbackend.hh +++ b/server/modules/routing/readwritesplit/rwbackend.hh @@ -18,6 +18,7 @@ #include #include +#include namespace maxscale { @@ -69,37 +70,30 @@ public: // For COM_STMT_FETCH processing bool consume_fetched_rows(GWBUF* buffer); - inline void set_large_packet(bool value) - { - m_large_packet = value; - } - - inline bool is_large_packet() const - { - return m_large_packet; - } - inline uint8_t current_command() const { return m_command; } - inline bool cursor_is_open() const - { - return m_open_cursor; - } - bool reply_is_complete(GWBUF *buffer); private: reply_state_t m_reply_state; BackendHandleMap m_ps_handles; /**< Internal ID to backend PS handle mapping */ - bool m_large_packet; /**< Used to store the state of the EOF packet - *calculation for result sets when the result - * contains very large rows */ + modutil_state m_modutil_state; /**< @see modutil_count_signal_packets */ uint8_t m_command; - bool m_open_cursor; /**< Whether we have an open cursor */ + bool m_opening_cursor; /**< Whether we are opening a cursor */ uint32_t m_expected_rows; /**< Number of rows a COM_STMT_FETCH is retrieving */ + + inline bool is_opening_cursor() const + { + return m_opening_cursor; + } + + inline void set_cursor_opened() + { + m_opening_cursor = false; + } }; } diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc index 93315eee2..ede8c3705 100644 --- a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc @@ -240,7 +240,6 @@ bool RWSplitSession::route_single_stmt(GWBUF *querybuf) /** Track the targets of the COM_STMT_EXECUTE statements. This * information is used to route all COM_STMT_FETCH commands * to the same server where the COM_STMT_EXECUTE was done. */ - ss_dassert(stmt_id > 0); m_exec_map[stmt_id] = target; MXS_INFO("COM_STMT_EXECUTE on %s: %s", target->name(), target->uri()); }