From f71be685b6a1e0c6ec3e9164d8e4032fbaf64f34 Mon Sep 17 00:00:00 2001 From: Johan Wikman Date: Mon, 12 Dec 2016 14:22:41 +0200 Subject: [PATCH] Cache: Test raw storage Here we create a number of threads and then randomly start getting putting and deleting values. The intent is to test that the locking behaviour of the storage modules is correctly implemented. --- .../modules/filter/cache/test/CMakeLists.txt | 10 + .../filter/cache/test/testrawstorage.cc | 372 ++++++++++++++++++ 2 files changed, 382 insertions(+) create mode 100644 server/modules/filter/cache/test/testrawstorage.cc diff --git a/server/modules/filter/cache/test/CMakeLists.txt b/server/modules/filter/cache/test/CMakeLists.txt index ae3285471..a7bf67d9a 100644 --- a/server/modules/filter/cache/test/CMakeLists.txt +++ b/server/modules/filter/cache/test/CMakeLists.txt @@ -9,6 +9,16 @@ add_executable(testkeygeneration ) target_link_libraries(testkeygeneration maxscale-common cache) +add_executable(testrawstorage + testrawstorage.cc + ../../../../../query_classifier/test/testreader.cc + ) +target_link_libraries(testrawstorage maxscale-common cache) + add_test(TestCache_rules testrules) + add_test(TestCache_keygeneration testkeygeneration storage_inmemory ${CMAKE_CURRENT_SOURCE_DIR}/input.test) add_test(TestCache_keygeneration testkeygeneration storage_rocksdb ${CMAKE_CURRENT_SOURCE_DIR}/input.test) + +add_test(TestCache_storage_inmemory testrawstorage 10 storage_inmemory ${CMAKE_CURRENT_SOURCE_DIR}/input.test) +add_test(TestCache_storage_rocksdb testrawstorage 10 storage_rocksdb ${CMAKE_CURRENT_SOURCE_DIR}/input.test) diff --git a/server/modules/filter/cache/test/testrawstorage.cc b/server/modules/filter/cache/test/testrawstorage.cc new file mode 100644 index 000000000..9c22393e2 --- /dev/null +++ b/server/modules/filter/cache/test/testrawstorage.cc @@ -0,0 +1,372 @@ +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl. + * + * Change Date: 2019-07-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "storagefactory.hh" +#include "storage.hh" +#include "cache_storage_api.hh" +// TODO: Move this to a common place. +#include "../../../../../query_classifier/test/testreader.hh" + + +using namespace std; +using namespace std::tr1; +using maxscale::TestReader; + +namespace +{ + +void print_usage(const char* zProgram) +{ + cout << "usage: " << zProgram << " time storage-module text-file\n" + << "\n" + << "where:\n" + << " time is the number of seconds we should run,\n" + << " storage-module is the name of a storage module,\n" + << " test-file is the name of a text file." << endl; +} + +GWBUF* create_gwbuf(const string& s) +{ + size_t len = s.length(); + size_t payload_len = len + 1; + size_t gwbuf_len = MYSQL_HEADER_LEN + payload_len; + + GWBUF* pBuf = gwbuf_alloc(gwbuf_len); + + *((unsigned char*)((char*)GWBUF_DATA(pBuf))) = payload_len; + *((unsigned char*)((char*)GWBUF_DATA(pBuf) + 1)) = (payload_len >> 8); + *((unsigned char*)((char*)GWBUF_DATA(pBuf) + 2)) = (payload_len >> 16); + *((unsigned char*)((char*)GWBUF_DATA(pBuf) + 3)) = 0x00; + *((unsigned char*)((char*)GWBUF_DATA(pBuf) + 4)) = 0x03; + memcpy((char*)GWBUF_DATA(pBuf) + 5, s.c_str(), len); + + return pBuf; +} + +typedef unordered_map StatementsByKey; +typedef vector > Statements; + +struct ThreadData +{ + ThreadData() + : pStorage(0) + , pStatements(0) + , thread(0) + , terminate(false) + , rv(EXIT_SUCCESS) + {} + + Storage* pStorage; + const Statements* pStatements; + pthread_t thread; + bool terminate; + int rv; +}; + +void* thread_main(void* pData) +{ + cout << "Thread starting.\n" << flush; + ThreadData* pThreadData = static_cast(pData); + + Storage& storage = *pThreadData->pStorage; + const Statements& statements = *pThreadData->pStatements; + bool& terminate = pThreadData->terminate; + + size_t n = statements.size(); + + enum action_t + { + PUT, + GET, + DEL + }; + + action_t action = PUT; + + size_t n_puts = 0; + size_t n_gets = 0; + size_t n_dels = 0; + size_t n_misses = 0; + + while (!terminate) + { + size_t i = n * ((double) random() / RAND_MAX); + ss_dassert(i < n); + + const Statements::value_type& statement = statements[i]; + + switch (action) + { + case PUT: + { + cache_result_t result = storage.put_value(statement.first, statement.second); + ss_dassert(result == CACHE_RESULT_OK); + action = GET; + ++n_puts; + } + break; + + case GET: + { + GWBUF* pQuery; + cache_result_t result = storage.get_value(statement.first, 0, &pQuery); + + if (result == CACHE_RESULT_OK) + { + ss_dassert(GWBUF_LENGTH(pQuery) == GWBUF_LENGTH(statement.second)); + ss_dassert(memcmp(GWBUF_DATA(pQuery), GWBUF_DATA(statement.second), + GWBUF_LENGTH(pQuery)) == 0); + + gwbuf_free(pQuery); + ++n_gets; + } + else + { + ss_dassert(result == CACHE_RESULT_NOT_FOUND); + ++n_misses; + } + action = DEL; + } + break; + + case DEL: + { + cache_result_t result = storage.del_value(statement.first); + + if (result == CACHE_RESULT_OK) + { + ++n_dels; + } + else if (result == CACHE_RESULT_NOT_FOUND) + { + ++n_misses; + } + else + { + ss_dassert(!true); + } + action = PUT; + } + break; + + default: + ss_dassert(!true); + } + } + + pThreadData->rv = EXIT_SUCCESS; + + stringstream ss; + ss << "Thread ending: " << n_gets << ", " << n_puts << ", " << n_dels << ", " << n_misses << "\n"; + cout << ss.str() << flush; + return 0; +} + +int test(size_t n_threads, size_t seconds, Storage& storage, const Statements& statements) +{ + int rv = EXIT_SUCCESS; + + ThreadData threadDatas[n_threads]; + + for (size_t i = 0; i < n_threads; ++i) + { + ThreadData* pThreadData = &threadDatas[i]; + + pThreadData->pStorage = &storage; + pThreadData->pStatements = &statements; + + if (pthread_create(&pThreadData->thread, NULL, thread_main, pThreadData) != 0) + { + // This is impossible, so we just return. + return EXIT_FAILURE; + } + } + + stringstream ss; + ss << "Main thread started " << n_threads << " threads.\n"; + + cout << ss.str() << flush; + + sleep(seconds); + + cout << "Woke up, now waiting for workers to terminate\n." << flush; + + for (size_t i = 0; i < n_threads; ++i) + { + threadDatas[i].terminate = true; + pthread_join(threadDatas[i].thread, NULL); + + if (rv == EXIT_SUCCESS) + { + rv = threadDatas[i].rv; + } + } + + cout << "Waited for workers.\n" << flush; + + return rv; +} + +int test(size_t n_threads, size_t seconds, Storage& storage, istream& in) +{ + int rv = EXIT_SUCCESS; + + StatementsByKey statementsByKey; + + TestReader reader(in); + + // Adjust the number of items according to number of threads and duration + // of test-run to ensure that there are collisions. + size_t n_max_items = n_threads * seconds * 50; + size_t n_items = 0; + + string line; + while ((rv == EXIT_SUCCESS) && + (n_items < n_max_items) && + (reader.get_statement(line) == TestReader::RESULT_STMT)) + { + GWBUF* pStmt = create_gwbuf(line); + + CACHE_KEY key; + cache_result_t result = storage.get_key(NULL, pStmt, &key); + + if (result == CACHE_RESULT_OK) + { + StatementsByKey::iterator i = statementsByKey.find(key); + + if (i == statementsByKey.end()) + { + ++n_items; + statementsByKey.insert(make_pair(key, pStmt)); + } + else + { + // Duplicate + gwbuf_free(pStmt); + } + } + else + { + cerr << "error: Could not generate a key for '" << line << "'." << endl; + rv = EXIT_FAILURE; + } + } + + Statements statements; + + copy(statementsByKey.begin(), statementsByKey.end(), back_inserter(statements)); + + if (rv == EXIT_SUCCESS) + { + rv = test(n_threads, seconds, storage, statements); + } + + return rv; +} + +int test(size_t n_threads, size_t seconds, StorageFactory& factory, istream& in) +{ + int rv = EXIT_FAILURE; + + Storage* pStorage = factory.createRawStorage(CACHE_THREAD_MODEL_MT, + "unspecified", + 0, + 0, + 0, + 0, NULL); + + if (pStorage) + { + rv = test(n_threads, seconds, *pStorage, in); + + delete pStorage; + } + + return rv; +} + +} + +int main(int argc, char* argv[]) +{ + int rv = EXIT_FAILURE; + + if ((argc == 3) || (argc == 4)) + { + if (mxs_log_init(NULL, ".", MXS_LOG_TARGET_DEFAULT)) + { + size_t seconds = atoi(argv[1]); + + if (qc_init(NULL, NULL)) + { + const char* zModule = argv[2]; + + StorageFactory* pFactory = StorageFactory::Open(zModule); + + if (pFactory) + { + size_t n_threads = get_processor_count() + 1; + + if (argc == 3) + { + rv = test(n_threads, seconds, *pFactory, cin); + } + else + { + fstream in(argv[3]); + + if (in) + { + rv = test(n_threads, seconds, *pFactory, in); + } + else + { + cerr << "error: Could not open " << argv[3] << "." << endl; + } + } + + delete pFactory; + } + else + { + cerr << "error: Could not initialize factory " << zModule << "." << endl; + } + } + else + { + cerr << "error: Could not initialize query classifier." << endl; + } + + mxs_log_finish(); + } + else + { + cerr << "error: Could not initialize log." << endl; + } + } + else + { + print_usage(argv[0]); + } + + return rv; +}