Files
MaxScale/server/modules/routing/readwritesplit/readwritesplit.cc
Markus Mäkelä c7520a2156 Add name and uri helpers to Backend
Providing helper functions for the commonly used parts of the server makes
code easier to read. It also removes any possibility for formatting
problems by moving the URI and name string handling inside the Backend
class.
2017-06-22 10:40:18 +03:00

1440 lines
50 KiB
C++

/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "readwritesplit.hh"
#include "rwsplit_internal.hh"
#include <errno.h>
#include <inttypes.h>
#include <limits.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <new>
#include <maxscale/router.h>
#include <maxscale/log_manager.h>
#include <maxscale/query_classifier.h>
#include <maxscale/dcb.h>
#include <maxscale/spinlock.h>
#include <maxscale/modinfo.h>
#include <maxscale/modutil.h>
#include <maxscale/alloc.h>
/**
* The entry points for the read/write query splitting router module.
*
* This file contains the entry points that comprise the API to the read
* write query splitting router. It also contains functions that are
* directly called by the entry point functions. Some of these are used by
* functions in other modules of the read write split router, others are
* used only within this module.
*/
/**
 * Maximum number of slaves. This is a string literal rather than an integer.
 * Presumably it is the default of the string-valued max_slave_connections
 * parameter (see handle_max_slaves) — TODO confirm where it is referenced.
 */
#define MAX_SLAVE_COUNT "255"
/*
 * Forward declarations of functions defined later in this file: the
 * routeQuery API entry point and internal helpers used before their
 * definitions.
 */
static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *session, GWBUF *queue);
static bool rwsplit_process_router_options(ROUTER_INSTANCE *router,
                                           char **options);
static void handle_error_reply_client(MXS_SESSION *ses, ROUTER_CLIENT_SES *rses,
                                      DCB *backend_dcb, GWBUF *errmsg);
static bool handle_error_new_connection(ROUTER_INSTANCE *inst,
                                        ROUTER_CLIENT_SES **rses,
                                        DCB *backend_dcb, GWBUF *errmsg);
static bool have_enough_servers(ROUTER_CLIENT_SES *rses, const int min_nsrv,
                                int router_nsrv, ROUTER_INSTANCE *router);
/**
 * Enum values for router parameters.
 *
 * Each table maps configuration strings to enum values and is passed to
 * config_get_enum() in createInstance(). A {NULL} entry terminates each table.
 */
/** Accepted values of the use_sql_variables_in parameter */
static const MXS_ENUM_VALUE use_sql_variables_in_values[] =
{
    {"all", TYPE_ALL},
    {"master", TYPE_MASTER},
    {NULL}
};
/** Accepted values of the slave_selection_criteria parameter */
static const MXS_ENUM_VALUE slave_selection_criteria_values[] =
{
    {"LEAST_GLOBAL_CONNECTIONS", LEAST_GLOBAL_CONNECTIONS},
    {"LEAST_ROUTER_CONNECTIONS", LEAST_ROUTER_CONNECTIONS},
    {"LEAST_BEHIND_MASTER", LEAST_BEHIND_MASTER},
    {"LEAST_CURRENT_OPERATIONS", LEAST_CURRENT_OPERATIONS},
    {NULL}
};
/** Accepted values of the master_failure_mode parameter */
static const MXS_ENUM_VALUE master_failure_mode_values[] =
{
    {"fail_instantly", RW_FAIL_INSTANTLY},
    {"fail_on_write", RW_FAIL_ON_WRITE},
    {"error_on_write", RW_ERROR_ON_WRITE},
    {NULL}
};
/**
* Internal functions
*/
/**
 * @brief Get the maximum number of slave connections for a session.
 *
 * If max_slave_connections is positive, that value is used directly.
 * Otherwise the count is computed as rw_max_slave_conn_percent percent of
 * the session's backend count (rounded down). The result is raised to at
 * least 1 and then capped at rses_nbackends - 1; the cap wins when the two
 * bounds conflict.
 *
 * @param rses Router client session
 * @return Maximum number of slave connections for the session
 */
int rses_get_max_slavecount(ROUTER_CLIENT_SES *rses)
{
    int conf_max_nslaves;
    int router_nservers = rses->rses_nbackends;
    CHK_CLIENT_RSES(rses);

    if (rses->rses_config.max_slave_connections > 0)
    {
        /** An absolute slave count was configured */
        conf_max_nslaves = rses->rses_config.max_slave_connections;
    }
    else
    {
        /** Derive the count from the configured percentage (integer division) */
        conf_max_nslaves = (router_nservers * rses->rses_config.rw_max_slave_conn_percent) / 100;
    }

    /** Presumably one backend is reserved for the master, hence the "- 1" */
    return MXS_MIN(router_nservers - 1, MXS_MAX(1, conf_max_nslaves));
}
/**
 * @brief Get the maximum replication lag for this router session
 *
 * @param rses Router client session
 * @return The configured max_slave_replication_lag if it is positive,
 *         otherwise INT_MAX (i.e. effectively no limit)
 */
int rses_get_max_replication_lag(ROUTER_CLIENT_SES *rses)
{
    CHK_CLIENT_RSES(rses);

    /**
     * If there is no configured value, the largest possible int is used.
     * INT_MAX replaces the former ~(1 << 31): shifting 1 into the sign bit
     * of a signed int is undefined behaviour in C and in C++ before C++14.
     */
    if (rses->rses_config.max_slave_replication_lag > 0)
    {
        return rses->rses_config.max_slave_replication_lag;
    }

    return INT_MAX;
}
/**
 * @brief Look up the session backend that owns a backend DCB
 *
 * Walks the session's backend list and returns the entry whose DCB is
 * exactly @c dcb.
 *
 * @param rses Router client session to search
 * @param dcb  Backend DCB to look for
 *
 * @return The matching backend, or an empty SRWBackend when no backend owns
 *         the DCB (debug builds assert that this never happens)
 */
SRWBackend get_backend_from_dcb(ROUTER_CLIENT_SES *rses, DCB *dcb)
{
    ss_dassert(dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER);
    CHK_DCB(dcb);
    CHK_CLIENT_RSES(rses);

    SRWBackendList::iterator last = rses->backends.end();

    for (SRWBackendList::iterator cur = rses->backends.begin(); cur != last; ++cur)
    {
        if ((*cur)->dcb() == dcb)
        {
            return *cur;
        }
    }

    /** Every backend DCB handed to the router should belong to this session */
    ss_dassert(false);
    return SRWBackend();
}
/**
 * @brief Process the deprecated router_options of the service
 *
 * Each option has the form "name=value". Each string is split in place by
 * overwriting the '=' with a NUL byte. Malformed or unknown options are
 * logged and make the function return false. Processing still continues
 * with the remaining options.
 *
 * @param router  Router instance whose configuration is updated
 * @param options NULL-terminated array of option strings, or NULL
 * @return True on success, false if a configuration error was found
 */
