Add shutdown support

The shutdown is now performed so that a shutdown message is
sent to all workers. When the workers receive that message, they
turn on a shutdown flag, which subsequently is checked in the poll
loop.
This commit is contained in:
Johan Wikman
2017-02-16 13:54:18 +02:00
parent ab37333ce5
commit 5138032fe5
7 changed files with 99 additions and 31 deletions

View File

@ -20,12 +20,14 @@ MXS_BEGIN_DECLS
typedef struct mxs_worker
{
MXS_POLL_DATA poll; /*< The poll data used by the polling mechanism. */
int id; /*< The id of the worker. */
int read_fd; /*< The file descriptor the worked reads from. */
int write_fd; /*< The file descriptor used for sending data to the worker. */
THREAD thread; /*< The thread handle of the worker. */
bool started; /*< Whether the thread has been started or not. */
MXS_POLL_DATA poll; /*< The poll data used by the polling mechanism. */
int id; /*< The id of the worker. */
int read_fd; /*< The file descriptor the worked reads from. */
int write_fd; /*< The file descriptor used for sending data to the worker. */
THREAD thread; /*< The thread handle of the worker. */
bool started; /*< Whether the thread has been started or not. */
bool should_shutdown; /*< Whether shutdown should be performed. */
bool shutdown_initiated; /*< Whether shutdown has been initated. */
} MXS_WORKER;
enum mxs_worker_msg_id
@ -37,7 +39,15 @@ enum mxs_worker_msg_id
* arg2: NULL or pointer to dynamically allocated NULL-terminated string,
* to be freed by worker.
*/
MXS_WORKER_MSG_PING
MXS_WORKER_MSG_PING,
/**
* Shutdown message.
*
* arg1: 0
* arg2: NULL
*/
MXS_WORKER_MSG_SHUTDOWN
};
/**
@ -75,6 +85,8 @@ static inline int mxs_worker_id(MXS_WORKER* worker)
*
* @attention The return value tells *only* whether the message could be sent,
* *not* that it has reached the worker.
*
* @attention This function is signal safe.
*/
bool mxs_worker_post_message(MXS_WORKER* worker, int msg_id, int64_t arg1, void* arg2);

View File

@ -2002,7 +2002,7 @@ int main(int argc, char **argv)
/*<
* Start workers. We start from 1, worker 0 will be running in the main thread.
*/
for (i = 1; i < n_threads - 1; i++)
for (i = 1; i < n_threads; i++)
{
MXS_WORKER* worker = mxs_worker_get(i);
ss_dassert(worker);
@ -2041,7 +2041,7 @@ int main(int argc, char **argv)
/*<
* Wait for worker threads to exit.
*/
for (i = 1; i < n_threads - 1; i++)
for (i = 1; i < n_threads; i++)
{
MXS_WORKER *worker = mxs_worker_get(i);
ss_dassert(worker);
@ -2119,7 +2119,7 @@ int maxscale_shutdown()
if (n == 0)
{
service_shutdown();
poll_shutdown();
mxs_worker_shutdown_workers();
hkshutdown();
log_flush_shutdown();
}

View File

@ -48,7 +48,7 @@ enum poll_message
};
void poll_init();
void poll_shutdown();
//void poll_finish(); // TODO: Add this.
void poll_waitevents(struct mxs_worker *worker);
void poll_set_maxwait(unsigned int);

View File

@ -59,4 +59,38 @@ bool mxs_worker_start(MXS_WORKER* worker);
*/
void mxs_worker_join(MXS_WORKER* worker);
/**
* Initate shutdown of worker.
*
* @param worker The worker that should be shutdown.
*
* @attention A call to this function will only initiate the shutdowm,
* the worker will not have shut down when the function returns.
*
* @attention This function is signal safe.
*/
void mxs_worker_shutdown(MXS_WORKER* worker);
/**
* Initate shutdown of all workers.
*
* @attention A call to this function will only initiate the shutdowm,
* the workers will not have shut down when the function returns.
*
* @attention This function is signal safe.
*/
void mxs_worker_shutdown_workers();
/**
* Query whether worker should shutdown.
*
* @param worker The worker in question.
*
* @return True, if the worker should shut down, false otherwise.
*/
static inline bool mxs_worker_should_shutdown(MXS_WORKER* worker)
{
return worker->should_shutdown;
}
MXS_END_DECLS

View File

@ -32,7 +32,6 @@
#include <maxscale/log_manager.h>
#include <maxscale/platform.h>
#include <maxscale/query_classifier.h>
#include <maxscale/worker.h>
#include <maxscale/resultset.h>
#include <maxscale/server.h>
#include <maxscale/session.h>
@ -41,6 +40,7 @@
#include <maxscale/utils.h>
#include "maxscale/poll.h"
#include "maxscale/worker.h"
#define PROFILE_POLL 0
@ -88,7 +88,6 @@ int max_poll_sleep;
thread_local int current_thread_id; /**< This thread's ID */
static int *epoll_fd; /*< The epoll file descriptor */
static int next_epoll_fd = 0; /*< Which thread handles the next DCB */
static int do_shutdown = 0; /*< Flag the shutdown of the poll subsystem */
/** Poll cross-thread messaging variables */
static volatile int *poll_msg;
@ -565,7 +564,7 @@ poll_waitevents(MXS_WORKER *worker)
thread_data[thread_id].state = THREAD_IDLE;
while (1)
while (!mxs_worker_should_shutdown(worker))
{
atomic_add(&n_waiting, 1);
#if MUTEX_EPOLL
@ -737,19 +736,9 @@ poll_waitevents(MXS_WORKER *worker)
poll_check_message();
thread_data[thread_id].state = THREAD_IDLE;
if (do_shutdown)
{
/*<
* Remove the thread from the bitmask of running
* polling threads.
*/
thread_data[thread_id].state = THREAD_STOPPED;
return;
}
thread_data[thread_id].state = THREAD_IDLE;
} /*< while(1) */
thread_data[thread_id].state = THREAD_STOPPED;
}
/**
@ -781,7 +770,6 @@ poll_set_maxwait(unsigned int maxwait)
}
/**
<<<<<<< 26336f2003f9cd9321faa7e0d7c0bc770c82f8c9
* Process of the queue of DCB's that have outstanding events
*
* The first event on the queue will be chosen to be executed by this thread,
@ -1030,8 +1018,6 @@ poll_dcb_session_check(DCB *dcb, const char *function)
}
/**
=======
>>>>>>> Move DCB specific event handling to dcb.c
* Shutdown the polling loop
*/
void

