Added tee filter multi-statement support.

This commit is contained in:
Markus Makela
2015-03-03 13:31:12 +02:00
parent 0e133cf82a
commit 3b76ed43c4
3 changed files with 189 additions and 12 deletions

View File

@ -63,7 +63,6 @@
#include <mysql_client_server_protocol.h>
#include <housekeeper.h>
#define MYSQL_COM_QUIT 0x01
#define MYSQL_COM_INITDB 0x02
#define MYSQL_COM_FIELD_LIST 0x04
@ -73,6 +72,7 @@
#define MYSQL_COM_STMT_SEND_LONG_DATA 0x18
#define MYSQL_COM_STMT_CLOSE 0x19
#define MYSQL_COM_STMT_RESET 0x1a
#define MYSQL_COM_CONNECT 0x1b
#define REPLY_TIMEOUT_SECOND 5
#define REPLY_TIMEOUT_MILLISECOND 1
@ -93,6 +93,7 @@ static unsigned char required_packets[] = {
MYSQL_COM_STMT_SEND_LONG_DATA,
MYSQL_COM_STMT_CLOSE,
MYSQL_COM_STMT_RESET,
MYSQL_COM_CONNECT,
0 };
/** Defined in log_manager.cc */
@ -168,15 +169,22 @@ typedef struct {
bool waiting[2]; /* if the client is waiting for a reply */
int eof[2];
int replies[2]; /* Number of queries received */
int reply_packets[2]; /* Number of OK, ERR, LOCAL_INFILE_REQUEST or RESULT_SET packets received */
DCB *branch_dcb; /* Client DCB for "branch" service */
SESSION *branch_session;/* The branch service session */
TEE_INSTANCE *instance;
int n_duped; /* Number of duplicated queries */
int n_rejected; /* Number of rejected queries */
int residual; /* Any outstanding SQL text */
GWBUF* tee_replybuf; /* Buffer for reply */
GWBUF* tee_partials[2];
GWBUF* querybuf;
SPINLOCK tee_lock;
DCB* client_dcb;
int statements; /*< Number of statements in the query,
* used to identify and track multi-statement
* queries and that both the parent and the child
* branch are in sync. */
#ifdef SS_DEBUG
long d_id;
#endif
@ -184,7 +192,8 @@ typedef struct {
typedef struct orphan_session_tt
{
SESSION* session;
SESSION* session; /*< The child branch session whose parent was freed before
* the child session was in a suitable state. */
struct orphan_session_tt* next;
}orphan_session_t;
@ -198,6 +207,7 @@ static orphan_session_t* allOrphans = NULL;
static SPINLOCK orphanLock;
static int packet_is_required(GWBUF *queue);
static int detect_loops(TEE_INSTANCE *instance, HASHTABLE* ht, SERVICE* session);
int internal_route(DCB* dcb);
static int hkfn(
void* key)
@ -498,7 +508,11 @@ char *remote, *userName;
{
my_session->active = 1;
my_session->residual = 0;
my_session->statements = 0;
my_session->tee_replybuf = NULL;
my_session->client_dcb = session->client;
my_session->instance = my_instance;
spinlock_init(&my_session->tee_lock);
if (my_instance->source &&
(remote = session_get_remote(session)) != NULL)
@ -544,7 +558,7 @@ char *remote, *userName;
goto retblock;
}
if((dummy = filter_alloc("tee_dummy","tee_dummy")) == NULL)
{
dcb_close(dcb);
@ -641,6 +655,7 @@ skygw_log_write(LOGFILE_TRACE,"Tee close: %d", atomic_add(&debug_seq,1));
#endif
if (my_session->active)
{
if ((bsession = my_session->branch_session) != NULL)
{
CHK_SESSION(bsession);
@ -675,7 +690,8 @@ skygw_log_write(LOGFILE_TRACE,"Tee close: %d", atomic_add(&debug_seq,1));
}
}
my_session->active = 0;
my_session->active = 0;
}
}
@ -800,11 +816,14 @@ char *ptr;
int length, rval, residual = 0;
GWBUF *clone = NULL;
unsigned char command = *((unsigned char*)queue->start + 4);
#ifdef SS_DEBUG
skygw_log_write(LOGFILE_TRACE,"Tee routeQuery: %d : %s",
atomic_add(&debug_seq,1),
((char*)queue->start + 5));
#endif
spinlock_acquire(&my_session->tee_lock);
if(!my_session->active)
@ -871,8 +890,10 @@ if(!my_session->active)
}
memset(my_session->replies,0,2*sizeof(int));
memset(my_session->reply_packets,0,2*sizeof(int));
memset(my_session->eof,0,2*sizeof(int));
memset(my_session->waiting,1,2*sizeof(bool));
my_session->statements = modutil_count_statements(queue);
my_session->command = command;
#ifdef SS_DEBUG
spinlock_acquire(&debug_lock);
@ -938,6 +959,36 @@ if(!my_session->active)
return rval;
}
int count_replies(GWBUF* buffer)
{
unsigned char* ptr = (unsigned char*)buffer->start;
unsigned char* end = (unsigned char*) buffer->end;
int pktlen, eof = 0;
int replies = 0;
while(ptr < end)
{
pktlen = MYSQL_GET_PACKET_LEN(ptr) + 4;
if(PTR_IS_OK(ptr) || PTR_IS_ERR(ptr) || PTR_IS_LOCAL_INFILE(ptr))
{
replies++;
ptr += pktlen;
}
else
{
while(ptr < end && eof < 2)
{
pktlen = MYSQL_GET_PACKET_LEN(ptr) + 4;
if(PTR_IS_EOF(ptr) || PTR_IS_ERR(ptr)) eof++;
ptr += pktlen;
}
if(eof == 2) replies++;
eof = 0;
}
}
return replies;
}
/**
* The clientReply entry point. This is passed the response buffer
* to which the filter should be applied. Once processed the
@ -1012,14 +1063,16 @@ clientReply (FILTER* instance, void *session, GWBUF *reply)
#ifdef SS_DEBUG
else
{
ss_dassert(PTR_IS_RESULTSET(ptr));
//ss_dassert(PTR_IS_RESULTSET(ptr));
skygw_log_write_flush(LOGFILE_DEBUG,"tee.c: [%d] Waiting for a result set from %s session.",
my_session->d_id,
branch == PARENT?"parent":"child");
}
/*
ss_dassert(PTR_IS_ERR(ptr) || PTR_IS_LOCAL_INFILE(ptr)||
PTR_IS_OK(ptr) || my_session->waiting[branch] ||
!my_session->multipacket);
*/
#endif
}
@ -1027,6 +1080,7 @@ clientReply (FILTER* instance, void *session, GWBUF *reply)
{
eof = modutil_count_signal_packets(complete,my_session->use_ok,my_session->eof[branch] > 0);
my_session->eof[branch] += eof;
if(my_session->eof[branch] >= min_eof)
{
@ -1035,11 +1089,24 @@ clientReply (FILTER* instance, void *session, GWBUF *reply)
my_session->d_id,
branch == PARENT?"parent":"child");
#endif
ss_dassert(my_session->eof[branch] < 3)
//ss_dassert(my_session->eof[branch] < 3)
my_session->waiting[branch] = false;
}
}
int reply_packets = count_replies(complete);
/** COM_SET_OPTION returns a single EOF or ERR packet*/
if(my_session->command == 0x1b &&
reply_packets == 0 &&
PTR_IS_EOF(ptr))
{
reply_packets = 1;
}
my_session->reply_packets[branch] += reply_packets;
if(branch == PARENT)
{
//ss_dassert(my_session->tee_replybuf == NULL);
@ -1055,7 +1122,10 @@ clientReply (FILTER* instance, void *session, GWBUF *reply)
rc = 1;
mpkt = my_session->multipacket[PARENT] || my_session->multipacket[CHILD];
if(my_session->reply_packets[branch] < my_session->statements)
{
my_session->waiting[branch] = true;
}
if(my_session->tee_replybuf != NULL)
{
@ -1082,8 +1152,8 @@ clientReply (FILTER* instance, void *session, GWBUF *reply)
skygw_log_write_flush(LOGFILE_DEBUG,"tee.c:[%d] Routing partial response set.",my_session->d_id);
#endif
}
else if(my_session->eof[PARENT] == min_eof &&
my_session->eof[CHILD] == min_eof)
else if(my_session->eof[PARENT] >= min_eof &&
my_session->eof[CHILD] >= min_eof)
{
route = true;
#ifdef SS_DEBUG
@ -1098,7 +1168,7 @@ clientReply (FILTER* instance, void *session, GWBUF *reply)
skygw_log_write_flush(LOGFILE_DEBUG,"tee.c:[%d] Routing single packet response.",my_session->d_id);
#endif
route = true;
}
}
}
if(route)
@ -1236,3 +1306,13 @@ int detect_loops(TEE_INSTANCE *instance,HASHTABLE* ht, SERVICE* service)
return false;
}
int internal_route(DCB* dcb)
{
GWBUF* buffer = dcb->dcb_readqueue;
/** This was set in the newSession function*/
TEE_SESSION* session = dcb->data;
return routeQuery((FILTER*)session->instance,session,buffer);
}