Merge branch 'release-1.0beta-refresh' into blr

This commit is contained in:
Mark Riddoch
2014-09-16 13:22:03 +01:00
191 changed files with 11142 additions and 1012 deletions

39
.gitignore vendored
View File

@ -1,4 +1,35 @@
server/core/tags
server/core/maxscale
server/core/maxkeys
server/core/maxpasswd
# Object files
*.o
*.ko
*.lo
# Libraries
*.lib
*.a
*.la
# Shared objects (inc. Windows DLLs)
*.dll
*.so
*.so.*
*.dylib
# Executables
*.exe
*.out
*.app
# log files (from testing etc.)
*.log
# "make depend" generated stuff
depend
depend.mk
# various auto-backup stuff
*~
*#
.#*
# Vi swap files
.*.swp

Binary file not shown.

Binary file not shown.

View File

@ -42,6 +42,7 @@ all:
(cd client; make)
clean:
echo '#define MAXSCALE_VERSION "'`cat $(ROOT_PATH)/VERSION`'"' > $(ROOT_PATH)/server/include/version.h
(cd log_manager; make clean)
(cd query_classifier; make clean)
(cd server; make clean)

2
README
View File

@ -182,7 +182,7 @@ on localhost:
* a master on port 3000, with server_id=2
* a slave on port 3001, server_id doesn't matter
* a slave on port 2002, server_id doesn't matter
* a slave on port 3002, server_id doesn't matter
On the master full privileges on the databases "test" and "FOO"
are needed, on the saves SELECT permissions on test.* should

View File

@ -12,7 +12,7 @@
#
# Set debug flags
#
DEBUG :=
DEBUG := ${MAXSCALE_DEBUG}
#
# Set build env
@ -22,7 +22,7 @@ UNIX := Y
#
# Set MaxScale branch directory
#
ROOT_PATH := $(HOME)/src/bazaar/tmp/maxscale
ROOT_PATH := $(HOME)/${MAXSCALE_SOURCE}
INC_PATH := $(HOME)/usr/include
#
@ -38,7 +38,7 @@ MYSQL_HEADERS := -I$(INC_PATH) -I$(MYSQL_ROOT)/ -I$(MYSQL_ROOT)/private/ -I$(MYS
#
# Set DYNLIB=Y if you want to link MaxScale with dynamic embedded lib
#
DYNLIB :=
DYNLIB := ${MAXSCALE_DYNLIB}
#
# Set path to Embedded MySQL Server
@ -61,3 +61,8 @@ PROFILE := N
# Build a binary that produces code coverage data
#
GCOV := N
# Build optional RabbitMQ filter
# Requires librabbitmq-devel
#
BUILD_RABBITMQ := N

2
client/.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
# binaries generated here
maxadmin

View File

@ -20,6 +20,9 @@
# client program
# 18/06/14 Mark Riddoch Addition of conditional for histedit
include ../build_gateway.inc
include ../makefile.inc
ifeq ($(wildcard /usr/include/histedit.h), )
HISTLIB=
HISTFLAG=
@ -63,13 +66,14 @@ maxadmin: $(OBJ)
clean:
rm -f $(OBJ) maxadmin
$(DEL) $(OBJ) maxadmin
$(DEL) *.so
tags:
ctags $(SRCS) $(HDRS)
depend:
@rm -f depend.mk
@$(DEL) depend.mk
cc -M $(CFLAGS) $(SRCS) > depend.mk
install: maxadmin

View File

@ -58,6 +58,7 @@ static int authMaxScale(int so, char *user, char *password);
static int sendCommand(int so, char *cmd);
static void DoSource(int so, char *cmd);
static void DoUsage();
static int isquit(char *buf);
#ifdef HISTORY
static char *
@ -289,7 +290,7 @@ int argno = 0;
history(hist, &ev, H_ENTER, buf);
#endif
if (!strcasecmp(buf, "quit"))
if (isquit(buf))
{
break;
}
@ -552,3 +553,23 @@ DoUsage()
printf("Any remaining arguments are treated as MaxScale commands or a file\n");
printf("containing commands to execute.\n");
}
/**
* Check command to see if it is a quit command
*
* @param buf The command buffer
* @return Non-zero if the command should cause maxadmin to quit
*/
static int
isquit(char *buf)
{
char *ptr = buf;
if (!buf)
return 0;
while (*ptr && isspace(*ptr))
ptr++;
if (strncasecmp(ptr, "quit", 4) == 0 || strncasecmp(ptr, "exit", 4) == 0)
return 1;
return 0;
}

View File

@ -1,5 +0,0 @@
*.o
*.so
*.so.*
depend.mk

View File

