diff --git a/include/maxscale/dcb.h b/include/maxscale/dcb.h index 1abcf9c5a..84eb7e0eb 100644 --- a/include/maxscale/dcb.h +++ b/include/maxscale/dcb.h @@ -293,6 +293,14 @@ void dcb_foreach_parallel(bool (*func)(DCB *dcb, void *data), void **data); */ int dcb_get_port(const DCB *dcb); +/** + * @brief Return the DCB currently being handled by the calling thread. + * + * @return A DCB, or NULL if the calling thread is not currently handling + * a DCB or if the calling thread is not a polling/worker thread. + */ +DCB* dcb_get_current(); + /** * DCB flags values */ diff --git a/server/core/dcb.cc b/server/core/dcb.cc index 5887ab1b4..dd0bfed52 100644 --- a/server/core/dcb.cc +++ b/server/core/dcb.cc @@ -66,26 +66,35 @@ using maxscale::Worker; using maxscale::WorkerTask; using maxscale::Semaphore; -/* A DCB with null values, used for initialization */ -static DCB dcb_initialized; +namespace +{ -static DCB **all_dcbs; +struct +{ + DCB dcb_initialized; /** A DCB with null values, used for initialization. */ + DCB** all_dcbs; /** #workers sized array of pointers to DCBs where dcbs are listed. */ + bool check_timeouts; /** Should session timeouts be checked. */ +} this_unit; -/** Variables for session timeout checks */ -bool check_timeouts = false; -thread_local long next_timeout_check = 0; +thread_local struct +{ + long next_timeout_check; /** When to next check for idle sessions. */ + DCB* current_dcb; /** The DCB currently being handled by event handlers. */ +} this_thread; + +} void dcb_global_init() { - dcb_initialized.dcb_chk_top = CHK_NUM_DCB; - dcb_initialized.fd = DCBFD_CLOSED; - dcb_initialized.state = DCB_STATE_ALLOC; - dcb_initialized.ssl_state = SSL_HANDSHAKE_UNKNOWN; - dcb_initialized.dcb_chk_tail = CHK_NUM_DCB; + this_unit.dcb_initialized.dcb_chk_top = CHK_NUM_DCB; + this_unit.dcb_initialized.fd = DCBFD_CLOSED; + this_unit.dcb_initialized.state = DCB_STATE_ALLOC; + this_unit.dcb_initialized.ssl_state = SSL_HANDSHAKE_UNKNOWN; + this_unit.dcb_initialized.dcb_chk_tail = CHK_NUM_DCB; int nthreads = config_threadcount(); - if ((all_dcbs = (DCB**)MXS_CALLOC(nthreads, sizeof(DCB*))) == NULL) + if ((this_unit.all_dcbs = (DCB**)MXS_CALLOC(nthreads, sizeof(DCB*))) == NULL) { MXS_OOM(); raise(SIGABRT); @@ -148,7 +157,7 @@ uint64_t dcb_get_session_id(DCB *dcb) static void dcb_initialize(DCB *dcb) { - *dcb = dcb_initialized; + *dcb = this_unit.dcb_initialized; dcb->poll.handler = dcb_poll_handler; } @@ -1891,7 +1900,7 @@ dcb_isvalid(DCB *dcb) static void dcb_hangup_foreach_worker(int thread_id, struct server* server) { - for (DCB *dcb = all_dcbs[thread_id]; dcb; dcb = dcb->thread.next) + for (DCB *dcb = this_unit.all_dcbs[thread_id]; dcb; dcb = dcb->thread.next) { if (dcb->state == DCB_STATE_POLLING && dcb->server && dcb->server == server) @@ -2679,15 +2688,15 @@ void dcb_add_to_list(DCB *dcb) if (worker_id == dcb->poll.thread.id) { - if (all_dcbs[dcb->poll.thread.id] == NULL) + if (this_unit.all_dcbs[dcb->poll.thread.id] == NULL) { - all_dcbs[dcb->poll.thread.id] = dcb; - all_dcbs[dcb->poll.thread.id]->thread.tail = dcb; + this_unit.all_dcbs[dcb->poll.thread.id] = dcb; + this_unit.all_dcbs[dcb->poll.thread.id]->thread.tail = dcb; } else { - all_dcbs[dcb->poll.thread.id]->thread.tail->thread.next = dcb; - all_dcbs[dcb->poll.thread.id]->thread.tail = dcb; + this_unit.all_dcbs[dcb->poll.thread.id]->thread.tail->thread.next = dcb; + this_unit.all_dcbs[dcb->poll.thread.id]->thread.tail = dcb; } } else @@ -2715,28 +2724,28 @@ static void dcb_remove_from_list(DCB *dcb) { ss_dassert(Worker::get_current_id() == dcb->poll.thread.id); - if (dcb == all_dcbs[dcb->poll.thread.id]) + if (dcb == this_unit.all_dcbs[dcb->poll.thread.id]) { - DCB *tail = all_dcbs[dcb->poll.thread.id]->thread.tail; - all_dcbs[dcb->poll.thread.id] = all_dcbs[dcb->poll.thread.id]->thread.next; + DCB *tail = this_unit.all_dcbs[dcb->poll.thread.id]->thread.tail; + this_unit.all_dcbs[dcb->poll.thread.id] = this_unit.all_dcbs[dcb->poll.thread.id]->thread.next; - if (all_dcbs[dcb->poll.thread.id]) + if (this_unit.all_dcbs[dcb->poll.thread.id]) { - all_dcbs[dcb->poll.thread.id]->thread.tail = tail; + this_unit.all_dcbs[dcb->poll.thread.id]->thread.tail = tail; } } else { - DCB *current = all_dcbs[dcb->poll.thread.id]->thread.next; - DCB *prev = all_dcbs[dcb->poll.thread.id]; + DCB *current = this_unit.all_dcbs[dcb->poll.thread.id]->thread.next; + DCB *prev = this_unit.all_dcbs[dcb->poll.thread.id]; while (current) { if (current == dcb) { - if (current == all_dcbs[dcb->poll.thread.id]->thread.tail) + if (current == this_unit.all_dcbs[dcb->poll.thread.id]->thread.tail) { - all_dcbs[dcb->poll.thread.id]->thread.tail = prev; + this_unit.all_dcbs[dcb->poll.thread.id]->thread.tail = prev; } prev->thread.next = current->thread.next; break; @@ -2757,7 +2766,7 @@ static void dcb_remove_from_list(DCB *dcb) */ void dcb_enable_session_timeouts() { - check_timeouts = true; + this_unit.check_timeouts = true; } /** @@ -2768,13 +2777,13 @@ void dcb_enable_session_timeouts() */ void dcb_process_idle_sessions(int thr) { - if (check_timeouts && hkheartbeat >= next_timeout_check) + if (this_unit.check_timeouts && hkheartbeat >= this_thread.next_timeout_check) { /** Because the resolution of the timeout is one second, we only need to * check for it once per second. One heartbeat is 100 milliseconds. */ - next_timeout_check = hkheartbeat + 10; + this_thread.next_timeout_check = hkheartbeat + 10; - for (DCB *dcb = all_dcbs[thr]; dcb; dcb = dcb->thread.next) + for (DCB *dcb = this_unit.all_dcbs[thr]; dcb; dcb = dcb->thread.next) { if (dcb->dcb_role == DCB_ROLE_CLIENT_HANDLER) { @@ -2816,7 +2825,7 @@ public: { int thread_id = worker.id(); - for (DCB *dcb = all_dcbs[thread_id]; dcb && atomic_load_int32(&m_more); dcb = dcb->thread.next) + for (DCB *dcb = this_unit.all_dcbs[thread_id]; dcb && atomic_load_int32(&m_more); dcb = dcb->thread.next) { if (!m_func(dcb, m_data)) { @@ -2859,7 +2868,7 @@ public: { int thread_id = worker.id(); - for (DCB *dcb = all_dcbs[thread_id]; dcb; dcb = dcb->thread.next) + for (DCB *dcb = this_unit.all_dcbs[thread_id]; dcb; dcb = dcb->thread.next) { if (!m_func(dcb, m_data[thread_id])) { @@ -3076,7 +3085,11 @@ static uint32_t dcb_poll_handler(MXS_POLL_DATA *data, int thread_id, uint32_t ev { DCB *dcb = (DCB*)data; - return dcb_process_poll_events(dcb, events); + this_thread.current_dcb = dcb; + uint32_t rv = dcb_process_poll_events(dcb, events); + this_thread.current_dcb = NULL; + + return rv; } class FakeEventTask: public mxs::WorkerDisposableTask @@ -3325,3 +3338,8 @@ int poll_remove_dcb(DCB *dcb) } return rc; } + +DCB* dcb_get_current() +{ + return this_thread.current_dcb; +}