diff --git a/include/maxscale/dcb.h b/include/maxscale/dcb.h index d598d1678..12961092b 100644 --- a/include/maxscale/dcb.h +++ b/include/maxscale/dcb.h @@ -156,6 +156,7 @@ typedef struct dcb GWBUF *delayq; /**< Delay Backend Write Data Queue */ GWBUF *readq; /**< Read queue for storing incomplete reads */ GWBUF *fakeq; /**< Fake event queue for generated events */ + uint32_t fake_event; /**< Fake event to be delivered to handler */ DCBSTATS stats; /**< DCB related statistics */ struct dcb *nextpersistent; /**< Next DCB in the persistent pool for SERVER */ diff --git a/server/core/dcb.cc b/server/core/dcb.cc index 01a18d153..1ec583e3e 100644 --- a/server/core/dcb.cc +++ b/server/core/dcb.cc @@ -3126,11 +3126,6 @@ static uint32_t dcb_process_poll_events(DCB *dcb, uint32_t events) } #endif - if (dcb->n_close != 0) - { - dcb_final_close(dcb); - } - return rc; } @@ -3138,6 +3133,25 @@ static uint32_t dcb_handler(DCB* dcb, uint32_t events) { this_thread.current_dcb = dcb; uint32_t rv = dcb_process_poll_events(dcb, events); + + // When all I/O events have been handled, we will immediately + // process an added fake event. As the handling of a fake event + // may lead to the addition of another fake event we loop until + // there is no fake event or the dcb has been closed. + + while ((dcb->n_close == 0) && (dcb->fake_event != 0)) + { + events = dcb->fake_event; + dcb->fake_event = 0; + + rv |= dcb_process_poll_events(dcb, events); + } + + if (dcb->n_close != 0) + { + dcb_final_close(dcb); + } + this_thread.current_dcb = NULL; return rv; @@ -3177,16 +3191,37 @@ private: static void poll_add_event_to_dcb(DCB* dcb, GWBUF* buf, uint32_t ev) { - FakeEventTask* task = new (std::nothrow) FakeEventTask(dcb, buf, ev); - - if (task) + if (dcb == this_thread.current_dcb) { - Worker* worker = Worker::get(dcb->poll.thread.id); - worker->post(std::auto_ptr(task), mxs::Worker::EXECUTE_QUEUED); + // If the fake event is added to the current DCB, we arrange for + // it to be handled immediately in dcb_handler() when the handling + // of the current events are done... + + if (dcb->fake_event != 0) + { + MXS_WARNING("Events have already been injected to current DCB, discarding existing."); + gwbuf_free(dcb->fakeq); + dcb->fake_event = 0; + } + + dcb->fakeq = buf; + dcb->fake_event = ev; } else { - MXS_OOM(); + // ... otherwise we post the fake event using the messaging mechanism. + + FakeEventTask* task = new (std::nothrow) FakeEventTask(dcb, buf, ev); + + if (task) + { + Worker* worker = Worker::get(dcb->poll.thread.id); + worker->post(std::auto_ptr(task), mxs::Worker::EXECUTE_QUEUED); + } + else + { + MXS_OOM(); + } } }