@ -45,6 +45,12 @@
extern char *program_invocation_name;
extern char *program_invocation_short_name;
#if defined(SS_DEBUG)
static int write_index;
static int block_start_index;
static int prevval;
static simple_mutex_t msg_mutex;
#endif
/**
* Variable holding the enabled logfiles information.
* Used from log users to check enabled logs prior calling
@ -113,7 +119,7 @@ typedef struct blockbuf_st {
skygw_chk_t bb_chk_top;
#endif
logfile_id_t bb_fileid;
bool bb_isfull; /**< closed for disk write */
blockbuf_state_t bb_state; /**State of the block buffer*/
simple_mutex_t bb_mutex; /**< bb_buf_used, bb_isfull */
int bb_refcount; /**< protected by list mutex. #of clients */
// int bb_blankcount; /**< # of blanks used btw strings */
@ -255,11 +261,7 @@ static int logmanager_write_log(
static blockbuf_t* blockbuf_init(logfile_id_t id);
static void blockbuf_node_done(void* bb_data);
static char* blockbuf_get_writepos(
#if 0
int** refcount,
#else
blockbuf_t** p_bb,
#endif
logfile_id_t id,
size_t str_len,
bool flush);
@ -337,6 +339,10 @@ static bool logmanager_init_nomutex(
#if defined(SS_DEBUG)
lm->lm_chk_top = CHK_NUM_LOGMANAGER;
lm->lm_chk_tail = CHK_NUM_LOGMANAGER;
write_index = 0;
block_start_index = 0;
prevval = -1;
simple_mutex_init(&msg_mutex, "Message mutex");
#endif
lm->lm_clientmes = skygw_message_init();
lm->lm_logmes = skygw_message_init();
@ -653,31 +659,86 @@ static int logmanager_write_log(
int safe_str_len;
timestamp_len = get_timestamp_len();
safe_str_len = MIN(timestamp_len-1+str_len, lf->lf_buf_size);
/** Findout how much can be safely written with current block size */
if (timestamp_len-1+str_len > lf->lf_buf_size)
{
safe_str_len = lf->lf_buf_size;
}
else
{
safe_str_len = timestamp_len-1+str_len;
}
/**
* Seek write position and register to block buffer.
* Then print formatted string to write position.
*/
#if defined (SS_LOG_DEBUG)
{
char *copy,*tok;
int tokval;
simple_mutex_lock(&msg_mutex,true);
copy = strdup(str);
tok = strtok(copy,"|");
tok = strtok(NULL,"|");
if(strstr(str,"message|") && tok){
tokval = atoi(tok);
if(prevval > 0){
ss_dassert(tokval == (prevval + 1));
}
prevval = tokval;
}
free(copy);
simple_mutex_unlock(&msg_mutex);
}
#endif
wp = blockbuf_get_writepos(&bb,
id,
safe_str_len,
flush);
#if defined (SS_LOG_DEBUG)
{
sprintf(wp,"[msg:%d]",atomic_add(&write_index,1));
safe_str_len -= strlen(wp);
wp += strlen(wp);
}
#endif
/**
* Write timestamp with at most <timestamp_len> characters
* to wp.
* Returned timestamp_len doesn't include terminating null.
*/
timestamp_len = snprint_timestamp(wp, timestamp_len);
/**
* Write next string to overwrite terminating null character
* of the timestamp string.
*/
if (use_valist) {
vsnprintf(wp+timestamp_len, safe_str_len, str, valist);
vsnprintf(wp+timestamp_len, safe_str_len-timestamp_len, str, valist);
} else {
snprintf(wp+timestamp_len, safe_str_len, "%s", str);
snprintf(wp+timestamp_len, safe_str_len-timestamp_len, "%s", str);
}
/** write to syslog */
if (lf->lf_write_syslog)
{
@ -694,12 +755,7 @@ static int logmanager_write_log(
break;
}
}
/** remove double line feed */
if (wp[timestamp_len+str_len-2] == '\n') {
wp[timestamp_len+str_len-2]=' ';
}
wp[timestamp_len+str_len-1]='\n';
wp[safe_str_len-1] = '\n';
blockbuf_unregister(bb);
/**
@ -753,7 +809,7 @@ static int logmanager_write_log(
wp_c[timestamp_len-1+str_len-1]='\n';
/** lock-free unregistration, includes flush if
* bb_isfull */
* bb_state == BB_FULL */
blockbuf_unregister(bb_c);
}
} /* if (spread_down) */
@ -784,7 +840,7 @@ static void blockbuf_unregister(
/**
* if this is the last client in a full buffer, send write request.
*/
if (atomic_add(&bb->bb_refcount, -1) == 1 && bb->bb_isfull) {
if (atomic_add(&bb->bb_refcount, -1) == 1 && bb->bb_state == BB_FULL) {
skygw_message_send(lf->lf_logmes);
}
ss_dassert(bb->bb_refcount >= 0);
@ -816,12 +872,12 @@ static char* blockbuf_get_writepos(
size_t str_len,
bool flush)
{
logfile_t* lf;
mlist_t* bb_list;
char* pos = NULL;
mlist_node_t* node;
blockbuf_t* bb;
ss_debug(bool succp;)
logfile_t* lf;
mlist_t* bb_list;
char* pos = NULL;
mlist_node_t* node;
blockbuf_t* bb;
ss_debug(bool succp;)
CHK_LOGMANAGER(lm);
@ -838,6 +894,7 @@ static char* blockbuf_get_writepos(
* At least block buffer exists on the list.
*/
node = bb_list->mlist_first;
/** Loop over blockbuf list to find write position */
while (true) {
@ -852,22 +909,25 @@ static char* blockbuf_get_writepos(
/** Lock buffer */
simple_mutex_lock(&bb->bb_mutex, true);
if (bb->bb_isfull || bb->bb_buf_left < str_len) {
if (bb->bb_state == BB_FULL || bb->bb_buf_left < str_len) {
/**
* This block buffer is too full.
* Send flush request to file writer thread. This causes
* flushing all buffers, and (eventually) frees buffer space.
*/
blockbuf_register(bb);
bb->bb_isfull = true;
blockbuf_unregister(bb);
blockbuf_register(bb);
bb->bb_state = BB_FULL;
blockbuf_unregister(bb);
/** Unlock buffer */
simple_mutex_unlock(&bb->bb_mutex);
/** Lock list */
simple_mutex_lock(&bb_list->mlist_mutex, true);
/** Unlock buffer */
simple_mutex_unlock(&bb->bb_mutex);
/** Lock list */
simple_mutex_lock(&bb_list->mlist_mutex, true);
/**
* If next node exists move forward. Else check if there is
* space for a new block buffer on the list.
@ -916,7 +976,28 @@ static char* blockbuf_get_writepos(
node = bb_list->mlist_first;
continue;
}
} else {
}else if(bb->bb_state == BB_CLEARED){
/**
*Move the full buffer to the end of the list
*/
simple_mutex_unlock(&bb->bb_mutex);
simple_mutex_lock(&bb_list->mlist_mutex, true);
if(node->mlnode_next){
bb_list->mlist_first = node->mlnode_next;
bb_list->mlist_last->mlnode_next = node;
node->mlnode_next = NULL;
bb_list->mlist_last = node;
node = bb_list->mlist_first;
}
bb->bb_state = BB_READY;
}else if (bb->bb_state == BB_READY){
/**
* There is space for new log string.
*/
@ -924,9 +1005,11 @@ static char* blockbuf_get_writepos(
}
} /** while (true) */
} else {
/**
* Create the first block buffer to logfile's blockbuf list.
*/
bb = blockbuf_init(id);
CHK_BLOCKBUF(bb);
@ -952,7 +1035,7 @@ static char* blockbuf_get_writepos(
} /* if (bb_list->mlist_nodecount > 0) */
ss_dassert(pos == NULL);
ss_dassert(!(bb->bb_isfull || bb->bb_buf_left < str_len));
ss_dassert(!(bb->bb_state == BB_FULL || bb->bb_buf_left < str_len));
ss_dassert(bb_list->mlist_nodecount <= bb_list->mlist_nodecount_max);
/**
@ -991,7 +1074,7 @@ static char* blockbuf_get_writepos(
* If flush flag is set, set buffer full. As a consequence, no-one
* can write to it before it is flushed to disk.
*/
bb->bb_isfull = (flush == true ? true : bb->bb_isfull);
bb->bb_state = (flush == true ? BB_FULL : bb->bb_state);
/** Unlock buffer */
simple_mutex_unlock(&bb->bb_mutex);
@ -1021,6 +1104,12 @@ static blockbuf_t* blockbuf_init(
bb->bb_buf_left = MAX_LOGSTRLEN;
bb->bb_buf_size = MAX_LOGSTRLEN;
#if defined(SS_LOG_DEBUG)
sprintf(bb->bb_buf,"[block:%d]",atomic_add(&block_start_index,1));
bb->bb_buf_used += strlen(bb->bb_buf);
bb->bb_buf_left -= strlen(bb->bb_buf);
#endif
CHK_BLOCKBUF(bb);
return bb;
}
@ -1170,6 +1259,9 @@ int skygw_log_write_flush(
/**
* Find out the length of log string (to be formatted str).
*/
va_start(valist, str);
len = vsnprintf(NULL, 0, str, valist);
va_end(valist);
@ -1220,6 +1312,9 @@ int skygw_log_write(
err = 1;
goto return_unregister;
}
/**
* Find out the length of log string (to be formatted str).
*/
@ -1233,6 +1328,7 @@ int skygw_log_write(
/**
* Write log string to buffer and add to file write list.
*/
va_start(valist, str);
err = logmanager_write_log(id, false, true, true, len, str, valist);
va_end(valist);
@ -1507,7 +1603,7 @@ static bool fnames_conf_init(
ss_dfprintf(stderr, "%s ", argv[i]);
}
ss_dfprintf(stderr, "\n");*/
#if defined(NOT_USED)
fprintf(stderr,
"Error log :\t%s/%s1%s\n"
"Message log :\t%s/%s1%s\n"
@ -1525,7 +1621,7 @@ static bool fnames_conf_init(
fn->fn_logpath,
fn->fn_debug_prefix,
fn->fn_debug_suffix);
#endif
succp = true;
fn->fn_state = RUN;
CHK_FNAMES_CONF(fn);
@ -1996,8 +2092,18 @@ static bool logfile_init(
logfile->lf_full_link_name =
form_full_file_name(strparts,
logfile->lf_name_seqno,
2);
2);
fprintf(stderr, "%s\t: %s->%s\n",
STRLOGNAME(logfile_id),
logfile->lf_full_link_name,
logfile->lf_full_file_name);
}
else
{
fprintf(stderr, "%s\t: %s\n",
STRLOGNAME(logfile_id),
logfile->lf_full_file_name);
}
/**
* At least one of the files couldn't be created. Increase
* sequence number and retry until succeeds.
@ -2257,7 +2363,7 @@ static void filewriter_done(
* lists of each logfile object.
*
* Block buffer is written to log file if
* 1. bb_isfull == true,
* 1. bb_state == true,
* 2. logfile object's lf_flushflag == true, or
* 3. skygw_thread_must_exit returns true.
*
@ -2297,7 +2403,7 @@ static void* thr_filewriter_fun(
blockbuf_t* bb;
mlist_node_t* node;
int i;
bool flush_blockbuf; /**< flush single block buffer. */
blockbuf_state_t flush_blockbuf; /**< flush single block buffer. */
bool flush_logfile; /**< flush logfile */
bool flushall_logfiles;/**< flush all logfiles */
size_t vn1;
@ -2356,10 +2462,10 @@ static void* thr_filewriter_fun(
/** Lock block buffer */
simple_mutex_lock(&bb->bb_mutex, true);
flush_blockbuf = bb->bb_isfull;
flush_blockbuf = bb->bb_state;
if (bb->bb_buf_used != 0 &&
(flush_blockbuf ||
(flush_blockbuf == BB_FULL ||
flush_logfile ||
flushall_logfiles))
{
@ -2374,7 +2480,7 @@ static void* thr_filewriter_fun(
&bb->bb_mutex,
true);
}
skygw_file_write(file,
(void *)bb->bb_buf,
bb->bb_buf_used,
@ -2387,7 +2493,13 @@ static void* thr_filewriter_fun(
bb->bb_buf_left = bb->bb_buf_size;
bb->bb_buf_used = 0;
memset(bb->bb_buf, 0, bb->bb_buf_size);
bb->bb_isfull = false;
bb->bb_state = BB_CLEARED;
#if defined(SS_LOG_DEBUG)
sprintf(bb->bb_buf,"[block:%d]",atomic_add(&block_start_index,1));
bb->bb_buf_used += strlen(bb->bb_buf);
bb->bb_buf_left -= strlen(bb->bb_buf);
#endif
}
/** Release lock to block buffer */
simple_mutex_unlock(&bb->bb_mutex);

View File

@ -22,6 +22,12 @@ typedef struct logfile_st logfile_t;
typedef struct fnames_conf_st fnames_conf_t;
typedef struct logmanager_st logmanager_t;
typedef enum {
BB_READY = 0x00,
BB_FULL,
BB_CLEARED
} blockbuf_state_t;
typedef enum {
LOGFILE_ERROR = 1,
LOGFILE_FIRST = LOGFILE_ERROR,

View File

@ -8,6 +8,11 @@ SRCS := log_manager.cc
UTILS_PATH := $(ROOT_PATH)/utils
CUR_DIR := $(shell pwd)
ifeq ($(ADD_DEBUG_TAGS),Y)
CFLAGS += -DSS_LOG_DEBUG
endif
makeall: clean all
clean:
@ -44,7 +49,7 @@ install: liblink
install liblog_manager.so.1.0.1 liblog_manager.so $(DEST)/lib
depend:
@rm -f depend
@$(DEL) depend
$(CPP) -M $(CFLAGS) \
$(MYSQL_HEADERS) \
-I$(UTILS_PATH) -I./ \

2
log_manager/test/.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
# binaries generated here
testlog

61
log_manager/test/logorder.sh Executable file
View File

@ -0,0 +1,61 @@
#! /bin/bash
if [[ $# -lt 4 ]]
then
echo "Usage: logorder.sh <iterations> <frequency of flushes> <message size>"
echo "To disable log flushing, use 0 for flush frequency"
exit
fi
rm *.log
#Create large messages
$PWD/testorder $1 $2 $3
TESTLOG=$4
MCOUNT=$1
BLOCKS=`cat skygw_err1.log |tr -s ' '|grep -o 'block:[[:digit:]]\+'|cut -d ':' -f 2`
MESSAGES=`cat skygw_err1.log |tr -s ' '|grep -o 'message|[[:digit:]]\+'|cut -d '|' -f 2`
prev=0
error=0
for i in $BLOCKS
do
if [[ $i -le $prev ]]
then
error=1
echo "block mismatch: $i was after $prev." >> $TESTLOG
fi
prev=$i
done
if [[ error -eq 0 ]]
then
echo "Block buffers were in order" >> $TESTLOG
else
echo "Error: block buffers were written in the wrong order" >> $TESTLOG
fi
prev=0
error=0
for i in $MESSAGES
do
if [[ $i -ne $(( prev + 1 )) ]]
then
error=1
echo "message mismatch: $i was after $prev." >> $TESTLOG
fi
prev=$i
done
if [[ error -eq 0 ]]
then
echo "Block buffer messages were in order" >> $TESTLOG
else
echo "Error: block buffer messages were written in the wrong order" >> $TESTLOG
fi

View File

@ -28,10 +28,13 @@ testall:
cleantests:
- $(DEL) *.o
- $(DEL) *.log
- $(DEL) testlog
- $(DEL) testorder
- $(DEL) *~
buildtests:
$(MAKE) -C $(LOG_MANAGER_PATH) ADD_DEBUG_TAGS=Y
$(CC) $(CFLAGS) \
-L$(LOG_MANAGER_PATH) \
-Wl,-rpath,$(DEST)/lib \
@ -39,8 +42,19 @@ buildtests:
-o testlog \
-I$(MARIADB_SRC_PATH)/include \
-I$(LOG_MANAGER_PATH) -I$(UTILS_PATH) testlog.c \
-llog_manager $(LDLIBS) \
-lstdc++ -llog_manager $(LDLIBS) \
$(UTILS_PATH)/skygw_utils.o
$(CC) $(CFLAGS) \
-L$(LOG_MANAGER_PATH) \
-Wl,-rpath,$(DEST)/lib \
-Wl,-rpath,$(LOG_MANAGER_PATH)/ \
-o testorder \
-I$(MARIADB_SRC_PATH)/include \
-I$(LOG_MANAGER_PATH) -I$(UTILS_PATH) testorder.c \
-lstdc++ -llog_manager $(LDLIBS) \
$(UTILS_PATH)/skygw_utils.o
runtests:
@ -61,6 +75,10 @@ runtests:
@echo "Use 16 threads" >> $(TESTLOG)
@echo "" >> $(TESTLOG)
@-$(LAUNCH_DEBUGGER) $(TESTAPP) "-t 16" 2>>$(TESTLOG)
@echo "" >> $(TEST_MAXSCALE_LOG)
@echo "Test Message Order" >> $(TEST_MAXSCALE_LOG)
@echo "" >> $(TEST_MAXSCALE_LOG)
./logorder.sh 500 0 500 $(TEST_MAXSCALE_LOG)
@echo "Log Manager PASSED" >> $(TESTLOG)
@echo "" >> $(TESTLOG)

View File

@ -0,0 +1,105 @@
/*
* This file is distributed as part of the SkySQL Gateway. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright SkySQL Ab 2013
*/
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <skygw_utils.h>
#include <log_manager.h>
int main(int argc, char** argv)
{
int iterations = 0, i, interval = 10;
int block_size;
int succp = 0, err = 0;
char cwd[1024];
char tmp[2048];
char *message;
char** optstr;
long msg_index = 1;
memset(cwd,0,1024);
if( argc <4){
fprintf(stderr,
"Log Manager Log Order Test\n"
"Writes an ascending number into the error log to determine if log writes are in order.\n"
"Usage:\t testorder <iterations> <frequency of log flushes> <size of message in bytes>\n");
return 1;
}
block_size = atoi(argv[3]);
if(block_size < 1){
fprintf(stderr,"Message size too small, must be at least 1 byte long.");
}
if(getcwd(cwd,sizeof(cwd)) == NULL ||
(optstr = (char**)malloc(sizeof(char*)*4)) == NULL ||
(message = (char*)malloc(sizeof(char)*block_size))== NULL){
fprintf(stderr,"Fatal Error, exiting...");
return 1;
}
memset(tmp,0,1024);
sprintf(tmp,"%s",cwd);
optstr[0] = strdup("log_manager");
optstr[1] = strdup("-j");
optstr[2] = strdup(tmp);
optstr[3] = NULL;
iterations = atoi(argv[1]);
interval = atoi(argv[2]);
succp = skygw_logmanager_init( 3, optstr);
ss_dassert(succp);
skygw_log_disable(LOGFILE_TRACE);
skygw_log_disable(LOGFILE_MESSAGE);
skygw_log_disable(LOGFILE_DEBUG);
for(i = 0;i<iterations;i++){
sprintf(message,"message|%ld",msg_index++);
memset(message + strlen(message),' ',block_size - strlen(message));
memset(message + block_size - 1,'\0',1);
if(interval > 0 && i % interval == 0){
err = skygw_log_write_flush(LOGFILE_ERROR, message);
}else{
err = skygw_log_write(LOGFILE_ERROR, message);
}
if(err){
fprintf(stderr,"Error: log_manager returned %d",err);
break;
}
usleep(100);
//printf("%s\n",message);
}
skygw_log_flush(LOGFILE_ERROR);
skygw_logmanager_done();
free(message);
free(optstr[0]);
free(optstr[1]);
free(optstr[2]);
free(optstr[3]);
free(optstr);
return 0;
}

View File

@ -16,9 +16,9 @@ Group: Development/Tools
#Requires:
%if 0%{?suse_version}
BuildRequires: gcc gcc-c++ ncurses-devel bison glibc-devel cmake libgcc_s1 perl make libtool libopenssl-devel libaio libaio-devel mariadb libedit-devel
BuildRequires: gcc gcc-c++ ncurses-devel bison glibc-devel cmake libgcc_s1 perl make libtool libopenssl-devel libaio libaio-devel mariadb libedit-devel librabbitmq-devel
%else
BuildRequires: gcc gcc-c++ ncurses-devel bison glibc-devel cmake libgcc perl make libtool openssl-devel libaio libaio-devel
BuildRequires: gcc gcc-c++ ncurses-devel bison glibc-devel cmake libgcc perl make libtool openssl-devel libaio libaio-devel librabbitmq-devel
%if 0%{?rhel} == 6
BuildRequires: libedit-devel
%endif

View File

@ -1,5 +0,0 @@
*.o
*.so
*.so.*
depend.mk

View File

@ -8,6 +8,8 @@ SRCS := query_classifier.cc
UTILS_PATH := $(ROOT_PATH)/utils
QUERY_CLASSIFIER_PATH := $(ROOT_PATH)/query_classifier
LOG_MANAGER_PATH := $(ROOT_PATH)/log_manager
SERVER_INC_PATH := $(ROOT_PATH)/server/include
MODULE_INC_PATH := $(ROOT_PATH)/server/modules/include
makeall: clean all
@ -32,8 +34,7 @@ runtests:
testall:
$(MAKE) -C test testall
utils:
$(MAKE) -C $(UTILS_PATH) clean all
@ -43,6 +44,9 @@ libcomp:
$(CPP) -c $(CFLAGS) \
$(MYSQL_HEADERS) \
-I$(LOG_MANAGER_PATH) \
-I$(SERVER_INC_PATH) \
-I$(MODULE_INC_PATH) \
-I$(UTILS_PATH) \
-I./ \
-fPIC ./query_classifier.cc -o query_classifier.o
@ -62,10 +66,13 @@ install: liblink
install ./libquery_classifier.so.1.0.1 ./libquery_classifier.so $(DEST)/lib
depend:
@rm -f depend
@$(DEL) depend
$(CPP) -M $(CFLAGS) \
$(MYSQL_HEADERS) \
-I$(LOG_MANAGER_PATH) \
-I$(SERVER_INC_PATH) \
-I$(MODULE_INC_PATH) \
-I$(UTILS_PATH) \
-I./ \
$(SRCS) > depend

View File

@ -34,6 +34,7 @@
#include "../utils/skygw_types.h"
#include "../utils/skygw_debug.h"
#include <log_manager.h>
#include <mysql_client_server_protocol.h>
#include <mysql.h>
#include <my_sys.h>
@ -83,122 +84,158 @@ static bool skygw_stmt_causes_implicit_commit(
static int is_autocommit_stmt(
LEX* lex);
/**
* @node (write brief function description here)
*
* Parameters:
* @param query_str - <usage>
* <description>
*
* @param client_flag - <usage>
* <description>
*
* @return
*
static void parsing_info_set_plain_str(void* ptr,
char* str);
/**
* Calls parser for the query includede in the buffer. Creates and adds parsing
* information to buffer if it doesn't exist already. Resolves the query type.
*
* @details (write detailed description here)
*
* @param querybuf buffer including the query and possibly the parsing information
*
* @return query type
*/
skygw_query_type_t skygw_query_classifier_get_type(
const char* query,
unsigned long client_flags,
MYSQL** p_mysql)
skygw_query_type_t query_classifier_get_type(
GWBUF* querybuf)
{
MYSQL* mysql;
char* query_str;
const char* user = "skygw";
const char* db = "skygw";
THD* thd;
MYSQL* mysql;
skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN;
bool failp = FALSE;
ss_info_dassert(query != NULL, ("query_str is NULL"));
bool succp;
query_str = const_cast<char*>(query);
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Query : \"%s\"", query_str)));
ss_info_dassert(querybuf != NULL, ("querybuf is NULL"));
/** Get server handle */
mysql = mysql_init(NULL);
/** Create parsing info for the query and store it to buffer */
succp = query_is_parsed(querybuf);
if (mysql == NULL) {
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : call to mysql_real_connect failed due %d, %s.",
mysql_errno(mysql),
mysql_error(mysql))));
if (!succp)
{
succp = parse_query(querybuf);
}
/** Read thd pointer and resolve the query type with it. */
if (succp)
{
parsing_info_t* pi;
mysql_library_end();
goto return_qtype;
}
pi = (parsing_info_t*)gwbuf_get_buffer_object_data(querybuf,
GWBUF_PARSING_INFO);
if (pi != NULL)
{
mysql = (MYSQL *)pi->pi_handle;
if (p_mysql != NULL)
{
*p_mysql = mysql;
/** Find out the query type */
if (mysql != NULL)
{
qtype = resolve_query_type((THD *)mysql->thd);
}
}
}
/** Set methods and authentication to mysql */
mysql_options(mysql, MYSQL_READ_DEFAULT_GROUP, "libmysqld_skygw");
mysql_options(mysql, MYSQL_OPT_USE_EMBEDDED_CONNECTION, NULL);
mysql->methods = &embedded_methods;
mysql->user = my_strdup(user, MYF(0));
mysql->db = my_strdup(db, MYF(0));
mysql->passwd = NULL;
/** Get one or create new THD object to be use in parsing */
thd = get_or_create_thd_for_parsing(mysql, query_str);
if (thd == NULL)
{
skygw_query_classifier_free(mysql);
*p_mysql = NULL;
goto return_qtype;
}
/**
* Create parse_tree inside thd.
* thd and even lex are readable even if parser failed so let it
* continue despite failure.
*/
failp = create_parse_tree(thd);
qtype = resolve_query_type(thd);
if (p_mysql == NULL)
{
skygw_query_classifier_free(mysql);
}
return_qtype:
return qtype;
}
void skygw_query_classifier_free(
MYSQL* mysql)
/**
* Create parsing info and try to parse the query included in the query buffer.
* Store pointer to created parse_tree_t object to buffer.
*
* @param querybuf buffer including the query and possibly the parsing information
*
* @return true if succeed, false otherwise
*/
bool parse_query (
GWBUF* querybuf)
{
if (mysql->thd != NULL)
bool succp;
THD* thd;
uint8_t* data;
size_t len;
char* query_str;
parsing_info_t* pi;
CHK_GWBUF(querybuf);
/** Do not parse without releasing previous parse info first */
ss_dassert(!query_is_parsed(querybuf));
if (query_is_parsed(querybuf))
{
(*mysql->methods->free_embedded_thd)(mysql);
mysql->thd = NULL;
return false;
}
mysql_close(mysql);
mysql_thread_end();
}
/** Create parsing info */
pi = parsing_info_init(parsing_info_done);
if (pi == NULL)
{
succp = false;
goto retblock;
}
/** Extract query and copy it to different buffer */
data = (uint8_t*)GWBUF_DATA(querybuf);
len = MYSQL_GET_PACKET_LEN(data)-1; /*< distract 1 for packet type byte */
query_str = (char *)malloc(len+1);
if (query_str == NULL)
{
/** Free parsing info data */
parsing_info_done(pi);
succp = false;
goto retblock;
}
memcpy(query_str, &data[5], len);
memset(&query_str[len], 0, 1);
parsing_info_set_plain_str(pi, query_str);
/** Get one or create new THD object to be use in parsing */
thd = get_or_create_thd_for_parsing((MYSQL *)pi->pi_handle, query_str);
if (thd == NULL)
{
/** Free parsing info data */
parsing_info_done(pi);
succp = false;
goto retblock;
}
/**
* Create parse_tree inside thd.
* thd and lex are readable even if creating parse tree fails.
*/
create_parse_tree(thd);
/** Add complete parsing info struct to the query buffer */
gwbuf_add_buffer_object(querybuf,
GWBUF_PARSING_INFO,
(void *)pi,
parsing_info_done);
succp = true;
retblock:
return succp;
}
/**
* If buffer has non-NULL gwbuf_parsing_info it is parsed and it has parsing
* information included.
*
* @param buf buffer being examined
*
* @return true or false
*/
bool query_is_parsed(
GWBUF* buf)
{
CHK_GWBUF(buf);
return GWBUF_IS_PARSED(buf);
}
/**
* @node (write brief function description here)
/**
* Create a thread context, thd, init embedded server, connect to it, and allocate
* query to thd.
*
* Parameters:
* @param mysql - <usage>
* <description>
*
* @param query_str - <usage>
* <description>
*
* @return
*
* @param mysql Database handle
*
* @details (write detailed description here)
* @param query_str Query in plain txt string
*
* @return Thread context pointer
*
*/
static THD* get_or_create_thd_for_parsing(
@ -369,9 +406,9 @@ return_here:
* restrictive, for example, QUERY_TYPE_READ is smaller than QUERY_TYPE_WRITE.
*
*/
static u_int16_t set_query_type(
u_int16_t* qtype,
u_int16_t new_type)
static u_int32_t set_query_type(
u_int32_t* qtype,
u_int32_t new_type)
{
*qtype = MAX(*qtype, new_type);
return *qtype;
@ -397,7 +434,7 @@ static skygw_query_type_t resolve_query_type(
THD* thd)
{
skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN;
u_int16_t type = QUERY_TYPE_UNKNOWN;
u_int32_t type = QUERY_TYPE_UNKNOWN;
int set_autocommit_stmt = -1; /*< -1 no, 0 disable, 1 enable */
LEX* lex;
Item* item;
@ -412,12 +449,12 @@ static skygw_query_type_t resolve_query_type(
ss_info_dassert(thd != NULL, ("thd is NULL\n"));
force_data_modify_op_replication = FALSE;
force_data_modify_op_replication = FALSE;
lex = thd->lex;
/** SELECT ..INTO variable|OUTFILE|DUMPFILE */
if (lex->result != NULL) {
type = QUERY_TYPE_SESSION_WRITE;
type = QUERY_TYPE_GSYSVAR_WRITE;
goto return_qtype;
}
@ -464,19 +501,51 @@ static skygw_query_type_t resolve_query_type(
type |= QUERY_TYPE_DISABLE_AUTOCOMMIT;
type |= QUERY_TYPE_BEGIN_TRX;
}
/**
* REVOKE ALL, ASSIGN_TO_KEYCACHE,
* PRELOAD_KEYS, FLUSH, RESET, CREATE|ALTER|DROP SERVER
*/
if (lex->option_type == OPT_GLOBAL)
{
type |= QUERY_TYPE_GLOBAL_WRITE;
goto return_qtype;
/**
* SHOW syntax http://dev.mysql.com/doc/refman/5.6/en/show.html
*/
if (lex->sql_command == SQLCOM_SHOW_VARIABLES)
{
type |= QUERY_TYPE_GSYSVAR_READ;
}
/**
* SET syntax http://dev.mysql.com/doc/refman/5.6/en/set-statement.html
*/
else if (lex->sql_command == SQLCOM_SET_OPTION)
{
type |= QUERY_TYPE_GSYSVAR_WRITE;
}
/**
* REVOKE ALL, ASSIGN_TO_KEYCACHE,
* PRELOAD_KEYS, FLUSH, RESET, CREATE|ALTER|DROP SERVER
*/
else
{
type |= QUERY_TYPE_GSYSVAR_WRITE;
}
goto return_qtype;
}
else if (lex->option_type == OPT_SESSION)
{
type |= QUERY_TYPE_SESSION_WRITE;
goto return_qtype;
/**
* SHOW syntax http://dev.mysql.com/doc/refman/5.6/en/show.html
*/
if (lex->sql_command == SQLCOM_SHOW_VARIABLES)
{
type |= QUERY_TYPE_SYSVAR_READ;
}
/**
* SET syntax http://dev.mysql.com/doc/refman/5.6/en/set-statement.html
*/
else if (lex->sql_command == SQLCOM_SET_OPTION)
{
/** Either user- or system variable write */
type |= QUERY_TYPE_GSYSVAR_WRITE;
}
goto return_qtype;
}
/**
* 1:ALTER TABLE, TRUNCATE, REPAIR, OPTIMIZE, ANALYZE, CHECK.
@ -493,23 +562,26 @@ static skygw_query_type_t resolve_query_type(
if (thd->variables.sql_log_bin == 0 &&
force_data_modify_op_replication)
{
/** Not replicated */
type |= QUERY_TYPE_SESSION_WRITE;
} else {
type |= QUERY_TYPE_WRITE;
}
else
{
/** Written to binlog, that is, replicated except tmp tables */
type |= QUERY_TYPE_WRITE; /*< to master */
if (lex->create_info.options & HA_LEX_CREATE_TMP_TABLE &&
lex->sql_command == SQLCOM_CREATE_TABLE)
{
type |= QUERY_TYPE_CREATE_TMP_TABLE; /*< remember in router */
}
}
goto return_qtype;
}
/** Try to catch session modifications here */
switch (lex->sql_command) {
case SQLCOM_SET_OPTION: /*< SET commands. */
if (lex->option_type == OPT_GLOBAL)
{
type |= QUERY_TYPE_GLOBAL_WRITE;
break;
}
/**<! fall through */
/** fallthrough */
case SQLCOM_CHANGE_DB:
case SQLCOM_DEALLOCATE_PREPARE:
type |= QUERY_TYPE_SESSION_WRITE;
@ -546,15 +618,23 @@ static skygw_query_type_t resolve_query_type(
default:
break;
}
if (QTYPE_LESS_RESTRICTIVE_THAN_WRITE(type)) {
#if defined(UPDATE_VAR_SUPPORT)
if (QTYPE_LESS_RESTRICTIVE_THAN_WRITE(type))
#endif
if (QUERY_IS_TYPE(qtype, QUERY_TYPE_UNKNOWN) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_LOCAL_READ) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_READ) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_USERVAR_READ) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_SYSVAR_READ) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_GSYSVAR_READ))
{
/**
* These values won't change qtype more restrictive than write.
* UDFs and procedures could possibly cause session-wide write,
* but unless their content is replicated this is a limitation
* of this implementation.
* In other words : UDFs and procedures are not allowed to
* perform writes which are not replicated but nede to repeat
* perform writes which are not replicated but need to repeat
* in every node.
* It is not sure if such statements exist. vraa 25.10.13
*/
@ -575,7 +655,9 @@ static skygw_query_type_t resolve_query_type(
if (itype == Item::SUBSELECT_ITEM) {
continue;
} else if (itype == Item::FUNC_ITEM) {
}
else if (itype == Item::FUNC_ITEM)
{
int func_qtype = QUERY_TYPE_UNKNOWN;
/**
* Item types:
@ -635,7 +717,6 @@ static skygw_query_type_t resolve_query_type(
"%lu [resolve_query_type] "
"functype FUNC_SP, stored proc "
"or unknown function.",
"%s:%s",
pthread_self())));
break;
case Item_func::UDF_FUNC:
@ -648,7 +729,6 @@ static skygw_query_type_t resolve_query_type(
pthread_self())));
break;
case Item_func::NOW_FUNC:
case Item_func::GSYSVAR_FUNC:
func_qtype |= QUERY_TYPE_LOCAL_READ;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
@ -657,8 +737,51 @@ static skygw_query_type_t resolve_query_type(
"executed in MaxScale.",
pthread_self())));
break;
/** System session variable */
case Item_func::GSYSVAR_FUNC:
func_qtype |= QUERY_TYPE_SYSVAR_READ;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [resolve_query_type] "
"functype GSYSVAR_FUNC, system "
"variable read.",
pthread_self())));
break;
/** User-defined variable read */
case Item_func::GUSERVAR_FUNC:
func_qtype |= QUERY_TYPE_USERVAR_READ;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [resolve_query_type] "
"functype GUSERVAR_FUNC, user "
"variable read.",
pthread_self())));
break;
/** User-defined variable modification */
case Item_func::SUSERVAR_FUNC:
/**
* Really it is user variable but we
* don't separate sql variables atm.
* 15.9.14
*/
func_qtype |= QUERY_TYPE_GSYSVAR_WRITE;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [resolve_query_type] "
"functype SUSERVAR_FUNC, user "
"variable write.",
pthread_self())));
break;
case Item_func::UNKNOWN_FUNC:
func_qtype |= QUERY_TYPE_READ;
if (item->name != NULL &&
strcmp(item->name, "last_insert_id()") == 0)
{
func_qtype |= QUERY_TYPE_MASTER_READ;
}
else
{
func_qtype |= QUERY_TYPE_READ;
}
/**
* Many built-in functions are of this
* type, for example, rand(), soundex(),
@ -684,6 +807,7 @@ static skygw_query_type_t resolve_query_type(
/**< Set new query type */
type |= set_query_type(&type, func_qtype);
}
#if defined(UPDATE_VAR_SUPPORT)
/**
* Write is as restrictive as it gets due functions,
* so break.
@ -691,8 +815,9 @@ static skygw_query_type_t resolve_query_type(
if ((type & QUERY_TYPE_WRITE) == QUERY_TYPE_WRITE) {
break;
}
#endif
} /**< for */
} /**< if */
} /**< if */
return_qtype:
qtype = (skygw_query_type_t)type;
return qtype;
@ -816,3 +941,412 @@ char* skygw_query_classifier_get_stmtname(
return ((THD *)(mysql->thd))->lex->prepared_stmt_name.str;
}
/**
*Returns the LEX struct of the parsed GWBUF
*@param The parsed GWBUF
*@return Pointer to the LEX struct or NULL if an error occurred or the query was not parsed
*/
LEX* get_lex(GWBUF* querybuf)
{
parsing_info_t* pi;
MYSQL* mysql;
THD* thd;
if (!GWBUF_IS_PARSED(querybuf))
{
return NULL;
}
pi = (parsing_info_t *)gwbuf_get_buffer_object_data(querybuf,
GWBUF_PARSING_INFO);
if (pi == NULL)
{
return NULL;
}
if ((mysql = (MYSQL *)pi->pi_handle) == NULL ||
(thd = (THD *)mysql->thd) == NULL)
{
ss_dassert(mysql != NULL &&
thd != NULL);
return NULL;
}
return thd->lex;
}
/**
* Finds the head of the list of tables affected by the current select statement.
* @param thd Pointer to a valid THD
* @return Pointer to the head of the TABLE_LIST chain or NULL in case of an error
*/
void* skygw_get_affected_tables(void* lexptr)
{
LEX* lex = (LEX*)lexptr;
if(lex == NULL ||
lex->current_select == NULL)
{
ss_dassert(lex != NULL &&
lex->current_select != NULL);
return NULL;
}
return (void*)lex->current_select->table_list.first;
}
/**
* Reads the parsetree and lists all the affected tables and views in the query.
* In the case of an error, the size of the table is set to zero and no memory is allocated.
* The caller must free the allocated memory.
*
* @param querybuf GWBUF where the table names are extracted from
* @param tblsize Pointer where the number of tables is written
* @return Array of null-terminated strings with the table names
*/
char** skygw_get_table_names(GWBUF* querybuf,int* tblsize, bool fullnames)
{
LEX* lex;
TABLE_LIST* tbl;
int i = 0,
currtblsz = 0;
char **tables,
**tmp;
if((lex = get_lex(querybuf)) == NULL)
{
goto retblock;
}
lex->current_select = lex->all_selects_list;
while(lex->current_select){
tbl = (TABLE_LIST*)skygw_get_affected_tables(lex);
while (tbl)
{
if(i >= currtblsz){
tmp = (char**)malloc(sizeof(char*)*(currtblsz*2+1));
if(tmp){
if(currtblsz > 0){
int x;
for(x = 0;x<currtblsz;x++){
tmp[x] = tables[x];
}
free(tables);
}
tables = tmp;
currtblsz = currtblsz*2 + 1;
}
}
char *catnm = NULL;
if(fullnames)
{
if(tbl->db && strcmp(tbl->db,"skygw_virtual") != 0)
{
catnm = (char*)calloc(strlen(tbl->db) + strlen(tbl->table_name) + 2,sizeof(char));
strcpy(catnm,tbl->db);
strcat(catnm,".");
strcat(catnm,tbl->table_name);
}
}
if(catnm)
{
tables[i++] = catnm;
}
else
{
tables[i++] = strdup(tbl->table_name);
}
tbl=tbl->next_local;
}
lex->current_select = lex->current_select->next_select_in_list();
}
retblock:
*tblsize = i;
return tables;
}
/**
* Extract, allocate memory and copy the name of the created table.
* @param querybuf Buffer to use.
* @return A pointer to the name if a table was created, otherwise NULL
*/
char* skygw_get_created_table_name(GWBUF* querybuf)
{
LEX* lex;
if((lex = get_lex(querybuf)) == NULL)
{
return NULL;
}
if(lex->create_last_non_select_table &&
lex->create_last_non_select_table->table_name){
char* name = strdup(lex->create_last_non_select_table->table_name);
return name;
}else{
return NULL;
}
}
/**
* Checks whether the query is a "real" query ie. SELECT,UPDATE,INSERT,DELETE or any variation of these.
* Queries that affect the underlying database are not considered as real queries and the queries that target
* specific row or variable data are regarded as the real queries.
* @param GWBUF to analyze
* @return true if the query is a real query, otherwise false
*/
bool skygw_is_real_query(GWBUF* querybuf)
{
LEX* lex = get_lex(querybuf);
if(lex){
switch(lex->sql_command){
case SQLCOM_SELECT:
return lex->all_selects_list->table_list.elements > 0;
case SQLCOM_UPDATE:
case SQLCOM_INSERT:
case SQLCOM_INSERT_SELECT:
case SQLCOM_DELETE:
case SQLCOM_TRUNCATE:
case SQLCOM_REPLACE:
case SQLCOM_REPLACE_SELECT:
case SQLCOM_PREPARE:
case SQLCOM_EXECUTE:
return true;
default:
return false;
}
}
return false;
}
/**
* Checks whether the buffer contains a DROP TABLE... query.
* @param querybuf Buffer to inspect
* @return true if it contains the query otherwise false
*/
bool is_drop_table_query(GWBUF* querybuf)
{
LEX* lex;
return (lex = get_lex(querybuf)) != NULL &&
lex->sql_command == SQLCOM_DROP_TABLE;
}
/*
* Replace user-provided literals with question marks. Return a copy of the
* querystr with replacements.
*
* @param querybuf GWBUF buffer including necessary parsing info
*
* @return Copy of querystr where literals are replaces with question marks or
* NULL if querystr is NULL, thread context or lex are NULL or if replacement
* function fails.
*
* Replaced literal types are STRING_ITEM,INT_ITEM,DECIMAL_ITEM,REAL_ITEM,
* VARBIN_ITEM,NULL_ITEM
*/
char* skygw_get_canonical(
GWBUF* querybuf)
{
parsing_info_t* pi;
MYSQL* mysql;
THD* thd;
LEX* lex;
Item* item;
char* querystr;
if (!GWBUF_IS_PARSED(querybuf))
{
querystr = NULL;
goto retblock;
}
pi = (parsing_info_t *)gwbuf_get_buffer_object_data(querybuf,
GWBUF_PARSING_INFO);
CHK_PARSING_INFO(pi);
if (pi == NULL)
{
querystr = NULL;
goto retblock;
}
if (pi->pi_query_plain_str == NULL ||
(mysql = (MYSQL *)pi->pi_handle) == NULL ||
(thd = (THD *)mysql->thd) == NULL ||
(lex = thd->lex) == NULL)
{
ss_dassert(pi->pi_query_plain_str != NULL &&
mysql != NULL &&
thd != NULL &&
lex != NULL);
querystr = NULL;
goto retblock;
}
querystr = strdup(pi->pi_query_plain_str);
for (item=thd->free_list; item != NULL; item=item->next)
{
Item::Type itype;
if (item->name == NULL)
{
continue;
}
itype = item->type();
if (itype == Item::STRING_ITEM)
{
String tokenstr;
String* res = item->val_str_ascii(&tokenstr);
if (res->is_empty()) /*< empty string */
{
querystr = replace_literal(querystr, "\"\"", "\"?\"");
}
else
{
querystr = replace_literal(querystr, res->ptr(), "?");
}
}
else if (itype == Item::INT_ITEM ||
itype == Item::DECIMAL_ITEM ||
itype == Item::REAL_ITEM ||
itype == Item::VARBIN_ITEM ||
itype == Item::NULL_ITEM)
{
querystr = replace_literal(querystr, item->name, "?");
}
} /*< for */
retblock:
return querystr;
}
/**
* Create parsing information; initialize mysql handle, allocate parsing info
* struct and set handle and free function pointer to it.
*
* @param donefun pointer to free function
*
* @return pointer to parsing information
*/
parsing_info_t* parsing_info_init(
void (*donefun)(void *))
{
parsing_info_t* pi = NULL;
MYSQL* mysql;
const char* user = "skygw";
const char* db = "skygw";
ss_dassert(donefun != NULL);
/** Get server handle */
mysql = mysql_init(NULL);
ss_dassert(mysql != NULL);
if (mysql == NULL) {
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : call to mysql_real_connect failed due %d, %s.",
mysql_errno(mysql),
mysql_error(mysql))));
goto retblock;
}
/** Set methods and authentication to mysql */
mysql_options(mysql, MYSQL_READ_DEFAULT_GROUP, "libmysqld_skygw");
mysql_options(mysql, MYSQL_OPT_USE_EMBEDDED_CONNECTION, NULL);
mysql->methods = &embedded_methods;
mysql->user = my_strdup(user, MYF(0));
mysql->db = my_strdup(db, MYF(0));
mysql->passwd = NULL;
pi = (parsing_info_t*)calloc(1, sizeof(parsing_info_t));
if (pi == NULL)
{
mysql_close(mysql);
goto retblock;
}
#if defined(SS_DEBUG)
pi->pi_chk_top = CHK_NUM_PINFO;
pi->pi_chk_tail = CHK_NUM_PINFO;
#endif
/** Set handle and free function to parsing info struct */
pi->pi_handle = mysql;
pi->pi_done_fp = donefun;
retblock:
return pi;
}
/**
* Free function for parsing info. Called by gwbuf_free or in case initialization
* of parsing information fails.
*
* @param ptr Pointer to parsing information, cast required
*
* @return void
*
*/
void parsing_info_done(
void* ptr)
{
parsing_info_t* pi = (parsing_info_t *)ptr;
if (pi->pi_handle != NULL)
{
MYSQL* mysql = (MYSQL *)pi->pi_handle;
if (mysql->thd != NULL)
{
(*mysql->methods->free_embedded_thd)(mysql);
mysql->thd = NULL;
}
mysql_close(mysql);
}
/** Free plain text query string */
if (pi->pi_query_plain_str != NULL)
{
free(pi->pi_query_plain_str);
}
free(pi);
}
/**
* Add plain text query string to parsing info.
*
* @param ptr Pointer to parsing info struct, cast required
* @param str String to be added
*
* @return void
*/
static void parsing_info_set_plain_str(
void* ptr,
char* str)
{
parsing_info_t* pi = (parsing_info_t *)ptr;
CHK_PARSING_INFO(pi);
pi->pi_query_plain_str = str;
}

