Conflicts:
	server/modules/routing/dbshard/dbshard.c
This commit is contained in:
VilhoRaatikka
2014-12-15 15:24:56 +02:00
3 changed files with 415 additions and 69 deletions

View File

@ -450,6 +450,58 @@ int error_count = 0;
param->value)));
}
}
if(is_dbshard)
{
CONFIG_PARAMETER* param = NULL;
char* ignore_databases;
bool succp;
ignore_databases =
config_get_value(obj->parameters,
"ignore_databases");
if (ignore_databases != NULL)
{
param = config_get_param(
obj->parameters,
"ignore_databases");
}
if (param == NULL)
{
succp = false;
}
else
{
param->qfd.valstr = strdup(param->value);
param->qfd_param_type = STRING_TYPE;
succp = service_set_param_value(obj->element,
param,
ignore_databases,
COUNT_NONE,
STRING_TYPE);
}
if (!succp)
{
if(param){
LOGIF(LM, (skygw_log_write(
LOGFILE_MESSAGE,
"* Warning : invalid value type "
"for parameter \'%s.%s = %s\'\n\tExpected "
"type is [master|all] for "
"use sql variables in.",
((SERVICE*)obj->element)->name,
param->name,
param->value)));
}else{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"Error : parameter was NULL")));
}
}
}
/** Parameters for rwsplit router only */
if (is_rwsplit)
{
@ -1685,6 +1737,7 @@ static char *service_params[] =
"max_slave_connections",
"max_slave_replication_lag",
"use_sql_variables_in", /*< rwsplit only */
"ignore_databases",
"version_string",
"filters",
"weightby",

View File

@ -80,12 +80,14 @@ typedef enum {
TARGET_SLAVE = 0x02,
TARGET_NAMED_SERVER = 0x04,
TARGET_ALL = 0x08,
TARGET_RLAG_MAX = 0x10
TARGET_RLAG_MAX = 0x10,
TARGET_ANY = 0x20
} route_target_t;
#define TARGET_IS_UNDEFINED(t) (t == TARGET_UNDEFINED)
#define TARGET_IS_NAMED_SERVER(t) (t & TARGET_NAMED_SERVER)
#define TARGET_IS_ALL(t) (t & TARGET_ALL)
#define TARGET_IS_ANY(t) (t & TARGET_ANY)
typedef struct rses_property_st rses_property_t;
typedef struct router_client_session ROUTER_CLIENT_SES;
@ -305,7 +307,7 @@ typedef struct router_instance {
#define BACKEND_TYPE(b) (SERVER_IS_MASTER((b)->backend_server) ? BE_MASTER : \
(SERVER_IS_SLAVE((b)->backend_server) ? BE_SLAVE : BE_UNDEFINED));
void* dbnames_hash_init(BACKEND** backends);
bool update_dbnames_hash(BACKEND** backends, HASHTABLE* hashtable);
void* dbnames_hash_init(ROUTER_INSTANCE* inst,BACKEND** backends);
bool update_dbnames_hash(ROUTER_INSTANCE* inst,BACKEND** backends, HASHTABLE* hashtable);
#endif /*< _DBSHARDROUTER_H */

View File

@ -152,6 +152,9 @@ static bool get_shard_dcb(
DCB** dcb,
ROUTER_CLIENT_SES* rses,
char* name);
bool is_ignored_database(ROUTER_INSTANCE* inst, char* str);
#if 0
static void rwsplit_process_router_options(
ROUTER_INSTANCE* router,
@ -317,12 +320,13 @@ static void* hfree(void* fval)
/**
* Updates the hashtable with the database names and where to find them, adding
* new and removing obsolete pairs.
* @param inst Router instance
* @param backends Backends to query for database names
* @param hashtable Hashtable to use
* @return True if all database and server names were successfully retrieved
* otherwise false
*/
bool update_dbnames_hash(BACKEND** backends, HASHTABLE* hashtable)
bool update_dbnames_hash(ROUTER_INSTANCE* inst,BACKEND** backends, HASHTABLE* hashtable)
{
const unsigned int connect_timeout = 15;
const unsigned int read_timeout = 10;
@ -434,20 +438,15 @@ bool update_dbnames_hash(BACKEND** backends, HASHTABLE* hashtable)
lengths = mysql_fetch_lengths(result);
/**
* Default databases to ignore, disable for now
*/
#ifdef NO_DEFAULT_DATABASES
if(strncmp(row[0],"information_schema",lengths[0]) == 0 ||
strncmp(row[0],"performance_schema",lengths[0]) == 0 ||
strncmp(row[0],"mysql",lengths[0]) == 0 ||
strncmp(row[0],"mysqlslap",lengths[0]) == 0 ||
strncmp(row[0],"test",lengths[0]) == 0){
continue;
}
#endif
dbnm = (char*)calloc(lengths[0] + 1,sizeof(char));
memcpy(dbnm,row[0],lengths[0]);
if(is_ignored_database(inst,dbnm))
{
free(dbnm);
continue;
}
servnm = strdup(server->unique_name);
if(hashtable_add(hashtable,dbnm,servnm) == 0)
@ -462,15 +461,16 @@ bool update_dbnames_hash(BACKEND** backends, HASHTABLE* hashtable)
}
else
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Warning : conflicting "
"databases found. "
"Both \"%s\" and \"%s\" "
"have a database \"%s\".",
server->unique_name,
srvname,
dbnm)));
if(strcmp(srvname,server->unique_name) != 0)
{
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"Both \"%s\" and \"%s\" "
"have a database \"%s\".",
srvname,
server->unique_name,
dbnm)));
}
}
if(srvname)
@ -529,14 +529,37 @@ cleanup:
return rval;
}
/**
* Check if the database is in the ignore list of the router instance
* @param inst Router instance
* @param str Null-terminated string with the database name to check
* @return True if the database is in the ignore list and false if it is not in it
*/
bool is_ignored_database(ROUTER_INSTANCE* inst, char* str)
{
if(inst->ignore_list)
{
int i;
for(i = 0;inst->ignore_list[i];i++)
{
if(strcmp(inst->ignore_list[i],str) == 0)
{
return true;
}
}
}
return false;
}
/**
* Allocates a new hashtable and inserts database names and where to find them
* into it.
* @param inst Router instance
* @param backends Backends to query for database names
* @return Pointer to the newly allocated hashtable or NULL if an error occurred
*/
void* dbnames_hash_init(BACKEND** backends)
void* dbnames_hash_init(ROUTER_INSTANCE* inst,BACKEND** backends)
{
HASHTABLE* htbl = hashtable_alloc(512,hashkeyfun,hashcmpfun);
@ -548,13 +571,8 @@ void* dbnames_hash_init(BACKEND** backends)
return NULL;
}
/**Update the new hashtable with the key-value pairs*/
if(!update_dbnames_hash(backends,htbl))
if(!update_dbnames_hash(inst,backends,htbl))
{
/*
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,"Errors encountered while querying databases.")));
hashtable_free(htbl);
return NULL;
*/
hashtable_free(htbl);
htbl = NULL;
}
@ -568,12 +586,11 @@ void* dbnames_hash_init(BACKEND** backends)
* @param buffer Query to inspect
* @return Name of the backend or NULL if the query contains no known databases.
*/
char* get_shard_target_name(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client, GWBUF* buffer){
char* get_shard_target_name(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client, GWBUF* buffer,skygw_query_type_t qtype){
HASHTABLE* ht = router->dbnames_hash;
int sz = 0,i,j;
char** dbnms = NULL;
char* rval = NULL;
bool has_dbs = false; /**If the query targets any database other than the current one*/
if(!query_is_parsed(buffer)){
@ -600,7 +617,8 @@ char* get_shard_target_name(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client,
* check if the session has an active database and if it is sharded.
*/
if(rval == NULL && !has_dbs && client->rses_mysql_session->db[0] != '\0'){
if(QUERY_IS_TYPE(qtype, QUERY_TYPE_SHOW_TABLES) ||
(rval == NULL && !has_dbs && client->rses_mysql_session->db[0] != '\0')){
rval = (char*)hashtable_fetch(ht,client->rses_mysql_session->db);
}
@ -629,7 +647,7 @@ bool check_shard_status(ROUTER_INSTANCE* router, char* shard)
}
else
{
update_dbnames_hash(router->servers,router->dbnames_hash);
update_dbnames_hash(router,router->servers,router->dbnames_hash);
}
break;
}
@ -637,6 +655,43 @@ bool check_shard_status(ROUTER_INSTANCE* router, char* shard)
return rval;
}
char** tokenize_string(char* str)
{
char *tok;
char **list = NULL;
int sz = 2, count = 0;
tok = strtok(str,", ");
if(tok == NULL)
return NULL;
list = (char**)malloc(sizeof(char*)*(sz));
while(tok)
{
if(count + 1 >= sz)
{
char** tmp = realloc(list,sizeof(char*)*(sz*2));
if(tmp == NULL)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : realloc returned NULL: %s.",strerror(errno))));
free(list);
return NULL;
}
list = tmp;
sz *= 2;
}
list[count] = strdup(tok);
count++;
tok = strtok(NULL,", ");
}
list[count] = NULL;
return list;
}
/**
* Implementation of the mandatory version entry point
*
@ -700,7 +755,8 @@ static void refreshInstance(
/** Catch unused parameter types */
ss_dassert(paramtype == COUNT_TYPE ||
paramtype == PERCENT_TYPE ||
paramtype == SQLVAR_TARGET_TYPE);
paramtype == SQLVAR_TARGET_TYPE ||
paramtype == STRING_TYPE);
if (paramtype == COUNT_TYPE)
{
@ -708,6 +764,16 @@ static void refreshInstance(
else if (paramtype == PERCENT_TYPE)
{
}
else if (paramtype == STRING_TYPE)
{
if (strncmp(param->name,
"ignore_databases",
MAX_PARAM_LEN) == 0)
{
router->ignore_list = tokenize_string(param->qfd.valstr);
}
}
else if (paramtype == SQLVAR_TARGET_TYPE)
{
if (strncmp(param->name,
@ -792,8 +858,7 @@ createInstance(SERVICE *service, char **options)
{
ROUTER_INSTANCE* router;
SERVER* server;
//CONFIG_PARAMETER* conf;
//char* confval = NULL;
CONFIG_PARAMETER* conf;
int nservers;
int i;
@ -862,21 +927,12 @@ createInstance(SERVICE *service, char **options)
router->bitvalue = 0;
/* Ignored for now
conf = config_get_param(service->svc_config_param,"ignore_databases");
if(conf)
{
confval = conf->value;
}
if(!parse_db_ignore_list(router,confval))
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Parsing of database ignore list failed. ")));
goto clean_up;
}
*/
refreshInstance(router, conf);
}
/**
* Read config version number from service to inform what configuration
@ -888,7 +944,7 @@ createInstance(SERVICE *service, char **options)
/**
* Get hashtable which includes dbname,backend pairs
*/
router->dbnames_hash = (HASHTABLE*)dbnames_hash_init(router->servers);
router->dbnames_hash = (HASHTABLE*)dbnames_hash_init(router,router->servers);
if (router->dbnames_hash == NULL)
{
@ -1383,12 +1439,17 @@ static route_target_t get_shard_route_target (
QUERY_IS_TYPE(qtype, QUERY_TYPE_GSYSVAR_WRITE)) ||
/** enable or disable autocommit are always routed to all */
QUERY_IS_TYPE(qtype, QUERY_TYPE_ENABLE_AUTOCOMMIT) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_DISABLE_AUTOCOMMIT) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_SYSVAR_READ)) /** added for @@version comment, temporary*/
QUERY_IS_TYPE(qtype, QUERY_TYPE_DISABLE_AUTOCOMMIT))
{
/** hints don't affect on routing */
target = TARGET_ALL;
}
else if(QUERY_IS_TYPE(qtype, QUERY_TYPE_SYSVAR_READ) ||
(use_sql_variables_in == TYPE_ALL &&
QUERY_IS_TYPE(qtype, QUERY_TYPE_GSYSVAR_READ)))
{
target = TARGET_ANY;
}
#if defined(SS_DEBUG)
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
@ -1647,6 +1708,137 @@ void check_create_tmp_table(
}
}
GWBUF* gen_show_dbs_response(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client)
{
GWBUF* rval = NULL;
HASHTABLE* ht = router->dbnames_hash;
HASHITERATOR* iter = hashtable_iterator(ht);
unsigned int coldef_len = 0;
char dbname[MYSQL_DATABASE_MAXLEN+1];
char *value;
unsigned char* ptr;
char catalog[4] = {0x03,'d','e','f'};
const char* schema = "information_schema";
const char* table = "SCHEMATA";
const char* org_table = "SCHEMATA";
const char* name = "Database";
const char* org_name = "SCHEMA_NAME";
char next_length = 0x0c;
char charset[2] = {0x21, 0x00};
char column_length[4] = { MYSQL_DATABASE_MAXLEN,
MYSQL_DATABASE_MAXLEN >> 8,
MYSQL_DATABASE_MAXLEN >> 16,
MYSQL_DATABASE_MAXLEN >> 24 };
char column_type = 0xfd;
char eof[9] = { 0x05,0x00,0x00,
0x03,0xfe,0x00,
0x00,0x22,0x00 };
char ok_packet[11] = { 0x07,0x00,0x00,0x00,
0x00,0x00,0x00,
0x00,0x00,
0x00,0x00 };
coldef_len = sizeof(catalog) + strlen(schema) + 1 +
strlen(table) + 1 +
strlen(org_table) + 1 +
strlen(name) + 1 +
strlen(org_name) + 1 +
1 + 2 + 4 + 1 + 2 + 1 + 2;
rval = gwbuf_alloc(5 + 4 + coldef_len + sizeof(eof));
ptr = rval->start;
/**First packet*/
*ptr++ = 0x01;
*ptr++ = 0x00;
*ptr++ = 0x00;
*ptr++ = 0x01;
*ptr++ = 0x01;
/**Second packet containing the column definitions*/
*ptr++ = coldef_len;
*ptr++ = coldef_len >> 8;
*ptr++ = coldef_len >> 16;
*ptr++ = 0x02;
memcpy((void*)ptr,catalog,4);
ptr += 4;
*ptr++ = strlen(schema);
memcpy((void*)ptr,schema,strlen(schema));
ptr += strlen(schema);
*ptr++ = strlen(table);
memcpy((void*)ptr,table,strlen(table));
ptr += strlen(table);
*ptr++ = strlen(org_table);
memcpy((void*)ptr,org_table,strlen(org_table));
ptr += strlen(org_table);
*ptr++ = strlen(name);
memcpy((void*)ptr,name,strlen(name));
ptr += strlen(name);
*ptr++ = strlen(org_name);
memcpy((void*)ptr,org_name,strlen(org_name));
ptr += strlen(org_name);
*ptr++ = next_length;
*ptr++ = charset[0];
*ptr++ = charset[1];
*ptr++ = column_length[0];
*ptr++ = column_length[1];
*ptr++ = column_length[2];
*ptr++ = column_length[3];
*ptr++ = column_type;
*ptr++ = 0x01;
memset(ptr,0,4);
ptr += 4;
memcpy(ptr,eof,sizeof(eof));
unsigned int packet_num = 4;
while((value = (char*)hashtable_next(iter)))
{
GWBUF* temp;
int plen = strlen(value) + 1;
sprintf(dbname,"%s",value);
temp = gwbuf_alloc(plen + 4);
ptr = temp->start;
*ptr++ = plen;
*ptr++ = plen >> 8;
*ptr++ = plen >> 16;
*ptr++ = packet_num++;
*ptr++ = plen - 1;
memcpy(ptr,dbname,plen - 1);
/** Append the row*/
rval = gwbuf_append(rval,temp);
}
eof[3] = packet_num;
GWBUF* last_packet = gwbuf_alloc(sizeof(eof));
memcpy(last_packet->start,eof,sizeof(eof));
rval = gwbuf_append(rval,last_packet);
rval = gwbuf_make_contiguous(rval);
return rval;
}
/**
* The main routing entry, this is called with every packet that is
* received and has to be forwarded to the backend database.
@ -1683,6 +1875,7 @@ static int routeQuery(
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
bool rses_is_closed = false;
bool change_successful = false;
route_target_t route_target = TARGET_UNDEFINED;
bool succp = false;
char* tname = NULL;
@ -1775,7 +1968,7 @@ static int routeQuery(
if (packet_type == MYSQL_COM_INIT_DB)
{
if (!change_current_db(inst, router_cli_ses, querybuf))
if (!(change_successful = change_current_db(inst, router_cli_ses, querybuf)))
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
@ -1785,7 +1978,7 @@ static int routeQuery(
/* goto retblock; */
}
/**
/**
* !!! Temporary tablen tutkiminen voi olla turhaa. Poista tarvittaessa.
*/
/**
@ -1865,6 +2058,42 @@ static int routeQuery(
* Find out whether the query should be routed to single server or to
* all of them.
*/
if(QUERY_IS_TYPE(qtype, QUERY_TYPE_SHOW_DATABASES))
{
/**
* Generate custom response that contains all the databases
* after updating the hashtable
*/
backend_ref_t* backend = NULL;
DCB* backend_dcb = NULL;
int i;
update_dbnames_hash(inst,inst->servers,inst->dbnames_hash);
for(i = 0;i < router_cli_ses->rses_nbackends;i++)
{
if(SERVER_IS_RUNNING(router_cli_ses->rses_backend_ref[i].bref_backend->backend_server))
{
backend = &router_cli_ses->rses_backend_ref[i];
backend_dcb = backend->bref_dcb;
break;
}
}
if(backend)
{
GWBUF* fake = gen_show_dbs_response(inst,router_cli_ses);
poll_add_epollin_event_to_dcb(backend_dcb,fake);
ret = 1;
}
else
{
ret = 0;
}
goto retblock;
}
if (packet_type == MYSQL_COM_INIT_DB)
{
char dbname[MYSQL_DATABASE_MAXLEN+1];
@ -1877,7 +2106,7 @@ static int routeQuery(
route_target = TARGET_NAMED_SERVER;
}
}
else if((tname = get_shard_target_name(inst,router_cli_ses,querybuf)) != NULL)
else if((tname = get_shard_target_name(inst,router_cli_ses,querybuf,qtype)) != NULL)
{
bool shard_ok = check_shard_status(inst,tname);
@ -1894,7 +2123,9 @@ static int routeQuery(
* the target is undefined and an error will be returned to the client.
*/
if((tname = get_shard_target_name(inst,router_cli_ses,querybuf)) != NULL &&
update_dbnames_hash(inst,inst->servers,inst->dbnames_hash);
if((tname = get_shard_target_name(inst,router_cli_ses,querybuf,qtype)) != NULL &&
check_shard_status(inst,tname))
{
route_target = TARGET_NAMED_SERVER;
@ -1904,6 +2135,11 @@ static int routeQuery(
else
{
/**
* The query targets something else than a shard.
*/
route_target = get_shard_route_target(qtype,
router_cli_ses->rses_transaction_active,
router_cli_ses->rses_config.rw_use_sql_variables_in,
@ -1919,16 +2155,21 @@ static int routeQuery(
char errstr[2048];
GWBUF *errbuff;
update_dbnames_hash(inst->servers,inst->dbnames_hash);
tname = get_shard_target_name(inst,router_cli_ses,querybuf);
update_dbnames_hash(inst,inst->servers,inst->dbnames_hash);
tname = get_shard_target_name(inst,router_cli_ses,querybuf,qtype);
if(tname == NULL &&
router_cli_ses->rses_mysql_session->db[0] == '\0')
if((tname == NULL &&
router_cli_ses->rses_mysql_session->db[0] == '\0') ||
(packet_type == MYSQL_COM_INIT_DB && change_successful) ||
packet_type == MYSQL_COM_FIELD_LIST ||
(router_cli_ses->rses_mysql_session->db[0] != '\0' &&
is_ignored_database(inst,router_cli_ses->rses_mysql_session->db)))
{
/**
* No current database or databases in query, route to all.
* No current database and no databases in query or
* the database is ignored, route to first available backend.
*/
route_target = TARGET_ALL;
route_target = TARGET_ANY;
}
else
@ -1979,6 +2220,32 @@ static int routeQuery(
ret = 0;
goto retblock;
}
if (TARGET_IS_ANY(route_target))
{
int z;
for(z = 0;inst->servers[z];z++)
{
if(SERVER_IS_RUNNING(inst->servers[z]->backend_server))
{
tname = inst->servers[z]->backend_server->unique_name;
route_target = TARGET_NAMED_SERVER;
break;
}
}
if(TARGET_IS_ANY(route_target))
{
/**No valid backends alive*/
rses_end_locked_router_action(router_cli_ses);
ret = 0;
goto retblock;
}
}
/**
* Query is routed to one of the backends
*/
@ -4072,6 +4339,8 @@ static bool prep_stmt_drop(
}
#endif /*< PREP_STMT_CACHING */
#if 0
/********************************
* This routine returns the root master server from MySQL replication tree
* Get the root Master rule:
@ -4116,7 +4385,10 @@ static BACKEND *get_root_master(
return master_host;
}
#endif
#if 0
/********************************
* This routine returns the root master server from MySQL replication tree
* Get the root Master rule:
@ -4167,6 +4439,7 @@ static backend_ref_t* get_root_master_bref(
return candidate_bref;
}
#endif
static void dbshard_process_router_options(
ROUTER_INSTANCE* router,
@ -4239,27 +4512,45 @@ static bool change_current_db(
{
bool succp;
uint8_t* packet;
int message_len;
unsigned int plen;
int message_len,i;
char* fail_str;
if(GWBUF_LENGTH(buf) <= MYSQL_DATABASE_MAXLEN - 5)
{
packet = GWBUF_DATA(buf);
plen = gw_mysql_get_byte3(packet) - 1;
/** Copy database name from MySQL packet to session */
strncpy(rses->rses_mysql_session->db,
(char*)(packet + 5),
(int)(GWBUF_LENGTH(buf) - 5));
memcpy(rses->rses_mysql_session->db,
packet + 5,
plen);
memset(rses->rses_mysql_session->db + plen,0,1);
/**
* Update the session's active database only if it's in the hashtable.
* If it isn't found, send a custom error packet to the client.
*/
update_dbnames_hash(inst->servers,inst->dbnames_hash);
update_dbnames_hash(inst,inst->servers,inst->dbnames_hash);
if(hashtable_fetch(
inst->dbnames_hash,
(char*)rses->rses_mysql_session->db) == NULL)
{
if(inst->ignore_list)
{
for(i = 0;inst->ignore_list[i];i++)
{
if(strcmp(inst->ignore_list[i],rses->rses_mysql_session->db) == 0)
{
succp = true;
goto retblock;
}
}
}
/** Create error message */
message_len = 25 + MYSQL_DATABASE_MAXLEN;
fail_str = calloc(1, message_len+1);