Merge branch 'develop' into firewall

Conflicts:
	server/modules/filter/test/harness.h
	server/modules/filter/test/harness_common.c
This commit is contained in:
Markus Makela
2014-11-17 18:45:01 +02:00
6 changed files with 190 additions and 156 deletions

View File

@ -69,7 +69,7 @@
#include <ini.h>
#include <hint.h>
#include <modutil.h>
#include <errno.h>
#include <mysql_client_server_protocol.h>
/**
* A single name-value pair and a link to the next item in the
* configuration.
@ -118,7 +118,6 @@ typedef struct
int running;
int verbose; /**Whether to print to stdout*/
int infile; /**A file where the queries are loaded from*/
int expected;
int error;
char* mod_dir; /**Module directory absolute path*/
char* infile_name;
@ -127,6 +126,7 @@ typedef struct
FILTERCHAIN* head; /**The head of the filter chain*/
FILTERCHAIN* tail; /**The tail of the filter chain*/
GWBUF** buffer; /**Buffers that are fed to the filter chain*/
SESSION* session;
int buffer_count;
int session_count;
DOWNSTREAM dummyrouter; /**Dummy downstream router for data extraction*/
@ -174,7 +174,7 @@ typedef packet_t PACKET;
/**
* Initialize the static instance.
*/
int harness_init(int argc,char** argv,HARNESS_INSTANCE** inst);
int harness_init(int argc,char** argv);
/**
* Frees all the query buffers
@ -361,14 +361,4 @@ GWBUF* gen_packet(PACKET pkt);
*/
int process_opts(int argc, char** argv);
/**
* Compares the contents of two files.
* This function resets the offsets of the file descriptors and leaves them in an
* undefined state.
* @param a The first file
* @param b The second file
* @return 0 if the files do not differ and 1 if they do or an error occurred.
*/
int compare_files(int a, int b);
#endif

View File