View File

@ -20,7 +20,8 @@ Copyright SkySQL Ab
/** getpid */
#include <unistd.h>
#include <mysql.h>
#include "../utils/skygw_utils.h"
#include <skygw_utils.h>
#include <buffer.h>
EXTERN_C_BLOCK_BEGIN
@ -30,36 +31,67 @@ EXTERN_C_BLOCK_BEGIN
* is modified
*/
typedef enum {
QUERY_TYPE_UNKNOWN = 0x0000, /*< Initial value, can't be tested bitwisely */
QUERY_TYPE_LOCAL_READ = 0x0001, /*< Read non-database data, execute in MaxScale */
QUERY_TYPE_READ = 0x0002, /*< No updates */
QUERY_TYPE_WRITE = 0x0004, /*< Master data will be modified */
QUERY_TYPE_SESSION_WRITE = 0x0008, /*< Session data will be modified */
QUERY_TYPE_GLOBAL_WRITE = 0x0010, /*< Global system variable modification */
QUERY_TYPE_BEGIN_TRX = 0x0020, /*< BEGIN or START TRANSACTION */
QUERY_TYPE_ENABLE_AUTOCOMMIT = 0x0040, /*< SET autocommit=1 */
QUERY_TYPE_DISABLE_AUTOCOMMIT = 0x0080, /*< SET autocommit=0 */
QUERY_TYPE_ROLLBACK = 0x0100, /*< ROLLBACK */
QUERY_TYPE_COMMIT = 0x0200, /*< COMMIT */
QUERY_TYPE_PREPARE_NAMED_STMT = 0x0400, /*< Prepared stmt with name from user */
QUERY_TYPE_PREPARE_STMT = 0x0800, /*< Prepared stmt with id provided by server */
QUERY_TYPE_EXEC_STMT = 0x1000 /*< Execute prepared statement */
QUERY_TYPE_UNKNOWN = 0x000000, /*< Initial value, can't be tested bitwisely */
QUERY_TYPE_LOCAL_READ = 0x000001, /*< Read non-database data, execute in MaxScale:any */
QUERY_TYPE_READ = 0x000002, /*< Read database data:any */
QUERY_TYPE_WRITE = 0x000004, /*< Master data will be modified:master */
QUERY_TYPE_MASTER_READ = 0x000008, /*< Read from the master:master */
QUERY_TYPE_SESSION_WRITE = 0x000010, /*< Session data will be modified:master or all */
/** Not implemented yet */
// QUERY_TYPE_USERVAR_WRITE = 0x000020, /*< Write a user variable:master or all */
QUERY_TYPE_USERVAR_READ = 0x000040, /*< Read a user variable:master or any */
QUERY_TYPE_SYSVAR_READ = 0x000080, /*< Read a system variable:master or any */
/** Not implemented yet */
// QUERY_TYPE_SYSVAR_WRITE = 0x000100, /*< Write a system variable:master or all */
QUERY_TYPE_GSYSVAR_READ = 0x000200, /*< Read global system variable:master or any */
QUERY_TYPE_GSYSVAR_WRITE = 0x000400, /*< Write global system variable:master or all */
QUERY_TYPE_BEGIN_TRX = 0x000800, /*< BEGIN or START TRANSACTION */
QUERY_TYPE_ENABLE_AUTOCOMMIT = 0x001000, /*< SET autocommit=1 */
QUERY_TYPE_DISABLE_AUTOCOMMIT = 0x002000, /*< SET autocommit=0 */
QUERY_TYPE_ROLLBACK = 0x004000, /*< ROLLBACK */
QUERY_TYPE_COMMIT = 0x008000, /*< COMMIT */
QUERY_TYPE_PREPARE_NAMED_STMT = 0x010000, /*< Prepared stmt with name from user:all */
QUERY_TYPE_PREPARE_STMT = 0x020000, /*< Prepared stmt with id provided by server:all */
QUERY_TYPE_EXEC_STMT = 0x040000, /*< Execute prepared statement:master or any */
QUERY_TYPE_CREATE_TMP_TABLE = 0x080000, /*< Create temporary table:master (could be all) */
QUERY_TYPE_READ_TMP_TABLE = 0x100000 /*< Read temporary table:master (could be any) */
} skygw_query_type_t;
typedef struct parsing_info_st {
#if defined(SS_DEBUG)
skygw_chk_t pi_chk_top;
#endif
void* pi_handle; /*< parsing info object pointer */
char* pi_query_plain_str; /*< query as plain string */
void (*pi_done_fp)(void *); /*< clean-up function for parsing info */
#if defined(SS_DEBUG)
skygw_chk_t pi_chk_tail;
#endif
} parsing_info_t;
#define QUERY_IS_TYPE(mask,type) ((mask & type) == type)
/**
* Create THD and use it for creating parse tree. Examine parse tree and
* classify the query.
*/
skygw_query_type_t skygw_query_classifier_get_type(
const char* query_str,
unsigned long client_flags,
MYSQL** mysql);
skygw_query_type_t query_classifier_get_type(GWBUF* querybuf);
/** Free THD context and close MYSQL */
void skygw_query_classifier_free(MYSQL* mysql);
char* skygw_query_classifier_get_stmtname(MYSQL* mysql);
char* skygw_query_classifier_get_stmtname(MYSQL* mysql);
char* skygw_get_created_table_name(GWBUF* querybuf);
bool is_drop_table_query(GWBUF* querybuf);
bool skygw_is_real_query(GWBUF* querybuf);
void* skygw_get_affected_tables(void* lexptr);
char** skygw_get_table_names(GWBUF* querybuf,int* tblsize,bool fullnames);
char* skygw_get_canonical(GWBUF* querybuf);
bool parse_query (GWBUF* querybuf);
parsing_info_t* parsing_info_init(void (*donefun)(void *));
void parsing_info_done(void* ptr);
bool query_is_parsed(GWBUF* buf);
EXTERN_C_BLOCK_END

