Merge branch 'MXS-544' into develop-MXS-544-merge

This commit is contained in:
Markus Makela
2016-03-03 21:39:39 +02:00
42 changed files with 3075 additions and 2294 deletions

View File

@ -1517,15 +1517,15 @@ GWBUF* gen_dummy_error(FW_SESSION* session, char* msg)
unsigned int errlen;
if (session == NULL || session->session == NULL ||
session->session->data == NULL ||
session->session->client == NULL)
session->session->client_dcb == NULL ||
session->session->client_dcb->data == NULL)
{
MXS_ERROR("Firewall filter session missing data.");
return NULL;
}
dcb = session->session->client;
mysql_session = (MYSQL_session*) session->session->data;
dcb = session->session->client_dcb;
mysql_session = (MYSQL_session*) dcb->data;
errlen = msg != NULL ? strlen(msg) : 0;
errmsg = (char*) malloc((512 + errlen) * sizeof(char));
@ -2042,7 +2042,7 @@ routeQuery(FILTER *instance, void *session, GWBUF *queue)
{
FW_SESSION *my_session = (FW_SESSION *) session;
FW_INSTANCE *my_instance = (FW_INSTANCE *) instance;
DCB *dcb = my_session->session->client;
DCB *dcb = my_session->session->client_dcb;
int rval = 0;
ss_dassert(dcb && dcb->session);
@ -2057,9 +2057,7 @@ routeQuery(FILTER *instance, void *session, GWBUF *queue)
}
else
{
USER *user = find_user_data(my_instance->htable,
my_session->session->client->user,
my_session->session->client->remote);
USER *user = find_user_data(my_instance->htable, dcb->user, dcb->remote);
bool query_ok = false;
if (user)

View File

@ -991,7 +991,7 @@ newSession(FILTER *instance, SESSION *session)
my_session->was_query = false;
my_session->uid = NULL;
my_session->session = session;
sessauth = my_session->session->data;
sessauth = my_session->session->client_dcb->data;
if (sessauth->db && strnlen(sessauth->db, 128) > 0)
{
my_session->db = strdup(sessauth->db);

View File

@ -251,7 +251,7 @@ orphan_free(void* data)
*/
if (ptr->session->state == SESSION_STATE_STOPPING &&
ptr->session->refcount == 0 && ptr->session->client == NULL)
ptr->session->refcount == 0 && ptr->session->client_dcb == NULL)
{
ptr->session->state = SESSION_STATE_TO_BE_FREED;
}
@ -511,7 +511,7 @@ newSession(FILTER *instance, SESSION *session)
my_session->active = 1;
my_session->residual = 0;
my_session->tee_replybuf = NULL;
my_session->client_dcb = session->client;
my_session->client_dcb = session->client_dcb;
my_session->instance = my_instance;
my_session->client_multistatement = false;
my_session->queue = NULL;
@ -544,7 +544,7 @@ newSession(FILTER *instance, SESSION *session)
FILTER_DEF* dummy;
UPSTREAM* dummy_upstream;
if ((dcb = dcb_clone(session->client)) == NULL)
if ((dcb = dcb_clone(session->client_dcb)) == NULL)
{
freeSession(instance, (void *) my_session);
my_session = NULL;
@ -604,7 +604,7 @@ newSession(FILTER *instance, SESSION *session)
}
ses->tail = *dummy_upstream;
MySQLProtocol* protocol = (MySQLProtocol*) session->client->protocol;
MySQLProtocol* protocol = (MySQLProtocol*) session->client_dcb->protocol;
my_session->use_ok = protocol->client_capabilities & (1 << 6);
free(dummy_upstream);
}

View File

