MXS-1777: Initial version of routing based on query response time.

The main piece of code, slave selection (backend_cmp_response_time), uses the available
method of pair-wise comparison of slaves. This will be changed to selection using all
available slaves, along with removal of hard coded values.
This commit is contained in:
Niclas Antti 2018-07-27 13:07:18 +03:00
parent 1e6509423a
commit 6351ab9c73
10 changed files with 250 additions and 10 deletions

View File

@ -29,6 +29,7 @@
#include <list>
#include <mutex>
#include <sstream>
#include <mutex>
#include <maxbase/stopwatch.hh>
@ -1508,9 +1509,15 @@ bool server_set_disk_space_threshold(SERVER *server, const char *disk_space_thre
return rv;
}
namespace
{
// Only need to prevent multiple writes, as long as only the average is read where
// needed (and not in combination with num_samples), which is by design.
std::mutex add_response_mutex;
}
void server_add_response_average(SERVER *server, double ave, int num_samples)
{
spinlock_acquire(&server->lock);
std::lock_guard<std::mutex> lock(add_response_mutex);
server->response_time->add(ave, num_samples);
spinlock_release(&server->lock);
}

View File

@ -1,4 +1,4 @@
add_library(cat SHARED cat.cc catsession.cc ../readwritesplit/rwbackend.cc)
add_library(cat SHARED cat.cc catsession.cc ../readwritesplit/rwbackend.cc ../readwritesplit/response_stat.cc)
target_link_libraries(cat maxscale-common mysqlcommon)
set_target_properties(cat PROPERTIES VERSION "1.0.0" LINK_FLAGS -Wl,-z,defs)
install_module(cat experimental)

View File

@ -5,7 +5,9 @@ rwsplit_mysql.cc
rwsplit_route_stmt.cc
rwsplit_select_backends.cc
rwsplit_session_cmd.cc
rwbackend.cc)
rwbackend.cc
response_stat.cc
)
target_link_libraries(readwritesplit maxscale-common mysqlcommon)
set_target_properties(readwritesplit PROPERTIES VERSION "1.0.2" LINK_FLAGS -Wl,-z,defs)
install_module(readwritesplit core)

View File

@ -0,0 +1,94 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2022-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "response_stat.hh"
#include <algorithm>
namespace maxscale
{
ResponseStat::ResponseStat(int ignore_first_n, int num_filter_samples,
maxbase::Duration sync_duration) :
m_ignore_first_n{ignore_first_n},
m_num_filter_samples {num_filter_samples},
m_sync_duration{sync_duration},
m_sample_count{0},
m_samples(num_filter_samples),
m_last_start{maxbase::TimePoint()},
m_next_sync{maxbase::Clock::now() + sync_duration}
{}
void ResponseStat::query_started()
{
if (m_ignore_first_n)
{
--m_ignore_first_n;
return;
}
m_last_start = maxbase::Clock::now();
}
void ResponseStat::query_ended()
{
if (m_last_start == maxbase::TimePoint())
{
// m_last_start is defaulted. Ignore, avoids extra logic in call sites.
return;
}
m_samples[m_sample_count] = maxbase::Clock::now() - m_last_start;
if (++m_sample_count == m_num_filter_samples)
{
std::sort(begin(m_samples), end(m_samples));
maxbase::Duration new_sample = m_samples[m_num_filter_samples / 2];
m_average.add(std::chrono::duration<double>(new_sample).count());
m_sample_count = 0;
}
m_last_start = maxbase::TimePoint();
}
bool ResponseStat::is_valid() const
{
return m_average.num_samples();
}
int ResponseStat::num_samples() const
{
return m_average.num_samples();
}
maxbase::Duration ResponseStat::average() const
{
return maxbase::Duration(m_average.average());
}
bool ResponseStat::sync_time_reached(int num_synch_medians)
{
auto now = maxbase::Clock::now();
bool reached = m_next_sync < now || m_average.num_samples() >= num_synch_medians;
if (reached)
{
m_next_sync = now + m_sync_duration;
}
return reached;
}
void ResponseStat::reset()
{
m_sample_count = 0;
m_average.reset();
m_next_sync = maxbase::Clock::now() + m_sync_duration;
}
}

View File

@ -0,0 +1,61 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2022-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#pragma once
#include <maxscale/cppdefs.hh>
#include <maxbase/stopwatch.hh>
#include <maxbase/average.hh>
/** This could arguably be a utility, but is written specifically for rwsplit
* so it stays here at least for now.
*/
namespace maxscale
{
/**
* Query response statistics. Uses median of N samples to filter noise, then
* uses those medians to calculate the average response time.
* The class makes an average of the durations between calls to query_started()
* and query_ended(). Once the stats are good, sync_time_reached(int max) returns true,
* based on the average containing at least max samples (or medians), or the time
* sync_duration (constructor arg) has passed since the last reset().
*/
class ResponseStat
{
public:
/* @param ignore_first_n - the first few queries tend to have more overhead
* @param n_filter_samples - collect num samples, use median
* @param num_synch_samples - this many medians before the average should be synced, or
* @param sync_duration - this much time between syncs.
*/
ResponseStat(int ignore_first_n = 5, int num_filter_samples = 3,
maxbase::Duration sync_duration = std::chrono::seconds(5));
void query_started();
void query_ended(); // ok to call without a query_started
bool is_valid() const;
int num_samples() const;
maxbase::Duration average() const;
bool sync_time_reached(int num_synch_medians); // is it time to apply the average?
void reset();
private:
int m_ignore_first_n;
const int m_num_filter_samples;
const maxbase::Duration m_sync_duration;
int m_sample_count;
std::vector<maxbase::Duration> m_samples; // N sampels from which median is used
maxbase::CumulativeAverage m_average;
maxbase::TimePoint m_last_start;
maxbase::TimePoint m_next_sync;
};
}

