Develop merge

Develop merge
This commit is contained in:
MassimilianoPinto
2017-06-29 15:34:22 +02:00
parent 4993fd683c
commit cb57e10761
122 changed files with 16937 additions and 1627 deletions

View File

@ -911,6 +911,15 @@ bool is_create_table_statement(AVRO_INSTANCE *router, char* ptr, size_t len)
return rc > 0;
}
bool is_create_like_statement(const char* ptr, size_t len)
{
char sql[len + 1];
memcpy(sql, ptr, len);
sql[len] = '\0';
// This is not pretty but it should work
return strcasestr(sql, " like ") || strcasestr(sql, "(like ");
}
/**
* @brief Detection of table alteration statements
@ -1020,7 +1029,16 @@ void handle_query_event(AVRO_INSTANCE *router, REP_HEADER *hdr, int *pending_tra
if (is_create_table_statement(router, sql, len))
{
TABLE_CREATE *created = table_create_alloc(sql, db);
TABLE_CREATE *created = NULL;
if (is_create_like_statement(sql, len))
{
created = table_create_copy(router, sql, len, db);
}
else
{
created = table_create_alloc(sql, db);
}
if (created && !save_and_replace_table_create(router, created))
{
@ -1053,7 +1071,6 @@ void handle_query_event(AVRO_INSTANCE *router, REP_HEADER *hdr, int *pending_tra
strcat(full_ident, ident);
TABLE_CREATE *created = hashtable_fetch(router->created_tables, full_ident);
ss_dassert(created);
if (created)
{

View File

@ -131,13 +131,13 @@ bool handle_table_map_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr
if (old)
{
router->active_maps[old->id % sizeof(router->active_maps)] = NULL;
router->active_maps[old->id % MAX_MAPPED_TABLES] = NULL;
}
hashtable_delete(router->table_maps, table_ident);
hashtable_add(router->table_maps, (void*) table_ident, map);
hashtable_add(router->open_tables, table_ident, avro_table);
save_avro_schema(router->avrodir, json_schema, map);
router->active_maps[map->id % sizeof(router->active_maps)] = map;
router->active_maps[map->id % MAX_MAPPED_TABLES] = map;
MXS_DEBUG("Table %s mapped to %lu", table_ident, map->id);
rval = true;
@ -164,10 +164,10 @@ bool handle_table_map_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr
}
else
{
ss_dassert(router->active_maps[old->id % sizeof(router->active_maps)] == old);
router->active_maps[old->id % sizeof(router->active_maps)] = NULL;
ss_dassert(router->active_maps[old->id % MAX_MAPPED_TABLES] == old);
router->active_maps[old->id % MAX_MAPPED_TABLES] = NULL;
table_map_remap(ptr, ev_len, old);
router->active_maps[old->id % sizeof(router->active_maps)] = old;
router->active_maps[old->id % MAX_MAPPED_TABLES] = old;
MXS_DEBUG("Table %s re-mapped to %lu", table_ident, old->id);
/** No changes in the schema */
rval = true;
@ -294,7 +294,7 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
/** There should always be a table map event prior to a row event.
* TODO: Make the active_maps dynamic */
TABLE_MAP *map = router->active_maps[table_id % sizeof(router->active_maps)];
TABLE_MAP *map = router->active_maps[table_id % MAX_MAPPED_TABLES];
if (map)
{

View File

@ -771,6 +771,227 @@ TABLE_CREATE* table_create_alloc(const char* sql, const char* event_db)
return rval;
}
static const char* TOK_CREATE[] =
{
"CREATE",
NULL
};
static const char* TOK_TABLE[] =
{
"TABLE",
NULL
};
static const char* TOK_GROUP_REPLACE[] =
{
"OR",
"REPLACE",
NULL
};
static const char* TOK_GROUP_EXISTS[] =
{
"IF",
"NOT",
"EXISTS",
NULL
};
/**
* Read one token (i.e. SQL keyword)
*/
static const char* get_token(const char* ptr, const char* end, char* dest)
{
while (ptr < end && isspace(*ptr))
{
ptr++;
}
const char* start = ptr;
while (ptr < end && !isspace(*ptr))
{
ptr++;
}
size_t len = ptr - start;
memcpy(dest, start, len);
dest[len] = '\0';
return ptr;
}
/**
* Consume one token
*/
static bool chomp_one_token(const char* expected, const char** ptr, const char* end, char* buf)
{
bool rval = false;
const char* next = get_token(*ptr, end, buf);
if (strcasecmp(buf, expected) == 0)
{
rval = true;
*ptr = next;
}
return rval;
}
/**
* Consume all tokens in a group
*/
static bool chomp_tokens(const char** tokens, const char** ptr, const char* end, char* buf)
{
bool next = true;
bool rval = false;
do
{
next = false;
for (int i = 0; tokens[i]; i++)
{
if (chomp_one_token(tokens[i], ptr, end, buf))
{
rval = true;
next = true;
break;
}
}
}
while (next);
return rval;
}
/**
* Remove any extra characters from a string
*/
static void remove_extras(char* str)
{
char* end = strchr(str, '\0') - 1;
while (end > str && (*end == '`' || *end == ')' || *end == '('))
{
*end-- = '\0';
}
char* start = str;
while (start < end && (*start == '`' || *start == ')' || *start == '('))
{
start++;
}
size_t len = strlen(start);
memmove(str, start, len);
str[len] = '\0';
ss_dassert(strlen(str) == len);
}
/**
* Extract both tables from a `CREATE TABLE t1 LIKE t2` statement
*/
static bool extract_create_like_identifier(const char* sql, size_t len, char* target, char* source)
{
bool rval = false;
char buffer[len + 1];
buffer[0] = '\0';
const char* ptr = sql;
const char* end = ptr + sizeof(buffer);
if (chomp_tokens(TOK_CREATE, &ptr, end, buffer))
{
chomp_tokens(TOK_GROUP_REPLACE, &ptr, end, buffer);
if (chomp_tokens(TOK_TABLE, &ptr, end, buffer))
{
chomp_tokens(TOK_GROUP_EXISTS, &ptr, end, buffer);
// Read the target table name
ptr = get_token(ptr, end, buffer);
strcpy(target, buffer);
// Skip the LIKE token
ptr = get_token(ptr, end, buffer);
// Read the source table name
ptr = get_token(ptr, end, buffer);
remove_extras(buffer);
strcpy(source, buffer);
rval = true;
}
}
return rval;
}
/**
* Create a table from another table
*/
TABLE_CREATE* table_create_copy(AVRO_INSTANCE *router, const char* sql, size_t len, const char* db)
{
TABLE_CREATE* rval = NULL;
char target[MYSQL_TABLE_MAXLEN + 1] = "";
char source[MYSQL_TABLE_MAXLEN + 1] = "";
if (extract_create_like_identifier(sql, len, target, source))
{
char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2] = "";
if (strchr(source, '.') == NULL)
{
strcpy(table_ident, db);
strcat(table_ident, ".");
}
strcat(table_ident, source);
TABLE_CREATE *old = hashtable_fetch(router->created_tables, table_ident);
if (old)
{
int n = old->columns;
char** names = MXS_MALLOC(sizeof(char*) * n);
char** types = MXS_MALLOC(sizeof(char*) * n);
int* lengths = MXS_MALLOC(sizeof(int) * n);
rval = MXS_MALLOC(sizeof(TABLE_CREATE));
MXS_ABORT_IF_FALSE(names && types && lengths && rval);
for (uint64_t i = 0; i < old->columns; i++)
{
names[i] = MXS_STRDUP_A(old->column_names[i]);
types[i] = MXS_STRDUP_A(old->column_types[i]);
lengths[i] = old->column_lengths[i];
}
rval->version = 1;
rval->was_used = false;
rval->column_names = names;
rval->column_lengths = lengths;
rval->column_types = types;
rval->columns = old->columns;
rval->database = MXS_STRDUP_A(db);
char* table = strchr(target, '.');
table = table ? table + 1 : target;
rval->table = MXS_STRDUP_A(table);
}
else
{
MXS_ERROR("Could not find table '%s' that '%s' is being created from: %.*s",
table_ident, target, (int)len, sql);
}
}
return rval;
}
/**
* Free a TABLE_CREATE structure
* @param value Value to free

View File

@ -310,6 +310,7 @@ extern void read_table_info(uint8_t *ptr, uint8_t post_header_len, uint64_t *tab
extern TABLE_MAP *table_map_alloc(uint8_t *ptr, uint8_t hdr_len, TABLE_CREATE* create);
extern void table_map_free(TABLE_MAP *map);
extern TABLE_CREATE* table_create_alloc(const char* sql, const char* db);
extern TABLE_CREATE* table_create_copy(AVRO_INSTANCE *router, const char* sql, size_t len, const char* db);
extern void table_create_free(TABLE_CREATE* value);
extern bool table_create_save(TABLE_CREATE *create, const char *filename);
extern bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end);

View File

@ -72,6 +72,7 @@
#include <maxscale/alloc.h>
#include <inttypes.h>
#include <maxscale/secrets.h>
#include <maxscale/encryption.h>
/**
* AES_CTR handling
@ -3548,7 +3549,6 @@ static GWBUF *blr_aes_crypt(ROUTER_INSTANCE *router,
uint8_t *iv,
int action)
{
EVP_CIPHER_CTX ctx;
uint8_t *key = router->encryption.key_value;
unsigned int key_len = router->encryption.key_len;
int outlen;
@ -3571,10 +3571,10 @@ static GWBUF *blr_aes_crypt(ROUTER_INSTANCE *router,
out_ptr = GWBUF_DATA(outbuf);
EVP_CIPHER_CTX_init(&ctx);
EVP_CIPHER_CTX *ctx = mxs_evp_cipher_ctx_alloc();
/* Set the encryption algorithm accordingly to key_len and encryption mode */
if (!EVP_CipherInit_ex(&ctx,
if (!EVP_CipherInit_ex(ctx,
ciphers[router->encryption.encryption_algorithm](router->encryption.key_len),
NULL,
key,
@ -3583,23 +3583,23 @@ static GWBUF *blr_aes_crypt(ROUTER_INSTANCE *router,
{
MXS_ERROR("Error in EVP_CipherInit_ex for algo %d",
router->encryption.encryption_algorithm);
EVP_CIPHER_CTX_cleanup(&ctx);
mxs_evp_cipher_ctx_free(ctx);
MXS_FREE(outbuf);
return NULL;
}
/* Set no padding */
EVP_CIPHER_CTX_set_padding(&ctx, 0);
EVP_CIPHER_CTX_set_padding(ctx, 0);
/* Encryt/Decrypt the input data */
if (!EVP_CipherUpdate(&ctx,
if (!EVP_CipherUpdate(ctx,
out_ptr + 4,
&outlen,
buffer,
size))
{
MXS_ERROR("Error in EVP_CipherUpdate");
EVP_CIPHER_CTX_cleanup(&ctx);
mxs_evp_cipher_ctx_free(ctx);
MXS_FREE(outbuf);
return NULL;
}
@ -3610,7 +3610,7 @@ static GWBUF *blr_aes_crypt(ROUTER_INSTANCE *router,
if (router->encryption.encryption_algorithm != BLR_AES_CBC)
{
/* Call Final_ex */
if (!EVP_CipherFinal_ex(&ctx,
if (!EVP_CipherFinal_ex(ctx,
(out_ptr + 4 + outlen),
(int*)&flen))
{
@ -3624,12 +3624,12 @@ static GWBUF *blr_aes_crypt(ROUTER_INSTANCE *router,
* If some bytes (ctx.buf_len) are still available in ctx.buf
* handle them with ECB and XOR
*/
if (ctx.buf_len)
if (size - outlen > 0)
{
if (!blr_aes_create_tail_for_cbc(out_ptr + 4 + outlen,
ctx.buf,
ctx.buf_len,
ctx.oiv,
mxs_evp_cipher_ctx_buf(ctx),
size - outlen,
mxs_evp_cipher_ctx_oiv(ctx),
router->encryption.key_value,
router->encryption.key_len))
{
@ -3645,7 +3645,7 @@ static GWBUF *blr_aes_crypt(ROUTER_INSTANCE *router,
outbuf = NULL;
}
EVP_CIPHER_CTX_cleanup(&ctx);
mxs_evp_cipher_ctx_free(ctx);
return outbuf;
}
@ -3829,14 +3829,13 @@ static int blr_aes_create_tail_for_cbc(uint8_t *output,
uint8_t *key,
unsigned int key_len)
{
EVP_CIPHER_CTX t_ctx;
uint8_t mask[AES_BLOCK_SIZE];
int mlen = 0;
EVP_CIPHER_CTX_init(&t_ctx);
EVP_CIPHER_CTX* t_ctx = mxs_evp_cipher_ctx_alloc();
/* Initialise with AES_ECB and NULL iv */
if (!EVP_CipherInit_ex(&t_ctx,
if (!EVP_CipherInit_ex(t_ctx,
ciphers[BLR_AES_ECB](key_len),
NULL,
key,
@ -3844,22 +3843,22 @@ static int blr_aes_create_tail_for_cbc(uint8_t *output,
BINLOG_FLAG_ENCRYPT))
{
MXS_ERROR("Error in EVP_CipherInit_ex CBC for last block (ECB)");
EVP_CIPHER_CTX_cleanup(&t_ctx);
mxs_evp_cipher_ctx_free(t_ctx);
return 0;
}
/* Set no padding */
EVP_CIPHER_CTX_set_padding(&t_ctx, 0);
EVP_CIPHER_CTX_set_padding(t_ctx, 0);
/* Do the enc/dec of the IV (the one from previous stage) */
if (!EVP_CipherUpdate(&t_ctx,
if (!EVP_CipherUpdate(t_ctx,
mask,
&mlen,
iv,
sizeof(mask)))
{
MXS_ERROR("Error in EVP_CipherUpdate ECB");
EVP_CIPHER_CTX_cleanup(&t_ctx);
mxs_evp_cipher_ctx_free(t_ctx);
return 0;
}
@ -3874,7 +3873,7 @@ static int blr_aes_create_tail_for_cbc(uint8_t *output,
output[i] = input[i] ^ mask[i];
}
EVP_CIPHER_CTX_cleanup(&t_ctx);
mxs_evp_cipher_ctx_free(t_ctx);
return 1;
}

View File

@ -1,5 +1,6 @@
add_library(readwritesplit SHARED
readwritesplit.cc
rwsplitsession.cc
rwsplit_mysql.cc
rwsplit_route_stmt.cc
rwsplit_select_backends.cc

File diff suppressed because it is too large Load Diff

View File

@ -13,7 +13,7 @@
*/
/**
* @file router.h - The read write split router module heder file
* @file Readwritesplit common header
*/
#define MXS_MODULE_NAME "readwritesplit"
@ -27,10 +27,12 @@
#include <maxscale/dcb.h>
#include <maxscale/hashtable.h>
#include <maxscale/log_manager.h>
#include <maxscale/router.h>
#include <maxscale/service.h>
#include <maxscale/backend.hh>
#include <maxscale/session_command.hh>
#include <maxscale/protocol/mysql.h>
enum backend_type_t
{
@ -77,6 +79,33 @@ enum failure_mode
RW_ERROR_ON_WRITE /**< Don't close the connection but send an error for writes */
};
/**
* Enum values for router parameters
*/
static const MXS_ENUM_VALUE use_sql_variables_in_values[] =
{
{"all", TYPE_ALL},
{"master", TYPE_MASTER},
{NULL}
};
static const MXS_ENUM_VALUE slave_selection_criteria_values[] =
{
{"LEAST_GLOBAL_CONNECTIONS", LEAST_GLOBAL_CONNECTIONS},
{"LEAST_ROUTER_CONNECTIONS", LEAST_ROUTER_CONNECTIONS},
{"LEAST_BEHIND_MASTER", LEAST_BEHIND_MASTER},
{"LEAST_CURRENT_OPERATIONS", LEAST_CURRENT_OPERATIONS},
{NULL}
};
static const MXS_ENUM_VALUE master_failure_mode_values[] =
{
{"fail_instantly", RW_FAIL_INSTANTLY},
{"fail_on_write", RW_FAIL_ON_WRITE},
{"error_on_write", RW_ERROR_ON_WRITE},
{NULL}
};
/** States of a LOAD DATA LOCAL INFILE */
enum ld_state
{
@ -116,129 +145,65 @@ enum ld_state
#define BACKEND_TYPE(b) (SERVER_IS_MASTER((b)->backend_server) ? BE_MASTER : \
(SERVER_IS_SLAVE((b)->backend_server) ? BE_SLAVE : BE_UNDEFINED));
/** Reply state change debug logging */
#define LOG_RS(a, b) MXS_DEBUG("[%s]:%d %s -> %s", (a)->server()->name, \
(a)->server()->port, rstostr((a)->get_reply_state()), rstostr(b));
struct ROUTER_INSTANCE;
struct ROUTER_CLIENT_SES;
/** Enum for tracking client reply state */
enum reply_state_t
struct Config
{
REPLY_STATE_START, /**< Query sent to backend */
REPLY_STATE_DONE, /**< Complete reply received */
REPLY_STATE_RSET_COLDEF, /**< Resultset response, waiting for column definitions */
REPLY_STATE_RSET_ROWS /**< Resultset response, waiting for rows */
};
Config(MXS_CONFIG_PARAMETER* params):
slave_selection_criteria(
(select_criteria_t)config_get_enum(
params, "slave_selection_criteria", slave_selection_criteria_values)),
use_sql_variables_in(
(mxs_target_t)config_get_enum(
params, "use_sql_variables_in", use_sql_variables_in_values)),
master_failure_mode(
(enum failure_mode)config_get_enum(
params, "master_failure_mode", master_failure_mode_values)),
max_sescmd_history(config_get_integer(params, "max_sescmd_history")),
disable_sescmd_history(config_get_bool(params, "disable_sescmd_history")),
master_accept_reads(config_get_bool(params, "master_accept_reads")),
strict_multi_stmt(config_get_bool(params, "strict_multi_stmt")),
retry_failed_reads(config_get_bool(params, "retry_failed_reads")),
connection_keepalive(config_get_integer(params, "connection_keepalive")),
max_slave_replication_lag(config_get_integer(params, "max_slave_replication_lag")),
rw_max_slave_conn_percent(0),
max_slave_connections(0)
{
}
struct rwsplit_config_t
{
int rw_max_slave_conn_percent; /**< Maximum percentage of slaves
* to use for each connection*/
int max_slave_connections; /**< Maximum number of slaves for each connection*/
select_criteria_t slave_selection_criteria; /**< The slave selection criteria */
select_criteria_t slave_selection_criteria; /**< The slave selection criteria */
mxs_target_t use_sql_variables_in; /**< Whether to send user variables to
* master or all nodes */
failure_mode master_failure_mode; /**< Master server failure handling mode */
uint64_t max_sescmd_history; /**< Maximum amount of session commands to store */
bool disable_sescmd_history; /**< Disable session command history */
bool master_accept_reads; /**< Use master for reads */
bool strict_multi_stmt; /**< Force non-multistatement queries to be routed to
* the master after a multistatement query. */
bool retry_failed_reads; /**< Retry failed reads on other servers */
int connection_keepalive; /**< Send pings to servers that have been idle
* for too long */
int max_slave_replication_lag; /**< Maximum replication lag */
mxs_target_t use_sql_variables_in; /**< Whether to send user variables
* to master or all nodes */
uint64_t max_sescmd_history; /**< Maximum amount of session commands to store */
bool disable_sescmd_history; /**< Disable session command history */
bool master_accept_reads; /**< Use master for reads */
bool strict_multi_stmt; /**< Force non-multistatement queries to be routed
* to the master after a multistatement query. */
enum failure_mode master_failure_mode; /**< Master server failure handling mode.
* @see enum failure_mode */
bool retry_failed_reads; /**< Retry failed reads on other servers */
int connection_keepalive; /**< Send pings to servers that have
* been idle for too long */
};
int rw_max_slave_conn_percent; /**< Maximum percentage of slaves to use for
* each connection*/
int max_slave_connections; /**< Maximum number of slaves for each connection*/
class RWBackend: public mxs::Backend
{
RWBackend(const RWBackend&);
RWBackend& operator=(const RWBackend&);
public:
RWBackend(SERVER_REF* ref):
mxs::Backend(ref),
m_reply_state(REPLY_STATE_DONE)
{
}
~RWBackend()
{
}
reply_state_t get_reply_state() const
{
return m_reply_state;
}
void set_reply_state(reply_state_t state)
{
m_reply_state = state;
}
bool execute_session_command()
{
bool rval = mxs::Backend::execute_session_command();
if (rval)
{
set_reply_state(REPLY_STATE_START);
}
return rval;
}
private:
reply_state_t m_reply_state;
};
typedef std::tr1::shared_ptr<RWBackend> SRWBackend;
typedef std::list<SRWBackend> SRWBackendList;
typedef std::tr1::unordered_set<std::string> TableSet;
typedef std::map<uint64_t, uint8_t> ResponseMap;
/** Prepared statement ID to type maps for text and binary protocols */
typedef std::tr1::unordered_map<std::string, uint32_t> TextPSMap;
/**
* The client session structure used within this router.
*/
struct ROUTER_CLIENT_SES
{
skygw_chk_t rses_chk_top;
bool rses_closed; /**< true when closeSession is called */
SRWBackendList backends; /**< List of backend servers */
SRWBackend current_master; /**< Current master server */
SRWBackend target_node; /**< The currently locked target node */
rwsplit_config_t rses_config; /**< copied config info from router instance */
int rses_nbackends;
enum ld_state load_data_state; /**< Current load data state */
bool have_tmp_tables;
uint64_t rses_load_data_sent; /**< How much data has been sent */
DCB* client_dcb;
uint64_t sescmd_count;
int expected_responses; /**< Number of expected responses to the current query */
GWBUF* query_queue; /**< Queued commands waiting to be executed */
struct ROUTER_INSTANCE *router; /**< The router instance */
struct ROUTER_CLIENT_SES *next;
TableSet temp_tables; /**< Set of temporary tables */
mxs::SessionCommandList sescmd_list; /**< List of executed session commands */
ResponseMap sescmd_responses; /**< Response to each session command */
uint64_t sent_sescmd; /**< ID of the last sent session command*/
uint64_t recv_sescmd; /**< ID of the most recently completed session command */
TextPSMap ps_text; /**< Text protocol prepared statements */
skygw_chk_t rses_chk_tail;
};
/**
* The statistics for this router instance
*/
struct ROUTER_STATS
struct Stats
{
public:
Stats():
n_sessions(0),
n_queries(0),
n_master(0),
n_slave(0),
n_all(0)
{
}
uint64_t n_sessions; /**< Number sessions created */
uint64_t n_queries; /**< Number of queries forwarded */
uint64_t n_master; /**< Number of stmts sent to master */
@ -249,26 +214,24 @@ struct ROUTER_STATS
/**
* The per instance data for the router.
*/
struct ROUTER_INSTANCE
class RWSplit
{
SERVICE* service; /**< Pointer to service */
rwsplit_config_t rwsplit_config; /**< expanded config info from SERVICE */
int rwsplit_version; /**< version number for router's config */
ROUTER_STATS stats; /**< Statistics for this router */
bool available_slaves; /**< The router has some slaves avialable */
};
RWSplit(const RWSplit&);
RWSplit& operator=(const RWSplit&);
/**
* @brief Route a stored query
*
* When multiple queries are executed in a pipeline fashion, the readwritesplit
* stores the extra queries in a queue. This queue is emptied after reading a
* reply from the backend server.
*
* @param rses Router client session
* @return True if a stored query was routed successfully
*/
bool route_stored_query(ROUTER_CLIENT_SES *rses);
public:
RWSplit(SERVICE* service, const Config& config);
~RWSplit();
SERVICE* service() const;
const Config& config() const;
Stats& stats();
private:
SERVICE* m_service; /**< Service where the router belongs*/
Config m_config;
Stats m_stats;
};
static inline const char* select_criteria_to_str(select_criteria_t type)
{
@ -291,30 +254,6 @@ static inline const char* select_criteria_to_str(select_criteria_t type)
}
}
/**
* Helper function to convert reply_state_t to string
*/
static inline const char* rstostr(reply_state_t state)
{
switch (state)
{
case REPLY_STATE_START:
return "REPLY_STATE_START";
case REPLY_STATE_DONE:
return "REPLY_STATE_DONE";
case REPLY_STATE_RSET_COLDEF:
return "REPLY_STATE_RSET_COLDEF";
case REPLY_STATE_RSET_ROWS:
return "REPLY_STATE_RSET_ROWS";
}
ss_dassert(false);
return "UNKNOWN";
}
static inline const char* failure_mode_to_str(enum failure_mode type)
{
switch (type)

View File

@ -12,14 +12,14 @@
* Public License.
*/
#include <maxscale/cppdefs.hh>
#include "readwritesplit.hh"
#include <string>
#include <maxscale/query_classifier.h>
#include <maxscale/protocol/mysql.h>
#include "readwritesplit.hh"
#include "rwsplitsession.hh"
#define RW_CHK_DCB(b, d) \
do{ \
@ -31,21 +31,28 @@ do{ \
#define RW_CLOSE_BREF(b) do{ if (b){ (b)->closed_at = __LINE__; } } while (false)
static inline bool is_ps_command(uint8_t cmd)
{
return cmd == MYSQL_COM_STMT_EXECUTE ||
cmd == MYSQL_COM_STMT_SEND_LONG_DATA ||
cmd == MYSQL_COM_STMT_CLOSE ||
cmd == MYSQL_COM_STMT_FETCH ||
cmd == MYSQL_COM_STMT_RESET;
}
/*
* The following are implemented in rwsplit_mysql.c
*/
bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
bool route_single_stmt(RWSplit *inst, RWSplitSession *rses,
GWBUF *querybuf);
void closed_session_reply(GWBUF *querybuf);
void print_error_packet(ROUTER_CLIENT_SES *rses, GWBUF *buf, DCB *dcb);
void print_error_packet(RWSplitSession *rses, GWBUF *buf, DCB *dcb);
void check_session_command_reply(GWBUF *writebuf, SRWBackend bref);
bool execute_sescmd_in_backend(SRWBackend& backend_ref);
bool handle_target_is_all(route_target_t route_target,
ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
RWSplit *inst, RWSplitSession *rses,
GWBUF *querybuf, int packet_type, uint32_t qtype);
uint8_t determine_packet_type(GWBUF *querybuf, bool *non_empty_packet);
void log_transaction_status(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, uint32_t qtype);
bool command_will_respond(uint8_t packet_type);
void log_transaction_status(RWSplitSession *rses, GWBUF *querybuf, uint32_t qtype);
bool is_packet_a_query(int packet_type);
bool send_readonly_error(DCB *dcb);
@ -53,32 +60,34 @@ bool send_readonly_error(DCB *dcb);
* The following are implemented in readwritesplit.c
*/
int router_handle_state_switch(DCB *dcb, DCB_REASON reason, void *data);
SRWBackend get_backend_from_dcb(ROUTER_CLIENT_SES *rses, DCB *dcb);
int rses_get_max_slavecount(ROUTER_CLIENT_SES *rses);
int rses_get_max_replication_lag(ROUTER_CLIENT_SES *rses);
SRWBackend get_backend_from_dcb(RWSplitSession *rses, DCB *dcb);
int rses_get_max_slavecount(RWSplitSession *rses);
int rses_get_max_replication_lag(RWSplitSession *rses);
/*
* The following are implemented in rwsplit_route_stmt.c
*/
bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
GWBUF *querybuf);
SRWBackend get_target_backend(ROUTER_CLIENT_SES *rses, backend_type_t btype,
bool route_single_stmt(RWSplit *inst, RWSplitSession *rses,
GWBUF *querybuf, const RouteInfo& info);
SRWBackend get_target_backend(RWSplitSession *rses, backend_type_t btype,
char *name, int max_rlag);
route_target_t get_route_target(ROUTER_CLIENT_SES *rses,
route_target_t get_route_target(RWSplitSession *rses, uint8_t command,
uint32_t qtype, HINT *hint);
void handle_multi_temp_and_load(ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
void handle_multi_temp_and_load(RWSplitSession *rses, GWBUF *querybuf,
uint8_t packet_type, uint32_t *qtype);
SRWBackend handle_hinted_target(ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
SRWBackend handle_hinted_target(RWSplitSession *rses, GWBUF *querybuf,
route_target_t route_target);
SRWBackend handle_slave_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses);
bool handle_master_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
SRWBackend handle_slave_is_target(RWSplit *inst, RWSplitSession *rses,
uint8_t cmd, uint32_t id);
bool handle_master_is_target(RWSplit *inst, RWSplitSession *rses,
SRWBackend* dest);
bool handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
bool handle_got_target(RWSplit *inst, RWSplitSession *rses,
GWBUF *querybuf, SRWBackend& target, bool store);
bool route_session_write(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, uint8_t command);
bool route_session_write(RWSplitSession *rses, GWBUF *querybuf,
uint8_t command, uint32_t type);
void process_sescmd_response(ROUTER_CLIENT_SES* rses, SRWBackend& bref,
void process_sescmd_response(RWSplitSession* rses, SRWBackend& bref,
GWBUF** ppPacket, bool* reconnect);
/*
* The following are implemented in rwsplit_select_backends.c
@ -95,28 +104,35 @@ bool select_connect_backend_servers(int router_nservers,
int max_nslaves,
select_criteria_t select_criteria,
MXS_SESSION *session,
ROUTER_INSTANCE *router,
ROUTER_CLIENT_SES *rses,
RWSplit *router,
RWSplitSession *rses,
connection_type type);
/*
* The following are implemented in rwsplit_tmp_table_multi.c
*/
void check_drop_tmp_table(ROUTER_CLIENT_SES *router_cli_ses, GWBUF *querybuf);
bool is_read_tmp_table(ROUTER_CLIENT_SES *router_cli_ses,
void check_drop_tmp_table(RWSplitSession *router_cli_ses, GWBUF *querybuf);
bool is_read_tmp_table(RWSplitSession *router_cli_ses,
GWBUF *querybuf,
uint32_t type);
void check_create_tmp_table(ROUTER_CLIENT_SES *router_cli_ses,
void check_create_tmp_table(RWSplitSession *router_cli_ses,
GWBUF *querybuf, uint32_t type);
bool check_for_multi_stmt(GWBUF *buf, void *protocol, uint8_t packet_type);
uint32_t determine_query_type(GWBUF *querybuf, int packet_type, bool non_empty_packet);
void close_all_connections(ROUTER_CLIENT_SES* rses);
void close_all_connections(RWSplitSession* rses);
uint32_t determine_query_type(GWBUF *querybuf, int command);
/**
* Functions for prepared statement handling
* @brief Get the routing requirements for a query
*
* @param rses Router client session
* @param buffer Buffer containing the query
* @param command Output parameter where the packet command is stored
* @param type Output parameter where the query type is stored
* @param stmt_id Output parameter where statement ID, if the query is a binary protocol command, is stored
*
* @return The target type where this query should be routed
*/
std::string extract_text_ps_id(GWBUF* buffer);
void store_text_ps(ROUTER_CLIENT_SES* rses, std::string id, GWBUF* buffer);
void erase_text_ps(ROUTER_CLIENT_SES* rses, std::string id);
bool get_text_ps_type(ROUTER_CLIENT_SES* rses, GWBUF* buffer, uint32_t* out);
route_target_t get_target_type(RWSplitSession* rses, GWBUF* buffer, uint8_t* command,
uint32_t* type, uint32_t* stmt_id);

View File

@ -51,37 +51,65 @@
*/
/**
* @brief Determine packet type
* @brief Determine the type of a query
*
* Examine the packet in the buffer to extract the type, if possible. At the
* same time set the second parameter to indicate whether the packet was
* empty.
* @param querybuf GWBUF containing the query
* @param packet_type Integer denoting DB specific enum
* @param non_empty_packet Boolean to be set by this function
*
* It is assumed that the packet length and type are contained within a single
* buffer, the one indicated by the first parameter.
*
* @param querybuf Buffer containing the packet
* @param non_empty_packet bool indicating whether the packet is non-empty
* @return The packet type, or MYSQL_COM_UNDEFINED; also the second parameter is set
* @return uint32_t the query type; also the non_empty_packet bool is set
*/
uint8_t
determine_packet_type(GWBUF *querybuf, bool *non_empty_packet)
uint32_t determine_query_type(GWBUF *querybuf, int command)
{
uint8_t packet_type;
uint8_t *packet = GWBUF_DATA(querybuf);
uint32_t type = QUERY_TYPE_UNKNOWN;
if (gw_mysql_get_byte3(packet) == 0)
switch (command)
{
/** Empty packet signals end of LOAD DATA LOCAL INFILE, send it to master*/
*non_empty_packet = false;
packet_type = (uint8_t)MYSQL_COM_UNDEFINED;
case MYSQL_COM_QUIT: /*< 1 QUIT will close all sessions */
case MYSQL_COM_INIT_DB: /*< 2 DDL must go to the master */
case MYSQL_COM_REFRESH: /*< 7 - I guess this is session but not sure */
case MYSQL_COM_DEBUG: /*< 0d all servers dump debug info to stdout */
case MYSQL_COM_PING: /*< 0e all servers are pinged */
case MYSQL_COM_CHANGE_USER: /*< 11 all servers change it accordingly */
case MYSQL_COM_SET_OPTION: /*< 1b send options to all servers */
type = QUERY_TYPE_SESSION_WRITE;
break;
case MYSQL_COM_CREATE_DB: /**< 5 DDL must go to the master */
case MYSQL_COM_DROP_DB: /**< 6 DDL must go to the master */
case MYSQL_COM_STMT_CLOSE: /*< free prepared statement */
case MYSQL_COM_STMT_SEND_LONG_DATA: /*< send data to column */
case MYSQL_COM_STMT_RESET: /*< resets the data of a prepared statement */
type = QUERY_TYPE_WRITE;
break;
case MYSQL_COM_QUERY:
type = qc_get_type_mask(querybuf);
break;
case MYSQL_COM_STMT_PREPARE:
type = qc_get_type_mask(querybuf);
type |= QUERY_TYPE_PREPARE_STMT;
break;
case MYSQL_COM_STMT_EXECUTE:
/** Parsing is not needed for this type of packet */
type = QUERY_TYPE_EXEC_STMT;
break;
case MYSQL_COM_SHUTDOWN: /**< 8 where should shutdown be routed ? */
case MYSQL_COM_STATISTICS: /**< 9 ? */
case MYSQL_COM_PROCESS_INFO: /**< 0a ? */
case MYSQL_COM_CONNECT: /**< 0b ? */
case MYSQL_COM_PROCESS_KILL: /**< 0c ? */
case MYSQL_COM_TIME: /**< 0f should this be run in gateway ? */
case MYSQL_COM_DELAYED_INSERT: /**< 10 ? */
case MYSQL_COM_DAEMON: /**< 1d ? */
default:
break;
}
else
{
*non_empty_packet = true;
packet_type = packet[4];
}
return packet_type;
return type;
}
/*
@ -104,27 +132,6 @@ is_packet_a_query(int packet_type)
return (packet_type == MYSQL_COM_QUERY);
}
/*
* This looks MySQL specific
*/
/**
* @brief Determine if a packet contains a one way message
*
* Packet type tells us this, but in a DB specific way. This function is
* provided so that code that is not DB specific can find out whether a packet
* contains a one way messsage. Clearly, to be effective different functions must be
* called for different DB types.
*
* @param packet_type Type of packet (integer)
* @return bool indicating whether packet contains a one way message
*/
bool command_will_respond(uint8_t packet_type)
{
return packet_type != MYSQL_COM_STMT_SEND_LONG_DATA &&
packet_type != MYSQL_COM_QUIT &&
packet_type != MYSQL_COM_STMT_CLOSE;
}
/*
* This one is problematic because it is MySQL specific, but also router
* specific.
@ -141,7 +148,7 @@ bool command_will_respond(uint8_t packet_type)
* @param qtype Query type
*/
void
log_transaction_status(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, uint32_t qtype)
log_transaction_status(RWSplitSession *rses, GWBUF *querybuf, uint32_t qtype)
{
if (rses->load_data_state == LOAD_DATA_INACTIVE)
{
@ -201,21 +208,12 @@ log_transaction_status(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, uint32_t qtype)
* @param qtype Query type
* @return bool indicating whether the session can continue
*/
bool handle_target_is_all(route_target_t route_target, ROUTER_INSTANCE *inst,
ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
bool handle_target_is_all(route_target_t route_target, RWSplit *inst,
RWSplitSession *rses, GWBUF *querybuf,
int packet_type, uint32_t qtype)
{
bool result = false;
if (qc_query_is_type(qtype, QUERY_TYPE_PREPARE_NAMED_STMT))
{
store_text_ps(rses, extract_text_ps_id(querybuf), querybuf);
}
else if (qc_query_is_type(qtype, QUERY_TYPE_PREPARE_STMT))
{
gwbuf_set_type(querybuf, GWBUF_TYPE_COLLECT_RESULT);
}
if (TARGET_IS_MASTER(route_target) || TARGET_IS_SLAVE(route_target))
{
/**
@ -243,11 +241,11 @@ bool handle_target_is_all(route_target_t route_target, ROUTER_INSTANCE *inst,
MXS_FREE(query_str);
MXS_FREE(qtype_str);
}
else if (route_session_write(rses, gwbuf_clone(querybuf), packet_type))
else if (route_session_write(rses, gwbuf_clone(querybuf), packet_type, qtype))
{
result = true;
atomic_add_uint64(&inst->stats.n_all, 1);
atomic_add_uint64(&inst->stats().n_all, 1);
}
return result;

View File

@ -15,8 +15,50 @@
#include <maxscale/alloc.h>
#include <maxscale/query_classifier.h>
#include <maxscale/protocol/mysql.h>
std::string extract_text_ps_id(GWBUF* buffer)
#include "rwsplit_internal.hh"
uint32_t get_prepare_type(GWBUF* buffer)
{
uint32_t type;
if (mxs_mysql_get_command(buffer) == MYSQL_COM_STMT_PREPARE)
{
// TODO: This could be done inside the query classifier
size_t packet_len = gwbuf_length(buffer);
size_t payload_len = packet_len - MYSQL_HEADER_LEN;
GWBUF* stmt = gwbuf_alloc(packet_len);
uint8_t* ptr = GWBUF_DATA(stmt);
// Payload length
*ptr++ = payload_len;
*ptr++ = (payload_len >> 8);
*ptr++ = (payload_len >> 16);
// Sequence id
*ptr++ = 0x00;
// Command
*ptr++ = MYSQL_COM_QUERY;
gwbuf_copy_data(buffer, MYSQL_HEADER_LEN + 1, payload_len - 1, ptr);
type = qc_get_type_mask(stmt);
gwbuf_free(stmt);
}
else
{
GWBUF* stmt = qc_get_preparable_stmt(buffer);
ss_dassert(stmt);
type = qc_get_type_mask(stmt);
}
ss_dassert((type & (QUERY_TYPE_PREPARE_STMT | QUERY_TYPE_PREPARE_NAMED_STMT)) == 0);
return type;
}
std::string get_text_ps_id(GWBUF* buffer)
{
std::string rval;
char* name = qc_get_prepare_name(buffer);
@ -30,39 +72,89 @@ std::string extract_text_ps_id(GWBUF* buffer)
return rval;
}
void store_text_ps(ROUTER_CLIENT_SES* rses, std::string id, GWBUF* buffer)
void replace_binary_ps_id(GWBUF* buffer, uint32_t id)
{
GWBUF* stmt = qc_get_preparable_stmt(buffer);
ss_dassert(stmt);
uint32_t type = qc_get_type_mask(stmt);
ss_dassert((type & (QUERY_TYPE_PREPARE_STMT | QUERY_TYPE_PREPARE_NAMED_STMT)) == 0);
rses->ps_text[id] = type;
uint8_t* ptr = GWBUF_DATA(buffer) + MYSQL_PS_ID_OFFSET;
gw_mysql_set_byte4(ptr, id);
}
void erase_text_ps(ROUTER_CLIENT_SES* rses, std::string id)
PSManager::PSManager()
{
rses->ps_text.erase(id);
}
bool get_text_ps_type(ROUTER_CLIENT_SES* rses, GWBUF* buffer, uint32_t* out)
PSManager::~PSManager()
{
bool rval = false;
char* name = qc_get_prepare_name(buffer);
}
if (name)
void PSManager::erase(uint32_t id)
{
if (m_binary_ps.erase(id) == 0)
{
TextPSMap::iterator it = rses->ps_text.find(name);
MXS_WARNING("Closing unknown prepared statement with ID %u", id);
}
}
if (it != rses->ps_text.end())
{
*out = it->second;
rval = true;
}
void PSManager::erase(std::string id)
{
if (m_text_ps.erase(id) == 0)
{
MXS_WARNING("Closing unknown prepared statement with ID '%s'", id.c_str());
}
}
MXS_FREE(name);
uint32_t PSManager::get_type(std::string id) const
{
uint32_t rval = QUERY_TYPE_UNKNOWN;
TextPSMap::const_iterator it = m_text_ps.find(id);
if (it != m_text_ps.end())
{
rval = it->second;
}
else
{
MXS_WARNING("Using unknown prepared statement with ID '%s'", id.c_str());
}
return rval;
}
uint32_t PSManager::get_type(uint32_t id) const
{
uint32_t rval = QUERY_TYPE_UNKNOWN;
BinaryPSMap::const_iterator it = m_binary_ps.find(id);
if (it != m_binary_ps.end())
{
rval = it->second;
}
else
{
MXS_WARNING("Using unknown prepared statement with ID %u", id);
}
return rval;
}
void PSManager::store(GWBUF* buffer, uint32_t id)
{
ss_dassert(mxs_mysql_get_command(buffer) == MYSQL_COM_STMT_PREPARE ||
qc_query_is_type(qc_get_type_mask(buffer),
QUERY_TYPE_PREPARE_NAMED_STMT));
switch (mxs_mysql_get_command(buffer))
{
case MYSQL_COM_QUERY:
m_text_ps[get_text_ps_id(buffer)] = get_prepare_type(buffer);
break;
case MYSQL_COM_STMT_PREPARE:
m_binary_ps[id] = get_prepare_type(buffer);
break;
default:
ss_dassert(!true);
break;
}
}

View File

@ -0,0 +1,91 @@
#pragma once
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "readwritesplit.hh"
#include <tr1/unordered_map>
#include <string>
/** Prepared statement ID to type maps for text protocols */
typedef std::tr1::unordered_map<uint32_t, uint32_t> BinaryPSMap;
typedef std::tr1::unordered_map<std::string, uint32_t> TextPSMap;
/** Class for tracking prepared statement types by PS statement ID */
class PSManager
{
PSManager(const PSManager&);
PSManager& operator =(const PSManager&);
public:
PSManager();
~PSManager();
/**
* @brief Store and process a prepared statement
*
* @param buffer Buffer containing either a text or a binary protocol
* prepared statement
* @param id The unique ID for this statement
*/
void store(GWBUF* buffer, uint32_t id);
/**
* @brief Get the type of a stored prepared statement
*
* @param id The unique identifier for the prepared statement or the plaintext
* name of the prepared statement
*
* @return The type of the prepared statement
*/
uint32_t get_type(uint32_t id) const;
uint32_t get_type(std::string id) const;
/**
* @brief Remove a prepared statement
*
* @param id Statement identifier to remove
*/
void erase(std::string id);
void erase(uint32_t id);
private:
BinaryPSMap m_binary_ps;
TextPSMap m_text_ps;
};
/**
* @brief Get the type of a prepared statement
*
* @param buffer Buffer containing either a text or a binary prepared statement
*
* @return The type of the prepared statement
*/
uint32_t get_prepare_type(GWBUF* buffer);
/**
* @brief Extract text identifier of a PREPARE or EXECUTE statement
*
* @param buffer Buffer containing a PREPARE or EXECUTE command
*
* @return The string identifier of the statement
*/
std::string get_text_ps_id(GWBUF* buffer);
/**
* @brief Replace the ID of a binary protocol statement
*
* @param buffer Buffer containing a binary protocol statement with an ID
* @param id ID to insert into the buffer
*/
void replace_binary_ps_id(GWBUF* buffer, uint32_t id);

View File

@ -31,7 +31,7 @@
extern int (*criteria_cmpfun[LAST_CRITERIA])(const SRWBackend&, const SRWBackend&);
static SRWBackend get_root_master_backend(ROUTER_CLIENT_SES *rses);
static SRWBackend get_root_master_backend(RWSplitSession *rses);
/**
* Find out which of the two backend servers has smaller value for select
@ -58,16 +58,16 @@ static SRWBackend compare_backends(SRWBackend a, SRWBackend b, select_criteria_t
return a;
}
return p(a, b) < 0 ? a : b;
return p(a, b) <= 0 ? a : b;
}
void handle_connection_keepalive(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
void handle_connection_keepalive(RWSplit *inst, RWSplitSession *rses,
SRWBackend& target)
{
ss_dassert(target);
ss_debug(int nserv = 0);
/** Each heartbeat is 1/10th of a second */
int keepalive = inst->rwsplit_config.connection_keepalive * 10;
int keepalive = inst->config().connection_keepalive * 10;
for (SRWBackendList::iterator it = rses->backends.begin();
it != rses->backends.end(); it++)
@ -91,36 +91,21 @@ void handle_connection_keepalive(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
ss_dassert(nserv < rses->rses_nbackends);
}
/**
* Routing function. Find out query type, backend type, and target DCB(s).
* Then route query to found target(s).
* @param inst router instance
* @param rses router session
* @param querybuf GWBUF including the query
*
* @return true if routing succeed or if it failed due to unsupported query.
* false if backend failure was encountered.
*/
bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
GWBUF *querybuf)
route_target_t get_target_type(RWSplitSession *rses, GWBUF *buffer,
uint8_t* command, uint32_t* type, uint32_t* stmt_id)
{
route_target_t route_target;
bool succp = false;
bool non_empty_packet;
route_target_t route_target = TARGET_MASTER;
ss_dassert(querybuf->next == NULL); // The buffer must be contiguous.
/* packet_type is a problem as it is MySQL specific */
uint8_t command = determine_packet_type(querybuf, &non_empty_packet);
uint32_t qtype = determine_query_type(querybuf, command, non_empty_packet);
if (non_empty_packet)
if (gwbuf_length(buffer) > MYSQL_HEADER_LEN)
{
handle_multi_temp_and_load(rses, querybuf, command, &qtype);
*command = mxs_mysql_get_command(buffer);
*type = determine_query_type(buffer, *command);
handle_multi_temp_and_load(rses, buffer, *command, type);
if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO))
{
log_transaction_status(rses, querybuf, qtype);
log_transaction_status(rses, buffer, *type);
}
/**
* Find out where to route the query. Result may not be clear; it is
@ -140,23 +125,69 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
* eventually to master
*/
uint32_t ps_type;
if (qc_get_operation(querybuf) == QUERY_OP_EXECUTE &&
get_text_ps_type(rses, querybuf, &ps_type))
if (rses->target_node && rses->target_node == rses->current_master)
{
qtype = ps_type;
}
/** The session is locked to the master */
route_target = TARGET_MASTER;
route_target = get_route_target(rses, qtype, querybuf->hint);
if (qc_query_is_type(*type, QUERY_TYPE_PREPARE_NAMED_STMT) ||
qc_query_is_type(*type, QUERY_TYPE_PREPARE_STMT))
{
gwbuf_set_type(buffer, GWBUF_TYPE_COLLECT_RESULT);
}
}
else
{
if (*command == MYSQL_COM_QUERY &&
qc_get_operation(buffer) == QUERY_OP_EXECUTE)
{
std::string id = get_text_ps_id(buffer);
*type = rses->ps_manager.get_type(id);
}
else if (is_ps_command(*command))
{
*stmt_id = get_internal_ps_id(rses, buffer);
*type = rses->ps_manager.get_type(*stmt_id);
}
route_target = get_route_target(rses, *command, *type, buffer->hint);
}
}
else
{
/** Empty packet signals end of LOAD DATA LOCAL INFILE, send it to master*/
route_target = TARGET_MASTER;
rses->load_data_state = LOAD_DATA_END;
MXS_INFO("> LOAD DATA LOCAL INFILE finished: %lu bytes sent.",
rses->rses_load_data_sent + gwbuf_length(querybuf));
rses->rses_load_data_sent + gwbuf_length(buffer));
}
return route_target;
}
/**
* Routing function. Find out query type, backend type, and target DCB(s).
* Then route query to found target(s).
* @param inst router instance
* @param rses router session
* @param querybuf GWBUF including the query
*
* @return true if routing succeed or if it failed due to unsupported query.
* false if backend failure was encountered.
*/
bool route_single_stmt(RWSplit *inst, RWSplitSession *rses, GWBUF *querybuf, const RouteInfo& info)
{
bool succp = false;
uint32_t stmt_id = info.stmt_id;
uint8_t command = info.command;
uint32_t qtype = info.type;
route_target_t route_target = info.target;
bool not_locked_to_master = !rses->target_node || rses->target_node != rses->current_master;
if (is_ps_command(command) && not_locked_to_master)
{
/** Replace the client statement ID with our internal one only if the
* target node is not the current master */
replace_binary_ps_id(querybuf, stmt_id);
}
SRWBackend target;
@ -183,7 +214,7 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
}
else if (TARGET_IS_SLAVE(route_target))
{
if ((target = handle_slave_is_target(inst, rses)))
if ((target = handle_slave_is_target(inst, rses, command, stmt_id)))
{
succp = true;
store_stmt = rses->rses_config.retry_failed_reads;
@ -204,11 +235,21 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
if (target && succp) /*< Have DCB of the target backend */
{
ss_dassert(!store_stmt || TARGET_IS_SLAVE(route_target));
handle_got_target(inst, rses, querybuf, target, store_stmt);
succp = handle_got_target(inst, rses, querybuf, target, store_stmt);
if (succp && command == MYSQL_COM_STMT_EXECUTE && not_locked_to_master)
{
/** Track the targets of the COM_STMT_EXECUTE statements. This
* information is used to route all COM_STMT_FETCH commands
* to the same server where the COM_STMT_EXECUTE was done. */
ss_dassert(stmt_id > 0);
rses->exec_map[stmt_id] = target;
MXS_INFO("COM_STMT_EXECUTE on %s", target->uri());
}
}
}
if (succp && inst->rwsplit_config.connection_keepalive &&
if (succp && inst->config().connection_keepalive &&
(TARGET_IS_SLAVE(route_target) || TARGET_IS_MASTER(route_target)))
{
handle_connection_keepalive(inst, rses, target);
@ -237,15 +278,23 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
* backends being used, otherwise false.
*
*/
bool route_session_write(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, uint8_t command)
bool route_session_write(RWSplitSession *rses, GWBUF *querybuf,
uint8_t command, uint32_t type)
{
/** The SessionCommand takes ownership of the buffer */
uint64_t id = rses->sescmd_count++;
mxs::SSessionCommand sescmd(new mxs::SessionCommand(querybuf, id));
bool expecting_response = command_will_respond(command);
bool expecting_response = mxs_mysql_command_will_respond(command);
int nsucc = 0;
uint64_t lowest_pos = id;
if (qc_query_is_type(type, QUERY_TYPE_PREPARE_NAMED_STMT) ||
qc_query_is_type(type, QUERY_TYPE_PREPARE_STMT))
{
gwbuf_set_type(querybuf, GWBUF_TYPE_COLLECT_RESULT);
rses->ps_manager.store(querybuf, id);
}
MXS_INFO("Session write, routing to all servers.");
for (SRWBackendList::iterator it = rses->backends.begin();
@ -314,6 +363,13 @@ bool route_session_write(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, uint8_t comma
if (nsucc)
{
rses->sent_sescmd = id;
if (!expecting_response)
{
/** The command doesn't generate a response so we increment the
* completed session command count */
rses->recv_sescmd++;
}
}
return nsucc;
@ -330,7 +386,7 @@ bool route_session_write(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, uint8_t comma
*
* @return True if a backend was found
*/
SRWBackend get_target_backend(ROUTER_CLIENT_SES *rses, backend_type_t btype,
SRWBackend get_target_backend(RWSplitSession *rses, backend_type_t btype,
char *name, int max_rlag)
{
CHK_CLIENT_RSES(rses);
@ -504,25 +560,26 @@ SRWBackend get_target_backend(ROUTER_CLIENT_SES *rses, backend_type_t btype,
* @return bitfield including the routing target, or the target server name
* if the query would otherwise be routed to slave.
*/
route_target_t get_route_target(ROUTER_CLIENT_SES *rses,
uint32_t qtype, HINT *hint)
route_target_t get_route_target(RWSplitSession *rses, uint8_t command,
uint32_t qtype, HINT *query_hints)
{
bool trx_active = session_trx_is_active(rses->client_dcb->session);
bool load_active = rses->load_data_state != LOAD_DATA_INACTIVE;
mxs_target_t use_sql_variables_in = rses->rses_config.use_sql_variables_in;
int target = TARGET_UNDEFINED;
if (rses->target_node && rses->target_node == rses->current_master)
{
target = TARGET_MASTER;
}
else if (qc_query_is_type(qtype, QUERY_TYPE_PREPARE_STMT) ||
qc_query_is_type(qtype, QUERY_TYPE_PREPARE_NAMED_STMT))
/**
* Prepared statements preparations should go to all servers
*/
if (qc_query_is_type(qtype, QUERY_TYPE_PREPARE_STMT) ||
qc_query_is_type(qtype, QUERY_TYPE_PREPARE_NAMED_STMT) ||
command == MYSQL_COM_STMT_CLOSE ||
command == MYSQL_COM_STMT_RESET)
{
target = TARGET_ALL;
}
/**
* These queries are not affected by hints
* These queries should be routed to all servers
*/
else if (!load_active &&
(qc_query_is_type(qtype, QUERY_TYPE_SESSION_WRITE) ||
@ -598,7 +655,7 @@ route_target_t get_route_target(ROUTER_CLIENT_SES *rses,
}
else if (session_trx_is_read_only(rses->client_dcb->session))
{
/* Force TARGET_SLAVE for READ ONLY tranaction (active or ending) */
/* Force TARGET_SLAVE for READ ONLY transaction (active or ending) */
target = TARGET_SLAVE;
}
else
@ -631,14 +688,13 @@ route_target_t get_route_target(ROUTER_CLIENT_SES *rses,
target = TARGET_MASTER;
}
/** process routing hints */
while (hint != NULL)
/** Process routing hints */
for (HINT* hint = query_hints; hint; hint = hint->next)
{
if (hint->type == HINT_ROUTE_TO_MASTER)
{
target = TARGET_MASTER; /*< override */
MXS_DEBUG("%lu [get_route_target] Hint: route to master.",
pthread_self());
MXS_DEBUG("Hint: route to master");
break;
}
else if (hint->type == HINT_ROUTE_TO_NAMED_SERVER)
@ -648,42 +704,38 @@ route_target_t get_route_target(ROUTER_CLIENT_SES *rses,
* found, the oroginal target is chosen.
*/
target |= TARGET_NAMED_SERVER;
MXS_DEBUG("%lu [get_route_target] Hint: route to "
"named server : ",
pthread_self());
MXS_DEBUG("Hint: route to named server: %s", (char*)hint->data);
}
else if (hint->type == HINT_ROUTE_TO_UPTODATE_SERVER)
{
/** not implemented */
ss_dassert(false);
}
else if (hint->type == HINT_ROUTE_TO_ALL)
{
/** not implemented */
ss_dassert(false);
}
else if (hint->type == HINT_PARAMETER)
{
if (strncasecmp((char *)hint->data, "max_slave_replication_lag",
if (strncasecmp((char*)hint->data, "max_slave_replication_lag",
strlen("max_slave_replication_lag")) == 0)
{
target |= TARGET_RLAG_MAX;
}
else
{
MXS_ERROR("Unknown hint parameter "
"'%s' when 'max_slave_replication_lag' "
"was expected.",
(char *)hint->data);
MXS_ERROR("Unknown hint parameter '%s' when "
"'max_slave_replication_lag' was expected.",
(char*)hint->data);
}
}
else if (hint->type == HINT_ROUTE_TO_SLAVE)
{
target = TARGET_SLAVE;
MXS_DEBUG("%lu [get_route_target] Hint: route to "
"slave.",
pthread_self());
MXS_DEBUG("Hint: route to slave.");
}
hint = hint->next;
} /*< while (hint != NULL) */
}
return (route_target_t)target;
}
@ -699,7 +751,7 @@ route_target_t get_route_target(ROUTER_CLIENT_SES *rses,
* @param qtype Query type
*/
void
handle_multi_temp_and_load(ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
handle_multi_temp_and_load(RWSplitSession *rses, GWBUF *querybuf,
uint8_t packet_type, uint32_t *qtype)
{
/** Check for multi-statement queries. If no master server is available
@ -796,7 +848,7 @@ handle_multi_temp_and_load(ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
*
* @return bool - true if succeeded, false otherwise
*/
SRWBackend handle_hinted_target(ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
SRWBackend handle_hinted_target(RWSplitSession *rses, GWBUF *querybuf,
route_target_t route_target)
{
char *named_server = NULL;
@ -875,14 +927,37 @@ SRWBackend handle_hinted_target(ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
*
* @return bool - true if succeeded, false otherwise
*/
SRWBackend handle_slave_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses)
SRWBackend handle_slave_is_target(RWSplit *inst, RWSplitSession *rses,
uint8_t cmd, uint32_t stmt_id)
{
int rlag_max = rses_get_max_replication_lag(rses);
SRWBackend target = get_target_backend(rses, BE_SLAVE, NULL, rlag_max);
SRWBackend target;
if (cmd == MYSQL_COM_STMT_FETCH)
{
/** The COM_STMT_FETCH must be executed on the same server as the
* COM_STMT_EXECUTE was executed on */
ExecMap::iterator it = rses->exec_map.find(stmt_id);
if (it != rses->exec_map.end())
{
target = it->second;
MXS_INFO("COM_STMT_FETCH on %s", target->uri());
}
else
{
MXS_WARNING("Unknown statement ID %u used in COM_STMT_FETCH", stmt_id);
}
}
if (!target)
{
target = get_target_backend(rses, BE_SLAVE, NULL, rlag_max);
}
if (target)
{
atomic_add_uint64(&inst->stats.n_slave, 1);
atomic_add_uint64(&inst->stats().n_slave, 1);
}
else
{
@ -897,7 +972,7 @@ SRWBackend handle_slave_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses
*
* @param rses Router session
*/
static void log_master_routing_failure(ROUTER_CLIENT_SES *rses, bool found,
static void log_master_routing_failure(RWSplitSession *rses, bool found,
SRWBackend& old_master, SRWBackend& curr_master)
{
char errmsg[MAX_SERVER_ADDRESS_LEN * 2 + 100]; // Extra space for error message
@ -946,7 +1021,7 @@ static void log_master_routing_failure(ROUTER_CLIENT_SES *rses, bool found,
}
MXS_WARNING("[%s] Write query received from %s@%s. %s. Closing client connection.",
rses->router->service->name, rses->client_dcb->user,
rses->router->service()->name, rses->client_dcb->user,
rses->client_dcb->remote, errmsg);
}
@ -961,7 +1036,7 @@ static void log_master_routing_failure(ROUTER_CLIENT_SES *rses, bool found,
*
* @return bool - true if succeeded, false otherwise
*/
bool handle_master_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
bool handle_master_is_target(RWSplit *inst, RWSplitSession *rses,
SRWBackend* dest)
{
SRWBackend target = get_target_backend(rses, BE_MASTER, NULL, MAX_RLAG_UNDEFINED);
@ -969,7 +1044,7 @@ bool handle_master_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
if (target && target == rses->current_master)
{
atomic_add_uint64(&inst->stats.n_master, 1);
atomic_add_uint64(&inst->stats().n_master, 1);
}
else
{
@ -994,28 +1069,21 @@ bool handle_master_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
return succp;
}
static inline bool query_creates_reply(mysql_server_cmd_t cmd)
static inline bool query_creates_reply(uint8_t cmd)
{
return cmd != MYSQL_COM_QUIT &&
cmd != MYSQL_COM_STMT_SEND_LONG_DATA &&
cmd != MYSQL_COM_STMT_CLOSE;
cmd != MYSQL_COM_STMT_CLOSE &&
cmd != MYSQL_COM_STMT_FETCH; // Fetch is done mid-result
}
/**
* @brief Handle got a target
* @brief Handle writing to a target server
*
* One of the possible types of handling required when a request is routed
*
* @param inst Router instance
* @param ses Router session
* @param querybuf Buffer containing query to be routed
* @param target_dcb DCB for the target server
*
* @return bool - true if succeeded, false otherwise
* @return True on success
*/
bool
handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
GWBUF *querybuf, SRWBackend& target, bool store)
bool handle_got_target(RWSplit *inst, RWSplitSession *rses,
GWBUF *querybuf, SRWBackend& target, bool store)
{
/**
* If the transaction is READ ONLY set forced_node to this backend.
@ -1036,7 +1104,7 @@ handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
ss_dassert(target->session_command_count() == 0);
mxs::Backend::response_type response = mxs::Backend::NO_RESPONSE;
mysql_server_cmd_t cmd = mxs_mysql_current_command(rses->client_dcb->session);
uint8_t cmd = mxs_mysql_get_command(querybuf);
if (rses->load_data_state != LOAD_DATA_ACTIVE &&
query_creates_reply(cmd))
@ -1051,7 +1119,7 @@ handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
MXS_ERROR("Failed to store current statement, it won't be retried if it fails.");
}
atomic_add_uint64(&inst->stats.n_queries, 1);
atomic_add_uint64(&inst->stats().n_queries, 1);
if (response == mxs::Backend::EXPECT_RESPONSE)
{
@ -1107,7 +1175,7 @@ handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
* @return The backend that points to the master server or an empty reference
* if the master cannot be found
*/
static SRWBackend get_root_master_backend(ROUTER_CLIENT_SES *rses)
static SRWBackend get_root_master_backend(RWSplitSession *rses)
{
SRWBackend candidate;
SERVER master = {};

View File

@ -54,7 +54,7 @@ static bool valid_for_slave(const SERVER *server, const SERVER *master)
*
* @return The best slave backend reference or NULL if no candidates could be found
*/
SRWBackend get_slave_candidate(ROUTER_CLIENT_SES* rses, const SERVER *master,
SRWBackend get_slave_candidate(RWSplitSession* rses, const SERVER *master,
int (*cmpfun)(const SRWBackend&, const SRWBackend&))
{
SRWBackend candidate;
@ -198,7 +198,7 @@ int (*criteria_cmpfun[LAST_CRITERIA])(const SRWBackend&, const SRWBackend&) =
* @param rses Router client session
*/
static void log_server_connections(select_criteria_t criteria,
ROUTER_CLIENT_SES* rses)
RWSplitSession* rses)
{
MXS_INFO("Servers and %s connection counts:",
criteria == LEAST_GLOBAL_CONNECTIONS ? "all MaxScale" : "router");
@ -246,7 +246,7 @@ static void log_server_connections(select_criteria_t criteria,
*
* @return The root master reference or NULL if no master is found
*/
static SERVER_REF* get_root_master(ROUTER_CLIENT_SES* rses)
static SERVER_REF* get_root_master(RWSplitSession* rses)
{
SERVER_REF *master_host = NULL;
@ -290,15 +290,15 @@ bool select_connect_backend_servers(int router_nservers,
int max_nslaves,
select_criteria_t select_criteria,
MXS_SESSION *session,
ROUTER_INSTANCE *router,
ROUTER_CLIENT_SES *rses,
RWSplit *router,
RWSplitSession *rses,
connection_type type)
{
/* get the root Master */
SERVER_REF *master_backend = get_root_master(rses);
SERVER *master_host = master_backend ? master_backend->server : NULL;
if (router->rwsplit_config.master_failure_mode == RW_FAIL_INSTANTLY &&
if (router->config().master_failure_mode == RW_FAIL_INSTANTLY &&
(master_host == NULL || SERVER_IS_DOWN(master_host)))
{
MXS_ERROR("Couldn't find suitable Master from %d candidates.", router_nservers);

View File

@ -26,7 +26,7 @@
* Functions for session command handling
*/
void process_sescmd_response(ROUTER_CLIENT_SES* rses, SRWBackend& backend,
void process_sescmd_response(RWSplitSession* rses, SRWBackend& backend,
GWBUF** ppPacket, bool* pReconnect)
{
if (backend->session_command_count())
@ -36,7 +36,17 @@ void process_sescmd_response(ROUTER_CLIENT_SES* rses, SRWBackend& backend,
{
uint8_t cmd;
gwbuf_copy_data(*ppPacket, MYSQL_HEADER_LEN, 1, &cmd);
uint8_t command = backend->next_session_command()->get_command();
uint64_t id = backend->complete_session_command();
MXS_PS_RESPONSE resp = {};
if (command == MYSQL_COM_STMT_PREPARE)
{
// This should never fail or the backend protocol is broken
ss_debug(bool b = )mxs_mysql_extract_ps_response(*ppPacket, &resp);
ss_dassert(b);
backend->add_ps_handle(id, resp.id);
}
if (rses->recv_sescmd < rses->sent_sescmd &&
id == rses->recv_sescmd + 1 &&
@ -49,6 +59,12 @@ void process_sescmd_response(ROUTER_CLIENT_SES* rses, SRWBackend& backend,
/** Store the master's response so that the slave responses can
* be compared to it */
rses->sescmd_responses[id] = cmd;
if (command == MYSQL_COM_STMT_PREPARE)
{
/** Map the returned response to the internal ID */
rses->ps_handles[resp.id] = id;
}
}
else
{

View File

@ -35,6 +35,68 @@
* somewhere else, outside this router. Perhaps in the query classifier?
*/
/**
* @brief Map a function over the list of tables in the query
*
* @param rses Router client session
* @param querybuf The query to inspect
* @param func Callback that is called for each table
*
* @return True if all tables were iterated, false if the iteration was stopped early
*/
static bool foreach_table(RWSplitSession* rses, GWBUF* querybuf, bool (*func)(RWSplitSession*,
const std::string&))
{
bool rval = true;
int n_tables;
char** tables = qc_get_table_names(querybuf, &n_tables, true);
for (int i = 0; i < n_tables; i++)
{
const char* db = mxs_mysql_get_current_db(rses->client_dcb->session);
std::string table;
if (strchr(tables[i], '.') == NULL)
{
table += db;
table += ".";
}
table += tables[i];
if (!func(rses, table))
{
rval = false;
break;
}
}
return rval;
}
/**
* Delete callback for foreach_table
*/
bool delete_table(RWSplitSession *rses, const std::string& table)
{
rses->temp_tables.erase(table);
return true;
}
/**
* Find callback for foreach_table
*/
bool find_table(RWSplitSession* rses, const std::string& table)
{
if (rses->temp_tables.find(table) != rses->temp_tables.end())
{
MXS_INFO("Query targets a temporary table: %s", table.c_str());
return false;
}
return true;
}
/**
* @brief Check for dropping of temporary tables
*
@ -44,27 +106,11 @@
* @param querybuf GWBUF containing the query
* @param type The type of the query resolved so far
*/
void check_drop_tmp_table(ROUTER_CLIENT_SES *router_cli_ses, GWBUF *querybuf)
void check_drop_tmp_table(RWSplitSession *rses, GWBUF *querybuf)
{
if (qc_is_drop_table_query(querybuf))
{
const QC_FIELD_INFO* info;
size_t n_infos;
qc_get_field_info(querybuf, &info, &n_infos);
for (size_t i = 0; i < n_infos; i++)
{
const char* db = mxs_mysql_get_current_db(router_cli_ses->client_dcb->session);
std::string table = info[i].database ? info[i].database : db;
table += ".";
if (info[i].table)
{
table += info[i].table;
}
router_cli_ses->temp_tables.erase(table);
}
foreach_table(rses, querybuf, delete_table);
}
}
@ -75,41 +121,22 @@ void check_drop_tmp_table(ROUTER_CLIENT_SES *router_cli_ses, GWBUF *querybuf)
* @param type The type of the query resolved so far
* @return The type of the query
*/
bool is_read_tmp_table(ROUTER_CLIENT_SES *router_cli_ses,
bool is_read_tmp_table(RWSplitSession *rses,
GWBUF *querybuf,
uint32_t qtype)
{
ss_dassert(router_cli_ses && querybuf && router_cli_ses->client_dcb);
ss_dassert(rses && querybuf && rses->client_dcb);
bool rval = false;
if (qtype & (QUERY_TYPE_READ |
QUERY_TYPE_LOCAL_READ |
QUERY_TYPE_USERVAR_READ |
QUERY_TYPE_SYSVAR_READ |
QUERY_TYPE_GSYSVAR_READ))
if (qc_query_is_type(qtype, QUERY_TYPE_READ) ||
qc_query_is_type(qtype, QUERY_TYPE_LOCAL_READ) ||
qc_query_is_type(qtype, QUERY_TYPE_USERVAR_READ) ||
qc_query_is_type(qtype, QUERY_TYPE_SYSVAR_READ) ||
qc_query_is_type(qtype, QUERY_TYPE_GSYSVAR_READ))
{
const QC_FIELD_INFO* info;
size_t n_infos;
qc_get_field_info(querybuf, &info, &n_infos);
for (size_t i = 0; i < n_infos; i++)
if (!foreach_table(rses, querybuf, find_table))
{
const char* db = mxs_mysql_get_current_db(router_cli_ses->client_dcb->session);
std::string table = info[i].database ? info[i].database : db;
table += ".";
if (info[i].table)
{
table += info[i].table;
}
if (router_cli_ses->temp_tables.find(table) !=
router_cli_ses->temp_tables.end())
{
rval = true;
MXS_INFO("Query targets a temporary table: %s", table.c_str());
break;
}
rval = true;
}
}
@ -125,7 +152,7 @@ bool is_read_tmp_table(ROUTER_CLIENT_SES *router_cli_ses,
* @param querybuf GWBUF containing the query
* @param type The type of the query resolved so far
*/
void check_create_tmp_table(ROUTER_CLIENT_SES *router_cli_ses,
void check_create_tmp_table(RWSplitSession *router_cli_ses,
GWBUF *querybuf, uint32_t type)
{
if (qc_query_is_type(type, QUERY_TYPE_CREATE_TMP_TABLE))
@ -137,7 +164,7 @@ void check_create_tmp_table(ROUTER_CLIENT_SES *router_cli_ses,
char* tblname = qc_get_created_table_name(querybuf);
std::string table;
if (tblname && *tblname)
if (tblname && *tblname && strchr(tblname, '.') == NULL)
{
const char* db = mxs_mysql_get_current_db(router_cli_ses->client_dcb->session);
table += db;
@ -197,69 +224,3 @@ bool check_for_multi_stmt(GWBUF *buf, void *protocol, uint8_t packet_type)
return rval;
}
/**
* @brief Determine the type of a query
*
* @param querybuf GWBUF containing the query
* @param packet_type Integer denoting DB specific enum
* @param non_empty_packet Boolean to be set by this function
*
* @return uint32_t the query type; also the non_empty_packet bool is set
*/
uint32_t
determine_query_type(GWBUF *querybuf, int packet_type, bool non_empty_packet)
{
uint32_t qtype = QUERY_TYPE_UNKNOWN;
if (non_empty_packet)
{
uint8_t my_packet_type = (uint8_t)packet_type;
switch (my_packet_type)
{
case MYSQL_COM_QUIT: /*< 1 QUIT will close all sessions */
case MYSQL_COM_INIT_DB: /*< 2 DDL must go to the master */
case MYSQL_COM_REFRESH: /*< 7 - I guess this is session but not sure */
case MYSQL_COM_DEBUG: /*< 0d all servers dump debug info to stdout */
case MYSQL_COM_PING: /*< 0e all servers are pinged */
case MYSQL_COM_CHANGE_USER: /*< 11 all servers change it accordingly */
case MYSQL_COM_SET_OPTION: /*< 1b send options to all servers */
qtype = QUERY_TYPE_SESSION_WRITE;
break;
case MYSQL_COM_CREATE_DB: /**< 5 DDL must go to the master */
case MYSQL_COM_DROP_DB: /**< 6 DDL must go to the master */
case MYSQL_COM_STMT_CLOSE: /*< free prepared statement */
case MYSQL_COM_STMT_SEND_LONG_DATA: /*< send data to column */
case MYSQL_COM_STMT_RESET: /*< resets the data of a prepared statement */
qtype = QUERY_TYPE_WRITE;
break;
case MYSQL_COM_QUERY:
qtype = qc_get_type_mask(querybuf);
break;
case MYSQL_COM_STMT_PREPARE:
qtype = qc_get_type_mask(querybuf);
qtype |= QUERY_TYPE_PREPARE_STMT;
break;
case MYSQL_COM_STMT_EXECUTE:
/** Parsing is not needed for this type of packet */
qtype = QUERY_TYPE_EXEC_STMT;
break;
case MYSQL_COM_SHUTDOWN: /**< 8 where should shutdown be routed ? */
case MYSQL_COM_STATISTICS: /**< 9 ? */
case MYSQL_COM_PROCESS_INFO: /**< 0a ? */
case MYSQL_COM_CONNECT: /**< 0b ? */
case MYSQL_COM_PROCESS_KILL: /**< 0c ? */
case MYSQL_COM_TIME: /**< 0f should this be run in gateway ? */
case MYSQL_COM_DELAYED_INSERT: /**< 10 ? */
case MYSQL_COM_DAEMON: /**< 1d ? */
default:
break;
} /**< switch by packet type */
}
return qtype;
}

View File

@ -0,0 +1,111 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "rwsplitsession.hh"
#include "rwsplit_internal.hh"
RWBackend::RWBackend(SERVER_REF* ref):
mxs::Backend(ref),
m_reply_state(REPLY_STATE_DONE)
{
}
RWBackend::~RWBackend()
{
}
reply_state_t RWBackend::get_reply_state() const
{
return m_reply_state;
}
void RWBackend::set_reply_state(reply_state_t state)
{
m_reply_state = state;
}
bool RWBackend::execute_session_command()
{
bool expect_response = mxs_mysql_command_will_respond(next_session_command()->get_command());
bool rval = mxs::Backend::execute_session_command();
if (rval && expect_response)
{
set_reply_state(REPLY_STATE_START);
}
return rval;
}
void RWBackend::add_ps_handle(uint32_t id, uint32_t handle)
{
m_ps_handles[id] = handle;
MXS_INFO("PS response for %s: %u -> %u", name(), id, handle);
}
uint32_t RWBackend::get_ps_handle(uint32_t id) const
{
BackendHandleMap::const_iterator it = m_ps_handles.find(id);
if (it != m_ps_handles.end())
{
return it->second;
}
return 0;
}
bool RWBackend::write(GWBUF* buffer, response_type type)
{
uint8_t cmd = mxs_mysql_get_command(buffer);
if (is_ps_command(cmd))
{
uint32_t id = mxs_mysql_extract_ps_id(buffer);
BackendHandleMap::iterator it = m_ps_handles.find(id);
if (it != m_ps_handles.end())
{
/** Replace the client handle with the real PS handle */
uint8_t* ptr = GWBUF_DATA(buffer) + MYSQL_PS_ID_OFFSET;
gw_mysql_set_byte4(ptr, it->second);
}
}
return mxs::Backend::write(buffer);
}
uint32_t get_internal_ps_id(RWSplitSession* rses, GWBUF* buffer)
{
uint32_t rval = 0;
// All COM_STMT type statements store the ID in the same place
uint32_t id = mxs_mysql_extract_ps_id(buffer);
ClientHandleMap::iterator it = rses->ps_handles.find(id);
if (it != rses->ps_handles.end())
{
rval = it->second;
}
return rval;
}
RouteInfo::RouteInfo(RWSplitSession* rses, GWBUF* buffer):
target(TARGET_UNDEFINED),
command(0xff),
type(QUERY_TYPE_UNKNOWN),
stmt_id(0)
{
target = get_target_type(rses, buffer, &command, &type, &stmt_id);
}

View File

@ -0,0 +1,148 @@
#pragma once
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "readwritesplit.hh"
#include "rwsplit_ps.hh"
/** Enum for tracking client reply state */
enum reply_state_t
{
REPLY_STATE_START, /**< Query sent to backend */
REPLY_STATE_DONE, /**< Complete reply received */
REPLY_STATE_RSET_COLDEF, /**< Resultset response, waiting for column definitions */
REPLY_STATE_RSET_ROWS /**< Resultset response, waiting for rows */
};
/** Reply state change debug logging */
#define LOG_RS(a, b) MXS_DEBUG("%s %s -> %s", (a)->uri(), \
rstostr((a)->get_reply_state()), rstostr(b));
typedef std::map<uint32_t, uint32_t> BackendHandleMap; /** Internal ID to external ID */
typedef std::map<uint32_t, uint32_t> ClientHandleMap; /** External ID to internal ID */
class RWBackend: public mxs::Backend
{
RWBackend(const RWBackend&);
RWBackend& operator=(const RWBackend&);
public:
RWBackend(SERVER_REF* ref);
~RWBackend();
reply_state_t get_reply_state() const;
void set_reply_state(reply_state_t state);
void add_ps_handle(uint32_t id, uint32_t handle);
uint32_t get_ps_handle(uint32_t id) const;
bool execute_session_command();
bool write(GWBUF* buffer, response_type type = EXPECT_RESPONSE);
private:
reply_state_t m_reply_state;
BackendHandleMap m_ps_handles; /**< Internal ID to backend PS handle mapping */
};
typedef std::tr1::shared_ptr<RWBackend> SRWBackend;
typedef std::list<SRWBackend> SRWBackendList;
typedef std::tr1::unordered_set<std::string> TableSet;
typedef std::map<uint64_t, uint8_t> ResponseMap;
/** Map of COM_STMT_EXECUTE targets by internal ID */
typedef std::tr1::unordered_map<uint32_t, SRWBackend> ExecMap;
/**
* The client session of a RWSplit instance
*/
class RWSplitSession
{
RWSplitSession(const RWSplitSession&);
RWSplitSession& operator=(const RWSplitSession&);
public:
RWSplitSession(const Config& config);
// TODO: Make member variables private
skygw_chk_t rses_chk_top;
bool rses_closed; /**< true when closeSession is called */
SRWBackendList backends; /**< List of backend servers */
SRWBackend current_master; /**< Current master server */
SRWBackend target_node; /**< The currently locked target node */
Config rses_config; /**< copied config info from router instance */
int rses_nbackends;
enum ld_state load_data_state; /**< Current load data state */
bool have_tmp_tables;
uint64_t rses_load_data_sent; /**< How much data has been sent */
DCB* client_dcb;
uint64_t sescmd_count;
int expected_responses; /**< Number of expected responses to the current query */
GWBUF* query_queue; /**< Queued commands waiting to be executed */
RWSplit* router; /**< The router instance */
TableSet temp_tables; /**< Set of temporary tables */
mxs::SessionCommandList sescmd_list; /**< List of executed session commands */
ResponseMap sescmd_responses; /**< Response to each session command */
uint64_t sent_sescmd; /**< ID of the last sent session command*/
uint64_t recv_sescmd; /**< ID of the most recently completed session command */
PSManager ps_manager; /**< Prepared statement manager*/
ClientHandleMap ps_handles; /**< Client PS handle to internal ID mapping */
ExecMap exec_map; /**< Map of COM_STMT_EXECUTE statement IDs to Backends */
skygw_chk_t rses_chk_tail;
};
/** Struct for holding routing related information */
struct RouteInfo
{
RouteInfo(RWSplitSession* rses, GWBUF* buffer);
route_target_t target; /**< Route target type, TARGET_UNDEFINED for unknown */
uint8_t command; /**< The command byte, 0xff for unknown commands */
uint32_t type; /**< The query type, QUERY_TYPE_UNKNOWN for unknown types*/
uint32_t stmt_id; /**< Prepared statement ID, 0 for unknown */
};
/**
* Helper function to convert reply_state_t to string
*/
static inline const char* rstostr(reply_state_t state)
{
switch (state)
{
case REPLY_STATE_START:
return "REPLY_STATE_START";
case REPLY_STATE_DONE:
return "REPLY_STATE_DONE";
case REPLY_STATE_RSET_COLDEF:
return "REPLY_STATE_RSET_COLDEF";
case REPLY_STATE_RSET_ROWS:
return "REPLY_STATE_RSET_ROWS";
}
ss_dassert(false);
return "UNKNOWN";
}
/**
* @brief Get the internal ID for the given binary prepared statement
*
* @param rses Router client session
* @param buffer Buffer containing a binary protocol statement other than COM_STMT_PREPARE
*
* @return The internal ID of the prepared statement that the buffer contents refer to
*/
uint32_t get_internal_ps_id(RWSplitSession* rses, GWBUF* buffer);