MXS-173 Initial version of throttle filter
This commit is contained in:
@ -15,3 +15,4 @@ add_subdirectory(tpmfilter)
|
|||||||
add_subdirectory(masking)
|
add_subdirectory(masking)
|
||||||
add_subdirectory(insertstream)
|
add_subdirectory(insertstream)
|
||||||
add_subdirectory(binlogfilter)
|
add_subdirectory(binlogfilter)
|
||||||
|
add_subdirectory(throttlefilter)
|
||||||
|
4
server/modules/filter/throttlefilter/CMakeLists.txt
Normal file
4
server/modules/filter/throttlefilter/CMakeLists.txt
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
add_library(throttlefilter SHARED throttlefilter.cc throttlesession.cc eventcount.cc stopwatch.cc)
|
||||||
|
target_link_libraries(throttlefilter maxscale-common mysqlcommon)
|
||||||
|
set_target_properties(throttlefilter PROPERTIES VERSION "1.0.0")
|
||||||
|
install_module(throttlefilter core)
|
292
server/modules/filter/throttlefilter/eventcount.cc
Normal file
292
server/modules/filter/throttlefilter/eventcount.cc
Normal file
@ -0,0 +1,292 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2018 MariaDB Corporation Ab
|
||||||
|
*
|
||||||
|
* Use of this software is governed by the Business Source License included
|
||||||
|
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
||||||
|
*
|
||||||
|
* Change Date: 2020-01-01
|
||||||
|
*
|
||||||
|
* On the date above, in accordance with the Business Source License, use
|
||||||
|
* of this software will be governed by version 2 or later of the General
|
||||||
|
* Public License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include <maxscale/cppdefs.hh>
|
||||||
|
|
||||||
|
#include "eventcount.hh"
|
||||||
|
|
||||||
|
#include <algorithm>
|
||||||
|
#include <iostream>
|
||||||
|
#include <iomanip>
|
||||||
|
#include <sstream>
|
||||||
|
#include <map>
|
||||||
|
|
||||||
|
namespace throttle
|
||||||
|
{
|
||||||
|
|
||||||
|
EventCount::EventCount(const std::string& event_id, Duration time_window, Duration granularity) :
|
||||||
|
m_event_id(event_id),
|
||||||
|
m_time_window(time_window),
|
||||||
|
m_granularity(granularity.count())
|
||||||
|
{
|
||||||
|
increment();
|
||||||
|
}
|
||||||
|
|
||||||
|
void EventCount::increment()
|
||||||
|
{
|
||||||
|
using namespace std::chrono;
|
||||||
|
auto ticks = time_point_cast<nanoseconds>(Clock::now()).time_since_epoch().count();
|
||||||
|
if (m_granularity)
|
||||||
|
{
|
||||||
|
ticks = ticks / m_granularity * m_granularity;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (m_timestamps.empty()
|
||||||
|
|| m_timestamps.back().time_point.time_since_epoch().count() != ticks)
|
||||||
|
{
|
||||||
|
m_timestamps.emplace_back(TimePoint(ticks), 1);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
++m_timestamps.back().count;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
namespace
|
||||||
|
{
|
||||||
|
struct TimePointLessEqual
|
||||||
|
{
|
||||||
|
TimePoint lhs;
|
||||||
|
TimePointLessEqual(TimePoint tp) : lhs(tp) {}
|
||||||
|
bool operator()(const EventCount::Timestamp& rhs) const
|
||||||
|
{
|
||||||
|
return lhs <= rhs.time_point;
|
||||||
|
}
|
||||||
|
bool operator()(TimePoint rhs) const
|
||||||
|
{
|
||||||
|
return lhs <= rhs;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
void EventCount::purge() const
|
||||||
|
{
|
||||||
|
StopWatch sw;
|
||||||
|
auto windowBegin = Clock::now() - m_time_window;
|
||||||
|
|
||||||
|
auto ite = std::find_if(m_timestamps.begin(), m_timestamps.end(),
|
||||||
|
TimePointLessEqual(windowBegin));
|
||||||
|
m_timestamps.erase(m_timestamps.begin(), ite);
|
||||||
|
}
|
||||||
|
|
||||||
|
int EventCount::count() const
|
||||||
|
{
|
||||||
|
purge();
|
||||||
|
int count {0};
|
||||||
|
|
||||||
|
for (auto ite = m_timestamps.begin(); ite != m_timestamps.end(); ++ite)
|
||||||
|
{
|
||||||
|
count += ite->count;
|
||||||
|
}
|
||||||
|
return count;
|
||||||
|
}
|
||||||
|
|
||||||
|
void EventCount::dump(std::ostream &os) const
|
||||||
|
{
|
||||||
|
os << m_event_id << ": " << count() << " " << m_timestamps.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
std::ostream& operator<<(std::ostream& os, const EventCount& EventCount)
|
||||||
|
{
|
||||||
|
EventCount.dump(os);
|
||||||
|
return os;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Force a purge once in awhile, could be configurable. This is needed if
|
||||||
|
// a client generates lots of events but rarely reads them back (purges).
|
||||||
|
const int CleanupCountdown = 10000;
|
||||||
|
|
||||||
|
SessionCount::SessionCount(const std::string& sess_id, Duration time_window,
|
||||||
|
Duration granularity) :
|
||||||
|
m_sess_id(sess_id), m_time_window(time_window), m_granularity(granularity),
|
||||||
|
m_cleanup_countdown(CleanupCountdown)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
const std::vector<EventCount> &SessionCount::event_counts() const
|
||||||
|
{
|
||||||
|
purge();
|
||||||
|
return m_event_counts;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool SessionCount::empty() const
|
||||||
|
{
|
||||||
|
purge();
|
||||||
|
return m_event_counts.empty();
|
||||||
|
}
|
||||||
|
|
||||||
|
namespace
|
||||||
|
{
|
||||||
|
struct MatchEventId
|
||||||
|
{
|
||||||
|
std::string event_id;
|
||||||
|
MatchEventId(const std::string& id) : event_id(id) {};
|
||||||
|
bool operator()(const EventCount& stats) const
|
||||||
|
{
|
||||||
|
return event_id == stats.event_id();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
void SessionCount::increment(const std::string& event_id)
|
||||||
|
{
|
||||||
|
// Always put the incremented entry (latest timestamp) last in the vector (using
|
||||||
|
// rotate). This means the vector is ordered so that expired entries are always first.
|
||||||
|
|
||||||
|
// Find in reverse, the entry is more likely to be towards the end. Actually no,
|
||||||
|
// for some reason the normal search is slightly faster when measured.
|
||||||
|
auto ite = find_if(m_event_counts.begin(), m_event_counts.end(),
|
||||||
|
MatchEventId(event_id));
|
||||||
|
if (ite == m_event_counts.end())
|
||||||
|
{
|
||||||
|
m_event_counts.emplace_back(event_id, m_time_window, m_granularity);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
ite->increment();
|
||||||
|
// rotate so that the entry becomes the last one
|
||||||
|
auto next = std::next(ite);
|
||||||
|
std::rotate(ite, next, m_event_counts.end());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!--m_cleanup_countdown)
|
||||||
|
{
|
||||||
|
purge();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
namespace
|
||||||
|
{
|
||||||
|
struct NonZeroEntry
|
||||||
|
{
|
||||||
|
bool operator()(const EventCount& stats)
|
||||||
|
{
|
||||||
|
return stats.count() != 0;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
void SessionCount::purge() const
|
||||||
|
{
|
||||||
|
StopWatch sw;
|
||||||
|
m_cleanup_countdown = CleanupCountdown;
|
||||||
|
// erase entries up to the first non-zero one
|
||||||
|
auto ite = find_if(m_event_counts.begin(), m_event_counts.end(), NonZeroEntry());
|
||||||
|
// The gcc 4.4 vector::erase bug only happens if iterators are the same.
|
||||||
|
if (ite != m_event_counts.begin())
|
||||||
|
{
|
||||||
|
m_event_counts.erase(m_event_counts.begin(), ite);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void SessionCount::dump(std::ostream& os) const
|
||||||
|
{
|
||||||
|
purge();
|
||||||
|
if (!m_event_counts.empty())
|
||||||
|
{
|
||||||
|
os << " Session: " << m_sess_id << '\n';
|
||||||
|
for (auto ite = m_event_counts.begin(); ite != m_event_counts.end(); ++ite)
|
||||||
|
{
|
||||||
|
os << " " << *ite << '\n';
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void dumpHeader(std::ostream& os, const SessionCount& stats, const std::string& type)
|
||||||
|
{
|
||||||
|
TimePoint tp = Clock::now();
|
||||||
|
os << type << ": Time:" << tp
|
||||||
|
<< " Time Window: " << stats.time_window() << '\n';
|
||||||
|
}
|
||||||
|
|
||||||
|
void dump(std::ostream& os, const std::vector<SessionCount>& sessions)
|
||||||
|
{
|
||||||
|
if (sessions.empty())
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
dumpHeader(os, sessions[0], "Count");
|
||||||
|
for (auto session = sessions.begin(); session != sessions.end(); ++session)
|
||||||
|
{
|
||||||
|
session->dump(os);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void dumpTotals(std::ostream& os, const std::vector<SessionCount> &sessions)
|
||||||
|
{
|
||||||
|
if (sessions.empty())
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::map<std::string, int> counts;
|
||||||
|
for (auto session = sessions.begin(); session != sessions.end(); ++session)
|
||||||
|
{
|
||||||
|
const auto& events = session->event_counts();
|
||||||
|
for (auto event = events.begin(); event != events.end(); ++event)
|
||||||
|
{
|
||||||
|
counts[event->event_id()] += event->count();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!counts.empty())
|
||||||
|
{
|
||||||
|
dumpHeader(os, sessions[0], "Count Totals");
|
||||||
|
for (auto ite = counts.begin(); ite != counts.end(); ++ite)
|
||||||
|
{
|
||||||
|
os << " " << ite->first << ": " << ite->second << '\n';
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// EXTRA
|
||||||
|
// This section needed for gcc 4.4, to use move semantics and variadics.
|
||||||
|
|
||||||
|
EventCount::EventCount(EventCount && ss) :
|
||||||
|
m_event_id(std::move(ss.m_event_id)),
|
||||||
|
m_time_window(std::move(ss.m_time_window)),
|
||||||
|
m_granularity(std::move(ss.m_granularity)),
|
||||||
|
m_timestamps(std::move(ss.m_timestamps))
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
EventCount &EventCount::operator=(EventCount && ss)
|
||||||
|
{
|
||||||
|
m_event_id = std::move(ss.m_event_id);
|
||||||
|
m_time_window = std::move(ss.m_time_window);
|
||||||
|
m_granularity = std::move(ss.m_granularity);
|
||||||
|
m_timestamps = std::move(ss.m_timestamps);
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
SessionCount::SessionCount(SessionCount&& ss) :
|
||||||
|
m_sess_id(std::move(ss.m_sess_id)),
|
||||||
|
m_time_window(std::move(ss.m_time_window)),
|
||||||
|
m_granularity(std::move(ss.m_granularity)),
|
||||||
|
m_cleanup_countdown(std::move(ss.m_cleanup_countdown)),
|
||||||
|
m_event_counts(std::move(ss.m_event_counts))
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
SessionCount & SessionCount::operator=(SessionCount&& ss)
|
||||||
|
{
|
||||||
|
m_sess_id = std::move(ss.m_sess_id);
|
||||||
|
m_time_window = std::move(ss.m_time_window);
|
||||||
|
m_granularity = std::move(ss.m_granularity);
|
||||||
|
m_cleanup_countdown = std::move(ss.m_cleanup_countdown);
|
||||||
|
m_event_counts = std::move(ss.m_event_counts);
|
||||||
|
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
} // throttle
|
111
server/modules/filter/throttlefilter/eventcount.hh
Normal file
111
server/modules/filter/throttlefilter/eventcount.hh
Normal file
@ -0,0 +1,111 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2018 MariaDB Corporation Ab
|
||||||
|
*
|
||||||
|
* Use of this software is governed by the Business Source License included
|
||||||
|
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
||||||
|
*
|
||||||
|
* Change Date: 2020-01-01
|
||||||
|
*
|
||||||
|
* On the date above, in accordance with the Business Source License, use
|
||||||
|
* of this software will be governed by version 2 or later of the General
|
||||||
|
* Public License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include "stopwatch.hh"
|
||||||
|
#include <iosfwd>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
|
namespace throttle
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* @brief Keep a count of an events for a time period from "now" into the past.
|
||||||
|
*
|
||||||
|
* Events are stored, or distinguished, with timestamps of a given granularity.
|
||||||
|
* For example, if the granularity is set to 1s, each time an event is reported the current
|
||||||
|
* time is rounded down to the nearest whole second. All events that round down to the same
|
||||||
|
* second share a single entry in the EventCount. Granularity==0 causes all events (timestamps)
|
||||||
|
* to be stored in their own entry, which could use large amounts of memory.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
class EventCount
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
explicit EventCount(const std::string& event_id, Duration time_window,
|
||||||
|
Duration granularity = Duration(std::chrono::milliseconds(100)));
|
||||||
|
EventCount(const EventCount&) = delete;
|
||||||
|
EventCount& operator=(const EventCount&) = delete;
|
||||||
|
EventCount(EventCount&&); // can't be defaulted in gcc 4.4
|
||||||
|
EventCount& operator=(EventCount&&); // can't be defaulted in gcc 4.4
|
||||||
|
|
||||||
|
const std::string& event_id() const
|
||||||
|
{
|
||||||
|
return m_event_id;
|
||||||
|
}
|
||||||
|
Duration time_window() const
|
||||||
|
{
|
||||||
|
return m_time_window;
|
||||||
|
}
|
||||||
|
void dump(std::ostream& os) const;
|
||||||
|
int count() const;
|
||||||
|
void increment();
|
||||||
|
|
||||||
|
// these defs need not be public once lambdas are available
|
||||||
|
struct Timestamp
|
||||||
|
{
|
||||||
|
TimePoint time_point;
|
||||||
|
int count;
|
||||||
|
Timestamp(TimePoint p, int c) : time_point(p), count(c) {}
|
||||||
|
};
|
||||||
|
private:
|
||||||
|
void purge() const; // remove out of window stats
|
||||||
|
|
||||||
|
std::string m_event_id;
|
||||||
|
Duration m_time_window;
|
||||||
|
Duration::rep m_granularity;
|
||||||
|
mutable std::vector<Timestamp> m_timestamps;
|
||||||
|
};
|
||||||
|
|
||||||
|
std::ostream& operator<<(std::ostream& os, const EventCount& stats);
|
||||||
|
|
||||||
|
// Time series statistics for a Session (a collection of related EventCount).
|
||||||
|
class SessionCount
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
SessionCount(const std::string& sess_id, Duration time_window,
|
||||||
|
Duration granularity = Duration(std::chrono::seconds(1)));
|
||||||
|
SessionCount(const SessionCount&) = delete;
|
||||||
|
SessionCount& operator=(const SessionCount&) = delete;
|
||||||
|
SessionCount(SessionCount &&); // can't be defaulted in gcc 4.4
|
||||||
|
SessionCount& operator=(SessionCount&&); // can't be defaulted in gcc 4.4
|
||||||
|
|
||||||
|
const std::string& session_id() const
|
||||||
|
{
|
||||||
|
return m_sess_id;
|
||||||
|
}
|
||||||
|
Duration time_window() const
|
||||||
|
{
|
||||||
|
return m_time_window;
|
||||||
|
}
|
||||||
|
const std::vector<EventCount>& event_counts() const;
|
||||||
|
void dump(std::ostream& os) const;
|
||||||
|
bool empty() const; // no stats
|
||||||
|
|
||||||
|
void increment(const std::string& event_id);
|
||||||
|
private:
|
||||||
|
void purge() const; // remove out of window stats
|
||||||
|
|
||||||
|
std::string m_sess_id;
|
||||||
|
Duration m_time_window;
|
||||||
|
Duration m_granularity;
|
||||||
|
mutable int m_cleanup_countdown;
|
||||||
|
mutable std::vector<EventCount> m_event_counts;
|
||||||
|
};
|
||||||
|
|
||||||
|
// conveniece. Any real formatted output should go elsewhere.
|
||||||
|
std::ostream& operator<<(std::ostream& os, const SessionCount& stats);
|
||||||
|
void dump(std::ostream& os, const std::vector<SessionCount>& sessions);
|
||||||
|
void dumpTotals(std::ostream& os, const std::vector<SessionCount> &sessions);
|
||||||
|
|
||||||
|
} // throttle
|
151
server/modules/filter/throttlefilter/stopwatch.cc
Normal file
151
server/modules/filter/throttlefilter/stopwatch.cc
Normal file
@ -0,0 +1,151 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2018 MariaDB Corporation Ab
|
||||||
|
*
|
||||||
|
* Use of this software is governed by the Business Source License included
|
||||||
|
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
||||||
|
*
|
||||||
|
* Change Date: 2020-01-01
|
||||||
|
*
|
||||||
|
* On the date above, in accordance with the Business Source License, use
|
||||||
|
* of this software will be governed by version 2 or later of the General
|
||||||
|
* Public License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include <maxscale/cppdefs.hh>
|
||||||
|
|
||||||
|
#include "stopwatch.hh"
|
||||||
|
|
||||||
|
#include <iomanip>
|
||||||
|
#include <iostream>
|
||||||
|
#include <sstream>
|
||||||
|
#include <ctime>
|
||||||
|
|
||||||
|
namespace throttle
|
||||||
|
{
|
||||||
|
|
||||||
|
StopWatch::StopWatch()
|
||||||
|
{
|
||||||
|
restart();
|
||||||
|
}
|
||||||
|
|
||||||
|
Duration StopWatch::lap() const
|
||||||
|
{
|
||||||
|
return {Clock::now() - m_start};
|
||||||
|
}
|
||||||
|
|
||||||
|
Duration StopWatch::restart()
|
||||||
|
{
|
||||||
|
TimePoint now = Clock::now();
|
||||||
|
Duration lap = now - m_start;
|
||||||
|
m_start = now;
|
||||||
|
return lap;
|
||||||
|
}
|
||||||
|
} // maxscale
|
||||||
|
|
||||||
|
/********** OUTPUT ***********/
|
||||||
|
namespace
|
||||||
|
{
|
||||||
|
using namespace throttle;
|
||||||
|
struct TimeConvert
|
||||||
|
{
|
||||||
|
double div; // divide the value of the previous unit by this
|
||||||
|
std::string suffix; // milliseconds, hours etc.
|
||||||
|
double max_visual; // threashold to switch to the next unit
|
||||||
|
};
|
||||||
|
// Will never get to centuries because the duration is a long carrying nanoseconds
|
||||||
|
TimeConvert convert[]
|
||||||
|
{
|
||||||
|
{1, "ns", 1000}, {1000, "us", 1000}, {1000, "ms", 1000},
|
||||||
|
{1000, "s", 60}, {60, "min", 60}, {60, "hours", 24},
|
||||||
|
{24, "days", 365.25}, {365.25, "years", 10000},
|
||||||
|
{100, "centuries", std::numeric_limits<double>::max()}
|
||||||
|
};
|
||||||
|
|
||||||
|
int convert_size = sizeof(convert) / sizeof(convert[0]);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
namespace throttle
|
||||||
|
{
|
||||||
|
std::pair<double, std::string> dur_to_human_readable(Duration dur)
|
||||||
|
{
|
||||||
|
using namespace std::chrono;
|
||||||
|
double time = duration_cast<nanoseconds>(dur).count();
|
||||||
|
bool negative = (time < 0) ? time = -time, true : false;
|
||||||
|
|
||||||
|
for (int i = 0; i <= convert_size; ++i)
|
||||||
|
{
|
||||||
|
if (i == convert_size)
|
||||||
|
{
|
||||||
|
return std::make_pair(negative ? -time : time,
|
||||||
|
convert[convert_size - 1].suffix);
|
||||||
|
}
|
||||||
|
|
||||||
|
time /= convert[i].div;
|
||||||
|
|
||||||
|
if (time < convert[i].max_visual)
|
||||||
|
{
|
||||||
|
return std::make_pair(negative ? -time : time, convert[i].suffix);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
abort(); // should never get here
|
||||||
|
}
|
||||||
|
|
||||||
|
std::ostream& operator<<(std::ostream& os, Duration dur)
|
||||||
|
{
|
||||||
|
auto p = dur_to_human_readable(dur);
|
||||||
|
os << p.first << p.second;
|
||||||
|
|
||||||
|
return os;
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: this will require some thought. time_point_to_string() for a system_clock is
|
||||||
|
// obvious, but not so for a steady_clock. Maybe TimePoint belongs to a system clock
|
||||||
|
// and sould be called something else here, and live in a time_measuring namespace.
|
||||||
|
std::string time_point_to_string(TimePoint tp, const std::string &fmt)
|
||||||
|
{
|
||||||
|
using namespace std::chrono;
|
||||||
|
std::time_t timet = system_clock::to_time_t(system_clock::now()
|
||||||
|
+ (tp - Clock::now()));
|
||||||
|
|
||||||
|
struct tm * ptm;
|
||||||
|
ptm = gmtime (&timet);
|
||||||
|
const int sz = 1024;
|
||||||
|
char buf[sz];
|
||||||
|
strftime(buf, sz, fmt.c_str(), ptm);
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::ostream & operator<<(std::ostream & os, TimePoint tp)
|
||||||
|
{
|
||||||
|
os << time_point_to_string(tp);
|
||||||
|
return os;
|
||||||
|
}
|
||||||
|
|
||||||
|
void test_stopwatch_output(std::ostream & os)
|
||||||
|
{
|
||||||
|
long long dur[] =
|
||||||
|
{
|
||||||
|
400, // 400ns
|
||||||
|
5 * 1000, // 5us
|
||||||
|
500 * 1000, // 500us
|
||||||
|
1 * 1000000, // 1ms
|
||||||
|
700 * 1000000LL, // 700ms
|
||||||
|
5 * 1000000000LL, // 5s
|
||||||
|
200 * 1000000000LL, // 200s
|
||||||
|
5 * 60 * 1000000000LL, // 5m
|
||||||
|
45 * 60 * 1000000000LL, // 45m
|
||||||
|
130 * 60 * 1000000000LL, // 130m
|
||||||
|
24 * 60 * 60 * 1000000000LL, // 24 hours
|
||||||
|
3 * 24 * 60 * 60 * 1000000000LL, // 72 hours
|
||||||
|
180 * 24 * 60 * 60 * 1000000000LL, // 180 days
|
||||||
|
1000 * 24 * 60 * 60 * 1000000000LL // 1000 days
|
||||||
|
};
|
||||||
|
|
||||||
|
for (unsigned i = 0; i < sizeof(dur) / sizeof(dur[0]); ++i)
|
||||||
|
{
|
||||||
|
os << Duration(dur[i]) << std::endl;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} // throttle
|
61
server/modules/filter/throttlefilter/stopwatch.hh
Normal file
61
server/modules/filter/throttlefilter/stopwatch.hh
Normal file
@ -0,0 +1,61 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2018 MariaDB Corporation Ab
|
||||||
|
*
|
||||||
|
* Use of this software is governed by the Business Source License included
|
||||||
|
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
||||||
|
*
|
||||||
|
* Change Date: 2020-01-01
|
||||||
|
*
|
||||||
|
* On the date above, in accordance with the Business Source License, use
|
||||||
|
* of this software will be governed by version 2 or later of the General
|
||||||
|
* Public License.
|
||||||
|
*/
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <chrono>
|
||||||
|
#include <iosfwd>
|
||||||
|
#include <string>
|
||||||
|
|
||||||
|
namespace throttle
|
||||||
|
{
|
||||||
|
|
||||||
|
#if __cplusplus >= 201103
|
||||||
|
typedef std::chrono::steady_clock Clock;
|
||||||
|
#else
|
||||||
|
typedef std::chrono::system_clock Clock;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
struct Duration : public Clock::duration // for ADL
|
||||||
|
{
|
||||||
|
// gcc 4.4 does not inherit constructors, so this is a bit limited.
|
||||||
|
Duration() = default;
|
||||||
|
Duration(long long l) : Clock::duration(l) {}
|
||||||
|
Duration(Clock::duration d) : Clock::duration(d) {}
|
||||||
|
};
|
||||||
|
|
||||||
|
typedef std::chrono::time_point<Clock, Duration> TimePoint;
|
||||||
|
|
||||||
|
class StopWatch
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
// Starts the stopwatch, which is always running.
|
||||||
|
StopWatch();
|
||||||
|
// Get elapsed time.
|
||||||
|
Duration lap() const;
|
||||||
|
// Get elapsed time, restart StopWatch.
|
||||||
|
Duration restart();
|
||||||
|
private:
|
||||||
|
TimePoint m_start;
|
||||||
|
};
|
||||||
|
|
||||||
|
// Returns the value as a double and string adjusted to a suffix like ms for milliseconds.
|
||||||
|
std::pair<double, std::string> dur_to_human_readable(Duration dur);
|
||||||
|
|
||||||
|
// Human readable output. No standard library for it yet.
|
||||||
|
std::ostream& operator<<(std::ostream&, Duration dur);
|
||||||
|
|
||||||
|
// TimePoint
|
||||||
|
std::string time_point_to_string(TimePoint tp, const std::string& fmt = "%F %T");
|
||||||
|
std::ostream& operator<<(std::ostream&, TimePoint tp);
|
||||||
|
|
||||||
|
} // throttle
|
130
server/modules/filter/throttlefilter/throttlefilter.cc
Normal file
130
server/modules/filter/throttlefilter/throttlefilter.cc
Normal file
@ -0,0 +1,130 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) Niclas Antti
|
||||||
|
*
|
||||||
|
* This software is released under the MIT License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#define MXS_MODULE_NAME "throttlefilter"
|
||||||
|
|
||||||
|
#include <maxscale/cppdefs.hh>
|
||||||
|
#include <maxscale/utils.h>
|
||||||
|
#include <maxscale/json_api.h>
|
||||||
|
#include <maxscale/jansson.hh>
|
||||||
|
|
||||||
|
#include "throttlefilter.hh"
|
||||||
|
|
||||||
|
#include <string>
|
||||||
|
#include <algorithm>
|
||||||
|
#include <sstream>
|
||||||
|
#include <fstream>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
|
namespace
|
||||||
|
{
|
||||||
|
const char* const MAX_QPS_CFG = "max_qps";
|
||||||
|
const char* const TRIGGER_DURATION_CFG = "trigger_duration";
|
||||||
|
const char* const THROTTLE_DURATION_CFG = "throttle_duration";
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
extern "C" MXS_MODULE* MXS_CREATE_MODULE()
|
||||||
|
{
|
||||||
|
static MXS_MODULE info =
|
||||||
|
{
|
||||||
|
MXS_MODULE_API_FILTER,
|
||||||
|
MXS_MODULE_IN_DEVELOPMENT,
|
||||||
|
MXS_FILTER_VERSION,
|
||||||
|
"Prevents high frequency querying from monopolizing the system",
|
||||||
|
"V1.0.0",
|
||||||
|
RCAP_TYPE_STMT_INPUT,
|
||||||
|
&throttle::ThrottleFilter::s_object,
|
||||||
|
NULL, /* Process init. */
|
||||||
|
NULL, /* Process finish. */
|
||||||
|
NULL, /* Thread init. */
|
||||||
|
NULL, /* Thread finish. */
|
||||||
|
{
|
||||||
|
{MAX_QPS_CFG, MXS_MODULE_PARAM_INT},
|
||||||
|
{TRIGGER_DURATION_CFG, MXS_MODULE_PARAM_INT},
|
||||||
|
{THROTTLE_DURATION_CFG, MXS_MODULE_PARAM_INT},
|
||||||
|
{ MXS_END_MODULE_PARAMS }
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
return &info;
|
||||||
|
}
|
||||||
|
|
||||||
|
namespace throttle
|
||||||
|
{
|
||||||
|
|
||||||
|
ThrottleFilter::ThrottleFilter(const ThrottleConfig &config) : m_config(config)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
ThrottleFilter * ThrottleFilter::create(const char* zName, char * * pzOptions, MXS_CONFIG_PARAMETER * pParams)
|
||||||
|
{
|
||||||
|
int max_qps = config_get_integer(pParams, MAX_QPS_CFG);
|
||||||
|
int trigger_secs = config_get_integer(pParams, TRIGGER_DURATION_CFG);
|
||||||
|
int throttle_secs = config_get_integer(pParams, THROTTLE_DURATION_CFG);
|
||||||
|
bool config_ok = true;
|
||||||
|
|
||||||
|
if (max_qps < 2)
|
||||||
|
{
|
||||||
|
MXS_ERROR("Config value %s must be > 1", MAX_QPS_CFG);
|
||||||
|
config_ok = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (trigger_secs < 1)
|
||||||
|
{
|
||||||
|
MXS_ERROR("Config value %s must be > 0", TRIGGER_DURATION_CFG);
|
||||||
|
config_ok = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (throttle_secs < 0)
|
||||||
|
{
|
||||||
|
MXS_ERROR("Config value %s must be >= 0", THROTTLE_DURATION_CFG);
|
||||||
|
config_ok = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
Duration trigger_duration {std::chrono::seconds(trigger_secs)};
|
||||||
|
Duration throttle_duration {std::chrono::seconds(throttle_secs)};
|
||||||
|
|
||||||
|
ThrottleFilter* filter {NULL};
|
||||||
|
if (config_ok)
|
||||||
|
{
|
||||||
|
ThrottleConfig config = {max_qps, trigger_duration, throttle_duration};
|
||||||
|
|
||||||
|
std::ostringstream os1, os2;
|
||||||
|
os1 << config.trigger_duration;
|
||||||
|
os2 << config.throttle_duration;
|
||||||
|
|
||||||
|
filter = new ThrottleFilter(config);
|
||||||
|
}
|
||||||
|
|
||||||
|
return filter;
|
||||||
|
}
|
||||||
|
|
||||||
|
ThrottleSession* ThrottleFilter::newSession(MXS_SESSION * mxsSession)
|
||||||
|
{
|
||||||
|
return new ThrottleSession(mxsSession, *this);
|
||||||
|
}
|
||||||
|
|
||||||
|
void ThrottleFilter::diagnostics(DCB * pDcb)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
json_t* ThrottleFilter::diagnostics_json() const
|
||||||
|
{
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t ThrottleFilter::getCapabilities()
|
||||||
|
{
|
||||||
|
return RCAP_TYPE_NONE;
|
||||||
|
}
|
||||||
|
|
||||||
|
const ThrottleConfig &ThrottleFilter::config() const
|
||||||
|
{
|
||||||
|
return m_config;
|
||||||
|
}
|
||||||
|
|
||||||
|
} // throttle
|
65
server/modules/filter/throttlefilter/throttlefilter.hh
Normal file
65
server/modules/filter/throttlefilter/throttlefilter.hh
Normal file
@ -0,0 +1,65 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2018 MariaDB Corporation Ab
|
||||||
|
*
|
||||||
|
* Use of this software is governed by the Business Source License included
|
||||||
|
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
||||||
|
*
|
||||||
|
* Change Date: 2020-01-01
|
||||||
|
*
|
||||||
|
* On the date above, in accordance with the Business Source License, use
|
||||||
|
* of this software will be governed by version 2 or later of the General
|
||||||
|
* Public License.
|
||||||
|
*/
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <maxscale/filter.hh>
|
||||||
|
#include "throttlesession.hh"
|
||||||
|
#include "eventcount.hh"
|
||||||
|
#include "stopwatch.hh"
|
||||||
|
#include <iostream>
|
||||||
|
#include <thread>
|
||||||
|
#include <memory>
|
||||||
|
|
||||||
|
namespace throttle
|
||||||
|
{
|
||||||
|
|
||||||
|
struct ThrottleConfig
|
||||||
|
{
|
||||||
|
|
||||||
|
int max_qps; // if this many queries per second is exceeded..
|
||||||
|
Duration trigger_duration; // .. in this time window, then cap qps to max_qps ..
|
||||||
|
Duration throttle_duration; // .. for this long before disconnect.
|
||||||
|
// Example: max 100qps and trigger 5s. As soon as more than 500 queries happen in less than
|
||||||
|
// 5s the action is triggered. So, 501 queries in one second is a trigger, but 400qps
|
||||||
|
// for one second is acceptable as long as the qps stayed low for the previous 4 seconds,
|
||||||
|
// and stays low for the next 4 seconds. In other words, a short burst>max is not a trigger.
|
||||||
|
// Further, say max_throttle_duration is 60s. If the qps is continously capped (throttled)
|
||||||
|
// for 60s, the session is disconnected.
|
||||||
|
|
||||||
|
// TODO: this should probably depend on overall activity. If this is to protect the
|
||||||
|
// database, multiple sessions gone haywire will still cause problems. It would be quite
|
||||||
|
// east to add a counter into the filter to measure overall qps. On the other hand, if
|
||||||
|
// a single session is active, it should be allowed to run at whatever the absolute
|
||||||
|
// allowable speed is.
|
||||||
|
};
|
||||||
|
|
||||||
|
class ThrottleFilter : public maxscale::Filter<ThrottleFilter, ThrottleSession>
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
static ThrottleFilter* create(const char* zName, char** pzOptions, MXS_CONFIG_PARAMETER* pParams);
|
||||||
|
ThrottleFilter(const ThrottleFilter&) = delete;
|
||||||
|
ThrottleFilter& operator = (const ThrottleFilter&) = delete;
|
||||||
|
|
||||||
|
ThrottleSession* newSession(MXS_SESSION* mxsSession);
|
||||||
|
|
||||||
|
void diagnostics(DCB* pDcb);
|
||||||
|
json_t* diagnostics_json() const;
|
||||||
|
uint64_t getCapabilities();
|
||||||
|
const ThrottleConfig& config() const;
|
||||||
|
void sessionClose(ThrottleSession* session);
|
||||||
|
private:
|
||||||
|
ThrottleFilter(const ThrottleConfig& config);
|
||||||
|
|
||||||
|
ThrottleConfig m_config;
|
||||||
|
};
|
||||||
|
} // throttle
|
79
server/modules/filter/throttlefilter/throttlesession.cc
Normal file
79
server/modules/filter/throttlefilter/throttlesession.cc
Normal file
@ -0,0 +1,79 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2018 MariaDB Corporation Ab
|
||||||
|
*
|
||||||
|
* Use of this software is governed by the Business Source License included
|
||||||
|
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
||||||
|
*
|
||||||
|
* Change Date: 2020-01-01
|
||||||
|
*
|
||||||
|
* On the date above, in accordance with the Business Source License, use
|
||||||
|
* of this software will be governed by version 2 or later of the General
|
||||||
|
* Public License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#define MXS_MODULE_NAME "throttlefilter"
|
||||||
|
|
||||||
|
#include <maxscale/cppdefs.hh>
|
||||||
|
#include <maxscale/modutil.h>
|
||||||
|
#include <maxscale/query_classifier.h>
|
||||||
|
|
||||||
|
#include "throttlesession.hh"
|
||||||
|
#include "throttlefilter.hh"
|
||||||
|
|
||||||
|
#include <string>
|
||||||
|
#include <algorithm>
|
||||||
|
#include <sstream>
|
||||||
|
|
||||||
|
namespace throttle
|
||||||
|
{
|
||||||
|
ThrottleSession::ThrottleSession(MXS_SESSION* mxsSession, ThrottleFilter &filter)
|
||||||
|
: maxscale::FilterSession(mxsSession),
|
||||||
|
m_filter(filter),
|
||||||
|
m_query_count("num-queries", filter.config().trigger_duration),
|
||||||
|
m_state(State::MEASURING)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
int ThrottleSession::routeQuery(GWBUF *buffer)
|
||||||
|
{
|
||||||
|
// TODO: count everything, or filter something out?
|
||||||
|
using namespace std::chrono;
|
||||||
|
|
||||||
|
m_query_count.increment();
|
||||||
|
int count = m_query_count.count();
|
||||||
|
int secs = duration_cast<seconds>(m_filter.config().trigger_duration).count();
|
||||||
|
float qps = count / secs; // not instantaneous, but over so many seconds
|
||||||
|
|
||||||
|
if (qps >= m_filter.config().max_qps) // trigger
|
||||||
|
{
|
||||||
|
// sleep a few cycles, keeping qps near max.
|
||||||
|
usleep(4 * 1000000 / qps); // to be replaced with delayed calls
|
||||||
|
|
||||||
|
if (m_state == State::MEASURING)
|
||||||
|
{
|
||||||
|
MXS_INFO("Query throttling STARTED session %ld user %s",
|
||||||
|
m_pSession->ses_id, m_pSession->client_dcb->user);
|
||||||
|
m_state = State::THROTTLING;
|
||||||
|
m_first_trigger.restart();
|
||||||
|
}
|
||||||
|
m_last_trigger.restart();
|
||||||
|
}
|
||||||
|
else if (m_state == State::THROTTLING)
|
||||||
|
{
|
||||||
|
if (m_last_trigger.lap() > Duration(std::chrono::seconds(2))) // TODO, might be ok though.
|
||||||
|
{
|
||||||
|
m_state = State::MEASURING;
|
||||||
|
MXS_INFO("Query throttling stopped session %ld user %s",
|
||||||
|
m_pSession->ses_id, m_pSession->client_dcb->user);
|
||||||
|
}
|
||||||
|
else if (m_first_trigger.lap() > m_filter.config().throttle_duration)
|
||||||
|
{
|
||||||
|
MXS_NOTICE("Session %ld user %s, qps throttling limit reached. Disconnect.",
|
||||||
|
m_pSession->ses_id, m_pSession->client_dcb->user);
|
||||||
|
return false; // disconnect
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return mxs::FilterSession::routeQuery(buffer);
|
||||||
|
}
|
||||||
|
} // throttle
|
41
server/modules/filter/throttlefilter/throttlesession.hh
Normal file
41
server/modules/filter/throttlefilter/throttlesession.hh
Normal file
@ -0,0 +1,41 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2018 MariaDB Corporation Ab
|
||||||
|
*
|
||||||
|
* Use of this software is governed by the Business Source License included
|
||||||
|
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
||||||
|
*
|
||||||
|
* Change Date: 2020-01-01
|
||||||
|
*
|
||||||
|
* On the date above, in accordance with the Business Source License, use
|
||||||
|
* of this software will be governed by version 2 or later of the General
|
||||||
|
* Public License.
|
||||||
|
*/
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <maxscale/filter.hh>
|
||||||
|
#include "eventcount.hh"
|
||||||
|
|
||||||
|
namespace throttle
|
||||||
|
{
|
||||||
|
|
||||||
|
class ThrottleFilter;
|
||||||
|
|
||||||
|
class ThrottleSession : public maxscale::FilterSession
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
ThrottleSession(MXS_SESSION* pSession, ThrottleFilter& filter);
|
||||||
|
ThrottleSession(const ThrottleSession&) = delete;
|
||||||
|
ThrottleSession& operator = (const ThrottleSession&) = delete;
|
||||||
|
|
||||||
|
int routeQuery(GWBUF* buffer);
|
||||||
|
private:
|
||||||
|
ThrottleFilter& m_filter;
|
||||||
|
EventCount m_query_count;
|
||||||
|
StopWatch m_first_trigger;
|
||||||
|
StopWatch m_last_trigger;
|
||||||
|
StopWatch remove_me;
|
||||||
|
|
||||||
|
enum class State {MEASURING, THROTTLING};
|
||||||
|
State m_state;
|
||||||
|
};
|
||||||
|
} // throttle
|
@ -824,7 +824,7 @@ bool MariaDBMonitor::switchover_demote_master(MariaDBServer* current_master, jso
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
const char GTID_ERROR[] = "Demotion failed due to an error in updating gtid:s. "
|
const char * const GTID_ERROR = "Demotion failed due to an error in updating gtid:s. "
|
||||||
"Check log for more details.";
|
"Check log for more details.";
|
||||||
PRINT_MXS_JSON_ERROR(err_out, GTID_ERROR);
|
PRINT_MXS_JSON_ERROR(err_out, GTID_ERROR);
|
||||||
}
|
}
|
||||||
@ -1465,7 +1465,7 @@ static void print_redirect_errors(MariaDBServer* first_server, const ServerArray
|
|||||||
{
|
{
|
||||||
// Individual server errors have already been printed to the log.
|
// Individual server errors have already been printed to the log.
|
||||||
// For JSON, gather the errors again.
|
// For JSON, gather the errors again.
|
||||||
const char MSG[] = "Could not redirect any slaves to the new master.";
|
const char* const MSG = "Could not redirect any slaves to the new master.";
|
||||||
MXS_ERROR(MSG);
|
MXS_ERROR(MSG);
|
||||||
if (err_out)
|
if (err_out)
|
||||||
{
|
{
|
||||||
|
@ -977,7 +977,7 @@ bool handle_manual_switchover(const MODULECMD_ARG* args, json_t** error_out)
|
|||||||
bool rval = false;
|
bool rval = false;
|
||||||
if (config_get_global_options()->passive)
|
if (config_get_global_options()->passive)
|
||||||
{
|
{
|
||||||
const char MSG[] = "Switchover requested but not performed, as MaxScale is in passive mode.";
|
const char* const MSG = "Switchover requested but not performed, as MaxScale is in passive mode.";
|
||||||
PRINT_MXS_JSON_ERROR(error_out, MSG);
|
PRINT_MXS_JSON_ERROR(error_out, MSG);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
Reference in New Issue
Block a user