Same database on more than one server now cause the schemarouter session to be closed.

This commit is contained in:
Markus Makela
2015-09-16 18:37:01 +03:00
parent 56ed36ee76
commit bff6db96a1
4 changed files with 245 additions and 96 deletions

View File

@ -34,6 +34,7 @@
#include <modinfo.h>
#include <modutil.h>
#include <mysql_client_server_protocol.h>
#include <pcre.h>
#define DEFAULT_REFRESH_INTERVAL 30.0
@ -305,86 +306,112 @@ char* get_lenenc_str(void* data)
* @param rses Router client session
* @param target Target server where the database is
* @param buf GWBUF containing the result set
* @return True if the buffer contained a result set with a single column. All other responses return false.
* @return 1 if a complete response was received, 0 if a partial response was received
* and -1 if a database was found on more than one server.
*/
bool parse_showdb_response(ROUTER_CLIENT_SES* rses, backend_ref_t* bref, GWBUF** buffer)
int parse_showdb_response(ROUTER_CLIENT_SES* rses, backend_ref_t* bref, GWBUF** buffer)
{
unsigned char* ptr;
char* target = bref->bref_backend->backend_server->unique_name;
GWBUF* buf;
unsigned char* ptr;
char* target = bref->bref_backend->backend_server->unique_name;
GWBUF* buf;
bool error = false;
int rval = 0;
if(buffer == NULL || *buffer == NULL)
return false;
if (buffer == NULL || *buffer == NULL)
return -1;
buf = modutil_get_complete_packets(buffer);
buf = modutil_get_complete_packets(buffer);
if(buf == NULL)
return false;
ptr = (unsigned char*)buf->start;
if(PTR_IS_ERR(ptr))
if (buf == NULL)
return 0;
ptr = (unsigned char*) buf->start;
if (PTR_IS_ERR(ptr))
{
skygw_log_write(LOGFILE_TRACE,"schemarouter: SHOW DATABASES returned an error.");
skygw_log_write(LOGFILE_TRACE, "schemarouter: SHOW DATABASES returned an error.");
gwbuf_free(buf);
return true;
return -1;
}
if(bref->n_mapping_eof == 0)
{
/** Skip column definitions */
while(ptr < (unsigned char*)buf->end && !PTR_IS_EOF(ptr))
{
ptr += gw_mysql_get_byte3(ptr) + 4;
}
if (bref->n_mapping_eof == 0)
{
/** Skip column definitions */
while (ptr < (unsigned char*) buf->end && !PTR_IS_EOF(ptr))
{
ptr += gw_mysql_get_byte3(ptr) + 4;
}
if(ptr >= (unsigned char*)buf->end)
{
skygw_log_write(LOGFILE_TRACE,"schemarouter: Malformed packet for SHOW DATABASES.");
*buffer = gwbuf_append(buf,*buffer);
return false;
}
if (ptr >= (unsigned char*) buf->end)
{
skygw_log_write(LOGFILE_TRACE, "schemarouter: Malformed packet for SHOW DATABASES.");
*buffer = gwbuf_append(buf, *buffer);
return false;
}
atomic_add(&bref->n_mapping_eof,1);
/** Skip first EOF packet */
ptr += gw_mysql_get_byte3(ptr) + 4;
}
if(bref->n_mapping_eof == 1)
{
while(ptr < (unsigned char*)buf->end && !PTR_IS_EOF(ptr))
{
int payloadlen = gw_mysql_get_byte3(ptr);
int packetlen = payloadlen + 4;
char* data = get_lenenc_str(ptr+4);
atomic_add(&bref->n_mapping_eof, 1);
/** Skip first EOF packet */
ptr += gw_mysql_get_byte3(ptr) + 4;
}
if(data)
{
if(hashtable_add(rses->dbhash,data,target))
{
skygw_log_write(LOGFILE_TRACE,"schemarouter: <%s, %s>",target,data);
}
free(data);
}
ptr += packetlen;
}
}
if(ptr < (unsigned char*)buf->end && PTR_IS_EOF(ptr) &&
bref->n_mapping_eof == 1)
{
atomic_add(&bref->n_mapping_eof,1);
skygw_log_write(LOGFILE_TRACE,"schemarouter: SHOW DATABASES fully received from %s.",
bref->bref_backend->backend_server->unique_name);
}
else
{
skygw_log_write(LOGFILE_TRACE,"schemarouter: SHOW DATABASES partially received from %s.",
bref->bref_backend->backend_server->unique_name);
}
if (bref->n_mapping_eof == 1)
{
while (ptr < (unsigned char*) buf->end && !PTR_IS_EOF(ptr))
{
int payloadlen = gw_mysql_get_byte3(ptr);
int packetlen = payloadlen + 4;
char* data = get_lenenc_str(ptr + 4);
gwbuf_free(buf);
if (data)
{
if (hashtable_add(rses->dbhash, data, target))
{
skygw_log_write(LOGFILE_TRACE, "schemarouter: <%s, %s>", target, data);
}
else
{
int ovector[24];
const int ovec_count = 24;
return bref->n_mapping_eof == 2;
if (!(hashtable_fetch(rses->router->ignored_dbs, data) ||
(rses->router->ignore_regex &&
pcre_exec(rses->router->ignore_regex, NULL, (const char*) data,
strlen(data), 0, 0, ovector, ovec_count) >= 0)))
{
error = true;
skygw_log_write(LE, "Error: Database '%s' found on servers '%s' and '%s' for user %s@%s.",
data, target, hashtable_fetch(rses->dbhash, data),
rses->rses_client_dcb->user,
rses->rses_client_dcb->remote);
}
}
free(data);
}
ptr += packetlen;
}
}
if (ptr < (unsigned char*) buf->end && PTR_IS_EOF(ptr) &&
bref->n_mapping_eof == 1)
{
atomic_add(&bref->n_mapping_eof, 1);
skygw_log_write(LOGFILE_TRACE, "schemarouter: SHOW DATABASES fully received from %s.",
bref->bref_backend->backend_server->unique_name);
}
else
{
skygw_log_write(LOGFILE_TRACE, "schemarouter: SHOW DATABASES partially received from %s.",
bref->bref_backend->backend_server->unique_name);
}
gwbuf_free(buf);
if (error)
rval = -1;
else if (bref->n_mapping_eof == 2)
rval = 1;
return rval;
}
/**
@ -718,10 +745,26 @@ createInstance(SERVICE *service, char **options)
CONFIG_PARAMETER* conf;
int nservers;
int i;
CONFIG_PARAMETER* param;
if ((router = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL) {
return NULL;
}
if((router->ignored_dbs = hashtable_alloc(100, hashkeyfun, hashcmpfun)) == NULL)
{
skygw_log_write(LE,"Error: Memory allocation failed when allocating schemarouter database ignore list.");
free(router);
return NULL;
}
hashtable_memory_fns(router->ignored_dbs,(HASHMEMORYFN)strdup,
NULL,
(HASHMEMORYFN)free,
NULL);
/** Add default system databases to ignore */
hashtable_add(router->ignored_dbs,"mysql","");
hashtable_add(router->ignored_dbs,"information_schema","");
hashtable_add(router->ignored_dbs,"performance_schema","");
router->service = service;
router->schemarouter_config.max_sescmd_hist = 0;
router->schemarouter_config.last_refresh = time(NULL);
@ -733,7 +776,7 @@ createInstance(SERVICE *service, char **options)
router->stats.n_sescmd = 0;
router->stats.ses_longest = 0;
router->stats.ses_shortest = (double)((unsigned long)(~0));
spinlock_init(&router->lock);
spinlock_init(&router->lock);
/** Calculate number of servers */
server = service->dbref;
@ -747,6 +790,36 @@ createInstance(SERVICE *service, char **options)
service->users_from_all = true;
}
if((param = config_get_param(conf,"ignore_databases_regex")))
{
const char* errptr;
int erroffset;
pcre* re = pcre_compile(param->value, 0, &errptr, &erroffset, NULL);
if(re == NULL)
{
skygw_log_write(LE, "Error: Regex compilation failed at %d for regex '%s': %s",
erroffset, param->value, errptr);
hashtable_free(router->ignored_dbs);
free(router);
return NULL;
}
router->ignore_regex = re;
}
if((param = config_get_param(conf,"ignore_databases")))
{
char *sptr, *tok, *val = config_clean_string_list(param->value);
tok = strtok_r(val, ",", &sptr);
while(tok)
{
hashtable_add(router->ignored_dbs, tok, "");
tok = strtok_r(NULL, ",", &sptr);
}
}
bool failure = false;
for(i=0;options && options[i];i++)
@ -2078,7 +2151,8 @@ static int routeQuery(
{
sprintf(errbuf + strlen(errbuf)," ([%lu]: DB change failed)",router_cli_ses->rses_client_dcb->session->ses_id);
}
GWBUF* error = modutil_create_mysql_err_msg(1, 0, 1049, "42000", errbuf);
GWBUF* error = modutil_create_mysql_err_msg(1, 0, SCHEMA_ERR_DBNOTFOUND,
SCHEMA_ERRSTR_DBNOTFOUND, errbuf);
if (error == NULL)
{
@ -2564,31 +2638,70 @@ static void clientReply (
for(i = 0; i < router_cli_ses->rses_nbackends; i++)
{
if(bref->bref_dcb == bkrf[i].bref_dcb && !BREF_IS_MAPPED(&bkrf[i]))
{
if (bref->map_queue)
{
if(bref->map_queue)
{
writebuf = gwbuf_append(bref->map_queue,writebuf);
bref->map_queue = NULL;
}
if(parse_showdb_response(router_cli_ses,
&router_cli_ses->rses_backend_ref[i],
&writebuf))
{
router_cli_ses->rses_backend_ref[i].bref_mapped = true;
skygw_log_write(LOGFILE_DEBUG,"schemarouter: Received SHOW DATABASES reply from %s for session %p",
router_cli_ses->rses_backend_ref[i].bref_backend->backend_server->unique_name,
router_cli_ses->rses_client_dcb->session);
}
else
{
bref->map_queue = writebuf;
writebuf = NULL;
skygw_log_write(LOGFILE_DEBUG,"schemarouter: Received partial SHOW DATABASES reply from %s for session %p",
router_cli_ses->rses_backend_ref[i].bref_backend->backend_server->unique_name,
router_cli_ses->rses_client_dcb->session);
}
writebuf = gwbuf_append(bref->map_queue, writebuf);
bref->map_queue = NULL;
}
int rc = parse_showdb_response(router_cli_ses,
&router_cli_ses->rses_backend_ref[i],
&writebuf);
if (rc == 1)
{
router_cli_ses->rses_backend_ref[i].bref_mapped = true;
skygw_log_write(LOGFILE_DEBUG, "schemarouter: Received SHOW DATABASES reply from %s for session %p",
router_cli_ses->rses_backend_ref[i].bref_backend->backend_server->unique_name,
router_cli_ses->rses_client_dcb->session);
}
else if (rc == 0)
{
bref->map_queue = writebuf;
writebuf = NULL;
skygw_log_write(LOGFILE_DEBUG, "schemarouter: Received partial SHOW DATABASES reply from %s for session %p",
router_cli_ses->rses_backend_ref[i].bref_backend->backend_server->unique_name,
router_cli_ses->rses_client_dcb->session);
}
else
{
while (writebuf && (writebuf = gwbuf_consume(writebuf, gwbuf_length(writebuf))));
DCB* client_dcb = NULL;
if((router_cli_ses->init & INIT_FAILED) == 0)
{
skygw_log_write(LE, "Error: Duplicate databases found, closing session.");
client_dcb = router_cli_ses->rses_client_dcb;
/** This is the first response to the database mapping which
* has duplicate database conflict. Set the initialization bitmask
* to INIT_FAILED */
router_cli_ses->init |= INIT_FAILED;
/** Send the client an error about duplicate databases
* if there is a queued query from the client. */
if (router_cli_ses->queue)
{
GWBUF* error = modutil_create_mysql_err_msg(1, 0,
SCHEMA_ERR_DUPLICATEDB, SCHEMA_ERRSTR_DUPLICATEDB,
"Error: duplicate databases found on two different shards.");
if (error)
{
client_dcb->func.write(client_dcb, error);
}
else
{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"Error : Creating buffer for error message failed.")));
}
}
}
rses_end_locked_router_action(router_cli_ses);
if(client_dcb)
dcb_close(client_dcb);
return;
}
}
if(BREF_IS_IN_USE(&bkrf[i]) &&
!BREF_IS_MAPPED(&bkrf[i]))
@ -2632,7 +2745,8 @@ static void clientReply (
{
sprintf(errmsg + strlen(errmsg)," ([%lu]: DB not found on connect)",router_cli_ses->rses_client_dcb->session->ses_id);
}
GWBUF* errbuff = modutil_create_mysql_err_msg(1,0,1049,"42000",errmsg);
GWBUF* errbuff = modutil_create_mysql_err_msg(1, 0, SCHEMA_ERR_DBNOTFOUND,
SCHEMA_ERRSTR_DBNOTFOUND, errmsg);
router_cli_ses->rses_client_dcb->func.write(router_cli_ses->rses_client_dcb,errbuff);
if(router_cli_ses->queue)