MXS-1754 Enable workers other than routing workers

The maximum number of workers and routing workers are now
hardwired to 128 and 100, respectively. It is still so that
all workers must be created at startup and destroyed at
shutdown; creating/destroying workers at runtime is not
possible.
This commit is contained in:
Johan Wikman
2018-04-17 15:06:23 +03:00
parent 289f4990d6
commit c011b22046
4 changed files with 89 additions and 47 deletions

View File

@ -64,11 +64,15 @@ MXS_BEGIN_DECLS
/**
* MXS_MAX_THREADS
*
* Thread information is stored in a bitmask whose size must be a
* multiple of 8. The bitmask is indexed using the thread id that start
* from 1. Hence, the hard maximum number of threads must be a
* multiple of 8 minus 1.
* The maximum number of threads/workers.
*/
#define MXS_MAX_THREADS 255
#define MXS_MAX_THREADS 128
/**
* MXS_MAX_ROUTING_THREADS
*
* The maximum number of routing threads/workers.
*/
#define MXS_MAX_ROUTING_THREADS 100
MXS_END_DECLS

View File

@ -1481,12 +1481,12 @@ handle_global_item(const char *name, const char *value)
}
}
if (gateway.n_threads > MXS_MAX_THREADS)
if (gateway.n_threads > MXS_MAX_ROUTING_THREADS)
{
MXS_WARNING("Number of threads set to %d, which is greater than the "
"hard maximum of %d. Number of threads adjusted down "
"accordingly.", gateway.n_threads, MXS_MAX_THREADS);
gateway.n_threads = MXS_MAX_THREADS;
"accordingly.", gateway.n_threads, MXS_MAX_ROUTING_THREADS);
gateway.n_threads = MXS_MAX_ROUTING_THREADS;
}
}
else if (strcmp(name, CN_THREAD_STACK_SIZE) == 0)

View File