static bool rwsplit_process_router_options(ROUTER_INSTANCE *router,
                                           char **options)
{
    if (options == NULL)
    {
        return true;
    }

    MXS_WARNING("Router options for readwritesplit are deprecated.");

    bool success = true;

    for (int i = 0; options[i]; i++)
    {
        char *value = strchr(options[i], '=');

        if (value == NULL)
        {
            MXS_ERROR("Unsupported router option \"%s\" for readwritesplit router.", options[i]);
            success = false;
            continue;
        }

        /** Split "name=value" in place: options[i] now holds only the name */
        *value = 0;
        value++;

        if (strcmp(options[i], "slave_selection_criteria") == 0)
        {
            select_criteria_t c = GET_SELECT_CRITERIA(value);
            ss_dassert(c == LEAST_GLOBAL_CONNECTIONS ||
                       c == LEAST_ROUTER_CONNECTIONS || c == LEAST_BEHIND_MASTER ||
                       c == LEAST_CURRENT_OPERATIONS || c == UNDEFINED_CRITERIA);

            if (c == UNDEFINED_CRITERIA)
            {
                /** Report the rejected value, not the currently configured one */
                MXS_ERROR("Unknown slave selection criteria \"%s\". "
                          "Allowed values are LEAST_GLOBAL_CONNECTIONS, "
                          "LEAST_ROUTER_CONNECTIONS, LEAST_BEHIND_MASTER, "
                          "and LEAST_CURRENT_OPERATIONS.",
                          value);
                success = false;
            }
            else
            {
                router->rwsplit_config.slave_selection_criteria = c;
            }
        }
        else if (strcmp(options[i], "max_sescmd_history") == 0)
        {
            router->rwsplit_config.max_sescmd_history = atoi(value);
        }
        else if (strcmp(options[i], "disable_sescmd_history") == 0)
        {
            router->rwsplit_config.disable_sescmd_history = config_truth_value(value);
        }
        else if (strcmp(options[i], "master_accept_reads") == 0)
        {
            router->rwsplit_config.master_accept_reads = config_truth_value(value);
        }
        else if (strcmp(options[i], "strict_multi_stmt") == 0)
        {
            router->rwsplit_config.strict_multi_stmt = config_truth_value(value);
        }
        else if (strcmp(options[i], "retry_failed_reads") == 0)
        {
            router->rwsplit_config.retry_failed_reads = config_truth_value(value);
        }
        else if (strcmp(options[i], "master_failure_mode") == 0)
        {
            if (strcasecmp(value, "fail_instantly") == 0)
            {
                router->rwsplit_config.master_failure_mode = RW_FAIL_INSTANTLY;
            }
            else if (strcasecmp(value, "fail_on_write") == 0)
            {
                router->rwsplit_config.master_failure_mode = RW_FAIL_ON_WRITE;
            }
            else if (strcasecmp(value, "error_on_write") == 0)
            {
                router->rwsplit_config.master_failure_mode = RW_ERROR_ON_WRITE;
            }
            else
            {
                MXS_ERROR("Unknown value for 'master_failure_mode': %s", value);
                success = false;
            }
        }
        else
        {
            MXS_ERROR("Unknown router option \"%s=%s\" for readwritesplit router.",
                      options[i], value);
            success = false;
        }
    }

    return success;
}
// TODO: Don't process parameters in readwritesplit
/**
 * @brief Parse the max_slave_connections parameter
 *
 * Accepts either a non-negative integer ("3") or a non-negative percentage
 * ("50%"). An integer sets max_slave_connections and clears
 * rw_max_slave_conn_percent. A percentage does the opposite.
 *
 * @param router Router instance whose configuration is updated
 * @param str    Parameter value (assumed non-NULL)
 * @return True if the value was valid, false (with an error logged) otherwise
 */
static bool handle_max_slaves(ROUTER_INSTANCE *router, const char *str)
{
    char *endptr;
    errno = 0;
    long val = strtol(str, &endptr, 10);

    /**
     * Reject values that contain no digits (e.g. "" or "%"), negative values
     * and values that do not fit in an int. Previously these were silently
     * accepted as 0 or truncated.
     */
    bool is_number = endptr != str && errno == 0 && val >= 0 && val <= INT_MAX;

    if (is_number && *endptr == '%' && *(endptr + 1) == '\0')
    {
        router->rwsplit_config.rw_max_slave_conn_percent = (int)val;
        router->rwsplit_config.max_slave_connections = 0;
        return true;
    }

    if (is_number && *endptr == '\0')
    {
        router->rwsplit_config.max_slave_connections = (int)val;
        router->rwsplit_config.rw_max_slave_conn_percent = 0;
        return true;
    }

    MXS_ERROR("Invalid value for 'max_slave_connections': %s", str);
    return false;
}
/**
 * @brief Close a failed backend and forward the error to the client
 *
 * The backend that owns @c backend_dcb is closed unconditionally. The error
 * message is forwarded only if the session was in the router-ready state.
 * A clone of @c errmsg is written, so the caller keeps ownership of the
 * original buffer.
 *
 * @param ses         Session
 * @param rses        Router session
 * @param backend_dcb DCB for the backend server that has failed
 * @param errmsg      GWBUF containing the error message
 */
static void handle_error_reply_client(MXS_SESSION *ses, ROUTER_CLIENT_SES *rses,
                                      DCB *backend_dcb, GWBUF *errmsg)
{
    /** Read the session state and client DCB before the backend is closed */
    mxs_session_state_t state_before_close = ses->state;
    DCB *client = ses->client_dcb;

    SRWBackend failed = get_backend_from_dcb(rses, backend_dcb);
    failed->close();

    if (state_before_close != SESSION_STATE_ROUTER_READY)
    {
        return;
    }

    CHK_DCB(client);
    client->func.write(client, gwbuf_clone(errmsg));
}
/**
 * @brief Retry a stored statement on another server after a backend failure
 *
 * Only attempted when no transaction is active. The first in-use slave
 * (other than @c old) that accepts the write is used. If none does, the
 * current master is tried. On success, the target's reply state is set to
 * REPLY_STATE_START and the session's expected response count is increased.
 *
 * NOTE(review): the same @c stored buffer is passed to the master if a slave
 * write fails. Whether Backend::write() takes ownership of the buffer on
 * failure is not visible here — confirm this reuse is safe.
 *
 * @param rses   Router client session
 * @param old    The backend that failed; it is skipped
 * @param stored The statement to retry
 * @return True if the statement was written to some backend
 */