View File

@ -230,6 +230,11 @@ bool RWBackend::reply_is_complete(GWBUF *buffer)
return get_reply_state() == REPLY_STATE_DONE;
}
ResponseStat &RWBackend::response_stat()
{
return m_response_stat;
}
SRWBackendList RWBackend::from_servers(SERVER_REF* servers)
{
SRWBackendList backends;

View File

@ -12,7 +12,7 @@
*/
#pragma once
#include <maxscale/ccdefs.hh>
#include "response_stat.hh"
#include <map>
#include <memory>
@ -112,6 +112,8 @@ public:
bool reply_is_complete(GWBUF *buffer);
// Controlled by the session
ResponseStat& response_stat();
private:
reply_state_t m_reply_state;
BackendHandleMap m_ps_handles; /**< Internal ID to backend PS handle mapping */
@ -120,6 +122,7 @@ private:
bool m_opening_cursor; /**< Whether we are opening a cursor */
uint32_t m_expected_rows; /**< Number of rows a COM_STMT_FETCH is retrieving */
bool m_local_infile_requested; /**< Whether a LOCAL INFILE was requested */
ResponseStat m_response_stat;
inline bool is_opening_cursor() const
{

View File

@ -294,11 +294,15 @@ bool RWSplitSession::route_single_stmt(GWBUF *querybuf)
{
succp = true;
if (m_config.retry_failed_reads &&
(command == MXS_COM_QUERY || command == MXS_COM_STMT_EXECUTE))
bool is_sql = command == MXS_COM_QUERY || command == MXS_COM_STMT_EXECUTE;
if (is_sql)
{
// Only commands that can contain an SQL statement should be stored
store_stmt = true;
target->response_stat().query_started();
if (m_config.retry_failed_reads)
{
store_stmt = true;
}
}
}
}

View File

@ -18,7 +18,9 @@
#include <string.h>
#include <stdlib.h>
#include <stdint.h>
#include <sstream>
#include <maxbase/stopwatch.hh>
#include <maxscale/router.h>
using namespace maxscale;
@ -179,6 +181,34 @@ static int backend_cmp_current_load(const SRWBackend& a, const SRWBackend& b)
((1000 + 1000 * second->server->stats.n_current_ops) / second->weight);
}
/** nantti. TODO. TEMP, this needs to see all eligible servers at the same time.
*/
static int backend_cmp_response_time(const SRWBackend& a, const SRWBackend& b)
{
// Minimum average response time for use in selection. Avoids special cases (zero),
// and new servers immediately get some traffic.
constexpr double min_average = 100.0/1000000000; // 100 nano seconds
// Invert the response times.
double lhs = 1/std::max(min_average, a->backend()->server->response_time->average());
double rhs = 1/std::max(min_average, b->backend()->server->response_time->average());
// Clamp values to a range where the slowest is at least some fraction of the speed of the
// fastest. This allows sampling of slaves that have experienced anomalies. Also, if one
// slave is really slow compared to another, something is wrong and perhaps we should
// log something informational.
constexpr int clamp = 20;
double fastest = std::max(lhs, rhs);
lhs = std::max(lhs, fastest / clamp);
rhs = std::max(rhs, fastest / clamp);
// If random numbers are too slow to generate, an array of, say 500'000
// random numbers in the range [0.0, 1.0] could be generated during startup.
double r = rand() / static_cast<double>(RAND_MAX);
return (r < (lhs / (lhs + rhs))) ? -1 : 1;
}
/**
* The order of functions _must_ match with the order the select criteria are
* listed in select_criteria_t definition in readwritesplit.h
@ -189,7 +219,8 @@ int (*criteria_cmpfun[LAST_CRITERIA])(const SRWBackend&, const SRWBackend&) =
backend_cmp_global_conn,
backend_cmp_router_conn,
backend_cmp_behind_master,
backend_cmp_current_load
backend_cmp_current_load,
backend_cmp_response_time
};
/**
@ -234,6 +265,17 @@ static void log_server_connections(select_criteria_t criteria, const SRWBackendL
b->server->port, STRSRVSTATUS(b->server));
break;
case LOWEST_RESPONSE_TIME:
{
maxbase::Duration response_ave(b->server->response_time->average());
std::ostringstream os;
os << response_ave;
MXS_INFO("Average response time : %s from \t[%s]:%d %s",
os.str().c_str(), b->server->address,
b->server->port, STRSRVSTATUS(b->server));
}
break;
default:
mxb_assert(!true);
break;

View File

@ -97,6 +97,18 @@ void RWSplitSession::close()
{
close_all_connections(m_backends);
m_current_query.reset();
for (auto& backend : m_backends)
{
const ResponseStat& stat = backend->response_stat();
if (stat.is_valid())
{
server_add_response_average(backend->server(),
stat.average().secs(), stat.num_samples());
}
backend->response_stat().reset();
}
}
int32_t RWSplitSession::routeQuery(GWBUF* querybuf)
@ -613,6 +625,16 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb)
m_can_replay_trx = true;
}
ResponseStat& stat = backend->response_stat();
stat.query_ended();
if (stat.is_valid() && stat.sync_time_reached(500)) // nantti, TODO
{
server_add_response_average(backend->server(),
stat.average().secs(), stat.num_samples());
stat.reset();
}
if (backend->in_use() && backend->has_session_commands())
{
// Backend is still in use and has more session commands to execute