diff --git a/server/modules/filter/CMakeLists.txt b/server/modules/filter/CMakeLists.txt index 36752ba42..647d117e8 100644 --- a/server/modules/filter/CMakeLists.txt +++ b/server/modules/filter/CMakeLists.txt @@ -15,3 +15,4 @@ add_subdirectory(tpmfilter) add_subdirectory(masking) add_subdirectory(insertstream) add_subdirectory(binlogfilter) +add_subdirectory(throttlefilter) diff --git a/server/modules/filter/throttlefilter/CMakeLists.txt b/server/modules/filter/throttlefilter/CMakeLists.txt new file mode 100644 index 000000000..677d6704e --- /dev/null +++ b/server/modules/filter/throttlefilter/CMakeLists.txt @@ -0,0 +1,4 @@ +add_library(throttlefilter SHARED throttlefilter.cc throttlesession.cc eventcount.cc stopwatch.cc) +target_link_libraries(throttlefilter maxscale-common mysqlcommon) +set_target_properties(throttlefilter PROPERTIES VERSION "1.0.0") +install_module(throttlefilter core) diff --git a/server/modules/filter/throttlefilter/eventcount.cc b/server/modules/filter/throttlefilter/eventcount.cc new file mode 100644 index 000000000..09b435c76 --- /dev/null +++ b/server/modules/filter/throttlefilter/eventcount.cc @@ -0,0 +1,292 @@ +/* + * Copyright (c) 2018 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2020-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include + +#include "eventcount.hh" + +#include +#include +#include +#include +#include + +namespace throttle +{ + +EventCount::EventCount(const std::string& event_id, Duration time_window, Duration granularity) : + m_event_id(event_id), + m_time_window(time_window), + m_granularity(granularity.count()) +{ + increment(); +} + +void EventCount::increment() +{ + using namespace std::chrono; + auto ticks = time_point_cast(Clock::now()).time_since_epoch().count(); + if (m_granularity) + { + ticks = ticks / m_granularity * m_granularity; + } + + if (m_timestamps.empty() + || m_timestamps.back().time_point.time_since_epoch().count() != ticks) + { + m_timestamps.emplace_back(TimePoint(ticks), 1); + } + else + { + ++m_timestamps.back().count; + } +} + +namespace +{ +struct TimePointLessEqual +{ + TimePoint lhs; + TimePointLessEqual(TimePoint tp) : lhs(tp) {} + bool operator()(const EventCount::Timestamp& rhs) const + { + return lhs <= rhs.time_point; + } + bool operator()(TimePoint rhs) const + { + return lhs <= rhs; + } +}; +} + +void EventCount::purge() const +{ + StopWatch sw; + auto windowBegin = Clock::now() - m_time_window; + + auto ite = std::find_if(m_timestamps.begin(), m_timestamps.end(), + TimePointLessEqual(windowBegin)); + m_timestamps.erase(m_timestamps.begin(), ite); +} + +int EventCount::count() const +{ + purge(); + int count {0}; + + for (auto ite = m_timestamps.begin(); ite != m_timestamps.end(); ++ite) + { + count += ite->count; + } + return count; +} + +void EventCount::dump(std::ostream &os) const +{ + os << m_event_id << ": " << count() << " " << m_timestamps.size(); +} + +std::ostream& operator<<(std::ostream& os, const EventCount& EventCount) +{ + EventCount.dump(os); + return os; +} + +// Force a purge once in awhile, could be configurable. This is needed if +// a client generates lots of events but rarely reads them back (purges). +const int CleanupCountdown = 10000; + +SessionCount::SessionCount(const std::string& sess_id, Duration time_window, + Duration granularity) : + m_sess_id(sess_id), m_time_window(time_window), m_granularity(granularity), + m_cleanup_countdown(CleanupCountdown) +{ +} + +const std::vector &SessionCount::event_counts() const +{ + purge(); + return m_event_counts; +} + +bool SessionCount::empty() const +{ + purge(); + return m_event_counts.empty(); +} + +namespace +{ +struct MatchEventId +{ + std::string event_id; + MatchEventId(const std::string& id) : event_id(id) {}; + bool operator()(const EventCount& stats) const + { + return event_id == stats.event_id(); + } +}; +} + +void SessionCount::increment(const std::string& event_id) +{ + // Always put the incremented entry (latest timestamp) last in the vector (using + // rotate). This means the vector is ordered so that expired entries are always first. + + // Find in reverse, the entry is more likely to be towards the end. Actually no, + // for some reason the normal search is slightly faster when measured. + auto ite = find_if(m_event_counts.begin(), m_event_counts.end(), + MatchEventId(event_id)); + if (ite == m_event_counts.end()) + { + m_event_counts.emplace_back(event_id, m_time_window, m_granularity); + } + else + { + ite->increment(); + // rotate so that the entry becomes the last one + auto next = std::next(ite); + std::rotate(ite, next, m_event_counts.end()); + } + + if (!--m_cleanup_countdown) + { + purge(); + } +} + +namespace +{ +struct NonZeroEntry +{ + bool operator()(const EventCount& stats) + { + return stats.count() != 0; + } +}; +} + +void SessionCount::purge() const +{ + StopWatch sw; + m_cleanup_countdown = CleanupCountdown; + // erase entries up to the first non-zero one + auto ite = find_if(m_event_counts.begin(), m_event_counts.end(), NonZeroEntry()); + // The gcc 4.4 vector::erase bug only happens if iterators are the same. + if (ite != m_event_counts.begin()) + { + m_event_counts.erase(m_event_counts.begin(), ite); + } +} + +void SessionCount::dump(std::ostream& os) const +{ + purge(); + if (!m_event_counts.empty()) + { + os << " Session: " << m_sess_id << '\n'; + for (auto ite = m_event_counts.begin(); ite != m_event_counts.end(); ++ite) + { + os << " " << *ite << '\n'; + } + } +} + +void dumpHeader(std::ostream& os, const SessionCount& stats, const std::string& type) +{ + TimePoint tp = Clock::now(); + os << type << ": Time:" << tp + << " Time Window: " << stats.time_window() << '\n'; +} + +void dump(std::ostream& os, const std::vector& sessions) +{ + if (sessions.empty()) + { + return; + } + + dumpHeader(os, sessions[0], "Count"); + for (auto session = sessions.begin(); session != sessions.end(); ++session) + { + session->dump(os); + } +} + +void dumpTotals(std::ostream& os, const std::vector &sessions) +{ + if (sessions.empty()) + { + return; + } + + std::map counts; + for (auto session = sessions.begin(); session != sessions.end(); ++session) + { + const auto& events = session->event_counts(); + for (auto event = events.begin(); event != events.end(); ++event) + { + counts[event->event_id()] += event->count(); + } + } + + if (!counts.empty()) + { + dumpHeader(os, sessions[0], "Count Totals"); + for (auto ite = counts.begin(); ite != counts.end(); ++ite) + { + os << " " << ite->first << ": " << ite->second << '\n'; + } + } +} + +// EXTRA +// This section needed for gcc 4.4, to use move semantics and variadics. + +EventCount::EventCount(EventCount && ss) : + m_event_id(std::move(ss.m_event_id)), + m_time_window(std::move(ss.m_time_window)), + m_granularity(std::move(ss.m_granularity)), + m_timestamps(std::move(ss.m_timestamps)) +{ +} + +EventCount &EventCount::operator=(EventCount && ss) +{ + m_event_id = std::move(ss.m_event_id); + m_time_window = std::move(ss.m_time_window); + m_granularity = std::move(ss.m_granularity); + m_timestamps = std::move(ss.m_timestamps); + return *this; +} + +SessionCount::SessionCount(SessionCount&& ss) : + m_sess_id(std::move(ss.m_sess_id)), + m_time_window(std::move(ss.m_time_window)), + m_granularity(std::move(ss.m_granularity)), + m_cleanup_countdown(std::move(ss.m_cleanup_countdown)), + m_event_counts(std::move(ss.m_event_counts)) +{ +} + +SessionCount & SessionCount::operator=(SessionCount&& ss) +{ + m_sess_id = std::move(ss.m_sess_id); + m_time_window = std::move(ss.m_time_window); + m_granularity = std::move(ss.m_granularity); + m_cleanup_countdown = std::move(ss.m_cleanup_countdown); + m_event_counts = std::move(ss.m_event_counts); + + return *this; +} +} // throttle diff --git a/server/modules/filter/throttlefilter/eventcount.hh b/server/modules/filter/throttlefilter/eventcount.hh new file mode 100644 index 000000000..fc7c7a5ed --- /dev/null +++ b/server/modules/filter/throttlefilter/eventcount.hh @@ -0,0 +1,111 @@ +/* + * Copyright (c) 2018 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2020-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#pragma once + +#include "stopwatch.hh" +#include +#include + +namespace throttle +{ +/** + * @brief Keep a count of an events for a time period from "now" into the past. + * + * Events are stored, or distinguished, with timestamps of a given granularity. + * For example, if the granularity is set to 1s, each time an event is reported the current + * time is rounded down to the nearest whole second. All events that round down to the same + * second share a single entry in the EventCount. Granularity==0 causes all events (timestamps) + * to be stored in their own entry, which could use large amounts of memory. + * + */ +class EventCount +{ +public: + explicit EventCount(const std::string& event_id, Duration time_window, + Duration granularity = Duration(std::chrono::milliseconds(100))); + EventCount(const EventCount&) = delete; + EventCount& operator=(const EventCount&) = delete; + EventCount(EventCount&&); // can't be defaulted in gcc 4.4 + EventCount& operator=(EventCount&&); // can't be defaulted in gcc 4.4 + + const std::string& event_id() const + { + return m_event_id; + } + Duration time_window() const + { + return m_time_window; + } + void dump(std::ostream& os) const; + int count() const; + void increment(); + + // these defs need not be public once lambdas are available + struct Timestamp + { + TimePoint time_point; + int count; + Timestamp(TimePoint p, int c) : time_point(p), count(c) {} + }; +private: + void purge() const; // remove out of window stats + + std::string m_event_id; + Duration m_time_window; + Duration::rep m_granularity; + mutable std::vector m_timestamps; +}; + +std::ostream& operator<<(std::ostream& os, const EventCount& stats); + +// Time series statistics for a Session (a collection of related EventCount). +class SessionCount +{ +public: + SessionCount(const std::string& sess_id, Duration time_window, + Duration granularity = Duration(std::chrono::seconds(1))); + SessionCount(const SessionCount&) = delete; + SessionCount& operator=(const SessionCount&) = delete; + SessionCount(SessionCount &&); // can't be defaulted in gcc 4.4 + SessionCount& operator=(SessionCount&&); // can't be defaulted in gcc 4.4 + + const std::string& session_id() const + { + return m_sess_id; + } + Duration time_window() const + { + return m_time_window; + } + const std::vector& event_counts() const; + void dump(std::ostream& os) const; + bool empty() const; // no stats + + void increment(const std::string& event_id); +private: + void purge() const; // remove out of window stats + + std::string m_sess_id; + Duration m_time_window; + Duration m_granularity; + mutable int m_cleanup_countdown; + mutable std::vector m_event_counts; +}; + +// conveniece. Any real formatted output should go elsewhere. +std::ostream& operator<<(std::ostream& os, const SessionCount& stats); +void dump(std::ostream& os, const std::vector& sessions); +void dumpTotals(std::ostream& os, const std::vector &sessions); + +} // throttle diff --git a/server/modules/filter/throttlefilter/stopwatch.cc b/server/modules/filter/throttlefilter/stopwatch.cc new file mode 100644 index 000000000..dc1b302cd --- /dev/null +++ b/server/modules/filter/throttlefilter/stopwatch.cc @@ -0,0 +1,151 @@ +/* + * Copyright (c) 2018 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2020-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include + +#include "stopwatch.hh" + +#include +#include +#include +#include + +namespace throttle +{ + +StopWatch::StopWatch() +{ + restart(); +} + +Duration StopWatch::lap() const +{ + return {Clock::now() - m_start}; +} + +Duration StopWatch::restart() +{ + TimePoint now = Clock::now(); + Duration lap = now - m_start; + m_start = now; + return lap; +} +} // maxscale + +/********** OUTPUT ***********/ +namespace +{ +using namespace throttle; +struct TimeConvert +{ + double div; // divide the value of the previous unit by this + std::string suffix; // milliseconds, hours etc. + double max_visual; // threashold to switch to the next unit +}; +// Will never get to centuries because the duration is a long carrying nanoseconds +TimeConvert convert[] +{ + {1, "ns", 1000}, {1000, "us", 1000}, {1000, "ms", 1000}, + {1000, "s", 60}, {60, "min", 60}, {60, "hours", 24}, + {24, "days", 365.25}, {365.25, "years", 10000}, + {100, "centuries", std::numeric_limits::max()} +}; + +int convert_size = sizeof(convert) / sizeof(convert[0]); + +} + +namespace throttle +{ +std::pair dur_to_human_readable(Duration dur) +{ + using namespace std::chrono; + double time = duration_cast(dur).count(); + bool negative = (time < 0) ? time = -time, true : false; + + for (int i = 0; i <= convert_size; ++i) + { + if (i == convert_size) + { + return std::make_pair(negative ? -time : time, + convert[convert_size - 1].suffix); + } + + time /= convert[i].div; + + if (time < convert[i].max_visual) + { + return std::make_pair(negative ? -time : time, convert[i].suffix); + } + } + + abort(); // should never get here +} + +std::ostream& operator<<(std::ostream& os, Duration dur) +{ + auto p = dur_to_human_readable(dur); + os << p.first << p.second; + + return os; +} + +// TODO: this will require some thought. time_point_to_string() for a system_clock is +// obvious, but not so for a steady_clock. Maybe TimePoint belongs to a system clock +// and sould be called something else here, and live in a time_measuring namespace. +std::string time_point_to_string(TimePoint tp, const std::string &fmt) +{ + using namespace std::chrono; + std::time_t timet = system_clock::to_time_t(system_clock::now() + + (tp - Clock::now())); + + struct tm * ptm; + ptm = gmtime (&timet); + const int sz = 1024; + char buf[sz]; + strftime(buf, sz, fmt.c_str(), ptm); + return buf; +} + +std::ostream & operator<<(std::ostream & os, TimePoint tp) +{ + os << time_point_to_string(tp); + return os; +} + +void test_stopwatch_output(std::ostream & os) +{ + long long dur[] = + { + 400, // 400ns + 5 * 1000, // 5us + 500 * 1000, // 500us + 1 * 1000000, // 1ms + 700 * 1000000LL, // 700ms + 5 * 1000000000LL, // 5s + 200 * 1000000000LL, // 200s + 5 * 60 * 1000000000LL, // 5m + 45 * 60 * 1000000000LL, // 45m + 130 * 60 * 1000000000LL, // 130m + 24 * 60 * 60 * 1000000000LL, // 24 hours + 3 * 24 * 60 * 60 * 1000000000LL, // 72 hours + 180 * 24 * 60 * 60 * 1000000000LL, // 180 days + 1000 * 24 * 60 * 60 * 1000000000LL // 1000 days + }; + + for (unsigned i = 0; i < sizeof(dur) / sizeof(dur[0]); ++i) + { + os << Duration(dur[i]) << std::endl; + } +} +} // throttle diff --git a/server/modules/filter/throttlefilter/stopwatch.hh b/server/modules/filter/throttlefilter/stopwatch.hh new file mode 100644 index 000000000..a53929fe6 --- /dev/null +++ b/server/modules/filter/throttlefilter/stopwatch.hh @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2018 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2020-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ +#pragma once + +#include +#include +#include + +namespace throttle +{ + +#if __cplusplus >= 201103 +typedef std::chrono::steady_clock Clock; +#else +typedef std::chrono::system_clock Clock; +#endif + +struct Duration : public Clock::duration // for ADL +{ + // gcc 4.4 does not inherit constructors, so this is a bit limited. + Duration() = default; + Duration(long long l) : Clock::duration(l) {} + Duration(Clock::duration d) : Clock::duration(d) {} +}; + +typedef std::chrono::time_point TimePoint; + +class StopWatch +{ +public: + // Starts the stopwatch, which is always running. + StopWatch(); + // Get elapsed time. + Duration lap() const; + // Get elapsed time, restart StopWatch. + Duration restart(); +private: + TimePoint m_start; +}; + +// Returns the value as a double and string adjusted to a suffix like ms for milliseconds. +std::pair dur_to_human_readable(Duration dur); + +// Human readable output. No standard library for it yet. +std::ostream& operator<<(std::ostream&, Duration dur); + +// TimePoint +std::string time_point_to_string(TimePoint tp, const std::string& fmt = "%F %T"); +std::ostream& operator<<(std::ostream&, TimePoint tp); + +} // throttle diff --git a/server/modules/filter/throttlefilter/throttlefilter.cc b/server/modules/filter/throttlefilter/throttlefilter.cc new file mode 100644 index 000000000..d999147d7 --- /dev/null +++ b/server/modules/filter/throttlefilter/throttlefilter.cc @@ -0,0 +1,130 @@ +/* + * Copyright (c) Niclas Antti + * + * This software is released under the MIT License. + */ + +#define MXS_MODULE_NAME "throttlefilter" + +#include +#include +#include +#include + +#include "throttlefilter.hh" + +#include +#include +#include +#include +#include + +namespace +{ +const char* const MAX_QPS_CFG = "max_qps"; +const char* const TRIGGER_DURATION_CFG = "trigger_duration"; +const char* const THROTTLE_DURATION_CFG = "throttle_duration"; +} + + +extern "C" MXS_MODULE* MXS_CREATE_MODULE() +{ + static MXS_MODULE info = + { + MXS_MODULE_API_FILTER, + MXS_MODULE_IN_DEVELOPMENT, + MXS_FILTER_VERSION, + "Prevents high frequency querying from monopolizing the system", + "V1.0.0", + RCAP_TYPE_STMT_INPUT, + &throttle::ThrottleFilter::s_object, + NULL, /* Process init. */ + NULL, /* Process finish. */ + NULL, /* Thread init. */ + NULL, /* Thread finish. */ + { + {MAX_QPS_CFG, MXS_MODULE_PARAM_INT}, + {TRIGGER_DURATION_CFG, MXS_MODULE_PARAM_INT}, + {THROTTLE_DURATION_CFG, MXS_MODULE_PARAM_INT}, + { MXS_END_MODULE_PARAMS } + } + }; + + return &info; +} + +namespace throttle +{ + +ThrottleFilter::ThrottleFilter(const ThrottleConfig &config) : m_config(config) +{ +} + +ThrottleFilter * ThrottleFilter::create(const char* zName, char * * pzOptions, MXS_CONFIG_PARAMETER * pParams) +{ + int max_qps = config_get_integer(pParams, MAX_QPS_CFG); + int trigger_secs = config_get_integer(pParams, TRIGGER_DURATION_CFG); + int throttle_secs = config_get_integer(pParams, THROTTLE_DURATION_CFG); + bool config_ok = true; + + if (max_qps < 2) + { + MXS_ERROR("Config value %s must be > 1", MAX_QPS_CFG); + config_ok = false; + } + + if (trigger_secs < 1) + { + MXS_ERROR("Config value %s must be > 0", TRIGGER_DURATION_CFG); + config_ok = false; + } + + if (throttle_secs < 0) + { + MXS_ERROR("Config value %s must be >= 0", THROTTLE_DURATION_CFG); + config_ok = false; + } + + Duration trigger_duration {std::chrono::seconds(trigger_secs)}; + Duration throttle_duration {std::chrono::seconds(throttle_secs)}; + + ThrottleFilter* filter {NULL}; + if (config_ok) + { + ThrottleConfig config = {max_qps, trigger_duration, throttle_duration}; + + std::ostringstream os1, os2; + os1 << config.trigger_duration; + os2 << config.throttle_duration; + + filter = new ThrottleFilter(config); + } + + return filter; +} + +ThrottleSession* ThrottleFilter::newSession(MXS_SESSION * mxsSession) +{ + return new ThrottleSession(mxsSession, *this); +} + +void ThrottleFilter::diagnostics(DCB * pDcb) +{ +} + +json_t* ThrottleFilter::diagnostics_json() const +{ + return NULL; +} + +uint64_t ThrottleFilter::getCapabilities() +{ + return RCAP_TYPE_NONE; +} + +const ThrottleConfig &ThrottleFilter::config() const +{ + return m_config; +} + +} // throttle diff --git a/server/modules/filter/throttlefilter/throttlefilter.hh b/server/modules/filter/throttlefilter/throttlefilter.hh new file mode 100644 index 000000000..3b8430796 --- /dev/null +++ b/server/modules/filter/throttlefilter/throttlefilter.hh @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2018 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2020-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ +#pragma once + +#include +#include "throttlesession.hh" +#include "eventcount.hh" +#include "stopwatch.hh" +#include +#include +#include + +namespace throttle +{ + +struct ThrottleConfig +{ + + int max_qps; // if this many queries per second is exceeded.. + Duration trigger_duration; // .. in this time window, then cap qps to max_qps .. + Duration throttle_duration; // .. for this long before disconnect. + // Example: max 100qps and trigger 5s. As soon as more than 500 queries happen in less than + // 5s the action is triggered. So, 501 queries in one second is a trigger, but 400qps + // for one second is acceptable as long as the qps stayed low for the previous 4 seconds, + // and stays low for the next 4 seconds. In other words, a short burst>max is not a trigger. + // Further, say max_throttle_duration is 60s. If the qps is continously capped (throttled) + // for 60s, the session is disconnected. + + // TODO: this should probably depend on overall activity. If this is to protect the + // database, multiple sessions gone haywire will still cause problems. It would be quite + // east to add a counter into the filter to measure overall qps. On the other hand, if + // a single session is active, it should be allowed to run at whatever the absolute + // allowable speed is. +}; + +class ThrottleFilter : public maxscale::Filter +{ +public: + static ThrottleFilter* create(const char* zName, char** pzOptions, MXS_CONFIG_PARAMETER* pParams); + ThrottleFilter(const ThrottleFilter&) = delete; + ThrottleFilter& operator = (const ThrottleFilter&) = delete; + + ThrottleSession* newSession(MXS_SESSION* mxsSession); + + void diagnostics(DCB* pDcb); + json_t* diagnostics_json() const; + uint64_t getCapabilities(); + const ThrottleConfig& config() const; + void sessionClose(ThrottleSession* session); +private: + ThrottleFilter(const ThrottleConfig& config); + + ThrottleConfig m_config; +}; +} // throttle diff --git a/server/modules/filter/throttlefilter/throttlesession.cc b/server/modules/filter/throttlefilter/throttlesession.cc new file mode 100644 index 000000000..b70f4b3bc --- /dev/null +++ b/server/modules/filter/throttlefilter/throttlesession.cc @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2018 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2020-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#define MXS_MODULE_NAME "throttlefilter" + +#include +#include +#include + +#include "throttlesession.hh" +#include "throttlefilter.hh" + +#include +#include +#include + +namespace throttle +{ +ThrottleSession::ThrottleSession(MXS_SESSION* mxsSession, ThrottleFilter &filter) + : maxscale::FilterSession(mxsSession), + m_filter(filter), + m_query_count("num-queries", filter.config().trigger_duration), + m_state(State::MEASURING) +{ +} + +int ThrottleSession::routeQuery(GWBUF *buffer) +{ + // TODO: count everything, or filter something out? + using namespace std::chrono; + + m_query_count.increment(); + int count = m_query_count.count(); + int secs = duration_cast(m_filter.config().trigger_duration).count(); + float qps = count / secs; // not instantaneous, but over so many seconds + + if (qps >= m_filter.config().max_qps) // trigger + { + // sleep a few cycles, keeping qps near max. + usleep(4 * 1000000 / qps); // to be replaced with delayed calls + + if (m_state == State::MEASURING) + { + MXS_INFO("Query throttling STARTED session %ld user %s", + m_pSession->ses_id, m_pSession->client_dcb->user); + m_state = State::THROTTLING; + m_first_trigger.restart(); + } + m_last_trigger.restart(); + } + else if (m_state == State::THROTTLING) + { + if (m_last_trigger.lap() > Duration(std::chrono::seconds(2))) // TODO, might be ok though. + { + m_state = State::MEASURING; + MXS_INFO("Query throttling stopped session %ld user %s", + m_pSession->ses_id, m_pSession->client_dcb->user); + } + else if (m_first_trigger.lap() > m_filter.config().throttle_duration) + { + MXS_NOTICE("Session %ld user %s, qps throttling limit reached. Disconnect.", + m_pSession->ses_id, m_pSession->client_dcb->user); + return false; // disconnect + } + } + + return mxs::FilterSession::routeQuery(buffer); +} +} // throttle diff --git a/server/modules/filter/throttlefilter/throttlesession.hh b/server/modules/filter/throttlefilter/throttlesession.hh new file mode 100644 index 000000000..698a2109f --- /dev/null +++ b/server/modules/filter/throttlefilter/throttlesession.hh @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2018 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2020-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ +#pragma once + +#include +#include "eventcount.hh" + +namespace throttle +{ + +class ThrottleFilter; + +class ThrottleSession : public maxscale::FilterSession +{ +public: + ThrottleSession(MXS_SESSION* pSession, ThrottleFilter& filter); + ThrottleSession(const ThrottleSession&) = delete; + ThrottleSession& operator = (const ThrottleSession&) = delete; + + int routeQuery(GWBUF* buffer); +private: + ThrottleFilter& m_filter; + EventCount m_query_count; + StopWatch m_first_trigger; + StopWatch m_last_trigger; + StopWatch remove_me; + + enum class State {MEASURING, THROTTLING}; + State m_state; +}; +} // throttle diff --git a/server/modules/monitor/mariadbmon/cluster_manipulation.cc b/server/modules/monitor/mariadbmon/cluster_manipulation.cc index 9bf363fa4..d7c884078 100644 --- a/server/modules/monitor/mariadbmon/cluster_manipulation.cc +++ b/server/modules/monitor/mariadbmon/cluster_manipulation.cc @@ -824,8 +824,8 @@ bool MariaDBMonitor::switchover_demote_master(MariaDBServer* current_master, jso } else { - const char GTID_ERROR[] = "Demotion failed due to an error in updating gtid:s. " - "Check log for more details."; + const char * const GTID_ERROR = "Demotion failed due to an error in updating gtid:s. " + "Check log for more details."; PRINT_MXS_JSON_ERROR(err_out, GTID_ERROR); } } @@ -1465,7 +1465,7 @@ static void print_redirect_errors(MariaDBServer* first_server, const ServerArray { // Individual server errors have already been printed to the log. // For JSON, gather the errors again. - const char MSG[] = "Could not redirect any slaves to the new master."; + const char* const MSG = "Could not redirect any slaves to the new master."; MXS_ERROR(MSG); if (err_out) { diff --git a/server/modules/monitor/mariadbmon/mariadbmon.cc b/server/modules/monitor/mariadbmon/mariadbmon.cc index ad96620f2..76691cf5f 100644 --- a/server/modules/monitor/mariadbmon/mariadbmon.cc +++ b/server/modules/monitor/mariadbmon/mariadbmon.cc @@ -977,7 +977,7 @@ bool handle_manual_switchover(const MODULECMD_ARG* args, json_t** error_out) bool rval = false; if (config_get_global_options()->passive) { - const char MSG[] = "Switchover requested but not performed, as MaxScale is in passive mode."; + const char* const MSG = "Switchover requested but not performed, as MaxScale is in passive mode."; PRINT_MXS_JSON_ERROR(error_out, MSG); } else