2
query_classifier/test/.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
# binaries generated here
testmain

View File

@ -0,0 +1,62 @@
# cleantests - clean local and subdirectories' tests
# buildtests - build all local and subdirectories' tests
# runtests - run all local tests
# testall - clean, build and run local and subdirectories' tests
include ../../../build_gateway.inc
include ../../../makefile.inc
include ../../../test.inc
CC = gcc
CPP = g++
TESTPATH := $(shell pwd)
TESTLOG := $(TESTPATH)/testqclass.log
QUERY_CLASSIFIER_PATH := $(ROOT_PATH)/query_classifier
LOG_MANAGER_PATH := $(ROOT_PATH)/log_manager
UTILS_PATH := $(ROOT_PATH)/utils
CORE_PATH := $(ROOT_PATH)/server/core
TESTAPP = $(TESTPATH)/canonizer
LDFLAGS=-L$(QUERY_CLASSIFIER_PATH) \
-L$(LOG_MANAGER_PATH) \
-L$(EMBEDDED_LIB) \
-Wl,-rpath,$(DEST)/lib \
-Wl,-rpath,$(EMBEDDED_LIB) \
-Wl,-rpath,$(LOG_MANAGER_PATH) \
-Wl,-rpath,$(QUERY_CLASSIFIER_PATH)
LIBS=-lstdc++ -lpthread -lquery_classifier -lz -ldl -lssl -laio -lcrypt -lcrypto -lrt -lm \
-llog_manager $(UTILS_PATH)/skygw_utils.o $(CORE_PATH)/buffer.o $(CORE_PATH)/atomic.o $(CORE_PATH)/spinlock.o $(CORE_PATH)/hint.o
CFLAGS=-g $(MYSQL_HEADERS) \
-I$(QUERY_CLASSIFIER_PATH) \
$(MYSQL_HEADERS) \
-I$(ROOT_PATH)/server/include \
-I$(UTILS_PATH)
testall:
$(MAKE) cleantests
$(MAKE) buildtests
$(MAKE) runtests
cleantests:
- $(DEL) *.o
- $(DEL) *~
- $(DEL) canonizer
- $(DEL) aria_log*
- $(DEL) ib*
buildtests: $(OBJS)
cp $(ERRMSG)/errmsg.sys .
$(CC) $(CFLAGS) $(LDFLAGS) $(LIBS) canonizer.c -o $(TESTAPP) $(LDLIBS) $(LDMYSQL)
runtests:
@echo "" > $(TESTLOG)
@echo "-------------------------------" >> $(TESTLOG)
@echo $(shell date) >> $(TESTLOG)
@echo "Canonical Query Tests" >> $(TESTLOG)
@echo "-------------------------------" >> $(TESTLOG)
@echo "" >> $(TESTLOG)
./canontest.sh $(TESTLOG) input.sql output.sql expected.sql

