Merge branch 'develop' into MXS-1266

This commit is contained in:
MassimilianoPinto
2017-06-29 11:31:56 +02:00
121 changed files with 4956 additions and 5528 deletions

View File

@ -575,9 +575,12 @@ int extract_type_length(const char* ptr, char *dest)
}
/** Store type */
int typelen = ptr - start;
memcpy(dest, start, typelen);
dest[typelen] = '\0';
for (const char* c = start; c < ptr; c++)
{
*dest++ = tolower(*c);
}
*dest++ = '\0';
/** Skip whitespace */
while (*ptr && isspace(*ptr))
@ -880,7 +883,7 @@ void read_alter_identifier(const char *sql, const char *end, char *dest, int siz
void make_avro_token(char* dest, const char* src, int length)
{
while (*src == '(' || *src == ')' || *src == '`' || isspace(*src))
while (length > 0 && (*src == '(' || *src == ')' || *src == '`' || isspace(*src)))
{
src++;
length--;
@ -902,16 +905,17 @@ void make_avro_token(char* dest, const char* src, int length)
fix_reserved_word(dest);
}
int get_column_index(TABLE_CREATE *create, const char *tok)
int get_column_index(TABLE_CREATE *create, const char *tok, int len)
{
int idx = -1;
char safe_tok[strlen(tok) + 2];
strcpy(safe_tok, tok);
char safe_tok[len + 2];
memcpy(safe_tok, tok, len);
safe_tok[len] = '\0';
fix_reserved_word(safe_tok);
for (int x = 0; x < create->columns; x++)
{
if (strcasecmp(create->column_names[x], tok) == 0)
if (strcasecmp(create->column_names[x], safe_tok) == 0)
{
idx = x;
break;
@ -950,18 +954,17 @@ bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end)
{
tok = get_tok(tok + len, &len, end);
char ** tmp = MXS_REALLOC(create->column_names, sizeof(char*) * create->columns + 1);
ss_dassert(tmp);
create->column_names = MXS_REALLOC(create->column_names, sizeof(char*) * create->columns + 1);
create->column_types = MXS_REALLOC(create->column_types, sizeof(char*) * create->columns + 1);
create->column_lengths = MXS_REALLOC(create->column_lengths, sizeof(int) * create->columns + 1);
if (tmp == NULL)
{
return false;
}
create->column_names = tmp;
char avro_token[len + 1];
make_avro_token(avro_token, tok, len);
char field_type[200] = ""; // Enough to hold all types
int field_length = extract_type_length(tok + len, field_type);
create->column_names[create->columns] = MXS_STRDUP_A(avro_token);
create->column_types[create->columns] = MXS_STRDUP_A(field_type);
create->column_lengths[create->columns] = field_length;
create->columns++;
updates++;
tok = get_next_def(tok, end);
@ -971,25 +974,22 @@ bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end)
{
tok = get_tok(tok + len, &len, end);
int idx = get_column_index(create, tok);
int idx = get_column_index(create, tok, len);
if (idx != -1)
{
MXS_FREE(create->column_names[idx]);
MXS_FREE(create->column_types[idx]);
for (int i = idx; i < (int)create->columns - 1; i++)
{
create->column_names[i] = create->column_names[i + 1];
create->column_types[i] = create->column_types[i + 1];
create->column_lengths[i] = create->column_lengths[i + 1];
}
char ** tmp = realloc(create->column_names, sizeof(char*) * create->columns - 1);
ss_dassert(tmp);
if (tmp == NULL)
{
return false;
}
create->column_names = tmp;
create->column_names = MXS_REALLOC(create->column_names, sizeof(char*) * create->columns - 1);
create->column_types = MXS_REALLOC(create->column_types, sizeof(char*) * create->columns - 1);
create->column_lengths = MXS_REALLOC(create->column_lengths, sizeof(int) * create->columns - 1);
create->columns--;
updates++;
}
@ -1001,12 +1001,19 @@ bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end)
{
tok = get_tok(tok + len, &len, end);
int idx = get_column_index(create, tok);
int idx = get_column_index(create, tok, len);
if (idx != -1)
if (idx != -1 && (tok = get_tok(tok + len, &len, end)))
{
MXS_FREE(create->column_names[idx]);
create->column_names[idx] = strndup(tok, len);
MXS_FREE(create->column_types[idx]);
char avro_token[len + 1];
make_avro_token(avro_token, tok, len);
char field_type[200] = ""; // Enough to hold all types
int field_length = extract_type_length(tok + len, field_type);
create->column_names[idx] = MXS_STRDUP_A(avro_token);
create->column_types[idx] = MXS_STRDUP_A(field_type);
create->column_lengths[idx] = field_length;
updates++;
}
@ -1021,7 +1028,7 @@ bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end)
}
/** Only increment the create version if it has an associated .avro
* file. The .avro file is only created if it is acutally used. */
* file. The .avro file is only created if it is actually used. */
if (updates > 0 && create->was_used)
{
create->version++;

View File

@ -505,7 +505,7 @@ closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session)
static void log_closed_session(mysql_server_cmd_t mysql_command, bool is_closed,
SERVER_REF *ref)
{
char msg[MAX_SERVER_NAME_LEN + 200] = ""; // Extra space for message
char msg[MAX_SERVER_ADDRESS_LEN + 200] = ""; // Extra space for message
if (is_closed)
{

View File

@ -1,4 +1,11 @@
add_library(readwritesplit SHARED readwritesplit.c rwsplit_mysql.c rwsplit_route_stmt.c rwsplit_select_backends.c rwsplit_session_cmd.c rwsplit_tmp_table_multi.c)
add_library(readwritesplit SHARED
readwritesplit.cc
rwsplit_mysql.cc
rwsplit_route_stmt.cc
rwsplit_select_backends.cc
rwsplit_session_cmd.cc
rwsplit_tmp_table_multi.cc
rwsplit_ps.cc)
target_link_libraries(readwritesplit maxscale-common MySQLCommon)
set_target_properties(readwritesplit PROPERTIES VERSION "1.0.2")
install_module(readwritesplit core)

View File

@ -1,424 +0,0 @@
#pragma once
#ifndef _RWSPLITROUTER_H
#define _RWSPLITROUTER_H
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
/**
* @file router.h - The read write split router module heder file
*
* @verbatim
* Revision History
*
* See GitHub https://github.com/mariadb-corporation/MaxScale
*
* @endverbatim
*/
#define MXS_MODULE_NAME "readwritesplit"
#include <maxscale/cdefs.h>
#include <math.h>
#include <maxscale/dcb.h>
#include <maxscale/hashtable.h>
#include <maxscale/router.h>
#include <maxscale/service.h>
MXS_BEGIN_DECLS
typedef enum bref_state
{
BREF_IN_USE = 0x01,
BREF_WAITING_RESULT = 0x02, /*< for session commands only */
BREF_QUERY_ACTIVE = 0x04, /*< for other queries */
BREF_CLOSED = 0x08,
BREF_FATAL_FAILURE = 0x10 /*< Backend references that should be dropped */
} bref_state_t;
#define BREF_IS_NOT_USED(s) ((s)->bref_state & ~BREF_IN_USE)
#define BREF_IS_IN_USE(s) ((s)->bref_state & BREF_IN_USE)
#define BREF_IS_WAITING_RESULT(s) ((s)->bref_num_result_wait > 0)
#define BREF_IS_QUERY_ACTIVE(s) ((s)->bref_state & BREF_QUERY_ACTIVE)
#define BREF_IS_CLOSED(s) ((s)->bref_state & BREF_CLOSED)
#define BREF_HAS_FAILED(s) ((s)->bref_state & BREF_FATAL_FAILURE)
typedef enum backend_type_t
{
BE_UNDEFINED = -1,
BE_MASTER,
BE_JOINED = BE_MASTER,
BE_SLAVE,
BE_COUNT
} backend_type_t;
struct router_instance;
typedef enum
{
TARGET_UNDEFINED = 0x00,
TARGET_MASTER = 0x01,
TARGET_SLAVE = 0x02,
TARGET_NAMED_SERVER = 0x04,
TARGET_ALL = 0x08,
TARGET_RLAG_MAX = 0x10
} route_target_t;
#define TARGET_IS_MASTER(t) (t & TARGET_MASTER)
#define TARGET_IS_SLAVE(t) (t & TARGET_SLAVE)
#define TARGET_IS_NAMED_SERVER(t) (t & TARGET_NAMED_SERVER)
#define TARGET_IS_ALL(t) (t & TARGET_ALL)
#define TARGET_IS_RLAG_MAX(t) (t & TARGET_RLAG_MAX)
typedef struct rses_property_st rses_property_t;
typedef struct router_client_session ROUTER_CLIENT_SES;
typedef enum rses_property_type_t
{
RSES_PROP_TYPE_UNDEFINED = -1,
RSES_PROP_TYPE_SESCMD = 0,
RSES_PROP_TYPE_FIRST = RSES_PROP_TYPE_SESCMD,
RSES_PROP_TYPE_TMPTABLES,
RSES_PROP_TYPE_LAST = RSES_PROP_TYPE_TMPTABLES,
RSES_PROP_TYPE_COUNT = RSES_PROP_TYPE_LAST + 1
} rses_property_type_t;
/**
* This criteria is used when backends are chosen for a router session's use.
* Backend servers are sorted to ascending order according to the criteria
* and top N are chosen.
*/
typedef enum select_criteria
{
UNDEFINED_CRITERIA = 0,
LEAST_GLOBAL_CONNECTIONS, /*< all connections established by MaxScale */
LEAST_ROUTER_CONNECTIONS, /*< connections established by this router */
LEAST_BEHIND_MASTER,
LEAST_CURRENT_OPERATIONS,
DEFAULT_CRITERIA = LEAST_CURRENT_OPERATIONS,
LAST_CRITERIA /*< not used except for an index */
} select_criteria_t;
static inline const char* select_criteria_to_str(select_criteria_t type)
{
switch (type)
{
case LEAST_GLOBAL_CONNECTIONS:
return "LEAST_GLOBAL_CONNECTIONS";
case LEAST_ROUTER_CONNECTIONS:
return "LEAST_ROUTER_CONNECTIONS";
case LEAST_BEHIND_MASTER:
return "LEAST_BEHIND_MASTER";
case LEAST_CURRENT_OPERATIONS:
return "LEAST_CURRENT_OPERATIONS";
default:
return "UNDEFINED_CRITERIA";
}
}
/**
* Controls how master failure is handled
*/
enum failure_mode
{
RW_FAIL_INSTANTLY, /**< Close the connection as soon as the master is lost */
RW_FAIL_ON_WRITE, /**< Close the connection when the first write is received */
RW_ERROR_ON_WRITE /**< Don't close the connection but send an error for writes */
};
static inline const char* failure_mode_to_str(enum failure_mode type)
{
switch (type)
{
case RW_FAIL_INSTANTLY:
return "fail_instantly";
case RW_FAIL_ON_WRITE:
return "fail_on_write";
case RW_ERROR_ON_WRITE:
return "error_on_write";
default:
ss_dassert(false);
return "UNDEFINED_MODE";
}
}
/** default values for rwsplit configuration parameters */
#define CONFIG_MAX_SLAVE_CONN 1
#define CONFIG_MAX_SLAVE_RLAG -1 /*< not used */
#define CONFIG_SQL_VARIABLES_IN TYPE_ALL
#define GET_SELECT_CRITERIA(s) \
(strncmp(s,"LEAST_GLOBAL_CONNECTIONS", strlen("LEAST_GLOBAL_CONNECTIONS")) == 0 ? \
LEAST_GLOBAL_CONNECTIONS : ( \
strncmp(s,"LEAST_BEHIND_MASTER", strlen("LEAST_BEHIND_MASTER")) == 0 ? \
LEAST_BEHIND_MASTER : ( \
strncmp(s,"LEAST_ROUTER_CONNECTIONS", strlen("LEAST_ROUTER_CONNECTIONS")) == 0 ? \
LEAST_ROUTER_CONNECTIONS : ( \
strncmp(s,"LEAST_CURRENT_OPERATIONS", strlen("LEAST_CURRENT_OPERATIONS")) == 0 ? \
LEAST_CURRENT_OPERATIONS : UNDEFINED_CRITERIA))))
/**
* Session variable command
*/
typedef struct mysql_sescmd_st
{
#if defined(SS_DEBUG)
skygw_chk_t my_sescmd_chk_top;
#endif
rses_property_t* my_sescmd_prop; /*< parent property */
GWBUF* my_sescmd_buf; /*< query buffer */
unsigned char my_sescmd_packet_type; /*< packet type */
bool my_sescmd_is_replied; /*< is cmd replied to client */
unsigned char reply_cmd; /*< The reply command. One of OK, ERR, RESULTSET or
* LOCAL_INFILE. Slave servers are compared to this
* when they return session command replies.*/
int position; /*< Position of this command */
#if defined(SS_DEBUG)
skygw_chk_t my_sescmd_chk_tail;
#endif
} mysql_sescmd_t;
/**
* Property structure
*/
struct rses_property_st
{
#if defined(SS_DEBUG)
skygw_chk_t rses_prop_chk_top;
#endif
ROUTER_CLIENT_SES* rses_prop_rsession; /*< parent router session */
int rses_prop_refcount;
rses_property_type_t rses_prop_type;
union rses_prop_data
{
mysql_sescmd_t sescmd;
HASHTABLE* temp_tables;
} rses_prop_data;
rses_property_t* rses_prop_next; /*< next property of same type */
#if defined(SS_DEBUG)
skygw_chk_t rses_prop_chk_tail;
#endif
} ;
typedef struct sescmd_cursor_st
{
#if defined(SS_DEBUG)
skygw_chk_t scmd_cur_chk_top;
#endif
ROUTER_CLIENT_SES* scmd_cur_rses; /*< pointer to owning router session */
rses_property_t** scmd_cur_ptr_property; /*< address of pointer to owner property */
mysql_sescmd_t* scmd_cur_cmd; /*< pointer to current session command */
bool scmd_cur_active; /*< true if command is being executed */
int position; /*< Position of this cursor */
#if defined(SS_DEBUG)
skygw_chk_t scmd_cur_chk_tail;
#endif
} sescmd_cursor_t;
/** Enum for tracking client reply state */
typedef enum
{
REPLY_STATE_START, /**< Query sent to backend */
REPLY_STATE_DONE, /**< Complete reply received */
REPLY_STATE_RSET_COLDEF, /**< Resultset response, waiting for column definitions */
REPLY_STATE_RSET_ROWS /**< Resultset response, waiting for rows */
} reply_state_t;
/**
* Helper function to convert reply_state_t to string
*/
static inline const char* rstostr(reply_state_t state)
{
switch (state)
{
case REPLY_STATE_START:
return "REPLY_STATE_START";
case REPLY_STATE_DONE:
return "REPLY_STATE_DONE";
case REPLY_STATE_RSET_COLDEF:
return "REPLY_STATE_RSET_COLDEF";
case REPLY_STATE_RSET_ROWS:
return "REPLY_STATE_RSET_ROWS";
}
ss_dassert(false);
return "UNKNOWN";
}
/**
* Reference to BACKEND.
*
* Owned by router client session.
*/
typedef struct backend_ref_st
{
#if defined(SS_DEBUG)
skygw_chk_t bref_chk_top;
#endif
SERVER_REF* ref;
DCB* bref_dcb;
bref_state_t bref_state;
int bref_num_result_wait;
sescmd_cursor_t bref_sescmd_cur;
unsigned char reply_cmd; /**< The reply the backend server sent to a session command.
* Used to detect slaves that fail to execute session command. */
reply_state_t reply_state; /**< Reply state of the current query */
#if defined(SS_DEBUG)
skygw_chk_t bref_chk_tail;
#endif
int closed_at; /** DEBUG: Line number where this backend reference was closed */
} backend_ref_t;
typedef struct rwsplit_config_st
{
int rw_max_slave_conn_percent; /**< Maximum percentage of slaves
* to use for each connection*/
int max_slave_connections; /**< Maximum number of slaves for each connection*/
select_criteria_t slave_selection_criteria; /**< The slave selection criteria */
int max_slave_replication_lag; /**< Maximum replication lag */
mxs_target_t use_sql_variables_in; /**< Whether to send user variables
* to master or all nodes */
int max_sescmd_history; /**< Maximum amount of session commands to store */
bool disable_sescmd_history; /**< Disable session command history */
bool master_accept_reads; /**< Use master for reads */
bool strict_multi_stmt; /**< Force non-multistatement queries to be routed
* to the master after a multistatement query. */
enum failure_mode master_failure_mode; /**< Master server failure handling mode.
* @see enum failure_mode */
bool retry_failed_reads; /**< Retry failed reads on other servers */
int connection_keepalive; /**< Send pings to servers that have
* been idle for too long */
} rwsplit_config_t;
#if defined(PREP_STMT_CACHING)
typedef struct prep_stmt_st
{
#if defined(SS_DEBUG)
skygw_chk_t pstmt_chk_top;
#endif
union id
{
int seq;
char* name;
} pstmt_id;
prep_stmt_state_t pstmt_state;
prep_stmt_type_t pstmt_type;
#if defined(SS_DEBUG)
skygw_chk_t pstmt_chk_tail;
#endif
} prep_stmt_t;
#endif /*< PREP_STMT_CACHING */
/** States of a LOAD DATA LOCAL INFILE */
enum ld_state
{
LOAD_DATA_INACTIVE, /**< Not active */
LOAD_DATA_START, /**< Current query starts a load */
LOAD_DATA_ACTIVE, /**< Load is active */
LOAD_DATA_END /**< Current query contains an empty packet that ends the load */
};
/**
* The client session structure used within this router.
*/
struct router_client_session
{
#if defined(SS_DEBUG)
skygw_chk_t rses_chk_top;
#endif
bool rses_closed; /*< true when closeSession is called */
rses_property_t* rses_properties[RSES_PROP_TYPE_COUNT]; /*< Properties listed by their type */
backend_ref_t* rses_master_ref;
backend_ref_t* rses_backend_ref; /*< Pointer to backend reference array */
rwsplit_config_t rses_config; /*< copied config info from router instance */
int rses_nbackends;
int rses_nsescmd; /*< Number of executed session commands */
enum ld_state load_data_state; /*< Current load data state */
bool have_tmp_tables;
uint64_t rses_load_data_sent; /*< How much data has been sent */
DCB* client_dcb;
int pos_generator;
backend_ref_t *forced_node; /*< Current server where all queries should be sent */
int expected_responses; /**< Number of expected responses to the current query */
GWBUF* query_queue; /**< Queued commands waiting to be executed */
#if defined(PREP_STMT_CACHING)
HASHTABLE* rses_prep_stmt[2];
#endif
struct router_instance *router; /*< The router instance */
struct router_client_session *next;
#if defined(SS_DEBUG)
skygw_chk_t rses_chk_tail;
#endif
} ;
/**
* The statistics for this router instance
*/
typedef struct
{
uint64_t n_sessions; /*< Number sessions created */
uint64_t n_queries; /*< Number of queries forwarded */
uint64_t n_master; /*< Number of stmts sent to master */
uint64_t n_slave; /*< Number of stmts sent to slave */
uint64_t n_all; /*< Number of stmts sent to all */
} ROUTER_STATS;
/**
* The per instance data for the router.
*/
typedef struct router_instance
{
SERVICE* service; /*< Pointer to service */
rwsplit_config_t rwsplit_config; /*< expanded config info from SERVICE */
int rwsplit_version; /*< version number for router's config */
ROUTER_STATS stats; /*< Statistics for this router */
bool available_slaves; /*< The router has some slaves avialable */
} ROUTER_INSTANCE;
#define BACKEND_TYPE(b) (SERVER_IS_MASTER((b)->backend_server) ? BE_MASTER : \
(SERVER_IS_SLAVE((b)->backend_server) ? BE_SLAVE : BE_UNDEFINED));
/**
* @brief Route a stored query
*
* When multiple queries are executed in a pipeline fashion, the readwritesplit
* stores the extra queries in a queue. This queue is emptied after reading a
* reply from the backend server.
*
* @param rses Router client session
* @return True if a stored query was routed successfully
*/
bool route_stored_query(ROUTER_CLIENT_SES *rses);
/** Reply state change debug logging */
#define LOG_RS(a, b) MXS_DEBUG("[%s]:%d %s -> %s", (a)->ref->server->name, \
(a)->ref->server->port, rstostr((a)->reply_state), rstostr(b));
MXS_END_DECLS
#endif /*< _RWSPLITROUTER_H */

View File

@ -0,0 +1,335 @@
#pragma once
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
/**
* @file router.h - The read write split router module heder file
*/
#define MXS_MODULE_NAME "readwritesplit"
#include <maxscale/cppdefs.hh>
#include <tr1/unordered_set>
#include <tr1/unordered_map>
#include <map>
#include <string>
#include <maxscale/dcb.h>
#include <maxscale/hashtable.h>
#include <maxscale/router.h>
#include <maxscale/service.h>
#include <maxscale/backend.hh>
#include <maxscale/session_command.hh>
enum backend_type_t
{
BE_UNDEFINED = -1,
BE_MASTER,
BE_JOINED = BE_MASTER,
BE_SLAVE,
BE_COUNT
};
enum route_target_t
{
TARGET_UNDEFINED = 0x00,
TARGET_MASTER = 0x01,
TARGET_SLAVE = 0x02,
TARGET_NAMED_SERVER = 0x04,
TARGET_ALL = 0x08,
TARGET_RLAG_MAX = 0x10
};
/**
* This criteria is used when backends are chosen for a router session's use.
* Backend servers are sorted to ascending order according to the criteria
* and top N are chosen.
*/
enum select_criteria_t
{
UNDEFINED_CRITERIA = 0,
LEAST_GLOBAL_CONNECTIONS, /**< all connections established by MaxScale */
LEAST_ROUTER_CONNECTIONS, /**< connections established by this router */
LEAST_BEHIND_MASTER,
LEAST_CURRENT_OPERATIONS,
DEFAULT_CRITERIA = LEAST_CURRENT_OPERATIONS,
LAST_CRITERIA /**< not used except for an index */
};
/**
* Controls how master failure is handled
*/
enum failure_mode
{
RW_FAIL_INSTANTLY, /**< Close the connection as soon as the master is lost */
RW_FAIL_ON_WRITE, /**< Close the connection when the first write is received */
RW_ERROR_ON_WRITE /**< Don't close the connection but send an error for writes */
};
/** States of a LOAD DATA LOCAL INFILE */
enum ld_state
{
LOAD_DATA_INACTIVE, /**< Not active */
LOAD_DATA_START, /**< Current query starts a load */
LOAD_DATA_ACTIVE, /**< Load is active */
LOAD_DATA_END /**< Current query contains an empty packet that ends the load */
};
#define TARGET_IS_MASTER(t) (t & TARGET_MASTER)
#define TARGET_IS_SLAVE(t) (t & TARGET_SLAVE)
#define TARGET_IS_NAMED_SERVER(t) (t & TARGET_NAMED_SERVER)
#define TARGET_IS_ALL(t) (t & TARGET_ALL)
#define TARGET_IS_RLAG_MAX(t) (t & TARGET_RLAG_MAX)
#define BREF_IS_NOT_USED(s) ((s)->bref_state & ~BREF_IN_USE)
#define BREF_IS_IN_USE(s) ((s)->bref_state & BREF_IN_USE)
#define BREF_IS_WAITING_RESULT(s) ((s)->bref_num_result_wait > 0)
#define BREF_IS_QUERY_ACTIVE(s) ((s)->bref_state & BREF_QUERY_ACTIVE)
#define BREF_IS_CLOSED(s) ((s)->bref_state & BREF_CLOSED)
#define BREF_HAS_FAILED(s) ((s)->bref_state & BREF_FATAL_FAILURE)
/** default values for rwsplit configuration parameters */
#define CONFIG_MAX_SLAVE_CONN 1
#define CONFIG_MAX_SLAVE_RLAG -1 /**< not used */
#define CONFIG_SQL_VARIABLES_IN TYPE_ALL
#define GET_SELECT_CRITERIA(s) \
(strncmp(s,"LEAST_GLOBAL_CONNECTIONS", strlen("LEAST_GLOBAL_CONNECTIONS")) == 0 ? \
LEAST_GLOBAL_CONNECTIONS : ( \
strncmp(s,"LEAST_BEHIND_MASTER", strlen("LEAST_BEHIND_MASTER")) == 0 ? \
LEAST_BEHIND_MASTER : ( \
strncmp(s,"LEAST_ROUTER_CONNECTIONS", strlen("LEAST_ROUTER_CONNECTIONS")) == 0 ? \
LEAST_ROUTER_CONNECTIONS : ( \
strncmp(s,"LEAST_CURRENT_OPERATIONS", strlen("LEAST_CURRENT_OPERATIONS")) == 0 ? \
LEAST_CURRENT_OPERATIONS : UNDEFINED_CRITERIA))))
#define BACKEND_TYPE(b) (SERVER_IS_MASTER((b)->backend_server) ? BE_MASTER : \
(SERVER_IS_SLAVE((b)->backend_server) ? BE_SLAVE : BE_UNDEFINED));
/** Reply state change debug logging */
#define LOG_RS(a, b) MXS_DEBUG("[%s]:%d %s -> %s", (a)->server()->name, \
(a)->server()->port, rstostr((a)->get_reply_state()), rstostr(b));
struct ROUTER_INSTANCE;
struct ROUTER_CLIENT_SES;
/** Enum for tracking client reply state */
enum reply_state_t
{
REPLY_STATE_START, /**< Query sent to backend */
REPLY_STATE_DONE, /**< Complete reply received */
REPLY_STATE_RSET_COLDEF, /**< Resultset response, waiting for column definitions */
REPLY_STATE_RSET_ROWS /**< Resultset response, waiting for rows */
};
struct rwsplit_config_t
{
int rw_max_slave_conn_percent; /**< Maximum percentage of slaves
* to use for each connection*/
int max_slave_connections; /**< Maximum number of slaves for each connection*/
select_criteria_t slave_selection_criteria; /**< The slave selection criteria */
int max_slave_replication_lag; /**< Maximum replication lag */
mxs_target_t use_sql_variables_in; /**< Whether to send user variables
* to master or all nodes */
uint64_t max_sescmd_history; /**< Maximum amount of session commands to store */
bool disable_sescmd_history; /**< Disable session command history */
bool master_accept_reads; /**< Use master for reads */
bool strict_multi_stmt; /**< Force non-multistatement queries to be routed
* to the master after a multistatement query. */
enum failure_mode master_failure_mode; /**< Master server failure handling mode.
* @see enum failure_mode */
bool retry_failed_reads; /**< Retry failed reads on other servers */
int connection_keepalive; /**< Send pings to servers that have
* been idle for too long */
};
class RWBackend: public mxs::Backend
{
RWBackend(const RWBackend&);
RWBackend& operator=(const RWBackend&);
public:
RWBackend(SERVER_REF* ref):
mxs::Backend(ref),
m_reply_state(REPLY_STATE_DONE)
{
}
~RWBackend()
{
}
reply_state_t get_reply_state() const
{
return m_reply_state;
}
void set_reply_state(reply_state_t state)
{
m_reply_state = state;
}
bool execute_session_command()
{
bool rval = mxs::Backend::execute_session_command();
if (rval)
{
set_reply_state(REPLY_STATE_START);
}
return rval;
}
private:
reply_state_t m_reply_state;
};
typedef std::tr1::shared_ptr<RWBackend> SRWBackend;
typedef std::list<SRWBackend> SRWBackendList;
typedef std::tr1::unordered_set<std::string> TableSet;
typedef std::map<uint64_t, uint8_t> ResponseMap;
/** Prepared statement ID to type maps for text and binary protocols */
typedef std::tr1::unordered_map<std::string, uint32_t> TextPSMap;
/**
* The client session structure used within this router.
*/
struct ROUTER_CLIENT_SES
{
skygw_chk_t rses_chk_top;
bool rses_closed; /**< true when closeSession is called */
SRWBackendList backends; /**< List of backend servers */
SRWBackend current_master; /**< Current master server */
SRWBackend target_node; /**< The currently locked target node */
rwsplit_config_t rses_config; /**< copied config info from router instance */
int rses_nbackends;
enum ld_state load_data_state; /**< Current load data state */
bool have_tmp_tables;
uint64_t rses_load_data_sent; /**< How much data has been sent */
DCB* client_dcb;
uint64_t sescmd_count;
int expected_responses; /**< Number of expected responses to the current query */
GWBUF* query_queue; /**< Queued commands waiting to be executed */
struct ROUTER_INSTANCE *router; /**< The router instance */
struct ROUTER_CLIENT_SES *next;
TableSet temp_tables; /**< Set of temporary tables */
mxs::SessionCommandList sescmd_list; /**< List of executed session commands */
ResponseMap sescmd_responses; /**< Response to each session command */
uint64_t sent_sescmd; /**< ID of the last sent session command*/
uint64_t recv_sescmd; /**< ID of the most recently completed session command */
TextPSMap ps_text; /**< Text protocol prepared statements */
skygw_chk_t rses_chk_tail;
};
/**
* The statistics for this router instance
*/
struct ROUTER_STATS
{
uint64_t n_sessions; /**< Number sessions created */
uint64_t n_queries; /**< Number of queries forwarded */
uint64_t n_master; /**< Number of stmts sent to master */
uint64_t n_slave; /**< Number of stmts sent to slave */
uint64_t n_all; /**< Number of stmts sent to all */
};
/**
* The per instance data for the router.
*/
struct ROUTER_INSTANCE
{
SERVICE* service; /**< Pointer to service */
rwsplit_config_t rwsplit_config; /**< expanded config info from SERVICE */
int rwsplit_version; /**< version number for router's config */
ROUTER_STATS stats; /**< Statistics for this router */
bool available_slaves; /**< The router has some slaves avialable */
};
/**
* @brief Route a stored query
*
* When multiple queries are executed in a pipeline fashion, the readwritesplit
* stores the extra queries in a queue. This queue is emptied after reading a
* reply from the backend server.
*
* @param rses Router client session
* @return True if a stored query was routed successfully
*/
bool route_stored_query(ROUTER_CLIENT_SES *rses);
static inline const char* select_criteria_to_str(select_criteria_t type)
{
switch (type)
{
case LEAST_GLOBAL_CONNECTIONS:
return "LEAST_GLOBAL_CONNECTIONS";
case LEAST_ROUTER_CONNECTIONS:
return "LEAST_ROUTER_CONNECTIONS";
case LEAST_BEHIND_MASTER:
return "LEAST_BEHIND_MASTER";
case LEAST_CURRENT_OPERATIONS:
return "LEAST_CURRENT_OPERATIONS";
default:
return "UNDEFINED_CRITERIA";
}
}
/**
* Helper function to convert reply_state_t to string
*/
static inline const char* rstostr(reply_state_t state)
{
switch (state)
{
case REPLY_STATE_START:
return "REPLY_STATE_START";
case REPLY_STATE_DONE:
return "REPLY_STATE_DONE";
case REPLY_STATE_RSET_COLDEF:
return "REPLY_STATE_RSET_COLDEF";
case REPLY_STATE_RSET_ROWS:
return "REPLY_STATE_RSET_ROWS";
}
ss_dassert(false);
return "UNKNOWN";
}
static inline const char* failure_mode_to_str(enum failure_mode type)
{
switch (type)
{
case RW_FAIL_INSTANTLY:
return "fail_instantly";
case RW_FAIL_ON_WRITE:
return "fail_on_write";
case RW_ERROR_ON_WRITE:
return "error_on_write";
default:
ss_dassert(false);
return "UNDEFINED_MODE";
}
}

View File

@ -1,158 +0,0 @@
#pragma once
#ifndef _RWSPLIT_INTERNAL_H
#define _RWSPLIT_INTERNAL_H
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
/*
* File: rwsplit_internal.h
* Author: mbrampton
*
* Created on 08 August 2016, 11:54
*/
#include <maxscale/cdefs.h>
#include <maxscale/query_classifier.h>
MXS_BEGIN_DECLS
/* This needs to be removed along with dependency on it - see the
* rwsplit_tmp_table_multi functions
*/
#include <maxscale/protocol/mysql.h>
#define RW_CHK_DCB(b, d) \
do{ \
if(d->state == DCB_STATE_DISCONNECTED){ \
MXS_NOTICE("DCB was closed on line %d and another attempt to close it is made on line %d." , \
(b) ? (b)->closed_at : -1, __LINE__); \
} \
}while (false)
#define RW_CLOSE_BREF(b) do{ if (b){ (b)->closed_at = __LINE__; } } while (false)
/*
* The following are implemented in rwsplit_mysql.c
*/
bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
GWBUF *querybuf);
void closed_session_reply(GWBUF *querybuf);
void print_error_packet(ROUTER_CLIENT_SES *rses, GWBUF *buf, DCB *dcb);
void check_session_command_reply(GWBUF *writebuf, sescmd_cursor_t *scur, backend_ref_t *bref);
bool execute_sescmd_in_backend(backend_ref_t *backend_ref);
bool handle_target_is_all(route_target_t route_target,
ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
GWBUF *querybuf, int packet_type, qc_query_type_t qtype);
int determine_packet_type(GWBUF *querybuf, bool *non_empty_packet);
void log_transaction_status(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, qc_query_type_t qtype);
bool is_packet_a_one_way_message(int packet_type);
sescmd_cursor_t *backend_ref_get_sescmd_cursor(backend_ref_t *bref);
bool is_packet_a_query(int packet_type);
bool send_readonly_error(DCB *dcb);
/*
* The following are implemented in readwritesplit.c
*/
void bref_clear_state(backend_ref_t *bref, bref_state_t state);
void bref_set_state(backend_ref_t *bref, bref_state_t state);
int router_handle_state_switch(DCB *dcb, DCB_REASON reason, void *data);
backend_ref_t *get_bref_from_dcb(ROUTER_CLIENT_SES *rses, DCB *dcb);
void rses_property_done(rses_property_t *prop);
int rses_get_max_slavecount(ROUTER_CLIENT_SES *rses, int router_nservers);
int rses_get_max_replication_lag(ROUTER_CLIENT_SES *rses);
/*
* The following are implemented in rwsplit_route_stmt.c
*/
bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
GWBUF *querybuf);
int rwsplit_hashkeyfun(const void *key);
int rwsplit_hashcmpfun(const void *v1, const void *v2);
void *rwsplit_hstrdup(const void *fval);
void rwsplit_hfree(void *fval);
bool rwsplit_get_dcb(DCB **p_dcb, ROUTER_CLIENT_SES *rses, backend_type_t btype,
char *name, int max_rlag);
route_target_t get_route_target(ROUTER_CLIENT_SES *rses,
qc_query_type_t qtype, HINT *hint);
rses_property_t *rses_property_init(rses_property_type_t prop_type);
int rses_property_add(ROUTER_CLIENT_SES *rses, rses_property_t *prop);
void handle_multi_temp_and_load(ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
int packet_type, int *qtype);
bool handle_hinted_target(ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
route_target_t route_target, DCB **target_dcb);
bool handle_slave_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
DCB **target_dcb);
bool handle_master_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
DCB **target_dcb);
bool handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
GWBUF *querybuf, DCB *target_dcb, bool store);
bool route_session_write(ROUTER_CLIENT_SES *router_cli_ses,
GWBUF *querybuf, ROUTER_INSTANCE *inst,
int packet_type,
qc_query_type_t qtype);
/*
* The following are implemented in rwsplit_session_cmd.c
*/
mysql_sescmd_t *rses_property_get_sescmd(rses_property_t *prop);
mysql_sescmd_t *mysql_sescmd_init(rses_property_t *rses_prop,
GWBUF *sescmd_buf,
unsigned char packet_type,
ROUTER_CLIENT_SES *rses);
void mysql_sescmd_done(mysql_sescmd_t *sescmd);
mysql_sescmd_t *sescmd_cursor_get_command(sescmd_cursor_t *scur);
bool sescmd_cursor_is_active(sescmd_cursor_t *sescmd_cursor);
void sescmd_cursor_set_active(sescmd_cursor_t *sescmd_cursor,
bool value);
bool execute_sescmd_history(backend_ref_t *bref);
GWBUF *sescmd_cursor_clone_querybuf(sescmd_cursor_t *scur);
GWBUF *sescmd_cursor_process_replies(GWBUF *replybuf,
backend_ref_t *bref,
bool *reconnect);
/*
* The following are implemented in rwsplit_select_backends.c
*/
bool select_connect_backend_servers(backend_ref_t **p_master_ref,
backend_ref_t *backend_ref,
int router_nservers, int max_nslaves,
int max_slave_rlag,
select_criteria_t select_criteria,
MXS_SESSION *session,
ROUTER_INSTANCE *router,
bool active_session);
/*
* The following are implemented in rwsplit_tmp_table_multi.c
*/
void check_drop_tmp_table(ROUTER_CLIENT_SES *router_cli_ses,
GWBUF *querybuf,
mysql_server_cmd_t packet_type);
bool is_read_tmp_table(ROUTER_CLIENT_SES *router_cli_ses,
GWBUF *querybuf,
qc_query_type_t type);
void check_create_tmp_table(ROUTER_CLIENT_SES *router_cli_ses,
GWBUF *querybuf, qc_query_type_t type);
bool check_for_multi_stmt(GWBUF *buf, void *protocol, mysql_server_cmd_t packet_type);
qc_query_type_t determine_query_type(GWBUF *querybuf, int packet_type, bool non_empty_packet);
void close_failed_bref(backend_ref_t *bref, bool fatal);
#ifdef __cplusplus
}
#endif
MXS_END_DECLS
#endif /* RWSPLIT_INTERNAL_H */

