diff --git a/query_classifier/query_classifier.cc b/query_classifier/query_classifier.cc index 28291634a..7bcb4d3ff 100644 --- a/query_classifier/query_classifier.cc +++ b/query_classifier/query_classifier.cc @@ -30,12 +30,7 @@ # undef MYSQL_CLIENT #endif -#include -#include "../utils/skygw_types.h" -#include "../utils/skygw_debug.h" -#include -#include - +#include #include #include #include @@ -55,6 +50,12 @@ #include #include +#include "../utils/skygw_types.h" +#include "../utils/skygw_debug.h" +#include +#include +#include + #include #include #include diff --git a/server/modules/filter/test/harness.h b/server/modules/filter/test/harness.h index a306a6811..ac3982d19 100644 --- a/server/modules/filter/test/harness.h +++ b/server/modules/filter/test/harness.h @@ -69,7 +69,7 @@ #include #include #include -#include +#include /** * A single name-value pair and a link to the next item in the * configuration. @@ -118,7 +118,6 @@ typedef struct int running; int verbose; /**Whether to print to stdout*/ int infile; /**A file where the queries are loaded from*/ - int expected; int error; char* mod_dir; /**Module directory absolute path*/ char* infile_name; @@ -127,6 +126,7 @@ typedef struct FILTERCHAIN* head; /**The head of the filter chain*/ FILTERCHAIN* tail; /**The tail of the filter chain*/ GWBUF** buffer; /**Buffers that are fed to the filter chain*/ + SESSION* session; int buffer_count; int session_count; DOWNSTREAM dummyrouter; /**Dummy downstream router for data extraction*/ @@ -174,7 +174,7 @@ typedef packet_t PACKET; /** * Initialize the static instance. */ -int harness_init(int argc,char** argv,HARNESS_INSTANCE** inst); +int harness_init(int argc,char** argv); /** * Frees all the query buffers @@ -361,14 +361,4 @@ GWBUF* gen_packet(PACKET pkt); */ int process_opts(int argc, char** argv); -/** - * Compares the contents of two files. - * This function resets the offsets of the file descriptors and leaves them in an - * undefined state. - * @param a The first file - * @param b The second file - * @return 0 if the files do not differ and 1 if they do or an error occurred. - */ -int compare_files(int a, int b); - #endif diff --git a/server/modules/filter/test/harness_common.c b/server/modules/filter/test/harness_common.c index c775333aa..0a853bae0 100644 --- a/server/modules/filter/test/harness_common.c +++ b/server/modules/filter/test/harness_common.c @@ -1,6 +1,12 @@ #include -int harness_init(int argc, char** argv, HARNESS_INSTANCE** inst){ +int dcbfun(struct dcb* dcb, GWBUF * buffer) +{ + printf("Data was written to client DCB.\n"); + return 1; +} + +int harness_init(int argc, char** argv){ int i = 0; if(!(argc == 2 && strcmp(argv[1],"-h") == 0)){ skygw_logmanager_init(0,NULL); @@ -14,14 +20,20 @@ int harness_init(int argc, char** argv, HARNESS_INSTANCE** inst){ return 1; } - *inst = &instance; instance.running = 1; instance.infile = -1; instance.outfile = -1; - instance.expected = -1; instance.buff_ind = -1; instance.last_ind = -1; instance.sess_ind = -1; + instance.session = calloc(1,sizeof(SESSION)); + MYSQL_session* mysqlsess = calloc(1,sizeof(MYSQL_session)); + DCB* dcb = calloc(1,sizeof(DCB)); + + sprintf(mysqlsess->user,"dummyuser"); + sprintf(mysqlsess->db,"dummydb"); + dcb->func.write = dcbfun; + instance.session->client = (void*)dcb; process_opts(argc,argv); @@ -86,17 +98,15 @@ void free_buffers() } int open_file(char* str, unsigned int write) { - int mode,fd; + int mode; if(write){ mode = O_RDWR|O_CREAT; }else{ mode = O_RDONLY; } - if((fd = open(str,mode,S_IRWXU|S_IRGRP|S_IXGRP|S_IXOTH)) < 0){ - printf("Error %d: %s\n",errno,strerror(errno)); - } - return fd; + + return open(str,mode,S_IRWXU|S_IRGRP|S_IXGRP|S_IXOTH); } @@ -612,8 +622,6 @@ int load_filter(FILTERCHAIN* fc, CONFIG* cnf) { FILTER_PARAMETER** fparams = NULL; int i, paramc = -1; - int sess_err = 0; - int x; if(cnf == NULL){ fparams = read_params(¶mc); @@ -678,19 +686,16 @@ int load_filter(FILTERCHAIN* fc, CONFIG* cnf) } } + int sess_err = 0; if(cnf && fc && fc->instance){ fc->filter = (FILTER*)fc->instance->createInstance(NULL,fparams); - if(fc->filter == NULL){ - printf("Error loading filter:%s: createInstance returned NULL.\n",fc->name); - sess_err = 1; - goto error; - } + for(i = 0;isession[i] = fc->instance->newSession(fc->filter, fc->session[i])) && + if((fc->session[i] = fc->instance->newSession(fc->filter, instance.session)) && (fc->down[i] = calloc(1,sizeof(DOWNSTREAM))) && (fc->up[i] = calloc(1,sizeof(UPSTREAM)))){ @@ -702,7 +707,7 @@ int load_filter(FILTERCHAIN* fc, CONFIG* cnf) fc->instance->setUpstream(fc->filter, fc->session[i], fc->up[i]); }else{ skygw_log_write(LOGFILE_MESSAGE, - "Warning: The filter %s does not support client relies.\n",fc->name); + "Warning: The filter %s does not support client replies.\n",fc->name); } if(fc->next && fc->next->next){ @@ -755,8 +760,8 @@ int load_filter(FILTERCHAIN* fc, CONFIG* cnf) } } - error: - + + int x; if(fparams){ for(x = 0;xinstance->routeQuery(instance.head->filter, + instance.head->instance->routeQuery(instance.head->filter, instance.head->session[index], - instance.buffer[instance.buff_ind]) == 0){ - if(instance.outfile > 0){ - const char* msg = "Query returned 0.\n"; - write(instance.outfile,msg,strlen(msg)); - } - } + instance.buffer[instance.buff_ind]); if(instance.tail->instance->clientReply){ instance.tail->instance->clientReply(instance.tail->filter, instance.tail->session[index], @@ -943,11 +943,10 @@ GWBUF* gen_packet(PACKET pkt) } - int process_opts(int argc, char** argv) { int fd, buffsize = 1024; - int rd,rdsz, rval = 0; + int rd,rdsz, rval; size_t fsize; char *buff = calloc(buffsize,sizeof(char)), *tok = NULL; @@ -1000,18 +999,10 @@ int process_opts(int argc, char** argv) close(fd); return 1; } - char* conf_name = NULL; - rval = 0; - - while((rd = getopt(argc,argv,"e:m:c:i:o:s:t:d:qh")) > 0){ + while((rd = getopt(argc,argv,"m:c:i:o:s:t:d:qh")) > 0){ switch(rd){ - case 'e': - instance.expected = open_file(optarg,0); - printf("Expected output is read from: %s\n",optarg); - break; - case 'o': instance.outfile = open_file(optarg,1); printf("Output is written to: %s\n",optarg); @@ -1076,7 +1067,6 @@ int process_opts(int argc, char** argv) } } printf("\n"); - if(conf_name && load_config(conf_name)){ load_query(); }else{ @@ -1085,30 +1075,5 @@ int process_opts(int argc, char** argv) free(conf_name); close(fd); - return rval; -} - -int compare_files(int a,int b) -{ - char in[4098]; - char exp[4098]; - int line = 1; - - if(a < 1 || b < 1){ - return 1; - } - - if(lseek(a,0,SEEK_SET) < 0 || - lseek(b,0,SEEK_SET) < 0){ - return 1; - } - - while(fdgets(a,in,4098) && fdgets(b,exp,4098)){ - if(strcmp(in,exp)){ - printf("The files differ at line %d:\n%s\n-------------------------------------\n%s\n",line,in,exp); - return 1; - } - line++; - } return 0; } diff --git a/server/modules/include/readwritesplit.h b/server/modules/include/readwritesplit.h index 62f209dc0..05bcfe2b1 100644 --- a/server/modules/include/readwritesplit.h +++ b/server/modules/include/readwritesplit.h @@ -57,11 +57,11 @@ typedef enum bref_state { BREF_CLOSED = 0x08 } bref_state_t; -#define BREF_IS_NOT_USED(s) (s->bref_state & ~BREF_IN_USE) -#define BREF_IS_IN_USE(s) (s->bref_state & BREF_IN_USE) -#define BREF_IS_WAITING_RESULT(s) (s->bref_num_result_wait > 0) -#define BREF_IS_QUERY_ACTIVE(s) (s->bref_state & BREF_QUERY_ACTIVE) -#define BREF_IS_CLOSED(s) (s->bref_state & BREF_CLOSED) +#define BREF_IS_NOT_USED(s) ((s)->bref_state & ~BREF_IN_USE) +#define BREF_IS_IN_USE(s) ((s)->bref_state & BREF_IN_USE) +#define BREF_IS_WAITING_RESULT(s) ((s)->bref_num_result_wait > 0) +#define BREF_IS_QUERY_ACTIVE(s) ((s)->bref_state & BREF_QUERY_ACTIVE) +#define BREF_IS_CLOSED(s) ((s)->bref_state & BREF_CLOSED) typedef enum backend_type_t { BE_UNDEFINED=-1, diff --git a/server/modules/protocol/mysql_common.c b/server/modules/protocol/mysql_common.c index ce6f1c033..8b12dbf86 100644 --- a/server/modules/protocol/mysql_common.c +++ b/server/modules/protocol/mysql_common.c @@ -1867,14 +1867,13 @@ void protocol_add_srv_command( /** add to the end of list */ p->protocol_command.scom_next = server_command_init(NULL, cmd); } - +#if defined(EXTRA_SS_DEBUG) LOGIF(LT, (skygw_log_write( LOGFILE_TRACE, "Added command %s to fd %d.", STRPACKETTYPE(cmd), p->owner_dcb->fd))); -#if defined(EXTRA_SS_DEBUG) c = &p->protocol_command; while (c != NULL && c->scom_cmd != MYSQL_COM_UNDEFINED) @@ -1905,13 +1904,13 @@ void protocol_remove_srv_command( server_command_t* s; spinlock_acquire(&p->protocol_lock); s = &p->protocol_command; - +#if defined(EXTRA_SS_DEBUG) LOGIF(LT, (skygw_log_write( LOGFILE_TRACE, "Removed command %s from fd %d.", STRPACKETTYPE(s->scom_cmd), p->owner_dcb->fd))); - +#endif if (s->scom_next == NULL) { p->protocol_command.scom_cmd = MYSQL_COM_UNDEFINED; diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index 22d57683e..9352fe650 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -105,6 +105,12 @@ static route_target_t get_route_target ( target_t use_sql_variables_in, HINT* hint); +static backend_ref_t* check_candidate_bref( + backend_ref_t* candidate_bref, + backend_ref_t* new_bref, + select_criteria_t sc); + + static uint8_t getCapabilities (ROUTER* inst, void* router_session); #if defined(NOT_USED) @@ -1046,10 +1052,10 @@ static void freeSession( /** * Provide the router with a pointer to a suitable backend dcb. * - * As of Nov. 2014, slave which has least connections is always chosen. - * * Detect failures in server statuses and reselect backends if necessary. - * If name is specified, server name becomes primary selection criteria. + * If name is specified, server name becomes primary selection criteria. + * Similarly, if max replication lag is specified, skip backends which lag too + * much. * * @param p_dcb Address of the pointer to the resulting DCB * @param rses Pointer to router client session @@ -1067,7 +1073,6 @@ static bool get_dcb( { backend_ref_t* backend_ref; backend_ref_t* master_bref; - int smallest_nconn = -1; int i; bool succp = false; BACKEND* master_host; @@ -1092,6 +1097,7 @@ static bool get_dcb( goto return_succp; } #if defined(SS_DEBUG) + /** master_host is just for additional checking */ master_host = get_root_master(backend_ref, rses->rses_nbackends); if (master_bref->bref_backend != master_host) { @@ -1133,71 +1139,107 @@ static bool get_dcb( { goto return_succp; } + else + { + btype = BE_SLAVE; + } } if (btype == BE_SLAVE) { + backend_ref_t* candidate_bref = NULL; + for (i=0; irses_nbackends; i++) { - BACKEND* b = backend_ref[i].bref_backend; - /** - * To become chosen: - * backend must be in use, - * root master node must be found, - * backend is not allowed to be the master, - * backend's role can be either slave or relay - * server and it must have least connections - * at the moment. - */ - if (BREF_IS_IN_USE((&backend_ref[i])) && - master_bref->bref_backend != NULL && - b->backend_server != master_bref->bref_backend->backend_server && - (max_rlag == MAX_RLAG_UNDEFINED || - (b->backend_server->rlag != MAX_RLAG_NOT_AVAILABLE && - b->backend_server->rlag <= max_rlag)) && - (SERVER_IS_SLAVE(b->backend_server) || - SERVER_IS_RELAY_SERVER(b->backend_server)) && - (smallest_nconn == -1 || - b->backend_conn_count < smallest_nconn)) + BACKEND* b = (&backend_ref[i])->bref_backend; + /** + * Unused backend or backend which is not master nor + * slave can't be used + */ + if (!BREF_IS_IN_USE(&backend_ref[i]) || + (!SERVER_IS_MASTER(b->backend_server) && + !SERVER_IS_SLAVE(b->backend_server))) { - *p_dcb = backend_ref[i].bref_dcb; - smallest_nconn = b->backend_conn_count; - succp = true; - ss_dassert(backend_ref[i].bref_dcb->state != DCB_STATE_ZOMBIE); + continue; } - } - - if (!succp) /*< No valid slave was found, search master next */ - { - if (rses->router->available_slaves) + /** + * If there are no candidates yet accept both master or + * slave. If candidate is master, any slave replaces it. + */ + else if (candidate_bref == NULL || + (SERVER_IS_MASTER(candidate_bref->bref_backend->backend_server) && + SERVER_IS_SLAVE(b->backend_server))) { - rses->router->available_slaves = false; - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Warning : No slaves available " - "for the service %s.", - rses->router->service->name))); - } - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Warning : Using master %s:%d.", - master_bref->bref_backend->backend_server->name, - master_bref->bref_backend->backend_server->port))); - btype = BE_MASTER; - } - /** Found slave, correct the status flag */ - else if (rses->router->available_slaves == false) + /** + * Ensure that master has not changed dunring + * session and abort if it has. + */ + if (SERVER_IS_MASTER(b->backend_server)) + { + if (candidate_bref != master_bref) + { + /** Log master failure */ + succp = false; + break; + } + else + { + /** found master */ + candidate_bref = &backend_ref[i]; + succp = true; + } + } + /** + * Ensure that max replication lag is not set + * or that candidate's lag doesn't exceed the + * maximum allowed replication lag. + */ + else if (max_rlag == MAX_RLAG_UNDEFINED || + (b->backend_server->rlag != MAX_RLAG_NOT_AVAILABLE && + b->backend_server->rlag <= max_rlag)) + { + /** found slave */ + candidate_bref = &backend_ref[i]; + succp = true; + } + } + /** + * When candidate exists, compare it against the current + * backend and update assign it to new candidate if + * necessary. + */ + else if (max_rlag == MAX_RLAG_UNDEFINED || + (b->backend_server->rlag != MAX_RLAG_NOT_AVAILABLE && + b->backend_server->rlag <= max_rlag)) + { + candidate_bref = check_candidate_bref( + candidate_bref, + &backend_ref[i], + rses->rses_config.rw_slave_select_criteria); + } + else + { + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Server %s:%d is too much behind the " + "master, %d s. and can't be chosen.", + b->backend_server->name, + b->backend_server->port, + b->backend_server->rlag))); + } + } /*< for */ + /** Assign selected DCB's pointer value */ + if (candidate_bref != NULL) { - rses->router->available_slaves = true; - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "At least one slave has become available for " - "the service %s.", - rses->router->service->name))); - goto return_succp; + *p_dcb = candidate_bref->bref_dcb; } - } - + + goto return_succp; + } /*< if (btype == BE_SLAVE) */ + /** + * If target was originally master only then the execution jumps + * directly here. + */ if (btype == BE_MASTER) { if (BREF_IS_IN_USE(master_bref) && @@ -1225,6 +1267,43 @@ return_succp: return succp; } + +/** + * Find out which of the two backend servers has smaller value for select + * criteria property. + * + * @param cand previously selected candidate + * @param new challenger + * @param sc select criteria + * + * @return pointer to backend reference of that backend server which has smaller + * value in selection criteria. If either reference pointer is NULL then the + * other reference pointer value is returned. + */ +static backend_ref_t* check_candidate_bref( + backend_ref_t* cand, + backend_ref_t* new, + select_criteria_t sc) +{ + int (*p)(const void *, const void *); + /** get compare function */ + p = criteria_cmpfun[sc]; + + if (new == NULL) + { + return cand; + } + else if (cand == NULL || (p((void *)cand,(void *)new) > 0)) + { + return new; + } + else + { + return cand; + } +} + + /** * Examine the query type, transaction state and routing hints. Find out the * target for query routing. @@ -1264,6 +1343,7 @@ static route_target_t get_route_target ( */ else if (!trx_active && (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ) || /*< any SELECT */ + QUERY_IS_TYPE(qtype, QUERY_TYPE_SHOW_TABLES) || /*< 'SHOW TABLES' */ QUERY_IS_TYPE(qtype, QUERY_TYPE_USERVAR_READ)|| /*< read user var */ QUERY_IS_TYPE(qtype, QUERY_TYPE_SYSVAR_READ) || /*< read sys var */ QUERY_IS_TYPE(qtype, QUERY_TYPE_EXEC_STMT) || /*< prepared stmt exec */ @@ -1271,6 +1351,7 @@ static route_target_t get_route_target ( { /** First set expected targets before evaluating hints */ if (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ) || + QUERY_IS_TYPE(qtype, QUERY_TYPE_SHOW_TABLES) || /*< 'SHOW TABLES' */ /** Configured to allow reading variables from slaves */ (use_sql_variables_in == TYPE_ALL && (QUERY_IS_TYPE(qtype, QUERY_TYPE_USERVAR_READ) || @@ -2021,9 +2102,8 @@ static int routeQuery( rlag_max))); } } - } - - if (!succp && TARGET_IS_SLAVE(route_target)) + } + else if (TARGET_IS_SLAVE(route_target)) { btype = BE_SLAVE; @@ -2040,8 +2120,8 @@ static int routeQuery( NULL, rlag_max); if (succp) - { - atomic_add(&inst->stats.n_slave, 1); + { + atomic_add(&inst->stats.n_slave, 1); } else { @@ -2051,8 +2131,7 @@ static int routeQuery( "failed."))); } } - - if (!succp && TARGET_IS_MASTER(route_target)) + else if (TARGET_IS_MASTER(route_target)) { DCB* curr_master_dcb = NULL;