static bool reroute_stored_statement(ROUTER_CLIENT_SES *rses, const SRWBackend& old, GWBUF *stored)
{
    bool success = false;
    if (!session_trx_is_active(rses->client_dcb->session))
    {
        /**
         * Only retry the read when we are outside of a transaction.
         * NOTE(review): an older comment also mentioned autocommit; whether
         * session_trx_is_active() covers autocommit=0 is not visible here.
         */
        for (SRWBackendList::iterator it = rses->backends.begin();
             it != rses->backends.end(); it++)
        {
            SRWBackend& backend = *it;
            if (backend->in_use() && backend != old &&
                !backend->is_master() &&
                backend->is_slave())
            {
                /** Found a valid candidate; a non-master slave that's in use */
                if (backend->write(stored))
                {
                    MXS_INFO("Retrying failed read at '%s'.", backend->name());
                    ss_dassert(backend->get_reply_state() == REPLY_STATE_DONE);
                    LOG_RS(backend, REPLY_STATE_START);
                    backend->set_reply_state(REPLY_STATE_START);
                    /** One more reply is now outstanding for this session */
                    rses->expected_responses++;
                    success = true;
                    break;
                }
            }
        }
        if (!success && rses->current_master && rses->current_master->in_use())
        {
            /**
             * Either we failed to write to the slave or no valid slave was found.
             * Try to retry the read on the master.
             */
            if (rses->current_master->write(stored))
            {
                MXS_INFO("Retrying failed read at '%s'.", rses->current_master->name());
                LOG_RS(rses->current_master, REPLY_STATE_START);
                ss_dassert(rses->current_master->get_reply_state() == REPLY_STATE_DONE);
                rses->current_master->set_reply_state(REPLY_STATE_START);
                rses->expected_responses++;
                success = true;
            }
        }
    }
    return success;
}
/**
 * Handle the failure of a backend connection and try to replace it.
 *
 * If the failed backend was waiting for a result, the stored statement is
 * rerouted to another server when possible. Otherwise the expected response
 * count is decremented. If the failed command was a regular query (not a
 * session command), an error is also sent to the client. The failed backend
 * is then closed. Finally, either the remaining servers are checked or new
 * slave connections are opened, depending on disable_sescmd_history.
 *
 * This must be called with router lock.
 *
 * @param inst        router instance
 * @param rses        router client session
 * @param backend_dcb failed DCB
 * @param errmsg      error message which is sent to client if it is waiting
 *
 * @return true if there are enough backend connections to continue, false if
 * not
 */
static bool handle_error_new_connection(ROUTER_INSTANCE *inst,
                                        ROUTER_CLIENT_SES **rses,
                                        DCB *backend_dcb, GWBUF *errmsg)
{
    ROUTER_CLIENT_SES *myrses = *rses;
    SRWBackend backend = get_backend_from_dcb(myrses, backend_dcb);
    MXS_SESSION* ses = backend_dcb->session;
    CHK_SESSION(ses);

    if (backend->is_waiting_result())
    {
        /**
         * A query was sent through the backend and it is waiting for a reply.
         * Try to reroute the statement to a working server or send an error
         * to the client.
         */
        GWBUF *stored = NULL;
        const SERVER *target = NULL;

        if (!session_take_stmt(backend_dcb->session, &stored, &target) ||
            target != backend->backend()->server ||
            !reroute_stored_statement(myrses, backend, stored))
        {
            /**
             * We failed to route the stored statement or no statement was
             * stored for this server. Either way we can safely free the buffer
             * and decrement the expected response count.
             */
            gwbuf_free(stored);
            myrses->expected_responses--;

            if (backend->session_command_count() == 0)
            {
                /**
                 * The backend was executing a regular query whose result the
                 * client is waiting for. Send an error so the client is not
                 * left without a reply. Session commands are not answered
                 * here: they run on several backends and, presumably, only
                 * one reply is forwarded (see process_sescmd_response). An
                 * extra error packet would then desynchronize the client.
                 */
                DCB *client_dcb = ses->client_dcb;
                client_dcb->func.write(client_dcb, gwbuf_clone(errmsg));
            }

            if (myrses->expected_responses == 0)
            {
                /**
                 * The response from this server was the last one, try to
                 * route any stored queries
                 */
                route_stored_query(myrses);
            }
        }
    }

    /** Close the current connection */
    backend->close();

    int max_nslaves = rses_get_max_slavecount(myrses);
    bool succp;

    /**
     * Try to get replacement slave or at least the minimum
     * number of slave connections for router session.
     */
    if (inst->rwsplit_config.disable_sescmd_history)
    {
        succp = have_enough_servers(myrses, 1, myrses->rses_nbackends, inst);
    }
    else
    {
        succp = select_connect_backend_servers(myrses->rses_nbackends, max_nslaves,
                                               myrses->rses_config.slave_selection_criteria,
                                               ses, inst, myrses,
                                               connection_type::SLAVE);
    }

    return succp;
}
/**
 * @brief Calculate whether we have enough servers to route a query
 *
 * There are enough servers when the total server count and the configured
 * slave connection limit both reach @c min_nsrv. The limit is the larger of
 * max_slave_connections and rw_max_slave_conn_percent percent of the server
 * count. When the check fails, an error describing the cause is logged.
 *
 * @param rses        Router session
 * @param min_nsrv    Minimum number of servers that is sufficient
 * @param router_nsrv Actual number of servers
 * @param router      Router instance
 *
 * @return True if there are enough servers, false otherwise
 */
static bool have_enough_servers(ROUTER_CLIENT_SES *rses, const int min_nsrv,
                                int router_nsrv, ROUTER_INSTANCE *router)
{
    int max_conn = rses->rses_config.max_slave_connections;
    int pct = rses->rses_config.rw_max_slave_conn_percent;

    /** Number of connections allowed by the percentage setting. Multiply
     * before dividing; the old "pct / 100" always truncated to 0 below 100%. */
    int pct_nsrv = (router_nsrv * pct) / 100;

    if (router_nsrv >= min_nsrv && MXS_MAX(max_conn, pct_nsrv) >= min_nsrv)
    {
        return true;
    }

    /** With too few servers session is not created */
    if (router_nsrv < min_nsrv)
    {
        MXS_ERROR("Unable to start %s service. There are "
                  "too few backend servers available. Found %d "
                  "when %d is required.",
                  router->service->name, router_nsrv, min_nsrv);
    }
    else if (pct > 0)
    {
        /** The limit was configured as a percentage and it yields too few servers */
        double dbgpct = ((double)min_nsrv / (double)router_nsrv) * 100.0;
        MXS_ERROR("Unable to start %s service. There are "
                  "too few backend servers configured in "
                  "MaxScale.cnf. Found %d%% when at least %.0f%% "
                  "would be required.",
                  router->service->name, pct, dbgpct);
    }
    else
    {
        /** The limit was configured as an absolute count that is too small */
        MXS_ERROR("Unable to start %s service. There are "
                  "too few backend servers configured in "
                  "MaxScale.cnf. Found %d when %d is required.",
                  router->service->name, max_conn, min_nsrv);
    }

    return false;
}
/**
 * @brief Route the next queued query of a session
 *
 * Takes the first MySQL packet from the session's query queue, makes it
 * contiguous and routes it with routeQuery(). The rest of the queue is
 * detached during the call and restored afterwards.
 *
 * @param rses Router client session
 * @return True if the queue was empty or the query was routed successfully,
 *         false if routing failed
 */
