Added functions for shard runtime updating.

This commit is contained in:
Markus Makela
2014-12-09 14:12:12 +02:00
parent cf40cbfbf0
commit 78d09c35cb
2 changed files with 142 additions and 52 deletions

View File

@ -298,6 +298,8 @@ typedef struct router_instance {
struct router_instance* next; /*< Next router on the list */ struct router_instance* next; /*< Next router on the list */
bool available_slaves; /*< The router has some slaves available */ bool available_slaves; /*< The router has some slaves available */
HASHTABLE* dbnames_hash; /** Hashtable containing the database names and where to find them */ HASHTABLE* dbnames_hash; /** Hashtable containing the database names and where to find them */
char** ignore_list;
bool update_hash;
} ROUTER_INSTANCE; } ROUTER_INSTANCE;
#define BACKEND_TYPE(b) (SERVER_IS_MASTER((b)->backend_server) ? BE_MASTER : \ #define BACKEND_TYPE(b) (SERVER_IS_MASTER((b)->backend_server) ? BE_MASTER : \

View File

@ -124,6 +124,8 @@ static prep_stmt_t* prep_stmt_init(prep_stmt_type_t type, void* id);
static void prep_stmt_done(prep_stmt_t* pstmt); static void prep_stmt_done(prep_stmt_t* pstmt);
#endif /*< PREP_STMT_CACHING */ #endif /*< PREP_STMT_CACHING */
bool parse_db_ignore_list(ROUTER_INSTANCE* router,char* param);
int bref_cmp_global_conn( int bref_cmp_global_conn(
const void* bref1, const void* bref1,
const void* bref2); const void* bref2);
@ -433,9 +435,9 @@ bool update_dbnames_hash(BACKEND** backends, HASHTABLE* hashtable)
lengths = mysql_fetch_lengths(result); lengths = mysql_fetch_lengths(result);
/** /**
* Default databases to ignore * Default databases to ignore, disable for now
*/ */
#ifdef NO_DEFAULT_DATABASES
if(strncmp(row[0],"information_schema",lengths[0]) == 0 || if(strncmp(row[0],"information_schema",lengths[0]) == 0 ||
strncmp(row[0],"performance_schema",lengths[0]) == 0 || strncmp(row[0],"performance_schema",lengths[0]) == 0 ||
strncmp(row[0],"mysql",lengths[0]) == 0 || strncmp(row[0],"mysql",lengths[0]) == 0 ||
@ -443,6 +445,7 @@ bool update_dbnames_hash(BACKEND** backends, HASHTABLE* hashtable)
strncmp(row[0],"test",lengths[0]) == 0){ strncmp(row[0],"test",lengths[0]) == 0){
continue; continue;
} }
#endif
dbnm = (char*)calloc(lengths[0] + 1,sizeof(char)); dbnm = (char*)calloc(lengths[0] + 1,sizeof(char));
memcpy(dbnm,row[0],lengths[0]); memcpy(dbnm,row[0],lengths[0]);
servnm = strdup(server->unique_name); servnm = strdup(server->unique_name);
@ -450,51 +453,9 @@ bool update_dbnames_hash(BACKEND** backends, HASHTABLE* hashtable)
if(hashtable_add(hashtable,dbnm,servnm) == 0) if(hashtable_add(hashtable,dbnm,servnm) == 0)
{ {
char* srvname; char* srvname;
#ifdef SHARD_UPDATES
{
char* old_backend = (char*)hashtable_fetch(hashtable,dbnm);
int j;
bool is_alive = false;
for(j = 0;backends[j];j++)
{
/**
* See if the old backend is still
* alive. If not then update
* the hashtable with the current backend's name.
*/
if(strcmp(server->unique_name,old_backend) == 0 &&
SERVER_IS_RUNNING(server))
{
is_alive = true;
}
}
if(!is_alive)
{
hashtable_delete(hashtable,dbnm);
if(hashtable_add(hashtable,dbnm,servnm))
{
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"Updated the backend of database '%s' to '%s'.",
dbnm,
servnm)));
}
else
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error: failed to insert values into hashtable.")));
}
}
goto cleanup;
}
#endif /*< SHARD_UPDATES */
/*Check if the failure was due to a duplicate value*/
if((srvname = hashtable_fetch(hashtable,dbnm)) == NULL) if((srvname = hashtable_fetch(hashtable,dbnm)) == NULL)
{ {
LOGIF(LE, (skygw_log_write_flush( LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR, LOGFILE_ERROR,
"Error: failed to insert values into hashtable."))); "Error: failed to insert values into hashtable.")));
@ -503,7 +464,7 @@ bool update_dbnames_hash(BACKEND** backends, HASHTABLE* hashtable)
{ {
LOGIF(LE, (skygw_log_write_flush( LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR, LOGFILE_ERROR,
"Error : conflicting " "Warning : conflicting "
"databases found. " "databases found. "
"Both \"%s\" and \"%s\" " "Both \"%s\" and \"%s\" "
"have a database \"%s\".", "have a database \"%s\".",
@ -511,11 +472,48 @@ bool update_dbnames_hash(BACKEND** backends, HASHTABLE* hashtable)
srvname, srvname,
dbnm))); dbnm)));
} }
rval = false;
free(dbnm); char* old_backend = (char*)hashtable_fetch(hashtable,dbnm);
free(servnm); int j;
bool is_alive = false;
for(j = 0;backends[j];j++)
{
/**
* See if the old backend is still
* alive. If not then update
* the hashtable with the current backend's name.
*/
if(strcmp(server->unique_name,old_backend) == 0 &&
SERVER_IS_RUNNING(server))
{
is_alive = true;
break;
}
}
if(!is_alive)
{
hashtable_delete(hashtable,dbnm);
if(hashtable_add(hashtable,dbnm,servnm))
{
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"Updated the backend of database '%s' to '%s'.",
dbnm,
servnm)));
}
else
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error: failed to insert values into hashtable.")));
}
}
} /*< hashtable_add failed */ } /*< hashtable_add failed */
} /*< while */ } /*< while */
cleanup: cleanup:
if(result) if(result)
{ {
@ -760,6 +758,8 @@ createInstance(SERVICE *service, char **options)
{ {
ROUTER_INSTANCE* router; ROUTER_INSTANCE* router;
SERVER* server; SERVER* server;
//CONFIG_PARAMETER* conf;
//char* confval = NULL;
int nservers; int nservers;
int i; int i;
@ -826,11 +826,30 @@ createInstance(SERVICE *service, char **options)
*/ */
router->bitmask = 0; router->bitmask = 0;
router->bitvalue = 0; router->bitvalue = 0;
/* Ignored for now
conf = config_get_param(service->svc_config_param,"ignore_databases");
if(conf)
{
confval = conf->value;
}
if(!parse_db_ignore_list(router,confval))
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Parsing of database ignore list failed. ")));
goto clean_up;
}
*/
/** /**
* Read config version number from service to inform what configuration * Read config version number from service to inform what configuration
* is used if any. * is used if any.
*/ */
router->dbshard_version = service->svc_config_version; router->dbshard_version = service->svc_config_version;
/** refreshInstance(router, NULL); */ /** refreshInstance(router, NULL); */
/** /**
* Get hashtable which includes dbname,backend pairs * Get hashtable which includes dbname,backend pairs
@ -1724,7 +1743,7 @@ static int routeQuery(
if (packet_type == MYSQL_COM_INIT_DB) if (packet_type == MYSQL_COM_INIT_DB)
{ {
if (!change_current_db(instance, router_cli_ses, querybuf)) if (!change_current_db(inst, router_cli_ses, querybuf))
{ {
LOGIF(LE, (skygw_log_write_flush( LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR, LOGFILE_ERROR,
@ -1833,8 +1852,14 @@ static int routeQuery(
*/ */
/** /**
* Added for simple sharding, using hints for testing. * Update the hashtable
*/ */
if(inst->update_hash)
{
update_dbnames_hash(inst->servers,inst->dbnames_hash);
}
if((tname = get_shard_target_name(inst,router_cli_ses,querybuf)) != NULL) if((tname = get_shard_target_name(inst,router_cli_ses,querybuf)) != NULL)
{ {
route_target = TARGET_NAMED_SERVER; route_target = TARGET_NAMED_SERVER;
@ -1887,6 +1912,7 @@ static int routeQuery(
* Search backend server by name or replication lag. * Search backend server by name or replication lag.
* If it fails, then try to find valid slave or master. * If it fails, then try to find valid slave or master.
*/ */
succp = get_shard_dcb(&target_dcb, router_cli_ses, tname); succp = get_shard_dcb(&target_dcb, router_cli_ses, tname);
if (!succp) if (!succp)
@ -4210,4 +4236,66 @@ reply_error:
} }
retblock: retblock:
return succp; return succp;
} }
/**
* Parses the configuration for databases to ignore.
* @param router The router instance
* @param param Configuration parameters
* @return True if the parsing was successful and false if an error occurred.
*/
bool parse_db_ignore_list(ROUTER_INSTANCE* router, char* param)
{
char** list = router->ignore_list;
int count = 0, i = 0;
char *value = param, *tok;
char **saveptr = NULL,**tmp;
list = malloc(sizeof(char*));
if(list == NULL){
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : malloc returned NULL.")));
return false;
}
if(param == NULL)
{
list[0] = NULL;
return true;
}
/**We have at least one value to ignore*/
count++;
tok = value;
while((tok = strchr(tok,',')))
{
tok++;
count++;
}
tmp = realloc(list,(count + 1)*sizeof(char*));
tok = strtok_r(value,",",saveptr);
if(tmp == NULL)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : realloc returned NULL.")));
return false;
}
list = tmp;
while(tok && count > i)
{
list[i++] = strdup(tok);
tok = strtok_r(NULL,",",saveptr);
}
list[i] = NULL;
return true;
}