Merge branch 'develop' into MAX-324

Conflicts:
	query_classifier/query_classifier.cc
This commit is contained in:
Markus Makela
2015-01-14 04:56:58 +02:00
27 changed files with 2086 additions and 99 deletions

View File

@ -31,7 +31,7 @@
#include <buffer.h>
#include <string.h>
#include <mysql_client_server_protocol.h>
#include <modutil.h>
/**
* Check if a GWBUF structure is a MySQL COM_QUERY packet
*
@ -493,3 +493,39 @@ GWBUF* modutil_get_next_MySQL_packet(
return_packetbuf:
return packetbuf;
}
/**
* Count the number of EOF, OK or ERR packets in the buffer.
* @param reply Buffer to use
* @param use_ok Whether the DEPRECATE_EOF flag is set
* @param n_found If there were previous packets found
* @return Number of EOF packets
*/
int
modutil_count_signal_packets(GWBUF *reply,int use_ok, int n_found)
{
unsigned char* ptr = (unsigned char*) reply->start;
unsigned char* end = (unsigned char*) reply->end;
int pktlen,pkt = 0;
while(ptr < end)
{
pktlen = gw_mysql_get_byte3(ptr) + 4;
if(PTR_IS_ERR(ptr) || (PTR_IS_EOF(ptr) && !use_ok) || (use_ok && PTR_IS_OK(ptr)))
{
if(n_found)
{
if(ptr + pktlen >= end)
pkt++;
}
else
{
pkt++;
}
}
ptr += pktlen;
}
return pkt;
}

View File

