Cache: Add Tester class

A Tester class that simplifies the creation of various cache tests.
It provides the basic mechanisms for reading statements from MySQL/
MariaDB test files, for converting statements into equivalent
cache items (key + statement) and for running various tasks in
separate threads for a specific amount of time.
This commit is contained in:
Johan Wikman
2016-12-15 16:59:32 +02:00
parent 2751640a02
commit 179762ff0a
2 changed files with 496 additions and 0 deletions

View File

@ -0,0 +1,275 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "tester.hh"
#include <algorithm>
#include <iostream>
#include <set>
#include "storage.hh"
// TODO: Move this to a common place.
#include "../../../../../query_classifier/test/testreader.hh"
using maxscale::TestReader;
using namespace std;
//
// class Tester::Thread
//
class Tester::Thread
{
public:
Thread(Tester::Task* pTask)
: m_pTask(pTask)
, m_thread(0)
{
ss_dassert(pTask);
}
~Thread()
{
ss_dassert(m_thread == 0);
}
static Thread from_task(Tester::Task* pTask)
{
return Thread(pTask);
}
Tester::Task* task() { return m_pTask; }
void start()
{
ss_dassert(m_thread == 0);
if (pthread_create(&m_thread, NULL, &Thread::thread_main, this) != 0)
{
cerr << "FATAL: Could not launch thread." << endl;
exit(EXIT_FAILURE);
}
}
void wait()
{
ss_dassert(m_thread != 0);
pthread_join(m_thread, NULL);
m_thread = 0;
}
static void start_thread(Thread& thread)
{
thread.start();
}
static void wait_for_thread(Thread& thread)
{
thread.wait();
}
void run()
{
m_pTask->out() << "Thread started.\n" << flush;
m_pTask->set_rv(m_pTask->run());
}
static void run(Thread* pThread)
{
pThread->run();
}
static void* thread_main(void* pData)
{
run(static_cast<Thread*>(pData));
return 0;
}
private:
Tester::Task* m_pTask;
pthread_t m_thread;
};
//
// Tester::Task
//
Tester::Task::Task(std::ostream* pOut)
: m_out(*pOut)
, m_terminate(false)
, m_rv(0)
{
}
Tester::Task::~Task()
{
}
//
// Tester
//
Tester::Tester(ostream* pOut)
: m_out(*pOut)
{
}
Tester::~Tester()
{
}
// static
GWBUF* Tester::gwbuf_from_string(const std::string& s)
{
size_t len = s.length();
size_t payload_len = len + 1;
size_t gwbuf_len = MYSQL_HEADER_LEN + payload_len;
GWBUF* pBuf = gwbuf_alloc(gwbuf_len);
*((unsigned char*)((char*)GWBUF_DATA(pBuf))) = payload_len;
*((unsigned char*)((char*)GWBUF_DATA(pBuf) + 1)) = (payload_len >> 8);
*((unsigned char*)((char*)GWBUF_DATA(pBuf) + 2)) = (payload_len >> 16);
*((unsigned char*)((char*)GWBUF_DATA(pBuf) + 3)) = 0x00;
*((unsigned char*)((char*)GWBUF_DATA(pBuf) + 4)) = 0x03;
memcpy((char*)GWBUF_DATA(pBuf) + 5, s.c_str(), len);
return pBuf;
}
// static
bool Tester::get_statements(std::istream& in, size_t n_statements, Statements* pStatements)
{
TestReader::result_t result = TestReader::RESULT_ERROR;
typedef set<string> StatementsSet;
StatementsSet statements;
TestReader reader(in);
size_t n = 0;
string statement;
while ((n < n_statements) &&
((result = reader.get_statement(statement)) == TestReader::RESULT_STMT))
{
if (statements.find(statement) == statements.end())
{
// Not seen before
statements.insert(statement);
pStatements->push_back(statement);
++n;
}
}
return result != TestReader::RESULT_ERROR;
}
// static
bool Tester::get_cache_items(const Statements& statements, const Storage& storage, CacheItems* pItems)
{
bool success = true;
Statements::const_iterator i = statements.begin();
while (success && (i != statements.end()))
{
GWBUF* pQuery = gwbuf_from_string(*i);
if (pQuery)
{
CACHE_KEY key;
cache_result_t result = storage.get_key(NULL, pQuery, &key);
if (result == CACHE_RESULT_OK)
{
pItems->push_back(std::make_pair(key, pQuery));
}
else
{
ss_dassert(!true);
success = false;
}
}
else
{
ss_dassert(!true);
success = false;
}
++i;
}
return success;
}
//static
bool Tester::get_cache_items(std::istream& in,
size_t n_items,
const Storage& storage,
CacheItems* pItems)
{
Statements statements;
bool rv = get_statements(in, n_items, &statements);
if (rv)
{
rv = get_cache_items(statements, storage, pItems);
}
return rv;
}
//static
void Tester::clear_cache_items(CacheItems& cache_items)
{
for (CacheItems::iterator i = cache_items.begin(); i != cache_items.end(); ++i)
{
gwbuf_free(i->second);
}
cache_items.clear();
}
// static
int Tester::run(ostream& out, size_t n_seconds, const vector<Task*>& tasks)
{
vector<Thread> threads;
transform(tasks.begin(), tasks.end(), back_inserter(threads), &Thread::from_task);
out << "Starting " << tasks.size() << " threads, running for " << n_seconds << " seconds." << endl;
for_each(threads.begin(), threads.end(), &Thread::start_thread);
sleep(n_seconds);
for_each(tasks.begin(), tasks.end(), &Task::terminate_task);
for_each(threads.begin(), threads.end(), &Thread::wait_for_thread);
out << "Threads terminated." << endl;
int rv;
if (find_if(tasks.begin(), tasks.end(), &Task::failed) == tasks.end())
{
rv = EXIT_SUCCESS;
}
else
{
rv = EXIT_FAILURE;
}
return rv;
}

