Use shared configurations in readwritesplit
By using a shared pointer instead of a plain object, we can replace the router configuration without it affecting existing sessions. This is a change that is required to enable runtime reconfiguration of readwritesplit.
This commit is contained in:
@ -56,7 +56,7 @@ using namespace maxscale;
|
|||||||
* @param options Router options
|
* @param options Router options
|
||||||
* @return True on success, false if a configuration error was found
|
* @return True on success, false if a configuration error was found
|
||||||
*/
|
*/
|
||||||
static bool rwsplit_process_router_options(Config& config, char **options)
|
static bool rwsplit_process_router_options(SConfig config, char **options)
|
||||||
{
|
{
|
||||||
if (options == NULL)
|
if (options == NULL)
|
||||||
{
|
{
|
||||||
@ -92,51 +92,51 @@ static bool rwsplit_process_router_options(Config& config, char **options)
|
|||||||
"Allowed values are LEAST_GLOBAL_CONNECTIONS, "
|
"Allowed values are LEAST_GLOBAL_CONNECTIONS, "
|
||||||
"LEAST_ROUTER_CONNECTIONS, LEAST_BEHIND_MASTER,"
|
"LEAST_ROUTER_CONNECTIONS, LEAST_BEHIND_MASTER,"
|
||||||
"and LEAST_CURRENT_OPERATIONS.",
|
"and LEAST_CURRENT_OPERATIONS.",
|
||||||
STRCRITERIA(config.slave_selection_criteria));
|
STRCRITERIA(config->slave_selection_criteria));
|
||||||
success = false;
|
success = false;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
config.slave_selection_criteria = c;
|
config->slave_selection_criteria = c;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (strcmp(options[i], "max_sescmd_history") == 0)
|
else if (strcmp(options[i], "max_sescmd_history") == 0)
|
||||||
{
|
{
|
||||||
config.max_sescmd_history = atoi(value);
|
config->max_sescmd_history = atoi(value);
|
||||||
}
|
}
|
||||||
else if (strcmp(options[i], "disable_sescmd_history") == 0)
|
else if (strcmp(options[i], "disable_sescmd_history") == 0)
|
||||||
{
|
{
|
||||||
config.disable_sescmd_history = config_truth_value(value);
|
config->disable_sescmd_history = config_truth_value(value);
|
||||||
}
|
}
|
||||||
else if (strcmp(options[i], "master_accept_reads") == 0)
|
else if (strcmp(options[i], "master_accept_reads") == 0)
|
||||||
{
|
{
|
||||||
config.master_accept_reads = config_truth_value(value);
|
config->master_accept_reads = config_truth_value(value);
|
||||||
}
|
}
|
||||||
else if (strcmp(options[i], "strict_multi_stmt") == 0)
|
else if (strcmp(options[i], "strict_multi_stmt") == 0)
|
||||||
{
|
{
|
||||||
config.strict_multi_stmt = config_truth_value(value);
|
config->strict_multi_stmt = config_truth_value(value);
|
||||||
}
|
}
|
||||||
else if (strcmp(options[i], "strict_sp_calls") == 0)
|
else if (strcmp(options[i], "strict_sp_calls") == 0)
|
||||||
{
|
{
|
||||||
config.strict_sp_calls = config_truth_value(value);
|
config->strict_sp_calls = config_truth_value(value);
|
||||||
}
|
}
|
||||||
else if (strcmp(options[i], "retry_failed_reads") == 0)
|
else if (strcmp(options[i], "retry_failed_reads") == 0)
|
||||||
{
|
{
|
||||||
config.retry_failed_reads = config_truth_value(value);
|
config->retry_failed_reads = config_truth_value(value);
|
||||||
}
|
}
|
||||||
else if (strcmp(options[i], "master_failure_mode") == 0)
|
else if (strcmp(options[i], "master_failure_mode") == 0)
|
||||||
{
|
{
|
||||||
if (strcasecmp(value, "fail_instantly") == 0)
|
if (strcasecmp(value, "fail_instantly") == 0)
|
||||||
{
|
{
|
||||||
config.master_failure_mode = RW_FAIL_INSTANTLY;
|
config->master_failure_mode = RW_FAIL_INSTANTLY;
|
||||||
}
|
}
|
||||||
else if (strcasecmp(value, "fail_on_write") == 0)
|
else if (strcasecmp(value, "fail_on_write") == 0)
|
||||||
{
|
{
|
||||||
config.master_failure_mode = RW_FAIL_ON_WRITE;
|
config->master_failure_mode = RW_FAIL_ON_WRITE;
|
||||||
}
|
}
|
||||||
else if (strcasecmp(value, "error_on_write") == 0)
|
else if (strcasecmp(value, "error_on_write") == 0)
|
||||||
{
|
{
|
||||||
config.master_failure_mode = RW_ERROR_ON_WRITE;
|
config->master_failure_mode = RW_ERROR_ON_WRITE;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -157,7 +157,7 @@ static bool rwsplit_process_router_options(Config& config, char **options)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Don't process parameters in readwritesplit
|
// TODO: Don't process parameters in readwritesplit
|
||||||
static bool handle_max_slaves(Config& config, const char *str)
|
static bool handle_max_slaves(SConfig config, const char *str)
|
||||||
{
|
{
|
||||||
bool rval = true;
|
bool rval = true;
|
||||||
char *endptr;
|
char *endptr;
|
||||||
@ -165,13 +165,13 @@ static bool handle_max_slaves(Config& config, const char *str)
|
|||||||
|
|
||||||
if (*endptr == '%' && *(endptr + 1) == '\0')
|
if (*endptr == '%' && *(endptr + 1) == '\0')
|
||||||
{
|
{
|
||||||
config.rw_max_slave_conn_percent = val;
|
config->rw_max_slave_conn_percent = val;
|
||||||
config.max_slave_connections = 0;
|
config->max_slave_connections = 0;
|
||||||
}
|
}
|
||||||
else if (*endptr == '\0')
|
else if (*endptr == '\0')
|
||||||
{
|
{
|
||||||
config.max_slave_connections = val;
|
config->max_slave_connections = val;
|
||||||
config.rw_max_slave_conn_percent = 0;
|
config->rw_max_slave_conn_percent = 0;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -182,7 +182,7 @@ static bool handle_max_slaves(Config& config, const char *str)
|
|||||||
return rval;
|
return rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
RWSplit::RWSplit(SERVICE* service, const Config& config):
|
RWSplit::RWSplit(SERVICE* service, SConfig config):
|
||||||
mxs::Router<RWSplit, RWSplitSession>(service),
|
mxs::Router<RWSplit, RWSplitSession>(service),
|
||||||
m_service(service),
|
m_service(service),
|
||||||
m_config(config)
|
m_config(config)
|
||||||
@ -198,8 +198,9 @@ SERVICE* RWSplit::service() const
|
|||||||
return m_service;
|
return m_service;
|
||||||
}
|
}
|
||||||
|
|
||||||
const Config& RWSplit::config() const
|
SConfig RWSplit::config() const
|
||||||
{
|
{
|
||||||
|
ss_dassert(m_config);
|
||||||
return m_config;
|
return m_config;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -215,9 +216,9 @@ const Stats& RWSplit::stats() const
|
|||||||
int RWSplit::max_slave_count() const
|
int RWSplit::max_slave_count() const
|
||||||
{
|
{
|
||||||
int router_nservers = m_service->n_dbref;
|
int router_nservers = m_service->n_dbref;
|
||||||
int conf_max_nslaves = m_config.max_slave_connections > 0 ?
|
int conf_max_nslaves = m_config->max_slave_connections > 0 ?
|
||||||
m_config.max_slave_connections :
|
m_config->max_slave_connections :
|
||||||
(router_nservers * m_config.rw_max_slave_conn_percent) / 100;
|
(router_nservers * m_config->rw_max_slave_conn_percent) / 100;
|
||||||
return MXS_MIN(router_nservers - 1, MXS_MAX(1, conf_max_nslaves));
|
return MXS_MIN(router_nservers - 1, MXS_MAX(1, conf_max_nslaves));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -227,8 +228,8 @@ bool RWSplit::have_enough_servers() const
|
|||||||
const int min_nsrv = 1;
|
const int min_nsrv = 1;
|
||||||
const int router_nsrv = m_service->n_dbref;
|
const int router_nsrv = m_service->n_dbref;
|
||||||
|
|
||||||
int n_serv = MXS_MAX(m_config.max_slave_connections,
|
int n_serv = MXS_MAX(m_config->max_slave_connections,
|
||||||
(router_nsrv * m_config.rw_max_slave_conn_percent) / 100);
|
(router_nsrv * m_config->rw_max_slave_conn_percent) / 100);
|
||||||
|
|
||||||
/** With too few servers session is not created */
|
/** With too few servers session is not created */
|
||||||
if (router_nsrv < min_nsrv || n_serv < min_nsrv)
|
if (router_nsrv < min_nsrv || n_serv < min_nsrv)
|
||||||
@ -241,15 +242,15 @@ bool RWSplit::have_enough_servers() const
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
int pct = m_config.rw_max_slave_conn_percent / 100;
|
int pct = m_config->rw_max_slave_conn_percent / 100;
|
||||||
int nservers = router_nsrv * pct;
|
int nservers = router_nsrv * pct;
|
||||||
|
|
||||||
if (m_config.max_slave_connections < min_nsrv)
|
if (m_config->max_slave_connections < min_nsrv)
|
||||||
{
|
{
|
||||||
MXS_ERROR("Unable to start %s service. There are "
|
MXS_ERROR("Unable to start %s service. There are "
|
||||||
"too few backend servers configured in "
|
"too few backend servers configured in "
|
||||||
"MaxScale.cnf. Found %d when %d is required.",
|
"MaxScale.cnf. Found %d when %d is required.",
|
||||||
m_service->name, m_config.max_slave_connections, min_nsrv);
|
m_service->name, m_config->max_slave_connections, min_nsrv);
|
||||||
}
|
}
|
||||||
if (nservers < min_nsrv)
|
if (nservers < min_nsrv)
|
||||||
{
|
{
|
||||||
@ -258,7 +259,7 @@ bool RWSplit::have_enough_servers() const
|
|||||||
"too few backend servers configured in "
|
"too few backend servers configured in "
|
||||||
"MaxScale.cnf. Found %d%% when at least %.0f%% "
|
"MaxScale.cnf. Found %d%% when at least %.0f%% "
|
||||||
"would be required.", m_service->name,
|
"would be required.", m_service->name,
|
||||||
m_config.rw_max_slave_conn_percent, dbgpct);
|
m_config->rw_max_slave_conn_percent, dbgpct);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
succp = false;
|
succp = false;
|
||||||
@ -274,9 +275,8 @@ bool RWSplit::have_enough_servers() const
|
|||||||
|
|
||||||
RWSplit* RWSplit::create(SERVICE *service, char **options)
|
RWSplit* RWSplit::create(SERVICE *service, char **options)
|
||||||
{
|
{
|
||||||
|
|
||||||
MXS_CONFIG_PARAMETER* params = service->svc_config_param;
|
MXS_CONFIG_PARAMETER* params = service->svc_config_param;
|
||||||
Config config(params);
|
SConfig config(new Config(params));
|
||||||
|
|
||||||
if (!handle_max_slaves(config, config_get_string(params, "max_slave_connections")) ||
|
if (!handle_max_slaves(config, config_get_string(params, "max_slave_connections")) ||
|
||||||
(options && !rwsplit_process_router_options(config, options)))
|
(options && !rwsplit_process_router_options(config, options)))
|
||||||
@ -285,25 +285,25 @@ RWSplit* RWSplit::create(SERVICE *service, char **options)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/** These options cancel each other out */
|
/** These options cancel each other out */
|
||||||
if (config.disable_sescmd_history && config.max_sescmd_history > 0)
|
if (config->disable_sescmd_history && config->max_sescmd_history > 0)
|
||||||
{
|
{
|
||||||
config.max_sescmd_history = 0;
|
config->max_sescmd_history = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (config.optimistic_trx)
|
if (config->optimistic_trx)
|
||||||
{
|
{
|
||||||
// Optimistic transaction routing requires transaction replay
|
// Optimistic transaction routing requires transaction replay
|
||||||
config.transaction_replay = true;
|
config->transaction_replay = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (config.transaction_replay)
|
if (config->transaction_replay)
|
||||||
{
|
{
|
||||||
/**
|
/**
|
||||||
* Replaying transactions requires that we are able to do delayed query
|
* Replaying transactions requires that we are able to do delayed query
|
||||||
* retries and reconnect to a master.
|
* retries and reconnect to a master.
|
||||||
*/
|
*/
|
||||||
config.delayed_retry = true;
|
config->delayed_retry = true;
|
||||||
config.master_reconnection = true;
|
config->master_reconnection = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
return new (std::nothrow) RWSplit(service, config);
|
return new (std::nothrow) RWSplit(service, config);
|
||||||
@ -320,40 +320,41 @@ void RWSplit::diagnostics(DCB *dcb)
|
|||||||
{
|
{
|
||||||
const char *weightby = serviceGetWeightingParameter(service());
|
const char *weightby = serviceGetWeightingParameter(service());
|
||||||
double master_pct = 0.0, slave_pct = 0.0, all_pct = 0.0;
|
double master_pct = 0.0, slave_pct = 0.0, all_pct = 0.0;
|
||||||
|
SConfig cnf = config();
|
||||||
|
|
||||||
dcb_printf(dcb, "\n");
|
dcb_printf(dcb, "\n");
|
||||||
dcb_printf(dcb, "\tuse_sql_variables_in: %s\n",
|
dcb_printf(dcb, "\tuse_sql_variables_in: %s\n",
|
||||||
mxs_target_to_str(config().use_sql_variables_in));
|
mxs_target_to_str(cnf->use_sql_variables_in));
|
||||||
dcb_printf(dcb, "\tslave_selection_criteria: %s\n",
|
dcb_printf(dcb, "\tslave_selection_criteria: %s\n",
|
||||||
select_criteria_to_str(config().slave_selection_criteria));
|
select_criteria_to_str(cnf->slave_selection_criteria));
|
||||||
dcb_printf(dcb, "\tmaster_failure_mode: %s\n",
|
dcb_printf(dcb, "\tmaster_failure_mode: %s\n",
|
||||||
failure_mode_to_str(config().master_failure_mode));
|
failure_mode_to_str(cnf->master_failure_mode));
|
||||||
dcb_printf(dcb, "\tmax_slave_replication_lag: %d\n",
|
dcb_printf(dcb, "\tmax_slave_replication_lag: %d\n",
|
||||||
config().max_slave_replication_lag);
|
cnf->max_slave_replication_lag);
|
||||||
dcb_printf(dcb, "\tretry_failed_reads: %s\n",
|
dcb_printf(dcb, "\tretry_failed_reads: %s\n",
|
||||||
config().retry_failed_reads ? "true" : "false");
|
cnf->retry_failed_reads ? "true" : "false");
|
||||||
dcb_printf(dcb, "\tstrict_multi_stmt: %s\n",
|
dcb_printf(dcb, "\tstrict_multi_stmt: %s\n",
|
||||||
config().strict_multi_stmt ? "true" : "false");
|
cnf->strict_multi_stmt ? "true" : "false");
|
||||||
dcb_printf(dcb, "\tstrict_sp_calls: %s\n",
|
dcb_printf(dcb, "\tstrict_sp_calls: %s\n",
|
||||||
config().strict_sp_calls ? "true" : "false");
|
cnf->strict_sp_calls ? "true" : "false");
|
||||||
dcb_printf(dcb, "\tdisable_sescmd_history: %s\n",
|
dcb_printf(dcb, "\tdisable_sescmd_history: %s\n",
|
||||||
config().disable_sescmd_history ? "true" : "false");
|
cnf->disable_sescmd_history ? "true" : "false");
|
||||||
dcb_printf(dcb, "\tmax_sescmd_history: %lu\n",
|
dcb_printf(dcb, "\tmax_sescmd_history: %lu\n",
|
||||||
config().max_sescmd_history);
|
cnf->max_sescmd_history);
|
||||||
dcb_printf(dcb, "\tmaster_accept_reads: %s\n",
|
dcb_printf(dcb, "\tmaster_accept_reads: %s\n",
|
||||||
config().master_accept_reads ? "true" : "false");
|
cnf->master_accept_reads ? "true" : "false");
|
||||||
dcb_printf(dcb, "\tconnection_keepalive: %d\n",
|
dcb_printf(dcb, "\tconnection_keepalive: %d\n",
|
||||||
config().connection_keepalive);
|
cnf->connection_keepalive);
|
||||||
dcb_printf(dcb, "\tcausal_reads: %s\n",
|
dcb_printf(dcb, "\tcausal_reads: %s\n",
|
||||||
config().causal_reads ? "true" : "false");
|
cnf->causal_reads ? "true" : "false");
|
||||||
dcb_printf(dcb, "\tcausal_reads_timeout: %s\n",
|
dcb_printf(dcb, "\tcausal_reads_timeout: %s\n",
|
||||||
config().causal_reads_timeout.c_str());
|
cnf->causal_reads_timeout.c_str());
|
||||||
dcb_printf(dcb, "\tmaster_reconnection: %s\n",
|
dcb_printf(dcb, "\tmaster_reconnection: %s\n",
|
||||||
config().master_reconnection ? "true" : "false");
|
cnf->master_reconnection ? "true" : "false");
|
||||||
dcb_printf(dcb, "\tdelayed_retry: %s\n",
|
dcb_printf(dcb, "\tdelayed_retry: %s\n",
|
||||||
config().delayed_retry ? "true" : "false");
|
cnf->delayed_retry ? "true" : "false");
|
||||||
dcb_printf(dcb, "\tdelayed_retry_timeout: %lu\n",
|
dcb_printf(dcb, "\tdelayed_retry_timeout: %lu\n",
|
||||||
config().delayed_retry_timeout);
|
cnf->delayed_retry_timeout);
|
||||||
|
|
||||||
dcb_printf(dcb, "\n");
|
dcb_printf(dcb, "\n");
|
||||||
|
|
||||||
|
|||||||
@ -200,6 +200,8 @@ struct Config
|
|||||||
bool optimistic_trx; /**< Enable optimistic transactions */
|
bool optimistic_trx; /**< Enable optimistic transactions */
|
||||||
};
|
};
|
||||||
|
|
||||||
|
typedef std::tr1::shared_ptr<Config> SConfig;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The statistics for this router instance
|
* The statistics for this router instance
|
||||||
*/
|
*/
|
||||||
@ -226,11 +228,11 @@ class RWSplit: public mxs::Router<RWSplit, RWSplitSession>
|
|||||||
RWSplit& operator=(const RWSplit&);
|
RWSplit& operator=(const RWSplit&);
|
||||||
|
|
||||||
public:
|
public:
|
||||||
RWSplit(SERVICE* service, const Config& config);
|
RWSplit(SERVICE* service, SConfig config);
|
||||||
~RWSplit();
|
~RWSplit();
|
||||||
|
|
||||||
SERVICE* service() const;
|
SERVICE* service() const;
|
||||||
const Config& config() const;
|
SConfig config() const;
|
||||||
Stats& stats();
|
Stats& stats();
|
||||||
const Stats& stats() const;
|
const Stats& stats() const;
|
||||||
int max_slave_count() const;
|
int max_slave_count() const;
|
||||||
@ -298,7 +300,7 @@ public:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
SERVICE* m_service; /**< Service where the router belongs*/
|
SERVICE* m_service; /**< Service where the router belongs*/
|
||||||
Config m_config;
|
SConfig m_config;
|
||||||
Stats m_stats;
|
Stats m_stats;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@ -116,7 +116,7 @@ bool RWSplitSession::handle_target_is_all(route_target_t route_target, GWBUF *qu
|
|||||||
{
|
{
|
||||||
// TODO: Append to the already stored session command instead of disabling history
|
// TODO: Append to the already stored session command instead of disabling history
|
||||||
MXS_INFO("Large session write, have to disable session command history");
|
MXS_INFO("Large session write, have to disable session command history");
|
||||||
m_config.disable_sescmd_history = true;
|
m_config->disable_sescmd_history = true;
|
||||||
|
|
||||||
continue_large_session_write(querybuf, qtype);
|
continue_large_session_write(querybuf, qtype);
|
||||||
result = true;
|
result = true;
|
||||||
|
|||||||
@ -84,7 +84,7 @@ void RWSplitSession::handle_connection_keepalive(SRWBackend& target)
|
|||||||
ss_dassert(target);
|
ss_dassert(target);
|
||||||
ss_debug(int nserv = 0);
|
ss_debug(int nserv = 0);
|
||||||
/** Each heartbeat is 1/10th of a second */
|
/** Each heartbeat is 1/10th of a second */
|
||||||
int keepalive = m_config.connection_keepalive * 10;
|
int keepalive = m_config->connection_keepalive * 10;
|
||||||
|
|
||||||
for (auto it = m_backends.begin(); it != m_backends.end(); it++)
|
for (auto it = m_backends.begin(); it != m_backends.end(); it++)
|
||||||
{
|
{
|
||||||
@ -115,7 +115,7 @@ bool RWSplitSession::prepare_target(SRWBackend& target, route_target_t route_tar
|
|||||||
if (!target->in_use())
|
if (!target->in_use())
|
||||||
{
|
{
|
||||||
ss_dassert(target->can_connect() && can_recover_servers());
|
ss_dassert(target->can_connect() && can_recover_servers());
|
||||||
ss_dassert(!TARGET_IS_MASTER(route_target) || m_config.master_reconnection);
|
ss_dassert(!TARGET_IS_MASTER(route_target) || m_config->master_reconnection);
|
||||||
rval = target->connect(m_client->session, &m_sescmd_list);
|
rval = target->connect(m_client->session, &m_sescmd_list);
|
||||||
MXS_INFO("Connected to '%s'", target->name());
|
MXS_INFO("Connected to '%s'", target->name());
|
||||||
|
|
||||||
@ -166,7 +166,7 @@ bool RWSplitSession::have_connected_slaves() const
|
|||||||
|
|
||||||
bool RWSplitSession::should_try_trx_on_slave(route_target_t route_target) const
|
bool RWSplitSession::should_try_trx_on_slave(route_target_t route_target) const
|
||||||
{
|
{
|
||||||
return m_config.optimistic_trx && // Optimistic transactions are enabled
|
return m_config->optimistic_trx && // Optimistic transactions are enabled
|
||||||
!is_locked_to_master() && // Not locked to master
|
!is_locked_to_master() && // Not locked to master
|
||||||
!m_is_replay_active && // Not replaying a transaction
|
!m_is_replay_active && // Not replaying a transaction
|
||||||
m_otrx_state == OTRX_INACTIVE && // Not yet in optimistic mode
|
m_otrx_state == OTRX_INACTIVE && // Not yet in optimistic mode
|
||||||
@ -260,7 +260,7 @@ bool RWSplitSession::route_single_stmt(GWBUF *querybuf)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// If delayed query retry is enabled, we need to store the current statement
|
// If delayed query retry is enabled, we need to store the current statement
|
||||||
bool store_stmt = m_config.delayed_retry;
|
bool store_stmt = m_config->delayed_retry;
|
||||||
|
|
||||||
if (m_qc.large_query())
|
if (m_qc.large_query())
|
||||||
{
|
{
|
||||||
@ -297,7 +297,7 @@ bool RWSplitSession::route_single_stmt(GWBUF *querybuf)
|
|||||||
{
|
{
|
||||||
succp = true;
|
succp = true;
|
||||||
|
|
||||||
if (m_config.retry_failed_reads &&
|
if (m_config->retry_failed_reads &&
|
||||||
(command == MXS_COM_QUERY || command == MXS_COM_STMT_EXECUTE))
|
(command == MXS_COM_QUERY || command == MXS_COM_STMT_EXECUTE))
|
||||||
{
|
{
|
||||||
// Only commands that can contain an SQL statement should be stored
|
// Only commands that can contain an SQL statement should be stored
|
||||||
@ -309,8 +309,8 @@ bool RWSplitSession::route_single_stmt(GWBUF *querybuf)
|
|||||||
{
|
{
|
||||||
succp = handle_master_is_target(&target);
|
succp = handle_master_is_target(&target);
|
||||||
|
|
||||||
if (!m_config.strict_multi_stmt &&
|
if (!m_config->strict_multi_stmt &&
|
||||||
!m_config.strict_sp_calls &&
|
!m_config->strict_sp_calls &&
|
||||||
m_target_node == m_current_master)
|
m_target_node == m_current_master)
|
||||||
{
|
{
|
||||||
/** Reset the forced node as we're in relaxed multi-statement mode */
|
/** Reset the forced node as we're in relaxed multi-statement mode */
|
||||||
@ -363,7 +363,7 @@ bool RWSplitSession::route_single_stmt(GWBUF *querybuf)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (succp && m_router->config().connection_keepalive &&
|
if (succp && m_config->connection_keepalive &&
|
||||||
(TARGET_IS_SLAVE(route_target) || TARGET_IS_MASTER(route_target)))
|
(TARGET_IS_SLAVE(route_target) || TARGET_IS_MASTER(route_target)))
|
||||||
{
|
{
|
||||||
handle_connection_keepalive(target);
|
handle_connection_keepalive(target);
|
||||||
@ -488,7 +488,7 @@ bool RWSplitSession::route_session_write(GWBUF *querybuf, uint8_t command, uint3
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (m_config.max_sescmd_history > 0 && m_sescmd_list.size() >= m_config.max_sescmd_history)
|
if (m_config->max_sescmd_history > 0 && m_sescmd_list.size() >= m_config->max_sescmd_history)
|
||||||
{
|
{
|
||||||
static bool warn_history_exceeded = true;
|
static bool warn_history_exceeded = true;
|
||||||
if (warn_history_exceeded)
|
if (warn_history_exceeded)
|
||||||
@ -500,16 +500,16 @@ bool RWSplitSession::route_session_write(GWBUF *querybuf, uint8_t command, uint3
|
|||||||
"command history, add `disable_sescmd_history=true` to "
|
"command history, add `disable_sescmd_history=true` to "
|
||||||
"service '%s'. To increase the limit (currently %lu), add "
|
"service '%s'. To increase the limit (currently %lu), add "
|
||||||
"`max_sescmd_history` to the same service and increase the value.",
|
"`max_sescmd_history` to the same service and increase the value.",
|
||||||
m_router->service()->name, m_config.max_sescmd_history);
|
m_router->service()->name, m_config->max_sescmd_history);
|
||||||
warn_history_exceeded = false;
|
warn_history_exceeded = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
m_config.disable_sescmd_history = true;
|
m_config->disable_sescmd_history = true;
|
||||||
m_config.max_sescmd_history = 0;
|
m_config->max_sescmd_history = 0;
|
||||||
m_sescmd_list.clear();
|
m_sescmd_list.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (m_config.disable_sescmd_history)
|
if (m_config->disable_sescmd_history)
|
||||||
{
|
{
|
||||||
/** Prune stored responses */
|
/** Prune stored responses */
|
||||||
ResponseMap::iterator it = m_sescmd_responses.lower_bound(lowest_pos);
|
ResponseMap::iterator it = m_sescmd_responses.lower_bound(lowest_pos);
|
||||||
@ -596,7 +596,7 @@ SRWBackend RWSplitSession::get_slave_backend(int max_rlag)
|
|||||||
}
|
}
|
||||||
else if (backend->in_use() || counts.second < m_router->max_slave_count())
|
else if (backend->in_use() || counts.second < m_router->max_slave_count())
|
||||||
{
|
{
|
||||||
if (!m_config.master_accept_reads && rval->is_master())
|
if (!m_config->master_accept_reads && rval->is_master())
|
||||||
{
|
{
|
||||||
// Pick slaves over masters with master_accept_reads=false
|
// Pick slaves over masters with master_accept_reads=false
|
||||||
rval = backend;
|
rval = backend;
|
||||||
@ -604,7 +604,7 @@ SRWBackend RWSplitSession::get_slave_backend(int max_rlag)
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
// Compare the two servers and pick the best one
|
// Compare the two servers and pick the best one
|
||||||
rval = compare_backends(rval, backend, m_config.slave_selection_criteria);
|
rval = compare_backends(rval, backend, m_config->slave_selection_criteria);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -622,7 +622,7 @@ SRWBackend RWSplitSession::get_master_backend()
|
|||||||
|
|
||||||
if (master)
|
if (master)
|
||||||
{
|
{
|
||||||
if (master->in_use() || (m_config.master_reconnection && master->can_connect()))
|
if (master->in_use() || (m_config->master_reconnection && master->can_connect()))
|
||||||
{
|
{
|
||||||
if (master->is_master())
|
if (master->is_master())
|
||||||
{
|
{
|
||||||
@ -696,9 +696,9 @@ int RWSplitSession::get_max_replication_lag()
|
|||||||
int conf_max_rlag;
|
int conf_max_rlag;
|
||||||
|
|
||||||
/** if there is no configured value, then longest possible int is used */
|
/** if there is no configured value, then longest possible int is used */
|
||||||
if (m_config.max_slave_replication_lag > 0)
|
if (m_config->max_slave_replication_lag > 0)
|
||||||
{
|
{
|
||||||
conf_max_rlag = m_config.max_slave_replication_lag;
|
conf_max_rlag = m_config->max_slave_replication_lag;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -878,7 +878,7 @@ void RWSplitSession::log_master_routing_failure(bool found,
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
/** We never had a master connection, the session must be in read-only mode */
|
/** We never had a master connection, the session must be in read-only mode */
|
||||||
if (m_config.master_failure_mode != RW_FAIL_INSTANTLY)
|
if (m_config->master_failure_mode != RW_FAIL_INSTANTLY)
|
||||||
{
|
{
|
||||||
sprintf(errmsg, "Session is in read-only mode because it was created "
|
sprintf(errmsg, "Session is in read-only mode because it was created "
|
||||||
"when no master was available");
|
"when no master was available");
|
||||||
@ -899,7 +899,7 @@ void RWSplitSession::log_master_routing_failure(bool found,
|
|||||||
|
|
||||||
bool RWSplitSession::should_replace_master(SRWBackend& target)
|
bool RWSplitSession::should_replace_master(SRWBackend& target)
|
||||||
{
|
{
|
||||||
return m_config.master_reconnection &&
|
return m_config->master_reconnection &&
|
||||||
// We have a target server and it's not the current master
|
// We have a target server and it's not the current master
|
||||||
target && target != m_current_master &&
|
target && target != m_current_master &&
|
||||||
// We are not inside a transaction (also checks for autocommit=1)
|
// We are not inside a transaction (also checks for autocommit=1)
|
||||||
@ -946,7 +946,7 @@ bool RWSplitSession::handle_master_is_target(SRWBackend* dest)
|
|||||||
{
|
{
|
||||||
succp = false;
|
succp = false;
|
||||||
/** The original master is not available, we can't route the write */
|
/** The original master is not available, we can't route the write */
|
||||||
if (m_config.master_failure_mode == RW_ERROR_ON_WRITE)
|
if (m_config->master_failure_mode == RW_ERROR_ON_WRITE)
|
||||||
{
|
{
|
||||||
succp = send_readonly_error(m_client);
|
succp = send_readonly_error(m_client);
|
||||||
|
|
||||||
@ -955,8 +955,8 @@ bool RWSplitSession::handle_master_is_target(SRWBackend* dest)
|
|||||||
m_current_master->close();
|
m_current_master->close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (!m_config.delayed_retry ||
|
else if (!m_config->delayed_retry ||
|
||||||
m_retry_duration >= m_config.delayed_retry_timeout)
|
m_retry_duration >= m_config->delayed_retry_timeout)
|
||||||
{
|
{
|
||||||
// Cannot retry the query, log a message that routing has failed
|
// Cannot retry the query, log a message that routing has failed
|
||||||
log_master_routing_failure(succp, m_current_master, target);
|
log_master_routing_failure(succp, m_current_master, target);
|
||||||
@ -993,7 +993,7 @@ GWBUF* RWSplitSession::add_prefix_wait_gtid(SERVER *server, GWBUF *origin)
|
|||||||
GWBUF* rval = origin;
|
GWBUF* rval = origin;
|
||||||
const char* wait_func = (server->server_type == SERVER_TYPE_MARIADB) ?
|
const char* wait_func = (server->server_type == SERVER_TYPE_MARIADB) ?
|
||||||
MARIADB_WAIT_GTID_FUNC : MYSQL_WAIT_GTID_FUNC;
|
MARIADB_WAIT_GTID_FUNC : MYSQL_WAIT_GTID_FUNC;
|
||||||
const char *gtid_wait_timeout = m_router->config().causal_reads_timeout.c_str();
|
const char *gtid_wait_timeout = m_config->causal_reads_timeout.c_str();
|
||||||
const char *gtid_position = m_gtid_pos.c_str();
|
const char *gtid_position = m_gtid_pos.c_str();
|
||||||
|
|
||||||
/* Create a new buffer to store prefix sql */
|
/* Create a new buffer to store prefix sql */
|
||||||
@ -1051,7 +1051,7 @@ bool RWSplitSession::handle_got_target(GWBUF* querybuf, SRWBackend& target, bool
|
|||||||
uint8_t cmd = mxs_mysql_get_command(querybuf);
|
uint8_t cmd = mxs_mysql_get_command(querybuf);
|
||||||
GWBUF *send_buf = gwbuf_clone(querybuf);
|
GWBUF *send_buf = gwbuf_clone(querybuf);
|
||||||
|
|
||||||
if (cmd == COM_QUERY && m_router->config().causal_reads && !m_gtid_pos .empty())
|
if (cmd == COM_QUERY && m_config->causal_reads && !m_gtid_pos .empty())
|
||||||
{
|
{
|
||||||
send_buf = add_prefix_wait_gtid(target->server(), send_buf);
|
send_buf = add_prefix_wait_gtid(target->server(), send_buf);
|
||||||
m_wait_gtid = WAITING_FOR_HEADER;
|
m_wait_gtid = WAITING_FOR_HEADER;
|
||||||
|
|||||||
@ -301,15 +301,16 @@ bool RWSplit::select_connect_backend_servers(MXS_SESSION *session,
|
|||||||
connection_type type)
|
connection_type type)
|
||||||
{
|
{
|
||||||
SRWBackend master = get_root_master(backends);
|
SRWBackend master = get_root_master(backends);
|
||||||
|
SConfig cnf(config());
|
||||||
|
|
||||||
if (!master && config().master_failure_mode == RW_FAIL_INSTANTLY)
|
if (!master && cnf->master_failure_mode == RW_FAIL_INSTANTLY)
|
||||||
{
|
{
|
||||||
MXS_ERROR("Couldn't find suitable Master from %lu candidates.", backends.size());
|
MXS_ERROR("Couldn't find suitable Master from %lu candidates.", backends.size());
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Check slave selection criteria and set compare function */
|
/** Check slave selection criteria and set compare function */
|
||||||
select_criteria_t select_criteria = config().slave_selection_criteria;
|
select_criteria_t select_criteria = cnf->slave_selection_criteria;
|
||||||
auto cmpfun = criteria_cmpfun[select_criteria];
|
auto cmpfun = criteria_cmpfun[select_criteria];
|
||||||
ss_dassert(cmpfun);
|
ss_dassert(cmpfun);
|
||||||
|
|
||||||
|
|||||||
@ -38,17 +38,17 @@ RWSplitSession::RWSplitSession(RWSplit* instance, MXS_SESSION* session,
|
|||||||
m_gtid_pos(""),
|
m_gtid_pos(""),
|
||||||
m_wait_gtid(NONE),
|
m_wait_gtid(NONE),
|
||||||
m_next_seq(0),
|
m_next_seq(0),
|
||||||
m_qc(this, session, instance->config().use_sql_variables_in),
|
m_qc(this, session, m_config->use_sql_variables_in),
|
||||||
m_retry_duration(0),
|
m_retry_duration(0),
|
||||||
m_is_replay_active(false),
|
m_is_replay_active(false),
|
||||||
m_can_replay_trx(true)
|
m_can_replay_trx(true)
|
||||||
{
|
{
|
||||||
if (m_config.rw_max_slave_conn_percent)
|
if (m_config->rw_max_slave_conn_percent)
|
||||||
{
|
{
|
||||||
int n_conn = 0;
|
int n_conn = 0;
|
||||||
double pct = (double)m_config.rw_max_slave_conn_percent / 100.0;
|
double pct = (double)m_config->rw_max_slave_conn_percent / 100.0;
|
||||||
n_conn = MXS_MAX(floor((double)m_nbackends * pct), 1);
|
n_conn = MXS_MAX(floor((double)m_nbackends * pct), 1);
|
||||||
m_config.max_slave_connections = n_conn;
|
m_config->max_slave_connections = n_conn;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -350,7 +350,7 @@ static void log_unexpected_response(SRWBackend& backend, GWBUF* buffer, GWBUF* c
|
|||||||
|
|
||||||
GWBUF* RWSplitSession::handle_causal_read_reply(GWBUF *writebuf, SRWBackend& backend)
|
GWBUF* RWSplitSession::handle_causal_read_reply(GWBUF *writebuf, SRWBackend& backend)
|
||||||
{
|
{
|
||||||
if (m_config.causal_reads)
|
if (m_config->causal_reads)
|
||||||
{
|
{
|
||||||
if (GWBUF_IS_REPLY_OK(writebuf) && backend == m_current_master)
|
if (GWBUF_IS_REPLY_OK(writebuf) && backend == m_current_master)
|
||||||
{
|
{
|
||||||
@ -473,7 +473,7 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb)
|
|||||||
poll_fake_hangup_event(backend_dcb);
|
poll_fake_hangup_event(backend_dcb);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (m_config.transaction_replay && m_can_replay_trx &&
|
else if (m_config->transaction_replay && m_can_replay_trx &&
|
||||||
session_trx_is_active(m_client->session))
|
session_trx_is_active(m_client->session))
|
||||||
{
|
{
|
||||||
if (!backend->has_session_commands())
|
if (!backend->has_session_commands())
|
||||||
@ -491,7 +491,7 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb)
|
|||||||
|
|
||||||
size_t size{m_trx.size() + m_current_query.length()};
|
size_t size{m_trx.size() + m_current_query.length()};
|
||||||
// A transaction is open and it is eligible for replaying
|
// A transaction is open and it is eligible for replaying
|
||||||
if (size < m_config.trx_max_size)
|
if (size < m_config->trx_max_size)
|
||||||
{
|
{
|
||||||
/** Transaction size is OK, store the statement for replaying and
|
/** Transaction size is OK, store the statement for replaying and
|
||||||
* update the checksum of the result */
|
* update the checksum of the result */
|
||||||
@ -524,7 +524,7 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb)
|
|||||||
ss_dassert(backend->get_reply_state() == REPLY_STATE_DONE);
|
ss_dassert(backend->get_reply_state() == REPLY_STATE_DONE);
|
||||||
MXS_INFO("Reply complete, last reply from %s", backend->name());
|
MXS_INFO("Reply complete, last reply from %s", backend->name());
|
||||||
|
|
||||||
if (m_config.causal_reads)
|
if (m_config->causal_reads)
|
||||||
{
|
{
|
||||||
// The reply should never be complete while we are still waiting for the header.
|
// The reply should never be complete while we are still waiting for the header.
|
||||||
ss_dassert(m_wait_gtid != WAITING_FOR_HEADER);
|
ss_dassert(m_wait_gtid != WAITING_FOR_HEADER);
|
||||||
@ -560,7 +560,7 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb)
|
|||||||
}
|
}
|
||||||
else if (m_is_replay_active)
|
else if (m_is_replay_active)
|
||||||
{
|
{
|
||||||
ss_dassert(m_config.transaction_replay);
|
ss_dassert(m_config->transaction_replay);
|
||||||
handle_trx_replay();
|
handle_trx_replay();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -582,7 +582,7 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb)
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (m_config.transaction_replay && session_trx_is_ending(m_client->session))
|
else if (m_config->transaction_replay && session_trx_is_ending(m_client->session))
|
||||||
{
|
{
|
||||||
m_trx.close();
|
m_trx.close();
|
||||||
m_can_replay_trx = true;
|
m_can_replay_trx = true;
|
||||||
@ -638,7 +638,7 @@ bool RWSplitSession::start_trx_replay()
|
|||||||
{
|
{
|
||||||
bool rval = false;
|
bool rval = false;
|
||||||
|
|
||||||
if (!m_is_replay_active && m_config.transaction_replay && m_can_replay_trx)
|
if (!m_is_replay_active && m_config->transaction_replay && m_can_replay_trx)
|
||||||
{
|
{
|
||||||
if (m_trx.have_stmts() || m_current_query.get())
|
if (m_trx.have_stmts() || m_current_query.get())
|
||||||
{
|
{
|
||||||
@ -736,7 +736,7 @@ void RWSplitSession::handleError(GWBUF *errmsgbuf, DCB *problem_dcb,
|
|||||||
* can't be sure whether it was executed or not. In this
|
* can't be sure whether it was executed or not. In this
|
||||||
* case the safest thing to do is to close the client
|
* case the safest thing to do is to close the client
|
||||||
* connection. */
|
* connection. */
|
||||||
if (m_config.master_failure_mode != RW_FAIL_INSTANTLY)
|
if (m_config->master_failure_mode != RW_FAIL_INSTANTLY)
|
||||||
{
|
{
|
||||||
can_continue = true;
|
can_continue = true;
|
||||||
}
|
}
|
||||||
@ -752,7 +752,7 @@ void RWSplitSession::handleError(GWBUF *errmsgbuf, DCB *problem_dcb,
|
|||||||
can_continue = true;
|
can_continue = true;
|
||||||
retry_query(m_current_query.release());
|
retry_query(m_current_query.release());
|
||||||
}
|
}
|
||||||
else if (m_config.master_failure_mode == RW_ERROR_ON_WRITE)
|
else if (m_config->master_failure_mode == RW_ERROR_ON_WRITE)
|
||||||
{
|
{
|
||||||
/** In error_on_write mode, the session can continue even
|
/** In error_on_write mode, the session can continue even
|
||||||
* if the master is lost. Send a read-only error to
|
* if the master is lost. Send a read-only error to
|
||||||
@ -865,7 +865,7 @@ bool RWSplitSession::handle_error_new_connection(DCB *backend_dcb, GWBUF *errmsg
|
|||||||
*/
|
*/
|
||||||
GWBUF *stored = m_current_query.release();
|
GWBUF *stored = m_current_query.release();
|
||||||
|
|
||||||
if (stored && m_config.retry_failed_reads)
|
if (stored && m_config->retry_failed_reads)
|
||||||
{
|
{
|
||||||
MXS_INFO("Re-routing failed read after server '%s' failed", backend->name());
|
MXS_INFO("Re-routing failed read after server '%s' failed", backend->name());
|
||||||
retry_query(stored, 0);
|
retry_query(stored, 0);
|
||||||
@ -905,7 +905,7 @@ bool RWSplitSession::handle_error_new_connection(DCB *backend_dcb, GWBUF *errmsg
|
|||||||
* Try to get replacement slave or at least the minimum
|
* Try to get replacement slave or at least the minimum
|
||||||
* number of slave connections for router session.
|
* number of slave connections for router session.
|
||||||
*/
|
*/
|
||||||
if (m_recv_sescmd > 0 && m_config.disable_sescmd_history)
|
if (m_recv_sescmd > 0 && m_config->disable_sescmd_history)
|
||||||
{
|
{
|
||||||
succp = m_router->have_enough_servers();
|
succp = m_router->have_enough_servers();
|
||||||
}
|
}
|
||||||
|
|||||||
@ -132,7 +132,7 @@ public:
|
|||||||
mxs::SRWBackend m_current_master; /**< Current master server */
|
mxs::SRWBackend m_current_master; /**< Current master server */
|
||||||
mxs::SRWBackend m_target_node; /**< The currently locked target node */
|
mxs::SRWBackend m_target_node; /**< The currently locked target node */
|
||||||
mxs::SRWBackend m_prev_target; /**< The previous target where a query was sent */
|
mxs::SRWBackend m_prev_target; /**< The previous target where a query was sent */
|
||||||
Config m_config; /**< copied config info from router instance */
|
SConfig m_config; /**< Configuration for this session */
|
||||||
int m_nbackends; /**< Number of backend servers (obsolete) */
|
int m_nbackends; /**< Number of backend servers (obsolete) */
|
||||||
DCB* m_client; /**< The client DCB */
|
DCB* m_client; /**< The client DCB */
|
||||||
uint64_t m_sescmd_count; /**< Number of executed session commands */
|
uint64_t m_sescmd_count; /**< Number of executed session commands */
|
||||||
@ -249,14 +249,14 @@ private:
|
|||||||
*
|
*
|
||||||
* @see handle_trx_replay
|
* @see handle_trx_replay
|
||||||
*/
|
*/
|
||||||
return m_config.delayed_retry &&
|
return m_config->delayed_retry &&
|
||||||
m_retry_duration < m_config.delayed_retry_timeout &&
|
m_retry_duration < m_config->delayed_retry_timeout &&
|
||||||
!session_trx_is_active(m_client->session);
|
!session_trx_is_active(m_client->session);
|
||||||
}
|
}
|
||||||
|
|
||||||
inline bool can_recover_servers() const
|
inline bool can_recover_servers() const
|
||||||
{
|
{
|
||||||
return !m_config.disable_sescmd_history || m_recv_sescmd == 0;
|
return !m_config->disable_sescmd_history || m_recv_sescmd == 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
inline bool is_large_query(GWBUF* buf)
|
inline bool is_large_query(GWBUF* buf)
|
||||||
|
|||||||
Reference in New Issue
Block a user