From 3b76ed43c43aa0bc457389c66ea54f0634ad4b9a Mon Sep 17 00:00:00 2001 From: Markus Makela Date: Tue, 3 Mar 2015 13:31:12 +0200 Subject: [PATCH] Added tee filter multi-statement support. --- server/core/modutil.c | 96 ++++++++++++++++++++++++++++++++- server/include/modutil.h | 3 ++ server/modules/filter/tee.c | 102 ++++++++++++++++++++++++++++++++---- 3 files changed, 189 insertions(+), 12 deletions(-) diff --git a/server/core/modutil.c b/server/core/modutil.c index 9ba57bdf6..5fe936e6a 100644 --- a/server/core/modutil.c +++ b/server/core/modutil.c @@ -568,7 +568,7 @@ modutil_count_signal_packets(GWBUF *reply, int use_ok, int n_found) } } - if((ptr + pktlen) > end) + if((ptr + pktlen) > end || (eof + n_found) >= 2) { ptr = prev; break; @@ -676,3 +676,97 @@ static void modutil_reply_routing_error( poll_add_epollin_event_to_dcb(backend_dcb, buf); return; } + +/** + * Find the first occurrence of a character in a string. This function ignores + * escaped characters and all characters that are enclosed in single or double quotes. + * @param ptr Pointer to area of memory to inspect + * @param c Character to search for + * @param len Size of the memory area + * @return Pointer to the first non-escaped, non-quoted occurrence of the character. + * If the character is not found, NULL is returned. + */ +void* strnchr_esc(char* ptr,char c, int len) +{ + char* p = (char*)ptr; + char* start = p; + bool quoted = false, escaped = false; + char qc; + + while(p < start + len) + { + if(escaped) + { + escaped = false; + } + else if(*p == '\\') + { + escaped = true; + } + else if((*p == '\'' || *p == '"') && !quoted) + { + quoted = true; + qc = *p; + } + else if(quoted && *p == qc) + { + quoted = false; + } + else if(*p == c && !escaped && !quoted) + { + return p; + } + p++; + } + + return NULL; +} + +/** + * Create a COM_QUERY packet from a string. + * @param query Query to create. + * @return Pointer to GWBUF with the query or NULL if an error occurred. + */ +GWBUF* modutil_create_query(char* query) +{ + if(query == NULL) + return NULL; + + GWBUF* rval = gwbuf_alloc(strlen(query) + 5); + int pktlen = strlen(query) + 1; + unsigned char* ptr; + + if(rval) + { + ptr = (unsigned char*)rval->start; + *ptr++ = (pktlen); + *ptr++ = (pktlen)>>8; + *ptr++ = (pktlen)>>16; + *ptr++ = 0x0; + *ptr++ = 0x03; + memcpy(ptr,query,strlen(query)); + gwbuf_set_type(rval,GWBUF_TYPE_MYSQL); + } + + return rval; +} + +/** + * Count the number of statements in a query. + * @param buffer Buffer to analyze. + * @return Number of statements. + */ +int modutil_count_statements(GWBUF* buffer) +{ + char* ptr = ((char*)(buffer)->start + 5); + char* end = ((char*)(buffer)->end); + int num = 1; + + while((ptr = strnchr_esc(ptr,';', end - ptr))) + { + num++; + ptr++; + } + + return num; +} \ No newline at end of file diff --git a/server/include/modutil.h b/server/include/modutil.h index d40779c4c..59fb8d4f7 100644 --- a/server/include/modutil.h +++ b/server/include/modutil.h @@ -33,6 +33,7 @@ */ #include #include +#include #define PTR_IS_RESULTSET(b) (b[0] == 0x01 && b[1] == 0x0 && b[2] == 0x0 && b[3] == 0x01) #define PTR_IS_EOF(b) (b[0] == 0x05 && b[1] == 0x0 && b[2] == 0x0 && b[4] == 0xfe) @@ -52,6 +53,8 @@ GWBUF* modutil_get_complete_packets(GWBUF** p_readbuf); int modutil_MySQL_query_len(GWBUF* buf, int* nbytes_missing); void modutil_reply_parse_error(DCB* backend_dcb, char* errstr, uint32_t flags); void modutil_reply_auth_error(DCB* backend_dcb, char* errstr, uint32_t flags); +int modutil_count_statements(GWBUF* buffer); +GWBUF* modutil_create_query(char* query); GWBUF *modutil_create_mysql_err_msg( int packet_number, diff --git a/server/modules/filter/tee.c b/server/modules/filter/tee.c index 22d210c04..e0a6e51fe 100644 --- a/server/modules/filter/tee.c +++ b/server/modules/filter/tee.c @@ -63,7 +63,6 @@ #include #include - #define MYSQL_COM_QUIT 0x01 #define MYSQL_COM_INITDB 0x02 #define MYSQL_COM_FIELD_LIST 0x04 @@ -73,6 +72,7 @@ #define MYSQL_COM_STMT_SEND_LONG_DATA 0x18 #define MYSQL_COM_STMT_CLOSE 0x19 #define MYSQL_COM_STMT_RESET 0x1a +#define MYSQL_COM_CONNECT 0x1b #define REPLY_TIMEOUT_SECOND 5 #define REPLY_TIMEOUT_MILLISECOND 1 @@ -93,6 +93,7 @@ static unsigned char required_packets[] = { MYSQL_COM_STMT_SEND_LONG_DATA, MYSQL_COM_STMT_CLOSE, MYSQL_COM_STMT_RESET, + MYSQL_COM_CONNECT, 0 }; /** Defined in log_manager.cc */ @@ -168,15 +169,22 @@ typedef struct { bool waiting[2]; /* if the client is waiting for a reply */ int eof[2]; int replies[2]; /* Number of queries received */ + int reply_packets[2]; /* Number of OK, ERR, LOCAL_INFILE_REQUEST or RESULT_SET packets received */ DCB *branch_dcb; /* Client DCB for "branch" service */ SESSION *branch_session;/* The branch service session */ + TEE_INSTANCE *instance; int n_duped; /* Number of duplicated queries */ int n_rejected; /* Number of rejected queries */ int residual; /* Any outstanding SQL text */ GWBUF* tee_replybuf; /* Buffer for reply */ GWBUF* tee_partials[2]; + GWBUF* querybuf; SPINLOCK tee_lock; DCB* client_dcb; + int statements; /*< Number of statements in the query, + * used to identify and track multi-statement + * queries and that both the parent and the child + * branch are in sync. */ #ifdef SS_DEBUG long d_id; #endif @@ -184,7 +192,8 @@ typedef struct { typedef struct orphan_session_tt { - SESSION* session; + SESSION* session; /*< The child branch session whose parent was freed before + * the child session was in a suitable state. */ struct orphan_session_tt* next; }orphan_session_t; @@ -198,6 +207,7 @@ static orphan_session_t* allOrphans = NULL; static SPINLOCK orphanLock; static int packet_is_required(GWBUF *queue); static int detect_loops(TEE_INSTANCE *instance, HASHTABLE* ht, SERVICE* session); +int internal_route(DCB* dcb); static int hkfn( void* key) @@ -498,7 +508,11 @@ char *remote, *userName; { my_session->active = 1; my_session->residual = 0; + my_session->statements = 0; + my_session->tee_replybuf = NULL; my_session->client_dcb = session->client; + my_session->instance = my_instance; + spinlock_init(&my_session->tee_lock); if (my_instance->source && (remote = session_get_remote(session)) != NULL) @@ -544,7 +558,7 @@ char *remote, *userName; goto retblock; } - + if((dummy = filter_alloc("tee_dummy","tee_dummy")) == NULL) { dcb_close(dcb); @@ -641,6 +655,7 @@ skygw_log_write(LOGFILE_TRACE,"Tee close: %d", atomic_add(&debug_seq,1)); #endif if (my_session->active) { + if ((bsession = my_session->branch_session) != NULL) { CHK_SESSION(bsession); @@ -675,7 +690,8 @@ skygw_log_write(LOGFILE_TRACE,"Tee close: %d", atomic_add(&debug_seq,1)); } } - my_session->active = 0; + + my_session->active = 0; } } @@ -800,11 +816,14 @@ char *ptr; int length, rval, residual = 0; GWBUF *clone = NULL; unsigned char command = *((unsigned char*)queue->start + 4); + #ifdef SS_DEBUG skygw_log_write(LOGFILE_TRACE,"Tee routeQuery: %d : %s", atomic_add(&debug_seq,1), ((char*)queue->start + 5)); #endif + + spinlock_acquire(&my_session->tee_lock); if(!my_session->active) @@ -871,8 +890,10 @@ if(!my_session->active) } memset(my_session->replies,0,2*sizeof(int)); + memset(my_session->reply_packets,0,2*sizeof(int)); memset(my_session->eof,0,2*sizeof(int)); memset(my_session->waiting,1,2*sizeof(bool)); + my_session->statements = modutil_count_statements(queue); my_session->command = command; #ifdef SS_DEBUG spinlock_acquire(&debug_lock); @@ -938,6 +959,36 @@ if(!my_session->active) return rval; } +int count_replies(GWBUF* buffer) +{ + unsigned char* ptr = (unsigned char*)buffer->start; + unsigned char* end = (unsigned char*) buffer->end; + int pktlen, eof = 0; + int replies = 0; + while(ptr < end) + { + pktlen = MYSQL_GET_PACKET_LEN(ptr) + 4; + if(PTR_IS_OK(ptr) || PTR_IS_ERR(ptr) || PTR_IS_LOCAL_INFILE(ptr)) + { + replies++; + ptr += pktlen; + } + else + { + while(ptr < end && eof < 2) + { + pktlen = MYSQL_GET_PACKET_LEN(ptr) + 4; + if(PTR_IS_EOF(ptr) || PTR_IS_ERR(ptr)) eof++; + ptr += pktlen; + } + if(eof == 2) replies++; + eof = 0; + } + } + + return replies; +} + /** * The clientReply entry point. This is passed the response buffer * to which the filter should be applied. Once processed the @@ -1012,14 +1063,16 @@ clientReply (FILTER* instance, void *session, GWBUF *reply) #ifdef SS_DEBUG else { - ss_dassert(PTR_IS_RESULTSET(ptr)); + //ss_dassert(PTR_IS_RESULTSET(ptr)); skygw_log_write_flush(LOGFILE_DEBUG,"tee.c: [%d] Waiting for a result set from %s session.", my_session->d_id, branch == PARENT?"parent":"child"); } +/* ss_dassert(PTR_IS_ERR(ptr) || PTR_IS_LOCAL_INFILE(ptr)|| PTR_IS_OK(ptr) || my_session->waiting[branch] || !my_session->multipacket); +*/ #endif } @@ -1027,6 +1080,7 @@ clientReply (FILTER* instance, void *session, GWBUF *reply) { eof = modutil_count_signal_packets(complete,my_session->use_ok,my_session->eof[branch] > 0); + my_session->eof[branch] += eof; if(my_session->eof[branch] >= min_eof) { @@ -1035,11 +1089,24 @@ clientReply (FILTER* instance, void *session, GWBUF *reply) my_session->d_id, branch == PARENT?"parent":"child"); #endif - ss_dassert(my_session->eof[branch] < 3) + //ss_dassert(my_session->eof[branch] < 3) my_session->waiting[branch] = false; } } - + + + int reply_packets = count_replies(complete); + + /** COM_SET_OPTION returns a single EOF or ERR packet*/ + if(my_session->command == 0x1b && + reply_packets == 0 && + PTR_IS_EOF(ptr)) + { + reply_packets = 1; + } + + my_session->reply_packets[branch] += reply_packets; + if(branch == PARENT) { //ss_dassert(my_session->tee_replybuf == NULL); @@ -1055,7 +1122,10 @@ clientReply (FILTER* instance, void *session, GWBUF *reply) rc = 1; mpkt = my_session->multipacket[PARENT] || my_session->multipacket[CHILD]; - + if(my_session->reply_packets[branch] < my_session->statements) + { + my_session->waiting[branch] = true; + } if(my_session->tee_replybuf != NULL) { @@ -1082,8 +1152,8 @@ clientReply (FILTER* instance, void *session, GWBUF *reply) skygw_log_write_flush(LOGFILE_DEBUG,"tee.c:[%d] Routing partial response set.",my_session->d_id); #endif } - else if(my_session->eof[PARENT] == min_eof && - my_session->eof[CHILD] == min_eof) + else if(my_session->eof[PARENT] >= min_eof && + my_session->eof[CHILD] >= min_eof) { route = true; #ifdef SS_DEBUG @@ -1098,7 +1168,7 @@ clientReply (FILTER* instance, void *session, GWBUF *reply) skygw_log_write_flush(LOGFILE_DEBUG,"tee.c:[%d] Routing single packet response.",my_session->d_id); #endif route = true; - } + } } if(route) @@ -1236,3 +1306,13 @@ int detect_loops(TEE_INSTANCE *instance,HASHTABLE* ht, SERVICE* service) return false; } + +int internal_route(DCB* dcb) +{ + GWBUF* buffer = dcb->dcb_readqueue; + + /** This was set in the newSession function*/ + TEE_SESSION* session = dcb->data; + + return routeQuery((FILTER*)session->instance,session,buffer); +} \ No newline at end of file