diff --git a/server/modules/routing/readwritesplit/CMakeLists.txt b/server/modules/routing/readwritesplit/CMakeLists.txt index c970f3c63..73ba0f832 100644 --- a/server/modules/routing/readwritesplit/CMakeLists.txt +++ b/server/modules/routing/readwritesplit/CMakeLists.txt @@ -1,5 +1,6 @@ add_library(readwritesplit SHARED readwritesplit.cc +rwsplitsession.cc rwsplit_mysql.cc rwsplit_route_stmt.cc rwsplit_select_backends.cc diff --git a/server/modules/routing/readwritesplit/readwritesplit.cc b/server/modules/routing/readwritesplit/readwritesplit.cc index 31126e59c..752dcc572 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.cc +++ b/server/modules/routing/readwritesplit/readwritesplit.cc @@ -12,7 +12,6 @@ */ #include "readwritesplit.hh" -#include "rwsplit_internal.hh" #include #include @@ -32,6 +31,9 @@ #include #include +#include "rwsplit_internal.hh" +#include "rwsplitsession.hh" + /** * The entry points for the read/write query splitting router module. * diff --git a/server/modules/routing/readwritesplit/readwritesplit.hh b/server/modules/routing/readwritesplit/readwritesplit.hh index 80f72410d..3e45a2f37 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.hh +++ b/server/modules/routing/readwritesplit/readwritesplit.hh @@ -272,231 +272,3 @@ static inline const char* failure_mode_to_str(enum failure_mode type) return "UNDEFINED_MODE"; } } - -/** - * The following code is client session specific, to be moved into a separate file - */ - -/** Enum for tracking client reply state */ -enum reply_state_t -{ - REPLY_STATE_START, /**< Query sent to backend */ - REPLY_STATE_DONE, /**< Complete reply received */ - REPLY_STATE_RSET_COLDEF, /**< Resultset response, waiting for column definitions */ - REPLY_STATE_RSET_ROWS /**< Resultset response, waiting for rows */ -}; - -/** Reply state change debug logging */ -#define LOG_RS(a, b) MXS_DEBUG("%s %s -> %s", (a)->uri(), \ - rstostr((a)->get_reply_state()), rstostr(b)); - -static inline bool is_ps_command(uint8_t cmd) -{ - return cmd == MYSQL_COM_STMT_EXECUTE || - cmd == MYSQL_COM_STMT_SEND_LONG_DATA || - cmd == MYSQL_COM_STMT_CLOSE || - cmd == MYSQL_COM_STMT_FETCH || - cmd == MYSQL_COM_STMT_RESET; -} - -typedef std::map BackendHandleMap; /** Internal ID to external ID */ -typedef std::map ClientHandleMap; /** External ID to internal ID */ - -class RWBackend: public mxs::Backend -{ - RWBackend(const RWBackend&); - RWBackend& operator=(const RWBackend&); - -public: - RWBackend(SERVER_REF* ref): - mxs::Backend(ref), - m_reply_state(REPLY_STATE_DONE) - { - } - - ~RWBackend() - { - } - - reply_state_t get_reply_state() const - { - return m_reply_state; - } - - void set_reply_state(reply_state_t state) - { - m_reply_state = state; - } - - bool execute_session_command() - { - bool expect_response = mxs_mysql_command_will_respond(next_session_command()->get_command()); - bool rval = mxs::Backend::execute_session_command(); - - if (rval && expect_response) - { - set_reply_state(REPLY_STATE_START); - } - - return rval; - } - - void add_ps_handle(uint32_t id, uint32_t handle) - { - m_ps_handles[id] = handle; - MXS_INFO("PS response for %s: %u -> %u", name(), id, handle); - } - - uint32_t get_ps_handle(uint32_t id) const - { - BackendHandleMap::const_iterator it = m_ps_handles.find(id); - - if (it != m_ps_handles.end()) - { - return it->second; - } - - return 0; - } - - bool write(GWBUF* buffer, response_type type = EXPECT_RESPONSE) - { - uint8_t cmd = mxs_mysql_get_command(buffer); - - if (is_ps_command(cmd)) - { - uint32_t id = mxs_mysql_extract_ps_id(buffer); - BackendHandleMap::iterator it = m_ps_handles.find(id); - - if (it != m_ps_handles.end()) - { - /** Replace the client handle with the real PS handle */ - uint8_t* ptr = GWBUF_DATA(buffer) + MYSQL_PS_ID_OFFSET; - gw_mysql_set_byte4(ptr, it->second); - } - } - - return mxs::Backend::write(buffer); - } - -private: - reply_state_t m_reply_state; - BackendHandleMap m_ps_handles; /**< Internal ID to backend PS handle mapping */ -}; - -/** Prepared statement ID to type maps for text protocols */ -typedef std::tr1::unordered_map BinaryPSMap; -typedef std::tr1::unordered_map TextPSMap; - -class PSManager -{ - PSManager(const PSManager&); - PSManager& operator =(const PSManager&); - -public: - PSManager(); - ~PSManager(); - - /** - * @brief Store and process a prepared statement - * - * @param buffer Buffer containing either a text or a binary protocol - * prepared statement - * @param id The unique ID for this statement - */ - void store(GWBUF* buffer, uint32_t id); - - /** - * @brief Get the type of a stored prepared statement - * - * @param id The unique identifier for the prepared statement or the plaintext - * name of the prepared statement - * - * @return The type of the prepared statement - */ - uint32_t get_type(uint32_t id) const; - uint32_t get_type(std::string id) const; - - /** - * @brief Remove a prepared statement - * - * @param id Statement identifier to remove - */ - void erase(std::string id); - void erase(uint32_t id); - -private: - BinaryPSMap m_binary_ps; - TextPSMap m_text_ps; -}; - -typedef std::tr1::shared_ptr SRWBackend; -typedef std::list SRWBackendList; - -typedef std::tr1::unordered_set TableSet; -typedef std::map ResponseMap; - -/** Map of COM_STMT_EXECUTE targets by internal ID */ -typedef std::tr1::unordered_map ExecMap; - -/** - * The client session of a RWSplit instance - */ -class RWSplitSession -{ - RWSplitSession(const RWSplitSession&); - RWSplitSession& operator=(const RWSplitSession&); - -public: - RWSplitSession(const Config& config); - - // TODO: Make member variables private - skygw_chk_t rses_chk_top; - bool rses_closed; /**< true when closeSession is called */ - SRWBackendList backends; /**< List of backend servers */ - SRWBackend current_master; /**< Current master server */ - SRWBackend target_node; /**< The currently locked target node */ - Config rses_config; /**< copied config info from router instance */ - int rses_nbackends; - enum ld_state load_data_state; /**< Current load data state */ - bool have_tmp_tables; - uint64_t rses_load_data_sent; /**< How much data has been sent */ - DCB* client_dcb; - uint64_t sescmd_count; - int expected_responses; /**< Number of expected responses to the current query */ - GWBUF* query_queue; /**< Queued commands waiting to be executed */ - RWSplit* router; /**< The router instance */ - TableSet temp_tables; /**< Set of temporary tables */ - mxs::SessionCommandList sescmd_list; /**< List of executed session commands */ - ResponseMap sescmd_responses; /**< Response to each session command */ - uint64_t sent_sescmd; /**< ID of the last sent session command*/ - uint64_t recv_sescmd; /**< ID of the most recently completed session command */ - PSManager ps_manager; /**< Prepared statement manager*/ - ClientHandleMap ps_handles; /**< Client PS handle to internal ID mapping */ - ExecMap exec_map; /**< Map of COM_STMT_EXECUTE statement IDs to Backends */ - skygw_chk_t rses_chk_tail; -}; - -/** - * Helper function to convert reply_state_t to string - */ -static inline const char* rstostr(reply_state_t state) -{ - switch (state) - { - case REPLY_STATE_START: - return "REPLY_STATE_START"; - - case REPLY_STATE_DONE: - return "REPLY_STATE_DONE"; - - case REPLY_STATE_RSET_COLDEF: - return "REPLY_STATE_RSET_COLDEF"; - - case REPLY_STATE_RSET_ROWS: - return "REPLY_STATE_RSET_ROWS"; - } - - ss_dassert(false); - return "UNKNOWN"; -} diff --git a/server/modules/routing/readwritesplit/rwsplit_internal.hh b/server/modules/routing/readwritesplit/rwsplit_internal.hh index 8b8c02abc..ba3c7637e 100644 --- a/server/modules/routing/readwritesplit/rwsplit_internal.hh +++ b/server/modules/routing/readwritesplit/rwsplit_internal.hh @@ -12,14 +12,14 @@ * Public License. */ -#include +#include "readwritesplit.hh" #include #include #include -#include "readwritesplit.hh" +#include "rwsplitsession.hh" #define RW_CHK_DCB(b, d) \ do{ \ @@ -31,6 +31,15 @@ do{ \ #define RW_CLOSE_BREF(b) do{ if (b){ (b)->closed_at = __LINE__; } } while (false) +static inline bool is_ps_command(uint8_t cmd) +{ + return cmd == MYSQL_COM_STMT_EXECUTE || + cmd == MYSQL_COM_STMT_SEND_LONG_DATA || + cmd == MYSQL_COM_STMT_CLOSE || + cmd == MYSQL_COM_STMT_FETCH || + cmd == MYSQL_COM_STMT_RESET; +} + /* * The following are implemented in rwsplit_mysql.c */ @@ -113,12 +122,3 @@ bool check_for_multi_stmt(GWBUF *buf, void *protocol, uint8_t packet_type); uint32_t determine_query_type(GWBUF *querybuf, int packet_type, bool non_empty_packet); void close_all_connections(RWSplitSession* rses); - -/** - * @brief Extract text identifier of a PREPARE or EXECUTE statement - * - * @param buffer Buffer containing a PREPARE or EXECUTE command - * - * @return The string identifier of the statement - */ -std::string extract_text_ps_id(GWBUF* buffer); diff --git a/server/modules/routing/readwritesplit/rwsplit_ps.cc b/server/modules/routing/readwritesplit/rwsplit_ps.cc index 138af7a1a..603c81ef4 100644 --- a/server/modules/routing/readwritesplit/rwsplit_ps.cc +++ b/server/modules/routing/readwritesplit/rwsplit_ps.cc @@ -17,7 +17,9 @@ #include #include -static uint32_t get_prepare_type(GWBUF* buffer) +#include "rwsplit_internal.hh" + +uint32_t get_prepare_type(GWBUF* buffer) { uint32_t type; @@ -56,7 +58,7 @@ static uint32_t get_prepare_type(GWBUF* buffer) return type; } -std::string extract_text_ps_id(GWBUF* buffer) +std::string get_text_ps_id(GWBUF* buffer) { std::string rval; char* name = qc_get_prepare_name(buffer); @@ -70,6 +72,12 @@ std::string extract_text_ps_id(GWBUF* buffer) return rval; } +void replace_binary_ps_id(GWBUF* buffer, uint32_t id) +{ + uint8_t* ptr = GWBUF_DATA(buffer) + MYSQL_PS_ID_OFFSET; + gw_mysql_set_byte4(ptr, id); +} + PSManager::PSManager() { } @@ -138,7 +146,7 @@ void PSManager::store(GWBUF* buffer, uint32_t id) switch (mxs_mysql_get_command(buffer)) { case MYSQL_COM_QUERY: - m_text_ps[extract_text_ps_id(buffer)] = get_prepare_type(buffer); + m_text_ps[get_text_ps_id(buffer)] = get_prepare_type(buffer); break; case MYSQL_COM_STMT_PREPARE: diff --git a/server/modules/routing/readwritesplit/rwsplit_ps.hh b/server/modules/routing/readwritesplit/rwsplit_ps.hh new file mode 100644 index 000000000..fce893cf6 --- /dev/null +++ b/server/modules/routing/readwritesplit/rwsplit_ps.hh @@ -0,0 +1,91 @@ +#pragma once +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2020-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include "readwritesplit.hh" + +#include +#include + +/** Prepared statement ID to type maps for text protocols */ +typedef std::tr1::unordered_map BinaryPSMap; +typedef std::tr1::unordered_map TextPSMap; + +/** Class for tracking prepared statement types by PS statement ID */ +class PSManager +{ + PSManager(const PSManager&); + PSManager& operator =(const PSManager&); + +public: + PSManager(); + ~PSManager(); + + /** + * @brief Store and process a prepared statement + * + * @param buffer Buffer containing either a text or a binary protocol + * prepared statement + * @param id The unique ID for this statement + */ + void store(GWBUF* buffer, uint32_t id); + + /** + * @brief Get the type of a stored prepared statement + * + * @param id The unique identifier for the prepared statement or the plaintext + * name of the prepared statement + * + * @return The type of the prepared statement + */ + uint32_t get_type(uint32_t id) const; + uint32_t get_type(std::string id) const; + + /** + * @brief Remove a prepared statement + * + * @param id Statement identifier to remove + */ + void erase(std::string id); + void erase(uint32_t id); + +private: + BinaryPSMap m_binary_ps; + TextPSMap m_text_ps; +}; + +/** + * @brief Get the type of a prepared statement + * + * @param buffer Buffer containing either a text or a binary prepared statement + * + * @return The type of the prepared statement + */ +uint32_t get_prepare_type(GWBUF* buffer); + +/** + * @brief Extract text identifier of a PREPARE or EXECUTE statement + * + * @param buffer Buffer containing a PREPARE or EXECUTE command + * + * @return The string identifier of the statement + */ +std::string get_text_ps_id(GWBUF* buffer); + +/** + * @brief Replace the ID of a binary protocol statement + * + * @param buffer Buffer containing a binary protocol statement with an ID + * @param id ID to insert into the buffer + */ +void replace_binary_ps_id(GWBUF* buffer, uint32_t id); diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc index 4333f35c7..99663ffd7 100644 --- a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc @@ -91,28 +91,6 @@ void handle_connection_keepalive(RWSplit *inst, RWSplitSession *rses, ss_dassert(nserv < rses->rses_nbackends); } -uint32_t get_stmt_id(RWSplitSession* rses, GWBUF* buffer) -{ - uint32_t rval = 0; - - // All COM_STMT type statements store the ID in the same place - uint32_t id = mxs_mysql_extract_ps_id(buffer); - ClientHandleMap::iterator it = rses->ps_handles.find(id); - - if (it != rses->ps_handles.end()) - { - rval = it->second; - } - - return rval; -} - -void replace_stmt_id(GWBUF* buffer, uint32_t id) -{ - uint8_t* ptr = GWBUF_DATA(buffer) + MYSQL_PS_ID_OFFSET; - gw_mysql_set_byte4(ptr, id); -} - /** * Routing function. Find out query type, backend type, and target DCB(s). * Then route query to found target(s). @@ -166,14 +144,14 @@ bool route_single_stmt(RWSplit *inst, RWSplitSession *rses, if (command == MYSQL_COM_QUERY && qc_get_operation(querybuf) == QUERY_OP_EXECUTE) { - std::string id = extract_text_ps_id(querybuf); + std::string id = get_text_ps_id(querybuf); qtype = rses->ps_manager.get_type(id); } else if (is_ps_command(command)) { - stmt_id = get_stmt_id(rses, querybuf); + stmt_id = get_internal_ps_id(rses, querybuf); qtype = rses->ps_manager.get_type(stmt_id); - replace_stmt_id(querybuf, stmt_id); + replace_binary_ps_id(querybuf, stmt_id); } route_target = get_route_target(rses, command, qtype, querybuf->hint); diff --git a/server/modules/routing/readwritesplit/rwsplitsession.cc b/server/modules/routing/readwritesplit/rwsplitsession.cc new file mode 100644 index 000000000..c00a06bb2 --- /dev/null +++ b/server/modules/routing/readwritesplit/rwsplitsession.cc @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2020-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include "rwsplitsession.hh" +#include "rwsplit_internal.hh" + +RWBackend::RWBackend(SERVER_REF* ref): + mxs::Backend(ref), + m_reply_state(REPLY_STATE_DONE) +{ +} + +RWBackend::~RWBackend() +{ +} + +reply_state_t RWBackend::get_reply_state() const +{ + return m_reply_state; +} + +void RWBackend::set_reply_state(reply_state_t state) +{ + m_reply_state = state; +} + +bool RWBackend::execute_session_command() +{ + bool expect_response = mxs_mysql_command_will_respond(next_session_command()->get_command()); + bool rval = mxs::Backend::execute_session_command(); + + if (rval && expect_response) + { + set_reply_state(REPLY_STATE_START); + } + + return rval; +} + +void RWBackend::add_ps_handle(uint32_t id, uint32_t handle) +{ + m_ps_handles[id] = handle; + MXS_INFO("PS response for %s: %u -> %u", name(), id, handle); +} + +uint32_t RWBackend::get_ps_handle(uint32_t id) const +{ + BackendHandleMap::const_iterator it = m_ps_handles.find(id); + + if (it != m_ps_handles.end()) + { + return it->second; + } + + return 0; +} + +bool RWBackend::write(GWBUF* buffer, response_type type) +{ + uint8_t cmd = mxs_mysql_get_command(buffer); + + if (is_ps_command(cmd)) + { + uint32_t id = mxs_mysql_extract_ps_id(buffer); + BackendHandleMap::iterator it = m_ps_handles.find(id); + + if (it != m_ps_handles.end()) + { + /** Replace the client handle with the real PS handle */ + uint8_t* ptr = GWBUF_DATA(buffer) + MYSQL_PS_ID_OFFSET; + gw_mysql_set_byte4(ptr, it->second); + } + } + + return mxs::Backend::write(buffer); +} + +uint32_t get_internal_ps_id(RWSplitSession* rses, GWBUF* buffer) +{ + uint32_t rval = 0; + + // All COM_STMT type statements store the ID in the same place + uint32_t id = mxs_mysql_extract_ps_id(buffer); + ClientHandleMap::iterator it = rses->ps_handles.find(id); + + if (it != rses->ps_handles.end()) + { + rval = it->second; + } + + return rval; +} diff --git a/server/modules/routing/readwritesplit/rwsplitsession.hh b/server/modules/routing/readwritesplit/rwsplitsession.hh new file mode 100644 index 000000000..de58125cd --- /dev/null +++ b/server/modules/routing/readwritesplit/rwsplitsession.hh @@ -0,0 +1,137 @@ +#pragma once +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2020-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include "readwritesplit.hh" +#include "rwsplit_ps.hh" + + +/** Enum for tracking client reply state */ +enum reply_state_t +{ + REPLY_STATE_START, /**< Query sent to backend */ + REPLY_STATE_DONE, /**< Complete reply received */ + REPLY_STATE_RSET_COLDEF, /**< Resultset response, waiting for column definitions */ + REPLY_STATE_RSET_ROWS /**< Resultset response, waiting for rows */ +}; + +/** Reply state change debug logging */ +#define LOG_RS(a, b) MXS_DEBUG("%s %s -> %s", (a)->uri(), \ + rstostr((a)->get_reply_state()), rstostr(b)); + +typedef std::map BackendHandleMap; /** Internal ID to external ID */ +typedef std::map ClientHandleMap; /** External ID to internal ID */ + +class RWBackend: public mxs::Backend +{ + RWBackend(const RWBackend&); + RWBackend& operator=(const RWBackend&); + +public: + RWBackend(SERVER_REF* ref); + ~RWBackend(); + + reply_state_t get_reply_state() const; + void set_reply_state(reply_state_t state); + + void add_ps_handle(uint32_t id, uint32_t handle); + uint32_t get_ps_handle(uint32_t id) const; + + bool execute_session_command(); + bool write(GWBUF* buffer, response_type type = EXPECT_RESPONSE); + +private: + reply_state_t m_reply_state; + BackendHandleMap m_ps_handles; /**< Internal ID to backend PS handle mapping */ +}; + +typedef std::tr1::shared_ptr SRWBackend; +typedef std::list SRWBackendList; + +typedef std::tr1::unordered_set TableSet; +typedef std::map ResponseMap; + +/** Map of COM_STMT_EXECUTE targets by internal ID */ +typedef std::tr1::unordered_map ExecMap; + +/** + * The client session of a RWSplit instance + */ +class RWSplitSession +{ + RWSplitSession(const RWSplitSession&); + RWSplitSession& operator=(const RWSplitSession&); + +public: + RWSplitSession(const Config& config); + + // TODO: Make member variables private + skygw_chk_t rses_chk_top; + bool rses_closed; /**< true when closeSession is called */ + SRWBackendList backends; /**< List of backend servers */ + SRWBackend current_master; /**< Current master server */ + SRWBackend target_node; /**< The currently locked target node */ + Config rses_config; /**< copied config info from router instance */ + int rses_nbackends; + enum ld_state load_data_state; /**< Current load data state */ + bool have_tmp_tables; + uint64_t rses_load_data_sent; /**< How much data has been sent */ + DCB* client_dcb; + uint64_t sescmd_count; + int expected_responses; /**< Number of expected responses to the current query */ + GWBUF* query_queue; /**< Queued commands waiting to be executed */ + RWSplit* router; /**< The router instance */ + TableSet temp_tables; /**< Set of temporary tables */ + mxs::SessionCommandList sescmd_list; /**< List of executed session commands */ + ResponseMap sescmd_responses; /**< Response to each session command */ + uint64_t sent_sescmd; /**< ID of the last sent session command*/ + uint64_t recv_sescmd; /**< ID of the most recently completed session command */ + PSManager ps_manager; /**< Prepared statement manager*/ + ClientHandleMap ps_handles; /**< Client PS handle to internal ID mapping */ + ExecMap exec_map; /**< Map of COM_STMT_EXECUTE statement IDs to Backends */ + skygw_chk_t rses_chk_tail; +}; + +/** + * Helper function to convert reply_state_t to string + */ +static inline const char* rstostr(reply_state_t state) +{ + switch (state) + { + case REPLY_STATE_START: + return "REPLY_STATE_START"; + + case REPLY_STATE_DONE: + return "REPLY_STATE_DONE"; + + case REPLY_STATE_RSET_COLDEF: + return "REPLY_STATE_RSET_COLDEF"; + + case REPLY_STATE_RSET_ROWS: + return "REPLY_STATE_RSET_ROWS"; + } + + ss_dassert(false); + return "UNKNOWN"; +} + +/** + * @brief Get the internal ID for the given binary prepared statement + * + * @param rses Router client session + * @param buffer Buffer containing a binary protocol statement other than COM_STMT_PREPARE + * + * @return The internal ID of the prepared statement that the buffer contents refer to + */ +uint32_t get_internal_ps_id(RWSplitSession* rses, GWBUF* buffer);