Fix handling of collected results
The result collection did not reset properly when a non-resultset was returned for a request. As collected result need to be distinguishable from single packet responses, a new buffer type was added. The new buffer type is used by readwritesplit which uses result collection for preparation of prepared statements. Moved the current command tracking to the RWBackend class as the command tracked by the protocol is can change before a response to the executed command is received. Removed a false debug assertion in the mxs_mysql_extract_ps_response function that was triggered when a very large prepared statement response was processed in multiple parts.
This commit is contained in:
parent
948f66f918
commit
a6eeed98fe
@ -57,7 +57,8 @@ typedef enum
|
||||
GWBUF_TYPE_SESCMD = 0x04,
|
||||
GWBUF_TYPE_HTTP = 0x08,
|
||||
GWBUF_TYPE_IGNORABLE = 0x10,
|
||||
GWBUF_TYPE_COLLECT_RESULT = 0x20
|
||||
GWBUF_TYPE_COLLECT_RESULT = 0x20,
|
||||
GWBUF_TYPE_RESULT = 0x40,
|
||||
} gwbuf_type_t;
|
||||
|
||||
#define GWBUF_IS_TYPE_UNDEFINED(b) (b->gwbuf_type == 0)
|
||||
@ -65,6 +66,7 @@ typedef enum
|
||||
#define GWBUF_IS_TYPE_RESPONSE_END(b) (b->gwbuf_type & GWBUF_TYPE_RESPONSE_END)
|
||||
#define GWBUF_IS_TYPE_SESCMD(b) (b->gwbuf_type & GWBUF_TYPE_SESCMD)
|
||||
#define GWBUF_IS_IGNORABLE(b) (b->gwbuf_type & GWBUF_TYPE_IGNORABLE)
|
||||
#define GWBUF_IS_COLLECTED_RESULT(b) (b->gwbuf_type & GWBUF_TYPE_RESULT)
|
||||
#define GWBUF_SHOULD_COLLECT_RESULT(b) (b->gwbuf_type & GWBUF_TYPE_COLLECT_RESULT)
|
||||
|
||||
typedef enum
|
||||
|
@ -758,14 +758,16 @@ gw_read_and_write(DCB *dcb)
|
||||
|
||||
if (collecting_resultset(proto, capabilities))
|
||||
{
|
||||
if (expecting_resultset(proto) &&
|
||||
mxs_mysql_is_result_set(read_buffer))
|
||||
if (expecting_resultset(proto))
|
||||
{
|
||||
bool more = false;
|
||||
if (modutil_count_signal_packets(read_buffer, 0, &more, NULL) != 2)
|
||||
if (mxs_mysql_is_result_set(read_buffer))
|
||||
{
|
||||
dcb_readq_prepend(dcb, read_buffer);
|
||||
return 0;
|
||||
bool more = false;
|
||||
if (modutil_count_signal_packets(read_buffer, 0, &more, NULL) != 2)
|
||||
{
|
||||
dcb_readq_prepend(dcb, read_buffer);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
// Collected the complete result
|
||||
@ -937,6 +939,12 @@ gw_read_and_write(DCB *dcb)
|
||||
|
||||
if (session_ok_to_route(dcb))
|
||||
{
|
||||
if (result_collected)
|
||||
{
|
||||
// Mark that this is a buffer containing a collected result
|
||||
gwbuf_set_type(stmt, GWBUF_TYPE_RESULT);
|
||||
}
|
||||
|
||||
session->service->router->clientReply(session->service->router_instance,
|
||||
session->router_session,
|
||||
stmt, dcb);
|
||||
|
@ -1648,23 +1648,6 @@ bool mxs_mysql_extract_ps_response(GWBUF* buffer, MXS_PS_RESPONSE* out)
|
||||
out->parameters = gw_mysql_get_byte2(params);
|
||||
out->warnings = gw_mysql_get_byte2(warnings);
|
||||
rval = true;
|
||||
|
||||
#ifdef SS_DEBUG
|
||||
// Make sure that the PS response contains the whole response
|
||||
bool more;
|
||||
modutil_state state;
|
||||
int n_eof = modutil_count_signal_packets(buffer, 0, &more, &state);
|
||||
int n_expected = 0;
|
||||
if (out->columns)
|
||||
{
|
||||
n_expected++;
|
||||
}
|
||||
if (out->parameters)
|
||||
{
|
||||
n_expected++;
|
||||
}
|
||||
ss_dassert(n_eof == n_expected);
|
||||
#endif
|
||||
}
|
||||
|
||||
return rval;
|
||||
|
@ -515,6 +515,11 @@ static inline bool is_eof(GWBUF* buffer)
|
||||
gw_mysql_get_byte3(data) + MYSQL_HEADER_LEN == MYSQL_EOF_PACKET_LEN;
|
||||
}
|
||||
|
||||
static inline bool is_ok(GWBUF* buffer)
|
||||
{
|
||||
uint8_t* data = GWBUF_DATA(buffer);
|
||||
return data[MYSQL_HEADER_LEN] == MYSQL_REPLY_OK;
|
||||
}
|
||||
static inline bool is_large(GWBUF* buffer)
|
||||
{
|
||||
return gw_mysql_get_byte3(GWBUF_DATA(buffer)) == GW_MYSQL_MAX_PACKET_LEN;
|
||||
@ -549,11 +554,6 @@ static inline bool is_result_set(GWBUF *buffer)
|
||||
return rval;
|
||||
}
|
||||
|
||||
static inline uint8_t get_cmd(SRWBackend& backend)
|
||||
{
|
||||
return mxs_mysql_current_command(backend->dcb()->session);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Check if we have received a complete reply from the backend
|
||||
*
|
||||
@ -564,9 +564,21 @@ static inline uint8_t get_cmd(SRWBackend& backend)
|
||||
*/
|
||||
bool reply_is_complete(SRWBackend backend, GWBUF *buffer)
|
||||
{
|
||||
if (backend->get_reply_state() == REPLY_STATE_START && !is_result_set(buffer))
|
||||
if (GWBUF_IS_COLLECTED_RESULT(buffer))
|
||||
{
|
||||
if (!more_results_exist(buffer) || get_cmd(backend) == MXS_COM_STMT_PREPARE)
|
||||
// This branch should only be taken with a PS response
|
||||
ss_dassert(backend->get_reply_state() == REPLY_STATE_START);
|
||||
ss_dassert(backend->current_command() == MXS_COM_STMT_PREPARE ||
|
||||
backend->current_command() == MXS_COM_QUERY);
|
||||
|
||||
// This is a complete result of a request
|
||||
LOG_RS(backend, REPLY_STATE_DONE);
|
||||
backend->set_reply_state(REPLY_STATE_DONE);
|
||||
}
|
||||
else if (backend->get_reply_state() == REPLY_STATE_START && !is_result_set(buffer))
|
||||
{
|
||||
if (backend->current_command() == MXS_COM_STMT_PREPARE ||
|
||||
!is_ok(buffer) || !more_results_exist(buffer))
|
||||
{
|
||||
/** Not a result set, we have the complete response */
|
||||
LOG_RS(backend, REPLY_STATE_DONE);
|
||||
@ -599,7 +611,7 @@ bool reply_is_complete(SRWBackend backend, GWBUF *buffer)
|
||||
LOG_RS(backend, REPLY_STATE_RSET_COLDEF);
|
||||
backend->set_reply_state(REPLY_STATE_RSET_COLDEF);
|
||||
}
|
||||
else if (n_eof == 1 && get_cmd(backend) != MXS_COM_FIELD_LIST)
|
||||
else if (n_eof == 1 && backend->current_command() != MXS_COM_FIELD_LIST)
|
||||
{
|
||||
/** Waiting for the EOF packet after the rows */
|
||||
LOG_RS(backend, REPLY_STATE_RSET_ROWS);
|
||||
@ -609,7 +621,7 @@ bool reply_is_complete(SRWBackend backend, GWBUF *buffer)
|
||||
{
|
||||
/** We either have a complete result set or a response to
|
||||
* a COM_FIELD_LIST command */
|
||||
ss_dassert(n_eof == 2 || (n_eof == 1 && get_cmd(backend) == MXS_COM_FIELD_LIST));
|
||||
ss_dassert(n_eof == 2 || (n_eof == 1 && backend->current_command() == MXS_COM_FIELD_LIST));
|
||||
LOG_RS(backend, REPLY_STATE_DONE);
|
||||
backend->set_reply_state(REPLY_STATE_DONE);
|
||||
|
||||
@ -1172,8 +1184,10 @@ static void clientReply(MXS_ROUTER *instance,
|
||||
GWBUF *writebuf,
|
||||
DCB *backend_dcb)
|
||||
{
|
||||
ss_dassert(GWBUF_IS_CONTIGUOUS(writebuf) &&
|
||||
MYSQL_GET_PAYLOAD_LEN(GWBUF_DATA(writebuf)) + MYSQL_HEADER_LEN == gwbuf_length(writebuf));
|
||||
ss_dassert((GWBUF_IS_CONTIGUOUS(writebuf) &&
|
||||
MYSQL_GET_PAYLOAD_LEN(GWBUF_DATA(writebuf)) +
|
||||
MYSQL_HEADER_LEN == gwbuf_length(writebuf)) ||
|
||||
GWBUF_IS_COLLECTED_RESULT(writebuf));
|
||||
RWSplitSession *rses = (RWSplitSession *)router_session;
|
||||
DCB *client_dcb = backend_dcb->session->client_dcb;
|
||||
|
||||
|
@ -60,6 +60,8 @@ bool RWBackend::write(GWBUF* buffer, response_type type)
|
||||
{
|
||||
uint8_t cmd = mxs_mysql_get_command(buffer);
|
||||
|
||||
m_command = cmd;
|
||||
|
||||
if (is_ps_command(cmd))
|
||||
{
|
||||
uint32_t id = mxs_mysql_extract_ps_id(buffer);
|
||||
|
@ -69,12 +69,18 @@ public:
|
||||
return m_large_packet;
|
||||
}
|
||||
|
||||
inline uint8_t current_command() const
|
||||
{
|
||||
return m_command;
|
||||
}
|
||||
|
||||
private:
|
||||
reply_state_t m_reply_state;
|
||||
BackendHandleMap m_ps_handles; /**< Internal ID to backend PS handle mapping */
|
||||
bool m_large_packet; /**< Used to store the state of the EOF packet
|
||||
*calculation for result sets when the result
|
||||
* contains very large rows */
|
||||
uint8_t m_command;
|
||||
};
|
||||
|
||||
typedef std::tr1::shared_ptr<RWBackend> SRWBackend;
|
||||
|
Loading…
x
Reference in New Issue
Block a user