bool route_stored_query(ROUTER_CLIENT_SES *rses)
{
    bool rval = true;

    if (rses->query_queue)
    {
        GWBUF* query_queue = modutil_get_next_MySQL_packet(&rses->query_queue);
        query_queue = gwbuf_make_contiguous(query_queue);

        /** Store the query queue locally for the duration of the routeQuery call.
         * This prevents recursive calls into this function. */
        GWBUF *temp_storage = rses->query_queue;
        rses->query_queue = NULL;

        /**
         * routeQuery() always takes ownership of the buffer: it either frees
         * it before returning (also on failure) or appends it to the query
         * queue. The buffer must not be read or freed after this call.
         * Previously the failure path read and freed it again, causing a
         * use-after-free and a double free.
         */
        if (!routeQuery((MXS_ROUTER*)rses->router, (MXS_ROUTER_SESSION*)rses, query_queue))
        {
            rval = false;
            MXS_ERROR("Failed to route queued query.");
        }

        ss_dassert(rses->query_queue == NULL);
        rses->query_queue = temp_storage;
    }

    return rval;
}
/**
 * @brief Check if we have received a complete reply from the backend
 *
 * Advances the backend's reply state machine based on the contents of
 * @c buffer and the current command of the session:
 * - In REPLY_STATE_START with a non-result-set packet, the reply is done.
 *   The exception is when mxs_mysql_more_results_after_ok() reports that
 *   more results follow; COM_STMT_PREPARE responses are always treated as
 *   done here.
 * - Otherwise the EOF/signal packets are counted to track column
 *   definitions (RSET_COLDEF), rows (RSET_ROWS) and completion (DONE). If
 *   more result sets follow, the state returns to START.
 *
 * @param backend Backend reference
 * @param buffer Buffer containing the response
 *
 * @return True if the complete response has been received
 */
bool reply_is_complete(SRWBackend backend, GWBUF *buffer)
{
    mysql_server_cmd_t cmd = mxs_mysql_current_command(backend->dcb()->session);
    if (backend->get_reply_state() == REPLY_STATE_START && !mxs_mysql_is_result_set(buffer))
    {
        if (cmd == MYSQL_COM_STMT_PREPARE || !mxs_mysql_more_results_after_ok(buffer))
        {
            /** Not a result set, we have the complete response */
            LOG_RS(backend, REPLY_STATE_DONE);
            backend->set_reply_state(REPLY_STATE_DONE);
        }
    }
    else
    {
        bool more = false;
        /** If the column-definition EOF was already seen, presumably the
         * count below starts from 1 — TODO confirm against
         * modutil_count_signal_packets() */
        int old_eof = backend->get_reply_state() == REPLY_STATE_RSET_ROWS ? 1 : 0;
        int n_eof = modutil_count_signal_packets(buffer, old_eof, &more);
        if (n_eof == 0)
        {
            /** Waiting for the EOF packet after the column definitions */
            LOG_RS(backend, REPLY_STATE_RSET_COLDEF);
            backend->set_reply_state(REPLY_STATE_RSET_COLDEF);
        }
        else if (n_eof == 1 && cmd != MYSQL_COM_FIELD_LIST)
        {
            /** Waiting for the EOF packet after the rows */
            LOG_RS(backend, REPLY_STATE_RSET_ROWS);
            backend->set_reply_state(REPLY_STATE_RSET_ROWS);
        }
        else
        {
            /** We either have a complete result set or a response to
             * a COM_FIELD_LIST command */
            ss_dassert(n_eof == 2 || (n_eof == 1 && cmd == MYSQL_COM_FIELD_LIST));
            LOG_RS(backend, REPLY_STATE_DONE);
            backend->set_reply_state(REPLY_STATE_DONE);
            if (more)
            {
                /** The server will send more resultsets */
                LOG_RS(backend, REPLY_STATE_START);
                backend->set_reply_state(REPLY_STATE_START);
            }
        }
    }
    return backend->get_reply_state() == REPLY_STATE_DONE;
}
/**
 * @brief Close every backend connection of a session that is still in use
 *
 * Backends that are not in use are left untouched.
 *
 * @param rses Router client session
 */
void close_all_connections(ROUTER_CLIENT_SES* rses)
{
    SRWBackendList& backends = rses->backends;

    for (SRWBackendList::iterator pos = backends.begin(); pos != backends.end(); ++pos)
    {
        if (!(*pos)->in_use())
        {
            continue;
        }

        (*pos)->close();
    }
}
/**
* API function definitions
*/
/**
 * @brief Create an instance of the read/write router (API).
 *
 * Create an instance of read/write statement router within the MaxScale. One
 * instance of the router is required for each service that is defined in the
 * configuration as using this router. One instance of the router will handle
 * multiple connections (or router sessions).
 *
 * The configuration is read from the service parameters first. The
 * deprecated router options, if any, are applied on top of it.
 *
 * @param service The service this router is being created for
 * @param options The options for this query router
 * @return NULL in failure, pointer to router in success.
 */
static MXS_ROUTER *createInstance(SERVICE *service, char **options)
{
    ROUTER_INSTANCE* router = new (std::nothrow) ROUTER_INSTANCE;

    if (router == NULL)
    {
        return NULL;
    }

    router->service = service;

    /*
     * Until we know otherwise assume we have some available slaves.
     */
    router->available_slaves = true;

    /** Every setting below is taken from the service parameters. The former
     * hard-coded master_failure_mode default was a dead store, always
     * overwritten by config_get_enum(), and has been removed. */
    MXS_CONFIG_PARAMETER *params = service->svc_config_param;

    router->rwsplit_config.use_sql_variables_in =
        (mxs_target_t)config_get_enum(params, "use_sql_variables_in",
                                      use_sql_variables_in_values);

    router->rwsplit_config.slave_selection_criteria =
        (select_criteria_t)config_get_enum(params, "slave_selection_criteria",
                                           slave_selection_criteria_values);

    router->rwsplit_config.master_failure_mode =
        (enum failure_mode)config_get_enum(params, "master_failure_mode",
                                           master_failure_mode_values);

    router->rwsplit_config.max_slave_replication_lag = config_get_integer(params, "max_slave_replication_lag");
    router->rwsplit_config.retry_failed_reads = config_get_bool(params, "retry_failed_reads");
    router->rwsplit_config.strict_multi_stmt = config_get_bool(params, "strict_multi_stmt");
    router->rwsplit_config.disable_sescmd_history = config_get_bool(params, "disable_sescmd_history");
    router->rwsplit_config.max_sescmd_history = config_get_integer(params, "max_sescmd_history");
    router->rwsplit_config.master_accept_reads = config_get_bool(params, "master_accept_reads");
    router->rwsplit_config.connection_keepalive = config_get_integer(params, "connection_keepalive");

    if (!handle_max_slaves(router, config_get_string(params, "max_slave_connections")) ||
        (options && !rwsplit_process_router_options(router, options)))
    {
        delete router;
        return NULL;
    }

    /** These options cancel each other out */
    if (router->rwsplit_config.disable_sescmd_history &&
        router->rwsplit_config.max_sescmd_history > 0)
    {
        router->rwsplit_config.max_sescmd_history = 0;
    }

    return (MXS_ROUTER*)router;
}
/**
 * @brief Associate a new session with this instance of the router (API).
 *
 * Allocates the router session for a new client connection. It initializes
 * the tracking state, copies the router configuration into the session and
 * creates a backend object for every active server of the service. It then
 * connects to the master and slaves. If there are too few servers or the
 * connections cannot be established, the session is destroyed and NULL is
 * returned.
 *
 * @param router_inst The router instance data
 * @param session     The MaxScale session (generic connection data)
 * @return Session specific data for this session, i.e. a router session,
 *         or NULL on failure
 */
