MXS-1506: Store queries inside RWSplitSession
As the readwritesplit is the only thing that uses the statement storage, it can be integrated into RWSplitSession. This makes the code a lot simpler.
This commit is contained in:
@ -203,11 +203,6 @@ typedef struct session
|
|||||||
mxs_session_trx_state_t trx_state; /*< The current transaction state. */
|
mxs_session_trx_state_t trx_state; /*< The current transaction state. */
|
||||||
bool autocommit; /*< Whether autocommit is on. */
|
bool autocommit; /*< Whether autocommit is on. */
|
||||||
intptr_t client_protocol_data; /*< Owned and managed by the client protocol. */
|
intptr_t client_protocol_data; /*< Owned and managed by the client protocol. */
|
||||||
struct
|
|
||||||
{
|
|
||||||
GWBUF *buffer; /**< Buffer containing the statement */
|
|
||||||
const struct server *target; /**< Where the statement was sent */
|
|
||||||
} stmt; /**< Current statement being executed */
|
|
||||||
bool qualifies_for_pooling; /**< Whether this session qualifies for the connection pool */
|
bool qualifies_for_pooling; /**< Whether this session qualifies for the connection pool */
|
||||||
SessionVarsByName* variables; /*< @maxscale variables associated with this session */
|
SessionVarsByName* variables; /*< @maxscale variables associated with this session */
|
||||||
struct
|
struct
|
||||||
@ -481,50 +476,6 @@ void session_close(MXS_SESSION *session);
|
|||||||
*/
|
*/
|
||||||
void session_put_ref(MXS_SESSION *session);
|
void session_put_ref(MXS_SESSION *session);
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Store the current statement into session
|
|
||||||
*
|
|
||||||
* This creates an additional reference to the buffer. If an old statement is stored,
|
|
||||||
* it will be replaced with a clone of @c buf.
|
|
||||||
*
|
|
||||||
* @param session Session where statement is stored
|
|
||||||
* @param buf Buffer containing the current statement
|
|
||||||
* @param server Server where the statement is being executed
|
|
||||||
* @return True if statement was successfully stored, false if the cloning of @c buf failed.
|
|
||||||
*/
|
|
||||||
bool session_store_stmt(MXS_SESSION *session, GWBUF *buf, const struct server *server);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Fetch stored statement
|
|
||||||
*
|
|
||||||
* The value returned by this call must be freed by the caller with gwbuf_free().
|
|
||||||
*
|
|
||||||
* @param session Session with a stored statement
|
|
||||||
* @param buffer Pointer where the buffer is stored
|
|
||||||
* @param target Pointer where target server is stored
|
|
||||||
* @return True if a statement was stored
|
|
||||||
*/
|
|
||||||
bool session_take_stmt(MXS_SESSION *session, GWBUF **buffer, const struct server **target);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Check if the session has a stored statement
|
|
||||||
*
|
|
||||||
* @param session Session to check
|
|
||||||
*
|
|
||||||
* @return True if the session has a stored statement
|
|
||||||
*/
|
|
||||||
static inline bool session_have_stmt(MXS_SESSION *session)
|
|
||||||
{
|
|
||||||
return session->stmt.buffer;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Clear the stored statement
|
|
||||||
*
|
|
||||||
* @param session Session to clear
|
|
||||||
*/
|
|
||||||
void session_clear_stmt(MXS_SESSION *session);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Convert a session to JSON
|
* @brief Convert a session to JSON
|
||||||
*
|
*
|
||||||
|
@ -128,8 +128,6 @@ static MXS_SESSION* session_alloc_body(SERVICE* service, DCB* client_dcb,
|
|||||||
session->service = service;
|
session->service = service;
|
||||||
session->client_dcb = client_dcb;
|
session->client_dcb = client_dcb;
|
||||||
session->stats.connect = time(0);
|
session->stats.connect = time(0);
|
||||||
session->stmt.buffer = NULL;
|
|
||||||
session->stmt.target = NULL;
|
|
||||||
session->qualifies_for_pooling = false;
|
session->qualifies_for_pooling = false;
|
||||||
|
|
||||||
MXS_CONFIG *config = config_get_global_options();
|
MXS_CONFIG *config = config_get_global_options();
|
||||||
@ -397,7 +395,6 @@ session_final_free(MXS_SESSION *session)
|
|||||||
session_dump_statements(session);
|
session_dump_statements(session);
|
||||||
}
|
}
|
||||||
|
|
||||||
gwbuf_free(session->stmt.buffer);
|
|
||||||
delete session->variables;
|
delete session->variables;
|
||||||
delete session->last_statements;
|
delete session->last_statements;
|
||||||
MXS_FREE(session);
|
MXS_FREE(session);
|
||||||
@ -964,50 +961,6 @@ void session_put_ref(MXS_SESSION *session)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool session_store_stmt(MXS_SESSION *session, GWBUF *buf, const SERVER *server)
|
|
||||||
{
|
|
||||||
bool rval = false;
|
|
||||||
|
|
||||||
if (session->stmt.buffer)
|
|
||||||
{
|
|
||||||
/** This should not happen with proper use */
|
|
||||||
ss_dassert(false);
|
|
||||||
gwbuf_free(session->stmt.buffer);
|
|
||||||
}
|
|
||||||
|
|
||||||
if ((session->stmt.buffer = gwbuf_clone(buf)))
|
|
||||||
{
|
|
||||||
session->stmt.target = server;
|
|
||||||
/** No old statements were stored and we successfully cloned the buffer */
|
|
||||||
rval = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
return rval;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool session_take_stmt(MXS_SESSION *session, GWBUF **buffer, const SERVER **target)
|
|
||||||
{
|
|
||||||
bool rval = false;
|
|
||||||
|
|
||||||
if (session->stmt.buffer && session->stmt.target)
|
|
||||||
{
|
|
||||||
*buffer = session->stmt.buffer;
|
|
||||||
*target = session->stmt.target;
|
|
||||||
session->stmt.buffer = NULL;
|
|
||||||
session->stmt.target = NULL;
|
|
||||||
rval = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
return rval;
|
|
||||||
}
|
|
||||||
|
|
||||||
void session_clear_stmt(MXS_SESSION *session)
|
|
||||||
{
|
|
||||||
gwbuf_free(session->stmt.buffer);
|
|
||||||
session->stmt.buffer = NULL;
|
|
||||||
session->stmt.target = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
uint64_t session_get_next_id()
|
uint64_t session_get_next_id()
|
||||||
{
|
{
|
||||||
return atomic_add_uint64(&next_session_id, 1);
|
return atomic_add_uint64(&next_session_id, 1);
|
||||||
|
@ -959,9 +959,9 @@ bool RWSplitSession::handle_got_target(GWBUF* querybuf, SRWBackend& target, bool
|
|||||||
|
|
||||||
if (target->write(send_buf, response))
|
if (target->write(send_buf, response))
|
||||||
{
|
{
|
||||||
if (store && !session_store_stmt(m_client->session, querybuf, target->server()))
|
if (store)
|
||||||
{
|
{
|
||||||
MXS_ERROR("Failed to store current statement, it won't be retried if it fails.");
|
set_query(send_buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic_add_uint64(&m_router->stats().n_queries, 1);
|
atomic_add_uint64(&m_router->stats().n_queries, 1);
|
||||||
|
@ -37,7 +37,8 @@ RWSplitSession::RWSplitSession(RWSplit* instance, MXS_SESSION* session,
|
|||||||
m_wait_gtid_state(EXPECTING_NOTHING),
|
m_wait_gtid_state(EXPECTING_NOTHING),
|
||||||
m_next_seq(0),
|
m_next_seq(0),
|
||||||
m_qc(this, session, instance->config().use_sql_variables_in),
|
m_qc(this, session, instance->config().use_sql_variables_in),
|
||||||
m_retry_duration(0)
|
m_retry_duration(0),
|
||||||
|
m_current_query(NULL)
|
||||||
{
|
{
|
||||||
if (m_config.rw_max_slave_conn_percent)
|
if (m_config.rw_max_slave_conn_percent)
|
||||||
{
|
{
|
||||||
@ -386,11 +387,7 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb)
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (session_have_stmt(backend_dcb->session))
|
reset_query();
|
||||||
{
|
|
||||||
/** Statement was successfully executed, free the stored statement */
|
|
||||||
session_clear_stmt(backend_dcb->session);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (backend->reply_is_complete(writebuf))
|
if (backend->reply_is_complete(writebuf))
|
||||||
{
|
{
|
||||||
@ -612,17 +609,12 @@ bool RWSplitSession::handle_error_new_connection(DCB *backend_dcb, GWBUF *errmsg
|
|||||||
* Try to reroute the statement to a working server or send an error
|
* Try to reroute the statement to a working server or send an error
|
||||||
* to the client.
|
* to the client.
|
||||||
*/
|
*/
|
||||||
GWBUF *stored = NULL;
|
GWBUF *stored = release_query();
|
||||||
const SERVER *target = NULL;
|
|
||||||
|
|
||||||
if (session_take_stmt(backend_dcb->session, &stored, &target) &&
|
if (stored && m_config.retry_failed_reads)
|
||||||
m_config.retry_failed_reads)
|
|
||||||
{
|
{
|
||||||
ss_dassert(target == backend->server());
|
|
||||||
MXS_INFO("Re-routing failed read after server '%s' failed", backend->name());
|
MXS_INFO("Re-routing failed read after server '%s' failed", backend->name());
|
||||||
MXS_SESSION* session = m_client->session;
|
MXS_SESSION* session = m_client->session;
|
||||||
|
|
||||||
// Try to route the failed read as often as possible
|
|
||||||
session_delay_routing(session, router_as_downstream(session), stored, 1);
|
session_delay_routing(session, router_as_downstream(session), stored, 1);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -127,6 +127,7 @@ public:
|
|||||||
uint32_t m_next_seq; /**< Next packet's sequence number */
|
uint32_t m_next_seq; /**< Next packet's sequence number */
|
||||||
mxs::QueryClassifier m_qc; /**< The query classifier. */
|
mxs::QueryClassifier m_qc; /**< The query classifier. */
|
||||||
uint64_t m_retry_duration; /**< Total time spent retrying queries */
|
uint64_t m_retry_duration; /**< Total time spent retrying queries */
|
||||||
|
GWBUF* m_current_query; /**< Current query being executed, NULL for no query */
|
||||||
|
|
||||||
private:
|
private:
|
||||||
RWSplitSession(RWSplit* instance, MXS_SESSION* session,
|
RWSplitSession(RWSplit* instance, MXS_SESSION* session,
|
||||||
@ -191,6 +192,37 @@ private:
|
|||||||
m_retry_duration < m_config.query_retry_timeout &&
|
m_retry_duration < m_config.query_retry_timeout &&
|
||||||
!session_trx_is_active(m_client->session);
|
!session_trx_is_active(m_client->session);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the current query
|
||||||
|
*
|
||||||
|
* @param query The current query
|
||||||
|
*/
|
||||||
|
inline void set_query(GWBUF* query)
|
||||||
|
{
|
||||||
|
ss_dassert(!m_current_query);
|
||||||
|
m_current_query = gwbuf_clone(query);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Release current query
|
||||||
|
*
|
||||||
|
* @return The current query
|
||||||
|
*/
|
||||||
|
inline GWBUF* release_query()
|
||||||
|
{
|
||||||
|
GWBUF* rval = m_current_query;
|
||||||
|
m_current_query = NULL;
|
||||||
|
return rval;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reset current query
|
||||||
|
*/
|
||||||
|
inline void reset_query()
|
||||||
|
{
|
||||||
|
gwbuf_free(release_query());
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
Reference in New Issue
Block a user