diff --git a/server/core/dcb.c b/server/core/dcb.c index 0b2b038e7..8c6c83799 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -1389,6 +1389,12 @@ dprintDCB(DCB *pdcb, DCB *dcb) dcb->stats.n_high_water); dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water); + if (DCB_POLL_BUSY(dcb)) + { + dcb_printf(pdcb, "\t\tPending events in the queue: %x %s\n", + dcb->evq.pending_events, dcb->evq.processing ? "(processing)" : ""); + + } if (dcb->flags & DCBF_CLONE) dcb_printf(pdcb, "\t\tDCB is a clone.\n"); #if SPINLOCK_PROFILE diff --git a/server/core/poll.c b/server/core/poll.c index 9bd11c3fb..d4cdde67e 100644 --- a/server/core/poll.c +++ b/server/core/poll.c @@ -1202,15 +1202,24 @@ poll_fake_write_event(DCB *dcb) uint32_t ev = EPOLLOUT; spinlock_acquire(&pollqlock); + /* + * If the DCB is already on the queue, there are no pending events and + * there are other events on the queue, then + * take it off the queue. This stops the DCB hogging the threads. + */ + if (DCB_POLL_BUSY(dcb) && dcb->evq.pending_events == 0 && dcb->evq.prev != dcb) + { + dcb->evq.prev->evq.next = dcb->evq.next; + dcb->evq.next->evq.prev = dcb->evq.prev; + if (eventq == dcb) + eventq = dcb->evq.next; + dcb->evq.next = NULL; + dcb->evq.prev = NULL; + pollStats.evq_length--; + } + if (DCB_POLL_BUSY(dcb)) { - if (dcb->evq.pending_events == 0) - { - pollStats.evq_pending++; -#if PROFILE_POLL - dcb->evq.inserted = hkheartbeat; -#endif - } dcb->evq.pending_events |= ev; } else @@ -1241,3 +1250,34 @@ uint32_t ev = EPOLLOUT; } spinlock_release(&pollqlock); } + +/** + * Print the event queue contents + * + * @param pdcb The DCB to print the event queue to + */ +void +dShowEventQ(DCB *pdcb) +{ +DCB *dcb; +int found = 0; +uint32_t ev; + + spinlock_acquire(&pollqlock); + if (eventq == NULL) + { + /* Nothing to process */ + spinlock_release(&pollqlock); + return 0; + } + dcb = eventq; + dcb_printf(pdcb, "%16s | %10s | %s\n", "DCB", "Status", "Events"); + dcb_printf(pdcb, "-----------------+------------+--------------------\n"); + do { + dcb_printf(pdcb, "%16p | %10s | %s\n", dcb, + dcb->evq.processing ? "Processing" : "Pending", + event_to_string(dcb->evq.pending_events)); + dcb = dcb->evq.next; + } while (dcb != eventq); + spinlock_release(&pollqlock); +} diff --git a/server/include/poll.h b/server/include/poll.h index 6524f1bbb..45959830f 100644 --- a/server/include/poll.h +++ b/server/include/poll.h @@ -42,4 +42,5 @@ extern void poll_shutdown(); extern GWBITMASK *poll_bitmask(); extern void dprintPollStats(DCB *); extern void dShowThreads(DCB *dcb); +extern void dShowEventQ(DCB *dcb); #endif diff --git a/server/modules/routing/debugcmd.c b/server/modules/routing/debugcmd.c index f238e16e6..86c18389e 100644 --- a/server/modules/routing/debugcmd.c +++ b/server/modules/routing/debugcmd.c @@ -41,6 +41,7 @@ * than simply addresses * 23/05/14 Mark Riddoch Added support for developer and user modes * 29/05/14 Mark Riddoch Add Filter support + * 16/10/14 Mark Riddoch Add show eventq * * @endverbatim */ @@ -116,6 +117,10 @@ struct subcommand showoptions[] = { "Show the poll statistics", "Show the poll statistics", {0, 0, 0} }, + { "eventq", 0, dShowEventQ, + "Show the queue of events waiting to be processed", + "Show the queue of events waiting to be processed", + {0, 0, 0} }, { "filter", 1, dprintFilter, "Show details of a filter, called with a filter name", "Show details of a filter, called with the address of a filter",