Merge
This commit is contained in:
commit
5a4ae09c51
@ -45,6 +45,7 @@ CFLAGS=-c -I/usr/include -I../include -I../inih \
|
||||
-I$(LOGPATH) -I$(UTILSPATH) \
|
||||
-Wall -g
|
||||
|
||||
|
||||
ifdef DEBUG
|
||||
CFLAGS := $(CFLAGS) -DSS_DEBUG
|
||||
endif
|
||||
@ -54,13 +55,13 @@ LDFLAGS=-rdynamic -L$(LOGPATH) \
|
||||
|
||||
SRCS= atomic.c buffer.c spinlock.c gateway.c gateway_mysql_protocol.c \
|
||||
gw_utils.c utils.c dcb.c load_utils.c session.c service.c server.c \
|
||||
poll.c config.c users.c hashtable.c dbusers.c thread.c
|
||||
poll.c config.c users.c hashtable.c dbusers.c thread.c gwbitmask.c
|
||||
|
||||
HDRS= ../include/atomic.h ../include/buffer.h ../include/dcb.h \
|
||||
../include/gateway_mysql.h ../include/gw.h ../include/mysql_protocol.h \
|
||||
../include/session.h ../include/spinlock.h ../include/thread.h \
|
||||
../include/modules.h ../include/poll.h ../include/config.h \
|
||||
../include/users.h ../include/hashtable.h
|
||||
../include/users.h ../include/hashtable.h ../include/gwbitmask.h
|
||||
OBJ=$(SRCS:.c=.o)
|
||||
|
||||
LIBS=-L../inih/extra -linih -lssl -lstdc++ \
|
||||
|
141
core/dcb.c
141
core/dcb.c
@ -28,10 +28,13 @@
|
||||
* @verbatim
|
||||
* Revision History
|
||||
*
|
||||
* Date Who Description
|
||||
* 12/06/13 Mark Riddoch Initial implementation
|
||||
* Date Who Description
|
||||
* 12/06/13 Mark Riddoch Initial implementation
|
||||
* 21/06/13 Massimiliano Pinto free_dcb is used
|
||||
* 25/06/13 Massimiliano Pinto Added checks to session and router_session
|
||||
* 28/06/13 Mark Riddoch Changed the free mechanism ti
|
||||
* introduce a zombie state for the
|
||||
* dcb
|
||||
* @endverbatim
|
||||
*/
|
||||
#include <stdio.h>
|
||||
@ -51,8 +54,11 @@
|
||||
#include <atomic.h>
|
||||
|
||||
static DCB *allDCBs = NULL; /* Diagnotics need a list of DCBs */
|
||||
static SPINLOCK *dcbspin = NULL;
|
||||
static DCB *zombies = NULL;
|
||||
static SPINLOCK dcbspin = SPINLOCK_INIT;
|
||||
static SPINLOCK zombiespin = SPINLOCK_INIT;
|
||||
|
||||
static void dcb_final_free(DCB *dcb);
|
||||
/**
|
||||
* Allocate a new DCB.
|
||||
*
|
||||
@ -66,13 +72,6 @@ dcb_alloc()
|
||||
{
|
||||
DCB *rval;
|
||||
|
||||
if (dcbspin == NULL)
|
||||
{
|
||||
if ((dcbspin = malloc(sizeof(SPINLOCK))) == NULL)
|
||||
return NULL;
|
||||
spinlock_init(dcbspin);
|
||||
}
|
||||
|
||||
if ((rval = malloc(sizeof(DCB))) == NULL)
|
||||
{
|
||||
return NULL;
|
||||
@ -86,8 +85,10 @@ DCB *rval;
|
||||
rval->protocol = NULL;
|
||||
rval->session = NULL;
|
||||
memset(&rval->stats, 0, sizeof(DCBSTATS)); // Zero the statistics
|
||||
bitmask_init(&rval->memdata.bitmask);
|
||||
rval->memdata.next = NULL;
|
||||
|
||||
spinlock_acquire(dcbspin);
|
||||
spinlock_acquire(&dcbspin);
|
||||
if (allDCBs == NULL)
|
||||
allDCBs = rval;
|
||||
else
|
||||
@ -97,26 +98,59 @@ DCB *rval;
|
||||
ptr = ptr->next;
|
||||
ptr->next = rval;
|
||||
}
|
||||
spinlock_release(dcbspin);
|
||||
spinlock_release(&dcbspin);
|
||||
return rval;
|
||||
}
|
||||
|
||||
/**
|
||||
* Free a DCB and remove it from the chain of all DCBs
|
||||
* Free a DCB, this only marks the DCB as a zombie and adds it
|
||||
* to the zombie list. The real working of removing it occurs once
|
||||
* all the threads signal they no longer have access to the DCB
|
||||
*
|
||||
* @param dcb The DCB to free
|
||||
*/
|
||||
void
|
||||
dcb_free(DCB *dcb)
|
||||
{
|
||||
/* Set the bitmask of running pollng threads */
|
||||
bitmask_copy(&dcb->memdata.bitmask, poll_bitmask());
|
||||
|
||||
/* Add the DCB to the Zombie list */
|
||||
spinlock_acquire(&zombiespin);
|
||||
if (zombies == NULL)
|
||||
zombies = dcb;
|
||||
else
|
||||
{
|
||||
DCB *ptr = zombies;
|
||||
while (ptr->memdata.next)
|
||||
ptr = ptr->memdata.next;
|
||||
ptr->memdata.next = dcb;
|
||||
}
|
||||
spinlock_release(&zombiespin);
|
||||
|
||||
|
||||
dcb->state = DCB_STATE_ZOMBIE;
|
||||
}
|
||||
|
||||
/**
|
||||
* Free a DCB and remove it from the chain of all DCBs
|
||||
*
|
||||
* NB This is called with the caller holding the zombie queue
|
||||
* spinlock
|
||||
*
|
||||
* @param dcb The DCB to free
|
||||
*/
|
||||
static void
|
||||
dcb_final_free(DCB *dcb)
|
||||
{
|
||||
dcb->state = DCB_STATE_FREED;
|
||||
|
||||
/* First remove this DCB from the chain */
|
||||
spinlock_acquire(dcbspin);
|
||||
spinlock_acquire(&dcbspin);
|
||||
if (allDCBs == dcb)
|
||||
{
|
||||
/*
|
||||
* Deal with the special case of removign the DCB at the head of
|
||||
* Deal with the special case of removing the DCB at the head of
|
||||
* the chain.
|
||||
*/
|
||||
allDCBs = dcb->next;
|
||||
@ -134,7 +168,7 @@ dcb_free(DCB *dcb)
|
||||
if (ptr)
|
||||
ptr->next = dcb->next;
|
||||
}
|
||||
spinlock_release(dcbspin);
|
||||
spinlock_release(&dcbspin);
|
||||
|
||||
if (dcb->protocol)
|
||||
free(dcb->protocol);
|
||||
@ -145,6 +179,59 @@ dcb_free(DCB *dcb)
|
||||
free(dcb);
|
||||
}
|
||||
|
||||
/**
|
||||
* Process the DCB zombie queue
|
||||
*
|
||||
* This routine is called by each of the polling threads with
|
||||
* the thread id of the polling thread. It must clear the bit in
|
||||
* the memdata btmask for the polling thread that calls it. If the
|
||||
* operation of clearing this bit means that no bits are set in
|
||||
* the memdata.bitmask then the DCB is no longer able to be
|
||||
* referenced and it can be finally removed.
|
||||
*
|
||||
* @param threadid The thread ID of the caller
|
||||
*/
|
||||
void
|
||||
dcb_process_zombies(int threadid)
|
||||
{
|
||||
DCB *ptr, *lptr;
|
||||
|
||||
spinlock_acquire(&zombiespin);
|
||||
ptr = zombies;
|
||||
lptr = NULL;
|
||||
while (ptr)
|
||||
{
|
||||
bitmask_clear(&ptr->memdata.bitmask, threadid);
|
||||
if (bitmask_isallclear(&ptr->memdata.bitmask))
|
||||
{
|
||||
/*
|
||||
* Remove the DCB from the zombie queue
|
||||
* and call the final free routine for the
|
||||
* DCB
|
||||
*
|
||||
* ptr is the DCB we are processing
|
||||
* lptr is the previous DCB on the zombie queue
|
||||
* or NULL if the DCB is at the head of the queue
|
||||
* tptr is the DCB after the one we are processing
|
||||
* on the zombie queue
|
||||
*/
|
||||
DCB *tptr = ptr->memdata.next;
|
||||
if (lptr == NULL)
|
||||
zombies = tptr;
|
||||
else
|
||||
lptr->memdata.next = tptr;
|
||||
dcb_final_free(ptr);
|
||||
ptr = tptr;
|
||||
}
|
||||
else
|
||||
{
|
||||
lptr = ptr;
|
||||
ptr = ptr->memdata.next;
|
||||
}
|
||||
}
|
||||
spinlock_release(&zombiespin);
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect to a server
|
||||
*
|
||||
@ -426,20 +513,14 @@ void printAllDCBs()
|
||||
{
|
||||
DCB *dcb;
|
||||
|
||||
if (dcbspin == NULL)
|
||||
{
|
||||
if ((dcbspin = malloc(sizeof(SPINLOCK))) == NULL)
|
||||
return;
|
||||
spinlock_init(dcbspin);
|
||||
}
|
||||
spinlock_acquire(dcbspin);
|
||||
spinlock_acquire(&dcbspin);
|
||||
dcb = allDCBs;
|
||||
while (dcb)
|
||||
{
|
||||
printDCB(dcb);
|
||||
dcb = dcb->next;
|
||||
}
|
||||
spinlock_release(dcbspin);
|
||||
spinlock_release(&dcbspin);
|
||||
}
|
||||
|
||||
|
||||
@ -451,13 +532,7 @@ void dprintAllDCBs(DCB *pdcb)
|
||||
{
|
||||
DCB *dcb;
|
||||
|
||||
if (dcbspin == NULL)
|
||||
{
|
||||
if ((dcbspin = malloc(sizeof(SPINLOCK))) == NULL)
|
||||
return;
|
||||
spinlock_init(dcbspin);
|
||||
}
|
||||
spinlock_acquire(dcbspin);
|
||||
spinlock_acquire(&dcbspin);
|
||||
dcb = allDCBs;
|
||||
while (dcb)
|
||||
{
|
||||
@ -475,7 +550,7 @@ DCB *dcb;
|
||||
dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts);
|
||||
dcb = dcb->next;
|
||||
}
|
||||
spinlock_release(dcbspin);
|
||||
spinlock_release(&dcbspin);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -524,6 +599,8 @@ gw_dcb_state2string (int state) {
|
||||
return "DCB socket closed";
|
||||
case DCB_STATE_FREED:
|
||||
return "DCB memory could be freed";
|
||||
case DCB_STATE_ZOMBIE:
|
||||
return "DCB Zombie";
|
||||
default:
|
||||
return "DCB (unknown)";
|
||||
}
|
||||
|
@ -48,6 +48,8 @@
|
||||
#include <config.h>
|
||||
#include <poll.h>
|
||||
|
||||
#include <stdlib.h>
|
||||
|
||||
#if defined(SS_DEBUG)
|
||||
# include <skygw_utils.h>
|
||||
# include <log_manager.h>
|
||||
@ -178,19 +180,19 @@ int handle_event_errors_backend(DCB *dcb) {
|
||||
int
|
||||
main(int argc, char **argv)
|
||||
{
|
||||
int daemon_mode = 1;
|
||||
sigset_t sigset;
|
||||
int n, n_threads;
|
||||
void **threads;
|
||||
char buf[1024], *home, *cnf_file = NULL;
|
||||
int i;
|
||||
|
||||
int daemon_mode = 1;
|
||||
sigset_t sigset;
|
||||
int n, n_threads;
|
||||
void **threads;
|
||||
char buf[1024], *home, *cnf_file = NULL;
|
||||
#if defined(SS_DEBUG)
|
||||
i = atexit(skygw_logmanager_exit);
|
||||
int i;
|
||||
|
||||
if (i != 0) {
|
||||
fprintf(stderr, "Couldn't register exit function.\n");
|
||||
}
|
||||
i = atexit(skygw_logmanager_exit);
|
||||
|
||||
if (i != 0) {
|
||||
fprintf(stderr, "Couldn't register exit function.\n");
|
||||
}
|
||||
#endif
|
||||
if ((home = getenv("GATEWAY_HOME")) != NULL)
|
||||
{
|
||||
@ -219,7 +221,7 @@ main(int argc, char **argv)
|
||||
if (cnf_file == NULL)
|
||||
{
|
||||
#if defined(SS_DEBUG)
|
||||
skygw_log_write(
|
||||
skygw_log_write(
|
||||
NULL,
|
||||
LOGFILE_ERROR,
|
||||
strdup("Unable to find a gateway configuration file, either "
|
||||
@ -235,7 +237,7 @@ main(int argc, char **argv)
|
||||
if (!config_load(cnf_file))
|
||||
{
|
||||
#if defined(SS_DEBUG)
|
||||
skygw_log_write(NULL,
|
||||
skygw_log_write(NULL,
|
||||
LOGFILE_ERROR,
|
||||
"Failed to load gateway configuration file %s\n");
|
||||
#endif
|
||||
@ -286,8 +288,8 @@ main(int argc, char **argv)
|
||||
n_threads = config_threadcount();
|
||||
threads = (void **)calloc(n_threads, sizeof(void *));
|
||||
for (n = 0; n < n_threads - 1; n++)
|
||||
threads[n] = thread_start(poll_waitevents);
|
||||
poll_waitevents();
|
||||
threads[n] = thread_start(poll_waitevents, (void *)(n + 1));
|
||||
poll_waitevents((void *)0);
|
||||
for (n = 0; n < n_threads - 1; n++)
|
||||
thread_wait(threads[n]);
|
||||
|
||||
|
190
core/gwbitmask.c
Normal file
190
core/gwbitmask.c
Normal file
@ -0,0 +1,190 @@
|
||||
/*
|
||||
* This file is distributed as part of the SkySQL Gateway. It is free
|
||||
* software: you can redistribute it and/or modify it under the terms of the
|
||||
* GNU General Public License as published by the Free Software Foundation,
|
||||
* version 2.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
||||
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
||||
* details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License along with
|
||||
* this program; if not, write to the Free Software Foundation, Inc., 51
|
||||
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
*
|
||||
* Copyright SkySQL Ab 2013
|
||||
*/
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <gwbitmask.h>
|
||||
|
||||
/**
|
||||
* @file gwbitmask.c - Implementation of bitmask opertions for the gateway
|
||||
*
|
||||
* We provide basic bitmask manipulation routines, the size of
|
||||
* the bitmask will grow dynamically based on the highest bit
|
||||
* number that is set or cleared within the bitmask.
|
||||
*
|
||||
* Bitmsk growth happens in increments rather than via a single bit as
|
||||
* a time.
|
||||
*
|
||||
* @verbatim
|
||||
* Revision History
|
||||
*
|
||||
* Date Who Description
|
||||
* 28/06/13 Mark Riddoch Initial implementation
|
||||
*
|
||||
* @endverbatim
|
||||
*/
|
||||
|
||||
/**
|
||||
* Initialise a bitmask
|
||||
*
|
||||
* @param bitmask Pointer the bitmask
|
||||
* @return The value of *variable before the add occured
|
||||
*/
|
||||
void
|
||||
bitmask_init(GWBITMASK *bitmask)
|
||||
{
|
||||
bitmask->length = BIT_LENGTH_INITIAL;
|
||||
bitmask->bits = malloc(bitmask->length / 8);
|
||||
memset(bitmask->bits, 0, bitmask->length / 8);
|
||||
spinlock_init(&bitmask->lock);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the bit at the specified bit position in the bitmask.
|
||||
* The bitmask will automatically be extended if the bit is
|
||||
* beyond the current bitmask length
|
||||
*
|
||||
* @param bitmask Pointer the bitmask
|
||||
* @param bit Bit to set
|
||||
*/
|
||||
void
|
||||
bitmask_set(GWBITMASK *bitmask, int bit)
|
||||
{
|
||||
unsigned char *ptr;
|
||||
unsigned char mask;
|
||||
|
||||
spinlock_acquire(&bitmask->lock);
|
||||
if (bit >= bitmask->length)
|
||||
{
|
||||
bitmask->bits = realloc(bitmask->bits,
|
||||
(bitmask->length + BIT_LENGTH_INC) / 8);
|
||||
memset(bitmask + (bitmask->length / 8), 0,
|
||||
BIT_LENGTH_INC / 8);
|
||||
bitmask->length += (BIT_LENGTH_INC / 8);
|
||||
}
|
||||
ptr = bitmask->bits + (bit / 8);
|
||||
mask = 1 << (bit % 8);
|
||||
*ptr |= mask;
|
||||
spinlock_release(&bitmask->lock);
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear the bit at the specified bit position in the bitmask.
|
||||
* The bitmask will automatically be extended if the bit is
|
||||
* beyond the current bitmask length
|
||||
*
|
||||
* @param bitmask Pointer the bitmask
|
||||
* @param bit Bit to clear
|
||||
*/
|
||||
void
|
||||
bitmask_clear(GWBITMASK *bitmask, int bit)
|
||||
{
|
||||
unsigned char *ptr;
|
||||
unsigned char mask;
|
||||
|
||||
if (bit >= bitmask->length)
|
||||
{
|
||||
bitmask->bits = realloc(bitmask->bits,
|
||||
(bitmask->length + BIT_LENGTH_INC) / 8);
|
||||
memset(bitmask + (bitmask->length / 8), 0,
|
||||
BIT_LENGTH_INC / 8);
|
||||
bitmask->length += (BIT_LENGTH_INC / 8);
|
||||
}
|
||||
ptr = bitmask->bits + (bit / 8);
|
||||
mask = 1 << (bit % 8);
|
||||
*ptr &= ~mask;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a non-zero value if the bit at the specified bit
|
||||
* position in the bitmask is set.
|
||||
* The bitmask will automatically be extended if the bit is
|
||||
* beyond the current bitmask length
|
||||
*
|
||||
* @param bitmask Pointer the bitmask
|
||||
* @param bit Bit to clear
|
||||
*/
|
||||
int
|
||||
bitmask_isset(GWBITMASK *bitmask, int bit)
|
||||
{
|
||||
unsigned char *ptr;
|
||||
unsigned char mask;
|
||||
|
||||
spinlock_acquire(&bitmask->lock);
|
||||
if (bit >= bitmask->length)
|
||||
{
|
||||
bitmask->bits = realloc(bitmask->bits,
|
||||
(bitmask->length + BIT_LENGTH_INC) / 8);
|
||||
memset(bitmask + (bitmask->length / 8), 0,
|
||||
BIT_LENGTH_INC / 8);
|
||||
bitmask->length += (BIT_LENGTH_INC / 8);
|
||||
}
|
||||
ptr = bitmask->bits + (bit / 8);
|
||||
mask = 1 << (bit % 8);
|
||||
spinlock_release(&bitmask->lock);
|
||||
return *ptr & mask;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a non-zero value of the bitmask has no bits set
|
||||
* in it.
|
||||
*
|
||||
* @param bitmask Pointer the bitmask
|
||||
* @return Non-zero if the bitmask has no bits set
|
||||
*/
|
||||
int
|
||||
bitmask_isallclear(GWBITMASK *bitmask)
|
||||
{
|
||||
unsigned char *ptr, *eptr;
|
||||
|
||||
spinlock_acquire(&bitmask->lock);
|
||||
ptr = bitmask->bits;
|
||||
eptr = ptr + (bitmask->length / 8);
|
||||
while (ptr < eptr)
|
||||
{
|
||||
if (*ptr != 0)
|
||||
{
|
||||
spinlock_release(&bitmask->lock);
|
||||
return 0;
|
||||
}
|
||||
ptr++;
|
||||
}
|
||||
spinlock_release(&bitmask->lock);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param bitmask Pointer the bitmask
|
||||
* @param value Value to be added
|
||||
* @return The value of *variable before the add occured
|
||||
*/
|
||||
void
|
||||
bitmask_copy(GWBITMASK *dest, GWBITMASK *src)
|
||||
{
|
||||
spinlock_acquire(&src->lock);
|
||||
spinlock_acquire(&dest->lock);
|
||||
if (dest->length)
|
||||
free(dest->bits);
|
||||
dest->bits = malloc(src->length / 8);
|
||||
dest->length = src->length;
|
||||
memcpy(dest->bits, src->bits, src->length / 8);
|
||||
spinlock_release(&dest->lock);
|
||||
spinlock_release(&src->lock);
|
||||
}
|
||||
|
35
core/poll.c
35
core/poll.c
@ -23,6 +23,7 @@
|
||||
#include <poll.h>
|
||||
#include <dcb.h>
|
||||
#include <atomic.h>
|
||||
#include <gwbitmask.h>
|
||||
|
||||
/**
|
||||
* @file poll.c - Abstraction of the epoll functionality
|
||||
@ -32,13 +33,15 @@
|
||||
*
|
||||
* Date Who Description
|
||||
* 19/06/13 Mark Riddoch Initial implementation
|
||||
* 28/06/13 Mark Riddoch Added poll mask support and DCB
|
||||
* zombie management
|
||||
*
|
||||
* @endverbatim
|
||||
*/
|
||||
|
||||
static int epoll_fd = -1; /**< The epoll file descriptor */
|
||||
static int shutdown = 0; /**< Flag the shutdown of the poll subsystem */
|
||||
|
||||
static int epoll_fd = -1; /**< The epoll file descriptor */
|
||||
static int shutdown = 0; /**< Flag the shutdown of the poll subsystem */
|
||||
static GWBITMASK poll_mask;
|
||||
/**
|
||||
* The polling statistics
|
||||
*/
|
||||
@ -68,6 +71,7 @@ poll_init()
|
||||
exit(-1);
|
||||
}
|
||||
memset(&pollStats, 0, sizeof(pollStats));
|
||||
bitmask_init(&poll_mask);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -111,7 +115,8 @@ struct epoll_event ev;
|
||||
* The main polling loop
|
||||
*
|
||||
* This routine does the polling and despatches of IO events
|
||||
* to the DCB's
|
||||
* to the DCB's. It may be called either directly or as the entry point
|
||||
* of a polling thread within the gateway.
|
||||
*
|
||||
* The routine will loop as long as the variable "shutdown" is set to zero,
|
||||
* setting this to a non-zero value will cause the polling loop to return.
|
||||
@ -126,13 +131,17 @@ struct epoll_event ev;
|
||||
* with timeout. The call with the timeout differs in that the Linux scheduler may
|
||||
* deschedule a process if a timeout is included, but will not do this if a 0 timeout
|
||||
* value is given. this improves performance when the gateway is under heavy load.
|
||||
*
|
||||
* @parm arg The thread ID passed as a void * to satisfy the threading package
|
||||
*/
|
||||
void
|
||||
poll_waitevents()
|
||||
poll_waitevents(void *arg)
|
||||
{
|
||||
struct epoll_event events[MAX_EVENTS];
|
||||
int i, nfds;
|
||||
int thread_id = (int)arg;
|
||||
|
||||
bitmask_set(&poll_mask, thread_id);
|
||||
while (1)
|
||||
{
|
||||
#if BLOCKINGPOLL
|
||||
@ -158,6 +167,9 @@ int i, nfds;
|
||||
DCB *dcb = (DCB *)events[i].data.ptr;
|
||||
__uint32_t ev = events[i].events;
|
||||
|
||||
if (DCB_ISZOMBIE(dcb))
|
||||
continue;
|
||||
|
||||
if (ev & EPOLLERR)
|
||||
{
|
||||
atomic_add(&pollStats.n_error, 1);
|
||||
@ -188,8 +200,10 @@ int i, nfds;
|
||||
}
|
||||
}
|
||||
}
|
||||
dcb_process_zombies(thread_id);
|
||||
if (shutdown)
|
||||
{
|
||||
bitmask_clear(&poll_mask, thread_id);
|
||||
return;
|
||||
}
|
||||
}
|
||||
@ -204,6 +218,17 @@ poll_shutdown()
|
||||
shutdown = 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the bitmask of polling threads
|
||||
*
|
||||
* @return The bitmask of the running polling threads
|
||||
*/
|
||||
GWBITMASK *
|
||||
poll_bitmask()
|
||||
{
|
||||
return &poll_mask;
|
||||
}
|
||||
|
||||
/**
|
||||
* Debug routine to print the polling statistics
|
||||
*
|
||||
|
@ -34,14 +34,15 @@
|
||||
* Start a polling thread
|
||||
*
|
||||
* @param entry The entry point to call
|
||||
* @param arg The argument to pass the thread entry point
|
||||
* @return The thread handle
|
||||
*/
|
||||
void *
|
||||
thread_start(void (*entry)())
|
||||
thread_start(void (*entry)(void *), void *arg)
|
||||
{
|
||||
pthread_t thd;
|
||||
|
||||
if (pthread_create(&thd, NULL, (void *(*)(void *))entry, NULL) != 0)
|
||||
if (pthread_create(&thd, NULL, (void *(*)(void *))entry, arg) != 0)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
|
@ -19,6 +19,7 @@
|
||||
*/
|
||||
#include <spinlock.h>
|
||||
#include <buffer.h>
|
||||
#include <gwbitmask.h>
|
||||
|
||||
struct session;
|
||||
struct server;
|
||||
@ -87,6 +88,29 @@ typedef struct dcbstats {
|
||||
int n_buffered; /**< Number of buffered writes */
|
||||
} DCBSTATS;
|
||||
|
||||
/**
|
||||
* The data structure that is embedded witin a DCB and manages the complex memory
|
||||
* management issues of a DCB.
|
||||
*
|
||||
* The DCB structures are used as the user data within the polling loop. This means that
|
||||
* polling threads may aschronously wake up and access these structures. It is not possible
|
||||
* to simple remove the DCB from the epoll system and then free the data, as every thread
|
||||
* that is currently running an epoll call must wake up and re-issue the epoll_wait system
|
||||
* call, the is the only way we can be sure that no polling thread is pending a wakeup or
|
||||
* processing an event that will access the DCB.
|
||||
*
|
||||
* We solve this issue by making the dcb_free routine merely mark a DCB as a zombie and
|
||||
* place it on a special zombie list. Before placing the DCB on the zombie list we create
|
||||
* a bitmask with a bit set in it for each active polling thread. Each thread will call
|
||||
* a routine to process the zombie list at the end of the polling loop. This routine
|
||||
* will clear the bit value that corresponds to the calling thread. Once the bitmask
|
||||
* is completely cleared the DCB can finally be freed and removed from the zombie list.
|
||||
*/
|
||||
typedef struct {
|
||||
GWBITMASK bitmask; /**< The bitmask of threads */
|
||||
struct dcb *next; /**< Next pointer for the zombie list */
|
||||
} DCBMM;
|
||||
|
||||
/**
|
||||
* Descriptor Control Block
|
||||
*
|
||||
@ -114,6 +138,7 @@ typedef struct dcb {
|
||||
struct dcb *next; /**< Next DCB in the chain of allocated DCB's */
|
||||
struct service *service; /**< The related service */
|
||||
void *data; /**< Specific client data */
|
||||
DCBMM memdata; /**< The data related to DCB memory management */
|
||||
} DCB;
|
||||
|
||||
/* DCB states */
|
||||
@ -124,10 +149,12 @@ typedef struct dcb {
|
||||
#define DCB_STATE_LISTENING 5 /**< The DCB is for a listening socket */
|
||||
#define DCB_STATE_DISCONNECTED 6 /**< The socket is now closed */
|
||||
#define DCB_STATE_FREED 7 /**< Memory freed */
|
||||
#define DCB_STATE_ZOMBIE 8 /**< DCB is no longer active, waiting to free it */
|
||||
|
||||
/* A few useful macros */
|
||||
#define DCB_SESSION(x) (x)->session
|
||||
#define DCB_PROTOCOL(x, type) (type *)((x)->protocol)
|
||||
#define DCB_ISZOMBIE(x) ((x)->state == DCB_STATE_ZOMBIE)
|
||||
|
||||
extern DCB *dcb_alloc(); /* Allocate a DCB */
|
||||
extern void dcb_free(DCB *); /* Free a DCB */
|
||||
@ -136,6 +163,7 @@ extern int dcb_read(DCB *, GWBUF **); /* Generic read routine */
|
||||
extern int dcb_write(DCB *, GWBUF *); /* Generic write routine */
|
||||
extern int dcb_drain_writeq(DCB *); /* Generic write routine */
|
||||
extern void dcb_close(DCB *); /* Generic close functionality */
|
||||
extern void dcb_process_zombies(int); /* Process Zombies */
|
||||
extern void printAllDCBs(); /* Debug to print all DCB in the system */
|
||||
extern void printDCB(DCB *); /* Debug print routine */
|
||||
extern void dprintAllDCBs(DCB *); /* Debug to print all DCB in the system */
|
||||
|
52
include/gwbitmask.h
Normal file
52
include/gwbitmask.h
Normal file
@ -0,0 +1,52 @@
|
||||
#ifndef _GWBITMASK_H
|
||||
#define _GWBITMASK_H
|
||||
/*
|
||||
* This file is distributed as part of the SkySQL Gateway. It is free
|
||||
* software: you can redistribute it and/or modify it under the terms of the
|
||||
* GNU General Public License as published by the Free Software Foundation,
|
||||
* version 2.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
||||
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
||||
* details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License along with
|
||||
* this program; if not, write to the Free Software Foundation, Inc., 51
|
||||
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
*
|
||||
* Copyright SkySQL Ab 2013
|
||||
*/
|
||||
#include <spinlock.h>
|
||||
|
||||
/**
|
||||
* @file gwbitmask.h An implementation of an arbitarly long bitmask
|
||||
*
|
||||
* @verbatim
|
||||
* Revision History
|
||||
*
|
||||
* Date Who Description
|
||||
* 28/06/13 Mark Riddoch Initial implementation
|
||||
*
|
||||
* @endverbatim
|
||||
*/
|
||||
|
||||
#define BIT_LENGTH_INITIAL 32 /**< Initial number of bits in the bitmask */
|
||||
#define BIT_LENGTH_INC 32 /**< Number of bits to add on each increment */
|
||||
|
||||
/**
|
||||
* The bitmask structure used to store an arbitary large bitmask
|
||||
*/
|
||||
typedef struct {
|
||||
SPINLOCK lock; /**< Lock to protect the bitmask */
|
||||
unsigned char *bits; /**< Pointer to the bits themselves */
|
||||
unsigned int length; /**< The number of bits in the bitmask */
|
||||
} GWBITMASK;
|
||||
|
||||
extern void bitmask_init(GWBITMASK *);
|
||||
extern void bitmask_set(GWBITMASK *, int);
|
||||
extern void bitmask_clear(GWBITMASK *, int);
|
||||
extern int bitmask_isset(GWBITMASK *, int);
|
||||
extern int bitmask_isallclear(GWBITMASK *);
|
||||
extern void bitmask_copy(GWBITMASK *, GWBITMASK *);
|
||||
#endif
|
@ -18,6 +18,7 @@
|
||||
* Copyright SkySQL Ab 2013
|
||||
*/
|
||||
#include <dcb.h>
|
||||
#include <gwbitmask.h>
|
||||
|
||||
/**
|
||||
* @file poll.h The poll related functionality
|
||||
@ -33,10 +34,11 @@
|
||||
#define MAX_EVENTS 1000
|
||||
#define EPOLL_TIMEOUT 1000 /**< The epoll timeout we use (milliseconds) */
|
||||
|
||||
extern void poll_init();
|
||||
extern int poll_add_dcb(DCB *);
|
||||
extern int poll_remove_dcb(DCB *);
|
||||
extern void poll_waitevents();
|
||||
extern void poll_shutdown();
|
||||
extern void dprintPollStats(DCB *);
|
||||
extern void poll_init();
|
||||
extern int poll_add_dcb(DCB *);
|
||||
extern int poll_remove_dcb(DCB *);
|
||||
extern void poll_waitevents(void *);
|
||||
extern void poll_shutdown();
|
||||
extern GWBITMASK *poll_bitmask();
|
||||
extern void dprintPollStats(DCB *);
|
||||
#endif
|
||||
|
@ -19,10 +19,19 @@
|
||||
*/
|
||||
#include <pthread.h>
|
||||
|
||||
/**
|
||||
* @file thread.h The gateway threading interface
|
||||
*
|
||||
* An encapsulation of the threading used by the gateway. This is designed to
|
||||
* isolate the majority of the gateway code from th epthread library, enabling
|
||||
* the gateway to be ported to a different threading package with the minimum
|
||||
* of changes.
|
||||
*/
|
||||
|
||||
#define THREAD pthread_t
|
||||
#define THREAD_SHELF pthread_self
|
||||
|
||||
extern void *thread_start(void (*entry)());
|
||||
extern void *thread_start(void (*entry)(void *), void *arg);
|
||||
extern void thread_wait(void *thd);
|
||||
|
||||
#endif
|
||||
|
@ -55,7 +55,7 @@ libdebugcli.so: $(DEBUGCLIOBJ)
|
||||
$(CC) $(LDFLAGS) $(DEBUGCLIOBJ) $(LIBS) -o $@
|
||||
|
||||
libreadwritesplit.so:
|
||||
(cd readwritesplit; make; cp $@ ..)
|
||||
# (cd readwritesplit; make; cp $@ ..)
|
||||
|
||||
.c.o:
|
||||
$(CC) $(CFLAGS) $< -o $@
|
||||
|
@ -121,7 +121,7 @@ ModuleInit()
|
||||
ROUTER_OBJECT *
|
||||
GetModuleObject()
|
||||
{
|
||||
fprintf(stderr, "Returing test router module object.\n");
|
||||
fprintf(stderr, "Returning test router module object.\n");
|
||||
return &MyObject;
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user