MXS-2008 Move maxscale/worker.h to maxbase/worker.h

This commit is contained in:
Johan Wikman
2018-08-17 15:37:00 +03:00
parent 932956d5f6
commit e0cb11151f
10 changed files with 49 additions and 54 deletions

View File

@ -13,8 +13,8 @@
*/ */
#include <maxscale/cdefs.h> #include <maxscale/cdefs.h>
#include <maxbase/worker.h>
#include <maxscale/session.h> #include <maxscale/session.h>
#include <maxscale/worker.h>
MXS_BEGIN_DECLS MXS_BEGIN_DECLS
@ -31,14 +31,14 @@ MXS_BEGIN_DECLS
* @return The corresponding routing worker instance, or NULL if the * @return The corresponding routing worker instance, or NULL if the
* id does not correspond to a routing worker. * id does not correspond to a routing worker.
*/ */
MXS_WORKER* mxs_rworker_get(int worker_id); MXB_WORKER* mxs_rworker_get(int worker_id);
/** /**
* Return the current routing worker. * Return the current routing worker.
* *
* @return A routing worker, or NULL if there is no current routing worker. * @return A routing worker, or NULL if there is no current routing worker.
*/ */
MXS_WORKER* mxs_rworker_get_current(); MXB_WORKER* mxs_rworker_get_current();
/** /**
* Return the id of the current routing worker. * Return the id of the current routing worker.

View File

@ -23,9 +23,9 @@
#include <maxbase/atomic.h> #include <maxbase/atomic.h>
#include <maxbase/semaphore.hh> #include <maxbase/semaphore.hh>
#include <maxbase/worker.h>
#include <maxscale/debug.h> #include <maxscale/debug.h>
#include <maxscale/messagequeue.hh> #include <maxscale/messagequeue.hh>
#include <maxscale/worker.h>
#include <maxscale/workertask.hh> #include <maxscale/workertask.hh>
namespace maxscale namespace maxscale
@ -483,7 +483,7 @@ private:
* associated with file descriptors. Internally Worker has a thread * associated with file descriptors. Internally Worker has a thread
* and an epoll-instance of its own. * and an epoll-instance of its own.
*/ */
class Worker : public MXS_WORKER class Worker : public MXB_WORKER
, private MessageQueue::Handler , private MessageQueue::Handler
{ {
Worker(const Worker&) = delete; Worker(const Worker&) = delete;

View File

@ -17,9 +17,7 @@
MXS_BEGIN_DECLS MXS_BEGIN_DECLS
typedef MXB_WORKER MXS_WORKER; typedef enum mxb_worker_msg_id_t
enum mxs_worker_msg_id
{ {
/** /**
* Shutdown message. * Shutdown message.
@ -27,23 +25,23 @@ enum mxs_worker_msg_id
* arg1: 0 * arg1: 0
* arg2: NULL * arg2: NULL
*/ */
MXS_WORKER_MSG_SHUTDOWN, MXB_WORKER_MSG_SHUTDOWN,
/** /**
* Function call message. * Function call message.
* *
* arg1: Pointer to function with the prototype: void (*)(MXS_WORKER*, void* arg2); * arg1: Pointer to function with the prototype: void (*)(MXB_WORKER*, void* arg2);
* arg2: Second argument for the function passed in arg1. * arg2: Second argument for the function passed in arg1.
*/ */
MXS_WORKER_MSG_CALL MXB_WORKER_MSG_CALL
}; } mxb_worker_msg_id_t;
/** /**
* Return the current worker. * Return the current worker.
* *
* @return A worker, or NULL if there is no current worker. * @return A worker, or NULL if there is no current worker.
*/ */
MXS_WORKER* mxs_worker_get_current(); MXB_WORKER* mxb_worker_get_current();
/** /**
* Post a message to a worker. * Post a message to a worker.
@ -61,6 +59,6 @@ MXS_WORKER* mxs_worker_get_current();
* *
* @attention This function is signal safe. * @attention This function is signal safe.
*/ */
bool mxs_worker_post_message(MXS_WORKER* worker, uint32_t msg_id, intptr_t arg1, intptr_t arg2); bool mxb_worker_post_message(MXB_WORKER* worker, uint32_t msg_id, intptr_t arg1, intptr_t arg2);
MXS_END_DECLS MXB_END_DECLS

View File

@ -1157,7 +1157,7 @@ void dcb_close(DCB *dcb)
} }
} }
static void cb_dcb_close_in_owning_thread(MXS_WORKER*, void* data) static void cb_dcb_close_in_owning_thread(MXB_WORKER*, void* data)
{ {
DCB* dcb = static_cast<DCB*>(data); DCB* dcb = static_cast<DCB*>(data);
ss_dassert(dcb); ss_dassert(dcb);
@ -1173,13 +1173,13 @@ void dcb_close_in_owning_thread(DCB* dcb)
// TODO: reference counted, so that we could addref before posting, thus // TODO: reference counted, so that we could addref before posting, thus
// TODO: preventing too early a deletion. // TODO: preventing too early a deletion.
MXS_WORKER* worker = static_cast<MXS_WORKER*>(dcb->poll.owner); // The owning worker MXB_WORKER* worker = static_cast<MXB_WORKER*>(dcb->poll.owner); // The owning worker
ss_dassert(worker); ss_dassert(worker);
intptr_t arg1 = (intptr_t)cb_dcb_close_in_owning_thread; intptr_t arg1 = (intptr_t)cb_dcb_close_in_owning_thread;
intptr_t arg2 = (intptr_t)dcb; intptr_t arg2 = (intptr_t)dcb;
if (!mxs_worker_post_message(worker, MXS_WORKER_MSG_CALL, arg1, arg2)) if (!mxb_worker_post_message(worker, MXB_WORKER_MSG_CALL, arg1, arg2))
{ {
MXS_ERROR("Could not post dcb for closing to the owning thread.."); MXS_ERROR("Could not post dcb for closing to the owning thread..");
} }
@ -1984,7 +1984,7 @@ dcb_call_callback(DCB *dcb, DCB_REASON reason)
} }
} }
static void dcb_hangup_foreach_worker(MXS_WORKER* worker, struct server* server) static void dcb_hangup_foreach_worker(MXB_WORKER* worker, struct server* server)
{ {
RoutingWorker* rworker = static_cast<RoutingWorker*>(worker); RoutingWorker* rworker = static_cast<RoutingWorker*>(worker);
int id = rworker->id(); int id = rworker->id();
@ -2011,7 +2011,7 @@ dcb_hangup_foreach(struct server* server)
intptr_t arg1 = (intptr_t)dcb_hangup_foreach_worker; intptr_t arg1 = (intptr_t)dcb_hangup_foreach_worker;
intptr_t arg2 = (intptr_t)server; intptr_t arg2 = (intptr_t)server;
RoutingWorker::broadcast_message(MXS_WORKER_MSG_CALL, arg1, arg2); RoutingWorker::broadcast_message(MXB_WORKER_MSG_CALL, arg1, arg2);
} }
/** /**
@ -3466,7 +3466,7 @@ static bool dcb_add_to_worker(Worker* worker, DCB* dcb, uint32_t events)
intptr_t arg1 = (intptr_t)dcb_add_to_list_cb; intptr_t arg1 = (intptr_t)dcb_add_to_list_cb;
intptr_t arg2 = (intptr_t)dcb; intptr_t arg2 = (intptr_t)dcb;
if (!worker->post_message(MXS_WORKER_MSG_CALL, arg1, arg2)) if (!worker->post_message(MXB_WORKER_MSG_CALL, arg1, arg2))
{ {
MXS_ERROR("Could not post listening DCB for book-keeping to worker."); MXS_ERROR("Could not post listening DCB for book-keeping to worker.");
} }

View File

@ -48,9 +48,6 @@ using std::stringstream;
namespace namespace
{ {
const int MXS_WORKER_MSG_TASK = -1;
const int MXS_WORKER_MSG_DISPOSABLE_TASK = -2;
/** /**
* Unit variables. * Unit variables.
*/ */
@ -340,7 +337,7 @@ bool RoutingWorker::remove_shared_fd(int fd)
return rv; return rv;
} }
bool mxs_worker_should_shutdown(MXS_WORKER* pWorker) bool mxs_worker_should_shutdown(MXB_WORKER* pWorker)
{ {
return static_cast<RoutingWorker*>(pWorker)->should_shutdown(); return static_cast<RoutingWorker*>(pWorker)->should_shutdown();
} }
@ -1034,12 +1031,12 @@ MXS_SESSION* mxs_rworker_find_session(uint64_t id)
return pWorker->session_registry().lookup(id); return pWorker->session_registry().lookup(id);
} }
MXS_WORKER* mxs_rworker_get(int worker_id) MXB_WORKER* mxs_rworker_get(int worker_id)
{ {
return RoutingWorker::get(worker_id); return RoutingWorker::get(worker_id);
} }
MXS_WORKER* mxs_rworker_get_current() MXB_WORKER* mxs_rworker_get_current()
{ {
return RoutingWorker::get_current(); return RoutingWorker::get_current();
} }

