Merge branch 'develop' into MAX-268

This commit is contained in:
MassimilianoPinto 2014-09-29 09:51:57 +02:00
commit 1d14129edf
24 changed files with 747 additions and 442 deletions

Binary file not shown.

View File

@ -57,7 +57,7 @@ depend:
install:
(cd server; make DEST=$(DEST) install)
(cd log_manager; make DEST=$(DEST) install)
(cd query_classifier; make DEST=$(DEST) install)
(cd query_classifier;touch depend; make DEST=$(DEST) install)
(cd client; make DEST=$(DEST) install)
cleantests:

View File

@ -0,0 +1,53 @@
#!/bin/sh
failure=0
passed=0
clients=20
cmdcnt=1000
echo Running $clients parallel iterations of $cmdcnt commands
for ((cnt=0; cnt<$clients; cnt++ )); do
for ((i=0; i<$cmdcnt; i++ )); do
maxadmin -pskysql show services;
done >/dev/null &
done >& /dev/null
peak=0
while [ "`jobs -p`" != "" ]; do
jobs >& /dev/null
zombies=`maxadmin -pskysql list dcbs | grep -ci zombies`
if [ $zombies -gt $peak ] ; then
peak=$zombies
fi
sleep 1
done
if [ $peak -gt 10 ]; then
echo "High peak zombie count ($peak): Failed"
failure=`expr $failure + 1`
else
passed=`expr $passed + 1`
echo "Zombie collection ($peak): Passed"
fi
zombies=`maxadmin -pskysql list dcbs | grep -ci zombies`
if [ $zombies != "0" ]; then
echo "Residual zombie DCBs: Failed"
failure=`expr $failure + 1`
else
passed=`expr $passed + 1`
echo "Residual zombie DCBs: Passed"
fi
sessions=`maxadmin -pskysql list services | awk -F\| '/ cli/ { print $3 }'`
if [ $sessions -gt 3 ]; then
echo "Session shutdown, $sessions: Failed"
failure=`expr $failure + 1`
else
passed=`expr $passed + 1`
echo "Session shutdown: Passed"
fi
sessions=`maxadmin -pskysql list services | awk -F\| '/ cli/ { print $4 }'`
echo "Test run complete. $passed passes, $failure failures"
echo "$sessions CLI sessions executed"
exit $failure

View File

