Merge remote-tracking branch 'origin/MAX-160' into MAX-237

Conflicts:
	query_classifier/query_classifier.cc
	query_classifier/query_classifier.h
This commit is contained in:
Markus Makela 2014-08-29 11:02:03 +03:00
commit 7ea53f0141
21 changed files with 934 additions and 168 deletions

View File

@ -39,7 +39,7 @@ buildtests:
-o testlog \
-I$(MARIADB_SRC_PATH)/include \
-I$(LOG_MANAGER_PATH) -I$(UTILS_PATH) testlog.c \
-llog_manager $(LDLIBS) \
-lstdc++ -llog_manager $(LDLIBS) \
$(UTILS_PATH)/skygw_utils.o

View File

@ -8,6 +8,8 @@ SRCS := query_classifier.cc
UTILS_PATH := $(ROOT_PATH)/utils
QUERY_CLASSIFIER_PATH := $(ROOT_PATH)/query_classifier
LOG_MANAGER_PATH := $(ROOT_PATH)/log_manager
SERVER_INC_PATH := $(ROOT_PATH)/server/include
MODULE_INC_PATH := $(ROOT_PATH)/server/modules/include
makeall: clean all
@ -43,6 +45,9 @@ libcomp:
$(CPP) -c $(CFLAGS) \
$(MYSQL_HEADERS) \
-I$(LOG_MANAGER_PATH) \
-I$(SERVER_INC_PATH) \
-I$(MODULE_INC_PATH) \
-I$(UTILS_PATH) \
-I./ \
-fPIC ./query_classifier.cc -o query_classifier.o
@ -66,6 +71,9 @@ depend:
$(CPP) -M $(CFLAGS) \
$(MYSQL_HEADERS) \
-I$(LOG_MANAGER_PATH) \
-I$(SERVER_INC_PATH) \
-I$(MODULE_INC_PATH) \
-I$(UTILS_PATH) \
-I./ \
$(SRCS) > depend

View File

