Fixed query throttling using the rule to store the values instead of the user.

This commit is contained in:
Markus Makela
2014-12-03 10:56:22 +02:00
parent e40519885b
commit 27913edc28
2 changed files with 70 additions and 13 deletions

View File

@ -138,6 +138,8 @@ typedef struct queryspeed_t{
double cooldown; double cooldown;
int count; int count;
int limit; int limit;
long id;
struct queryspeed_t* next;
}QUERYSPEED; }QUERYSPEED;
@ -166,6 +168,8 @@ typedef struct rulelist_t{
typedef struct user_t{ typedef struct user_t{
char* name; char* name;
SPINLOCK* lock;
QUERYSPEED* qs_limit;
RULELIST* rules_or; RULELIST* rules_or;
RULELIST* rules_and; RULELIST* rules_and;
}USER; }USER;
@ -187,17 +191,18 @@ typedef struct {
RULELIST* rules; RULELIST* rules;
STRLINK* userstrings; STRLINK* userstrings;
bool def_op; bool def_op;
SPINLOCK* lock;
long idgen; /**UID generator*/
} FW_INSTANCE; } FW_INSTANCE;
/** /**
* The session structure for Firewall filter. * The session structure for Firewall filter.
*/ */
typedef struct { typedef struct {
DOWNSTREAM down;
UPSTREAM up;
SESSION* session; SESSION* session;
char* errmsg; char* errmsg;
DOWNSTREAM down;
UPSTREAM up;
} FW_SESSION; } FW_SESSION;
static int hashkeyfun(void* key); static int hashkeyfun(void* key);
@ -661,12 +666,21 @@ void link_rules(char* rule, FW_INSTANCE* instance)
/**New user*/ /**New user*/
user = (USER*)calloc(1,sizeof(USER)); user = (USER*)calloc(1,sizeof(USER));
if(user == NULL){ if(user == NULL){
return; return;
} }
if((user->lock = (SPINLOCK*)malloc(sizeof(SPINLOCK))) == NULL){
free(user);
return;
}
spinlock_init(user->lock);
} }
user->name = (char*)strdup(userptr); user->name = (char*)strdup(userptr);
user->qs_limit = NULL;
tl = (RULELIST*)rlistdup(rulelist); tl = (RULELIST*)rlistdup(rulelist);
tail = tl; tail = tl;
while(tail && tail->next){ while(tail && tail->next){
@ -700,7 +714,7 @@ void link_rules(char* rule, FW_INSTANCE* instance)
*/ */
void parse_rule(char* rule, FW_INSTANCE* instance) void parse_rule(char* rule, FW_INSTANCE* instance)
{ {
assert(rule != NULL && instance != NULL); ss_dassert(rule != NULL && instance != NULL);
char *rulecpy = strdup(rule); char *rulecpy = strdup(rule);
char *tok = strtok(rulecpy," ,"); char *tok = strtok(rulecpy," ,");
@ -836,6 +850,10 @@ void parse_rule(char* rule, FW_INSTANCE* instance)
QUERYSPEED* qs = (QUERYSPEED*)calloc(1,sizeof(QUERYSPEED)); QUERYSPEED* qs = (QUERYSPEED*)calloc(1,sizeof(QUERYSPEED));
spinlock_acquire(instance->lock);
qs->id = ++instance->idgen;
spinlock_release(instance->lock);
tok = strtok(NULL," "); tok = strtok(NULL," ");
qs->limit = atoi(tok); qs->limit = atoi(tok);
@ -890,10 +908,14 @@ createInstance(char **options, FILTER_PARAMETER **params)
char *filename = NULL, *nl; char *filename = NULL, *nl;
char buffer[2048]; char buffer[2048];
FILE* file; FILE* file;
if ((my_instance = calloc(1, sizeof(FW_INSTANCE))) == NULL){
if ((my_instance = calloc(1, sizeof(FW_INSTANCE))) == NULL ||
(my_instance->lock = (SPINLOCK*)malloc(sizeof(SPINLOCK))) == NULL){
return NULL; return NULL;
} }
spinlock_init(my_instance->lock);
if((ht = hashtable_alloc(7, hashkeyfun, hashcmpfun)) == NULL){ if((ht = hashtable_alloc(7, hashkeyfun, hashcmpfun)) == NULL){
skygw_log_write(LOGFILE_ERROR, "Unable to allocate hashtable."); skygw_log_write(LOGFILE_ERROR, "Unable to allocate hashtable.");
free(my_instance); free(my_instance);
@ -936,7 +958,6 @@ createInstance(char **options, FILTER_PARAMETER **params)
if((nl = strchr(buffer,'\n')) != NULL && ((char*)nl - (char*)buffer) < 2048){ if((nl = strchr(buffer,'\n')) != NULL && ((char*)nl - (char*)buffer) < 2048){
*nl = '\0'; *nl = '\0';
} }
parse_rule(buffer,my_instance); parse_rule(buffer,my_instance);
} }
@ -1147,7 +1168,7 @@ bool rule_is_active(RULE* rule)
* @param query Pointer to the null-terminated query string * @param query Pointer to the null-terminated query string
* @return true if the query matches the rule * @return true if the query matches the rule
*/ */
bool rule_matches(FW_INSTANCE* my_instance, FW_SESSION* my_session, GWBUF *queue, RULELIST *rulelist, char* query) bool rule_matches(FW_INSTANCE* my_instance, FW_SESSION* my_session, GWBUF *queue, USER* user, RULELIST *rulelist, char* query)
{ {
char *ptr,*where,*msg = NULL; char *ptr,*where,*msg = NULL;
char emsg[512]; char emsg[512];
@ -1156,6 +1177,7 @@ bool rule_matches(FW_INSTANCE* my_instance, FW_SESSION* my_session, GWBUF *queue
skygw_query_op_t optype; skygw_query_op_t optype;
STRLINK* strln = NULL; STRLINK* strln = NULL;
QUERYSPEED* queryspeed = NULL; QUERYSPEED* queryspeed = NULL;
QUERYSPEED* rule_qs = NULL;
time_t time_now; time_t time_now;
struct tm* tm_now; struct tm* tm_now;
@ -1269,7 +1291,39 @@ bool rule_matches(FW_INSTANCE* my_instance, FW_SESSION* my_session, GWBUF *queue
break; break;
case RT_THROTTLE: case RT_THROTTLE:
queryspeed = (QUERYSPEED*)rulelist->rule->data;
/**
* Check if this is the first time this rule is matched and if so, allocate
* and initialize a new QUERYSPEED struct for this session.
*/
spinlock_acquire(my_instance->lock);
rule_qs = (QUERYSPEED*)rulelist->rule->data;
spinlock_release(my_instance->lock);
spinlock_acquire(user->lock);
queryspeed = user->qs_limit;
while(queryspeed){
if(queryspeed->id == rule_qs->id){
break;
}
queryspeed = queryspeed->next;
}
if(queryspeed == NULL){
/**No match found*/
queryspeed = (QUERYSPEED*)calloc(1,sizeof(QUERYSPEED));
queryspeed->period = rule_qs->period;
queryspeed->cooldown = rule_qs->cooldown;
queryspeed->limit = rule_qs->limit;
queryspeed->id = rule_qs->id;
queryspeed->next = user->qs_limit;
user->qs_limit = queryspeed;
}
if(queryspeed->count > queryspeed->limit) if(queryspeed->count > queryspeed->limit)
{ {
queryspeed->triggered = time_now; queryspeed->triggered = time_now;
@ -1283,6 +1337,9 @@ bool rule_matches(FW_INSTANCE* my_instance, FW_SESSION* my_session, GWBUF *queue
queryspeed->limit, queryspeed->limit,
queryspeed->period, queryspeed->period,
queryspeed->cooldown); queryspeed->cooldown);
double blocked_for = queryspeed->cooldown - difftime(time_now,queryspeed->triggered);
sprintf(emsg,"Queries denied for %f seconds",blocked_for);
msg = strdup(emsg);
} }
else if(difftime(time_now,queryspeed->triggered) < queryspeed->cooldown) else if(difftime(time_now,queryspeed->triggered) < queryspeed->cooldown)
{ {
@ -1303,7 +1360,7 @@ bool rule_matches(FW_INSTANCE* my_instance, FW_SESSION* my_session, GWBUF *queue
{ {
queryspeed->first_query = time_now; queryspeed->first_query = time_now;
} }
spinlock_release(user->lock);
break; break;
case RT_CLAUSE: case RT_CLAUSE:
@ -1366,7 +1423,7 @@ bool check_match_any(FW_INSTANCE* my_instance, FW_SESSION* my_session, GWBUF *qu
rulelist = rulelist->next; rulelist = rulelist->next;
continue; continue;
} }
if((rval = rule_matches(my_instance,my_session,queue,rulelist,fullquery))){ if((rval = rule_matches(my_instance,my_session,queue,user,rulelist,fullquery))){
goto retblock; goto retblock;
} }
rulelist = rulelist->next; rulelist = rulelist->next;
@ -1417,7 +1474,7 @@ bool check_match_all(FW_INSTANCE* my_instance, FW_SESSION* my_session, GWBUF *qu
continue; continue;
} }
if(!rule_matches(my_instance,my_session,queue,rulelist,fullquery)){ if(!rule_matches(my_instance,my_session,queue,user,rulelist,fullquery)){
rval = false; rval = false;
goto retblock; goto retblock;
} }

View File

@ -11,7 +11,7 @@ add_executable(harness harness_util.c harness_common.c ${CORE})
target_link_libraries(harness_ui fullcore log_manager utils) target_link_libraries(harness_ui fullcore log_manager utils)
target_link_libraries(harness fullcore) target_link_libraries(harness fullcore)
execute_process(COMMAND ${CMAKE_COMMAND} -E copy ${ERRMSG} ${CMAKE_CURRENT_BINARY_DIR}) execute_process(COMMAND ${CMAKE_COMMAND} -E copy ${ERRMSG} ${CMAKE_CURRENT_BINARY_DIR})
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fwtest.cnf.in ${CMAKE_CURRENT_BINARY_DIR}/fwfilter.cnf) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fwfilter/fwtest.cnf.in ${CMAKE_CURRENT_BINARY_DIR}/fwfilter.cnf)