MariaDBMon: Move replication manipulation functions to a separate file

Refactoring continues. This update moves some of the replication manipulation
functions to a separate file and turns them into class methods.
This commit is contained in:
Esa Korhonen
2018-02-21 15:10:28 +02:00
parent ff55106610
commit 8cdd23dda2
4 changed files with 263 additions and 234 deletions

View File

@ -20,14 +20,11 @@
#include "mariadbmon.hh"
#include <inttypes.h>
#include <limits>
#include <string>
#include <sstream>
#include <vector>
#include <maxscale/alloc.h>
#include <maxscale/dcb.h>
#include <maxscale/debug.h>
#include <maxscale/hk_heartbeat.h>
#include <maxscale/json_api.h>
#include <maxscale/modulecmd.h>
#include <maxscale/modutil.h>
#include <maxscale/mysql_utils.h>
@ -59,16 +56,6 @@
#define SLAVE_HOSTS_HOSTNAME 1
#define SLAVE_HOSTS_PORT 2
/** Utility macro for printing both MXS_ERROR and json error */
#define PRINT_MXS_JSON_ERROR(err_out, format, ...)\
do {\
MXS_ERROR(format, ##__VA_ARGS__);\
if (err_out)\
{\
*err_out = mxs_json_error_append(*err_out, format, ##__VA_ARGS__);\
}\
} while (false)
using std::string;
typedef std::vector<string> StringVector;
class MySqlServerInfo;
@ -100,8 +87,6 @@ static void set_slave_heartbeat(MXS_MONITOR *, MXS_MONITORED_SERVER *);
static int add_slave_to_master(long *, int, long);
static bool isMySQLEvent(mxs_monitor_event_t event);
void check_maxscale_schema_replication(MXS_MONITOR *monitor);
static MySqlServerInfo* get_server_info(MariaDBMonitor* handle, const MXS_MONITORED_SERVER* db);
static const MySqlServerInfo* get_server_info(const MariaDBMonitor* handle, const MXS_MONITORED_SERVER* db);
static bool mon_process_failover(MariaDBMonitor*, uint32_t, bool*);
static bool do_failover(MariaDBMonitor* mon, json_t** output);
static bool do_switchover(MariaDBMonitor* mon, MXS_MONITORED_SERVER* current_master,
@ -114,10 +99,8 @@ static void read_server_variables(MXS_MONITORED_SERVER* database, MySqlServerInf
static bool server_is_rejoin_suspect(MariaDBMonitor* mon, MXS_MONITORED_SERVER* server,
MySqlServerInfo* master_info);
static bool get_joinable_servers(MariaDBMonitor* mon, ServerVector* output);
static uint32_t do_rejoin(MariaDBMonitor* mon, const ServerVector& servers);
static bool join_cluster(MXS_MONITORED_SERVER* server, const char* change_cmd);
static void disable_setting(MariaDBMonitor* mon, const char* setting);
static bool cluster_can_be_joined(MariaDBMonitor* mon);
static bool can_replicate_from(MariaDBMonitor* mon,
MXS_MONITORED_SERVER* slave, MySqlServerInfo* slave_info,
MXS_MONITORED_SERVER* master, MySqlServerInfo* master_info);
@ -125,7 +108,6 @@ static bool wait_cluster_stabilization(MariaDBMonitor* mon, MXS_MONITORED_SERVER
const ServerVector& slaves, int seconds_remaining);
static string get_connection_errors(const ServerVector& servers);
static int64_t scan_server_id(const char* id_string);
static string generate_change_master_cmd(MariaDBMonitor* mon, const string& master_host, int master_port);
static bool report_version_err = true;
static const char* hb_table_name = "maxscale_schema.replication_heartbeat";
@ -549,7 +531,7 @@ bool mysql_rejoin(MXS_MONITOR* mon, SERVER* rejoin_server, json_t** output)
bool rval = false;
MariaDBMonitor *handle = static_cast<MariaDBMonitor*>(mon->handle);
if (cluster_can_be_joined(handle))
if (handle->cluster_can_be_joined())
{
MXS_MONITORED_SERVER* mon_server = mon_get_monitored_server(mon, rejoin_server);
if (mon_server)
@ -564,7 +546,7 @@ bool mysql_rejoin(MXS_MONITOR* mon, SERVER* rejoin_server, json_t** output)
{
ServerVector joinable_server;
joinable_server.push_back(mon_server);
if (do_rejoin(handle, joinable_server) == 1)
if (handle->do_rejoin(joinable_server) == 1)
{
rval = true;
MXS_NOTICE("Rejoin performed.");
@ -785,36 +767,36 @@ void init_server_info(MariaDBMonitor *handle)
}
}
static MySqlServerInfo* get_server_info(MariaDBMonitor* handle, const MXS_MONITORED_SERVER* db)
MySqlServerInfo* get_server_info(MariaDBMonitor* handle, const MXS_MONITORED_SERVER* db)
{
ServerInfoMap::iterator iter = handle->server_info.find(db);
ss_dassert(iter != handle->server_info.end());
return &iter->second;
}
static const MySqlServerInfo* get_server_info(const MariaDBMonitor* handle, const MXS_MONITORED_SERVER* db)
const MySqlServerInfo* get_server_info(const MariaDBMonitor* handle, const MXS_MONITORED_SERVER* db)
{
return get_server_info(const_cast<MariaDBMonitor*>(handle), db);
}
static bool set_replication_credentials(MariaDBMonitor *handle, const MXS_CONFIG_PARAMETER* params)
bool MariaDBMonitor::set_replication_credentials(const MXS_CONFIG_PARAMETER* params)
{
bool rval = false;
const char* repl_user = config_get_string(params, CN_REPLICATION_USER);
const char* repl_pw = config_get_string(params, CN_REPLICATION_PASSWORD);
string repl_user = config_get_string(params, CN_REPLICATION_USER);
string repl_pw = config_get_string(params, CN_REPLICATION_PASSWORD);
if (!*repl_user && !*repl_pw)
if (repl_user.empty() && repl_pw.empty())
{
// No replication credentials defined, use monitor credentials
repl_user = handle->monitor->user;
repl_pw = handle->monitor->password;
repl_user = monitor->user;
repl_pw = monitor->password;
}
if (*repl_user && *repl_pw)
if (!repl_user.empty() && !repl_pw.empty())
{
handle->replication_user = repl_user;
char* decrypted = decrypt_password(repl_pw);
handle->replication_password = decrypted;
m_replication_user = repl_user;
char* decrypted = decrypt_password(repl_pw.c_str());
m_replication_password = decrypted;
MXS_FREE(decrypted);
rval = true;
}
@ -860,8 +842,6 @@ startMonitor(MXS_MONITOR *monitor, const MXS_CONFIG_PARAMETER* params)
{
handle->shutdown = 0;
handle->script.clear();
handle->replication_user.clear();
handle->replication_password.clear();
handle->excluded_servers.clear();
}
else
@ -904,7 +884,7 @@ startMonitor(MXS_MONITOR *monitor, const MXS_CONFIG_PARAMETER* params)
}
MXS_FREE(excluded_array);
if (!set_replication_credentials(handle, params))
if (!handle->set_replication_credentials(params))
{
MXS_ERROR("Both '%s' and '%s' must be defined", CN_REPLICATION_USER, CN_REPLICATION_PASSWORD);
error = true;
@ -2364,13 +2344,13 @@ monitorMain(void *arg)
// Do not auto-join servers on this monitor loop if a failover (or any other cluster modification)
// has been performed, as server states have not been updated yet. It will happen next iteration.
if (!config_get_global_options()->passive && handle->auto_rejoin &&
!failover_performed && cluster_can_be_joined(handle))
!failover_performed && handle->cluster_can_be_joined())
{
// Check if any servers should be autojoined to the cluster
ServerVector joinable_servers;
if (get_joinable_servers(handle, &joinable_servers))
{
uint32_t joins = do_rejoin(handle, joinable_servers);
uint32_t joins = handle->do_rejoin(joinable_servers);
if (joins > 0)
{
MXS_NOTICE("%d server(s) redirected or rejoined the cluster.", joins);
@ -3461,25 +3441,6 @@ bool failover_wait_relay_log(MariaDBMonitor* mon, MXS_MONITORED_SERVER* new_mast
return rval;
}
bool start_external_replication(MariaDBMonitor* mon, MXS_MONITORED_SERVER* new_master, json_t** err_out)
{
bool rval = false;
string change_cmd = generate_change_master_cmd(mon, mon->external_master_host, mon->external_master_port);
if (mxs_mysql_query(new_master->con, change_cmd.c_str()) == 0 &&
mxs_mysql_query(new_master->con, "START SLAVE;") == 0)
{
MXS_NOTICE("New master starting replication from external master %s:%d.",
mon->external_master_host.c_str(), mon->external_master_port);
rval = true;
}
else
{
PRINT_MXS_JSON_ERROR(err_out, "Could not start replication from external master: '%s'.",
mysql_error(new_master->con));
}
return rval;
}
/**
* Prepares a server for the replication master role.
*
@ -3514,96 +3475,13 @@ bool promote_new_master(MariaDBMonitor* mon, MXS_MONITORED_SERVER* new_master, j
// If the previous master was a slave to an external master, start the equivalent slave connection on
// the new master. Success of replication is not checked.
else if (mon->external_master_port != PORT_UNKNOWN &&
!start_external_replication(mon, new_master, err_out))
!mon->start_external_replication(new_master, err_out))
{
success = false;
}
return success;
}
/**
* Generate a CHANGE MASTER TO-query.
*
* @param mon Cluster monitor, needed for username & password
* @param master_host Master hostname/address
* @param master_port Master port
* @return Generated query
*/
string generate_change_master_cmd(MariaDBMonitor* mon, const string& master_host, int master_port)
{
std::stringstream change_cmd;
change_cmd << "CHANGE MASTER TO MASTER_HOST = '" << master_host << "', ";
change_cmd << "MASTER_PORT = " << master_port << ", ";
change_cmd << "MASTER_USE_GTID = current_pos, ";
change_cmd << "MASTER_USER = '" << mon->replication_user << "', ";
const char MASTER_PW[] = "MASTER_PASSWORD = '";
const char END[] = "';";
#if defined(SS_DEBUG)
std::stringstream change_cmd_nopw;
change_cmd_nopw << change_cmd.str();
change_cmd_nopw << MASTER_PW << "******" << END;;
MXS_DEBUG("Change master command is '%s'.", change_cmd_nopw.str().c_str());
#endif
change_cmd << MASTER_PW << mon->replication_password << END;
return change_cmd.str();
}
/**
* Redirect one slave server to another master
*
* @param slave Server to redirect
* @param change_cmd Change master command, usually generated by generate_change_master_cmd()
* @return True if slave accepted all commands
*/
bool redirect_one_slave(MXS_MONITORED_SERVER* slave, const char* change_cmd)
{
bool rval = false;
if (mxs_mysql_query(slave->con, "STOP SLAVE;") == 0 &&
mxs_mysql_query(slave->con, "RESET SLAVE;") == 0 && // To erase any old I/O or SQL errors
mxs_mysql_query(slave->con, change_cmd) == 0 &&
mxs_mysql_query(slave->con, "START SLAVE;") == 0)
{
rval = true;
MXS_NOTICE("Slave '%s' redirected to new master.", slave->server->unique_name);
}
else
{
MXS_WARNING("Slave '%s' redirection failed: '%s'.", slave->server->unique_name,
mysql_error(slave->con));
}
return rval;
}
/**
* Redirects slaves to replicate from another master server.
*
* @param mon The monitor
* @param slaves An array of slaves
* @param new_master The replication master
* @param redirected_slaves A vector where to insert successfully redirected slaves. Can be NULL.
* @return The number of slaves successfully redirected.
*/
int redirect_slaves(MariaDBMonitor* mon, MXS_MONITORED_SERVER* new_master, const ServerVector& slaves,
ServerVector* redirected_slaves = NULL)
{
MXS_NOTICE("Redirecting slaves to new master.");
std::string change_cmd = generate_change_master_cmd(mon,
new_master->server->name, new_master->server->port);
int successes = 0;
for (ServerVector::const_iterator iter = slaves.begin(); iter != slaves.end(); iter++)
{
if (redirect_one_slave(*iter, change_cmd.c_str()))
{
successes++;
if (redirected_slaves)
{
redirected_slaves->push_back(*iter);
}
}
}
return successes;
}
/**
* Print a redirect error to logs. If err_out exists, generate a combined error message by querying all
* the server parameters for connection errors and append these errors to err_out.
@ -3677,7 +3555,7 @@ static bool do_failover(MariaDBMonitor* mon, json_t** err_out)
{
// Step 4: Redirect slaves.
ServerVector redirected_slaves;
int redirects = redirect_slaves(mon, new_master, redirectable_slaves, &redirected_slaves);
int redirects = mon->redirect_slaves(new_master, redirectable_slaves, &redirected_slaves);
bool success = redirectable_slaves.empty() ? true : redirects > 0;
if (success)
{
@ -4020,34 +3898,6 @@ static bool switchover_wait_slaves_catchup(const ServerVector& slaves, const Gti
return success;
}
/**
* Starts a new slave connection on a server. Should be used on a demoted master server.
*
* @param mon Cluster monitor
* @param old_master The server which will start replication
* @param new_master Replication target
* @return True if commands were accepted. This does not guarantee that replication proceeds
* successfully.
*/
static bool switchover_start_slave(MariaDBMonitor* mon, MXS_MONITORED_SERVER* old_master, SERVER* new_master)
{
bool rval = false;
std::string change_cmd = generate_change_master_cmd(mon, new_master->name, new_master->port);
if (mxs_mysql_query(old_master->con, change_cmd.c_str()) == 0 &&
mxs_mysql_query(old_master->con, "START SLAVE;") == 0)
{
MXS_NOTICE("Old master '%s' starting replication from '%s'.",
old_master->server->unique_name, new_master->unique_name);
rval = true;
}
else
{
MXS_ERROR("Old master '%s' could not start replication: '%s'.",
old_master->server->unique_name, mysql_error(old_master->con));
}
return rval;
}
/**
* Get MariaDB connection error strings from all the given servers, form one string.
*
@ -4256,13 +4106,13 @@ static bool do_switchover(MariaDBMonitor* mon, MXS_MONITORED_SERVER* current_mas
catchup_and_promote_success = true;
// Step 5: Redirect slaves and start replication on old master.
ServerVector redirected_slaves;
bool start_ok = switchover_start_slave(mon, demotion_target, promotion_target->server);
bool start_ok = mon->switchover_start_slave(demotion_target, promotion_target->server);
if (start_ok)
{
redirected_slaves.push_back(demotion_target);
}
int redirects = redirect_slaves(mon, promotion_target,
redirectable_slaves, &redirected_slaves);
int redirects = mon->redirect_slaves(promotion_target, redirectable_slaves,
&redirected_slaves);
bool success = redirectable_slaves.empty() ? start_ok : start_ok || redirects > 0;
if (success)
@ -4314,7 +4164,7 @@ static bool do_switchover(MariaDBMonitor* mon, MXS_MONITORED_SERVER* current_mas
// Try to reactivate external replication if any.
if (mon->external_master_port != PORT_UNKNOWN)
{
start_external_replication(mon, new_master, err_out);
mon->start_external_replication(new_master, err_out);
}
}
}
@ -4489,52 +4339,6 @@ static bool get_joinable_servers(MariaDBMonitor* mon, ServerVector* output)
return comm_ok;
}
/**
* (Re)join given servers to the cluster. The servers in the array are assumed to be joinable.
* Usually the list is created by get_joinable_servers().
*
* @param mon Cluster monitor
* @param joinable_servers Which servers to rejoin
* @return The number of servers successfully rejoined
*/
static uint32_t do_rejoin(MariaDBMonitor* mon, const ServerVector& joinable_servers)
{
SERVER* master = mon->master->server;
uint32_t servers_joined = 0;
if (!joinable_servers.empty())
{
string change_cmd = generate_change_master_cmd(mon, master->name, master->port);
for (ServerVector::const_iterator iter = joinable_servers.begin();
iter != joinable_servers.end();
iter++)
{
MXS_MONITORED_SERVER* joinable = *iter;
const char* name = joinable->server->unique_name;
const char* master_name = master->unique_name;
MySqlServerInfo* redir_info = get_server_info(mon, joinable);
bool op_success;
if (redir_info->n_slaves_configured == 0)
{
MXS_NOTICE("Directing standalone server '%s' to replicate from '%s'.", name, master_name);
op_success = join_cluster(joinable, change_cmd.c_str());
}
else
{
MXS_NOTICE("Server '%s' is replicating from a server other than '%s', "
"redirecting it to '%s'.", name, master_name, master_name);
op_success = redirect_one_slave(joinable, change_cmd.c_str());
}
if (op_success)
{
servers_joined++;
}
}
}
return servers_joined;
}
/**
* Joins a standalone server to the cluster.
*
@ -4542,7 +4346,7 @@ static uint32_t do_rejoin(MariaDBMonitor* mon, const ServerVector& joinable_serv
* @param change_cmd Change master command
* @return True if commands were accepted by server
*/
static bool join_cluster(MXS_MONITORED_SERVER* server, const char* change_cmd)
bool MariaDBMonitor::join_cluster(MXS_MONITORED_SERVER* server, const char* change_cmd)
{
/* Server does not have slave connections. This operation can fail, or the resulting
* replication may end up broken. */
@ -4575,17 +4379,6 @@ static void disable_setting(MariaDBMonitor* mon, const char* setting)
monitorAddParameters(mon->monitor, &p);
}
/**
* Is the cluster a valid rejoin target
*
* @param mon Cluster monitor
* @return True, if cluster can be joined
*/
static bool cluster_can_be_joined(MariaDBMonitor* mon)
{
return (mon->master != NULL && SERVER_IS_MASTER(mon->master->server) && mon->master_gtid_domain >= 0);
}
/**
* Scan a server id from a string.
*