@ -34,6 +34,7 @@
#include "../utils/skygw_types.h"
#include "../utils/skygw_debug.h"
#include <log_manager.h>
#include <mysql_client_server_protocol.h>
#include <mysql.h>
#include <my_sys.h>
@ -83,122 +84,158 @@ static bool skygw_stmt_causes_implicit_commit(
static int is_autocommit_stmt(
LEX* lex);
/**
* @node (write brief function description here)
*
* Parameters:
* @param query_str - <usage>
* <description>
*
* @param client_flag - <usage>
* <description>
*
* @return
*
static void parsing_info_set_plain_str(void* ptr,
char* str);
/**
* Calls parser for the query includede in the buffer. Creates and adds parsing
* information to buffer if it doesn't exist already. Resolves the query type.
*
* @details (write detailed description here)
*
* @param querybuf buffer including the query and possibly the parsing information
*
* @return query type
*/
skygw_query_type_t skygw_query_classifier_get_type(
const char* query,
unsigned long client_flags,
MYSQL** p_mysql)
skygw_query_type_t query_classifier_get_type(
GWBUF* querybuf)
{
MYSQL* mysql;
char* query_str;
const char* user = "skygw";
const char* db = "skygw";
THD* thd;
MYSQL* mysql;
skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN;
bool failp = FALSE;
ss_info_dassert(query != NULL, ("query_str is NULL"));
bool succp;
query_str = const_cast<char*>(query);
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Query : \"%s\"", query_str)));
ss_info_dassert(querybuf != NULL, ("querybuf is NULL"));
/** Get server handle */
mysql = mysql_init(NULL);
/** Create parsing info for the query and store it to buffer */
succp = query_is_parsed(querybuf);
if (mysql == NULL) {
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : call to mysql_real_connect failed due %d, %s.",
mysql_errno(mysql),
mysql_error(mysql))));
if (!succp)
{
succp = parse_query(querybuf);
}
/** Read thd pointer and resolve the query type with it. */
if (succp)
{
parsing_info_t* pi;
mysql_library_end();
goto return_qtype;
}
pi = (parsing_info_t*)gwbuf_get_buffer_object_data(querybuf,
GWBUF_PARSING_INFO);
if (pi != NULL)
{
mysql = (MYSQL *)pi->pi_handle;
if (p_mysql != NULL)
{
*p_mysql = mysql;
/** Find out the query type */
if (mysql != NULL)
{
qtype = resolve_query_type((THD *)mysql->thd);
}
}
}
/** Set methods and authentication to mysql */
mysql_options(mysql, MYSQL_READ_DEFAULT_GROUP, "libmysqld_skygw");
mysql_options(mysql, MYSQL_OPT_USE_EMBEDDED_CONNECTION, NULL);
mysql->methods = &embedded_methods;
mysql->user = my_strdup(user, MYF(0));
mysql->db = my_strdup(db, MYF(0));
mysql->passwd = NULL;
/** Get one or create new THD object to be use in parsing */
thd = get_or_create_thd_for_parsing(mysql, query_str);
if (thd == NULL)
{
skygw_query_classifier_free(mysql);
*p_mysql = NULL;
goto return_qtype;
}
/**
* Create parse_tree inside thd.
* thd and even lex are readable even if parser failed so let it
* continue despite failure.
*/
failp = create_parse_tree(thd);
qtype = resolve_query_type(thd);
if (p_mysql == NULL)
{
skygw_query_classifier_free(mysql);
}
return_qtype:
return qtype;
}
void skygw_query_classifier_free(
MYSQL* mysql)
/**
* Create parsing info and try to parse the query included in the query buffer.
* Store pointer to created parse_tree_t object to buffer.
*
* @param querybuf buffer including the query and possibly the parsing information
*
* @return true if succeed, false otherwise
*/
bool parse_query (
GWBUF* querybuf)
{
if (mysql->thd != NULL)
bool succp;
THD* thd;
uint8_t* data;
size_t len;
char* query_str;
parsing_info_t* pi;
CHK_GWBUF(querybuf);
/** Do not parse without releasing previous parse info first */
ss_dassert(!query_is_parsed(querybuf));
if (query_is_parsed(querybuf))
{
(*mysql->methods->free_embedded_thd)(mysql);
mysql->thd = NULL;
return false;
}
mysql_close(mysql);
mysql_thread_end();
}
/** Create parsing info */
pi = parsing_info_init(parsing_info_done);
if (pi == NULL)
{
succp = false;
goto retblock;
}
/** Extract query and copy it to different buffer */
data = (uint8_t*)GWBUF_DATA(querybuf);
len = MYSQL_GET_PACKET_LEN(data)-1; /*< distract 1 for packet type byte */
query_str = (char *)malloc(len+1);
if (query_str == NULL)
{
/** Free parsing info data */
parsing_info_done(pi);
succp = false;
goto retblock;
}
memcpy(query_str, &data[5], len);
memset(&query_str[len], 0, 1);
parsing_info_set_plain_str(pi, query_str);
/** Get one or create new THD object to be use in parsing */
thd = get_or_create_thd_for_parsing((MYSQL *)pi->pi_handle, query_str);
if (thd == NULL)
{
/** Free parsing info data */
parsing_info_done(pi);
succp = false;
goto retblock;
}
/**
* Create parse_tree inside thd.
* thd and lex are readable even if creating parse tree fails.
*/
create_parse_tree(thd);
/** Add complete parsing info struct to the query buffer */
gwbuf_add_buffer_object(querybuf,
GWBUF_PARSING_INFO,
(void *)pi,
parsing_info_done);
succp = true;
retblock:
return succp;
}
/**
* If buffer has non-NULL gwbuf_parsing_info it is parsed and it has parsing
* information included.
*
* @param buf buffer being examined
*
* @return true or false
*/
bool query_is_parsed(
GWBUF* buf)
{
CHK_GWBUF(buf);
return GWBUF_IS_PARSED(buf);
}
/**
* @node (write brief function description here)
/**
* Create a thread context, thd, init embedded server, connect to it, and allocate
* query to thd.
*
* Parameters:
* @param mysql - <usage>
* <description>
*
* @param query_str - <usage>
* <description>
*
* @return
*
* @param mysql Database handle
*
* @details (write detailed description here)
* @param query_str Query in plain txt string
*
* @return Thread context pointer
*
*/
static THD* get_or_create_thd_for_parsing(
@ -412,7 +449,7 @@ static skygw_query_type_t resolve_query_type(
ss_info_dassert(thd != NULL, ("thd is NULL\n"));
force_data_modify_op_replication = FALSE;
force_data_modify_op_replication = FALSE;
lex = thd->lex;
/** SELECT ..INTO variable|OUTFILE|DUMPFILE */
@ -642,7 +679,6 @@ static skygw_query_type_t resolve_query_type(
"%lu [resolve_query_type] "
"functype FUNC_SP, stored proc "
"or unknown function.",
"%s:%s",
pthread_self())));
break;
case Item_func::UDF_FUNC:
@ -836,6 +872,7 @@ char* skygw_query_classifier_get_stmtname(
}
/**
* Finds the head of the list of tables affected by the current select statement.
* @param thd Pointer to a valid thread descriptor structure
* @return Head of the TABLE_LIST chain or NULL in case of an error
@ -855,4 +892,194 @@ void* skygw_get_affected_tables(void* thdp)
}
return (void*)thd->lex->current_select->table_list.first;
}
}
/*
* Replace user-provided literals with question marks. Return a copy of the
* querystr with replacements.
*
* @param querybuf GWBUF buffer including necessary parsing info
*
* @return Copy of querystr where literals are replaces with question marks or
* NULL if querystr is NULL, thread context or lex are NULL or if replacement
* function fails.
*
* Replaced literal types are STRING_ITEM,INT_ITEM,DECIMAL_ITEM,REAL_ITEM,
* VARBIN_ITEM,NULL_ITEM
*/
char* skygw_get_canonical(
GWBUF* querybuf)
{
parsing_info_t* pi;
MYSQL* mysql;
THD* thd;
LEX* lex;
Item* item;
char* querystr;
if (!GWBUF_IS_PARSED(querybuf))
{
querystr = NULL;
goto retblock;
}
pi = (parsing_info_t *)gwbuf_get_buffer_object_data(querybuf,
GWBUF_PARSING_INFO);
if (pi == NULL)
{
querystr = NULL;
goto retblock;
}
if (pi->pi_query_plain_str == NULL ||
(mysql = (MYSQL *)pi->pi_handle) == NULL ||
(thd = (THD *)mysql->thd) == NULL ||
(lex = thd->lex) == NULL)
{
ss_dassert(pi->pi_query_plain_str != NULL &&
mysql != NULL &&
thd != NULL &&
lex != NULL);
querystr = NULL;
goto retblock;
}
querystr = strdup(pi->pi_query_plain_str);
for (item=thd->free_list; item != NULL; item=item->next)
{
Item::Type itype;
itype = item->type();
if (item->name != NULL &&
(itype == Item::STRING_ITEM ||
itype == Item::INT_ITEM ||
itype == Item::DECIMAL_ITEM ||
itype == Item::REAL_ITEM ||
itype == Item::VARBIN_ITEM ||
itype == Item::NULL_ITEM))
{
if (itype == Item::STRING_ITEM && strlen(item->name) == 0)
{
querystr = replace_literal(querystr, "\"\"", "\"?\"");
}
else
{
querystr = replace_literal(querystr, item->name, "?");
}
}
} /*< for */
retblock:
return querystr;
}
/**
* Create parsing information; initialize mysql handle, allocate parsing info
* struct and set handle and free function pointer to it.
*
* @param donefun pointer to free function
*
* @return pointer to parsing information
*/
parsing_info_t* parsing_info_init(
void (*donefun)(void *))
{
parsing_info_t* pi = NULL;
MYSQL* mysql;
const char* user = "skygw";
const char* db = "skygw";
ss_dassert(donefun != NULL);
/** Get server handle */
mysql = mysql_init(NULL);
ss_dassert(mysql != NULL);
if (mysql == NULL) {
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : call to mysql_real_connect failed due %d, %s.",
mysql_errno(mysql),
mysql_error(mysql))));
goto retblock;
}
/** Set methods and authentication to mysql */
mysql_options(mysql, MYSQL_READ_DEFAULT_GROUP, "libmysqld_skygw");
mysql_options(mysql, MYSQL_OPT_USE_EMBEDDED_CONNECTION, NULL);
mysql->methods = &embedded_methods;
mysql->user = my_strdup(user, MYF(0));
mysql->db = my_strdup(db, MYF(0));
mysql->passwd = NULL;
pi = (parsing_info_t*)calloc(1, sizeof(parsing_info_t));
if (pi == NULL)
{
mysql_close(mysql);
mysql_thread_end();
goto retblock;
}
#if defined(SS_DEBUG)
pi->pi_chk_top = CHK_NUM_PINFO;
pi->pi_chk_tail = CHK_NUM_PINFO;
#endif
/** Set handle and free function to parsing info struct */
pi->pi_handle = mysql;
pi->pi_done_fp = donefun;
retblock:
return pi;
}
/**
* Free function for parsing info. Called by gwbuf_free or in case initialization
* of parsing information fails.
*
* @param ptr Pointer to parsing information, cast required
*
* @return void
*
*/
void parsing_info_done(
void* ptr)
{
parsing_info_t* pi = (parsing_info_t *)ptr;
if (pi->pi_handle != NULL)
{
MYSQL* mysql = (MYSQL *)pi->pi_handle;
if (mysql->thd != NULL)
{
(*mysql->methods->free_embedded_thd)(mysql);
mysql->thd = NULL;
}
mysql_close(mysql);
}
/** Free plain text query string */
if (pi->pi_query_plain_str != NULL)
{
free(pi->pi_query_plain_str);
}
free(pi);
}
/**
* Add plain text query string to parsing info.
*
* @param ptr Pointer to parsing info struct, cast required
* @param str String to be added
*
* @return void
*/
static void parsing_info_set_plain_str(
void* ptr,
char* str)
{
parsing_info_t* pi = (parsing_info_t *)ptr;
CHK_PARSING_INFO(pi);
pi->pi_query_plain_str = str;
}

