This commit is contained in:
VilhoRaatikka 2014-09-04 15:49:10 +03:00
commit bac40654d9
66 changed files with 3366 additions and 278 deletions

View File

@ -890,26 +890,61 @@ char* skygw_query_classifier_get_stmtname(
}
/**
*Returns the LEX struct of the parsed GWBUF
*@param The parsed GWBUF
*@return Pointer to the LEX struct or NULL if an error occurred or the query was not parsed
*/
LEX* get_lex(GWBUF* querybuf)
{
parsing_info_t* pi;
MYSQL* mysql;
THD* thd;
if (!GWBUF_IS_PARSED(querybuf))
{
return NULL;
}
pi = (parsing_info_t *)gwbuf_get_buffer_object_data(querybuf,
GWBUF_PARSING_INFO);
if (pi == NULL)
{
return NULL;
}
if ((mysql = (MYSQL *)pi->pi_handle) == NULL ||
(thd = (THD *)mysql->thd) == NULL)
{
ss_dassert(mysql != NULL &&
thd != NULL);
return NULL;
}
return thd->lex;
}
/**
* Finds the head of the list of tables affected by the current select statement.
* @param thd Pointer to a valid THD
* @return Pointer to the head of the TABLE_LIST chain or NULL in case of an error
*/
void* skygw_get_affected_tables(void* thdp)
void* skygw_get_affected_tables(void* lexptr)
{
THD* thd = (THD*)thdp;
LEX* lex = (LEX*)lexptr;
if(thd == NULL ||
thd->lex == NULL ||
thd->lex->current_select == NULL)
if(lex == NULL ||
lex->current_select == NULL)
{
ss_dassert(thd != NULL &&
thd->lex != NULL &&
thd->lex->current_select != NULL);
ss_dassert(lex != NULL &&
lex->current_select != NULL);
return NULL;
}
return (void*)thd->lex->current_select->table_list.first;
return (void*)lex->current_select->table_list.first;
}
@ -922,45 +957,25 @@ void* skygw_get_affected_tables(void* thdp)
* @param tblsize Pointer where the number of tables is written
* @return Array of null-terminated strings with the table names
*/
char** skygw_get_table_names(GWBUF* querybuf,int* tblsize)
char** skygw_get_table_names(GWBUF* querybuf,int* tblsize, bool fullnames)
{
parsing_info_t* pi;
MYSQL* mysql;
THD* thd;
TABLE_LIST* tbl;
int i = 0, currtblsz = 0;
char**tables,**tmp;
if (!GWBUF_IS_PARSED(querybuf))
LEX* lex;
TABLE_LIST* tbl;
int i = 0,
currtblsz = 0;
char **tables,
**tmp;
if((lex = get_lex(querybuf)) == NULL)
{
tables = NULL;
goto retblock;
}
pi = (parsing_info_t *)gwbuf_get_buffer_object_data(querybuf,
GWBUF_PARSING_INFO);
}
if (pi == NULL)
{
tables = NULL;
goto retblock;
}
if (pi->pi_query_plain_str == NULL ||
(mysql = (MYSQL *)pi->pi_handle) == NULL ||
(thd = (THD *)mysql->thd) == NULL)
{
ss_dassert(pi->pi_query_plain_str != NULL &&
mysql != NULL &&
thd != NULL);
tables = NULL;
goto retblock;
}
lex->current_select = lex->all_selects_list;
thd->lex->current_select = thd->lex->all_selects_list;
while(thd->lex->current_select){
while(lex->current_select){
tbl = (TABLE_LIST*)skygw_get_affected_tables(thd);
tbl = (TABLE_LIST*)skygw_get_affected_tables(lex);
while (tbl)
{
@ -980,59 +995,98 @@ char** skygw_get_table_names(GWBUF* querybuf,int* tblsize)
tables = tmp;
currtblsz = currtblsz*2 + 1;
}
}
}
tables[i++] = strdup(tbl->alias);
char *catnm = NULL;
if(fullnames)
{
if(tbl->db && strcmp(tbl->db,"skygw_virtual") != 0)
{
catnm = (char*)calloc(strlen(tbl->db) + strlen(tbl->table_name) + 2,sizeof(char));
strcpy(catnm,tbl->db);
strcat(catnm,".");
strcat(catnm,tbl->table_name);
}
}
if(catnm)
{
tables[i++] = catnm;
}
else
{
tables[i++] = strdup(tbl->table_name);
}
tbl=tbl->next_local;
}
thd->lex->current_select = thd->lex->current_select->next_select_in_list();
lex->current_select = lex->current_select->next_select_in_list();
}
retblock:
*tblsize = i;
return tables;
}
/**
* Extract the name of the created table.
* Extract, allocate memory and copy the name of the created table.
* @param querybuf Buffer to use.
* @return A pointer to the name if a table was created, otherwise NULL
*/
char* skygw_get_created_table_name(GWBUF* querybuf)
{
parsing_info_t* pi;
MYSQL* mysql;
THD* thd;
if (!GWBUF_IS_PARSED(querybuf))
LEX* lex;
if((lex = get_lex(querybuf)) == NULL)
{
return NULL;
}
pi = (parsing_info_t *)gwbuf_get_buffer_object_data(querybuf,
GWBUF_PARSING_INFO);
if (pi == NULL)
{
return NULL;
}
if ((mysql = (MYSQL *)pi->pi_handle) == NULL ||
(thd = (THD *)mysql->thd) == NULL)
{
ss_dassert(mysql != NULL &&
thd != NULL);
return NULL;
}
if(thd->lex->create_last_non_select_table &&
thd->lex->create_last_non_select_table->table_name){
char* name = strdup(thd->lex->create_last_non_select_table->table_name);
if(lex->create_last_non_select_table &&
lex->create_last_non_select_table->table_name){
char* name = strdup(lex->create_last_non_select_table->table_name);
return name;
}else{
return NULL;
}
}
/**
* Checks whether the query is a "real" query ie. SELECT,UPDATE,INSERT,DELETE or any variation of these.
* Queries that affect the underlying database are not considered as real queries and the queries that target
* specific row or variable data are regarded as the real queries.
* @param GWBUF to analyze
* @return true if the query is a real query, otherwise false
*/
bool skygw_is_real_query(GWBUF* querybuf)
{
LEX* lex = get_lex(querybuf);
if(lex){
switch(lex->sql_command){
case SQLCOM_SELECT:
return lex->all_selects_list->table_list.elements > 0;
case SQLCOM_UPDATE:
case SQLCOM_INSERT:
case SQLCOM_INSERT_SELECT:
case SQLCOM_DELETE:
case SQLCOM_TRUNCATE:
case SQLCOM_REPLACE:
case SQLCOM_REPLACE_SELECT:
case SQLCOM_PREPARE:
case SQLCOM_EXECUTE:
return true;
default:
return false;
}
}
return false;
}
/**
* Checks whether the buffer contains a DROP TABLE... query.
* @param querybuf Buffer to inspect
@ -1040,31 +1094,10 @@ char* skygw_get_created_table_name(GWBUF* querybuf)
*/
bool is_drop_table_query(GWBUF* querybuf)
{
parsing_info_t* pi;
MYSQL* mysql;
THD* thd;
LEX* lex;
if (!GWBUF_IS_PARSED(querybuf))
{
return false;
}
pi = (parsing_info_t *)gwbuf_get_buffer_object_data(querybuf,
GWBUF_PARSING_INFO);
if (pi == NULL)
{
return false;
}
if ((mysql = (MYSQL *)pi->pi_handle) == NULL ||
(thd = (THD *)mysql->thd) == NULL)
{
ss_dassert(mysql != NULL &&
thd != NULL);
return false;
}
return thd->lex->sql_command == SQLCOM_DROP_TABLE;
return (lex = get_lex(querybuf)) != NULL &&
lex->sql_command == SQLCOM_DROP_TABLE;
}
/*
@ -1264,4 +1297,4 @@ static void parsing_info_set_plain_str(
CHK_PARSING_INFO(pi);
pi->pi_query_plain_str = str;
}
}

View File

@ -76,8 +76,9 @@ skygw_query_type_t query_classifier_get_type(GWBUF* querybuf);
char* skygw_query_classifier_get_stmtname(MYSQL* mysql);
char* skygw_get_created_table_name(GWBUF* querybuf);
bool is_drop_table_query(GWBUF* querybuf);
void* skygw_get_affected_tables(void* thdp);
char** skygw_get_table_names(GWBUF* querybuf,int* tblsize);
bool skygw_is_real_query(GWBUF* querybuf);
void* skygw_get_affected_tables(void* lexptr);
char** skygw_get_table_names(GWBUF* querybuf,int* tblsize,bool fullnames);
char* skygw_get_canonical(GWBUF* querybuf);
bool parse_query (GWBUF* querybuf);
parsing_info_t* parsing_info_init(void (*donefun)(void *));

View File

@ -0,0 +1,44 @@
cmake_minimum_required (VERSION 2.6)
set(CMAKE_LIBRARY_PATH ${CMAKE_LIBRARY_PATH} /usr/lib /usr/lib64 /usr/local/lib /usr/local/lib64 /usr/lib/mariadb /usr/lib64/mariadb)
set(CMAKE_INCLUDE_PATH ${CMAKE_INCLUDE_PATH} /usr/include /usr/local/include /usr/include/mysql /usr/local/include/mysql /usr/include/mariadb /usr/local/include/mariadb)
include(InstallRequiredSystemLibraries)
project (consumer)
find_path(MYSQL_INCLUDE_DIRS mysql.h)
find_library(MYSQL_LIBRARIES NAMES mysqlclient)
find_library(RABBITMQ_C_LIBRARIES NAMES rabbitmq)
include_directories(${MYSQL_INCLUDE_DIRS})
include_directories(${RABBITMQ_C_INCLUDE_DIRS})
include_directories(${CMAKE_SOURCE_DIR}/inih)
add_subdirectory (inih)
link_directories(${CMAKE_SOURCE_DIR}/inih)
if(RABBITMQ_C_LIBRARIES AND MYSQL_LIBRARIES AND MYSQL_INCLUDE_DIRS)
add_executable (consumer consumer.c ${MYSQL_LIBRARIES} ${RABBITMQ_C_LIBRARIES})
target_link_libraries(consumer mysqlclient)
target_link_libraries(consumer rabbitmq)
target_link_libraries(consumer inih)
install(TARGETS consumer DESTINATION bin)
install(FILES consumer.cnf DESTINATION share/consumer)
else(RABBITMQ_C_LIBRARIES AND MYSQL_LIBRARIES AND MYSQL_INCLUDE_DIRS)
message(FATAL_ERROR "Error: Can not find requred libraries: libmysqld, librabbitmq.")
endif(RABBITMQ_C_LIBRARIES AND MYSQL_LIBRARIES AND MYSQL_INCLUDE_DIRS)
set(CPACK_PACKAGE_DESCRIPTION_SUMMARY "RabbitMQ Consumer Client")
set(CPACK_PACKAGE_NAME "RabbitMQ Consumer")
set(CPACK_GENERATOR "RPM")
set(CPACK_PACKAGE_VERSION_MAJOR "1")
set(CPACK_PACKAGE_VERSION_MINOR "0")
set(CPACK_RPM_PACKAGE_NAME "rabbitmq-consumer")
set(CPACK_RPM_PACKAGE_VENDOR "SkySQL Ab")
set(CPACK_RPM_PACKAGE_AUTOREQPROV " no")
include(CPack)

View File

@ -0,0 +1,15 @@
include buildconfig.inc
CC=gcc
CFLAGS=-c -Wall -g -Iinih $(INCLUDE_DIRS)
LDFLAGS= $(LIBRARY_DIRS) -lrabbitmq -lmysqlclient
SRCS= inih/ini.c consumer.c
OBJ=$(SRCS:.c=.o)
all:$(OBJ)
$(CC) $(LDFLAGS) $(OBJ) -o consumer `mysql_config --cflags --libs`
%.o:%.c
$(CC) $(CFLAGS) $< -o $@
clean:
-rm *.o
-rm *~

39
rabbitmq_consumer/README Normal file
View File

@ -0,0 +1,39 @@
This program requires the librabbitmq and libmysqlclient libraries.
librabbitmq-c - https://github.com/alanxz/rabbitmq-c
MariaDB Client Library for C 2.0 Series - https://mariadb.com/kb/en/mariadb/client-libraries/client-library-for-c/
Building with CMake:
'cmake .'
Variables to pass for CMake:
Path to headers -DCMAKE_INCLUDE_PATH=<path to headers>
Path to libraries -DCMAKE_LIBRARY_PATH=<path to libraries>
Install prefix -DCMAKE_INSTALL_PREFIX=<prefix>
Separate multiple folders with colons, for example:
'path1:path2:path3'
After running CMake run 'make' to build the binaries and 'make package' to build RPMs.
To build without CMake, use the provided makefile and update the
include and library directories 'in buildvars.inc'
The configuration for the consumer client are red from 'consumer.cnf'.
Options for the configuration file:
hostname Hostname of the RabbitMQ server
port Port of the RabbitMQ server
vhost Virtual host location of the RabbitMQ server
user Username for the RabbitMQ server
passwd Password for the RabbitMQ server
queue Queue to consume from
dbserver Hostname of the SQL server
dbport Port of the SQL server
dbname Name of the SQL database to use
dbuser Database username
dbpasswd Database passwork
logfile Message log filename

View File

@ -0,0 +1,8 @@
#Use the '-I' prefix for include and '-L' for library directories
#You can use multiple library and include directories
#Path to the rabbitmq-c and mysqlclient libraries
LIBRARY_DIRS :=-L/usr/lib64
#path to headers
INCLUDE_DIRS :=-I/usr/include -I/usr/include/mysql

View File

@ -0,0 +1,524 @@
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <ini.h>
#include <stdint.h>
#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>
#include <mysql.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
typedef struct delivery_t
{
uint64_t dtag;
amqp_message_t* message;
struct delivery_t *next,*prev;
}DELIVERY;
typedef struct consumer_t
{
char *hostname,*vhost,*user,*passwd,*queue,*dbserver,*dbname,*dbuser,*dbpasswd;
DELIVERY* query_stack;
int port,dbport;
}CONSUMER;
static int all_ok;
static FILE* out_fd;
static CONSUMER* c_inst;
static char* DB_DATABASE = "CREATE DATABASE IF NOT EXISTS %s;";
static char* DB_TABLE = "CREATE TABLE IF NOT EXISTS pairs (tag VARCHAR(64) PRIMARY KEY NOT NULL, query VARCHAR(2048), reply VARCHAR(2048), date_in DATETIME NOT NULL, date_out DATETIME DEFAULT NULL, counter INT DEFAULT 1)";
static char* DB_INSERT = "INSERT INTO pairs(tag, query, date_in) VALUES ('%s','%s',FROM_UNIXTIME(%s))";
static char* DB_UPDATE = "UPDATE pairs SET reply='%s', date_out=FROM_UNIXTIME(%s) WHERE tag='%s'";
static char* DB_INCREMENT = "UPDATE pairs SET counter = counter+1, date_out=FROM_UNIXTIME(%s) WHERE query='%s'";
void sighndl(int signum)
{
if(signum == SIGINT){
all_ok = 0;
alarm(1);
}
}
int handler(void* user, const char* section, const char* name,
const char* value)
{
if(strcmp(section,"consumer") == 0){
if(strcmp(name,"hostname") == 0){
c_inst->hostname = strdup(value);
}else if(strcmp(name,"vhost") == 0){
c_inst->vhost = strdup(value);
}else if(strcmp(name,"port") == 0){
c_inst->port = atoi(value);
}else if(strcmp(name,"user") == 0){
c_inst->user = strdup(value);
}else if(strcmp(name,"passwd") == 0){
c_inst->passwd = strdup(value);
}else if(strcmp(name,"queue") == 0){
c_inst->queue = strdup(value);
}else if(strcmp(name,"dbserver") == 0){
c_inst->dbserver = strdup(value);
}else if(strcmp(name,"dbport") == 0){
c_inst->dbport = atoi(value);
}else if(strcmp(name,"dbname") == 0){
c_inst->dbname = strdup(value);
}else if(strcmp(name,"dbuser") == 0){
c_inst->dbuser = strdup(value);
}else if(strcmp(name,"dbpasswd") == 0){
c_inst->dbpasswd = strdup(value);
}else if(strcmp(name,"logfile") == 0){
out_fd = fopen(value,"ab");
}
}
return 1;
}
int isPair(amqp_message_t* a, amqp_message_t* b)
{
int keylen = a->properties.correlation_id.len >=
b->properties.correlation_id.len ?
a->properties.correlation_id.len :
b->properties.correlation_id.len;
return strncmp(a->properties.correlation_id.bytes,
b->properties.correlation_id.bytes,
keylen) == 0 ? 1 : 0;
}
int connectToServer(MYSQL* server)
{
mysql_init(server);
mysql_options(server,MYSQL_READ_DEFAULT_GROUP,"client");
mysql_options(server,MYSQL_OPT_USE_REMOTE_CONNECTION,0);
my_bool tr = 1;
mysql_options(server,MYSQL_OPT_RECONNECT,&tr);
MYSQL* result = mysql_real_connect(server,
c_inst->dbserver,
c_inst->dbuser,
c_inst->dbpasswd,
NULL,
c_inst->dbport,
NULL,
0);
if(result==NULL){
fprintf(out_fd,"Error: Could not connect to MySQL server: %s\n",mysql_error(server));
return 0;
}
int bsz = 1024;
char *qstr = calloc(bsz,sizeof(char));
if(!qstr){
fprintf(stderr, "Fatal Error: Cannot allocate enough memory.\n");
return 0;
}
/**Connection ok, check that the database and table exist*/
memset(qstr,0,bsz);
sprintf(qstr,DB_DATABASE,c_inst->dbname);
if(mysql_query(server,qstr)){
fprintf(stderr,"Error: Could not send query MySQL server: %s\n",mysql_error(server));
}
memset(qstr,0,bsz);
sprintf(qstr,"USE %s;",c_inst->dbname);
if(mysql_query(server,qstr)){
fprintf(stderr,"Error: Could not send query MySQL server: %s\n",mysql_error(server));
}
memset(qstr,0,bsz);
sprintf(qstr,DB_TABLE);
if(mysql_query(server,qstr)){
fprintf(stderr,"Error: Could not send query MySQL server: %s\n",mysql_error(server));
}
free(qstr);
return 1;
}
int sendMessage(MYSQL* server, amqp_message_t* msg)
{
int buffsz = (int)((msg->body.len + 1)*2+1) +
(int)((msg->properties.correlation_id.len + 1)*2+1) +
strlen(DB_INSERT),
rval = 0;
char *qstr = calloc(buffsz,sizeof(char)),
*rawmsg = calloc((msg->body.len + 1),sizeof(char)),
*clnmsg = calloc(((msg->body.len + 1)*2+1),sizeof(char)),
*rawdate = calloc((msg->body.len + 1),sizeof(char)),
*clndate = calloc(((msg->body.len + 1)*2+1),sizeof(char)),
*rawtag = calloc((msg->properties.correlation_id.len + 1),sizeof(char)),
*clntag = calloc(((msg->properties.correlation_id.len + 1)*2+1),sizeof(char));
sprintf(qstr,"%.*s",(int)msg->body.len,(char *)msg->body.bytes);
fprintf(out_fd,"Received: %s\n",qstr);
char *ptr = strtok(qstr,"|");
sprintf(rawdate,"%s",ptr);
ptr = strtok(NULL,"\n\0");
if(ptr == NULL){
fprintf(out_fd,"Message content not valid.\n");
rval = 1;
goto cleanup;
}
sprintf(rawmsg,"%s",ptr);
sprintf(rawtag,"%.*s",(int)msg->properties.correlation_id.len,(char *)msg->properties.correlation_id.bytes);
memset(qstr,0,buffsz);
mysql_real_escape_string(server,clnmsg,rawmsg,strnlen(rawmsg,msg->body.len + 1));
mysql_real_escape_string(server,clndate,rawdate,strnlen(rawdate,msg->body.len + 1));
mysql_real_escape_string(server,clntag,rawtag,strnlen(rawtag,msg->properties.correlation_id.len + 1));
if(strncmp(msg->properties.message_id.bytes,
"query",msg->properties.message_id.len) == 0)
{
sprintf(qstr,DB_INCREMENT,clndate,clnmsg);
rval = mysql_query(server,qstr);
if(mysql_affected_rows(server) == 0){
memset(qstr,0,buffsz);
sprintf(qstr,DB_INSERT,clntag,clnmsg,clndate);
rval = mysql_query(server,qstr);
}
}else if(strncmp(msg->properties.message_id.bytes,
"reply",msg->properties.message_id.len) == 0){
sprintf(qstr,DB_UPDATE,clnmsg,clndate,clntag);
rval = mysql_query(server,qstr);
}else{
rval = 1;
goto cleanup;
}
if(rval){
fprintf(stderr,"Could not send query to SQL server:%s\n",mysql_error(server));
goto cleanup;
}
cleanup:
free(qstr);
free(rawmsg);
free(clnmsg);
free(rawdate);
free(clndate);
free(rawtag);
free(clntag);
return rval;
}
int sendToServer(MYSQL* server, amqp_message_t* a, amqp_message_t* b){
amqp_message_t *msg, *reply;
int buffsz = 2048;
char *qstr = calloc(buffsz,sizeof(char));
if(!qstr){
fprintf(out_fd, "Fatal Error: Cannot allocate enough memory.\n");
free(qstr);
return 0;
}
if( a->properties.message_id.len == strlen("query") &&
strncmp(a->properties.message_id.bytes,"query",
a->properties.message_id.len) == 0){
msg = a;
reply = b;
}else{
msg = b;
reply = a;
}
printf("pair: %.*s\nquery: %.*s\nreply: %.*s\n",
(int)msg->properties.correlation_id.len,
(char *)msg->properties.correlation_id.bytes,
(int)msg->body.len,
(char *)msg->body.bytes,
(int)reply->body.len,
(char *)reply->body.bytes);
if((int)msg->body.len +
(int)reply->body.len +
(int)msg->properties.correlation_id.len + 50 >= buffsz)
{
char *qtmp = calloc(buffsz*2,sizeof(char));
free(qstr);
if(qtmp){
qstr = qtmp;
buffsz *= 2;
}else{
fprintf(stderr, "Fatal Error: Cannot allocate enough memory.\n");
return 0;
}
}
char *rawmsg = calloc((msg->body.len + 1),sizeof(char)),
*clnmsg = calloc(((msg->body.len + 1)*2+1),sizeof(char)),
*rawrpl = calloc((reply->body.len + 1),sizeof(char)),
*clnrpl = calloc(((reply->body.len + 1)*2+1),sizeof(char)),
*rawtag = calloc((msg->properties.correlation_id.len + 1),sizeof(char)),
*clntag = calloc(((msg->properties.correlation_id.len + 1)*2+1),sizeof(char));
sprintf(rawmsg,"%.*s",(int)msg->body.len,(char *)msg->body.bytes);
sprintf(rawrpl,"%.*s",(int)reply->body.len,(char *)reply->body.bytes);
sprintf(rawtag,"%.*s",(int)msg->properties.correlation_id.len,(char *)msg->properties.correlation_id.bytes);
char *ptr;
while((ptr = strchr(rawmsg,'\n'))){
*ptr = ' ';
}
while((ptr = strchr(rawrpl,'\n'))){
*ptr = ' ';
}
while((ptr = strchr(rawtag,'\n'))){
*ptr = ' ';
}
mysql_real_escape_string(server,clnmsg,rawmsg,strnlen(rawmsg,msg->body.len + 1));
mysql_real_escape_string(server,clnrpl,rawrpl,strnlen(rawrpl,reply->body.len + 1));
mysql_real_escape_string(server,clntag,rawtag,strnlen(rawtag,msg->properties.correlation_id.len + 1));
sprintf(qstr,"INSERT INTO pairs VALUES ('%s','%s','%s');",clnmsg,clnrpl,clntag);
free(rawmsg);
free(clnmsg);
free(rawrpl);
free(clnrpl);
free(rawtag);
free(clntag);
if(mysql_query(server,qstr)){
fprintf(stderr,"Could not send query to SQL server:%s\n",mysql_error(server));
free(qstr);
return 0;
}
free(qstr);
return 1;
}
int main(int argc, char** argv)
{
int channel = 1, status = AMQP_STATUS_OK, cnfnlen;
amqp_socket_t *socket = NULL;
amqp_connection_state_t conn;
amqp_rpc_reply_t ret;
amqp_message_t *reply = NULL;
amqp_frame_t frame;
struct timeval timeout;
MYSQL db_inst;
char ch, *cnfname = NULL, *cnfpath = NULL;
static const char* fname = "consumer.cnf";
if((c_inst = calloc(1,sizeof(CONSUMER))) == NULL){
fprintf(stderr, "Fatal Error: Cannot allocate enough memory.\n");
return 1;
}
if(signal(SIGINT,sighndl) == SIG_IGN){
signal(SIGINT,SIG_IGN);
}
while((ch = getopt(argc,argv,"c:"))!= -1){
switch(ch){
case 'c':
cnfnlen = strlen(optarg);
cnfpath = strdup(optarg);
break;
default:
break;
}
}
cnfname = calloc(cnfnlen + strlen(fname) + 1,sizeof(char));
if(cnfpath){
/**Config file path as argument*/
strcpy(cnfname,cnfpath);
if(cnfpath[cnfnlen-1] != '/'){
strcat(cnfname,"/");
}
}
strcat(cnfname,fname);
timeout.tv_sec = 1;
timeout.tv_usec = 0;
all_ok = 1;
out_fd = NULL;
/**Parse the INI file*/
if(ini_parse(cnfname,handler,NULL) < 0){
/**Try to parse a config in the same directory*/
if(ini_parse(fname,handler,NULL) < 0){
fprintf(stderr, "Fatal Error: Error parsing configuration file!\n");
goto fatal_error;
}
}
if(out_fd == NULL){
out_fd = stdout;
}
fprintf(out_fd,"\n--------------------------------------------------------------\n");
/**Confirm that all parameters were in the configuration file*/
if(!c_inst->hostname||!c_inst->vhost||!c_inst->user||
!c_inst->passwd||!c_inst->dbpasswd||!c_inst->queue||
!c_inst->dbserver||!c_inst->dbname||!c_inst->dbuser){
fprintf(stderr, "Fatal Error: Inadequate configuration file!\n");
goto fatal_error;
}
connectToServer(&db_inst);
if((conn = amqp_new_connection()) == NULL ||
(socket = amqp_tcp_socket_new(conn)) == NULL){
fprintf(stderr, "Fatal Error: Cannot create connection object or socket.\n");
goto fatal_error;
}
if(amqp_socket_open(socket, c_inst->hostname, c_inst->port)){
fprintf(stderr, "RabbitMQ Error: Cannot open socket.\n");
goto error;
}
ret = amqp_login(conn, c_inst->vhost, 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, c_inst->user, c_inst->passwd);
if(ret.reply_type != AMQP_RESPONSE_NORMAL){
fprintf(stderr, "RabbitMQ Error: Cannot login to server.\n");
goto error;
}
amqp_channel_open(conn, channel);
ret = amqp_get_rpc_reply(conn);
if(ret.reply_type != AMQP_RESPONSE_NORMAL){
fprintf(stderr, "RabbitMQ Error: Cannot open channel.\n");
goto error;
}
reply = malloc(sizeof(amqp_message_t));
if(!reply){
fprintf(stderr, "Error: Cannot allocate enough memory.\n");
goto error;
}
amqp_basic_consume(conn,channel,amqp_cstring_bytes(c_inst->queue),amqp_empty_bytes,0,0,0,amqp_empty_table);
while(all_ok){
status = amqp_simple_wait_frame_noblock(conn,&frame,&timeout);
/**No frames to read from server, possibly out of messages*/
if(status == AMQP_STATUS_TIMEOUT){
sleep(timeout.tv_sec);
continue;
}
if(frame.payload.method.id == AMQP_BASIC_DELIVER_METHOD){
amqp_basic_deliver_t* decoded = (amqp_basic_deliver_t*)frame.payload.method.decoded;
amqp_read_message(conn,channel,reply,0);
if(sendMessage(&db_inst,reply)){
fprintf(stderr,"RabbitMQ Error: Received malformed message.\n");
amqp_basic_reject(conn,channel,decoded->delivery_tag,0);
amqp_destroy_message(reply);
}else{
amqp_basic_ack(conn,channel,decoded->delivery_tag,0);
amqp_destroy_message(reply);
}
}else{
fprintf(stderr,"RabbitMQ Error: Received method from server: %s\n",amqp_method_name(frame.payload.method.id));
all_ok = 0;
goto error;
}
}
fprintf(out_fd,"Shutting down...\n");
error:
mysql_close(&db_inst);
mysql_library_end();
if(c_inst && c_inst->query_stack){
while(c_inst->query_stack){
DELIVERY* d = c_inst->query_stack->next;
amqp_destroy_message(c_inst->query_stack->message);
free(c_inst->query_stack);
c_inst->query_stack = d;
}
}
amqp_channel_close(conn, channel, AMQP_REPLY_SUCCESS);
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(conn);
fatal_error:
if(out_fd){
fclose(out_fd);
}
if(c_inst){
free(c_inst->hostname);
free(c_inst->vhost);
free(c_inst->user);
free(c_inst->passwd);
free(c_inst->queue);
free(c_inst->dbserver);
free(c_inst->dbname);
free(c_inst->dbuser);
free(c_inst->dbpasswd);
free(c_inst);
}
return all_ok;
}