@ -212,9 +212,14 @@ endmacro()
function(subdirs VAR DIRPATH)
if(${CMAKE_VERSION} VERSION_LESS 2.12 )
set(COMP_VAR PATH)
else()
set(COMP_VAR DIRECTORY)
endif()
file(GLOB_RECURSE SDIR ${DIRPATH}/*)
foreach(LOOP ${SDIR})
get_filename_component(LOOP ${LOOP} DIRECTORY)
get_filename_component(LOOP ${LOOP} ${COMP_VAR})
list(APPEND ALLDIRS ${LOOP})
endforeach()
list(REMOVE_DUPLICATES ALLDIRS)

View File

@ -123,12 +123,6 @@ DCB *rval;
rval->dcb_errhandle_called = false;
#endif
rval->dcb_role = role;
#if 1
simple_mutex_init(&rval->dcb_write_lock, "DCB write mutex");
simple_mutex_init(&rval->dcb_read_lock, "DCB read mutex");
rval->dcb_write_active = false;
rval->dcb_read_active = false;
#endif
spinlock_init(&rval->dcb_initlock);
spinlock_init(&rval->writeqlock);
spinlock_init(&rval->delayqlock);
@ -141,6 +135,13 @@ DCB *rval;
rval->polloutbusy = 0;
rval->writecheck = 0;
rval->fd = -1;
rval->evq.next = NULL;
rval->evq.prev = NULL;
rval->evq.pending_events = 0;
rval->evq.processing = 0;
spinlock_init(&rval->evq.eventqlock);
memset(&rval->stats, 0, sizeof(DCBSTATS)); // Zero the statistics
rval->state = DCB_STATE_ALLOC;
bitmask_init(&rval->memdata.bitmask);
@ -218,43 +219,11 @@ dcb_add_to_zombieslist(DCB *dcb)
spinlock_release(&zombiespin);
return;
}
#if 1
/*<
* Add closing dcb to the top of the list.
*/
dcb->memdata.next = zombies;
zombies = dcb;
#else
if (zombies == NULL) {
zombies = dcb;
} else {
DCB *ptr = zombies;
while (ptr->memdata.next)
{
ss_info_dassert(
ptr->memdata.next->state == DCB_STATE_ZOMBIE,
"Next zombie is not in DCB_STATE_ZOMBIE state");
ss_info_dassert(
ptr != dcb,
"Attempt to add DCB to zombies list although it "
"is already there.");
if (ptr == dcb)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Attempt to add DCB to zombies "
"list when it is already in the list")));
break;
}
ptr = ptr->memdata.next;
}
if (ptr != dcb) {
ptr->memdata.next = dcb;
}
}
#endif
/*<
* Set state which indicates that it has been added to zombies
* list.
@ -389,8 +358,6 @@ DCB_CALLBACK *cb;
spinlock_release(&dcb->cb_lock);
bitmask_free(&dcb->memdata.bitmask);
simple_mutex_done(&dcb->dcb_read_lock);
simple_mutex_done(&dcb->dcb_write_lock);
free(dcb);
}
@ -404,15 +371,10 @@ DCB_CALLBACK *cb;
* the memdata.bitmask then the DCB is no longer able to be
* referenced and it can be finally removed.
*
* The excluded DCB allows a thread to exclude a DCB from zombie processing.
* It is used when a thread calls dcb_process_zombies when there is
* a DCB that the caller knows it will continue processing with.
*
* @param threadid The thread ID of the caller
* @param excluded The DCB the thread currently uses, NULL or valid DCB.
*/
DCB *
dcb_process_zombies(int threadid, DCB *excluded)
dcb_process_zombies(int threadid)
{
DCB *ptr, *lptr;
DCB* dcb_list = NULL;
@ -446,9 +408,10 @@ bool succp = false;
CHK_DCB(ptr);
/*
* Skip processing of the excluded DCB
* Skip processing of DCB's that are
* in the event queue waiting to be processed.
*/
if (ptr == excluded)
if (ptr->evq.next || ptr->evq.prev)
{
lptr = ptr;
ptr = ptr->memdata.next;
@ -522,7 +485,6 @@ bool succp = false;
/*<
* Close file descriptor and move to clean-up phase.
*/
ss_dassert(excluded != dcb);
rc = close(dcb->fd);
if (rc < 0) {
@ -1255,7 +1217,7 @@ printDCB(DCB *dcb)
static void
spin_reporter(void *dcb, char *desc, int value)
{
dcb_printf((DCB *)dcb, "\t\t%-35s %d\n", desc, value);
dcb_printf((DCB *)dcb, "\t\t%-40s %d\n", desc, value);
}
@ -1316,10 +1278,6 @@ DCB *dcb;
dcb_printf(pdcb, "\t\tNo. of Writes: %d\n", dcb->stats.n_writes);
dcb_printf(pdcb, "\t\tNo. of Buffered Writes: %d\n", dcb->stats.n_buffered);
dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts);
dcb_printf(pdcb, "\t\tNo. of busy polls: %d\n", dcb->stats.n_busypolls);
dcb_printf(pdcb, "\t\tNo. of read rechecks: %d\n", dcb->stats.n_readrechecks);
dcb_printf(pdcb, "\t\tNo. of busy write polls: %d\n", dcb->stats.n_busywrpolls);
dcb_printf(pdcb, "\t\tNo. of write rechecks: %d\n", dcb->stats.n_writerechecks);
dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water);
dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water);
if (dcb->flags & DCBF_CLONE)
@ -1427,10 +1385,6 @@ dprintDCB(DCB *pdcb, DCB *dcb)
dcb->stats.n_buffered);
dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n",
dcb->stats.n_accepts);
dcb_printf(pdcb, "\t\tNo. of busy polls: %d\n", dcb->stats.n_busypolls);
dcb_printf(pdcb, "\t\tNo. of read rechecks: %d\n", dcb->stats.n_readrechecks);
dcb_printf(pdcb, "\t\tNo. of busy write polls: %d\n", dcb->stats.n_busywrpolls);
dcb_printf(pdcb, "\t\tNo. of write rechecks: %d\n", dcb->stats.n_writerechecks);
dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n",
dcb->stats.n_high_water);
dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n",
@ -1955,99 +1909,6 @@ int rval = 0;
return rval;
}
/**
* Called by the EPOLLIN event. Take care of calling the protocol
* read entry point and managing multiple threads competing for the DCB
* without blocking those threads.
*
* This mechanism does away with the need for a mutex on the EPOLLIN event
* and instead implements a queuing mechanism in which nested events are
* queued on the DCB such that when the thread processing the first event
* returns it will read the queued event and process it. This allows the
* thread that would otherwise have to wait to process the nested event
* to return immediately and and process other events.
*
* @param dcb The DCB that has data available
* @param thread_id The ID of the calling thread
* @param nozombies If non-zero then do not do zombie processing
*/
void
dcb_pollin(DCB *dcb, int thread_id, int nozombies)
{
spinlock_acquire(&dcb->pollinlock);
if (dcb->pollinbusy == 0)
{
dcb->pollinbusy = 1;
do {
if (dcb->readcheck)
{
dcb->stats.n_readrechecks++;
if (!nozombies)
dcb_process_zombies(thread_id, dcb);
}
dcb->readcheck = 0;
spinlock_release(&dcb->pollinlock);
dcb->func.read(dcb);
spinlock_acquire(&dcb->pollinlock);
} while (dcb->readcheck);
dcb->pollinbusy = 0;
}
else
{
dcb->stats.n_busypolls++;
dcb->readcheck = 1;
}
spinlock_release(&dcb->pollinlock);
}
/**
* Called by the EPOLLOUT event. Take care of calling the protocol
* write_ready entry point and managing multiple threads competing for the DCB
* without blocking those threads.
*
* This mechanism does away with the need for a mutex on the EPOLLOUT event
* and instead implements a queuing mechanism in which nested events are
* queued on the DCB such that when the thread processing the first event
* returns it will read the queued event and process it. This allows the
* thread that would otherwise have to wait to process the nested event
* to return immediately and and process other events.
*
* @param dcb The DCB thats available for writes
* @param thread_id The ID of the calling thread
* @param nozombies If non-zero then do not do zombie processing
*/
void
dcb_pollout(DCB *dcb, int thread_id, int nozombies)
{
spinlock_acquire(&dcb->polloutlock);
if (dcb->polloutbusy == 0)
{
dcb->polloutbusy = 1;
do {
if (dcb->writecheck)
{
if (!nozombies)
dcb_process_zombies(thread_id, dcb);
dcb->stats.n_writerechecks++;
}
dcb->writecheck = 0;
spinlock_release(&dcb->polloutlock);
dcb->func.write_ready(dcb);
spinlock_acquire(&dcb->polloutlock);
} while (dcb->writecheck);
dcb->polloutbusy = 0;
}
else
{
dcb->stats.n_busywrpolls++;
dcb->writecheck = 1;
}
spinlock_release(&dcb->polloutlock);
}
/**
* Get the next DCB in the list of all DCB's

View File

@ -1646,6 +1646,7 @@ void
shutdown_server()
{
poll_shutdown();
hkshutdown();
log_flush_shutdown();
}

View File

@ -42,6 +42,8 @@ static HKTASK *tasks = NULL;
*/
static SPINLOCK tasklock = SPINLOCK_INIT;
static int do_shutdown = 0;
static void hkthread(void *);
/**
@ -172,6 +174,8 @@ void *taskdata;
for (;;)
{
if (do_shutdown)
return;
thread_millisleep(1000);
now = time(0);
spinlock_acquire(&tasklock);
@ -194,3 +198,13 @@ void *taskdata;
spinlock_release(&tasklock);
}
}
/**
* Called to shutdown the housekeeper
*
*/
void
hkshutdown()
{
do_shutdown = 1;
}

View File

