Conflicts:
	server/modules/include/dbshard.h
	server/modules/routing/dbshard/dbshard.c
This commit is contained in:
VilhoRaatikka
2014-12-01 19:34:20 +02:00
3 changed files with 237 additions and 10 deletions

View File

@ -233,6 +233,11 @@ static void tracelog_routed_query(
backend_ref_t* bref,
GWBUF* buf);
static void dbshard_process_router_options(
ROUTER_INSTANCE* router,
char** options);
static bool route_session_write(
ROUTER_CLIENT_SES* router_client_ses,
GWBUF* querybuf,
@ -311,6 +316,153 @@ static void* hfree(void* fval)
}
/**
* Updates the hashtable with the database names and where to find them, adding new and removing obsolete pairs.
* @param backends Backends to query for database names
* @param hashtable Hashtable to use
* @return True if all database and server names were successfully retrieved otherwise false
*/
bool update_dbnames_hash(BACKEND** backends, HASHTABLE* hashtable)
{
const unsigned int connect_timeout = 5;
const unsigned int read_timeout = 2;
bool rval = true;
SERVER* server;
MYSQL* handle;
MYSQL_RES* result;
MYSQL_ROW row;
int i, rc, numfields;
for(i = 0;backends[i] && rval;i++){
handle = mysql_init(NULL);
if(handle == NULL){
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,"Error: Failed to initialize MySQL handle.")));
continue;
}
rc = 0;
rc |= mysql_options(handle, MYSQL_OPT_CONNECT_TIMEOUT, (void *)&connect_timeout);
rc |= mysql_options(handle, MYSQL_OPT_READ_TIMEOUT, (void *)&read_timeout);
if(rc != 0){
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,"Error: Failed to set MySQL connection options.")));
mysql_close(handle);
rval = false;
continue;
}
server = backends[i]->backend_server;
ss_dassert(server != NULL);
if (mysql_real_connect(handle,
server->name,
server->monuser,
server->monpw,
NULL,
server->port,
NULL,
0) == NULL)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error: Failed to connect to backend server '%s'.",server->name)));
rval = false;
goto cleanup;
}
/**
* The server was successfully connected to, proceed to query for database names
*/
if((result = mysql_list_dbs(handle,NULL)) == NULL){
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"Error: Failed to execute query in backend server '%s'.",server->name)));
goto cleanup;
}
numfields = mysql_num_fields(result);
if(numfields < 1){
LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE,
"Backend '%s' has no databases.",server->name)));
goto cleanup;
}
/**
* Walk through the list of databases in this backend
* and insert them into the hashtable. If the value is already in the hashtable
* but the backend isn't in the list of backends it is replaced with the first found backend.
*/
while((row = mysql_fetch_row(result)))
{
unsigned long *lengths;
char *dbnm = NULL,*servnm = NULL;
lengths = mysql_fetch_lengths(result);
dbnm = (char*)calloc(lengths[0] + 1,sizeof(char));
memcpy(dbnm,row[0],lengths[0]);
servnm = strdup(server->unique_name);
if(hashtable_add(hashtable,dbnm,servnm) == 0){
/*Check if the failure was due to a duplicate value*/
if(hashtable_fetch(hashtable,dbnm) == NULL){
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"Error: Failed to insert values into hashtable.")));
}else{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"Error: Duplicate value found.")));
}
rval = false;
free(dbnm);
free(servnm);
}
}
cleanup:
if(result){
mysql_free_result(result);
}
result = NULL;
mysql_close(handle);
}
return rval;
}
/**
* Allocates a new hashtable and inserts database names and where to find them into it.
* @param backends Backends to query for database names
* @return Pointer to the newly allocated hashtable or NULL if an error occurred
*/
void* dbnames_hash_init(BACKEND** backends)
{
HASHTABLE* htbl = hashtable_alloc(32,hashkeyfun,hashcmpfun);
if(htbl == NULL)
{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,"Error: Hashtable allocation failed.")));
return NULL;
}
/**Update the new hashtable with the key-value pairs*/
if(!update_dbnames_hash(backends,htbl)){
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,"Errors encountered while querying databases.")));
hashtable_free(htbl);
return NULL;
}
return htbl;
}
/**
* Implementation of the mandatory version entry point
*
@ -574,11 +726,11 @@ createInstance(SERVICE *service, char **options)
* is used if any.
*/
router->dbshard_version = service->svc_config_version;
refreshInstance(router, NULL);
/**
* Get hashtable which includes dbname,backend pairs
*/
router->dbnames_hash = dbnames_hash_init(router->servers);
router->dbnames_hash = (HASHTABLE*)dbnames_hash_init(router->servers);
/**
* We have completed the creation of the router data, so now
* insert this router into the linked list of routers
@ -4252,11 +4404,54 @@ static backend_ref_t* get_root_master_bref(
}
static void dbshard_process_router_options(
ROUTER_INSTANCE* router,
char** options)
{
int i;
char* value;
select_criteria_t c;
for (i = 0; options[i]; i++)
{
if ((value = strchr(options[i], '=')) == NULL)
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR, "Warning : Unsupported "
"router option \"%s\" for "
"readwritesplit router.",
options[i])));
}
else
{
*value = 0;
value++;
if (strcmp(options[i], "slave_selection_criteria") == 0)
{
c = GET_SELECT_CRITERIA(value);
ss_dassert(
c == LEAST_GLOBAL_CONNECTIONS ||
c == LEAST_ROUTER_CONNECTIONS ||
c == LEAST_BEHIND_MASTER ||
c == LEAST_CURRENT_OPERATIONS ||
c == UNDEFINED_CRITERIA);
if (c == UNDEFINED_CRITERIA)
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR, "Warning : Unknown "
"slave selection criteria \"%s\". "
"Allowed values are LEAST_GLOBAL_CONNECTIONS, "
"LEAST_ROUTER_CONNECTIONS, "
"LEAST_BEHIND_MASTER,"
"and LEAST_CURRENT_OPERATIONS.",
STRCRITERIA(router->rwsplit_config.rw_slave_select_criteria))));
}
else
{
router->rwsplit_config.rw_slave_select_criteria = c;
}
}
}
} /*< for */
}