Move reply_is_complete into RWBackend

Moved the reply state checking inside the RWBackend class to make it
possibly reusable in other parts of MaxScale. Also removed a redundant
function.
This commit is contained in:
Markus Mäkelä
2018-03-29 04:28:43 +03:00
parent 3857912c61
commit 8222bdbc20
3 changed files with 28 additions and 32 deletions

View File

@ -517,20 +517,22 @@ static inline bool have_next_packet(GWBUF* buffer)
* @param buffer Buffer containing the response * @param buffer Buffer containing the response
* *
* @return True if the complete response has been received * @return True if the complete response has been received
*
* TODO: Move this into a separate file
*/ */
bool reply_is_complete(SRWBackend& backend, GWBUF *buffer) bool RWBackend::reply_is_complete(GWBUF *buffer)
{ {
if (backend->get_reply_state() == REPLY_STATE_START && if (get_reply_state() == REPLY_STATE_START &&
(!mxs_mysql_is_result_set(buffer) || GWBUF_IS_COLLECTED_RESULT(buffer))) (!mxs_mysql_is_result_set(buffer) || GWBUF_IS_COLLECTED_RESULT(buffer)))
{ {
if (GWBUF_IS_COLLECTED_RESULT(buffer) || if (GWBUF_IS_COLLECTED_RESULT(buffer) ||
backend->current_command() == MXS_COM_STMT_PREPARE || current_command() == MXS_COM_STMT_PREPARE ||
!mxs_mysql_is_ok_packet(buffer) || !mxs_mysql_is_ok_packet(buffer) ||
!mxs_mysql_more_results_after_ok(buffer)) !mxs_mysql_more_results_after_ok(buffer))
{ {
/** Not a result set, we have the complete response */ /** Not a result set, we have the complete response */
LOG_RS(backend, REPLY_STATE_DONE); LOG_RS(this, REPLY_STATE_DONE);
backend->set_reply_state(REPLY_STATE_DONE); set_reply_state(REPLY_STATE_DONE);
} }
else else
{ {
@ -540,19 +542,19 @@ bool reply_is_complete(SRWBackend& backend, GWBUF *buffer)
if (have_next_packet(buffer)) if (have_next_packet(buffer))
{ {
LOG_RS(backend, REPLY_STATE_RSET_COLDEF); LOG_RS(this, REPLY_STATE_RSET_COLDEF);
backend->set_reply_state(REPLY_STATE_RSET_COLDEF); set_reply_state(REPLY_STATE_RSET_COLDEF);
return reply_is_complete(backend, buffer); return reply_is_complete(buffer);
} }
} }
} }
else else
{ {
bool more = false; bool more = false;
modutil_state state = {backend->is_large_packet()}; modutil_state state = {is_large_packet()};
int n_old_eof = backend->get_reply_state() == REPLY_STATE_RSET_ROWS ? 1 : 0; int n_old_eof = get_reply_state() == REPLY_STATE_RSET_ROWS ? 1 : 0;
int n_eof = modutil_count_signal_packets(buffer, n_old_eof, &more, &state); int n_eof = modutil_count_signal_packets(buffer, n_old_eof, &more, &state);
backend->set_large_packet(state.state); set_large_packet(state.state);
if (n_eof > 2) if (n_eof > 2)
{ {
@ -567,33 +569,33 @@ bool reply_is_complete(SRWBackend& backend, GWBUF *buffer)
if (n_eof == 0) if (n_eof == 0)
{ {
/** Waiting for the EOF packet after the column definitions */ /** Waiting for the EOF packet after the column definitions */
LOG_RS(backend, REPLY_STATE_RSET_COLDEF); LOG_RS(this, REPLY_STATE_RSET_COLDEF);
backend->set_reply_state(REPLY_STATE_RSET_COLDEF); set_reply_state(REPLY_STATE_RSET_COLDEF);
} }
else if (n_eof == 1 && backend->current_command() != MXS_COM_FIELD_LIST) else if (n_eof == 1 && current_command() != MXS_COM_FIELD_LIST)
{ {
/** Waiting for the EOF packet after the rows */ /** Waiting for the EOF packet after the rows */
LOG_RS(backend, REPLY_STATE_RSET_ROWS); LOG_RS(this, REPLY_STATE_RSET_ROWS);
backend->set_reply_state(REPLY_STATE_RSET_ROWS); set_reply_state(REPLY_STATE_RSET_ROWS);
} }
else else
{ {
/** We either have a complete result set or a response to /** We either have a complete result set or a response to
* a COM_FIELD_LIST command */ * a COM_FIELD_LIST command */
ss_dassert(n_eof == 2 || (n_eof == 1 && backend->current_command() == MXS_COM_FIELD_LIST)); ss_dassert(n_eof == 2 || (n_eof == 1 && current_command() == MXS_COM_FIELD_LIST));
LOG_RS(backend, REPLY_STATE_DONE); LOG_RS(this, REPLY_STATE_DONE);
backend->set_reply_state(REPLY_STATE_DONE); set_reply_state(REPLY_STATE_DONE);
if (more) if (more)
{ {
/** The server will send more resultsets */ /** The server will send more resultsets */
LOG_RS(backend, REPLY_STATE_START); LOG_RS(this, REPLY_STATE_START);
backend->set_reply_state(REPLY_STATE_START); set_reply_state(REPLY_STATE_START);
} }
} }
} }
return backend->get_reply_state() == REPLY_STATE_DONE; return get_reply_state() == REPLY_STATE_DONE;
} }
void close_all_connections(SRWBackendList& backends) void close_all_connections(SRWBackendList& backends)
@ -1251,7 +1253,7 @@ static void clientReply(MXS_ROUTER *instance,
session_clear_stmt(backend_dcb->session); session_clear_stmt(backend_dcb->session);
} }
if (reply_is_complete(backend, writebuf)) if (backend->reply_is_complete(writebuf))
{ {
/** Got a complete reply, acknowledge the write and decrement expected response count */ /** Got a complete reply, acknowledge the write and decrement expected response count */
backend->ack_write(); backend->ack_write();

View File

@ -829,14 +829,6 @@ bool handle_master_is_target(RWSplit *inst, RWSplitSession *rses,
return succp; return succp;
} }
static inline bool query_creates_reply(uint8_t cmd)
{
return cmd != MXS_COM_QUIT &&
cmd != MXS_COM_STMT_SEND_LONG_DATA &&
cmd != MXS_COM_STMT_CLOSE &&
cmd != MXS_COM_STMT_FETCH; // Fetch is done mid-result
}
static inline bool is_large_query(GWBUF* buf) static inline bool is_large_query(GWBUF* buf)
{ {
uint32_t buflen = gwbuf_length(buf); uint32_t buflen = gwbuf_length(buf);
@ -938,7 +930,7 @@ bool handle_got_target(RWSplit *inst, RWSplitSession *rses,
} }
if (rses->load_data_state != LOAD_DATA_ACTIVE && if (rses->load_data_state != LOAD_DATA_ACTIVE &&
query_creates_reply(cmd)) mxs_mysql_command_will_respond(cmd))
{ {
response = mxs::Backend::EXPECT_RESPONSE; response = mxs::Backend::EXPECT_RESPONSE;
} }

View File

@ -83,6 +83,8 @@ public:
return m_command; return m_command;
} }
bool reply_is_complete(GWBUF *buffer);
private: private:
reply_state_t m_reply_state; reply_state_t m_reply_state;
BackendHandleMap m_ps_handles; /**< Internal ID to backend PS handle mapping */ BackendHandleMap m_ps_handles; /**< Internal ID to backend PS handle mapping */