Synchronize the worker thread with the starting thread

This guarantees that the caller of the start function will know whether
the worker is running by looking at its state.

This will prevent multiple successive stop calls to a worker which
happened when the monitors were altered via the REST API.
This commit is contained in:
Markus Mäkelä 2018-08-17 16:21:22 +03:00
parent 9510a3ae1a
commit 208949f1cb
No known key found for this signature in database
GPG Key ID: 72D48FCE664F7B19
2 changed files with 24 additions and 10 deletions

View File

@ -22,6 +22,7 @@
#include <maxbase/semaphore.hh>
#include <maxscale/platform.h>
#include <maxscale/semaphore.hh>
#include <maxscale/session.h>
#include <maxscale/utils.hh>
#include <maxscale/worker.h>
@ -672,8 +673,10 @@ public:
* The worker will run the poll loop, until it is told to shut down.
*
* @attention This function will run in the calling thread.
*
* @param pSem Semaphore that is posted on once the thread has started
*/
void run();
void run(mxs::Semaphore* pSem = NULL);
/**
* Run worker in separate thread.
@ -1177,7 +1180,7 @@ private:
void handle_message(MessageQueue& queue, const MessageQueue::Message& msg); // override
static void thread_main(Worker* pThis);
static void thread_main(Worker* pThis, mxs::Semaphore* pSem);
void poll_waitevents();

View File

@ -527,17 +527,30 @@ bool Worker::post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2)
return m_pQueue->post(message);
}
void Worker::run()
void Worker::run(mxs::Semaphore* pSem)
{
this_thread.pCurrent_worker = this;
if (pre_run())
{
m_state = IDLE;
if (pSem)
{
pSem->post();
}
poll_waitevents();
m_state = STOPPED;
post_run();
MXS_INFO("Worker %p has shut down.", this);
}
else if (pSem)
{
pSem->post();
}
this_thread.pCurrent_worker = nullptr;
}
@ -546,6 +559,7 @@ bool Worker::start()
{
ss_dassert(!m_started);
ss_dassert(m_thread.get_id() == std::thread::id());
mxs::Semaphore sem;
m_started = true;
m_should_shutdown = false;
@ -553,7 +567,8 @@ bool Worker::start()
try
{
m_thread = std::thread(&Worker::thread_main, this);
m_thread = std::thread(&Worker::thread_main, this, &sem);
sem.wait();
}
catch (const std::exception& x)
{
@ -649,9 +664,9 @@ void Worker::handle_message(MessageQueue& queue, const MessageQueue::Message& ms
* @param arg A worker.
*/
//static
void Worker::thread_main(Worker* pThis)
void Worker::thread_main(Worker* pThis, mxs::Semaphore* pSem)
{
pThis->run();
pThis->run(pSem);
}
bool Worker::pre_run()
@ -741,8 +756,6 @@ void Worker::poll_waitevents()
{
struct epoll_event events[m_max_events];
m_state = IDLE;
m_load.reset();
int64_t nFds_total = 0;
@ -877,8 +890,6 @@ void Worker::poll_waitevents()
m_state = IDLE;
} /*< while(1) */
m_state = STOPPED;
}
namespace