Repurposed the Replicator from the CDC integration project as a replication event processing service. It is similar to the CDC version of the Replicator and is still in the same namespace, but it lacks all of the cross-thread communication that was a part of the integration project.
		
			
				
	
	
		
			448 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			448 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/*
 | 
						|
 * Copyright (c) 2019 MariaDB Corporation Ab
 | 
						|
 *
 | 
						|
 * Use of this software is governed by the Business Source License included
 | 
						|
 * in the LICENSE.TXT file and at www.mariadb.com/bsl11.
 | 
						|
 *
 | 
						|
 * Change Date: 2022-01-01
 | 
						|
 *
 | 
						|
 * On the date above, in accordance with the Business Source License, use
 | 
						|
 * of this software will be governed by version 2 or later of the General
 | 
						|
 * Public License.
 | 
						|
 */
 | 
						|
 | 
						|
#include "avrorouter.hh"
 | 
						|
#include "replicator.hh"
 | 
						|
 | 
						|
#include <atomic>
 | 
						|
#include <chrono>
 | 
						|
#include <cstdint>
 | 
						|
#include <future>
 | 
						|
#include <fstream>
 | 
						|
#include <sstream>
 | 
						|
#include <thread>
 | 
						|
#include <tuple>
 | 
						|
#include <unordered_map>
 | 
						|
#include <vector>
 | 
						|
 | 
						|
#include <mysql.h>
 | 
						|
#include <mariadb_rpl.h>
 | 
						|
#include <errmsg.h>
 | 
						|
 | 
						|
#include <maxscale/query_classifier.hh>
 | 
						|
#include <maxscale/buffer.hh>
 | 
						|
#include <maxscale/utils.hh>
 | 
						|
 | 
						|
// Private headers
 | 
						|
#include "sql.hh"
 | 
						|
 | 
						|
 | 
						|
using std::chrono::duration_cast;
 | 
						|
using Clock = std::chrono::steady_clock;
 | 
						|
using Timepoint = Clock::time_point;
 | 
						|
using std::chrono::milliseconds;
 | 
						|
 | 
						|
