/*
 * Copyright (c) 2016 MariaDB Corporation Ab
 *
 * Use of this software is governed by the Business Source License included
 * in the LICENSE.TXT file and at www.mariadb.com/bsl11.
 *
 * Change Date: 2020-01-01
 *
 * On the date above, in accordance with the Business Source License, use
 * of this software will be governed by version 2 or later of the General
 * Public License.
 */

#include "readwritesplit.hh"
#include "rwsplit_internal.hh"

#include <climits>

/*
 * NOTE(review): the header names of the following #include directives are
 * missing in this copy of the file; restore them from version control.
 */
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

/**
 * The entry points for the read/write query splitting router module.
 *
 * This file contains the entry points that comprise the API to the read
 * write query splitting router. It also contains functions that are
 * directly called by the entry point functions. Some of these are used by
 * functions in other modules of the read write split router, others are
 * used only within this module.
 */

/** Maximum number of slaves */
#define MAX_SLAVE_COUNT "255"

/*
 * The functions that implement the router module API
 */

static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *session, GWBUF *queue);
static bool rwsplit_process_router_options(ROUTER_INSTANCE *router,
                                           char **options);
static void handle_error_reply_client(MXS_SESSION *ses, ROUTER_CLIENT_SES *rses,
                                      DCB *backend_dcb, GWBUF *errmsg);
static bool handle_error_new_connection(ROUTER_INSTANCE *inst,
                                        ROUTER_CLIENT_SES **rses,
                                        DCB *backend_dcb, GWBUF *errmsg);
static bool have_enough_servers(ROUTER_CLIENT_SES *rses, const int min_nsrv,
                                int router_nsrv, ROUTER_INSTANCE *router);

/**
 * Enum values for router parameters
 */
static const MXS_ENUM_VALUE use_sql_variables_in_values[] =
{
    {"all", TYPE_ALL},
    {"master", TYPE_MASTER},
    {NULL}
};

static const MXS_ENUM_VALUE slave_selection_criteria_values[] =
{
    {"LEAST_GLOBAL_CONNECTIONS", LEAST_GLOBAL_CONNECTIONS},
    {"LEAST_ROUTER_CONNECTIONS", LEAST_ROUTER_CONNECTIONS},
    {"LEAST_BEHIND_MASTER", LEAST_BEHIND_MASTER},
    {"LEAST_CURRENT_OPERATIONS", LEAST_CURRENT_OPERATIONS},
    {NULL}
};

static const MXS_ENUM_VALUE master_failure_mode_values[] =
{
    {"fail_instantly", RW_FAIL_INSTANTLY},
    {"fail_on_write", RW_FAIL_ON_WRITE},
    {"error_on_write", RW_ERROR_ON_WRITE},
    {NULL}
};

/**
 * Internal functions
 */

/**
 * @brief Get count of backend servers that are slaves.
 *
 * Find out the number of read backend servers. Depending on the
 * configuration value type, either copy the direct count of slave
 * connections or calculate the count from the percentage value.
 *
 * @param rses Router client session
 *
 * @return The configured slave count clamped to at least 1 and at most
 *         rses->rses_nbackends - 1. When the session has fewer than two
 *         backends the upper bound wins, so the result can be 0 or negative.
 */
int rses_get_max_slavecount(ROUTER_CLIENT_SES *rses)
{
    int conf_max_nslaves;
    int router_nservers = rses->rses_nbackends;

    CHK_CLIENT_RSES(rses);

    if (rses->rses_config.max_slave_connections > 0)
    {
        /** An absolute slave count is configured */
        conf_max_nslaves = rses->rses_config.max_slave_connections;
    }
    else
    {
        /** A percentage of the backends is configured (integer division truncates) */
        conf_max_nslaves =
            (router_nservers * rses->rses_config.rw_max_slave_conn_percent) / 100;
    }

    /** Upper bound is one less than the backend count; presumably this leaves
     * room for the master. */
    return MXS_MIN(router_nservers - 1, MXS_MAX(1, conf_max_nslaves));
}

/*
 * @brief Get the maximum replication lag for this router
 *
 * @param rses Router client session
 *
 * @return The configured max_slave_replication_lag if it is positive,
 *         otherwise INT_MAX (i.e. effectively no limit)
 */
int rses_get_max_replication_lag(ROUTER_CLIENT_SES *rses)
{
    int conf_max_rlag;

    CHK_CLIENT_RSES(rses);

    if (rses->rses_config.max_slave_replication_lag > 0)
    {
        conf_max_rlag = rses->rses_config.max_slave_replication_lag;
    }
    else
    {
        /** No configured value: use the largest int. This replaces the former
         * ~(1 << 31), which shifted a 1 into the sign bit of a signed int. */
        conf_max_rlag = INT_MAX;
    }

    return conf_max_rlag;
}

