Move most readwritesplit functions into classes

Most of the funtionality is now a member function of either the RWSplit or
RWSplitSession class. This removes the need to pass the router and session
parameters to all functions.
This commit is contained in:
Markus Mäkelä
2018-04-03 23:18:57 +03:00
parent ce500d782a
commit 15f15be49d
11 changed files with 360 additions and 470 deletions

View File

@ -32,7 +32,6 @@
#include <maxscale/spinlock.h>
#include <maxscale/mysql_utils.h>
#include "rwsplit_internal.hh"
#include "rwsplitsession.hh"
#include "routeinfo.hh"
@ -50,40 +49,20 @@ using namespace maxscale;
/** Maximum number of slaves */
#define MAX_SLAVE_COUNT "255"
/*
* The functions that implement the router module API
*/
static bool rwsplit_process_router_options(RWSplit *router,
char **options);
static void handle_error_reply_client(MXS_SESSION *ses, RWSplitSession *rses,
DCB *backend_dcb, GWBUF *errmsg);
static bool handle_error_new_connection(RWSplit *inst,
RWSplitSession **rses,
DCB *backend_dcb, GWBUF *errmsg);
static bool route_stored_query(RWSplitSession *rses);
/**
* Internal functions
*/
/*
* @brief Get the maximum replication lag for this router
*
* @param rses Router client session
* @return Replication lag from configuration or very large number
*/
int rses_get_max_replication_lag(RWSplitSession *rses)
int RWSplitSession::get_max_replication_lag()
{
int conf_max_rlag;
CHK_CLIENT_RSES(rses);
/** if there is no configured value, then longest possible int is used */
if (rses->rses_config.max_slave_replication_lag > 0)
if (rses_config.max_slave_replication_lag > 0)
{
conf_max_rlag = rses->rses_config.max_slave_replication_lag;
conf_max_rlag = rses_config.max_slave_replication_lag;
}
else
{
@ -94,25 +73,18 @@ int rses_get_max_replication_lag(RWSplitSession *rses)
}
/**
* @brief Find a back end reference that matches the given DCB
* @brief Find the backend reference that matches the given DCB
*
* Finds out if there is a backend reference pointing at the DCB given as
* parameter.
* @param dcb DCB to match
*
* @param rses router client session
* @param dcb DCB
*
* @return backend reference pointer if succeed or NULL
* @return The correct reference
*/
static inline SRWBackend& get_backend_from_dcb(RWSplitSession *rses, DCB *dcb)
SRWBackend& RWSplitSession::get_backend_from_dcb(DCB *dcb)
{
ss_dassert(dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER);
CHK_DCB(dcb);
CHK_CLIENT_RSES(rses);
for (SRWBackendList::iterator it = rses->backends.begin();
it != rses->backends.end(); it++)
for (auto it = backends.begin(); it != backends.end(); it++)
{
SRWBackend& backend = *it;
@ -277,14 +249,10 @@ static bool handle_max_slaves(Config& config, const char *str)
* @param backend_dcb DCB for the backend server that has failed
* @param errmsg GWBUF containing the error message
*/
static void handle_error_reply_client(MXS_SESSION *ses, RWSplitSession *rses,
DCB *backend_dcb, GWBUF *errmsg)
void RWSplitSession::handle_error_reply_client(DCB *backend_dcb, GWBUF *errmsg)
{
mxs_session_state_t sesstate = ses->state;
DCB *client_dcb = ses->client_dcb;
SRWBackend& backend = get_backend_from_dcb(rses, backend_dcb);
mxs_session_state_t sesstate = client_dcb->session->state;
SRWBackend& backend = get_backend_from_dcb(backend_dcb);
backend->close();
@ -295,24 +263,22 @@ static void handle_error_reply_client(MXS_SESSION *ses, RWSplitSession *rses,
}
}
static bool reroute_stored_statement(RWSplitSession *rses, const SRWBackend& old, GWBUF *stored)
bool RWSplitSession::reroute_stored_statement(const SRWBackend& old, GWBUF *stored)
{
bool success = false;
if (!session_trx_is_active(rses->client_dcb->session))
if (!session_trx_is_active(client_dcb->session))
{
/**
* Only try to retry the read if autocommit is enabled and we are
* outside of a transaction
*/
for (SRWBackendList::iterator it = rses->backends.begin();
it != rses->backends.end(); it++)
for (auto it = backends.begin(); it != backends.end(); it++)
{
SRWBackend& backend = *it;
if (backend->in_use() && backend != old &&
!backend->is_master() &&
backend->is_slave())
!backend->is_master() && backend->is_slave())
{
/** Found a valid candidate; a non-master slave that's in use */
if (backend->write(stored))
@ -320,25 +286,25 @@ static bool reroute_stored_statement(RWSplitSession *rses, const SRWBackend& old
MXS_INFO("Retrying failed read at '%s'.", backend->name());
ss_dassert(backend->get_reply_state() == REPLY_STATE_DONE);
backend->set_reply_state(REPLY_STATE_START);
rses->expected_responses++;
expected_responses++;
success = true;
break;
}
}
}
if (!success && rses->current_master && rses->current_master->in_use())
if (!success && current_master && current_master->in_use())
{
/**
* Either we failed to write to the slave or no valid slave was found.
* Try to retry the read on the master.
*/
if (rses->current_master->write(stored))
if (current_master->write(stored))
{
MXS_INFO("Retrying failed read at '%s'.", rses->current_master->name());
ss_dassert(rses->current_master->get_reply_state() == REPLY_STATE_DONE);
rses->current_master->set_reply_state(REPLY_STATE_START);
rses->expected_responses++;
MXS_INFO("Retrying failed read at '%s'.", current_master->name());
ss_dassert(current_master->get_reply_state() == REPLY_STATE_DONE);
current_master->set_reply_state(REPLY_STATE_START);
expected_responses++;
success = true;
}
}
@ -362,21 +328,17 @@ static bool reroute_stored_statement(RWSplitSession *rses, const SRWBackend& old
* @return true if there are enough backend connections to continue, false if
* not
*/
static bool handle_error_new_connection(RWSplit *inst,
RWSplitSession **rses,
DCB *backend_dcb, GWBUF *errmsg)
bool RWSplitSession::handle_error_new_connection(DCB *backend_dcb, GWBUF *errmsg)
{
RWSplitSession *myrses = *rses;
SRWBackend& backend = get_backend_from_dcb(myrses, backend_dcb);
SRWBackend& backend = get_backend_from_dcb(backend_dcb);
MXS_SESSION* ses = backend_dcb->session;
bool route_stored = false;
CHK_SESSION(ses);
if (backend->is_waiting_result())
{
ss_dassert(myrses->expected_responses > 0);
myrses->expected_responses--;
ss_dassert(expected_responses > 0);
expected_responses--;
/**
* A query was sent through the backend and it is waiting for a reply.
@ -387,7 +349,7 @@ static bool handle_error_new_connection(RWSplit *inst,
const SERVER *target = NULL;
if (!session_take_stmt(backend_dcb->session, &stored, &target) ||
target != backend->backend()->server ||
!reroute_stored_statement(*rses, backend, stored))
!reroute_stored_statement(backend, stored))
{
/**
* We failed to route the stored statement or no statement was
@ -407,7 +369,7 @@ static bool handle_error_new_connection(RWSplit *inst,
client_dcb->func.write(client_dcb, gwbuf_clone(errmsg));
}
if (myrses->expected_responses == 0)
if (expected_responses == 0)
{
/** The response from this server was the last one, try to
* route all queued queries */
@ -424,25 +386,26 @@ static bool handle_error_new_connection(RWSplit *inst,
if (route_stored)
{
route_stored_query(myrses);
route_stored_query();
}
int max_nslaves = inst->max_slave_count();
int max_nslaves = router->max_slave_count();
bool succp;
/**
* Try to get replacement slave or at least the minimum
* number of slave connections for router session.
*/
if (myrses->recv_sescmd > 0 && inst->config().disable_sescmd_history)
if (recv_sescmd > 0 && rses_config.disable_sescmd_history)
{
succp = inst->have_enough_servers();
succp = router->have_enough_servers();
}
else
{
succp = select_connect_backend_servers(inst, ses, myrses->backends,
myrses->current_master, &myrses->sescmd_list,
&myrses->expected_responses,
connection_type::SLAVE);
succp = router->select_connect_backend_servers(ses, backends,
current_master,
&sescmd_list,
&expected_responses,
connection_type::SLAVE);
}
return succp;
@ -458,43 +421,43 @@ static bool handle_error_new_connection(RWSplit *inst,
* @param rses Router client session
* @return True if a stored query was routed successfully
*/
static bool route_stored_query(RWSplitSession *rses)
bool RWSplitSession::route_stored_query()
{
bool rval = true;
/** Loop over the stored statements as long as the routeQuery call doesn't
* append more data to the queue. If it appends data to the queue, we need
* to wait for a response before attempting another reroute */
while (rses->query_queue)
while (query_queue)
{
GWBUF* query_queue = modutil_get_next_MySQL_packet(&rses->query_queue);
GWBUF* query_queue = modutil_get_next_MySQL_packet(&query_queue);
query_queue = gwbuf_make_contiguous(query_queue);
/** Store the query queue locally for the duration of the routeQuery call.
* This prevents recursive calls into this function. */
GWBUF *temp_storage = rses->query_queue;
rses->query_queue = NULL;
GWBUF *temp_storage = query_queue;
query_queue = NULL;
// TODO: Move the handling of queued queries to the client protocol
// TODO: module where the command tracking is done automatically.
uint8_t cmd = mxs_mysql_get_command(query_queue);
mysql_protocol_set_current_command(rses->client_dcb, (mxs_mysql_cmd_t)cmd);
mysql_protocol_set_current_command(client_dcb, (mxs_mysql_cmd_t)cmd);
if (!rses->routeQuery(query_queue))
if (!routeQuery(query_queue))
{
rval = false;
MXS_ERROR("Failed to route queued query.");
}
if (rses->query_queue == NULL)
if (query_queue == NULL)
{
/** Query successfully routed and no responses are expected */
rses->query_queue = temp_storage;
query_queue = temp_storage;
}
else
{
/** Routing was stopped, we need to wait for a response before retrying */
rses->query_queue = gwbuf_append(temp_storage, rses->query_queue);
query_queue = gwbuf_append(temp_storage, query_queue);
break;
}
}
@ -669,21 +632,18 @@ RWSplitSession* RWSplit::newSession(MXS_SESSION *session)
*/
void RWSplitSession::close()
{
RWSplitSession *router_cli_ses = this;
CHK_CLIENT_RSES(router_cli_ses);
if (!router_cli_ses->rses_closed)
if (!rses_closed)
{
router_cli_ses->rses_closed = true;
close_all_connections(router_cli_ses->backends);
rses_closed = true;
close_all_connections(backends);
if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO) &&
router_cli_ses->sescmd_list.size())
sescmd_list.size())
{
std::string sescmdstr;
for (mxs::SessionCommandList::iterator it = router_cli_ses->sescmd_list.begin();
it != router_cli_ses->sescmd_list.end(); it++)
for (mxs::SessionCommandList::iterator it = sescmd_list.begin();
it != sescmd_list.end(); it++)
{
mxs::SSessionCommand& scmd = *it;
sescmdstr += scmd->to_string();
@ -697,29 +657,25 @@ void RWSplitSession::close()
int32_t RWSplitSession::routeQuery(GWBUF* querybuf)
{
RWSplit *inst = router;
RWSplitSession *rses = this;
int rval = 0;
CHK_CLIENT_RSES(rses);
if (rses->rses_closed)
if (rses_closed)
{
closed_session_reply(querybuf);
}
else
{
if (rses->query_queue == NULL &&
(rses->expected_responses == 0 ||
if (query_queue == NULL &&
(expected_responses == 0 ||
mxs_mysql_get_command(querybuf) == MXS_COM_STMT_FETCH ||
rses->load_data_state == LOAD_DATA_ACTIVE ||
rses->large_query))
load_data_state == LOAD_DATA_ACTIVE ||
large_query))
{
/** Gather the information required to make routing decisions */
RouteInfo info(rses, querybuf);
RouteInfo info(this, querybuf);
/** No active or pending queries */
if (route_single_stmt(inst, rses, querybuf, info))
if (route_single_stmt(querybuf, info))
{
rval = 1;
}
@ -730,16 +686,15 @@ int32_t RWSplitSession::routeQuery(GWBUF* querybuf)
* We are already processing a request from the client. Store the
* new query and wait for the previous one to complete.
*/
ss_dassert(rses->expected_responses || rses->query_queue);
ss_dassert(expected_responses || query_queue);
MXS_INFO("Storing query (len: %d cmd: %0x), expecting %d replies to current command",
gwbuf_length(querybuf), GWBUF_DATA(querybuf)[4],
rses->expected_responses);
rses->query_queue = gwbuf_append(rses->query_queue, querybuf);
gwbuf_length(querybuf), GWBUF_DATA(querybuf)[4], expected_responses);
query_queue = gwbuf_append(query_queue, querybuf);
querybuf = NULL;
rval = 1;
ss_dassert(rses->expected_responses > 0);
ss_dassert(expected_responses > 0);
if (rses->expected_responses == 0 && !route_stored_query(rses))
if (expected_responses == 0 && !route_stored_query())
{
rval = 0;
}
@ -906,7 +861,7 @@ static void log_unexpected_response(DCB* dcb, GWBUF* buffer)
* @param proto MySQLProtocol
* @return reset buffer
*/
GWBUF *discard_master_wait_gtid_result(GWBUF *buffer, RWSplitSession *rses)
GWBUF* RWSplitSession::discard_master_wait_gtid_result(GWBUF *buffer)
{
uint8_t header_and_command[MYSQL_HEADER_LEN + 1];
uint8_t packet_len = 0;
@ -917,15 +872,15 @@ GWBUF *discard_master_wait_gtid_result(GWBUF *buffer, RWSplitSession *rses)
/* ignore error packet */
if (MYSQL_GET_COMMAND(header_and_command) == MYSQL_REPLY_ERR)
{
rses->wait_gtid_state = EXPECTING_NOTHING;
wait_gtid_state = EXPECTING_NOTHING;
return buffer;
}
/* this packet must be an ok packet now */
ss_dassert(MYSQL_GET_COMMAND(header_and_command) == MYSQL_REPLY_OK);
packet_len = MYSQL_GET_PAYLOAD_LEN(header_and_command) + MYSQL_HEADER_LEN;
rses->wait_gtid_state = EXPECTING_REAL_RESULT;
rses->next_seq = 1;
wait_gtid_state = EXPECTING_REAL_RESULT;
next_seq = 1;
return gwbuf_consume(buffer, packet_len);
}
@ -937,19 +892,19 @@ GWBUF *discard_master_wait_gtid_result(GWBUF *buffer, RWSplitSession *rses)
* @param proto MySQLProtocol
*
*/
void correct_packet_sequence(GWBUF *buffer, RWSplitSession *rses)
void RWSplitSession::correct_packet_sequence(GWBUF *buffer)
{
uint8_t header[3];
uint32_t offset = 0;
uint32_t packet_len = 0;
if (rses->wait_gtid_state == EXPECTING_REAL_RESULT)
if (wait_gtid_state == EXPECTING_REAL_RESULT)
{
while (gwbuf_copy_data(buffer, offset, 3, header) == 3)
{
packet_len = MYSQL_GET_PAYLOAD_LEN(header) + MYSQL_HEADER_LEN;
uint8_t *seq = gwbuf_byte_pointer(buffer, offset + MYSQL_SEQ_OFFSET);
*seq = rses->next_seq;
rses->next_seq++;
*seq = next_seq;
next_seq++;
offset += packet_len;
}
}
@ -957,38 +912,35 @@ void correct_packet_sequence(GWBUF *buffer, RWSplitSession *rses)
void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb)
{
RWSplitSession *rses = this;
RWSplit *inst = router;
DCB *client_dcb = backend_dcb->session->client_dcb;
CHK_CLIENT_RSES(rses);
ss_dassert(!rses->rses_closed);
ss_dassert(!rses_closed);
SRWBackend& backend = get_backend_from_dcb(rses, backend_dcb);
SRWBackend& backend = get_backend_from_dcb(backend_dcb);
if (inst->config().enable_causal_read &&
if (rses_config.enable_causal_read &&
GWBUF_IS_REPLY_OK(writebuf) &&
backend == rses->current_master)
backend == current_master)
{
/** Save gtid position */
char *tmp = gwbuf_get_property(writebuf, (char *)"gtid");
if (tmp)
{
rses->gtid_pos = std::string(tmp);
gtid_pos = std::string(tmp);
}
}
if (rses->wait_gtid_state == EXPECTING_WAIT_GTID_RESULT)
if (wait_gtid_state == EXPECTING_WAIT_GTID_RESULT)
{
ss_dassert(rses->rses_config.enable_causal_read);
if ((writebuf = discard_master_wait_gtid_result(writebuf, rses)) == NULL)
ss_dassert(rses_config.enable_causal_read);
if ((writebuf = discard_master_wait_gtid_result(writebuf)) == NULL)
{
// Nothing to route, return
return;
}
}
if (rses->wait_gtid_state == EXPECTING_REAL_RESULT)
if (wait_gtid_state == EXPECTING_REAL_RESULT)
{
correct_packet_sequence(writebuf, rses);
correct_packet_sequence(writebuf);
}
if (backend->get_reply_state() == REPLY_STATE_DONE)
@ -1011,33 +963,33 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb)
{
/** Got a complete reply, acknowledge the write and decrement expected response count */
backend->ack_write();
rses->expected_responses--;
ss_dassert(rses->expected_responses >= 0);
expected_responses--;
ss_dassert(expected_responses >= 0);
ss_dassert(backend->get_reply_state() == REPLY_STATE_DONE);
MXS_INFO("Reply complete, last reply from %s", backend->name());
}
else
{
MXS_INFO("Reply not yet complete. Waiting for %d replies, got one from %s",
rses->expected_responses, backend->name());
expected_responses, backend->name());
}
if (backend->have_session_commands())
{
/** Reply to an executed session command */
process_sescmd_response(rses, backend, &writebuf);
process_sescmd_response(backend, &writebuf);
}
if (backend->have_session_commands())
{
if (backend->execute_session_command())
{
rses->expected_responses++;
expected_responses++;
}
}
else if (rses->expected_responses == 0 && rses->query_queue)
else if (expected_responses == 0 && query_queue)
{
route_stored_query(rses);
route_stored_query();
}
if (writebuf)
@ -1073,12 +1025,9 @@ void RWSplitSession::handleError(GWBUF *errmsgbuf, DCB *problem_dcb,
mxs_error_action_t action, bool *succp)
{
ss_dassert(problem_dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER);
RWSplit *inst = router;
RWSplitSession *rses = this;
CHK_CLIENT_RSES(rses);
CHK_DCB(problem_dcb);
if (rses->rses_closed)
if (rses_closed)
{
*succp = false;
return;
@ -1087,18 +1036,17 @@ void RWSplitSession::handleError(GWBUF *errmsgbuf, DCB *problem_dcb,
MXS_SESSION *session = problem_dcb->session;
ss_dassert(session);
SRWBackend& backend = get_backend_from_dcb(rses, problem_dcb);
SRWBackend& backend = get_backend_from_dcb(problem_dcb);
ss_dassert(backend->in_use());
switch (action)
{
case ERRACT_NEW_CONNECTION:
{
if (rses->current_master && rses->current_master->in_use() &&
rses->current_master == backend)
if (current_master && current_master->in_use() && current_master == backend)
{
/** The connection to the master has failed */
SERVER *srv = rses->current_master->server();
SERVER *srv = current_master->server();
bool can_continue = false;
if (!backend->is_waiting_result())
@ -1113,7 +1061,7 @@ void RWSplitSession::handleError(GWBUF *errmsgbuf, DCB *problem_dcb,
* can't be sure whether it was executed or not. In this
* case the safest thing to do is to close the client
* connection. */
if (rses->rses_config.master_failure_mode != RW_FAIL_INSTANTLY)
if (rses_config.master_failure_mode != RW_FAIL_INSTANTLY)
{
can_continue = true;
}
@ -1121,15 +1069,15 @@ void RWSplitSession::handleError(GWBUF *errmsgbuf, DCB *problem_dcb,
else
{
// We were expecting a response but we aren't going to get one
rses->expected_responses--;
expected_responses--;
if (rses->rses_config.master_failure_mode == RW_ERROR_ON_WRITE)
if (rses_config.master_failure_mode == RW_ERROR_ON_WRITE)
{
/** In error_on_write mode, the session can continue even
* if the master is lost. Send a read-only error to
* the client to let it know that the query failed. */
can_continue = true;
send_readonly_error(rses->client_dcb);
send_readonly_error(client_dcb);
}
if (!SERVER_IS_MASTER(srv) && !srv->master_err_is_logged)
@ -1153,7 +1101,7 @@ void RWSplitSession::handleError(GWBUF *errmsgbuf, DCB *problem_dcb,
}
else
{
if (rses->target_node && rses->target_node == backend &&
if (target_node && target_node == backend &&
session_trx_is_read_only(problem_dcb->session))
{
/**
@ -1167,7 +1115,7 @@ void RWSplitSession::handleError(GWBUF *errmsgbuf, DCB *problem_dcb,
else
{
/** Try to replace the failed connection with a new one */
*succp = handle_error_new_connection(inst, &rses, problem_dcb, errmsgbuf);
*succp = handle_error_new_connection(problem_dcb, errmsgbuf);
}
}
@ -1177,7 +1125,7 @@ void RWSplitSession::handleError(GWBUF *errmsgbuf, DCB *problem_dcb,
case ERRACT_REPLY_CLIENT:
{
handle_error_reply_client(session, rses, problem_dcb, errmsgbuf);
handle_error_reply_client(problem_dcb, errmsgbuf);
*succp = false; /*< no new backend servers were made available */
break;
}