diff --git a/include/maxscale/dcb.h b/include/maxscale/dcb.h index e3d2353b0..c6c7052fb 100644 --- a/include/maxscale/dcb.h +++ b/include/maxscale/dcb.h @@ -228,6 +228,8 @@ typedef struct dcb } thread; uint32_t n_close; /** How many times dcb_close has been called. */ char* path; /** If a Unix socket, the path it was bound to. */ + + uint64_t m_uid; /**< Unique identifier for this DCB */ } DCB; /** diff --git a/server/core/dcb.cc b/server/core/dcb.cc index d618619d0..ac1e8ac24 100644 --- a/server/core/dcb.cc +++ b/server/core/dcb.cc @@ -52,6 +52,8 @@ #include #include +#include + #include "internal/modules.h" #include "internal/session.h" @@ -79,6 +81,7 @@ static struct DCB dcb_initialized; /** A DCB with null values, used for initialization. */ DCB** all_dcbs; /** #workers sized array of pointers to DCBs where dcbs are listed. */ bool check_timeouts; /** Should session timeouts be checked. */ + std::atomic uid_generator {0}; } this_unit; static thread_local struct @@ -201,6 +204,7 @@ DCB* dcb_alloc(dcb_role_t role, SERV_LISTENER* listener) newdcb->last_read = mxs_clock(); newdcb->low_water = config_writeq_low_water(); newdcb->high_water = config_writeq_high_water(); + newdcb->m_uid = this_unit.uid_generator.fetch_add(1, std::memory_order_relaxed); if (role == DCB_ROLE_SERVICE_LISTENER) { @@ -3282,6 +3286,7 @@ public: : m_dcb(dcb) , m_buffer(buf) , m_ev(ev) + , m_uid(dcb->m_uid) { } @@ -3290,7 +3295,7 @@ public: mxb_assert(&worker == RoutingWorker::get_current()); RoutingWorker& rworker = static_cast(worker); - if (dcb_is_still_valid(m_dcb, rworker.id())) + if (dcb_is_still_valid(m_dcb, rworker.id()) && m_dcb->m_uid == m_uid) { m_dcb->fakeq = m_buffer; dcb_handler(m_dcb, m_ev); @@ -3305,6 +3310,7 @@ private: DCB* m_dcb; GWBUF* m_buffer; uint32_t m_ev; + uint64_t m_uid; /**< DCB UID guarantees we deliver the event to the correct DCB */ }; static void poll_add_event_to_dcb(DCB* dcb, GWBUF* buf, uint32_t ev)