@ -25,7 +25,7 @@
#include <maxscale/atomic.h>
#include <maxscale/config.h>
#include <maxscale/clock.h>
#include <maxscale/log_manager.h>
#include <maxscale/limits.h>
#include <maxscale/platform.h>
#include <maxscale/semaphore.hh>
#include <maxscale/json_api.h>
@ -67,6 +67,8 @@ struct this_unit
int max_poll_sleep; // Maximum block time
int epoll_listener_fd; // Shared epoll descriptor for listening descriptors.
int id_main_worker; // The id of the worker running in the main thread.
int id_min_worker; // The smallest routing worker id.
int id_max_worker; // The largest routing worker id.
} this_unit =
{
false, // initialized
@ -75,7 +77,9 @@ struct this_unit
0, // number_poll_spins
0, // max_poll_sleep
-1, // epoll_listener_fd
WORKER_ABSENT_ID // id_main_worker
WORKER_ABSENT_ID, // id_main_worker
WORKER_ABSENT_ID, // id_min_worker
WORKER_ABSENT_ID, // id_max_worker
};
thread_local struct this_thread
@ -180,11 +184,14 @@ bool RoutingWorker::init()
if (this_unit.epoll_listener_fd != -1)
{
int nWorkers = config_threadcount();
RoutingWorker** ppWorkers = new (std::nothrow) RoutingWorker* [nWorkers] (); // Zero inited array
RoutingWorker** ppWorkers = new (std::nothrow) RoutingWorker* [MXS_MAX_THREADS] (); // 0-inited array
if (ppWorkers)
{
int id_main_worker = WORKER_ABSENT_ID;
int id_min_worker = INT_MAX;
int id_max_worker = INT_MIN;
int i;
for (i = 0; i < nWorkers; ++i)
{
@ -192,10 +199,22 @@ bool RoutingWorker::init()
if (pWorker)
{
int id = pWorker->id();
// The first created worker will be the main worker.
if (id_main_worker == WORKER_ABSENT_ID)
{
id_main_worker = pWorker->id();
id_main_worker = id;
}
if (id < id_min_worker)
{
id_min_worker = id;
}
if (id > id_max_worker)
{
id_max_worker = id;
}
ppWorkers[i] = pWorker;
@ -218,6 +237,8 @@ bool RoutingWorker::init()
this_unit.ppWorkers = ppWorkers;
this_unit.nWorkers = nWorkers;
this_unit.id_main_worker = id_main_worker;
this_unit.id_min_worker = id_min_worker;
this_unit.id_max_worker = id_max_worker;
this_unit.initialized = true;
}
@ -250,9 +271,10 @@ void RoutingWorker::finish()
{
ss_dassert(this_unit.initialized);
for (int i = this_unit.nWorkers - 1; i >= 0; --i)
for (int i = this_unit.id_max_worker; i >= this_unit.id_min_worker; --i)
{
RoutingWorker* pWorker = this_unit.ppWorkers[i];
ss_dassert(pWorker);
delete pWorker;
this_unit.ppWorkers[i] = NULL;
@ -329,7 +351,7 @@ RoutingWorker* RoutingWorker::get(int worker_id)
worker_id = this_unit.id_main_worker;
}
ss_dassert(worker_id < this_unit.nWorkers);
ss_dassert((worker_id >= this_unit.id_min_worker) && (worker_id <= this_unit.id_max_worker));
return this_unit.ppWorkers[worker_id];
}
@ -359,19 +381,22 @@ bool RoutingWorker::start_threaded_workers()
bool rv = true;
size_t stack_size = config_thread_stack_size();
// The first RoutingWorker will be run in the main thread, so
// we start from 1 and not 0.
for (int i = 1; i < this_unit.nWorkers; ++i)
for (int i = this_unit.id_min_worker; i <= this_unit.id_max_worker; ++i)
{
RoutingWorker* pWorker = this_unit.ppWorkers[i];
ss_dassert(pWorker);
if (!pWorker->start(stack_size))
// The main RoutingWorker will run in the main thread, so
// we exclude that.
if (i != this_unit.id_main_worker)
{
MXS_ALERT("Could not start routing worker %d of %d.", i, this_unit.nWorkers);
rv = false;
// At startup, so we don't even try to clean up.
break;
RoutingWorker* pWorker = this_unit.ppWorkers[i];
ss_dassert(pWorker);
if (!pWorker->start(stack_size))
{
MXS_ALERT("Could not start routing worker %d of %d.", i, config_threadcount());
rv = false;
// At startup, so we don't even try to clean up.
break;
}
}
}
@ -381,12 +406,15 @@ bool RoutingWorker::start_threaded_workers()
//static
void RoutingWorker::join_threaded_workers()
{
for (int i = 1; i < this_unit.nWorkers; i++)
for (int i = this_unit.id_min_worker; i <= this_unit.id_max_worker; i++)
{
RoutingWorker* pWorker = this_unit.ppWorkers[i];
ss_dassert(pWorker);
if (i != this_unit.id_main_worker)
{
RoutingWorker* pWorker = this_unit.ppWorkers[i];
ss_dassert(pWorker);
pWorker->join();
pWorker->join();
}
}
}

View File

@ -25,6 +25,7 @@
#include <maxscale/atomic.h>
#include <maxscale/config.h>
#include <maxscale/clock.h>
#include <maxscale/limits.h>
#include <maxscale/log_manager.h>
#include <maxscale/platform.h>
#include <maxscale/semaphore.hh>
@ -57,13 +58,11 @@ const int MXS_WORKER_MSG_DISPOSABLE_TASK = -2;
struct this_unit
{
bool initialized; // Whether the initialization has been performed.
int n_workers; // How many workers there are.
Worker** ppWorkers; // Array of worker instances.
int next_worker_id; // Next worker id.
} this_unit =
{
false, // initialized
0, // n_workers
NULL, // ppWorkers
0, // next_worker_id
};
@ -197,13 +196,11 @@ bool Worker::init()
{
ss_dassert(!this_unit.initialized);
int n_workers = config_threadcount();
Worker** ppWorkers = new (std::nothrow) Worker* [n_workers] (); // Zero initialized array
Worker** ppWorkers = new (std::nothrow) Worker* [MXS_MAX_THREADS] (); // Zero initialized array
if (ppWorkers)
{
this_unit.ppWorkers = ppWorkers;
this_unit.n_workers = n_workers;
this_unit.initialized = true;
}
@ -233,7 +230,9 @@ int64_t one_stats_get(int64_t Worker::STATISTICS::*what, enum ts_stats_type type
{
int64_t best = type == TS_STATS_MAX ? LONG_MIN : (type == TS_STATS_MIX ? LONG_MAX : 0);
for (int i = 0; i < this_unit.n_workers; ++i)
int nWorkers = this_unit.next_worker_id;
for (int i = 0; i < nWorkers; ++i)
{
Worker* pWorker = Worker::get(i);
ss_dassert(pWorker);
@ -265,7 +264,7 @@ int64_t one_stats_get(int64_t Worker::STATISTICS::*what, enum ts_stats_type type
}
}
return type == TS_STATS_AVG ? best / this_unit.n_workers : best;
return type == TS_STATS_AVG ? best / (nWorkers != 0 ? nWorkers : 1) : best;
}
}
@ -291,7 +290,7 @@ Worker::STATISTICS Worker::get_statistics()
for (int i = 0; i < Worker::STATISTICS::MAXNFDS - 1; i++)
{
for (int j = 0; j < this_unit.n_workers; ++j)
for (int j = 0; j < this_unit.next_worker_id; ++j)
{
Worker* pWorker = Worker::get(j);
ss_dassert(pWorker);
@ -302,7 +301,9 @@ Worker::STATISTICS Worker::get_statistics()
for (int i = 0; i <= Worker::STATISTICS::N_QUEUE_TIMES; ++i)
{
for (int j = 0; j < this_unit.n_workers; ++j)
int nWorkers = this_unit.next_worker_id;
for (int j = 0; j < nWorkers; ++j)
{
Worker* pWorker = Worker::get(j);
ss_dassert(pWorker);
@ -311,8 +312,8 @@ Worker::STATISTICS Worker::get_statistics()
cs.exectimes[i] += pWorker->statistics().exectimes[i];
}
cs.qtimes[i] /= this_unit.n_workers;
cs.exectimes[i] /= this_unit.n_workers;
cs.qtimes[i] /= (nWorkers != 0 ? nWorkers : 1);
cs.exectimes[i] /= (nWorkers != 0 ? nWorkers : 1);
}
return cs;
@ -440,7 +441,7 @@ bool Worker::remove_fd(int fd)
Worker* Worker::get(int worker_id)
{
ss_dassert(worker_id < this_unit.n_workers);
ss_dassert((worker_id >= 0) && (worker_id < this_unit.next_worker_id));
return this_unit.ppWorkers[worker_id];
}
@ -528,9 +529,11 @@ size_t Worker::broadcast(Task* pTask, Semaphore* pSem)
// No logging here, function must be signal safe.
size_t n = 0;
for (int i = 0; i < this_unit.n_workers; ++i)
int nWorkers = this_unit.next_worker_id;
for (int i = 0; i < nWorkers; ++i)
{
Worker* pWorker = this_unit.ppWorkers[i];
ss_dassert(pWorker);
if (pWorker->post(pTask, pSem))
{
@ -549,9 +552,11 @@ size_t Worker::broadcast(std::auto_ptr<DisposableTask> sTask)
size_t n = 0;
for (int i = 0; i < this_unit.n_workers; ++i)
int nWorkers = this_unit.next_worker_id;
for (int i = 0; i < nWorkers; ++i)
{
Worker* pWorker = this_unit.ppWorkers[i];
ss_dassert(pWorker);
if (pWorker->post_disposable(pTask))
{
@ -570,9 +575,11 @@ size_t Worker::execute_serially(Task& task)
Semaphore sem;
size_t n = 0;
for (int i = 0; i < this_unit.n_workers; ++i)
int nWorkers = this_unit.next_worker_id;
for (int i = 0; i < nWorkers; ++i)
{
Worker* pWorker = this_unit.ppWorkers[i];
ss_dassert(pWorker);
if (pWorker->post(&task, &sem))
{
@ -605,9 +612,11 @@ size_t Worker::broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2)
size_t n = 0;
for (int i = 0; i < this_unit.n_workers; ++i)
int nWorkers = this_unit.next_worker_id;
for (int i = 0; i < nWorkers; ++i)
{
Worker* pWorker = this_unit.ppWorkers[i];
ss_dassert(pWorker);
if (pWorker->post_message(msg_id, arg1, arg2))
{
@ -672,9 +681,10 @@ void Worker::shutdown()
void Worker::shutdown_all()
{
// NOTE: No logging here, this function must be signal safe.
ss_dassert((this_unit.n_workers == 0) || (this_unit.ppWorkers != NULL));
ss_dassert((this_unit.next_worker_id == 0) || (this_unit.ppWorkers != NULL));
for (int i = 0; i < this_unit.n_workers; ++i)
int nWorkers = this_unit.next_worker_id;
for (int i = 0; i < nWorkers; ++i)
{
Worker* pWorker = this_unit.ppWorkers[i];
ss_dassert(pWorker);