MXS-2555 Simple Smart Router

Does the measurements, uses them for routing, and persists them to file.
Kill is not implemented, so the router waits for all clusters to respond fully.

The performance data uses a mutex and the persistence data file
is written while holding the mutex. This obviously needs to be
improved, but this commit shows the working concept.
This commit is contained in:
Niclas Antti 2019-06-11 11:07:53 +03:00
parent c47a2d32fa
commit 1241300494
5 changed files with 381 additions and 31 deletions

View File

@ -1,6 +1,7 @@
add_library(smartrouter SHARED
smartrouter.cc
smartsession.cc
performance.cc
)
target_link_libraries(smartrouter maxscale-common mysqlcommon)

View File

@ -0,0 +1,196 @@
/*
* Copyright (c) 2019 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2022-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "performance.hh"
#include <cstdio>
#include <exception>
#include <fstream>
#include <mutex>
#include <string>
// Format tag written as the first line of the persistent file by persist().
// read_persisted() deletes any file whose first line differs from this string.
const std::string file_version = "Alpha";
CanonicalPerformance::CanonicalPerformance(const std::string& persistent_file)
: m_persistent_file(persistent_file)
, m_nChanges(0)
{
read_persisted();
}
/**
 * Add perf under the key canonical unless that key is already present.
 * An existing entry is left untouched.
 *
 * @return true if a new entry was stored (which also counts as one unsaved change),
 *         false if the key already existed.
 */
bool CanonicalPerformance::insert(const std::string& canonical, const PerformanceInfo& perf)
{
    const auto outcome = m_perfs.emplace(canonical, perf);
    const bool is_new = outcome.second;

    if (is_new)
    {
        ++m_nChanges;
    }

    return is_new;
}
/**
 * Drop the entry stored under canonical, if there is one.
 *
 * @return true if an entry was removed (which also counts as one unsaved change),
 *         false if no such entry existed.
 */
bool CanonicalPerformance::remove(const std::string& canonical)
{
    const auto pos = m_perfs.find(canonical);
    if (pos == m_perfs.end())
    {
        return false;
    }

    m_perfs.erase(pos);
    ++m_nChanges;
    return true;
}
/**
 * Look up the entry stored under canonical.
 *
 * @return a copy of the stored PerformanceInfo, or a default-constructed
 *         PerformanceInfo when the key is unknown.
 */
PerformanceInfo CanonicalPerformance::find(const std::string& canonical)
{
    const auto pos = m_perfs.find(canonical);
    if (pos != m_perfs.end())
    {
        return pos->second;
    }

    return PerformanceInfo();
}
/**
 * Forget everything: empties the in-memory map, deletes the persistent file
 * and resets the unsaved-change counter.
 */
void CanonicalPerformance::clear()
{
    m_nChanges = 0;
    std::remove(m_persistent_file.c_str());
    m_perfs.clear();
}
// TODO, expensive. Saves the whole file whenever there are changes.
/**
 * Rewrite the persistent file from scratch if anything changed since the last
 * successful write.
 *
 * File layout: the first line is file_version, followed by three lines per
 * entry: the canonical string, the host, and the duration's count().
 *
 * If the file cannot be opened or written, an error is logged and the
 * unsaved-change counter is left as it is, so that a later call tries again
 * instead of silently dropping the changes.
 *
 * NOTE(review): the format is line based, so a canonical string containing a
 * newline would break it - confirm that canonicals can never contain one.
 */
