MXS-1777 use maxbase library

The library directory structure could be simplified for ease of use, but better done with a separate commit.
This commit is contained in:
Niclas Antti
2018-06-16 10:38:52 +03:00
parent d2e1cfdf4e
commit 81deedd857
8 changed files with 15 additions and 630 deletions

View File

@ -1,4 +1,4 @@
add_library(throttlefilter SHARED throttlefilter.cc throttlesession.cc eventcount.cc stopwatch.cc)
target_link_libraries(throttlefilter maxscale-common mysqlcommon)
add_library(throttlefilter SHARED throttlefilter.cc throttlesession.cc)
target_link_libraries(throttlefilter maxbase maxscale-common mysqlcommon)
set_target_properties(throttlefilter PROPERTIES VERSION "1.0.0")
install_module(throttlefilter core)

View File

@ -1,292 +0,0 @@
/*
* Copyright (c) 2018 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <maxscale/cppdefs.hh>
#include "eventcount.hh"
#include <algorithm>
#include <iostream>
#include <iomanip>
#include <sstream>
#include <map>
namespace throttle
{
EventCount::EventCount(const std::string& event_id, Duration time_window, Duration granularity) :
m_event_id(event_id),
m_time_window(time_window),
m_granularity(granularity.count())
{
increment();
}
void EventCount::increment()
{
using namespace std::chrono;
auto ticks = time_point_cast<nanoseconds>(Clock::now()).time_since_epoch().count();
if (m_granularity)
{
ticks = ticks / m_granularity * m_granularity;
}
if (m_timestamps.empty()
|| m_timestamps.back().time_point.time_since_epoch().count() != ticks)
{
m_timestamps.emplace_back(TimePoint(ticks), 1);
}
else
{
++m_timestamps.back().count;
}
}
namespace
{
struct TimePointLessEqual
{
TimePoint lhs;
TimePointLessEqual(TimePoint tp) : lhs(tp) {}
bool operator()(const EventCount::Timestamp& rhs) const
{
return lhs <= rhs.time_point;
}
bool operator()(TimePoint rhs) const
{
return lhs <= rhs;
}
};
}
void EventCount::purge() const
{
StopWatch sw;
auto windowBegin = Clock::now() - m_time_window;
auto ite = std::find_if(m_timestamps.begin(), m_timestamps.end(),
TimePointLessEqual(windowBegin));
m_timestamps.erase(m_timestamps.begin(), ite);
}
int EventCount::count() const
{
purge();
int count {0};
for (auto ite = m_timestamps.begin(); ite != m_timestamps.end(); ++ite)
{
count += ite->count;
}
return count;
}
void EventCount::dump(std::ostream &os) const
{
os << m_event_id << ": " << count() << " " << m_timestamps.size();
}
std::ostream& operator<<(std::ostream& os, const EventCount& EventCount)
{
EventCount.dump(os);
return os;
}
// Force a purge once in awhile, could be configurable. This is needed if
// a client generates lots of events but rarely reads them back (purges).
const int CleanupCountdown = 10000;
SessionCount::SessionCount(const std::string& sess_id, Duration time_window,
Duration granularity) :
m_sess_id(sess_id), m_time_window(time_window), m_granularity(granularity),
m_cleanup_countdown(CleanupCountdown)
{
}
const std::vector<EventCount> &SessionCount::event_counts() const
{
purge();
return m_event_counts;
}
bool SessionCount::empty() const
{
purge();
return m_event_counts.empty();
}
namespace
{
struct MatchEventId
{
std::string event_id;
MatchEventId(const std::string& id) : event_id(id) {};
bool operator()(const EventCount& stats) const
{
return event_id == stats.event_id();
}
};
}
void SessionCount::increment(const std::string& event_id)
{
// Always put the incremented entry (latest timestamp) last in the vector (using
// rotate). This means the vector is ordered so that expired entries are always first.
// Find in reverse, the entry is more likely to be towards the end. Actually no,
// for some reason the normal search is slightly faster when measured.
auto ite = find_if(m_event_counts.begin(), m_event_counts.end(),
MatchEventId(event_id));
if (ite == m_event_counts.end())
{
m_event_counts.emplace_back(event_id, m_time_window, m_granularity);
}
else
{
ite->increment();
// rotate so that the entry becomes the last one
auto next = std::next(ite);
std::rotate(ite, next, m_event_counts.end());
}
if (!--m_cleanup_countdown)
{
purge();
}
}
namespace
{
struct NonZeroEntry
{
bool operator()(const EventCount& stats)
{
return stats.count() != 0;
}
};
}
void SessionCount::purge() const
{
StopWatch sw;
m_cleanup_countdown = CleanupCountdown;
// erase entries up to the first non-zero one
auto ite = find_if(m_event_counts.begin(), m_event_counts.end(), NonZeroEntry());
// The gcc 4.4 vector::erase bug only happens if iterators are the same.
if (ite != m_event_counts.begin())
{
m_event_counts.erase(m_event_counts.begin(), ite);
}
}
void SessionCount::dump(std::ostream& os) const
{
purge();
if (!m_event_counts.empty())
{
os << " Session: " << m_sess_id << '\n';
for (auto ite = m_event_counts.begin(); ite != m_event_counts.end(); ++ite)
{
os << " " << *ite << '\n';
}
}
}
void dumpHeader(std::ostream& os, const SessionCount& stats, const std::string& type)
{
TimePoint tp = Clock::now();
os << type << ": Time:" << tp
<< " Time Window: " << stats.time_window() << '\n';
}
void dump(std::ostream& os, const std::vector<SessionCount>& sessions)
{
if (sessions.empty())
{
return;
}
dumpHeader(os, sessions[0], "Count");
for (auto session = sessions.begin(); session != sessions.end(); ++session)
{
session->dump(os);
}
}
void dumpTotals(std::ostream& os, const std::vector<SessionCount> &sessions)
{
if (sessions.empty())
{
return;
}
std::map<std::string, int> counts;
for (auto session = sessions.begin(); session != sessions.end(); ++session)
{
const auto& events = session->event_counts();
for (auto event = events.begin(); event != events.end(); ++event)
{
counts[event->event_id()] += event->count();
}
}
if (!counts.empty())
{
dumpHeader(os, sessions[0], "Count Totals");
for (auto ite = counts.begin(); ite != counts.end(); ++ite)
{
os << " " << ite->first << ": " << ite->second << '\n';
}
}
}
// EXTRA
// This section needed for gcc 4.4, to use move semantics and variadics.
EventCount::EventCount(EventCount && ss) :
m_event_id(std::move(ss.m_event_id)),
m_time_window(std::move(ss.m_time_window)),
m_granularity(std::move(ss.m_granularity)),
m_timestamps(std::move(ss.m_timestamps))
{
}
EventCount &EventCount::operator=(EventCount && ss)
{
m_event_id = std::move(ss.m_event_id);
m_time_window = std::move(ss.m_time_window);
m_granularity = std::move(ss.m_granularity);
m_timestamps = std::move(ss.m_timestamps);
return *this;
}
SessionCount::SessionCount(SessionCount&& ss) :
m_sess_id(std::move(ss.m_sess_id)),
m_time_window(std::move(ss.m_time_window)),
m_granularity(std::move(ss.m_granularity)),
m_cleanup_countdown(std::move(ss.m_cleanup_countdown)),
m_event_counts(std::move(ss.m_event_counts))
{
}
SessionCount & SessionCount::operator=(SessionCount&& ss)
{
m_sess_id = std::move(ss.m_sess_id);
m_time_window = std::move(ss.m_time_window);
m_granularity = std::move(ss.m_granularity);
m_cleanup_countdown = std::move(ss.m_cleanup_countdown);
m_event_counts = std::move(ss.m_event_counts);
return *this;
}
} // throttle

View File

@ -1,111 +0,0 @@
/*
* Copyright (c) 2018 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#pragma once
#include "stopwatch.hh"
#include <iosfwd>
#include <vector>
namespace throttle
{
/**
* @brief Keep a count of an events for a time period from "now" into the past.
*
* Events are stored, or distinguished, with timestamps of a given granularity.
* For example, if the granularity is set to 1s, each time an event is reported the current
* time is rounded down to the nearest whole second. All events that round down to the same
* second share a single entry in the EventCount. Granularity==0 causes all events (timestamps)
* to be stored in their own entry, which could use large amounts of memory.
*
*/
class EventCount
{
public:
explicit EventCount(const std::string& event_id, Duration time_window,
Duration granularity = Duration(std::chrono::milliseconds(10)));
EventCount(const EventCount&) = delete;
EventCount& operator=(const EventCount&) = delete;
EventCount(EventCount&&); // can't be defaulted in gcc 4.4
EventCount& operator=(EventCount&&); // can't be defaulted in gcc 4.4
const std::string& event_id() const
{
return m_event_id;
}
Duration time_window() const
{
return m_time_window;
}
void dump(std::ostream& os) const;
int count() const;
void increment();
// these defs need not be public once lambdas are available
struct Timestamp
{
TimePoint time_point;
int count;
Timestamp(TimePoint p, int c) : time_point(p), count(c) {}
};
private:
void purge() const; // remove out of window stats
std::string m_event_id;
Duration m_time_window;
Duration::rep m_granularity;
mutable std::vector<Timestamp> m_timestamps;
};
std::ostream& operator<<(std::ostream& os, const EventCount& stats);
// Time series statistics for a Session (a collection of related EventCount).
class SessionCount
{
public:
SessionCount(const std::string& sess_id, Duration time_window,
Duration granularity = Duration(std::chrono::milliseconds(10)));
SessionCount(const SessionCount&) = delete;
SessionCount& operator=(const SessionCount&) = delete;
SessionCount(SessionCount &&); // can't be defaulted in gcc 4.4
SessionCount& operator=(SessionCount&&); // can't be defaulted in gcc 4.4
const std::string& session_id() const
{
return m_sess_id;
}
Duration time_window() const
{
return m_time_window;
}
const std::vector<EventCount>& event_counts() const;
void dump(std::ostream& os) const;
bool empty() const; // no stats
void increment(const std::string& event_id);
private:
void purge() const; // remove out of window stats
std::string m_sess_id;
Duration m_time_window;
Duration m_granularity;
mutable int m_cleanup_countdown;
mutable std::vector<EventCount> m_event_counts;
};
// conveniece. Any real formatted output should go elsewhere.
std::ostream& operator<<(std::ostream& os, const SessionCount& stats);
void dump(std::ostream& os, const std::vector<SessionCount>& sessions);
void dumpTotals(std::ostream& os, const std::vector<SessionCount> &sessions);
} // throttle

