Updates to dbshard router:
Added a parsing function to query classifier that returns an array of database names the query uses. Added a check if the query targets a sharded database. If so, a hint is added that routes the query to the named server.
This commit is contained in:
@ -1438,3 +1438,50 @@ char* skygw_get_qtype_str(
|
|||||||
}
|
}
|
||||||
return qtype_str;
|
return qtype_str;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns an array of strings of databases that this query uses.
|
||||||
|
* If the database isn't defined in the query, it is assumed that this query only targets the current database.
|
||||||
|
* The value of @p size is set to the number of allocated strings. The caller is responsible for freeing all the allocated memory.
|
||||||
|
* @param querybuf GWBUF containing the query
|
||||||
|
* @param size Size of the resulting array
|
||||||
|
* @return A new array of strings containing the database names or NULL if no databases were found.
|
||||||
|
*/
|
||||||
|
char** skygw_get_database_names(GWBUF* querybuf,int* size)
|
||||||
|
{
|
||||||
|
LEX* lex;
|
||||||
|
TABLE_LIST* tbl;
|
||||||
|
char **databases = NULL, **tmp = NULL;
|
||||||
|
int currsz = 0,i = 0;
|
||||||
|
|
||||||
|
if( (lex = get_lex(querybuf)) == NULL)
|
||||||
|
{
|
||||||
|
goto retblock;
|
||||||
|
}
|
||||||
|
|
||||||
|
lex->current_select = lex->all_selects_list;
|
||||||
|
|
||||||
|
|
||||||
|
while(lex->current_select){
|
||||||
|
tbl = lex->current_select->join_list->head();
|
||||||
|
while(tbl)
|
||||||
|
{
|
||||||
|
if(strcmp(tbl->db,"skygw_virtual") != 0){
|
||||||
|
if(i>= currsz){
|
||||||
|
tmp = (char**)realloc(databases,sizeof(char*)*(currsz*2 + 1));
|
||||||
|
if(tmp == NULL) goto retblock;
|
||||||
|
databases = tmp;
|
||||||
|
currsz = currsz*2 + 1;
|
||||||
|
}
|
||||||
|
databases[i++] = strdup(tbl->db);
|
||||||
|
}
|
||||||
|
tbl=tbl->next_local;
|
||||||
|
}
|
||||||
|
|
||||||
|
lex->current_select = lex->current_select->next_select_in_list();
|
||||||
|
}
|
||||||
|
|
||||||
|
retblock:
|
||||||
|
*size = i;
|
||||||
|
return databases;
|
||||||
|
}
|
||||||
|
@ -89,6 +89,7 @@ bool is_drop_table_query(GWBUF* querybuf);
|
|||||||
bool skygw_is_real_query(GWBUF* querybuf);
|
bool skygw_is_real_query(GWBUF* querybuf);
|
||||||
void* skygw_get_affected_tables(void* lexptr);
|
void* skygw_get_affected_tables(void* lexptr);
|
||||||
char** skygw_get_table_names(GWBUF* querybuf,int* tblsize,bool fullnames);
|
char** skygw_get_table_names(GWBUF* querybuf,int* tblsize,bool fullnames);
|
||||||
|
char** skygw_get_database_names(GWBUF* querybuf,int* size);
|
||||||
char* skygw_get_canonical(GWBUF* querybuf);
|
char* skygw_get_canonical(GWBUF* querybuf);
|
||||||
bool parse_query (GWBUF* querybuf);
|
bool parse_query (GWBUF* querybuf);
|
||||||
parsing_info_t* parsing_info_init(void (*donefun)(void *));
|
parsing_info_t* parsing_info_init(void (*donefun)(void *));
|
||||||
|
@ -39,7 +39,7 @@ MODULE_INFO info = {
|
|||||||
MODULE_API_ROUTER,
|
MODULE_API_ROUTER,
|
||||||
MODULE_BETA_RELEASE,
|
MODULE_BETA_RELEASE,
|
||||||
ROUTER_VERSION,
|
ROUTER_VERSION,
|
||||||
"A Read/Write splitting router for enhancement read scalability"
|
"A database sharding router for simple sharding"
|
||||||
};
|
};
|
||||||
#if defined(SS_DEBUG)
|
#if defined(SS_DEBUG)
|
||||||
# include <mysql_client_server_protocol.h>
|
# include <mysql_client_server_protocol.h>
|
||||||
@ -103,12 +103,12 @@ static route_target_t get_route_target (
|
|||||||
bool trx_active,
|
bool trx_active,
|
||||||
target_t use_sql_variables_in,
|
target_t use_sql_variables_in,
|
||||||
HINT* hint);
|
HINT* hint);
|
||||||
|
#if 0
|
||||||
static backend_ref_t* check_candidate_bref(
|
static backend_ref_t* check_candidate_bref(
|
||||||
backend_ref_t* candidate_bref,
|
backend_ref_t* candidate_bref,
|
||||||
backend_ref_t* new_bref,
|
backend_ref_t* new_bref,
|
||||||
select_criteria_t sc);
|
select_criteria_t sc);
|
||||||
|
#endif
|
||||||
|
|
||||||
static uint8_t getCapabilities (ROUTER* inst, void* router_session);
|
static uint8_t getCapabilities (ROUTER* inst, void* router_session);
|
||||||
|
|
||||||
@ -152,11 +152,11 @@ static bool get_dcb(
|
|||||||
backend_type_t btype,
|
backend_type_t btype,
|
||||||
char* name,
|
char* name,
|
||||||
int max_rlag);
|
int max_rlag);
|
||||||
|
#if 0
|
||||||
static void rwsplit_process_router_options(
|
static void rwsplit_process_router_options(
|
||||||
ROUTER_INSTANCE* router,
|
ROUTER_INSTANCE* router,
|
||||||
char** options);
|
char** options);
|
||||||
|
#endif
|
||||||
|
|
||||||
static ROUTER_OBJECT MyObject = {
|
static ROUTER_OBJECT MyObject = {
|
||||||
createInstance,
|
createInstance,
|
||||||
@ -175,6 +175,9 @@ static bool rses_begin_locked_router_action(
|
|||||||
static void rses_end_locked_router_action(
|
static void rses_end_locked_router_action(
|
||||||
ROUTER_CLIENT_SES* rses);
|
ROUTER_CLIENT_SES* rses);
|
||||||
|
|
||||||
|
static int rses_get_max_replication_lag(
|
||||||
|
ROUTER_CLIENT_SES* rses);
|
||||||
|
|
||||||
static void mysql_sescmd_done(
|
static void mysql_sescmd_done(
|
||||||
mysql_sescmd_t* sescmd);
|
mysql_sescmd_t* sescmd);
|
||||||
|
|
||||||
@ -411,6 +414,36 @@ bool update_dbnames_hash(BACKEND** backends, HASHTABLE* hashtable)
|
|||||||
|
|
||||||
if(hashtable_add(hashtable,dbnm,servnm) == 0){
|
if(hashtable_add(hashtable,dbnm,servnm) == 0){
|
||||||
|
|
||||||
|
#ifdef SHARD_UPDATES
|
||||||
|
{
|
||||||
|
char* old_backend = (char*)hashtable_fetch(hashtable,dbnm);
|
||||||
|
int j;
|
||||||
|
bool is_alive = false;
|
||||||
|
|
||||||
|
for(j = 0;backends[j];j++){
|
||||||
|
/**
|
||||||
|
* See if the old backend is still alive. If not then update
|
||||||
|
* the hashtable with the current backend's name.
|
||||||
|
*/
|
||||||
|
if(strcmp(server->unique_name,old_backend) == 0 &&
|
||||||
|
SERVER_IS_RUNNING(server)){
|
||||||
|
is_alive = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if(!is_alive){
|
||||||
|
hashtable_delete(hashtable,dbnm);
|
||||||
|
if(hashtable_add(hashtable,dbnm,servnm)){
|
||||||
|
LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE,
|
||||||
|
"Updated the backend of database '%s' to '%s'.",dbnm,servnm)));
|
||||||
|
}else{
|
||||||
|
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
|
||||||
|
"Error: Failed to insert values into hashtable.")));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
/*Check if the failure was due to a duplicate value*/
|
/*Check if the failure was due to a duplicate value*/
|
||||||
if(hashtable_fetch(hashtable,dbnm) == NULL){
|
if(hashtable_fetch(hashtable,dbnm) == NULL){
|
||||||
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
|
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
|
||||||
@ -457,15 +490,43 @@ void* dbnames_hash_init(BACKEND** backends)
|
|||||||
|
|
||||||
/**Update the new hashtable with the key-value pairs*/
|
/**Update the new hashtable with the key-value pairs*/
|
||||||
if(!update_dbnames_hash(backends,htbl)){
|
if(!update_dbnames_hash(backends,htbl)){
|
||||||
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,"Errors encountered while querying databases.")));
|
//LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,"Errors encountered while querying databases.")));
|
||||||
hashtable_free(htbl);
|
//hashtable_free(htbl);
|
||||||
return NULL;
|
//return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
return htbl;
|
return htbl;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool add_shard_info(GWBUF* buffer, char* target)
|
||||||
|
{
|
||||||
|
HINT* hint = hint_create_route(NULL,HINT_ROUTE_TO_NAMED_SERVER,target);
|
||||||
|
return (bool)gwbuf_add_hint(buffer,hint);
|
||||||
|
}
|
||||||
|
|
||||||
|
char* get_shard_target_name(ROUTER_INSTANCE* router, GWBUF* buffer){
|
||||||
|
HASHTABLE* ht = router->dbnames_hash;
|
||||||
|
int sz = 0,i;
|
||||||
|
char** dbnms = NULL;
|
||||||
|
char* rval = NULL;
|
||||||
|
|
||||||
|
if(!query_is_parsed(buffer)){
|
||||||
|
parse_query(buffer);
|
||||||
|
}
|
||||||
|
|
||||||
|
dbnms = skygw_get_database_names(buffer,&sz);
|
||||||
|
if(sz > 0){
|
||||||
|
|
||||||
|
for(i = 0; i < sz; i++){
|
||||||
|
if((rval = (char*)hashtable_fetch(ht,dbnms[i]))){
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return rval;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Implementation of the mandatory version entry point
|
* Implementation of the mandatory version entry point
|
||||||
*
|
*
|
||||||
@ -730,7 +791,7 @@ createInstance(SERVICE *service, char **options)
|
|||||||
* is used if any.
|
* is used if any.
|
||||||
*/
|
*/
|
||||||
router->dbshard_version = service->svc_config_version;
|
router->dbshard_version = service->svc_config_version;
|
||||||
refreshInstance(router, NULL);
|
//refreshInstance(router, NULL);
|
||||||
/**
|
/**
|
||||||
* Get hashtable which includes dbname,backend pairs
|
* Get hashtable which includes dbname,backend pairs
|
||||||
*/
|
*/
|
||||||
@ -768,7 +829,7 @@ static void* newSession(
|
|||||||
bool succp;
|
bool succp;
|
||||||
int router_nservers = 0; /*< # of servers in total */
|
int router_nservers = 0; /*< # of servers in total */
|
||||||
int i;
|
int i;
|
||||||
const int min_nservers = 1; /*< hard-coded for now */
|
//const int min_nservers = 1; /*< hard-coded for now */
|
||||||
|
|
||||||
client_rses = (ROUTER_CLIENT_SES *)calloc(1, sizeof(ROUTER_CLIENT_SES));
|
client_rses = (ROUTER_CLIENT_SES *)calloc(1, sizeof(ROUTER_CLIENT_SES));
|
||||||
|
|
||||||
@ -816,8 +877,9 @@ static void* newSession(
|
|||||||
* Instead of calling this, ensure that there is at least one
|
* Instead of calling this, ensure that there is at least one
|
||||||
* responding server.
|
* responding server.
|
||||||
*/
|
*/
|
||||||
#if 0
|
|
||||||
router_nservers = router_get_servercount(router);
|
router_nservers = router_get_servercount(router);
|
||||||
|
#if 0
|
||||||
if (!have_enough_servers(&client_rses,
|
if (!have_enough_servers(&client_rses,
|
||||||
min_nservers,
|
min_nservers,
|
||||||
router_nservers,
|
router_nservers,
|
||||||
@ -897,6 +959,7 @@ static void* newSession(
|
|||||||
client_rses->rses_capabilities = RCAP_TYPE_STMT_INPUT;
|
client_rses->rses_capabilities = RCAP_TYPE_STMT_INPUT;
|
||||||
client_rses->rses_backend_ref = backend_ref;
|
client_rses->rses_backend_ref = backend_ref;
|
||||||
client_rses->rses_nbackends = router_nservers; /*< # of backend servers */
|
client_rses->rses_nbackends = router_nservers; /*< # of backend servers */
|
||||||
|
client_rses->rses_master_ref = get_root_master_bref(client_rses);
|
||||||
router->stats.n_sessions += 1;
|
router->stats.n_sessions += 1;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -1236,10 +1299,12 @@ static bool get_dcb(
|
|||||||
(b->backend_server->rlag != MAX_RLAG_NOT_AVAILABLE &&
|
(b->backend_server->rlag != MAX_RLAG_NOT_AVAILABLE &&
|
||||||
b->backend_server->rlag <= max_rlag)))
|
b->backend_server->rlag <= max_rlag)))
|
||||||
{
|
{
|
||||||
|
#if 0
|
||||||
candidate_bref = check_candidate_bref(
|
candidate_bref = check_candidate_bref(
|
||||||
candidate_bref,
|
candidate_bref,
|
||||||
&backend_ref[i],
|
&backend_ref[i],
|
||||||
rses->rses_config.rw_slave_select_criteria);
|
rses->rses_config.rw_slave_select_criteria);
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -1349,6 +1414,7 @@ static route_target_t get_route_target (
|
|||||||
HINT* hint)
|
HINT* hint)
|
||||||
{
|
{
|
||||||
route_target_t target = TARGET_UNDEFINED;
|
route_target_t target = TARGET_UNDEFINED;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* These queries are not affected by hints
|
* These queries are not affected by hints
|
||||||
*/
|
*/
|
||||||
@ -1817,6 +1883,7 @@ static int routeQuery(
|
|||||||
bool succp = false;
|
bool succp = false;
|
||||||
int rlag_max = MAX_RLAG_UNDEFINED;
|
int rlag_max = MAX_RLAG_UNDEFINED;
|
||||||
backend_type_t btype; /*< target backend type */
|
backend_type_t btype; /*< target backend type */
|
||||||
|
char* tname = NULL;
|
||||||
|
|
||||||
CHK_CLIENT_RSES(router_cli_ses);
|
CHK_CLIENT_RSES(router_cli_ses);
|
||||||
|
|
||||||
@ -1829,7 +1896,7 @@ static int routeQuery(
|
|||||||
|
|
||||||
packet = GWBUF_DATA(querybuf);
|
packet = GWBUF_DATA(querybuf);
|
||||||
packet_type = packet[4];
|
packet_type = packet[4];
|
||||||
|
master_dcb = router_cli_ses->rses_master_ref->bref_dcb;
|
||||||
if (rses_is_closed)
|
if (rses_is_closed)
|
||||||
{
|
{
|
||||||
/**
|
/**
|
||||||
@ -1990,10 +2057,23 @@ static int routeQuery(
|
|||||||
* - route primarily according to the hints and if they failed,
|
* - route primarily according to the hints and if they failed,
|
||||||
* eventually to master
|
* eventually to master
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Added for simple sharding, using hints for testing.
|
||||||
|
*/
|
||||||
|
if((tname = get_shard_target_name(inst,querybuf)) != NULL &&
|
||||||
|
add_shard_info(querybuf,tname)){
|
||||||
|
|
||||||
|
route_target = TARGET_NAMED_SERVER;
|
||||||
|
}else{
|
||||||
|
|
||||||
route_target = get_route_target(qtype,
|
route_target = get_route_target(qtype,
|
||||||
router_cli_ses->rses_transaction_active,
|
router_cli_ses->rses_transaction_active,
|
||||||
router_cli_ses->rses_config.rw_use_sql_variables_in,
|
router_cli_ses->rses_config.rw_use_sql_variables_in,
|
||||||
querybuf->hint);
|
querybuf->hint);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
if (TARGET_IS_ALL(route_target))
|
if (TARGET_IS_ALL(route_target))
|
||||||
{
|
{
|
||||||
@ -2745,10 +2825,15 @@ static bool connect_backend_servers(
|
|||||||
ROUTER_INSTANCE* router)
|
ROUTER_INSTANCE* router)
|
||||||
{
|
{
|
||||||
bool succp = true;
|
bool succp = true;
|
||||||
|
//bool is_synced_master;
|
||||||
|
bool master_connected = true;
|
||||||
int servers_found = 0;
|
int servers_found = 0;
|
||||||
int servers_connected = 0;
|
int servers_connected = 0;
|
||||||
int i;
|
int slaves_connected = 0;
|
||||||
|
int i,max_nservers = router_nservers;
|
||||||
|
select_criteria_t select_criteria = LEAST_GLOBAL_CONNECTIONS;
|
||||||
|
|
||||||
|
#if 0
|
||||||
if (router->bitvalue != 0) /*< 'synced' is the only bitvalue in rwsplit */
|
if (router->bitvalue != 0) /*< 'synced' is the only bitvalue in rwsplit */
|
||||||
{
|
{
|
||||||
is_synced_master = true;
|
is_synced_master = true;
|
||||||
@ -2757,6 +2842,7 @@ static bool connect_backend_servers(
|
|||||||
{
|
{
|
||||||
is_synced_master = false;
|
is_synced_master = false;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
#if defined(EXTRA_SS_DEBUG)
|
#if defined(EXTRA_SS_DEBUG)
|
||||||
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "Servers and conns before ordering:")));
|
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "Servers and conns before ordering:")));
|
||||||
@ -2804,7 +2890,7 @@ static bool connect_backend_servers(
|
|||||||
*/
|
*/
|
||||||
for (i=0;
|
for (i=0;
|
||||||
i<router_nservers &&
|
i<router_nservers &&
|
||||||
(server_connected < max_nservers || !master_connected);
|
(servers_connected < max_nservers || !master_connected);
|
||||||
i++)
|
i++)
|
||||||
{
|
{
|
||||||
BACKEND* b = backend_ref[i].bref_backend;
|
BACKEND* b = backend_ref[i].bref_backend;
|
||||||
@ -2944,7 +3030,7 @@ static bool connect_backend_servers(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return_succp:
|
//return_succp:
|
||||||
|
|
||||||
return succp;
|
return succp;
|
||||||
}
|
}
|
||||||
@ -3959,8 +4045,8 @@ static bool handle_error_new_connection(
|
|||||||
{
|
{
|
||||||
SESSION* ses;
|
SESSION* ses;
|
||||||
int router_nservers;
|
int router_nservers;
|
||||||
int max_nslaves;
|
|
||||||
int max_slave_rlag;
|
|
||||||
backend_ref_t* bref;
|
backend_ref_t* bref;
|
||||||
bool succp;
|
bool succp;
|
||||||
|
|
||||||
@ -4096,7 +4182,7 @@ static int router_get_servercount(
|
|||||||
return router_nservers;
|
return router_nservers;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
/**
|
/**
|
||||||
* Find out the number of read backend servers.
|
* Find out the number of read backend servers.
|
||||||
* Depending on the configuration value type, either copy direct count
|
* Depending on the configuration value type, either copy direct count
|
||||||
@ -4124,7 +4210,7 @@ static int rses_get_max_slavecount(
|
|||||||
|
|
||||||
return max_nslaves;
|
return max_nslaves;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
static int rses_get_max_replication_lag(
|
static int rses_get_max_replication_lag(
|
||||||
ROUTER_CLIENT_SES* rses)
|
ROUTER_CLIENT_SES* rses)
|
||||||
|
Reference in New Issue
Block a user