This commit is contained in:
VilhoRaatikka
2014-12-06 12:48:37 +02:00
3 changed files with 159 additions and 25 deletions

View File

@ -39,7 +39,7 @@ MODULE_INFO info = {
MODULE_API_ROUTER,
MODULE_BETA_RELEASE,
ROUTER_VERSION,
"A Read/Write splitting router for enhancement read scalability"
"A database sharding router for simple sharding"
};
#if defined(SS_DEBUG)
# include <mysql_client_server_protocol.h>
@ -103,12 +103,12 @@ static route_target_t get_route_target (
bool trx_active,
target_t use_sql_variables_in,
HINT* hint);
#if 0
static backend_ref_t* check_candidate_bref(
backend_ref_t* candidate_bref,
backend_ref_t* new_bref,
select_criteria_t sc);
#endif
static uint8_t getCapabilities (ROUTER* inst, void* router_session);
@ -152,11 +152,11 @@ static bool get_dcb(
backend_type_t btype,
char* name,
int max_rlag);
#if 0
static void rwsplit_process_router_options(
ROUTER_INSTANCE* router,
char** options);
#endif
static ROUTER_OBJECT MyObject = {
createInstance,
@ -175,6 +175,9 @@ static bool rses_begin_locked_router_action(
static void rses_end_locked_router_action(
ROUTER_CLIENT_SES* rses);
static int rses_get_max_replication_lag(
ROUTER_CLIENT_SES* rses);
static void mysql_sescmd_done(
mysql_sescmd_t* sescmd);
@ -411,6 +414,36 @@ bool update_dbnames_hash(BACKEND** backends, HASHTABLE* hashtable)
if(hashtable_add(hashtable,dbnm,servnm) == 0){
#ifdef SHARD_UPDATES
{
char* old_backend = (char*)hashtable_fetch(hashtable,dbnm);
int j;
bool is_alive = false;
for(j = 0;backends[j];j++){
/**
* See if the old backend is still alive. If not then update
* the hashtable with the current backend's name.
*/
if(strcmp(server->unique_name,old_backend) == 0 &&
SERVER_IS_RUNNING(server)){
is_alive = true;
}
}
if(!is_alive){
hashtable_delete(hashtable,dbnm);
if(hashtable_add(hashtable,dbnm,servnm)){
LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE,
"Updated the backend of database '%s' to '%s'.",dbnm,servnm)));
}else{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"Error: Failed to insert values into hashtable.")));
}
}
goto cleanup;
}
#endif
/*Check if the failure was due to a duplicate value*/
if(hashtable_fetch(hashtable,dbnm) == NULL){
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
@ -457,15 +490,43 @@ void* dbnames_hash_init(BACKEND** backends)
/**Update the new hashtable with the key-value pairs*/
if(!update_dbnames_hash(backends,htbl)){
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,"Errors encountered while querying databases.")));
hashtable_free(htbl);
return NULL;
//LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,"Errors encountered while querying databases.")));
//hashtable_free(htbl);
//return NULL;
}
return htbl;
}
bool add_shard_info(GWBUF* buffer, char* target)
{
HINT* hint = hint_create_route(NULL,HINT_ROUTE_TO_NAMED_SERVER,target);
return (bool)gwbuf_add_hint(buffer,hint);
}
char* get_shard_target_name(ROUTER_INSTANCE* router, GWBUF* buffer){
HASHTABLE* ht = router->dbnames_hash;
int sz = 0,i;
char** dbnms = NULL;
char* rval = NULL;
if(!query_is_parsed(buffer)){
parse_query(buffer);
}
dbnms = skygw_get_database_names(buffer,&sz);
if(sz > 0){
for(i = 0; i < sz; i++){
if((rval = (char*)hashtable_fetch(ht,dbnms[i]))){
break;
}
}
}
return rval;
}
/**
* Implementation of the mandatory version entry point
*
@ -730,7 +791,7 @@ createInstance(SERVICE *service, char **options)
* is used if any.
*/
router->dbshard_version = service->svc_config_version;
refreshInstance(router, NULL);
//refreshInstance(router, NULL);
/**
* Get hashtable which includes dbname,backend pairs
*/
@ -768,7 +829,7 @@ static void* newSession(
bool succp;
int router_nservers = 0; /*< # of servers in total */
int i;
const int min_nservers = 1; /*< hard-coded for now */
//const int min_nservers = 1; /*< hard-coded for now */
client_rses = (ROUTER_CLIENT_SES *)calloc(1, sizeof(ROUTER_CLIENT_SES));
@ -816,8 +877,9 @@ static void* newSession(
* Instead of calling this, ensure that there is at least one
* responding server.
*/
#if 0
router_nservers = router_get_servercount(router);
#if 0
if (!have_enough_servers(&client_rses,
min_nservers,
router_nservers,
@ -884,7 +946,7 @@ static void* newSession(
rses_end_locked_router_action(client_rses);
/**
/**
* Master and at least <min_nslaves> slaves must be found
*/
if (!succp) {
@ -897,6 +959,7 @@ static void* newSession(
client_rses->rses_capabilities = RCAP_TYPE_STMT_INPUT;
client_rses->rses_backend_ref = backend_ref;
client_rses->rses_nbackends = router_nservers; /*< # of backend servers */
client_rses->rses_master_ref = get_root_master_bref(client_rses);
router->stats.n_sessions += 1;
/**
@ -1236,10 +1299,12 @@ static bool get_dcb(
(b->backend_server->rlag != MAX_RLAG_NOT_AVAILABLE &&
b->backend_server->rlag <= max_rlag)))
{
#if 0
candidate_bref = check_candidate_bref(
candidate_bref,
&backend_ref[i],
rses->rses_config.rw_slave_select_criteria);
#endif
}
else
{
@ -1349,6 +1414,7 @@ static route_target_t get_route_target (
HINT* hint) /*< !!! turha ? */
{
route_target_t target = TARGET_UNDEFINED;
/**
* These queries are not affected by hints
*/
@ -1689,6 +1755,7 @@ static int routeQuery(
route_target_t route_target;
bool succp = false;
backend_type_t btype; /*< target backend type */
char* tname = NULL;
CHK_CLIENT_RSES(router_cli_ses);
@ -1701,7 +1768,7 @@ static int routeQuery(
packet = GWBUF_DATA(querybuf);
packet_type = packet[4];
master_dcb = router_cli_ses->rses_master_ref->bref_dcb;
if (rses_is_closed)
{
/**
@ -1872,10 +1939,23 @@ static int routeQuery(
* - route primarily according to the hints and if they failed,
* eventually to master
*/
route_target = get_route_target(qtype,
router_cli_ses->rses_transaction_active,
/**
* Added for simple sharding, using hints for testing.
*/
if((tname = get_shard_target_name(inst,querybuf)) != NULL &&
add_shard_info(querybuf,tname)){
route_target = TARGET_NAMED_SERVER;
}else{
route_target = get_route_target(qtype,
router_cli_ses->rses_transaction_active,
router_cli_ses->rses_config.rw_use_sql_variables_in,
querybuf->hint);
querybuf->hint);
}
if (TARGET_IS_ALL(route_target))
{
@ -2627,11 +2707,16 @@ static bool connect_backend_servers(
ROUTER_INSTANCE* router)
{
bool succp = true;
//bool is_synced_master;
bool master_connected = true;
int servers_found = 0;
int servers_connected = 0;
int i;
if (router->bitvalue != 0) /*< 'synced' is the only bitvalue in rwsplit */
int slaves_connected = 0;
int i,max_nservers = router_nservers;
select_criteria_t select_criteria = LEAST_GLOBAL_CONNECTIONS;
#if 0
if (router->bitvalue != 0) /*< 'synced' is the only bitvalue in rwsplit */
{
is_synced_master = true;
}
@ -2639,6 +2724,7 @@ static bool connect_backend_servers(
{
is_synced_master = false;
}
#endif
#if defined(EXTRA_SS_DEBUG)
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "Servers and conns before ordering:")));
@ -2686,7 +2772,7 @@ static bool connect_backend_servers(
*/
for (i=0;
i<router_nservers &&
(server_connected < max_nservers || !master_connected);
(servers_connected < max_nservers || !master_connected);
i++)
{
BACKEND* b = backend_ref[i].bref_backend;
@ -2826,7 +2912,7 @@ static bool connect_backend_servers(
}
}
return_succp:
//return_succp:
return succp;
}
@ -3841,8 +3927,8 @@ static bool handle_error_new_connection(
{
SESSION* ses;
int router_nservers;
int max_nslaves;
int max_slave_rlag;
backend_ref_t* bref;
bool succp;
@ -3978,7 +4064,7 @@ static int router_get_servercount(
return router_nservers;
}
#if 0
/**
* Find out the number of read backend servers.
* Depending on the configuration value type, either copy direct count
@ -4006,7 +4092,7 @@ static int rses_get_max_slavecount(
return max_nslaves;
}
#endif
static int rses_get_max_replication_lag(
ROUTER_CLIENT_SES* rses)