View File

@ -35,8 +35,8 @@ namespace
using maxscale::Worker; using maxscale::Worker;
const int MXS_WORKER_MSG_TASK = -1; const int MXB_WORKER_MSG_TASK = -1;
const int MXS_WORKER_MSG_DISPOSABLE_TASK = -2; const int MXB_WORKER_MSG_DISPOSABLE_TASK = -2;
/** /**
* Unit variables. * Unit variables.
@ -428,7 +428,7 @@ bool Worker::execute(Task* pTask, mxb::Semaphore* pSem, enum execute_mode_t mode
intptr_t arg1 = reinterpret_cast<intptr_t>(pTask); intptr_t arg1 = reinterpret_cast<intptr_t>(pTask);
intptr_t arg2 = reinterpret_cast<intptr_t>(pSem); intptr_t arg2 = reinterpret_cast<intptr_t>(pSem);
rval = post_message(MXS_WORKER_MSG_TASK, arg1, arg2); rval = post_message(MXB_WORKER_MSG_TASK, arg1, arg2);
} }
return rval; return rval;
@ -456,7 +456,7 @@ bool Worker::post_disposable(DisposableTask* pTask, enum execute_mode_t mode)
{ {
intptr_t arg1 = reinterpret_cast<intptr_t>(pTask); intptr_t arg1 = reinterpret_cast<intptr_t>(pTask);
posted = post_message(MXS_WORKER_MSG_DISPOSABLE_TASK, arg1, 0); posted = post_message(MXB_WORKER_MSG_DISPOSABLE_TASK, arg1, 0);
if (!posted) if (!posted)
{ {
@ -598,7 +598,7 @@ void Worker::shutdown()
if (!m_shutdown_initiated) if (!m_shutdown_initiated)
{ {
if (post_message(MXS_WORKER_MSG_SHUTDOWN, 0, 0)) if (post_message(MXB_WORKER_MSG_SHUTDOWN, 0, 0))
{ {
m_shutdown_initiated = true; m_shutdown_initiated = true;
} }
@ -616,22 +616,22 @@ void Worker::handle_message(MessageQueue& queue, const MessageQueue::Message& ms
{ {
switch (msg.id()) switch (msg.id())
{ {
case MXS_WORKER_MSG_SHUTDOWN: case MXB_WORKER_MSG_SHUTDOWN:
{ {
MXS_INFO("Worker %p received shutdown message.", this); MXS_INFO("Worker %p received shutdown message.", this);
m_should_shutdown = true; m_should_shutdown = true;
} }
break; break;
case MXS_WORKER_MSG_CALL: case MXB_WORKER_MSG_CALL:
{ {
void (*f)(MXS_WORKER*, void*) = (void (*)(MXS_WORKER*, void*))msg.arg1(); void (*f)(MXB_WORKER*, void*) = (void (*)(MXB_WORKER*, void*))msg.arg1();
f(this, (void*)msg.arg2()); f(this, (void*)msg.arg2());
} }
break; break;
case MXS_WORKER_MSG_TASK: case MXB_WORKER_MSG_TASK:
{ {
Task *pTask = reinterpret_cast<Task*>(msg.arg1()); Task *pTask = reinterpret_cast<Task*>(msg.arg1());
mxb::Semaphore* pSem = reinterpret_cast<mxb::Semaphore*>(msg.arg2()); mxb::Semaphore* pSem = reinterpret_cast<mxb::Semaphore*>(msg.arg2());
@ -645,7 +645,7 @@ void Worker::handle_message(MessageQueue& queue, const MessageQueue::Message& ms
} }
break; break;
case MXS_WORKER_MSG_DISPOSABLE_TASK: case MXB_WORKER_MSG_DISPOSABLE_TASK:
{ {
DisposableTask *pTask = reinterpret_cast<DisposableTask*>(msg.arg1()); DisposableTask *pTask = reinterpret_cast<DisposableTask*>(msg.arg1());
pTask->execute(*this); pTask->execute(*this);
@ -1053,12 +1053,12 @@ bool Worker::cancel_delayed_call(uint32_t id)
} }
MXS_WORKER* mxs_worker_get_current() MXB_WORKER* mxb_worker_get_current()
{ {
return Worker::get_current(); return Worker::get_current();
} }
bool mxs_worker_post_message(MXS_WORKER* pWorker, uint32_t msg_id, intptr_t arg1, intptr_t arg2) bool mxb_worker_post_message(MXB_WORKER* pWorker, uint32_t msg_id, intptr_t arg1, intptr_t arg2)
{ {
return static_cast<Worker*>(pWorker)->post_message(msg_id, arg1, arg2); return static_cast<Worker*>(pWorker)->post_message(msg_id, arg1, arg2);
} }

View File

@ -1415,9 +1415,9 @@ void mxs_mysql_execute_kill(MXS_SESSION* issuer, uint64_t target_id, kill_type_t
for (int i = 0; i < config_threadcount(); i++) for (int i = 0; i < config_threadcount(); i++)
{ {
MXS_WORKER* worker = mxs_rworker_get(i); MXB_WORKER* worker = mxs_rworker_get(i);
ss_dassert(worker); ss_dassert(worker);
mxs_worker_post_message(worker, MXS_WORKER_MSG_CALL, (intptr_t)worker_func, mxb_worker_post_message(worker, MXB_WORKER_MSG_CALL, (intptr_t)worker_func,
(intptr_t)new ConnKillInfo(target_id, ss.str(), issuer)); (intptr_t)new ConnKillInfo(target_id, ss.str(), issuer));
} }
@ -1433,9 +1433,9 @@ void mxs_mysql_execute_kill_user(MXS_SESSION* issuer, const char* user, kill_typ
for (int i = 0; i < config_threadcount(); i++) for (int i = 0; i < config_threadcount(); i++)
{ {
MXS_WORKER* worker = mxs_rworker_get(i); MXB_WORKER* worker = mxs_rworker_get(i);
ss_dassert(worker); ss_dassert(worker);
mxs_worker_post_message(worker, MXS_WORKER_MSG_CALL, (intptr_t)worker_func, mxb_worker_post_message(worker, MXB_WORKER_MSG_CALL, (intptr_t)worker_func,
(intptr_t)new UserKillInfo(user, ss.str(), issuer)); (intptr_t)new UserKillInfo(user, ss.str(), issuer));
} }

View File

@ -36,8 +36,9 @@
#include <time.h> #include <time.h>
#include <uuid/uuid.h> #include <uuid/uuid.h>
#include <maxscale/alloc.h>
#include <maxbase/atomic.h> #include <maxbase/atomic.h>
#include <maxbase/worker.h>
#include <maxscale/alloc.h>
#include <maxscale/config.hh> #include <maxscale/config.hh>
#include <maxscale/dcb.h> #include <maxscale/dcb.h>
#include <maxscale/housekeeper.h> #include <maxscale/housekeeper.h>
@ -51,7 +52,6 @@
#include <maxscale/users.h> #include <maxscale/users.h>
#include <maxscale/utils.h> #include <maxscale/utils.h>
#include <maxscale/utils.hh> #include <maxscale/utils.hh>
#include <maxscale/worker.h>
#include <maxscale/paths.h> #include <maxscale/paths.h>
/* The router entry points */ /* The router entry points */

