diff --git a/query_classifier/query_classifier.cc b/query_classifier/query_classifier.cc index 52cb11881..2078b82f3 100644 --- a/query_classifier/query_classifier.cc +++ b/query_classifier/query_classifier.cc @@ -1095,7 +1095,8 @@ char** skygw_get_table_names(GWBUF* querybuf, int* tblsize, bool fullnames) lex->current_select = lex->current_select->next_select_in_list(); } /*< while(lex->current_select) */ retblock: - *tblsize = i; + if(tblsize) + *tblsize = i; return tables; } @@ -1206,6 +1207,7 @@ inline void add_str(char** buf, int* buflen, int* bufsize, char* str) } if(*buflen > 0){ + if(*buf) strcat(*buf," "); } strcat(*buf,str); diff --git a/server/core/modutil.c b/server/core/modutil.c index fd7fb3b68..33c302e7f 100644 --- a/server/core/modutil.c +++ b/server/core/modutil.c @@ -494,38 +494,92 @@ return_packetbuf: return packetbuf; } +/** + * Parse the buffer and split complete packets into individual buffers. + * Any partial packets are left in the old buffer. + * @param p_readbuf Buffer to split + * @return Head of the chain of complete packets + */ +GWBUF* modutil_get_complete_packets(GWBUF** p_readbuf) +{ + GWBUF *buff = NULL, *packet = NULL; + + while((packet = modutil_get_next_MySQL_packet(p_readbuf)) != NULL) + { + buff = gwbuf_append(buff,packet); + } + + return buff; +} /** - * Count the number of EOF, OK or ERR packets in the buffer. + * Count the number of EOF, OK or ERR packets in the buffer. Only complete + * packets are inspected and the buffer is assumed to only contain whole packets. + * If partial packets are in the buffer, they are ingnored. The caller must handle the + * detection of partial packets in buffers. * @param reply Buffer to use * @param use_ok Whether the DEPRECATE_EOF flag is set * @param n_found If there were previous packets found * @return Number of EOF packets */ int -modutil_count_signal_packets(GWBUF *reply,int use_ok, int n_found) +modutil_count_signal_packets(GWBUF *reply, int use_ok, int n_found) { unsigned char* ptr = (unsigned char*) reply->start; unsigned char* end = (unsigned char*) reply->end; - int pktlen,pkt = 0; - + unsigned char* prev = ptr; + int pktlen, eof = 0, err = 0, found = n_found; + int errlen = 0, eoflen = 0; + int iserr = 0, iseof = 0; while(ptr < end) { - pktlen = gw_mysql_get_byte3(ptr) + 4; - - if(PTR_IS_ERR(ptr) || (PTR_IS_EOF(ptr) && !use_ok) || (use_ok && PTR_IS_OK(ptr))) + + pktlen = MYSQL_GET_PACKET_LEN(ptr) + 4; + + if((iserr = PTR_IS_ERR(ptr)) || (iseof = PTR_IS_EOF(ptr))) { - if(n_found) + if(iserr) { - if(ptr + pktlen >= end) - pkt++; + err++; + errlen = pktlen; } - else + else if(iseof) { - pkt++; + eof++; + eoflen = pktlen; } } + + if((ptr + pktlen) > end) + { + ptr = prev; + break; + } + + prev = ptr; ptr += pktlen; } - return pkt; + + + /* + * If there were new EOF/ERR packets found, make sure that they are the last + * packet in the buffer. + */ + if((eof || err) && n_found) + { + if(err) + { + ptr -= errlen; + if(!PTR_IS_ERR(ptr)) + err = 0; + } + else + { + ptr -= eoflen; + if(!PTR_IS_EOF(ptr)) + eof = 0; + } + } + + return(eof + err); } diff --git a/server/include/modutil.h b/server/include/modutil.h index 2ccab523c..e287b080e 100644 --- a/server/include/modutil.h +++ b/server/include/modutil.h @@ -48,6 +48,7 @@ extern GWBUF *modutil_replace_SQL(GWBUF *, char *); extern char *modutil_get_query(GWBUF* buf); extern int modutil_send_mysql_err_packet(DCB *, int, int, int, const char *, const char *); GWBUF* modutil_get_next_MySQL_packet(GWBUF** p_readbuf); +GWBUF* modutil_get_complete_packets(GWBUF** p_readbuf); int modutil_MySQL_query_len(GWBUF* buf, int* nbytes_missing); GWBUF *modutil_create_mysql_err_msg( diff --git a/server/modules/filter/fwfilter.c b/server/modules/filter/fwfilter.c index 2a15d126f..2e2c5bd44 100644 --- a/server/modules/filter/fwfilter.c +++ b/server/modules/filter/fwfilter.c @@ -686,11 +686,13 @@ void link_rules(char* rule, FW_INSTANCE* instance) user = (USER*)calloc(1,sizeof(USER)); if(user == NULL){ + free(rulelist); return; } if((user->lock = (SPINLOCK*)malloc(sizeof(SPINLOCK))) == NULL){ free(user); + free(rulelist); return; } @@ -750,7 +752,21 @@ void parse_rule(char* rule, FW_INSTANCE* instance) RULELIST* rlist = NULL; ruledef = (RULE*)calloc(1,sizeof(RULE)); + + if(ruledef == NULL) + { + skygw_log_write(LOGFILE_ERROR,"Error : Memory allocation failed."); + goto retblock; + } + rlist = (RULELIST*)calloc(1,sizeof(RULELIST)); + + if(rlist == NULL) + { + free(ruledef); + skygw_log_write(LOGFILE_ERROR,"Error : Memory allocation failed."); + goto retblock; + } ruledef->name = strdup(tok); ruledef->type = RT_UNDEFINED; ruledef->on_queries = QUERY_OP_UNDEFINED; @@ -843,12 +859,17 @@ void parse_rule(char* rule, FW_INSTANCE* instance) } str = calloc(((tok - start) + 1),sizeof(char)); + if(str == NULL) + { + skygw_log_write_flush(LOGFILE_ERROR, "Fatal Error: malloc returned NULL."); + goto retblock; + } re = (regex_t*)malloc(sizeof(regex_t)); - if(re == NULL || str == NULL){ + if(re == NULL){ skygw_log_write_flush(LOGFILE_ERROR, "Fatal Error: malloc returned NULL."); - - return; + free(str); + goto retblock; } memcpy(str, start, (tok-start)); @@ -857,9 +878,11 @@ void parse_rule(char* rule, FW_INSTANCE* instance) skygw_log_write(LOGFILE_ERROR, "fwfilter: Invalid regular expression '%s'.", str); free(re); } - - ruledef->type = RT_REGEX; - ruledef->data = (void*) re; + else + { + ruledef->type = RT_REGEX; + ruledef->data = (void*) re; + } free(str); } @@ -929,6 +952,7 @@ createInstance(char **options, FILTER_PARAMETER **params) if ((my_instance = calloc(1, sizeof(FW_INSTANCE))) == NULL || (my_instance->lock = (SPINLOCK*)malloc(sizeof(SPINLOCK))) == NULL){ + skygw_log_write(LOGFILE_ERROR, "Memory allocation for firewall filter failed."); return NULL; } @@ -947,11 +971,23 @@ createInstance(char **options, FILTER_PARAMETER **params) for(i = 0;params[i];i++){ if(strcmp(params[i]->name, "rules") == 0){ + + if(filename) + free(filename); + filename = strdup(params[i]->value); } } - + + if(filename == NULL) + { + skygw_log_write(LOGFILE_ERROR, "Unable to find rule file for firewall filter."); + free(my_instance); + return NULL; + } + if((file = fopen(filename,"rb")) == NULL ){ + skygw_log_write(LOGFILE_ERROR, "Error while opening rule file for firewall filter."); free(my_instance); free(filename); return NULL; @@ -964,6 +1000,8 @@ createInstance(char **options, FILTER_PARAMETER **params) if(fgets(buffer,2048,file) == NULL){ if(ferror(file)){ + skygw_log_write(LOGFILE_ERROR, "Error while reading rule file for firewall filter."); + fclose(file); free(my_instance); return NULL; } @@ -971,13 +1009,13 @@ createInstance(char **options, FILTER_PARAMETER **params) if(feof(file)){ break; } - } - + } + if((nl = strchr(buffer,'\n')) != NULL && ((char*)nl - (char*)buffer) < 2048){ *nl = '\0'; } + parse_rule(buffer,my_instance); - } fclose(file); @@ -1074,16 +1112,26 @@ setDownstream(FILTER *instance, void *session, DOWNSTREAM *downstream) GWBUF* gen_dummy_error(FW_SESSION* session, char* msg) { GWBUF* buf; - char* errmsg; - DCB* dcb = session->session->client; - MYSQL_session* mysql_session = (MYSQL_session*)session->session->data; + char* errmsg; + DCB* dcb; + MYSQL_session* mysql_session; unsigned int errlen; + + if(session == NULL || session->session == NULL || + session->session->data == NULL || + session->session->client == NULL) + { + skygw_log_write_flush(LOGFILE_ERROR, "Error : Firewall filter session missing data."); + return NULL; + } + dcb = session->session->client; + mysql_session = (MYSQL_session*)session->session->data; errlen = msg != NULL ? strlen(msg) : 0; errmsg = (char*)malloc((512 + errlen)*sizeof(char)); if(errmsg == NULL){ - skygw_log_write_flush(LOGFILE_ERROR, "Fatal Error: malloc returned NULL."); + skygw_log_write_flush(LOGFILE_ERROR, "Fatal Error: Memory allocation failed."); return NULL; } @@ -1110,7 +1158,8 @@ GWBUF* gen_dummy_error(FW_SESSION* session, char* msg) } buf = modutil_create_mysql_err_msg(1,0,1141,"HY000", (const char*)errmsg); - + free(errmsg); + return buf; } @@ -1192,7 +1241,7 @@ bool rule_matches(FW_INSTANCE* my_instance, FW_SESSION* my_session, GWBUF *queue char emsg[512]; int qlen; bool is_sql, is_real, matches; - skygw_query_op_t optype; + skygw_query_op_t optype = QUERY_OP_UNDEFINED; STRLINK* strln = NULL; QUERYSPEED* queryspeed = NULL; QUERYSPEED* rule_qs = NULL; @@ -1469,7 +1518,7 @@ bool check_match_any(FW_INSTANCE* my_instance, FW_SESSION* my_session, GWBUF *qu */ bool check_match_all(FW_INSTANCE* my_instance, FW_SESSION* my_session, GWBUF *queue, USER* user) { - bool is_sql, rval; + bool is_sql, rval = 0; int qlen; char *fullquery = NULL,*ptr; diff --git a/server/modules/filter/tee.c b/server/modules/filter/tee.c index 0f3021942..17331e21a 100644 --- a/server/modules/filter/tee.c +++ b/server/modules/filter/tee.c @@ -161,7 +161,7 @@ typedef struct { FILTER_DEF* dummy_filterdef; int active; /* filter is active? */ bool use_ok; - bool multipacket; + bool multipacket[2]; unsigned char command; bool waiting[2]; /* if the client is waiting for a reply */ int eof[2]; @@ -172,7 +172,11 @@ typedef struct { int n_rejected; /* Number of rejected queries */ int residual; /* Any outstanding SQL text */ GWBUF* tee_replybuf; /* Buffer for reply */ + GWBUF* tee_partials[2]; SPINLOCK tee_lock; +#ifdef SS_DEBUG + long d_id; +#endif } TEE_SESSION; typedef struct orphan_session_tt @@ -181,6 +185,11 @@ typedef struct orphan_session_tt struct orphan_session_tt* next; }orphan_session_t; +#ifdef SS_DEBUG +static SPINLOCK debug_lock; +static long debug_id = 0; +#endif + static orphan_session_t* allOrphans = NULL; static SPINLOCK orphanLock; @@ -283,7 +292,9 @@ orphan_free(void* data) while(finished) { +#ifdef SS_DEBUG o_freed++; +#endif tmp = finished; finished = finished->next; @@ -320,7 +331,10 @@ void ModuleInit() { spinlock_init(&orphanLock); - hktask_add("tee orphan cleanup",orphan_free,NULL,15); + //hktask_add("tee orphan cleanup",orphan_free,NULL,15); +#ifdef SS_DEBUG + spinlock_init(&debug_lock); +#endif } /** @@ -705,6 +719,8 @@ session_state_t state; gwbuf_free(my_session->tee_replybuf); free(session); + orphan_free(NULL); + return; } /** @@ -811,10 +827,10 @@ unsigned char command = *((unsigned char*)queue->start + 4); case 0x17: case 0x04: case 0x0a: - my_session->multipacket = true; + memset(my_session->multipacket,(char)true,2*sizeof(bool)); break; default: - my_session->multipacket = false; + memset(my_session->multipacket,(char)false,2*sizeof(bool)); break; } @@ -822,6 +838,21 @@ unsigned char command = *((unsigned char*)queue->start + 4); memset(my_session->eof,0,2*sizeof(int)); memset(my_session->waiting,1,2*sizeof(bool)); my_session->command = command; +#ifdef SS_DEBUG + spinlock_acquire(&debug_lock); + my_session->d_id = ++debug_id; + skygw_log_write_flush(LOGFILE_DEBUG,"tee.c [%d] command [%x]", + my_session->d_id, + my_session->command); + if(command == 0x03) + { + char* tmpstr = modutil_get_SQL(queue); + skygw_log_write_flush(LOGFILE_DEBUG,"tee.c query: '%s'", + tmpstr); + free(tmpstr); + } + spinlock_release(&debug_lock); +#endif rval = my_session->down.routeQuery(my_session->down.instance, my_session->down.session, queue); @@ -871,93 +902,159 @@ unsigned char command = *((unsigned char*)queue->start + 4); static int clientReply (FILTER* instance, void *session, GWBUF *reply) { - int rc, branch, eof; - TEE_SESSION *my_session = (TEE_SESSION *) session; + int rc, branch, eof; + TEE_SESSION *my_session = (TEE_SESSION *) session; + bool route = false,mpkt; + GWBUF *complete = NULL; + unsigned char *ptr; + int min_eof = my_session->command != 0x04 ? 2 : 1; + + spinlock_acquire(&my_session->tee_lock); - spinlock_acquire(&my_session->tee_lock); + ss_dassert(my_session->active); - ss_dassert(my_session->active); + branch = instance == NULL ? CHILD : PARENT; - branch = instance == NULL ? CHILD : PARENT; - unsigned char *ptr = (unsigned char*)reply->start; + my_session->tee_partials[branch] = gwbuf_append(my_session->tee_partials[branch], reply); + my_session->tee_partials[branch] = gwbuf_make_contiguous(my_session->tee_partials[branch]); + complete = modutil_get_complete_packets(&my_session->tee_partials[branch]); + complete = gwbuf_make_contiguous(complete); - if(my_session->replies[branch] == 0) - { - /* Reply is in a single packet if it is an OK, ERR or LOCAL_INFILE packet. - * Otherwise the reply is a result set and the amount of packets is unknown. - */ - if(PTR_IS_ERR(ptr) || PTR_IS_LOCAL_INFILE(ptr) || - PTR_IS_OK(ptr) || !my_session->multipacket ) - { - my_session->waiting[branch] = false; - } + if(my_session->tee_partials[branch] && + GWBUF_EMPTY(my_session->tee_partials[branch])) + { + gwbuf_free(my_session->tee_partials[branch]); + my_session->tee_partials[branch] = NULL; + } + + ptr = (unsigned char*) complete->start; + + if(my_session->replies[branch] == 0) + { + /* Reply is in a single packet if it is an OK, ERR or LOCAL_INFILE packet. + * Otherwise the reply is a result set and the amount of packets is unknown. + */ + if(PTR_IS_ERR(ptr) || PTR_IS_LOCAL_INFILE(ptr) || + PTR_IS_OK(ptr) || !my_session->multipacket[branch] ) + { + my_session->waiting[branch] = false; + my_session->multipacket[branch] = false; + } #ifdef SS_DEBUG - else - { - ss_dassert(PTR_IS_RESULTSET(ptr)); - skygw_log_write_flush(LOGFILE_DEBUG,"tee.c: Waiting for a result set from %s session.",branch == PARENT?"parent":"child"); - } - ss_dassert(PTR_IS_ERR(ptr) || PTR_IS_LOCAL_INFILE(ptr)|| - PTR_IS_OK(ptr) || my_session->waiting[branch] || - !my_session->multipacket); + else + { + ss_dassert(PTR_IS_RESULTSET(ptr)); + skygw_log_write_flush(LOGFILE_DEBUG,"tee.c: [%d] Waiting for a result set from %s session.", + my_session->d_id, + branch == PARENT?"parent":"child"); + } + ss_dassert(PTR_IS_ERR(ptr) || PTR_IS_LOCAL_INFILE(ptr)|| + PTR_IS_OK(ptr) || my_session->waiting[branch] || + !my_session->multipacket); #endif - } + } - if(my_session->waiting[branch]) - { - - eof = modutil_count_signal_packets(reply,my_session->use_ok,my_session->eof[branch] > 0); - my_session->eof[branch] += eof; - if(my_session->eof[branch] >= 2 || - (my_session->command == 0x04 && my_session->eof[branch] > 0)) - { - ss_dassert(my_session->eof[branch] < 3) - my_session->waiting[branch] = false; - } - } + if(my_session->waiting[branch]) + { + + eof = modutil_count_signal_packets(complete,my_session->use_ok,my_session->eof[branch] > 0); + my_session->eof[branch] += eof; + if(my_session->eof[branch] >= min_eof) + { +#ifdef SS_DEBUG + skygw_log_write_flush(LOGFILE_DEBUG,"tee.c [%d] %s received last EOF packet", + my_session->d_id, + branch == PARENT?"parent":"child"); +#endif + ss_dassert(my_session->eof[branch] < 3) + my_session->waiting[branch] = false; + } + } - if(branch == PARENT) - { - ss_dassert(my_session->tee_replybuf == NULL) - my_session->tee_replybuf = reply; - } - else - { - gwbuf_free(reply); - } + if(branch == PARENT) + { + ss_dassert(my_session->tee_replybuf == NULL); + my_session->tee_replybuf = complete; + } + else + { + if(complete) + gwbuf_free(complete); + } - my_session->replies[branch]++; - - if(my_session->tee_replybuf != NULL && - (my_session->branch_session == NULL || - my_session->waiting[PARENT] || - (!my_session->waiting[CHILD] && !my_session->waiting[PARENT]))) + my_session->replies[branch]++; + rc = 1; + mpkt = my_session->multipacket[PARENT] || my_session->multipacket[CHILD]; + + + + if(my_session->tee_replybuf != NULL) + { + + if(my_session->branch_session == NULL) + { + rc = 0; + gwbuf_free(my_session->tee_replybuf); + my_session->tee_replybuf = NULL; + skygw_log_write_flush(LOGFILE_ERROR,"Error : Tee child session was closed."); + } + + if(mpkt) + { + + if(my_session->waiting[PARENT]) + { + route = true; +#ifdef SS_DEBUG + ss_dassert(my_session->replies[PARENT] < 2 || + modutil_count_signal_packets(my_session->tee_replybuf, + my_session->use_ok, + my_session->eof[PARENT]) == 0); + skygw_log_write_flush(LOGFILE_DEBUG,"tee.c:[%d] Routing partial response set.",my_session->d_id); +#endif + } + else if(my_session->eof[PARENT] == min_eof && + my_session->eof[CHILD] == min_eof) + { + route = true; +#ifdef SS_DEBUG + skygw_log_write_flush(LOGFILE_DEBUG,"tee.c:[%d] Routing final packet of response set.",my_session->d_id); +#endif + } + } + else if(!my_session->waiting[PARENT] && + !my_session->waiting[CHILD]) + { +#ifdef SS_DEBUG + skygw_log_write_flush(LOGFILE_DEBUG,"tee.c:[%d] Routing single packet response.",my_session->d_id); +#endif + route = true; + } + } + + if(route) { #ifdef SS_DEBUG - skygw_log_write_flush(LOGFILE_DEBUG, "tee.c: Routing buffer '%p' parent(waiting [%s] replies [%d] eof[%d])" - " child(waiting [%s] replies[%d] eof [%d])", - my_session->tee_replybuf, - my_session->waiting[PARENT] ? "true":"false", - my_session->replies[PARENT], - my_session->eof[PARENT], - my_session->waiting[CHILD]?"true":"false", - my_session->replies[CHILD], - my_session->eof[CHILD]); + skygw_log_write_flush(LOGFILE_DEBUG, "tee.c:[%d] Routing buffer '%p' parent(waiting [%s] replies [%d] eof[%d])" + " child(waiting [%s] replies[%d] eof [%d])", + my_session->d_id, + my_session->tee_replybuf, + my_session->waiting[PARENT] ? "true":"false", + my_session->replies[PARENT], + my_session->eof[PARENT], + my_session->waiting[CHILD]?"true":"false", + my_session->replies[CHILD], + my_session->eof[CHILD]); #endif - - rc = my_session->up.clientReply ( - my_session->up.instance, - my_session->up.session, - my_session->tee_replybuf); - my_session->tee_replybuf = NULL; - } - else - { - rc = 1; - } - - spinlock_release(&my_session->tee_lock); - return rc; + + rc = my_session->up.clientReply (my_session->up.instance, + my_session->up.session, + my_session->tee_replybuf); + my_session->tee_replybuf = NULL; + } + + spinlock_release(&my_session->tee_lock); + return rc; } /** * Diagnostics routine diff --git a/server/modules/filter/test/harness_common.c b/server/modules/filter/test/harness_common.c index 026f46234..254add4b1 100644 --- a/server/modules/filter/test/harness_common.c +++ b/server/modules/filter/test/harness_common.c @@ -995,7 +995,7 @@ GWBUF* gen_packet(PACKET pkt) int process_opts(int argc, char** argv) { int fd, buffsize = 1024; - int rd,rdsz, rval = 0,error; + int rd,rdsz, rval = 0, error = 0; size_t fsize; char *buff = calloc(buffsize,sizeof(char)), *tok = NULL;