diff --git a/server/core/config.c b/server/core/config.c index 463122138..47d56a818 100644 --- a/server/core/config.c +++ b/server/core/config.c @@ -485,6 +485,16 @@ process_config_context(CONFIG_CONTEXT *context) 1,STRING_TYPE); } + CONFIG_PARAMETER* param; + if((param = config_get_param(obj->parameters, "ignore_databases"))) + { + service_set_param_value(obj->element, param, param->value, 0, STRING_TYPE); + } + + if((param = config_get_param(obj->parameters, "ignore_databases_regex"))) + { + service_set_param_value(obj->element, param, param->value, 0, STRING_TYPE); + } /** flag for rwsplit-specific parameters */ if (strncmp(router, "readwritesplit", strlen("readwritesplit")+1) == 0) { @@ -1746,6 +1756,18 @@ SERVER *server; version_string = config_get_value(obj->parameters, "version_string"); allow_localhost_match_wildcard_host = config_get_value(obj->parameters, "localhost_match_wildcard_host"); + CONFIG_PARAMETER* param; + + if((param = config_get_param(obj->parameters, "ignore_databases"))) + { + service_set_param_value(service, param, param->value, 0, STRING_TYPE); + } + + if((param = config_get_param(obj->parameters, "ignore_databases_regex"))) + { + service_set_param_value(service, param, param->value, 0, STRING_TYPE); + } + if (version_string) { if (service->version_string) { free(service->version_string); @@ -2150,6 +2172,8 @@ static char *service_params[] = "ssl_key", "ssl_version", "ssl_cert_verify_depth", + "ignore_databases", + "ignore_databases_regex", NULL }; diff --git a/server/include/maxconfig.h b/server/include/maxconfig.h index 99816d820..b1a1b1589 100644 --- a/server/include/maxconfig.h +++ b/server/include/maxconfig.h @@ -150,4 +150,5 @@ void config_disable_feedback_task(void); unsigned long config_get_gateway_id(void); GATEWAY_CONF* config_get_global_options(); bool isInternalService(char *router); +char* config_clean_string_list(char* str); #endif diff --git a/server/modules/include/schemarouter.h b/server/modules/include/schemarouter.h index 0dc7cc8c5..cfb4eb825 100644 --- a/server/modules/include/schemarouter.h +++ b/server/modules/include/schemarouter.h @@ -32,7 +32,7 @@ #include #include #include - +#include /** * Bitmask values for the router session's initialization. These values are used * to prevent responses from internal commands being forwarded to the client. @@ -42,8 +42,8 @@ typedef enum init_mask INIT_READY = 0x0, INIT_MAPPING = 0x1, INIT_USE_DB = 0x02, - INIT_UNINT = 0x04 - + INIT_UNINT = 0x04, + INIT_FAILED = 0x08 } init_mask_t; /** @@ -64,6 +64,10 @@ typedef enum bref_state { #define BREF_IS_CLOSED(s) ((s)->bref_state & BREF_CLOSED) #define BREF_IS_MAPPED(s) ((s)->bref_mapped) +#define SCHEMA_ERR_DUPLICATEDB 5000 +#define SCHEMA_ERRSTR_DUPLICATEDB "DUPDB" +#define SCHEMA_ERR_DBNOTFOUND 1049 +#define SCHEMA_ERRSTR_DBNOTFOUND "42000" /** * The type of the backend server */ @@ -316,6 +320,12 @@ typedef struct router_instance { ROUTER_STATS stats; /*< Statistics for this router */ struct router_instance* next; /*< Next router on the list */ bool available_slaves; /*< The router has some slaves available */ + HASHTABLE* ignored_dbs; /*< List of databases to ignore when the + * database mapping finds multiple servers + * with the same database */ + pcre* ignore_regex; /*< Databases matching this regex will + * not cause the session to be terminated + * if they are found on more than one server. */ } ROUTER_INSTANCE; diff --git a/server/modules/routing/schemarouter/schemarouter.c b/server/modules/routing/schemarouter/schemarouter.c index f3ad795e4..43d92e567 100644 --- a/server/modules/routing/schemarouter/schemarouter.c +++ b/server/modules/routing/schemarouter/schemarouter.c @@ -34,6 +34,7 @@ #include #include #include +#include #define DEFAULT_REFRESH_INTERVAL 30.0 @@ -305,86 +306,112 @@ char* get_lenenc_str(void* data) * @param rses Router client session * @param target Target server where the database is * @param buf GWBUF containing the result set - * @return True if the buffer contained a result set with a single column. All other responses return false. + * @return 1 if a complete response was received, 0 if a partial response was received + * and -1 if a database was found on more than one server. */ -bool parse_showdb_response(ROUTER_CLIENT_SES* rses, backend_ref_t* bref, GWBUF** buffer) +int parse_showdb_response(ROUTER_CLIENT_SES* rses, backend_ref_t* bref, GWBUF** buffer) { - unsigned char* ptr; - char* target = bref->bref_backend->backend_server->unique_name; - GWBUF* buf; + unsigned char* ptr; + char* target = bref->bref_backend->backend_server->unique_name; + GWBUF* buf; + bool error = false; + int rval = 0; - if(buffer == NULL || *buffer == NULL) - return false; + if (buffer == NULL || *buffer == NULL) + return -1; - buf = modutil_get_complete_packets(buffer); + buf = modutil_get_complete_packets(buffer); - if(buf == NULL) - return false; - - ptr = (unsigned char*)buf->start; - - if(PTR_IS_ERR(ptr)) + if (buf == NULL) + return 0; + + ptr = (unsigned char*) buf->start; + + if (PTR_IS_ERR(ptr)) { - skygw_log_write(LOGFILE_TRACE,"schemarouter: SHOW DATABASES returned an error."); + skygw_log_write(LOGFILE_TRACE, "schemarouter: SHOW DATABASES returned an error."); gwbuf_free(buf); - return true; + return -1; } - if(bref->n_mapping_eof == 0) - { - /** Skip column definitions */ - while(ptr < (unsigned char*)buf->end && !PTR_IS_EOF(ptr)) - { - ptr += gw_mysql_get_byte3(ptr) + 4; - } + if (bref->n_mapping_eof == 0) + { + /** Skip column definitions */ + while (ptr < (unsigned char*) buf->end && !PTR_IS_EOF(ptr)) + { + ptr += gw_mysql_get_byte3(ptr) + 4; + } - if(ptr >= (unsigned char*)buf->end) - { - skygw_log_write(LOGFILE_TRACE,"schemarouter: Malformed packet for SHOW DATABASES."); - *buffer = gwbuf_append(buf,*buffer); - return false; - } + if (ptr >= (unsigned char*) buf->end) + { + skygw_log_write(LOGFILE_TRACE, "schemarouter: Malformed packet for SHOW DATABASES."); + *buffer = gwbuf_append(buf, *buffer); + return false; + } - atomic_add(&bref->n_mapping_eof,1); - /** Skip first EOF packet */ - ptr += gw_mysql_get_byte3(ptr) + 4; - } - - if(bref->n_mapping_eof == 1) - { - while(ptr < (unsigned char*)buf->end && !PTR_IS_EOF(ptr)) - { - int payloadlen = gw_mysql_get_byte3(ptr); - int packetlen = payloadlen + 4; - char* data = get_lenenc_str(ptr+4); + atomic_add(&bref->n_mapping_eof, 1); + /** Skip first EOF packet */ + ptr += gw_mysql_get_byte3(ptr) + 4; + } - if(data) - { - if(hashtable_add(rses->dbhash,data,target)) - { - skygw_log_write(LOGFILE_TRACE,"schemarouter: <%s, %s>",target,data); - } - free(data); - } - ptr += packetlen; - } - } - if(ptr < (unsigned char*)buf->end && PTR_IS_EOF(ptr) && - bref->n_mapping_eof == 1) - { - atomic_add(&bref->n_mapping_eof,1); - skygw_log_write(LOGFILE_TRACE,"schemarouter: SHOW DATABASES fully received from %s.", - bref->bref_backend->backend_server->unique_name); - } - else - { - skygw_log_write(LOGFILE_TRACE,"schemarouter: SHOW DATABASES partially received from %s.", - bref->bref_backend->backend_server->unique_name); - } + if (bref->n_mapping_eof == 1) + { + while (ptr < (unsigned char*) buf->end && !PTR_IS_EOF(ptr)) + { + int payloadlen = gw_mysql_get_byte3(ptr); + int packetlen = payloadlen + 4; + char* data = get_lenenc_str(ptr + 4); - gwbuf_free(buf); + if (data) + { + if (hashtable_add(rses->dbhash, data, target)) + { + skygw_log_write(LOGFILE_TRACE, "schemarouter: <%s, %s>", target, data); + } + else + { + int ovector[24]; + const int ovec_count = 24; - return bref->n_mapping_eof == 2; + if (!(hashtable_fetch(rses->router->ignored_dbs, data) || + (rses->router->ignore_regex && + pcre_exec(rses->router->ignore_regex, NULL, (const char*) data, + strlen(data), 0, 0, ovector, ovec_count) >= 0))) + { + error = true; + skygw_log_write(LE, "Error: Database '%s' found on servers '%s' and '%s' for user %s@%s.", + data, target, hashtable_fetch(rses->dbhash, data), + rses->rses_client_dcb->user, + rses->rses_client_dcb->remote); + } + } + free(data); + } + ptr += packetlen; + } + } + + if (ptr < (unsigned char*) buf->end && PTR_IS_EOF(ptr) && + bref->n_mapping_eof == 1) + { + atomic_add(&bref->n_mapping_eof, 1); + skygw_log_write(LOGFILE_TRACE, "schemarouter: SHOW DATABASES fully received from %s.", + bref->bref_backend->backend_server->unique_name); + } + else + { + skygw_log_write(LOGFILE_TRACE, "schemarouter: SHOW DATABASES partially received from %s.", + bref->bref_backend->backend_server->unique_name); + } + + gwbuf_free(buf); + + if (error) + rval = -1; + else if (bref->n_mapping_eof == 2) + rval = 1; + + return rval; } /** @@ -718,10 +745,26 @@ createInstance(SERVICE *service, char **options) CONFIG_PARAMETER* conf; int nservers; int i; - + CONFIG_PARAMETER* param; + if ((router = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL) { return NULL; } + if((router->ignored_dbs = hashtable_alloc(100, hashkeyfun, hashcmpfun)) == NULL) + { + skygw_log_write(LE,"Error: Memory allocation failed when allocating schemarouter database ignore list."); + free(router); + return NULL; + } + hashtable_memory_fns(router->ignored_dbs,(HASHMEMORYFN)strdup, + NULL, + (HASHMEMORYFN)free, + NULL); + + /** Add default system databases to ignore */ + hashtable_add(router->ignored_dbs,"mysql",""); + hashtable_add(router->ignored_dbs,"information_schema",""); + hashtable_add(router->ignored_dbs,"performance_schema",""); router->service = service; router->schemarouter_config.max_sescmd_hist = 0; router->schemarouter_config.last_refresh = time(NULL); @@ -733,7 +776,7 @@ createInstance(SERVICE *service, char **options) router->stats.n_sescmd = 0; router->stats.ses_longest = 0; router->stats.ses_shortest = (double)((unsigned long)(~0)); - spinlock_init(&router->lock); + spinlock_init(&router->lock); /** Calculate number of servers */ server = service->dbref; @@ -747,6 +790,36 @@ createInstance(SERVICE *service, char **options) service->users_from_all = true; } + if((param = config_get_param(conf,"ignore_databases_regex"))) + { + const char* errptr; + int erroffset; + pcre* re = pcre_compile(param->value, 0, &errptr, &erroffset, NULL); + + if(re == NULL) + { + skygw_log_write(LE, "Error: Regex compilation failed at %d for regex '%s': %s", + erroffset, param->value, errptr); + hashtable_free(router->ignored_dbs); + free(router); + return NULL; + } + router->ignore_regex = re; + } + + if((param = config_get_param(conf,"ignore_databases"))) + { + char *sptr, *tok, *val = config_clean_string_list(param->value); + + tok = strtok_r(val, ",", &sptr); + + while(tok) + { + hashtable_add(router->ignored_dbs, tok, ""); + tok = strtok_r(NULL, ",", &sptr); + } + } + bool failure = false; for(i=0;options && options[i];i++) @@ -2078,7 +2151,8 @@ static int routeQuery( { sprintf(errbuf + strlen(errbuf)," ([%lu]: DB change failed)",router_cli_ses->rses_client_dcb->session->ses_id); } - GWBUF* error = modutil_create_mysql_err_msg(1, 0, 1049, "42000", errbuf); + GWBUF* error = modutil_create_mysql_err_msg(1, 0, SCHEMA_ERR_DBNOTFOUND, + SCHEMA_ERRSTR_DBNOTFOUND, errbuf); if (error == NULL) { @@ -2564,31 +2638,70 @@ static void clientReply ( for(i = 0; i < router_cli_ses->rses_nbackends; i++) { if(bref->bref_dcb == bkrf[i].bref_dcb && !BREF_IS_MAPPED(&bkrf[i])) + { + if (bref->map_queue) { - if(bref->map_queue) - { - writebuf = gwbuf_append(bref->map_queue,writebuf); - bref->map_queue = NULL; - } - - if(parse_showdb_response(router_cli_ses, - &router_cli_ses->rses_backend_ref[i], - &writebuf)) - { - router_cli_ses->rses_backend_ref[i].bref_mapped = true; - skygw_log_write(LOGFILE_DEBUG,"schemarouter: Received SHOW DATABASES reply from %s for session %p", - router_cli_ses->rses_backend_ref[i].bref_backend->backend_server->unique_name, - router_cli_ses->rses_client_dcb->session); - } - else - { - bref->map_queue = writebuf; - writebuf = NULL; - skygw_log_write(LOGFILE_DEBUG,"schemarouter: Received partial SHOW DATABASES reply from %s for session %p", - router_cli_ses->rses_backend_ref[i].bref_backend->backend_server->unique_name, - router_cli_ses->rses_client_dcb->session); - } + writebuf = gwbuf_append(bref->map_queue, writebuf); + bref->map_queue = NULL; } + int rc = parse_showdb_response(router_cli_ses, + &router_cli_ses->rses_backend_ref[i], + &writebuf); + if (rc == 1) + { + router_cli_ses->rses_backend_ref[i].bref_mapped = true; + skygw_log_write(LOGFILE_DEBUG, "schemarouter: Received SHOW DATABASES reply from %s for session %p", + router_cli_ses->rses_backend_ref[i].bref_backend->backend_server->unique_name, + router_cli_ses->rses_client_dcb->session); + } + else if (rc == 0) + { + bref->map_queue = writebuf; + writebuf = NULL; + skygw_log_write(LOGFILE_DEBUG, "schemarouter: Received partial SHOW DATABASES reply from %s for session %p", + router_cli_ses->rses_backend_ref[i].bref_backend->backend_server->unique_name, + router_cli_ses->rses_client_dcb->session); + } + else + { + while (writebuf && (writebuf = gwbuf_consume(writebuf, gwbuf_length(writebuf)))); + DCB* client_dcb = NULL; + + if((router_cli_ses->init & INIT_FAILED) == 0) + { + skygw_log_write(LE, "Error: Duplicate databases found, closing session."); + client_dcb = router_cli_ses->rses_client_dcb; + + /** This is the first response to the database mapping which + * has duplicate database conflict. Set the initialization bitmask + * to INIT_FAILED */ + router_cli_ses->init |= INIT_FAILED; + + /** Send the client an error about duplicate databases + * if there is a queued query from the client. */ + if (router_cli_ses->queue) + { + GWBUF* error = modutil_create_mysql_err_msg(1, 0, + SCHEMA_ERR_DUPLICATEDB, SCHEMA_ERRSTR_DUPLICATEDB, + "Error: duplicate databases found on two different shards."); + + if (error) + { + client_dcb->func.write(client_dcb, error); + } + else + { + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "Error : Creating buffer for error message failed."))); + } + } + } + rses_end_locked_router_action(router_cli_ses); + if(client_dcb) + dcb_close(client_dcb); + return; + } + } if(BREF_IS_IN_USE(&bkrf[i]) && !BREF_IS_MAPPED(&bkrf[i])) @@ -2632,7 +2745,8 @@ static void clientReply ( { sprintf(errmsg + strlen(errmsg)," ([%lu]: DB not found on connect)",router_cli_ses->rses_client_dcb->session->ses_id); } - GWBUF* errbuff = modutil_create_mysql_err_msg(1,0,1049,"42000",errmsg); + GWBUF* errbuff = modutil_create_mysql_err_msg(1, 0, SCHEMA_ERR_DBNOTFOUND, + SCHEMA_ERRSTR_DBNOTFOUND, errmsg); router_cli_ses->rses_client_dcb->func.write(router_cli_ses->rses_client_dcb,errbuff); if(router_cli_ses->queue)