View File

@ -0,0 +1,122 @@
#pragma once
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <maxscale/cppdefs.hh>
#include <string>
#include <maxscale/query_classifier.h>
#include <maxscale/protocol/mysql.h>
#include "readwritesplit.hh"
#define RW_CHK_DCB(b, d) \
do{ \
if(d->state == DCB_STATE_DISCONNECTED){ \
MXS_NOTICE("DCB was closed on line %d and another attempt to close it is made on line %d." , \
(b) ? (b)->closed_at : -1, __LINE__); \
} \
}while (false)
#define RW_CLOSE_BREF(b) do{ if (b){ (b)->closed_at = __LINE__; } } while (false)
/*
* The following are implemented in rwsplit_mysql.c
*/
bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
GWBUF *querybuf);
void closed_session_reply(GWBUF *querybuf);
void print_error_packet(ROUTER_CLIENT_SES *rses, GWBUF *buf, DCB *dcb);
void check_session_command_reply(GWBUF *writebuf, SRWBackend bref);
bool execute_sescmd_in_backend(SRWBackend& backend_ref);
bool handle_target_is_all(route_target_t route_target,
ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
GWBUF *querybuf, int packet_type, uint32_t qtype);
uint8_t determine_packet_type(GWBUF *querybuf, bool *non_empty_packet);
void log_transaction_status(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, uint32_t qtype);
bool command_will_respond(uint8_t packet_type);
bool is_packet_a_query(int packet_type);
bool send_readonly_error(DCB *dcb);
/*
* The following are implemented in readwritesplit.c
*/
int router_handle_state_switch(DCB *dcb, DCB_REASON reason, void *data);
SRWBackend get_backend_from_dcb(ROUTER_CLIENT_SES *rses, DCB *dcb);
int rses_get_max_slavecount(ROUTER_CLIENT_SES *rses);
int rses_get_max_replication_lag(ROUTER_CLIENT_SES *rses);
/*
* The following are implemented in rwsplit_route_stmt.c
*/
bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
GWBUF *querybuf);
SRWBackend get_target_backend(ROUTER_CLIENT_SES *rses, backend_type_t btype,
char *name, int max_rlag);
route_target_t get_route_target(ROUTER_CLIENT_SES *rses,
uint32_t qtype, HINT *hint);
void handle_multi_temp_and_load(ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
uint8_t packet_type, uint32_t *qtype);
SRWBackend handle_hinted_target(ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
route_target_t route_target);
SRWBackend handle_slave_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses);
bool handle_master_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
SRWBackend* dest);
bool handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
GWBUF *querybuf, SRWBackend& target, bool store);
bool route_session_write(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, uint8_t command);
void process_sescmd_response(ROUTER_CLIENT_SES* rses, SRWBackend& bref,
GWBUF** ppPacket, bool* reconnect);
/*
* The following are implemented in rwsplit_select_backends.c
*/
/** What sort of connections should be create */
enum connection_type
{
ALL,
SLAVE
};
bool select_connect_backend_servers(int router_nservers,
int max_nslaves,
select_criteria_t select_criteria,
MXS_SESSION *session,
ROUTER_INSTANCE *router,
ROUTER_CLIENT_SES *rses,
connection_type type);
/*
* The following are implemented in rwsplit_tmp_table_multi.c
*/
void check_drop_tmp_table(ROUTER_CLIENT_SES *router_cli_ses, GWBUF *querybuf);
bool is_read_tmp_table(ROUTER_CLIENT_SES *router_cli_ses,
GWBUF *querybuf,
uint32_t type);
void check_create_tmp_table(ROUTER_CLIENT_SES *router_cli_ses,
GWBUF *querybuf, uint32_t type);
bool check_for_multi_stmt(GWBUF *buf, void *protocol, uint8_t packet_type);
uint32_t determine_query_type(GWBUF *querybuf, int packet_type, bool non_empty_packet);
void close_all_connections(ROUTER_CLIENT_SES* rses);
/**
* Functions for prepared statement handling
*/
std::string extract_text_ps_id(GWBUF* buffer);
void store_text_ps(ROUTER_CLIENT_SES* rses, std::string id, GWBUF* buffer);
void erase_text_ps(ROUTER_CLIENT_SES* rses, std::string id);
bool get_text_ps_type(ROUTER_CLIENT_SES* rses, GWBUF* buffer, uint32_t* out);

