Move RWBackend::reply_is_complete into rwsplitsession.cc
The function is now defined in the correct file. Removed the debug output as it can be logged inside RWBackend::set_reply_state.
This commit is contained in:
@ -319,7 +319,6 @@ static bool reroute_stored_statement(RWSplitSession *rses, const SRWBackend& old
|
|||||||
{
|
{
|
||||||
MXS_INFO("Retrying failed read at '%s'.", backend->name());
|
MXS_INFO("Retrying failed read at '%s'.", backend->name());
|
||||||
ss_dassert(backend->get_reply_state() == REPLY_STATE_DONE);
|
ss_dassert(backend->get_reply_state() == REPLY_STATE_DONE);
|
||||||
LOG_RS(backend, REPLY_STATE_START);
|
|
||||||
backend->set_reply_state(REPLY_STATE_START);
|
backend->set_reply_state(REPLY_STATE_START);
|
||||||
rses->expected_responses++;
|
rses->expected_responses++;
|
||||||
success = true;
|
success = true;
|
||||||
@ -337,7 +336,6 @@ static bool reroute_stored_statement(RWSplitSession *rses, const SRWBackend& old
|
|||||||
if (rses->current_master->write(stored))
|
if (rses->current_master->write(stored))
|
||||||
{
|
{
|
||||||
MXS_INFO("Retrying failed read at '%s'.", rses->current_master->name());
|
MXS_INFO("Retrying failed read at '%s'.", rses->current_master->name());
|
||||||
LOG_RS(rses->current_master, REPLY_STATE_START);
|
|
||||||
ss_dassert(rses->current_master->get_reply_state() == REPLY_STATE_DONE);
|
ss_dassert(rses->current_master->get_reply_state() == REPLY_STATE_DONE);
|
||||||
rses->current_master->set_reply_state(REPLY_STATE_START);
|
rses->current_master->set_reply_state(REPLY_STATE_START);
|
||||||
rses->expected_responses++;
|
rses->expected_responses++;
|
||||||
@ -504,100 +502,6 @@ static bool route_stored_query(RWSplitSession *rses)
|
|||||||
return rval;
|
return rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline bool have_next_packet(GWBUF* buffer)
|
|
||||||
{
|
|
||||||
uint32_t len = MYSQL_GET_PAYLOAD_LEN(GWBUF_DATA(buffer)) + MYSQL_HEADER_LEN;
|
|
||||||
return gwbuf_length(buffer) > len;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Check if we have received a complete reply from the backend
|
|
||||||
*
|
|
||||||
* @param backend Backend reference
|
|
||||||
* @param buffer Buffer containing the response
|
|
||||||
*
|
|
||||||
* @return True if the complete response has been received
|
|
||||||
*
|
|
||||||
* TODO: Move this into a separate file
|
|
||||||
*/
|
|
||||||
bool RWBackend::reply_is_complete(GWBUF *buffer)
|
|
||||||
{
|
|
||||||
if (get_reply_state() == REPLY_STATE_START &&
|
|
||||||
(!mxs_mysql_is_result_set(buffer) || GWBUF_IS_COLLECTED_RESULT(buffer)))
|
|
||||||
{
|
|
||||||
if (GWBUF_IS_COLLECTED_RESULT(buffer) ||
|
|
||||||
current_command() == MXS_COM_STMT_PREPARE ||
|
|
||||||
!mxs_mysql_is_ok_packet(buffer) ||
|
|
||||||
!mxs_mysql_more_results_after_ok(buffer))
|
|
||||||
{
|
|
||||||
/** Not a result set, we have the complete response */
|
|
||||||
LOG_RS(this, REPLY_STATE_DONE);
|
|
||||||
set_reply_state(REPLY_STATE_DONE);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
// This is an OK packet and more results will follow
|
|
||||||
ss_dassert(mxs_mysql_is_ok_packet(buffer) &&
|
|
||||||
mxs_mysql_more_results_after_ok(buffer));
|
|
||||||
|
|
||||||
if (have_next_packet(buffer))
|
|
||||||
{
|
|
||||||
LOG_RS(this, REPLY_STATE_RSET_COLDEF);
|
|
||||||
set_reply_state(REPLY_STATE_RSET_COLDEF);
|
|
||||||
return reply_is_complete(buffer);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
bool more = false;
|
|
||||||
modutil_state state = {is_large_packet()};
|
|
||||||
int n_old_eof = get_reply_state() == REPLY_STATE_RSET_ROWS ? 1 : 0;
|
|
||||||
int n_eof = modutil_count_signal_packets(buffer, n_old_eof, &more, &state);
|
|
||||||
set_large_packet(state.state);
|
|
||||||
|
|
||||||
if (n_eof > 2)
|
|
||||||
{
|
|
||||||
/**
|
|
||||||
* We have multiple results in the buffer, we only care about
|
|
||||||
* the state of the last one. Skip the complete result sets and act
|
|
||||||
* like we're processing a single result set.
|
|
||||||
*/
|
|
||||||
n_eof = n_eof % 2 ? 1 : 2;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (n_eof == 0)
|
|
||||||
{
|
|
||||||
/** Waiting for the EOF packet after the column definitions */
|
|
||||||
LOG_RS(this, REPLY_STATE_RSET_COLDEF);
|
|
||||||
set_reply_state(REPLY_STATE_RSET_COLDEF);
|
|
||||||
}
|
|
||||||
else if (n_eof == 1 && current_command() != MXS_COM_FIELD_LIST)
|
|
||||||
{
|
|
||||||
/** Waiting for the EOF packet after the rows */
|
|
||||||
LOG_RS(this, REPLY_STATE_RSET_ROWS);
|
|
||||||
set_reply_state(REPLY_STATE_RSET_ROWS);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
/** We either have a complete result set or a response to
|
|
||||||
* a COM_FIELD_LIST command */
|
|
||||||
ss_dassert(n_eof == 2 || (n_eof == 1 && current_command() == MXS_COM_FIELD_LIST));
|
|
||||||
LOG_RS(this, REPLY_STATE_DONE);
|
|
||||||
set_reply_state(REPLY_STATE_DONE);
|
|
||||||
|
|
||||||
if (more)
|
|
||||||
{
|
|
||||||
/** The server will send more resultsets */
|
|
||||||
LOG_RS(this, REPLY_STATE_START);
|
|
||||||
set_reply_state(REPLY_STATE_START);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return get_reply_state() == REPLY_STATE_DONE;
|
|
||||||
}
|
|
||||||
|
|
||||||
void close_all_connections(SRWBackendList& backends)
|
void close_all_connections(SRWBackendList& backends)
|
||||||
{
|
{
|
||||||
for (SRWBackendList::iterator it = backends.begin(); it != backends.end(); it++)
|
for (SRWBackendList::iterator it = backends.begin(); it != backends.end(); it++)
|
||||||
|
@ -950,8 +950,6 @@ bool handle_got_target(RWSplit *inst, RWSplitSession *rses,
|
|||||||
{
|
{
|
||||||
/** The server will reply to this command */
|
/** The server will reply to this command */
|
||||||
ss_dassert(target->get_reply_state() == REPLY_STATE_DONE);
|
ss_dassert(target->get_reply_state() == REPLY_STATE_DONE);
|
||||||
|
|
||||||
LOG_RS(target, REPLY_STATE_START);
|
|
||||||
target->set_reply_state(REPLY_STATE_START);
|
target->set_reply_state(REPLY_STATE_START);
|
||||||
rses->expected_responses++;
|
rses->expected_responses++;
|
||||||
|
|
||||||
|
@ -85,6 +85,92 @@ void RWBackend::close(close_type type)
|
|||||||
mxs::Backend::close(type);
|
mxs::Backend::close(type);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static inline bool have_next_packet(GWBUF* buffer)
|
||||||
|
{
|
||||||
|
uint32_t len = MYSQL_GET_PAYLOAD_LEN(GWBUF_DATA(buffer)) + MYSQL_HEADER_LEN;
|
||||||
|
return gwbuf_length(buffer) > len;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Check if we have received a complete reply from the backend
|
||||||
|
*
|
||||||
|
* @param backend Backend reference
|
||||||
|
* @param buffer Buffer containing the response
|
||||||
|
*
|
||||||
|
* @return True if the complete response has been received
|
||||||
|
*/
|
||||||
|
bool RWBackend::reply_is_complete(GWBUF *buffer)
|
||||||
|
{
|
||||||
|
if (get_reply_state() == REPLY_STATE_START &&
|
||||||
|
(!mxs_mysql_is_result_set(buffer) || GWBUF_IS_COLLECTED_RESULT(buffer)))
|
||||||
|
{
|
||||||
|
if (GWBUF_IS_COLLECTED_RESULT(buffer) ||
|
||||||
|
current_command() == MXS_COM_STMT_PREPARE ||
|
||||||
|
!mxs_mysql_is_ok_packet(buffer) ||
|
||||||
|
!mxs_mysql_more_results_after_ok(buffer))
|
||||||
|
{
|
||||||
|
/** Not a result set, we have the complete response */
|
||||||
|
set_reply_state(REPLY_STATE_DONE);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// This is an OK packet and more results will follow
|
||||||
|
ss_dassert(mxs_mysql_is_ok_packet(buffer) &&
|
||||||
|
mxs_mysql_more_results_after_ok(buffer));
|
||||||
|
|
||||||
|
if (have_next_packet(buffer))
|
||||||
|
{
|
||||||
|
set_reply_state(REPLY_STATE_RSET_COLDEF);
|
||||||
|
return reply_is_complete(buffer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
bool more = false;
|
||||||
|
modutil_state state = {is_large_packet()};
|
||||||
|
int n_old_eof = get_reply_state() == REPLY_STATE_RSET_ROWS ? 1 : 0;
|
||||||
|
int n_eof = modutil_count_signal_packets(buffer, n_old_eof, &more, &state);
|
||||||
|
set_large_packet(state.state);
|
||||||
|
|
||||||
|
if (n_eof > 2)
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* We have multiple results in the buffer, we only care about
|
||||||
|
* the state of the last one. Skip the complete result sets and act
|
||||||
|
* like we're processing a single result set.
|
||||||
|
*/
|
||||||
|
n_eof = n_eof % 2 ? 1 : 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (n_eof == 0)
|
||||||
|
{
|
||||||
|
/** Waiting for the EOF packet after the column definitions */
|
||||||
|
set_reply_state(REPLY_STATE_RSET_COLDEF);
|
||||||
|
}
|
||||||
|
else if (n_eof == 1 && current_command() != MXS_COM_FIELD_LIST)
|
||||||
|
{
|
||||||
|
/** Waiting for the EOF packet after the rows */
|
||||||
|
set_reply_state(REPLY_STATE_RSET_ROWS);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/** We either have a complete result set or a response to
|
||||||
|
* a COM_FIELD_LIST command */
|
||||||
|
ss_dassert(n_eof == 2 || (n_eof == 1 && current_command() == MXS_COM_FIELD_LIST));
|
||||||
|
set_reply_state(REPLY_STATE_DONE);
|
||||||
|
|
||||||
|
if (more)
|
||||||
|
{
|
||||||
|
/** The server will send more resultsets */
|
||||||
|
set_reply_state(REPLY_STATE_START);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return get_reply_state() == REPLY_STATE_DONE;
|
||||||
|
}
|
||||||
|
|
||||||
uint32_t get_internal_ps_id(RWSplitSession* rses, GWBUF* buffer)
|
uint32_t get_internal_ps_id(RWSplitSession* rses, GWBUF* buffer)
|
||||||
{
|
{
|
||||||
uint32_t rval = 0;
|
uint32_t rval = 0;
|
||||||
|
@ -35,10 +35,6 @@ typedef enum
|
|||||||
EXPECTING_REAL_RESULT
|
EXPECTING_REAL_RESULT
|
||||||
} wait_gtid_state_t;
|
} wait_gtid_state_t;
|
||||||
|
|
||||||
/** Reply state change debug logging */
|
|
||||||
#define LOG_RS(a, b) MXS_DEBUG("%s %s -> %s", (a)->uri(), \
|
|
||||||
rstostr((a)->get_reply_state()), rstostr(b));
|
|
||||||
|
|
||||||
typedef std::map<uint32_t, uint32_t> BackendHandleMap; /** Internal ID to external ID */
|
typedef std::map<uint32_t, uint32_t> BackendHandleMap; /** Internal ID to external ID */
|
||||||
typedef std::map<uint32_t, uint32_t> ClientHandleMap; /** External ID to internal ID */
|
typedef std::map<uint32_t, uint32_t> ClientHandleMap; /** External ID to internal ID */
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user