MXS-2004 Replace THREAD with std::thread in Worker

This commit is contained in:
Johan Wikman 2018-08-10 13:22:47 +03:00
parent 6db03d4b29
commit 47b53aae55
4 changed files with 29 additions and 17 deletions

View File

@ -17,6 +17,7 @@
#include <functional>
#include <map>
#include <memory>
#include <thread>
#include <unordered_set>
#include <maxscale/platform.h>
@ -672,12 +673,9 @@ public:
* This function will start a new thread, in which the `run`
* function will be executed.
*
* @param stack_size The stack size of the new thread. A value of 0 means
* that the pthread default should be used.
*
* @return True if the thread could be started, false otherwise.
*/
bool start(size_t stack_size = 0);
bool start();
/**
* Waits for the worker to finish.
@ -1171,7 +1169,7 @@ private:
void handle_message(MessageQueue& queue, const MessageQueue::Message& msg); // override
static void thread_main(void* arg);
static void thread_main(Worker* pThis);
void poll_waitevents();
@ -1193,7 +1191,7 @@ private:
STATISTICS m_statistics; /*< Worker statistics. */
MessageQueue* m_pQueue; /*< The message queue of the worker. */
THREAD m_thread; /*< The thread handle of the worker. */
std::thread m_thread; /*< The thread object of the worker. */
bool m_started; /*< Whether the thread has been started or not. */
bool m_should_shutdown; /*< Whether shutdown should be performed. */
bool m_shutdown_initiated; /*< Whether shutdown has been initated. */

View File

@ -146,8 +146,15 @@ bool Housekeeper::start()
sem_init(&res.sem, 0, 0);
res.ok = false;
hk->m_thread = std::thread(hkthread, &res);
sem_wait(&res.sem);
try
{
hk->m_thread = std::thread(hkthread, &res);
sem_wait(&res.sem);
}
catch (const std::exception& x)
{
MXS_ERROR("Could not start housekeeping thread: %s", x.what());
}
sem_destroy(&res.sem);
return res.ok;

View File

@ -381,7 +381,6 @@ int RoutingWorker::get_current_id()
bool RoutingWorker::start_threaded_workers()
{
bool rv = true;
size_t stack_size = config_thread_stack_size();
for (int i = this_unit.id_min_worker; i <= this_unit.id_max_worker; ++i)
{
@ -392,7 +391,7 @@ bool RoutingWorker::start_threaded_workers()
RoutingWorker* pWorker = this_unit.ppWorkers[i];
ss_dassert(pWorker);
if (!pWorker->start(stack_size))
if (!pWorker->start())
{
MXS_ALERT("Could not start routing worker %d of %d.", i, config_threadcount());
rv = false;

View File

@ -301,7 +301,6 @@ Worker::Worker()
: m_epoll_fd(create_epoll_instance())
, m_state(STOPPED)
, m_pQueue(NULL)
, m_thread(0)
, m_started(false)
, m_should_shutdown(false)
, m_shutdown_initiated(false)
@ -552,14 +551,22 @@ void Worker::run()
this_thread.pCurrent_worker = nullptr;
}
bool Worker::start(size_t stack_size)
bool Worker::start()
{
ss_dassert(!m_started);
ss_dassert(m_thread.get_id() == std::thread::id());
m_started = true;
m_should_shutdown = false;
m_shutdown_initiated = false;
if (!thread_start(&m_thread, &Worker::thread_main, this, stack_size))
try
{
m_thread = std::thread(&Worker::thread_main, this);
}
catch (const std::exception& x)
{
MXS_ERROR("Could not start worker thread: %s", x.what());
m_started = false;
}
@ -568,10 +575,12 @@ bool Worker::start(size_t stack_size)
void Worker::join()
{
ss_dassert(m_thread.get_id() != std::thread::id());
if (m_started)
{
MXS_INFO("Waiting for worker %p.", this);
thread_wait(m_thread);
m_thread.join();
MXS_INFO("Waited for worker %p.", this);
m_started = false;
}
@ -659,10 +668,9 @@ void Worker::handle_message(MessageQueue& queue, const MessageQueue::Message& ms
* @param arg A worker.
*/
//static
void Worker::thread_main(void* pArg)
void Worker::thread_main(Worker* pThis)
{
Worker* pWorker = static_cast<Worker*>(pArg);
pWorker->run();
pThis->run();
}
bool Worker::pre_run()