View File

@ -11,18 +11,15 @@
* Public License.
*/
#include "readwritesplit.h"
#include "readwritesplit.hh"
#include "rwsplit_internal.hh"
#include <stdio.h>
#include <strings.h>
#include <string.h>
#include <stdlib.h>
#include <stdint.h>
#include <maxscale/router.h>
#include "rwsplit_internal.h"
#include <mysql.h>
#include <maxscale/log_manager.h>
#include <maxscale/query_classifier.h>
#include <maxscale/dcb.h>
@ -30,32 +27,19 @@
#include <maxscale/modinfo.h>
#include <maxscale/modutil.h>
#include <maxscale/protocol/mysql.h>
#include <mysqld_error.h>
#include <maxscale/alloc.h>
#if defined(SS_DEBUG)
#include <maxscale/protocol/mysql.h>
#endif
#define RWSPLIT_TRACE_MSG_LEN 1000
/**
* @file rwsplit_mysql.c Functions within the read-write split router that
* are specific to MySQL. The aim is to either remove these into a separate
* module or to move them into the MySQL protocol modules.
*
* @verbatim
* Revision History
*
* Date Who Description
* 08/08/2016 Martin Brampton Initial implementation
*
* @endverbatim
* Functions within the read-write split router that are specific to
* MySQL. The aim is to either remove these into a separate module or to
* move them into the MySQL protocol modules.
*/
/*
* The following functions are called from elsewhere in the router and
* are defined in rwsplit_internal.h. They are not intended to be called
* are defined in rwsplit_internal.hh. They are not intended to be called
* from outside this router.
*/
@ -80,24 +64,24 @@
* @param non_empty_packet bool indicating whether the packet is non-empty
* @return The packet type, or MYSQL_COM_UNDEFINED; also the second parameter is set
*/
int
uint8_t
determine_packet_type(GWBUF *querybuf, bool *non_empty_packet)
{
mysql_server_cmd_t packet_type;
uint8_t packet_type;
uint8_t *packet = GWBUF_DATA(querybuf);
if (gw_mysql_get_byte3(packet) == 0)
{
/** Empty packet signals end of LOAD DATA LOCAL INFILE, send it to master*/
*non_empty_packet = false;
packet_type = MYSQL_COM_UNDEFINED;
packet_type = (uint8_t)MYSQL_COM_UNDEFINED;
}
else
{
*non_empty_packet = true;
packet_type = packet[4];
}
return (int)packet_type;
return packet_type;
}
/*
@ -134,11 +118,11 @@ is_packet_a_query(int packet_type)
* @param packet_type Type of packet (integer)
* @return bool indicating whether packet contains a one way message
*/
bool
is_packet_a_one_way_message(int packet_type)
bool command_will_respond(uint8_t packet_type)
{
return (packet_type == MYSQL_COM_STMT_SEND_LONG_DATA ||
packet_type == MYSQL_COM_QUIT || packet_type == MYSQL_COM_STMT_CLOSE);
return packet_type != MYSQL_COM_STMT_SEND_LONG_DATA &&
packet_type != MYSQL_COM_QUIT &&
packet_type != MYSQL_COM_STMT_CLOSE;
}
/*
@ -157,7 +141,7 @@ is_packet_a_one_way_message(int packet_type)
* @param qtype Query type
*/
void
log_transaction_status(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, qc_query_type_t qtype)
log_transaction_status(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, uint32_t qtype)
{
if (rses->load_data_state == LOAD_DATA_INACTIVE)
{
@ -219,10 +203,19 @@ log_transaction_status(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, qc_query_type_t
*/
bool handle_target_is_all(route_target_t route_target, ROUTER_INSTANCE *inst,
ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
int packet_type, qc_query_type_t qtype)
int packet_type, uint32_t qtype)
{
bool result = false;
if (qc_query_is_type(qtype, QUERY_TYPE_PREPARE_NAMED_STMT))
{
store_text_ps(rses, extract_text_ps_id(querybuf), querybuf);
}
else if (qc_query_is_type(qtype, QUERY_TYPE_PREPARE_STMT))
{
gwbuf_set_type(querybuf, GWBUF_TYPE_COLLECT_RESULT);
}
if (TARGET_IS_MASTER(route_target) || TARGET_IS_SLAVE(route_target))
{
/**
@ -250,7 +243,7 @@ bool handle_target_is_all(route_target_t route_target, ROUTER_INSTANCE *inst,
MXS_FREE(query_str);
MXS_FREE(qtype_str);
}
else if (route_session_write(rses, gwbuf_clone(querybuf), inst, packet_type, qtype))
else if (route_session_write(rses, gwbuf_clone(querybuf), packet_type))
{
result = true;
@ -291,202 +284,33 @@ void closed_session_reply(GWBUF *querybuf)
/**
* @brief Check the reply from a backend server to a session command
*
* If the reply is an error, a message may be logged.
* If the reply is an error, a message is logged.
*
* @param writebuf Query buffer containing reply data
* @param scur Session cursor
* @param bref Router session data for a backend server
* @param buffer Query buffer containing reply data
* @param backend Router session data for a backend server
*/
void check_session_command_reply(GWBUF *writebuf, sescmd_cursor_t *scur, backend_ref_t *bref)
void check_session_command_reply(GWBUF *buffer, SRWBackend backend)
{
if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_ERR) &&
MYSQL_IS_ERROR_PACKET(((uint8_t *)GWBUF_DATA(writebuf))))
if (MYSQL_IS_ERROR_PACKET(((uint8_t *)GWBUF_DATA(buffer))))
{
uint8_t *buf = (uint8_t *)GWBUF_DATA((scur->scmd_cur_cmd->my_sescmd_buf));
uint8_t *replybuf = (uint8_t *)GWBUF_DATA(writebuf);
size_t len = MYSQL_GET_PAYLOAD_LEN(buf);
size_t replylen = MYSQL_GET_PAYLOAD_LEN(replybuf);
char *err = strndup(&((char *)replybuf)[8], 5);
char *replystr = strndup(&((char *)replybuf)[13], replylen - 4 - 5);
size_t replylen = MYSQL_GET_PAYLOAD_LEN(GWBUF_DATA(buffer));
char replybuf[replylen];
gwbuf_copy_data(buffer, 0, gwbuf_length(buffer), (uint8_t*)replybuf);
std::string err;
std::string msg;
err.append(replybuf + 8, 5);
msg.append(replybuf + 13, replylen - 4 - 5);
ss_dassert(len + 4 == GWBUF_LENGTH(scur->scmd_cur_cmd->my_sescmd_buf));
MXS_ERROR("Failed to execute session command in [%s]:%d. Error was: %s %s",
bref->ref->server->name,
bref->ref->server->port, err, replystr);
MXS_FREE(err);
MXS_FREE(replystr);
MXS_ERROR("Failed to execute session command in %s. Error was: %s %s",
backend->uri(), err.c_str(), msg.c_str());
}
}
/**
* @brief If session command cursor is passive, sends the command to backend for
* execution.
* @brief Send an error message to the client telling that the server is in read only mode
*
* Returns true if command was sent or added successfully to the queue.
* Returns false if command sending failed or if there are no pending session
* commands.
*
* Router session must be locked.
*
* @param backend_ref Router session backend database data
* @return bool - true for success, false for failure
*/
/*
* Uses MySQL specific values in the large switch statement, although it
* may be possible to generalize them.
*/
bool execute_sescmd_in_backend(backend_ref_t *backend_ref)
{
ss_dassert(backend_ref);
CHK_BACKEND_REF(backend_ref);
bool succp = false;
if (BREF_IS_CLOSED(backend_ref))
{
return succp;
}
DCB *dcb = backend_ref->bref_dcb;
CHK_DCB(dcb);
/**
* Get cursor pointer and copy of command buffer to cursor.
*/
sescmd_cursor_t *scur = &backend_ref->bref_sescmd_cur;
/** Return if there are no pending ses commands */
if (sescmd_cursor_get_command(scur) == NULL)
{
succp = true;
MXS_INFO("Cursor had no pending session commands.");
return succp;
}
if (!sescmd_cursor_is_active(scur))
{
/** Cursor is left active when function returns. */
sescmd_cursor_set_active(scur, true);
}
int rc = 0;
GWBUF *buf;
switch (scur->scmd_cur_cmd->my_sescmd_packet_type)
{
case MYSQL_COM_CHANGE_USER:
/** This makes it possible to handle replies correctly */
gwbuf_set_type(scur->scmd_cur_cmd->my_sescmd_buf, GWBUF_TYPE_SESCMD);
buf = sescmd_cursor_clone_querybuf(scur);
rc = dcb->func.auth(dcb, NULL, dcb->session, buf);
break;
case MYSQL_COM_INIT_DB:
{
/**
* Record database name and store to session.
*
* TODO: Do this in the client protocol module
*/
GWBUF *tmpbuf;
MYSQL_session* data;
unsigned int qlen;
data = (MYSQL_session*)dcb->session->client_dcb->data;
*data->db = 0;
tmpbuf = scur->scmd_cur_cmd->my_sescmd_buf;
qlen = MYSQL_GET_PAYLOAD_LEN((unsigned char *) GWBUF_DATA(tmpbuf));
if (qlen)
{
--qlen; // The COM_INIT_DB byte
if (qlen > MYSQL_DATABASE_MAXLEN)
{
MXS_ERROR("Too long a database name received in COM_INIT_DB, "
"trailing data will be cut.");
qlen = MYSQL_DATABASE_MAXLEN;
}
memcpy(data->db, (char*)GWBUF_DATA(tmpbuf) + 5, qlen);
data->db[qlen] = 0;
}
}
/** Fallthrough */
case MYSQL_COM_QUERY:
default:
/**
* Mark session command buffer, it triggers writing
* MySQL command to protocol
*/
gwbuf_set_type(scur->scmd_cur_cmd->my_sescmd_buf, GWBUF_TYPE_SESCMD);
buf = sescmd_cursor_clone_querybuf(scur);
rc = dcb->func.write(dcb, buf);
break;
}
if (rc == 1)
{
succp = true;
ss_dassert(backend_ref->reply_state == REPLY_STATE_DONE);
LOG_RS(backend_ref, REPLY_STATE_START);
backend_ref->reply_state = REPLY_STATE_START;
}
return succp;
}
/*
* End of functions called from other router modules; start of functions that
* are internal to this module
*/
/**
* Get client DCB pointer of the router client session.
* This routine must be protected by Router client session lock.
*
* APPEARS TO NEVER BE USED!!
*
* @param rses Router client session pointer
*
* @return Pointer to client DCB
*/
static DCB *rses_get_client_dcb(ROUTER_CLIENT_SES *rses)
{
DCB *dcb = NULL;
int i;
for (i = 0; i < rses->rses_nbackends; i++)
{
if ((dcb = rses->rses_backend_ref[i].bref_dcb) != NULL &&
BREF_IS_IN_USE(&rses->rses_backend_ref[i]) && dcb->session != NULL &&
dcb->session->client_dcb != NULL)
{
return dcb->session->client_dcb;
}
}
return NULL;
}
/*
* The following are internal (directly or indirectly) to routing a statement
* and should be moved to rwsplit_route_cmd.c if the MySQL specific code can
* be removed.
*/
sescmd_cursor_t *backend_ref_get_sescmd_cursor(backend_ref_t *bref)
{
sescmd_cursor_t *scur;
CHK_BACKEND_REF(bref);
scur = &bref->bref_sescmd_cur;
CHK_SESCMD_CUR(scur);
return scur;
}
/**
* Send an error message to the client telling that the server is in read only mode
* @param dcb Client DCB
*
* @return True if sending the message was successful, false if an error occurred
*/
bool send_readonly_error(DCB *dcb)

View File

@ -0,0 +1,68 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "readwritesplit.hh"
#include <maxscale/alloc.h>
#include <maxscale/query_classifier.h>
std::string extract_text_ps_id(GWBUF* buffer)
{
std::string rval;
char* name = qc_get_prepare_name(buffer);
if (name)
{
rval = name;
MXS_FREE(name);
}
return rval;
}
void store_text_ps(ROUTER_CLIENT_SES* rses, std::string id, GWBUF* buffer)
{
GWBUF* stmt = qc_get_preparable_stmt(buffer);
ss_dassert(stmt);
uint32_t type = qc_get_type_mask(stmt);
ss_dassert((type & (QUERY_TYPE_PREPARE_STMT | QUERY_TYPE_PREPARE_NAMED_STMT)) == 0);
rses->ps_text[id] = type;
}
void erase_text_ps(ROUTER_CLIENT_SES* rses, std::string id)
{
rses->ps_text.erase(id);
}
bool get_text_ps_type(ROUTER_CLIENT_SES* rses, GWBUF* buffer, uint32_t* out)
{
bool rval = false;
char* name = qc_get_prepare_name(buffer);
if (name)
{
TextPSMap::iterator it = rses->ps_text.find(name);
if (it != rses->ps_text.end())
{
*out = it->second;
rval = true;
}
MXS_FREE(name);
}
return rval;
}

View File

@ -1,557 +0,0 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "readwritesplit.h"
#include <stdio.h>
#include <strings.h>
#include <string.h>
#include <stdlib.h>
#include <stdint.h>
#include <maxscale/router.h>
#include "rwsplit_internal.h"
/**
* @file rwsplit_select_backends.c The functions that implement back end
* selection for the read write split router. All of these functions are
* internal to that router and not intended to be called from elsewhere.
*
* @verbatim
* Revision History
*
* Date Who Description
* 08/08/2016 Martin Brampton Initial implementation
*
* @endverbatim
*/
static bool connect_server(backend_ref_t *bref, MXS_SESSION *session, bool execute_history);
static void log_server_connections(select_criteria_t select_criteria,
backend_ref_t *backend_ref, int router_nservers);
static SERVER_REF *get_root_master(backend_ref_t *servers, int router_nservers);
static int bref_cmp_global_conn(const void *bref1, const void *bref2);
static int bref_cmp_router_conn(const void *bref1, const void *bref2);
static int bref_cmp_behind_master(const void *bref1, const void *bref2);
static int bref_cmp_current_load(const void *bref1, const void *bref2);
/**
* The order of functions _must_ match with the order the select criteria are
* listed in select_criteria_t definition in readwritesplit.h
*/
int (*criteria_cmpfun[LAST_CRITERIA])(const void *, const void *) =
{
NULL,
bref_cmp_global_conn,
bref_cmp_router_conn,
bref_cmp_behind_master,
bref_cmp_current_load
};
/**
* @brief Check whether it's possible to connect to this server
*
* @param bref Backend reference
* @return True if a connection to this server can be attempted
*/
static bool bref_valid_for_connect(const backend_ref_t *bref)
{
return !BREF_HAS_FAILED(bref) && SERVER_IS_RUNNING(bref->ref->server);
}
/**
* Check whether it's possible to use this server as a slave
*
* @param bref Backend reference
* @param master_host The master server
* @return True if this server is a valid slave candidate
*/
static bool bref_valid_for_slave(const backend_ref_t *bref, const SERVER *master_host)
{
SERVER *server = bref->ref->server;
return (SERVER_IS_SLAVE(server) || SERVER_IS_RELAY_SERVER(server)) &&
(master_host == NULL || (server != master_host));
}
/**
* @brief Find the best slave candidate
*
* This function iterates through @c bref and tries to find the best backend
* reference that is not in use. @c cmpfun will be called to compare the backends.
*
* @param bref Backend reference
* @param n Size of @c bref
* @param master The master server
* @param cmpfun qsort() compatible comparison function
* @return The best slave backend reference or NULL if no candidates could be found
*/
backend_ref_t* get_slave_candidate(backend_ref_t *bref, int n, const SERVER *master,
int (*cmpfun)(const void *, const void *))
{
backend_ref_t *candidate = NULL;
for (int i = 0; i < n; i++)
{
if (!BREF_IS_IN_USE(&bref[i]) &&
bref_valid_for_connect(&bref[i]) &&
bref_valid_for_slave(&bref[i], master))
{
if (candidate)
{
if (cmpfun(candidate, &bref[i]) > 0)
{
candidate = &bref[i];
}
}
else
{
candidate = &bref[i];
}
}
}
return candidate;
}
/**
* @brief Search suitable backend servers from those of router instance
*
* It is assumed that there is only one master among servers of a router instance.
* As a result, the first master found is chosen. There will possibly be more
* backend references than connected backends because only those in correct state
* are connected to.
*
* @param p_master_ref Pointer to location where master's backend reference is to be stored
* @param backend_ref Pointer to backend server reference object array
* @param router_nservers Number of backend server pointers pointed to by @p backend_ref
* @param max_nslaves Upper limit for the number of slaves
* @param max_slave_rlag Maximum allowed replication lag for any slave
* @param select_criteria Slave selection criteria
* @param session Client session
* @param router Router instance
* @return true, if at least one master and one slave was found.
*/
bool select_connect_backend_servers(backend_ref_t **p_master_ref,
backend_ref_t *backend_ref,
int router_nservers, int max_nslaves,
int max_slave_rlag,
select_criteria_t select_criteria,
MXS_SESSION *session,
ROUTER_INSTANCE *router,
bool active_session)
{
if (p_master_ref == NULL || backend_ref == NULL)
{
MXS_ERROR("Master reference (%p) or backend reference (%p) is NULL.",
p_master_ref, backend_ref);
ss_dassert(false);
return false;
}
/* get the root Master */
SERVER_REF *master_backend = get_root_master(backend_ref, router_nservers);
SERVER *master_host = master_backend ? master_backend->server : NULL;
if (router->rwsplit_config.master_failure_mode == RW_FAIL_INSTANTLY &&
(master_host == NULL || SERVER_IS_DOWN(master_host)))
{
MXS_ERROR("Couldn't find suitable Master from %d candidates.", router_nservers);
return false;
}
/**
* New session:
*
* Connect to both master and slaves
*
* Existing session:
*
* Master is already connected or we don't have a master. The function was
* called because new slaves must be selected to replace failed ones.
*/
bool master_connected = active_session || *p_master_ref != NULL;
/** Check slave selection criteria and set compare function */
int (*p)(const void *, const void *) = criteria_cmpfun[select_criteria];
ss_dassert(p);
SERVER *old_master = *p_master_ref ? (*p_master_ref)->ref->server : NULL;
if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO))
{
log_server_connections(select_criteria, backend_ref, router_nservers);
}
int slaves_found = 0;
int slaves_connected = 0;
const int min_nslaves = 0; /*< not configurable at the time */
bool succp = false;
if (!master_connected)
{
/** Find a master server */
for (int i = 0; i < router_nservers; i++)
{
SERVER *serv = backend_ref[i].ref->server;
if (bref_valid_for_connect(&backend_ref[i]) &&
master_host && serv == master_host)
{
if (connect_server(&backend_ref[i], session, false))
{
*p_master_ref = &backend_ref[i];
break;
}
}
}
}
/** Calculate how many connections we already have */
for (int i = 0; i < router_nservers; i++)
{
if (bref_valid_for_connect(&backend_ref[i]) &&
bref_valid_for_slave(&backend_ref[i], master_host))
{
slaves_found += 1;
if (BREF_IS_IN_USE(&backend_ref[i]))
{
slaves_connected += 1;
}
}
}
ss_dassert(slaves_connected < max_nslaves || max_nslaves == 0);
backend_ref_t *bref = get_slave_candidate(backend_ref, router_nservers, master_host, p);
/** Connect to all possible slaves */
while (bref && slaves_connected < max_nslaves)
{
if (connect_server(bref, session, true))
{
slaves_connected += 1;
}
else
{
/** Failed to connect, mark server as failed */
bref_set_state(bref, BREF_FATAL_FAILURE);
}
bref = get_slave_candidate(backend_ref, router_nservers, master_host, p);
}
/**
* Successful cases
*/
if (slaves_connected >= min_nslaves && slaves_connected <= max_nslaves)
{
succp = true;
if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO))
{
if (slaves_connected < max_nslaves)
{
MXS_INFO("Couldn't connect to maximum number of "
"slaves. Connected successfully to %d slaves "
"of %d of them.", slaves_connected, slaves_found);
}
for (int i = 0; i < router_nservers; i++)
{
if (BREF_IS_IN_USE((&backend_ref[i])))
{
MXS_INFO("Selected %s in \t[%s]:%d",
STRSRVSTATUS(backend_ref[i].ref->server),
backend_ref[i].ref->server->name,
backend_ref[i].ref->server->port);
}
} /* for */
}
}
/** Failure cases */
else
{
MXS_ERROR("Couldn't establish required amount of slave connections for "
"router session. Would need between %d and %d slaves but only have %d.",
min_nslaves, max_nslaves, slaves_connected);
/** Clean up connections */
for (int i = 0; i < router_nservers; i++)
{
if (BREF_IS_IN_USE((&backend_ref[i])))
{
ss_dassert(backend_ref[i].ref->connections > 0);
close_failed_bref(&backend_ref[i], true);
/** Decrease backend's connection counter. */
atomic_add(&backend_ref[i].ref->connections, -1);
RW_CHK_DCB(&backend_ref[i], backend_ref[i].bref_dcb);
dcb_close(backend_ref[i].bref_dcb);
RW_CLOSE_BREF(&backend_ref[i]);
}
}
}
return succp;
}
/** Compare number of connections from this router in backend servers */
static int bref_cmp_router_conn(const void *bref1, const void *bref2)
{
SERVER_REF *b1 = ((backend_ref_t *)bref1)->ref;
SERVER_REF *b2 = ((backend_ref_t *)bref2)->ref;
if (b1->weight == 0 && b2->weight == 0)
{
return b1->connections - b2->connections;
}
else if (b1->weight == 0)
{
return 1;
}
else if (b2->weight == 0)
{
return -1;
}
return ((1000 + 1000 * b1->connections) / b1->weight) -
((1000 + 1000 * b2->connections) / b2->weight);
}
/** Compare number of global connections in backend servers */
static int bref_cmp_global_conn(const void *bref1, const void *bref2)
{
SERVER_REF *b1 = ((backend_ref_t *)bref1)->ref;
SERVER_REF *b2 = ((backend_ref_t *)bref2)->ref;
if (b1->weight == 0 && b2->weight == 0)
{
return b1->server->stats.n_current -
b2->server->stats.n_current;
}
else if (b1->weight == 0)
{
return 1;
}
else if (b2->weight == 0)
{
return -1;
}
return ((1000 + 1000 * b1->server->stats.n_current) / b1->weight) -
((1000 + 1000 * b2->server->stats.n_current) / b2->weight);
}
/** Compare replication lag between backend servers */
static int bref_cmp_behind_master(const void *bref1, const void *bref2)
{
SERVER_REF *b1 = ((backend_ref_t *)bref1)->ref;
SERVER_REF *b2 = ((backend_ref_t *)bref2)->ref;
if (b1->weight == 0 && b2->weight == 0)
{
return b1->server->rlag -
b2->server->rlag;
}
else if (b1->weight == 0)
{
return 1;
}
else if (b2->weight == 0)
{
return -1;
}
return ((1000 + 1000 * b1->server->rlag) / b1->weight) -
((1000 + 1000 * b2->server->rlag) / b2->weight);
}
/** Compare number of current operations in backend servers */
static int bref_cmp_current_load(const void *bref1, const void *bref2)
{
SERVER_REF *b1 = ((backend_ref_t *)bref1)->ref;
SERVER_REF *b2 = ((backend_ref_t *)bref2)->ref;
if (b1->weight == 0 && b2->weight == 0)
{
return b1->server->stats.n_current_ops - b2->server->stats.n_current_ops;
}
else if (b1->weight == 0)
{
return 1;
}
else if (b2->weight == 0)
{
return -1;
}
return ((1000 + 1000 * b1->server->stats.n_current_ops) / b1->weight) -
((1000 + 1000 * b2->server->stats.n_current_ops) / b2->weight);
}
/**
* @brief Connect a server
*
* Connects to a server, adds callbacks to the created DCB and updates
* router statistics. If @p execute_history is true, the session command
* history will be executed on this server.
*
* @param b Router's backend structure for the server
* @param session Client's session object
* @param execute_history Execute session command history
* @return True if successful, false if an error occurred
*/
static bool connect_server(backend_ref_t *bref, MXS_SESSION *session, bool execute_history)
{
SERVER *serv = bref->ref->server;
bool rval = false;
bref->bref_dcb = dcb_connect(serv, session, serv->protocol);
if (bref->bref_dcb != NULL)
{
bref_clear_state(bref, BREF_CLOSED);
bref->closed_at = 0;
if (!execute_history || execute_sescmd_history(bref))
{
bref->bref_state = 0;
bref_set_state(bref, BREF_IN_USE);
atomic_add(&bref->ref->connections, 1);
rval = true;
}
else
{
MXS_ERROR("Failed to execute session command in %s ([%s]:%d). See earlier "
"errors for more details.",
bref->ref->server->unique_name,
bref->ref->server->name,
bref->ref->server->port);
RW_CHK_DCB(bref, bref->bref_dcb);
dcb_close(bref->bref_dcb);
RW_CLOSE_BREF(bref);
bref->bref_dcb = NULL;
}
}
else
{
MXS_ERROR("Unable to establish connection with server [%s]:%d",
serv->name, serv->port);
}
return rval;
}
/**
* @brief Log server connections
*
* @param select_criteria Slave selection criteria
* @param backend_ref Backend reference array
* @param router_nservers Number of backends in @p backend_ref
*/
static void log_server_connections(select_criteria_t select_criteria,
backend_ref_t *backend_ref, int router_nservers)
{
if (select_criteria == LEAST_GLOBAL_CONNECTIONS ||
select_criteria == LEAST_ROUTER_CONNECTIONS ||
select_criteria == LEAST_BEHIND_MASTER ||
select_criteria == LEAST_CURRENT_OPERATIONS)
{
MXS_INFO("Servers and %s connection counts:",
select_criteria == LEAST_GLOBAL_CONNECTIONS ? "all MaxScale"
: "router");
for (int i = 0; i < router_nservers; i++)
{
SERVER_REF *b = backend_ref[i].ref;
switch (select_criteria)
{
case LEAST_GLOBAL_CONNECTIONS:
MXS_INFO("MaxScale connections : %d in \t[%s]:%d %s",
b->server->stats.n_current, b->server->name,
b->server->port, STRSRVSTATUS(b->server));
break;
case LEAST_ROUTER_CONNECTIONS:
MXS_INFO("RWSplit connections : %d in \t[%s]:%d %s",
b->connections, b->server->name,
b->server->port, STRSRVSTATUS(b->server));
break;
case LEAST_CURRENT_OPERATIONS:
MXS_INFO("current operations : %d in \t[%s]:%d %s",
b->server->stats.n_current_ops,
b->server->name, b->server->port,
STRSRVSTATUS(b->server));
break;
case LEAST_BEHIND_MASTER:
MXS_INFO("replication lag : %d in \t[%s]:%d %s",
b->server->rlag, b->server->name,
b->server->port, STRSRVSTATUS(b->server));
default:
break;
}
}
}
}
/********************************
* This routine returns the root master server from MySQL replication tree
* Get the root Master rule:
*
* find server with the lowest replication depth level
* and the SERVER_MASTER bitval
* Servers are checked even if they are in 'maintenance'
*
* @param servers The list of servers
* @param router_nservers The number of servers
* @return The Master found
*
*/
static SERVER_REF *get_root_master(backend_ref_t *servers, int router_nservers)
{
int i = 0;
SERVER_REF *master_host = NULL;
for (i = 0; i < router_nservers; i++)
{
if (servers[i].ref == NULL)
{
/** This should not happen */
ss_dassert(false);
continue;
}
SERVER_REF *b = servers[i].ref;
if (SERVER_IS_MASTER(b->server))
{
if (master_host == NULL ||
(b->server->depth < master_host->server->depth))
{
master_host = b;
}
}
}
return master_host;
}