/**
 * @brief Find a back end reference that matches the given DCB
 *
 * Finds out if there is a backend reference pointing at the DCB given as
 * parameter.
* * @param rses router client session * @param dcb DCB * * @return backend reference pointer if succeed or NULL */ SRWBackend get_bref_from_dcb(ROUTER_CLIENT_SES *rses, DCB *dcb) { ss_dassert(dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER); CHK_DCB(dcb); CHK_CLIENT_RSES(rses); for (SRWBackendList::iterator it = rses->backends.begin(); it != rses->backends.end(); it++) { SRWBackend& backend = *it; if (backend->dcb() == dcb) { return backend; } } /** We should always have a valid backend reference */ ss_dassert(false); return SRWBackend(); } /** * @brief Process router options * * @param router Router instance * @param options Router options * @return True on success, false if a configuration error was found */ static bool rwsplit_process_router_options(ROUTER_INSTANCE *router, char **options) { int i; char *value; select_criteria_t c; if (options == NULL) { return true; } MXS_WARNING("Router options for readwritesplit are deprecated."); bool success = true; for (i = 0; options[i]; i++) { if ((value = strchr(options[i], '=')) == NULL) { MXS_ERROR("Unsupported router option \"%s\" for readwritesplit router.", options[i]); success = false; } else { *value = 0; value++; if (strcmp(options[i], "slave_selection_criteria") == 0) { c = GET_SELECT_CRITERIA(value); ss_dassert(c == LEAST_GLOBAL_CONNECTIONS || c == LEAST_ROUTER_CONNECTIONS || c == LEAST_BEHIND_MASTER || c == LEAST_CURRENT_OPERATIONS || c == UNDEFINED_CRITERIA); if (c == UNDEFINED_CRITERIA) { MXS_ERROR("Unknown slave selection criteria \"%s\". 
" "Allowed values are LEAST_GLOBAL_CONNECTIONS, " "LEAST_ROUTER_CONNECTIONS, LEAST_BEHIND_MASTER," "and LEAST_CURRENT_OPERATIONS.", STRCRITERIA(router->rwsplit_config.slave_selection_criteria)); success = false; } else { router->rwsplit_config.slave_selection_criteria = c; } } else if (strcmp(options[i], "max_sescmd_history") == 0) { router->rwsplit_config.max_sescmd_history = atoi(value); } else if (strcmp(options[i], "disable_sescmd_history") == 0) { router->rwsplit_config.disable_sescmd_history = config_truth_value(value); } else if (strcmp(options[i], "master_accept_reads") == 0) { router->rwsplit_config.master_accept_reads = config_truth_value(value); } else if (strcmp(options[i], "strict_multi_stmt") == 0) { router->rwsplit_config.strict_multi_stmt = config_truth_value(value); } else if (strcmp(options[i], "retry_failed_reads") == 0) { router->rwsplit_config.retry_failed_reads = config_truth_value(value); } else if (strcmp(options[i], "master_failure_mode") == 0) { if (strcasecmp(value, "fail_instantly") == 0) { router->rwsplit_config.master_failure_mode = RW_FAIL_INSTANTLY; } else if (strcasecmp(value, "fail_on_write") == 0) { router->rwsplit_config.master_failure_mode = RW_FAIL_ON_WRITE; } else if (strcasecmp(value, "error_on_write") == 0) { router->rwsplit_config.master_failure_mode = RW_ERROR_ON_WRITE; } else { MXS_ERROR("Unknown value for 'master_failure_mode': %s", value); success = false; } } else { MXS_ERROR("Unknown router option \"%s=%s\" for readwritesplit router.", options[i], value); success = false; } } } /*< for */ return success; } // TODO: Don't process parameters in readwritesplit static bool handle_max_slaves(ROUTER_INSTANCE *router, const char *str) { bool rval = true; char *endptr; int val = strtol(str, &endptr, 10); if (*endptr == '%' && *(endptr + 1) == '\0') { router->rwsplit_config.rw_max_slave_conn_percent = val; router->rwsplit_config.max_slave_connections = 0; } else if (*endptr == '\0') { 
router->rwsplit_config.max_slave_connections = val; router->rwsplit_config.rw_max_slave_conn_percent = 0; } else { MXS_ERROR("Invalid value for 'max_slave_connections': %s", str); rval = false; } return rval; } /** * @brief Handle an error reply for a client * * @param ses Session * @param rses Router session * @param backend_dcb DCB for the backend server that has failed * @param errmsg GWBUF containing the error message */ static void handle_error_reply_client(MXS_SESSION *ses, ROUTER_CLIENT_SES *rses, DCB *backend_dcb, GWBUF *errmsg) { mxs_session_state_t sesstate = ses->state; DCB *client_dcb = ses->client_dcb; SRWBackend bref = get_bref_from_dcb(rses, backend_dcb); bref->close(); if (sesstate == SESSION_STATE_ROUTER_READY) { CHK_DCB(client_dcb); client_dcb->func.write(client_dcb, gwbuf_clone(errmsg)); } } static bool reroute_stored_statement(ROUTER_CLIENT_SES *rses, const SRWBackend& old, GWBUF *stored) { bool success = false; if (!session_trx_is_active(rses->client_dcb->session)) { /** * Only try to retry the read if autocommit is enabled and we are * outside of a transaction */ for (SRWBackendList::iterator it = rses->backends.begin(); it != rses->backends.end(); it++) { SRWBackend& bref = *it; if (bref->in_use() && bref != old && !SERVER_IS_MASTER(bref->server()) && SERVER_IS_SLAVE(bref->server())) { /** Found a valid candidate; a non-master slave that's in use */ if (bref->write(stored)) { MXS_INFO("Retrying failed read at '%s'.", bref->server()->unique_name); ss_dassert(bref->get_reply_state() == REPLY_STATE_DONE); LOG_RS(bref, REPLY_STATE_START); bref->set_reply_state(REPLY_STATE_START); rses->expected_responses++; success = true; break; } } } if (!success && rses->current_master && rses->current_master->in_use()) { /** * Either we failed to write to the slave or no valid slave was found. * Try to retry the read on the master. 
*/ if (rses->current_master->write(stored)) { MXS_INFO("Retrying failed read at '%s'.", rses->current_master->server()->unique_name); LOG_RS(rses->current_master, REPLY_STATE_START); ss_dassert(rses->current_master->get_reply_state() == REPLY_STATE_DONE); rses->current_master->set_reply_state(REPLY_STATE_START); rses->expected_responses++; success = true; } } } return success; } /** * Check if there is backend reference pointing at failed DCB, and reset its * flags. Then clear DCB's callback and finally : try to find replacement(s) * for failed slave(s). * * This must be called with router lock. * * @param inst router instance * @param rses router client session * @param dcb failed DCB * @param errmsg error message which is sent to client if it is waiting * * @return true if there are enough backend connections to continue, false if * not */ static bool handle_error_new_connection(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES **rses, DCB *backend_dcb, GWBUF *errmsg) { ROUTER_CLIENT_SES *myrses = *rses; SRWBackend bref = get_bref_from_dcb(myrses, backend_dcb); MXS_SESSION* ses = backend_dcb->session; CHK_SESSION(ses); if (bref->is_waiting_result()) { /** * A query was sent through the backend and it is waiting for a reply. * Try to reroute the statement to a working server or send an error * to the client. */ GWBUF *stored = NULL; const SERVER *target; if (!session_take_stmt(backend_dcb->session, &stored, &target) || target != bref->backend()->server || !reroute_stored_statement(*rses, bref, stored)) { /** * We failed to route the stored statement or no statement was * stored for this server. Either way we can safely free the buffer * and decrement the expected response count. */ gwbuf_free(stored); myrses->expected_responses--; if (bref->session_command_count()) { /** * The backend was executing a command that requires a reply. * Send an error to the client to let it know the query has * failed. 
*/ DCB *client_dcb = ses->client_dcb; client_dcb->func.write(client_dcb, gwbuf_clone(errmsg)); } if (myrses->expected_responses == 0) { /** * The response from this server was the last one, try to * route any stored queries */ route_stored_query(myrses); } } } /** Close the current connection */ bref->close(); int max_nslaves = rses_get_max_slavecount(myrses); bool succp; /** * Try to get replacement slave or at least the minimum * number of slave connections for router session. */ if (inst->rwsplit_config.disable_sescmd_history) { succp = have_enough_servers(myrses, 1, myrses->rses_nbackends, inst) ? true : false; } else { succp = select_connect_backend_servers(myrses->rses_nbackends, max_nslaves, myrses->rses_config.slave_selection_criteria, ses, inst, myrses, connection_type::SLAVE); } return succp; } /** * @brief Calculate whether we have enough servers to route a query * * @param p_rses Router session * @param min_nsrv Minimum number of servers that is sufficient * @param nsrv Actual number of servers * @param router Router instance * * @return bool - whether enough, side effect is error logging */ static bool have_enough_servers(ROUTER_CLIENT_SES *rses, const int min_nsrv, int router_nsrv, ROUTER_INSTANCE *router) { bool succp = true; /** With too few servers session is not created */ if (router_nsrv < min_nsrv || MXS_MAX((rses)->rses_config.max_slave_connections, (router_nsrv * (rses)->rses_config.rw_max_slave_conn_percent) / 100) < min_nsrv) { if (router_nsrv < min_nsrv) { MXS_ERROR("Unable to start %s service. There are " "too few backend servers available. Found %d " "when %d is required.", router->service->name, router_nsrv, min_nsrv); } else { int pct = (rses)->rses_config.rw_max_slave_conn_percent / 100; int nservers = router_nsrv * pct; if ((rses)->rses_config.max_slave_connections < min_nsrv) { MXS_ERROR("Unable to start %s service. There are " "too few backend servers configured in " "MaxScale.cnf. 
Found %d when %d is required.", router->service->name, (rses)->rses_config.max_slave_connections, min_nsrv); } if (nservers < min_nsrv) { double dbgpct = ((double)min_nsrv / (double)router_nsrv) * 100.0; MXS_ERROR("Unable to start %s service. There are " "too few backend servers configured in " "MaxScale.cnf. Found %d%% when at least %.0f%% " "would be required.", router->service->name, (rses)->rses_config.rw_max_slave_conn_percent, dbgpct); } } succp = false; } return succp; } bool route_stored_query(ROUTER_CLIENT_SES *rses) { bool rval = true; if (rses->query_queue) { GWBUF* query_queue = modutil_get_next_MySQL_packet(&rses->query_queue); query_queue = gwbuf_make_contiguous(query_queue); /** Store the query queue locally for the duration of the routeQuery call. * This prevents recursive calls into this function. */ GWBUF *temp_storage = rses->query_queue; rses->query_queue = NULL; if (!routeQuery((MXS_ROUTER*)rses->router, (MXS_ROUTER_SESSION*)rses, query_queue)) { rval = false; char* sql = modutil_get_SQL(query_queue); if (sql) { MXS_ERROR("Routing query \"%s\" failed.", sql); MXS_FREE(sql); } else { MXS_ERROR("Failed to route query."); } gwbuf_free(query_queue); } ss_dassert(rses->query_queue == NULL); rses->query_queue = temp_storage; } return rval; } /** * @brief Check if we have received a complete reply from the backend * * @param bref Backend reference * @param buffer Buffer containing the response * * @return True if the complete response has been received */ bool reply_is_complete(SRWBackend bref, GWBUF *buffer) { mysql_server_cmd_t cmd = mxs_mysql_current_command(bref->dcb()->session); if (bref->get_reply_state() == REPLY_STATE_START && !mxs_mysql_is_result_set(buffer)) { if (cmd == MYSQL_COM_STMT_PREPARE || !mxs_mysql_more_results_after_ok(buffer)) { /** Not a result set, we have the complete response */ LOG_RS(bref, REPLY_STATE_DONE); bref->set_reply_state(REPLY_STATE_DONE); } } else { bool more = false; int old_eof = bref->get_reply_state() == 
REPLY_STATE_RSET_ROWS ? 1 : 0; int n_eof = modutil_count_signal_packets(buffer, old_eof, &more); if (n_eof == 0) { /** Waiting for the EOF packet after the column definitions */ LOG_RS(bref, REPLY_STATE_RSET_COLDEF); bref->set_reply_state(REPLY_STATE_RSET_COLDEF); } else if (n_eof == 1 && cmd != MYSQL_COM_FIELD_LIST) { /** Waiting for the EOF packet after the rows */ LOG_RS(bref, REPLY_STATE_RSET_ROWS); bref->set_reply_state(REPLY_STATE_RSET_ROWS); } else { /** We either have a complete result set or a response to * a COM_FIELD_LIST command */ ss_dassert(n_eof == 2 || (n_eof == 1 && cmd == MYSQL_COM_FIELD_LIST)); LOG_RS(bref, REPLY_STATE_DONE); bref->set_reply_state(REPLY_STATE_DONE); if (more) { /** The server will send more resultsets */ LOG_RS(bref, REPLY_STATE_START); bref->set_reply_state(REPLY_STATE_START); } } } return bref->get_reply_state() == REPLY_STATE_DONE; } void close_all_connections(ROUTER_CLIENT_SES* rses) { for (SRWBackendList::iterator it = rses->backends.begin(); it != rses->backends.end(); it++) { SRWBackend& bref = *it; if (bref->in_use()) { bref->close(); } } } /** * API function definitions */ /** * @brief Create an instance of the read/write router (API). * * Create an instance of read/write statement router within the MaxScale. One * instance of the router is required for each service that is defined in the * configuration as using this router. One instance of the router will handle * multiple connections (or router sessions). * * @param service The service this router is being create for * @param options The options for this query router * @return NULL in failure, pointer to router in success. */ static MXS_ROUTER *createInstance(SERVICE *service, char **options) { ROUTER_INSTANCE* router = new (std::nothrow) ROUTER_INSTANCE; if (router == NULL) { return NULL; } router->service = service; /* * Until we know otherwise assume we have some available slaves. 
*/ router->available_slaves = true; /** By default, the client connection is closed immediately when a master * failure is detected */ router->rwsplit_config.master_failure_mode = RW_FAIL_INSTANTLY; MXS_CONFIG_PARAMETER *params = service->svc_config_param; router->rwsplit_config.use_sql_variables_in = (mxs_target_t)config_get_enum(params, "use_sql_variables_in", use_sql_variables_in_values); router->rwsplit_config.slave_selection_criteria = (select_criteria_t)config_get_enum(params, "slave_selection_criteria", slave_selection_criteria_values); router->rwsplit_config.master_failure_mode = (enum failure_mode)config_get_enum(params, "master_failure_mode", master_failure_mode_values); router->rwsplit_config.max_slave_replication_lag = config_get_integer(params, "max_slave_replication_lag"); router->rwsplit_config.retry_failed_reads = config_get_bool(params, "retry_failed_reads"); router->rwsplit_config.strict_multi_stmt = config_get_bool(params, "strict_multi_stmt"); router->rwsplit_config.disable_sescmd_history = config_get_bool(params, "disable_sescmd_history"); router->rwsplit_config.max_sescmd_history = config_get_integer(params, "max_sescmd_history"); router->rwsplit_config.master_accept_reads = config_get_bool(params, "master_accept_reads"); router->rwsplit_config.connection_keepalive = config_get_integer(params, "connection_keepalive"); if (!handle_max_slaves(router, config_get_string(params, "max_slave_connections")) || (options && !rwsplit_process_router_options(router, options))) { delete router; return NULL; } /** These options cancel each other out */ if (router->rwsplit_config.disable_sescmd_history && router->rwsplit_config.max_sescmd_history > 0) { router->rwsplit_config.max_sescmd_history = 0; } return (MXS_ROUTER*)router; } /** * @brief Associate a new session with this instance of the router (API). * * The session is used to store all the data required by the router for a * particular client connection. 
The instance of the router that relates to a * particular service is passed as the first parameter. The second parameter is * the session that has been created in response to the request from a client * for a connection. The passed session contains generic information; this * function creates the session structure that holds router specific data. * There is often a one to one relationship between sessions and router * sessions, although it is possible to create configurations where a * connection is handled by multiple routers, one after another. * * @param instance The router instance data * @param session The MaxScale session (generic connection data) * @return Session specific data for this session, i.e. a router session */ static MXS_ROUTER_SESSION *newSession(MXS_ROUTER *router_inst, MXS_SESSION *session) { ROUTER_INSTANCE* router = (ROUTER_INSTANCE*)router_inst; ROUTER_CLIENT_SES* client_rses = new (std::nothrow) ROUTER_CLIENT_SES; if (client_rses == NULL) { delete client_rses; return NULL; } client_rses->rses_chk_top = CHK_NUM_ROUTER_SES; client_rses->rses_chk_tail = CHK_NUM_ROUTER_SES; client_rses->rses_closed = false; client_rses->router = router; client_rses->client_dcb = session->client_dcb; client_rses->have_tmp_tables = false; client_rses->expected_responses = 0; client_rses->query_queue = NULL; client_rses->load_data_state = LOAD_DATA_INACTIVE; client_rses->sent_sescmd = 0; client_rses->recv_sescmd = 0; client_rses->sescmd_count = 1; // Needs to be a positive number to work memcpy(&client_rses->rses_config, &router->rwsplit_config, sizeof(client_rses->rses_config)); int router_nservers = router->service->n_dbref; const int min_nservers = 1; /*< hard-coded for now */ if (!have_enough_servers(client_rses, min_nservers, router_nservers, router)) { delete client_rses; return NULL; } for (SERVER_REF *sref = router->service->dbref; sref; sref = sref->next) { if (sref->active) { client_rses->backends.push_back(SRWBackend(new RWBackend(sref))); } } 
client_rses->rses_nbackends = router_nservers; /*< # of backend servers */ int max_nslaves = rses_get_max_slavecount(client_rses); if (!select_connect_backend_servers(router_nservers, max_nslaves, client_rses->rses_config.slave_selection_criteria, session, router, client_rses, connection_type::ALL)) { /** * Master and at least slaves must be found if the router is * in the strict mode. If sessions without master are allowed, only * slaves must be found. */ delete client_rses; return NULL; } if (client_rses->rses_config.rw_max_slave_conn_percent) { int n_conn = 0; double pct = (double)client_rses->rses_config.rw_max_slave_conn_percent / 100.0; n_conn = MXS_MAX(floor((double)client_rses->rses_nbackends * pct), 1); client_rses->rses_config.max_slave_connections = n_conn; } router->stats.n_sessions += 1; return (MXS_ROUTER_SESSION*)client_rses; } /** * @brief Close a router session (API). * * Close a session with the router, this is the mechanism by which a router * may cleanup data structure etc. The instance of the router that relates to * the relevant service is passed, along with the router session that is to * be closed. Typically the function is used in conjunction with freeSession * which will release the resources used by a router session (see below). * * @param instance The router instance data * @param session The router session being closed */ static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session) { ROUTER_CLIENT_SES *router_cli_ses = (ROUTER_CLIENT_SES *)router_session; CHK_CLIENT_RSES(router_cli_ses); if (!router_cli_ses->rses_closed) { /** * Mark router session as closed. @c rses_closed is checked at the start * of every API function to quickly stop the processing of closed sessions. 
*/ router_cli_ses->rses_closed = true; close_all_connections(router_cli_ses); if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO) && router_cli_ses->sescmd_list.size()) { std::string sescmdstr; for (mxs::SessionCommandList::iterator it = router_cli_ses->sescmd_list.begin(); it != router_cli_ses->sescmd_list.end(); it++) { mxs::SSessionCommand& scmd = *it; sescmdstr += scmd->to_string(); sescmdstr += "\n"; } MXS_INFO("Executed session commands:\n%s", sescmdstr.c_str()); } } } /** * @brief Free a router session (API). * * When a router session has been closed, freeSession can be called to free * allocated resources. * * @param router_instance The router instance the session belongs to * @param router_client_session Client session * */ static void freeSession(MXS_ROUTER *router_instance, MXS_ROUTER_SESSION *router_client_session) { ROUTER_CLIENT_SES *router_cli_ses = (ROUTER_CLIENT_SES *)router_client_session; delete router_cli_ses; } /** * @brief The main routing entry point for a query (API) * * The routeQuery function will make the routing decision based on the contents * of the instance, session and the query itself. The query always represents * a complete MariaDB/MySQL packet because we define the RCAP_TYPE_STMT_INPUT in * getCapabilities(). 
* * @param instance Router instance * @param router_session Router session associated with the client * @param querybuf Buffer containing the query * @return 1 on success, 0 on error */ static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *querybuf) { ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *) instance; ROUTER_CLIENT_SES *rses = (ROUTER_CLIENT_SES *) router_session; int rval = 0; CHK_CLIENT_RSES(rses); if (rses->rses_closed) { closed_session_reply(querybuf); } else { if ((rses->expected_responses == 0 && rses->query_queue == NULL) || rses->load_data_state == LOAD_DATA_ACTIVE) { /** No active or pending queries */ if (route_single_stmt(inst, rses, querybuf)) { rval = 1; } } else { ss_dassert(rses->expected_responses || rses->query_queue); /** We are already processing a request from the client. Store the * new query and wait for the previous one to complete. */ MXS_DEBUG("Storing query (len: %d cmd: %0x), expecting %d replies", gwbuf_length(querybuf), GWBUF_DATA(querybuf)[4], rses->expected_responses); rses->query_queue = gwbuf_append(rses->query_queue, querybuf); querybuf = NULL; rval = 1; if (rses->expected_responses == 0 && !route_stored_query(rses)) { rval = 0; } } } if (querybuf != NULL) { gwbuf_free(querybuf); } return rval; } /** * @brief Diagnostics routine (API) * * Print query router statistics to the DCB passed in * * @param instance The router instance * @param dcb The DCB for diagnostic output */ static void diagnostics(MXS_ROUTER *instance, DCB *dcb) { ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance; const char *weightby = serviceGetWeightingParameter(router->service); double master_pct = 0.0, slave_pct = 0.0, all_pct = 0.0; dcb_printf(dcb, "\n"); dcb_printf(dcb, "\tuse_sql_variables_in: %s\n", mxs_target_to_str(router->rwsplit_config.use_sql_variables_in)); dcb_printf(dcb, "\tslave_selection_criteria: %s\n", select_criteria_to_str(router->rwsplit_config.slave_selection_criteria)); dcb_printf(dcb, 
"\tmaster_failure_mode: %s\n", failure_mode_to_str(router->rwsplit_config.master_failure_mode)); dcb_printf(dcb, "\tmax_slave_replication_lag: %d\n", router->rwsplit_config.max_slave_replication_lag); dcb_printf(dcb, "\tretry_failed_reads: %s\n", router->rwsplit_config.retry_failed_reads ? "true" : "false"); dcb_printf(dcb, "\tstrict_multi_stmt: %s\n", router->rwsplit_config.strict_multi_stmt ? "true" : "false"); dcb_printf(dcb, "\tdisable_sescmd_history: %s\n", router->rwsplit_config.disable_sescmd_history ? "true" : "false"); dcb_printf(dcb, "\tmax_sescmd_history: %lu\n", router->rwsplit_config.max_sescmd_history); dcb_printf(dcb, "\tmaster_accept_reads: %s\n", router->rwsplit_config.master_accept_reads ? "true" : "false"); dcb_printf(dcb, "\n"); if (router->stats.n_queries > 0) { master_pct = ((double)router->stats.n_master / (double)router->stats.n_queries) * 100.0; slave_pct = ((double)router->stats.n_slave / (double)router->stats.n_queries) * 100.0; all_pct = ((double)router->stats.n_all / (double)router->stats.n_queries) * 100.0; } dcb_printf(dcb, "\tNumber of router sessions: %" PRIu64 "\n", router->stats.n_sessions); dcb_printf(dcb, "\tCurrent no. 
of router sessions: %d\n", router->service->stats.n_current); dcb_printf(dcb, "\tNumber of queries forwarded: %" PRIu64 "\n", router->stats.n_queries); dcb_printf(dcb, "\tNumber of queries forwarded to master: %" PRIu64 " (%.2f%%)\n", router->stats.n_master, master_pct); dcb_printf(dcb, "\tNumber of queries forwarded to slave: %" PRIu64 " (%.2f%%)\n", router->stats.n_slave, slave_pct); dcb_printf(dcb, "\tNumber of queries forwarded to all: %" PRIu64 " (%.2f%%)\n", router->stats.n_all, all_pct); if (*weightby) { dcb_printf(dcb, "\tConnection distribution based on %s " "server parameter.\n", weightby); dcb_printf(dcb, "\t\tServer Target %% Connections " "Operations\n"); dcb_printf(dcb, "\t\t Global Router\n"); for (SERVER_REF *ref = router->service->dbref; ref; ref = ref->next) { dcb_printf(dcb, "\t\t%-20s %3.1f%% %-6d %-6d %d\n", ref->server->unique_name, (float)ref->weight / 10, ref->server->stats.n_current, ref->connections, ref->server->stats.n_current_ops); } } } /** * @brief Diagnostics routine (API) * * Print query router statistics to the DCB passed in * * @param instance The router instance * @param dcb The DCB for diagnostic output */ static json_t* diagnostics_json(const MXS_ROUTER *instance) { ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance; json_t* rval = json_object(); json_object_set_new(rval, "use_sql_variables_in", json_string(mxs_target_to_str(router->rwsplit_config.use_sql_variables_in))); json_object_set_new(rval, "slave_selection_criteria", json_string(select_criteria_to_str(router->rwsplit_config.slave_selection_criteria))); json_object_set_new(rval, "master_failure_mode", json_string(failure_mode_to_str(router->rwsplit_config.master_failure_mode))); json_object_set_new(rval, "max_slave_replication_lag", json_integer(router->rwsplit_config.max_slave_replication_lag)); json_object_set_new(rval, "retry_failed_reads", json_boolean(router->rwsplit_config.retry_failed_reads)); json_object_set_new(rval, "strict_multi_stmt", 
json_boolean(router->rwsplit_config.strict_multi_stmt)); json_object_set_new(rval, "disable_sescmd_history", json_boolean(router->rwsplit_config.disable_sescmd_history)); json_object_set_new(rval, "max_sescmd_history", json_integer(router->rwsplit_config.max_sescmd_history)); json_object_set_new(rval, "master_accept_reads", json_boolean(router->rwsplit_config.master_accept_reads)); json_object_set_new(rval, "connections", json_integer(router->stats.n_sessions)); json_object_set_new(rval, "current_connections", json_integer(router->service->stats.n_current)); json_object_set_new(rval, "queries", json_integer(router->stats.n_queries)); json_object_set_new(rval, "route_master", json_integer(router->stats.n_master)); json_object_set_new(rval, "route_slave", json_integer(router->stats.n_slave)); json_object_set_new(rval, "route_all", json_integer(router->stats.n_all)); const char *weightby = serviceGetWeightingParameter(router->service); if (*weightby) { json_object_set_new(rval, "weightby", json_string(weightby)); } return rval; }