void CanonicalPerformance::persist() const
{
    if (m_nChanges == 0)
    {
        return;     // nothing new since the last write
    }

    std::ofstream out(m_persistent_file);
    if (!out)
    {
        MXS_ERROR("Could not open %s for writing", m_persistent_file.c_str());
        return;     // keep m_nChanges, the data is still unsaved
    }

    out << file_version << '\n';
    for (const auto& e : m_perfs)
    {
        out << e.first << '\n';
        out << e.second.host() << '\n';
        out << e.second.duration().count() << '\n';
    }

    // Flush explicitly so that a failed write is visible before the changes
    // are declared saved.
    out.flush();
    if (!out)
    {
        MXS_ERROR("Could not write to %s", m_persistent_file.c_str());
        return;
    }

    m_nChanges = 0;
}
void CanonicalPerformance::read_persisted()
{
std::ifstream in(m_persistent_file);
if (!in)
{
return;
}
std::string version;
std::getline(in, version);
if (version != file_version)
{
MXS_INFO("%s version does not match the expected one. Discarding file.", m_persistent_file.c_str());
in.close();
std::remove(m_persistent_file.c_str());
return;
}
while (in)
{
std::string canonical;
std::string host_str;
std::string nano_str;
std::getline(in, canonical);
std::getline(in, host_str);
std::getline(in, nano_str);
if (!in)
{
break;
}
m_perfs.insert({canonical, {maxbase::Host(host_str), maxbase::Duration(std::stoull(nano_str))}});
}
m_nChanges = 0;
}
/**
 * Shorten a string for logging.
 *
 * @param str     The string to show.
 * @param nchars  Maximum number of characters of str to keep. A negative
 *                value is treated as 0.
 *
 * @return str itself if it has at most nchars characters, otherwise the first
 *         nchars characters of str followed by "...".
 */
std::string show_some(const std::string& str, int nchars)
{
    // Compare in size_type: no narrowing of length() to int, and a negative
    // nchars cannot turn into a huge count for substr().
    const std::string::size_type limit = nchars > 0 ? static_cast<std::string::size_type>(nchars) : 0;

    if (str.length() > limit)
    {
        return str.substr(0, limit) + "...";
    }

    return str;
}
// Things that still need to be done (TODO):
// 1. Read the file once at startup. There might also be a need to do cleanup
// of the file if the configuration has changed.
// 2. Updates to the data should "quickly" become globally available,
// rather than be written after every change.
// 3. Writing the file back should go through something like a lockless queue,
// that triggers a write, possibly when there is less load on the system.
// 4. Every now and then some form of re-learn, maybe just dropping entries after
// some expiration time.
// 5. If a host goes away (even for maintenance) entries associated with it should
// probably be dropped immediately.
// 6. Save all data at shutdown.
// Start using xxhash
//
// Expiration rules. At least these rules should be implemented:
// 1. Since the various engines have different setup times during the first few queries,
// this should be taken into account (not implemented).
// 2. Expire entries after X minutes.
// 3. If the measured time is very different from the stored one (+/-20%),
// expire the entry (not implemented).
// More rules can be found out by testing.
namespace
{
// Location of the persisted performance data.
const std::string persistent_file = "/tmp/max_canonical_perf.dat";      // TODO config

// Taken by perf_find() and perf_update() around every use of canon_store().
std::mutex canon_mutex;

// The one and only store, created (and loaded from persistent_file) on first use.
CanonicalPerformance& canon_store()
{
    // Only host and duration are written to the file, not the creation time,
    // so every entry loaded from file gets the age "Now".
    static CanonicalPerformance instance(persistent_file);
    return instance;
}
}
/**
 * Find the performance info stored for canonical, under the global mutex.
 *
 * An entry older than one minute is considered expired: it is removed from
 * the store and reported as not found.
 *
 * @return the stored info, or a default-constructed PerformanceInfo if there
 *         is no entry or the entry had expired.
 */
PerformanceInfo perf_find(const std::string& canonical)
{
    std::lock_guard<std::mutex> guard(canon_mutex);
    CanonicalPerformance& store = canon_store();

    auto perf = store.find(canonical);

    // TODO to config, but not yet
    const bool expired = perf.is_valid() && perf.age() > std::chrono::minutes(1);
    if (!expired)
    {
        return perf;
    }

    store.remove(canonical);
    return PerformanceInfo();
}
bool perf_update(const std::string& canonical, const PerformanceInfo& perf)
{
std::unique_lock<std::mutex> guard(canon_mutex);
auto ret = canon_store().insert(canonical, perf);
canon_store().persist();
if (ret)
{
MXS_SDEBUG("Stored perf " << perf.duration() << ' ' << perf.host() << ' ' << show_some(canonical));
}
return ret;
}