View File

@ -0,0 +1,429 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "readwritesplit.hh"
#include "rwsplit_internal.hh"
#include <stdio.h>
#include <strings.h>
#include <string.h>
#include <stdlib.h>
#include <stdint.h>
#include <maxscale/router.h>
/**
* The functions that implement back end selection for the read write
* split router. All of these functions are internal to that router and
* not intended to be called from elsewhere.
*/
/**
* Check whether it's possible to use this server as a slave
*
* @param server The slave candidate
* @param master The master server or NULL if no master is available
*
* @return True if this server is a valid slave candidate
*/
static bool valid_for_slave(const SERVER *server, const SERVER *master)
{
return (SERVER_IS_SLAVE(server) || SERVER_IS_RELAY_SERVER(server)) &&
(master == NULL || (server != master));
}
/**
* @brief Find the best slave candidate
*
* This function iterates through @c backend and tries to find the best backend
* reference that is not in use. @c cmpfun will be called to compare the backends.
*
* @param rses Router client session
* @param master The master server
* @param cmpfun qsort() compatible comparison function
*
* @return The best slave backend reference or NULL if no candidates could be found
*/
SRWBackend get_slave_candidate(ROUTER_CLIENT_SES* rses, const SERVER *master,
int (*cmpfun)(const SRWBackend&, const SRWBackend&))
{
SRWBackend candidate;
for (SRWBackendList::iterator it = rses->backends.begin();
it != rses->backends.end(); it++)
{
SRWBackend& backend = *it;
if (!backend->in_use() && backend->can_connect() &&
valid_for_slave(backend->server(), master))
{
if (candidate)
{
if (cmpfun(candidate, backend) > 0)
{
candidate = backend;
}
}
else
{
candidate = backend;
}
}
}
return candidate;
}
/** Compare number of connections from this router in backend servers */
static int backend_cmp_router_conn(const SRWBackend& a, const SRWBackend& b)
{
SERVER_REF *first = a->backend();
SERVER_REF *second = b->backend();
if (first->weight == 0 && second->weight == 0)
{
return first->connections - second->connections;
}
else if (first->weight == 0)
{
return 1;
}
else if (second->weight == 0)
{
return -1;
}
return ((1000 + 1000 * first->connections) / first->weight) -
((1000 + 1000 * second->connections) / second->weight);
}
/** Compare number of global connections in backend servers */
static int backend_cmp_global_conn(const SRWBackend& a, const SRWBackend& b)
{
SERVER_REF *first = a->backend();
SERVER_REF *second = b->backend();
if (first->weight == 0 && second->weight == 0)
{
return first->server->stats.n_current -
second->server->stats.n_current;
}
else if (first->weight == 0)
{
return 1;
}
else if (second->weight == 0)
{
return -1;
}
return ((1000 + 1000 * first->server->stats.n_current) / first->weight) -
((1000 + 1000 * second->server->stats.n_current) / second->weight);
}
/** Compare replication lag between backend servers */
static int backend_cmp_behind_master(const SRWBackend& a, const SRWBackend& b)
{
SERVER_REF *first = a->backend();
SERVER_REF *second = b->backend();
if (first->weight == 0 && second->weight == 0)
{
return first->server->rlag -
second->server->rlag;
}
else if (first->weight == 0)
{
return 1;
}
else if (second->weight == 0)
{
return -1;
}
return ((1000 + 1000 * first->server->rlag) / first->weight) -
((1000 + 1000 * second->server->rlag) / second->weight);
}
/** Compare number of current operations in backend servers */
static int backend_cmp_current_load(const SRWBackend& a, const SRWBackend& b)
{
SERVER_REF *first = a->backend();
SERVER_REF *second = b->backend();
if (first->weight == 0 && second->weight == 0)
{
return first->server->stats.n_current_ops - second->server->stats.n_current_ops;
}
else if (first->weight == 0)
{
return 1;
}
else if (second->weight == 0)
{
return -1;
}
return ((1000 + 1000 * first->server->stats.n_current_ops) / first->weight) -
((1000 + 1000 * second->server->stats.n_current_ops) / second->weight);
}
/**
* The order of functions _must_ match with the order the select criteria are
* listed in select_criteria_t definition in readwritesplit.h
*/
int (*criteria_cmpfun[LAST_CRITERIA])(const SRWBackend&, const SRWBackend&) =
{
NULL,
backend_cmp_global_conn,
backend_cmp_router_conn,
backend_cmp_behind_master,
backend_cmp_current_load
};
/**
* @brief Log server connections
*
* @param criteria Slave selection criteria
* @param rses Router client session
*/
static void log_server_connections(select_criteria_t criteria,
ROUTER_CLIENT_SES* rses)
{
MXS_INFO("Servers and %s connection counts:",
criteria == LEAST_GLOBAL_CONNECTIONS ? "all MaxScale" : "router");
for (SRWBackendList::iterator it = rses->backends.begin();
it != rses->backends.end(); it++)
{
SERVER_REF* b = (*it)->backend();
switch (criteria)
{
case LEAST_GLOBAL_CONNECTIONS:
MXS_INFO("MaxScale connections : %d in \t[%s]:%d %s",
b->server->stats.n_current, b->server->name,
b->server->port, STRSRVSTATUS(b->server));
break;
case LEAST_ROUTER_CONNECTIONS:
MXS_INFO("RWSplit connections : %d in \t[%s]:%d %s",
b->connections, b->server->name,
b->server->port, STRSRVSTATUS(b->server));
break;
case LEAST_CURRENT_OPERATIONS:
MXS_INFO("current operations : %d in \t[%s]:%d %s",
b->server->stats.n_current_ops,
b->server->name, b->server->port,
STRSRVSTATUS(b->server));
break;
case LEAST_BEHIND_MASTER:
MXS_INFO("replication lag : %d in \t[%s]:%d %s",
b->server->rlag, b->server->name,
b->server->port, STRSRVSTATUS(b->server));
default:
ss_dassert(!true);
break;
}
}
}
/**
* @brief Find the master server that is at the root of the replication tree
*
* @param rses Router client session
*
* @return The root master reference or NULL if no master is found
*/
static SERVER_REF* get_root_master(ROUTER_CLIENT_SES* rses)
{
SERVER_REF *master_host = NULL;
for (SRWBackendList::iterator it = rses->backends.begin();
it != rses->backends.end(); it++)
{
SERVER_REF* b = (*it)->backend();
if (SERVER_IS_MASTER(b->server))
{
if (master_host == NULL ||
(b->server->depth < master_host->server->depth))
{
master_host = b;
}
}
}
return master_host;
}
/**
* @brief Search suitable backend servers from those of router instance
*
* It is assumed that there is only one master among servers of a router instance.
* As a result, the first master found is chosen. There will possibly be more
* backend references than connected backends because only those in correct state
* are connected to.
*
* @param router_nservers Number of backend servers
* @param max_nslaves Upper limit for the number of slaves
* @param select_criteria Slave selection criteria
* @param session Client session
* @param router Router instance
* @param rses Router client session
* @param type Connection type, ALL for all types, SLAVE for slaves only
*
* @return True if at least one master and one slave was found
*/
bool select_connect_backend_servers(int router_nservers,
int max_nslaves,
select_criteria_t select_criteria,
MXS_SESSION *session,
ROUTER_INSTANCE *router,
ROUTER_CLIENT_SES *rses,
connection_type type)
{
/* get the root Master */
SERVER_REF *master_backend = get_root_master(rses);
SERVER *master_host = master_backend ? master_backend->server : NULL;
if (router->rwsplit_config.master_failure_mode == RW_FAIL_INSTANTLY &&
(master_host == NULL || SERVER_IS_DOWN(master_host)))
{
MXS_ERROR("Couldn't find suitable Master from %d candidates.", router_nservers);
return false;
}
/**
* New session:
*
* Connect to both master and slaves
*
* Existing session:
*
* Master is already connected or we don't have a master. The function was
* called because new slaves must be selected to replace failed ones.
*/
bool master_connected = type == SLAVE || rses->current_master;
/** Check slave selection criteria and set compare function */
int (*cmpfun)(const SRWBackend&, const SRWBackend&) = criteria_cmpfun[select_criteria];
ss_dassert(cmpfun);
if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO))
{
log_server_connections(select_criteria, rses);
}
int slaves_found = 0;
int slaves_connected = 0;
const int min_nslaves = 0; /*< not configurable at the time */
bool succp = false;
if (!master_connected)
{
/** Find a master server */
for (SRWBackendList::iterator it = rses->backends.begin();
it != rses->backends.end(); it++)
{
SRWBackend& backend = *it;
if (backend->can_connect() && master_host && backend->server() == master_host)
{
if (backend->connect(session))
{
rses->current_master = backend;
}
}
}
}
/** Calculate how many connections we already have */
for (SRWBackendList::iterator it = rses->backends.begin();
it != rses->backends.end(); it++)
{
SRWBackend& backend = *it;
if (backend->can_connect() && valid_for_slave(backend->server(), master_host))
{
slaves_found += 1;
if (backend->in_use())
{
slaves_connected += 1;
}
}
}
ss_dassert(slaves_connected < max_nslaves || max_nslaves == 0);
/** Connect to all possible slaves */
for (SRWBackend backend(get_slave_candidate(rses, master_host, cmpfun));
backend && slaves_connected < max_nslaves;
backend = get_slave_candidate(rses, master_host, cmpfun))
{
if (backend->can_connect() && backend->connect(session))
{
if (rses->sescmd_list.size())
{
backend->append_session_command(rses->sescmd_list);
if (backend->execute_session_command())
{
rses->expected_responses++;
slaves_connected++;
}
}
else
{
slaves_connected++;
}
}
}
if (slaves_connected >= min_nslaves && slaves_connected <= max_nslaves)
{
succp = true;
if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO))
{
if (slaves_connected < max_nslaves)
{
MXS_INFO("Couldn't connect to maximum number of "
"slaves. Connected successfully to %d slaves "
"of %d of them.", slaves_connected, slaves_found);
}
for (SRWBackendList::iterator it = rses->backends.begin();
it != rses->backends.end(); it++)
{
SRWBackend& backend = *it;
if (backend->in_use())
{
MXS_INFO("Selected %s in \t%s", STRSRVSTATUS(backend->server()),
backend->uri());
}
}
}
}
else
{
MXS_ERROR("Couldn't establish required amount of slave connections for "
"router session. Would need between %d and %d slaves but only have %d.",
min_nslaves, max_nslaves, slaves_connected);
close_all_connections(rses);
}
return succp;
}