@ -10,7 +10,7 @@ int dcbfun(struct dcb* dcb, GWBUF * buffer)
int harness_init(int argc, char** argv, HARNESS_INSTANCE** inst){
int i = 0,rval = 0;
int i = 0,rval = 0;
MYSQL_session* mysqlsess;
DCB* dcb;
char cwd[1024];
@ -19,12 +19,12 @@ int harness_init(int argc, char** argv, HARNESS_INSTANCE** inst){
if(!(argc == 2 && strcmp(argv[1],"-h") == 0)){
mxs_log_init(NULL,NULL,MXS_LOG_TARGET_DEFAULT);
}
if(!(instance.head = calloc(1,sizeof(FILTERCHAIN))))
{
printf("Error: Out of memory\n");
MXS_ERROR("Out of memory\n");
return 1;
}
@ -41,26 +41,26 @@ int harness_init(int argc, char** argv, HARNESS_INSTANCE** inst){
mysqlsess = calloc(1,sizeof(MYSQL_session));
sprintf(mysqlsess->user,"dummyuser");
sprintf(mysqlsess->db,"dummydb");
sprintf(mysqlsess->db,"dummydb");
dcb->func.write = dcbfun;
dcb->remote = strdup("0.0.0.0");
dcb->user = strdup("user");
instance.session->client = (void*)dcb;
instance.session->data = (void*)mysqlsess;
instance.session->client_dcb = (void*)dcb;
instance.session->client_dcb->data = (void*)mysqlsess;
getcwd(cwd,sizeof(cwd));
sprintf(tmp,"%s",cwd);
mxs_log_init(NULL, tmp, MXS_LOG_TARGET_DEFAULT);
rval = process_opts(argc,argv);
if(!(instance.thrpool = malloc(instance.thrcount * sizeof(pthread_t)))){
printf("Error: Out of memory\n");
MXS_ERROR("Out of memory\n");
return 1;
}
/**Initialize worker threads*/
pthread_mutex_lock(&instance.work_mtx);
size_t thr_num = 1;
@ -100,14 +100,14 @@ void free_buffers()
if(instance.buffer){
int i;
for(i = 0;i<instance.buffer_count;i++){
gwbuf_free(instance.buffer[i]);
gwbuf_free(instance.buffer[i]);
}
free(instance.buffer);
instance.buffer = NULL;
instance.buffer_count = 0;
}
if(instance.infile >= 0){
close(instance.infile);
free(instance.infile_name);
@ -167,7 +167,7 @@ FILTER_PARAMETER** read_params(int* paramc)
}
pc++;
}
}
if(pc >= 64){
do_read = 0;
@ -183,7 +183,7 @@ FILTER_PARAMETER** read_params(int* paramc)
}
free(names[i]);
free(values[i]);
}
}
params[pc] = NULL;
*paramc = pc;
}
@ -207,7 +207,7 @@ int routeQuery(void* ins, void* session, GWBUF* queue)
buffsz += strnlen(queue->hint->value,1024);
}
}
qstr = calloc(buffsz + 1,sizeof(char));
if(qstr){
@ -235,7 +235,7 @@ int routeQuery(void* ins, void* session, GWBUF* queue)
case HINT_ROUTE_TO_ALL:
sprintf(ptr,"|HINT_ROUTE_TO_ALL");
break;
case HINT_PARAMETER:
sprintf(ptr,"|HINT_PARAMETER");
break;
@ -264,9 +264,9 @@ int routeQuery(void* ins, void* session, GWBUF* queue)
}
if(instance.verbose){
printf("Query endpoint: %s\n", qstr);
printf("Query endpoint: %s\n", qstr);
}
if(instance.outfile>=0){
write(instance.outfile,qstr,strlen(qstr));
write(instance.outfile,"\n",1);
@ -279,7 +279,7 @@ int routeQuery(void* ins, void* session, GWBUF* queue)
int clientReply(void* ins, void* session, GWBUF* queue)
{
if(instance.verbose){
pthread_mutex_lock(&instance.work_mtx);
unsigned char* ptr = (unsigned char*)queue->start;
@ -291,13 +291,13 @@ int clientReply(void* ins, void* session, GWBUF* queue)
printf("\n");
pthread_mutex_unlock(&instance.work_mtx);
}
if(instance.outfile>=0){
int qlen = queue->end - queue->start;
write(instance.outfile,"Reply: ",strlen("Reply: "));
write(instance.outfile,queue->start,qlen);
write(instance.outfile,"\n",1);
}
return 1;
@ -313,7 +313,7 @@ int clientReply(void* ins, void* session, GWBUF* queue)
int fdgets(int fd, char* buff, int size)
{
int i = 0;
while(i < size - 1 && read(fd,&buff[i],1))
{
if(buff[i] == '\n' || buff[i] == '\0')
@ -322,7 +322,7 @@ int fdgets(int fd, char* buff, int size)
}
i++;
}
buff[i] = '\0';
return i;
}
@ -365,24 +365,24 @@ int load_query()
rval = 1;
goto retblock;
}
query_list = tmpbuff;
qbuff_sz *= 2;
}
query_list[qcount] = calloc((offset + 1),sizeof(char));
strcpy(query_list[qcount],buffer);
offset = 0;
qcount++;
}
/**TODO check what messes up the first querystring*/
GWBUF** tmpbff = malloc(sizeof(GWBUF*)*(qcount + 1));
if(tmpbff){
for(i = 0;i<qcount;i++){
tmpbff[i] = gwbuf_alloc(strlen(query_list[i]) + 6);
if(tmpbff[i] == NULL)
@ -413,7 +413,7 @@ int load_query()
instance.buffer = tmpbff;
}else{
printf("Error: cannot allocate enough memory for buffers.\n");
MXS_ERROR("cannot allocate enough memory for buffers.\n");
MXS_ERROR("cannot allocate enough memory for buffers.\n");
free_buffers();
rval = 1;
goto retblock;
@ -423,7 +423,7 @@ int load_query()
rval = 1;
goto retblock;
}
instance.buffer_count = qcount;
retblock:
@ -478,7 +478,7 @@ int handler(void* user, const char* section, const char* name,
/**Section not found, creating a new one*/
if(iter == NULL){
CONFIG* nxt = malloc(sizeof(CONFIG));
if(nxt && (nxt->item = malloc(sizeof(CONFIG_ITEM)))){
nxt->section = strdup(section);
@ -576,7 +576,7 @@ int load_config( char* fname)
while(iter){
item = iter->item;
while(item){
if(!strcmp("module",item->name)){
if(instance.mod_dir){
@ -600,7 +600,7 @@ int load_config( char* fname)
}else{
if(instance.verbose){
printf("\t%s\n",iter->section);
printf("\t%s\n",iter->section);
}
}
}
@ -620,7 +620,7 @@ int load_config( char* fname)
item = instance.conf->item;
}
instance.conf = instance.conf->next;
}
cleanup:
@ -629,7 +629,7 @@ int load_config( char* fname)
instance.conf = instance.conf->next;
item = iter->item;
while(item){
while(item){
free(item->name);
free(item->value);
free(item);
@ -651,9 +651,9 @@ int load_filter(FILTERCHAIN* fc, CONFIG* cnf)
int sess_err = 0;
int x;
if(cnf == NULL){
fparams = read_params(&paramc);
}else{
CONFIG* iter = cnf;
@ -661,14 +661,14 @@ int load_filter(FILTERCHAIN* fc, CONFIG* cnf)
while(iter){
paramc = -1;
item = iter->item;
while(item){
/**Matching configuration found*/
if(!strcmp(item->name,"module") && !strcmp(item->value,fc->name)){
paramc = 0;
item = iter->item;
while(item){
if(strcmp(item->name,"module") && strcmp(item->name,"type")){
paramc++;
@ -678,7 +678,7 @@ int load_filter(FILTERCHAIN* fc, CONFIG* cnf)
item = iter->item;
fparams = calloc((paramc + 1),sizeof(FILTER_PARAMETER*));
if(fparams){
int i = 0;
while(item){
if(strcmp(item->name,"module") != 0 &&
@ -740,7 +740,7 @@ int load_filter(FILTERCHAIN* fc, CONFIG* cnf)
MXS_WARNING("The filter %s does not support client replies.\n",fc->name);
}
if(fc->next && fc->next->next){
if(fc->next && fc->next->next){
fc->down[i]->routeQuery = (void*)fc->next->instance->routeQuery;
fc->down[i]->session = fc->next->session[i];
@ -775,7 +775,7 @@ int load_filter(FILTERCHAIN* fc, CONFIG* cnf)
}
}
if(sess_err){
for(i = 0;i<instance.session_count;i++){
if(fc->filter && fc->session[i]){
@ -788,9 +788,9 @@ int load_filter(FILTERCHAIN* fc, CONFIG* cnf)
free(fc->name);
free(fc);
}
}
error:
error:
if(fparams){
@ -809,9 +809,9 @@ int load_filter(FILTERCHAIN* fc, CONFIG* cnf)
FILTERCHAIN* load_filter_module(char* str)
{
FILTERCHAIN* flt_ptr = NULL;
if((flt_ptr = calloc(1,sizeof(FILTERCHAIN))) != NULL &&
if((flt_ptr = calloc(1,sizeof(FILTERCHAIN))) != NULL &&
(flt_ptr->session = calloc(instance.session_count,sizeof(SESSION*))) != NULL &&
(flt_ptr->down = calloc(instance.session_count,sizeof(DOWNSTREAM*))) != NULL &&
(flt_ptr->down = calloc(instance.session_count,sizeof(DOWNSTREAM*))) != NULL &&
(flt_ptr->up = calloc(instance.session_count,sizeof(UPSTREAM*))) != NULL){
flt_ptr->next = instance.head;
}
@ -839,7 +839,7 @@ void route_buffers()
fin = instance.buffer_count*instance.session_count,
step = (fin/50.f)/fin;
FILTERCHAIN* fc = instance.head;
while(fc->next->next){
fc = fc->next;
}
@ -866,7 +866,7 @@ void route_buffers()
while(instance.last_ind < instance.session_count){
struct timespec ts1;
ts1.tv_sec = 0;
tprg = ((bprg + (float)instance.last_ind)/fin);
if(!instance.verbose){
if(tprg >= trig){
@ -883,7 +883,7 @@ void route_buffers()
instance.sess_ind = 0;
instance.last_ind = 0;
}
if(!instance.verbose){
@ -911,7 +911,7 @@ void work_buffer(void* thr_num)
instance.buff_ind < instance.buffer_count)
{
struct timespec ts1;
ts1.tv_sec = 0;
ts1.tv_sec = 0;
if(instance.head->instance->routeQuery(instance.head->filter,
@ -954,7 +954,7 @@ GWBUF* gen_packet(PACKET pkt)
if(psize > 0){
buff = gwbuf_alloc(psize);
ptr = (unsigned char*)buff->start;
switch(pkt){
case PACKET_OK:
@ -1007,8 +1007,8 @@ int process_opts(int argc, char** argv)
return 1;
}
if( (rval = lseek(fd,0,SEEK_END)) < 0 ||
if( (rval = lseek(fd,0,SEEK_END)) < 0 ||
lseek(fd,0,SEEK_SET) < 0){
printf("Error: Cannot seek file.\n");
close(fd);
@ -1033,9 +1033,9 @@ int process_opts(int argc, char** argv)
}
tok = strtok_r(NULL,"=",&saveptr);
}
free(buff);
instance.verbose = 1;
@ -1114,7 +1114,7 @@ int process_opts(int argc, char** argv)
break;
default:
break;
}