Merge branch '2.2' into develop
This commit is contained in:
commit
880e0fb74e
@ -693,6 +693,10 @@ add_test_executable(mxs1804_long_ps_hang.cpp mxs1804_long_ps_hang replication LA
|
||||
# https://jira.mariadb.org/browse/MXS-1808
|
||||
add_test_executable(mxs1808_long_data.cpp mxs1808_long_data replication LABELS readwritesplit REPL_BACKEND)
|
||||
|
||||
# MXS-1824: Debug assertion with two open cursors
|
||||
# https://jira.mariadb.org/browse/MXS-1824
|
||||
add_test_executable(mxs1824_double_cursor.cpp mxs1824_double_cursor replication LABELS readwritesplit REPL_BACKEND)
|
||||
|
||||
# 'namedserverfilter' test
|
||||
add_test_executable(namedserverfilter.cpp namedserverfilter namedserverfilter LABELS namedserverfilter LIGHT REPL_BACKEND)
|
||||
|
||||
|
66
maxscale-system-test/mxs1824_double_cursor.cpp
Normal file
66
maxscale-system-test/mxs1824_double_cursor.cpp
Normal file
@ -0,0 +1,66 @@
|
||||
/**
|
||||
* MXS-1824: Debug assertion with two open cursors
|
||||
*
|
||||
* https://jira.mariadb.org/browse/MXS-1824
|
||||
*/
|
||||
|
||||
#include "testconnections.h"
|
||||
|
||||
void double_cursor(TestConnections& test, MYSQL* conn)
|
||||
{
|
||||
test.try_query(conn, "CREATE OR REPLACE TABLE test.t1(id int)");
|
||||
test.try_query(conn, "INSERT INTO test.t1 VALUES (1), (2), (3)");
|
||||
|
||||
test.repl->connect();
|
||||
test.repl->sync_slaves();
|
||||
test.repl->disconnect();
|
||||
|
||||
MYSQL_STMT* stmt1 = mysql_stmt_init(conn);
|
||||
const char* query = "SELECT id FROM test.t1";
|
||||
int rc = mysql_stmt_prepare(stmt1, query, strlen(query));
|
||||
test.assert(rc == 0, "Prepare should work: %s %s", mysql_stmt_error(stmt1), mysql_error(conn));
|
||||
int type = CURSOR_TYPE_READ_ONLY;
|
||||
mysql_stmt_attr_set(stmt1, STMT_ATTR_CURSOR_TYPE, &type);
|
||||
|
||||
MYSQL_BIND bind[1] {};
|
||||
uint32_t id;
|
||||
bind[0].buffer_type = MYSQL_TYPE_LONG;
|
||||
bind[0].buffer = &id;
|
||||
mysql_stmt_bind_result(stmt1, bind);
|
||||
|
||||
test.assert(mysql_stmt_execute(stmt1) == 0, "Execute of first statement should work: %s %s",
|
||||
mysql_stmt_error(stmt1), mysql_error(conn));
|
||||
test.assert(mysql_stmt_fetch(stmt1) == 0, "First fetch should work: %s %s",
|
||||
mysql_stmt_error(stmt1), mysql_error(conn));
|
||||
|
||||
MYSQL_STMT* stmt2 = mysql_stmt_init(conn);
|
||||
rc = mysql_stmt_prepare(stmt2, query, strlen(query));
|
||||
test.assert(rc == 0, "Prepare should work: %s %s", mysql_stmt_error(stmt2), mysql_error(conn));
|
||||
mysql_stmt_attr_set(stmt2, STMT_ATTR_CURSOR_TYPE, &type);
|
||||
mysql_stmt_bind_result(stmt2, bind);
|
||||
|
||||
test.assert(mysql_stmt_execute(stmt2) == 0, "Execute of second statement should work: %s %s",
|
||||
mysql_stmt_error(stmt2), mysql_error(conn));
|
||||
test.assert(mysql_stmt_fetch(stmt2) == 0, "Second fetch should work: %s %s", mysql_stmt_error(stmt2),
|
||||
mysql_error(conn));
|
||||
mysql_stmt_reset(stmt2);
|
||||
|
||||
test.assert(mysql_stmt_fetch(stmt1) == 0, "Third fetch should work: %s %s", mysql_stmt_error(stmt1),
|
||||
mysql_error(conn));
|
||||
|
||||
mysql_stmt_close(stmt1);
|
||||
mysql_stmt_close(stmt2);
|
||||
|
||||
test.try_query(conn, "DROP TABLE test.t1");
|
||||
}
|
||||
|
||||
int main(int argc, char** argv)
|
||||
{
|
||||
TestConnections test(argc, argv);
|
||||
|
||||
test.maxscales->connect();
|
||||
double_cursor(test, test.maxscales->conn_rwsplit[0]);
|
||||
test.maxscales->disconnect();
|
||||
|
||||
return test.global_result;
|
||||
}
|
@ -21,11 +21,23 @@ int main(int argc, char** argv)
|
||||
{ "SELECT PREVIOUS VALUE FOR seq", "1" },
|
||||
{ "SELECT NEXTVAL(seq)", "2" },
|
||||
{ "SELECT LASTVAL(seq)", "2" },
|
||||
};
|
||||
|
||||
for (auto a : statements)
|
||||
{
|
||||
test.assert(execute_query_check_one(test.maxscales->conn_rwsplit[0], a.first, a.second) == 0,
|
||||
"Expected '%s' for query: %s", a.second, a.first);
|
||||
}
|
||||
|
||||
test.try_query(test.maxscales->conn_rwsplit[0], "SET SQL_MODE='ORACLE'");
|
||||
|
||||
std::vector< std::pair<const char*, const char*> > oracle_statements =
|
||||
{
|
||||
{ "SELECT seq.nextval", "3" },
|
||||
{ "SELECT seq.currval", "3" },
|
||||
};
|
||||
|
||||
for (auto a : statements)
|
||||
for (auto a : oracle_statements)
|
||||
{
|
||||
test.assert(execute_query_check_one(test.maxscales->conn_rwsplit[0], a.first, a.second) == 0,
|
||||
"Expected '%s' for query: %s", a.second, a.first);
|
||||
|
@ -633,14 +633,20 @@ GWBUF* modutil_get_complete_packets(GWBUF **p_readbuf)
|
||||
|
||||
int modutil_count_signal_packets(GWBUF *reply, int n_found, bool* more_out, modutil_state* state)
|
||||
{
|
||||
enum
|
||||
{
|
||||
SKIP_NEXT = 0x1,
|
||||
PS_OUT_PARAM = 0x2,
|
||||
};
|
||||
|
||||
unsigned int len = gwbuf_length(reply);
|
||||
int eof = 0;
|
||||
int err = 0;
|
||||
size_t offset = 0;
|
||||
bool skip_next = state ? state->state : false;
|
||||
bool more = false;
|
||||
bool only_ok = true;
|
||||
uint64_t num_packets = 0;
|
||||
uint8_t internal_state = state ? state->state : 0;
|
||||
|
||||
while (offset < len)
|
||||
{
|
||||
@ -655,12 +661,12 @@ int modutil_count_signal_packets(GWBUF *reply, int n_found, bool* more_out, modu
|
||||
if (payloadlen == GW_MYSQL_MAX_PACKET_LEN)
|
||||
{
|
||||
only_ok = false;
|
||||
skip_next = true;
|
||||
internal_state |= SKIP_NEXT;
|
||||
}
|
||||
else if (skip_next)
|
||||
else if (internal_state & SKIP_NEXT)
|
||||
{
|
||||
only_ok = false;
|
||||
skip_next = false;
|
||||
internal_state &= ~SKIP_NEXT;
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -678,6 +684,28 @@ int modutil_count_signal_packets(GWBUF *reply, int n_found, bool* more_out, modu
|
||||
{
|
||||
eof++;
|
||||
only_ok = false;
|
||||
|
||||
uint8_t status[2]; // Two byte server status
|
||||
gwbuf_copy_data(reply, offset + MYSQL_HEADER_LEN + 1 + 2, sizeof(status), status);
|
||||
more = gw_mysql_get_byte2(status) & SERVER_MORE_RESULTS_EXIST;
|
||||
|
||||
/**
|
||||
* MySQL 5.6 and 5.7 have a "feature" that doesn't set
|
||||
* the SERVER_MORE_RESULTS_EXIST flag in the last EOF packet of
|
||||
* a result set if the SERVER_PS_OUT_PARAMS flag was set in
|
||||
* the first result set. To handle this, we have to store
|
||||
* the information from the first EOF packet until we process
|
||||
* the second EOF packet.
|
||||
*/
|
||||
if (gw_mysql_get_byte2(status) & SERVER_PS_OUT_PARAMS)
|
||||
{
|
||||
internal_state |= PS_OUT_PARAM;
|
||||
}
|
||||
else if (internal_state & PS_OUT_PARAM)
|
||||
{
|
||||
more = true;
|
||||
internal_state &= ~PS_OUT_PARAM;
|
||||
}
|
||||
}
|
||||
else if (command == MYSQL_REPLY_OK && pktlen >= MYSQL_OK_PACKET_MIN_LEN &&
|
||||
(eof + n_found) % 2 == 0)
|
||||
@ -699,13 +727,6 @@ int modutil_count_signal_packets(GWBUF *reply, int n_found, bool* more_out, modu
|
||||
}
|
||||
}
|
||||
|
||||
if (offset + pktlen >= len || (eof + err + n_found) >= 2)
|
||||
{
|
||||
gwbuf_copy_data(reply, offset, sizeof(header), header);
|
||||
uint16_t* status = (uint16_t*)(header + MYSQL_HEADER_LEN + 1 + 2); // Skip command and warning count
|
||||
more = ((*status) & SERVER_MORE_RESULTS_EXIST);
|
||||
}
|
||||
|
||||
offset += pktlen;
|
||||
|
||||
if (offset >= GWBUF_LENGTH(reply) && reply->next)
|
||||
@ -720,7 +741,7 @@ int modutil_count_signal_packets(GWBUF *reply, int n_found, bool* more_out, modu
|
||||
|
||||
if (state)
|
||||
{
|
||||
state->state = skip_next;
|
||||
state->state = internal_state;
|
||||
}
|
||||
|
||||
*more_out = more;
|
||||
|
@ -10,9 +10,9 @@ namespace maxscale
|
||||
RWBackend::RWBackend(SERVER_REF* ref):
|
||||
mxs::Backend(ref),
|
||||
m_reply_state(REPLY_STATE_DONE),
|
||||
m_large_packet(false),
|
||||
m_modutil_state({}),
|
||||
m_command(0),
|
||||
m_open_cursor(false),
|
||||
m_opening_cursor(false),
|
||||
m_expected_rows(0)
|
||||
{
|
||||
}
|
||||
@ -77,20 +77,15 @@ bool RWBackend::write(GWBUF* buffer, response_type type)
|
||||
gwbuf_copy_data(buffer, MYSQL_PS_ID_OFFSET + MYSQL_PS_ID_SIZE, 1, &flags);
|
||||
|
||||
// Any non-zero flag value means that we have an open cursor
|
||||
m_open_cursor = flags != 0;
|
||||
m_opening_cursor = flags != 0;
|
||||
}
|
||||
else if (cmd == MXS_COM_STMT_FETCH)
|
||||
{
|
||||
ss_dassert(m_open_cursor);
|
||||
// Number of rows to fetch is a 4 byte integer after the ID
|
||||
uint8_t buf[4];
|
||||
gwbuf_copy_data(buffer, MYSQL_PS_ID_OFFSET + MYSQL_PS_ID_SIZE, 4, buf);
|
||||
m_expected_rows = gw_mysql_get_byte4(buf);
|
||||
}
|
||||
else
|
||||
{
|
||||
m_open_cursor = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -129,9 +124,7 @@ bool RWBackend::reply_is_complete(GWBUF *buffer)
|
||||
if (current_command() == MXS_COM_STMT_FETCH)
|
||||
{
|
||||
bool more = false;
|
||||
modutil_state state = {is_large_packet()};
|
||||
int n_eof = modutil_count_signal_packets(buffer, 0, &more, &state);
|
||||
set_large_packet(state.state);
|
||||
int n_eof = modutil_count_signal_packets(buffer, 0, &more, &m_modutil_state);
|
||||
|
||||
// If the server responded with an error, n_eof > 0
|
||||
if (n_eof > 0 || consume_fetched_rows(buffer))
|
||||
@ -172,10 +165,8 @@ bool RWBackend::reply_is_complete(GWBUF *buffer)
|
||||
else
|
||||
{
|
||||
bool more = false;
|
||||
modutil_state state = {is_large_packet()};
|
||||
int n_old_eof = get_reply_state() == REPLY_STATE_RSET_ROWS ? 1 : 0;
|
||||
int n_eof = modutil_count_signal_packets(buffer, n_old_eof, &more, &state);
|
||||
set_large_packet(state.state);
|
||||
int n_eof = modutil_count_signal_packets(buffer, n_old_eof, &more, &m_modutil_state);
|
||||
|
||||
if (n_eof > 2)
|
||||
{
|
||||
@ -197,8 +188,9 @@ bool RWBackend::reply_is_complete(GWBUF *buffer)
|
||||
/** Waiting for the EOF packet after the rows */
|
||||
set_reply_state(REPLY_STATE_RSET_ROWS);
|
||||
|
||||
if (cursor_is_open())
|
||||
if (is_opening_cursor())
|
||||
{
|
||||
set_cursor_opened();
|
||||
MXS_INFO("Cursor successfully opened");
|
||||
set_reply_state(REPLY_STATE_DONE);
|
||||
}
|
||||
|
@ -18,6 +18,7 @@
|
||||
#include <tr1/memory>
|
||||
|
||||
#include <maxscale/backend.hh>
|
||||
#include <maxscale/modutil.h>
|
||||
|
||||
namespace maxscale
|
||||
{
|
||||
@ -69,37 +70,30 @@ public:
|
||||
// For COM_STMT_FETCH processing
|
||||
bool consume_fetched_rows(GWBUF* buffer);
|
||||
|
||||
inline void set_large_packet(bool value)
|
||||
{
|
||||
m_large_packet = value;
|
||||
}
|
||||
|
||||
inline bool is_large_packet() const
|
||||
{
|
||||
return m_large_packet;
|
||||
}
|
||||
|
||||
inline uint8_t current_command() const
|
||||
{
|
||||
return m_command;
|
||||
}
|
||||
|
||||
inline bool cursor_is_open() const
|
||||
{
|
||||
return m_open_cursor;
|
||||
}
|
||||
|
||||
bool reply_is_complete(GWBUF *buffer);
|
||||
|
||||
private:
|
||||
reply_state_t m_reply_state;
|
||||
BackendHandleMap m_ps_handles; /**< Internal ID to backend PS handle mapping */
|
||||
bool m_large_packet; /**< Used to store the state of the EOF packet
|
||||
*calculation for result sets when the result
|
||||
* contains very large rows */
|
||||
modutil_state m_modutil_state; /**< @see modutil_count_signal_packets */
|
||||
uint8_t m_command;
|
||||
bool m_open_cursor; /**< Whether we have an open cursor */
|
||||
bool m_opening_cursor; /**< Whether we are opening a cursor */
|
||||
uint32_t m_expected_rows; /**< Number of rows a COM_STMT_FETCH is retrieving */
|
||||
|
||||
inline bool is_opening_cursor() const
|
||||
{
|
||||
return m_opening_cursor;
|
||||
}
|
||||
|
||||
inline void set_cursor_opened()
|
||||
{
|
||||
m_opening_cursor = false;
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -240,7 +240,6 @@ bool RWSplitSession::route_single_stmt(GWBUF *querybuf)
|
||||
/** Track the targets of the COM_STMT_EXECUTE statements. This
|
||||
* information is used to route all COM_STMT_FETCH commands
|
||||
* to the same server where the COM_STMT_EXECUTE was done. */
|
||||
ss_dassert(stmt_id > 0);
|
||||
m_exec_map[stmt_id] = target;
|
||||
MXS_INFO("COM_STMT_EXECUTE on %s: %s", target->name(), target->uri());
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user