Added support for multiple result sets.
This commit is contained in:
parent
2aaa367c37
commit
22849f7c90
@ -558,7 +558,7 @@ GWBUF* modutil_get_complete_packets(GWBUF** p_readbuf)
|
||||
* @return Number of EOF packets
|
||||
*/
|
||||
int
|
||||
modutil_count_signal_packets(GWBUF *reply, int use_ok, int n_found)
|
||||
modutil_count_signal_packets(GWBUF *reply, int use_ok, int n_found, int* more)
|
||||
{
|
||||
unsigned char* ptr = (unsigned char*) reply->start;
|
||||
unsigned char* end = (unsigned char*) reply->end;
|
||||
@ -566,6 +566,7 @@ modutil_count_signal_packets(GWBUF *reply, int use_ok, int n_found)
|
||||
int pktlen, eof = 0, err = 0;
|
||||
int errlen = 0, eoflen = 0;
|
||||
int iserr = 0, iseof = 0;
|
||||
bool moreresults = false;
|
||||
while(ptr < end)
|
||||
{
|
||||
|
||||
@ -587,6 +588,7 @@ modutil_count_signal_packets(GWBUF *reply, int use_ok, int n_found)
|
||||
|
||||
if((ptr + pktlen) > end || (eof + n_found) >= 2)
|
||||
{
|
||||
moreresults = PTR_EOF_MORE_RESULTS(ptr);
|
||||
ptr = prev;
|
||||
break;
|
||||
}
|
||||
@ -616,6 +618,8 @@ modutil_count_signal_packets(GWBUF *reply, int use_ok, int n_found)
|
||||
}
|
||||
}
|
||||
|
||||
*more = moreresults;
|
||||
|
||||
return(eof + err);
|
||||
}
|
||||
|
||||
|
@ -40,6 +40,7 @@
|
||||
#define PTR_IS_OK(b) (b[4] == 0x00)
|
||||
#define PTR_IS_ERR(b) (b[4] == 0xff)
|
||||
#define PTR_IS_LOCAL_INFILE(b) (b[4] == 0xfb)
|
||||
#define PTR_EOF_MORE_RESULTS(b) ((PTR_IS_EOF(b) && ptr[7] & 0x08))
|
||||
|
||||
extern int modutil_is_SQL(GWBUF *);
|
||||
extern int modutil_is_SQL_prepare(GWBUF *);
|
||||
@ -64,5 +65,5 @@ GWBUF *modutil_create_mysql_err_msg(
|
||||
const char *statemsg,
|
||||
const char *msg);
|
||||
|
||||
int modutil_count_signal_packets(GWBUF*,int,int);
|
||||
int modutil_count_signal_packets(GWBUF*,int,int,int*);
|
||||
#endif
|
||||
|
@ -164,6 +164,7 @@ typedef struct {
|
||||
FILTER_DEF* dummy_filterdef;
|
||||
int active; /* filter is active? */
|
||||
bool use_ok;
|
||||
int client_multistatement;
|
||||
bool multipacket[2];
|
||||
unsigned char command;
|
||||
bool waiting[2]; /* if the client is waiting for a reply */
|
||||
@ -512,6 +513,7 @@ char *remote, *userName;
|
||||
my_session->tee_replybuf = NULL;
|
||||
my_session->client_dcb = session->client;
|
||||
my_session->instance = my_instance;
|
||||
my_session->client_multistatement = false;
|
||||
|
||||
spinlock_init(&my_session->tee_lock);
|
||||
if (my_instance->source &&
|
||||
@ -877,6 +879,8 @@ if(!my_session->active)
|
||||
|
||||
switch(command)
|
||||
{
|
||||
case 0x1b:
|
||||
my_session->client_multistatement = *((unsigned char*) queue->start + 5);
|
||||
case 0x03:
|
||||
case 0x16:
|
||||
case 0x17:
|
||||
@ -989,6 +993,43 @@ int count_replies(GWBUF* buffer)
|
||||
return replies;
|
||||
}
|
||||
|
||||
int lenenc_length(uint8_t* ptr)
|
||||
{
|
||||
char val = *ptr;
|
||||
if(val < 251)
|
||||
return 1;
|
||||
else if(val == 0xfc)
|
||||
return 3;
|
||||
else if(val == 0xfd)
|
||||
return 4;
|
||||
else
|
||||
return 9;
|
||||
}
|
||||
|
||||
uint16_t get_response_flags(uint8_t* datastart, bool ok_packet)
|
||||
{
|
||||
uint8_t* ptr = datastart;
|
||||
uint16_t rval = 0;
|
||||
int pktlen = gw_mysql_get_byte3(ptr);
|
||||
|
||||
ptr += 4;
|
||||
|
||||
if(ok_packet)
|
||||
{
|
||||
ptr += lenenc_length(ptr);
|
||||
ptr += lenenc_length(ptr);
|
||||
memcpy(&rval,ptr,sizeof(uint8_t)*2);
|
||||
}
|
||||
else
|
||||
{
|
||||
/** This is an EOF packet*/
|
||||
ptr += 2;
|
||||
memcpy(&rval,ptr,sizeof(uint8_t)*2);
|
||||
}
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
/**
|
||||
* The clientReply entry point. This is passed the response buffer
|
||||
* to which the filter should be applied. Once processed the
|
||||
@ -1007,7 +1048,9 @@ clientReply (FILTER* instance, void *session, GWBUF *reply)
|
||||
bool route = false,mpkt;
|
||||
GWBUF *complete = NULL;
|
||||
unsigned char *ptr;
|
||||
uint16_t flags = 0;
|
||||
int min_eof = my_session->command != 0x04 ? 2 : 1;
|
||||
int more_results = 0;
|
||||
#ifdef SS_DEBUG
|
||||
ptr = (unsigned char*) reply->start;
|
||||
skygw_log_write(LOGFILE_TRACE,"Tee clientReply [%s] [%s] [%s]: %d",
|
||||
@ -1059,29 +1102,28 @@ clientReply (FILTER* instance, void *session, GWBUF *reply)
|
||||
{
|
||||
my_session->waiting[branch] = false;
|
||||
my_session->multipacket[branch] = false;
|
||||
if(PTR_IS_OK(ptr))
|
||||
{
|
||||
flags = get_response_flags(ptr,true);
|
||||
more_results = (flags & 0x08) && my_session->client_multistatement;
|
||||
}
|
||||
}
|
||||
#ifdef SS_DEBUG
|
||||
else
|
||||
{
|
||||
//ss_dassert(PTR_IS_RESULTSET(ptr));
|
||||
skygw_log_write_flush(LOGFILE_DEBUG,"tee.c: [%d] Waiting for a result set from %s session.",
|
||||
my_session->d_id,
|
||||
branch == PARENT?"parent":"child");
|
||||
}
|
||||
/*
|
||||
ss_dassert(PTR_IS_ERR(ptr) || PTR_IS_LOCAL_INFILE(ptr)||
|
||||
PTR_IS_OK(ptr) || my_session->waiting[branch] ||
|
||||
!my_session->multipacket);
|
||||
*/
|
||||
#endif
|
||||
}
|
||||
|
||||
if(my_session->waiting[branch])
|
||||
{
|
||||
|
||||
eof = modutil_count_signal_packets(complete,my_session->use_ok,my_session->eof[branch] > 0);
|
||||
|
||||
eof = modutil_count_signal_packets(complete,my_session->use_ok,my_session->eof[branch] > 0,&more_results);
|
||||
more_results &= my_session->client_multistatement;
|
||||
my_session->eof[branch] += eof;
|
||||
|
||||
if(my_session->eof[branch] >= min_eof)
|
||||
{
|
||||
#ifdef SS_DEBUG
|
||||
@ -1089,43 +1131,27 @@ clientReply (FILTER* instance, void *session, GWBUF *reply)
|
||||
my_session->d_id,
|
||||
branch == PARENT?"parent":"child");
|
||||
#endif
|
||||
//ss_dassert(my_session->eof[branch] < 3)
|
||||
my_session->waiting[branch] = false;
|
||||
my_session->waiting[branch] = more_results;
|
||||
if(more_results)
|
||||
{
|
||||
my_session->eof[branch] = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
int reply_packets = count_replies(complete);
|
||||
|
||||
/** COM_SET_OPTION returns a single EOF or ERR packet*/
|
||||
if(my_session->command == 0x1b &&
|
||||
reply_packets == 0 &&
|
||||
PTR_IS_EOF(ptr))
|
||||
{
|
||||
reply_packets = 1;
|
||||
}
|
||||
|
||||
my_session->reply_packets[branch] += reply_packets;
|
||||
|
||||
if(branch == PARENT)
|
||||
{
|
||||
//ss_dassert(my_session->tee_replybuf == NULL);
|
||||
my_session->tee_replybuf = gwbuf_append(my_session->tee_replybuf,complete);
|
||||
}
|
||||
else
|
||||
{
|
||||
if(complete)
|
||||
gwbuf_free(complete);
|
||||
gwbuf_free(complete);
|
||||
}
|
||||
|
||||
my_session->replies[branch]++;
|
||||
rc = 1;
|
||||
mpkt = my_session->multipacket[PARENT] || my_session->multipacket[CHILD];
|
||||
|
||||
if(my_session->reply_packets[branch] < my_session->statements)
|
||||
{
|
||||
my_session->waiting[branch] = true;
|
||||
}
|
||||
|
||||
if(my_session->tee_replybuf != NULL)
|
||||
{
|
||||
|
Loading…
x
Reference in New Issue
Block a user