MXS-1378 Provide access to the current DCB
This will be used by a subsequent `session_get_current()` and `session_get_current_id()` for obtaining the current SESSION and session id, respectively. The latter of those will be used by the logging mechanism for logging the session id in conjunction with messages.
This commit is contained in:
@ -66,26 +66,35 @@ using maxscale::Worker;
|
||||
using maxscale::WorkerTask;
|
||||
using maxscale::Semaphore;
|
||||
|
||||
/* A DCB with null values, used for initialization */
|
||||
static DCB dcb_initialized;
|
||||
namespace
|
||||
{
|
||||
|
||||
static DCB **all_dcbs;
|
||||
struct
|
||||
{
|
||||
DCB dcb_initialized; /** A DCB with null values, used for initialization. */
|
||||
DCB** all_dcbs; /** #workers sized array of pointers to DCBs where dcbs are listed. */
|
||||
bool check_timeouts; /** Should session timeouts be checked. */
|
||||
} this_unit;
|
||||
|
||||
/** Variables for session timeout checks */
|
||||
bool check_timeouts = false;
|
||||
thread_local long next_timeout_check = 0;
|
||||
thread_local struct
|
||||
{
|
||||
long next_timeout_check; /** When to next check for idle sessions. */
|
||||
DCB* current_dcb; /** The DCB currently being handled by event handlers. */
|
||||
} this_thread;
|
||||
|
||||
}
|
||||
|
||||
void dcb_global_init()
|
||||
{
|
||||
dcb_initialized.dcb_chk_top = CHK_NUM_DCB;
|
||||
dcb_initialized.fd = DCBFD_CLOSED;
|
||||
dcb_initialized.state = DCB_STATE_ALLOC;
|
||||
dcb_initialized.ssl_state = SSL_HANDSHAKE_UNKNOWN;
|
||||
dcb_initialized.dcb_chk_tail = CHK_NUM_DCB;
|
||||
this_unit.dcb_initialized.dcb_chk_top = CHK_NUM_DCB;
|
||||
this_unit.dcb_initialized.fd = DCBFD_CLOSED;
|
||||
this_unit.dcb_initialized.state = DCB_STATE_ALLOC;
|
||||
this_unit.dcb_initialized.ssl_state = SSL_HANDSHAKE_UNKNOWN;
|
||||
this_unit.dcb_initialized.dcb_chk_tail = CHK_NUM_DCB;
|
||||
|
||||
int nthreads = config_threadcount();
|
||||
|
||||
if ((all_dcbs = (DCB**)MXS_CALLOC(nthreads, sizeof(DCB*))) == NULL)
|
||||
if ((this_unit.all_dcbs = (DCB**)MXS_CALLOC(nthreads, sizeof(DCB*))) == NULL)
|
||||
{
|
||||
MXS_OOM();
|
||||
raise(SIGABRT);
|
||||
@ -148,7 +157,7 @@ uint64_t dcb_get_session_id(DCB *dcb)
|
||||
static void
|
||||
dcb_initialize(DCB *dcb)
|
||||
{
|
||||
*dcb = dcb_initialized;
|
||||
*dcb = this_unit.dcb_initialized;
|
||||
|
||||
dcb->poll.handler = dcb_poll_handler;
|
||||
}
|
||||
@ -1891,7 +1900,7 @@ dcb_isvalid(DCB *dcb)
|
||||
|
||||
static void dcb_hangup_foreach_worker(int thread_id, struct server* server)
|
||||
{
|
||||
for (DCB *dcb = all_dcbs[thread_id]; dcb; dcb = dcb->thread.next)
|
||||
for (DCB *dcb = this_unit.all_dcbs[thread_id]; dcb; dcb = dcb->thread.next)
|
||||
{
|
||||
if (dcb->state == DCB_STATE_POLLING && dcb->server &&
|
||||
dcb->server == server)
|
||||
@ -2679,15 +2688,15 @@ void dcb_add_to_list(DCB *dcb)
|
||||
|
||||
if (worker_id == dcb->poll.thread.id)
|
||||
{
|
||||
if (all_dcbs[dcb->poll.thread.id] == NULL)
|
||||
if (this_unit.all_dcbs[dcb->poll.thread.id] == NULL)
|
||||
{
|
||||
all_dcbs[dcb->poll.thread.id] = dcb;
|
||||
all_dcbs[dcb->poll.thread.id]->thread.tail = dcb;
|
||||
this_unit.all_dcbs[dcb->poll.thread.id] = dcb;
|
||||
this_unit.all_dcbs[dcb->poll.thread.id]->thread.tail = dcb;
|
||||
}
|
||||
else
|
||||
{
|
||||
all_dcbs[dcb->poll.thread.id]->thread.tail->thread.next = dcb;
|
||||
all_dcbs[dcb->poll.thread.id]->thread.tail = dcb;
|
||||
this_unit.all_dcbs[dcb->poll.thread.id]->thread.tail->thread.next = dcb;
|
||||
this_unit.all_dcbs[dcb->poll.thread.id]->thread.tail = dcb;
|
||||
}
|
||||
}
|
||||
else
|
||||
@ -2715,28 +2724,28 @@ static void dcb_remove_from_list(DCB *dcb)
|
||||
{
|
||||
ss_dassert(Worker::get_current_id() == dcb->poll.thread.id);
|
||||
|
||||
if (dcb == all_dcbs[dcb->poll.thread.id])
|
||||
if (dcb == this_unit.all_dcbs[dcb->poll.thread.id])
|
||||
{
|
||||
DCB *tail = all_dcbs[dcb->poll.thread.id]->thread.tail;
|
||||
all_dcbs[dcb->poll.thread.id] = all_dcbs[dcb->poll.thread.id]->thread.next;
|
||||
DCB *tail = this_unit.all_dcbs[dcb->poll.thread.id]->thread.tail;
|
||||
this_unit.all_dcbs[dcb->poll.thread.id] = this_unit.all_dcbs[dcb->poll.thread.id]->thread.next;
|
||||
|
||||
if (all_dcbs[dcb->poll.thread.id])
|
||||
if (this_unit.all_dcbs[dcb->poll.thread.id])
|
||||
{
|
||||
all_dcbs[dcb->poll.thread.id]->thread.tail = tail;
|
||||
this_unit.all_dcbs[dcb->poll.thread.id]->thread.tail = tail;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
DCB *current = all_dcbs[dcb->poll.thread.id]->thread.next;
|
||||
DCB *prev = all_dcbs[dcb->poll.thread.id];
|
||||
DCB *current = this_unit.all_dcbs[dcb->poll.thread.id]->thread.next;
|
||||
DCB *prev = this_unit.all_dcbs[dcb->poll.thread.id];
|
||||
|
||||
while (current)
|
||||
{
|
||||
if (current == dcb)
|
||||
{
|
||||
if (current == all_dcbs[dcb->poll.thread.id]->thread.tail)
|
||||
if (current == this_unit.all_dcbs[dcb->poll.thread.id]->thread.tail)
|
||||
{
|
||||
all_dcbs[dcb->poll.thread.id]->thread.tail = prev;
|
||||
this_unit.all_dcbs[dcb->poll.thread.id]->thread.tail = prev;
|
||||
}
|
||||
prev->thread.next = current->thread.next;
|
||||
break;
|
||||
@ -2757,7 +2766,7 @@ static void dcb_remove_from_list(DCB *dcb)
|
||||
*/
|
||||
void dcb_enable_session_timeouts()
|
||||
{
|
||||
check_timeouts = true;
|
||||
this_unit.check_timeouts = true;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -2768,13 +2777,13 @@ void dcb_enable_session_timeouts()
|
||||
*/
|
||||
void dcb_process_idle_sessions(int thr)
|
||||
{
|
||||
if (check_timeouts && hkheartbeat >= next_timeout_check)
|
||||
if (this_unit.check_timeouts && hkheartbeat >= this_thread.next_timeout_check)
|
||||
{
|
||||
/** Because the resolution of the timeout is one second, we only need to
|
||||
* check for it once per second. One heartbeat is 100 milliseconds. */
|
||||
next_timeout_check = hkheartbeat + 10;
|
||||
this_thread.next_timeout_check = hkheartbeat + 10;
|
||||
|
||||
for (DCB *dcb = all_dcbs[thr]; dcb; dcb = dcb->thread.next)
|
||||
for (DCB *dcb = this_unit.all_dcbs[thr]; dcb; dcb = dcb->thread.next)
|
||||
{
|
||||
if (dcb->dcb_role == DCB_ROLE_CLIENT_HANDLER)
|
||||
{
|
||||
@ -2816,7 +2825,7 @@ public:
|
||||
{
|
||||
int thread_id = worker.id();
|
||||
|
||||
for (DCB *dcb = all_dcbs[thread_id]; dcb && atomic_load_int32(&m_more); dcb = dcb->thread.next)
|
||||
for (DCB *dcb = this_unit.all_dcbs[thread_id]; dcb && atomic_load_int32(&m_more); dcb = dcb->thread.next)
|
||||
{
|
||||
if (!m_func(dcb, m_data))
|
||||
{
|
||||
@ -2859,7 +2868,7 @@ public:
|
||||
{
|
||||
int thread_id = worker.id();
|
||||
|
||||
for (DCB *dcb = all_dcbs[thread_id]; dcb; dcb = dcb->thread.next)
|
||||
for (DCB *dcb = this_unit.all_dcbs[thread_id]; dcb; dcb = dcb->thread.next)
|
||||
{
|
||||
if (!m_func(dcb, m_data[thread_id]))
|
||||
{
|
||||
@ -3076,7 +3085,11 @@ static uint32_t dcb_poll_handler(MXS_POLL_DATA *data, int thread_id, uint32_t ev
|
||||
{
|
||||
DCB *dcb = (DCB*)data;
|
||||
|
||||
return dcb_process_poll_events(dcb, events);
|
||||
this_thread.current_dcb = dcb;
|
||||
uint32_t rv = dcb_process_poll_events(dcb, events);
|
||||
this_thread.current_dcb = NULL;
|
||||
|
||||
return rv;
|
||||
}
|
||||
|
||||
class FakeEventTask: public mxs::WorkerDisposableTask
|
||||
@ -3325,3 +3338,8 @@ int poll_remove_dcb(DCB *dcb)
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
DCB* dcb_get_current()
|
||||
{
|
||||
return this_thread.current_dcb;
|
||||
}
|
||||
|
Reference in New Issue
Block a user