Replace session command implementation

The schemarouter now uses the new session commands. It uses a standard
library container to manage the execution and storage of session commands.

The session command history is disabled until a more complete refactoring
can be done.
This commit is contained in:
Markus Mäkelä
2017-03-26 11:42:33 +03:00
parent 1ba399a62a
commit bda0fd2db0
4 changed files with 149 additions and 720 deletions

File diff suppressed because it is too large Load Diff

View File

@ -146,25 +146,6 @@ typedef enum rses_property_type_t
strncmp(s,"LEAST_CURRENT_OPERATIONS", strlen("LEAST_CURRENT_OPERATIONS")) == 0 ? \ strncmp(s,"LEAST_CURRENT_OPERATIONS", strlen("LEAST_CURRENT_OPERATIONS")) == 0 ? \
LEAST_CURRENT_OPERATIONS : UNDEFINED_CRITERIA)))) LEAST_CURRENT_OPERATIONS : UNDEFINED_CRITERIA))))
/**
* Session variable command
*/
typedef struct mysql_sescmd_st
{
#if defined(SS_DEBUG)
skygw_chk_t my_sescmd_chk_top;
#endif
rses_property_t* my_sescmd_prop; /*< Parent property */
GWBUF* my_sescmd_buf; /*< Query buffer */
unsigned char my_sescmd_packet_type;/*< Packet type */
bool my_sescmd_is_replied; /*< Is cmd replied to client */
int position; /*< Position of this command */
#if defined(SS_DEBUG)
skygw_chk_t my_sescmd_chk_tail;
#endif
} mysql_sescmd_t;
/** /**
* Property structure * Property structure
*/ */
@ -178,7 +159,6 @@ struct rses_property_st
rses_property_type_t rses_prop_type; /*< Property type */ rses_property_type_t rses_prop_type; /*< Property type */
union rses_prop_data union rses_prop_data
{ {
mysql_sescmd_t sescmd; /*< Session commands */
HASHTABLE* temp_tables; /*< Hashtable of table names */ HASHTABLE* temp_tables; /*< Hashtable of table names */
} rses_prop_data; } rses_prop_data;
rses_property_t* rses_prop_next; /*< Next property of same type */ rses_property_t* rses_prop_next; /*< Next property of same type */
@ -187,21 +167,6 @@ struct rses_property_st
#endif #endif
}; };
typedef struct sescmd_cursor_st
{
#if defined(SS_DEBUG)
skygw_chk_t scmd_cur_chk_top;
#endif
SCHEMAROUTER_SESSION* scmd_cur_rses; /*< pointer to owning router session */
rses_property_t** scmd_cur_ptr_property; /*< address of pointer to owner property */
mysql_sescmd_t* scmd_cur_cmd; /*< pointer to current session command */
bool scmd_cur_active; /*< true if command is being executed */
int position; /*< Position of this cursor */
#if defined(SS_DEBUG)
skygw_chk_t scmd_cur_chk_tail;
#endif
} sescmd_cursor_t;
/** /**
* Internal structure used to define the set of backend servers we are routing * Internal structure used to define the set of backend servers we are routing
* connections to. This provides the storage for routing module specific data * connections to. This provides the storage for routing module specific data
@ -253,7 +218,6 @@ typedef struct backend_ref_st
bool bref_mapped; /*< Whether the backend has been mapped */ bool bref_mapped; /*< Whether the backend has been mapped */
bool last_sescmd_replied; bool last_sescmd_replied;
int bref_num_result_wait; /*< Number of not yet received results */ int bref_num_result_wait; /*< Number of not yet received results */
sescmd_cursor_t bref_sescmd_cur; /*< Session command cursor */
GWBUF* bref_pending_cmd; /*< For stmt which can't be routed due active sescmd execution */ GWBUF* bref_pending_cmd; /*< For stmt which can't be routed due active sescmd execution */
SessionCommandList session_commands; /**< List of session commands that are SessionCommandList session_commands; /**< List of session commands that are
@ -271,8 +235,6 @@ typedef struct schemarouter_config_st
int rw_max_slave_conn_percent; int rw_max_slave_conn_percent;
int rw_max_slave_conn_count; int rw_max_slave_conn_count;
mxs_target_t rw_use_sql_variables_in; mxs_target_t rw_use_sql_variables_in;
int max_sescmd_hist;
bool disable_sescmd_hist;
time_t last_refresh; /*< Last time the database list was refreshed */ time_t last_refresh; /*< Last time the database list was refreshed */
double refresh_min_interval; /*< Minimum required interval between refreshes of databases */ double refresh_min_interval; /*< Minimum required interval between refreshes of databases */
bool refresh_databases; /*< Are databases refreshed when they are not found in the hashtable */ bool refresh_databases; /*< Are databases refreshed when they are not found in the hashtable */
@ -329,6 +291,9 @@ struct schemarouter_session
ROUTER_STATS stats; /*< Statistics for this router */ ROUTER_STATS stats; /*< Statistics for this router */
int n_sescmd; int n_sescmd;
int pos_generator; int pos_generator;
uint64_t sent_sescmd; /**< The latest session command being executed */
uint64_t replied_sescmd; /**< The last session command reply that was sent to the client */
#if defined(SS_DEBUG) #if defined(SS_DEBUG)
skygw_chk_t rses_chk_tail; skygw_chk_t rses_chk_tail;
#endif #endif

