diff --git a/server/modules/routing/schemarouter/CMakeLists.txt b/server/modules/routing/schemarouter/CMakeLists.txt index 4eec2e949..4be7c2221 100644 --- a/server/modules/routing/schemarouter/CMakeLists.txt +++ b/server/modules/routing/schemarouter/CMakeLists.txt @@ -1,4 +1,4 @@ -add_library(schemarouter SHARED schemarouter.cc shard_map.cc) +add_library(schemarouter SHARED schemarouter.cc shard_map.cc session_command.cc) target_link_libraries(schemarouter maxscale-common) add_dependencies(schemarouter pcre2) set_target_properties(schemarouter PROPERTIES VERSION "1.0.0") diff --git a/server/modules/routing/schemarouter/schemarouter.cc b/server/modules/routing/schemarouter/schemarouter.cc index 324126d2e..d26929e8f 100644 --- a/server/modules/routing/schemarouter/schemarouter.cc +++ b/server/modules/routing/schemarouter/schemarouter.cc @@ -18,6 +18,7 @@ #include #include #include +#include #include #include @@ -30,6 +31,8 @@ #include #include +using std::string; + #define DEFAULT_REFRESH_INTERVAL "300" /** Hashtable size for the per user shard maps */ @@ -569,9 +572,10 @@ static route_target_t get_shard_route_target(uint32_t qtype, * These queries are not affected by hints */ if (qc_query_is_type(qtype, QUERY_TYPE_SESSION_WRITE) || + qc_query_is_type(qtype, QUERY_TYPE_GSYSVAR_WRITE) || + qc_query_is_type(qtype, QUERY_TYPE_USERVAR_WRITE) || qc_query_is_type(qtype, QUERY_TYPE_PREPARE_STMT) || qc_query_is_type(qtype, QUERY_TYPE_PREPARE_NAMED_STMT) || - qc_query_is_type(qtype, QUERY_TYPE_GSYSVAR_WRITE) || /** enable or disable autocommit are always routed to all */ qc_query_is_type(qtype, QUERY_TYPE_ENABLE_AUTOCOMMIT) || qc_query_is_type(qtype, QUERY_TYPE_DISABLE_AUTOCOMMIT)) @@ -1793,6 +1797,18 @@ static bool route_session_write(SCHEMAROUTER_SESSION* router_cli_ses, { if (BREF_IS_IN_USE((&backend_ref[i]))) { + GWBUF *buffer = gwbuf_clone(querybuf); + backend_ref[i].session_commands.push_back(buffer); + + for (SessionCommandList::iterator iter = backend_ref[i].session_commands.begin(); + iter != backend_ref[i].session_commands.end(); + iter++) + { + SessionCommand& scmd = *iter; + string str = scmd.to_string(); + MXS_INFO("%s: %s", backend_ref[i].bref_backend->server->unique_name, str.c_str()); + } + sescmd_cursor_t* scur; if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO)) diff --git a/server/modules/routing/schemarouter/schemarouter.h b/server/modules/routing/schemarouter/schemarouter.h index ebbedbe72..25c7c0cab 100644 --- a/server/modules/routing/schemarouter/schemarouter.h +++ b/server/modules/routing/schemarouter/schemarouter.h @@ -34,6 +34,7 @@ #include #include "shard_map.hh" +#include "session_command.hh" MXS_BEGIN_DECLS @@ -254,6 +255,9 @@ typedef struct backend_ref_st int bref_num_result_wait; /*< Number of not yet received results */ sescmd_cursor_t bref_sescmd_cur; /*< Session command cursor */ GWBUF* bref_pending_cmd; /*< For stmt which can't be routed due active sescmd execution */ + + SessionCommandList session_commands; /**< List of session commands that are + * to be executed on this backend server */ #if defined(SS_DEBUG) skygw_chk_t bref_chk_tail; #endif diff --git a/server/modules/routing/schemarouter/session_command.cc b/server/modules/routing/schemarouter/session_command.cc new file mode 100644 index 000000000..409a88edd --- /dev/null +++ b/server/modules/routing/schemarouter/session_command.cc @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2019-07-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include "session_command.hh" +#include + +void SessionCommand::mark_reply_received() +{ + m_replySent = true; +} + +bool SessionCommand::is_reply_received() const +{ + return m_replySent; +} + +Buffer SessionCommand::copy_buffer() const +{ + return m_buffer; +} + +SessionCommand::SessionCommand(GWBUF *buffer): + m_buffer(buffer), + m_replySent(false) +{ +} + +SessionCommand::~SessionCommand() +{ +} + +std::string SessionCommand::to_string() +{ + std::string str; + + GWBUF **buf = &m_buffer; + char *sql; + int sql_len; + + if (modutil_extract_SQL(*buf, &sql, &sql_len)) + { + str.append(sql, sql_len); + } + + return str; +} diff --git a/server/modules/routing/schemarouter/session_command.hh b/server/modules/routing/schemarouter/session_command.hh new file mode 100644 index 000000000..569e5ffc6 --- /dev/null +++ b/server/modules/routing/schemarouter/session_command.hh @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2019-07-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#pragma once + +#include +#include + +#include + +using namespace maxscale; + +class SessionCommand; +typedef std::list SessionCommandList; + +class SessionCommand +{ +public: + /** + * @brief Mark reply as received + */ + void mark_reply_received(); + + /** + * @brief Check if the session command has received a reply + * @return True if the reply is already received + */ + bool is_reply_received() const; + + /** + * @brief Creates a copy of the internal buffer + * @return A copy of the internal buffer + */ + Buffer copy_buffer() const; + + /** + * @brief Create a new session command + * + * @param buffer The buffer containing the command. Note that the ownership + * of @c buffer is transferred to this object. + */ + SessionCommand(GWBUF *buffer); + + ~SessionCommand(); + + /** + * @brief Debug function for printing session commands + * + * @return String representation of the object + */ + std::string to_string(); + +private: + Buffer m_buffer; /**< The buffer containing the command */ + bool m_replySent; /**< Whether the session command reply has been sent */ + + SessionCommand(); + SessionCommand& operator = (const SessionCommand& command); +};