Merge branch 'develop' into binlog_server_wait_data

This commit is contained in:
MassimilianoPinto
2016-09-30 11:52:09 +02:00
23 changed files with 2424 additions and 245 deletions


@ -21,6 +21,8 @@ addons:
- libpcre3-dev
- doxygen
- pandoc
- uuid
- uuid-dev
coverity_scan:
project:
name: "mariadb-corporation/MaxScale"


@ -1,4 +1,4 @@
#Cache
# Cache
## Overview
The cache filter is capable of caching the result of SELECTs, so that subsequent identical
@ -15,7 +15,9 @@ type=filter
module=cache
ttl=5
storage=...
storage_args=...
storage_options=...
rules=...
debug=...
[Cached Routing Service]
type=service
@ -42,51 +44,21 @@ optional ones.
#### `storage`
The name of the module that provides the storage for the cache. That
module will be loaded and provided with the value of `storage_args` as
module will be loaded and provided with the value of `storage_options` as
argument. For instance:
```
storage=storage_rocksdb
```
#### `storage_args`
#### `storage_options`
A comma-separated list of arguments to be provided to the storage module,
specified in `storage`, when it is loaded. Note that the required arguments
depend upon the specific module. For instance,
```
storage_args=path=/usr/maxscale/cache/rocksdb
storage_options=storage_specific_option1=value1,storage_specific_option2=value2
```
#### `allowed_references`
Specifies whether any or only fully qualified references are allowed in
queries stored to the cache.
```
allowed_references=[qualified|any]
```
The default is `qualified`, which means that only queries where
the database name is included in the table name are subject to caching.
```
select col from db.tbl;
```
If `any` is specified, then also queries where the table name is not
fully qualified are subject to caching.
```
select col from tbl;
```
Care should be excersized before this setting is changed, because, for
instance, the following is likely to produce unexpected results.
```
use db1;
select col from tbl;
...
use db2;
select col from tbl;
```
The setting can be changed to `any`, provided fully qualified names
are always used or if the names of tables in different databases are
different.
#### `max_resultset_rows`
Specifies the maximum number of rows a resultset can have in order to be
@ -119,6 +91,234 @@ If nothing is specified, the default _ttl_ value is 10.
ttl=60
```
#Storage
#### `rules`
Specifies the path of the file where the caching rules are stored. A relative
path is interpreted relative to the _data directory_ of MariaDB MaxScale.
```
rules=/path/to/rules-file
```
#### `debug`
An integer value that controls the level of debug logging performed by the
cache. The value is a bitfield, with different bits enabling different kinds
of logging.
* `0` (`0b0000`) No logging is made.
* `1` (`0b0001`) A matching rule is logged.
* `2` (`0b0010`) A non-matching rule is logged.
* `4` (`0b0100`) A decision to use data from the cache is logged.
* `8` (`0b1000`) A decision not to use data from the cache is logged.
Default is `0`. To log everything, give `debug` a value of `15`.
```
debug=2
```
# Rules
The caching rules are expressed as a JSON object.
There are two decisions to be made regarding caching: in what circumstances
should data be stored to the cache, and in what circumstances should the data
in the cache be used.
In the JSON object this is visible as follows:
```
{
    "store": [ ... ],
    "use": [ ... ]
}
```
The `store` field specifies in what circumstances data should be stored to
the cache and the `use` field specifies in what circumstances the data in
the cache should be used. In both cases, the value is a JSON array containing
objects.
## When to Store
By default, if no rules file has been provided or if the `store` field is
missing from the object, the results of all queries will be stored to the
cache, subject to `max_resultset_rows` and `max_resultset_size` cache filter
parameters.
By providing a `store` field in the JSON object, the decision whether to
store the result of a particular query to the cache can be controlled in
a more detailed manner. The decision to cache the results of a query can
depend upon
* the database,
* the table,
* the column, or
* the query itself.
Each entry in the `store` array is an object containing three fields,
```
{
    "attribute": <string>,
    "op": <string>,
    "value": <string>
}
```
where,
* the _attribute_ can be `database`, `table`, `column` or `query`,
* the _op_ can be `=`, `!=`, `like` or `unlike`, and
* the _value_ is a string.
If _op_ is `=` or `!=` then _value_ is used verbatim; if it is `like`
or `unlike`, then _value_ is interpreted as a _pcre2_ regular expression.
The objects in the `store` array are processed in order. If the result
of a comparison is _true_, no further processing will be made and the
result of the query in question will be stored to the cache.
If the result of the comparison is _false_, then the next object is
processed. The process continues until the array is exhausted. If there
is no match, then the result of the query is not stored to the cache.
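The first-match semantics can be illustrated with a rules file containing more than one `store` object (the database and table names below are made up for the example):

```
{
    "store": [
        {
            "attribute": "database",
            "op": "=",
            "value": "db1"
        },
        {
            "attribute": "table",
            "op": "like",
            "value": "lookup_.*"
        }
    ]
}
```

A query targeting `db1` matches the first object and is stored without the second object being evaluated; a query against, say, `db2.lookup_codes` is stored via the second rule; any other query is not cached.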
Note that as the query itself is used as the key, although the following
queries
```
select * from db1.tbl
```
and
```
use db1;
select * from tbl
```
target the same table and produce the same results, they will be cached
separately. The same holds for queries like
```
select * from tbl where a = 2 and b = 3;
```
and
```
select * from tbl where b = 3 and a = 2;
```
as well. Although they are conceptually identical, there will be two
cache entries.
### Examples
Cache all queries targeting a particular database.
```
{
    "store": [
        {
            "attribute": "database",
            "op": "=",
            "value": "db1"
        }
    ]
}
```
Cache all queries _not_ targeting a particular table.
```
{
    "store": [
        {
            "attribute": "table",
            "op": "!=",
            "value": "tbl1"
        }
    ]
}
```
That will exclude queries targeting table _tbl1_ irrespective of which
database it is in. To exclude a table in a particular database, specify
the table name using a qualified name.
```
{
    "store": [
        {
            "attribute": "table",
            "op": "!=",
            "value": "db1.tbl1"
        }
    ]
}
```
Cache all queries containing a WHERE clause.
```
{
    "store": [
        {
            "attribute": "query",
            "op": "like",
            "value": ".*WHERE.*"
        }
    ]
}
```
Note that this will cause all queries that contain the string WHERE anywhere
to be cached.
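If the intention is to match the WHERE keyword regardless of case, and not, say, a column named WHEREABOUTS, the pattern can be tightened with pcre2 word boundaries and an inline case-insensitivity flag. The following is a sketch, not a rule taken verbatim from the documentation; note that the backslashes must be doubled inside a JSON string.

```
{
    "store": [
        {
            "attribute": "query",
            "op": "like",
            "value": "(?i).*\\bwhere\\b.*"
        }
    ]
}
```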
## When to Use
By default, if no rules file has been provided or if the `use` field is
missing from the object, all users may be returned data from the cache.
By providing a `use` field in the JSON object, the decision whether to use
data from the cache can be controlled in a more detailed manner. The decision
to use data from the cache can depend upon
* the user.
Each entry in the `use` array is an object containing three fields,
```
{
    "attribute": <string>,
    "op": <string>,
    "value": <string>
}
```
where,
* the _attribute_ can be `user`,
* the _op_ can be `=`, `!=`, `like` or `unlike`, and
* the _value_ is a string.
If _op_ is `=` or `!=` then _value_ is used verbatim; if it is `like`
or `unlike`, then _value_ is interpreted as a _pcre2_ regular expression.
The objects in the `use` array are processed in order. If the result
of a comparison is _true_, no further processing will be made and the
data in the cache will be used, subject to the value of `ttl`.
If the result of the comparison is _false_, then the next object is
processed. The process continues until the array is exhausted. If there
is no match, then data in the cache will not be used.
Note that `use` is relevant only if the query is subject to caching,
that is, if all queries are cached or if a query matches a particular
rule in the `store` array.
### Examples
Use data from the cache for all users except `admin`.
```
{
    "use": [
        {
            "attribute": "user",
            "op": "!=",
            "value": "admin"
        }
    ]
}
```
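The two arrays can be combined in a single rules file. For instance, reusing the examples above, the following stores only results of queries targeting `db1` and serves cached data to all users except `admin`:

```
{
    "store": [
        {
            "attribute": "database",
            "op": "=",
            "value": "db1"
        }
    ],
    "use": [
        {
            "attribute": "user",
            "op": "!=",
            "value": "admin"
        }
    ]
}
```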
# Storage
## Storage RocksDB


@ -119,6 +119,51 @@ This functionality is similar to the [Multi-Master Monitor](MM-Monitor.md)
functionality. The only difference is that the MySQL monitor will also detect
traditional Master-Slave topologies.
### `failover`
Failover mode. This feature takes a boolean parameter and is disabled by default.
This parameter is intended to be used with simple, two node master-slave pairs
where the failure of the master can be resolved by "promoting" the slave as the
new master. Normally this is done by using an external agent of some sort
(possibly triggered by MaxScale's monitor scripts), like
[MariaDB Replication Manager](https://github.com/tanji/replication-manager)
or [MHA](https://code.google.com/p/mysql-master-ha/).
The failover mode in mysqlmon is completely passive in the sense that it does
not modify the cluster or any servers in it. It labels a slave server as a
master server when there is only one running server. Before a failover can be
initiated, the following conditions must have been met:
- The monitor has repeatedly failed to connect to the failed servers
- There is only one running server among the monitored servers
- @@read_only is not enabled on the last running server
When these conditions are met, the monitor assigns the last remaining server the
master status and puts all other servers into maintenance mode. This is done to
prevent accidental use of the failed servers if they come back online.
When the failed servers come back up, the maintenance mode needs to be manually
cleared once replication has been set up.
**Note**: A failover will cause permanent changes in the data of the promoted
server. Only use this feature if you know that the slave servers are capable
of acting as master servers.
### `failcount`
Number of failures that must occur on all failed servers before a failover is
initiated. The default value is 5 failures.
The monitor will attempt to contact all servers once per monitoring cycle. When
_failover_ mode is enabled, all of the failed servers must fail _failcount_
number of connection attempts before a failover is initiated.
The formula for calculating the actual number of milliseconds before failover
can start is `monitor_interval * failcount`. This means that to trigger a
failover after 10 seconds of master failure with a _monitor_interval_ of 1000
milliseconds, the value of _failcount_ must be 10.
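Putting the numbers together, a monitor section implementing the 10-second example might look as follows. This is a sketch: the server names and credentials are placeholders, and only `monitor_interval`, `failover`, and `failcount` are specific to this feature.

```
[MySQL Monitor]
type=monitor
module=mysqlmon
servers=server1,server2
user=myuser
passwd=mypwd
monitor_interval=1000
failover=true
failcount=10
```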
## Example 1 - Monitor script
Here is an example shell script which sends an email to an admin when a server goes down.


@ -66,6 +66,20 @@ which of them is the real write node and assigns the appropriate status for each
node. This module also supports launchable scripts on monitored events. Read the
[Monitor Common Documentation](../Monitors/Monitor-Common.md) for more details.
### Multi-master mode for MySQL Monitor
The MySQL monitor now detects complex multi-master replication
topologies. This allows the mysqlmon module to be used as a replacement
for the mmmon module. For more details, please read the
[MySQL Monitor Documentation](../Monitors/MySQL-Monitor.md).
### Failover mode for MySQL Monitor
A simple failover mode has been added to the MySQL Monitor. This mode is
aimed for two node master-slave clusters where the slave can act as a
master in case the original master fails. For more details, please read
the [MySQL Monitor Documentation](../Monitors/MySQL-Monitor.md).
## Bug fixes
[Here is a list of bugs fixed since the release of MaxScale 2.0.X.](https://jira.mariadb.org/browse/MXS-739?jql=project%20%3D%20MXS%20AND%20issuetype%20%3D%20Bug%20AND%20resolution%20in%20(Fixed%2C%20Done)%20AND%20fixVersion%20%3D%202.0.0)


@ -185,6 +185,8 @@ static char *monitor_params[] =
"disable_master_role_setting",
"use_priority",
"multimaster",
"failover",
"failcount",
NULL
};


@ -365,9 +365,9 @@ sigfatal_handler(int i)
GATEWAY_CONF* cnf = config_get_global_options();
fprintf(stderr, "\n\nMaxScale "MAXSCALE_VERSION" received fatal signal %d\n", i);
MXS_ERROR("Fatal: MaxScale "MAXSCALE_VERSION" received fatal signal %d. Attempting backtrace.", i);
MXS_ALERT("Fatal: MaxScale "MAXSCALE_VERSION" received fatal signal %d. Attempting backtrace.", i);
MXS_ERROR("Commit ID: %s System name: %s "
MXS_ALERT("Commit ID: %s System name: %s "
"Release string: %s",
maxscale_commit, cnf->sysname, cnf->release_string);
@ -380,7 +380,7 @@ sigfatal_handler(int i)
{
for (n = 0; n < count; n++)
{
MXS_ERROR(" %s\n", symbols[n]);
MXS_ALERT(" %s\n", symbols[n]);
}
MXS_FREE(symbols);
}


@ -548,7 +548,7 @@ static bool logmanager_init_nomutex(const char* ident,
/**
* Set global variable
*/
mxs_log_enabled_priorities = MXS_LOG_ERR | MXS_LOG_NOTICE | MXS_LOG_WARNING;
mxs_log_enabled_priorities = (1 << LOG_ERR) | (1 << LOG_NOTICE) | (1 << LOG_WARNING);
/**
* Initialize filewriter data and open the log file


@ -54,21 +54,6 @@ extern "C" {
#define MXS_MODULE_NAME NULL
#endif
enum mxs_log_priorities
{
MXS_LOG_EMERG = (1 << LOG_EMERG),
MXS_LOG_ALERT = (1 << LOG_ALERT),
MXS_LOG_CRIT = (1 << LOG_CRIT),
MXS_LOG_ERR = (1 << LOG_ERR),
MXS_LOG_WARNING = (1 << LOG_WARNING),
MXS_LOG_NOTICE = (1 << LOG_NOTICE),
MXS_LOG_INFO = (1 << LOG_INFO),
MXS_LOG_DEBUG = (1 << LOG_DEBUG),
MXS_LOG_MASK = (MXS_LOG_EMERG | MXS_LOG_ALERT | MXS_LOG_CRIT | MXS_LOG_ERR |
MXS_LOG_WARNING | MXS_LOG_NOTICE | MXS_LOG_INFO | MXS_LOG_DEBUG),
};
typedef enum
{
MXS_LOG_TARGET_DEFAULT = 0,
@ -93,6 +78,8 @@ extern __thread mxs_log_info_t mxs_log_tls;
/**
* Check if specified log type is enabled in general or if it is enabled
* for the current session.
*
* @param priority One of the syslog LOG_ERR, LOG_WARNING, etc. constants.
*/
#define MXS_LOG_PRIORITY_IS_ENABLED(priority) \
(((mxs_log_enabled_priorities & (1 << priority)) || \
@ -149,11 +136,20 @@ int mxs_log_message(int priority,
mxs_log_message(priority, MXS_MODULE_NAME, __FILE__, __LINE__, __func__, format, ##__VA_ARGS__)
/**
* Log an error, warning, notice, info, or debug message.
* Log an alert, error, warning, notice, info, or debug message.
*
* MXS_ALERT Not throttled To be used when the system is about to go down in flames.
* MXS_ERROR Throttled For errors.
* MXS_WARNING Throttled For warnings.
* MXS_NOTICE Not Throttled For messages deemed important, typically used during startup.
* MXS_INFO Not Throttled For information thought to be of value for investigating some problem.
* MXS_DEBUG Not Throttled For debugging messages during development. Should be removed when a
* feature is ready.
*
* @param format The printf format of the message.
* @param ... Arguments, depending on the format.
*/
#define MXS_ALERT(format, ...) MXS_LOG_MESSAGE(LOG_ALERT, format, ##__VA_ARGS__)
#define MXS_ERROR(format, ...) MXS_LOG_MESSAGE(LOG_ERR, format, ##__VA_ARGS__)
#define MXS_WARNING(format, ...) MXS_LOG_MESSAGE(LOG_WARNING, format, ##__VA_ARGS__)
#define MXS_NOTICE(format, ...) MXS_LOG_MESSAGE(LOG_NOTICE, format, ##__VA_ARGS__)


@ -1,6 +1,7 @@
add_library(cache SHARED cache.c storage.c)
target_link_libraries(cache maxscale-common)
add_library(cache SHARED cache.c rules.c storage.c)
target_link_libraries(cache maxscale-common jansson)
set_target_properties(cache PROPERTIES VERSION "1.0.0")
set_target_properties(cache PROPERTIES LINK_FLAGS -Wl,-z,defs)
install_module(cache experimental)
add_subdirectory(storage)


@ -14,12 +14,14 @@
#define MXS_MODULE_NAME "cache"
#include <maxscale/alloc.h>
#include <filter.h>
#include <gwdirs.h>
#include <log_manager.h>
#include <modinfo.h>
#include <modutil.h>
#include <mysql_utils.h>
#include <query_classifier.h>
#include "cache.h"
#include "rules.h"
#include "storage.h"
static char VERSION_STRING[] = "V1.0.0";
@ -90,28 +92,31 @@ FILTER_OBJECT *GetModuleObject()
typedef struct cache_config
{
cache_references_t allowed_references;
uint32_t max_resultset_rows;
uint32_t max_resultset_size;
const char* rules;
const char *storage;
const char *storage_args;
const char *storage_options;
uint32_t ttl;
uint32_t debug;
} CACHE_CONFIG;
static const CACHE_CONFIG DEFAULT_CONFIG =
{
CACHE_DEFAULT_ALLOWED_REFERENCES,
CACHE_DEFAULT_MAX_RESULTSET_ROWS,
CACHE_DEFAULT_MAX_RESULTSET_SIZE,
NULL,
NULL,
CACHE_DEFAULT_TTL
NULL,
CACHE_DEFAULT_TTL,
CACHE_DEFAULT_DEBUG
};
typedef struct cache_instance
{
const char *name;
CACHE_CONFIG config;
CACHE_RULES *rules;
CACHE_STORAGE_MODULE *module;
CACHE_STORAGE *storage;
} CACHE_INSTANCE;
@ -122,6 +127,7 @@ typedef enum cache_session_state
CACHE_EXPECTING_FIELDS, // A select has been sent, and we want more fields.
CACHE_EXPECTING_ROWS, // A select has been sent, and we want more rows.
CACHE_EXPECTING_NOTHING, // We are not expecting anything from the server.
CACHE_EXPECTING_USE_RESPONSE, // A "USE DB" was issued.
CACHE_IGNORING_RESPONSE, // We are not interested in the data received from the server.
} cache_session_state_t;
@ -152,6 +158,8 @@ typedef struct cache_session_data
CACHE_RESPONSE_STATE res; /**< The response state. */
SESSION *session; /**< The session this data is associated with. */
char key[CACHE_KEY_MAXLEN]; /**< Key storage. */
char *default_db; /**< The default database. */
char *use_db; /**< Pending default database. Needs server response. */
cache_session_state_t state;
} CACHE_SESSION_DATA;
@ -162,8 +170,9 @@ static int handle_expecting_fields(CACHE_SESSION_DATA *csdata);
static int handle_expecting_nothing(CACHE_SESSION_DATA *csdata);
static int handle_expecting_response(CACHE_SESSION_DATA *csdata);
static int handle_expecting_rows(CACHE_SESSION_DATA *csdata);
static int handle_expecting_use_response(CACHE_SESSION_DATA *csdata);
static int handle_ignoring_response(CACHE_SESSION_DATA *csdata);
static bool process_params(char **options, FILTER_PARAMETER **params, CACHE_CONFIG* config);
static bool route_using_cache(CACHE_SESSION_DATA *sdata, const GWBUF *key, GWBUF **value);
static int send_upstream(CACHE_SESSION_DATA *csdata);
@ -186,91 +195,23 @@ static void store_result(CACHE_SESSION_DATA *csdata);
*/
static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER **params)
{
CACHE_INSTANCE *cinstance = NULL;
CACHE_CONFIG config = DEFAULT_CONFIG;
bool error = false;
if (process_params(options, params, &config))
{
CACHE_RULES *rules = NULL;
for (int i = 0; params[i]; ++i)
if (config.rules)
{
const FILTER_PARAMETER *param = params[i];
if (strcmp(param->name, "allowed_references") == 0)
{
if (strcmp(param->value, "qualified") == 0)
{
config.allowed_references = CACHE_REFERENCES_QUALIFIED;
}
else if (strcmp(param->value, "any") == 0)
{
config.allowed_references = CACHE_REFERENCES_ANY;
rules = cache_rules_load(config.rules, config.debug);
}
else
{
MXS_ERROR("Unknown value '%s' for parameter '%s'.", param->value, param->name);
error = true;
}
}
else if (strcmp(param->name, "max_resultset_rows") == 0)
{
int v = atoi(param->value);
if (v > 0)
{
config.max_resultset_rows = v;
}
else
{
config.max_resultset_rows = CACHE_DEFAULT_MAX_RESULTSET_ROWS;
}
}
else if (strcmp(param->name, "max_resultset_size") == 0)
{
int v = atoi(param->value);
if (v > 0)
{
config.max_resultset_size = v * 1024;
}
else
{
MXS_ERROR("The value of the configuration entry '%s' must "
"be an integer larger than 0.", param->name);
error = true;
}
}
else if (strcmp(param->name, "storage_args") == 0)
{
config.storage_args = param->value;
}
else if (strcmp(param->name, "storage") == 0)
{
config.storage = param->value;
}
else if (strcmp(param->name, "ttl") == 0)
{
int v = atoi(param->value);
if (v > 0)
{
config.ttl = v;
}
else
{
MXS_ERROR("The value of the configuration entry '%s' must "
"be an integer larger than 0.", param->name);
error = true;
}
}
else if (!filter_standard_parameter(params[i]->name))
{
MXS_ERROR("Unknown configuration entry '%s'.", param->name);
error = true;
}
rules = cache_rules_create(config.debug);
}
CACHE_INSTANCE *cinstance = NULL;
if (!error)
if (rules)
{
if ((cinstance = MXS_CALLOC(1, sizeof(CACHE_INSTANCE))) != NULL)
{
@ -284,6 +225,7 @@ static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER
{
cinstance->name = name;
cinstance->config = config;
cinstance->rules = rules;
cinstance->module = module;
cinstance->storage = storage;
@ -291,7 +233,8 @@ static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER
}
else
{
MXS_ERROR("Could not create storage instance for %s.", name);
MXS_ERROR("Could not create storage instance for '%s'.", name);
cache_rules_free(rules);
cache_storage_close(module);
MXS_FREE(cinstance);
cinstance = NULL;
@ -299,15 +242,13 @@ static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER
}
else
{
MXS_ERROR("Could not load cache storage module %s.", name);
MXS_ERROR("Could not load cache storage module '%s'.", name);
cache_rules_free(rules);
MXS_FREE(cinstance);
cinstance = NULL;
}
}
}
else
{
cinstance = NULL;
}
return (FILTER*)cinstance;
@ -417,10 +358,46 @@ static int routeQuery(FILTER *instance, void *sdata, GWBUF *data)
cache_response_state_reset(&csdata->res);
csdata->state = CACHE_IGNORING_RESPONSE;
// TODO: This returns the wrong result if GWBUF_LENGTH(packet) is < 5.
if (modutil_is_SQL(packet))
if (gwbuf_length(packet) > MYSQL_HEADER_LEN + 1) // We need at least a packet with a type.
{
packet = gwbuf_make_contiguous(packet);
uint8_t header[MYSQL_HEADER_LEN + 1];
gwbuf_copy_data(packet, 0, sizeof(header), header);
switch ((int)MYSQL_GET_COMMAND(header))
{
case MYSQL_COM_INIT_DB:
{
ss_dassert(!csdata->use_db);
size_t len = MYSQL_GET_PACKET_LEN(header) - 1; // Remove the command byte.
csdata->use_db = MXS_MALLOC(len + 1);
if (csdata->use_db)
{
uint8_t *use_db = (uint8_t*)csdata->use_db;
gwbuf_copy_data(packet, MYSQL_HEADER_LEN + 1, len, use_db);
csdata->use_db[len] = 0;
csdata->state = CACHE_EXPECTING_USE_RESPONSE;
}
else
{
// Memory allocation failed. We need to remove the default database to
// prevent incorrect cache entries, since we won't know what the
// default db is. But we only need to do that if "USE <db>" really
// succeeds. The right thing will happen by itself in
// handle_expecting_use_response(); if OK is returned, default_db will
// become NULL, if ERR, default_db will not be changed.
}
}
break;
case MYSQL_COM_QUERY:
{
GWBUF *tmp = gwbuf_make_contiguous(packet);
if (tmp)
{
packet = tmp;
// We do not care whether the query was fully parsed or not.
// If a query cannot be fully parsed, the worst thing that can
@ -428,6 +405,10 @@ static int routeQuery(FILTER *instance, void *sdata, GWBUF *data)
// possible.
if (qc_get_operation(packet) == QUERY_OP_SELECT)
{
if (cache_rules_should_store(cinstance->rules, csdata->default_db, packet))
{
if (cache_rules_should_use(cinstance->rules, csdata->session))
{
GWBUF *result;
use_default = !route_using_cache(csdata, packet, &result);
@ -449,6 +430,19 @@ static int routeQuery(FILTER *instance, void *sdata, GWBUF *data)
}
}
}
else
{
csdata->state = CACHE_IGNORING_RESPONSE;
}
}
}
}
break;
default:
break;
}
}
if (use_default)
{
@ -519,6 +513,10 @@ static int clientReply(FILTER *instance, void *sdata, GWBUF *data)
rv = handle_expecting_rows(csdata);
break;
case CACHE_EXPECTING_USE_RESPONSE:
rv = handle_expecting_use_response(csdata);
break;
case CACHE_IGNORING_RESPONSE:
rv = handle_ignoring_response(csdata);
break;
@ -583,12 +581,32 @@ static CACHE_SESSION_DATA *cache_session_data_create(CACHE_INSTANCE *instance,
CACHE_SESSION_DATA *data = (CACHE_SESSION_DATA*)MXS_CALLOC(1, sizeof(CACHE_SESSION_DATA));
if (data)
{
char *default_db = NULL;
ss_dassert(session->client_dcb);
ss_dassert(session->client_dcb->data);
MYSQL_session *mysql_session = (MYSQL_session*)session->client_dcb->data;
if (mysql_session->db[0] != 0)
{
default_db = MXS_STRDUP(mysql_session->db);
}
if ((mysql_session->db[0] == 0) || default_db)
{
data->instance = instance;
data->api = instance->module->api;
data->storage = instance->storage;
data->session = session;
data->state = CACHE_EXPECTING_NOTHING;
data->default_db = default_db;
}
else
{
MXS_FREE(data);
data = NULL;
}
}
return data;
@ -603,6 +621,8 @@ static void cache_session_data_free(CACHE_SESSION_DATA* data)
{
if (data)
{
ss_dassert(!data->use_db);
MXS_FREE(data->default_db);
MXS_FREE(data);
}
}
@ -704,7 +724,7 @@ static int handle_expecting_response(CACHE_SESSION_DATA *csdata)
store_result(csdata);
rv = send_upstream(csdata);
csdata->state = CACHE_EXPECTING_NOTHING;
csdata->state = CACHE_IGNORING_RESPONSE;
break;
case 0xfb: // GET_MORE_CLIENT_DATA/SEND_MORE_CLIENT_DATA
@ -818,6 +838,58 @@ static int handle_expecting_rows(CACHE_SESSION_DATA *csdata)
return rv;
}
/**
* Called when a response to a "USE db" is received from the server.
*
* @param csdata The cache session data.
*/
static int handle_expecting_use_response(CACHE_SESSION_DATA *csdata)
{
ss_dassert(csdata->state == CACHE_EXPECTING_USE_RESPONSE);
ss_dassert(csdata->res.data);
int rv = 1;
size_t buflen = gwbuf_length(csdata->res.data);
if (buflen >= MYSQL_HEADER_LEN + 1) // We need the command byte.
{
uint8_t command;
gwbuf_copy_data(csdata->res.data, MYSQL_HEADER_LEN, 1, &command);
switch (command)
{
case 0x00: // OK
// In case csdata->use_db could not be allocated in routeQuery(), we will
// in fact reset the default db here. That's ok as it will prevent broken
// entries in the cache.
MXS_FREE(csdata->default_db);
csdata->default_db = csdata->use_db;
csdata->use_db = NULL;
break;
case 0xff: // ERR
MXS_FREE(csdata->use_db);
csdata->use_db = NULL;
break;
default:
MXS_ERROR("\"USE %s\" received unexpected server response %d.",
csdata->use_db ? csdata->use_db : "<db>", command);
MXS_FREE(csdata->default_db);
MXS_FREE(csdata->use_db);
csdata->default_db = NULL;
csdata->use_db = NULL;
}
rv = send_upstream(csdata);
csdata->state = CACHE_IGNORING_RESPONSE;
}
return rv;
}
/**
* Called when all data from the server is ignored.
*
@ -831,6 +903,125 @@ static int handle_ignoring_response(CACHE_SESSION_DATA *csdata)
return send_upstream(csdata);
}
/**
* Processes the cache params
*
* @param options Options as passed to the filter.
* @param params Parameters as passed to the filter.
* @param config Pointer to config instance where params will be stored.
*
* @return True if all parameters could be processed, false otherwise.
*/
static bool process_params(char **options, FILTER_PARAMETER **params, CACHE_CONFIG* config)
{
bool error = false;
for (int i = 0; params[i]; ++i)
{
const FILTER_PARAMETER *param = params[i];
if (strcmp(param->name, "max_resultset_rows") == 0)
{
int v = atoi(param->value);
if (v > 0)
{
config->max_resultset_rows = v;
}
else
{
config->max_resultset_rows = CACHE_DEFAULT_MAX_RESULTSET_ROWS;
}
}
else if (strcmp(param->name, "max_resultset_size") == 0)
{
int v = atoi(param->value);
if (v > 0)
{
config->max_resultset_size = v * 1024;
}
else
{
MXS_ERROR("The value of the configuration entry '%s' must "
"be an integer larger than 0.", param->name);
error = true;
}
}
else if (strcmp(param->name, "rules") == 0)
{
if (*param->value == '/')
{
config->rules = MXS_STRDUP(param->value);
}
else
{
const char *datadir = get_datadir();
size_t len = strlen(datadir) + 1 + strlen(param->value) + 1;
char *rules = MXS_MALLOC(len);
if (rules)
{
sprintf(rules, "%s/%s", datadir, param->value);
config->rules = rules;
}
}
if (!config->rules)
{
error = true;
}
}
else if (strcmp(param->name, "storage_options") == 0)
{
config->storage_options = param->value;
}
else if (strcmp(param->name, "storage") == 0)
{
config->storage = param->value;
}
else if (strcmp(param->name, "ttl") == 0)
{
int v = atoi(param->value);
if (v > 0)
{
config->ttl = v;
}
else
{
MXS_ERROR("The value of the configuration entry '%s' must "
"be an integer larger than 0.", param->name);
error = true;
}
}
else if (strcmp(param->name, "debug") == 0)
{
int v = atoi(param->value);
if ((v >= CACHE_DEBUG_MIN) && (v <= CACHE_DEBUG_MAX))
{
config->debug = v;
}
else
{
MXS_ERROR("The value of the configuration entry '%s' must "
"be between %d and %d, inclusive.",
param->name, CACHE_DEBUG_MIN, CACHE_DEBUG_MAX);
error = true;
}
}
else if (!filter_standard_parameter(params[i]->name))
{
MXS_ERROR("Unknown configuration entry '%s'.", param->name);
error = true;
}
}
return !error;
}
/**
* Route a query via the cache.
*
@ -843,7 +1034,7 @@ static bool route_using_cache(CACHE_SESSION_DATA *csdata,
const GWBUF *query,
GWBUF **value)
{
cache_result_t result = csdata->api->getKey(csdata->storage, query, csdata->key);
cache_result_t result = csdata->api->getKey(csdata->storage, csdata->default_db, query, csdata->key);
if (result == CACHE_RESULT_OK)
{
@ -883,7 +1074,11 @@ static void store_result(CACHE_SESSION_DATA *csdata)
{
ss_dassert(csdata->res.data);
csdata->res.data = gwbuf_make_contiguous(csdata->res.data);
GWBUF *data = gwbuf_make_contiguous(csdata->res.data);
if (data)
{
csdata->res.data = data;
cache_result_t result = csdata->api->putValue(csdata->storage,
csdata->key,
@ -893,4 +1088,5 @@ static void store_result(CACHE_SESSION_DATA *csdata)
{
MXS_ERROR("Could not store cache item.");
}
}
}


@ -15,19 +15,24 @@
#include <limits.h>
#define CACHE_DEBUG_NONE 0
#define CACHE_DEBUG_MATCHING 1
#define CACHE_DEBUG_NON_MATCHING 2
#define CACHE_DEBUG_USE 4
#define CACHE_DEBUG_NON_USE 8
typedef enum cache_references
{
CACHE_REFERENCES_ANY, // select * from tbl;
CACHE_REFERENCES_QUALIFIED // select * from db.tbl;
} cache_references_t;
#define CACHE_DEBUG_RULES (CACHE_DEBUG_MATCHING | CACHE_DEBUG_NON_MATCHING)
#define CACHE_DEBUG_USAGE (CACHE_DEBUG_USE | CACHE_DEBUG_NON_USE)
#define CACHE_DEBUG_MIN CACHE_DEBUG_NONE
#define CACHE_DEBUG_MAX (CACHE_DEBUG_RULES | CACHE_DEBUG_USAGE)
#define CACHE_DEFAULT_ALLOWED_REFERENCES CACHE_REFERENCES_QUALIFIED
// Count
#define CACHE_DEFAULT_MAX_RESULTSET_ROWS UINT_MAX
// Bytes
#define CACHE_DEFAULT_MAX_RESULTSET_SIZE 64 * 1024
// Seconds
#define CACHE_DEFAULT_TTL 10
// Integer value
#define CACHE_DEFAULT_DEBUG 0
#endif


@ -53,7 +53,7 @@ typedef struct cache_storage_api
* @param name The name of the cache instance.
* @param ttl Time to live; number of seconds the value is valid.
* @param argc The number of elements in the argv array.
* @param argv Array of arguments, as passed in the `storage_args` parameter
* @param argv Array of arguments, as passed in the `storage_options` parameter
* in the cache section in the MaxScale configuration file.
* @return A new cache instance, or NULL if the instance could not be
* created.
@ -79,6 +79,7 @@ typedef struct cache_storage_api
* @return CACHE_RESULT_OK if a key was created, otherwise some error code.
*/
cache_result_t (*getKey)(CACHE_STORAGE* storage,
const char* default_db,
const GWBUF* query,
char* key);
/**

server/modules/filter/cache/rules.c vendored Normal file

File diff suppressed because it is too large

server/modules/filter/cache/rules.h vendored Normal file

@ -0,0 +1,71 @@
#ifndef _MAXSCALE_FILTER_CACHE_RULES_H
#define _MAXSCALE_FILTER_CACHE_RULES_H
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <stdbool.h>
#include <jansson.h>
#include <buffer.h>
#include <session.h>
#include <maxscale_pcre2.h>
typedef enum cache_rule_attribute
{
CACHE_ATTRIBUTE_COLUMN,
CACHE_ATTRIBUTE_DATABASE,
CACHE_ATTRIBUTE_QUERY,
CACHE_ATTRIBUTE_TABLE,
CACHE_ATTRIBUTE_USER,
} cache_rule_attribute_t;
typedef enum cache_rule_op
{
CACHE_OP_EQ,
CACHE_OP_NEQ,
CACHE_OP_LIKE,
CACHE_OP_UNLIKE
} cache_rule_op_t;
typedef struct cache_rule
{
cache_rule_attribute_t attribute; // What attribute is evaluated.
cache_rule_op_t op; // What operator is used.
char *value; // The value from the rule file.
struct
{
pcre2_code* code;
pcre2_match_data* data;
} regexp; // Regexp data, only for CACHE_OP_[LIKE|UNLIKE].
uint32_t debug; // The debug level.
struct cache_rule *next;
} CACHE_RULE;
typedef struct cache_rules
{
uint32_t debug; // The debug level.
CACHE_RULE *store_rules; // The rules for when to store data to the cache.
CACHE_RULE *use_rules; // The rules for when to use data from the cache.
} CACHE_RULES;
CACHE_RULES *cache_rules_create(uint32_t debug);
void cache_rules_free(CACHE_RULES *rules);
CACHE_RULES *cache_rules_load(const char *path, uint32_t debug);
bool cache_rules_should_store(CACHE_RULES *rules, const char *default_db, const GWBUF* query);
bool cache_rules_should_use(CACHE_RULES *rules, const SESSION *session);
#endif

@ -15,10 +15,21 @@
#include <openssl/sha.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <algorithm>
#include <set>
#include <rocksdb/env.h>
#include <maxscale/alloc.h>
#include <gwdirs.h>
extern "C"
{
// TODO: Add extern "C" to modutil.h
#include <modutil.h>
}
#include <query_classifier.h>
#include "rocksdbinternals.h"
using std::for_each;
using std::set;
using std::string;
using std::unique_ptr;
@ -28,13 +39,52 @@ namespace
string u_storageDirectory;
const size_t ROCKSDB_KEY_LENGTH = SHA512_DIGEST_LENGTH;
const size_t ROCKSDB_KEY_LENGTH = 2 * SHA512_DIGEST_LENGTH;
#if ROCKSDB_KEY_LENGTH > CACHE_KEY_MAXLEN
#error storage_rocksdb key is too long.
#endif
// See https://github.com/facebook/rocksdb/wiki/Basic-Operations#thread-pools
// These figures should perhaps depend upon the number of cache instances.
const size_t ROCKSDB_N_LOW_THREADS = 2;
const size_t ROCKSDB_N_HIGH_THREADS = 1;
struct StorageRocksDBVersion
{
uint8_t major;
uint8_t minor;
uint8_t correction;
};
const uint8_t STORAGE_ROCKSDB_MAJOR = 0;
const uint8_t STORAGE_ROCKSDB_MINOR = 1;
const uint8_t STORAGE_ROCKSDB_CORRECTION = 0;
const StorageRocksDBVersion STORAGE_ROCKSDB_VERSION =
{
STORAGE_ROCKSDB_MAJOR,
STORAGE_ROCKSDB_MINOR,
STORAGE_ROCKSDB_CORRECTION
};
string toString(const StorageRocksDBVersion& version)
{
string rv;
rv += "{ ";
rv += std::to_string(version.major);
rv += ", ";
rv += std::to_string(version.minor);
rv += ", ";
rv += std::to_string(version.correction);
rv += " }";
return rv;
}
const char STORAGE_ROCKSDB_VERSION_KEY[] = "MaxScale_Storage_RocksDB_Version";
}
//private
@ -99,39 +149,162 @@ RocksDBStorage* RocksDBStorage::Create(const char* zName, uint32_t ttl, int argc
options.env = rocksdb::Env::Default();
options.max_background_compactions = ROCKSDB_N_LOW_THREADS;
options.max_background_flushes = ROCKSDB_N_HIGH_THREADS;
options.create_if_missing = true;
rocksdb::DBWithTTL* pDb;
rocksdb::Status status = rocksdb::DBWithTTL::Open(options, path, &pDb, ttl);
rocksdb::DBWithTTL* pDb;
rocksdb::Status status;
rocksdb::Slice key(STORAGE_ROCKSDB_VERSION_KEY);
do
{
// Try to open existing.
options.create_if_missing = false;
options.error_if_exists = false;
status = rocksdb::DBWithTTL::Open(options, path, &pDb, ttl);
if (status.IsInvalidArgument()) // Did not exist
{
MXS_NOTICE("Database \"%s\" does not exist, creating.", path.c_str());
options.create_if_missing = true;
options.error_if_exists = true;
status = rocksdb::DBWithTTL::Open(options, path, &pDb, ttl);
if (status.ok())
{
MXS_NOTICE("Database \"%s\" created, storing version %s into it.",
path.c_str(), toString(STORAGE_ROCKSDB_VERSION).c_str());
rocksdb::Slice value(reinterpret_cast<const char*>(&STORAGE_ROCKSDB_VERSION),
sizeof(STORAGE_ROCKSDB_VERSION));
status = pDb->Put(rocksdb::WriteOptions(), key, value);
if (!status.ok())
{
MXS_ERROR("Could not store version information to created RocksDB database \"%s\". "
"You may need to delete the database and retry. RocksDB error: %s",
path.c_str(),
status.ToString().c_str());
}
}
}
}
while (status.IsInvalidArgument());
RocksDBStorage* pStorage = nullptr;
if (status.ok())
{
std::string value;
status = pDb->Get(rocksdb::ReadOptions(), key, &value);
if (status.ok())
{
const StorageRocksDBVersion* pVersion =
reinterpret_cast<const StorageRocksDBVersion*>(value.data());
// When the version is bumped, it needs to be decided what, if any,
// backward compatibility is provided. After all, it's a cache, so
// you should be able to delete it at any point and pay a small
// price while the cache is rebuilt.
if ((pVersion->major == STORAGE_ROCKSDB_MAJOR) &&
(pVersion->minor == STORAGE_ROCKSDB_MINOR) &&
(pVersion->correction == STORAGE_ROCKSDB_CORRECTION))
{
MXS_NOTICE("Version of \"%s\" is %s, version of storage_rocksdb is %s.",
path.c_str(),
toString(*pVersion).c_str(),
toString(STORAGE_ROCKSDB_VERSION).c_str());
unique_ptr<rocksdb::DBWithTTL> sDb(pDb);
pStorage = new RocksDBStorage(sDb, zName, path, ttl);
}
else
{
MXS_ERROR("Could not open RocksDB database %s using path %s: %s",
zName, path.c_str(), status.ToString().c_str());
MXS_ERROR("Version of RocksDB database \"%s\" is %s, while version required "
"is %s. You need to delete the database and restart.",
path.c_str(),
toString(*pVersion).c_str(),
toString(STORAGE_ROCKSDB_VERSION).c_str());
delete pDb;
}
}
else
{
MXS_ERROR("Could not read version information from RocksDB database %s. "
"You may need to delete the database and retry. RocksDB error: %s",
path.c_str(),
status.ToString().c_str());
delete pDb;
}
}
else
{
MXS_ERROR("Could not open/initialize RocksDB database %s. RocksDB error: %s",
path.c_str(), status.ToString().c_str());
}
return pStorage;
}
cache_result_t RocksDBStorage::getKey(const GWBUF* pQuery, char* pKey)
cache_result_t RocksDBStorage::getKey(const char* zDefaultDB, const GWBUF* pQuery, char* pKey)
{
// ss_dassert(gwbuf_is_contiguous(pQuery));
const uint8_t* pData = static_cast<const uint8_t*>(GWBUF_DATA(pQuery));
size_t len = MYSQL_GET_PACKET_LEN(pData) - 1; // Subtract 1 for packet type byte.
ss_dassert(GWBUF_IS_CONTIGUOUS(pQuery));
const uint8_t* pSql = &pData[5]; // Symbolic constant for 5?
int n;
bool fullnames = true;
char** pzTables = qc_get_table_names(const_cast<GWBUF*>(pQuery), &n, fullnames);
set<string> dbs; // Elements in set are sorted.
for (int i = 0; i < n; ++i)
{
char *zTable = pzTables[i];
char *zDot = strchr(zTable, '.');
if (zDot)
{
*zDot = 0;
dbs.insert(zTable);
}
else if (zDefaultDB)
{
// If zDefaultDB is NULL, then there will be a table for which we
// do not know the database. However, that will fail in the server,
// so nothing will be stored.
dbs.insert(zDefaultDB);
}
MXS_FREE(zTable);
}
MXS_FREE(pzTables);
// dbs now contains each accessed database in sorted order. Copy them into a single string.
string tag;
for_each(dbs.begin(), dbs.end(), [&tag](const string& db) { tag.append(db); });
memset(pKey, 0, CACHE_KEY_MAXLEN);
SHA512(pSql, len, reinterpret_cast<unsigned char*>(pKey));
const unsigned char* pData;
// We store the databases in the first half of the key. That will ensure that
// identical queries targeting different default databases will not clash.
// This will also mean that entries related to the same databases will
// be placed near each other.
pData = reinterpret_cast<const unsigned char*>(tag.data());
SHA512(pData, tag.length(), reinterpret_cast<unsigned char*>(pKey));
char *pSql;
int length;
modutil_extract_SQL(const_cast<GWBUF*>(pQuery), &pSql, &length);
// Then we store the query itself in the second half of the key.
pData = reinterpret_cast<const unsigned char*>(pSql);
SHA512(pData, length, reinterpret_cast<unsigned char*>(pKey) + SHA512_DIGEST_LENGTH);
return CACHE_RESULT_OK;
}

@ -27,7 +27,7 @@ public:
static RocksDBStorage* Create(const char* zName, uint32_t ttl, int argc, char* argv[]);
~RocksDBStorage();
cache_result_t getKey(const GWBUF* pQuery, char* pKey);
cache_result_t getKey(const char* zDefaultDB, const GWBUF* pQuery, char* pKey);
cache_result_t getValue(const char* pKey, GWBUF** ppResult);
cache_result_t putValue(const char* pKey, const GWBUF* pValue);

@ -55,10 +55,12 @@ void freeInstance(CACHE_STORAGE* pInstance)
}
cache_result_t getKey(CACHE_STORAGE* pStorage,
const char* zDefaultDB,
const GWBUF* pQuery,
char* pKey)
{
ss_dassert(pStorage);
// zDefaultDB may be NULL.
ss_dassert(pQuery);
ss_dassert(pKey);
@ -66,7 +68,7 @@ cache_result_t getKey(CACHE_STORAGE* pStorage,
try
{
result = reinterpret_cast<RocksDBStorage*>(pStorage)->getKey(pQuery, pKey);
result = reinterpret_cast<RocksDBStorage*>(pStorage)->getKey(zDefaultDB, pQuery, pKey);
}
catch (const std::bad_alloc&)
{

@ -50,6 +50,8 @@
* @endverbatim
*/
#define MYSQLMON_DEFAULT_FAILCOUNT 5
/**
* The handle for an instance of a MySQL Monitor module
*/
@ -72,6 +74,9 @@ typedef struct
char* script; /*< Script to call when state changes occur on servers */
bool events[MAX_MONITOR_EVENT]; /*< enabled events */
HASHTABLE *server_info; /**< Contains server specific information */
bool failover; /**< If simple failover is enabled */
int failcount; /**< How many monitoring cycles servers must be
down before failover is initiated */
} MYSQL_MONITOR;
#endif
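Assuming the usual MaxScale configuration syntax shown elsewhere in this tree, the two new fields could be driven from a monitor section along these lines; the server names are placeholders, and `failcount` falls back to MYSQLMON_DEFAULT_FAILCOUNT (5) when omitted:

```
[MySQL Monitor]
type=monitor
module=mysqlmon
servers=server1,server2
failover=true
failcount=5
```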

@ -273,6 +273,8 @@ startMonitor(MONITOR *monitor, const CONFIG_PARAMETER* params)
handle->script = NULL;
handle->multimaster = false;
handle->mysql51_replication = false;
handle->failover = false;
handle->failcount = MYSQLMON_DEFAULT_FAILCOUNT;
memset(handle->events, false, sizeof(handle->events));
spinlock_init(&handle->lock);
}
@ -295,6 +297,19 @@ startMonitor(MONITOR *monitor, const CONFIG_PARAMETER* params)
{
handle->multimaster = config_truth_value(params->value);
}
else if (!strcmp(params->name, "failover"))
{
handle->failover = config_truth_value(params->value);
}
else if (!strcmp(params->name, "failcount"))
{
handle->failcount = atoi(params->value);
if (handle->failcount <= 0)
{
MXS_ERROR("[%s] Invalid value for 'failcount': %s", monitor->name, params->value);
error = true;
}
}
else if (!strcmp(params->name, "script"))
{
if (externcmd_can_execute(params->value))
@ -352,6 +367,7 @@ startMonitor(MONITOR *monitor, const CONFIG_PARAMETER* params)
hashtable_free(handle->server_info);
MXS_FREE(handle->script);
MXS_FREE(handle);
handle = NULL;
}
else if (thread_start(&handle->thread, monitorMain, monitor) == NULL)
{
@ -1021,6 +1037,80 @@ void find_graph_cycles(MYSQL_MONITOR *handle, MONITOR_SERVERS *database, int nse
}
}
/**
* @brief Check whether failover conditions have been met
*
* This function checks whether all the conditions to trigger a failover have
* been met. For a failover to happen, only one server must be available and
* other servers must have passed the configured tolerance level of failures.
*
* @param handle Monitor instance
* @param db Monitor servers
*
* @return True if failover is required
*/
bool failover_required(MYSQL_MONITOR *handle, MONITOR_SERVERS *db)
{
int candidates = 0;
while (db)
{
if (SERVER_IS_RUNNING(db->server))
{
candidates++;
MYSQL_SERVER_INFO *server_info = hashtable_fetch(handle->server_info, db->server->unique_name);
if (server_info->read_only || candidates > 1)
{
return false;
}
}
else if (db->mon_err_count < handle->failcount)
{
return false;
}
db = db->next;
}
return candidates == 1;
}
/**
* @brief Initiate simple failover
*
* This function does the actual failover by assigning the last remaining server
* the master status and setting all other servers into maintenance mode. By
* setting the servers into maintenance mode, we prevent any possible conflicts
* when the failed servers come back up.
*
* @param handle Monitor instance
* @param db Monitor servers
*/
void do_failover(MYSQL_MONITOR *handle, MONITOR_SERVERS *db)
{
while (db)
{
if (SERVER_IS_RUNNING(db->server))
{
if (!SERVER_IS_MASTER(db->server))
{
MXS_WARNING("Failover initiated, server '%s' is now the master. "
"All other servers are set into maintenance mode.",
db->server->unique_name);
}
monitor_set_pending_status(db, SERVER_MASTER);
monitor_clear_pending_status(db, SERVER_SLAVE);
}
else
{
monitor_set_pending_status(db, SERVER_MAINT);
}
db = db->next;
}
}
/**
* The entry point for the monitoring module thread
*
@ -1296,6 +1386,17 @@ monitorMain(void *arg)
ptr = ptr->next;
}
/** Now that all servers have their status correctly set, we can check
if we need to do a failover */
if (handle->failover)
{
if (failover_required(handle, mon->databases))
{
/** Other servers have died, initiate a failover to the last remaining server */
do_failover(handle, mon->databases);
}
}
ptr = mon->databases;
monitor_event_t evtype;
while (ptr)

@ -1511,6 +1511,12 @@ backend_ref_t *get_bref_from_dcb(ROUTER_CLIENT_SES *rses, DCB *dcb)
*
* Calls hang-up function for DCB if it is not both running and in
* master/slave/joined/ndb role. Called by DCB's callback routine.
*
* @param dcb DCB relating to a backend server
* @param reason The reason for the state change
* @param data Data is a backend reference structure belonging to this router
*
* @return 1 for success, 0 for failure
*/
int router_handle_state_switch(DCB *dcb, DCB_REASON reason, void *data)
{
@ -1668,6 +1674,14 @@ static bool rwsplit_process_router_options(ROUTER_INSTANCE *router,
return success;
}
/**
* @brief Handle an error reply for a client
*
* @param ses Session
* @param rses Router session
* @param backend_dcb DCB for the backend server that has failed
* @param errmsg GWBUF containing the error message
*/
static void handle_error_reply_client(SESSION *ses, ROUTER_CLIENT_SES *rses,
DCB *backend_dcb, GWBUF *errmsg)
{
@ -1801,6 +1815,13 @@ return_succp:
return succp;
}
/**
* @brief Calculate the number of backend servers
*
* @param inst Router instance
*
* @return int - count of servers
*/
static int router_get_servercount(ROUTER_INSTANCE *inst)
{
int router_nservers = 0;
@ -1814,6 +1835,16 @@ static int router_get_servercount(ROUTER_INSTANCE *inst)
return router_nservers;
}
/**
* @brief Calculate whether we have enough servers to route a query
*
* @param p_rses Router session
* @param min_nsrv Minimum number of servers that is sufficient
* @param nsrv Actual number of servers
* @param router Router instance
*
 * @return bool - true if there are enough servers; logs an error as a side effect if not
*/
static bool have_enough_servers(ROUTER_CLIENT_SES **p_rses, const int min_nsrv,
int router_nsrv, ROUTER_INSTANCE *router)
{

@ -60,8 +60,26 @@
*/
/* This could be placed in the protocol, with a new API entry point
* It is certainly MySQL specific.
* */
* It is certainly MySQL specific. Packet types are DB specific, but can be
* assumed to be enums, which can be handled as integers without knowing
* which DB is involved until the packet type needs to be interpreted.
*
*/
/**
* @brief Determine packet type
*
* Examine the packet in the buffer to extract the type, if possible. At the
* same time set the second parameter to indicate whether the packet was
* empty.
*
* It is assumed that the packet length and type are contained within a single
* buffer, the one indicated by the first parameter.
*
* @param querybuf Buffer containing the packet
 * @param non_empty_packet Pointer to bool, set to indicate whether the packet is non-empty
 * @return The packet type, or MYSQL_COM_UNDEFINED; the second parameter is also set
*/
int
determine_packet_type(GWBUF *querybuf, bool *non_empty_packet)
{
@ -85,6 +103,17 @@ determine_packet_type(GWBUF *querybuf, bool *non_empty_packet)
/*
* This appears to be MySQL specific
*/
/**
* @brief Determine if a packet contains a SQL query
*
* Packet type tells us this, but in a DB specific way. This function is
* provided so that code that is not DB specific can find out whether a packet
* contains a SQL query. Clearly, to be effective different functions must be
* called for different DB types.
*
* @param packet_type Type of packet (integer)
* @return bool indicating whether packet contains a SQL query
*/
bool
is_packet_a_query(int packet_type)
{
@ -94,6 +123,17 @@ is_packet_a_query(int packet_type)
/*
* This looks MySQL specific
*/
/**
* @brief Determine if a packet contains a one way message
*
* Packet type tells us this, but in a DB specific way. This function is
* provided so that code that is not DB specific can find out whether a packet
 * contains a one way message. Clearly, to be effective different functions must be
* called for different DB types.
*
* @param packet_type Type of packet (integer)
* @return bool indicating whether packet contains a one way message
*/
bool
is_packet_a_one_way_message(int packet_type)
{
@ -105,6 +145,17 @@ is_packet_a_one_way_message(int packet_type)
* This one is problematic because it is MySQL specific, but also router
* specific.
*/
/**
* @brief Log the transaction status
*
* The router session and the query buffer are used to log the transaction
* status, along with the query type (which is a generic description that
* should be usable across all DB types).
*
* @param rses Router session
* @param querybuf Query buffer
* @param qtype Query type
*/
void
log_transaction_status(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, qc_query_type_t qtype)
{
@ -140,6 +191,23 @@ log_transaction_status(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, qc_query_type_t
* utility to convert packet type to a string. The aim is for most of this
* code to remain as part of the router.
*/
/**
* @brief Operations to be carried out if request is for all backend servers
*
* If the choice of sending to all backends is in conflict with other bit
* settings in route_target, then error messages are written to the log.
*
* Otherwise, the function route_session_write is called to carry out the
* actual routing.
*
* @param route_target Bit map indicating where packet should be routed
* @param inst Router instance
* @param rses Router session
* @param querybuf Query buffer containing packet
* @param packet_type Integer (enum) indicating type of packet
* @param qtype Query type
* @return bool indicating whether the session can continue
*/
bool
handle_target_is_all(route_target_t route_target,
ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
@ -218,6 +286,15 @@ handle_target_is_all(route_target_t route_target,
}
/* This is MySQL specific */
/**
* @brief Write an error message to the log indicating failure
*
* Used when an attempt to lock the router session fails.
*
* @param querybuf Query buffer containing packet
* @param packet_type Integer (enum) indicating type of packet
* @param qtype Query type
*/
void
session_lock_failure_handling(GWBUF *querybuf, int packet_type, qc_query_type_t qtype)
{
@ -237,6 +314,14 @@ session_lock_failure_handling(GWBUF *querybuf, int packet_type, qc_query_type_t
/*
* Probably MySQL specific because of modutil function
*/
/**
* @brief Write an error message to the log for closed session
*
* This happens if a request is received for a session that is already
* closing down.
*
* @param querybuf Query buffer containing packet
*/
void closed_session_reply(GWBUF *querybuf)
{
uint8_t* data = GWBUF_DATA(querybuf);
@ -254,6 +339,15 @@ void closed_session_reply(GWBUF *querybuf)
/*
* Probably MySQL specific because of modutil function
*/
/**
* @brief First step to handle request in a live session
*
 * Used when a request is about to be routed. Note that the query buffer is
 * passed by address and may be modified by this function.
*
* @param querybuf Query buffer containing packet
* @param rses Router session
*/
void live_session_reply(GWBUF **querybuf, ROUTER_CLIENT_SES *rses)
{
GWBUF *tmpbuf = *querybuf;
@ -276,6 +370,16 @@ void live_session_reply(GWBUF **querybuf, ROUTER_CLIENT_SES *rses)
/*
* Uses MySQL specific mechanisms
*/
/**
* @brief Write an error message to the log for session lock failure
*
* This happens when processing a client reply and the session cannot be
* locked.
*
* @param rses Router session
* @param buf Query buffer containing reply data
* @param dcb The backend DCB that sent the reply
*/
void print_error_packet(ROUTER_CLIENT_SES *rses, GWBUF *buf, DCB *dcb)
{
#if defined(SS_DEBUG)
@ -326,6 +430,15 @@ void print_error_packet(ROUTER_CLIENT_SES *rses, GWBUF *buf, DCB *dcb)
/*
* Uses MySQL specific mechanisms
*/
/**
* @brief Check the reply from a backend server to a session command
*
* If the reply is an error, a message may be logged.
*
* @param writebuf Query buffer containing reply data
* @param scur Session cursor
* @param bref Router session data for a backend server
*/
void check_session_command_reply(GWBUF *writebuf, sescmd_cursor_t *scur, backend_ref_t *bref)
{
if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_ERR) &&
@ -349,7 +462,7 @@ void check_session_command_reply(GWBUF *writebuf, sescmd_cursor_t *scur, backend
}
/**
* If session command cursor is passive, sends the command to backend for
* @brief If session command cursor is passive, sends the command to backend for
* execution.
*
* Returns true if command was sent or added successfully to the queue.
@ -357,6 +470,9 @@ void check_session_command_reply(GWBUF *writebuf, sescmd_cursor_t *scur, backend
* commands.
*
* Router session must be locked.
*
* @param backend_ref Router session backend database data
* @return bool - true for success, false for failure
*/
/*
* Uses MySQL specific values in the large switch statement, although it

@ -424,6 +424,14 @@ return_succp:
return succp;
}
/**
* @brief Function to hash keys in read-write split router
*
* Used to store information about temporary tables.
*
* @param key key to be hashed, actually a character string
* @result the hash value integer
*/
int rwsplit_hashkeyfun(const void *key)
{
if (key == NULL)
@ -441,6 +449,15 @@ int rwsplit_hashkeyfun(const void *key)
return hash;
}
/**
* @brief Function to compare hash keys in read-write split router
*
* Used to manage information about temporary tables.
*
 * @param v1 first key to be compared, actually a character string
 * @param v2 second key to be compared, actually a character string
 * @result 0 if the keys are equal, non-zero otherwise (strcmp semantics)
*/
int rwsplit_hashcmpfun(const void *v1, const void *v2)
{
const char *i1 = (const char *)v1;
@ -449,12 +466,27 @@ int rwsplit_hashcmpfun(const void *v1, const void *v2)
return strcmp(i1, i2);
}
/**
* @brief Function to duplicate a hash value in read-write split router
*
* Used to manage information about temporary tables.
*
* @param fval value to be duplicated, actually a character string
* @result the duplicated value, actually a character string
*/
void *rwsplit_hstrdup(const void *fval)
{
char *str = (char *)fval;
return MXS_STRDUP(str);
}
/**
* @brief Function to free hash values in read-write split router
*
* Used to manage information about temporary tables.
*
 * @param fval value to be freed
*/
void rwsplit_hfree(void *fval)
{
MXS_FREE(fval);
@ -869,6 +901,16 @@ route_target_t get_route_target(ROUTER_CLIENT_SES *rses,
return target;
}
/**
* @brief Handle multi statement queries and load statements
*
* One of the possible types of handling required when a request is routed
*
 * @param rses Router session
* @param querybuf Buffer containing query to be routed
* @param packet_type Type of packet (database specific)
* @param qtype Query type
*/
void
handle_multi_temp_and_load(ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
int packet_type, int *qtype)
@ -955,6 +997,18 @@ handle_multi_temp_and_load(ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
}
}
/**
* @brief Handle hinted target query
*
* One of the possible types of handling required when a request is routed
*
 * @param rses Router session
* @param querybuf Buffer containing query to be routed
* @param route_target Target for the query
* @param target_dcb DCB for the target server
*
* @return bool - true if succeeded, false otherwise
*/
bool handle_hinted_target(ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
route_target_t route_target, DCB **target_dcb)
{
@ -1027,6 +1081,17 @@ bool handle_hinted_target(ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
return succp;
}
/**
* @brief Handle slave is the target
*
* One of the possible types of handling required when a request is routed
*
* @param inst Router instance
 * @param rses Router session
* @param target_dcb DCB for the target server
*
* @return bool - true if succeeded, false otherwise
*/
bool handle_slave_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
DCB **target_dcb)
{
@ -1050,6 +1115,17 @@ bool handle_slave_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
}
}
/**
* @brief Handle master is the target
*
* One of the possible types of handling required when a request is routed
*
* @param inst Router instance
 * @param rses Router session
* @param target_dcb DCB for the target server
*
* @return bool - true if succeeded, false otherwise
*/
bool handle_master_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
DCB **target_dcb)
{
@ -1090,6 +1166,18 @@ bool handle_master_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
return succp;
}
/**
* @brief Handle got a target
*
* One of the possible types of handling required when a request is routed
*
* @param inst Router instance
 * @param rses Router session
* @param querybuf Buffer containing query to be routed
* @param target_dcb DCB for the target server
*
* @return bool - true if succeeded, false otherwise
*/
bool
handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
GWBUF *querybuf, DCB *target_dcb)
@ -1149,7 +1237,11 @@ handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
}
/**
* Create a generic router session property structure.
* @brief Create a generic router session property structure.
*
* @param prop_type Property type
*
* @return property structure of requested type, or NULL if failed
*/
rses_property_t *rses_property_init(rses_property_type_t prop_type)
{
@ -1171,11 +1263,18 @@ rses_property_t *rses_property_init(rses_property_type_t prop_type)
}
/**
* @brief Add property to the router client session
*
* Add property to the router_client_ses structure's rses_properties
* array. The slot is determined by the type of property.
* In each slot there is a list of properties of similar type.
*
* Router client session must be locked.
*
* @param rses Router session
* @param prop Router session property to be added
*
* @return -1 on failure, 0 on success
*/
int rses_property_add(ROUTER_CLIENT_SES *rses, rses_property_t *prop)
{

@ -44,6 +44,8 @@
*/
/**
* @brief Check for dropping of temporary tables
*
* Check if the query is a DROP TABLE... query and
* if it targets a temporary table, remove it from the hashtable.
* @param router_cli_ses Router client session
@ -351,6 +353,15 @@ bool check_for_multi_stmt(GWBUF *buf, void *protocol, mysql_server_cmd_t packet_
return rval;
}
/**
* @brief Determine the type of a query
*
* @param querybuf GWBUF containing the query
* @param packet_type Integer denoting DB specific enum
 * @param non_empty_packet Boolean indicating whether the packet is non-empty
 *
 * @return qc_query_type_t the query type
*/
qc_query_type_t
determine_query_type(GWBUF *querybuf, int packet_type, bool non_empty_packet)
{