Merge branch 'develop' into pull-102

This commit is contained in:
Markus Makela
2016-09-15 10:26:53 +03:00
28 changed files with 807 additions and 129 deletions

View File

@ -2,3 +2,5 @@ add_library(cache SHARED cache.c storage.c)
target_link_libraries(cache maxscale-common)
set_target_properties(cache PROPERTIES VERSION "1.0.0")
install_module(cache experimental)
add_subdirectory(storage)

View File

@ -22,7 +22,17 @@
static char VERSION_STRING[] = "V1.0.0";
static const int DEFAULT_TTL = 10;
typedef enum cache_references
{
CACHE_REFERENCES_ANY,
CACHE_REFERENCES_QUALIFIED
} cache_references_t;
#define DEFAULT_ALLOWED_REFERENCES CACHE_REFERENCES_QUALIFIED
// Bytes
#define DEFAULT_MAX_RESULTSET_SIZE 64 * 1024
// Seconds
#define DEFAULT_TTL 10
static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER **);
static void *newSession(FILTER *instance, SESSION *session);
@ -88,16 +98,32 @@ FILTER_OBJECT *GetModuleObject()
// Implementation
//
typedef struct cache_config
{
cache_references_t allowed_references;
uint32_t max_resultset_size;
const char *storage;
const char *storage_args;
uint32_t ttl;
} CACHE_CONFIG;
typedef struct cache_instance
{
const char *name;
const char *storage_name;
const char *storage_args;
uint32_t ttl; // Time to live in seconds.
CACHE_STORAGE_MODULE *module;
CACHE_STORAGE *storage;
const char *name;
CACHE_CONFIG config;
CACHE_STORAGE_MODULE *module;
CACHE_STORAGE *storage;
} CACHE_INSTANCE;
static const CACHE_CONFIG DEFAULT_CONFIG =
{
DEFAULT_ALLOWED_REFERENCES,
DEFAULT_MAX_RESULTSET_SIZE,
NULL,
NULL,
DEFAULT_TTL
};
typedef struct cache_session_data
{
CACHE_STORAGE_API *api; /**< The storage API to be used. */
@ -131,9 +157,7 @@ static bool route_using_cache(CACHE_INSTANCE *instance,
*/
static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER **params)
{
const char *storage_name = NULL;
const char *storage_args = NULL;
uint32_t ttl = DEFAULT_TTL;
CACHE_CONFIG config = DEFAULT_CONFIG;
bool error = false;
@ -141,13 +165,44 @@ static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER
{
const FILTER_PARAMETER *param = params[i];
if (strcmp(param->name, "storage_name") == 0)
if (strcmp(param->name, "allowed_references") == 0)
{
storage_name = param->value;
if (strcmp(param->value, "qualified") == 0)
{
config.allowed_references = CACHE_REFERENCES_QUALIFIED;
}
else if (strcmp(param->value, "any") == 0)
{
config.allowed_references = CACHE_REFERENCES_ANY;
}
else
{
MXS_ERROR("Unknown value '%s' for parameter '%s'.", param->value, param->name);
error = true;
}
}
else if (strcmp(param->name, "max_resultset_size") == 0)
{
int v = atoi(param->value);
if (v > 0)
{
config.max_resultset_size = v;
}
else
{
MXS_ERROR("The value of the configuration entry '%s' must "
"be an integer larger than 0.", param->name);
error = true;
}
}
else if (strcmp(param->name, "storage_args") == 0)
{
storage_args = param->value;
config.storage_args = param->value;
}
else if (strcmp(param->name, "storage") == 0)
{
config.storage = param->value;
}
else if (strcmp(param->name, "ttl") == 0)
{
@ -155,12 +210,12 @@ static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER
if (v > 0)
{
ttl = v;
config.ttl = v;
}
else
{
MXS_ERROR("The value of the configuration entry 'ttl' must "
"be an integer larger than 0.");
MXS_ERROR("The value of the configuration entry '%s' must "
"be an integer larger than 0.", param->name);
error = true;
}
}
@ -177,22 +232,20 @@ static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER
{
if ((cinstance = MXS_CALLOC(1, sizeof(CACHE_INSTANCE))) != NULL)
{
CACHE_STORAGE_MODULE *module = cache_storage_open(storage_name);
CACHE_STORAGE_MODULE *module = cache_storage_open(config.storage);
if (module)
{
CACHE_STORAGE *storage = module->api->createInstance(name, ttl, 0, NULL);
CACHE_STORAGE *storage = module->api->createInstance(name, config.ttl, 0, NULL);
if (storage)
{
cinstance->name = name;
cinstance->storage_name = storage_name;
cinstance->storage_args = storage_args;
cinstance->ttl = ttl;
cinstance->config = config;
cinstance->module = module;
cinstance->storage = storage;
MXS_NOTICE("Cache storage %s opened and initialized.", storage_name);
MXS_NOTICE("Cache storage %s opened and initialized.", config.storage);
}
else
{

View File

@ -0,0 +1 @@
add_subdirectory(storage_rocksdb)

View File

@ -0,0 +1,49 @@
# Build RocksDB
if ((CMAKE_CXX_COMPILER_ID STREQUAL "GNU") AND (NOT (CMAKE_CXX_COMPILER_VERSION VERSION_LESS 4.7)))
message(STATUS "GCC >= 4.7, RocksDB is built.")
set(ROCKSDB_REPO "https://github.com/facebook/rocksdb.git"
CACHE STRING "RocksDB Git repository")
# Release 4.9 of RocksDB
set(ROCKSDB_TAG "v4.9"
CACHE STRING "RocksDB Git tag")
set(ROCKSDB_SUBPATH "/server/modules/filter/cache/storage/storage_rocksdb/RocksDB-prefix/src/RocksDB")
set(ROCKSDB_ROOT ${CMAKE_BINARY_DIR}${ROCKSDB_SUBPATH})
ExternalProject_Add(RocksDB
GIT_REPOSITORY ${ROCKSDB_REPO}
GIT_TAG ${ROCKSDB_TAG}
CONFIGURE_COMMAND ""
BINARY_DIR ${ROCKSDB_ROOT}
CONFIGURE_COMMAND ""
BUILD_COMMAND make DISABLE_JEMALLOC=1 EXTRA_CXXFLAGS=-fPIC static_lib
INSTALL_COMMAND "")
set(ROCKSDB_BUILT TRUE CACHE INTERNAL "")
set(ROCKSDB_INCLUDE_DIRS ${ROCKSDB_ROOT}/include ${ROCKSDB_ROOT})
set(ROCKSDB_LIB_DIR ${ROCKSDB_ROOT})
set(ROCKSDB_LIB librocksdb.a)
# RocksDB supports several compression libraries and automatically
# uses them if it finds the headers in the environment. Consequently,
# we must ensure that a user of RocksDB can link to the needed
# libraries.
#
# ROCKSDB_LINK_LIBS specifies that libraries a module using ROCKSDB_LIB
# must link with.
find_package(BZip2)
if (BZIP2_FOUND)
set(ROCKSDB_LINK_LIBS ${ROCKSDB_LINK_LIBS} ${BZIP2_LIBRARIES})
endif()
find_package(ZLIB)
if (ZLIB_FOUND)
set(ROCKSDB_LINK_LIBS ${ROCKSDB_LINK_LIBS} ${ZLIB_LIBRARIES})
endif()
else()
message(STATUS "RocksDB requires GCC >= 4.7, only ${CMAKE_CXX_COMPILER_VERSION} available.")
endif()

View File

@ -0,0 +1,26 @@
include(BuildRocksDB.cmake)
if (ROCKSDB_BUILT)
message(STATUS "RocksDB is built, storage_rocksdb will be built.")
set(CMAKE_CXX_FLAGS "-std=c++11 ${CMAKE_CXX_FLAGS} -DROCKSDB_PLATFORM_POSIX")
set(CMAKE_CXX_FLAGS_DEBUG "-std=c++11 ${CMAKE_CXX_FLAGS_DEBUG} -DROCKSDB_PLATFORM_POSIX")
set(CMAKE_CXX_FLAGS_RELEASE "-std=c++11 ${CMAKE_CXX_FLAGS_RELEASE} -DROCKSDB_PLATFORM_POSIX")
set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "-std=c++11 ${CMAKE_CXX_FLAGS_RELWITHDEBINFO} -DROCKSDB_PLATFORM_POSIX")
include_directories(${ROCKSDB_INCLUDE_DIRS})
link_directories(${ROCKSDB_LIB_DIR})
add_library(storage_rocksdb SHARED
rocksdbinternals.cc
rocksdbstorage.cc
storage_rocksdb.cc
)
add_dependencies(storage_rocksdb RocksDB)
target_link_libraries(storage_rocksdb maxscale-common ${ROCKSDB_LIB} ${ROCKSDB_LINK_LIBS})
set_target_properties(storage_rocksdb PROPERTIES VERSION "1.0.0")
set_target_properties(storage_rocksdb PROPERTIES LINK_FLAGS -Wl,-z,defs)
install_module(storage_rocksdb experimental)
else()
message("RocksDB not built, storage_rocksdb cannot be built.")
endif()

View File

@ -0,0 +1,46 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "rocksdbinternals.h"
#include <rocksdb/env.h>
#include <util/coding.h>
/**
* Check whether a value is stale or not.
*
* @param value A value with the timestamp at the end.
* @param ttl The time-to-live in seconds.
* @param pEnv The used RocksDB environment instance.
*
* @return True of the value is stale.
*
* Basically a copy from RocksDB/utilities/ttl/db_ttl_impl.cc:160
* but note that the here we claim the data is stale if we fail to
* get the time while the original code claims it is fresh.
*/
bool RocksDBInternals::IsStale(const rocksdb::Slice& value, int32_t ttl, rocksdb::Env* pEnv)
{
if (ttl <= 0)
{ // Data is fresh if TTL is non-positive
return false;
}
int64_t curtime;
if (!pEnv->GetCurrentTime(&curtime).ok())
{
return true; // Treat the data as stale if could not get current time
}
int32_t timestamp = rocksdb::DecodeFixed32(value.data() + value.size() - TS_LENGTH);
return (timestamp + ttl) < curtime;
}

View File

@ -0,0 +1,41 @@
#pragma once
#ifndef _ROCKSDBINTERNALS_H
#define _ROCKSDBINTERNALS_H
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "storage_rocksdb.h"
#include <rocksdb/env.h>
#include <rocksdb/version.h>
#include <rocksdb/slice.h>
#if (ROCKSDB_MAJOR != 4) || (ROCKSDB_MINOR != 9)
#error RocksDBStorage was created with knowledge of RocksDB 4.9 internals.\
The version used is something else. Ensure the knowledge is still applicable.
#endif
namespace RocksDBInternals
{
/**
* The length of the timestamp when stashed after the actual value.
*
* See RocksDB/utilities/ttl/db_ttl_impl.h
*/
static const uint32_t TS_LENGTH = sizeof(int32_t);
bool IsStale(const rocksdb::Slice& slice, int32_t ttl, rocksdb::Env* pEnv);
}
#endif

View File

@ -0,0 +1,201 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "rocksdbstorage.h"
#include <openssl/sha.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <rocksdb/env.h>
#include <gwdirs.h>
#include "rocksdbinternals.h"
using std::string;
using std::unique_ptr;
namespace
{
string u_storageDirectory;
const size_t ROCKSDB_KEY_LENGTH = SHA512_DIGEST_LENGTH;
// See https://github.com/facebook/rocksdb/wiki/Basic-Operations#thread-pools
// These figures should perhaps depend upon the number of cache instances.
const size_t ROCKSDB_N_LOW_THREADS = 2;
const size_t ROCKSDB_N_HIGH_THREADS = 1;
}
//private
RocksDBStorage::RocksDBStorage(unique_ptr<rocksdb::DBWithTTL>& sDb,
const string& name,
const string& path,
uint32_t ttl)
: m_sDb(std::move(sDb))
, m_name(name)
, m_path(path)
, m_ttl(ttl)
{
}
RocksDBStorage::~RocksDBStorage()
{
}
//static
bool RocksDBStorage::Initialize()
{
bool initialized = true;
u_storageDirectory = get_cachedir();
u_storageDirectory += "/storage_rocksdb";
if (mkdir(u_storageDirectory.c_str(), S_IRWXU) == 0)
{
MXS_NOTICE("Created storage directory %s.", u_storageDirectory.c_str());
}
else if (errno != EEXIST)
{
initialized = false;
char errbuf[STRERROR_BUFLEN];
MXS_ERROR("Failed to create storage directory %s: %s",
u_storageDirectory.c_str(),
strerror_r(errno, errbuf, sizeof(errbuf)));
}
else
{
auto pEnv = rocksdb::Env::Default();
pEnv->SetBackgroundThreads(ROCKSDB_N_LOW_THREADS, rocksdb::Env::LOW);
pEnv->SetBackgroundThreads(ROCKSDB_N_HIGH_THREADS, rocksdb::Env::HIGH);
}
return initialized;
}
//static
RocksDBStorage* RocksDBStorage::Create(const char* zName, uint32_t ttl, int argc, char* argv[])
{
ss_dassert(zName);
string path(u_storageDirectory);
path += "/";
path += zName;
rocksdb::Options options;
options.env = rocksdb::Env::Default();
options.max_background_compactions = ROCKSDB_N_LOW_THREADS;
options.max_background_flushes = ROCKSDB_N_HIGH_THREADS;
options.create_if_missing = true;
rocksdb::DBWithTTL* pDb;
rocksdb::Status status = rocksdb::DBWithTTL::Open(options, path, &pDb, ttl);
RocksDBStorage* pStorage = nullptr;
if (status.ok())
{
unique_ptr<rocksdb::DBWithTTL> sDb(pDb);
pStorage = new RocksDBStorage(sDb, zName, path, ttl);
}
else
{
MXS_ERROR("Could not open RocksDB database %s using path %s: %s",
zName, path.c_str(), status.ToString().c_str());
}
return pStorage;
}
cache_result_t RocksDBStorage::getKey(const GWBUF* pQuery, char* pKey)
{
// ss_dassert(gwbuf_is_contiguous(pQuery));
const uint8_t* pData = static_cast<const uint8_t*>(GWBUF_DATA(pQuery));
size_t len = MYSQL_GET_PACKET_LEN(pData) - 1; // Subtract 1 for packet type byte.
const uint8_t* pSql = &pData[5]; // Symbolic constant for 5?
memset(pKey, 0, CACHE_KEY_MAXLEN);
SHA512(pSql, len, reinterpret_cast<unsigned char*>(pKey));
return CACHE_RESULT_OK;
}
cache_result_t RocksDBStorage::getValue(const char* pKey, GWBUF** ppResult)
{
// Use the root DB so that we get the value *with* the timestamp at the end.
rocksdb::DB* pDb = m_sDb->GetRootDB();
rocksdb::Slice key(pKey, ROCKSDB_KEY_LENGTH);
string value;
rocksdb::Status status = pDb->Get(rocksdb::ReadOptions(), key, &value);
cache_result_t result = CACHE_RESULT_ERROR;
switch (status.code())
{
case rocksdb::Status::kOk:
if (value.length() >= RocksDBInternals::TS_LENGTH)
{
if (!RocksDBInternals::IsStale(value, m_ttl, rocksdb::Env::Default()))
{
size_t length = value.length() - RocksDBInternals::TS_LENGTH;
*ppResult = gwbuf_alloc(length);
if (*ppResult)
{
memcpy(GWBUF_DATA(*ppResult), value.data(), length);
result = CACHE_RESULT_OK;
}
}
else
{
MXS_NOTICE("Cache item is stale, not using.");
}
}
else
{
MXS_ERROR("RocksDB value too short. Database corrupted?");
result = CACHE_RESULT_ERROR;
}
break;
case rocksdb::Status::kNotFound:
result = CACHE_RESULT_NOT_FOUND;
break;
default:
MXS_ERROR("Failed to look up value: %s", status.ToString().c_str());
}
return result;
}
cache_result_t RocksDBStorage::putValue(const char* pKey, const GWBUF* pValue)
{
// ss_dassert(gwbuf_is_contiguous(pValue));
rocksdb::Slice key(pKey, ROCKSDB_KEY_LENGTH);
rocksdb::Slice value(static_cast<const char*>(GWBUF_DATA(pValue)), GWBUF_LENGTH(pValue));
rocksdb::Status status = m_sDb->Put(rocksdb::WriteOptions(), key, value);
return status.ok() ? CACHE_RESULT_OK : CACHE_RESULT_ERROR;
}

View File

@ -0,0 +1,50 @@
#ifndef _ROCKSDBSTORAGE_H
#define _ROCKSDBSTORAGE_H
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "storage_rocksdb.h"
#include <memory>
#include <string>
#include <rocksdb/utilities/db_ttl.h>
#include "../../cache_storage_api.h"
class RocksDBStorage
{
public:
static bool Initialize();
static RocksDBStorage* Create(const char* zName, uint32_t ttl, int argc, char* argv[]);
~RocksDBStorage();
cache_result_t getKey(const GWBUF* pQuery, char* pKey);
cache_result_t getValue(const char* pKey, GWBUF** ppResult);
cache_result_t putValue(const char* pKey, const GWBUF* pValue);
private:
RocksDBStorage(std::unique_ptr<rocksdb::DBWithTTL>& sDb,
const std::string& name,
const std::string& path,
uint32_t ttl);
RocksDBStorage(const RocksDBStorage&) = delete;
RocksDBStorage& operator = (const RocksDBStorage&) = delete;
private:
std::unique_ptr<rocksdb::DBWithTTL> m_sDb;
std::string m_name;
std::string m_path;
uint32_t m_ttl;
};
#endif

View File

@ -0,0 +1,151 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "storage_rocksdb.h"
#include "../../cache_storage_api.h"
#include "rocksdbstorage.h"
namespace
{
bool initialize()
{
return RocksDBStorage::Initialize();
}
CACHE_STORAGE* createInstance(const char* zName, uint32_t ttl, int argc, char* argv[])
{
CACHE_STORAGE* pStorage = 0;
try
{
pStorage = reinterpret_cast<CACHE_STORAGE*>(RocksDBStorage::Create(zName, ttl, argc, argv));
}
catch (const std::bad_alloc&)
{
MXS_OOM();
}
catch (const std::exception& x)
{
MXS_ERROR("Standard exception caught: %s", x.what());
}
catch (...)
{
MXS_ERROR("Unknown exception caught.");
}
return pStorage;
}
void freeInstance(CACHE_STORAGE* pInstance)
{
delete reinterpret_cast<RocksDBStorage*>(pInstance);
}
cache_result_t getKey(CACHE_STORAGE* pStorage,
const GWBUF* pQuery,
char* pKey)
{
cache_result_t result = CACHE_RESULT_ERROR;
try
{
result = reinterpret_cast<RocksDBStorage*>(pStorage)->getKey(pQuery, pKey);
}
catch (const std::bad_alloc&)
{
MXS_OOM();
}
catch (const std::exception& x)
{
MXS_ERROR("Standard exception caught: %s", x.what());
}
catch (...)
{
MXS_ERROR("Unknown exception caught.");
}
return result;
}
cache_result_t getValue(CACHE_STORAGE* pStorage, const char* pKey, GWBUF** ppResult)
{
cache_result_t result = CACHE_RESULT_ERROR;
try
{
result = reinterpret_cast<RocksDBStorage*>(pStorage)->getValue(pKey, ppResult);
}
catch (const std::bad_alloc&)
{
MXS_OOM();
}
catch (const std::exception& x)
{
MXS_ERROR("Standard exception caught: %s", x.what());
}
catch (...)
{
MXS_ERROR("Unknown exception caught.");
}
return result;
}
cache_result_t putValue(CACHE_STORAGE* pStorage,
const char* pKey,
const GWBUF* pValue)
{
cache_result_t result = CACHE_RESULT_ERROR;
try
{
result = reinterpret_cast<RocksDBStorage*>(pStorage)->putValue(pKey, pValue);
}
catch (const std::bad_alloc&)
{
MXS_OOM();
}
catch (const std::exception& x)
{
MXS_ERROR("Standard exception caught: %s", x.what());
}
catch (...)
{
MXS_ERROR("Unknown exception caught.");
}
return result;
}
}
extern "C"
{
CACHE_STORAGE_API* CacheGetStorageAPI()
{
static CACHE_STORAGE_API api =
{
initialize,
createInstance,
freeInstance,
getKey,
getValue,
putValue
};
return &api;
}
}

View File

@ -0,0 +1,19 @@
#ifndef _STORAGE_ROCKSDB_H
#define _STORAGE_ROCKSDB_H
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#define MXS_MODULE_NAME "storage_rocksdb"
#include <log_manager.h>
#endif