diff --git a/log_manager/log_manager.cc b/log_manager/log_manager.cc index 75bd03872..04eef42e4 100644 --- a/log_manager/log_manager.cc +++ b/log_manager/log_manager.cc @@ -3079,7 +3079,7 @@ static int find_last_seqno( for (i=0, p=parts; p->sp_string != NULL; i++, p=p->sp_next) { - if (snstr != NULL && i == seqnoidx && strnlen(snstr,NAME_MAX) < NAME_MAX) + if (snstr != NULL && i == seqnoidx) { strncat(filename, snstr, NAME_MAX - 1); /*< add sequence number */ } diff --git a/query_classifier/query_classifier.cc b/query_classifier/query_classifier.cc index 6fa800e95..234d56e31 100644 --- a/query_classifier/query_classifier.cc +++ b/query_classifier/query_classifier.cc @@ -1210,7 +1210,124 @@ bool is_drop_table_query(GWBUF* querybuf) lex->sql_command == SQLCOM_DROP_TABLE); } +inline void add_str(char** buf, int* buflen, int* bufsize, char* str) +{ + int isize = strlen(str) + 1; + if(*buf == NULL || isize + *buflen >= *bufsize) + { + char *tmp = (char*)calloc((*bufsize) * 2 + isize, sizeof(char)); + if(tmp){ + memcpy(tmp,*buf,*bufsize); + if(*buf){ + free(*buf); + } + *buf = tmp; + *bufsize = (*bufsize) * 2 + isize; + } + } + + if(*buflen > 0){ + strcat(*buf," "); + } + strcat(*buf,str); + *buflen += isize; + +} + + /** + * Returns all the fields that the query affects. + * @param buf Buffer to parse + * @return Pointer to newly allocated string or NULL if nothing was found + */ +char* skygw_get_affected_fields(GWBUF* buf) +{ + LEX* lex; + int buffsz = 0,bufflen = 0; + char* where = NULL; + Item* item; + Item::Type itype; + + if(!query_is_parsed(buf)){ + parse_query(buf); + } + + if((lex = get_lex(buf)) == NULL){ + return NULL; + } + + lex->current_select = lex->all_selects_list; + + while(lex->current_select) + { + + List_iterator ilist(lex->current_select->item_list); + item = (Item*)ilist.next(); + for (item; item != NULL; item=(Item*)ilist.next()) + { + + itype = item->type(); + if(item->name && itype == Item::FIELD_ITEM){ + add_str(&where,&buffsz,&bufflen,item->name); + } + } + + + if(lex->current_select->where){ + for (item=lex->current_select->where; item != NULL; item=item->next) + { + + itype = item->type(); + if(item->name && itype == Item::FIELD_ITEM){ + add_str(&where,&buffsz,&bufflen,item->name); + } + } + } + + if(lex->current_select->having){ + for (item=lex->current_select->having; item != NULL; item=item->next) + { + + itype = item->type(); + if(item->name && itype == Item::FIELD_ITEM){ + add_str(&where,&buffsz,&bufflen,item->name); + } + } + } + + lex->current_select = lex->current_select->next_select_in_list(); + } + return where; +} + +bool skygw_query_has_clause(GWBUF* buf) +{ + LEX* lex; + SELECT_LEX* current; + bool clause = false; + + if(!query_is_parsed(buf)){ + parse_query(buf); + } + + if((lex = get_lex(buf)) == NULL){ + return false; + } + + current = lex->all_selects_list; + + while(current) + { + if(current->where || current->having){ + clause = true; + } + + current = current->next_select_in_list(); + } + return clause; +} + +/* * Replace user-provided literals with question marks. Return a copy of the * querystr with replacements. * @@ -1425,7 +1542,7 @@ static void parsing_info_set_plain_str( * @return string representing the query type value */ char* skygw_get_qtype_str( - skygw_query_type_t qtype) + skygw_query_type_t qtype) { int t1 = (int)qtype; int t2 = 1; @@ -1437,27 +1554,27 @@ char* skygw_get_qtype_str( * t1 is completely cleared. */ while (t1 != 0) - { - if (t1&t2) - { - t = (skygw_query_type_t)t2; + { + if (t1&t2) + { + t = (skygw_query_type_t)t2; - if (qtype_str == NULL) - { - qtype_str = strdup(STRQTYPE(t)); - } - else - { - size_t len = strlen(STRQTYPE(t)); - /** reallocate space for delimiter, new string and termination */ - qtype_str = (char *)realloc(qtype_str, strlen(qtype_str)+1+len+1); - snprintf(qtype_str+strlen(qtype_str), 1+len+1, "|%s", STRQTYPE(t)); - } - /** Remove found value from t1 */ - t1 &= ~t2; + if (qtype_str == NULL) + { + qtype_str = strdup(STRQTYPE(t)); + } + else + { + size_t len = strlen(STRQTYPE(t)); + /** reallocate space for delimiter, new string and termination */ + qtype_str = (char *)realloc(qtype_str, strlen(qtype_str)+1+len+1); + snprintf(qtype_str+strlen(qtype_str), 1+len+1, "|%s", STRQTYPE(t)); + } + /** Remove found value from t1 */ + t1 &= ~t2; + } + t2 <<= 1; } - t2 <<= 1; - } return qtype_str; } @@ -1516,3 +1633,50 @@ retblock: *size = i; return databases; } + +skygw_query_op_t query_classifier_get_operation(GWBUF* querybuf) +{ + LEX* lex = get_lex(querybuf); + skygw_query_op_t operation; + if(lex){ + switch(lex->sql_command){ + case SQLCOM_SELECT: + operation = QUERY_OP_SELECT; + break; + case SQLCOM_CREATE_TABLE: + operation = QUERY_OP_CREATE_TABLE; + break; + case SQLCOM_CREATE_INDEX: + operation = QUERY_OP_CREATE_INDEX; + break; + case SQLCOM_ALTER_TABLE: + operation = QUERY_OP_ALTER_TABLE; + break; + case SQLCOM_UPDATE: + operation = QUERY_OP_UPDATE; + break; + case SQLCOM_INSERT: + operation = QUERY_OP_INSERT; + break; + case SQLCOM_INSERT_SELECT: + operation = QUERY_OP_INSERT_SELECT; + break; + case SQLCOM_DELETE: + operation = QUERY_OP_DELETE; + break; + case SQLCOM_TRUNCATE: + operation = QUERY_OP_TRUNCATE; + break; + case SQLCOM_DROP_TABLE: + operation = QUERY_OP_DROP_TABLE; + break; + case SQLCOM_DROP_INDEX: + operation = QUERY_OP_DROP_INDEX; + break; + + default: + operation = QUERY_OP_UNDEFINED; + } + } + return operation; +} diff --git a/query_classifier/query_classifier.h b/query_classifier/query_classifier.h index 5837b81c7..754e69478 100644 --- a/query_classifier/query_classifier.h +++ b/query_classifier/query_classifier.h @@ -60,6 +60,20 @@ typedef enum { QUERY_TYPE_SHOW_TABLES = 0x400000 /*< Show list of tables */ } skygw_query_type_t; +typedef enum { + QUERY_OP_UNDEFINED = 0, + QUERY_OP_SELECT = 1, + QUERY_OP_UPDATE = (1 << 1), + QUERY_OP_INSERT = (1 << 2), + QUERY_OP_DELETE = (1 << 3), + QUERY_OP_INSERT_SELECT = (1 << 4), + QUERY_OP_TRUNCATE = (1 << 5), + QUERY_OP_ALTER_TABLE = (1 << 6), + QUERY_OP_CREATE_TABLE = (1 << 7), + QUERY_OP_CREATE_INDEX = (1 << 8), + QUERY_OP_DROP_TABLE = (1 << 9), + QUERY_OP_DROP_INDEX = (1 << 10) +}skygw_query_op_t; typedef struct parsing_info_st { #if defined(SS_DEBUG) @@ -81,7 +95,7 @@ typedef struct parsing_info_st { * classify the query. */ skygw_query_type_t query_classifier_get_type(GWBUF* querybuf); - +skygw_query_op_t query_classifier_get_operation(GWBUF* querybuf); /** Free THD context and close MYSQL */ #if defined(NOT_USED) char* skygw_query_classifier_get_stmtname(GWBUF* buf); @@ -95,8 +109,9 @@ bool parse_query (GWBUF* querybuf); parsing_info_t* parsing_info_init(void (*donefun)(void *)); void parsing_info_done(void* ptr); bool query_is_parsed(GWBUF* buf); +bool skygw_query_has_clause(GWBUF* buf); char* skygw_get_qtype_str(skygw_query_type_t qtype); - +char* skygw_get_affected_fields(GWBUF* buf); EXTERN_C_BLOCK_END diff --git a/server/core/modutil.c b/server/core/modutil.c index d6dbfa7b1..fd7fb3b68 100644 --- a/server/core/modutil.c +++ b/server/core/modutil.c @@ -31,7 +31,7 @@ #include #include #include - +#include /** * Check if a GWBUF structure is a MySQL COM_QUERY packet * @@ -493,3 +493,39 @@ GWBUF* modutil_get_next_MySQL_packet( return_packetbuf: return packetbuf; } + + +/** + * Count the number of EOF, OK or ERR packets in the buffer. + * @param reply Buffer to use + * @param use_ok Whether the DEPRECATE_EOF flag is set + * @param n_found If there were previous packets found + * @return Number of EOF packets + */ +int +modutil_count_signal_packets(GWBUF *reply,int use_ok, int n_found) +{ + unsigned char* ptr = (unsigned char*) reply->start; + unsigned char* end = (unsigned char*) reply->end; + int pktlen,pkt = 0; + + while(ptr < end) + { + pktlen = gw_mysql_get_byte3(ptr) + 4; + + if(PTR_IS_ERR(ptr) || (PTR_IS_EOF(ptr) && !use_ok) || (use_ok && PTR_IS_OK(ptr))) + { + if(n_found) + { + if(ptr + pktlen >= end) + pkt++; + } + else + { + pkt++; + } + } + ptr += pktlen; + } + return pkt; +} diff --git a/server/include/modutil.h b/server/include/modutil.h index fac39cbcc..2ccab523c 100644 --- a/server/include/modutil.h +++ b/server/include/modutil.h @@ -34,6 +34,12 @@ #include #include +#define PTR_IS_RESULTSET(b) (b[0] == 0x01 && b[1] == 0x0 && b[2] == 0x0 && b[3] == 0x01) +#define PTR_IS_EOF(b) (b[0] == 0x05 && b[1] == 0x0 && b[2] == 0x0 && b[4] == 0xfe) +#define PTR_IS_OK(b) (b[4] == 0x00) +#define PTR_IS_ERR(b) (b[4] == 0xff) +#define PTR_IS_LOCAL_INFILE(b) (b[4] == 0xfb) + extern int modutil_is_SQL(GWBUF *); extern int modutil_extract_SQL(GWBUF *, char **, int *); extern int modutil_MySQL_Query(GWBUF *, char **, int *, int *); @@ -44,7 +50,6 @@ extern int modutil_send_mysql_err_packet(DCB *, int, int, int, const char *, con GWBUF* modutil_get_next_MySQL_packet(GWBUF** p_readbuf); int modutil_MySQL_query_len(GWBUF* buf, int* nbytes_missing); - GWBUF *modutil_create_mysql_err_msg( int packet_number, int affected_rows, @@ -52,4 +57,5 @@ GWBUF *modutil_create_mysql_err_msg( const char *statemsg, const char *msg); +int modutil_count_signal_packets(GWBUF*,int,int); #endif diff --git a/server/modules/filter/CMakeLists.txt b/server/modules/filter/CMakeLists.txt index 6e70adaf3..3039d394b 100644 --- a/server/modules/filter/CMakeLists.txt +++ b/server/modules/filter/CMakeLists.txt @@ -24,8 +24,13 @@ add_library(topfilter SHARED topfilter.c) target_link_libraries(topfilter log_manager utils) install(TARGETS topfilter DESTINATION modules) +add_library(fwfilter SHARED fwfilter.c) +target_link_libraries(fwfilter log_manager utils query_classifier) +install(TARGETS fwfilter DESTINATION modules) + + add_subdirectory(hint) if(BUILD_TESTS) add_subdirectory(test) -endif() \ No newline at end of file +endif() diff --git a/server/modules/filter/fwfilter.c b/server/modules/filter/fwfilter.c new file mode 100644 index 000000000..2a15d126f --- /dev/null +++ b/server/modules/filter/fwfilter.c @@ -0,0 +1,1639 @@ +/* + * This file is distributed as part of MaxScale by MariaDB Corporation. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Copyright MariaDB Corporation Ab 2014 + */ + +/** + * @file fwfilter.c + * Firewall Filter + * + * A filter that acts as a firewall, denying queries that do not meet a set of rules. + * + * Filter configuration parameters: + * + * rules= Location of the rule file + * + * Rules are defined in a separate rule file that lists all the rules and the users to whom the rules are applied. + * Rules follow a simple syntax that denies the queries that meet the requirements of the rules. + * For example, to define a rule denying users from accessing the column 'salary' between + * the times 15:00 and 17:00, the following rule is to be configured into the configuration file: + * + * rule block_salary deny columns salary at_times 15:00:00-17:00:00 + * + * The users are matched by username and network address. Wildcard values can be provided by using the '%' character. + * For example, to apply this rule to users John, connecting from any address + * that starts with the octets 198.168.%, and Jane, connecting from the address 192.168.0.1: + * + * users John@192.168.% Jane@192.168.0.1 match any rules block_salary + * + * + * The 'match' keyword controls the way rules are matched. If it is set to 'any' the first active rule that is triggered will cause the query to be denied. + * If it is set to 'all' all the active rules need to match before the query is denied. + * + * Rule syntax + * + * rule NAME deny [wildcard | columns VALUE ... | regex REGEX | limit_queries COUNT TIMEPERIOD HOLDOFF | no_where_clause] [at_times VALUE...] [on_queries [select|update|insert|delete]] + * + * User syntax + * + * users NAME ... match [any|all] rules RULE ... + * + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +MODULE_INFO info = { + MODULE_API_FILTER, + MODULE_ALPHA_RELEASE, + FILTER_VERSION, + "Firewall Filter" +}; + +static char *version_str = "V1.0.0"; + +/* + * The filter entry points + */ +static FILTER *createInstance(char **options, FILTER_PARAMETER **); +static void *newSession(FILTER *instance, SESSION *session); +static void closeSession(FILTER *instance, void *session); +static void freeSession(FILTER *instance, void *session); +static void setDownstream(FILTER *instance, void *fsession, DOWNSTREAM *downstream); +static int routeQuery(FILTER *instance, void *fsession, GWBUF *queue); +static void diagnostic(FILTER *instance, void *fsession, DCB *dcb); + +static FILTER_OBJECT MyObject = { + createInstance, + newSession, + closeSession, + freeSession, + setDownstream, + NULL, + routeQuery, + NULL, + diagnostic, +}; + + +/** + * Rule types + */ +typedef enum { + RT_UNDEFINED = 0x00, + RT_COLUMN, + RT_THROTTLE, + RT_PERMISSION, + RT_WILDCARD, + RT_REGEX, + RT_CLAUSE +}ruletype_t; + +const char* rule_names[] = { + "UNDEFINED", + "COLUMN", + "THROTTLE", + "PERMISSION", + "WILDCARD", + "REGEX", + "CLAUSE" +}; + + +/** + * Linked list of strings. + */ +typedef struct strlink_t{ + struct strlink_t *next; + char* value; +}STRLINK; + +typedef struct timerange_t{ + struct timerange_t* next; + struct tm start; + struct tm end; +}TIMERANGE; + +typedef struct queryspeed_t{ + time_t first_query; + time_t triggered; + double period; + double cooldown; + int count; + int limit; + long id; + struct queryspeed_t* next; +}QUERYSPEED; + + +/** + * A structure used to identify individual rules and to store their contents + * + * Each type of rule has different requirements that are expressed as void pointers. + * This allows to match an arbitrary set of rules against a user. + */ +typedef struct rule_t{ + void* data; + char* name; + ruletype_t type; + skygw_query_op_t on_queries; + bool allow; + int times_matched; + TIMERANGE* active; +}RULE; + +/** + * Linked list of pointers to a global pool of RULE structs + */ +typedef struct rulelist_t{ + RULE* rule; + struct rulelist_t* next; +}RULELIST; + +typedef struct user_t{ + char* name; + SPINLOCK* lock; + QUERYSPEED* qs_limit; + RULELIST* rules_or; + RULELIST* rules_and; +}USER; + +/** + * Linked list of IP adresses and subnet masks + */ +typedef struct iprange_t{ + struct iprange_t* next; + uint32_t ip; + uint32_t mask; +}IPRANGE; + +/** + * The Firewall filter instance. + */ +typedef struct { + HASHTABLE* htable; + RULELIST* rules; + STRLINK* userstrings; + bool def_op; + SPINLOCK* lock; + long idgen; /**UID generator*/ +} FW_INSTANCE; + +/** + * The session structure for Firewall filter. + */ +typedef struct { + SESSION* session; + char* errmsg; + DOWNSTREAM down; + UPSTREAM up; +} FW_SESSION; + +static int hashkeyfun(void* key); +static int hashcmpfun (void *, void *); + +static int hashkeyfun( + void* key) +{ + if(key == NULL){ + return 0; + } + unsigned int hash = 0,c = 0; + char* ptr = (char*)key; + while((c = *ptr++)){ + hash = c + (hash << 6) + (hash << 16) - hash; + } + return (int)hash > 0 ? hash : -hash; +} + +static int hashcmpfun( + void* v1, + void* v2) +{ + char* i1 = (char*) v1; + char* i2 = (char*) v2; + + return strcmp(i1,i2); +} + +static void* hstrdup(void* fval) +{ + char* str = (char*)fval; + return strdup(str); +} + + +static void* hstrfree(void* fval) +{ + free (fval); + return NULL; +} + + +void* rlistdup(void* fval) +{ + + RULELIST *rule = NULL, + *ptr = (RULELIST*)fval; + + + while(ptr){ + RULELIST* tmp = (RULELIST*)malloc(sizeof(RULELIST)); + tmp->next = rule; + tmp->rule = ptr->rule; + rule = tmp; + ptr = ptr->next; + } + + return (void*)rule; + +} + +static void* hrulefree(void* fval) +{ + USER* user = (USER*)fval; + RULELIST *ptr = user->rules_or,*tmp; + while(ptr){ + tmp = ptr; + ptr = ptr->next; + free(tmp); + } + ptr = user->rules_and; + while(ptr){ + tmp = ptr; + ptr = ptr->next; + free(tmp); + } + free(user->name); + free(user); + return NULL; +} + + +/** + * Strips the single or double quotes from a string. + * This function modifies the passed string. + * @param str String to parse + * @return Pointer to the modified string + */ +char* strip_tags(char* str) +{ + + assert(str != NULL); + + char *ptr = str,*re_start = NULL; + bool found = false; + while(*ptr != '\0'){ + + if(*ptr == '"' || + *ptr == '\''){ + if(found){ + *ptr = '\0'; + memmove(str,re_start,ptr - re_start); + break; + }else{ + *ptr = ' '; + re_start = ptr + 1; + found = true; + } + } + ptr++; + + } + + return str; +} + +/** + * Parses a string that contains an IP address and converts the last octet to '%'. + * This modifies the string passed as the parameter. + * @param str String to parse + * @return Pointer to modified string or NULL if an error occurred or the string can't be made any less specific + */ +char* next_ip_class(char* str) +{ + assert(str != NULL); + + /**The least specific form is reached*/ + if(*str == '%'){ + return NULL; + } + + char* ptr = strchr(str,'\0'); + + if(ptr == NULL){ + return NULL; + } + + while(ptr > str){ + ptr--; + if(*ptr == '.' && *(ptr+1) != '%'){ + break; + } + } + + if(ptr == str){ + *ptr++ = '%'; + *ptr = '\0'; + return str; + } + + *++ptr = '%'; + *++ptr = '\0'; + + + return str; +} +/** + * Parses the strign for the types of queries this rule should be applied to. + * @param str String to parse + * @param rule Poiter to a rule + * @return True if the string was parses successfully, false if an error occurred + */ +bool parse_querytypes(char* str,RULE* rule) +{ + char buffer[512]; + char *ptr,*dest; + bool done = false; + rule->on_queries = 0; + ptr = str; + dest = buffer; + + while(ptr - buffer < 512) + { + if(*ptr == '|' || *ptr == ' ' || (done = *ptr == '\0')){ + *dest = '\0'; + if(strcmp(buffer,"select") == 0){ + rule->on_queries |= QUERY_OP_SELECT; + }else if(strcmp(buffer,"insert") == 0){ + rule->on_queries |= QUERY_OP_INSERT; + }else if(strcmp(buffer,"update") == 0){ + rule->on_queries |= QUERY_OP_UPDATE; + }else if(strcmp(buffer,"delete") == 0){ + rule->on_queries |= QUERY_OP_DELETE; + } + + if(done){ + return true; + } + + dest = buffer; + ptr++; + }else{ + *dest++ = *ptr++; + } + } + return false; +} + +/** + * Checks whether a null-terminated string contains two ISO-8601 compliant times separated + * by a single dash. + * @param str String to check + * @return True if the string is valid + */ +bool check_time(char* str) +{ + assert(str != NULL); + + char* ptr = str; + int colons = 0,numbers = 0,dashes = 0; + while(*ptr){ + if(isdigit(*ptr)){numbers++;} + else if(*ptr == ':'){colons++;} + else if(*ptr == '-'){dashes++;} + ptr++; + } + return numbers == 12 && colons == 4 && dashes == 1; +} + + +#ifdef SS_DEBUG +#define CHK_TIMES(t)(ss_dassert(t->tm_sec > -1 && t->tm_sec < 62 \ + && t->tm_min > -1 && t->tm_min < 60 \ + && t->tm_hour > -1 && t->tm_hour < 24)) +#else +#define CHK_TIMES(t) +#endif + +#define IS_RVRS_TIME(tr) (mktime(&tr->end) < mktime(&tr->start)) +/** + * Parses a null-terminated string into two tm_t structs that mark a timerange + * @param str String to parse + * @param instance FW_FILTER instance + * @return If successful returns a pointer to the new TIMERANGE instance. If errors occurred or + * the timerange was invalid, a NULL pointer is returned. + */ +TIMERANGE* parse_time(char* str, FW_INSTANCE* instance) +{ + + TIMERANGE* tr = NULL; + int intbuffer[3]; + int* idest = intbuffer; + char strbuffer[3]; + char *ptr,*sdest; + struct tm* tmptr; + + assert(str != NULL && instance != NULL); + + tr = (TIMERANGE*)calloc(1,sizeof(TIMERANGE)); + + if(tr == NULL){ + skygw_log_write(LOGFILE_ERROR, "fwfilter: malloc returned NULL."); + return NULL; + } + + memset(&tr->start,0,sizeof(struct tm)); + memset(&tr->end,0,sizeof(struct tm)); + ptr = str; + sdest = strbuffer; + tmptr = &tr->start; + + while(ptr - str < 19){ + if(isdigit(*ptr)){ + *sdest = *ptr; + }else if(*ptr == ':' ||*ptr == '-' || *ptr == '\0'){ + *sdest = '\0'; + *idest++ = atoi(strbuffer); + sdest = strbuffer; + + if(*ptr == '-' || *ptr == '\0'){ + + tmptr->tm_hour = intbuffer[0]; + tmptr->tm_min = intbuffer[1]; + tmptr->tm_sec = intbuffer[2]; + + CHK_TIMES(tmptr); + + if(*ptr == '\0'){ + return tr; + } + + idest = intbuffer; + tmptr = &tr->end; + } + ptr++; + continue; + } + ptr++; + sdest++; + } + + + return tr; +} + + +/** + * Splits the reversed timerange into two. + *@param tr A reversed timerange + *@return If the timerange is reversed, returns a pointer to the new TIMERANGE otherwise returns a NULL pointer + */ +TIMERANGE* split_reverse_time(TIMERANGE* tr) +{ + TIMERANGE* tmp = NULL; + + if(IS_RVRS_TIME(tr)){ + tmp = (TIMERANGE*)calloc(1,sizeof(TIMERANGE)); + tmp->next = tr; + tmp->start.tm_hour = 0; + tmp->start.tm_min = 0; + tmp->start.tm_sec = 0; + tmp->end = tr->end; + tr->end.tm_hour = 23; + tr->end.tm_min = 59; + tr->end.tm_sec = 59; + } + + return tmp; +} + +/** + * Implementation of the mandatory version entry point + * + * @return version string of the module + */ +char * +version() +{ + return version_str; +} + +/** +* The module initialisation routine, called when the module +* is first loaded. +*/ +void +ModuleInit() +{ +} + +/** + * The module entry point routine. It is this routine that + * must populate the structure that is referred to as the + * "module object", this is a structure with the set of + * external entry points for this module. + * + * @return The module object + */ +FILTER_OBJECT * +GetModuleObject() +{ + return &MyObject; +} + +/** + * Finds the rule with a name matching the passed string. + * + * @param tok Name to search for + * @param instance A valid FW_FILTER instance + * @return A pointer to the matching RULE if found, else returns NULL + */ +RULE* find_rule(char* tok, FW_INSTANCE* instance) +{ + RULELIST* rlist = instance->rules; + + while(rlist){ + if(strcmp(rlist->rule->name,tok) == 0){ + return rlist->rule; + } + rlist = rlist->next; + } + skygw_log_write(LOGFILE_ERROR, "fwfilter: Rule not found: %s",tok); + return NULL; +} + +/** + * Adds the given rule string to the list of strings to be parsed for users. + * @param rule The rule string, assumed to be null-terminated + * @param instance The FW_FILTER instance + */ +void add_users(char* rule, FW_INSTANCE* instance) +{ + assert(rule != NULL && instance != NULL); + + STRLINK* link = calloc(1,sizeof(STRLINK)); + link->next = instance->userstrings; + link->value = strdup(rule); + instance->userstrings = link; +} + +/** + * Parses the list of rule strings for users and links them against the listed rules. + * Only adds those rules that are found. If the rule isn't found a message is written to the error log. + * @param rule Rule string to parse + * @param instance The FW_FILTER instance + */ +void link_rules(char* rule, FW_INSTANCE* instance) +{ + assert(rule != NULL && instance != NULL); + + /**Apply rules to users*/ + + bool match_any; + char *tok, *ruleptr, *userptr, *modeptr; + RULELIST* rulelist = NULL; + + userptr = strstr(rule,"users "); + modeptr = strstr(rule," match "); + ruleptr = strstr(rule," rules "); + + if((userptr == NULL || ruleptr == NULL || modeptr == NULL)|| + (userptr > modeptr || userptr > ruleptr || modeptr > ruleptr)) { + skygw_log_write(LOGFILE_ERROR, "fwfilter: Rule syntax incorrect, right keywords not found in the correct order: %s",rule); + return; + } + + *modeptr++ = '\0'; + *ruleptr++ = '\0'; + + tok = strtok(modeptr," "); + if(strcmp(tok,"match") == 0){ + tok = strtok(NULL," "); + if(strcmp(tok,"any") == 0){ + match_any = true; + }else if(strcmp(tok,"all") == 0){ + match_any = false; + }else{ + skygw_log_write(LOGFILE_ERROR, "fwfilter: Rule syntax incorrect, 'match' was not followed by 'any' or 'all': %s",rule); + return; + } + } + + tok = strtok(ruleptr," "); + tok = strtok(NULL," "); + + while(tok) + { + RULE* rule_found = NULL; + + if((rule_found = find_rule(tok,instance)) != NULL) + { + RULELIST* tmp_rl = (RULELIST*)calloc(1,sizeof(RULELIST)); + tmp_rl->rule = rule_found; + tmp_rl->next = rulelist; + rulelist = tmp_rl; + + } + tok = strtok(NULL," "); + } + + /** + * Apply this list of rules to all the listed users + */ + + *(ruleptr) = '\0'; + userptr = strtok(rule," "); + userptr = strtok(NULL," "); + + while(userptr) + { + USER* user; + RULELIST *tl = NULL,*tail = NULL; + + if((user = (USER*)hashtable_fetch(instance->htable,userptr)) == NULL){ + + /**New user*/ + user = (USER*)calloc(1,sizeof(USER)); + + if(user == NULL){ + return; + } + + if((user->lock = (SPINLOCK*)malloc(sizeof(SPINLOCK))) == NULL){ + free(user); + return; + } + + spinlock_init(user->lock); + } + + user->name = (char*)strdup(userptr); + user->qs_limit = NULL; + tl = (RULELIST*)rlistdup(rulelist); + tail = tl; + while(tail && tail->next){ + tail = tail->next; + } + + + if(match_any){ + tail->next = user->rules_or; + user->rules_or = tl; + }else{ + tail->next = user->rules_and; + user->rules_and = tl; + } + + hashtable_add(instance->htable, + (void *)userptr, + (void *)user); + + userptr = strtok(NULL," "); + + } + +} + + +/** + * Parse the configuration value either as a new rule or a list of users. + * @param rule The string to parse + * @param instance The FW_FILTER instance + */ +void parse_rule(char* rule, FW_INSTANCE* instance) +{ + ss_dassert(rule != NULL && instance != NULL); + + char *rulecpy = strdup(rule); + char *tok = strtok(rulecpy," ,"); + bool allow,deny,mode; + RULE* ruledef = NULL; + + if(tok == NULL) goto retblock; + + if(strcmp("rule",tok) == 0){ /**Define a new rule*/ + + tok = strtok(NULL," ,"); + + if(tok == NULL) goto retblock; + + RULELIST* rlist = NULL; + + ruledef = (RULE*)calloc(1,sizeof(RULE)); + rlist = (RULELIST*)calloc(1,sizeof(RULELIST)); + ruledef->name = strdup(tok); + ruledef->type = RT_UNDEFINED; + ruledef->on_queries = QUERY_OP_UNDEFINED; + rlist->rule = ruledef; + rlist->next = instance->rules; + instance->rules = rlist; + + }else if(strcmp("users",tok) == 0){ + + /**Apply rules to users*/ + add_users(rule, instance); + goto retblock; + } + + tok = strtok(NULL, " ,"); + + + if((allow = (strcmp(tok,"allow") == 0)) || + (deny = (strcmp(tok,"deny") == 0))){ + + mode = allow ? true:false; + ruledef->allow = mode; + ruledef->type = RT_PERMISSION; + tok = strtok(NULL, " ,"); + + + while(tok){ + if(strcmp(tok,"wildcard") == 0) + { + ruledef->type = RT_WILDCARD; + } + else if(strcmp(tok,"columns") == 0) + { + STRLINK *tail = NULL,*current; + ruledef->type = RT_COLUMN; + tok = strtok(NULL, " ,"); + while(tok && strcmp(tok,"at_times") != 0){ + current = malloc(sizeof(STRLINK)); + current->value = strdup(tok); + current->next = tail; + tail = current; + tok = strtok(NULL, " ,"); + } + + ruledef->data = (void*)tail; + continue; + + } + else if(strcmp(tok,"at_times") == 0) + { + + tok = strtok(NULL, " ,"); + TIMERANGE *tr = NULL; + while(tok){ + TIMERANGE *tmp = parse_time(tok,instance); + + if(IS_RVRS_TIME(tmp)){ + tmp = split_reverse_time(tmp); + } + tmp->next = tr; + tr = tmp; + tok = strtok(NULL, " ,"); + } + ruledef->active = tr; + } + else if(strcmp(tok,"regex") == 0) + { + bool escaped = false; + regex_t *re; + char* start, *str; + tok = strtok(NULL," "); + + while(*tok == '\'' || *tok == '"'){ + tok++; + } + + start = tok; + + while(isspace(*tok) || *tok == '\'' || *tok == '"'){ + tok++; + } + + while(true){ + + if((*tok == '\'' || *tok == '"') && !escaped){ + break; + } + escaped = (*tok == '\\'); + tok++; + } + + str = calloc(((tok - start) + 1),sizeof(char)); + re = (regex_t*)malloc(sizeof(regex_t)); + + if(re == NULL || str == NULL){ + skygw_log_write_flush(LOGFILE_ERROR, "Fatal Error: malloc returned NULL."); + + return; + } + + memcpy(str, start, (tok-start)); + + if(regcomp(re, str,REG_NOSUB)){ + skygw_log_write(LOGFILE_ERROR, "fwfilter: Invalid regular expression '%s'.", str); + free(re); + } + + ruledef->type = RT_REGEX; + ruledef->data = (void*) re; + free(str); + + } + else if(strcmp(tok,"limit_queries") == 0) + { + + QUERYSPEED* qs = (QUERYSPEED*)calloc(1,sizeof(QUERYSPEED)); + + spinlock_acquire(instance->lock); + qs->id = ++instance->idgen; + spinlock_release(instance->lock); + + tok = strtok(NULL," "); + qs->limit = atoi(tok); + + tok = strtok(NULL," "); + qs->period = atof(tok); + tok = strtok(NULL," "); + qs->cooldown = atof(tok); + ruledef->type = RT_THROTTLE; + ruledef->data = (void*)qs; + } + else if(strcmp(tok,"no_where_clause") == 0) + { + ruledef->type = RT_CLAUSE; + ruledef->data = (void*)mode; + } + else if(strcmp(tok,"on_operations") == 0) + { + tok = strtok(NULL," "); + if(!parse_querytypes(tok,ruledef)){ + skygw_log_write(LOGFILE_ERROR, + "fwfilter: Invalid query type" + "requirements on where/having clauses: %s." + ,tok); + } + } + tok = strtok(NULL," ,"); + } + + goto retblock; + } + + retblock: + free(rulecpy); + +} + +/** + * Create an instance of the filter for a particular service + * within MaxScale. + * + * @param options The options for this filter + * + * @return The instance data for this new instance + */ +static FILTER * +createInstance(char **options, FILTER_PARAMETER **params) +{ + FW_INSTANCE *my_instance; + int i; + HASHTABLE* ht; + STRLINK *ptr,*tmp; + char *filename = NULL, *nl; + char buffer[2048]; + FILE* file; + + if ((my_instance = calloc(1, sizeof(FW_INSTANCE))) == NULL || + (my_instance->lock = (SPINLOCK*)malloc(sizeof(SPINLOCK))) == NULL){ + return NULL; + } + + spinlock_init(my_instance->lock); + + if((ht = hashtable_alloc(7, hashkeyfun, hashcmpfun)) == NULL){ + skygw_log_write(LOGFILE_ERROR, "Unable to allocate hashtable."); + free(my_instance); + return NULL; + } + + hashtable_memory_fns(ht,hstrdup,NULL,hstrfree,hrulefree); + + my_instance->htable = ht; + my_instance->def_op = true; + + for(i = 0;params[i];i++){ + if(strcmp(params[i]->name, "rules") == 0){ + filename = strdup(params[i]->value); + } + } + + if((file = fopen(filename,"rb")) == NULL ){ + free(my_instance); + free(filename); + return NULL; + } + + free(filename); + + while(!feof(file)) + { + + if(fgets(buffer,2048,file) == NULL){ + if(ferror(file)){ + free(my_instance); + return NULL; + } + + if(feof(file)){ + break; + } + } + + if((nl = strchr(buffer,'\n')) != NULL && ((char*)nl - (char*)buffer) < 2048){ + *nl = '\0'; + } + parse_rule(buffer,my_instance); + + } + + fclose(file); + + /**Apply the rules to users*/ + ptr = my_instance->userstrings; + while(ptr){ + link_rules(ptr->value,my_instance); + tmp = ptr; + ptr = ptr->next; + free(tmp->value); + free(tmp); + } + + return (FILTER *)my_instance; +} + + + + +/** + * Associate a new session with this instance of the filter and opens + * a connection to the server and prepares the exchange and the queue for use. + * + * + * @param instance The filter instance data + * @param session The session itself + * @return Session specific data for this session + */ +static void * +newSession(FILTER *instance, SESSION *session) +{ + FW_SESSION *my_session; + + if ((my_session = calloc(1, sizeof(FW_SESSION))) == NULL){ + return NULL; + } + my_session->session = session; + return my_session; +} + + + +/** + * Close a session with the filter, this is the mechanism + * by which a filter may cleanup data structure etc. + * + * @param instance The filter instance data + * @param session The session being closed + */ +static void +closeSession(FILTER *instance, void *session) +{ +} + +/** + * Free the memory associated with the session + * + * @param instance The filter instance + * @param session The filter session + */ +static void +freeSession(FILTER *instance, void *session) +{ + FW_SESSION *my_session = (FW_SESSION *)session; + if(my_session->errmsg){ + free(my_session->errmsg); + + } + free(my_session); +} + +/** + * Set the downstream filter or router to which queries will be + * passed from this filter. + * + * @param instance The filter instance data + * @param session The filter session + * @param downstream The downstream filter or router. + */ +static void +setDownstream(FILTER *instance, void *session, DOWNSTREAM *downstream) +{ + FW_SESSION *my_session = (FW_SESSION *)session; + my_session->down = *downstream; +} + +/** + * Generates a dummy error packet for the client with a custom message. + * @param session The FW_SESSION object + * @param msg Custom error message for the packet. + * @return The dummy packet or NULL if an error occurred + */ +GWBUF* gen_dummy_error(FW_SESSION* session, char* msg) +{ + GWBUF* buf; + char* errmsg; + DCB* dcb = session->session->client; + MYSQL_session* mysql_session = (MYSQL_session*)session->session->data; + unsigned int errlen; + + errlen = msg != NULL ? strlen(msg) : 0; + errmsg = (char*)malloc((512 + errlen)*sizeof(char)); + + if(errmsg == NULL){ + skygw_log_write_flush(LOGFILE_ERROR, "Fatal Error: malloc returned NULL."); + return NULL; + } + + + if(mysql_session->db[0] == '\0') + { + sprintf(errmsg, + "Access denied for user '%s'@'%s'", + dcb->user, + dcb->remote); + }else + { + sprintf(errmsg, + "Access denied for user '%s'@'%s' to database '%s'", + dcb->user, + dcb->remote, + mysql_session->db); + } + + if(msg != NULL){ + char* ptr = strchr(errmsg,'\0'); + sprintf(ptr,": %s",msg); + + } + + buf = modutil_create_mysql_err_msg(1,0,1141,"HY000", (const char*)errmsg); + + return buf; +} + +/** + * Checks if the timerange object is active. + * @return Whether the timerange is active + */ +bool inside_timerange(TIMERANGE* comp) +{ + + struct tm* tm_now; + struct tm tm_before,tm_after; + time_t before,after,now, time_now; + double to_before,to_after; + + time(&time_now); + tm_now = localtime(&time_now); + memcpy(&tm_before,tm_now,sizeof(struct tm)); + memcpy(&tm_after,tm_now,sizeof(struct tm)); + + + tm_before.tm_sec = comp->start.tm_sec; + tm_before.tm_min = comp->start.tm_min; + tm_before.tm_hour = comp->start.tm_hour; + tm_after.tm_sec = comp->end.tm_sec; + tm_after.tm_min = comp->end.tm_min; + tm_after.tm_hour = comp->end.tm_hour; + + + before = mktime(&tm_before); + after = mktime(&tm_after); + now = mktime(tm_now); + to_before = difftime(now,before); + to_after = difftime(now,after); + + if(to_before > 0.0 && to_after < 0.0){ + return true; + } + return false; +} + +/** + * Checks for active timeranges for a given rule. + * @param rule Pointer to a RULE object + * @return true if the rule is active + */ +bool rule_is_active(RULE* rule) +{ + TIMERANGE* times; + if(rule->active != NULL){ + + times = (TIMERANGE*)rule->active; + + while(times){ + + if(inside_timerange(times)){ + return true; + } + + times = times->next; + } + return false; + } + return true; +} + +/** + * Check if a query matches a single rule + * @param my_instance Fwfilter instance + * @param my_session Fwfilter session + * @param queue The GWBUF containing the query + * @param rulelist The rule to check + * @param query Pointer to the null-terminated query string + * @return true if the query matches the rule + */ +bool rule_matches(FW_INSTANCE* my_instance, FW_SESSION* my_session, GWBUF *queue, USER* user, RULELIST *rulelist, char* query) +{ + char *ptr,*where,*msg = NULL; + char emsg[512]; + int qlen; + bool is_sql, is_real, matches; + skygw_query_op_t optype; + STRLINK* strln = NULL; + QUERYSPEED* queryspeed = NULL; + QUERYSPEED* rule_qs = NULL; + time_t time_now; + struct tm* tm_now; + + if(my_session->errmsg){ + free(my_session->errmsg); + my_session->errmsg = NULL; + } + + time(&time_now); + tm_now = localtime(&time_now); + + matches = false; + is_sql = modutil_is_SQL(queue); + + if(is_sql){ + if(!query_is_parsed(queue)){ + parse_query(queue); + } + optype = query_classifier_get_operation(queue); + modutil_extract_SQL(queue, &ptr, &qlen); + is_real = skygw_is_real_query(queue); + } + + if(rulelist->rule->on_queries == QUERY_OP_UNDEFINED || rulelist->rule->on_queries & optype){ + + switch(rulelist->rule->type){ + + case RT_UNDEFINED: + skygw_log_write_flush(LOGFILE_ERROR, "Error: Undefined rule type found."); + break; + + case RT_REGEX: + + if(query && regexec(rulelist->rule->data,query,0,NULL,0) == 0){ + + matches = true; + + if(!rulelist->rule->allow){ + msg = strdup("Permission denied, query matched regular expression."); + skygw_log_write(LOGFILE_TRACE, "fwfilter: rule '%s': regex matched on query",rulelist->rule->name); + goto queryresolved; + }else{ + break; + } + } + + break; + + case RT_PERMISSION: + if(!rulelist->rule->allow){ + matches = true; + msg = strdup("Permission denied at this time."); + skygw_log_write(LOGFILE_TRACE, "fwfilter: rule '%s': query denied at: %s",rulelist->rule->name,asctime(tm_now)); + goto queryresolved; + }else{ + break; + } + break; + + case RT_COLUMN: + + if(is_sql && is_real){ + + strln = (STRLINK*)rulelist->rule->data; + where = skygw_get_affected_fields(queue); + + if(where != NULL){ + + while(strln){ + if(strstr(where,strln->value)){ + + matches = true; + + if(!rulelist->rule->allow){ + sprintf(emsg,"Permission denied to column '%s'.",strln->value); + skygw_log_write(LOGFILE_TRACE, "fwfilter: rule '%s': query targets forbidden column: %s",rulelist->rule->name,strln->value); + msg = strdup(emsg); + goto queryresolved; + }else{ + break; + } + } + strln = strln->next; + } + } + } + + break; + + case RT_WILDCARD: + + + if(is_sql && is_real){ + char * strptr; + where = skygw_get_affected_fields(queue); + + if(where != NULL){ + strptr = where; + }else{ + strptr = query; + } + if(strchr(strptr,'*')){ + + matches = true; + msg = strdup("Usage of wildcard denied."); + skygw_log_write(LOGFILE_TRACE, "fwfilter: rule '%s': query contains a wildcard.",rulelist->rule->name); + goto queryresolved; + } + } + + break; + + case RT_THROTTLE: + + /** + * Check if this is the first time this rule is matched and if so, allocate + * and initialize a new QUERYSPEED struct for this session. + */ + + spinlock_acquire(my_instance->lock); + rule_qs = (QUERYSPEED*)rulelist->rule->data; + spinlock_release(my_instance->lock); + + spinlock_acquire(user->lock); + queryspeed = user->qs_limit; + + + while(queryspeed){ + if(queryspeed->id == rule_qs->id){ + break; + } + queryspeed = queryspeed->next; + } + + if(queryspeed == NULL){ + + /**No match found*/ + queryspeed = (QUERYSPEED*)calloc(1,sizeof(QUERYSPEED)); + queryspeed->period = rule_qs->period; + queryspeed->cooldown = rule_qs->cooldown; + queryspeed->limit = rule_qs->limit; + queryspeed->id = rule_qs->id; + queryspeed->next = user->qs_limit; + user->qs_limit = queryspeed; + } + + if(queryspeed->count > queryspeed->limit) + { + queryspeed->triggered = time_now; + queryspeed->count = 0; + matches = true; + + + skygw_log_write(LOGFILE_TRACE, + "fwfilter: rule '%s': query limit triggered (%d queries in %f seconds), denying queries from user for %f seconds.", + rulelist->rule->name, + queryspeed->limit, + queryspeed->period, + queryspeed->cooldown); + double blocked_for = queryspeed->cooldown - difftime(time_now,queryspeed->triggered); + sprintf(emsg,"Queries denied for %f seconds",blocked_for); + msg = strdup(emsg); + } + else if(difftime(time_now,queryspeed->triggered) < queryspeed->cooldown) + { + + double blocked_for = queryspeed->cooldown - difftime(time_now,queryspeed->triggered); + + sprintf(emsg,"Queries denied for %f seconds",blocked_for); + skygw_log_write(LOGFILE_TRACE, "fwfilter: rule '%s': user denied for %f seconds",rulelist->rule->name,blocked_for); + msg = strdup(emsg); + + matches = true; + } + else if(difftime(time_now,queryspeed->first_query) < queryspeed->period) + { + queryspeed->count++; + } + else + { + queryspeed->first_query = time_now; + } + spinlock_release(user->lock); + break; + + case RT_CLAUSE: + + if(is_sql && is_real && + !skygw_query_has_clause(queue)) + { + matches = true; + msg = strdup("Required WHERE/HAVING clause is missing."); + skygw_log_write(LOGFILE_TRACE, "fwfilter: rule '%s': query has no where/having clause, query is denied.", + rulelist->rule->name); + } + break; + + default: + break; + + } + } + + queryresolved: + if(msg){ + my_session->errmsg = msg; + } + + if(matches){ + rulelist->rule->times_matched++; + } + + return matches; +} + +/** + * Check if the query matches any of the rules in the user's rulelist. + * @param my_instance Fwfilter instance + * @param my_session Fwfilter session + * @param queue The GWBUF containing the query + * @param user The user whose rulelist is checked + * @return True if the query matches at least one of the rules otherwise false + */ +bool check_match_any(FW_INSTANCE* my_instance, FW_SESSION* my_session, GWBUF *queue, USER* user) +{ + bool is_sql, rval = false; + int qlen; + char *fullquery = NULL,*ptr; + + RULELIST* rulelist; + is_sql = modutil_is_SQL(queue); + + if(is_sql){ + if(!query_is_parsed(queue)){ + parse_query(queue); + } + modutil_extract_SQL(queue, &ptr, &qlen); + fullquery = malloc((qlen + 1) * sizeof(char)); + memcpy(fullquery,ptr,qlen); + memset(fullquery + qlen,0,1); + } + + rulelist = user->rules_or; + + while(rulelist){ + + if(!rule_is_active(rulelist->rule)){ + rulelist = rulelist->next; + continue; + } + if((rval = rule_matches(my_instance,my_session,queue,user,rulelist,fullquery))){ + goto retblock; + } + rulelist = rulelist->next; + } + + retblock: + + free(fullquery); + + return rval; +} + +/** + * Check if the query matches all rules in the user's rulelist. + * @param my_instance Fwfilter instance + * @param my_session Fwfilter session + * @param queue The GWBUF containing the query + * @param user The user whose rulelist is checked + * @return True if the query matches all of the rules otherwise false + */ +bool check_match_all(FW_INSTANCE* my_instance, FW_SESSION* my_session, GWBUF *queue, USER* user) +{ + bool is_sql, rval; + int qlen; + char *fullquery = NULL,*ptr; + + RULELIST* rulelist; + is_sql = modutil_is_SQL(queue); + + if(is_sql){ + if(!query_is_parsed(queue)){ + parse_query(queue); + } + modutil_extract_SQL(queue, &ptr, &qlen); + fullquery = malloc((qlen + 1) * sizeof(char)); + memcpy(fullquery,ptr,qlen); + memset(fullquery + qlen,0,1); + + + } + + rulelist = user->rules_or; + + while(rulelist){ + + if(!rule_is_active(rulelist->rule)){ + rulelist = rulelist->next; + continue; + } + + if(!rule_matches(my_instance,my_session,queue,user,rulelist,fullquery)){ + rval = false; + goto retblock; + } + rulelist = rulelist->next; + } + + retblock: + + free(fullquery); + + return rval; +} + +/** + * The routeQuery entry point. This is passed the query buffer + * to which the filter should be applied. Once processed the + * query is passed to the downstream component + * (filter or router) in the filter chain. + * + * @param instance The filter instance data + * @param session The filter session + * @param queue The query data + */ +static int +routeQuery(FILTER *instance, void *session, GWBUF *queue) +{ + FW_SESSION *my_session = (FW_SESSION *)session; + FW_INSTANCE *my_instance = (FW_INSTANCE *)instance; + bool accept = my_instance->def_op; + char *msg = NULL, *fullquery = NULL,*ipaddr; + char uname_addr[128]; + DCB* dcb = my_session->session->client; + USER* user = NULL; + GWBUF* forward; + ipaddr = strdup(dcb->remote); + sprintf(uname_addr,"%s@%s",dcb->user,ipaddr); + + + if((user = (USER*)hashtable_fetch(my_instance->htable, uname_addr)) == NULL){ + while(user == NULL && next_ip_class(ipaddr)){ + sprintf(uname_addr,"%s@%s",dcb->user,ipaddr); + user = (USER*)hashtable_fetch(my_instance->htable, uname_addr); + } + } + + if(user == NULL){ + strcpy(ipaddr,dcb->remote); + + do{ + sprintf(uname_addr,"%%@%s",ipaddr); + user = (USER*)hashtable_fetch(my_instance->htable, uname_addr); + }while(user == NULL && next_ip_class(ipaddr)); + } + + if(user == NULL){ + + /** + *No rules matched, do default operation. + */ + + goto queryresolved; + } + + if(check_match_any(my_instance,my_session,queue,user)){ + accept = false; + goto queryresolved; + } + + if(check_match_all(my_instance,my_session,queue,user)){ + accept = false; + goto queryresolved; + } + + queryresolved: + + free(ipaddr); + free(fullquery); + + if(accept){ + + return my_session->down.routeQuery(my_session->down.instance, + my_session->down.session, queue); + }else{ + + gwbuf_free(queue); + + if(my_session->errmsg){ + msg = my_session->errmsg; + } + forward = gen_dummy_error(my_session,msg); + + if(my_session->errmsg){ + free(my_session->errmsg); + my_session->errmsg = NULL; + } + return dcb->func.write(dcb,forward); + } +} + +/** + * Diagnostics routine + * + * Prints the connection details and the names of the exchange, + * queue and the routing key. + * + * @param instance The filter instance + * @param fsession Filter session, may be NULL + * @param dcb The DCB for diagnostic output + */ +static void +diagnostic(FILTER *instance, void *fsession, DCB *dcb) +{ + FW_INSTANCE *my_instance = (FW_INSTANCE *)instance; + RULELIST* rules; + int type; + + if (my_instance) + { + spinlock_acquire(my_instance->lock); + rules = my_instance->rules; + + dcb_printf(dcb, "Firewall Filter\n"); + dcb_printf(dcb, "%-24s%-24s%-24s\n","Rule","Type","Times Matched"); + while(rules){ + if((int)rules->rule->type > 0 && + (int)rules->rule->type < sizeof(rule_names)/sizeof(char**)){ + type = (int)rules->rule->type; + }else{ + type = 0; + } + dcb_printf(dcb,"%-24s%-24s%-24d\n", + rules->rule->name, + rule_names[type], + rules->rule->times_matched); + rules = rules->next; + } + spinlock_release(my_instance->lock); + } +} diff --git a/server/modules/filter/tee.c b/server/modules/filter/tee.c index af56660db..0f3021942 100644 --- a/server/modules/filter/tee.c +++ b/server/modules/filter/tee.c @@ -16,6 +16,9 @@ * Copyright MariaDB Corporation Ab 2014 */ +#include "spinlock.h" + + /** * @file tee.c A filter that splits the processing pipeline in two * @verbatim @@ -78,9 +81,6 @@ #define PARENT 0 #define CHILD 1 -#define PTR_IS_RESULTSET(b) (b[0] == 0x01 && b[1] == 0x0 && b[2] == 0x0 && b[3] == 0x01) -#define PTR_IS_EOF(b) (b[4] == 0xfe) - static unsigned char required_packets[] = { MYSQL_COM_QUIT, MYSQL_COM_INITDB, @@ -160,6 +160,9 @@ typedef struct { FILTER_DEF* dummy_filterdef; int active; /* filter is active? */ + bool use_ok; + bool multipacket; + unsigned char command; bool waiting[2]; /* if the client is waiting for a reply */ int eof[2]; int replies[2]; /* Number of queries received */ @@ -220,6 +223,8 @@ orphan_free(void* data) { if(ptr->session->state == SESSION_STATE_TO_BE_FREED) { + + if(ptr == allOrphans) { tmp = ptr; @@ -236,6 +241,17 @@ orphan_free(void* data) tmp = ptr; } } + + } + + /* + * The session has been unlinked from all the DCBs and it is ready to be freed. + */ + + if(ptr->session->state == SESSION_STATE_STOPPING && + ptr->session->refcount == 0 && ptr->session->client == NULL) + { + ptr->session->state = SESSION_STATE_TO_BE_FREED; } #ifdef SS_DEBUG else if(ptr->session->state == SESSION_STATE_STOPPING) @@ -577,6 +593,9 @@ char *remote, *userName; my_session->branch_session = ses; my_session->branch_dcb = dcb; my_session->dummy_filterdef = dummy; + + MySQLProtocol* protocol = (MySQLProtocol*)session->client->protocol; + my_session->use_ok = protocol->client_capabilities & (1 << 6); free(dummy_upstream); } } @@ -640,15 +659,16 @@ freeSession(FILTER *instance, void *session) { TEE_SESSION *my_session = (TEE_SESSION *)session; SESSION* ses = my_session->branch_session; - +session_state_t state; if (ses != NULL) { - if (ses->state == SESSION_STATE_ROUTER_READY) + state = ses->state; + + if (state == SESSION_STATE_ROUTER_READY) { session_free(ses); } - - if (ses->state == SESSION_STATE_TO_BE_FREED) + else if (state == SESSION_STATE_TO_BE_FREED) { /** Free branch router session */ ses->service->router->freeSession( @@ -660,7 +680,7 @@ SESSION* ses = my_session->branch_session; /** This indicates that branch session is not available anymore */ my_session->branch_session = NULL; } - else if(ses->state == SESSION_STATE_STOPPING) + else if(state == SESSION_STATE_STOPPING) { orphan_session_t* orphan; if((orphan = malloc(sizeof(orphan_session_t))) == NULL) @@ -675,17 +695,14 @@ SESSION* ses = my_session->branch_session; allOrphans = orphan; spinlock_release(&orphanLock); } - if(ses->refcount == 0) - { - ss_dassert(ses->refcount == 0 && ses->client == NULL); - ses->state = SESSION_STATE_TO_BE_FREED; - } } } if (my_session->dummy_filterdef) { filter_free(my_session->dummy_filterdef); } + if(my_session->tee_replybuf) + gwbuf_free(my_session->tee_replybuf); free(session); return; @@ -746,7 +763,7 @@ TEE_SESSION *my_session = (TEE_SESSION *)session; char *ptr; int length, rval, residual = 0; GWBUF *clone = NULL; - +unsigned char command = *((unsigned char*)queue->start + 4); if (my_session->branch_session && my_session->branch_session->state == SESSION_STATE_ROUTER_READY) { @@ -787,10 +804,24 @@ GWBUF *clone = NULL; ss_dassert(my_session->tee_replybuf == NULL); + switch(command) + { + case 0x03: + case 0x16: + case 0x17: + case 0x04: + case 0x0a: + my_session->multipacket = true; + break; + default: + my_session->multipacket = false; + break; + } + memset(my_session->replies,0,2*sizeof(int)); memset(my_session->eof,0,2*sizeof(int)); - memset(my_session->waiting,0,2*sizeof(bool)); - + memset(my_session->waiting,1,2*sizeof(bool)); + my_session->command = command; rval = my_session->down.routeQuery(my_session->down.instance, my_session->down.session, queue); @@ -827,39 +858,6 @@ GWBUF *clone = NULL; return rval; } -/** - * Scans the GWBUF for EOF packets. If two packets for this session have been found - * from either the parent or the child branch, mark the response set from that branch as over. - * @param session The Tee filter session - * @param branch Parent or child branch - * @param reply Buffer to scan - */ -void -scan_resultset(TEE_SESSION *session, int branch, GWBUF *reply) -{ - unsigned char* ptr = (unsigned char*) reply->start; - unsigned char* end = (unsigned char*) reply->end; - int pktlen = 0; - - while(ptr < end) - { - pktlen = gw_mysql_get_byte3(ptr) + 4; - if(PTR_IS_EOF(ptr)) - { - session->eof[branch]++; - - if(session->eof[branch] == 2) - { - session->waiting[branch] = false; - session->eof[branch] = 0; - return; - } - } - - ptr += pktlen; - } -} - /** * The clientReply entry point. This is passed the response buffer * to which the filter should be applied. Once processed the @@ -873,28 +871,49 @@ scan_resultset(TEE_SESSION *session, int branch, GWBUF *reply) static int clientReply (FILTER* instance, void *session, GWBUF *reply) { - int rc, branch; + int rc, branch, eof; TEE_SESSION *my_session = (TEE_SESSION *) session; spinlock_acquire(&my_session->tee_lock); - + ss_dassert(my_session->active); branch = instance == NULL ? CHILD : PARENT; unsigned char *ptr = (unsigned char*)reply->start; - + if(my_session->replies[branch] == 0) { - if(PTR_IS_RESULTSET(ptr)) + /* Reply is in a single packet if it is an OK, ERR or LOCAL_INFILE packet. + * Otherwise the reply is a result set and the amount of packets is unknown. + */ + if(PTR_IS_ERR(ptr) || PTR_IS_LOCAL_INFILE(ptr) || + PTR_IS_OK(ptr) || !my_session->multipacket ) { - my_session->waiting[branch] = true; - my_session->eof[branch] = 0; + my_session->waiting[branch] = false; } +#ifdef SS_DEBUG + else + { + ss_dassert(PTR_IS_RESULTSET(ptr)); + skygw_log_write_flush(LOGFILE_DEBUG,"tee.c: Waiting for a result set from %s session.",branch == PARENT?"parent":"child"); + } + ss_dassert(PTR_IS_ERR(ptr) || PTR_IS_LOCAL_INFILE(ptr)|| + PTR_IS_OK(ptr) || my_session->waiting[branch] || + !my_session->multipacket); +#endif } - + if(my_session->waiting[branch]) { - scan_resultset(my_session,branch,reply); + + eof = modutil_count_signal_packets(reply,my_session->use_ok,my_session->eof[branch] > 0); + my_session->eof[branch] += eof; + if(my_session->eof[branch] >= 2 || + (my_session->command == 0x04 && my_session->eof[branch] > 0)) + { + ss_dassert(my_session->eof[branch] < 3) + my_session->waiting[branch] = false; + } } if(branch == PARENT) @@ -913,7 +932,19 @@ clientReply (FILTER* instance, void *session, GWBUF *reply) (my_session->branch_session == NULL || my_session->waiting[PARENT] || (!my_session->waiting[CHILD] && !my_session->waiting[PARENT]))) - { + { +#ifdef SS_DEBUG + skygw_log_write_flush(LOGFILE_DEBUG, "tee.c: Routing buffer '%p' parent(waiting [%s] replies [%d] eof[%d])" + " child(waiting [%s] replies[%d] eof [%d])", + my_session->tee_replybuf, + my_session->waiting[PARENT] ? "true":"false", + my_session->replies[PARENT], + my_session->eof[PARENT], + my_session->waiting[CHILD]?"true":"false", + my_session->replies[CHILD], + my_session->eof[CHILD]); +#endif + rc = my_session->up.clientReply ( my_session->up.instance, my_session->up.session, diff --git a/server/modules/filter/test/CMakeLists.txt b/server/modules/filter/test/CMakeLists.txt index d703bc7be..ed1f18e7e 100644 --- a/server/modules/filter/test/CMakeLists.txt +++ b/server/modules/filter/test/CMakeLists.txt @@ -11,9 +11,17 @@ add_executable(harness harness_util.c harness_common.c ${CORE}) target_link_libraries(harness_ui fullcore log_manager utils) target_link_libraries(harness fullcore) execute_process(COMMAND ${CMAKE_COMMAND} -E copy ${ERRMSG} ${CMAKE_CURRENT_BINARY_DIR}) -add_test(TestHintfilter /bin/sh -c "MAXSCALE_HOME=\"${CMAKE_BINARY_DIR}\" ${CMAKE_CURRENT_BINARY_DIR}/harness -i ${CMAKE_CURRENT_SOURCE_DIR}/hint_testing.input -o ${CMAKE_CURRENT_BINARY_DIR}/hint_testing.output -c ${CMAKE_CURRENT_SOURCE_DIR}/hint_testing.cnf -t 1 -s 1 -e ${CMAKE_CURRENT_SOURCE_DIR}/hint_testing.expected") +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/testdriver.sh ${CMAKE_CURRENT_BINARY_DIR}/testdriver.sh @ONLY) -add_test(TestRegexfilter /bin/sh -c "MAXSCALE_HOME=\"${CMAKE_BINARY_DIR}\" ${CMAKE_CURRENT_BINARY_DIR}/harness -i ${CMAKE_CURRENT_SOURCE_DIR}/regextest.input -o ${CMAKE_CURRENT_BINARY_DIR}/regextest.output -c ${CMAKE_CURRENT_SOURCE_DIR}/regextest.cnf -t 1 -s 1 -e ${CMAKE_CURRENT_SOURCE_DIR}/regextest.expected") +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/hintfilter/hint_testing.cnf ${CMAKE_CURRENT_BINARY_DIR}/hintfilter/hint_testing.cnf) +add_test(TestHintfilter testdriver.sh hintfilter/hint_testing.cnf hintfilter/hint_testing.input hintfilter/hint_testing.output hintfilter/hint_testing.expected) + +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/regexfilter/regextest.cnf ${CMAKE_CURRENT_BINARY_DIR}/regexfilter/regextest.cnf) +add_test(TestRegexfilter testdriver.sh regexfilter/regextest.cnf regexfilter/regextest.input regexfilter/regextest.output regexfilter/regextest.expected) + +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fwfilter/fwtest.cnf.in ${CMAKE_CURRENT_BINARY_DIR}/fwfilter/fwtest.cnf) +add_test(TestFwfilter1 testdriver.sh fwfilter/fwtest.cnf fwfilter/fwtest.input fwfilter/fwtest.output fwfilter/fwtest.expected) +add_test(TestFwfilter2 testdriver.sh fwfilter/fwtest.cnf fwfilter/fwtest2.input fwfilter/fwtest2.output fwfilter/fwtest2.expected) add_test(TestTeeRecursion ${CMAKE_CURRENT_SOURCE_DIR}/tee_recursion.sh ${CMAKE_BINARY_DIR} @@ -22,3 +30,7 @@ add_test(TestTeeRecursion ${CMAKE_CURRENT_SOURCE_DIR}/tee_recursion.sh ${TEST_PASSWORD} ${TEST_HOST} ${TEST_PORT}) + +set_tests_properties(TestHintfilter TestRegexfilter TestFwfilter1 TestFwfilter2 TestTeeRecursion +PROPERTIES +ENVIRONMENT MAXSCALE_HOME=${CMAKE_BINARY_DIR}/) diff --git a/server/modules/filter/test/fwfilter/fwtest.cnf.in b/server/modules/filter/test/fwfilter/fwtest.cnf.in new file mode 100755 index 000000000..13a1504b9 --- /dev/null +++ b/server/modules/filter/test/fwfilter/fwtest.cnf.in @@ -0,0 +1,4 @@ +[Firewall] +type=filter +module=fwfilter +rules=@CMAKE_CURRENT_SOURCE_DIR@/rules diff --git a/server/modules/filter/test/fwfilter/fwtest.expected b/server/modules/filter/test/fwfilter/fwtest.expected new file mode 100755 index 000000000..b10d04a79 --- /dev/null +++ b/server/modules/filter/test/fwfilter/fwtest.expected @@ -0,0 +1,8 @@ +select id from t1; +select id from t1; +select id from t1; +select id from t1; +select id from t1; +select id from t1; +select id from t1; +select id from t1; \ No newline at end of file diff --git a/server/modules/filter/test/fwfilter/fwtest.input b/server/modules/filter/test/fwfilter/fwtest.input new file mode 100755 index 000000000..7b9a5071c --- /dev/null +++ b/server/modules/filter/test/fwfilter/fwtest.input @@ -0,0 +1,10 @@ +delete from t1; +select id from t1; +select id from t1; +select id from t1; +delete from t1; +select id from t1; +select id from t1; +select id from t1; +select id from t1; +select id from t1; \ No newline at end of file diff --git a/server/modules/filter/test/fwfilter/fwtest.output b/server/modules/filter/test/fwfilter/fwtest.output new file mode 100755 index 000000000..b10d04a79 --- /dev/null +++ b/server/modules/filter/test/fwfilter/fwtest.output @@ -0,0 +1,8 @@ +select id from t1; +select id from t1; +select id from t1; +select id from t1; +select id from t1; +select id from t1; +select id from t1; +select id from t1; \ No newline at end of file diff --git a/server/modules/filter/test/fwfilter/fwtest2.expected b/server/modules/filter/test/fwfilter/fwtest2.expected new file mode 100755 index 000000000..b10d04a79 --- /dev/null +++ b/server/modules/filter/test/fwfilter/fwtest2.expected @@ -0,0 +1,8 @@ +select id from t1; +select id from t1; +select id from t1; +select id from t1; +select id from t1; +select id from t1; +select id from t1; +select id from t1; \ No newline at end of file diff --git a/server/modules/filter/test/fwfilter/fwtest2.input b/server/modules/filter/test/fwfilter/fwtest2.input new file mode 100755 index 000000000..9cd95178d --- /dev/null +++ b/server/modules/filter/test/fwfilter/fwtest2.input @@ -0,0 +1,10 @@ +select id from t1; +select id from t1 union select name from t2; +select id from t1; +select id from t1; +select id from t1; +select id from t1; +select id from t1; +select id from t1 union select name from t2; +select id from t1; +select id from t1; \ No newline at end of file diff --git a/server/modules/filter/test/harness_common.c b/server/modules/filter/test/harness_common.c index 20fe81c49..026f46234 100644 --- a/server/modules/filter/test/harness_common.c +++ b/server/modules/filter/test/harness_common.c @@ -10,7 +10,7 @@ int dcbfun(struct dcb* dcb, GWBUF * buffer) int harness_init(int argc, char** argv, HARNESS_INSTANCE** inst){ - int i = 0; + int i = 0,rval = 0; MYSQL_session* mysqlsess; DCB* dcb; char cwd[1024]; @@ -60,7 +60,7 @@ int harness_init(int argc, char** argv, HARNESS_INSTANCE** inst){ skygw_logmanager_init( 3, optstr); free(optstr); - process_opts(argc,argv); + rval = process_opts(argc,argv); if(!(instance.thrpool = malloc(instance.thrcount * sizeof(pthread_t)))){ printf("Error: Out of memory\n"); @@ -72,10 +72,10 @@ int harness_init(int argc, char** argv, HARNESS_INSTANCE** inst){ pthread_mutex_lock(&instance.work_mtx); size_t thr_num = 1; for(i = 0;iexpected){ + + if(inst->expected > 0){ return compare_files(inst->outfile,inst->expected); } return 0; diff --git a/server/modules/filter/test/hint_testing.cnf b/server/modules/filter/test/hintfilter/hint_testing.cnf similarity index 100% rename from server/modules/filter/test/hint_testing.cnf rename to server/modules/filter/test/hintfilter/hint_testing.cnf diff --git a/server/modules/filter/test/hint_testing.expected b/server/modules/filter/test/hintfilter/hint_testing.expected similarity index 100% rename from server/modules/filter/test/hint_testing.expected rename to server/modules/filter/test/hintfilter/hint_testing.expected diff --git a/server/modules/filter/test/hint_testing.input b/server/modules/filter/test/hintfilter/hint_testing.input similarity index 100% rename from server/modules/filter/test/hint_testing.input rename to server/modules/filter/test/hintfilter/hint_testing.input diff --git a/server/modules/filter/test/hint_tests.sh b/server/modules/filter/test/hintfilter/hint_tests.sh similarity index 100% rename from server/modules/filter/test/hint_tests.sh rename to server/modules/filter/test/hintfilter/hint_tests.sh diff --git a/server/modules/filter/test/regextest.cnf b/server/modules/filter/test/regexfilter/regextest.cnf similarity index 100% rename from server/modules/filter/test/regextest.cnf rename to server/modules/filter/test/regexfilter/regextest.cnf diff --git a/server/modules/filter/test/regextest.expected b/server/modules/filter/test/regexfilter/regextest.expected similarity index 100% rename from server/modules/filter/test/regextest.expected rename to server/modules/filter/test/regexfilter/regextest.expected diff --git a/server/modules/filter/test/regextest.input b/server/modules/filter/test/regexfilter/regextest.input similarity index 100% rename from server/modules/filter/test/regextest.input rename to server/modules/filter/test/regexfilter/regextest.input diff --git a/server/modules/filter/test/rules b/server/modules/filter/test/rules new file mode 100644 index 000000000..050386139 --- /dev/null +++ b/server/modules/filter/test/rules @@ -0,0 +1,4 @@ +rule union_regex deny regex '.*union.*' +rule dont_delete_everything deny no_where_clause on_operations delete|update +rule no_wildcard deny wildcard +users %@% match any rules union_regex dont_delete_everything no_wildcard diff --git a/server/modules/filter/test/testdriver.sh b/server/modules/filter/test/testdriver.sh new file mode 100755 index 000000000..1b34f3781 --- /dev/null +++ b/server/modules/filter/test/testdriver.sh @@ -0,0 +1,11 @@ +#! /bin/bash +if [[ $# -lt 4 ]] +then + echo "Usage: $0 " + exit 1 +fi + +TESTDIR=@CMAKE_CURRENT_BINARY_DIR@ +SRCDIR=@CMAKE_CURRENT_SOURCE_DIR@ +$TESTDIR/harness -i $SRCDIR/$2 -o $TESTDIR/$3 -c $TESTDIR/$1 -t 1 -s 1 -e $SRCDIR/$4 +exit $? diff --git a/server/modules/protocol/mysql_backend.c b/server/modules/protocol/mysql_backend.c index ca3708c2f..4fb6c15b9 100644 --- a/server/modules/protocol/mysql_backend.c +++ b/server/modules/protocol/mysql_backend.c @@ -1494,12 +1494,17 @@ retblock: * and lengths have been noticed and counted. * Session commands need to be marked so that they can be handled properly in * the router's clientReply. - * Return the pointer to outbuf. + * + * @param dcb Backend's DCB where data was read from + * @param readbuf GWBUF where data was read to + * @param nbytes_to_process Number of bytes that has been read and need to be processed + * + * @return GWBUF which includes complete MySQL packet */ static GWBUF* process_response_data ( DCB* dcb, GWBUF* readbuf, - int nbytes_to_process) /*< number of new bytes read */ + int nbytes_to_process) { int npackets_left = 0; /*< response's packet count */ ssize_t nbytes_left = 0; /*< nbytes to be read for the packet */