From a91845b3245aa1e05be1c575a94bcb9aa9a537bb Mon Sep 17 00:00:00 2001 From: Markus Makela Date: Sat, 13 Dec 2014 07:47:46 +0200 Subject: [PATCH] Re-enabled the ignore list, it uses the parameter 'ignore databases='. Added routing of trivial queries and ignored databases to the first available backend. --- server/core/config.c | 53 +++++ server/modules/include/dbshard.h | 8 +- server/modules/routing/dbshard/dbshard.c | 234 +++++++++++++++++------ 3 files changed, 232 insertions(+), 63 deletions(-) diff --git a/server/core/config.c b/server/core/config.c index 92e0cdc2f..840920503 100644 --- a/server/core/config.c +++ b/server/core/config.c @@ -450,6 +450,58 @@ int error_count = 0; param->value))); } } + + if(is_dbshard) + { + CONFIG_PARAMETER* param = NULL; + char* ignore_databases; + bool succp; + ignore_databases = + config_get_value(obj->parameters, + "ignore_databases"); + + if (ignore_databases != NULL) + { + param = config_get_param( + obj->parameters, + "ignore_databases"); + } + + if (param == NULL) + { + succp = false; + } + else + { + param->qfd.valstr = strdup(param->value); + param->qfd_param_type = STRING_TYPE; + succp = service_set_param_value(obj->element, + param, + ignore_databases, + COUNT_NONE, + STRING_TYPE); + } + if (!succp) + { + if(param){ + LOGIF(LM, (skygw_log_write( + LOGFILE_MESSAGE, + "* Warning : invalid value type " + "for parameter \'%s.%s = %s\'\n\tExpected " + "type is [master|all] for " + "use sql variables in.", + ((SERVICE*)obj->element)->name, + param->name, + param->value))); + }else{ + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "Error : parameter was NULL"))); + + } + } + + } /** Parameters for rwsplit router only */ if (is_rwsplit) { @@ -1685,6 +1737,7 @@ static char *service_params[] = "max_slave_connections", "max_slave_replication_lag", "use_sql_variables_in", /*< rwsplit only */ + "ignore_databases", "version_string", "filters", "weightby", diff --git a/server/modules/include/dbshard.h b/server/modules/include/dbshard.h index 4d945ee94..2ade0e66e 100644 --- a/server/modules/include/dbshard.h +++ b/server/modules/include/dbshard.h @@ -80,12 +80,14 @@ typedef enum { TARGET_SLAVE = 0x02, TARGET_NAMED_SERVER = 0x04, TARGET_ALL = 0x08, - TARGET_RLAG_MAX = 0x10 + TARGET_RLAG_MAX = 0x10, + TARGET_ANY = 0x20 } route_target_t; #define TARGET_IS_UNDEFINED(t) (t == TARGET_UNDEFINED) #define TARGET_IS_NAMED_SERVER(t) (t & TARGET_NAMED_SERVER) #define TARGET_IS_ALL(t) (t & TARGET_ALL) +#define TARGET_IS_ANY(t) (t & TARGET_ANY) typedef struct rses_property_st rses_property_t; typedef struct router_client_session ROUTER_CLIENT_SES; @@ -305,7 +307,7 @@ typedef struct router_instance { #define BACKEND_TYPE(b) (SERVER_IS_MASTER((b)->backend_server) ? BE_MASTER : \ (SERVER_IS_SLAVE((b)->backend_server) ? BE_SLAVE : BE_UNDEFINED)); -void* dbnames_hash_init(BACKEND** backends); -bool update_dbnames_hash(BACKEND** backends, HASHTABLE* hashtable); +void* dbnames_hash_init(ROUTER_INSTANCE* inst,BACKEND** backends); +bool update_dbnames_hash(ROUTER_INSTANCE* inst,BACKEND** backends, HASHTABLE* hashtable); #endif /*< _DBSHARDROUTER_H */ diff --git a/server/modules/routing/dbshard/dbshard.c b/server/modules/routing/dbshard/dbshard.c index 0af85c8bd..ee8087235 100644 --- a/server/modules/routing/dbshard/dbshard.c +++ b/server/modules/routing/dbshard/dbshard.c @@ -152,6 +152,9 @@ static bool get_shard_dcb( DCB** dcb, ROUTER_CLIENT_SES* rses, char* name); + +bool is_ignored_database(ROUTER_INSTANCE* inst, char* str); + #if 0 static void rwsplit_process_router_options( ROUTER_INSTANCE* router, @@ -317,12 +320,13 @@ static void* hfree(void* fval) /** * Updates the hashtable with the database names and where to find them, adding * new and removing obsolete pairs. + * @param inst Router instance * @param backends Backends to query for database names * @param hashtable Hashtable to use * @return True if all database and server names were successfully retrieved * otherwise false */ -bool update_dbnames_hash(BACKEND** backends, HASHTABLE* hashtable) +bool update_dbnames_hash(ROUTER_INSTANCE* inst,BACKEND** backends, HASHTABLE* hashtable) { const unsigned int connect_timeout = 15; const unsigned int read_timeout = 10; @@ -434,20 +438,15 @@ bool update_dbnames_hash(BACKEND** backends, HASHTABLE* hashtable) lengths = mysql_fetch_lengths(result); - /** - * Default databases to ignore, disable for now - */ -#ifdef NO_DEFAULT_DATABASES - if(strncmp(row[0],"information_schema",lengths[0]) == 0 || - strncmp(row[0],"performance_schema",lengths[0]) == 0 || - strncmp(row[0],"mysql",lengths[0]) == 0 || - strncmp(row[0],"mysqlslap",lengths[0]) == 0 || - strncmp(row[0],"test",lengths[0]) == 0){ - continue; - } -#endif dbnm = (char*)calloc(lengths[0] + 1,sizeof(char)); memcpy(dbnm,row[0],lengths[0]); + + if(is_ignored_database(inst,dbnm)) + { + free(dbnm); + continue; + } + servnm = strdup(server->unique_name); if(hashtable_add(hashtable,dbnm,servnm) == 0) @@ -462,15 +461,16 @@ bool update_dbnames_hash(BACKEND** backends, HASHTABLE* hashtable) } else { - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Warning : conflicting " - "databases found. " - "Both \"%s\" and \"%s\" " - "have a database \"%s\".", - server->unique_name, - srvname, - dbnm))); + if(strcmp(srvname,server->unique_name) != 0) + { + LOGIF(LT, (skygw_log_write_flush( + LOGFILE_TRACE, + "Both \"%s\" and \"%s\" " + "have a database \"%s\".", + srvname, + server->unique_name, + dbnm))); + } } if(srvname) @@ -529,14 +529,37 @@ cleanup: return rval; } +/** + * Check if the database is in the ignore list of the router instance + * @param inst Router instance + * @param str Null-terminated string with the database name to check + * @return True if the database is in the ignore list and false if it is not in it + */ +bool is_ignored_database(ROUTER_INSTANCE* inst, char* str) +{ + if(inst->ignore_list) + { + int i; + for(i = 0;inst->ignore_list[i];i++) + { + if(strcmp(inst->ignore_list[i],str) == 0) + { + return true; + } + } + } + return false; +} + /** * Allocates a new hashtable and inserts database names and where to find them * into it. + * @param inst Router instance * @param backends Backends to query for database names * @return Pointer to the newly allocated hashtable or NULL if an error occurred */ -void* dbnames_hash_init(BACKEND** backends) +void* dbnames_hash_init(ROUTER_INSTANCE* inst,BACKEND** backends) { HASHTABLE* htbl = hashtable_alloc(512,hashkeyfun,hashcmpfun); @@ -548,13 +571,8 @@ void* dbnames_hash_init(BACKEND** backends) return NULL; } /**Update the new hashtable with the key-value pairs*/ - if(!update_dbnames_hash(backends,htbl)) + if(!update_dbnames_hash(inst,backends,htbl)) { - /* - LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,"Errors encountered while querying databases."))); - hashtable_free(htbl); - return NULL; - */ hashtable_free(htbl); htbl = NULL; } @@ -629,7 +647,7 @@ bool check_shard_status(ROUTER_INSTANCE* router, char* shard) } else { - update_dbnames_hash(router->servers,router->dbnames_hash); + update_dbnames_hash(router,router->servers,router->dbnames_hash); } break; } @@ -637,6 +655,43 @@ bool check_shard_status(ROUTER_INSTANCE* router, char* shard) return rval; } +char** tokenize_string(char* str) +{ + char *tok; + char **list = NULL; + int sz = 2, count = 0; + + tok = strtok(str,", "); + + if(tok == NULL) + return NULL; + + list = (char**)malloc(sizeof(char*)*(sz)); + + while(tok) + { + if(count + 1 >= sz) + { + char** tmp = realloc(list,sizeof(char*)*(sz*2)); + if(tmp == NULL) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : realloc returned NULL: %s.",strerror(errno)))); + free(list); + return NULL; + } + list = tmp; + sz *= 2; + } + list[count] = strdup(tok); + count++; + tok = strtok(NULL,", "); + } + list[count] = NULL; + return list; +} + /** * Implementation of the mandatory version entry point * @@ -700,7 +755,8 @@ static void refreshInstance( /** Catch unused parameter types */ ss_dassert(paramtype == COUNT_TYPE || paramtype == PERCENT_TYPE || - paramtype == SQLVAR_TARGET_TYPE); + paramtype == SQLVAR_TARGET_TYPE || + paramtype == STRING_TYPE); if (paramtype == COUNT_TYPE) { @@ -708,6 +764,16 @@ static void refreshInstance( else if (paramtype == PERCENT_TYPE) { } + else if (paramtype == STRING_TYPE) + { + if (strncmp(param->name, + "ignore_databases", + MAX_PARAM_LEN) == 0) + { + router->ignore_list = tokenize_string(param->qfd.valstr); + } + } + else if (paramtype == SQLVAR_TARGET_TYPE) { if (strncmp(param->name, @@ -792,8 +858,7 @@ createInstance(SERVICE *service, char **options) { ROUTER_INSTANCE* router; SERVER* server; - //CONFIG_PARAMETER* conf; - //char* confval = NULL; + CONFIG_PARAMETER* conf; int nservers; int i; @@ -862,21 +927,12 @@ createInstance(SERVICE *service, char **options) router->bitvalue = 0; - /* Ignored for now conf = config_get_param(service->svc_config_param,"ignore_databases"); + if(conf) { - confval = conf->value; - } - - if(!parse_db_ignore_list(router,confval)) - { - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error : Parsing of database ignore list failed. "))); - goto clean_up; - } - */ + refreshInstance(router, conf); + } /** * Read config version number from service to inform what configuration @@ -888,7 +944,7 @@ createInstance(SERVICE *service, char **options) /** * Get hashtable which includes dbname,backend pairs */ - router->dbnames_hash = (HASHTABLE*)dbnames_hash_init(router->servers); + router->dbnames_hash = (HASHTABLE*)dbnames_hash_init(router,router->servers); if (router->dbnames_hash == NULL) { @@ -1383,16 +1439,17 @@ static route_target_t get_shard_route_target ( QUERY_IS_TYPE(qtype, QUERY_TYPE_GSYSVAR_WRITE)) || /** enable or disable autocommit are always routed to all */ QUERY_IS_TYPE(qtype, QUERY_TYPE_ENABLE_AUTOCOMMIT) || - QUERY_IS_TYPE(qtype, QUERY_TYPE_DISABLE_AUTOCOMMIT) || - QUERY_IS_TYPE(qtype, QUERY_TYPE_SYSVAR_READ)) /** added for @@version comment, temporary*/ + QUERY_IS_TYPE(qtype, QUERY_TYPE_DISABLE_AUTOCOMMIT)) { /** hints don't affect on routing */ target = TARGET_ALL; } - /* else */ - /* { */ - /* target = TARGET_NAMED_SERVER; */ - /* } */ + else if(QUERY_IS_TYPE(qtype, QUERY_TYPE_SYSVAR_READ) || + (use_sql_variables_in == TYPE_ALL && + QUERY_IS_TYPE(qtype, QUERY_TYPE_GSYSVAR_READ))) + { + target = TARGET_ANY; + } #if defined(SS_DEBUG) LOGIF(LT, (skygw_log_write( LOGFILE_TRACE, @@ -1687,6 +1744,7 @@ static int routeQuery( ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance; ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session; bool rses_is_closed = false; + bool change_successful = false; route_target_t route_target = TARGET_UNDEFINED; bool succp = false; char* tname = NULL; @@ -1779,7 +1837,7 @@ static int routeQuery( if (packet_type == MYSQL_COM_INIT_DB) { - if (!change_current_db(inst, router_cli_ses, querybuf)) + if (!(change_successful = change_current_db(inst, router_cli_ses, querybuf))) { LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, @@ -1916,6 +1974,8 @@ static int routeQuery( * the target is undefined and an error will be returned to the client. */ + update_dbnames_hash(inst,inst->servers,inst->dbnames_hash); + if((tname = get_shard_target_name(inst,router_cli_ses,querybuf)) != NULL && check_shard_status(inst,tname)) { @@ -1926,6 +1986,11 @@ static int routeQuery( else { + + /** + * The query targets something else than a shard. + */ + route_target = get_shard_route_target(qtype, router_cli_ses->rses_transaction_active, router_cli_ses->rses_config.rw_use_sql_variables_in, @@ -1941,16 +2006,21 @@ static int routeQuery( char errstr[2048]; GWBUF *errbuff; - update_dbnames_hash(inst->servers,inst->dbnames_hash); + update_dbnames_hash(inst,inst->servers,inst->dbnames_hash); tname = get_shard_target_name(inst,router_cli_ses,querybuf); - if(tname == NULL && - router_cli_ses->rses_mysql_session->db[0] == '\0') + if((tname == NULL && + router_cli_ses->rses_mysql_session->db[0] == '\0') || + (packet_type == MYSQL_COM_INIT_DB && change_successful) || + packet_type == MYSQL_COM_FIELD_LIST || + (router_cli_ses->rses_mysql_session->db[0] != '\0' && + is_ignored_database(inst,router_cli_ses->rses_mysql_session->db))) { /** - * No current database or databases in query, route to all. + * No current database and no databases in query or + * the database is ignored, route to first available backend. */ - route_target = TARGET_ALL; + route_target = TARGET_ANY; } else @@ -2001,6 +2071,32 @@ static int routeQuery( ret = 0; goto retblock; } + + if (TARGET_IS_ANY(route_target)) + { + int z; + + for(z = 0;inst->servers[z];z++) + { + if(SERVER_IS_RUNNING(inst->servers[z]->backend_server)) + { + tname = inst->servers[z]->backend_server->unique_name; + route_target = TARGET_NAMED_SERVER; + break; + } + } + + if(TARGET_IS_ANY(route_target)) + { + + /**No valid backends alive*/ + rses_end_locked_router_action(router_cli_ses); + ret = 0; + goto retblock; + } + + } + /** * Query is routed to one of the backends */ @@ -4094,6 +4190,8 @@ static bool prep_stmt_drop( } #endif /*< PREP_STMT_CACHING */ +#if 0 + /******************************** * This routine returns the root master server from MySQL replication tree * Get the root Master rule: @@ -4138,7 +4236,10 @@ static BACKEND *get_root_master( return master_host; } +#endif + +#if 0 /******************************** * This routine returns the root master server from MySQL replication tree * Get the root Master rule: @@ -4189,6 +4290,7 @@ static backend_ref_t* get_root_master_bref( return candidate_bref; } +#endif static void dbshard_process_router_options( ROUTER_INSTANCE* router, @@ -4261,7 +4363,7 @@ static bool change_current_db( { bool succp; uint8_t* packet; - int message_len; + int message_len,i; char* fail_str; if(GWBUF_LENGTH(buf) <= MYSQL_DATABASE_MAXLEN - 5) @@ -4276,12 +4378,24 @@ static bool change_current_db( * If it isn't found, send a custom error packet to the client. */ - update_dbnames_hash(inst->servers,inst->dbnames_hash); + update_dbnames_hash(inst,inst->servers,inst->dbnames_hash); if(hashtable_fetch( inst->dbnames_hash, (char*)rses->rses_mysql_session->db) == NULL) { + if(inst->ignore_list) + { + for(i = 0;inst->ignore_list[i];i++) + { + if(strcmp(inst->ignore_list[i],rses->rses_mysql_session->db) == 0) + { + succp = true; + goto retblock; + } + } + } + /** Create error message */ message_len = 25 + MYSQL_DATABASE_MAXLEN; fail_str = calloc(1, message_len+1);