Merge branch 'blr' into release-1.0beta-refresh

MassimilianoPinto 2014-09-25 17:08:30 +02:00
commit fb3ba269db
13 changed files with 536 additions and 425 deletions

View File

@ -123,12 +123,6 @@ DCB *rval;
rval->dcb_errhandle_called = false;
#endif
rval->dcb_role = role;
#if 1
simple_mutex_init(&rval->dcb_write_lock, "DCB write mutex");
simple_mutex_init(&rval->dcb_read_lock, "DCB read mutex");
rval->dcb_write_active = false;
rval->dcb_read_active = false;
#endif
spinlock_init(&rval->dcb_initlock);
spinlock_init(&rval->writeqlock);
spinlock_init(&rval->delayqlock);
@ -141,6 +135,13 @@ DCB *rval;
rval->polloutbusy = 0;
rval->writecheck = 0;
rval->fd = -1;
rval->evq.next = NULL;
rval->evq.prev = NULL;
rval->evq.pending_events = 0;
rval->evq.processing = 0;
spinlock_init(&rval->evq.eventqlock);
memset(&rval->stats, 0, sizeof(DCBSTATS)); // Zero the statistics
rval->state = DCB_STATE_ALLOC;
bitmask_init(&rval->memdata.bitmask);
@ -218,43 +219,11 @@ dcb_add_to_zombieslist(DCB *dcb)
spinlock_release(&zombiespin);
return;
}
#if 1
/*<
* Add closing dcb to the top of the list.
*/
dcb->memdata.next = zombies;
zombies = dcb;
#else
if (zombies == NULL) {
zombies = dcb;
} else {
DCB *ptr = zombies;
while (ptr->memdata.next)
{
ss_info_dassert(
ptr->memdata.next->state == DCB_STATE_ZOMBIE,
"Next zombie is not in DCB_STATE_ZOMBIE state");
ss_info_dassert(
ptr != dcb,
"Attempt to add DCB to zombies list although it "
"is already there.");
if (ptr == dcb)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Attempt to add DCB to zombies "
"list when it is already in the list")));
break;
}
ptr = ptr->memdata.next;
}
if (ptr != dcb) {
ptr->memdata.next = dcb;
}
}
#endif
/*<
* Set state which indicates that it has been added to zombies
* list.
@ -389,8 +358,6 @@ DCB_CALLBACK *cb;
spinlock_release(&dcb->cb_lock);
bitmask_free(&dcb->memdata.bitmask);
simple_mutex_done(&dcb->dcb_read_lock);
simple_mutex_done(&dcb->dcb_write_lock);
free(dcb);
}
@ -404,15 +371,10 @@ DCB_CALLBACK *cb;
* the memdata.bitmask then the DCB is no longer able to be
* referenced and it can be finally removed.
*
* The excluded DCB allows a thread to exclude a DCB from zombie processing.
* It is used when a thread calls dcb_process_zombies when there is
* a DCB that the caller knows it will continue processing with.
*
* @param threadid The thread ID of the caller
* @param excluded The DCB the thread currently uses, NULL or valid DCB.
*/
DCB *
dcb_process_zombies(int threadid, DCB *excluded)
dcb_process_zombies(int threadid)
{
DCB *ptr, *lptr;
DCB* dcb_list = NULL;
@ -446,9 +408,10 @@ bool succp = false;
CHK_DCB(ptr);
/*
* Skip processing of the excluded DCB
* Skip processing of DCB's that are
* in the event queue waiting to be processed.
*/
if (ptr == excluded)
if (ptr->evq.next || ptr->evq.prev)
{
lptr = ptr;
ptr = ptr->memdata.next;
@ -522,7 +485,6 @@ bool succp = false;
/*<
* Close file descriptor and move to clean-up phase.
*/
ss_dassert(excluded != dcb);
rc = close(dcb->fd);
if (rc < 0) {
@ -1255,7 +1217,7 @@ printDCB(DCB *dcb)
static void
spin_reporter(void *dcb, char *desc, int value)
{
dcb_printf((DCB *)dcb, "\t\t%-35s %d\n", desc, value);
dcb_printf((DCB *)dcb, "\t\t%-40s %d\n", desc, value);
}
@ -1316,10 +1278,6 @@ DCB *dcb;
dcb_printf(pdcb, "\t\tNo. of Writes: %d\n", dcb->stats.n_writes);
dcb_printf(pdcb, "\t\tNo. of Buffered Writes: %d\n", dcb->stats.n_buffered);
dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts);
dcb_printf(pdcb, "\t\tNo. of busy polls: %d\n", dcb->stats.n_busypolls);
dcb_printf(pdcb, "\t\tNo. of read rechecks: %d\n", dcb->stats.n_readrechecks);
dcb_printf(pdcb, "\t\tNo. of busy write polls: %d\n", dcb->stats.n_busywrpolls);
dcb_printf(pdcb, "\t\tNo. of write rechecks: %d\n", dcb->stats.n_writerechecks);
dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water);
dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water);
if (dcb->flags & DCBF_CLONE)
@ -1427,10 +1385,6 @@ dprintDCB(DCB *pdcb, DCB *dcb)
dcb->stats.n_buffered);
dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n",
dcb->stats.n_accepts);
dcb_printf(pdcb, "\t\tNo. of busy polls: %d\n", dcb->stats.n_busypolls);
dcb_printf(pdcb, "\t\tNo. of read rechecks: %d\n", dcb->stats.n_readrechecks);
dcb_printf(pdcb, "\t\tNo. of busy write polls: %d\n", dcb->stats.n_busywrpolls);
dcb_printf(pdcb, "\t\tNo. of write rechecks: %d\n", dcb->stats.n_writerechecks);
dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n",
dcb->stats.n_high_water);
dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n",
@ -1955,99 +1909,6 @@ int rval = 0;
return rval;
}
/**
* Called by the EPOLLIN event. Take care of calling the protocol
* read entry point and managing multiple threads competing for the DCB
* without blocking those threads.
*
* This mechanism does away with the need for a mutex on the EPOLLIN event
* and instead implements a queuing mechanism in which nested events are
* queued on the DCB such that when the thread processing the first event
* returns it will read the queued event and process it. This allows the
* thread that would otherwise have to wait to process the nested event
to return immediately and process other events.
*
* @param dcb The DCB that has data available
* @param thread_id The ID of the calling thread
* @param nozombies If non-zero then do not do zombie processing
*/
void
dcb_pollin(DCB *dcb, int thread_id, int nozombies)
{
spinlock_acquire(&dcb->pollinlock);
if (dcb->pollinbusy == 0)
{
dcb->pollinbusy = 1;
do {
if (dcb->readcheck)
{
dcb->stats.n_readrechecks++;
if (!nozombies)
dcb_process_zombies(thread_id, dcb);
}
dcb->readcheck = 0;
spinlock_release(&dcb->pollinlock);
dcb->func.read(dcb);
spinlock_acquire(&dcb->pollinlock);
} while (dcb->readcheck);
dcb->pollinbusy = 0;
}
else
{
dcb->stats.n_busypolls++;
dcb->readcheck = 1;
}
spinlock_release(&dcb->pollinlock);
}
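dcb_pollin and dcb_pollout above, both removed by this commit in favour of the central event queue, implement the same busy/recheck pattern. A minimal, self-contained sketch of that pattern follows; the names (event_gate, handle_event) are illustrative and not part of the MaxScale API, and the spinlock must first be initialised with pthread_spin_init().
#include <pthread.h>
typedef struct {
	pthread_spinlock_t lock;	/* protects busy and recheck */
	int busy;			/* a thread is running the handler */
	int recheck;			/* an event arrived while it ran */
} event_gate;
static void handle_event(event_gate *g, void (*handler)(void *), void *arg)
{
	pthread_spin_lock(&g->lock);
	if (g->busy == 0)
	{
		g->busy = 1;
		do {
			g->recheck = 0;
			pthread_spin_unlock(&g->lock);
			handler(arg);		/* run outside the lock */
			pthread_spin_lock(&g->lock);
		} while (g->recheck);		/* a nested event arrived */
		g->busy = 0;
	}
	else
	{
		g->recheck = 1;			/* the active thread reruns */
	}
	pthread_spin_unlock(&g->lock);
}
The thread that finds the gate busy records the nested event and returns immediately; the thread already inside the gate reruns the handler until no further events have arrived.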
/**
* Called by the EPOLLOUT event. Take care of calling the protocol
* write_ready entry point and managing multiple threads competing for the DCB
* without blocking those threads.
*
* This mechanism does away with the need for a mutex on the EPOLLOUT event
* and instead implements a queuing mechanism in which nested events are
* queued on the DCB such that when the thread processing the first event
* returns it will read the queued event and process it. This allows the
* thread that would otherwise have to wait to process the nested event
to return immediately and process other events.
*
* @param dcb The DCB thats available for writes
* @param thread_id The ID of the calling thread
* @param nozombies If non-zero then do not do zombie processing
*/
void
dcb_pollout(DCB *dcb, int thread_id, int nozombies)
{
spinlock_acquire(&dcb->polloutlock);
if (dcb->polloutbusy == 0)
{
dcb->polloutbusy = 1;
do {
if (dcb->writecheck)
{
if (!nozombies)
dcb_process_zombies(thread_id, dcb);
dcb->stats.n_writerechecks++;
}
dcb->writecheck = 0;
spinlock_release(&dcb->polloutlock);
dcb->func.write_ready(dcb);
spinlock_acquire(&dcb->polloutlock);
} while (dcb->writecheck);
dcb->polloutbusy = 0;
}
else
{
dcb->stats.n_busywrpolls++;
dcb->writecheck = 1;
}
spinlock_release(&dcb->polloutlock);
}
/**
* Get the next DCB in the list of all DCB's

View File

@ -30,6 +30,13 @@
#include <gw.h>
#include <config.h>
#include <housekeeper.h>
#include <mysql.h>
#define PROFILE_POLL 1
#if PROFILE_POLL
#include <rdtsc.h>
#endif
extern int lm_enabled_logfiles_bitmask;
@ -47,6 +54,11 @@ extern int lm_enabled_logfiles_bitmask;
* etc.
* 23/09/14 Mark Riddoch Make use of RDHUP conditional to allow CentOS 5
* builds.
* 24/09/14 Mark Riddoch Introduction of the event queue for processing the
* incoming events rather than processing them immediately
* in the loop after the epoll_wait. This allows for better
* thread utilisation and fairer scheduling of the event
* processing.
*
* @endverbatim
*/
@ -65,6 +77,11 @@ static GWBITMASK poll_mask;
static simple_mutex_t epoll_wait_mutex; /*< serializes calls to epoll_wait */
#endif
static int n_waiting = 0; /*< No. of threads in epoll_wait */
static int process_pollq(int thread_id);
DCB *eventq = NULL;
SPINLOCK pollqlock = SPINLOCK_INIT;
/**
* Thread load average, this is the average number of descriptors in each
@ -125,6 +142,8 @@ static struct {
int n_nothreads; /*< Number of times no threads are polling */
int n_fds[MAXNFDS]; /*< Number of wakeups with particular
n_fds value */
int evq_length; /*< Event queue length */
int evq_max; /*< Maximum event queue length */
} pollStats;
/**
@ -338,6 +357,17 @@ return_rc:
* deschedule a process if a timeout is included, but will not do this if a 0 timeout
* value is given. This improves performance when the gateway is under heavy load.
*
* In order to provide a fairer means of sharing the threads between the different
* DCB's, the poll mechanism has been decoupled from the processing of the events.
* The events are now received via the epoll_wait call; a queue of DCB's that have
* events pending is maintained and as new events arrive the DCB is added to the end
* of this queue. If an event arrives for a DCB already in the queue, then the event
* bits are added to the DCB but the DCB maintains the same point in the queue unless
* the original events are already being processed. If they are being processed then
* the DCB is moved to the back of the queue; this means that a DCB that is receiving
* events at a high rate will not block the execution of events for other DCB's and
* should result in a fairer polling strategy.
*
* @param arg The thread ID passed as a void * to satisfy the threading package
*/
void
@ -346,8 +376,6 @@ poll_waitevents(void *arg)
struct epoll_event events[MAX_EVENTS];
int i, nfds;
int thread_id = (int)arg;
bool no_op = false;
static bool process_zombies_only = false; /*< flag for all threads */
DCB *zombies = NULL;
/** Add this thread to the bitmask of running polling threads */
@ -362,20 +390,19 @@ DCB *zombies = NULL;
while (1)
{
/* Process the queue of waiting requests */
while (process_pollq(thread_id))
{
if (thread_data)
thread_data[thread_id].state = THREAD_ZPROCESSING;
zombies = dcb_process_zombies(thread_id);
}
atomic_add(&n_waiting, 1);
#if BLOCKINGPOLL
nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
atomic_add(&n_waiting, -1);
#else /* BLOCKINGPOLL */
if (!no_op) {
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] MaxScale thread "
"%d > epoll_wait <",
pthread_self(),
thread_id)));
no_op = TRUE;
}
#if MUTEX_EPOLL
simple_mutex_lock(&epoll_wait_mutex, TRUE);
#endif
@ -396,32 +423,19 @@ DCB *zombies = NULL;
pthread_self(),
nfds,
eno)));
no_op = FALSE;
}
else if (nfds == 0)
/*
* If there are no new descriptors from the non-blocking call
* and nothing to process on the event queue then do a
* blocking call to epoll_wait.
*/
else if (nfds == 0 && process_pollq(thread_id) == 0)
{
atomic_add(&n_waiting, -1);
if (process_zombies_only) {
#if MUTEX_EPOLL
simple_mutex_unlock(&epoll_wait_mutex);
#endif
goto process_zombies;
} else {
nfds = epoll_wait(epoll_fd,
atomic_add(&n_waiting, 1);
nfds = epoll_wait(epoll_fd,
events,
MAX_EVENTS,
EPOLL_TIMEOUT);
/*<
* When there are zombies to be cleaned up but
* no client requests, allow all threads to call
* dcb_process_zombies without having to wait
* for the timeout.
*/
if (nfds == 0 && dcb_get_zombies() != NULL)
{
process_zombies_only = true;
}
}
}
else
{
@ -457,228 +471,64 @@ DCB *zombies = NULL;
atomic_add(&load_samples, 1);
atomic_add(&load_nfds, nfds);
/*
* Process every DCB that has a new event and add
* it to the poll queue.
* If the DCB is currently being processed then we
* OR in the new event bits to the pending event bits
* and leave it in the queue.
* If the DCB was not already in the queue then it was
* idle and is added to the queue to process after
* setting the event bits.
*/
for (i = 0; i < nfds; i++)
{
DCB *dcb = (DCB *)events[i].data.ptr;
__uint32_t ev = events[i].events;
CHK_DCB(dcb);
if (thread_data)
spinlock_acquire(&pollqlock);
if (DCB_POLL_BUSY(dcb))
{
thread_data[thread_id].cur_dcb = dcb;
thread_data[thread_id].event = ev;
dcb->evq.pending_events |= ev;
}
#if defined(SS_DEBUG)
if (dcb_fake_write_ev[dcb->fd] != 0) {
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"Added fake events %d to ev %d.",
pthread_self(),
dcb_fake_write_ev[dcb->fd],
ev)));
ev |= dcb_fake_write_ev[dcb->fd];
dcb_fake_write_ev[dcb->fd] = 0;
}
#endif
ss_debug(spinlock_acquire(&dcb->dcb_initlock);)
ss_dassert(dcb->state != DCB_STATE_ALLOC);
ss_dassert(dcb->state != DCB_STATE_DISCONNECTED);
ss_dassert(dcb->state != DCB_STATE_FREED);
ss_debug(spinlock_release(&dcb->dcb_initlock);)
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] event %d dcb %p "
"role %s",
pthread_self(),
ev,
dcb,
STRDCBROLE(dcb->dcb_role))));
if (ev & EPOLLOUT)
else
{
int eno = 0;
eno = gw_getsockerrno(dcb->fd);
if (eno == 0) {
#if MUTEX_BLOCK
simple_mutex_lock(
&dcb->dcb_write_lock,
true);
ss_info_dassert(
!dcb->dcb_write_active,
"Write already active");
dcb->dcb_write_active = TRUE;
atomic_add(
&pollStats.n_write,
1);
dcb->func.write_ready(dcb);
dcb->dcb_write_active = FALSE;
simple_mutex_unlock(
&dcb->dcb_write_lock);
#else
atomic_add(&pollStats.n_write,
1);
dcb_pollout(dcb, thread_id, nfds);
#endif
} else {
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"EPOLLOUT due %d, %s. "
"dcb %p, fd %i",
pthread_self(),
eno,
strerror(eno),
dcb,
dcb->fd)));
}
}
if (ev & EPOLLIN)
{
#if MUTEX_BLOCK
simple_mutex_lock(&dcb->dcb_read_lock,
true);
ss_info_dassert(!dcb->dcb_read_active,
"Read already active");
dcb->dcb_read_active = TRUE;
#endif
if (dcb->state == DCB_STATE_LISTENING)
dcb->evq.pending_events = ev;
if (eventq)
{
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"Accept in fd %d",
pthread_self(),
dcb->fd)));
atomic_add(
&pollStats.n_accept, 1);
dcb->func.accept(dcb);
}
else
{
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"Read in dcb %p fd %d",
pthread_self(),
dcb,
dcb->fd)));
atomic_add(&pollStats.n_read, 1);
#if MUTEX_BLOCK
dcb->func.read(dcb);
#else
dcb_pollin(dcb, thread_id, nfds);
#endif
}
#if MUTEX_BLOCK
dcb->dcb_read_active = FALSE;
simple_mutex_unlock(
&dcb->dcb_read_lock);
#endif
}
if (ev & EPOLLERR)
{
int eno = gw_getsockerrno(dcb->fd);
#if defined(SS_DEBUG)
if (eno == 0) {
eno = dcb_fake_write_errno[dcb->fd];
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"Added fake errno %d. "
"%s",
pthread_self(),
eno,
strerror(eno))));
}
dcb_fake_write_errno[dcb->fd] = 0;
#endif
if (eno != 0) {
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"EPOLLERR due %d, %s.",
pthread_self(),
eno,
strerror(eno))));
}
atomic_add(&pollStats.n_error, 1);
dcb->func.error(dcb);
}
if (ev & EPOLLHUP)
{
int eno = 0;
eno = gw_getsockerrno(dcb->fd);
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"EPOLLHUP on dcb %p, fd %d. "
"Errno %d, %s.",
pthread_self(),
dcb,
dcb->fd,
eno,
strerror(eno))));
atomic_add(&pollStats.n_hup, 1);
spinlock_acquire(&dcb->dcb_initlock);
if ((dcb->flags & DCBF_HUNG) == 0)
{
dcb->flags |= DCBF_HUNG;
spinlock_release(&dcb->dcb_initlock);
dcb->func.hangup(dcb);
dcb->evq.prev = eventq->evq.prev;
eventq->evq.prev->evq.next = dcb;
eventq->evq.prev = dcb;
dcb->evq.next = eventq;
}
else
spinlock_release(&dcb->dcb_initlock);
}
#ifdef EPOLLRDHUP
if (ev & EPOLLRDHUP)
{
int eno = 0;
eno = gw_getsockerrno(dcb->fd);
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"EPOLLRDHUP on dcb %p, fd %d. "
"Errno %d, %s.",
pthread_self(),
dcb,
dcb->fd,
eno,
strerror(eno))));
atomic_add(&pollStats.n_hup, 1);
spinlock_acquire(&dcb->dcb_initlock);
if ((dcb->flags & DCBF_HUNG) == 0)
{
dcb->flags |= DCBF_HUNG;
spinlock_release(&dcb->dcb_initlock);
dcb->func.hangup(dcb);
eventq = dcb;
dcb->evq.prev = dcb;
dcb->evq.next = dcb;
}
pollStats.evq_length++;
if (pollStats.evq_length > pollStats.evq_max)
{
pollStats.evq_max = pollStats.evq_length;
}
else
spinlock_release(&dcb->dcb_initlock);
}
#endif
} /*< for */
no_op = FALSE;
} /*< if (nfds > 0) */
process_zombies:
if (thread_data)
{
thread_data[thread_id].state = THREAD_ZPROCESSING;
spinlock_release(&pollqlock);
}
}
zombies = dcb_process_zombies(thread_id, NULL);
if (zombies == NULL) {
process_zombies_only = false;
}
/*
* If there was nothing to process then process the zombie queue
*/
if (process_pollq(thread_id) == 0)
{
if (thread_data)
{
thread_data[thread_id].state = THREAD_ZPROCESSING;
}
zombies = dcb_process_zombies(thread_id);
}
if (do_shutdown)
{
/*<
@ -701,6 +551,320 @@ process_zombies:
} /*< while(1) */
}
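The tail insertion that the loop above performs on the circular, doubly linked event queue is worth isolating. A minimal sketch with generic node/queue names (illustrative, not MaxScale types):
typedef struct node {
	struct node *next;
	struct node *prev;
} node;
static node *queue = NULL;	/* head of the circular queue */
static void enqueue_tail(node *n)
{
	if (queue)
	{
		/* Splice in just before the head, i.e. at the tail */
		n->prev = queue->prev;
		queue->prev->next = n;
		queue->prev = n;
		n->next = queue;
	}
	else
	{
		/* Empty queue: the node links to itself */
		queue = n;
		n->next = n;
		n->prev = n;
	}
}
Because the list is circular, both cases leave every queued node with non-NULL links, which is exactly the property the DCB_POLL_BUSY test relies on.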
/**
* Process the queue of DCB's that have outstanding events
*
* The first event on the queue will be chosen to be executed by this thread;
* all other events will be left on the queue and may be picked up by other
* threads. When the processing is complete the thread will take the DCB off the
* queue if there are no pending events that have arrived since the thread started
* to process the DCB. If there are pending events the DCB will be moved to the
* back of the queue so that other DCB's will have a share of the threads to
* execute events for them.
*
* @param thread_id The thread ID of the calling thread
* @return 0 if no DCB's have been processed
*/
static int
process_pollq(int thread_id)
{
DCB *dcb;
int found = 0;
uint32_t ev;
spinlock_acquire(&pollqlock);
if (eventq == NULL)
{
/* Nothing to process */
spinlock_release(&pollqlock);
return 0;
}
dcb = eventq;
if (dcb->evq.next == dcb->evq.prev && dcb->evq.processing == 0)
{
found = 1;
dcb->evq.processing = 1;
}
else if (dcb->evq.next == dcb->evq.prev)
{
/* Only item in queue is being processed */
spinlock_release(&pollqlock);
return 0;
}
else
{
do {
dcb = dcb->evq.next;
} while (dcb != eventq && dcb->evq.processing == 1);
if (dcb->evq.processing == 0)
{
/* Found DCB to process */
dcb->evq.processing = 1;
found = 1;
}
}
if (found)
{
ev = dcb->evq.pending_events;
dcb->evq.pending_events = 0;
}
spinlock_release(&pollqlock);
if (found == 0)
return 0;
CHK_DCB(dcb);
if (thread_data)
{
thread_data[thread_id].state = THREAD_PROCESSING;
thread_data[thread_id].cur_dcb = dcb;
thread_data[thread_id].event = ev;
}
#if defined(SS_DEBUG)
if (dcb_fake_write_ev[dcb->fd] != 0) {
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"Added fake events %d to ev %d.",
pthread_self(),
dcb_fake_write_ev[dcb->fd],
ev)));
ev |= dcb_fake_write_ev[dcb->fd];
dcb_fake_write_ev[dcb->fd] = 0;
}
#endif
ss_debug(spinlock_acquire(&dcb->dcb_initlock);)
ss_dassert(dcb->state != DCB_STATE_ALLOC);
ss_dassert(dcb->state != DCB_STATE_DISCONNECTED);
ss_dassert(dcb->state != DCB_STATE_FREED);
ss_debug(spinlock_release(&dcb->dcb_initlock);)
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] event %d dcb %p "
"role %s",
pthread_self(),
ev,
dcb,
STRDCBROLE(dcb->dcb_role))));
if (ev & EPOLLOUT)
{
int eno = 0;
eno = gw_getsockerrno(dcb->fd);
if (eno == 0) {
#if MUTEX_BLOCK
simple_mutex_lock(
&dcb->dcb_write_lock,
true);
ss_info_dassert(
!dcb->dcb_write_active,
"Write already active");
dcb->dcb_write_active = TRUE;
atomic_add(
&pollStats.n_write,
1);
dcb->func.write_ready(dcb);
dcb->dcb_write_active = FALSE;
simple_mutex_unlock(
&dcb->dcb_write_lock);
#else
atomic_add(&pollStats.n_write,
1);
dcb->func.write_ready(dcb);
#endif
} else {
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"EPOLLOUT due %d, %s. "
"dcb %p, fd %i",
pthread_self(),
eno,
strerror(eno),
dcb,
dcb->fd)));
}
}
if (ev & EPOLLIN)
{
#if MUTEX_BLOCK
simple_mutex_lock(&dcb->dcb_read_lock,
true);
ss_info_dassert(!dcb->dcb_read_active,
"Read already active");
dcb->dcb_read_active = TRUE;
#endif
if (dcb->state == DCB_STATE_LISTENING)
{
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"Accept in fd %d",
pthread_self(),
dcb->fd)));
atomic_add(
&pollStats.n_accept, 1);
dcb->func.accept(dcb);
}
else
{
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"Read in dcb %p fd %d",
pthread_self(),
dcb,
dcb->fd)));
atomic_add(&pollStats.n_read, 1);
dcb->func.read(dcb);
}
#if MUTEX_BLOCK
dcb->dcb_read_active = FALSE;
simple_mutex_unlock(
&dcb->dcb_read_lock);
#endif
}
if (ev & EPOLLERR)
{
int eno = gw_getsockerrno(dcb->fd);
#if defined(SS_DEBUG)
if (eno == 0) {
eno = dcb_fake_write_errno[dcb->fd];
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"Added fake errno %d. "
"%s",
pthread_self(),
eno,
strerror(eno))));
}
dcb_fake_write_errno[dcb->fd] = 0;
#endif
if (eno != 0) {
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"EPOLLERR due %d, %s.",
pthread_self(),
eno,
strerror(eno))));
}
atomic_add(&pollStats.n_error, 1);
dcb->func.error(dcb);
}
if (ev & EPOLLHUP)
{
int eno = 0;
eno = gw_getsockerrno(dcb->fd);
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"EPOLLHUP on dcb %p, fd %d. "
"Errno %d, %s.",
pthread_self(),
dcb,
dcb->fd,
eno,
strerror(eno))));
atomic_add(&pollStats.n_hup, 1);
spinlock_acquire(&dcb->dcb_initlock);
if ((dcb->flags & DCBF_HUNG) == 0)
{
dcb->flags |= DCBF_HUNG;
spinlock_release(&dcb->dcb_initlock);
dcb->func.hangup(dcb);
}
else
spinlock_release(&dcb->dcb_initlock);
}
#ifdef EPOLLRDHUP
if (ev & EPOLLRDHUP)
{
int eno = 0;
eno = gw_getsockerrno(dcb->fd);
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"EPOLLRDHUP on dcb %p, fd %d. "
"Errno %d, %s.",
pthread_self(),
dcb,
dcb->fd,
eno,
strerror(eno))));
atomic_add(&pollStats.n_hup, 1);
spinlock_acquire(&dcb->dcb_initlock);
if ((dcb->flags & DCBF_HUNG) == 0)
{
dcb->flags |= DCBF_HUNG;
spinlock_release(&dcb->dcb_initlock);
dcb->func.hangup(dcb);
}
else
spinlock_release(&dcb->dcb_initlock);
}
#endif
spinlock_acquire(&pollqlock);
if (dcb->evq.pending_events == 0)
{
/* No pending events so remove from the queue */
if (dcb->evq.prev != dcb)
{
dcb->evq.prev->evq.next = dcb->evq.next;
dcb->evq.next->evq.prev = dcb->evq.prev;
if (eventq == dcb)
eventq = dcb->evq.next;
}
else
{
eventq = NULL;
}
dcb->evq.next = NULL;
dcb->evq.prev = NULL;
pollStats.evq_length--;
}
else
{
/*
* We have a pending event; move to the end of the queue
* if there are any other DCB's in the queue.
*
* If we are the first item on the queue this is easy, we
* just bump the eventq pointer.
*/
if (dcb->evq.prev != dcb)
{
if (eventq == dcb)
eventq = dcb->evq.next;
else
{
dcb->evq.prev->evq.next = dcb->evq.next;
dcb->evq.next->evq.prev = dcb->evq.prev;
dcb->evq.prev = eventq->evq.prev;
dcb->evq.next = eventq;
eventq->evq.prev = dcb;
dcb->evq.prev->evq.next = dcb;
}
}
}
dcb->evq.processing = 0;
spinlock_release(&pollqlock);
return 1;
}
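Since process_pollq() handles at most one DCB per call and returns 0 only when nothing was processed, a polling thread can drain the queue with a loop like the one at the top of poll_waitevents above:
	while (process_pollq(thread_id))
		;
Processing a single DCB per call keeps lock hold times short and lets the other polling threads pick up the remaining queued DCB's concurrently.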
/**
* Shutdown the polling loop
*/
@ -745,6 +909,10 @@ int i;
pollStats.n_accept);
dcb_printf(dcb, "Number of times no threads polling: %d\n",
pollStats.n_nothreads);
dcb_printf(dcb, "Current event queue length: %d\n",
pollStats.evq_length);
dcb_printf(dcb, "Maximum event queue length: %d\n",
pollStats.evq_max);
dcb_printf(dcb, "No of poll completions with descriptors\n");
dcb_printf(dcb, "\tNo. of descriptors\tNo. of poll completions.\n");

View File

@ -53,7 +53,8 @@ struct service;
* 07/02/2014 Massimiliano Pinto Added ipv4 data struct into for dcb
* 07/05/2014 Mark Riddoch Addition of callback mechanism
* 08/05/2014 Mark Riddoch Addition of writeq high and low watermarks
* 27/08/2014 Mark Ridddoch Addition of write event queuing
* 27/08/2014 Mark Riddoch Addition of write event queuing
* 23/09/2014 Mark Riddoch New poll processing queue
*
* @endverbatim
*/
@ -97,6 +98,14 @@ typedef struct gw_protocol {
int (*session)(struct dcb *, void *);
} GWPROTOCOL;
typedef struct {
struct dcb *next;
struct dcb *prev;
uint32_t pending_events;
int processing;
SPINLOCK eventqlock;
} DCBEVENTQ;
/**
* The GWPROTOCOL version data. The following should be updated whenever
* the GWPROTOCOL structure is changed. See the rules defined in modinfo.h
@ -114,10 +123,6 @@ typedef struct dcbstats {
int n_buffered; /*< Number of buffered writes */
int n_high_water; /*< Number of crosses of high water mark */
int n_low_water; /*< Number of crosses of low water mark */
int n_busypolls; /*< Number of read polls while reading */
int n_readrechecks; /*< Number of rechecks for reads */
int n_busywrpolls; /*< Number of write polls while writing */
int n_writerechecks;/*< Number of rechecks for writes */
} DCBSTATS;
/**
@ -204,12 +209,7 @@ typedef struct dcb {
#endif
dcb_role_t dcb_role;
SPINLOCK dcb_initlock;
#if 1
simple_mutex_t dcb_read_lock;
simple_mutex_t dcb_write_lock;
bool dcb_read_active;
bool dcb_write_active;
#endif
DCBEVENTQ evq; /**< The event queue for this DCB */
int fd; /**< The descriptor */
dcb_state_t state; /**< Current descriptor state */
int flags; /**< DCB flags */
@ -271,8 +271,8 @@ int fail_accept_errno;
#define DCB_BELOW_LOW_WATER(x) ((x)->low_water && (x)->writeqlen < (x)->low_water)
#define DCB_ABOVE_HIGH_WATER(x) ((x)->high_water && (x)->writeqlen > (x)->high_water)
void dcb_pollin(DCB *, int, int);
void dcb_pollout(DCB *, int, int);
#define DCB_POLL_BUSY(x) ((x)->evq.next != NULL)
DCB *dcb_get_zombies(void);
int gw_write(
#if defined(SS_DEBUG)
@ -289,7 +289,7 @@ DCB *dcb_clone(DCB *);
int dcb_read(DCB *, GWBUF **);
int dcb_drain_writeq(DCB *);
void dcb_close(DCB *);
DCB *dcb_process_zombies(int, DCB*); /* Process Zombies except the one behind the pointer */
DCB *dcb_process_zombies(int); /* Process the list of zombie DCB's */
void printAllDCBs(); /* Debug to print all DCB in the system */
void printDCB(DCB *); /* Debug print routine */
void dprintAllDCBs(DCB *); /* Debug to print all DCB in the system */

View File

@ -28,10 +28,10 @@
* Configuration for send and receive socket buffer sizes for
* backend and client connections.
*/
#define GW_BACKEND_SO_SNDBUF 32768
#define GW_BACKEND_SO_RCVBUF 32768
#define GW_CLIENT_SO_SNDBUF 32768
#define GW_CLIENT_SO_RCVBUF 32768
#define GW_BACKEND_SO_SNDBUF (128 * 1024)
#define GW_BACKEND_SO_RCVBUF (128 * 1024)
#define GW_CLIENT_SO_SNDBUF (128 * 1024)
#define GW_CLIENT_SO_RCVBUF (128 * 1024)
#define GW_NOINTR_CALL(A) do { errno = 0; A; } while (errno == EINTR)
#define GW_MYSQL_LOOP_TIMEOUT 300000000
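A sketch of how these buffer sizes are applied to a socket; the MySQL protocol modules later in this commit do the equivalent for client and backend sockets. Note that the kernel may adjust the requested size against its configured limits. The helper name is illustrative, not part of the gateway API.
#include <sys/socket.h>
static void set_client_buffers(int fd)
{
	int sz = GW_CLIENT_SO_SNDBUF;
	setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
	sz = GW_CLIENT_SO_RCVBUF;
	setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
}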

server/include/rdtsc.h Normal file
View File

@ -0,0 +1,61 @@
#ifndef _RDTSC_H
#define _RDTSC_H
/*
* This file is distributed as part of the SkySQL Gateway. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright SkySQL Ab 2014
*/
/**
* @file rdtsc.h Access the process time-stamp counter
*
* This is an Intel-only facility that is used to access an accurate time
* value, the granularity of which is related to the processor clock speed.
* The overhead of access is much lower than that of any system call
* mechanism.
*
*
* @verbatim
* Revision History
*
* Date Who Description
* 19/09/2014 Mark Riddoch Initial implementation
*
* @endverbatim
*/
typedef unsigned long long CYCLES;
/**
* Get the current time-stamp counter value from the processor. This is the
* count of CPU cycles as a 64 bit value.
*
* The value returned is related to the clock speed; to obtain a value in
* seconds, divide the returned value by the clock frequency of the processor.
*
* Note, on multi-processor systems care must be taken to avoid the thread
* moving to a different processor when taking successive values of RDTSC to
* obtain accurate timing. This may be done by setting processor affinity for
* the thread. See sched_setaffinity/sched_getaffinity.
*
* @return CPU cycle count
*/
static __inline__ CYCLES rdtsc(void)
{
unsigned int lo, hi;
/* RDTSC returns the counter in EDX:EAX; read the two halves separately
 * so the result is also correct on x86-64, where the "=A" constraint
 * does not span the EDX:EAX register pair. */
__asm__ volatile ("rdtsc" : "=a" (lo), "=d" (hi));
return ((CYCLES)hi << 32) | lo;
}
#endif
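A usage sketch, pinning the thread to one CPU as the note above recommends. The 3 GHz clock frequency is an assumed example value and must be replaced with the actual frequency of the host processor.
#define _GNU_SOURCE
#include <sched.h>
#include <stdio.h>
#include <rdtsc.h>
void time_section(void)
{
	cpu_set_t set;
	CPU_ZERO(&set);
	CPU_SET(0, &set);			/* stay on CPU 0 */
	sched_setaffinity(0, sizeof(set), &set);
	CYCLES start = rdtsc();
	/* ... code under test ... */
	CYCLES cycles = rdtsc() - start;
	printf("%f seconds\n", (double)cycles / 3.0e9);	/* assumed 3 GHz clock */
}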

View File

@ -194,6 +194,7 @@ typedef struct router_instance {
char *uuid; /*< UUID for the router to use w/master */
int masterid; /*< Server ID of the master */
int serverid; /*< Server ID to use with master */
int initbinlog; /*< Initial binlog file number */
char *user; /*< User name to use with master */
char *password; /*< Password to use with master */
char *fileroot; /*< Root of binlog filename */

View File

@ -1180,9 +1180,9 @@ int gw_MySQLAccept(DCB *listener)
conn_open[c_sock] = true;
#endif
/* set nonblocking */
sendbuf = GW_BACKEND_SO_SNDBUF;
sendbuf = GW_CLIENT_SO_SNDBUF;
setsockopt(c_sock, SOL_SOCKET, SO_SNDBUF, &sendbuf, optlen);
sendbuf = GW_BACKEND_SO_RCVBUF;
sendbuf = GW_CLIENT_SO_RCVBUF;
setsockopt(c_sock, SOL_SOCKET, SO_RCVBUF, &sendbuf, optlen);
setnonblocking(c_sock);

View File

@ -766,9 +766,9 @@ int gw_do_connect_to_backend(
/* prepare for connect */
setipaddress(&serv_addr.sin_addr, host);
serv_addr.sin_port = htons(port);
bufsize = GW_CLIENT_SO_SNDBUF;
bufsize = GW_BACKEND_SO_SNDBUF;
setsockopt(so, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize));
bufsize = GW_CLIENT_SO_RCVBUF;
bufsize = GW_BACKEND_SO_RCVBUF;
setsockopt(so, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize));
/* set socket to as non-blocking here */
setnonblocking(so);

View File

@ -47,7 +47,8 @@ CLIOBJ=$(CLISRCS:.c=.o)
SRCS=$(TESTSRCS) $(READCONSRCS) $(DEBUGCLISRCS) cli.c
OBJ=$(SRCS:.c=.o)
LIBS=$(UTILSPATH)/skygw_utils.o -lssl -llog_manager
MODULES= libdebugcli.so libreadconnroute.so libtestroute.so libcli.so
MODULES= libdebugcli.so libreadconnroute.so libtestroute.so libcli.so \
libbinlogrouter.so
all: $(MODULES)
@ -66,27 +67,33 @@ libcli.so: $(CLIOBJ)
$(CC) $(LDFLAGS) $(CLIOBJ) $(LIBS) -o $@
libreadwritesplit.so:
# (cd readwritesplit; touch depend.mk ; make; cp $@ ..)
(cd readwritesplit; touch depend.mk ; make; cp $@ ..)
libbinlogrouter.so:
(cd binlog; touch depend.mk ; make; cp $@ ..)
.c.o:
$(CC) $(CFLAGS) $< -o $@
clean:
$(DEL) $(OBJ) $(MODULES)
(cd readwritesplit; touch depend.mk; make clean)
(cd binlog; touch depend.mk; make clean)
tags:
ctags $(SRCS) $(HDRS)
(cd readwritesplit; make tags)
(cd binlog; make tags)
depend:
@$(DEL) depend.mk
cc -M $(CFLAGS) $(SRCS) > depend.mk
(cd readwritesplit; touch depend.mk ; make depend)
(cd binlog; touch depend.mk ; make depend)
install: $(MODULES)
install -D $(MODULES) $(DEST)/modules
(cd readwritesplit; make DEST=$(DEST) install)
(cd binlog; make DEST=$(DEST) install)
cleantests:
$(MAKE) -C test cleantests

View File

@ -172,6 +172,7 @@ int i;
inst->low_water = DEF_LOW_WATER;
inst->high_water = DEF_HIGH_WATER;
inst->initbinlog = 0;
/*
* We only support one server behind this router, since the server is
@ -244,6 +245,10 @@ int i;
{
inst->fileroot = strdup(value);
}
else if (strcmp(options[i], "initialfile") == 0)
{
inst->initbinlog = atoi(value);
}
else if (strcmp(options[i], "lowwater") == 0)
{
inst->low_water = atoi(value);
@ -450,11 +455,14 @@ ROUTER_SLAVE *slave = (ROUTER_SLAVE *)router_session;
* TODO: Handle closure of master session
*/
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR, "Binlog router close session with master")));
LOGFILE_ERROR,
"Binlog router close session with master server %s",
router->service->databases->unique_name)));
blr_master_reconnect(router);
return;
}
CHK_CLIENT_RSES(slave);
/**
* Lock router client session for secure read and update.
*/

View File

@ -115,7 +115,11 @@ struct dirent *dp;
if (n == 0) // No binlog files found
{
sprintf(filename, BINLOG_NAMEFMT, router->fileroot, 1);
if (router->initbinlog)
sprintf(filename, BINLOG_NAMEFMT, router->fileroot,
router->initbinlog);
else
sprintf(filename, BINLOG_NAMEFMT, router->fileroot, 1);
blr_file_create(router, filename);
}
else

View File

@ -105,7 +105,8 @@ GWBUF *buf;
if ((router->master = dcb_connect(router->service->databases, router->session, BLR_PROTOCOL)) == NULL)
{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"Binlog router: failed to connect to master\n")));
"Binlog router: failed to connect to master server '%s'\n",
router->service->databases->unique_name)));
return;
}
@ -828,7 +829,7 @@ char file[BINLOG_FNAMELEN+1];
pos = extract_field(ptr+4, 32);
pos <<= 32;
pos |= extract_field(ptr, 32);
slen = len - 8;
slen = len - (8 + 4); // Allow for position and CRC
if (slen > BINLOG_FNAMELEN)
slen = BINLOG_FNAMELEN;
memcpy(file, ptr + 8, slen);

View File

@ -95,7 +95,7 @@ blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
return 0;
}
atomic_add(&slave->stats.n_requests, 1);
slave->stats.n_requests++;
switch (MYSQL_COMMAND(queue))
{
case COM_QUERY:
@ -796,7 +796,7 @@ doitagain:
slave->binlogfile)));
return 0;
}
atomic_add(&slave->stats.n_bursts, 1);
slave->stats.n_bursts++;
spinlock_acquire(&slave->catch_lock);
slave->cstate |= CS_INNERLOOP;
spinlock_release(&slave->catch_lock);
@ -830,7 +830,7 @@ if (hdr.event_size > DEF_HIGH_WATER) slave->stats.n_above++;
slave->binlog_pos = hdr.next_pos;
}
rval = written;
atomic_add(&slave->stats.n_events, 1);
slave->stats.n_events++;
burst++;
}
if (record == NULL)
@ -843,7 +843,7 @@ if (hdr.event_size > DEF_HIGH_WATER) slave->stats.n_above++;
} while (record && DCB_BELOW_LOW_WATER(slave->dcb));
if (record)
{
atomic_add(&slave->stats.n_flows, 1);
slave->stats.n_flows++;
spinlock_acquire(&slave->catch_lock);
slave->cstate |= CS_EXPECTCB;
spinlock_release(&slave->catch_lock);
@ -854,7 +854,7 @@ if (hdr.event_size > DEF_HIGH_WATER) slave->stats.n_above++;
spinlock_acquire(&slave->catch_lock);
if ((slave->cstate & CS_UPTODATE) == 0)
{
atomic_add(&slave->stats.n_upd, 1);
slave->stats.n_upd++;
slave->cstate |= CS_UPTODATE;
state_change = 1;
}
@ -907,7 +907,7 @@ ROUTER_INSTANCE *router = slave->router;
if (slave->state == BLRS_DUMPING &&
slave->binlog_pos != router->binlog_position)
{
atomic_add(&slave->stats.n_dcb, 1);
slave->stats.n_dcb++;
blr_slave_catchup(router, slave);
}
}
@ -916,12 +916,12 @@ ROUTER_INSTANCE *router = slave->router;
{
if (slave->state == BLRS_DUMPING)
{
atomic_add(&slave->stats.n_cb, 1);
slave->stats.n_cb++;
blr_slave_catchup(router, slave);
}
else
{
atomic_add(&slave->stats.n_cbna, 1);
slave->stats.n_cbna++;
}
}
return 0;