diff --git a/include/maxscale/dcb.h b/include/maxscale/dcb.h
index 80ac0b32d..93483d227 100644
--- a/include/maxscale/dcb.h
+++ b/include/maxscale/dcb.h
@@ -248,6 +248,7 @@ typedef struct dcb
     SPINLOCK        delayqlock;     /**< Delay Backend Write Queue spinlock */
     GWBUF           *delayq;        /**< Delay Backend Write Data Queue */
     GWBUF           *dcb_readqueue; /**< read queue for storing incomplete reads */
+    GWBUF           *dcb_fakequeue; /**< Fake event queue for generated events */
     SPINLOCK        authlock;       /**< Generic Authorization spinlock */
 
     DCBSTATS        stats;          /**< DCB related statistics */
diff --git a/server/core/dcb.c b/server/core/dcb.c
index cc76a3a89..1ada04d25 100644
--- a/server/core/dcb.c
+++ b/server/core/dcb.c
@@ -420,7 +420,11 @@ dcb_free_all_memory(DCB *dcb)
         gwbuf_free(dcb->dcb_readqueue);
         dcb->dcb_readqueue = NULL;
     }
-
+    if (dcb->dcb_fakequeue)
+    {
+        gwbuf_free(dcb->dcb_fakequeue);
+        dcb->dcb_fakequeue = NULL;
+    }
     spinlock_acquire(&dcb->cb_lock);
     while ((cb_dcb = dcb->callbacks) != NULL)
     {
@@ -913,11 +917,15 @@ int dcb_read(DCB *dcb,
 
     if (dcb->dcb_readqueue)
     {
-        spinlock_acquire(&dcb->authlock);
         *head = gwbuf_append(*head, dcb->dcb_readqueue);
         dcb->dcb_readqueue = NULL;
         nreadtotal = gwbuf_length(*head);
-        spinlock_release(&dcb->authlock);
+    }
+    else if (dcb->dcb_fakequeue)
+    {
+        *head = gwbuf_append(*head, dcb->dcb_fakequeue);
+        dcb->dcb_fakequeue = NULL;
+        nreadtotal = gwbuf_length(*head);
     }
 
     if (SSL_HANDSHAKE_DONE == dcb->ssl_state || SSL_ESTABLISHED == dcb->ssl_state)
@@ -1661,7 +1669,7 @@ dcb_grab_writeq(DCB *dcb, bool first_time)
 
     if (first_time && dcb->ssl_read_want_write)
     {
-        poll_fake_event(dcb, EPOLLIN);
+        poll_fake_read_event(dcb);
     }
 
     if (first_time && dcb->draining_flag)
@@ -3554,7 +3562,5 @@ dcb_role_name(DCB *dcb)
  */
 void dcb_append_readqueue(DCB *dcb, GWBUF *buffer)
 {
-    spinlock_acquire(&dcb->authlock);
     dcb->dcb_readqueue = gwbuf_append(dcb->dcb_readqueue, buffer);
-    spinlock_release(&dcb->authlock);
 }
diff --git a/server/core/poll.c b/server/core/poll.c
index 7b9b75768..cfa316a2e 100644
--- a/server/core/poll.c
+++ b/server/core/poll.c
@@ -81,8 +81,19 @@ int max_poll_sleep;
  */
 #define MUTEX_EPOLL 0
 
+/** Fake epoll event struct */
+typedef struct fake_event
+{
+    DCB               *dcb;   /*< The DCB where this event was generated */
+    GWBUF             *data;  /*< Fake data, placed in the DCB's read queue */
+    uint32_t           event; /*< The EPOLL event type */
+    struct fake_event *tail;  /*< The last event */
+    struct fake_event *next;  /*< The next event */
+} fake_event_t;
+
 static int *epoll_fd;              /*< The epoll file descriptor */
 static int next_epoll_fd = 0;      /*< Which thread handles the next DCB */
+static fake_event_t **fake_events; /*< Thread-specific fake event queue */
 static int do_shutdown = 0;        /*< Flag the shutdown of the poll subsystem */
 static GWBITMASK poll_mask;
 #if MUTEX_EPOLL
@@ -91,7 +102,7 @@ static simple_mutex_t epoll_wait_mutex; /*< serializes calls to epoll_wait */
 static int n_waiting = 0;          /*< No. of threads in epoll_wait */
 
 static int process_pollq(int thread_id, struct epoll_event *event);
-static void poll_add_event_to_dcb(DCB* dcb, GWBUF* buf, __uint32_t ev);
+static void poll_add_event_to_dcb(DCB* dcb, GWBUF* buf, uint32_t ev);
 static bool poll_dcb_session_check(DCB *dcb, const char *);
 
 DCB *eventq = NULL;
@@ -224,6 +235,11 @@ poll_init()
         }
     }
 
+    if ((fake_events = MXS_CALLOC(sizeof(fake_event_t*), n_threads)) == NULL)
+    {
+        exit(-1);
+    }
+
     memset(&pollStats, 0, sizeof(pollStats));
     memset(&queueStats, 0, sizeof(queueStats));
     bitmask_init(&poll_mask);
@@ -339,7 +355,7 @@ poll_add_dcb(DCB *dcb)
                   STRDCBSTATE(dcb->state));
     }
     dcb->state = new_state;
-    spinlock_release(&dcb->dcb_initlock);
+
     /*
      * The only possible failure that will not cause a crash is
      * running out of system resources.
      */
@@ -356,6 +372,7 @@ poll_add_dcb(DCB *dcb)
     }
     dcb->owner = owner;
+    spinlock_release(&dcb->dcb_initlock);
 
     rc = epoll_ctl(epoll_fd[owner], EPOLL_CTL_ADD, dcb->fd, &ev);
     if (rc)
@@ -702,6 +719,19 @@ poll_waitevents(void *arg)
                 process_pollq(thread_id, &events[i]);
             }
 
+            /** Process fake events */
+            while (fake_events[thread_id])
+            {
+                fake_event_t *event = fake_events[thread_id];
+                fake_events[thread_id] = fake_events[thread_id]->next;
+
+                struct epoll_event ev;
+                event->dcb->dcb_fakequeue = event->data;
+                ev.data.ptr = event->dcb;
+                ev.events = event->event;
+                process_pollq(thread_id, &ev);
+            }
+
             if (check_timeouts && hkheartbeat >= next_timeout_check)
             {
                 process_idle_sessions();
@@ -795,7 +825,6 @@ process_pollq(int thread_id, struct epoll_event *event)
     unsigned long qtime;
 
     DCB *dcb = event->data.ptr;
-    atomic_add(&pollStats.evq_pending, -1);
 
 #if PROFILE_POLL
     memlog_log(plog, hkheartbeat - dcb->evq.inserted);
@@ -1132,8 +1161,6 @@ dprintPollStats(DCB *dcb)
                pollStats.evq_length);
     dcb_printf(dcb, "Maximum event queue length:        %" PRId32 "\n",
                pollStats.evq_max);
-    dcb_printf(dcb, "No. of DCBs with pending events:   %" PRId32 "\n",
-               pollStats.evq_pending);
     dcb_printf(dcb, "No. of wakeups with pending queue: %" PRId32 "\n",
                pollStats.wake_evqpending);
@@ -1366,7 +1393,6 @@ poll_loadav(void *data)
         current_avg = 0.0;
     }
     avg_samples[next_sample] = current_avg;
-    evqp_samples[next_sample] = pollStats.evq_pending;
     next_sample++;
     if (next_sample >= n_avg_samples)
     {
@@ -1396,50 +1422,30 @@ void poll_add_epollin_event_to_dcb(DCB* dcb,
 static void poll_add_event_to_dcb(DCB* dcb,
                                   GWBUF* buf,
-                                  __uint32_t ev)
+                                  uint32_t ev)
 {
-    /** Add buf to readqueue */
-    spinlock_acquire(&dcb->authlock);
-    dcb->dcb_readqueue = gwbuf_append(dcb->dcb_readqueue, buf);
-    spinlock_release(&dcb->authlock);
+    fake_event_t *event = MXS_MALLOC(sizeof(*event));
 
-    spinlock_acquire(&pollqlock);
+    if (event)
+    {
+        event->data = buf;
+        event->dcb = dcb;
+        event->event = ev;
+        event->next = NULL;
+        event->tail = event;
 
-    /** Set event to DCB */
-    if (DCB_POLL_BUSY(dcb))
-    {
-        if (dcb->evq.pending_events == 0)
+        int thr = dcb->owner;
+
+        if (fake_events[thr])
         {
-            pollStats.evq_pending++;
-        }
-        dcb->evq.pending_events |= ev;
-    }
-    else
-    {
-        dcb->evq.pending_events = ev;
-        /** Add DCB to eventqueue if it isn't already there */
-        if (eventq)
-        {
-            dcb->evq.prev = eventq->evq.prev;
-            eventq->evq.prev->evq.next = dcb;
-            eventq->evq.prev = dcb;
-            dcb->evq.next = eventq;
+            fake_events[thr]->tail->next = event;
+            fake_events[thr]->tail = event;
         }
         else
         {
-            eventq = dcb;
-            dcb->evq.prev = dcb;
-            dcb->evq.next = dcb;
-        }
-        pollStats.evq_length++;
-        pollStats.evq_pending++;
-
-        if (pollStats.evq_length > pollStats.evq_max)
-        {
-            pollStats.evq_max = pollStats.evq_length;
+            fake_events[thr] = event;
         }
     }
-    spinlock_release(&pollqlock);
 }
 
 /*
@@ -1458,7 +1464,7 @@ static void poll_add_event_to_dcb(DCB* dcb,
 void
 poll_fake_write_event(DCB *dcb)
 {
-    poll_fake_event(dcb, EPOLLOUT);
+    poll_add_event_to_dcb(dcb, NULL, EPOLLOUT);
 }
 
 /*
@@ -1477,79 +1483,7 @@ poll_fake_write_event(DCB *dcb)
 void
 poll_fake_read_event(DCB *dcb)
 {
-    poll_fake_event(dcb, EPOLLIN);
-}
-
-/*
- * Insert a fake completion event for a DCB into the polling queue.
- *
- * This is used to trigger transmission activity on another DCB from
- * within the event processing routine of a DCB. or to allow a DCB
- * to defer some further output processing, to allow for other DCBs
- * to receive a slice of the processing time. Fake events are added
- * to the tail of the event queue, in the same way that real events
- * are, so maintain the "fairness" of processing.
- *
- * @param dcb   DCB to emulate an event for
- * @param ev    Event to emulate
- */
-void
-poll_fake_event(DCB *dcb, enum EPOLL_EVENTS ev)
-{
-
-    spinlock_acquire(&pollqlock);
-    /*
-     * If the DCB is already on the queue, there are no pending events and
-     * there are other events on the queue, then
-     * take it off the queue. This stops the DCB hogging the threads.
-     */
-    if (DCB_POLL_BUSY(dcb) && dcb->evq.pending_events == 0 && dcb->evq.prev != dcb)
-    {
-        dcb->evq.prev->evq.next = dcb->evq.next;
-        dcb->evq.next->evq.prev = dcb->evq.prev;
-        if (eventq == dcb)
-        {
-            eventq = dcb->evq.next;
-        }
-        dcb->evq.next = NULL;
-        dcb->evq.prev = NULL;
-        pollStats.evq_length--;
-    }
-
-    if (DCB_POLL_BUSY(dcb))
-    {
-        if (dcb->evq.pending_events == 0)
-        {
-            pollStats.evq_pending++;
-        }
-        dcb->evq.pending_events |= ev;
-    }
-    else
-    {
-        dcb->evq.pending_events = ev;
-        dcb->evq.inserted = hkheartbeat;
-        if (eventq)
-        {
-            dcb->evq.prev = eventq->evq.prev;
-            eventq->evq.prev->evq.next = dcb;
-            eventq->evq.prev = dcb;
-            dcb->evq.next = eventq;
-        }
-        else
-        {
-            eventq = dcb;
-            dcb->evq.prev = dcb;
-            dcb->evq.next = dcb;
-        }
-        pollStats.evq_length++;
-        pollStats.evq_pending++;
-        dcb->evq.inserted = hkheartbeat;
-        if (pollStats.evq_length > pollStats.evq_max)
-        {
-            pollStats.evq_max = pollStats.evq_length;
-        }
-    }
-    spinlock_release(&pollqlock);
+    poll_add_event_to_dcb(dcb, NULL, EPOLLIN);
 }
 
 /*
@@ -1567,42 +1501,7 @@ poll_fake_hangup_event(DCB *dcb)
 #else
     uint32_t ev = EPOLLHUP;
 #endif
-
-    spinlock_acquire(&pollqlock);
-    if (DCB_POLL_BUSY(dcb))
-    {
-        if (dcb->evq.pending_events == 0)
-        {
-            pollStats.evq_pending++;
-        }
-        dcb->evq.pending_events |= ev;
-    }
-    else
-    {
-        dcb->evq.pending_events = ev;
-        dcb->evq.inserted = hkheartbeat;
-        if (eventq)
-        {
-            dcb->evq.prev = eventq->evq.prev;
-            eventq->evq.prev->evq.next = dcb;
-            eventq->evq.prev = dcb;
-            dcb->evq.next = eventq;
-        }
-        else
-        {
-            eventq = dcb;
-            dcb->evq.prev = dcb;
-            dcb->evq.next = dcb;
-        }
-        pollStats.evq_length++;
-        pollStats.evq_pending++;
-        dcb->evq.inserted = hkheartbeat;
-        if (pollStats.evq_length > pollStats.evq_max)
-        {
-            pollStats.evq_max = pollStats.evq_length;
-        }
-    }
-    spinlock_release(&pollqlock);
+    poll_add_event_to_dcb(dcb, NULL, ev);
 }
 
 /**
@@ -1696,14 +1595,14 @@ poll_get_stat(POLL_STAT stat)
         return ts_stats_sum(pollStats.n_accept);
     case POLL_STAT_EVQ_LEN:
         return pollStats.evq_length;
-    case POLL_STAT_EVQ_PENDING:
-        return pollStats.evq_pending;
     case POLL_STAT_EVQ_MAX:
         return pollStats.evq_max;
     case POLL_STAT_MAX_QTIME:
         return (int)queueStats.maxqtime;
     case POLL_STAT_MAX_EXECTIME:
         return (int)queueStats.maxexectime;
+    default:
+        break;
     }
     return 0;
 }
diff --git a/server/modules/protocol/MySQL/MySQLBackend/mysql_backend.c b/server/modules/protocol/MySQL/MySQLBackend/mysql_backend.c
index cc943a460..67b602cdc 100644
--- a/server/modules/protocol/MySQL/MySQLBackend/mysql_backend.c
+++ b/server/modules/protocol/MySQL/MySQLBackend/mysql_backend.c
@@ -599,10 +599,8 @@ gw_read_backend_event(DCB *dcb)
         if (proto->protocol_auth_state == MXS_AUTH_STATE_COMPLETE)
         {
             /** Authentication completed successfully */
-            spinlock_acquire(&dcb->authlock);
             GWBUF *localq = dcb->delayq;
             dcb->delayq = NULL;
-            spinlock_release(&dcb->authlock);
 
             if (localq)
             {
@@ -746,9 +744,9 @@ gw_read_and_write(DCB *dcb)
     {
         GWBUF *tmp = modutil_get_complete_packets(&read_buffer);
         /* Put any residue into the read queue */
-        spinlock_acquire(&dcb->authlock);
+
         dcb->dcb_readqueue = read_buffer;
-        spinlock_release(&dcb->authlock);
+
         if (tmp == NULL)
         {
             /** No complete packets */
@@ -1012,7 +1010,7 @@ static int gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
             gwbuf_free(queue);
             rc = 0;
-            spinlock_release(&dcb->authlock);
+
             break;
 
         case MXS_AUTH_STATE_COMPLETE:
@@ -1027,7 +1025,7 @@ static int gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
                      dcb->fd,
                      STRPROTOCOLSTATE(backend_protocol->protocol_auth_state));
 
-            spinlock_release(&dcb->authlock);
+
             /**
              * Statement type is used in readwrite split router.
              * Command is *not* set for readconn router.
@@ -1082,7 +1080,7 @@ static int gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
              * connected with auth ok */
             backend_set_delayqueue(dcb, queue);
-            spinlock_release(&dcb->authlock);
+
             rc = 1;
         }
         break;
@@ -1807,9 +1805,9 @@ static GWBUF* process_response_data(DCB* dcb,
             /** Store the already read data into the readqueue of the DCB
              * and restore the response status to the initial number of packets */
-            spinlock_acquire(&dcb->authlock);
+
             dcb->dcb_readqueue = gwbuf_append(outbuf, dcb->dcb_readqueue);
-            spinlock_release(&dcb->authlock);
+
             protocol_set_response_status(p, initial_packets, initial_bytes);
             return NULL;
         }
diff --git a/server/modules/protocol/MySQL/MySQLClient/mysql_client.c b/server/modules/protocol/MySQL/MySQLClient/mysql_client.c
index df10a1d1f..94040b3a3 100644
--- a/server/modules/protocol/MySQL/MySQLClient/mysql_client.c
+++ b/server/modules/protocol/MySQL/MySQLClient/mysql_client.c
@@ -433,19 +433,19 @@ int gw_read_client_event(DCB* dcb)
              * will be changed to MYSQL_IDLE (see below).
              *
              */
-        case MXS_AUTH_STATE_MESSAGE_READ:
-            /* After this call read_buffer will point to freed data */
-            if (nbytes_read < 3 || (0 == max_bytes && nbytes_read <
-                (MYSQL_GET_PACKET_LEN((uint8_t *) GWBUF_DATA(read_buffer)) + 4)) ||
-                (0 != max_bytes && nbytes_read < max_bytes))
-            {
-                spinlock_acquire(&dcb->authlock);
-                dcb->dcb_readqueue = read_buffer;
-                spinlock_release(&dcb->authlock);
-                return 0;
-            }
-            return_code = gw_read_do_authentication(dcb, read_buffer, nbytes_read);
-            break;
+        case MXS_AUTH_STATE_MESSAGE_READ:
+            /* After this call read_buffer will point to freed data */
+            if (nbytes_read < 3 || (0 == max_bytes && nbytes_read <
+                (MYSQL_GET_PACKET_LEN((uint8_t *) GWBUF_DATA(read_buffer)) + 4)) ||
+                (0 != max_bytes && nbytes_read < max_bytes))
+            {
+
+                dcb->dcb_readqueue = read_buffer;
+
+                return 0;
+            }
+            return_code = gw_read_do_authentication(dcb, read_buffer, nbytes_read);
+            break;
 
         /**
          *
@@ -861,9 +861,9 @@ gw_read_normal_data(DCB *dcb, GWBUF *read_buffer, int nbytes_read)
     if (nbytes_read < 3 || nbytes_read <
         (MYSQL_GET_PACKET_LEN((uint8_t *) GWBUF_DATA(read_buffer)) + 4))
     {
-        spinlock_acquire(&dcb->authlock);
+
        dcb->dcb_readqueue = read_buffer;
-        spinlock_release(&dcb->authlock);
+
        return 0;
     }
     gwbuf_set_type(read_buffer, GWBUF_TYPE_MYSQL);
@@ -904,9 +904,9 @@ gw_read_finish_processing(DCB *dcb, GWBUF *read_buffer, uint64_t capabilities)
         {
             /* Must have been data left over */
             /* Add incomplete mysql packet to read queue */
-            spinlock_acquire(&dcb->authlock);
+
             dcb->dcb_readqueue = gwbuf_append(dcb->dcb_readqueue, read_buffer);
-            spinlock_release(&dcb->authlock);
+
         }
     }
     else if (NULL != session->router_session || (rcap_type_required(capabilities, RCAP_TYPE_NO_RSESSION)))
diff --git a/server/modules/protocol/MySQL/mysql_common.c b/server/modules/protocol/MySQL/mysql_common.c
index 9dad9552a..04618cb68 100644
--- a/server/modules/protocol/MySQL/mysql_common.c
+++ b/server/modules/protocol/MySQL/mysql_common.c
@@ -1039,9 +1039,9 @@ bool read_complete_packet(DCB *dcb, GWBUF **readbuf)
         if (localbuf)
         {
             /** Store any extra data in the DCB's readqueue */
-            spinlock_acquire(&dcb->authlock);
+
             dcb->dcb_readqueue = gwbuf_append(dcb->dcb_readqueue, localbuf);
-            spinlock_release(&dcb->authlock);
+
         }
     }
 
@@ -1061,7 +1061,6 @@ bool gw_get_shared_session_auth_info(DCB* dcb, MYSQL_session* session)
     CHK_DCB(dcb);
     CHK_SESSION(dcb->session);
 
-    spinlock_acquire(&dcb->session->ses_lock);
     if (dcb->session->state != SESSION_STATE_ALLOC &&
         dcb->session->state != SESSION_STATE_DUMMY)
@@ -1076,7 +1075,7 @@ bool gw_get_shared_session_auth_info(DCB* dcb, MYSQL_session* session)
                   pthread_self(),
                   dcb->session->state);
         rval = false;
     }
-    spinlock_release(&dcb->session->ses_lock);
+
     return rval;
 }
diff --git a/server/modules/routing/maxinfo/maxinfo_exec.c b/server/modules/routing/maxinfo/maxinfo_exec.c
index 3ede0d05b..b77cc148f 100644
--- a/server/modules/routing/maxinfo/maxinfo_exec.c
+++ b/server/modules/routing/maxinfo/maxinfo_exec.c
@@ -1044,15 +1044,6 @@ maxinfo_event_queue_length()
     return poll_get_stat(POLL_STAT_EVQ_LEN);
 }
 
-/**
- * Interface to poll stats for event pending queue length
- */
-static int
-maxinfo_event_pending_queue_length()
-{
-    return poll_get_stat(POLL_STAT_EVQ_PENDING);
-}
-
 /**
  * Interface to poll stats for max event queue length
  */
@@ -1108,7 +1099,6 @@ static struct
     { "Error_events",               VT_INT, (STATSFUNC)maxinfo_error_events },
     { "Accept_events",              VT_INT, (STATSFUNC)maxinfo_accept_events },
     { "Event_queue_length",         VT_INT, (STATSFUNC)maxinfo_event_queue_length },
-    { "Pending_events",             VT_INT, (STATSFUNC)maxinfo_event_pending_queue_length },
     { "Max_event_queue_length",     VT_INT, (STATSFUNC)maxinfo_max_event_queue_length },
     { "Max_event_queue_time",       VT_INT, (STATSFUNC)maxinfo_max_event_queue_time },
     { "Max_event_execution_time",   VT_INT, (STATSFUNC)maxinfo_max_event_exec_time },
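
Note on the mechanism above: instead of splicing fake events into the shared DCB event queue under pollqlock (the code deleted from poll_fake_event and poll_fake_hangup_event), the patch appends them to a per-thread singly linked list whose head caches the tail, and poll_waitevents drains that list right after the real epoll events; any attached data reaches dcb_read through the new dcb_fakequeue field. The sketch below is a minimal, single-threaded illustration of that enqueue/drain pattern only; dummy_dcb_t, N_THREADS, add_fake_event and drain_fake_events are made-up stand-ins, not MaxScale APIs.

/*
 * Standalone sketch of the per-thread fake-event queue pattern.
 * Build on Linux (sys/epoll.h): cc -o fake_event_sketch fake_event_sketch.c
 */
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>

typedef struct dummy_dcb            /* stand-in for DCB: an id plus the owning thread */
{
    int id;
    int owner;
} dummy_dcb_t;

typedef struct fake_event
{
    dummy_dcb_t       *dcb;         /* DCB the event is generated for */
    uint32_t           event;       /* EPOLLIN, EPOLLOUT, EPOLLHUP, ... */
    struct fake_event *tail;        /* last node, only meaningful in the list head */
    struct fake_event *next;
} fake_event_t;

#define N_THREADS 4
static fake_event_t *fake_events[N_THREADS];    /* one queue per polling thread */

/* Append an event to the owner thread's queue; the head node caches the tail. */
static void add_fake_event(dummy_dcb_t *dcb, uint32_t ev)
{
    fake_event_t *e = calloc(1, sizeof(*e));
    if (e == NULL)
    {
        return;                     /* allocation failure: drop the event */
    }
    e->dcb = dcb;
    e->event = ev;
    e->tail = e;

    int thr = dcb->owner;
    if (fake_events[thr])
    {
        fake_events[thr]->tail->next = e;       /* append after the cached tail */
        fake_events[thr]->tail = e;
    }
    else
    {
        fake_events[thr] = e;                   /* first element becomes head and tail */
    }
}

/* Drain the queue the way poll_waitevents() does after the real epoll events. */
static void drain_fake_events(int thr)
{
    while (fake_events[thr])
    {
        fake_event_t *e = fake_events[thr];
        fake_events[thr] = e->next;
        printf("dcb %d: fake event 0x%" PRIx32 "\n", e->dcb->id, e->event);
        free(e);
    }
}

int main(void)
{
    dummy_dcb_t a = { .id = 1, .owner = 0 };
    dummy_dcb_t b = { .id = 2, .owner = 0 };

    add_fake_event(&a, EPOLLIN);    /* roughly what poll_fake_read_event() now queues */
    add_fake_event(&b, EPOLLOUT);   /* roughly what poll_fake_write_event() now queues */
    drain_fake_events(0);
    return 0;
}

Keeping one queue per polling thread (indexed by dcb->owner) and handing buffers over via dcb_fakequeue is what appears to allow the pollqlock- and authlock-protected sections above to be dropped: fake events no longer contend with real events on the shared queue, and the read queue is only touched from the thread that processes the DCB.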