Merge branch '2.2' into develop
This commit is contained in:
@ -154,6 +154,7 @@ createInstance(SERVICE *service, char **options)
|
||||
return NULL;
|
||||
}
|
||||
|
||||
inst->sessions = NULL;
|
||||
inst->service = service;
|
||||
spinlock_init(&inst->lock);
|
||||
|
||||
|
@ -11,7 +11,9 @@ RWBackend::RWBackend(SERVER_REF* ref):
|
||||
mxs::Backend(ref),
|
||||
m_reply_state(REPLY_STATE_DONE),
|
||||
m_large_packet(false),
|
||||
m_command(0)
|
||||
m_command(0),
|
||||
m_open_cursor(false),
|
||||
m_expected_rows(0)
|
||||
{
|
||||
}
|
||||
|
||||
@ -21,7 +23,8 @@ RWBackend::~RWBackend()
|
||||
|
||||
bool RWBackend::execute_session_command()
|
||||
{
|
||||
bool expect_response = mxs_mysql_command_will_respond(next_session_command()->get_command());
|
||||
m_command = next_session_command()->get_command();
|
||||
bool expect_response = mxs_mysql_command_will_respond(m_command);
|
||||
bool rval = mxs::Backend::execute_session_command();
|
||||
|
||||
if (rval && expect_response)
|
||||
@ -66,6 +69,28 @@ bool RWBackend::write(GWBUF* buffer, response_type type)
|
||||
/** Replace the client handle with the real PS handle */
|
||||
uint8_t* ptr = GWBUF_DATA(buffer) + MYSQL_PS_ID_OFFSET;
|
||||
gw_mysql_set_byte4(ptr, it->second);
|
||||
|
||||
if (cmd == MXS_COM_STMT_EXECUTE)
|
||||
{
|
||||
// Extract the flag byte after the statement ID
|
||||
uint8_t flags = 0;
|
||||
gwbuf_copy_data(buffer, MYSQL_PS_ID_OFFSET + MYSQL_PS_ID_SIZE, 1, &flags);
|
||||
|
||||
// Any non-zero flag value means that we have an open cursor
|
||||
m_open_cursor = flags != 0;
|
||||
}
|
||||
else if (cmd == MXS_COM_STMT_FETCH)
|
||||
{
|
||||
ss_dassert(m_open_cursor);
|
||||
// Number of rows to fetch is a 4 byte integer after the ID
|
||||
uint8_t buf[4];
|
||||
gwbuf_copy_data(buffer, MYSQL_PS_ID_OFFSET + MYSQL_PS_ID_SIZE, 4, buf);
|
||||
m_expected_rows = gw_mysql_get_byte4(buf);
|
||||
}
|
||||
else
|
||||
{
|
||||
m_open_cursor = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -78,6 +103,13 @@ void RWBackend::close(close_type type)
|
||||
mxs::Backend::close(type);
|
||||
}
|
||||
|
||||
bool RWBackend::consume_fetched_rows(GWBUF* buffer)
|
||||
{
|
||||
m_expected_rows -= modutil_count_packets(buffer);
|
||||
ss_dassert(m_expected_rows >= 0);
|
||||
return m_expected_rows == 0;
|
||||
}
|
||||
|
||||
static inline bool have_next_packet(GWBUF* buffer)
|
||||
{
|
||||
uint32_t len = MYSQL_GET_PAYLOAD_LEN(GWBUF_DATA(buffer)) + MYSQL_HEADER_LEN;
|
||||
@ -94,7 +126,26 @@ static inline bool have_next_packet(GWBUF* buffer)
|
||||
*/
|
||||
bool RWBackend::reply_is_complete(GWBUF *buffer)
|
||||
{
|
||||
if (get_reply_state() == REPLY_STATE_START &&
|
||||
if (current_command() == MXS_COM_STMT_FETCH)
|
||||
{
|
||||
bool more = false;
|
||||
modutil_state state = {is_large_packet()};
|
||||
int n_eof = modutil_count_signal_packets(buffer, 0, &more, &state);
|
||||
set_large_packet(state.state);
|
||||
|
||||
// If the server responded with an error, n_eof > 0
|
||||
if (n_eof > 0 || consume_fetched_rows(buffer))
|
||||
{
|
||||
|
||||
set_reply_state(REPLY_STATE_DONE);
|
||||
}
|
||||
}
|
||||
else if (current_command() == MXS_COM_STATISTICS)
|
||||
{
|
||||
// COM_STATISTICS returns a single string and thus requires special handling
|
||||
set_reply_state(REPLY_STATE_DONE);
|
||||
}
|
||||
else if (get_reply_state() == REPLY_STATE_START &&
|
||||
(!mxs_mysql_is_result_set(buffer) || GWBUF_IS_COLLECTED_RESULT(buffer)))
|
||||
{
|
||||
if (GWBUF_IS_COLLECTED_RESULT(buffer) ||
|
||||
@ -145,6 +196,12 @@ bool RWBackend::reply_is_complete(GWBUF *buffer)
|
||||
{
|
||||
/** Waiting for the EOF packet after the rows */
|
||||
set_reply_state(REPLY_STATE_RSET_ROWS);
|
||||
|
||||
if (cursor_is_open())
|
||||
{
|
||||
MXS_INFO("Cursor successfully opened");
|
||||
set_reply_state(REPLY_STATE_DONE);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -66,6 +66,9 @@ public:
|
||||
bool write(GWBUF* buffer, response_type type = EXPECT_RESPONSE);
|
||||
void close(close_type type = CLOSE_NORMAL);
|
||||
|
||||
// For COM_STMT_FETCH processing
|
||||
bool consume_fetched_rows(GWBUF* buffer);
|
||||
|
||||
inline void set_large_packet(bool value)
|
||||
{
|
||||
m_large_packet = value;
|
||||
@ -81,6 +84,11 @@ public:
|
||||
return m_command;
|
||||
}
|
||||
|
||||
inline bool cursor_is_open() const
|
||||
{
|
||||
return m_open_cursor;
|
||||
}
|
||||
|
||||
bool reply_is_complete(GWBUF *buffer);
|
||||
|
||||
private:
|
||||
@ -90,6 +98,8 @@ private:
|
||||
*calculation for result sets when the result
|
||||
* contains very large rows */
|
||||
uint8_t m_command;
|
||||
bool m_open_cursor; /**< Whether we have an open cursor */
|
||||
uint32_t m_expected_rows; /**< Number of rows a COM_STMT_FETCH is retrieving */
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -116,7 +116,6 @@ int32_t RWSplitSession::routeQuery(GWBUF* querybuf)
|
||||
|
||||
if (m_query_queue == NULL &&
|
||||
(m_expected_responses == 0 ||
|
||||
mxs_mysql_get_command(querybuf) == MXS_COM_STMT_FETCH ||
|
||||
m_qc.load_data_state() == QueryClassifier::LOAD_DATA_ACTIVE ||
|
||||
m_qc.large_query()))
|
||||
{
|
||||
@ -376,6 +375,13 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb)
|
||||
|
||||
SRWBackend& backend = get_backend_from_dcb(backend_dcb);
|
||||
|
||||
if (m_qc.load_data_state() == QueryClassifier::LOAD_DATA_ACTIVE &&
|
||||
mxs_mysql_is_err_packet(writebuf))
|
||||
{
|
||||
// Server responded with an error to the LOAD DATA LOCAL INFILE
|
||||
m_qc.set_load_data_state(QueryClassifier::LOAD_DATA_INACTIVE);
|
||||
}
|
||||
|
||||
if ((writebuf = handle_causal_read_reply(writebuf, backend)) == NULL)
|
||||
{
|
||||
return; // Nothing to route, return
|
||||
|
Reference in New Issue
Block a user