Remove unnecessary SConfig from readwritesplit

The configuration doesn't need to be contained in shared pointer as each
session holds its own version of it. This removes most of the overhead in
configuration reloading. The only thing that's left is any overhead added
by the use of thread-local storage.
This commit is contained in:
Markus Mäkelä
2018-08-01 10:25:15 +03:00
parent 403b9e09f5
commit c01840ffb3
7 changed files with 116 additions and 117 deletions

View File

@ -58,7 +58,7 @@ using namespace maxscale;
* @param options Router options * @param options Router options
* @return True on success, false if a configuration error was found * @return True on success, false if a configuration error was found
*/ */
static bool rwsplit_process_router_options(SConfig config, char **options) static bool rwsplit_process_router_options(Config& config, char **options)
{ {
if (options == NULL) if (options == NULL)
{ {
@ -94,51 +94,51 @@ static bool rwsplit_process_router_options(SConfig config, char **options)
"Allowed values are LEAST_GLOBAL_CONNECTIONS, " "Allowed values are LEAST_GLOBAL_CONNECTIONS, "
"LEAST_ROUTER_CONNECTIONS, LEAST_BEHIND_MASTER," "LEAST_ROUTER_CONNECTIONS, LEAST_BEHIND_MASTER,"
"and LEAST_CURRENT_OPERATIONS.", "and LEAST_CURRENT_OPERATIONS.",
STRCRITERIA(config->slave_selection_criteria)); STRCRITERIA(config.slave_selection_criteria));
success = false; success = false;
} }
else else
{ {
config->slave_selection_criteria = c; config.slave_selection_criteria = c;
} }
} }
else if (strcmp(options[i], "max_sescmd_history") == 0) else if (strcmp(options[i], "max_sescmd_history") == 0)
{ {
config->max_sescmd_history = atoi(value); config.max_sescmd_history = atoi(value);
} }
else if (strcmp(options[i], "disable_sescmd_history") == 0) else if (strcmp(options[i], "disable_sescmd_history") == 0)
{ {
config->disable_sescmd_history = config_truth_value(value); config.disable_sescmd_history = config_truth_value(value);
} }
else if (strcmp(options[i], "master_accept_reads") == 0) else if (strcmp(options[i], "master_accept_reads") == 0)
{ {
config->master_accept_reads = config_truth_value(value); config.master_accept_reads = config_truth_value(value);
} }
else if (strcmp(options[i], "strict_multi_stmt") == 0) else if (strcmp(options[i], "strict_multi_stmt") == 0)
{ {
config->strict_multi_stmt = config_truth_value(value); config.strict_multi_stmt = config_truth_value(value);
} }
else if (strcmp(options[i], "strict_sp_calls") == 0) else if (strcmp(options[i], "strict_sp_calls") == 0)
{ {
config->strict_sp_calls = config_truth_value(value); config.strict_sp_calls = config_truth_value(value);
} }
else if (strcmp(options[i], "retry_failed_reads") == 0) else if (strcmp(options[i], "retry_failed_reads") == 0)
{ {
config->retry_failed_reads = config_truth_value(value); config.retry_failed_reads = config_truth_value(value);
} }
else if (strcmp(options[i], "master_failure_mode") == 0) else if (strcmp(options[i], "master_failure_mode") == 0)
{ {
if (strcasecmp(value, "fail_instantly") == 0) if (strcasecmp(value, "fail_instantly") == 0)
{ {
config->master_failure_mode = RW_FAIL_INSTANTLY; config.master_failure_mode = RW_FAIL_INSTANTLY;
} }
else if (strcasecmp(value, "fail_on_write") == 0) else if (strcasecmp(value, "fail_on_write") == 0)
{ {
config->master_failure_mode = RW_FAIL_ON_WRITE; config.master_failure_mode = RW_FAIL_ON_WRITE;
} }
else if (strcasecmp(value, "error_on_write") == 0) else if (strcasecmp(value, "error_on_write") == 0)
{ {
config->master_failure_mode = RW_ERROR_ON_WRITE; config.master_failure_mode = RW_ERROR_ON_WRITE;
} }
else else
{ {
@ -159,7 +159,7 @@ static bool rwsplit_process_router_options(SConfig config, char **options)
} }
// TODO: Don't process parameters in readwritesplit // TODO: Don't process parameters in readwritesplit
static bool handle_max_slaves(SConfig config, const char *str) static bool handle_max_slaves(Config& config, const char *str)
{ {
bool rval = true; bool rval = true;
char *endptr; char *endptr;
@ -167,13 +167,13 @@ static bool handle_max_slaves(SConfig config, const char *str)
if (*endptr == '%' && *(endptr + 1) == '\0') if (*endptr == '%' && *(endptr + 1) == '\0')
{ {
config->rw_max_slave_conn_percent = val; config.rw_max_slave_conn_percent = val;
config->max_slave_connections = 0; config.max_slave_connections = 0;
} }
else if (*endptr == '\0') else if (*endptr == '\0')
{ {
config->max_slave_connections = val; config.max_slave_connections = val;
config->rw_max_slave_conn_percent = 0; config.rw_max_slave_conn_percent = 0;
} }
else else
{ {
@ -184,7 +184,7 @@ static bool handle_max_slaves(SConfig config, const char *str)
return rval; return rval;
} }
RWSplit::RWSplit(SERVICE* service, SConfig config): RWSplit::RWSplit(SERVICE* service, const Config& config):
mxs::Router<RWSplit, RWSplitSession>(service), mxs::Router<RWSplit, RWSplitSession>(service),
m_service(service), m_service(service),
m_config(config), m_config(config),
@ -194,7 +194,7 @@ RWSplit::RWSplit(SERVICE* service, SConfig config):
static void data_destroy_callback(void* data) static void data_destroy_callback(void* data)
{ {
SConfig* my_config = static_cast<SConfig*>(data); Config* my_config = static_cast<Config*>(data);
delete my_config; delete my_config;
} }
@ -208,16 +208,17 @@ SERVICE* RWSplit::service() const
return m_service; return m_service;
} }
SConfig* RWSplit::get_local_config() const Config* RWSplit::get_local_config() const
{ {
SConfig* my_config = static_cast<SConfig*>(mxs_rworker_get_data(m_wkey)); Config* my_config = static_cast<Config*>(mxs_rworker_get_data(m_wkey));
if (my_config == nullptr) if (my_config == nullptr)
{ {
// First time we get the configuration, create and update it // First time we get the configuration, create and update it
my_config = new SConfig; m_lock.lock();
my_config = new Config(m_config);
m_lock.unlock();
mxs_rworker_set_data(m_wkey, my_config, data_destroy_callback); mxs_rworker_set_data(m_wkey, my_config, data_destroy_callback);
update_local_config();
} }
ss_dassert(my_config); ss_dassert(my_config);
@ -226,7 +227,7 @@ SConfig* RWSplit::get_local_config() const
void RWSplit::update_local_config() const void RWSplit::update_local_config() const
{ {
SConfig* my_config = get_local_config(); Config* my_config = get_local_config();
m_lock.lock(); m_lock.lock();
*my_config = m_config; *my_config = m_config;
@ -239,7 +240,7 @@ void RWSplit::update_config(void* data)
inst->update_local_config(); inst->update_local_config();
} }
void RWSplit::store_config(SConfig config) void RWSplit::store_config(const Config& config)
{ {
m_lock.lock(); m_lock.lock();
m_config = config; m_config = config;
@ -249,7 +250,7 @@ void RWSplit::store_config(SConfig config)
mxs_rworker_broadcast(update_config, this); mxs_rworker_broadcast(update_config, this);
} }
SConfig RWSplit::config() const const Config& RWSplit::config() const
{ {
return *get_local_config(); return *get_local_config();
} }
@ -266,9 +267,9 @@ const Stats& RWSplit::stats() const
int RWSplit::max_slave_count() const int RWSplit::max_slave_count() const
{ {
int router_nservers = m_service->n_dbref; int router_nservers = m_service->n_dbref;
int conf_max_nslaves = m_config->max_slave_connections > 0 ? int conf_max_nslaves = m_config.max_slave_connections > 0 ?
m_config->max_slave_connections : m_config.max_slave_connections :
(router_nservers * m_config->rw_max_slave_conn_percent) / 100; (router_nservers * m_config.rw_max_slave_conn_percent) / 100;
return MXS_MIN(router_nservers - 1, MXS_MAX(1, conf_max_nslaves)); return MXS_MIN(router_nservers - 1, MXS_MAX(1, conf_max_nslaves));
} }
@ -278,8 +279,8 @@ bool RWSplit::have_enough_servers() const
const int min_nsrv = 1; const int min_nsrv = 1;
const int router_nsrv = m_service->n_dbref; const int router_nsrv = m_service->n_dbref;
int n_serv = MXS_MAX(m_config->max_slave_connections, int n_serv = MXS_MAX(m_config.max_slave_connections,
(router_nsrv * m_config->rw_max_slave_conn_percent) / 100); (router_nsrv * m_config.rw_max_slave_conn_percent) / 100);
/** With too few servers session is not created */ /** With too few servers session is not created */
if (router_nsrv < min_nsrv || n_serv < min_nsrv) if (router_nsrv < min_nsrv || n_serv < min_nsrv)
@ -292,15 +293,15 @@ bool RWSplit::have_enough_servers() const
} }
else else
{ {
int pct = m_config->rw_max_slave_conn_percent / 100; int pct = m_config.rw_max_slave_conn_percent / 100;
int nservers = router_nsrv * pct; int nservers = router_nsrv * pct;
if (m_config->max_slave_connections < min_nsrv) if (m_config.max_slave_connections < min_nsrv)
{ {
MXS_ERROR("Unable to start %s service. There are " MXS_ERROR("Unable to start %s service. There are "
"too few backend servers configured in " "too few backend servers configured in "
"MaxScale.cnf. Found %d when %d is required.", "MaxScale.cnf. Found %d when %d is required.",
m_service->name, m_config->max_slave_connections, min_nsrv); m_service->name, m_config.max_slave_connections, min_nsrv);
} }
if (nservers < min_nsrv) if (nservers < min_nsrv)
{ {
@ -309,7 +310,7 @@ bool RWSplit::have_enough_servers() const
"too few backend servers configured in " "too few backend servers configured in "
"MaxScale.cnf. Found %d%% when at least %.0f%% " "MaxScale.cnf. Found %d%% when at least %.0f%% "
"would be required.", m_service->name, "would be required.", m_service->name,
m_config->rw_max_slave_conn_percent, dbgpct); m_config.rw_max_slave_conn_percent, dbgpct);
} }
} }
succp = false; succp = false;
@ -345,7 +346,7 @@ RWSplit* RWSplit::create(SERVICE *service, MXS_CONFIG_PARAMETER* params)
return NULL; return NULL;
} }
SConfig config(new Config(params)); Config config(params);
if (!handle_max_slaves(config, config_get_string(params, "max_slave_connections"))) if (!handle_max_slaves(config, config_get_string(params, "max_slave_connections")))
{ {
@ -353,25 +354,25 @@ RWSplit* RWSplit::create(SERVICE *service, MXS_CONFIG_PARAMETER* params)
} }
/** These options cancel each other out */ /** These options cancel each other out */
if (config->disable_sescmd_history && config->max_sescmd_history > 0) if (config.disable_sescmd_history && config.max_sescmd_history > 0)
{ {
config->max_sescmd_history = 0; config.max_sescmd_history = 0;
} }
if (config->optimistic_trx) if (config.optimistic_trx)
{ {
// Optimistic transaction routing requires transaction replay // Optimistic transaction routing requires transaction replay
config->transaction_replay = true; config.transaction_replay = true;
} }
if (config->transaction_replay) if (config.transaction_replay)
{ {
/** /**
* Replaying transactions requires that we are able to do delayed query * Replaying transactions requires that we are able to do delayed query
* retries and reconnect to a master. * retries and reconnect to a master.
*/ */
config->delayed_retry = true; config.delayed_retry = true;
config->master_reconnection = true; config.master_reconnection = true;
} }
return new (std::nothrow) RWSplit(service, config); return new (std::nothrow) RWSplit(service, config);
@ -388,41 +389,41 @@ void RWSplit::diagnostics(DCB *dcb)
{ {
const char *weightby = serviceGetWeightingParameter(service()); const char *weightby = serviceGetWeightingParameter(service());
double master_pct = 0.0, slave_pct = 0.0, all_pct = 0.0; double master_pct = 0.0, slave_pct = 0.0, all_pct = 0.0;
SConfig cnf = config(); Config cnf = config();
dcb_printf(dcb, "\n"); dcb_printf(dcb, "\n");
dcb_printf(dcb, "\tuse_sql_variables_in: %s\n", dcb_printf(dcb, "\tuse_sql_variables_in: %s\n",
mxs_target_to_str(cnf->use_sql_variables_in)); mxs_target_to_str(cnf.use_sql_variables_in));
dcb_printf(dcb, "\tslave_selection_criteria: %s\n", dcb_printf(dcb, "\tslave_selection_criteria: %s\n",
select_criteria_to_str(cnf->slave_selection_criteria)); select_criteria_to_str(cnf.slave_selection_criteria));
dcb_printf(dcb, "\tmaster_failure_mode: %s\n", dcb_printf(dcb, "\tmaster_failure_mode: %s\n",
failure_mode_to_str(cnf->master_failure_mode)); failure_mode_to_str(cnf.master_failure_mode));
dcb_printf(dcb, "\tmax_slave_replication_lag: %d\n", dcb_printf(dcb, "\tmax_slave_replication_lag: %d\n",
cnf->max_slave_replication_lag); cnf.max_slave_replication_lag);
dcb_printf(dcb, "\tretry_failed_reads: %s\n", dcb_printf(dcb, "\tretry_failed_reads: %s\n",
cnf->retry_failed_reads ? "true" : "false"); cnf.retry_failed_reads ? "true" : "false");
dcb_printf(dcb, "\tstrict_multi_stmt: %s\n", dcb_printf(dcb, "\tstrict_multi_stmt: %s\n",
cnf->strict_multi_stmt ? "true" : "false"); cnf.strict_multi_stmt ? "true" : "false");
dcb_printf(dcb, "\tstrict_sp_calls: %s\n", dcb_printf(dcb, "\tstrict_sp_calls: %s\n",
cnf->strict_sp_calls ? "true" : "false"); cnf.strict_sp_calls ? "true" : "false");
dcb_printf(dcb, "\tdisable_sescmd_history: %s\n", dcb_printf(dcb, "\tdisable_sescmd_history: %s\n",
cnf->disable_sescmd_history ? "true" : "false"); cnf.disable_sescmd_history ? "true" : "false");
dcb_printf(dcb, "\tmax_sescmd_history: %lu\n", dcb_printf(dcb, "\tmax_sescmd_history: %lu\n",
cnf->max_sescmd_history); cnf.max_sescmd_history);
dcb_printf(dcb, "\tmaster_accept_reads: %s\n", dcb_printf(dcb, "\tmaster_accept_reads: %s\n",
cnf->master_accept_reads ? "true" : "false"); cnf.master_accept_reads ? "true" : "false");
dcb_printf(dcb, "\tconnection_keepalive: %d\n", dcb_printf(dcb, "\tconnection_keepalive: %d\n",
cnf->connection_keepalive); cnf.connection_keepalive);
dcb_printf(dcb, "\tcausal_reads: %s\n", dcb_printf(dcb, "\tcausal_reads: %s\n",
cnf->causal_reads ? "true" : "false"); cnf.causal_reads ? "true" : "false");
dcb_printf(dcb, "\tcausal_reads_timeout: %s\n", dcb_printf(dcb, "\tcausal_reads_timeout: %s\n",
cnf->causal_reads_timeout.c_str()); cnf.causal_reads_timeout.c_str());
dcb_printf(dcb, "\tmaster_reconnection: %s\n", dcb_printf(dcb, "\tmaster_reconnection: %s\n",
cnf->master_reconnection ? "true" : "false"); cnf.master_reconnection ? "true" : "false");
dcb_printf(dcb, "\tdelayed_retry: %s\n", dcb_printf(dcb, "\tdelayed_retry: %s\n",
cnf->delayed_retry ? "true" : "false"); cnf.delayed_retry ? "true" : "false");
dcb_printf(dcb, "\tdelayed_retry_timeout: %lu\n", dcb_printf(dcb, "\tdelayed_retry_timeout: %lu\n",
cnf->delayed_retry_timeout); cnf.delayed_retry_timeout);
dcb_printf(dcb, "\n"); dcb_printf(dcb, "\n");
@ -504,11 +505,11 @@ uint64_t RWSplit::getCapabilities()
bool RWSplit::configure(MXS_CONFIG_PARAMETER* params) bool RWSplit::configure(MXS_CONFIG_PARAMETER* params)
{ {
bool rval = false; bool rval = false;
SConfig cnf(new Config(params)); Config cnf(params);
if (handle_max_slaves(cnf, config_get_string(params, "max_slave_connections"))) if (handle_max_slaves(cnf, config_get_string(params, "max_slave_connections")))
{ {
store_config(std::move(cnf)); store_config(cnf);
rval = true; rval = true;
} }

View File

@ -201,8 +201,6 @@ struct Config
bool optimistic_trx; /**< Enable optimistic transactions */ bool optimistic_trx; /**< Enable optimistic transactions */
}; };
typedef std::shared_ptr<Config> SConfig;
/** /**
* The statistics for this router instance * The statistics for this router instance
*/ */
@ -229,11 +227,11 @@ class RWSplit: public mxs::Router<RWSplit, RWSplitSession>
RWSplit& operator=(const RWSplit&); RWSplit& operator=(const RWSplit&);
public: public:
RWSplit(SERVICE* service, SConfig config); RWSplit(SERVICE* service, const Config& config);
~RWSplit(); ~RWSplit();
SERVICE* service() const; SERVICE* service() const;
SConfig config() const; const Config& config() const;
Stats& stats(); Stats& stats();
const Stats& stats() const; const Stats& stats() const;
int max_slave_count() const; int max_slave_count() const;
@ -303,15 +301,15 @@ public:
private: private:
// Update configuration // Update configuration
void store_config(SConfig config); void store_config(const Config& config);
void update_local_config() const; void update_local_config() const;
SConfig* get_local_config() const; Config* get_local_config() const;
// Called when worker local data needs to be updated // Called when worker local data needs to be updated
static void update_config(void* data); static void update_config(void* data);
SERVICE* m_service; /**< Service where the router belongs*/ SERVICE* m_service; /**< Service where the router belongs*/
SConfig m_config; Config m_config;
mutable std::mutex m_lock; /**< Protects updates of m_config */ mutable std::mutex m_lock; /**< Protects updates of m_config */
Stats m_stats; Stats m_stats;

View File

@ -116,7 +116,7 @@ bool RWSplitSession::handle_target_is_all(route_target_t route_target, GWBUF *qu
{ {
// TODO: Append to the already stored session command instead of disabling history // TODO: Append to the already stored session command instead of disabling history
MXS_INFO("Large session write, have to disable session command history"); MXS_INFO("Large session write, have to disable session command history");
m_config->disable_sescmd_history = true; m_config.disable_sescmd_history = true;
continue_large_session_write(querybuf, qtype); continue_large_session_write(querybuf, qtype);
result = true; result = true;

View File

@ -84,7 +84,7 @@ void RWSplitSession::handle_connection_keepalive(SRWBackend& target)
ss_dassert(target); ss_dassert(target);
ss_debug(int nserv = 0); ss_debug(int nserv = 0);
/** Each heartbeat is 1/10th of a second */ /** Each heartbeat is 1/10th of a second */
int keepalive = m_config->connection_keepalive * 10; int keepalive = m_config.connection_keepalive * 10;
for (auto it = m_backends.begin(); it != m_backends.end(); it++) for (auto it = m_backends.begin(); it != m_backends.end(); it++)
{ {
@ -115,7 +115,7 @@ bool RWSplitSession::prepare_target(SRWBackend& target, route_target_t route_tar
if (!target->in_use()) if (!target->in_use())
{ {
ss_dassert(target->can_connect() && can_recover_servers()); ss_dassert(target->can_connect() && can_recover_servers());
ss_dassert(!TARGET_IS_MASTER(route_target) || m_config->master_reconnection); ss_dassert(!TARGET_IS_MASTER(route_target) || m_config.master_reconnection);
rval = target->connect(m_client->session, &m_sescmd_list); rval = target->connect(m_client->session, &m_sescmd_list);
MXS_INFO("Connected to '%s'", target->name()); MXS_INFO("Connected to '%s'", target->name());
@ -166,7 +166,7 @@ bool RWSplitSession::have_connected_slaves() const
bool RWSplitSession::should_try_trx_on_slave(route_target_t route_target) const bool RWSplitSession::should_try_trx_on_slave(route_target_t route_target) const
{ {
return m_config->optimistic_trx && // Optimistic transactions are enabled return m_config.optimistic_trx && // Optimistic transactions are enabled
!is_locked_to_master() && // Not locked to master !is_locked_to_master() && // Not locked to master
!m_is_replay_active && // Not replaying a transaction !m_is_replay_active && // Not replaying a transaction
m_otrx_state == OTRX_INACTIVE && // Not yet in optimistic mode m_otrx_state == OTRX_INACTIVE && // Not yet in optimistic mode
@ -255,7 +255,7 @@ bool RWSplitSession::route_single_stmt(GWBUF *querybuf)
} }
// If delayed query retry is enabled, we need to store the current statement // If delayed query retry is enabled, we need to store the current statement
bool store_stmt = m_config->delayed_retry; bool store_stmt = m_config.delayed_retry;
if (m_qc.large_query()) if (m_qc.large_query())
{ {
@ -294,7 +294,7 @@ bool RWSplitSession::route_single_stmt(GWBUF *querybuf)
{ {
succp = true; succp = true;
if (m_config->retry_failed_reads && if (m_config.retry_failed_reads &&
(command == MXS_COM_QUERY || command == MXS_COM_STMT_EXECUTE)) (command == MXS_COM_QUERY || command == MXS_COM_STMT_EXECUTE))
{ {
// Only commands that can contain an SQL statement should be stored // Only commands that can contain an SQL statement should be stored
@ -371,7 +371,7 @@ bool RWSplitSession::route_single_stmt(GWBUF *querybuf)
} }
} }
if (succp && m_config->connection_keepalive && !TARGET_IS_ALL(route_target)) if (succp && m_config.connection_keepalive && !TARGET_IS_ALL(route_target))
{ {
handle_connection_keepalive(target); handle_connection_keepalive(target);
} }
@ -495,7 +495,7 @@ bool RWSplitSession::route_session_write(GWBUF *querybuf, uint8_t command, uint3
} }
} }
if (m_config->max_sescmd_history > 0 && m_sescmd_list.size() >= m_config->max_sescmd_history) if (m_config.max_sescmd_history > 0 && m_sescmd_list.size() >= m_config.max_sescmd_history)
{ {
static bool warn_history_exceeded = true; static bool warn_history_exceeded = true;
if (warn_history_exceeded) if (warn_history_exceeded)
@ -507,16 +507,16 @@ bool RWSplitSession::route_session_write(GWBUF *querybuf, uint8_t command, uint3
"command history, add `disable_sescmd_history=true` to " "command history, add `disable_sescmd_history=true` to "
"service '%s'. To increase the limit (currently %lu), add " "service '%s'. To increase the limit (currently %lu), add "
"`max_sescmd_history` to the same service and increase the value.", "`max_sescmd_history` to the same service and increase the value.",
m_router->service()->name, m_config->max_sescmd_history); m_router->service()->name, m_config.max_sescmd_history);
warn_history_exceeded = false; warn_history_exceeded = false;
} }
m_config->disable_sescmd_history = true; m_config.disable_sescmd_history = true;
m_config->max_sescmd_history = 0; m_config.max_sescmd_history = 0;
m_sescmd_list.clear(); m_sescmd_list.clear();
} }
if (m_config->disable_sescmd_history) if (m_config.disable_sescmd_history)
{ {
/** Prune stored responses */ /** Prune stored responses */
ResponseMap::iterator it = m_sescmd_responses.lower_bound(lowest_pos); ResponseMap::iterator it = m_sescmd_responses.lower_bound(lowest_pos);
@ -603,7 +603,7 @@ SRWBackend RWSplitSession::get_slave_backend(int max_rlag)
} }
else if (backend->in_use() || counts.second < m_router->max_slave_count()) else if (backend->in_use() || counts.second < m_router->max_slave_count())
{ {
if (!m_config->master_accept_reads && rval->is_master()) if (!m_config.master_accept_reads && rval->is_master())
{ {
// Pick slaves over masters with master_accept_reads=false // Pick slaves over masters with master_accept_reads=false
rval = backend; rval = backend;
@ -611,7 +611,7 @@ SRWBackend RWSplitSession::get_slave_backend(int max_rlag)
else else
{ {
// Compare the two servers and pick the best one // Compare the two servers and pick the best one
rval = compare_backends(rval, backend, m_config->slave_selection_criteria); rval = compare_backends(rval, backend, m_config.slave_selection_criteria);
} }
} }
} }
@ -629,7 +629,7 @@ SRWBackend RWSplitSession::get_master_backend()
if (master) if (master)
{ {
if (master->in_use() || (m_config->master_reconnection && master->can_connect())) if (master->in_use() || (m_config.master_reconnection && master->can_connect()))
{ {
if (master->is_master()) if (master->is_master())
{ {
@ -708,9 +708,9 @@ int RWSplitSession::get_max_replication_lag()
int conf_max_rlag; int conf_max_rlag;
/** if there is no configured value, then longest possible int is used */ /** if there is no configured value, then longest possible int is used */
if (m_config->max_slave_replication_lag > 0) if (m_config.max_slave_replication_lag > 0)
{ {
conf_max_rlag = m_config->max_slave_replication_lag; conf_max_rlag = m_config.max_slave_replication_lag;
} }
else else
{ {
@ -890,7 +890,7 @@ void RWSplitSession::log_master_routing_failure(bool found,
else else
{ {
/** We never had a master connection, the session must be in read-only mode */ /** We never had a master connection, the session must be in read-only mode */
if (m_config->master_failure_mode != RW_FAIL_INSTANTLY) if (m_config.master_failure_mode != RW_FAIL_INSTANTLY)
{ {
sprintf(errmsg, "Session is in read-only mode because it was created " sprintf(errmsg, "Session is in read-only mode because it was created "
"when no master was available"); "when no master was available");
@ -911,7 +911,7 @@ void RWSplitSession::log_master_routing_failure(bool found,
bool RWSplitSession::should_replace_master(SRWBackend& target) bool RWSplitSession::should_replace_master(SRWBackend& target)
{ {
return m_config->master_reconnection && return m_config.master_reconnection &&
// We have a target server and it's not the current master // We have a target server and it's not the current master
target && target != m_current_master && target && target != m_current_master &&
// We are not inside a transaction (also checks for autocommit=1) // We are not inside a transaction (also checks for autocommit=1)
@ -929,7 +929,7 @@ void RWSplitSession::replace_master(SRWBackend& target)
bool RWSplitSession::should_migrate_trx(SRWBackend& target) bool RWSplitSession::should_migrate_trx(SRWBackend& target)
{ {
return m_config->transaction_replay && return m_config.transaction_replay &&
// We have a target server and it's not the current master // We have a target server and it's not the current master
target && target != m_current_master && target && target != m_current_master &&
// Transaction replay is not active (replay is only attempted once) // Transaction replay is not active (replay is only attempted once)
@ -971,7 +971,7 @@ bool RWSplitSession::handle_master_is_target(SRWBackend* dest)
{ {
succp = false; succp = false;
/** The original master is not available, we can't route the write */ /** The original master is not available, we can't route the write */
if (m_config->master_failure_mode == RW_ERROR_ON_WRITE) if (m_config.master_failure_mode == RW_ERROR_ON_WRITE)
{ {
succp = send_readonly_error(m_client); succp = send_readonly_error(m_client);
@ -980,15 +980,15 @@ bool RWSplitSession::handle_master_is_target(SRWBackend* dest)
m_current_master->close(); m_current_master->close();
} }
} }
else if (!m_config->delayed_retry || else if (!m_config.delayed_retry ||
m_retry_duration >= m_config->delayed_retry_timeout) m_retry_duration >= m_config.delayed_retry_timeout)
{ {
// Cannot retry the query, log a message that routing has failed // Cannot retry the query, log a message that routing has failed
log_master_routing_failure(succp, m_current_master, target); log_master_routing_failure(succp, m_current_master, target);
} }
} }
if (!m_config->strict_multi_stmt && !m_config->strict_sp_calls && if (!m_config.strict_multi_stmt && !m_config.strict_sp_calls &&
m_target_node == m_current_master) m_target_node == m_current_master)
{ {
/** Reset the forced node as we're in relaxed multi-statement mode */ /** Reset the forced node as we're in relaxed multi-statement mode */
@ -1025,7 +1025,7 @@ GWBUF* RWSplitSession::add_prefix_wait_gtid(SERVER *server, GWBUF *origin)
GWBUF* rval = origin; GWBUF* rval = origin;
const char* wait_func = (server->server_type == SERVER_TYPE_MARIADB) ? const char* wait_func = (server->server_type == SERVER_TYPE_MARIADB) ?
MARIADB_WAIT_GTID_FUNC : MYSQL_WAIT_GTID_FUNC; MARIADB_WAIT_GTID_FUNC : MYSQL_WAIT_GTID_FUNC;
const char *gtid_wait_timeout = m_config->causal_reads_timeout.c_str(); const char *gtid_wait_timeout = m_config.causal_reads_timeout.c_str();
const char *gtid_position = m_gtid_pos.c_str(); const char *gtid_position = m_gtid_pos.c_str();
/* Create a new buffer to store prefix sql */ /* Create a new buffer to store prefix sql */
@ -1083,7 +1083,7 @@ bool RWSplitSession::handle_got_target(GWBUF* querybuf, SRWBackend& target, bool
uint8_t cmd = mxs_mysql_get_command(querybuf); uint8_t cmd = mxs_mysql_get_command(querybuf);
GWBUF *send_buf = gwbuf_clone(querybuf); GWBUF *send_buf = gwbuf_clone(querybuf);
if (m_config->causal_reads && cmd == COM_QUERY && !m_gtid_pos.empty() && if (m_config.causal_reads && cmd == COM_QUERY && !m_gtid_pos.empty() &&
target->is_slave()) target->is_slave())
{ {
// Perform the causal read only when the query is routed to a slave // Perform the causal read only when the query is routed to a slave

View File

@ -301,16 +301,16 @@ bool RWSplit::select_connect_backend_servers(MXS_SESSION *session,
connection_type type) connection_type type)
{ {
SRWBackend master = get_root_master(backends); SRWBackend master = get_root_master(backends);
SConfig cnf(config()); Config cnf(config());
if (!master && cnf->master_failure_mode == RW_FAIL_INSTANTLY) if (!master && cnf.master_failure_mode == RW_FAIL_INSTANTLY)
{ {
MXS_ERROR("Couldn't find suitable Master from %lu candidates.", backends.size()); MXS_ERROR("Couldn't find suitable Master from %lu candidates.", backends.size());
return false; return false;
} }
/** Check slave selection criteria and set compare function */ /** Check slave selection criteria and set compare function */
select_criteria_t select_criteria = cnf->slave_selection_criteria; select_criteria_t select_criteria = cnf.slave_selection_criteria;
auto cmpfun = criteria_cmpfun[select_criteria]; auto cmpfun = criteria_cmpfun[select_criteria];
ss_dassert(cmpfun); ss_dassert(cmpfun);

View File

@ -38,17 +38,17 @@ RWSplitSession::RWSplitSession(RWSplit* instance, MXS_SESSION* session,
m_gtid_pos(""), m_gtid_pos(""),
m_wait_gtid(NONE), m_wait_gtid(NONE),
m_next_seq(0), m_next_seq(0),
m_qc(this, session, m_config->use_sql_variables_in), m_qc(this, session, m_config.use_sql_variables_in),
m_retry_duration(0), m_retry_duration(0),
m_is_replay_active(false), m_is_replay_active(false),
m_can_replay_trx(true) m_can_replay_trx(true)
{ {
if (m_config->rw_max_slave_conn_percent) if (m_config.rw_max_slave_conn_percent)
{ {
int n_conn = 0; int n_conn = 0;
double pct = (double)m_config->rw_max_slave_conn_percent / 100.0; double pct = (double)m_config.rw_max_slave_conn_percent / 100.0;
n_conn = MXS_MAX(floor((double)m_nbackends * pct), 1); n_conn = MXS_MAX(floor((double)m_nbackends * pct), 1);
m_config->max_slave_connections = n_conn; m_config.max_slave_connections = n_conn;
} }
} }
@ -359,7 +359,7 @@ static void log_unexpected_response(SRWBackend& backend, GWBUF* buffer, GWBUF* c
GWBUF* RWSplitSession::handle_causal_read_reply(GWBUF *writebuf, SRWBackend& backend) GWBUF* RWSplitSession::handle_causal_read_reply(GWBUF *writebuf, SRWBackend& backend)
{ {
if (m_config->causal_reads) if (m_config.causal_reads)
{ {
if (GWBUF_IS_REPLY_OK(writebuf) && backend == m_current_master) if (GWBUF_IS_REPLY_OK(writebuf) && backend == m_current_master)
{ {
@ -480,7 +480,7 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb)
poll_fake_hangup_event(backend_dcb); poll_fake_hangup_event(backend_dcb);
} }
} }
else if (m_config->transaction_replay && m_can_replay_trx && else if (m_config.transaction_replay && m_can_replay_trx &&
session_trx_is_active(m_client->session)) session_trx_is_active(m_client->session))
{ {
if (!backend->has_session_commands()) if (!backend->has_session_commands())
@ -498,7 +498,7 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb)
size_t size{m_trx.size() + m_current_query.length()}; size_t size{m_trx.size() + m_current_query.length()};
// A transaction is open and it is eligible for replaying // A transaction is open and it is eligible for replaying
if (size < m_config->trx_max_size) if (size < m_config.trx_max_size)
{ {
/** Transaction size is OK, store the statement for replaying and /** Transaction size is OK, store the statement for replaying and
* update the checksum of the result */ * update the checksum of the result */
@ -540,7 +540,7 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb)
ss_dassert(backend->get_reply_state() == REPLY_STATE_DONE); ss_dassert(backend->get_reply_state() == REPLY_STATE_DONE);
MXS_INFO("Reply complete, last reply from %s", backend->name()); MXS_INFO("Reply complete, last reply from %s", backend->name());
if (m_config->causal_reads) if (m_config.causal_reads)
{ {
// The reply should never be complete while we are still waiting for the header. // The reply should never be complete while we are still waiting for the header.
ss_dassert(m_wait_gtid != WAITING_FOR_HEADER); ss_dassert(m_wait_gtid != WAITING_FOR_HEADER);
@ -579,7 +579,7 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb)
} }
else if (m_is_replay_active) else if (m_is_replay_active)
{ {
ss_dassert(m_config->transaction_replay); ss_dassert(m_config.transaction_replay);
if (m_expected_responses == 0) if (m_expected_responses == 0)
{ {
@ -606,7 +606,7 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb)
return; return;
} }
} }
else if (m_config->transaction_replay && session_trx_is_ending(m_client->session)) else if (m_config.transaction_replay && session_trx_is_ending(m_client->session))
{ {
MXS_INFO("Transaction complete"); MXS_INFO("Transaction complete");
m_trx.close(); m_trx.close();
@ -668,7 +668,7 @@ bool RWSplitSession::start_trx_replay()
{ {
bool rval = false; bool rval = false;
if (!m_is_replay_active && m_config->transaction_replay && m_can_replay_trx) if (!m_is_replay_active && m_config.transaction_replay && m_can_replay_trx)
{ {
if (m_trx.have_stmts() || m_current_query.get()) if (m_trx.have_stmts() || m_current_query.get())
{ {
@ -766,7 +766,7 @@ void RWSplitSession::handleError(GWBUF *errmsgbuf, DCB *problem_dcb,
* can't be sure whether it was executed or not. In this * can't be sure whether it was executed or not. In this
* case the safest thing to do is to close the client * case the safest thing to do is to close the client
* connection. */ * connection. */
if (m_config->master_failure_mode != RW_FAIL_INSTANTLY) if (m_config.master_failure_mode != RW_FAIL_INSTANTLY)
{ {
can_continue = true; can_continue = true;
} }
@ -782,7 +782,7 @@ void RWSplitSession::handleError(GWBUF *errmsgbuf, DCB *problem_dcb,
can_continue = true; can_continue = true;
retry_query(m_current_query.release()); retry_query(m_current_query.release());
} }
else if (m_config->master_failure_mode == RW_ERROR_ON_WRITE) else if (m_config.master_failure_mode == RW_ERROR_ON_WRITE)
{ {
/** In error_on_write mode, the session can continue even /** In error_on_write mode, the session can continue even
* if the master is lost. Send a read-only error to * if the master is lost. Send a read-only error to
@ -895,7 +895,7 @@ bool RWSplitSession::handle_error_new_connection(DCB *backend_dcb, GWBUF *errmsg
*/ */
GWBUF *stored = m_current_query.release(); GWBUF *stored = m_current_query.release();
if (stored && m_config->retry_failed_reads) if (stored && m_config.retry_failed_reads)
{ {
MXS_INFO("Re-routing failed read after server '%s' failed", backend->name()); MXS_INFO("Re-routing failed read after server '%s' failed", backend->name());
retry_query(stored, 0); retry_query(stored, 0);
@ -935,7 +935,7 @@ bool RWSplitSession::handle_error_new_connection(DCB *backend_dcb, GWBUF *errmsg
* Try to get replacement slave or at least the minimum * Try to get replacement slave or at least the minimum
* number of slave connections for router session. * number of slave connections for router session.
*/ */
if (m_recv_sescmd > 0 && m_config->disable_sescmd_history) if (m_recv_sescmd > 0 && m_config.disable_sescmd_history)
{ {
succp = m_router->have_enough_servers(); succp = m_router->have_enough_servers();
} }

View File

@ -134,7 +134,7 @@ public:
mxs::SRWBackend m_current_master; /**< Current master server */ mxs::SRWBackend m_current_master; /**< Current master server */
mxs::SRWBackend m_target_node; /**< The currently locked target node */ mxs::SRWBackend m_target_node; /**< The currently locked target node */
mxs::SRWBackend m_prev_target; /**< The previous target where a query was sent */ mxs::SRWBackend m_prev_target; /**< The previous target where a query was sent */
SConfig m_config; /**< Configuration for this session */ Config m_config; /**< Configuration for this session */
int m_nbackends; /**< Number of backend servers (obsolete) */ int m_nbackends; /**< Number of backend servers (obsolete) */
DCB* m_client; /**< The client DCB */ DCB* m_client; /**< The client DCB */
uint64_t m_sescmd_count; /**< Number of executed session commands */ uint64_t m_sescmd_count; /**< Number of executed session commands */
@ -253,14 +253,14 @@ private:
* *
* @see handle_trx_replay * @see handle_trx_replay
*/ */
return m_config->delayed_retry && return m_config.delayed_retry &&
m_retry_duration < m_config->delayed_retry_timeout && m_retry_duration < m_config.delayed_retry_timeout &&
!session_trx_is_active(m_client->session); !session_trx_is_active(m_client->session);
} }
inline bool can_recover_servers() const inline bool can_recover_servers() const
{ {
return !m_config->disable_sescmd_history || m_recv_sescmd == 0; return !m_config.disable_sescmd_history || m_recv_sescmd == 0;
} }
inline bool is_large_query(GWBUF* buf) inline bool is_large_query(GWBUF* buf)