Merge branch 'develop' into firewall
Conflicts: server/modules/filter/test/CMakeLists.txt
This commit is contained in:
@ -213,7 +213,7 @@ HINT_MODE mode = HM_EXECUTE;
|
||||
/*
|
||||
* If we have got here then we have a comment, ptr point to
|
||||
* the comment character if it is a '#' comment or the second
|
||||
* character of the comment if it is a -- or /* comment
|
||||
* character of the comment if it is a -- or \/\* comment
|
||||
*
|
||||
* Move to the next character in the SQL.
|
||||
*/
|
||||
|
@ -58,6 +58,10 @@
|
||||
#include <service.h>
|
||||
#include <router.h>
|
||||
#include <dcb.h>
|
||||
#include <sys/time.h>
|
||||
#include <poll.h>
|
||||
#include <mysql_client_server_protocol.h>
|
||||
#include <housekeeper.h>
|
||||
|
||||
#define MYSQL_COM_QUIT 0x01
|
||||
#define MYSQL_COM_INITDB 0x02
|
||||
@ -69,6 +73,13 @@
|
||||
#define MYSQL_COM_STMT_CLOSE 0x19
|
||||
#define MYSQL_COM_STMT_RESET 0x1a
|
||||
|
||||
#define REPLY_TIMEOUT_SECOND 5
|
||||
#define REPLY_TIMEOUT_MILLISECOND 1
|
||||
#define PARENT 0
|
||||
#define CHILD 1
|
||||
|
||||
#define PTR_IS_RESULTSET(b) (b[0] == 0x01 && b[1] == 0x0 && b[2] == 0x0 && b[3] == 0x01)
|
||||
#define PTR_IS_EOF(b) (b[4] == 0xfe)
|
||||
|
||||
static unsigned char required_packets[] = {
|
||||
MYSQL_COM_QUIT,
|
||||
@ -104,19 +115,20 @@ static void *newSession(FILTER *instance, SESSION *session);
|
||||
static void closeSession(FILTER *instance, void *session);
|
||||
static void freeSession(FILTER *instance, void *session);
|
||||
static void setDownstream(FILTER *instance, void *fsession, DOWNSTREAM *downstream);
|
||||
static void setUpstream(FILTER *instance, void *fsession, UPSTREAM *upstream);
|
||||
static int routeQuery(FILTER *instance, void *fsession, GWBUF *queue);
|
||||
static int clientReply(FILTER *instance, void *fsession, GWBUF *queue);
|
||||
static void diagnostic(FILTER *instance, void *fsession, DCB *dcb);
|
||||
|
||||
|
||||
static FILTER_OBJECT MyObject = {
|
||||
createInstance,
|
||||
newSession,
|
||||
closeSession,
|
||||
freeSession,
|
||||
setDownstream,
|
||||
NULL, // No Upstream requirement
|
||||
setUpstream,
|
||||
routeQuery,
|
||||
NULL, // No client reply
|
||||
clientReply,
|
||||
diagnostic,
|
||||
};
|
||||
|
||||
@ -144,15 +156,135 @@ typedef struct {
|
||||
*/
|
||||
typedef struct {
|
||||
DOWNSTREAM down; /* The downstream filter */
|
||||
UPSTREAM up; /* The upstream filter */
|
||||
|
||||
FILTER_DEF* dummy_filterdef;
|
||||
int active; /* filter is active? */
|
||||
bool waiting[2]; /* if the client is waiting for a reply */
|
||||
int eof[2];
|
||||
int replies[2]; /* Number of queries received */
|
||||
DCB *branch_dcb; /* Client DCB for "branch" service */
|
||||
SESSION *branch_session;/* The branch service session */
|
||||
int n_duped; /* Number of duplicated queries */
|
||||
int n_rejected; /* Number of rejected queries */
|
||||
int residual; /* Any outstanding SQL text */
|
||||
GWBUF* tee_replybuf; /* Buffer for reply */
|
||||
SPINLOCK tee_lock;
|
||||
} TEE_SESSION;
|
||||
|
||||
typedef struct orphan_session_tt
|
||||
{
|
||||
SESSION* session;
|
||||
struct orphan_session_tt* next;
|
||||
}orphan_session_t;
|
||||
|
||||
static orphan_session_t* allOrphans = NULL;
|
||||
|
||||
static SPINLOCK orphanLock;
|
||||
static int packet_is_required(GWBUF *queue);
|
||||
static int detect_loops(TEE_INSTANCE *instance, HASHTABLE* ht, SERVICE* session);
|
||||
|
||||
static int hkfn(
|
||||
void* key)
|
||||
{
|
||||
if(key == NULL){
|
||||
return 0;
|
||||
}
|
||||
unsigned int hash = 0,c = 0;
|
||||
char* ptr = (char*)key;
|
||||
while((c = *ptr++)){
|
||||
hash = c + (hash << 6) + (hash << 16) - hash;
|
||||
}
|
||||
return *(int *)key;
|
||||
}
|
||||
|
||||
static int hcfn(
|
||||
void* v1,
|
||||
void* v2)
|
||||
{
|
||||
char* i1 = (char*) v1;
|
||||
char* i2 = (char*) v2;
|
||||
|
||||
return strcmp(i1,i2);
|
||||
}
|
||||
|
||||
static void
|
||||
orphan_free(void* data)
|
||||
{
|
||||
spinlock_acquire(&orphanLock);
|
||||
orphan_session_t *ptr = allOrphans, *finished = NULL, *tmp = NULL;
|
||||
#ifdef SS_DEBUG
|
||||
int o_stopping = 0, o_ready = 0, o_freed = 0;
|
||||
#endif
|
||||
while(ptr)
|
||||
{
|
||||
if(ptr->session->state == SESSION_STATE_TO_BE_FREED)
|
||||
{
|
||||
if(ptr == allOrphans)
|
||||
{
|
||||
tmp = ptr;
|
||||
allOrphans = ptr->next;
|
||||
}
|
||||
else
|
||||
{
|
||||
tmp = allOrphans;
|
||||
while(tmp && tmp->next != ptr)
|
||||
tmp = tmp->next;
|
||||
if(tmp)
|
||||
{
|
||||
tmp->next = ptr->next;
|
||||
tmp = ptr;
|
||||
}
|
||||
}
|
||||
}
|
||||
#ifdef SS_DEBUG
|
||||
else if(ptr->session->state == SESSION_STATE_STOPPING)
|
||||
{
|
||||
o_stopping++;
|
||||
}
|
||||
else if(ptr->session->state == SESSION_STATE_ROUTER_READY)
|
||||
{
|
||||
o_ready++;
|
||||
}
|
||||
#endif
|
||||
ptr = ptr->next;
|
||||
if(tmp)
|
||||
{
|
||||
tmp->next = finished;
|
||||
finished = tmp;
|
||||
tmp = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
spinlock_release(&orphanLock);
|
||||
|
||||
#ifdef SS_DEBUG
|
||||
if(o_stopping + o_ready > 0)
|
||||
skygw_log_write(LOGFILE_DEBUG, "tee.c: %d orphans in "
|
||||
"SESSION_STATE_STOPPING, %d orphans in "
|
||||
"SESSION_STATE_ROUTER_READY. ", o_stopping, o_ready);
|
||||
#endif
|
||||
|
||||
while(finished)
|
||||
{
|
||||
o_freed++;
|
||||
tmp = finished;
|
||||
finished = finished->next;
|
||||
|
||||
tmp->session->service->router->freeSession(
|
||||
tmp->session->service->router_instance,
|
||||
tmp->session->router_session);
|
||||
|
||||
tmp->session->state = SESSION_STATE_FREE;
|
||||
free(tmp->session);
|
||||
free(tmp);
|
||||
}
|
||||
|
||||
#ifdef SS_DEBUG
|
||||
skygw_log_write(LOGFILE_DEBUG, "tee.c: %d orphans freed.", o_freed);
|
||||
#endif
|
||||
}
|
||||
|
||||
/**
|
||||
* Implementation of the mandatory version entry point
|
||||
*
|
||||
@ -171,6 +303,8 @@ version()
|
||||
void
|
||||
ModuleInit()
|
||||
{
|
||||
spinlock_init(&orphanLock);
|
||||
hktask_add("tee orphan cleanup",orphan_free,NULL,15);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -257,7 +391,8 @@ int i;
|
||||
free(my_instance->source);
|
||||
free(my_instance);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
if (my_instance->match &&
|
||||
regcomp(&my_instance->re, my_instance->match, REG_ICASE))
|
||||
{
|
||||
@ -313,12 +448,25 @@ char *remote, *userName;
|
||||
my_session = NULL;
|
||||
goto retblock;
|
||||
}
|
||||
|
||||
|
||||
HASHTABLE* ht = hashtable_alloc(100,hkfn,hcfn);
|
||||
bool is_loop = detect_loops(my_instance,ht,session->service);
|
||||
hashtable_free(ht);
|
||||
|
||||
if(is_loop)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
|
||||
"Error : %s: Recursive use of tee filter in service.",
|
||||
session->service->name)));
|
||||
my_session = NULL;
|
||||
goto retblock;
|
||||
}
|
||||
|
||||
if ((my_session = calloc(1, sizeof(TEE_SESSION))) != NULL)
|
||||
{
|
||||
my_session->active = 1;
|
||||
my_session->residual = 0;
|
||||
|
||||
spinlock_init(&my_session->tee_lock);
|
||||
if (my_instance->source &&
|
||||
(remote = session_get_remote(session)) != NULL)
|
||||
{
|
||||
@ -326,7 +474,7 @@ char *remote, *userName;
|
||||
{
|
||||
my_session->active = 0;
|
||||
|
||||
LOGIF(LE, (skygw_log_write(
|
||||
LOGIF(LE, (skygw_log_write_flush(
|
||||
LOGFILE_ERROR,
|
||||
"Warning : Tee filter is not active.")));
|
||||
}
|
||||
@ -339,7 +487,7 @@ char *remote, *userName;
|
||||
{
|
||||
my_session->active = 0;
|
||||
|
||||
LOGIF(LE, (skygw_log_write(
|
||||
LOGIF(LE, (skygw_log_write_flush(
|
||||
LOGFILE_ERROR,
|
||||
"Warning : Tee filter is not active.")));
|
||||
}
|
||||
@ -348,33 +496,88 @@ char *remote, *userName;
|
||||
{
|
||||
DCB* dcb;
|
||||
SESSION* ses;
|
||||
|
||||
FILTER_DEF* dummy;
|
||||
UPSTREAM* dummy_upstream;
|
||||
|
||||
if ((dcb = dcb_clone(session->client)) == NULL)
|
||||
{
|
||||
freeSession(my_instance, (void *)my_session);
|
||||
freeSession(instance, (void *)my_session);
|
||||
my_session = NULL;
|
||||
|
||||
LOGIF(LE, (skygw_log_write(
|
||||
LOGIF(LE, (skygw_log_write_flush(
|
||||
LOGFILE_ERROR,
|
||||
"Error : Creating client DCB for Tee "
|
||||
"filter failed. Terminating session.")));
|
||||
|
||||
goto retblock;
|
||||
}
|
||||
|
||||
if((dummy = filter_alloc("tee_dummy","tee_dummy")) == NULL)
|
||||
{
|
||||
dcb_close(dcb);
|
||||
freeSession(instance, (void *)my_session);
|
||||
my_session = NULL;
|
||||
LOGIF(LE, (skygw_log_write_flush(
|
||||
LOGFILE_ERROR,
|
||||
"Error : tee: Allocating memory for "
|
||||
"dummy filter definition failed."
|
||||
" Terminating session.")));
|
||||
|
||||
goto retblock;
|
||||
}
|
||||
|
||||
|
||||
|
||||
if ((ses = session_alloc(my_instance->service, dcb)) == NULL)
|
||||
{
|
||||
dcb_close(dcb);
|
||||
freeSession(my_instance, (void *)my_session);
|
||||
freeSession(instance, (void *)my_session);
|
||||
my_session = NULL;
|
||||
LOGIF(LE, (skygw_log_write(
|
||||
LOGIF(LE, (skygw_log_write_flush(
|
||||
LOGFILE_ERROR,
|
||||
"Error : Creating client session for Tee "
|
||||
"filter failed. Terminating session.")));
|
||||
|
||||
goto retblock;
|
||||
}
|
||||
my_session->branch_session = ses;
|
||||
my_session->branch_dcb = dcb;
|
||||
|
||||
ss_dassert(ses->ses_is_child);
|
||||
|
||||
dummy->obj = GetModuleObject();
|
||||
dummy->filter = NULL;
|
||||
|
||||
|
||||
if((dummy_upstream = filterUpstream(
|
||||
dummy, my_session, &ses->tail)) == NULL)
|
||||
{
|
||||
spinlock_acquire(&ses->ses_lock);
|
||||
ses->state = SESSION_STATE_STOPPING;
|
||||
spinlock_release(&ses->ses_lock);
|
||||
|
||||
ses->service->router->closeSession(
|
||||
ses->service->router_instance,
|
||||
ses->router_session);
|
||||
|
||||
ses->client = NULL;
|
||||
dcb->session = NULL;
|
||||
session_free(ses);
|
||||
dcb_close(dcb);
|
||||
freeSession(instance, (void *) my_session);
|
||||
my_session = NULL;
|
||||
LOGIF(LE, (skygw_log_write_flush(
|
||||
LOGFILE_ERROR,
|
||||
"Error : tee: Allocating memory for"
|
||||
"dummy upstream failed."
|
||||
" Terminating session.")));
|
||||
|
||||
goto retblock;
|
||||
}
|
||||
|
||||
ses->tail = *dummy_upstream;
|
||||
my_session->branch_session = ses;
|
||||
my_session->branch_dcb = dcb;
|
||||
my_session->dummy_filterdef = dummy;
|
||||
free(dummy_upstream);
|
||||
}
|
||||
}
|
||||
retblock:
|
||||
@ -421,6 +624,7 @@ SESSION *bsession;
|
||||
* a side effect of closing the client DCB of the
|
||||
* session.
|
||||
*/
|
||||
|
||||
my_session->active = 0;
|
||||
}
|
||||
}
|
||||
@ -435,11 +639,57 @@ static void
|
||||
freeSession(FILTER *instance, void *session)
|
||||
{
|
||||
TEE_SESSION *my_session = (TEE_SESSION *)session;
|
||||
SESSION* ses = my_session->branch_session;
|
||||
|
||||
if (ses != NULL)
|
||||
{
|
||||
if (ses->state == SESSION_STATE_ROUTER_READY)
|
||||
{
|
||||
session_free(ses);
|
||||
}
|
||||
|
||||
if (ses->state == SESSION_STATE_TO_BE_FREED)
|
||||
{
|
||||
/** Free branch router session */
|
||||
ses->service->router->freeSession(
|
||||
ses->service->router_instance,
|
||||
ses->router_session);
|
||||
/** Free memory of branch client session */
|
||||
ses->state = SESSION_STATE_FREE;
|
||||
free(ses);
|
||||
/** This indicates that branch session is not available anymore */
|
||||
my_session->branch_session = NULL;
|
||||
}
|
||||
else if(ses->state == SESSION_STATE_STOPPING)
|
||||
{
|
||||
orphan_session_t* orphan;
|
||||
if((orphan = malloc(sizeof(orphan_session_t))) == NULL)
|
||||
{
|
||||
skygw_log_write(LOGFILE_ERROR,"Error : Failed to "
|
||||
"allocate memory for orphan session struct, "
|
||||
"child session might leak memory.");
|
||||
}else{
|
||||
orphan->session = ses;
|
||||
spinlock_acquire(&orphanLock);
|
||||
orphan->next = allOrphans;
|
||||
allOrphans = orphan;
|
||||
spinlock_release(&orphanLock);
|
||||
}
|
||||
if(ses->refcount == 0)
|
||||
{
|
||||
ss_dassert(ses->refcount == 0 && ses->client == NULL);
|
||||
ses->state = SESSION_STATE_TO_BE_FREED;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (my_session->dummy_filterdef)
|
||||
{
|
||||
filter_free(my_session->dummy_filterdef);
|
||||
}
|
||||
free(session);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the downstream filter or router to which queries will be
|
||||
* passed from this filter.
|
||||
@ -451,9 +701,23 @@ TEE_SESSION *my_session = (TEE_SESSION *)session;
|
||||
static void
|
||||
setDownstream(FILTER *instance, void *session, DOWNSTREAM *downstream)
|
||||
{
|
||||
TEE_SESSION *my_session = (TEE_SESSION *)session;
|
||||
TEE_SESSION *my_session = (TEE_SESSION *) session;
|
||||
my_session->down = *downstream;
|
||||
}
|
||||
|
||||
my_session->down = *downstream;
|
||||
/**
|
||||
* Set the downstream filter or router to which queries will be
|
||||
* passed from this filter.
|
||||
*
|
||||
* @param instance The filter instance data
|
||||
* @param session The filter session
|
||||
* @param downstream The downstream filter or router.
|
||||
*/
|
||||
static void
|
||||
setUpstream(FILTER *instance, void *session, UPSTREAM *upstream)
|
||||
{
|
||||
TEE_SESSION *my_session = (TEE_SESSION *) session;
|
||||
my_session->up = *upstream;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -483,51 +747,187 @@ char *ptr;
|
||||
int length, rval, residual = 0;
|
||||
GWBUF *clone = NULL;
|
||||
|
||||
if (my_session->residual)
|
||||
if (my_session->branch_session &&
|
||||
my_session->branch_session->state == SESSION_STATE_ROUTER_READY)
|
||||
{
|
||||
clone = gwbuf_clone(queue);
|
||||
if (my_session->residual < GWBUF_LENGTH(clone))
|
||||
GWBUF_RTRIM(clone, GWBUF_LENGTH(clone) - residual);
|
||||
my_session->residual -= GWBUF_LENGTH(clone);
|
||||
if (my_session->residual < 0)
|
||||
my_session->residual = 0;
|
||||
}
|
||||
else if ( my_session->active && (ptr = modutil_get_SQL(queue)) != NULL)
|
||||
{
|
||||
if ((my_instance->match == NULL ||
|
||||
regexec(&my_instance->re, ptr, 0, NULL, 0) == 0) &&
|
||||
(my_instance->nomatch == NULL ||
|
||||
regexec(&my_instance->nore,ptr,0,NULL, 0) != 0))
|
||||
if (my_session->residual)
|
||||
{
|
||||
char *dummy;
|
||||
|
||||
modutil_MySQL_Query(queue, &dummy, &length, &residual);
|
||||
clone = gwbuf_clone(queue);
|
||||
my_session->residual = residual;
|
||||
|
||||
clone = gwbuf_clone_all(queue);
|
||||
|
||||
if (my_session->residual < GWBUF_LENGTH(clone))
|
||||
{
|
||||
GWBUF_RTRIM(clone, GWBUF_LENGTH(clone) - residual);
|
||||
}
|
||||
my_session->residual -= GWBUF_LENGTH(clone);
|
||||
|
||||
if (my_session->residual < 0)
|
||||
{
|
||||
my_session->residual = 0;
|
||||
}
|
||||
}
|
||||
else if (my_session->active && (ptr = modutil_get_SQL(queue)) != NULL)
|
||||
{
|
||||
if ((my_instance->match == NULL ||
|
||||
regexec(&my_instance->re, ptr, 0, NULL, 0) == 0) &&
|
||||
(my_instance->nomatch == NULL ||
|
||||
regexec(&my_instance->nore,ptr,0,NULL, 0) != 0))
|
||||
{
|
||||
length = modutil_MySQL_query_len(queue, &residual);
|
||||
clone = gwbuf_clone_all(queue);
|
||||
my_session->residual = residual;
|
||||
}
|
||||
free(ptr);
|
||||
}
|
||||
else if (packet_is_required(queue))
|
||||
{
|
||||
clone = gwbuf_clone_all(queue);
|
||||
}
|
||||
free(ptr);
|
||||
}
|
||||
else if (packet_is_required(queue))
|
||||
{
|
||||
clone = gwbuf_clone(queue);
|
||||
}
|
||||
|
||||
/* Pass the query downstream */
|
||||
rval = my_session->down.routeQuery(my_session->down.instance,
|
||||
my_session->down.session, queue);
|
||||
|
||||
ss_dassert(my_session->tee_replybuf == NULL);
|
||||
|
||||
memset(my_session->replies,0,2*sizeof(int));
|
||||
memset(my_session->eof,0,2*sizeof(int));
|
||||
memset(my_session->waiting,0,2*sizeof(bool));
|
||||
|
||||
rval = my_session->down.routeQuery(my_session->down.instance,
|
||||
my_session->down.session,
|
||||
queue);
|
||||
if (clone)
|
||||
{
|
||||
my_session->n_duped++;
|
||||
SESSION_ROUTE_QUERY(my_session->branch_session, clone);
|
||||
|
||||
if (my_session->branch_session->state == SESSION_STATE_ROUTER_READY)
|
||||
{
|
||||
SESSION_ROUTE_QUERY(my_session->branch_session, clone);
|
||||
}
|
||||
else
|
||||
{
|
||||
/** Close tee session */
|
||||
my_session->active = 0;
|
||||
LOGIF(LT, (skygw_log_write(
|
||||
LOGFILE_TRACE,
|
||||
"Closed tee filter session.")));
|
||||
gwbuf_free(clone);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (my_session->active)
|
||||
{
|
||||
LOGIF(LT, (skygw_log_write(
|
||||
LOGFILE_TRACE,
|
||||
"Closed tee filter session.")));
|
||||
my_session->active = 0;
|
||||
}
|
||||
my_session->n_rejected++;
|
||||
}
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
/**
|
||||
* Scans the GWBUF for EOF packets. If two packets for this session have been found
|
||||
* from either the parent or the child branch, mark the response set from that branch as over.
|
||||
* @param session The Tee filter session
|
||||
* @param branch Parent or child branch
|
||||
* @param reply Buffer to scan
|
||||
*/
|
||||
void
|
||||
scan_resultset(TEE_SESSION *session, int branch, GWBUF *reply)
|
||||
{
|
||||
unsigned char* ptr = (unsigned char*) reply->start;
|
||||
unsigned char* end = (unsigned char*) reply->end;
|
||||
int pktlen = 0;
|
||||
|
||||
while(ptr < end)
|
||||
{
|
||||
pktlen = gw_mysql_get_byte3(ptr) + 4;
|
||||
if(PTR_IS_EOF(ptr))
|
||||
{
|
||||
session->eof[branch]++;
|
||||
|
||||
if(session->eof[branch] == 2)
|
||||
{
|
||||
session->waiting[branch] = false;
|
||||
session->eof[branch] = 0;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
ptr += pktlen;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The clientReply entry point. This is passed the response buffer
|
||||
* to which the filter should be applied. Once processed the
|
||||
* query is passed to the upstream component
|
||||
* (filter or router) in the filter chain.
|
||||
*
|
||||
* @param instance The filter instance data
|
||||
* @param session The filter session
|
||||
* @param reply The response data
|
||||
*/
|
||||
static int
|
||||
clientReply (FILTER* instance, void *session, GWBUF *reply)
|
||||
{
|
||||
int rc, branch;
|
||||
TEE_SESSION *my_session = (TEE_SESSION *) session;
|
||||
|
||||
spinlock_acquire(&my_session->tee_lock);
|
||||
|
||||
ss_dassert(my_session->active);
|
||||
|
||||
branch = instance == NULL ? CHILD : PARENT;
|
||||
unsigned char *ptr = (unsigned char*)reply->start;
|
||||
|
||||
if(my_session->replies[branch] == 0)
|
||||
{
|
||||
if(PTR_IS_RESULTSET(ptr))
|
||||
{
|
||||
my_session->waiting[branch] = true;
|
||||
my_session->eof[branch] = 0;
|
||||
}
|
||||
}
|
||||
|
||||
if(my_session->waiting[branch])
|
||||
{
|
||||
scan_resultset(my_session,branch,reply);
|
||||
}
|
||||
|
||||
if(branch == PARENT)
|
||||
{
|
||||
ss_dassert(my_session->tee_replybuf == NULL)
|
||||
my_session->tee_replybuf = reply;
|
||||
}
|
||||
else
|
||||
{
|
||||
gwbuf_free(reply);
|
||||
}
|
||||
|
||||
my_session->replies[branch]++;
|
||||
|
||||
if(my_session->tee_replybuf != NULL &&
|
||||
(my_session->branch_session == NULL ||
|
||||
my_session->waiting[PARENT] ||
|
||||
(!my_session->waiting[CHILD] && !my_session->waiting[PARENT])))
|
||||
{
|
||||
rc = my_session->up.clientReply (
|
||||
my_session->up.instance,
|
||||
my_session->up.session,
|
||||
my_session->tee_replybuf);
|
||||
my_session->tee_replybuf = NULL;
|
||||
}
|
||||
else
|
||||
{
|
||||
rc = 1;
|
||||
}
|
||||
|
||||
spinlock_release(&my_session->tee_lock);
|
||||
return rc;
|
||||
}
|
||||
/**
|
||||
* Diagnostics routine
|
||||
*
|
||||
@ -589,3 +989,52 @@ int i;
|
||||
return 1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Detects possible loops in the query cloning chain.
|
||||
*/
|
||||
int detect_loops(TEE_INSTANCE *instance,HASHTABLE* ht, SERVICE* service)
|
||||
{
|
||||
SERVICE* svc = service;
|
||||
int i;
|
||||
|
||||
if(ht == NULL)
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
|
||||
if(hashtable_add(ht,(void*)service->name,(void*)true) == 0)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
for(i = 0;i<svc->n_filters;i++)
|
||||
{
|
||||
if(strcmp(svc->filters[i]->module,"tee") == 0)
|
||||
{
|
||||
/*
|
||||
* Found a Tee filter, recurse down its path
|
||||
* if the service name isn't already in the hashtable.
|
||||
*/
|
||||
|
||||
TEE_INSTANCE* ninst = (TEE_INSTANCE*)svc->filters[i]->filter;
|
||||
if(ninst == NULL)
|
||||
{
|
||||
/**
|
||||
* This tee instance hasn't been initialized yet and full
|
||||
* resolution of recursion cannot be done now.
|
||||
*/
|
||||
continue;
|
||||
}
|
||||
SERVICE* tgt = ninst->service;
|
||||
|
||||
if(detect_loops((TEE_INSTANCE*)svc->filters[i]->filter,ht,tgt))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
@ -11,16 +11,26 @@ add_executable(harness harness_util.c harness_common.c ${CORE})
|
||||
target_link_libraries(harness_ui fullcore log_manager utils)
|
||||
target_link_libraries(harness fullcore)
|
||||
execute_process(COMMAND ${CMAKE_COMMAND} -E copy ${ERRMSG} ${CMAKE_CURRENT_BINARY_DIR})
|
||||
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fwfilter/fwtest.cnf.in ${CMAKE_CURRENT_BINARY_DIR}/fwfilter.cnf)
|
||||
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/testdriver.sh ${CMAKE_CURRENT_BINARY_DIR}/testdriver.sh @ONLY)
|
||||
|
||||
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/hintfilter/hint_testing.cnf ${CMAKE_CURRENT_BINARY_DIR}/hintfilter/hint_testing.cnf)
|
||||
add_test(TestHintfilter testdriver.sh hintfilter/hint_testing.cnf hintfilter/hint_testing.input hintfilter/hint_testing.output hintfilter/hint_testing.expected)
|
||||
|
||||
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/regexfilter/regextest.cnf ${CMAKE_CURRENT_BINARY_DIR}/regexfilter/regextest.cnf)
|
||||
add_test(TestRegexfilter testdriver.sh regexfilter/regextest.cnf regexfilter/regextest.input regexfilter/regextest.output regexfilter/regextest.expected)
|
||||
|
||||
add_test(TestHintfilter /bin/sh -c "MAXSCALE_HOME=\"${CMAKE_BINARY_DIR}\" ${CMAKE_CURRENT_BINARY_DIR}/harness -i ${CMAKE_CURRENT_SOURCE_DIR}/hintfilter/hint_testing.input -o ${CMAKE_CURRENT_BINARY_DIR}/hintfilter/hint_testing.output -c ${CMAKE_CURRENT_SOURCE_DIR}/hintfilter/hint_testing.cnf -t 1 -s 1 -e ${CMAKE_CURRENT_SOURCE_DIR}/hintfilter/hint_testing.expected ")
|
||||
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fwfilter/fwtest.cnf.in ${CMAKE_CURRENT_BINARY_DIR}/fwfilter/fwtest.cnf)
|
||||
add_test(TestFwfilter1 testdriver.sh fwfilter/fwtest.cnf fwfilter/fwtest.input fwfilter/fwtest.output fwfilter/fwtest.expected)
|
||||
add_test(TestFwfilter2 testdriver.sh fwfilter/fwtest.cnf fwfilter/fwtest2.input fwfilter/fwtest2.output fwfilter/fwtest2.expected)
|
||||
|
||||
add_test(TestRegexfilter /bin/sh -c "MAXSCALE_HOME=\"${CMAKE_BINARY_DIR}\" ${CMAKE_CURRENT_BINARY_DIR}/harness -i ${CMAKE_CURRENT_SOURCE_DIR}/regexfilter/regextest.input -o ${CMAKE_CURRENT_BINARY_DIR}/regexfilter/regextest.output -c ${CMAKE_CURRENT_SOURCE_DIR}/regexfilter/regextest.cnf -t 1 -s 1 -e ${CMAKE_CURRENT_SOURCE_DIR}/regexfilter/regextest.expected ")
|
||||
add_test(TestTeeRecursion ${CMAKE_CURRENT_SOURCE_DIR}/tee_recursion.sh
|
||||
${CMAKE_BINARY_DIR}
|
||||
${CMAKE_SOURCE_DIR}
|
||||
${TEST_USER}
|
||||
${TEST_PASSWORD}
|
||||
${TEST_HOST}
|
||||
${TEST_PORT})
|
||||
|
||||
add_test(TestFwfilter /bin/sh -c "MAXSCALE_HOME=\"${CMAKE_BINARY_DIR}\" ${CMAKE_CURRENT_BINARY_DIR}/harness -i ${CMAKE_CURRENT_SOURCE_DIR}/fwfilter/fwtest.input -o ${CMAKE_CURRENT_BINARY_DIR}/fwfilter/fwtest.output -c ${CMAKE_CURRENT_BINARY_DIR}/fwfilter/fwfilter.cnf -t 1 -s 1 -e ${CMAKE_CURRENT_SOURCE_DIR}/fwfilter/fwtest.expected ")
|
||||
|
||||
add_test(TestFwfilter2 /bin/sh -c "MAXSCALE_HOME=\"${CMAKE_BINARY_DIR}\" ${CMAKE_CURRENT_BINARY_DIR}/harness -i ${CMAKE_CURRENT_SOURCE_DIR}/fwfilter/fwtest2.input -o ${CMAKE_CURRENT_BINARY_DIR}/fwfilter/fwtest2.output -c ${CMAKE_CURRENT_BINARY_DIR}/fwfilter/fwfilter.cnf -t 1 -s 1 -e ${CMAKE_CURRENT_SOURCE_DIR}/fwfilter/fwtest2.expected ")
|
||||
|
||||
add_test(TestFwfilter3 /bin/sh -c "MAXSCALE_HOME=\"${CMAKE_BINARY_DIR}\" ${CMAKE_CURRENT_BINARY_DIR}/harness -i ${CMAKE_CURRENT_SOURCE_DIR}/fwfilter/fwtest3.input -o ${CMAKE_CURRENT_BINARY_DIR}/fwfilter/fwtest3.output -c ${CMAKE_CURRENT_BINARY_DIR}/fwfilter/fwfilter.cnf -t 1 -s 1 -e ${CMAKE_CURRENT_SOURCE_DIR}/fwfilter/fwtest3.expected ")
|
||||
set_tests_properties(TestHintfilter TestRegexfilter TestFwfilter1 TestFwfilter2 TestTeeRecursion
|
||||
PROPERTIES
|
||||
ENVIRONMENT MAXSCALE_HOME=${CMAKE_BINARY_DIR}/)
|
||||
|
0
server/modules/filter/test/fwfilter/fwtest.cnf.in
Normal file → Executable file
0
server/modules/filter/test/fwfilter/fwtest.cnf.in
Normal file → Executable file
0
server/modules/filter/test/fwfilter/fwtest.input
Normal file → Executable file
0
server/modules/filter/test/fwfilter/fwtest.input
Normal file → Executable file
0
server/modules/filter/test/fwfilter/fwtest2.expected
Normal file → Executable file
0
server/modules/filter/test/fwfilter/fwtest2.expected
Normal file → Executable file
0
server/modules/filter/test/fwfilter/fwtest2.input
Normal file → Executable file
0
server/modules/filter/test/fwfilter/fwtest2.input
Normal file → Executable file
@ -10,7 +10,7 @@ int dcbfun(struct dcb* dcb, GWBUF * buffer)
|
||||
int harness_init(int argc, char** argv, HARNESS_INSTANCE** inst){
|
||||
|
||||
|
||||
int i = 0;
|
||||
int i = 0,rval = 0;
|
||||
MYSQL_session* mysqlsess;
|
||||
DCB* dcb;
|
||||
char cwd[1024];
|
||||
@ -60,7 +60,7 @@ int harness_init(int argc, char** argv, HARNESS_INSTANCE** inst){
|
||||
skygw_logmanager_init( 3, optstr);
|
||||
free(optstr);
|
||||
|
||||
process_opts(argc,argv);
|
||||
rval = process_opts(argc,argv);
|
||||
|
||||
if(!(instance.thrpool = malloc(instance.thrcount * sizeof(pthread_t)))){
|
||||
printf("Error: Out of memory\n");
|
||||
@ -72,10 +72,10 @@ int harness_init(int argc, char** argv, HARNESS_INSTANCE** inst){
|
||||
pthread_mutex_lock(&instance.work_mtx);
|
||||
size_t thr_num = 1;
|
||||
for(i = 0;i<instance.thrcount;i++){
|
||||
pthread_create(&instance.thrpool[i],NULL,(void*)work_buffer,(void*)thr_num++);
|
||||
rval |= pthread_create(&instance.thrpool[i],NULL,(void*)work_buffer,(void*)thr_num++);
|
||||
}
|
||||
|
||||
return 0;
|
||||
return rval;
|
||||
}
|
||||
|
||||
void free_filters()
|
||||
@ -543,10 +543,14 @@ int load_config( char* fname)
|
||||
{
|
||||
CONFIG* iter;
|
||||
CONFIG_ITEM* item;
|
||||
int config_ok = 1;
|
||||
int config_ok = 1,inirval;
|
||||
free_filters();
|
||||
if(ini_parse(fname,handler,instance.conf) < 0){
|
||||
if((inirval = ini_parse(fname,handler,instance.conf)) < 0){
|
||||
printf("Error parsing configuration file!\n");
|
||||
if(inirval == -1)
|
||||
printf("Inih file open error.\n");
|
||||
else if(inirval == -2)
|
||||
printf("inih memory error.\n");
|
||||
skygw_log_write(LOGFILE_ERROR,"Error parsing configuration file!\n");
|
||||
config_ok = 0;
|
||||
goto cleanup;
|
||||
@ -991,7 +995,7 @@ GWBUF* gen_packet(PACKET pkt)
|
||||
int process_opts(int argc, char** argv)
|
||||
{
|
||||
int fd, buffsize = 1024;
|
||||
int rd,rdsz, rval = 0;
|
||||
int rd,rdsz, rval = 0,error;
|
||||
size_t fsize;
|
||||
char *buff = calloc(buffsize,sizeof(char)), *tok = NULL;
|
||||
|
||||
@ -1071,6 +1075,7 @@ int process_opts(int argc, char** argv)
|
||||
free(conf_name);
|
||||
}
|
||||
conf_name = strdup(optarg);
|
||||
printf("Configuration: %s\n",optarg);
|
||||
break;
|
||||
|
||||
case 'q':
|
||||
@ -1079,12 +1084,12 @@ int process_opts(int argc, char** argv)
|
||||
|
||||
case 's':
|
||||
instance.session_count = atoi(optarg);
|
||||
printf("Sessions: %i ",instance.session_count);
|
||||
printf("Sessions: %i\n",instance.session_count);
|
||||
break;
|
||||
|
||||
case 't':
|
||||
instance.thrcount = atoi(optarg);
|
||||
printf("Threads: %i ",instance.thrcount);
|
||||
printf("Threads: %i\n",instance.thrcount);
|
||||
break;
|
||||
|
||||
case 'd':
|
||||
@ -1121,7 +1126,7 @@ int process_opts(int argc, char** argv)
|
||||
}
|
||||
printf("\n");
|
||||
|
||||
if(conf_name && load_config(conf_name)){
|
||||
if(conf_name && (error = load_config(conf_name))){
|
||||
load_query();
|
||||
}else{
|
||||
instance.running = 0;
|
||||
@ -1129,6 +1134,11 @@ int process_opts(int argc, char** argv)
|
||||
free(conf_name);
|
||||
close(fd);
|
||||
|
||||
if(!error)
|
||||
{
|
||||
rval = 1;
|
||||
}
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
|
86
server/modules/filter/test/tee_recursion.sh
Executable file
86
server/modules/filter/test/tee_recursion.sh
Executable file
@ -0,0 +1,86 @@
|
||||
#!/bin/bash
|
||||
|
||||
function execute_test()
|
||||
{
|
||||
|
||||
RVAL=$(mysql --connect-timeout=5 -u $USER -p$PWD -h $HOST -P $PORT -e "select 1;"|grep -i error)
|
||||
|
||||
if [[ ! -e $MAXPID ]]
|
||||
then
|
||||
echo "Test failed: $MAXPID was not found."
|
||||
return 1
|
||||
fi
|
||||
|
||||
if [[ "$RVAL" != "" ]]
|
||||
then
|
||||
echo "Test failed: Query to backend didn't return an error."
|
||||
return 1
|
||||
fi
|
||||
|
||||
LAST_LOG=$(ls $BINDIR/log -1|grep err|sort|uniq|tail -n 1)
|
||||
TEST_RESULT=$(cat $BINDIR/log/$LAST_LOG | grep -i recursive)
|
||||
if [[ "$TEST_RESULT" != "" ]]
|
||||
then
|
||||
return 0
|
||||
fi
|
||||
echo "Test failed: Log file didn't mention tee recursion."
|
||||
return 1
|
||||
}
|
||||
|
||||
function reload_conf()
|
||||
{
|
||||
$BINDIR/bin/maxadmin --user=admin --password=skysql reload config
|
||||
if [[ $? -ne 0 ]]
|
||||
then
|
||||
echo "Test failed: maxadmin returned a non-zero value."
|
||||
return 1
|
||||
fi
|
||||
return 0
|
||||
}
|
||||
|
||||
if [[ $# -lt 6 ]]
|
||||
then
|
||||
echo "usage: $0 <build dir> <source dir>"
|
||||
exit 1
|
||||
fi
|
||||
BINDIR=$1
|
||||
SRCDIR=$2
|
||||
USER=$3
|
||||
PWD=$4
|
||||
HOST=$5
|
||||
PORT=$6
|
||||
CONF=$BINDIR/etc/MaxScale.cnf
|
||||
OLDCONF=$BINDIR/etc/MaxScale.cnf.old
|
||||
MAXPID=$BINDIR/log/$(ls -1 $BINDIR/log|grep maxscale)
|
||||
TEST1=$SRCDIR/server/modules/filter/test/tee_recursion1.cnf
|
||||
TEST2=$SRCDIR/server/modules/filter/test/tee_recursion2.cnf
|
||||
|
||||
$BINDIR/bin/maxadmin --user=admin --password=skysql flush logs
|
||||
|
||||
mv $CONF $OLDCONF
|
||||
cp $TEST1 $CONF
|
||||
reload_conf
|
||||
execute_test
|
||||
T1RVAL=$?
|
||||
mv $CONF $CONF.test1
|
||||
cp $TEST2 $CONF
|
||||
reload_conf
|
||||
execute_test
|
||||
T2RVAL=$?
|
||||
mv $CONF $CONF.test2
|
||||
mv $OLDCONF $CONF
|
||||
reload_conf
|
||||
|
||||
if [[ $T1RVAL -ne 0 ]]
|
||||
then
|
||||
echo "Test 1 failed."
|
||||
exit 1
|
||||
elif [[ $T2RVAL -ne 0 ]]
|
||||
then
|
||||
echo "Test 2 failed"
|
||||
exit 1
|
||||
else
|
||||
echo "Test successful: log mentions recursive tee usage."
|
||||
fi
|
||||
|
||||
exit 0
|
114
server/modules/filter/test/tee_recursion1.cnf
Normal file
114
server/modules/filter/test/tee_recursion1.cnf
Normal file
@ -0,0 +1,114 @@
|
||||
[maxscale]
|
||||
threads=4
|
||||
|
||||
[MySQL Monitor]
|
||||
type=monitor
|
||||
module=mysqlmon
|
||||
servers=server1,server2,server3,server4
|
||||
user=maxuser
|
||||
passwd=maxpwd
|
||||
monitor_interval=10000
|
||||
|
||||
[RW Split Router]
|
||||
type=service
|
||||
router=readwritesplit
|
||||
servers=server1,server2,server3,server4
|
||||
user=maxuser
|
||||
passwd=maxpwd
|
||||
filters=recurse1
|
||||
|
||||
[RW Split Hint Router]
|
||||
type=service
|
||||
router=readwritesplit
|
||||
servers=server1,server2,server3,server4
|
||||
user=maxuser
|
||||
passwd=maxpwd
|
||||
filters=recurse2
|
||||
|
||||
|
||||
[Read Connection Router]
|
||||
type=service
|
||||
router=readconnroute
|
||||
router_options=master
|
||||
servers=server1
|
||||
user=maxuser
|
||||
passwd=maxpwd
|
||||
filters=recurse3
|
||||
|
||||
[recurse3]
|
||||
type=filter
|
||||
module=tee
|
||||
service=RW Split Router
|
||||
|
||||
[recurse2]
|
||||
type=filter
|
||||
module=tee
|
||||
service=Read Connection Router
|
||||
|
||||
[recurse1]
|
||||
type=filter
|
||||
module=tee
|
||||
service=RW Split Hint Router
|
||||
|
||||
|
||||
[Debug Interface]
|
||||
type=service
|
||||
router=debugcli
|
||||
|
||||
[CLI]
|
||||
type=service
|
||||
router=cli
|
||||
|
||||
[Read Connection Listener]
|
||||
type=listener
|
||||
service=Read Connection Router
|
||||
protocol=MySQLClient
|
||||
port=4008
|
||||
|
||||
[RW Split Listener]
|
||||
type=listener
|
||||
service=RW Split Router
|
||||
protocol=MySQLClient
|
||||
port=4006
|
||||
|
||||
[RW Split Hint Listener]
|
||||
type=listener
|
||||
service=RW Split Hint Router
|
||||
protocol=MySQLClient
|
||||
port=4009
|
||||
|
||||
[Debug Listener]
|
||||
type=listener
|
||||
service=Debug Interface
|
||||
protocol=telnetd
|
||||
port=4442
|
||||
|
||||
[CLI Listener]
|
||||
type=listener
|
||||
service=CLI
|
||||
protocol=maxscaled
|
||||
port=6603
|
||||
|
||||
[server1]
|
||||
type=server
|
||||
address=127.0.0.1
|
||||
port=3000
|
||||
protocol=MySQLBackend
|
||||
|
||||
[server2]
|
||||
type=server
|
||||
address=127.0.0.1
|
||||
port=3001
|
||||
protocol=MySQLBackend
|
||||
|
||||
[server3]
|
||||
type=server
|
||||
address=127.0.0.1
|
||||
port=3002
|
||||
protocol=MySQLBackend
|
||||
|
||||
[server4]
|
||||
type=server
|
||||
address=127.0.0.1
|
||||
port=3003
|
||||
protocol=MySQLBackend
|
112
server/modules/filter/test/tee_recursion2.cnf
Normal file
112
server/modules/filter/test/tee_recursion2.cnf
Normal file
@ -0,0 +1,112 @@
|
||||
[maxscale]
|
||||
threads=4
|
||||
|
||||
[MySQL Monitor]
|
||||
type=monitor
|
||||
module=mysqlmon
|
||||
servers=server1,server2,server3,server4
|
||||
user=maxuser
|
||||
passwd=maxpwd
|
||||
monitor_interval=10000
|
||||
|
||||
[RW Split Router]
|
||||
type=service
|
||||
router=readwritesplit
|
||||
servers=server1,server2,server3,server4
|
||||
user=maxuser
|
||||
passwd=maxpwd
|
||||
filters=recurse1|recurse2
|
||||
|
||||
[RW Split Hint Router]
|
||||
type=service
|
||||
router=readwritesplit
|
||||
servers=server1,server2,server3,server4
|
||||
user=maxuser
|
||||
passwd=maxpwd
|
||||
|
||||
[Read Connection Router]
|
||||
type=service
|
||||
router=readconnroute
|
||||
router_options=master
|
||||
servers=server1
|
||||
user=maxuser
|
||||
passwd=maxpwd
|
||||
filters=recurse3
|
||||
|
||||
[recurse3]
|
||||
type=filter
|
||||
module=tee
|
||||
service=RW Split Router
|
||||
|
||||
[recurse2]
|
||||
type=filter
|
||||
module=tee
|
||||
service=Read Connection Router
|
||||
|
||||
[recurse1]
|
||||
type=filter
|
||||
module=tee
|
||||
service=RW Split Hint Router
|
||||
|
||||
|
||||
[Debug Interface]
|
||||
type=service
|
||||
router=debugcli
|
||||
|
||||
[CLI]
|
||||
type=service
|
||||
router=cli
|
||||
|
||||
[Read Connection Listener]
|
||||
type=listener
|
||||
service=Read Connection Router
|
||||
protocol=MySQLClient
|
||||
port=4008
|
||||
|
||||
[RW Split Listener]
|
||||
type=listener
|
||||
service=RW Split Router
|
||||
protocol=MySQLClient
|
||||
port=4006
|
||||
|
||||
[RW Split Hint Listener]
|
||||
type=listener
|
||||
service=RW Split Hint Router
|
||||
protocol=MySQLClient
|
||||
port=4009
|
||||
|
||||
[Debug Listener]
|
||||
type=listener
|
||||
service=Debug Interface
|
||||
protocol=telnetd
|
||||
port=4442
|
||||
|
||||
[CLI Listener]
|
||||
type=listener
|
||||
service=CLI
|
||||
protocol=maxscaled
|
||||
port=6603
|
||||
|
||||
[server1]
|
||||
type=server
|
||||
address=127.0.0.1
|
||||
port=3000
|
||||
protocol=MySQLBackend
|
||||
|
||||
[server2]
|
||||
type=server
|
||||
address=127.0.0.1
|
||||
port=3001
|
||||
protocol=MySQLBackend
|
||||
|
||||
[server3]
|
||||
type=server
|
||||
address=127.0.0.1
|
||||
port=3002
|
||||
protocol=MySQLBackend
|
||||
|
||||
[server4]
|
||||
type=server
|
||||
address=127.0.0.1
|
||||
port=3003
|
||||
protocol=MySQLBackend
|
11
server/modules/filter/test/testdriver.sh
Executable file
11
server/modules/filter/test/testdriver.sh
Executable file
@ -0,0 +1,11 @@
|
||||
#! /bin/bash
|
||||
if [[ $# -lt 4 ]]
|
||||
then
|
||||
echo "Usage: $0 <config file> <input> <output> <expected>"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
TESTDIR=@CMAKE_CURRENT_BINARY_DIR@
|
||||
SRCDIR=@CMAKE_CURRENT_SOURCE_DIR@
|
||||
$TESTDIR/harness -i $SRCDIR/$2 -o $TESTDIR/$3 -c $TESTDIR/$1 -t 1 -s 1 -e $SRCDIR/$4
|
||||
exit $?
|
Reference in New Issue
Block a user