MXS-2002 Make Worker execution mode explicit

This is the first step in some cleanup of the Worker interface.
The execution mode must now be explicitly specified, but that is
just a temporary step. Further down the road, _posting_ will
*always* mean via the message loop while _executing_ will optionally
and by default mean direct execution if the calling thread is that
of the worker.
This commit is contained in:
Johan Wikman
2018-08-09 10:58:28 +03:00
parent b9ec3f5130
commit 9cfd451a1d
8 changed files with 56 additions and 25 deletions

View File

@ -177,6 +177,12 @@ public:
* should either not have any sharable data or then it should
* have data specific to each worker that can be accessed
* without locks.
*
* @attention The task will be posted to each routing worker using the
* EXECUTE_AUTO execution mode. That is, if the calling thread
* is that of a routing worker, then the task will be executed
* directly without going through the message loop of the worker,
* otherwise the task is delivered via the message loop.
*/
static size_t broadcast(Task* pTask, Semaphore* pSem = NULL);
@ -194,6 +200,12 @@ public:
*
* @attention Once the task has been executed by all workers, it will
* be deleted.
*
* @attention The task will be posted to each routing worker using the
* EXECUTE_AUTO execution mode. That is, if the calling thread
* is that of a routing worker, then the task will be executed
* directly without going through the message loop of the worker,
* otherwise the task is delivered via the message loop.
*/
static size_t broadcast(std::auto_ptr<DisposableTask> sTask);
@ -215,6 +227,12 @@ public:
* @warning This function is extremely inefficient and will be slow compared
* to the other functions. Only use this function when printing thread-specific
* data to stdout.
*
* @attention The task will be posted to each routing worker using the
* EXECUTE_AUTO execution mode. That is, if the calling thread
* is that of a routing worker, then the task will be executed
* directly without going through the message loop of the worker,
* otherwise the task is delivered via the message loop.
*/
static size_t execute_serially(Task& task);
@ -226,6 +244,12 @@ public:
* @param task The task to be executed.
*
* @return How many workers the task was posted to.
*
* @attention The task will be posted to each routing worker using the
* EXECUTE_AUTO execution mode. That is, if the calling thread
* is that of a routing worker, then the task will be executed
* directly without going through the message loop of the worker,
* otherwise the task is delivered via the message loop.
*/
static size_t execute_concurrently(Task& task);

View File

@ -1280,7 +1280,7 @@ HttpResponse resource_handle_request(const HttpRequest& request)
mxs::Semaphore sem;
ResourceTask task(request);
worker->post(&task, &sem);
worker->post(&task, &sem, mxs::Worker::EXECUTE_AUTO);
sem.wait();
return task.result();

View File

@ -604,7 +604,7 @@ size_t RoutingWorker::broadcast(Task* pTask, Semaphore* pSem)
Worker* pWorker = this_unit.ppWorkers[i];
ss_dassert(pWorker);
if (pWorker->post(pTask, pSem))
if (pWorker->post(pTask, pSem, EXECUTE_AUTO))
{
++n;
}
@ -627,7 +627,7 @@ size_t RoutingWorker::broadcast(std::auto_ptr<DisposableTask> sTask)
RoutingWorker* pWorker = this_unit.ppWorkers[i];
ss_dassert(pWorker);
if (pWorker->post_disposable(pTask))
if (pWorker->post_disposable(pTask, EXECUTE_AUTO))
{
++n;
}
@ -650,7 +650,7 @@ size_t RoutingWorker::execute_serially(Task& task)
RoutingWorker* pWorker = this_unit.ppWorkers[i];
ss_dassert(pWorker);
if (pWorker->post(&task, &sem))
if (pWorker->post(&task, &sem, EXECUTE_AUTO))
{
sem.wait();
++n;
@ -894,7 +894,7 @@ bool RoutingWorker::get_qc_stats(int id, QC_CACHE_STATS* pStats)
{
Semaphore sem;
Task task(pStats);
pWorker->post(&task, &sem);
pWorker->post(&task, &sem, EXECUTE_AUTO);
sem.wait();
}
@ -1000,6 +1000,14 @@ std::unique_ptr<json_t> RoutingWorker::get_qc_stats_as_json(const char* zHost)
return std::unique_ptr<json_t>(mxs_json_resource(zHost, MXS_JSON_API_QC_STATS, sAll_stats.release()));
}
// static
// Picks a routing worker in round-robin order: an atomically incremented
// counter, taken modulo the number of workers and offset by the lowest
// worker id, yields the id of the next worker to use.
// NOTE(review): assumes this_unit.nWorkers > 0; also, once id_generator
// wraps past INT_MAX the modulo of a negative value would produce an
// out-of-range id — confirm whether wraparound is a practical concern.
RoutingWorker* RoutingWorker::pick_worker()
{
static int id_generator = 0;
int id = this_unit.id_min_worker + (atomic_add(&id_generator, 1) % this_unit.nWorkers);
return get(id);
}
}
size_t mxs_rworker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2)
@ -1197,7 +1205,7 @@ json_t* mxs_rworker_to_json(const char* zHost, int id)
WorkerInfoTask task(zHost, id + 1);
mxs::Semaphore sem;
target->post(&task, &sem);
target->post(&task, &sem, mxs::Worker::EXECUTE_AUTO);
sem.wait();
return task.resource(id);
@ -1209,11 +1217,3 @@ json_t* mxs_rworker_list_to_json(const char* host)
RoutingWorker::execute_concurrently(task);
return task.resource();
}
// static
RoutingWorker* RoutingWorker::pick_worker()
{
static int id_generator = 0;
int id = this_unit.id_min_worker + (atomic_add(&id_generator, 1) % this_unit.nWorkers);
return get(id);
}

View File

@ -359,7 +359,8 @@ static void session_free(MXS_SESSION *session)
{
// Destroy the service in the main routing worker thread
mxs::RoutingWorker* main_worker = mxs::RoutingWorker::get(mxs::RoutingWorker::MAIN);
main_worker->post(std::auto_ptr<ServiceDestroyTask>(new ServiceDestroyTask(service)));
main_worker->post(std::auto_ptr<ServiceDestroyTask>(new ServiceDestroyTask(service)),
mxs::Worker::EXECUTE_AUTO);
}
}

View File

@ -516,10 +516,10 @@ bool Worker::post(GenericFunction func, Semaphore* pSem, execute_mode_t mode)
return rval;
}
bool Worker::execute(GenericFunction func)
bool Worker::execute(GenericFunction func, execute_mode_t mode)
{
Semaphore sem;
return post(func, &sem, EXECUTE_AUTO) && sem.wait();
return post(func, &sem, mode) && sem.wait();
}
bool Worker::post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2)