Relay log clear supports multiple slave connections
Now waits for the relay log of the correct slave connection.
This commit is contained in:
@ -606,7 +606,7 @@ bool MariaDBMonitor::failover_perform(MariaDBServer* promotion_target, MariaDBSe
|
||||
|
||||
bool rval = false;
|
||||
// Step 2: Wait until relay log consumed.
|
||||
if (promotion_target->failover_wait_relay_log(seconds_remaining, error_out))
|
||||
if (promotion_target->failover_wait_relay_log(demotion_target, seconds_remaining, error_out))
|
||||
{
|
||||
time_t step2_time = time(NULL);
|
||||
int seconds_step2 = difftime(step2_time, step1_time);
|
||||
@ -858,8 +858,8 @@ bool MariaDBMonitor::wait_cluster_stabilization(MariaDBServer* new_master, const
|
||||
wait_list.erase(wait_list.begin() + i);
|
||||
repl_fails++;
|
||||
}
|
||||
else if (GtidList::events_ahead(target, slave->m_gtid_current_pos,
|
||||
GtidList::MISSING_DOMAIN_IGNORE) == 0)
|
||||
else if (target.events_ahead(slave->m_gtid_current_pos,
|
||||
GtidList::MISSING_DOMAIN_IGNORE) == 0)
|
||||
{
|
||||
// This slave has reached the same gtid as master, remove from list
|
||||
wait_list.erase(wait_list.begin() + i);
|
||||
|
@ -84,7 +84,7 @@ bool GtidList::can_replicate_from(const GtidList& master_gtid)
|
||||
{
|
||||
/* The result of this function is false if the source and master have a common domain id where
|
||||
* the source is ahead of the master. */
|
||||
return (events_ahead(*this, master_gtid, MISSING_DOMAIN_IGNORE) == 0);
|
||||
return (events_ahead(master_gtid, MISSING_DOMAIN_IGNORE) == 0);
|
||||
}
|
||||
|
||||
bool GtidList::empty() const
|
||||
@ -97,17 +97,16 @@ bool GtidList::operator == (const GtidList& rhs) const
|
||||
return m_triplets == rhs.m_triplets;
|
||||
}
|
||||
|
||||
uint64_t GtidList::events_ahead(const GtidList& lhs, const GtidList& rhs,
|
||||
substraction_mode_t domain_substraction_mode)
|
||||
int64_t GtidList::events_ahead(const GtidList& rhs, substraction_mode_t domain_substraction_mode) const
|
||||
{
|
||||
const size_t n_lhs = lhs.m_triplets.size();
|
||||
const size_t n_lhs = m_triplets.size();
|
||||
const size_t n_rhs = rhs.m_triplets.size();
|
||||
size_t ind_lhs = 0, ind_rhs = 0;
|
||||
uint64_t events = 0;
|
||||
|
||||
// GtidLists are assumed to be ordered by domain in ascending order.
|
||||
while (ind_lhs < n_lhs && ind_rhs < n_rhs)
|
||||
{
|
||||
auto lhs_triplet = lhs.m_triplets[ind_lhs];
|
||||
auto lhs_triplet = m_triplets[ind_lhs];
|
||||
auto rhs_triplet = rhs.m_triplets[ind_rhs];
|
||||
// Server id -1 should never be saved in a real gtid variable.
|
||||
mxb_assert(lhs_triplet.m_server_id != SERVER_ID_UNKNOWN &&
|
||||
@ -131,7 +130,7 @@ uint64_t GtidList::events_ahead(const GtidList& lhs, const GtidList& rhs,
|
||||
// Domains match, check sequences.
|
||||
if (lhs_triplet.m_sequence > rhs_triplet.m_sequence)
|
||||
{
|
||||
/* Same domains, but lhs sequence is equal or ahead of rhs sequence. */
|
||||
/* Same domains, but lhs sequence is ahead of rhs sequence. */
|
||||
events += lhs_triplet.m_sequence - rhs_triplet.m_sequence;
|
||||
}
|
||||
// Continue to next domains.
|
||||
@ -139,7 +138,7 @@ uint64_t GtidList::events_ahead(const GtidList& lhs, const GtidList& rhs,
|
||||
ind_rhs++;
|
||||
}
|
||||
}
|
||||
return events;
|
||||
return (events > INT64_MAX) ? INT64_MAX : events;
|
||||
}
|
||||
|
||||
Gtid Gtid::from_string(const char* str, char** endptr)
|
||||
|
@ -129,20 +129,18 @@ public:
|
||||
bool operator == (const GtidList& rhs) const;
|
||||
|
||||
/**
|
||||
* Calculate the number of events between two gtid:s with possibly multiple triplets. The
|
||||
* Calculate the number of events this GtidList is ahead of the given GtidList. The
|
||||
* result is always 0 or greater: if a sequence number of a domain on rhs is greater than on the same
|
||||
* domain on lhs, the sequences are considered identical. Missing domains are handled depending on the
|
||||
* value of @c domain_substraction_mode.
|
||||
* domain on the calling GtidList, the sequences are considered identical. Missing domains are
|
||||
* handled depending on the value of @c domain_substraction_mode.
|
||||
*
|
||||
* @param lhs The value substracted from
|
||||
* @param io_pos The value doing the substracting
|
||||
* @param domain_substraction_mode How domains that exist on one side but not the other are handled. If
|
||||
* MISSING_DOMAIN_IGNORE, these are simply ignored. If MISSING_DOMAIN_LHS_ADD, the sequence number on lhs
|
||||
* is added to the total difference.
|
||||
* @param rhs The value doing the subtracting
|
||||
* @param domain_substraction_mode How domains that exist on the caller but not on @c rhs are handled.
|
||||
* If MISSING_DOMAIN_IGNORE, these are simply ignored. If MISSING_DOMAIN_LHS_ADD,
|
||||
* the sequence number on the caller is added to the total difference.
|
||||
* @return The number of events between the two gtid:s
|
||||
*/
|
||||
static uint64_t events_ahead(const GtidList& lhs, const GtidList& rhs,
|
||||
substraction_mode_t domain_substraction_mode);
|
||||
int64_t events_ahead(const GtidList& rhs, substraction_mode_t domain_substraction_mode) const;
|
||||
|
||||
/**
|
||||
* Return an individual gtid with the given domain.
|
||||
|
@ -73,15 +73,19 @@ void NodeData::reset_indexes()
|
||||
in_stack = false;
|
||||
}
|
||||
|
||||
int64_t MariaDBServer::relay_log_events()
|
||||
int64_t MariaDBServer::relay_log_events(const MariaDBServer* master)
|
||||
{
|
||||
/* The events_ahead-call below ignores domains where current_pos is ahead of io_pos. This situation is
|
||||
* rare but is possible (I guess?) if the server is replicating a domain from multiple masters
|
||||
* and decides to process events from one relay log before getting new events to the other. In
|
||||
* any case, such events are obsolete and the server can be considered to have processed such logs. */
|
||||
// TODO: Fix for multisource repl
|
||||
return !m_slave_status.empty() ? GtidList::events_ahead(m_slave_status[0].gtid_io_pos, m_gtid_current_pos,
|
||||
GtidList::MISSING_DOMAIN_LHS_ADD) : 0;
|
||||
int64_t rval = -1;
|
||||
const SlaveStatus* sstatus = slave_connection_status(master);
|
||||
if (sstatus)
|
||||
{
|
||||
rval = sstatus->gtid_io_pos.events_ahead(m_gtid_current_pos, GtidList::MISSING_DOMAIN_IGNORE);
|
||||
}
|
||||
return rval;
|
||||
}
|
||||
|
||||
std::unique_ptr<QueryResult> MariaDBServer::execute_query(const string& query, std::string* errmsg_out)
|
||||
@ -397,7 +401,7 @@ bool MariaDBServer::wait_until_gtid(const GtidList& target, int timeout, json_t*
|
||||
if (update_gtids())
|
||||
{
|
||||
const GtidList& compare_to = use_binlog_pos ? m_gtid_binlog_pos : m_gtid_current_pos;
|
||||
if (GtidList::events_ahead(target, compare_to, GtidList::MISSING_DOMAIN_IGNORE) == 0)
|
||||
if (target.events_ahead(compare_to, GtidList::MISSING_DOMAIN_IGNORE) == 0)
|
||||
{
|
||||
gtid_reached = true;
|
||||
}
|
||||
@ -656,31 +660,37 @@ bool MariaDBServer::join_cluster(const string& change_cmd)
|
||||
return success;
|
||||
}
|
||||
|
||||
bool MariaDBServer::failover_wait_relay_log(int seconds_remaining, json_t** err_out)
|
||||
bool MariaDBServer::failover_wait_relay_log(const MariaDBServer* master, int seconds_remaining,
|
||||
json_t** err_out)
|
||||
{
|
||||
time_t begin = time(NULL);
|
||||
bool query_ok = true;
|
||||
bool io_pos_stable = true;
|
||||
while (relay_log_events() > 0 &&
|
||||
query_ok &&
|
||||
io_pos_stable &&
|
||||
difftime(time(NULL), begin) < seconds_remaining)
|
||||
int64_t events = relay_log_events(master);
|
||||
while (events > 0 && query_ok && io_pos_stable && difftime(time(NULL), begin) < seconds_remaining)
|
||||
{
|
||||
MXS_INFO("Relay log of server '%s' not yet empty, waiting to clear %" PRId64 " events.",
|
||||
name(), relay_log_events());
|
||||
const SlaveStatus* sstatus = slave_connection_status(master);
|
||||
mxb_assert(sstatus);
|
||||
GtidList old_gtid_io_pos = sstatus->gtid_io_pos;
|
||||
|
||||
// Sleep for a while before querying server again.
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
|
||||
// TODO: check server version before entering failover.
|
||||
// TODO: fix for multisource
|
||||
GtidList old_gtid_io_pos = m_slave_status[0].gtid_io_pos;
|
||||
MXS_NOTICE("Relay log of server '%s' not yet empty, waiting to clear %" PRId64 " events.",
|
||||
name(), events);
|
||||
std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||
|
||||
// Update gtid:s first to make sure Gtid_IO_Pos is the more recent value.
|
||||
// It doesn't matter here, but is a general rule.
|
||||
query_ok = update_gtids() && do_show_slave_status();
|
||||
io_pos_stable = (old_gtid_io_pos == m_slave_status[0].gtid_io_pos);
|
||||
if (query_ok)
|
||||
{
|
||||
const SlaveStatus* new_sstatus = slave_connection_status(master);
|
||||
io_pos_stable = new_sstatus ? (old_gtid_io_pos == new_sstatus->gtid_io_pos) : false;
|
||||
events = relay_log_events(master);
|
||||
}
|
||||
}
|
||||
|
||||
bool rval = false;
|
||||
if (relay_log_events() == 0)
|
||||
if (events == 0 && query_ok && io_pos_stable)
|
||||
{
|
||||
rval = true;
|
||||
}
|
||||
@ -695,10 +705,9 @@ bool MariaDBServer::failover_wait_relay_log(int seconds_remaining, json_t** err_
|
||||
{
|
||||
reason = "Old master sent new event(s)";
|
||||
}
|
||||
else if (relay_log_events() < 0) // TODO: This is currently impossible
|
||||
else if (events < 0)
|
||||
{
|
||||
reason = "Invalid Gtid(s) (current_pos: " + m_gtid_current_pos.to_string() +
|
||||
", io_pos: " + m_slave_status[0].gtid_io_pos.to_string() + ")";
|
||||
reason = string_printf("Slave connection to '%s' was removed", master->name());
|
||||
}
|
||||
PRINT_MXS_JSON_ERROR(err_out, "Failover: %s while waiting for server '%s' to process relay log. "
|
||||
"Cancelling failover.", reason.c_str(), name());
|
||||
|
@ -176,11 +176,13 @@ public:
|
||||
void check_permissions();
|
||||
|
||||
/**
|
||||
* Calculate how many events are left in the relay log.
|
||||
* Calculate how many events are left in the relay log of the slave connection to 'master'.
|
||||
*
|
||||
* @return Number of events in relay log according to latest queried info.
|
||||
* @param master The master server from which the slave connection is replicating
|
||||
* @return Number of events in relay log according to latest queried info. Negative on error,
|
||||
* e.g. the slave connection didn't exist.
|
||||
*/
|
||||
int64_t relay_log_events();
|
||||
int64_t relay_log_events(const MariaDBServer* master);
|
||||
|
||||
/**
|
||||
* Execute a query which returns data. The results are returned as a unique pointer to a QueryResult
|
||||
@ -399,12 +401,13 @@ public:
|
||||
/**
|
||||
* Waits until this server has processed all its relay log, or time is up.
|
||||
*
|
||||
* @param seconds_remaining How much time left
|
||||
* @param err_out Json error output
|
||||
* @param master The master (or relay) whose relay log should be waited on
|
||||
* @param seconds_remaining Maximum wait time
|
||||
* @param err_out Error output
|
||||
* @return True if relay log was processed within time limit, or false if time ran out
|
||||
* or an error occurred.
|
||||
*/
|
||||
bool failover_wait_relay_log(int seconds_remaining, json_t** err_out);
|
||||
bool failover_wait_relay_log(const MariaDBServer* master, int seconds_remaining, json_t** err_out);
|
||||
|
||||
/**
|
||||
* Check if the server can be demoted by switchover.
|
||||
|
Reference in New Issue
Block a user