namespace
 | 
						|
{
 | 
						|
std::vector<cdc::Server> service_to_servers(SERVICE* service)
 | 
						|
{
 | 
						|
    std::vector<cdc::Server> servers;
 | 
						|
 | 
						|
    for (auto s = service->dbref; s; s = s->next)
 | 
						|
    {
 | 
						|
        if (s->active && s->server->is_master())
 | 
						|
        {
 | 
						|
            // TODO: per-server credentials aren't exposed in the public class
 | 
						|
            servers.push_back({s->server->address, s->server->port, service->user, service->password});
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    return servers;
 | 
						|
}
 | 
						|
}
 | 
						|
 | 
						|
namespace cdc
 | 
						|
{
 | 
						|
 | 
						|
 | 
						|
// A very small daemon. The main class that drives the whole replication process
 | 
						|
class Replicator::Imp
 | 
						|
{
 | 
						|
public:
 | 
						|
    Imp& operator=(Imp&) = delete;
 | 
						|
    Imp(Imp&) = delete;
 | 
						|
 | 
						|
    // Flag used in GTID events to signal statements that perform an implicit commit
 | 
						|
    static constexpr int IMPLICIT_COMMIT_FLAG = 0x1;
 | 
						|
 | 
						|
    // Creates a new replication stream and starts it
 | 
						|
    Imp(const Config& cnf, Rpl* rpl);
 | 
						|
 | 
						|
    // Check if the replicator is still OK
 | 
						|
    bool ok() const;
 | 
						|
 | 
						|
    ~Imp();
 | 
						|
 | 
						|
private:
 | 
						|
    static const std::string STATEFILE_DIR;
 | 
						|
    static const std::string STATEFILE_NAME;
 | 
						|
    static const std::string STATEFILE_TMP_SUFFIX;
 | 
						|
 | 
						|
    bool connect();
 | 
						|
    void process_events();
 | 
						|
    bool process_one_event(SQL::Event& event);
 | 
						|
    bool load_gtid_state();
 | 
						|
    void save_gtid_state() const;
 | 
						|
 | 
						|
    Config               m_cnf;                     // The configuration the stream was started with
 | 
						|
    std::unique_ptr<SQL> m_sql;                     // Database connection
 | 
						|
    std::atomic<bool>    m_running {true};          // Whether the stream is running
 | 
						|
    std::atomic<bool>    m_should_stop {false};     // Set to true when doing a controlled shutdown
 | 
						|
    std::atomic<bool>    m_safe_to_stop {false};    // Whether it safe to stop the processing
 | 
						|
    std::string          m_gtid;                    // GTID position to start from
 | 
						|
    std::string          m_current_gtid;            // GTID of the transaction being processed
 | 
						|
    bool                 m_implicit_commit {false}; // Commit after next query event
 | 
						|
    Rpl*                 m_rpl;                     // Class that handles the replicated events
 | 
						|
 | 
						|
    // NOTE: must be declared last
 | 
						|
    std::thread m_thr;      // Thread that receives the replication events
 | 
						|
};
 | 
						|
 | 
						|
// Definitions of the state file name constants declared in Replicator::Imp
const std::string Replicator::Imp::STATEFILE_DIR("./");
const std::string Replicator::Imp::STATEFILE_NAME("current_gtid.txt");
const std::string Replicator::Imp::STATEFILE_TMP_SUFFIX(".tmp");
 | 
						|
 | 
						|
// Copies the configuration and the configured starting GTID, stores the event handler and
// launches the thread that runs process_events().
Replicator::Imp::Imp(const Config& cnf, Rpl* rpl)
    : m_cnf(cnf)
    , m_gtid(cnf.gtid)
    , m_rpl(rpl)
    , m_thr(&Imp::process_events, this)     // Started last, after every other member is initialized
{
    // An event handler is mandatory
    mxb_assert(m_rpl);
}
 | 
						|
 | 
						|
bool Replicator::Imp::ok() const
 | 
						|
{
 | 
						|
    return m_running;
 | 
						|
}
 | 
						|
 | 
						|
bool Replicator::Imp::connect()
 | 
						|
{
 | 
						|
    cdc::Server old_server = {};
 | 
						|
 | 
						|
    if (m_sql)
 | 
						|
    {
 | 
						|
        if (m_sql->errnum())
 | 
						|
        {
 | 
						|
            old_server = m_sql->server();
 | 
						|
            m_sql.reset();
 | 
						|
        }
 | 
						|
        else
 | 
						|
        {
 | 
						|
            // We already have a connection
 | 
						|
            return true;
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    bool rval = false;
 | 
						|
    std::string err;
 | 
						|
 | 
						|
    auto servers = service_to_servers(m_cnf.service);
 | 
						|
    std::tie(err, m_sql) = SQL::connect(servers);
 | 
						|
 | 
						|
    if (!err.empty())
 | 
						|
    {
 | 
						|
        if (!servers.empty())
 | 
						|
        {
 | 
						|
            // We had a valid master candidate but we couldn't connect to it
 | 
						|
            MXB_ERROR("%s", err.c_str());
 | 
						|
        }
 | 
						|
    }
 | 
						|
    else
 | 
						|
    {
 | 
						|
        mxb_assert(m_sql);
 | 
						|
        std::string gtid_start_pos = "SET @slave_connect_state='" + m_gtid + "'";
 | 
						|
 | 
						|
        // Queries required to start GTID replication
 | 
						|
        std::vector<std::string> queries =
 | 
						|
        {
 | 
						|
            "SET @master_binlog_checksum = @@global.binlog_checksum",
 | 
						|
            "SET @mariadb_slave_capability=4",
 | 
						|
            gtid_start_pos,
 | 
						|
            "SET @slave_gtid_strict_mode=1",
 | 
						|
            "SET @slave_gtid_ignore_duplicates=1",
 | 
						|
            "SET NAMES latin1"
 | 
						|
        };
 | 
						|
 | 
						|
        if (!m_sql->query(queries))
 | 
						|
        {
 | 
						|
            MXB_ERROR("Failed to prepare connection: %s", m_sql->error().c_str());
 | 
						|
        }
 | 
						|
        else if (!m_sql->replicate(m_cnf.server_id))
 | 
						|
        {
 | 
						|
            MXB_ERROR("Failed to open replication channel: %s", m_sql->error().c_str());
 | 
						|
        }
 | 
						|
        else
 | 
						|
        {
 | 
						|
            if (old_server.host != m_sql->server().host || old_server.port != m_sql->server().port)
 | 
						|
            {
 | 
						|
                MXB_NOTICE("Started replicating from [%s]:%d at GTID '%s'", m_sql->server().host.c_str(),
 | 
						|
                           m_sql->server().port, m_gtid.c_str());
 | 
						|
            }
 | 
						|
            rval = true;
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    if (!rval)
 | 
						|
    {
 | 
						|
        m_sql.reset();
 | 
						|
    }
 | 
						|
 | 
						|
    return rval;
 | 
						|
}
 | 
						|
 | 
						|
void Replicator::Imp::process_events()
 | 
						|
{
 | 
						|
    pthread_setname_np(m_thr.native_handle(), "cdc::Replicator");
 | 
						|
 | 
						|
    if (!load_gtid_state())
 | 
						|
    {
 | 
						|
        m_running = false;
 | 
						|
    }
 | 
						|
 | 
						|
    qc_thread_init(QC_INIT_BOTH);
 | 
						|
 | 
						|
    if (!m_gtid.empty())
 | 
						|
    {
 | 
						|
        m_rpl->set_gtid(gtid_pos_t::from_string(m_gtid));
 | 
						|
    }
 | 
						|
 | 
						|
    while (m_running)
 | 
						|
    {
 | 
						|
        if (!connect())
 | 
						|
        {
 | 
						|
            // We failed to connect to any of the servers, try again in a few seconds
 | 
						|
            std::this_thread::sleep_for(milliseconds(5000));
 | 
						|
            continue;
 | 
						|
        }
 | 
						|
 | 
						|
        auto event = m_sql->fetch_event();
 | 
						|
 | 
						|
        if (event)
 | 
						|
        {
 | 
						|
            if (!process_one_event(event))
 | 
						|
            {
 | 
						|
                /**
 | 
						|
                 * Fatal error encountered. Fixing it might require manual intervention so
 | 
						|
                 * the safest thing to do is to stop processing data.
 | 
						|
                 */
 | 
						|
                m_running = false;
 | 
						|
            }
 | 
						|
        }
 | 
						|
        else if (m_sql->errnum() == CR_SERVER_LOST)
 | 
						|
        {
 | 
						|
            if (m_should_stop)
 | 
						|
            {
 | 
						|
                if (m_current_gtid == m_gtid)
 | 
						|
                {
 | 
						|
                    /**
 | 
						|
                     * The latest committed GTID points to the current GTID being processed,
 | 
						|
                     * no transaction in progress.
 | 
						|
                     */
 | 
						|
                    m_safe_to_stop = true;
 | 
						|
                }
 | 
						|
                else
 | 
						|
                {
 | 
						|
                    MXB_WARNING("Lost connection to server '%s:%d' when processing GTID '%s' while a "
 | 
						|
                                "controlled shutdown was in progress. Attempting to roll back partial "
 | 
						|
                                "transactions.", m_sql->server().host.c_str(), m_sql->server().port,
 | 
						|
                                m_current_gtid.c_str());
 | 
						|
                    m_running = false;
 | 
						|
                }
 | 
						|
            }
 | 
						|
 | 
						|
            // The network error will be detected at the start of the next round
 | 
						|
        }
 | 
						|
        else
 | 
						|
        {
 | 
						|
            MXB_ERROR("Failed to read replicated event: %s", m_sql->error().c_str());
 | 
						|
            break;
 | 
						|
        }
 | 
						|
 | 
						|
        if (m_safe_to_stop)
 | 
						|
        {
 | 
						|
            MXB_NOTICE("Stopped at GTID '%s'", m_gtid.c_str());
 | 
						|
            break;
 | 
						|
        }
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
std::string to_gtid_string(const MARIADB_RPL_EVENT& event)
 | 
						|
{
 | 
						|
    std::stringstream ss;
 | 
						|
    ss << event.event.gtid.domain_id << '-' << event.server_id << '-' << event.event.gtid.sequence_nr;
 | 
						|
    return ss.str();
 | 
						|
}
 | 
						|
 | 
						|
bool Replicator::Imp::load_gtid_state()
 | 
						|
{
 | 
						|
    bool rval = false;
 | 
						|
    std::string filename = m_cnf.statedir + STATEFILE_NAME;
 | 
						|
    std::ifstream statefile(filename);
 | 
						|
    std::string gtid;
 | 
						|
    statefile >> gtid;
 | 
						|
 | 
						|
    if (statefile)
 | 
						|
    {
 | 
						|
        rval = true;
 | 
						|
 | 
						|
        if (!gtid.empty())
 | 
						|
        {
 | 
						|
            m_gtid = gtid;
 | 
						|
            MXB_NOTICE("Continuing from GTID '%s'", m_gtid.c_str());
 | 
						|
        }
 | 
						|
    }
 | 
						|
    else
 | 
						|
    {
 | 
						|
        if (errno == ENOENT)
 | 
						|
        {
 | 
						|
            //  No GTID file, use the GTID provided in the configuration
 | 
						|
            rval = true;
 | 
						|
        }
 | 
						|
        else
 | 
						|
        {
 | 
						|
            MXB_ERROR("Failed to load current GTID state from file '%s': %d, %s",
 | 
						|
                      filename.c_str(), errno, mxb_strerror(errno));
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    return rval;
 | 
						|
}
 | 
						|
 | 
						|
void Replicator::Imp::save_gtid_state() const
 | 
						|
{
 | 
						|
    std::ofstream statefile(m_cnf.statedir + "/" + STATEFILE_NAME);
 | 
						|
    statefile << m_current_gtid << std::endl;
 | 
						|
 | 
						|
    if (!statefile)
 | 
						|
    {
 | 
						|
        MXB_ERROR("Failed to store current GTID state inside '%s': %d, %s",
 | 
						|
                  m_cnf.statedir.c_str(), errno, mxb_strerror(errno));
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
/**
 * Processes one replicated event.
 *
 * The event type decides the bookkeeping: tracking of the current GTID, detection of
 * commits and detection of safe stopping points during a controlled shutdown. Every event
 * is then forwarded to the event handler and, if the event ended a transaction, the
 * handler is flushed, the clients are notified and the GTID position is persisted.
 *
 * @param event The event to process
 *
 * @return Always true in the current implementation
 */
bool Replicator::Imp::process_one_event(SQL::Event& event)
{
    MARIADB_RPL_EVENT& ev = *event;
    bool do_commit = false;

    switch (ev.event_type)
    {
    case ROTATE_EVENT:
        if (m_should_stop)
        {
            // Rotating to a new binlog file, a safe place to stop
            m_safe_to_stop = true;
        }
        break;

    case GTID_EVENT:
        if (m_should_stop)
        {
            // Start of a new transaction, a safe place to stop
            m_safe_to_stop = true;
        }
        else
        {
            if (ev.event.gtid.flags & IMPLICIT_COMMIT_FLAG)
            {
                m_implicit_commit = true;
            }

            m_current_gtid = to_gtid_string(ev);
            MXB_INFO("GTID: %s", m_current_gtid.c_str());
        }
        break;

    case XID_EVENT:
        do_commit = true;
        MXB_INFO("XID for GTID '%s': %lu", m_current_gtid.c_str(), ev.event.xid.transaction_nr);

        if (m_should_stop)
        {
            // End of a transaction, a safe place to stop
            m_safe_to_stop = true;
        }
        break;

    case QUERY_EVENT:
    case USER_VAR_EVENT:
        if (m_implicit_commit)
        {
            // The preceding GTID event flagged this statement as implicitly committing
            m_implicit_commit = false;
            do_commit = true;
        }
        break;

    default:
        // Ignore the event
        break;
    }

    // NOTE(review): the offset of 20 presumably skips the packet status byte and the
    // 19-byte binlog event header; confirm against SQL::event_data()
    uint8_t* payload = m_sql->event_data() + 20;

    // Rebuild the replication header in the form the event handler expects
    const int checksum_bytes = m_rpl->have_checksums() ? 4 : 0;

    REP_HEADER hdr;
    hdr.event_size = ev.event_length + checksum_bytes;
    hdr.event_type = ev.event_type;
    hdr.flags = ev.flags;
    hdr.next_pos = ev.next_event_pos;
    hdr.ok = ev.ok;
    hdr.payload_len = hdr.event_size + 4;
    hdr.seqno = 0;
    hdr.serverid = ev.server_id;
    hdr.timestamp = ev.timestamp;

    m_rpl->handle_event(hdr, payload);

    if (do_commit)
    {
        m_rpl->flush();
        notify_all_clients(m_cnf.service);

        // The transaction is complete: it becomes the new restart position
        m_gtid = m_current_gtid;
        save_gtid_state();
    }

    return true;
}
 | 
						|
 | 
						|
// Signals the processing thread to stop at the next safe point and waits for it to exit
Replicator::Imp::~Imp()
{
    m_should_stop.store(true);
    m_thr.join();
}
 | 
						|
 | 
						|
//
 | 
						|
// The public API
 | 
						|
//
 | 
						|
 | 
						|
// static
 | 
						|
std::unique_ptr<Replicator> Replicator::start(const Config& cnf, Rpl* rpl)
 | 
						|
{
 | 
						|
    return std::unique_ptr<Replicator>(new Replicator(cnf, rpl));
 | 
						|
}
 | 
						|
 | 
						|
bool Replicator::ok() const
 | 
						|
{
 | 
						|
    return m_imp->ok();
 | 
						|
}
 | 
						|
 | 
						|
// Defined in this file, where Replicator::Imp is a complete type
Replicator::~Replicator() = default;
 | 
						|
 | 
						|
Replicator::Replicator(const Config& cnf, Rpl* rpl)
 | 
						|
    : m_imp(new Imp(cnf, rpl))
 | 
						|
{
 | 
						|
}
 | 
						|
}
 |