Inject hangup event in owning thread
When a hangup event needs to be inserted into all DCBs referring to a particular server, it is done in the worker thread that owns the DCB.
This commit is contained in:
@ -98,4 +98,24 @@ static inline int mxs_worker_id(MXS_WORKER* worker)
|
|||||||
*/
|
*/
|
||||||
bool mxs_worker_post_message(MXS_WORKER* worker, uint32_t msg_id, intptr_t arg1, intptr_t arg2);
|
bool mxs_worker_post_message(MXS_WORKER* worker, uint32_t msg_id, intptr_t arg1, intptr_t arg2);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Broadcast a message to all worker.
|
||||||
|
*
|
||||||
|
* @param msg_id The message id.
|
||||||
|
* @param arg1 Message specific first argument.
|
||||||
|
* @param arg2 Message specific second argument.
|
||||||
|
*
|
||||||
|
* @return The number of messages posted; if less that ne number of workers
|
||||||
|
* then some postings failed.
|
||||||
|
*
|
||||||
|
* @attention The return value tells *only* whether message could be posted,
|
||||||
|
* *not* that it has reached the worker.
|
||||||
|
*
|
||||||
|
* @attentsion Exactly the same arguments are passed to all workers. Take that
|
||||||
|
* into account if the passed data must be freed.
|
||||||
|
*
|
||||||
|
* @attention This function is signal safe.
|
||||||
|
*/
|
||||||
|
size_t mxs_worker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2);
|
||||||
|
|
||||||
MXS_END_DECLS
|
MXS_END_DECLS
|
||||||
|
@ -98,6 +98,7 @@
|
|||||||
#include "maxscale/session.h"
|
#include "maxscale/session.h"
|
||||||
#include "maxscale/modules.h"
|
#include "maxscale/modules.h"
|
||||||
#include "maxscale/queuemanager.h"
|
#include "maxscale/queuemanager.h"
|
||||||
|
#include "maxscale/worker.h"
|
||||||
|
|
||||||
/* A DCB with null values, used for initialization */
|
/* A DCB with null values, used for initialization */
|
||||||
static DCB dcb_initialized;
|
static DCB dcb_initialized;
|
||||||
@ -2243,6 +2244,18 @@ dcb_isvalid(DCB *dcb)
|
|||||||
return dcb && !dcb->dcb_is_zombie;
|
return dcb && !dcb->dcb_is_zombie;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void dcb_hangup_foreach_worker(int thread_id, struct server* server)
|
||||||
|
{
|
||||||
|
for (DCB *dcb = all_dcbs[thread_id]; dcb; dcb = dcb->thread.next)
|
||||||
|
{
|
||||||
|
if (dcb->state == DCB_STATE_POLLING && dcb->server &&
|
||||||
|
dcb->server == server)
|
||||||
|
{
|
||||||
|
poll_fake_hangup_event(dcb);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Call all the callbacks on all DCB's that match the server and the reason given
|
* Call all the callbacks on all DCB's that match the server and the reason given
|
||||||
*
|
*
|
||||||
@ -2251,24 +2264,10 @@ dcb_isvalid(DCB *dcb)
|
|||||||
void
|
void
|
||||||
dcb_hangup_foreach(struct server* server)
|
dcb_hangup_foreach(struct server* server)
|
||||||
{
|
{
|
||||||
int nthr = config_threadcount();
|
intptr_t arg1 = (intptr_t)dcb_hangup_foreach_worker;
|
||||||
|
intptr_t arg2 = (intptr_t)server;
|
||||||
|
|
||||||
|
mxs_worker_broadcast_message(MXS_WORKER_MSG_CALL, arg1, arg2);
|
||||||
for (int i = 0; i < nthr; i++)
|
|
||||||
{
|
|
||||||
spinlock_acquire(&all_dcbs_lock[i]);
|
|
||||||
|
|
||||||
for (DCB *dcb = all_dcbs[i]; dcb; dcb = dcb->thread.next)
|
|
||||||
{
|
|
||||||
if (dcb->state == DCB_STATE_POLLING && dcb->server &&
|
|
||||||
dcb->server == server)
|
|
||||||
{
|
|
||||||
poll_fake_hangup_event(dcb);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
spinlock_release(&all_dcbs_lock[i]);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -108,6 +108,25 @@ bool mxs_worker_post_message(MXS_WORKER *worker, uint32_t msg_id, intptr_t arg1,
|
|||||||
return n == sizeof(message) ? true : false;
|
return n == sizeof(message) ? true : false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
size_t mxs_worker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2)
|
||||||
|
{
|
||||||
|
// NOTE: No logging here, this function must be signal safe.
|
||||||
|
|
||||||
|
size_t n = 0;
|
||||||
|
|
||||||
|
for (int i = 0; i < this_unit.n_workers; ++i)
|
||||||
|
{
|
||||||
|
MXS_WORKER* worker = this_unit.workers[i];
|
||||||
|
|
||||||
|
if (mxs_worker_post_message(worker, msg_id, arg1, arg2))
|
||||||
|
{
|
||||||
|
++n;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return n;
|
||||||
|
}
|
||||||
|
|
||||||
void mxs_worker_main(MXS_WORKER* worker)
|
void mxs_worker_main(MXS_WORKER* worker)
|
||||||
{
|
{
|
||||||
poll_waitevents(worker);
|
poll_waitevents(worker);
|
||||||
|
Reference in New Issue
Block a user