Add session command class

The class abstracts the session commands and the lists of session commands
using standard library containers.
This commit is contained in:
Markus Mäkelä 2017-03-24 15:04:04 +02:00
parent 9e4e70a337
commit d8abefff5f
5 changed files with 147 additions and 2 deletions

View File

@ -1,4 +1,4 @@
add_library(schemarouter SHARED schemarouter.cc shard_map.cc)
add_library(schemarouter SHARED schemarouter.cc shard_map.cc session_command.cc)
target_link_libraries(schemarouter maxscale-common)
add_dependencies(schemarouter pcre2)
set_target_properties(schemarouter PROPERTIES VERSION "1.0.0")

View File

@ -18,6 +18,7 @@
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <string>
#include <maxscale/alloc.h>
#include <maxscale/buffer.h>
@ -30,6 +31,8 @@
#include <maxscale/secrets.h>
#include <maxscale/spinlock.h>
using std::string;
#define DEFAULT_REFRESH_INTERVAL "300"
/** Hashtable size for the per user shard maps */
@ -569,9 +572,10 @@ static route_target_t get_shard_route_target(uint32_t qtype,
* These queries are not affected by hints
*/
if (qc_query_is_type(qtype, QUERY_TYPE_SESSION_WRITE) ||
qc_query_is_type(qtype, QUERY_TYPE_GSYSVAR_WRITE) ||
qc_query_is_type(qtype, QUERY_TYPE_USERVAR_WRITE) ||
qc_query_is_type(qtype, QUERY_TYPE_PREPARE_STMT) ||
qc_query_is_type(qtype, QUERY_TYPE_PREPARE_NAMED_STMT) ||
qc_query_is_type(qtype, QUERY_TYPE_GSYSVAR_WRITE) ||
/** enable or disable autocommit are always routed to all */
qc_query_is_type(qtype, QUERY_TYPE_ENABLE_AUTOCOMMIT) ||
qc_query_is_type(qtype, QUERY_TYPE_DISABLE_AUTOCOMMIT))
@ -1793,6 +1797,18 @@ static bool route_session_write(SCHEMAROUTER_SESSION* router_cli_ses,
{
if (BREF_IS_IN_USE((&backend_ref[i])))
{
GWBUF *buffer = gwbuf_clone(querybuf);
backend_ref[i].session_commands.push_back(buffer);
for (SessionCommandList::iterator iter = backend_ref[i].session_commands.begin();
iter != backend_ref[i].session_commands.end();
iter++)
{
SessionCommand& scmd = *iter;
string str = scmd.to_string();
MXS_INFO("%s: %s", backend_ref[i].bref_backend->server->unique_name, str.c_str());
}
sescmd_cursor_t* scur;
if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO))

View File

@ -34,6 +34,7 @@
#include <maxscale/pcre2.h>
#include "shard_map.hh"
#include "session_command.hh"
MXS_BEGIN_DECLS
@ -254,6 +255,9 @@ typedef struct backend_ref_st
int bref_num_result_wait; /*< Number of not yet received results */
sescmd_cursor_t bref_sescmd_cur; /*< Session command cursor */
GWBUF* bref_pending_cmd; /*< For stmt which can't be routed due active sescmd execution */
SessionCommandList session_commands; /**< List of session commands that are
* to be executed on this backend server */
#if defined(SS_DEBUG)
skygw_chk_t bref_chk_tail;
#endif

View File

@ -0,0 +1,56 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "session_command.hh"
#include <maxscale/modutil.h>
void SessionCommand::mark_reply_received()
{
m_replySent = true;
}
bool SessionCommand::is_reply_received() const
{
return m_replySent;
}
Buffer SessionCommand::copy_buffer() const
{
return m_buffer;
}
SessionCommand::SessionCommand(GWBUF *buffer):
m_buffer(buffer),
m_replySent(false)
{
}
SessionCommand::~SessionCommand()
{
}
std::string SessionCommand::to_string()
{
std::string str;
GWBUF **buf = &m_buffer;
char *sql;
int sql_len;
if (modutil_extract_SQL(*buf, &sql, &sql_len))
{
str.append(sql, sql_len);
}
return str;
}

View File

@ -0,0 +1,69 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#pragma once
#include <list>
#include <string>
#include <maxscale/buffer.hh>
using namespace maxscale;
class SessionCommand;
typedef std::list<SessionCommand> SessionCommandList;
class SessionCommand
{
public:
/**
* @brief Mark reply as received
*/
void mark_reply_received();
/**
* @brief Check if the session command has received a reply
* @return True if the reply is already received
*/
bool is_reply_received() const;
/**
* @brief Creates a copy of the internal buffer
* @return A copy of the internal buffer
*/
Buffer copy_buffer() const;
/**
* @brief Create a new session command
*
* @param buffer The buffer containing the command. Note that the ownership
* of @c buffer is transferred to this object.
*/
SessionCommand(GWBUF *buffer);
~SessionCommand();
/**
* @brief Debug function for printing session commands
*
* @return String representation of the object
*/
std::string to_string();
private:
Buffer m_buffer; /**< The buffer containing the command */
bool m_replySent; /**< Whether the session command reply has been sent */
SessionCommand();
SessionCommand& operator = (const SessionCommand& command);
};