Split readwritesplit sources into separate parts

The common readwritesplit header was split into three distinct parts; the
instance, session and prepared statement headers. The definitions of any
members were moved to .cc files away from the headers.

The RWSplitSession, with its RWBackend class, is declared in the
rwsplitsession.hh header with all relevant definitions in
rwsplitsession.cc.

The PSManager class and all prepared statement related functions are now
located in the rwsplit_ps.hh header.

The old readwritesplit.hh header now contains the instance level
structures and all common classes used by the router. The
rwsplit_internal.hh header could be absorbed into the three newly created
headers with new headers for distinct parts of the router.
This commit is contained in:
Markus Mäkelä
2017-06-25 20:49:45 +03:00
parent 013b081b9e
commit a9a1291703
9 changed files with 359 additions and 268 deletions

View File

@ -1,5 +1,6 @@
add_library(readwritesplit SHARED
readwritesplit.cc
rwsplitsession.cc
rwsplit_mysql.cc
rwsplit_route_stmt.cc
rwsplit_select_backends.cc

View File

@ -12,7 +12,6 @@
*/
#include "readwritesplit.hh"
#include "rwsplit_internal.hh"
#include <inttypes.h>
#include <stdio.h>
@ -32,6 +31,9 @@
#include <maxscale/modutil.h>
#include <maxscale/alloc.h>
#include "rwsplit_internal.hh"
#include "rwsplitsession.hh"
/**
* The entry points for the read/write query splitting router module.
*

View File

@ -272,231 +272,3 @@ static inline const char* failure_mode_to_str(enum failure_mode type)
return "UNDEFINED_MODE";
}
}
/**
* The following code is client session specific, to be moved into a separate file
*/
/** Enum for tracking client reply state */
enum reply_state_t
{
REPLY_STATE_START, /**< Query sent to backend */
REPLY_STATE_DONE, /**< Complete reply received */
REPLY_STATE_RSET_COLDEF, /**< Resultset response, waiting for column definitions */
REPLY_STATE_RSET_ROWS /**< Resultset response, waiting for rows */
};
/** Reply state change debug logging */
#define LOG_RS(a, b) MXS_DEBUG("%s %s -> %s", (a)->uri(), \
rstostr((a)->get_reply_state()), rstostr(b));
static inline bool is_ps_command(uint8_t cmd)
{
return cmd == MYSQL_COM_STMT_EXECUTE ||
cmd == MYSQL_COM_STMT_SEND_LONG_DATA ||
cmd == MYSQL_COM_STMT_CLOSE ||
cmd == MYSQL_COM_STMT_FETCH ||
cmd == MYSQL_COM_STMT_RESET;
}
typedef std::map<uint32_t, uint32_t> BackendHandleMap; /** Internal ID to external ID */
typedef std::map<uint32_t, uint32_t> ClientHandleMap; /** External ID to internal ID */
class RWBackend: public mxs::Backend
{
RWBackend(const RWBackend&);
RWBackend& operator=(const RWBackend&);
public:
RWBackend(SERVER_REF* ref):
mxs::Backend(ref),
m_reply_state(REPLY_STATE_DONE)
{
}
~RWBackend()
{
}
reply_state_t get_reply_state() const
{
return m_reply_state;
}
void set_reply_state(reply_state_t state)
{
m_reply_state = state;
}
bool execute_session_command()
{
bool expect_response = mxs_mysql_command_will_respond(next_session_command()->get_command());
bool rval = mxs::Backend::execute_session_command();
if (rval && expect_response)
{
set_reply_state(REPLY_STATE_START);
}
return rval;
}
void add_ps_handle(uint32_t id, uint32_t handle)
{
m_ps_handles[id] = handle;
MXS_INFO("PS response for %s: %u -> %u", name(), id, handle);
}
uint32_t get_ps_handle(uint32_t id) const
{
BackendHandleMap::const_iterator it = m_ps_handles.find(id);
if (it != m_ps_handles.end())
{
return it->second;
}
return 0;
}
bool write(GWBUF* buffer, response_type type = EXPECT_RESPONSE)
{
uint8_t cmd = mxs_mysql_get_command(buffer);
if (is_ps_command(cmd))
{
uint32_t id = mxs_mysql_extract_ps_id(buffer);
BackendHandleMap::iterator it = m_ps_handles.find(id);
if (it != m_ps_handles.end())
{
/** Replace the client handle with the real PS handle */
uint8_t* ptr = GWBUF_DATA(buffer) + MYSQL_PS_ID_OFFSET;
gw_mysql_set_byte4(ptr, it->second);
}
}
return mxs::Backend::write(buffer);
}
private:
reply_state_t m_reply_state;
BackendHandleMap m_ps_handles; /**< Internal ID to backend PS handle mapping */
};
/** Prepared statement ID to type maps for text protocols */
typedef std::tr1::unordered_map<uint32_t, uint32_t> BinaryPSMap;
typedef std::tr1::unordered_map<std::string, uint32_t> TextPSMap;
class PSManager
{
PSManager(const PSManager&);
PSManager& operator =(const PSManager&);
public:
PSManager();
~PSManager();
/**
* @brief Store and process a prepared statement
*
* @param buffer Buffer containing either a text or a binary protocol
* prepared statement
* @param id The unique ID for this statement
*/
void store(GWBUF* buffer, uint32_t id);
/**
* @brief Get the type of a stored prepared statement
*
* @param id The unique identifier for the prepared statement or the plaintext
* name of the prepared statement
*
* @return The type of the prepared statement
*/
uint32_t get_type(uint32_t id) const;
uint32_t get_type(std::string id) const;
/**
* @brief Remove a prepared statement
*
* @param id Statement identifier to remove
*/
void erase(std::string id);
void erase(uint32_t id);
private:
BinaryPSMap m_binary_ps;
TextPSMap m_text_ps;
};
typedef std::tr1::shared_ptr<RWBackend> SRWBackend;
typedef std::list<SRWBackend> SRWBackendList;
typedef std::tr1::unordered_set<std::string> TableSet;
typedef std::map<uint64_t, uint8_t> ResponseMap;
/** Map of COM_STMT_EXECUTE targets by internal ID */
typedef std::tr1::unordered_map<uint32_t, SRWBackend> ExecMap;
/**
* The client session of a RWSplit instance
*/
class RWSplitSession
{
RWSplitSession(const RWSplitSession&);
RWSplitSession& operator=(const RWSplitSession&);
public:
RWSplitSession(const Config& config);
// TODO: Make member variables private
skygw_chk_t rses_chk_top;
bool rses_closed; /**< true when closeSession is called */
SRWBackendList backends; /**< List of backend servers */
SRWBackend current_master; /**< Current master server */
SRWBackend target_node; /**< The currently locked target node */
Config rses_config; /**< copied config info from router instance */
int rses_nbackends;
enum ld_state load_data_state; /**< Current load data state */
bool have_tmp_tables;
uint64_t rses_load_data_sent; /**< How much data has been sent */
DCB* client_dcb;
uint64_t sescmd_count;
int expected_responses; /**< Number of expected responses to the current query */
GWBUF* query_queue; /**< Queued commands waiting to be executed */
RWSplit* router; /**< The router instance */
TableSet temp_tables; /**< Set of temporary tables */
mxs::SessionCommandList sescmd_list; /**< List of executed session commands */
ResponseMap sescmd_responses; /**< Response to each session command */
uint64_t sent_sescmd; /**< ID of the last sent session command*/
uint64_t recv_sescmd; /**< ID of the most recently completed session command */
PSManager ps_manager; /**< Prepared statement manager*/
ClientHandleMap ps_handles; /**< Client PS handle to internal ID mapping */
ExecMap exec_map; /**< Map of COM_STMT_EXECUTE statement IDs to Backends */
skygw_chk_t rses_chk_tail;
};
/**
* Helper function to convert reply_state_t to string
*/
static inline const char* rstostr(reply_state_t state)
{
switch (state)
{
case REPLY_STATE_START:
return "REPLY_STATE_START";
case REPLY_STATE_DONE:
return "REPLY_STATE_DONE";
case REPLY_STATE_RSET_COLDEF:
return "REPLY_STATE_RSET_COLDEF";
case REPLY_STATE_RSET_ROWS:
return "REPLY_STATE_RSET_ROWS";
}
ss_dassert(false);
return "UNKNOWN";
}

