Added utility functions to skygw_utils and cleaned up tee filter.

This commit is contained in:
Markus Makela
2015-04-15 12:42:28 +03:00
parent 786f34cf49
commit 051d891680
3 changed files with 406 additions and 282 deletions

View File

@ -179,13 +179,10 @@ typedef struct {
int residual; /* Any outstanding SQL text */
GWBUF* tee_replybuf; /* Buffer for reply */
GWBUF* tee_partials[2];
GWBUF* querybuf;
GWBUF* queue;
SPINLOCK tee_lock;
DCB* client_dcb;
int statements; /*< Number of statements in the query,
* used to identify and track multi-statement
* queries and that both the parent and the child
* branch are in sync. */
#ifdef SS_DEBUG
long d_id;
#endif
@ -209,30 +206,12 @@ static SPINLOCK orphanLock;
static int packet_is_required(GWBUF *queue);
static int detect_loops(TEE_INSTANCE *instance, HASHTABLE* ht, SERVICE* session);
int internal_route(DCB* dcb);
static int hkfn(
void* key)
{
if(key == NULL){
return 0;
}
unsigned int hash = 0,c = 0;
char* ptr = (char*)key;
while((c = *ptr++)){
hash = c + (hash << 6) + (hash << 16) - hash;
}
return *(int *)key;
}
static int hcfn(
void* v1,
void* v2)
{
char* i1 = (char*) v1;
char* i2 = (char*) v2;
return strcmp(i1,i2);
}
GWBUF* clone_query(TEE_INSTANCE* my_instance, TEE_SESSION* my_session, GWBUF* buffer);
int route_single_query(TEE_INSTANCE* my_instance,
TEE_SESSION* my_session,
GWBUF* buffer,
GWBUF* clone);
int reset_session_state(TEE_SESSION* my_session, GWBUF* buffer);
static void
orphan_free(void* data)
@ -492,7 +471,7 @@ char *remote, *userName;
goto retblock;
}
HASHTABLE* ht = hashtable_alloc(100,hkfn,hcfn);
HASHTABLE* ht = hashtable_alloc(100,simple_str_hash,strcmp);
bool is_loop = detect_loops(my_instance,ht,session->service);
hashtable_free(ht);
@ -509,12 +488,11 @@ char *remote, *userName;
{
my_session->active = 1;
my_session->residual = 0;
my_session->statements = 0;
my_session->tee_replybuf = NULL;
my_session->client_dcb = session->client;
my_session->instance = my_instance;
my_session->client_multistatement = false;
my_session->queue = NULL;
spinlock_init(&my_session->tee_lock);
if (my_instance->source &&
(remote = session_get_remote(session)) != NULL)
@ -815,9 +793,10 @@ routeQuery(FILTER *instance, void *session, GWBUF *queue)
TEE_INSTANCE *my_instance = (TEE_INSTANCE *)instance;
TEE_SESSION *my_session = (TEE_SESSION *)session;
char *ptr;
int length, rval, residual = 0;
GWBUF *clone = NULL;
unsigned char command = *((unsigned char*)queue->start + 4);
int rval;
GWBUF *buffer = NULL, *clone = NULL;
unsigned char command = gwbuf_length(queue) >= 5 ?
*((unsigned char*)queue->start + 4) : 1;
#ifdef SS_DEBUG
skygw_log_write(LOGFILE_TRACE,"Tee routeQuery: %d : %s",
@ -832,134 +811,39 @@ if(!my_session->active)
{
skygw_log_write(LOGFILE_TRACE, "Tee: Received a reply when the session was closed.");
gwbuf_free(queue);
rval = 0;
goto retblock;
spinlock_release(&my_session->tee_lock);
return 0;
}
if (my_session->branch_session &&
my_session->branch_session->state == SESSION_STATE_ROUTER_READY)
if(my_session->queue)
{
if (my_session->residual)
{
clone = gwbuf_clone_all(queue);
if (my_session->residual < GWBUF_LENGTH(clone))
{
GWBUF_RTRIM(clone, GWBUF_LENGTH(clone) - residual);
my_session->queue = gwbuf_append(my_session->queue,queue);
buffer = modutil_get_next_MySQL_packet(&my_session->queue);
}
my_session->residual -= GWBUF_LENGTH(clone);
if (my_session->residual < 0)
else
{
my_session->residual = 0;
}
}
else if (my_session->active && (ptr = modutil_get_SQL(queue)) != NULL)
{
if ((my_instance->match == NULL ||
regexec(&my_instance->re, ptr, 0, NULL, 0) == 0) &&
(my_instance->nomatch == NULL ||
regexec(&my_instance->nore,ptr,0,NULL, 0) != 0))
{
length = modutil_MySQL_query_len(queue, &residual);
clone = gwbuf_clone_all(queue);
my_session->residual = residual;
}
free(ptr);
}
else if (packet_is_required(queue))
{
clone = gwbuf_clone_all(queue);
}
buffer = modutil_get_next_MySQL_packet(&queue);
my_session->queue = queue;
}
if(buffer == NULL)
{
spinlock_release(&my_session->tee_lock);
return 1;
}
clone = clone_query(my_instance, my_session,buffer);
spinlock_release(&my_session->tee_lock);
/* Pass the query downstream */
/* Reset session state */
if(!reset_session_state(my_session,buffer))
return 0;
switch(command)
{
case 0x1b:
my_session->client_multistatement = *((unsigned char*) queue->start + 5);
case 0x03:
case 0x16:
case 0x17:
case 0x04:
case 0x0a:
memset(my_session->multipacket,(char)true,2*sizeof(bool));
break;
default:
memset(my_session->multipacket,(char)false,2*sizeof(bool));
break;
}
memset(my_session->replies,0,2*sizeof(int));
memset(my_session->reply_packets,0,2*sizeof(int));
memset(my_session->eof,0,2*sizeof(int));
memset(my_session->waiting,1,2*sizeof(bool));
my_session->statements = modutil_count_statements(queue);
my_session->command = command;
#ifdef SS_DEBUG
spinlock_acquire(&debug_lock);
my_session->d_id = ++debug_id;
skygw_log_write_flush(LOGFILE_DEBUG,"tee.c [%d] command [%x]",
my_session->d_id,
my_session->command);
if(command == 0x03)
{
char* tmpstr = modutil_get_SQL(queue);
skygw_log_write_flush(LOGFILE_DEBUG,"tee.c query: '%s'",
tmpstr);
free(tmpstr);
}
spinlock_release(&debug_lock);
#endif
/** Route query downstream */
spinlock_acquire(&my_session->tee_lock);
if(!my_session->active ||
my_session->branch_session == NULL ||
my_session->branch_session->state != SESSION_STATE_ROUTER_READY)
{
rval = 0;
my_session->active = 0;
goto retblock;
}
rval = my_session->down.routeQuery(my_session->down.instance,
my_session->down.session,
queue);
if (clone)
{
my_session->n_duped++;
if (my_session->branch_session->state == SESSION_STATE_ROUTER_READY)
{
SESSION_ROUTE_QUERY(my_session->branch_session, clone);
}
else
{
/** Close tee session */
my_session->active = 0;
rval = 0;
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Closed tee filter session: Child session in invalid state.")));
gwbuf_free(clone);
}
}
else
{
if (my_session->active)
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Closed tee filter session: Child session is NULL.")));
my_session->active = 0;
rval = 0;
}
my_session->n_rejected++;
}
retblock:
rval = route_single_query(my_instance,my_session,buffer,clone);
spinlock_release(&my_session->tee_lock);
return rval;
}
@ -1059,6 +943,7 @@ clientReply (FILTER* instance, void *session, GWBUF *reply)
PTR_IS_ERR(ptr) ? "ERR" : PTR_IS_OK(ptr) ? "OK" : "RSET",
atomic_add(&debug_seq,1));
#endif
spinlock_acquire(&my_session->tee_lock);
if(!my_session->active)
@ -1228,8 +1113,23 @@ clientReply (FILTER* instance, void *session, GWBUF *reply)
my_session->tee_replybuf);
my_session->tee_replybuf = NULL;
}
if(my_session->queue &&
!my_session->waiting[PARENT] &&
!my_session->waiting[CHILD])
{
GWBUF* buffer = modutil_get_next_MySQL_packet(&my_session->queue);
GWBUF* clone = clone_query(my_session->instance,my_session,buffer);
reset_session_state(my_session,buffer);
route_single_query(my_session->instance,my_session,buffer,clone);
LOGIF(LT,(skygw_log_write(LT,"tee: routing queued query")));
}
retblock:
spinlock_release(&my_session->tee_lock);
return rc;
}
@ -1353,3 +1253,151 @@ int internal_route(DCB* dcb)
return routeQuery((FILTER*)session->instance,session,buffer);
}
/**
*
* @param my_instance
* @param my_session
* @param buffer
* @return
*/
GWBUF* clone_query(TEE_INSTANCE* my_instance, TEE_SESSION* my_session, GWBUF* buffer)
{
GWBUF* clone = NULL;
int length, residual;
char* ptr;
if (my_session->branch_session &&
my_session->branch_session->state == SESSION_STATE_ROUTER_READY)
{
if (my_session->residual)
{
clone = gwbuf_clone_all(buffer);
if (my_session->residual < GWBUF_LENGTH(clone))
{
GWBUF_RTRIM(clone, GWBUF_LENGTH(clone) - residual);
}
my_session->residual -= GWBUF_LENGTH(clone);
if (my_session->residual < 0)
{
my_session->residual = 0;
}
}
else if (my_session->active && (ptr = modutil_get_SQL(buffer)) != NULL)
{
if ((my_instance->match == NULL ||
regexec(&my_instance->re, ptr, 0, NULL, 0) == 0) &&
(my_instance->nomatch == NULL ||
regexec(&my_instance->nore,ptr,0,NULL, 0) != 0))
{
length = modutil_MySQL_query_len(buffer, &residual);
clone = gwbuf_clone_all(buffer);
my_session->residual = residual;
}
free(ptr);
}
else if (packet_is_required(buffer))
{
clone = gwbuf_clone_all(buffer);
}
}
return clone;
}
/**
* Route the main query downstream along the main filter chain and possibly route
* a clone of the buffer to the branch session. If the clone buffer is NULL, nothing
* is routed to the branch session.
* @param my_instance Tee instance
* @param my_session Tee session
* @param buffer Main buffer
* @param clone Cloned buffer
* @return 1 on success, 0 on failure.
*/
int route_single_query(TEE_INSTANCE* my_instance, TEE_SESSION* my_session, GWBUF* buffer, GWBUF* clone)
{
int rval = 0;
if(!my_session->active ||
my_session->branch_session == NULL ||
my_session->branch_session->state != SESSION_STATE_ROUTER_READY)
{
rval = 0;
my_session->active = 0;
return rval;
}
rval = my_session->down.routeQuery(my_session->down.instance,
my_session->down.session,
buffer);
if (clone)
{
my_session->n_duped++;
if (my_session->branch_session->state == SESSION_STATE_ROUTER_READY)
{
SESSION_ROUTE_QUERY(my_session->branch_session, clone);
}
else
{
/** Close tee session */
my_session->active = 0;
rval = 0;
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Closed tee filter session: Child session in invalid state.")));
gwbuf_free(clone);
}
}
else
{
if (my_session->active)
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Closed tee filter session: Child session is NULL.")));
my_session->active = 0;
rval = 0;
}
my_session->n_rejected++;
}
return rval;
}
/**
* Reset the session's internal counters.
* @param my_session Tee session
* @param buffer Buffer with the query of the main branch in it
* @return 1 on success, 0 on error
*/
int reset_session_state(TEE_SESSION* my_session, GWBUF* buffer)
{
if(gwbuf_length(buffer) < 5)
return 0;
unsigned char command = *((unsigned char*)buffer->start + 4);
switch(command)
{
case 0x1b:
my_session->client_multistatement = *((unsigned char*) buffer->start + 5);
case 0x03:
case 0x16:
case 0x17:
case 0x04:
case 0x0a:
memset(my_session->multipacket,(char)true,2*sizeof(bool));
break;
default:
memset(my_session->multipacket,(char)false,2*sizeof(bool));
break;
}
memset(my_session->replies,0,2*sizeof(int));
memset(my_session->reply_packets,0,2*sizeof(int));
memset(my_session->eof,0,2*sizeof(int));
memset(my_session->waiting,1,2*sizeof(bool));
my_session->command = command;
return 1;
}

