From 47b53aae55b64c509ed06be0f8a616bf1cd9aaaa Mon Sep 17 00:00:00 2001 From: Johan Wikman Date: Fri, 10 Aug 2018 13:22:47 +0300 Subject: [PATCH] MXS-2004 Replace THREAD with std::thread in Worker --- include/maxscale/worker.hh | 10 ++++------ server/core/housekeeper.cc | 11 +++++++++-- server/core/routingworker.cc | 3 +-- server/core/worker.cc | 22 +++++++++++++++------- 4 files changed, 29 insertions(+), 17 deletions(-) diff --git a/include/maxscale/worker.hh b/include/maxscale/worker.hh index 2ebcb5178..8e82efdfb 100644 --- a/include/maxscale/worker.hh +++ b/include/maxscale/worker.hh @@ -17,6 +17,7 @@ #include #include #include +#include #include #include @@ -672,12 +673,9 @@ public: * This function will start a new thread, in which the `run` * function will be executed. * - * @param stack_size The stack size of the new thread. A value of 0 means - * that the pthread default should be used. - * * @return True if the thread could be started, false otherwise. */ - bool start(size_t stack_size = 0); + bool start(); /** * Waits for the worker to finish. @@ -1171,7 +1169,7 @@ private: void handle_message(MessageQueue& queue, const MessageQueue::Message& msg); // override - static void thread_main(void* arg); + static void thread_main(Worker* pThis); void poll_waitevents(); @@ -1193,7 +1191,7 @@ private: STATISTICS m_statistics; /*< Worker statistics. */ MessageQueue* m_pQueue; /*< The message queue of the worker. */ - THREAD m_thread; /*< The thread handle of the worker. */ + std::thread m_thread; /*< The thread object of the worker. */ bool m_started; /*< Whether the thread has been started or not. */ bool m_should_shutdown; /*< Whether shutdown should be performed. */ bool m_shutdown_initiated; /*< Whether shutdown has been initated. */ diff --git a/server/core/housekeeper.cc b/server/core/housekeeper.cc index fd80412bc..cc21b4284 100644 --- a/server/core/housekeeper.cc +++ b/server/core/housekeeper.cc @@ -146,8 +146,15 @@ bool Housekeeper::start() sem_init(&res.sem, 0, 0); res.ok = false; - hk->m_thread = std::thread(hkthread, &res); - sem_wait(&res.sem); + try + { + hk->m_thread = std::thread(hkthread, &res); + sem_wait(&res.sem); + } + catch (const std::exception& x) + { + MXS_ERROR("Could not start housekeeping thread: %s", x.what()); + } sem_destroy(&res.sem); return res.ok; diff --git a/server/core/routingworker.cc b/server/core/routingworker.cc index 780832e6b..ae443c9cb 100644 --- a/server/core/routingworker.cc +++ b/server/core/routingworker.cc @@ -381,7 +381,6 @@ int RoutingWorker::get_current_id() bool RoutingWorker::start_threaded_workers() { bool rv = true; - size_t stack_size = config_thread_stack_size(); for (int i = this_unit.id_min_worker; i <= this_unit.id_max_worker; ++i) { @@ -392,7 +391,7 @@ bool RoutingWorker::start_threaded_workers() RoutingWorker* pWorker = this_unit.ppWorkers[i]; ss_dassert(pWorker); - if (!pWorker->start(stack_size)) + if (!pWorker->start()) { MXS_ALERT("Could not start routing worker %d of %d.", i, config_threadcount()); rv = false; diff --git a/server/core/worker.cc b/server/core/worker.cc index c6f99043c..b62d07c2e 100644 --- a/server/core/worker.cc +++ b/server/core/worker.cc @@ -301,7 +301,6 @@ Worker::Worker() : m_epoll_fd(create_epoll_instance()) , m_state(STOPPED) , m_pQueue(NULL) - , m_thread(0) , m_started(false) , m_should_shutdown(false) , m_shutdown_initiated(false) @@ -552,14 +551,22 @@ void Worker::run() this_thread.pCurrent_worker = nullptr; } -bool Worker::start(size_t stack_size) +bool Worker::start() { + ss_dassert(!m_started); + ss_dassert(m_thread.get_id() == std::thread::id()); + m_started = true; m_should_shutdown = false; m_shutdown_initiated = false; - if (!thread_start(&m_thread, &Worker::thread_main, this, stack_size)) + try { + m_thread = std::thread(&Worker::thread_main, this); + } + catch (const std::exception& x) + { + MXS_ERROR("Could not start worker thread: %s", x.what()); m_started = false; } @@ -568,10 +575,12 @@ bool Worker::start(size_t stack_size) void Worker::join() { + ss_dassert(m_thread.get_id() != std::thread::id()); + if (m_started) { MXS_INFO("Waiting for worker %p.", this); - thread_wait(m_thread); + m_thread.join(); MXS_INFO("Waited for worker %p.", this); m_started = false; } @@ -659,10 +668,9 @@ void Worker::handle_message(MessageQueue& queue, const MessageQueue::Message& ms * @param arg A worker. */ //static -void Worker::thread_main(void* pArg) +void Worker::thread_main(Worker* pThis) { - Worker* pWorker = static_cast(pArg); - pWorker->run(); + pThis->run(); } bool Worker::pre_run()