View File

@ -20,7 +20,8 @@ Copyright SkySQL Ab
/** getpid */
#include <unistd.h>
#include <mysql.h>
#include "../utils/skygw_utils.h"
#include <skygw_utils.h>
#include <buffer.h>
EXTERN_C_BLOCK_BEGIN
@ -48,21 +49,35 @@ typedef enum {
QUERY_TYPE_READ_TMP_TABLE = 0x4000 /*< Read temporary table */
} skygw_query_type_t;
typedef struct parsing_info_st {
#if defined(SS_DEBUG)
skygw_chk_t pi_chk_top;
#endif
void* pi_handle; /*< parsing info object pointer */
char* pi_query_plain_str; /*< query as plain string */
void (*pi_done_fp)(void *); /*< clean-up function for parsing info */
#if defined(SS_DEBUG)
skygw_chk_t pi_chk_tail;
#endif
} parsing_info_t;
#define QUERY_IS_TYPE(mask,type) ((mask & type) == type)
/**
* Create THD and use it for creating parse tree. Examine parse tree and
* classify the query.
*/
skygw_query_type_t skygw_query_classifier_get_type(
const char* query_str,
unsigned long client_flags,
MYSQL** mysql);
skygw_query_type_t query_classifier_get_type(GWBUF* querybuf);
/** Free THD context and close MYSQL */
void skygw_query_classifier_free(MYSQL* mysql);
char* skygw_query_classifier_get_stmtname(MYSQL* mysql);
void* skygw_get_affected_tables(void* thdp);
char* skygw_query_classifier_get_stmtname(MYSQL* mysql);
char* skygw_get_canonical(GWBUF* querybuf);
bool parse_query (GWBUF* querybuf);
parsing_info_t* parsing_info_init(void (*donefun)(void *));
void parsing_info_done(void* ptr);
bool query_is_parsed(GWBUF* buf);
EXTERN_C_BLOCK_END

View File

