Files
MaxScale/server/modules/routing/avrorouter/replicator.cc
Markus Mäkelä 7723e7c933 Add modified Replicator for avrorouter
Repurposed the Replicator from the CDC integration project as a
replication event processing service. It is similar to the CDC version of
the Replicator and is still in the same namespace but it lacks all of the
cross-thread communication that was a part of the integration project.
2019-04-17 14:13:44 +03:00

448 lines
11 KiB
C++

/*
* Copyright (c) 2019 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2022-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "avrorouter.hh"
#include "replicator.hh"
#include <atomic>
#include <chrono>
#include <cstdint>
#include <future>
#include <fstream>
#include <sstream>
#include <thread>
#include <tuple>
#include <unordered_map>
#include <vector>
#include <mysql.h>
#include <mariadb_rpl.h>
#include <errmsg.h>
#include <maxscale/query_classifier.hh>
#include <maxscale/buffer.hh>
#include <maxscale/utils.hh>
// Private headers
#include "sql.hh"
using std::chrono::duration_cast;
using Clock = std::chrono::steady_clock;
using Timepoint = Clock::time_point;
using std::chrono::milliseconds;
namespace
{
std::vector<cdc::Server> service_to_servers(SERVICE* service)
{
std::vector<cdc::Server> servers;
for (auto s = service->dbref; s; s = s->next)
{
if (s->active && s->server->is_master())
{
// TODO: per-server credentials aren't exposed in the public class
servers.push_back({s->server->address, s->server->port, service->user, service->password});
}
}
return servers;
}
}
namespace cdc
{
// A very small daemon. The main class that drives the whole replication process
class Replicator::Imp
{
public:
Imp& operator=(Imp&) = delete;
Imp(Imp&) = delete;
// Flag used in GTID events to signal statements that perform an implicit commit
static constexpr int IMPLICIT_COMMIT_FLAG = 0x1;
// Creates a new replication stream and starts it
Imp(const Config& cnf, Rpl* rpl);
// Check if the replicator is still OK
bool ok() const;
~Imp();
private:
static const std::string STATEFILE_DIR;
static const std::string STATEFILE_NAME;
static const std::string STATEFILE_TMP_SUFFIX;
bool connect();
void process_events();
bool process_one_event(SQL::Event& event);
bool load_gtid_state();
void save_gtid_state() const;
Config m_cnf; // The configuration the stream was started with
std::unique_ptr<SQL> m_sql; // Database connection
std::atomic<bool> m_running {true}; // Whether the stream is running
std::atomic<bool> m_should_stop {false}; // Set to true when doing a controlled shutdown
std::atomic<bool> m_safe_to_stop {false}; // Whether it safe to stop the processing
std::string m_gtid; // GTID position to start from
std::string m_current_gtid; // GTID of the transaction being processed
bool m_implicit_commit {false}; // Commit after next query event
Rpl* m_rpl; // Class that handles the replicated events
// NOTE: must be declared last
std::thread m_thr; // Thread that receives the replication events
};
const std::string Replicator::Imp::STATEFILE_DIR = "./";
const std::string Replicator::Imp::STATEFILE_NAME = "current_gtid.txt";
const std::string Replicator::Imp::STATEFILE_TMP_SUFFIX = ".tmp";
Replicator::Imp::Imp(const Config& cnf, Rpl* rpl)
: m_cnf(cnf)
, m_gtid(cnf.gtid)
, m_rpl(rpl)
, m_thr(std::thread(&Imp::process_events, this))
{
mxb_assert(m_rpl);
}
bool Replicator::Imp::ok() const
{
return m_running;
}
bool Replicator::Imp::connect()
{
cdc::Server old_server = {};
if (m_sql)
{
if (m_sql->errnum())
{
old_server = m_sql->server();
m_sql.reset();
}
else
{
// We already have a connection
return true;
}
}
bool rval = false;
std::string err;
auto servers = service_to_servers(m_cnf.service);
std::tie(err, m_sql) = SQL::connect(servers);
if (!err.empty())
{
if (!servers.empty())
{
// We had a valid master candidate but we couldn't connect to it
MXB_ERROR("%s", err.c_str());
}
}
else
{
mxb_assert(m_sql);
std::string gtid_start_pos = "SET @slave_connect_state='" + m_gtid + "'";
// Queries required to start GTID replication
std::vector<std::string> queries =
{
"SET @master_binlog_checksum = @@global.binlog_checksum",
"SET @mariadb_slave_capability=4",
gtid_start_pos,
"SET @slave_gtid_strict_mode=1",
"SET @slave_gtid_ignore_duplicates=1",
"SET NAMES latin1"
};
if (!m_sql->query(queries))
{
MXB_ERROR("Failed to prepare connection: %s", m_sql->error().c_str());
}
else if (!m_sql->replicate(m_cnf.server_id))
{
MXB_ERROR("Failed to open replication channel: %s", m_sql->error().c_str());
}
else
{
if (old_server.host != m_sql->server().host || old_server.port != m_sql->server().port)
{
MXB_NOTICE("Started replicating from [%s]:%d at GTID '%s'", m_sql->server().host.c_str(),
m_sql->server().port, m_gtid.c_str());
}
rval = true;
}
}
if (!rval)
{
m_sql.reset();
}
return rval;
}
void Replicator::Imp::process_events()
{
pthread_setname_np(m_thr.native_handle(), "cdc::Replicator");
if (!load_gtid_state())
{
m_running = false;
}
qc_thread_init(QC_INIT_BOTH);
if (!m_gtid.empty())
{
m_rpl->set_gtid(gtid_pos_t::from_string(m_gtid));
}
while (m_running)
{
if (!connect())
{
// We failed to connect to any of the servers, try again in a few seconds
std::this_thread::sleep_for(milliseconds(5000));
continue;
}
auto event = m_sql->fetch_event();
if (event)
{
if (!process_one_event(event))
{
/**
* Fatal error encountered. Fixing it might require manual intervention so
* the safest thing to do is to stop processing data.
*/
m_running = false;
}
}
else if (m_sql->errnum() == CR_SERVER_LOST)
{
if (m_should_stop)
{
if (m_current_gtid == m_gtid)
{
/**
* The latest committed GTID points to the current GTID being processed,
* no transaction in progress.
*/
m_safe_to_stop = true;
}
else
{
MXB_WARNING("Lost connection to server '%s:%d' when processing GTID '%s' while a "
"controlled shutdown was in progress. Attempting to roll back partial "
"transactions.", m_sql->server().host.c_str(), m_sql->server().port,
m_current_gtid.c_str());
m_running = false;
}
}
// The network error will be detected at the start of the next round
}
else
{
MXB_ERROR("Failed to read replicated event: %s", m_sql->error().c_str());
break;
}
if (m_safe_to_stop)
{
MXB_NOTICE("Stopped at GTID '%s'", m_gtid.c_str());
break;
}
}
}
std::string to_gtid_string(const MARIADB_RPL_EVENT& event)
{
std::stringstream ss;
ss << event.event.gtid.domain_id << '-' << event.server_id << '-' << event.event.gtid.sequence_nr;
return ss.str();
}
bool Replicator::Imp::load_gtid_state()
{
bool rval = false;
std::string filename = m_cnf.statedir + STATEFILE_NAME;
std::ifstream statefile(filename);
std::string gtid;
statefile >> gtid;
if (statefile)
{
rval = true;
if (!gtid.empty())
{
m_gtid = gtid;
MXB_NOTICE("Continuing from GTID '%s'", m_gtid.c_str());
}
}
else
{
if (errno == ENOENT)
{
// No GTID file, use the GTID provided in the configuration
rval = true;
}
else
{
MXB_ERROR("Failed to load current GTID state from file '%s': %d, %s",
filename.c_str(), errno, mxb_strerror(errno));
}
}
return rval;
}
void Replicator::Imp::save_gtid_state() const
{
std::ofstream statefile(m_cnf.statedir + "/" + STATEFILE_NAME);
statefile << m_current_gtid << std::endl;
if (!statefile)
{
MXB_ERROR("Failed to store current GTID state inside '%s': %d, %s",
m_cnf.statedir.c_str(), errno, mxb_strerror(errno));
}
}
bool Replicator::Imp::process_one_event(SQL::Event& event)
{
bool commit = false;
switch (event->event_type)
{
case ROTATE_EVENT:
if (m_should_stop)
{
// Rotating to a new binlog file, a safe place to stop
m_safe_to_stop = true;
}
break;
case GTID_EVENT:
if (m_should_stop)
{
// Start of a new transaction, a safe place to stop
m_safe_to_stop = true;
break;
}
else if (event->event.gtid.flags & IMPLICIT_COMMIT_FLAG)
{
m_implicit_commit = true;
}
m_current_gtid = to_gtid_string(*event);
MXB_INFO("GTID: %s", m_current_gtid.c_str());
break;
case XID_EVENT:
commit = true;
MXB_INFO("XID for GTID '%s': %lu", m_current_gtid.c_str(), event->event.xid.transaction_nr);
if (m_should_stop)
{
// End of a transaction, a safe place to stop
m_safe_to_stop = true;
break;
}
break;
case QUERY_EVENT:
case USER_VAR_EVENT:
if (m_implicit_commit)
{
m_implicit_commit = false;
commit = true;
}
break;
default:
// Ignore the event
break;
}
bool rval = true;
REP_HEADER hdr;
uint8_t* ptr = m_sql->event_data() + 20;
MARIADB_RPL_EVENT& ev = *event;
hdr.event_size = ev.event_length + (m_rpl->have_checksums() ? 4 : 0);
hdr.event_type = ev.event_type;
hdr.flags = ev.flags;
hdr.next_pos = ev.next_event_pos;
hdr.ok = ev.ok;
hdr.payload_len = hdr.event_size + 4;
hdr.seqno = 0;
hdr.serverid = ev.server_id;
hdr.timestamp = ev.timestamp;
m_rpl->handle_event(hdr, ptr);
if (commit)
{
m_rpl->flush();
notify_all_clients(m_cnf.service);
m_gtid = m_current_gtid;
save_gtid_state();
}
return rval;
}
Replicator::Imp::~Imp()
{
m_should_stop = true;
m_thr.join();
}
//
// The public API
//
// static
std::unique_ptr<Replicator> Replicator::start(const Config& cnf, Rpl* rpl)
{
return std::unique_ptr<Replicator>(new Replicator(cnf, rpl));
}
bool Replicator::ok() const
{
return m_imp->ok();
}
Replicator::~Replicator()
{
}
Replicator::Replicator(const Config& cnf, Rpl* rpl)
: m_imp(new Imp(cnf, rpl))
{
}
}