View File

@ -12,14 +12,14 @@
* Public License.
*/
#include <maxscale/cppdefs.hh>
#include "readwritesplit.hh"
#include <string>
#include <maxscale/query_classifier.h>
#include <maxscale/protocol/mysql.h>
#include "readwritesplit.hh"
#include "rwsplitsession.hh"
#define RW_CHK_DCB(b, d) \
do{ \
@ -31,6 +31,15 @@ do{ \
#define RW_CLOSE_BREF(b) do{ if (b){ (b)->closed_at = __LINE__; } } while (false)
static inline bool is_ps_command(uint8_t cmd)
{
return cmd == MYSQL_COM_STMT_EXECUTE ||
cmd == MYSQL_COM_STMT_SEND_LONG_DATA ||
cmd == MYSQL_COM_STMT_CLOSE ||
cmd == MYSQL_COM_STMT_FETCH ||
cmd == MYSQL_COM_STMT_RESET;
}
/*
* The following are implemented in rwsplit_mysql.c
*/
@ -113,12 +122,3 @@ bool check_for_multi_stmt(GWBUF *buf, void *protocol, uint8_t packet_type);
uint32_t determine_query_type(GWBUF *querybuf, int packet_type, bool non_empty_packet);
void close_all_connections(RWSplitSession* rses);
/**
* @brief Extract text identifier of a PREPARE or EXECUTE statement
*
* @param buffer Buffer containing a PREPARE or EXECUTE command
*
* @return The string identifier of the statement
*/
std::string extract_text_ps_id(GWBUF* buffer);

View File

@ -17,7 +17,9 @@
#include <maxscale/query_classifier.h>
#include <maxscale/protocol/mysql.h>
static uint32_t get_prepare_type(GWBUF* buffer)
#include "rwsplit_internal.hh"
uint32_t get_prepare_type(GWBUF* buffer)
{
uint32_t type;
@ -56,7 +58,7 @@ static uint32_t get_prepare_type(GWBUF* buffer)
return type;
}
std::string extract_text_ps_id(GWBUF* buffer)
std::string get_text_ps_id(GWBUF* buffer)
{
std::string rval;
char* name = qc_get_prepare_name(buffer);
@ -70,6 +72,12 @@ std::string extract_text_ps_id(GWBUF* buffer)
return rval;
}
void replace_binary_ps_id(GWBUF* buffer, uint32_t id)
{
uint8_t* ptr = GWBUF_DATA(buffer) + MYSQL_PS_ID_OFFSET;
gw_mysql_set_byte4(ptr, id);
}
PSManager::PSManager()
{
}
@ -138,7 +146,7 @@ void PSManager::store(GWBUF* buffer, uint32_t id)
switch (mxs_mysql_get_command(buffer))
{
case MYSQL_COM_QUERY:
m_text_ps[extract_text_ps_id(buffer)] = get_prepare_type(buffer);
m_text_ps[get_text_ps_id(buffer)] = get_prepare_type(buffer);
break;
case MYSQL_COM_STMT_PREPARE:

View File

@ -0,0 +1,91 @@
#pragma once
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "readwritesplit.hh"
#include <tr1/unordered_map>
#include <string>
/** Prepared statement ID to type maps for text protocols */
typedef std::tr1::unordered_map<uint32_t, uint32_t> BinaryPSMap;
typedef std::tr1::unordered_map<std::string, uint32_t> TextPSMap;
/** Class for tracking prepared statement types by PS statement ID */
class PSManager
{
PSManager(const PSManager&);
PSManager& operator =(const PSManager&);
public:
PSManager();
~PSManager();
/**
* @brief Store and process a prepared statement
*
* @param buffer Buffer containing either a text or a binary protocol
* prepared statement
* @param id The unique ID for this statement
*/
void store(GWBUF* buffer, uint32_t id);
/**
* @brief Get the type of a stored prepared statement
*
* @param id The unique identifier for the prepared statement or the plaintext
* name of the prepared statement
*
* @return The type of the prepared statement
*/
uint32_t get_type(uint32_t id) const;
uint32_t get_type(std::string id) const;
/**
* @brief Remove a prepared statement
*
* @param id Statement identifier to remove
*/
void erase(std::string id);
void erase(uint32_t id);
private:
BinaryPSMap m_binary_ps;
TextPSMap m_text_ps;
};
/**
* @brief Get the type of a prepared statement
*
* @param buffer Buffer containing either a text or a binary prepared statement
*
* @return The type of the prepared statement
*/
uint32_t get_prepare_type(GWBUF* buffer);
/**
* @brief Extract text identifier of a PREPARE or EXECUTE statement
*
* @param buffer Buffer containing a PREPARE or EXECUTE command
*
* @return The string identifier of the statement
*/
std::string get_text_ps_id(GWBUF* buffer);
/**
* @brief Replace the ID of a binary protocol statement
*
* @param buffer Buffer containing a binary protocol statement with an ID
* @param id ID to insert into the buffer
*/
void replace_binary_ps_id(GWBUF* buffer, uint32_t id);

View File

@ -91,28 +91,6 @@ void handle_connection_keepalive(RWSplit *inst, RWSplitSession *rses,
ss_dassert(nserv < rses->rses_nbackends);
}
uint32_t get_stmt_id(RWSplitSession* rses, GWBUF* buffer)
{
uint32_t rval = 0;
// All COM_STMT type statements store the ID in the same place
uint32_t id = mxs_mysql_extract_ps_id(buffer);
ClientHandleMap::iterator it = rses->ps_handles.find(id);
if (it != rses->ps_handles.end())
{
rval = it->second;
}
return rval;
}
void replace_stmt_id(GWBUF* buffer, uint32_t id)
{
uint8_t* ptr = GWBUF_DATA(buffer) + MYSQL_PS_ID_OFFSET;
gw_mysql_set_byte4(ptr, id);
}
/**
* Routing function. Find out query type, backend type, and target DCB(s).
* Then route query to found target(s).
@ -166,14 +144,14 @@ bool route_single_stmt(RWSplit *inst, RWSplitSession *rses,
if (command == MYSQL_COM_QUERY &&
qc_get_operation(querybuf) == QUERY_OP_EXECUTE)
{
std::string id = extract_text_ps_id(querybuf);
std::string id = get_text_ps_id(querybuf);
qtype = rses->ps_manager.get_type(id);
}
else if (is_ps_command(command))
{
stmt_id = get_stmt_id(rses, querybuf);
stmt_id = get_internal_ps_id(rses, querybuf);
qtype = rses->ps_manager.get_type(stmt_id);
replace_stmt_id(querybuf, stmt_id);
replace_binary_ps_id(querybuf, stmt_id);
}
route_target = get_route_target(rses, command, qtype, querybuf->hint);

View File

@ -0,0 +1,102 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "rwsplitsession.hh"
#include "rwsplit_internal.hh"
RWBackend::RWBackend(SERVER_REF* ref):
mxs::Backend(ref),
m_reply_state(REPLY_STATE_DONE)
{
}
RWBackend::~RWBackend()
{
}
reply_state_t RWBackend::get_reply_state() const
{
return m_reply_state;
}
void RWBackend::set_reply_state(reply_state_t state)
{
m_reply_state = state;
}
bool RWBackend::execute_session_command()
{
bool expect_response = mxs_mysql_command_will_respond(next_session_command()->get_command());
bool rval = mxs::Backend::execute_session_command();
if (rval && expect_response)
{
set_reply_state(REPLY_STATE_START);
}
return rval;
}
void RWBackend::add_ps_handle(uint32_t id, uint32_t handle)
{
m_ps_handles[id] = handle;
MXS_INFO("PS response for %s: %u -> %u", name(), id, handle);
}
uint32_t RWBackend::get_ps_handle(uint32_t id) const
{
BackendHandleMap::const_iterator it = m_ps_handles.find(id);
if (it != m_ps_handles.end())
{
return it->second;
}
return 0;
}
bool RWBackend::write(GWBUF* buffer, response_type type)
{
uint8_t cmd = mxs_mysql_get_command(buffer);
if (is_ps_command(cmd))
{
uint32_t id = mxs_mysql_extract_ps_id(buffer);
BackendHandleMap::iterator it = m_ps_handles.find(id);
if (it != m_ps_handles.end())
{
/** Replace the client handle with the real PS handle */
uint8_t* ptr = GWBUF_DATA(buffer) + MYSQL_PS_ID_OFFSET;
gw_mysql_set_byte4(ptr, it->second);
}
}
return mxs::Backend::write(buffer);
}
uint32_t get_internal_ps_id(RWSplitSession* rses, GWBUF* buffer)
{
uint32_t rval = 0;
// All COM_STMT type statements store the ID in the same place
uint32_t id = mxs_mysql_extract_ps_id(buffer);
ClientHandleMap::iterator it = rses->ps_handles.find(id);
if (it != rses->ps_handles.end())
{
rval = it->second;
}
return rval;
}