@ -0,0 +1,64 @@
# cleantests - clean local and subdirectories' tests
# buildtests - build all local and subdirectories' tests
# runtests - run all local tests
# testall - clean, build and run local and subdirectories' tests
include ../../../build_gateway.inc
include ../../../makefile.inc
include ../../../test.inc
CC = gcc
CPP = g++
TESTPATH := $(shell pwd)
TESTLOG := $(TESTPATH)/testqclass.log
QUERY_CLASSIFIER_PATH := $(ROOT_PATH)/query_classifier
LOG_MANAGER_PATH := $(ROOT_PATH)/log_manager
UTILS_PATH := $(ROOT_PATH)/utils
CORE_PATH := $(ROOT_PATH)/server/core
TESTAPP = $(TESTPATH)/canonizer
LDFLAGS=-L$(QUERY_CLASSIFIER_PATH) \
-L$(LOG_MANAGER_PATH) \
-L$(EMBEDDED_LIB) \
-Wl,-rpath,$(DEST)/lib \
-Wl,-rpath,$(EMBEDDED_LIB) \
-Wl,-rpath,$(LOG_MANAGER_PATH) \
-Wl,-rpath,$(QUERY_CLASSIFIER_PATH)
LIBS=-lstdc++ -lpthread -lquery_classifier -lz -ldl -lssl -laio -lcrypt -lcrypto -lrt \
-llog_manager $(UTILS_PATH)/skygw_utils.o $(CORE_PATH)/buffer.o $(CORE_PATH)/atomic.o $(CORE_PATH)/spinlock.o
CFLAGS=-g $(MYSQL_HEADERS) \
-I$(QUERY_CLASSIFIER_PATH) \
$(MYSQL_HEADERS) \
-I$(ROOT_PATH)/server/include \
-I$(UTILS_PATH)
EMBFLAGS=$(shell mysql_config --cflags --libmysqld-libs)
testall:
$(MAKE) cleantests
$(MAKE) buildtests
$(MAKE) runtests
cleantests:
- $(DEL) *.o
- $(DEL) *~
- $(DEL) canonizer
- $(DEL) aria_log*
- $(DEL) ib*
buildtests: $(OBJS)
cp $(ERRMSG)/errmsg.sys .
$(CC) $(CFLAGS) $(LDFLAGS) $(EMBFLAGS) $(LIBS) canonizer.c -o $(TESTAPP)
runtests:
@echo "" > $(TESTLOG)
@echo "-------------------------------" >> $(TESTLOG)
@echo $(shell date) >> $(TESTLOG)
@echo "Canonical Query Tests" >> $(TESTLOG)
@echo "-------------------------------" >> $(TESTLOG)
@echo "" >> $(TESTLOG)
./canontest.sh $(TESTLOG) input.sql output.sql expected.sql

View File

@ -0,0 +1,101 @@
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <string.h>
#include <query_classifier.h>
#include <buffer.h>
#include <mysql.h>
static char* server_options[] = {
"SkySQL Gateway",
"--datadir=./",
"--language=./",
"--skip-innodb",
"--default-storage-engine=myisam",
NULL
};
const int num_elements = (sizeof(server_options) / sizeof(char *)) - 1;
static char* server_groups[] = {
"embedded",
"server",
"server",
NULL
};
int main(int argc, char** argv)
{
int fdin,fdout,i=0,fnamelen,fsz,lines = 0;
unsigned int psize;
GWBUF** qbuff;
char *qin, *outnm, *buffer, *tok;
if(argc != 3){
printf("Usage: canonizer <input file> <output file>\n");
return 1;
}
bool failed = mysql_library_init(num_elements, server_options, server_groups);
if(failed){
printf("Embedded server init failed.\n");
return 1;
}
fnamelen = strlen(argv[1]) + 16;
fdin = open(argv[1],O_RDONLY);
fsz = lseek(fdin,0,SEEK_END);
lseek(fdin,0,SEEK_SET);
if(!(buffer = malloc(sizeof(char)*fsz))){
printf("Error: Failed to allocate memory.");
return 1;
}
read(fdin,buffer,fsz);
tok = strpbrk(buffer,"\n");
lines = 1;
while((tok = strpbrk(tok + 1,"\n"))){
lines++;
}
qbuff = malloc(sizeof(GWBUF*)*lines);
i = 0;
tok = strtok(buffer,"\n");
while(tok){
qin = strdup(tok);
psize = strlen(qin);
qbuff[i] = gwbuf_alloc(psize + 6);
*(qbuff[i]->sbuf->data + 0) = (unsigned char)psize;
*(qbuff[i]->sbuf->data + 1) = (unsigned char)(psize>>8);
*(qbuff[i]->sbuf->data + 2) = (unsigned char)(psize>>16);
*(qbuff[i]->sbuf->data + 4) = 0x03;
memcpy(qbuff[i]->sbuf->data + 5,qin,psize);
*(qbuff[i]->sbuf->data + 5 + psize) = 0x00;
tok = strtok(NULL,"\n");
free(qin);
i++;
}
fdout = open(argv[2],O_TRUNC|O_CREAT|O_WRONLY,S_IRWXU|S_IXGRP|S_IXOTH);
for(i = 0;i<lines;i++){
parse_query(qbuff[i]);
tok = skygw_get_canonical(qbuff[i]);
write(fdout,tok,strlen(tok));
write(fdout,"\n",1);
gwbuf_free(qbuff[i]);
}
close(fdin);
close(fdout);
return 0;
}

View File

