MXS-1875 Add fd and make note of DCB simultaneously

The fact that a client dcb was immediately added to the epoll-
instance of the relevant worker (possible, since that is thread-
safe), but was added to the book-keeping via the message mechanism
(necessary, since that is not thread-safe), meant that if the
connection was closed before the message was delivered, the handling
of the message then caused an access error.

Now the fd is also added to the epoll-instance via the messaging
mechanism, so the problem can no longer occur. The only fds this
affects are connections made to maxadmin or maxinfo as they are
always handled by the main thread due to deadlock issues.
This commit is contained in:
Johan Wikman 2018-05-21 17:47:44 +03:00
parent 096f5e4dc6
commit f166b50b37
2 changed files with 117 additions and 38 deletions

View File

@ -232,13 +232,6 @@ void dcb_close(DCB *);
*/
void dcb_close_in_owning_thread(DCB *dcb);
/**
* Add a DCB to the owner's list
*
* @param dcb DCB to add
*/
void dcb_add_to_list(DCB *dcb);
void printAllDCBs(); /* Debug to print all DCB in the system */
void printDCB(DCB *); /* Debug print routine */
void dprintDCBList(DCB *); /* Debug print DCB list statistics */

View File

@ -114,6 +114,8 @@ static int dcb_listen_create_socket_inet(const char *host, uint16_t port);
static int dcb_listen_create_socket_unix(const char *path);
static int dcb_set_socket_option(int sockfd, int level, int optname, void *optval, socklen_t optlen);
static void dcb_add_to_all_list(DCB *dcb);
static void dcb_add_to_list(DCB *dcb);
static bool dcb_add_to_worker(int worker_id, DCB *dcb, uint32_t events);
static DCB *dcb_find_free();
static void dcb_remove_from_list(DCB *dcb);
@ -2754,7 +2756,7 @@ dcb_role_name(DCB *dcb)
return name;
}
static void dcb_add_to_worker_list(int thread_id, void* data)
static void dcb_add_to_list_cb(int thread_id, void* data)
{
DCB *dcb = (DCB*)data;
@ -2763,7 +2765,7 @@ static void dcb_add_to_worker_list(int thread_id, void* data)
dcb_add_to_list(dcb);
}
void dcb_add_to_list(DCB *dcb)
static void dcb_add_to_list(DCB *dcb)
{
if (dcb->dcb_role != DCB_ROLE_SERVICE_LISTENER ||
(dcb->thread.next == NULL && dcb->thread.tail == NULL))
@ -2773,33 +2775,17 @@ void dcb_add_to_list(DCB *dcb)
* is not in the list. Stopped listeners are not removed from the list.
*/
int worker_id = Worker::get_current_id();
ss_dassert(dcb->poll.thread.id == Worker::get_current_id());
if (worker_id == dcb->poll.thread.id)
if (this_unit.all_dcbs[dcb->poll.thread.id] == NULL)
{
if (this_unit.all_dcbs[dcb->poll.thread.id] == NULL)
{
this_unit.all_dcbs[dcb->poll.thread.id] = dcb;
this_unit.all_dcbs[dcb->poll.thread.id]->thread.tail = dcb;
}
else
{
this_unit.all_dcbs[dcb->poll.thread.id]->thread.tail->thread.next = dcb;
this_unit.all_dcbs[dcb->poll.thread.id]->thread.tail = dcb;
}
this_unit.all_dcbs[dcb->poll.thread.id] = dcb;
this_unit.all_dcbs[dcb->poll.thread.id]->thread.tail = dcb;
}
else
{
Worker* worker = Worker::get(dcb->poll.thread.id);
ss_dassert(worker);
intptr_t arg1 = (intptr_t)dcb_add_to_worker_list;
intptr_t arg2 = (intptr_t)dcb;
if (!worker->post_message(MXS_WORKER_MSG_CALL, arg1, arg2))
{
MXS_ERROR("Could not post DCB to worker.");
}
this_unit.all_dcbs[dcb->poll.thread.id]->thread.tail->thread.next = dcb;
this_unit.all_dcbs[dcb->poll.thread.id]->thread.tail = dcb;
}
}
}
@ -3392,6 +3378,112 @@ static inline void dcb_sanity_check(DCB* dcb)
}
}
namespace
{
class AddDcbToWorker: public mxs::WorkerDisposableTask
{
public:
AddDcbToWorker(const AddDcbToWorker&) = delete;
AddDcbToWorker& operator=(const AddDcbToWorker&) = delete;
AddDcbToWorker(DCB* dcb, uint32_t events)
: m_dcb(dcb)
, m_events(events)
{
}
void execute(Worker& worker)
{
ss_dassert(worker.id() == m_dcb->poll.thread.id);
dcb_add_to_worker(worker.id(), m_dcb, m_events);
}
private:
DCB* m_dcb;
uint32_t m_events;
};
}
static bool dcb_add_to_worker(int worker_id, DCB* dcb, uint32_t events)
{
bool rv = false;
if (worker_id == MXS_WORKER_ALL)
{
// A listening DCB, we add it immediately (poll_add_fd_to_worker() is thread-safe).
if (poll_add_fd_to_worker(worker_id, dcb->fd, events, (MXS_POLL_DATA*)dcb))
{
// If this takes place on the main thread (all listening DCBs are
// stored on the main thread),
if (dcb->poll.thread.id == Worker::get_current_id())
{
// we'll add it immediatelt to the list,
dcb_add_to_list(dcb);
}
else
{
// otherwise we must move the adding to the main thread.
// TODO: Separate listening and other DCBs, as this is a mess.
Worker* worker = Worker::get(dcb->poll.thread.id);
ss_dassert(worker);
intptr_t arg1 = (intptr_t)dcb_add_to_list_cb;
intptr_t arg2 = (intptr_t)dcb;
if (!worker->post_message(MXS_WORKER_MSG_CALL, arg1, arg2))
{
MXS_ERROR("Could not post DCB to worker.");
}
}
rv = true;
}
}
else
{
ss_dassert(worker_id == dcb->poll.thread.id);
if (worker_id == Worker::get_current_id())
{
// If the DCB should end up on the current thread, we can both add it
// to the epoll-instance and to the DCB book-keeping immediately.
if (poll_add_fd_to_worker(worker_id, dcb->fd, events, (MXS_POLL_DATA*)dcb))
{
dcb_add_to_list(dcb);
rv = true;
}
}
else
{
// Otherwise we'll move the whole operation to the correct worker.
// This will only happen for "cli" and "maxinfo" services that must
// be served by one thread as there otherwise deadlocks can occur.
AddDcbToWorker* task = new (std::nothrow) AddDcbToWorker(dcb, events);
ss_dassert(task);
if (task)
{
Worker* worker = Worker::get(dcb->poll.thread.id);
ss_dassert(worker);
if (worker->post(std::auto_ptr<AddDcbToWorker>(task), mxs::Worker::EXECUTE_QUEUED))
{
MXS_ERROR("Could not post DCB to worker.");
rv = true;
}
}
else
{
MXS_OOM();
}
}
}
return rv;
}
int poll_add_dcb(DCB *dcb)
{
dcb_sanity_check(dcb);
@ -3452,13 +3544,7 @@ int poll_add_dcb(DCB *dcb)
int rc = 0;
if (poll_add_fd_to_worker(worker_id, dcb->fd, events, (MXS_POLL_DATA*)dcb))
{
dcb_add_to_list(dcb);
MXS_DEBUG("%lu [poll_add_dcb] Added dcb %p in state %s to poll set.",
thread_self(), dcb, STRDCBSTATE(dcb->state));
}
else
if (!dcb_add_to_worker(worker_id, dcb, events))
{
/**
* We failed to add the DCB to a worker. Revert the state so that it