Clean up readwritesplit API functions
Cleaned up some of the comments and documentation for the functions. Renamed some variables and moved parts of the error handling logging into a subfunction.
This commit is contained in:
@ -652,6 +652,29 @@ void close_all_connections(RWSplitSession* rses)
|
||||
}
|
||||
}
|
||||
|
||||
void check_and_log_backend_state(const SRWBackend& backend, DCB* problem_dcb)
|
||||
{
|
||||
if (backend)
|
||||
{
|
||||
/** This is a valid DCB for a backend ref */
|
||||
if (backend->in_use() && backend->dcb() == problem_dcb)
|
||||
{
|
||||
ss_dassert(false);
|
||||
MXS_ERROR("Backend '%s' is still in use and points to the problem DCB.",
|
||||
backend->name());
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
const char *remote = problem_dcb->state == DCB_STATE_POLLING &&
|
||||
problem_dcb->server ? problem_dcb->server->unique_name : "CLOSED";
|
||||
|
||||
MXS_ERROR("DCB connected to '%s' is not in use by the router "
|
||||
"session, not closing it. DCB is in state '%s'",
|
||||
remote, STRDCBSTATE(problem_dcb->state));
|
||||
}
|
||||
}
|
||||
|
||||
RWSplit::RWSplit(SERVICE* service, const Config& config):
|
||||
m_service(service),
|
||||
m_config(config)
|
||||
@ -687,16 +710,15 @@ RWSplitSession::RWSplitSession(const Config& config):
|
||||
*/
|
||||
|
||||
/**
|
||||
* @brief Create an instance of the read/write router (API).
|
||||
* @brief Create a new readwritesplit router instance
|
||||
*
|
||||
* Create an instance of read/write statement router within the MaxScale. One
|
||||
* instance of the router is required for each service that is defined in the
|
||||
* configuration as using this router. One instance of the router will handle
|
||||
* multiple connections (or router sessions).
|
||||
* An instance of the router is required for each service that uses this router.
|
||||
* One instance of the router will handle multiple router sessions.
|
||||
*
|
||||
* @param service The service this router is being create for
|
||||
* @param options The options for this query router
|
||||
* @return NULL in failure, pointer to router in success.
|
||||
*
|
||||
* @return New router instance or NULL on error
|
||||
*/
|
||||
static MXS_ROUTER* createInstance(SERVICE *service, char **options)
|
||||
{
|
||||
@ -720,7 +742,7 @@ static MXS_ROUTER *createInstance(SERVICE *service, char **options)
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Associate a new session with this instance of the router (API).
|
||||
* @brief Create a new session for this router instance
|
||||
*
|
||||
* The session is used to store all the data required by the router for a
|
||||
* particular client connection. The instance of the router that relates to a
|
||||
@ -734,7 +756,8 @@ static MXS_ROUTER *createInstance(SERVICE *service, char **options)
|
||||
*
|
||||
* @param instance The router instance data
|
||||
* @param session The MaxScale session (generic connection data)
|
||||
* @return Session specific data for this session, i.e. a router session
|
||||
*
|
||||
* @return New router session or NULL on error
|
||||
*/
|
||||
static MXS_ROUTER_SESSION* newSession(MXS_ROUTER *router_inst, MXS_SESSION *session)
|
||||
{
|
||||
@ -743,7 +766,6 @@ static MXS_ROUTER_SESSION *newSession(MXS_ROUTER *router_inst, MXS_SESSION *sess
|
||||
|
||||
if (client_rses == NULL)
|
||||
{
|
||||
delete client_rses;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
@ -761,7 +783,7 @@ static MXS_ROUTER_SESSION *newSession(MXS_ROUTER *router_inst, MXS_SESSION *sess
|
||||
client_rses->sescmd_count = 1; // Needs to be a positive number to work
|
||||
|
||||
int router_nservers = router->service()->n_dbref;
|
||||
const int min_nservers = 1; /*< hard-coded for now */
|
||||
const int min_nservers = 1;
|
||||
|
||||
if (!have_enough_servers(client_rses, min_nservers, router_nservers, router))
|
||||
{
|
||||
@ -777,20 +799,18 @@ static MXS_ROUTER_SESSION *newSession(MXS_ROUTER *router_inst, MXS_SESSION *sess
|
||||
}
|
||||
}
|
||||
|
||||
client_rses->rses_nbackends = router_nservers; /*< # of backend servers */
|
||||
client_rses->rses_nbackends = router_nservers;
|
||||
|
||||
int max_nslaves = rses_get_max_slavecount(client_rses);
|
||||
|
||||
|
||||
if (!select_connect_backend_servers(router_nservers, max_nslaves,
|
||||
client_rses->rses_config.slave_selection_criteria,
|
||||
session, router, client_rses,
|
||||
connection_type::ALL))
|
||||
{
|
||||
/**
|
||||
* Master and at least <min_nslaves> slaves must be found if the router is
|
||||
* in the strict mode. If sessions without master are allowed, only
|
||||
* <min_nslaves> slaves must be found.
|
||||
* At least the master must be found if the router is in the strict mode.
|
||||
* If sessions without master are allowed, only a slave must be found.
|
||||
*/
|
||||
delete client_rses;
|
||||
return NULL;
|
||||
@ -810,13 +830,12 @@ static MXS_ROUTER_SESSION *newSession(MXS_ROUTER *router_inst, MXS_SESSION *sess
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Close a router session (API).
|
||||
* @brief Close a router session
|
||||
*
|
||||
* Close a session with the router, this is the mechanism by which a router
|
||||
* may cleanup data structure etc. The instance of the router that relates to
|
||||
* may perform cleanup. The instance of the router that relates to
|
||||
* the relevant service is passed, along with the router session that is to
|
||||
* be closed. Typically the function is used in conjunction with freeSession
|
||||
* which will release the resources used by a router session (see below).
|
||||
* be closed. The freeSession will be called once the session has been closed.
|
||||
*
|
||||
* @param instance The router instance data
|
||||
* @param session The router session being closed
|
||||
@ -828,10 +847,6 @@ static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_sessio
|
||||
|
||||
if (!router_cli_ses->rses_closed)
|
||||
{
|
||||
/**
|
||||
* Mark router session as closed. @c rses_closed is checked at the start
|
||||
* of every API function to quickly stop the processing of closed sessions.
|
||||
*/
|
||||
router_cli_ses->rses_closed = true;
|
||||
close_all_connections(router_cli_ses);
|
||||
|
||||
@ -854,23 +869,23 @@ static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_sessio
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Free a router session (API).
|
||||
* @brief Free a router session
|
||||
*
|
||||
* When a router session has been closed, freeSession can be called to free
|
||||
* allocated resources.
|
||||
*
|
||||
* @param router_instance The router instance the session belongs to
|
||||
* @param router_client_session Client session
|
||||
* @param instance The router instance
|
||||
* @param session The router session
|
||||
*
|
||||
*/
|
||||
static void freeSession(MXS_ROUTER *router_instance, MXS_ROUTER_SESSION *router_client_session)
|
||||
static void freeSession(MXS_ROUTER* instance, MXS_ROUTER_SESSION* session)
|
||||
{
|
||||
RWSplitSession *router_cli_ses = (RWSplitSession *)router_client_session;
|
||||
delete router_cli_ses;
|
||||
RWSplitSession* rses = reinterpret_cast<RWSplitSession*>(session);
|
||||
delete rses;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief The main routing entry point for a query (API)
|
||||
* @brief The main routing entry point
|
||||
*
|
||||
* The routeQuery function will make the routing decision based on the contents
|
||||
* of the instance, session and the query itself. The query always represents
|
||||
@ -880,6 +895,7 @@ static void freeSession(MXS_ROUTER *router_instance, MXS_ROUTER_SESSION *router_
|
||||
* @param instance Router instance
|
||||
* @param router_session Router session associated with the client
|
||||
* @param querybuf Buffer containing the query
|
||||
*
|
||||
* @return 1 on success, 0 on error
|
||||
*/
|
||||
static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *querybuf)
|
||||
@ -912,9 +928,11 @@ static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session,
|
||||
}
|
||||
else
|
||||
{
|
||||
/**
|
||||
* We are already processing a request from the client. Store the
|
||||
* new query and wait for the previous one to complete.
|
||||
*/
|
||||
ss_dassert(rses->expected_responses || rses->query_queue);
|
||||
/** We are already processing a request from the client. Store the
|
||||
* new query and wait for the previous one to complete. */
|
||||
MXS_INFO("Storing query (len: %d cmd: %0x), expecting %d replies to current command",
|
||||
gwbuf_length(querybuf), GWBUF_DATA(querybuf)[4],
|
||||
rses->expected_responses);
|
||||
@ -938,7 +956,7 @@ static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session,
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Diagnostics routine (API)
|
||||
* @brief Diagnostics routine
|
||||
*
|
||||
* Print query router statistics to the DCB passed in
|
||||
*
|
||||
@ -1011,9 +1029,7 @@ static void diagnostics(MXS_ROUTER *instance, DCB *dcb)
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Diagnostics routine (API)
|
||||
*
|
||||
* Print query router statistics to the DCB passed in
|
||||
* @brief JSON diagnostics routine
|
||||
*
|
||||
* @param instance The router instance
|
||||
* @param dcb The DCB for diagnostic output
|
||||
@ -1062,47 +1078,30 @@ static json_t* diagnostics_json(const MXS_ROUTER *instance)
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Client Reply routine (API)
|
||||
*
|
||||
* The routine will reply to client for session change with master server data
|
||||
* @brief Client Reply routine
|
||||
*
|
||||
* @param instance The router instance
|
||||
* @param router_session The router session
|
||||
* @param backend_dcb The backend DCB
|
||||
* @param queue The GWBUF with reply data
|
||||
* @param queue The Buffer containing the reply
|
||||
*/
|
||||
static void clientReply(MXS_ROUTER *instance,
|
||||
MXS_ROUTER_SESSION *router_session,
|
||||
GWBUF *writebuf,
|
||||
DCB *backend_dcb)
|
||||
{
|
||||
RWSplitSession *router_cli_ses = (RWSplitSession *)router_session;
|
||||
RWSplit *router_inst = (RWSplit *)instance;
|
||||
RWSplitSession *rses = (RWSplitSession *)router_session;
|
||||
DCB *client_dcb = backend_dcb->session->client_dcb;
|
||||
|
||||
CHK_CLIENT_RSES(router_cli_ses);
|
||||
CHK_CLIENT_RSES(rses);
|
||||
|
||||
/**
|
||||
* Lock router client session for secure read of router session members.
|
||||
* Note that this could be done without lock by using version #
|
||||
*/
|
||||
if (router_cli_ses->rses_closed)
|
||||
if (rses->rses_closed)
|
||||
{
|
||||
gwbuf_free(writebuf);
|
||||
return;
|
||||
}
|
||||
|
||||
/**
|
||||
* 1. Check if backend received reply to sescmd.
|
||||
* 2. Check sescmd's state whether OK_PACKET has been
|
||||
* sent to client already and if not, lock property cursor,
|
||||
* reply to client, and move property cursor forward. Finally
|
||||
* release the lock.
|
||||
* 3. If reply for this sescmd is sent, lock property cursor
|
||||
* and
|
||||
*/
|
||||
|
||||
SRWBackend backend = get_backend_from_dcb(router_cli_ses, backend_dcb);
|
||||
SRWBackend backend = get_backend_from_dcb(rses, backend_dcb);
|
||||
|
||||
/** Statement was successfully executed, free the stored statement */
|
||||
session_clear_stmt(backend_dcb->session);
|
||||
@ -1110,15 +1109,15 @@ static void clientReply(MXS_ROUTER *instance,
|
||||
|
||||
if (reply_is_complete(backend, writebuf))
|
||||
{
|
||||
/** Got a complete reply, acknowledge the write decrement expected response count */
|
||||
/** Got a complete reply, acknowledge the write and decrement expected response count */
|
||||
backend->ack_write();
|
||||
router_cli_ses->expected_responses--;
|
||||
ss_dassert(router_cli_ses->expected_responses >= 0);
|
||||
rses->expected_responses--;
|
||||
ss_dassert(rses->expected_responses >= 0);
|
||||
ss_dassert(backend->get_reply_state() == REPLY_STATE_DONE);
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_DEBUG("Reply not yet complete, waiting for %d replies", router_cli_ses->expected_responses);
|
||||
MXS_DEBUG("Reply not yet complete, waiting for %d replies", rses->expected_responses);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1131,37 +1130,37 @@ static void clientReply(MXS_ROUTER *instance,
|
||||
|
||||
/** This discards all responses that have already been sent to the client */
|
||||
bool rconn = false;
|
||||
process_sescmd_response(router_cli_ses, backend, &writebuf, &rconn);
|
||||
process_sescmd_response(rses, backend, &writebuf, &rconn);
|
||||
|
||||
if (rconn && !router_inst->config().disable_sescmd_history)
|
||||
if (rconn && !rses->router->config().disable_sescmd_history)
|
||||
{
|
||||
select_connect_backend_servers(
|
||||
router_cli_ses->rses_nbackends,
|
||||
router_cli_ses->rses_config.max_slave_connections,
|
||||
router_cli_ses->rses_config.slave_selection_criteria,
|
||||
router_cli_ses->client_dcb->session,
|
||||
router_cli_ses->router,
|
||||
router_cli_ses,
|
||||
rses->rses_nbackends,
|
||||
rses->rses_config.max_slave_connections,
|
||||
rses->rses_config.slave_selection_criteria,
|
||||
rses->client_dcb->session,
|
||||
rses->router,
|
||||
rses,
|
||||
connection_type::SLAVE);
|
||||
}
|
||||
}
|
||||
|
||||
bool queue_routed = false;
|
||||
|
||||
if (router_cli_ses->expected_responses == 0)
|
||||
if (rses->expected_responses == 0)
|
||||
{
|
||||
for (SRWBackendList::iterator it = router_cli_ses->backends.begin();
|
||||
it != router_cli_ses->backends.end(); it++)
|
||||
for (SRWBackendList::iterator it = rses->backends.begin();
|
||||
it != rses->backends.end(); it++)
|
||||
{
|
||||
ss_dassert((*it)->get_reply_state() == REPLY_STATE_DONE);
|
||||
}
|
||||
|
||||
queue_routed = router_cli_ses->query_queue != NULL;
|
||||
route_stored_query(router_cli_ses);
|
||||
queue_routed = rses->query_queue != NULL;
|
||||
route_stored_query(rses);
|
||||
}
|
||||
else
|
||||
{
|
||||
ss_dassert(router_cli_ses->expected_responses > 0);
|
||||
ss_dassert(rses->expected_responses > 0);
|
||||
}
|
||||
|
||||
if (writebuf && client_dcb)
|
||||
@ -1177,37 +1176,24 @@ static void clientReply(MXS_ROUTER *instance,
|
||||
|
||||
if (backend->execute_session_command())
|
||||
{
|
||||
router_cli_ses->expected_responses++;
|
||||
rses->expected_responses++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @brief Get router capabilities (API)
|
||||
*
|
||||
* Return a bit map indicating the characteristics of this particular router.
|
||||
* In this case, the only bit set indicates that the router wants to receive
|
||||
* data for routing as whole SQL statements.
|
||||
*
|
||||
* @return RCAP_TYPE_STMT_INPUT.
|
||||
* @brief Get router capabilities
|
||||
*/
|
||||
static uint64_t getCapabilities(MXS_ROUTER* instance)
|
||||
{
|
||||
return RCAP_TYPE_STMT_INPUT | RCAP_TYPE_TRANSACTION_TRACKING | RCAP_TYPE_STMT_OUTPUT;
|
||||
}
|
||||
|
||||
/*
|
||||
* This is the end of the API functions, and the start of functions that are
|
||||
* used by the API functions and also used in other modules of the router
|
||||
* code. Their prototypes are included in rwsplit_internal.h since these
|
||||
* functions are not intended for use outside the read write split router.
|
||||
*/
|
||||
|
||||
/**
|
||||
* @brief Router error handling routine (API)
|
||||
* @brief Router error handling routine
|
||||
*
|
||||
* Error Handler routine to resolve _backend_ failures. If it succeeds then
|
||||
* Error Handler routine to resolve backend failures. If it succeeds then
|
||||
* there are enough operative backends available and connected. Otherwise it
|
||||
* fails, and session is terminated.
|
||||
*
|
||||
@ -1217,10 +1203,7 @@ static uint64_t getCapabilities(MXS_ROUTER* instance)
|
||||
* @param backend_dcb The backend DCB
|
||||
* @param action The action: ERRACT_NEW_CONNECTION or
|
||||
* ERRACT_REPLY_CLIENT
|
||||
* @param succp Result of action: true iff router can continue
|
||||
*
|
||||
* Even if succp == true connecting to new slave may have failed. succp is to
|
||||
* tell whether router has enough master/slave connections to continue work.
|
||||
* @param succp Result of action: true if router can continue
|
||||
*/
|
||||
static void handleError(MXS_ROUTER *instance,
|
||||
MXS_ROUTER_SESSION *router_session,
|
||||
@ -1250,12 +1233,9 @@ static void handleError(MXS_ROUTER *instance,
|
||||
{
|
||||
case ERRACT_NEW_CONNECTION:
|
||||
{
|
||||
/**
|
||||
* If master has lost its Master status error can't be
|
||||
* handled so that session could continue.
|
||||
*/
|
||||
if (rses->current_master && rses->current_master->dcb() == problem_dcb)
|
||||
{
|
||||
/** The connection to the master has failed */
|
||||
SERVER *srv = rses->current_master->server();
|
||||
bool can_continue = false;
|
||||
|
||||
@ -1295,48 +1275,23 @@ static void handleError(MXS_ROUTER *instance,
|
||||
}
|
||||
else if (backend)
|
||||
{
|
||||
/** Check whether problem_dcb is same as dcb of rses->target_node
|
||||
* and within READ ONLY transaction:
|
||||
* if true reset rses->target_node and close session
|
||||
*/
|
||||
if (rses->target_node &&
|
||||
(rses->target_node->dcb() == problem_dcb &&
|
||||
session_trx_is_read_only(problem_dcb->session)))
|
||||
{
|
||||
MXS_ERROR("forced_node SLAVE %s in opened READ ONLY transaction has failed:"
|
||||
" closing session",
|
||||
problem_dcb->server->unique_name);
|
||||
|
||||
/** The problem DCB is the current target of a READ ONLY transaction.
|
||||
* Reset the target and close the session. */
|
||||
rses->target_node.reset();
|
||||
*succp = false;
|
||||
break;
|
||||
}
|
||||
|
||||
/** We should reconnect only if we find a backend for this
|
||||
* DCB. If this DCB is an older DCB that has been closed,
|
||||
* we can ignore it. */
|
||||
*succp = handle_error_new_connection(inst, &rses, problem_dcb, errmsgbuf);
|
||||
}
|
||||
|
||||
if (backend)
|
||||
{
|
||||
/** This is a valid DCB for a backend ref */
|
||||
if (backend->in_use() && backend->dcb() == problem_dcb)
|
||||
{
|
||||
ss_dassert(false);
|
||||
MXS_ERROR("Backend '%s' is still in use and points to the problem DCB.",
|
||||
backend->name());
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
const char *remote = problem_dcb->state == DCB_STATE_POLLING &&
|
||||
problem_dcb->server ? problem_dcb->server->unique_name : "CLOSED";
|
||||
|
||||
MXS_ERROR("DCB connected to '%s' is not in use by the router "
|
||||
"session, not closing it. DCB is in state '%s'",
|
||||
remote, STRDCBSTATE(problem_dcb->state));
|
||||
/** Try to replace the failed connection with a new one */
|
||||
*succp = handle_error_new_connection(inst, &rses, problem_dcb, errmsgbuf);
|
||||
}
|
||||
}
|
||||
|
||||
check_and_log_backend_state(backend, problem_dcb);
|
||||
break;
|
||||
}
|
||||
|
||||
@ -1357,12 +1312,9 @@ static void handleError(MXS_ROUTER *instance,
|
||||
MXS_BEGIN_DECLS
|
||||
|
||||
/**
|
||||
* The module entry point routine. It is this routine that
|
||||
* must return the structure that is referred to as the
|
||||
* "module object", this is a structure with the set of
|
||||
* external entry points for this module.
|
||||
*
|
||||
* @return The module object
|
||||
* The module entry point routine. It is this routine that must return
|
||||
* the structure that is referred to as the "module object". This is a
|
||||
* structure with the set of external entry points for this module.
|
||||
*/
|
||||
MXS_MODULE *MXS_CREATE_MODULE()
|
||||
{
|
||||
|
Reference in New Issue
Block a user