@ -0,0 +1,21 @@
#! /bin/sh
if [[ $# -ne 4 ]]
then
echo "Usage: canontest.sh <logfile name> <input file> <output file> <expected output>"
exit 0
fi
TESTLOG=$1
INPUT=$2
OUTPUT=$3
EXPECTED=$4
DIFFLOG=diff.out
$PWD/canonizer $INPUT $OUTPUT
diff $OUTPUT $EXPECTED > $DIFFLOG
if [ $? -eq 0 ]
then
echo "PASSED" >> $TESTLOG
else
echo "FAILED" >> $TESTLOG
echo "Diff output: " >> $TESTLOG
cat $DIFFLOG >> $TESTLOG
fi

View File

@ -0,0 +1,17 @@
select md5(?) =?, sleep(?), rand(?);
select * from my1 where md5(?) =?;
select md5(?) =?;
select * from my1 where md5(?) =?;
select sleep(?)
select * from tst where lname='?'
select ?,?,?,?,?,? from tst
select * from tst where fname like '?'
select * from tst where lname like '?' order by fname
insert into tst values ("?","?"),("?",?),("?","?")
drop table if exists tst
create table tst(fname varchar(30), lname varchar(30))
update tst set lname="?" where fname like '?' or lname like '?'
delete from tst where lname like '?' and fname like '?'
select ? from tst where fname='?' or lname like '?'
select ?,?,?,? from tst where name='?' or name='?' or name='?'
select count(?),count(?),count(?),count(?),count (?),count(?) from tst

View File

@ -0,0 +1,17 @@
select md5("200000foo") =10, sleep(2), rand(100);
select * from my1 where md5("110") =10;
select md5("100foo") =10;
select * from my1 where md5("100") =10;
select sleep(2);
select * from tst where lname='Doe';
select 1,2,3,4,5,6 from tst;
select * from tst where fname like '%a%';
select * from tst where lname like '%e%' order by fname;
insert into tst values ("John","Doe"),("Plato",null),("Nietzsche","");
drop table if exists tst;
create table tst(fname varchar(30), lname varchar(30));
update tst set lname="Human" where fname like '%a%' or lname like '%a%';
delete from tst where lname like '%man%' and fname like '%ard%';
select 100 from tst where fname='10' or lname like '%100%';
select 1,20,300,4000 from tst where name='1000' or name='200' or name='30' or name='4';
select count(1),count(10),count(100),count(2),count (20),count(200) from tst;

View File

@ -18,7 +18,7 @@ UTILS_PATH := $(ROOT_PATH)/utils
TESTAPP = $(TESTPATH)/testmain
testall:buildtests
$(MAKE) -C canonical_tests testall
testalllaters:
$(MAKE) cleantests
$(MAKE) DEBUG=Y DYNLIB=Y buildtests
@ -80,4 +80,4 @@ ifeq ($?, 0)
else
@echo "Query Classifier FAILED" >> $(TESTLOG)
endif
@cat $(TESTLOG) >> $(TEST_MAXSCALE_LOG)
@cat $(TESTLOG) >> $(TEST_MAXSCALE_LOG)

View File

@ -40,6 +40,11 @@
#include <atomic.h>
#include <skygw_debug.h>
static buffer_object_t* gwbuf_remove_buffer_object(
GWBUF* buf,
buffer_object_t* bufobj);
/**
* Allocate a new gateway buffer structure of size bytes.
*
@ -77,13 +82,15 @@ SHARED_BUF *sbuf;
free(sbuf);
return NULL;
}
spinlock_init(&rval->gwbuf_lock);
rval->start = sbuf->data;
rval->end = rval->start + size;
sbuf->refcount = 1;
rval->sbuf = sbuf;
rval->next = NULL;
rval->gwbuf_type = GWBUF_TYPE_UNDEFINED;
rval->command = 0;
rval->gwbuf_info = GWBUF_INFO_NONE;
rval->gwbuf_bufobj = NULL;
CHK_GWBUF(rval);
return rval;
}
@ -96,12 +103,20 @@ SHARED_BUF *sbuf;
void
gwbuf_free(GWBUF *buf)
{
buffer_object_t* bo;
CHK_GWBUF(buf);
if (atomic_add(&buf->sbuf->refcount, -1) == 1)
{
free(buf->sbuf->data);
free(buf->sbuf);
}
bo = buf->gwbuf_bufobj;
while (bo != NULL)
{
bo = gwbuf_remove_buffer_object(buf, bo);
}
}
free(buf);
}
@ -130,6 +145,8 @@ GWBUF *rval;
rval->start = buf->start;
rval->end = buf->end;
rval->gwbuf_type = buf->gwbuf_type;
rval->gwbuf_info = buf->gwbuf_info;
rval->gwbuf_bufobj = buf->gwbuf_bufobj;
rval->next = NULL;
CHK_GWBUF(rval);
return rval;
@ -156,6 +173,8 @@ GWBUF *gwbuf_clone_portion(
clonebuf->start = (void *)((char*)buf->start)+start_offset;
clonebuf->end = (void *)((char *)clonebuf->start)+length;
clonebuf->gwbuf_type = buf->gwbuf_type; /*< clone the type for now */
clonebuf->gwbuf_info = buf->gwbuf_info;
clonebuf->gwbuf_bufobj = buf->gwbuf_bufobj;
clonebuf->next = NULL;
CHK_GWBUF(clonebuf);
return clonebuf;
@ -338,5 +357,87 @@ void gwbuf_set_type(
}
}
/**
* Add a buffer object to GWBUF buffer.
*
* @param buf GWBUF where object is added
* @param id Type identifier for object
* @param data Object data
* @param donefun_dp Clean-up function to be executed before buffer is freed.
*/
void gwbuf_add_buffer_object(
GWBUF* buf,
bufobj_id_t id,
void* data,
void (*donefun_fp)(void *))
{
buffer_object_t** p_b;
buffer_object_t* newb;
CHK_GWBUF(buf);
newb = (buffer_object_t *)malloc(sizeof(buffer_object_t));
newb->bo_id = id;
newb->bo_data = data;
newb->bo_donefun_fp = donefun_fp;
newb->bo_next = NULL;
/** Lock */
spinlock_acquire(&buf->gwbuf_lock);
p_b = &buf->gwbuf_bufobj;
/** Search the end of the list and add there */
while (*p_b != NULL)
{
p_b = &(*p_b)->bo_next;
}
*p_b = newb;
/** Set flag */
buf->gwbuf_info |= GWBUF_INFO_PARSED;
/** Unlock */
spinlock_release(&buf->gwbuf_lock);
}
/**
* Search buffer object which matches with the id.
*
* @param buf GWBUF to be searched
* @param id Identifier for the object
*
* @return Searched buffer object or NULL if not found
*/
void* gwbuf_get_buffer_object_data(
GWBUF* buf,
bufobj_id_t id)
{
buffer_object_t* bo;
CHK_GWBUF(buf);
/** Lock */
spinlock_acquire(&buf->gwbuf_lock);
bo = buf->gwbuf_bufobj;
while (bo != NULL && bo->bo_id != id)
{
bo = bo->bo_next;
}
/** Unlock */
spinlock_release(&buf->gwbuf_lock);
return bo->bo_data;
}
/**
* @return pointer to next buffer object or NULL
*/
static buffer_object_t* gwbuf_remove_buffer_object(
GWBUF* buf,
buffer_object_t* bufobj)
{
buffer_object_t* next;
next = bufobj->bo_next;
/** Call corresponding clean-up function to clean buffer object's data */
bufobj->bo_donefun_fp(bufobj->bo_data);
free(bufobj);
return next;
}

View File

@ -29,6 +29,7 @@
*/
#include <buffer.h>
#include <string.h>
#include <mysql_client_server_protocol.h>
/**
* Check if a GWBUF structure is a MySQL COM_QUERY packet
@ -171,3 +172,57 @@ GWBUF *addition;
return orig;
}
/**
* Copy query string from GWBUF buffer to separate memory area.
*
* @param buf GWBUF buffer including the query
*
* @return Plaint text query if the packet type is COM_QUERY. Otherwise return
* a string including the packet type.
*/
char* modutil_get_query(
GWBUF* buf)
{
uint8_t* packet;
mysql_server_cmd_t packet_type;
size_t len;
char* query_str;
packet = GWBUF_DATA(buf);
packet_type = packet[4];
switch (packet_type) {
case MYSQL_COM_QUIT:
len = strlen("[Quit msg]")+1;
if ((query_str = (char *)malloc(len+1)) == NULL)
{
goto retblock;
}
memcpy(query_str, "[Quit msg]", len);
memset(&query_str[len], 0, 1);
break;
case MYSQL_COM_QUERY:
len = MYSQL_GET_PACKET_LEN(packet)-1; /*< distract 1 for packet type byte */
if ((query_str = (char *)malloc(len+1)) == NULL)
{
goto retblock;
}
memcpy(query_str, &packet[5], len);
memset(&query_str[len], 0, 1);
break;
default:
len = strlen(STRPACKETTYPE(packet_type))+1;
if ((query_str = (char *)malloc(len+1)) == NULL)
{
goto retblock;
}
memcpy(query_str, STRPACKETTYPE(packet_type), len);
memset(&query_str[len], 0, 1);
break;
} /*< switch */
retblock:
return query_str;
}

View File

@ -675,6 +675,7 @@ int n = 0;
"Unable to find filter '%s' for service '%s'\n",
trim(ptr), service->name
)));
n--;
}
flist[n] = NULL;
ptr = strtok_r(NULL, "|", &brkt);

