Merge branch 'binlog_server_wait_data' into binlog_server_waitdata_encryption

This commit is contained in:
MassimilianoPinto
2016-09-23 16:37:22 +02:00
52 changed files with 4965 additions and 3844 deletions

View File

@ -380,9 +380,9 @@ GWBUF* gwbuf_clone_all(GWBUF* buf)
}
GWBUF *gwbuf_clone_portion(GWBUF *buf,
size_t start_offset,
size_t length)
static GWBUF *gwbuf_clone_portion(GWBUF *buf,
size_t start_offset,
size_t length)
{
GWBUF* clonebuf;

View File

@ -957,6 +957,10 @@ handle_global_item(const char *name, const char *value)
{
mxs_log_set_highprecision_enabled(config_truth_value((char*)value));
}
else if (strcmp(name, "skip_permission_checks") == 0)
{
gateway.skip_permission_checks = config_truth_value((char*)value);
}
else if (strcmp(name, "auth_connect_timeout") == 0)
{
char* endptr;
@ -1301,6 +1305,7 @@ global_defaults()
gateway.auth_conn_timeout = DEFAULT_AUTH_CONNECT_TIMEOUT;
gateway.auth_read_timeout = DEFAULT_AUTH_READ_TIMEOUT;
gateway.auth_write_timeout = DEFAULT_AUTH_WRITE_TIMEOUT;
gateway.skip_permission_checks = false;
if (version_string != NULL)
{
gateway.version_string = MXS_STRDUP_A(version_string);

View File

@ -2689,7 +2689,8 @@ static bool check_server_permissions(SERVICE *service, SERVER* server,
*/
bool check_service_permissions(SERVICE* service)
{
if (is_internal_service(service->routerModule))
if (is_internal_service(service->routerModule) ||
config_get_global_options()->skip_permission_checks)
{
return true;
}

View File

@ -215,10 +215,10 @@ static const int LM_MESSAGE_HASH_SIZE = 293; /** A prime, and roughly a quarter
*
* @return Current monotonic raw time in milliseconds.
*/
static uint64_t time_monotonic_raw_ms()
static uint64_t time_monotonic_ms()
{
struct timespec now;
clock_gettime(CLOCK_MONOTONIC_RAW, &now);
clock_gettime(CLOCK_MONOTONIC, &now);
return now.tv_sec * 1000 + now.tv_nsec / 1000000;
}
@ -2815,7 +2815,7 @@ static message_suppression_t message_status(const char* file, int line)
{
LM_MESSAGE_STATS stats;
spinlock_init(&stats.lock);
stats.first_ms = time_monotonic_raw_ms();
stats.first_ms = time_monotonic_ms();
stats.last_ms = 0;
stats.count = 0;
@ -2829,7 +2829,7 @@ static message_suppression_t message_status(const char* file, int line)
if (value)
{
uint64_t now_ms = time_monotonic_raw_ms();
uint64_t now_ms = time_monotonic_ms();
spinlock_acquire(&value->lock);

View File

@ -543,6 +543,11 @@ bool check_monitor_permissions(MONITOR* monitor, const char* query)
return false;
}
if (config_get_global_options()->skip_permission_checks)
{
return true;
}
char *user = monitor->user;
char *dpasswd = decryptPassword(monitor->password);
GATEWAY_CONF* cnf = config_get_global_options();

View File

@ -183,14 +183,24 @@ char* qc_get_canonical(GWBUF* query)
{
QC_TRACE();
ss_dassert(classifier);
char *rval;
if (classifier->qc_get_canonical)
{
return classifier->qc_get_canonical(query);
rval = classifier->qc_get_canonical(query);
}
else
{
return modutil_get_canonical(query);
rval = modutil_get_canonical(query);
}
if (rval)
{
squeeze_whitespace(rval);
}
return rval;
}
bool qc_query_has_clause(GWBUF* query)

View File

@ -300,7 +300,7 @@ serviceStartPort(SERVICE *service, SERV_LISTENER *port)
/** Load the authentication users before before starting the listener */
if (port->listener->authfunc.loadusers &&
(service->router->getCapabilities() & RCAP_TYPE_NO_USERS_INIT) == 0 &&
port->listener->authfunc.loadusers(port) != AUTH_LOADUSERS_OK)
port->listener->authfunc.loadusers(port) != MXS_AUTH_LOADUSERS_OK)
{
MXS_ERROR("[%s] Failed to load users for listener '%s', authentication might not work.",
service->name, port->name);
@ -1457,7 +1457,7 @@ int service_refresh_users(SERVICE *service)
for (SERV_LISTENER *port = service->ports; port; port = port->next)
{
if (port->listener->authfunc.loadusers(port) != AUTH_LOADUSERS_OK)
if (port->listener->authfunc.loadusers(port) != MXS_AUTH_LOADUSERS_OK)
{
MXS_ERROR("[%s] Failed to load users for listener '%s', authentication might not work.",
service->name, port->name);

View File

@ -377,15 +377,6 @@ test1()
gwbuf_free(clone);
ss_dfprintf(stderr, "Freed cloned buffer");
ss_dfprintf(stderr, "\t..done\n");
partclone = gwbuf_clone_portion(buffer, 25, 50);
buflen = GWBUF_LENGTH(partclone);
ss_dfprintf(stderr, "Part cloned buffer length is now %d", buflen);
ss_info_dassert(50 == buflen, "Incorrect buffer size");
ss_info_dassert(0 == GWBUF_EMPTY(partclone), "Part cloned buffer should not be empty");
ss_dfprintf(stderr, "\t..done\n");
gwbuf_free(partclone);
ss_dfprintf(stderr, "Freed part cloned buffer");
ss_dfprintf(stderr, "\t..done\n");
buffer = gwbuf_consume(buffer, bite1);
ss_info_dassert(NULL != buffer, "Buffer should not be null");
buflen = GWBUF_LENGTH(buffer);

View File

@ -224,5 +224,5 @@ int users_default_loadusers(SERV_LISTENER *port)
{
users_free(port->users);
port->users = users_alloc();
return AUTH_LOADUSERS_OK;
return MXS_AUTH_LOADUSERS_OK;
}

View File

@ -197,7 +197,6 @@ extern unsigned int gwbuf_length(GWBUF *head);
extern int gwbuf_count(GWBUF *head);
extern size_t gwbuf_copy_data(GWBUF *buffer, size_t offset, size_t bytes,
uint8_t* dest);
extern GWBUF *gwbuf_clone_portion(GWBUF *head, size_t offset, size_t len);
extern GWBUF *gwbuf_split(GWBUF **buf, size_t length);
extern GWBUF *gwbuf_clone_transform(GWBUF *head, gwbuf_type_t type);
extern GWBUF *gwbuf_clone_all(GWBUF* head);

View File

@ -47,6 +47,7 @@ struct servlistener;
* authenticate Carry out the authentication
* free Free extracted data
* loadusers Load or update authenticator user data
* plugin_name The protocol specific name of the authentication plugin.
* @endverbatim
*
* This forms the "module object" for authenticator modules within the gateway.
@ -60,11 +61,21 @@ typedef struct gw_authenticator
int (*authenticate)(struct dcb *);
void (*free)(struct dcb *);
int (*loadusers)(struct servlistener *);
const char* plugin_name;
} GWAUTHENTICATOR;
/** Return values for extract and authenticate entry points */
#define MXS_AUTH_SUCCEEDED 0 /**< Authentication was successful */
#define MXS_AUTH_FAILED 1 /**< Authentication failed */
#define MXS_AUTH_FAILED_DB 2
#define MXS_AUTH_FAILED_SSL 3
#define MXS_AUTH_INCOMPLETE 4 /**< Authentication is not yet complete */
#define MXS_AUTH_SSL_INCOMPLETE 5 /**< SSL connection is not yet complete */
#define MXS_AUTH_NO_SESSION 6
/** Return values for the loadusers entry point */
#define AUTH_LOADUSERS_OK 0 /**< Users loaded successfully */
#define AUTH_LOADUSERS_ERROR 1 /**< Failed to load users */
#define MXS_AUTH_LOADUSERS_OK 0 /**< Users loaded successfully */
#define MXS_AUTH_LOADUSERS_ERROR 1 /**< Failed to load users */
/**
* The GWAUTHENTICATOR version data. The following should be updated whenever

View File

@ -21,30 +21,31 @@
EXTERN_C_BLOCK_BEGIN
// NOTE: If you make changes here, ensure they are compatible with the
// situation in <root>/CMakeLists.txt, where directories are installed.
#define MXS_DEFAULT_PID_SUBPATH "run/maxscale"
#define MXS_DEFAULT_LOG_SUBPATH "log/maxscale"
#define MXS_DEFAULT_DATA_SUBPATH "lib/maxscale"
#define MXS_DEFAULT_LIB_SUBPATH "@MAXSCALE_LIBDIR@"
#define MXS_DEFAULT_CACHE_SUBPATH "cache/maxscale"
#define MXS_DEFAULT_LANG_SUBPATH "lib/maxscale"
#define MXS_DEFAULT_EXEC_SUBPATH "@MAXSCALE_BINDIR@"
#define MXS_DEFAULT_CONFIG_SUBPATH "etc"
/**
* All of the following DEFAULT_* variables are defined in cmake/install_layout.cmake
*/
#define MXS_DEFAULT_PID_SUBPATH "@DEFAULT_PID_SUBPATH@"
#define MXS_DEFAULT_LOG_SUBPATH "@DEFAULT_LOG_SUBPATH@"
#define MXS_DEFAULT_DATA_SUBPATH "@DEFAULT_DATA_SUBPATH@"
#define MXS_DEFAULT_LIB_SUBPATH "@DEFAULT_LIB_SUBPATH@"
#define MXS_DEFAULT_CACHE_SUBPATH "@DEFAULT_CACHE_SUBPATH@"
#define MXS_DEFAULT_LANG_SUBPATH "@DEFAULT_LANG_SUBPATH@"
#define MXS_DEFAULT_EXEC_SUBPATH "@DEFAULT_EXEC_SUBPATH@"
#define MXS_DEFAULT_CONFIG_SUBPATH "@DEFAULT_CONFIG_SUBPATH@"
/** Default file locations, configured by CMake */
static const char* default_cnf_fname = "maxscale.cnf";
static const char* default_configdir = "/" MXS_DEFAULT_CONFIG_SUBPATH;
static const char* default_configdir = "@DEFAULT_CONFIGDIR@";
/*< This should be changed to just /run eventually,
* the /var/run folder is an old standard and the newer FSH 3.0
* uses /run for PID files.*/
static const char* default_piddir = "@MAXSCALE_VARDIR@/" MXS_DEFAULT_PID_SUBPATH;
static const char* default_logdir = "@MAXSCALE_VARDIR@/" MXS_DEFAULT_LOG_SUBPATH;
static const char* default_datadir = "@MAXSCALE_VARDIR@/" MXS_DEFAULT_DATA_SUBPATH;
static const char* default_libdir = "@CMAKE_INSTALL_PREFIX@/" MXS_DEFAULT_LIB_SUBPATH;
static const char* default_cachedir = "@MAXSCALE_VARDIR@/" MXS_DEFAULT_CACHE_SUBPATH;
static const char* default_langdir = "@MAXSCALE_VARDIR@/" MXS_DEFAULT_LANG_SUBPATH;
static const char* default_execdir = "@CMAKE_INSTALL_PREFIX@/" MXS_DEFAULT_EXEC_SUBPATH;
static const char* default_piddir = "@DEFAULT_PIDDIR@";
static const char* default_logdir = "@DEFAULT_LOGDIR@";
static const char* default_datadir = "@DEFAULT_DATADIR@";
static const char* default_libdir = "@DEFAULT_LIBDIR@";
static const char* default_cachedir = "@DEFAULT_CACHEDIR@";
static const char* default_langdir = "@DEFAULT_LANGDIR@";
static const char* default_execdir = "@DEFAULT_EXECDIR@";
static char* configdir = NULL;
static char* logdir = NULL;

View File

@ -121,6 +121,7 @@ typedef struct
unsigned int auth_conn_timeout; /**< Connection timeout for the user authentication */
unsigned int auth_read_timeout; /**< Read timeout for the user authentication */
unsigned int auth_write_timeout; /**< Write timeout for the user authentication */
bool skip_permission_checks; /**< Skip service and monitor permission checks */
char qc_name[PATH_MAX]; /**< The name of the query classifier to load */
char* qc_args; /**< Arguments for the query classifier */
} GATEWAY_CONF;

View File

@ -71,7 +71,8 @@ static GWAUTHENTICATOR MyObject =
cdc_auth_is_client_ssl_capable, /* Check if client supports SSL */
cdc_auth_authenticate, /* Authenticate user credentials */
cdc_auth_free_client_data, /* Free the client data held in DCB */
cdc_replace_users
cdc_replace_users,
NULL
};
static int cdc_auth_check(
@ -180,7 +181,7 @@ cdc_auth_authenticate(DCB *dcb)
auth_ret = cdc_auth_check(dcb, protocol, client_data->user, client_data->auth_data, client_data->flags);
/* On failed authentication try to reload users and authenticate again */
if (CDC_STATE_AUTH_OK != auth_ret && cdc_replace_users(dcb->listener) == AUTH_LOADUSERS_OK)
if (CDC_STATE_AUTH_OK != auth_ret && cdc_replace_users(dcb->listener) == MXS_AUTH_LOADUSERS_OK)
{
auth_ret = cdc_auth_check(dcb, protocol, client_data->user, client_data->auth_data, client_data->flags);
}
@ -482,7 +483,7 @@ cdc_read_users(USERS *users, char *usersfile)
*/
int cdc_replace_users(SERV_LISTENER *listener)
{
int rc = AUTH_LOADUSERS_ERROR;
int rc = MXS_AUTH_LOADUSERS_ERROR;
USERS *newusers = users_alloc();
if (newusers)
@ -500,7 +501,7 @@ int cdc_replace_users(SERV_LISTENER *listener)
/** Successfully loaded at least one user */
oldusers = listener->users;
listener->users = newusers;
rc = AUTH_LOADUSERS_OK;
rc = MXS_AUTH_LOADUSERS_OK;
}
else if (listener->users)
{

View File

@ -63,7 +63,8 @@ static GWAUTHENTICATOR MyObject =
http_auth_is_client_ssl_capable, /* Check if client supports SSL */
http_auth_authenticate, /* Authenticate user credentials */
http_auth_free_client_data, /* Free the client data held in DCB */
users_default_loadusers
users_default_loadusers,
NULL
};
typedef struct http_auth

View File

@ -63,7 +63,8 @@ static GWAUTHENTICATOR MyObject =
max_admin_auth_is_client_ssl_capable, /* Check if client supports SSL */
max_admin_auth_authenticate, /* Authenticate user credentials */
max_admin_auth_free_client_data, /* Free the client data held in DCB */
users_default_loadusers
users_default_loadusers,
NULL
};
/**

View File

@ -66,7 +66,8 @@ static GWAUTHENTICATOR MyObject =
mysql_auth_is_client_ssl_capable, /* Check if client supports SSL */
mysql_auth_authenticate, /* Authenticate user credentials */
mysql_auth_free_client_data, /* Free the client data held in DCB */
mysql_auth_load_users /* Load users from backend databases */
mysql_auth_load_users, /* Load users from backend databases */
"mysql_native_password"
};
static int combined_auth_check(
@ -151,23 +152,23 @@ mysql_auth_authenticate(DCB *dcb)
if (0 != ssl_ret)
{
auth_ret = (SSL_ERROR_CLIENT_NOT_SSL == ssl_ret) ? MYSQL_FAILED_AUTH_SSL : MYSQL_FAILED_AUTH;
auth_ret = (SSL_ERROR_CLIENT_NOT_SSL == ssl_ret) ? MXS_AUTH_FAILED_SSL : MXS_AUTH_FAILED;
}
else if (!health_after)
{
auth_ret = MYSQL_AUTH_SSL_INCOMPLETE;
auth_ret = MXS_AUTH_SSL_INCOMPLETE;
}
else if (!health_before && health_after)
{
auth_ret = MYSQL_AUTH_SSL_INCOMPLETE;
auth_ret = MXS_AUTH_SSL_INCOMPLETE;
poll_add_epollin_event_to_dcb(dcb, NULL);
}
else if (0 == strlen(client_data->user))
{
auth_ret = MYSQL_FAILED_AUTH;
auth_ret = MXS_AUTH_FAILED;
}
else
@ -180,14 +181,14 @@ mysql_auth_authenticate(DCB *dcb)
/* On failed authentication try to load user table from backend database */
/* Success for service_refresh_users returns 0 */
if (MYSQL_AUTH_SUCCEEDED != auth_ret && 0 == service_refresh_users(dcb->service))
if (MXS_AUTH_SUCCEEDED != auth_ret && 0 == service_refresh_users(dcb->service))
{
auth_ret = combined_auth_check(dcb, client_data->auth_token, client_data->auth_token_len, protocol,
client_data->user, client_data->client_sha1, client_data->db);
}
/* on successful authentication, set user into dcb field */
if (MYSQL_AUTH_SUCCEEDED == auth_ret)
if (MXS_AUTH_SUCCEEDED == auth_ret)
{
dcb->user = MXS_STRDUP_A(client_data->user);
}
@ -246,7 +247,7 @@ mysql_auth_set_protocol_data(DCB *dcb, GWBUF *buf)
{
if (NULL == (client_data = (MYSQL_session *)MXS_CALLOC(1, sizeof(MYSQL_session))))
{
return MYSQL_FAILED_AUTH;
return MXS_AUTH_FAILED;
}
#if defined(SS_DEBUG)
client_data->myses_chk_top = CHK_NUM_MYSQLSES;
@ -278,7 +279,7 @@ mysql_auth_set_protocol_data(DCB *dcb, GWBUF *buf)
if (client_auth_packet_size < (4 + 4 + 4 + 1 + 23))
{
/* Packet is not big enough */
return MYSQL_FAILED_AUTH;
return MXS_AUTH_FAILED;
}
return mysql_auth_set_client_data(client_data, protocol, buf);
@ -338,7 +339,7 @@ mysql_auth_set_client_data(
else
{
/* Packet has incomplete or too long username */
return MYSQL_FAILED_AUTH;
return MXS_AUTH_FAILED;
}
if (client_auth_packet_size > (auth_packet_base_size + user_length + 1))
{
@ -363,13 +364,13 @@ mysql_auth_set_client_data(
else
{
/* Failed to allocate space for authentication token string */
return MYSQL_FAILED_AUTH;
return MXS_AUTH_FAILED;
}
}
else
{
/* Packet was too small to contain authentication token */
return MYSQL_FAILED_AUTH;
return MXS_AUTH_FAILED;
}
packet_length_used += 1 + client_data->auth_token_len;
/*
@ -391,12 +392,12 @@ mysql_auth_set_client_data(
{
/* Packet is too short to contain database string */
/* or database string in packet is too long */
return MYSQL_FAILED_AUTH;
return MXS_AUTH_FAILED;
}
}
}
}
return MYSQL_AUTH_SUCCEEDED;
return MXS_AUTH_SUCCEEDED;
}
/**
@ -614,7 +615,7 @@ gw_check_mysql_scramble_data(DCB *dcb,
if ((username == NULL) || (mxs_scramble == NULL) || (stage1_hash == NULL))
{
return MYSQL_FAILED_AUTH;
return MXS_AUTH_FAILED;
}
/*<
@ -632,7 +633,7 @@ gw_check_mysql_scramble_data(DCB *dcb,
memcpy(stage1_hash, (char *)"_", 1);
}
return MYSQL_FAILED_AUTH;
return MXS_AUTH_FAILED;
}
if (token && token_len)
@ -648,7 +649,7 @@ gw_check_mysql_scramble_data(DCB *dcb,
{
/* check if the password is not set in the user table */
return memcmp(password, null_client_sha1, MYSQL_SCRAMBLE_LEN) ?
MYSQL_FAILED_AUTH : MYSQL_AUTH_SUCCEEDED;
MXS_AUTH_FAILED : MXS_AUTH_SUCCEEDED;
}
/*<
@ -703,7 +704,7 @@ gw_check_mysql_scramble_data(DCB *dcb,
/* now compare SHA1(SHA1(gateway_password)) and check_hash: return 0 is MYSQL_AUTH_OK */
return (0 == memcmp(password, check_hash, SHA_DIGEST_LENGTH)) ?
MYSQL_AUTH_SUCCEEDED : MYSQL_FAILED_AUTH;
MXS_AUTH_SUCCEEDED : MXS_AUTH_FAILED;
}
/**
@ -745,14 +746,14 @@ check_db_name_after_auth(DCB *dcb, char *database, int auth_ret)
db_exists = -1;
}
if (db_exists == 0 && auth_ret == MYSQL_AUTH_SUCCEEDED)
if (db_exists == 0 && auth_ret == MXS_AUTH_SUCCEEDED)
{
auth_ret = MYSQL_FAILED_AUTH_DB;
auth_ret = MXS_AUTH_FAILED_DB;
}
if (db_exists < 0 && auth_ret == MYSQL_AUTH_SUCCEEDED)
if (db_exists < 0 && auth_ret == MXS_AUTH_SUCCEEDED)
{
auth_ret = MYSQL_FAILED_AUTH;
auth_ret = MXS_AUTH_FAILED;
}
}
@ -829,7 +830,7 @@ mysql_auth_free_client_data(DCB *dcb)
*/
static int mysql_auth_load_users(SERV_LISTENER *port)
{
int rc = AUTH_LOADUSERS_OK;
int rc = MXS_AUTH_LOADUSERS_OK;
SERVICE *service = port->listener->service;
int loaded = replace_mysql_users(port);
@ -846,7 +847,7 @@ static int mysql_auth_load_users(SERV_LISTENER *port)
if ((loaded = dbusers_load(port->users, path)) == -1)
{
MXS_ERROR("[%s] Failed to load cached users from '%s'.", service->name, path);;
rc = AUTH_LOADUSERS_ERROR;
rc = MXS_AUTH_LOADUSERS_ERROR;
}
else
{

View File

@ -62,7 +62,8 @@ static GWAUTHENTICATOR MyObject =
null_auth_is_client_ssl_capable, /* Check if client supports SSL */
null_auth_authenticate, /* Authenticate user credentials */
null_auth_free_client_data, /* Free the client data held in DCB */
users_default_loadusers
users_default_loadusers,
NULL
};
/**

View File

@ -62,7 +62,8 @@ static GWAUTHENTICATOR MyObject =
null_auth_is_client_ssl_capable, /* Check if client supports SSL */
null_auth_authenticate, /* Authenticate user credentials */
null_auth_free_client_data, /* Free the client data held in DCB */
users_default_loadusers
users_default_loadusers,
NULL
};
/**

View File

@ -17,23 +17,13 @@
#include <log_manager.h>
#include <modinfo.h>
#include <modutil.h>
#include <mysql_utils.h>
#include <query_classifier.h>
#include "cache.h"
#include "storage.h"
static char VERSION_STRING[] = "V1.0.0";
typedef enum cache_references
{
CACHE_REFERENCES_ANY,
CACHE_REFERENCES_QUALIFIED
} cache_references_t;
#define DEFAULT_ALLOWED_REFERENCES CACHE_REFERENCES_QUALIFIED
// Bytes
#define DEFAULT_MAX_RESULTSET_SIZE 64 * 1024
// Seconds
#define DEFAULT_TTL 10
static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER **);
static void *newSession(FILTER *instance, SESSION *session);
static void closeSession(FILTER *instance, void *sdata);
@ -101,12 +91,23 @@ FILTER_OBJECT *GetModuleObject()
typedef struct cache_config
{
cache_references_t allowed_references;
uint32_t max_resultset_rows;
uint32_t max_resultset_size;
const char *storage;
const char *storage_args;
uint32_t ttl;
} CACHE_CONFIG;
static const CACHE_CONFIG DEFAULT_CONFIG =
{
CACHE_DEFAULT_ALLOWED_REFERENCES,
CACHE_DEFAULT_MAX_RESULTSET_ROWS,
CACHE_DEFAULT_MAX_RESULTSET_SIZE,
NULL,
NULL,
CACHE_DEFAULT_TTL
};
typedef struct cache_instance
{
const char *name;
@ -115,31 +116,59 @@ typedef struct cache_instance
CACHE_STORAGE *storage;
} CACHE_INSTANCE;
static const CACHE_CONFIG DEFAULT_CONFIG =
typedef enum cache_session_state
{
DEFAULT_ALLOWED_REFERENCES,
DEFAULT_MAX_RESULTSET_SIZE,
NULL,
NULL,
DEFAULT_TTL
};
CACHE_EXPECTING_RESPONSE, // A select has been sent, and we are waiting for the response.
CACHE_EXPECTING_FIELDS, // A select has been sent, and we want more fields.
CACHE_EXPECTING_ROWS, // A select has been sent, and we want more rows.
CACHE_EXPECTING_NOTHING, // We are not expecting anything from the server.
CACHE_IGNORING_RESPONSE, // We are not interested in the data received from the server.
} cache_session_state_t;
typedef struct cache_request_state
{
GWBUF* data; /**< Request data, possibly incomplete. */
} CACHE_REQUEST_STATE;
typedef struct cache_response_state
{
GWBUF* data; /**< Response data, possibly incomplete. */
size_t n_totalfields; /**< The number of fields a resultset contains. */
size_t n_fields; /**< How many fields we have received, <= n_totalfields. */
size_t n_rows; /**< How many rows we have received. */
size_t offset; /**< Where we are in the response buffer. */
} CACHE_RESPONSE_STATE;
static void cache_response_state_reset(CACHE_RESPONSE_STATE *state);
typedef struct cache_session_data
{
CACHE_STORAGE_API *api; /**< The storage API to be used. */
CACHE_STORAGE *storage; /**< The storage to be used with this session data. */
DOWNSTREAM down; /**< The previous filter or equivalent. */
UPSTREAM up; /**< The next filter or equivalent. */
GWBUF *packets; /**< A possible incomplete packet. */
SESSION *session; /**< The session this data is associated with. */
char key[CACHE_KEY_MAXLEN]; /**< Key storage. */
char *used_key; /**< A key if one is ued. */
CACHE_INSTANCE *instance; /**< The cache instance the session is associated with. */
CACHE_STORAGE_API *api; /**< The storage API to be used. */
CACHE_STORAGE *storage; /**< The storage to be used with this session data. */
DOWNSTREAM down; /**< The previous filter or equivalent. */
UPSTREAM up; /**< The next filter or equivalent. */
CACHE_REQUEST_STATE req; /**< The request state. */
CACHE_RESPONSE_STATE res; /**< The response state. */
SESSION *session; /**< The session this data is associated with. */
char key[CACHE_KEY_MAXLEN]; /**< Key storage. */
cache_session_state_t state;
} CACHE_SESSION_DATA;
static bool route_using_cache(CACHE_INSTANCE *instance,
CACHE_SESSION_DATA *sdata,
const GWBUF *key,
GWBUF **value);
static CACHE_SESSION_DATA *cache_session_data_create(CACHE_INSTANCE *instance, SESSION *session);
static void cache_session_data_free(CACHE_SESSION_DATA *data);
static int handle_expecting_fields(CACHE_SESSION_DATA *csdata);
static int handle_expecting_nothing(CACHE_SESSION_DATA *csdata);
static int handle_expecting_response(CACHE_SESSION_DATA *csdata);
static int handle_expecting_rows(CACHE_SESSION_DATA *csdata);
static int handle_ignoring_response(CACHE_SESSION_DATA *csdata);
static bool route_using_cache(CACHE_SESSION_DATA *sdata, const GWBUF *key, GWBUF **value);
static int send_upstream(CACHE_SESSION_DATA *csdata);
static void store_result(CACHE_SESSION_DATA *csdata);
//
// API BEGIN
@ -181,13 +210,26 @@ static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER
error = true;
}
}
else if (strcmp(param->name, "max_resultset_rows") == 0)
{
int v = atoi(param->value);
if (v > 0)
{
config.max_resultset_rows = v;
}
else
{
config.max_resultset_rows = CACHE_DEFAULT_MAX_RESULTSET_ROWS;
}
}
else if (strcmp(param->name, "max_resultset_size") == 0)
{
int v = atoi(param->value);
if (v > 0)
{
config.max_resultset_size = v;
config.max_resultset_size = v * 1024;
}
else
{
@ -282,14 +324,7 @@ static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER
static void *newSession(FILTER *instance, SESSION *session)
{
CACHE_INSTANCE *cinstance = (CACHE_INSTANCE*)instance;
CACHE_SESSION_DATA *csdata = (CACHE_SESSION_DATA*)MXS_CALLOC(1, sizeof(CACHE_SESSION_DATA));
if (csdata)
{
csdata->api = cinstance->module->api;
csdata->storage = cinstance->storage;
csdata->session = session;
}
CACHE_SESSION_DATA *csdata = cache_session_data_create(cinstance, session);
return csdata;
}
@ -317,7 +352,7 @@ static void freeSession(FILTER *instance, void *sdata)
CACHE_INSTANCE *cinstance = (CACHE_INSTANCE*)instance;
CACHE_SESSION_DATA *csdata = (CACHE_SESSION_DATA*)sdata;
MXS_FREE(csdata);
cache_session_data_free(csdata);
}
/**
@ -357,91 +392,73 @@ static void setUpstream(FILTER *instance, void *sdata, UPSTREAM *up)
* @param sdata The filter session data
* @param packets The query data
*/
static int routeQuery(FILTER *instance, void *sdata, GWBUF *packets)
static int routeQuery(FILTER *instance, void *sdata, GWBUF *data)
{
CACHE_INSTANCE *cinstance = (CACHE_INSTANCE*)instance;
CACHE_SESSION_DATA *csdata = (CACHE_SESSION_DATA*)sdata;
if (csdata->packets)
if (csdata->req.data)
{
C_DEBUG("Old packets exist.");
gwbuf_append(csdata->packets, packets);
gwbuf_append(csdata->req.data, data);
}
else
{
C_DEBUG("NO old packets exist.");
csdata->packets = packets;
csdata->req.data = data;
}
packets = modutil_get_complete_packets(&csdata->packets);
GWBUF *packet = modutil_get_next_MySQL_packet(&csdata->req.data);
int rv;
if (packets)
if (packet)
{
C_DEBUG("At least one complete packet exist.");
GWBUF *packet;
bool use_default = true;
// TODO: Is it really possible to get more that one packet
// TODO: is this loop? If so, can those packets be sent
// TODO: after one and other, or do we need to wait for
// TODO: a replies? If there are more complete packets
// TODO: than one, then either CACHE_SESSION_DATA::key
// TODO: needs to be a queue
cache_response_state_reset(&csdata->res);
csdata->state = CACHE_IGNORING_RESPONSE;
// TODO: modutil_get_next_MySQL_packet *copies* the data.
while ((packet = modutil_get_next_MySQL_packet(&packets)))
// TODO: This returns the wrong result if GWBUF_LENGTH(packet) is < 5.
if (modutil_is_SQL(packet))
{
C_DEBUG("Processing packet.");
bool use_default = true;
packet = gwbuf_make_contiguous(packet);
if (modutil_is_SQL(packet))
// We do not care whether the query was fully parsed or not.
// If a query cannot be fully parsed, the worst thing that can
// happen is that caching is not used, even though it would be
// possible.
if (qc_get_operation(packet) == QUERY_OP_SELECT)
{
C_DEBUG("Is SQL.");
// We do not care whether the query was fully parsed or not.
// If a query cannot be fully parsed, the worst thing that can
// happen is that caching is not used, even though it would be
// possible.
GWBUF *result;
use_default = !route_using_cache(csdata, packet, &result);
if (qc_get_operation(packet) == QUERY_OP_SELECT)
if (use_default)
{
C_DEBUG("Is a SELECT");
GWBUF *result;
use_default = !route_using_cache(cinstance, csdata, packet, &result);
if (!use_default)
{
C_DEBUG("Using data from cache.");
gwbuf_free(packet);
DCB *dcb = csdata->session->client_dcb;
// TODO: This is not ok. Any filters before this filter, will not
// TODO: see this data.
rv = dcb->func.write(dcb, result);
}
csdata->state = CACHE_EXPECTING_RESPONSE;
}
else
{
C_DEBUG("Is NOT a SELECT");
csdata->state = CACHE_EXPECTING_NOTHING;
C_DEBUG("Using data from cache.");
gwbuf_free(packet);
DCB *dcb = csdata->session->client_dcb;
// TODO: This is not ok. Any filters before this filter, will not
// TODO: see this data.
rv = dcb->func.write(dcb, result);
}
}
else
{
C_DEBUG("Is NOT SQL.");
}
}
if (use_default)
{
C_DEBUG("Using default processing.");
rv = csdata->down.routeQuery(csdata->down.instance, csdata->down.session, packet);
}
if (use_default)
{
C_DEBUG("Using default processing.");
rv = csdata->down.routeQuery(csdata->down.instance, csdata->down.session, packet);
}
}
else
{
C_DEBUG("Not even one complete packet exist; more data needed.");
// Ok, we need more data before we can do something.
// We need more data before we can do something.
rv = 1;
}
@ -455,33 +472,66 @@ static int routeQuery(FILTER *instance, void *sdata, GWBUF *packets)
* @param sdata The filter session data
* @param queue The query data
*/
static int clientReply(FILTER *instance, void *sdata, GWBUF *queue)
static int clientReply(FILTER *instance, void *sdata, GWBUF *data)
{
CACHE_INSTANCE *cinstance = (CACHE_INSTANCE*)instance;
CACHE_SESSION_DATA *csdata = (CACHE_SESSION_DATA*)sdata;
// TODO: queue can be put to the cache only if it is a complete
// TODO: response. If it isn't, then we need to stash it and wait
// TODO: we get a complete response.
// TODO: Since we will know from the first queue how big the
// TODO: entire response will be, this is also where we can decide
// TODO: that something is too large to cache. If it is, an existing
// TODO: item must be deleted.
int rv;
if (csdata->used_key)
if (csdata->res.data)
{
C_DEBUG("Key available, storing result.");
gwbuf_append(csdata->res.data, data);
}
else
{
csdata->res.data = data;
}
cache_result_t result = csdata->api->putValue(csdata->storage, csdata->used_key, queue);
csdata->used_key = NULL;
if (result != CACHE_RESULT_OK)
if (csdata->state != CACHE_IGNORING_RESPONSE)
{
if (gwbuf_length(csdata->res.data) > csdata->instance->config.max_resultset_size)
{
MXS_ERROR("Could not store cache item.");
C_DEBUG("Current size %uB of resultset, at least as much "
"as maximum allowed size %uKiB. Not caching.",
gwbuf_length(csdata->res.data),
csdata->instance->config.max_resultset_size / 1024);
csdata->state = CACHE_IGNORING_RESPONSE;
}
}
return csdata->up.clientReply(csdata->up.instance, csdata->up.session, queue);
switch (csdata->state)
{
case CACHE_EXPECTING_FIELDS:
rv = handle_expecting_fields(csdata);
break;
case CACHE_EXPECTING_NOTHING:
rv = handle_expecting_nothing(csdata);
break;
case CACHE_EXPECTING_RESPONSE:
rv = handle_expecting_response(csdata);
break;
case CACHE_EXPECTING_ROWS:
rv = handle_expecting_rows(csdata);
break;
case CACHE_IGNORING_RESPONSE:
rv = handle_ignoring_response(csdata);
break;
default:
MXS_ERROR("Internal cache logic broken, unexpected state: %d", csdata->state);
ss_dassert(!true);
rv = send_upstream(csdata);
cache_response_state_reset(&csdata->res);
csdata->state = CACHE_IGNORING_RESPONSE;
}
return rv;
}
/**
@ -506,47 +556,341 @@ static void diagnostics(FILTER *instance, void *sdata, DCB *dcb)
// API END
//
/**
* Reset cache response state
*
* @param state Pointer to object.
*/
static void cache_response_state_reset(CACHE_RESPONSE_STATE *state)
{
state->data = NULL;
state->n_totalfields = 0;
state->n_fields = 0;
state->n_rows = 0;
state->offset = 0;
}
/**
* Create cache session data
*
* @param instance The cache instance this data is associated with.
*
* @return Session data or NULL if creation fails.
*/
static CACHE_SESSION_DATA *cache_session_data_create(CACHE_INSTANCE *instance,
SESSION* session)
{
CACHE_SESSION_DATA *data = (CACHE_SESSION_DATA*)MXS_CALLOC(1, sizeof(CACHE_SESSION_DATA));
if (data)
{
data->instance = instance;
data->api = instance->module->api;
data->storage = instance->storage;
data->session = session;
data->state = CACHE_EXPECTING_NOTHING;
}
return data;
}
/**
* Free cache session data.
*
* @param A cache session data previously allocated using session_data_create().
*/
static void cache_session_data_free(CACHE_SESSION_DATA* data)
{
if (data)
{
MXS_FREE(data);
}
}
/**
* Called when resultset field information is handled.
*
* @param csdata The cache session data.
*/
static int handle_expecting_fields(CACHE_SESSION_DATA *csdata)
{
ss_dassert(csdata->state == CACHE_EXPECTING_FIELDS);
ss_dassert(csdata->res.data);
int rv = 1;
bool insufficient = false;
size_t buflen = gwbuf_length(csdata->res.data);
while (!insufficient && (buflen - csdata->res.offset >= MYSQL_HEADER_LEN))
{
uint8_t header[MYSQL_HEADER_LEN + 1];
gwbuf_copy_data(csdata->res.data, csdata->res.offset, MYSQL_HEADER_LEN + 1, header);
size_t packetlen = MYSQL_HEADER_LEN + MYSQL_GET_PACKET_LEN(header);
if (csdata->res.offset + packetlen <= buflen)
{
// We have at least one complete packet.
int command = (int)MYSQL_GET_COMMAND(header);
switch (command)
{
case 0xfe: // EOF, the one after the fields.
csdata->res.offset += packetlen;
csdata->state = CACHE_EXPECTING_ROWS;
rv = handle_expecting_rows(csdata);
break;
default: // Field information.
csdata->res.offset += packetlen;
++csdata->res.n_fields;
ss_dassert(csdata->res.n_fields <= csdata->res.n_totalfields);
break;
}
}
else
{
// We need more data
insufficient = true;
}
}
return rv;
}
/**
* Called when data is received (even if nothing is expected) from the server.
*
* @param csdata The cache session data.
*/
static int handle_expecting_nothing(CACHE_SESSION_DATA *csdata)
{
ss_dassert(csdata->state == CACHE_EXPECTING_NOTHING);
ss_dassert(csdata->res.data);
MXS_ERROR("Received data from the backend althoug we were expecting nothing.");
ss_dassert(!true);
return send_upstream(csdata);
}
/**
* Called when a response is received from the server.
*
* @param csdata The cache session data.
*/
static int handle_expecting_response(CACHE_SESSION_DATA *csdata)
{
ss_dassert(csdata->state == CACHE_EXPECTING_RESPONSE);
ss_dassert(csdata->res.data);
int rv = 1;
size_t buflen = gwbuf_length(csdata->res.data);
if (buflen >= MYSQL_HEADER_LEN + 1) // We need the command byte.
{
// Reserve enough space to accomodate for the largest length encoded integer,
// which is type field + 8 bytes.
uint8_t header[MYSQL_HEADER_LEN + 1 + 8];
gwbuf_copy_data(csdata->res.data, 0, MYSQL_HEADER_LEN + 1, header);
switch ((int)MYSQL_GET_COMMAND(header))
{
case 0x00: // OK
case 0xff: // ERR
C_DEBUG("OK or ERR");
store_result(csdata);
rv = send_upstream(csdata);
csdata->state = CACHE_EXPECTING_NOTHING;
break;
case 0xfb: // GET_MORE_CLIENT_DATA/SEND_MORE_CLIENT_DATA
C_DEBUG("GET_MORE_CLIENT_DATA");
rv = send_upstream(csdata);
csdata->state = CACHE_IGNORING_RESPONSE;
break;
default:
C_DEBUG("RESULTSET");
if (csdata->res.n_totalfields != 0)
{
// We've seen the header and have figured out how many fields there are.
csdata->state = CACHE_EXPECTING_FIELDS;
rv = handle_expecting_fields(csdata);
}
else
{
// leint_bytes() returns the length of the int type field + the size of the
// integer.
size_t n_bytes = leint_bytes(&header[4]);
if (MYSQL_HEADER_LEN + n_bytes <= buflen)
{
// Now we can figure out how many fields there are, but first we
// need to copy some more data.
gwbuf_copy_data(csdata->res.data,
MYSQL_HEADER_LEN + 1, n_bytes - 1, &header[MYSQL_HEADER_LEN + 1]);
csdata->res.n_totalfields = leint_value(&header[4]);
csdata->res.offset = MYSQL_HEADER_LEN + n_bytes;
csdata->state = CACHE_EXPECTING_FIELDS;
rv = handle_expecting_fields(csdata);
}
else
{
// We need more data. We will be called again, when data is available.
}
}
break;
}
}
return rv;
}
/**
* Called when resultset rows are handled.
*
* @param csdata The cache session data.
*/
static int handle_expecting_rows(CACHE_SESSION_DATA *csdata)
{
ss_dassert(csdata->state == CACHE_EXPECTING_ROWS);
ss_dassert(csdata->res.data);
int rv = 1;
bool insufficient = false;
size_t buflen = gwbuf_length(csdata->res.data);
while (!insufficient && (buflen - csdata->res.offset >= MYSQL_HEADER_LEN))
{
uint8_t header[MYSQL_HEADER_LEN + 1];
gwbuf_copy_data(csdata->res.data, csdata->res.offset, MYSQL_HEADER_LEN + 1, header);
size_t packetlen = MYSQL_HEADER_LEN + MYSQL_GET_PACKET_LEN(header);
if (csdata->res.offset + packetlen <= buflen)
{
// We have at least one complete packet.
int command = (int)MYSQL_GET_COMMAND(header);
switch (command)
{
case 0xfe: // EOF, the one after the rows.
csdata->res.offset += packetlen;
ss_dassert(csdata->res.offset == buflen);
store_result(csdata);
rv = send_upstream(csdata);
csdata->state = CACHE_EXPECTING_NOTHING;
break;
case 0xfb: // NULL
default: // length-encoded-string
csdata->res.offset += packetlen;
++csdata->res.n_rows;
if (csdata->res.n_rows > csdata->instance->config.max_resultset_rows)
{
C_DEBUG("Max rows %lu reached, not caching result.", csdata->res.n_rows);
rv = send_upstream(csdata);
csdata->res.offset = buflen; // To abort the loop.
csdata->state = CACHE_IGNORING_RESPONSE;
}
break;
}
}
else
{
// We need more data
insufficient = true;
}
}
return rv;
}
/**
* Called when all data from the server is ignored.
*
* @param csdata The cache session data.
*/
static int handle_ignoring_response(CACHE_SESSION_DATA *csdata)
{
ss_dassert(csdata->state == CACHE_IGNORING_RESPONSE);
ss_dassert(csdata->res.data);
return send_upstream(csdata);
}
/**
* Route a query via the cache.
*
* @param instance The filter instance.
* @param sdata Session data
* @param csdata Session data
* @param key A SELECT packet.
* @param value The result.
* @return True if the query was satisfied from the query.
*/
static bool route_using_cache(CACHE_INSTANCE *instance,
CACHE_SESSION_DATA *csdata,
static bool route_using_cache(CACHE_SESSION_DATA *csdata,
const GWBUF *query,
GWBUF **value)
{
// TODO: This works *only* if only one request/response is handled at a time.
// TODO: Is that the case, or is it not?
cache_result_t result = csdata->api->getKey(csdata->storage, query, csdata->key);
if (result == CACHE_RESULT_OK)
{
result = csdata->api->getValue(csdata->storage, csdata->key, value);
switch (result)
{
case CACHE_RESULT_OK:
csdata->used_key = NULL;
break;
default:
MXS_ERROR("Could not get value from cache storage.");
case CACHE_RESULT_NOT_FOUND:
csdata->used_key = csdata->key;
break;
}
}
else
{
MXS_ERROR("Could not create cache key.");
csdata->used_key = NULL;
}
return result == CACHE_RESULT_OK;
}
/**
* Send data upstream.
*
* @param csdata Session data
*
* @return Whatever the upstream returns.
*/
static int send_upstream(CACHE_SESSION_DATA *csdata)
{
ss_dassert(csdata->res.data != NULL);
int rv = csdata->up.clientReply(csdata->up.instance, csdata->up.session, csdata->res.data);
csdata->res.data = NULL;
return rv;
}
/**
* Store the data.
*
* @param csdata Session data
*/
static void store_result(CACHE_SESSION_DATA *csdata)
{
ss_dassert(csdata->res.data);
csdata->res.data = gwbuf_make_contiguous(csdata->res.data);
cache_result_t result = csdata->api->putValue(csdata->storage,
csdata->key,
csdata->res.data);
if (result != CACHE_RESULT_OK)
{
MXS_ERROR("Could not store cache item.");
}
}

33
server/modules/filter/cache/cache.h vendored Normal file
View File

@ -0,0 +1,33 @@
#ifndef CACHE_H
#define CACHE_H
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <limits.h>
typedef enum cache_references
{
CACHE_REFERENCES_ANY, // select * from tbl;
CACHE_REFERENCES_QUALIFIED // select * from db.tbl;
} cache_references_t;
#define CACHE_DEFAULT_ALLOWED_REFERENCES CACHE_REFERENCES_QUALIFIED
// Count
#define CACHE_DEFAULT_MAX_RESULTSET_ROWS UINT_MAX
// Bytes
#define CACHE_DEFAULT_MAX_RESULTSET_SIZE 64 * 1024
// Seconds
#define CACHE_DEFAULT_TTL 10
#endif

View File

@ -168,6 +168,7 @@ cache_result_t RocksDBStorage::getValue(const char* pKey, GWBUF** ppResult)
else
{
MXS_NOTICE("Cache item is stale, not using.");
result = CACHE_RESULT_NOT_FOUND;
}
}
else
@ -190,7 +191,7 @@ cache_result_t RocksDBStorage::getValue(const char* pKey, GWBUF** ppResult)
cache_result_t RocksDBStorage::putValue(const char* pKey, const GWBUF* pValue)
{
// ss_dassert(gwbuf_is_contiguous(pValue));
ss_dassert(GWBUF_IS_CONTIGUOUS(pValue));
rocksdb::Slice key(pKey, ROCKSDB_KEY_LENGTH);
rocksdb::Slice value(static_cast<const char*>(GWBUF_DATA(pValue)), GWBUF_LENGTH(pValue));

View File

@ -25,6 +25,8 @@ bool initialize()
CACHE_STORAGE* createInstance(const char* zName, uint32_t ttl, int argc, char* argv[])
{
ss_dassert(zName);
CACHE_STORAGE* pStorage = 0;
try
@ -56,6 +58,10 @@ cache_result_t getKey(CACHE_STORAGE* pStorage,
const GWBUF* pQuery,
char* pKey)
{
ss_dassert(pStorage);
ss_dassert(pQuery);
ss_dassert(pKey);
cache_result_t result = CACHE_RESULT_ERROR;
try
@ -80,6 +86,10 @@ cache_result_t getKey(CACHE_STORAGE* pStorage,
cache_result_t getValue(CACHE_STORAGE* pStorage, const char* pKey, GWBUF** ppResult)
{
ss_dassert(pStorage);
ss_dassert(pKey);
ss_dassert(ppResult);
cache_result_t result = CACHE_RESULT_ERROR;
try
@ -106,6 +116,10 @@ cache_result_t putValue(CACHE_STORAGE* pStorage,
const char* pKey,
const GWBUF* pValue)
{
ss_dassert(pStorage);
ss_dassert(pKey);
ss_dassert(pValue);
cache_result_t result = CACHE_RESULT_ERROR;
try

View File

@ -79,6 +79,8 @@
#define GW_MYSQL_SCRAMBLE_SIZE 20
#define GW_SCRAMBLE_LENGTH_323 8
#define DEFAULT_AUTH_PLUGIN_NAME "mysql_native_password"
/** Maximum length of a MySQL packet */
#define MYSQL_PACKET_LENGTH_MAX 0x00ffffff
@ -95,13 +97,6 @@
#define COM_QUIT_PACKET_SIZE (4+1)
struct dcb;
#define MYSQL_AUTH_SUCCEEDED 0
#define MYSQL_FAILED_AUTH 1
#define MYSQL_FAILED_AUTH_DB 2
#define MYSQL_FAILED_AUTH_SSL 3
#define MYSQL_AUTH_SSL_INCOMPLETE 4
#define MYSQL_AUTH_NO_SESSION 5
typedef enum
{
MYSQL_ALLOC, /* Initial state of protocol auth state */

View File

@ -0,0 +1,142 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
/*
* File: rwsplit_internal.h
* Author: mbrampton
*
* Created on 08 August 2016, 11:54
*/
#ifndef RWSPLIT_INTERNAL_H
#define RWSPLIT_INTERNAL_H
#ifdef __cplusplus
extern "C" {
#endif
#include <query_classifier.h>
/* This needs to be removed along with dependency on it - see the
* rwsplit_tmp_table_multi functions
*/
#include <mysql_client_server_protocol.h>
/*
* The following are implemented in rwsplit_mysql.c
*/
bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
GWBUF *querybuf);
void closed_session_reply(GWBUF *querybuf);
void live_session_reply(GWBUF **querybuf, ROUTER_CLIENT_SES *rses);
void print_error_packet(ROUTER_CLIENT_SES *rses, GWBUF *buf, DCB *dcb);
void check_session_command_reply(GWBUF *writebuf, sescmd_cursor_t *scur, backend_ref_t *bref);
bool execute_sescmd_in_backend(backend_ref_t *backend_ref);
bool handle_target_is_all(route_target_t route_target,
ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
GWBUF *querybuf, int packet_type, qc_query_type_t qtype);
int determine_packet_type(GWBUF *querybuf, bool *non_empty_packet);
void log_transaction_status(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, qc_query_type_t qtype);
void session_lock_failure_handling(GWBUF *querybuf, int packet_type, qc_query_type_t qtype);
bool is_packet_a_one_way_message(int packet_type);
sescmd_cursor_t *backend_ref_get_sescmd_cursor(backend_ref_t *bref);
bool is_packet_a_query(int packet_type);
bool send_readonly_error(DCB *dcb);
/*
* The following are implemented in readwritesplit.c
*/
bool rses_begin_locked_router_action(ROUTER_CLIENT_SES *rses);
void rses_end_locked_router_action(ROUTER_CLIENT_SES *rses);
void bref_clear_state(backend_ref_t *bref, bref_state_t state);
void bref_set_state(backend_ref_t *bref, bref_state_t state);
int router_handle_state_switch(DCB *dcb, DCB_REASON reason, void *data);
backend_ref_t *get_bref_from_dcb(ROUTER_CLIENT_SES *rses, DCB *dcb);
void rses_property_done(rses_property_t *prop);
int rses_get_max_slavecount(ROUTER_CLIENT_SES *rses, int router_nservers);
int rses_get_max_replication_lag(ROUTER_CLIENT_SES *rses);
/*
* The following are implemented in rwsplit_route_stmt.c
*/
bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
GWBUF *querybuf);
int rwsplit_hashkeyfun(const void *key);
int rwsplit_hashcmpfun(const void *v1, const void *v2);
void *rwsplit_hstrdup(const void *fval);
void rwsplit_hfree(void *fval);
bool rwsplit_get_dcb(DCB **p_dcb, ROUTER_CLIENT_SES *rses, backend_type_t btype,
char *name, int max_rlag);
route_target_t get_route_target(ROUTER_CLIENT_SES *rses,
qc_query_type_t qtype, HINT *hint);
rses_property_t *rses_property_init(rses_property_type_t prop_type);
int rses_property_add(ROUTER_CLIENT_SES *rses, rses_property_t *prop);
void handle_multi_temp_and_load(ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
int packet_type, int *qtype);
bool handle_hinted_target(ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
route_target_t route_target, DCB **target_dcb);
bool handle_slave_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
DCB **target_dcb);
bool handle_master_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
DCB **target_dcb);
bool handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
GWBUF *querybuf, DCB *target_dcb);
bool route_session_write(ROUTER_CLIENT_SES *router_cli_ses,
GWBUF *querybuf, ROUTER_INSTANCE *inst,
int packet_type,
qc_query_type_t qtype);
/*
* The following are implemented in rwsplit_session_cmd.c
*/
mysql_sescmd_t *rses_property_get_sescmd(rses_property_t *prop);
mysql_sescmd_t *mysql_sescmd_init(rses_property_t *rses_prop,
GWBUF *sescmd_buf,
unsigned char packet_type,
ROUTER_CLIENT_SES *rses);
void mysql_sescmd_done(mysql_sescmd_t *sescmd);
mysql_sescmd_t *sescmd_cursor_get_command(sescmd_cursor_t *scur);
bool sescmd_cursor_is_active(sescmd_cursor_t *sescmd_cursor);
void sescmd_cursor_set_active(sescmd_cursor_t *sescmd_cursor,
bool value);
bool execute_sescmd_history(backend_ref_t *bref);
GWBUF *sescmd_cursor_clone_querybuf(sescmd_cursor_t *scur);
GWBUF *sescmd_cursor_process_replies(GWBUF *replybuf,
backend_ref_t *bref,
bool *reconnect);
/*
* The following are implemented in rwsplit_select_backends.c
*/
bool select_connect_backend_servers(backend_ref_t **p_master_ref,
backend_ref_t *backend_ref,
int router_nservers, int max_nslaves,
int max_rlag,
select_criteria_t select_criteria,
SESSION *session,
ROUTER_INSTANCE *router);
/*
* The following are implemented in rwsplit_tmp_table_multi.c
*/
void check_drop_tmp_table(ROUTER_CLIENT_SES *router_cli_ses,
GWBUF *querybuf,
mysql_server_cmd_t packet_type);
qc_query_type_t is_read_tmp_table(ROUTER_CLIENT_SES *router_cli_ses,
GWBUF *querybuf,
qc_query_type_t type);
void check_create_tmp_table(ROUTER_CLIENT_SES *router_cli_ses,
GWBUF *querybuf, qc_query_type_t type);
bool check_for_multi_stmt(GWBUF *buf, void *protocol, mysql_server_cmd_t packet_type);
qc_query_type_t determine_query_type(GWBUF *querybuf, int packet_type, bool non_empty_packet);
#ifdef __cplusplus
}
#endif
#endif /* RWSPLIT_INTERNAL_H */

View File

@ -1127,10 +1127,6 @@ monitorMain(void *arg)
{
dcb_hangup_foreach(ptr->server);
}
}
if (mon_status_changed(ptr))
@ -1225,6 +1221,8 @@ monitorMain(void *arg)
ptr = mon->databases;
while (ptr)
{
MYSQL_SERVER_INFO *serv_info = hashtable_fetch(handle->server_info, ptr->server->unique_name);
ss_dassert(serv_info);
if (!SERVER_IN_MAINT(ptr->server))
{
/** If "detect_stale_master" option is On, let's use the previous master.
@ -1287,6 +1285,10 @@ monitorMain(void *arg)
{
ptr->pending_status |= SERVER_SLAVE;
}
else if (root_master == NULL && serv_info->slave_configured)
{
ptr->pending_status |= SERVER_SLAVE;
}
}
ptr->server->status = ptr->pending_status;
@ -1433,6 +1435,8 @@ static void set_master_heartbeat(MYSQL_MONITOR *handle, MONITOR_SERVERS *databas
time_t purge_time;
char heartbeat_insert_query[512] = "";
char heartbeat_purge_query[512] = "";
MYSQL_RES *result;
long returned_rows;
if (handle->master == NULL)
{
@ -1440,28 +1444,43 @@ static void set_master_heartbeat(MYSQL_MONITOR *handle, MONITOR_SERVERS *databas
return;
}
/* create the maxscale_schema database */
if (mysql_query(database->con, "CREATE DATABASE IF NOT EXISTS maxscale_schema"))
/* check if the maxscale_schema database and replication_heartbeat table exist */
if (mysql_query(database->con, "SELECT table_name FROM information_schema.tables "
"WHERE table_schema = 'maxscale_schema' AND table_name = 'replication_heartbeat'"))
{
MXS_ERROR("[mysql_mon]: Error creating maxscale_schema database in Master server"
": %s", mysql_error(database->con));
MXS_ERROR( "[mysql_mon]: Error checking for replication_heartbeat in Master server"
": %s", mysql_error(database->con));
database->server->rlag = -1;
}
/* create repl_heartbeat table in maxscale_schema database */
if (mysql_query(database->con, "CREATE TABLE IF NOT EXISTS "
"maxscale_schema.replication_heartbeat "
"(maxscale_id INT NOT NULL, "
"master_server_id INT NOT NULL, "
"master_timestamp INT UNSIGNED NOT NULL, "
"PRIMARY KEY ( master_server_id, maxscale_id ) ) "
"ENGINE=MYISAM DEFAULT CHARSET=latin1"))
{
MXS_ERROR("[mysql_mon]: Error creating maxscale_schema.replication_heartbeat "
"table in Master server: %s", mysql_error(database->con));
result = mysql_store_result(database->con);
database->server->rlag = -1;
if (result == NULL)
{
returned_rows = 0;
}
else
{
returned_rows = mysql_num_rows(result);
mysql_free_result(result);
}
if (0 == returned_rows)
{
/* create repl_heartbeat table in maxscale_schema database */
if (mysql_query(database->con, "CREATE TABLE IF NOT EXISTS "
"maxscale_schema.replication_heartbeat "
"(maxscale_id INT NOT NULL, "
"master_server_id INT NOT NULL, "
"master_timestamp INT UNSIGNED NOT NULL, "
"PRIMARY KEY ( master_server_id, maxscale_id ) ) "
"ENGINE=MYISAM DEFAULT CHARSET=latin1"))
{
MXS_ERROR("[mysql_mon]: Error creating maxscale_schema.replication_heartbeat "
"table in Master server: %s", mysql_error(database->con));
database->server->rlag = -1;
}
}
/* auto purge old values after 48 hours*/

View File

@ -1185,48 +1185,38 @@ static int gw_write_backend_event(DCB *dcb)
*/
if (dcb->state != DCB_STATE_POLLING)
{
uint8_t* data;
uint8_t* data = NULL;
bool com_quit = false;
if (dcb->writeq != NULL)
spinlock_acquire(&dcb->writeqlock);
if (dcb->writeq)
{
data = (uint8_t *) GWBUF_DATA(dcb->writeq);
com_quit = MYSQL_IS_COM_QUIT(data);
rc = 0;
}
spinlock_release(&dcb->writeqlock);
if (dcb->session->client_dcb == NULL)
{
rc = 0;
}
else if (!(MYSQL_IS_COM_QUIT(data)))
{
/*< vraa : errorHandle */
mysql_send_custom_error(dcb->session->client_dcb,
1,
0,
"Writing to backend failed due invalid Maxscale "
"state.");
MXS_DEBUG("%lu [gw_write_backend_event] Write to backend "
"dcb %p fd %d "
"failed due invalid state %s.",
pthread_self(),
dcb,
dcb->fd,
STRDCBSTATE(dcb->state));
MXS_ERROR("Attempt to write buffered data to backend "
"failed "
"due internal inconsistent state.");
if (data && !com_quit)
{
mysql_send_custom_error(dcb->session->client_dcb, 1, 0,
"Writing to backend failed due invalid Maxscale state.");
MXS_DEBUG("%lu [gw_write_backend_event] Write to backend "
"dcb %p fd %d failed due invalid state %s.",
pthread_self(), dcb, dcb->fd, STRDCBSTATE(dcb->state));
rc = 0;
}
MXS_ERROR("Attempt to write buffered data to backend "
"failed due internal inconsistent state.");
}
else
{
MXS_DEBUG("%lu [gw_write_backend_event] Dcb %p in state %s "
"but there's nothing to write either.",
pthread_self(),
dcb,
STRDCBSTATE(dcb->state));
"but there's nothing to write either.",
pthread_self(), dcb, STRDCBSTATE(dcb->state));
rc = 1;
}
goto return_rc;
}
@ -2031,22 +2021,17 @@ static GWBUF* process_response_data(DCB* dcb,
outbuf = gwbuf_append(outbuf, readbuf);
readbuf = NULL;
}
/**
* Packet was read. There should be more since bytes were
* left over.
* Move the next packet to its own buffer and add that next
* to the prev packet's buffer.
*/
else /*< nbytes_left < nbytes_to_process */
/**
* Buffer contains more data than we need. Split the complete packet and
* the extra data into two separate buffers.
*/
else
{
ss_dassert(nbytes_left >= 0);
nbytes_to_process -= nbytes_left;
/** Move the prefix of the buffer to outbuf from redbuf */
outbuf = gwbuf_append(outbuf,
gwbuf_clone_portion(readbuf, 0, (size_t) nbytes_left));
readbuf = gwbuf_consume(readbuf, (size_t) nbytes_left);
ss_dassert(nbytes_left < nbytes_to_process);
ss_dassert(nbytes_left > 0);
ss_dassert(npackets_left > 0);
outbuf = gwbuf_append(outbuf, gwbuf_split(&readbuf, nbytes_left));
nbytes_to_process -= nbytes_left;
npackets_left -= 1;
nbytes_left = 0;
}

View File

@ -311,11 +311,15 @@ int MySQLSendHandshake(DCB* dcb)
memcpy(mysql_plugin_data, server_scramble + 8, 12);
const char* plugin_name = dcb->authfunc.plugin_name ?
dcb->authfunc.plugin_name : DEFAULT_AUTH_PLUGIN_NAME;
int plugin_name_len = strlen(plugin_name);
mysql_payload_size =
sizeof(mysql_protocol_version) + (len_version_string + 1) + sizeof(mysql_thread_id_num) + 8 +
sizeof(/* mysql_filler */ uint8_t) + sizeof(mysql_server_capabilities_one) + sizeof(mysql_server_language) +
sizeof(mysql_server_status) + sizeof(mysql_server_capabilities_two) + sizeof(mysql_scramble_len) +
sizeof(mysql_filler_ten) + 12 + sizeof(/* mysql_last_byte */ uint8_t) + strlen("mysql_native_password") +
sizeof(mysql_filler_ten) + 12 + sizeof(/* mysql_last_byte */ uint8_t) + plugin_name_len +
sizeof(/* mysql_last_byte */ uint8_t);
// allocate memory for packet header + payload
@ -407,8 +411,8 @@ int MySQLSendHandshake(DCB* dcb)
mysql_handshake_payload++;
// to be understanded ????
memcpy(mysql_handshake_payload, "mysql_native_password", strlen("mysql_native_password"));
mysql_handshake_payload = mysql_handshake_payload + strlen("mysql_native_password");
memcpy(mysql_handshake_payload, plugin_name, plugin_name_len);
mysql_handshake_payload = mysql_handshake_payload + plugin_name_len;
//write last byte, 0
*mysql_handshake_payload = 0x00;
@ -573,7 +577,7 @@ gw_read_do_authentication(DCB *dcb, GWBUF *read_buffer, int nbytes_read)
* data extraction succeeds, then a call is made to the actual
* authenticate function to carry out the user checks.
*/
if (MYSQL_AUTH_SUCCEEDED == (
if (MXS_AUTH_SUCCEEDED == (
auth_val = dcb->authfunc.extract(dcb, read_buffer)))
{
/*
@ -593,7 +597,7 @@ gw_read_do_authentication(DCB *dcb, GWBUF *read_buffer, int nbytes_read)
* non-null session) then the whole process has succeeded. In all
* other cases an error return is made.
*/
if (MYSQL_AUTH_SUCCEEDED == auth_val)
if (MXS_AUTH_SUCCEEDED == auth_val)
{
SESSION *session;
@ -624,14 +628,17 @@ gw_read_do_authentication(DCB *dcb, GWBUF *read_buffer, int nbytes_read)
}
else
{
auth_val = MYSQL_AUTH_NO_SESSION;
auth_val = MXS_AUTH_NO_SESSION;
}
}
/**
* If we did not get success throughout, then the protocol state is updated,
* the client is notified of the failure and the DCB is closed.
* If we did not get success throughout or authentication is not yet complete,
* then the protocol state is updated, the client is notified of the failure
* and the DCB is closed.
*/
if (MYSQL_AUTH_SUCCEEDED != auth_val && MYSQL_AUTH_SSL_INCOMPLETE != auth_val)
if (MXS_AUTH_SUCCEEDED != auth_val &&
MXS_AUTH_INCOMPLETE != auth_val &&
MXS_AUTH_SSL_INCOMPLETE != auth_val)
{
protocol->protocol_auth_state = MYSQL_AUTH_FAILED;
mysql_client_auth_error_handling(dcb, auth_val);
@ -970,7 +977,7 @@ mysql_client_auth_error_handling(DCB *dcb, int auth_val)
switch (auth_val)
{
case MYSQL_AUTH_NO_SESSION:
case MXS_AUTH_NO_SESSION:
MXS_DEBUG("%lu [gw_read_client_event] session "
"creation failed. fd %d, "
"state = MYSQL_AUTH_NO_SESSION.",
@ -983,7 +990,7 @@ mysql_client_auth_error_handling(DCB *dcb, int auth_val)
0,
"failed to create new session");
break;
case MYSQL_FAILED_AUTH_DB:
case MXS_AUTH_FAILED_DB:
MXS_DEBUG("%lu [gw_read_client_event] database "
"specified was not valid. fd %d, "
"state = MYSQL_FAILED_AUTH_DB.",
@ -999,7 +1006,7 @@ mysql_client_auth_error_handling(DCB *dcb, int auth_val)
modutil_send_mysql_err_packet(dcb, packet_number, 0, 1049, "42000", fail_str);
break;
case MYSQL_FAILED_AUTH_SSL:
case MXS_AUTH_FAILED_SSL:
MXS_DEBUG("%lu [gw_read_client_event] client is "
"not SSL capable for SSL listener. fd %d, "
"state = MYSQL_FAILED_AUTH_SSL.",
@ -1012,7 +1019,7 @@ mysql_client_auth_error_handling(DCB *dcb, int auth_val)
0,
"failed to complete SSL authentication");
break;
case MYSQL_AUTH_SSL_INCOMPLETE:
case MXS_AUTH_SSL_INCOMPLETE:
MXS_DEBUG("%lu [gw_read_client_event] unable to "
"complete SSL authentication. fd %d, "
"state = MYSQL_AUTH_SSL_INCOMPLETE.",
@ -1025,7 +1032,7 @@ mysql_client_auth_error_handling(DCB *dcb, int auth_val)
0,
"failed to complete SSL authentication");
break;
case MYSQL_FAILED_AUTH:
case MXS_AUTH_FAILED:
MXS_DEBUG("%lu [gw_read_client_event] authentication failed. fd %d, "
"state = MYSQL_FAILED_AUTH.",
pthread_self(),

View File

@ -959,7 +959,7 @@ char *create_auth_fail_str(char *username,
{
ferrstr = "Access denied for user '%s'@'%s' (using password: %s) to database '%s'";
}
else if (errcode == MYSQL_FAILED_AUTH_SSL)
else if (errcode == MXS_AUTH_FAILED_SSL)
{
ferrstr = "Access without SSL denied";
}
@ -980,7 +980,7 @@ char *create_auth_fail_str(char *username,
{
sprintf(errstr, ferrstr, username, hostaddr, (*sha1 == '\0' ? "NO" : "YES"), db);
}
else if (errcode == MYSQL_FAILED_AUTH_SSL)
else if (errcode == MXS_AUTH_FAILED_SSL)
{
sprintf(errstr, "%s", ferrstr);
}

View File

@ -1,4 +1,4 @@
add_library(readwritesplit SHARED readwritesplit.c)
add_library(readwritesplit SHARED readwritesplit.c rwsplit_mysql.c rwsplit_route_stmt.c rwsplit_select_backends.c rwsplit_session_cmd.c rwsplit_tmp_table_multi.c)
target_link_libraries(readwritesplit maxscale-common)
set_target_properties(readwritesplit PROPERTIES VERSION "1.0.2")
install_module(readwritesplit core)

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,541 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <my_config.h>
#include <stdio.h>
#include <strings.h>
#include <string.h>
#include <stdlib.h>
#include <stdint.h>
#include <router.h>
#include <readwritesplit.h>
#include <rwsplit_internal.h>
#include <mysql.h>
#include <skygw_utils.h>
#include <log_manager.h>
#include <query_classifier.h>
#include <dcb.h>
#include <spinlock.h>
#include <modinfo.h>
#include <modutil.h>
#include <mysql_client_server_protocol.h>
#include <mysqld_error.h>
#include <maxscale/alloc.h>
#if defined(SS_DEBUG)
#include <mysql_client_server_protocol.h>
#endif
#define RWSPLIT_TRACE_MSG_LEN 1000
/**
* @file rwsplit_mysql.c Functions within the read-write split router that
* are specific to MySQL. The aim is to either remove these into a separate
* module or to move them into the MySQL protocol modules.
*
* @verbatim
* Revision History
*
* Date Who Description
* 08/08/2016 Martin Brampton Initial implementation
*
* @endverbatim
*/
/*
* The following functions are called from elsewhere in the router and
* are defined in rwsplit_internal.h. They are not intended to be called
* from outside this router.
*/
/* This could be placed in the protocol, with a new API entry point
* It is certainly MySQL specific.
* */
int
determine_packet_type(GWBUF *querybuf, bool *non_empty_packet)
{
mysql_server_cmd_t packet_type;
uint8_t *packet = GWBUF_DATA(querybuf);
if (gw_mysql_get_byte3(packet) == 0)
{
/** Empty packet signals end of LOAD DATA LOCAL INFILE, send it to master*/
*non_empty_packet = false;
packet_type = MYSQL_COM_UNDEFINED;
}
else
{
*non_empty_packet = true;
packet_type = packet[4];
}
return (int)packet_type;
}
/*
* This appears to be MySQL specific
*/
bool
is_packet_a_query(int packet_type)
{
return (packet_type == MYSQL_COM_QUERY);
}
/*
* This looks MySQL specific
*/
bool
is_packet_a_one_way_message(int packet_type)
{
return (packet_type == MYSQL_COM_STMT_SEND_LONG_DATA ||
packet_type == MYSQL_COM_QUIT || packet_type == MYSQL_COM_STMT_CLOSE);
}
/*
* This one is problematic because it is MySQL specific, but also router
* specific.
*/
void
log_transaction_status(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, qc_query_type_t qtype)
{
if (!rses->rses_load_active)
{
uint8_t *packet = GWBUF_DATA(querybuf);
unsigned char ptype = packet[4];
size_t len = MIN(GWBUF_LENGTH(querybuf),
MYSQL_GET_PACKET_LEN((unsigned char *)querybuf->start) - 1);
char *data = (char *)&packet[5];
char *contentstr = strndup(data, MIN(len, RWSPLIT_TRACE_MSG_LEN));
char *qtypestr = qc_get_qtype_str(qtype);
MXS_INFO("> Autocommit: %s, trx is %s, cmd: %s, type: %s, stmt: %s%s %s",
(rses->rses_autocommit_enabled ? "[enabled]" : "[disabled]"),
(rses->rses_transaction_active ? "[open]" : "[not open]"),
STRPACKETTYPE(ptype), (qtypestr == NULL ? "N/A" : qtypestr),
contentstr, (querybuf->hint == NULL ? "" : ", Hint:"),
(querybuf->hint == NULL ? "" : STRHINTTYPE(querybuf->hint->type)));
MXS_FREE(contentstr);
MXS_FREE(qtypestr);
}
else
{
MXS_INFO("> Processing LOAD DATA LOCAL INFILE: %lu bytes sent.",
rses->rses_load_data_sent);
}
}
/*
* This is mostly router code, but it contains MySQL specific operations that
* maybe could be moved to the protocol module. The modutil functions are mostly
* MySQL specific and could migrate to the MySQL protocol; likewise the
* utility to convert packet type to a string. The aim is for most of this
* code to remain as part of the router.
*/
bool
handle_target_is_all(route_target_t route_target,
ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
GWBUF *querybuf, int packet_type, qc_query_type_t qtype)
{
bool result;
/** Multiple, conflicting routing target. Return error */
if (TARGET_IS_MASTER(route_target) || TARGET_IS_SLAVE(route_target))
{
backend_ref_t *bref = rses->rses_backend_ref;
/* NOTE: modutil_get_query is MySQL specific */
char *query_str = modutil_get_query(querybuf);
char *qtype_str = qc_get_qtype_str(qtype);
/* NOTE: packet_type is MySQL specific */
MXS_ERROR("Can't route %s:%s:\"%s\". SELECT with session data "
"modification is not supported if configuration parameter "
"use_sql_variables_in=all .", STRPACKETTYPE(packet_type),
qtype_str, (query_str == NULL ? "(empty)" : query_str));
MXS_INFO("Unable to route the query without losing session data "
"modification from other servers. <");
while (bref != NULL && !BREF_IS_IN_USE(bref))
{
bref++;
}
if (bref != NULL && BREF_IS_IN_USE(bref))
{
/** Create and add MySQL error to eventqueue */
modutil_reply_parse_error(bref->bref_dcb,
MXS_STRDUP_A("Routing query to backend failed. "
"See the error log for further "
"details."), 0);
result = true;
}
else
{
/**
* If there were no available backend references
* available return false - session will be closed
*/
MXS_ERROR("Sending error message to client "
"failed. Router doesn't have any "
"available backends. Session will be "
"closed.");
result = false;
}
/* Test shouldn't be needed */
if (query_str)
{
MXS_FREE(query_str);
}
if (qtype_str)
{
MXS_FREE(qtype_str);
}
return result;
}
/**
* It is not sure if the session command in question requires
* response. Statement is examined in route_session_write.
* Router locking is done inside the function.
*/
result = route_session_write(rses, gwbuf_clone(querybuf), inst,
packet_type, qtype);
if (result)
{
atomic_add(&inst->stats.n_all, 1);
}
return result;
}
/* This is MySQL specific */
void
session_lock_failure_handling(GWBUF *querybuf, int packet_type, qc_query_type_t qtype)
{
if (packet_type != MYSQL_COM_QUIT)
{
/* NOTE: modutil_get_query is MySQL specific */
char *query_str = modutil_get_query(querybuf);
MXS_ERROR("Can't route %s:%s:\"%s\" to "
"backend server. Router is closed.",
STRPACKETTYPE(packet_type), STRQTYPE(qtype),
(query_str == NULL ? "(empty)" : query_str));
MXS_FREE(query_str);
}
}
/*
* Probably MySQL specific because of modutil function
*/
void closed_session_reply(GWBUF *querybuf)
{
uint8_t* data = GWBUF_DATA(querybuf);
if (GWBUF_LENGTH(querybuf) >= 5 && !MYSQL_IS_COM_QUIT(data))
{
/* Note that most modutil functions are MySQL specific */
char *query_str = modutil_get_query(querybuf);
MXS_ERROR("Can't route %s:\"%s\" to backend server. Router is closed.",
STRPACKETTYPE(data[4]), query_str ? query_str : "(empty)");
MXS_FREE(query_str);
}
}
/*
* Probably MySQL specific because of modutil function
*/
void live_session_reply(GWBUF **querybuf, ROUTER_CLIENT_SES *rses)
{
GWBUF *tmpbuf = *querybuf;
if (GWBUF_IS_TYPE_UNDEFINED(tmpbuf))
{
/* Note that many modutil functions are MySQL specific */
*querybuf = modutil_get_complete_packets(&tmpbuf);
if (tmpbuf)
{
rses->client_dcb->dcb_readqueue = gwbuf_append(rses->client_dcb->dcb_readqueue, tmpbuf);
}
*querybuf = gwbuf_make_contiguous(*querybuf);
/** Mark buffer to as MySQL type */
gwbuf_set_type(*querybuf, GWBUF_TYPE_MYSQL);
gwbuf_set_type(*querybuf, GWBUF_TYPE_SINGLE_STMT);
}
}
/*
* Uses MySQL specific mechanisms
*/
void print_error_packet(ROUTER_CLIENT_SES *rses, GWBUF *buf, DCB *dcb)
{
#if defined(SS_DEBUG)
if (GWBUF_IS_TYPE_MYSQL(buf))
{
while (gwbuf_length(buf) > 0)
{
/**
* This works with MySQL protocol only !
* Protocol specific packet print functions would be nice.
*/
uint8_t *ptr = GWBUF_DATA(buf);
size_t len = MYSQL_GET_PACKET_LEN(ptr);
if (MYSQL_GET_COMMAND(ptr) == 0xff)
{
SERVER *srv = NULL;
backend_ref_t *bref = rses->rses_backend_ref;
int i;
char *bufstr;
for (i = 0; i < rses->rses_nbackends; i++)
{
if (bref[i].bref_dcb == dcb)
{
srv = bref[i].bref_backend->backend_server;
}
}
ss_dassert(srv != NULL);
char *str = (char *)&ptr[7];
bufstr = strndup(str, len - 3);
MXS_ERROR("Backend server %s:%d responded with "
"error : %s",
srv->name, srv->port, bufstr);
MXS_FREE(bufstr);
}
buf = gwbuf_consume(buf, len + 4);
}
}
else
{
gwbuf_free(buf);
}
#endif /*< SS_DEBUG */
}
/*
* Uses MySQL specific mechanisms
*/
void check_session_command_reply(GWBUF *writebuf, sescmd_cursor_t *scur, backend_ref_t *bref)
{
if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_ERR) &&
MYSQL_IS_ERROR_PACKET(((uint8_t *)GWBUF_DATA(writebuf))))
{
uint8_t *buf = (uint8_t *)GWBUF_DATA((scur->scmd_cur_cmd->my_sescmd_buf));
uint8_t *replybuf = (uint8_t *)GWBUF_DATA(writebuf);
size_t len = MYSQL_GET_PACKET_LEN(buf);
size_t replylen = MYSQL_GET_PACKET_LEN(replybuf);
char *err = strndup(&((char *)replybuf)[8], 5);
char *replystr = strndup(&((char *)replybuf)[13], replylen - 4 - 5);
ss_dassert(len + 4 == GWBUF_LENGTH(scur->scmd_cur_cmd->my_sescmd_buf));
MXS_ERROR("Failed to execute session command in %s:%d. Error was: %s %s",
bref->bref_backend->backend_server->name,
bref->bref_backend->backend_server->port, err, replystr);
MXS_FREE(err);
MXS_FREE(replystr);
}
}
/**
* If session command cursor is passive, sends the command to backend for
* execution.
*
* Returns true if command was sent or added successfully to the queue.
* Returns false if command sending failed or if there are no pending session
* commands.
*
* Router session must be locked.
*/
/*
* Uses MySQL specific values in the large switch statement, although it
* may be possible to generalize them.
*/
bool execute_sescmd_in_backend(backend_ref_t *backend_ref)
{
DCB *dcb;
bool succp;
int rc = 0;
sescmd_cursor_t *scur;
GWBUF *buf;
if (backend_ref == NULL)
{
MXS_ERROR("[%s] Error: NULL parameter.", __FUNCTION__);
return false;
}
if (BREF_IS_CLOSED(backend_ref))
{
succp = false;
goto return_succp;
}
dcb = backend_ref->bref_dcb;
CHK_DCB(dcb);
CHK_BACKEND_REF(backend_ref);
/**
* Get cursor pointer and copy of command buffer to cursor.
*/
scur = &backend_ref->bref_sescmd_cur;
/** Return if there are no pending ses commands */
if (sescmd_cursor_get_command(scur) == NULL)
{
succp = true;
MXS_INFO("Cursor had no pending session commands.");
goto return_succp;
}
if (!sescmd_cursor_is_active(scur))
{
/** Cursor is left active when function returns. */
sescmd_cursor_set_active(scur, true);
}
switch (scur->scmd_cur_cmd->my_sescmd_packet_type)
{
case MYSQL_COM_CHANGE_USER:
/** This makes it possible to handle replies correctly */
gwbuf_set_type(scur->scmd_cur_cmd->my_sescmd_buf, GWBUF_TYPE_SESCMD);
buf = sescmd_cursor_clone_querybuf(scur);
rc = dcb->func.auth(dcb, NULL, dcb->session, buf);
break;
case MYSQL_COM_INIT_DB:
{
/**
* Record database name and store to session.
*/
GWBUF *tmpbuf;
MYSQL_session *data;
unsigned int qlen;
data = dcb->session->client_dcb->data;
*data->db = 0;
tmpbuf = scur->scmd_cur_cmd->my_sescmd_buf;
qlen = MYSQL_GET_PACKET_LEN((unsigned char *) GWBUF_DATA(tmpbuf));
if (qlen)
{
--qlen; // The COM_INIT_DB byte
if (qlen > MYSQL_DATABASE_MAXLEN)
{
MXS_ERROR("Too long a database name received in COM_INIT_DB, "
"trailing data will be cut.");
qlen = MYSQL_DATABASE_MAXLEN;
}
memcpy(data->db, (char*)GWBUF_DATA(tmpbuf) + 5, qlen);
data->db[qlen] = 0;
}
}
/** Fallthrough */
case MYSQL_COM_QUERY:
default:
/**
* Mark session command buffer, it triggers writing
* MySQL command to protocol
*/
gwbuf_set_type(scur->scmd_cur_cmd->my_sescmd_buf, GWBUF_TYPE_SESCMD);
buf = sescmd_cursor_clone_querybuf(scur);
rc = dcb->func.write(dcb, buf);
break;
}
if (rc == 1)
{
succp = true;
}
else
{
succp = false;
}
return_succp:
return succp;
}
/*
* End of functions called from other router modules; start of functions that
* are internal to this module
*/
/**
* Get client DCB pointer of the router client session.
* This routine must be protected by Router client session lock.
*
* APPEARS TO NEVER BE USED!!
*
* @param rses Router client session pointer
*
* @return Pointer to client DCB
*/
static DCB *rses_get_client_dcb(ROUTER_CLIENT_SES *rses)
{
DCB *dcb = NULL;
int i;
for (i = 0; i < rses->rses_nbackends; i++)
{
if ((dcb = rses->rses_backend_ref[i].bref_dcb) != NULL &&
BREF_IS_IN_USE(&rses->rses_backend_ref[i]) && dcb->session != NULL &&
dcb->session->client_dcb != NULL)
{
return dcb->session->client_dcb;
}
}
return NULL;
}
/*
* The following are internal (directly or indirectly) to routing a statement
* and should be moved to rwsplit_route_cmd.c if the MySQL specific code can
* be removed.
*/
sescmd_cursor_t *backend_ref_get_sescmd_cursor(backend_ref_t *bref)
{
sescmd_cursor_t *scur;
CHK_BACKEND_REF(bref);
scur = &bref->bref_sescmd_cur;
CHK_SESCMD_CUR(scur);
return scur;
}
/**
* Send an error message to the client telling that the server is in read only mode
* @param dcb Client DCB
* @return True if sending the message was successful, false if an error occurred
*/
bool send_readonly_error(DCB *dcb)
{
bool succp = false;
const char* errmsg = "The MariaDB server is running with the --read-only"
" option so it cannot execute this statement";
GWBUF* err = modutil_create_mysql_err_msg(1, 0, ER_OPTION_PREVENTS_STATEMENT,
"HY000", errmsg);
if (err)
{
succp = dcb->func.write(dcb, err);
}
else
{
MXS_ERROR("Memory allocation failed when creating client error message.");
}
return succp;
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,475 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <stdio.h>
#include <strings.h>
#include <string.h>
#include <stdlib.h>
#include <stdint.h>
#include <router.h>
#include <readwritesplit.h>
#include <rwsplit_internal.h>
/**
* @file rwsplit_select_backends.c The functions that implement back end
* selection for the read write split router. All of these functions are
* internal to that router and not intended to be called from elsewhere.
*
* @verbatim
* Revision History
*
* Date Who Description
* 08/08/2016 Martin Brampton Initial implementation
*
* @endverbatim
*/
static bool connect_server(backend_ref_t *bref, SESSION *session, bool execute_history);
static void log_server_connections(select_criteria_t select_criteria,
backend_ref_t *backend_ref, int router_nservers);
static BACKEND *get_root_master(backend_ref_t *servers, int router_nservers);
static int bref_cmp_global_conn(const void *bref1, const void *bref2);
static int bref_cmp_router_conn(const void *bref1, const void *bref2);
static int bref_cmp_behind_master(const void *bref1, const void *bref2);
static int bref_cmp_current_load(const void *bref1, const void *bref2);
/**
* The order of functions _must_ match with the order the select criteria are
* listed in select_criteria_t definition in readwritesplit.h
*/
int (*criteria_cmpfun[LAST_CRITERIA])(const void *, const void *) =
{
NULL,
bref_cmp_global_conn,
bref_cmp_router_conn,
bref_cmp_behind_master,
bref_cmp_current_load
};
/*
* The following function is the only one that is called from elsewhere in
* the read write split router. It is not intended for use from outside this
* router. Other functions in this module are internal and are called
* directly or indirectly by this function.
*/
/**
* @brief Search suitable backend servers from those of router instance
*
* It is assumed that there is only one master among servers of a router instance.
* As a result, the first master found is chosen. There will possibly be more
* backend references than connected backends because only those in correct state
* are connected to.
*
* @param p_master_ref Pointer to location where master's backend reference is to be stored
* @param backend_ref Pointer to backend server reference object array
* @param router_nservers Number of backend server pointers pointed to by @p backend_ref
* @param max_nslaves Upper limit for the number of slaves
* @param max_slave_rlag Maximum allowed replication lag for any slave
* @param select_criteria Slave selection criteria
* @param session Client session
* @param router Router instance
* @return true, if at least one master and one slave was found.
*/
bool select_connect_backend_servers(backend_ref_t **p_master_ref,
backend_ref_t *backend_ref,
int router_nservers, int max_nslaves,
int max_slave_rlag,
select_criteria_t select_criteria,
SESSION *session,
ROUTER_INSTANCE *router)
{
if (p_master_ref == NULL || backend_ref == NULL)
{
MXS_ERROR("Master reference (%p) or backend reference (%p) is NULL.",
p_master_ref, backend_ref);
ss_dassert(false);
return false;
}
/* get the root Master */
BACKEND *master_host = get_root_master(backend_ref, router_nservers);
if (router->rwsplit_config.rw_master_failure_mode == RW_FAIL_INSTANTLY &&
(master_host == NULL || SERVER_IS_DOWN(master_host->backend_server)))
{
MXS_ERROR("Couldn't find suitable Master from %d candidates.", router_nservers);
return false;
}
/**
* Existing session : master is already chosen and connected.
* The function was called because new slave must be selected to replace
* failed one.
*/
bool master_connected = *p_master_ref != NULL;
/** Check slave selection criteria and set compare function */
int (*p)(const void *, const void *) = criteria_cmpfun[select_criteria];
ss_dassert(p);
/** Sort the pointer list to servers according to slave selection criteria.
* The servers that match the criteria the best are at the beginning of
* the list. */
qsort(backend_ref, (size_t) router_nservers, sizeof(backend_ref_t), p);
if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO))
{
log_server_connections(select_criteria, backend_ref, router_nservers);
}
int slaves_found = 0;
int slaves_connected = 0;
const int min_nslaves = 0; /*< not configurable at the time */
bool succp = false;
/**
* Choose at least 1+min_nslaves (master and slave) and at most 1+max_nslaves
* servers from the sorted list. First master found is selected.
*/
for (int i = 0; i < router_nservers &&
(slaves_connected < max_nslaves || !master_connected); i++)
{
SERVER *serv = backend_ref[i].bref_backend->backend_server;
if (!BREF_HAS_FAILED(&backend_ref[i]) && SERVER_IS_RUNNING(serv))
{
/* check also for relay servers and don't take the master_host */
if (slaves_found < max_nslaves &&
(max_slave_rlag == MAX_RLAG_UNDEFINED ||
(serv->rlag != MAX_RLAG_NOT_AVAILABLE &&
serv->rlag <= max_slave_rlag)) &&
(SERVER_IS_SLAVE(serv) || SERVER_IS_RELAY_SERVER(serv)) &&
(master_host == NULL || (serv != master_host->backend_server)))
{
slaves_found += 1;
if (BREF_IS_IN_USE((&backend_ref[i])) ||
connect_server(&backend_ref[i], session, true))
{
slaves_connected += 1;
}
}
/* take the master_host for master */
else if (master_host && (serv == master_host->backend_server))
{
/** p_master_ref must be assigned with this backend_ref pointer
* because its original value may have been lost when backend
* references were sorted with qsort. */
*p_master_ref = &backend_ref[i];
if (!master_connected)
{
if (connect_server(&backend_ref[i], session, false))
{
master_connected = true;
}
}
}
}
} /*< for */
/**
* Successful cases
*/
if (slaves_connected >= min_nslaves && slaves_connected <= max_nslaves)
{
succp = true;
if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO))
{
if (slaves_connected < max_nslaves)
{
MXS_INFO("Couldn't connect to maximum number of "
"slaves. Connected successfully to %d slaves "
"of %d of them.", slaves_connected, slaves_found);
}
for (int i = 0; i < router_nservers; i++)
{
if (BREF_IS_IN_USE((&backend_ref[i])))
{
MXS_INFO("Selected %s in \t%s:%d",
STRSRVSTATUS(backend_ref[i].bref_backend->backend_server),
backend_ref[i].bref_backend->backend_server->name,
backend_ref[i].bref_backend->backend_server->port);
}
} /* for */
}
}
/** Failure cases */
else
{
if (slaves_connected < min_nslaves)
{
MXS_ERROR("Couldn't establish required amount of "
"slave connections for router session.");
}
/** Clean up connections */
for (int i = 0; i < router_nservers; i++)
{
if (BREF_IS_IN_USE((&backend_ref[i])))
{
ss_dassert(backend_ref[i].bref_backend->backend_conn_count > 0);
/** disconnect opened connections */
bref_clear_state(&backend_ref[i], BREF_IN_USE);
/** Decrease backend's connection counter. */
atomic_add(&backend_ref[i].bref_backend->backend_conn_count, -1);
dcb_close(backend_ref[i].bref_dcb);
}
}
}
return succp;
}
/** Compare number of connections from this router in backend servers */
static int bref_cmp_router_conn(const void *bref1, const void *bref2)
{
BACKEND *b1 = ((backend_ref_t *)bref1)->bref_backend;
BACKEND *b2 = ((backend_ref_t *)bref2)->bref_backend;
if (b1->weight == 0 && b2->weight == 0)
{
return b1->backend_server->stats.n_current -
b2->backend_server->stats.n_current;
}
else if (b1->weight == 0)
{
return 1;
}
else if (b2->weight == 0)
{
return -1;
}
return ((1000 + 1000 * b1->backend_conn_count) / b1->weight) -
((1000 + 1000 * b2->backend_conn_count) / b2->weight);
}
/** Compare number of global connections in backend servers */
static int bref_cmp_global_conn(const void *bref1, const void *bref2)
{
BACKEND *b1 = ((backend_ref_t *)bref1)->bref_backend;
BACKEND *b2 = ((backend_ref_t *)bref2)->bref_backend;
if (b1->weight == 0 && b2->weight == 0)
{
return b1->backend_server->stats.n_current -
b2->backend_server->stats.n_current;
}
else if (b1->weight == 0)
{
return 1;
}
else if (b2->weight == 0)
{
return -1;
}
return ((1000 + 1000 * b1->backend_server->stats.n_current) / b1->weight) -
((1000 + 1000 * b2->backend_server->stats.n_current) / b2->weight);
}
/** Compare replication lag between backend servers */
static int bref_cmp_behind_master(const void *bref1, const void *bref2)
{
BACKEND *b1 = ((backend_ref_t *)bref1)->bref_backend;
BACKEND *b2 = ((backend_ref_t *)bref2)->bref_backend;
return ((b1->backend_server->rlag < b2->backend_server->rlag) ? -1
: ((b1->backend_server->rlag > b2->backend_server->rlag) ? 1 : 0));
}
/** Compare number of current operations in backend servers */
static int bref_cmp_current_load(const void *bref1, const void *bref2)
{
SERVER *s1 = ((backend_ref_t *)bref1)->bref_backend->backend_server;
SERVER *s2 = ((backend_ref_t *)bref2)->bref_backend->backend_server;
BACKEND *b1 = ((backend_ref_t *)bref1)->bref_backend;
BACKEND *b2 = ((backend_ref_t *)bref2)->bref_backend;
if (b1->weight == 0 && b2->weight == 0)
{
return b1->backend_server->stats.n_current -
b2->backend_server->stats.n_current;
}
else if (b1->weight == 0)
{
return 1;
}
else if (b2->weight == 0)
{
return -1;
}
return ((1000 * s1->stats.n_current_ops) - b1->weight) -
((1000 * s2->stats.n_current_ops) - b2->weight);
}
/**
* @brief Connect a server
*
* Connects to a server, adds callbacks to the created DCB and updates
* router statistics. If @p execute_history is true, the session command
* history will be executed on this server.
*
* @param b Router's backend structure for the server
* @param session Client's session object
* @param execute_history Execute session command history
* @return True if successful, false if an error occurred
*/
static bool connect_server(backend_ref_t *bref, SESSION *session, bool execute_history)
{
SERVER *serv = bref->bref_backend->backend_server;
bool rval = false;
bref->bref_dcb = dcb_connect(serv, session, serv->protocol);
if (bref->bref_dcb != NULL)
{
bref_clear_state(bref, BREF_CLOSED);
if (!execute_history || execute_sescmd_history(bref))
{
/** Add a callback for unresponsive server */
dcb_add_callback(bref->bref_dcb, DCB_REASON_NOT_RESPONDING,
&router_handle_state_switch, (void *) bref);
bref->bref_state = 0;
bref_set_state(bref, BREF_IN_USE);
atomic_add(&bref->bref_backend->backend_conn_count, 1);
rval = true;
}
else
{
MXS_ERROR("Failed to execute session command in %s (%s:%d). See earlier "
"errors for more details.",
bref->bref_backend->backend_server->unique_name,
bref->bref_backend->backend_server->name,
bref->bref_backend->backend_server->port);
dcb_close(bref->bref_dcb);
bref->bref_dcb = NULL;
}
}
else
{
MXS_ERROR("Unable to establish connection with server %s:%d",
serv->name, serv->port);
}
return rval;
}
/**
* @brief Log server connections
*
* @param select_criteria Slave selection criteria
* @param backend_ref Backend reference array
* @param router_nservers Number of backends in @p backend_ref
*/
static void log_server_connections(select_criteria_t select_criteria,
backend_ref_t *backend_ref, int router_nservers)
{
if (select_criteria == LEAST_GLOBAL_CONNECTIONS ||
select_criteria == LEAST_ROUTER_CONNECTIONS ||
select_criteria == LEAST_BEHIND_MASTER ||
select_criteria == LEAST_CURRENT_OPERATIONS)
{
MXS_INFO("Servers and %s connection counts:",
select_criteria == LEAST_GLOBAL_CONNECTIONS ? "all MaxScale"
: "router");
for (int i = 0; i < router_nservers; i++)
{
BACKEND *b = backend_ref[i].bref_backend;
switch (select_criteria)
{
case LEAST_GLOBAL_CONNECTIONS:
MXS_INFO("MaxScale connections : %d in \t%s:%d %s",
b->backend_server->stats.n_current, b->backend_server->name,
b->backend_server->port, STRSRVSTATUS(b->backend_server));
break;
case LEAST_ROUTER_CONNECTIONS:
MXS_INFO("RWSplit connections : %d in \t%s:%d %s",
b->backend_conn_count, b->backend_server->name,
b->backend_server->port, STRSRVSTATUS(b->backend_server));
break;
case LEAST_CURRENT_OPERATIONS:
MXS_INFO("current operations : %d in \t%s:%d %s",
b->backend_server->stats.n_current_ops,
b->backend_server->name, b->backend_server->port,
STRSRVSTATUS(b->backend_server));
break;
case LEAST_BEHIND_MASTER:
MXS_INFO("replication lag : %d in \t%s:%d %s",
b->backend_server->rlag, b->backend_server->name,
b->backend_server->port, STRSRVSTATUS(b->backend_server));
default:
break;
}
}
}
}
/********************************
* This routine returns the root master server from MySQL replication tree
* Get the root Master rule:
*
* find server with the lowest replication depth level
* and the SERVER_MASTER bitval
* Servers are checked even if they are in 'maintenance'
*
* @param servers The list of servers
* @param router_nservers The number of servers
* @return The Master found
*
*/
static BACKEND *get_root_master(backend_ref_t *servers, int router_nservers)
{
int i = 0;
BACKEND *master_host = NULL;
for (i = 0; i < router_nservers; i++)
{
BACKEND *b;
if (servers[i].bref_backend == NULL)
{
continue;
}
b = servers[i].bref_backend;
if ((b->backend_server->status & (SERVER_MASTER | SERVER_MAINT)) ==
SERVER_MASTER)
{
if (master_host == NULL ||
(b->backend_server->depth < master_host->backend_server->depth))
{
master_host = b;
}
}
}
return master_host;
}

View File

@ -0,0 +1,480 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <stdio.h>
#include <strings.h>
#include <string.h>
#include <stdlib.h>
#include <stdint.h>
#include <router.h>
#include <readwritesplit.h>
#include <rwsplit_internal.h>
/**
* @file rwsplit_session_cmd.c The functions that provide session command
* handling for the read write split router.
*
* @verbatim
* Revision History
*
* Date Who Description
* 08/08/2016 Martin Brampton Initial implementation
*
* @endverbatim
*/
static bool sescmd_cursor_history_empty(sescmd_cursor_t *scur);
static void sescmd_cursor_reset(sescmd_cursor_t *scur);
static bool sescmd_cursor_next(sescmd_cursor_t *scur);
static rses_property_t *mysql_sescmd_get_property(mysql_sescmd_t *scmd);
/*
* The following functions, all to do with the handling of session commands,
* are called from other modules of the read write split router:
*/
/**
* Router session must be locked.
* Return session command pointer if succeed, NULL if failed.
*/
mysql_sescmd_t *rses_property_get_sescmd(rses_property_t *prop)
{
mysql_sescmd_t *sescmd;
if (prop == NULL)
{
MXS_ERROR("[%s] Error: NULL parameter.", __FUNCTION__);
return NULL;
}
CHK_RSES_PROP(prop);
ss_dassert(prop->rses_prop_rsession == NULL ||
SPINLOCK_IS_LOCKED(&prop->rses_prop_rsession->rses_lock));
sescmd = &prop->rses_prop_data.sescmd;
if (sescmd != NULL)
{
CHK_MYSQL_SESCMD(sescmd);
}
return sescmd;
}
/**
* Create session command property.
*/
mysql_sescmd_t *mysql_sescmd_init(rses_property_t *rses_prop,
GWBUF *sescmd_buf,
unsigned char packet_type,
ROUTER_CLIENT_SES *rses)
{
mysql_sescmd_t *sescmd;
CHK_RSES_PROP(rses_prop);
/** Can't call rses_property_get_sescmd with uninitialized sescmd */
sescmd = &rses_prop->rses_prop_data.sescmd;
sescmd->my_sescmd_prop = rses_prop; /*< reference to owning property */
#if defined(SS_DEBUG)
sescmd->my_sescmd_chk_top = CHK_NUM_MY_SESCMD;
sescmd->my_sescmd_chk_tail = CHK_NUM_MY_SESCMD;
#endif
/** Set session command buffer */
sescmd->my_sescmd_buf = sescmd_buf;
sescmd->my_sescmd_packet_type = packet_type;
sescmd->position = atomic_add(&rses->pos_generator, 1);
return sescmd;
}
void mysql_sescmd_done(mysql_sescmd_t *sescmd)
{
if (sescmd == NULL)
{
MXS_ERROR("[%s] Error: NULL parameter.", __FUNCTION__);
return;
}
CHK_RSES_PROP(sescmd->my_sescmd_prop);
gwbuf_free(sescmd->my_sescmd_buf);
memset(sescmd, 0, sizeof(mysql_sescmd_t));
}
/**
* All cases where backend message starts at least with one response to session
* command are handled here.
* Read session commands from property list. If command is already replied,
* discard packet. Else send reply to client. In both cases move cursor forward
* until all session command replies are handled.
*
* Cases that are expected to happen and which are handled:
* s = response not yet replied to client, S = already replied response,
* q = query
* 1. q+ for example : select * from mysql.user
* 2. s+ for example : set autocommit=1
* 3. S+
* 4. sq+
* 5. Sq+
* 6. Ss+
* 7. Ss+q+
* 8. S+q+
* 9. s+q+
*/
GWBUF *sescmd_cursor_process_replies(GWBUF *replybuf,
backend_ref_t *bref,
bool *reconnect)
{
mysql_sescmd_t *scmd;
sescmd_cursor_t *scur;
ROUTER_CLIENT_SES *ses;
scur = &bref->bref_sescmd_cur;
ss_dassert(SPINLOCK_IS_LOCKED(&(scur->scmd_cur_rses->rses_lock)));
scmd = sescmd_cursor_get_command(scur);
ses = (*scur->scmd_cur_ptr_property)->rses_prop_rsession;
CHK_GWBUF(replybuf);
/**
* Walk through packets in the message and the list of session
* commands.
*/
while (scmd != NULL && replybuf != NULL)
{
bref->reply_cmd = *((unsigned char *)replybuf->start + 4);
scur->position = scmd->position;
/** Faster backend has already responded to client : discard */
if (scmd->my_sescmd_is_replied)
{
bool last_packet = false;
CHK_GWBUF(replybuf);
while (!last_packet)
{
int buflen;
buflen = GWBUF_LENGTH(replybuf);
last_packet = GWBUF_IS_TYPE_RESPONSE_END(replybuf);
/** discard packet */
replybuf = gwbuf_consume(replybuf, buflen);
}
/** Set response status received */
bref_clear_state(bref, BREF_WAITING_RESULT);
if (bref->reply_cmd != scmd->reply_cmd)
{
MXS_ERROR("Slave server '%s': response differs from master's response. "
"Closing connection due to inconsistent session state.",
bref->bref_backend->backend_server->unique_name);
sescmd_cursor_set_active(scur, false);
bref_clear_state(bref, BREF_QUERY_ACTIVE);
bref_clear_state(bref, BREF_IN_USE);
bref_set_state(bref, BREF_CLOSED);
bref_set_state(bref, BREF_SESCMD_FAILED);
if (bref->bref_dcb)
{
dcb_close(bref->bref_dcb);
}
*reconnect = true;
gwbuf_free(replybuf);
replybuf = NULL;
}
}
/** This is a response from the master and it is the "right" one.
* A slave server's response will be compared to this and if
* their response differs from the master server's response, they
* are dropped from the valid list of backend servers.
* Response is in the buffer and it will be sent to client.
*
* If we have no master server, the first slave's response is considered
* the "right" one. */
else if (ses->rses_master_ref == NULL ||
!BREF_IS_IN_USE(ses->rses_master_ref) ||
ses->rses_master_ref->bref_dcb == bref->bref_dcb)
{
/** Mark the rest session commands as replied */
scmd->my_sescmd_is_replied = true;
scmd->reply_cmd = *((unsigned char *)replybuf->start + 4);
MXS_INFO("Server '%s' responded to a session command, sending the response "
"to the client.", bref->bref_backend->backend_server->unique_name);
for (int i = 0; i < ses->rses_nbackends; i++)
{
if (!BREF_IS_WAITING_RESULT(&ses->rses_backend_ref[i]))
{
/** This backend has already received a response */
if (ses->rses_backend_ref[i].reply_cmd != scmd->reply_cmd &&
!BREF_IS_CLOSED(&ses->rses_backend_ref[i]))
{
bref_clear_state(&ses->rses_backend_ref[i], BREF_QUERY_ACTIVE);
bref_clear_state(&ses->rses_backend_ref[i], BREF_IN_USE);
bref_set_state(&ses->rses_backend_ref[i], BREF_CLOSED);
bref_set_state(bref, BREF_SESCMD_FAILED);
if (ses->rses_backend_ref[i].bref_dcb)
{
dcb_close(ses->rses_backend_ref[i].bref_dcb);
}
*reconnect = true;
MXS_INFO("Disabling slave %s:%d, result differs from "
"master's result. Master: %d Slave: %d",
ses->rses_backend_ref[i].bref_backend->backend_server->name,
ses->rses_backend_ref[i].bref_backend->backend_server->port,
bref->reply_cmd, ses->rses_backend_ref[i].reply_cmd);
}
}
}
}
else
{
MXS_INFO("Slave '%s' responded before master to a session command. Result: %d",
bref->bref_backend->backend_server->unique_name,
(int)bref->reply_cmd);
if (bref->reply_cmd == 0xff)
{
SERVER *serv = bref->bref_backend->backend_server;
MXS_ERROR("Slave '%s' (%s:%u) failed to execute session command.",
serv->unique_name, serv->name, serv->port);
}
gwbuf_free(replybuf);
replybuf = NULL;
}
if (sescmd_cursor_next(scur))
{
scmd = sescmd_cursor_get_command(scur);
}
else
{
scmd = NULL;
/** All session commands are replied */
scur->scmd_cur_active = false;
}
}
ss_dassert(replybuf == NULL || *scur->scmd_cur_ptr_property == NULL);
return replybuf;
}
/**
* Get the address of current session command.
*
* Router session must be locked */
mysql_sescmd_t *sescmd_cursor_get_command(sescmd_cursor_t *scur)
{
mysql_sescmd_t *scmd;
ss_dassert(SPINLOCK_IS_LOCKED(&(scur->scmd_cur_rses->rses_lock)));
scur->scmd_cur_cmd = rses_property_get_sescmd(*scur->scmd_cur_ptr_property);
CHK_MYSQL_SESCMD(scur->scmd_cur_cmd);
scmd = scur->scmd_cur_cmd;
return scmd;
}
/** router must be locked */
bool sescmd_cursor_is_active(sescmd_cursor_t *sescmd_cursor)
{
bool succp;
if (sescmd_cursor == NULL)
{
MXS_ERROR("[%s] Error: NULL parameter.", __FUNCTION__);
return false;
}
ss_dassert(SPINLOCK_IS_LOCKED(&sescmd_cursor->scmd_cur_rses->rses_lock));
succp = sescmd_cursor->scmd_cur_active;
return succp;
}
/** router must be locked */
void sescmd_cursor_set_active(sescmd_cursor_t *sescmd_cursor,
bool value)
{
ss_dassert(SPINLOCK_IS_LOCKED(&sescmd_cursor->scmd_cur_rses->rses_lock));
/** avoid calling unnecessarily */
ss_dassert(sescmd_cursor->scmd_cur_active != value);
sescmd_cursor->scmd_cur_active = value;
}
/**
* Clone session command's command buffer.
* Router session must be locked
*/
GWBUF *sescmd_cursor_clone_querybuf(sescmd_cursor_t *scur)
{
GWBUF *buf;
if (scur == NULL)
{
MXS_ERROR("[%s] Error: NULL parameter.", __FUNCTION__);
return NULL;
}
ss_dassert(scur->scmd_cur_cmd != NULL);
buf = gwbuf_clone_all(scur->scmd_cur_cmd->my_sescmd_buf);
CHK_GWBUF(buf);
return buf;
}
bool execute_sescmd_history(backend_ref_t *bref)
{
bool succp = true;
sescmd_cursor_t *scur;
if (bref == NULL)
{
MXS_ERROR("[%s] Error: NULL parameter.", __FUNCTION__);
return false;
}
CHK_BACKEND_REF(bref);
scur = &bref->bref_sescmd_cur;
CHK_SESCMD_CUR(scur);
if (!sescmd_cursor_history_empty(scur))
{
sescmd_cursor_reset(scur);
succp = execute_sescmd_in_backend(bref);
}
return succp;
}
static bool sescmd_cursor_history_empty(sescmd_cursor_t *scur)
{
bool succp;
if (scur == NULL)
{
MXS_ERROR("[%s] Error: NULL parameter.", __FUNCTION__);
return true;
}
CHK_SESCMD_CUR(scur);
if (scur->scmd_cur_rses->rses_properties[RSES_PROP_TYPE_SESCMD] == NULL)
{
succp = true;
}
else
{
succp = false;
}
return succp;
}
/*
* End of functions called from other modules of the read write split router;
* start of functions that are internal to this module.
*/
static void sescmd_cursor_reset(sescmd_cursor_t *scur)
{
ROUTER_CLIENT_SES *rses;
if (scur == NULL)
{
MXS_ERROR("[%s] Error: NULL parameter.", __FUNCTION__);
return;
}
CHK_SESCMD_CUR(scur);
CHK_CLIENT_RSES(scur->scmd_cur_rses);
rses = scur->scmd_cur_rses;
scur->scmd_cur_ptr_property = &rses->rses_properties[RSES_PROP_TYPE_SESCMD];
CHK_RSES_PROP((*scur->scmd_cur_ptr_property));
scur->scmd_cur_active = false;
scur->scmd_cur_cmd = &(*scur->scmd_cur_ptr_property)->rses_prop_data.sescmd;
}
/**
* Moves cursor to next property and copied address of its sescmd to cursor.
* Current propery must be non-null.
* If current property is the last on the list, *scur->scmd_ptr_property == NULL
*
* Router session must be locked
*/
static bool sescmd_cursor_next(sescmd_cursor_t *scur)
{
bool succp = false;
rses_property_t *prop_curr;
rses_property_t *prop_next;
if (scur == NULL)
{
MXS_ERROR("[%s] Error: NULL parameter.", __FUNCTION__);
return false;
}
ss_dassert(scur != NULL);
ss_dassert(*(scur->scmd_cur_ptr_property) != NULL);
ss_dassert(SPINLOCK_IS_LOCKED(
&(*(scur->scmd_cur_ptr_property))->rses_prop_rsession->rses_lock));
/** Illegal situation */
if (scur == NULL || *scur->scmd_cur_ptr_property == NULL ||
scur->scmd_cur_cmd == NULL)
{
/** Log error */
goto return_succp;
}
prop_curr = *(scur->scmd_cur_ptr_property);
CHK_MYSQL_SESCMD(scur->scmd_cur_cmd);
ss_dassert(prop_curr == mysql_sescmd_get_property(scur->scmd_cur_cmd));
CHK_RSES_PROP(prop_curr);
/** Copy address of pointer to next property */
scur->scmd_cur_ptr_property = &(prop_curr->rses_prop_next);
prop_next = *scur->scmd_cur_ptr_property;
ss_dassert(prop_next == *(scur->scmd_cur_ptr_property));
/** If there is a next property move forward */
if (prop_next != NULL)
{
CHK_RSES_PROP(prop_next);
CHK_RSES_PROP((*(scur->scmd_cur_ptr_property)));
/** Get pointer to next property's sescmd */
scur->scmd_cur_cmd = rses_property_get_sescmd(prop_next);
ss_dassert(prop_next == scur->scmd_cur_cmd->my_sescmd_prop);
CHK_MYSQL_SESCMD(scur->scmd_cur_cmd);
CHK_RSES_PROP(scur->scmd_cur_cmd->my_sescmd_prop);
}
else
{
/** No more properties, can't proceed. */
goto return_succp;
}
if (scur->scmd_cur_cmd != NULL)
{
succp = true;
}
else
{
ss_dassert(false); /*< Log error, sescmd shouldn't be NULL */
}
return_succp:
return succp;
}
static rses_property_t *mysql_sescmd_get_property(mysql_sescmd_t *scmd)
{
CHK_MYSQL_SESCMD(scmd);
return scmd->my_sescmd_prop;
}

View File

@ -0,0 +1,408 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <stdio.h>
#include <strings.h>
#include <string.h>
#include <stdlib.h>
#include <stdint.h>
#include <maxscale/alloc.h>
/* Note that modutil contains much MySQL specific code */
#include <modutil.h>
#include <router.h>
#include <readwritesplit.h>
#include <rwsplit_internal.h>
/**
* @file rwsplit_tmp_table.c The functions that carry out checks on
* statements to see if they involve various operations involving temporary
* tables or multi-statement queries.
*
* @verbatim
* Revision History
*
* Date Who Description
* 08/08/2016 Martin Brampton Initial implementation
*
* @endverbatim
*/
/*
* The following are to do with checking whether the statement refers to
* temporary tables, or is a multi-statement request. Maybe they belong
* somewhere else, outside this router. Perhaps in the query classifier?
*/
/**
* Check if the query is a DROP TABLE... query and
* if it targets a temporary table, remove it from the hashtable.
* @param router_cli_ses Router client session
* @param querybuf GWBUF containing the query
* @param type The type of the query resolved so far
*/
void check_drop_tmp_table(ROUTER_CLIENT_SES *router_cli_ses, GWBUF *querybuf,
mysql_server_cmd_t packet_type)
{
if (packet_type != MYSQL_COM_QUERY && packet_type != MYSQL_COM_DROP_DB)
{
return;
}
int tsize = 0, klen = 0, i;
char **tbl = NULL;
char *hkey, *dbname;
MYSQL_session *my_data;
rses_property_t *rses_prop_tmp;
MYSQL_session *data = (MYSQL_session *)router_cli_ses->client_dcb->data;
rses_prop_tmp = router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES];
dbname = (char *)data->db;
if (qc_is_drop_table_query(querybuf))
{
tbl = qc_get_table_names(querybuf, &tsize, false);
if (tbl != NULL)
{
for (i = 0; i < tsize; i++)
{
/* Not clear why the next six lines are outside the if block */
klen = strlen(dbname) + strlen(tbl[i]) + 2;
hkey = MXS_CALLOC(klen, sizeof(char));
MXS_ABORT_IF_NULL(hkey);
strcpy(hkey, dbname);
strcat(hkey, ".");
strcat(hkey, tbl[i]);
if (rses_prop_tmp && rses_prop_tmp->rses_prop_data.temp_tables)
{
if (hashtable_delete(rses_prop_tmp->rses_prop_data.temp_tables,
(void *)hkey))
{
MXS_INFO("Temporary table dropped: %s", hkey);
}
}
MXS_FREE(tbl[i]);
MXS_FREE(hkey);
}
MXS_FREE(tbl);
}
}
}
/**
* Check if the query targets a temporary table.
* @param router_cli_ses Router client session
* @param querybuf GWBUF containing the query
* @param type The type of the query resolved so far
* @return The type of the query
*/
qc_query_type_t is_read_tmp_table(ROUTER_CLIENT_SES *router_cli_ses,
GWBUF *querybuf,
qc_query_type_t type)
{
bool target_tmp_table = false;
int tsize = 0, klen = 0, i;
char **tbl = NULL;
char *dbname;
char hkey[MYSQL_DATABASE_MAXLEN + MYSQL_TABLE_MAXLEN + 2];
MYSQL_session *data;
qc_query_type_t qtype = type;
rses_property_t *rses_prop_tmp;
if (router_cli_ses == NULL || querybuf == NULL)
{
MXS_ERROR("[%s] Error: NULL parameters passed: %p %p", __FUNCTION__,
router_cli_ses, querybuf);
return type;
}
if (router_cli_ses->client_dcb == NULL)
{
MXS_ERROR("[%s] Error: Client DCB is NULL.", __FUNCTION__);
return type;
}
rses_prop_tmp = router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES];
data = (MYSQL_session *)router_cli_ses->client_dcb->data;
if (data == NULL)
{
MXS_ERROR("[%s] Error: User data in client DBC is NULL.", __FUNCTION__);
return qtype;
}
dbname = (char *)data->db;
if (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_LOCAL_READ) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_USERVAR_READ) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_SYSVAR_READ) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_GSYSVAR_READ))
{
tbl = qc_get_table_names(querybuf, &tsize, false);
if (tbl != NULL && tsize > 0)
{
/** Query targets at least one table */
for (i = 0; i < tsize && !target_tmp_table && tbl[i]; i++)
{
sprintf(hkey, "%s.%s", dbname, tbl[i]);
if (rses_prop_tmp && rses_prop_tmp->rses_prop_data.temp_tables)
{
if (hashtable_fetch(rses_prop_tmp->rses_prop_data.temp_tables, hkey))
{
/**Query target is a temporary table*/
qtype = QUERY_TYPE_READ_TMP_TABLE;
MXS_INFO("Query targets a temporary table: %s", hkey);
break;
}
}
}
}
}
if (tbl != NULL)
{
for (i = 0; i < tsize; i++)
{
MXS_FREE(tbl[i]);
}
MXS_FREE(tbl);
}
return qtype;
}
/**
* If query is of type QUERY_TYPE_CREATE_TMP_TABLE then find out
* the database and table name, create a hashvalue and
* add it to the router client session's property. If property
* doesn't exist then create it first.
* @param router_cli_ses Router client session
* @param querybuf GWBUF containing the query
* @param type The type of the query resolved so far
*/
void check_create_tmp_table(ROUTER_CLIENT_SES *router_cli_ses,
GWBUF *querybuf, qc_query_type_t type)
{
if (!QUERY_IS_TYPE(type, QUERY_TYPE_CREATE_TMP_TABLE))
{
return;
}
int klen = 0;
char *hkey, *dbname;
MYSQL_session *data;
rses_property_t *rses_prop_tmp;
HASHTABLE *h;
if (router_cli_ses == NULL || querybuf == NULL)
{
MXS_ERROR("[%s] Error: NULL parameters passed: %p %p", __FUNCTION__,
router_cli_ses, querybuf);
return;
}
if (router_cli_ses->client_dcb == NULL)
{
MXS_ERROR("[%s] Error: Client DCB is NULL.", __FUNCTION__);
return;
}
router_cli_ses->have_tmp_tables = true;
rses_prop_tmp = router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES];
data = (MYSQL_session *)router_cli_ses->client_dcb->data;
if (data == NULL)
{
MXS_ERROR("[%s] Error: User data in master server DBC is NULL.",
__FUNCTION__);
return;
}
dbname = (char *)data->db;
bool is_temp = true;
char *tblname = NULL;
tblname = qc_get_created_table_name(querybuf);
if (tblname && strlen(tblname) > 0)
{
klen = strlen(dbname) + strlen(tblname) + 2;
hkey = MXS_CALLOC(klen, sizeof(char));
MXS_ABORT_IF_NULL(hkey);
strcpy(hkey, dbname);
strcat(hkey, ".");
strcat(hkey, tblname);
}
else
{
hkey = NULL;
}
if (rses_prop_tmp == NULL)
{
if ((rses_prop_tmp = (rses_property_t *)MXS_CALLOC(1, sizeof(rses_property_t))))
{
#if defined(SS_DEBUG)
rses_prop_tmp->rses_prop_chk_top = CHK_NUM_ROUTER_PROPERTY;
rses_prop_tmp->rses_prop_chk_tail = CHK_NUM_ROUTER_PROPERTY;
#endif
rses_prop_tmp->rses_prop_rsession = router_cli_ses;
rses_prop_tmp->rses_prop_refcount = 1;
rses_prop_tmp->rses_prop_next = NULL;
rses_prop_tmp->rses_prop_type = RSES_PROP_TYPE_TMPTABLES;
router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES] = rses_prop_tmp;
}
}
if (rses_prop_tmp)
{
if (rses_prop_tmp->rses_prop_data.temp_tables == NULL)
{
h = hashtable_alloc(7, rwsplit_hashkeyfun, rwsplit_hashcmpfun);
hashtable_memory_fns(h, rwsplit_hstrdup, NULL, rwsplit_hfree, NULL);
if (h != NULL)
{
rses_prop_tmp->rses_prop_data.temp_tables = h;
}
else
{
MXS_ERROR("Failed to allocate a new hashtable.");
}
}
if (hkey && rses_prop_tmp->rses_prop_data.temp_tables &&
hashtable_add(rses_prop_tmp->rses_prop_data.temp_tables, (void *)hkey,
(void *)is_temp) == 0) /*< Conflict in hash table */
{
MXS_INFO("Temporary table conflict in hashtable: %s", hkey);
}
#if defined(SS_DEBUG)
{
bool retkey = hashtable_fetch(rses_prop_tmp->rses_prop_data.temp_tables, hkey);
if (retkey)
{
MXS_INFO("Temporary table added: %s", hkey);
}
}
#endif
}
MXS_FREE(hkey);
MXS_FREE(tblname);
}
/**
* @brief Detect multi-statement queries
*
* It is possible that the session state is modified inside a multi-statement
* query which would leave any slave sessions in an inconsistent state. Due to
* this, for the duration of this session, all queries will be sent to the
* master
* if the current query contains a multi-statement query.
* @param rses Router client session
* @param buf Buffer containing the full query
* @return True if the query contains multiple statements
*/
bool check_for_multi_stmt(GWBUF *buf, void *protocol, mysql_server_cmd_t packet_type)
{
MySQLProtocol *proto = (MySQLProtocol *)protocol;
bool rval = false;
if (proto->client_capabilities & GW_MYSQL_CAPABILITIES_MULTI_STATEMENTS &&
packet_type == MYSQL_COM_QUERY)
{
char *ptr, *data = GWBUF_DATA(buf) + 5;
/** Payload size without command byte */
int buflen = gw_mysql_get_byte3((uint8_t *)GWBUF_DATA(buf)) - 1;
if ((ptr = strnchr_esc_mysql(data, ';', buflen)))
{
/** Skip stored procedures etc. */
while (ptr && is_mysql_sp_end(ptr, buflen - (ptr - data)))
{
ptr = strnchr_esc_mysql(ptr + 1, ';', buflen - (ptr - data) - 1);
}
if (ptr)
{
if (ptr < data + buflen &&
!is_mysql_statement_end(ptr, buflen - (ptr - data)))
{
rval = true;
}
}
}
}
return rval;
}
qc_query_type_t
determine_query_type(GWBUF *querybuf, int packet_type, bool non_empty_packet)
{
qc_query_type_t qtype = QUERY_TYPE_UNKNOWN;
if (non_empty_packet)
{
mysql_server_cmd_t my_packet_type = (mysql_server_cmd_t)packet_type;
switch (my_packet_type)
{
case MYSQL_COM_QUIT: /*< 1 QUIT will close all sessions */
case MYSQL_COM_INIT_DB: /*< 2 DDL must go to the master */
case MYSQL_COM_REFRESH: /*< 7 - I guess this is session but not sure */
case MYSQL_COM_DEBUG: /*< 0d all servers dump debug info to stdout */
case MYSQL_COM_PING: /*< 0e all servers are pinged */
case MYSQL_COM_CHANGE_USER: /*< 11 all servers change it accordingly */
qtype = QUERY_TYPE_SESSION_WRITE;
break;
case MYSQL_COM_CREATE_DB: /**< 5 DDL must go to the master */
case MYSQL_COM_DROP_DB: /**< 6 DDL must go to the master */
case MYSQL_COM_STMT_CLOSE: /*< free prepared statement */
case MYSQL_COM_STMT_SEND_LONG_DATA: /*< send data to column */
case MYSQL_COM_STMT_RESET: /*< resets the data of a prepared statement */
qtype = QUERY_TYPE_WRITE;
break;
case MYSQL_COM_QUERY:
qtype = qc_get_type(querybuf);
break;
case MYSQL_COM_STMT_PREPARE:
qtype = qc_get_type(querybuf);
qtype |= QUERY_TYPE_PREPARE_STMT;
break;
case MYSQL_COM_STMT_EXECUTE:
/** Parsing is not needed for this type of packet */
qtype = QUERY_TYPE_EXEC_STMT;
break;
case MYSQL_COM_SHUTDOWN: /**< 8 where should shutdown be routed ? */
case MYSQL_COM_STATISTICS: /**< 9 ? */
case MYSQL_COM_PROCESS_INFO: /**< 0a ? */
case MYSQL_COM_CONNECT: /**< 0b ? */
case MYSQL_COM_PROCESS_KILL: /**< 0c ? */
case MYSQL_COM_TIME: /**< 0f should this be run in gateway ? */
case MYSQL_COM_DELAYED_INSERT: /**< 10 ? */
case MYSQL_COM_DAEMON: /**< 1d ? */
default:
break;
} /**< switch by packet type */
}
return qtype;
}