Re-enabled the ignore list, it uses the parameter 'ignore databases=<list of db names>'.

Added routing of trivial queries and ignored databases to the first available backend.
This commit is contained in:
Markus Makela
2014-12-13 07:47:46 +02:00
parent d1f5eaaaec
commit a91845b324
3 changed files with 232 additions and 63 deletions

View File

@ -450,6 +450,58 @@ int error_count = 0;
param->value)));
}
}
if(is_dbshard)
{
CONFIG_PARAMETER* param = NULL;
char* ignore_databases;
bool succp;
ignore_databases =
config_get_value(obj->parameters,
"ignore_databases");
if (ignore_databases != NULL)
{
param = config_get_param(
obj->parameters,
"ignore_databases");
}
if (param == NULL)
{
succp = false;
}
else
{
param->qfd.valstr = strdup(param->value);
param->qfd_param_type = STRING_TYPE;
succp = service_set_param_value(obj->element,
param,
ignore_databases,
COUNT_NONE,
STRING_TYPE);
}
if (!succp)
{
if(param){
LOGIF(LM, (skygw_log_write(
LOGFILE_MESSAGE,
"* Warning : invalid value type "
"for parameter \'%s.%s = %s\'\n\tExpected "
"type is [master|all] for "
"use sql variables in.",
((SERVICE*)obj->element)->name,
param->name,
param->value)));
}else{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"Error : parameter was NULL")));
}
}
}
/** Parameters for rwsplit router only */
if (is_rwsplit)
{
@ -1685,6 +1737,7 @@ static char *service_params[] =
"max_slave_connections",
"max_slave_replication_lag",
"use_sql_variables_in", /*< rwsplit only */
"ignore_databases",
"version_string",
"filters",
"weightby",

View File

@ -80,12 +80,14 @@ typedef enum {
TARGET_SLAVE = 0x02,
TARGET_NAMED_SERVER = 0x04,
TARGET_ALL = 0x08,
TARGET_RLAG_MAX = 0x10
TARGET_RLAG_MAX = 0x10,
TARGET_ANY = 0x20
} route_target_t;
#define TARGET_IS_UNDEFINED(t) (t == TARGET_UNDEFINED)
#define TARGET_IS_NAMED_SERVER(t) (t & TARGET_NAMED_SERVER)
#define TARGET_IS_ALL(t) (t & TARGET_ALL)
#define TARGET_IS_ANY(t) (t & TARGET_ANY)
typedef struct rses_property_st rses_property_t;
typedef struct router_client_session ROUTER_CLIENT_SES;
@ -305,7 +307,7 @@ typedef struct router_instance {
#define BACKEND_TYPE(b) (SERVER_IS_MASTER((b)->backend_server) ? BE_MASTER : \
(SERVER_IS_SLAVE((b)->backend_server) ? BE_SLAVE : BE_UNDEFINED));
void* dbnames_hash_init(BACKEND** backends);
bool update_dbnames_hash(BACKEND** backends, HASHTABLE* hashtable);
void* dbnames_hash_init(ROUTER_INSTANCE* inst,BACKEND** backends);
bool update_dbnames_hash(ROUTER_INSTANCE* inst,BACKEND** backends, HASHTABLE* hashtable);
#endif /*< _DBSHARDROUTER_H */

View File

@ -152,6 +152,9 @@ static bool get_shard_dcb(
DCB** dcb,
ROUTER_CLIENT_SES* rses,
char* name);
bool is_ignored_database(ROUTER_INSTANCE* inst, char* str);
#if 0
static void rwsplit_process_router_options(
ROUTER_INSTANCE* router,
@ -317,12 +320,13 @@ static void* hfree(void* fval)
/**
* Updates the hashtable with the database names and where to find them, adding
* new and removing obsolete pairs.
* @param inst Router instance
* @param backends Backends to query for database names
* @param hashtable Hashtable to use
* @return True if all database and server names were successfully retrieved
* otherwise false
*/
bool update_dbnames_hash(BACKEND** backends, HASHTABLE* hashtable)
bool update_dbnames_hash(ROUTER_INSTANCE* inst,BACKEND** backends, HASHTABLE* hashtable)
{
const unsigned int connect_timeout = 15;
const unsigned int read_timeout = 10;
@ -434,20 +438,15 @@ bool update_dbnames_hash(BACKEND** backends, HASHTABLE* hashtable)
lengths = mysql_fetch_lengths(result);
/**
* Default databases to ignore, disable for now
*/
#ifdef NO_DEFAULT_DATABASES
if(strncmp(row[0],"information_schema",lengths[0]) == 0 ||
strncmp(row[0],"performance_schema",lengths[0]) == 0 ||
strncmp(row[0],"mysql",lengths[0]) == 0 ||
strncmp(row[0],"mysqlslap",lengths[0]) == 0 ||
strncmp(row[0],"test",lengths[0]) == 0){
continue;
}
#endif
dbnm = (char*)calloc(lengths[0] + 1,sizeof(char));
memcpy(dbnm,row[0],lengths[0]);
if(is_ignored_database(inst,dbnm))
{
free(dbnm);
continue;
}
servnm = strdup(server->unique_name);
if(hashtable_add(hashtable,dbnm,servnm) == 0)
@ -462,15 +461,16 @@ bool update_dbnames_hash(BACKEND** backends, HASHTABLE* hashtable)
}
else
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Warning : conflicting "
"databases found. "
"Both \"%s\" and \"%s\" "
"have a database \"%s\".",
server->unique_name,
srvname,
dbnm)));
if(strcmp(srvname,server->unique_name) != 0)
{
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"Both \"%s\" and \"%s\" "
"have a database \"%s\".",
srvname,
server->unique_name,
dbnm)));
}
}
if(srvname)
@ -529,14 +529,37 @@ cleanup:
return rval;
}
/**
* Check if the database is in the ignore list of the router instance
* @param inst Router instance
* @param str Null-terminated string with the database name to check
* @return True if the database is in the ignore list and false if it is not in it
*/
bool is_ignored_database(ROUTER_INSTANCE* inst, char* str)
{
if(inst->ignore_list)
{
int i;
for(i = 0;inst->ignore_list[i];i++)
{
if(strcmp(inst->ignore_list[i],str) == 0)
{
return true;
}
}
}
return false;
}
/**
* Allocates a new hashtable and inserts database names and where to find them
* into it.
* @param inst Router instance
* @param backends Backends to query for database names
* @return Pointer to the newly allocated hashtable or NULL if an error occurred
*/
void* dbnames_hash_init(BACKEND** backends)
void* dbnames_hash_init(ROUTER_INSTANCE* inst,BACKEND** backends)
{
HASHTABLE* htbl = hashtable_alloc(512,hashkeyfun,hashcmpfun);
@ -548,13 +571,8 @@ void* dbnames_hash_init(BACKEND** backends)
return NULL;
}
/**Update the new hashtable with the key-value pairs*/
if(!update_dbnames_hash(backends,htbl))
if(!update_dbnames_hash(inst,backends,htbl))
{
/*
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,"Errors encountered while querying databases.")));
hashtable_free(htbl);
return NULL;
*/
hashtable_free(htbl);
htbl = NULL;
}
@ -629,7 +647,7 @@ bool check_shard_status(ROUTER_INSTANCE* router, char* shard)
}
else
{
update_dbnames_hash(router->servers,router->dbnames_hash);
update_dbnames_hash(router,router->servers,router->dbnames_hash);
}
break;
}
@ -637,6 +655,43 @@ bool check_shard_status(ROUTER_INSTANCE* router, char* shard)
return rval;
}
char** tokenize_string(char* str)
{
char *tok;
char **list = NULL;
int sz = 2, count = 0;
tok = strtok(str,", ");
if(tok == NULL)
return NULL;
list = (char**)malloc(sizeof(char*)*(sz));
while(tok)
{
if(count + 1 >= sz)
{
char** tmp = realloc(list,sizeof(char*)*(sz*2));
if(tmp == NULL)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : realloc returned NULL: %s.",strerror(errno))));
free(list);
return NULL;
}
list = tmp;
sz *= 2;
}
list[count] = strdup(tok);
count++;
tok = strtok(NULL,", ");
}
list[count] = NULL;
return list;
}
/**
* Implementation of the mandatory version entry point
*
@ -700,7 +755,8 @@ static void refreshInstance(
/** Catch unused parameter types */
ss_dassert(paramtype == COUNT_TYPE ||
paramtype == PERCENT_TYPE ||
paramtype == SQLVAR_TARGET_TYPE);
paramtype == SQLVAR_TARGET_TYPE ||
paramtype == STRING_TYPE);
if (paramtype == COUNT_TYPE)
{
@ -708,6 +764,16 @@ static void refreshInstance(
else if (paramtype == PERCENT_TYPE)
{
}
else if (paramtype == STRING_TYPE)
{
if (strncmp(param->name,
"ignore_databases",
MAX_PARAM_LEN) == 0)
{
router->ignore_list = tokenize_string(param->qfd.valstr);
}
}
else if (paramtype == SQLVAR_TARGET_TYPE)
{
if (strncmp(param->name,
@ -792,8 +858,7 @@ createInstance(SERVICE *service, char **options)
{
ROUTER_INSTANCE* router;
SERVER* server;
//CONFIG_PARAMETER* conf;
//char* confval = NULL;
CONFIG_PARAMETER* conf;
int nservers;
int i;
@ -862,21 +927,12 @@ createInstance(SERVICE *service, char **options)
router->bitvalue = 0;
/* Ignored for now
conf = config_get_param(service->svc_config_param,"ignore_databases");
if(conf)
{
confval = conf->value;
}
if(!parse_db_ignore_list(router,confval))
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Parsing of database ignore list failed. ")));
goto clean_up;
}
*/
refreshInstance(router, conf);
}
/**
* Read config version number from service to inform what configuration
@ -888,7 +944,7 @@ createInstance(SERVICE *service, char **options)
/**
* Get hashtable which includes dbname,backend pairs
*/
router->dbnames_hash = (HASHTABLE*)dbnames_hash_init(router->servers);
router->dbnames_hash = (HASHTABLE*)dbnames_hash_init(router,router->servers);
if (router->dbnames_hash == NULL)
{
@ -1383,16 +1439,17 @@ static route_target_t get_shard_route_target (
QUERY_IS_TYPE(qtype, QUERY_TYPE_GSYSVAR_WRITE)) ||
/** enable or disable autocommit are always routed to all */
QUERY_IS_TYPE(qtype, QUERY_TYPE_ENABLE_AUTOCOMMIT) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_DISABLE_AUTOCOMMIT) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_SYSVAR_READ)) /** added for @@version comment, temporary*/
QUERY_IS_TYPE(qtype, QUERY_TYPE_DISABLE_AUTOCOMMIT))
{
/** hints don't affect on routing */
target = TARGET_ALL;
}
/* else */
/* { */
/* target = TARGET_NAMED_SERVER; */
/* } */
else if(QUERY_IS_TYPE(qtype, QUERY_TYPE_SYSVAR_READ) ||
(use_sql_variables_in == TYPE_ALL &&
QUERY_IS_TYPE(qtype, QUERY_TYPE_GSYSVAR_READ)))
{
target = TARGET_ANY;
}
#if defined(SS_DEBUG)
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
@ -1687,6 +1744,7 @@ static int routeQuery(
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
bool rses_is_closed = false;
bool change_successful = false;
route_target_t route_target = TARGET_UNDEFINED;
bool succp = false;
char* tname = NULL;
@ -1779,7 +1837,7 @@ static int routeQuery(
if (packet_type == MYSQL_COM_INIT_DB)
{
if (!change_current_db(inst, router_cli_ses, querybuf))
if (!(change_successful = change_current_db(inst, router_cli_ses, querybuf)))
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
@ -1916,6 +1974,8 @@ static int routeQuery(
* the target is undefined and an error will be returned to the client.
*/
update_dbnames_hash(inst,inst->servers,inst->dbnames_hash);
if((tname = get_shard_target_name(inst,router_cli_ses,querybuf)) != NULL &&
check_shard_status(inst,tname))
{
@ -1926,6 +1986,11 @@ static int routeQuery(
else
{
/**
* The query targets something else than a shard.
*/
route_target = get_shard_route_target(qtype,
router_cli_ses->rses_transaction_active,
router_cli_ses->rses_config.rw_use_sql_variables_in,
@ -1941,16 +2006,21 @@ static int routeQuery(
char errstr[2048];
GWBUF *errbuff;
update_dbnames_hash(inst->servers,inst->dbnames_hash);
update_dbnames_hash(inst,inst->servers,inst->dbnames_hash);
tname = get_shard_target_name(inst,router_cli_ses,querybuf);
if(tname == NULL &&
router_cli_ses->rses_mysql_session->db[0] == '\0')
if((tname == NULL &&
router_cli_ses->rses_mysql_session->db[0] == '\0') ||
(packet_type == MYSQL_COM_INIT_DB && change_successful) ||
packet_type == MYSQL_COM_FIELD_LIST ||
(router_cli_ses->rses_mysql_session->db[0] != '\0' &&
is_ignored_database(inst,router_cli_ses->rses_mysql_session->db)))
{
/**
* No current database or databases in query, route to all.
* No current database and no databases in query or
* the database is ignored, route to first available backend.
*/
route_target = TARGET_ALL;
route_target = TARGET_ANY;
}
else
@ -2001,6 +2071,32 @@ static int routeQuery(
ret = 0;
goto retblock;
}
if (TARGET_IS_ANY(route_target))
{
int z;
for(z = 0;inst->servers[z];z++)
{
if(SERVER_IS_RUNNING(inst->servers[z]->backend_server))
{
tname = inst->servers[z]->backend_server->unique_name;
route_target = TARGET_NAMED_SERVER;
break;
}
}
if(TARGET_IS_ANY(route_target))
{
/**No valid backends alive*/
rses_end_locked_router_action(router_cli_ses);
ret = 0;
goto retblock;
}
}
/**
* Query is routed to one of the backends
*/
@ -4094,6 +4190,8 @@ static bool prep_stmt_drop(
}
#endif /*< PREP_STMT_CACHING */
#if 0
/********************************
* This routine returns the root master server from MySQL replication tree
* Get the root Master rule:
@ -4138,7 +4236,10 @@ static BACKEND *get_root_master(
return master_host;
}
#endif
#if 0
/********************************
* This routine returns the root master server from MySQL replication tree
* Get the root Master rule:
@ -4189,6 +4290,7 @@ static backend_ref_t* get_root_master_bref(
return candidate_bref;
}
#endif
static void dbshard_process_router_options(
ROUTER_INSTANCE* router,
@ -4261,7 +4363,7 @@ static bool change_current_db(
{
bool succp;
uint8_t* packet;
int message_len;
int message_len,i;
char* fail_str;
if(GWBUF_LENGTH(buf) <= MYSQL_DATABASE_MAXLEN - 5)
@ -4276,12 +4378,24 @@ static bool change_current_db(
* If it isn't found, send a custom error packet to the client.
*/
update_dbnames_hash(inst->servers,inst->dbnames_hash);
update_dbnames_hash(inst,inst->servers,inst->dbnames_hash);
if(hashtable_fetch(
inst->dbnames_hash,
(char*)rses->rses_mysql_session->db) == NULL)
{
if(inst->ignore_list)
{
for(i = 0;inst->ignore_list[i];i++)
{
if(strcmp(inst->ignore_list[i],rses->rses_mysql_session->db) == 0)
{
succp = true;
goto retblock;
}
}
}
/** Create error message */
message_len = 25 + MYSQL_DATABASE_MAXLEN;
fail_str = calloc(1, message_len+1);