From 917fe21f7270c7e1db51a3f4239c61a2d6ec7ed3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Sun, 25 Jun 2017 15:00:46 +0300 Subject: [PATCH] Refactor readwritesplit instance class Changed the ROUTER_INSTANCE struct to a class and added functions for common operations. Renamed configuration and statistics structures and added constructors. Moved objects around in readwritesplit.hh to be ready for a split into multiple headers. --- .../routing/readwritesplit/readwritesplit.cc | 271 ++++++++---------- .../routing/readwritesplit/readwritesplit.hh | 269 ++++++++++------- .../readwritesplit/rwsplit_internal.hh | 14 +- .../routing/readwritesplit/rwsplit_mysql.cc | 4 +- .../readwritesplit/rwsplit_route_stmt.cc | 22 +- .../readwritesplit/rwsplit_select_backends.cc | 4 +- 6 files changed, 310 insertions(+), 274 deletions(-) diff --git a/server/modules/routing/readwritesplit/readwritesplit.cc b/server/modules/routing/readwritesplit/readwritesplit.cc index fb277e030..d0ba8c695 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.cc +++ b/server/modules/routing/readwritesplit/readwritesplit.cc @@ -51,42 +51,16 @@ static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *session, GWBUF *queue); -static bool rwsplit_process_router_options(ROUTER_INSTANCE *router, +static bool rwsplit_process_router_options(RWSplit *router, char **options); static void handle_error_reply_client(MXS_SESSION *ses, ROUTER_CLIENT_SES *rses, DCB *backend_dcb, GWBUF *errmsg); -static bool handle_error_new_connection(ROUTER_INSTANCE *inst, +static bool handle_error_new_connection(RWSplit *inst, ROUTER_CLIENT_SES **rses, DCB *backend_dcb, GWBUF *errmsg); static bool have_enough_servers(ROUTER_CLIENT_SES *rses, const int min_nsrv, - int router_nsrv, ROUTER_INSTANCE *router); - -/** - * Enum values for router parameters - */ -static const MXS_ENUM_VALUE use_sql_variables_in_values[] = -{ - {"all", TYPE_ALL}, - {"master", TYPE_MASTER}, - {NULL} -}; - -static const MXS_ENUM_VALUE slave_selection_criteria_values[] = -{ - {"LEAST_GLOBAL_CONNECTIONS", LEAST_GLOBAL_CONNECTIONS}, - {"LEAST_ROUTER_CONNECTIONS", LEAST_ROUTER_CONNECTIONS}, - {"LEAST_BEHIND_MASTER", LEAST_BEHIND_MASTER}, - {"LEAST_CURRENT_OPERATIONS", LEAST_CURRENT_OPERATIONS}, - {NULL} -}; - -static const MXS_ENUM_VALUE master_failure_mode_values[] = -{ - {"fail_instantly", RW_FAIL_INSTANTLY}, - {"fail_on_write", RW_FAIL_ON_WRITE}, - {"error_on_write", RW_ERROR_ON_WRITE}, - {NULL} -}; + int router_nsrv, RWSplit *router); +static bool route_stored_query(ROUTER_CLIENT_SES *rses); /** * Internal functions @@ -186,7 +160,7 @@ SRWBackend get_backend_from_dcb(ROUTER_CLIENT_SES *rses, DCB *dcb) * @param options Router options * @return True on success, false if a configuration error was found */ -static bool rwsplit_process_router_options(ROUTER_INSTANCE *router, +static bool rwsplit_process_router_options(Config& config, char **options) { int i; @@ -226,47 +200,47 @@ static bool rwsplit_process_router_options(ROUTER_INSTANCE *router, "Allowed values are LEAST_GLOBAL_CONNECTIONS, " "LEAST_ROUTER_CONNECTIONS, LEAST_BEHIND_MASTER," "and LEAST_CURRENT_OPERATIONS.", - STRCRITERIA(router->rwsplit_config.slave_selection_criteria)); + STRCRITERIA(config.slave_selection_criteria)); success = false; } else { - router->rwsplit_config.slave_selection_criteria = c; + config.slave_selection_criteria = c; } } else if (strcmp(options[i], "max_sescmd_history") == 0) { - router->rwsplit_config.max_sescmd_history = atoi(value); + config.max_sescmd_history = atoi(value); } else if (strcmp(options[i], "disable_sescmd_history") == 0) { - router->rwsplit_config.disable_sescmd_history = config_truth_value(value); + config.disable_sescmd_history = config_truth_value(value); } else if (strcmp(options[i], "master_accept_reads") == 0) { - router->rwsplit_config.master_accept_reads = config_truth_value(value); + config.master_accept_reads = config_truth_value(value); } else if (strcmp(options[i], "strict_multi_stmt") == 0) { - router->rwsplit_config.strict_multi_stmt = config_truth_value(value); + config.strict_multi_stmt = config_truth_value(value); } else if (strcmp(options[i], "retry_failed_reads") == 0) { - router->rwsplit_config.retry_failed_reads = config_truth_value(value); + config.retry_failed_reads = config_truth_value(value); } else if (strcmp(options[i], "master_failure_mode") == 0) { if (strcasecmp(value, "fail_instantly") == 0) { - router->rwsplit_config.master_failure_mode = RW_FAIL_INSTANTLY; + config.master_failure_mode = RW_FAIL_INSTANTLY; } else if (strcasecmp(value, "fail_on_write") == 0) { - router->rwsplit_config.master_failure_mode = RW_FAIL_ON_WRITE; + config.master_failure_mode = RW_FAIL_ON_WRITE; } else if (strcasecmp(value, "error_on_write") == 0) { - router->rwsplit_config.master_failure_mode = RW_ERROR_ON_WRITE; + config.master_failure_mode = RW_ERROR_ON_WRITE; } else { @@ -287,7 +261,7 @@ static bool rwsplit_process_router_options(ROUTER_INSTANCE *router, } // TODO: Don't process parameters in readwritesplit -static bool handle_max_slaves(ROUTER_INSTANCE *router, const char *str) +static bool handle_max_slaves(Config& config, const char *str) { bool rval = true; char *endptr; @@ -295,13 +269,13 @@ static bool handle_max_slaves(ROUTER_INSTANCE *router, const char *str) if (*endptr == '%' && *(endptr + 1) == '\0') { - router->rwsplit_config.rw_max_slave_conn_percent = val; - router->rwsplit_config.max_slave_connections = 0; + config.rw_max_slave_conn_percent = val; + config.max_slave_connections = 0; } else if (*endptr == '\0') { - router->rwsplit_config.max_slave_connections = val; - router->rwsplit_config.rw_max_slave_conn_percent = 0; + config.max_slave_connections = val; + config.rw_max_slave_conn_percent = 0; } else { @@ -407,7 +381,7 @@ static bool reroute_stored_statement(ROUTER_CLIENT_SES *rses, const SRWBackend& * @return true if there are enough backend connections to continue, false if * not */ -static bool handle_error_new_connection(ROUTER_INSTANCE *inst, +static bool handle_error_new_connection(RWSplit *inst, ROUTER_CLIENT_SES **rses, DCB *backend_dcb, GWBUF *errmsg) { @@ -470,7 +444,7 @@ static bool handle_error_new_connection(ROUTER_INSTANCE *inst, * Try to get replacement slave or at least the minimum * number of slave connections for router session. */ - if (inst->rwsplit_config.disable_sescmd_history) + if (inst->config().disable_sescmd_history) { succp = have_enough_servers(myrses, 1, myrses->rses_nbackends, inst) ? true : false; } @@ -496,7 +470,7 @@ static bool handle_error_new_connection(ROUTER_INSTANCE *inst, * @return bool - whether enough, side effect is error logging */ static bool have_enough_servers(ROUTER_CLIENT_SES *rses, const int min_nsrv, - int router_nsrv, ROUTER_INSTANCE *router) + int router_nsrv, RWSplit *router) { bool succp = true; @@ -511,7 +485,7 @@ static bool have_enough_servers(ROUTER_CLIENT_SES *rses, const int min_nsrv, MXS_ERROR("Unable to start %s service. There are " "too few backend servers available. Found %d " "when %d is required.", - router->service->name, router_nsrv, min_nsrv); + router->service()->name, router_nsrv, min_nsrv); } else { @@ -523,7 +497,7 @@ static bool have_enough_servers(ROUTER_CLIENT_SES *rses, const int min_nsrv, MXS_ERROR("Unable to start %s service. There are " "too few backend servers configured in " "MaxScale.cnf. Found %d when %d is required.", - router->service->name, + router->service()->name, (rses)->rses_config.max_slave_connections, min_nsrv); } if (nservers < min_nsrv) @@ -533,7 +507,7 @@ static bool have_enough_servers(ROUTER_CLIENT_SES *rses, const int min_nsrv, "too few backend servers configured in " "MaxScale.cnf. Found %d%% when at least %.0f%% " "would be required.", - router->service->name, + router->service()->name, (rses)->rses_config.rw_max_slave_conn_percent, dbgpct); } } @@ -543,7 +517,17 @@ static bool have_enough_servers(ROUTER_CLIENT_SES *rses, const int min_nsrv, return succp; } -bool route_stored_query(ROUTER_CLIENT_SES *rses) +/** + * @brief Route a stored query + * + * When multiple queries are executed in a pipeline fashion, the readwritesplit + * stores the extra queries in a queue. This queue is emptied after reading a + * reply from the backend server. + * + * @param rses Router client session + * @return True if a stored query was routed successfully + */ +static bool route_stored_query(ROUTER_CLIENT_SES *rses) { bool rval = true; @@ -666,6 +650,36 @@ void close_all_connections(ROUTER_CLIENT_SES* rses) } } +RWSplit::RWSplit(SERVICE* service, const Config& config): + m_service(service), + m_config(config) +{ +} + +RWSplit::~RWSplit() +{ +} + +SERVICE* RWSplit::service() const +{ + return m_service; +} + +Config& RWSplit::config() +{ + return m_config; +} + +Stats& RWSplit::stats() +{ + return m_stats; +} + +ROUTER_CLIENT_SES::ROUTER_CLIENT_SES(const Config& config): + rses_config(config) +{ +} + /** * API function definitions */ @@ -684,61 +698,23 @@ void close_all_connections(ROUTER_CLIENT_SES* rses) */ static MXS_ROUTER *createInstance(SERVICE *service, char **options) { - ROUTER_INSTANCE* router = new (std::nothrow) ROUTER_INSTANCE; - if (router == NULL) + MXS_CONFIG_PARAMETER* params = service->svc_config_param; + Config config(params); + + if (!handle_max_slaves(config, config_get_string(params, "max_slave_connections")) || + (options && !rwsplit_process_router_options(config, options))) { return NULL; } - router->service = service; - - /* - * Until we know otherwise assume we have some available slaves. - */ - router->available_slaves = true; - - /** By default, the client connection is closed immediately when a master - * failure is detected */ - router->rwsplit_config.master_failure_mode = RW_FAIL_INSTANTLY; - - MXS_CONFIG_PARAMETER *params = service->svc_config_param; - - router->rwsplit_config.use_sql_variables_in = - (mxs_target_t)config_get_enum(params, "use_sql_variables_in", - use_sql_variables_in_values); - - router->rwsplit_config.slave_selection_criteria = - (select_criteria_t)config_get_enum(params, "slave_selection_criteria", - slave_selection_criteria_values); - - router->rwsplit_config.master_failure_mode = - (enum failure_mode)config_get_enum(params, "master_failure_mode", - master_failure_mode_values); - - router->rwsplit_config.max_slave_replication_lag = config_get_integer(params, "max_slave_replication_lag"); - router->rwsplit_config.retry_failed_reads = config_get_bool(params, "retry_failed_reads"); - router->rwsplit_config.strict_multi_stmt = config_get_bool(params, "strict_multi_stmt"); - router->rwsplit_config.disable_sescmd_history = config_get_bool(params, "disable_sescmd_history"); - router->rwsplit_config.max_sescmd_history = config_get_integer(params, "max_sescmd_history"); - router->rwsplit_config.master_accept_reads = config_get_bool(params, "master_accept_reads"); - router->rwsplit_config.connection_keepalive = config_get_integer(params, "connection_keepalive"); - - if (!handle_max_slaves(router, config_get_string(params, "max_slave_connections")) || - (options && !rwsplit_process_router_options(router, options))) - { - delete router; - return NULL; - } - /** These options cancel each other out */ - if (router->rwsplit_config.disable_sescmd_history && - router->rwsplit_config.max_sescmd_history > 0) + if (config.disable_sescmd_history && config.max_sescmd_history > 0) { - router->rwsplit_config.max_sescmd_history = 0; + config.max_sescmd_history = 0; } - return (MXS_ROUTER*)router; + return (MXS_ROUTER*)new (std::nothrow) RWSplit(service, config); } /** @@ -760,8 +736,8 @@ static MXS_ROUTER *createInstance(SERVICE *service, char **options) */ static MXS_ROUTER_SESSION *newSession(MXS_ROUTER *router_inst, MXS_SESSION *session) { - ROUTER_INSTANCE* router = (ROUTER_INSTANCE*)router_inst; - ROUTER_CLIENT_SES* client_rses = new (std::nothrow) ROUTER_CLIENT_SES; + RWSplit* router = (RWSplit*)router_inst; + ROUTER_CLIENT_SES* client_rses = new (std::nothrow) ROUTER_CLIENT_SES(router->config()); if (client_rses == NULL) { @@ -781,9 +757,8 @@ static MXS_ROUTER_SESSION *newSession(MXS_ROUTER *router_inst, MXS_SESSION *sess client_rses->sent_sescmd = 0; client_rses->recv_sescmd = 0; client_rses->sescmd_count = 1; // Needs to be a positive number to work - memcpy(&client_rses->rses_config, &router->rwsplit_config, sizeof(client_rses->rses_config)); - int router_nservers = router->service->n_dbref; + int router_nservers = router->service()->n_dbref; const int min_nservers = 1; /*< hard-coded for now */ if (!have_enough_servers(client_rses, min_nservers, router_nservers, router)) @@ -792,7 +767,7 @@ static MXS_ROUTER_SESSION *newSession(MXS_ROUTER *router_inst, MXS_SESSION *sess return NULL; } - for (SERVER_REF *sref = router->service->dbref; sref; sref = sref->next) + for (SERVER_REF *sref = router->service()->dbref; sref; sref = sref->next) { if (sref->active) { @@ -827,7 +802,7 @@ static MXS_ROUTER_SESSION *newSession(MXS_ROUTER *router_inst, MXS_SESSION *sess client_rses->rses_config.max_slave_connections = n_conn; } - router->stats.n_sessions += 1; + router->stats().n_sessions += 1; return (MXS_ROUTER_SESSION*)client_rses; } @@ -907,7 +882,7 @@ static void freeSession(MXS_ROUTER *router_instance, MXS_ROUTER_SESSION *router_ */ static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *querybuf) { - ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *) instance; + RWSplit *inst = (RWSplit *) instance; ROUTER_CLIENT_SES *rses = (ROUTER_CLIENT_SES *) router_session; int rval = 0; @@ -967,50 +942,50 @@ static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, */ static void diagnostics(MXS_ROUTER *instance, DCB *dcb) { - ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance; - const char *weightby = serviceGetWeightingParameter(router->service); + RWSplit *router = (RWSplit *)instance; + const char *weightby = serviceGetWeightingParameter(router->service()); double master_pct = 0.0, slave_pct = 0.0, all_pct = 0.0; dcb_printf(dcb, "\n"); dcb_printf(dcb, "\tuse_sql_variables_in: %s\n", - mxs_target_to_str(router->rwsplit_config.use_sql_variables_in)); + mxs_target_to_str(router->config().use_sql_variables_in)); dcb_printf(dcb, "\tslave_selection_criteria: %s\n", - select_criteria_to_str(router->rwsplit_config.slave_selection_criteria)); + select_criteria_to_str(router->config().slave_selection_criteria)); dcb_printf(dcb, "\tmaster_failure_mode: %s\n", - failure_mode_to_str(router->rwsplit_config.master_failure_mode)); + failure_mode_to_str(router->config().master_failure_mode)); dcb_printf(dcb, "\tmax_slave_replication_lag: %d\n", - router->rwsplit_config.max_slave_replication_lag); + router->config().max_slave_replication_lag); dcb_printf(dcb, "\tretry_failed_reads: %s\n", - router->rwsplit_config.retry_failed_reads ? "true" : "false"); + router->config().retry_failed_reads ? "true" : "false"); dcb_printf(dcb, "\tstrict_multi_stmt: %s\n", - router->rwsplit_config.strict_multi_stmt ? "true" : "false"); + router->config().strict_multi_stmt ? "true" : "false"); dcb_printf(dcb, "\tdisable_sescmd_history: %s\n", - router->rwsplit_config.disable_sescmd_history ? "true" : "false"); + router->config().disable_sescmd_history ? "true" : "false"); dcb_printf(dcb, "\tmax_sescmd_history: %lu\n", - router->rwsplit_config.max_sescmd_history); + router->config().max_sescmd_history); dcb_printf(dcb, "\tmaster_accept_reads: %s\n", - router->rwsplit_config.master_accept_reads ? "true" : "false"); + router->config().master_accept_reads ? "true" : "false"); dcb_printf(dcb, "\n"); - if (router->stats.n_queries > 0) + if (router->stats().n_queries > 0) { - master_pct = ((double)router->stats.n_master / (double)router->stats.n_queries) * 100.0; - slave_pct = ((double)router->stats.n_slave / (double)router->stats.n_queries) * 100.0; - all_pct = ((double)router->stats.n_all / (double)router->stats.n_queries) * 100.0; + master_pct = ((double)router->stats().n_master / (double)router->stats().n_queries) * 100.0; + slave_pct = ((double)router->stats().n_slave / (double)router->stats().n_queries) * 100.0; + all_pct = ((double)router->stats().n_all / (double)router->stats().n_queries) * 100.0; } dcb_printf(dcb, "\tNumber of router sessions: %" PRIu64 "\n", - router->stats.n_sessions); + router->stats().n_sessions); dcb_printf(dcb, "\tCurrent no. of router sessions: %d\n", - router->service->stats.n_current); + router->service()->stats.n_current); dcb_printf(dcb, "\tNumber of queries forwarded: %" PRIu64 "\n", - router->stats.n_queries); + router->stats().n_queries); dcb_printf(dcb, "\tNumber of queries forwarded to master: %" PRIu64 " (%.2f%%)\n", - router->stats.n_master, master_pct); + router->stats().n_master, master_pct); dcb_printf(dcb, "\tNumber of queries forwarded to slave: %" PRIu64 " (%.2f%%)\n", - router->stats.n_slave, slave_pct); + router->stats().n_slave, slave_pct); dcb_printf(dcb, "\tNumber of queries forwarded to all: %" PRIu64 " (%.2f%%)\n", - router->stats.n_all, all_pct); + router->stats().n_all, all_pct); if (*weightby) { @@ -1020,7 +995,7 @@ static void diagnostics(MXS_ROUTER *instance, DCB *dcb) dcb_printf(dcb, "\t\tServer Target %% Connections " "Operations\n"); dcb_printf(dcb, "\t\t Global Router\n"); - for (SERVER_REF *ref = router->service->dbref; ref; ref = ref->next) + for (SERVER_REF *ref = router->service()->dbref; ref; ref = ref->next) { dcb_printf(dcb, "\t\t%-20s %3.1f%% %-6d %-6d %d\n", ref->server->unique_name, (float)ref->weight / 10, @@ -1040,38 +1015,38 @@ static void diagnostics(MXS_ROUTER *instance, DCB *dcb) */ static json_t* diagnostics_json(const MXS_ROUTER *instance) { - ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance; + RWSplit *router = (RWSplit *)instance; json_t* rval = json_object(); json_object_set_new(rval, "use_sql_variables_in", - json_string(mxs_target_to_str(router->rwsplit_config.use_sql_variables_in))); + json_string(mxs_target_to_str(router->config().use_sql_variables_in))); json_object_set_new(rval, "slave_selection_criteria", - json_string(select_criteria_to_str(router->rwsplit_config.slave_selection_criteria))); + json_string(select_criteria_to_str(router->config().slave_selection_criteria))); json_object_set_new(rval, "master_failure_mode", - json_string(failure_mode_to_str(router->rwsplit_config.master_failure_mode))); + json_string(failure_mode_to_str(router->config().master_failure_mode))); json_object_set_new(rval, "max_slave_replication_lag", - json_integer(router->rwsplit_config.max_slave_replication_lag)); + json_integer(router->config().max_slave_replication_lag)); json_object_set_new(rval, "retry_failed_reads", - json_boolean(router->rwsplit_config.retry_failed_reads)); + json_boolean(router->config().retry_failed_reads)); json_object_set_new(rval, "strict_multi_stmt", - json_boolean(router->rwsplit_config.strict_multi_stmt)); + json_boolean(router->config().strict_multi_stmt)); json_object_set_new(rval, "disable_sescmd_history", - json_boolean(router->rwsplit_config.disable_sescmd_history)); + json_boolean(router->config().disable_sescmd_history)); json_object_set_new(rval, "max_sescmd_history", - json_integer(router->rwsplit_config.max_sescmd_history)); + json_integer(router->config().max_sescmd_history)); json_object_set_new(rval, "master_accept_reads", - json_boolean(router->rwsplit_config.master_accept_reads)); + json_boolean(router->config().master_accept_reads)); - json_object_set_new(rval, "connections", json_integer(router->stats.n_sessions)); - json_object_set_new(rval, "current_connections", json_integer(router->service->stats.n_current)); - json_object_set_new(rval, "queries", json_integer(router->stats.n_queries)); - json_object_set_new(rval, "route_master", json_integer(router->stats.n_master)); - json_object_set_new(rval, "route_slave", json_integer(router->stats.n_slave)); - json_object_set_new(rval, "route_all", json_integer(router->stats.n_all)); + json_object_set_new(rval, "connections", json_integer(router->stats().n_sessions)); + json_object_set_new(rval, "current_connections", json_integer(router->service()->stats.n_current)); + json_object_set_new(rval, "queries", json_integer(router->stats().n_queries)); + json_object_set_new(rval, "route_master", json_integer(router->stats().n_master)); + json_object_set_new(rval, "route_slave", json_integer(router->stats().n_slave)); + json_object_set_new(rval, "route_all", json_integer(router->stats().n_all)); - const char *weightby = serviceGetWeightingParameter(router->service); + const char *weightby = serviceGetWeightingParameter(router->service()); if (*weightby) { @@ -1097,7 +1072,7 @@ static void clientReply(MXS_ROUTER *instance, DCB *backend_dcb) { ROUTER_CLIENT_SES *router_cli_ses = (ROUTER_CLIENT_SES *)router_session; - ROUTER_INSTANCE *router_inst = (ROUTER_INSTANCE *)instance; + RWSplit *router_inst = (RWSplit *)instance; DCB *client_dcb = backend_dcb->session->client_dcb; CHK_CLIENT_RSES(router_cli_ses); @@ -1153,7 +1128,7 @@ static void clientReply(MXS_ROUTER *instance, bool rconn = false; process_sescmd_response(router_cli_ses, backend, &writebuf, &rconn); - if (rconn && !router_inst->rwsplit_config.disable_sescmd_history) + if (rconn && !router_inst->config().disable_sescmd_history) { select_connect_backend_servers( router_cli_ses->rses_nbackends, @@ -1250,7 +1225,7 @@ static void handleError(MXS_ROUTER *instance, bool *succp) { ss_dassert(problem_dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER); - ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *)instance; + RWSplit *inst = (RWSplit *)instance; ROUTER_CLIENT_SES *rses = (ROUTER_CLIENT_SES *)router_session; CHK_CLIENT_RSES(rses); CHK_DCB(problem_dcb); diff --git a/server/modules/routing/readwritesplit/readwritesplit.hh b/server/modules/routing/readwritesplit/readwritesplit.hh index 3aab902fe..075a2292b 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.hh +++ b/server/modules/routing/readwritesplit/readwritesplit.hh @@ -13,7 +13,7 @@ */ /** - * @file router.h - The read write split router module heder file + * @file Readwritesplit common header */ #define MXS_MODULE_NAME "readwritesplit" @@ -79,6 +79,33 @@ enum failure_mode RW_ERROR_ON_WRITE /**< Don't close the connection but send an error for writes */ }; +/** + * Enum values for router parameters + */ +static const MXS_ENUM_VALUE use_sql_variables_in_values[] = +{ + {"all", TYPE_ALL}, + {"master", TYPE_MASTER}, + {NULL} +}; + +static const MXS_ENUM_VALUE slave_selection_criteria_values[] = +{ + {"LEAST_GLOBAL_CONNECTIONS", LEAST_GLOBAL_CONNECTIONS}, + {"LEAST_ROUTER_CONNECTIONS", LEAST_ROUTER_CONNECTIONS}, + {"LEAST_BEHIND_MASTER", LEAST_BEHIND_MASTER}, + {"LEAST_CURRENT_OPERATIONS", LEAST_CURRENT_OPERATIONS}, + {NULL} +}; + +static const MXS_ENUM_VALUE master_failure_mode_values[] = +{ + {"fail_instantly", RW_FAIL_INSTANTLY}, + {"fail_on_write", RW_FAIL_ON_WRITE}, + {"error_on_write", RW_ERROR_ON_WRITE}, + {NULL} +}; + /** States of a LOAD DATA LOCAL INFILE */ enum ld_state { @@ -118,12 +145,137 @@ enum ld_state #define BACKEND_TYPE(b) (SERVER_IS_MASTER((b)->backend_server) ? BE_MASTER : \ (SERVER_IS_SLAVE((b)->backend_server) ? BE_SLAVE : BE_UNDEFINED)); -/** Reply state change debug logging */ -#define LOG_RS(a, b) MXS_DEBUG("%s %s -> %s", (a)->uri(), \ - rstostr((a)->get_reply_state()), rstostr(b)); +struct Config +{ + Config(MXS_CONFIG_PARAMETER* params): + slave_selection_criteria( + (select_criteria_t)config_get_enum( + params, "slave_selection_criteria", slave_selection_criteria_values)), + use_sql_variables_in( + (mxs_target_t)config_get_enum( + params, "use_sql_variables_in", use_sql_variables_in_values)), + master_failure_mode( + (enum failure_mode)config_get_enum( + params, "master_failure_mode", master_failure_mode_values)), + max_sescmd_history(config_get_integer(params, "max_sescmd_history")), + disable_sescmd_history(config_get_bool(params, "disable_sescmd_history")), + master_accept_reads(config_get_bool(params, "master_accept_reads")), + strict_multi_stmt(config_get_bool(params, "strict_multi_stmt")), + retry_failed_reads(config_get_bool(params, "retry_failed_reads")), + connection_keepalive(config_get_integer(params, "connection_keepalive")), + max_slave_replication_lag(config_get_integer(params, "max_slave_replication_lag")), + rw_max_slave_conn_percent(0), + max_slave_connections(0) + { + } -struct ROUTER_INSTANCE; -struct ROUTER_CLIENT_SES; + select_criteria_t slave_selection_criteria; /**< The slave selection criteria */ + mxs_target_t use_sql_variables_in; /**< Whether to send user variables to + * master or all nodes */ + failure_mode master_failure_mode; /**< Master server failure handling mode */ + uint64_t max_sescmd_history; /**< Maximum amount of session commands to store */ + bool disable_sescmd_history; /**< Disable session command history */ + bool master_accept_reads; /**< Use master for reads */ + bool strict_multi_stmt; /**< Force non-multistatement queries to be routed to + * the master after a multistatement query. */ + bool retry_failed_reads; /**< Retry failed reads on other servers */ + int connection_keepalive; /**< Send pings to servers that have been idle + * for too long */ + int max_slave_replication_lag; /**< Maximum replication lag */ + int rw_max_slave_conn_percent; /**< Maximum percentage of slaves to use for + * each connection*/ + int max_slave_connections; /**< Maximum number of slaves for each connection*/ + +}; + +/** + * The statistics for this router instance + */ +struct Stats +{ +public: + + Stats(): + n_sessions(0), + n_queries(0), + n_master(0), + n_slave(0), + n_all(0) + { + } + + uint64_t n_sessions; /**< Number sessions created */ + uint64_t n_queries; /**< Number of queries forwarded */ + uint64_t n_master; /**< Number of stmts sent to master */ + uint64_t n_slave; /**< Number of stmts sent to slave */ + uint64_t n_all; /**< Number of stmts sent to all */ +}; + +/** + * The per instance data for the router. + */ +class RWSplit +{ + RWSplit(const RWSplit&); + RWSplit& operator=(const RWSplit&); + +public: + RWSplit(SERVICE* service, const Config& config); + ~RWSplit(); + + SERVICE* service() const; + Config& config(); + Stats& stats(); + +private: + SERVICE* m_service; /**< Service where the router belongs*/ + Config m_config; + Stats m_stats; +}; + +static inline const char* select_criteria_to_str(select_criteria_t type) +{ + switch (type) + { + case LEAST_GLOBAL_CONNECTIONS: + return "LEAST_GLOBAL_CONNECTIONS"; + + case LEAST_ROUTER_CONNECTIONS: + return "LEAST_ROUTER_CONNECTIONS"; + + case LEAST_BEHIND_MASTER: + return "LEAST_BEHIND_MASTER"; + + case LEAST_CURRENT_OPERATIONS: + return "LEAST_CURRENT_OPERATIONS"; + + default: + return "UNDEFINED_CRITERIA"; + } +} + +static inline const char* failure_mode_to_str(enum failure_mode type) +{ + switch (type) + { + case RW_FAIL_INSTANTLY: + return "fail_instantly"; + + case RW_FAIL_ON_WRITE: + return "fail_on_write"; + + case RW_ERROR_ON_WRITE: + return "error_on_write"; + + default: + ss_dassert(false); + return "UNDEFINED_MODE"; + } +} + +/** + * The following code is client session specific, to be moved into a separate file + */ /** Enum for tracking client reply state */ enum reply_state_t @@ -134,26 +286,9 @@ enum reply_state_t REPLY_STATE_RSET_ROWS /**< Resultset response, waiting for rows */ }; -struct rwsplit_config_t -{ - int rw_max_slave_conn_percent; /**< Maximum percentage of slaves - * to use for each connection*/ - int max_slave_connections; /**< Maximum number of slaves for each connection*/ - select_criteria_t slave_selection_criteria; /**< The slave selection criteria */ - int max_slave_replication_lag; /**< Maximum replication lag */ - mxs_target_t use_sql_variables_in; /**< Whether to send user variables - * to master or all nodes */ - uint64_t max_sescmd_history; /**< Maximum amount of session commands to store */ - bool disable_sescmd_history; /**< Disable session command history */ - bool master_accept_reads; /**< Use master for reads */ - bool strict_multi_stmt; /**< Force non-multistatement queries to be routed - * to the master after a multistatement query. */ - enum failure_mode master_failure_mode; /**< Master server failure handling mode. - * @see enum failure_mode */ - bool retry_failed_reads; /**< Retry failed reads on other servers */ - int connection_keepalive; /**< Send pings to servers that have - * been idle for too long */ -}; +/** Reply state change debug logging */ +#define LOG_RS(a, b) MXS_DEBUG("%s %s -> %s", (a)->uri(), \ + rstostr((a)->get_reply_state()), rstostr(b)); static inline bool is_ps_command(uint8_t cmd) { @@ -309,12 +444,14 @@ typedef std::tr1::unordered_map ExecMap; */ struct ROUTER_CLIENT_SES { + ROUTER_CLIENT_SES(const Config& config); + skygw_chk_t rses_chk_top; bool rses_closed; /**< true when closeSession is called */ SRWBackendList backends; /**< List of backend servers */ SRWBackend current_master; /**< Current master server */ SRWBackend target_node; /**< The currently locked target node */ - rwsplit_config_t rses_config; /**< copied config info from router instance */ + Config rses_config; /**< copied config info from router instance */ int rses_nbackends; enum ld_state load_data_state; /**< Current load data state */ bool have_tmp_tables; @@ -323,7 +460,7 @@ struct ROUTER_CLIENT_SES uint64_t sescmd_count; int expected_responses; /**< Number of expected responses to the current query */ GWBUF* query_queue; /**< Queued commands waiting to be executed */ - struct ROUTER_INSTANCE *router; /**< The router instance */ + class RWSplit *router; /**< The router instance */ struct ROUTER_CLIENT_SES *next; TableSet temp_tables; /**< Set of temporary tables */ mxs::SessionCommandList sescmd_list; /**< List of executed session commands */ @@ -336,63 +473,6 @@ struct ROUTER_CLIENT_SES skygw_chk_t rses_chk_tail; }; -/** - * The statistics for this router instance - */ -struct ROUTER_STATS -{ - uint64_t n_sessions; /**< Number sessions created */ - uint64_t n_queries; /**< Number of queries forwarded */ - uint64_t n_master; /**< Number of stmts sent to master */ - uint64_t n_slave; /**< Number of stmts sent to slave */ - uint64_t n_all; /**< Number of stmts sent to all */ -}; - -/** - * The per instance data for the router. - */ -struct ROUTER_INSTANCE -{ - SERVICE* service; /**< Pointer to service */ - rwsplit_config_t rwsplit_config; /**< expanded config info from SERVICE */ - int rwsplit_version; /**< version number for router's config */ - ROUTER_STATS stats; /**< Statistics for this router */ - bool available_slaves; /**< The router has some slaves avialable */ -}; - -/** - * @brief Route a stored query - * - * When multiple queries are executed in a pipeline fashion, the readwritesplit - * stores the extra queries in a queue. This queue is emptied after reading a - * reply from the backend server. - * - * @param rses Router client session - * @return True if a stored query was routed successfully - */ -bool route_stored_query(ROUTER_CLIENT_SES *rses); - -static inline const char* select_criteria_to_str(select_criteria_t type) -{ - switch (type) - { - case LEAST_GLOBAL_CONNECTIONS: - return "LEAST_GLOBAL_CONNECTIONS"; - - case LEAST_ROUTER_CONNECTIONS: - return "LEAST_ROUTER_CONNECTIONS"; - - case LEAST_BEHIND_MASTER: - return "LEAST_BEHIND_MASTER"; - - case LEAST_CURRENT_OPERATIONS: - return "LEAST_CURRENT_OPERATIONS"; - - default: - return "UNDEFINED_CRITERIA"; - } -} - /** * Helper function to convert reply_state_t to string */ @@ -416,22 +496,3 @@ static inline const char* rstostr(reply_state_t state) ss_dassert(false); return "UNKNOWN"; } - -static inline const char* failure_mode_to_str(enum failure_mode type) -{ - switch (type) - { - case RW_FAIL_INSTANTLY: - return "fail_instantly"; - - case RW_FAIL_ON_WRITE: - return "fail_on_write"; - - case RW_ERROR_ON_WRITE: - return "error_on_write"; - - default: - ss_dassert(false); - return "UNDEFINED_MODE"; - } -} diff --git a/server/modules/routing/readwritesplit/rwsplit_internal.hh b/server/modules/routing/readwritesplit/rwsplit_internal.hh index cc18c0bcc..a8b75381c 100644 --- a/server/modules/routing/readwritesplit/rwsplit_internal.hh +++ b/server/modules/routing/readwritesplit/rwsplit_internal.hh @@ -34,14 +34,14 @@ do{ \ /* * The following are implemented in rwsplit_mysql.c */ -bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, +bool route_single_stmt(RWSplit *inst, ROUTER_CLIENT_SES *rses, GWBUF *querybuf); void closed_session_reply(GWBUF *querybuf); void print_error_packet(ROUTER_CLIENT_SES *rses, GWBUF *buf, DCB *dcb); void check_session_command_reply(GWBUF *writebuf, SRWBackend bref); bool execute_sescmd_in_backend(SRWBackend& backend_ref); bool handle_target_is_all(route_target_t route_target, - ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, + RWSplit *inst, ROUTER_CLIENT_SES *rses, GWBUF *querybuf, int packet_type, uint32_t qtype); uint8_t determine_packet_type(GWBUF *querybuf, bool *non_empty_packet); void log_transaction_status(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, uint32_t qtype); @@ -60,7 +60,7 @@ int rses_get_max_replication_lag(ROUTER_CLIENT_SES *rses); * The following are implemented in rwsplit_route_stmt.c */ -bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, +bool route_single_stmt(RWSplit *inst, ROUTER_CLIENT_SES *rses, GWBUF *querybuf); SRWBackend get_target_backend(ROUTER_CLIENT_SES *rses, backend_type_t btype, char *name, int max_rlag); @@ -70,11 +70,11 @@ void handle_multi_temp_and_load(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, uint8_t packet_type, uint32_t *qtype); SRWBackend handle_hinted_target(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, route_target_t route_target); -SRWBackend handle_slave_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, +SRWBackend handle_slave_is_target(RWSplit *inst, ROUTER_CLIENT_SES *rses, uint8_t cmd, uint32_t id); -bool handle_master_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, +bool handle_master_is_target(RWSplit *inst, ROUTER_CLIENT_SES *rses, SRWBackend* dest); -bool handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, +bool handle_got_target(RWSplit *inst, ROUTER_CLIENT_SES *rses, GWBUF *querybuf, SRWBackend& target, bool store); bool route_session_write(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, uint8_t command, uint32_t type); @@ -96,7 +96,7 @@ bool select_connect_backend_servers(int router_nservers, int max_nslaves, select_criteria_t select_criteria, MXS_SESSION *session, - ROUTER_INSTANCE *router, + RWSplit *router, ROUTER_CLIENT_SES *rses, connection_type type); diff --git a/server/modules/routing/readwritesplit/rwsplit_mysql.cc b/server/modules/routing/readwritesplit/rwsplit_mysql.cc index a285d4c24..e7e5c32b8 100644 --- a/server/modules/routing/readwritesplit/rwsplit_mysql.cc +++ b/server/modules/routing/readwritesplit/rwsplit_mysql.cc @@ -180,7 +180,7 @@ log_transaction_status(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, uint32_t qtype) * @param qtype Query type * @return bool indicating whether the session can continue */ -bool handle_target_is_all(route_target_t route_target, ROUTER_INSTANCE *inst, +bool handle_target_is_all(route_target_t route_target, RWSplit *inst, ROUTER_CLIENT_SES *rses, GWBUF *querybuf, int packet_type, uint32_t qtype) { @@ -217,7 +217,7 @@ bool handle_target_is_all(route_target_t route_target, ROUTER_INSTANCE *inst, { result = true; - atomic_add_uint64(&inst->stats.n_all, 1); + atomic_add_uint64(&inst->stats().n_all, 1); } return result; diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc index 549247ac9..1dd4e8279 100644 --- a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc @@ -61,13 +61,13 @@ static SRWBackend compare_backends(SRWBackend a, SRWBackend b, select_criteria_t return p(a, b) < 0 ? a : b; } -void handle_connection_keepalive(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, +void handle_connection_keepalive(RWSplit *inst, ROUTER_CLIENT_SES *rses, SRWBackend& target) { ss_dassert(target); ss_debug(int nserv = 0); /** Each heartbeat is 1/10th of a second */ - int keepalive = inst->rwsplit_config.connection_keepalive * 10; + int keepalive = inst->config().connection_keepalive * 10; for (SRWBackendList::iterator it = rses->backends.begin(); it != rses->backends.end(); it++) @@ -123,7 +123,7 @@ void replace_stmt_id(GWBUF* buffer, uint32_t id) * @return true if routing succeed or if it failed due to unsupported query. * false if backend failure was encountered. */ -bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, +bool route_single_stmt(RWSplit *inst, ROUTER_CLIENT_SES *rses, GWBUF *querybuf) { route_target_t route_target; @@ -246,7 +246,7 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, } } - if (succp && inst->rwsplit_config.connection_keepalive && + if (succp && inst->config().connection_keepalive && (TARGET_IS_SLAVE(route_target) || TARGET_IS_MASTER(route_target))) { handle_connection_keepalive(inst, rses, target); @@ -930,7 +930,7 @@ SRWBackend handle_hinted_target(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, * * @return bool - true if succeeded, false otherwise */ -SRWBackend handle_slave_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, +SRWBackend handle_slave_is_target(RWSplit *inst, ROUTER_CLIENT_SES *rses, uint8_t cmd, uint32_t stmt_id) { int rlag_max = rses_get_max_replication_lag(rses); @@ -960,7 +960,7 @@ SRWBackend handle_slave_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses if (target) { - atomic_add_uint64(&inst->stats.n_slave, 1); + atomic_add_uint64(&inst->stats().n_slave, 1); } else { @@ -1024,7 +1024,7 @@ static void log_master_routing_failure(ROUTER_CLIENT_SES *rses, bool found, } MXS_WARNING("[%s] Write query received from %s@%s. %s. Closing client connection.", - rses->router->service->name, rses->client_dcb->user, + rses->router->service()->name, rses->client_dcb->user, rses->client_dcb->remote, errmsg); } @@ -1039,7 +1039,7 @@ static void log_master_routing_failure(ROUTER_CLIENT_SES *rses, bool found, * * @return bool - true if succeeded, false otherwise */ -bool handle_master_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, +bool handle_master_is_target(RWSplit *inst, ROUTER_CLIENT_SES *rses, SRWBackend* dest) { SRWBackend target = get_target_backend(rses, BE_MASTER, NULL, MAX_RLAG_UNDEFINED); @@ -1047,7 +1047,7 @@ bool handle_master_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, if (target && target == rses->current_master) { - atomic_add_uint64(&inst->stats.n_master, 1); + atomic_add_uint64(&inst->stats().n_master, 1); } else { @@ -1085,7 +1085,7 @@ static inline bool query_creates_reply(mysql_server_cmd_t cmd) * * @return True on success */ -bool handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, +bool handle_got_target(RWSplit *inst, ROUTER_CLIENT_SES *rses, GWBUF *querybuf, SRWBackend& target, bool store) { /** @@ -1122,7 +1122,7 @@ bool handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, MXS_ERROR("Failed to store current statement, it won't be retried if it fails."); } - atomic_add_uint64(&inst->stats.n_queries, 1); + atomic_add_uint64(&inst->stats().n_queries, 1); if (response == mxs::Backend::EXPECT_RESPONSE) { diff --git a/server/modules/routing/readwritesplit/rwsplit_select_backends.cc b/server/modules/routing/readwritesplit/rwsplit_select_backends.cc index 95a204303..d10e210e1 100644 --- a/server/modules/routing/readwritesplit/rwsplit_select_backends.cc +++ b/server/modules/routing/readwritesplit/rwsplit_select_backends.cc @@ -290,7 +290,7 @@ bool select_connect_backend_servers(int router_nservers, int max_nslaves, select_criteria_t select_criteria, MXS_SESSION *session, - ROUTER_INSTANCE *router, + RWSplit *router, ROUTER_CLIENT_SES *rses, connection_type type) { @@ -298,7 +298,7 @@ bool select_connect_backend_servers(int router_nservers, SERVER_REF *master_backend = get_root_master(rses); SERVER *master_host = master_backend ? master_backend->server : NULL; - if (router->rwsplit_config.master_failure_mode == RW_FAIL_INSTANTLY && + if (router->config().master_failure_mode == RW_FAIL_INSTANTLY && (master_host == NULL || SERVER_IS_DOWN(master_host))) { MXS_ERROR("Couldn't find suitable Master from %d candidates.", router_nservers);