diff --git a/Documentation/MaxScale 1.0.1beta Release Notes.pdf b/Documentation/MaxScale 1.0.1beta Release Notes.pdf new file mode 100644 index 000000000..7430fca50 Binary files /dev/null and b/Documentation/MaxScale 1.0.1beta Release Notes.pdf differ diff --git a/Documentation/MaxScale 1.0beta Release Notes.pdf b/Documentation/history/MaxScale 1.0beta Release Notes.pdf similarity index 100% rename from Documentation/MaxScale 1.0beta Release Notes.pdf rename to Documentation/history/MaxScale 1.0beta Release Notes.pdf diff --git a/Makefile b/Makefile index 63f92751f..fe49583b0 100644 --- a/Makefile +++ b/Makefile @@ -57,7 +57,7 @@ depend: install: (cd server; make DEST=$(DEST) install) (cd log_manager; make DEST=$(DEST) install) - (cd query_classifier; make DEST=$(DEST) install) + (cd query_classifier;touch depend; make DEST=$(DEST) install) (cd client; make DEST=$(DEST) install) cleantests: diff --git a/client/test/maxadmin_stress.sh b/client/test/maxadmin_stress.sh new file mode 100644 index 000000000..8d495dd4c --- /dev/null +++ b/client/test/maxadmin_stress.sh @@ -0,0 +1,53 @@ +#!/bin/sh +failure=0 +passed=0 + +clients=20 +cmdcnt=1000 + +echo Running $clients parallel iterations of $cmdcnt commands + +for ((cnt=0; cnt<$clients; cnt++ )); do + for ((i=0; i<$cmdcnt; i++ )); do + maxadmin -pskysql show services; + done >/dev/null & +done >& /dev/null + +peak=0 +while [ "`jobs -p`" != "" ]; do + jobs >& /dev/null + zombies=`maxadmin -pskysql list dcbs | grep -ci zombies` + if [ $zombies -gt $peak ] ; then + peak=$zombies + fi + sleep 1 +done +if [ $peak -gt 10 ]; then + echo "High peak zombie count ($peak): Failed" + failure=`expr $failure + 1` +else + passed=`expr $passed + 1` + echo "Zombie collection ($peak): Passed" +fi +zombies=`maxadmin -pskysql list dcbs | grep -ci zombies` +if [ $zombies != "0" ]; then + echo "Residual zombie DCBs: Failed" + failure=`expr $failure + 1` +else + passed=`expr $passed + 1` + echo "Residual zombie DCBs: Passed" +fi +sessions=`maxadmin -pskysql list services | awk -F\| '/ cli/ { print $3 }'` +if [ $sessions -gt 3 ]; then + echo "Session shutdown, $sessions: Failed" + failure=`expr $failure + 1` +else + passed=`expr $passed + 1` + echo "Session shutdown: Passed" +fi + +sessions=`maxadmin -pskysql list services | awk -F\| '/ cli/ { print $4 }'` + +echo "Test run complete. $passed passes, $failure failures" +echo "$sessions CLI sessions executed" +exit $failure diff --git a/macros.cmake b/macros.cmake index caa81f7b6..7e3e5d250 100644 --- a/macros.cmake +++ b/macros.cmake @@ -212,9 +212,14 @@ endmacro() function(subdirs VAR DIRPATH) +if(${CMAKE_VERSION} VERSION_LESS 2.12 ) +set(COMP_VAR PATH) +else() +set(COMP_VAR DIRECTORY) +endif() file(GLOB_RECURSE SDIR ${DIRPATH}/*) foreach(LOOP ${SDIR}) - get_filename_component(LOOP ${LOOP} DIRECTORY) + get_filename_component(LOOP ${LOOP} ${COMP_VAR}) list(APPEND ALLDIRS ${LOOP}) endforeach() list(REMOVE_DUPLICATES ALLDIRS) diff --git a/server/core/dcb.c b/server/core/dcb.c index b8f9108de..0b2b038e7 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -123,12 +123,6 @@ DCB *rval; rval->dcb_errhandle_called = false; #endif rval->dcb_role = role; -#if 1 - simple_mutex_init(&rval->dcb_write_lock, "DCB write mutex"); - simple_mutex_init(&rval->dcb_read_lock, "DCB read mutex"); - rval->dcb_write_active = false; - rval->dcb_read_active = false; -#endif spinlock_init(&rval->dcb_initlock); spinlock_init(&rval->writeqlock); spinlock_init(&rval->delayqlock); @@ -141,6 +135,13 @@ DCB *rval; rval->polloutbusy = 0; rval->writecheck = 0; rval->fd = -1; + + rval->evq.next = NULL; + rval->evq.prev = NULL; + rval->evq.pending_events = 0; + rval->evq.processing = 0; + spinlock_init(&rval->evq.eventqlock); + memset(&rval->stats, 0, sizeof(DCBSTATS)); // Zero the statistics rval->state = DCB_STATE_ALLOC; bitmask_init(&rval->memdata.bitmask); @@ -218,43 +219,11 @@ dcb_add_to_zombieslist(DCB *dcb) spinlock_release(&zombiespin); return; } -#if 1 /*< * Add closing dcb to the top of the list. */ dcb->memdata.next = zombies; zombies = dcb; -#else - if (zombies == NULL) { - zombies = dcb; - } else { - DCB *ptr = zombies; - while (ptr->memdata.next) - { - ss_info_dassert( - ptr->memdata.next->state == DCB_STATE_ZOMBIE, - "Next zombie is not in DCB_STATE_ZOMBIE state"); - - ss_info_dassert( - ptr != dcb, - "Attempt to add DCB to zombies list although it " - "is already there."); - - if (ptr == dcb) - { - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error : Attempt to add DCB to zombies " - "list when it is already in the list"))); - break; - } - ptr = ptr->memdata.next; - } - if (ptr != dcb) { - ptr->memdata.next = dcb; - } - } -#endif /*< * Set state which indicates that it has been added to zombies * list. @@ -389,8 +358,6 @@ DCB_CALLBACK *cb; spinlock_release(&dcb->cb_lock); bitmask_free(&dcb->memdata.bitmask); - simple_mutex_done(&dcb->dcb_read_lock); - simple_mutex_done(&dcb->dcb_write_lock); free(dcb); } @@ -404,15 +371,10 @@ DCB_CALLBACK *cb; * the memdata.bitmask then the DCB is no longer able to be * referenced and it can be finally removed. * - * The excluded DCB allows a thread to exclude a DCB from zombie processing. - * It is used when a thread calls dcb_process_zombies when there is - * a DCB that the caller knows it will continue processing with. - * * @param threadid The thread ID of the caller - * @param excluded The DCB the thread currently uses, NULL or valid DCB. */ DCB * -dcb_process_zombies(int threadid, DCB *excluded) +dcb_process_zombies(int threadid) { DCB *ptr, *lptr; DCB* dcb_list = NULL; @@ -446,9 +408,10 @@ bool succp = false; CHK_DCB(ptr); /* - * Skip processing of the excluded DCB + * Skip processing of DCB's that are + * in the event queue waiting to be processed. */ - if (ptr == excluded) + if (ptr->evq.next || ptr->evq.prev) { lptr = ptr; ptr = ptr->memdata.next; @@ -522,7 +485,6 @@ bool succp = false; /*< * Close file descriptor and move to clean-up phase. */ - ss_dassert(excluded != dcb); rc = close(dcb->fd); if (rc < 0) { @@ -1255,7 +1217,7 @@ printDCB(DCB *dcb) static void spin_reporter(void *dcb, char *desc, int value) { - dcb_printf((DCB *)dcb, "\t\t%-35s %d\n", desc, value); + dcb_printf((DCB *)dcb, "\t\t%-40s %d\n", desc, value); } @@ -1316,10 +1278,6 @@ DCB *dcb; dcb_printf(pdcb, "\t\tNo. of Writes: %d\n", dcb->stats.n_writes); dcb_printf(pdcb, "\t\tNo. of Buffered Writes: %d\n", dcb->stats.n_buffered); dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts); - dcb_printf(pdcb, "\t\tNo. of busy polls: %d\n", dcb->stats.n_busypolls); - dcb_printf(pdcb, "\t\tNo. of read rechecks: %d\n", dcb->stats.n_readrechecks); - dcb_printf(pdcb, "\t\tNo. of busy write polls: %d\n", dcb->stats.n_busywrpolls); - dcb_printf(pdcb, "\t\tNo. of write rechecks: %d\n", dcb->stats.n_writerechecks); dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water); dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water); if (dcb->flags & DCBF_CLONE) @@ -1427,10 +1385,6 @@ dprintDCB(DCB *pdcb, DCB *dcb) dcb->stats.n_buffered); dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts); - dcb_printf(pdcb, "\t\tNo. of busy polls: %d\n", dcb->stats.n_busypolls); - dcb_printf(pdcb, "\t\tNo. of read rechecks: %d\n", dcb->stats.n_readrechecks); - dcb_printf(pdcb, "\t\tNo. of busy write polls: %d\n", dcb->stats.n_busywrpolls); - dcb_printf(pdcb, "\t\tNo. of write rechecks: %d\n", dcb->stats.n_writerechecks); dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water); dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", @@ -1955,99 +1909,6 @@ int rval = 0; return rval; } -/** - * Called by the EPOLLIN event. Take care of calling the protocol - * read entry point and managing multiple threads competing for the DCB - * without blocking those threads. - * - * This mechanism does away with the need for a mutex on the EPOLLIN event - * and instead implements a queuing mechanism in which nested events are - * queued on the DCB such that when the thread processing the first event - * returns it will read the queued event and process it. This allows the - * thread that would otherwise have to wait to process the nested event - * to return immediately and and process other events. - * - * @param dcb The DCB that has data available - * @param thread_id The ID of the calling thread - * @param nozombies If non-zero then do not do zombie processing - */ -void -dcb_pollin(DCB *dcb, int thread_id, int nozombies) -{ - - spinlock_acquire(&dcb->pollinlock); - if (dcb->pollinbusy == 0) - { - dcb->pollinbusy = 1; - do { - if (dcb->readcheck) - { - dcb->stats.n_readrechecks++; - if (!nozombies) - dcb_process_zombies(thread_id, dcb); - } - dcb->readcheck = 0; - spinlock_release(&dcb->pollinlock); - dcb->func.read(dcb); - spinlock_acquire(&dcb->pollinlock); - } while (dcb->readcheck); - dcb->pollinbusy = 0; - } - else - { - dcb->stats.n_busypolls++; - dcb->readcheck = 1; - } - spinlock_release(&dcb->pollinlock); -} - - -/** - * Called by the EPOLLOUT event. Take care of calling the protocol - * write_ready entry point and managing multiple threads competing for the DCB - * without blocking those threads. - * - * This mechanism does away with the need for a mutex on the EPOLLOUT event - * and instead implements a queuing mechanism in which nested events are - * queued on the DCB such that when the thread processing the first event - * returns it will read the queued event and process it. This allows the - * thread that would otherwise have to wait to process the nested event - * to return immediately and and process other events. - * - * @param dcb The DCB thats available for writes - * @param thread_id The ID of the calling thread - * @param nozombies If non-zero then do not do zombie processing - */ -void -dcb_pollout(DCB *dcb, int thread_id, int nozombies) -{ - - spinlock_acquire(&dcb->polloutlock); - if (dcb->polloutbusy == 0) - { - dcb->polloutbusy = 1; - do { - if (dcb->writecheck) - { - if (!nozombies) - dcb_process_zombies(thread_id, dcb); - dcb->stats.n_writerechecks++; - } - dcb->writecheck = 0; - spinlock_release(&dcb->polloutlock); - dcb->func.write_ready(dcb); - spinlock_acquire(&dcb->polloutlock); - } while (dcb->writecheck); - dcb->polloutbusy = 0; - } - else - { - dcb->stats.n_busywrpolls++; - dcb->writecheck = 1; - } - spinlock_release(&dcb->polloutlock); -} - /** * Get the next DCB in the list of all DCB's diff --git a/server/core/gateway.c b/server/core/gateway.c index 79c2585fe..f2c1eefb2 100644 --- a/server/core/gateway.c +++ b/server/core/gateway.c @@ -1646,6 +1646,7 @@ void shutdown_server() { poll_shutdown(); + hkshutdown(); log_flush_shutdown(); } diff --git a/server/core/housekeeper.c b/server/core/housekeeper.c index 5aa5c7a29..fe6cb9047 100644 --- a/server/core/housekeeper.c +++ b/server/core/housekeeper.c @@ -42,6 +42,8 @@ static HKTASK *tasks = NULL; */ static SPINLOCK tasklock = SPINLOCK_INIT; +static int do_shutdown = 0; + static void hkthread(void *); /** @@ -172,6 +174,8 @@ void *taskdata; for (;;) { + if (do_shutdown) + return; thread_millisleep(1000); now = time(0); spinlock_acquire(&tasklock); @@ -194,3 +198,13 @@ void *taskdata; spinlock_release(&tasklock); } } + +/** + * Called to shutdown the housekeeper + * + */ +void +hkshutdown() +{ + do_shutdown = 1; +} diff --git a/server/core/poll.c b/server/core/poll.c index 15e4acadb..12cb1d69d 100644 --- a/server/core/poll.c +++ b/server/core/poll.c @@ -30,6 +30,13 @@ #include #include #include +#include + +#define PROFILE_POLL 1 + +#if PROFILE_POLL +#include +#endif extern int lm_enabled_logfiles_bitmask; @@ -47,6 +54,11 @@ extern int lm_enabled_logfiles_bitmask; * etc. * 23/09/14 Mark Riddoch Make use of RDHUP conditional to allow CentOS 5 * builds. + * 24/09/14 Mark Riddoch Introduction of the event queue for processing the + * incoming events rather than processing them immediately + * in the loop after the epoll_wait. This allows for better + * thread utilisaiton and fairer scheduling of the event + * processing. * * @endverbatim */ @@ -65,6 +77,11 @@ static GWBITMASK poll_mask; static simple_mutex_t epoll_wait_mutex; /*< serializes calls to epoll_wait */ #endif static int n_waiting = 0; /*< No. of threads in epoll_wait */ +static int process_pollq(int thread_id); + + +DCB *eventq = NULL; +SPINLOCK pollqlock = SPINLOCK_INIT; /** * Thread load average, this is the average number of descriptors in each @@ -125,6 +142,8 @@ static struct { int n_nothreads; /*< Number of times no threads are polling */ int n_fds[MAXNFDS]; /*< Number of wakeups with particular n_fds value */ + int evq_length; /*< Event queue length */ + int evq_max; /*< Maximum event queue length */ } pollStats; /** @@ -338,6 +357,17 @@ return_rc: * deschedule a process if a timeout is included, but will not do this if a 0 timeout * value is given. this improves performance when the gateway is under heavy load. * + * In order to provide a fairer means of sharign the threads between the different + * DCB's the poll mechanism has been decoupled from the processing of the events. + * The events are now recieved via the epoll_wait call, a queue of DCB's that have + * events pending is maintained and as new events arrive the DCB is added to the end + * of this queue. If an eent arrives for a DCB alreayd in the queue, then the event + * bits are added to the DCB but the DCB mantains the same point in the queue unless + * the original events are already being processed. If they are being processed then + * the DCB is moved to the back of the queue, this means that a DCB that is receiving + * events at a high rate will not block the execution of events for other DCB's and + * should result in a fairer polling strategy. + * * @param arg The thread ID passed as a void * to satisfy the threading package */ void @@ -346,8 +376,6 @@ poll_waitevents(void *arg) struct epoll_event events[MAX_EVENTS]; int i, nfds; int thread_id = (int)arg; -bool no_op = false; -static bool process_zombies_only = false; /*< flag for all threads */ DCB *zombies = NULL; /** Add this thread to the bitmask of running polling threads */ @@ -362,20 +390,19 @@ DCB *zombies = NULL; while (1) { + /* Process of the queue of waiting requests */ + while (do_shutdown == 0 && process_pollq(thread_id)) + { + if (thread_data) + thread_data[thread_id].state = THREAD_ZPROCESSING; + zombies = dcb_process_zombies(thread_id); + } + atomic_add(&n_waiting, 1); #if BLOCKINGPOLL nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1); atomic_add(&n_waiting, -1); #else /* BLOCKINGPOLL */ - if (!no_op) { - LOGIF(LD, (skygw_log_write( - LOGFILE_DEBUG, - "%lu [poll_waitevents] MaxScale thread " - "%d > epoll_wait <", - pthread_self(), - thread_id))); - no_op = TRUE; - } #if MUTEX_EPOLL simple_mutex_lock(&epoll_wait_mutex, TRUE); #endif @@ -396,32 +423,19 @@ DCB *zombies = NULL; pthread_self(), nfds, eno))); - no_op = FALSE; } - else if (nfds == 0) + /* + * If there are no new descriptors from the non-blocking call + * and nothing to proces on the event queue then for do a + * blocking call to epoll_wait. + */ + else if (nfds == 0 && process_pollq(thread_id) == 0) { - atomic_add(&n_waiting, -1); - if (process_zombies_only) { -#if MUTEX_EPOLL - simple_mutex_unlock(&epoll_wait_mutex); -#endif - goto process_zombies; - } else { - nfds = epoll_wait(epoll_fd, + atomic_add(&n_waiting, 1); + nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, EPOLL_TIMEOUT); - /*< - * When there are zombies to be cleaned up but - * no client requests, allow all threads to call - * dcb_process_zombies without having to wait - * for the timeout. - */ - if (nfds == 0 && dcb_get_zombies() != NULL) - { - process_zombies_only = true; - } - } } else { @@ -457,228 +471,64 @@ DCB *zombies = NULL; atomic_add(&load_samples, 1); atomic_add(&load_nfds, nfds); + /* + * Process every DCB that has a new event and add + * it to the poll queue. + * If the DCB is currently beign processed then we + * or in the new eent bits to the pending event bits + * and leave it in the queue. + * If the DCB was not already in the queue then it was + * idle and is added to the queue to process after + * setting the event bits. + */ for (i = 0; i < nfds; i++) { DCB *dcb = (DCB *)events[i].data.ptr; __uint32_t ev = events[i].events; - CHK_DCB(dcb); - if (thread_data) + spinlock_acquire(&pollqlock); + if (DCB_POLL_BUSY(dcb)) { - thread_data[thread_id].cur_dcb = dcb; - thread_data[thread_id].event = ev; + dcb->evq.pending_events |= ev; } - -#if defined(SS_DEBUG) - if (dcb_fake_write_ev[dcb->fd] != 0) { - LOGIF(LD, (skygw_log_write( - LOGFILE_DEBUG, - "%lu [poll_waitevents] " - "Added fake events %d to ev %d.", - pthread_self(), - dcb_fake_write_ev[dcb->fd], - ev))); - ev |= dcb_fake_write_ev[dcb->fd]; - dcb_fake_write_ev[dcb->fd] = 0; - } -#endif - ss_debug(spinlock_acquire(&dcb->dcb_initlock);) - ss_dassert(dcb->state != DCB_STATE_ALLOC); - ss_dassert(dcb->state != DCB_STATE_DISCONNECTED); - ss_dassert(dcb->state != DCB_STATE_FREED); - ss_debug(spinlock_release(&dcb->dcb_initlock);) - - LOGIF(LD, (skygw_log_write( - LOGFILE_DEBUG, - "%lu [poll_waitevents] event %d dcb %p " - "role %s", - pthread_self(), - ev, - dcb, - STRDCBROLE(dcb->dcb_role)))); - - if (ev & EPOLLOUT) + else { - int eno = 0; - eno = gw_getsockerrno(dcb->fd); - - if (eno == 0) { -#if MUTEX_BLOCK - simple_mutex_lock( - &dcb->dcb_write_lock, - true); - ss_info_dassert( - !dcb->dcb_write_active, - "Write already active"); - dcb->dcb_write_active = TRUE; - atomic_add( - &pollStats.n_write, - 1); - dcb->func.write_ready(dcb); - dcb->dcb_write_active = FALSE; - simple_mutex_unlock( - &dcb->dcb_write_lock); -#else - atomic_add(&pollStats.n_write, - 1); - dcb_pollout(dcb, thread_id, nfds); -#endif - } else { - LOGIF(LD, (skygw_log_write( - LOGFILE_DEBUG, - "%lu [poll_waitevents] " - "EPOLLOUT due %d, %s. " - "dcb %p, fd %i", - pthread_self(), - eno, - strerror(eno), - dcb, - dcb->fd))); - } - } - if (ev & EPOLLIN) - { -#if MUTEX_BLOCK - simple_mutex_lock(&dcb->dcb_read_lock, - true); - ss_info_dassert(!dcb->dcb_read_active, - "Read already active"); - dcb->dcb_read_active = TRUE; -#endif - - if (dcb->state == DCB_STATE_LISTENING) + dcb->evq.pending_events = ev; + if (eventq) { - LOGIF(LD, (skygw_log_write( - LOGFILE_DEBUG, - "%lu [poll_waitevents] " - "Accept in fd %d", - pthread_self(), - dcb->fd))); - atomic_add( - &pollStats.n_accept, 1); - dcb->func.accept(dcb); - } - else - { - LOGIF(LD, (skygw_log_write( - LOGFILE_DEBUG, - "%lu [poll_waitevents] " - "Read in dcb %p fd %d", - pthread_self(), - dcb, - dcb->fd))); - atomic_add(&pollStats.n_read, 1); -#if MUTEX_BLOCK - dcb->func.read(dcb); -#else - dcb_pollin(dcb, thread_id, nfds); -#endif - } -#if MUTEX_BLOCK - dcb->dcb_read_active = FALSE; - simple_mutex_unlock( - &dcb->dcb_read_lock); -#endif - } - if (ev & EPOLLERR) - { - int eno = gw_getsockerrno(dcb->fd); -#if defined(SS_DEBUG) - if (eno == 0) { - eno = dcb_fake_write_errno[dcb->fd]; - LOGIF(LD, (skygw_log_write( - LOGFILE_DEBUG, - "%lu [poll_waitevents] " - "Added fake errno %d. " - "%s", - pthread_self(), - eno, - strerror(eno)))); - } - dcb_fake_write_errno[dcb->fd] = 0; -#endif - if (eno != 0) { - LOGIF(LD, (skygw_log_write( - LOGFILE_DEBUG, - "%lu [poll_waitevents] " - "EPOLLERR due %d, %s.", - pthread_self(), - eno, - strerror(eno)))); - } - atomic_add(&pollStats.n_error, 1); - dcb->func.error(dcb); - } - - if (ev & EPOLLHUP) - { - int eno = 0; - eno = gw_getsockerrno(dcb->fd); - - LOGIF(LD, (skygw_log_write( - LOGFILE_DEBUG, - "%lu [poll_waitevents] " - "EPOLLHUP on dcb %p, fd %d. " - "Errno %d, %s.", - pthread_self(), - dcb, - dcb->fd, - eno, - strerror(eno)))); - atomic_add(&pollStats.n_hup, 1); - spinlock_acquire(&dcb->dcb_initlock); - if ((dcb->flags & DCBF_HUNG) == 0) - { - dcb->flags |= DCBF_HUNG; - spinlock_release(&dcb->dcb_initlock); - dcb->func.hangup(dcb); + dcb->evq.prev = eventq->evq.prev; + eventq->evq.prev->evq.next = dcb; + eventq->evq.prev = dcb; + dcb->evq.next = eventq; } else - spinlock_release(&dcb->dcb_initlock); - } - -#ifdef EPOLLRDHUP - if (ev & EPOLLRDHUP) - { - int eno = 0; - eno = gw_getsockerrno(dcb->fd); - - LOGIF(LD, (skygw_log_write( - LOGFILE_DEBUG, - "%lu [poll_waitevents] " - "EPOLLRDHUP on dcb %p, fd %d. " - "Errno %d, %s.", - pthread_self(), - dcb, - dcb->fd, - eno, - strerror(eno)))); - atomic_add(&pollStats.n_hup, 1); - spinlock_acquire(&dcb->dcb_initlock); - if ((dcb->flags & DCBF_HUNG) == 0) { - dcb->flags |= DCBF_HUNG; - spinlock_release(&dcb->dcb_initlock); - dcb->func.hangup(dcb); + eventq = dcb; + dcb->evq.prev = dcb; + dcb->evq.next = dcb; + } + pollStats.evq_length++; + if (pollStats.evq_length > pollStats.evq_max) + { + pollStats.evq_max = pollStats.evq_length; } - else - spinlock_release(&dcb->dcb_initlock); } -#endif - } /*< for */ - no_op = FALSE; - } /*< if (nfds > 0) */ -process_zombies: - if (thread_data) - { - thread_data[thread_id].state = THREAD_ZPROCESSING; + spinlock_release(&pollqlock); + } } - zombies = dcb_process_zombies(thread_id, NULL); - - if (zombies == NULL) { - process_zombies_only = false; - } + /* + * If there was nothing to process then process the zombie queue + */ + if (process_pollq(thread_id) == 0) + { + if (thread_data) + { + thread_data[thread_id].state = THREAD_ZPROCESSING; + } + zombies = dcb_process_zombies(thread_id); + } + if (do_shutdown) { /*< @@ -701,6 +551,320 @@ process_zombies: } /*< while(1) */ } +/** + * Process of the queue of DCB's that have outstanding events + * + * The first event on the queue will be chosen to be executed by this thread, + * all other events will be left on the queue and may be picked up by other + * threads. When the processing is complete the thread will take the DCB off the + * queue if there are no pending events that have arrived since the thread started + * to process the DCB. If there are pending events the DCB will be moved to the + * back of the queue so that other DCB's will have a share of the threads to + * execute events for them. + * + * @param thread_id The thread ID of the calling thread + * @return 0 if no DCB's have been processed + */ +static int +process_pollq(int thread_id) +{ +DCB *dcb; +int found = 0; +uint32_t ev; + + spinlock_acquire(&pollqlock); + if (eventq == NULL) + { + /* Nothing to process */ + spinlock_release(&pollqlock); + return 0; + } + dcb = eventq; + if (dcb->evq.next == dcb->evq.prev && dcb->evq.processing == 0) + { + found = 1; + dcb->evq.processing = 1; + } + else if (dcb->evq.next == dcb->evq.prev) + { + /* Only item in queue is being processed */ + spinlock_release(&pollqlock); + return 0; + } + else + { + do { + dcb = dcb->evq.next; + } while (dcb != eventq && dcb->evq.processing == 1); + + if (dcb->evq.processing == 0) + { + /* Found DCB to process */ + dcb->evq.processing = 1; + found = 1; + } + } + if (found) + { + ev = dcb->evq.pending_events; + dcb->evq.pending_events = 0; + } + spinlock_release(&pollqlock); + + if (found == 0) + return 0; + + + CHK_DCB(dcb); + if (thread_data) + { + thread_data[thread_id].state = THREAD_PROCESSING; + thread_data[thread_id].cur_dcb = dcb; + thread_data[thread_id].event = ev; + } + +#if defined(SS_DEBUG) + if (dcb_fake_write_ev[dcb->fd] != 0) { + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [poll_waitevents] " + "Added fake events %d to ev %d.", + pthread_self(), + dcb_fake_write_ev[dcb->fd], + ev))); + ev |= dcb_fake_write_ev[dcb->fd]; + dcb_fake_write_ev[dcb->fd] = 0; + } +#endif + ss_debug(spinlock_acquire(&dcb->dcb_initlock);) + ss_dassert(dcb->state != DCB_STATE_ALLOC); + ss_dassert(dcb->state != DCB_STATE_DISCONNECTED); + ss_dassert(dcb->state != DCB_STATE_FREED); + ss_debug(spinlock_release(&dcb->dcb_initlock);) + + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [poll_waitevents] event %d dcb %p " + "role %s", + pthread_self(), + ev, + dcb, + STRDCBROLE(dcb->dcb_role)))); + + if (ev & EPOLLOUT) + { + int eno = 0; + eno = gw_getsockerrno(dcb->fd); + + if (eno == 0) { +#if MUTEX_BLOCK + simple_mutex_lock( + &dcb->dcb_write_lock, + true); + ss_info_dassert( + !dcb->dcb_write_active, + "Write already active"); + dcb->dcb_write_active = TRUE; + atomic_add( + &pollStats.n_write, + 1); + dcb->func.write_ready(dcb); + dcb->dcb_write_active = FALSE; + simple_mutex_unlock( + &dcb->dcb_write_lock); +#else + atomic_add(&pollStats.n_write, + 1); + dcb->func.write_ready(dcb); +#endif + } else { + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [poll_waitevents] " + "EPOLLOUT due %d, %s. " + "dcb %p, fd %i", + pthread_self(), + eno, + strerror(eno), + dcb, + dcb->fd))); + } + } + if (ev & EPOLLIN) + { +#if MUTEX_BLOCK + simple_mutex_lock(&dcb->dcb_read_lock, + true); + ss_info_dassert(!dcb->dcb_read_active, + "Read already active"); + dcb->dcb_read_active = TRUE; +#endif + + if (dcb->state == DCB_STATE_LISTENING) + { + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [poll_waitevents] " + "Accept in fd %d", + pthread_self(), + dcb->fd))); + atomic_add( + &pollStats.n_accept, 1); + dcb->func.accept(dcb); + } + else + { + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [poll_waitevents] " + "Read in dcb %p fd %d", + pthread_self(), + dcb, + dcb->fd))); + atomic_add(&pollStats.n_read, 1); + dcb->func.read(dcb); + } +#if MUTEX_BLOCK + dcb->dcb_read_active = FALSE; + simple_mutex_unlock( + &dcb->dcb_read_lock); +#endif + } + if (ev & EPOLLERR) + { + int eno = gw_getsockerrno(dcb->fd); +#if defined(SS_DEBUG) + if (eno == 0) { + eno = dcb_fake_write_errno[dcb->fd]; + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [poll_waitevents] " + "Added fake errno %d. " + "%s", + pthread_self(), + eno, + strerror(eno)))); + } + dcb_fake_write_errno[dcb->fd] = 0; +#endif + if (eno != 0) { + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [poll_waitevents] " + "EPOLLERR due %d, %s.", + pthread_self(), + eno, + strerror(eno)))); + } + atomic_add(&pollStats.n_error, 1); + dcb->func.error(dcb); + } + + if (ev & EPOLLHUP) + { + int eno = 0; + eno = gw_getsockerrno(dcb->fd); + + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [poll_waitevents] " + "EPOLLHUP on dcb %p, fd %d. " + "Errno %d, %s.", + pthread_self(), + dcb, + dcb->fd, + eno, + strerror(eno)))); + atomic_add(&pollStats.n_hup, 1); + spinlock_acquire(&dcb->dcb_initlock); + if ((dcb->flags & DCBF_HUNG) == 0) + { + dcb->flags |= DCBF_HUNG; + spinlock_release(&dcb->dcb_initlock); + dcb->func.hangup(dcb); + } + else + spinlock_release(&dcb->dcb_initlock); + } + +#ifdef EPOLLRDHUP + if (ev & EPOLLRDHUP) + { + int eno = 0; + eno = gw_getsockerrno(dcb->fd); + + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [poll_waitevents] " + "EPOLLRDHUP on dcb %p, fd %d. " + "Errno %d, %s.", + pthread_self(), + dcb, + dcb->fd, + eno, + strerror(eno)))); + atomic_add(&pollStats.n_hup, 1); + spinlock_acquire(&dcb->dcb_initlock); + if ((dcb->flags & DCBF_HUNG) == 0) + { + dcb->flags |= DCBF_HUNG; + spinlock_release(&dcb->dcb_initlock); + dcb->func.hangup(dcb); + } + else + spinlock_release(&dcb->dcb_initlock); + } +#endif + + spinlock_acquire(&pollqlock); + if (dcb->evq.pending_events == 0) + { + /* No pending events so remove from the queue */ + if (dcb->evq.prev != dcb) + { + dcb->evq.prev->evq.next = dcb->evq.next; + dcb->evq.next->evq.prev = dcb->evq.prev; + if (eventq == dcb) + eventq = dcb->evq.next; + } + else + { + eventq = NULL; + } + dcb->evq.next = NULL; + dcb->evq.prev = NULL; + pollStats.evq_length--; + } + else + { + /* + * We have a pending event, move to the end of the queue + * if there are any other DCB's in the queue. + * + * If we are the first item on the queue this is easy, we + * just bump the eventq pointer. + */ + if (dcb->evq.prev != dcb) + { + if (eventq == dcb) + eventq = dcb->evq.next; + else + { + dcb->evq.prev->evq.next = dcb->evq.next; + dcb->evq.next->evq.prev = dcb->evq.prev; + dcb->evq.prev = eventq->evq.prev; + dcb->evq.next = eventq; + eventq->evq.prev = dcb; + dcb->evq.prev->evq.next = dcb; + } + } + } + dcb->evq.processing = 0; + spinlock_release(&pollqlock); + + return 1; +} + /** * Shutdown the polling loop */ @@ -721,6 +885,20 @@ poll_bitmask() return &poll_mask; } +/** + * Display an entry from the spinlock statistics data + * + * @param dcb The DCB to print to + * @param desc Description of the statistic + * @param value The statistic value + */ +static void +spin_reporter(void *dcb, char *desc, int value) +{ + dcb_printf((DCB *)dcb, "\t%-40s %d\n", desc, value); +} + + /** * Debug routine to print the polling statistics * @@ -745,6 +923,10 @@ int i; pollStats.n_accept); dcb_printf(dcb, "Number of times no threads polling: %d\n", pollStats.n_nothreads); + dcb_printf(dcb, "Current event queue length: %d\n", + pollStats.evq_length); + dcb_printf(dcb, "Maximum event queue length: %d\n", + pollStats.evq_max); dcb_printf(dcb, "No of poll completions with descriptors\n"); dcb_printf(dcb, "\tNo. of descriptors\tNo. of poll completions.\n"); @@ -754,6 +936,11 @@ int i; } dcb_printf(dcb, "\t>= %d\t\t\t%d\n", MAXNFDS, pollStats.n_fds[MAXNFDS-1]); + +#if SPINLOCK_PROFILE + dcb_printf(dcb, "Event queue lock statistics:\n"); + spinlock_stats(&pollqlock, spin_reporter, dcb); +#endif } /** diff --git a/server/include/dcb.h b/server/include/dcb.h index 06687f349..72686b7b7 100644 --- a/server/include/dcb.h +++ b/server/include/dcb.h @@ -53,7 +53,8 @@ struct service; * 07/02/2014 Massimiliano Pinto Added ipv4 data struct into for dcb * 07/05/2014 Mark Riddoch Addition of callback mechanism * 08/05/2014 Mark Riddoch Addition of writeq high and low watermarks - * 27/08/2014 Mark Ridddoch Addition of write event queuing + * 27/08/2014 Mark Riddoch Addition of write event queuing + * 23/09/2014 Mark Riddoch New poll processing queue * * @endverbatim */ @@ -97,6 +98,14 @@ typedef struct gw_protocol { int (*session)(struct dcb *, void *); } GWPROTOCOL; +typedef struct { + struct dcb *next; + struct dcb *prev; + uint32_t pending_events; + int processing; + SPINLOCK eventqlock; +} DCBEVENTQ; + /** * The GWPROTOCOL version data. The following should be updated whenever * the GWPROTOCOL structure is changed. See the rules defined in modinfo.h @@ -114,10 +123,6 @@ typedef struct dcbstats { int n_buffered; /*< Number of buffered writes */ int n_high_water; /*< Number of crosses of high water mark */ int n_low_water; /*< Number of crosses of low water mark */ - int n_busypolls; /*< Number of read polls whiel reading */ - int n_readrechecks; /*< Number of rechecks for reads */ - int n_busywrpolls; /*< Number of write polls while writing */ - int n_writerechecks;/*< Number of rechecks for writes */ } DCBSTATS; /** @@ -204,12 +209,7 @@ typedef struct dcb { #endif dcb_role_t dcb_role; SPINLOCK dcb_initlock; -#if 1 - simple_mutex_t dcb_read_lock; - simple_mutex_t dcb_write_lock; - bool dcb_read_active; - bool dcb_write_active; -#endif + DCBEVENTQ evq; /**< The event queue for this DCB */ int fd; /**< The descriptor */ dcb_state_t state; /**< Current descriptor state */ int flags; /**< DCB flags */ @@ -271,8 +271,8 @@ int fail_accept_errno; #define DCB_BELOW_LOW_WATER(x) ((x)->low_water && (x)->writeqlen < (x)->low_water) #define DCB_ABOVE_HIGH_WATER(x) ((x)->high_water && (x)->writeqlen > (x)->high_water) -void dcb_pollin(DCB *, int, int); -void dcb_pollout(DCB *, int, int); +#define DCB_POLL_BUSY(x) ((x)->evq.next != NULL) + DCB *dcb_get_zombies(void); int gw_write( #if defined(SS_DEBUG) @@ -289,7 +289,7 @@ DCB *dcb_clone(DCB *); int dcb_read(DCB *, GWBUF **); int dcb_drain_writeq(DCB *); void dcb_close(DCB *); -DCB *dcb_process_zombies(int, DCB*); /* Process Zombies except the one behind the pointer */ +DCB *dcb_process_zombies(int); /* Process Zombies except the one behind the pointer */ void printAllDCBs(); /* Debug to print all DCB in the system */ void printDCB(DCB *); /* Debug print routine */ void dprintAllDCBs(DCB *); /* Debug to print all DCB in the system */ diff --git a/server/include/gw.h b/server/include/gw.h index 822884f9b..14adfb1a0 100644 --- a/server/include/gw.h +++ b/server/include/gw.h @@ -28,10 +28,10 @@ * Configuration for send and receive socket buffer sizes for * backend and cleint connections. */ -#define GW_BACKEND_SO_SNDBUF 32768 -#define GW_BACKEND_SO_RCVBUF 32768 -#define GW_CLIENT_SO_SNDBUF 32768 -#define GW_CLIENT_SO_RCVBUF 32768 +#define GW_BACKEND_SO_SNDBUF (128 * 1024) +#define GW_BACKEND_SO_RCVBUF (128 * 1024) +#define GW_CLIENT_SO_SNDBUF (128 * 1024) +#define GW_CLIENT_SO_RCVBUF (128 * 1024) #define GW_NOINTR_CALL(A) do { errno = 0; A; } while (errno == EINTR) #define GW_MYSQL_LOOP_TIMEOUT 300000000 diff --git a/server/include/housekeeper.h b/server/include/housekeeper.h index 597f19a91..35e76e80d 100644 --- a/server/include/housekeeper.h +++ b/server/include/housekeeper.h @@ -47,4 +47,5 @@ typedef struct hktask { extern void hkinit(); extern int hktask_add(char *name, void (*task)(void *), void *data, int frequency); extern int hktask_remove(char *name); +extern void hkshutdown(); #endif diff --git a/server/include/rdtsc.h b/server/include/rdtsc.h new file mode 100644 index 000000000..50a752f31 --- /dev/null +++ b/server/include/rdtsc.h @@ -0,0 +1,61 @@ +#ifndef _RDTSC_H +#define _RDTSC_H +/* + * This file is distributed as part of the SkySQL Gateway. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Copyright SkySQL Ab 2014 + */ + +/** + * @file rdtsc.h Access the process time-stamp counter + * + * This is an Intel only facilty that is used to access an accurate time + * value, the granularity of which is related to the processor clock speed + * and the overhead for access is much lower than using any system call + * mechanism. + * + * + * @verbatim + * Revision History + * + * Date Who Description + * 19/09/2014 Mark Riddoch Initial implementation + * + * @endverbatim + */ + +typedef unsigned long long CYCLES; + +/** + * Get the current time-stamp counter value from the processor. This is the + * count of CPU cyceles as a 64 bit value. + * + * The value returned is related to the clock speed, to obtian a value in + * seconds divide the returned value by the clock frequency for the processor. + * + * Note, on multi-processsor systems care much be taken to avoid the thread + * moving to a different processor when taken succsive value of RDTSC to + * obtian accurate timing. This may be done by setting pocessor affinity for + * the thread. See sched_setaffinity/sched_getaffinity. + * + * @return CPU cycle count + */ +static __inline__ CYCLES rdtsc(void) +{ + unsigned long long int x; + __asm__ volatile (".byte 0x0f, 0x31" : "=A" (x)); + return x; +} +#endif diff --git a/server/modules/include/blr.h b/server/modules/include/blr.h index f493ec715..6e151d923 100644 --- a/server/modules/include/blr.h +++ b/server/modules/include/blr.h @@ -43,8 +43,8 @@ * High and Low water marks for the slave dcb. These values can be overriden * by the router options highwater and lowwater. */ -#define DEF_LOW_WATER 20000 -#define DEF_HIGH_WATER 300000 +#define DEF_LOW_WATER 2000 +#define DEF_HIGH_WATER 30000 /** * Some useful macros for examining the MySQL Response packets @@ -194,6 +194,7 @@ typedef struct router_instance { char *uuid; /*< UUID for the router to use w/master */ int masterid; /*< Server ID of the master */ int serverid; /*< Server ID to use with master */ + int initbinlog; /*< Initial binlog file number */ char *user; /*< User name to use with master */ char *password; /*< Password to use with master */ char *fileroot; /*< Root of binlog filename */ diff --git a/server/modules/include/readwritesplit.h b/server/modules/include/readwritesplit.h index 5b17dbc7d..482ecea27 100644 --- a/server/modules/include/readwritesplit.h +++ b/server/modules/include/readwritesplit.h @@ -224,6 +224,7 @@ typedef struct backend_ref_st { bref_state_t bref_state; int bref_num_result_wait; sescmd_cursor_t bref_sescmd_cur; + GWBUF* bref_pending_cmd; /*< For stmt which can't be routed due active sescmd execution */ #if defined(SS_DEBUG) skygw_chk_t bref_chk_tail; #endif diff --git a/server/modules/protocol/mysql_client.c b/server/modules/protocol/mysql_client.c index bace1eef5..42958fc2d 100644 --- a/server/modules/protocol/mysql_client.c +++ b/server/modules/protocol/mysql_client.c @@ -1180,9 +1180,9 @@ int gw_MySQLAccept(DCB *listener) conn_open[c_sock] = true; #endif /* set nonblocking */ - sendbuf = GW_BACKEND_SO_SNDBUF; + sendbuf = GW_CLIENT_SO_SNDBUF; setsockopt(c_sock, SOL_SOCKET, SO_SNDBUF, &sendbuf, optlen); - sendbuf = GW_BACKEND_SO_RCVBUF; + sendbuf = GW_CLIENT_SO_RCVBUF; setsockopt(c_sock, SOL_SOCKET, SO_RCVBUF, &sendbuf, optlen); setnonblocking(c_sock); diff --git a/server/modules/protocol/mysql_common.c b/server/modules/protocol/mysql_common.c index 8473e365b..0475c5b2d 100644 --- a/server/modules/protocol/mysql_common.c +++ b/server/modules/protocol/mysql_common.c @@ -766,9 +766,9 @@ int gw_do_connect_to_backend( /* prepare for connect */ setipaddress(&serv_addr.sin_addr, host); serv_addr.sin_port = htons(port); - bufsize = GW_CLIENT_SO_SNDBUF; + bufsize = GW_BACKEND_SO_SNDBUF; setsockopt(so, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize)); - bufsize = GW_CLIENT_SO_RCVBUF; + bufsize = GW_BACKEND_SO_RCVBUF; setsockopt(so, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)); /* set socket to as non-blocking here */ setnonblocking(so); @@ -1660,6 +1660,8 @@ void protocol_archive_srv_command( server_command_t* h1; int len = 0; + CHK_PROTOCOL(p); + spinlock_acquire(&p->protocol_lock); if (p->protocol_state != MYSQL_PROTOCOL_ACTIVE) @@ -1710,6 +1712,7 @@ void protocol_archive_srv_command( retblock: spinlock_release(&p->protocol_lock); + CHK_PROTOCOL(p); } diff --git a/server/modules/routing/CMakeLists.txt b/server/modules/routing/CMakeLists.txt index 595a216c8..9d2f25f76 100644 --- a/server/modules/routing/CMakeLists.txt +++ b/server/modules/routing/CMakeLists.txt @@ -15,4 +15,4 @@ target_link_libraries(cli log_manager utils) install(TARGETS cli DESTINATION modules) add_subdirectory(readwritesplit) -add_subdirectory(binlog) \ No newline at end of file + diff --git a/server/modules/routing/Makefile b/server/modules/routing/Makefile index 637d5f41c..d27430112 100644 --- a/server/modules/routing/Makefile +++ b/server/modules/routing/Makefile @@ -66,7 +66,7 @@ libcli.so: $(CLIOBJ) $(CC) $(LDFLAGS) $(CLIOBJ) $(LIBS) -o $@ libreadwritesplit.so: -# (cd readwritesplit; touch depend.mk ; make; cp $@ ..) + (cd readwritesplit; touch depend.mk ; make; cp $@ ..) .c.o: $(CC) $(CFLAGS) $< -o $@ diff --git a/server/modules/routing/binlog/blr.c b/server/modules/routing/binlog/blr.c index dec20f8b4..d92f03c10 100644 --- a/server/modules/routing/binlog/blr.c +++ b/server/modules/routing/binlog/blr.c @@ -172,6 +172,7 @@ int i; inst->low_water = DEF_LOW_WATER; inst->high_water = DEF_HIGH_WATER; + inst->initbinlog = 0; /* * We only support one server behind this router, since the server is @@ -244,6 +245,10 @@ int i; { inst->fileroot = strdup(value); } + else if (strcmp(options[i], "initialfile") == 0) + { + inst->initbinlog = atoi(value); + } else if (strcmp(options[i], "lowwater") == 0) { inst->low_water = atoi(value); @@ -450,11 +455,14 @@ ROUTER_SLAVE *slave = (ROUTER_SLAVE *)router_session; * TODO: Handle closure of master session */ LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, "Binlog router close session with master"))); + LOGFILE_ERROR, + "Binlog router close session with master server %s", + router->service->databases->unique_name))); blr_master_reconnect(router); return; } CHK_CLIENT_RSES(slave); + /** * Lock router client session for secure read and update. */ diff --git a/server/modules/routing/binlog/blr_file.c b/server/modules/routing/binlog/blr_file.c index 4f7232e64..7a44033fe 100644 --- a/server/modules/routing/binlog/blr_file.c +++ b/server/modules/routing/binlog/blr_file.c @@ -115,7 +115,11 @@ struct dirent *dp; if (n == 0) // No binlog files found { - sprintf(filename, BINLOG_NAMEFMT, router->fileroot, 1); + if (router->initbinlog) + sprintf(filename, BINLOG_NAMEFMT, router->fileroot, + router->initbinlog); + else + sprintf(filename, BINLOG_NAMEFMT, router->fileroot, 1); blr_file_create(router, filename); } else diff --git a/server/modules/routing/binlog/blr_master.c b/server/modules/routing/binlog/blr_master.c index 412276e48..ee14cec4c 100644 --- a/server/modules/routing/binlog/blr_master.c +++ b/server/modules/routing/binlog/blr_master.c @@ -55,21 +55,57 @@ #include #include +#include + /* Temporary requirement for auth data */ #include +#define SAMPLE_COUNT 10000 +CYCLES samples[10][SAMPLE_COUNT]; +int sample_index[10] = { 0, 0, 0 }; + +#define LOGD_SLAVE_CATCHUP1 0 +#define LOGD_SLAVE_CATCHUP2 1 +#define LOGD_DISTRIBUTE 2 +#define LOGD_FILE_FLUSH 3 + +SPINLOCK logspin = SPINLOCK_INIT; + +void +log_duration(int sample, CYCLES duration) +{ +char fname[100]; +int i; +FILE *fp; + + spinlock_acquire(&logspin); + samples[sample][sample_index[sample]++] = duration; + if (sample_index[sample] == SAMPLE_COUNT) + { + sprintf(fname, "binlog_profile.%d", sample); + if ((fp = fopen(fname, "a")) != NULL) + { + for (i = 0; i < SAMPLE_COUNT; i++) + fprintf(fp, "%ld\n", samples[sample][i]); + fclose(fp); + } + sample_index[sample] = 0; + } + spinlock_release(&logspin); +} + extern int lm_enabled_logfiles_bitmask; static GWBUF *blr_make_query(char *statement); static GWBUF *blr_make_registration(ROUTER_INSTANCE *router); static GWBUF *blr_make_binlog_dump(ROUTER_INSTANCE *router); -static void encode_value(unsigned char *data, unsigned int value, int len); -static void blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt); +void encode_value(unsigned char *data, unsigned int value, int len); +void blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt); static void blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *pkt, REP_HEADER *hdr); -static void blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr); +void blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr); static void *CreateMySQLAuthData(char *username, char *password, char *database); -static void blr_extract_header(uint8_t *pkt, REP_HEADER *hdr); -static uint32_t extract_field(uint8_t *src, int bits); +void blr_extract_header(uint8_t *pkt, REP_HEADER *hdr); +inline uint32_t extract_field(uint8_t *src, int bits); static void blr_log_packet(logfile_id_t file, char *msg, uint8_t *ptr, int len); static int keepalive = 1; @@ -105,7 +141,8 @@ GWBUF *buf; if ((router->master = dcb_connect(router->service->databases, router->session, BLR_PROTOCOL)) == NULL) { LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, - "Binlog router: failed to connect to master\n"))); + "Binlog router: failed to connect to master server '%s'\n", + router->service->databases->unique_name))); return; } @@ -459,7 +496,7 @@ int len = 0x1b; * @param value The value to pack * @param len Number of bits to encode value into */ -static void +void encode_value(unsigned char *data, unsigned int value, int len) { while (len > 0) @@ -477,7 +514,7 @@ encode_value(unsigned char *data, unsigned int value, int len) * @param router The router instance * @param pkt The binlog records */ -static void +void blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) { uint8_t *msg = NULL, *ptr, *pdata; @@ -765,7 +802,9 @@ static REP_HEADER phdr; { ss_dassert(pkt_length == 0); } +{ CYCLES start = rdtsc(); blr_file_flush(router); +log_duration(LOGD_FILE_FLUSH, rdtsc() - start); } } /** @@ -774,7 +813,7 @@ static REP_HEADER phdr; * @param pkt The incoming packet in a GWBUF chain * @param hdr The packet header to populate */ -static void +void blr_extract_header(uint8_t *ptr, REP_HEADER *hdr) { @@ -795,10 +834,10 @@ blr_extract_header(uint8_t *ptr, REP_HEADER *hdr) * @param src The raw packet source * @param birs The number of bits to extract (multiple of 8) */ -static uint32_t -extract_field(uint8_t *src, int bits) +inline uint32_t +extract_field(register uint8_t *src, int bits) { -uint32_t rval = 0, shift = 0; +register uint32_t rval = 0, shift = 0; while (bits > 0) { @@ -828,7 +867,7 @@ char file[BINLOG_FNAMELEN+1]; pos = extract_field(ptr+4, 32); pos <<= 32; pos |= extract_field(ptr, 32); - slen = len - 8; + slen = len - (8 + 4); // Allow for position and CRC if (slen > BINLOG_FNAMELEN) slen = BINLOG_FNAMELEN; memcpy(file, ptr + 8, slen); @@ -883,14 +922,16 @@ MYSQL_session *auth_info; * @param hdr The replication event header * @param ptr The raw replication event data */ -static void +void blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr) { GWBUF *pkt; uint8_t *buf; ROUTER_SLAVE *slave; int action; +CYCLES entry; + entry = rdtsc(); spinlock_acquire(&router->lock); slave = router->slaves; while (slave) @@ -944,12 +985,16 @@ int action; spinlock_acquire(&slave->catch_lock); if (slave->overrun) { +CYCLES cycle_start, cycles; slave->stats.n_overrun++; slave->overrun = 0; spinlock_release(&router->lock); slave->cstate &= ~(CS_UPTODATE|CS_DIST); spinlock_release(&slave->catch_lock); +cycle_start = rdtsc(); blr_slave_catchup(router, slave); +cycles = rdtsc() - cycle_start; +log_duration(LOGD_SLAVE_CATCHUP2, cycles); spinlock_acquire(&router->lock); slave = router->slaves; if (slave) @@ -982,6 +1027,7 @@ int action; */ if (slave->cstate & CS_UPTODATE) { +CYCLES cycle_start, cycles; spinlock_release(&router->lock); LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG, "Force slave %d into catchup mode %s@%d\n", @@ -990,7 +1036,10 @@ int action; spinlock_acquire(&slave->catch_lock); slave->cstate &= ~(CS_UPTODATE|CS_DIST); spinlock_release(&slave->catch_lock); +cycle_start = rdtsc(); blr_slave_catchup(router, slave); +cycles = rdtsc() - cycle_start; +log_duration(LOGD_SLAVE_CATCHUP1, cycles); spinlock_acquire(&router->lock); slave = router->slaves; if (slave) @@ -1004,6 +1053,7 @@ int action; slave = slave->next; } spinlock_release(&router->lock); + log_duration(LOGD_DISTRIBUTE, rdtsc() - entry); } static void diff --git a/server/modules/routing/binlog/blr_slave.c b/server/modules/routing/binlog/blr_slave.c index 176efbe4c..5d7a16475 100644 --- a/server/modules/routing/binlog/blr_slave.c +++ b/server/modules/routing/binlog/blr_slave.c @@ -95,7 +95,7 @@ blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue) return 0; } - atomic_add(&slave->stats.n_requests, 1); + slave->stats.n_requests++; switch (MYSQL_COMMAND(queue)) { case COM_QUERY: @@ -796,7 +796,7 @@ doitagain: slave->binlogfile))); return 0; } - atomic_add(&slave->stats.n_bursts, 1); + slave->stats.n_bursts++; spinlock_acquire(&slave->catch_lock); slave->cstate |= CS_INNERLOOP; spinlock_release(&slave->catch_lock); @@ -830,7 +830,7 @@ if (hdr.event_size > DEF_HIGH_WATER) slave->stats.n_above++; slave->binlog_pos = hdr.next_pos; } rval = written; - atomic_add(&slave->stats.n_events, 1); + slave->stats.n_events++; burst++; } if (record == NULL) @@ -843,7 +843,7 @@ if (hdr.event_size > DEF_HIGH_WATER) slave->stats.n_above++; } while (record && DCB_BELOW_LOW_WATER(slave->dcb)); if (record) { - atomic_add(&slave->stats.n_flows, 1); + slave->stats.n_flows++; spinlock_acquire(&slave->catch_lock); slave->cstate |= CS_EXPECTCB; spinlock_release(&slave->catch_lock); @@ -854,7 +854,7 @@ if (hdr.event_size > DEF_HIGH_WATER) slave->stats.n_above++; spinlock_acquire(&slave->catch_lock); if ((slave->cstate & CS_UPTODATE) == 0) { - atomic_add(&slave->stats.n_upd, 1); + slave->stats.n_upd++; slave->cstate |= CS_UPTODATE; state_change = 1; } @@ -907,7 +907,7 @@ ROUTER_INSTANCE *router = slave->router; if (slave->state == BLRS_DUMPING && slave->binlog_pos != router->binlog_position) { - atomic_add(&slave->stats.n_dcb, 1); + slave->stats.n_dcb++; blr_slave_catchup(router, slave); } } @@ -916,12 +916,12 @@ ROUTER_INSTANCE *router = slave->router; { if (slave->state == BLRS_DUMPING) { - atomic_add(&slave->stats.n_cb, 1); + slave->stats.n_cb++; blr_slave_catchup(router, slave); } else { - atomic_add(&slave->stats.n_cbna, 1); + slave->stats.n_cbna++; } } return 0; diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index 451b29742..c602d8913 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -1957,6 +1957,26 @@ static int routeQuery( if (succp) /*< Have DCB of the target backend */ { + backend_ref_t* bref; + sescmd_cursor_t* scur; + + bref = get_bref_from_dcb(router_cli_ses, target_dcb); + scur = &bref->bref_sescmd_cur; + /** + * Store current stmt if execution of previous session command + * haven't completed yet. Note that according to MySQL protocol + * there can only be one such non-sescmd stmt at the time. + */ + if (sescmd_cursor_is_active(scur)) + { + ss_dassert(bref->bref_pending_cmd == NULL); + bref->bref_pending_cmd = gwbuf_clone(querybuf); + + rses_end_locked_router_action(router_cli_ses); + ret = 1; + goto retblock; + } + if ((ret = target_dcb->func.write(target_dcb, gwbuf_clone(querybuf))) == 1) { backend_ref_t* bref; @@ -2295,7 +2315,34 @@ static void clientReply ( ss_dassert(succp); } - /** Unlock router session */ + else if (bref->bref_pending_cmd != NULL) /*< non-sescmd is waiting to be routed */ + { + int ret; + + CHK_GWBUF(bref->bref_pending_cmd); + + if ((ret = bref->bref_dcb->func.write(bref->bref_dcb, + gwbuf_clone(bref->bref_pending_cmd))) == 1) + { + ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance; + atomic_add(&inst->stats.n_queries, 1); + /** + * Add one query response waiter to backend reference + */ + bref_set_state(bref, BREF_QUERY_ACTIVE); + bref_set_state(bref, BREF_WAITING_RESULT); + } + else + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Routing query \"%s\" failed.", + bref->bref_pending_cmd))); + } + gwbuf_free(bref->bref_pending_cmd); + bref->bref_pending_cmd = NULL; + } + /** Unlock router session */ rses_end_locked_router_action(router_cli_ses); lock_failed: @@ -3656,6 +3703,14 @@ static bool route_session_write( { succp = false; } + else if (LOG_IS_ENABLED(LOGFILE_TRACE)) + { + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Wrote to %s:%d", + backend_ref[i].bref_backend->backend_server->name, + backend_ref[i].bref_backend->backend_server->port))); + } } } rses_end_locked_router_action(router_cli_ses);