static MXS_ROUTER_SESSION *newSession(MXS_ROUTER *router_inst, MXS_SESSION *session)
{
    ROUTER_INSTANCE* router = (ROUTER_INSTANCE*)router_inst;
    ROUTER_CLIENT_SES* rses = new (std::nothrow) ROUTER_CLIENT_SES;

    if (!rses)
    {
        return NULL;
    }

    /** Identity and ownership */
    rses->rses_chk_top = CHK_NUM_ROUTER_SES;
    rses->rses_chk_tail = CHK_NUM_ROUTER_SES;
    rses->rses_closed = false;
    rses->router = router;
    rses->client_dcb = session->client_dcb;

    /** Query and session command tracking */
    rses->have_tmp_tables = false;
    rses->expected_responses = 0;
    rses->query_queue = NULL;
    rses->load_data_state = LOAD_DATA_INACTIVE;
    rses->sent_sescmd = 0;
    rses->recv_sescmd = 0;
    rses->sescmd_count = 1; // Needs to be a positive number to work

    /** Each session works on its own copy of the router configuration */
    memcpy(&rses->rses_config, &router->rwsplit_config, sizeof(rses->rses_config));

    const int min_nservers = 1; /*< hard-coded for now */
    int nservers = router->service->n_dbref;
    bool ok = have_enough_servers(rses, min_nservers, nservers, router);

    if (ok)
    {
        for (SERVER_REF *ref = router->service->dbref; ref != NULL; ref = ref->next)
        {
            if (ref->active)
            {
                rses->backends.push_back(SRWBackend(new RWBackend(ref)));
            }
        }

        rses->rses_nbackends = nservers; /*< # of backend servers */

        /**
         * Master and at least <min_nslaves> slaves must be found if the router is
         * in the strict mode. If sessions without master are allowed, only
         * <min_nslaves> slaves must be found.
         */
        int slave_limit = rses_get_max_slavecount(rses);
        ok = select_connect_backend_servers(nservers, slave_limit,
                                            rses->rses_config.slave_selection_criteria,
                                            session, router, rses,
                                            connection_type::ALL);
    }

    if (!ok)
    {
        delete rses;
        return NULL;
    }

    if (rses->rses_config.rw_max_slave_conn_percent)
    {
        /** Turn the percentage into a concrete slave count of at least one */
        double fraction = (double)rses->rses_config.rw_max_slave_conn_percent / 100.0;
        int slave_count = MXS_MAX(floor((double)rses->rses_nbackends * fraction), 1);
        rses->rses_config.max_slave_connections = slave_count;
    }

    router->stats.n_sessions += 1;

    return (MXS_ROUTER_SESSION*)rses;
}
/**
 * @brief Close a router session (API).
 *
 * Marks the router session as closed and closes all of its backend
 * connections. At info log level, the executed session commands are also
 * logged. Calling this on an already closed session does nothing. Releasing
 * the memory is left to freeSession.
 *
 * @param instance       The router instance data
 * @param router_session The router session being closed
 */
static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session)
{
    ROUTER_CLIENT_SES *rses = (ROUTER_CLIENT_SES *)router_session;
    CHK_CLIENT_RSES(rses);

    if (rses->rses_closed)
    {
        return;
    }

    /**
     * @c rses_closed is checked at the start of every API function to quickly
     * stop the processing of closed sessions.
     */
    rses->rses_closed = true;
    close_all_connections(rses);

    if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO) &&
        rses->sescmd_list.size())
    {
        std::string history;

        for (mxs::SessionCommandList::iterator cmd = rses->sescmd_list.begin();
             cmd != rses->sescmd_list.end(); ++cmd)
        {
            history += (*cmd)->to_string();
            history += "\n";
        }

        MXS_INFO("Executed session commands:\n%s", history.c_str());
    }
}
/**
 * @brief Free a router session (API).
 *
 * Destroys the router session object. It is meant to be called after
 * closeSession has closed the session.
 *
 * @param router_instance       The router instance the session belongs to
 * @param router_client_session Client session to destroy
 */
static void freeSession(MXS_ROUTER *router_instance, MXS_ROUTER_SESSION *router_client_session)
{
    delete (ROUTER_CLIENT_SES *)router_client_session;
}
/**
 * @brief The main routing entry point for a query (API)
 *
 * The routeQuery function will make the routing decision based on the contents
 * of the instance, session and the query itself. The query always represents
 * a complete MariaDB/MySQL packet because we define the RCAP_TYPE_STMT_INPUT in
 * getCapabilities().
 *
 * If no query is in progress (or LOAD DATA is active), the query is routed
 * immediately. Otherwise it is appended to the session's query queue until
 * the outstanding replies arrive. The function always takes ownership of
 * @c querybuf: it is either queued or freed before returning.
 *
 * @param instance       Router instance
 * @param router_session Router session associated with the client
 * @param querybuf       Buffer containing the query
 * @return 1 on success, 0 on error
 */
