From 6351ab9c73ae96a6104eb4fc781b59df1aa962d5 Mon Sep 17 00:00:00 2001 From: Niclas Antti Date: Fri, 27 Jul 2018 13:07:18 +0300 Subject: [PATCH] MXS-1777: Initial version of routing based on query response time. The main piece of code, slave selection (backend_cmp_response_time), uses the available method of pair-wise comparison of slaves. This will be changed to selection using all available slaves, along with removal of hard coded values. --- server/core/server.cc | 11 ++- server/modules/routing/cat/CMakeLists.txt | 2 +- .../routing/readwritesplit/CMakeLists.txt | 4 +- .../routing/readwritesplit/response_stat.cc | 94 +++++++++++++++++++ .../routing/readwritesplit/response_stat.hh | 61 ++++++++++++ .../routing/readwritesplit/rwbackend.cc | 5 + .../routing/readwritesplit/rwbackend.hh | 5 +- .../readwritesplit/rwsplit_route_stmt.cc | 12 ++- .../readwritesplit/rwsplit_select_backends.cc | 44 ++++++++- .../routing/readwritesplit/rwsplitsession.cc | 22 +++++ 10 files changed, 250 insertions(+), 10 deletions(-) create mode 100644 server/modules/routing/readwritesplit/response_stat.cc create mode 100644 server/modules/routing/readwritesplit/response_stat.hh diff --git a/server/core/server.cc b/server/core/server.cc index 75505e779..b51e05f65 100644 --- a/server/core/server.cc +++ b/server/core/server.cc @@ -29,6 +29,7 @@ #include #include #include +#include #include @@ -1508,9 +1509,15 @@ bool server_set_disk_space_threshold(SERVER *server, const char *disk_space_thre return rv; } +namespace +{ +// Only need to prevent multiple writes, as long as only the average is read where +// needed (and not in combination with num_samples), which is by design. +std::mutex add_response_mutex; +} + void server_add_response_average(SERVER *server, double ave, int num_samples) { - spinlock_acquire(&server->lock); + std::lock_guard lock(add_response_mutex); server->response_time->add(ave, num_samples); - spinlock_release(&server->lock); } diff --git a/server/modules/routing/cat/CMakeLists.txt b/server/modules/routing/cat/CMakeLists.txt index 2183bc598..72106aa56 100644 --- a/server/modules/routing/cat/CMakeLists.txt +++ b/server/modules/routing/cat/CMakeLists.txt @@ -1,4 +1,4 @@ -add_library(cat SHARED cat.cc catsession.cc ../readwritesplit/rwbackend.cc) +add_library(cat SHARED cat.cc catsession.cc ../readwritesplit/rwbackend.cc ../readwritesplit/response_stat.cc) target_link_libraries(cat maxscale-common mysqlcommon) set_target_properties(cat PROPERTIES VERSION "1.0.0" LINK_FLAGS -Wl,-z,defs) install_module(cat experimental) diff --git a/server/modules/routing/readwritesplit/CMakeLists.txt b/server/modules/routing/readwritesplit/CMakeLists.txt index 031633058..53b583ae8 100644 --- a/server/modules/routing/readwritesplit/CMakeLists.txt +++ b/server/modules/routing/readwritesplit/CMakeLists.txt @@ -5,7 +5,9 @@ rwsplit_mysql.cc rwsplit_route_stmt.cc rwsplit_select_backends.cc rwsplit_session_cmd.cc -rwbackend.cc) +rwbackend.cc +response_stat.cc +) target_link_libraries(readwritesplit maxscale-common mysqlcommon) set_target_properties(readwritesplit PROPERTIES VERSION "1.0.2" LINK_FLAGS -Wl,-z,defs) install_module(readwritesplit core) diff --git a/server/modules/routing/readwritesplit/response_stat.cc b/server/modules/routing/readwritesplit/response_stat.cc new file mode 100644 index 000000000..89cef4d91 --- /dev/null +++ b/server/modules/routing/readwritesplit/response_stat.cc @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2022-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ +#include "response_stat.hh" + +#include + +namespace maxscale +{ +ResponseStat::ResponseStat(int ignore_first_n, int num_filter_samples, + maxbase::Duration sync_duration) : + m_ignore_first_n{ignore_first_n}, + m_num_filter_samples {num_filter_samples}, + m_sync_duration{sync_duration}, + m_sample_count{0}, + m_samples(num_filter_samples), + m_last_start{maxbase::TimePoint()}, + m_next_sync{maxbase::Clock::now() + sync_duration} +{} + + +void ResponseStat::query_started() +{ + if (m_ignore_first_n) + { + --m_ignore_first_n; + return; + } + m_last_start = maxbase::Clock::now(); +} + +void ResponseStat::query_ended() +{ + if (m_last_start == maxbase::TimePoint()) + { + // m_last_start is defaulted. Ignore, avoids extra logic in call sites. + return; + } + m_samples[m_sample_count] = maxbase::Clock::now() - m_last_start; + + if (++m_sample_count == m_num_filter_samples) + { + std::sort(begin(m_samples), end(m_samples)); + maxbase::Duration new_sample = m_samples[m_num_filter_samples / 2]; + m_average.add(std::chrono::duration(new_sample).count()); + m_sample_count = 0; + } + m_last_start = maxbase::TimePoint(); +} + +bool ResponseStat::is_valid() const +{ + return m_average.num_samples(); +} + +int ResponseStat::num_samples() const +{ + return m_average.num_samples(); +} + +maxbase::Duration ResponseStat::average() const +{ + return maxbase::Duration(m_average.average()); +} + +bool ResponseStat::sync_time_reached(int num_synch_medians) +{ + auto now = maxbase::Clock::now(); + bool reached = m_next_sync < now || m_average.num_samples() >= num_synch_medians; + if (reached) + { + m_next_sync = now + m_sync_duration; + } + return reached; +} + +void ResponseStat::reset() +{ + m_sample_count = 0; + m_average.reset(); + m_next_sync = maxbase::Clock::now() + m_sync_duration; +} + +} + diff --git a/server/modules/routing/readwritesplit/response_stat.hh b/server/modules/routing/readwritesplit/response_stat.hh new file mode 100644 index 000000000..faa6295a6 --- /dev/null +++ b/server/modules/routing/readwritesplit/response_stat.hh @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2022-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ +#pragma once + +#include + +#include +#include + +/** This could arguably be a utility, but is written specifically for rwsplit + * so it stays here at least for now. + */ +namespace maxscale +{ + +/** + * Query response statistics. Uses median of N samples to filter noise, then + * uses those medians to calculate the average response time. + * The class makes an average of the durations between calls to query_started() + * and query_ended(). Once the stats are good, sync_time_reached(int max) returns true, + * based on the average containing at least max samples (or medians), or the time + * sync_duration (constructor arg) has passed since the last reset(). + */ +class ResponseStat +{ +public: + /* @param ignore_first_n - the first few queries tend to have more overhead + * @param n_filter_samples - collect num samples, use median + * @param num_synch_samples - this many medians before the average should be synced, or + * @param sync_duration - this much time between syncs. + */ + ResponseStat(int ignore_first_n = 5, int num_filter_samples = 3, + maxbase::Duration sync_duration = std::chrono::seconds(5)); + void query_started(); + void query_ended(); // ok to call without a query_started + bool is_valid() const; + int num_samples() const; + maxbase::Duration average() const; + bool sync_time_reached(int num_synch_medians); // is it time to apply the average? + void reset(); +private: + int m_ignore_first_n; + const int m_num_filter_samples; + const maxbase::Duration m_sync_duration; + int m_sample_count; + std::vector m_samples; // N sampels from which median is used + maxbase::CumulativeAverage m_average; + maxbase::TimePoint m_last_start; + maxbase::TimePoint m_next_sync; +}; +} diff --git a/server/modules/routing/readwritesplit/rwbackend.cc b/server/modules/routing/readwritesplit/rwbackend.cc index 5cc6575db..f778e4746 100644 --- a/server/modules/routing/readwritesplit/rwbackend.cc +++ b/server/modules/routing/readwritesplit/rwbackend.cc @@ -230,6 +230,11 @@ bool RWBackend::reply_is_complete(GWBUF *buffer) return get_reply_state() == REPLY_STATE_DONE; } +ResponseStat &RWBackend::response_stat() +{ + return m_response_stat; +} + SRWBackendList RWBackend::from_servers(SERVER_REF* servers) { SRWBackendList backends; diff --git a/server/modules/routing/readwritesplit/rwbackend.hh b/server/modules/routing/readwritesplit/rwbackend.hh index 797c097ce..6d79434da 100644 --- a/server/modules/routing/readwritesplit/rwbackend.hh +++ b/server/modules/routing/readwritesplit/rwbackend.hh @@ -12,7 +12,7 @@ */ #pragma once -#include +#include "response_stat.hh" #include #include @@ -112,6 +112,8 @@ public: bool reply_is_complete(GWBUF *buffer); + // Controlled by the session + ResponseStat& response_stat(); private: reply_state_t m_reply_state; BackendHandleMap m_ps_handles; /**< Internal ID to backend PS handle mapping */ @@ -120,6 +122,7 @@ private: bool m_opening_cursor; /**< Whether we are opening a cursor */ uint32_t m_expected_rows; /**< Number of rows a COM_STMT_FETCH is retrieving */ bool m_local_infile_requested; /**< Whether a LOCAL INFILE was requested */ + ResponseStat m_response_stat; inline bool is_opening_cursor() const { diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc index d402df90b..c81c9ecf7 100644 --- a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc @@ -294,11 +294,15 @@ bool RWSplitSession::route_single_stmt(GWBUF *querybuf) { succp = true; - if (m_config.retry_failed_reads && - (command == MXS_COM_QUERY || command == MXS_COM_STMT_EXECUTE)) + bool is_sql = command == MXS_COM_QUERY || command == MXS_COM_STMT_EXECUTE; + if (is_sql) { - // Only commands that can contain an SQL statement should be stored - store_stmt = true; + target->response_stat().query_started(); + + if (m_config.retry_failed_reads) + { + store_stmt = true; + } } } } diff --git a/server/modules/routing/readwritesplit/rwsplit_select_backends.cc b/server/modules/routing/readwritesplit/rwsplit_select_backends.cc index d455c8c1e..c2d94528e 100644 --- a/server/modules/routing/readwritesplit/rwsplit_select_backends.cc +++ b/server/modules/routing/readwritesplit/rwsplit_select_backends.cc @@ -18,7 +18,9 @@ #include #include #include +#include +#include #include using namespace maxscale; @@ -179,6 +181,34 @@ static int backend_cmp_current_load(const SRWBackend& a, const SRWBackend& b) ((1000 + 1000 * second->server->stats.n_current_ops) / second->weight); } + +/** nantti. TODO. TEMP, this needs to see all eligible servers at the same time. + */ +static int backend_cmp_response_time(const SRWBackend& a, const SRWBackend& b) +{ + // Minimum average response time for use in selection. Avoids special cases (zero), + // and new servers immediately get some traffic. + constexpr double min_average = 100.0/1000000000; // 100 nano seconds + + // Invert the response times. + double lhs = 1/std::max(min_average, a->backend()->server->response_time->average()); + double rhs = 1/std::max(min_average, b->backend()->server->response_time->average()); + + // Clamp values to a range where the slowest is at least some fraction of the speed of the + // fastest. This allows sampling of slaves that have experienced anomalies. Also, if one + // slave is really slow compared to another, something is wrong and perhaps we should + // log something informational. + constexpr int clamp = 20; + double fastest = std::max(lhs, rhs); + lhs = std::max(lhs, fastest / clamp); + rhs = std::max(rhs, fastest / clamp); + + // If random numbers are too slow to generate, an array of, say 500'000 + // random numbers in the range [0.0, 1.0] could be generated during startup. + double r = rand() / static_cast(RAND_MAX); + return (r < (lhs / (lhs + rhs))) ? -1 : 1; +} + /** * The order of functions _must_ match with the order the select criteria are * listed in select_criteria_t definition in readwritesplit.h @@ -189,7 +219,8 @@ int (*criteria_cmpfun[LAST_CRITERIA])(const SRWBackend&, const SRWBackend&) = backend_cmp_global_conn, backend_cmp_router_conn, backend_cmp_behind_master, - backend_cmp_current_load + backend_cmp_current_load, + backend_cmp_response_time }; /** @@ -234,6 +265,17 @@ static void log_server_connections(select_criteria_t criteria, const SRWBackendL b->server->port, STRSRVSTATUS(b->server)); break; + case LOWEST_RESPONSE_TIME: + { + maxbase::Duration response_ave(b->server->response_time->average()); + std::ostringstream os; + os << response_ave; + MXS_INFO("Average response time : %s from \t[%s]:%d %s", + os.str().c_str(), b->server->address, + b->server->port, STRSRVSTATUS(b->server)); + } + break; + default: mxb_assert(!true); break; diff --git a/server/modules/routing/readwritesplit/rwsplitsession.cc b/server/modules/routing/readwritesplit/rwsplitsession.cc index 6254d6ab0..4c20ae215 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.cc +++ b/server/modules/routing/readwritesplit/rwsplitsession.cc @@ -97,6 +97,18 @@ void RWSplitSession::close() { close_all_connections(m_backends); m_current_query.reset(); + + for (auto& backend : m_backends) + { + const ResponseStat& stat = backend->response_stat(); + if (stat.is_valid()) + { + server_add_response_average(backend->server(), + stat.average().secs(), stat.num_samples()); + } + backend->response_stat().reset(); + } + } int32_t RWSplitSession::routeQuery(GWBUF* querybuf) @@ -613,6 +625,16 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb) m_can_replay_trx = true; } + + ResponseStat& stat = backend->response_stat(); + stat.query_ended(); + if (stat.is_valid() && stat.sync_time_reached(500)) // nantti, TODO + { + server_add_response_average(backend->server(), + stat.average().secs(), stat.num_samples()); + stat.reset(); + } + if (backend->in_use() && backend->has_session_commands()) { // Backend is still in use and has more session commands to execute