View File

@ -0,0 +1,117 @@
/*
* Copyright (c) 2019 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2022-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#pragma once
#include <maxscale/ccdefs.hh>
#include <maxbase/stopwatch.hh>
#include <maxbase/host.hh>
#include <unordered_map>
/** PerformanceInfo pairs a Host with a Duration and remembers the point in
 *  time at which the instance was created.
 */
class PerformanceInfo
{
public:
    /** Creates an instance where is_valid()==false. */
    PerformanceInfo() = default;

    PerformanceInfo(const maxbase::Host& h, maxbase::Duration d);

    /** True if the stored host is valid. */
    bool is_valid() const;

    maxbase::Host     host() const;
    maxbase::Duration duration() const;

    /** Duration since this PerformanceInfo was created. */
    maxbase::Duration age() const;

    /** When this PerformanceInfo was created. */
    maxbase::TimePoint creation_time() const;

private:
    maxbase::Host      m_host;
    maxbase::Duration  m_duration;
    maxbase::TimePoint m_creation_time = maxbase::Clock::now();     // stamped at construction
};
/** class CanonicalPerformance holds the persistent performance
 *  info gathered thus far, keyed by canonical string.
 *
 *  The class does no locking itself. The global instance is only reached
 *  through perf_find() and perf_update(), which hold a mutex around every call.
 */
class CanonicalPerformance
{
public:
    /** Creates the store and loads whatever persistent_file already contains.
     *  explicit, so that a plain string never converts into a store by accident.
     */
    explicit CanonicalPerformance(const std::string& persistent_file);

    /** Insert if not already inserted and return true, else false. */
    bool insert(const std::string& canonical, const PerformanceInfo& perf);

    /** Remove if entry exists and return true, else false. */
    bool remove(const std::string& canonical);

    /** If entry does not exist, returned PerformanceInfo::is_valid()==false */
    PerformanceInfo find(const std::string& canonical);

    /** Remove all entries and delete the persistent file. */
    void clear();

    /** Write all entries to the persistent file, if there are unsaved changes. */
    void persist() const;

private:
    /** Load entries from m_persistent_file; called from the constructor. */
    void read_persisted();

    const std::string m_persistent_file;
    std::unordered_map<std::string, PerformanceInfo> m_perfs;

    // Number of inserts/removes since the last write to, or read from, the file.
    // mutable because the const persist() resets it.
    mutable int m_nChanges;
};
// Threadsafe global singleton behind perf_find and perf_update: both functions
// lock a common mutex before touching the shared CanonicalPerformance store.

// perf_find, find existing performance info, handles expirations.
// Returns a PerformanceInfo with is_valid()==false if there is no entry for
// canonical, or if the entry had expired (in which case it is also removed).
PerformanceInfo perf_find(const std::string& canonical);
// Insert if not already inserted and return true, else false.
// The store is written to the persistent file as part of the call.
// Will probably handle some expiration like work.
bool perf_update(const std::string& canonical, const PerformanceInfo& perf);
// For logging. Shortens str to nchars and adds "..." (strings that are already
// short enough are returned unchanged). TODO move somewhere more appropriate
std::string show_some(const std::string& str, int nchars = 70);
// implementation details below
// Stores the given host and duration. m_creation_time is not listed here; it
// gets its value from its in-class initializer, maxbase::Clock::now().
inline PerformanceInfo::PerformanceInfo(const maxbase::Host& h, maxbase::Duration d)
: m_host(h)
, m_duration(d)
{
}
// An instance is valid exactly when its host is valid.
inline bool PerformanceInfo::is_valid() const
{
return m_host.is_valid();
}
// Returns a copy of the stored host.
inline maxbase::Host PerformanceInfo::host() const
{
return m_host;
}
// Returns the stored duration.
inline maxbase::Duration PerformanceInfo::duration() const
{
return m_duration;
}
// Returns the time point recorded when this instance was constructed.
inline maxbase::TimePoint PerformanceInfo::creation_time() const
{
return m_creation_time;
}
// Time elapsed between the creation of this instance and the moment of the call.
inline maxbase::Duration PerformanceInfo::age() const
{
    const auto now = maxbase::Clock::now();
    return now - m_creation_time;
}