View File

@ -1,151 +0,0 @@
/*
* Copyright (c) 2018 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <maxscale/cppdefs.hh>
#include "stopwatch.hh"
#include <iomanip>
#include <iostream>
#include <sstream>
#include <ctime>
namespace throttle
{
StopWatch::StopWatch()
{
restart();
}
Duration StopWatch::lap() const
{
return {Clock::now() - m_start};
}
Duration StopWatch::restart()
{
TimePoint now = Clock::now();
Duration lap = now - m_start;
m_start = now;
return lap;
}
} // maxscale
/********** OUTPUT ***********/
namespace
{
using namespace throttle;
struct TimeConvert
{
double div; // divide the value of the previous unit by this
std::string suffix; // milliseconds, hours etc.
double max_visual; // threashold to switch to the next unit
};
// Will never get to centuries because the duration is a long carrying nanoseconds
TimeConvert convert[]
{
{1, "ns", 1000}, {1000, "us", 1000}, {1000, "ms", 1000},
{1000, "s", 60}, {60, "min", 60}, {60, "hours", 24},
{24, "days", 365.25}, {365.25, "years", 10000},
{100, "centuries", std::numeric_limits<double>::max()}
};
int convert_size = sizeof(convert) / sizeof(convert[0]);
}
namespace throttle
{
std::pair<double, std::string> dur_to_human_readable(Duration dur)
{
using namespace std::chrono;
double time = duration_cast<nanoseconds>(dur).count();
bool negative = (time < 0) ? time = -time, true : false;
for (int i = 0; i <= convert_size; ++i)
{
if (i == convert_size)
{
return std::make_pair(negative ? -time : time,
convert[convert_size - 1].suffix);
}
time /= convert[i].div;
if (time < convert[i].max_visual)
{
return std::make_pair(negative ? -time : time, convert[i].suffix);
}
}
abort(); // should never get here
}
std::ostream& operator<<(std::ostream& os, Duration dur)
{
auto p = dur_to_human_readable(dur);
os << p.first << p.second;
return os;
}
// TODO: this will require some thought. time_point_to_string() for a system_clock is
// obvious, but not so for a steady_clock. Maybe TimePoint belongs to a system clock
// and sould be called something else here, and live in a time_measuring namespace.
std::string time_point_to_string(TimePoint tp, const std::string &fmt)
{
using namespace std::chrono;
std::time_t timet = system_clock::to_time_t(system_clock::now()
+ (tp - Clock::now()));
struct tm * ptm;
ptm = gmtime (&timet);
const int sz = 1024;
char buf[sz];
strftime(buf, sz, fmt.c_str(), ptm);
return buf;
}
std::ostream & operator<<(std::ostream & os, TimePoint tp)
{
os << time_point_to_string(tp);
return os;
}
void test_stopwatch_output(std::ostream & os)
{
long long dur[] =
{
400, // 400ns
5 * 1000, // 5us
500 * 1000, // 500us
1 * 1000000, // 1ms
700 * 1000000LL, // 700ms
5 * 1000000000LL, // 5s
200 * 1000000000LL, // 200s
5 * 60 * 1000000000LL, // 5m
45 * 60 * 1000000000LL, // 45m
130 * 60 * 1000000000LL, // 130m
24 * 60 * 60 * 1000000000LL, // 24 hours
3 * 24 * 60 * 60 * 1000000000LL, // 72 hours
180 * 24 * 60 * 60 * 1000000000LL, // 180 days
1000 * 24 * 60 * 60 * 1000000000LL // 1000 days
};
for (unsigned i = 0; i < sizeof(dur) / sizeof(dur[0]); ++i)
{
os << Duration(dur[i]) << std::endl;
}
}
} // throttle