/**
 * @brief Client Reply routine (API)
 *
 * Called with a reply that a backend server sent for this router session.
 * The routine:
 * - tracks whether the backend's reply is complete and updates the number of
 *   responses the session still expects,
 * - processes replies to session commands, possibly reconnecting slaves,
 * - routes a stored query once no more responses are expected,
 * - forwards the reply to the client, or starts the next pending session
 *   command when there is nothing to forward.
 *
 * @param instance       The router instance
 * @param router_session The router session
 * @param writebuf       The GWBUF with reply data. This function takes
 *                       ownership: the buffer is either routed to the client
 *                       or freed.
 * @param backend_dcb    The backend DCB the reply was read from
 */
static void clientReply(MXS_ROUTER *instance,
                        MXS_ROUTER_SESSION *router_session,
                        GWBUF *writebuf,
                        DCB *backend_dcb)
{
    ROUTER_CLIENT_SES *router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
    ROUTER_INSTANCE *router_inst = (ROUTER_INSTANCE *)instance;
    DCB *client_dcb = backend_dcb->session->client_dcb;

    CHK_CLIENT_RSES(router_cli_ses);

    /** A closed session can no longer deliver replies: discard the buffer */
    if (router_cli_ses->rses_closed)
    {
        gwbuf_free(writebuf);
        return;
    }

    SRWBackend bref = get_bref_from_dcb(router_cli_ses, backend_dcb);

    if (!bref)
    {
        /** The lookup can come back empty (handleError copes with the same
         * situation). Every step below dereferences bref, so drop the reply
         * instead of crashing on an empty reference. */
        MXS_ERROR("Received a reply from a backend DCB that is not used by "
                  "the router session, discarding it.");
        gwbuf_free(writebuf);
        return;
    }

    /** Statement was successfully executed, free the stored statement */
    session_clear_stmt(backend_dcb->session);

    ss_dassert(bref->get_reply_state() != REPLY_STATE_DONE);

    if (reply_is_complete(bref, writebuf))
    {
        /** Got a complete reply: acknowledge the write and decrement the
         * number of responses the session is still waiting for */
        bref->ack_write();
        router_cli_ses->expected_responses--;
        ss_dassert(router_cli_ses->expected_responses >= 0);
        ss_dassert(bref->get_reply_state() == REPLY_STATE_DONE);
    }
    else
    {
        MXS_DEBUG("Reply not yet complete, waiting for %d replies",
                  router_cli_ses->expected_responses);
    }

    /** Pending session commands on this backend mean that the reply belongs
     * to a session command */
    if (bref->session_command_count())
    {
        check_session_command_reply(writebuf, bref);

        /** This discards all responses that have already been sent to the client */
        bool rconn = false;
        process_sescmd_response(router_cli_ses, bref, &writebuf, &rconn);

        if (rconn && !router_inst->rwsplit_config.disable_sescmd_history)
        {
            /** rses_get_max_slavecount() resolves both the absolute and the
             * percentage form of the slave limit. The raw
             * max_slave_connections value is not positive when the limit is
             * given as a percentage, so passing it directly would request
             * zero slaves. */
            select_connect_backend_servers(
                router_cli_ses->rses_nbackends,
                rses_get_max_slavecount(router_cli_ses),
                router_cli_ses->rses_config.slave_selection_criteria,
                router_cli_ses->client_dcb->session,
                router_cli_ses->router,
                router_cli_ses,
                connection_type::SLAVE);
        }
    }

    bool queue_routed = false;

    if (router_cli_ses->expected_responses == 0)
    {
        /** Every backend has replied: a stored query can now be routed */
        for (SRWBackendList::iterator it = router_cli_ses->backends.begin();
             it != router_cli_ses->backends.end(); it++)
        {
            ss_dassert((*it)->get_reply_state() == REPLY_STATE_DONE);
        }

        queue_routed = router_cli_ses->query_queue != NULL;
        route_stored_query(router_cli_ses);
    }
    else
    {
        ss_dassert(router_cli_ses->expected_responses > 0);
    }

    if (writebuf && client_dcb)
    {
        /** Write reply to client DCB */
        MXS_SESSION_ROUTE_REPLY(backend_dcb->session, writebuf);
    }
    else
    {
        if (writebuf)
        {
            /** No client DCB to deliver to: nothing else takes ownership
             * of the reply, so release it here instead of leaking it */
            gwbuf_free(writebuf);
        }

        /** Check pending session commands */
        if (!queue_routed && bref->session_command_count())
        {
            MXS_INFO("Backend [%s]:%d processed reply and starts to execute active cursor.",
                     bref->server()->name, bref->server()->port);

            if (bref->execute_session_command())
            {
                router_cli_ses->expected_responses++;
            }
        }
    }
}

