MXS-2555 Beta Smart Router.

1. Remove persistence of performance data
2. Move global CanonicalPerformance into SmartRouter object
3. Implement another kill_all_others_v2. Left kill_all_others_v1
    in case it should be fixed and used instead.
This commit is contained in:
Niclas Antti
2019-06-13 13:43:30 +03:00
parent 1241300494
commit 6a7b6d4b89
6 changed files with 138 additions and 144 deletions

View File

@ -19,11 +19,9 @@
const std::string file_version = "Alpha"; // if a file has a different version string, discard it.
CanonicalPerformance::CanonicalPerformance(const std::string& persistent_file)
: m_persistent_file(persistent_file)
, m_nChanges(0)
CanonicalPerformance::CanonicalPerformance()
: m_nChanges(0)
{
read_persisted();
}
bool CanonicalPerformance::insert(const std::string& canonical, const PerformanceInfo& perf)
@ -50,71 +48,6 @@ PerformanceInfo CanonicalPerformance::find(const std::string& canonical)
void CanonicalPerformance::clear()
{
m_perfs.clear();
std::remove(m_persistent_file.c_str());
m_nChanges = 0;
}
// TODO, expensive. Saves the whole file whenever there are changes.
void CanonicalPerformance::persist() const
{
if (m_nChanges == 0)
{
return;
}
std::ofstream out(m_persistent_file);
if (!out)
{
MXS_ERROR("Could not open %s for writing", m_persistent_file.c_str());
}
out << file_version << '\n';
for (const auto& e : m_perfs)
{
out << e.first << '\n';
out << e.second.host() << '\n';
out << e.second.duration().count() << '\n';
}
m_nChanges = 0;
}
void CanonicalPerformance::read_persisted()
{
std::ifstream in(m_persistent_file);
if (!in)
{
return;
}
std::string version;
std::getline(in, version);
if (version != file_version)
{
MXS_INFO("%s version does not match the expected one. Discarding file.", m_persistent_file.c_str());
in.close();
std::remove(m_persistent_file.c_str());
return;
}
while (in)
{
std::string canonical;
std::string host_str;
std::string nano_str;
std::getline(in, canonical);
std::getline(in, host_str);
std::getline(in, nano_str);
if (!in)
{
break;
}
m_perfs.insert({canonical, {maxbase::Host(host_str), maxbase::Duration(std::stoull(nano_str))}});
}
m_nChanges = 0;
}
@ -132,7 +65,7 @@ std::string show_some(const std::string& str, int nchars)
}
// This needs TODO:
// These are TODOs for the GA version. The Beta version will not have persistence.
// 1. Read the file once at startup. There might also be a need to do cleanup
// of the file if the configuration has changed.
// 2. Updates to the data should "quickly" become globally available,
@ -153,44 +86,3 @@ std::string show_some(const std::string& str, int nchars)
// 3. If the measured time is very different from the stored one (+/20%),
// expire the entry (not implemented).
// More rules can be found out by testing.
namespace
{
std::mutex canon_mutex;
const std::string persistent_file = "/tmp/max_canonical_perf.dat"; // TODO config
CanonicalPerformance& canon_store()
{
// Note that the age of entries become "Now", the age was not written to file.
static CanonicalPerformance cp(persistent_file);
return cp;
}
}
PerformanceInfo perf_find(const std::string& canonical)
{
std::unique_lock<std::mutex> guard(canon_mutex);
auto perf = canon_store().find(canonical);
if (perf.is_valid() && perf.age() > std::chrono::minutes(1)) // TODO to config, but not yet
{
canon_store().remove(canonical);
return PerformanceInfo();
}
return perf;
}
bool perf_update(const std::string& canonical, const PerformanceInfo& perf)
{
std::unique_lock<std::mutex> guard(canon_mutex);
auto ret = canon_store().insert(canonical, perf);
canon_store().persist();
if (ret)
{
MXS_SDEBUG("Stored perf " << perf.duration() << ' ' << perf.host() << ' ' << show_some(canonical));
}
return ret;
}

View File

