|
|
|
@ -70,7 +70,6 @@ namespace
|
|
|
|
|
|
|
|
|
|
static struct
|
|
|
|
|
{
|
|
|
|
|
DCB dcb_initialized; /** A DCB with null values, used for initialization. */
|
|
|
|
|
DCB** all_dcbs; /** #workers sized array of pointers to DCBs where dcbs are listed. */
|
|
|
|
|
bool check_timeouts; /** Should session timeouts be checked. */
|
|
|
|
|
} this_unit;
|
|
|
|
@ -123,14 +122,6 @@ static int downstream_throttle_callback(DCB* dcb, DCB_REASON reason, void*
|
|
|
|
|
|
|
|
|
|
void dcb_global_init()
|
|
|
|
|
{
|
|
|
|
|
this_unit.dcb_initialized.fd = DCBFD_CLOSED;
|
|
|
|
|
this_unit.dcb_initialized.state = DCB_STATE_ALLOC;
|
|
|
|
|
this_unit.dcb_initialized.ssl_state = SSL_HANDSHAKE_UNKNOWN;
|
|
|
|
|
this_unit.dcb_initialized.poll.handler = dcb_poll_handler;
|
|
|
|
|
this_unit.dcb_initialized.high_water_reached = false;
|
|
|
|
|
this_unit.dcb_initialized.low_water = config_writeq_low_water();
|
|
|
|
|
this_unit.dcb_initialized.high_water = config_writeq_high_water();
|
|
|
|
|
|
|
|
|
|
int nthreads = config_threadcount();
|
|
|
|
|
|
|
|
|
|
if ((this_unit.all_dcbs = (DCB**)MXS_CALLOC(nthreads, sizeof(DCB*))) == NULL)
|
|
|
|
@ -150,20 +141,71 @@ uint64_t dcb_get_session_id(DCB* dcb)
|
|
|
|
|
return (dcb && dcb->session) ? dcb->session->ses_id : 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @brief Initialize a DCB
|
|
|
|
|
*
|
|
|
|
|
* This routine puts initial values into the fields of the DCB pointed to
|
|
|
|
|
* by the parameter.
|
|
|
|
|
*
|
|
|
|
|
* Most fields can be initialized by the assignment of the static
|
|
|
|
|
* initialized DCB. The exception is the bitmask.
|
|
|
|
|
*
|
|
|
|
|
* @param *dcb Pointer to the DCB to be initialized
|
|
|
|
|
*/
|
|
|
|
|
static void dcb_initialize(DCB* dcb)
|
|
|
|
|
static MXB_WORKER* get_dcb_owner(dcb_role_t role)
|
|
|
|
|
{
|
|
|
|
|
*dcb = this_unit.dcb_initialized;
|
|
|
|
|
MXB_WORKER* owner;
|
|
|
|
|
|
|
|
|
|
if (role == DCB_ROLE_SERVICE_LISTENER)
|
|
|
|
|
{
|
|
|
|
|
/** All listeners are owned by the main thread (i.e. thread no. 0) */
|
|
|
|
|
owner = RoutingWorker::get(RoutingWorker::MAIN);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
/** Otherwise the DCB is owned by the thread that allocates it */
|
|
|
|
|
mxb_assert(RoutingWorker::get_current_id() != -1);
|
|
|
|
|
owner = RoutingWorker::get_current();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return owner;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
DCB::DCB(dcb_role_t role, Listener* listener, SERVICE* service)
|
|
|
|
|
: MXB_POLL_DATA{dcb_poll_handler, get_dcb_owner(role)}
|
|
|
|
|
, dcb_role(role)
|
|
|
|
|
, listener(listener)
|
|
|
|
|
, high_water(config_writeq_high_water())
|
|
|
|
|
, low_water(config_writeq_low_water())
|
|
|
|
|
, service(service)
|
|
|
|
|
, last_read(mxs_clock())
|
|
|
|
|
{
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
DCB::~DCB()
|
|
|
|
|
{
|
|
|
|
|
if (data && authfunc.free)
|
|
|
|
|
{
|
|
|
|
|
authfunc.free(this);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (authfunc.destroy)
|
|
|
|
|
{
|
|
|
|
|
authfunc.destroy(authenticator_data);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
while (callbacks)
|
|
|
|
|
{
|
|
|
|
|
DCB_CALLBACK* tmp = callbacks;
|
|
|
|
|
callbacks = callbacks->next;
|
|
|
|
|
MXS_FREE(tmp);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (ssl)
|
|
|
|
|
{
|
|
|
|
|
SSL_free(ssl);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
MXS_FREE(protoname);
|
|
|
|
|
MXS_FREE(remote);
|
|
|
|
|
MXS_FREE(user);
|
|
|
|
|
MXS_FREE(path);
|
|
|
|
|
MXS_FREE(protocol);
|
|
|
|
|
gwbuf_free(delayq);
|
|
|
|
|
gwbuf_free(writeq);
|
|
|
|
|
gwbuf_free(readq);
|
|
|
|
|
gwbuf_free(fakeq);
|
|
|
|
|
|
|
|
|
|
owner = reinterpret_cast<MXB_WORKER*>(0xdeadbeef);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
@ -182,34 +224,7 @@ static void dcb_initialize(DCB* dcb)
|
|
|
|
|
*/
|
|
|
|
|
DCB* dcb_alloc(dcb_role_t role, Listener* listener, SERVICE* service)
|
|
|
|
|
{
|
|
|
|
|
DCB* newdcb;
|
|
|
|
|
|
|
|
|
|
if ((newdcb = (DCB*)MXS_MALLOC(sizeof(*newdcb))) == NULL)
|
|
|
|
|
{
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
dcb_initialize(newdcb);
|
|
|
|
|
newdcb->dcb_role = role;
|
|
|
|
|
newdcb->listener = listener;
|
|
|
|
|
newdcb->service = service;
|
|
|
|
|
newdcb->last_read = mxs_clock();
|
|
|
|
|
newdcb->low_water = config_writeq_low_water();
|
|
|
|
|
newdcb->high_water = config_writeq_high_water();
|
|
|
|
|
|
|
|
|
|
if (role == DCB_ROLE_SERVICE_LISTENER)
|
|
|
|
|
{
|
|
|
|
|
/** All listeners are owned by the main thread (i.e. thread no. 0) */
|
|
|
|
|
newdcb->poll.owner = RoutingWorker::get(RoutingWorker::MAIN);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
/** Otherwise the DCB is owned by the thread that allocates it */
|
|
|
|
|
mxb_assert(RoutingWorker::get_current_id() != -1);
|
|
|
|
|
newdcb->poll.owner = RoutingWorker::get_current();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return newdcb;
|
|
|
|
|
return new(std::nothrow) DCB(role, listener, service);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
@ -273,76 +288,7 @@ void dcb_free_all_memory(DCB* dcb)
|
|
|
|
|
this_thread.current_dcb = NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
DCB_CALLBACK* cb_dcb;
|
|
|
|
|
|
|
|
|
|
if (dcb->protocol)
|
|
|
|
|
{
|
|
|
|
|
MXS_FREE(dcb->protocol);
|
|
|
|
|
}
|
|
|
|
|
if (dcb->data && dcb->authfunc.free)
|
|
|
|
|
{
|
|
|
|
|
dcb->authfunc.free(dcb);
|
|
|
|
|
dcb->data = NULL;
|
|
|
|
|
}
|
|
|
|
|
if (dcb->authfunc.destroy)
|
|
|
|
|
{
|
|
|
|
|
dcb->authfunc.destroy(dcb->authenticator_data);
|
|
|
|
|
dcb->authenticator_data = NULL;
|
|
|
|
|
}
|
|
|
|
|
if (dcb->protoname)
|
|
|
|
|
{
|
|
|
|
|
MXS_FREE(dcb->protoname);
|
|
|
|
|
}
|
|
|
|
|
if (dcb->remote)
|
|
|
|
|
{
|
|
|
|
|
MXS_FREE(dcb->remote);
|
|
|
|
|
}
|
|
|
|
|
if (dcb->user)
|
|
|
|
|
{
|
|
|
|
|
MXS_FREE(dcb->user);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* Clear write and read buffers */
|
|
|
|
|
if (dcb->delayq)
|
|
|
|
|
{
|
|
|
|
|
gwbuf_free(dcb->delayq);
|
|
|
|
|
dcb->delayq = NULL;
|
|
|
|
|
}
|
|
|
|
|
if (dcb->writeq)
|
|
|
|
|
{
|
|
|
|
|
gwbuf_free(dcb->writeq);
|
|
|
|
|
dcb->writeq = NULL;
|
|
|
|
|
}
|
|
|
|
|
if (dcb->readq)
|
|
|
|
|
{
|
|
|
|
|
gwbuf_free(dcb->readq);
|
|
|
|
|
dcb->readq = NULL;
|
|
|
|
|
}
|
|
|
|
|
if (dcb->fakeq)
|
|
|
|
|
{
|
|
|
|
|
gwbuf_free(dcb->fakeq);
|
|
|
|
|
dcb->fakeq = NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
while ((cb_dcb = dcb->callbacks) != NULL)
|
|
|
|
|
{
|
|
|
|
|
dcb->callbacks = cb_dcb->next;
|
|
|
|
|
MXS_FREE(cb_dcb);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (dcb->ssl)
|
|
|
|
|
{
|
|
|
|
|
SSL_free(dcb->ssl);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (dcb->path)
|
|
|
|
|
{
|
|
|
|
|
MXS_FREE(dcb->path);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Ensure that id is immediately the wrong one.
|
|
|
|
|
dcb->poll.owner = reinterpret_cast<MXB_WORKER*>(0xdeadbeef);
|
|
|
|
|
MXS_FREE(dcb);
|
|
|
|
|
delete dcb;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
@ -390,7 +336,7 @@ DCB* dcb_connect(SERVER* server, MXS_SESSION* session, const char* protocol)
|
|
|
|
|
user,
|
|
|
|
|
session->client_dcb->remote,
|
|
|
|
|
protocol,
|
|
|
|
|
static_cast<RoutingWorker*>(session->client_dcb->poll.owner)->id());
|
|
|
|
|
static_cast<RoutingWorker*>(session->client_dcb->owner)->id());
|
|
|
|
|
if (dcb)
|
|
|
|
|
{
|
|
|
|
|
/**
|
|
|
|
@ -1102,7 +1048,7 @@ void dcb_close(DCB* dcb)
|
|
|
|
|
{
|
|
|
|
|
#if defined (SS_DEBUG)
|
|
|
|
|
RoutingWorker* current = RoutingWorker::get_current();
|
|
|
|
|
RoutingWorker* owner = static_cast<RoutingWorker*>(dcb->poll.owner);
|
|
|
|
|
RoutingWorker* owner = static_cast<RoutingWorker*>(dcb->owner);
|
|
|
|
|
if (current && (current != owner))
|
|
|
|
|
{
|
|
|
|
|
MXS_ALERT("dcb_close(%p) called by %d, owned by %d.",
|
|
|
|
@ -1152,7 +1098,7 @@ void dcb_close(DCB* dcb)
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
RoutingWorker* worker = static_cast<RoutingWorker*>(dcb->poll.owner);
|
|
|
|
|
RoutingWorker* worker = static_cast<RoutingWorker*>(dcb->owner);
|
|
|
|
|
mxb_assert(worker);
|
|
|
|
|
|
|
|
|
|
worker->register_zombie(dcb);
|
|
|
|
@ -1183,7 +1129,7 @@ void dcb_close_in_owning_thread(DCB* dcb)
|
|
|
|
|
// TODO: reference counted, so that we could addref before posting, thus
|
|
|
|
|
// TODO: preventing too early a deletion.
|
|
|
|
|
|
|
|
|
|
MXB_WORKER* worker = static_cast<MXB_WORKER*>(dcb->poll.owner); // The owning worker
|
|
|
|
|
MXB_WORKER* worker = static_cast<MXB_WORKER*>(dcb->owner); // The owning worker
|
|
|
|
|
mxb_assert(worker);
|
|
|
|
|
|
|
|
|
|
intptr_t arg1 = (intptr_t)cb_dcb_close_in_owning_thread;
|
|
|
|
@ -1199,7 +1145,7 @@ void dcb_final_close(DCB* dcb)
|
|
|
|
|
{
|
|
|
|
|
#if defined (SS_DEBUG)
|
|
|
|
|
RoutingWorker* current = RoutingWorker::get_current();
|
|
|
|
|
RoutingWorker* owner = static_cast<RoutingWorker*>(dcb->poll.owner);
|
|
|
|
|
RoutingWorker* owner = static_cast<RoutingWorker*>(dcb->owner);
|
|
|
|
|
if (current && (current != owner))
|
|
|
|
|
{
|
|
|
|
|
MXS_ALERT("dcb_final_close(%p) called by %d, owned by %d.",
|
|
|
|
@ -1296,7 +1242,7 @@ void dcb_final_close(DCB* dcb)
|
|
|
|
|
*/
|
|
|
|
|
static bool dcb_maybe_add_persistent(DCB* dcb)
|
|
|
|
|
{
|
|
|
|
|
RoutingWorker* owner = static_cast<RoutingWorker*>(dcb->poll.owner);
|
|
|
|
|
RoutingWorker* owner = static_cast<RoutingWorker*>(dcb->owner);
|
|
|
|
|
if (dcb->user != NULL
|
|
|
|
|
&& (dcb->func.established == NULL || dcb->func.established(dcb))
|
|
|
|
|
&& strlen(dcb->user)
|
|
|
|
@ -2776,7 +2722,7 @@ static void dcb_add_to_list_cb(int thread_id, void* data)
|
|
|
|
|
{
|
|
|
|
|
DCB* dcb = (DCB*)data;
|
|
|
|
|
|
|
|
|
|
mxb_assert(thread_id == static_cast<RoutingWorker*>(dcb->poll.owner)->id());
|
|
|
|
|
mxb_assert(thread_id == static_cast<RoutingWorker*>(dcb->owner)->id());
|
|
|
|
|
|
|
|
|
|
dcb_add_to_list(dcb);
|
|
|
|
|
}
|
|
|
|
@ -2791,7 +2737,7 @@ static void dcb_add_to_list(DCB* dcb)
|
|
|
|
|
* is not in the list. Stopped listeners are not removed from the list.
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
int id = static_cast<RoutingWorker*>(dcb->poll.owner)->id();
|
|
|
|
|
int id = static_cast<RoutingWorker*>(dcb->owner)->id();
|
|
|
|
|
mxb_assert(id == RoutingWorker::get_current_id());
|
|
|
|
|
|
|
|
|
|
if (this_unit.all_dcbs[id] == NULL)
|
|
|
|
@ -2814,7 +2760,7 @@ static void dcb_add_to_list(DCB* dcb)
|
|
|
|
|
*/
|
|
|
|
|
static void dcb_remove_from_list(DCB* dcb)
|
|
|
|
|
{
|
|
|
|
|
int id = static_cast<RoutingWorker*>(dcb->poll.owner)->id();
|
|
|
|
|
int id = static_cast<RoutingWorker*>(dcb->owner)->id();
|
|
|
|
|
|
|
|
|
|
if (dcb == this_unit.all_dcbs[id])
|
|
|
|
|
{
|
|
|
|
@ -2995,7 +2941,7 @@ int dcb_get_port(const DCB* dcb)
|
|
|
|
|
|
|
|
|
|
static uint32_t dcb_process_poll_events(DCB* dcb, uint32_t events)
|
|
|
|
|
{
|
|
|
|
|
RoutingWorker* owner = static_cast<RoutingWorker*>(dcb->poll.owner);
|
|
|
|
|
RoutingWorker* owner = static_cast<RoutingWorker*>(dcb->owner);
|
|
|
|
|
mxb_assert(owner == RoutingWorker::get_current()
|
|
|
|
|
|| dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER);
|
|
|
|
|
|
|
|
|
@ -3307,7 +3253,7 @@ static void poll_add_event_to_dcb(DCB* dcb, GWBUF* buf, uint32_t ev)
|
|
|
|
|
|
|
|
|
|
if (task)
|
|
|
|
|
{
|
|
|
|
|
RoutingWorker* worker = static_cast<RoutingWorker*>(dcb->poll.owner);
|
|
|
|
|
RoutingWorker* worker = static_cast<RoutingWorker*>(dcb->owner);
|
|
|
|
|
worker->execute(std::unique_ptr<FakeEventTask>(task), Worker::EXECUTE_QUEUED);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
@ -3407,7 +3353,7 @@ public:
|
|
|
|
|
{
|
|
|
|
|
RoutingWorker& rworker = static_cast<RoutingWorker&>(worker);
|
|
|
|
|
|
|
|
|
|
mxb_assert(rworker.id() == static_cast<RoutingWorker*>(m_dcb->poll.owner)->id());
|
|
|
|
|
mxb_assert(rworker.id() == static_cast<RoutingWorker*>(m_dcb->owner)->id());
|
|
|
|
|
|
|
|
|
|
bool added = dcb_add_to_worker(&rworker, m_dcb, m_events);
|
|
|
|
|
mxb_assert(added);
|
|
|
|
@ -3469,7 +3415,7 @@ static bool dcb_add_to_worker(Worker* worker, DCB* dcb, uint32_t events)
|
|
|
|
|
{
|
|
|
|
|
// If this takes place on the main thread (all listening DCBs are
|
|
|
|
|
// stored on the main thread)...
|
|
|
|
|
if (dcb->poll.owner == RoutingWorker::get_current())
|
|
|
|
|
if (dcb->owner == RoutingWorker::get_current())
|
|
|
|
|
{
|
|
|
|
|
// ..we'll add it immediately to the list,
|
|
|
|
|
dcb_add_to_list(dcb);
|
|
|
|
@ -3478,7 +3424,7 @@ static bool dcb_add_to_worker(Worker* worker, DCB* dcb, uint32_t events)
|
|
|
|
|
{
|
|
|
|
|
// otherwise we must move the adding to the main thread.
|
|
|
|
|
// TODO: Separate listening and other DCBs, as this is a mess.
|
|
|
|
|
RoutingWorker* worker = static_cast<RoutingWorker*>(dcb->poll.owner);
|
|
|
|
|
RoutingWorker* worker = static_cast<RoutingWorker*>(dcb->owner);
|
|
|
|
|
mxb_assert(worker);
|
|
|
|
|
|
|
|
|
|
intptr_t arg1 = (intptr_t)dcb_add_to_list_cb;
|
|
|
|
@ -3495,7 +3441,7 @@ static bool dcb_add_to_worker(Worker* worker, DCB* dcb, uint32_t events)
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
mxb_assert(worker == dcb->poll.owner);
|
|
|
|
|
mxb_assert(worker == dcb->owner);
|
|
|
|
|
|
|
|
|
|
if (worker == RoutingWorker::get_current())
|
|
|
|
|
{
|
|
|
|
@ -3517,7 +3463,7 @@ static bool dcb_add_to_worker(Worker* worker, DCB* dcb, uint32_t events)
|
|
|
|
|
|
|
|
|
|
if (task)
|
|
|
|
|
{
|
|
|
|
|
Worker* worker = static_cast<RoutingWorker*>(dcb->poll.owner);
|
|
|
|
|
Worker* worker = static_cast<RoutingWorker*>(dcb->owner);
|
|
|
|
|
mxb_assert(worker);
|
|
|
|
|
|
|
|
|
|
if (worker->execute(std::unique_ptr<AddDcbToWorker>(task), Worker::EXECUTE_QUEUED))
|
|
|
|
@ -3580,7 +3526,7 @@ int poll_add_dcb(DCB* dcb)
|
|
|
|
|
else if (dcb->state == DCB_STATE_NOPOLLING)
|
|
|
|
|
{
|
|
|
|
|
// This DCB was removed and added back to epoll. Assign it to the same worker it started with.
|
|
|
|
|
owner = static_cast<RoutingWorker*>(dcb->poll.owner);
|
|
|
|
|
owner = static_cast<RoutingWorker*>(dcb->owner);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
@ -3589,16 +3535,16 @@ int poll_add_dcb(DCB* dcb)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
new_state = DCB_STATE_POLLING;
|
|
|
|
|
dcb->poll.owner = owner;
|
|
|
|
|
dcb->owner = owner;
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
mxb_assert(dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER);
|
|
|
|
|
mxb_assert(RoutingWorker::get_current_id() != -1);
|
|
|
|
|
mxb_assert(RoutingWorker::get_current() == dcb->poll.owner);
|
|
|
|
|
mxb_assert(RoutingWorker::get_current() == dcb->owner);
|
|
|
|
|
|
|
|
|
|
new_state = DCB_STATE_POLLING;
|
|
|
|
|
owner = static_cast<RoutingWorker*>(dcb->poll.owner);
|
|
|
|
|
owner = static_cast<RoutingWorker*>(dcb->owner);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
@ -3619,7 +3565,7 @@ int poll_add_dcb(DCB* dcb)
|
|
|
|
|
* debug asserts will be triggered.
|
|
|
|
|
*/
|
|
|
|
|
dcb->state = old_state;
|
|
|
|
|
dcb->poll.owner = RoutingWorker::get_current();
|
|
|
|
|
dcb->owner = RoutingWorker::get_current();
|
|
|
|
|
rc = -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -3669,7 +3615,7 @@ int poll_remove_dcb(DCB* dcb)
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
Worker* worker = static_cast<Worker*>(dcb->poll.owner);
|
|
|
|
|
Worker* worker = static_cast<Worker*>(dcb->owner);
|
|
|
|
|
mxb_assert(worker);
|
|
|
|
|
|
|
|
|
|
if (worker->remove_fd(dcbfd))
|
|
|
|
|