Move worker thread management to worker
The worker threads are now started by the workers themselves.
This commit is contained in:
@ -14,6 +14,7 @@
|
|||||||
|
|
||||||
#include <maxscale/cdefs.h>
|
#include <maxscale/cdefs.h>
|
||||||
#include <maxscale/poll.h>
|
#include <maxscale/poll.h>
|
||||||
|
#include <maxscale/thread.h>
|
||||||
|
|
||||||
MXS_BEGIN_DECLS
|
MXS_BEGIN_DECLS
|
||||||
|
|
||||||
@ -23,6 +24,8 @@ typedef struct mxs_worker
|
|||||||
int id; /*< The id of the worker. */
|
int id; /*< The id of the worker. */
|
||||||
int read_fd; /*< The file descriptor the worked reads from. */
|
int read_fd; /*< The file descriptor the worked reads from. */
|
||||||
int write_fd; /*< The file descriptor used for sending data to the worker. */
|
int write_fd; /*< The file descriptor used for sending data to the worker. */
|
||||||
|
THREAD thread; /*< The thread handle of the worker. */
|
||||||
|
bool started; /*< Whether the thread has been started or not. */
|
||||||
} MXS_WORKER;
|
} MXS_WORKER;
|
||||||
|
|
||||||
enum mxs_worker_msg_id
|
enum mxs_worker_msg_id
|
||||||
|
@ -64,7 +64,6 @@
|
|||||||
#include <maxscale/query_classifier.h>
|
#include <maxscale/query_classifier.h>
|
||||||
#include <maxscale/server.h>
|
#include <maxscale/server.h>
|
||||||
#include <maxscale/session.h>
|
#include <maxscale/session.h>
|
||||||
#include <maxscale/thread.h>
|
|
||||||
#include <maxscale/utils.h>
|
#include <maxscale/utils.h>
|
||||||
#include <maxscale/version.h>
|
#include <maxscale/version.h>
|
||||||
#include <maxscale/random_jkiss.h>
|
#include <maxscale/random_jkiss.h>
|
||||||
@ -188,8 +187,6 @@ static bool daemonize();
|
|||||||
static bool sniff_configuration(const char* filepath);
|
static bool sniff_configuration(const char* filepath);
|
||||||
static bool modules_process_init();
|
static bool modules_process_init();
|
||||||
static void modules_process_finish();
|
static void modules_process_finish();
|
||||||
static bool modules_thread_init();
|
|
||||||
static void modules_thread_finish();
|
|
||||||
|
|
||||||
/** SSL multi-threading functions and structures */
|
/** SSL multi-threading functions and structures */
|
||||||
|
|
||||||
@ -1300,7 +1297,6 @@ int main(int argc, char **argv)
|
|||||||
int daemon_pipe[2] = { -1, -1};
|
int daemon_pipe[2] = { -1, -1};
|
||||||
bool parent_process;
|
bool parent_process;
|
||||||
int child_status;
|
int child_status;
|
||||||
THREAD* threads = NULL; /*< thread list */
|
|
||||||
char* cnf_file_path = NULL; /*< conf file, to be freed */
|
char* cnf_file_path = NULL; /*< conf file, to be freed */
|
||||||
char* cnf_file_arg = NULL; /*< conf filename from cmd-line arg */
|
char* cnf_file_arg = NULL; /*< conf filename from cmd-line arg */
|
||||||
THREAD log_flush_thr;
|
THREAD log_flush_thr;
|
||||||
@ -1318,6 +1314,7 @@ int main(int argc, char **argv)
|
|||||||
void (*exitfunp[4])(void) = { mxs_log_finish, cleanup_process_datadir, write_footer, NULL };
|
void (*exitfunp[4])(void) = { mxs_log_finish, cleanup_process_datadir, write_footer, NULL };
|
||||||
MXS_CONFIG* cnf = NULL;
|
MXS_CONFIG* cnf = NULL;
|
||||||
int numlocks = 0;
|
int numlocks = 0;
|
||||||
|
MXS_WORKER* worker;
|
||||||
|
|
||||||
*syslog_enabled = 1;
|
*syslog_enabled = 1;
|
||||||
*maxlog_enabled = 1;
|
*maxlog_enabled = 1;
|
||||||
@ -2001,14 +1998,16 @@ int main(int argc, char **argv)
|
|||||||
* configured as the main thread will also poll.
|
* configured as the main thread will also poll.
|
||||||
*/
|
*/
|
||||||
n_threads = config_threadcount();
|
n_threads = config_threadcount();
|
||||||
threads = (THREAD*)MXS_CALLOC(n_threads, sizeof(THREAD));
|
|
||||||
/*<
|
/*<
|
||||||
* Start server threads.
|
* Start workers. We start from 1, worker 0 will be running in the main thread.
|
||||||
*/
|
*/
|
||||||
for (thread_id = 0; thread_id < n_threads - 1; thread_id++)
|
for (i = 1; i < n_threads - 1; i++)
|
||||||
{
|
{
|
||||||
if (thread_start(&threads[thread_id], worker_thread_main,
|
MXS_WORKER* worker = mxs_worker_get(i);
|
||||||
(void *)(thread_id + 1)) == NULL)
|
ss_dassert(worker);
|
||||||
|
|
||||||
|
if (!mxs_worker_start(worker))
|
||||||
{
|
{
|
||||||
const char* logerr = "Failed to start worker thread.";
|
const char* logerr = "Failed to start worker thread.";
|
||||||
print_log_n_stderr(true, true, logerr, logerr, 0);
|
print_log_n_stderr(true, true, logerr, logerr, 0);
|
||||||
@ -2028,9 +2027,11 @@ int main(int argc, char **argv)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/*<
|
/*<
|
||||||
* Serve clients.
|
* Run worker 0 in the main thread.
|
||||||
*/
|
*/
|
||||||
poll_waitevents((void *)0);
|
worker = mxs_worker_get(0);
|
||||||
|
ss_dassert(worker);
|
||||||
|
mxs_worker_main(worker);
|
||||||
|
|
||||||
/*<
|
/*<
|
||||||
* Wait for the housekeeper to finish.
|
* Wait for the housekeeper to finish.
|
||||||
@ -2038,13 +2039,18 @@ int main(int argc, char **argv)
|
|||||||
hkfinish();
|
hkfinish();
|
||||||
|
|
||||||
/*<
|
/*<
|
||||||
* Wait server threads' completion.
|
* Wait for worker threads to exit.
|
||||||
*/
|
*/
|
||||||
for (thread_id = 0; thread_id < n_threads - 1; thread_id++)
|
for (i = 1; i < n_threads - 1; i++)
|
||||||
{
|
{
|
||||||
thread_wait(threads[thread_id]);
|
MXS_WORKER *worker = mxs_worker_get(i);
|
||||||
|
ss_dassert(worker);
|
||||||
|
|
||||||
|
mxs_worker_join(worker);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mxs_worker_finish();
|
||||||
|
|
||||||
/*<
|
/*<
|
||||||
* Destroy the router and filter instances of all services.
|
* Destroy the router and filter instances of all services.
|
||||||
*/
|
*/
|
||||||
@ -2093,10 +2099,6 @@ return_main:
|
|||||||
|
|
||||||
MXS_FREE(cnf_file_arg);
|
MXS_FREE(cnf_file_arg);
|
||||||
|
|
||||||
if (threads)
|
|
||||||
{
|
|
||||||
MXS_FREE(threads);
|
|
||||||
}
|
|
||||||
if (cnf_file_path)
|
if (cnf_file_path)
|
||||||
{
|
{
|
||||||
MXS_FREE(cnf_file_path);
|
MXS_FREE(cnf_file_path);
|
||||||
@ -2908,69 +2910,3 @@ static void modules_process_finish()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Calls thread_init on all loaded modules.
|
|
||||||
*
|
|
||||||
* @return True, if all modules were successfully initialized.
|
|
||||||
*/
|
|
||||||
static bool modules_thread_init()
|
|
||||||
{
|
|
||||||
bool initialized = false;
|
|
||||||
|
|
||||||
MXS_MODULE_ITERATOR i = mxs_module_iterator_get(NULL);
|
|
||||||
MXS_MODULE* module = NULL;
|
|
||||||
|
|
||||||
while ((module = mxs_module_iterator_get_next(&i)) != NULL)
|
|
||||||
{
|
|
||||||
if (module->thread_init)
|
|
||||||
{
|
|
||||||
int rc = (module->thread_init)();
|
|
||||||
|
|
||||||
if (rc != 0)
|
|
||||||
{
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (module)
|
|
||||||
{
|
|
||||||
// If module is non-NULL it means that the initialization failed for
|
|
||||||
// that module. We now need to call finish on all modules that were
|
|
||||||
// successfully initialized.
|
|
||||||
MXS_MODULE* failed_module = module;
|
|
||||||
i = mxs_module_iterator_get(NULL);
|
|
||||||
|
|
||||||
while ((module = mxs_module_iterator_get_next(&i)) != failed_module)
|
|
||||||
{
|
|
||||||
if (module->thread_finish)
|
|
||||||
{
|
|
||||||
(module->thread_finish)();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
initialized = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
return initialized;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Calls thread_finish on all loaded modules.
|
|
||||||
*/
|
|
||||||
static void modules_thread_finish()
|
|
||||||
{
|
|
||||||
MXS_MODULE_ITERATOR i = mxs_module_iterator_get(NULL);
|
|
||||||
MXS_MODULE* module = NULL;
|
|
||||||
|
|
||||||
while ((module = mxs_module_iterator_get_next(&i)) != NULL)
|
|
||||||
{
|
|
||||||
if (module->thread_finish)
|
|
||||||
{
|
|
||||||
(module->thread_finish)();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@ -24,4 +24,39 @@ MXS_BEGIN_DECLS
|
|||||||
*/
|
*/
|
||||||
void mxs_worker_init();
|
void mxs_worker_init();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Finalize the worker mechanism.
|
||||||
|
*
|
||||||
|
* To be called once at process shutdown. This will cause all workers
|
||||||
|
* to be destroyed. When the function is called, no worker should be
|
||||||
|
* running anymore.
|
||||||
|
*/
|
||||||
|
void mxs_worker_finish();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Main function of worker.
|
||||||
|
*
|
||||||
|
* This worker will run the poll loop, until it is told to shut down.
|
||||||
|
*
|
||||||
|
* @param worker The worker.
|
||||||
|
*/
|
||||||
|
void mxs_worker_main(MXS_WORKER* worker);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Start worker in separate thread.
|
||||||
|
*
|
||||||
|
* This function will start a new thread, in which the `mxs_worker_main`
|
||||||
|
* function will be executed.
|
||||||
|
*
|
||||||
|
* @return True if the thread could be started, false otherwise.
|
||||||
|
*/
|
||||||
|
bool mxs_worker_start(MXS_WORKER* worker);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Waits for the worker to finish.
|
||||||
|
*
|
||||||
|
* @param worker The worker to wait for.
|
||||||
|
*/
|
||||||
|
void mxs_worker_join(MXS_WORKER* worker);
|
||||||
|
|
||||||
MXS_END_DECLS
|
MXS_END_DECLS
|
||||||
|
@ -19,6 +19,8 @@
|
|||||||
#include <maxscale/alloc.h>
|
#include <maxscale/alloc.h>
|
||||||
#include <maxscale/config.h>
|
#include <maxscale/config.h>
|
||||||
#include <maxscale/log_manager.h>
|
#include <maxscale/log_manager.h>
|
||||||
|
#include "maxscale/modules.h"
|
||||||
|
#include "maxscale/poll.h"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Unit variables.
|
* Unit variables.
|
||||||
@ -44,6 +46,10 @@ static MXS_WORKER* worker_create(int worker_id);
|
|||||||
static void worker_free(MXS_WORKER* worker);
|
static void worker_free(MXS_WORKER* worker);
|
||||||
static void worker_message_handler(MXS_WORKER* worker, int msg_id, int64_t arg1, void* arg2);
|
static void worker_message_handler(MXS_WORKER* worker, int msg_id, int64_t arg1, void* arg2);
|
||||||
static uint32_t worker_poll_handler(MXS_POLL_DATA *data, int worker_id, uint32_t events);
|
static uint32_t worker_poll_handler(MXS_POLL_DATA *data, int worker_id, uint32_t events);
|
||||||
|
static void worker_thread_main(void* arg);
|
||||||
|
|
||||||
|
static bool modules_thread_init();
|
||||||
|
static void modules_thread_finish();
|
||||||
|
|
||||||
|
|
||||||
void mxs_worker_init()
|
void mxs_worker_init()
|
||||||
@ -73,6 +79,17 @@ void mxs_worker_init()
|
|||||||
MXS_NOTICE("Workers created!");
|
MXS_NOTICE("Workers created!");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void mxs_worker_finish()
|
||||||
|
{
|
||||||
|
for (int i = 0; i < this_unit.n_workers; ++i)
|
||||||
|
{
|
||||||
|
MXS_WORKER* worker = this_unit.workers[i];
|
||||||
|
|
||||||
|
worker_free(worker);
|
||||||
|
this_unit.workers[i] = NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
MXS_WORKER* mxs_worker_get(int worker_id)
|
MXS_WORKER* mxs_worker_get(int worker_id)
|
||||||
{
|
{
|
||||||
ss_dassert(worker_id < this_unit.n_workers);
|
ss_dassert(worker_id < this_unit.n_workers);
|
||||||
@ -89,6 +106,30 @@ bool mxs_worker_post_message(MXS_WORKER *worker, int id, int64_t arg1, void* arg
|
|||||||
return n == sizeof(message) ? true : false;
|
return n == sizeof(message) ? true : false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void mxs_worker_main(MXS_WORKER* worker)
|
||||||
|
{
|
||||||
|
poll_waitevents((void*)(intptr_t)worker->id);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool mxs_worker_start(MXS_WORKER* worker)
|
||||||
|
{
|
||||||
|
if (thread_start(&worker->thread, worker_thread_main, worker))
|
||||||
|
{
|
||||||
|
worker->started = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return worker->started;
|
||||||
|
}
|
||||||
|
|
||||||
|
void mxs_worker_join(MXS_WORKER* worker)
|
||||||
|
{
|
||||||
|
if (worker->started)
|
||||||
|
{
|
||||||
|
thread_wait(worker->thread);
|
||||||
|
worker->started = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a worker instance.
|
* Creates a worker instance.
|
||||||
* - Allocates the structure.
|
* - Allocates the structure.
|
||||||
@ -146,6 +187,9 @@ static void worker_free(MXS_WORKER* worker)
|
|||||||
{
|
{
|
||||||
if (worker)
|
if (worker)
|
||||||
{
|
{
|
||||||
|
ss_dassert(!worker->started);
|
||||||
|
|
||||||
|
poll_remove_fd_from_worker(worker->id, worker->read_fd);
|
||||||
close(worker->read_fd);
|
close(worker->read_fd);
|
||||||
close(worker->write_fd);
|
close(worker->write_fd);
|
||||||
|
|
||||||
@ -237,3 +281,90 @@ static uint32_t worker_poll_handler(MXS_POLL_DATA *data, int thread_id, uint32_t
|
|||||||
|
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The entry point of each worker thread.
|
||||||
|
*
|
||||||
|
* @param arg A worker.
|
||||||
|
*/
|
||||||
|
static void worker_thread_main(void* arg)
|
||||||
|
{
|
||||||
|
if (modules_thread_init())
|
||||||
|
{
|
||||||
|
MXS_WORKER *worker = (MXS_WORKER*)arg;
|
||||||
|
|
||||||
|
mxs_worker_main(worker);
|
||||||
|
|
||||||
|
modules_thread_finish();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
MXS_ERROR("Could not perform thread initialization for all modules. Thread exits.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Calls thread_init on all loaded modules.
|
||||||
|
*
|
||||||
|
* @return True, if all modules were successfully initialized.
|
||||||
|
*/
|
||||||
|
static bool modules_thread_init()
|
||||||
|
{
|
||||||
|
bool initialized = false;
|
||||||
|
|
||||||
|
MXS_MODULE_ITERATOR i = mxs_module_iterator_get(NULL);
|
||||||
|
MXS_MODULE* module = NULL;
|
||||||
|
|
||||||
|
while ((module = mxs_module_iterator_get_next(&i)) != NULL)
|
||||||
|
{
|
||||||
|
if (module->thread_init)
|
||||||
|
{
|
||||||
|
int rc = (module->thread_init)();
|
||||||
|
|
||||||
|
if (rc != 0)
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (module)
|
||||||
|
{
|
||||||
|
// If module is non-NULL it means that the initialization failed for
|
||||||
|
// that module. We now need to call finish on all modules that were
|
||||||
|
// successfully initialized.
|
||||||
|
MXS_MODULE* failed_module = module;
|
||||||
|
i = mxs_module_iterator_get(NULL);
|
||||||
|
|
||||||
|
while ((module = mxs_module_iterator_get_next(&i)) != failed_module)
|
||||||
|
{
|
||||||
|
if (module->thread_finish)
|
||||||
|
{
|
||||||
|
(module->thread_finish)();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
initialized = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return initialized;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Calls thread_finish on all loaded modules.
|
||||||
|
*/
|
||||||
|
static void modules_thread_finish()
|
||||||
|
{
|
||||||
|
MXS_MODULE_ITERATOR i = mxs_module_iterator_get(NULL);
|
||||||
|
MXS_MODULE* module = NULL;
|
||||||
|
|
||||||
|
while ((module = mxs_module_iterator_get_next(&i)) != NULL)
|
||||||
|
{
|
||||||
|
if (module->thread_finish)
|
||||||
|
{
|
||||||
|
(module->thread_finish)();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user