View File

@ -12,34 +12,16 @@
*/
#include "smartsession.hh"
#include "smartrouter.hh"
#include "performance.hh"
#include <maxscale/modutil.hh>
#include <maxsql/mysql_plus.hh>
// TO_REVIEW:
// This is the base for Smart Router. It will currently route any normal query to all
// configured routers and only use the first response to forward back to the client.
// However, it should be reviewed as if it could actually be put in front of
// several current routers (several readwritesplits and readconnroutes) and
// succeed for anything but local infile.
// TO_REVIEW. There is no need to go through the functionality with a very, very fine-comb at this point,
// maria-test will be used to do that. The idea is really to look for the totality of the
// router and how it will interact with the rest of the system.
// TO_REVIEW routeQuery() and clientReply() are the obvious functions to check for correctness.
// TO_REVIEW my use of mxs::QueryClassifier might be overly simple. The use should
// be as simple as possible, but no simpler.
// TODO, missing error handling. I did not add overly many asserts, which make reading code harder.
// But please note any that may be missing.
// TODO, for m_qc.target_is_all(), check that responses from all routers match.
// TODO Smart Query is not here yet, this is just a stupid router-router.
// COPY-PASTED error-extraction functions from rwsplit. TODO move to lib.
inline void extract_error_state(uint8_t* pBuffer, uint8_t** ppState, uint16_t* pnState)
{
@ -194,10 +176,13 @@ int SmartRouterSession::routeQuery(GWBUF* pBuf)
else
{
auto route_info = m_qc.update_route_info(mxs::QueryClassifier::CURRENT_TARGET_UNDEFINED, pBuf);
m_measurement = {maxbase::Clock::now(), maxscale::get_canonical(pBuf)};
if (m_qc.target_is_all(route_info.target()))
{
MXS_SDEBUG("Write all");
ret = write_to_all(pBuf);
ret = write_to_all(pBuf, Mode::Query);
}
else if (m_qc.target_is_master(route_info.target()) || session_trx_is_active(m_pClient_dcb->session))
{
@ -206,9 +191,25 @@ int SmartRouterSession::routeQuery(GWBUF* pBuf)
}
else
{
// TODO: This is where canonical performance data will be used, and measurements initiated
// Currently writing to all for clientReply testing purposes.
ret = write_to_all(pBuf);
std::string canonical = maxscale::get_canonical(pBuf);
auto perf = perf_find(canonical);
if (perf.is_valid())
{
MXS_SDEBUG("Route to " << perf.host() << " based on performance, canonical = "
<< show_some(canonical));
ret = write_to_host(perf.host(), pBuf);
}
else if (modutil_is_SQL(pBuf))
{
MXS_SDEBUG("Start measurement");
ret = write_to_all(pBuf, Mode::MeasureQuery);
}
else
{
MXS_SWARNING("Could not determine target (non-sql query), goes to master");
ret = write_to_master(pBuf);
}
}
}
@ -233,7 +234,7 @@ void SmartRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb)
cluster.tracker.update_response(pPacket);
// these flags can all be true at the same time
bool first_response_packet = m_mode != Mode::CollectResults;
bool first_response_packet = (m_mode == Mode::Query || m_mode == Mode::MeasureQuery);
bool last_packet_for_this_cluster = !cluster.tracker.expecting_response_packets();
bool very_last_response_packet = !expecting_response_packets(); // last from all clusters
@ -278,10 +279,19 @@ void SmartRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb)
if (first_response_packet)
{
MXS_SDEBUG("Host " << cluster.host << " will be responding to the client");
maxbase::Duration query_dur = maxbase::Clock::now() - m_measurement.start;
MXS_SDEBUG("Host " << cluster.host << " will be responding to the client. "
<< "First packet received in time " << query_dur);
cluster.is_replying_to_client = true;
m_mode = Mode::CollectResults;
will_reply = true; // tentatively, the packet might have to be delayed
if (m_mode == Mode::MeasureQuery)
{
perf_update(m_measurement.canonical, {cluster.host, query_dur});
// kill_all_others(cluster.host);
}
m_mode = Mode::CollectResults;
}
if (very_last_response_packet)
@ -291,7 +301,7 @@ void SmartRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb)
mxb_assert(cluster.is_replying_to_client || m_pDelayed_packet);
if (m_pDelayed_packet)
{
MXS_SDEBUG("Picking up delayed packet, discarding response from" << cluster.host);
MXS_SDEBUG("Picking up delayed packet, discarding response from " << cluster.host);
gwbuf_free(pPacket);
pPacket = m_pDelayed_packet;
m_pDelayed_packet = nullptr;
@ -384,7 +394,7 @@ bool SmartRouterSession::write_to_host(const maxbase::Host& host, GWBUF* pBuf)
return cluster.pDcb->func.write(cluster.pDcb, pBuf);
}
bool SmartRouterSession::write_to_all(GWBUF* pBuf)
bool SmartRouterSession::write_to_all(GWBUF* pBuf, Mode mode)
{
for (auto it = begin(m_clusters); it != end(m_clusters); ++it)
{
@ -397,7 +407,7 @@ bool SmartRouterSession::write_to_all(GWBUF* pBuf)
if (expecting_response_packets())
{
m_mode = Mode::Query;
m_mode = mode;
}
return true; // TODO. What could possibly go wrong?
@ -425,6 +435,24 @@ bool SmartRouterSession::write_split_packets(GWBUF* pBuf)
return true; // TODO. What could possibly go wrong?
}
/**
 * Queue a KT_QUERY kill on every cluster other than host that is still
 * expecting response packets. The client connection's thread id is passed to
 * the kill call.
 *
 * @param host  The cluster to leave alone.
 */
void SmartRouterSession::kill_all_others(const maxbase::Host& host)
{
    const uint64_t mysql_thread_id = static_cast<MySQLProtocol*>(m_pClient_dcb->protocol)->thread_id;

    for (Cluster& cluster : m_clusters)
    {
        const bool is_other = !(cluster.host == host);

        // Clusters that have already delivered everything need no kill.
        if (is_other && cluster.tracker.expecting_response_packets())
        {
            MXS_SDEBUG("Queue " << cluster.host << " mysql_thread_id=" << mysql_thread_id << " for kill");
            mxs_mysql_execute_kill(cluster.pDcb->session, mysql_thread_id, KT_QUERY);
        }
    }
}
void SmartRouterSession::handleError(GWBUF* pPacket,
DCB* pProblem,
mxs_error_action_t action,

View File

@ -50,7 +50,7 @@ public:
bool* pSuccess);
private:
enum class Mode {Idle, Query, CollectResults}; // MeasureQuery
enum class Mode {Idle, Query, MeasureQuery, CollectResults};
/** struct Cluster represents a cluster of mariadb servers as a Maxscale internal Server.
* TODO In the next iteration a directly callable "Thing" should be implemented (Router, Backend
@ -90,9 +90,11 @@ private:
// The write functions initialize Cluster flags and Cluster::ProtocolTracker.
bool write_to_host(const maxbase::Host& host, GWBUF* pBuf);
bool write_to_master(GWBUF* pBuf);
bool write_to_all(GWBUF* pBuf);
bool write_to_all(GWBUF* pBuf, Mode mode);
bool write_split_packets(GWBUF* pBuf);
void kill_all_others(const maxbase::Host& host);
bool expecting_request_packets() const;
bool expecting_response_packets() const;
bool all_clusters_are_idle() const; // no clusters expect packets
@ -108,4 +110,10 @@ private:
DCB* m_pClient_dcb;
Clusters m_clusters;
mxs::QueryClassifier m_qc;
struct Measurement
{
maxbase::TimePoint start;
std::string canonical;
};
Measurement m_measurement;
};