MXS-1489 Create mechanism for running concurrent tasks
This commit introduces maxscale::future, maxscale::packaged_task and maxscale::thread that are modeled after C++11 std::future, std::packaged_task and std::thread as described here: http://en.cppreference.com/w/cpp/thread The standard classes rely upon rvalue references (and move constructors) introduced by C++11. As the C++ compilers we must use are pre-C++11 that feature is obviously not present. The absence of rvalue references is circumvented by implementing regular copy constructors and assignment operators as if the arguments were rvalue references. In practice the above means that when one of these objects are copied, the state is _moved_ rendering the copied object in default initialized state. Some care is needed to ensure that unintended copying does not occur.
This commit is contained in:
@ -15,6 +15,7 @@ add_executable(test_semaphore testsemaphore.cc)
|
||||
add_executable(test_server testserver.cc)
|
||||
add_executable(test_service testservice.cc)
|
||||
add_executable(test_spinlock testspinlock.cc)
|
||||
add_executable(test_thread testthread.cc)
|
||||
add_executable(test_trxcompare testtrxcompare.cc ../../../query_classifier/test/testreader.cc)
|
||||
add_executable(test_trxtracking testtrxtracking.cc)
|
||||
add_executable(test_users testusers.cc)
|
||||
@ -42,6 +43,7 @@ target_link_libraries(test_semaphore maxscale-common)
|
||||
target_link_libraries(test_server maxscale-common)
|
||||
target_link_libraries(test_service maxscale-common)
|
||||
target_link_libraries(test_spinlock maxscale-common)
|
||||
target_link_libraries(test_thread maxscale-common)
|
||||
target_link_libraries(test_trxcompare maxscale-common)
|
||||
target_link_libraries(test_trxtracking maxscale-common)
|
||||
target_link_libraries(test_users maxscale-common)
|
||||
@ -71,6 +73,7 @@ add_test(TestSemaphore test_semaphore)
|
||||
add_test(TestServer test_server)
|
||||
add_test(TestService test_service)
|
||||
add_test(TestSpinlock test_spinlock)
|
||||
add_test(TestThread test_thread)
|
||||
add_test(TestUsers test_users)
|
||||
add_test(TestUtils test_utils)
|
||||
add_test(TestModulecmd testmodulecmd)
|
||||
|
100
server/core/test/testthread.cc
Normal file
100
server/core/test/testthread.cc
Normal file
@ -0,0 +1,100 @@
|
||||
/*
|
||||
* Copyright (c) 2016 MariaDB Corporation Ab
|
||||
*
|
||||
* Use of this software is governed by the Business Source License included
|
||||
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
||||
*
|
||||
* Change Date: 2020-01-01
|
||||
*
|
||||
* On the date above, in accordance with the Business Source License, use
|
||||
* of this software will be governed by version 2 or later of the General
|
||||
* Public License.
|
||||
*/
|
||||
|
||||
#include <stdlib.h>
|
||||
#include <iostream>
|
||||
#include <vector>
|
||||
#include <maxscale/thread.hh>
|
||||
// We want asserts in release mode as well.
|
||||
#if !defined(SS_DEBUG)
|
||||
#define SS_DEBUG
|
||||
#endif
|
||||
#include <maxscale/debug.h>
|
||||
using std::cout;
|
||||
using std::endl;
|
||||
using std::vector;
|
||||
|
||||
|
||||
int function(int i)
|
||||
{
|
||||
return i / 2;
|
||||
}
|
||||
|
||||
void test_basics()
|
||||
{
|
||||
cout << __func__ << endl;
|
||||
|
||||
mxs::packaged_task<int, int> t1;
|
||||
ss_dassert(!t1.valid());
|
||||
|
||||
mxs::packaged_task<int, int> t2(function);
|
||||
ss_dassert(t2.valid());
|
||||
|
||||
t1 = t2; // Move task.
|
||||
ss_dassert(t1.valid());
|
||||
ss_dassert(!t2.valid());
|
||||
|
||||
mxs::future<int> f1;
|
||||
ss_dassert(!f1.valid());
|
||||
|
||||
mxs::future<int> f2 = t1.get_future();
|
||||
ss_dassert(t1.valid());
|
||||
ss_dassert(f2.valid());
|
||||
|
||||
f1 = f2; // Move future
|
||||
ss_dassert(f1.valid());
|
||||
ss_dassert(!f2.valid());
|
||||
}
|
||||
|
||||
void test_running()
|
||||
{
|
||||
cout << __func__ << endl;
|
||||
|
||||
const int N = 10;
|
||||
|
||||
vector<mxs::future<int> > results;
|
||||
vector<mxs::thread> threads;
|
||||
|
||||
cout << "Starting threads" << endl;
|
||||
for (int i = 0; i < N; ++i)
|
||||
{
|
||||
cout << i << endl;
|
||||
mxs::packaged_task<int, int> task(function);
|
||||
mxs::future<int> r = task.get_future();
|
||||
int arg = i;
|
||||
mxs::thread t(task, arg);
|
||||
|
||||
results.push_back(r);
|
||||
threads.push_back(t);
|
||||
}
|
||||
|
||||
cout << "All threads started." << endl;
|
||||
cout << "Waiting for threads." << endl;
|
||||
|
||||
for (int i = 0; i < N; ++i)
|
||||
{
|
||||
cout << i << endl;
|
||||
threads[i].join();
|
||||
int got = results[i].get();
|
||||
int expected = function(i);
|
||||
|
||||
ss_dassert(got == expected);
|
||||
}
|
||||
}
|
||||
|
||||
int main()
|
||||
{
|
||||
test_basics();
|
||||
test_running();
|
||||
return EXIT_SUCCESS;
|
||||
}
|
@ -12,6 +12,7 @@
|
||||
*/
|
||||
|
||||
#include <maxscale/thread.h>
|
||||
#include <maxscale/thread.hh>
|
||||
#include <maxscale/log_manager.h>
|
||||
|
||||
THREAD *thread_start(THREAD *thd, void (*entry)(void *), void *arg, size_t stack_size)
|
||||
@ -68,3 +69,116 @@ void thread_millisleep(int ms)
|
||||
req.tv_nsec = (ms % 1000) * 1000000;
|
||||
nanosleep(&req, NULL);
|
||||
}
|
||||
|
||||
//
|
||||
// maxscale::thread
|
||||
//
|
||||
|
||||
namespace maxscale
|
||||
{
|
||||
|
||||
thread::thread()
|
||||
: m_pInternal(NULL)
|
||||
{
|
||||
}
|
||||
|
||||
thread::thread(const thread& other)
|
||||
: m_pInternal(other.m_pInternal)
|
||||
{
|
||||
other.m_pInternal = NULL;
|
||||
}
|
||||
|
||||
thread& thread::operator = (const thread& rhs)
|
||||
{
|
||||
thread copy(rhs);
|
||||
copy.swap(*this);
|
||||
return *this;
|
||||
}
|
||||
|
||||
thread::~thread()
|
||||
{
|
||||
ss_dassert(!joinable());
|
||||
if (joinable())
|
||||
{
|
||||
MXS_ERROR("A thread that has not been joined is destructed.");
|
||||
}
|
||||
else
|
||||
{
|
||||
delete m_pInternal;
|
||||
}
|
||||
}
|
||||
|
||||
bool thread::joinable() const
|
||||
{
|
||||
return m_pInternal ? m_pInternal->joinable() : false;
|
||||
}
|
||||
|
||||
void thread::join()
|
||||
{
|
||||
ss_dassert(m_pInternal);
|
||||
if (!m_pInternal)
|
||||
{
|
||||
MXS_ERROR("Attempt to join a non-joinable thread.");
|
||||
}
|
||||
else
|
||||
{
|
||||
m_pInternal->join();
|
||||
}
|
||||
}
|
||||
|
||||
void thread::swap(thread& rhs)
|
||||
{
|
||||
std::swap(m_pInternal, rhs.m_pInternal);
|
||||
}
|
||||
|
||||
void thread::run()
|
||||
{
|
||||
ss_dassert(m_pInternal);
|
||||
m_pInternal->run();
|
||||
}
|
||||
|
||||
thread::internal::internal(thread::task* pTask)
|
||||
: m_pTask(pTask)
|
||||
, m_thread(0)
|
||||
{
|
||||
}
|
||||
|
||||
thread::internal::~internal()
|
||||
{
|
||||
ss_info_dassert(!m_pTask, "Thread not joined before destructed.");
|
||||
ss_dassert(m_thread == 0);
|
||||
}
|
||||
|
||||
bool thread::internal::joinable() const
|
||||
{
|
||||
return m_thread != 0;
|
||||
}
|
||||
|
||||
void thread::internal::join()
|
||||
{
|
||||
ss_dassert(joinable());
|
||||
thread_wait(m_thread);
|
||||
delete m_pTask;
|
||||
m_pTask = NULL;
|
||||
m_thread = 0;
|
||||
}
|
||||
|
||||
void thread::internal::run()
|
||||
{
|
||||
if (!thread_start(&m_thread, &thread::internal::main, this, 0))
|
||||
{
|
||||
MXS_ALERT("Could not start thread, MaxScale is likely to malfunction.");
|
||||
}
|
||||
}
|
||||
|
||||
void thread::internal::main()
|
||||
{
|
||||
m_pTask->run();
|
||||
}
|
||||
|
||||
void thread::internal::main(void* pArg)
|
||||
{
|
||||
static_cast<internal*>(pArg)->main();
|
||||
}
|
||||
|
||||
}
|
||||
|
Reference in New Issue
Block a user