View File

@ -0,0 +1,221 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <maxscale/cppdefs.hh>
#include <maxscale/buffer.h>
#include <ostream>
#include <string>
#include <vector>
#include <pthread.h>
#include "cache_storage_api.hh"
class Storage;
class Tester
{
public:
class Task;
typedef std::vector<std::string> Statements;
typedef std::vector<std::pair<CACHE_KEY, GWBUF*> > CacheItems;
typedef std::vector<Task*> Tasks;
class Thread;
class Task
{
public:
virtual ~Task();
/**
* Called from a thread. Concrete implementation in a derived class
* is expected to run continuously until @should_terminate returns
* true. The return value will be stored in the @m_rv member variable.
*
* @return EXIT_SUCCESS or EXIT_FAILURE.
*/
virtual int run() = 0;
/**
* Whether the task should terminate. To be called by @run in derived
* concrete Task class.
*
* @return True, if the task should terminate, i.e., return from @c run.
*/
bool should_terminate() const { return m_terminate; }
/**
* Tell the task to terminate.
*/
void terminate() { m_terminate = true; }
/**
* Calls terminate on the provided task. For use in algorithms.
*
* @param pTask The task to terminate.
*/
static void terminate_task(Task* pTask)
{
pTask->terminate();
}
/**
* Deletes the provided task. For use in algorithms.
*
* @param pTask The task to delete.
*/
static void free(Task* pTask)
{
delete pTask;
}
/**
* Predicate for finding out whether a task failed. To be called only
* after the task has terminated. For use in algorithms.
*
* @param pTask The task to query.
*
* @return True, if the task failed.
*/
static bool failed(const Task* pTask)
{
return pTask->rv() == EXIT_FAILURE;
}
/**
* What the @run function returned. Meaningful only after the task
* has terminated.
*
* @return The value returned by @run.
*/
int rv() const { return m_rv; }
/**
* The stream to be used for user output.
*
* @return The output stream to be used.
*/
std::ostream& out() const { return m_out; }
protected:
/**
* Constructor
*
* @param pOut Pointer to the stream to use for user output. Note that
* the pointer must remain valid for the lifetime of the Task.
*/
Task(std::ostream* pOut);
private:
friend class Thread;
void set_rv(int rv) { m_rv = rv; }
private:
Task(const Task&);
Task& operator = (const Task&);
private:
std::ostream& m_out;
bool m_terminate;
int m_rv;
};
virtual ~Tester();
/**
* Convert a string to a COM_QUERY GWBUF.
*
* @param s The string to be converted.
*
* @return A GWBUF or NULL if memory allocation failed.
*/
static GWBUF* gwbuf_from_string(const std::string& s);
/**
* Returns statements from a MySQL/MariaDB server test file.
*
* @param in The stream from which input should be read. Assumed to refer to a
* MySQL/MariaDB test file.
* @param n_statements How many statements to return.
* @param pStatements Pointer to vector where statements will be back inserted.
* May contain less statements that specified in @n_statements.
*
* @return Whether reading was successful, not whether @n_statements statements were returned.
*/
static bool get_statements(std::istream& in, size_t n_statements, Statements* pStatements);
/**
* Converts a set of statements into cache items (i.e. key + statement).
*
* @param statements A number of statements.
* @param storage The storage using which the cache keys should be generated.
* @param pItems Pointer to vector where the items will be back inserted.
*
* @return Whether the conversion was successful.
*/
static bool get_cache_items(const Statements& statements,
const Storage& storage,
CacheItems* pItems);
/**
* Converts statements from a stream into cache items (i.e. key + GWBUF).
*
* @param statements A number of statements.
* @param n_items How many items should be returned.
* @param storage The storage using which the cache keys should be generated.
* @param pItems Pointer to vector where the items will be back inserted.
*
* @return Whether reading and conversion was successful, not whether @n_items
* items were returned.
*/
static bool get_cache_items(std::istream& in,
size_t n_items,
const Storage& storage,
CacheItems* pItems);
/**
* Deletes the GWBUFs, and empties the vector.
*
* @param cache_items The vector to be cleared.
*/
static void clear_cache_items(CacheItems& cache_items);
protected:
/**
* Constructor
*
* @param pOut Pointer to stream to be used for (user) output. Must remain
* valid for the lifetime of the Tester instance.
*/
Tester(std::ostream* pOut);
/**
* The stream to be used for (user) output.
*
* @return A stream.
*/
std::ostream& out() const { return m_out; }
/**
* Run a specific number of tasks in as many threads.
*
* @param out The stream to be used for (user) output.
* @param n_seconds How many seconds the tasks should run.
* @param tasks Vector of tasks.
*
* @return EXIT_SUCCESS if each task returned EXIT_SUCCESS, otherwise EXIT_FAILURE.
*/
static int run(std::ostream& out, size_t n_seconds, const Tasks& tasks);
private:
std::ostream& m_out;
};