Split the filter testing harness into two different versions, the interactive and command line tool.

This commit is contained in:
Markus Makela 2014-10-06 09:41:21 +03:00
parent 4743b2e5bf
commit 524f693991
5 changed files with 451 additions and 425 deletions

View File

@ -0,0 +1,4 @@
add_executable(harness_ui harness_ui.c harness_common.c)
add_executable(harness harness_util.c harness_common.c)
target_link_libraries(harness_ui log_manager utils pthread)
target_link_libraries(harness log_manager utils pthread)

View File

@ -167,6 +167,11 @@ typedef enum
typedef packet_t PACKET;
/**
* Initialize the static instance.
*/
int init(int argc,char** argv);
/**
* Frees all the query buffers
*/

View File

@ -1,40 +1,33 @@
#include <harness.h>
int main(int argc, char** argv){
int i;
char buffer[256];
char* tk;
FILTERCHAIN* tmp_chn;
FILTERCHAIN* del_chn;
int harness_init(int argc, char** argv){
if(!(argc == 2 && strcmp(argv[1],"-h") == 0)){
skygw_logmanager_init(0,NULL);
}
if(!(instance.head = calloc(1,sizeof(FILTERCHAIN))))
{
printf("Error: Out of memory\n");
skygw_log_write(LOGFILE_ERROR,"Error: Out of memory\n");
if(!(argc == 2 && strcmp(argv[1],"-h") == 0)){
skygw_logmanager_init(0,NULL);
}
if(!(instance.head = calloc(1,sizeof(FILTERCHAIN))))
{
printf("Error: Out of memory\n");
skygw_log_write(LOGFILE_ERROR,"Error: Out of memory\n");
return 1;
}
return 1;
}
instance.running = 1;
instance.infile = -1;
instance.outfile = -1;
instance.buff_ind = -1;
instance.last_ind = -1;
instance.sess_ind = -1;
instance.running = 1;
instance.infile = -1;
instance.outfile = -1;
instance.buff_ind = -1;
instance.last_ind = -1;
instance.sess_ind = -1;
int do_route = process_opts(argc,argv);
if(!(instance.thrpool = malloc(instance.thrcount * sizeof(pthread_t)))){
printf("Error: Out of memory\n");
skygw_log_write(LOGFILE_ERROR,"Error: Out of memory\n");
return 1;
}
process_opts(argc,argv);
if(!(instance.thrpool = malloc(instance.thrcount * sizeof(pthread_t)))){
printf("Error: Out of memory\n");
skygw_log_write(LOGFILE_ERROR,"Error: Out of memory\n");
return 1;
}
/**Initialize worker threads*/
pthread_mutex_lock(&instance.work_mtx);
@ -43,238 +36,11 @@ int main(int argc, char** argv){
pthread_create(&instance.thrpool[i],NULL,(void*)work_buffer,(void*)thr_num++);
}
if(instance.verbose){
printf("\n\n\tFilter Test Harness\n\n");
}
/**non-interactive mode*/
if(do_route == 0){
route_buffers();
instance.running = 0;
}
while(instance.running){
printf("Harness> ");
memset(buffer,0,256);
fgets(buffer,256,stdin);
tk = strtok(buffer," \n");
switch(user_input(tk))
{
case RUNFILTERS:
if(instance.head->next == NULL){
printf("No filters loaded.\n");
break;
}
if(instance.buffer == NULL){
if(instance.infile<0){
manual_query();
}else{
load_query();
}
}
route_buffers();
break;
case LOAD_FILTER:
tk = strtok(NULL," \n");
tmp_chn = load_filter_module(tk);
if(!tmp_chn || !load_filter(tmp_chn,instance.conf)){
printf("Error creating filter instance.\n");
skygw_log_write(LOGFILE_ERROR,"Error: Error creating filter instance.\n");
}else{
instance.head = tmp_chn;
}
break;
case DELETE_FILTER:
tk = strtok(NULL," \n\0");
tmp_chn = instance.head;
del_chn = instance.head;
if(tk){
if(strcmp(instance.head->name,tk) == 0){
instance.head = instance.head->next;
}else{
while(del_chn->next){
if(strcmp(del_chn->name,tk) == 0){
tmp_chn->next = del_chn->next;
break;
}else{
tmp_chn = del_chn;
del_chn = del_chn->next;
}
}
}
if(del_chn && del_chn->next){
printf("Deleted %s.\n",del_chn->name);
if(del_chn->instance){
del_chn->instance->freeSession(del_chn->filter,del_chn->session);
}
free(del_chn->filter);
free(del_chn->down);
free(del_chn->name);
free(del_chn);
}else{
printf("No matching filter found.\n");
}
}
break;
case LOAD_CONFIG:
tk = strtok(NULL," \n\0");
if(!load_config(tk)){
free_filters();
}
break;
case SET_INFILE:
tk = strtok(NULL," \n\0");
if(instance.infile >= 0){
close(instance.infile);
free(instance.infile_name);
}
if(tk!= NULL){
free_buffers();
instance.infile = open_file(tk,0);
if(instance.infile >= 0){
load_query();
instance.infile_name = strdup(tk);
if(instance.verbose){
printf("Loaded %d queries from file '%s'\n",instance.buffer_count,instance.infile_name);
}
}
}else{
instance.infile = -1;
printf("Queries are read from: command line\n");
}
break;
case SET_OUTFILE:
tk = strtok(NULL," \n\0");
if(instance.outfile >= 0){
close(instance.outfile);
free(instance.outfile_name);
}
if(tk!= NULL){
instance.outfile = open_file(tk,1);
if(instance.outfile >= 0){
instance.outfile_name = strdup(tk);
printf("Output is logged to: %s\n",tk);
}
}else{
instance.outfile = -1;
printf("Output logging disabled.\n");
}
break;
case SESS_COUNT:
tk = strtok(NULL," \n\0");
free_buffers();
free_filters();
instance.session_count = atoi(tk);
printf("Sessions set to: %d\n", instance.session_count);
break;
case THR_COUNT:
instance.running = 0;
pthread_mutex_unlock(&instance.work_mtx);
for(i = 0;i<instance.thrcount;i++){
pthread_join(instance.thrpool[i],NULL);
}
pthread_mutex_lock(&instance.work_mtx);
instance.running = 1;
tk = strtok(NULL," \n\0");
instance.thrcount = atoi(tk);
void* t_thr_pool;
if(!(t_thr_pool = realloc(instance.thrpool,instance.thrcount * sizeof(pthread_t)))){
printf("Error: Out of memory\n");
skygw_log_write(LOGFILE_ERROR,"Error: Out of memory\n");
instance.running = 0;
break;
}
instance.thrpool = t_thr_pool;
thr_num = 1;
for(i = 0;i<instance.thrcount;i++){
pthread_create(&instance.thrpool[i],
NULL,
(void*)work_buffer,
(void*)thr_num++);
}
printf("Threads set to: %d\n", instance.thrcount);
break;
case QUIT:
instance.running = 0;
pthread_mutex_unlock(&instance.work_mtx);
for(i = 0;i<instance.thrcount;i++){
pthread_join(instance.thrpool[i],NULL);
}
break;
case UNDEFINED:
printf("Command not found, enter \"help\" for a list of commands\n");
break;
default:
break;
}
}
if(instance.infile >= 0){
close(instance.infile);
}
if(instance.outfile >= 0){
close(instance.outfile);
}
free_buffers();
free_filters();
skygw_logmanager_done();
skygw_logmanager_exit();
free(instance.head);
return 0;
return 0;
}
void free_filters()
{
int i;
@ -318,106 +84,6 @@ void free_buffers()
instance.infile = -1;
}
}
operation_t user_input(char* tk)
{
if(tk){
char cmpbuff[256];
int tklen = strcspn(tk," \n\0");
memset(cmpbuff,0,256);
if(tklen > 0 && tklen < 256){
strncpy(cmpbuff,tk,tklen);
strcat(cmpbuff,"\0");
if(strcmp(tk,"run")==0 || strcmp(tk,"r")==0){
return RUNFILTERS;
}else if(strcmp(cmpbuff,"add")==0){
return LOAD_FILTER;
}else if(strcmp(cmpbuff,"delete")==0){
return DELETE_FILTER;
}else if(strcmp(cmpbuff,"clear")==0){
tk = strtok(NULL," \n\0");
if(tk && !strcmp(tk,"queries")){
free_buffers();
printf("Queries cleared.\n");
}else if(tk && !strcmp(tk,"filters")){
printf("Filters cleared.\n");
free_filters();
}else{
printf("All cleared.\n");
free_buffers();
free_filters();
}
return OK;
}else if(strcmp(cmpbuff,"config")==0){
return LOAD_CONFIG;
}else if(strcmp(cmpbuff,"in")==0){
return SET_INFILE;
}else if(strcmp(cmpbuff,"out")==0){
return SET_OUTFILE;
}else if(strcmp(cmpbuff,"exit")==0 || strcmp(cmpbuff,"quit")==0 || strcmp(cmpbuff,"q")==0){
return QUIT;
}else if(strcmp(cmpbuff,"help")==0){
print_help();
return OK;
}else if(strcmp(cmpbuff,"status")==0){
print_status();
return OK;
}else if(strcmp(cmpbuff,"quiet")==0){
instance.verbose = 0;
return OK;
}else if(strcmp(cmpbuff,"verbose")==0){
instance.verbose = 1;
return OK;
}else if(strcmp(cmpbuff,"sessions")==0){
return SESS_COUNT;
}else if(strcmp(cmpbuff,"threads")==0){
return THR_COUNT;
}
}
}
return UNDEFINED;
}
void print_help()
{
printf("\nFilter Test Harness\n\n"
"List of commands:\n %-32s%s\n %-32s%s\n %-32s%s\n %-32s%s\n %-32s%s\n "
"%-32s%s\n %-32s%s\n %-32s%s\n %-32s%s\n %-32s%s\n %-32s%s\n %-32s%s\n "
"%-32s%s\n %-32s%s\n"
,"help","Prints this help message."
,"run","Feeds the contents of the buffer to the filter chain."
,"add <filter name>","Loads a filter and appeds it to the end of the chain."
,"delete <filter name>","Deletes a filter."
,"status","Lists all loaded filters and queries"
,"clear","Clears the filter chain."
,"config <file name>","Loads filter configurations from a file."
,"in <file name>","Source file for the SQL statements."
,"out <file name>","Destination file for the SQL statements. Defaults to stdout if no parameters were passed."
,"threads <number>","Sets the amount of threads to use"
,"sessions <number>","How many sessions to create for each filter. This clears all loaded filters."
,"quiet","Print only error messages."
,"verbose","Print everything."
,"exit","Exit the program"
);
}
int open_file(char* str, unsigned int write)
{
int mode;
@ -605,38 +271,6 @@ int clientReply(void* ins, void* session, GWBUF* queue)
return 1;
}
void manual_query()
{
char query[1024];
unsigned int qlen;
GWBUF** tmpbuf;
free_buffers();
printf("Enter query: ");
fgets(query,1024,stdin);
qlen = strnlen(query, 1024);
if((tmpbuf = malloc(sizeof(GWBUF*)))== NULL){
printf("Error: cannot allocate enough memory.\n");
skygw_log_write(LOGFILE_ERROR,"Error: cannot allocate enough memory.\n");
return;
}
instance.buffer = tmpbuf;
instance.buffer_count = 1;
instance.buffer[0] = gwbuf_alloc(qlen + 5);
gwbuf_set_type(instance.buffer[0],GWBUF_TYPE_MYSQL);
memcpy(instance.buffer[0]->sbuf->data + 5,query,qlen);
instance.buffer[0]->sbuf->data[0] = (qlen);
instance.buffer[0]->sbuf->data[1] = (qlen << 8);
instance.buffer[0]->sbuf->data[2] = (qlen << 16);
instance.buffer[0]->sbuf->data[3] = 0x00;
instance.buffer[0]->sbuf->data[4] = 0x03;
}
int load_query()
{
@ -1108,40 +742,6 @@ FILTERCHAIN* load_filter_module(char* str)
}
void print_status()
{
if(instance.head->filter){
printf("Filters currently loaded:\n\n");
FILTERCHAIN* hd = instance.head;
int i = 1;
while(hd->filter){
printf("%d: %s\n", i++, hd->name);
hd = hd->next;
}
}else{
printf("No filters loaded.\n");
}
printf("\n");
if(instance.buffer_count > 0){
printf("%d queries loaded.\n",instance.buffer_count);
}else{
printf("No queries loaded.\n");
}
printf("Using %d threads and %d sessions.\n",instance.thrcount,instance.session_count);
if(instance.infile_name){
printf("Input is read from %s.\n",instance.infile_name);
}
if(instance.outfile_name){
printf("Output is written to %s.\n",instance.outfile_name);
}
}
void route_buffers()
{
if(instance.buffer_count > 0){

View File

@ -0,0 +1,403 @@
#include <harness.h>
int main(int argc, char** argv){
int i;
char buffer[256];
char* tk;
FILTERCHAIN* tmp_chn;
FILTERCHAIN* del_chn;
if(harness_init(argc,argv)){
printf("Error: Initialization failed.\n");
skygw_log_write(LOGFILE_ERROR,"Error: Initialization failed.\n");
skygw_logmanager_done();
skygw_logmanager_exit();
return 1;
}
if(instance.verbose){
printf("\n\n\tFilter Test Harness\n\n");
}
while(instance.running){
printf("Harness> ");
memset(buffer,0,256);
fgets(buffer,256,stdin);
tk = strtok(buffer," \n");
switch(user_input(tk))
{
case RUNFILTERS:
if(instance.head->next == NULL){
printf("No filters loaded.\n");
break;
}
if(instance.buffer == NULL){
if(instance.infile<0){
manual_query();
}else{
load_query();
}
}
route_buffers();
break;
case LOAD_FILTER:
tk = strtok(NULL," \n");
tmp_chn = load_filter_module(tk);
if(!tmp_chn || !load_filter(tmp_chn,instance.conf)){
printf("Error creating filter instance.\n");
skygw_log_write(LOGFILE_ERROR,"Error: Error creating filter instance.\n");
}else{
instance.head = tmp_chn;
}
break;
case DELETE_FILTER:
tk = strtok(NULL," \n\0");
tmp_chn = instance.head;
del_chn = instance.head;
if(tk){
if(strcmp(instance.head->name,tk) == 0){
instance.head = instance.head->next;
}else{
while(del_chn->next){
if(strcmp(del_chn->name,tk) == 0){
tmp_chn->next = del_chn->next;
break;
}else{
tmp_chn = del_chn;
del_chn = del_chn->next;
}
}
}
if(del_chn && del_chn->next){
printf("Deleted %s.\n",del_chn->name);
if(del_chn->instance){
del_chn->instance->freeSession(del_chn->filter,del_chn->session);
}
free(del_chn->filter);
free(del_chn->down);
free(del_chn->name);
free(del_chn);
}else{
printf("No matching filter found.\n");
}
}
break;
case LOAD_CONFIG:
tk = strtok(NULL," \n\0");
if(!load_config(tk)){
free_filters();
}
break;
case SET_INFILE:
tk = strtok(NULL," \n\0");
if(instance.infile >= 0){
close(instance.infile);
free(instance.infile_name);
}
if(tk!= NULL){
free_buffers();
instance.infile = open_file(tk,0);
if(instance.infile >= 0){
load_query();
instance.infile_name = strdup(tk);
if(instance.verbose){
printf("Loaded %d queries from file '%s'\n",instance.buffer_count,instance.infile_name);
}
}
}else{
instance.infile = -1;
printf("Queries are read from: command line\n");
}
break;
case SET_OUTFILE:
tk = strtok(NULL," \n\0");
if(instance.outfile >= 0){
close(instance.outfile);
free(instance.outfile_name);
}
if(tk!= NULL){
instance.outfile = open_file(tk,1);
if(instance.outfile >= 0){
instance.outfile_name = strdup(tk);
printf("Output is logged to: %s\n",tk);
}
}else{
instance.outfile = -1;
printf("Output logging disabled.\n");
}
break;
case SESS_COUNT:
tk = strtok(NULL," \n\0");
free_buffers();
free_filters();
instance.session_count = atoi(tk);
printf("Sessions set to: %d\n", instance.session_count);
break;
case THR_COUNT:
instance.running = 0;
pthread_mutex_unlock(&instance.work_mtx);
for(i = 0;i<instance.thrcount;i++){
pthread_join(instance.thrpool[i],NULL);
}
pthread_mutex_lock(&instance.work_mtx);
instance.running = 1;
tk = strtok(NULL," \n\0");
instance.thrcount = atoi(tk);
void* t_thr_pool;
if(!(t_thr_pool = realloc(instance.thrpool,instance.thrcount * sizeof(pthread_t)))){
printf("Error: Out of memory\n");
skygw_log_write(LOGFILE_ERROR,"Error: Out of memory\n");
instance.running = 0;
break;
}
instance.thrpool = t_thr_pool;
thr_num = 1;
for(i = 0;i<instance.thrcount;i++){
pthread_create(&instance.thrpool[i],
NULL,
(void*)work_buffer,
(void*)thr_num++);
}
printf("Threads set to: %d\n", instance.thrcount);
break;
case QUIT:
instance.running = 0;
pthread_mutex_unlock(&instance.work_mtx);
for(i = 0;i<instance.thrcount;i++){
pthread_join(instance.thrpool[i],NULL);
}
break;
case UNDEFINED:
printf("Command not found, enter \"help\" for a list of commands\n");
break;
default:
break;
}
}
if(instance.infile >= 0){
close(instance.infile);
}
if(instance.outfile >= 0){
close(instance.outfile);
}
free_buffers();
free_filters();
skygw_logmanager_done();
skygw_logmanager_exit();
free(instance.head);
return 0;
}
operation_t user_input(char* tk)
{
if(tk){
char cmpbuff[256];
int tklen = strcspn(tk," \n\0");
memset(cmpbuff,0,256);
if(tklen > 0 && tklen < 256){
strncpy(cmpbuff,tk,tklen);
strcat(cmpbuff,"\0");
if(strcmp(tk,"run")==0 || strcmp(tk,"r")==0){
return RUNFILTERS;
}else if(strcmp(cmpbuff,"add")==0){
return LOAD_FILTER;
}else if(strcmp(cmpbuff,"delete")==0){
return DELETE_FILTER;
}else if(strcmp(cmpbuff,"clear")==0){
tk = strtok(NULL," \n\0");
if(tk && !strcmp(tk,"queries")){
free_buffers();
printf("Queries cleared.\n");
}else if(tk && !strcmp(tk,"filters")){
printf("Filters cleared.\n");
free_filters();
}else{
printf("All cleared.\n");
free_buffers();
free_filters();
}
return OK;
}else if(strcmp(cmpbuff,"config")==0){
return LOAD_CONFIG;
}else if(strcmp(cmpbuff,"in")==0){
return SET_INFILE;
}else if(strcmp(cmpbuff,"out")==0){
return SET_OUTFILE;
}else if(strcmp(cmpbuff,"exit")==0 || strcmp(cmpbuff,"quit")==0 || strcmp(cmpbuff,"q")==0){
return QUIT;
}else if(strcmp(cmpbuff,"help")==0){
print_help();
return OK;
}else if(strcmp(cmpbuff,"status")==0){
print_status();
return OK;
}else if(strcmp(cmpbuff,"quiet")==0){
instance.verbose = 0;
return OK;
}else if(strcmp(cmpbuff,"verbose")==0){
instance.verbose = 1;
return OK;
}else if(strcmp(cmpbuff,"sessions")==0){
return SESS_COUNT;
}else if(strcmp(cmpbuff,"threads")==0){
return THR_COUNT;
}
}
}
return UNDEFINED;
}
void print_help()
{
printf("\nFilter Test Harness\n\n"
"List of commands:\n %-32s%s\n %-32s%s\n %-32s%s\n %-32s%s\n %-32s%s\n "
"%-32s%s\n %-32s%s\n %-32s%s\n %-32s%s\n %-32s%s\n %-32s%s\n %-32s%s\n "
"%-32s%s\n %-32s%s\n"
,"help","Prints this help message."
,"run","Feeds the contents of the buffer to the filter chain."
,"add <filter name>","Loads a filter and appeds it to the end of the chain."
,"delete <filter name>","Deletes a filter."
,"status","Lists all loaded filters and queries"
,"clear","Clears the filter chain."
,"config <file name>","Loads filter configurations from a file."
,"in <file name>","Source file for the SQL statements."
,"out <file name>","Destination file for the SQL statements. Defaults to stdout if no parameters were passed."
,"threads <number>","Sets the amount of threads to use"
,"sessions <number>","How many sessions to create for each filter. This clears all loaded filters."
,"quiet","Print only error messages."
,"verbose","Print everything."
,"exit","Exit the program"
);
}
void manual_query()
{
char query[1024];
unsigned int qlen;
GWBUF** tmpbuf;
free_buffers();
printf("Enter query: ");
fgets(query,1024,stdin);
qlen = strnlen(query, 1024);
if((tmpbuf = malloc(sizeof(GWBUF*)))== NULL){
printf("Error: cannot allocate enough memory.\n");
skygw_log_write(LOGFILE_ERROR,"Error: cannot allocate enough memory.\n");
return;
}
instance.buffer = tmpbuf;
instance.buffer_count = 1;
instance.buffer[0] = gwbuf_alloc(qlen + 5);
gwbuf_set_type(instance.buffer[0],GWBUF_TYPE_MYSQL);
memcpy(instance.buffer[0]->sbuf->data + 5,query,qlen);
instance.buffer[0]->sbuf->data[0] = (qlen);
instance.buffer[0]->sbuf->data[1] = (qlen << 8);
instance.buffer[0]->sbuf->data[2] = (qlen << 16);
instance.buffer[0]->sbuf->data[3] = 0x00;
instance.buffer[0]->sbuf->data[4] = 0x03;
}
void print_status()
{
if(instance.head->filter){
printf("Filters currently loaded:\n\n");
FILTERCHAIN* hd = instance.head;
int i = 1;
while(hd->filter){
printf("%d: %s\n", i++, hd->name);
hd = hd->next;
}
}else{
printf("No filters loaded.\n");
}
printf("\n");
if(instance.buffer_count > 0){
printf("%d queries loaded.\n",instance.buffer_count);
}else{
printf("No queries loaded.\n");
}
printf("Using %d threads and %d sessions.\n",instance.thrcount,instance.session_count);
if(instance.infile_name){
printf("Input is read from %s.\n",instance.infile_name);
}
if(instance.outfile_name){
printf("Output is written to %s.\n",instance.outfile_name);
}
}

View File

@ -0,0 +1,14 @@
#include <harness.h>
int main()
{
if(harness_init(argc,argv)){
printf("Error: Initialization failed.\n");
skygw_log_write(LOGFILE_ERROR,"Error: Initialization failed.\n");
skygw_logmanager_done();
skygw_logmanager_exit();
return 1;
}
route_buffers();
return 0;
}