View File

@ -0,0 +1,120 @@
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <string.h>
#include <query_classifier.h>
#include <buffer.h>
#include <mysql.h>
static char* server_options[] = {
"SkySQL Gateway",
"--datadir=./",
"--language=./",
"--skip-innodb",
"--default-storage-engine=myisam",
NULL
};
const int num_elements = (sizeof(server_options) / sizeof(char *)) - 1;
static char* server_groups[] = {
"embedded",
"server",
"server",
NULL
};
int main(int argc, char** argv)
{
int fdin,fdout,i=0,fnamelen,fsz,lines = 0;
unsigned int psize;
GWBUF** qbuff;
char *qin, *outnm, *buffer, *tok;
if(argc != 3){
printf("Usage: canonizer <input file> <output file>\n");
return 1;
}
bool failed = mysql_library_init(num_elements, server_options, server_groups);
if(failed){
printf("Embedded server init failed.\n");
return 1;
}
fnamelen = strlen(argv[1]) + 16;
fdin = open(argv[1],O_RDONLY);
fsz = lseek(fdin,0,SEEK_END);
lseek(fdin,0,SEEK_SET);
if(!(buffer = malloc(sizeof(char)*fsz))){
printf("Error: Failed to allocate memory.");
return 1;
}
read(fdin,buffer,fsz);
i = 0;
int bsz = 4,z=0;
qbuff = calloc(bsz,sizeof(GWBUF*));
tok = strtok(buffer,"\n");
while(tok){
if(i>=bsz){
GWBUF** tmp = calloc(bsz*2,sizeof(GWBUF*));
if(!tmp){
printf("Error: Failed to allocate memory.");
return 1;
}
for(z=0;z<bsz;z++){
tmp[z] = qbuff[z];
}
free(qbuff);
qbuff = tmp;
bsz *= 2;
}
if(strlen(tok) > 0){
qin = strdup(tok);
psize = strlen(qin);
qbuff[i] = gwbuf_alloc(psize + 6);
*(qbuff[i]->sbuf->data + 0) = (unsigned char)psize;
*(qbuff[i]->sbuf->data + 1) = (unsigned char)(psize>>8);
*(qbuff[i]->sbuf->data + 2) = (unsigned char)(psize>>16);
*(qbuff[i]->sbuf->data + 4) = 0x03;
memcpy(qbuff[i]->sbuf->data + 5,qin,psize);
*(qbuff[i]->sbuf->data + 5 + psize) = 0x00;
tok = strtok(NULL,"\n\0");
free(qin);
i++;
}
}
fdout = open(argv[2],O_TRUNC|O_CREAT|O_WRONLY,S_IRWXU|S_IXGRP|S_IXOTH);
for(i = 0;i<bsz;i++){
if(qbuff[i]){
parse_query(qbuff[i]);
tok = skygw_get_canonical(qbuff[i]);
write(fdout,tok,strlen(tok));
write(fdout,"\n",1);
gwbuf_free(qbuff[i]);
}
}
free(qbuff);
free(buffer);
close(fdin);
close(fdout);
return 0;
}

View File

