diff --git a/include/maxscale/worker.h b/include/maxscale/worker.h index 102db5466..e8819b4e6 100644 --- a/include/maxscale/worker.h +++ b/include/maxscale/worker.h @@ -14,6 +14,7 @@ #include #include +#include MXS_BEGIN_DECLS @@ -23,6 +24,8 @@ typedef struct mxs_worker int id; /*< The id of the worker. */ int read_fd; /*< The file descriptor the worked reads from. */ int write_fd; /*< The file descriptor used for sending data to the worker. */ + THREAD thread; /*< The thread handle of the worker. */ + bool started; /*< Whether the thread has been started or not. */ } MXS_WORKER; enum mxs_worker_msg_id diff --git a/server/core/gateway.cc b/server/core/gateway.cc index 220238093..35581c663 100644 --- a/server/core/gateway.cc +++ b/server/core/gateway.cc @@ -64,7 +64,6 @@ #include #include #include -#include #include #include #include @@ -188,8 +187,6 @@ static bool daemonize(); static bool sniff_configuration(const char* filepath); static bool modules_process_init(); static void modules_process_finish(); -static bool modules_thread_init(); -static void modules_thread_finish(); /** SSL multi-threading functions and structures */ @@ -1300,7 +1297,6 @@ int main(int argc, char **argv) int daemon_pipe[2] = { -1, -1}; bool parent_process; int child_status; - THREAD* threads = NULL; /*< thread list */ char* cnf_file_path = NULL; /*< conf file, to be freed */ char* cnf_file_arg = NULL; /*< conf filename from cmd-line arg */ THREAD log_flush_thr; @@ -1318,6 +1314,7 @@ int main(int argc, char **argv) void (*exitfunp[4])(void) = { mxs_log_finish, cleanup_process_datadir, write_footer, NULL }; MXS_CONFIG* cnf = NULL; int numlocks = 0; + MXS_WORKER* worker; *syslog_enabled = 1; *maxlog_enabled = 1; @@ -2001,14 +1998,16 @@ int main(int argc, char **argv) * configured as the main thread will also poll. */ n_threads = config_threadcount(); - threads = (THREAD*)MXS_CALLOC(n_threads, sizeof(THREAD)); + /*< - * Start server threads. + * Start workers. We start from 1, worker 0 will be running in the main thread. */ - for (thread_id = 0; thread_id < n_threads - 1; thread_id++) + for (i = 1; i < n_threads - 1; i++) { - if (thread_start(&threads[thread_id], worker_thread_main, - (void *)(thread_id + 1)) == NULL) + MXS_WORKER* worker = mxs_worker_get(i); + ss_dassert(worker); + + if (!mxs_worker_start(worker)) { const char* logerr = "Failed to start worker thread."; print_log_n_stderr(true, true, logerr, logerr, 0); @@ -2028,9 +2027,11 @@ int main(int argc, char **argv) } /*< - * Serve clients. + * Run worker 0 in the main thread. */ - poll_waitevents((void *)0); + worker = mxs_worker_get(0); + ss_dassert(worker); + mxs_worker_main(worker); /*< * Wait for the housekeeper to finish. @@ -2038,13 +2039,18 @@ int main(int argc, char **argv) hkfinish(); /*< - * Wait server threads' completion. + * Wait for worker threads to exit. */ - for (thread_id = 0; thread_id < n_threads - 1; thread_id++) + for (i = 1; i < n_threads - 1; i++) { - thread_wait(threads[thread_id]); + MXS_WORKER *worker = mxs_worker_get(i); + ss_dassert(worker); + + mxs_worker_join(worker); } + mxs_worker_finish(); + /*< * Destroy the router and filter instances of all services. */ @@ -2093,10 +2099,6 @@ return_main: MXS_FREE(cnf_file_arg); - if (threads) - { - MXS_FREE(threads); - } if (cnf_file_path) { MXS_FREE(cnf_file_path); @@ -2908,69 +2910,3 @@ static void modules_process_finish() } } } - -/** - * Calls thread_init on all loaded modules. - * - * @return True, if all modules were successfully initialized. - */ -static bool modules_thread_init() -{ - bool initialized = false; - - MXS_MODULE_ITERATOR i = mxs_module_iterator_get(NULL); - MXS_MODULE* module = NULL; - - while ((module = mxs_module_iterator_get_next(&i)) != NULL) - { - if (module->thread_init) - { - int rc = (module->thread_init)(); - - if (rc != 0) - { - break; - } - } - } - - if (module) - { - // If module is non-NULL it means that the initialization failed for - // that module. We now need to call finish on all modules that were - // successfully initialized. - MXS_MODULE* failed_module = module; - i = mxs_module_iterator_get(NULL); - - while ((module = mxs_module_iterator_get_next(&i)) != failed_module) - { - if (module->thread_finish) - { - (module->thread_finish)(); - } - } - } - else - { - initialized = true; - } - - return initialized; -} - -/** - * Calls thread_finish on all loaded modules. - */ -static void modules_thread_finish() -{ - MXS_MODULE_ITERATOR i = mxs_module_iterator_get(NULL); - MXS_MODULE* module = NULL; - - while ((module = mxs_module_iterator_get_next(&i)) != NULL) - { - if (module->thread_finish) - { - (module->thread_finish)(); - } - } -} diff --git a/server/core/maxscale/worker.h b/server/core/maxscale/worker.h index 9d4be34f3..5b5297228 100644 --- a/server/core/maxscale/worker.h +++ b/server/core/maxscale/worker.h @@ -24,4 +24,39 @@ MXS_BEGIN_DECLS */ void mxs_worker_init(); +/** + * Finalize the worker mechanism. + * + * To be called once at process shutdown. This will cause all workers + * to be destroyed. When the function is called, no worker should be + * running anymore. + */ +void mxs_worker_finish(); + +/** + * Main function of worker. + * + * This worker will run the poll loop, until it is told to shut down. + * + * @param worker The worker. + */ +void mxs_worker_main(MXS_WORKER* worker); + +/** + * Start worker in separate thread. + * + * This function will start a new thread, in which the `mxs_worker_main` + * function will be executed. + * + * @return True if the thread could be started, false otherwise. + */ +bool mxs_worker_start(MXS_WORKER* worker); + +/** + * Waits for the worker to finish. + * + * @param worker The worker to wait for. + */ +void mxs_worker_join(MXS_WORKER* worker); + MXS_END_DECLS diff --git a/server/core/worker.c b/server/core/worker.c index 48fff9126..32061f7ad 100644 --- a/server/core/worker.c +++ b/server/core/worker.c @@ -19,6 +19,8 @@ #include #include #include +#include "maxscale/modules.h" +#include "maxscale/poll.h" /** * Unit variables. @@ -44,6 +46,10 @@ static MXS_WORKER* worker_create(int worker_id); static void worker_free(MXS_WORKER* worker); static void worker_message_handler(MXS_WORKER* worker, int msg_id, int64_t arg1, void* arg2); static uint32_t worker_poll_handler(MXS_POLL_DATA *data, int worker_id, uint32_t events); +static void worker_thread_main(void* arg); + +static bool modules_thread_init(); +static void modules_thread_finish(); void mxs_worker_init() @@ -73,6 +79,17 @@ void mxs_worker_init() MXS_NOTICE("Workers created!"); } +void mxs_worker_finish() +{ + for (int i = 0; i < this_unit.n_workers; ++i) + { + MXS_WORKER* worker = this_unit.workers[i]; + + worker_free(worker); + this_unit.workers[i] = NULL; + } +} + MXS_WORKER* mxs_worker_get(int worker_id) { ss_dassert(worker_id < this_unit.n_workers); @@ -89,6 +106,30 @@ bool mxs_worker_post_message(MXS_WORKER *worker, int id, int64_t arg1, void* arg return n == sizeof(message) ? true : false; } +void mxs_worker_main(MXS_WORKER* worker) +{ + poll_waitevents((void*)(intptr_t)worker->id); +} + +bool mxs_worker_start(MXS_WORKER* worker) +{ + if (thread_start(&worker->thread, worker_thread_main, worker)) + { + worker->started = true; + } + + return worker->started; +} + +void mxs_worker_join(MXS_WORKER* worker) +{ + if (worker->started) + { + thread_wait(worker->thread); + worker->started = false; + } +} + /** * Creates a worker instance. * - Allocates the structure. @@ -146,6 +187,9 @@ static void worker_free(MXS_WORKER* worker) { if (worker) { + ss_dassert(!worker->started); + + poll_remove_fd_from_worker(worker->id, worker->read_fd); close(worker->read_fd); close(worker->write_fd); @@ -237,3 +281,90 @@ static uint32_t worker_poll_handler(MXS_POLL_DATA *data, int thread_id, uint32_t return rc; } + +/** + * The entry point of each worker thread. + * + * @param arg A worker. + */ +static void worker_thread_main(void* arg) +{ + if (modules_thread_init()) + { + MXS_WORKER *worker = (MXS_WORKER*)arg; + + mxs_worker_main(worker); + + modules_thread_finish(); + } + else + { + MXS_ERROR("Could not perform thread initialization for all modules. Thread exits."); + } +} + +/** + * Calls thread_init on all loaded modules. + * + * @return True, if all modules were successfully initialized. + */ +static bool modules_thread_init() +{ + bool initialized = false; + + MXS_MODULE_ITERATOR i = mxs_module_iterator_get(NULL); + MXS_MODULE* module = NULL; + + while ((module = mxs_module_iterator_get_next(&i)) != NULL) + { + if (module->thread_init) + { + int rc = (module->thread_init)(); + + if (rc != 0) + { + break; + } + } + } + + if (module) + { + // If module is non-NULL it means that the initialization failed for + // that module. We now need to call finish on all modules that were + // successfully initialized. + MXS_MODULE* failed_module = module; + i = mxs_module_iterator_get(NULL); + + while ((module = mxs_module_iterator_get_next(&i)) != failed_module) + { + if (module->thread_finish) + { + (module->thread_finish)(); + } + } + } + else + { + initialized = true; + } + + return initialized; +} + +/** + * Calls thread_finish on all loaded modules. + */ +static void modules_thread_finish() +{ + MXS_MODULE_ITERATOR i = mxs_module_iterator_get(NULL); + MXS_MODULE* module = NULL; + + while ((module = mxs_module_iterator_get_next(&i)) != NULL) + { + if (module->thread_finish) + { + (module->thread_finish)(); + } + } +}