diff --git a/include/maxscale/worker.h b/include/maxscale/worker.h index 62e04d331..a3002342b 100644 --- a/include/maxscale/worker.h +++ b/include/maxscale/worker.h @@ -98,4 +98,24 @@ static inline int mxs_worker_id(MXS_WORKER* worker) */ bool mxs_worker_post_message(MXS_WORKER* worker, uint32_t msg_id, intptr_t arg1, intptr_t arg2); +/** + * Broadcast a message to all worker. + * + * @param msg_id The message id. + * @param arg1 Message specific first argument. + * @param arg2 Message specific second argument. + * + * @return The number of messages posted; if less that ne number of workers + * then some postings failed. + * + * @attention The return value tells *only* whether message could be posted, + * *not* that it has reached the worker. + * + * @attentsion Exactly the same arguments are passed to all workers. Take that + * into account if the passed data must be freed. + * + * @attention This function is signal safe. + */ +size_t mxs_worker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2); + MXS_END_DECLS diff --git a/server/core/dcb.cc b/server/core/dcb.cc index 20dd6bdfc..207a81b04 100644 --- a/server/core/dcb.cc +++ b/server/core/dcb.cc @@ -98,6 +98,7 @@ #include "maxscale/session.h" #include "maxscale/modules.h" #include "maxscale/queuemanager.h" +#include "maxscale/worker.h" /* A DCB with null values, used for initialization */ static DCB dcb_initialized; @@ -2243,6 +2244,18 @@ dcb_isvalid(DCB *dcb) return dcb && !dcb->dcb_is_zombie; } +static void dcb_hangup_foreach_worker(int thread_id, struct server* server) +{ + for (DCB *dcb = all_dcbs[thread_id]; dcb; dcb = dcb->thread.next) + { + if (dcb->state == DCB_STATE_POLLING && dcb->server && + dcb->server == server) + { + poll_fake_hangup_event(dcb); + } + } +} + /** * Call all the callbacks on all DCB's that match the server and the reason given * @@ -2251,24 +2264,10 @@ dcb_isvalid(DCB *dcb) void dcb_hangup_foreach(struct server* server) { - int nthr = config_threadcount(); + intptr_t arg1 = (intptr_t)dcb_hangup_foreach_worker; + intptr_t arg2 = (intptr_t)server; - - for (int i = 0; i < nthr; i++) - { - spinlock_acquire(&all_dcbs_lock[i]); - - for (DCB *dcb = all_dcbs[i]; dcb; dcb = dcb->thread.next) - { - if (dcb->state == DCB_STATE_POLLING && dcb->server && - dcb->server == server) - { - poll_fake_hangup_event(dcb); - } - } - - spinlock_release(&all_dcbs_lock[i]); - } + mxs_worker_broadcast_message(MXS_WORKER_MSG_CALL, arg1, arg2); } /** diff --git a/server/core/worker.c b/server/core/worker.c index c76574da2..1df0548b7 100644 --- a/server/core/worker.c +++ b/server/core/worker.c @@ -108,6 +108,25 @@ bool mxs_worker_post_message(MXS_WORKER *worker, uint32_t msg_id, intptr_t arg1, return n == sizeof(message) ? true : false; } +size_t mxs_worker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) +{ + // NOTE: No logging here, this function must be signal safe. + + size_t n = 0; + + for (int i = 0; i < this_unit.n_workers; ++i) + { + MXS_WORKER* worker = this_unit.workers[i]; + + if (mxs_worker_post_message(worker, msg_id, arg1, arg2)) + { + ++n; + } + } + + return n; +} + void mxs_worker_main(MXS_WORKER* worker) { poll_waitevents(worker);