diff --git a/include/maxscale/limits.h b/include/maxscale/limits.h index c167bb044..b5fbebfcb 100644 --- a/include/maxscale/limits.h +++ b/include/maxscale/limits.h @@ -64,11 +64,15 @@ MXS_BEGIN_DECLS /** * MXS_MAX_THREADS * - * Thread information is stored in a bitmask whose size must be a - * multiple of 8. The bitmask is indexed using the thread id that start - * from 1. Hence, the hard maximum number of threads must be a - * multiple of 8 minus 1. + * The maximum number of threads/workers. */ -#define MXS_MAX_THREADS 255 +#define MXS_MAX_THREADS 128 + +/** + * MXS_MAX_ROUTING_THREADS + * + * The maximum number of routing threads/workers. + */ +#define MXS_MAX_ROUTING_THREADS 100 MXS_END_DECLS diff --git a/server/core/config.cc b/server/core/config.cc index 0b23d3e20..5f44fed37 100644 --- a/server/core/config.cc +++ b/server/core/config.cc @@ -1481,12 +1481,12 @@ handle_global_item(const char *name, const char *value) } } - if (gateway.n_threads > MXS_MAX_THREADS) + if (gateway.n_threads > MXS_MAX_ROUTING_THREADS) { MXS_WARNING("Number of threads set to %d, which is greater than the " "hard maximum of %d. Number of threads adjusted down " - "accordingly.", gateway.n_threads, MXS_MAX_THREADS); - gateway.n_threads = MXS_MAX_THREADS; + "accordingly.", gateway.n_threads, MXS_MAX_ROUTING_THREADS); + gateway.n_threads = MXS_MAX_ROUTING_THREADS; } } else if (strcmp(name, CN_THREAD_STACK_SIZE) == 0) diff --git a/server/core/routingworker.cc b/server/core/routingworker.cc index 777cf5111..e945c40e1 100644 --- a/server/core/routingworker.cc +++ b/server/core/routingworker.cc @@ -25,7 +25,7 @@ #include #include #include -#include +#include #include #include #include @@ -67,6 +67,8 @@ struct this_unit int max_poll_sleep; // Maximum block time int epoll_listener_fd; // Shared epoll descriptor for listening descriptors. int id_main_worker; // The id of the worker running in the main thread. + int id_min_worker; // The smallest routing worker id. + int id_max_worker; // The largest routing worker id. } this_unit = { false, // initialized @@ -75,7 +77,9 @@ struct this_unit 0, // number_poll_spins 0, // max_poll_sleep -1, // epoll_listener_fd - WORKER_ABSENT_ID // id_main_worker + WORKER_ABSENT_ID, // id_main_worker + WORKER_ABSENT_ID, // id_min_worker + WORKER_ABSENT_ID, // id_max_worker }; thread_local struct this_thread @@ -180,11 +184,14 @@ bool RoutingWorker::init() if (this_unit.epoll_listener_fd != -1) { int nWorkers = config_threadcount(); - RoutingWorker** ppWorkers = new (std::nothrow) RoutingWorker* [nWorkers] (); // Zero inited array + RoutingWorker** ppWorkers = new (std::nothrow) RoutingWorker* [MXS_MAX_THREADS] (); // 0-inited array if (ppWorkers) { int id_main_worker = WORKER_ABSENT_ID; + int id_min_worker = INT_MAX; + int id_max_worker = INT_MIN; + int i; for (i = 0; i < nWorkers; ++i) { @@ -192,10 +199,22 @@ bool RoutingWorker::init() if (pWorker) { + int id = pWorker->id(); + // The first created worker will be the main worker. if (id_main_worker == WORKER_ABSENT_ID) { - id_main_worker = pWorker->id(); + id_main_worker = id; + } + + if (id < id_min_worker) + { + id_min_worker = id; + } + + if (id > id_max_worker) + { + id_max_worker = id; } ppWorkers[i] = pWorker; @@ -218,6 +237,8 @@ bool RoutingWorker::init() this_unit.ppWorkers = ppWorkers; this_unit.nWorkers = nWorkers; this_unit.id_main_worker = id_main_worker; + this_unit.id_min_worker = id_min_worker; + this_unit.id_max_worker = id_max_worker; this_unit.initialized = true; } @@ -250,9 +271,10 @@ void RoutingWorker::finish() { ss_dassert(this_unit.initialized); - for (int i = this_unit.nWorkers - 1; i >= 0; --i) + for (int i = this_unit.id_max_worker; i >= this_unit.id_min_worker; --i) { RoutingWorker* pWorker = this_unit.ppWorkers[i]; + ss_dassert(pWorker); delete pWorker; this_unit.ppWorkers[i] = NULL; @@ -329,7 +351,7 @@ RoutingWorker* RoutingWorker::get(int worker_id) worker_id = this_unit.id_main_worker; } - ss_dassert(worker_id < this_unit.nWorkers); + ss_dassert((worker_id >= this_unit.id_min_worker) && (worker_id <= this_unit.id_max_worker)); return this_unit.ppWorkers[worker_id]; } @@ -359,19 +381,22 @@ bool RoutingWorker::start_threaded_workers() bool rv = true; size_t stack_size = config_thread_stack_size(); - // The first RoutingWorker will be run in the main thread, so - // we start from 1 and not 0. - for (int i = 1; i < this_unit.nWorkers; ++i) + for (int i = this_unit.id_min_worker; i <= this_unit.id_max_worker; ++i) { - RoutingWorker* pWorker = this_unit.ppWorkers[i]; - ss_dassert(pWorker); - - if (!pWorker->start(stack_size)) + // The main RoutingWorker will run in the main thread, so + // we exclude that. + if (i != this_unit.id_main_worker) { - MXS_ALERT("Could not start routing worker %d of %d.", i, this_unit.nWorkers); - rv = false; - // At startup, so we don't even try to clean up. - break; + RoutingWorker* pWorker = this_unit.ppWorkers[i]; + ss_dassert(pWorker); + + if (!pWorker->start(stack_size)) + { + MXS_ALERT("Could not start routing worker %d of %d.", i, config_threadcount()); + rv = false; + // At startup, so we don't even try to clean up. + break; + } } } @@ -381,12 +406,15 @@ bool RoutingWorker::start_threaded_workers() //static void RoutingWorker::join_threaded_workers() { - for (int i = 1; i < this_unit.nWorkers; i++) + for (int i = this_unit.id_min_worker; i <= this_unit.id_max_worker; i++) { - RoutingWorker* pWorker = this_unit.ppWorkers[i]; - ss_dassert(pWorker); + if (i != this_unit.id_main_worker) + { + RoutingWorker* pWorker = this_unit.ppWorkers[i]; + ss_dassert(pWorker); - pWorker->join(); + pWorker->join(); + } } } diff --git a/server/core/worker.cc b/server/core/worker.cc index c8f380e9e..67326454f 100644 --- a/server/core/worker.cc +++ b/server/core/worker.cc @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -57,13 +58,11 @@ const int MXS_WORKER_MSG_DISPOSABLE_TASK = -2; struct this_unit { bool initialized; // Whether the initialization has been performed. - int n_workers; // How many workers there are. Worker** ppWorkers; // Array of worker instances. int next_worker_id; // Next worker id. } this_unit = { false, // initialized - 0, // n_workers NULL, // ppWorkers 0, // next_worker_id }; @@ -197,13 +196,11 @@ bool Worker::init() { ss_dassert(!this_unit.initialized); - int n_workers = config_threadcount(); - Worker** ppWorkers = new (std::nothrow) Worker* [n_workers] (); // Zero initialized array + Worker** ppWorkers = new (std::nothrow) Worker* [MXS_MAX_THREADS] (); // Zero initialized array if (ppWorkers) { this_unit.ppWorkers = ppWorkers; - this_unit.n_workers = n_workers; this_unit.initialized = true; } @@ -233,7 +230,9 @@ int64_t one_stats_get(int64_t Worker::STATISTICS::*what, enum ts_stats_type type { int64_t best = type == TS_STATS_MAX ? LONG_MIN : (type == TS_STATS_MIX ? LONG_MAX : 0); - for (int i = 0; i < this_unit.n_workers; ++i) + int nWorkers = this_unit.next_worker_id; + + for (int i = 0; i < nWorkers; ++i) { Worker* pWorker = Worker::get(i); ss_dassert(pWorker); @@ -265,7 +264,7 @@ int64_t one_stats_get(int64_t Worker::STATISTICS::*what, enum ts_stats_type type } } - return type == TS_STATS_AVG ? best / this_unit.n_workers : best; + return type == TS_STATS_AVG ? best / (nWorkers != 0 ? nWorkers : 1) : best; } } @@ -291,7 +290,7 @@ Worker::STATISTICS Worker::get_statistics() for (int i = 0; i < Worker::STATISTICS::MAXNFDS - 1; i++) { - for (int j = 0; j < this_unit.n_workers; ++j) + for (int j = 0; j < this_unit.next_worker_id; ++j) { Worker* pWorker = Worker::get(j); ss_dassert(pWorker); @@ -302,7 +301,9 @@ Worker::STATISTICS Worker::get_statistics() for (int i = 0; i <= Worker::STATISTICS::N_QUEUE_TIMES; ++i) { - for (int j = 0; j < this_unit.n_workers; ++j) + int nWorkers = this_unit.next_worker_id; + + for (int j = 0; j < nWorkers; ++j) { Worker* pWorker = Worker::get(j); ss_dassert(pWorker); @@ -311,8 +312,8 @@ Worker::STATISTICS Worker::get_statistics() cs.exectimes[i] += pWorker->statistics().exectimes[i]; } - cs.qtimes[i] /= this_unit.n_workers; - cs.exectimes[i] /= this_unit.n_workers; + cs.qtimes[i] /= (nWorkers != 0 ? nWorkers : 1); + cs.exectimes[i] /= (nWorkers != 0 ? nWorkers : 1); } return cs; @@ -440,7 +441,7 @@ bool Worker::remove_fd(int fd) Worker* Worker::get(int worker_id) { - ss_dassert(worker_id < this_unit.n_workers); + ss_dassert((worker_id >= 0) && (worker_id < this_unit.next_worker_id)); return this_unit.ppWorkers[worker_id]; } @@ -528,9 +529,11 @@ size_t Worker::broadcast(Task* pTask, Semaphore* pSem) // No logging here, function must be signal safe. size_t n = 0; - for (int i = 0; i < this_unit.n_workers; ++i) + int nWorkers = this_unit.next_worker_id; + for (int i = 0; i < nWorkers; ++i) { Worker* pWorker = this_unit.ppWorkers[i]; + ss_dassert(pWorker); if (pWorker->post(pTask, pSem)) { @@ -549,9 +552,11 @@ size_t Worker::broadcast(std::auto_ptr sTask) size_t n = 0; - for (int i = 0; i < this_unit.n_workers; ++i) + int nWorkers = this_unit.next_worker_id; + for (int i = 0; i < nWorkers; ++i) { Worker* pWorker = this_unit.ppWorkers[i]; + ss_dassert(pWorker); if (pWorker->post_disposable(pTask)) { @@ -570,9 +575,11 @@ size_t Worker::execute_serially(Task& task) Semaphore sem; size_t n = 0; - for (int i = 0; i < this_unit.n_workers; ++i) + int nWorkers = this_unit.next_worker_id; + for (int i = 0; i < nWorkers; ++i) { Worker* pWorker = this_unit.ppWorkers[i]; + ss_dassert(pWorker); if (pWorker->post(&task, &sem)) { @@ -605,9 +612,11 @@ size_t Worker::broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) size_t n = 0; - for (int i = 0; i < this_unit.n_workers; ++i) + int nWorkers = this_unit.next_worker_id; + for (int i = 0; i < nWorkers; ++i) { Worker* pWorker = this_unit.ppWorkers[i]; + ss_dassert(pWorker); if (pWorker->post_message(msg_id, arg1, arg2)) { @@ -672,9 +681,10 @@ void Worker::shutdown() void Worker::shutdown_all() { // NOTE: No logging here, this function must be signal safe. - ss_dassert((this_unit.n_workers == 0) || (this_unit.ppWorkers != NULL)); + ss_dassert((this_unit.next_worker_id == 0) || (this_unit.ppWorkers != NULL)); - for (int i = 0; i < this_unit.n_workers; ++i) + int nWorkers = this_unit.next_worker_id; + for (int i = 0; i < nWorkers; ++i) { Worker* pWorker = this_unit.ppWorkers[i]; ss_dassert(pWorker);