View File

@ -1,462 +0,0 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "readwritesplit.h"
#include <stdio.h>
#include <strings.h>
#include <string.h>
#include <stdlib.h>
#include <stdint.h>
#include <maxscale/router.h>
#include "rwsplit_internal.h"
/**
* @file rwsplit_session_cmd.c The functions that provide session command
* handling for the read write split router.
*
* @verbatim
* Revision History
*
* Date Who Description
* 08/08/2016 Martin Brampton Initial implementation
*
* @endverbatim
*/
static bool sescmd_cursor_history_empty(sescmd_cursor_t *scur);
static void sescmd_cursor_reset(sescmd_cursor_t *scur);
static bool sescmd_cursor_next(sescmd_cursor_t *scur);
static rses_property_t *mysql_sescmd_get_property(mysql_sescmd_t *scmd);
/*
* The following functions, all to do with the handling of session commands,
* are called from other modules of the read write split router:
*/
/**
* Router session must be locked.
* Return session command pointer if succeed, NULL if failed.
*/
mysql_sescmd_t *rses_property_get_sescmd(rses_property_t *prop)
{
mysql_sescmd_t *sescmd;
if (prop == NULL)
{
MXS_ERROR("[%s] Error: NULL parameter.", __FUNCTION__);
return NULL;
}
CHK_RSES_PROP(prop);
sescmd = &prop->rses_prop_data.sescmd;
if (sescmd != NULL)
{
CHK_MYSQL_SESCMD(sescmd);
}
return sescmd;
}
/**
* Create session command property.
*/
mysql_sescmd_t *mysql_sescmd_init(rses_property_t *rses_prop,
GWBUF *sescmd_buf,
unsigned char packet_type,
ROUTER_CLIENT_SES *rses)
{
mysql_sescmd_t *sescmd;
CHK_RSES_PROP(rses_prop);
/** Can't call rses_property_get_sescmd with uninitialized sescmd */
sescmd = &rses_prop->rses_prop_data.sescmd;
sescmd->my_sescmd_prop = rses_prop; /*< reference to owning property */
#if defined(SS_DEBUG)
sescmd->my_sescmd_chk_top = CHK_NUM_MY_SESCMD;
sescmd->my_sescmd_chk_tail = CHK_NUM_MY_SESCMD;
#endif
/** Set session command buffer */
sescmd->my_sescmd_buf = sescmd_buf;
sescmd->my_sescmd_packet_type = packet_type;
sescmd->position = atomic_add(&rses->pos_generator, 1);
return sescmd;
}
void mysql_sescmd_done(mysql_sescmd_t *sescmd)
{
if (sescmd == NULL)
{
MXS_ERROR("[%s] Error: NULL parameter.", __FUNCTION__);
return;
}
CHK_RSES_PROP(sescmd->my_sescmd_prop);
gwbuf_free(sescmd->my_sescmd_buf);
memset(sescmd, 0, sizeof(mysql_sescmd_t));
}
/**
* All cases where backend message starts at least with one response to session
* command are handled here.
* Read session commands from property list. If command is already replied,
* discard packet. Else send reply to client. In both cases move cursor forward
* until all session command replies are handled.
*
* Cases that are expected to happen and which are handled:
* s = response not yet replied to client, S = already replied response,
* q = query
* 1. q+ for example : select * from mysql.user
* 2. s+ for example : set autocommit=1
* 3. S+
* 4. sq+
* 5. Sq+
* 6. Ss+
* 7. Ss+q+
* 8. S+q+
* 9. s+q+
*/
GWBUF *sescmd_cursor_process_replies(GWBUF *replybuf,
backend_ref_t *bref,
bool *reconnect)
{
sescmd_cursor_t *scur = &bref->bref_sescmd_cur;
mysql_sescmd_t *scmd = sescmd_cursor_get_command(scur);
ROUTER_CLIENT_SES *ses = (*scur->scmd_cur_ptr_property)->rses_prop_rsession;
CHK_GWBUF(replybuf);
/**
* Walk through packets in the message and the list of session
* commands.
*/
while (scmd != NULL && replybuf != NULL)
{
bref->reply_cmd = *((unsigned char *)replybuf->start + 4);
scur->position = scmd->position;
/** Faster backend has already responded to client : discard */
if (scmd->my_sescmd_is_replied)
{
bool last_packet = false;
CHK_GWBUF(replybuf);
while (!last_packet)
{
int buflen;
buflen = GWBUF_LENGTH(replybuf);
last_packet = GWBUF_IS_TYPE_RESPONSE_END(replybuf);
/** discard packet */
replybuf = gwbuf_consume(replybuf, buflen);
}
/** Set response status received */
bref_clear_state(bref, BREF_WAITING_RESULT);
if (bref->reply_cmd != scmd->reply_cmd && BREF_IS_IN_USE(bref))
{
MXS_ERROR("Slave server '%s': response differs from master's response. "
"Closing connection due to inconsistent session state.",
bref->ref->server->unique_name);
close_failed_bref(bref, true);
RW_CHK_DCB(bref, bref->bref_dcb);
dcb_close(bref->bref_dcb);
RW_CLOSE_BREF(bref);
*reconnect = true;
gwbuf_free(replybuf);
replybuf = NULL;
}
}
/** This is a response from the master and it is the "right" one.
* A slave server's response will be compared to this and if
* their response differs from the master server's response, they
* are dropped from the valid list of backend servers.
* Response is in the buffer and it will be sent to client.
*
* If we have no master server, the first slave's response is considered
* the "right" one. */
else if (ses->rses_master_ref == NULL ||
!BREF_IS_IN_USE(ses->rses_master_ref) ||
ses->rses_master_ref->bref_dcb == bref->bref_dcb)
{
/** Mark the rest session commands as replied */
scmd->my_sescmd_is_replied = true;
scmd->reply_cmd = *((unsigned char *)replybuf->start + 4);
MXS_INFO("Server '%s' responded to a session command, sending the response "
"to the client.", bref->ref->server->unique_name);
for (int i = 0; i < ses->rses_nbackends; i++)
{
if (!BREF_IS_WAITING_RESULT(&ses->rses_backend_ref[i]))
{
/** This backend has already received a response */
if (ses->rses_backend_ref[i].reply_cmd != scmd->reply_cmd &&
!BREF_IS_CLOSED(&ses->rses_backend_ref[i]) &&
BREF_IS_IN_USE(&ses->rses_backend_ref[i]))
{
close_failed_bref(&ses->rses_backend_ref[i], true);
if (ses->rses_backend_ref[i].bref_dcb)
{
RW_CHK_DCB(&ses->rses_backend_ref[i], ses->rses_backend_ref[i].bref_dcb);
dcb_close(ses->rses_backend_ref[i].bref_dcb);
RW_CLOSE_BREF(&ses->rses_backend_ref[i]);
}
*reconnect = true;
MXS_INFO("Disabling slave [%s]:%d, result differs from "
"master's result. Master: %d Slave: %d",
ses->rses_backend_ref[i].ref->server->name,
ses->rses_backend_ref[i].ref->server->port,
bref->reply_cmd, ses->rses_backend_ref[i].reply_cmd);
}
}
}
}
else
{
MXS_INFO("Slave '%s' responded before master to a session command. Result: %d",
bref->ref->server->unique_name,
(int)bref->reply_cmd);
if (bref->reply_cmd == 0xff)
{
SERVER *serv = bref->ref->server;
MXS_ERROR("Slave '%s' (%s:%u) failed to execute session command.",
serv->unique_name, serv->name, serv->port);
}
gwbuf_free(replybuf);
replybuf = NULL;
}
if (sescmd_cursor_next(scur))
{
scmd = sescmd_cursor_get_command(scur);
}
else
{
scmd = NULL;
/** All session commands are replied */
scur->scmd_cur_active = false;
}
}
ss_dassert(replybuf == NULL || *scur->scmd_cur_ptr_property == NULL);
return replybuf;
}
/**
* Get the address of current session command.
*
* Router session must be locked */
mysql_sescmd_t *sescmd_cursor_get_command(sescmd_cursor_t *scur)
{
mysql_sescmd_t *scmd;
scur->scmd_cur_cmd = rses_property_get_sescmd(*scur->scmd_cur_ptr_property);
CHK_MYSQL_SESCMD(scur->scmd_cur_cmd);
scmd = scur->scmd_cur_cmd;
return scmd;
}
/** router must be locked */
bool sescmd_cursor_is_active(sescmd_cursor_t *sescmd_cursor)
{
bool succp;
if (sescmd_cursor == NULL)
{
MXS_ERROR("[%s] Error: NULL parameter.", __FUNCTION__);
return false;
}
succp = sescmd_cursor->scmd_cur_active;
return succp;
}
/** router must be locked */
void sescmd_cursor_set_active(sescmd_cursor_t *sescmd_cursor,
bool value)
{
/** avoid calling unnecessarily */
ss_dassert(sescmd_cursor->scmd_cur_active != value);
sescmd_cursor->scmd_cur_active = value;
}
/**
* Clone session command's command buffer.
* Router session must be locked
*/
GWBUF *sescmd_cursor_clone_querybuf(sescmd_cursor_t *scur)
{
GWBUF *buf;
if (scur == NULL)
{
MXS_ERROR("[%s] Error: NULL parameter.", __FUNCTION__);
return NULL;
}
ss_dassert(scur->scmd_cur_cmd != NULL);
buf = gwbuf_clone(scur->scmd_cur_cmd->my_sescmd_buf);
CHK_GWBUF(buf);
return buf;
}
bool execute_sescmd_history(backend_ref_t *bref)
{
ss_dassert(bref);
CHK_BACKEND_REF(bref);
bool succp = true;
sescmd_cursor_t *scur = &bref->bref_sescmd_cur;
CHK_SESCMD_CUR(scur);
if (!sescmd_cursor_history_empty(scur))
{
sescmd_cursor_reset(scur);
succp = execute_sescmd_in_backend(bref);
}
return succp;
}
static bool sescmd_cursor_history_empty(sescmd_cursor_t *scur)
{
bool succp;
if (scur == NULL)
{
MXS_ERROR("[%s] Error: NULL parameter.", __FUNCTION__);
return true;
}
CHK_SESCMD_CUR(scur);
if (scur->scmd_cur_rses->rses_properties[RSES_PROP_TYPE_SESCMD] == NULL)
{
succp = true;
}
else
{
succp = false;
}
return succp;
}
/*
* End of functions called from other modules of the read write split router;
* start of functions that are internal to this module.
*/
static void sescmd_cursor_reset(sescmd_cursor_t *scur)
{
ROUTER_CLIENT_SES *rses;
if (scur == NULL)
{
MXS_ERROR("[%s] Error: NULL parameter.", __FUNCTION__);
return;
}
CHK_SESCMD_CUR(scur);
CHK_CLIENT_RSES(scur->scmd_cur_rses);
rses = scur->scmd_cur_rses;
scur->scmd_cur_ptr_property = &rses->rses_properties[RSES_PROP_TYPE_SESCMD];
CHK_RSES_PROP((*scur->scmd_cur_ptr_property));
scur->scmd_cur_active = false;
scur->scmd_cur_cmd = &(*scur->scmd_cur_ptr_property)->rses_prop_data.sescmd;
}
/**
* Moves cursor to next property and copied address of its sescmd to cursor.
* Current propery must be non-null.
* If current property is the last on the list, *scur->scmd_ptr_property == NULL
*
* Router session must be locked
*/
static bool sescmd_cursor_next(sescmd_cursor_t *scur)
{
bool succp = false;
rses_property_t *prop_curr;
rses_property_t *prop_next;
if (scur == NULL)
{
MXS_ERROR("[%s] Error: NULL parameter.", __FUNCTION__);
return false;
}
ss_dassert(scur != NULL);
ss_dassert(*(scur->scmd_cur_ptr_property) != NULL);
/** Illegal situation */
if (scur == NULL || *scur->scmd_cur_ptr_property == NULL ||
scur->scmd_cur_cmd == NULL)
{
/** Log error */
goto return_succp;
}
prop_curr = *(scur->scmd_cur_ptr_property);
CHK_MYSQL_SESCMD(scur->scmd_cur_cmd);
ss_dassert(prop_curr == mysql_sescmd_get_property(scur->scmd_cur_cmd));
CHK_RSES_PROP(prop_curr);
/** Copy address of pointer to next property */
scur->scmd_cur_ptr_property = &(prop_curr->rses_prop_next);
prop_next = *scur->scmd_cur_ptr_property;
ss_dassert(prop_next == *(scur->scmd_cur_ptr_property));
/** If there is a next property move forward */
if (prop_next != NULL)
{
CHK_RSES_PROP(prop_next);
CHK_RSES_PROP((*(scur->scmd_cur_ptr_property)));
/** Get pointer to next property's sescmd */
scur->scmd_cur_cmd = rses_property_get_sescmd(prop_next);
ss_dassert(prop_next == scur->scmd_cur_cmd->my_sescmd_prop);
CHK_MYSQL_SESCMD(scur->scmd_cur_cmd);
CHK_RSES_PROP(scur->scmd_cur_cmd->my_sescmd_prop);
}
else
{
/** No more properties, can't proceed. */
goto return_succp;
}
if (scur->scmd_cur_cmd != NULL)
{
succp = true;
}
else
{
ss_dassert(false); /*< Log error, sescmd shouldn't be NULL */
}
return_succp:
return succp;
}
static rses_property_t *mysql_sescmd_get_property(mysql_sescmd_t *scmd)
{
CHK_MYSQL_SESCMD(scmd);
return scmd->my_sescmd_prop;
}