View File

@ -1,61 +0,0 @@
/*
* Copyright (c) 2018 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#pragma once
#include <chrono>
#include <iosfwd>
#include <string>
namespace throttle
{
#if __cplusplus >= 201103
typedef std::chrono::steady_clock Clock;
#else
typedef std::chrono::system_clock Clock;
#endif
struct Duration : public Clock::duration // for ADL
{
// gcc 4.4 does not inherit constructors, so this is a bit limited.
Duration() = default;
Duration(long long l) : Clock::duration(l) {}
Duration(Clock::duration d) : Clock::duration(d) {}
};
typedef std::chrono::time_point<Clock, Duration> TimePoint;
class StopWatch
{
public:
// Starts the stopwatch, which is always running.
StopWatch();
// Get elapsed time.
Duration lap() const;
// Get elapsed time, restart StopWatch.
Duration restart();
private:
TimePoint m_start;
};
// Returns the value as a double and string adjusted to a suffix like ms for milliseconds.
std::pair<double, std::string> dur_to_human_readable(Duration dur);
// Human readable output. No standard library for it yet.
std::ostream& operator<<(std::ostream&, Duration dur);
// TimePoint
std::string time_point_to_string(TimePoint tp, const std::string& fmt = "%F %T");
std::ostream& operator<<(std::ostream&, TimePoint tp);
} // throttle

View File

@ -96,9 +96,9 @@ ThrottleFilter * ThrottleFilter::create(const char* zName, char * * pzOptions, M
ThrottleFilter* filter {NULL};
if (config_ok)
{
Duration sampling_duration {std::chrono::milliseconds(sample_msecs)};
Duration throttling_duration {std::chrono::milliseconds(throttle_msecs)};
Duration continuous_duration {std::chrono::milliseconds(cont_msecs)};
maxbase::Duration sampling_duration {std::chrono::milliseconds(sample_msecs)};
maxbase::Duration throttling_duration {std::chrono::milliseconds(throttle_msecs)};
maxbase::Duration continuous_duration {std::chrono::milliseconds(cont_msecs)};
ThrottleConfig config = {max_qps, sampling_duration,
throttling_duration, continuous_duration

View File

@ -14,8 +14,8 @@
#include <maxscale/filter.hh>
#include "throttlesession.hh"
#include "eventcount.hh"
#include "stopwatch.hh"
#include <maxbase/eventcount.hh>
#include <maxbase/stopwatch.hh>
#include <iostream>
#include <thread>
#include <memory>
@ -27,9 +27,9 @@ struct ThrottleConfig
{
int max_qps; // if this many queries per second is exceeded..
Duration sampling_duration; // .. in this time window, then cap qps to max_qps ..
Duration throttling_duration; // .. for this long before disconnect.
Duration continuous_duration; // What time window is considered continuous meddling.
maxbase::Duration sampling_duration; // .. in this time window, then cap qps to max_qps ..
maxbase::Duration throttling_duration; // .. for this long before disconnect.
maxbase::Duration continuous_duration; // What time window is considered continuous meddling.
// Example: max 100qps and sampling 5s. As soon as more than 500 queries are made in less
// then any 5s period throttling is triggered (because 501 > 100qps * 5 s). But also note

View File

@ -14,7 +14,7 @@
#include <maxscale/filter.hh>
#include <maxscale/worker.hh>
#include "eventcount.hh"
#include <maxbase/eventcount.hh>
namespace throttle
{
@ -35,10 +35,10 @@ private:
GWBUF* buffer);
int real_routeQuery(GWBUF* buffer, bool is_delayed);
ThrottleFilter& m_filter;
EventCount m_query_count;
StopWatch m_first_sample;
StopWatch m_last_sample;
uint32_t m_delayed_call_id; // there can be only one in flight
maxbase::EventCount m_query_count;
maxbase::StopWatch m_first_sample;
maxbase::StopWatch m_last_sample;
uint32_t m_delayed_call_id; // there can be only one in flight
enum class State {MEASURING, THROTTLING};
State m_state;