View File

@ -0,0 +1,28 @@
#
#The options for the consumer are:
#hostname RabbitMQ hostname
#port RabbitMQ port
#vhost RabbitMQ virtual host
#user RabbitMQ username
#passwd RabbitMQ password
#queue Name of the queue to use
#dbserver SQL server name
#dbport SQL server port
#dbname Name of the databse to use
#dbuser SQL server username
#dbpasswd SQL server password
#logfile Message log filename
#
[consumer]
hostname=127.0.0.1
port=5673
vhost=/
user=guest
passwd=guest
queue=q1
dbserver=127.0.0.1
dbport=3000
dbname=mqpairs
dbuser=maxuser
dbpasswd=maxpwd
#logfile=consumer.log

Binary file not shown.

Binary file not shown.

BIN
rabbitmq_consumer/inih/._cpp Executable file

Binary file not shown.

BIN
rabbitmq_consumer/inih/._examples Executable file

Binary file not shown.

BIN
rabbitmq_consumer/inih/._extra Executable file

Binary file not shown.

BIN
rabbitmq_consumer/inih/._ini.c Executable file

Binary file not shown.

BIN
rabbitmq_consumer/inih/._ini.h Executable file

Binary file not shown.

BIN
rabbitmq_consumer/inih/._tests Executable file