@ -1,6 +1,12 @@
#include <harness.h>
int harness_init(int argc, char** argv, HARNESS_INSTANCE** inst){
int dcbfun(struct dcb* dcb, GWBUF * buffer)
{
printf("Data was written to client DCB.\n");
return 1;
}
int harness_init(int argc, char** argv){
int i = 0;
if(!(argc == 2 && strcmp(argv[1],"-h") == 0)){
skygw_logmanager_init(0,NULL);
@ -14,14 +20,20 @@ int harness_init(int argc, char** argv, HARNESS_INSTANCE** inst){
return 1;
}
*inst = &instance;
instance.running = 1;
instance.infile = -1;
instance.outfile = -1;
instance.expected = -1;
instance.buff_ind = -1;
instance.last_ind = -1;
instance.sess_ind = -1;
instance.session = calloc(1,sizeof(SESSION));
MYSQL_session* mysqlsess = calloc(1,sizeof(MYSQL_session));
DCB* dcb = calloc(1,sizeof(DCB));
sprintf(mysqlsess->user,"dummyuser");
sprintf(mysqlsess->db,"dummydb");
dcb->func.write = dcbfun;
instance.session->client = (void*)dcb;
process_opts(argc,argv);
@ -86,17 +98,15 @@ void free_buffers()
}
int open_file(char* str, unsigned int write)
{
int mode,fd;
int mode;
if(write){
mode = O_RDWR|O_CREAT;
}else{
mode = O_RDONLY;
}
if((fd = open(str,mode,S_IRWXU|S_IRGRP|S_IXGRP|S_IXOTH)) < 0){
printf("Error %d: %s\n",errno,strerror(errno));
}
return fd;
return open(str,mode,S_IRWXU|S_IRGRP|S_IXGRP|S_IXOTH);
}
@ -612,8 +622,6 @@ int load_filter(FILTERCHAIN* fc, CONFIG* cnf)
{
FILTER_PARAMETER** fparams = NULL;
int i, paramc = -1;
int sess_err = 0;
int x;
if(cnf == NULL){
fparams = read_params(&paramc);
@ -678,19 +686,16 @@ int load_filter(FILTERCHAIN* fc, CONFIG* cnf)
}
}
int sess_err = 0;
if(cnf && fc && fc->instance){
fc->filter = (FILTER*)fc->instance->createInstance(NULL,fparams);
if(fc->filter == NULL){
printf("Error loading filter:%s: createInstance returned NULL.\n",fc->name);
sess_err = 1;
goto error;
}
for(i = 0;i<instance.session_count;i++){
if((fc->session[i] = fc->instance->newSession(fc->filter, fc->session[i])) &&
if((fc->session[i] = fc->instance->newSession(fc->filter, instance.session)) &&
(fc->down[i] = calloc(1,sizeof(DOWNSTREAM))) &&
(fc->up[i] = calloc(1,sizeof(UPSTREAM)))){
@ -702,7 +707,7 @@ int load_filter(FILTERCHAIN* fc, CONFIG* cnf)
fc->instance->setUpstream(fc->filter, fc->session[i], fc->up[i]);
}else{
skygw_log_write(LOGFILE_MESSAGE,
"Warning: The filter %s does not support client relies.\n",fc->name);
"Warning: The filter %s does not support client replies.\n",fc->name);
}
if(fc->next && fc->next->next){
@ -755,8 +760,8 @@ int load_filter(FILTERCHAIN* fc, CONFIG* cnf)
}
}
error:
int x;
if(fparams){
for(x = 0;x<paramc;x++){
@ -872,14 +877,9 @@ void work_buffer(void* thr_num)
index < instance.session_count &&
instance.buff_ind < instance.buffer_count)
{
if(instance.head->instance->routeQuery(instance.head->filter,
instance.head->instance->routeQuery(instance.head->filter,
instance.head->session[index],
instance.buffer[instance.buff_ind]) == 0){
if(instance.outfile > 0){
const char* msg = "Query returned 0.\n";
write(instance.outfile,msg,strlen(msg));
}
}
instance.buffer[instance.buff_ind]);
if(instance.tail->instance->clientReply){
instance.tail->instance->clientReply(instance.tail->filter,
instance.tail->session[index],
@ -943,11 +943,10 @@ GWBUF* gen_packet(PACKET pkt)
}
int process_opts(int argc, char** argv)
{
int fd, buffsize = 1024;
int rd,rdsz, rval = 0;
int rd,rdsz, rval;
size_t fsize;
char *buff = calloc(buffsize,sizeof(char)), *tok = NULL;
@ -1000,18 +999,10 @@ int process_opts(int argc, char** argv)
close(fd);
return 1;
}
char* conf_name = NULL;
rval = 0;
while((rd = getopt(argc,argv,"e:m:c:i:o:s:t:d:qh")) > 0){
while((rd = getopt(argc,argv,"m:c:i:o:s:t:d:qh")) > 0){
switch(rd){
case 'e':
instance.expected = open_file(optarg,0);
printf("Expected output is read from: %s\n",optarg);
break;
case 'o':
instance.outfile = open_file(optarg,1);
printf("Output is written to: %s\n",optarg);
@ -1076,7 +1067,6 @@ int process_opts(int argc, char** argv)
}
}
printf("\n");
if(conf_name && load_config(conf_name)){
load_query();
}else{
@ -1085,30 +1075,5 @@ int process_opts(int argc, char** argv)
free(conf_name);
close(fd);
return rval;
}
int compare_files(int a,int b)
{
char in[4098];
char exp[4098];
int line = 1;
if(a < 1 || b < 1){
return 1;
}
if(lseek(a,0,SEEK_SET) < 0 ||
lseek(b,0,SEEK_SET) < 0){
return 1;
}
while(fdgets(a,in,4098) && fdgets(b,exp,4098)){
if(strcmp(in,exp)){
printf("The files differ at line %d:\n%s\n-------------------------------------\n%s\n",line,in,exp);
return 1;
}
line++;
}
return 0;
}

View File

@ -57,11 +57,11 @@ typedef enum bref_state {
BREF_CLOSED = 0x08
} bref_state_t;
#define BREF_IS_NOT_USED(s) (s->bref_state & ~BREF_IN_USE)
#define BREF_IS_IN_USE(s) (s->bref_state & BREF_IN_USE)
#define BREF_IS_WAITING_RESULT(s) (s->bref_num_result_wait > 0)
#define BREF_IS_QUERY_ACTIVE(s) (s->bref_state & BREF_QUERY_ACTIVE)
#define BREF_IS_CLOSED(s) (s->bref_state & BREF_CLOSED)
#define BREF_IS_NOT_USED(s) ((s)->bref_state & ~BREF_IN_USE)
#define BREF_IS_IN_USE(s) ((s)->bref_state & BREF_IN_USE)
#define BREF_IS_WAITING_RESULT(s) ((s)->bref_num_result_wait > 0)
#define BREF_IS_QUERY_ACTIVE(s) ((s)->bref_state & BREF_QUERY_ACTIVE)
#define BREF_IS_CLOSED(s) ((s)->bref_state & BREF_CLOSED)
typedef enum backend_type_t {
BE_UNDEFINED=-1,

View File

@ -1867,14 +1867,13 @@ void protocol_add_srv_command(
/** add to the end of list */
p->protocol_command.scom_next = server_command_init(NULL, cmd);
}
#if defined(EXTRA_SS_DEBUG)
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Added command %s to fd %d.",
STRPACKETTYPE(cmd),
p->owner_dcb->fd)));
#if defined(EXTRA_SS_DEBUG)
c = &p->protocol_command;
while (c != NULL && c->scom_cmd != MYSQL_COM_UNDEFINED)
@ -1905,13 +1904,13 @@ void protocol_remove_srv_command(
server_command_t* s;
spinlock_acquire(&p->protocol_lock);
s = &p->protocol_command;
#if defined(EXTRA_SS_DEBUG)
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Removed command %s from fd %d.",
STRPACKETTYPE(s->scom_cmd),
p->owner_dcb->fd)));
#endif
if (s->scom_next == NULL)
{
p->protocol_command.scom_cmd = MYSQL_COM_UNDEFINED;

View File

@ -105,6 +105,12 @@ static route_target_t get_route_target (
target_t use_sql_variables_in,
HINT* hint);
static backend_ref_t* check_candidate_bref(
backend_ref_t* candidate_bref,
backend_ref_t* new_bref,
select_criteria_t sc);
static uint8_t getCapabilities (ROUTER* inst, void* router_session);
#if defined(NOT_USED)
@ -1046,10 +1052,10 @@ static void freeSession(
/**
* Provide the router with a pointer to a suitable backend dcb.
*
* As of Nov. 2014, slave which has least connections is always chosen.
*
* Detect failures in server statuses and reselect backends if necessary.
* If name is specified, server name becomes primary selection criteria.
* If name is specified, server name becomes primary selection criteria.
* Similarly, if max replication lag is specified, skip backends which lag too
* much.
*
* @param p_dcb Address of the pointer to the resulting DCB
* @param rses Pointer to router client session
@ -1067,7 +1073,6 @@ static bool get_dcb(
{
backend_ref_t* backend_ref;
backend_ref_t* master_bref;
int smallest_nconn = -1;
int i;
bool succp = false;
BACKEND* master_host;
@ -1092,6 +1097,7 @@ static bool get_dcb(
goto return_succp;
}
#if defined(SS_DEBUG)
/** master_host is just for additional checking */
master_host = get_root_master(backend_ref, rses->rses_nbackends);
if (master_bref->bref_backend != master_host)
{
@ -1133,71 +1139,107 @@ static bool get_dcb(
{
goto return_succp;
}
else
{
btype = BE_SLAVE;
}
}
if (btype == BE_SLAVE)
{
backend_ref_t* candidate_bref = NULL;
for (i=0; i<rses->rses_nbackends; i++)
{
BACKEND* b = backend_ref[i].bref_backend;
/**
* To become chosen:
* backend must be in use,
* root master node must be found,
* backend is not allowed to be the master,
* backend's role can be either slave or relay
* server and it must have least connections
* at the moment.
*/
if (BREF_IS_IN_USE((&backend_ref[i])) &&
master_bref->bref_backend != NULL &&
b->backend_server != master_bref->bref_backend->backend_server &&
(max_rlag == MAX_RLAG_UNDEFINED ||
(b->backend_server->rlag != MAX_RLAG_NOT_AVAILABLE &&
b->backend_server->rlag <= max_rlag)) &&
(SERVER_IS_SLAVE(b->backend_server) ||
SERVER_IS_RELAY_SERVER(b->backend_server)) &&
(smallest_nconn == -1 ||
b->backend_conn_count < smallest_nconn))
BACKEND* b = (&backend_ref[i])->bref_backend;
/**
* Unused backend or backend which is not master nor
* slave can't be used
*/
if (!BREF_IS_IN_USE(&backend_ref[i]) ||
(!SERVER_IS_MASTER(b->backend_server) &&
!SERVER_IS_SLAVE(b->backend_server)))
{
*p_dcb = backend_ref[i].bref_dcb;
smallest_nconn = b->backend_conn_count;
succp = true;
ss_dassert(backend_ref[i].bref_dcb->state != DCB_STATE_ZOMBIE);
continue;
}
}
if (!succp) /*< No valid slave was found, search master next */
{
if (rses->router->available_slaves)
/**
* If there are no candidates yet accept both master or
* slave. If candidate is master, any slave replaces it.
*/
else if (candidate_bref == NULL ||
(SERVER_IS_MASTER(candidate_bref->bref_backend->backend_server) &&
SERVER_IS_SLAVE(b->backend_server)))
{
rses->router->available_slaves = false;
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Warning : No slaves available "
"for the service %s.",
rses->router->service->name)));
}
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Warning : Using master %s:%d.",
master_bref->bref_backend->backend_server->name,
master_bref->bref_backend->backend_server->port)));
btype = BE_MASTER;
}
/** Found slave, correct the status flag */
else if (rses->router->available_slaves == false)
/**
* Ensure that master has not changed dunring
* session and abort if it has.
*/
if (SERVER_IS_MASTER(b->backend_server))
{
if (candidate_bref != master_bref)
{
/** Log master failure */
succp = false;
break;
}
else
{
/** found master */
candidate_bref = &backend_ref[i];
succp = true;
}
}
/**
* Ensure that max replication lag is not set
* or that candidate's lag doesn't exceed the
* maximum allowed replication lag.
*/
else if (max_rlag == MAX_RLAG_UNDEFINED ||
(b->backend_server->rlag != MAX_RLAG_NOT_AVAILABLE &&
b->backend_server->rlag <= max_rlag))
{
/** found slave */
candidate_bref = &backend_ref[i];
succp = true;
}
}
/**
* When candidate exists, compare it against the current
* backend and update assign it to new candidate if
* necessary.
*/
else if (max_rlag == MAX_RLAG_UNDEFINED ||
(b->backend_server->rlag != MAX_RLAG_NOT_AVAILABLE &&
b->backend_server->rlag <= max_rlag))
{
candidate_bref = check_candidate_bref(
candidate_bref,
&backend_ref[i],
rses->rses_config.rw_slave_select_criteria);
}
else
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Server %s:%d is too much behind the "
"master, %d s. and can't be chosen.",
b->backend_server->name,
b->backend_server->port,
b->backend_server->rlag)));
}
} /*< for */
/** Assign selected DCB's pointer value */
if (candidate_bref != NULL)
{
rses->router->available_slaves = true;
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"At least one slave has become available for "
"the service %s.",
rses->router->service->name)));
goto return_succp;
*p_dcb = candidate_bref->bref_dcb;
}
}
goto return_succp;
} /*< if (btype == BE_SLAVE) */
/**
* If target was originally master only then the execution jumps
* directly here.
*/
if (btype == BE_MASTER)
{
if (BREF_IS_IN_USE(master_bref) &&
@ -1225,6 +1267,43 @@ return_succp:
return succp;
}
/**
* Find out which of the two backend servers has smaller value for select
* criteria property.
*
* @param cand previously selected candidate
* @param new challenger
* @param sc select criteria
*
* @return pointer to backend reference of that backend server which has smaller
* value in selection criteria. If either reference pointer is NULL then the
* other reference pointer value is returned.
*/
static backend_ref_t* check_candidate_bref(
backend_ref_t* cand,
backend_ref_t* new,
select_criteria_t sc)
{
int (*p)(const void *, const void *);
/** get compare function */
p = criteria_cmpfun[sc];
if (new == NULL)
{
return cand;
}
else if (cand == NULL || (p((void *)cand,(void *)new) > 0))
{
return new;
}
else
{
return cand;
}
}
/**
* Examine the query type, transaction state and routing hints. Find out the
* target for query routing.
@ -1264,6 +1343,7 @@ static route_target_t get_route_target (
*/
else if (!trx_active &&
(QUERY_IS_TYPE(qtype, QUERY_TYPE_READ) || /*< any SELECT */
QUERY_IS_TYPE(qtype, QUERY_TYPE_SHOW_TABLES) || /*< 'SHOW TABLES' */
QUERY_IS_TYPE(qtype, QUERY_TYPE_USERVAR_READ)|| /*< read user var */
QUERY_IS_TYPE(qtype, QUERY_TYPE_SYSVAR_READ) || /*< read sys var */
QUERY_IS_TYPE(qtype, QUERY_TYPE_EXEC_STMT) || /*< prepared stmt exec */
@ -1271,6 +1351,7 @@ static route_target_t get_route_target (
{
/** First set expected targets before evaluating hints */
if (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_SHOW_TABLES) || /*< 'SHOW TABLES' */
/** Configured to allow reading variables from slaves */
(use_sql_variables_in == TYPE_ALL &&
(QUERY_IS_TYPE(qtype, QUERY_TYPE_USERVAR_READ) ||
@ -2021,9 +2102,8 @@ static int routeQuery(
rlag_max)));
}
}
}
if (!succp && TARGET_IS_SLAVE(route_target))
}
else if (TARGET_IS_SLAVE(route_target))
{
btype = BE_SLAVE;
@ -2040,8 +2120,8 @@ static int routeQuery(
NULL,
rlag_max);
if (succp)
{
atomic_add(&inst->stats.n_slave, 1);
{
atomic_add(&inst->stats.n_slave, 1);
}
else
{
@ -2051,8 +2131,7 @@ static int routeQuery(
"failed.")));
}
}
if (!succp && TARGET_IS_MASTER(route_target))
else if (TARGET_IS_MASTER(route_target))
{
DCB* curr_master_dcb = NULL;