/**
 * @brief Get router capabilities (API)
 *
 * Return a bit map indicating the characteristics of this particular router.
 *
 * @return RCAP_TYPE_STMT_INPUT | RCAP_TYPE_TRANSACTION_TRACKING |
 *         RCAP_TYPE_STMT_OUTPUT
 */
static uint64_t getCapabilities(MXS_ROUTER* instance)
{
    return RCAP_TYPE_STMT_INPUT | RCAP_TYPE_TRANSACTION_TRACKING | RCAP_TYPE_STMT_OUTPUT;
}

/*
 * NOTE(review): this marker historically separated the router API functions
 * from internal helpers (prototyped in rwsplit_internal.hh, not intended for
 * use outside the read write split router). The router API entry point
 * handleError() still follows it.
 */

/**
 * @brief Router error handling routine (API)
 *
 * Error handler routine to resolve _backend_ failures. If it succeeds, there
 * are enough operative backends available and connected. Otherwise it fails,
 * and the session is terminated.
 *
 * @param instance       The router instance
 * @param router_session The router session
 * @param errmsgbuf      The error message to reply with
 * @param problem_dcb    The backend DCB on which the error occurred
 * @param action         The action: ERRACT_NEW_CONNECTION or
 *                       ERRACT_REPLY_CLIENT
 * @param succp          Result of action: true iff router can continue
 *
 * Even if succp == true, connecting to a new slave may have failed. succp
 * tells whether the router has enough master/slave connections to continue
 * work.
*/
static void handleError(MXS_ROUTER *instance,
                        MXS_ROUTER_SESSION *router_session,
                        GWBUF *errmsgbuf,
                        DCB *problem_dcb,
                        mxs_error_action_t action,
                        bool *succp)
{
    ss_dassert(problem_dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER);
    ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *)instance;
    ROUTER_CLIENT_SES *rses = (ROUTER_CLIENT_SES *)router_session;
    CHK_CLIENT_RSES(rses);
    CHK_DCB(problem_dcb);

    /** A session that is already closed cannot continue */
    if (rses->rses_closed)
    {
        *succp = false;
        return;
    }

    MXS_SESSION *session = problem_dcb->session;
    ss_dassert(session);

    /** May be empty when problem_dcb is not used by any backend of this
     * session; the ERRACT_NEW_CONNECTION branch checks for that before
     * every dereference */
    SRWBackend bref = get_bref_from_dcb(rses, problem_dcb);

    switch (action)
    {
    case ERRACT_NEW_CONNECTION:
        {
            /**
             * If master has lost its Master status error can't be
             * handled so that session could continue.
             */
            if (rses->current_master && rses->current_master->dcb() == problem_dcb)
            {
                SERVER *srv = rses->current_master->server();
                bool can_continue = false;

                if (rses->rses_config.master_failure_mode != RW_FAIL_INSTANTLY &&
                    (!bref || !bref->is_waiting_result()))
                {
                    /** The failure of a master is not considered a critical
                     * failure as partial functionality still remains. Reads
                     * are allowed as long as slave servers are available
                     * and writes will cause an error to be returned.
                     *
                     * If we were waiting for a response from the master, we
                     * can't be sure whether it was executed or not. In this
                     * case the safest thing to do is to close the client
                     * connection. */
                    can_continue = true;
                }
                else if (!SERVER_IS_MASTER(srv) && !srv->master_err_is_logged)
                {
                    /** The flag lives on the SERVER, so this error is logged
                     * once per server rather than once per session */
                    MXS_ERROR("Server %s:%d lost the master status. Readwritesplit "
                              "service can't locate the master. Client sessions "
                              "will be closed.", srv->name, srv->port);
                    srv->master_err_is_logged = true;
                }

                *succp = can_continue;

                if (bref)
                {
                    /** The failed master connection is closed whether or not
                     * the session can continue */
                    bref->close(mxs::Backend::CLOSE_FATAL);
                }
                else
                {
                    MXS_ERROR("Server %s:%d lost the master status but could not locate the "
                              "corresponding backend ref.", srv->name, srv->port);
                }
            }
            else if (bref)
            {
                /** Check whether problem_dcb is same as dcb of rses->target_node
                 * and within READ ONLY transaction:
                 * if true reset rses->target_node and close session */
                if (rses->target_node &&
                    (rses->target_node->dcb() == problem_dcb &&
                     session_trx_is_read_only(problem_dcb->session)))
                {
                    MXS_ERROR("forced_node SLAVE %s in opened READ ONLY transaction has failed:"
                              " closing session", problem_dcb->server->unique_name);
                    rses->target_node.reset();
                    *succp = false;
                    /** Leaves the switch directly, skipping the bref sanity
                     * check that follows this if/else chain */
                    break;
                }

                /** We should reconnect only if we find a backend for this
                 * DCB. If this DCB is an older DCB that has been closed,
                 * we can ignore it. */
                *succp = handle_error_new_connection(inst, &rses, problem_dcb, errmsgbuf);
            }

            /** NOTE(review): when problem_dcb is neither the current master's
             * DCB nor owned by a backend reference, nothing on this path
             * writes *succp; it keeps whatever value the caller stored.
             * Confirm that all callers initialize it. */

            if (bref)
            {
                /** This is a valid DCB for a backend ref. The handling above
                 * is expected to have released it; a backend that is still
                 * in use on the problem DCB is reported as an error. */
                if (bref->in_use() && bref->dcb() == problem_dcb)
                {
                    ss_dassert(false);
                    MXS_ERROR("Backend '%s' is still in use and points to the problem DCB.",
                              bref->server()->unique_name);
                }
            }
            else
            {
                /** Only name the server if the DCB is still polling and has one */
                const char *remote = problem_dcb->state == DCB_STATE_POLLING &&
                                     problem_dcb->server ? problem_dcb->server->unique_name : "CLOSED";

                MXS_ERROR("DCB connected to '%s' is not in use by the router "
                          "session, not closing it. DCB is in state '%s'",
                          remote, STRDCBSTATE(problem_dcb->state));
            }

            break;
        }

    case ERRACT_REPLY_CLIENT:
        {
            handle_error_reply_client(session, rses, problem_dcb, errmsgbuf);
            *succp = false; /*< no new backend servers were made available */
            break;
        }

    default:
        /** Unknown action: assert, and report failure */
        ss_dassert(!true);
        *succp = false;
        break;
    }
}

