Move RWBackend into a separate file
Moved the RWBackend class implementation into its own file. Made some of the command type functions a part of the <maxscale/protocol/mysql.h> header to make it reusable.
This commit is contained in:
@ -567,6 +567,15 @@ bool mxs_mysql_is_result_set(GWBUF *buffer);
|
|||||||
*/
|
*/
|
||||||
bool mxs_mysql_is_prep_stmt_ok(GWBUF *buffer);
|
bool mxs_mysql_is_prep_stmt_ok(GWBUF *buffer);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Is this a binary protocol command
|
||||||
|
*
|
||||||
|
* @param cmd Command to check
|
||||||
|
*
|
||||||
|
* @return True if the command is a binary protocol command
|
||||||
|
*/
|
||||||
|
bool mxs_mysql_is_ps_command(uint8_t cmd);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Check if the OK packet is followed by another result
|
* @brief Check if the OK packet is followed by another result
|
||||||
*
|
*
|
||||||
|
@ -1576,6 +1576,15 @@ bool mxs_mysql_is_prep_stmt_ok(GWBUF *buffer)
|
|||||||
return rval;
|
return rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool mxs_mysql_is_ps_command(uint8_t cmd)
|
||||||
|
{
|
||||||
|
return cmd == MXS_COM_STMT_EXECUTE ||
|
||||||
|
cmd == MXS_COM_STMT_SEND_LONG_DATA ||
|
||||||
|
cmd == MXS_COM_STMT_CLOSE ||
|
||||||
|
cmd == MXS_COM_STMT_FETCH ||
|
||||||
|
cmd == MXS_COM_STMT_RESET;
|
||||||
|
}
|
||||||
|
|
||||||
bool mxs_mysql_more_results_after_ok(GWBUF *buffer)
|
bool mxs_mysql_more_results_after_ok(GWBUF *buffer)
|
||||||
{
|
{
|
||||||
bool rval = false;
|
bool rval = false;
|
||||||
|
@ -6,7 +6,8 @@ rwsplit_mysql.cc
|
|||||||
rwsplit_route_stmt.cc
|
rwsplit_route_stmt.cc
|
||||||
rwsplit_select_backends.cc
|
rwsplit_select_backends.cc
|
||||||
rwsplit_session_cmd.cc
|
rwsplit_session_cmd.cc
|
||||||
rwsplit_ps.cc)
|
rwsplit_ps.cc
|
||||||
|
rwbackend.cc)
|
||||||
target_link_libraries(readwritesplit maxscale-common mysqlcommon)
|
target_link_libraries(readwritesplit maxscale-common mysqlcommon)
|
||||||
set_target_properties(readwritesplit PROPERTIES VERSION "1.0.2")
|
set_target_properties(readwritesplit PROPERTIES VERSION "1.0.2")
|
||||||
install_module(readwritesplit core)
|
install_module(readwritesplit core)
|
||||||
|
@ -36,6 +36,8 @@
|
|||||||
#include "rwsplitsession.hh"
|
#include "rwsplitsession.hh"
|
||||||
#include "routeinfo.hh"
|
#include "routeinfo.hh"
|
||||||
|
|
||||||
|
using namespace maxscale;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The entry points for the read/write query splitting router module.
|
* The entry points for the read/write query splitting router module.
|
||||||
*
|
*
|
||||||
|
@ -737,7 +737,7 @@ route_target_t get_target_type(RWSplitSession *rses, GWBUF *buffer,
|
|||||||
std::string id = get_text_ps_id(buffer);
|
std::string id = get_text_ps_id(buffer);
|
||||||
*type = rses->ps_manager.get_type(id);
|
*type = rses->ps_manager.get_type(id);
|
||||||
}
|
}
|
||||||
else if (is_ps_command(*command))
|
else if (mxs_mysql_is_ps_command(*command))
|
||||||
{
|
{
|
||||||
*stmt_id = get_internal_ps_id(rses, buffer);
|
*stmt_id = get_internal_ps_id(rses, buffer);
|
||||||
*type = rses->ps_manager.get_type(*stmt_id);
|
*type = rses->ps_manager.get_type(*stmt_id);
|
||||||
|
166
server/modules/routing/readwritesplit/rwbackend.cc
Normal file
166
server/modules/routing/readwritesplit/rwbackend.cc
Normal file
@ -0,0 +1,166 @@
|
|||||||
|
#include "rwbackend.hh"
|
||||||
|
|
||||||
|
#include <maxscale/modutil.h>
|
||||||
|
#include <maxscale/protocol/mysql.h>
|
||||||
|
#include <maxscale/log_manager.h>
|
||||||
|
|
||||||
|
namespace maxscale
|
||||||
|
{
|
||||||
|
|
||||||
|
RWBackend::RWBackend(SERVER_REF* ref):
|
||||||
|
mxs::Backend(ref),
|
||||||
|
m_reply_state(REPLY_STATE_DONE),
|
||||||
|
m_large_packet(false)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
RWBackend::~RWBackend()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
bool RWBackend::execute_session_command()
|
||||||
|
{
|
||||||
|
bool expect_response = mxs_mysql_command_will_respond(next_session_command()->get_command());
|
||||||
|
bool rval = mxs::Backend::execute_session_command();
|
||||||
|
|
||||||
|
if (rval && expect_response)
|
||||||
|
{
|
||||||
|
set_reply_state(REPLY_STATE_START);
|
||||||
|
}
|
||||||
|
|
||||||
|
return rval;
|
||||||
|
}
|
||||||
|
|
||||||
|
void RWBackend::add_ps_handle(uint32_t id, uint32_t handle)
|
||||||
|
{
|
||||||
|
m_ps_handles[id] = handle;
|
||||||
|
MXS_INFO("PS response for %s: %u -> %u", name(), id, handle);
|
||||||
|
}
|
||||||
|
|
||||||
|
uint32_t RWBackend::get_ps_handle(uint32_t id) const
|
||||||
|
{
|
||||||
|
BackendHandleMap::const_iterator it = m_ps_handles.find(id);
|
||||||
|
|
||||||
|
if (it != m_ps_handles.end())
|
||||||
|
{
|
||||||
|
return it->second;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool RWBackend::write(GWBUF* buffer, response_type type)
|
||||||
|
{
|
||||||
|
uint8_t cmd = mxs_mysql_get_command(buffer);
|
||||||
|
|
||||||
|
m_command = cmd;
|
||||||
|
|
||||||
|
if (mxs_mysql_is_ps_command(cmd))
|
||||||
|
{
|
||||||
|
uint32_t id = mxs_mysql_extract_ps_id(buffer);
|
||||||
|
BackendHandleMap::iterator it = m_ps_handles.find(id);
|
||||||
|
|
||||||
|
if (it != m_ps_handles.end())
|
||||||
|
{
|
||||||
|
/** Replace the client handle with the real PS handle */
|
||||||
|
uint8_t* ptr = GWBUF_DATA(buffer) + MYSQL_PS_ID_OFFSET;
|
||||||
|
gw_mysql_set_byte4(ptr, it->second);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return mxs::Backend::write(buffer);
|
||||||
|
}
|
||||||
|
|
||||||
|
void RWBackend::close(close_type type)
|
||||||
|
{
|
||||||
|
m_reply_state = REPLY_STATE_DONE;
|
||||||
|
mxs::Backend::close(type);
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline bool have_next_packet(GWBUF* buffer)
|
||||||
|
{
|
||||||
|
uint32_t len = MYSQL_GET_PAYLOAD_LEN(GWBUF_DATA(buffer)) + MYSQL_HEADER_LEN;
|
||||||
|
return gwbuf_length(buffer) > len;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Check if we have received a complete reply from the backend
|
||||||
|
*
|
||||||
|
* @param backend Backend reference
|
||||||
|
* @param buffer Buffer containing the response
|
||||||
|
*
|
||||||
|
* @return True if the complete response has been received
|
||||||
|
*/
|
||||||
|
bool RWBackend::reply_is_complete(GWBUF *buffer)
|
||||||
|
{
|
||||||
|
if (get_reply_state() == REPLY_STATE_START &&
|
||||||
|
(!mxs_mysql_is_result_set(buffer) || GWBUF_IS_COLLECTED_RESULT(buffer)))
|
||||||
|
{
|
||||||
|
if (GWBUF_IS_COLLECTED_RESULT(buffer) ||
|
||||||
|
current_command() == MXS_COM_STMT_PREPARE ||
|
||||||
|
!mxs_mysql_is_ok_packet(buffer) ||
|
||||||
|
!mxs_mysql_more_results_after_ok(buffer))
|
||||||
|
{
|
||||||
|
/** Not a result set, we have the complete response */
|
||||||
|
set_reply_state(REPLY_STATE_DONE);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// This is an OK packet and more results will follow
|
||||||
|
ss_dassert(mxs_mysql_is_ok_packet(buffer) &&
|
||||||
|
mxs_mysql_more_results_after_ok(buffer));
|
||||||
|
|
||||||
|
if (have_next_packet(buffer))
|
||||||
|
{
|
||||||
|
set_reply_state(REPLY_STATE_RSET_COLDEF);
|
||||||
|
return reply_is_complete(buffer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
bool more = false;
|
||||||
|
modutil_state state = {is_large_packet()};
|
||||||
|
int n_old_eof = get_reply_state() == REPLY_STATE_RSET_ROWS ? 1 : 0;
|
||||||
|
int n_eof = modutil_count_signal_packets(buffer, n_old_eof, &more, &state);
|
||||||
|
set_large_packet(state.state);
|
||||||
|
|
||||||
|
if (n_eof > 2)
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* We have multiple results in the buffer, we only care about
|
||||||
|
* the state of the last one. Skip the complete result sets and act
|
||||||
|
* like we're processing a single result set.
|
||||||
|
*/
|
||||||
|
n_eof = n_eof % 2 ? 1 : 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (n_eof == 0)
|
||||||
|
{
|
||||||
|
/** Waiting for the EOF packet after the column definitions */
|
||||||
|
set_reply_state(REPLY_STATE_RSET_COLDEF);
|
||||||
|
}
|
||||||
|
else if (n_eof == 1 && current_command() != MXS_COM_FIELD_LIST)
|
||||||
|
{
|
||||||
|
/** Waiting for the EOF packet after the rows */
|
||||||
|
set_reply_state(REPLY_STATE_RSET_ROWS);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/** We either have a complete result set or a response to
|
||||||
|
* a COM_FIELD_LIST command */
|
||||||
|
ss_dassert(n_eof == 2 || (n_eof == 1 && current_command() == MXS_COM_FIELD_LIST));
|
||||||
|
set_reply_state(REPLY_STATE_DONE);
|
||||||
|
|
||||||
|
if (more)
|
||||||
|
{
|
||||||
|
/** The server will send more resultsets */
|
||||||
|
set_reply_state(REPLY_STATE_START);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return get_reply_state() == REPLY_STATE_DONE;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
91
server/modules/routing/readwritesplit/rwbackend.hh
Normal file
91
server/modules/routing/readwritesplit/rwbackend.hh
Normal file
@ -0,0 +1,91 @@
|
|||||||
|
#pragma once
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2016 MariaDB Corporation Ab
|
||||||
|
*
|
||||||
|
* Use of this software is governed by the Business Source License included
|
||||||
|
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
||||||
|
*
|
||||||
|
* Change Date: 2020-01-01
|
||||||
|
*
|
||||||
|
* On the date above, in accordance with the Business Source License, use
|
||||||
|
* of this software will be governed by version 2 or later of the General
|
||||||
|
* Public License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include <maxscale/cppdefs.hh>
|
||||||
|
|
||||||
|
#include <map>
|
||||||
|
#include <tr1/memory>
|
||||||
|
|
||||||
|
#include <maxscale/backend.hh>
|
||||||
|
|
||||||
|
namespace maxscale
|
||||||
|
{
|
||||||
|
|
||||||
|
/** Enum for tracking client reply state */
|
||||||
|
enum reply_state_t
|
||||||
|
{
|
||||||
|
REPLY_STATE_START, /**< Query sent to backend */
|
||||||
|
REPLY_STATE_DONE, /**< Complete reply received */
|
||||||
|
REPLY_STATE_RSET_COLDEF, /**< Resultset response, waiting for column definitions */
|
||||||
|
REPLY_STATE_RSET_ROWS /**< Resultset response, waiting for rows */
|
||||||
|
};
|
||||||
|
|
||||||
|
typedef std::map<uint32_t, uint32_t> BackendHandleMap; /** Internal ID to external ID */
|
||||||
|
|
||||||
|
class RWBackend: public mxs::Backend
|
||||||
|
{
|
||||||
|
RWBackend(const RWBackend&);
|
||||||
|
RWBackend& operator=(const RWBackend&);
|
||||||
|
|
||||||
|
public:
|
||||||
|
RWBackend(SERVER_REF* ref);
|
||||||
|
~RWBackend();
|
||||||
|
|
||||||
|
inline reply_state_t get_reply_state() const
|
||||||
|
{
|
||||||
|
return m_reply_state;
|
||||||
|
}
|
||||||
|
|
||||||
|
inline void set_reply_state(reply_state_t state)
|
||||||
|
{
|
||||||
|
m_reply_state = state;
|
||||||
|
}
|
||||||
|
|
||||||
|
void add_ps_handle(uint32_t id, uint32_t handle);
|
||||||
|
uint32_t get_ps_handle(uint32_t id) const;
|
||||||
|
|
||||||
|
bool execute_session_command();
|
||||||
|
bool write(GWBUF* buffer, response_type type = EXPECT_RESPONSE);
|
||||||
|
void close(close_type type = CLOSE_NORMAL);
|
||||||
|
|
||||||
|
inline void set_large_packet(bool value)
|
||||||
|
{
|
||||||
|
m_large_packet = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
inline bool is_large_packet() const
|
||||||
|
{
|
||||||
|
return m_large_packet;
|
||||||
|
}
|
||||||
|
|
||||||
|
inline uint8_t current_command() const
|
||||||
|
{
|
||||||
|
return m_command;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool reply_is_complete(GWBUF *buffer);
|
||||||
|
|
||||||
|
private:
|
||||||
|
reply_state_t m_reply_state;
|
||||||
|
BackendHandleMap m_ps_handles; /**< Internal ID to backend PS handle mapping */
|
||||||
|
bool m_large_packet; /**< Used to store the state of the EOF packet
|
||||||
|
*calculation for result sets when the result
|
||||||
|
* contains very large rows */
|
||||||
|
uint8_t m_command;
|
||||||
|
};
|
||||||
|
|
||||||
|
typedef std::tr1::shared_ptr<RWBackend> SRWBackend;
|
||||||
|
typedef std::list<SRWBackend> SRWBackendList;
|
||||||
|
|
||||||
|
}
|
@ -34,15 +34,6 @@ do{ \
|
|||||||
|
|
||||||
#define RW_CLOSE_BREF(b) do{ if (b){ (b)->closed_at = __LINE__; } } while (false)
|
#define RW_CLOSE_BREF(b) do{ if (b){ (b)->closed_at = __LINE__; } } while (false)
|
||||||
|
|
||||||
static inline bool is_ps_command(uint8_t cmd)
|
|
||||||
{
|
|
||||||
return cmd == MXS_COM_STMT_EXECUTE ||
|
|
||||||
cmd == MXS_COM_STMT_SEND_LONG_DATA ||
|
|
||||||
cmd == MXS_COM_STMT_CLOSE ||
|
|
||||||
cmd == MXS_COM_STMT_FETCH ||
|
|
||||||
cmd == MXS_COM_STMT_RESET;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* The following are implemented in rwsplit_mysql.c
|
* The following are implemented in rwsplit_mysql.c
|
||||||
*/
|
*/
|
||||||
@ -50,8 +41,8 @@ bool route_single_stmt(RWSplit *inst, RWSplitSession *rses,
|
|||||||
GWBUF *querybuf);
|
GWBUF *querybuf);
|
||||||
void closed_session_reply(GWBUF *querybuf);
|
void closed_session_reply(GWBUF *querybuf);
|
||||||
void print_error_packet(RWSplitSession *rses, GWBUF *buf, DCB *dcb);
|
void print_error_packet(RWSplitSession *rses, GWBUF *buf, DCB *dcb);
|
||||||
void check_session_command_reply(GWBUF *buffer, SRWBackend& backend);
|
void check_session_command_reply(GWBUF *buffer, mxs::SRWBackend& backend);
|
||||||
bool execute_sescmd_in_backend(SRWBackend& backend_ref);
|
bool execute_sescmd_in_backend(mxs::SRWBackend& backend_ref);
|
||||||
bool handle_target_is_all(route_target_t route_target,
|
bool handle_target_is_all(route_target_t route_target,
|
||||||
RWSplit *inst, RWSplitSession *rses,
|
RWSplit *inst, RWSplitSession *rses,
|
||||||
GWBUF *querybuf, int packet_type, uint32_t qtype);
|
GWBUF *querybuf, int packet_type, uint32_t qtype);
|
||||||
@ -69,20 +60,20 @@ int rses_get_max_replication_lag(RWSplitSession *rses);
|
|||||||
|
|
||||||
bool route_single_stmt(RWSplit *inst, RWSplitSession *rses,
|
bool route_single_stmt(RWSplit *inst, RWSplitSession *rses,
|
||||||
GWBUF *querybuf, const RouteInfo& info);
|
GWBUF *querybuf, const RouteInfo& info);
|
||||||
SRWBackend get_target_backend(RWSplitSession *rses, backend_type_t btype,
|
mxs::SRWBackend get_target_backend(RWSplitSession *rses, backend_type_t btype,
|
||||||
char *name, int max_rlag);
|
char *name, int max_rlag);
|
||||||
SRWBackend handle_hinted_target(RWSplitSession *rses, GWBUF *querybuf,
|
mxs::SRWBackend handle_hinted_target(RWSplitSession *rses, GWBUF *querybuf,
|
||||||
route_target_t route_target);
|
route_target_t route_target);
|
||||||
SRWBackend handle_slave_is_target(RWSplit *inst, RWSplitSession *rses,
|
mxs::SRWBackend handle_slave_is_target(RWSplit *inst, RWSplitSession *rses,
|
||||||
uint8_t cmd, uint32_t id);
|
uint8_t cmd, uint32_t id);
|
||||||
bool handle_master_is_target(RWSplit *inst, RWSplitSession *rses,
|
bool handle_master_is_target(RWSplit *inst, RWSplitSession *rses,
|
||||||
SRWBackend* dest);
|
mxs::SRWBackend* dest);
|
||||||
bool handle_got_target(RWSplit *inst, RWSplitSession *rses,
|
bool handle_got_target(RWSplit *inst, RWSplitSession *rses,
|
||||||
GWBUF *querybuf, SRWBackend& target, bool store);
|
GWBUF *querybuf, mxs::SRWBackend& target, bool store);
|
||||||
bool route_session_write(RWSplitSession *rses, GWBUF *querybuf,
|
bool route_session_write(RWSplitSession *rses, GWBUF *querybuf,
|
||||||
uint8_t command, uint32_t type);
|
uint8_t command, uint32_t type);
|
||||||
|
|
||||||
void process_sescmd_response(RWSplitSession* rses, SRWBackend& bref, GWBUF** ppPacket);
|
void process_sescmd_response(RWSplitSession* rses, mxs::SRWBackend& bref, GWBUF** ppPacket);
|
||||||
/*
|
/*
|
||||||
* The following are implemented in rwsplit_select_backends.c
|
* The following are implemented in rwsplit_select_backends.c
|
||||||
*/
|
*/
|
||||||
@ -95,12 +86,12 @@ enum connection_type
|
|||||||
};
|
};
|
||||||
|
|
||||||
bool select_connect_backend_servers(RWSplit *inst, MXS_SESSION *session,
|
bool select_connect_backend_servers(RWSplit *inst, MXS_SESSION *session,
|
||||||
SRWBackendList& backends,
|
mxs::SRWBackendList& backends,
|
||||||
SRWBackend& current_master,
|
mxs::SRWBackend& current_master,
|
||||||
mxs::SessionCommandList* sescmd,
|
mxs::SessionCommandList* sescmd,
|
||||||
int* expected_responses,
|
int* expected_responses,
|
||||||
connection_type type);
|
connection_type type);
|
||||||
SRWBackend get_root_master(const SRWBackendList& backends);
|
mxs::SRWBackend get_root_master(const mxs::SRWBackendList& backends);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get total slave count and connected slave count
|
* Get total slave count and connected slave count
|
||||||
@ -110,9 +101,9 @@ SRWBackend get_root_master(const SRWBackendList& backends);
|
|||||||
*
|
*
|
||||||
* @return Total number of slaves and number of slaves we are connected to
|
* @return Total number of slaves and number of slaves we are connected to
|
||||||
*/
|
*/
|
||||||
std::pair<int, int> get_slave_counts(SRWBackendList& backends, SRWBackend& master);
|
std::pair<int, int> get_slave_counts(mxs::SRWBackendList& backends, mxs::SRWBackend& master);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* The following are implemented in rwsplit_tmp_table_multi.c
|
* The following are implemented in rwsplit_tmp_table_multi.c
|
||||||
*/
|
*/
|
||||||
void close_all_connections(SRWBackendList& backends);
|
void close_all_connections(mxs::SRWBackendList& backends);
|
||||||
|
@ -28,6 +28,8 @@
|
|||||||
#include "routeinfo.hh"
|
#include "routeinfo.hh"
|
||||||
#include "rwsplit_internal.hh"
|
#include "rwsplit_internal.hh"
|
||||||
|
|
||||||
|
using namespace maxscale;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The functions that support the routing of queries to back end
|
* The functions that support the routing of queries to back end
|
||||||
* servers. All the functions in this module are internal to the read
|
* servers. All the functions in this module are internal to the read
|
||||||
@ -150,7 +152,7 @@ bool route_single_stmt(RWSplit *inst, RWSplitSession *rses, GWBUF *querybuf, con
|
|||||||
route_target_t route_target = info.target;
|
route_target_t route_target = info.target;
|
||||||
bool not_locked_to_master = !locked_to_master(rses);
|
bool not_locked_to_master = !locked_to_master(rses);
|
||||||
|
|
||||||
if (not_locked_to_master && is_ps_command(command))
|
if (not_locked_to_master && mxs_mysql_is_ps_command(command))
|
||||||
{
|
{
|
||||||
/** Replace the client statement ID with our internal one only if the
|
/** Replace the client statement ID with our internal one only if the
|
||||||
* target node is not the current master */
|
* target node is not the current master */
|
||||||
|
@ -22,6 +22,8 @@
|
|||||||
|
|
||||||
#include <maxscale/router.h>
|
#include <maxscale/router.h>
|
||||||
|
|
||||||
|
using namespace maxscale;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The functions that implement back end selection for the read write
|
* The functions that implement back end selection for the read write
|
||||||
* split router. All of these functions are internal to that router and
|
* split router. All of these functions are internal to that router and
|
||||||
|
@ -22,6 +22,8 @@
|
|||||||
|
|
||||||
#include <maxscale/router.h>
|
#include <maxscale/router.h>
|
||||||
|
|
||||||
|
using namespace maxscale;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Functions for session command handling
|
* Functions for session command handling
|
||||||
*/
|
*/
|
||||||
|
@ -15,162 +15,6 @@
|
|||||||
#include "rwsplit_internal.hh"
|
#include "rwsplit_internal.hh"
|
||||||
#include "routeinfo.hh"
|
#include "routeinfo.hh"
|
||||||
|
|
||||||
RWBackend::RWBackend(SERVER_REF* ref):
|
|
||||||
mxs::Backend(ref),
|
|
||||||
m_reply_state(REPLY_STATE_DONE),
|
|
||||||
m_large_packet(false)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
RWBackend::~RWBackend()
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
bool RWBackend::execute_session_command()
|
|
||||||
{
|
|
||||||
bool expect_response = mxs_mysql_command_will_respond(next_session_command()->get_command());
|
|
||||||
bool rval = mxs::Backend::execute_session_command();
|
|
||||||
|
|
||||||
if (rval && expect_response)
|
|
||||||
{
|
|
||||||
set_reply_state(REPLY_STATE_START);
|
|
||||||
}
|
|
||||||
|
|
||||||
return rval;
|
|
||||||
}
|
|
||||||
|
|
||||||
void RWBackend::add_ps_handle(uint32_t id, uint32_t handle)
|
|
||||||
{
|
|
||||||
m_ps_handles[id] = handle;
|
|
||||||
MXS_INFO("PS response for %s: %u -> %u", name(), id, handle);
|
|
||||||
}
|
|
||||||
|
|
||||||
uint32_t RWBackend::get_ps_handle(uint32_t id) const
|
|
||||||
{
|
|
||||||
BackendHandleMap::const_iterator it = m_ps_handles.find(id);
|
|
||||||
|
|
||||||
if (it != m_ps_handles.end())
|
|
||||||
{
|
|
||||||
return it->second;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool RWBackend::write(GWBUF* buffer, response_type type)
|
|
||||||
{
|
|
||||||
uint8_t cmd = mxs_mysql_get_command(buffer);
|
|
||||||
|
|
||||||
m_command = cmd;
|
|
||||||
|
|
||||||
if (is_ps_command(cmd))
|
|
||||||
{
|
|
||||||
uint32_t id = mxs_mysql_extract_ps_id(buffer);
|
|
||||||
BackendHandleMap::iterator it = m_ps_handles.find(id);
|
|
||||||
|
|
||||||
if (it != m_ps_handles.end())
|
|
||||||
{
|
|
||||||
/** Replace the client handle with the real PS handle */
|
|
||||||
uint8_t* ptr = GWBUF_DATA(buffer) + MYSQL_PS_ID_OFFSET;
|
|
||||||
gw_mysql_set_byte4(ptr, it->second);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return mxs::Backend::write(buffer);
|
|
||||||
}
|
|
||||||
|
|
||||||
void RWBackend::close(close_type type)
|
|
||||||
{
|
|
||||||
m_reply_state = REPLY_STATE_DONE;
|
|
||||||
mxs::Backend::close(type);
|
|
||||||
}
|
|
||||||
|
|
||||||
static inline bool have_next_packet(GWBUF* buffer)
|
|
||||||
{
|
|
||||||
uint32_t len = MYSQL_GET_PAYLOAD_LEN(GWBUF_DATA(buffer)) + MYSQL_HEADER_LEN;
|
|
||||||
return gwbuf_length(buffer) > len;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Check if we have received a complete reply from the backend
|
|
||||||
*
|
|
||||||
* @param backend Backend reference
|
|
||||||
* @param buffer Buffer containing the response
|
|
||||||
*
|
|
||||||
* @return True if the complete response has been received
|
|
||||||
*/
|
|
||||||
bool RWBackend::reply_is_complete(GWBUF *buffer)
|
|
||||||
{
|
|
||||||
if (get_reply_state() == REPLY_STATE_START &&
|
|
||||||
(!mxs_mysql_is_result_set(buffer) || GWBUF_IS_COLLECTED_RESULT(buffer)))
|
|
||||||
{
|
|
||||||
if (GWBUF_IS_COLLECTED_RESULT(buffer) ||
|
|
||||||
current_command() == MXS_COM_STMT_PREPARE ||
|
|
||||||
!mxs_mysql_is_ok_packet(buffer) ||
|
|
||||||
!mxs_mysql_more_results_after_ok(buffer))
|
|
||||||
{
|
|
||||||
/** Not a result set, we have the complete response */
|
|
||||||
set_reply_state(REPLY_STATE_DONE);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
// This is an OK packet and more results will follow
|
|
||||||
ss_dassert(mxs_mysql_is_ok_packet(buffer) &&
|
|
||||||
mxs_mysql_more_results_after_ok(buffer));
|
|
||||||
|
|
||||||
if (have_next_packet(buffer))
|
|
||||||
{
|
|
||||||
set_reply_state(REPLY_STATE_RSET_COLDEF);
|
|
||||||
return reply_is_complete(buffer);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
bool more = false;
|
|
||||||
modutil_state state = {is_large_packet()};
|
|
||||||
int n_old_eof = get_reply_state() == REPLY_STATE_RSET_ROWS ? 1 : 0;
|
|
||||||
int n_eof = modutil_count_signal_packets(buffer, n_old_eof, &more, &state);
|
|
||||||
set_large_packet(state.state);
|
|
||||||
|
|
||||||
if (n_eof > 2)
|
|
||||||
{
|
|
||||||
/**
|
|
||||||
* We have multiple results in the buffer, we only care about
|
|
||||||
* the state of the last one. Skip the complete result sets and act
|
|
||||||
* like we're processing a single result set.
|
|
||||||
*/
|
|
||||||
n_eof = n_eof % 2 ? 1 : 2;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (n_eof == 0)
|
|
||||||
{
|
|
||||||
/** Waiting for the EOF packet after the column definitions */
|
|
||||||
set_reply_state(REPLY_STATE_RSET_COLDEF);
|
|
||||||
}
|
|
||||||
else if (n_eof == 1 && current_command() != MXS_COM_FIELD_LIST)
|
|
||||||
{
|
|
||||||
/** Waiting for the EOF packet after the rows */
|
|
||||||
set_reply_state(REPLY_STATE_RSET_ROWS);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
/** We either have a complete result set or a response to
|
|
||||||
* a COM_FIELD_LIST command */
|
|
||||||
ss_dassert(n_eof == 2 || (n_eof == 1 && current_command() == MXS_COM_FIELD_LIST));
|
|
||||||
set_reply_state(REPLY_STATE_DONE);
|
|
||||||
|
|
||||||
if (more)
|
|
||||||
{
|
|
||||||
/** The server will send more resultsets */
|
|
||||||
set_reply_state(REPLY_STATE_START);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return get_reply_state() == REPLY_STATE_DONE;
|
|
||||||
}
|
|
||||||
|
|
||||||
uint32_t get_internal_ps_id(RWSplitSession* rses, GWBUF* buffer)
|
uint32_t get_internal_ps_id(RWSplitSession* rses, GWBUF* buffer)
|
||||||
{
|
{
|
||||||
uint32_t rval = 0;
|
uint32_t rval = 0;
|
||||||
|
@ -14,20 +14,12 @@
|
|||||||
|
|
||||||
#include "readwritesplit.hh"
|
#include "readwritesplit.hh"
|
||||||
#include "rwsplit_ps.hh"
|
#include "rwsplit_ps.hh"
|
||||||
|
#include "rwbackend.hh"
|
||||||
|
|
||||||
#include <string>
|
#include <string>
|
||||||
|
|
||||||
#include <maxscale/modutil.h>
|
#include <maxscale/modutil.h>
|
||||||
|
|
||||||
|
|
||||||
/** Enum for tracking client reply state */
|
|
||||||
enum reply_state_t
|
|
||||||
{
|
|
||||||
REPLY_STATE_START, /**< Query sent to backend */
|
|
||||||
REPLY_STATE_DONE, /**< Complete reply received */
|
|
||||||
REPLY_STATE_RSET_COLDEF, /**< Resultset response, waiting for column definitions */
|
|
||||||
REPLY_STATE_RSET_ROWS /**< Resultset response, waiting for rows */
|
|
||||||
};
|
|
||||||
|
|
||||||
typedef enum
|
typedef enum
|
||||||
{
|
{
|
||||||
EXPECTING_NOTHING = 0,
|
EXPECTING_NOTHING = 0,
|
||||||
@ -35,72 +27,16 @@ typedef enum
|
|||||||
EXPECTING_REAL_RESULT
|
EXPECTING_REAL_RESULT
|
||||||
} wait_gtid_state_t;
|
} wait_gtid_state_t;
|
||||||
|
|
||||||
typedef std::map<uint32_t, uint32_t> BackendHandleMap; /** Internal ID to external ID */
|
|
||||||
typedef std::map<uint32_t, uint32_t> ClientHandleMap; /** External ID to internal ID */
|
typedef std::map<uint32_t, uint32_t> ClientHandleMap; /** External ID to internal ID */
|
||||||
|
|
||||||
class RWBackend: public mxs::Backend
|
|
||||||
{
|
|
||||||
RWBackend(const RWBackend&);
|
|
||||||
RWBackend& operator=(const RWBackend&);
|
|
||||||
|
|
||||||
public:
|
|
||||||
RWBackend(SERVER_REF* ref);
|
|
||||||
~RWBackend();
|
|
||||||
|
|
||||||
inline reply_state_t get_reply_state() const
|
|
||||||
{
|
|
||||||
return m_reply_state;
|
|
||||||
}
|
|
||||||
|
|
||||||
inline void set_reply_state(reply_state_t state)
|
|
||||||
{
|
|
||||||
m_reply_state = state;
|
|
||||||
}
|
|
||||||
|
|
||||||
void add_ps_handle(uint32_t id, uint32_t handle);
|
|
||||||
uint32_t get_ps_handle(uint32_t id) const;
|
|
||||||
|
|
||||||
bool execute_session_command();
|
|
||||||
bool write(GWBUF* buffer, response_type type = EXPECT_RESPONSE);
|
|
||||||
void close(close_type type = CLOSE_NORMAL);
|
|
||||||
|
|
||||||
inline void set_large_packet(bool value)
|
|
||||||
{
|
|
||||||
m_large_packet = value;
|
|
||||||
}
|
|
||||||
|
|
||||||
inline bool is_large_packet() const
|
|
||||||
{
|
|
||||||
return m_large_packet;
|
|
||||||
}
|
|
||||||
|
|
||||||
inline uint8_t current_command() const
|
|
||||||
{
|
|
||||||
return m_command;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool reply_is_complete(GWBUF *buffer);
|
|
||||||
|
|
||||||
private:
|
|
||||||
reply_state_t m_reply_state;
|
|
||||||
BackendHandleMap m_ps_handles; /**< Internal ID to backend PS handle mapping */
|
|
||||||
bool m_large_packet; /**< Used to store the state of the EOF packet
|
|
||||||
*calculation for result sets when the result
|
|
||||||
* contains very large rows */
|
|
||||||
uint8_t m_command;
|
|
||||||
};
|
|
||||||
|
|
||||||
typedef std::tr1::shared_ptr<RWBackend> SRWBackend;
|
|
||||||
typedef std::list<SRWBackend> SRWBackendList;
|
|
||||||
|
|
||||||
typedef std::tr1::unordered_set<std::string> TableSet;
|
typedef std::tr1::unordered_set<std::string> TableSet;
|
||||||
typedef std::map<uint64_t, uint8_t> ResponseMap;
|
typedef std::map<uint64_t, uint8_t> ResponseMap;
|
||||||
|
|
||||||
/** List of slave responses that arrived before the master */
|
/** List of slave responses that arrived before the master */
|
||||||
typedef std::list< std::pair<SRWBackend, uint8_t> > SlaveResponseList;
|
typedef std::list< std::pair<mxs::SRWBackend, uint8_t> > SlaveResponseList;
|
||||||
|
|
||||||
/** Map of COM_STMT_EXECUTE targets by internal ID */
|
/** Map of COM_STMT_EXECUTE targets by internal ID */
|
||||||
typedef std::tr1::unordered_map<uint32_t, SRWBackend> ExecMap;
|
typedef std::tr1::unordered_map<uint32_t, mxs::SRWBackend> ExecMap;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The client session of a RWSplit instance
|
* The client session of a RWSplit instance
|
||||||
@ -125,10 +61,10 @@ public:
|
|||||||
// TODO: Make member variables private
|
// TODO: Make member variables private
|
||||||
skygw_chk_t rses_chk_top;
|
skygw_chk_t rses_chk_top;
|
||||||
bool rses_closed; /**< true when closeSession is called */
|
bool rses_closed; /**< true when closeSession is called */
|
||||||
SRWBackendList backends; /**< List of backend servers */
|
mxs::SRWBackendList backends; /**< List of backend servers */
|
||||||
SRWBackend current_master; /**< Current master server */
|
mxs::SRWBackend current_master; /**< Current master server */
|
||||||
SRWBackend target_node; /**< The currently locked target node */
|
mxs::SRWBackend target_node; /**< The currently locked target node */
|
||||||
SRWBackend prev_target; /**< The previous target where a query was sent */
|
mxs::SRWBackend prev_target; /**< The previous target where a query was sent */
|
||||||
bool large_query; /**< Set to true when processing payloads >= 2^24 bytes */
|
bool large_query; /**< Set to true when processing payloads >= 2^24 bytes */
|
||||||
Config rses_config; /**< copied config info from router instance */
|
Config rses_config; /**< copied config info from router instance */
|
||||||
int rses_nbackends;
|
int rses_nbackends;
|
||||||
@ -156,33 +92,9 @@ public:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
RWSplitSession(RWSplit* instance, MXS_SESSION* session,
|
RWSplitSession(RWSplit* instance, MXS_SESSION* session,
|
||||||
const SRWBackendList& backends, const SRWBackend& master);
|
const mxs::SRWBackendList& backends, const mxs::SRWBackend& master);
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
|
||||||
* Helper function to convert reply_state_t to string
|
|
||||||
*/
|
|
||||||
static inline const char* rstostr(reply_state_t state)
|
|
||||||
{
|
|
||||||
switch (state)
|
|
||||||
{
|
|
||||||
case REPLY_STATE_START:
|
|
||||||
return "REPLY_STATE_START";
|
|
||||||
|
|
||||||
case REPLY_STATE_DONE:
|
|
||||||
return "REPLY_STATE_DONE";
|
|
||||||
|
|
||||||
case REPLY_STATE_RSET_COLDEF:
|
|
||||||
return "REPLY_STATE_RSET_COLDEF";
|
|
||||||
|
|
||||||
case REPLY_STATE_RSET_ROWS:
|
|
||||||
return "REPLY_STATE_RSET_ROWS";
|
|
||||||
}
|
|
||||||
|
|
||||||
ss_dassert(false);
|
|
||||||
return "UNKNOWN";
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Get the internal ID for the given binary prepared statement
|
* @brief Get the internal ID for the given binary prepared statement
|
||||||
*
|
*
|
||||||
|
Reference in New Issue
Block a user