@ -30,6 +30,13 @@
#include <gw.h>
#include <config.h>
#include <housekeeper.h>
#include <mysql.h>
#define PROFILE_POLL 1
#if PROFILE_POLL
#include <rdtsc.h>
#endif
extern int lm_enabled_logfiles_bitmask;
@ -47,6 +54,11 @@ extern int lm_enabled_logfiles_bitmask;
* etc.
* 23/09/14 Mark Riddoch Make use of RDHUP conditional to allow CentOS 5
* builds.
* 24/09/14 Mark Riddoch Introduction of the event queue for processing the
* incoming events rather than processing them immediately
* in the loop after the epoll_wait. This allows for better
* thread utilisaiton and fairer scheduling of the event
* processing.
*
* @endverbatim
*/
@ -65,6 +77,11 @@ static GWBITMASK poll_mask;
static simple_mutex_t epoll_wait_mutex; /*< serializes calls to epoll_wait */
#endif
static int n_waiting = 0; /*< No. of threads in epoll_wait */
static int process_pollq(int thread_id);
DCB *eventq = NULL;
SPINLOCK pollqlock = SPINLOCK_INIT;
/**
* Thread load average, this is the average number of descriptors in each
@ -125,6 +142,8 @@ static struct {
int n_nothreads; /*< Number of times no threads are polling */
int n_fds[MAXNFDS]; /*< Number of wakeups with particular
n_fds value */
int evq_length; /*< Event queue length */
int evq_max; /*< Maximum event queue length */
} pollStats;
/**
@ -338,6 +357,17 @@ return_rc:
* deschedule a process if a timeout is included, but will not do this if a 0 timeout
* value is given. this improves performance when the gateway is under heavy load.
*
* In order to provide a fairer means of sharign the threads between the different
* DCB's the poll mechanism has been decoupled from the processing of the events.
* The events are now recieved via the epoll_wait call, a queue of DCB's that have
* events pending is maintained and as new events arrive the DCB is added to the end
* of this queue. If an eent arrives for a DCB alreayd in the queue, then the event
* bits are added to the DCB but the DCB mantains the same point in the queue unless
* the original events are already being processed. If they are being processed then
* the DCB is moved to the back of the queue, this means that a DCB that is receiving
* events at a high rate will not block the execution of events for other DCB's and
* should result in a fairer polling strategy.
*
* @param arg The thread ID passed as a void * to satisfy the threading package
*/
void
@ -346,8 +376,6 @@ poll_waitevents(void *arg)
struct epoll_event events[MAX_EVENTS];
int i, nfds;
int thread_id = (int)arg;
bool no_op = false;
static bool process_zombies_only = false; /*< flag for all threads */
DCB *zombies = NULL;
/** Add this thread to the bitmask of running polling threads */
@ -362,20 +390,19 @@ DCB *zombies = NULL;
while (1)
{
/* Process of the queue of waiting requests */
while (do_shutdown == 0 && process_pollq(thread_id))
{
if (thread_data)
thread_data[thread_id].state = THREAD_ZPROCESSING;
zombies = dcb_process_zombies(thread_id);
}
atomic_add(&n_waiting, 1);
#if BLOCKINGPOLL
nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
atomic_add(&n_waiting, -1);
#else /* BLOCKINGPOLL */
if (!no_op) {
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] MaxScale thread "
"%d > epoll_wait <",
pthread_self(),
thread_id)));
no_op = TRUE;
}
#if MUTEX_EPOLL
simple_mutex_lock(&epoll_wait_mutex, TRUE);
#endif
@ -396,32 +423,19 @@ DCB *zombies = NULL;
pthread_self(),
nfds,
eno)));
no_op = FALSE;
}
else if (nfds == 0)
/*
* If there are no new descriptors from the non-blocking call
* and nothing to proces on the event queue then for do a
* blocking call to epoll_wait.
*/
else if (nfds == 0 && process_pollq(thread_id) == 0)
{
atomic_add(&n_waiting, -1);
if (process_zombies_only) {
#if MUTEX_EPOLL
simple_mutex_unlock(&epoll_wait_mutex);
#endif
goto process_zombies;
} else {
nfds = epoll_wait(epoll_fd,
atomic_add(&n_waiting, 1);
nfds = epoll_wait(epoll_fd,
events,
MAX_EVENTS,
EPOLL_TIMEOUT);
/*<
* When there are zombies to be cleaned up but
* no client requests, allow all threads to call
* dcb_process_zombies without having to wait
* for the timeout.
*/
if (nfds == 0 && dcb_get_zombies() != NULL)
{
process_zombies_only = true;
}
}
}
else
{
@ -457,228 +471,64 @@ DCB *zombies = NULL;
atomic_add(&load_samples, 1);
atomic_add(&load_nfds, nfds);
/*
* Process every DCB that has a new event and add
* it to the poll queue.
* If the DCB is currently beign processed then we
* or in the new eent bits to the pending event bits
* and leave it in the queue.
* If the DCB was not already in the queue then it was
* idle and is added to the queue to process after
* setting the event bits.
*/
for (i = 0; i < nfds; i++)
{
DCB *dcb = (DCB *)events[i].data.ptr;
__uint32_t ev = events[i].events;
CHK_DCB(dcb);
if (thread_data)
spinlock_acquire(&pollqlock);
if (DCB_POLL_BUSY(dcb))
{
thread_data[thread_id].cur_dcb = dcb;
thread_data[thread_id].event = ev;
dcb->evq.pending_events |= ev;
}
#if defined(SS_DEBUG)
if (dcb_fake_write_ev[dcb->fd] != 0) {
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"Added fake events %d to ev %d.",
pthread_self(),
dcb_fake_write_ev[dcb->fd],
ev)));
ev |= dcb_fake_write_ev[dcb->fd];
dcb_fake_write_ev[dcb->fd] = 0;
}
#endif
ss_debug(spinlock_acquire(&dcb->dcb_initlock);)
ss_dassert(dcb->state != DCB_STATE_ALLOC);
ss_dassert(dcb->state != DCB_STATE_DISCONNECTED);
ss_dassert(dcb->state != DCB_STATE_FREED);
ss_debug(spinlock_release(&dcb->dcb_initlock);)
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] event %d dcb %p "
"role %s",
pthread_self(),
ev,
dcb,
STRDCBROLE(dcb->dcb_role))));
if (ev & EPOLLOUT)
else
{
int eno = 0;
eno = gw_getsockerrno(dcb->fd);
if (eno == 0) {
#if MUTEX_BLOCK
simple_mutex_lock(
&dcb->dcb_write_lock,
true);
ss_info_dassert(
!dcb->dcb_write_active,
"Write already active");
dcb->dcb_write_active = TRUE;
atomic_add(
&pollStats.n_write,
1);
dcb->func.write_ready(dcb);
dcb->dcb_write_active = FALSE;
simple_mutex_unlock(
&dcb->dcb_write_lock);
#else
atomic_add(&pollStats.n_write,
1);
dcb_pollout(dcb, thread_id, nfds);
#endif
} else {
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"EPOLLOUT due %d, %s. "
"dcb %p, fd %i",
pthread_self(),
eno,
strerror(eno),
dcb,
dcb->fd)));
}
}
if (ev & EPOLLIN)
{
#if MUTEX_BLOCK
simple_mutex_lock(&dcb->dcb_read_lock,
true);
ss_info_dassert(!dcb->dcb_read_active,
"Read already active");
dcb->dcb_read_active = TRUE;
#endif
if (dcb->state == DCB_STATE_LISTENING)
dcb->evq.pending_events = ev;
if (eventq)
{
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"Accept in fd %d",
pthread_self(),
dcb->fd)));
atomic_add(
&pollStats.n_accept, 1);
dcb->func.accept(dcb);
}
else
{
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"Read in dcb %p fd %d",
pthread_self(),
dcb,
dcb->fd)));
atomic_add(&pollStats.n_read, 1);
#if MUTEX_BLOCK
dcb->func.read(dcb);
#else
dcb_pollin(dcb, thread_id, nfds);
#endif
}
#if MUTEX_BLOCK
dcb->dcb_read_active = FALSE;
simple_mutex_unlock(
&dcb->dcb_read_lock);
#endif
}
if (ev & EPOLLERR)
{
int eno = gw_getsockerrno(dcb->fd);
#if defined(SS_DEBUG)
if (eno == 0) {
eno = dcb_fake_write_errno[dcb->fd];
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"Added fake errno %d. "
"%s",
pthread_self(),
eno,
strerror(eno))));
}
dcb_fake_write_errno[dcb->fd] = 0;
#endif
if (eno != 0) {
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"EPOLLERR due %d, %s.",
pthread_self(),
eno,
strerror(eno))));
}
atomic_add(&pollStats.n_error, 1);
dcb->func.error(dcb);
}
if (ev & EPOLLHUP)
{
int eno = 0;
eno = gw_getsockerrno(dcb->fd);
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"EPOLLHUP on dcb %p, fd %d. "
"Errno %d, %s.",
pthread_self(),
dcb,
dcb->fd,
eno,
strerror(eno))));
atomic_add(&pollStats.n_hup, 1);
spinlock_acquire(&dcb->dcb_initlock);
if ((dcb->flags & DCBF_HUNG) == 0)
{
dcb->flags |= DCBF_HUNG;
spinlock_release(&dcb->dcb_initlock);
dcb->func.hangup(dcb);
dcb->evq.prev = eventq->evq.prev;
eventq->evq.prev->evq.next = dcb;
eventq->evq.prev = dcb;
dcb->evq.next = eventq;
}
else
spinlock_release(&dcb->dcb_initlock);
}
#ifdef EPOLLRDHUP
if (ev & EPOLLRDHUP)
{
int eno = 0;
eno = gw_getsockerrno(dcb->fd);
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"EPOLLRDHUP on dcb %p, fd %d. "
"Errno %d, %s.",
pthread_self(),
dcb,
dcb->fd,
eno,
strerror(eno))));
atomic_add(&pollStats.n_hup, 1);
spinlock_acquire(&dcb->dcb_initlock);
if ((dcb->flags & DCBF_HUNG) == 0)
{
dcb->flags |= DCBF_HUNG;
spinlock_release(&dcb->dcb_initlock);
dcb->func.hangup(dcb);
eventq = dcb;
dcb->evq.prev = dcb;
dcb->evq.next = dcb;
}
pollStats.evq_length++;
if (pollStats.evq_length > pollStats.evq_max)
{
pollStats.evq_max = pollStats.evq_length;
}
else
spinlock_release(&dcb->dcb_initlock);
}
#endif
} /*< for */
no_op = FALSE;
} /*< if (nfds > 0) */
process_zombies:
if (thread_data)
{
thread_data[thread_id].state = THREAD_ZPROCESSING;
spinlock_release(&pollqlock);
}
}
zombies = dcb_process_zombies(thread_id, NULL);
if (zombies == NULL) {
process_zombies_only = false;
}
/*
* If there was nothing to process then process the zombie queue
*/
if (process_pollq(thread_id) == 0)
{
if (thread_data)
{
thread_data[thread_id].state = THREAD_ZPROCESSING;
}
zombies = dcb_process_zombies(thread_id);
}
if (do_shutdown)
{
/*<
@ -701,6 +551,320 @@ process_zombies:
} /*< while(1) */
}
/**
* Process of the queue of DCB's that have outstanding events
*
* The first event on the queue will be chosen to be executed by this thread,
* all other events will be left on the queue and may be picked up by other
* threads. When the processing is complete the thread will take the DCB off the
* queue if there are no pending events that have arrived since the thread started
* to process the DCB. If there are pending events the DCB will be moved to the
* back of the queue so that other DCB's will have a share of the threads to
* execute events for them.
*
* @param thread_id The thread ID of the calling thread
* @return 0 if no DCB's have been processed
*/
static int
process_pollq(int thread_id)
{
DCB *dcb;
int found = 0;
uint32_t ev;
spinlock_acquire(&pollqlock);
if (eventq == NULL)
{
/* Nothing to process */
spinlock_release(&pollqlock);
return 0;
}
dcb = eventq;
if (dcb->evq.next == dcb->evq.prev && dcb->evq.processing == 0)
{
found = 1;
dcb->evq.processing = 1;
}
else if (dcb->evq.next == dcb->evq.prev)
{
/* Only item in queue is being processed */
spinlock_release(&pollqlock);
return 0;
}
else
{
do {
dcb = dcb->evq.next;
} while (dcb != eventq && dcb->evq.processing == 1);
if (dcb->evq.processing == 0)
{
/* Found DCB to process */
dcb->evq.processing = 1;
found = 1;
}
}
if (found)
{
ev = dcb->evq.pending_events;
dcb->evq.pending_events = 0;
}
spinlock_release(&pollqlock);
if (found == 0)
return 0;
CHK_DCB(dcb);
if (thread_data)
{
thread_data[thread_id].state = THREAD_PROCESSING;
thread_data[thread_id].cur_dcb = dcb;
thread_data[thread_id].event = ev;
}
#if defined(SS_DEBUG)
if (dcb_fake_write_ev[dcb->fd] != 0) {
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"Added fake events %d to ev %d.",
pthread_self(),
dcb_fake_write_ev[dcb->fd],
ev)));
ev |= dcb_fake_write_ev[dcb->fd];
dcb_fake_write_ev[dcb->fd] = 0;
}
#endif
ss_debug(spinlock_acquire(&dcb->dcb_initlock);)
ss_dassert(dcb->state != DCB_STATE_ALLOC);
ss_dassert(dcb->state != DCB_STATE_DISCONNECTED);
ss_dassert(dcb->state != DCB_STATE_FREED);
ss_debug(spinlock_release(&dcb->dcb_initlock);)
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] event %d dcb %p "
"role %s",
pthread_self(),
ev,
dcb,
STRDCBROLE(dcb->dcb_role))));
if (ev & EPOLLOUT)
{
int eno = 0;
eno = gw_getsockerrno(dcb->fd);
if (eno == 0) {
#if MUTEX_BLOCK
simple_mutex_lock(
&dcb->dcb_write_lock,
true);
ss_info_dassert(
!dcb->dcb_write_active,
"Write already active");
dcb->dcb_write_active = TRUE;
atomic_add(
&pollStats.n_write,
1);
dcb->func.write_ready(dcb);
dcb->dcb_write_active = FALSE;
simple_mutex_unlock(
&dcb->dcb_write_lock);
#else
atomic_add(&pollStats.n_write,
1);
dcb->func.write_ready(dcb);
#endif
} else {
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"EPOLLOUT due %d, %s. "
"dcb %p, fd %i",
pthread_self(),
eno,
strerror(eno),
dcb,
dcb->fd)));
}
}
if (ev & EPOLLIN)
{
#if MUTEX_BLOCK
simple_mutex_lock(&dcb->dcb_read_lock,
true);
ss_info_dassert(!dcb->dcb_read_active,
"Read already active");
dcb->dcb_read_active = TRUE;
#endif
if (dcb->state == DCB_STATE_LISTENING)
{
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"Accept in fd %d",
pthread_self(),
dcb->fd)));
atomic_add(
&pollStats.n_accept, 1);
dcb->func.accept(dcb);
}
else
{
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"Read in dcb %p fd %d",
pthread_self(),
dcb,
dcb->fd)));
atomic_add(&pollStats.n_read, 1);
dcb->func.read(dcb);
}
#if MUTEX_BLOCK
dcb->dcb_read_active = FALSE;
simple_mutex_unlock(
&dcb->dcb_read_lock);
#endif
}
if (ev & EPOLLERR)
{
int eno = gw_getsockerrno(dcb->fd);
#if defined(SS_DEBUG)
if (eno == 0) {
eno = dcb_fake_write_errno[dcb->fd];
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"Added fake errno %d. "
"%s",
pthread_self(),
eno,
strerror(eno))));
}
dcb_fake_write_errno[dcb->fd] = 0;
#endif
if (eno != 0) {
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"EPOLLERR due %d, %s.",
pthread_self(),
eno,
strerror(eno))));
}
atomic_add(&pollStats.n_error, 1);
dcb->func.error(dcb);
}
if (ev & EPOLLHUP)
{
int eno = 0;
eno = gw_getsockerrno(dcb->fd);
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"EPOLLHUP on dcb %p, fd %d. "
"Errno %d, %s.",
pthread_self(),
dcb,
dcb->fd,
eno,
strerror(eno))));
atomic_add(&pollStats.n_hup, 1);
spinlock_acquire(&dcb->dcb_initlock);
if ((dcb->flags & DCBF_HUNG) == 0)
{
dcb->flags |= DCBF_HUNG;
spinlock_release(&dcb->dcb_initlock);
dcb->func.hangup(dcb);
}
else
spinlock_release(&dcb->dcb_initlock);
}
#ifdef EPOLLRDHUP
if (ev & EPOLLRDHUP)
{
int eno = 0;
eno = gw_getsockerrno(dcb->fd);
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"EPOLLRDHUP on dcb %p, fd %d. "
"Errno %d, %s.",
pthread_self(),
dcb,
dcb->fd,
eno,
strerror(eno))));
atomic_add(&pollStats.n_hup, 1);
spinlock_acquire(&dcb->dcb_initlock);
if ((dcb->flags & DCBF_HUNG) == 0)
{
dcb->flags |= DCBF_HUNG;
spinlock_release(&dcb->dcb_initlock);
dcb->func.hangup(dcb);
}
else
spinlock_release(&dcb->dcb_initlock);
}
#endif
spinlock_acquire(&pollqlock);
if (dcb->evq.pending_events == 0)
{
/* No pending events so remove from the queue */
if (dcb->evq.prev != dcb)
{
dcb->evq.prev->evq.next = dcb->evq.next;
dcb->evq.next->evq.prev = dcb->evq.prev;
if (eventq == dcb)
eventq = dcb->evq.next;
}
else
{
eventq = NULL;
}
dcb->evq.next = NULL;
dcb->evq.prev = NULL;
pollStats.evq_length--;
}
else
{
/*
* We have a pending event, move to the end of the queue
* if there are any other DCB's in the queue.
*
* If we are the first item on the queue this is easy, we
* just bump the eventq pointer.
*/
if (dcb->evq.prev != dcb)
{
if (eventq == dcb)
eventq = dcb->evq.next;
else
{
dcb->evq.prev->evq.next = dcb->evq.next;
dcb->evq.next->evq.prev = dcb->evq.prev;
dcb->evq.prev = eventq->evq.prev;
dcb->evq.next = eventq;
eventq->evq.prev = dcb;
dcb->evq.prev->evq.next = dcb;
}
}
}
dcb->evq.processing = 0;
spinlock_release(&pollqlock);
return 1;
}
/**
* Shutdown the polling loop
*/
@ -721,6 +885,20 @@ poll_bitmask()
return &poll_mask;
}
/**
* Display an entry from the spinlock statistics data
*
* @param dcb The DCB to print to
* @param desc Description of the statistic
* @param value The statistic value
*/
static void
spin_reporter(void *dcb, char *desc, int value)
{
dcb_printf((DCB *)dcb, "\t%-40s %d\n", desc, value);
}
/**
* Debug routine to print the polling statistics
*
@ -745,6 +923,10 @@ int i;
pollStats.n_accept);
dcb_printf(dcb, "Number of times no threads polling: %d\n",
pollStats.n_nothreads);
dcb_printf(dcb, "Current event queue length: %d\n",
pollStats.evq_length);
dcb_printf(dcb, "Maximum event queue length: %d\n",
pollStats.evq_max);
dcb_printf(dcb, "No of poll completions with descriptors\n");
dcb_printf(dcb, "\tNo. of descriptors\tNo. of poll completions.\n");
@ -754,6 +936,11 @@ int i;
}
dcb_printf(dcb, "\t>= %d\t\t\t%d\n", MAXNFDS,
pollStats.n_fds[MAXNFDS-1]);
#if SPINLOCK_PROFILE
dcb_printf(dcb, "Event queue lock statistics:\n");
spinlock_stats(&pollqlock, spin_reporter, dcb);
#endif
}
/**

View File

@ -53,7 +53,8 @@ struct service;
* 07/02/2014 Massimiliano Pinto Added ipv4 data struct into for dcb
* 07/05/2014 Mark Riddoch Addition of callback mechanism
* 08/05/2014 Mark Riddoch Addition of writeq high and low watermarks
* 27/08/2014 Mark Ridddoch Addition of write event queuing
* 27/08/2014 Mark Riddoch Addition of write event queuing
* 23/09/2014 Mark Riddoch New poll processing queue
*
* @endverbatim
*/
@ -97,6 +98,14 @@ typedef struct gw_protocol {
int (*session)(struct dcb *, void *);
} GWPROTOCOL;
typedef struct {
struct dcb *next;
struct dcb *prev;
uint32_t pending_events;
int processing;
SPINLOCK eventqlock;
} DCBEVENTQ;
/**
* The GWPROTOCOL version data. The following should be updated whenever
* the GWPROTOCOL structure is changed. See the rules defined in modinfo.h
@ -114,10 +123,6 @@ typedef struct dcbstats {
int n_buffered; /*< Number of buffered writes */
int n_high_water; /*< Number of crosses of high water mark */
int n_low_water; /*< Number of crosses of low water mark */
int n_busypolls; /*< Number of read polls whiel reading */
int n_readrechecks; /*< Number of rechecks for reads */
int n_busywrpolls; /*< Number of write polls while writing */
int n_writerechecks;/*< Number of rechecks for writes */
} DCBSTATS;
/**
@ -204,12 +209,7 @@ typedef struct dcb {
#endif
dcb_role_t dcb_role;
SPINLOCK dcb_initlock;
#if 1
simple_mutex_t dcb_read_lock;
simple_mutex_t dcb_write_lock;
bool dcb_read_active;
bool dcb_write_active;
#endif
DCBEVENTQ evq; /**< The event queue for this DCB */
int fd; /**< The descriptor */
dcb_state_t state; /**< Current descriptor state */
int flags; /**< DCB flags */
@ -271,8 +271,8 @@ int fail_accept_errno;
#define DCB_BELOW_LOW_WATER(x) ((x)->low_water && (x)->writeqlen < (x)->low_water)
#define DCB_ABOVE_HIGH_WATER(x) ((x)->high_water && (x)->writeqlen > (x)->high_water)
void dcb_pollin(DCB *, int, int);
void dcb_pollout(DCB *, int, int);
#define DCB_POLL_BUSY(x) ((x)->evq.next != NULL)
DCB *dcb_get_zombies(void);
int gw_write(
#if defined(SS_DEBUG)
@ -289,7 +289,7 @@ DCB *dcb_clone(DCB *);
int dcb_read(DCB *, GWBUF **);
int dcb_drain_writeq(DCB *);
void dcb_close(DCB *);
DCB *dcb_process_zombies(int, DCB*); /* Process Zombies except the one behind the pointer */
DCB *dcb_process_zombies(int); /* Process Zombies except the one behind the pointer */
void printAllDCBs(); /* Debug to print all DCB in the system */
void printDCB(DCB *); /* Debug print routine */
void dprintAllDCBs(DCB *); /* Debug to print all DCB in the system */

View File

@ -28,10 +28,10 @@
* Configuration for send and receive socket buffer sizes for
* backend and cleint connections.
*/
#define GW_BACKEND_SO_SNDBUF 32768
#define GW_BACKEND_SO_RCVBUF 32768
#define GW_CLIENT_SO_SNDBUF 32768
#define GW_CLIENT_SO_RCVBUF 32768
#define GW_BACKEND_SO_SNDBUF (128 * 1024)
#define GW_BACKEND_SO_RCVBUF (128 * 1024)
#define GW_CLIENT_SO_SNDBUF (128 * 1024)
#define GW_CLIENT_SO_RCVBUF (128 * 1024)
#define GW_NOINTR_CALL(A) do { errno = 0; A; } while (errno == EINTR)
#define GW_MYSQL_LOOP_TIMEOUT 300000000

View File

@ -47,4 +47,5 @@ typedef struct hktask {
extern void hkinit();
extern int hktask_add(char *name, void (*task)(void *), void *data, int frequency);
extern int hktask_remove(char *name);
extern void hkshutdown();
#endif

61
server/include/rdtsc.h Normal file
View File

@ -0,0 +1,61 @@
#ifndef _RDTSC_H
#define _RDTSC_H
/*
* This file is distributed as part of the SkySQL Gateway. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright SkySQL Ab 2014
*/
/**
* @file rdtsc.h Access the process time-stamp counter
*
* This is an Intel only facilty that is used to access an accurate time
* value, the granularity of which is related to the processor clock speed
* and the overhead for access is much lower than using any system call
* mechanism.
*
*
* @verbatim
* Revision History
*
* Date Who Description
* 19/09/2014 Mark Riddoch Initial implementation
*
* @endverbatim
*/
typedef unsigned long long CYCLES;
/**
* Get the current time-stamp counter value from the processor. This is the
* count of CPU cyceles as a 64 bit value.
*
* The value returned is related to the clock speed, to obtian a value in
* seconds divide the returned value by the clock frequency for the processor.
*
* Note, on multi-processsor systems care much be taken to avoid the thread
* moving to a different processor when taken succsive value of RDTSC to
* obtian accurate timing. This may be done by setting pocessor affinity for
* the thread. See sched_setaffinity/sched_getaffinity.
*
* @return CPU cycle count
*/
static __inline__ CYCLES rdtsc(void)
{
unsigned long long int x;
__asm__ volatile (".byte 0x0f, 0x31" : "=A" (x));
return x;
}
#endif

View File

@ -43,8 +43,8 @@
* High and Low water marks for the slave dcb. These values can be overriden
* by the router options highwater and lowwater.
*/
#define DEF_LOW_WATER 20000
#define DEF_HIGH_WATER 300000
#define DEF_LOW_WATER 2000
#define DEF_HIGH_WATER 30000
/**
* Some useful macros for examining the MySQL Response packets
@ -194,6 +194,7 @@ typedef struct router_instance {
char *uuid; /*< UUID for the router to use w/master */
int masterid; /*< Server ID of the master */
int serverid; /*< Server ID to use with master */
int initbinlog; /*< Initial binlog file number */
char *user; /*< User name to use with master */
char *password; /*< Password to use with master */
char *fileroot; /*< Root of binlog filename */

View File

@ -224,6 +224,7 @@ typedef struct backend_ref_st {
bref_state_t bref_state;
int bref_num_result_wait;
sescmd_cursor_t bref_sescmd_cur;
GWBUF* bref_pending_cmd; /*< For stmt which can't be routed due active sescmd execution */
#if defined(SS_DEBUG)
skygw_chk_t bref_chk_tail;
#endif

View File

@ -1180,9 +1180,9 @@ int gw_MySQLAccept(DCB *listener)
conn_open[c_sock] = true;
#endif
/* set nonblocking */
sendbuf = GW_BACKEND_SO_SNDBUF;
sendbuf = GW_CLIENT_SO_SNDBUF;
setsockopt(c_sock, SOL_SOCKET, SO_SNDBUF, &sendbuf, optlen);
sendbuf = GW_BACKEND_SO_RCVBUF;
sendbuf = GW_CLIENT_SO_RCVBUF;
setsockopt(c_sock, SOL_SOCKET, SO_RCVBUF, &sendbuf, optlen);
setnonblocking(c_sock);

View File

@ -766,9 +766,9 @@ int gw_do_connect_to_backend(
/* prepare for connect */
setipaddress(&serv_addr.sin_addr, host);
serv_addr.sin_port = htons(port);
bufsize = GW_CLIENT_SO_SNDBUF;
bufsize = GW_BACKEND_SO_SNDBUF;
setsockopt(so, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize));
bufsize = GW_CLIENT_SO_RCVBUF;
bufsize = GW_BACKEND_SO_RCVBUF;
setsockopt(so, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize));
/* set socket to as non-blocking here */
setnonblocking(so);
@ -1660,6 +1660,8 @@ void protocol_archive_srv_command(
server_command_t* h1;
int len = 0;
CHK_PROTOCOL(p);
spinlock_acquire(&p->protocol_lock);
if (p->protocol_state != MYSQL_PROTOCOL_ACTIVE)
@ -1710,6 +1712,7 @@ void protocol_archive_srv_command(
retblock:
spinlock_release(&p->protocol_lock);
CHK_PROTOCOL(p);
}

View File

@ -15,4 +15,4 @@ target_link_libraries(cli log_manager utils)
install(TARGETS cli DESTINATION modules)
add_subdirectory(readwritesplit)
add_subdirectory(binlog)

View File

@ -66,7 +66,7 @@ libcli.so: $(CLIOBJ)
$(CC) $(LDFLAGS) $(CLIOBJ) $(LIBS) -o $@
libreadwritesplit.so:
# (cd readwritesplit; touch depend.mk ; make; cp $@ ..)
(cd readwritesplit; touch depend.mk ; make; cp $@ ..)
.c.o:
$(CC) $(CFLAGS) $< -o $@

View File

@ -172,6 +172,7 @@ int i;
inst->low_water = DEF_LOW_WATER;
inst->high_water = DEF_HIGH_WATER;
inst->initbinlog = 0;
/*
* We only support one server behind this router, since the server is
@ -244,6 +245,10 @@ int i;
{
inst->fileroot = strdup(value);
}
else if (strcmp(options[i], "initialfile") == 0)
{
inst->initbinlog = atoi(value);
}
else if (strcmp(options[i], "lowwater") == 0)
{
inst->low_water = atoi(value);
@ -450,11 +455,14 @@ ROUTER_SLAVE *slave = (ROUTER_SLAVE *)router_session;
* TODO: Handle closure of master session
*/
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR, "Binlog router close session with master")));
LOGFILE_ERROR,
"Binlog router close session with master server %s",
router->service->databases->unique_name)));
blr_master_reconnect(router);
return;
}
CHK_CLIENT_RSES(slave);
/**
* Lock router client session for secure read and update.
*/

