diff --git a/server/modules/filter/cache/test/tester.cc b/server/modules/filter/cache/test/tester.cc new file mode 100644 index 000000000..cc93f2407 --- /dev/null +++ b/server/modules/filter/cache/test/tester.cc @@ -0,0 +1,275 @@ +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl. + * + * Change Date: 2019-07-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include "tester.hh" +#include +#include +#include +#include "storage.hh" +// TODO: Move this to a common place. +#include "../../../../../query_classifier/test/testreader.hh" + +using maxscale::TestReader; +using namespace std; + +// +// class Tester::Thread +// + +class Tester::Thread +{ +public: + Thread(Tester::Task* pTask) + : m_pTask(pTask) + , m_thread(0) + { + ss_dassert(pTask); + } + + ~Thread() + { + ss_dassert(m_thread == 0); + } + + static Thread from_task(Tester::Task* pTask) + { + return Thread(pTask); + } + + Tester::Task* task() { return m_pTask; } + + void start() + { + ss_dassert(m_thread == 0); + + if (pthread_create(&m_thread, NULL, &Thread::thread_main, this) != 0) + { + cerr << "FATAL: Could not launch thread." << endl; + exit(EXIT_FAILURE); + } + } + + void wait() + { + ss_dassert(m_thread != 0); + + pthread_join(m_thread, NULL); + m_thread = 0; + } + + static void start_thread(Thread& thread) + { + thread.start(); + } + + static void wait_for_thread(Thread& thread) + { + thread.wait(); + } + + void run() + { + m_pTask->out() << "Thread started.\n" << flush; + m_pTask->set_rv(m_pTask->run()); + } + + static void run(Thread* pThread) + { + pThread->run(); + } + + static void* thread_main(void* pData) + { + run(static_cast(pData)); + return 0; + } + +private: + Tester::Task* m_pTask; + pthread_t m_thread; +}; + +// +// Tester::Task +// + +Tester::Task::Task(std::ostream* pOut) + : m_out(*pOut) + , m_terminate(false) + , m_rv(0) +{ +} + +Tester::Task::~Task() +{ +} + +// +// Tester +// + +Tester::Tester(ostream* pOut) + : m_out(*pOut) +{ +} + +Tester::~Tester() +{ +} + +// static +GWBUF* Tester::gwbuf_from_string(const std::string& s) +{ + size_t len = s.length(); + size_t payload_len = len + 1; + size_t gwbuf_len = MYSQL_HEADER_LEN + payload_len; + + GWBUF* pBuf = gwbuf_alloc(gwbuf_len); + + *((unsigned char*)((char*)GWBUF_DATA(pBuf))) = payload_len; + *((unsigned char*)((char*)GWBUF_DATA(pBuf) + 1)) = (payload_len >> 8); + *((unsigned char*)((char*)GWBUF_DATA(pBuf) + 2)) = (payload_len >> 16); + *((unsigned char*)((char*)GWBUF_DATA(pBuf) + 3)) = 0x00; + *((unsigned char*)((char*)GWBUF_DATA(pBuf) + 4)) = 0x03; + memcpy((char*)GWBUF_DATA(pBuf) + 5, s.c_str(), len); + + return pBuf; +} + +// static +bool Tester::get_statements(std::istream& in, size_t n_statements, Statements* pStatements) +{ + TestReader::result_t result = TestReader::RESULT_ERROR; + typedef set StatementsSet; + + StatementsSet statements; + + TestReader reader(in); + + size_t n = 0; + string statement; + while ((n < n_statements) && + ((result = reader.get_statement(statement)) == TestReader::RESULT_STMT)) + { + if (statements.find(statement) == statements.end()) + { + // Not seen before + statements.insert(statement); + + pStatements->push_back(statement); + ++n; + } + } + + return result != TestReader::RESULT_ERROR; +} + +// static +bool Tester::get_cache_items(const Statements& statements, const Storage& storage, CacheItems* pItems) +{ + bool success = true; + + Statements::const_iterator i = statements.begin(); + + while (success && (i != statements.end())) + { + GWBUF* pQuery = gwbuf_from_string(*i); + if (pQuery) + { + CACHE_KEY key; + cache_result_t result = storage.get_key(NULL, pQuery, &key); + + if (result == CACHE_RESULT_OK) + { + pItems->push_back(std::make_pair(key, pQuery)); + } + else + { + ss_dassert(!true); + success = false; + } + } + else + { + ss_dassert(!true); + success = false; + } + + ++i; + } + + return success; +} + +//static +bool Tester::get_cache_items(std::istream& in, + size_t n_items, + const Storage& storage, + CacheItems* pItems) +{ + Statements statements; + + bool rv = get_statements(in, n_items, &statements); + + if (rv) + { + rv = get_cache_items(statements, storage, pItems); + } + + return rv; +} + +//static +void Tester::clear_cache_items(CacheItems& cache_items) +{ + for (CacheItems::iterator i = cache_items.begin(); i != cache_items.end(); ++i) + { + gwbuf_free(i->second); + } + + cache_items.clear(); +} + +// static +int Tester::run(ostream& out, size_t n_seconds, const vector& tasks) +{ + vector threads; + + transform(tasks.begin(), tasks.end(), back_inserter(threads), &Thread::from_task); + + out << "Starting " << tasks.size() << " threads, running for " << n_seconds << " seconds." << endl; + + for_each(threads.begin(), threads.end(), &Thread::start_thread); + + sleep(n_seconds); + + for_each(tasks.begin(), tasks.end(), &Task::terminate_task); + + for_each(threads.begin(), threads.end(), &Thread::wait_for_thread); + + out << "Threads terminated." << endl; + + int rv; + + if (find_if(tasks.begin(), tasks.end(), &Task::failed) == tasks.end()) + { + rv = EXIT_SUCCESS; + } + else + { + rv = EXIT_FAILURE; + } + + return rv; +} + diff --git a/server/modules/filter/cache/test/tester.hh b/server/modules/filter/cache/test/tester.hh new file mode 100644 index 000000000..6cc34ba65 --- /dev/null +++ b/server/modules/filter/cache/test/tester.hh @@ -0,0 +1,221 @@ +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl. + * + * Change Date: 2019-07-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include +#include +#include +#include +#include +#include +#include "cache_storage_api.hh" + +class Storage; + +class Tester +{ +public: + class Task; + + typedef std::vector Statements; + typedef std::vector > CacheItems; + typedef std::vector Tasks; + + class Thread; + class Task + { + public: + virtual ~Task(); + + /** + * Called from a thread. Concrete implementation in a derived class + * is expected to run continuously until @should_terminate returns + * true. The return value will be stored in the @m_rv member variable. + * + * @return EXIT_SUCCESS or EXIT_FAILURE. + */ + virtual int run() = 0; + + /** + * Whether the task should terminate. To be called by @run in derived + * concrete Task class. + * + * @return True, if the task should terminate, i.e., return from @c run. + */ + bool should_terminate() const { return m_terminate; } + + /** + * Tell the task to terminate. + */ + void terminate() { m_terminate = true; } + + /** + * Calls terminate on the provided task. For use in algorithms. + * + * @param pTask The task to terminate. + */ + static void terminate_task(Task* pTask) + { + pTask->terminate(); + } + + /** + * Deletes the provided task. For use in algorithms. + * + * @param pTask The task to delete. + */ + static void free(Task* pTask) + { + delete pTask; + } + + /** + * Predicate for finding out whether a task failed. To be called only + * after the task has terminated. For use in algorithms. + * + * @param pTask The task to query. + * + * @return True, if the task failed. + */ + static bool failed(const Task* pTask) + { + return pTask->rv() == EXIT_FAILURE; + } + + /** + * What the @run function returned. Meaningful only after the task + * has terminated. + * + * @return The value returned by @run. + */ + int rv() const { return m_rv; } + + /** + * The stream to be used for user output. + * + * @return The output stream to be used. + */ + std::ostream& out() const { return m_out; } + + protected: + /** + * Constructor + * + * @param pOut Pointer to the stream to use for user output. Note that + * the pointer must remain valid for the lifetime of the Task. + */ + Task(std::ostream* pOut); + + private: + friend class Thread; + void set_rv(int rv) { m_rv = rv; } + + private: + Task(const Task&); + Task& operator = (const Task&); + + private: + std::ostream& m_out; + bool m_terminate; + int m_rv; + }; + + virtual ~Tester(); + + /** + * Convert a string to a COM_QUERY GWBUF. + * + * @param s The string to be converted. + * + * @return A GWBUF or NULL if memory allocation failed. + */ + static GWBUF* gwbuf_from_string(const std::string& s); + + /** + * Returns statements from a MySQL/MariaDB server test file. + * + * @param in The stream from which input should be read. Assumed to refer to a + * MySQL/MariaDB test file. + * @param n_statements How many statements to return. + * @param pStatements Pointer to vector where statements will be back inserted. + * May contain less statements that specified in @n_statements. + * + * @return Whether reading was successful, not whether @n_statements statements were returned. + */ + static bool get_statements(std::istream& in, size_t n_statements, Statements* pStatements); + + /** + * Converts a set of statements into cache items (i.e. key + statement). + * + * @param statements A number of statements. + * @param storage The storage using which the cache keys should be generated. + * @param pItems Pointer to vector where the items will be back inserted. + * + * @return Whether the conversion was successful. + */ + static bool get_cache_items(const Statements& statements, + const Storage& storage, + CacheItems* pItems); + + /** + * Converts statements from a stream into cache items (i.e. key + GWBUF). + * + * @param statements A number of statements. + * @param n_items How many items should be returned. + * @param storage The storage using which the cache keys should be generated. + * @param pItems Pointer to vector where the items will be back inserted. + * + * @return Whether reading and conversion was successful, not whether @n_items + * items were returned. + */ + static bool get_cache_items(std::istream& in, + size_t n_items, + const Storage& storage, + CacheItems* pItems); + + /** + * Deletes the GWBUFs, and empties the vector. + * + * @param cache_items The vector to be cleared. + */ + static void clear_cache_items(CacheItems& cache_items); + +protected: + /** + * Constructor + * + * @param pOut Pointer to stream to be used for (user) output. Must remain + * valid for the lifetime of the Tester instance. + */ + Tester(std::ostream* pOut); + + /** + * The stream to be used for (user) output. + * + * @return A stream. + */ + std::ostream& out() const { return m_out; } + + /** + * Run a specific number of tasks in as many threads. + * + * @param out The stream to be used for (user) output. + * @param n_seconds How many seconds the tasks should run. + * @param tasks Vector of tasks. + * + * @return EXIT_SUCCESS if each task returned EXIT_SUCCESS, otherwise EXIT_FAILURE. + */ + static int run(std::ostream& out, size_t n_seconds, const Tasks& tasks); + +private: + std::ostream& m_out; +};