View File

@ -0,0 +1,71 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "readwritesplit.hh"
#include "rwsplit_internal.hh"
#include <stdio.h>
#include <strings.h>
#include <string.h>
#include <stdlib.h>
#include <stdint.h>
#include <maxscale/router.h>
/**
* Functions for session command handling
*/
void process_sescmd_response(ROUTER_CLIENT_SES* rses, SRWBackend& backend,
GWBUF** ppPacket, bool* pReconnect)
{
if (backend->session_command_count())
{
/** We are executing a session command */
if (GWBUF_IS_TYPE_SESCMD_RESPONSE((*ppPacket)))
{
uint8_t cmd;
gwbuf_copy_data(*ppPacket, MYSQL_HEADER_LEN, 1, &cmd);
uint64_t id = backend->complete_session_command();
if (rses->recv_sescmd < rses->sent_sescmd &&
id == rses->recv_sescmd + 1 &&
(!rses->current_master || // Session doesn't have a master
rses->current_master == backend)) // This is the master's response
{
/** First reply to this session command, route it to the client */
++rses->recv_sescmd;
/** Store the master's response so that the slave responses can
* be compared to it */
rses->sescmd_responses[id] = cmd;
}
else
{
/** The reply to this session command has already been sent to
* the client, discard it */
gwbuf_free(*ppPacket);
*ppPacket = NULL;
if (rses->sescmd_responses[id] != cmd)
{
MXS_ERROR("Slave server '%s': response differs from master's response. "
"Closing connection due to inconsistent session state.",
backend->name());
backend->close(mxs::Backend::CLOSE_FATAL);
*pReconnect = true;
}
}
}
}
}

View File

@ -1,422 +0,0 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "readwritesplit.h"
#include <stdio.h>
#include <strings.h>
#include <string.h>
#include <stdlib.h>
#include <stdint.h>
#include <maxscale/alloc.h>
/* Note that modutil contains much MySQL specific code */
#include <maxscale/modutil.h>
#include <maxscale/router.h>
#include "rwsplit_internal.h"
/**
* @file rwsplit_tmp_table.c The functions that carry out checks on
* statements to see if they involve various operations involving temporary
* tables or multi-statement queries.
*
* @verbatim
* Revision History
*
* Date Who Description
* 08/08/2016 Martin Brampton Initial implementation
*
* @endverbatim
*/
/*
* The following are to do with checking whether the statement refers to
* temporary tables, or is a multi-statement request. Maybe they belong
* somewhere else, outside this router. Perhaps in the query classifier?
*/
/**
* @brief Check for dropping of temporary tables
*
* Check if the query is a DROP TABLE... query and
* if it targets a temporary table, remove it from the hashtable.
* @param router_cli_ses Router client session
* @param querybuf GWBUF containing the query
* @param type The type of the query resolved so far
*/
void check_drop_tmp_table(ROUTER_CLIENT_SES *router_cli_ses, GWBUF *querybuf,
mysql_server_cmd_t packet_type)
{
if (packet_type != MYSQL_COM_QUERY && packet_type != MYSQL_COM_DROP_DB)
{
return;
}
int tsize = 0, klen = 0, i;
char **tbl = NULL;
char *hkey, *dbname;
MYSQL_session *my_data;
rses_property_t *rses_prop_tmp;
MYSQL_session *data = (MYSQL_session *)router_cli_ses->client_dcb->data;
rses_prop_tmp = router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES];
dbname = (char *)data->db;
if (qc_is_drop_table_query(querybuf))
{
tbl = qc_get_table_names(querybuf, &tsize, false);
if (tbl != NULL)
{
for (i = 0; i < tsize; i++)
{
/* Not clear why the next six lines are outside the if block */
klen = strlen(dbname) + strlen(tbl[i]) + 2;
hkey = MXS_CALLOC(klen, sizeof(char));
MXS_ABORT_IF_NULL(hkey);
strcpy(hkey, dbname);
strcat(hkey, ".");
strcat(hkey, tbl[i]);
if (rses_prop_tmp && rses_prop_tmp->rses_prop_data.temp_tables)
{
if (hashtable_delete(rses_prop_tmp->rses_prop_data.temp_tables,
(void *)hkey))
{
MXS_INFO("Temporary table dropped: %s", hkey);
}
}
MXS_FREE(tbl[i]);
MXS_FREE(hkey);
}
MXS_FREE(tbl);
}
}
}
/**
* Check if the query targets a temporary table.
* @param router_cli_ses Router client session
* @param querybuf GWBUF containing the query
* @param type The type of the query resolved so far
* @return The type of the query
*/
bool is_read_tmp_table(ROUTER_CLIENT_SES *router_cli_ses,
GWBUF *querybuf,
qc_query_type_t qtype)
{
bool target_tmp_table = false;
int tsize = 0, klen = 0, i;
char **tbl = NULL;
char *dbname;
char hkey[MYSQL_DATABASE_MAXLEN + MYSQL_TABLE_MAXLEN + 2];
MYSQL_session *data;
bool rval = false;
rses_property_t *rses_prop_tmp;
if (router_cli_ses == NULL || querybuf == NULL)
{
MXS_ERROR("[%s] Error: NULL parameters passed: %p %p", __FUNCTION__,
router_cli_ses, querybuf);
return false;
}
if (router_cli_ses->client_dcb == NULL)
{
MXS_ERROR("[%s] Error: Client DCB is NULL.", __FUNCTION__);
return false;
}
rses_prop_tmp = router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES];
data = (MYSQL_session *)router_cli_ses->client_dcb->data;
if (data == NULL)
{
MXS_ERROR("[%s] Error: User data in client DBC is NULL.", __FUNCTION__);
return false;
}
dbname = (char *)data->db;
if (qc_query_is_type(qtype, QUERY_TYPE_READ) ||
qc_query_is_type(qtype, QUERY_TYPE_LOCAL_READ) ||
qc_query_is_type(qtype, QUERY_TYPE_USERVAR_READ) ||
qc_query_is_type(qtype, QUERY_TYPE_SYSVAR_READ) ||
qc_query_is_type(qtype, QUERY_TYPE_GSYSVAR_READ))
{
tbl = qc_get_table_names(querybuf, &tsize, false);
if (tbl != NULL && tsize > 0)
{
/** Query targets at least one table */
for (i = 0; i < tsize && !target_tmp_table && tbl[i]; i++)
{
sprintf(hkey, "%s.%s", dbname, tbl[i]);
if (rses_prop_tmp && rses_prop_tmp->rses_prop_data.temp_tables)
{
if (hashtable_fetch(rses_prop_tmp->rses_prop_data.temp_tables, hkey))
{
/**Query target is a temporary table*/
rval = true;
MXS_INFO("Query targets a temporary table: %s", hkey);
break;
}
}
}
}
}
if (tbl != NULL)
{
for (i = 0; i < tsize; i++)
{
MXS_FREE(tbl[i]);
}
MXS_FREE(tbl);
}
return rval;
}
/**
* If query is of type QUERY_TYPE_CREATE_TMP_TABLE then find out
* the database and table name, create a hashvalue and
* add it to the router client session's property. If property
* doesn't exist then create it first.
* @param router_cli_ses Router client session
* @param querybuf GWBUF containing the query
* @param type The type of the query resolved so far
*/
void check_create_tmp_table(ROUTER_CLIENT_SES *router_cli_ses,
GWBUF *querybuf, qc_query_type_t type)
{
if (!qc_query_is_type(type, QUERY_TYPE_CREATE_TMP_TABLE))
{
return;
}
int klen = 0;
char *hkey, *dbname;
MYSQL_session *data;
rses_property_t *rses_prop_tmp;
HASHTABLE *h;
if (router_cli_ses == NULL || querybuf == NULL)
{
MXS_ERROR("[%s] Error: NULL parameters passed: %p %p", __FUNCTION__,
router_cli_ses, querybuf);
return;
}
if (router_cli_ses->client_dcb == NULL)
{
MXS_ERROR("[%s] Error: Client DCB is NULL.", __FUNCTION__);
return;
}
router_cli_ses->have_tmp_tables = true;
rses_prop_tmp = router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES];
data = (MYSQL_session *)router_cli_ses->client_dcb->data;
if (data == NULL)
{
MXS_ERROR("[%s] Error: User data in master server DBC is NULL.",
__FUNCTION__);
return;
}
dbname = (char *)data->db;
bool is_temp = true;
char *tblname = NULL;
tblname = qc_get_created_table_name(querybuf);
if (tblname && strlen(tblname) > 0)
{
klen = strlen(dbname) + strlen(tblname) + 2;
hkey = MXS_CALLOC(klen, sizeof(char));
MXS_ABORT_IF_NULL(hkey);
strcpy(hkey, dbname);
strcat(hkey, ".");
strcat(hkey, tblname);
}
else
{
hkey = NULL;
}
if (rses_prop_tmp == NULL)
{
if ((rses_prop_tmp = (rses_property_t *)MXS_CALLOC(1, sizeof(rses_property_t))))
{
#if defined(SS_DEBUG)
rses_prop_tmp->rses_prop_chk_top = CHK_NUM_ROUTER_PROPERTY;
rses_prop_tmp->rses_prop_chk_tail = CHK_NUM_ROUTER_PROPERTY;
#endif
rses_prop_tmp->rses_prop_rsession = router_cli_ses;
rses_prop_tmp->rses_prop_refcount = 1;
rses_prop_tmp->rses_prop_next = NULL;
rses_prop_tmp->rses_prop_type = RSES_PROP_TYPE_TMPTABLES;
router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES] = rses_prop_tmp;
}
}
if (rses_prop_tmp)
{
if (rses_prop_tmp->rses_prop_data.temp_tables == NULL)
{
h = hashtable_alloc(7, rwsplit_hashkeyfun, rwsplit_hashcmpfun);
hashtable_memory_fns(h, rwsplit_hstrdup, NULL, rwsplit_hfree, NULL);
if (h != NULL)
{
rses_prop_tmp->rses_prop_data.temp_tables = h;
}
else
{
MXS_ERROR("Failed to allocate a new hashtable.");
}
}
if (hkey && rses_prop_tmp->rses_prop_data.temp_tables &&
hashtable_add(rses_prop_tmp->rses_prop_data.temp_tables, (void *)hkey,
(void *)is_temp) == 0) /*< Conflict in hash table */
{
MXS_INFO("Temporary table conflict in hashtable: %s", hkey);
}
#if defined(SS_DEBUG)
{
bool retkey = hashtable_fetch(rses_prop_tmp->rses_prop_data.temp_tables, hkey);
if (retkey)
{
MXS_INFO("Temporary table added: %s", hkey);
}
}
#endif
}
MXS_FREE(hkey);
MXS_FREE(tblname);
}
/**
* @brief Detect multi-statement queries
*
* It is possible that the session state is modified inside a multi-statement
* query which would leave any slave sessions in an inconsistent state. Due to
* this, for the duration of this session, all queries will be sent to the
* master
* if the current query contains a multi-statement query.
* @param rses Router client session
* @param buf Buffer containing the full query
* @return True if the query contains multiple statements
*/
bool check_for_multi_stmt(GWBUF *buf, void *protocol, mysql_server_cmd_t packet_type)
{
MySQLProtocol *proto = (MySQLProtocol *)protocol;
bool rval = false;
if (proto->client_capabilities & GW_MYSQL_CAPABILITIES_MULTI_STATEMENTS &&
packet_type == MYSQL_COM_QUERY)
{
char *ptr, *data = (char*)GWBUF_DATA(buf) + 5;
/** Payload size without command byte */
int buflen = gw_mysql_get_byte3((uint8_t *)GWBUF_DATA(buf)) - 1;
if ((ptr = strnchr_esc_mysql(data, ';', buflen)))
{
/** Skip stored procedures etc. */
while (ptr && is_mysql_sp_end(ptr, buflen - (ptr - data)))
{
ptr = strnchr_esc_mysql(ptr + 1, ';', buflen - (ptr - data) - 1);
}
if (ptr)
{
if (ptr < data + buflen &&
!is_mysql_statement_end(ptr, buflen - (ptr - data)))
{
rval = true;
}
}
}
}
return rval;
}
/**
* @brief Determine the type of a query
*
* @param querybuf GWBUF containing the query
* @param packet_type Integer denoting DB specific enum
* @param non_empty_packet Boolean to be set by this function
*
* @return qc_query_type_t the query type; also the non_empty_packet bool is set
*/
qc_query_type_t
determine_query_type(GWBUF *querybuf, int packet_type, bool non_empty_packet)
{
qc_query_type_t qtype = QUERY_TYPE_UNKNOWN;
if (non_empty_packet)
{
mysql_server_cmd_t my_packet_type = (mysql_server_cmd_t)packet_type;
switch (my_packet_type)
{
case MYSQL_COM_QUIT: /*< 1 QUIT will close all sessions */
case MYSQL_COM_INIT_DB: /*< 2 DDL must go to the master */
case MYSQL_COM_REFRESH: /*< 7 - I guess this is session but not sure */
case MYSQL_COM_DEBUG: /*< 0d all servers dump debug info to stdout */
case MYSQL_COM_PING: /*< 0e all servers are pinged */
case MYSQL_COM_CHANGE_USER: /*< 11 all servers change it accordingly */
case MYSQL_COM_SET_OPTION: /*< 1b send options to all servers */
qtype = QUERY_TYPE_SESSION_WRITE;
break;
case MYSQL_COM_CREATE_DB: /**< 5 DDL must go to the master */
case MYSQL_COM_DROP_DB: /**< 6 DDL must go to the master */
case MYSQL_COM_STMT_CLOSE: /*< free prepared statement */
case MYSQL_COM_STMT_SEND_LONG_DATA: /*< send data to column */
case MYSQL_COM_STMT_RESET: /*< resets the data of a prepared statement */
qtype = QUERY_TYPE_WRITE;
break;
case MYSQL_COM_QUERY:
qtype = qc_get_type_mask(querybuf);
break;
case MYSQL_COM_STMT_PREPARE:
qtype = qc_get_type_mask(querybuf);
qtype |= QUERY_TYPE_PREPARE_STMT;
break;
case MYSQL_COM_STMT_EXECUTE:
/** Parsing is not needed for this type of packet */
qtype = QUERY_TYPE_EXEC_STMT;
break;
case MYSQL_COM_SHUTDOWN: /**< 8 where should shutdown be routed ? */
case MYSQL_COM_STATISTICS: /**< 9 ? */
case MYSQL_COM_PROCESS_INFO: /**< 0a ? */
case MYSQL_COM_CONNECT: /**< 0b ? */
case MYSQL_COM_PROCESS_KILL: /**< 0c ? */
case MYSQL_COM_TIME: /**< 0f should this be run in gateway ? */
case MYSQL_COM_DELAYED_INSERT: /**< 10 ? */
case MYSQL_COM_DAEMON: /**< 1d ? */
default:
break;
} /**< switch by packet type */
}
return qtype;
}

View File

