MXS-173 switched to use maxscale::Worker::delayed_call()
Using delayed_call rather than usleep. This caused a fair amount of changes to the timing ascpects (or delaying). Also some other small changes; more config and all durations in milliseconds.
This commit is contained in:
@ -135,7 +135,7 @@ uint64_t WorkerLoad::get_time()
|
|||||||
|
|
||||||
timespec t;
|
timespec t;
|
||||||
|
|
||||||
ss_debug(int rv=)clock_gettime(CLOCK_MONOTONIC, &t);
|
ss_debug(int rv = )clock_gettime(CLOCK_MONOTONIC, &t);
|
||||||
ss_dassert(rv == 0);
|
ss_dassert(rv == 0);
|
||||||
|
|
||||||
return t.tv_sec * 1000 + (t.tv_nsec / 1000000);
|
return t.tv_sec * 1000 + (t.tv_nsec / 1000000);
|
||||||
@ -233,7 +233,7 @@ WorkerTimer::~WorkerTimer()
|
|||||||
|
|
||||||
void WorkerTimer::start(int32_t interval)
|
void WorkerTimer::start(int32_t interval)
|
||||||
{
|
{
|
||||||
ss_dassert(interval > 0);
|
ss_dassert(interval >= 0);
|
||||||
|
|
||||||
// TODO: Add possibility to set initial delay and interval.
|
// TODO: Add possibility to set initial delay and interval.
|
||||||
time_t initial_sec = interval / 1000;
|
time_t initial_sec = interval / 1000;
|
||||||
@ -1126,7 +1126,7 @@ namespace
|
|||||||
int64_t get_current_time_ms()
|
int64_t get_current_time_ms()
|
||||||
{
|
{
|
||||||
struct timespec ts;
|
struct timespec ts;
|
||||||
ss_debug(int rv =) clock_gettime(CLOCK_MONOTONIC, &ts);
|
ss_debug(int rv = ) clock_gettime(CLOCK_MONOTONIC, &ts);
|
||||||
ss_dassert(rv == 0);
|
ss_dassert(rv == 0);
|
||||||
|
|
||||||
return ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
|
return ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
|
||||||
|
|||||||
@ -33,7 +33,7 @@ class EventCount
|
|||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
explicit EventCount(const std::string& event_id, Duration time_window,
|
explicit EventCount(const std::string& event_id, Duration time_window,
|
||||||
Duration granularity = Duration(std::chrono::milliseconds(100)));
|
Duration granularity = Duration(std::chrono::milliseconds(10)));
|
||||||
EventCount(const EventCount&) = delete;
|
EventCount(const EventCount&) = delete;
|
||||||
EventCount& operator=(const EventCount&) = delete;
|
EventCount& operator=(const EventCount&) = delete;
|
||||||
EventCount(EventCount&&); // can't be defaulted in gcc 4.4
|
EventCount(EventCount&&); // can't be defaulted in gcc 4.4
|
||||||
@ -74,7 +74,7 @@ class SessionCount
|
|||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
SessionCount(const std::string& sess_id, Duration time_window,
|
SessionCount(const std::string& sess_id, Duration time_window,
|
||||||
Duration granularity = Duration(std::chrono::seconds(1)));
|
Duration granularity = Duration(std::chrono::milliseconds(10)));
|
||||||
SessionCount(const SessionCount&) = delete;
|
SessionCount(const SessionCount&) = delete;
|
||||||
SessionCount& operator=(const SessionCount&) = delete;
|
SessionCount& operator=(const SessionCount&) = delete;
|
||||||
SessionCount(SessionCount &&); // can't be defaulted in gcc 4.4
|
SessionCount(SessionCount &&); // can't be defaulted in gcc 4.4
|
||||||
|
|||||||
@ -22,11 +22,11 @@
|
|||||||
namespace
|
namespace
|
||||||
{
|
{
|
||||||
const char* const MAX_QPS_CFG = "max_qps";
|
const char* const MAX_QPS_CFG = "max_qps";
|
||||||
const char* const TRIGGER_DURATION_CFG = "trigger_duration";
|
const char* const SAMPLING_DURATION_CFG = "sampling_duration";
|
||||||
const char* const THROTTLE_DURATION_CFG = "throttle_duration";
|
const char* const THROTTLE_DURATION_CFG = "throttling_duration";
|
||||||
|
const char* const CONTINUOUS_DURATION_CFG = "continuous_duration";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
extern "C" MXS_MODULE* MXS_CREATE_MODULE()
|
extern "C" MXS_MODULE* MXS_CREATE_MODULE()
|
||||||
{
|
{
|
||||||
static MXS_MODULE info =
|
static MXS_MODULE info =
|
||||||
@ -44,8 +44,9 @@ extern "C" MXS_MODULE* MXS_CREATE_MODULE()
|
|||||||
NULL, /* Thread finish. */
|
NULL, /* Thread finish. */
|
||||||
{
|
{
|
||||||
{MAX_QPS_CFG, MXS_MODULE_PARAM_INT},
|
{MAX_QPS_CFG, MXS_MODULE_PARAM_INT},
|
||||||
{TRIGGER_DURATION_CFG, MXS_MODULE_PARAM_INT},
|
{SAMPLING_DURATION_CFG, MXS_MODULE_PARAM_INT, "250"},
|
||||||
{THROTTLE_DURATION_CFG, MXS_MODULE_PARAM_INT},
|
{THROTTLE_DURATION_CFG, MXS_MODULE_PARAM_INT},
|
||||||
|
{CONTINUOUS_DURATION_CFG, MXS_MODULE_PARAM_INT, "2000"},
|
||||||
{ MXS_END_MODULE_PARAMS }
|
{ MXS_END_MODULE_PARAMS }
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -63,8 +64,9 @@ ThrottleFilter::ThrottleFilter(const ThrottleConfig &config) : m_config(config)
|
|||||||
ThrottleFilter * ThrottleFilter::create(const char* zName, char * * pzOptions, MXS_CONFIG_PARAMETER * pParams)
|
ThrottleFilter * ThrottleFilter::create(const char* zName, char * * pzOptions, MXS_CONFIG_PARAMETER * pParams)
|
||||||
{
|
{
|
||||||
int max_qps = config_get_integer(pParams, MAX_QPS_CFG);
|
int max_qps = config_get_integer(pParams, MAX_QPS_CFG);
|
||||||
int trigger_secs = config_get_integer(pParams, TRIGGER_DURATION_CFG);
|
int sample_msecs = config_get_integer(pParams, SAMPLING_DURATION_CFG);
|
||||||
int throttle_secs = config_get_integer(pParams, THROTTLE_DURATION_CFG);
|
int throttle_msecs = config_get_integer(pParams, THROTTLE_DURATION_CFG);
|
||||||
|
int cont_msecs = config_get_integer(pParams, CONTINUOUS_DURATION_CFG);
|
||||||
bool config_ok = true;
|
bool config_ok = true;
|
||||||
|
|
||||||
if (max_qps < 2)
|
if (max_qps < 2)
|
||||||
@ -73,29 +75,34 @@ ThrottleFilter * ThrottleFilter::create(const char* zName, char * * pzOptions, M
|
|||||||
config_ok = false;
|
config_ok = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (trigger_secs < 1)
|
if (sample_msecs < 0)
|
||||||
{
|
{
|
||||||
MXS_ERROR("Config value %s must be > 0", TRIGGER_DURATION_CFG);
|
MXS_ERROR("Config value %s must be >= 0", SAMPLING_DURATION_CFG);
|
||||||
config_ok = false;
|
config_ok = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (throttle_secs < 0)
|
if (throttle_msecs <= 0)
|
||||||
{
|
{
|
||||||
MXS_ERROR("Config value %s must be >= 0", THROTTLE_DURATION_CFG);
|
MXS_ERROR("Config value %s must be > 0", THROTTLE_DURATION_CFG);
|
||||||
config_ok = false;
|
config_ok = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
Duration trigger_duration {std::chrono::seconds(trigger_secs)};
|
if (cont_msecs < 0)
|
||||||
Duration throttle_duration {std::chrono::seconds(throttle_secs)};
|
{
|
||||||
|
MXS_ERROR("Config value %s must be >= 0", CONTINUOUS_DURATION_CFG);
|
||||||
|
config_ok = false;
|
||||||
|
}
|
||||||
|
|
||||||
ThrottleFilter* filter {NULL};
|
ThrottleFilter* filter {NULL};
|
||||||
if (config_ok)
|
if (config_ok)
|
||||||
{
|
{
|
||||||
ThrottleConfig config = {max_qps, trigger_duration, throttle_duration};
|
Duration sampling_duration {std::chrono::milliseconds(sample_msecs)};
|
||||||
|
Duration throttling_duration {std::chrono::milliseconds(throttle_msecs)};
|
||||||
|
Duration continuous_duration {std::chrono::milliseconds(cont_msecs)};
|
||||||
|
|
||||||
std::ostringstream os1, os2;
|
ThrottleConfig config = {max_qps, sampling_duration,
|
||||||
os1 << config.trigger_duration;
|
throttling_duration, continuous_duration
|
||||||
os2 << config.throttle_duration;
|
};
|
||||||
|
|
||||||
filter = new ThrottleFilter(config);
|
filter = new ThrottleFilter(config);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -27,18 +27,21 @@ struct ThrottleConfig
|
|||||||
{
|
{
|
||||||
|
|
||||||
int max_qps; // if this many queries per second is exceeded..
|
int max_qps; // if this many queries per second is exceeded..
|
||||||
Duration trigger_duration; // .. in this time window, then cap qps to max_qps ..
|
Duration sampling_duration; // .. in this time window, then cap qps to max_qps ..
|
||||||
Duration throttle_duration; // .. for this long before disconnect.
|
Duration throttling_duration; // .. for this long before disconnect.
|
||||||
// Example: max 100qps and trigger 5s. As soon as more than 500 queries happen in less than
|
Duration continuous_duration; // What time window is considered continuous meddling.
|
||||||
// 5s the action is triggered. So, 501 queries in one second is a trigger, but 400qps
|
|
||||||
// for one second is acceptable as long as the qps stayed low for the previous 4 seconds,
|
// Example: max 100qps and sampling 5s. As soon as more than 500 queries are made in less
|
||||||
// and stays low for the next 4 seconds. In other words, a short burst>max is not a trigger.
|
// then any 5s period throttling is triggered (because 501 > 100qps * 5 s). But also note
|
||||||
// Further, say max_throttle_duration is 60s. If the qps is continously capped (throttled)
|
// that qps can stay at 200qps for 2.5s before throttling starts.
|
||||||
// for 60s, the session is disconnected.
|
|
||||||
|
// Once throttling has started a countdown for the throttling_duration is started. Throttling
|
||||||
|
// is stopped if the qps stays below max_qps for continuous_duration. If throttling continues
|
||||||
|
// for more than throttling_duration, the session is disconnected.
|
||||||
|
|
||||||
// TODO: this should probably depend on overall activity. If this is to protect the
|
// TODO: this should probably depend on overall activity. If this is to protect the
|
||||||
// database, multiple sessions gone haywire will still cause problems. It would be quite
|
// database, multiple sessions gone haywire will still cause problems. It would be quite
|
||||||
// east to add a counter into the filter to measure overall qps. On the other hand, if
|
// easy to add a counter into the filter to measure overall qps. On the other hand, if
|
||||||
// a single session is active, it should be allowed to run at whatever the absolute
|
// a single session is active, it should be allowed to run at whatever the absolute
|
||||||
// allowable speed is.
|
// allowable speed is.
|
||||||
};
|
};
|
||||||
|
|||||||
@ -23,57 +23,102 @@
|
|||||||
#include <string>
|
#include <string>
|
||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
#include <sstream>
|
#include <sstream>
|
||||||
|
#include <cmath>
|
||||||
|
|
||||||
namespace throttle
|
namespace throttle
|
||||||
{
|
{
|
||||||
ThrottleSession::ThrottleSession(MXS_SESSION* mxsSession, ThrottleFilter &filter)
|
ThrottleSession::ThrottleSession(MXS_SESSION* mxsSession, ThrottleFilter &filter)
|
||||||
: maxscale::FilterSession(mxsSession),
|
: maxscale::FilterSession(mxsSession),
|
||||||
m_filter(filter),
|
m_filter(filter),
|
||||||
m_query_count("num-queries", filter.config().trigger_duration),
|
m_query_count("num-queries", filter.config().sampling_duration),
|
||||||
|
m_delayed_call_id(0),
|
||||||
m_state(State::MEASURING)
|
m_state(State::MEASURING)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
int ThrottleSession::routeQuery(GWBUF *buffer)
|
ThrottleSession::~ThrottleSession()
|
||||||
|
{
|
||||||
|
if (m_delayed_call_id)
|
||||||
|
{
|
||||||
|
maxscale::Worker* worker = maxscale::Worker::get_current();
|
||||||
|
ss_dassert(worker);
|
||||||
|
worker->cancel_delayed_call(m_delayed_call_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int ThrottleSession::real_routeQuery(GWBUF *buffer, bool is_delayed)
|
||||||
{
|
{
|
||||||
// TODO: count everything, or filter something out?
|
|
||||||
using namespace std::chrono;
|
using namespace std::chrono;
|
||||||
|
|
||||||
m_query_count.increment();
|
|
||||||
int count = m_query_count.count();
|
int count = m_query_count.count();
|
||||||
int secs = duration_cast<seconds>(m_filter.config().trigger_duration).count();
|
// not in g++ 4.4: duration<float>(x).count(), so
|
||||||
|
long micro = duration_cast<microseconds>(m_filter.config().sampling_duration).count();
|
||||||
|
float secs = micro / 1000000.0;
|
||||||
float qps = count / secs; // not instantaneous, but over so many seconds
|
float qps = count / secs; // not instantaneous, but over so many seconds
|
||||||
|
|
||||||
if (qps >= m_filter.config().max_qps) // trigger
|
if (!is_delayed && qps >= m_filter.config().max_qps) // trigger
|
||||||
{
|
{
|
||||||
// sleep a few cycles, keeping qps near max.
|
// delay the current routeQuery for at least one cycle at stated max speed.
|
||||||
usleep(4 * 1000000 / qps); // to be replaced with delayed calls
|
int32_t delay = 1 + std::ceil(1000.0 / m_filter.config().max_qps);
|
||||||
|
maxscale::Worker* worker = maxscale::Worker::get_current();
|
||||||
|
ss_dassert(worker);
|
||||||
|
m_delayed_call_id = worker->delayed_call(delay, &ThrottleSession::delayed_routeQuery,
|
||||||
|
this, buffer);
|
||||||
if (m_state == State::MEASURING)
|
if (m_state == State::MEASURING)
|
||||||
{
|
{
|
||||||
MXS_INFO("Query throttling STARTED session %ld user %s",
|
MXS_INFO("Query throttling STARTED session %ld user %s",
|
||||||
m_pSession->ses_id, m_pSession->client_dcb->user);
|
m_pSession->ses_id, m_pSession->client_dcb->user);
|
||||||
m_state = State::THROTTLING;
|
m_state = State::THROTTLING;
|
||||||
m_first_trigger.restart();
|
m_first_sample.restart();
|
||||||
}
|
}
|
||||||
m_last_trigger.restart();
|
|
||||||
|
m_last_sample.restart();
|
||||||
|
|
||||||
|
// Filter pipeline ok thus far, will continue after the delay
|
||||||
|
// from this point in the pipeline.
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
else if (m_state == State::THROTTLING)
|
else if (m_state == State::THROTTLING)
|
||||||
{
|
{
|
||||||
if (m_last_trigger.lap() > Duration(std::chrono::seconds(2))) // TODO, might be ok though.
|
if (m_last_sample.lap() > m_filter.config().continuous_duration)
|
||||||
{
|
{
|
||||||
m_state = State::MEASURING;
|
m_state = State::MEASURING;
|
||||||
MXS_INFO("Query throttling stopped session %ld user %s",
|
MXS_INFO("Query throttling stopped session %ld user %s",
|
||||||
m_pSession->ses_id, m_pSession->client_dcb->user);
|
m_pSession->ses_id, m_pSession->client_dcb->user);
|
||||||
}
|
}
|
||||||
else if (m_first_trigger.lap() > m_filter.config().throttle_duration)
|
else if (m_first_sample.lap() > m_filter.config().throttling_duration)
|
||||||
{
|
{
|
||||||
MXS_NOTICE("Session %ld user %s, qps throttling limit reached. Disconnect.",
|
MXS_NOTICE("Query throttling Session %ld user %s, throttling limit reached. Disconnect.",
|
||||||
m_pSession->ses_id, m_pSession->client_dcb->user);
|
m_pSession->ses_id, m_pSession->client_dcb->user);
|
||||||
return false; // disconnect
|
return false; // disconnect
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
m_query_count.increment();
|
||||||
|
|
||||||
return mxs::FilterSession::routeQuery(buffer);
|
return mxs::FilterSession::routeQuery(buffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool ThrottleSession::delayed_routeQuery(maxscale::Worker::Call::action_t action, GWBUF *buffer)
|
||||||
|
{
|
||||||
|
m_delayed_call_id = 0;
|
||||||
|
switch (action)
|
||||||
|
{
|
||||||
|
case maxscale::Worker::Call::EXECUTE:
|
||||||
|
real_routeQuery(buffer, true);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case maxscale::Worker::Call::CANCEL:
|
||||||
|
gwbuf_free(buffer);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
int ThrottleSession::routeQuery(GWBUF *buffer)
|
||||||
|
{
|
||||||
|
return real_routeQuery(buffer, false);
|
||||||
|
}
|
||||||
|
|
||||||
} // throttle
|
} // throttle
|
||||||
|
|||||||
@ -13,6 +13,7 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <maxscale/filter.hh>
|
#include <maxscale/filter.hh>
|
||||||
|
#include <maxscale/worker.hh>
|
||||||
#include "eventcount.hh"
|
#include "eventcount.hh"
|
||||||
|
|
||||||
namespace throttle
|
namespace throttle
|
||||||
@ -26,14 +27,18 @@ public:
|
|||||||
ThrottleSession(MXS_SESSION* pSession, ThrottleFilter& filter);
|
ThrottleSession(MXS_SESSION* pSession, ThrottleFilter& filter);
|
||||||
ThrottleSession(const ThrottleSession&) = delete;
|
ThrottleSession(const ThrottleSession&) = delete;
|
||||||
ThrottleSession& operator = (const ThrottleSession&) = delete;
|
ThrottleSession& operator = (const ThrottleSession&) = delete;
|
||||||
|
~ThrottleSession();
|
||||||
|
|
||||||
int routeQuery(GWBUF* buffer);
|
int routeQuery(GWBUF* buffer);
|
||||||
private:
|
private:
|
||||||
|
bool delayed_routeQuery(maxscale::Worker::Call::action_t action,
|
||||||
|
GWBUF* buffer);
|
||||||
|
int real_routeQuery(GWBUF* buffer, bool is_delayed);
|
||||||
ThrottleFilter& m_filter;
|
ThrottleFilter& m_filter;
|
||||||
EventCount m_query_count;
|
EventCount m_query_count;
|
||||||
StopWatch m_first_trigger;
|
StopWatch m_first_sample;
|
||||||
StopWatch m_last_trigger;
|
StopWatch m_last_sample;
|
||||||
StopWatch remove_me;
|
uint32_t m_delayed_call_id; // there can be only one in flight
|
||||||
|
|
||||||
enum class State {MEASURING, THROTTLING};
|
enum class State {MEASURING, THROTTLING};
|
||||||
State m_state;
|
State m_state;
|
||||||
|
|||||||
Reference in New Issue
Block a user