View File

@ -42,8 +42,11 @@
* @endverbatim
*/
#include <skygw_debug.h>
#include <spinlock.h>
EXTERN_C_BLOCK_BEGIN
typedef enum
{
GWBUF_TYPE_UNDEFINED = 0x00,
@ -73,6 +76,35 @@ typedef struct {
int refcount; /*< Reference count on the buffer */
} SHARED_BUF;
typedef enum
{
GWBUF_INFO_NONE = 0x0,
GWBUF_INFO_PARSED = 0x1
} gwbuf_info_t;
#define GWBUF_IS_PARSED(b) (b->gwbuf_info & GWBUF_INFO_PARSED)
/**
* A structure for cleaning up memory allocations of structures which are
* referred to by GWBUF and deallocated in gwbuf_free but GWBUF doesn't
* know what they are.
* All functions on the list are executed before freeing memory of GWBUF struct.
*/
typedef enum
{
GWBUF_PARSING_INFO
} bufobj_id_t;
typedef struct buffer_object_st buffer_object_t;
struct buffer_object_st {
bufobj_id_t bo_id;
void* bo_data;
void (*bo_donefun_fp)(void *);
buffer_object_t* bo_next;
};
/**
* The buffer structure used by the descriptor control blocks.
*
@ -82,11 +114,13 @@ typedef struct {
* be copied within the gateway.
*/
typedef struct gwbuf {
SPINLOCK gwbuf_lock;
struct gwbuf *next; /*< Next buffer in a linked chain of buffers */
void *start; /*< Start of the valid data */
void *end; /*< First byte after the valid data */
SHARED_BUF *sbuf; /*< The shared buffer with the real data */
int command;/*< The command type for the queue */
buffer_object_t *gwbuf_bufobj; /*< List of objects referred to by GWBUF */
gwbuf_info_t gwbuf_info; /*< Info bits */
gwbuf_type_t gwbuf_type; /*< buffer's data type information */
} GWBUF;
@ -121,4 +155,14 @@ extern unsigned int gwbuf_length(GWBUF *head);
extern GWBUF *gwbuf_clone_portion(GWBUF *head, size_t offset, size_t len);
extern GWBUF *gwbuf_clone_transform(GWBUF *head, gwbuf_type_t type);
extern void gwbuf_set_type(GWBUF *head, gwbuf_type_t type);
void gwbuf_add_buffer_object(GWBUF* buf,
bufobj_id_t id,
void* data,
void (*donefun_fp)(void *));
void* gwbuf_get_buffer_object_data(GWBUF* buf, bufobj_id_t id);
EXTERN_C_BLOCK_END
#endif

View File

@ -36,4 +36,6 @@ extern int modutil_is_SQL(GWBUF *);
extern int modutil_extract_SQL(GWBUF *, char **, int *);
extern int modutil_MySQL_Query(GWBUF *, char **, int *, int *);
extern GWBUF *modutil_replace_SQL(GWBUF *, char *);
char* modutil_get_query(GWBUF* buf);
#endif

View File

@ -31,6 +31,7 @@
#include <dcb.h>
#include <spinlock.h>
#include <modinfo.h>
#include <modutil.h>
#include <mysql_client_server_protocol.h>
MODULE_INFO info = {
@ -703,7 +704,7 @@ static void* newSession(
backend_ref[i].bref_sescmd_cur.scmd_cur_ptr_property =
&client_rses->rses_properties[RSES_PROP_TYPE_SESCMD];
backend_ref[i].bref_sescmd_cur.scmd_cur_cmd = NULL;
}
}
max_nslaves = rses_get_max_slavecount(client_rses, router_nservers);
max_slave_rlag = rses_get_max_replication_lag(client_rses);
@ -1031,9 +1032,6 @@ static int routeQuery(
GWBUF* querybuf)
{
skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN;
GWBUF* plainsqlbuf = NULL;
char* querystr = NULL;
char* startpos;
mysql_server_cmd_t packet_type;
uint8_t* packet;
int ret = 0;
@ -1042,8 +1040,6 @@ static int routeQuery(
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
bool rses_is_closed = false;
size_t len;
MYSQL* mysql = NULL;
CHK_CLIENT_RSES(router_cli_ses);
@ -1066,6 +1062,8 @@ static int routeQuery(
*/
if (packet_type != MYSQL_COM_QUIT)
{
char* query_str = modutil_get_query(querybuf);
LOGIF(LE,
(skygw_log_write_flush(
LOGFILE_ERROR,
@ -1073,15 +1071,15 @@ static int routeQuery(
"backend server. %s.",
STRPACKETTYPE(packet_type),
STRQTYPE(qtype),
(querystr == NULL ? "(empty)" : querystr),
(query_str == NULL ? "(empty)" : query_str),
(rses_is_closed ? "Router was closed" :
"Router has no backend servers where to "
"route to"))));
free(querybuf);
}
goto return_ret;
}
inst->stats.n_queries++;
startpos = (char *)&packet[5];
master_dcb = router_cli_ses->rses_master_ref->bref_dcb;
CHK_DCB(master_dcb);
@ -1105,44 +1103,16 @@ static int routeQuery(
break;
case MYSQL_COM_QUERY:
plainsqlbuf = gwbuf_clone_transform(querybuf,
GWBUF_TYPE_PLAINSQL);
len = GWBUF_LENGTH(plainsqlbuf);
/** unnecessary if buffer includes additional terminating null */
querystr = (char *)malloc(len+1);
memcpy(querystr, startpos, len);
memset(&querystr[len], 0, 1);
/**
* Use mysql handle to query information from parse tree.
* call skygw_query_classifier_free before exit!
*/
qtype = skygw_query_classifier_get_type(querystr, 0, &mysql);
qtype = query_classifier_get_type(querybuf);
break;
case MYSQL_COM_STMT_PREPARE:
plainsqlbuf = gwbuf_clone_transform(querybuf,
GWBUF_TYPE_PLAINSQL);
len = GWBUF_LENGTH(plainsqlbuf);
/** unnecessary if buffer includes additional terminating null */
querystr = (char *)malloc(len+1);
memcpy(querystr, startpos, len);
memset(&querystr[len], 0, 1);
qtype = skygw_query_classifier_get_type(querystr, 0, &mysql);
qtype = query_classifier_get_type(querybuf);
qtype |= QUERY_TYPE_PREPARE_STMT;
break;
case MYSQL_COM_STMT_EXECUTE:
/** Parsing is not needed for this type of packet */
#if defined(NOT_USED)
plainsqlbuf = gwbuf_clone_transform(querybuf,
GWBUF_TYPE_PLAINSQL);
len = GWBUF_LENGTH(plainsqlbuf);
/** unnecessary if buffer includes additional terminating null */
querystr = (char *)malloc(len+1);
memcpy(querystr, startpos, len);
memset(&querystr[len], 0, 1);
qtype = skygw_query_classifier_get_type(querystr, 0, &mysql);
#endif
qtype = QUERY_TYPE_EXEC_STMT;
break;
@ -1207,7 +1177,7 @@ static int routeQuery(
*/
bool succp = route_session_write(
router_cli_ses,
querybuf,
gwbuf_clone(querybuf),
inst,
packet_type,
qtype);
@ -1238,7 +1208,7 @@ static int routeQuery(
if (succp)
{
if ((ret = slave_dcb->func.write(slave_dcb, querybuf)) == 1)
if ((ret = slave_dcb->func.write(slave_dcb, gwbuf_clone(querybuf))) == 1)
{
backend_ref_t* bref;
@ -1252,10 +1222,12 @@ static int routeQuery(
}
else
{
char* query_str = modutil_get_query(querybuf);
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Routing query \"%s\" failed.",
querystr)));
(query_str == NULL ? "not available" : query_str))));
free(query_str);
}
}
rses_end_locked_router_action(router_cli_ses);
@ -1307,7 +1279,7 @@ static int routeQuery(
if (succp)
{
if ((ret = master_dcb->func.write(master_dcb, querybuf)) == 1)
if ((ret = master_dcb->func.write(master_dcb, gwbuf_clone(querybuf))) == 1)
{
backend_ref_t* bref;
@ -1333,18 +1305,23 @@ static int routeQuery(
}
}
return_ret:
if (plainsqlbuf != NULL)
#if defined(SS_DEBUG)
{
gwbuf_free(plainsqlbuf);
}
if (querystr != NULL)
{
free(querystr);
}
if (mysql != NULL)
{
skygw_query_classifier_free(mysql);
char* canonical_query_str;
canonical_query_str = skygw_get_canonical(querybuf);
if (canonical_query_str != NULL)
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Canonical version: %s",
canonical_query_str)));
free(canonical_query_str);
}
}
#endif
gwbuf_free(querybuf);
return ret;
}
@ -3663,4 +3640,3 @@ static BACKEND *get_root_master(backend_ref_t *servers, int router_nservers) {
}
return master_host;
}