static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *querybuf)
{
    ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *) instance;
    ROUTER_CLIENT_SES *rses = (ROUTER_CLIENT_SES *) router_session;
    int rval = 0;

    CHK_CLIENT_RSES(rses);

    if (rses->rses_closed)
    {
        closed_session_reply(querybuf);
    }
    else
    {
        if ((rses->expected_responses == 0 && rses->query_queue == NULL) ||
            rses->load_data_state == LOAD_DATA_ACTIVE)
        {
            /** No active or pending queries */
            if (route_single_stmt(inst, rses, querybuf))
            {
                rval = 1;
            }
        }
        else
        {
            ss_dassert(rses->expected_responses || rses->query_queue);
            /** We are already processing a request from the client. Store the
             * new query and wait for the previous one to complete. The casts
             * make the arguments match the conversion specifiers whatever the
             * exact return type of gwbuf_length() is. */
            MXS_DEBUG("Storing query (len: %lu cmd: %02x), expecting %d replies",
                      (unsigned long)gwbuf_length(querybuf),
                      (unsigned int)GWBUF_DATA(querybuf)[4],
                      rses->expected_responses);
            rses->query_queue = gwbuf_append(rses->query_queue, querybuf);
            /** The queue now owns the buffer */
            querybuf = NULL;
            rval = 1;

            if (rses->expected_responses == 0 && !route_stored_query(rses))
            {
                rval = 0;
            }
        }
    }

    if (querybuf != NULL)
    {
        gwbuf_free(querybuf);
    }

    return rval;
}
/**
 * @brief Diagnostics routine (API)
 *
 * Print the router configuration, query router statistics and, if the
 * service has a weighting parameter, the per-server connection distribution
 * to the DCB passed in.
 *
 * @param instance The router instance
 * @param dcb      The DCB for diagnostic output
 */
static void diagnostics(MXS_ROUTER *instance, DCB *dcb)
{
    ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance;
    const char *weightby = serviceGetWeightingParameter(router->service);
    double master_pct = 0.0, slave_pct = 0.0, all_pct = 0.0;

    dcb_printf(dcb, "\n");
    dcb_printf(dcb, "\tuse_sql_variables_in: %s\n",
               mxs_target_to_str(router->rwsplit_config.use_sql_variables_in));
    dcb_printf(dcb, "\tslave_selection_criteria: %s\n",
               select_criteria_to_str(router->rwsplit_config.slave_selection_criteria));
    dcb_printf(dcb, "\tmaster_failure_mode: %s\n",
               failure_mode_to_str(router->rwsplit_config.master_failure_mode));
    dcb_printf(dcb, "\tmax_slave_replication_lag: %d\n",
               router->rwsplit_config.max_slave_replication_lag);
    dcb_printf(dcb, "\tretry_failed_reads: %s\n",
               router->rwsplit_config.retry_failed_reads ? "true" : "false");
    dcb_printf(dcb, "\tstrict_multi_stmt: %s\n",
               router->rwsplit_config.strict_multi_stmt ? "true" : "false");
    dcb_printf(dcb, "\tdisable_sescmd_history: %s\n",
               router->rwsplit_config.disable_sescmd_history ? "true" : "false");
    /** Cast so the argument always matches the PRIu64 conversion; "%lu" was
     * only correct if the field happened to be an unsigned long */
    dcb_printf(dcb, "\tmax_sescmd_history: %" PRIu64 "\n",
               (uint64_t)router->rwsplit_config.max_sescmd_history);
    dcb_printf(dcb, "\tmaster_accept_reads: %s\n",
               router->rwsplit_config.master_accept_reads ? "true" : "false");
    dcb_printf(dcb, "\n");

    if (router->stats.n_queries > 0)
    {
        master_pct = ((double)router->stats.n_master / (double)router->stats.n_queries) * 100.0;
        slave_pct = ((double)router->stats.n_slave / (double)router->stats.n_queries) * 100.0;
        all_pct = ((double)router->stats.n_all / (double)router->stats.n_queries) * 100.0;
    }

    dcb_printf(dcb, "\tNumber of router sessions: %" PRIu64 "\n",
               router->stats.n_sessions);
    dcb_printf(dcb, "\tCurrent no. of router sessions: %d\n",
               router->service->stats.n_current);
    dcb_printf(dcb, "\tNumber of queries forwarded: %" PRIu64 "\n",
               router->stats.n_queries);
    dcb_printf(dcb, "\tNumber of queries forwarded to master: %" PRIu64 " (%.2f%%)\n",
               router->stats.n_master, master_pct);
    dcb_printf(dcb, "\tNumber of queries forwarded to slave: %" PRIu64 " (%.2f%%)\n",
               router->stats.n_slave, slave_pct);
    dcb_printf(dcb, "\tNumber of queries forwarded to all: %" PRIu64 " (%.2f%%)\n",
               router->stats.n_all, all_pct);

    if (*weightby)
    {
        dcb_printf(dcb, "\tConnection distribution based on %s "
                   "server parameter.\n",
                   weightby);
        dcb_printf(dcb, "\t\tServer Target %% Connections "
                   "Operations\n");
        dcb_printf(dcb, "\t\t Global Router\n");

        for (SERVER_REF *ref = router->service->dbref; ref; ref = ref->next)
        {
            dcb_printf(dcb, "\t\t%-20s %3.1f%% %-6d %-6d %d\n",
                       ref->server->unique_name, (float)ref->weight / 10,
                       ref->server->stats.n_current, ref->connections,
                       ref->server->stats.n_current_ops);
        }
    }
}
/**
 * @brief JSON diagnostics routine (API)
 *
 * Build a JSON object with the router configuration and query routing
 * statistics. If the service has a weighting parameter, it is included
 * under the "weightby" key.
 *
 * @param instance The router instance
 * @return A new JSON object owned by the caller
 */
static json_t* diagnostics_json(const MXS_ROUTER *instance)
{
    /** Nothing is modified here, so keep the constness of the argument */
    const ROUTER_INSTANCE *router = (const ROUTER_INSTANCE *)instance;
    json_t* rval = json_object();

    json_object_set_new(rval, "use_sql_variables_in",
                        json_string(mxs_target_to_str(router->rwsplit_config.use_sql_variables_in)));
    json_object_set_new(rval, "slave_selection_criteria",
                        json_string(select_criteria_to_str(router->rwsplit_config.slave_selection_criteria)));
    json_object_set_new(rval, "master_failure_mode",
                        json_string(failure_mode_to_str(router->rwsplit_config.master_failure_mode)));
    json_object_set_new(rval, "max_slave_replication_lag",
                        json_integer(router->rwsplit_config.max_slave_replication_lag));
    json_object_set_new(rval, "retry_failed_reads",
                        json_boolean(router->rwsplit_config.retry_failed_reads));
    json_object_set_new(rval, "strict_multi_stmt",
                        json_boolean(router->rwsplit_config.strict_multi_stmt));
    json_object_set_new(rval, "disable_sescmd_history",
                        json_boolean(router->rwsplit_config.disable_sescmd_history));
    json_object_set_new(rval, "max_sescmd_history",
                        json_integer(router->rwsplit_config.max_sescmd_history));
    json_object_set_new(rval, "master_accept_reads",
                        json_boolean(router->rwsplit_config.master_accept_reads));

    json_object_set_new(rval, "connections", json_integer(router->stats.n_sessions));
    json_object_set_new(rval, "current_connections", json_integer(router->service->stats.n_current));
    json_object_set_new(rval, "queries", json_integer(router->stats.n_queries));
    json_object_set_new(rval, "route_master", json_integer(router->stats.n_master));
    json_object_set_new(rval, "route_slave", json_integer(router->stats.n_slave));
    json_object_set_new(rval, "route_all", json_integer(router->stats.n_all));

    const char *weightby = serviceGetWeightingParameter(router->service);

    if (*weightby)
    {
        json_object_set_new(rval, "weightby", json_string(weightby));
    }

    return rval;
}
/**
 * @brief Client Reply routine (API)
 *
 * Handles a reply packet from a backend. It updates the backend's reply
 * state and the session's expected response count, and processes session
 * command responses (possibly reconnecting slaves). Once no replies are
 * outstanding, it routes the next queued query. It then forwards the reply
 * to the client or starts the backend's next pending session command.
 *
 * @param instance       The router instance
 * @param router_session The router session
 * @param writebuf       The GWBUF with reply data
 * @param backend_dcb    The backend DCB
 */
