diff --git a/query_classifier/query_classifier.cc b/query_classifier/query_classifier.cc index 234d56e31..87056ec23 100644 --- a/query_classifier/query_classifier.cc +++ b/query_classifier/query_classifier.cc @@ -1210,6 +1210,8 @@ bool is_drop_table_query(GWBUF* querybuf) lex->sql_command == SQLCOM_DROP_TABLE); } + + inline void add_str(char** buf, int* buflen, int* bufsize, char* str) { int isize = strlen(str) + 1; @@ -1542,7 +1544,7 @@ static void parsing_info_set_plain_str( * @return string representing the query type value */ char* skygw_get_qtype_str( - skygw_query_type_t qtype) + skygw_query_type_t qtype) { int t1 = (int)qtype; int t2 = 1; @@ -1554,27 +1556,27 @@ char* skygw_get_qtype_str( * t1 is completely cleared. */ while (t1 != 0) - { - if (t1&t2) - { - t = (skygw_query_type_t)t2; + { + if (t1&t2) + { + t = (skygw_query_type_t)t2; - if (qtype_str == NULL) - { - qtype_str = strdup(STRQTYPE(t)); - } - else - { - size_t len = strlen(STRQTYPE(t)); - /** reallocate space for delimiter, new string and termination */ - qtype_str = (char *)realloc(qtype_str, strlen(qtype_str)+1+len+1); - snprintf(qtype_str+strlen(qtype_str), 1+len+1, "|%s", STRQTYPE(t)); - } - /** Remove found value from t1 */ - t1 &= ~t2; - } - t2 <<= 1; + if (qtype_str == NULL) + { + qtype_str = strdup(STRQTYPE(t)); + } + else + { + size_t len = strlen(STRQTYPE(t)); + /** reallocate space for delimiter, new string and termination */ + qtype_str = (char *)realloc(qtype_str, strlen(qtype_str)+1+len+1); + snprintf(qtype_str+strlen(qtype_str), 1+len+1, "|%s", STRQTYPE(t)); + } + /** Remove found value from t1 */ + t1 &= ~t2; } + t2 <<= 1; + } return qtype_str; } diff --git a/server/core/dcb.c b/server/core/dcb.c index 0e9c8b594..896892e5d 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -1222,7 +1222,7 @@ dcb_close(DCB *dcb) * dcb_close may be called for freshly created dcb, in which case * it only needs to be freed. */ - if (dcb->state == DCB_STATE_ALLOC) + if (dcb->state == DCB_STATE_ALLOC) { dcb_set_state(dcb, DCB_STATE_DISCONNECTED, NULL); dcb_final_free(dcb); diff --git a/server/core/modutil.c b/server/core/modutil.c index fd7fb3b68..33c302e7f 100644 --- a/server/core/modutil.c +++ b/server/core/modutil.c @@ -494,38 +494,92 @@ return_packetbuf: return packetbuf; } +/** + * Parse the buffer and split complete packets into individual buffers. + * Any partial packets are left in the old buffer. + * @param p_readbuf Buffer to split + * @return Head of the chain of complete packets + */ +GWBUF* modutil_get_complete_packets(GWBUF** p_readbuf) +{ + GWBUF *buff = NULL, *packet = NULL; + + while((packet = modutil_get_next_MySQL_packet(p_readbuf)) != NULL) + { + buff = gwbuf_append(buff,packet); + } + + return buff; +} /** - * Count the number of EOF, OK or ERR packets in the buffer. + * Count the number of EOF, OK or ERR packets in the buffer. Only complete + * packets are inspected and the buffer is assumed to only contain whole packets. + * If partial packets are in the buffer, they are ingnored. The caller must handle the + * detection of partial packets in buffers. * @param reply Buffer to use * @param use_ok Whether the DEPRECATE_EOF flag is set * @param n_found If there were previous packets found * @return Number of EOF packets */ int -modutil_count_signal_packets(GWBUF *reply,int use_ok, int n_found) +modutil_count_signal_packets(GWBUF *reply, int use_ok, int n_found) { unsigned char* ptr = (unsigned char*) reply->start; unsigned char* end = (unsigned char*) reply->end; - int pktlen,pkt = 0; - + unsigned char* prev = ptr; + int pktlen, eof = 0, err = 0, found = n_found; + int errlen = 0, eoflen = 0; + int iserr = 0, iseof = 0; while(ptr < end) { - pktlen = gw_mysql_get_byte3(ptr) + 4; - - if(PTR_IS_ERR(ptr) || (PTR_IS_EOF(ptr) && !use_ok) || (use_ok && PTR_IS_OK(ptr))) + + pktlen = MYSQL_GET_PACKET_LEN(ptr) + 4; + + if((iserr = PTR_IS_ERR(ptr)) || (iseof = PTR_IS_EOF(ptr))) { - if(n_found) + if(iserr) { - if(ptr + pktlen >= end) - pkt++; + err++; + errlen = pktlen; } - else + else if(iseof) { - pkt++; + eof++; + eoflen = pktlen; } } + + if((ptr + pktlen) > end) + { + ptr = prev; + break; + } + + prev = ptr; ptr += pktlen; } - return pkt; + + + /* + * If there were new EOF/ERR packets found, make sure that they are the last + * packet in the buffer. + */ + if((eof || err) && n_found) + { + if(err) + { + ptr -= errlen; + if(!PTR_IS_ERR(ptr)) + err = 0; + } + else + { + ptr -= eoflen; + if(!PTR_IS_EOF(ptr)) + eof = 0; + } + } + + return(eof + err); } diff --git a/server/include/modutil.h b/server/include/modutil.h index 2ccab523c..e287b080e 100644 --- a/server/include/modutil.h +++ b/server/include/modutil.h @@ -48,6 +48,7 @@ extern GWBUF *modutil_replace_SQL(GWBUF *, char *); extern char *modutil_get_query(GWBUF* buf); extern int modutil_send_mysql_err_packet(DCB *, int, int, int, const char *, const char *); GWBUF* modutil_get_next_MySQL_packet(GWBUF** p_readbuf); +GWBUF* modutil_get_complete_packets(GWBUF** p_readbuf); int modutil_MySQL_query_len(GWBUF* buf, int* nbytes_missing); GWBUF *modutil_create_mysql_err_msg( diff --git a/server/modules/filter/tee.c b/server/modules/filter/tee.c index 639ad7a51..17331e21a 100644 --- a/server/modules/filter/tee.c +++ b/server/modules/filter/tee.c @@ -161,7 +161,7 @@ typedef struct { FILTER_DEF* dummy_filterdef; int active; /* filter is active? */ bool use_ok; - bool multipacket; + bool multipacket[2]; unsigned char command; bool waiting[2]; /* if the client is waiting for a reply */ int eof[2]; @@ -172,7 +172,11 @@ typedef struct { int n_rejected; /* Number of rejected queries */ int residual; /* Any outstanding SQL text */ GWBUF* tee_replybuf; /* Buffer for reply */ + GWBUF* tee_partials[2]; SPINLOCK tee_lock; +#ifdef SS_DEBUG + long d_id; +#endif } TEE_SESSION; typedef struct orphan_session_tt @@ -181,6 +185,11 @@ typedef struct orphan_session_tt struct orphan_session_tt* next; }orphan_session_t; +#ifdef SS_DEBUG +static SPINLOCK debug_lock; +static long debug_id = 0; +#endif + static orphan_session_t* allOrphans = NULL; static SPINLOCK orphanLock; @@ -322,7 +331,10 @@ void ModuleInit() { spinlock_init(&orphanLock); - hktask_add("tee orphan cleanup",orphan_free,NULL,15); + //hktask_add("tee orphan cleanup",orphan_free,NULL,15); +#ifdef SS_DEBUG + spinlock_init(&debug_lock); +#endif } /** @@ -707,6 +719,8 @@ session_state_t state; gwbuf_free(my_session->tee_replybuf); free(session); + orphan_free(NULL); + return; } /** @@ -813,10 +827,10 @@ unsigned char command = *((unsigned char*)queue->start + 4); case 0x17: case 0x04: case 0x0a: - my_session->multipacket = true; + memset(my_session->multipacket,(char)true,2*sizeof(bool)); break; default: - my_session->multipacket = false; + memset(my_session->multipacket,(char)false,2*sizeof(bool)); break; } @@ -824,6 +838,21 @@ unsigned char command = *((unsigned char*)queue->start + 4); memset(my_session->eof,0,2*sizeof(int)); memset(my_session->waiting,1,2*sizeof(bool)); my_session->command = command; +#ifdef SS_DEBUG + spinlock_acquire(&debug_lock); + my_session->d_id = ++debug_id; + skygw_log_write_flush(LOGFILE_DEBUG,"tee.c [%d] command [%x]", + my_session->d_id, + my_session->command); + if(command == 0x03) + { + char* tmpstr = modutil_get_SQL(queue); + skygw_log_write_flush(LOGFILE_DEBUG,"tee.c query: '%s'", + tmpstr); + free(tmpstr); + } + spinlock_release(&debug_lock); +#endif rval = my_session->down.routeQuery(my_session->down.instance, my_session->down.session, queue); @@ -873,93 +902,159 @@ unsigned char command = *((unsigned char*)queue->start + 4); static int clientReply (FILTER* instance, void *session, GWBUF *reply) { - int rc, branch, eof; - TEE_SESSION *my_session = (TEE_SESSION *) session; + int rc, branch, eof; + TEE_SESSION *my_session = (TEE_SESSION *) session; + bool route = false,mpkt; + GWBUF *complete = NULL; + unsigned char *ptr; + int min_eof = my_session->command != 0x04 ? 2 : 1; + + spinlock_acquire(&my_session->tee_lock); - spinlock_acquire(&my_session->tee_lock); + ss_dassert(my_session->active); - ss_dassert(my_session->active); + branch = instance == NULL ? CHILD : PARENT; - branch = instance == NULL ? CHILD : PARENT; - unsigned char *ptr = (unsigned char*)reply->start; + my_session->tee_partials[branch] = gwbuf_append(my_session->tee_partials[branch], reply); + my_session->tee_partials[branch] = gwbuf_make_contiguous(my_session->tee_partials[branch]); + complete = modutil_get_complete_packets(&my_session->tee_partials[branch]); + complete = gwbuf_make_contiguous(complete); - if(my_session->replies[branch] == 0) - { - /* Reply is in a single packet if it is an OK, ERR or LOCAL_INFILE packet. - * Otherwise the reply is a result set and the amount of packets is unknown. - */ - if(PTR_IS_ERR(ptr) || PTR_IS_LOCAL_INFILE(ptr) || - PTR_IS_OK(ptr) || !my_session->multipacket ) - { - my_session->waiting[branch] = false; - } + if(my_session->tee_partials[branch] && + GWBUF_EMPTY(my_session->tee_partials[branch])) + { + gwbuf_free(my_session->tee_partials[branch]); + my_session->tee_partials[branch] = NULL; + } + + ptr = (unsigned char*) complete->start; + + if(my_session->replies[branch] == 0) + { + /* Reply is in a single packet if it is an OK, ERR or LOCAL_INFILE packet. + * Otherwise the reply is a result set and the amount of packets is unknown. + */ + if(PTR_IS_ERR(ptr) || PTR_IS_LOCAL_INFILE(ptr) || + PTR_IS_OK(ptr) || !my_session->multipacket[branch] ) + { + my_session->waiting[branch] = false; + my_session->multipacket[branch] = false; + } #ifdef SS_DEBUG - else - { - ss_dassert(PTR_IS_RESULTSET(ptr)); - skygw_log_write_flush(LOGFILE_DEBUG,"tee.c: Waiting for a result set from %s session.",branch == PARENT?"parent":"child"); - } - ss_dassert(PTR_IS_ERR(ptr) || PTR_IS_LOCAL_INFILE(ptr)|| - PTR_IS_OK(ptr) || my_session->waiting[branch] || - !my_session->multipacket); + else + { + ss_dassert(PTR_IS_RESULTSET(ptr)); + skygw_log_write_flush(LOGFILE_DEBUG,"tee.c: [%d] Waiting for a result set from %s session.", + my_session->d_id, + branch == PARENT?"parent":"child"); + } + ss_dassert(PTR_IS_ERR(ptr) || PTR_IS_LOCAL_INFILE(ptr)|| + PTR_IS_OK(ptr) || my_session->waiting[branch] || + !my_session->multipacket); #endif - } + } - if(my_session->waiting[branch]) - { - - eof = modutil_count_signal_packets(reply,my_session->use_ok,my_session->eof[branch] > 0); - my_session->eof[branch] += eof; - if(my_session->eof[branch] >= 2 || - (my_session->command == 0x04 && my_session->eof[branch] > 0)) - { - ss_dassert(my_session->eof[branch] < 3) - my_session->waiting[branch] = false; - } - } + if(my_session->waiting[branch]) + { + + eof = modutil_count_signal_packets(complete,my_session->use_ok,my_session->eof[branch] > 0); + my_session->eof[branch] += eof; + if(my_session->eof[branch] >= min_eof) + { +#ifdef SS_DEBUG + skygw_log_write_flush(LOGFILE_DEBUG,"tee.c [%d] %s received last EOF packet", + my_session->d_id, + branch == PARENT?"parent":"child"); +#endif + ss_dassert(my_session->eof[branch] < 3) + my_session->waiting[branch] = false; + } + } - if(branch == PARENT) - { - ss_dassert(my_session->tee_replybuf == NULL) - my_session->tee_replybuf = reply; - } - else - { - gwbuf_free(reply); - } + if(branch == PARENT) + { + ss_dassert(my_session->tee_replybuf == NULL); + my_session->tee_replybuf = complete; + } + else + { + if(complete) + gwbuf_free(complete); + } - my_session->replies[branch]++; - - if(my_session->tee_replybuf != NULL && - (my_session->branch_session == NULL || - my_session->waiting[PARENT] || - (!my_session->waiting[CHILD] && !my_session->waiting[PARENT]))) + my_session->replies[branch]++; + rc = 1; + mpkt = my_session->multipacket[PARENT] || my_session->multipacket[CHILD]; + + + + if(my_session->tee_replybuf != NULL) + { + + if(my_session->branch_session == NULL) + { + rc = 0; + gwbuf_free(my_session->tee_replybuf); + my_session->tee_replybuf = NULL; + skygw_log_write_flush(LOGFILE_ERROR,"Error : Tee child session was closed."); + } + + if(mpkt) + { + + if(my_session->waiting[PARENT]) + { + route = true; +#ifdef SS_DEBUG + ss_dassert(my_session->replies[PARENT] < 2 || + modutil_count_signal_packets(my_session->tee_replybuf, + my_session->use_ok, + my_session->eof[PARENT]) == 0); + skygw_log_write_flush(LOGFILE_DEBUG,"tee.c:[%d] Routing partial response set.",my_session->d_id); +#endif + } + else if(my_session->eof[PARENT] == min_eof && + my_session->eof[CHILD] == min_eof) + { + route = true; +#ifdef SS_DEBUG + skygw_log_write_flush(LOGFILE_DEBUG,"tee.c:[%d] Routing final packet of response set.",my_session->d_id); +#endif + } + } + else if(!my_session->waiting[PARENT] && + !my_session->waiting[CHILD]) + { +#ifdef SS_DEBUG + skygw_log_write_flush(LOGFILE_DEBUG,"tee.c:[%d] Routing single packet response.",my_session->d_id); +#endif + route = true; + } + } + + if(route) { #ifdef SS_DEBUG - skygw_log_write_flush(LOGFILE_DEBUG, "tee.c: Routing buffer '%p' parent(waiting [%s] replies [%d] eof[%d])" - " child(waiting [%s] replies[%d] eof [%d])", - my_session->tee_replybuf, - my_session->waiting[PARENT] ? "true":"false", - my_session->replies[PARENT], - my_session->eof[PARENT], - my_session->waiting[CHILD]?"true":"false", - my_session->replies[CHILD], - my_session->eof[CHILD]); + skygw_log_write_flush(LOGFILE_DEBUG, "tee.c:[%d] Routing buffer '%p' parent(waiting [%s] replies [%d] eof[%d])" + " child(waiting [%s] replies[%d] eof [%d])", + my_session->d_id, + my_session->tee_replybuf, + my_session->waiting[PARENT] ? "true":"false", + my_session->replies[PARENT], + my_session->eof[PARENT], + my_session->waiting[CHILD]?"true":"false", + my_session->replies[CHILD], + my_session->eof[CHILD]); #endif - - rc = my_session->up.clientReply ( - my_session->up.instance, - my_session->up.session, - my_session->tee_replybuf); - my_session->tee_replybuf = NULL; - } - else - { - rc = 1; - } - - spinlock_release(&my_session->tee_lock); - return rc; + + rc = my_session->up.clientReply (my_session->up.instance, + my_session->up.session, + my_session->tee_replybuf); + my_session->tee_replybuf = NULL; + } + + spinlock_release(&my_session->tee_lock); + return rc; } /** * Diagnostics routine diff --git a/server/modules/protocol/mysql_backend.c b/server/modules/protocol/mysql_backend.c index 4fb6c15b9..ecfa4644a 100644 --- a/server/modules/protocol/mysql_backend.c +++ b/server/modules/protocol/mysql_backend.c @@ -1118,7 +1118,7 @@ gw_backend_hangup(DCB *dcb) gwbuf_free(errbuf); /** There are no required backends available, close session. */ - if (!succp) + if (!succp) { #if defined(SS_DEBUG) LOGIF(LE, (skygw_log_write_flush( diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index cc1a2e8f1..ad3b8e0b6 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -836,7 +836,7 @@ static void* newSession( free(backend_ref); client_rses = NULL; goto return_rses; - } + } /** * Initialize backend references with BACKEND ptr. * Initialize session command cursors for each backend reference. @@ -2706,7 +2706,7 @@ static void clientReply ( goto lock_failed; } /** There is one pending session command to be executed. */ - if (sescmd_cursor_is_active(scur)) + if (sescmd_cursor_is_active(scur)) { bool succp; @@ -2962,7 +2962,7 @@ static bool select_connect_backend_servers( { succp = false; goto return_succp; - } + } if (router->bitvalue != 0) /*< 'synced' is the only bitvalue in rwsplit */ { @@ -3609,7 +3609,7 @@ static GWBUF* sescmd_cursor_process_replies( bref_clear_state(bref, BREF_WAITING_RESULT); } /** Response is in the buffer and it will be sent to client. */ - else if (replybuf != NULL) + else { /** Mark the rest session commands as replied */ scmd->my_sescmd_is_replied = true; @@ -4542,7 +4542,7 @@ static bool handle_error_new_connection( succp = true; goto return_succp; } - /** + /** * Remove callback because this DCB won't be used * unless it is reconnected later, and then the callback * is set again.