Added functions that query backend servers for database names

This commit is contained in:
Markus Makela
2014-12-01 13:01:34 +02:00
parent 43d348e3ac
commit 66164aa664
3 changed files with 551 additions and 19 deletions

View File

@ -6,6 +6,10 @@ add_library(testroute SHARED testroute.c)
target_link_libraries(testroute log_manager utils)
install(TARGETS testroute DESTINATION modules)
add_library(dbshard SHARED dbshard/dbshard.c)
target_link_libraries(dbshard log_manager utils query_classifier)
install(TARGETS dbshard DESTINATION modules)
add_library(readconnroute SHARED readconnroute.c)
target_link_libraries(readconnroute log_manager utils)
install(TARGETS readconnroute DESTINATION modules)

View File

@ -23,7 +23,7 @@
#include <stdint.h>
#include <router.h>
#include <readwritesplit.h>
#include <dbshard.h>
#include <mysql.h>
#include <skygw_utils.h>
@ -142,13 +142,21 @@ int bref_cmp_current_load(
const void* bref1,
const void* bref2);
int bref_cmp_none(
const void* bref1,
const void* bref2)
{
return -1;
}
/**
* The order of functions _must_ match with the order the select criteria are
* listed in select_criteria_t definition in readwritesplit.h
*/
int (*criteria_cmpfun[LAST_CRITERIA])(const void*, const void*)=
{
NULL,
bref_cmp_none,
bref_cmp_global_conn,
bref_cmp_router_conn,
bref_cmp_behind_master,
@ -172,11 +180,6 @@ static bool get_dcb(
char* name,
int max_rlag);
static void rwsplit_process_router_options(
ROUTER_INSTANCE* router,
char** options);
static ROUTER_OBJECT MyObject = {
createInstance,
@ -253,6 +256,11 @@ static void tracelog_routed_query(
backend_ref_t* bref,
GWBUF* buf);
static void dbshard_process_router_options(
ROUTER_INSTANCE* router,
char** options);
static bool route_session_write(
ROUTER_CLIENT_SES* router_client_ses,
GWBUF* querybuf,
@ -336,6 +344,153 @@ static void* hfree(void* fval)
}
/**
* Updates the hashtable with the database names and where to find them, adding new and removing obsolete pairs.
* @param backends Backends to query for database names
* @param hashtable Hashtable to use
* @return True if all database and server names were successfully retrieved otherwise false
*/
bool update_dbnames_hash(BACKEND** backends, HASHTABLE* hashtable)
{
const unsigned int connect_timeout = 5;
const unsigned int read_timeout = 2;
bool rval = true;
SERVER* server;
MYSQL* handle;
MYSQL_RES* result;
MYSQL_ROW row;
int i, rc, numfields;
for(i = 0;backends[i] && rval;i++){
handle = mysql_init(NULL);
if(handle == NULL){
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,"Error: Failed to initialize MySQL handle.")));
continue;
}
rc = 0;
rc |= mysql_options(handle, MYSQL_OPT_CONNECT_TIMEOUT, (void *)&connect_timeout);
rc |= mysql_options(handle, MYSQL_OPT_READ_TIMEOUT, (void *)&read_timeout);
if(rc != 0){
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,"Error: Failed to set MySQL connection options.")));
mysql_close(handle);
rval = false;
continue;
}
server = backends[i]->backend_server;
ss_dassert(server != NULL);
if (mysql_real_connect(handle,
server->name,
server->monuser,
server->monpw,
NULL,
server->port,
NULL,
0) == NULL)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error: Failed to connect to backend server '%s'.",server->name)));
rval = false;
goto cleanup;
}
/**
* The server was successfully connected to, proceed to query for database names
*/
if((result = mysql_list_dbs(handle,NULL)) == NULL){
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"Error: Failed to execute query in backend server '%s'.",server->name)));
goto cleanup;
}
numfields = mysql_num_fields(result);
if(numfields < 1){
LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE,
"Backend '%s' has no databases.",server->name)));
goto cleanup;
}
/**
* Walk through the list of databases in this backend
* and insert them into the hashtable. If the value is already in the hashtable
* but the backend isn't in the list of backends it is replaced with the first found backend.
*/
while((row = mysql_fetch_row(result)))
{
unsigned long *lengths;
char *dbnm = NULL,*servnm = NULL;
lengths = mysql_fetch_lengths(result);
dbnm = (char*)calloc(lengths[0] + 1,sizeof(char));
memcpy(dbnm,row[0],lengths[0]);
servnm = strdup(server->unique_name);
if(hashtable_add(hashtable,dbnm,servnm) == 0){
/*Check if the failure was due to a duplicate value*/
if(hashtable_fetch(hashtable,dbnm) == NULL){
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"Error: Failed to insert values into hashtable.")));
}else{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"Error: Duplicate value found.")));
}
rval = false;
free(dbnm);
free(servnm);
}
}
cleanup:
if(result){
mysql_free_result(result);
}
result = NULL;
mysql_close(handle);
}
return rval;
}
/**
* Allocates a new hashtable and inserts database names and where to find them into it.
* @param backends Backends to query for database names
* @return Pointer to the newly allocated hashtable or NULL if an error occurred
*/
void* dbnames_hash_init(BACKEND** backends)
{
HASHTABLE* htbl = hashtable_alloc(32,hashkeyfun,hashcmpfun);
if(htbl == NULL)
{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,"Error: Hashtable allocation failed.")));
return NULL;
}
/**Update the new hashtable with the key-value pairs*/
if(!update_dbnames_hash(backends,htbl)){
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,"Errors encountered while querying databases.")));
hashtable_free(htbl);
return NULL;
}
return htbl;
}
/**
* Implementation of the mandatory version entry point
*
@ -599,11 +754,11 @@ createInstance(SERVICE *service, char **options)
* is used if any.
*/
router->rwsplit_version = service->svc_config_version;
refreshInstance(router, NULL);
/**
* Get hashtable which includes dbname,backend pairs
*/
router->dbnames_hash = dbnames_hash_init(router->servers);
router->dbnames_hash = (HASHTABLE*)dbnames_hash_init(router->servers);
/**
* We have completed the creation of the router data, so now
* insert this router into the linked list of routers
@ -668,7 +823,7 @@ static void* newSession(
/** increment rwsplit router's config version number */
router->rwsplit_version = router->service->svc_config_version;
/** Read options */
rwsplit_process_router_options(router, router->service->routerOptions);
dbshard_process_router_options(router, router->service->routerOptions);
}
/** Copy config struct from router instance */
client_rses->rses_config = router->rwsplit_config;
@ -4331,6 +4486,7 @@ static bool have_enough_servers(
}
if (nservers < min_nsrv)
{
float err_pct = ((float)min_nsrv/(float)router_nsrv)*100.f;
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Unable to start %s service. There are "
@ -4339,7 +4495,7 @@ static bool have_enough_servers(
"would be required.",
router->service->name,
(*p_rses)->rses_config.rw_max_slave_conn_percent,
min_nsrv/(router_nsrv/100))));
(int)err_pct)));
}
}
free(*p_rses);
@ -4646,11 +4802,54 @@ static backend_ref_t* get_root_master_bref(
}
static void dbshard_process_router_options(
ROUTER_INSTANCE* router,
char** options)
{
int i;
char* value;
select_criteria_t c;
for (i = 0; options[i]; i++)
{
if ((value = strchr(options[i], '=')) == NULL)
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR, "Warning : Unsupported "
"router option \"%s\" for "
"readwritesplit router.",
options[i])));
}
else
{
*value = 0;
value++;
if (strcmp(options[i], "slave_selection_criteria") == 0)
{
c = GET_SELECT_CRITERIA(value);
ss_dassert(
c == LEAST_GLOBAL_CONNECTIONS ||
c == LEAST_ROUTER_CONNECTIONS ||
c == LEAST_BEHIND_MASTER ||
c == LEAST_CURRENT_OPERATIONS ||
c == UNDEFINED_CRITERIA);
if (c == UNDEFINED_CRITERIA)
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR, "Warning : Unknown "
"slave selection criteria \"%s\". "
"Allowed values are LEAST_GLOBAL_CONNECTIONS, "
"LEAST_ROUTER_CONNECTIONS, "
"LEAST_BEHIND_MASTER,"
"and LEAST_CURRENT_OPERATIONS.",
STRCRITERIA(router->rwsplit_config.rw_slave_select_criteria))));
}
else
{
router->rwsplit_config.rw_slave_select_criteria = c;
}
}
}
} /*< for */
}