View File

@ -115,7 +115,11 @@ struct dirent *dp;
if (n == 0) // No binlog files found
{
sprintf(filename, BINLOG_NAMEFMT, router->fileroot, 1);
if (router->initbinlog)
sprintf(filename, BINLOG_NAMEFMT, router->fileroot,
router->initbinlog);
else
sprintf(filename, BINLOG_NAMEFMT, router->fileroot, 1);
blr_file_create(router, filename);
}
else

View File

@ -55,21 +55,57 @@
#include <skygw_utils.h>
#include <log_manager.h>
#include <rdtsc.h>
/* Temporary requirement for auth data */
#include <mysql_client_server_protocol.h>
#define SAMPLE_COUNT 10000
CYCLES samples[10][SAMPLE_COUNT];
int sample_index[10] = { 0, 0, 0 };
#define LOGD_SLAVE_CATCHUP1 0
#define LOGD_SLAVE_CATCHUP2 1
#define LOGD_DISTRIBUTE 2
#define LOGD_FILE_FLUSH 3
SPINLOCK logspin = SPINLOCK_INIT;
void
log_duration(int sample, CYCLES duration)
{
char fname[100];
int i;
FILE *fp;
spinlock_acquire(&logspin);
samples[sample][sample_index[sample]++] = duration;
if (sample_index[sample] == SAMPLE_COUNT)
{
sprintf(fname, "binlog_profile.%d", sample);
if ((fp = fopen(fname, "a")) != NULL)
{
for (i = 0; i < SAMPLE_COUNT; i++)
fprintf(fp, "%ld\n", samples[sample][i]);
fclose(fp);
}
sample_index[sample] = 0;
}
spinlock_release(&logspin);
}
extern int lm_enabled_logfiles_bitmask;
static GWBUF *blr_make_query(char *statement);
static GWBUF *blr_make_registration(ROUTER_INSTANCE *router);
static GWBUF *blr_make_binlog_dump(ROUTER_INSTANCE *router);
static void encode_value(unsigned char *data, unsigned int value, int len);
static void blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt);
void encode_value(unsigned char *data, unsigned int value, int len);
void blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt);
static void blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *pkt, REP_HEADER *hdr);
static void blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr);
void blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr);
static void *CreateMySQLAuthData(char *username, char *password, char *database);
static void blr_extract_header(uint8_t *pkt, REP_HEADER *hdr);
static uint32_t extract_field(uint8_t *src, int bits);
void blr_extract_header(uint8_t *pkt, REP_HEADER *hdr);
inline uint32_t extract_field(uint8_t *src, int bits);
static void blr_log_packet(logfile_id_t file, char *msg, uint8_t *ptr, int len);
static int keepalive = 1;
@ -105,7 +141,8 @@ GWBUF *buf;
if ((router->master = dcb_connect(router->service->databases, router->session, BLR_PROTOCOL)) == NULL)
{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"Binlog router: failed to connect to master\n")));
"Binlog router: failed to connect to master server '%s'\n",
router->service->databases->unique_name)));
return;
}
@ -459,7 +496,7 @@ int len = 0x1b;
* @param value The value to pack
* @param len Number of bits to encode value into
*/
static void
void
encode_value(unsigned char *data, unsigned int value, int len)
{
while (len > 0)
@ -477,7 +514,7 @@ encode_value(unsigned char *data, unsigned int value, int len)
* @param router The router instance
* @param pkt The binlog records
*/
static void
void
blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
{
uint8_t *msg = NULL, *ptr, *pdata;
@ -765,7 +802,9 @@ static REP_HEADER phdr;
{
ss_dassert(pkt_length == 0);
}
{ CYCLES start = rdtsc();
blr_file_flush(router);
log_duration(LOGD_FILE_FLUSH, rdtsc() - start); }
}
/**
@ -774,7 +813,7 @@ static REP_HEADER phdr;
* @param pkt The incoming packet in a GWBUF chain
* @param hdr The packet header to populate
*/
static void
void
blr_extract_header(uint8_t *ptr, REP_HEADER *hdr)
{
@ -795,10 +834,10 @@ blr_extract_header(uint8_t *ptr, REP_HEADER *hdr)
* @param src The raw packet source
* @param birs The number of bits to extract (multiple of 8)
*/
static uint32_t
extract_field(uint8_t *src, int bits)
inline uint32_t
extract_field(register uint8_t *src, int bits)
{
uint32_t rval = 0, shift = 0;
register uint32_t rval = 0, shift = 0;
while (bits > 0)
{
@ -828,7 +867,7 @@ char file[BINLOG_FNAMELEN+1];
pos = extract_field(ptr+4, 32);
pos <<= 32;
pos |= extract_field(ptr, 32);
slen = len - 8;
slen = len - (8 + 4); // Allow for position and CRC
if (slen > BINLOG_FNAMELEN)
slen = BINLOG_FNAMELEN;
memcpy(file, ptr + 8, slen);
@ -883,14 +922,16 @@ MYSQL_session *auth_info;
* @param hdr The replication event header
* @param ptr The raw replication event data
*/
static void
void
blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
{
GWBUF *pkt;
uint8_t *buf;
ROUTER_SLAVE *slave;
int action;
CYCLES entry;
entry = rdtsc();
spinlock_acquire(&router->lock);
slave = router->slaves;
while (slave)
@ -944,12 +985,16 @@ int action;
spinlock_acquire(&slave->catch_lock);
if (slave->overrun)
{
CYCLES cycle_start, cycles;
slave->stats.n_overrun++;
slave->overrun = 0;
spinlock_release(&router->lock);
slave->cstate &= ~(CS_UPTODATE|CS_DIST);
spinlock_release(&slave->catch_lock);
cycle_start = rdtsc();
blr_slave_catchup(router, slave);
cycles = rdtsc() - cycle_start;
log_duration(LOGD_SLAVE_CATCHUP2, cycles);
spinlock_acquire(&router->lock);
slave = router->slaves;
if (slave)
@ -982,6 +1027,7 @@ int action;
*/
if (slave->cstate & CS_UPTODATE)
{
CYCLES cycle_start, cycles;
spinlock_release(&router->lock);
LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG,
"Force slave %d into catchup mode %s@%d\n",
@ -990,7 +1036,10 @@ int action;
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~(CS_UPTODATE|CS_DIST);
spinlock_release(&slave->catch_lock);
cycle_start = rdtsc();
blr_slave_catchup(router, slave);
cycles = rdtsc() - cycle_start;
log_duration(LOGD_SLAVE_CATCHUP1, cycles);
spinlock_acquire(&router->lock);
slave = router->slaves;
if (slave)
@ -1004,6 +1053,7 @@ int action;
slave = slave->next;
}
spinlock_release(&router->lock);
log_duration(LOGD_DISTRIBUTE, rdtsc() - entry);
}
static void

View File

@ -95,7 +95,7 @@ blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
return 0;
}
atomic_add(&slave->stats.n_requests, 1);
slave->stats.n_requests++;
switch (MYSQL_COMMAND(queue))
{
case COM_QUERY:
@ -796,7 +796,7 @@ doitagain:
slave->binlogfile)));
return 0;
}
atomic_add(&slave->stats.n_bursts, 1);
slave->stats.n_bursts++;
spinlock_acquire(&slave->catch_lock);
slave->cstate |= CS_INNERLOOP;
spinlock_release(&slave->catch_lock);
@ -830,7 +830,7 @@ if (hdr.event_size > DEF_HIGH_WATER) slave->stats.n_above++;
slave->binlog_pos = hdr.next_pos;
}
rval = written;
atomic_add(&slave->stats.n_events, 1);
slave->stats.n_events++;
burst++;
}
if (record == NULL)
@ -843,7 +843,7 @@ if (hdr.event_size > DEF_HIGH_WATER) slave->stats.n_above++;
} while (record && DCB_BELOW_LOW_WATER(slave->dcb));
if (record)
{
atomic_add(&slave->stats.n_flows, 1);
slave->stats.n_flows++;
spinlock_acquire(&slave->catch_lock);
slave->cstate |= CS_EXPECTCB;
spinlock_release(&slave->catch_lock);
@ -854,7 +854,7 @@ if (hdr.event_size > DEF_HIGH_WATER) slave->stats.n_above++;
spinlock_acquire(&slave->catch_lock);
if ((slave->cstate & CS_UPTODATE) == 0)
{
atomic_add(&slave->stats.n_upd, 1);
slave->stats.n_upd++;
slave->cstate |= CS_UPTODATE;
state_change = 1;
}
@ -907,7 +907,7 @@ ROUTER_INSTANCE *router = slave->router;
if (slave->state == BLRS_DUMPING &&
slave->binlog_pos != router->binlog_position)
{
atomic_add(&slave->stats.n_dcb, 1);
slave->stats.n_dcb++;
blr_slave_catchup(router, slave);
}
}
@ -916,12 +916,12 @@ ROUTER_INSTANCE *router = slave->router;
{
if (slave->state == BLRS_DUMPING)
{
atomic_add(&slave->stats.n_cb, 1);
slave->stats.n_cb++;
blr_slave_catchup(router, slave);
}
else
{
atomic_add(&slave->stats.n_cbna, 1);
slave->stats.n_cbna++;
}
}
return 0;