View File

@ -1045,6 +1045,59 @@ void slcursor_add_data(
CHK_SLIST_CURSOR(c);
}
/**
* Remove the current node in the slist. This does not delete the data in the
* node but will delete the structure pointing to that data. This is useful when
* the user wants to free the allocated memory. After node removal, the cursor
* will point to the node before the removed node.
* @param c Cursor pointing to the data node to be removed
*/
void slcursor_remove_data(slist_cursor_t* c)
{
slist_node_t* node = c->slcursor_pos;
int havemore = slist_size(c);
slcursor_move_to_begin (c);
if(node == c->slcursor_pos)
{
c->slcursor_list->slist_head = c->slcursor_list->slist_head->slnode_next;
slcursor_move_to_begin (c);
atomic_add((int*)&node->slnode_list->slist_nelems,-1);
atomic_add((int*)&node->slnode_cursor_refcount,-1);
if(node->slnode_cursor_refcount == 0)
{
free(node);
}
return;
}
while(havemore)
{
if( c->slcursor_pos->slnode_next == node)
{
c->slcursor_pos->slnode_next = node->slnode_next;
atomic_add((int*)&node->slnode_list->slist_nelems,-1);
atomic_add((int*)&node->slnode_cursor_refcount,-1);
if(node->slnode_cursor_refcount == 0)
{
free(node);
}
return;
}
havemore = slcursor_step_ahead (c);
}
}
/**
* Return the size of the slist.
* @param c slist cursor which refers to a list
* @return nummber of elements in the list
*/
size_t slist_size(slist_cursor_t* c)
{
return c->slcursor_list->slist_nelems;
}
void slist_done(
slist_cursor_t* c)
@ -2176,3 +2229,22 @@ strip_escape_chars (char* val)
}
return true;
}
/**
* Calculate a hash value for a null-terminated string.
* @param key String to hash
* @return Hash value of the string
*/
int simple_str_hash(char* key)
{
if(key == NULL){
return 0;
}
int hash = 0,c = 0;
char* ptr = key;
while((c = *ptr++)){
hash = c + (hash << 6) + (hash << 16) - hash;
}
return hash;
}

View File

@ -84,8 +84,9 @@ EXTERN_C_BLOCK_BEGIN
slist_cursor_t* slist_init(void);
void slist_done(slist_cursor_t* c);
size_t slist_size(slist_cursor_t* c);
void slcursor_add_data(slist_cursor_t* c, void* data);
void slcursor_remove_data(slist_cursor_t* c);
void* slcursor_get_data(slist_cursor_t* c);
bool slcursor_move_to_begin(slist_cursor_t* c);
@ -202,6 +203,9 @@ char* replace_literal(char* haystack,
const char* replacement);
bool is_valid_posix_path(char* path);
bool strip_escape_chars(char*);
int simple_str_hash(char* key);
EXTERN_C_BLOCK_END
#endif /* SKYGW_UTILS_H */