View File

@ -3,6 +3,7 @@ include ../makefile.inc
CC = gcc
CPP = g++
UTILS_PATH := $(ROOT_PATH)/utils
makeall: clean all
@ -13,7 +14,7 @@ clean:
- $(DEL) *~
all:
$(CPP) -c $(CFLAGS) \
$(CPP) -c $(CFLAGS) -I$(UTILS_PATH) \
-fPIC skygw_utils.cc -o skygw_utils.o
cleantests:

View File

@ -123,7 +123,8 @@ typedef enum skygw_chk_t {
CHK_NUM_SESCMD_CUR,
CHK_NUM_BACKEND,
CHK_NUM_BACKEND_REF,
CHK_NUM_PREP_STMT
CHK_NUM_PREP_STMT,
CHK_NUM_PINFO
} skygw_chk_t;
# define STRBOOL(b) ((b) ? "true" : "false")
@ -486,6 +487,13 @@ typedef enum skygw_chk_t {
"Prepared statement struct has invalid check fields"); \
}
#define CHK_PARSING_INFO(p) { \
ss_info_dassert((p)->pi_chk_top == CHK_NUM_PINFO && \
(p)->pi_chk_tail == CHK_NUM_PINFO, \
"Parsing info struct has invalid check fields"); \
}
#if defined(SS_DEBUG)
bool conn_open[10240];

