Add modified Replicator for avrorouter
Repurposed the Replicator from the CDC integration project as a replication event processing service. It is similar to the CDC version of the Replicator and is still in the same namespace but it lacks all of the cross-thread communication that was a part of the integration project.
This commit is contained in:
43
server/modules/routing/avrorouter/config.hh
Normal file
43
server/modules/routing/avrorouter/config.hh
Normal file
@ -0,0 +1,43 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2019 MariaDB Corporation Ab
|
||||||
|
*
|
||||||
|
* Use of this software is governed by the Business Source License included
|
||||||
|
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
||||||
|
*
|
||||||
|
* Change Date: 2022-01-01
|
||||||
|
*
|
||||||
|
* On the date above, in accordance with the Business Source License, use
|
||||||
|
* of this software will be governed by version 2 or later of the General
|
||||||
|
* Public License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <chrono>
|
||||||
|
#include <string>
|
||||||
|
#include <unordered_set>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
|
#include <maxscale/service.hh>
|
||||||
|
|
||||||
|
#include "rpl.hh"
|
||||||
|
|
||||||
|
namespace cdc
|
||||||
|
{
|
||||||
|
|
||||||
|
struct Server
|
||||||
|
{
|
||||||
|
std::string host; // Address to connect to
|
||||||
|
int port; // Port where the server is listening
|
||||||
|
std::string user; // Username used for the connection
|
||||||
|
std::string password; // Password for the user
|
||||||
|
};
|
||||||
|
|
||||||
|
struct Config
|
||||||
|
{
|
||||||
|
int server_id = 1234; // Server ID used in registration
|
||||||
|
std::string gtid; // Starting GTID
|
||||||
|
SERVICE* service;
|
||||||
|
std::string statedir = ".";
|
||||||
|
};
|
||||||
|
}
|
447
server/modules/routing/avrorouter/replicator.cc
Normal file
447
server/modules/routing/avrorouter/replicator.cc
Normal file
@ -0,0 +1,447 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2019 MariaDB Corporation Ab
|
||||||
|
*
|
||||||
|
* Use of this software is governed by the Business Source License included
|
||||||
|
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
||||||
|
*
|
||||||
|
* Change Date: 2022-01-01
|
||||||
|
*
|
||||||
|
* On the date above, in accordance with the Business Source License, use
|
||||||
|
* of this software will be governed by version 2 or later of the General
|
||||||
|
* Public License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "avrorouter.hh"
|
||||||
|
#include "replicator.hh"
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
|
#include <chrono>
|
||||||
|
#include <cstdint>
|
||||||
|
#include <future>
|
||||||
|
#include <fstream>
|
||||||
|
#include <sstream>
|
||||||
|
#include <thread>
|
||||||
|
#include <tuple>
|
||||||
|
#include <unordered_map>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
|
#include <mysql.h>
|
||||||
|
#include <mariadb_rpl.h>
|
||||||
|
#include <errmsg.h>
|
||||||
|
|
||||||
|
#include <maxscale/query_classifier.hh>
|
||||||
|
#include <maxscale/buffer.hh>
|
||||||
|
#include <maxscale/utils.hh>
|
||||||
|
|
||||||
|
// Private headers
|
||||||
|
#include "sql.hh"
|
||||||
|
|
||||||
|
|
||||||
|
using std::chrono::duration_cast;
|
||||||
|
using Clock = std::chrono::steady_clock;
|
||||||
|
using Timepoint = Clock::time_point;
|
||||||
|
using std::chrono::milliseconds;
|
||||||
|
|
||||||
|
namespace
|
||||||
|
{
|
||||||
|
std::vector<cdc::Server> service_to_servers(SERVICE* service)
|
||||||
|
{
|
||||||
|
std::vector<cdc::Server> servers;
|
||||||
|
|
||||||
|
for (auto s = service->dbref; s; s = s->next)
|
||||||
|
{
|
||||||
|
if (s->active && s->server->is_master())
|
||||||
|
{
|
||||||
|
// TODO: per-server credentials aren't exposed in the public class
|
||||||
|
servers.push_back({s->server->address, s->server->port, service->user, service->password});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return servers;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
namespace cdc
|
||||||
|
{
|
||||||
|
|
||||||
|
|
||||||
|
// A very small daemon. The main class that drives the whole replication process
|
||||||
|
class Replicator::Imp
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
Imp& operator=(Imp&) = delete;
|
||||||
|
Imp(Imp&) = delete;
|
||||||
|
|
||||||
|
// Flag used in GTID events to signal statements that perform an implicit commit
|
||||||
|
static constexpr int IMPLICIT_COMMIT_FLAG = 0x1;
|
||||||
|
|
||||||
|
// Creates a new replication stream and starts it
|
||||||
|
Imp(const Config& cnf, Rpl* rpl);
|
||||||
|
|
||||||
|
// Check if the replicator is still OK
|
||||||
|
bool ok() const;
|
||||||
|
|
||||||
|
~Imp();
|
||||||
|
|
||||||
|
private:
|
||||||
|
static const std::string STATEFILE_DIR;
|
||||||
|
static const std::string STATEFILE_NAME;
|
||||||
|
static const std::string STATEFILE_TMP_SUFFIX;
|
||||||
|
|
||||||
|
bool connect();
|
||||||
|
void process_events();
|
||||||
|
bool process_one_event(SQL::Event& event);
|
||||||
|
bool load_gtid_state();
|
||||||
|
void save_gtid_state() const;
|
||||||
|
|
||||||
|
Config m_cnf; // The configuration the stream was started with
|
||||||
|
std::unique_ptr<SQL> m_sql; // Database connection
|
||||||
|
std::atomic<bool> m_running {true}; // Whether the stream is running
|
||||||
|
std::atomic<bool> m_should_stop {false}; // Set to true when doing a controlled shutdown
|
||||||
|
std::atomic<bool> m_safe_to_stop {false}; // Whether it safe to stop the processing
|
||||||
|
std::string m_gtid; // GTID position to start from
|
||||||
|
std::string m_current_gtid; // GTID of the transaction being processed
|
||||||
|
bool m_implicit_commit {false}; // Commit after next query event
|
||||||
|
Rpl* m_rpl; // Class that handles the replicated events
|
||||||
|
|
||||||
|
// NOTE: must be declared last
|
||||||
|
std::thread m_thr; // Thread that receives the replication events
|
||||||
|
};
|
||||||
|
|
||||||
|
const std::string Replicator::Imp::STATEFILE_DIR = "./";
|
||||||
|
const std::string Replicator::Imp::STATEFILE_NAME = "current_gtid.txt";
|
||||||
|
const std::string Replicator::Imp::STATEFILE_TMP_SUFFIX = ".tmp";
|
||||||
|
|
||||||
|
Replicator::Imp::Imp(const Config& cnf, Rpl* rpl)
|
||||||
|
: m_cnf(cnf)
|
||||||
|
, m_gtid(cnf.gtid)
|
||||||
|
, m_rpl(rpl)
|
||||||
|
, m_thr(std::thread(&Imp::process_events, this))
|
||||||
|
{
|
||||||
|
mxb_assert(m_rpl);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool Replicator::Imp::ok() const
|
||||||
|
{
|
||||||
|
return m_running;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool Replicator::Imp::connect()
|
||||||
|
{
|
||||||
|
cdc::Server old_server = {};
|
||||||
|
|
||||||
|
if (m_sql)
|
||||||
|
{
|
||||||
|
if (m_sql->errnum())
|
||||||
|
{
|
||||||
|
old_server = m_sql->server();
|
||||||
|
m_sql.reset();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// We already have a connection
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool rval = false;
|
||||||
|
std::string err;
|
||||||
|
|
||||||
|
auto servers = service_to_servers(m_cnf.service);
|
||||||
|
std::tie(err, m_sql) = SQL::connect(servers);
|
||||||
|
|
||||||
|
if (!err.empty())
|
||||||
|
{
|
||||||
|
if (!servers.empty())
|
||||||
|
{
|
||||||
|
// We had a valid master candidate but we couldn't connect to it
|
||||||
|
MXB_ERROR("%s", err.c_str());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
mxb_assert(m_sql);
|
||||||
|
std::string gtid_start_pos = "SET @slave_connect_state='" + m_gtid + "'";
|
||||||
|
|
||||||
|
// Queries required to start GTID replication
|
||||||
|
std::vector<std::string> queries =
|
||||||
|
{
|
||||||
|
"SET @master_binlog_checksum = @@global.binlog_checksum",
|
||||||
|
"SET @mariadb_slave_capability=4",
|
||||||
|
gtid_start_pos,
|
||||||
|
"SET @slave_gtid_strict_mode=1",
|
||||||
|
"SET @slave_gtid_ignore_duplicates=1",
|
||||||
|
"SET NAMES latin1"
|
||||||
|
};
|
||||||
|
|
||||||
|
if (!m_sql->query(queries))
|
||||||
|
{
|
||||||
|
MXB_ERROR("Failed to prepare connection: %s", m_sql->error().c_str());
|
||||||
|
}
|
||||||
|
else if (!m_sql->replicate(m_cnf.server_id))
|
||||||
|
{
|
||||||
|
MXB_ERROR("Failed to open replication channel: %s", m_sql->error().c_str());
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if (old_server.host != m_sql->server().host || old_server.port != m_sql->server().port)
|
||||||
|
{
|
||||||
|
MXB_NOTICE("Started replicating from [%s]:%d at GTID '%s'", m_sql->server().host.c_str(),
|
||||||
|
m_sql->server().port, m_gtid.c_str());
|
||||||
|
}
|
||||||
|
rval = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!rval)
|
||||||
|
{
|
||||||
|
m_sql.reset();
|
||||||
|
}
|
||||||
|
|
||||||
|
return rval;
|
||||||
|
}
|
||||||
|
|
||||||
|
void Replicator::Imp::process_events()
|
||||||
|
{
|
||||||
|
pthread_setname_np(m_thr.native_handle(), "cdc::Replicator");
|
||||||
|
|
||||||
|
if (!load_gtid_state())
|
||||||
|
{
|
||||||
|
m_running = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
qc_thread_init(QC_INIT_BOTH);
|
||||||
|
|
||||||
|
if (!m_gtid.empty())
|
||||||
|
{
|
||||||
|
m_rpl->set_gtid(gtid_pos_t::from_string(m_gtid));
|
||||||
|
}
|
||||||
|
|
||||||
|
while (m_running)
|
||||||
|
{
|
||||||
|
if (!connect())
|
||||||
|
{
|
||||||
|
// We failed to connect to any of the servers, try again in a few seconds
|
||||||
|
std::this_thread::sleep_for(milliseconds(5000));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto event = m_sql->fetch_event();
|
||||||
|
|
||||||
|
if (event)
|
||||||
|
{
|
||||||
|
if (!process_one_event(event))
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* Fatal error encountered. Fixing it might require manual intervention so
|
||||||
|
* the safest thing to do is to stop processing data.
|
||||||
|
*/
|
||||||
|
m_running = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (m_sql->errnum() == CR_SERVER_LOST)
|
||||||
|
{
|
||||||
|
if (m_should_stop)
|
||||||
|
{
|
||||||
|
if (m_current_gtid == m_gtid)
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* The latest committed GTID points to the current GTID being processed,
|
||||||
|
* no transaction in progress.
|
||||||
|
*/
|
||||||
|
m_safe_to_stop = true;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
MXB_WARNING("Lost connection to server '%s:%d' when processing GTID '%s' while a "
|
||||||
|
"controlled shutdown was in progress. Attempting to roll back partial "
|
||||||
|
"transactions.", m_sql->server().host.c_str(), m_sql->server().port,
|
||||||
|
m_current_gtid.c_str());
|
||||||
|
m_running = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// The network error will be detected at the start of the next round
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
MXB_ERROR("Failed to read replicated event: %s", m_sql->error().c_str());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (m_safe_to_stop)
|
||||||
|
{
|
||||||
|
MXB_NOTICE("Stopped at GTID '%s'", m_gtid.c_str());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string to_gtid_string(const MARIADB_RPL_EVENT& event)
|
||||||
|
{
|
||||||
|
std::stringstream ss;
|
||||||
|
ss << event.event.gtid.domain_id << '-' << event.server_id << '-' << event.event.gtid.sequence_nr;
|
||||||
|
return ss.str();
|
||||||
|
}
|
||||||
|
|
||||||
|
bool Replicator::Imp::load_gtid_state()
|
||||||
|
{
|
||||||
|
bool rval = false;
|
||||||
|
std::string filename = m_cnf.statedir + STATEFILE_NAME;
|
||||||
|
std::ifstream statefile(filename);
|
||||||
|
std::string gtid;
|
||||||
|
statefile >> gtid;
|
||||||
|
|
||||||
|
if (statefile)
|
||||||
|
{
|
||||||
|
rval = true;
|
||||||
|
|
||||||
|
if (!gtid.empty())
|
||||||
|
{
|
||||||
|
m_gtid = gtid;
|
||||||
|
MXB_NOTICE("Continuing from GTID '%s'", m_gtid.c_str());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if (errno == ENOENT)
|
||||||
|
{
|
||||||
|
// No GTID file, use the GTID provided in the configuration
|
||||||
|
rval = true;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
MXB_ERROR("Failed to load current GTID state from file '%s': %d, %s",
|
||||||
|
filename.c_str(), errno, mxb_strerror(errno));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return rval;
|
||||||
|
}
|
||||||
|
|
||||||
|
void Replicator::Imp::save_gtid_state() const
|
||||||
|
{
|
||||||
|
std::ofstream statefile(m_cnf.statedir + "/" + STATEFILE_NAME);
|
||||||
|
statefile << m_current_gtid << std::endl;
|
||||||
|
|
||||||
|
if (!statefile)
|
||||||
|
{
|
||||||
|
MXB_ERROR("Failed to store current GTID state inside '%s': %d, %s",
|
||||||
|
m_cnf.statedir.c_str(), errno, mxb_strerror(errno));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool Replicator::Imp::process_one_event(SQL::Event& event)
|
||||||
|
{
|
||||||
|
bool commit = false;
|
||||||
|
|
||||||
|
switch (event->event_type)
|
||||||
|
{
|
||||||
|
case ROTATE_EVENT:
|
||||||
|
if (m_should_stop)
|
||||||
|
{
|
||||||
|
// Rotating to a new binlog file, a safe place to stop
|
||||||
|
m_safe_to_stop = true;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
case GTID_EVENT:
|
||||||
|
if (m_should_stop)
|
||||||
|
{
|
||||||
|
// Start of a new transaction, a safe place to stop
|
||||||
|
m_safe_to_stop = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
else if (event->event.gtid.flags & IMPLICIT_COMMIT_FLAG)
|
||||||
|
{
|
||||||
|
m_implicit_commit = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
m_current_gtid = to_gtid_string(*event);
|
||||||
|
MXB_INFO("GTID: %s", m_current_gtid.c_str());
|
||||||
|
break;
|
||||||
|
|
||||||
|
case XID_EVENT:
|
||||||
|
commit = true;
|
||||||
|
MXB_INFO("XID for GTID '%s': %lu", m_current_gtid.c_str(), event->event.xid.transaction_nr);
|
||||||
|
|
||||||
|
if (m_should_stop)
|
||||||
|
{
|
||||||
|
// End of a transaction, a safe place to stop
|
||||||
|
m_safe_to_stop = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
case QUERY_EVENT:
|
||||||
|
case USER_VAR_EVENT:
|
||||||
|
if (m_implicit_commit)
|
||||||
|
{
|
||||||
|
m_implicit_commit = false;
|
||||||
|
commit = true;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
// Ignore the event
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool rval = true;
|
||||||
|
REP_HEADER hdr;
|
||||||
|
uint8_t* ptr = m_sql->event_data() + 20;
|
||||||
|
|
||||||
|
MARIADB_RPL_EVENT& ev = *event;
|
||||||
|
hdr.event_size = ev.event_length + (m_rpl->have_checksums() ? 4 : 0);
|
||||||
|
hdr.event_type = ev.event_type;
|
||||||
|
hdr.flags = ev.flags;
|
||||||
|
hdr.next_pos = ev.next_event_pos;
|
||||||
|
hdr.ok = ev.ok;
|
||||||
|
hdr.payload_len = hdr.event_size + 4;
|
||||||
|
hdr.seqno = 0;
|
||||||
|
hdr.serverid = ev.server_id;
|
||||||
|
hdr.timestamp = ev.timestamp;
|
||||||
|
|
||||||
|
m_rpl->handle_event(hdr, ptr);
|
||||||
|
|
||||||
|
if (commit)
|
||||||
|
{
|
||||||
|
m_rpl->flush();
|
||||||
|
notify_all_clients(m_cnf.service);
|
||||||
|
m_gtid = m_current_gtid;
|
||||||
|
save_gtid_state();
|
||||||
|
}
|
||||||
|
|
||||||
|
return rval;
|
||||||
|
}
|
||||||
|
|
||||||
|
Replicator::Imp::~Imp()
|
||||||
|
{
|
||||||
|
m_should_stop = true;
|
||||||
|
m_thr.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// The public API
|
||||||
|
//
|
||||||
|
|
||||||
|
// static
|
||||||
|
std::unique_ptr<Replicator> Replicator::start(const Config& cnf, Rpl* rpl)
|
||||||
|
{
|
||||||
|
return std::unique_ptr<Replicator>(new Replicator(cnf, rpl));
|
||||||
|
}
|
||||||
|
|
||||||
|
bool Replicator::ok() const
|
||||||
|
{
|
||||||
|
return m_imp->ok();
|
||||||
|
}
|
||||||
|
|
||||||
|
Replicator::~Replicator()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
Replicator::Replicator(const Config& cnf, Rpl* rpl)
|
||||||
|
: m_imp(new Imp(cnf, rpl))
|
||||||
|
{
|
||||||
|
}
|
||||||
|
}
|
60
server/modules/routing/avrorouter/replicator.hh
Normal file
60
server/modules/routing/avrorouter/replicator.hh
Normal file
@ -0,0 +1,60 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2019 MariaDB Corporation Ab
|
||||||
|
*
|
||||||
|
* Use of this software is governed by the Business Source License included
|
||||||
|
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
||||||
|
*
|
||||||
|
* Change Date: 2022-01-01
|
||||||
|
*
|
||||||
|
* On the date above, in accordance with the Business Source License, use
|
||||||
|
* of this software will be governed by version 2 or later of the General
|
||||||
|
* Public License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <string>
|
||||||
|
#include <memory>
|
||||||
|
|
||||||
|
#include "config.hh"
|
||||||
|
#include "rpl.hh"
|
||||||
|
|
||||||
|
namespace cdc
|
||||||
|
{
|
||||||
|
|
||||||
|
// Final name pending
|
||||||
|
class Replicator
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
Replicator(const Replicator&) = delete;
|
||||||
|
Replicator& operator=(const Replicator&) = delete;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new data replicator
|
||||||
|
*
|
||||||
|
* @param cnf The configuration to use
|
||||||
|
*
|
||||||
|
* @return The new Replicator instance
|
||||||
|
*/
|
||||||
|
static std::unique_ptr<Replicator> start(const Config& cnf, Rpl* rpl);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if the replicator is OK
|
||||||
|
*
|
||||||
|
* @return True if everything is OK. False if any errors have occurred and the replicator has stopped.
|
||||||
|
*/
|
||||||
|
bool ok() const;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Destroys the Replicator and stops the processing of data
|
||||||
|
*/
|
||||||
|
~Replicator();
|
||||||
|
|
||||||
|
private:
|
||||||
|
class Imp;
|
||||||
|
Replicator(const Config& cnf, Rpl* rpl);
|
||||||
|
|
||||||
|
// Pointer to the implementation of the abstract interface
|
||||||
|
std::unique_ptr<Replicator::Imp> m_imp;
|
||||||
|
};
|
||||||
|
}
|
110
server/modules/routing/avrorouter/sql.cc
Normal file
110
server/modules/routing/avrorouter/sql.cc
Normal file
@ -0,0 +1,110 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2019 MariaDB Corporation Ab
|
||||||
|
*
|
||||||
|
* Use of this software is governed by the Business Source License included
|
||||||
|
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
||||||
|
*
|
||||||
|
* Change Date: 2022-01-01
|
||||||
|
*
|
||||||
|
* On the date above, in accordance with the Business Source License, use
|
||||||
|
* of this software will be governed by version 2 or later of the General
|
||||||
|
* Public License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "sql.hh"
|
||||||
|
|
||||||
|
SQL::SQL(MYSQL* mysql, const cdc::Server& server)
|
||||||
|
: m_mysql(mysql)
|
||||||
|
, m_server(server)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
SQL::~SQL()
|
||||||
|
{
|
||||||
|
mysql_free_result(m_res);
|
||||||
|
mariadb_rpl_close(m_rpl);
|
||||||
|
mysql_close(m_mysql);
|
||||||
|
}
|
||||||
|
|
||||||
|
std::pair<std::string, std::unique_ptr<SQL>> SQL::connect(const std::vector<cdc::Server>& servers,
|
||||||
|
int connect_timeout, int read_timeout)
|
||||||
|
{
|
||||||
|
std::unique_ptr<SQL> rval;
|
||||||
|
MYSQL* mysql = nullptr;
|
||||||
|
std::string error;
|
||||||
|
|
||||||
|
if (servers.empty())
|
||||||
|
{
|
||||||
|
error = "No servers defined";
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const auto& server : servers)
|
||||||
|
{
|
||||||
|
if (!(mysql = mysql_init(nullptr)))
|
||||||
|
{
|
||||||
|
error = "Connection initialization failed";
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
mysql_optionsv(mysql, MYSQL_OPT_CONNECT_TIMEOUT, &connect_timeout);
|
||||||
|
mysql_optionsv(mysql, MYSQL_OPT_READ_TIMEOUT, &read_timeout);
|
||||||
|
|
||||||
|
if (!mysql_real_connect(mysql, server.host.c_str(), server.user.c_str(), server.password.c_str(),
|
||||||
|
nullptr, server.port, nullptr, 0))
|
||||||
|
{
|
||||||
|
error = "Connection creation failed: " + std::string(mysql_error(mysql));
|
||||||
|
mysql_close(mysql);
|
||||||
|
mysql = nullptr;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// Successful connection
|
||||||
|
rval.reset(new SQL(mysql, server));
|
||||||
|
error.clear();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return {error, std::move(rval)};
|
||||||
|
}
|
||||||
|
|
||||||
|
bool SQL::query(const std::string& sql)
|
||||||
|
{
|
||||||
|
if (m_res)
|
||||||
|
{
|
||||||
|
mysql_free_result(m_res);
|
||||||
|
m_res = nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
return mysql_query(m_mysql, sql.c_str()) == 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool SQL::query(const std::vector<std::string>& sql)
|
||||||
|
{
|
||||||
|
for (const auto& a : sql)
|
||||||
|
{
|
||||||
|
if (!query(a.c_str()))
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool SQL::replicate(int server_id)
|
||||||
|
{
|
||||||
|
if (!(m_rpl = mariadb_rpl_init(m_mysql)))
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
mariadb_rpl_optionsv(m_rpl, MARIADB_RPL_SERVER_ID, &server_id);
|
||||||
|
|
||||||
|
if (mariadb_rpl_open(m_rpl))
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
129
server/modules/routing/avrorouter/sql.hh
Normal file
129
server/modules/routing/avrorouter/sql.hh
Normal file
@ -0,0 +1,129 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2019 MariaDB Corporation Ab
|
||||||
|
*
|
||||||
|
* Use of this software is governed by the Business Source License included
|
||||||
|
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
||||||
|
*
|
||||||
|
* Change Date: 2022-01-01
|
||||||
|
*
|
||||||
|
* On the date above, in accordance with the Business Source License, use
|
||||||
|
* of this software will be governed by version 2 or later of the General
|
||||||
|
* Public License.
|
||||||
|
*/
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <mysql.h>
|
||||||
|
#include <mariadb_rpl.h>
|
||||||
|
|
||||||
|
#include <memory>
|
||||||
|
|
||||||
|
#include "config.hh"
|
||||||
|
|
||||||
|
// Convenience class that wraps a MYSQL connection and provides a minimal C++ interface
|
||||||
|
class SQL
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
SQL(const SQL&) = delete;
|
||||||
|
SQL& operator=(const SQL&) = delete;
|
||||||
|
|
||||||
|
using Event = std::unique_ptr<MARIADB_RPL_EVENT, std::function<decltype(mariadb_free_rpl_event)>>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new connection from a list of servers
|
||||||
|
*
|
||||||
|
* The first available server is chosen from the provided list
|
||||||
|
*
|
||||||
|
* @param servers List of server candidates
|
||||||
|
* @param connect_timeout Connect timeout in seconds, defaults to 10 seconds
|
||||||
|
* @param read_timeout Read timeout in seconds, defaults to 5 seconds
|
||||||
|
*
|
||||||
|
* @return The error message and a unique_ptr. If an error occurred, the error string contains the
|
||||||
|
* error description and the unique_ptr is empty.
|
||||||
|
*/
|
||||||
|
static std::pair<std::string, std::unique_ptr<SQL>> connect(const std::vector<cdc::Server>& servers,
|
||||||
|
int connect_timeout = 30,
|
||||||
|
int read_timeout = 60);
|
||||||
|
|
||||||
|
~SQL();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Execute a query
|
||||||
|
*
|
||||||
|
* @param sql SQL to execute
|
||||||
|
*
|
||||||
|
* @return True on success, false on error
|
||||||
|
*/
|
||||||
|
bool query(const std::string& sql);
|
||||||
|
bool query(const std::vector<std::string>& sql);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return latest error string
|
||||||
|
*
|
||||||
|
* @return The latest error
|
||||||
|
*/
|
||||||
|
std::string error() const
|
||||||
|
{
|
||||||
|
return mysql_error(m_mysql);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return latest error number
|
||||||
|
*
|
||||||
|
* @return The latest number
|
||||||
|
*/
|
||||||
|
int errnum() const
|
||||||
|
{
|
||||||
|
return mysql_errno(m_mysql);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the server where the connection was created
|
||||||
|
*
|
||||||
|
* @return The server where the connection was created
|
||||||
|
*/
|
||||||
|
const cdc::Server& server() const
|
||||||
|
{
|
||||||
|
return m_server;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Start replicating data from the server
|
||||||
|
*
|
||||||
|
* @param server_id Server ID to connect with
|
||||||
|
*
|
||||||
|
* @return True if replication was started successfully
|
||||||
|
*/
|
||||||
|
bool replicate(int server_id);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fetch one replication event
|
||||||
|
*
|
||||||
|
* @return The next replicated event or null on error
|
||||||
|
*/
|
||||||
|
Event fetch_event()
|
||||||
|
{
|
||||||
|
return Event {mariadb_rpl_fetch(m_rpl, nullptr), mariadb_free_rpl_event};
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Pointer to the raw event data
|
||||||
|
*/
|
||||||
|
uint8_t* event_data() const
|
||||||
|
{
|
||||||
|
return m_rpl->buffer;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
SQL(MYSQL* mysql, const cdc::Server& server);
|
||||||
|
|
||||||
|
MYSQL* m_mysql {nullptr}; // Database handle
|
||||||
|
MYSQL_RES* m_res {nullptr}; // Open result set
|
||||||
|
MARIADB_RPL* m_rpl {nullptr}; // Replication handle
|
||||||
|
cdc::Server m_server; // The server where the connection was made
|
||||||
|
};
|
||||||
|
|
||||||
|
// String conversion helper
|
||||||
|
static inline std::string to_string(const MARIADB_STRING& str)
|
||||||
|
{
|
||||||
|
return std::string(str.str, str.length);
|
||||||
|
}
|
Reference in New Issue
Block a user