@ -0,0 +1,21 @@
#! /bin/sh
if [[ $# -ne 4 ]]
then
echo "Usage: canontest.sh <logfile name> <input file> <output file> <expected output>"
exit 0
fi
TESTLOG=$1
INPUT=$2
OUTPUT=$3
EXPECTED=$4
DIFFLOG=diff.out
$PWD/canonizer $INPUT $OUTPUT
diff $OUTPUT $EXPECTED > $DIFFLOG
if [ $? -eq 0 ]
then
echo "PASSED" >> $TESTLOG
else
echo "FAILED" >> $TESTLOG
echo "Diff output: " >> $TESTLOG
cat $DIFFLOG >> $TESTLOG
fi

View File

@ -0,0 +1,17 @@
select md5(?) =?, sleep(?), rand(?);
select * from my1 where md5(?) =?;
select md5(?) =?;
select * from my1 where md5(?) =?;
select sleep(?)
select * from tst where lname='?'
select ?,?,?,?,?,? from tst
select * from tst where fname like '?'
select * from tst where lname like '?' order by fname
insert into tst values ("?","?"),("?",?),("?","?")
drop table if exists tst
create table tst(fname varchar(30), lname varchar(30))
update tst set lname="?" where fname like '?' or lname like '?'
delete from tst where lname like '?' and fname like '?'
select ? from tst where fname='?' or lname like '?'
select ?,?,?,? from tst where name='?' or name='?' or name='?'
select count(?),count(?),count(?),count(?),count (?),count(?) from tst

View File

@ -0,0 +1,17 @@
select md5("200000foo") =10, sleep(2), rand(100);
select * from my1 where md5("110") =10;
select md5("100foo") =10;
select * from my1 where md5("100") =10;
select sleep(2);
select * from tst where lname='Doe';
select 1,2,3,4,5,6 from tst;
select * from tst where fname like '%a%';
select * from tst where lname like '%e%' order by fname;
insert into tst values ("John","Doe"),("Plato",null),("Nietzsche","");
drop table if exists tst;
create table tst(fname varchar(30), lname varchar(30));
update tst set lname="Human" where fname like '%a%' or lname like '%a%';
delete from tst where lname like '%man%' and fname like '%ard%';
select 100 from tst where fname='10' or lname like '%100%';
select 1,20,300,4000 from tst where name='1000' or name='200' or name='30' or name='4';
select count(1),count(10),count(100),count(2),count (20),count(200) from tst;

View File

@ -18,7 +18,7 @@ UTILS_PATH := $(ROOT_PATH)/utils
TESTAPP = $(TESTPATH)/testmain
testall:buildtests
$(MAKE) -C canonical_tests testall
testalllaters:
$(MAKE) cleantests
$(MAKE) DEBUG=Y DYNLIB=Y buildtests
@ -80,4 +80,4 @@ ifeq ($?, 0)
else
@echo "Query Classifier FAILED" >> $(TESTLOG)
endif
@cat $(TESTLOG) >> $(TEST_MAXSCALE_LOG)
@cat $(TESTLOG) >> $(TEST_MAXSCALE_LOG)

View File

@ -0,0 +1,44 @@
cmake_minimum_required (VERSION 2.6)
set(CMAKE_LIBRARY_PATH ${CMAKE_LIBRARY_PATH} /usr/lib /usr/lib64 /usr/local/lib /usr/local/lib64 /usr/lib/mariadb /usr/lib64/mariadb)
set(CMAKE_INCLUDE_PATH ${CMAKE_INCLUDE_PATH} /usr/include /usr/local/include /usr/include/mysql /usr/local/include/mysql /usr/include/mariadb /usr/local/include/mariadb)
include(InstallRequiredSystemLibraries)
project (consumer)
find_path(MYSQL_INCLUDE_DIRS mysql.h)
find_library(MYSQL_LIBRARIES NAMES mysqlclient)
find_library(RABBITMQ_C_LIBRARIES NAMES rabbitmq)
include_directories(${MYSQL_INCLUDE_DIRS})
include_directories(${RABBITMQ_C_INCLUDE_DIRS})
include_directories(${CMAKE_SOURCE_DIR}/inih)
add_subdirectory (inih)
link_directories(${CMAKE_SOURCE_DIR}/inih)
if(RABBITMQ_C_LIBRARIES AND MYSQL_LIBRARIES AND MYSQL_INCLUDE_DIRS)
add_executable (consumer consumer.c ${MYSQL_LIBRARIES} ${RABBITMQ_C_LIBRARIES})
target_link_libraries(consumer mysqlclient)
target_link_libraries(consumer rabbitmq)
target_link_libraries(consumer inih)
install(TARGETS consumer DESTINATION bin)
install(FILES consumer.cnf DESTINATION share/consumer)
else(RABBITMQ_C_LIBRARIES AND MYSQL_LIBRARIES AND MYSQL_INCLUDE_DIRS)
message(FATAL_ERROR "Error: Can not find requred libraries: libmysqld, librabbitmq.")
endif(RABBITMQ_C_LIBRARIES AND MYSQL_LIBRARIES AND MYSQL_INCLUDE_DIRS)
set(CPACK_PACKAGE_DESCRIPTION_SUMMARY "RabbitMQ Consumer Client")
set(CPACK_PACKAGE_NAME "RabbitMQ Consumer")
set(CPACK_GENERATOR "RPM")
set(CPACK_PACKAGE_VERSION_MAJOR "1")
set(CPACK_PACKAGE_VERSION_MINOR "0")
set(CPACK_RPM_PACKAGE_NAME "rabbitmq-consumer")
set(CPACK_RPM_PACKAGE_VENDOR "SkySQL Ab")
set(CPACK_RPM_PACKAGE_AUTOREQPROV " no")
include(CPack)

View File

@ -0,0 +1,15 @@
include buildconfig.inc
CC=gcc
CFLAGS=-c -Wall -g -Iinih $(INCLUDE_DIRS)
LDFLAGS= $(LIBRARY_DIRS) -lrabbitmq -lmysqlclient
SRCS= inih/ini.c consumer.c
OBJ=$(SRCS:.c=.o)
all:$(OBJ)
$(CC) $(LDFLAGS) $(OBJ) -o consumer `mysql_config --cflags --libs`
%.o:%.c
$(CC) $(CFLAGS) $< -o $@
clean:
-rm *.o
-rm *~

39
rabbitmq_consumer/README Normal file
View File

@ -0,0 +1,39 @@
This program requires the librabbitmq and libmysqlclient libraries.
librabbitmq-c - https://github.com/alanxz/rabbitmq-c
MariaDB Client Library for C 2.0 Series - https://mariadb.com/kb/en/mariadb/client-libraries/client-library-for-c/
Building with CMake:
'cmake .'
Variables to pass for CMake:
Path to headers -DCMAKE_INCLUDE_PATH=<path to headers>
Path to libraries -DCMAKE_LIBRARY_PATH=<path to libraries>
Install prefix -DCMAKE_INSTALL_PREFIX=<prefix>
Separate multiple folders with colons, for example:
'path1:path2:path3'
After running CMake run 'make' to build the binaries and 'make package' to build RPMs.
To build without CMake, use the provided makefile and update the
include and library directories 'in buildvars.inc'
The configuration for the consumer client are red from 'consumer.cnf'.
Options for the configuration file:
hostname Hostname of the RabbitMQ server
port Port of the RabbitMQ server
vhost Virtual host location of the RabbitMQ server
user Username for the RabbitMQ server
passwd Password for the RabbitMQ server
queue Queue to consume from
dbserver Hostname of the SQL server
dbport Port of the SQL server
dbname Name of the SQL database to use
dbuser Database username
dbpasswd Database passwork
logfile Message log filename

View File

@ -0,0 +1,8 @@
#Use the '-I' prefix for include and '-L' for library directories
#You can use multiple library and include directories
#Path to the rabbitmq-c and mysqlclient libraries
LIBRARY_DIRS :=-L/usr/lib64
#path to headers
INCLUDE_DIRS :=-I/usr/include -I/usr/include/mysql

View File

@ -0,0 +1,524 @@
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <ini.h>
#include <stdint.h>
#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>
#include <mysql.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
typedef struct delivery_t
{
uint64_t dtag;
amqp_message_t* message;
struct delivery_t *next,*prev;
}DELIVERY;
typedef struct consumer_t
{
char *hostname,*vhost,*user,*passwd,*queue,*dbserver,*dbname,*dbuser,*dbpasswd;
DELIVERY* query_stack;
int port,dbport;
}CONSUMER;
static int all_ok;
static FILE* out_fd;
static CONSUMER* c_inst;
static char* DB_DATABASE = "CREATE DATABASE IF NOT EXISTS %s;";
static char* DB_TABLE = "CREATE TABLE IF NOT EXISTS pairs (tag VARCHAR(64) PRIMARY KEY NOT NULL, query VARCHAR(2048), reply VARCHAR(2048), date_in DATETIME NOT NULL, date_out DATETIME DEFAULT NULL, counter INT DEFAULT 1)";
static char* DB_INSERT = "INSERT INTO pairs(tag, query, date_in) VALUES ('%s','%s',FROM_UNIXTIME(%s))";
static char* DB_UPDATE = "UPDATE pairs SET reply='%s', date_out=FROM_UNIXTIME(%s) WHERE tag='%s'";
static char* DB_INCREMENT = "UPDATE pairs SET counter = counter+1, date_out=FROM_UNIXTIME(%s) WHERE query='%s'";
void sighndl(int signum)
{
if(signum == SIGINT){
all_ok = 0;
alarm(1);
}
}
int handler(void* user, const char* section, const char* name,
const char* value)
{
if(strcmp(section,"consumer") == 0){
if(strcmp(name,"hostname") == 0){
c_inst->hostname = strdup(value);
}else if(strcmp(name,"vhost") == 0){
c_inst->vhost = strdup(value);
}else if(strcmp(name,"port") == 0){
c_inst->port = atoi(value);
}else if(strcmp(name,"user") == 0){
c_inst->user = strdup(value);
}else if(strcmp(name,"passwd") == 0){
c_inst->passwd = strdup(value);
}else if(strcmp(name,"queue") == 0){
c_inst->queue = strdup(value);
}else if(strcmp(name,"dbserver") == 0){
c_inst->dbserver = strdup(value);
}else if(strcmp(name,"dbport") == 0){
c_inst->dbport = atoi(value);
}else if(strcmp(name,"dbname") == 0){
c_inst->dbname = strdup(value);
}else if(strcmp(name,"dbuser") == 0){
c_inst->dbuser = strdup(value);
}else if(strcmp(name,"dbpasswd") == 0){
c_inst->dbpasswd = strdup(value);
}else if(strcmp(name,"logfile") == 0){
out_fd = fopen(value,"ab");
}
}
return 1;
}
int isPair(amqp_message_t* a, amqp_message_t* b)
{
int keylen = a->properties.correlation_id.len >=
b->properties.correlation_id.len ?
a->properties.correlation_id.len :
b->properties.correlation_id.len;
return strncmp(a->properties.correlation_id.bytes,
b->properties.correlation_id.bytes,
keylen) == 0 ? 1 : 0;
}
int connectToServer(MYSQL* server)
{
mysql_init(server);
mysql_options(server,MYSQL_READ_DEFAULT_GROUP,"client");
mysql_options(server,MYSQL_OPT_USE_REMOTE_CONNECTION,0);
my_bool tr = 1;
mysql_options(server,MYSQL_OPT_RECONNECT,&tr);
MYSQL* result = mysql_real_connect(server,
c_inst->dbserver,
c_inst->dbuser,
c_inst->dbpasswd,
NULL,
c_inst->dbport,
NULL,
0);
if(result==NULL){
fprintf(out_fd,"Error: Could not connect to MySQL server: %s\n",mysql_error(server));
return 0;
}
int bsz = 1024;
char *qstr = calloc(bsz,sizeof(char));
if(!qstr){
fprintf(stderr, "Fatal Error: Cannot allocate enough memory.\n");
return 0;
}
/**Connection ok, check that the database and table exist*/
memset(qstr,0,bsz);
sprintf(qstr,DB_DATABASE,c_inst->dbname);
if(mysql_query(server,qstr)){
fprintf(stderr,"Error: Could not send query MySQL server: %s\n",mysql_error(server));
}
memset(qstr,0,bsz);
sprintf(qstr,"USE %s;",c_inst->dbname);
if(mysql_query(server,qstr)){
fprintf(stderr,"Error: Could not send query MySQL server: %s\n",mysql_error(server));
}
memset(qstr,0,bsz);
sprintf(qstr,DB_TABLE);
if(mysql_query(server,qstr)){
fprintf(stderr,"Error: Could not send query MySQL server: %s\n",mysql_error(server));
}
free(qstr);
return 1;
}
int sendMessage(MYSQL* server, amqp_message_t* msg)
{
int buffsz = (int)((msg->body.len + 1)*2+1) +
(int)((msg->properties.correlation_id.len + 1)*2+1) +
strlen(DB_INSERT),
rval = 0;
char *qstr = calloc(buffsz,sizeof(char)),
*rawmsg = calloc((msg->body.len + 1),sizeof(char)),
*clnmsg = calloc(((msg->body.len + 1)*2+1),sizeof(char)),
*rawdate = calloc((msg->body.len + 1),sizeof(char)),
*clndate = calloc(((msg->body.len + 1)*2+1),sizeof(char)),
*rawtag = calloc((msg->properties.correlation_id.len + 1),sizeof(char)),
*clntag = calloc(((msg->properties.correlation_id.len + 1)*2+1),sizeof(char));
sprintf(qstr,"%.*s",(int)msg->body.len,(char *)msg->body.bytes);
fprintf(out_fd,"Received: %s\n",qstr);
char *ptr = strtok(qstr,"|");
sprintf(rawdate,"%s",ptr);
ptr = strtok(NULL,"\n\0");
if(ptr == NULL){
fprintf(out_fd,"Message content not valid.\n");
rval = 1;
goto cleanup;
}
sprintf(rawmsg,"%s",ptr);
sprintf(rawtag,"%.*s",(int)msg->properties.correlation_id.len,(char *)msg->properties.correlation_id.bytes);
memset(qstr,0,buffsz);
mysql_real_escape_string(server,clnmsg,rawmsg,strnlen(rawmsg,msg->body.len + 1));
mysql_real_escape_string(server,clndate,rawdate,strnlen(rawdate,msg->body.len + 1));
mysql_real_escape_string(server,clntag,rawtag,strnlen(rawtag,msg->properties.correlation_id.len + 1));
if(strncmp(msg->properties.message_id.bytes,
"query",msg->properties.message_id.len) == 0)
{
sprintf(qstr,DB_INCREMENT,clndate,clnmsg);
rval = mysql_query(server,qstr);
if(mysql_affected_rows(server) == 0){
memset(qstr,0,buffsz);
sprintf(qstr,DB_INSERT,clntag,clnmsg,clndate);
rval = mysql_query(server,qstr);
}
}else if(strncmp(msg->properties.message_id.bytes,
"reply",msg->properties.message_id.len) == 0){
sprintf(qstr,DB_UPDATE,clnmsg,clndate,clntag);
rval = mysql_query(server,qstr);
}else{
rval = 1;
goto cleanup;
}
if(rval){
fprintf(stderr,"Could not send query to SQL server:%s\n",mysql_error(server));
goto cleanup;
}
cleanup:
free(qstr);
free(rawmsg);
free(clnmsg);
free(rawdate);
free(clndate);
free(rawtag);
free(clntag);
return rval;
}
int sendToServer(MYSQL* server, amqp_message_t* a, amqp_message_t* b){
amqp_message_t *msg, *reply;
int buffsz = 2048;
char *qstr = calloc(buffsz,sizeof(char));
if(!qstr){
fprintf(out_fd, "Fatal Error: Cannot allocate enough memory.\n");
free(qstr);
return 0;
}
if( a->properties.message_id.len == strlen("query") &&
strncmp(a->properties.message_id.bytes,"query",
a->properties.message_id.len) == 0){
msg = a;
reply = b;
}else{
msg = b;
reply = a;
}
printf("pair: %.*s\nquery: %.*s\nreply: %.*s\n",
(int)msg->properties.correlation_id.len,
(char *)msg->properties.correlation_id.bytes,
(int)msg->body.len,
(char *)msg->body.bytes,
(int)reply->body.len,
(char *)reply->body.bytes);
if((int)msg->body.len +
(int)reply->body.len +
(int)msg->properties.correlation_id.len + 50 >= buffsz)
{
char *qtmp = calloc(buffsz*2,sizeof(char));
free(qstr);
if(qtmp){
qstr = qtmp;
buffsz *= 2;
}else{
fprintf(stderr, "Fatal Error: Cannot allocate enough memory.\n");
return 0;
}
}
char *rawmsg = calloc((msg->body.len + 1),sizeof(char)),
*clnmsg = calloc(((msg->body.len + 1)*2+1),sizeof(char)),
*rawrpl = calloc((reply->body.len + 1),sizeof(char)),
*clnrpl = calloc(((reply->body.len + 1)*2+1),sizeof(char)),
*rawtag = calloc((msg->properties.correlation_id.len + 1),sizeof(char)),
*clntag = calloc(((msg->properties.correlation_id.len + 1)*2+1),sizeof(char));
sprintf(rawmsg,"%.*s",(int)msg->body.len,(char *)msg->body.bytes);
sprintf(rawrpl,"%.*s",(int)reply->body.len,(char *)reply->body.bytes);
sprintf(rawtag,"%.*s",(int)msg->properties.correlation_id.len,(char *)msg->properties.correlation_id.bytes);
char *ptr;
while((ptr = strchr(rawmsg,'\n'))){
*ptr = ' ';
}
while((ptr = strchr(rawrpl,'\n'))){
*ptr = ' ';
}
while((ptr = strchr(rawtag,'\n'))){
*ptr = ' ';
}
mysql_real_escape_string(server,clnmsg,rawmsg,strnlen(rawmsg,msg->body.len + 1));
mysql_real_escape_string(server,clnrpl,rawrpl,strnlen(rawrpl,reply->body.len + 1));
mysql_real_escape_string(server,clntag,rawtag,strnlen(rawtag,msg->properties.correlation_id.len + 1));
sprintf(qstr,"INSERT INTO pairs VALUES ('%s','%s','%s');",clnmsg,clnrpl,clntag);
free(rawmsg);
free(clnmsg);
free(rawrpl);
free(clnrpl);
free(rawtag);
free(clntag);
if(mysql_query(server,qstr)){
fprintf(stderr,"Could not send query to SQL server:%s\n",mysql_error(server));
free(qstr);
return 0;
}
free(qstr);
return 1;
}
int main(int argc, char** argv)
{
int channel = 1, status = AMQP_STATUS_OK, cnfnlen;
amqp_socket_t *socket = NULL;
amqp_connection_state_t conn;
amqp_rpc_reply_t ret;
amqp_message_t *reply = NULL;
amqp_frame_t frame;
struct timeval timeout;
MYSQL db_inst;
char ch, *cnfname = NULL, *cnfpath = NULL;
static const char* fname = "consumer.cnf";
if((c_inst = calloc(1,sizeof(CONSUMER))) == NULL){
fprintf(stderr, "Fatal Error: Cannot allocate enough memory.\n");
return 1;
}
if(signal(SIGINT,sighndl) == SIG_IGN){
signal(SIGINT,SIG_IGN);
}
while((ch = getopt(argc,argv,"c:"))!= -1){
switch(ch){
case 'c':
cnfnlen = strlen(optarg);
cnfpath = strdup(optarg);
break;
default:
break;
}
}
cnfname = calloc(cnfnlen + strlen(fname) + 1,sizeof(char));
if(cnfpath){
/**Config file path as argument*/
strcpy(cnfname,cnfpath);
if(cnfpath[cnfnlen-1] != '/'){
strcat(cnfname,"/");
}
}
strcat(cnfname,fname);
timeout.tv_sec = 1;
timeout.tv_usec = 0;
all_ok = 1;
out_fd = NULL;
/**Parse the INI file*/
if(ini_parse(cnfname,handler,NULL) < 0){
/**Try to parse a config in the same directory*/
if(ini_parse(fname,handler,NULL) < 0){
fprintf(stderr, "Fatal Error: Error parsing configuration file!\n");
goto fatal_error;
}
}
if(out_fd == NULL){
out_fd = stdout;
}
fprintf(out_fd,"\n--------------------------------------------------------------\n");
/**Confirm that all parameters were in the configuration file*/
if(!c_inst->hostname||!c_inst->vhost||!c_inst->user||
!c_inst->passwd||!c_inst->dbpasswd||!c_inst->queue||
!c_inst->dbserver||!c_inst->dbname||!c_inst->dbuser){
fprintf(stderr, "Fatal Error: Inadequate configuration file!\n");
goto fatal_error;
}
connectToServer(&db_inst);
if((conn = amqp_new_connection()) == NULL ||
(socket = amqp_tcp_socket_new(conn)) == NULL){
fprintf(stderr, "Fatal Error: Cannot create connection object or socket.\n");
goto fatal_error;
}
if(amqp_socket_open(socket, c_inst->hostname, c_inst->port)){
fprintf(stderr, "RabbitMQ Error: Cannot open socket.\n");
goto error;
}
ret = amqp_login(conn, c_inst->vhost, 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, c_inst->user, c_inst->passwd);
if(ret.reply_type != AMQP_RESPONSE_NORMAL){
fprintf(stderr, "RabbitMQ Error: Cannot login to server.\n");
goto error;
}
amqp_channel_open(conn, channel);
ret = amqp_get_rpc_reply(conn);
if(ret.reply_type != AMQP_RESPONSE_NORMAL){
fprintf(stderr, "RabbitMQ Error: Cannot open channel.\n");
goto error;
}
reply = malloc(sizeof(amqp_message_t));
if(!reply){
fprintf(stderr, "Error: Cannot allocate enough memory.\n");
goto error;
}
amqp_basic_consume(conn,channel,amqp_cstring_bytes(c_inst->queue),amqp_empty_bytes,0,0,0,amqp_empty_table);
while(all_ok){
status = amqp_simple_wait_frame_noblock(conn,&frame,&timeout);
/**No frames to read from server, possibly out of messages*/
if(status == AMQP_STATUS_TIMEOUT){
sleep(timeout.tv_sec);
continue;
}
if(frame.payload.method.id == AMQP_BASIC_DELIVER_METHOD){
amqp_basic_deliver_t* decoded = (amqp_basic_deliver_t*)frame.payload.method.decoded;
amqp_read_message(conn,channel,reply,0);
if(sendMessage(&db_inst,reply)){
fprintf(stderr,"RabbitMQ Error: Received malformed message.\n");
amqp_basic_reject(conn,channel,decoded->delivery_tag,0);
amqp_destroy_message(reply);
}else{
amqp_basic_ack(conn,channel,decoded->delivery_tag,0);
amqp_destroy_message(reply);
}
}else{
fprintf(stderr,"RabbitMQ Error: Received method from server: %s\n",amqp_method_name(frame.payload.method.id));
all_ok = 0;
goto error;
}
}
fprintf(out_fd,"Shutting down...\n");
error:
mysql_close(&db_inst);
mysql_library_end();
if(c_inst && c_inst->query_stack){
while(c_inst->query_stack){
DELIVERY* d = c_inst->query_stack->next;
amqp_destroy_message(c_inst->query_stack->message);
free(c_inst->query_stack);
c_inst->query_stack = d;
}
}
amqp_channel_close(conn, channel, AMQP_REPLY_SUCCESS);
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(conn);
fatal_error:
if(out_fd){
fclose(out_fd);
}
if(c_inst){
free(c_inst->hostname);
free(c_inst->vhost);
free(c_inst->user);
free(c_inst->passwd);
free(c_inst->queue);
free(c_inst->dbserver);
free(c_inst->dbname);
free(c_inst->dbuser);
free(c_inst->dbpasswd);
free(c_inst);
}
return all_ok;
}

View File

@ -0,0 +1,28 @@
#
#The options for the consumer are:
#hostname RabbitMQ hostname
#port RabbitMQ port
#vhost RabbitMQ virtual host
#user RabbitMQ username
#passwd RabbitMQ password
#queue Name of the queue to use
#dbserver SQL server name
#dbport SQL server port
#dbname Name of the databse to use
#dbuser SQL server username
#dbpasswd SQL server password
#logfile Message log filename
#
[consumer]
hostname=127.0.0.1
port=5673
vhost=/
user=guest
passwd=guest
queue=q1
dbserver=127.0.0.1
dbport=3000
dbname=mqpairs
dbuser=maxuser
dbpasswd=maxpwd
#logfile=consumer.log

Binary file not shown.

Binary file not shown.

BIN
rabbitmq_consumer/inih/._cpp Executable file

Binary file not shown.

BIN
rabbitmq_consumer/inih/._examples Executable file

Binary file not shown.

BIN
rabbitmq_consumer/inih/._extra Executable file

Binary file not shown.

BIN
rabbitmq_consumer/inih/._ini.c Executable file

Binary file not shown.

BIN
rabbitmq_consumer/inih/._ini.h Executable file

Binary file not shown.

BIN
rabbitmq_consumer/inih/._tests Executable file

Binary file not shown.

3
rabbitmq_consumer/inih/.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
*.o
*.a
make.depend

View File

@ -0,0 +1 @@
add_library(inih ini.c)

View File

@ -0,0 +1,27 @@
The "inih" library is distributed under the New BSD license:
Copyright (c) 2009, Brush Technology
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
* Neither the name of Brush Technology nor the names of its contributors
may be used to endorse or promote products derived from this software
without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY BRUSH TECHNOLOGY ''AS IS'' AND ANY
EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL BRUSH TECHNOLOGY BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -0,0 +1,5 @@
inih is a simple .INI file parser written in C, released under the New BSD
license (see LICENSE.txt). Go to the project home page for more info:
http://code.google.com/p/inih/

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,67 @@
// Read an INI file into easy-to-access name/value pairs.
#include <algorithm>
#include <cctype>
#include <cstdlib>
#include "../ini.h"
#include "INIReader.h"
using std::string;
INIReader::INIReader(string filename)
{
_error = ini_parse(filename.c_str(), ValueHandler, this);
}
int INIReader::ParseError()
{
return _error;
}
string INIReader::Get(string section, string name, string default_value)
{
string key = MakeKey(section, name);
return _values.count(key) ? _values[key] : default_value;
}
long INIReader::GetInteger(string section, string name, long default_value)
{
string valstr = Get(section, name, "");
const char* value = valstr.c_str();
char* end;
// This parses "1234" (decimal) and also "0x4D2" (hex)
long n = strtol(value, &end, 0);
return end > value ? n : default_value;
}
bool INIReader::GetBoolean(string section, string name, bool default_value)
{
string valstr = Get(section, name, "");
// Convert to lower case to make string comparisons case-insensitive
std::transform(valstr.begin(), valstr.end(), valstr.begin(), ::tolower);
if (valstr == "true" || valstr == "yes" || valstr == "on" || valstr == "1")
return true;
else if (valstr == "false" || valstr == "no" || valstr == "off" || valstr == "0")
return false;
else
return default_value;
}
string INIReader::MakeKey(string section, string name)
{
string key = section + "." + name;
// Convert to lower case to make section/name lookups case-insensitive
std::transform(key.begin(), key.end(), key.begin(), ::tolower);
return key;
}
int INIReader::ValueHandler(void* user, const char* section, const char* name,
const char* value)
{
INIReader* reader = (INIReader*)user;
string key = MakeKey(section, name);
if (reader->_values[key].size() > 0)
reader->_values[key] += "\n";
reader->_values[key] += value;
return 1;
}

View File

@ -0,0 +1,48 @@
// Read an INI file into easy-to-access name/value pairs.
// inih and INIReader are released under the New BSD license (see LICENSE.txt).
// Go to the project home page for more info:
//
// http://code.google.com/p/inih/
#ifndef __INIREADER_H__
#define __INIREADER_H__
#include <map>
#include <string>
// Read an INI file into easy-to-access name/value pairs. (Note that I've gone
// for simplicity here rather than speed, but it should be pretty decent.)
class INIReader
{
public:
// Construct INIReader and parse given filename. See ini.h for more info
// about the parsing.
INIReader(std::string filename);
// Return the result of ini_parse(), i.e., 0 on success, line number of
// first error on parse error, or -1 on file open error.
int ParseError();
// Get a string value from INI file, returning default_value if not found.
std::string Get(std::string section, std::string name,
std::string default_value);
// Get an integer (long) value from INI file, returning default_value if
// not found or not a valid integer (decimal "1234", "-1234", or hex "0x4d2").
long GetInteger(std::string section, std::string name, long default_value);
// Get a boolean value from INI file, returning default_value if not found or if
// not a valid true/false value. Valid true values are "true", "yes", "on", "1",
// and valid false values are "false", "no", "off", "0" (not case sensitive).
bool GetBoolean(std::string section, std::string name, bool default_value);
private:
int _error;
std::map<std::string, std::string> _values;
static std::string MakeKey(std::string section, std::string name);
static int ValueHandler(void* user, const char* section, const char* name,
const char* value);
};
#endif // __INIREADER_H__

View File

@ -0,0 +1,20 @@
// Example that shows simple usage of the INIReader class
#include <iostream>
#include "INIReader.h"
int main()
{
INIReader reader("../examples/test.ini");
if (reader.ParseError() < 0) {
std::cout << "Can't load 'test.ini'\n";
return 1;
}
std::cout << "Config loaded from 'test.ini': version="
<< reader.GetInteger("protocol", "version", -1) << ", name="
<< reader.Get("user", "name", "UNKNOWN") << ", email="
<< reader.Get("user", "email", "UNKNOWN") << ", active="
<< reader.GetBoolean("user", "active", true) << "\n";
return 0;
}

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,8 @@
// CFG(section, name, default)
CFG(protocol, version, "0")
CFG(user, name, "Fatty Lumpkin")
CFG(user, email, "fatty@lumpkin.com")
#undef CFG

View File

@ -0,0 +1,40 @@
/* ini.h example that simply dumps an INI file without comments */
#include <stdio.h>
#include <string.h>
#include "../ini.h"
static int dumper(void* user, const char* section, const char* name,
const char* value)
{
static char prev_section[50] = "";
if (strcmp(section, prev_section)) {
printf("%s[%s]\n", (prev_section[0] ? "\n" : ""), section);
strncpy(prev_section, section, sizeof(prev_section));
prev_section[sizeof(prev_section) - 1] = '\0';
}
printf("%s = %s\n", name, value);
return 1;
}
int main(int argc, char* argv[])
{
int error;
if (argc <= 1) {
printf("Usage: ini_dump filename.ini\n");
return 1;
}
error = ini_parse(argv[1], dumper, NULL);
if (error < 0) {
printf("Can't read '%s'!\n", argv[1]);
return 2;
}
else if (error) {
printf("Bad config file (first error on line %d)!\n", error);
return 3;
}
return 0;
}

View File

@ -0,0 +1,44 @@
/* Example: parse a simple configuration file */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "../ini.h"
typedef struct
{
int version;
const char* name;
const char* email;
} configuration;
static int handler(void* user, const char* section, const char* name,
const char* value)
{
configuration* pconfig = (configuration*)user;
#define MATCH(s, n) strcmp(section, s) == 0 && strcmp(name, n) == 0
if (MATCH("protocol", "version")) {
pconfig->version = atoi(value);
} else if (MATCH("user", "name")) {
pconfig->name = strdup(value);
} else if (MATCH("user", "email")) {
pconfig->email = strdup(value);
} else {
return 0; /* unknown section/name, error */
}
return 1;
}
int main(int argc, char* argv[])
{
configuration config;
if (ini_parse("test.ini", handler, &config) < 0) {
printf("Can't load 'test.ini'\n");
return 1;
}
printf("Config loaded from 'test.ini': version=%d, name=%s, email=%s\n",
config.version, config.name, config.email);
return 0;
}

View File

@ -0,0 +1,46 @@
/* Parse a configuration file into a struct using X-Macros */
#include <stdio.h>
#include <string.h>
#include "../ini.h"
/* define the config struct type */
typedef struct {
#define CFG(s, n, default) char *s##_##n;
#include "config.def"
} config;
/* create one and fill in its default values */
config Config = {
#define CFG(s, n, default) default,
#include "config.def"
};
/* process a line of the INI file, storing valid values into config struct */
int handler(void *user, const char *section, const char *name,
const char *value)
{
config *cfg = (config *)user;
if (0) ;
#define CFG(s, n, default) else if (strcmp(section, #s)==0 && \
strcmp(name, #n)==0) cfg->s##_##n = strdup(value);
#include "config.def"
return 1;
}
/* print all the variables in the config, one per line */
void dump_config(config *cfg)
{
#define CFG(s, n, default) printf("%s_%s = %s\n", #s, #n, cfg->s##_##n);
#include "config.def"
}
int main(int argc, char* argv[])
{
if (ini_parse("test.ini", handler, &Config) < 0)
printf("Can't load 'test.ini', using defaults\n");
dump_config(&Config);
return 0;
}

View File

@ -0,0 +1,9 @@
; Test config file for ini_example.c and INIReaderTest.cpp
[protocol] ; Protocol configuration
version=6 ; IPv6
[user]
name = Bob Smith ; Spaces around '=' are stripped
email = bob@smith.com ; And comments (like this) ignored
active = true ; Test a boolean

Binary file not shown.

View File

@ -0,0 +1,19 @@
# Simple makefile to build inih as a static library using g++
SRC = ../ini.c
OBJ = $(SRC:.c=.o)
OUT = libinih.a
INCLUDES = -I..
CCFLAGS = -g -O2
CC = g++
default: $(OUT)
.c.o:
$(CC) $(INCLUDES) $(CCFLAGS) $(EXTRACCFLAGS) -c $< -o $@
$(OUT): $(OBJ)
ar rcs $(OUT) $(OBJ) $(EXTRAARFLAGS)
clean:
rm -f $(OBJ) $(OUT)

176
rabbitmq_consumer/inih/ini.c Executable file
View File

@ -0,0 +1,176 @@
/* inih -- simple .INI file parser
inih is released under the New BSD license (see LICENSE.txt). Go to the project
home page for more info:
http://code.google.com/p/inih/
*/
#include <stdio.h>
#include <ctype.h>
#include <string.h>
#include "ini.h"
#if !INI_USE_STACK
#include <stdlib.h>
#endif
#define MAX_SECTION 50
#define MAX_NAME 50
/* Strip whitespace chars off end of given string, in place. Return s. */
static char* rstrip(char* s)
{
char* p = s + strlen(s);
while (p > s && isspace((unsigned char)(*--p)))
*p = '\0';
return s;
}
/* Return pointer to first non-whitespace char in given string. */
static char* lskip(const char* s)
{
while (*s && isspace((unsigned char)(*s)))
s++;
return (char*)s;
}
/* Return pointer to first char c or ';' comment in given string, or pointer to
null at end of string if neither found. ';' must be prefixed by a whitespace
character to register as a comment. */
static char* find_char_or_comment(const char* s, char c)
{
int was_whitespace = 0;
while (*s && *s != c && !(was_whitespace && *s == ';')) {
was_whitespace = isspace((unsigned char)(*s));
s++;
}
return (char*)s;
}
/* Version of strncpy that ensures dest (size bytes) is null-terminated. */
static char* strncpy0(char* dest, const char* src, size_t size)
{
strncpy(dest, src, size);
dest[size - 1] = '\0';
return dest;
}
/* See documentation in header file. */
int ini_parse_file(FILE* file,
int (*handler)(void*, const char*, const char*,
const char*),
void* user)
{
/* Uses a fair bit of stack (use heap instead if you need to) */
#if INI_USE_STACK
char line[INI_MAX_LINE];
#else
char* line;
#endif
char section[MAX_SECTION] = "";
char prev_name[MAX_NAME] = "";
char* start;
char* end;
char* name;
char* value;
int lineno = 0;
int error = 0;
#if !INI_USE_STACK
line = (char*)malloc(INI_MAX_LINE);
if (!line) {
return -2;
}
#endif
/* Scan through file line by line */
while (fgets(line, INI_MAX_LINE, file) != NULL) {
lineno++;
start = line;
#if INI_ALLOW_BOM
if (lineno == 1 && (unsigned char)start[0] == 0xEF &&
(unsigned char)start[1] == 0xBB &&
(unsigned char)start[2] == 0xBF) {
start += 3;
}
#endif
start = lskip(rstrip(start));
if (*start == ';' || *start == '#') {
/* Per Python ConfigParser, allow '#' comments at start of line */
}
#if INI_ALLOW_MULTILINE
else if (*prev_name && *start && start > line) {
/* Non-black line with leading whitespace, treat as continuation
of previous name's value (as per Python ConfigParser). */
if (!handler(user, section, prev_name, start) && !error)
error = lineno;
}
#endif
else if (*start == '[') {
/* A "[section]" line */
end = find_char_or_comment(start + 1, ']');
if (*end == ']') {
*end = '\0';
strncpy0(section, start + 1, sizeof(section));
*prev_name = '\0';
}
else if (!error) {
/* No ']' found on section line */
error = lineno;
}
}
else if (*start && *start != ';') {
/* Not a comment, must be a name[=:]value pair */
end = find_char_or_comment(start, '=');
if (*end != '=') {
end = find_char_or_comment(start, ':');
}
if (*end == '=' || *end == ':') {
*end = '\0';
name = rstrip(start);
value = lskip(end + 1);
end = find_char_or_comment(value, '\0');
if (*end == ';')
*end = '\0';
rstrip(value);
/* Valid name[=:]value pair found, call handler */
strncpy0(prev_name, name, sizeof(prev_name));
if (!handler(user, section, name, value) && !error)
error = lineno;
}
else if (!error) {
/* No '=' or ':' found on name[=:]value line */
error = lineno;
}
}
}
#if !INI_USE_STACK
free(line);
#endif
return error;
}
/* See documentation in header file. */
int ini_parse(const char* filename,
int (*handler)(void*, const char*, const char*, const char*),
void* user)
{
FILE* file;
int error;
file = fopen(filename, "r");
if (!file)
return -1;
error = ini_parse_file(file, handler, user);
fclose(file);
return error;
}

72
rabbitmq_consumer/inih/ini.h Executable file
View File

@ -0,0 +1,72 @@
/* inih -- simple .INI file parser
inih is released under the New BSD license (see LICENSE.txt). Go to the project
home page for more info:
http://code.google.com/p/inih/
*/
#ifndef __INI_H__
#define __INI_H__
/* Make this header file easier to include in C++ code */
#ifdef __cplusplus
extern "C" {
#endif
#include <stdio.h>
/* Parse given INI-style file. May have [section]s, name=value pairs
(whitespace stripped), and comments starting with ';' (semicolon). Section
is "" if name=value pair parsed before any section heading. name:value
pairs are also supported as a concession to Python's ConfigParser.
For each name=value pair parsed, call handler function with given user
pointer as well as section, name, and value (data only valid for duration
of handler call). Handler should return nonzero on success, zero on error.
Returns 0 on success, line number of first error on parse error (doesn't
stop on first error), -1 on file open error, or -2 on memory allocation
error (only when INI_USE_STACK is zero).
*/
int ini_parse(const char* filename,
int (*handler)(void* user, const char* section,
const char* name, const char* value),
void* user);
/* Same as ini_parse(), but takes a FILE* instead of filename. This doesn't
close the file when it's finished -- the caller must do that. */
int ini_parse_file(FILE* file,
int (*handler)(void* user, const char* section,
const char* name, const char* value),
void* user);
/* Nonzero to allow multi-line value parsing, in the style of Python's
ConfigParser. If allowed, ini_parse() will call the handler with the same
name for each subsequent line parsed. */
#ifndef INI_ALLOW_MULTILINE
#define INI_ALLOW_MULTILINE 1
#endif
/* Nonzero to allow a UTF-8 BOM sequence (0xEF 0xBB 0xBF) at the start of
the file. See http://code.google.com/p/inih/issues/detail?id=21 */
#ifndef INI_ALLOW_BOM
#define INI_ALLOW_BOM 1
#endif
/* Nonzero to use stack, zero to use heap (malloc/free). */
#ifndef INI_USE_STACK
#define INI_USE_STACK 1
#endif
/* Maximum line length for any line in INI file. */
#ifndef INI_MAX_LINE
#define INI_MAX_LINE 200
#endif
#ifdef __cplusplus
}
#endif
#endif /* __INI_H__ */

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1 @@
This is an error

View File

@ -0,0 +1 @@
indented

View File

@ -0,0 +1,5 @@
[section1]
name1=value1
[section2
[section3 ; comment ]
name2=value2

View File

@ -0,0 +1,47 @@
no_file.ini: e=-1 user=0
... [section1]
... one=This is a test;
... two=1234;
... [ section 2 ]
... happy=4;
... sad=;
... [comment_test]
... test1=1;2;3;
... test2=2;3;4;this won't be a comment, needs whitespace before ';';
... test;3=345;
... test4=4#5#6;
... [colon_tests]
... Content-Type=text/html;
... foo=bar;
... adams=42;
normal.ini: e=0 user=101
... [section1]
... name1=value1;
... name2=value2;
bad_section.ini: e=3 user=102
bad_comment.ini: e=1 user=102
... [section]
... a=b;
... user=parse_error;
... c=d;
user_error.ini: e=3 user=104
... [section1]
... single1=abc;
... multi=this is a;
... multi=multi-line value;
... single2=xyz;
... [section2]
... multi=a;
... multi=b;
... multi=c;
... [section3]
... single=ghi;
... multi=the quick;
... multi=brown fox;
... name=bob smith;
multi_line.ini: e=0 user=105
bad_multi.ini: e=1 user=105
... [bom_section]
... bom_name=bom_value;
... key“=value“;
bom.ini: e=0 user=107

View File

@ -0,0 +1,43 @@
no_file.ini: e=-1 user=0
... [section1]
... one=This is a test;
... two=1234;
... [ section 2 ]
... happy=4;
... sad=;
... [comment_test]
... test1=1;2;3;
... test2=2;3;4;this won't be a comment, needs whitespace before ';';
... test;3=345;
... test4=4#5#6;
... [colon_tests]
... Content-Type=text/html;
... foo=bar;
... adams=42;
normal.ini: e=0 user=101
... [section1]
... name1=value1;
... name2=value2;
bad_section.ini: e=3 user=102
bad_comment.ini: e=1 user=102
... [section]
... a=b;
... user=parse_error;
... c=d;
user_error.ini: e=3 user=104
... [section1]
... single1=abc;
... multi=this is a;
... single2=xyz;
... [section2]
... multi=a;
... [section3]
... single=ghi;
... multi=the quick;
... name=bob smith;
multi_line.ini: e=4 user=105
bad_multi.ini: e=1 user=105
... [bom_section]
... bom_name=bom_value;
... key“=value“;
bom.ini: e=0 user=107

View File

@ -0,0 +1,3 @@
[bom_section]
bom_name=bom_value
key“ = value“

View File

@ -0,0 +1,15 @@
[section1]
single1 = abc
multi = this is a
multi-line value
single2 = xyz
[section2]
multi = a
b
c
[section3]
single: ghi
multi: the quick
brown fox
name = bob smith ; comment line 1
; comment line 2

View File

@ -0,0 +1,25 @@
; This is an INI file
[section1] ; section comment
one=This is a test ; name=value comment
two = 1234
; x=y
[ section 2 ]
happy = 4
sad =
[empty]
; do nothing
[comment_test]
test1 = 1;2;3 ; only this will be a comment
test2 = 2;3;4;this won't be a comment, needs whitespace before ';'
test;3 = 345 ; key should be "test;3"
test4 = 4#5#6 ; '#' only starts a comment at start of line
#test5 = 567 ; entire line commented
# test6 = 678 ; entire line commented, except in MULTILINE mode
[colon_tests]
Content-Type: text/html
foo:bar
adams : 42

View File

@ -0,0 +1,2 @@
@call tcc ..\ini.c -I..\ -run unittest.c > baseline_multi.txt
@call tcc ..\ini.c -I..\ -DINI_ALLOW_MULTILINE=0 -run unittest.c > baseline_single.txt

View File

@ -0,0 +1,58 @@
/* inih -- unit tests
This works simply by dumping a bunch of info to standard output, which is
redirected to an output file (baseline_*.txt) and checked into the Subversion
repository. This baseline file is the test output, so the idea is to check it
once, and if it changes -- look at the diff and see which tests failed.
Here's how I produced the two baseline files (with Tiny C Compiler):
tcc -DINI_ALLOW_MULTILINE=1 ../ini.c -run unittest.c > baseline_multi.txt
tcc -DINI_ALLOW_MULTILINE=0 ../ini.c -run unittest.c > baseline_single.txt
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "../ini.h"
int User;
char Prev_section[50];
int dumper(void* user, const char* section, const char* name,
const char* value)
{
User = (int)user;
if (strcmp(section, Prev_section)) {
printf("... [%s]\n", section);
strncpy(Prev_section, section, sizeof(Prev_section));
Prev_section[sizeof(Prev_section) - 1] = '\0';
}
printf("... %s=%s;\n", name, value);
return strcmp(name, "user")==0 && strcmp(value, "parse_error")==0 ? 0 : 1;
}
void parse(const char* fname) {
static int u = 100;
int e;
*Prev_section = '\0';
e = ini_parse(fname, dumper, (void*)u);
printf("%s: e=%d user=%d\n", fname, e, User);
u++;
}
int main(void)
{
parse("no_file.ini");
parse("normal.ini");
parse("bad_section.ini");
parse("bad_comment.ini");
parse("user_error.ini");
parse("multi_line.ini");
parse("bad_multi.ini");
parse("bom.ini");
return 0;
}

View File

@ -0,0 +1,4 @@
[section]
a = b
user = parse_error
c = d

View File

@ -0,0 +1,55 @@
%define _topdir %(echo $PWD)/
%define name rabbitmq-message-consumer
%define release beta
%define version 1.0
%define install_path /usr/local/skysql/maxscale/extra/consumer/
BuildRoot: %{buildroot}
Summary: rabbitmq-message-consumer
License: GPL
Name: %{name}
Version: %{version}
Release: %{release}
Source: %{name}-%{version}-%{release}.tar.gz
Prefix: /
Group: Development/Tools
Requires: maxscale
%if 0%{?suse_version}
BuildRequires: gcc gcc-c++ ncurses-devel bison glibc-devel cmake libgcc_s1 perl make libtool libopenssl-devel libaio libaio-devel mariadb libedit-devel librabbitmq-devel MariaDB-shared
%else
BuildRequires: gcc gcc-c++ ncurses-devel bison glibc-devel cmake libgcc perl make libtool openssl-devel libaio libaio-devel librabbitmq-devel MariaDB-shared
%if 0%{?rhel} == 6
BuildRequires: libedit-devel
%endif
%if 0%{?rhel} == 7
BuildRequires: mariadb-devel mariadb-embedded-devel libedit-devel
%else
BuildRequires: MariaDB-devel MariaDB-server
%endif
%endif
%description
rabbitmq-message-consumer
%prep
%setup -q
%build
make clean
make
%install
mkdir -p $RPM_BUILD_ROOT%{install_path}
cp consumer $RPM_BUILD_ROOT%{install_path}
cp consumer.cnf $RPM_BUILD_ROOT%{install_path}
%clean
%files
%defattr(-,root,root)
%{install_path}/consumer
%{install_path}/consumer.cnf
%changelog

View File

@ -24,6 +24,9 @@
# 08/07/13 Mark Riddoch Addition of monitor modules
# 16/07/13 Mark Riddoch Renamed things to match the new naming
include ../build_gateway.inc
include ../makefile.inc
DEST=$(HOME)/usr/local/skysql
all:
@ -45,7 +48,7 @@ testall:
$(MAKE) -C test HAVE_SRV=$(HAVE_SRV) testall
clean:
(cd Documentation; rm -rf html)
(cd Documentation; $(DEL) html)
(cd core; touch depend.mk ; make clean)
(cd modules/routing; touch depend.mk ; make clean)
(cd modules/protocol; touch depend.mk ; make clean)

View File

@ -13,7 +13,7 @@ threads=1
# Define a monitor that can be used to determine the state and role of
# the servers.
#
# Valid options are:
# Valid options for all monitors are:
#
# module=<name of module to load>
# servers=<server name>,<server name>,...
@ -29,6 +29,16 @@ module=mysqlmon
servers=server1,server2,server3
user=maxuser
passwd=maxpwd
#
# options for mysql_monitor only
#
# detect_replication_lag=<enable detection of replication slaves lag
# via replication_heartbeat table,
# default value is 0>
# detect_stale_master=<if the replication is stopped or misconfigured
# the previous detected master will be still available
# until monitor or MaxSclale restart,
# default value is 0>
# A series of service definition
#
@ -41,7 +51,8 @@ passwd=maxpwd
# enable_root_user=<0 or 1, default is 0>
# version_string=<specific string for server handshake,
# default is the MariaDB embedded library version>
#
#
# use_sql_variables_in=[master|all] (default all)
# router_options=<option[=value]>,<option[=value]>,...
# where value=[master|slave|synced]
#
@ -60,6 +71,7 @@ router=readwritesplit
servers=server1,server2,server3
user=maxuser
passwd=maxpwd
use_sql_variables_in=all
max_slave_connections=50%
max_slave_replication_lag=30
router_options=slave_selection_criteria=LEAST_BEHIND_MASTER

View File

@ -1,3 +1,4 @@
*.o
# binaries generated here
maxscale
depend.mk
maxkeys
maxpasswd

View File

@ -33,7 +33,8 @@
# 29/06/13 Vilho Raatikka Reverted Query classifier changes because
# gateway needs mysql client lib, not qc.
# 24/07/13 Mark Ridoch Addition of encryption routines
# 30/05/14 Mark Ridoch Filter API added
# 30/05/14 Mark Riddoch Filter API added
# 25/07/14 Mark Riddoch Addition of hints
# 29/08/14 Mark Riddoch Added housekeeper
include ../../build_gateway.inc
@ -64,7 +65,7 @@ include ../../makefile.inc
SRCS= atomic.c buffer.c spinlock.c gateway.c \
gw_utils.c utils.c dcb.c load_utils.c session.c service.c server.c \
poll.c config.c users.c hashtable.c dbusers.c thread.c gwbitmask.c \
monitor.c adminusers.c secrets.c filter.c modutil.c housekeeper.c
monitor.c adminusers.c secrets.c filter.c modutil.c hint.c housekeeper.c
HDRS= ../include/atomic.h ../include/buffer.h ../include/dcb.h \
../include/gw.h ../modules/include/mysql_client_server_protocol.h \
@ -72,7 +73,7 @@ HDRS= ../include/atomic.h ../include/buffer.h ../include/dcb.h \
../include/modules.h ../include/poll.h ../include/config.h \
../include/users.h ../include/hashtable.h ../include/gwbitmask.h \
../include/adminusers.h ../include/version.h ../include/maxscale.h \
../include/filter.h ../include/modutil.h ../include/housekeeper.h
../include/filter.h ../include/modutil.h ../hint.h ../include/housekeeper.h
OBJ=$(SRCS:.c=.o)
@ -110,14 +111,14 @@ maxpasswd: $(POBJS)
echo '#define MAXSCALE_VERSION "'`cat ../../VERSION`'"' > ../include/version.h
clean:
rm -f $(OBJ) maxscale
- rm *.so
$(DEL) $(OBJ) maxscale
$(DEL) *.so
tags:
ctags $(SRCS) $(HDRS)
depend: ../include/version.h
@rm -f depend.mk
@$(DEL) depend.mk
cc -M $(CFLAGS) $(SRCS) > depend.mk
install: maxscale maxkeys maxpasswd

View File

@ -298,6 +298,7 @@ char* admin_remove_user(
fname,
err)));
fclose(fp);
fclose(fp_tmp);
unlink(fname_tmp);
return ADMIN_ERR_PWDFILEACCESS;
}
@ -325,6 +326,7 @@ char* admin_remove_user(
fname,
err)));
fclose(fp);
fclose(fp_tmp);
unlink(fname_tmp);
return ADMIN_ERR_PWDFILEACCESS;
}

Some files were not shown because too many files have changed in this diff Show More