diff --git a/.travis.yml b/.travis.yml index 961fbfe4a..052d7da42 100644 --- a/.travis.yml +++ b/.travis.yml @@ -21,6 +21,8 @@ addons: - libpcre3-dev - doxygen - pandoc + - uuid + - uuid-dev coverity_scan: project: name: "mariadb-corporation/MaxScale" diff --git a/Documentation/Filters/Cache.md b/Documentation/Filters/Cache.md index bdd5fa555..a55765567 100644 --- a/Documentation/Filters/Cache.md +++ b/Documentation/Filters/Cache.md @@ -1,4 +1,4 @@ -#Cache +# Cache ## Overview The cache filter is capable of caching the result of SELECTs, so that subsequent identical @@ -15,7 +15,9 @@ type=filter module=cache ttl=5 storage=... -storage_args=... +storage_options=... +rules=... +debug=... [Cached Routing Service] type=service @@ -42,51 +44,21 @@ optional ones. #### `storage` The name of the module that provides the storage for the cache. That -module will be loaded and provided with the value of `storage_args` as +module will be loaded and provided with the value of `storage_options` as argument. For instance: ``` storage=storage_rocksdb ``` -#### `storage_args` +#### `storage_options` A comma separated list of arguments to be provided to the storage module, specified in `storage`, when it is loaded. Note that the needed arguments depend upon the specific module. For instance, ``` -storage_args=path=/usr/maxscale/cache/rocksdb +storage_options=storage_specific_option1=value1,storage_specific_option2=value2 ``` -#### `allowed_references` - -Specifies whether any or only fully qualified references are allowed in -queries stored to the cache. -``` -allowed_references=[qualified|any] -``` -The default is `qualified`, which means that only queries where -the database name is included in the table name are subject to caching. -``` -select col from db.tbl; -``` -If `any` is specified, then also queries where the table name is not -fully qualified are subject to caching. 
-``` -select col from tbl; -``` -Care should be excersized before this setting is changed, because, for -instance, the following is likely to produce unexpected results. -``` -use db1; -select col from tbl; -... -use db2; -select col from tbl; -``` -The setting can be changed to `any`, provided fully qualified names -are always used or if the names of tables in different databases are -different. - #### `max_resultset_rows` Specifies the maximum number of rows a resultset can have in order to be @@ -119,6 +91,234 @@ If nothing is specified, the default _ttl_ value is 10. ttl=60 ``` -#Storage +#### `rules` + +Specifies the path of the file where the caching rules are stored. A relative +path is interpreted relative to the _data directory_ of MariaDB MaxScale. + +``` +rules=/path/to/rules-file +``` + +#### `debug` + +An integer value, using which the level of debug logging made by the cache +can be controlled. The value is actually a bitfield with different bits +denoting different logging. + + * `0` (`0b0000`) No logging is made. + * `1` (`0b0001`) A matching rule is logged. + * `2` (`0b0010`) A non-matching rule is logged. + * `4` (`0b0100`) A decision to use data from the cache is logged. + * `8` (`0b1000`) A decision not to use data from the cache is logged. + +Default is `0`. To log everything, give `debug` a value of `15`. + +``` +debug=2 +``` + +# Rules + +The caching rules are expressed as a JSON object. + +There are two decisions to be made regarding the caching; in what circumstances +should data be stored to the cache and in what circumstances should the data in +the cache be used. + +In the JSON object this is visible as follows: + +``` +{ + store: [ ... ], + use: [ ... ] +} +``` + +The `store` field specifies in what circumstances data should be stored to +the cache and the `use` field specifies in what circumstances the data in +the cache should be used. In both cases, the value is a JSON array containg +objects. 
+ +## When to Store + +By default, if no rules file have been provided or if the `store` field is +missing from the object, the results of all queries will be stored to the +cache, subject to `max_resultset_rows` and `max_resultset_size` cache filter +parameters. + +By providing a `store` field in the JSON object, the decision whether to +store the result of a particular query to the cache can be controlled in +a more detailed manner. The decision to cache the results of a query can +depend upon + + * the database, + * the table, + * the column, or + * the query itself. + +Each entry in the `store` array is an object containing three fields, + +``` +{ + "attribute": , + "op": + "value": +} +``` + +where, + * the _attribute_ can be `database`, `table`, `column` or `query`, + * the _op_ can be `=`, `!=`, `like` or `unlike`, and + * the _value_ a string. + +If _op_ is `=` or `!=` then _value_ is used verbatim; if it is `like` +or `unlike`, then _value_ is interpreted as a _pcre2_ regular expression. + +The objects in the `store` array are processed in order. If the result +of a comparison is _true_, no further processing will be made and the +result of the query in question will be stored to the cache. + +If the result of the comparison is _false_, then the next object is +processed. The process continues until the array is exhausted. If there +is no match, then the result of the query is not stored to the cache. + +Note that as the query itself is used as the key, although the following +queries +``` +select * from db1.tbl +``` +and +``` +use db1; +select * from tbl +``` +target the same table and produce the same results, they will be cached +separately. The same holds for queries like +``` +select * from tbl where a = 2 and b = 3; +``` +and +``` +select * from tbl where b = 3 and a = 2; +``` +as well. Although they conceptually are identical, there will be two +cache entries. + +### Examples + +Cache all queries targeting a particular database. 
+``` +{ + "store": [ + { + "attribute": "database", + "op": "=", + "value": "db1" + } + ] +} +``` + +Cache all queries _not_ targeting a particular table +``` +{ + "store": [ + { + "attribute": "table", + "op": "!=", + "value": "tbl1" + } + ] +} +``` + +That will exclude queries targeting table _tbl1_ irrespective of which +database it is in. To exclude a table in a particular database, specify +the table name using a qualified name. +``` +{ + "store": [ + { + "attribute": "table", + "op": "!=", + "value": "db1.tbl1" + } + ] +} +``` + +Cache all queries containing a WHERE clause +``` +{ + "store": [ + { + "attribute": "query", + "op": "like", + "value": ".*WHERE.*" + } + ] +} +``` + +Note that that will actually cause all queries that contain WHERE anywhere, +to be cached. + +## When to Use + +By default, if no rules file have been provided or if the `use` field is +missing from the object, all users may be returned data from the cache. + +By providing a `use` field in the JSON object, the decision whether to use +data from the cache can be controlled in a more detailed manner. The decision +to use data from the cache can depend upon + + * the user. + +Each entry in the `use` array is an object containing three fields, + +``` +{ + "attribute": , + "op": + "value": +} +``` + +where, + * the _attribute_ can be `user`, + * the _op_ can be `=`, `!=`, `like` or `unlike`, and + * the _value_ a string. + +If _op_ is `=` or `!=` then _value_ is used verbatim; if it is `like` +or `unlike`, then _value_ is interpreted as a _pcre2_ regular expression. + +The objects in the `use` array are processed in order. If the result +of a comparison is _true_, no further processing will be made and the +data in the cache will be used, subject to the value of `ttl`. + +If the result of the comparison is _false_, then the next object is +processed. The process continues until the array is exhausted. If there +is no match, then data in the cache will not be used. 
+ +Note that `use` is relevant only if the query is subject to caching, +that is, if all queries are cached or if a query matches a particular +rule in the `store` array. + +### Examples + +Use data from the cache for all users except `admin`. +``` +{ + "use": [ + { + "attribute": "user", + "op": "!=", + "value": "admin" + } + ] +} +``` +# Storage ## Storage RocksDB diff --git a/Documentation/Monitors/MySQL-Monitor.md b/Documentation/Monitors/MySQL-Monitor.md index fe7ff6cf1..050109c2b 100644 --- a/Documentation/Monitors/MySQL-Monitor.md +++ b/Documentation/Monitors/MySQL-Monitor.md @@ -119,6 +119,51 @@ This functionality is similar to the [Multi-Master Monitor](MM-Monitor.md) functionality. The only difference is that the MySQL monitor will also detect traditional Master-Slave topologies. +### `failover` + +Failover mode. This feature takes a boolean parameter is disabled by default. + +This parameter is intended to be used with simple, two node master-slave pairs +where the failure of the master can be resolved by "promoting" the slave as the +new master. Normally this is done by using an external agent of some sort +(possibly triggered by MaxScale's monitor scripts), like +[MariaDB Replication Manager](https://github.com/tanji/replication-manager) +or [MHA](https://code.google.com/p/mysql-master-ha/). + +The failover mode in mysqlmon is completely passive in the sense that it does +not modify the cluster or any servers in it. It labels a slave server as a +master server when there is only one running server. Before a failover can be +initiated, the following conditions must have been met: + +- The monitor has repeatedly failed to connect to the failed servers +- There is only one running server among the monitored servers +- @@read_only is not enabled on the last running server + +When these conditions are met, the monitor assigns the last remaining server the +master status and puts all other servers into maintenance mode. 
This is done to +prevent accidental use of the failed servers if they came back online. + +When the failed servers come back up, the maintenance mode needs to be manually +cleared once replication has been set up. + +**Note**: A failover will cause permanent changes in the data of the promoted + server. Only use this feature if you know that the slave servers are capable + of acting as master servers. + +### `failcount` + +Number of failures that must occur on all failed servers before a failover is +initiated. The default value is 5 failures. + +The monitor will attemt to contact all servers once per monitoring cycle. When +_failover_ mode is enabled, all of the failed servers must fail _failcount_ +number of connection attemps before a failover is initiated. + +The formula for calculating the actual number of milliseconds before failover +can start is `monitor_interval * failcount`. This means that to trigger a +failover after 10 seconds of master failure with a _monitor_interval_ of 1000 +milliseconds, the value of _failcount_ must be 10. + ## Example 1 - Monitor script Here is an example shell script which sends an email to an admin when a server goes down. diff --git a/Documentation/Release-Notes/MaxScale-2.1.0-Release-Notes.md b/Documentation/Release-Notes/MaxScale-2.1.0-Release-Notes.md index 40b4de63e..fc42d8c7d 100644 --- a/Documentation/Release-Notes/MaxScale-2.1.0-Release-Notes.md +++ b/Documentation/Release-Notes/MaxScale-2.1.0-Release-Notes.md @@ -66,6 +66,20 @@ which of them is the real write node and assigns the appropriate status for each node. This module also supports launchable scripts on monitored events. Read the [Monitor Common Documentation](../Monitors/Monitor-Common.md) for more details. +### Multi-master mode for MySQL Monitor + +The MySQL monitor now detects complex multi-master replication +topologies. This allows the mysqlmon module to be used as a replacement +for the mmmon module. 
For more details, please read the +[MySQL Monitor Documentation](../Monitors/MySQL-Monitor.md). + +### Failover mode for MySQL Monitor + +A simple failover mode has been added to the MySQL Monitor. This mode is +aimed for two node master-slave clusters where the slave can act as a +master in case the original master fails. For more details, please read +the [MySQL Monitor Documentation](../Monitors/MySQL-Monitor.md). + ## Bug fixes [Here is a list of bugs fixed since the release of MaxScale 2.0.X.](https://jira.mariadb.org/browse/MXS-739?jql=project%20%3D%20MXS%20AND%20issuetype%20%3D%20Bug%20AND%20resolution%20in%20(Fixed%2C%20Done)%20AND%20fixVersion%20%3D%202.0.0) diff --git a/server/core/config.c b/server/core/config.c index baaec3596..df8fb44c7 100644 --- a/server/core/config.c +++ b/server/core/config.c @@ -185,6 +185,8 @@ static char *monitor_params[] = "disable_master_role_setting", "use_priority", "multimaster", + "failover", + "failcount", NULL }; diff --git a/server/core/gateway.c b/server/core/gateway.c index 5eed47fc2..657c37add 100644 --- a/server/core/gateway.c +++ b/server/core/gateway.c @@ -365,9 +365,9 @@ sigfatal_handler(int i) GATEWAY_CONF* cnf = config_get_global_options(); fprintf(stderr, "\n\nMaxScale "MAXSCALE_VERSION" received fatal signal %d\n", i); - MXS_ERROR("Fatal: MaxScale "MAXSCALE_VERSION" received fatal signal %d. Attempting backtrace.", i); + MXS_ALERT("Fatal: MaxScale "MAXSCALE_VERSION" received fatal signal %d. 
Attempting backtrace.", i); - MXS_ERROR("Commit ID: %s System name: %s " + MXS_ALERT("Commit ID: %s System name: %s " "Release string: %s", maxscale_commit, cnf->sysname, cnf->release_string); @@ -380,7 +380,7 @@ sigfatal_handler(int i) { for (n = 0; n < count; n++) { - MXS_ERROR(" %s\n", symbols[n]); + MXS_ALERT(" %s\n", symbols[n]); } MXS_FREE(symbols); } diff --git a/server/core/log_manager.cc b/server/core/log_manager.cc index 64bab964a..bd4eeb58c 100644 --- a/server/core/log_manager.cc +++ b/server/core/log_manager.cc @@ -548,7 +548,7 @@ static bool logmanager_init_nomutex(const char* ident, /** * Set global variable */ - mxs_log_enabled_priorities = MXS_LOG_ERR | MXS_LOG_NOTICE | MXS_LOG_WARNING; + mxs_log_enabled_priorities = (1 << LOG_ERR) | (1 << LOG_NOTICE) | (1 << LOG_WARNING); /** * Initialize filewriter data and open the log file diff --git a/server/include/log_manager.h b/server/include/log_manager.h index 3f1152a85..127d3e985 100644 --- a/server/include/log_manager.h +++ b/server/include/log_manager.h @@ -54,21 +54,6 @@ extern "C" { #define MXS_MODULE_NAME NULL #endif -enum mxs_log_priorities -{ - MXS_LOG_EMERG = (1 << LOG_EMERG), - MXS_LOG_ALERT = (1 << LOG_ALERT), - MXS_LOG_CRIT = (1 << LOG_CRIT), - MXS_LOG_ERR = (1 << LOG_ERR), - MXS_LOG_WARNING = (1 << LOG_WARNING), - MXS_LOG_NOTICE = (1 << LOG_NOTICE), - MXS_LOG_INFO = (1 << LOG_INFO), - MXS_LOG_DEBUG = (1 << LOG_DEBUG), - - MXS_LOG_MASK = (MXS_LOG_EMERG | MXS_LOG_ALERT | MXS_LOG_CRIT | MXS_LOG_ERR | - MXS_LOG_WARNING | MXS_LOG_NOTICE | MXS_LOG_INFO | MXS_LOG_DEBUG), -}; - typedef enum { MXS_LOG_TARGET_DEFAULT = 0, @@ -93,6 +78,8 @@ extern __thread mxs_log_info_t mxs_log_tls; /** * Check if specified log type is enabled in general or if it is enabled * for the current session. + * + * @param priority One of the syslog LOG_ERR, LOG_WARNING, etc. constants. 
*/ #define MXS_LOG_PRIORITY_IS_ENABLED(priority) \ (((mxs_log_enabled_priorities & (1 << priority)) || \ @@ -149,11 +136,20 @@ int mxs_log_message(int priority, mxs_log_message(priority, MXS_MODULE_NAME, __FILE__, __LINE__, __func__, format, ##__VA_ARGS__) /** - * Log an error, warning, notice, info, or debug message. + * Log an alert, error, warning, notice, info, or debug message. + * + * MXS_ALERT Not throttled To be used when the system is about to go down in flames. + * MXS_ERROR Throttled For errors. + * MXS_WARNING Throttled For warnings. + * MXS_NOTICE Not Throttled For messages deemed important, typically used during startup. + * MXS_INFO Not Throttled For information thought to be of value for investigating some problem. + * MXS_DEBUG Not Throttled For debugging messages during development. Should be removed when a + * feature is ready. * * @param format The printf format of the message. * @param ... Arguments, depending on the format. */ +#define MXS_ALERT(format, ...) MXS_LOG_MESSAGE(LOG_ALERT, format, ##__VA_ARGS__) #define MXS_ERROR(format, ...) MXS_LOG_MESSAGE(LOG_ERR, format, ##__VA_ARGS__) #define MXS_WARNING(format, ...) MXS_LOG_MESSAGE(LOG_WARNING, format, ##__VA_ARGS__) #define MXS_NOTICE(format, ...) 
MXS_LOG_MESSAGE(LOG_NOTICE, format, ##__VA_ARGS__) diff --git a/server/modules/filter/cache/CMakeLists.txt b/server/modules/filter/cache/CMakeLists.txt index e3ccbf620..8f9b71333 100644 --- a/server/modules/filter/cache/CMakeLists.txt +++ b/server/modules/filter/cache/CMakeLists.txt @@ -1,6 +1,7 @@ -add_library(cache SHARED cache.c storage.c) -target_link_libraries(cache maxscale-common) +add_library(cache SHARED cache.c rules.c storage.c) +target_link_libraries(cache maxscale-common jansson) set_target_properties(cache PROPERTIES VERSION "1.0.0") +set_target_properties(cache PROPERTIES LINK_FLAGS -Wl,-z,defs) install_module(cache experimental) add_subdirectory(storage) diff --git a/server/modules/filter/cache/cache.c b/server/modules/filter/cache/cache.c index 4fc9b17ec..f500c5454 100644 --- a/server/modules/filter/cache/cache.c +++ b/server/modules/filter/cache/cache.c @@ -14,12 +14,14 @@ #define MXS_MODULE_NAME "cache" #include #include +#include #include #include #include #include #include #include "cache.h" +#include "rules.h" #include "storage.h" static char VERSION_STRING[] = "V1.0.0"; @@ -90,39 +92,43 @@ FILTER_OBJECT *GetModuleObject() typedef struct cache_config { - cache_references_t allowed_references; - uint32_t max_resultset_rows; - uint32_t max_resultset_size; - const char *storage; - const char *storage_args; - uint32_t ttl; + uint32_t max_resultset_rows; + uint32_t max_resultset_size; + const char* rules; + const char *storage; + const char *storage_options; + uint32_t ttl; + uint32_t debug; } CACHE_CONFIG; static const CACHE_CONFIG DEFAULT_CONFIG = { - CACHE_DEFAULT_ALLOWED_REFERENCES, CACHE_DEFAULT_MAX_RESULTSET_ROWS, CACHE_DEFAULT_MAX_RESULTSET_SIZE, NULL, NULL, - CACHE_DEFAULT_TTL + NULL, + CACHE_DEFAULT_TTL, + CACHE_DEFAULT_DEBUG }; typedef struct cache_instance { const char *name; CACHE_CONFIG config; + CACHE_RULES *rules; CACHE_STORAGE_MODULE *module; CACHE_STORAGE *storage; } CACHE_INSTANCE; typedef enum cache_session_state { - 
CACHE_EXPECTING_RESPONSE, // A select has been sent, and we are waiting for the response. - CACHE_EXPECTING_FIELDS, // A select has been sent, and we want more fields. - CACHE_EXPECTING_ROWS, // A select has been sent, and we want more rows. - CACHE_EXPECTING_NOTHING, // We are not expecting anything from the server. - CACHE_IGNORING_RESPONSE, // We are not interested in the data received from the server. + CACHE_EXPECTING_RESPONSE, // A select has been sent, and we are waiting for the response. + CACHE_EXPECTING_FIELDS, // A select has been sent, and we want more fields. + CACHE_EXPECTING_ROWS, // A select has been sent, and we want more rows. + CACHE_EXPECTING_NOTHING, // We are not expecting anything from the server. + CACHE_EXPECTING_USE_RESPONSE, // A "USE DB" was issued. + CACHE_IGNORING_RESPONSE, // We are not interested in the data received from the server. } cache_session_state_t; typedef struct cache_request_state @@ -143,15 +149,17 @@ static void cache_response_state_reset(CACHE_RESPONSE_STATE *state); typedef struct cache_session_data { - CACHE_INSTANCE *instance; /**< The cache instance the session is associated with. */ - CACHE_STORAGE_API *api; /**< The storage API to be used. */ - CACHE_STORAGE *storage; /**< The storage to be used with this session data. */ - DOWNSTREAM down; /**< The previous filter or equivalent. */ - UPSTREAM up; /**< The next filter or equivalent. */ - CACHE_REQUEST_STATE req; /**< The request state. */ - CACHE_RESPONSE_STATE res; /**< The response state. */ - SESSION *session; /**< The session this data is associated with. */ + CACHE_INSTANCE *instance; /**< The cache instance the session is associated with. */ + CACHE_STORAGE_API *api; /**< The storage API to be used. */ + CACHE_STORAGE *storage; /**< The storage to be used with this session data. */ + DOWNSTREAM down; /**< The previous filter or equivalent. */ + UPSTREAM up; /**< The next filter or equivalent. */ + CACHE_REQUEST_STATE req; /**< The request state. 
*/ + CACHE_RESPONSE_STATE res; /**< The response state. */ + SESSION *session; /**< The session this data is associated with. */ char key[CACHE_KEY_MAXLEN]; /**< Key storage. */ + char *default_db; /**< The default database. */ + char *use_db; /**< Pending default database. Needs server response. */ cache_session_state_t state; } CACHE_SESSION_DATA; @@ -162,8 +170,9 @@ static int handle_expecting_fields(CACHE_SESSION_DATA *csdata); static int handle_expecting_nothing(CACHE_SESSION_DATA *csdata); static int handle_expecting_response(CACHE_SESSION_DATA *csdata); static int handle_expecting_rows(CACHE_SESSION_DATA *csdata); +static int handle_expecting_use_response(CACHE_SESSION_DATA *csdata); static int handle_ignoring_response(CACHE_SESSION_DATA *csdata); - +static bool process_params(char **options, FILTER_PARAMETER **params, CACHE_CONFIG* config); static bool route_using_cache(CACHE_SESSION_DATA *sdata, const GWBUF *key, GWBUF **value); static int send_upstream(CACHE_SESSION_DATA *csdata); @@ -186,129 +195,61 @@ static void store_result(CACHE_SESSION_DATA *csdata); */ static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER **params) { + CACHE_INSTANCE *cinstance = NULL; CACHE_CONFIG config = DEFAULT_CONFIG; - bool error = false; - - for (int i = 0; params[i]; ++i) + if (process_params(options, params, &config)) { - const FILTER_PARAMETER *param = params[i]; + CACHE_RULES *rules = NULL; - if (strcmp(param->name, "allowed_references") == 0) + if (config.rules) { - if (strcmp(param->value, "qualified") == 0) - { - config.allowed_references = CACHE_REFERENCES_QUALIFIED; - } - else if (strcmp(param->value, "any") == 0) - { - config.allowed_references = CACHE_REFERENCES_ANY; - } - else - { - MXS_ERROR("Unknown value '%s' for parameter '%s'.", param->value, param->name); - error = true; - } + rules = cache_rules_load(config.rules, config.debug); } - else if (strcmp(param->name, "max_resultset_rows") == 0) + else { - int v = atoi(param->value); - 
- if (v > 0) - { - config.max_resultset_rows = v; - } - else - { - config.max_resultset_rows = CACHE_DEFAULT_MAX_RESULTSET_ROWS; - } + rules = cache_rules_create(config.debug); } - else if (strcmp(param->name, "max_resultset_size") == 0) - { - int v = atoi(param->value); - if (v > 0) + if (rules) + { + if ((cinstance = MXS_CALLOC(1, sizeof(CACHE_INSTANCE))) != NULL) { - config.max_resultset_size = v * 1024; - } - else - { - MXS_ERROR("The value of the configuration entry '%s' must " - "be an integer larger than 0.", param->name); - error = true; - } - } - else if (strcmp(param->name, "storage_args") == 0) - { - config.storage_args = param->value; - } - else if (strcmp(param->name, "storage") == 0) - { - config.storage = param->value; - } - else if (strcmp(param->name, "ttl") == 0) - { - int v = atoi(param->value); + CACHE_STORAGE_MODULE *module = cache_storage_open(config.storage); - if (v > 0) - { - config.ttl = v; - } - else - { - MXS_ERROR("The value of the configuration entry '%s' must " - "be an integer larger than 0.", param->name); - error = true; - } - } - else if (!filter_standard_parameter(params[i]->name)) - { - MXS_ERROR("Unknown configuration entry '%s'.", param->name); - error = true; - } - } - - CACHE_INSTANCE *cinstance = NULL; - - if (!error) - { - if ((cinstance = MXS_CALLOC(1, sizeof(CACHE_INSTANCE))) != NULL) - { - CACHE_STORAGE_MODULE *module = cache_storage_open(config.storage); - - if (module) - { - CACHE_STORAGE *storage = module->api->createInstance(name, config.ttl, 0, NULL); - - if (storage) + if (module) { - cinstance->name = name; - cinstance->config = config; - cinstance->module = module; - cinstance->storage = storage; + CACHE_STORAGE *storage = module->api->createInstance(name, config.ttl, 0, NULL); - MXS_NOTICE("Cache storage %s opened and initialized.", config.storage); + if (storage) + { + cinstance->name = name; + cinstance->config = config; + cinstance->rules = rules; + cinstance->module = module; + cinstance->storage = storage; 
+ + MXS_NOTICE("Cache storage %s opened and initialized.", config.storage); + } + else + { + MXS_ERROR("Could not create storage instance for '%s'.", name); + cache_rules_free(rules); + cache_storage_close(module); + MXS_FREE(cinstance); + cinstance = NULL; + } } else { - MXS_ERROR("Could not create storage instance for %s.", name); - cache_storage_close(module); + MXS_ERROR("Could not load cache storage module '%s'.", name); + cache_rules_free(rules); MXS_FREE(cinstance); cinstance = NULL; } } - else - { - MXS_ERROR("Could not load cache storage module %s.", name); - MXS_FREE(cinstance); - cinstance = NULL; - } } } - else - { - cinstance = NULL; - } return (FILTER*)cinstance; } @@ -417,36 +358,89 @@ static int routeQuery(FILTER *instance, void *sdata, GWBUF *data) cache_response_state_reset(&csdata->res); csdata->state = CACHE_IGNORING_RESPONSE; - // TODO: This returns the wrong result if GWBUF_LENGTH(packet) is < 5. - if (modutil_is_SQL(packet)) + if (gwbuf_length(packet) > MYSQL_HEADER_LEN + 1) // We need at least a packet with a type. { - packet = gwbuf_make_contiguous(packet); + uint8_t header[MYSQL_HEADER_LEN + 1]; - // We do not care whether the query was fully parsed or not. - // If a query cannot be fully parsed, the worst thing that can - // happen is that caching is not used, even though it would be - // possible. + gwbuf_copy_data(packet, 0, sizeof(header), header); - if (qc_get_operation(packet) == QUERY_OP_SELECT) + switch ((int)MYSQL_GET_COMMAND(header)) { - GWBUF *result; - use_default = !route_using_cache(csdata, packet, &result); - - if (use_default) + case MYSQL_COM_INIT_DB: { - csdata->state = CACHE_EXPECTING_RESPONSE; - } - else - { - csdata->state = CACHE_EXPECTING_NOTHING; - C_DEBUG("Using data from cache."); - gwbuf_free(packet); - DCB *dcb = csdata->session->client_dcb; + ss_dassert(!csdata->use_db); + size_t len = MYSQL_GET_PACKET_LEN(header) - 1; // Remove the command byte. 
+ csdata->use_db = MXS_MALLOC(len + 1); - // TODO: This is not ok. Any filters before this filter, will not - // TODO: see this data. - rv = dcb->func.write(dcb, result); + if (csdata->use_db) + { + uint8_t *use_db = (uint8_t*)csdata->use_db; + gwbuf_copy_data(packet, MYSQL_HEADER_LEN + 1, len, use_db); + csdata->use_db[len] = 0; + csdata->state = CACHE_EXPECTING_USE_RESPONSE; + } + else + { + // Memory allocation failed. We need to remove the default database to + // prevent incorrect cache entries, since we won't know what the + // default db is. But we only need to do that if "USE " really + // succeeds. The right thing will happen by itself in + // handle_expecting_use_response(); if OK is returned, default_db will + // become NULL, if ERR, default_db will not be changed. + } } + break; + + case MYSQL_COM_QUERY: + { + GWBUF *tmp = gwbuf_make_contiguous(packet); + + if (tmp) + { + packet = tmp; + + // We do not care whether the query was fully parsed or not. + // If a query cannot be fully parsed, the worst thing that can + // happen is that caching is not used, even though it would be + // possible. + + if (qc_get_operation(packet) == QUERY_OP_SELECT) + { + if (cache_rules_should_store(cinstance->rules, csdata->default_db, packet)) + { + if (cache_rules_should_use(cinstance->rules, csdata->session)) + { + GWBUF *result; + use_default = !route_using_cache(csdata, packet, &result); + + if (use_default) + { + csdata->state = CACHE_EXPECTING_RESPONSE; + } + else + { + csdata->state = CACHE_EXPECTING_NOTHING; + C_DEBUG("Using data from cache."); + gwbuf_free(packet); + DCB *dcb = csdata->session->client_dcb; + + // TODO: This is not ok. Any filters before this filter, will not + // TODO: see this data. 
+ rv = dcb->func.write(dcb, result); + } + } + } + else + { + csdata->state = CACHE_IGNORING_RESPONSE; + } + } + } + } + break; + + default: + break; } } @@ -519,6 +513,10 @@ static int clientReply(FILTER *instance, void *sdata, GWBUF *data) rv = handle_expecting_rows(csdata); break; + case CACHE_EXPECTING_USE_RESPONSE: + rv = handle_expecting_use_response(csdata); + break; + case CACHE_IGNORING_RESPONSE: rv = handle_ignoring_response(csdata); break; @@ -584,11 +582,31 @@ static CACHE_SESSION_DATA *cache_session_data_create(CACHE_INSTANCE *instance, if (data) { - data->instance = instance; - data->api = instance->module->api; - data->storage = instance->storage; - data->session = session; - data->state = CACHE_EXPECTING_NOTHING; + char *default_db = NULL; + + ss_dassert(session->client_dcb); + ss_dassert(session->client_dcb->data); + MYSQL_session *mysql_session = (MYSQL_session*)session->client_dcb->data; + + if (mysql_session->db[0] != 0) + { + default_db = MXS_STRDUP(mysql_session->db); + } + + if ((mysql_session->db[0] == 0) || default_db) + { + data->instance = instance; + data->api = instance->module->api; + data->storage = instance->storage; + data->session = session; + data->state = CACHE_EXPECTING_NOTHING; + data->default_db = default_db; + } + else + { + MXS_FREE(data); + data = NULL; + } } return data; @@ -603,6 +621,8 @@ static void cache_session_data_free(CACHE_SESSION_DATA* data) { if (data) { + ss_dassert(!data->use_db); + MXS_FREE(data->default_db); MXS_FREE(data); } } @@ -704,7 +724,7 @@ static int handle_expecting_response(CACHE_SESSION_DATA *csdata) store_result(csdata); rv = send_upstream(csdata); - csdata->state = CACHE_EXPECTING_NOTHING; + csdata->state = CACHE_IGNORING_RESPONSE; break; case 0xfb: // GET_MORE_CLIENT_DATA/SEND_MORE_CLIENT_DATA @@ -818,6 +838,58 @@ static int handle_expecting_rows(CACHE_SESSION_DATA *csdata) return rv; } +/** + * Called when a response to a "USE db" is received from the server. 
+ * + * @param csdata The cache session data. + */ +static int handle_expecting_use_response(CACHE_SESSION_DATA *csdata) +{ + ss_dassert(csdata->state == CACHE_EXPECTING_USE_RESPONSE); + ss_dassert(csdata->res.data); + + int rv = 1; + + size_t buflen = gwbuf_length(csdata->res.data); + + if (buflen >= MYSQL_HEADER_LEN + 1) // We need the command byte. + { + uint8_t command; + + gwbuf_copy_data(csdata->res.data, MYSQL_HEADER_LEN, 1, &command); + + switch (command) + { + case 0x00: // OK + // In case csdata->use_db could not be allocated in routeQuery(), we will + // in fact reset the default db here. That's ok as it will prevent broken + // entries in the cache. + MXS_FREE(csdata->default_db); + csdata->default_db = csdata->use_db; + csdata->use_db = NULL; + break; + + case 0xff: // ERR + MXS_FREE(csdata->use_db); + csdata->use_db = NULL; + break; + + default: + MXS_ERROR("\"USE %s\" received unexpected server response %d.", + csdata->use_db ? csdata->use_db : "", command); + MXS_FREE(csdata->default_db); + MXS_FREE(csdata->use_db); + csdata->default_db = NULL; + csdata->use_db = NULL; + } + + rv = send_upstream(csdata); + csdata->state = CACHE_IGNORING_RESPONSE; + } + + return rv; +} + /** * Called when all data from the server is ignored. * @@ -831,6 +903,125 @@ static int handle_ignoring_response(CACHE_SESSION_DATA *csdata) return send_upstream(csdata); } +/** + * Processes the cache params + * + * @param options Options as passed to the filter. + * @param params Parameters as passed to the filter. + * @param config Pointer to config instance where params will be stored. + * + * @return True if all parameters could be processed, false otherwise. 
+ */ +static bool process_params(char **options, FILTER_PARAMETER **params, CACHE_CONFIG* config) +{ + bool error = false; + + for (int i = 0; params[i]; ++i) + { + const FILTER_PARAMETER *param = params[i]; + + if (strcmp(param->name, "max_resultset_rows") == 0) + { + int v = atoi(param->value); + + if (v > 0) + { + config->max_resultset_rows = v; + } + else + { + config->max_resultset_rows = CACHE_DEFAULT_MAX_RESULTSET_ROWS; + } + } + else if (strcmp(param->name, "max_resultset_size") == 0) + { + int v = atoi(param->value); + + if (v > 0) + { + config->max_resultset_size = v * 1024; + } + else + { + MXS_ERROR("The value of the configuration entry '%s' must " + "be an integer larger than 0.", param->name); + error = true; + } + } + else if (strcmp(param->name, "rules") == 0) + { + if (*param->value == '/') + { + config->rules = MXS_STRDUP(param->value); + } + else + { + const char *datadir = get_datadir(); + size_t len = strlen(datadir) + 1 + strlen(param->value) + 1; + + char *rules = MXS_MALLOC(len); + + if (rules) + { + sprintf(rules, "%s/%s", datadir, param->value); + config->rules = rules; + } + } + + if (!config->rules) + { + error = true; + } + } + else if (strcmp(param->name, "storage_options") == 0) + { + config->storage_options = param->value; + } + else if (strcmp(param->name, "storage") == 0) + { + config->storage = param->value; + } + else if (strcmp(param->name, "ttl") == 0) + { + int v = atoi(param->value); + + if (v > 0) + { + config->ttl = v; + } + else + { + MXS_ERROR("The value of the configuration entry '%s' must " + "be an integer larger than 0.", param->name); + error = true; + } + } + else if (strcmp(param->name, "debug") == 0) + { + int v = atoi(param->value); + + if ((v >= CACHE_DEBUG_MIN) && (v <= CACHE_DEBUG_MAX)) + { + config->debug = v; + } + else + { + MXS_ERROR("The value of the configuration entry '%s' must " + "be between %d and %d, inclusive.", + param->name, CACHE_DEBUG_MIN, CACHE_DEBUG_MAX); + error = true; + } + } + else if 
(!filter_standard_parameter(params[i]->name)) + { + MXS_ERROR("Unknown configuration entry '%s'.", param->name); + error = true; + } + } + + return !error; +} + /** * Route a query via the cache. * @@ -843,7 +1034,7 @@ static bool route_using_cache(CACHE_SESSION_DATA *csdata, const GWBUF *query, GWBUF **value) { - cache_result_t result = csdata->api->getKey(csdata->storage, query, csdata->key); + cache_result_t result = csdata->api->getKey(csdata->storage, csdata->default_db, query, csdata->key); if (result == CACHE_RESULT_OK) { @@ -883,14 +1074,19 @@ static void store_result(CACHE_SESSION_DATA *csdata) { ss_dassert(csdata->res.data); - csdata->res.data = gwbuf_make_contiguous(csdata->res.data); + GWBUF *data = gwbuf_make_contiguous(csdata->res.data); - cache_result_t result = csdata->api->putValue(csdata->storage, - csdata->key, - csdata->res.data); - - if (result != CACHE_RESULT_OK) + if (data) { - MXS_ERROR("Could not store cache item."); + csdata->res.data = data; + + cache_result_t result = csdata->api->putValue(csdata->storage, + csdata->key, + csdata->res.data); + + if (result != CACHE_RESULT_OK) + { + MXS_ERROR("Could not store cache item."); + } } } diff --git a/server/modules/filter/cache/cache.h b/server/modules/filter/cache/cache.h index ce904dad0..c9deff52e 100644 --- a/server/modules/filter/cache/cache.h +++ b/server/modules/filter/cache/cache.h @@ -15,19 +15,24 @@ #include +#define CACHE_DEBUG_NONE 0 +#define CACHE_DEBUG_MATCHING 1 +#define CACHE_DEBUG_NON_MATCHING 2 +#define CACHE_DEBUG_USE 4 +#define CACHE_DEBUG_NON_USE 8 -typedef enum cache_references -{ - CACHE_REFERENCES_ANY, // select * from tbl; - CACHE_REFERENCES_QUALIFIED // select * from db.tbl; -} cache_references_t; +#define CACHE_DEBUG_RULES (CACHE_DEBUG_MATCHING | CACHE_DEBUG_NON_MATCHING) +#define CACHE_DEBUG_USAGE (CACHE_DEBUG_USE | CACHE_DEBUG_NON_USE) +#define CACHE_DEBUG_MIN CACHE_DEBUG_NONE +#define CACHE_DEBUG_MAX (CACHE_DEBUG_RULES | CACHE_DEBUG_USAGE) -#define 
CACHE_DEFAULT_ALLOWED_REFERENCES CACHE_REFERENCES_QUALIFIED // Count #define CACHE_DEFAULT_MAX_RESULTSET_ROWS UINT_MAX // Bytes #define CACHE_DEFAULT_MAX_RESULTSET_SIZE 64 * 1024 // Seconds #define CACHE_DEFAULT_TTL 10 +// Integer value +#define CACHE_DEFAULT_DEBUG 0 #endif diff --git a/server/modules/filter/cache/cache_storage_api.h b/server/modules/filter/cache/cache_storage_api.h index f6c1b6618..c6f889c3e 100644 --- a/server/modules/filter/cache/cache_storage_api.h +++ b/server/modules/filter/cache/cache_storage_api.h @@ -53,7 +53,7 @@ typedef struct cache_storage_api * @param name The name of the cache instance. * @param ttl Time to live; number of seconds the value is valid. * @param argc The number of elements in the argv array. - * @param argv Array of arguments, as passed in the `storage_args` parameter + * @param argv Array of arguments, as passed in the `storage_options` parameter * in the cache section in the MaxScale configuration file. * @return A new cache instance, or NULL if the instance could not be * created. @@ -79,6 +79,7 @@ typedef struct cache_storage_api * @return CACHE_RESULT_OK if a key was created, otherwise some error code. */ cache_result_t (*getKey)(CACHE_STORAGE* storage, + const char* default_db, const GWBUF* query, char* key); /** diff --git a/server/modules/filter/cache/rules.c b/server/modules/filter/cache/rules.c new file mode 100644 index 000000000..f09a157b0 --- /dev/null +++ b/server/modules/filter/cache/rules.c @@ -0,0 +1,1108 @@ +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl. + * + * Change Date: 2019-07-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. 
+ */ + +#define MXS_MODULE_NAME "cache" +#include "rules.h" +#include +#include +#include +#include +#include +#include +#include +#include "cache.h" + +static const char KEY_ATTRIBUTE[] = "attribute"; +static const char KEY_OP[] = "op"; +static const char KEY_STORE[] = "store"; +static const char KEY_USE[] = "use"; +static const char KEY_VALUE[] = "value"; + +static const char VALUE_ATTRIBUTE_COLUMN[] = "column"; +static const char VALUE_ATTRIBUTE_DATABASE[] = "database"; +static const char VALUE_ATTRIBUTE_QUERY[] = "query"; +static const char VALUE_ATTRIBUTE_TABLE[] = "table"; +static const char VALUE_ATTRIBUTE_USER[] = "user"; + +static const char VALUE_OP_EQ[] = "="; +static const char VALUE_OP_NEQ[] = "!="; +static const char VALUE_OP_LIKE[] = "like"; +static const char VALUE_OP_UNLIKE[] = "unlike"; + +struct cache_attribute_mapping +{ + const char* name; + int value; +}; + +static struct cache_attribute_mapping cache_store_attributes[] = +{ + { VALUE_ATTRIBUTE_COLUMN, CACHE_ATTRIBUTE_COLUMN }, + { VALUE_ATTRIBUTE_DATABASE, CACHE_ATTRIBUTE_DATABASE }, + { VALUE_ATTRIBUTE_QUERY, CACHE_ATTRIBUTE_QUERY }, + { VALUE_ATTRIBUTE_TABLE, CACHE_ATTRIBUTE_TABLE }, + { NULL, 0 } +}; + +static struct cache_attribute_mapping cache_use_attributes[] = +{ + { VALUE_ATTRIBUTE_USER, CACHE_ATTRIBUTE_USER }, + { NULL, 0 } +}; + +static bool cache_rule_attribute_get(struct cache_attribute_mapping *mapping, + const char *s, + cache_rule_attribute_t *attribute); +static const char *cache_rule_attribute_to_string(cache_rule_attribute_t attribute); + +static bool cache_rule_op_get(const char *s, cache_rule_op_t *op); +static const char *cache_rule_op_to_string(cache_rule_op_t op); + +static bool cache_rule_compare(CACHE_RULE *rule, const char *value); +static bool cache_rule_compare_n(CACHE_RULE *rule, const char *value, size_t length); +static CACHE_RULE *cache_rule_create_regexp(cache_rule_attribute_t attribute, + cache_rule_op_t op, + const char *value, + uint32_t debug); +static 
CACHE_RULE *cache_rule_create_simple(cache_rule_attribute_t attribute, + cache_rule_op_t op, + const char *value, + uint32_t debug); +static CACHE_RULE *cache_rule_create(cache_rule_attribute_t attribute, + cache_rule_op_t op, + const char *value, + uint32_t debug); +static bool cache_rule_matches_column(CACHE_RULE *rule, + const char *default_db, + const GWBUF *query); +static bool cache_rule_matches_database(CACHE_RULE *rule, + const char *default_db, + const GWBUF *query); +static bool cache_rule_matches_query(CACHE_RULE *rule, + const char *default_db, + const GWBUF *query); +static bool cache_rule_matches_table(CACHE_RULE *rule, + const char *default_db, + const GWBUF *query); +static bool cache_rule_matches_user(CACHE_RULE *rule, const char *user); +static bool cache_rule_matches(CACHE_RULE *rule, + const char *default_db, + const GWBUF *query); + +static void cache_rule_free(CACHE_RULE *rule); +static bool cache_rule_matches(CACHE_RULE *rule, const char *default_db, const GWBUF *query); + +static void cache_rules_add_store_rule(CACHE_RULES* self, CACHE_RULE* rule); +static void cache_rules_add_use_rule(CACHE_RULES* self, CACHE_RULE* rule); +static bool cache_rules_parse_json(CACHE_RULES* self, json_t* root); + +typedef bool (*cache_rules_parse_element_t)(CACHE_RULES *self, json_t *object, size_t index); + +static bool cache_rules_parse_array(CACHE_RULES *self, json_t *store, const char* name, + cache_rules_parse_element_t); +static bool cache_rules_parse_store_element(CACHE_RULES *self, json_t *object, size_t index); +static bool cache_rules_parse_use_element(CACHE_RULES *self, json_t *object, size_t index); + +/* + * API begin + */ + + +/** + * Create a default cache rules object. + * + * @param debug The debug level. + * + * @return The rules object or NULL is allocation fails. 
+ */
+CACHE_RULES *cache_rules_create(uint32_t debug)
+{
+    CACHE_RULES *rules = (CACHE_RULES*)MXS_CALLOC(1, sizeof(CACHE_RULES));
+
+    if (rules)
+    {
+        rules->debug = debug;
+    }
+
+    return rules;
+}
+
+/**
+ * Loads the caching rules from a file and returns corresponding object.
+ *
+ * @param path The path of the file containing the rules.
+ * @param debug The debug level.
+ *
+ * @return The corresponding rules object, or NULL in case of error.
+ */
+CACHE_RULES *cache_rules_load(const char *path, uint32_t debug)
+{
+    CACHE_RULES *rules = NULL;
+
+    FILE *fp = fopen(path, "r");
+
+    if (fp)
+    {
+        json_error_t error;
+        json_t *root = json_loadf(fp, JSON_DISABLE_EOF_CHECK, &error);
+
+        if (root)
+        {
+            rules = cache_rules_create(debug);
+
+            if (rules)
+            {
+                // On parse failure the partially built rules object is
+                // discarded so the caller only ever sees a complete one.
+                if (!cache_rules_parse_json(rules, root))
+                {
+                    cache_rules_free(rules);
+                    rules = NULL;
+                }
+            }
+
+            json_decref(root);
+        }
+        else
+        {
+            MXS_ERROR("Loading rules file failed: (%s:%d:%d): %s",
+                      path, error.line, error.column, error.text);
+        }
+
+        fclose(fp);
+    }
+    else
+    {
+        char errbuf[STRERROR_BUFLEN];
+
+        MXS_ERROR("Could not open rules file %s for reading: %s",
+                  path, strerror_r(errno, errbuf, sizeof(errbuf)));
+    }
+
+    return rules;
+}
+
+/**
+ * Frees the rules object and all store/use rules it contains.
+ *
+ * @param rules The rules object to be freed. May be NULL, in which case
+ *              the call is a no-op.
+ */
+void cache_rules_free(CACHE_RULES *rules)
+{
+    if (rules)
+    {
+        cache_rule_free(rules->store_rules);
+        cache_rule_free(rules->use_rules);
+        MXS_FREE(rules);
+    }
+}
+
+/**
+ * Returns boolean indicating whether the result of the query should be stored.
+ *
+ * @param self The CACHE_RULES object.
+ * @param default_db The current default database, NULL if there is none.
+ * @param query The query, expected to contain a COM_QUERY.
+ *
+ * @return True, if the results should be stored.
+ */ +bool cache_rules_should_store(CACHE_RULES *self, const char *default_db, const GWBUF* query) +{ + bool should_store = false; + + CACHE_RULE *rule = self->store_rules; + + if (rule) + { + while (rule && !should_store) + { + should_store = cache_rule_matches(rule, default_db, query); + rule = rule->next; + } + } + else + { + should_store = true; + } + + return should_store; +} + +/** + * Returns boolean indicating whether the cache should be used, that is consulted. + * + * @param self The CACHE_RULES object. + * @param session The current session. + * + * @return True, if the cache should be used. + */ +bool cache_rules_should_use(CACHE_RULES *self, const SESSION *session) +{ + bool should_use = false; + + CACHE_RULE *rule = self->use_rules; + const char *user = session_getUser((SESSION*)session); + + if (rule && user) + { + while (rule && !should_use) + { + should_use = cache_rule_matches_user(rule, user); + rule = rule->next; + } + } + else + { + should_use = true; + } + + return should_use; +} + +/* + * API end + */ + +/** + * Converts a string to an attribute + * + * @param Name/value mapping. + * @param s A string + * @param attribute On successful return contains the corresponding attribute type. + * + * @return True if the string could be converted, false otherwise. + */ +static bool cache_rule_attribute_get(struct cache_attribute_mapping *mapping, + const char *s, + cache_rule_attribute_t *attribute) +{ + ss_dassert(attribute); + + while (mapping->name) + { + if (strcmp(s, mapping->name) == 0) + { + *attribute = mapping->value; + return true; + } + ++mapping; + } + + return false; +} + +/** + * Returns a string representation of a attribute. + * + * @param attribute An attribute type. + * + * @return Corresponding string, not to be freed. 
+ */ +static const char *cache_rule_attribute_to_string(cache_rule_attribute_t attribute) +{ + switch (attribute) + { + case CACHE_ATTRIBUTE_COLUMN: + return "column"; + + case CACHE_ATTRIBUTE_DATABASE: + return "database"; + + case CACHE_ATTRIBUTE_QUERY: + return "query"; + + case CACHE_ATTRIBUTE_TABLE: + return "table"; + + default: + ss_dassert(!true); + return ""; + } +} + +/** + * Converts a string to an operator + * + * @param s A string + * @param op On successful return contains the corresponding operator. + * + * @return True if the string could be converted, false otherwise. + */ +static bool cache_rule_op_get(const char *s, cache_rule_op_t *op) +{ + if (strcmp(s, VALUE_OP_EQ) == 0) + { + *op = CACHE_OP_EQ; + return true; + } + + if (strcmp(s, VALUE_OP_NEQ) == 0) + { + *op = CACHE_OP_NEQ; + return true; + } + + if (strcmp(s, VALUE_OP_LIKE) == 0) + { + *op = CACHE_OP_LIKE; + return true; + } + + if (strcmp(s, VALUE_OP_UNLIKE) == 0) + { + *op = CACHE_OP_UNLIKE; + return true; + } + + return false; +} + +/** + * Returns a string representation of an operator. + * + * @param op An operator. + * + * @return Corresponding string, not to be freed. + */ +static const char *cache_rule_op_to_string(cache_rule_op_t op) +{ + switch (op) + { + case CACHE_OP_EQ: + return "="; + + case CACHE_OP_NEQ: + return "!="; + + case CACHE_OP_LIKE: + return "like"; + + case CACHE_OP_UNLIKE: + return "unlike"; + + default: + ss_dassert(!true); + return ""; + } +} + +/** + * Creates a CACHE_RULE object doing regexp matching. + * + * @param attribute What attribute this rule applies to. + * @param op An operator, CACHE_OP_LIKE or CACHE_OP_UNLIKE. + * @param value A regular expression. + * @param debug The debug level. + * + * @return A new rule object or NULL in case of failure. 
+ */
+static CACHE_RULE *cache_rule_create_regexp(cache_rule_attribute_t attribute,
+                                            cache_rule_op_t op,
+                                            const char *cvalue,
+                                            uint32_t debug)
+{
+    ss_dassert((op == CACHE_OP_LIKE) || (op == CACHE_OP_UNLIKE));
+
+    CACHE_RULE *rule = NULL;
+
+    int errcode;
+    PCRE2_SIZE erroffset;
+    pcre2_code *code = pcre2_compile((PCRE2_SPTR)cvalue, PCRE2_ZERO_TERMINATED, 0,
+                                     &errcode, &erroffset, NULL);
+
+    if (code)
+    {
+        pcre2_match_data *data = pcre2_match_data_create_from_pattern(code, NULL);
+
+        if (data)
+        {
+            rule = (CACHE_RULE*)MXS_CALLOC(1, sizeof(CACHE_RULE));
+            char* value = MXS_STRDUP(cvalue);
+
+            if (rule && value)
+            {
+                // The rule takes ownership of the copied value and of the
+                // compiled pattern and its match data.
+                rule->attribute = attribute;
+                rule->op = op;
+                rule->value = value;
+                rule->regexp.code = code;
+                rule->regexp.data = data;
+                rule->debug = debug;
+            }
+            else
+            {
+                MXS_FREE(value);
+                MXS_FREE(rule);
+                pcre2_match_data_free(data);
+                pcre2_code_free(code);
+                // Bug fix: the original left 'rule' pointing at freed memory
+                // here, so a non-NULL dangling pointer was returned when only
+                // MXS_STRDUP() failed.
+                rule = NULL;
+            }
+        }
+        else
+        {
+            MXS_ERROR("PCRE2 match data creation failed. Most likely due to a "
+                      "lack of available memory.");
+            pcre2_code_free(code);
+        }
+    }
+    else
+    {
+        PCRE2_UCHAR errbuf[512];
+        pcre2_get_error_message(errcode, errbuf, sizeof(errbuf));
+        MXS_ERROR("Regex compilation failed at %d for regex '%s': %s",
+                  (int)erroffset, cvalue, errbuf);
+    }
+
+    return rule;
+}
+
+/**
+ * Creates a CACHE_RULE object doing simple matching.
+ *
+ * @param attribute What attribute this rule applies to.
+ * @param op An operator, CACHE_OP_EQ or CACHE_OP_NEQ.
+ * @param value A string.
+ * @param debug The debug level.
+ *
+ * @return A new rule object or NULL in case of failure.
+ */ +static CACHE_RULE *cache_rule_create_simple(cache_rule_attribute_t attribute, + cache_rule_op_t op, + const char *cvalue, + uint32_t debug) +{ + ss_dassert((op == CACHE_OP_EQ) || (op == CACHE_OP_NEQ)); + + CACHE_RULE *rule = (CACHE_RULE*)MXS_CALLOC(1, sizeof(CACHE_RULE)); + + char *value = MXS_STRDUP(cvalue); + + if (rule && value) + { + rule->attribute = attribute; + rule->op = op; + rule->value = value; + rule->debug = debug; + } + else + { + MXS_FREE(value); + MXS_FREE(rule); + } + + return rule; +} + +/** + * Creates a CACHE_RULE object. + * + * @param attribute What attribute this rule applies to. + * @param op What operator is used. + * @param value The value. + * @param debug The debug level. + * + * @param rule The rule to be freed. + */ +static CACHE_RULE *cache_rule_create(cache_rule_attribute_t attribute, + cache_rule_op_t op, + const char *value, + uint32_t debug) +{ + CACHE_RULE *rule = NULL; + + switch (op) + { + case CACHE_OP_EQ: + case CACHE_OP_NEQ: + rule = cache_rule_create_simple(attribute, op, value, debug); + break; + + case CACHE_OP_LIKE: + case CACHE_OP_UNLIKE: + rule = cache_rule_create_regexp(attribute, op, value, debug); + break; + + default: + ss_dassert(!true); + MXS_ERROR("Internal error."); + break; + } + + return rule; +} + +/** + * Frees a CACHE_RULE object (and the one it points to). + * + * @param rule The rule to be freed. + */ +static void cache_rule_free(CACHE_RULE* rule) +{ + if (rule) + { + if (rule->next) + { + cache_rule_free(rule->next); + } + + MXS_FREE(rule->value); + + if ((rule->op == CACHE_OP_LIKE) || (rule->op == CACHE_OP_UNLIKE)) + { + pcre2_match_data_free(rule->regexp.data); + pcre2_code_free(rule->regexp.code); + } + + MXS_FREE(rule); + } +} + +/** + * Check whether a value matches a rule. + * + * @param self The rule object. + * @param value The value to check. + * + * @return True if the value matches, false otherwise. 
+ */
+static bool cache_rule_compare(CACHE_RULE *self, const char *value)
+{
+    return cache_rule_compare_n(self, value, strlen(value));
+}
+
+/**
+ * Check whether a value matches a rule.
+ *
+ * @param self The rule object.
+ * @param value The value to check.
+ * @param length The length of value.
+ *
+ * @return True if the value matches, false otherwise.
+ */
+static bool cache_rule_compare_n(CACHE_RULE *self, const char *value, size_t length)
+{
+    bool compares = false;
+
+    switch (self->op)
+    {
+    case CACHE_OP_EQ:
+    case CACHE_OP_NEQ:
+        // NOTE(review): strncmp() compares at most 'length' characters, so
+        // if self->value is longer than 'value' but starts with it (e.g.
+        // rule value "db1x" vs value "db1"), "=" reports a match. Confirm
+        // whether prefix matching is intended here; if not, the lengths
+        // should be compared as well.
+        compares = (strncmp(self->value, value, length) == 0);
+        break;
+
+    case CACHE_OP_LIKE:
+    case CACHE_OP_UNLIKE:
+        compares = (pcre2_match(self->regexp.code,
+                                (PCRE2_SPTR)value, length,
+                                0, 0, self->regexp.data, NULL) >= 0);
+        break;
+
+    default:
+        ss_dassert(!true);
+    }
+
+    // NEQ and UNLIKE are simply the negations of EQ and LIKE.
+    if ((self->op == CACHE_OP_NEQ) || (self->op == CACHE_OP_UNLIKE))
+    {
+        compares = !compares;
+    }
+
+    return compares;
+}
+
+/**
+ * Returns boolean indicating whether the column rule matches the query or not.
+ *
+ * @param self The CACHE_RULE object.
+ * @param default_db The current default db.
+ * @param query The query.
+ *
+ * @return True, if the rule matches, false otherwise.
+ */
+static bool cache_rule_matches_column(CACHE_RULE *self, const char *default_db, const GWBUF *query)
+{
+    ss_dassert(self->attribute == CACHE_ATTRIBUTE_COLUMN);
+    // Column matching is not implemented yet: debug builds assert, release
+    // builds treat a column rule as never matching.
+    ss_info_dassert(!true, "Column matching not implemented yet.");
+
+    return false;
+}
+
+/**
+ * Returns boolean indicating whether the database rule matches the query or not.
+ *
+ * @param self The CACHE_RULE object.
+ * @param default_db The current default db.
+ * @param query The query.
+ *
+ * @return True, if the rule matches, false otherwise.
+ */ +static bool cache_rule_matches_database(CACHE_RULE *self, const char *default_db, const GWBUF *query) +{ + ss_dassert(self->attribute == CACHE_ATTRIBUTE_DATABASE); + + bool matches = false; + + int n; + char **names = qc_get_database_names((GWBUF*)query, &n); // TODO: Make qc const-correct. + + if (names) + { + int i = 0; + + while (!matches && (i < n)) + { + matches = cache_rule_compare(self, names[i]); + ++i; + } + + for (int i = 0; i < n; ++i) + { + MXS_FREE(names[i]); + } + MXS_FREE(names); + } + + if (!matches && default_db) + { + matches = cache_rule_compare(self, default_db); + } + + return matches; +} + +/** + * Returns boolean indicating whether the query rule matches the query or not. + * + * @param self The CACHE_RULE object. + * @param default_db The current default db. + * @param query The query. + * + * @return True, if the rule matches, false otherwise. + */ +static bool cache_rule_matches_query(CACHE_RULE *self, const char *default_db, const GWBUF *query) +{ + ss_dassert(self->attribute == CACHE_ATTRIBUTE_QUERY); + + char* sql; + int len; + + // Will succeed, query contains a contiguous COM_QUERY. + modutil_extract_SQL((GWBUF*)query, &sql, &len); + + return cache_rule_compare_n(self, sql, len); +} + +/** + * Returns boolean indicating whether the table rule matches the query or not. + * + * @param self The CACHE_RULE object. + * @param default_db The current default db. + * @param query The query. + * + * @return True, if the rule matches, false otherwise. 
+ */ +static bool cache_rule_matches_table(CACHE_RULE *self, const char *default_db, const GWBUF *query) +{ + ss_dassert(self->attribute == CACHE_ATTRIBUTE_TABLE); + + bool matches = false; + + int n; + char **names; + bool fullnames; + + fullnames = false; + names = qc_get_table_names((GWBUF*)query, &n, fullnames); + + if (names) + { + int i = 0; + while (!matches && (i < n)) + { + char *name = names[i]; + matches = cache_rule_compare(self, name); + MXS_FREE(name); + ++i; + } + + if (i < n) + { + MXS_FREE(names[i]); + ++i; + } + + MXS_FREE(names); + + if (!matches) + { + fullnames = true; + names = qc_get_table_names((GWBUF*)query, &n, fullnames); + + size_t default_db_len = default_db ? strlen(default_db) : 0; + i = 0; + + while (!matches && (i < n)) + { + char *name = names[i]; + char *dot = strchr(name, '.'); + + if (!dot) + { + if (default_db) + { + name = (char*)MXS_MALLOC(default_db_len + 1 + strlen(name) + 1); + + strcpy(name, default_db); + strcpy(name + default_db_len, "."); + strcpy(name + default_db_len + 1, names[i]); + + MXS_FREE(names[i]); + names[i] = name; + } + } + + matches = cache_rule_compare(self, name); + MXS_FREE(name); + ++i; + } + + if (i < n) + { + MXS_FREE(names[i]); + ++i; + } + + MXS_FREE(names); + } + } + + return matches; +} + +/** + * Returns boolean indicating whether the user rule matches the user or not. + * + * @param self The CACHE_RULE object. + * @param user The current default db. + * + * @return True, if the rule matches, false otherwise. + */ +static bool cache_rule_matches_user(CACHE_RULE *self, const char *user) +{ + ss_dassert(self->attribute == CACHE_ATTRIBUTE_USER); + + return cache_rule_compare(self, user); +} + +/** + * Returns boolean indicating whether the rule matches the query or not. + * + * @param self The CACHE_RULE object. + * @param default_db The current default db. + * @param query The query. + * + * @return True, if the rule matches, false otherwise. 
+ */ +static bool cache_rule_matches(CACHE_RULE *self, const char *default_db, const GWBUF *query) +{ + bool matches = false; + + switch (self->attribute) + { + case CACHE_ATTRIBUTE_COLUMN: + matches = cache_rule_matches_column(self, default_db, query); + break; + + case CACHE_ATTRIBUTE_DATABASE: + matches = cache_rule_matches_database(self, default_db, query); + break; + + case CACHE_ATTRIBUTE_TABLE: + matches = cache_rule_matches_table(self, default_db, query); + break; + + case CACHE_ATTRIBUTE_QUERY: + matches = cache_rule_matches_query(self, default_db, query); + break; + + case CACHE_ATTRIBUTE_USER: + ss_dassert(!true); + break; + + default: + ss_dassert(!true); + } + + if ((matches && (self->debug & CACHE_DEBUG_MATCHING)) || + (!matches && (self->debug & CACHE_DEBUG_NON_MATCHING))) + { + char* sql; + int sql_len; + modutil_extract_SQL((GWBUF*)query, &sql, &sql_len); + const char* text; + + if (matches) + { + text = "MATCHES"; + } + else + { + text = "does NOT match"; + } + + MXS_NOTICE("Rule { \"attribute\": \"%s\", \"op\": \"%s\", \"value\": \"%s\" } %s \"%.*s\".", + cache_rule_attribute_to_string(self->attribute), + cache_rule_op_to_string(self->op), + self->value, + text, + sql_len, sql); + } + + return matches; +} + +/** + * Append a rule to the tail of a chain or rules. + * + * @param head The head of the chain, can be NULL. + * @param tail The tail to be added to the chain. + * + * @return The head. + */ +static CACHE_RULE* cache_rule_append(CACHE_RULE* head, CACHE_RULE* tail) +{ + ss_dassert(tail); + + if (!head) + { + head = tail; + } + else + { + CACHE_RULE *rule = head; + + while (rule->next) + { + rule = rule->next; + } + + rule->next = tail; + } + + return head; +} + +/** + * Adds a "store" rule to the rules object + * + * @param self Pointer to the CACHE_RULES object that is being built. + * @param rule The rule to be added. 
+ */
+static void cache_rules_add_store_rule(CACHE_RULES* self, CACHE_RULE* rule)
+{
+    self->store_rules = cache_rule_append(self->store_rules, rule);
+}
+
+/**
+ * Adds a "use" rule to the rules object
+ *
+ * @param self Pointer to the CACHE_RULES object that is being built.
+ * @param rule The rule to be added.
+ */
+static void cache_rules_add_use_rule(CACHE_RULES* self, CACHE_RULE* rule)
+{
+    self->use_rules = cache_rule_append(self->use_rules, rule);
+}
+
+/**
+ * Parses the JSON object used for configuring the rules.
+ *
+ * @param self Pointer to the CACHE_RULES object that is being built.
+ * @param root The root JSON object in the rules file.
+ *
+ * @return True, if the object could be parsed, false otherwise.
+ */
+static bool cache_rules_parse_json(CACHE_RULES *self, json_t *root)
+{
+    // NOTE(review): if the rules object contains neither a 'store' nor a
+    // 'use' key, 'parsed' stays false and an otherwise valid (empty) rules
+    // file is rejected — confirm whether that is intended.
+    bool parsed = false;
+    json_t *store = json_object_get(root, KEY_STORE);
+
+    if (store)
+    {
+        if (json_is_array(store))
+        {
+            parsed = cache_rules_parse_array(self, store, KEY_STORE, cache_rules_parse_store_element);
+        }
+        else
+        {
+            MXS_ERROR("The cache rules object contains a `%s` key, but it is not an array.", KEY_STORE);
+        }
+    }
+
+    // Only look at 'use' if 'store' was absent or parsed successfully.
+    if (!store || parsed)
+    {
+        json_t *use = json_object_get(root, KEY_USE);
+
+        if (use)
+        {
+            if (json_is_array(use))
+            {
+                parsed = cache_rules_parse_array(self, use, KEY_USE, cache_rules_parse_use_element);
+            }
+            else
+            {
+                MXS_ERROR("The cache rules object contains a `%s` key, but it is not an array.", KEY_USE);
+            }
+        }
+    }
+
+    return parsed;
+}
+
+/**
+ * Parses an array.
+ *
+ * @param self Pointer to the CACHE_RULES object that is being built.
+ * @param array An array.
+ * @param name The name of the array.
+ * @param parse_element Function for parsing an element.
+ *
+ * @return True, if the array could be parsed, false otherwise.
+ */ +static bool cache_rules_parse_array(CACHE_RULES *self, + json_t *store, + const char *name, + cache_rules_parse_element_t parse_element) +{ + ss_dassert(json_is_array(store)); + + bool parsed = true; + + size_t n = json_array_size(store); + size_t i = 0; + + while (parsed && (i < n)) + { + json_t *element = json_array_get(store, i); + ss_dassert(element); + + if (json_is_object(element)) + { + parsed = parse_element(self, element, i); + } + else + { + MXS_ERROR("Element %lu of the '%s' array is not an object.", i, name); + parsed = false; + } + + ++i; + } + + return parsed; +} + +/** + * Parses an object in an array. + * + * @param self Pointer to the CACHE_RULES object that is being built. + * @param object An object from the "store" array. + * @param index Index of the object in the array. + * + * @return True, if the object could be parsed, false otherwise. + */ +static CACHE_RULE *cache_rules_parse_element(CACHE_RULES *self, json_t *object, + const char* array_name, size_t index, + struct cache_attribute_mapping *mapping) +{ + ss_dassert(json_is_object(object)); + + CACHE_RULE *rule = NULL; + + json_t *a = json_object_get(object, KEY_ATTRIBUTE); + json_t *o = json_object_get(object, KEY_OP); + json_t *v = json_object_get(object, KEY_VALUE); + + if (a && o && v && json_is_string(a) && json_is_string(o) && json_is_string(v)) + { + cache_rule_attribute_t attribute; + + if (cache_rule_attribute_get(mapping, json_string_value(a), &attribute)) + { + cache_rule_op_t op; + + if (cache_rule_op_get(json_string_value(o), &op)) + { + rule = cache_rule_create(attribute, op, json_string_value(v), self->debug); + } + else + { + MXS_ERROR("Element %lu in the `%s` array has an invalid value " + "\"%s\" for 'op'.", index, array_name, json_string_value(o)); + } + } + else + { + MXS_ERROR("Element %lu in the `%s` array has an invalid value " + "\"%s\" for 'attribute'.", index, array_name, json_string_value(a)); + } + } + else + { + MXS_ERROR("Element %lu in the `%s` array 
does not contain " + "'attribute', 'op' and/or 'value', or one or all of them " + "is not a string.", index, array_name); + } + + return rule; +} + + +/** + * Parses an object in the "store" array. + * + * @param self Pointer to the CACHE_RULES object that is being built. + * @param object An object from the "store" array. + * @param index Index of the object in the array. + * + * @return True, if the object could be parsed, false otherwise. + */ +static bool cache_rules_parse_store_element(CACHE_RULES *self, json_t *object, size_t index) +{ + CACHE_RULE *rule = cache_rules_parse_element(self, object, KEY_STORE, index, cache_store_attributes); + + if (rule) + { + cache_rules_add_store_rule(self, rule); + } + + return rule != NULL; +} + +/** + * Parses an object in the "use" array. + * + * @param self Pointer to the CACHE_RULES object that is being built. + * @param object An object from the "store" array. + * @param index Index of the object in the array. + * + * @return True, if the object could be parsed, false otherwise. + */ +static bool cache_rules_parse_use_element(CACHE_RULES *self, json_t *object, size_t index) +{ + CACHE_RULE *rule = cache_rules_parse_element(self, object, KEY_USE, index, cache_use_attributes); + + if (rule) + { + cache_rules_add_use_rule(self, rule); + } + + return rule != NULL; +} diff --git a/server/modules/filter/cache/rules.h b/server/modules/filter/cache/rules.h new file mode 100644 index 000000000..0b69bf18e --- /dev/null +++ b/server/modules/filter/cache/rules.h @@ -0,0 +1,71 @@ +#ifndef _MAXSCALE_FILTER_CACHE_RULES_H +#define _MAXSCALE_FILTER_CACHE_RULES_H +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl. 
+ * + * Change Date: 2019-07-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include +#include +#include +#include +#include + + +typedef enum cache_rule_attribute +{ + CACHE_ATTRIBUTE_COLUMN, + CACHE_ATTRIBUTE_DATABASE, + CACHE_ATTRIBUTE_QUERY, + CACHE_ATTRIBUTE_TABLE, + CACHE_ATTRIBUTE_USER, +} cache_rule_attribute_t; + +typedef enum cache_rule_op +{ + CACHE_OP_EQ, + CACHE_OP_NEQ, + CACHE_OP_LIKE, + CACHE_OP_UNLIKE +} cache_rule_op_t; + + +typedef struct cache_rule +{ + cache_rule_attribute_t attribute; // What attribute is evalued. + cache_rule_op_t op; // What operator is used. + char *value; // The value from the rule file. + struct + { + pcre2_code* code; + pcre2_match_data* data; + } regexp; // Regexp data, only for CACHE_OP_[LIKE|UNLIKE]. + uint32_t debug; // The debug level. + struct cache_rule *next; +} CACHE_RULE; + +typedef struct cache_rules +{ + uint32_t debug; // The debug level. + CACHE_RULE *store_rules; // The rules for when to store data to the cache. + CACHE_RULE *use_rules; // The rules for when to use data from the cache. 
+} CACHE_RULES; + + +CACHE_RULES *cache_rules_create(uint32_t debug); +void cache_rules_free(CACHE_RULES *rules); + +CACHE_RULES *cache_rules_load(const char *path, uint32_t debug); + +bool cache_rules_should_store(CACHE_RULES *rules, const char *default_db, const GWBUF* query); +bool cache_rules_should_use(CACHE_RULES *rules, const SESSION *session); + +#endif diff --git a/server/modules/filter/cache/storage/storage_rocksdb/rocksdbstorage.cc b/server/modules/filter/cache/storage/storage_rocksdb/rocksdbstorage.cc index 84d593257..e0ca677bd 100644 --- a/server/modules/filter/cache/storage/storage_rocksdb/rocksdbstorage.cc +++ b/server/modules/filter/cache/storage/storage_rocksdb/rocksdbstorage.cc @@ -15,10 +15,21 @@ #include #include #include +#include +#include #include +#include #include +extern "C" +{ +// TODO: Add extern "C" to modutil.h +#include +} +#include #include "rocksdbinternals.h" +using std::for_each; +using std::set; using std::string; using std::unique_ptr; @@ -28,13 +39,52 @@ namespace string u_storageDirectory; -const size_t ROCKSDB_KEY_LENGTH = SHA512_DIGEST_LENGTH; +const size_t ROCKSDB_KEY_LENGTH = 2 * SHA512_DIGEST_LENGTH; + +#if ROCKSDB_KEY_LENGTH > CACHE_KEY_MAXLEN +#error storage_rocksdb key is too long. +#endif // See https://github.com/facebook/rocksdb/wiki/Basic-Operations#thread-pools // These figures should perhaps depend upon the number of cache instances. 
const size_t ROCKSDB_N_LOW_THREADS = 2; const size_t ROCKSDB_N_HIGH_THREADS = 1; +struct StorageRocksDBVersion +{ + uint8_t major; + uint8_t minor; + uint8_t correction; +}; + +const uint8_t STORAGE_ROCKSDB_MAJOR = 0; +const uint8_t STORAGE_ROCKSDB_MINOR = 1; +const uint8_t STORAGE_ROCKSDB_CORRECTION = 0; + +const StorageRocksDBVersion STORAGE_ROCKSDB_VERSION = +{ + STORAGE_ROCKSDB_MAJOR, + STORAGE_ROCKSDB_MINOR, + STORAGE_ROCKSDB_CORRECTION +}; + +string toString(const StorageRocksDBVersion& version) +{ + string rv; + + rv += "{ "; + rv += std::to_string(version.major); + rv += ", "; + rv += std::to_string(version.minor); + rv += ", "; + rv += std::to_string(version.correction); + rv += " }"; + + return rv; +} + +const char STORAGE_ROCKSDB_VERSION_KEY[] = "MaxScale_Storage_RocksDB_Version"; + } //private @@ -99,39 +149,162 @@ RocksDBStorage* RocksDBStorage::Create(const char* zName, uint32_t ttl, int argc options.env = rocksdb::Env::Default(); options.max_background_compactions = ROCKSDB_N_LOW_THREADS; options.max_background_flushes = ROCKSDB_N_HIGH_THREADS; - options.create_if_missing = true; - rocksdb::DBWithTTL* pDb; - rocksdb::Status status = rocksdb::DBWithTTL::Open(options, path, &pDb, ttl); + rocksdb::DBWithTTL* pDb; + rocksdb::Status status; + rocksdb::Slice key(STORAGE_ROCKSDB_VERSION_KEY); + + do + { + // Try to open existing. 
+ options.create_if_missing = false; + options.error_if_exists = false; + + status = rocksdb::DBWithTTL::Open(options, path, &pDb, ttl); + + if (status.IsInvalidArgument()) // Did not exist + { + MXS_NOTICE("Database \"%s\" does not exist, creating.", path.c_str()); + + options.create_if_missing = true; + options.error_if_exists = true; + + status = rocksdb::DBWithTTL::Open(options, path, &pDb, ttl); + + if (status.ok()) + { + MXS_NOTICE("Database \"%s\" created, storing version %s into it.", + path.c_str(), toString(STORAGE_ROCKSDB_VERSION).c_str()); + + rocksdb::Slice value(reinterpret_cast(&STORAGE_ROCKSDB_VERSION), + sizeof(STORAGE_ROCKSDB_VERSION)); + + status = pDb->Put(rocksdb::WriteOptions(), key, value); + + if (!status.ok()) + { + MXS_ERROR("Could not store version information to created RocksDB database \"%s\". " + "You may need to delete the database and retry. RocksDB error: %s", + path.c_str(), + status.ToString().c_str()); + } + } + } + } + while (status.IsInvalidArgument()); RocksDBStorage* pStorage = nullptr; if (status.ok()) { - unique_ptr sDb(pDb); + std::string value; - pStorage = new RocksDBStorage(sDb, zName, path, ttl); + status = pDb->Get(rocksdb::ReadOptions(), key, &value); + + if (status.ok()) + { + const StorageRocksDBVersion* pVersion = + reinterpret_cast(value.data()); + + // When the version is bumped, it needs to be decided what if any + // backward compatibility is provided. After all, it's a cache, so + // you should be able to delete it at any point and pay a small + // price while the cache is rebuilt. 
+ if ((pVersion->major == STORAGE_ROCKSDB_MAJOR) && + (pVersion->minor == STORAGE_ROCKSDB_MINOR) && + (pVersion->correction == STORAGE_ROCKSDB_CORRECTION)) + { + MXS_NOTICE("Version of \"%s\" is %s, version of storage_rocksdb is %s.", + path.c_str(), + toString(*pVersion).c_str(), + toString(STORAGE_ROCKSDB_VERSION).c_str()); + + unique_ptr sDb(pDb); + + pStorage = new RocksDBStorage(sDb, zName, path, ttl); + } + else + { + MXS_ERROR("Version of RocksDB database \"%s\" is %s, while version required " + "is %s. You need to delete the database and restart.", + path.c_str(), + toString(*pVersion).c_str(), + toString(STORAGE_ROCKSDB_VERSION).c_str()); + delete pDb; + } + } + else + { + MXS_ERROR("Could not read version information from RocksDB database %s. " + "You may need to delete the database and retry. RocksDB error: %s", + path.c_str(), + status.ToString().c_str()); + delete pDb; + } } else { - MXS_ERROR("Could not open RocksDB database %s using path %s: %s", - zName, path.c_str(), status.ToString().c_str()); + MXS_ERROR("Could not open/initialize RocksDB database %s. RocksDB error: %s", + path.c_str(), status.ToString().c_str()); } return pStorage; } -cache_result_t RocksDBStorage::getKey(const GWBUF* pQuery, char* pKey) +cache_result_t RocksDBStorage::getKey(const char* zDefaultDB, const GWBUF* pQuery, char* pKey) { - // ss_dassert(gwbuf_is_contiguous(pQuery)); - const uint8_t* pData = static_cast(GWBUF_DATA(pQuery)); - size_t len = MYSQL_GET_PACKET_LEN(pData) - 1; // Subtract 1 for packet type byte. + ss_dassert(GWBUF_IS_CONTIGUOUS(pQuery)); - const uint8_t* pSql = &pData[5]; // Symbolic constant for 5? + int n; + bool fullnames = true; + char** pzTables = qc_get_table_names(const_cast(pQuery), &n, fullnames); + + set dbs; // Elements in set are sorted. 
+ + for (int i = 0; i < n; ++i) + { + char *zTable = pzTables[i]; + char *zDot = strchr(zTable, '.'); + + if (zDot) + { + *zDot = 0; + dbs.insert(zTable); + } + else if (zDefaultDB) + { + // If zDefaultDB is NULL, then there will be a table for which we + // do not know the database. However, that will fail in the server, + // so nothing will be stored. + dbs.insert(zDefaultDB); + } + MXS_FREE(zTable); + } + MXS_FREE(pzTables); + + // dbs now contain each accessed database in sorted order. Now copy them to a single string. + string tag; + for_each(dbs.begin(), dbs.end(), [&tag](const string& db) { tag.append(db); }); memset(pKey, 0, CACHE_KEY_MAXLEN); - SHA512(pSql, len, reinterpret_cast(pKey)); + const unsigned char* pData; + + // We store the databases in the first half of the key. That will ensure that + // identical queries targeting different default databases will not clash. + // This will also mean that entries related to the same databases will + // be placed near each other. + pData = reinterpret_cast(tag.data()); + SHA512(pData, tag.length(), reinterpret_cast(pKey)); + + char *pSql; + int length; + + modutil_extract_SQL(const_cast(pQuery), &pSql, &length); + + // Then we store the query itself in the second half of the key. 
+ pData = reinterpret_cast(pSql); + SHA512(pData, length, reinterpret_cast(pKey) + SHA512_DIGEST_LENGTH); return CACHE_RESULT_OK; } diff --git a/server/modules/filter/cache/storage/storage_rocksdb/rocksdbstorage.h b/server/modules/filter/cache/storage/storage_rocksdb/rocksdbstorage.h index 8da4a5561..6c2eb1a72 100644 --- a/server/modules/filter/cache/storage/storage_rocksdb/rocksdbstorage.h +++ b/server/modules/filter/cache/storage/storage_rocksdb/rocksdbstorage.h @@ -27,7 +27,7 @@ public: static RocksDBStorage* Create(const char* zName, uint32_t ttl, int argc, char* argv[]); ~RocksDBStorage(); - cache_result_t getKey(const GWBUF* pQuery, char* pKey); + cache_result_t getKey(const char* zDefaultDB, const GWBUF* pQuery, char* pKey); cache_result_t getValue(const char* pKey, GWBUF** ppResult); cache_result_t putValue(const char* pKey, const GWBUF* pValue); diff --git a/server/modules/filter/cache/storage/storage_rocksdb/storage_rocksdb.cc b/server/modules/filter/cache/storage/storage_rocksdb/storage_rocksdb.cc index efa285450..d5e72aabd 100644 --- a/server/modules/filter/cache/storage/storage_rocksdb/storage_rocksdb.cc +++ b/server/modules/filter/cache/storage/storage_rocksdb/storage_rocksdb.cc @@ -55,10 +55,12 @@ void freeInstance(CACHE_STORAGE* pInstance) } cache_result_t getKey(CACHE_STORAGE* pStorage, + const char* zDefaultDB, const GWBUF* pQuery, char* pKey) { ss_dassert(pStorage); + // zDefaultDB may be NULL. 
ss_dassert(pQuery); ss_dassert(pKey); @@ -66,7 +68,7 @@ cache_result_t getKey(CACHE_STORAGE* pStorage, try { - result = reinterpret_cast(pStorage)->getKey(pQuery, pKey); + result = reinterpret_cast(pStorage)->getKey(zDefaultDB, pQuery, pKey); } catch (const std::bad_alloc&) { diff --git a/server/modules/monitor/mysqlmon.h b/server/modules/monitor/mysqlmon.h index e2b5f17c5..3eb551ccc 100644 --- a/server/modules/monitor/mysqlmon.h +++ b/server/modules/monitor/mysqlmon.h @@ -50,6 +50,8 @@ * @endverbatim */ +#define MYSQLMON_DEFAULT_FAILCOUNT 5 + /** * The handle for an instance of a MySQL Monitor module */ @@ -72,6 +74,9 @@ typedef struct char* script; /*< Script to call when state changes occur on servers */ bool events[MAX_MONITOR_EVENT]; /*< enabled events */ HASHTABLE *server_info; /**< Contains server specific information */ + bool failover; /**< If simple failover is enabled */ + int failcount; /**< How many monitoring cycles servers must be + down before failover is initiated */ } MYSQL_MONITOR; #endif diff --git a/server/modules/monitor/mysqlmon/mysql_mon.c b/server/modules/monitor/mysqlmon/mysql_mon.c index c45e7b60b..0b289ce74 100644 --- a/server/modules/monitor/mysqlmon/mysql_mon.c +++ b/server/modules/monitor/mysqlmon/mysql_mon.c @@ -273,6 +273,8 @@ startMonitor(MONITOR *monitor, const CONFIG_PARAMETER* params) handle->script = NULL; handle->multimaster = false; handle->mysql51_replication = false; + handle->failover = false; + handle->failcount = MYSQLMON_DEFAULT_FAILCOUNT; memset(handle->events, false, sizeof(handle->events)); spinlock_init(&handle->lock); } @@ -295,6 +297,19 @@ startMonitor(MONITOR *monitor, const CONFIG_PARAMETER* params) { handle->multimaster = config_truth_value(params->value); } + else if (!strcmp(params->name, "failover")) + { + handle->failover = config_truth_value(params->value); + } + else if (!strcmp(params->name, "failcount")) + { + handle->failcount = atoi(params->value); + if (handle->failcount <= 0) + { + MXS_ERROR("[%s] 
Invalid value for 'failcount': %s", monitor->name, params->value); + error = true; + } + } else if (!strcmp(params->name, "script")) { if (externcmd_can_execute(params->value)) @@ -352,6 +367,7 @@ startMonitor(MONITOR *monitor, const CONFIG_PARAMETER* params) hashtable_free(handle->server_info); MXS_FREE(handle->script); MXS_FREE(handle); + handle = NULL; } else if (thread_start(&handle->thread, monitorMain, monitor) == NULL) { @@ -1021,6 +1037,80 @@ void find_graph_cycles(MYSQL_MONITOR *handle, MONITOR_SERVERS *database, int nse } } +/** + * @brief Check whether failover conditions have been met + * + * This function checks whether all the conditions to trigger a failover have + * been met. For a failover to happen, only one server must be available and + * other servers must have passed the configured tolerance level of failures. + * + * @param handle Monitor instance + * @param db Monitor servers + * + * @return True if failover is required + */ +bool failover_required(MYSQL_MONITOR *handle, MONITOR_SERVERS *db) +{ + int candidates = 0; + + while (db) + { + if (SERVER_IS_RUNNING(db->server)) + { + candidates++; + MYSQL_SERVER_INFO *server_info = hashtable_fetch(handle->server_info, db->server->unique_name); + + if (server_info->read_only || candidates > 1) + { + return false; + } + } + else if (db->mon_err_count < handle->failcount) + { + return false; + } + + db = db->next; + } + + return candidates == 1; +} + +/** + * @brief Initiate simple failover + * + * This function does the actual failover by assigning the last remaining server + * the master status and setting all other servers into maintenance mode. By + * setting the servers into maintenance mode, we prevent any possible conflicts + * when the failed servers come back up. 
+ * + * @param handle Monitor instance + * @param db Monitor servers + */ +void do_failover(MYSQL_MONITOR *handle, MONITOR_SERVERS *db) +{ + while (db) + { + if (SERVER_IS_RUNNING(db->server)) + { + if (!SERVER_IS_MASTER(db->server)) + { + MXS_WARNING("Failover initiated, server '%s' is now the master. " + "All other servers are set into maintenance mode.", + db->server->unique_name); + } + + monitor_set_pending_status(db, SERVER_MASTER); + monitor_clear_pending_status(db, SERVER_SLAVE); + } + else + { + monitor_set_pending_status(db, SERVER_MAINT); + } + db = db->next; + } +} + /** * The entry point for the monitoring module thread * @@ -1296,6 +1386,17 @@ monitorMain(void *arg) ptr = ptr->next; } + /** Now that all servers have their status correctly set, we can check + if we need to do a failover */ + if (handle->failover) + { + if (failover_required(handle, mon->databases)) + { + /** Other servers have died, initiate a failover to the last remaining server */ + do_failover(handle, mon->databases); + } + } + ptr = mon->databases; monitor_event_t evtype; while (ptr) diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index c30011465..326c0d128 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -1511,6 +1511,12 @@ backend_ref_t *get_bref_from_dcb(ROUTER_CLIENT_SES *rses, DCB *dcb) * * Calls hang-up function for DCB if it is not both running and in * master/slave/joined/ndb role. Called by DCB's callback routine. 
+ * + * @param dcb DCB relating to a backend server + * @param reason The reason for the state change + * @param data Data is a backend reference structure belonging to this router + * + * @return 1 for success, 0 for failure */ int router_handle_state_switch(DCB *dcb, DCB_REASON reason, void *data) { @@ -1668,6 +1674,14 @@ static bool rwsplit_process_router_options(ROUTER_INSTANCE *router, return success; } +/** + * @brief Handle an error reply for a client + * + * @param ses Session + * @param rses Router session + * @param backend_dcb DCB for the backend server that has failed + * @param errmsg GWBUF containing the error message + */ static void handle_error_reply_client(SESSION *ses, ROUTER_CLIENT_SES *rses, DCB *backend_dcb, GWBUF *errmsg) { @@ -1801,6 +1815,13 @@ return_succp: return succp; } +/** + * @brief Calculate the number of backend servers + * + * @param inst Router instance + * + * @return int - count of servers + */ static int router_get_servercount(ROUTER_INSTANCE *inst) { int router_nservers = 0; @@ -1814,6 +1835,16 @@ static int router_get_servercount(ROUTER_INSTANCE *inst) return router_nservers; } +/** + * @brief Calculate whether we have enough servers to route a query + * + * @param p_rses Router session + * @param min_nsrv Minimum number of servers that is sufficient + * @param nsrv Actual number of servers + * @param router Router instance + * + * @return bool - whether enough, side effect is error logging + */ static bool have_enough_servers(ROUTER_CLIENT_SES **p_rses, const int min_nsrv, int router_nsrv, ROUTER_INSTANCE *router) { diff --git a/server/modules/routing/readwritesplit/rwsplit_mysql.c b/server/modules/routing/readwritesplit/rwsplit_mysql.c index bb6c30633..18c1f5a73 100644 --- a/server/modules/routing/readwritesplit/rwsplit_mysql.c +++ b/server/modules/routing/readwritesplit/rwsplit_mysql.c @@ -60,8 +60,26 @@ */ /* This could be placed in the protocol, with a new API entry point - * It is certainly MySQL specific. 
- *
- */
+ * It is certainly MySQL specific. Packet types are DB specific, but can be
+ * assumed to be enums, which can be handled as integers without knowing
+ * which DB is involved until the packet type needs to be interpreted.
+ *
+ */
+
+/**
+ * @brief Determine packet type
+ *
+ * Examine the packet in the buffer to extract the type, if possible. At the
+ * same time set the second parameter to indicate whether the packet was
+ * empty.
+ *
+ * It is assumed that the packet length and type are contained within a single
+ * buffer, the one indicated by the first parameter.
+ *
+ * @param querybuf Buffer containing the packet
+ * @param non_empty_packet bool indicating whether the packet is non-empty
+ * @return The packet type, or MYSQL_COM_UNDEFINED; also the second parameter is set
+ */
 int
 determine_packet_type(GWBUF *querybuf, bool *non_empty_packet)
 {
@@ -85,6 +103,17 @@ determine_packet_type(GWBUF *querybuf, bool *non_empty_packet)
 /*
  * This appears to be MySQL specific
  */
+/**
+ * @brief Determine if a packet contains a SQL query
+ *
+ * Packet type tells us this, but in a DB specific way. This function is
+ * provided so that code that is not DB specific can find out whether a packet
+ * contains a SQL query. Clearly, to be effective different functions must be
+ * called for different DB types.
+ *
+ * @param packet_type Type of packet (integer)
+ * @return bool indicating whether packet contains a SQL query
+ */
 bool
 is_packet_a_query(int packet_type)
 {
@@ -94,6 +123,17 @@ is_packet_a_query(int packet_type)
 /*
  * This looks MySQL specific
  */
+/**
+ * @brief Determine if a packet contains a one way message
+ *
+ * Packet type tells us this, but in a DB specific way. This function is
+ * provided so that code that is not DB specific can find out whether a packet
+ * contains a one way message. Clearly, to be effective different functions must be
+ * called for different DB types.
+ * + * @param packet_type Type of packet (integer) + * @return bool indicating whether packet contains a one way message + */ bool is_packet_a_one_way_message(int packet_type) { @@ -105,6 +145,17 @@ is_packet_a_one_way_message(int packet_type) * This one is problematic because it is MySQL specific, but also router * specific. */ +/** + * @brief Log the transaction status + * + * The router session and the query buffer are used to log the transaction + * status, along with the query type (which is a generic description that + * should be usable across all DB types). + * + * @param rses Router session + * @param querybuf Query buffer + * @param qtype Query type + */ void log_transaction_status(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, qc_query_type_t qtype) { @@ -140,6 +191,23 @@ log_transaction_status(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, qc_query_type_t * utility to convert packet type to a string. The aim is for most of this * code to remain as part of the router. */ +/** + * @brief Operations to be carried out if request is for all backend servers + * + * If the choice of sending to all backends is in conflict with other bit + * settings in route_target, then error messages are written to the log. + * + * Otherwise, the function route_session_write is called to carry out the + * actual routing. 
+ * + * @param route_target Bit map indicating where packet should be routed + * @param inst Router instance + * @param rses Router session + * @param querybuf Query buffer containing packet + * @param packet_type Integer (enum) indicating type of packet + * @param qtype Query type + * @return bool indicating whether the session can continue + */ bool handle_target_is_all(route_target_t route_target, ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, @@ -218,6 +286,15 @@ handle_target_is_all(route_target_t route_target, } /* This is MySQL specific */ +/** + * @brief Write an error message to the log indicating failure + * + * Used when an attempt to lock the router session fails. + * + * @param querybuf Query buffer containing packet + * @param packet_type Integer (enum) indicating type of packet + * @param qtype Query type + */ void session_lock_failure_handling(GWBUF *querybuf, int packet_type, qc_query_type_t qtype) { @@ -237,6 +314,14 @@ session_lock_failure_handling(GWBUF *querybuf, int packet_type, qc_query_type_t /* * Probably MySQL specific because of modutil function */ +/** + * @brief Write an error message to the log for closed session + * + * This happens if a request is received for a session that is already + * closing down. + * + * @param querybuf Query buffer containing packet + */ void closed_session_reply(GWBUF *querybuf) { uint8_t* data = GWBUF_DATA(querybuf); @@ -254,6 +339,15 @@ void closed_session_reply(GWBUF *querybuf) /* * Probably MySQL specific because of modutil function */ +/** + * @brief First step to handle request in a live session + * + * Used when a request is about to be routed. Note that the query buffer is + * passed by name and is likely to be modified by this function. 
+ * + * @param querybuf Query buffer containing packet + * @param rses Router session + */ void live_session_reply(GWBUF **querybuf, ROUTER_CLIENT_SES *rses) { GWBUF *tmpbuf = *querybuf; @@ -276,6 +370,16 @@ void live_session_reply(GWBUF **querybuf, ROUTER_CLIENT_SES *rses) /* * Uses MySQL specific mechanisms */ +/** + * @brief Write an error message to the log for session lock failure + * + * This happens when processing a client reply and the session cannot be + * locked. + * + * @param rses Router session + * @param buf Query buffer containing reply data + * @param dcb The backend DCB that sent the reply + */ void print_error_packet(ROUTER_CLIENT_SES *rses, GWBUF *buf, DCB *dcb) { #if defined(SS_DEBUG) @@ -326,6 +430,15 @@ void print_error_packet(ROUTER_CLIENT_SES *rses, GWBUF *buf, DCB *dcb) /* * Uses MySQL specific mechanisms */ +/** + * @brief Check the reply from a backend server to a session command + * + * If the reply is an error, a message may be logged. + * + * @param writebuf Query buffer containing reply data + * @param scur Session cursor + * @param bref Router session data for a backend server + */ void check_session_command_reply(GWBUF *writebuf, sescmd_cursor_t *scur, backend_ref_t *bref) { if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_ERR) && @@ -349,7 +462,7 @@ void check_session_command_reply(GWBUF *writebuf, sescmd_cursor_t *scur, backend } /** - * If session command cursor is passive, sends the command to backend for + * @brief If session command cursor is passive, sends the command to backend for * execution. * * Returns true if command was sent or added successfully to the queue. @@ -357,6 +470,9 @@ void check_session_command_reply(GWBUF *writebuf, sescmd_cursor_t *scur, backend * commands. * * Router session must be locked. 
+ * + * @param backend_ref Router session backend database data + * @return bool - true for success, false for failure */ /* * Uses MySQL specific values in the large switch statement, although it diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.c b/server/modules/routing/readwritesplit/rwsplit_route_stmt.c index f8ca27c4d..2e2ca2ab1 100644 --- a/server/modules/routing/readwritesplit/rwsplit_route_stmt.c +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.c @@ -424,6 +424,14 @@ return_succp: return succp; } +/** + * @brief Function to hash keys in read-write split router + * + * Used to store information about temporary tables. + * + * @param key key to be hashed, actually a character string + * @result the hash value integer + */ int rwsplit_hashkeyfun(const void *key) { if (key == NULL) @@ -441,6 +449,15 @@ int rwsplit_hashkeyfun(const void *key) return hash; } +/** + * @brief Function to compare hash keys in read-write split router + * + * Used to manage information about temporary tables. + * + * @param key first key to be compared, actually a character string + * @param v2 second key to be compared, actually a character string + * @result 1 if keys are equal, 0 otherwise + */ int rwsplit_hashcmpfun(const void *v1, const void *v2) { const char *i1 = (const char *)v1; @@ -449,12 +466,27 @@ int rwsplit_hashcmpfun(const void *v1, const void *v2) return strcmp(i1, i2); } +/** + * @brief Function to duplicate a hash value in read-write split router + * + * Used to manage information about temporary tables. + * + * @param fval value to be duplicated, actually a character string + * @result the duplicated value, actually a character string + */ void *rwsplit_hstrdup(const void *fval) { char *str = (char *)fval; return MXS_STRDUP(str); } +/** + * @brief Function to free hash values in read-write split router + * + * Used to manage information about temporary tables. 
+ * + * @param key value to be freed + */ void rwsplit_hfree(void *fval) { MXS_FREE(fval); @@ -869,6 +901,16 @@ route_target_t get_route_target(ROUTER_CLIENT_SES *rses, return target; } +/** + * @brief Handle multi statement queries and load statements + * + * One of the possible types of handling required when a request is routed + * + * @param ses Router session + * @param querybuf Buffer containing query to be routed + * @param packet_type Type of packet (database specific) + * @param qtype Query type + */ void handle_multi_temp_and_load(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, int packet_type, int *qtype) @@ -955,6 +997,18 @@ handle_multi_temp_and_load(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, } } +/** + * @brief Handle hinted target query + * + * One of the possible types of handling required when a request is routed + * + * @param ses Router session + * @param querybuf Buffer containing query to be routed + * @param route_target Target for the query + * @param target_dcb DCB for the target server + * + * @return bool - true if succeeded, false otherwise + */ bool handle_hinted_target(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, route_target_t route_target, DCB **target_dcb) { @@ -1027,6 +1081,17 @@ bool handle_hinted_target(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, return succp; } +/** + * @brief Handle slave is the target + * + * One of the possible types of handling required when a request is routed + * + * @param inst Router instance + * @param ses Router session + * @param target_dcb DCB for the target server + * + * @return bool - true if succeeded, false otherwise + */ bool handle_slave_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, DCB **target_dcb) { @@ -1050,6 +1115,17 @@ bool handle_slave_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, } } +/** + * @brief Handle master is the target + * + * One of the possible types of handling required when a request is routed + * + * @param inst Router instance + * @param ses Router session + * 
@param target_dcb DCB for the target server + * + * @return bool - true if succeeded, false otherwise + */ bool handle_master_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, DCB **target_dcb) { @@ -1090,6 +1166,18 @@ bool handle_master_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, return succp; } +/** + * @brief Handle got a target + * + * One of the possible types of handling required when a request is routed + * + * @param inst Router instance + * @param ses Router session + * @param querybuf Buffer containing query to be routed + * @param target_dcb DCB for the target server + * + * @return bool - true if succeeded, false otherwise + */ bool handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, GWBUF *querybuf, DCB *target_dcb) @@ -1149,7 +1237,11 @@ handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, } /** - * Create a generic router session property structure. + * @brief Create a generic router session property structure. + * + * @param prop_type Property type + * + * @return property structure of requested type, or NULL if failed */ rses_property_t *rses_property_init(rses_property_type_t prop_type) { @@ -1171,11 +1263,18 @@ rses_property_t *rses_property_init(rses_property_type_t prop_type) } /** + * @brief Add property to the router client session + * * Add property to the router_client_ses structure's rses_properties * array. The slot is determined by the type of property. * In each slot there is a list of properties of similar type. * * Router client session must be locked. 
+ * + * @param rses Router session + * @param prop Router session property to be added + * + * @return -1 on failure, 0 on success */ int rses_property_add(ROUTER_CLIENT_SES *rses, rses_property_t *prop) { diff --git a/server/modules/routing/readwritesplit/rwsplit_tmp_table_multi.c b/server/modules/routing/readwritesplit/rwsplit_tmp_table_multi.c index 996583416..04177f317 100644 --- a/server/modules/routing/readwritesplit/rwsplit_tmp_table_multi.c +++ b/server/modules/routing/readwritesplit/rwsplit_tmp_table_multi.c @@ -44,6 +44,8 @@ */ /** + * @brief Check for dropping of temporary tables + * * Check if the query is a DROP TABLE... query and * if it targets a temporary table, remove it from the hashtable. * @param router_cli_ses Router client session @@ -351,6 +353,15 @@ bool check_for_multi_stmt(GWBUF *buf, void *protocol, mysql_server_cmd_t packet_ return rval; } +/** + * @brief Determine the type of a query + * + * @param querybuf GWBUF containing the query + * @param packet_type Integer denoting DB specific enum + * @param non_empty_packet Boolean to be set by this function + * + * @return qc_query_type_t the query type; also the non_empty_packet bool is set + */ qc_query_type_t determine_query_type(GWBUF *querybuf, int packet_type, bool non_empty_packet) {