diff --git a/server/modules/routing/avrorouter/config.hh b/server/modules/routing/avrorouter/config.hh new file mode 100644 index 000000000..6307ffdfd --- /dev/null +++ b/server/modules/routing/avrorouter/config.hh @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2019 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2022-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#pragma once + +#include +#include +#include +#include + +#include + +#include "rpl.hh" + +namespace cdc +{ + +struct Server +{ + std::string host; // Address to connect to + int port; // Port where the server is listening + std::string user; // Username used for the connection + std::string password; // Password for the user +}; + +struct Config +{ + int server_id = 1234; // Server ID used in registration + std::string gtid; // Starting GTID + SERVICE* service; + std::string statedir = "."; +}; +} diff --git a/server/modules/routing/avrorouter/replicator.cc b/server/modules/routing/avrorouter/replicator.cc new file mode 100644 index 000000000..218d8055d --- /dev/null +++ b/server/modules/routing/avrorouter/replicator.cc @@ -0,0 +1,447 @@ +/* + * Copyright (c) 2019 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2022-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include "avrorouter.hh" +#include "replicator.hh" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include + +// Private headers +#include "sql.hh" + + +using std::chrono::duration_cast; +using Clock = std::chrono::steady_clock; +using Timepoint = Clock::time_point; +using std::chrono::milliseconds; + +namespace +{ +std::vector service_to_servers(SERVICE* service) +{ + std::vector servers; + + for (auto s = service->dbref; s; s = s->next) + { + if (s->active && s->server->is_master()) + { + // TODO: per-server credentials aren't exposed in the public class + servers.push_back({s->server->address, s->server->port, service->user, service->password}); + } + } + + return servers; +} +} + +namespace cdc +{ + + +// A very small daemon. The main class that drives the whole replication process +class Replicator::Imp +{ +public: + Imp& operator=(Imp&) = delete; + Imp(Imp&) = delete; + + // Flag used in GTID events to signal statements that perform an implicit commit + static constexpr int IMPLICIT_COMMIT_FLAG = 0x1; + + // Creates a new replication stream and starts it + Imp(const Config& cnf, Rpl* rpl); + + // Check if the replicator is still OK + bool ok() const; + + ~Imp(); + +private: + static const std::string STATEFILE_DIR; + static const std::string STATEFILE_NAME; + static const std::string STATEFILE_TMP_SUFFIX; + + bool connect(); + void process_events(); + bool process_one_event(SQL::Event& event); + bool load_gtid_state(); + void save_gtid_state() const; + + Config m_cnf; // The configuration the stream was started with + std::unique_ptr m_sql; // Database connection + std::atomic m_running {true}; // Whether the stream is running + std::atomic m_should_stop {false}; // Set to true when doing a controlled shutdown + std::atomic m_safe_to_stop {false}; // Whether it safe to stop the processing + std::string m_gtid; // GTID position to start from + std::string m_current_gtid; // GTID of the transaction being processed + bool m_implicit_commit {false}; // Commit after next query event + Rpl* m_rpl; // Class that handles the replicated events + + // NOTE: must be declared last + std::thread m_thr; // Thread that receives the replication events +}; + +const std::string Replicator::Imp::STATEFILE_DIR = "./"; +const std::string Replicator::Imp::STATEFILE_NAME = "current_gtid.txt"; +const std::string Replicator::Imp::STATEFILE_TMP_SUFFIX = ".tmp"; + +Replicator::Imp::Imp(const Config& cnf, Rpl* rpl) + : m_cnf(cnf) + , m_gtid(cnf.gtid) + , m_rpl(rpl) + , m_thr(std::thread(&Imp::process_events, this)) +{ + mxb_assert(m_rpl); +} + +bool Replicator::Imp::ok() const +{ + return m_running; +} + +bool Replicator::Imp::connect() +{ + cdc::Server old_server = {}; + + if (m_sql) + { + if (m_sql->errnum()) + { + old_server = m_sql->server(); + m_sql.reset(); + } + else + { + // We already have a connection + return true; + } + } + + bool rval = false; + std::string err; + + auto servers = service_to_servers(m_cnf.service); + std::tie(err, m_sql) = SQL::connect(servers); + + if (!err.empty()) + { + if (!servers.empty()) + { + // We had a valid master candidate but we couldn't connect to it + MXB_ERROR("%s", err.c_str()); + } + } + else + { + mxb_assert(m_sql); + std::string gtid_start_pos = "SET @slave_connect_state='" + m_gtid + "'"; + + // Queries required to start GTID replication + std::vector queries = + { + "SET @master_binlog_checksum = @@global.binlog_checksum", + "SET @mariadb_slave_capability=4", + gtid_start_pos, + "SET @slave_gtid_strict_mode=1", + "SET @slave_gtid_ignore_duplicates=1", + "SET NAMES latin1" + }; + + if (!m_sql->query(queries)) + { + MXB_ERROR("Failed to prepare connection: %s", m_sql->error().c_str()); + } + else if (!m_sql->replicate(m_cnf.server_id)) + { + MXB_ERROR("Failed to open replication channel: %s", m_sql->error().c_str()); + } + else + { + if (old_server.host != m_sql->server().host || old_server.port != m_sql->server().port) + { + MXB_NOTICE("Started replicating from [%s]:%d at GTID '%s'", m_sql->server().host.c_str(), + m_sql->server().port, m_gtid.c_str()); + } + rval = true; + } + } + + if (!rval) + { + m_sql.reset(); + } + + return rval; +} + +void Replicator::Imp::process_events() +{ + pthread_setname_np(m_thr.native_handle(), "cdc::Replicator"); + + if (!load_gtid_state()) + { + m_running = false; + } + + qc_thread_init(QC_INIT_BOTH); + + if (!m_gtid.empty()) + { + m_rpl->set_gtid(gtid_pos_t::from_string(m_gtid)); + } + + while (m_running) + { + if (!connect()) + { + // We failed to connect to any of the servers, try again in a few seconds + std::this_thread::sleep_for(milliseconds(5000)); + continue; + } + + auto event = m_sql->fetch_event(); + + if (event) + { + if (!process_one_event(event)) + { + /** + * Fatal error encountered. Fixing it might require manual intervention so + * the safest thing to do is to stop processing data. + */ + m_running = false; + } + } + else if (m_sql->errnum() == CR_SERVER_LOST) + { + if (m_should_stop) + { + if (m_current_gtid == m_gtid) + { + /** + * The latest committed GTID points to the current GTID being processed, + * no transaction in progress. + */ + m_safe_to_stop = true; + } + else + { + MXB_WARNING("Lost connection to server '%s:%d' when processing GTID '%s' while a " + "controlled shutdown was in progress. Attempting to roll back partial " + "transactions.", m_sql->server().host.c_str(), m_sql->server().port, + m_current_gtid.c_str()); + m_running = false; + } + } + + // The network error will be detected at the start of the next round + } + else + { + MXB_ERROR("Failed to read replicated event: %s", m_sql->error().c_str()); + break; + } + + if (m_safe_to_stop) + { + MXB_NOTICE("Stopped at GTID '%s'", m_gtid.c_str()); + break; + } + } +} + +std::string to_gtid_string(const MARIADB_RPL_EVENT& event) +{ + std::stringstream ss; + ss << event.event.gtid.domain_id << '-' << event.server_id << '-' << event.event.gtid.sequence_nr; + return ss.str(); +} + +bool Replicator::Imp::load_gtid_state() +{ + bool rval = false; + std::string filename = m_cnf.statedir + STATEFILE_NAME; + std::ifstream statefile(filename); + std::string gtid; + statefile >> gtid; + + if (statefile) + { + rval = true; + + if (!gtid.empty()) + { + m_gtid = gtid; + MXB_NOTICE("Continuing from GTID '%s'", m_gtid.c_str()); + } + } + else + { + if (errno == ENOENT) + { + // No GTID file, use the GTID provided in the configuration + rval = true; + } + else + { + MXB_ERROR("Failed to load current GTID state from file '%s': %d, %s", + filename.c_str(), errno, mxb_strerror(errno)); + } + } + + return rval; +} + +void Replicator::Imp::save_gtid_state() const +{ + std::ofstream statefile(m_cnf.statedir + "/" + STATEFILE_NAME); + statefile << m_current_gtid << std::endl; + + if (!statefile) + { + MXB_ERROR("Failed to store current GTID state inside '%s': %d, %s", + m_cnf.statedir.c_str(), errno, mxb_strerror(errno)); + } +} + +bool Replicator::Imp::process_one_event(SQL::Event& event) +{ + bool commit = false; + + switch (event->event_type) + { + case ROTATE_EVENT: + if (m_should_stop) + { + // Rotating to a new binlog file, a safe place to stop + m_safe_to_stop = true; + } + break; + + case GTID_EVENT: + if (m_should_stop) + { + // Start of a new transaction, a safe place to stop + m_safe_to_stop = true; + break; + } + else if (event->event.gtid.flags & IMPLICIT_COMMIT_FLAG) + { + m_implicit_commit = true; + } + + m_current_gtid = to_gtid_string(*event); + MXB_INFO("GTID: %s", m_current_gtid.c_str()); + break; + + case XID_EVENT: + commit = true; + MXB_INFO("XID for GTID '%s': %lu", m_current_gtid.c_str(), event->event.xid.transaction_nr); + + if (m_should_stop) + { + // End of a transaction, a safe place to stop + m_safe_to_stop = true; + break; + } + break; + + case QUERY_EVENT: + case USER_VAR_EVENT: + if (m_implicit_commit) + { + m_implicit_commit = false; + commit = true; + } + break; + + default: + // Ignore the event + break; + } + + bool rval = true; + REP_HEADER hdr; + uint8_t* ptr = m_sql->event_data() + 20; + + MARIADB_RPL_EVENT& ev = *event; + hdr.event_size = ev.event_length + (m_rpl->have_checksums() ? 4 : 0); + hdr.event_type = ev.event_type; + hdr.flags = ev.flags; + hdr.next_pos = ev.next_event_pos; + hdr.ok = ev.ok; + hdr.payload_len = hdr.event_size + 4; + hdr.seqno = 0; + hdr.serverid = ev.server_id; + hdr.timestamp = ev.timestamp; + + m_rpl->handle_event(hdr, ptr); + + if (commit) + { + m_rpl->flush(); + notify_all_clients(m_cnf.service); + m_gtid = m_current_gtid; + save_gtid_state(); + } + + return rval; +} + +Replicator::Imp::~Imp() +{ + m_should_stop = true; + m_thr.join(); +} + +// +// The public API +// + +// static +std::unique_ptr Replicator::start(const Config& cnf, Rpl* rpl) +{ + return std::unique_ptr(new Replicator(cnf, rpl)); +} + +bool Replicator::ok() const +{ + return m_imp->ok(); +} + +Replicator::~Replicator() +{ +} + +Replicator::Replicator(const Config& cnf, Rpl* rpl) + : m_imp(new Imp(cnf, rpl)) +{ +} +} diff --git a/server/modules/routing/avrorouter/replicator.hh b/server/modules/routing/avrorouter/replicator.hh new file mode 100644 index 000000000..772686696 --- /dev/null +++ b/server/modules/routing/avrorouter/replicator.hh @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2019 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2022-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#pragma once + +#include +#include + +#include "config.hh" +#include "rpl.hh" + +namespace cdc +{ + +// Final name pending +class Replicator +{ +public: + Replicator(const Replicator&) = delete; + Replicator& operator=(const Replicator&) = delete; + + /** + * Create a new data replicator + * + * @param cnf The configuration to use + * + * @return The new Replicator instance + */ + static std::unique_ptr start(const Config& cnf, Rpl* rpl); + + /** + * Check if the replicator is OK + * + * @return True if everything is OK. False if any errors have occurred and the replicator has stopped. + */ + bool ok() const; + + /** + * Destroys the Replicator and stops the processing of data + */ + ~Replicator(); + +private: + class Imp; + Replicator(const Config& cnf, Rpl* rpl); + + // Pointer to the implementation of the abstract interface + std::unique_ptr m_imp; +}; +} diff --git a/server/modules/routing/avrorouter/sql.cc b/server/modules/routing/avrorouter/sql.cc new file mode 100644 index 000000000..7a56d94ca --- /dev/null +++ b/server/modules/routing/avrorouter/sql.cc @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2019 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2022-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include "sql.hh" + +SQL::SQL(MYSQL* mysql, const cdc::Server& server) + : m_mysql(mysql) + , m_server(server) +{ +} + +SQL::~SQL() +{ + mysql_free_result(m_res); + mariadb_rpl_close(m_rpl); + mysql_close(m_mysql); +} + +std::pair> SQL::connect(const std::vector& servers, + int connect_timeout, int read_timeout) +{ + std::unique_ptr rval; + MYSQL* mysql = nullptr; + std::string error; + + if (servers.empty()) + { + error = "No servers defined"; + } + + for (const auto& server : servers) + { + if (!(mysql = mysql_init(nullptr))) + { + error = "Connection initialization failed"; + break; + } + + mysql_optionsv(mysql, MYSQL_OPT_CONNECT_TIMEOUT, &connect_timeout); + mysql_optionsv(mysql, MYSQL_OPT_READ_TIMEOUT, &read_timeout); + + if (!mysql_real_connect(mysql, server.host.c_str(), server.user.c_str(), server.password.c_str(), + nullptr, server.port, nullptr, 0)) + { + error = "Connection creation failed: " + std::string(mysql_error(mysql)); + mysql_close(mysql); + mysql = nullptr; + } + else + { + // Successful connection + rval.reset(new SQL(mysql, server)); + error.clear(); + break; + } + } + + return {error, std::move(rval)}; +} + +bool SQL::query(const std::string& sql) +{ + if (m_res) + { + mysql_free_result(m_res); + m_res = nullptr; + } + + return mysql_query(m_mysql, sql.c_str()) == 0; +} + +bool SQL::query(const std::vector& sql) +{ + for (const auto& a : sql) + { + if (!query(a.c_str())) + { + return false; + } + } + + return true; +} + +bool SQL::replicate(int server_id) +{ + if (!(m_rpl = mariadb_rpl_init(m_mysql))) + { + return false; + } + + mariadb_rpl_optionsv(m_rpl, MARIADB_RPL_SERVER_ID, &server_id); + + if (mariadb_rpl_open(m_rpl)) + { + return false; + } + + return true; +} diff --git a/server/modules/routing/avrorouter/sql.hh b/server/modules/routing/avrorouter/sql.hh new file mode 100644 index 000000000..52d8dd901 --- /dev/null +++ b/server/modules/routing/avrorouter/sql.hh @@ -0,0 +1,129 @@ +/* + * Copyright (c) 2019 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2022-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ +#pragma once + +#include +#include + +#include + +#include "config.hh" + +// Convenience class that wraps a MYSQL connection and provides a minimal C++ interface +class SQL +{ +public: + SQL(const SQL&) = delete; + SQL& operator=(const SQL&) = delete; + + using Event = std::unique_ptr>; + + /** + * Create a new connection from a list of servers + * + * The first available server is chosen from the provided list + * + * @param servers List of server candidates + * @param connect_timeout Connect timeout in seconds, defaults to 10 seconds + * @param read_timeout Read timeout in seconds, defaults to 5 seconds + * + * @return The error message and a unique_ptr. If an error occurred, the error string contains the + * error description and the unique_ptr is empty. + */ + static std::pair> connect(const std::vector& servers, + int connect_timeout = 30, + int read_timeout = 60); + + ~SQL(); + + /** + * Execute a query + * + * @param sql SQL to execute + * + * @return True on success, false on error + */ + bool query(const std::string& sql); + bool query(const std::vector& sql); + + /** + * Return latest error string + * + * @return The latest error + */ + std::string error() const + { + return mysql_error(m_mysql); + } + + /** + * Return latest error number + * + * @return The latest number + */ + int errnum() const + { + return mysql_errno(m_mysql); + } + + /** + * Return the server where the connection was created + * + * @return The server where the connection was created + */ + const cdc::Server& server() const + { + return m_server; + } + + /** + * Start replicating data from the server + * + * @param server_id Server ID to connect with + * + * @return True if replication was started successfully + */ + bool replicate(int server_id); + + /** + * Fetch one replication event + * + * @return The next replicated event or null on error + */ + Event fetch_event() + { + return Event {mariadb_rpl_fetch(m_rpl, nullptr), mariadb_free_rpl_event}; + } + + /** + * Pointer to the raw event data + */ + uint8_t* event_data() const + { + return m_rpl->buffer; + } + +private: + SQL(MYSQL* mysql, const cdc::Server& server); + + MYSQL* m_mysql {nullptr}; // Database handle + MYSQL_RES* m_res {nullptr}; // Open result set + MARIADB_RPL* m_rpl {nullptr}; // Replication handle + cdc::Server m_server; // The server where the connection was made +}; + +// String conversion helper +static inline std::string to_string(const MARIADB_STRING& str) +{ + return std::string(str.str, str.length); +}