Cache: Handle TTL manually
The RocksDB TTL database only honours the TTL when the database is compacted. If the database is not compacted, stale values will be returned until the end of time. Here we utilize the knowledge that the TTL is stored after the actual value and use the root database for getting the value, thereby getting access to the timestamp. It's still worthwhile using the TTL database as that'll give us compaction and the removal of stale items.
This commit is contained in:
parent
a96d215aa0
commit
7fe981c22f
@ -12,6 +12,7 @@ if (ROCKSDB_BUILT)
|
||||
link_directories(${ROCKSDB_LIB_DIR})
|
||||
|
||||
add_library(storage_rocksdb SHARED
|
||||
rocksdbinternals.cc
|
||||
rocksdbstorage.cc
|
||||
storage_rocksdb.cc
|
||||
)
|
||||
|
51
server/modules/filter/cache/storage/storage_rocksdb/rocksdbinternals.cc
vendored
Normal file
51
server/modules/filter/cache/storage/storage_rocksdb/rocksdbinternals.cc
vendored
Normal file
@ -0,0 +1,51 @@
|
||||
/*
|
||||
* Copyright (c) 2016 MariaDB Corporation Ab
|
||||
*
|
||||
* Use of this software is governed by the Business Source License included
|
||||
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
|
||||
*
|
||||
* Change Date: 2019-07-01
|
||||
*
|
||||
* On the date above, in accordance with the Business Source License, use
|
||||
* of this software will be governed by version 2 or later of the General
|
||||
* Public License.
|
||||
*/
|
||||
|
||||
#include "rocksdbinternals.h"
|
||||
#include <rocksdb/env.h>
|
||||
#include <util/coding.h>
|
||||
|
||||
namespace RocksDBInternals
|
||||
{
|
||||
|
||||
/**
|
||||
* Check whether a value is stale or not.
|
||||
*
|
||||
* @param value A value with the timestamp at the end.
|
||||
* @param ttl The time-to-live in seconds.
|
||||
* @param pEnv The used RocksDB environment instance.
|
||||
*
|
||||
* @return True of the value is stale.
|
||||
*
|
||||
* Basically a copy from RocksDB/utilities/ttl/db_ttl_impl.cc:160
|
||||
* but note that the here we claim the data is stale if we fail to
|
||||
* get the time while the original code claims it is fresh.
|
||||
*/
|
||||
bool IsStale(const rocksdb::Slice& value, int32_t ttl, rocksdb::Env* pEnv)
|
||||
{
|
||||
if (ttl <= 0)
|
||||
{ // Data is fresh if TTL is non-positive
|
||||
return false;
|
||||
}
|
||||
|
||||
int64_t curtime;
|
||||
if (!pEnv->GetCurrentTime(&curtime).ok())
|
||||
{
|
||||
return true; // Treat the data as stale if could not get current time
|
||||
}
|
||||
|
||||
int32_t timestamp = rocksdb::DecodeFixed32(value.data() + value.size() - TS_LENGTH);
|
||||
return (timestamp + ttl) < curtime;
|
||||
}
|
||||
|
||||
}
|
41
server/modules/filter/cache/storage/storage_rocksdb/rocksdbinternals.h
vendored
Normal file
41
server/modules/filter/cache/storage/storage_rocksdb/rocksdbinternals.h
vendored
Normal file
@ -0,0 +1,41 @@
|
||||
#pragma once
|
||||
#ifndef _ROCKSDBINTERNALS_H
|
||||
#define _ROCKSDBINTERNALS_H
|
||||
/*
|
||||
* Copyright (c) 2016 MariaDB Corporation Ab
|
||||
*
|
||||
* Use of this software is governed by the Business Source License included
|
||||
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
|
||||
*
|
||||
* Change Date: 2019-07-01
|
||||
*
|
||||
* On the date above, in accordance with the Business Source License, use
|
||||
* of this software will be governed by version 2 or later of the General
|
||||
* Public License.
|
||||
*/
|
||||
|
||||
#include "storage_rocksdb.h"
|
||||
#include <rocksdb/env.h>
|
||||
#include <rocksdb/version.h>
|
||||
#include <rocksdb/slice.h>
|
||||
|
||||
#if (ROCKSDB_MAJOR != 4) || (ROCKSDB_MINOR != 9)
|
||||
#error RocksDBStorage was created with knowledge of RocksDB 4.9 internals.\
|
||||
The version used is something else. Ensure the knowledge is still applicable.
|
||||
#endif
|
||||
|
||||
namespace RocksDBInternals
|
||||
{
|
||||
|
||||
/**
|
||||
* The length of the timestamp when stashed after the actual value.
|
||||
*
|
||||
* See RocksDB/utilities/ttl/db_ttl_impl.h
|
||||
*/
|
||||
static const uint32_t TS_LENGTH = sizeof(int32_t);
|
||||
|
||||
bool IsStale(const rocksdb::Slice& slice, int32_t ttl, rocksdb::Env* pEnv);
|
||||
|
||||
}
|
||||
|
||||
#endif
|
@ -15,17 +15,26 @@
|
||||
#include <openssl/sha.h>
|
||||
#include <sys/stat.h>
|
||||
#include <sys/types.h>
|
||||
#include <rocksdb/env.h>
|
||||
#include <gwdirs.h>
|
||||
#include "rocksdbinternals.h"
|
||||
|
||||
using std::string;
|
||||
using std::unique_ptr;
|
||||
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
const size_t ROCKSDB_KEY_LENGTH = SHA512_DIGEST_LENGTH;
|
||||
string u_storageDirectory;
|
||||
|
||||
const size_t ROCKSDB_KEY_LENGTH = SHA512_DIGEST_LENGTH;
|
||||
|
||||
// See https://github.com/facebook/rocksdb/wiki/Basic-Operations#thread-pools
|
||||
// These figures should perhaps depend upon the number of cache instances.
|
||||
const size_t ROCKSDB_N_LOW_THREADS = 2;
|
||||
const size_t ROCKSDB_N_HIGH_THREADS = 1;
|
||||
|
||||
}
|
||||
|
||||
//private
|
||||
@ -65,6 +74,12 @@ bool RocksDBStorage::Initialize()
|
||||
u_storageDirectory.c_str(),
|
||||
strerror_r(errno, errbuf, sizeof(errbuf)));
|
||||
}
|
||||
else
|
||||
{
|
||||
auto pEnv = rocksdb::Env::Default();
|
||||
pEnv->SetBackgroundThreads(ROCKSDB_N_LOW_THREADS, rocksdb::Env::LOW);
|
||||
pEnv->SetBackgroundThreads(ROCKSDB_N_HIGH_THREADS, rocksdb::Env::HIGH);
|
||||
}
|
||||
|
||||
return initialized;
|
||||
}
|
||||
@ -81,6 +96,9 @@ RocksDBStorage* RocksDBStorage::Create(const char* zName, uint32_t ttl, int argc
|
||||
path += zName;
|
||||
|
||||
rocksdb::Options options;
|
||||
options.env = rocksdb::Env::Default();
|
||||
options.max_background_compactions = ROCKSDB_N_LOW_THREADS;
|
||||
options.max_background_flushes = ROCKSDB_N_HIGH_THREADS;
|
||||
options.create_if_missing = true;
|
||||
rocksdb::DBWithTTL* pDb;
|
||||
|
||||
@ -120,25 +138,42 @@ cache_result_t RocksDBStorage::getKey(const GWBUF* pQuery, char* pKey)
|
||||
|
||||
cache_result_t RocksDBStorage::getValue(const char* pKey, GWBUF** ppResult)
|
||||
{
|
||||
// Use the root DB so that we get the value *with* the timestamp at the end.
|
||||
rocksdb::DB* pDb = m_sDb->GetRootDB();
|
||||
rocksdb::Slice key(pKey, ROCKSDB_KEY_LENGTH);
|
||||
string value;
|
||||
|
||||
rocksdb::Status status = m_sDb->Get(rocksdb::ReadOptions(), key, &value);
|
||||
rocksdb::Status status = pDb->Get(rocksdb::ReadOptions(), key, &value);
|
||||
|
||||
cache_result_t result = CACHE_RESULT_ERROR;
|
||||
|
||||
switch (status.code())
|
||||
{
|
||||
case rocksdb::Status::kOk:
|
||||
if (value.length() >= RocksDBInternals::TS_LENGTH)
|
||||
{
|
||||
*ppResult = gwbuf_alloc(value.length());
|
||||
|
||||
if (*ppResult)
|
||||
if (!RocksDBInternals::IsStale(value, m_ttl, rocksdb::Env::Default()))
|
||||
{
|
||||
memcpy(GWBUF_DATA(*ppResult), value.data(), value.length());
|
||||
size_t length = value.length() - RocksDBInternals::TS_LENGTH;
|
||||
|
||||
result = CACHE_RESULT_OK;
|
||||
*ppResult = gwbuf_alloc(length);
|
||||
|
||||
if (*ppResult)
|
||||
{
|
||||
memcpy(GWBUF_DATA(*ppResult), value.data(), length);
|
||||
|
||||
result = CACHE_RESULT_OK;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_NOTICE("Cache item is stale, not using.");
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("RocksDB value too short. Database corrupted?");
|
||||
result = CACHE_RESULT_ERROR;
|
||||
}
|
||||
break;
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user