View File

@ -110,7 +110,7 @@ extern int blr_write_special_event(ROUTER_INSTANCE *router,
extern int blr_file_new_binlog(ROUTER_INSTANCE *router, char *file); extern int blr_file_new_binlog(ROUTER_INSTANCE *router, char *file);
static bool blr_handle_missing_files(ROUTER_INSTANCE *router, static bool blr_handle_missing_files(ROUTER_INSTANCE *router,
char *new_file); char *new_file);
static void worker_cb_start_master(MXS_WORKER*, void* data); static void worker_cb_start_master(MXB_WORKER*, void* data);
extern void blr_file_update_gtid(ROUTER_INSTANCE *router); extern void blr_file_update_gtid(ROUTER_INSTANCE *router);
static int blr_check_connect_retry(ROUTER_INSTANCE *router); static int blr_check_connect_retry(ROUTER_INSTANCE *router);
@ -283,7 +283,7 @@ static void blr_start_master(void* data)
* @param worker The worker in whose context the function is called. * @param worker The worker in whose context the function is called.
* @param data The data to be passed to `blr_start_master` * @param data The data to be passed to `blr_start_master`
*/ */
static void worker_cb_start_master(MXS_WORKER* worker, void* data) static void worker_cb_start_master(MXB_WORKER* worker, void* data)
{ {
// This is itended to be called only in the main worker. // This is itended to be called only in the main worker.
ss_dassert(worker == mxs_rworker_get(MXS_RWORKER_MAIN)); ss_dassert(worker == mxs_rworker_get(MXS_RWORKER_MAIN));
@ -301,13 +301,13 @@ bool blr_start_master_in_main(void* data)
// The master should be connected to in the main worker, so we post it a // The master should be connected to in the main worker, so we post it a
// message and call `blr_start_master` there. // message and call `blr_start_master` there.
MXS_WORKER* worker = mxs_rworker_get(MXS_RWORKER_MAIN); // The worker running in the main thread. MXB_WORKER* worker = mxs_rworker_get(MXS_RWORKER_MAIN); // The worker running in the main thread.
ss_dassert(worker); ss_dassert(worker);
intptr_t arg1 = (intptr_t)worker_cb_start_master; intptr_t arg1 = (intptr_t)worker_cb_start_master;
intptr_t arg2 = (intptr_t)data; intptr_t arg2 = (intptr_t)data;
if (!mxs_worker_post_message(worker, MXS_WORKER_MSG_CALL, arg1, arg2)) if (!mxb_worker_post_message(worker, MXB_WORKER_MSG_CALL, arg1, arg2))
{ {
MXS_ERROR("Could not post 'blr_start_master' message to main worker."); MXS_ERROR("Could not post 'blr_start_master' message to main worker.");
} }
@ -321,7 +321,7 @@ bool blr_start_master_in_main(void* data)
* @param worker_id The id of the worker in whose context the function is called. * @param worker_id The id of the worker in whose context the function is called.
* @param data The data to be passed to `blr_start_master` * @param data The data to be passed to `blr_start_master`
*/ */
static void worker_cb_close_master(MXS_WORKER* worker, void* data) static void worker_cb_close_master(MXB_WORKER* worker, void* data)
{ {
// This is itended to be called only in the main worker. // This is itended to be called only in the main worker.
ss_dassert(worker == mxs_rworker_get(MXS_RWORKER_MAIN)); ss_dassert(worker == mxs_rworker_get(MXS_RWORKER_MAIN));
@ -339,13 +339,13 @@ void blr_close_master_in_main(void* data)
// The master should be connected to in the main worker, so we post it a // The master should be connected to in the main worker, so we post it a
// message and call `blr_master_close` there. // message and call `blr_master_close` there.
MXS_WORKER* worker = mxs_rworker_get(MXS_RWORKER_MAIN); // The worker running in the main thread. MXB_WORKER* worker = mxs_rworker_get(MXS_RWORKER_MAIN); // The worker running in the main thread.
ss_dassert(worker); ss_dassert(worker);
intptr_t arg1 = (intptr_t)worker_cb_close_master; intptr_t arg1 = (intptr_t)worker_cb_close_master;
intptr_t arg2 = (intptr_t)data; intptr_t arg2 = (intptr_t)data;
if (!mxs_worker_post_message(worker, MXS_WORKER_MSG_CALL, arg1, arg2)) if (!mxb_worker_post_message(worker, MXB_WORKER_MSG_CALL, arg1, arg2))
{ {
MXS_ERROR("Could not post 'blr_master_close' message to main worker."); MXS_ERROR("Could not post 'blr_master_close' message to main worker.");
} }

View File

@ -910,7 +910,7 @@ static void cmd_AddServer(DCB *dcb, SERVER *server, char *v1, char *v2, char *v3
namespace namespace
{ {
void ping(MXS_WORKER* worker, void* arg) void ping(MXB_WORKER* worker, void* arg)
{ {
MXS_NOTICE("Worker[%p]: Alive and kicking.", worker); MXS_NOTICE("Worker[%p]: Alive and kicking.", worker);
} }
@ -921,7 +921,7 @@ void ping_workers(DCB* dcb)
{ {
intptr_t arg1 = reinterpret_cast<intptr_t>(ping); intptr_t arg1 = reinterpret_cast<intptr_t>(ping);
int n = mxs_rworker_broadcast_message(MXS_WORKER_MSG_CALL, arg1, 0); int n = mxs_rworker_broadcast_message(MXB_WORKER_MSG_CALL, arg1, 0);
dcb_printf(dcb, "Broadcasted ping message to %d workers.\n", n); dcb_printf(dcb, "Broadcasted ping message to %d workers.\n", n);
} }