
Repurposed the Replicator from the CDC integration project as a replication event processing service. It is similar to the CDC version of the Replicator and still lives in the same namespace, but it lacks the cross-thread communication that was part of the integration project.
448 lines
11 KiB
C++
448 lines
11 KiB
C++
/*
|
|
* Copyright (c) 2019 MariaDB Corporation Ab
|
|
*
|
|
* Use of this software is governed by the Business Source License included
|
|
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
|
*
|
|
* Change Date: 2022-01-01
|
|
*
|
|
* On the date above, in accordance with the Business Source License, use
|
|
* of this software will be governed by version 2 or later of the General
|
|
* Public License.
|
|
*/
|
|
|
|
#include "avrorouter.hh"
|
|
#include "replicator.hh"
|
|
|
|
#include <atomic>
#include <cerrno>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <future>
#include <fstream>
#include <sstream>
#include <thread>
#include <tuple>
#include <unordered_map>
#include <vector>
|
|
|
|
#include <mysql.h>
|
|
#include <mariadb_rpl.h>
|
|
#include <errmsg.h>
|
|
|
|
#include <maxscale/query_classifier.hh>
|
|
#include <maxscale/buffer.hh>
|
|
#include <maxscale/utils.hh>
|
|
|
|
// Private headers
|
|
#include "sql.hh"
|
|
|
|
|
|
using std::chrono::duration_cast;
|
|
using Clock = std::chrono::steady_clock;
|
|
using Timepoint = Clock::time_point;
|
|
using std::chrono::milliseconds;
|
|
|
|
namespace
|
|
{
|
|
std::vector<cdc::Server> service_to_servers(SERVICE* service)
|
|
{
|
|
std::vector<cdc::Server> servers;
|
|
|
|
for (auto s = service->dbref; s; s = s->next)
|
|
{
|
|
if (s->active && s->server->is_master())
|
|
{
|
|
// TODO: per-server credentials aren't exposed in the public class
|
|
servers.push_back({s->server->address, s->server->port, service->user, service->password});
|
|
}
|
|
}
|
|
|
|
return servers;
|
|
}
|
|
}
|
|
|
|
namespace cdc
|
|
{
|
|
|
|
|
|
// A very small daemon. The main class that drives the whole replication process
|
|
class Replicator::Imp
|
|
{
|
|
public:
|
|
Imp& operator=(Imp&) = delete;
|
|
Imp(Imp&) = delete;
|
|
|
|
// Flag used in GTID events to signal statements that perform an implicit commit
|
|
static constexpr int IMPLICIT_COMMIT_FLAG = 0x1;
|
|
|
|
// Creates a new replication stream and starts it
|
|
Imp(const Config& cnf, Rpl* rpl);
|
|
|
|
// Check if the replicator is still OK
|
|
bool ok() const;
|
|
|
|
~Imp();
|
|
|
|
private:
|
|
static const std::string STATEFILE_DIR;
|
|
static const std::string STATEFILE_NAME;
|
|
static const std::string STATEFILE_TMP_SUFFIX;
|
|
|
|
bool connect();
|
|
void process_events();
|
|
bool process_one_event(SQL::Event& event);
|
|
bool load_gtid_state();
|
|
void save_gtid_state() const;
|
|
|
|
Config m_cnf; // The configuration the stream was started with
|
|
std::unique_ptr<SQL> m_sql; // Database connection
|
|
std::atomic<bool> m_running {true}; // Whether the stream is running
|
|
std::atomic<bool> m_should_stop {false}; // Set to true when doing a controlled shutdown
|
|
std::atomic<bool> m_safe_to_stop {false}; // Whether it safe to stop the processing
|
|
std::string m_gtid; // GTID position to start from
|
|
std::string m_current_gtid; // GTID of the transaction being processed
|
|
bool m_implicit_commit {false}; // Commit after next query event
|
|
Rpl* m_rpl; // Class that handles the replicated events
|
|
|
|
// NOTE: must be declared last
|
|
std::thread m_thr; // Thread that receives the replication events
|
|
};
|
|
|
|
// Names used for the file that stores the last committed GTID.
// NOTE(review): STATEFILE_DIR is not referenced in this file; the directory comes from Config::statedir.
const std::string Replicator::Imp::STATEFILE_DIR {"./"};
const std::string Replicator::Imp::STATEFILE_NAME {"current_gtid.txt"};
const std::string Replicator::Imp::STATEFILE_TMP_SUFFIX {".tmp"};
|
|
|
|
/**
 * Store the configuration and start the worker thread.
 *
 * The starting GTID is taken from the configuration; the worker may later replace it with the
 * value found in the state file. The thread is started as the last member initializer.
 *
 * @param cnf Replicator configuration (copied)
 * @param rpl Handler for the replicated events, must not be null
 */
Replicator::Imp::Imp(const Config& cnf, Rpl* rpl)
    : m_cnf(cnf)
    , m_gtid(cnf.gtid)
    , m_rpl(rpl)
    , m_thr(&Imp::process_events, this)
{
    mxb_assert(m_rpl);
}
|
|
|
|
bool Replicator::Imp::ok() const
|
|
{
|
|
return m_running;
|
|
}
|
|
|
|
bool Replicator::Imp::connect()
|
|
{
|
|
cdc::Server old_server = {};
|
|
|
|
if (m_sql)
|
|
{
|
|
if (m_sql->errnum())
|
|
{
|
|
old_server = m_sql->server();
|
|
m_sql.reset();
|
|
}
|
|
else
|
|
{
|
|
// We already have a connection
|
|
return true;
|
|
}
|
|
}
|
|
|
|
bool rval = false;
|
|
std::string err;
|
|
|
|
auto servers = service_to_servers(m_cnf.service);
|
|
std::tie(err, m_sql) = SQL::connect(servers);
|
|
|
|
if (!err.empty())
|
|
{
|
|
if (!servers.empty())
|
|
{
|
|
// We had a valid master candidate but we couldn't connect to it
|
|
MXB_ERROR("%s", err.c_str());
|
|
}
|
|
}
|
|
else
|
|
{
|
|
mxb_assert(m_sql);
|
|
std::string gtid_start_pos = "SET @slave_connect_state='" + m_gtid + "'";
|
|
|
|
// Queries required to start GTID replication
|
|
std::vector<std::string> queries =
|
|
{
|
|
"SET @master_binlog_checksum = @@global.binlog_checksum",
|
|
"SET @mariadb_slave_capability=4",
|
|
gtid_start_pos,
|
|
"SET @slave_gtid_strict_mode=1",
|
|
"SET @slave_gtid_ignore_duplicates=1",
|
|
"SET NAMES latin1"
|
|
};
|
|
|
|
if (!m_sql->query(queries))
|
|
{
|
|
MXB_ERROR("Failed to prepare connection: %s", m_sql->error().c_str());
|
|
}
|
|
else if (!m_sql->replicate(m_cnf.server_id))
|
|
{
|
|
MXB_ERROR("Failed to open replication channel: %s", m_sql->error().c_str());
|
|
}
|
|
else
|
|
{
|
|
if (old_server.host != m_sql->server().host || old_server.port != m_sql->server().port)
|
|
{
|
|
MXB_NOTICE("Started replicating from [%s]:%d at GTID '%s'", m_sql->server().host.c_str(),
|
|
m_sql->server().port, m_gtid.c_str());
|
|
}
|
|
rval = true;
|
|
}
|
|
}
|
|
|
|
if (!rval)
|
|
{
|
|
m_sql.reset();
|
|
}
|
|
|
|
return rval;
|
|
}
|
|
|
|
void Replicator::Imp::process_events()
|
|
{
|
|
pthread_setname_np(m_thr.native_handle(), "cdc::Replicator");
|
|
|
|
if (!load_gtid_state())
|
|
{
|
|
m_running = false;
|
|
}
|
|
|
|
qc_thread_init(QC_INIT_BOTH);
|
|
|
|
if (!m_gtid.empty())
|
|
{
|
|
m_rpl->set_gtid(gtid_pos_t::from_string(m_gtid));
|
|
}
|
|
|
|
while (m_running)
|
|
{
|
|
if (!connect())
|
|
{
|
|
// We failed to connect to any of the servers, try again in a few seconds
|
|
std::this_thread::sleep_for(milliseconds(5000));
|
|
continue;
|
|
}
|
|
|
|
auto event = m_sql->fetch_event();
|
|
|
|
if (event)
|
|
{
|
|
if (!process_one_event(event))
|
|
{
|
|
/**
|
|
* Fatal error encountered. Fixing it might require manual intervention so
|
|
* the safest thing to do is to stop processing data.
|
|
*/
|
|
m_running = false;
|
|
}
|
|
}
|
|
else if (m_sql->errnum() == CR_SERVER_LOST)
|
|
{
|
|
if (m_should_stop)
|
|
{
|
|
if (m_current_gtid == m_gtid)
|
|
{
|
|
/**
|
|
* The latest committed GTID points to the current GTID being processed,
|
|
* no transaction in progress.
|
|
*/
|
|
m_safe_to_stop = true;
|
|
}
|
|
else
|
|
{
|
|
MXB_WARNING("Lost connection to server '%s:%d' when processing GTID '%s' while a "
|
|
"controlled shutdown was in progress. Attempting to roll back partial "
|
|
"transactions.", m_sql->server().host.c_str(), m_sql->server().port,
|
|
m_current_gtid.c_str());
|
|
m_running = false;
|
|
}
|
|
}
|
|
|
|
// The network error will be detected at the start of the next round
|
|
}
|
|
else
|
|
{
|
|
MXB_ERROR("Failed to read replicated event: %s", m_sql->error().c_str());
|
|
break;
|
|
}
|
|
|
|
if (m_safe_to_stop)
|
|
{
|
|
MXB_NOTICE("Stopped at GTID '%s'", m_gtid.c_str());
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
std::string to_gtid_string(const MARIADB_RPL_EVENT& event)
|
|
{
|
|
std::stringstream ss;
|
|
ss << event.event.gtid.domain_id << '-' << event.server_id << '-' << event.event.gtid.sequence_nr;
|
|
return ss.str();
|
|
}
|
|
|
|
bool Replicator::Imp::load_gtid_state()
|
|
{
|
|
bool rval = false;
|
|
std::string filename = m_cnf.statedir + STATEFILE_NAME;
|
|
std::ifstream statefile(filename);
|
|
std::string gtid;
|
|
statefile >> gtid;
|
|
|
|
if (statefile)
|
|
{
|
|
rval = true;
|
|
|
|
if (!gtid.empty())
|
|
{
|
|
m_gtid = gtid;
|
|
MXB_NOTICE("Continuing from GTID '%s'", m_gtid.c_str());
|
|
}
|
|
}
|
|
else
|
|
{
|
|
if (errno == ENOENT)
|
|
{
|
|
// No GTID file, use the GTID provided in the configuration
|
|
rval = true;
|
|
}
|
|
else
|
|
{
|
|
MXB_ERROR("Failed to load current GTID state from file '%s': %d, %s",
|
|
filename.c_str(), errno, mxb_strerror(errno));
|
|
}
|
|
}
|
|
|
|
return rval;
|
|
}
|
|
|
|
void Replicator::Imp::save_gtid_state() const
|
|
{
|
|
std::ofstream statefile(m_cnf.statedir + "/" + STATEFILE_NAME);
|
|
statefile << m_current_gtid << std::endl;
|
|
|
|
if (!statefile)
|
|
{
|
|
MXB_ERROR("Failed to store current GTID state inside '%s': %d, %s",
|
|
m_cnf.statedir.c_str(), errno, mxb_strerror(errno));
|
|
}
|
|
}
|
|
|
|
/**
 * Handle a single replicated event.
 *
 * Tracks transaction boundaries (GTID events, XID events and the query that follows an
 * implicit-commit GTID event) and forwards the raw event to the Rpl handler. When a transaction
 * commits, it flushes the handler, notifies the clients of the service and persists the committed
 * GTID.
 *
 * During a controlled shutdown (m_should_stop), m_safe_to_stop is set on one of:
 * - a rotate event,
 * - the GTID event of a new transaction,
 * - the XID event ending a transaction.
 *
 * @param event The event returned by SQL::fetch_event()
 *
 * @return True if processing can continue. NOTE(review): this is currently always true, so the
 *         caller's fatal-error branch is never taken.
 */
bool Replicator::Imp::process_one_event(SQL::Event& event)
{
    bool commit = false;

    switch (event->event_type)
    {
    case ROTATE_EVENT:
        if (m_should_stop)
        {
            // Rotating to a new binlog file, a safe place to stop
            m_safe_to_stop = true;
        }
        break;

    case GTID_EVENT:
        if (m_should_stop)
        {
            // Start of a new transaction, a safe place to stop. NOTE(review): the GTID event itself
            // is still passed to m_rpl->handle_event() below.
            m_safe_to_stop = true;
            break;
        }
        else if (event->event.gtid.flags & IMPLICIT_COMMIT_FLAG)
        {
            m_implicit_commit = true;
        }

        m_current_gtid = to_gtid_string(*event);
        MXB_INFO("GTID: %s", m_current_gtid.c_str());
        break;

    case XID_EVENT:
        commit = true;
        // Cast explicitly: transaction_nr is 64 bits wide, which %lu does not match on every platform
        MXB_INFO("XID for GTID '%s': %llu", m_current_gtid.c_str(),
                 static_cast<unsigned long long>(event->event.xid.transaction_nr));

        if (m_should_stop)
        {
            // End of a transaction, a safe place to stop
            m_safe_to_stop = true;
        }
        break;

    case QUERY_EVENT:
    case USER_VAR_EVENT:
        if (m_implicit_commit)
        {
            // The GTID event flagged this transaction as an implicit commit: it ends here
            m_implicit_commit = false;
            commit = true;
        }
        break;

    default:
        // Ignore the event
        break;
    }

    bool rval = true;
    REP_HEADER hdr;

    // NOTE(review): the offset of 20 presumably skips the 1-byte OK marker of the network packet
    // plus the 19-byte binlog event header, leaving ptr at the event body. Confirm against
    // SQL::event_data().
    uint8_t* ptr = m_sql->event_data() + 20;

    MARIADB_RPL_EVENT& ev = *event;
    hdr.event_size = ev.event_length + (m_rpl->have_checksums() ? 4 : 0);
    hdr.event_type = ev.event_type;
    hdr.flags = ev.flags;
    hdr.next_pos = ev.next_event_pos;
    hdr.ok = ev.ok;
    hdr.payload_len = hdr.event_size + 4;
    hdr.seqno = 0;
    hdr.serverid = ev.server_id;
    hdr.timestamp = ev.timestamp;

    m_rpl->handle_event(hdr, ptr);

    if (commit)
    {
        m_rpl->flush();
        notify_all_clients(m_cnf.service);
        m_gtid = m_current_gtid;
        save_gtid_state();
    }

    return rval;
}
|
|
|
|
// Request a controlled shutdown and block until the worker thread has exited
Replicator::Imp::~Imp()
{
    m_should_stop.store(true);
    m_thr.join();
}
|
|
|
|
//
|
|
// The public API
|
|
//
|
|
|
|
// static
|
|
std::unique_ptr<Replicator> Replicator::start(const Config& cnf, Rpl* rpl)
|
|
{
|
|
return std::unique_ptr<Replicator>(new Replicator(cnf, rpl));
|
|
}
|
|
|
|
bool Replicator::ok() const
|
|
{
|
|
return m_imp->ok();
|
|
}
|
|
|
|
// Defined here, where Imp is a complete type, so that destroying m_imp is well-formed
Replicator::~Replicator() = default;
|
|
|
|
Replicator::Replicator(const Config& cnf, Rpl* rpl)
|
|
: m_imp(new Imp(cnf, rpl))
|
|
{
|
|
}
|
|
}
|