@ -0,0 +1,265 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "readwritesplit.hh"
#include "rwsplit_internal.hh"
#include <stdio.h>
#include <strings.h>
#include <string.h>
#include <stdlib.h>
#include <stdint.h>
#include <maxscale/modutil.h>
#include <maxscale/alloc.h>
#include <maxscale/router.h>
/**
* The functions that carry out checks on statements to see if they involve
* various operations involving temporary tables or multi-statement queries.
*/
/*
* The following are to do with checking whether the statement refers to
* temporary tables, or is a multi-statement request. Maybe they belong
* somewhere else, outside this router. Perhaps in the query classifier?
*/
/**
* @brief Check for dropping of temporary tables
*
* Check if the query is a DROP TABLE... query and
* if it targets a temporary table, remove it from the hashtable.
* @param router_cli_ses Router client session
* @param querybuf GWBUF containing the query
* @param type The type of the query resolved so far
*/
void check_drop_tmp_table(ROUTER_CLIENT_SES *router_cli_ses, GWBUF *querybuf)
{
if (qc_is_drop_table_query(querybuf))
{
const QC_FIELD_INFO* info;
size_t n_infos;
qc_get_field_info(querybuf, &info, &n_infos);
for (size_t i = 0; i < n_infos; i++)
{
const char* db = mxs_mysql_get_current_db(router_cli_ses->client_dcb->session);
std::string table = info[i].database ? info[i].database : db;
table += ".";
if (info[i].table)
{
table += info[i].table;
}
router_cli_ses->temp_tables.erase(table);
}
}
}
/**
* Check if the query targets a temporary table.
* @param router_cli_ses Router client session
* @param querybuf GWBUF containing the query
* @param type The type of the query resolved so far
* @return The type of the query
*/
bool is_read_tmp_table(ROUTER_CLIENT_SES *router_cli_ses,
GWBUF *querybuf,
uint32_t qtype)
{
ss_dassert(router_cli_ses && querybuf && router_cli_ses->client_dcb);
bool rval = false;
if (qtype & (QUERY_TYPE_READ |
QUERY_TYPE_LOCAL_READ |
QUERY_TYPE_USERVAR_READ |
QUERY_TYPE_SYSVAR_READ |
QUERY_TYPE_GSYSVAR_READ))
{
const QC_FIELD_INFO* info;
size_t n_infos;
qc_get_field_info(querybuf, &info, &n_infos);
for (size_t i = 0; i < n_infos; i++)
{
const char* db = mxs_mysql_get_current_db(router_cli_ses->client_dcb->session);
std::string table = info[i].database ? info[i].database : db;
table += ".";
if (info[i].table)
{
table += info[i].table;
}
if (router_cli_ses->temp_tables.find(table) !=
router_cli_ses->temp_tables.end())
{
rval = true;
MXS_INFO("Query targets a temporary table: %s", table.c_str());
break;
}
}
}
return rval;
}
/**
* If query is of type QUERY_TYPE_CREATE_TMP_TABLE then find out
* the database and table name, create a hashvalue and
* add it to the router client session's property. If property
* doesn't exist then create it first.
* @param router_cli_ses Router client session
* @param querybuf GWBUF containing the query
* @param type The type of the query resolved so far
*/
void check_create_tmp_table(ROUTER_CLIENT_SES *router_cli_ses,
GWBUF *querybuf, uint32_t type)
{
if (qc_query_is_type(type, QUERY_TYPE_CREATE_TMP_TABLE))
{
ss_dassert(router_cli_ses && querybuf && router_cli_ses->client_dcb &&
router_cli_ses->client_dcb->data);
router_cli_ses->have_tmp_tables = true;
char* tblname = qc_get_created_table_name(querybuf);
std::string table;
if (tblname && *tblname)
{
const char* db = mxs_mysql_get_current_db(router_cli_ses->client_dcb->session);
table += db;
table += ".";
table += tblname;
}
/** Add the table to the set of temporary tables */
router_cli_ses->temp_tables.insert(table);
MXS_FREE(tblname);
}
}
/**
* @brief Detect multi-statement queries
*
* It is possible that the session state is modified inside a multi-statement
* query which would leave any slave sessions in an inconsistent state. Due to
* this, for the duration of this session, all queries will be sent to the
* master
* if the current query contains a multi-statement query.
* @param rses Router client session
* @param buf Buffer containing the full query
* @return True if the query contains multiple statements
*/
bool check_for_multi_stmt(GWBUF *buf, void *protocol, uint8_t packet_type)
{
MySQLProtocol *proto = (MySQLProtocol *)protocol;
bool rval = false;
if (proto->client_capabilities & GW_MYSQL_CAPABILITIES_MULTI_STATEMENTS &&
packet_type == MYSQL_COM_QUERY)
{
char *ptr, *data = (char*)GWBUF_DATA(buf) + 5;
/** Payload size without command byte */
int buflen = gw_mysql_get_byte3((uint8_t *)GWBUF_DATA(buf)) - 1;
if ((ptr = strnchr_esc_mysql(data, ';', buflen)))
{
/** Skip stored procedures etc. */
while (ptr && is_mysql_sp_end(ptr, buflen - (ptr - data)))
{
ptr = strnchr_esc_mysql(ptr + 1, ';', buflen - (ptr - data) - 1);
}
if (ptr)
{
if (ptr < data + buflen &&
!is_mysql_statement_end(ptr, buflen - (ptr - data)))
{
rval = true;
}
}
}
}
return rval;
}
/**
* @brief Determine the type of a query
*
* @param querybuf GWBUF containing the query
* @param packet_type Integer denoting DB specific enum
* @param non_empty_packet Boolean to be set by this function
*
* @return uint32_t the query type; also the non_empty_packet bool is set
*/
uint32_t
determine_query_type(GWBUF *querybuf, int packet_type, bool non_empty_packet)
{
uint32_t qtype = QUERY_TYPE_UNKNOWN;
if (non_empty_packet)
{
uint8_t my_packet_type = (uint8_t)packet_type;
switch (my_packet_type)
{
case MYSQL_COM_QUIT: /*< 1 QUIT will close all sessions */
case MYSQL_COM_INIT_DB: /*< 2 DDL must go to the master */
case MYSQL_COM_REFRESH: /*< 7 - I guess this is session but not sure */
case MYSQL_COM_DEBUG: /*< 0d all servers dump debug info to stdout */
case MYSQL_COM_PING: /*< 0e all servers are pinged */
case MYSQL_COM_CHANGE_USER: /*< 11 all servers change it accordingly */
case MYSQL_COM_SET_OPTION: /*< 1b send options to all servers */
qtype = QUERY_TYPE_SESSION_WRITE;
break;
case MYSQL_COM_CREATE_DB: /**< 5 DDL must go to the master */
case MYSQL_COM_DROP_DB: /**< 6 DDL must go to the master */
case MYSQL_COM_STMT_CLOSE: /*< free prepared statement */
case MYSQL_COM_STMT_SEND_LONG_DATA: /*< send data to column */
case MYSQL_COM_STMT_RESET: /*< resets the data of a prepared statement */
qtype = QUERY_TYPE_WRITE;
break;
case MYSQL_COM_QUERY:
qtype = qc_get_type_mask(querybuf);
break;
case MYSQL_COM_STMT_PREPARE:
qtype = qc_get_type_mask(querybuf);
qtype |= QUERY_TYPE_PREPARE_STMT;
break;
case MYSQL_COM_STMT_EXECUTE:
/** Parsing is not needed for this type of packet */
qtype = QUERY_TYPE_EXEC_STMT;
break;
case MYSQL_COM_SHUTDOWN: /**< 8 where should shutdown be routed ? */
case MYSQL_COM_STATISTICS: /**< 9 ? */
case MYSQL_COM_PROCESS_INFO: /**< 0a ? */
case MYSQL_COM_CONNECT: /**< 0b ? */
case MYSQL_COM_PROCESS_KILL: /**< 0c ? */
case MYSQL_COM_TIME: /**< 0f should this be run in gateway ? */
case MYSQL_COM_DELAYED_INSERT: /**< 10 ? */
case MYSQL_COM_DAEMON: /**< 1d ? */
default:
break;
} /**< switch by packet type */
}
return qtype;
}

View File

@ -1,5 +1,5 @@
add_library(schemarouter SHARED schemarouter.cc schemarouterinstance.cc schemaroutersession.cc shard_map.cc session_command.cc)
target_link_libraries(schemarouter maxscale-common)
add_library(schemarouter SHARED schemarouter.cc schemarouterinstance.cc schemaroutersession.cc shard_map.cc)
target_link_libraries(schemarouter maxscale-common MySQLCommon)
add_dependencies(schemarouter pcre2)
set_target_properties(schemarouter PROPERTIES VERSION "1.0.0")
install_module(schemarouter core)

View File

@ -13,218 +13,17 @@
#include "schemarouter.hh"
#include <maxscale/protocol/mysql.h>
using namespace schemarouter;
Backend::Backend(SERVER_REF *ref):
m_closed(false),
m_backend(ref),
m_dcb(NULL),
m_mapped(false),
m_num_result_wait(0),
m_state(0)
namespace schemarouter
{
}
Backend::~Backend()
{
ss_dassert(m_closed);
if (!m_closed)
{
close();
}
}
void Backend::close()
{
if (!m_closed)
{
m_closed = true;
if (in_use())
{
CHK_DCB(m_dcb);
/** Clean operation counter in bref and in SERVER */
while (is_waiting_result())
{
clear_state(BREF_WAITING_RESULT);
}
clear_state(BREF_IN_USE);
set_state(BREF_CLOSED);
dcb_close(m_dcb);
/** decrease server current connection counters */
atomic_add(&m_backend->connections, -1);
}
}
else
{
ss_dassert(false);
}
}
bool Backend::execute_session_command()
{
if (is_closed() || !session_command_count())
{
return false;
}
CHK_DCB(m_dcb);
int rc = 0;
SessionCommandList::iterator iter = m_session_commands.begin();
GWBUF *buffer = iter->copy_buffer().release();
switch (iter->get_command())
{
case MYSQL_COM_CHANGE_USER:
/** This makes it possible to handle replies correctly */
gwbuf_set_type(buffer, GWBUF_TYPE_SESCMD);
rc = m_dcb->func.auth(m_dcb, NULL, m_dcb->session, buffer);
break;
case MYSQL_COM_QUERY:
default:
/**
* Mark session command buffer, it triggers writing
* MySQL command to protocol
*/
gwbuf_set_type(buffer, GWBUF_TYPE_SESCMD);
rc = m_dcb->func.write(m_dcb, buffer);
break;
}
return rc == 1;
}
void Backend::add_session_command(GWBUF* buffer, uint64_t sequence)
{
m_session_commands.push_back(SessionCommand(buffer, sequence));
}
uint64_t Backend::complete_session_command()
{
uint64_t rval = m_session_commands.front().get_position();
m_session_commands.pop_front();
return rval;
}
size_t Backend::session_command_count() const
{
return m_session_commands.size();
}
void Backend::clear_state(enum bref_state state)
{
if (state != BREF_WAITING_RESULT)
{
m_state &= ~state;
}
else
{
/** Decrease global operation count */
ss_debug(int prev2 = )atomic_add(&m_backend->server->stats.n_current_ops, -1);
ss_dassert(prev2 > 0);
}
}
void Backend::set_state(enum bref_state state)
{
if (state != BREF_WAITING_RESULT)
{
m_state |= state;
}
else
{
/** Increase global operation count */
ss_debug(int prev2 = )atomic_add(&m_backend->server->stats.n_current_ops, 1);
ss_dassert(prev2 >= 0);
}
}
SERVER_REF* Backend::backend() const
{
return m_backend;
}
bool Backend::connect(MXS_SESSION* session)
{
bool rval = false;
if ((m_dcb = dcb_connect(m_backend->server, session, m_backend->server->protocol)))
{
m_state = BREF_IN_USE;
atomic_add(&m_backend->connections, 1);
rval = true;
}
return rval;
}
DCB* Backend::dcb() const
{
return m_dcb;
}
bool Backend::write(GWBUF* buffer)
{
return m_dcb->func.write(m_dcb, buffer) != 0;
}
void Backend::store_command(GWBUF* buffer)
{
m_pending_cmd.reset(buffer);
}
bool Backend::write_stored_command()
{
bool rval = false;
if (m_pending_cmd.length())
{
rval = write(m_pending_cmd.release());
if (!rval)
{
MXS_ERROR("Routing of pending query failed.");
}
}
return rval;
}
bool Backend::in_use() const
{
return m_state & BREF_IN_USE;
}
bool Backend::is_waiting_result() const
{
return m_num_result_wait > 0;
}
bool Backend::is_query_active() const
{
return m_state & BREF_QUERY_ACTIVE;
}
bool Backend::is_closed() const
{
return m_state & BREF_CLOSED;
}
void Backend::set_mapped(bool value)
void SRBackend::set_mapped(bool value)
{
m_mapped = value;
}
bool Backend::is_mapped() const
bool SRBackend::is_mapped() const
{
return m_mapped;
}
}

View File

@ -29,27 +29,7 @@
#include <maxscale/buffer.hh>
#include <maxscale/pcre2.h>
#include <maxscale/service.h>
#include "session_command.hh"
using std::list;
using std::set;
using std::string;
using std::tr1::shared_ptr;
using maxscale::Buffer;
/**
* The state of the backend server reference
*/
enum bref_state
{
BREF_IN_USE = 0x01,
BREF_WAITING_RESULT = 0x02, /**< for session commands only */
BREF_QUERY_ACTIVE = 0x04, /**< for other queries */
BREF_CLOSED = 0x08,
BREF_DB_MAPPED = 0x10
};
#include <maxscale/backend.hh>
namespace schemarouter
{
@ -65,7 +45,7 @@ struct Config
bool debug; /**< Enable verbose debug messages to clients */
pcre2_code* ignore_regex; /**< Regular expression used to ignore databases */
pcre2_match_data* ignore_match_data; /**< Match data for @c ignore_regex */
set<string> ignored_dbs; /**< Set of ignored databases */
std::set<std::string> ignored_dbs; /**< Set of ignored databases */
Config():
refresh_min_interval(0.0),
@ -110,151 +90,23 @@ struct Stats
};
/**
* Reference to BACKEND.
* Reference to a backend
*
* Owned by router client session.
*/
class Backend
class SRBackend: public mxs::Backend
{
public:
/**
* @brief Create new Backend
*
* @param ref Server reference used by this backend
*/
Backend(SERVER_REF *ref);
~Backend();
SRBackend(SERVER_REF *ref):
mxs::Backend(ref),
m_mapped(false)
{
}
/**
* @brief Execute the next session command in the queue
*
* @return True if the command was executed successfully
*/
bool execute_session_command();
/**
* @brief Add a new session command to the tail of the command queue
*
* @param buffer Session command to add
* @param sequence Sequence identifier of this session command, returned when
* the session command is completed
*/
void add_session_command(GWBUF* buffer, uint64_t sequence);
/**
* @brief Mark the current session command as successfully executed
*
* This should be called when the response to the command is received
*
* @return The sequence identifier for this session command
*/
uint64_t complete_session_command();
/**
* @brief Check if backend has session commands
*
* @return True if backend has session commands
*/
size_t session_command_count() const;
/**
* @brief Clear state
*
* @param state State to clear
*/
void clear_state(enum bref_state state);
/**
* @brief Set state
*
* @param state State to set
*/
void set_state(enum bref_state state);
/**
* @brief Get pointer to server reference
*
* @return Pointer to server reference
*/
SERVER_REF* backend() const;
/**
* @brief Create a new connection
*
* @param session The session to which the connection is linked
*
* @return True if connection was successfully created
*/
bool connect(MXS_SESSION* session);
/**
* @brief Close the backend
*
* This will close all active connections created by the backend.
*/
void close();
/**
* @brief Get a pointer to the internal DCB
*
* @return Pointer to internal DCB
*/
DCB* dcb() const;
/**
* @brief Write data to the backend server
*
* @param buffer Buffer containing the data to write
*
* @return True if data was written successfully
*/
bool write(GWBUF* buffer);
/**
* @brief Store a command
*
* The command is stored and executed once the session can execute
* the next command.
*
* @param buffer Buffer to store
*/
void store_command(GWBUF* buffer);
/**
* @brief Write the stored command to the backend server
*
* @return True if command was written successfully
*/
bool write_stored_command();
/**
* @brief Check if backend is in use
*
* @return True if backend is in use
*/
bool in_use() const;
/**
* @brief Check if backend is waiting for a result
*
* @return True if backend is waiting for a result
*/
bool is_waiting_result() const;
/**
* @brief Check if a query is active
*
* @return True if a query is active
*/
bool is_query_active() const;
/**
* @brief Check if the backend is closed
*
* @return True if the backend is closed
*/
bool is_closed() const;
~SRBackend()
{
}
/**
* @brief Set the mapping state of the backend
@ -271,18 +123,10 @@ public:
bool is_mapped() const;
private:
bool m_closed; /**< True if a connection has been opened and closed */
SERVER_REF* m_backend; /**< Backend server */
DCB* m_dcb; /**< Backend DCB */
bool m_mapped; /**< Whether the backend has been mapped */
int m_num_result_wait; /**< Number of not yet received results */
Buffer m_pending_cmd; /**< Pending commands */
int m_state; /**< State of the backend */
SessionCommandList m_session_commands; /**< List of session commands that are
* to be executed on this backend server */
bool m_mapped; /**< Whether the backend has been mapped */
};
typedef shared_ptr<Backend> SBackend;
typedef list<SBackend> BackendList;
typedef std::tr1::shared_ptr<SRBackend> SSRBackend;
typedef std::list<SSRBackend> SSRBackendList;
}

View File

