storage_rocksdb: Store version to database

The storage_rocksdb version is now stored to the database. That will
ensure that should the format (key content, length, etc.) change, we
can will detect whether a database is too old and take action.
This commit is contained in:
Johan Wikman
2016-09-29 14:06:23 +03:00
parent b9c956dda9
commit 087338910e
3 changed files with 125 additions and 9 deletions

View File

@ -233,7 +233,7 @@ static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER
}
else
{
MXS_ERROR("Could not create storage instance for %s.", name);
MXS_ERROR("Could not create storage instance for '%s'.", name);
cache_rules_free(rules);
cache_storage_close(module);
MXS_FREE(cinstance);
@ -242,7 +242,7 @@ static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER
}
else
{
MXS_ERROR("Could not load cache storage module %s.", name);
MXS_ERROR("Could not load cache storage module '%s'.", name);
cache_rules_free(rules);
MXS_FREE(cinstance);
cinstance = NULL;

View File

@ -202,6 +202,7 @@ void cache_rules_free(CACHE_RULES *rules)
if (rules)
{
cache_rule_free(rules->store_rules);
cache_rule_free(rules->use_rules);
MXS_FREE(rules);
}
}

View File

@ -50,6 +50,41 @@ const size_t ROCKSDB_KEY_LENGTH = 2 * SHA512_DIGEST_LENGTH;
const size_t ROCKSDB_N_LOW_THREADS = 2;
const size_t ROCKSDB_N_HIGH_THREADS = 1;
struct StorageRocksDBVersion
{
uint8_t major;
uint8_t minor;
uint8_t correction;
};
const uint8_t STORAGE_ROCKSDB_MAJOR = 0;
const uint8_t STORAGE_ROCKSDB_MINOR = 1;
const uint8_t STORAGE_ROCKSDB_CORRECTION = 0;
const StorageRocksDBVersion STORAGE_ROCKSDB_VERSION =
{
STORAGE_ROCKSDB_MAJOR,
STORAGE_ROCKSDB_MINOR,
STORAGE_ROCKSDB_CORRECTION
};
string toString(const StorageRocksDBVersion& version)
{
string rv;
rv += "{ ";
rv += std::to_string(version.major);
rv += ", ";
rv += std::to_string(version.minor);
rv += ", ";
rv += std::to_string(version.correction);
rv += " }";
return rv;
}
const char STORAGE_ROCKSDB_VERSION_KEY[] = "MaxScale_Storage_RocksDB_Version";
}
//private
@ -114,23 +149,103 @@ RocksDBStorage* RocksDBStorage::Create(const char* zName, uint32_t ttl, int argc
options.env = rocksdb::Env::Default();
options.max_background_compactions = ROCKSDB_N_LOW_THREADS;
options.max_background_flushes = ROCKSDB_N_HIGH_THREADS;
options.create_if_missing = true;
rocksdb::DBWithTTL* pDb;
rocksdb::Status status = rocksdb::DBWithTTL::Open(options, path, &pDb, ttl);
rocksdb::DBWithTTL* pDb;
rocksdb::Status status;
rocksdb::Slice key(STORAGE_ROCKSDB_VERSION_KEY);
do
{
// Try to open existing.
options.create_if_missing = false;
options.error_if_exists = false;
status = rocksdb::DBWithTTL::Open(options, path, &pDb, ttl);
if (status.IsInvalidArgument()) // Did not exist
{
MXS_NOTICE("Database \"%s\" does not exist, creating.", path.c_str());
options.create_if_missing = true;
options.error_if_exists = true;
status = rocksdb::DBWithTTL::Open(options, path, &pDb, ttl);
if (status.ok())
{
MXS_NOTICE("Database \"%s\" created, storing version %s into it.",
path.c_str(), toString(STORAGE_ROCKSDB_VERSION).c_str());
rocksdb::Slice value(reinterpret_cast<const char*>(&STORAGE_ROCKSDB_VERSION),
sizeof(STORAGE_ROCKSDB_VERSION));
status = pDb->Put(rocksdb::WriteOptions(), key, value);
if (!status.ok())
{
MXS_ERROR("Could not store version information to created RocksDB database \"%s\". "
"You may need to delete the database and retry. RocksDB error: %s",
path.c_str(),
status.ToString().c_str());
}
}
}
}
while (status.IsInvalidArgument());
RocksDBStorage* pStorage = nullptr;
if (status.ok())
{
unique_ptr<rocksdb::DBWithTTL> sDb(pDb);
std::string value;
pStorage = new RocksDBStorage(sDb, zName, path, ttl);
status = pDb->Get(rocksdb::ReadOptions(), key, &value);
if (status.ok())
{
const StorageRocksDBVersion* pVersion =
reinterpret_cast<const StorageRocksDBVersion*>(value.data());
// When the version is bumped, it needs to be decided what if any
// backward compatibility is provided. After all, it's a cache, so
// you should be able to delete it at any point and pay a small
// price while the cache is rebuilt.
if ((pVersion->major == STORAGE_ROCKSDB_MAJOR) &&
(pVersion->minor == STORAGE_ROCKSDB_MINOR) &&
(pVersion->correction == STORAGE_ROCKSDB_CORRECTION))
{
MXS_NOTICE("Version of \"%s\" is %s, version of storage_rocksdb is %s.",
path.c_str(),
toString(*pVersion).c_str(),
toString(STORAGE_ROCKSDB_VERSION).c_str());
unique_ptr<rocksdb::DBWithTTL> sDb(pDb);
pStorage = new RocksDBStorage(sDb, zName, path, ttl);
}
else
{
MXS_ERROR("Version of RocksDB database \"%s\" is %s, while version required "
"is %s. You need to delete the database and restart.",
path.c_str(),
toString(*pVersion).c_str(),
toString(STORAGE_ROCKSDB_VERSION).c_str());
delete pDb;
}
}
else
{
MXS_ERROR("Could not read version information from RocksDB database %s. "
"You may need to delete the database and retry. RocksDB error: %s",
path.c_str(),
status.ToString().c_str());
delete pDb;
}
}
else
{
MXS_ERROR("Could not open RocksDB database %s using path %s: %s",
zName, path.c_str(), status.ToString().c_str());
MXS_ERROR("Could not open/initialize RocksDB database %s. RocksDB error: %s",
path.c_str(), status.ToString().c_str());
}
return pStorage;