Use Backend in readwritesplit.cc

Removed old router property code as it is no longer needed when
SessionCommand class used by the Backend class is taken into use.

Removed unnecessary code that is implemented as a part of the Backend
class.

Changed functions to return references to Backends instead of handling raw
DCBs. This introduces a few cases where the code returns a reference when
no reference is actually available. These cases are solved by having an
empty static shared_ptr that is returned in these cases. This is done to
silence any compiler warnings that returning references to local variables
would bring as these should never happen if the code is functioning
properly.
This commit is contained in:
Markus Mäkelä
2017-06-15 16:32:11 +03:00
parent c3c905f745
commit ab56cd0074
3 changed files with 125 additions and 609 deletions

View File

@ -49,19 +49,7 @@
* The functions that implement the router module API
*/
static MXS_ROUTER *createInstance(SERVICE *service, char **options);
static MXS_ROUTER_SESSION *newSession(MXS_ROUTER *instance, MXS_SESSION *session);
static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *session);
static void freeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *session);
static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *session, GWBUF *queue);
static void diagnostics(MXS_ROUTER *instance, DCB *dcb);
static json_t* diagnostics_json(const MXS_ROUTER *instance);
static void clientReply(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queue,
DCB *backend_dcb);
static void handleError(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session,
GWBUF *errmsgbuf, DCB *backend_dcb,
mxs_error_action_t action, bool *succp);
static uint64_t getCapabilities(MXS_ROUTER* instance);
static bool rwsplit_process_router_options(ROUTER_INSTANCE *router,
char **options);
@ -72,7 +60,6 @@ static bool handle_error_new_connection(ROUTER_INSTANCE *inst,
DCB *backend_dcb, GWBUF *errmsg);
static bool have_enough_servers(ROUTER_CLIENT_SES *rses, const int min_nsrv,
int router_nsrv, ROUTER_INSTANCE *router);
static bool create_backends(ROUTER_CLIENT_SES *rses, backend_ref_t** dest, int* n_backend);
/**
* Enum values for router parameters
@ -105,149 +92,6 @@ static const MXS_ENUM_VALUE master_failure_mode_values[] =
* Internal functions
*/
/*
* @brief Clear one or more bits in the backend reference state
*
* The router session holds details of the backend servers that are
* involved in the routing for this particular service. Each backend
* server has a state bit string, and this function (along with
* bref_set_state) is used to manage the state.
*
* @param bref The backend reference to be modified
* @param state A bit string where the 1 bits indicate bits that should
* be turned off in the bref state.
*/
void bref_clear_state(backend_ref_t *bref, bref_state_t state)
{
ss_dassert(bref);
if ((state & BREF_WAITING_RESULT) && (bref->bref_state & BREF_WAITING_RESULT))
{
int prev1;
int prev2;
/** Decrease waiter count */
prev1 = atomic_add(&bref->bref_num_result_wait, -1);
if (prev1 <= 0)
{
atomic_add(&bref->bref_num_result_wait, 1);
}
else
{
/** Decrease global operation count */
prev2 = atomic_add(&bref->ref->server->stats.n_current_ops, -1);
ss_dassert(prev2 > 0);
if (prev2 <= 0)
{
MXS_ERROR("[%s] Error: negative current operation count in backend %s:%u",
__FUNCTION__, bref->ref->server->name,
bref->ref->server->port);
}
}
}
bref->bref_state &= ~state;
}
/*
* @brief Set one or more bits in the backend reference state
*
* The router session holds details of the backend servers that are
* involved in the routing for this particular service. Each backend
* server has a state bit string, and this function (along with
* bref_clear_state) is used to manage the state.
*
* @param bref The backend reference to be modified
* @param state A bit string where the 1 bits indicate bits that should
* be turned on in the bref state.
*/
void bref_set_state(backend_ref_t *bref, bref_state_t state)
{
ss_dassert(bref);
if ((state & BREF_WAITING_RESULT) && (bref->bref_state & BREF_WAITING_RESULT) == 0)
{
int prev1;
int prev2;
/** Increase waiter count */
prev1 = atomic_add(&bref->bref_num_result_wait, 1);
ss_dassert(prev1 >= 0);
if (prev1 < 0)
{
MXS_ERROR("[%s] Error: negative number of connections waiting for "
"results in backend %s:%u", __FUNCTION__,
bref->ref->server->name, bref->ref->server->port);
}
/** Increase global operation count */
prev2 = atomic_add(&bref->ref->server->stats.n_current_ops, 1);
ss_dassert(prev2 >= 0);
if (prev2 < 0)
{
MXS_ERROR("[%s] Error: negative current operation count in backend %s:%u",
__FUNCTION__, bref->ref->server->name, bref->ref->server->port);
}
}
bref->bref_state |= state;
}
/**
* @brief Create a generic router session property structure.
*
* @param prop_type Property type
*
* @return property structure of requested type, or NULL if failed
*/
rses_property_t *rses_property_init(rses_property_type_t prop_type)
{
rses_property_t* prop = new (std::nothrow) rses_property_t;
if (prop == NULL)
{
MXS_OOM();
return NULL;
}
prop->rses_prop_type = prop_type;
prop->rses_prop_next = NULL;
prop->rses_prop_refcount = 1;
prop->rses_prop_chk_top = CHK_NUM_ROUTER_PROPERTY;
prop->rses_prop_chk_tail = CHK_NUM_ROUTER_PROPERTY;
return prop;
}
/**
* @brief Free resources belonging to a property
*
* Property is freed at the end of router client session.
*
* @param prop The property whose resources are to be released
*/
void rses_property_done(rses_property_t *prop)
{
ss_dassert(prop);
CHK_RSES_PROP(prop);
switch (prop->rses_prop_type)
{
case RSES_PROP_TYPE_SESCMD:
mysql_sescmd_done(&prop->rses_prop_data.sescmd);
break;
default:
MXS_DEBUG("%lu [rses_property_done] Unknown property type %d "
"in property %p", pthread_self(), prop->rses_prop_type, prop);
ss_dassert(false);
break;
}
delete prop;
}
/**
* @brief Get count of backend servers that are slaves.
*
@ -304,6 +148,14 @@ int rses_get_max_replication_lag(ROUTER_CLIENT_SES *rses)
return conf_max_rlag;
}
namespace
{
/** This will never get used but it should catch faults in the code */
static SRWBackend no_backend;
}
/**
* @brief Find a back end reference that matches the given DCB
*
@ -315,23 +167,26 @@ int rses_get_max_replication_lag(ROUTER_CLIENT_SES *rses)
*
* @return backend reference pointer if succeed or NULL
*/
backend_ref_t *get_bref_from_dcb(ROUTER_CLIENT_SES *rses, DCB *dcb)
SRWBackend& get_bref_from_dcb(ROUTER_CLIENT_SES *rses, DCB *dcb)
{
ss_dassert(dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER);
CHK_DCB(dcb);
CHK_CLIENT_RSES(rses);
for (int i = 0; i < rses->rses_nbackends; i++)
for (SRWBackendList::iterator it = rses->backends.begin();
it != rses->backends.end(); it++)
{
if (rses->rses_backend_ref[i].bref_dcb == dcb)
SRWBackend& backend = *it;
if (backend->dcb() == dcb)
{
return &rses->rses_backend_ref[i];
return backend;
}
}
/** We should always have a valid backend reference */
ss_dassert(false);
return NULL;
return no_backend;
}
/**
@ -478,30 +333,13 @@ static bool handle_max_slaves(ROUTER_INSTANCE *router, const char *str)
static void handle_error_reply_client(MXS_SESSION *ses, ROUTER_CLIENT_SES *rses,
DCB *backend_dcb, GWBUF *errmsg)
{
mxs_session_state_t sesstate;
DCB *client_dcb;
backend_ref_t *bref;
sesstate = ses->state;
client_dcb = ses->client_dcb;
mxs_session_state_t sesstate = ses->state;
DCB *client_dcb = ses->client_dcb;
if ((bref = get_bref_from_dcb(rses, backend_dcb)) != NULL)
{
CHK_BACKEND_REF(bref);
SRWBackend& bref = get_bref_from_dcb(rses, backend_dcb);
if (BREF_IS_IN_USE(bref))
{
close_failed_bref(bref, false);
RW_CHK_DCB(bref, backend_dcb);
dcb_close(backend_dcb);
RW_CLOSE_BREF(bref);
}
}
else
{
// All dcbs should be associated with a backend reference.
ss_dassert(!true);
}
bref->close();
if (sesstate == SESSION_STATE_ROUTER_READY)
{
@ -510,7 +348,7 @@ static void handle_error_reply_client(MXS_SESSION *ses, ROUTER_CLIENT_SES *rses,
}
}
static bool reroute_stored_statement(ROUTER_CLIENT_SES *rses, backend_ref_t *old, GWBUF *stored)
static bool reroute_stored_statement(ROUTER_CLIENT_SES *rses, SRWBackend& old, GWBUF *stored)
{
bool success = false;
@ -520,21 +358,22 @@ static bool reroute_stored_statement(ROUTER_CLIENT_SES *rses, backend_ref_t *old
* Only try to retry the read if autocommit is enabled and we are
* outside of a transaction
*/
for (int i = 0; i < rses->rses_nbackends; i++)
for (SRWBackendList::iterator it = rses->backends.begin();
it != rses->backends.end(); it++)
{
backend_ref_t *bref = &rses->rses_backend_ref[i];
SRWBackend& bref = *it;
if (BREF_IS_IN_USE(bref) && bref != old &&
!SERVER_IS_MASTER(bref->ref->server) &&
SERVER_IS_SLAVE(bref->ref->server))
if (bref->in_use() && bref != old &&
!SERVER_IS_MASTER(bref->server()) &&
SERVER_IS_SLAVE(bref->server()))
{
/** Found a valid candidate; a non-master slave that's in use */
if (bref->bref_dcb->func.write(bref->bref_dcb, stored))
if (bref->write(stored))
{
MXS_INFO("Retrying failed read at '%s'.", bref->ref->server->unique_name);
ss_dassert(bref->reply_state == REPLY_STATE_DONE);
MXS_INFO("Retrying failed read at '%s'.", bref->server()->unique_name);
ss_dassert(bref->get_reply_state() == REPLY_STATE_DONE);
LOG_RS(bref, REPLY_STATE_START);
bref->reply_state = REPLY_STATE_START;
bref->set_reply_state(REPLY_STATE_START);
rses->expected_responses++;
success = true;
break;
@ -542,21 +381,18 @@ static bool reroute_stored_statement(ROUTER_CLIENT_SES *rses, backend_ref_t *old
}
}
if (!success && rses->rses_master_ref && BREF_IS_IN_USE(rses->rses_master_ref) &&
rses->current_master && rses->current_master->in_use())
if (!success && rses->current_master && rses->current_master->in_use())
{
/**
* Either we failed to write to the slave or no valid slave was found.
* Try to retry the read on the master.
*/
backend_ref_t *bref = rses->rses_master_ref;
if (bref->bref_dcb->func.write(bref->bref_dcb, stored))
if (rses->current_master->write(stored))
{
MXS_INFO("Retrying failed read at '%s'.", bref->ref->server->unique_name);
LOG_RS(bref, REPLY_STATE_START);
ss_dassert(bref->reply_state == REPLY_STATE_DONE);
bref->reply_state = REPLY_STATE_START;
MXS_INFO("Retrying failed read at '%s'.", rses->current_master->server()->unique_name);
LOG_RS(rses->current_master, REPLY_STATE_START);
ss_dassert(rses->current_master->get_reply_state() == REPLY_STATE_DONE);
rses->current_master->set_reply_state(REPLY_STATE_START);
rses->expected_responses++;
success = true;
}
@ -585,30 +421,13 @@ static bool handle_error_new_connection(ROUTER_INSTANCE *inst,
ROUTER_CLIENT_SES **rses,
DCB *backend_dcb, GWBUF *errmsg)
{
ROUTER_CLIENT_SES *myrses;
MXS_SESSION *ses;
int max_nslaves;
int max_slave_rlag;
backend_ref_t *bref;
bool succp;
ROUTER_CLIENT_SES *myrses = *rses;
SRWBackend& bref = get_bref_from_dcb(myrses, backend_dcb);
myrses = *rses;
ses = backend_dcb->session;
MXS_SESSION* ses = backend_dcb->session;
CHK_SESSION(ses);
/**
* If bref == NULL it has been replaced already with another one.
*
* NOTE: This can never happen.
*/
if ((bref = get_bref_from_dcb(myrses, backend_dcb)) == NULL)
{
return true;
}
CHK_BACKEND_REF(bref);
if (BREF_IS_WAITING_RESULT(bref))
if (bref->is_waiting_result())
{
/**
* A query was sent through the backend and it is waiting for a reply.
@ -619,7 +438,7 @@ static bool handle_error_new_connection(ROUTER_INSTANCE *inst,
const SERVER *target;
if (!session_take_stmt(backend_dcb->session, &stored, &target) ||
target != bref->ref->server ||
target != bref->backend()->server ||
!reroute_stored_statement(*rses, bref, stored))
{
/**
@ -630,7 +449,7 @@ static bool handle_error_new_connection(ROUTER_INSTANCE *inst,
gwbuf_free(stored);
myrses->expected_responses--;
if (!sescmd_cursor_is_active(&bref->bref_sescmd_cur))
if (bref->session_command_count())
{
/**
* The backend was executing a command that requires a reply.
@ -652,12 +471,11 @@ static bool handle_error_new_connection(ROUTER_INSTANCE *inst,
}
}
RW_CHK_DCB(bref, backend_dcb);
dcb_close(backend_dcb);
RW_CLOSE_BREF(bref);
close_failed_bref(bref, false);
max_nslaves = rses_get_max_slavecount(myrses, myrses->rses_nbackends);
max_slave_rlag = rses_get_max_replication_lag(myrses);
/** Close the current connection */
bref->close();
int max_nslaves = rses_get_max_slavecount(myrses, myrses->rses_nbackends);
bool succp;
/**
* Try to get replacement slave or at least the minimum
* number of slave connections for router session.
@ -668,10 +486,7 @@ static bool handle_error_new_connection(ROUTER_INSTANCE *inst,
}
else
{
succp = select_connect_backend_servers(&myrses->rses_master_ref,
myrses->rses_backend_ref,
myrses->rses_nbackends,
max_nslaves, max_slave_rlag,
succp = select_connect_backend_servers(myrses->rses_nbackends, max_nslaves,
myrses->rses_config.slave_selection_criteria,
ses, inst, myrses, true);
}
@ -737,94 +552,6 @@ static bool have_enough_servers(ROUTER_CLIENT_SES *rses, const int min_nsrv,
return succp;
}
/**
* @brief Create backend server references
*
* This creates a new set of backend references for the client session. Currently
* this is only used on startup but it could be used to dynamically change the
* set of used servers.
*
* @param rses Client router session
* @param dest Destination where the array of backens is stored
* @param n_backend Number of items in the array
* @return True on success, false on error
*/
static bool create_backends(ROUTER_CLIENT_SES *rses, backend_ref_t** dest, int* n_backend)
{
backend_ref_t* backend_ref = new (std::nothrow) backend_ref_t[*n_backend];
if (backend_ref == NULL)
{
return false;
}
int i = 0;
for (SERVER_REF *sref = rses->router->service->dbref; sref && i < *n_backend; sref = sref->next)
{
if (sref->active)
{
rses->backends.push_back(SRWBackend(new RWBackend(sref)));
backend_ref[i].bref_chk_top = CHK_NUM_BACKEND_REF;
backend_ref[i].bref_chk_tail = CHK_NUM_BACKEND_REF;
backend_ref[i].bref_sescmd_cur.scmd_cur_chk_top = CHK_NUM_SESCMD_CUR;
backend_ref[i].bref_sescmd_cur.scmd_cur_chk_tail = CHK_NUM_SESCMD_CUR;
backend_ref[i].closed_at = 0;
backend_ref[i].bref_state = 0;
backend_ref[i].ref = sref;
backend_ref[i].reply_state = REPLY_STATE_DONE;
backend_ref[i].reply_cmd = 0;
backend_ref[i].bref_dcb = NULL;
backend_ref[i].bref_num_result_wait = 0;
/** store pointers to sescmd list to both cursors */
backend_ref[i].bref_sescmd_cur.scmd_cur_rses = rses;
backend_ref[i].bref_sescmd_cur.scmd_cur_active = false;
backend_ref[i].bref_sescmd_cur.scmd_cur_ptr_property =
&rses->rses_properties[RSES_PROP_TYPE_SESCMD];
backend_ref[i].bref_sescmd_cur.scmd_cur_cmd = NULL;
i++;
}
}
if (i < *n_backend)
{
MXS_INFO("The service reported %d servers but only took %d into use.", *n_backend, i);
*n_backend = i;
}
*dest = backend_ref;
return true;
}
/**
* @brief Mark a backend reference as failed
*
* @param bref Backend reference to close
* @param fatal Whether the failure was fatal
*/
void close_failed_bref(backend_ref_t *bref, bool fatal)
{
if (BREF_IS_WAITING_RESULT(bref))
{
bref_clear_state(bref, BREF_WAITING_RESULT);
}
bref_clear_state(bref, BREF_QUERY_ACTIVE);
bref_clear_state(bref, BREF_IN_USE);
bref_set_state(bref, BREF_CLOSED);
if (fatal)
{
bref_set_state(bref, BREF_FATAL_FAILURE);
}
if (sescmd_cursor_is_active(&bref->bref_sescmd_cur))
{
sescmd_cursor_set_active(&bref->bref_sescmd_cur, false);
}
}
bool route_stored_query(ROUTER_CLIENT_SES *rses)
{
bool rval = true;
@ -871,36 +598,36 @@ bool route_stored_query(ROUTER_CLIENT_SES *rses)
*
* @return True if the complete response has been received
*/
bool reply_is_complete(backend_ref_t* bref, GWBUF *buffer)
bool reply_is_complete(SRWBackend& bref, GWBUF *buffer)
{
mysql_server_cmd_t cmd = mxs_mysql_current_command(bref->bref_dcb->session);
mysql_server_cmd_t cmd = mxs_mysql_current_command(bref->dcb()->session);
if (bref->reply_state == REPLY_STATE_START && !mxs_mysql_is_result_set(buffer))
if (bref->get_reply_state() == REPLY_STATE_START && !mxs_mysql_is_result_set(buffer))
{
if (cmd == MYSQL_COM_STMT_PREPARE || !mxs_mysql_more_results_after_ok(buffer))
{
/** Not a result set, we have the complete response */
LOG_RS(bref, REPLY_STATE_DONE);
bref->reply_state = REPLY_STATE_DONE;
bref->set_reply_state(REPLY_STATE_DONE);
}
}
else
{
bool more = false;
int old_eof = bref->reply_state == REPLY_STATE_RSET_ROWS ? 1 : 0;
int old_eof = bref->get_reply_state() == REPLY_STATE_RSET_ROWS ? 1 : 0;
int n_eof = modutil_count_signal_packets(buffer, old_eof, &more);
if (n_eof == 0)
{
/** Waiting for the EOF packet after the column definitions */
LOG_RS(bref, REPLY_STATE_RSET_COLDEF);
bref->reply_state = REPLY_STATE_RSET_COLDEF;
bref->set_reply_state(REPLY_STATE_RSET_COLDEF);
}
else if (n_eof == 1 && cmd != MYSQL_COM_FIELD_LIST)
{
/** Waiting for the EOF packet after the rows */
LOG_RS(bref, REPLY_STATE_RSET_ROWS);
bref->reply_state = REPLY_STATE_RSET_ROWS;
bref->set_reply_state(REPLY_STATE_RSET_ROWS);
}
else
{
@ -908,32 +635,18 @@ bool reply_is_complete(backend_ref_t* bref, GWBUF *buffer)
* a COM_FIELD_LIST command */
ss_dassert(n_eof == 2 || (n_eof == 1 && cmd == MYSQL_COM_FIELD_LIST));
LOG_RS(bref, REPLY_STATE_DONE);
bref->reply_state = REPLY_STATE_DONE;
bref->set_reply_state(REPLY_STATE_DONE);
if (more)
{
/** The server will send more resultsets */
LOG_RS(bref, REPLY_STATE_START);
bref->reply_state = REPLY_STATE_START;
bref->set_reply_state(REPLY_STATE_START);
}
}
}
return bref->reply_state == REPLY_STATE_DONE;
}
SRWBackend get_backend_from_bref(ROUTER_CLIENT_SES* rses, backend_ref_t* bref)
{
for (SRWBackendList::iterator it = rses->backends.begin();
it != rses->backends.end(); it++)
{
if ((*it)->backend() == bref->ref)
{
return *it;
}
}
return SRWBackend();
return bref->get_reply_state() == REPLY_STATE_DONE;
}
void close_all_connections(ROUTER_CLIENT_SES* rses)
@ -941,14 +654,11 @@ void close_all_connections(ROUTER_CLIENT_SES* rses)
for (SRWBackendList::iterator it = rses->backends.begin();
it != rses->backends.end(); it++)
{
if (BREF_IS_IN_USE((&backend_ref[i])))
{
SRWBackend& bref = *it;
SRWBackend& bref = *it;
if (bref->in_use())
{
bref->close();
}
if (bref->in_use())
{
bref->close();
}
}
}
@ -1059,36 +769,38 @@ static MXS_ROUTER_SESSION *newSession(MXS_ROUTER *router_inst, MXS_SESSION *sess
client_rses->rses_chk_top = CHK_NUM_ROUTER_SES;
client_rses->rses_chk_tail = CHK_NUM_ROUTER_SES;
client_rses->rses_closed = false;
client_rses->rses_properties[RSES_PROP_TYPE_SESCMD] = NULL;
client_rses->router = router;
client_rses->client_dcb = session->client_dcb;
client_rses->have_tmp_tables = false;
client_rses->forced_node = NULL;
client_rses->expected_responses = 0;
client_rses->query_queue = NULL;
client_rses->load_data_state = LOAD_DATA_INACTIVE;
client_rses->sent_sescmd = 0;
client_rses->recv_sescmd = 0;
memcpy(&client_rses->rses_config, &router->rwsplit_config, sizeof(client_rses->rses_config));
int router_nservers = router->service->n_dbref;
const int min_nservers = 1; /*< hard-coded for now */
backend_ref_t *backend_ref;
if (!have_enough_servers(client_rses, min_nservers, router_nservers, router) ||
!create_backends(client_rses, &backend_ref, &router_nservers))
if (!have_enough_servers(client_rses, min_nservers, router_nservers, router))
{
delete client_rses;
return NULL;
}
int max_nslaves = rses_get_max_slavecount(client_rses, router_nservers);
int max_slave_rlag = rses_get_max_replication_lag(client_rses);
for (SERVER_REF *sref = router->service->dbref; sref; sref = sref->next)
{
if (sref->active)
{
client_rses->backends.push_back(SRWBackend(new RWBackend(sref)));
}
}
int max_nslaves = rses_get_max_slavecount(client_rses, router_nservers);
client_rses->rses_backend_ref = backend_ref;
client_rses->rses_nbackends = router_nservers; /*< # of backend servers */
backend_ref_t *master_ref = NULL; /*< pointer to selected master */
if (!select_connect_backend_servers(&master_ref, backend_ref, router_nservers,
max_nslaves, max_slave_rlag,
if (!select_connect_backend_servers(router_nservers, max_nslaves,
client_rses->rses_config.slave_selection_criteria,
session, router, client_rses, false))
{
@ -1097,15 +809,10 @@ static MXS_ROUTER_SESSION *newSession(MXS_ROUTER *router_inst, MXS_SESSION *sess
* in the strict mode. If sessions without master are allowed, only
* <min_nslaves> slaves must be found.
*/
delete[] client_rses->rses_backend_ref;
delete client_rses;
return NULL;
}
/** Copy backend pointers to router session. */
client_rses->rses_master_ref = master_ref;
client_rses->current_master = get_backend_from_bref(client_rses, master_ref);
if (client_rses->rses_config.rw_max_slave_conn_percent)
{
int n_conn = 0;
@ -1143,62 +850,7 @@ static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_sessio
* of every API function to quickly stop the processing of closed sessions.
*/
router_cli_ses->rses_closed = true;
for (int i = 0; i < router_cli_ses->rses_nbackends; i++)
{
backend_ref_t *bref = &router_cli_ses->rses_backend_ref[i];
if (BREF_IS_IN_USE(bref))
{
/** This backend is in use and it needs to be closed */
DCB *dcb = bref->bref_dcb;
CHK_DCB(dcb);
ss_dassert(dcb->session->state == SESSION_STATE_STOPPING);
if (BREF_IS_WAITING_RESULT(bref))
{
/** This backend was executing a query when the session was closed */
bref_clear_state(bref, BREF_WAITING_RESULT);
}
bref_clear_state(bref, BREF_IN_USE);
bref_set_state(bref, BREF_CLOSED);
RW_CHK_DCB(bref, dcb);
/** MXS-956: This will prevent closed DCBs from being closed twice.
* It should not happen but for currently unknown reasons, a DCB
* gets closed twice; first in handleError and a second time here. */
if (dcb && dcb->state == DCB_STATE_POLLING)
{
dcb_close(dcb);
}
RW_CLOSE_BREF(bref);
/** decrease server current connection counters */
atomic_add(&bref->ref->connections, -1);
}
else
{
ss_dassert(!BREF_IS_WAITING_RESULT(bref));
/** This should never be true unless a backend reference is taken
* out of use before clearing the BREF_WAITING_RESULT state */
if (BREF_IS_WAITING_RESULT(bref))
{
MXS_WARNING("A closed backend was expecting a result, this should not be possible. "
"Decrementing active operation counter for this backend.");
bref_clear_state(bref, BREF_WAITING_RESULT);
}
}
}
for (SRWBackendList::iterator it = router_cli_ses->backends.begin();
it != router_cli_ses->backends.end(); it++)
{
SRWBackend& backend = *it;
backend->close();
}
close_all_connections(router_cli_ses);
if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO) &&
router_cli_ses->sescmd_list.size())
@ -1231,25 +883,6 @@ static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_sessio
static void freeSession(MXS_ROUTER *router_instance, MXS_ROUTER_SESSION *router_client_session)
{
ROUTER_CLIENT_SES *router_cli_ses = (ROUTER_CLIENT_SES *)router_client_session;
/**
* For each property type, walk through the list, finalize properties
* and free the allocated memory.
*/
for (int i = RSES_PROP_TYPE_FIRST; i < RSES_PROP_TYPE_COUNT; i++)
{
rses_property_t *p = router_cli_ses->rses_properties[i];
rses_property_t *q = p;
while (p != NULL)
{
q = p->rses_prop_next;
rses_property_done(p);
p = q;
}
}
delete[] router_cli_ses->rses_backend_ref;
delete router_cli_ses;
}
@ -1481,20 +1114,18 @@ static void clientReply(MXS_ROUTER *instance,
* and
*/
backend_ref_t *bref = get_bref_from_dcb(router_cli_ses, backend_dcb);
CHK_BACKEND_REF(bref);
sescmd_cursor_t *scur = &bref->bref_sescmd_cur;
SRWBackend& bref = get_bref_from_dcb(router_cli_ses, backend_dcb);
/** Statement was successfully executed, free the stored statement */
session_clear_stmt(backend_dcb->session);
ss_dassert(bref->reply_state != REPLY_STATE_DONE);
ss_dassert(bref->get_reply_state() != REPLY_STATE_DONE);
if (reply_is_complete(bref, writebuf))
{
/** Got a complete reply, decrement expected response count */
router_cli_ses->expected_responses--;
ss_dassert(router_cli_ses->expected_responses >= 0);
ss_dassert(bref->reply_state == REPLY_STATE_DONE);
ss_dassert(bref->get_reply_state() == REPLY_STATE_DONE);
}
else
{
@ -1505,9 +1136,9 @@ static void clientReply(MXS_ROUTER *instance,
* Active cursor means that reply is from session command
* execution.
*/
if (sescmd_cursor_is_active(scur))
if (bref->session_command_count())
{
check_session_command_reply(writebuf, scur, bref);
check_session_command_reply(writebuf, bref);
if (GWBUF_IS_TYPE_SESCMD_RESPONSE(writebuf))
{
@ -1517,49 +1148,33 @@ static void clientReply(MXS_ROUTER *instance,
* needs to be sent to client or NULL.
*/
bool rconn = false;
writebuf = sescmd_cursor_process_replies(writebuf, bref, &rconn);
process_sescmd_response(router_cli_ses, bref, &writebuf, &rconn);
if (rconn && !router_inst->rwsplit_config.disable_sescmd_history)
{
select_connect_backend_servers(
&router_cli_ses->rses_master_ref, router_cli_ses->rses_backend_ref,
router_cli_ses->rses_nbackends,
router_cli_ses->rses_config.max_slave_connections,
router_cli_ses->rses_config.max_slave_replication_lag,
router_cli_ses->rses_config.slave_selection_criteria,
router_cli_ses->rses_master_ref->bref_dcb->session,
router_cli_ses->client_dcb->session,
router_cli_ses->router,
router_cli_ses,
true);
}
}
/**
* If response will be sent to client, decrease waiter count.
* This applies to session commands only. Counter decrement
* for other type of queries is done outside this block.
*/
}
/** Set response status as replied */
bref_clear_state(bref, BREF_WAITING_RESULT);
}
/**
* Clear BREF_QUERY_ACTIVE flag and decrease waiter counter.
* This applies for queries other than session commands.
*/
else if (BREF_IS_QUERY_ACTIVE(bref))
{
bref_clear_state(bref, BREF_QUERY_ACTIVE);
/** Set response status as replied */
bref_clear_state(bref, BREF_WAITING_RESULT);
}
/** Complete the write */
bref->ack_write();
bool queue_routed = false;
if (router_cli_ses->expected_responses == 0)
{
for (int i = 0; i < router_cli_ses->rses_nbackends; i++)
for (SRWBackendList::iterator it = router_cli_ses->backends.begin();
it != router_cli_ses->backends.end(); it++)
{
ss_dassert(router_cli_ses->rses_backend_ref[i].reply_state == REPLY_STATE_DONE);
ss_dassert((*it)->get_reply_state() == REPLY_STATE_DONE);
}
queue_routed = router_cli_ses->query_queue != NULL;
@ -1569,18 +1184,19 @@ static void clientReply(MXS_ROUTER *instance,
{
ss_dassert(router_cli_ses->expected_responses > 0);
}
if (writebuf != NULL && client_dcb != NULL)
if (writebuf && client_dcb)
{
/** Write reply to client DCB */
MXS_SESSION_ROUTE_REPLY(backend_dcb->session, writebuf);
}
/** There is one pending session command to be executed. */
else if (!queue_routed && sescmd_cursor_is_active(scur))
/** Check pending session commands */
else if (!queue_routed && bref->session_command_count())
{
MXS_INFO("Backend [%s]:%d processed reply and starts to execute active cursor.",
bref->ref->server->name, bref->ref->server->port);
bref->server()->name, bref->server()->port);
if (execute_sescmd_in_backend(bref))
if (bref->execute_session_command())
{
router_cli_ses->expected_responses++;
}
@ -1649,7 +1265,7 @@ static void handleError(MXS_ROUTER *instance,
MXS_SESSION *session = problem_dcb->session;
ss_dassert(session);
backend_ref_t *bref = get_bref_from_dcb(rses, problem_dcb);
SRWBackend& bref = get_bref_from_dcb(rses, problem_dcb);
switch (action)
{
@ -1659,13 +1275,13 @@ static void handleError(MXS_ROUTER *instance,
* If master has lost its Master status error can't be
* handled so that session could continue.
*/
if (rses->rses_master_ref && rses->rses_master_ref->bref_dcb == problem_dcb)
if (rses->current_master && rses->current_master->dcb() == problem_dcb)
{
SERVER *srv = rses->rses_master_ref->ref->server;
SERVER *srv = rses->current_master->server();
bool can_continue = false;
if (rses->rses_config.master_failure_mode != RW_FAIL_INSTANTLY &&
(bref == NULL || !BREF_IS_WAITING_RESULT(bref)))
(!bref || !bref->is_waiting_result()))
{
/** The failure of a master is not considered a critical
* failure as partial functionality still remains. Reads
@ -1688,13 +1304,9 @@ static void handleError(MXS_ROUTER *instance,
*succp = can_continue;
if (bref != NULL)
if (bref)
{
CHK_BACKEND_REF(bref);
RW_CHK_DCB(bref, problem_dcb);
dcb_close(problem_dcb);
RW_CLOSE_BREF(bref);
close_failed_bref(bref, true);
bref->close(mxs::Backend::CLOSE_FATAL);
}
else
{
@ -1704,19 +1316,18 @@ static void handleError(MXS_ROUTER *instance,
}
else if (bref)
{
/** Check whether problem_dcb is same as dcb of rses->forced_node
/** Check whether problem_dcb is same as dcb of rses->target_node
* and within READ ONLY transaction:
* if true reset rses->forced_node and close session
* if true reset rses->target_node and close session
*/
if (rses->forced_node &&
(rses->forced_node->bref_dcb == problem_dcb &&
if (rses->target_node &&
(rses->target_node->dcb() == problem_dcb &&
session_trx_is_read_only(problem_dcb->session)))
{
MXS_ERROR("forced_node SLAVE %s in opened READ ONLY transaction has failed:"
" closing session",
problem_dcb->server->unique_name);
rses->forced_node = NULL;
rses->target_node.reset();
*succp = false;
break;
@ -1731,11 +1342,11 @@ static void handleError(MXS_ROUTER *instance,
if (bref)
{
/** This is a valid DCB for a backend ref */
if (BREF_IS_IN_USE(bref) && bref->bref_dcb == problem_dcb)
if (bref->in_use() && bref->dcb() == problem_dcb)
{
ss_dassert(false);
MXS_ERROR("Backend '%s' is still in use and points to the problem DCB.",
bref->ref->server->unique_name);
bref->server()->unique_name);
}
}
else

View File

@ -30,15 +30,6 @@
#include <maxscale/backend.hh>
#include <maxscale/session_command.hh>
enum bref_state_t
{
BREF_IN_USE = 0x01,
BREF_WAITING_RESULT = 0x02, /**< for session commands only */
BREF_QUERY_ACTIVE = 0x04, /**< for other queries */
BREF_CLOSED = 0x08,
BREF_FATAL_FAILURE = 0x10 /**< Backend references that should be dropped */
};
enum backend_type_t
{
BE_UNDEFINED = -1,
@ -58,15 +49,6 @@ enum route_target_t
TARGET_RLAG_MAX = 0x10
};
enum rses_property_type_t
{
RSES_PROP_TYPE_UNDEFINED = -1,
RSES_PROP_TYPE_SESCMD = 0,
RSES_PROP_TYPE_FIRST = RSES_PROP_TYPE_SESCMD,
RSES_PROP_TYPE_LAST = RSES_PROP_TYPE_SESCMD,
RSES_PROP_TYPE_COUNT = RSES_PROP_TYPE_LAST + 1
};
/**
* This criteria is used when backends are chosen for a router session's use.
* Backend servers are sorted to ascending order according to the criteria
@ -133,59 +115,12 @@ enum ld_state
(SERVER_IS_SLAVE((b)->backend_server) ? BE_SLAVE : BE_UNDEFINED));
/** Reply state change debug logging */
#define LOG_RS(a, b) MXS_DEBUG("[%s]:%d %s -> %s", (a)->ref->server->name, \
(a)->ref->server->port, rstostr((a)->reply_state), rstostr(b));
#define LOG_RS(a, b) MXS_DEBUG("[%s]:%d %s -> %s", (a)->server()->name, \
(a)->server()->port, rstostr((a)->get_reply_state()), rstostr(b));
struct rses_property_t;
struct ROUTER_INSTANCE;
struct ROUTER_CLIENT_SES;
/**
* Session variable command
*/
struct mysql_sescmd_t
{
skygw_chk_t my_sescmd_chk_top;
struct rses_property_t* my_sescmd_prop; /**< parent property */
GWBUF* my_sescmd_buf; /**< query buffer */
unsigned char my_sescmd_packet_type; /**< packet type */
bool my_sescmd_is_replied; /**< is cmd replied to client */
unsigned char reply_cmd; /**< The reply command. One of OK, ERR, RESULTSET or
* LOCAL_INFILE. Slave servers are compared to this
* when they return session command replies.*/
uint64_t position; /**< Position of this command */
skygw_chk_t my_sescmd_chk_tail;
};
/**
* Property structure
*/
struct rses_property_t
{
skygw_chk_t rses_prop_chk_top;
ROUTER_CLIENT_SES* rses_prop_rsession; /**< parent router session */
int rses_prop_refcount;
rses_property_type_t rses_prop_type;
struct rses_prop_data // TODO: Remove the properties and integrate them into the session object
{
mysql_sescmd_t sescmd;
} rses_prop_data;
rses_property_t* rses_prop_next; /**< next property of same type */
skygw_chk_t rses_prop_chk_tail;
};
struct sescmd_cursor_t
{
skygw_chk_t scmd_cur_chk_top;
ROUTER_CLIENT_SES* scmd_cur_rses; /**< pointer to owning router session */
rses_property_t** scmd_cur_ptr_property; /**< address of pointer to owner property */
mysql_sescmd_t* scmd_cur_cmd; /**< pointer to current session command */
bool scmd_cur_active; /**< true if command is being executed */
uint64_t position; /**< Position of this cursor */
skygw_chk_t scmd_cur_chk_tail;
};
/** Enum for tracking client reply state */
enum reply_state_t
{
@ -258,7 +193,6 @@ struct ROUTER_CLIENT_SES
{
skygw_chk_t rses_chk_top;
bool rses_closed; /**< true when closeSession is called */
rses_property_t* rses_properties[RSES_PROP_TYPE_COUNT]; /**< Properties listed by their type */
SRWBackendList backends; /**< List of backend servers */
SRWBackend current_master; /**< Current master server */
SRWBackend target_node; /**< The currently locked target node */

View File

@ -36,26 +36,22 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
GWBUF *querybuf);
void closed_session_reply(GWBUF *querybuf);
void print_error_packet(ROUTER_CLIENT_SES *rses, GWBUF *buf, DCB *dcb);
void check_session_command_reply(GWBUF *writebuf, sescmd_cursor_t *scur, backend_ref_t *bref);
bool execute_sescmd_in_backend(backend_ref_t *backend_ref);
void check_session_command_reply(GWBUF *writebuf, SRWBackend bref);
bool execute_sescmd_in_backend(SRWBackend& backend_ref);
bool handle_target_is_all(route_target_t route_target,
ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
GWBUF *querybuf, int packet_type, uint32_t qtype);
uint8_t determine_packet_type(GWBUF *querybuf, bool *non_empty_packet);
void log_transaction_status(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, uint32_t qtype);
bool is_packet_a_one_way_message(int packet_type);
sescmd_cursor_t *backend_ref_get_sescmd_cursor(backend_ref_t *bref);
bool is_packet_a_query(int packet_type);
bool send_readonly_error(DCB *dcb);
/*
* The following are implemented in readwritesplit.c
*/
void bref_clear_state(backend_ref_t *bref, bref_state_t state);
void bref_set_state(backend_ref_t *bref, bref_state_t state);
int router_handle_state_switch(DCB *dcb, DCB_REASON reason, void *data);
backend_ref_t *get_bref_from_dcb(ROUTER_CLIENT_SES *rses, DCB *dcb);
void rses_property_done(rses_property_t *prop);
SRWBackend& get_bref_from_dcb(ROUTER_CLIENT_SES *rses, DCB *dcb);
int rses_get_max_slavecount(ROUTER_CLIENT_SES *rses, int router_nservers);
int rses_get_max_replication_lag(ROUTER_CLIENT_SES *rses);
@ -65,57 +61,32 @@ int rses_get_max_replication_lag(ROUTER_CLIENT_SES *rses);
bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
GWBUF *querybuf);
int rwsplit_hashkeyfun(const void *key);
int rwsplit_hashcmpfun(const void *v1, const void *v2);
void *rwsplit_hstrdup(const void *fval);
void rwsplit_hfree(void *fval);
bool rwsplit_get_dcb(DCB **p_dcb, ROUTER_CLIENT_SES *rses, backend_type_t btype,
char *name, int max_rlag);
bool get_target_backend(ROUTER_CLIENT_SES *rses, backend_type_t btype,
char *name, int max_rlag, SRWBackend& target);
route_target_t get_route_target(ROUTER_CLIENT_SES *rses,
uint32_t qtype, HINT *hint);
rses_property_t *rses_property_init(rses_property_type_t prop_type);
int rses_property_add(ROUTER_CLIENT_SES *rses, rses_property_t *prop);
void handle_multi_temp_and_load(ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
uint8_t packet_type, uint32_t *qtype);
bool handle_hinted_target(ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
route_target_t route_target, DCB **target_dcb);
route_target_t route_target, SRWBackend& target);
bool handle_slave_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
DCB **target_dcb);
SRWBackend& target);
bool handle_master_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
DCB **target_dcb);
SRWBackend& target);
bool handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
GWBUF *querybuf, DCB *target_dcb, bool store);
GWBUF *querybuf, SRWBackend& target, bool store);
bool route_session_write(ROUTER_CLIENT_SES *router_cli_ses,
GWBUF *querybuf, ROUTER_INSTANCE *inst,
int packet_type,
uint32_t qtype);
/*
* The following are implemented in rwsplit_session_cmd.c
*/
mysql_sescmd_t *rses_property_get_sescmd(rses_property_t *prop);
mysql_sescmd_t *mysql_sescmd_init(rses_property_t *rses_prop,
GWBUF *sescmd_buf,
unsigned char packet_type,
ROUTER_CLIENT_SES *rses);
void mysql_sescmd_done(mysql_sescmd_t *sescmd);
mysql_sescmd_t *sescmd_cursor_get_command(sescmd_cursor_t *scur);
bool sescmd_cursor_is_active(sescmd_cursor_t *sescmd_cursor);
void sescmd_cursor_set_active(sescmd_cursor_t *sescmd_cursor,
bool value);
bool execute_sescmd_history(backend_ref_t *bref);
GWBUF *sescmd_cursor_clone_querybuf(sescmd_cursor_t *scur);
GWBUF *sescmd_cursor_process_replies(GWBUF *replybuf,
backend_ref_t *bref,
bool *reconnect);
void process_sescmd_response(ROUTER_CLIENT_SES* rses, SRWBackend& bref,
GWBUF** ppPacket, bool* reconnect);
/*
* The following are implemented in rwsplit_select_backends.c
*/
bool select_connect_backend_servers(backend_ref_t **p_master_ref,
backend_ref_t *backend_ref,
int router_nservers, int max_nslaves,
int max_slave_rlag,
bool select_connect_backend_servers(int router_nservers,
int max_nslaves,
select_criteria_t select_criteria,
MXS_SESSION *session,
ROUTER_INSTANCE *router,
@ -133,5 +104,5 @@ void check_create_tmp_table(ROUTER_CLIENT_SES *router_cli_ses,
GWBUF *querybuf, uint32_t type);
bool check_for_multi_stmt(GWBUF *buf, void *protocol, uint8_t packet_type);
uint32_t determine_query_type(GWBUF *querybuf, int packet_type, bool non_empty_packet);
void close_failed_bref(backend_ref_t *bref, bool fatal);
SRWBackend get_backend_from_bref(ROUTER_CLIENT_SES* rses, backend_ref_t* bref);
void close_all_connections(ROUTER_CLIENT_SES* rses);