diff --git a/include/maxscale/worker.hh b/include/maxscale/worker.hh index 9f343d1ba..a7870fa02 100644 --- a/include/maxscale/worker.hh +++ b/include/maxscale/worker.hh @@ -730,7 +730,12 @@ public: * MyResult& result = task.result(); * @endcode */ - bool post(Task* pTask, Semaphore* pSem = NULL, enum execute_mode_t mode = EXECUTE_AUTO); + bool post(Task* pTask, Semaphore* pSem, enum execute_mode_t mode); + + bool post(Task* pTask, enum execute_mode_t mode) + { + return post(pTask, NULL, mode); + } /** * Posts a task to a worker for execution. @@ -742,10 +747,10 @@ public: * * @attention Once the task has been executed, it will be deleted. */ - bool post(std::auto_ptr sTask, enum execute_mode_t mode = EXECUTE_AUTO); + bool post(std::auto_ptr sTask, enum execute_mode_t mode); template - bool post(std::auto_ptr sTask, enum execute_mode_t mode = EXECUTE_AUTO) + bool post(std::auto_ptr sTask, enum execute_mode_t mode) { return post(std::auto_ptr(sTask.release()), mode); } @@ -768,10 +773,11 @@ public: * semaphore. * * @param func Function to execute + * @param mode Execution mode * * @return True if function was executed on the worker */ - bool execute(GenericFunction func); + bool execute(GenericFunction func, enum execute_mode_t mode); /** * Post a message to a worker. @@ -933,7 +939,7 @@ protected: pTask->dec_ref(); } - bool post_disposable(DisposableTask* pTask, enum execute_mode_t mode = EXECUTE_AUTO); + bool post_disposable(DisposableTask* pTask, enum execute_mode_t mode); /** * Called by Worker::run() before starting the epoll loop. diff --git a/server/core/internal/routingworker.hh b/server/core/internal/routingworker.hh index 029dc2722..95deca0e5 100644 --- a/server/core/internal/routingworker.hh +++ b/server/core/internal/routingworker.hh @@ -177,6 +177,12 @@ public: * should either not have any sharable data or then it should * have data specific to each worker that can be accessed * without locks. + * + * @attention The task will be posted to each routing worker using the + * EXECUTE_AUTO execution mode. That is, if the calling thread + * is that of a routing worker, then the task will be executed + * directly without going through the message loop of the worker, + * otherwise the task is delivered via the message loop. */ static size_t broadcast(Task* pTask, Semaphore* pSem = NULL); @@ -194,6 +200,12 @@ public: * * @attention Once the task has been executed by all workers, it will * be deleted. + * + * @attention The task will be posted to each routing worker using the + * EXECUTE_AUTO execution mode. That is, if the calling thread + * is that of a routing worker, then the task will be executed + * directly without going through the message loop of the worker, + * otherwise the task is delivered via the message loop. */ static size_t broadcast(std::auto_ptr sTask); @@ -215,6 +227,12 @@ public: * @warning This function is extremely inefficient and will be slow compared * to the other functions. Only use this function when printing thread-specific * data to stdout. + * + * @attention The task will be posted to each routing worker using the + * EXECUTE_AUTO execution mode. That is, if the calling thread + * is that of a routing worker, then the task will be executed + * directly without going through the message loop of the worker, + * otherwise the task is delivered via the message loop. */ static size_t execute_serially(Task& task); @@ -226,6 +244,12 @@ public: * @param task The task to be executed. * * @return How many workers the task was posted to. + * + * @attention The task will be posted to each routing worker using the + * EXECUTE_AUTO execution mode. That is, if the calling thread + * is that of a routing worker, then the task will be executed + * directly without going through the message loop of the worker, + * otherwise the task is delivered via the message loop. */ static size_t execute_concurrently(Task& task); diff --git a/server/core/resource.cc b/server/core/resource.cc index 39dda0b0a..8df96f48b 100644 --- a/server/core/resource.cc +++ b/server/core/resource.cc @@ -1280,7 +1280,7 @@ HttpResponse resource_handle_request(const HttpRequest& request) mxs::Semaphore sem; ResourceTask task(request); - worker->post(&task, &sem); + worker->post(&task, &sem, mxs::Worker::EXECUTE_AUTO); sem.wait(); return task.result(); diff --git a/server/core/routingworker.cc b/server/core/routingworker.cc index 20052eff9..1d951df39 100644 --- a/server/core/routingworker.cc +++ b/server/core/routingworker.cc @@ -604,7 +604,7 @@ size_t RoutingWorker::broadcast(Task* pTask, Semaphore* pSem) Worker* pWorker = this_unit.ppWorkers[i]; ss_dassert(pWorker); - if (pWorker->post(pTask, pSem)) + if (pWorker->post(pTask, pSem, EXECUTE_AUTO)) { ++n; } @@ -627,7 +627,7 @@ size_t RoutingWorker::broadcast(std::auto_ptr sTask) RoutingWorker* pWorker = this_unit.ppWorkers[i]; ss_dassert(pWorker); - if (pWorker->post_disposable(pTask)) + if (pWorker->post_disposable(pTask, EXECUTE_AUTO)) { ++n; } @@ -650,7 +650,7 @@ size_t RoutingWorker::execute_serially(Task& task) RoutingWorker* pWorker = this_unit.ppWorkers[i]; ss_dassert(pWorker); - if (pWorker->post(&task, &sem)) + if (pWorker->post(&task, &sem, EXECUTE_AUTO)) { sem.wait(); ++n; @@ -894,7 +894,7 @@ bool RoutingWorker::get_qc_stats(int id, QC_CACHE_STATS* pStats) { Semaphore sem; Task task(pStats); - pWorker->post(&task, &sem); + pWorker->post(&task, &sem, EXECUTE_AUTO); sem.wait(); } @@ -1000,6 +1000,14 @@ std::unique_ptr RoutingWorker::get_qc_stats_as_json(const char* zHost) return std::unique_ptr(mxs_json_resource(zHost, MXS_JSON_API_QC_STATS, sAll_stats.release())); } +// static +RoutingWorker* RoutingWorker::pick_worker() +{ + static int id_generator = 0; + int id = this_unit.id_min_worker + (atomic_add(&id_generator, 1) % this_unit.nWorkers); + return get(id); +} + } size_t mxs_rworker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) @@ -1197,7 +1205,7 @@ json_t* mxs_rworker_to_json(const char* zHost, int id) WorkerInfoTask task(zHost, id + 1); mxs::Semaphore sem; - target->post(&task, &sem); + target->post(&task, &sem, mxs::Worker::EXECUTE_AUTO); sem.wait(); return task.resource(id); @@ -1209,11 +1217,3 @@ json_t* mxs_rworker_list_to_json(const char* host) RoutingWorker::execute_concurrently(task); return task.resource(); } - -// static -RoutingWorker* RoutingWorker::pick_worker() -{ - static int id_generator = 0; - int id = this_unit.id_min_worker + (atomic_add(&id_generator, 1) % this_unit.nWorkers); - return get(id); -} diff --git a/server/core/session.cc b/server/core/session.cc index 9ef4c1fe9..0331eca14 100644 --- a/server/core/session.cc +++ b/server/core/session.cc @@ -359,7 +359,8 @@ static void session_free(MXS_SESSION *session) { // Destroy the service in the main routing worker thread mxs::RoutingWorker* main_worker = mxs::RoutingWorker::get(mxs::RoutingWorker::MAIN); - main_worker->post(std::auto_ptr(new ServiceDestroyTask(service))); + main_worker->post(std::auto_ptr(new ServiceDestroyTask(service)), + mxs::Worker::EXECUTE_AUTO); } } diff --git a/server/core/worker.cc b/server/core/worker.cc index 9e8244a70..40d8a2a63 100644 --- a/server/core/worker.cc +++ b/server/core/worker.cc @@ -516,10 +516,10 @@ bool Worker::post(GenericFunction func, Semaphore* pSem, execute_mode_t mode) return rval; } -bool Worker::execute(GenericFunction func) +bool Worker::execute(GenericFunction func, execute_mode_t mode) { Semaphore sem; - return post(func, &sem, EXECUTE_AUTO) && sem.wait(); + return post(func, &sem, mode) && sem.wait(); } bool Worker::post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) diff --git a/server/modules/monitor/mariadbmon/mariadbmon.cc b/server/modules/monitor/mariadbmon/mariadbmon.cc index 52984686c..1c1867277 100644 --- a/server/modules/monitor/mariadbmon/mariadbmon.cc +++ b/server/modules/monitor/mariadbmon/mariadbmon.cc @@ -263,7 +263,7 @@ void MariaDBMonitor::diagnostics(DCB *dcb) const diag_str = diagnostics_to_string(); }; - if (!mutable_ptr->execute(func)) + if (!mutable_ptr->execute(func, Worker::EXECUTE_AUTO)) { diag_str = DIAG_ERROR; } @@ -310,7 +310,7 @@ json_t* MariaDBMonitor::diagnostics_json() const rval = diagnostics_to_json(); }; - if (!mutable_ptr->execute(func)) + if (!mutable_ptr->execute(func, Worker::EXECUTE_AUTO)) { rval = mxs_json_error_append(rval, "%s", DIAG_ERROR); } diff --git a/server/modules/routing/avrorouter/avro_main.cc b/server/modules/routing/avrorouter/avro_main.cc index 4e17ec19f..d9123c97b 100644 --- a/server/modules/routing/avrorouter/avro_main.cc +++ b/server/modules/routing/avrorouter/avro_main.cc @@ -335,7 +335,7 @@ static bool conversion_task_ctl(Avro *inst, bool start) if (task.get()) { - worker->post(task); + worker->post(task, Worker::EXECUTE_AUTO); rval = true; } }