diff --git a/server/modules/routing/smartrouter/performance.cc b/server/modules/routing/smartrouter/performance.cc index ad4213da4..35134e554 100644 --- a/server/modules/routing/smartrouter/performance.cc +++ b/server/modules/routing/smartrouter/performance.cc @@ -19,11 +19,9 @@ const std::string file_version = "Alpha"; // if a file has a different version string, discard it. -CanonicalPerformance::CanonicalPerformance(const std::string& persistent_file) - : m_persistent_file(persistent_file) - , m_nChanges(0) +CanonicalPerformance::CanonicalPerformance() + : m_nChanges(0) { - read_persisted(); } bool CanonicalPerformance::insert(const std::string& canonical, const PerformanceInfo& perf) @@ -50,71 +48,6 @@ PerformanceInfo CanonicalPerformance::find(const std::string& canonical) void CanonicalPerformance::clear() { m_perfs.clear(); - std::remove(m_persistent_file.c_str()); - m_nChanges = 0; -} - -// TODO, expensive. Saves the whole file whenever there are changes. -void CanonicalPerformance::persist() const -{ - if (m_nChanges == 0) - { - return; - } - - std::ofstream out(m_persistent_file); - if (!out) - { - MXS_ERROR("Could not open %s for writing", m_persistent_file.c_str()); - } - - out << file_version << '\n'; - - for (const auto& e : m_perfs) - { - out << e.first << '\n'; - out << e.second.host() << '\n'; - out << e.second.duration().count() << '\n'; - } - - m_nChanges = 0; -} - -void CanonicalPerformance::read_persisted() -{ - std::ifstream in(m_persistent_file); - if (!in) - { - return; - } - - std::string version; - std::getline(in, version); - if (version != file_version) - { - MXS_INFO("%s version does not match the expected one. Discarding file.", m_persistent_file.c_str()); - in.close(); - std::remove(m_persistent_file.c_str()); - return; - } - - while (in) - { - std::string canonical; - std::string host_str; - std::string nano_str; - std::getline(in, canonical); - std::getline(in, host_str); - std::getline(in, nano_str); - - if (!in) - { - break; - } - - m_perfs.insert({canonical, {maxbase::Host(host_str), maxbase::Duration(std::stoull(nano_str))}}); - } - m_nChanges = 0; } @@ -132,7 +65,7 @@ std::string show_some(const std::string& str, int nchars) } -// This needs TODO: +// These are TODOs for the GA version. The Beta version will not have persistence. // 1. Read the file once at startup. There might also be a need to do cleanup // of the file if the configuration has changed. // 2. Updates to the data should "quickly" become globally available, @@ -153,44 +86,3 @@ std::string show_some(const std::string& str, int nchars) // 3. If the measured time is very different from the stored one (+/20%), // expire the entry (not implemented). // More rules can be found out by testing. - - -namespace -{ -std::mutex canon_mutex; -const std::string persistent_file = "/tmp/max_canonical_perf.dat"; // TODO config -CanonicalPerformance& canon_store() -{ - // Note that the age of entries become "Now", the age was not written to file. - static CanonicalPerformance cp(persistent_file); - return cp; -} -} - -PerformanceInfo perf_find(const std::string& canonical) -{ - std::unique_lock guard(canon_mutex); - auto perf = canon_store().find(canonical); - - if (perf.is_valid() && perf.age() > std::chrono::minutes(1)) // TODO to config, but not yet - { - canon_store().remove(canonical); - return PerformanceInfo(); - } - - return perf; -} - -bool perf_update(const std::string& canonical, const PerformanceInfo& perf) -{ - std::unique_lock guard(canon_mutex); - auto ret = canon_store().insert(canonical, perf); - canon_store().persist(); - - if (ret) - { - MXS_SDEBUG("Stored perf " << perf.duration() << ' ' << perf.host() << ' ' << show_some(canonical)); - } - - return ret; -} diff --git a/server/modules/routing/smartrouter/performance.hh b/server/modules/routing/smartrouter/performance.hh index bc58108c0..02eace7c4 100644 --- a/server/modules/routing/smartrouter/performance.hh +++ b/server/modules/routing/smartrouter/performance.hh @@ -36,6 +36,7 @@ public: /** When was this PerformanceInfo created. */ maxbase::TimePoint creation_time() const; + /** Duration since this PerformanceInfo was created */ maxbase::Duration age() const; @@ -46,13 +47,14 @@ private: maxbase::TimePoint m_creation_time = maxbase::Clock::now(); }; -/** class CanonicalPerformance holds the persistent performance - * info gathered thus far. +/** class CanonicalPerformance holds the performance + * info gathered since the start of Maxscale. + * The Beta release will not perist to file. */ class CanonicalPerformance { public: - CanonicalPerformance(const std::string& persistent_file); + explicit CanonicalPerformance(); /** Insert if not already inserted and return true, else false. */ bool insert(const std::string& canonical, const PerformanceInfo& perf); @@ -64,23 +66,12 @@ public: PerformanceInfo find(const std::string& canonical); void clear(); - void persist() const; - private: - const std::string m_persistent_file; std::unordered_map m_perfs; - void read_persisted(); + mutable int m_nChanges; }; -// Threadsafe global singleton behind perf_find and perf_update. -// perf_find, find existing performance info, handles expirations -PerformanceInfo perf_find(const std::string& canonical); - -// Insert if not already inserted and return true, else false. -// Will probably handle some expiration like work. -bool perf_update(const std::string& canonical, const PerformanceInfo& perf); - // For logging. Shortens str to nchars and adds "..." TODO move somewhere more appropriate std::string show_some(const std::string& str, int nchars = 70); diff --git a/server/modules/routing/smartrouter/smartrouter.cc b/server/modules/routing/smartrouter/smartrouter.cc index 3ab800deb..b1ff53077 100644 --- a/server/modules/routing/smartrouter/smartrouter.cc +++ b/server/modules/routing/smartrouter/smartrouter.cc @@ -25,19 +25,17 @@ namespace smartrouter config::Specification specification(MXS_MODULE_NAME, config::Specification::ROUTER); config::ParamServer -master(&specification, - "master", - "The server/cluster to be treated as master, that is, the one where updates are sent."); + master(&specification, + "master", + "The server/cluster to be treated as master, that is, the one where updates are sent."); config::ParamBool -persist_performance_data(&specification, - "persist_performance_data", - "Persist performance data so that the smartrouter can use information " - "collected during earlier runs.", - true); // Default value - + persist_performance_data(&specification, + "persist_performance_data", + "Persist performance data so that the smartrouter can use information " + "collected during earlier runs.", + true); // Default value } - } /** @@ -126,7 +124,7 @@ bool SmartRouter::Config::post_configure(const MXS_CONFIG_PARAMETER& params) { if (!s.empty()) { - s+= ", "; + s += ", "; } s += server->name(); @@ -198,3 +196,30 @@ uint64_t SmartRouter::getCapabilities() { return RCAP_TYPE_TRANSACTION_TRACKING | RCAP_TYPE_CONTIGUOUS_INPUT | RCAP_TYPE_CONTIGUOUS_OUTPUT; } + +PerformanceInfo SmartRouter::perf_find(const std::string& canonical) +{ + std::unique_lock guard(m_perf_mutex); + auto perf = m_performance.find(canonical); + + if (perf.is_valid() && perf.age() > std::chrono::minutes(1)) // TODO to config, but not yet + { + m_performance.remove(canonical); + return PerformanceInfo(); + } + + return perf; +} + +bool SmartRouter::perf_update(const std::string& canonical, const PerformanceInfo& perf) +{ + std::unique_lock guard(m_perf_mutex); + auto ret = m_performance.insert(canonical, perf); + + if (ret) + { + MXS_SDEBUG("Stored perf " << perf.duration() << ' ' << perf.host() << ' ' << show_some(canonical)); + } + + return ret; +} diff --git a/server/modules/routing/smartrouter/smartrouter.hh b/server/modules/routing/smartrouter/smartrouter.hh index e5193d4ac..9bbdacbd7 100644 --- a/server/modules/routing/smartrouter/smartrouter.hh +++ b/server/modules/routing/smartrouter/smartrouter.hh @@ -18,6 +18,8 @@ * @file Smart Router. Routes queries to the best router for the type of query. */ +#include "performance.hh" + #include #include #include @@ -75,8 +77,18 @@ public: return m_config; } + /** Thread safe find a PerformanceInfo. Some entry expiration handling is done here. + */ + PerformanceInfo perf_find(const std::string& canonical); + + /** Thread safe update/insert a PerformanceInfo. Some entry expiration handling is done here. + */ + bool perf_update(const std::string& canonical, const PerformanceInfo& perf); + private: SmartRouter(SERVICE* service); - Config m_config; + Config m_config; + CanonicalPerformance m_performance; + std::mutex m_perf_mutex; }; diff --git a/server/modules/routing/smartrouter/smartsession.cc b/server/modules/routing/smartrouter/smartsession.cc index 98db9bc47..8e59d9c41 100644 --- a/server/modules/routing/smartrouter/smartsession.cc +++ b/server/modules/routing/smartrouter/smartsession.cc @@ -74,10 +74,11 @@ std::string extract_error(GWBUF* buffer) return rval; } -SmartRouterSession::SmartRouterSession(SmartRouter*, +SmartRouterSession::SmartRouterSession(SmartRouter* pRouter, MXS_SESSION* pSession, Clusters clusters) : mxs::RouterSession(pSession) + , m_router(*pRouter) , m_pClient_dcb(pSession->client_dcb) , m_clusters(std::move(clusters)) , m_qc(this, pSession, TYPE_ALL) @@ -192,7 +193,7 @@ int SmartRouterSession::routeQuery(GWBUF* pBuf) else { std::string canonical = maxscale::get_canonical(pBuf); - auto perf = perf_find(canonical); + auto perf = m_router.perf_find(canonical); if (perf.is_valid()) { @@ -287,8 +288,10 @@ void SmartRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb) if (m_mode == Mode::MeasureQuery) { - perf_update(m_measurement.canonical, {cluster.host, query_dur}); - // kill_all_others(cluster.host); + m_router.perf_update(m_measurement.canonical, {cluster.host, query_dur}); + // If the query is still going on, an error packet is received, else the + // whole query might play out (and be discarded). + kill_all_others_v2(cluster.host); } m_mode = Mode::CollectResults; @@ -435,7 +438,11 @@ bool SmartRouterSession::write_split_packets(GWBUF* pBuf) return true; // TODO. What could possibly go wrong? } -void SmartRouterSession::kill_all_others(const maxbase::Host& host) +/* TODO This should work much the way that kill_all_others_v2 works, but it does + * not. Something funky happens to the dcb/pipeline when this is used. + * Leaving it here, since it should be fixed. + */ +void SmartRouterSession::kill_all_others_v1(const maxbase::Host& host) { MySQLProtocol* pProt = static_cast(m_pClient_dcb->protocol); uint64_t mysql_thread_id = pProt->thread_id; @@ -452,6 +459,70 @@ void SmartRouterSession::kill_all_others(const maxbase::Host& host) } } +struct KillStruct +{ + std::string user; + std::string password; + maxbase::Host host; + int mysql_thread_id; +}; +using KillStructs = std::vector; + +void kill_thread(const KillStructs& kill_structs) +{ + for (auto& ks : kill_structs) + { + auto conn = mysql_init(nullptr); + if (mysql_real_connect(conn, ks.host.address().c_str(), + ks.user.c_str(), ks.password.c_str(), + "", ks.host.port(), nullptr, 0) == nullptr) + { + MXS_SERROR("Trying to kill query on " << ks.host << " but failed to connect"); + continue; + } + + std::ostringstream os; + os << "kill query " << ks.mysql_thread_id; + auto sql = os.str(); + MXS_SINFO("Sending '" << sql << "' to " << ks.host); + mysql_real_query(conn, sql.c_str(), sql.size()); + auto err_code = mysql_errno(conn); + if (err_code) + { + MXS_SERROR("Failed to send kill err code=" << err_code); + } + } +} + +void SmartRouterSession::kill_all_others_v2(const maxbase::Host& host) +{ + MySQLProtocol* pProt = static_cast(m_pClient_dcb->protocol); + int mysql_thread_id = pProt->thread_id; + + KillStructs kill_structs; + for (Cluster& cluster : m_clusters) + { + if (cluster.host == host || !cluster.tracker.expecting_response_packets()) + { + continue; + } + // TODO TODO: Where do the user and password come from? And also, open + // a permanent connection to each Cluster for killing. + std::string TODO_user = "maxscale"; + std::string TODO_password = "pass"; + + kill_structs.push_back(KillStruct {TODO_user, TODO_password, + cluster.host, mysql_thread_id}); + MXS_SDEBUG("Queue " << cluster.host << " mysql_thread_id=" << mysql_thread_id << " for kill"); + } + + if (!kill_structs.empty()) + { + std::thread murderer {kill_thread, kill_structs}; + murderer.detach(); + } +} + void SmartRouterSession::handleError(GWBUF* pPacket, DCB* pProblem, diff --git a/server/modules/routing/smartrouter/smartsession.hh b/server/modules/routing/smartrouter/smartsession.hh index 874eec624..f12e6b5ea 100644 --- a/server/modules/routing/smartrouter/smartsession.hh +++ b/server/modules/routing/smartrouter/smartsession.hh @@ -93,7 +93,8 @@ private: bool write_to_all(GWBUF* pBuf, Mode mode); bool write_split_packets(GWBUF* pBuf); - void kill_all_others(const maxbase::Host& host); + void kill_all_others_v1(const maxbase::Host& host); + void kill_all_others_v2(const maxbase::Host& host); bool expecting_request_packets() const; bool expecting_response_packets() const; @@ -104,6 +105,8 @@ private: bool is_locked_to_master() const override; bool supports_hint(HINT_TYPE hint_type) const override; + SmartRouter& m_router; + Mode m_mode = Mode::Idle; GWBUF* m_pDelayed_packet = nullptr;