static void clientReply(MXS_ROUTER *instance,
                        MXS_ROUTER_SESSION *router_session,
                        GWBUF *writebuf,
                        DCB *backend_dcb)
{
    ROUTER_CLIENT_SES *router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
    ROUTER_INSTANCE *router_inst = (ROUTER_INSTANCE *)instance;
    DCB *client_dcb = backend_dcb->session->client_dcb;
    CHK_CLIENT_RSES(router_cli_ses);
    /**
     * Replies that arrive after the router session was closed are discarded.
     * No lock is taken here.
     */
    if (router_cli_ses->rses_closed)
    {
        gwbuf_free(writebuf);
        return;
    }
    /**
     * Find the backend that sent the reply. The stored statement used for
     * retrying failed reads is no longer needed once a reply arrives.
     */
    SRWBackend backend = get_backend_from_dcb(router_cli_ses, backend_dcb);
    /** Statement was successfully executed, free the stored statement */
    session_clear_stmt(backend_dcb->session);
    ss_dassert(backend->get_reply_state() != REPLY_STATE_DONE);
    if (reply_is_complete(backend, writebuf))
    {
        /** Got a complete reply, acknowledge the write decrement expected response count */
        backend->ack_write();
        router_cli_ses->expected_responses--;
        ss_dassert(router_cli_ses->expected_responses >= 0);
        ss_dassert(backend->get_reply_state() == REPLY_STATE_DONE);
    }
    else
    {
        MXS_DEBUG("Reply not yet complete, waiting for %d replies", router_cli_ses->expected_responses);
    }
    /**
     * If the backend has queued session commands, this reply belongs to a
     * session command.
     */
    if (backend->session_command_count())
    {
        check_session_command_reply(writebuf, backend);
        /** This discards all responses that have already been sent to the client */
        bool rconn = false;
        process_sescmd_response(router_cli_ses, backend, &writebuf, &rconn);
        /** Reconnect slaves if requested, unless session command history is disabled */
        if (rconn && !router_inst->rwsplit_config.disable_sescmd_history)
        {
            select_connect_backend_servers(
                router_cli_ses->rses_nbackends,
                router_cli_ses->rses_config.max_slave_connections,
                router_cli_ses->rses_config.slave_selection_criteria,
                router_cli_ses->client_dcb->session,
                router_cli_ses->router,
                router_cli_ses,
                connection_type::SLAVE);
        }
    }
    bool queue_routed = false;
    if (router_cli_ses->expected_responses == 0)
    {
        /** Debug check: with no outstanding replies every backend must be idle */
        for (SRWBackendList::iterator it = router_cli_ses->backends.begin();
             it != router_cli_ses->backends.end(); it++)
        {
            ss_dassert((*it)->get_reply_state() == REPLY_STATE_DONE);
        }
        queue_routed = router_cli_ses->query_queue != NULL;
        route_stored_query(router_cli_ses);
    }
    else
    {
        ss_dassert(router_cli_ses->expected_responses > 0);
    }
    /**
     * NOTE(review): if writebuf is non-NULL but client_dcb is NULL, writebuf
     * is neither routed nor freed in this function — possible leak; confirm.
     */
    if (writebuf && client_dcb)
    {
        /** Write reply to client DCB */
        MXS_SESSION_ROUTE_REPLY(backend_dcb->session, writebuf);
    }
    /** Check pending session commands */
    else if (!queue_routed && backend->session_command_count())
    {
        MXS_INFO("Backend %s processed reply and starts to execute active cursor.",
                 backend->uri());
        if (backend->execute_session_command())
        {
            router_cli_ses->expected_responses++;
        }
    }
}
/**
 * @brief Report the router's capabilities (API)
 *
 * Returns the capability bit mask of the read/write split router. The
 * RCAP_TYPE_STMT_INPUT bit requests that data for routing arrives as whole
 * SQL statements. The RCAP_TYPE_TRANSACTION_TRACKING and
 * RCAP_TYPE_STMT_OUTPUT bits are also set.
 *
 * @param instance The router instance (not consulted)
 *
 * @return RCAP_TYPE_STMT_INPUT | RCAP_TYPE_TRANSACTION_TRACKING | RCAP_TYPE_STMT_OUTPUT
 */
static uint64_t getCapabilities(MXS_ROUTER* instance)
{
    const uint64_t capabilities = RCAP_TYPE_STMT_INPUT |
                                  RCAP_TYPE_TRANSACTION_TRACKING |
                                  RCAP_TYPE_STMT_OUTPUT;
    return capabilities;
}
/*
 * This is the end of the API functions, and the start of functions that are
 * used by the API functions and also used in other modules of the router
 * code. Their prototypes are included in rwsplit_internal.hh since these
 * functions are not intended for use outside the read write split router.
 */
/**
 * @brief Router error handling routine (API)
 *
 * Error handler routine to resolve _backend_ failures. If it succeeds then
 * there are enough operative backends available and connected. Otherwise it
 * fails, and the session is terminated.
 *
 * @param instance       The router instance
 * @param router_session The router session
 * @param errmsgbuf      The error message to reply
 * @param problem_dcb    The backend DCB on which the error occurred
 * @param action         The action: ERRACT_NEW_CONNECTION or
 *                       ERRACT_REPLY_CLIENT
 * @param succp          Result of action: true iff router can continue
 *
 * Even if succp == true, connecting to a new slave may have failed. succp
 * tells whether the router has enough master/slave connections to continue.
 *
 * NOTE(review): for ERRACT_NEW_CONNECTION, when problem_dcb is neither the
 * current master's DCB nor owned by a backend of this session, *succp is not
 * written at all; the caller sees whatever value it initialized it with.
 */
static void handleError(MXS_ROUTER *instance,
MXS_ROUTER_SESSION *router_session,
GWBUF *errmsgbuf,
DCB *problem_dcb,
mxs_error_action_t action,
bool *succp)
{
ss_dassert(problem_dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER);
ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *)instance;
ROUTER_CLIENT_SES *rses = (ROUTER_CLIENT_SES *)router_session;
CHK_CLIENT_RSES(rses);
CHK_DCB(problem_dcb);
/** A closed router session cannot recover: report failure right away. */
if (rses->rses_closed)
{
*succp = false;
return;
}
MXS_SESSION *session = problem_dcb->session;
ss_dassert(session);
/** The code below allows this to be empty (presumably: no backend of this
 * session owns problem_dcb), so every use is guarded by a check. */
