Assign worker ID in dcb_alloc

Apart from listeners, all DCBs will be assigned to the current
thread. This simplifies the addition of DCBs to worker threads.

Also performed a small cleanup of poll_add_dcb to make it more readable.
This commit is contained in:
Markus Mäkelä
2017-09-12 11:22:32 +03:00
parent d2f790a06c
commit b18c0ed0b8

View File

@ -196,6 +196,18 @@ dcb_alloc(dcb_role_t role, SERV_LISTENER *listener)
newdcb->listener = listener; newdcb->listener = listener;
newdcb->last_read = hkheartbeat; newdcb->last_read = hkheartbeat;
if (role == DCB_ROLE_SERVICE_LISTENER)
{
/** All listeners are owned by the main thread (i.e. thread no. 0) */
newdcb->poll.thread.id = 0;
}
else
{
/** Otherwise the DCB is owned by the thread that allocates it */
ss_dassert(Worker::get_current_id() != -1);
newdcb->poll.thread.id = Worker::get_current_id();
}
return newdcb; return newdcb;
} }
@ -3273,14 +3285,31 @@ dcb_session_check(DCB *dcb, const char *function)
} }
} }
/** DCB Sanity checks */
static inline void dcb_sanity_check(DCB* dcb)
{
CHK_DCB(dcb);
if (dcb->state == DCB_STATE_DISCONNECTED || dcb->state == DCB_STATE_UNDEFINED)
{
MXS_ERROR("%lu [poll_add_dcb] Error : existing state of dcb %p "
"is %s, but this should be impossible, crashing.",
thread_self(), dcb, STRDCBSTATE(dcb->state));
raise(SIGABRT);
}
else if (dcb->state == DCB_STATE_POLLING || dcb->state == DCB_STATE_LISTENING)
{
MXS_ERROR("%lu [poll_add_dcb] Error : existing state of dcb %p "
"is %s, but this is probably an error, not crashing.",
thread_self(), dcb, STRDCBSTATE(dcb->state));
}
}
int poll_add_dcb(DCB *dcb) int poll_add_dcb(DCB *dcb)
{ {
int rc = -1; dcb_sanity_check(dcb);
dcb_state_t old_state = dcb->state;
dcb_state_t new_state;
uint32_t events = 0;
CHK_DCB(dcb); uint32_t events = 0;
#ifdef EPOLLRDHUP #ifdef EPOLLRDHUP
events = EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLHUP | EPOLLET; events = EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLHUP | EPOLLET;
@ -3288,14 +3317,11 @@ int poll_add_dcb(DCB *dcb)
events = EPOLLIN | EPOLLOUT | EPOLLHUP | EPOLLET; events = EPOLLIN | EPOLLOUT | EPOLLHUP | EPOLLET;
#endif #endif
/*< /** Choose new state and worker thread ID according to the role of DCB. */
* Choose new state according to the role of dcb. dcb_state_t new_state;
*/ int worker_id = 0;
if (dcb->dcb_role == DCB_ROLE_CLIENT_HANDLER || dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER)
{ if (dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER)
new_state = DCB_STATE_POLLING;
}
else
{ {
/** /**
* Listeners are always added in level triggered mode. This will cause * Listeners are always added in level triggered mode. This will cause
@ -3303,72 +3329,45 @@ int poll_add_dcb(DCB *dcb)
* to accept. * to accept.
*/ */
events = EPOLLIN; events = EPOLLIN;
ss_dassert(dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER);
new_state = DCB_STATE_LISTENING; new_state = DCB_STATE_LISTENING;
}
/*
* Check DCB current state seems sensible
*/
if (DCB_STATE_DISCONNECTED == dcb->state
|| DCB_STATE_UNDEFINED == dcb->state)
{
MXS_ERROR("%lu [poll_add_dcb] Error : existing state of dcb %p "
"is %s, but this should be impossible, crashing.",
pthread_self(),
dcb,
STRDCBSTATE(dcb->state));
raise(SIGABRT);
}
if (DCB_STATE_POLLING == dcb->state
|| DCB_STATE_LISTENING == dcb->state)
{
MXS_ERROR("%lu [poll_add_dcb] Error : existing state of dcb %p "
"is %s, but this is probably an error, not crashing.",
pthread_self(),
dcb,
STRDCBSTATE(dcb->state));
}
dcb->state = new_state;
/*
* The only possible failure that will not cause a crash is
* running out of system resources.
*/
int worker_id = 0;
if (dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER)
{
worker_id = MXS_WORKER_ALL; worker_id = MXS_WORKER_ALL;
} }
else if (dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER)
{
ss_dassert(Worker::get_current_id() != -1);
worker_id = dcb->session->client_dcb->poll.thread.id;
ss_dassert(worker_id == Worker::get_current_id());
}
else else
{ {
ss_dassert(dcb->dcb_role == DCB_ROLE_CLIENT_HANDLER ||
dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER);
ss_dassert(Worker::get_current_id() != -1); ss_dassert(Worker::get_current_id() != -1);
ss_dassert(Worker::get_current_id() == dcb->poll.thread.id);
worker_id = Worker::get_current_id(); new_state = DCB_STATE_POLLING;
worker_id = dcb->poll.thread.id;
} }
/**
* Assign the new state before adding the DCB to the worker and store the
* old state in case we need to revert it.
*/
dcb_state_t old_state = dcb->state;
dcb->state = new_state;
int rc = 0;
if (poll_add_fd_to_worker(worker_id, dcb->fd, events, (MXS_POLL_DATA*)dcb)) if (poll_add_fd_to_worker(worker_id, dcb->fd, events, (MXS_POLL_DATA*)dcb))
{ {
dcb_add_to_list(dcb); dcb_add_to_list(dcb);
MXS_DEBUG("%lu [poll_add_dcb] Added dcb %p in state %s to poll set.", MXS_DEBUG("%lu [poll_add_dcb] Added dcb %p in state %s to poll set.",
pthread_self(), thread_self(), dcb, STRDCBSTATE(dcb->state));
dcb,
STRDCBSTATE(dcb->state));
rc = 0;
} }
else else
{ {
/**
* We failed to add the DCB to a worker. Revert the state so that it
* will be treated as a DCB in the correct state.
*/
dcb->state = old_state; dcb->state = old_state;
rc = -1; rc = -1;
} }
return rc; return rc;
} }