diff --git a/include/maxscale/backend.hh b/include/maxscale/backend.hh new file mode 100644 index 000000000..3948065a5 --- /dev/null +++ b/include/maxscale/backend.hh @@ -0,0 +1,195 @@ +#pragma once +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2020-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include + +#include +#include + +#include +#include + + +namespace maxscale +{ + +/** + * The state of the backend server reference + */ +enum bref_state +{ + BREF_IN_USE = 0x01, + BREF_WAITING_RESULT = 0x02, /**< for session commands only */ + BREF_QUERY_ACTIVE = 0x04, /**< for other queries */ + BREF_CLOSED = 0x08, +}; + +class Backend +{ + Backend(const Backend&); + Backend& operator =(const Backend&); +public: + /** + * @brief Create new Backend + * + * @param ref Server reference used by this backend + */ + Backend(SERVER_REF* ref); + + virtual ~Backend(); + + /** + * @brief Execute the next session command in the queue + * + * @return True if the command was executed successfully + */ + bool execute_session_command(); + + /** + * @brief Add a new session command to the tail of the command queue + * + * @param buffer Session command to add + * @param sequence Sequence identifier of this session command, returned when + * the session command is completed + */ + void add_session_command(GWBUF* buffer, uint64_t sequence); + + /** + * @brief Mark the current session command as successfully executed + * + * This should be called when the response to the command is received + * + * @return The sequence identifier for this session command + */ + uint64_t complete_session_command(); + + /** + * @brief Check if backend has session commands + * + * @return True if backend has session commands + */ + size_t session_command_count() const; + + /** + * @brief Clear state + * + * @param state State to clear + */ + void clear_state(enum bref_state state); + + /** + * @brief Set state + * + * @param state State to set + */ + void set_state(enum bref_state state); + + /** + * @brief Get pointer to server reference + * + * @return Pointer to server reference + */ + SERVER_REF* backend() const; + + /** + * @brief Create a new connection + * + * @param session The session to which the connection is linked + * + * @return True if connection was successfully created + */ + bool connect(MXS_SESSION* session); + + /** + * @brief Close the backend + * + * This will close all active connections created by the backend. + */ + void close(); + + /** + * @brief Get a pointer to the internal DCB + * + * @return Pointer to internal DCB + */ + DCB* dcb() const; + + /** + * @brief Write data to the backend server + * + * @param buffer Buffer containing the data to write + * + * @return True if data was written successfully + */ + bool write(GWBUF* buffer); + + /** + * @brief Store a command + * + * The command is stored and executed once the session can execute + * the next command. + * + * @param buffer Buffer to store + */ + void store_command(GWBUF* buffer); + + /** + * @brief Write the stored command to the backend server + * + * @return True if command was written successfully + */ + bool write_stored_command(); + + /** + * @brief Check if backend is in use + * + * @return True if backend is in use + */ + bool in_use() const; + + /** + * @brief Check if backend is waiting for a result + * + * @return True if backend is waiting for a result + */ + bool is_waiting_result() const; + + /** + * @brief Check if a query is active + * + * @return True if a query is active + */ + bool is_query_active() const; + + /** + * @brief Check if the backend is closed + * + * @return True if the backend is closed + */ + bool is_closed() const; + +private: + bool m_closed; /**< True if a connection has been opened and closed */ + SERVER_REF* m_backend; /**< Backend server */ + DCB* m_dcb; /**< Backend DCB */ + int m_num_result_wait; /**< Number of not yet received results */ + mxs::Buffer m_pending_cmd; /**< Pending commands */ + int m_state; /**< State of the backend */ + SessionCommandList m_session_commands; /**< List of session commands that are + * to be executed on this backend server */ +}; + +typedef std::tr1::shared_ptr SBackend; +typedef std::list BackendList; +} diff --git a/server/modules/routing/schemarouter/session_command.hh b/include/maxscale/session_command.hh similarity index 73% rename from server/modules/routing/schemarouter/session_command.hh rename to include/maxscale/session_command.hh index 7fa37e754..59c198f43 100644 --- a/server/modules/routing/schemarouter/session_command.hh +++ b/include/maxscale/session_command.hh @@ -12,18 +12,21 @@ * Public License. */ +#include + +#include #include #include #include -using namespace maxscale; - -class SessionCommand; -typedef std::list SessionCommandList; +namespace maxscale +{ class SessionCommand { + SessionCommand(const SessionCommand&); + SessionCommand& operator=(const SessionCommand&); public: /** * @brief Mark reply as received @@ -54,7 +57,7 @@ public: * @brief Creates a copy of the internal buffer * @return A copy of the internal buffer */ - Buffer copy_buffer() const; + mxs::Buffer copy_buffer() const; /** * @brief Create a new session command @@ -75,8 +78,13 @@ public: std::string to_string(); private: - Buffer m_buffer; /**< The buffer containing the command */ - uint8_t m_command; /**< The command being executed */ - uint64_t m_pos; /**< Unique position identifier */ - bool m_reply_sent; /**< Whether the session command reply has been sent */ + mxs::Buffer m_buffer; /**< The buffer containing the command */ + uint8_t m_command; /**< The command being executed */ + uint64_t m_pos; /**< Unique position identifier */ + bool m_reply_sent; /**< Whether the session command reply has been sent */ }; + +typedef std::tr1::shared_ptr SSessionCommand; +typedef std::list SessionCommandList; + +} diff --git a/server/core/CMakeLists.txt b/server/core/CMakeLists.txt index 7ef6a6f32..371e550ec 100644 --- a/server/core/CMakeLists.txt +++ b/server/core/CMakeLists.txt @@ -4,6 +4,7 @@ add_library(maxscale-common SHARED alloc.cc atomic.cc authenticator.cc + backend.cc buffer.cc config.cc config_runtime.cc @@ -41,6 +42,7 @@ add_library(maxscale-common SHARED server.cc service.cc session.cc + session_command.cc skygw_utils.cc spinlock.cc ssl.cc diff --git a/server/core/backend.cc b/server/core/backend.cc new file mode 100644 index 000000000..04b4ce889 --- /dev/null +++ b/server/core/backend.cc @@ -0,0 +1,220 @@ +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2020-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include +#include +#include + +using namespace maxscale; + +Backend::Backend(SERVER_REF *ref): + m_closed(false), + m_backend(ref), + m_dcb(NULL), + m_num_result_wait(0), + m_state(0) +{ +} + +Backend::~Backend() +{ + ss_dassert(m_closed); + + if (!m_closed) + { + close(); + } +} + +void Backend::close() +{ + if (!m_closed) + { + m_closed = true; + + if (in_use()) + { + CHK_DCB(m_dcb); + + /** Clean operation counter in bref and in SERVER */ + while (is_waiting_result()) + { + clear_state(BREF_WAITING_RESULT); + } + clear_state(BREF_IN_USE); + set_state(BREF_CLOSED); + + dcb_close(m_dcb); + + /** decrease server current connection counters */ + atomic_add(&m_backend->connections, -1); + } + } + else + { + ss_dassert(false); + } +} + +bool Backend::execute_session_command() +{ + if (is_closed() || !session_command_count()) + { + return false; + } + + CHK_DCB(m_dcb); + + int rc = 0; + + SessionCommandList::iterator iter = m_session_commands.begin(); + SessionCommand& sescmd = *(*iter); + GWBUF *buffer = sescmd.copy_buffer().release(); + + switch (sescmd.get_command()) + { + case MYSQL_COM_CHANGE_USER: + /** This makes it possible to handle replies correctly */ + gwbuf_set_type(buffer, GWBUF_TYPE_SESCMD); + rc = m_dcb->func.auth(m_dcb, NULL, m_dcb->session, buffer); + break; + + case MYSQL_COM_QUERY: + default: + /** + * Mark session command buffer, it triggers writing + * MySQL command to protocol + */ + gwbuf_set_type(buffer, GWBUF_TYPE_SESCMD); + rc = m_dcb->func.write(m_dcb, buffer); + break; + } + + return rc == 1; +} + +void Backend::add_session_command(GWBUF* buffer, uint64_t sequence) +{ + m_session_commands.push_back(SSessionCommand(new SessionCommand(buffer, sequence))); +} + +uint64_t Backend::complete_session_command() +{ + uint64_t rval = m_session_commands.front()->get_position(); + m_session_commands.pop_front(); + return rval; +} + +size_t Backend::session_command_count() const +{ + return m_session_commands.size(); +} + +void Backend::clear_state(enum bref_state state) +{ + if (state != BREF_WAITING_RESULT) + { + m_state &= ~state; + } + else + { + /** Decrease global operation count */ + ss_debug(int prev2 = )atomic_add(&m_backend->server->stats.n_current_ops, -1); + ss_dassert(prev2 > 0); + } +} + +void Backend::set_state(enum bref_state state) +{ + if (state != BREF_WAITING_RESULT) + { + m_state |= state; + } + else + { + /** Increase global operation count */ + ss_debug(int prev2 = )atomic_add(&m_backend->server->stats.n_current_ops, 1); + ss_dassert(prev2 >= 0); + } +} + +SERVER_REF* Backend::backend() const +{ + return m_backend; +} + +bool Backend::connect(MXS_SESSION* session) +{ + bool rval = false; + + if ((m_dcb = dcb_connect(m_backend->server, session, m_backend->server->protocol))) + { + m_state = BREF_IN_USE; + atomic_add(&m_backend->connections, 1); + rval = true; + } + + return rval; +} + +DCB* Backend::dcb() const +{ + return m_dcb; +} + +bool Backend::write(GWBUF* buffer) +{ + return m_dcb->func.write(m_dcb, buffer) != 0; +} + +void Backend::store_command(GWBUF* buffer) +{ + m_pending_cmd.reset(buffer); +} + +bool Backend::write_stored_command() +{ + bool rval = false; + + if (m_pending_cmd.length()) + { + rval = write(m_pending_cmd.release()); + + if (!rval) + { + MXS_ERROR("Routing of pending query failed."); + } + } + + return rval; +} + +bool Backend::in_use() const +{ + return m_state & BREF_IN_USE; +} + +bool Backend::is_waiting_result() const +{ + return m_num_result_wait > 0; +} + +bool Backend::is_query_active() const +{ + return m_state & BREF_QUERY_ACTIVE; +} + +bool Backend::is_closed() const +{ + return m_state & BREF_CLOSED; +} diff --git a/server/modules/routing/schemarouter/session_command.cc b/server/core/session_command.cc similarity index 95% rename from server/modules/routing/schemarouter/session_command.cc rename to server/core/session_command.cc index a5d7c520c..c663ee0ff 100644 --- a/server/modules/routing/schemarouter/session_command.cc +++ b/server/core/session_command.cc @@ -11,10 +11,13 @@ * Public License. */ -#include "session_command.hh" +#include + #include #include +using namespace maxscale; + void SessionCommand::mark_reply_received() { m_reply_sent = true; diff --git a/server/modules/routing/schemarouter/CMakeLists.txt b/server/modules/routing/schemarouter/CMakeLists.txt index 2d136dcba..c2d3dd5d6 100644 --- a/server/modules/routing/schemarouter/CMakeLists.txt +++ b/server/modules/routing/schemarouter/CMakeLists.txt @@ -1,4 +1,4 @@ -add_library(schemarouter SHARED schemarouter.cc schemarouterinstance.cc schemaroutersession.cc shard_map.cc session_command.cc) +add_library(schemarouter SHARED schemarouter.cc schemarouterinstance.cc schemaroutersession.cc shard_map.cc) target_link_libraries(schemarouter maxscale-common MySQLCommon) add_dependencies(schemarouter pcre2) set_target_properties(schemarouter PROPERTIES VERSION "1.0.0") diff --git a/server/modules/routing/schemarouter/schemarouter.cc b/server/modules/routing/schemarouter/schemarouter.cc index dc61bdf68..899bb7750 100644 --- a/server/modules/routing/schemarouter/schemarouter.cc +++ b/server/modules/routing/schemarouter/schemarouter.cc @@ -13,218 +13,17 @@ #include "schemarouter.hh" -#include - -using namespace schemarouter; - -Backend::Backend(SERVER_REF *ref): - m_closed(false), - m_backend(ref), - m_dcb(NULL), - m_mapped(false), - m_num_result_wait(0), - m_state(0) +namespace schemarouter { -} -Backend::~Backend() -{ - ss_dassert(m_closed); - - if (!m_closed) - { - close(); - } -} - -void Backend::close() -{ - if (!m_closed) - { - m_closed = true; - - if (in_use()) - { - CHK_DCB(m_dcb); - - /** Clean operation counter in bref and in SERVER */ - while (is_waiting_result()) - { - clear_state(BREF_WAITING_RESULT); - } - clear_state(BREF_IN_USE); - set_state(BREF_CLOSED); - - dcb_close(m_dcb); - - /** decrease server current connection counters */ - atomic_add(&m_backend->connections, -1); - } - } - else - { - ss_dassert(false); - } -} - -bool Backend::execute_session_command() -{ - if (is_closed() || !session_command_count()) - { - return false; - } - - CHK_DCB(m_dcb); - - int rc = 0; - - SessionCommandList::iterator iter = m_session_commands.begin(); - GWBUF *buffer = iter->copy_buffer().release(); - - switch (iter->get_command()) - { - case MYSQL_COM_CHANGE_USER: - /** This makes it possible to handle replies correctly */ - gwbuf_set_type(buffer, GWBUF_TYPE_SESCMD); - rc = m_dcb->func.auth(m_dcb, NULL, m_dcb->session, buffer); - break; - - case MYSQL_COM_QUERY: - default: - /** - * Mark session command buffer, it triggers writing - * MySQL command to protocol - */ - gwbuf_set_type(buffer, GWBUF_TYPE_SESCMD); - rc = m_dcb->func.write(m_dcb, buffer); - break; - } - - return rc == 1; -} - -void Backend::add_session_command(GWBUF* buffer, uint64_t sequence) -{ - m_session_commands.push_back(SessionCommand(buffer, sequence)); -} - -uint64_t Backend::complete_session_command() -{ - uint64_t rval = m_session_commands.front().get_position(); - m_session_commands.pop_front(); - return rval; -} - -size_t Backend::session_command_count() const -{ - return m_session_commands.size(); -} - -void Backend::clear_state(enum bref_state state) -{ - if (state != BREF_WAITING_RESULT) - { - m_state &= ~state; - } - else - { - /** Decrease global operation count */ - ss_debug(int prev2 = )atomic_add(&m_backend->server->stats.n_current_ops, -1); - ss_dassert(prev2 > 0); - } -} - -void Backend::set_state(enum bref_state state) -{ - if (state != BREF_WAITING_RESULT) - { - m_state |= state; - } - else - { - /** Increase global operation count */ - ss_debug(int prev2 = )atomic_add(&m_backend->server->stats.n_current_ops, 1); - ss_dassert(prev2 >= 0); - } -} - -SERVER_REF* Backend::backend() const -{ - return m_backend; -} - -bool Backend::connect(MXS_SESSION* session) -{ - bool rval = false; - - if ((m_dcb = dcb_connect(m_backend->server, session, m_backend->server->protocol))) - { - m_state = BREF_IN_USE; - atomic_add(&m_backend->connections, 1); - rval = true; - } - - return rval; -} - -DCB* Backend::dcb() const -{ - return m_dcb; -} - -bool Backend::write(GWBUF* buffer) -{ - return m_dcb->func.write(m_dcb, buffer) != 0; -} - -void Backend::store_command(GWBUF* buffer) -{ - m_pending_cmd.reset(buffer); -} - -bool Backend::write_stored_command() -{ - bool rval = false; - - if (m_pending_cmd.length()) - { - rval = write(m_pending_cmd.release()); - - if (!rval) - { - MXS_ERROR("Routing of pending query failed."); - } - } - - return rval; -} - -bool Backend::in_use() const -{ - return m_state & BREF_IN_USE; -} - -bool Backend::is_waiting_result() const -{ - return m_num_result_wait > 0; -} - -bool Backend::is_query_active() const -{ - return m_state & BREF_QUERY_ACTIVE; -} - -bool Backend::is_closed() const -{ - return m_state & BREF_CLOSED; -} - -void Backend::set_mapped(bool value) +void SRBackend::set_mapped(bool value) { m_mapped = value; } -bool Backend::is_mapped() const +bool SRBackend::is_mapped() const { return m_mapped; } + +} \ No newline at end of file diff --git a/server/modules/routing/schemarouter/schemarouter.hh b/server/modules/routing/schemarouter/schemarouter.hh index b9ce7b66c..aa5ff84c9 100644 --- a/server/modules/routing/schemarouter/schemarouter.hh +++ b/server/modules/routing/schemarouter/schemarouter.hh @@ -29,27 +29,7 @@ #include #include #include - -#include "session_command.hh" - -using std::list; -using std::set; -using std::string; -using std::tr1::shared_ptr; - -using maxscale::Buffer; - -/** - * The state of the backend server reference - */ -enum bref_state -{ - BREF_IN_USE = 0x01, - BREF_WAITING_RESULT = 0x02, /**< for session commands only */ - BREF_QUERY_ACTIVE = 0x04, /**< for other queries */ - BREF_CLOSED = 0x08, - BREF_DB_MAPPED = 0x10 -}; +#include namespace schemarouter { @@ -65,7 +45,7 @@ struct Config bool debug; /**< Enable verbose debug messages to clients */ pcre2_code* ignore_regex; /**< Regular expression used to ignore databases */ pcre2_match_data* ignore_match_data; /**< Match data for @c ignore_regex */ - set ignored_dbs; /**< Set of ignored databases */ + std::set ignored_dbs; /**< Set of ignored databases */ Config(): refresh_min_interval(0.0), @@ -110,151 +90,23 @@ struct Stats }; /** - * Reference to BACKEND. + * Reference to a backend * * Owned by router client session. */ -class Backend +class SRBackend: public mxs::Backend { public: - /** - * @brief Create new Backend - * - * @param ref Server reference used by this backend - */ - Backend(SERVER_REF *ref); - ~Backend(); + SRBackend(SERVER_REF *ref): + mxs::Backend(ref), + m_mapped(false) + { + } - /** - * @brief Execute the next session command in the queue - * - * @return True if the command was executed successfully - */ - bool execute_session_command(); - - /** - * @brief Add a new session command to the tail of the command queue - * - * @param buffer Session command to add - * @param sequence Sequence identifier of this session command, returned when - * the session command is completed - */ - void add_session_command(GWBUF* buffer, uint64_t sequence); - - /** - * @brief Mark the current session command as successfully executed - * - * This should be called when the response to the command is received - * - * @return The sequence identifier for this session command - */ - uint64_t complete_session_command(); - - /** - * @brief Check if backend has session commands - * - * @return True if backend has session commands - */ - size_t session_command_count() const; - - /** - * @brief Clear state - * - * @param state State to clear - */ - void clear_state(enum bref_state state); - - /** - * @brief Set state - * - * @param state State to set - */ - void set_state(enum bref_state state); - - /** - * @brief Get pointer to server reference - * - * @return Pointer to server reference - */ - SERVER_REF* backend() const; - - /** - * @brief Create a new connection - * - * @param session The session to which the connection is linked - * - * @return True if connection was successfully created - */ - bool connect(MXS_SESSION* session); - - /** - * @brief Close the backend - * - * This will close all active connections created by the backend. - */ - void close(); - - /** - * @brief Get a pointer to the internal DCB - * - * @return Pointer to internal DCB - */ - DCB* dcb() const; - - /** - * @brief Write data to the backend server - * - * @param buffer Buffer containing the data to write - * - * @return True if data was written successfully - */ - bool write(GWBUF* buffer); - - /** - * @brief Store a command - * - * The command is stored and executed once the session can execute - * the next command. - * - * @param buffer Buffer to store - */ - void store_command(GWBUF* buffer); - - /** - * @brief Write the stored command to the backend server - * - * @return True if command was written successfully - */ - bool write_stored_command(); - - /** - * @brief Check if backend is in use - * - * @return True if backend is in use - */ - bool in_use() const; - - /** - * @brief Check if backend is waiting for a result - * - * @return True if backend is waiting for a result - */ - bool is_waiting_result() const; - - /** - * @brief Check if a query is active - * - * @return True if a query is active - */ - bool is_query_active() const; - - /** - * @brief Check if the backend is closed - * - * @return True if the backend is closed - */ - bool is_closed() const; + ~SRBackend() + { + } /** * @brief Set the mapping state of the backend @@ -271,18 +123,10 @@ public: bool is_mapped() const; private: - bool m_closed; /**< True if a connection has been opened and closed */ - SERVER_REF* m_backend; /**< Backend server */ - DCB* m_dcb; /**< Backend DCB */ - bool m_mapped; /**< Whether the backend has been mapped */ - int m_num_result_wait; /**< Number of not yet received results */ - Buffer m_pending_cmd; /**< Pending commands */ - int m_state; /**< State of the backend */ - SessionCommandList m_session_commands; /**< List of session commands that are - * to be executed on this backend server */ + bool m_mapped; /**< Whether the backend has been mapped */ }; -typedef shared_ptr SBackend; -typedef list BackendList; +typedef std::tr1::shared_ptr SSRBackend; +typedef std::list SSRBackendList; } diff --git a/server/modules/routing/schemarouter/schemarouterinstance.cc b/server/modules/routing/schemarouter/schemarouterinstance.cc index a488dccae..02cfd3ab2 100644 --- a/server/modules/routing/schemarouter/schemarouterinstance.cc +++ b/server/modules/routing/schemarouter/schemarouterinstance.cc @@ -34,6 +34,9 @@ using std::string; using std::map; +namespace schemarouter +{ + #define DEFAULT_REFRESH_INTERVAL "300" /** @@ -195,7 +198,7 @@ SchemaRouter* SchemaRouter::create(SERVICE* pService, char** pzOptions) * connections because all servers are supposed to be operational. It is, * however, possible that there are less available servers than expected. */ -bool connect_backend_servers(BackendList& backends, MXS_SESSION* session) +bool connect_backend_servers(SSRBackendList& backends, MXS_SESSION* session) { bool succp = false; int servers_found = 0; @@ -206,7 +209,7 @@ bool connect_backend_servers(BackendList& backends, MXS_SESSION* session) { MXS_INFO("Servers and connection counts:"); - for (BackendList::iterator it = backends.begin(); it != backends.end(); it++) + for (SSRBackendList::iterator it = backends.begin(); it != backends.end(); it++) { SERVER_REF* b = (*it)->backend(); @@ -222,7 +225,7 @@ bool connect_backend_servers(BackendList& backends, MXS_SESSION* session) * Scan server list and connect each of them. None should fail or session * can't be established. */ - for (BackendList::iterator it = backends.begin(); it != backends.end(); it++) + for (SSRBackendList::iterator it = backends.begin(); it != backends.end(); it++) { SERVER_REF* b = (*it)->backend(); @@ -262,7 +265,7 @@ bool connect_backend_servers(BackendList& backends, MXS_SESSION* session) if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO)) { - for (BackendList::iterator it = backends.begin(); it != backends.end(); it++) + for (SSRBackendList::iterator it = backends.begin(); it != backends.end(); it++) { SERVER_REF* b = (*it)->backend(); @@ -282,13 +285,13 @@ bool connect_backend_servers(BackendList& backends, MXS_SESSION* session) SchemaRouterSession* SchemaRouter::newSession(MXS_SESSION* pSession) { - BackendList backends; + SSRBackendList backends; for (SERVER_REF *ref = m_service->dbref; ref; ref = ref->next) { if (ref->active) { - backends.push_back(SBackend(new Backend(ref))); + backends.push_back(SSRBackend(new SRBackend(ref))); } } @@ -336,8 +339,8 @@ void SchemaRouter::diagnostics(DCB* dcb) json_t* SchemaRouter::diagnostics_json() const { double sescmd_pct = m_stats.n_sescmd != 0 ? - 100.0 * ((double)m_stats.n_sescmd / (double)m_stats.n_queries) : - 0.0; + 100.0 * ((double)m_stats.n_sescmd / (double)m_stats.n_queries) : + 0.0; json_t* rval = json_object(); json_object_set_new(rval, "queries", json_integer(m_stats.n_queries)); @@ -364,6 +367,8 @@ uint64_t SchemaRouter::getCapabilities() return RCAP_TYPE_NONE; } +} + MXS_BEGIN_DECLS /** @@ -384,7 +389,7 @@ MXS_MODULE* MXS_CREATE_MODULE() "A database sharding router for simple sharding", "V1.0.0", RCAP_TYPE_CONTIGUOUS_INPUT, - &SchemaRouter::s_object, + &schemarouter::SchemaRouter::s_object, NULL, /* Process init. */ NULL, /* Process finish. */ NULL, /* Thread init. */ diff --git a/server/modules/routing/schemarouter/schemarouterinstance.hh b/server/modules/routing/schemarouter/schemarouterinstance.hh index b3cb3ca8b..431b11474 100644 --- a/server/modules/routing/schemarouter/schemarouterinstance.hh +++ b/server/modules/routing/schemarouter/schemarouterinstance.hh @@ -22,9 +22,8 @@ #include "schemaroutersession.hh" -using std::string; -using std::set; -using namespace schemarouter; +namespace schemarouter +{ class SchemaRouterSession; @@ -54,3 +53,5 @@ private: SPINLOCK m_lock; /*< Lock for the instance data */ Stats m_stats; /*< Statistics for this router */ }; + +} \ No newline at end of file diff --git a/server/modules/routing/schemarouter/schemaroutersession.cc b/server/modules/routing/schemarouter/schemaroutersession.cc index 1c9d24fd1..d8109ee65 100644 --- a/server/modules/routing/schemarouter/schemaroutersession.cc +++ b/server/modules/routing/schemarouter/schemaroutersession.cc @@ -12,6 +12,8 @@ */ #include "schemarouter.hh" +#include "schemaroutersession.hh" +#include "schemarouterinstance.hh" #include @@ -19,10 +21,10 @@ #include #include -#include "schemaroutersession.hh" -#include "schemarouterinstance.hh" +namespace schemarouter +{ -bool connect_backend_servers(BackendList& backends, MXS_SESSION* session); +bool connect_backend_servers(SSRBackendList& backends, MXS_SESSION* session); enum route_target get_shard_route_target(uint32_t qtype); bool change_current_db(string& dest, Shard& shard, GWBUF* buf); @@ -30,7 +32,8 @@ bool extract_database(GWBUF* buf, char* str); bool detect_show_shards(GWBUF* query); void write_error_to_client(DCB* dcb, int errnum, const char* mysqlstate, const char* errmsg); -SchemaRouterSession::SchemaRouterSession(MXS_SESSION* session, SchemaRouter* router, BackendList& backends): +SchemaRouterSession::SchemaRouterSession(MXS_SESSION* session, SchemaRouter* router, + SSRBackendList& backends): mxs::RouterSession(session), m_closed(false), m_client(session->client_dcb), @@ -92,7 +95,7 @@ void SchemaRouterSession::close() { m_closed = true; - for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) + for (SSRBackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) { /** The backends are closed here to trigger the shutdown of * the connected DCBs */ @@ -225,7 +228,7 @@ SERVER* SchemaRouterSession::resolve_query_target(GWBUF* pPacket, if (TARGET_IS_ANY(route_target)) { - for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) + for (SSRBackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) { SERVER *server = (*it)->backend()->server; if (SERVER_IS_RUNNING(server)) @@ -414,7 +417,7 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket) get_shard_dcb(&target_dcb, target->unique_name)) { /** We know where to route this query */ - SBackend bref = get_bref_from_dcb(target_dcb); + SSRBackend bref = get_bref_from_dcb(target_dcb); if (op == QUERY_OP_LOAD) { @@ -447,7 +450,7 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket) gwbuf_free(pPacket); return ret; } -void SchemaRouterSession::handle_mapping_reply(SBackend& bref, GWBUF** pPacket) +void SchemaRouterSession::handle_mapping_reply(SSRBackend& bref, GWBUF** pPacket) { int rc = inspect_mapping_states(bref, pPacket); @@ -481,7 +484,7 @@ void SchemaRouterSession::handle_mapping_reply(SBackend& bref, GWBUF** pPacket) } } -void SchemaRouterSession::process_sescmd_response(SBackend& bref, GWBUF** ppPacket) +void SchemaRouterSession::process_sescmd_response(SSRBackend& bref, GWBUF** ppPacket) { if (bref->session_command_count()) { @@ -519,7 +522,7 @@ void SchemaRouterSession::process_sescmd_response(SBackend& bref, GWBUF** ppPack void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb) { - SBackend bref = get_bref_from_dcb(pDcb); + SSRBackend bref = get_bref_from_dcb(pDcb); if (m_closed || bref.get() == NULL) // The bref should always be valid { @@ -591,7 +594,7 @@ void SchemaRouterSession::handleError(GWBUF* pMessage, { ss_dassert(pProblem->dcb_role == DCB_ROLE_BACKEND_HANDLER); CHK_DCB(pProblem); - SBackend bref = get_bref_from_dcb(pProblem); + SSRBackend bref = get_bref_from_dcb(pProblem); if (bref.get() == NULL) // Should never happen { @@ -723,7 +726,7 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command) /** Increment the session command count */ ++m_sent_sescmd; - for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) + for (SSRBackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) { if ((*it)->in_use()) { @@ -789,7 +792,7 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command) */ bool SchemaRouterSession::have_servers() { - for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) + for (SSRBackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) { if ((*it)->in_use() && !(*it)->is_closed()) { @@ -808,11 +811,11 @@ bool SchemaRouterSession::have_servers() * * @return backend reference pointer if succeed or NULL */ -SBackend SchemaRouterSession::get_bref_from_dcb(DCB* dcb) +SSRBackend SchemaRouterSession::get_bref_from_dcb(DCB* dcb) { CHK_DCB(dcb); - for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) + for (SSRBackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) { if ((*it)->dcb() == dcb) { @@ -822,7 +825,7 @@ SBackend SchemaRouterSession::get_bref_from_dcb(DCB* dcb) // This should not happen ss_dassert(false); - return SBackend(reinterpret_cast(NULL)); + return SSRBackend(reinterpret_cast(NULL)); } /** @@ -1019,13 +1022,13 @@ void SchemaRouterSession::route_queued_query() * @param router_cli_ses Router client session * @return 1 if mapping is done, 0 if it is still ongoing and -1 on error */ -int SchemaRouterSession::inspect_mapping_states(SBackend& bref, +int SchemaRouterSession::inspect_mapping_states(SSRBackend& bref, GWBUF** wbuf) { bool mapped = true; GWBUF* writebuf = *wbuf; - for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) + for (SSRBackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) { if (bref->dcb() == (*it)->dcb() && !(*it)->is_mapped()) { @@ -1259,7 +1262,7 @@ bool SchemaRouterSession::ignore_duplicate_database(const char* data) * @return 1 if a complete response was received, 0 if a partial response was received * and -1 if a database was found on more than one server. */ -enum showdb_response SchemaRouterSession::parse_mapping_response(SBackend& bref, GWBUF** buffer) +enum showdb_response SchemaRouterSession::parse_mapping_response(SSRBackend& bref, GWBUF** buffer) { unsigned char* ptr; SERVER* target = bref->backend()->server; @@ -1378,7 +1381,7 @@ enum showdb_response SchemaRouterSession::parse_mapping_response(SBackend& bref, void SchemaRouterSession::query_databases() { - for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) + for (SSRBackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) { (*it)->set_mapped(false); } @@ -1389,7 +1392,7 @@ void SchemaRouterSession::query_databases() GWBUF *buffer = modutil_create_query("SHOW DATABASES"); gwbuf_set_type(buffer, GWBUF_TYPE_COLLECT_RESULT); - for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) + for (SSRBackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) { if ((*it)->in_use() && !(*it)->is_closed() & SERVER_IS_RUNNING((*it)->backend()->server)) @@ -1497,7 +1500,7 @@ SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype) } else if (buffer->hint && buffer->hint->type == HINT_ROUTE_TO_NAMED_SERVER) { - for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) + for (SSRBackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) { char *srvnm = (*it)->backend()->server->unique_name; @@ -1546,7 +1549,7 @@ bool SchemaRouterSession::get_shard_dcb(DCB** p_dcb, char* name) bool succp = false; ss_dassert(p_dcb != NULL && *(p_dcb) == NULL); - for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) + for (SSRBackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) { SERVER_REF* b = (*it)->backend(); /** @@ -1661,3 +1664,5 @@ bool SchemaRouterSession::send_databases() return rval; } + +} \ No newline at end of file diff --git a/server/modules/routing/schemarouter/schemaroutersession.hh b/server/modules/routing/schemarouter/schemaroutersession.hh index 43908e6a7..bbe277403 100644 --- a/server/modules/routing/schemarouter/schemaroutersession.hh +++ b/server/modules/routing/schemarouter/schemaroutersession.hh @@ -19,15 +19,12 @@ #include #include +#include #include "shard_map.hh" -#include "session_command.hh" - -using std::string; -using std::list; - -using namespace schemarouter; +namespace schemarouter +{ /** * Bitmask values for the router session's initialization. These values are used * to prevent responses from internal commands being forwarded to the client. @@ -80,7 +77,7 @@ class SchemaRouterSession: public mxs::RouterSession { public: - SchemaRouterSession(MXS_SESSION* session, SchemaRouter* router, BackendList& backends); + SchemaRouterSession(MXS_SESSION* session, SchemaRouter* router, SSRBackendList& backends); /** * The RouterSession instance will be deleted when a client session @@ -128,7 +125,7 @@ private: /** Helper functions */ SERVER* get_shard_target(GWBUF* buffer, uint32_t qtype); - SBackend get_bref_from_dcb(DCB* dcb); + SSRBackend get_bref_from_dcb(DCB* dcb); bool get_shard_dcb(DCB** dcb, char* name); bool have_servers(); bool handle_default_db(); @@ -136,7 +133,7 @@ private: /** Routing functions */ bool route_session_write(GWBUF* querybuf, uint8_t command); - void process_sescmd_response(SBackend& bref, GWBUF** ppPacket); + void process_sescmd_response(SSRBackend& bref, GWBUF** ppPacket); SERVER* resolve_query_target(GWBUF* pPacket, uint32_t type, uint8_t command, enum route_target& route_target); @@ -144,26 +141,27 @@ private: bool send_databases(); bool send_shards(); void query_databases(); - int inspect_mapping_states(SBackend& bref, GWBUF** wbuf); - enum showdb_response parse_mapping_response(SBackend& bref, GWBUF** buffer); + int inspect_mapping_states(SSRBackend& bref, GWBUF** wbuf); + enum showdb_response parse_mapping_response(SSRBackend& bref, GWBUF** buffer); void route_queued_query(); void synchronize_shards(); - void handle_mapping_reply(SBackend& bref, GWBUF** pPacket); + void handle_mapping_reply(SSRBackend& bref, GWBUF** pPacket); /** Member variables */ - bool m_closed; /**< True if session closed */ - DCB* m_client; /**< The client DCB */ - MYSQL_session* m_mysql_session; /**< Session client data (username, password, SHA1). */ - BackendList m_backends; /**< Backend references */ - Config* m_config; /**< Pointer to router config */ - SchemaRouter* m_router; /**< The router instance */ - Shard m_shard; /**< Database to server mapping */ - string m_connect_db; /**< Database the user was trying to connect to */ - string m_current_db; /**< Current active database */ - int m_state; /**< Initialization state bitmask */ - list m_queue; /**< Query that was received before the session was ready */ - Stats m_stats; /**< Statistics for this router */ - uint64_t m_sent_sescmd; /**< The latest session command being executed */ - uint64_t m_replied_sescmd; /**< The last session command reply that was sent to the client */ - SERVER* m_load_target; /**< Target for LOAD DATA LOCAL INFILE */ + bool m_closed; /**< True if session closed */ + DCB* m_client; /**< The client DCB */ + MYSQL_session* m_mysql_session; /**< Session client data (username, password, SHA1). */ + SSRBackendList m_backends; /**< Backend references */ + Config* m_config; /**< Pointer to router config */ + SchemaRouter* m_router; /**< The router instance */ + Shard m_shard; /**< Database to server mapping */ + std::string m_connect_db; /**< Database the user was trying to connect to */ + std::string m_current_db; /**< Current active database */ + int m_state; /**< Initialization state bitmask */ + std::list m_queue; /**< Query that was received before the session was ready */ + Stats m_stats; /**< Statistics for this router */ + uint64_t m_sent_sescmd; /**< The latest session command being executed */ + uint64_t m_replied_sescmd; /**< The last session command reply that was sent to the client */ + SERVER* m_load_target; /**< Target for LOAD DATA LOCAL INFILE */ }; +} \ No newline at end of file