diff --git a/server/core/worker.cc b/server/core/worker.cc index 9e7e47a7d..ea1e1c0dd 100644 --- a/server/core/worker.cc +++ b/server/core/worker.cc @@ -135,7 +135,7 @@ uint64_t WorkerLoad::get_time() timespec t; - ss_debug(int rv=)clock_gettime(CLOCK_MONOTONIC, &t); + ss_debug(int rv = )clock_gettime(CLOCK_MONOTONIC, &t); ss_dassert(rv == 0); return t.tv_sec * 1000 + (t.tv_nsec / 1000000); @@ -233,7 +233,7 @@ WorkerTimer::~WorkerTimer() void WorkerTimer::start(int32_t interval) { - ss_dassert(interval > 0); + ss_dassert(interval >= 0); // TODO: Add possibility to set initial delay and interval. time_t initial_sec = interval / 1000; @@ -1126,7 +1126,7 @@ namespace int64_t get_current_time_ms() { struct timespec ts; - ss_debug(int rv =) clock_gettime(CLOCK_MONOTONIC, &ts); + ss_debug(int rv = ) clock_gettime(CLOCK_MONOTONIC, &ts); ss_dassert(rv == 0); return ts.tv_sec * 1000 + ts.tv_nsec / 1000000; diff --git a/server/modules/filter/throttlefilter/eventcount.hh b/server/modules/filter/throttlefilter/eventcount.hh index fc7c7a5ed..b9725c795 100644 --- a/server/modules/filter/throttlefilter/eventcount.hh +++ b/server/modules/filter/throttlefilter/eventcount.hh @@ -33,7 +33,7 @@ class EventCount { public: explicit EventCount(const std::string& event_id, Duration time_window, - Duration granularity = Duration(std::chrono::milliseconds(100))); + Duration granularity = Duration(std::chrono::milliseconds(10))); EventCount(const EventCount&) = delete; EventCount& operator=(const EventCount&) = delete; EventCount(EventCount&&); // can't be defaulted in gcc 4.4 @@ -74,7 +74,7 @@ class SessionCount { public: SessionCount(const std::string& sess_id, Duration time_window, - Duration granularity = Duration(std::chrono::seconds(1))); + Duration granularity = Duration(std::chrono::milliseconds(10))); SessionCount(const SessionCount&) = delete; SessionCount& operator=(const SessionCount&) = delete; SessionCount(SessionCount &&); // can't be defaulted in gcc 4.4 diff --git a/server/modules/filter/throttlefilter/throttlefilter.cc b/server/modules/filter/throttlefilter/throttlefilter.cc index d999147d7..2afef0ad5 100644 --- a/server/modules/filter/throttlefilter/throttlefilter.cc +++ b/server/modules/filter/throttlefilter/throttlefilter.cc @@ -22,11 +22,11 @@ namespace { const char* const MAX_QPS_CFG = "max_qps"; -const char* const TRIGGER_DURATION_CFG = "trigger_duration"; -const char* const THROTTLE_DURATION_CFG = "throttle_duration"; +const char* const SAMPLING_DURATION_CFG = "sampling_duration"; +const char* const THROTTLE_DURATION_CFG = "throttling_duration"; +const char* const CONTINUOUS_DURATION_CFG = "continuous_duration"; } - extern "C" MXS_MODULE* MXS_CREATE_MODULE() { static MXS_MODULE info = @@ -44,8 +44,9 @@ extern "C" MXS_MODULE* MXS_CREATE_MODULE() NULL, /* Thread finish. */ { {MAX_QPS_CFG, MXS_MODULE_PARAM_INT}, - {TRIGGER_DURATION_CFG, MXS_MODULE_PARAM_INT}, + {SAMPLING_DURATION_CFG, MXS_MODULE_PARAM_INT, "250"}, {THROTTLE_DURATION_CFG, MXS_MODULE_PARAM_INT}, + {CONTINUOUS_DURATION_CFG, MXS_MODULE_PARAM_INT, "2000"}, { MXS_END_MODULE_PARAMS } } }; @@ -62,10 +63,11 @@ ThrottleFilter::ThrottleFilter(const ThrottleConfig &config) : m_config(config) ThrottleFilter * ThrottleFilter::create(const char* zName, char * * pzOptions, MXS_CONFIG_PARAMETER * pParams) { - int max_qps = config_get_integer(pParams, MAX_QPS_CFG); - int trigger_secs = config_get_integer(pParams, TRIGGER_DURATION_CFG); - int throttle_secs = config_get_integer(pParams, THROTTLE_DURATION_CFG); - bool config_ok = true; + int max_qps = config_get_integer(pParams, MAX_QPS_CFG); + int sample_msecs = config_get_integer(pParams, SAMPLING_DURATION_CFG); + int throttle_msecs = config_get_integer(pParams, THROTTLE_DURATION_CFG); + int cont_msecs = config_get_integer(pParams, CONTINUOUS_DURATION_CFG); + bool config_ok = true; if (max_qps < 2) { @@ -73,29 +75,34 @@ ThrottleFilter * ThrottleFilter::create(const char* zName, char * * pzOptions, M config_ok = false; } - if (trigger_secs < 1) + if (sample_msecs < 0) { - MXS_ERROR("Config value %s must be > 0", TRIGGER_DURATION_CFG); + MXS_ERROR("Config value %s must be >= 0", SAMPLING_DURATION_CFG); config_ok = false; } - if (throttle_secs < 0) + if (throttle_msecs <= 0) { - MXS_ERROR("Config value %s must be >= 0", THROTTLE_DURATION_CFG); + MXS_ERROR("Config value %s must be > 0", THROTTLE_DURATION_CFG); config_ok = false; } - Duration trigger_duration {std::chrono::seconds(trigger_secs)}; - Duration throttle_duration {std::chrono::seconds(throttle_secs)}; + if (cont_msecs < 0) + { + MXS_ERROR("Config value %s must be >= 0", CONTINUOUS_DURATION_CFG); + config_ok = false; + } ThrottleFilter* filter {NULL}; if (config_ok) { - ThrottleConfig config = {max_qps, trigger_duration, throttle_duration}; + Duration sampling_duration {std::chrono::milliseconds(sample_msecs)}; + Duration throttling_duration {std::chrono::milliseconds(throttle_msecs)}; + Duration continuous_duration {std::chrono::milliseconds(cont_msecs)}; - std::ostringstream os1, os2; - os1 << config.trigger_duration; - os2 << config.throttle_duration; + ThrottleConfig config = {max_qps, sampling_duration, + throttling_duration, continuous_duration + }; filter = new ThrottleFilter(config); } diff --git a/server/modules/filter/throttlefilter/throttlefilter.hh b/server/modules/filter/throttlefilter/throttlefilter.hh index 3b8430796..5d9af1b4b 100644 --- a/server/modules/filter/throttlefilter/throttlefilter.hh +++ b/server/modules/filter/throttlefilter/throttlefilter.hh @@ -26,19 +26,22 @@ namespace throttle struct ThrottleConfig { - int max_qps; // if this many queries per second is exceeded.. - Duration trigger_duration; // .. in this time window, then cap qps to max_qps .. - Duration throttle_duration; // .. for this long before disconnect. - // Example: max 100qps and trigger 5s. As soon as more than 500 queries happen in less than - // 5s the action is triggered. So, 501 queries in one second is a trigger, but 400qps - // for one second is acceptable as long as the qps stayed low for the previous 4 seconds, - // and stays low for the next 4 seconds. In other words, a short burst>max is not a trigger. - // Further, say max_throttle_duration is 60s. If the qps is continously capped (throttled) - // for 60s, the session is disconnected. + int max_qps; // if this many queries per second is exceeded.. + Duration sampling_duration; // .. in this time window, then cap qps to max_qps .. + Duration throttling_duration; // .. for this long before disconnect. + Duration continuous_duration; // What time window is considered continuous meddling. + + // Example: max 100qps and sampling 5s. As soon as more than 500 queries are made in less + // then any 5s period throttling is triggered (because 501 > 100qps * 5 s). But also note + // that qps can stay at 200qps for 2.5s before throttling starts. + + // Once throttling has started a countdown for the throttling_duration is started. Throttling + // is stopped if the qps stays below max_qps for continuous_duration. If throttling continues + // for more than throttling_duration, the session is disconnected. // TODO: this should probably depend on overall activity. If this is to protect the // database, multiple sessions gone haywire will still cause problems. It would be quite - // east to add a counter into the filter to measure overall qps. On the other hand, if + // easy to add a counter into the filter to measure overall qps. On the other hand, if // a single session is active, it should be allowed to run at whatever the absolute // allowable speed is. }; diff --git a/server/modules/filter/throttlefilter/throttlesession.cc b/server/modules/filter/throttlefilter/throttlesession.cc index b70f4b3bc..6b8d99836 100644 --- a/server/modules/filter/throttlefilter/throttlesession.cc +++ b/server/modules/filter/throttlefilter/throttlesession.cc @@ -23,57 +23,102 @@ #include #include #include +#include namespace throttle { ThrottleSession::ThrottleSession(MXS_SESSION* mxsSession, ThrottleFilter &filter) : maxscale::FilterSession(mxsSession), m_filter(filter), - m_query_count("num-queries", filter.config().trigger_duration), + m_query_count("num-queries", filter.config().sampling_duration), + m_delayed_call_id(0), m_state(State::MEASURING) { } -int ThrottleSession::routeQuery(GWBUF *buffer) +ThrottleSession::~ThrottleSession() +{ + if (m_delayed_call_id) + { + maxscale::Worker* worker = maxscale::Worker::get_current(); + ss_dassert(worker); + worker->cancel_delayed_call(m_delayed_call_id); + } +} + +int ThrottleSession::real_routeQuery(GWBUF *buffer, bool is_delayed) { - // TODO: count everything, or filter something out? using namespace std::chrono; - m_query_count.increment(); - int count = m_query_count.count(); - int secs = duration_cast(m_filter.config().trigger_duration).count(); - float qps = count / secs; // not instantaneous, but over so many seconds + int count = m_query_count.count(); + // not in g++ 4.4: duration(x).count(), so + long micro = duration_cast(m_filter.config().sampling_duration).count(); + float secs = micro / 1000000.0; + float qps = count / secs; // not instantaneous, but over so many seconds - if (qps >= m_filter.config().max_qps) // trigger + if (!is_delayed && qps >= m_filter.config().max_qps) // trigger { - // sleep a few cycles, keeping qps near max. - usleep(4 * 1000000 / qps); // to be replaced with delayed calls - + // delay the current routeQuery for at least one cycle at stated max speed. + int32_t delay = 1 + std::ceil(1000.0 / m_filter.config().max_qps); + maxscale::Worker* worker = maxscale::Worker::get_current(); + ss_dassert(worker); + m_delayed_call_id = worker->delayed_call(delay, &ThrottleSession::delayed_routeQuery, + this, buffer); if (m_state == State::MEASURING) { MXS_INFO("Query throttling STARTED session %ld user %s", m_pSession->ses_id, m_pSession->client_dcb->user); m_state = State::THROTTLING; - m_first_trigger.restart(); + m_first_sample.restart(); } - m_last_trigger.restart(); + + m_last_sample.restart(); + + // Filter pipeline ok thus far, will continue after the delay + // from this point in the pipeline. + return true; } else if (m_state == State::THROTTLING) { - if (m_last_trigger.lap() > Duration(std::chrono::seconds(2))) // TODO, might be ok though. + if (m_last_sample.lap() > m_filter.config().continuous_duration) { m_state = State::MEASURING; MXS_INFO("Query throttling stopped session %ld user %s", m_pSession->ses_id, m_pSession->client_dcb->user); } - else if (m_first_trigger.lap() > m_filter.config().throttle_duration) + else if (m_first_sample.lap() > m_filter.config().throttling_duration) { - MXS_NOTICE("Session %ld user %s, qps throttling limit reached. Disconnect.", + MXS_NOTICE("Query throttling Session %ld user %s, throttling limit reached. Disconnect.", m_pSession->ses_id, m_pSession->client_dcb->user); return false; // disconnect } } + m_query_count.increment(); + return mxs::FilterSession::routeQuery(buffer); } + +bool ThrottleSession::delayed_routeQuery(maxscale::Worker::Call::action_t action, GWBUF *buffer) +{ + m_delayed_call_id = 0; + switch (action) + { + case maxscale::Worker::Call::EXECUTE: + real_routeQuery(buffer, true); + break; + + case maxscale::Worker::Call::CANCEL: + gwbuf_free(buffer); + break; + } + + return false; +} + +int ThrottleSession::routeQuery(GWBUF *buffer) +{ + return real_routeQuery(buffer, false); +} + } // throttle diff --git a/server/modules/filter/throttlefilter/throttlesession.hh b/server/modules/filter/throttlefilter/throttlesession.hh index 698a2109f..5107b8678 100644 --- a/server/modules/filter/throttlefilter/throttlesession.hh +++ b/server/modules/filter/throttlefilter/throttlesession.hh @@ -13,6 +13,7 @@ #pragma once #include +#include #include "eventcount.hh" namespace throttle @@ -26,14 +27,18 @@ public: ThrottleSession(MXS_SESSION* pSession, ThrottleFilter& filter); ThrottleSession(const ThrottleSession&) = delete; ThrottleSession& operator = (const ThrottleSession&) = delete; + ~ThrottleSession(); int routeQuery(GWBUF* buffer); private: + bool delayed_routeQuery(maxscale::Worker::Call::action_t action, + GWBUF* buffer); + int real_routeQuery(GWBUF* buffer, bool is_delayed); ThrottleFilter& m_filter; EventCount m_query_count; - StopWatch m_first_trigger; - StopWatch m_last_trigger; - StopWatch remove_me; + StopWatch m_first_sample; + StopWatch m_last_sample; + uint32_t m_delayed_call_id; // there can be only one in flight enum class State {MEASURING, THROTTLING}; State m_state;