Refactor readwritesplit instance class

Changed the ROUTER_INSTANCE struct to a class and added functions for
common operations.

Renamed configuration and statistics structures and added constructors.

Moved objects around in readwritesplit.hh to be ready for a split into
multiple headers.
This commit is contained in:
Markus Mäkelä
2017-06-25 15:00:46 +03:00
parent 8ed16fd9d2
commit 917fe21f72
6 changed files with 310 additions and 274 deletions

View File

@ -51,42 +51,16 @@
static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *session, GWBUF *queue);
static bool rwsplit_process_router_options(ROUTER_INSTANCE *router,
static bool rwsplit_process_router_options(RWSplit *router,
char **options);
static void handle_error_reply_client(MXS_SESSION *ses, ROUTER_CLIENT_SES *rses,
DCB *backend_dcb, GWBUF *errmsg);
static bool handle_error_new_connection(ROUTER_INSTANCE *inst,
static bool handle_error_new_connection(RWSplit *inst,
ROUTER_CLIENT_SES **rses,
DCB *backend_dcb, GWBUF *errmsg);
static bool have_enough_servers(ROUTER_CLIENT_SES *rses, const int min_nsrv,
int router_nsrv, ROUTER_INSTANCE *router);
/**
* Enum values for router parameters
*/
static const MXS_ENUM_VALUE use_sql_variables_in_values[] =
{
{"all", TYPE_ALL},
{"master", TYPE_MASTER},
{NULL}
};
static const MXS_ENUM_VALUE slave_selection_criteria_values[] =
{
{"LEAST_GLOBAL_CONNECTIONS", LEAST_GLOBAL_CONNECTIONS},
{"LEAST_ROUTER_CONNECTIONS", LEAST_ROUTER_CONNECTIONS},
{"LEAST_BEHIND_MASTER", LEAST_BEHIND_MASTER},
{"LEAST_CURRENT_OPERATIONS", LEAST_CURRENT_OPERATIONS},
{NULL}
};
static const MXS_ENUM_VALUE master_failure_mode_values[] =
{
{"fail_instantly", RW_FAIL_INSTANTLY},
{"fail_on_write", RW_FAIL_ON_WRITE},
{"error_on_write", RW_ERROR_ON_WRITE},
{NULL}
};
int router_nsrv, RWSplit *router);
static bool route_stored_query(ROUTER_CLIENT_SES *rses);
/**
* Internal functions
@ -186,7 +160,7 @@ SRWBackend get_backend_from_dcb(ROUTER_CLIENT_SES *rses, DCB *dcb)
* @param options Router options
* @return True on success, false if a configuration error was found
*/
static bool rwsplit_process_router_options(ROUTER_INSTANCE *router,
static bool rwsplit_process_router_options(Config& config,
char **options)
{
int i;
@ -226,47 +200,47 @@ static bool rwsplit_process_router_options(ROUTER_INSTANCE *router,
"Allowed values are LEAST_GLOBAL_CONNECTIONS, "
"LEAST_ROUTER_CONNECTIONS, LEAST_BEHIND_MASTER,"
"and LEAST_CURRENT_OPERATIONS.",
STRCRITERIA(router->rwsplit_config.slave_selection_criteria));
STRCRITERIA(config.slave_selection_criteria));
success = false;
}
else
{
router->rwsplit_config.slave_selection_criteria = c;
config.slave_selection_criteria = c;
}
}
else if (strcmp(options[i], "max_sescmd_history") == 0)
{
router->rwsplit_config.max_sescmd_history = atoi(value);
config.max_sescmd_history = atoi(value);
}
else if (strcmp(options[i], "disable_sescmd_history") == 0)
{
router->rwsplit_config.disable_sescmd_history = config_truth_value(value);
config.disable_sescmd_history = config_truth_value(value);
}
else if (strcmp(options[i], "master_accept_reads") == 0)
{
router->rwsplit_config.master_accept_reads = config_truth_value(value);
config.master_accept_reads = config_truth_value(value);
}
else if (strcmp(options[i], "strict_multi_stmt") == 0)
{
router->rwsplit_config.strict_multi_stmt = config_truth_value(value);
config.strict_multi_stmt = config_truth_value(value);
}
else if (strcmp(options[i], "retry_failed_reads") == 0)
{
router->rwsplit_config.retry_failed_reads = config_truth_value(value);
config.retry_failed_reads = config_truth_value(value);
}
else if (strcmp(options[i], "master_failure_mode") == 0)
{
if (strcasecmp(value, "fail_instantly") == 0)
{
router->rwsplit_config.master_failure_mode = RW_FAIL_INSTANTLY;
config.master_failure_mode = RW_FAIL_INSTANTLY;
}
else if (strcasecmp(value, "fail_on_write") == 0)
{
router->rwsplit_config.master_failure_mode = RW_FAIL_ON_WRITE;
config.master_failure_mode = RW_FAIL_ON_WRITE;
}
else if (strcasecmp(value, "error_on_write") == 0)
{
router->rwsplit_config.master_failure_mode = RW_ERROR_ON_WRITE;
config.master_failure_mode = RW_ERROR_ON_WRITE;
}
else
{
@ -287,7 +261,7 @@ static bool rwsplit_process_router_options(ROUTER_INSTANCE *router,
}
// TODO: Don't process parameters in readwritesplit
static bool handle_max_slaves(ROUTER_INSTANCE *router, const char *str)
static bool handle_max_slaves(Config& config, const char *str)
{
bool rval = true;
char *endptr;
@ -295,13 +269,13 @@ static bool handle_max_slaves(ROUTER_INSTANCE *router, const char *str)
if (*endptr == '%' && *(endptr + 1) == '\0')
{
router->rwsplit_config.rw_max_slave_conn_percent = val;
router->rwsplit_config.max_slave_connections = 0;
config.rw_max_slave_conn_percent = val;
config.max_slave_connections = 0;
}
else if (*endptr == '\0')
{
router->rwsplit_config.max_slave_connections = val;
router->rwsplit_config.rw_max_slave_conn_percent = 0;
config.max_slave_connections = val;
config.rw_max_slave_conn_percent = 0;
}
else
{
@ -407,7 +381,7 @@ static bool reroute_stored_statement(ROUTER_CLIENT_SES *rses, const SRWBackend&
* @return true if there are enough backend connections to continue, false if
* not
*/
static bool handle_error_new_connection(ROUTER_INSTANCE *inst,
static bool handle_error_new_connection(RWSplit *inst,
ROUTER_CLIENT_SES **rses,
DCB *backend_dcb, GWBUF *errmsg)
{
@ -470,7 +444,7 @@ static bool handle_error_new_connection(ROUTER_INSTANCE *inst,
* Try to get replacement slave or at least the minimum
* number of slave connections for router session.
*/
if (inst->rwsplit_config.disable_sescmd_history)
if (inst->config().disable_sescmd_history)
{
succp = have_enough_servers(myrses, 1, myrses->rses_nbackends, inst) ? true : false;
}
@ -496,7 +470,7 @@ static bool handle_error_new_connection(ROUTER_INSTANCE *inst,
* @return bool - whether enough, side effect is error logging
*/
static bool have_enough_servers(ROUTER_CLIENT_SES *rses, const int min_nsrv,
int router_nsrv, ROUTER_INSTANCE *router)
int router_nsrv, RWSplit *router)
{
bool succp = true;
@ -511,7 +485,7 @@ static bool have_enough_servers(ROUTER_CLIENT_SES *rses, const int min_nsrv,
MXS_ERROR("Unable to start %s service. There are "
"too few backend servers available. Found %d "
"when %d is required.",
router->service->name, router_nsrv, min_nsrv);
router->service()->name, router_nsrv, min_nsrv);
}
else
{
@ -523,7 +497,7 @@ static bool have_enough_servers(ROUTER_CLIENT_SES *rses, const int min_nsrv,
MXS_ERROR("Unable to start %s service. There are "
"too few backend servers configured in "
"MaxScale.cnf. Found %d when %d is required.",
router->service->name,
router->service()->name,
(rses)->rses_config.max_slave_connections, min_nsrv);
}
if (nservers < min_nsrv)
@ -533,7 +507,7 @@ static bool have_enough_servers(ROUTER_CLIENT_SES *rses, const int min_nsrv,
"too few backend servers configured in "
"MaxScale.cnf. Found %d%% when at least %.0f%% "
"would be required.",
router->service->name,
router->service()->name,
(rses)->rses_config.rw_max_slave_conn_percent, dbgpct);
}
}
@ -543,7 +517,17 @@ static bool have_enough_servers(ROUTER_CLIENT_SES *rses, const int min_nsrv,
return succp;
}
bool route_stored_query(ROUTER_CLIENT_SES *rses)
/**
* @brief Route a stored query
*
* When multiple queries are executed in a pipeline fashion, the readwritesplit
* stores the extra queries in a queue. This queue is emptied after reading a
* reply from the backend server.
*
* @param rses Router client session
* @return True if a stored query was routed successfully
*/
static bool route_stored_query(ROUTER_CLIENT_SES *rses)
{
bool rval = true;
@ -666,6 +650,36 @@ void close_all_connections(ROUTER_CLIENT_SES* rses)
}
}
RWSplit::RWSplit(SERVICE* service, const Config& config):
m_service(service),
m_config(config)
{
}
RWSplit::~RWSplit()
{
}
SERVICE* RWSplit::service() const
{
return m_service;
}
Config& RWSplit::config()
{
return m_config;
}
Stats& RWSplit::stats()
{
return m_stats;
}
ROUTER_CLIENT_SES::ROUTER_CLIENT_SES(const Config& config):
rses_config(config)
{
}
/**
* API function definitions
*/
@ -684,61 +698,23 @@ void close_all_connections(ROUTER_CLIENT_SES* rses)
*/
static MXS_ROUTER *createInstance(SERVICE *service, char **options)
{
ROUTER_INSTANCE* router = new (std::nothrow) ROUTER_INSTANCE;
if (router == NULL)
MXS_CONFIG_PARAMETER* params = service->svc_config_param;
Config config(params);
if (!handle_max_slaves(config, config_get_string(params, "max_slave_connections")) ||
(options && !rwsplit_process_router_options(config, options)))
{
return NULL;
}
router->service = service;
/*
* Until we know otherwise assume we have some available slaves.
*/
router->available_slaves = true;
/** By default, the client connection is closed immediately when a master
* failure is detected */
router->rwsplit_config.master_failure_mode = RW_FAIL_INSTANTLY;
MXS_CONFIG_PARAMETER *params = service->svc_config_param;
router->rwsplit_config.use_sql_variables_in =
(mxs_target_t)config_get_enum(params, "use_sql_variables_in",
use_sql_variables_in_values);
router->rwsplit_config.slave_selection_criteria =
(select_criteria_t)config_get_enum(params, "slave_selection_criteria",
slave_selection_criteria_values);
router->rwsplit_config.master_failure_mode =
(enum failure_mode)config_get_enum(params, "master_failure_mode",
master_failure_mode_values);
router->rwsplit_config.max_slave_replication_lag = config_get_integer(params, "max_slave_replication_lag");
router->rwsplit_config.retry_failed_reads = config_get_bool(params, "retry_failed_reads");
router->rwsplit_config.strict_multi_stmt = config_get_bool(params, "strict_multi_stmt");
router->rwsplit_config.disable_sescmd_history = config_get_bool(params, "disable_sescmd_history");
router->rwsplit_config.max_sescmd_history = config_get_integer(params, "max_sescmd_history");
router->rwsplit_config.master_accept_reads = config_get_bool(params, "master_accept_reads");
router->rwsplit_config.connection_keepalive = config_get_integer(params, "connection_keepalive");
if (!handle_max_slaves(router, config_get_string(params, "max_slave_connections")) ||
(options && !rwsplit_process_router_options(router, options)))
{
delete router;
return NULL;
}
/** These options cancel each other out */
if (router->rwsplit_config.disable_sescmd_history &&
router->rwsplit_config.max_sescmd_history > 0)
if (config.disable_sescmd_history && config.max_sescmd_history > 0)
{
router->rwsplit_config.max_sescmd_history = 0;
config.max_sescmd_history = 0;
}
return (MXS_ROUTER*)router;
return (MXS_ROUTER*)new (std::nothrow) RWSplit(service, config);
}
/**
@ -760,8 +736,8 @@ static MXS_ROUTER *createInstance(SERVICE *service, char **options)
*/
static MXS_ROUTER_SESSION *newSession(MXS_ROUTER *router_inst, MXS_SESSION *session)
{
ROUTER_INSTANCE* router = (ROUTER_INSTANCE*)router_inst;
ROUTER_CLIENT_SES* client_rses = new (std::nothrow) ROUTER_CLIENT_SES;
RWSplit* router = (RWSplit*)router_inst;
ROUTER_CLIENT_SES* client_rses = new (std::nothrow) ROUTER_CLIENT_SES(router->config());
if (client_rses == NULL)
{
@ -781,9 +757,8 @@ static MXS_ROUTER_SESSION *newSession(MXS_ROUTER *router_inst, MXS_SESSION *sess
client_rses->sent_sescmd = 0;
client_rses->recv_sescmd = 0;
client_rses->sescmd_count = 1; // Needs to be a positive number to work
memcpy(&client_rses->rses_config, &router->rwsplit_config, sizeof(client_rses->rses_config));
int router_nservers = router->service->n_dbref;
int router_nservers = router->service()->n_dbref;
const int min_nservers = 1; /*< hard-coded for now */
if (!have_enough_servers(client_rses, min_nservers, router_nservers, router))
@ -792,7 +767,7 @@ static MXS_ROUTER_SESSION *newSession(MXS_ROUTER *router_inst, MXS_SESSION *sess
return NULL;
}
for (SERVER_REF *sref = router->service->dbref; sref; sref = sref->next)
for (SERVER_REF *sref = router->service()->dbref; sref; sref = sref->next)
{
if (sref->active)
{
@ -827,7 +802,7 @@ static MXS_ROUTER_SESSION *newSession(MXS_ROUTER *router_inst, MXS_SESSION *sess
client_rses->rses_config.max_slave_connections = n_conn;
}
router->stats.n_sessions += 1;
router->stats().n_sessions += 1;
return (MXS_ROUTER_SESSION*)client_rses;
}
@ -907,7 +882,7 @@ static void freeSession(MXS_ROUTER *router_instance, MXS_ROUTER_SESSION *router_
*/
static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *querybuf)
{
ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *) instance;
RWSplit *inst = (RWSplit *) instance;
ROUTER_CLIENT_SES *rses = (ROUTER_CLIENT_SES *) router_session;
int rval = 0;
@ -967,50 +942,50 @@ static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session,
*/
static void diagnostics(MXS_ROUTER *instance, DCB *dcb)
{
ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance;
const char *weightby = serviceGetWeightingParameter(router->service);
RWSplit *router = (RWSplit *)instance;
const char *weightby = serviceGetWeightingParameter(router->service());
double master_pct = 0.0, slave_pct = 0.0, all_pct = 0.0;
dcb_printf(dcb, "\n");
dcb_printf(dcb, "\tuse_sql_variables_in: %s\n",
mxs_target_to_str(router->rwsplit_config.use_sql_variables_in));
mxs_target_to_str(router->config().use_sql_variables_in));
dcb_printf(dcb, "\tslave_selection_criteria: %s\n",
select_criteria_to_str(router->rwsplit_config.slave_selection_criteria));
select_criteria_to_str(router->config().slave_selection_criteria));
dcb_printf(dcb, "\tmaster_failure_mode: %s\n",
failure_mode_to_str(router->rwsplit_config.master_failure_mode));
failure_mode_to_str(router->config().master_failure_mode));
dcb_printf(dcb, "\tmax_slave_replication_lag: %d\n",
router->rwsplit_config.max_slave_replication_lag);
router->config().max_slave_replication_lag);
dcb_printf(dcb, "\tretry_failed_reads: %s\n",
router->rwsplit_config.retry_failed_reads ? "true" : "false");
router->config().retry_failed_reads ? "true" : "false");
dcb_printf(dcb, "\tstrict_multi_stmt: %s\n",
router->rwsplit_config.strict_multi_stmt ? "true" : "false");
router->config().strict_multi_stmt ? "true" : "false");
dcb_printf(dcb, "\tdisable_sescmd_history: %s\n",
router->rwsplit_config.disable_sescmd_history ? "true" : "false");
router->config().disable_sescmd_history ? "true" : "false");
dcb_printf(dcb, "\tmax_sescmd_history: %lu\n",
router->rwsplit_config.max_sescmd_history);
router->config().max_sescmd_history);
dcb_printf(dcb, "\tmaster_accept_reads: %s\n",
router->rwsplit_config.master_accept_reads ? "true" : "false");
router->config().master_accept_reads ? "true" : "false");
dcb_printf(dcb, "\n");
if (router->stats.n_queries > 0)
if (router->stats().n_queries > 0)
{
master_pct = ((double)router->stats.n_master / (double)router->stats.n_queries) * 100.0;
slave_pct = ((double)router->stats.n_slave / (double)router->stats.n_queries) * 100.0;
all_pct = ((double)router->stats.n_all / (double)router->stats.n_queries) * 100.0;
master_pct = ((double)router->stats().n_master / (double)router->stats().n_queries) * 100.0;
slave_pct = ((double)router->stats().n_slave / (double)router->stats().n_queries) * 100.0;
all_pct = ((double)router->stats().n_all / (double)router->stats().n_queries) * 100.0;
}
dcb_printf(dcb, "\tNumber of router sessions: %" PRIu64 "\n",
router->stats.n_sessions);
router->stats().n_sessions);
dcb_printf(dcb, "\tCurrent no. of router sessions: %d\n",
router->service->stats.n_current);
router->service()->stats.n_current);
dcb_printf(dcb, "\tNumber of queries forwarded: %" PRIu64 "\n",
router->stats.n_queries);
router->stats().n_queries);
dcb_printf(dcb, "\tNumber of queries forwarded to master: %" PRIu64 " (%.2f%%)\n",
router->stats.n_master, master_pct);
router->stats().n_master, master_pct);
dcb_printf(dcb, "\tNumber of queries forwarded to slave: %" PRIu64 " (%.2f%%)\n",
router->stats.n_slave, slave_pct);
router->stats().n_slave, slave_pct);
dcb_printf(dcb, "\tNumber of queries forwarded to all: %" PRIu64 " (%.2f%%)\n",
router->stats.n_all, all_pct);
router->stats().n_all, all_pct);
if (*weightby)
{
@ -1020,7 +995,7 @@ static void diagnostics(MXS_ROUTER *instance, DCB *dcb)
dcb_printf(dcb, "\t\tServer Target %% Connections "
"Operations\n");
dcb_printf(dcb, "\t\t Global Router\n");
for (SERVER_REF *ref = router->service->dbref; ref; ref = ref->next)
for (SERVER_REF *ref = router->service()->dbref; ref; ref = ref->next)
{
dcb_printf(dcb, "\t\t%-20s %3.1f%% %-6d %-6d %d\n",
ref->server->unique_name, (float)ref->weight / 10,
@ -1040,38 +1015,38 @@ static void diagnostics(MXS_ROUTER *instance, DCB *dcb)
*/
static json_t* diagnostics_json(const MXS_ROUTER *instance)
{
ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance;
RWSplit *router = (RWSplit *)instance;
json_t* rval = json_object();
json_object_set_new(rval, "use_sql_variables_in",
json_string(mxs_target_to_str(router->rwsplit_config.use_sql_variables_in)));
json_string(mxs_target_to_str(router->config().use_sql_variables_in)));
json_object_set_new(rval, "slave_selection_criteria",
json_string(select_criteria_to_str(router->rwsplit_config.slave_selection_criteria)));
json_string(select_criteria_to_str(router->config().slave_selection_criteria)));
json_object_set_new(rval, "master_failure_mode",
json_string(failure_mode_to_str(router->rwsplit_config.master_failure_mode)));
json_string(failure_mode_to_str(router->config().master_failure_mode)));
json_object_set_new(rval, "max_slave_replication_lag",
json_integer(router->rwsplit_config.max_slave_replication_lag));
json_integer(router->config().max_slave_replication_lag));
json_object_set_new(rval, "retry_failed_reads",
json_boolean(router->rwsplit_config.retry_failed_reads));
json_boolean(router->config().retry_failed_reads));
json_object_set_new(rval, "strict_multi_stmt",
json_boolean(router->rwsplit_config.strict_multi_stmt));
json_boolean(router->config().strict_multi_stmt));
json_object_set_new(rval, "disable_sescmd_history",
json_boolean(router->rwsplit_config.disable_sescmd_history));
json_boolean(router->config().disable_sescmd_history));
json_object_set_new(rval, "max_sescmd_history",
json_integer(router->rwsplit_config.max_sescmd_history));
json_integer(router->config().max_sescmd_history));
json_object_set_new(rval, "master_accept_reads",
json_boolean(router->rwsplit_config.master_accept_reads));
json_boolean(router->config().master_accept_reads));
json_object_set_new(rval, "connections", json_integer(router->stats.n_sessions));
json_object_set_new(rval, "current_connections", json_integer(router->service->stats.n_current));
json_object_set_new(rval, "queries", json_integer(router->stats.n_queries));
json_object_set_new(rval, "route_master", json_integer(router->stats.n_master));
json_object_set_new(rval, "route_slave", json_integer(router->stats.n_slave));
json_object_set_new(rval, "route_all", json_integer(router->stats.n_all));
json_object_set_new(rval, "connections", json_integer(router->stats().n_sessions));
json_object_set_new(rval, "current_connections", json_integer(router->service()->stats.n_current));
json_object_set_new(rval, "queries", json_integer(router->stats().n_queries));
json_object_set_new(rval, "route_master", json_integer(router->stats().n_master));
json_object_set_new(rval, "route_slave", json_integer(router->stats().n_slave));
json_object_set_new(rval, "route_all", json_integer(router->stats().n_all));
const char *weightby = serviceGetWeightingParameter(router->service);
const char *weightby = serviceGetWeightingParameter(router->service());
if (*weightby)
{
@ -1097,7 +1072,7 @@ static void clientReply(MXS_ROUTER *instance,
DCB *backend_dcb)
{
ROUTER_CLIENT_SES *router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
ROUTER_INSTANCE *router_inst = (ROUTER_INSTANCE *)instance;
RWSplit *router_inst = (RWSplit *)instance;
DCB *client_dcb = backend_dcb->session->client_dcb;
CHK_CLIENT_RSES(router_cli_ses);
@ -1153,7 +1128,7 @@ static void clientReply(MXS_ROUTER *instance,
bool rconn = false;
process_sescmd_response(router_cli_ses, backend, &writebuf, &rconn);
if (rconn && !router_inst->rwsplit_config.disable_sescmd_history)
if (rconn && !router_inst->config().disable_sescmd_history)
{
select_connect_backend_servers(
router_cli_ses->rses_nbackends,
@ -1250,7 +1225,7 @@ static void handleError(MXS_ROUTER *instance,
bool *succp)
{
ss_dassert(problem_dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER);
ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *)instance;
RWSplit *inst = (RWSplit *)instance;
ROUTER_CLIENT_SES *rses = (ROUTER_CLIENT_SES *)router_session;
CHK_CLIENT_RSES(rses);
CHK_DCB(problem_dcb);

View File

@ -13,7 +13,7 @@
*/
/**
* @file router.h - The read write split router module heder file
* @file Readwritesplit common header
*/
#define MXS_MODULE_NAME "readwritesplit"
@ -79,6 +79,33 @@ enum failure_mode
RW_ERROR_ON_WRITE /**< Don't close the connection but send an error for writes */
};
/**
* Enum values for router parameters
*/
static const MXS_ENUM_VALUE use_sql_variables_in_values[] =
{
{"all", TYPE_ALL},
{"master", TYPE_MASTER},
{NULL}
};
static const MXS_ENUM_VALUE slave_selection_criteria_values[] =
{
{"LEAST_GLOBAL_CONNECTIONS", LEAST_GLOBAL_CONNECTIONS},
{"LEAST_ROUTER_CONNECTIONS", LEAST_ROUTER_CONNECTIONS},
{"LEAST_BEHIND_MASTER", LEAST_BEHIND_MASTER},
{"LEAST_CURRENT_OPERATIONS", LEAST_CURRENT_OPERATIONS},
{NULL}
};
static const MXS_ENUM_VALUE master_failure_mode_values[] =
{
{"fail_instantly", RW_FAIL_INSTANTLY},
{"fail_on_write", RW_FAIL_ON_WRITE},
{"error_on_write", RW_ERROR_ON_WRITE},
{NULL}
};
/** States of a LOAD DATA LOCAL INFILE */
enum ld_state
{
@ -118,12 +145,137 @@ enum ld_state
#define BACKEND_TYPE(b) (SERVER_IS_MASTER((b)->backend_server) ? BE_MASTER : \
(SERVER_IS_SLAVE((b)->backend_server) ? BE_SLAVE : BE_UNDEFINED));
/** Reply state change debug logging */
#define LOG_RS(a, b) MXS_DEBUG("%s %s -> %s", (a)->uri(), \
rstostr((a)->get_reply_state()), rstostr(b));
struct Config
{
Config(MXS_CONFIG_PARAMETER* params):
slave_selection_criteria(
(select_criteria_t)config_get_enum(
params, "slave_selection_criteria", slave_selection_criteria_values)),
use_sql_variables_in(
(mxs_target_t)config_get_enum(
params, "use_sql_variables_in", use_sql_variables_in_values)),
master_failure_mode(
(enum failure_mode)config_get_enum(
params, "master_failure_mode", master_failure_mode_values)),
max_sescmd_history(config_get_integer(params, "max_sescmd_history")),
disable_sescmd_history(config_get_bool(params, "disable_sescmd_history")),
master_accept_reads(config_get_bool(params, "master_accept_reads")),
strict_multi_stmt(config_get_bool(params, "strict_multi_stmt")),
retry_failed_reads(config_get_bool(params, "retry_failed_reads")),
connection_keepalive(config_get_integer(params, "connection_keepalive")),
max_slave_replication_lag(config_get_integer(params, "max_slave_replication_lag")),
rw_max_slave_conn_percent(0),
max_slave_connections(0)
{
}
struct ROUTER_INSTANCE;
struct ROUTER_CLIENT_SES;
select_criteria_t slave_selection_criteria; /**< The slave selection criteria */
mxs_target_t use_sql_variables_in; /**< Whether to send user variables to
* master or all nodes */
failure_mode master_failure_mode; /**< Master server failure handling mode */
uint64_t max_sescmd_history; /**< Maximum amount of session commands to store */
bool disable_sescmd_history; /**< Disable session command history */
bool master_accept_reads; /**< Use master for reads */
bool strict_multi_stmt; /**< Force non-multistatement queries to be routed to
* the master after a multistatement query. */
bool retry_failed_reads; /**< Retry failed reads on other servers */
int connection_keepalive; /**< Send pings to servers that have been idle
* for too long */
int max_slave_replication_lag; /**< Maximum replication lag */
int rw_max_slave_conn_percent; /**< Maximum percentage of slaves to use for
* each connection*/
int max_slave_connections; /**< Maximum number of slaves for each connection*/
};
/**
* The statistics for this router instance
*/
struct Stats
{
public:
Stats():
n_sessions(0),
n_queries(0),
n_master(0),
n_slave(0),
n_all(0)
{
}
uint64_t n_sessions; /**< Number sessions created */
uint64_t n_queries; /**< Number of queries forwarded */
uint64_t n_master; /**< Number of stmts sent to master */
uint64_t n_slave; /**< Number of stmts sent to slave */
uint64_t n_all; /**< Number of stmts sent to all */
};
/**
* The per instance data for the router.
*/
class RWSplit
{
RWSplit(const RWSplit&);
RWSplit& operator=(const RWSplit&);
public:
RWSplit(SERVICE* service, const Config& config);
~RWSplit();
SERVICE* service() const;
Config& config();
Stats& stats();
private:
SERVICE* m_service; /**< Service where the router belongs*/
Config m_config;
Stats m_stats;
};
static inline const char* select_criteria_to_str(select_criteria_t type)
{
switch (type)
{
case LEAST_GLOBAL_CONNECTIONS:
return "LEAST_GLOBAL_CONNECTIONS";
case LEAST_ROUTER_CONNECTIONS:
return "LEAST_ROUTER_CONNECTIONS";
case LEAST_BEHIND_MASTER:
return "LEAST_BEHIND_MASTER";
case LEAST_CURRENT_OPERATIONS:
return "LEAST_CURRENT_OPERATIONS";
default:
return "UNDEFINED_CRITERIA";
}
}
static inline const char* failure_mode_to_str(enum failure_mode type)
{
switch (type)
{
case RW_FAIL_INSTANTLY:
return "fail_instantly";
case RW_FAIL_ON_WRITE:
return "fail_on_write";
case RW_ERROR_ON_WRITE:
return "error_on_write";
default:
ss_dassert(false);
return "UNDEFINED_MODE";
}
}
/**
* The following code is client session specific, to be moved into a separate file
*/
/** Enum for tracking client reply state */
enum reply_state_t
@ -134,26 +286,9 @@ enum reply_state_t
REPLY_STATE_RSET_ROWS /**< Resultset response, waiting for rows */
};
struct rwsplit_config_t
{
int rw_max_slave_conn_percent; /**< Maximum percentage of slaves
* to use for each connection*/
int max_slave_connections; /**< Maximum number of slaves for each connection*/
select_criteria_t slave_selection_criteria; /**< The slave selection criteria */
int max_slave_replication_lag; /**< Maximum replication lag */
mxs_target_t use_sql_variables_in; /**< Whether to send user variables
* to master or all nodes */
uint64_t max_sescmd_history; /**< Maximum amount of session commands to store */
bool disable_sescmd_history; /**< Disable session command history */
bool master_accept_reads; /**< Use master for reads */
bool strict_multi_stmt; /**< Force non-multistatement queries to be routed
* to the master after a multistatement query. */
enum failure_mode master_failure_mode; /**< Master server failure handling mode.
* @see enum failure_mode */
bool retry_failed_reads; /**< Retry failed reads on other servers */
int connection_keepalive; /**< Send pings to servers that have
* been idle for too long */
};
/** Reply state change debug logging */
#define LOG_RS(a, b) MXS_DEBUG("%s %s -> %s", (a)->uri(), \
rstostr((a)->get_reply_state()), rstostr(b));
static inline bool is_ps_command(uint8_t cmd)
{
@ -309,12 +444,14 @@ typedef std::tr1::unordered_map<uint32_t, SRWBackend> ExecMap;
*/
struct ROUTER_CLIENT_SES
{
ROUTER_CLIENT_SES(const Config& config);
skygw_chk_t rses_chk_top;
bool rses_closed; /**< true when closeSession is called */
SRWBackendList backends; /**< List of backend servers */
SRWBackend current_master; /**< Current master server */
SRWBackend target_node; /**< The currently locked target node */
rwsplit_config_t rses_config; /**< copied config info from router instance */
Config rses_config; /**< copied config info from router instance */
int rses_nbackends;
enum ld_state load_data_state; /**< Current load data state */
bool have_tmp_tables;
@ -323,7 +460,7 @@ struct ROUTER_CLIENT_SES
uint64_t sescmd_count;
int expected_responses; /**< Number of expected responses to the current query */
GWBUF* query_queue; /**< Queued commands waiting to be executed */
struct ROUTER_INSTANCE *router; /**< The router instance */
class RWSplit *router; /**< The router instance */
struct ROUTER_CLIENT_SES *next;
TableSet temp_tables; /**< Set of temporary tables */
mxs::SessionCommandList sescmd_list; /**< List of executed session commands */
@ -336,63 +473,6 @@ struct ROUTER_CLIENT_SES
skygw_chk_t rses_chk_tail;
};
/**
* The statistics for this router instance
*/
struct ROUTER_STATS
{
uint64_t n_sessions; /**< Number sessions created */
uint64_t n_queries; /**< Number of queries forwarded */
uint64_t n_master; /**< Number of stmts sent to master */
uint64_t n_slave; /**< Number of stmts sent to slave */
uint64_t n_all; /**< Number of stmts sent to all */
};
/**
* The per instance data for the router.
*/
struct ROUTER_INSTANCE
{
SERVICE* service; /**< Pointer to service */
rwsplit_config_t rwsplit_config; /**< expanded config info from SERVICE */
int rwsplit_version; /**< version number for router's config */
ROUTER_STATS stats; /**< Statistics for this router */
bool available_slaves; /**< The router has some slaves avialable */
};
/**
* @brief Route a stored query
*
* When multiple queries are executed in a pipeline fashion, the readwritesplit
* stores the extra queries in a queue. This queue is emptied after reading a
* reply from the backend server.
*
* @param rses Router client session
* @return True if a stored query was routed successfully
*/
bool route_stored_query(ROUTER_CLIENT_SES *rses);
static inline const char* select_criteria_to_str(select_criteria_t type)
{
switch (type)
{
case LEAST_GLOBAL_CONNECTIONS:
return "LEAST_GLOBAL_CONNECTIONS";
case LEAST_ROUTER_CONNECTIONS:
return "LEAST_ROUTER_CONNECTIONS";
case LEAST_BEHIND_MASTER:
return "LEAST_BEHIND_MASTER";
case LEAST_CURRENT_OPERATIONS:
return "LEAST_CURRENT_OPERATIONS";
default:
return "UNDEFINED_CRITERIA";
}
}
/**
* Helper function to convert reply_state_t to string
*/
@ -416,22 +496,3 @@ static inline const char* rstostr(reply_state_t state)
ss_dassert(false);
return "UNKNOWN";
}
static inline const char* failure_mode_to_str(enum failure_mode type)
{
switch (type)
{
case RW_FAIL_INSTANTLY:
return "fail_instantly";
case RW_FAIL_ON_WRITE:
return "fail_on_write";
case RW_ERROR_ON_WRITE:
return "error_on_write";
default:
ss_dassert(false);
return "UNDEFINED_MODE";
}
}

View File

@ -34,14 +34,14 @@ do{ \
/*
* The following are implemented in rwsplit_mysql.c
*/
bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
bool route_single_stmt(RWSplit *inst, ROUTER_CLIENT_SES *rses,
GWBUF *querybuf);
void closed_session_reply(GWBUF *querybuf);
void print_error_packet(ROUTER_CLIENT_SES *rses, GWBUF *buf, DCB *dcb);
void check_session_command_reply(GWBUF *writebuf, SRWBackend bref);
bool execute_sescmd_in_backend(SRWBackend& backend_ref);
bool handle_target_is_all(route_target_t route_target,
ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
RWSplit *inst, ROUTER_CLIENT_SES *rses,
GWBUF *querybuf, int packet_type, uint32_t qtype);
uint8_t determine_packet_type(GWBUF *querybuf, bool *non_empty_packet);
void log_transaction_status(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, uint32_t qtype);
@ -60,7 +60,7 @@ int rses_get_max_replication_lag(ROUTER_CLIENT_SES *rses);
* The following are implemented in rwsplit_route_stmt.c
*/
bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
bool route_single_stmt(RWSplit *inst, ROUTER_CLIENT_SES *rses,
GWBUF *querybuf);
SRWBackend get_target_backend(ROUTER_CLIENT_SES *rses, backend_type_t btype,
char *name, int max_rlag);
@ -70,11 +70,11 @@ void handle_multi_temp_and_load(ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
uint8_t packet_type, uint32_t *qtype);
SRWBackend handle_hinted_target(ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
route_target_t route_target);
SRWBackend handle_slave_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
SRWBackend handle_slave_is_target(RWSplit *inst, ROUTER_CLIENT_SES *rses,
uint8_t cmd, uint32_t id);
bool handle_master_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
bool handle_master_is_target(RWSplit *inst, ROUTER_CLIENT_SES *rses,
SRWBackend* dest);
bool handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
bool handle_got_target(RWSplit *inst, ROUTER_CLIENT_SES *rses,
GWBUF *querybuf, SRWBackend& target, bool store);
bool route_session_write(ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
uint8_t command, uint32_t type);
@ -96,7 +96,7 @@ bool select_connect_backend_servers(int router_nservers,
int max_nslaves,
select_criteria_t select_criteria,
MXS_SESSION *session,
ROUTER_INSTANCE *router,
RWSplit *router,
ROUTER_CLIENT_SES *rses,
connection_type type);

View File

@ -180,7 +180,7 @@ log_transaction_status(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, uint32_t qtype)
* @param qtype Query type
* @return bool indicating whether the session can continue
*/
bool handle_target_is_all(route_target_t route_target, ROUTER_INSTANCE *inst,
bool handle_target_is_all(route_target_t route_target, RWSplit *inst,
ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
int packet_type, uint32_t qtype)
{
@ -217,7 +217,7 @@ bool handle_target_is_all(route_target_t route_target, ROUTER_INSTANCE *inst,
{
result = true;
atomic_add_uint64(&inst->stats.n_all, 1);
atomic_add_uint64(&inst->stats().n_all, 1);
}
return result;

View File

@ -61,13 +61,13 @@ static SRWBackend compare_backends(SRWBackend a, SRWBackend b, select_criteria_t
return p(a, b) < 0 ? a : b;
}
void handle_connection_keepalive(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
void handle_connection_keepalive(RWSplit *inst, ROUTER_CLIENT_SES *rses,
SRWBackend& target)
{
ss_dassert(target);
ss_debug(int nserv = 0);
/** Each heartbeat is 1/10th of a second */
int keepalive = inst->rwsplit_config.connection_keepalive * 10;
int keepalive = inst->config().connection_keepalive * 10;
for (SRWBackendList::iterator it = rses->backends.begin();
it != rses->backends.end(); it++)
@ -123,7 +123,7 @@ void replace_stmt_id(GWBUF* buffer, uint32_t id)
* @return true if routing succeed or if it failed due to unsupported query.
* false if backend failure was encountered.
*/
bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
bool route_single_stmt(RWSplit *inst, ROUTER_CLIENT_SES *rses,
GWBUF *querybuf)
{
route_target_t route_target;
@ -246,7 +246,7 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
}
}
if (succp && inst->rwsplit_config.connection_keepalive &&
if (succp && inst->config().connection_keepalive &&
(TARGET_IS_SLAVE(route_target) || TARGET_IS_MASTER(route_target)))
{
handle_connection_keepalive(inst, rses, target);
@ -930,7 +930,7 @@ SRWBackend handle_hinted_target(ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
*
* @return bool - true if succeeded, false otherwise
*/
SRWBackend handle_slave_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
SRWBackend handle_slave_is_target(RWSplit *inst, ROUTER_CLIENT_SES *rses,
uint8_t cmd, uint32_t stmt_id)
{
int rlag_max = rses_get_max_replication_lag(rses);
@ -960,7 +960,7 @@ SRWBackend handle_slave_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses
if (target)
{
atomic_add_uint64(&inst->stats.n_slave, 1);
atomic_add_uint64(&inst->stats().n_slave, 1);
}
else
{
@ -1024,7 +1024,7 @@ static void log_master_routing_failure(ROUTER_CLIENT_SES *rses, bool found,
}
MXS_WARNING("[%s] Write query received from %s@%s. %s. Closing client connection.",
rses->router->service->name, rses->client_dcb->user,
rses->router->service()->name, rses->client_dcb->user,
rses->client_dcb->remote, errmsg);
}
@ -1039,7 +1039,7 @@ static void log_master_routing_failure(ROUTER_CLIENT_SES *rses, bool found,
*
* @return bool - true if succeeded, false otherwise
*/
bool handle_master_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
bool handle_master_is_target(RWSplit *inst, ROUTER_CLIENT_SES *rses,
SRWBackend* dest)
{
SRWBackend target = get_target_backend(rses, BE_MASTER, NULL, MAX_RLAG_UNDEFINED);
@ -1047,7 +1047,7 @@ bool handle_master_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
if (target && target == rses->current_master)
{
atomic_add_uint64(&inst->stats.n_master, 1);
atomic_add_uint64(&inst->stats().n_master, 1);
}
else
{
@ -1085,7 +1085,7 @@ static inline bool query_creates_reply(mysql_server_cmd_t cmd)
*
* @return True on success
*/
bool handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
bool handle_got_target(RWSplit *inst, ROUTER_CLIENT_SES *rses,
GWBUF *querybuf, SRWBackend& target, bool store)
{
/**
@ -1122,7 +1122,7 @@ bool handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
MXS_ERROR("Failed to store current statement, it won't be retried if it fails.");
}
atomic_add_uint64(&inst->stats.n_queries, 1);
atomic_add_uint64(&inst->stats().n_queries, 1);
if (response == mxs::Backend::EXPECT_RESPONSE)
{

View File

@ -290,7 +290,7 @@ bool select_connect_backend_servers(int router_nservers,
int max_nslaves,
select_criteria_t select_criteria,
MXS_SESSION *session,
ROUTER_INSTANCE *router,
RWSplit *router,
ROUTER_CLIENT_SES *rses,
connection_type type)
{
@ -298,7 +298,7 @@ bool select_connect_backend_servers(int router_nservers,
SERVER_REF *master_backend = get_root_master(rses);
SERVER *master_host = master_backend ? master_backend->server : NULL;
if (router->rwsplit_config.master_failure_mode == RW_FAIL_INSTANTLY &&
if (router->config().master_failure_mode == RW_FAIL_INSTANTLY &&
(master_host == NULL || SERVER_IS_DOWN(master_host)))
{
MXS_ERROR("Couldn't find suitable Master from %d candidates.", router_nservers);