@ -34,6 +34,12 @@
#include <buffer.h>
#include <dcb.h>
#define PTR_IS_RESULTSET(b) (b[0] == 0x01 && b[1] == 0x0 && b[2] == 0x0 && b[3] == 0x01)
#define PTR_IS_EOF(b) (b[0] == 0x05 && b[1] == 0x0 && b[2] == 0x0 && b[4] == 0xfe)
#define PTR_IS_OK(b) (b[4] == 0x00)
#define PTR_IS_ERR(b) (b[4] == 0xff)
#define PTR_IS_LOCAL_INFILE(b) (b[4] == 0xfb)
extern int modutil_is_SQL(GWBUF *);
extern int modutil_extract_SQL(GWBUF *, char **, int *);
extern int modutil_MySQL_Query(GWBUF *, char **, int *, int *);
@ -44,7 +50,6 @@ extern int modutil_send_mysql_err_packet(DCB *, int, int, int, const char *, con
GWBUF* modutil_get_next_MySQL_packet(GWBUF** p_readbuf);
int modutil_MySQL_query_len(GWBUF* buf, int* nbytes_missing);
GWBUF *modutil_create_mysql_err_msg(
int packet_number,
int affected_rows,
@ -52,4 +57,5 @@ GWBUF *modutil_create_mysql_err_msg(
const char *statemsg,
const char *msg);
int modutil_count_signal_packets(GWBUF*,int,int);
#endif

View File

@ -24,8 +24,13 @@ add_library(topfilter SHARED topfilter.c)
target_link_libraries(topfilter log_manager utils)
install(TARGETS topfilter DESTINATION modules)
add_library(fwfilter SHARED fwfilter.c)
target_link_libraries(fwfilter log_manager utils query_classifier)
install(TARGETS fwfilter DESTINATION modules)
add_subdirectory(hint)
if(BUILD_TESTS)
add_subdirectory(test)
endif()
endif()

File diff suppressed because it is too large Load Diff

View File

@ -16,6 +16,9 @@
* Copyright MariaDB Corporation Ab 2014
*/
#include "spinlock.h"
/**
* @file tee.c A filter that splits the processing pipeline in two
* @verbatim
@ -78,9 +81,6 @@
#define PARENT 0
#define CHILD 1
#define PTR_IS_RESULTSET(b) (b[0] == 0x01 && b[1] == 0x0 && b[2] == 0x0 && b[3] == 0x01)
#define PTR_IS_EOF(b) (b[4] == 0xfe)
static unsigned char required_packets[] = {
MYSQL_COM_QUIT,
MYSQL_COM_INITDB,
@ -160,6 +160,9 @@ typedef struct {
FILTER_DEF* dummy_filterdef;
int active; /* filter is active? */
bool use_ok;
bool multipacket;
unsigned char command;
bool waiting[2]; /* if the client is waiting for a reply */
int eof[2];
int replies[2]; /* Number of queries received */
@ -220,6 +223,8 @@ orphan_free(void* data)
{
if(ptr->session->state == SESSION_STATE_TO_BE_FREED)
{
if(ptr == allOrphans)
{
tmp = ptr;
@ -236,6 +241,17 @@ orphan_free(void* data)
tmp = ptr;
}
}
}
/*
* The session has been unlinked from all the DCBs and it is ready to be freed.
*/
if(ptr->session->state == SESSION_STATE_STOPPING &&
ptr->session->refcount == 0 && ptr->session->client == NULL)
{
ptr->session->state = SESSION_STATE_TO_BE_FREED;
}
#ifdef SS_DEBUG
else if(ptr->session->state == SESSION_STATE_STOPPING)
@ -577,6 +593,9 @@ char *remote, *userName;
my_session->branch_session = ses;
my_session->branch_dcb = dcb;
my_session->dummy_filterdef = dummy;
MySQLProtocol* protocol = (MySQLProtocol*)session->client->protocol;
my_session->use_ok = protocol->client_capabilities & (1 << 6);
free(dummy_upstream);
}
}
@ -640,15 +659,16 @@ freeSession(FILTER *instance, void *session)
{
TEE_SESSION *my_session = (TEE_SESSION *)session;
SESSION* ses = my_session->branch_session;
session_state_t state;
if (ses != NULL)
{
if (ses->state == SESSION_STATE_ROUTER_READY)
state = ses->state;
if (state == SESSION_STATE_ROUTER_READY)
{
session_free(ses);
}
if (ses->state == SESSION_STATE_TO_BE_FREED)
else if (state == SESSION_STATE_TO_BE_FREED)
{
/** Free branch router session */
ses->service->router->freeSession(
@ -660,7 +680,7 @@ SESSION* ses = my_session->branch_session;
/** This indicates that branch session is not available anymore */
my_session->branch_session = NULL;
}
else if(ses->state == SESSION_STATE_STOPPING)
else if(state == SESSION_STATE_STOPPING)
{
orphan_session_t* orphan;
if((orphan = malloc(sizeof(orphan_session_t))) == NULL)
@ -675,17 +695,14 @@ SESSION* ses = my_session->branch_session;
allOrphans = orphan;
spinlock_release(&orphanLock);
}
if(ses->refcount == 0)
{
ss_dassert(ses->refcount == 0 && ses->client == NULL);
ses->state = SESSION_STATE_TO_BE_FREED;
}
}
}
if (my_session->dummy_filterdef)
{
filter_free(my_session->dummy_filterdef);
}
if(my_session->tee_replybuf)
gwbuf_free(my_session->tee_replybuf);
free(session);
return;
@ -746,7 +763,7 @@ TEE_SESSION *my_session = (TEE_SESSION *)session;
char *ptr;
int length, rval, residual = 0;
GWBUF *clone = NULL;
unsigned char command = *((unsigned char*)queue->start + 4);
if (my_session->branch_session &&
my_session->branch_session->state == SESSION_STATE_ROUTER_READY)
{
@ -787,10 +804,24 @@ GWBUF *clone = NULL;
ss_dassert(my_session->tee_replybuf == NULL);
switch(command)
{
case 0x03:
case 0x16:
case 0x17:
case 0x04:
case 0x0a:
my_session->multipacket = true;
break;
default:
my_session->multipacket = false;
break;
}
memset(my_session->replies,0,2*sizeof(int));
memset(my_session->eof,0,2*sizeof(int));
memset(my_session->waiting,0,2*sizeof(bool));
memset(my_session->waiting,1,2*sizeof(bool));
my_session->command = command;
rval = my_session->down.routeQuery(my_session->down.instance,
my_session->down.session,
queue);
@ -827,39 +858,6 @@ GWBUF *clone = NULL;
return rval;
}
/**
* Scans the GWBUF for EOF packets. If two packets for this session have been found
* from either the parent or the child branch, mark the response set from that branch as over.
* @param session The Tee filter session
* @param branch Parent or child branch
* @param reply Buffer to scan
*/
void
scan_resultset(TEE_SESSION *session, int branch, GWBUF *reply)
{
unsigned char* ptr = (unsigned char*) reply->start;
unsigned char* end = (unsigned char*) reply->end;
int pktlen = 0;
while(ptr < end)
{
pktlen = gw_mysql_get_byte3(ptr) + 4;
if(PTR_IS_EOF(ptr))
{
session->eof[branch]++;
if(session->eof[branch] == 2)
{
session->waiting[branch] = false;
session->eof[branch] = 0;
return;
}
}
ptr += pktlen;
}
}
/**
* The clientReply entry point. This is passed the response buffer
* to which the filter should be applied. Once processed the
@ -873,28 +871,49 @@ scan_resultset(TEE_SESSION *session, int branch, GWBUF *reply)
static int
clientReply (FILTER* instance, void *session, GWBUF *reply)
{
int rc, branch;
int rc, branch, eof;
TEE_SESSION *my_session = (TEE_SESSION *) session;
spinlock_acquire(&my_session->tee_lock);
ss_dassert(my_session->active);
branch = instance == NULL ? CHILD : PARENT;
unsigned char *ptr = (unsigned char*)reply->start;
if(my_session->replies[branch] == 0)
{
if(PTR_IS_RESULTSET(ptr))
/* Reply is in a single packet if it is an OK, ERR or LOCAL_INFILE packet.
* Otherwise the reply is a result set and the amount of packets is unknown.
*/
if(PTR_IS_ERR(ptr) || PTR_IS_LOCAL_INFILE(ptr) ||
PTR_IS_OK(ptr) || !my_session->multipacket )
{
my_session->waiting[branch] = true;
my_session->eof[branch] = 0;
my_session->waiting[branch] = false;
}
#ifdef SS_DEBUG
else
{
ss_dassert(PTR_IS_RESULTSET(ptr));
skygw_log_write_flush(LOGFILE_DEBUG,"tee.c: Waiting for a result set from %s session.",branch == PARENT?"parent":"child");
}
ss_dassert(PTR_IS_ERR(ptr) || PTR_IS_LOCAL_INFILE(ptr)||
PTR_IS_OK(ptr) || my_session->waiting[branch] ||
!my_session->multipacket);
#endif
}
if(my_session->waiting[branch])
{
scan_resultset(my_session,branch,reply);
eof = modutil_count_signal_packets(reply,my_session->use_ok,my_session->eof[branch] > 0);
my_session->eof[branch] += eof;
if(my_session->eof[branch] >= 2 ||
(my_session->command == 0x04 && my_session->eof[branch] > 0))
{
ss_dassert(my_session->eof[branch] < 3)
my_session->waiting[branch] = false;
}
}
if(branch == PARENT)
@ -913,7 +932,19 @@ clientReply (FILTER* instance, void *session, GWBUF *reply)
(my_session->branch_session == NULL ||
my_session->waiting[PARENT] ||
(!my_session->waiting[CHILD] && !my_session->waiting[PARENT])))
{
{
#ifdef SS_DEBUG
skygw_log_write_flush(LOGFILE_DEBUG, "tee.c: Routing buffer '%p' parent(waiting [%s] replies [%d] eof[%d])"
" child(waiting [%s] replies[%d] eof [%d])",
my_session->tee_replybuf,
my_session->waiting[PARENT] ? "true":"false",
my_session->replies[PARENT],
my_session->eof[PARENT],
my_session->waiting[CHILD]?"true":"false",
my_session->replies[CHILD],
my_session->eof[CHILD]);
#endif
rc = my_session->up.clientReply (
my_session->up.instance,
my_session->up.session,

View File

@ -11,9 +11,17 @@ add_executable(harness harness_util.c harness_common.c ${CORE})
target_link_libraries(harness_ui fullcore log_manager utils)
target_link_libraries(harness fullcore)
execute_process(COMMAND ${CMAKE_COMMAND} -E copy ${ERRMSG} ${CMAKE_CURRENT_BINARY_DIR})
add_test(TestHintfilter /bin/sh -c "MAXSCALE_HOME=\"${CMAKE_BINARY_DIR}\" ${CMAKE_CURRENT_BINARY_DIR}/harness -i ${CMAKE_CURRENT_SOURCE_DIR}/hint_testing.input -o ${CMAKE_CURRENT_BINARY_DIR}/hint_testing.output -c ${CMAKE_CURRENT_SOURCE_DIR}/hint_testing.cnf -t 1 -s 1 -e ${CMAKE_CURRENT_SOURCE_DIR}/hint_testing.expected")
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/testdriver.sh ${CMAKE_CURRENT_BINARY_DIR}/testdriver.sh @ONLY)
add_test(TestRegexfilter /bin/sh -c "MAXSCALE_HOME=\"${CMAKE_BINARY_DIR}\" ${CMAKE_CURRENT_BINARY_DIR}/harness -i ${CMAKE_CURRENT_SOURCE_DIR}/regextest.input -o ${CMAKE_CURRENT_BINARY_DIR}/regextest.output -c ${CMAKE_CURRENT_SOURCE_DIR}/regextest.cnf -t 1 -s 1 -e ${CMAKE_CURRENT_SOURCE_DIR}/regextest.expected")
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/hintfilter/hint_testing.cnf ${CMAKE_CURRENT_BINARY_DIR}/hintfilter/hint_testing.cnf)
add_test(TestHintfilter testdriver.sh hintfilter/hint_testing.cnf hintfilter/hint_testing.input hintfilter/hint_testing.output hintfilter/hint_testing.expected)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/regexfilter/regextest.cnf ${CMAKE_CURRENT_BINARY_DIR}/regexfilter/regextest.cnf)
add_test(TestRegexfilter testdriver.sh regexfilter/regextest.cnf regexfilter/regextest.input regexfilter/regextest.output regexfilter/regextest.expected)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fwfilter/fwtest.cnf.in ${CMAKE_CURRENT_BINARY_DIR}/fwfilter/fwtest.cnf)
add_test(TestFwfilter1 testdriver.sh fwfilter/fwtest.cnf fwfilter/fwtest.input fwfilter/fwtest.output fwfilter/fwtest.expected)
add_test(TestFwfilter2 testdriver.sh fwfilter/fwtest.cnf fwfilter/fwtest2.input fwfilter/fwtest2.output fwfilter/fwtest2.expected)
add_test(TestTeeRecursion ${CMAKE_CURRENT_SOURCE_DIR}/tee_recursion.sh
${CMAKE_BINARY_DIR}
@ -22,3 +30,7 @@ add_test(TestTeeRecursion ${CMAKE_CURRENT_SOURCE_DIR}/tee_recursion.sh
${TEST_PASSWORD}
${TEST_HOST}
${TEST_PORT})
set_tests_properties(TestHintfilter TestRegexfilter TestFwfilter1 TestFwfilter2 TestTeeRecursion
PROPERTIES
ENVIRONMENT MAXSCALE_HOME=${CMAKE_BINARY_DIR}/)

View File

@ -0,0 +1,4 @@
[Firewall]
type=filter
module=fwfilter
rules=@CMAKE_CURRENT_SOURCE_DIR@/rules

View File

@ -0,0 +1,8 @@
select id from t1;
select id from t1;
select id from t1;
select id from t1;
select id from t1;
select id from t1;
select id from t1;
select id from t1;

View File

@ -0,0 +1,10 @@
delete from t1;
select id from t1;
select id from t1;
select id from t1;
delete from t1;
select id from t1;
select id from t1;
select id from t1;
select id from t1;
select id from t1;

View File

@ -0,0 +1,8 @@
select id from t1;
select id from t1;
select id from t1;
select id from t1;
select id from t1;
select id from t1;
select id from t1;
select id from t1;

View File

@ -0,0 +1,8 @@
select id from t1;
select id from t1;
select id from t1;
select id from t1;
select id from t1;
select id from t1;
select id from t1;
select id from t1;

View File

@ -0,0 +1,10 @@
select id from t1;
select id from t1 union select name from t2;
select id from t1;
select id from t1;
select id from t1;
select id from t1;
select id from t1;
select id from t1 union select name from t2;
select id from t1;
select id from t1;

View File

@ -10,7 +10,7 @@ int dcbfun(struct dcb* dcb, GWBUF * buffer)
int harness_init(int argc, char** argv, HARNESS_INSTANCE** inst){
int i = 0;
int i = 0,rval = 0;
MYSQL_session* mysqlsess;
DCB* dcb;
char cwd[1024];
@ -60,7 +60,7 @@ int harness_init(int argc, char** argv, HARNESS_INSTANCE** inst){
skygw_logmanager_init( 3, optstr);
free(optstr);
process_opts(argc,argv);
rval = process_opts(argc,argv);
if(!(instance.thrpool = malloc(instance.thrcount * sizeof(pthread_t)))){
printf("Error: Out of memory\n");
@ -72,10 +72,10 @@ int harness_init(int argc, char** argv, HARNESS_INSTANCE** inst){
pthread_mutex_lock(&instance.work_mtx);
size_t thr_num = 1;
for(i = 0;i<instance.thrcount;i++){
pthread_create(&instance.thrpool[i],NULL,(void*)work_buffer,(void*)thr_num++);
rval |= pthread_create(&instance.thrpool[i],NULL,(void*)work_buffer,(void*)thr_num++);
}
return 0;
return rval;
}
void free_filters()
@ -543,10 +543,14 @@ int load_config( char* fname)
{
CONFIG* iter;
CONFIG_ITEM* item;
int config_ok = 1;
int config_ok = 1,inirval;
free_filters();
if(ini_parse(fname,handler,instance.conf) < 0){
if((inirval = ini_parse(fname,handler,instance.conf)) < 0){
printf("Error parsing configuration file!\n");
if(inirval == -1)
printf("Inih file open error.\n");
else if(inirval == -2)
printf("inih memory error.\n");
skygw_log_write(LOGFILE_ERROR,"Error parsing configuration file!\n");
config_ok = 0;
goto cleanup;
@ -991,7 +995,7 @@ GWBUF* gen_packet(PACKET pkt)
int process_opts(int argc, char** argv)
{
int fd, buffsize = 1024;
int rd,rdsz, rval = 0;
int rd,rdsz, rval = 0,error;
size_t fsize;
char *buff = calloc(buffsize,sizeof(char)), *tok = NULL;
@ -1071,6 +1075,7 @@ int process_opts(int argc, char** argv)
free(conf_name);
}
conf_name = strdup(optarg);
printf("Configuration: %s\n",optarg);
break;
case 'q':
@ -1079,12 +1084,12 @@ int process_opts(int argc, char** argv)
case 's':
instance.session_count = atoi(optarg);
printf("Sessions: %i ",instance.session_count);
printf("Sessions: %i\n",instance.session_count);
break;
case 't':
instance.thrcount = atoi(optarg);
printf("Threads: %i ",instance.thrcount);
printf("Threads: %i\n",instance.thrcount);
break;
case 'd':
@ -1121,7 +1126,7 @@ int process_opts(int argc, char** argv)
}
printf("\n");
if(conf_name && load_config(conf_name)){
if(conf_name && (error = load_config(conf_name))){
load_query();
}else{
instance.running = 0;
@ -1129,6 +1134,11 @@ int process_opts(int argc, char** argv)
free(conf_name);
close(fd);
if(!error)
{
rval = 1;
}
return rval;
}

View File

@ -41,7 +41,8 @@ int main(int argc,char** argv)
}
route_buffers();
if(inst->expected){
if(inst->expected > 0){
return compare_files(inst->outfile,inst->expected);
}
return 0;

View File

@ -0,0 +1,4 @@
rule union_regex deny regex '.*union.*'
rule dont_delete_everything deny no_where_clause on_operations delete|update
rule no_wildcard deny wildcard
users %@% match any rules union_regex dont_delete_everything no_wildcard

View File

@ -0,0 +1,11 @@
#! /bin/bash
if [[ $# -lt 4 ]]
then
echo "Usage: $0 <config file> <input> <output> <expected>"
exit 1
fi
TESTDIR=@CMAKE_CURRENT_BINARY_DIR@
SRCDIR=@CMAKE_CURRENT_SOURCE_DIR@
$TESTDIR/harness -i $SRCDIR/$2 -o $TESTDIR/$3 -c $TESTDIR/$1 -t 1 -s 1 -e $SRCDIR/$4
exit $?

View File

@ -1494,12 +1494,17 @@ retblock:
* and lengths have been noticed and counted.
* Session commands need to be marked so that they can be handled properly in
* the router's clientReply.
* Return the pointer to outbuf.
*
* @param dcb Backend's DCB where data was read from
* @param readbuf GWBUF where data was read to
* @param nbytes_to_process Number of bytes that has been read and need to be processed
*
* @return GWBUF which includes complete MySQL packet
*/
static GWBUF* process_response_data (
DCB* dcb,
GWBUF* readbuf,
int nbytes_to_process) /*< number of new bytes read */
int nbytes_to_process)
{
int npackets_left = 0; /*< response's packet count */
ssize_t nbytes_left = 0; /*< nbytes to be read for the packet */