View File

@ -0,0 +1,137 @@
#pragma once
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "readwritesplit.hh"
#include "rwsplit_ps.hh"
/** Enum for tracking client reply state */
enum reply_state_t
{
REPLY_STATE_START, /**< Query sent to backend */
REPLY_STATE_DONE, /**< Complete reply received */
REPLY_STATE_RSET_COLDEF, /**< Resultset response, waiting for column definitions */
REPLY_STATE_RSET_ROWS /**< Resultset response, waiting for rows */
};
/** Reply state change debug logging */
#define LOG_RS(a, b) MXS_DEBUG("%s %s -> %s", (a)->uri(), \
rstostr((a)->get_reply_state()), rstostr(b));
typedef std::map<uint32_t, uint32_t> BackendHandleMap; /** Internal ID to external ID */
typedef std::map<uint32_t, uint32_t> ClientHandleMap; /** External ID to internal ID */
class RWBackend: public mxs::Backend
{
RWBackend(const RWBackend&);
RWBackend& operator=(const RWBackend&);
public:
RWBackend(SERVER_REF* ref);
~RWBackend();
reply_state_t get_reply_state() const;
void set_reply_state(reply_state_t state);
void add_ps_handle(uint32_t id, uint32_t handle);
uint32_t get_ps_handle(uint32_t id) const;
bool execute_session_command();
bool write(GWBUF* buffer, response_type type = EXPECT_RESPONSE);
private:
reply_state_t m_reply_state;
BackendHandleMap m_ps_handles; /**< Internal ID to backend PS handle mapping */
};
typedef std::tr1::shared_ptr<RWBackend> SRWBackend;
typedef std::list<SRWBackend> SRWBackendList;
typedef std::tr1::unordered_set<std::string> TableSet;
typedef std::map<uint64_t, uint8_t> ResponseMap;
/** Map of COM_STMT_EXECUTE targets by internal ID */
typedef std::tr1::unordered_map<uint32_t, SRWBackend> ExecMap;
/**
* The client session of a RWSplit instance
*/
class RWSplitSession
{
RWSplitSession(const RWSplitSession&);
RWSplitSession& operator=(const RWSplitSession&);
public:
RWSplitSession(const Config& config);
// TODO: Make member variables private
skygw_chk_t rses_chk_top;
bool rses_closed; /**< true when closeSession is called */
SRWBackendList backends; /**< List of backend servers */
SRWBackend current_master; /**< Current master server */
SRWBackend target_node; /**< The currently locked target node */
Config rses_config; /**< copied config info from router instance */
int rses_nbackends;
enum ld_state load_data_state; /**< Current load data state */
bool have_tmp_tables;
uint64_t rses_load_data_sent; /**< How much data has been sent */
DCB* client_dcb;
uint64_t sescmd_count;
int expected_responses; /**< Number of expected responses to the current query */
GWBUF* query_queue; /**< Queued commands waiting to be executed */
RWSplit* router; /**< The router instance */
TableSet temp_tables; /**< Set of temporary tables */
mxs::SessionCommandList sescmd_list; /**< List of executed session commands */
ResponseMap sescmd_responses; /**< Response to each session command */
uint64_t sent_sescmd; /**< ID of the last sent session command*/
uint64_t recv_sescmd; /**< ID of the most recently completed session command */
PSManager ps_manager; /**< Prepared statement manager*/
ClientHandleMap ps_handles; /**< Client PS handle to internal ID mapping */
ExecMap exec_map; /**< Map of COM_STMT_EXECUTE statement IDs to Backends */
skygw_chk_t rses_chk_tail;
};
/**
* Helper function to convert reply_state_t to string
*/
static inline const char* rstostr(reply_state_t state)
{
switch (state)
{
case REPLY_STATE_START:
return "REPLY_STATE_START";
case REPLY_STATE_DONE:
return "REPLY_STATE_DONE";
case REPLY_STATE_RSET_COLDEF:
return "REPLY_STATE_RSET_COLDEF";
case REPLY_STATE_RSET_ROWS:
return "REPLY_STATE_RSET_ROWS";
}
ss_dassert(false);
return "UNKNOWN";
}
/**
* @brief Get the internal ID for the given binary prepared statement
*
* @param rses Router client session
* @param buffer Buffer containing a binary protocol statement other than COM_STMT_PREPARE
*
* @return The internal ID of the prepared statement that the buffer contents refer to
*/
uint32_t get_internal_ps_id(RWSplitSession* rses, GWBUF* buffer);