View File

@ -1957,6 +1957,26 @@ static int routeQuery(
if (succp) /*< Have DCB of the target backend */
{
backend_ref_t* bref;
sescmd_cursor_t* scur;
bref = get_bref_from_dcb(router_cli_ses, target_dcb);
scur = &bref->bref_sescmd_cur;
/**
* Store current stmt if execution of previous session command
* haven't completed yet. Note that according to MySQL protocol
* there can only be one such non-sescmd stmt at the time.
*/
if (sescmd_cursor_is_active(scur))
{
ss_dassert(bref->bref_pending_cmd == NULL);
bref->bref_pending_cmd = gwbuf_clone(querybuf);
rses_end_locked_router_action(router_cli_ses);
ret = 1;
goto retblock;
}
if ((ret = target_dcb->func.write(target_dcb, gwbuf_clone(querybuf))) == 1)
{
backend_ref_t* bref;
@ -2295,7 +2315,34 @@ static void clientReply (
ss_dassert(succp);
}
/** Unlock router session */
else if (bref->bref_pending_cmd != NULL) /*< non-sescmd is waiting to be routed */
{
int ret;
CHK_GWBUF(bref->bref_pending_cmd);
if ((ret = bref->bref_dcb->func.write(bref->bref_dcb,
gwbuf_clone(bref->bref_pending_cmd))) == 1)
{
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
atomic_add(&inst->stats.n_queries, 1);
/**
* Add one query response waiter to backend reference
*/
bref_set_state(bref, BREF_QUERY_ACTIVE);
bref_set_state(bref, BREF_WAITING_RESULT);
}
else
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Routing query \"%s\" failed.",
bref->bref_pending_cmd)));
}
gwbuf_free(bref->bref_pending_cmd);
bref->bref_pending_cmd = NULL;
}
/** Unlock router session */
rses_end_locked_router_action(router_cli_ses);
lock_failed:
@ -3656,6 +3703,14 @@ static bool route_session_write(
{
succp = false;
}
else if (LOG_IS_ENABLED(LOGFILE_TRACE))
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Wrote to %s:%d",
backend_ref[i].bref_backend->backend_server->name,
backend_ref[i].bref_backend->backend_server->port)));
}
}
}
rses_end_locked_router_action(router_cli_ses);