Binary file not shown.

3
rabbitmq_consumer/inih/.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
*.o
*.a
make.depend

View File

@ -0,0 +1 @@
add_library(inih ini.c)

View File

@ -0,0 +1,27 @@
The "inih" library is distributed under the New BSD license:
Copyright (c) 2009, Brush Technology
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
* Neither the name of Brush Technology nor the names of its contributors
may be used to endorse or promote products derived from this software
without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY BRUSH TECHNOLOGY ''AS IS'' AND ANY
EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL BRUSH TECHNOLOGY BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -0,0 +1,5 @@
inih is a simple .INI file parser written in C, released under the New BSD
license (see LICENSE.txt). Go to the project home page for more info:
http://code.google.com/p/inih/

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,67 @@
// Read an INI file into easy-to-access name/value pairs.
#include <algorithm>
#include <cctype>
#include <cstdlib>
#include "../ini.h"
#include "INIReader.h"
using std::string;
INIReader::INIReader(string filename)
{
_error = ini_parse(filename.c_str(), ValueHandler, this);
}
int INIReader::ParseError()
{
return _error;
}
string INIReader::Get(string section, string name, string default_value)
{
string key = MakeKey(section, name);
return _values.count(key) ? _values[key] : default_value;
}
long INIReader::GetInteger(string section, string name, long default_value)
{
string valstr = Get(section, name, "");
const char* value = valstr.c_str();
char* end;
// This parses "1234" (decimal) and also "0x4D2" (hex)
long n = strtol(value, &end, 0);
return end > value ? n : default_value;
}
bool INIReader::GetBoolean(string section, string name, bool default_value)
{
string valstr = Get(section, name, "");
// Convert to lower case to make string comparisons case-insensitive
std::transform(valstr.begin(), valstr.end(), valstr.begin(), ::tolower);
if (valstr == "true" || valstr == "yes" || valstr == "on" || valstr == "1")
return true;
else if (valstr == "false" || valstr == "no" || valstr == "off" || valstr == "0")
return false;
else
return default_value;
}
string INIReader::MakeKey(string section, string name)
{
string key = section + "." + name;
// Convert to lower case to make section/name lookups case-insensitive
std::transform(key.begin(), key.end(), key.begin(), ::tolower);
return key;
}
int INIReader::ValueHandler(void* user, const char* section, const char* name,
const char* value)
{
INIReader* reader = (INIReader*)user;
string key = MakeKey(section, name);
if (reader->_values[key].size() > 0)
reader->_values[key] += "\n";
reader->_values[key] += value;
return 1;
}