@ -36,6 +36,7 @@ public:
/** When was this PerformanceInfo created.
*/
maxbase::TimePoint creation_time() const;
/** Duration since this PerformanceInfo was created
*/
maxbase::Duration age() const;
@ -46,13 +47,14 @@ private:
maxbase::TimePoint m_creation_time = maxbase::Clock::now();
};
/** class CanonicalPerformance holds the persistent performance
* info gathered thus far.
/** class CanonicalPerformance holds the performance
* info gathered since the start of Maxscale.
* The Beta release will not perist to file.
*/
class CanonicalPerformance
{
public:
CanonicalPerformance(const std::string& persistent_file);
explicit CanonicalPerformance();
/** Insert if not already inserted and return true, else false. */
bool insert(const std::string& canonical, const PerformanceInfo& perf);
@ -64,23 +66,12 @@ public:
PerformanceInfo find(const std::string& canonical);
void clear();
void persist() const;
private:
const std::string m_persistent_file;
std::unordered_map<std::string, PerformanceInfo> m_perfs;
void read_persisted();
mutable int m_nChanges;
};
// Threadsafe global singleton behind perf_find and perf_update.
// perf_find, find existing performance info, handles expirations
PerformanceInfo perf_find(const std::string& canonical);
// Insert if not already inserted and return true, else false.
// Will probably handle some expiration like work.
bool perf_update(const std::string& canonical, const PerformanceInfo& perf);
// For logging. Shortens str to nchars and adds "..." TODO move somewhere more appropriate
std::string show_some(const std::string& str, int nchars = 70);

View File

@ -25,19 +25,17 @@ namespace smartrouter
config::Specification specification(MXS_MODULE_NAME, config::Specification::ROUTER);
config::ParamServer
master(&specification,
"master",
"The server/cluster to be treated as master, that is, the one where updates are sent.");
master(&specification,
"master",
"The server/cluster to be treated as master, that is, the one where updates are sent.");
config::ParamBool
persist_performance_data(&specification,
"persist_performance_data",
"Persist performance data so that the smartrouter can use information "
"collected during earlier runs.",
true); // Default value
persist_performance_data(&specification,
"persist_performance_data",
"Persist performance data so that the smartrouter can use information "
"collected during earlier runs.",
true); // Default value
}
}
/**
@ -126,7 +124,7 @@ bool SmartRouter::Config::post_configure(const MXS_CONFIG_PARAMETER& params)
{
if (!s.empty())
{
s+= ", ";
s += ", ";
}
s += server->name();
@ -198,3 +196,30 @@ uint64_t SmartRouter::getCapabilities()
{
return RCAP_TYPE_TRANSACTION_TRACKING | RCAP_TYPE_CONTIGUOUS_INPUT | RCAP_TYPE_CONTIGUOUS_OUTPUT;
}
PerformanceInfo SmartRouter::perf_find(const std::string& canonical)
{
std::unique_lock<std::mutex> guard(m_perf_mutex);
auto perf = m_performance.find(canonical);
if (perf.is_valid() && perf.age() > std::chrono::minutes(1)) // TODO to config, but not yet
{
m_performance.remove(canonical);
return PerformanceInfo();
}
return perf;
}
bool SmartRouter::perf_update(const std::string& canonical, const PerformanceInfo& perf)
{
std::unique_lock<std::mutex> guard(m_perf_mutex);
auto ret = m_performance.insert(canonical, perf);
if (ret)
{
MXS_SDEBUG("Stored perf " << perf.duration() << ' ' << perf.host() << ' ' << show_some(canonical));
}
return ret;
}

View File

@ -18,6 +18,8 @@
* @file Smart Router. Routes queries to the best router for the type of query.
*/
#include "performance.hh"
#include <maxscale/ccdefs.hh>
#include <maxscale/config2.hh>
#include <maxscale/router.hh>
@ -75,8 +77,18 @@ public:
return m_config;
}
/** Thread safe find a PerformanceInfo. Some entry expiration handling is done here.
*/
PerformanceInfo perf_find(const std::string& canonical);
/** Thread safe update/insert a PerformanceInfo. Some entry expiration handling is done here.
*/
bool perf_update(const std::string& canonical, const PerformanceInfo& perf);
private:
SmartRouter(SERVICE* service);
Config m_config;
Config m_config;
CanonicalPerformance m_performance;
std::mutex m_perf_mutex;
};

View File