@ -34,6 +34,9 @@
using std::string;
using std::map;
namespace schemarouter
{
#define DEFAULT_REFRESH_INTERVAL "300"
/**
@ -195,7 +198,7 @@ SchemaRouter* SchemaRouter::create(SERVICE* pService, char** pzOptions)
* connections because all servers are supposed to be operational. It is,
* however, possible that there are less available servers than expected.
*/
bool connect_backend_servers(BackendList& backends, MXS_SESSION* session)
bool connect_backend_servers(SSRBackendList& backends, MXS_SESSION* session)
{
bool succp = false;
int servers_found = 0;
@ -206,7 +209,7 @@ bool connect_backend_servers(BackendList& backends, MXS_SESSION* session)
{
MXS_INFO("Servers and connection counts:");
for (BackendList::iterator it = backends.begin(); it != backends.end(); it++)
for (SSRBackendList::iterator it = backends.begin(); it != backends.end(); it++)
{
SERVER_REF* b = (*it)->backend();
@ -222,7 +225,7 @@ bool connect_backend_servers(BackendList& backends, MXS_SESSION* session)
* Scan server list and connect each of them. None should fail or session
* can't be established.
*/
for (BackendList::iterator it = backends.begin(); it != backends.end(); it++)
for (SSRBackendList::iterator it = backends.begin(); it != backends.end(); it++)
{
SERVER_REF* b = (*it)->backend();
@ -262,7 +265,7 @@ bool connect_backend_servers(BackendList& backends, MXS_SESSION* session)
if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO))
{
for (BackendList::iterator it = backends.begin(); it != backends.end(); it++)
for (SSRBackendList::iterator it = backends.begin(); it != backends.end(); it++)
{
SERVER_REF* b = (*it)->backend();
@ -282,13 +285,13 @@ bool connect_backend_servers(BackendList& backends, MXS_SESSION* session)
SchemaRouterSession* SchemaRouter::newSession(MXS_SESSION* pSession)
{
BackendList backends;
SSRBackendList backends;
for (SERVER_REF *ref = m_service->dbref; ref; ref = ref->next)
{
if (ref->active)
{
backends.push_back(SBackend(new Backend(ref)));
backends.push_back(SSRBackend(new SRBackend(ref)));
}
}
@ -336,8 +339,8 @@ void SchemaRouter::diagnostics(DCB* dcb)
json_t* SchemaRouter::diagnostics_json() const
{
double sescmd_pct = m_stats.n_sescmd != 0 ?
100.0 * ((double)m_stats.n_sescmd / (double)m_stats.n_queries) :
0.0;
100.0 * ((double)m_stats.n_sescmd / (double)m_stats.n_queries) :
0.0;
json_t* rval = json_object();
json_object_set_new(rval, "queries", json_integer(m_stats.n_queries));
@ -364,6 +367,8 @@ uint64_t SchemaRouter::getCapabilities()
return RCAP_TYPE_NONE;
}
}
MXS_BEGIN_DECLS
/**
@ -384,7 +389,7 @@ MXS_MODULE* MXS_CREATE_MODULE()
"A database sharding router for simple sharding",
"V1.0.0",
RCAP_TYPE_CONTIGUOUS_INPUT,
&SchemaRouter::s_object,
&schemarouter::SchemaRouter::s_object,
NULL, /* Process init. */
NULL, /* Process finish. */
NULL, /* Thread init. */

View File

@ -22,9 +22,8 @@
#include "schemaroutersession.hh"
using std::string;
using std::set;
using namespace schemarouter;
namespace schemarouter
{
class SchemaRouterSession;
@ -54,3 +53,5 @@ private:
SPINLOCK m_lock; /*< Lock for the instance data */
Stats m_stats; /*< Statistics for this router */
};
}

View File

@ -12,6 +12,8 @@
*/
#include "schemarouter.hh"
#include "schemaroutersession.hh"
#include "schemarouterinstance.hh"
#include <inttypes.h>
@ -19,10 +21,10 @@
#include <maxscale/query_classifier.h>
#include <maxscale/modutil.h>
#include "schemaroutersession.hh"
#include "schemarouterinstance.hh"
namespace schemarouter
{
bool connect_backend_servers(BackendList& backends, MXS_SESSION* session);
bool connect_backend_servers(SSRBackendList& backends, MXS_SESSION* session);
enum route_target get_shard_route_target(uint32_t qtype);
bool change_current_db(string& dest, Shard& shard, GWBUF* buf);
@ -30,7 +32,8 @@ bool extract_database(GWBUF* buf, char* str);
bool detect_show_shards(GWBUF* query);
void write_error_to_client(DCB* dcb, int errnum, const char* mysqlstate, const char* errmsg);
SchemaRouterSession::SchemaRouterSession(MXS_SESSION* session, SchemaRouter* router, BackendList& backends):
SchemaRouterSession::SchemaRouterSession(MXS_SESSION* session, SchemaRouter* router,
SSRBackendList& backends):
mxs::RouterSession(session),
m_closed(false),
m_client(session->client_dcb),
@ -46,28 +49,23 @@ SchemaRouterSession::SchemaRouterSession(MXS_SESSION* session, SchemaRouter* rou
{
char db[MYSQL_DATABASE_MAXLEN + 1] = "";
MySQLProtocol* protocol = (MySQLProtocol*)session->client_dcb->protocol;
MYSQL_session* data = (MYSQL_session*)session->client_dcb->data;
bool using_db = false;
bool have_db = false;
const char* current_db = mxs_mysql_get_current_db(session);
/* To enable connecting directly to a sharded database we first need
* to disable it for the client DCB's protocol so that we can connect to them*/
if (protocol->client_capabilities & GW_MYSQL_CAPABILITIES_CONNECT_WITH_DB &&
(have_db = strnlen(data->db, MYSQL_DATABASE_MAXLEN) > 0))
(have_db = *current_db))
{
protocol->client_capabilities &= ~GW_MYSQL_CAPABILITIES_CONNECT_WITH_DB;
strcpy(db, data->db);
*data->db = 0;
strcpy(db, current_db);
mxs_mysql_set_current_db(session, "");
using_db = true;
MXS_INFO("Client logging in directly to a database '%s', "
"postponing until databases have been mapped.", db);
}
if (!have_db)
{
MXS_INFO("Client'%s' connecting with empty database.", data->user);
}
if (using_db)
{
m_state |= INIT_USE_DB;
@ -97,11 +95,15 @@ void SchemaRouterSession::close()
{
m_closed = true;
for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
for (SSRBackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{
SSRBackend& bref = *it;
/** The backends are closed here to trigger the shutdown of
* the connected DCBs */
(*it)->close();
if (bref->in_use())
{
bref->close();
}
}
spinlock_acquire(&m_router->m_lock);
@ -230,7 +232,7 @@ SERVER* SchemaRouterSession::resolve_query_target(GWBUF* pPacket,
if (TARGET_IS_ANY(route_target))
{
for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
for (SSRBackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{
SERVER *server = (*it)->backend()->server;
if (SERVER_IS_RUNNING(server))
@ -419,7 +421,7 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket)
get_shard_dcb(&target_dcb, target->unique_name))
{
/** We know where to route this query */
SBackend bref = get_bref_from_dcb(target_dcb);
SSRBackend bref = get_bref_from_dcb(target_dcb);
if (op == QUERY_OP_LOAD)
{
@ -436,23 +438,22 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket)
pPacket = NULL;
ret = 1;
}
else if ((ret = target_dcb->func.write(target_dcb, gwbuf_clone(pPacket))) == 1)
else if (bref->write(pPacket))
{
/** Add one query response waiter to backend reference */
bref->set_state(BREF_QUERY_ACTIVE);
bref->set_state(BREF_WAITING_RESULT);
atomic_add(&m_router->m_stats.n_queries, 1);
ret = 1;
}
else
{
MXS_ERROR("Routing query failed.");
gwbuf_free(pPacket);
}
}
gwbuf_free(pPacket);
return ret;
}
void SchemaRouterSession::handle_mapping_reply(SBackend& bref, GWBUF** pPacket)
void SchemaRouterSession::handle_mapping_reply(SSRBackend& bref, GWBUF** pPacket)
{
int rc = inspect_mapping_states(bref, pPacket);
@ -486,7 +487,7 @@ void SchemaRouterSession::handle_mapping_reply(SBackend& bref, GWBUF** pPacket)
}
}
void SchemaRouterSession::process_sescmd_response(SBackend& bref, GWBUF** ppPacket)
void SchemaRouterSession::process_sescmd_response(SSRBackend& bref, GWBUF** ppPacket)
{
if (bref->session_command_count())
{
@ -508,23 +509,12 @@ void SchemaRouterSession::process_sescmd_response(SBackend& bref, GWBUF** ppPack
*ppPacket = NULL;
}
}
if (*ppPacket)
{
bref->clear_state(BREF_WAITING_RESULT);
}
}
else if (bref->is_query_active())
{
bref->clear_state(BREF_QUERY_ACTIVE);
/** Set response status as replied */
bref->clear_state(BREF_WAITING_RESULT);
}
}
void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb)
{
SBackend bref = get_bref_from_dcb(pDcb);
SSRBackend bref = get_bref_from_dcb(pDcb);
if (m_closed || bref.get() == NULL) // The bref should always be valid
{
@ -567,6 +557,10 @@ void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb)
{
process_sescmd_response(bref, &pPacket);
ss_dassert(bref->is_waiting_result());
/** Set response status as replied */
bref->ack_write();
if (pPacket)
{
MXS_SESSION_ROUTE_REPLY(pDcb->session, pPacket);
@ -581,8 +575,6 @@ void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb)
else if (bref->write_stored_command())
{
atomic_add(&m_router->m_stats.n_queries, 1);
bref->set_state(BREF_QUERY_ACTIVE);
bref->set_state(BREF_WAITING_RESULT);
}
}
@ -596,7 +588,7 @@ void SchemaRouterSession::handleError(GWBUF* pMessage,
{
ss_dassert(pProblem->dcb_role == DCB_ROLE_BACKEND_HANDLER);
CHK_DCB(pProblem);
SBackend bref = get_bref_from_dcb(pProblem);
SSRBackend bref = get_bref_from_dcb(pProblem);
if (bref.get() == NULL) // Should never happen
{
@ -728,13 +720,13 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command)
/** Increment the session command count */
++m_sent_sescmd;
for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
for (SSRBackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{
if ((*it)->in_use())
{
GWBUF *buffer = gwbuf_clone(querybuf);
(*it)->add_session_command(buffer, m_sent_sescmd);
(*it)->append_session_command(buffer, m_sent_sescmd);
if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO))
{
@ -746,19 +738,6 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command)
if ((*it)->session_command_count() == 1)
{
/** Only one command, execute it */
switch (command)
{
/** These types of commands don't generate responses */
case MYSQL_COM_QUIT:
case MYSQL_COM_STMT_CLOSE:
break;
default:
(*it)->set_state(BREF_WAITING_RESULT);
break;
}
if ((*it)->execute_session_command())
{
succp = true;
@ -783,6 +762,7 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command)
}
}
gwbuf_free(querybuf);
return succp;
}
@ -794,7 +774,7 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command)
*/
bool SchemaRouterSession::have_servers()
{
for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
for (SSRBackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{
if ((*it)->in_use() && !(*it)->is_closed())
{
@ -813,11 +793,11 @@ bool SchemaRouterSession::have_servers()
*
* @return backend reference pointer if succeed or NULL
*/
SBackend SchemaRouterSession::get_bref_from_dcb(DCB* dcb)
SSRBackend SchemaRouterSession::get_bref_from_dcb(DCB* dcb)
{
CHK_DCB(dcb);
for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
for (SSRBackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{
if ((*it)->dcb() == dcb)
{
@ -827,7 +807,7 @@ SBackend SchemaRouterSession::get_bref_from_dcb(DCB* dcb)
// This should not happen
ss_dassert(false);
return SBackend(reinterpret_cast<Backend*>(NULL));
return SSRBackend(reinterpret_cast<SRBackend*>(NULL));
}
/**
@ -1024,13 +1004,13 @@ void SchemaRouterSession::route_queued_query()
* @param router_cli_ses Router client session
* @return 1 if mapping is done, 0 if it is still ongoing and -1 on error
*/
int SchemaRouterSession::inspect_mapping_states(SBackend& bref,
int SchemaRouterSession::inspect_mapping_states(SSRBackend& bref,
GWBUF** wbuf)
{
bool mapped = true;
GWBUF* writebuf = *wbuf;
for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
for (SSRBackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{
if (bref->dcb() == (*it)->dcb() && !(*it)->is_mapped())
{
@ -1248,6 +1228,8 @@ bool SchemaRouterSession::ignore_duplicate_database(const char* data)
{
rval = true;
}
pcre2_match_data_free(match_data);
}
return rval;
@ -1264,7 +1246,7 @@ bool SchemaRouterSession::ignore_duplicate_database(const char* data)
* @return 1 if a complete response was received, 0 if a partial response was received
* and -1 if a database was found on more than one server.
*/
enum showdb_response SchemaRouterSession::parse_mapping_response(SBackend& bref, GWBUF** buffer)
enum showdb_response SchemaRouterSession::parse_mapping_response(SSRBackend& bref, GWBUF** buffer)
{
unsigned char* ptr;
SERVER* target = bref->backend()->server;
@ -1383,7 +1365,7 @@ enum showdb_response SchemaRouterSession::parse_mapping_response(SBackend& bref,
void SchemaRouterSession::query_databases()
{
for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
for (SSRBackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{
(*it)->set_mapped(false);
}
@ -1394,7 +1376,7 @@ void SchemaRouterSession::query_databases()
GWBUF *buffer = modutil_create_query("SHOW DATABASES");
gwbuf_set_type(buffer, GWBUF_TYPE_COLLECT_RESULT);
for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
for (SSRBackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{
if ((*it)->in_use() && !(*it)->is_closed() &
SERVER_IS_RUNNING((*it)->backend()->server))
@ -1502,7 +1484,7 @@ SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype)
}
else if (buffer->hint && buffer->hint->type == HINT_ROUTE_TO_NAMED_SERVER)
{
for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
for (SSRBackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{
char *srvnm = (*it)->backend()->server->unique_name;
@ -1551,7 +1533,7 @@ bool SchemaRouterSession::get_shard_dcb(DCB** p_dcb, char* name)
bool succp = false;
ss_dassert(p_dcb != NULL && *(p_dcb) == NULL);
for (BackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
for (SSRBackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
{
SERVER_REF* b = (*it)->backend();
/**
@ -1666,3 +1648,5 @@ bool SchemaRouterSession::send_databases()
return rval;
}
}

View File

@ -19,15 +19,12 @@
#include <maxscale/protocol/mysql.h>
#include <maxscale/router.hh>
#include <maxscale/session_command.hh>
#include "shard_map.hh"
#include "session_command.hh"
using std::string;
using std::list;
using namespace schemarouter;
namespace schemarouter
{
/**
* Bitmask values for the router session's initialization. These values are used
* to prevent responses from internal commands being forwarded to the client.
@ -80,7 +77,7 @@ class SchemaRouterSession: public mxs::RouterSession
{
public:
SchemaRouterSession(MXS_SESSION* session, SchemaRouter* router, BackendList& backends);
SchemaRouterSession(MXS_SESSION* session, SchemaRouter* router, SSRBackendList& backends);
/**
* The RouterSession instance will be deleted when a client session
@ -128,7 +125,7 @@ private:
/** Helper functions */
SERVER* get_shard_target(GWBUF* buffer, uint32_t qtype);
SBackend get_bref_from_dcb(DCB* dcb);
SSRBackend get_bref_from_dcb(DCB* dcb);
bool get_shard_dcb(DCB** dcb, char* name);
bool have_servers();
bool handle_default_db();
@ -136,7 +133,7 @@ private:
/** Routing functions */
bool route_session_write(GWBUF* querybuf, uint8_t command);
void process_sescmd_response(SBackend& bref, GWBUF** ppPacket);
void process_sescmd_response(SSRBackend& bref, GWBUF** ppPacket);
SERVER* resolve_query_target(GWBUF* pPacket, uint32_t type, uint8_t command,
enum route_target& route_target);
@ -144,26 +141,27 @@ private:
bool send_databases();
bool send_shards();
void query_databases();
int inspect_mapping_states(SBackend& bref, GWBUF** wbuf);
enum showdb_response parse_mapping_response(SBackend& bref, GWBUF** buffer);
int inspect_mapping_states(SSRBackend& bref, GWBUF** wbuf);
enum showdb_response parse_mapping_response(SSRBackend& bref, GWBUF** buffer);
void route_queued_query();
void synchronize_shards();
void handle_mapping_reply(SBackend& bref, GWBUF** pPacket);
void handle_mapping_reply(SSRBackend& bref, GWBUF** pPacket);
/** Member variables */
bool m_closed; /**< True if session closed */
DCB* m_client; /**< The client DCB */
MYSQL_session* m_mysql_session; /**< Session client data (username, password, SHA1). */
BackendList m_backends; /**< Backend references */
Config* m_config; /**< Pointer to router config */
SchemaRouter* m_router; /**< The router instance */
Shard m_shard; /**< Database to server mapping */
string m_connect_db; /**< Database the user was trying to connect to */
string m_current_db; /**< Current active database */
int m_state; /**< Initialization state bitmask */
list<Buffer> m_queue; /**< Query that was received before the session was ready */
Stats m_stats; /**< Statistics for this router */
uint64_t m_sent_sescmd; /**< The latest session command being executed */
uint64_t m_replied_sescmd; /**< The last session command reply that was sent to the client */
SERVER* m_load_target; /**< Target for LOAD DATA LOCAL INFILE */
bool m_closed; /**< True if session closed */
DCB* m_client; /**< The client DCB */
MYSQL_session* m_mysql_session; /**< Session client data (username, password, SHA1). */
SSRBackendList m_backends; /**< Backend references */
Config* m_config; /**< Pointer to router config */
SchemaRouter* m_router; /**< The router instance */
Shard m_shard; /**< Database to server mapping */
std::string m_connect_db; /**< Database the user was trying to connect to */
std::string m_current_db; /**< Current active database */
int m_state; /**< Initialization state bitmask */
std::list<mxs::Buffer> m_queue; /**< Query that was received before the session was ready */
Stats m_stats; /**< Statistics for this router */
uint64_t m_sent_sescmd; /**< The latest session command being executed */
uint64_t m_replied_sescmd; /**< The last session command reply that was sent to the client */
SERVER* m_load_target; /**< Target for LOAD DATA LOCAL INFILE */
};
}

View File

@ -1,76 +0,0 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "session_command.hh"
#include <maxscale/modutil.h>
#include <maxscale/protocol/mysql.h>
void SessionCommand::mark_reply_received()
{
m_reply_sent = true;
}
bool SessionCommand::is_reply_received() const
{
return m_reply_sent;
}
uint8_t SessionCommand::get_command() const
{
return m_command;
}
uint64_t SessionCommand::get_position() const
{
return m_pos;
}
Buffer SessionCommand::copy_buffer() const
{
return m_buffer;
}
SessionCommand::SessionCommand(GWBUF *buffer, uint64_t id):
m_buffer(buffer),
m_command(0),
m_pos(id),
m_reply_sent(false)
{
if (buffer)
{
gwbuf_copy_data(buffer, MYSQL_HEADER_LEN, 1, &m_command);
}
}
SessionCommand::~SessionCommand()
{
}
std::string SessionCommand::to_string()
{
std::string str;
char *sql;
int sql_len;
/** TODO: Create C++ versions of modutil functions */
GWBUF *buf = m_buffer.release();
if (modutil_extract_SQL(buf, &sql, &sql_len))
{
str.append(sql, sql_len);
}
m_buffer.reset(buf);
return str;
}

View File

@ -1,82 +0,0 @@
#pragma once
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <list>
#include <string>
#include <maxscale/buffer.hh>
using namespace maxscale;
class SessionCommand;
typedef std::list<SessionCommand> SessionCommandList;
class SessionCommand
{
public:
/**
* @brief Mark reply as received
*/
void mark_reply_received();
/**
* @brief Check if the session command has received a reply
* @return True if the reply is already received
*/
bool is_reply_received() const;
/**
* @brief Get the command type of the session command
*
* @return The type of the command
*/
uint8_t get_command() const;
/**
* @brief Get the position of this session command
*
* @return The position of the session command
*/
uint64_t get_position() const;
/**
* @brief Creates a copy of the internal buffer
* @return A copy of the internal buffer
*/
Buffer copy_buffer() const;
/**
* @brief Create a new session command
*
* @param buffer The buffer containing the command. Note that the ownership
* of @c buffer is transferred to this object.
* @param id A unique position identifier used to track replies
*/
SessionCommand(GWBUF *buffer, uint64_t id);
~SessionCommand();
/**
* @brief Debug function for printing session commands
*
* @return String representation of the object
*/
std::string to_string();
private:
Buffer m_buffer; /**< The buffer containing the command */
uint8_t m_command; /**< The command being executed */
uint64_t m_pos; /**< Unique position identifier */
bool m_reply_sent; /**< Whether the session command reply has been sent */
};