Addition of an arbitary bitmask handling set of functions
New memory deallocation routines for the DCBS
This commit is contained in:
@ -43,8 +43,9 @@ CC=cc
|
|||||||
|
|
||||||
CFLAGS=-c -I/usr/include -I../include -I../inih \
|
CFLAGS=-c -I/usr/include -I../include -I../inih \
|
||||||
-I$(LOGPATH) -I$(UTILSPATH) -I$(QCLASSPATH) \
|
-I$(LOGPATH) -I$(UTILSPATH) -I$(QCLASSPATH) \
|
||||||
-I$(MARIADB_SRC_PATH)/include \
|
-I/usr/include/mysql \
|
||||||
-Wall -g
|
-Wall -g
|
||||||
|
# -I$(MARIADB_SRC_PATH)/include \
|
||||||
|
|
||||||
ifdef DEBUG
|
ifdef DEBUG
|
||||||
CFLAGS := $(CFLAGS) -DSS_DEBUG
|
CFLAGS := $(CFLAGS) -DSS_DEBUG
|
||||||
@ -56,20 +57,20 @@ LDFLAGS=-rdynamic -L$(LOGPATH) -L$(QCLASSPATH) -L$(MARIADB_SRC_PATH)/libmysqld \
|
|||||||
|
|
||||||
SRCS= atomic.c buffer.c spinlock.c gateway.c gateway_mysql_protocol.c \
|
SRCS= atomic.c buffer.c spinlock.c gateway.c gateway_mysql_protocol.c \
|
||||||
gw_utils.c utils.c dcb.c load_utils.c session.c service.c server.c \
|
gw_utils.c utils.c dcb.c load_utils.c session.c service.c server.c \
|
||||||
poll.c config.c users.c hashtable.c dbusers.c thread.c
|
poll.c config.c users.c hashtable.c dbusers.c thread.c gwbitmask.c
|
||||||
|
|
||||||
HDRS= ../include/atomic.h ../include/buffer.h ../include/dcb.h \
|
HDRS= ../include/atomic.h ../include/buffer.h ../include/dcb.h \
|
||||||
../include/gateway_mysql.h ../include/gw.h ../include/mysql_protocol.h \
|
../include/gateway_mysql.h ../include/gw.h ../include/mysql_protocol.h \
|
||||||
../include/session.h ../include/spinlock.h ../include/thread.h \
|
../include/session.h ../include/spinlock.h ../include/thread.h \
|
||||||
../include/modules.h ../include/poll.h ../include/config.h \
|
../include/modules.h ../include/poll.h ../include/config.h \
|
||||||
../include/users.h ../include/hashtable.h
|
../include/users.h ../include/hashtable.h ../include/gwbitmask.h
|
||||||
OBJ=$(SRCS:.c=.o)
|
OBJ=$(SRCS:.c=.o)
|
||||||
|
|
||||||
#LIBS=-L../inih/extra -linih -lssl -lstdc++ \
|
|
||||||
# -L/packages/mariadb-5.5.25/libmysql -lmysqlclient \
|
|
||||||
|
|
||||||
LIBS=-L../inih/extra -linih -lssl -lstdc++ \
|
LIBS=-L../inih/extra -linih -lssl -lstdc++ \
|
||||||
-lz -lm -lcrypto -ldl -pthread -llog_manager -lquery_classifier -lmysqld
|
-L/packages/mariadb-5.5.25/libmysql -lmysqlclient \
|
||||||
|
|
||||||
|
#LIBS=-L../inih/extra -linih -lssl -lstdc++ \
|
||||||
|
# -lz -lm -lcrypto -ldl -pthread -llog_manager -lquery_classifier -lmysqld
|
||||||
|
|
||||||
gateway: $(OBJ)
|
gateway: $(OBJ)
|
||||||
$(CC) $(LDFLAGS) $(OBJ) $(UTILSPATH)/skygw_utils.o $(LIBS) -o $@
|
$(CC) $(LDFLAGS) $(OBJ) $(UTILSPATH)/skygw_utils.o $(LIBS) -o $@
|
||||||
|
141
core/dcb.c
141
core/dcb.c
@ -28,10 +28,13 @@
|
|||||||
* @verbatim
|
* @verbatim
|
||||||
* Revision History
|
* Revision History
|
||||||
*
|
*
|
||||||
* Date Who Description
|
* Date Who Description
|
||||||
* 12/06/13 Mark Riddoch Initial implementation
|
* 12/06/13 Mark Riddoch Initial implementation
|
||||||
* 21/06/13 Massimiliano Pinto free_dcb is used
|
* 21/06/13 Massimiliano Pinto free_dcb is used
|
||||||
* 25/06/13 Massimiliano Pinto Added checks to session and router_session
|
* 25/06/13 Massimiliano Pinto Added checks to session and router_session
|
||||||
|
* 28/06/13 Mark Riddoch Changed the free mechanism ti
|
||||||
|
* introduce a zombie state for the
|
||||||
|
* dcb
|
||||||
* @endverbatim
|
* @endverbatim
|
||||||
*/
|
*/
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
@ -51,8 +54,11 @@
|
|||||||
#include <atomic.h>
|
#include <atomic.h>
|
||||||
|
|
||||||
static DCB *allDCBs = NULL; /* Diagnotics need a list of DCBs */
|
static DCB *allDCBs = NULL; /* Diagnotics need a list of DCBs */
|
||||||
static SPINLOCK *dcbspin = NULL;
|
static DCB *zombies = NULL;
|
||||||
|
static SPINLOCK dcbspin = SPINLOCK_INIT;
|
||||||
|
static SPINLOCK zombiespin = SPINLOCK_INIT;
|
||||||
|
|
||||||
|
static void dcb_final_free(DCB *dcb);
|
||||||
/**
|
/**
|
||||||
* Allocate a new DCB.
|
* Allocate a new DCB.
|
||||||
*
|
*
|
||||||
@ -66,13 +72,6 @@ dcb_alloc()
|
|||||||
{
|
{
|
||||||
DCB *rval;
|
DCB *rval;
|
||||||
|
|
||||||
if (dcbspin == NULL)
|
|
||||||
{
|
|
||||||
if ((dcbspin = malloc(sizeof(SPINLOCK))) == NULL)
|
|
||||||
return NULL;
|
|
||||||
spinlock_init(dcbspin);
|
|
||||||
}
|
|
||||||
|
|
||||||
if ((rval = malloc(sizeof(DCB))) == NULL)
|
if ((rval = malloc(sizeof(DCB))) == NULL)
|
||||||
{
|
{
|
||||||
return NULL;
|
return NULL;
|
||||||
@ -86,8 +85,10 @@ DCB *rval;
|
|||||||
rval->protocol = NULL;
|
rval->protocol = NULL;
|
||||||
rval->session = NULL;
|
rval->session = NULL;
|
||||||
memset(&rval->stats, 0, sizeof(DCBSTATS)); // Zero the statistics
|
memset(&rval->stats, 0, sizeof(DCBSTATS)); // Zero the statistics
|
||||||
|
bitmask_init(&rval->memdata.bitmask);
|
||||||
|
rval->memdata.next = NULL;
|
||||||
|
|
||||||
spinlock_acquire(dcbspin);
|
spinlock_acquire(&dcbspin);
|
||||||
if (allDCBs == NULL)
|
if (allDCBs == NULL)
|
||||||
allDCBs = rval;
|
allDCBs = rval;
|
||||||
else
|
else
|
||||||
@ -97,26 +98,59 @@ DCB *rval;
|
|||||||
ptr = ptr->next;
|
ptr = ptr->next;
|
||||||
ptr->next = rval;
|
ptr->next = rval;
|
||||||
}
|
}
|
||||||
spinlock_release(dcbspin);
|
spinlock_release(&dcbspin);
|
||||||
return rval;
|
return rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Free a DCB and remove it from the chain of all DCBs
|
* Free a DCB, this only marks the DCB as a zombie and adds it
|
||||||
|
* to the zombie list. The real working of removing it occurs once
|
||||||
|
* all the threads signal they no longer have access to the DCB
|
||||||
*
|
*
|
||||||
* @param dcb The DCB to free
|
* @param dcb The DCB to free
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
dcb_free(DCB *dcb)
|
dcb_free(DCB *dcb)
|
||||||
|
{
|
||||||
|
/* Set the bitmask of running pollng threads */
|
||||||
|
bitmask_copy(&dcb->memdata.bitmask, poll_bitmask());
|
||||||
|
|
||||||
|
/* Add the DCB to the Zombie list */
|
||||||
|
spinlock_acquire(&zombiespin);
|
||||||
|
if (zombies == NULL)
|
||||||
|
zombies = dcb;
|
||||||
|
else
|
||||||
|
{
|
||||||
|
DCB *ptr = zombies;
|
||||||
|
while (ptr->memdata.next)
|
||||||
|
ptr = ptr->memdata.next;
|
||||||
|
ptr->memdata.next = dcb;
|
||||||
|
}
|
||||||
|
spinlock_release(&zombiespin);
|
||||||
|
|
||||||
|
|
||||||
|
dcb->state = DCB_STATE_ZOMBIE;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Free a DCB and remove it from the chain of all DCBs
|
||||||
|
*
|
||||||
|
* NB This is called with the caller holding the zombie queue
|
||||||
|
* spinlock
|
||||||
|
*
|
||||||
|
* @param dcb The DCB to free
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
dcb_final_free(DCB *dcb)
|
||||||
{
|
{
|
||||||
dcb->state = DCB_STATE_FREED;
|
dcb->state = DCB_STATE_FREED;
|
||||||
|
|
||||||
/* First remove this DCB from the chain */
|
/* First remove this DCB from the chain */
|
||||||
spinlock_acquire(dcbspin);
|
spinlock_acquire(&dcbspin);
|
||||||
if (allDCBs == dcb)
|
if (allDCBs == dcb)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* Deal with the special case of removign the DCB at the head of
|
* Deal with the special case of removing the DCB at the head of
|
||||||
* the chain.
|
* the chain.
|
||||||
*/
|
*/
|
||||||
allDCBs = dcb->next;
|
allDCBs = dcb->next;
|
||||||
@ -134,7 +168,7 @@ dcb_free(DCB *dcb)
|
|||||||
if (ptr)
|
if (ptr)
|
||||||
ptr->next = dcb->next;
|
ptr->next = dcb->next;
|
||||||
}
|
}
|
||||||
spinlock_release(dcbspin);
|
spinlock_release(&dcbspin);
|
||||||
|
|
||||||
if (dcb->protocol)
|
if (dcb->protocol)
|
||||||
free(dcb->protocol);
|
free(dcb->protocol);
|
||||||
@ -145,6 +179,59 @@ dcb_free(DCB *dcb)
|
|||||||
free(dcb);
|
free(dcb);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Process the DCB zombie queue
|
||||||
|
*
|
||||||
|
* This routine is called by each of the polling threads with
|
||||||
|
* the thread id of the polling thread. It must clear the bit in
|
||||||
|
* the memdata btmask for the polling thread that calls it. If the
|
||||||
|
* operation of clearing this bit means that no bits are set in
|
||||||
|
* the memdata.bitmask then the DCB is no longer able to be
|
||||||
|
* referenced and it can be finally removed.
|
||||||
|
*
|
||||||
|
* @param threadid The thread ID of the caller
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
dcb_process_zombies(int threadid)
|
||||||
|
{
|
||||||
|
DCB *ptr, *lptr;
|
||||||
|
|
||||||
|
spinlock_acquire(&zombiespin);
|
||||||
|
ptr = zombies;
|
||||||
|
lptr = NULL;
|
||||||
|
while (ptr)
|
||||||
|
{
|
||||||
|
bitmask_clear(&ptr->memdata.bitmask, threadid);
|
||||||
|
if (bitmask_isallclear(&ptr->memdata.bitmask))
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Remove the DCB from the zombie queue
|
||||||
|
* and call the final free routine for the
|
||||||
|
* DCB
|
||||||
|
*
|
||||||
|
* ptr is the DCB we are processing
|
||||||
|
* lptr is the previous DCB on the zombie queue
|
||||||
|
* or NULL if the DCB is at the head of the queue
|
||||||
|
* tptr is the DCB after the one we are processing
|
||||||
|
* on the zombie queue
|
||||||
|
*/
|
||||||
|
DCB *tptr = ptr->memdata.next;
|
||||||
|
if (lptr == NULL)
|
||||||
|
zombies = tptr;
|
||||||
|
else
|
||||||
|
lptr->memdata.next = tptr;
|
||||||
|
dcb_final_free(ptr);
|
||||||
|
ptr = tptr;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
lptr = ptr;
|
||||||
|
ptr = ptr->memdata.next;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
spinlock_release(&zombiespin);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Connect to a server
|
* Connect to a server
|
||||||
*
|
*
|
||||||
@ -426,20 +513,14 @@ void printAllDCBs()
|
|||||||
{
|
{
|
||||||
DCB *dcb;
|
DCB *dcb;
|
||||||
|
|
||||||
if (dcbspin == NULL)
|
spinlock_acquire(&dcbspin);
|
||||||
{
|
|
||||||
if ((dcbspin = malloc(sizeof(SPINLOCK))) == NULL)
|
|
||||||
return;
|
|
||||||
spinlock_init(dcbspin);
|
|
||||||
}
|
|
||||||
spinlock_acquire(dcbspin);
|
|
||||||
dcb = allDCBs;
|
dcb = allDCBs;
|
||||||
while (dcb)
|
while (dcb)
|
||||||
{
|
{
|
||||||
printDCB(dcb);
|
printDCB(dcb);
|
||||||
dcb = dcb->next;
|
dcb = dcb->next;
|
||||||
}
|
}
|
||||||
spinlock_release(dcbspin);
|
spinlock_release(&dcbspin);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -451,13 +532,7 @@ void dprintAllDCBs(DCB *pdcb)
|
|||||||
{
|
{
|
||||||
DCB *dcb;
|
DCB *dcb;
|
||||||
|
|
||||||
if (dcbspin == NULL)
|
spinlock_acquire(&dcbspin);
|
||||||
{
|
|
||||||
if ((dcbspin = malloc(sizeof(SPINLOCK))) == NULL)
|
|
||||||
return;
|
|
||||||
spinlock_init(dcbspin);
|
|
||||||
}
|
|
||||||
spinlock_acquire(dcbspin);
|
|
||||||
dcb = allDCBs;
|
dcb = allDCBs;
|
||||||
while (dcb)
|
while (dcb)
|
||||||
{
|
{
|
||||||
@ -475,7 +550,7 @@ DCB *dcb;
|
|||||||
dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts);
|
dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts);
|
||||||
dcb = dcb->next;
|
dcb = dcb->next;
|
||||||
}
|
}
|
||||||
spinlock_release(dcbspin);
|
spinlock_release(&dcbspin);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -524,6 +599,8 @@ gw_dcb_state2string (int state) {
|
|||||||
return "DCB socket closed";
|
return "DCB socket closed";
|
||||||
case DCB_STATE_FREED:
|
case DCB_STATE_FREED:
|
||||||
return "DCB memory could be freed";
|
return "DCB memory could be freed";
|
||||||
|
case DCB_STATE_ZOMBIE:
|
||||||
|
return "DCB Zombie";
|
||||||
default:
|
default:
|
||||||
return "DCB (unknown)";
|
return "DCB (unknown)";
|
||||||
}
|
}
|
||||||
|
@ -49,7 +49,6 @@
|
|||||||
#include <poll.h>
|
#include <poll.h>
|
||||||
|
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
#include <mysql.h>
|
|
||||||
|
|
||||||
#if defined(SS_DEBUG)
|
#if defined(SS_DEBUG)
|
||||||
# include <skygw_utils.h>
|
# include <skygw_utils.h>
|
||||||
@ -248,20 +247,20 @@ return_without_server:
|
|||||||
int
|
int
|
||||||
main(int argc, char **argv)
|
main(int argc, char **argv)
|
||||||
{
|
{
|
||||||
int daemon_mode = 1;
|
int daemon_mode = 1;
|
||||||
sigset_t sigset;
|
sigset_t sigset;
|
||||||
int n, n_threads;
|
int n, n_threads;
|
||||||
void **threads;
|
void **threads;
|
||||||
char buf[1024], *home, *cnf_file = NULL;
|
char buf[1024], *home, *cnf_file = NULL;
|
||||||
int i;
|
|
||||||
|
|
||||||
#if defined(SS_DEBUG)
|
#if defined(SS_DEBUG)
|
||||||
i = atexit(skygw_logmanager_exit);
|
int i;
|
||||||
|
|
||||||
if (i != 0) {
|
i = atexit(skygw_logmanager_exit);
|
||||||
fprintf(stderr, "Couldn't register exit function.\n");
|
|
||||||
}
|
if (i != 0) {
|
||||||
vilhos_test_for_query_classifier();
|
fprintf(stderr, "Couldn't register exit function.\n");
|
||||||
|
}
|
||||||
|
vilhos_test_for_query_classifier();
|
||||||
#endif
|
#endif
|
||||||
if ((home = getenv("GATEWAY_HOME")) != NULL)
|
if ((home = getenv("GATEWAY_HOME")) != NULL)
|
||||||
{
|
{
|
||||||
@ -290,7 +289,7 @@ main(int argc, char **argv)
|
|||||||
if (cnf_file == NULL)
|
if (cnf_file == NULL)
|
||||||
{
|
{
|
||||||
#if defined(SS_DEBUG)
|
#if defined(SS_DEBUG)
|
||||||
skygw_log_write(
|
skygw_log_write(
|
||||||
NULL,
|
NULL,
|
||||||
LOGFILE_ERROR,
|
LOGFILE_ERROR,
|
||||||
strdup("Unable to find a gateway configuration file, either "
|
strdup("Unable to find a gateway configuration file, either "
|
||||||
@ -306,7 +305,7 @@ main(int argc, char **argv)
|
|||||||
if (!config_load(cnf_file))
|
if (!config_load(cnf_file))
|
||||||
{
|
{
|
||||||
#if defined(SS_DEBUG)
|
#if defined(SS_DEBUG)
|
||||||
skygw_log_write(NULL,
|
skygw_log_write(NULL,
|
||||||
LOGFILE_ERROR,
|
LOGFILE_ERROR,
|
||||||
"Failed to load gateway configuration file %s\n");
|
"Failed to load gateway configuration file %s\n");
|
||||||
#endif
|
#endif
|
||||||
@ -357,8 +356,8 @@ main(int argc, char **argv)
|
|||||||
n_threads = config_threadcount();
|
n_threads = config_threadcount();
|
||||||
threads = (void **)calloc(n_threads, sizeof(void *));
|
threads = (void **)calloc(n_threads, sizeof(void *));
|
||||||
for (n = 0; n < n_threads - 1; n++)
|
for (n = 0; n < n_threads - 1; n++)
|
||||||
threads[n] = thread_start(poll_waitevents);
|
threads[n] = thread_start(poll_waitevents, (void *)(n + 1));
|
||||||
poll_waitevents();
|
poll_waitevents((void *)0);
|
||||||
for (n = 0; n < n_threads - 1; n++)
|
for (n = 0; n < n_threads - 1; n++)
|
||||||
thread_wait(threads[n]);
|
thread_wait(threads[n]);
|
||||||
|
|
||||||
|
190
core/gwbitmask.c
Normal file
190
core/gwbitmask.c
Normal file
@ -0,0 +1,190 @@
|
|||||||
|
/*
|
||||||
|
* This file is distributed as part of the SkySQL Gateway. It is free
|
||||||
|
* software: you can redistribute it and/or modify it under the terms of the
|
||||||
|
* GNU General Public License as published by the Free Software Foundation,
|
||||||
|
* version 2.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
||||||
|
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
||||||
|
* details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License along with
|
||||||
|
* this program; if not, write to the Free Software Foundation, Inc., 51
|
||||||
|
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||||
|
*
|
||||||
|
* Copyright SkySQL Ab 2013
|
||||||
|
*/
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <string.h>
|
||||||
|
#include <gwbitmask.h>
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @file gwbitmask.c - Implementation of bitmask opertions for the gateway
|
||||||
|
*
|
||||||
|
* We provide basic bitmask manipulation routines, the size of
|
||||||
|
* the bitmask will grow dynamically based on the highest bit
|
||||||
|
* number that is set or cleared within the bitmask.
|
||||||
|
*
|
||||||
|
* Bitmsk growth happens in increments rather than via a single bit as
|
||||||
|
* a time.
|
||||||
|
*
|
||||||
|
* @verbatim
|
||||||
|
* Revision History
|
||||||
|
*
|
||||||
|
* Date Who Description
|
||||||
|
* 28/06/13 Mark Riddoch Initial implementation
|
||||||
|
*
|
||||||
|
* @endverbatim
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialise a bitmask
|
||||||
|
*
|
||||||
|
* @param bitmask Pointer the bitmask
|
||||||
|
* @return The value of *variable before the add occured
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
bitmask_init(GWBITMASK *bitmask)
|
||||||
|
{
|
||||||
|
bitmask->length = BIT_LENGTH_INITIAL;
|
||||||
|
bitmask->bits = malloc(bitmask->length / 8);
|
||||||
|
memset(bitmask->bits, 0, bitmask->length / 8);
|
||||||
|
spinlock_init(&bitmask->lock);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the bit at the specified bit position in the bitmask.
|
||||||
|
* The bitmask will automatically be extended if the bit is
|
||||||
|
* beyond the current bitmask length
|
||||||
|
*
|
||||||
|
* @param bitmask Pointer the bitmask
|
||||||
|
* @param bit Bit to set
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
bitmask_set(GWBITMASK *bitmask, int bit)
|
||||||
|
{
|
||||||
|
unsigned char *ptr;
|
||||||
|
unsigned char mask;
|
||||||
|
|
||||||
|
spinlock_acquire(&bitmask->lock);
|
||||||
|
if (bit >= bitmask->length)
|
||||||
|
{
|
||||||
|
bitmask->bits = realloc(bitmask->bits,
|
||||||
|
(bitmask->length + BIT_LENGTH_INC) / 8);
|
||||||
|
memset(bitmask + (bitmask->length / 8), 0,
|
||||||
|
BIT_LENGTH_INC / 8);
|
||||||
|
bitmask->length += (BIT_LENGTH_INC / 8);
|
||||||
|
}
|
||||||
|
ptr = bitmask->bits + (bit / 8);
|
||||||
|
mask = 1 << (bit % 8);
|
||||||
|
*ptr |= mask;
|
||||||
|
spinlock_release(&bitmask->lock);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clear the bit at the specified bit position in the bitmask.
|
||||||
|
* The bitmask will automatically be extended if the bit is
|
||||||
|
* beyond the current bitmask length
|
||||||
|
*
|
||||||
|
* @param bitmask Pointer the bitmask
|
||||||
|
* @param bit Bit to clear
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
bitmask_clear(GWBITMASK *bitmask, int bit)
|
||||||
|
{
|
||||||
|
unsigned char *ptr;
|
||||||
|
unsigned char mask;
|
||||||
|
|
||||||
|
if (bit >= bitmask->length)
|
||||||
|
{
|
||||||
|
bitmask->bits = realloc(bitmask->bits,
|
||||||
|
(bitmask->length + BIT_LENGTH_INC) / 8);
|
||||||
|
memset(bitmask + (bitmask->length / 8), 0,
|
||||||
|
BIT_LENGTH_INC / 8);
|
||||||
|
bitmask->length += (BIT_LENGTH_INC / 8);
|
||||||
|
}
|
||||||
|
ptr = bitmask->bits + (bit / 8);
|
||||||
|
mask = 1 << (bit % 8);
|
||||||
|
*ptr &= ~mask;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return a non-zero value if the bit at the specified bit
|
||||||
|
* position in the bitmask is set.
|
||||||
|
* The bitmask will automatically be extended if the bit is
|
||||||
|
* beyond the current bitmask length
|
||||||
|
*
|
||||||
|
* @param bitmask Pointer the bitmask
|
||||||
|
* @param bit Bit to clear
|
||||||
|
*/
|
||||||
|
int
|
||||||
|
bitmask_isset(GWBITMASK *bitmask, int bit)
|
||||||
|
{
|
||||||
|
unsigned char *ptr;
|
||||||
|
unsigned char mask;
|
||||||
|
|
||||||
|
spinlock_acquire(&bitmask->lock);
|
||||||
|
if (bit >= bitmask->length)
|
||||||
|
{
|
||||||
|
bitmask->bits = realloc(bitmask->bits,
|
||||||
|
(bitmask->length + BIT_LENGTH_INC) / 8);
|
||||||
|
memset(bitmask + (bitmask->length / 8), 0,
|
||||||
|
BIT_LENGTH_INC / 8);
|
||||||
|
bitmask->length += (BIT_LENGTH_INC / 8);
|
||||||
|
}
|
||||||
|
ptr = bitmask->bits + (bit / 8);
|
||||||
|
mask = 1 << (bit % 8);
|
||||||
|
spinlock_release(&bitmask->lock);
|
||||||
|
return *ptr & mask;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return a non-zero value of the bitmask has no bits set
|
||||||
|
* in it.
|
||||||
|
*
|
||||||
|
* @param bitmask Pointer the bitmask
|
||||||
|
* @return Non-zero if the bitmask has no bits set
|
||||||
|
*/
|
||||||
|
int
|
||||||
|
bitmask_isallclear(GWBITMASK *bitmask)
|
||||||
|
{
|
||||||
|
unsigned char *ptr, *eptr;
|
||||||
|
|
||||||
|
spinlock_acquire(&bitmask->lock);
|
||||||
|
ptr = bitmask->bits;
|
||||||
|
eptr = ptr + (bitmask->length / 8);
|
||||||
|
while (ptr < eptr)
|
||||||
|
{
|
||||||
|
if (*ptr != 0)
|
||||||
|
{
|
||||||
|
spinlock_release(&bitmask->lock);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
ptr++;
|
||||||
|
}
|
||||||
|
spinlock_release(&bitmask->lock);
|
||||||
|
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @param bitmask Pointer the bitmask
|
||||||
|
* @param value Value to be added
|
||||||
|
* @return The value of *variable before the add occured
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
bitmask_copy(GWBITMASK *dest, GWBITMASK *src)
|
||||||
|
{
|
||||||
|
spinlock_acquire(&src->lock);
|
||||||
|
spinlock_acquire(&dest->lock);
|
||||||
|
if (dest->length)
|
||||||
|
free(dest->bits);
|
||||||
|
dest->bits = malloc(src->length / 8);
|
||||||
|
dest->length = src->length;
|
||||||
|
memcpy(dest->bits, src->bits, src->length / 8);
|
||||||
|
spinlock_release(&dest->lock);
|
||||||
|
spinlock_release(&src->lock);
|
||||||
|
}
|
||||||
|
|
35
core/poll.c
35
core/poll.c
@ -23,6 +23,7 @@
|
|||||||
#include <poll.h>
|
#include <poll.h>
|
||||||
#include <dcb.h>
|
#include <dcb.h>
|
||||||
#include <atomic.h>
|
#include <atomic.h>
|
||||||
|
#include <gwbitmask.h>
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @file poll.c - Abstraction of the epoll functionality
|
* @file poll.c - Abstraction of the epoll functionality
|
||||||
@ -32,13 +33,15 @@
|
|||||||
*
|
*
|
||||||
* Date Who Description
|
* Date Who Description
|
||||||
* 19/06/13 Mark Riddoch Initial implementation
|
* 19/06/13 Mark Riddoch Initial implementation
|
||||||
|
* 28/06/13 Mark Riddoch Added poll mask support and DCB
|
||||||
|
* zombie management
|
||||||
*
|
*
|
||||||
* @endverbatim
|
* @endverbatim
|
||||||
*/
|
*/
|
||||||
|
|
||||||
static int epoll_fd = -1; /**< The epoll file descriptor */
|
static int epoll_fd = -1; /**< The epoll file descriptor */
|
||||||
static int shutdown = 0; /**< Flag the shutdown of the poll subsystem */
|
static int shutdown = 0; /**< Flag the shutdown of the poll subsystem */
|
||||||
|
static GWBITMASK poll_mask;
|
||||||
/**
|
/**
|
||||||
* The polling statistics
|
* The polling statistics
|
||||||
*/
|
*/
|
||||||
@ -68,6 +71,7 @@ poll_init()
|
|||||||
exit(-1);
|
exit(-1);
|
||||||
}
|
}
|
||||||
memset(&pollStats, 0, sizeof(pollStats));
|
memset(&pollStats, 0, sizeof(pollStats));
|
||||||
|
bitmask_init(&poll_mask);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -111,7 +115,8 @@ struct epoll_event ev;
|
|||||||
* The main polling loop
|
* The main polling loop
|
||||||
*
|
*
|
||||||
* This routine does the polling and despatches of IO events
|
* This routine does the polling and despatches of IO events
|
||||||
* to the DCB's
|
* to the DCB's. It may be called either directly or as the entry point
|
||||||
|
* of a polling thread within the gateway.
|
||||||
*
|
*
|
||||||
* The routine will loop as long as the variable "shutdown" is set to zero,
|
* The routine will loop as long as the variable "shutdown" is set to zero,
|
||||||
* setting this to a non-zero value will cause the polling loop to return.
|
* setting this to a non-zero value will cause the polling loop to return.
|
||||||
@ -126,13 +131,17 @@ struct epoll_event ev;
|
|||||||
* with timeout. The call with the timeout differs in that the Linux scheduler may
|
* with timeout. The call with the timeout differs in that the Linux scheduler may
|
||||||
* deschedule a process if a timeout is included, but will not do this if a 0 timeout
|
* deschedule a process if a timeout is included, but will not do this if a 0 timeout
|
||||||
* value is given. this improves performance when the gateway is under heavy load.
|
* value is given. this improves performance when the gateway is under heavy load.
|
||||||
|
*
|
||||||
|
* @parm arg The thread ID passed as a void * to satisfy the threading package
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
poll_waitevents()
|
poll_waitevents(void *arg)
|
||||||
{
|
{
|
||||||
struct epoll_event events[MAX_EVENTS];
|
struct epoll_event events[MAX_EVENTS];
|
||||||
int i, nfds;
|
int i, nfds;
|
||||||
|
int thread_id = (int)arg;
|
||||||
|
|
||||||
|
bitmask_set(&poll_mask, thread_id);
|
||||||
while (1)
|
while (1)
|
||||||
{
|
{
|
||||||
#if BLOCKINGPOLL
|
#if BLOCKINGPOLL
|
||||||
@ -158,6 +167,9 @@ int i, nfds;
|
|||||||
DCB *dcb = (DCB *)events[i].data.ptr;
|
DCB *dcb = (DCB *)events[i].data.ptr;
|
||||||
__uint32_t ev = events[i].events;
|
__uint32_t ev = events[i].events;
|
||||||
|
|
||||||
|
if (DCB_ISZOMBIE(dcb))
|
||||||
|
continue;
|
||||||
|
|
||||||
if (ev & EPOLLERR)
|
if (ev & EPOLLERR)
|
||||||
{
|
{
|
||||||
atomic_add(&pollStats.n_error, 1);
|
atomic_add(&pollStats.n_error, 1);
|
||||||
@ -188,8 +200,10 @@ int i, nfds;
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
dcb_process_zombies(thread_id);
|
||||||
if (shutdown)
|
if (shutdown)
|
||||||
{
|
{
|
||||||
|
bitmask_clear(&poll_mask, thread_id);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -204,6 +218,17 @@ poll_shutdown()
|
|||||||
shutdown = 1;
|
shutdown = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the bitmask of polling threads
|
||||||
|
*
|
||||||
|
* @return The bitmask of the running polling threads
|
||||||
|
*/
|
||||||
|
GWBITMASK *
|
||||||
|
poll_bitmask()
|
||||||
|
{
|
||||||
|
return &poll_mask;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Debug routine to print the polling statistics
|
* Debug routine to print the polling statistics
|
||||||
*
|
*
|
||||||
|
@ -34,14 +34,15 @@
|
|||||||
* Start a polling thread
|
* Start a polling thread
|
||||||
*
|
*
|
||||||
* @param entry The entry point to call
|
* @param entry The entry point to call
|
||||||
|
* @param arg The argument to pass the thread entry point
|
||||||
* @return The thread handle
|
* @return The thread handle
|
||||||
*/
|
*/
|
||||||
void *
|
void *
|
||||||
thread_start(void (*entry)())
|
thread_start(void (*entry)(void *), void *arg)
|
||||||
{
|
{
|
||||||
pthread_t thd;
|
pthread_t thd;
|
||||||
|
|
||||||
if (pthread_create(&thd, NULL, (void *(*)(void *))entry, NULL) != 0)
|
if (pthread_create(&thd, NULL, (void *(*)(void *))entry, arg) != 0)
|
||||||
{
|
{
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -19,6 +19,7 @@
|
|||||||
*/
|
*/
|
||||||
#include <spinlock.h>
|
#include <spinlock.h>
|
||||||
#include <buffer.h>
|
#include <buffer.h>
|
||||||
|
#include <gwbitmask.h>
|
||||||
|
|
||||||
struct session;
|
struct session;
|
||||||
struct server;
|
struct server;
|
||||||
@ -87,6 +88,29 @@ typedef struct dcbstats {
|
|||||||
int n_buffered; /**< Number of buffered writes */
|
int n_buffered; /**< Number of buffered writes */
|
||||||
} DCBSTATS;
|
} DCBSTATS;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The data structure that is embedded witin a DCB and manages the complex memory
|
||||||
|
* management issues of a DCB.
|
||||||
|
*
|
||||||
|
* The DCB structures are used as the user data within the polling loop. This means that
|
||||||
|
* polling threads may aschronously wake up and access these structures. It is not possible
|
||||||
|
* to simple remove the DCB from the epoll system and then free the data, as every thread
|
||||||
|
* that is currently running an epoll call must wake up and re-issue the epoll_wait system
|
||||||
|
* call, the is the only way we can be sure that no polling thread is pending a wakeup or
|
||||||
|
* processing an event that will access the DCB.
|
||||||
|
*
|
||||||
|
* We solve this issue by making the dcb_free routine merely mark a DCB as a zombie and
|
||||||
|
* place it on a special zombie list. Before placing the DCB on the zombie list we create
|
||||||
|
* a bitmask with a bit set in it for each active polling thread. Each thread will call
|
||||||
|
* a routine to process the zombie list at the end of the polling loop. This routine
|
||||||
|
* will clear the bit value that corresponds to the calling thread. Once the bitmask
|
||||||
|
* is completely cleared the DCB can finally be freed and removed from the zombie list.
|
||||||
|
*/
|
||||||
|
typedef struct {
|
||||||
|
GWBITMASK bitmask; /**< The bitmask of threads */
|
||||||
|
struct dcb *next; /**< Next pointer for the zombie list */
|
||||||
|
} DCBMM;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Descriptor Control Block
|
* Descriptor Control Block
|
||||||
*
|
*
|
||||||
@ -114,6 +138,7 @@ typedef struct dcb {
|
|||||||
struct dcb *next; /**< Next DCB in the chain of allocated DCB's */
|
struct dcb *next; /**< Next DCB in the chain of allocated DCB's */
|
||||||
struct service *service; /**< The related service */
|
struct service *service; /**< The related service */
|
||||||
void *data; /**< Specific client data */
|
void *data; /**< Specific client data */
|
||||||
|
DCBMM memdata; /**< The data related to DCB memory management */
|
||||||
} DCB;
|
} DCB;
|
||||||
|
|
||||||
/* DCB states */
|
/* DCB states */
|
||||||
@ -124,10 +149,12 @@ typedef struct dcb {
|
|||||||
#define DCB_STATE_LISTENING 5 /**< The DCB is for a listening socket */
|
#define DCB_STATE_LISTENING 5 /**< The DCB is for a listening socket */
|
||||||
#define DCB_STATE_DISCONNECTED 6 /**< The socket is now closed */
|
#define DCB_STATE_DISCONNECTED 6 /**< The socket is now closed */
|
||||||
#define DCB_STATE_FREED 7 /**< Memory freed */
|
#define DCB_STATE_FREED 7 /**< Memory freed */
|
||||||
|
#define DCB_STATE_ZOMBIE 8 /**< DCB is no longer active, waiting to free it */
|
||||||
|
|
||||||
/* A few useful macros */
|
/* A few useful macros */
|
||||||
#define DCB_SESSION(x) (x)->session
|
#define DCB_SESSION(x) (x)->session
|
||||||
#define DCB_PROTOCOL(x, type) (type *)((x)->protocol)
|
#define DCB_PROTOCOL(x, type) (type *)((x)->protocol)
|
||||||
|
#define DCB_ISZOMBIE(x) ((x)->state == DCB_STATE_ZOMBIE)
|
||||||
|
|
||||||
extern DCB *dcb_alloc(); /* Allocate a DCB */
|
extern DCB *dcb_alloc(); /* Allocate a DCB */
|
||||||
extern void dcb_free(DCB *); /* Free a DCB */
|
extern void dcb_free(DCB *); /* Free a DCB */
|
||||||
@ -136,6 +163,7 @@ extern int dcb_read(DCB *, GWBUF **); /* Generic read routine */
|
|||||||
extern int dcb_write(DCB *, GWBUF *); /* Generic write routine */
|
extern int dcb_write(DCB *, GWBUF *); /* Generic write routine */
|
||||||
extern int dcb_drain_writeq(DCB *); /* Generic write routine */
|
extern int dcb_drain_writeq(DCB *); /* Generic write routine */
|
||||||
extern void dcb_close(DCB *); /* Generic close functionality */
|
extern void dcb_close(DCB *); /* Generic close functionality */
|
||||||
|
extern void dcb_process_zombies(int); /* Process Zombies */
|
||||||
extern void printAllDCBs(); /* Debug to print all DCB in the system */
|
extern void printAllDCBs(); /* Debug to print all DCB in the system */
|
||||||
extern void printDCB(DCB *); /* Debug print routine */
|
extern void printDCB(DCB *); /* Debug print routine */
|
||||||
extern void dprintAllDCBs(DCB *); /* Debug to print all DCB in the system */
|
extern void dprintAllDCBs(DCB *); /* Debug to print all DCB in the system */
|
||||||
|
52
include/gwbitmask.h
Normal file
52
include/gwbitmask.h
Normal file
@ -0,0 +1,52 @@
|
|||||||
|
#ifndef _GWBITMASK_H
|
||||||
|
#define _GWBITMASK_H
|
||||||
|
/*
|
||||||
|
* This file is distributed as part of the SkySQL Gateway. It is free
|
||||||
|
* software: you can redistribute it and/or modify it under the terms of the
|
||||||
|
* GNU General Public License as published by the Free Software Foundation,
|
||||||
|
* version 2.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
||||||
|
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
||||||
|
* details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License along with
|
||||||
|
* this program; if not, write to the Free Software Foundation, Inc., 51
|
||||||
|
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||||
|
*
|
||||||
|
* Copyright SkySQL Ab 2013
|
||||||
|
*/
|
||||||
|
#include <spinlock.h>
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @file gwbitmask.h An implementation of an arbitarly long bitmask
|
||||||
|
*
|
||||||
|
* @verbatim
|
||||||
|
* Revision History
|
||||||
|
*
|
||||||
|
* Date Who Description
|
||||||
|
* 28/06/13 Mark Riddoch Initial implementation
|
||||||
|
*
|
||||||
|
* @endverbatim
|
||||||
|
*/
|
||||||
|
|
||||||
|
#define BIT_LENGTH_INITIAL 32 /**< Initial number of bits in the bitmask */
|
||||||
|
#define BIT_LENGTH_INC 32 /**< Number of bits to add on each increment */
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The bitmask structure used to store an arbitary large bitmask
|
||||||
|
*/
|
||||||
|
typedef struct {
|
||||||
|
SPINLOCK lock; /**< Lock to protect the bitmask */
|
||||||
|
unsigned char *bits; /**< Pointer to the bits themselves */
|
||||||
|
unsigned int length; /**< The number of bits in the bitmask */
|
||||||
|
} GWBITMASK;
|
||||||
|
|
||||||
|
extern void bitmask_init(GWBITMASK *);
|
||||||
|
extern void bitmask_set(GWBITMASK *, int);
|
||||||
|
extern void bitmask_clear(GWBITMASK *, int);
|
||||||
|
extern int bitmask_isset(GWBITMASK *, int);
|
||||||
|
extern int bitmask_isallclear(GWBITMASK *);
|
||||||
|
extern void bitmask_copy(GWBITMASK *, GWBITMASK *);
|
||||||
|
#endif
|
@ -18,6 +18,7 @@
|
|||||||
* Copyright SkySQL Ab 2013
|
* Copyright SkySQL Ab 2013
|
||||||
*/
|
*/
|
||||||
#include <dcb.h>
|
#include <dcb.h>
|
||||||
|
#include <gwbitmask.h>
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @file poll.h The poll related functionality
|
* @file poll.h The poll related functionality
|
||||||
@ -33,10 +34,11 @@
|
|||||||
#define MAX_EVENTS 1000
|
#define MAX_EVENTS 1000
|
||||||
#define EPOLL_TIMEOUT 1000 /**< The epoll timeout we use (milliseconds) */
|
#define EPOLL_TIMEOUT 1000 /**< The epoll timeout we use (milliseconds) */
|
||||||
|
|
||||||
extern void poll_init();
|
extern void poll_init();
|
||||||
extern int poll_add_dcb(DCB *);
|
extern int poll_add_dcb(DCB *);
|
||||||
extern int poll_remove_dcb(DCB *);
|
extern int poll_remove_dcb(DCB *);
|
||||||
extern void poll_waitevents();
|
extern void poll_waitevents(void *);
|
||||||
extern void poll_shutdown();
|
extern void poll_shutdown();
|
||||||
extern void dprintPollStats(DCB *);
|
extern GWBITMASK *poll_bitmask();
|
||||||
|
extern void dprintPollStats(DCB *);
|
||||||
#endif
|
#endif
|
||||||
|
@ -19,10 +19,19 @@
|
|||||||
*/
|
*/
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @file thread.h The gateway threading interface
|
||||||
|
*
|
||||||
|
* An encapsulation of the threading used by the gateway. This is designed to
|
||||||
|
* isolate the majority of the gateway code from th epthread library, enabling
|
||||||
|
* the gateway to be ported to a different threading package with the minimum
|
||||||
|
* of changes.
|
||||||
|
*/
|
||||||
|
|
||||||
#define THREAD pthread_t
|
#define THREAD pthread_t
|
||||||
#define THREAD_SHELF pthread_self
|
#define THREAD_SHELF pthread_self
|
||||||
|
|
||||||
extern void *thread_start(void (*entry)());
|
extern void *thread_start(void (*entry)(void *), void *arg);
|
||||||
extern void thread_wait(void *thd);
|
extern void thread_wait(void *thd);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
@ -55,7 +55,7 @@ libdebugcli.so: $(DEBUGCLIOBJ)
|
|||||||
$(CC) $(LDFLAGS) $(DEBUGCLIOBJ) $(LIBS) -o $@
|
$(CC) $(LDFLAGS) $(DEBUGCLIOBJ) $(LIBS) -o $@
|
||||||
|
|
||||||
libreadwritesplit.so:
|
libreadwritesplit.so:
|
||||||
(cd readwritesplit; make; cp $@ ..)
|
# (cd readwritesplit; make; cp $@ ..)
|
||||||
|
|
||||||
.c.o:
|
.c.o:
|
||||||
$(CC) $(CFLAGS) $< -o $@
|
$(CC) $(CFLAGS) $< -o $@
|
||||||
|
@ -121,7 +121,7 @@ ModuleInit()
|
|||||||
ROUTER_OBJECT *
|
ROUTER_OBJECT *
|
||||||
GetModuleObject()
|
GetModuleObject()
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Returing test router module object.\n");
|
fprintf(stderr, "Returning test router module object.\n");
|
||||||
return &MyObject;
|
return &MyObject;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user