MXS-2068: Move RWBackend into mysqlcommon
This cleanly allows multiple modules to use it.
This commit is contained in:
@ -1,4 +1,4 @@
|
||||
add_library(cat SHARED cat.cc catsession.cc ../readwritesplit/rwbackend.cc ../readwritesplit/response_stat.cc)
|
||||
add_library(cat SHARED cat.cc catsession.cc)
|
||||
target_link_libraries(cat maxscale-common mysqlcommon)
|
||||
set_target_properties(cat PROPERTIES VERSION "1.0.0" LINK_FLAGS -Wl,-z,defs)
|
||||
install_module(cat experimental)
|
||||
|
||||
@ -14,7 +14,7 @@
|
||||
|
||||
#include "cat.hh"
|
||||
|
||||
#include "../readwritesplit/rwbackend.hh"
|
||||
#include <maxscale/protocol/rwbackend.hh>
|
||||
|
||||
class Cat;
|
||||
|
||||
|
||||
@ -5,8 +5,6 @@ rwsplit_mysql.cc
|
||||
rwsplit_route_stmt.cc
|
||||
rwsplit_select_backends.cc
|
||||
rwsplit_session_cmd.cc
|
||||
rwbackend.cc
|
||||
response_stat.cc
|
||||
)
|
||||
target_link_libraries(readwritesplit maxscale-common mysqlcommon)
|
||||
set_target_properties(readwritesplit PROPERTIES VERSION "1.0.2" LINK_FLAGS -Wl,-z,defs)
|
||||
|
||||
@ -35,8 +35,7 @@
|
||||
#include <maxscale/session_command.hh>
|
||||
#include <maxscale/protocol/mysql.h>
|
||||
#include <maxscale/routingworker.hh>
|
||||
|
||||
#include "rwbackend.hh"
|
||||
#include <maxscale/protocol/rwbackend.hh>
|
||||
|
||||
enum backend_type_t
|
||||
{
|
||||
|
||||
@ -1,103 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2016 MariaDB Corporation Ab
|
||||
*
|
||||
* Use of this software is governed by the Business Source License included
|
||||
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
||||
*
|
||||
* Change Date: 2022-01-01
|
||||
*
|
||||
* On the date above, in accordance with the Business Source License, use
|
||||
* of this software will be governed by version 2 or later of the General
|
||||
* Public License.
|
||||
*/
|
||||
#include "response_stat.hh"
|
||||
|
||||
#include <algorithm>
|
||||
|
||||
namespace maxscale
|
||||
{
|
||||
ResponseStat::ResponseStat(int num_filter_samples,
|
||||
int num_synch_medians,
|
||||
maxbase::Duration sync_duration)
|
||||
: m_num_filter_samples {num_filter_samples}
|
||||
, m_num_synch_medians{num_synch_medians}
|
||||
, m_sync_duration{sync_duration}
|
||||
, m_sample_count{0}
|
||||
, m_samples(num_filter_samples)
|
||||
, m_last_start{maxbase::TimePoint()}
|
||||
, m_next_sync{maxbase::Clock::now() + sync_duration}
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
void ResponseStat::query_started()
|
||||
{
|
||||
m_last_start = maxbase::Clock::now();
|
||||
}
|
||||
|
||||
void ResponseStat::query_ended()
|
||||
{
|
||||
if (m_last_start == maxbase::TimePoint())
|
||||
{
|
||||
// m_last_start is defaulted. Ignore, avoids extra logic in call sites.
|
||||
return;
|
||||
}
|
||||
m_samples[m_sample_count] = maxbase::Clock::now() - m_last_start;
|
||||
|
||||
if (++m_sample_count == m_num_filter_samples)
|
||||
{
|
||||
std::sort(m_samples.begin(), m_samples.end());
|
||||
maxbase::Duration new_sample = m_samples[m_num_filter_samples / 2];
|
||||
m_average.add(std::chrono::duration<double>(new_sample).count());
|
||||
m_sample_count = 0;
|
||||
}
|
||||
m_last_start = maxbase::TimePoint();
|
||||
}
|
||||
|
||||
bool ResponseStat::make_valid()
|
||||
{
|
||||
if (!m_average.num_samples() && m_sample_count)
|
||||
{
|
||||
maxbase::Duration new_sample = m_samples[m_sample_count / 2];
|
||||
m_average.add(std::chrono::duration<double>(new_sample).count());
|
||||
m_sample_count = 0;
|
||||
}
|
||||
|
||||
return is_valid();
|
||||
}
|
||||
|
||||
bool ResponseStat::is_valid() const
|
||||
{
|
||||
return m_average.num_samples();
|
||||
}
|
||||
|
||||
int ResponseStat::num_samples() const
|
||||
{
|
||||
return m_average.num_samples();
|
||||
}
|
||||
|
||||
maxbase::Duration ResponseStat::average() const
|
||||
{
|
||||
return maxbase::Duration(m_average.average());
|
||||
}
|
||||
|
||||
bool ResponseStat::sync_time_reached()
|
||||
{
|
||||
auto now = maxbase::Clock::now();
|
||||
bool reached = m_next_sync < now
|
||||
|| m_average.num_samples() >= m_num_synch_medians;
|
||||
|
||||
if (reached)
|
||||
{
|
||||
m_next_sync = now + m_sync_duration;
|
||||
}
|
||||
return reached;
|
||||
}
|
||||
|
||||
void ResponseStat::reset()
|
||||
{
|
||||
m_sample_count = 0;
|
||||
m_average.reset();
|
||||
m_next_sync = maxbase::Clock::now() + m_sync_duration;
|
||||
}
|
||||
}
|
||||
@ -1,62 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2016 MariaDB Corporation Ab
|
||||
*
|
||||
* Use of this software is governed by the Business Source License included
|
||||
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
||||
*
|
||||
* Change Date: 2022-01-01
|
||||
*
|
||||
* On the date above, in accordance with the Business Source License, use
|
||||
* of this software will be governed by version 2 or later of the General
|
||||
* Public License.
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
#include <maxscale/ccdefs.hh>
|
||||
#include <maxbase/stopwatch.hh>
|
||||
#include <maxbase/average.hh>
|
||||
|
||||
/** This could arguably be a utility, but is written specifically for rwsplit
|
||||
* so it stays here at least for now.
|
||||
*/
|
||||
namespace maxscale
|
||||
{
|
||||
/**
|
||||
* Query response statistics. Uses median of N samples to filter noise, then
|
||||
* uses those medians to calculate the average response time.
|
||||
* The class makes an average of the durations between calls to query_started()
|
||||
* and query_ended(). Once the stats are good, sync_time_reached(int max) returns true,
|
||||
* based on the average containing at least max samples (or medians), or the time
|
||||
* sync_duration (constructor arg) has passed since the last reset().
|
||||
*/
|
||||
class ResponseStat
|
||||
{
|
||||
public:
|
||||
/*
|
||||
* @param num_filter_samples - collect num samples, use median
|
||||
* @param num_synch_medians - this many medians before the average should be synced, or
|
||||
* @param sync_duration - this much time between syncs.
|
||||
*/
|
||||
ResponseStat(int num_filter_samples = 5,
|
||||
int num_synch_medians = 500,
|
||||
maxbase::Duration sync_duration = std::chrono::seconds(5));
|
||||
|
||||
void query_started();
|
||||
void query_ended();// ok to call without a query_started
|
||||
bool make_valid(); // make valid even if there are too few filter_samples
|
||||
bool is_valid() const;
|
||||
int num_samples() const;
|
||||
maxbase::Duration average() const;
|
||||
bool sync_time_reached(); // is it time to apply the average?
|
||||
void reset();
|
||||
private:
|
||||
const int m_num_filter_samples;
|
||||
const int m_num_synch_medians;
|
||||
const maxbase::Duration m_sync_duration;
|
||||
int m_sample_count;
|
||||
std::vector<maxbase::Duration> m_samples; // N sampels from which median is used
|
||||
maxbase::CumulativeAverage m_average;
|
||||
maxbase::TimePoint m_last_start;
|
||||
maxbase::TimePoint m_next_sync;
|
||||
};
|
||||
}
|
||||
@ -1,252 +0,0 @@
|
||||
#include "rwbackend.hh"
|
||||
|
||||
#include <maxscale/modutil.h>
|
||||
#include <maxscale/protocol/mysql.h>
|
||||
#include <maxscale/log.h>
|
||||
|
||||
namespace maxscale
|
||||
{
|
||||
|
||||
RWBackend::RWBackend(SERVER_REF* ref)
|
||||
: mxs::Backend(ref)
|
||||
, m_reply_state(REPLY_STATE_DONE)
|
||||
, m_modutil_state{0}
|
||||
, m_command(0)
|
||||
, m_opening_cursor(false)
|
||||
, m_expected_rows(0)
|
||||
, m_local_infile_requested(false)
|
||||
{
|
||||
}
|
||||
|
||||
RWBackend::~RWBackend()
|
||||
{
|
||||
}
|
||||
|
||||
bool RWBackend::execute_session_command()
|
||||
{
|
||||
m_command = next_session_command()->get_command();
|
||||
bool expect_response = mxs_mysql_command_will_respond(m_command);
|
||||
bool rval = mxs::Backend::execute_session_command();
|
||||
|
||||
if (rval && expect_response)
|
||||
{
|
||||
set_reply_state(REPLY_STATE_START);
|
||||
}
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
bool RWBackend::continue_session_command(GWBUF* buffer)
|
||||
{
|
||||
return Backend::write(buffer, NO_RESPONSE);
|
||||
}
|
||||
|
||||
void RWBackend::add_ps_handle(uint32_t id, uint32_t handle)
|
||||
{
|
||||
m_ps_handles[id] = handle;
|
||||
MXS_INFO("PS response for %s: %u -> %u", name(), id, handle);
|
||||
}
|
||||
|
||||
uint32_t RWBackend::get_ps_handle(uint32_t id) const
|
||||
{
|
||||
BackendHandleMap::const_iterator it = m_ps_handles.find(id);
|
||||
|
||||
if (it != m_ps_handles.end())
|
||||
{
|
||||
return it->second;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool RWBackend::write(GWBUF* buffer, response_type type)
|
||||
{
|
||||
uint8_t cmd = mxs_mysql_get_command(buffer);
|
||||
|
||||
m_command = cmd;
|
||||
|
||||
if (mxs_mysql_is_ps_command(cmd))
|
||||
{
|
||||
uint32_t id = mxs_mysql_extract_ps_id(buffer);
|
||||
BackendHandleMap::iterator it = m_ps_handles.find(id);
|
||||
|
||||
if (it != m_ps_handles.end())
|
||||
{
|
||||
/** Replace the client handle with the real PS handle */
|
||||
uint8_t* ptr = GWBUF_DATA(buffer) + MYSQL_PS_ID_OFFSET;
|
||||
gw_mysql_set_byte4(ptr, it->second);
|
||||
|
||||
if (cmd == MXS_COM_STMT_EXECUTE)
|
||||
{
|
||||
// Extract the flag byte after the statement ID
|
||||
uint8_t flags = 0;
|
||||
gwbuf_copy_data(buffer, MYSQL_PS_ID_OFFSET + MYSQL_PS_ID_SIZE, 1, &flags);
|
||||
|
||||
// Any non-zero flag value means that we have an open cursor
|
||||
m_opening_cursor = flags != 0;
|
||||
}
|
||||
else if (cmd == MXS_COM_STMT_FETCH)
|
||||
{
|
||||
// Number of rows to fetch is a 4 byte integer after the ID
|
||||
uint8_t buf[4];
|
||||
gwbuf_copy_data(buffer, MYSQL_PS_ID_OFFSET + MYSQL_PS_ID_SIZE, 4, buf);
|
||||
m_expected_rows = gw_mysql_get_byte4(buf);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return mxs::Backend::write(buffer, type);
|
||||
}
|
||||
|
||||
void RWBackend::close(close_type type)
|
||||
{
|
||||
m_reply_state = REPLY_STATE_DONE;
|
||||
mxs::Backend::close(type);
|
||||
}
|
||||
|
||||
bool RWBackend::consume_fetched_rows(GWBUF* buffer)
|
||||
{
|
||||
m_expected_rows -= modutil_count_packets(buffer);
|
||||
mxb_assert(m_expected_rows >= 0);
|
||||
return m_expected_rows == 0;
|
||||
}
|
||||
|
||||
static inline bool have_next_packet(GWBUF* buffer)
|
||||
{
|
||||
uint32_t len = MYSQL_GET_PAYLOAD_LEN(GWBUF_DATA(buffer)) + MYSQL_HEADER_LEN;
|
||||
return gwbuf_length(buffer) > len;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Check if we have received a complete reply from the backend
|
||||
*
|
||||
* @param backend Backend reference
|
||||
* @param buffer Buffer containing the response
|
||||
*
|
||||
* @return True if the complete response has been received
|
||||
*/
|
||||
bool RWBackend::reply_is_complete(GWBUF* buffer)
|
||||
{
|
||||
if (current_command() == MXS_COM_STMT_FETCH)
|
||||
{
|
||||
bool more = false;
|
||||
int n_eof = modutil_count_signal_packets(buffer, 0, &more, &m_modutil_state);
|
||||
|
||||
// If the server responded with an error, n_eof > 0
|
||||
if (n_eof > 0 || consume_fetched_rows(buffer))
|
||||
{
|
||||
|
||||
set_reply_state(REPLY_STATE_DONE);
|
||||
}
|
||||
}
|
||||
else if (current_command() == MXS_COM_STATISTICS)
|
||||
{
|
||||
// COM_STATISTICS returns a single string and thus requires special handling
|
||||
set_reply_state(REPLY_STATE_DONE);
|
||||
}
|
||||
else if (get_reply_state() == REPLY_STATE_START
|
||||
&& (!mxs_mysql_is_result_set(buffer) || GWBUF_IS_COLLECTED_RESULT(buffer)))
|
||||
{
|
||||
m_local_infile_requested = false;
|
||||
|
||||
if (GWBUF_IS_COLLECTED_RESULT(buffer)
|
||||
|| current_command() == MXS_COM_STMT_PREPARE
|
||||
|| !mxs_mysql_is_ok_packet(buffer)
|
||||
|| !mxs_mysql_more_results_after_ok(buffer))
|
||||
{
|
||||
/** Not a result set, we have the complete response */
|
||||
set_reply_state(REPLY_STATE_DONE);
|
||||
|
||||
if (mxs_mysql_is_local_infile(buffer))
|
||||
{
|
||||
m_local_infile_requested = true;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// This is an OK packet and more results will follow
|
||||
mxb_assert(mxs_mysql_is_ok_packet(buffer)
|
||||
&& mxs_mysql_more_results_after_ok(buffer));
|
||||
|
||||
if (have_next_packet(buffer))
|
||||
{
|
||||
// TODO: Don't clone the buffer
|
||||
GWBUF* tmp = gwbuf_clone(buffer);
|
||||
tmp = gwbuf_consume(tmp, mxs_mysql_get_packet_len(tmp));
|
||||
bool rval = reply_is_complete(tmp);
|
||||
gwbuf_free(tmp);
|
||||
return rval;
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
bool more = false;
|
||||
int n_old_eof = get_reply_state() == REPLY_STATE_RSET_ROWS ? 1 : 0;
|
||||
int n_eof = modutil_count_signal_packets(buffer, n_old_eof, &more, &m_modutil_state);
|
||||
|
||||
if (n_eof > 2)
|
||||
{
|
||||
/**
|
||||
* We have multiple results in the buffer, we only care about
|
||||
* the state of the last one. Skip the complete result sets and act
|
||||
* like we're processing a single result set.
|
||||
*/
|
||||
n_eof = n_eof % 2 ? 1 : 2;
|
||||
}
|
||||
|
||||
if (n_eof == 0)
|
||||
{
|
||||
/** Waiting for the EOF packet after the column definitions */
|
||||
set_reply_state(REPLY_STATE_RSET_COLDEF);
|
||||
}
|
||||
else if (n_eof == 1 && current_command() != MXS_COM_FIELD_LIST)
|
||||
{
|
||||
/** Waiting for the EOF packet after the rows */
|
||||
set_reply_state(REPLY_STATE_RSET_ROWS);
|
||||
|
||||
if (is_opening_cursor())
|
||||
{
|
||||
set_cursor_opened();
|
||||
MXS_INFO("Cursor successfully opened");
|
||||
set_reply_state(REPLY_STATE_DONE);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/** We either have a complete result set or a response to
|
||||
* a COM_FIELD_LIST command */
|
||||
mxb_assert(n_eof == 2 || (n_eof == 1 && current_command() == MXS_COM_FIELD_LIST));
|
||||
set_reply_state(REPLY_STATE_DONE);
|
||||
|
||||
if (more)
|
||||
{
|
||||
/** The server will send more resultsets */
|
||||
set_reply_state(REPLY_STATE_START);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return get_reply_state() == REPLY_STATE_DONE;
|
||||
}
|
||||
|
||||
ResponseStat& RWBackend::response_stat()
|
||||
{
|
||||
return m_response_stat;
|
||||
}
|
||||
|
||||
SRWBackendList RWBackend::from_servers(SERVER_REF* servers)
|
||||
{
|
||||
SRWBackendList backends;
|
||||
|
||||
for (SERVER_REF* ref = servers; ref; ref = ref->next)
|
||||
{
|
||||
if (ref->active)
|
||||
{
|
||||
backends.push_back(mxs::SRWBackend(new mxs::RWBackend(ref)));
|
||||
}
|
||||
}
|
||||
|
||||
return backends;
|
||||
}
|
||||
}
|
||||
@ -1,137 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2018 MariaDB Corporation Ab
|
||||
*
|
||||
* Use of this software is governed by the Business Source License included
|
||||
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
||||
*
|
||||
* Change Date: 2022-01-01
|
||||
*
|
||||
* On the date above, in accordance with the Business Source License, use
|
||||
* of this software will be governed by version 2 or later of the General
|
||||
* Public License.
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
#include "response_stat.hh"
|
||||
|
||||
#include <map>
|
||||
#include <memory>
|
||||
|
||||
#include <maxscale/backend.hh>
|
||||
#include <maxscale/modutil.h>
|
||||
|
||||
namespace maxscale
|
||||
{
|
||||
|
||||
/** Enum for tracking client reply state */
|
||||
enum reply_state_t
|
||||
{
|
||||
REPLY_STATE_START, /**< Query sent to backend */
|
||||
REPLY_STATE_DONE, /**< Complete reply received */
|
||||
REPLY_STATE_RSET_COLDEF, /**< Resultset response, waiting for column definitions */
|
||||
REPLY_STATE_RSET_ROWS /**< Resultset response, waiting for rows */
|
||||
};
|
||||
|
||||
typedef std::map<uint32_t, uint32_t> BackendHandleMap; /** Internal ID to external ID */
|
||||
|
||||
class RWBackend;
|
||||
typedef std::shared_ptr<RWBackend> SRWBackend;
|
||||
typedef std::list<SRWBackend> SRWBackendList;
|
||||
|
||||
class RWBackend : public mxs::Backend
|
||||
{
|
||||
RWBackend(const RWBackend&);
|
||||
RWBackend& operator=(const RWBackend&);
|
||||
|
||||
public:
|
||||
|
||||
static SRWBackendList from_servers(SERVER_REF* servers);
|
||||
|
||||
RWBackend(SERVER_REF* ref);
|
||||
~RWBackend();
|
||||
|
||||
inline reply_state_t get_reply_state() const
|
||||
{
|
||||
return m_reply_state;
|
||||
}
|
||||
|
||||
inline void set_reply_state(reply_state_t state)
|
||||
{
|
||||
m_reply_state = state;
|
||||
}
|
||||
|
||||
void add_ps_handle(uint32_t id, uint32_t handle);
|
||||
uint32_t get_ps_handle(uint32_t id) const;
|
||||
|
||||
bool execute_session_command();
|
||||
bool continue_session_command(GWBUF* buffer);
|
||||
|
||||
/**
|
||||
* Write a query to the backend
|
||||
*
|
||||
* This function handles the replacement of the prepared statement IDs from
|
||||
* the internal ID to the server specific one. Trailing parts of large
|
||||
* packets should use RWBackend::continue_write.
|
||||
*
|
||||
* @param buffer Buffer to write
|
||||
* @param type Whether a response is expected
|
||||
*
|
||||
* @return True if writing was successful
|
||||
*/
|
||||
bool write(GWBUF* buffer, response_type type = EXPECT_RESPONSE);
|
||||
|
||||
/**
|
||||
* Continue a previously started write
|
||||
*
|
||||
* This should only be used when RWBackend::write has been called to start
|
||||
* a new query.
|
||||
*
|
||||
* @param buffer Buffer to write
|
||||
*
|
||||
* @return True if writing was successful
|
||||
*/
|
||||
bool continue_write(GWBUF* buffer)
|
||||
{
|
||||
return mxs::Backend::write(buffer, Backend::NO_RESPONSE);
|
||||
}
|
||||
|
||||
void close(close_type type = CLOSE_NORMAL);
|
||||
|
||||
// For COM_STMT_FETCH processing
|
||||
bool consume_fetched_rows(GWBUF* buffer);
|
||||
|
||||
inline uint8_t current_command() const
|
||||
{
|
||||
return m_command;
|
||||
}
|
||||
|
||||
bool local_infile_requested() const
|
||||
{
|
||||
return m_local_infile_requested;
|
||||
}
|
||||
|
||||
bool reply_is_complete(GWBUF* buffer);
|
||||
|
||||
// Controlled by the session
|
||||
ResponseStat& response_stat();
|
||||
private:
|
||||
reply_state_t m_reply_state;
|
||||
BackendHandleMap m_ps_handles; /**< Internal ID to backend PS handle mapping */
|
||||
modutil_state m_modutil_state; /**< @see modutil_count_signal_packets */
|
||||
uint8_t m_command;
|
||||
bool m_opening_cursor; /**< Whether we are opening a cursor */
|
||||
uint32_t m_expected_rows; /**< Number of rows a COM_STMT_FETCH is retrieving */
|
||||
bool m_local_infile_requested; /**< Whether a LOCAL INFILE was requested */
|
||||
ResponseStat m_response_stat;
|
||||
|
||||
inline bool is_opening_cursor() const
|
||||
{
|
||||
return m_opening_cursor;
|
||||
}
|
||||
|
||||
inline void set_cursor_opened()
|
||||
{
|
||||
m_opening_cursor = false;
|
||||
}
|
||||
};
|
||||
}
|
||||
@ -13,7 +13,6 @@
|
||||
#pragma once
|
||||
|
||||
#include "readwritesplit.hh"
|
||||
#include "rwbackend.hh"
|
||||
#include "trx.hh"
|
||||
|
||||
#include <string>
|
||||
@ -21,6 +20,7 @@
|
||||
#include <maxscale/buffer.hh>
|
||||
#include <maxscale/modutil.h>
|
||||
#include <maxscale/queryclassifier.hh>
|
||||
#include <maxscale/protocol/rwbackend.hh>
|
||||
|
||||
#define TARGET_IS_MASTER(t) maxscale::QueryClassifier::target_is_master(t)
|
||||
#define TARGET_IS_SLAVE(t) maxscale::QueryClassifier::target_is_slave(t)
|
||||
|
||||
Reference in New Issue
Block a user