Fix to bug 685: http://bugs.mariadb.com/show_bug.cgi?id=685
Added the missing ERR packet detection to modutil functions.
This commit is contained in:
@ -496,26 +496,36 @@ return_packetbuf:
|
|||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Count the number of EOF packets in the buffer.
|
* Count the number of EOF, OK or ERR packets in the buffer.
|
||||||
* @param reply Buffer to use
|
* @param reply Buffer to use
|
||||||
|
* @param use_ok Whether the DEPRECATE_EOF flag is set
|
||||||
|
* @param n_found If there were previous packets found
|
||||||
* @return Number of EOF packets
|
* @return Number of EOF packets
|
||||||
*/
|
*/
|
||||||
int
|
int
|
||||||
modutil_count_EOF(GWBUF *reply)
|
modutil_count_signal_packets(GWBUF *reply,int use_ok, int n_found)
|
||||||
{
|
{
|
||||||
unsigned char* ptr = (unsigned char*) reply->start;
|
unsigned char* ptr = (unsigned char*) reply->start;
|
||||||
unsigned char* end = (unsigned char*) reply->end;
|
unsigned char* end = (unsigned char*) reply->end;
|
||||||
int pktlen,eof = 0;
|
int pktlen,pkt = 0;
|
||||||
|
|
||||||
while(ptr < end)
|
while(ptr < end)
|
||||||
{
|
{
|
||||||
pktlen = gw_mysql_get_byte3(ptr) + 4;
|
pktlen = gw_mysql_get_byte3(ptr) + 4;
|
||||||
if(PTR_IS_EOF(ptr))
|
|
||||||
{
|
|
||||||
eof++;
|
|
||||||
}
|
|
||||||
|
|
||||||
ptr += pktlen;
|
if(PTR_IS_ERR(ptr) || (PTR_IS_EOF(ptr) && !use_ok) || (use_ok && PTR_IS_OK(ptr)))
|
||||||
|
{
|
||||||
|
if(n_found)
|
||||||
|
{
|
||||||
|
if(ptr + pktlen >= end)
|
||||||
|
pkt++;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
pkt++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ptr += pktlen;
|
||||||
}
|
}
|
||||||
return eof;
|
return pkt;
|
||||||
}
|
}
|
||||||
|
@ -35,7 +35,7 @@
|
|||||||
#include <dcb.h>
|
#include <dcb.h>
|
||||||
|
|
||||||
#define PTR_IS_RESULTSET(b) (b[0] == 0x01 && b[1] == 0x0 && b[2] == 0x0 && b[3] == 0x01)
|
#define PTR_IS_RESULTSET(b) (b[0] == 0x01 && b[1] == 0x0 && b[2] == 0x0 && b[3] == 0x01)
|
||||||
#define PTR_IS_EOF(b) (b[4] == 0xfe)
|
#define PTR_IS_EOF(b) (b[0] == 0x05 && b[1] == 0x0 && b[2] == 0x0 && b[4] == 0xfe)
|
||||||
#define PTR_IS_OK(b) (b[4] == 0x00)
|
#define PTR_IS_OK(b) (b[4] == 0x00)
|
||||||
#define PTR_IS_ERR(b) (b[4] == 0xff)
|
#define PTR_IS_ERR(b) (b[4] == 0xff)
|
||||||
#define PTR_IS_LOCAL_INFILE(b) (b[4] == 0xfb)
|
#define PTR_IS_LOCAL_INFILE(b) (b[4] == 0xfb)
|
||||||
@ -50,7 +50,6 @@ extern int modutil_send_mysql_err_packet(DCB *, int, int, int, const char *, con
|
|||||||
GWBUF* modutil_get_next_MySQL_packet(GWBUF** p_readbuf);
|
GWBUF* modutil_get_next_MySQL_packet(GWBUF** p_readbuf);
|
||||||
int modutil_MySQL_query_len(GWBUF* buf, int* nbytes_missing);
|
int modutil_MySQL_query_len(GWBUF* buf, int* nbytes_missing);
|
||||||
|
|
||||||
|
|
||||||
GWBUF *modutil_create_mysql_err_msg(
|
GWBUF *modutil_create_mysql_err_msg(
|
||||||
int packet_number,
|
int packet_number,
|
||||||
int affected_rows,
|
int affected_rows,
|
||||||
@ -58,5 +57,5 @@ GWBUF *modutil_create_mysql_err_msg(
|
|||||||
const char *statemsg,
|
const char *statemsg,
|
||||||
const char *msg);
|
const char *msg);
|
||||||
|
|
||||||
int modutil_count_EOF(GWBUF*);
|
int modutil_count_signal_packets(GWBUF*,int,int);
|
||||||
#endif
|
#endif
|
||||||
|
@ -16,6 +16,9 @@
|
|||||||
* Copyright MariaDB Corporation Ab 2014
|
* Copyright MariaDB Corporation Ab 2014
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include "spinlock.h"
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @file tee.c A filter that splits the processing pipeline in two
|
* @file tee.c A filter that splits the processing pipeline in two
|
||||||
* @verbatim
|
* @verbatim
|
||||||
@ -157,6 +160,8 @@ typedef struct {
|
|||||||
|
|
||||||
FILTER_DEF* dummy_filterdef;
|
FILTER_DEF* dummy_filterdef;
|
||||||
int active; /* filter is active? */
|
int active; /* filter is active? */
|
||||||
|
bool use_ok;
|
||||||
|
bool multipacket;
|
||||||
bool waiting[2]; /* if the client is waiting for a reply */
|
bool waiting[2]; /* if the client is waiting for a reply */
|
||||||
int eof[2];
|
int eof[2];
|
||||||
int replies[2]; /* Number of queries received */
|
int replies[2]; /* Number of queries received */
|
||||||
@ -217,6 +222,8 @@ orphan_free(void* data)
|
|||||||
{
|
{
|
||||||
if(ptr->session->state == SESSION_STATE_TO_BE_FREED)
|
if(ptr->session->state == SESSION_STATE_TO_BE_FREED)
|
||||||
{
|
{
|
||||||
|
|
||||||
|
|
||||||
if(ptr == allOrphans)
|
if(ptr == allOrphans)
|
||||||
{
|
{
|
||||||
tmp = ptr;
|
tmp = ptr;
|
||||||
@ -233,6 +240,17 @@ orphan_free(void* data)
|
|||||||
tmp = ptr;
|
tmp = ptr;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* The session has been unlinked from all the DCBs and it is ready to be freed.
|
||||||
|
*/
|
||||||
|
|
||||||
|
if(ptr->session->state == SESSION_STATE_STOPPING &&
|
||||||
|
ptr->session->refcount == 0 && ptr->session->client == NULL)
|
||||||
|
{
|
||||||
|
ptr->session->state = SESSION_STATE_TO_BE_FREED;
|
||||||
}
|
}
|
||||||
#ifdef SS_DEBUG
|
#ifdef SS_DEBUG
|
||||||
else if(ptr->session->state == SESSION_STATE_STOPPING)
|
else if(ptr->session->state == SESSION_STATE_STOPPING)
|
||||||
@ -574,6 +592,9 @@ char *remote, *userName;
|
|||||||
my_session->branch_session = ses;
|
my_session->branch_session = ses;
|
||||||
my_session->branch_dcb = dcb;
|
my_session->branch_dcb = dcb;
|
||||||
my_session->dummy_filterdef = dummy;
|
my_session->dummy_filterdef = dummy;
|
||||||
|
|
||||||
|
MySQLProtocol* protocol = (MySQLProtocol*)session->client->protocol;
|
||||||
|
my_session->use_ok = protocol->client_capabilities & (1 << 6);
|
||||||
free(dummy_upstream);
|
free(dummy_upstream);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -637,15 +658,16 @@ freeSession(FILTER *instance, void *session)
|
|||||||
{
|
{
|
||||||
TEE_SESSION *my_session = (TEE_SESSION *)session;
|
TEE_SESSION *my_session = (TEE_SESSION *)session;
|
||||||
SESSION* ses = my_session->branch_session;
|
SESSION* ses = my_session->branch_session;
|
||||||
|
session_state_t state;
|
||||||
if (ses != NULL)
|
if (ses != NULL)
|
||||||
{
|
{
|
||||||
if (ses->state == SESSION_STATE_ROUTER_READY)
|
state = ses->state;
|
||||||
|
|
||||||
|
if (state == SESSION_STATE_ROUTER_READY)
|
||||||
{
|
{
|
||||||
session_free(ses);
|
session_free(ses);
|
||||||
}
|
}
|
||||||
|
else if (state == SESSION_STATE_TO_BE_FREED)
|
||||||
if (ses->state == SESSION_STATE_TO_BE_FREED)
|
|
||||||
{
|
{
|
||||||
/** Free branch router session */
|
/** Free branch router session */
|
||||||
ses->service->router->freeSession(
|
ses->service->router->freeSession(
|
||||||
@ -657,7 +679,7 @@ SESSION* ses = my_session->branch_session;
|
|||||||
/** This indicates that branch session is not available anymore */
|
/** This indicates that branch session is not available anymore */
|
||||||
my_session->branch_session = NULL;
|
my_session->branch_session = NULL;
|
||||||
}
|
}
|
||||||
else if(ses->state == SESSION_STATE_STOPPING)
|
else if(state == SESSION_STATE_STOPPING)
|
||||||
{
|
{
|
||||||
orphan_session_t* orphan;
|
orphan_session_t* orphan;
|
||||||
if((orphan = malloc(sizeof(orphan_session_t))) == NULL)
|
if((orphan = malloc(sizeof(orphan_session_t))) == NULL)
|
||||||
@ -672,17 +694,14 @@ SESSION* ses = my_session->branch_session;
|
|||||||
allOrphans = orphan;
|
allOrphans = orphan;
|
||||||
spinlock_release(&orphanLock);
|
spinlock_release(&orphanLock);
|
||||||
}
|
}
|
||||||
if(ses->refcount == 0)
|
|
||||||
{
|
|
||||||
ss_dassert(ses->refcount == 0 && ses->client == NULL);
|
|
||||||
ses->state = SESSION_STATE_TO_BE_FREED;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (my_session->dummy_filterdef)
|
if (my_session->dummy_filterdef)
|
||||||
{
|
{
|
||||||
filter_free(my_session->dummy_filterdef);
|
filter_free(my_session->dummy_filterdef);
|
||||||
}
|
}
|
||||||
|
if(my_session->tee_replybuf)
|
||||||
|
gwbuf_free(my_session->tee_replybuf);
|
||||||
free(session);
|
free(session);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
@ -743,7 +762,7 @@ TEE_SESSION *my_session = (TEE_SESSION *)session;
|
|||||||
char *ptr;
|
char *ptr;
|
||||||
int length, rval, residual = 0;
|
int length, rval, residual = 0;
|
||||||
GWBUF *clone = NULL;
|
GWBUF *clone = NULL;
|
||||||
|
unsigned char command = *((unsigned char*)queue->start + 4);
|
||||||
if (my_session->branch_session &&
|
if (my_session->branch_session &&
|
||||||
my_session->branch_session->state == SESSION_STATE_ROUTER_READY)
|
my_session->branch_session->state == SESSION_STATE_ROUTER_READY)
|
||||||
{
|
{
|
||||||
@ -786,8 +805,8 @@ GWBUF *clone = NULL;
|
|||||||
|
|
||||||
memset(my_session->replies,0,2*sizeof(int));
|
memset(my_session->replies,0,2*sizeof(int));
|
||||||
memset(my_session->eof,0,2*sizeof(int));
|
memset(my_session->eof,0,2*sizeof(int));
|
||||||
memset(my_session->waiting,0,2*sizeof(bool));
|
memset(my_session->waiting,1,2*sizeof(bool));
|
||||||
|
my_session->multipacket = command == 0x03 || command == 0x16 || command == 0x17;
|
||||||
rval = my_session->down.routeQuery(my_session->down.instance,
|
rval = my_session->down.routeQuery(my_session->down.instance,
|
||||||
my_session->down.session,
|
my_session->down.session,
|
||||||
queue);
|
queue);
|
||||||
@ -837,32 +856,46 @@ GWBUF *clone = NULL;
|
|||||||
static int
|
static int
|
||||||
clientReply (FILTER* instance, void *session, GWBUF *reply)
|
clientReply (FILTER* instance, void *session, GWBUF *reply)
|
||||||
{
|
{
|
||||||
int rc, branch;
|
int rc, branch, eof;
|
||||||
TEE_SESSION *my_session = (TEE_SESSION *) session;
|
TEE_SESSION *my_session = (TEE_SESSION *) session;
|
||||||
|
|
||||||
spinlock_acquire(&my_session->tee_lock);
|
spinlock_acquire(&my_session->tee_lock);
|
||||||
|
|
||||||
ss_dassert(my_session->active);
|
ss_dassert(my_session->active);
|
||||||
|
|
||||||
branch = instance == NULL ? CHILD : PARENT;
|
branch = instance == NULL ? CHILD : PARENT;
|
||||||
unsigned char *ptr = (unsigned char*)reply->start;
|
unsigned char *ptr = (unsigned char*)reply->start;
|
||||||
|
|
||||||
if(my_session->replies[branch] == 0)
|
if(my_session->replies[branch] == 0)
|
||||||
{
|
{
|
||||||
if(PTR_IS_RESULTSET(ptr))
|
/* Reply is in a single packet if it is an OK, ERR or LOCAL_INFILE packet.
|
||||||
|
* Otherwise the reply is a result set and the amount of packets is unknown.
|
||||||
|
*/
|
||||||
|
if(PTR_IS_ERR(ptr) || PTR_IS_LOCAL_INFILE(ptr) ||
|
||||||
|
PTR_IS_OK(ptr) || !my_session->multipacket )
|
||||||
{
|
{
|
||||||
my_session->waiting[branch] = true;
|
my_session->waiting[branch] = false;
|
||||||
my_session->eof[branch] = 0;
|
|
||||||
}
|
}
|
||||||
|
#ifdef SS_DEBUG
|
||||||
|
else
|
||||||
|
{
|
||||||
|
ss_dassert(PTR_IS_RESULTSET(ptr));
|
||||||
|
skygw_log_write_flush(LOGFILE_DEBUG,"tee.c: Waiting for a result set from %s session.",branch == PARENT?"parent":"child");
|
||||||
|
}
|
||||||
|
ss_dassert(PTR_IS_ERR(ptr) || PTR_IS_LOCAL_INFILE(ptr)||
|
||||||
|
PTR_IS_OK(ptr) || my_session->waiting[branch] ||
|
||||||
|
!my_session->multipacket);
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
if(my_session->waiting[branch])
|
if(my_session->waiting[branch])
|
||||||
{
|
{
|
||||||
int eof = modutil_count_EOF(reply);
|
|
||||||
|
eof = modutil_count_signal_packets(reply,my_session->use_ok,my_session->eof[branch] > 0);
|
||||||
|
|
||||||
if((my_session->eof[branch] += eof) >= 2)
|
if((my_session->eof[branch] += eof) >= 2)
|
||||||
{
|
{
|
||||||
my_session->eof[branch] = 0;
|
ss_dassert(my_session->eof[branch] < 3)
|
||||||
my_session->waiting[branch] = false;
|
my_session->waiting[branch] = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -883,7 +916,19 @@ clientReply (FILTER* instance, void *session, GWBUF *reply)
|
|||||||
(my_session->branch_session == NULL ||
|
(my_session->branch_session == NULL ||
|
||||||
my_session->waiting[PARENT] ||
|
my_session->waiting[PARENT] ||
|
||||||
(!my_session->waiting[CHILD] && !my_session->waiting[PARENT])))
|
(!my_session->waiting[CHILD] && !my_session->waiting[PARENT])))
|
||||||
{
|
{
|
||||||
|
#ifdef SS_DEBUG
|
||||||
|
skygw_log_write_flush(LOGFILE_DEBUG, "tee.c: Routing buffer '%p' parent(waiting [%s] replies [%d] eof[%d])"
|
||||||
|
" child(waiting [%s] replies[%d] eof [%d])",
|
||||||
|
my_session->tee_replybuf,
|
||||||
|
my_session->waiting[PARENT] ? "true":"false",
|
||||||
|
my_session->replies[PARENT],
|
||||||
|
my_session->eof[PARENT],
|
||||||
|
my_session->waiting[CHILD]?"true":"false",
|
||||||
|
my_session->replies[CHILD],
|
||||||
|
my_session->eof[CHILD]);
|
||||||
|
#endif
|
||||||
|
|
||||||
rc = my_session->up.clientReply (
|
rc = my_session->up.clientReply (
|
||||||
my_session->up.instance,
|
my_session->up.instance,
|
||||||
my_session->up.session,
|
my_session->up.session,
|
||||||
|
Reference in New Issue
Block a user