View File

@ -44,4 +44,6 @@
# endif
#endif
#define MAX_ERROR_MSG PATH_MAX
#endif /* SKYGW_TYPES_H */

View File

@ -23,9 +23,10 @@
#include <errno.h>
#include <string.h>
#include <time.h>
#include <stddef.h>
#include <regex.h>
#include "skygw_debug.h"
#include "skygw_types.h"
#include <skygw_types.h>
#include "skygw_utils.h"
const char* timestamp_formatstr = "%04d %02d/%02d %02d:%02d:%02d ";
@ -1863,3 +1864,100 @@ void skygw_file_done(
free(file);
}
}
/**
* Find the given needle - user-provided literal - and replace it with
* replacement string. Separate user-provided literals from matching table names
* etc. by searching only substrings preceded by non-letter and non-number.
*
* @param haystack Plain text query string, not to be freed
* @param needle Substring to be searched, not to be freed
* @param replacement Replacement text, not to be freed
*
* @return newly allocated string where needle is replaced
*/
char* replace_literal(
char* haystack,
const char* needle,
const char* replacement)
{
const char* prefix = "[ ='\",\\(]"; /*< ' ','=','(',''',''"',',' are allowed before needle */
const char* suffix = "[^[:alnum:]]"; /*< alpha-num chars aren't allowed after the needle */
char* search_re;
char* newstr;
regex_t re;
regmatch_t match;
int rc;
size_t rlen = strlen(replacement);
size_t nlen = strlen(needle);
size_t hlen = strlen(haystack);
search_re = (char *)malloc(strlen(prefix)+nlen+strlen(suffix)+1);
if (search_re == NULL)
{
fprintf(stderr, "Regex memory allocation failed : %s\n",
strerror(errno));
newstr = haystack;
goto retblock;
}
sprintf(search_re, "%s%s%s", prefix, needle, suffix);
/** Allocate memory for new string +1 for terminating byte */
newstr = (char *)malloc(hlen-nlen+rlen+1);
if (newstr == NULL)
{
fprintf(stderr, "Regex memory allocation failed : %s\n",
strerror(errno));
free(search_re);
newstr = haystack;
goto retblock;
}
rc = regcomp(&re, search_re, REG_EXTENDED|REG_ICASE);
ss_dassert(rc == 0);
if (rc != 0)
{
char error_message[MAX_ERROR_MSG];
regerror (rc, &re, error_message, MAX_ERROR_MSG);
fprintf(stderr,
"Regex error compiling '%s': %s\n",
search_re,
error_message);
free(search_re);
newstr = haystack;
goto retblock;
}
rc = regexec(&re, haystack, 1, &match, 0);
if (rc != 0)
{
free(search_re);
regfree(&re);
newstr = haystack;
goto retblock;
}
memcpy(newstr, haystack, match.rm_so+1);
memcpy(newstr+match.rm_so+1, replacement, rlen);
/** +1 is terminating byte */
memcpy(newstr+match.rm_so+1+rlen, haystack+match.rm_so+1+nlen, hlen-(match.rm_so+1)-nlen+1);
regfree(&re);
free(haystack);
retblock:
return newstr;
}

View File

@ -192,4 +192,12 @@ int skygw_rwlock_init(skygw_rwlock_t** rwlock);
int atomic_add(int *variable, int value);
EXTERN_C_BLOCK_BEGIN
char* replace_literal(char* haystack,
const char* needle,
const char* replacement);
EXTERN_C_BLOCK_END
#endif /* SKYGW_UTILS_H */