Expand testrawstorage

Now also rudimentary tests the LRU mechanism, which at the same
time makes the name a misnomer. These will be split into separate
programs to allow tests to be run individually.
This commit is contained in:
Johan Wikman
2016-12-14 22:31:16 +02:00
parent 8019bf71e0
commit f59c6d67c3

View File

@ -14,6 +14,7 @@
#include <maxscale/cppdefs.hh> #include <maxscale/cppdefs.hh>
#include <iostream> #include <iostream>
#include <fstream> #include <fstream>
#include <set>
#include <sstream> #include <sstream>
#include <vector> #include <vector>
#include <pthread.h> #include <pthread.h>
@ -46,7 +47,7 @@ void print_usage(const char* zProgram)
<< " test-file is the name of a text file." << endl; << " test-file is the name of a text file." << endl;
} }
GWBUF* create_gwbuf(const string& s) GWBUF* gwbuf_from_string(const string& s)
{ {
size_t len = s.length(); size_t len = s.length();
size_t payload_len = len + 1; size_t payload_len = len + 1;
@ -64,63 +65,73 @@ GWBUF* create_gwbuf(const string& s)
return pBuf; return pBuf;
} }
typedef vector<pair<CACHE_KEY, GWBUF*> > Statements; typedef vector<string> Statements;
typedef vector<pair<CACHE_KEY, GWBUF*> > CacheItems;
bool get_statements(istream& in, Storage& storage, size_t n_statements, Statements* pStatements) size_t get_num_statements(size_t n_threads, size_t n_seconds)
{
return n_threads * n_seconds * 10;
}
size_t get_statements(istream& in, size_t n_statements, Statements* pStatements)
{ {
bool success = true; bool success = true;
typedef unordered_map<CACHE_KEY, GWBUF*> StatementsByKey; typedef std::set<string> StatementsSet;
StatementsByKey statements_by_key; StatementsSet statements;
TestReader reader(in); TestReader reader(in);
size_t n = 0; size_t n = 0;
string line; string statement;
while (success && while (success &&
(n < n_statements) && (n < n_statements) &&
(reader.get_statement(line) == TestReader::RESULT_STMT)) (reader.get_statement(statement) == TestReader::RESULT_STMT))
{ {
GWBUF* pStmt = create_gwbuf(line); if (statements.find(statement) == statements.end())
CACHE_KEY key;
cache_result_t result = storage.get_key(NULL, pStmt, &key);
if (result == CACHE_RESULT_OK)
{ {
StatementsByKey::iterator i = statements_by_key.find(key); // Not seen before
statements.insert(statement);
if (i == statements_by_key.end()) pStatements->push_back(statement);
++n;
}
}
return n;
}
bool get_cache_items(const Statements& statements, const Storage& storage, CacheItems* pItems)
{
bool success = true;
Statements::const_iterator i = statements.begin();
while (success && (i != statements.end()))
{
GWBUF* pQuery = gwbuf_from_string(*i);
if (pQuery)
{
CACHE_KEY key;
cache_result_t result = storage.get_key(NULL, pQuery, &key);
if (result == CACHE_RESULT_OK)
{ {
++n; pItems->push_back(std::make_pair(key, pQuery));
statements_by_key.insert(make_pair(key, pStmt));
} }
else else
{ {
// Duplicate ss_dassert(!true);
gwbuf_free(pStmt); success = false;
} }
} }
else else
{ {
cerr << "error: Could not generate a key for '" << line << "'." << endl; ss_dassert(!true);
success = false; success = false;
} }
}
if (success) ++i;
{
copy(statements_by_key.begin(), statements_by_key.end(), back_inserter(*pStatements));
}
else
{
StatementsByKey::iterator i = statements_by_key.begin();
while (i != statements_by_key.end())
{
delete i->second;
++i;
}
} }
return success; return success;
@ -133,9 +144,24 @@ enum storage_action_t
STORAGE_DEL STORAGE_DEL
}; };
inline storage_action_t& operator++ (storage_action_t& action) storage_action_t get_action()
{ {
action = static_cast<storage_action_t>((action + 1) % 3); storage_action_t action;
long l = random();
if (l < RAND_MAX / 3)
{
action = STORAGE_PUT;
}
else if (l < 2 * (RAND_MAX / 3))
{
action = STORAGE_GET;
}
else
{
action = STORAGE_DEL;
}
return action; return action;
} }
@ -143,19 +169,17 @@ struct ThreadData
{ {
ThreadData() ThreadData()
: pStorage(0) : pStorage(0)
, pStatements(0) , pCache_items(0)
, thread(0) , thread(0)
, terminate(false) , terminate(false)
, rv(EXIT_SUCCESS) , rv(EXIT_SUCCESS)
, start_action(STORAGE_PUT)
{} {}
Storage* pStorage; Storage* pStorage;
const Statements* pStatements; const CacheItems* pCache_items;
pthread_t thread; pthread_t thread;
bool terminate; bool terminate;
int rv; int rv;
storage_action_t start_action;
}; };
/** /**
@ -169,18 +193,16 @@ struct ThreadData
*/ */
void* test_thread_hitting_thread(void* pData) void* test_thread_hitting_thread(void* pData)
{ {
int rv = EXIT_SUCCESS;
cout << "Thread starting.\n" << flush; cout << "Thread starting.\n" << flush;
ThreadData* pThreadData = static_cast<ThreadData*>(pData); ThreadData* pThread_data = static_cast<ThreadData*>(pData);
Storage& storage = *pThreadData->pStorage; Storage& storage = *pThread_data->pStorage;
const Statements& statements = *pThreadData->pStatements; const CacheItems& cache_items = *pThread_data->pCache_items;
bool& terminate = pThreadData->terminate;
size_t n = statements.size(); size_t n = cache_items.size();
ss_dassert(n > 0); ss_dassert(n > 0);
storage_action_t action = pThreadData->start_action;
size_t n_puts = 0; size_t n_puts = 0;
size_t n_gets = 0; size_t n_gets = 0;
size_t n_dels = 0; size_t n_dels = 0;
@ -188,50 +210,63 @@ void* test_thread_hitting_thread(void* pData)
size_t i = 0; size_t i = 0;
while (!terminate) while (!pThread_data->terminate)
{ {
if (i >= n) if (i >= n)
{ {
i = 0; i = 0;
} }
const Statements::value_type& statement = statements[i]; const CacheItems::value_type& cache_item = cache_items[i];
storage_action_t action = get_action();
switch (action) switch (action)
{ {
case STORAGE_PUT: case STORAGE_PUT:
{ {
cache_result_t result = storage.put_value(statement.first, statement.second); cache_result_t result = storage.put_value(cache_item.first, cache_item.second);
ss_dassert(result == CACHE_RESULT_OK); if (result == CACHE_RESULT_OK)
++n_puts; {
++n_puts;
}
else
{
ss_dassert(!true);
rv = EXIT_FAILURE;
}
} }
break; break;
case STORAGE_GET: case STORAGE_GET:
{ {
GWBUF* pQuery; GWBUF* pQuery;
cache_result_t result = storage.get_value(statement.first, 0, &pQuery); cache_result_t result = storage.get_value(cache_item.first, 0, &pQuery);
if (result == CACHE_RESULT_OK) if (result == CACHE_RESULT_OK)
{ {
ss_dassert(GWBUF_LENGTH(pQuery) == GWBUF_LENGTH(statement.second)); ss_dassert(GWBUF_LENGTH(pQuery) == GWBUF_LENGTH(cache_item.second));
ss_dassert(memcmp(GWBUF_DATA(pQuery), GWBUF_DATA(statement.second), ss_dassert(memcmp(GWBUF_DATA(pQuery), GWBUF_DATA(cache_item.second),
GWBUF_LENGTH(pQuery)) == 0); GWBUF_LENGTH(pQuery)) == 0);
gwbuf_free(pQuery); gwbuf_free(pQuery);
++n_gets; ++n_gets;
} }
else if (result == CACHE_RESULT_NOT_FOUND)
{
++n_misses;
}
else else
{ {
ss_dassert(result == CACHE_RESULT_NOT_FOUND); ss_dassert(!true);
++n_misses; rv = EXIT_FAILURE;
} }
} }
break; break;
case STORAGE_DEL: case STORAGE_DEL:
{ {
cache_result_t result = storage.del_value(statement.first); cache_result_t result = storage.del_value(cache_item.first);
if (result == CACHE_RESULT_OK) if (result == CACHE_RESULT_OK)
{ {
@ -244,6 +279,7 @@ void* test_thread_hitting_thread(void* pData)
else else
{ {
ss_dassert(!true); ss_dassert(!true);
rv = EXIT_FAILURE;
} }
} }
break; break;
@ -252,10 +288,10 @@ void* test_thread_hitting_thread(void* pData)
ss_dassert(!true); ss_dassert(!true);
} }
++action; ++i;
} }
pThreadData->rv = EXIT_SUCCESS; pThread_data->rv = rv;
stringstream ss; stringstream ss;
ss << "Thread ending: " << n_gets << ", " << n_puts << ", " << n_dels << ", " << n_misses << "\n"; ss << "Thread ending: " << n_gets << ", " << n_puts << ", " << n_dels << ", " << n_misses << "\n";
@ -275,33 +311,28 @@ void* test_thread_hitting_thread(void* pData)
* @param n_threads The number of threads that should be used. * @param n_threads The number of threads that should be used.
* @param n_seconds The number of seconds the test should run. * @param n_seconds The number of seconds the test should run.
* @param storage The storage instance to use. * @param storage The storage instance to use.
* @param statements The statements to be used. * @param cache_items The cache items to be used.
* *
* @return EXIT_SUCCESS if successful, otherwise EXIT_FAILURE. * @return EXIT_SUCCESS if successful, otherwise EXIT_FAILURE.
*/ */
int test_thread_hitting(size_t n_threads, size_t n_seconds, Storage& storage, const Statements& statements) int test_thread_hitting(size_t n_threads, size_t n_seconds, Storage& storage, const CacheItems& cache_items)
{ {
int rv = EXIT_SUCCESS; int rv = EXIT_SUCCESS;
ThreadData threadDatas[n_threads]; ThreadData thread_datas[n_threads];
storage_action_t start_action = STORAGE_PUT;
for (size_t i = 0; i < n_threads; ++i) for (size_t i = 0; i < n_threads; ++i)
{ {
ThreadData* pThreadData = &threadDatas[i]; ThreadData* pThread_data = &thread_datas[i];
pThreadData->pStorage = &storage; pThread_data->pStorage = &storage;
pThreadData->pStatements = &statements; pThread_data->pCache_items = &cache_items;
pThreadData->start_action = start_action;
if (pthread_create(&pThreadData->thread, NULL, test_thread_hitting_thread, pThreadData) != 0) if (pthread_create(&pThread_data->thread, NULL, test_thread_hitting_thread, pThread_data) != 0)
{ {
// This is impossible, so we just return. // This is impossible, so we just return.
return EXIT_FAILURE; return EXIT_FAILURE;
} }
++start_action;
} }
stringstream ss; stringstream ss;
@ -315,12 +346,12 @@ int test_thread_hitting(size_t n_threads, size_t n_seconds, Storage& storage, co
for (size_t i = 0; i < n_threads; ++i) for (size_t i = 0; i < n_threads; ++i)
{ {
threadDatas[i].terminate = true; thread_datas[i].terminate = true;
pthread_join(threadDatas[i].thread, NULL); pthread_join(thread_datas[i].thread, NULL);
if (rv == EXIT_SUCCESS) if (rv == EXIT_SUCCESS)
{ {
rv = threadDatas[i].rv; rv = thread_datas[i].rv;
} }
} }
@ -337,28 +368,29 @@ int test_thread_hitting(size_t n_threads, size_t n_seconds, Storage& storage, co
* @param n_threads The number of threads that should be used. * @param n_threads The number of threads that should be used.
* @param n_seconds The number of seconds the test should run. * @param n_seconds The number of seconds the test should run.
* @param storage The storage instance to use. * @param storage The storage instance to use.
* @param istream Stream, expected to refer to a MySQL/MariaDB test file. * @param statements The statements to be used.
* *
* @return EXIT_SUCCESS if successful, otherwise EXIT_FAILURE. * @return EXIT_SUCCESS if successful, otherwise EXIT_FAILURE.
*/ */
int test_thread_hitting(size_t n_threads, size_t n_seconds, Storage& storage, istream& in) int test_thread_hitting(size_t n_threads, size_t n_seconds, Storage& storage, const Statements& statements)
{ {
int rv = EXIT_FAILURE; int rv = EXIT_FAILURE;
// Adjust the number of items according to number of threads and duration CacheItems cache_items;
// of test-run, in the hope of ensuring collisions.
size_t n_statements = n_threads * n_seconds * 50;
Statements statements; if (get_cache_items(statements, storage, &cache_items))
if (get_statements(in, storage, n_statements, &statements))
{ {
rv = test_thread_hitting(n_threads, n_seconds, storage, statements); rv = test_thread_hitting(n_threads, n_seconds, storage, cache_items);
for (Statements::iterator i = statements.begin(); i < statements.end(); ++i) for (CacheItems::iterator i = cache_items.begin(); i < cache_items.end(); ++i)
{ {
gwbuf_free(i->second); gwbuf_free(i->second);
} }
} }
else
{
cerr << "Could not convert statements to cache items." << endl;
}
return rv; return rv;
} }
@ -371,24 +403,27 @@ int test_thread_hitting(size_t n_threads, size_t n_seconds, Storage& storage, is
* @param n_threads The number of threads that should be used. * @param n_threads The number of threads that should be used.
* @param n_seconds The number of seconds the test should run. * @param n_seconds The number of seconds the test should run.
* @param factory The storage factory using which to create the storage. * @param factory The storage factory using which to create the storage.
* @param istream Stream, expected to refer to a MySQL/MariaDB test file. * @param statements The statements that should be used.
* *
* @return EXIT_SUCCESS if successful, otherwise EXIT_FAILURE. * @return EXIT_SUCCESS if successful, otherwise EXIT_FAILURE.
*/ */
int test_raw_storage(size_t n_threads, size_t n_seconds, StorageFactory& factory, istream& in) int test_raw_storage(size_t n_threads,
size_t n_seconds,
StorageFactory& factory,
const Statements& statements)
{ {
int rv = EXIT_FAILURE; int rv = EXIT_FAILURE;
Storage* pStorage = factory.createRawStorage(CACHE_THREAD_MODEL_MT, Storage* pStorage = factory.createRawStorage(CACHE_THREAD_MODEL_MT,
"unspecified", "unspecified",
0, 0, // No TTL
0, 0, // No max count
0, 0, // No max size
0, NULL); 0, NULL);
if (pStorage) if (pStorage)
{ {
rv = test_thread_hitting(n_threads, n_seconds, *pStorage, in); rv = test_thread_hitting(n_threads, n_seconds, *pStorage, statements);
delete pStorage; delete pStorage;
} }
@ -396,6 +431,71 @@ int test_raw_storage(size_t n_threads, size_t n_seconds, StorageFactory& factory
return rv; return rv;
} }
int test_lru_storage(size_t n_threads,
size_t n_seconds,
StorageFactory& factory,
const Statements& statements)
{
int rv = EXIT_FAILURE;
const uint64_t max_count = get_num_statements(n_threads, n_seconds) / 10;
cout << "Statements: " << statements.size() << ", max_count: " << max_count << "." << endl;
Storage* pStorage = factory.createStorage(CACHE_THREAD_MODEL_MT,
"unspecified",
0, // No TTL
max_count,
0, // No max size
0, NULL);
if (pStorage)
{
rv = test_thread_hitting(n_threads, n_seconds, *pStorage, statements);
uint64_t items;
cache_result_t result = pStorage->get_items(&items);
ss_dassert(result == CACHE_RESULT_OK);
if (items != max_count)
{
cout << "Expected " << max_count << ", found " << items << "." << endl;
rv = EXIT_FAILURE;
}
delete pStorage;
}
return rv;
}
int test(size_t n_threads, size_t n_seconds, StorageFactory& factory, istream& in)
{
int rv = EXIT_FAILURE;
Statements statements;
size_t n_statements = get_num_statements(n_threads, n_seconds);
size_t n = get_statements(in, n_statements, &statements);
if (n != 0)
{
cout << "Requested " << n_statements << " statements, got " << n << "." << endl;
cout << "Testing raw storage." << endl;
int rv1 = test_raw_storage(n_threads, n_seconds, factory, statements);
cout << "Testing LRU storage." << endl;
int rv2 = test_lru_storage(n_threads, n_seconds, factory, statements);
rv = (rv1 == EXIT_FAILURE) || (rv2 == EXIT_FAILURE) ? EXIT_FAILURE : EXIT_SUCCESS;
}
else
{
cerr << "Could not read any statements." << endl;
}
return rv;
}
} }
int main(int argc, char* argv[]) int main(int argc, char* argv[])
@ -426,7 +526,7 @@ int main(int argc, char* argv[])
if (argc == 3) if (argc == 3)
{ {
rv = test_raw_storage(n_threads, n_seconds, *pFactory, cin); rv = test(n_threads, n_seconds, *pFactory, cin);
} }
else else
{ {
@ -434,7 +534,7 @@ int main(int argc, char* argv[])
if (in) if (in)
{ {
rv = test_raw_storage(n_threads, n_seconds, *pFactory, in); rv = test(n_threads, n_seconds, *pFactory, in);
} }
else else
{ {