View File

@ -0,0 +1,48 @@
// Read an INI file into easy-to-access name/value pairs.
// inih and INIReader are released under the New BSD license (see LICENSE.txt).
// Go to the project home page for more info:
//
// http://code.google.com/p/inih/
#ifndef __INIREADER_H__
#define __INIREADER_H__
#include <map>
#include <string>
// Read an INI file into easy-to-access name/value pairs. (Note that I've gone
// for simplicity here rather than speed, but it should be pretty decent.)
class INIReader
{
public:
// Construct INIReader and parse given filename. See ini.h for more info
// about the parsing.
INIReader(std::string filename);
// Return the result of ini_parse(), i.e., 0 on success, line number of
// first error on parse error, or -1 on file open error.
int ParseError();
// Get a string value from INI file, returning default_value if not found.
std::string Get(std::string section, std::string name,
std::string default_value);
// Get an integer (long) value from INI file, returning default_value if
// not found or not a valid integer (decimal "1234", "-1234", or hex "0x4d2").
long GetInteger(std::string section, std::string name, long default_value);
// Get a boolean value from INI file, returning default_value if not found or if
// not a valid true/false value. Valid true values are "true", "yes", "on", "1",
// and valid false values are "false", "no", "off", "0" (not case sensitive).
bool GetBoolean(std::string section, std::string name, bool default_value);
private:
int _error;
std::map<std::string, std::string> _values;
static std::string MakeKey(std::string section, std::string name);
static int ValueHandler(void* user, const char* section, const char* name,
const char* value);
};
#endif // __INIREADER_H__

View File

