Disable write-ahead log
Since it's a cache, we do not need to retain the data from one MaxScale invocation to the next. Consequently, we can turn off write ahead logging completely if we simply wipe the RocksDB database at each startup. In addition, the location of the cache directory can now be specified explicitly, so it can be placed, for instance, on a RAM disk.
This commit is contained in:
parent
9e4ee0323d
commit
97b039689f
@ -381,4 +381,26 @@ regardless of what host the `admin` user comes from.
|
||||
|
||||
# Storage
|
||||
|
||||
## Storage RocksDB
|
||||
## `storage_rocksdb`
|
||||
|
||||
This storage module uses RocksDB database for storing the cached data. The
|
||||
directory where the RocksDB database will be created is by default created
|
||||
into the _MaxScale cache_ directory, which usually is not on a RAM disk. For
|
||||
maximum performance, you may want to explicitly place the RocksDB database
|
||||
on a RAM disk.
|
||||
|
||||
### Parameters
|
||||
|
||||
#### `cache_directory`
|
||||
|
||||
Specifies the directory under which the filter instance specific RocksDB
|
||||
databases will be placed. Note that at startup, each RocksDB database will
|
||||
be deleted and recreated. That is, cache content will not be retained across
|
||||
MaxScale restarts.
|
||||
|
||||
```
|
||||
storage_options=cache_directory=/mnt/maxscale-cache
|
||||
```
|
||||
|
||||
With the above setting a directory `/mnt/macscale-cache/storage_rocksdb` will
|
||||
created, under which the actual instance specific cache directories are created.
|
||||
|
58
server/modules/filter/cache/cache.c
vendored
58
server/modules/filter/cache/cache.c
vendored
@ -12,6 +12,7 @@
|
||||
*/
|
||||
|
||||
#define MXS_MODULE_NAME "cache"
|
||||
#include "cache.h"
|
||||
#include <maxscale/alloc.h>
|
||||
#include <maxscale/filter.h>
|
||||
#include <maxscale/gwdirs.h>
|
||||
@ -20,7 +21,6 @@
|
||||
#include <maxscale/modutil.h>
|
||||
#include <maxscale/mysql_utils.h>
|
||||
#include <maxscale/query_classifier.h>
|
||||
#include "cache.h"
|
||||
#include "rules.h"
|
||||
#include "storage.h"
|
||||
|
||||
@ -95,9 +95,11 @@ typedef struct cache_config
|
||||
{
|
||||
uint32_t max_resultset_rows;
|
||||
uint32_t max_resultset_size;
|
||||
const char* rules;
|
||||
const char *rules;
|
||||
const char *storage;
|
||||
const char *storage_options;
|
||||
char *storage_options;
|
||||
char **storage_argv;
|
||||
int storage_argc;
|
||||
uint32_t ttl;
|
||||
uint32_t debug;
|
||||
} CACHE_CONFIG;
|
||||
@ -109,6 +111,8 @@ static const CACHE_CONFIG DEFAULT_CONFIG =
|
||||
NULL,
|
||||
NULL,
|
||||
NULL,
|
||||
NULL,
|
||||
0,
|
||||
CACHE_DEFAULT_TTL,
|
||||
CACHE_DEFAULT_DEBUG
|
||||
};
|
||||
@ -214,7 +218,11 @@ static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER
|
||||
|
||||
if (module)
|
||||
{
|
||||
CACHE_STORAGE *storage = module->api->createInstance(name, config.ttl, 0, NULL);
|
||||
uint32_t ttl = config.ttl;
|
||||
int argc = config.storage_argc;
|
||||
char** argv = config.storage_argv;
|
||||
|
||||
CACHE_STORAGE *storage = module->api->createInstance(name, ttl, argc, argv);
|
||||
|
||||
if (storage)
|
||||
{
|
||||
@ -976,7 +984,47 @@ static bool process_params(char **options, FILTER_PARAMETER **params, CACHE_CONF
|
||||
}
|
||||
else if (strcmp(param->name, "storage_options") == 0)
|
||||
{
|
||||
config->storage_options = param->value;
|
||||
config->storage_options = MXS_STRDUP(param->value);
|
||||
|
||||
if (config->storage_options)
|
||||
{
|
||||
int argc = 1;
|
||||
char *arg = config->storage_options;
|
||||
|
||||
while ((arg = strchr(config->storage_options, ',')))
|
||||
{
|
||||
++argc;
|
||||
}
|
||||
|
||||
config->storage_argv = (char**) MXS_MALLOC((argc + 1) * sizeof(char*));
|
||||
|
||||
if (config->storage_argv)
|
||||
{
|
||||
config->storage_argc = argc;
|
||||
|
||||
int i = 0;
|
||||
arg = config->storage_options;
|
||||
config->storage_argv[i++] = arg;
|
||||
|
||||
while ((arg = strchr(config->storage_options, ',')))
|
||||
{
|
||||
*arg = 0;
|
||||
++arg;
|
||||
config->storage_argv[i++] = arg;
|
||||
}
|
||||
|
||||
config->storage_argv[i] = NULL;
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_FREE(config->storage_options);
|
||||
config->storage_options = NULL;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
error = true;
|
||||
}
|
||||
}
|
||||
else if (strcmp(param->name, "storage") == 0)
|
||||
{
|
||||
|
1
server/modules/filter/cache/cache.h
vendored
1
server/modules/filter/cache/cache.h
vendored
@ -14,6 +14,7 @@
|
||||
* Public License.
|
||||
*/
|
||||
|
||||
#include <maxscale/cdefs.h>
|
||||
#include <limits.h>
|
||||
|
||||
MXS_BEGIN_DECLS
|
||||
|
@ -15,6 +15,7 @@
|
||||
#include <openssl/sha.h>
|
||||
#include <sys/stat.h>
|
||||
#include <sys/types.h>
|
||||
#include <fts.h>
|
||||
#include <algorithm>
|
||||
#include <set>
|
||||
#include <rocksdb/env.h>
|
||||
@ -37,8 +38,6 @@ using std::unique_ptr;
|
||||
namespace
|
||||
{
|
||||
|
||||
string u_storageDirectory;
|
||||
|
||||
const size_t ROCKSDB_KEY_LENGTH = 2 * SHA512_DIGEST_LENGTH;
|
||||
|
||||
#if ROCKSDB_KEY_LENGTH > CACHE_KEY_MAXLEN
|
||||
@ -85,8 +84,116 @@ string toString(const StorageRocksDBVersion& version)
|
||||
|
||||
const char STORAGE_ROCKSDB_VERSION_KEY[] = "MaxScale_Storage_RocksDB_Version";
|
||||
|
||||
/**
|
||||
* Deletes a path, irrespective of whether it represents a file, a directory
|
||||
* or a directory hierarchy. If the path does not exist, then the path is
|
||||
* considered to have been removed.
|
||||
*
|
||||
* @param path A path (file or directory).
|
||||
*
|
||||
* @return True if the path could be deleted, false otherwise.
|
||||
*/
|
||||
bool deletePath(const string& path)
|
||||
{
|
||||
int rv = false;
|
||||
|
||||
struct stat st;
|
||||
|
||||
if (stat(path.c_str(), &st) == -1)
|
||||
{
|
||||
if (errno == ENOENT)
|
||||
{
|
||||
rv = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
char errbuf[MXS_STRERROR_BUFLEN];
|
||||
MXS_ERROR("Could not stat: %s", strerror_r(errno, errbuf, sizeof(errbuf)));
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_NOTICE("Deleting cache storage at '%s'.", path.c_str());
|
||||
|
||||
rv = true;
|
||||
|
||||
char* files[] = { (char *) path.c_str(), NULL };
|
||||
|
||||
// FTS_NOCHDIR - Do not change CWD while traversing.
|
||||
// FTS_PHYSICAL - Don't follow symlinks.
|
||||
// FTS_XDEV - Don't cross filesystem boundaries
|
||||
FTS *pFts = fts_open(files, FTS_NOCHDIR | FTS_PHYSICAL | FTS_XDEV, NULL);
|
||||
|
||||
if (pFts) {
|
||||
FTSENT* pCurrent;
|
||||
while ((pCurrent = fts_read(pFts)))
|
||||
{
|
||||
switch (pCurrent->fts_info)
|
||||
{
|
||||
case FTS_NS:
|
||||
case FTS_DNR:
|
||||
case FTS_ERR:
|
||||
{
|
||||
char errbuf[MXS_STRERROR_BUFLEN];
|
||||
MXS_ERROR("Error while traversing %s: %s",
|
||||
pCurrent->fts_accpath,
|
||||
strerror_r(pCurrent->fts_errno, errbuf, sizeof(errbuf)));
|
||||
rv = false;
|
||||
}
|
||||
break;
|
||||
|
||||
case FTS_DC:
|
||||
case FTS_DOT:
|
||||
case FTS_NSOK:
|
||||
// Not reached unless FTS_LOGICAL, FTS_SEEDOT, or FTS_NOSTAT were
|
||||
// passed to fts_open()
|
||||
break;
|
||||
|
||||
case FTS_D:
|
||||
// Do nothing. Need depth-first search, so directories are deleted
|
||||
// in FTS_DP
|
||||
break;
|
||||
|
||||
case FTS_DP:
|
||||
case FTS_F:
|
||||
case FTS_SL:
|
||||
case FTS_SLNONE:
|
||||
case FTS_DEFAULT:
|
||||
if (remove(pCurrent->fts_accpath) < 0)
|
||||
{
|
||||
char errbuf[MXS_STRERROR_BUFLEN];
|
||||
MXS_ERROR("Could not remove '%s', the cache directory may need to "
|
||||
"be deleted manually: %s",
|
||||
pCurrent->fts_accpath,
|
||||
strerror_r(errno, errbuf, sizeof(errbuf)));
|
||||
rv = false;
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
ss_dassert(!true);
|
||||
}
|
||||
}
|
||||
|
||||
if (rv)
|
||||
{
|
||||
MXS_NOTICE("Deleted cache storage at '%s'.", path.c_str());
|
||||
}
|
||||
|
||||
if (pFts) {
|
||||
fts_close(pFts);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return rv;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
//private
|
||||
rocksdb::WriteOptions RocksDBStorage::s_writeOptions;
|
||||
|
||||
//private
|
||||
RocksDBStorage::RocksDBStorage(unique_ptr<rocksdb::DBWithTTL>& sDb,
|
||||
const string& name,
|
||||
@ -106,69 +213,98 @@ RocksDBStorage::~RocksDBStorage()
|
||||
//static
|
||||
bool RocksDBStorage::Initialize()
|
||||
{
|
||||
bool initialized = true;
|
||||
auto pEnv = rocksdb::Env::Default();
|
||||
pEnv->SetBackgroundThreads(ROCKSDB_N_LOW_THREADS, rocksdb::Env::LOW);
|
||||
pEnv->SetBackgroundThreads(ROCKSDB_N_HIGH_THREADS, rocksdb::Env::HIGH);
|
||||
|
||||
u_storageDirectory = get_cachedir();
|
||||
u_storageDirectory += "/storage_rocksdb";
|
||||
// No logging; the database will always be deleted at startup, so there's
|
||||
// no reason for usinf space and processing for writing the write ahead log.
|
||||
s_writeOptions.disableWAL = true;
|
||||
|
||||
if (mkdir(u_storageDirectory.c_str(), S_IRWXU) == 0)
|
||||
{
|
||||
MXS_NOTICE("Created storage directory %s.", u_storageDirectory.c_str());
|
||||
}
|
||||
else if (errno != EEXIST)
|
||||
{
|
||||
initialized = false;
|
||||
char errbuf[MXS_STRERROR_BUFLEN];
|
||||
|
||||
MXS_ERROR("Failed to create storage directory %s: %s",
|
||||
u_storageDirectory.c_str(),
|
||||
strerror_r(errno, errbuf, sizeof(errbuf)));
|
||||
}
|
||||
else
|
||||
{
|
||||
auto pEnv = rocksdb::Env::Default();
|
||||
pEnv->SetBackgroundThreads(ROCKSDB_N_LOW_THREADS, rocksdb::Env::LOW);
|
||||
pEnv->SetBackgroundThreads(ROCKSDB_N_HIGH_THREADS, rocksdb::Env::HIGH);
|
||||
}
|
||||
|
||||
return initialized;
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
//static
|
||||
RocksDBStorage* RocksDBStorage::Create(const char* zName, uint32_t ttl, int argc, char* argv[])
|
||||
{
|
||||
ss_dassert(zName);
|
||||
|
||||
string path(u_storageDirectory);
|
||||
string storageDirectory = get_cachedir();
|
||||
|
||||
path += "/";
|
||||
path += zName;
|
||||
|
||||
rocksdb::Options options;
|
||||
options.env = rocksdb::Env::Default();
|
||||
options.max_background_compactions = ROCKSDB_N_LOW_THREADS;
|
||||
options.max_background_flushes = ROCKSDB_N_HIGH_THREADS;
|
||||
|
||||
rocksdb::DBWithTTL* pDb;
|
||||
rocksdb::Status status;
|
||||
rocksdb::Slice key(STORAGE_ROCKSDB_VERSION_KEY);
|
||||
|
||||
do
|
||||
for (int i = 0; i < argc; ++i)
|
||||
{
|
||||
// Try to open existing.
|
||||
options.create_if_missing = false;
|
||||
options.error_if_exists = false;
|
||||
size_t len = strlen(argv[i]);
|
||||
char arg[len + 1];
|
||||
strcpy(arg, argv[i]);
|
||||
|
||||
status = rocksdb::DBWithTTL::Open(options, path, &pDb, ttl);
|
||||
const char* zValue = NULL;
|
||||
char *zEq = strchr(arg, '=');
|
||||
|
||||
if (status.IsInvalidArgument()) // Did not exist
|
||||
if (zEq)
|
||||
{
|
||||
MXS_NOTICE("Database \"%s\" does not exist, creating.", path.c_str());
|
||||
*zEq = 0;
|
||||
zValue = trim(zEq + 1);
|
||||
}
|
||||
|
||||
const char* zKey = trim(arg);
|
||||
|
||||
if (strcmp(zKey, "cache_directory") == 0)
|
||||
{
|
||||
if (zValue)
|
||||
{
|
||||
storageDirectory = zValue;
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_WARNING("No value specified for '%s', using default '%s' instead.",
|
||||
zKey, get_cachedir());
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_WARNING("Unknown argument '%s'.", zKey);
|
||||
}
|
||||
}
|
||||
|
||||
storageDirectory += "/storage_rocksdb";
|
||||
|
||||
return Create(storageDirectory, zName, ttl);
|
||||
}
|
||||
|
||||
// static
|
||||
RocksDBStorage* RocksDBStorage::Create(const string& storageDirectory, const char* zName, uint32_t ttl)
|
||||
{
|
||||
RocksDBStorage* pStorage = nullptr;
|
||||
|
||||
if (mkdir(storageDirectory.c_str(), S_IRWXU) == 0)
|
||||
{
|
||||
MXS_NOTICE("Created storage directory %s.", storageDirectory.c_str());
|
||||
}
|
||||
else if (errno != EEXIST)
|
||||
{
|
||||
char errbuf[MXS_STRERROR_BUFLEN];
|
||||
MXS_ERROR("Failed to create storage directory %s: %s",
|
||||
storageDirectory.c_str(),
|
||||
strerror_r(errno, errbuf, sizeof(errbuf)));
|
||||
}
|
||||
else
|
||||
{
|
||||
string path(storageDirectory + "/" + zName);
|
||||
|
||||
if (deletePath(path))
|
||||
{
|
||||
rocksdb::Options options;
|
||||
options.env = rocksdb::Env::Default();
|
||||
options.max_background_compactions = ROCKSDB_N_LOW_THREADS;
|
||||
options.max_background_flushes = ROCKSDB_N_HIGH_THREADS;
|
||||
|
||||
options.create_if_missing = true;
|
||||
options.error_if_exists = true;
|
||||
|
||||
rocksdb::DBWithTTL* pDb;
|
||||
rocksdb::Status status;
|
||||
rocksdb::Slice key(STORAGE_ROCKSDB_VERSION_KEY);
|
||||
|
||||
status = rocksdb::DBWithTTL::Open(options, path, &pDb, ttl);
|
||||
|
||||
if (status.ok())
|
||||
@ -179,7 +315,7 @@ RocksDBStorage* RocksDBStorage::Create(const char* zName, uint32_t ttl, int argc
|
||||
rocksdb::Slice value(reinterpret_cast<const char*>(&STORAGE_ROCKSDB_VERSION),
|
||||
sizeof(STORAGE_ROCKSDB_VERSION));
|
||||
|
||||
status = pDb->Put(rocksdb::WriteOptions(), key, value);
|
||||
status = pDb->Put(writeOptions(), key, value);
|
||||
|
||||
if (!status.ok())
|
||||
{
|
||||
@ -188,36 +324,6 @@ RocksDBStorage* RocksDBStorage::Create(const char* zName, uint32_t ttl, int argc
|
||||
path.c_str(),
|
||||
status.ToString().c_str());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
while (status.IsInvalidArgument());
|
||||
|
||||
RocksDBStorage* pStorage = nullptr;
|
||||
|
||||
if (status.ok())
|
||||
{
|
||||
std::string value;
|
||||
|
||||
status = pDb->Get(rocksdb::ReadOptions(), key, &value);
|
||||
|
||||
if (status.ok())
|
||||
{
|
||||
const StorageRocksDBVersion* pVersion =
|
||||
reinterpret_cast<const StorageRocksDBVersion*>(value.data());
|
||||
|
||||
// When the version is bumped, it needs to be decided what if any
|
||||
// backward compatibility is provided. After all, it's a cache, so
|
||||
// you should be able to delete it at any point and pay a small
|
||||
// price while the cache is rebuilt.
|
||||
if ((pVersion->major == STORAGE_ROCKSDB_MAJOR) &&
|
||||
(pVersion->minor == STORAGE_ROCKSDB_MINOR) &&
|
||||
(pVersion->correction == STORAGE_ROCKSDB_CORRECTION))
|
||||
{
|
||||
MXS_NOTICE("Version of \"%s\" is %s, version of storage_rocksdb is %s.",
|
||||
path.c_str(),
|
||||
toString(*pVersion).c_str(),
|
||||
toString(STORAGE_ROCKSDB_VERSION).c_str());
|
||||
|
||||
unique_ptr<rocksdb::DBWithTTL> sDb(pDb);
|
||||
|
||||
@ -225,31 +331,14 @@ RocksDBStorage* RocksDBStorage::Create(const char* zName, uint32_t ttl, int argc
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("Version of RocksDB database \"%s\" is %s, while version required "
|
||||
"is %s. You need to delete the database and restart.",
|
||||
path.c_str(),
|
||||
toString(*pVersion).c_str(),
|
||||
toString(STORAGE_ROCKSDB_VERSION).c_str());
|
||||
delete pDb;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("Could not read version information from RocksDB database %s. "
|
||||
"You may need to delete the database and retry. RocksDB error: \"%s\"",
|
||||
path.c_str(),
|
||||
status.ToString().c_str());
|
||||
delete pDb;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("Could not open/initialize RocksDB database %s. RocksDB error: \"%s\"",
|
||||
path.c_str(), status.ToString().c_str());
|
||||
MXS_ERROR("Could not create RocksDB database %s. RocksDB error: \"%s\"",
|
||||
path.c_str(), status.ToString().c_str());
|
||||
|
||||
if (status.IsIOError())
|
||||
{
|
||||
MXS_ERROR("Is an other MaxScale process running?");
|
||||
if (status.IsIOError())
|
||||
{
|
||||
MXS_ERROR("Is an other MaxScale process running?");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -374,7 +463,7 @@ cache_result_t RocksDBStorage::putValue(const char* pKey, const GWBUF* pValue)
|
||||
rocksdb::Slice key(pKey, ROCKSDB_KEY_LENGTH);
|
||||
rocksdb::Slice value(static_cast<const char*>(GWBUF_DATA(pValue)), GWBUF_LENGTH(pValue));
|
||||
|
||||
rocksdb::Status status = m_sDb->Put(rocksdb::WriteOptions(), key, value);
|
||||
rocksdb::Status status = m_sDb->Put(writeOptions(), key, value);
|
||||
|
||||
return status.ok() ? CACHE_RESULT_OK : CACHE_RESULT_ERROR;
|
||||
}
|
||||
|
@ -42,11 +42,22 @@ private:
|
||||
RocksDBStorage(const RocksDBStorage&) = delete;
|
||||
RocksDBStorage& operator = (const RocksDBStorage&) = delete;
|
||||
|
||||
static RocksDBStorage* Create(const std::string& storageDirectory,
|
||||
const char* zName,
|
||||
uint32_t ttl);
|
||||
|
||||
static const rocksdb::WriteOptions& writeOptions()
|
||||
{
|
||||
return s_writeOptions;
|
||||
}
|
||||
|
||||
private:
|
||||
std::unique_ptr<rocksdb::DBWithTTL> m_sDb;
|
||||
std::string m_name;
|
||||
std::string m_path;
|
||||
uint32_t m_ttl;
|
||||
|
||||
static rocksdb::WriteOptions s_writeOptions;
|
||||
};
|
||||
|
||||
#endif
|
||||
|
Loading…
x
Reference in New Issue
Block a user