View File

@ -95,7 +95,7 @@ test1()
ss_dfprintf(stderr, "\t..done\nStart wait for events.");
sleep(10);
poll_shutdown();
//TODO, fix this for workers: poll_shutdown();
ss_dfprintf(stderr, "\t..done\nTidy up.");
dcb_close(dcb);
ss_dfprintf(stderr, "\t..done\n");

View File

@ -99,6 +99,8 @@ MXS_WORKER* mxs_worker_get(int worker_id)
bool mxs_worker_post_message(MXS_WORKER *worker, int id, int64_t arg1, void* arg2)
{
// NOTE: No logging here, this function must be signal safe.
WORKER_MESSAGE message = { .id = id, .arg1 = arg1, .arg2 = arg2 };
ssize_t n = write(worker->write_fd, &message, sizeof(message));
@ -109,6 +111,8 @@ bool mxs_worker_post_message(MXS_WORKER *worker, int id, int64_t arg1, void* arg
void mxs_worker_main(MXS_WORKER* worker)
{
poll_waitevents(worker);
MXS_NOTICE("Worker %d has shut down.", worker->id);
}
bool mxs_worker_start(MXS_WORKER* worker)
@ -125,11 +129,38 @@ void mxs_worker_join(MXS_WORKER* worker)
{
if (worker->started)
{
MXS_NOTICE("Waiting for worker %d.", worker->id);
thread_wait(worker->thread);
MXS_NOTICE("Waited for worker %d.", worker->id);
worker->started = false;
}
}
void mxs_worker_shutdown(MXS_WORKER* worker)
{
// NOTE: No logging here, this function must be signal safe.
if (!worker->shutdown_initiated)
{
if (mxs_worker_post_message(worker, MXS_WORKER_MSG_SHUTDOWN, 0, NULL))
{
worker->shutdown_initiated = true;
}
}
}
void mxs_worker_shutdown_workers()
{
// NOTE: No logging here, this function must be signal safe.
for (int i = 0; i < this_unit.n_workers; ++i)
{
MXS_WORKER* worker = this_unit.workers[i];
mxs_worker_shutdown(worker);
}
}
/**
* Creates a worker instance.
* - Allocates the structure.
@ -218,6 +249,11 @@ static void worker_message_handler(MXS_WORKER *worker, int msg_id, int64_t arg1,
}
break;
case MXS_WORKER_MSG_SHUTDOWN:
MXS_NOTICE("Worker %d received shutdown message.", worker->id);
worker->should_shutdown = true;
break;
default:
MXS_ERROR("Worker received unknown message %d.", msg_id);
}
@ -274,7 +310,7 @@ static uint32_t worker_poll_handler(MXS_POLL_DATA *data, int thread_id, uint32_t
ss_dassert(!true);
}
}
while ((n != 0) && (n != 1));
while ((n != 0) && (n != -1));
rc = MXS_POLL_READ;
}