@ -0,0 +1,20 @@
// Example that shows simple usage of the INIReader class
#include <iostream>
#include "INIReader.h"
int main()
{
INIReader reader("../examples/test.ini");
if (reader.ParseError() < 0) {
std::cout << "Can't load 'test.ini'\n";
return 1;
}
std::cout << "Config loaded from 'test.ini': version="
<< reader.GetInteger("protocol", "version", -1) << ", name="
<< reader.Get("user", "name", "UNKNOWN") << ", email="
<< reader.Get("user", "email", "UNKNOWN") << ", active="
<< reader.GetBoolean("user", "active", true) << "\n";
return 0;
}

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,8 @@
// CFG(section, name, default)
CFG(protocol, version, "0")
CFG(user, name, "Fatty Lumpkin")
CFG(user, email, "fatty@lumpkin.com")
#undef CFG

View File

@ -0,0 +1,40 @@
/* ini.h example that simply dumps an INI file without comments */
#include <stdio.h>
#include <string.h>
#include "../ini.h"
static int dumper(void* user, const char* section, const char* name,
const char* value)
{
static char prev_section[50] = "";
if (strcmp(section, prev_section)) {
printf("%s[%s]\n", (prev_section[0] ? "\n" : ""), section);
strncpy(prev_section, section, sizeof(prev_section));
prev_section[sizeof(prev_section) - 1] = '\0';
}
printf("%s = %s\n", name, value);
return 1;
}
int main(int argc, char* argv[])
{
int error;
if (argc <= 1) {
printf("Usage: ini_dump filename.ini\n");
return 1;
}
error = ini_parse(argv[1], dumper, NULL);
if (error < 0) {
printf("Can't read '%s'!\n", argv[1]);
return 2;
}
else if (error) {
printf("Bad config file (first error on line %d)!\n", error);
return 3;
}
return 0;
}

View File

@ -0,0 +1,44 @@
/* Example: parse a simple configuration file */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "../ini.h"
typedef struct
{
int version;
const char* name;
const char* email;
} configuration;
static int handler(void* user, const char* section, const char* name,
const char* value)
{
configuration* pconfig = (configuration*)user;
#define MATCH(s, n) strcmp(section, s) == 0 && strcmp(name, n) == 0
if (MATCH("protocol", "version")) {
pconfig->version = atoi(value);
} else if (MATCH("user", "name")) {
pconfig->name = strdup(value);
} else if (MATCH("user", "email")) {
pconfig->email = strdup(value);
} else {
return 0; /* unknown section/name, error */
}
return 1;
}
int main(int argc, char* argv[])
{
configuration config;
if (ini_parse("test.ini", handler, &config) < 0) {
printf("Can't load 'test.ini'\n");
return 1;
}
printf("Config loaded from 'test.ini': version=%d, name=%s, email=%s\n",
config.version, config.name, config.email);
return 0;
}

View File