SRWBackend backend = get_backend_from_dcb(rses, problem_dcb);
switch (action)
{
case ERRACT_NEW_CONNECTION:
{
/**
* If master has lost its Master status error can't be
* handled so that session could continue.
*/
if (rses->current_master && rses->current_master->dcb() == problem_dcb)
{
SERVER *srv = rses->current_master->server();
bool can_continue = false;
/** Continue only if the configured failure mode is not
 * RW_FAIL_INSTANTLY and no result was pending on the master. */
if (rses->rses_config.master_failure_mode != RW_FAIL_INSTANTLY &&
(!backend || !backend->is_waiting_result()))
{
/** The failure of a master is not considered a critical
* failure as partial functionality still remains. Reads
* are allowed as long as slave servers are available
* and writes will cause an error to be returned.
*
* If we were waiting for a response from the master, we
* can't be sure whether it was executed or not. In this
* case the safest thing to do is to close the client
* connection. */
can_continue = true;
}
/** Log the lost master status only once per server, using the
 * server's master_err_is_logged flag. */
else if (!SERVER_IS_MASTER(srv) && !srv->master_err_is_logged)
{
MXS_ERROR("Server %s:%d lost the master status. Readwritesplit "
"service can't locate the master. Client sessions "
"will be closed.", srv->name, srv->port);
srv->master_err_is_logged = true;
}
*succp = can_continue;
/** Close the failed master's backend; if none was found, only log. */
if (backend)
{
backend->close(mxs::Backend::CLOSE_FATAL);
}
else
{
MXS_ERROR("Server %s:%d lost the master status but could not locate the "
"corresponding backend ref.", srv->name, srv->port);
}
}
else if (backend)
{
/** Check whether problem_dcb is same as dcb of rses->target_node
* and within READ ONLY transaction:
* if true reset rses->target_node and close session
*/
if (rses->target_node &&
(rses->target_node->dcb() == problem_dcb &&
session_trx_is_read_only(problem_dcb->session)))
{
MXS_ERROR("forced_node SLAVE %s in opened READ ONLY transaction has failed:"
" closing session",
problem_dcb->server->unique_name);
rses->target_node.reset();
*succp = false;
/** Leaves the switch, skipping the sanity check below. */
break;
}
/** We should reconnect only if we find a backend for this
* DCB. If this DCB is an older DCB that has been closed,
* we can ignore it. */
*succp = handle_error_new_connection(inst, &rses, problem_dcb, errmsgbuf);
}
/** Sanity check after handling: the backend must no longer be in
 * use while still pointing at problem_dcb. */
if (backend)
{
/** This is a valid DCB for a backend ref */
if (backend->in_use() && backend->dcb() == problem_dcb)
{
ss_dassert(false);
MXS_ERROR("Backend '%s' is still in use and points to the problem DCB.",
backend->name());
}
}
else
{
/** problem_dcb is unknown to this session: log it and leave the DCB
 * alone. The name is shown only for a polling DCB with a server. */
const char *remote = problem_dcb->state == DCB_STATE_POLLING &&
problem_dcb->server ? problem_dcb->server->unique_name : "CLOSED";
MXS_ERROR("DCB connected to '%s' is not in use by the router "
"session, not closing it. DCB is in state '%s'",
remote, STRDCBSTATE(problem_dcb->state));
}
break;
}
case ERRACT_REPLY_CLIENT:
{
handle_error_reply_client(session, rses, problem_dcb, errmsgbuf);
*succp = false; /*< no new backend servers were made available */
break;
}
default:
/** Unknown action: assert in debug builds, fail the session otherwise. */
ss_dassert(!true);
*succp = false;
break;
}
}
MXS_BEGIN_DECLS

/**
 * Module entry point of the read/write split router.
 *
 * Builds, once, in function-local statics, the router API object and the
 * module description. The description holds the configuration parameters and
 * their default values. The function returns a pointer to that description,
 * the "module object" of this module.
 *
 * @return The module object
 */
MXS_MODULE *MXS_CREATE_MODULE()
{
    /** Router API entry points, listed in MXS_ROUTER_OBJECT member order */
    static MXS_ROUTER_OBJECT router_api =
    {
        createInstance,
        newSession,
        closeSession,
        freeSession,
        routeQuery,
        diagnostics,
        diagnostics_json,
        clientReply,
        handleError,
        getCapabilities,
        NULL
    };

    static MXS_MODULE module_info =
    {
        MXS_MODULE_API_ROUTER,
        MXS_MODULE_GA,
        MXS_ROUTER_VERSION,
        "A Read/Write splitting router for enhancement read scalability",
        "V1.1.0",
        RCAP_TYPE_STMT_INPUT | RCAP_TYPE_TRANSACTION_TRACKING | RCAP_TYPE_STMT_OUTPUT,
        &router_api,
        NULL, /* Process init. */
        NULL, /* Process finish. */
        NULL, /* Thread init. */
        NULL, /* Thread finish. */
        {
            /* Enumerated parameters: name, type, default, options, accepted values */
            {"use_sql_variables_in", MXS_MODULE_PARAM_ENUM, "all", MXS_MODULE_OPT_NONE, use_sql_variables_in_values},
            {"slave_selection_criteria", MXS_MODULE_PARAM_ENUM, "LEAST_CURRENT_OPERATIONS", MXS_MODULE_OPT_NONE, slave_selection_criteria_values},
            {"master_failure_mode", MXS_MODULE_PARAM_ENUM, "fail_instantly", MXS_MODULE_OPT_NONE, master_failure_mode_values},
            /* Plain parameters: name, type, default */
            {
                "max_slave_replication_lag",
                MXS_MODULE_PARAM_INT,
                "-1"
            },
            {
                "max_slave_connections",
                MXS_MODULE_PARAM_STRING,
                MAX_SLAVE_COUNT
            },
            {
                "retry_failed_reads",
                MXS_MODULE_PARAM_BOOL,
                "true"
            },
            {
                "disable_sescmd_history",
                MXS_MODULE_PARAM_BOOL,
                "true"
            },
            {
                "max_sescmd_history",
                MXS_MODULE_PARAM_COUNT,
                "0"
            },
            {
                "strict_multi_stmt",
                MXS_MODULE_PARAM_BOOL,
                "true"
            },
            {
                "master_accept_reads",
                MXS_MODULE_PARAM_BOOL,
                "false"
            },
            {
                "connection_keepalive",
                MXS_MODULE_PARAM_COUNT,
                "0"
            },
            {MXS_END_MODULE_PARAMS}
        }
    };

    MXS_NOTICE("Initializing statement-based read/write split router module.");
    return &module_info;
}

MXS_END_DECLS