@ -74,10 +74,11 @@ std::string extract_error(GWBUF* buffer)
return rval;
}
SmartRouterSession::SmartRouterSession(SmartRouter*,
SmartRouterSession::SmartRouterSession(SmartRouter* pRouter,
MXS_SESSION* pSession,
Clusters clusters)
: mxs::RouterSession(pSession)
, m_router(*pRouter)
, m_pClient_dcb(pSession->client_dcb)
, m_clusters(std::move(clusters))
, m_qc(this, pSession, TYPE_ALL)
@ -192,7 +193,7 @@ int SmartRouterSession::routeQuery(GWBUF* pBuf)
else
{
std::string canonical = maxscale::get_canonical(pBuf);
auto perf = perf_find(canonical);
auto perf = m_router.perf_find(canonical);
if (perf.is_valid())
{
@ -287,8 +288,10 @@ void SmartRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb)
if (m_mode == Mode::MeasureQuery)
{
perf_update(m_measurement.canonical, {cluster.host, query_dur});
// kill_all_others(cluster.host);
m_router.perf_update(m_measurement.canonical, {cluster.host, query_dur});
// If the query is still going on, an error packet is received, else the
// whole query might play out (and be discarded).
kill_all_others_v2(cluster.host);
}
m_mode = Mode::CollectResults;
@ -435,7 +438,11 @@ bool SmartRouterSession::write_split_packets(GWBUF* pBuf)
return true; // TODO. What could possibly go wrong?
}
void SmartRouterSession::kill_all_others(const maxbase::Host& host)
/* TODO This should work much the way that kill_all_others_v2 works, but it does
* not. Something funky happens to the dcb/pipeline when this is used.
* Leaving it here, since it should be fixed.
*/
void SmartRouterSession::kill_all_others_v1(const maxbase::Host& host)
{
MySQLProtocol* pProt = static_cast<MySQLProtocol*>(m_pClient_dcb->protocol);
uint64_t mysql_thread_id = pProt->thread_id;
@ -452,6 +459,70 @@ void SmartRouterSession::kill_all_others(const maxbase::Host& host)
}
}
struct KillStruct
{
std::string user;
std::string password;
maxbase::Host host;
int mysql_thread_id;
};
using KillStructs = std::vector<KillStruct>;
void kill_thread(const KillStructs& kill_structs)
{
for (auto& ks : kill_structs)
{
auto conn = mysql_init(nullptr);
if (mysql_real_connect(conn, ks.host.address().c_str(),
ks.user.c_str(), ks.password.c_str(),
"", ks.host.port(), nullptr, 0) == nullptr)
{
MXS_SERROR("Trying to kill query on " << ks.host << " but failed to connect");
continue;
}
std::ostringstream os;
os << "kill query " << ks.mysql_thread_id;
auto sql = os.str();
MXS_SINFO("Sending '" << sql << "' to " << ks.host);
mysql_real_query(conn, sql.c_str(), sql.size());
auto err_code = mysql_errno(conn);
if (err_code)
{
MXS_SERROR("Failed to send kill err code=" << err_code);
}
}
}
void SmartRouterSession::kill_all_others_v2(const maxbase::Host& host)
{
MySQLProtocol* pProt = static_cast<MySQLProtocol*>(m_pClient_dcb->protocol);
int mysql_thread_id = pProt->thread_id;
KillStructs kill_structs;
for (Cluster& cluster : m_clusters)
{
if (cluster.host == host || !cluster.tracker.expecting_response_packets())
{
continue;
}
// TODO TODO: Where do the user and password come from? And also, open
// a permanent connection to each Cluster for killing.
std::string TODO_user = "maxscale";
std::string TODO_password = "pass";
kill_structs.push_back(KillStruct {TODO_user, TODO_password,
cluster.host, mysql_thread_id});
MXS_SDEBUG("Queue " << cluster.host << " mysql_thread_id=" << mysql_thread_id << " for kill");
}
if (!kill_structs.empty())
{
std::thread murderer {kill_thread, kill_structs};
murderer.detach();
}
}
void SmartRouterSession::handleError(GWBUF* pPacket,
DCB* pProblem,

View File

@ -93,7 +93,8 @@ private:
bool write_to_all(GWBUF* pBuf, Mode mode);
bool write_split_packets(GWBUF* pBuf);
void kill_all_others(const maxbase::Host& host);
void kill_all_others_v1(const maxbase::Host& host);
void kill_all_others_v2(const maxbase::Host& host);
bool expecting_request_packets() const;
bool expecting_response_packets() const;
@ -104,6 +105,8 @@ private:
bool is_locked_to_master() const override;
bool supports_hint(HINT_TYPE hint_type) const override;
SmartRouter& m_router;
Mode m_mode = Mode::Idle;
GWBUF* m_pDelayed_packet = nullptr;