@ -0,0 +1,46 @@
/* Parse a configuration file into a struct using X-Macros */
#include <stdio.h>
#include <string.h>
#include "../ini.h"
/* define the config struct type */
typedef struct {
#define CFG(s, n, default) char *s##_##n;
#include "config.def"
} config;
/* create one and fill in its default values */
config Config = {
#define CFG(s, n, default) default,
#include "config.def"
};
/* process a line of the INI file, storing valid values into config struct */
int handler(void *user, const char *section, const char *name,
const char *value)
{
config *cfg = (config *)user;
if (0) ;
#define CFG(s, n, default) else if (strcmp(section, #s)==0 && \
strcmp(name, #n)==0) cfg->s##_##n = strdup(value);
#include "config.def"
return 1;
}
/* print all the variables in the config, one per line */
void dump_config(config *cfg)
{
#define CFG(s, n, default) printf("%s_%s = %s\n", #s, #n, cfg->s##_##n);
#include "config.def"
}
int main(int argc, char* argv[])
{
if (ini_parse("test.ini", handler, &Config) < 0)
printf("Can't load 'test.ini', using defaults\n");
dump_config(&Config);
return 0;
}

View File

@ -0,0 +1,9 @@
; Test config file for ini_example.c and INIReaderTest.cpp
[protocol] ; Protocol configuration
version=6 ; IPv6
[user]
name = Bob Smith ; Spaces around '=' are stripped
email = bob@smith.com ; And comments (like this) ignored
active = true ; Test a boolean

Binary file not shown.

View File

@ -0,0 +1,19 @@
# Simple makefile to build inih as a static library using g++
SRC = ../ini.c
OBJ = $(SRC:.c=.o)
OUT = libinih.a
INCLUDES = -I..
CCFLAGS = -g -O2
CC = g++
default: $(OUT)
.c.o:
$(CC) $(INCLUDES) $(CCFLAGS) $(EXTRACCFLAGS) -c $< -o $@
$(OUT): $(OBJ)
ar rcs $(OUT) $(OBJ) $(EXTRAARFLAGS)
clean:
rm -f $(OBJ) $(OUT)

176
rabbitmq_consumer/inih/ini.c Executable file
View File

@ -0,0 +1,176 @@
/* inih -- simple .INI file parser
inih is released under the New BSD license (see LICENSE.txt). Go to the project
home page for more info:
http://code.google.com/p/inih/
*/
#include <stdio.h>
#include <ctype.h>
#include <string.h>
#include "ini.h"
#if !INI_USE_STACK
#include <stdlib.h>
#endif
#define MAX_SECTION 50
#define MAX_NAME 50
/* Strip whitespace chars off end of given string, in place. Return s. */
static char* rstrip(char* s)
{
char* p = s + strlen(s);
while (p > s && isspace((unsigned char)(*--p)))
*p = '\0';
return s;
}
/* Return pointer to first non-whitespace char in given string. */
static char* lskip(const char* s)
{
while (*s && isspace((unsigned char)(*s)))
s++;
return (char*)s;
}
/* Return pointer to first char c or ';' comment in given string, or pointer to
null at end of string if neither found. ';' must be prefixed by a whitespace
character to register as a comment. */
static char* find_char_or_comment(const char* s, char c)
{
int was_whitespace = 0;
while (*s && *s != c && !(was_whitespace && *s == ';')) {
was_whitespace = isspace((unsigned char)(*s));
s++;
}
return (char*)s;
}
/* Version of strncpy that ensures dest (size bytes) is null-terminated. */
static char* strncpy0(char* dest, const char* src, size_t size)
{
strncpy(dest, src, size);
dest[size - 1] = '\0';
return dest;
}
/* See documentation in header file. */
int ini_parse_file(FILE* file,
int (*handler)(void*, const char*, const char*,
const char*),
void* user)
{
/* Uses a fair bit of stack (use heap instead if you need to) */
#if INI_USE_STACK
char line[INI_MAX_LINE];
#else
char* line;
#endif
char section[MAX_SECTION] = "";
char prev_name[MAX_NAME] = "";
char* start;
char* end;
char* name;
char* value;
int lineno = 0;
int error = 0;
#if !INI_USE_STACK
line = (char*)malloc(INI_MAX_LINE);
if (!line) {
return -2;
}
#endif
/* Scan through file line by line */
while (fgets(line, INI_MAX_LINE, file) != NULL) {
lineno++;
start = line;
#if INI_ALLOW_BOM
if (lineno == 1 && (unsigned char)start[0] == 0xEF &&
(unsigned char)start[1] == 0xBB &&
(unsigned char)start[2] == 0xBF) {
start += 3;
}
#endif
start = lskip(rstrip(start));
if (*start == ';' || *start == '#') {
/* Per Python ConfigParser, allow '#' comments at start of line */
}
#if INI_ALLOW_MULTILINE
else if (*prev_name && *start && start > line) {
/* Non-black line with leading whitespace, treat as continuation
of previous name's value (as per Python ConfigParser). */
if (!handler(user, section, prev_name, start) && !error)
error = lineno;
}
#endif
else if (*start == '[') {
/* A "[section]" line */
end = find_char_or_comment(start + 1, ']');
if (*end == ']') {
*end = '\0';
strncpy0(section, start + 1, sizeof(section));
*prev_name = '\0';
}
else if (!error) {
/* No ']' found on section line */
error = lineno;
}
}
else if (*start && *start != ';') {
/* Not a comment, must be a name[=:]value pair */
end = find_char_or_comment(start, '=');
if (*end != '=') {
end = find_char_or_comment(start, ':');
}
if (*end == '=' || *end == ':') {
*end = '\0';
name = rstrip(start);
value = lskip(end + 1);
end = find_char_or_comment(value, '\0');
if (*end == ';')
*end = '\0';
rstrip(value);
/* Valid name[=:]value pair found, call handler */
strncpy0(prev_name, name, sizeof(prev_name));
if (!handler(user, section, name, value) && !error)
error = lineno;
}
else if (!error) {
/* No '=' or ':' found on name[=:]value line */
error = lineno;
}
}
}
#if !INI_USE_STACK
free(line);
#endif
return error;
}
/* See documentation in header file. */
int ini_parse(const char* filename,
int (*handler)(void*, const char*, const char*, const char*),
void* user)
{
FILE* file;
int error;
file = fopen(filename, "r");
if (!file)
return -1;
error = ini_parse_file(file, handler, user);
fclose(file);
return error;
}

72
rabbitmq_consumer/inih/ini.h Executable file
View File

@ -0,0 +1,72 @@
/* inih -- simple .INI file parser
inih is released under the New BSD license (see LICENSE.txt). Go to the project
home page for more info:
http://code.google.com/p/inih/
*/
#ifndef __INI_H__
#define __INI_H__
/* Make this header file easier to include in C++ code */
#ifdef __cplusplus
extern "C" {
#endif
#include <stdio.h>
/* Parse given INI-style file. May have [section]s, name=value pairs
(whitespace stripped), and comments starting with ';' (semicolon). Section
is "" if name=value pair parsed before any section heading. name:value
pairs are also supported as a concession to Python's ConfigParser.
For each name=value pair parsed, call handler function with given user
pointer as well as section, name, and value (data only valid for duration
of handler call). Handler should return nonzero on success, zero on error.
Returns 0 on success, line number of first error on parse error (doesn't
stop on first error), -1 on file open error, or -2 on memory allocation
error (only when INI_USE_STACK is zero).
*/
int ini_parse(const char* filename,
int (*handler)(void* user, const char* section,
const char* name, const char* value),
void* user);
/* Same as ini_parse(), but takes a FILE* instead of filename. This doesn't
close the file when it's finished -- the caller must do that. */
int ini_parse_file(FILE* file,
int (*handler)(void* user, const char* section,
const char* name, const char* value),
void* user);
/* Nonzero to allow multi-line value parsing, in the style of Python's
ConfigParser. If allowed, ini_parse() will call the handler with the same
name for each subsequent line parsed. */
#ifndef INI_ALLOW_MULTILINE
#define INI_ALLOW_MULTILINE 1
#endif
/* Nonzero to allow a UTF-8 BOM sequence (0xEF 0xBB 0xBF) at the start of
the file. See http://code.google.com/p/inih/issues/detail?id=21 */
#ifndef INI_ALLOW_BOM
#define INI_ALLOW_BOM 1
#endif
/* Nonzero to use stack, zero to use heap (malloc/free). */
#ifndef INI_USE_STACK
#define INI_USE_STACK 1
#endif
/* Maximum line length for any line in INI file. */
#ifndef INI_MAX_LINE
#define INI_MAX_LINE 200
#endif
#ifdef __cplusplus
}
#endif
#endif /* __INI_H__ */

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1 @@
This is an error

View File

@ -0,0 +1 @@
indented

View File

@ -0,0 +1,5 @@
[section1]
name1=value1
[section2
[section3 ; comment ]
name2=value2

View File

@ -0,0 +1,47 @@
no_file.ini: e=-1 user=0
... [section1]
... one=This is a test;
... two=1234;
... [ section 2 ]
... happy=4;
... sad=;
... [comment_test]
... test1=1;2;3;
... test2=2;3;4;this won't be a comment, needs whitespace before ';';
... test;3=345;
... test4=4#5#6;
... [colon_tests]
... Content-Type=text/html;
... foo=bar;
... adams=42;
normal.ini: e=0 user=101
... [section1]
... name1=value1;
... name2=value2;
bad_section.ini: e=3 user=102
bad_comment.ini: e=1 user=102
... [section]
... a=b;
... user=parse_error;
... c=d;
user_error.ini: e=3 user=104
... [section1]
... single1=abc;
... multi=this is a;
... multi=multi-line value;
... single2=xyz;
... [section2]
... multi=a;
... multi=b;
... multi=c;
... [section3]
... single=ghi;
... multi=the quick;
... multi=brown fox;
... name=bob smith;
multi_line.ini: e=0 user=105
bad_multi.ini: e=1 user=105
... [bom_section]
... bom_name=bom_value;
... key“=value“;
bom.ini: e=0 user=107

View File

@ -0,0 +1,43 @@
no_file.ini: e=-1 user=0
... [section1]
... one=This is a test;
... two=1234;
... [ section 2 ]
... happy=4;
... sad=;
... [comment_test]
... test1=1;2;3;
... test2=2;3;4;this won't be a comment, needs whitespace before ';';
... test;3=345;
... test4=4#5#6;
... [colon_tests]
... Content-Type=text/html;
... foo=bar;
... adams=42;
normal.ini: e=0 user=101
... [section1]
... name1=value1;
... name2=value2;
bad_section.ini: e=3 user=102
bad_comment.ini: e=1 user=102
... [section]
... a=b;
... user=parse_error;
... c=d;
user_error.ini: e=3 user=104
... [section1]
... single1=abc;
... multi=this is a;
... single2=xyz;
... [section2]
... multi=a;
... [section3]
... single=ghi;
... multi=the quick;
... name=bob smith;
multi_line.ini: e=4 user=105
bad_multi.ini: e=1 user=105
... [bom_section]
... bom_name=bom_value;
... key“=value“;
bom.ini: e=0 user=107

View File

@ -0,0 +1,3 @@
[bom_section]
bom_name=bom_value
key“ = value“

View File

@ -0,0 +1,15 @@
[section1]
single1 = abc
multi = this is a
multi-line value
single2 = xyz
[section2]
multi = a
b
c
[section3]
single: ghi
multi: the quick
brown fox
name = bob smith ; comment line 1
; comment line 2

View File

@ -0,0 +1,25 @@
; This is an INI file
[section1] ; section comment
one=This is a test ; name=value comment
two = 1234
; x=y
[ section 2 ]
happy = 4
sad =
[empty]
; do nothing
[comment_test]
test1 = 1;2;3 ; only this will be a comment
test2 = 2;3;4;this won't be a comment, needs whitespace before ';'
test;3 = 345 ; key should be "test;3"
test4 = 4#5#6 ; '#' only starts a comment at start of line
#test5 = 567 ; entire line commented
# test6 = 678 ; entire line commented, except in MULTILINE mode
[colon_tests]
Content-Type: text/html
foo:bar
adams : 42

View File

@ -0,0 +1,2 @@
@call tcc ..\ini.c -I..\ -run unittest.c > baseline_multi.txt
@call tcc ..\ini.c -I..\ -DINI_ALLOW_MULTILINE=0 -run unittest.c > baseline_single.txt

View File

@ -0,0 +1,58 @@
/* inih -- unit tests
This works simply by dumping a bunch of info to standard output, which is
redirected to an output file (baseline_*.txt) and checked into the Subversion
repository. This baseline file is the test output, so the idea is to check it
once, and if it changes -- look at the diff and see which tests failed.
Here's how I produced the two baseline files (with Tiny C Compiler):
tcc -DINI_ALLOW_MULTILINE=1 ../ini.c -run unittest.c > baseline_multi.txt
tcc -DINI_ALLOW_MULTILINE=0 ../ini.c -run unittest.c > baseline_single.txt
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "../ini.h"
int User;
char Prev_section[50];
int dumper(void* user, const char* section, const char* name,
const char* value)
{
User = (int)user;
if (strcmp(section, Prev_section)) {
printf("... [%s]\n", section);
strncpy(Prev_section, section, sizeof(Prev_section));
Prev_section[sizeof(Prev_section) - 1] = '\0';
}
printf("... %s=%s;\n", name, value);
return strcmp(name, "user")==0 && strcmp(value, "parse_error")==0 ? 0 : 1;
}
void parse(const char* fname) {
static int u = 100;
int e;
*Prev_section = '\0';
e = ini_parse(fname, dumper, (void*)u);
printf("%s: e=%d user=%d\n", fname, e, User);
u++;
}
int main(void)
{
parse("no_file.ini");
parse("normal.ini");
parse("bad_section.ini");
parse("bad_comment.ini");
parse("user_error.ini");
parse("multi_line.ini");
parse("bad_multi.ini");
parse("bom.ini");
return 0;
}

View File

@ -0,0 +1,4 @@
[section]
a = b
user = parse_error
c = d

View File

@ -0,0 +1,55 @@
%define _topdir %(echo $PWD)/
%define name rabbitmq-message-consumer
%define release beta
%define version 1.0
%define install_path /usr/local/skysql/maxscale/extra/consumer/
BuildRoot: %{buildroot}
Summary: rabbitmq-message-consumer
License: GPL
Name: %{name}
Version: %{version}
Release: %{release}
Source: %{name}-%{version}-%{release}.tar.gz
Prefix: /
Group: Development/Tools
Requires: maxscale
%if 0%{?suse_version}
BuildRequires: gcc gcc-c++ ncurses-devel bison glibc-devel cmake libgcc_s1 perl make libtool libopenssl-devel libaio libaio-devel mariadb libedit-devel librabbitmq-devel MariaDB-shared
%else
BuildRequires: gcc gcc-c++ ncurses-devel bison glibc-devel cmake libgcc perl make libtool openssl-devel libaio libaio-devel librabbitmq-devel MariaDB-shared
%if 0%{?rhel} == 6
BuildRequires: libedit-devel
%endif
%if 0%{?rhel} == 7
BuildRequires: mariadb-devel mariadb-embedded-devel libedit-devel
%else
BuildRequires: MariaDB-devel MariaDB-server
%endif
%endif
%description
rabbitmq-message-consumer
%prep
%setup -q
%build
make clean
make
%install
mkdir -p $RPM_BUILD_ROOT%{install_path}
cp consumer $RPM_BUILD_ROOT%{install_path}
cp consumer.cnf $RPM_BUILD_ROOT%{install_path}
%clean
%files
%defattr(-,root,root)
%{install_path}/consumer
%{install_path}/consumer.cnf
%changelog

View File

@ -21,15 +21,16 @@
include ../../../build_gateway.inc
LOGPATH := $(ROOT_PATH)/log_manager
QCLASSPATH := $(ROOT_PATH)/query_classifier
UTILSPATH := $(ROOT_PATH)/utils
CC=cc
CFLAGS=-c -fPIC -I/usr/include -I../include -I../../include -I$(LOGPATH) \
-I$(UTILSPATH) -Wall -g
CFLAGS=-c -fPIC -I/usr/include -I../include -I../../include -I$(LOGPATH) -I$(QCLASSPATH) \
-I$(UTILSPATH) -I$(MYSQL_ROOT) -Wall -g
include ../../../makefile.inc
LDFLAGS=-shared -L$(LOGPATH) -Wl,-rpath,$(DEST)/lib \
LDFLAGS=-shared -L$(LOGPATH) -L$(QCLASSPATH) -Wl,-rpath,$(DEST)/lib \
-Wl,-rpath,$(LOGPATH) -Wl,-rpath,$(UTILSPATH)
TESTSRCS=testfilter.c
@ -42,10 +43,12 @@ TOPNSRCS=topfilter.c
TOPNOBJ=$(TOPNSRCS:.c=.o)
TEESRCS=tee.c
TEEOBJ=$(TEESRCS:.c=.o)
SRCS=$(TESTSRCS) $(QLASRCS) $(REGEXSRCS) $(TOPNSRCS) $(TEESRCS)
MQSRCS=mqfilter.c
MQOBJ=$(MQSRCS:.c=.o)
SRCS=$(TESTSRCS) $(QLASRCS) $(REGEXSRCS) $(TOPNSRCS) $(TEESRCS) $(MQSRCS)
OBJ=$(SRCS:.c=.o)
LIBS=$(UTILSPATH)/skygw_utils.o -lssl -llog_manager
MODULES= libtestfilter.so libqlafilter.so libregexfilter.so libtopfilter.so libtee.so libhintfilter.so
LIBS=$(UTILSPATH)/skygw_utils.o -lssl -llog_manager -lrabbitmq -lquery_classifier
MODULES= libtestfilter.so libqlafilter.so libregexfilter.so libtopfilter.so libtee.so libmqfilter.so
all: $(MODULES)
@ -53,6 +56,9 @@ all: $(MODULES)
libtestfilter.so: $(TESTOBJ)
$(CC) $(LDFLAGS) $(TESTOBJ) $(LIBS) -o $@
libmqfilter.so: $(MQOBJ)
$(CC) $(LDFLAGS) $(MQOBJ) $(LIBS) -o $@
libqlafilter.so: $(QLAOBJ)
$(CC) $(LDFLAGS) $(QLAOBJ) $(LIBS) -o $@
@ -66,8 +72,7 @@ libtee.so: $(TEEOBJ)
$(CC) $(LDFLAGS) $(TEEOBJ) $(LIBS) -o $@
libhintfilter.so:
(cd hint; touch depend.mk ; make; cp $@ ..)
# (cd hint; touch depend.mk ; make; cp $@ ..)
.c.o:
$(CC) $(CFLAGS) $< -o $@
@ -78,26 +83,26 @@ clean:
tags:
ctags $(SRCS) $(HDRS)
(cd hint; touch depend.mk; make tags)
# (cd hint; touch depend.mk; make tags)
depend:
@$(DEL) depend.mk
@rm -f depend.mk
cc -M $(CFLAGS) $(SRCS) > depend.mk
(cd hint; touch depend.mk; make depend)
# (cd hint; touch depend.mk; make depend)
install: $(MODULES)
install -D $(MODULES) $(DEST)/modules
cleantests:
$(MAKE) -C test cleantests
buildtests:
$(MAKE) -C test DEBUG=Y buildtests
runtests:
$(MAKE) -C test runtests
testall:
$(MAKE) -C test testall
include depend.mk

File diff suppressed because it is too large Load Diff

View File

@ -1215,7 +1215,255 @@ static route_target_t get_route_target (
return target;
}
/**
* Check if the query is a DROP TABLE... query and
* if it targets a temporary table, remove it from the hashtable.
* @param instance Router instance
* @param router_session Router client session
* @param querybuf GWBUF containing the query
* @param type The type of the query resolved so far
*/
void check_drop_tmp_table(
ROUTER* instance,
void* router_session,
GWBUF* querybuf,
skygw_query_type_t type)
{
int tsize = 0, klen = 0,i;
char** tbl;
char *hkey,*dbname;
MYSQL_session* data;
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
DCB* master_dcb = NULL;
rses_property_t* rses_prop_tmp;
rses_prop_tmp = router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES];
master_dcb = router_cli_ses->rses_master_ref->bref_dcb;
CHK_DCB(master_dcb);
data = (MYSQL_session*)master_dcb->session->data;
dbname = (char*)data->db;
if (is_drop_table_query(querybuf))
{
tbl = skygw_get_table_names(querybuf,&tsize,false);
for(i = 0; i<tsize; i++)
{
klen = strlen(dbname) + strlen(tbl[i]) + 2;
hkey = calloc(klen,sizeof(char));
strcpy(hkey,dbname);
strcat(hkey,".");
strcat(hkey,tbl[i]);
if (rses_prop_tmp &&
rses_prop_tmp->rses_prop_data.temp_tables)
{
if (hashtable_delete(rses_prop_tmp->rses_prop_data.temp_tables,
(void *)hkey))
{
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE,
"Temporary table dropped: %s",hkey)));
}
}
free(tbl[i]);
free(hkey);
}
free(tbl);
}
}
/**
* Check if the query targets a temporary table.
* @param instance Router instance
* @param router_session Router client session
* @param querybuf GWBUF containing the query
* @param type The type of the query resolved so far
* @return The type of the query
*/
skygw_query_type_t is_read_tmp_table(
ROUTER* instance,
void* router_session,
GWBUF* querybuf,
skygw_query_type_t type)
{
bool target_tmp_table = false;
int tsize = 0, klen = 0,i;
char** tbl;
char *hkey,*dbname;
MYSQL_session* data;
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
DCB* master_dcb = NULL;
skygw_query_type_t qtype = type;
rses_property_t* rses_prop_tmp;
rses_prop_tmp = router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES];
master_dcb = router_cli_ses->rses_master_ref->bref_dcb;
CHK_DCB(master_dcb);
data = (MYSQL_session*)master_dcb->session->data;
dbname = (char*)data->db;
if (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ))
{
tbl = skygw_get_table_names(querybuf,&tsize,false);
if (tsize > 0)
{
/** Query targets at least one table */
for(i = 0; i<tsize && !target_tmp_table && tbl[i]; i++)
{
klen = strlen(dbname) + strlen(tbl[i]) + 2;
hkey = calloc(klen,sizeof(char));
strcpy(hkey,dbname);
strcat(hkey,".");
strcat(hkey,tbl[i]);
if (rses_prop_tmp &&
rses_prop_tmp->rses_prop_data.temp_tables)
{
if( (target_tmp_table =
(bool)hashtable_fetch(rses_prop_tmp->rses_prop_data.temp_tables,(void *)hkey)))
{
/**Query target is a temporary table*/
qtype = QUERY_TYPE_READ_TMP_TABLE;
LOGIF(LT,
(skygw_log_write(LOGFILE_TRACE,
"Query targets a temporary table: %s",hkey)));
}
}
free(hkey);
free(tbl[i]);
}
free(tbl);
}
}
return qtype;
}
/**
* If query is of type QUERY_TYPE_CREATE_TMP_TABLE then find out
* the database and table name, create a hashvalue and
* add it to the router client session's property. If property
* doesn't exist then create it first.
*
* @param instance Router instance
* @param router_session Router client session
* @param querybuf GWBUF containing the query
* @param type The type of the query resolved so far
*/
void check_create_tmp_table(
ROUTER* instance,
void* router_session,
GWBUF* querybuf,
skygw_query_type_t type)
{
int klen = 0;
char *hkey,*dbname;
MYSQL_session* data;
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
DCB* master_dcb = NULL;
rses_property_t* rses_prop_tmp;
HASHTABLE* h;
rses_prop_tmp = router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES];
master_dcb = router_cli_ses->rses_master_ref->bref_dcb;
CHK_DCB(master_dcb);
data = (MYSQL_session*)master_dcb->session->data;
dbname = (char*)data->db;
if (QUERY_IS_TYPE(type, QUERY_TYPE_CREATE_TMP_TABLE))
{
bool is_temp = true;
char* tblname = NULL;
tblname = skygw_get_created_table_name(querybuf);
if (tblname && strlen(tblname) > 0)
{
klen = strlen(dbname) + strlen(tblname) + 2;
hkey = calloc(klen,sizeof(char));
strcpy(hkey,dbname);
strcat(hkey,".");
strcat(hkey,tblname);
}
else
{
hkey = NULL;
}
if(rses_prop_tmp == NULL)
{
if((rses_prop_tmp =
(rses_property_t*)calloc(1,sizeof(rses_property_t))))
{
#if defined(SS_DEBUG)
rses_prop_tmp->rses_prop_chk_top = CHK_NUM_ROUTER_PROPERTY;
rses_prop_tmp->rses_prop_chk_tail = CHK_NUM_ROUTER_PROPERTY;
#endif
rses_prop_tmp->rses_prop_rsession = router_cli_ses;
rses_prop_tmp->rses_prop_refcount = 1;
rses_prop_tmp->rses_prop_next = NULL;
rses_prop_tmp->rses_prop_type = RSES_PROP_TYPE_TMPTABLES;
router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES] = rses_prop_tmp;
}
}
if (rses_prop_tmp->rses_prop_data.temp_tables == NULL)
{
h = hashtable_alloc(7, hashkeyfun, hashcmpfun);
hashtable_memory_fns(h,hstrdup,NULL,hfree,NULL);
if (h != NULL)
{
rses_prop_tmp->rses_prop_data.temp_tables = h;
}
}
if (hkey &&
hashtable_add(rses_prop_tmp->rses_prop_data.temp_tables,
(void *)hkey,
(void *)is_temp) == 0) /*< Conflict in hash table */
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Temporary table conflict in hashtable: %s",
hkey)));
}
#if defined(SS_DEBUG)
{
bool retkey =
hashtable_fetch(
rses_prop_tmp->rses_prop_data.temp_tables,
hkey);
if (retkey)
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Temporary table added: %s",
hkey)));
}
}
#endif
free(hkey);
free(tblname);
}
}
/**
* The main routing entry, this is called with every packet that is
@ -1246,28 +1494,15 @@ static int routeQuery(
GWBUF* querybuf)
{
skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN;
GWBUF* plainsqlbuf = NULL;
char* querystr = NULL;
char* startpos;
mysql_server_cmd_t packet_type;
uint8_t* packet;
int ret = 0;
int tsize = 0;
int klen = 0;
int i = 0;
DCB* master_dcb = NULL;
DCB* target_dcb = NULL;
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
rses_property_t* rses_prop_tmp;
bool rses_is_closed = false;
bool target_tmp_table = false;
char* dbname;
char* hkey;
char** tbl;
HASHTABLE* h;
MYSQL_session* data;
size_t len;
route_target_t route_target;
bool succp = false;
int rlag_max = MAX_RLAG_UNDEFINED;
@ -1285,7 +1520,7 @@ static int routeQuery(
packet = GWBUF_DATA(querybuf);
packet_type = packet[4];
rses_prop_tmp = router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES];
if (rses_is_closed)
{
@ -1317,8 +1552,6 @@ static int routeQuery(
master_dcb = router_cli_ses->rses_master_ref->bref_dcb;
CHK_DCB(master_dcb);
data = (MYSQL_session*)master_dcb->session->data;
dbname = data->db;
switch(packet_type) {
case MYSQL_COM_QUIT: /*< 1 QUIT will close all sessions */
@ -1364,48 +1597,15 @@ static int routeQuery(
break;
} /**< switch by packet type */
/**
* Check if the query targets a temporary table
* Check if the query has anything to do with temporary tables.
*/
if (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ))
{
tbl = skygw_get_table_names(querybuf,&tsize);
if (tsize > 0)
{
/** Query targets at least one table */
for(i = 0; i<tsize && !target_tmp_table && tbl[i]; i++)
{
klen = strlen(dbname) + strlen(tbl[i]) + 2;
hkey = calloc(klen,sizeof(char));
strcpy(hkey,dbname);
strcat(hkey,".");
strcat(hkey,tbl[0]);
qtype = is_read_tmp_table(instance,router_session,querybuf,qtype);
check_create_tmp_table(instance,router_session,querybuf,qtype);
check_drop_tmp_table(instance,router_session,querybuf,qtype);
if (rses_prop_tmp &&
rses_prop_tmp->rses_prop_data.temp_tables)
{
if( (target_tmp_table =
(bool)hashtable_fetch(rses_prop_tmp->rses_prop_data.temp_tables,(void *)hkey)))
{
/**Query target is a temporary table*/
qtype = QUERY_TYPE_READ_TMP_TABLE;
LOGIF(LT,
(skygw_log_write(LOGFILE_TRACE,
"Query targets a temporary table: %s",hkey)));
}
}
free(hkey);
}
for (i = 0; i<tsize; i++)
{
free(tbl[i]);
}
free(tbl);
}
}
/**
* If autocommit is disabled or transaction is explicitly started
* transaction becomes active and master gets all statements until
@ -1443,117 +1643,7 @@ static int routeQuery(
router_cli_ses->rses_transaction_active = false;
}
/**
* If query is of type QUERY_TYPE_CREATE_TMP_TABLE then find out
* the database and table name, create a hashvalue and
* add it to the router client session's property. If property
* doesn't exist then create it first. If the query is DROP TABLE...
* then see if it targets a temporary table and remove it from the hashtable
* if it does.
*/
if (QUERY_IS_TYPE(qtype, QUERY_TYPE_CREATE_TMP_TABLE))
{
bool is_temp = true;
char* tblname = NULL;
tblname = skygw_get_created_table_name(querybuf);
if (tblname && strlen(tblname) > 0)
{
klen = strlen(dbname) + strlen(tblname) + 2;
hkey = calloc(klen,sizeof(char));
strcpy(hkey,dbname);
strcat(hkey,".");
strcat(hkey,tblname);
}
else
{
hkey = NULL;
}
if(rses_prop_tmp == NULL)
{
if((rses_prop_tmp =
(rses_property_t*)calloc(1,sizeof(rses_property_t))))
{
#if defined(SS_DEBUG)
rses_prop_tmp->rses_prop_chk_top = CHK_NUM_ROUTER_PROPERTY;
rses_prop_tmp->rses_prop_chk_tail = CHK_NUM_ROUTER_PROPERTY;
#endif
rses_prop_tmp->rses_prop_rsession = router_cli_ses;
rses_prop_tmp->rses_prop_refcount = 1;
rses_prop_tmp->rses_prop_next = NULL;
rses_prop_tmp->rses_prop_type = RSES_PROP_TYPE_TMPTABLES;
router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES] = rses_prop_tmp;
}
}
if (rses_prop_tmp->rses_prop_data.temp_tables == NULL)
{
h = hashtable_alloc(7, hashkeyfun, hashcmpfun);
hashtable_memory_fns(h,hstrdup,NULL,hfree,NULL);
if (h != NULL)
{
rses_prop_tmp->rses_prop_data.temp_tables = h;
}
}
if (hkey &&
hashtable_add(rses_prop_tmp->rses_prop_data.temp_tables,
(void *)hkey,
(void *)is_temp) == 0) /*< Conflict in hash table */
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Temporary table conflict in hashtable: %s",
hkey)));
}
#if defined(SS_DEBUG)
{
bool retkey =
hashtable_fetch(
rses_prop_tmp->rses_prop_data.temp_tables,
hkey);
if (retkey)
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Temporary table added: %s",
hkey)));
}
}
#endif
free(hkey);
}
/** Check if DROP TABLE... targets a temporary table */
if (is_drop_table_query(querybuf))
{
tbl = skygw_get_table_names(querybuf,&tsize);
for(i = 0; i<tsize; i++)
{
klen = strlen(dbname) + strlen(tbl[i]) + 2;
hkey = calloc(klen,sizeof(char));
strcpy(hkey,dbname);
strcat(hkey,".");
strcat(hkey,tbl[i]);
if (rses_prop_tmp &&
rses_prop_tmp->rses_prop_data.temp_tables)
{
if (hashtable_delete(rses_prop_tmp->rses_prop_data.temp_tables,
(void *)hkey))
{
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE,
"Temporary table dropped: %s",hkey)));
}
}
free(tbl[i]);
free(hkey);
}
free(tbl);
}
/**
* Find out where to route the query. Result may not be clear; it is
* possible to have a hint for routing to a named server which can
@ -3774,8 +3864,8 @@ static void print_error_packet(
}
}
ss_dassert(srv != NULL);
bufstr = strndup(&ptr[7], len-3);
char* str = (char*)&ptr[7];
bufstr = strndup(str, len-3);
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,