View File

@ -13,6 +13,7 @@
#include "session_command.hh" #include "session_command.hh"
#include <maxscale/modutil.h> #include <maxscale/modutil.h>
#include <maxscale/protocol/mysql.h>
void SessionCommand::mark_reply_received() void SessionCommand::mark_reply_received()
{ {
@ -24,15 +25,31 @@ bool SessionCommand::is_reply_received() const
return m_replySent; return m_replySent;
} }
uint8_t SessionCommand::get_command() const
{
return m_command;
}
uint64_t SessionCommand::get_position() const
{
return m_pos;
}
Buffer SessionCommand::copy_buffer() const Buffer SessionCommand::copy_buffer() const
{ {
return m_buffer; return m_buffer;
} }
SessionCommand::SessionCommand(GWBUF *buffer): SessionCommand::SessionCommand(GWBUF *buffer, uint64_t id):
m_buffer(buffer), m_buffer(buffer),
m_command(0),
m_pos(id),
m_replySent(false) m_replySent(false)
{ {
if (buffer)
{
gwbuf_copy_data(buffer, MYSQL_HEADER_LEN, 1, &m_command);
}
} }
SessionCommand::~SessionCommand() SessionCommand::~SessionCommand()
@ -42,15 +59,18 @@ SessionCommand::~SessionCommand()
std::string SessionCommand::to_string() std::string SessionCommand::to_string()
{ {
std::string str; std::string str;
GWBUF **buf = &m_buffer;
char *sql; char *sql;
int sql_len; int sql_len;
if (modutil_extract_SQL(*buf, &sql, &sql_len)) /** TODO: Create C++ versions of modutil functions */
GWBUF *buf = m_buffer.release();
if (modutil_extract_SQL(buf, &sql, &sql_len))
{ {
str.append(sql, sql_len); str.append(sql, sql_len);
} }
m_buffer.reset(buf);
return str; return str;
} }

View File

@ -37,6 +37,20 @@ public:
*/ */
bool is_reply_received() const; bool is_reply_received() const;
/**
* @brief Get the command type of the session command
*
* @return The type of the command
*/
uint8_t get_command() const;
/**
* @brief Get the position of this session command
*
* @return The position of the session command
*/
uint64_t get_position() const;
/** /**
* @brief Creates a copy of the internal buffer * @brief Creates a copy of the internal buffer
* @return A copy of the internal buffer * @return A copy of the internal buffer
@ -48,8 +62,9 @@ public:
* *
* @param buffer The buffer containing the command. Note that the ownership * @param buffer The buffer containing the command. Note that the ownership
* of @c buffer is transferred to this object. * of @c buffer is transferred to this object.
* @param id A unique position identifier used to track replies
*/ */
SessionCommand(GWBUF *buffer); SessionCommand(GWBUF *buffer, uint64_t id);
~SessionCommand(); ~SessionCommand();
@ -62,6 +77,8 @@ public:
private: private:
Buffer m_buffer; /**< The buffer containing the command */ Buffer m_buffer; /**< The buffer containing the command */
uint8_t m_command; /**< The command being executed */
uint64_t m_pos; /**< Unique position identifier */
bool m_replySent; /**< Whether the session command reply has been sent */ bool m_replySent; /**< Whether the session command reply has been sent */
SessionCommand(); SessionCommand();