MXS_BEGIN_DECLS

/**
 * The module entry point routine. It is this routine that
 * must return the structure that is referred to as the
 * "module object". This is a structure with the set of
 * external entry points for this module.
 *
 * The returned object has static storage duration; the same instance is
 * returned on every call.
 *
 * @return The module object
 */
MXS_MODULE *MXS_CREATE_MODULE()
{
    /** Router API entry points (positional initialization of MXS_ROUTER_OBJECT) */
    static MXS_ROUTER_OBJECT MyObject =
    {
        createInstance,
        newSession,
        closeSession,
        freeSession,
        routeQuery,
        diagnostics,
        diagnostics_json,
        clientReply,
        handleError,
        getCapabilities,
        NULL
    };

    static MXS_MODULE info =
    {
        MXS_MODULE_API_ROUTER,
        MXS_MODULE_GA,
        MXS_ROUTER_VERSION,
        "A Read/Write splitting router for enhancement read scalability",
        "V1.1.0",
        /* Same capability bits as returned by getCapabilities() */
        RCAP_TYPE_STMT_INPUT | RCAP_TYPE_TRANSACTION_TRACKING | RCAP_TYPE_STMT_OUTPUT,
        &MyObject,
        NULL, /* Process init. */
        NULL, /* Process finish. */
        NULL, /* Thread init. */
        NULL, /* Thread finish. */
        /* Router parameters: name, type, default value and, for enums,
         * options plus the accepted values */
        {
            {
                "use_sql_variables_in",
                MXS_MODULE_PARAM_ENUM,
                "all",
                MXS_MODULE_OPT_NONE,
                use_sql_variables_in_values
            },
            {
                "slave_selection_criteria",
                MXS_MODULE_PARAM_ENUM,
                "LEAST_CURRENT_OPERATIONS",
                MXS_MODULE_OPT_NONE,
                slave_selection_criteria_values
            },
            {
                "master_failure_mode",
                MXS_MODULE_PARAM_ENUM,
                "fail_instantly",
                MXS_MODULE_OPT_NONE,
                master_failure_mode_values
            },
            {"max_slave_replication_lag", MXS_MODULE_PARAM_INT, "-1"},
            /* String-typed so that either a count or a percentage can be given
             * (rses_get_max_slavecount() handles both forms) */
            {"max_slave_connections", MXS_MODULE_PARAM_STRING, MAX_SLAVE_COUNT},
            {"retry_failed_reads", MXS_MODULE_PARAM_BOOL, "true"},
            {"disable_sescmd_history", MXS_MODULE_PARAM_BOOL, "true"},
            {"max_sescmd_history", MXS_MODULE_PARAM_COUNT, "0"},
            {"strict_multi_stmt", MXS_MODULE_PARAM_BOOL, "true"},
            {"master_accept_reads", MXS_MODULE_PARAM_BOOL, "false"},
            {"connection_keepalive", MXS_MODULE_PARAM_COUNT, "0"},
            {MXS_END_MODULE_PARAMS}
        }
    };

    MXS_NOTICE("Initializing statement-based read/write split router module.");
    return &info;
}

MXS_END_DECLS