diff --git a/server/modules/filter/cache/storage/storage_rocksdb/CMakeLists.txt b/server/modules/filter/cache/storage/storage_rocksdb/CMakeLists.txt index 87ce77c25..24eaf34dd 100644 --- a/server/modules/filter/cache/storage/storage_rocksdb/CMakeLists.txt +++ b/server/modules/filter/cache/storage/storage_rocksdb/CMakeLists.txt @@ -12,6 +12,7 @@ if (ROCKSDB_BUILT) link_directories(${ROCKSDB_LIB_DIR}) add_library(storage_rocksdb SHARED + rocksdbinternals.cc rocksdbstorage.cc storage_rocksdb.cc ) diff --git a/server/modules/filter/cache/storage/storage_rocksdb/rocksdbinternals.cc b/server/modules/filter/cache/storage/storage_rocksdb/rocksdbinternals.cc new file mode 100644 index 000000000..51bd052cb --- /dev/null +++ b/server/modules/filter/cache/storage/storage_rocksdb/rocksdbinternals.cc @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl. + * + * Change Date: 2019-07-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include "rocksdbinternals.h" +#include +#include + +namespace RocksDBInternals +{ + +/** + * Check whether a value is stale or not. + * + * @param value A value with the timestamp at the end. + * @param ttl The time-to-live in seconds. + * @param pEnv The used RocksDB environment instance. + * + * @return True if the value is stale. + * + * Basically a copy from RocksDB/utilities/ttl/db_ttl_impl.cc:160 + * but note that here we claim the data is stale if we fail to + * get the time while the original code claims it is fresh. 
+ */
+bool IsStale(const rocksdb::Slice& value, int32_t ttl, rocksdb::Env* pEnv)
+{
+    if (ttl <= 0)
+    {   // Data is fresh if TTL is non-positive
+        return false;
+    }
+
+    int64_t curtime;
+    if (!pEnv->GetCurrentTime(&curtime).ok())
+    {
+        return true; // Treat the data as stale if could not get current time
+    }
+
+    int32_t timestamp = rocksdb::DecodeFixed32(value.data() + value.size() - TS_LENGTH); // Assumes value.size() >= TS_LENGTH
+    return (static_cast<int64_t>(timestamp) + ttl) < curtime; // Widen before adding: an int32_t sum can overflow for a large ttl
+}
+
+}
diff --git a/server/modules/filter/cache/storage/storage_rocksdb/rocksdbinternals.h b/server/modules/filter/cache/storage/storage_rocksdb/rocksdbinternals.h
new file mode 100644
index 000000000..9435f30d0
--- /dev/null
+++ b/server/modules/filter/cache/storage/storage_rocksdb/rocksdbinternals.h
@@ -0,0 +1,41 @@
+#pragma once
+#ifndef _ROCKSDBINTERNALS_H
+#define _ROCKSDBINTERNALS_H
+/*
+ * Copyright (c) 2016 MariaDB Corporation Ab
+ *
+ * Use of this software is governed by the Business Source License included
+ * in the LICENSE.TXT file and at www.mariadb.com/bsl.
+ *
+ * Change Date: 2019-07-01
+ *
+ * On the date above, in accordance with the Business Source License, use
+ * of this software will be governed by version 2 or later of the General
+ * Public License.
+ */
+
+#include "storage_rocksdb.h"
+#include
+#include
+#include
+
+#if (ROCKSDB_MAJOR != 4) || (ROCKSDB_MINOR != 9)
+#error RocksDBStorage was created with knowledge of RocksDB 4.9 internals.\
+ The version used is something else. Ensure the knowledge is still applicable.
+#endif
+
+namespace RocksDBInternals
+{
+
+/**
+ * The length of the timestamp when stashed after the actual value.
+ * + * See RocksDB/utilities/ttl/db_ttl_impl.h + */ +static const uint32_t TS_LENGTH = sizeof(int32_t); + +bool IsStale(const rocksdb::Slice& slice, int32_t ttl, rocksdb::Env* pEnv); /* Staleness check on a value that carries its timestamp at the end; defined in rocksdbinternals.cc */ + +} + +#endif diff --git a/server/modules/filter/cache/storage/storage_rocksdb/rocksdbstorage.cc b/server/modules/filter/cache/storage/storage_rocksdb/rocksdbstorage.cc index 460f0f556..96e049f07 100644 --- a/server/modules/filter/cache/storage/storage_rocksdb/rocksdbstorage.cc +++ b/server/modules/filter/cache/storage/storage_rocksdb/rocksdbstorage.cc @@ -15,17 +15,26 @@ #include #include #include +#include #include +#include "rocksdbinternals.h" using std::string; using std::unique_ptr; + namespace { -const size_t ROCKSDB_KEY_LENGTH = SHA512_DIGEST_LENGTH; string u_storageDirectory; +const size_t ROCKSDB_KEY_LENGTH = SHA512_DIGEST_LENGTH; + +// See https://github.com/facebook/rocksdb/wiki/Basic-Operations#thread-pools +// These figures should perhaps depend upon the number of cache instances. +const size_t ROCKSDB_N_LOW_THREADS = 2; /* Passed to SetBackgroundThreads() for Env::LOW and assigned to options.max_background_compactions */ +const size_t ROCKSDB_N_HIGH_THREADS = 1; /* Passed to SetBackgroundThreads() for Env::HIGH and assigned to options.max_background_flushes */ + } //private @@ -65,6 +74,12 @@ bool RocksDBStorage::Initialize() u_storageDirectory.c_str(), strerror_r(errno, errbuf, sizeof(errbuf))); } + else + { + auto pEnv = rocksdb::Env::Default(); + pEnv->SetBackgroundThreads(ROCKSDB_N_LOW_THREADS, rocksdb::Env::LOW); + pEnv->SetBackgroundThreads(ROCKSDB_N_HIGH_THREADS, rocksdb::Env::HIGH); + } return initialized; } @@ -81,6 +96,9 @@ RocksDBStorage* RocksDBStorage::Create(const char* zName, uint32_t ttl, int argc path += zName; rocksdb::Options options; + options.env = rocksdb::Env::Default(); + options.max_background_compactions = ROCKSDB_N_LOW_THREADS; + options.max_background_flushes = ROCKSDB_N_HIGH_THREADS; options.create_if_missing = true; rocksdb::DBWithTTL* pDb; @@ -120,25 +138,42 @@ cache_result_t RocksDBStorage::getKey(const GWBUF* pQuery, char* pKey) cache_result_t RocksDBStorage::getValue(const char* pKey, GWBUF** ppResult) { + // Use the root DB so that 
we get the value *with* the timestamp at the end. + rocksdb::DB* pDb = m_sDb->GetRootDB(); rocksdb::Slice key(pKey, ROCKSDB_KEY_LENGTH); string value; - rocksdb::Status status = m_sDb->Get(rocksdb::ReadOptions(), key, &value); + rocksdb::Status status = pDb->Get(rocksdb::ReadOptions(), key, &value); cache_result_t result = CACHE_RESULT_ERROR; switch (status.code()) { case rocksdb::Status::kOk: + if (value.length() >= RocksDBInternals::TS_LENGTH) { - *ppResult = gwbuf_alloc(value.length()); - - if (*ppResult) + if (!RocksDBInternals::IsStale(value, m_ttl, rocksdb::Env::Default())) { - memcpy(GWBUF_DATA(*ppResult), value.data(), value.length()); + size_t length = value.length() - RocksDBInternals::TS_LENGTH; /* Payload length, i.e. the value without the trailing timestamp */ - result = CACHE_RESULT_OK; + *ppResult = gwbuf_alloc(length); + + if (*ppResult) + { + memcpy(GWBUF_DATA(*ppResult), value.data(), length); + + result = CACHE_RESULT_OK; + } } + else + { + MXS_NOTICE("Cache item is stale, not using."); /* NOTE(review): result is not assigned on this path, so within this hunk it keeps its CACHE_RESULT_ERROR initial value; confirm a stale item should be reported as an error */ + } + } + else + { + MXS_ERROR("RocksDB value too short. Database corrupted?"); + result = CACHE_RESULT_ERROR; } break;